class MessageStatus(Enum):
    """Status of K8s job sent via message broker."""

    # Each member's value is the same string as its name.
    SUCCESS = "SUCCESS"
    ERROR = "ERROR"
@dataclass
class Message:
    """Base message class for all messages in the system.

    Attributes:
        message: The message text.
        status: Status carried by the message; defaults to
            ``MessageStatus.SUCCESS``.
        timestamp: Timezone-aware creation time; defaults to the current
            UTC time when the instance is constructed.
    """

    message: str
    # Enum members are immutable, so a plain default is safe here; no
    # default_factory is needed.
    status: MessageStatus = MessageStatus.SUCCESS
    # Must stay a factory so every instance gets its own "now".
    timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))

    def to_json(self) -> str:
        """Convert to json string.

        Returns:
            A JSON object containing every dataclass field, with
            ``timestamp`` rendered via ``datetime.isoformat()`` and
            ``status`` rendered as the enum member's value.
        """
        payload = asdict(self)
        # datetime and Enum are not JSON-serializable; replace them with
        # their string forms while keeping the field order from asdict().
        payload["timestamp"] = self.timestamp.isoformat()
        payload["status"] = self.status.value
        return json.dumps(payload)
class MessageBroker(ABC):
    """Abstract base class for message broker implementations.

    Subclasses must implement `publish`, `subscribe` and `close`.
    A broker can also be used as a context manager: leaving the ``with``
    block calls `close`.
    """

    @abstractmethod
    def publish(self, channel: str, message: Message) -> None:
        """Publish a message to a specific channel.

        Args:
            channel: The channel/topic to publish to
            message: The message to publish
        """

    @abstractmethod
    def subscribe(self, channel: str) -> Iterator[Message]:
        """Subscribe to a channel and yield messages as they arrive.

        Args:
            channel: The channel/topic to subscribe to

        Returns:
            Iterator yielding messages as they arrive
        """

    @abstractmethod
    def close(self) -> None:
        """Close the message broker."""

    def __enter__(self) -> "MessageBroker":
        """Enter a ``with`` block and return this broker."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Close the broker on leaving a ``with`` block.

        Returns ``None``, so an exception raised inside the block is not
        suppressed.
        """
        self.close()