Source code for poiesis.api.controllers.cancel_task
"""Controller for canceling tasks."""importasyncioimportloggingfromtypingimportAnyfromkubernetes.client.exceptionsimportApiExceptionfrompoiesis.api.controllers.interfaceimportInterfaceControllerfrompoiesis.api.exceptionsimportBadRequestException,NotFoundExceptionfrompoiesis.api.tes.modelsimportTesCancelTaskResponse,TesStatefrompoiesis.core.adaptors.kubernetes.kubernetesimportKubernetesAdapterfrompoiesis.core.constantsimportget_poiesis_core_constantsfrompoiesis.repository.mongoimportMongoDBClientcore_constants=get_poiesis_core_constants()logger=logging.getLogger(__name__)
class CancelTaskController(InterfaceController):
    """Controller for canceling a task.

    The controller marks the task as ``CANCELING`` in the database, then
    deletes the Kubernetes jobs, pods and PVCs labelled with the task ID in a
    background asyncio task, and finally marks the task as ``CANCELED``.

    Args:
        db: The database client.
        task_id: The ID of the task to cancel.
        user_id: The ID of the user making the request.
    """

    # Strong references to in-flight cleanup tasks. The event loop only keeps
    # weak references to tasks, so a fire-and-forget task that nobody else
    # references may be garbage-collected before it finishes. The set is
    # intentionally shared by all controller instances (it is never rebound,
    # only mutated) so the reference outlives the request-scoped controller.
    _background_tasks: "set[asyncio.Task[None]]" = set()

    def __init__(
        self,
        db: MongoDBClient,
        task_id: str,
        user_id: str,
    ) -> None:
        """Initialize the controller.

        Args:
            db: The database client.
            task_id: The ID of the task to cancel.
            user_id: The ID of the user making the request.
        """
        self.db = db
        self.task_id = task_id
        self.user_id = user_id
        self.kubernetes_client = KubernetesAdapter()

    @classmethod
    def _finalize_background_task(cls, task: "asyncio.Task[None]") -> None:
        """Release a finished cleanup task and report its failure, if any.

        Used as the done-callback of the background cleanup task.

        Args:
            task: The finished (or cancelled) background task.
        """
        cls._background_tasks.discard(task)
        if task.cancelled():
            # ``Task.exception()`` raises ``CancelledError`` on a cancelled
            # task, so there is nothing to inspect here.
            return
        error = task.exception()
        if error is not None:
            # Retrieving the exception here keeps the failure from being
            # silently dropped with the task object.
            logger.error(
                "Background cancellation cleanup failed: %s",
                error,
                exc_info=error,
            )

    async def _clean_task_resources_and_set_final_state(self) -> None:
        """Clean up the task resources and set final state."""
        await self._clean_task_task_resources()
        await self.db.update_task_state(self.task_id, TesState.CANCELED)
        logger.info(f"Task {self.task_id} has been canceled and resources cleaned up.")

    async def _clean_task_task_resources(self) -> None:
        """Clean up the task resources.

        Jobs, pods and PVCs carrying the ``tes-task-id=<task_id>`` label are
        deleted in that order. For each resource kind the delete call is
        repeated, with a growing pause after every attempt, for as long as the
        listing is still non-empty and the attempt budget is not exhausted.
        ``ApiException`` and ``TypeError`` are logged per resource kind and do
        not stop the processing of the remaining kinds.
        """
        label_selector = f"tes-task-id={self.task_id}"
        logger.debug(f"Deleting all the jobs with label selector: {label_selector}")

        resources = [
            {
                "name": "jobs",
                "list_fn": self.kubernetes_client.list_jobs_by_label,
                "delete_fn": self.kubernetes_client.delete_jobs_by_label,
                "log_fail": "Failed to delete all jobs after retries and waiting.",
            },
            {
                "name": "pods",
                "list_fn": self.kubernetes_client.list_pods_by_label,
                "delete_fn": self.kubernetes_client.delete_pods_by_label,
                "log_fail": "Failed to delete all pods after retries and waiting.",
            },
            {
                "name": "PVCs",
                "list_fn": self.kubernetes_client.list_pvcs_by_label,
                "delete_fn": self.kubernetes_client.delete_pvcs_by_label,
                "log_fail": "Failed to delete all PVCs after retries and waiting.",
            },
        ]

        max_retries = 3

        for resource in resources:
            logger.debug(
                f"Deleting all the {resource['name']} with "
                f"label selector: {label_selector}"
            )
            try_count = 0
            try:
                list_fn = resource["list_fn"]
                delete_fn = resource["delete_fn"]
                # We can safely cast here as we know the types in the
                # resources list. The ``<=`` bound allows one initial attempt
                # plus ``max_retries`` retries.
                while (
                    await list_fn(label_selector)  # type: ignore
                    and try_count <= max_retries
                ):
                    await delete_fn(label_selector)  # type: ignore
                    try_count += 1
                    # Back-off doubles per attempt: 4, 8, 16, 32 seconds.
                    await asyncio.sleep(2 << try_count)

                # One last listing decides whether the deletion succeeded.
                if await list_fn(label_selector):  # type: ignore
                    log_fail = resource["log_fail"]
                    logger.warning(log_fail)
            except ApiException as e:
                logger.warning(
                    f"Error deleting {resource['name']} for task {self.task_id}: {e}"
                )
            except TypeError as e:
                logger.error(f"Error calling function: {e}")

    async def execute(self, *args: Any, **kwargs: Any) -> TesCancelTaskResponse:
        """Cancel a task.

        The task is switched to ``CANCELING`` before this method returns; the
        resource cleanup and the final ``CANCELED`` state are handled by a
        background asyncio task that is not awaited here.

        Returns:
            A response indicating the task was canceled.

        Raises:
            NotFoundException: If the task is not found for the user.
            BadRequestException: If the task is already completed, canceled,
                or being canceled.
        """
        task = await self.db.get_task(self.task_id)

        if task.user_id != self.user_id:
            raise NotFoundException(f"Task {self.task_id} not found for user")

        if task.state in (
            TesState.COMPLETE,
            TesState.CANCELED,
            TesState.CANCELING,
        ):
            raise BadRequestException(
                f"Task {self.task_id} is already in a terminal state"
                f": {task.state.value}"
            )

        await self.db.update_task_state(self.task_id, TesState.CANCELING)

        # Keep a strong reference until the cleanup finishes; otherwise the
        # task could be garbage-collected mid-execution and the task record
        # would stay in CANCELING.
        cleanup_task = asyncio.create_task(
            self._clean_task_resources_and_set_final_state()
        )
        self._background_tasks.add(cleanup_task)
        cleanup_task.add_done_callback(self._finalize_background_task)

        return TesCancelTaskResponse()