[docs]classSingletonMeta(type):"""Singleton metaclass for OpenIDConnectAuthProvider."""_instances:dict[type,"OpenIDConnectAuthProvider"]={}_lock=threading.Lock()def__call__(cls,*args:Any,**kwargs:Any)->"OpenIDConnectAuthProvider":"""Create a singleton instance of OpenIDConnectAuthProvider."""withcls._lock:ifclsnotincls._instances:cls._instances[cls]=super().__call__(*args,**kwargs)returncls._instances[cls]
[docs]classOpenIDConnectAuthProvider(AuthProvider,metaclass=SingletonMeta):"""Generic OpenID Connect authentication provider."""_oidc_client:OAuth2Client|None=None_introspect_url:str|None=Nonedef__init__(self):"""Initialize the OpenIDConnectAuthProvider singleton."""ifgetattr(self,"_initialized",False):returnlogger.debug("Initializing OpenIDConnectAuthProvider singleton")ifnotgetattr(constants.Auth.OIDC,"ISSUER",None)ornotgetattr(constants.Auth.OIDC,"CLIENT_ID",None):logger.error("Missing required OIDC configuration")raiseInternalServerException("OIDC_ISSUER and OIDC_CLIENT_ID must be configured.")ifnotOpenIDConnectAuthProvider._introspect_url:OpenIDConnectAuthProvider._introspect_url=get_oidc_introspect_url()logger.debug(f"Using introspection URL: {OpenIDConnectAuthProvider._introspect_url}")client_secret=os.getenv("OIDC_CLIENT_SECRET")ifnotclient_secret:logger.error("OIDC_CLIENT_SECRET is not configured")raiseInternalServerException("OIDC_CLIENT_SECRET is not configured")# Initialize the OAuth2Client singleton if not already doneifOpenIDConnectAuthProvider._oidc_clientisNone:OpenIDConnectAuthProvider._oidc_client=OAuth2Client(client_id=constants.Auth.OIDC.CLIENT_ID,client_secret=client_secret,server_metadata_url=constants.Auth.OIDC.DISCOVERY_URL,)logger.debug("OIDC client initialized with "f"client_id: {constants.Auth.OIDC.CLIENT_ID}")else:logger.debug("OIDC client already initialized with "f"client_id: {constants.Auth.OIDC.CLIENT_ID}")self.oidc_client=OpenIDConnectAuthProvider._oidc_clientself.introspect_url=OpenIDConnectAuthProvider._introspect_urlself._initialized=Truedef_validate_token_local(self,token:str)->dict[str,Any]:"""Locally validate a JWT using the provider's JWKS."""try:jwks_uri=get_oidc_jwks_uri()jwk_client=PyJWKClient(jwks_uri)signing_key=jwk_client.get_signing_key_from_jwt(token)data=jwt.decode(token,signing_key.key,algorithms=["RS256","RS384","RS512"],audience=constants.Auth.OIDC.CLIENT_ID,issuer=constants.Auth.OIDC.ISSUER,options={"verify_exp":True,"verify_aud":True,"verify_iss":True},)ifnotisinstance(data,dict):raiseInternalServerException("Decoded JWT payload is not a dictionary.")logger.debug(f"Local JWT validation successful: {data}")returndataexceptInvalidTokenErrorase:logger.warning(f"Local JWT validation failed: {e}")raiseUnauthorizedException(f"Invalid JWT: {e}")fromeexceptExceptionase:logger.error(f"Unexpected error during local JWT validation: {e}",exc_info=True)raiseInternalServerException("Local JWT validation failed")frome
[docs]defvalidate_token(self,token:str)->dict[str,Any]:"""Validates an OIDC ID token. This method checks the token's signature, issuer, audience, and expiration. Tries local validation first, falls back to introspection if needed. """try:returnself._validate_token_local(token)exceptUnauthorizedExceptionase:logger.warning(f"Falling back to introspection due to local validation failure: {e}")exceptExceptionase:logger.error(f"Local validation failed unexpectedly: {e}",exc_info=True)# Fallback to introspectiontry:data:dict[str,Any]=self.oidc_client.introspect_token(url=self.introspect_url,token=token).json()logger.debug(f"Token introspection response: {data}")ifnotdata.get("active",False):logger.warning("OIDC token is not active")raiseUnauthorizedException("OIDC token is not active")logger.debug("Token validation successful (via introspection)")returndataexceptExceptionase:logger.error(f"OIDC token validation failed: {e}",exc_info=True)raiseInternalServerException("OIDC token validation failed")frome