Source code for poiesis.core.services.torc.torc

"""Task orchestrator (Torc)."""

import logging

from kubernetes.client import (
    V1ObjectMeta,
    V1PersistentVolumeClaim,
    V1PersistentVolumeClaimSpec,
    V1ResourceRequirements,
)

from poiesis.api.tes.models import (
    TesInput,
    TesOutput,
    TesState,
    TesTask,
)
from poiesis.core.adaptors.kubernetes.kubernetes import KubernetesAdapter
from poiesis.core.constants import get_labels, get_poiesis_core_constants
from poiesis.core.services.torc.torc_texam_execution import TorcTexamExecution
from poiesis.core.services.torc.torc_tif_execution import TorcTifExecution
from poiesis.core.services.torc.torc_tof_execution import TorcTofExecution
from poiesis.repository.mongo import MongoDBClient

logger = logging.getLogger(__name__)

core_constants = get_poiesis_core_constants()


[docs] class Torc: """Torc service. Args: task: Task object from task request Attributes: task: Task object from task request kubernetes_client: Kubernetes client pvc_name: Name of the PVC created """ def __init__(self, task: TesTask) -> None: """Torc service initialization. Args: task: Task object from task request Attributes: task: Task object from task request kubernetes_client: Kubernetes client pvc_name: Name of the PVC created """ self.task = task if task.id is None: raise ValueError("Task ID is required") self.id = task.id self.kubernetes_client = KubernetesAdapter() self.db = MongoDBClient() self.pvc_name = "" logger.info(f"Torc initialized with task ID: {self.id}")
[docs] async def execute(self) -> None: """Defines the template method, for each service namely Texam, Tif, Tof.""" assert self.id is not None, "The API should have validated the task name." disk_gb = None if self.task.resources is not None: disk_gb = self.task.resources.disk_gb logger.debug(f"Task {self.id} requested disk size: {disk_gb}GB") assert self.id is not None # Already checked in execute() try: # Create PVC logger.info(f"Task {self.id}: Creating PVC") await self.create_pvc(self.id, disk_gb) # Update task state logger.info(f"Task {self.id}: Updating task state to RUNNING") await self.db.update_task_state(self.id, TesState.RUNNING) await self.db.add_task_log(self.id) # Execute pipeline stages logger.info(f"Task {self.id}: Starting TIF execution") await self.tif_execution(self.id, self.task.inputs) logger.info(f"Task {self.id}: Starting TExAM execution") await self.texam_execution(self.task) logger.info(f"Task {self.id}: Starting TOF execution") await self.tof_execution(self.id, self.task.outputs) logger.info(f"Task {self.id}: Adding system logs") await self.db.add_tes_task_system_logs(self.id) await self.db.add_tes_task_log_end_time(self.id) # If we get here, everything succeeded logger.info(f"Task {self.id}: Execution completed successfully") await self.db.update_task_state(self.id, TesState.COMPLETE) # Clean up PVC after successful completion await self.kubernetes_client.delete_pvc(self.pvc_name) logger.info(f"Task {self.id}: PVC {self.pvc_name} deleted successfully") except Exception as e: logger.error(f"Task {self.id}: Execution failed: {str(e)}") await self.db.add_tes_task_system_logs(self.id) await self.db.add_tes_task_log_end_time(self.id) await self.kubernetes_client.delete_pvc(self.pvc_name) raise
[docs] async def create_pvc(self, name: str, size: float | None) -> None: """Create a PVC for the task. Tif and Tof will use this PVC to read and write data, and executor will the data from the PVC for its use. Args: name: Name of the PVC size: Size of the PVC Raises: Exception: If the PVC creation fails. """ pvc_name = f"{core_constants.K8s.PVC_PREFIX}-{name}" logger.debug( f"PVC storage size: {size}Gi if size else " f"{core_constants.K8s.PVC_DEFAULT_DISK_SIZE}" ) if ( not core_constants.K8s.PVC_ACCESS_MODE and not core_constants.K8s.PVC_STORAGE_CLASS ): logger.warning( "PVC access mode and storage class are not set. Using default values." ) logger.debug(f"PVC access mode: {core_constants.K8s.PVC_ACCESS_MODE}") logger.debug(f"PVC storage class: {core_constants.K8s.PVC_STORAGE_CLASS}") pvc = V1PersistentVolumeClaim( api_version="v1", kind="PersistentVolumeClaim", metadata=V1ObjectMeta( name=pvc_name, labels=get_labels( component=core_constants.K8s.PVC_PREFIX, task_id=name, name=pvc_name, parent=f"{core_constants.K8s.TORC_PREFIX}-{name}", ), ), spec=V1PersistentVolumeClaimSpec( access_modes=[core_constants.K8s.PVC_ACCESS_MODE] if core_constants.K8s.PVC_ACCESS_MODE else None, storage_class_name=core_constants.K8s.PVC_STORAGE_CLASS or None, resources=V1ResourceRequirements( requests={ "storage": f"{size}Gi" if size else core_constants.K8s.PVC_DEFAULT_DISK_SIZE } ), ), ) try: self.pvc_name = await self.kubernetes_client.create_pvc(pvc) logger.info(f"PVC created: {self.pvc_name}") except Exception as e: logger.error(f"Failed to create PVC: {str(e)}") _id = str(self.id) # This will be str as we are using uuid4 logger.error(f"Updating task {_id} state to SYSTEM_ERROR") await self.db.update_task_state(_id, TesState.SYSTEM_ERROR) raise
[docs] async def tif_execution(self, name: str, inputs: list[TesInput] | None) -> None: """Execute the Tif job. Args: name: Name of the task, will be modified to create Tif job name. inputs: List of inputs given in the task. volumes: List of volumes given in the task. Raises: Exception: If the Tif job fails. """ logger.info(f"Starting TIF execution for task {name}") if inputs: logger.debug(f"Task {name} has {len(inputs)} inputs") else: logger.debug(f"Task {name} has no inputs") if inputs is None or len(inputs) == 0: logger.info(f"Task {name} has no inputs, skipping TIF execution") return try: tif_executor = TorcTifExecution(name, inputs) await tif_executor.execute() logger.info(f"TIF execution completed successfully for task {name}") except Exception as e: logger.error(f"Failed to execute Tif: {str(e)}") _id = str(self.id) # This will be str as we are using uuid4 logger.error(f"Updating task {_id} state to SYSTEM_ERROR") await self.db.update_task_state(_id, TesState.SYSTEM_ERROR) raise
[docs] async def texam_execution( self, task: TesTask, ) -> None: """Execute the Texam job. Args: task: The TES task that needs to be executed. Raises: Exception: If the Texam job fails. """ logger.info(f"Starting TEXAM execution for task {task.id}") try: texam_executor = TorcTexamExecution(task) await texam_executor.execute() logger.info(f"TEXAM execution completed successfully for task {task.id}") except Exception as e: logger.error(f"Failed to execute Texam: {str(e)}") logger.error(e) _id = str(self.id) # This will be str as we are using uuid4 logger.error(f"Updating task {_id} state to SYSTEM_ERROR") await self.db.update_task_state(_id, TesState.SYSTEM_ERROR) raise
[docs] async def tof_execution( self, name: str, outputs: list[TesOutput] | None, ) -> None: """Execute the Tof job. Args: name: Name of the task, will be modified to create Tof job name. outputs: List of outputs given in the task. Raises: Exception: If the Tof job fails. """ logger.info(f"Starting TOF execution for task {name}") if outputs: logger.debug(f"Task {name} has {len(outputs)} outputs") else: logger.debug(f"Task {name} has no outputs") if outputs is None or len(outputs) == 0: logger.info(f"Task {name} has no outputs, skipping TOF execution") return try: tof_executor = TorcTofExecution(name, outputs) await tof_executor.execute() logger.info(f"TOF execution completed successfully for task {name}") except Exception as e: logger.error(f"Failed to execute Tof: {str(e)}") _id = str(self.id) # This will be str as we are using uuid4 logger.error(f"Updating task {_id} state to SYSTEM_ERROR") await self.db.update_task_state(_id, TesState.SYSTEM_ERROR) raise