import feedparser
from flask_babel import lazy_gettext as _
from html_sanitizer import Sanitizer
import json
import logging
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm.session import Session
from core.model import (
create,
get_one,
get_one_or_create,
ConfigurationSetting,
ExternalIntegration,
)
from core.scripts import LibraryInputScript
from core.util.http import HTTP
from core.util.problem_detail import (
ProblemDetail,
JSON_MEDIA_TYPE as PROBLEM_DETAIL_JSON_MEDIA_TYPE,
)
import base64
from api.util.short_client_token import ShortClientTokenUtility
from api.config import Configuration
from api.controller import CirculationManager
from api.problem_details import *
[docs]class RemoteRegistry(object):
"""A circulation manager's view of a remote service that supports
the OPDS Directory Registration Protocol:
https://github.com/NYPL-Simplified/Simplified/wiki/OPDS-Directory-Registration-Protocol
In practical terms, this may be a library registry (which has
DISCOVERY_GOAL and wants to help patrons find their libraries) or
it may be a shared ODL collection (which has LICENSE_GOAL).
"""
DEFAULT_LIBRARY_REGISTRY_URL = "https://libraryregistry.librarysimplified.org/"
OPDS_1_PREFIX = "application/atom+xml;profile=opds-catalog"
OPDS_2_TYPE = "application/opds+json"
def __init__(self, integration):
"""Constructor."""
self.integration = integration
[docs] @classmethod
def for_integration_id(cls, _db, integration_id, goal):
"""Find a LibraryRegistry object configured
by the given ExternalIntegration ID.
:param goal: The ExternalIntegration's .goal must be this goal.
"""
integration = get_one(_db, ExternalIntegration,
goal=goal,
id=integration_id)
if not integration:
return None
return cls(integration)
[docs] @classmethod
def for_protocol_and_goal(cls, _db, protocol, goal):
"""Find all LibraryRegistry objects with the given protocol and goal."""
for i in _db.query(ExternalIntegration).filter(
ExternalIntegration.goal==goal,
ExternalIntegration.protocol==protocol,
):
yield cls(i)
[docs] @classmethod
def for_protocol_goal_and_url(cls, _db, protocol, goal, url):
"""Get a LibraryRegistry for the given protocol, goal, and
URL. Create the corresponding ExternalIntegration if necessary.
"""
try:
integration = ExternalIntegration.with_setting_value(
_db, protocol, goal, ExternalIntegration.URL, url
).one()
except NoResultFound:
integration = None
if not integration:
integration, is_new = create(
_db, ExternalIntegration, protocol=protocol, goal=goal
)
integration.setting(ExternalIntegration.URL).value = url
return cls(integration)
@property
def registrations(self):
"""Find all of this site's successful registrations with
this RemoteRegistry.
:yield: A sequence of Registration objects.
"""
for x in self.integration.libraries:
yield Registration(self, x)
[docs] def fetch_catalog(self, catalog_url=None, do_get=HTTP.debuggable_get):
"""Fetch the root catalog for this RemoteRegistry.
:return: A ProblemDetail if there's a problem communicating
with the service or parsing the catalog; otherwise a 2-tuple
(registration URL, Adobe vendor ID).
"""
catalog_url = catalog_url or self.integration.url
response = do_get(catalog_url)
if isinstance(response, ProblemDetail):
return response
return self._extract_catalog_information(response)
@classmethod
def _extract_catalog_information(cls, response):
"""From an OPDS catalog, extract information that's essential to
kickstarting the OPDS Directory Registration Protocol.
:param response: A requests-style Response object.
:return A ProblemDetail if there's a problem accessing the
catalog; otherwise a 2-tuple (registration URL, Adobe vendor
ID).
"""
result = cls._extract_links(response)
if isinstance(result, ProblemDetail):
return result
catalog, links = result
if catalog:
vendor_id = catalog.get("metadata", {}).get("adobe_vendor_id")
else:
vendor_id = None
register_url = None
for link in links:
if link.get("rel") == "register":
register_url = link.get("href")
break
if not register_url:
return REMOTE_INTEGRATION_FAILED.detailed(_("The service at %(url)s did not provide a register link.", url=response.url))
return register_url, vendor_id
[docs] def fetch_registration_document(self, do_get=HTTP.debuggable_get):
"""Fetch a discovery service's registration document and extract
useful information from it.
:return: A ProblemDetail if there's a problem accessing the
service; otherwise, a 2-tuple (terms_of_service_link,
terms_of_service_html), containing information about the
Terms of Service that govern a circulation manager's
registration with the discovery service.
"""
catalog = self.fetch_catalog(do_get=do_get)
if isinstance(catalog, ProblemDetail):
return catalog
registration_url, vendor_id = catalog
response = do_get(registration_url)
if isinstance(response, ProblemDetail):
return response
terms_of_service_link, terms_of_service_html = (
self._extract_registration_information(response)
)
return terms_of_service_link, terms_of_service_html
@classmethod
def _extract_registration_information(cls, response):
"""From an OPDS registration document, extract information that's
useful to kickstarting the OPDS Directory Registration Protocol.
The registration document is completely optional, so an
invalid or unintelligible document is treated the same as a
missing document.
:return: A 2-tuple (terms_of_service_link,
terms_of_service_html), containing information about the
Terms of Service that govern a circulation manager's
registration with the discovery service. If the
registration document is missing or malformed, both values
will be None.
"""
tos_link = None
tos_html = None
result = cls._extract_links(response)
if isinstance(result, ProblemDetail):
return None, None
catalog, links = result
for link in links:
if link.get("rel") != "terms-of-service":
continue
url = link.get('href')
is_http = any(
[url.startswith(protocol + "://")
for protocol in ("http", "https")]
)
if is_http and not tos_link:
tos_link = url
elif url.startswith("data:") and not tos_html:
try:
tos_html = cls._decode_data_url(url)
except Exception as e:
tos_html = None
return tos_link, tos_html
@classmethod
def _extract_links(cls, response):
"""Parse an OPDS 1 or OPDS feed out of a Requests response object.
:return: A 2-tuple (parsed_catalog, links),
with `links` being a list of dictionaries, each containing
one OPDS link.
"""
# The response must contain either an OPDS 2 catalog or an OPDS 1 feed.
type = response.headers.get("Content-Type")
if type and type.startswith(cls.OPDS_2_TYPE):
# This is an OPDS 2 catalog.
catalog = json.loads(response.content)
links = catalog.get("links", [])
elif type and type.startswith(cls.OPDS_1_PREFIX):
# This is an OPDS 1 feed.
feed = feedparser.parse(response.content)
links = feed.get("feed", {}).get("links", [])
catalog = None
else:
return REMOTE_INTEGRATION_FAILED.detailed(_("The service at %(url)s did not return OPDS.", url=response.url))
return catalog, links
@classmethod
def _decode_data_url(cls, url):
"""Convert a data: URL to a string of sanitized HTML.
:raise ValueError: If the data: URL is invalid, in an
unexpected format, or does not have a supported media type.
:return: A string.
"""
if not url.startswith("data:"):
raise ValueError("Not a data: URL: %s" % url)
parts = url.split(",")
if len(parts) != 2:
raise ValueError("Invalid data: URL: %s" % url)
header, encoded = parts
if not header.endswith(";base64"):
raise ValueError("data: URL not base64-encoded: %s" % url)
media_type = header[len("data:"):-len(";base64")]
if not any(
media_type.startswith(x) for x in ("text/html", "text/plain")
):
raise ValueError(
"Unsupported media type in data: URL: %s" % media_type
)
html = base64.b64decode(encoded.encode("utf-8")).decode("utf-8")
return Sanitizer().sanitize(html)
[docs]class Registration(object):
"""A library's registration for a particular registry.
The registration does not correspond to one specific data model
object -- it's a relationship between a Library and an
ExternalIntegration, and a set of ConfigurationSettings that
configure the relationship between the two.
"""
# A library may be succesfully registered with a registry, or the
# registration may have failed.
LIBRARY_REGISTRATION_STATUS = "library-registration-status"
SUCCESS_STATUS = "success"
FAILURE_STATUS = "failure"
# A library may be registered in a 'testing' stage or a
# 'production' stage. This represents the _library's_ opinion
# about whether the integration is ready for production. The
# library won't actually be in production (whatever that means for
# a given integration) until the _remote_ also thinks it should.
#
# TODO: Registration through the admin interface always happens in
# 'production' because there is no UI for specifying which stage
# to use. When registration happens through a script, the admin gets
# to specify 'testing' or 'production'.
LIBRARY_REGISTRATION_STAGE = "library-registration-stage"
TESTING_STAGE = "testing"
PRODUCTION_STAGE = "production"
VALID_REGISTRATION_STAGES = [TESTING_STAGE, PRODUCTION_STAGE]
# A registry may provide access to a web client. If so, we'll store
# the URL so we can enable CORS headers in requests from that client,
# and use it in MARC records so the library's main catalog can link
# to it.
LIBRARY_REGISTRATION_WEB_CLIENT = "library-registration-web-client"
def __init__(self, registry, library):
self.registry = registry
self.integration = self.registry.integration
self.library = library
self._db = Session.object_session(self.integration)
if not library in self.integration.libraries:
self.integration.libraries.append(library)
# Find or create all the ConfigurationSettings that configure
# this relationship between library and registry.
# Has the registration succeeded? (Initial value: no.)
self.status_field = self.setting(
self.LIBRARY_REGISTRATION_STATUS, self.FAILURE_STATUS
)
# Does the library want to be in the testing or production stage?
# (Initial value: testing.)
self.stage_field = self.setting(
self.LIBRARY_REGISTRATION_STAGE, self.TESTING_STAGE
)
# If the registry provides a web client for the library, it will
# be stored in this setting.
self.web_client_field = self.setting(self.LIBRARY_REGISTRATION_WEB_CLIENT)
[docs] def setting(self, key, default_value=None):
"""Find or create a ConfigurationSetting that configures this
relationship between library and registry.
:param key: Name of the ConfigurationSetting.
:return: A 2-tuple (ConfigurationSetting, is_new)
"""
setting = ConfigurationSetting.for_library_and_externalintegration(
self._db, key, self.library, self.integration
)
if setting.value is None and default_value is not None:
setting.value = default_value
return setting
[docs] def push(self, stage, url_for, catalog_url=None, do_get=HTTP.debuggable_get,
do_post=HTTP.debuggable_post):
"""Attempt to register a library with a RemoteRegistry.
NOTE: This method is designed to be used in a
controller. Other callers may use this method, but they must be
able to render a ProblemDetail when there's a failure.
NOTE: The application server must be running when this method
is called, because part of the OPDS Directory Registration
Protocol is the remote server retrieving the library's
Authentication For OPDS document.
:param stage: Either TESTING_STAGE or PRODUCTION_STAGE
:param url_for: Flask url_for() or equivalent, used to generate URLs
for the application server.
:param do_get: Mockable method to make a GET request.
:param do_post: Mockable method to make a POST request.
:return: A ProblemDetail if there was a problem; otherwise True.
"""
# Assume that the registration will fail.
#
# TODO: If a registration has previously succeeded, failure to
# re-register probably means a maintenance of the status quo,
# not a change of success to failure. But we don't have any way
# of being sure.
self.status_field.value = self.FAILURE_STATUS
if stage not in self.VALID_REGISTRATION_STAGES:
return INVALID_INPUT.detailed(
_("%r is not a valid registration stage") % stage
)
# Verify that a public/private key pair exists for this library.
# This key pair is created during initialization of the
# LibraryAuthenticator, so this should always be present.
#
# We can't just create the key pair here because the process
# of pushing a registration involves the other site making a
# request to the circulation manager. This means the key pair
# needs to be committed to the database _before_ the push
# attempt starts.
key_pair = ConfigurationSetting.for_library(
Configuration.KEY_PAIR, self.library).json_value
if not key_pair:
# TODO: We could create the key pair _here_. The database
# session will be committed at the end of this request,
# so the push attempt would succeed if repeated.
return SHARED_SECRET_DECRYPTION_ERROR.detailed(
_("Library %(library)s has no key pair set.",
library=self.library.short_name)
)
public_key, private_key = key_pair
cipher = Configuration.cipher(private_key)
# Before we can start the registration protocol, we must fetch
# the remote catalog's URL and extract the link to the
# registration resource that kicks off the protocol.
result = self.registry.fetch_catalog(catalog_url, do_get)
if isinstance(result, ProblemDetail):
return result
register_url, vendor_id = result
# Store the vendor id as a ConfigurationSetting on the integration
# -- it'll be the same value for all libraries.
if vendor_id:
ConfigurationSetting.for_externalintegration(
ShortClientTokenUtility.VENDOR_ID_KEY, self.integration
).value = vendor_id
# Build the document we'll be sending to the registration URL.
payload = self._create_registration_payload(url_for, stage)
if isinstance(payload, ProblemDetail):
return payload
headers = self._create_registration_headers()
if isinstance(headers, ProblemDetail):
return headers
# Send the document.
response = self._send_registration_request(
register_url, headers, payload, do_post
)
if isinstance(response, ProblemDetail):
return response
catalog = json.loads(response.content)
# Process the result.
return self._process_registration_result(catalog, cipher, stage)
def _create_registration_payload(self, url_for, stage):
"""Collect the key-value pairs to be sent when kicking off the
registration protocol.
:param url_for: An implementation of Flask url_for.
:param state: The registrant's opinion about what stage this
registration should be in.
:return: A dictionary suitable for passing into requests.post.
"""
auth_document_url = url_for(
"authentication_document",
library_short_name=self.library.short_name
)
payload = dict(url=auth_document_url, stage=stage)
# Find the email address the administrator should use if they notice
# a problem with the way the library is using an integration.
contact = Configuration.configuration_contact_uri(self.library)
if contact:
payload['contact'] = contact
return payload
def _create_registration_headers(self):
shared_secret = self.setting(ExternalIntegration.PASSWORD).value
headers = {}
if shared_secret:
headers['Authorization'] = "Bearer %s" % shared_secret
return headers
@classmethod
def _send_registration_request(cls, register_url, headers, payload, do_post):
"""Send the request that actually kicks off the OPDS Directory
Registration Protocol.
:return: Either a ProblemDetail or a requests-like Response object.
"""
# Allow 400 and 401 so we can provide a more useful error message.
response = do_post(
register_url, headers=headers, payload=payload, timeout=60,
allowed_response_codes=["2xx", "3xx", "400", "401"],
)
if response.status_code in [400, 401]:
if response.headers.get("Content-Type") == PROBLEM_DETAIL_JSON_MEDIA_TYPE:
problem = json.loads(response.content)
return INTEGRATION_ERROR.detailed(
_("Remote service returned: \"%(problem)s\"", problem=problem.get("detail")))
else:
return INTEGRATION_ERROR.detailed(
_("Remote service returned: \"%(problem)s\"", problem=response.content.decode("utf-8")))
return response
@classmethod
def _decrypt_shared_secret(cls, cipher, shared_secret):
"""Attempt to decrypt an encrypted shared secret.
:param cipher: A Cipher object.
:param shared_secret: A byte string.
:return: The decrypted shared secret, as a bytestring, or
a ProblemDetail if it could not be decrypted.
"""
try:
shared_secret = cipher.decrypt(base64.b64decode(shared_secret))
except ValueError as e:
return SHARED_SECRET_DECRYPTION_ERROR.detailed(
_("Could not decrypt shared secret %s") % shared_secret
)
return shared_secret
def _process_registration_result(self, catalog, cipher, desired_stage):
"""We just sent out a registration request and got an OPDS catalog
in return. Process that catalog.
:param catalog: A dictionary derived from an OPDS 2 catalog.
:param cipher: A Cipher object.
:param desired_stage: Our opinion, as communicated to the
server, about whether this library is ready to go into
production.
"""
# Since every library has a public key, the catalog should have provided
# credentials for future authenticated communication,
# e.g. through Short Client Tokens or authenticated API
# requests.
if not isinstance(catalog, dict):
return INTEGRATION_ERROR.detailed(
_("Remote service served %(representation)r, which I can't make sense of as an OPDS document.", representation=catalog)
)
metadata = catalog.get("metadata", {})
short_name = metadata.get("short_name")
shared_secret = metadata.get("shared_secret")
links = catalog.get("links", [])
web_client_url = None
for link in links:
if link.get("rel") == "self" and link.get("type") == "text/html":
web_client_url = link.get("href")
break
if short_name:
setting = self.setting(ExternalIntegration.USERNAME)
setting.value = short_name
if shared_secret:
shared_secret = self._decrypt_shared_secret(
cipher, shared_secret
)
if isinstance(shared_secret, ProblemDetail):
return shared_secret
setting = self.setting(ExternalIntegration.PASSWORD)
# NOTE: we can only store Unicode data in the
# ConfigurationSetting.value, so this requires that the
# shared secret encoded as UTF-8. This works for the
# library registry product, which uses a long string of
# hex digits as its shared secret.
setting.value = shared_secret.decode("utf8")
# We have successfully completed the registration.
self.status_field.value = self.SUCCESS_STATUS
# Our opinion about the proper stage of this library was succesfully
# communicated to the registry.
self.stage_field.value = desired_stage
# Store the web client URL as a ConfigurationSetting.
if web_client_url:
self.web_client_field.value = web_client_url
return True
[docs]class LibraryRegistrationScript(LibraryInputScript):
"""Register local libraries with a remote library registry."""
PROTOCOL = ExternalIntegration.OPDS_REGISTRATION
GOAL = ExternalIntegration.DISCOVERY_GOAL
[docs] @classmethod
def arg_parser(cls, _db):
parser = LibraryInputScript.arg_parser(_db)
parser.add_argument(
'--registry-url',
help="Register libraries with the given registry.",
default=RemoteRegistry.DEFAULT_LIBRARY_REGISTRY_URL
)
parser.add_argument(
'--stage',
help="Register these libraries in the 'testing' stage or the 'production' stage.",
choices=(Registration.TESTING_STAGE, Registration.PRODUCTION_STAGE)
)
return parser
[docs] def do_run(self, cmd_args=None, in_unit_test=False):
parser = self.arg_parser(self._db)
parsed = self.parse_command_line(self._db, cmd_args)
url = parsed.registry_url
registry = RemoteRegistry.for_protocol_goal_and_url(
self._db, self.PROTOCOL, self.GOAL, url
)
stage = parsed.stage
# Set up an application context so we have access to url_for.
from api.app import app
app.manager = CirculationManager(self._db, testing=in_unit_test)
base_url = ConfigurationSetting.sitewide(
self._db, Configuration.BASE_URL_KEY
).value
ctx = app.test_request_context(base_url=base_url)
ctx.push()
for library in parsed.libraries:
registration = Registration(registry, library)
library_stage = stage or registration.stage_field.value
self.process_library(
registration, library_stage, app.manager.url_for
)
ctx.pop()
# For testing purposes, return the application object that was
# created.
return app
[docs] def process_library(self, registration, stage, url_for):
"""Push one Library's registration to the given RemoteRegistry."""
logger = logging.getLogger(
"Registration of library %r" % registration.library.short_name
)
logger.info(
"Registering with %s as %s",
registration.registry.integration.url, stage
)
try:
result = registration.push(stage, url_for)
except Exception as e:
logger.error("Exception during registration", exc_info=e)
return False
if isinstance(result, ProblemDetail):
data, status_code, headers = result.response
logger.error(
"Could not complete registration. Problem detail document: %r" % data
)
return result
else:
logger.info("Success.")
return result