poiesis.repository package

Submodules

poiesis.repository.mongo module

MongoDB adaptor for NoSQL database operations.

class poiesis.repository.mongo.MongoDBClient[source]

Bases: object

Simple MongoDB client using Motor for async operations.

async add_task_executor_log(task_id)[source]

Append a log for a task in the database.

Each executor has a log.

Parameters:

task_id (str) – ID of the task

Return type:

None

Note

This assumes that the executors run sequentially, so the new entry is simply appended to the last log.

async add_task_log(task_id)[source]

Add a log for a task in the database.

Parameters:

task_id (str) – ID of the task

Return type:

None

Note

A new log is added because the spec defines that, in case of task failure and retry, another log is added to the task.
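The relationship between add_task_log and add_task_executor_log can be sketched with plain Python containers. This is only an in-memory illustration, not the Motor-backed implementation: the nested "logs" and "system_logs" keys are assumed from the TES task model, and both helper functions below are hypothetical stand-ins.

```python
from typing import Any


def add_task_log(task: dict[str, Any]) -> None:
    """Append a fresh task-level log entry (one per attempt)."""
    task.setdefault("logs", []).append({"logs": [], "system_logs": []})


def add_task_executor_log(task: dict[str, Any]) -> None:
    """Append an executor log to the most recent task-level log."""
    if not task.get("logs"):
        raise ValueError("task has no task-level log yet")
    # Executors are assumed to run sequentially, so appending to the
    # last task-level log is enough.
    task["logs"][-1]["logs"].append({"stdout": "", "stderr": ""})


task: dict[str, Any] = {"id": "task-1"}
add_task_log(task)            # first attempt
add_task_executor_log(task)   # executor 0
add_task_executor_log(task)   # executor 1
add_task_log(task)            # retry after a failure
add_task_executor_log(task)   # executor 0 of the retry
```

After these calls the task holds two task-level logs, the first with two executor logs and the second with one.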

async add_tes_task_log_end_time(task_id)[source]

Add the end time of a task in the database.

Parameters:

task_id (str) – ID of the task

Return type:

None

async add_tes_task_system_logs(task_id, system_logs=None)[source]

Add system logs for a task in the database.

Parameters:
  • task_id (str) – ID of the task

  • system_logs (list[str] | None) – Custom system logs to add, apart from the pod logs.

Return type:

None

async close()[source]

Close all connections in the pool.

connection()[source]

Async context manager for explicit connection handling.

Yields:

AsyncIOMotorDatabase instance
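The calling pattern for an async context manager such as connection() can be sketched as follows. FakeClient and FakeDatabase are hypothetical stand-ins so that the example runs without MongoDB; whether the real method performs any cleanup on exit is not stated in this reference and is assumed here only for illustration.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class FakeDatabase:
    """Hypothetical stand-in for an AsyncIOMotorDatabase handle."""

    def __init__(self) -> None:
        self.open = True


class FakeClient:
    """Hypothetical client that only mimics the context-manager shape."""

    @asynccontextmanager
    async def connection(self) -> AsyncIterator[FakeDatabase]:
        db = FakeDatabase()
        try:
            yield db          # the caller works with the handle here
        finally:
            db.open = False   # assumed cleanup; runs even if the caller raises


async def main() -> tuple[bool, bool]:
    client = FakeClient()
    async with client.connection() as db:
        inside = db.open
    return inside, db.open


inside, after = asyncio.run(main())
```

Note that connection() itself is called without await; only the `async with` statement awaits entry and exit.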

async get_task(task_id)[source]

Get a task from the database.

Parameters:

task_id (str) – ID of the task

Return type:

TaskSchema

async insert_task(task)[source]

Insert a single task document into the database.

Parameters:

task (TaskSchema) – Task to insert

Return type:

str

Returns:

The inserted document ID as a string

Raises:

DBException – If the document is not valid or the insert operation fails

async list_tasks(query, page_size=None, next_page_token=None)[source]

List tasks from the database with pagination.

Parameters:
  • query (dict[str, Any]) – Query to filter tasks

  • page_size (int | None) – Number of tasks to return

  • next_page_token (str | None) – Token for returning the next page of results

Returns:

List of tasks matching the query, and the next page token

Return type:

tuple[list[TesTask], Optional[str]]
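The (page, next token) return shape can be illustrated with a small in-memory sketch. The token encoding used here (a stringified offset) and the default page size are assumptions made for the example; the reference above does not say how list_tasks encodes its token, and list_page is a hypothetical helper.

```python
from typing import Optional


def list_page(
    items: list[str],
    page_size: Optional[int] = None,
    next_page_token: Optional[str] = None,
) -> tuple[list[str], Optional[str]]:
    """Return one page of items and the token for the following page."""
    size = page_size if page_size is not None else 2  # assumed default
    start = int(next_page_token) if next_page_token else 0
    page = items[start : start + size]
    end = start + len(page)
    # No token is returned once the last item has been served.
    token = str(end) if end < len(items) else None
    return page, token


tasks = ["t1", "t2", "t3", "t4", "t5"]
first, token = list_page(tasks, page_size=2)
second, token2 = list_page(tasks, page_size=2, next_page_token=token)
third, token3 = list_page(tasks, page_size=2, next_page_token=token2)
```

A caller keeps passing the returned token back in until it comes back as None.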

async update_executor_log(job_name, pod_phase, stdout, stderr=None)[source]

Update the executor log in the database.

Gets the index of the executor from the executor name, then updates the log at that index in the last log of the task.

Note: If the pod fails to start, the get_pod_log method cannot be called. If stdout and stderr are provided, they are used instead of the pod log; otherwise get_pod_log is tried, and if that fails, empty strings are used.

Parameters:
  • job_name (str) – Name of the job

  • pod_phase (str) – Phase of the pod

  • stdout (str) – Standard output of the pod

  • stderr (str | None) – Standard error of the pod

Return type:

None
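The fallback order described in the note for update_executor_log can be sketched as a small pure function. resolve_logs and fetch_pod_log are hypothetical names, and storing the fetched pod log as stdout is an assumption; the reference does not say how the real method splits the pod log.

```python
from typing import Callable, Optional


def resolve_logs(
    stdout: Optional[str],
    stderr: Optional[str],
    fetch_pod_log: Callable[[], str],
) -> tuple[str, str]:
    """Pick the executor output following the fallback order in the note."""
    if stdout is not None and stderr is not None:
        return stdout, stderr      # caller-supplied values win
    try:
        log = fetch_pod_log()      # hypothetical pod log lookup
    except Exception:
        return "", ""              # e.g. the pod never started
    return log, ""                 # assumption: pod log stored as stdout


def failing_fetch() -> str:
    raise RuntimeError("pod did not start")


provided = resolve_logs("out", "err", failing_fetch)
fetched = resolve_logs(None, None, lambda: "pod output")
fallback = resolve_logs(None, None, failing_fetch)
```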

async update_task_state(task_id, state)[source]

Update the state of a task in the database.

This is called by jobs when the task state changes or the task fails.

Parameters:
  • task_id (str) – ID of the task

  • state (TesState) – State of the task

Raises:

DBException – If the update operation fails

Return type:

None

poiesis.repository.schemas module

Schemas for the NoSQL database.

class poiesis.repository.schemas.ServiceSchema(**data)[source]

Bases: BaseModel

Schema for Service documents in the NoSQL database.

Parameters:
  • id – ID of the Database document

  • service_hash – Hash of the service document

  • update_by – Unique user ID of the admin from the authentication service

  • updated_at – Timestamp when the service is updated

  • data (Any) – Service data

id

ID of the Database document

service_hash

Hash of the service document

update_by

Unique user ID from the authentication service

created_at

Timestamp when the service is created

updated_at

Timestamp when the service is updated

data

Service data

class Config[source]

Bases: object

Pydantic configuration.

arbitrary_types_allowed = True
populate_by_name = True

created_at: datetime
data: Service
id: Annotated[ObjectId, _ObjectIdPydanticAnnotation] | None
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'populate_by_name': True, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

service_hash: str
update_by: str
updated_at: datetime

class poiesis.repository.schemas.TaskSchema(**data)[source]

Bases: BaseModel

Schema for Task documents in the NoSQL database.

Parameters:
  • name – Name of the task, either provided by the user or the default name

  • tags – Tags of the task, either provided by the user or empty; used for filtering tasks

  • task_id – Generated ID of the task

  • user_id – Unique user ID from the authentication service

  • service_hash – Hash of the service document when the task is created

  • state – State of the task; initialized to INITIALIZING and updated as the task progresses

  • updated_at – Timestamp when the task is updated; updates are made by the K8s jobs, namely TeXaM and TOrc.

  • data (Any) – Task data, updated by the K8s jobs, namely TeXaM and TOrc.

id

ID of the Database document

name

Name of the task, either provided by the user or the default name

tags

Tags of the task, either provided by the user or empty; used for filtering tasks

task_id

Generated ID of the task

user_id

Unique user ID from the authentication service

service_hash

Hash of the service document when the task is created

tes_version

Version of the TES against which the task is registered

state

State of the task which is initialized to INITIALIZING

created_at

Timestamp when the task is created

updated_at

Timestamp when the task is updated

data

Task data

class Config[source]

Bases: object

Pydantic configuration.

arbitrary_types_allowed = True
populate_by_name = True

created_at: datetime
data: TesTask
id: Annotated[ObjectId, _ObjectIdPydanticAnnotation] | None
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'populate_by_name': True, 'validate_by_alias': True, 'validate_by_name': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

name: str
serialize_state(v)[source]

Serialize the state to a string.

Return type:

str

service_hash: str
state: TesState
tags: dict[str, str]
task_id: str
tes_version: str
updated_at: datetime
user_id: str
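What serialize_state is documented to do, turning the state into a string, can be illustrated with the standard library alone. The State enum below is a hypothetical two-member stand-in for TesState, and the free function is only a sketch; how the real method is wired into the Pydantic model is not shown in this reference.

```python
from enum import Enum


class State(Enum):
    """Hypothetical subset of task states, for illustration only."""

    INITIALIZING = "INITIALIZING"
    RUNNING = "RUNNING"


def serialize_state(v: State) -> str:
    """Return the plain string value so the document stores text, not an enum."""
    return v.value


doc = {"task_id": "task-1", "state": serialize_state(State.INITIALIZING)}
```

Storing the plain string keeps the database document free of Python-specific types.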

Module contents

Repository module for Poiesis.

The TesDB is a NoSQL database that stores the task and service information. Extra information is stored in TesDB for backward compatibility.