"""Schemas for the NoSQL database."""fromcollections.abcimportCallablefromdatetimeimportUTC,datetimefromtypingimportAnnotated,AnyfrombsonimportObjectIdfrompydanticimportBaseModel,Field,field_serializerfrompydantic_coreimportcore_schemafrompoiesis.api.constantsimportPoiesisApiConstantsfrompoiesis.api.tes.modelsimportService,TesState,TesTaskfrompoiesis.cli.utilsimportget_versionconstants=PoiesisApiConstants()class_ObjectIdPydanticAnnotation:# cf. https://stackoverflow.com/a/76837550@classmethoddef__get_pydantic_core_schema__(cls,_source_type:Any,_handler:Callable[[Any],core_schema.CoreSchema],)->core_schema.CoreSchema:defvalidate_from_str(input_value:str)->ObjectId:returnObjectId(input_value)returncore_schema.union_schema([# check if it's an instance first before doing any further workcore_schema.is_instance_schema(ObjectId),core_schema.no_info_plain_validator_function(validate_from_str),],serialization=core_schema.to_string_ser_schema(),)PydanticObjectId=Annotated[ObjectId,_ObjectIdPydanticAnnotation]
class TaskSchema(BaseModel):
    """Schema for Task documents in the NoSQL database.

    Args:
        name: Name of the task either provided by the user or the default
            name.
        tags: Tags of the task either provided by the user or empty, would be
            used for filtering tasks.
        task_id: Generated ID of the task.
        user_id: Unique user ID from the authentication service.
        service_hash: Hash of the service document when the task is created.
        state: State of the task which is initialized to INITIALIZING but
            this would be updated as the task progresses.
        updated_at: Timestamp when the task is updated, this would happen by
            K8s job namely TeXaM and TOrc.
        data: Task data which would be updated by K8s job namely TeXaM and
            TOrc.

    Attributes:
        id: ID of the database document (populated via the ``_id`` alias); a
            fresh ``ObjectId`` is generated when none is supplied.
        name: Name of the task either provided by the user or the default
            name.
        tags: Tags of the task either provided by the user or empty, would be
            used for filtering tasks.
        task_id: Generated ID of the task.
        user_id: Unique user ID from the authentication service.
        service_hash: Hash of the service document when the task is created.
        tes_version: Version of the TES against which the task is registered.
        state: State of the task which is initialized to INITIALIZING.
        created_at: Timestamp when the task is created (timezone-aware, UTC).
        updated_at: Timestamp when the task is updated (timezone-aware, UTC).
        data: Task data.
    """

    # Use the concrete ``ObjectId`` class as the factory (as ServiceSchema
    # does) instead of calling the ``Annotated`` alias, which only produces an
    # ObjectId through typing's generic-alias call forwarding.
    id: PydanticObjectId | None = Field(
        default_factory=ObjectId, alias="_id", frozen=True
    )
    name: str = Field(default=constants.Task.NAME, frozen=True)
    tags: dict[str, str] = Field(default_factory=dict, frozen=True)
    task_id: str = Field(frozen=True)
    user_id: str = Field(frozen=True)
    service_hash: str = Field(frozen=True)
    # The lambda is kept so ``get_version`` is looked up each time a default
    # is generated rather than bound once at class-definition time.
    tes_version: str = Field(default_factory=lambda: get_version(), frozen=True)
    state: TesState = Field(default=TesState.INITIALIZING)
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(UTC), frozen=True
    )
    updated_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    data: TesTask

    @field_serializer("state")
    def serialize_state(self, v: TesState) -> str:
        """Serialize the state to a string.

        Args:
            v: The task state to serialize.

        Returns:
            The ``value`` of the given state.
        """
        return v.value
class ServiceSchema(BaseModel):
    """Schema for Service documents in the NoSQL database.

    Args:
        id: ID of the database document.
        service_hash: Hash of the service document.
        update_by: Unique user ID of the admin from the authentication
            service.
        updated_at: Timestamp when the service is updated.
        data: Service data.

    Attributes:
        id: ID of the database document (populated via the ``_id`` alias); a
            fresh ``ObjectId`` is generated when none is supplied.
        service_hash: Hash of the service document.
        update_by: Unique user ID from the authentication service.
        created_at: Timestamp when the service is created (timezone-aware,
            UTC).
        updated_at: Timestamp when the service is updated (timezone-aware,
            UTC).
        data: Service data.
    """

    id: PydanticObjectId | None = Field(
        default_factory=ObjectId,
        alias="_id",
        frozen=True,
    )
    service_hash: str = Field(frozen=True)
    update_by: str = Field(frozen=True)
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(tz=UTC),
        frozen=True,
    )
    updated_at: datetime = Field(
        default_factory=lambda: datetime.now(tz=UTC),
    )
    data: Service = Field(frozen=True)