import cgi
import html
import json
from threading import Lock
from contextlib2 import contextmanager
from flask_babel import lazy_gettext as _
from onelogin.saml2.settings import OneLogin_Saml2_Settings
from api.saml.metadata.federations import incommon
from api.saml.metadata.federations.model import (
SAMLFederatedIdentityProvider,
SAMLFederation,
)
from api.saml.metadata.model import SAMLAttributeType, SAMLServiceProviderMetadata
from api.saml.metadata.parser import SAMLMetadataParser
from core.exceptions import BaseError
from core.model.configuration import (
ConfigurationAttributeType,
ConfigurationFactory,
ConfigurationGrouping,
ConfigurationMetadata,
ConfigurationOption,
)
cgi.escape = html.escape
[docs]class SAMLConfigurationError(BaseError):
"""Raised in the case of any configuration errors."""
[docs]class SAMLConfiguration(ConfigurationGrouping):
"""Contains SP and IdP settings."""
service_provider_xml_metadata = ConfigurationMetadata(
key="sp_xml_metadata",
label=_("Service Provider's XML Metadata"),
description=_(
"SAML metadata of the Circulation Manager's Service Provider in an XML format. "
"MUST contain exactly one SPSSODescriptor tag with at least one "
"AssertionConsumerService tag with Binding attribute set to "
"urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST."
),
type=ConfigurationAttributeType.TEXTAREA,
required=True,
)
service_provider_private_key = ConfigurationMetadata(
key="sp_private_key",
label=_("Service Provider's Private Key"),
description=_("Private key used for encrypting SAML requests."),
type=ConfigurationAttributeType.TEXTAREA,
required=False,
)
federated_identity_provider_entity_ids = ConfigurationMetadata(
key="saml_federated_idp_entity_ids",
label=_("List of Federated IdPs"),
description=_(
"List of federated (for example, from InCommon Federation) IdPs supported by this authentication provider. "
"Try to type the name of the IdP to find it in the list."
),
type=ConfigurationAttributeType.MENU,
required=False,
options=[],
default=[],
format="narrow",
)
patron_id_use_name_id = ConfigurationMetadata(
key="saml_patron_id_use_name_id",
label=_("Patron ID: SAML NameID"),
description=_(
"Configuration setting indicating whether SAML NameID should be searched for a unique patron ID. "
"If NameID found, it will supersede any SAML attributes selected in the next section."
),
type=ConfigurationAttributeType.SELECT,
required=False,
default="true",
options=[
ConfigurationOption("true", "Use SAML NameID"),
ConfigurationOption("false", "Do NOT use SAML NameID")
]
)
patron_id_attributes = ConfigurationMetadata(
key="saml_patron_id_attributes",
label=_("Patron ID: SAML Attributes"),
description=_(
"List of SAML attributes that MAY contain a unique patron ID. "
"The attributes will be scanned sequentially in the order you chose them, "
"and the first existing attribute will be used to extract a unique patron ID."
"<br>"
"NOTE: If a SAML attribute contains several values, only the first will be used."
),
type=ConfigurationAttributeType.MENU,
required=False,
options=[
ConfigurationOption(attribute.name, attribute.name)
for attribute in SAMLAttributeType
],
default=[
SAMLAttributeType.eduPersonUniqueId.name,
SAMLAttributeType.eduPersonTargetedID.name,
SAMLAttributeType.eduPersonPrincipalName.name,
SAMLAttributeType.uid.name,
],
format="narrow",
)
patron_id_regular_expression = ConfigurationMetadata(
key="saml_patron_id_regular_expression",
label=_("Patron ID: Regular expression"),
description=_(
"Regular expression used to extract a unique patron ID from the attributes "
"specified in <b>Patron ID: SAML Attributes</b> and/or NameID "
"(if it's enabled in <b>Patron ID: SAML NameID</b>). "
"<br>"
"The expression MUST contain a named group <b>patron_id</b> used to match the patron ID. "
"For example:"
"<br>"
"<pre>"
"{the_regex_pattern}"
"</pre>"
"The expression will extract the <b>patron_id</b> from the first SAML attribute that matches "
"or NameID if it matches the expression."
).format(the_regex_pattern=cgi.escape(r"(?P<patron_id>.+)@university\.org")),
type=ConfigurationAttributeType.TEXT,
required=False,
)
non_federated_identity_provider_xml_metadata = ConfigurationMetadata(
key="idp_xml_metadata",
label=_("Identity Provider's XML metadata"),
description=_(
"SAML metadata of Identity Providers in an XML format. "
"MAY contain multiple IDPSSODescriptor tags but each of them MUST contain "
"at least one SingleSignOnService tag with Binding attribute set to "
"urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect."
),
type=ConfigurationAttributeType.TEXTAREA,
required=False,
)
session_lifetime = ConfigurationMetadata(
key="saml_session_lifetime",
label=_("Session Lifetime"),
description=_(
"This configuration setting determines how long "
"a session created by the SAML authentication provider will live in days. "
"By default it's empty meaning that the lifetime of the Circulation Manager's session "
"is exactly the same as the lifetime of the IdP's session. "
"Setting this value to a specific number will override this behaviour."
"<br>"
"NOTE: This setting affects the session's lifetime only Circulation Manager's side. "
"Accessing content protected by SAML will still be governed by the IdP and patrons "
"will have to reauthenticate each time the IdP's session expires."
),
type=ConfigurationAttributeType.NUMBER,
required=False,
default=None,
)
filter_expression = ConfigurationMetadata(
key="saml_filter_expression",
label=_("Filter Expression"),
description=_(
"Python expression used for filtering out patrons by their SAML attributes."
"<br>"
"<br>"
'For example, if you want to authenticate using SAML only patrons having "eresources" '
'as the value of their "eduPersonEntitlement" then you need to use the following expression:'
"<br>"
"<pre>"
"""
"urn:mace:nyu.edu:entl:lib:eresources" == subject.attribute_statement.attributes["eduPersonEntitlement"].values[0]
"""
"</pre>"
"<br>"
'If "eduPersonEntitlement" can have multiple values, you can use the following expression:'
"<br>"
"<pre>"
"""
"urn:mace:nyu.edu:entl:lib:eresources" in subject.attribute_statement.attributes["eduPersonEntitlement"].values
"""
"</pre>"
),
type=ConfigurationAttributeType.TEXTAREA,
required=False,
)
service_provider_strict_mode = ConfigurationMetadata(
key="strict",
label=_("Service Provider's Strict Mode"),
description=_(
"If strict is 1, then the Python Toolkit will reject unsigned or unencrypted messages "
"if it expects them to be signed or encrypted. Also, it will reject the messages "
"if the SAML standard is not strictly followed."
),
type=ConfigurationAttributeType.NUMBER,
required=False,
default=0,
)
service_provider_debug_mode = ConfigurationMetadata(
key="debug",
label=_("Service Provider's Debug Mode"),
description=_("Enable debug mode (outputs errors)."),
type=ConfigurationAttributeType.NUMBER,
required=False,
default=0,
)
IDP_DISPLAY_NAME_DEFAULT_TEMPLATE = "Identity Provider #{0}"
def __init__(self, configuration_storage, db, metadata_parser):
"""Initializes a new instance of SAMLConfiguration class
:param configuration_storage: SAML configuration storage
:type configuration_storage: ConfigurationStorage
:param metadata_parser: SAML metadata parser
:type metadata_parser: SAMLMetadataParser
"""
super(SAMLConfiguration, self).__init__(configuration_storage, db)
self._metadata_parser = metadata_parser
self._identity_providers = None
self._service_provider = None
def _get_federated_identity_providers(self, db):
"""Return a list of federated IdPs corresponding to the entity IDs selected by the admin.
:param db: Database session
:type db: sqlalchemy.orm.session.Session
:return: List of SAMLFederatedIdP objects
:rtype: List[api.saml.metadata.federations.model.SAMLFederatedIdP]
"""
if not self.federated_identity_provider_entity_ids:
return []
federated_identity_provider_entity_ids = json.loads(
self.federated_identity_provider_entity_ids
)
return (
db.query(SAMLFederatedIdentityProvider)
.filter(
SAMLFederatedIdentityProvider.entity_id.in_(
federated_identity_provider_entity_ids
)
)
.all()
)
def _load_identity_providers(self, db):
"""Loads IdP settings from the library's configuration settings
:param db: Database session
:type db: sqlalchemy.orm.session.Session
:return: List of IdentityProviderMetadata objects
:rtype: List[IdentityProviderMetadata]
:raise: SAMLParsingError
"""
identity_providers = []
if self.non_federated_identity_provider_xml_metadata:
parsing_results = self._metadata_parser.parse(
self.non_federated_identity_provider_xml_metadata
)
identity_providers = [
parsing_result.provider for parsing_result in parsing_results
]
if self.federated_identity_provider_entity_ids:
for identity_provider_metadata in self._get_federated_identity_providers(
db
):
parsing_results = self._metadata_parser.parse(
identity_provider_metadata.xml_metadata
)
for parsing_result in parsing_results:
identity_providers.append(parsing_result.provider)
return identity_providers
def _load_service_provider(self, db):
"""Loads SP settings from the library's configuration settings
:param db: Database session
:type db: sqlalchemy.orm.session.Session
:return: SAMLServiceProviderMetadata object
:rtype: SAMLServiceProviderMetadata
:raise: SAMLParsingError
"""
parsing_results = self._metadata_parser.parse(
self.service_provider_xml_metadata
)
if not isinstance(parsing_results, list) or len(parsing_results) != 1:
raise SAMLConfigurationError(
_("SAML Service Provider's configuration is not correct")
)
parsing_result = parsing_results[0]
service_provider = parsing_result.provider
if not isinstance(service_provider, SAMLServiceProviderMetadata):
raise SAMLConfigurationError(
_("SAML Service Provider's configuration is not correct")
)
service_provider.private_key = (
self.service_provider_private_key
if self.service_provider_private_key
else ""
)
return service_provider
[docs] def get_identity_providers(self, db):
"""Returns identity providers
:param db: Database session
:type db: sqlalchemy.orm.session.Session
:return: List of IdentityProviderMetadata objects
:rtype: List[IdentityProviderMetadata]
:raise: ConfigurationError
"""
if self._identity_providers is None:
self._identity_providers = self._load_identity_providers(db)
return self._identity_providers
[docs] def get_service_provider(self, db):
"""Returns service provider
:param db: Database session
:type db: sqlalchemy.orm.session.Session
:return: ServiceProviderMetadata object
:rtype: ServiceProviderMetadata
:raise: ConfigurationError
"""
if self._service_provider is None:
self._service_provider = self._load_service_provider(db)
return self._service_provider
[docs]class SAMLSettings(dict):
"""Converts SAMLConfiguration to SETTINGS-compatible dictionary.
Once a database session becomes available,
this class updates SAMLConfiguration with a list of available federated IdPs.
"""
_mutex = Lock()
def __get__(self, instance, owner):
"""Return a SETTINGS-compatible dictionary.
:return: SETTINGS-compatible dictionary
:rtype: Dict
"""
with self._mutex:
if not SAMLConfiguration.federated_identity_provider_entity_ids.options:
try:
from api.app import app
# 1. Load all InCommon IdPs from the database
incommon_federated_identity_providers = (
app._db.query(
SAMLFederatedIdentityProvider.entity_id,
SAMLFederatedIdentityProvider.display_name,
)
.join(SAMLFederation)
.filter(SAMLFederation.type == incommon.FEDERATION_TYPE)
.order_by(SAMLFederatedIdentityProvider.display_name)
).all()
# 2. Convert SAMLFederatedIdentityProvider objects to ConfigurationOption objects
configuration_options = []
for (
incommon_federated_identity_provider
) in incommon_federated_identity_providers:
configuration_options.append(
ConfigurationOption(
key=incommon_federated_identity_provider[0],
label=incommon_federated_identity_provider[1],
)
)
# 3. Update SAMLConfiguration.federated_identity_provider_entity_ids.options
SAMLConfiguration.federated_identity_provider_entity_ids = ConfigurationMetadata(
SAMLConfiguration.federated_identity_provider_entity_ids.key,
SAMLConfiguration.federated_identity_provider_entity_ids.label,
SAMLConfiguration.federated_identity_provider_entity_ids.description,
SAMLConfiguration.federated_identity_provider_entity_ids.type,
SAMLConfiguration.federated_identity_provider_entity_ids.required,
SAMLConfiguration.federated_identity_provider_entity_ids.default,
configuration_options,
SAMLConfiguration.federated_identity_provider_entity_ids.category,
SAMLConfiguration.federated_identity_provider_entity_ids.format,
SAMLConfiguration.federated_identity_provider_entity_ids.index,
)
except:
pass
# 4. Return updated settings
return SAMLConfiguration.to_settings()
[docs]class SAMLConfigurationFactory(ConfigurationFactory):
"""Factory creating new instances of SAMLConfiguration class."""
def __init__(self, parser):
"""Initialize a new instance of SAMLConfigurationFactory class.
:param parser: SAMLMetadataParser object
:type parser: api.saml.metadata.parser.SAMLMetadataParser
"""
if not isinstance(parser, SAMLMetadataParser):
raise ValueError(
"Argument 'parser' must be an instance of {0} class".format(
SAMLMetadataParser
)
)
self._parser = parser
[docs] @contextmanager
def create(self, configuration_storage, db, configuration_grouping_class):
"""Create a new instance of SAMLConfiguration.
:param configuration_storage: ConfigurationStorage object
:type configuration_storage: ConfigurationStorage
:param db: Database session
:type db: sqlalchemy.orm.session.Session
:param configuration_grouping_class: Configuration bucket's class
:type configuration_grouping_class: Type[ConfigurationGrouping]
:return: SAMLConfiguration object
:rtype: SAMLConfiguration
"""
if not issubclass(configuration_grouping_class, SAMLConfiguration):
raise ValueError(
"Argument 'configuration_grouping_class' must be a subclass of {0} class".format(
SAMLConfiguration
)
)
with configuration_grouping_class(
configuration_storage, db, self._parser
) as configuration_bucket:
yield configuration_bucket
[docs]class SAMLOneLoginConfiguration(object):
"""Converts metadata objects to the OneLogin's SAML Toolkit format"""
DEBUG = "debug"
STRICT = "strict"
ENTITY_ID = "entityId"
URL = "url"
BINDING = "binding"
X509_CERT = "x509cert"
X509_CERT_MULTI = "x509certMulti"
SIGNING = "signing"
ENCRYPTION = "encryption"
IDP = "idp"
SINGLE_SIGN_ON_SERVICE = "singleSignOnService"
SP = "sp"
ASSERTION_CONSUMER_SERVICE = "assertionConsumerService"
NAME_ID_FORMAT = "NameIDFormat"
PRIVATE_KEY = "privateKey"
SECURITY = "security"
AUTHN_REQUESTS_SIGNED = "authnRequestsSigned"
def __init__(self, configuration):
"""Initializes a new instance of SAMLOneLoginConfiguration class
:param configuration: Configuration object containing SAML metadata
:type configuration: api.saml.configuration.model.SAMLConfiguration
"""
self._configuration = configuration
self._service_provider = None
self._identity_providers = {}
def _get_identity_provider_settings(self, identity_provider):
"""Converts ServiceProviderMetadata object to the OneLogin's SAML Toolkit format
:param identity_provider: IdentityProviderMetadata object
:type identity_provider: IdentityProviderMetadata
:return: Dictionary containing service provider's settings in the OneLogin's SAML Toolkit format
:rtype: Dict
"""
onelogin_identity_provider = {
self.IDP: {
self.ENTITY_ID: identity_provider.entity_id,
self.SINGLE_SIGN_ON_SERVICE: {
self.URL: identity_provider.sso_service.url,
self.BINDING: identity_provider.sso_service.binding.value,
},
},
self.SECURITY: {
self.AUTHN_REQUESTS_SIGNED: identity_provider.want_authn_requests_signed
},
}
if (
len(identity_provider.signing_certificates) == 1
and len(identity_provider.encryption_certificates) == 1
and identity_provider.signing_certificates[0]
== identity_provider.encryption_certificates[0]
):
onelogin_identity_provider[self.IDP][
self.X509_CERT
] = identity_provider.signing_certificates[0]
else:
if len(identity_provider.signing_certificates) > 0:
if self.X509_CERT_MULTI not in onelogin_identity_provider[self.IDP]:
onelogin_identity_provider[self.IDP][self.X509_CERT_MULTI] = {}
onelogin_identity_provider[self.IDP][self.X509_CERT_MULTI][
self.SIGNING
] = identity_provider.signing_certificates
if len(identity_provider.encryption_certificates) > 0:
if self.X509_CERT_MULTI not in onelogin_identity_provider[self.IDP]:
onelogin_identity_provider[self.IDP][self.X509_CERT_MULTI] = {}
onelogin_identity_provider[self.IDP][self.X509_CERT_MULTI][
self.ENCRYPTION
] = identity_provider.encryption_certificates
return onelogin_identity_provider
def _get_service_provider_settings(self, service_provider):
"""Converts ServiceProviderMetadata object to the OneLogin's SAML Toolkit format
:param service_provider: ServiceProviderMetadata object
:type service_provider: ServiceProviderMetadata
:return: Dictionary containing service provider's settings in the OneLogin's SAML Toolkit format
:rtype: Dict
"""
onelogin_service_provider = {
self.SP: {
self.ENTITY_ID: service_provider.entity_id,
self.ASSERTION_CONSUMER_SERVICE: {
self.URL: service_provider.acs_service.url,
self.BINDING: service_provider.acs_service.binding.value,
},
self.NAME_ID_FORMAT: service_provider.name_id_format,
self.X509_CERT: service_provider.certificate
if service_provider.certificate
else "",
self.PRIVATE_KEY: service_provider.private_key
if service_provider.private_key
else "",
},
self.SECURITY: {
self.AUTHN_REQUESTS_SIGNED: service_provider.authn_requests_signed
},
}
return onelogin_service_provider
@property
def configuration(self):
"""Returns original configuration
:return: Original configuration
:rtype: api.saml.configuration.model.SAMLConfiguration
"""
return self._configuration
[docs] def get_identity_provider_settings(self, db, idp_entity_id):
"""Returns a dictionary containing identity provider's settings in a OneLogin's SAML Toolkit format
:param db: Database session
:type db: sqlalchemy.orm.session.Session
:param idp_entity_id: IdP's entity ID
:type idp_entity_id: string
:return: Dictionary containing identity provider's settings in a OneLogin's SAML Toolkit format
:rtype: Dict
"""
if idp_entity_id in self._identity_providers:
return self._identity_providers[idp_entity_id]
identity_providers = [
idp
for idp in self._configuration.get_identity_providers(db)
if idp.entity_id == idp_entity_id
]
if not identity_providers:
raise SAMLConfigurationError(
_(
"There is no identity provider with entityID = {0}".format(
idp_entity_id
)
)
)
if len(identity_providers) > 1:
raise SAMLConfigurationError(
_(
"There are multiple identity providers with entityID = {0}".format(
idp_entity_id
)
)
)
identity_provider = identity_providers[0]
identity_provider = self._get_identity_provider_settings(identity_provider)
self._identity_providers[idp_entity_id] = identity_provider
return identity_provider
[docs] def get_service_provider_settings(self, db):
"""Returns a dictionary containing service provider's settings in the OneLogin's SAML Toolkit format
:param db: Database session
:type db: sqlalchemy.orm.session.Session
:return: Dictionary containing service provider's settings in the OneLogin's SAML Toolkit format
:rtype: Dict
"""
if self._service_provider is None:
self._service_provider = self._get_service_provider_settings(
self._configuration.get_service_provider(db)
)
return self._service_provider
[docs] def get_settings(self, db, idp_entity_id):
"""Returns a dictionary containing SP's and IdP's settings in the OneLogin's SAML Toolkit format
:param db: Database session
:type db: sqlalchemy.orm.session.Session
:param idp_entity_id: IdP's entity ID
:type idp_entity_id: string
:return: Dictionary containing SP's and IdP's settings in the OneLogin's SAML Toolkit format
:rtype: Dict
"""
onelogin_settings = {
self.DEBUG: self._configuration.service_provider_debug_mode,
self.STRICT: self._configuration.service_provider_strict_mode,
}
identity_provider_settings = self.get_identity_provider_settings(
db, idp_entity_id
)
service_provider_settings = self.get_service_provider_settings(db)
onelogin_settings.update(identity_provider_settings)
onelogin_settings.update(service_provider_settings)
# We need to use disjunction separately because dict.update just overwrites values
onelogin_settings[self.SECURITY][self.AUTHN_REQUESTS_SIGNED] = (
service_provider_settings[self.SECURITY][self.AUTHN_REQUESTS_SIGNED]
or service_provider_settings[self.SECURITY][self.AUTHN_REQUESTS_SIGNED]
)
settings = OneLogin_Saml2_Settings(onelogin_settings)
return {
self.DEBUG: self._configuration.service_provider_debug_mode,
self.STRICT: self._configuration.service_provider_strict_mode,
self.IDP: settings.get_idp_data(),
self.SP: settings.get_sp_data(),
self.SECURITY: settings.get_security_data(),
}