import datetime
import json
import logging
from enum import Enum
from api.saml.metadata.model import SAMLAttributeType, SAMLSubjectJSONDecoder
from core.model import Credential, DataSource, DataSourceConstants, Patron
from core.util import first_or_default, is_session
class ProQuestCredentialType(Enum):
    """Contains an enumeration of different ProQuest credential types."""

    # The value of this member is the credential type string under which
    # ProQuest JWT bearer tokens are looked up and saved.
    PROQUEST_JWT_TOKEN = "ProQuest JWT Token"
class ProQuestCredentialManager(object):
    """Manages ProQuest credentials."""

    def __init__(self):
        """Initialize a new instance of ProQuestCredentialManager class."""
        self._logger = logging.getLogger(__name__)

    def _check_db_and_patron(self, db, patron):
        """Validate the ``db`` and ``patron`` arguments shared by the public methods.

        :param db: Database session
        :type db: sqlalchemy.orm.session.Session

        :param patron: Patron object
        :type patron: core.model.patron.Patron

        :raise ValueError: If ``db`` is not a session or ``patron`` is not a Patron
        """
        if not is_session(db):
            raise ValueError('"db" argument must be a valid SQLAlchemy session')
        if not isinstance(patron, Patron):
            raise ValueError('"patron" argument must be an instance of Patron class')

    def _extract_saml_subject(self, credential):
        """Extract a SAML subject from SAML token.

        The token stored in ``credential.credential`` is parsed as JSON using
        SAMLSubjectJSONDecoder.

        :param credential: Credential object containing a SAML token
        :type credential: core.model.credential.Credential

        :return: SAML subject
        :rtype: api.saml.metadata.Subject
        """
        self._logger.debug("Started deserializing SAML token %s", credential)

        subject = json.loads(credential.credential, cls=SAMLSubjectJSONDecoder)

        self._logger.debug(
            "Finished deserializing SAML token %s: %s", credential, subject
        )

        return subject

    def _lookup_saml_token(self, db, patron):
        """Look up for a SAML token.

        :param db: Database session
        :type db: sqlalchemy.orm.session.Session

        :param patron: Patron object
        :type patron: core.model.patron.Patron

        :return: Credential object containing the patron's SAML token (if any)
        :rtype: Optional[core.model.credential.Credential]
        """
        self._logger.debug("Started looking up for a SAML token")

        # NOTE(review): imported at call time rather than at module level,
        # presumably to avoid a circular import with api.authenticator -- confirm.
        from api.authenticator import BaseSAMLAuthenticationProvider

        credential = Credential.lookup_by_patron(
            db,
            BaseSAMLAuthenticationProvider.TOKEN_DATA_SOURCE_NAME,
            BaseSAMLAuthenticationProvider.TOKEN_TYPE,
            patron,
            allow_persistent_token=False,
            auto_create_datasource=True,
        )

        self._logger.debug("Finished looking up for a SAML token: %s", credential)

        return credential

    def lookup_proquest_token(self, db, patron):
        """Look up for a JWT bearer token required to use ProQuest API.

        :param db: Database session
        :type db: sqlalchemy.orm.session.Session

        :param patron: Patron object
        :type patron: core.model.patron.Patron

        :return: Credential object containing the existing ProQuest JWT bearer token (if any)
        :rtype: Optional[core.model.credential.Credential]

        :raise ValueError: If ``db`` is not a session or ``patron`` is not a Patron
        """
        self._check_db_and_patron(db, patron)

        self._logger.debug("Started looking up for a ProQuest JWT token")

        credential = Credential.lookup_by_patron(
            db,
            DataSourceConstants.PROQUEST,
            ProQuestCredentialType.PROQUEST_JWT_TOKEN.value,
            patron,
            allow_persistent_token=False,
            auto_create_datasource=True,
        )

        self._logger.debug(
            "Finished looking up for a ProQuest JWT token: %s", credential
        )

        # Any falsy lookup result is reported to the caller as None.
        return credential or None

    def save_proquest_token(self, db, patron, duration, token):
        """Save a ProQuest JWT bearer token for later use.

        :param db: Database session
        :type db: sqlalchemy.orm.session.Session

        :param patron: Patron object
        :type patron: core.model.patron.Patron

        :param duration: How long this token can be valid
        :type duration: datetime.timedelta

        :param token: ProQuest JWT bearer token
        :type token: str

        :return: Credential object containing a new ProQuest JWT bearer token
        :rtype: Optional[core.model.credential.Credential]

        :raise ValueError: If any argument has an unexpected type or ``token`` is empty
        """
        self._check_db_and_patron(db, patron)
        if not isinstance(duration, datetime.timedelta):
            raise ValueError(
                '"duration" argument must be an instance of datetime.timedelta class'
            )
        if not isinstance(token, str) or not token:
            raise ValueError('"token" argument must be a non-empty string')

        # The bearer token is a secret, so its value is deliberately kept out
        # of the log messages below.
        self._logger.debug("Started saving a ProQuest JWT bearer token")

        data_source = DataSource.lookup(
            db, DataSourceConstants.PROQUEST, autocreate=True
        )
        credential, is_new = Credential.temporary_token_create(
            db,
            data_source,
            ProQuestCredentialType.PROQUEST_JWT_TOKEN.value,
            patron,
            duration,
            token,
        )

        self._logger.debug(
            "Finished saving a ProQuest JWT bearer token: %s (new = %s)",
            credential,
            is_new,
        )

        return credential

    def lookup_patron_affiliation_id(
        self,
        db,
        patron,
        affiliation_attributes=(
            SAMLAttributeType.eduPersonPrincipalName.name,
            SAMLAttributeType.eduPersonScopedAffiliation.name,
        ),
    ):
        """Look up for patron's SAML affiliation ID.

        The attributes are checked in the given order. The search stops at the
        first attribute present in the patron's SAML subject, and the first of
        its values (if any) is returned.

        :param db: Database session
        :type db: sqlalchemy.orm.session.Session

        :param patron: Patron object
        :type patron: core.model.patron.Patron

        :param affiliation_attributes: SAML attributes containing an affiliation ID
        :type affiliation_attributes: Tuple

        :return: Patron's SAML affiliation ID (if any)
        :rtype: Optional[str]

        :raise ValueError: If ``db`` is not a session, ``patron`` is not a Patron,
            or ``affiliation_attributes`` is a non-empty value that is not a tuple
        """
        self._check_db_and_patron(db, patron)
        if affiliation_attributes and not isinstance(affiliation_attributes, tuple):
            raise ValueError('"affiliation_attributes" argument must be a tuple')

        self._logger.debug(
            "Started looking for SAML affiliation ID in for patron %s in %s",
            patron,
            affiliation_attributes,
        )

        saml_credential = self._lookup_saml_token(db, patron)

        if not saml_credential:
            self._logger.debug("Patron %s does not have a SAML token", patron)
            return None

        saml_subject = self._extract_saml_subject(saml_credential)

        self._logger.debug(
            "Patron %s has the following SAML subject: %s", patron, saml_subject
        )

        affiliation_id = None

        # Falsy values such as None pass the validation above, so fall back to
        # an empty tuple instead of failing with a TypeError on iteration.
        for attribute_name in affiliation_attributes or ():
            self._logger.debug("Trying to find attribute %s", attribute_name)

            if attribute_name in saml_subject.attribute_statement.attributes:
                attribute = saml_subject.attribute_statement.attributes[attribute_name]

                self._logger.debug(
                    "Found %s with the following values: %s",
                    attribute,
                    attribute.values,
                )

                affiliation_id = first_or_default(attribute.values)
                break

        self._logger.debug(
            "Finished looking for SAML affiliation ID in for patron %s in %s: %s",
            patron,
            affiliation_attributes,
            affiliation_id,
        )

        return affiliation_id