Source code for poiesis.api.controllers.create_task

"""Controller for creating a task."""

import asyncio
import json
import logging
import uuid
from typing import Any

from kubernetes.client.models import (
    V1ConfigMapKeySelector,
    V1Container,
    V1EnvVar,
    V1EnvVarSource,
    V1Job,
    V1JobSpec,
    V1ObjectMeta,
    V1PodSpec,
    V1PodTemplateSpec,
)

from poiesis.api.constants import get_poiesis_api_constants
from poiesis.api.controllers.interface import InterfaceController
from poiesis.api.exceptions import DBException
from poiesis.api.tes.models import (
    TesCreateTaskResponse,
    TesState,
    TesTask,
)
from poiesis.constants import get_poiesis_constants
from poiesis.core.adaptors.kubernetes.kubernetes import KubernetesAdapter
from poiesis.core.constants import (
    get_configmap_names,
    get_infrastructure_container_security_context,
    get_infrastructure_pod_security_context,
    get_infrastructure_security_volume,
    get_infrastructure_security_volume_mount,
    get_labels,
    get_message_broker_envs,
    get_mongo_envs,
    get_poiesis_core_constants,
    get_secret_names,
    get_security_context_envs,
)
from poiesis.repository.mongo import MongoDBClient
from poiesis.repository.schemas import TaskSchema

constants = get_poiesis_constants()
api_constants = get_poiesis_api_constants()
core_constants = get_poiesis_core_constants()

logger = logging.getLogger(__name__)


[docs] class CreateTaskController(InterfaceController): """Controller for creating a task.""" def __init__(self, db: MongoDBClient, task: TesTask, user_id: str): """Initialize the controller. Args: db: The database client. task: The task to create user_id: User unique identifier """ self.db = db self.task = task self.user_id = user_id self.kubernetes_client = KubernetesAdapter()
[docs] async def execute(self, *args: Any, **kwargs: Any) -> TesCreateTaskResponse: """Execute the controller.""" _task = self._create_dummy_task_document(self.task) try: await self.db.insert_task(_task) except Exception as e: logger.error(f"Failed to create task: {str(e)}") raise DBException( "Failed to create task", ) from e asyncio.create_task(self._create_torc_job()) return TesCreateTaskResponse(id=str(_task.task_id))
async def _create_torc_job(self) -> None: torc_job_name = f"{core_constants.K8s.TORC_PREFIX}-{self.task.id}" try: _ttl = ( int(core_constants.K8s.JOB_TTL) if core_constants.K8s.JOB_TTL else None ) except (ValueError, TypeError): logger.warning( f"Invalid JOB_TTL {core_constants.K8s.JOB_TTL}, falling back to no TTL " "(None).", ) _ttl = None job = V1Job( api_version="batch/v1", kind="Job", metadata=V1ObjectMeta( name=torc_job_name, namespace=core_constants.K8s.K8S_NAMESPACE, labels=get_labels( component=core_constants.K8s.TORC_PREFIX, name=torc_job_name, task_id=str(self.task.id), parent="poiesis-api", ), ), spec=V1JobSpec( backoff_limit=int(core_constants.K8s.BACKOFF_LIMIT), template=V1PodTemplateSpec( metadata=V1ObjectMeta( labels=get_labels( component=core_constants.K8s.TORC_PREFIX, name=torc_job_name, task_id=str(self.task.id), parent="poiesis-api", ), ), spec=V1PodSpec( service_account_name=core_constants.K8s.SERVICE_ACCOUNT_NAME, security_context=get_infrastructure_pod_security_context(), containers=[ V1Container( name=core_constants.K8s.TORC_PREFIX, image=core_constants.K8s.POIESIS_IMAGE, command=["poiesis", "torc", "run"], security_context=get_infrastructure_container_security_context(), args=["--task", json.dumps(self.task.model_dump())], env=list(get_message_broker_envs()) + list(get_mongo_envs()) + list(get_secret_names()) + list(get_configmap_names()) + list(get_security_context_envs()) + [ V1EnvVar( name="POIESIS_IMAGE", value=core_constants.K8s.POIESIS_IMAGE, ), V1EnvVar( name="LOG_LEVEL", value_from=V1EnvVarSource( config_map_key_ref=V1ConfigMapKeySelector( name=core_constants.K8s.CONFIGMAP_NAME, key="LOG_LEVEL", ) ), ), V1EnvVar( name="POIESIS_K8S_NAMESPACE", value_from=V1EnvVarSource( config_map_key_ref=V1ConfigMapKeySelector( name=core_constants.K8s.CONFIGMAP_NAME, key="POIESIS_K8S_NAMESPACE", ) ), ), V1EnvVar( name="POIESIS_SERVICE_ACCOUNT_NAME", value_from=V1EnvVarSource( config_map_key_ref=V1ConfigMapKeySelector( name=core_constants.K8s.CONFIGMAP_NAME, key="POIESIS_SERVICE_ACCOUNT_NAME", ) ), ), V1EnvVar( name="POIESIS_RESTART_POLICY", value_from=V1EnvVarSource( config_map_key_ref=V1ConfigMapKeySelector( name=core_constants.K8s.CONFIGMAP_NAME, key="POIESIS_RESTART_POLICY", ) ), ), V1EnvVar( name="POIESIS_IMAGE_PULL_POLICY", value_from=V1EnvVarSource( config_map_key_ref=V1ConfigMapKeySelector( name=core_constants.K8s.CONFIGMAP_NAME, key="POIESIS_IMAGE_PULL_POLICY", ) ), ), V1EnvVar( name="POIESIS_JOB_TTL", value_from=V1EnvVarSource( config_map_key_ref=V1ConfigMapKeySelector( name=core_constants.K8s.CONFIGMAP_NAME, key="POIESIS_JOB_TTL", ) ), ), V1EnvVar( name="POIESIS_PVC_ACCESS_MODE", value=core_constants.K8s.PVC_ACCESS_MODE, ), V1EnvVar( name="POIESIS_PVC_STORAGE_CLASS", value=core_constants.K8s.PVC_STORAGE_CLASS, ), ], image_pull_policy=core_constants.K8s.IMAGE_PULL_POLICY, volume_mounts=get_infrastructure_security_volume_mount(), ), ], restart_policy=core_constants.K8s.RESTART_POLICY, volumes=get_infrastructure_security_volume(), ), ), ttl_seconds_after_finished=_ttl, ), ) logger.debug(job) try: await self.kubernetes_client.create_job(job) except Exception as e: logger.error(f"Failed to create TORC job: {str(e)}") _id = str(self.task.id) # This will be str as we are using uuid4 await self.db.update_task_state(_id, TesState.SYSTEM_ERROR) def _create_dummy_task_document(self, task: TesTask) -> TaskSchema: _task_id = uuid.uuid4() task.id = str(_task_id) task.name = task.name or api_constants.Task.NAME task.tags = task.tags or {} task.state = TesState.INITIALIZING return TaskSchema( name=task.name, state=TesState.INITIALIZING, tags=task.tags, task_id=str(_task_id), user_id=self.user_id, service_hash="-1", # TODO: Add service hash when service is implemented data=task, )