Source code for api.saml.metadata.parser

import logging

from defusedxml.lxml import fromstring
from flask_babel import lazy_gettext as _
from lxml.etree import XMLSyntaxError
from onelogin.saml2.constants import OneLogin_Saml2_Constants
from onelogin.saml2.utils import OneLogin_Saml2_XML

from api.saml.metadata.model import (
    SAMLAttribute,
    SAMLAttributeStatement,
    SAMLAttributeType,
    SAMLBinding,
    SAMLIdentityProviderMetadata,
    SAMLLocalizedMetadataItem,
    SAMLNameID,
    SAMLNameIDFormat,
    SAMLOrganization,
    SAMLService,
    SAMLServiceProviderMetadata,
    SAMLSubject,
    SAMLUIInfo,
)
from core.exceptions import BaseError


[docs]class SAMLMetadataParsingError(BaseError): """Raised in the case of any errors occurred during parsing of SAML metadata"""
[docs]class SAMLMetadataParsingResult(object): def __init__(self, provider, xml_node): """Initialize a new instance of SAMLMetadataParsingResult class. :param provider: Object containing either SP's or IdP's metadata :type provider: api.saml.metadata.model.SAMLProviderMetadata :param xml_node: XML node containing metadata :type xml_node: defusedxml.lxml.RestrictedElement """ self._provider = provider self._xml_node = xml_node @property def provider(self): """Return the object containing either SP's or IdP's metadata. :return: Object containing either SP's or IdP's metadata :rtype: api.saml.metadata.model.SAMLProviderMetadata """ return self._provider @property def xml_node(self): """Return the XML node containing metadata :return: XML node containing metadata :rtype: defusedxml.lxml.RestrictedElement """ return self._xml_node
[docs]class SAMLMetadataParser(object): """Parses SAML metadata""" def __init__(self, skip_incorrect_providers=False): """Initialize a new instance of MetadataParser class. :param skip_incorrect_providers: Boolean value indicating whether the parse should skip incorrect SAML provider declarations instead of raising an exception :type skip_incorrect_providers: bool """ self._skip_incorrect_providers = skip_incorrect_providers self._logger = logging.getLogger(__name__) # Add missing namespaces to be able to parse mdui:UIInfoType OneLogin_Saml2_Constants.NS_PREFIX_MDUI = "mdui" OneLogin_Saml2_Constants.NS_MDUI = "urn:oasis:names:tc:SAML:metadata:ui" OneLogin_Saml2_Constants.NSMAP[ OneLogin_Saml2_Constants.NS_PREFIX_MDUI ] = OneLogin_Saml2_Constants.NS_MDUI OneLogin_Saml2_Constants.NS_PREFIX_ALG = "alg" OneLogin_Saml2_Constants.NS_ALG = "urn:oasis:names:tc:SAML:metadata:algsupport" OneLogin_Saml2_Constants.NSMAP[ OneLogin_Saml2_Constants.NS_PREFIX_ALG ] = OneLogin_Saml2_Constants.NS_ALG def _convert_xml_string_to_dom(self, xml_metadata): """Converts an XML string containing SAML metadata into XML DOM :param xml_metadata: XML string containing SAML metadata :type xml_metadata: string :return: XML DOM tree containing SAML metadata :rtype: defusedxml.lxml.RestrictedElement :raise: MetadataParsingError """ self._logger.debug( "Started converting XML string containing SAML metadata into XML DOM" ) try: metadata_dom = fromstring(xml_metadata.encode("utf-8"), forbid_dtd=True) except ( ValueError, XMLSyntaxError, ) as exception: self._logger.exception( "An unhandled exception occurred during converting XML string containing SAML metadata into XML DOM" ) raise SAMLMetadataParsingError(inner_exception=exception) self._logger.debug( "Finished converting XML string containing SAML metadata into XML DOM" ) return metadata_dom def _parse_certificates(self, certificate_nodes): """Parses XML nodes containing X.509 certificates into a list of strings :param certificate_nodes: List of XML nodes containing X.509 certificates :type certificate_nodes: List[defusedxml.lxml.RestrictedElement] :return: List of string containing X.509 certificates :rtype: List[string] :raise: MetadataParsingError """ certificates = [] self._logger.debug( "Started parsing {0} certificates".format(len(certificate_nodes)) ) try: for certificate_node in certificate_nodes: certificate = "".join( OneLogin_Saml2_XML.element_text(certificate_node).split() ) self._logger.debug( "Found the following certificate: {0}".format(certificate) ) certificates.append(certificate) except XMLSyntaxError as exception: raise SAMLMetadataParsingError(inner_exception=exception) self._logger.debug( "Finished parsing {0} certificates: {1}".format( len(certificate_nodes), certificates ) ) return certificates def _parse_providers(self, entity_descriptor_node, provider_nodes, parse_function): """Parses a list of IDPSSODescriptor/SPSSODescriptor nodes and translates them into IdentityProviderMetadata/ServiceProviderMetadata object :param entity_descriptor_node: Parent EntityDescriptor node :type entity_descriptor_node: defusedxml.lxml.RestrictedElement :param provider_nodes: List of IDPSSODescriptor/SPSSODescriptor nodes :type provider_nodes: List[defusedxml.lxml.RestrictedElement] :param parse_function: Function used to parse body of IDPSSODescriptor/SPSSODescriptor nodes and return corresponding IdentityProviderMetadata/ServiceProviderMetadata objects :type parse_function: Callable[[defusedxml.lxml.RestrictedElement, string, UIInfo], ProviderMetadata] :return: List of IdentityProviderMetadata/ServiceProviderMetadata objects containing SAML metadata from the XML :rtype: List[ProviderMetadata] :raise: MetadataParsingError """ providers = [] for provider_node in provider_nodes: entity_id = entity_descriptor_node.get("entityID", None) ui_info = self._parse_ui_info(provider_node) organization = self._parse_organization_metadata(entity_descriptor_node) try: provider = parse_function( provider_node, entity_id, ui_info, organization ) providers.append(provider) except SAMLMetadataParsingError: if not self._skip_incorrect_providers: raise return providers def _parse_localizable_metadata_items( self, provider_descriptor_node, xpath, required=False ): """Parses IDPSSODescriptor/SPSSODescriptor's mdui:UIInfo child elements (for example, mdui:DisplayName) :param provider_descriptor_node: Parent IDPSSODescriptor/SPSSODescriptor XML node :type provider_descriptor_node: defusedxml.lxml.RestrictedElement :param xpath: XPath expression for a particular md:localizedNameType child element (for example, mdui:DisplayName) :type xpath: string :param required: Boolean value indicating whether particular md:localizedNameType child element is required or not :type required: bool :return: List of md:localizedNameType child elements :rtype: Optional[List[LocalizableMetadataItem]] :raise: MetadataParsingError """ localizable_metadata_nodes = OneLogin_Saml2_XML.query( provider_descriptor_node, xpath ) if not localizable_metadata_nodes and required: last_slash_index = xpath.rfind("/") localizable_metadata_tag_name = xpath[last_slash_index + 1 :] raise SAMLMetadataParsingError( _("{0} tag is missing".format(localizable_metadata_tag_name)) ) localizable_items = None if localizable_metadata_nodes: localizable_items = [] for localizable_metadata_node in localizable_metadata_nodes: localizable_item_text = localizable_metadata_node.text localizable_item_language = localizable_metadata_node.get( "{http://www.w3.org/XML/1998/namespace}lang", None ) localizable_item = SAMLLocalizedMetadataItem( localizable_item_text, localizable_item_language ) localizable_items.append(localizable_item) return localizable_items def _parse_ui_info(self, provider_node): """Parses IDPSSODescriptor/SPSSODescriptor's mdui:UIInfo and translates it into UIInfo object :param provider_node: Parent IDPSSODescriptor/SPSSODescriptor node :type provider_node: defusedxml.lxml.RestrictedElement :return: UIInfo object :rtype: UIInfo :raise: MetadataParsingError """ display_names = self._parse_localizable_metadata_items( provider_node, "./md:Extensions/mdui:UIInfo/mdui:DisplayName" ) descriptions = self._parse_localizable_metadata_items( provider_node, "./md:Extensions/mdui:UIInfo/mdui:Description" ) information_urls = self._parse_localizable_metadata_items( provider_node, "./md:Extensions/mdui:UIInfo/mdui:InformationURL" ) privacy_statement_urls = self._parse_localizable_metadata_items( provider_node, "./md:Extensions/mdui:UIInfo/mdui:PrivacyStatementURL" ) logos = self._parse_localizable_metadata_items( provider_node, "./md:Extensions/mdui:UIInfo/mdui:Logo" ) ui_info = SAMLUIInfo( display_names, descriptions, information_urls, privacy_statement_urls, logos ) return ui_info def _parse_organization_metadata(self, entity_descriptor_node): """Parses IDPSSODescriptor/SPSSODescriptor's mdui:Organization and translates it into Organization object :param entity_descriptor_node: Parent EntityDescriptor node :type entity_descriptor_node: defusedxml.lxml.RestrictedElement :return: Organization object :rtype: Organization :raise: MetadataParsingError """ organization_names = self._parse_localizable_metadata_items( entity_descriptor_node, "./md:Organization/md:OrganizationName" ) organization_display_names = self._parse_localizable_metadata_items( entity_descriptor_node, "./md:Organization/md:OrganizationDisplayName" ) organization_urls = self._parse_localizable_metadata_items( entity_descriptor_node, "./md:Organization/md:OrganizationURL" ) organization = SAMLOrganization( organization_names, organization_display_names, organization_urls ) return organization def _parse_name_id_format(self, provider_node): """Parses a name ID format NOTE: OneLogin's python-saml library used for implementing SAML authentication support only one name ID format. If there are multiple name ID formats specified in the XML metadata, we select the first one. :param provider_node: Parent IDPSSODescriptor/SPSSODescriptor node :type provider_node: defusedxml.lxml.RestrictedElement :return: Name ID format :rtype: string """ name_id_format = SAMLNameIDFormat.UNSPECIFIED.value name_id_format_nodes = OneLogin_Saml2_XML.query( provider_node, "./ md:NameIDFormat" ) if len(name_id_format_nodes) > 0: # OneLogin's python-saml supports only one name ID format so we select the first one name_id_format = OneLogin_Saml2_XML.element_text(name_id_format_nodes[0]) return name_id_format def _parse_idp_metadata( self, provider_node, entity_id, ui_info, organization, required_sso_binding=SAMLBinding.HTTP_REDIRECT, required_slo_binding=SAMLBinding.HTTP_REDIRECT, ): """Parses IDPSSODescriptor node and translates it into an IdentityProviderMetadata object :param provider_node: IDPSSODescriptor node containing IdP metadata :param provider_node: defusedxml.lxml.RestrictedElement :param entity_id: String containing IdP's entityID :type entity_id: string :param ui_info: UIInfo object containing IdP's description :type ui_info: UIInfo :param organization: Organization object containing basic information about an organization responsible for a SAML entity or role :type organization: Organization :param required_sso_binding: Required binding for Single Sign-On profile (HTTP-Redirect by default) :type required_sso_binding: Binding :param required_slo_binding: Required binding for Single Sing-Out profile (HTTP-Redirect by default) :type required_slo_binding: Binding :return: IdentityProviderMetadata containing IdP metadata :rtype: IdentityProviderMetadata :raise: MetadataParsingError """ want_authn_requests_signed = provider_node.get("WantAuthnRequestsSigned", False) name_id_format = self._parse_name_id_format(provider_node) sso_service = None sso_nodes = OneLogin_Saml2_XML.query( provider_node, "./md:SingleSignOnService[@Binding='%s']" % required_sso_binding.value, ) if len(sso_nodes) > 0: sso_node = self._select_default_or_first_indexed_element(sso_nodes) sso_url = sso_node.get("Location", None) sso_service = SAMLService(sso_url, required_sso_binding) else: raise SAMLMetadataParsingError( _( "Missing {0} SingleSignOnService service declaration".format( required_sso_binding.value ) ) ) slo_service = None slo_nodes = OneLogin_Saml2_XML.query( provider_node, "./md:SingleLogoutService[@Binding='%s']" % required_slo_binding.value, ) if len(slo_nodes) > 0: slo_node = self._select_default_or_first_indexed_element(slo_nodes) slo_url = slo_node.get("Location", None) slo_service = SAMLService(slo_url, required_slo_binding) signing_certificate_nodes = OneLogin_Saml2_XML.query( provider_node, './md:KeyDescriptor[not(contains(@use, "encryption"))]/ds:KeyInfo/ds:X509Data/ds:X509Certificate', ) signing_certificates = self._parse_certificates(signing_certificate_nodes) encryption_certificate_nodes = OneLogin_Saml2_XML.query( provider_node, './md:KeyDescriptor[not(contains(@use, "signing"))]/ds:KeyInfo/ds:X509Data/ds:X509Certificate', ) encryption_certificates = self._parse_certificates(encryption_certificate_nodes) idp = SAMLIdentityProviderMetadata( entity_id, ui_info, organization, name_id_format, sso_service, slo_service, want_authn_requests_signed, signing_certificates, encryption_certificates, ) return idp def _parse_sp_metadata( self, provider_node, entity_id, ui_info, organization, required_acs_binding=SAMLBinding.HTTP_POST, ): """Parses SPSSODescriptor node and translates it into a ServiceProvider object :param provider_node: SPSSODescriptor node containing SP metadata :param provider_node: defusedxml.lxml.RestrictedElement :param entity_id: String containing IdP's entityID :type entity_id: string :param ui_info: UIInfo object containing IdP's description :type ui_info: UIInfo :param organization: Organization object containing basic information about an organization responsible for a SAML entity or role :type organization: Organization :param required_acs_binding: Required binding for Assertion Consumer Service (HTTP-Redirect by default) :type required_acs_binding: Binding :return: ServiceProvider containing SP metadata :rtype: ServiceProvider :raise: MetadataParsingError """ authn_requests_signed = provider_node.get("AuthnRequestsSigned", False) want_assertions_signed = provider_node.get("WantAssertionsSigned", False) name_id_format = self._parse_name_id_format(provider_node) acs_service = None acs_service_nodes = OneLogin_Saml2_XML.query( provider_node, "./md:AssertionConsumerService[@Binding='%s']" % required_acs_binding.value, ) if len(acs_service_nodes) > 0: acs_service_node = self._select_default_or_first_indexed_element( acs_service_nodes ) acs_url = acs_service_node.get("Location", None) acs_service = SAMLService(acs_url, required_acs_binding) else: raise SAMLMetadataParsingError( _( "Missing {0} AssertionConsumerService".format( required_acs_binding.value ) ) ) certificate_nodes = OneLogin_Saml2_XML.query( provider_node, "./md:KeyDescriptor/ds:KeyInfo/ds:X509Data/ds:X509Certificate", ) certificates = self._parse_certificates(certificate_nodes) if len(certificates) > 1: raise SAMLMetadataParsingError( _( "There are more than 1 SP certificates".format( required_acs_binding.value ) ) ) certificate = next(iter(certificates)) if certificates else None sp = SAMLServiceProviderMetadata( entity_id, ui_info, organization, name_id_format, acs_service, authn_requests_signed, want_assertions_signed, certificate, ) return sp def _select_default_element(self, nodes): """Selects a node with attribute "isDefault=true" :param nodes: List of XML nodes :type nodes: List[defusedxml.lxml.RestrictedElement] :return: "Default" node or None if there is no one :rtype: Optional[defusedxml.lxml.RestrictedElement] """ default_nodes = [node for node in nodes if node.get("isDefault", False)] default_node = self._select_first_indexed_element(default_nodes) return default_node def _select_first_indexed_element(self, nodes): """Sorts a list of XML nodes by "index" attribute and selects the first node :param nodes: List of XML nodes :type nodes: List[defusedxml.lxml.RestrictedElement] :return: Node with the smallest index or None if there is no one :rtype: Optional[defusedxml.lxml.RestrictedElement] """ if not nodes: return None nodes = sorted(nodes, key=lambda node: node.get("index", 0)) return nodes[0] def _select_default_or_first_indexed_element(self, nodes): """Selects a node with attribute "isDefault=true" or a node with the smallest "index" attribute :param nodes: List of XML nodes :type nodes: List[defusedxml.lxml.RestrictedElement] :return: "Default" node or the node with the smallest "index" attribute or None if there is no one :rtype: Optional[defusedxml.lxml.RestrictedElement] """ default_node = self._select_default_element(nodes) if default_node: return default_node return self._select_first_indexed_element(nodes)
[docs] def parse(self, xml_metadata): """Parses an XML string containing SAML metadata and translates it into a list of IdentityProviderMetadata/ServiceProviderMetadata objects :param xml_metadata: XML string containing SAML metadata :type xml_metadata: string :return: List of SAMLMetadataParsingResult objects :rtype: List[SAMLMetadataParsingResult] :raise: MetadataParsingError """ self._logger.info("Started parsing an XML string containing SAML metadata") metadata_dom = self._convert_xml_string_to_dom(xml_metadata) parsing_results = [] try: entity_descriptor_nodes = OneLogin_Saml2_XML.query( metadata_dom, "//md:EntityDescriptor" ) for entity_descriptor_node in entity_descriptor_nodes: idp_descriptor_nodes = OneLogin_Saml2_XML.query( entity_descriptor_node, "./md:IDPSSODescriptor" ) idps = self._parse_providers( entity_descriptor_node, idp_descriptor_nodes, self._parse_idp_metadata, ) for idp in idps: parsing_result = SAMLMetadataParsingResult( idp, entity_descriptor_node ) parsing_results.append(parsing_result) sp_descriptor_nodes = OneLogin_Saml2_XML.query( entity_descriptor_node, "./md:SPSSODescriptor" ) sps = self._parse_providers( entity_descriptor_node, sp_descriptor_nodes, self._parse_sp_metadata ) for sp in sps: parsing_result = SAMLMetadataParsingResult( sp, entity_descriptor_node ) parsing_results.append(parsing_result) except XMLSyntaxError as exception: self._logger.exception( "An unexpected error occurred during parsing an XML string containing SAML metadata" ) raise SAMLMetadataParsingError(inner_exception=exception) self._logger.info("Finished parsing an XML string containing SAML metadata") return parsing_results
[docs]class SAMLSubjectParser(object): """Parses SAML response into Subject object""" def _parse_name_id(self, name_id_attributes): """Parses NameID attributes :param name_id_attributes: Dictionary containing NameID attributes :type name_id_attributes: Dict :return: NameID object :rtype: NameID """ name_id = SAMLNameID( name_id_attributes["Format"], name_id_attributes["NameQualifier"], None, name_id_attributes["value"], ) return name_id def _parse_attribute_values(self, attribute_values): """Parses SAML attribute values :param attribute_values: List containing SAML attribute values :type attribute_values: List[Union[str, Dict]] :return: 2-tuple containing an optional name ID from the attribute list and a list of parsed SAML attribute values :rtype: Tuple[Optional[NameID, List[str]] """ name_id = None parsed_attribute_values = [] for attribute_value in attribute_values: if isinstance(attribute_value, dict) and "NameID" in attribute_value: name_id = self._parse_name_id(attribute_value["NameID"]) parsed_attribute_values.append(name_id.name_id) else: parsed_attribute_values.append(attribute_value) return name_id, parsed_attribute_values def _parse_attributes(self, attributes): """Parses SAML attributes :param attributes: Dictionary containing SAML attributes :type attributes: Dict :return: 2-tuple containing an optional name ID from the attribute list and an attribute statement object :rtype: Tuple[Optional[NameID, AttributeStatement] """ name_id = None parsed_attributes = [] attribute_names = { attribute.value: attribute for attribute in SAMLAttributeType } for name, attribute_values in list(attributes.items()): if name in attribute_names: name = attribute_names[name].name current_name_id, parsed_attribute_values = self._parse_attribute_values( attribute_values ) if current_name_id: name_id = current_name_id attribute = SAMLAttribute(name=name, values=parsed_attribute_values) parsed_attributes.append(attribute) attribute_statement = SAMLAttributeStatement(parsed_attributes) return name_id, attribute_statement
[docs] def parse(self, auth): """Parses OneLogin_Saml2_Auth object containing SAML response data into Subject :param auth: OneLogin_Saml2_Auth object containing SAML response :type auth: OneLogin_Saml2_Auth :return: Subject object containing SAML attributes and NameID :rtype: api.saml.metadata.Subject """ name_id = SAMLNameID( auth.get_nameid_format(), auth.get_nameid_nq(), auth.get_nameid_spnq(), auth.get_nameid(), ) raw_attributes = auth.get_attributes() attribute_name_id, attribute_statement = self._parse_attributes(raw_attributes) valid_till = auth.get_session_expiration() if attribute_name_id: name_id = attribute_name_id if not valid_till: valid_till = auth.get_last_assertion_not_on_or_after() subject = SAMLSubject(name_id, attribute_statement, valid_till) return subject