Source code for api.saml.metadata.monitor
import datetime
import logging
from api.saml.metadata.federations.model import SAMLFederation
from core.monitor import Monitor
from core.util.datetime_helpers import utc_now
class SAMLMetadataMonitor(Monitor):
    """Monitor that reloads the IdP metadata of every stored SAML federation.

    On each run, all ``SAMLFederation`` rows are fetched and, for each one,
    the identity providers currently attached to it are deleted and replaced
    with whatever the configured loader returns for that federation.
    """

    SERVICE_NAME = "SAML Metadata Monitor"

    # NOTE(review): MAX_AGE is not referenced anywhere inside this class;
    # every federation is refreshed on every run regardless of its age.
    MAX_AGE = datetime.timedelta(days=1)

    def __init__(self, db, loader):
        """Create a new SAMLMetadataMonitor.

        :param db: Object forwarded to the ``Monitor`` initializer
            (presumably a database session that ends up as ``self._db`` --
            confirm against ``core.monitor.Monitor``)

        :param loader: IdP loader
        :type loader: api.saml.loader.SAMLFederatedIdPLoader
        """
        super(SAMLMetadataMonitor, self).__init__(db)

        self._logger = logging.getLogger(__name__)
        self._loader = loader

    def _update_saml_federation_idps_metadata(self, saml_federation):
        """Replace the IdPs of the given SAML federation with freshly loaded ones.

        :param saml_federation: SAML federation
        :type saml_federation: api.saml.metadata.federations.model.SAMLFederation
        """
        log = self._logger
        log.info("Started processing {0}".format(saml_federation))

        # Mark every IdP currently attached to the federation for deletion...
        for stale_idp in saml_federation.identity_providers:
            self._db.delete(stale_idp)

        # ...then register whatever the loader produces for this federation.
        for fresh_idp in self._loader.load(saml_federation):
            self._db.add(fresh_idp)

        # Record when the federation was refreshed.
        saml_federation.last_updated_at = utc_now()

        log.info("Finished processing {0}".format(saml_federation))

    def run_once(self, progress):
        """Refresh the IdP metadata of all SAML federations found in the database.

        :param progress: Not used by this implementation
        """
        self._logger.info("Started running the SAML metadata monitor")

        # All federations are processed inside a single transaction block.
        with self._db.begin(subtransactions=True):
            federations = self._db.query(SAMLFederation).all()

            found_message = "Found {0} SAML federations".format(len(federations))
            self._logger.info(found_message)

            for federation in federations:
                self._update_saml_federation_idps_metadata(federation)

        self._logger.info("Finished running the SAML metadata monitor")