Source code for poiesis.core.constants

"""Constants used in core services."""

import json
import logging
import os
from dataclasses import dataclass
from functools import lru_cache
from pathlib import Path
from typing import Any

from kubernetes.client.models import (
    V1ConfigMapKeySelector,
    V1ConfigMapVolumeSource,
    V1EnvVar,
    V1EnvVarSource,
    V1PodSecurityContext,
    V1SecretKeySelector,
    V1SecurityContext,
    V1Volume,
    V1VolumeMount,
)
from pydantic import ValidationError

from poiesis.api.exceptions import InternalServerException
from poiesis.core.adaptors.kubernetes.models import (
    V1PodSecurityContextPydanticModel,
    V1SecurityContextPydanticModel,
)

logger = logging.getLogger(__name__)


[docs] @dataclass(frozen=True) class PoiesisCoreConstants: """Constants used in core services. Attributes: K8s: Constants used in Kubernetes. MessageBroker: Constants used in message broker. Texam: Constants used in Texam. """
[docs] @dataclass(frozen=True) class K8s: """Constants used in Kubernetes. Attributes: K8S_NAMESPACE: The namespace in Kubernetes. TORC_PREFIX: The prefix for the Task Orchestrator job name. TIF_PREFIX: The prefix for the Task Input Filer job name. TE_PREFIX: The prefix for the Task Executor pod name. TOF_PREFIX: The prefix for the Task Output Filer job name. PVC_PREFIX: The prefix for the Persistent Volume Claim name. TEXAM_PREFIX: The prefix for the Texam job name. PVC_DEFAULT_DISK_SIZE: The default disk size for the Persistent Volume Claim. PVC_ACCESS_MODE: The access mode for PVCs (e.g., ReadWriteOnce, ReadWriteMany). Defaults to ReadWriteOnce for compatibility with most storage providers. PVC_STORAGE_CLASS: The storage class name for PVCs. Defaults to 'standard' which is commonly available across different K8s distributions. POIESIS_IMAGE: The Poiesis image. COMMON_PVC_VOLUME_NAME: The common PVC volume name. FILER_PVC_PATH: The path in the PVC for the filer. S3_SECRET_NAME: The S3 K8s secret name. REDIS_SECRET_NAME: The redis K8s secret name. MONGODB_SECRET_NAME: The mongo K8s secret name. MONGODB_URI_SECRET_KEY: The mongo K8s secret key. SERVICE_ACCOUNT_NAME: The K8s service account name that allows core component to interact with K8s API and create, list and delete pods. BACKOFF_LIMIT: The backoff limit for Job. CONFIGMAP_NAME: The configmap name for the core services. RESTART_POLICY: Restart policy for pods. IMAGE_PULL_POLICY: Image pull policy. JOB_TTL: Time in seconds after which the completed or failed job will be removed. """ K8S_NAMESPACE = os.getenv("POIESIS_K8S_NAMESPACE", "poiesis") TORC_PREFIX = "torc" TIF_PREFIX = "tif" TE_PREFIX = "te" TOF_PREFIX = "tof" PVC_PREFIX = "pvc" TEXAM_PREFIX = "texam" PVC_DEFAULT_DISK_SIZE = "1Gi" PVC_ACCESS_MODE = os.getenv("POIESIS_PVC_ACCESS_MODE") PVC_STORAGE_CLASS = os.getenv("POIESIS_PVC_STORAGE_CLASS") POIESIS_IMAGE = os.getenv("POIESIS_IMAGE", "docker.io/jaeaeich/poiesis:latest") COMMON_PVC_VOLUME_NAME = "task-pvc-volume" FILER_PVC_PATH = "/transfer" REDIS_SECRET_NAME = os.getenv("POIESIS_REDIS_SECRET_NAME") S3_SECRET_NAME = os.getenv("POIESIS_S3_SECRET_NAME") MONGODB_SECRET_NAME = os.getenv("POIESIS_MONGODB_SECRET_NAME") MONGODB_URI_SECRET_KEY = os.getenv( "POIESIS_MONGODB_URI_SECRET_KEY", "MONGODB_URI" ) SERVICE_ACCOUNT_NAME = os.getenv("POIESIS_SERVICE_ACCOUNT_NAME") BACKOFF_LIMIT = 0 CONFIGMAP_NAME = os.getenv("POIESIS_CORE_CONFIGMAP_NAME") RESTART_POLICY = os.getenv("POIESIS_RESTART_POLICY", "Never") IMAGE_PULL_POLICY = os.getenv("POIESIS_IMAGE_PULL_POLICY", "IfNotPresent") JOB_TTL = os.getenv("POIESIS_JOB_TTL") SECURITY_CONTEXT_CONFIGMAP_NAME = os.getenv( "POIESIS_SECURITY_CONTEXT_CONFIGMAP_NAME" ) SECURITY_CONTEXT_PATH = os.getenv("POIESIS_SECURITY_CONTEXT_PATH") INFRASTRUCTURE_SECURITY_CONTEXT_ENABLED = ( os.getenv("POIESIS_INFRASTRUCTURE_SECURITY_CONTEXT_ENABLED", "true").lower() == "true" ) EXECUTOR_SECURITY_CONTEXT_ENABLED = ( os.getenv("POIESIS_EXECUTOR_SECURITY_CONTEXT_ENABLED", "true").lower() == "true" )
[docs] @dataclass(frozen=True) class Texam: """Constants used in Texam. Attributes: BACKOFF_LIMIT: The maximum time is second to wait in exponential backoff. starting from 1 second for a failing executor pod. POLL_INTERVAL: The interval in seconds to poll the executor pod status as a fallback strategy if watch is not available. MONITOR_TIMEOUT_SECONDS: The timeout in seconds to monitor the executor pod status. Default to 0, which means infinity. """ BACKOFF_LIMIT = 60 POLL_INTERVAL = 10 MONITOR_TIMEOUT_SECONDS = 0
[docs] @lru_cache def get_poiesis_core_constants() -> PoiesisCoreConstants: """Get the Poiesis core constants. Returns: PoesisCoreConstants: The Poiesis core constants. """ return PoiesisCoreConstants()
core_constants = get_poiesis_core_constants()
[docs] @lru_cache def get_message_broker_envs() -> tuple[V1EnvVar, ...]: """Get the env vars for redis. Used in k8s manifest for `tif`, `torc` etc. """ common = ( V1EnvVar( name="MESSAGE_BROKER_HOST", value_from=V1EnvVarSource( config_map_key_ref=V1ConfigMapKeySelector( name=core_constants.K8s.CONFIGMAP_NAME, key="MESSAGE_BROKER_HOST", ) ), ), V1EnvVar( name="MESSAGE_BROKER_PORT", value_from=V1EnvVarSource( config_map_key_ref=V1ConfigMapKeySelector( name=core_constants.K8s.CONFIGMAP_NAME, key="MESSAGE_BROKER_PORT", ) ), ), ) auth = ( V1EnvVar( name="MESSAGE_BROKER_PASSWORD", value_from=V1EnvVarSource( secret_key_ref=V1SecretKeySelector( name=core_constants.K8s.REDIS_SECRET_NAME, key="MESSAGE_BROKER_PASSWORD", optional=True, ) ), ), ) return common + auth if core_constants.K8s.REDIS_SECRET_NAME else common
[docs] @lru_cache def get_mongo_envs() -> tuple[V1EnvVar, ...]: """Get the env vars for mongo. Used in k8s manifest for `tif`, `torc` etc. """ envs = ( V1EnvVar( name="POIESIS_MONGODB_SECRET_NAME", value_from=V1EnvVarSource( config_map_key_ref=V1ConfigMapKeySelector( name=core_constants.K8s.CONFIGMAP_NAME, key="POIESIS_MONGODB_SECRET_NAME", optional=True, ) ), ), V1EnvVar( name="POIESIS_MONGODB_URI_SECRET_KEY", value_from=V1EnvVarSource( config_map_key_ref=V1ConfigMapKeySelector( name=core_constants.K8s.CONFIGMAP_NAME, key="POIESIS_MONGODB_URI_SECRET_KEY", optional=True, ) ), ), V1EnvVar( name=core_constants.K8s.MONGODB_URI_SECRET_KEY, value_from=V1EnvVarSource( secret_key_ref=V1SecretKeySelector( name=core_constants.K8s.MONGODB_SECRET_NAME, key=core_constants.K8s.MONGODB_URI_SECRET_KEY, optional=True, ) ), ), ) if not core_constants.K8s.MONGODB_SECRET_NAME: logger.error("MongoDB secret name is not set") raise ValueError("MongoDB secret name is not set") return envs
[docs] @lru_cache def get_s3_envs() -> tuple[V1EnvVar, ...]: """Get the env vars for s3. Used in k8s manifest for `tif`, `tof`. """ return ( ( V1EnvVar( name="S3_URL", value_from=V1EnvVarSource( config_map_key_ref=V1ConfigMapKeySelector( name=core_constants.K8s.CONFIGMAP_NAME, key="S3_URL", optional=True, ) ), ), V1EnvVar( name="AWS_ACCESS_KEY_ID", value_from=V1EnvVarSource( secret_key_ref=V1SecretKeySelector( name=core_constants.K8s.S3_SECRET_NAME, key="AWS_ACCESS_KEY_ID", optional=True, ) ), ), V1EnvVar( name="AWS_SECRET_ACCESS_KEY", value_from=V1EnvVarSource( secret_key_ref=V1SecretKeySelector( name=core_constants.K8s.S3_SECRET_NAME, key="AWS_SECRET_ACCESS_KEY", optional=True, ) ), ), ) if core_constants.K8s.S3_SECRET_NAME else () )
[docs] @lru_cache def get_secret_names() -> tuple[V1EnvVar, ...]: """Returns name of the secrets as env.""" return ( V1EnvVar( name="POIESIS_S3_SECRET_NAME", value_from=V1EnvVarSource( config_map_key_ref=V1ConfigMapKeySelector( name=core_constants.K8s.CONFIGMAP_NAME, key="POIESIS_S3_SECRET_NAME", optional=True, ) ), ), V1EnvVar( name="POIESIS_MONGODB_SECRET_NAME", value_from=V1EnvVarSource( config_map_key_ref=V1ConfigMapKeySelector( name=core_constants.K8s.CONFIGMAP_NAME, key="POIESIS_MONGODB_SECRET_NAME", ) ), ), V1EnvVar( name="POIESIS_REDIS_SECRET_NAME", value_from=V1EnvVarSource( config_map_key_ref=V1ConfigMapKeySelector( name=core_constants.K8s.CONFIGMAP_NAME, key="POIESIS_REDIS_SECRET_NAME", optional=True, ), ), ), )
[docs] @lru_cache def get_configmap_names() -> tuple[V1EnvVar, ...]: """Get names of the configmap.""" return ( V1EnvVar( name="POIESIS_CORE_CONFIGMAP_NAME", value=core_constants.K8s.CONFIGMAP_NAME ), )
[docs] @lru_cache def get_security_context_envs() -> tuple[V1EnvVar, ...]: """Get the env vars for security context.""" return ( V1EnvVar( name="POIESIS_SECURITY_CONTEXT_CONFIGMAP_NAME", value=core_constants.K8s.SECURITY_CONTEXT_CONFIGMAP_NAME, ), V1EnvVar( name="POIESIS_SECURITY_CONTEXT_PATH", value=core_constants.K8s.SECURITY_CONTEXT_PATH, ), V1EnvVar( name="POIESIS_INFRASTRUCTURE_SECURITY_CONTEXT_ENABLED", value=str(core_constants.K8s.INFRASTRUCTURE_SECURITY_CONTEXT_ENABLED), ), V1EnvVar( name="POIESIS_EXECUTOR_SECURITY_CONTEXT_ENABLED", value=str(core_constants.K8s.EXECUTOR_SECURITY_CONTEXT_ENABLED), ), )
def _read_security_context_json(filename: str) -> dict[str, Any]: """Read a security context JSON file and return the parsed data. Args: filename: Name of the JSON file to read (e.g., "infrastructure_pod_security_context.json") Returns: Parsed JSON data as dict Raises: InternalServerException: If the file doesn't exist or can't be read """ try: security_context_path = ( core_constants.K8s.SECURITY_CONTEXT_PATH.strip() if core_constants.K8s.SECURITY_CONTEXT_PATH is not None else "" ) if not security_context_path and ( core_constants.K8s.INFRASTRUCTURE_SECURITY_CONTEXT_ENABLED or core_constants.K8s.EXECUTOR_SECURITY_CONTEXT_ENABLED ): raise InternalServerException("Security context path is not set.") file_path = Path(str(security_context_path)) / filename if not file_path.exists(): raise InternalServerException(f"Security context file {filename} not found") with open(file_path) as f: context: dict[str, Any] = json.load(f) if not context: logger.warning(f"Security context is empty in {filename}.") logger.debug(f"Security context: \n{json.dumps(context, indent=2)}") return context except (FileNotFoundError, json.JSONDecodeError, PermissionError) as e: raise InternalServerException( "Failed to read security context JSON file" ) from e
[docs] @lru_cache def get_infrastructure_pod_security_context() -> V1PodSecurityContext | None: """Returns a V1PodSecurityContext for infrastructure components. Returns: V1PodSecurityContext: The security context for infrastructure components. None: If security context is disabled. """ if not core_constants.K8s.INFRASTRUCTURE_SECURITY_CONTEXT_ENABLED: return None filename = "infrastructure_pod_security_context.json" json_data = _read_security_context_json(filename) try: return V1PodSecurityContextPydanticModel.model_validate( json_data ).to_k8s_model() except ValidationError as e: raise InternalServerException(f"Failed to validate {filename}") from e
[docs] @lru_cache def get_infrastructure_container_security_context() -> V1SecurityContext | None: """Returns a V1SecurityContext for infrastructure containers. Returns: V1SecurityContext: The security context for infrastructure containers. None: If security context is disabled. """ if not core_constants.K8s.INFRASTRUCTURE_SECURITY_CONTEXT_ENABLED: return None filename = "infrastructure_container_security_context.json" json_data = _read_security_context_json(filename) try: return V1SecurityContextPydanticModel.model_validate(json_data).to_k8s_model() except ValidationError as e: raise InternalServerException(f"Failed to validate {filename}") from e
[docs] @lru_cache def get_executor_container_security_context() -> V1SecurityContext | None: """Returns a V1SecurityContext for task executor containers. Returns: V1SecurityContext: The security context for task executor containers. None: If security context is disabled. """ if not core_constants.K8s.EXECUTOR_SECURITY_CONTEXT_ENABLED: return None filename = "executor_container_security_context.json" json_data = _read_security_context_json(filename) try: return V1SecurityContextPydanticModel.model_validate(json_data).to_k8s_model() except ValidationError as e: raise InternalServerException(f"Failed to validate {filename}") from e
[docs] @lru_cache def get_executor_pod_security_context() -> V1PodSecurityContext | None: """Returns a V1PodSecurityContext for task executor pods. Returns: V1PodSecurityContext: The security context for task executor pods. None: If security context is disabled. """ if not core_constants.K8s.EXECUTOR_SECURITY_CONTEXT_ENABLED: return None filename = "executor_pod_security_context.json" json_data = _read_security_context_json(filename) try: return V1PodSecurityContextPydanticModel.model_validate( json_data ).to_k8s_model() except ValidationError as e: raise InternalServerException(f"Failed to validate {filename}") from e
[docs] @lru_cache def get_infrastructure_security_volume() -> list[V1Volume]: """Returns the name of the security context configmap.""" if not core_constants.K8s.INFRASTRUCTURE_SECURITY_CONTEXT_ENABLED: return [] if ( not core_constants.K8s.SECURITY_CONTEXT_CONFIGMAP_NAME or not core_constants.K8s.SECURITY_CONTEXT_CONFIGMAP_NAME.strip() ): raise InternalServerException( "Security context configmap name is not set or is empty/whitespace." ) return [ V1Volume( name=str(core_constants.K8s.SECURITY_CONTEXT_CONFIGMAP_NAME), config_map=V1ConfigMapVolumeSource( name=str(core_constants.K8s.SECURITY_CONTEXT_CONFIGMAP_NAME) ), ) ]
[docs] @lru_cache def get_infrastructure_security_volume_mount() -> list[V1VolumeMount]: """Returns the name of the security context configmap.""" if not core_constants.K8s.INFRASTRUCTURE_SECURITY_CONTEXT_ENABLED: return [] if not core_constants.K8s.SECURITY_CONTEXT_CONFIGMAP_NAME: raise InternalServerException("Security context configmap name is not set.") if not core_constants.K8s.SECURITY_CONTEXT_PATH: raise InternalServerException("Security context path is not set.") return [ V1VolumeMount( name=str(core_constants.K8s.SECURITY_CONTEXT_CONFIGMAP_NAME), mount_path=core_constants.K8s.SECURITY_CONTEXT_PATH, read_only=True, ) ]
[docs] @lru_cache def get_executor_security_volume() -> list[V1Volume]: """Returns the name of the security context configmap.""" if not core_constants.K8s.EXECUTOR_SECURITY_CONTEXT_ENABLED: return [] if not core_constants.K8s.SECURITY_CONTEXT_CONFIGMAP_NAME: raise InternalServerException("Security context configmap name is not set.") return [ V1Volume( name=str(core_constants.K8s.SECURITY_CONTEXT_CONFIGMAP_NAME), config_map=V1ConfigMapVolumeSource( name=str(core_constants.K8s.SECURITY_CONTEXT_CONFIGMAP_NAME) ), ) ]
[docs] @lru_cache def get_executor_security_volume_mount() -> list[V1VolumeMount]: """Returns the name of the security context configmap.""" if not core_constants.K8s.EXECUTOR_SECURITY_CONTEXT_ENABLED: return [] if not core_constants.K8s.SECURITY_CONTEXT_CONFIGMAP_NAME: raise InternalServerException("Security context configmap name is not set.") if not core_constants.K8s.SECURITY_CONTEXT_PATH: raise InternalServerException("Security context path is not set.") return [ V1VolumeMount( name=str(core_constants.K8s.SECURITY_CONTEXT_CONFIGMAP_NAME), mount_path=core_constants.K8s.SECURITY_CONTEXT_PATH, read_only=True, ) ]
[docs] def get_labels( component: str, task_id: str, name: str | None = None, parent: str | None = None, ) -> dict[str, str]: """Get the labels for a job or a PVC. Args: component: The component that is creating the resource. task_id: The id of the task. name: The name of the resource. parent: The parent of the resource. Returns: The labels for the resource. """ labels = { "app.kubernetes.io/component": component, "tes-task-id": task_id, } if name: labels["app.kubernetes.io/resource-name"] = name if parent: labels["app.kubernetes.io/part-of"] = parent return labels