import datetime
import email
import json
import logging
import os
import pytz
import sys
import urllib.parse
from collections import defaultdict
from time import mktime
from wsgiref.handlers import format_date_time
import flask
from expiringdict import ExpiringDict
from flask import (
make_response,
Response,
redirect,
)
from flask_babel import lazy_gettext as _
from lxml import etree
from sqlalchemy.orm import eagerload
from .util.short_client_token import ShortClientTokenUtility
from .annotations import (
AnnotationWriter,
AnnotationParser,
)
from api.rbdigital import (
RBDigitalFulfillmentProxy,
RBDProxyException,
)
from api.saml.controller import SAMLController
from .authenticator import (
Authenticator,
BasicAuthTempTokenController,
CirculationPatronProfileStorage,
OAuthController,
)
from .base_controller import BaseCirculationManagerController
from .circulation import CirculationAPI, FulfillmentInfo
from .circulation_exceptions import *
from .config import (
Configuration,
CannotLoadConfiguration,
)
from core.analytics import Analytics
from core.app_server import (
cdn_url_for,
url_for,
load_facets_from_request,
load_pagination_from_request,
ComplaintController,
HeartbeatController,
URNLookupController as CoreURNLookupController,
)
from core.entrypoint import EverythingEntryPoint
from core.external_search import (
ExternalSearchIndex,
MockExternalSearchIndex,
SortKeyPagination,
)
from core.lane import (
BaseFacets,
FeaturedFacets,
Pagination,
Lane,
SearchFacets,
WorkList,
)
from core.log import LogConfiguration
from core.marc import MARCExporter
from core.metadata_layer import ContributorData
from core.model import (
get_one,
Admin,
Annotation,
CachedFeed,
CirculationEvent,
Collection,
Complaint,
ConfigurationSetting,
CustomList,
DataSource,
DeliveryMechanism,
ExternalIntegration,
Hold,
Identifier,
IntegrationClient,
Library,
LicensePool,
Loan,
LicensePoolDeliveryMechanism,
Patron,
Representation,
Session,
)
from core.opds import (
AcquisitionFeed,
NavigationFacets,
NavigationFeed,
)
from core.opensearch import OpenSearchDocument
from core.user_profile import ProfileController as CoreProfileController
from core.util.authentication_for_opds import AuthenticationForOPDSDocument
from core.util.datetime_helpers import (
from_timestamp,
utc_now,
)
from core.util.http import (
HTTP,
RemoteIntegrationException,
)
from core.util.opds_writer import (
OPDSFeed,
)
from core.util.problem_detail import ProblemDetail
from core.util.string_helpers import base64
from .custom_index import CustomIndexView
from .lanes import (
load_lanes,
ContributorFacets,
ContributorLane,
HasSeriesFacets,
JackpotFacets,
JackpotWorkList,
RecommendationLane,
RelatedBooksLane,
SeriesFacets,
SeriesLane,
CrawlableCollectionBasedLane,
CrawlableCustomListBasedLane,
CrawlableFacets,
)
from .odl import ODLAPI
from .opds import (
CirculationManagerAnnotator,
LibraryAnnotator,
SharedCollectionAnnotator,
LibraryLoanAndHoldAnnotator,
SharedCollectionLoanAndHoldAnnotator,
)
from .problem_details import *
from .shared_collection import SharedCollectionAPI
from .testing import MockCirculationAPI, MockSharedCollectionAPI
[docs]class CirculationManager(object):
def __init__(self, _db, testing=False):
self.log = logging.getLogger("Circulation manager web app")
self._db = _db
if not testing:
try:
self.config = Configuration.load(_db)
except CannotLoadConfiguration as exception:
self.log.exception(
"Could not load configuration file: {0}".format(exception))
sys.exit()
self.testing = testing
self.site_configuration_last_update = (
Configuration.site_configuration_last_update(self._db, timeout=0)
)
self.setup_one_time_controllers()
self.load_settings()
[docs] def load_facets_from_request(self, *args, **kwargs):
"""Load a faceting object from the incoming request, but also apply some
application-specific access restrictions:
* You can't use nonstandard caching rules unless you're an authenticated administrator.
* You can't access a WorkList that's not accessible to you.
"""
facets = load_facets_from_request(*args, **kwargs)
worklist = kwargs.get('worklist')
if worklist is not None:
# Try to get the index controller. If it's not initialized
# for any reason, don't run this check -- we have bigger
# problems.
index_controller = getattr(self, 'index_controller', None)
if (index_controller and not
worklist.accessible_to(index_controller.request_patron)):
return NO_SUCH_LANE.detailed(_("Lane does not exist"))
if isinstance(facets, BaseFacets) and getattr(facets, 'max_cache_age', None) is not None:
# A faceting object was loaded, and it tried to do something nonstandard
# with caching.
# Try to get the AdminSignInController, which is
# associated with the CirculationManager object by the
# admin interface in admin/controller.
#
# If the admin interface wasn't initialized for whatever
# reason, we'll default to assuming the user is not an
# authenticated admin.
authenticated = False
controller = getattr(self, 'admin_sign_in_controller', None)
if controller:
admin = controller.authenticated_admin_from_request()
# If authenticated_admin_from_request returns anything other than an admin (probably
# a ProblemDetail), the user is not an authenticated admin.
if isinstance(admin, Admin):
authenticated = True
if not authenticated:
facets.max_cache_age = None
return facets
[docs] def reload_settings_if_changed(self):
"""If the site configuration has been updated, reload the
CirculationManager's configuration from the database.
"""
last_update = Configuration.site_configuration_last_update(self._db)
if last_update > self.site_configuration_last_update:
self.load_settings()
self.site_configuration_last_update = last_update
[docs] def load_settings(self):
"""Load all necessary configuration settings and external
integrations from the database.
This is called once when the CirculationManager is
initialized. It may also be called later to reload the site
configuration after changes are made in the administrative
interface.
"""
LogConfiguration.initialize(self._db)
self.analytics = Analytics(self._db)
self.auth = Authenticator(self._db, self.analytics)
self.setup_external_search()
# Track the Lane configuration for each library by mapping its
# short name to the top-level lane.
new_top_level_lanes = {}
# Create a CirculationAPI for each library.
new_circulation_apis = {}
# Potentially load a CustomIndexView for each library
new_custom_index_views = {}
# Make sure there's a site-wide public/private key pair.
self.sitewide_key_pair
for library in self._db.query(Library):
lanes = load_lanes(self._db, library)
new_top_level_lanes[library.id] = lanes
new_custom_index_views[library.id] = CustomIndexView.for_library(
library
)
new_circulation_apis[library.id] = self.setup_circulation(
library, self.analytics
)
self.top_level_lanes = new_top_level_lanes
self.circulation_apis = new_circulation_apis
self.custom_index_views = new_custom_index_views
self.shared_collection_api = self.setup_shared_collection()
# Assemble the list of patron web client domains from individual
# library registration settings as well as a sitewide setting.
patron_web_domains = set()
admin_web_domains = set()
def get_domain(url):
url = url.strip()
if url == "*":
return url
scheme, netloc, path, parameters, query, fragment = urllib.parse.urlparse(
url)
if scheme and netloc:
return scheme + "://" + netloc
else:
return None
sitewide_patron_web_client_urls = ConfigurationSetting.sitewide(
self._db, Configuration.PATRON_WEB_HOSTNAMES).value
if sitewide_patron_web_client_urls:
for url in sitewide_patron_web_client_urls.split('|'):
domain = get_domain(url)
if domain:
patron_web_domains.add(domain)
sitewide_admin_web_client_urls = ConfigurationSetting.sitewide(
self._db, Configuration.ADMIN_WEB_HOSTNAMES).value
if sitewide_admin_web_client_urls:
for url in sitewide_admin_web_client_urls.split('|'):
domain = get_domain(url)
if domain:
admin_web_domains.add(domain)
from .registry import Registration
for setting in self._db.query(
ConfigurationSetting).filter(
ConfigurationSetting.key == Registration.LIBRARY_REGISTRATION_WEB_CLIENT):
if setting.value:
patron_web_domains.add(get_domain(setting.value))
self.patron_web_domains = patron_web_domains
self.admin_web_domains = admin_web_domains
self.setup_configuration_dependent_controllers()
authentication_document_cache_time = int(
ConfigurationSetting.sitewide(
self._db, Configuration.AUTHENTICATION_DOCUMENT_CACHE_TIME
).value_or_default(0)
)
self.authentication_for_opds_documents = ExpiringDict(
max_len=1000, max_age_seconds=authentication_document_cache_time
)
self.wsgi_debug = ConfigurationSetting.sitewide(
self._db, Configuration.WSGI_DEBUG_KEY
).bool_value or False
@property
def external_search(self):
"""Retrieve or create a connection to the search interface.
This is created lazily so that a failure to connect only
affects feeds that depend on the search engine, not the whole
circulation manager.
"""
if not self._external_search:
self.setup_external_search()
return self._external_search
[docs] def setup_external_search(self):
try:
self._external_search = self.setup_search()
self.external_search_initialization_exception = None
except Exception as e:
self.log.error(
"Exception initializing search engine: %s", e
)
self._external_search = None
self.external_search_initialization_exception = e
return self._external_search
[docs] def cdn_url_for(self, view, *args, **kwargs):
"""Generate a URL for a view that (probably) passes through a CDN.
:param view: Name of the view.
:param _facets: The faceting object used to generate the document that's calling
this method. This may change which function is actually used to generate the
URL; in particular, it may disable a CDN that would otherwise be used. This is
called _facets just in case there's ever a view that takes 'facets' as a real
keyword argument.
:param args: Positional arguments to the view function.
:param kwargs: Keyword arguments to the view function.
"""
url_for = self._cdn_url_for
facets = kwargs.pop('_facets', None)
if facets and facets.max_cache_age is CachedFeed.IGNORE_CACHE:
# The faceting object in play has disabled cache
# checking. A CDN is also a cache, so we should disable
# CDN URLs in the feed to make it more likely that the
# client continues to see up-to-the-minute feeds as they
# click around.
url_for = self.url_for
return url_for(view, *args, **kwargs)
def _cdn_url_for(self, *args, **kwargs):
"""Call the cdn_url_for function.
Defined solely to be overridden in tests.
"""
return cdn_url_for(*args, **kwargs)
[docs] def url_for(self, view, *args, **kwargs):
"""Call the url_for function, ensuring that Flask generates an absolute URL.
"""
kwargs['_external'] = True
return url_for(view, *args, **kwargs)
[docs] def log_lanes(self, lanelist=None, level=0):
"""Output information about the lane layout."""
lanelist = lanelist or self.top_level_lane.sublanes
for lane in lanelist:
self.log.debug("%s%r", "-" * level, lane)
if lane.sublanes:
self.log_lanes(lane.sublanes, level+1)
[docs] def setup_search(self):
"""Set up a search client."""
if self.testing:
return MockExternalSearchIndex()
else:
search = ExternalSearchIndex(self._db)
if not search:
self.log.warning("No external search server configured.")
return None
return search
[docs] def setup_circulation(self, library, analytics):
"""Set up the Circulation object."""
if self.testing:
cls = MockCirculationAPI
else:
cls = CirculationAPI
return cls(self._db, library, analytics)
[docs] def setup_shared_collection(self):
if self.testing:
cls = MockSharedCollectionAPI
else:
cls = SharedCollectionAPI
return cls(self._db)
[docs] def setup_one_time_controllers(self):
"""Set up all the controllers that will be used by the web app.
This method will be called only once, no matter how many times the
site configuration changes.
"""
self.index_controller = IndexController(self)
self.opds_feeds = OPDSFeedController(self)
self.marc_records = MARCRecordController(self)
self.loans = LoanController(self)
self.annotations = AnnotationController(self)
self.urn_lookup = URNLookupController(self)
self.work_controller = WorkController(self)
self.analytics_controller = AnalyticsController(self)
self.profiles = ProfileController(self)
self.heartbeat = HeartbeatController()
self.odl_notification_controller = ODLNotificationController(self)
self.shared_collection_controller = SharedCollectionController(self)
self.static_files = StaticFileController(self)
self.rbdproxy = RBDFulfillmentProxyController(self)
from api.lcp.controller import LCPController
self.lcp_controller = LCPController(self)
[docs] def setup_configuration_dependent_controllers(self):
"""Set up all the controllers that depend on the
current site configuration.
This method will be called fresh every time the site
configuration changes.
"""
self.basic_auth_token_controller = BasicAuthTempTokenController(
self.auth)
self.oauth_controller = OAuthController(self.auth)
self.saml_controller = SAMLController(self, self.auth)
[docs] def setup_adobe_vendor_id(self, _db, library):
"""If this Library has an Adobe Vendor ID integration,
configure the controller for it.
:return: An Authdata object for `library`, if one could be created.
"""
short_client_token_initialization_exceptions = dict()
adobe = ExternalIntegration.lookup(
_db, ExternalIntegration.ADOBE_VENDOR_ID,
ExternalIntegration.DRM_GOAL, library=library
)
if adobe:
# Relatively few libraries will have this setup.
vendor_id = adobe.username
node_value = adobe.password
if not (vendor_id and node_value):
self.log.warn(
"Adobe Vendor ID is disabled due to missing or incomplete configuration. This is probably nothing to worry about.")
# But almost all libraries will have a Short Client Token
# setup. We're not setting anything up here, but this is useful
# information for the calling code to have so it knows
# whether or not we should support the Device Management Protocol.
registry = ExternalIntegration.lookup(
_db, ExternalIntegration.OPDS_REGISTRATION,
ExternalIntegration.DISCOVERY_GOAL, library=library
)
authdata = None
if registry:
try:
authdata = ShortClientTokenUtility.from_config(library, _db)
except CannotLoadConfiguration as e:
short_client_token_initialization_exceptions[library.id] = e
self.log.error(
"Short Client Token configuration for %s is present but not working. This may be cause for concern. Original error: %s",
library.name, str(e)
)
self.short_client_token_initialization_exceptions = short_client_token_initialization_exceptions
return authdata
[docs] def annotator(self, lane, facets=None, *args, **kwargs):
"""Create an appropriate OPDS annotator for the given lane.
:param lane: A Lane or WorkList.
:param facets: A faceting object.
:param annotator_class: Instantiate this annotator class if possible.
Intended for use in unit tests.
"""
library = None
if lane and isinstance(lane, Lane):
library = lane.library
elif lane and isinstance(lane, WorkList):
library = lane.get_library(self._db)
if not library and hasattr(flask.request, 'library'):
library = flask.request.library
# If no library is provided, the best we can do is a generic
# annotator for this application.
if not library:
return CirculationManagerAnnotator(lane)
# At this point we know the request is in a library context, so we
# can create a LibraryAnnotator customized for that library.
# Some features are only available if a patron authentication
# mechanism is set up for this library.
authenticator = self.auth.library_authenticators.get(
library.short_name)
library_identifies_patrons = (
authenticator is not None and authenticator.identifies_individuals
)
annotator_class = kwargs.pop('annotator_class', LibraryAnnotator)
return annotator_class(
self.circulation_apis[library.id], lane,
library, top_level_title='All Books',
library_identifies_patrons=library_identifies_patrons,
facets=facets, *args, **kwargs
)
@property
def authentication_for_opds_document(self):
"""Make sure the current request's library has an Authentication For
OPDS document in the cache, then return the cached version.
If the cache is disabled, a fresh document is created every time.
If the query argument `debug` is provided and the
WSGI_DEBUG_KEY site-wide setting is set to True, the
authentication document is annotated with a '_debug' section
describing the current WSGI environment. Since this can reveal
internal details of deployment, it should only be enabled when
diagnosing deployment problems.
"""
name = flask.request.library.short_name
value = self.authentication_for_opds_documents.get(name, None)
if value is None:
# The document was not in the cache, either because it's
# expired or because the cache itself has been disabled.
# Create a new one and stick it in the cache for next
# time.
value = self.auth.create_authentication_document()
self.authentication_for_opds_documents[name] = value
if self.wsgi_debug and 'debug' in flask.request.args:
# Annotate with debugging information about the WSGI
# environment and the authentication document cache
# itself.
value = json.loads(value)
value['_debug'] = dict(
url=self.url_for(
'authentication_document', library_short_name=name
),
environ=str(dict(flask.request.environ)),
cache=str(self.authentication_for_opds_documents),
)
value = json.dumps(value)
return value
@property
def sitewide_key_pair(self):
"""Look up or create the sitewide public/private key pair."""
setting = ConfigurationSetting.sitewide(
self._db, Configuration.KEY_PAIR
)
return Configuration.key_pair(setting)
@property
def public_key_integration_document(self):
"""Serve a document with the sitewide public key."""
site_id = ConfigurationSetting.sitewide(
self._db, Configuration.BASE_URL_KEY).value
document = dict(id=site_id)
public, private = self.sitewide_key_pair
document['public_key'] = dict(type='RSA', value=public)
return json.dumps(document)
[docs]class CirculationManagerController(BaseCirculationManagerController):
[docs] def get_patron_circ_objects(self, object_class, patron, license_pools):
if not patron:
return []
pool_ids = [pool.id for pool in license_pools]
return self._db.query(object_class).filter(
object_class.patron_id == patron.id,
object_class.license_pool_id.in_(pool_ids)
).options(eagerload(object_class.license_pool)).all()
[docs] def get_patron_loan(self, patron, license_pools):
loans = self.get_patron_circ_objects(Loan, patron, license_pools)
if loans:
loan = loans[0]
return loan, loan.license_pool
return None, None
[docs] def get_patron_hold(self, patron, license_pools):
holds = self.get_patron_circ_objects(Hold, patron, license_pools)
if holds:
hold = holds[0]
return hold, hold.license_pool
return None, None
@property
def circulation(self):
"""Return the appropriate CirculationAPI for the request Library."""
library_id = flask.request.library.id
return self.manager.circulation_apis[library_id]
@property
def shared_collection(self):
"""Return the appropriate SharedCollectionAPI for the request library."""
return self.manager.shared_collection_api
@property
def search_engine(self):
"""Return the configured external search engine, or a
ProblemDetail if none is configured.
"""
search_engine = self.manager.external_search
if not search_engine:
return REMOTE_INTEGRATION_FAILED.detailed(
_("The search index for this site is not properly configured.")
)
return search_engine
[docs] def handle_conditional_request(self, last_modified=None):
"""Handle a conditional HTTP request.
:param last_modified: A datetime representing the time this
resource was last modified.
:return: a Response, if the incoming request can be handled
conditionally. Otherwise, None.
"""
if not last_modified:
return None
# If-Modified-Since values have resolution of one second. If
# last_modified has millisecond resolution, change its
# resolution to one second.
if last_modified.microsecond:
last_modified = last_modified.replace(microsecond=0)
if_modified_since = flask.request.headers.get('If-Modified-Since')
if not if_modified_since:
return None
try:
parsed_if_modified_since = email.utils.parsedate_to_datetime(
if_modified_since
)
except (TypeError, ValueError):
# Parse error.
return None
if not parsed_if_modified_since:
return None
# "[I]f the date is conforming to the RFCs it will represent a
# time in UTC but with no indication of the actual source
# timezone of the message the date comes from."
if parsed_if_modified_since.tzinfo is None:
parsed_if_modified_since = parsed_if_modified_since.replace(
tzinfo=pytz.UTC)
if parsed_if_modified_since >= last_modified:
return Response(status=304)
return None
[docs] def load_lane(self, lane_identifier):
"""Turn user input into a Lane object."""
library_id = flask.request.library.id
lane = None
if lane_identifier is None:
# Return the top-level lane.
lane = self.manager.top_level_lanes[library_id]
if isinstance(lane, Lane):
lane = self._db.merge(lane)
elif isinstance(lane, WorkList):
lane.children = [self._db.merge(child)
for child in lane.children]
else:
try:
lane_identifier = int(lane_identifier)
except ValueError as e:
pass
if isinstance(lane_identifier, int):
lane = get_one(
self._db, Lane, id=lane_identifier, library_id=library_id
)
if lane and not lane.accessible_to(self.request_patron):
# The authenticated patron cannot access the lane they
# requested. Act like the lane does not exist.
lane = None
if not lane:
return NO_SUCH_LANE.detailed(
_("Lane %(lane_identifier)s does not exist or is not associated with library %(library_id)s",
lane_identifier=lane_identifier, library_id=library_id
)
)
return lane
[docs] def load_work(self, library, identifier_type, identifier):
pools = self.load_licensepools(library, identifier_type, identifier)
if isinstance(pools, ProblemDetail):
return pools
# We know there is at least one LicensePool, and all LicensePools
# for an Identifier have the same Work.
work = pools[0].work
if work and not work.age_appropriate_for_patron(self.request_patron):
# This work is not age-appropriate for the authenticated
# patron. Don't show it.
work = NOT_AGE_APPROPRIATE
return work
[docs] def load_licensepools(self, library, identifier_type, identifier):
"""Turn user input into one or more LicensePool objects.
:param library: The LicensePools must be associated with one of this
Library's Collections.
:param identifier_type: A type of identifier, e.g. "ISBN"
:param identifier: An identifier string, used with `identifier_type`
to look up an Identifier.
"""
_db = Session.object_session(library)
pools = _db.query(LicensePool).join(LicensePool.collection).join(
LicensePool.identifier).join(Collection.libraries).filter(
Identifier.type == identifier_type
).filter(
Identifier.identifier == identifier
).filter(
Library.id == library.id
).all()
if not pools:
return NO_LICENSES.detailed(
_("The item you're asking about (%s/%s) isn't in this collection.") % (
identifier_type, identifier
)
)
return pools
[docs] def load_licensepool(self, license_pool_id):
"""Turns user input into a LicensePool"""
license_pool = get_one(self._db, LicensePool, id=license_pool_id)
if not license_pool:
return INVALID_INPUT.detailed(
_("License Pool #%s does not exist.") % license_pool_id
)
return license_pool
[docs] def load_licensepooldelivery(self, pool, mechanism_id):
"""Turn user input into a LicensePoolDeliveryMechanism object."""
mechanism = get_one(
self._db, LicensePoolDeliveryMechanism,
data_source=pool.data_source, identifier=pool.identifier,
delivery_mechanism_id=mechanism_id, on_multiple='interchangeable'
)
return mechanism or BAD_DELIVERY_MECHANISM
[docs] def apply_borrowing_policy(self, patron, license_pool):
"""Apply the borrowing policy of the patron's library to the
book they're trying to check out.
This prevents a patron from borrowing an age-inappropriate book
or from placing a hold in a library that prohibits holds.
Generally speaking, both of these operations should be
prevented before they get to this point; this is an extra
layer of protection.
:param patron: A `Patron`. It's okay if this turns out to be a
`ProblemDetail` or `None` due to a problem earlier in the
process.
:param license_pool`: The `LicensePool` the patron is trying to act on.
"""
if patron is None or isinstance(patron, ProblemDetail):
# An earlier stage in the process failed to authenticate
# the patron.
return patron
work = license_pool.work
if work is not None and not work.age_appropriate_for_patron(patron):
return NOT_AGE_APPROPRIATE
if (not patron.library.allow_holds and
license_pool.licenses_available == 0 and
not license_pool.open_access and
not license_pool.unlimited_access and
not license_pool.self_hosted
):
return FORBIDDEN_BY_POLICY.detailed(
_("Library policy prohibits the placement of holds."),
status_code=403
)
return None
[docs]class IndexController(CirculationManagerController):
"""Redirect the patron to the appropriate feed."""
def __call__(self):
# If this library provides a custom index view, use that.
library = flask.request.library
custom = self.manager.custom_index_views.get(library.id)
if custom is not None:
annotator = self.manager.annotator(None)
return custom(library, annotator)
# The simple case: the app is equally open to all clients.
library_short_name = flask.request.library.short_name
if not self.has_root_lanes():
return redirect(self.cdn_url_for('acquisition_groups', library_short_name=library_short_name))
# The more complex case. We must authorize the patron, check
# their type, and redirect them to an appropriate feed.
return self.appropriate_index_for_patron_type()
[docs] def authentication_document(self):
"""Serve this library's Authentication For OPDS document."""
return Response(
self.manager.authentication_for_opds_document,
200,
{
"Content-Type": AuthenticationForOPDSDocument.MEDIA_TYPE
}
)
[docs] def has_root_lanes(self):
"""Does the active library feature root lanes for patrons of
certain types?
:return: A boolean
"""
return flask.request.library.has_root_lanes
[docs] def authenticated_patron_root_lane(self):
patron = self.authenticated_patron_from_request()
if isinstance(patron, ProblemDetail):
return patron
if isinstance(patron, Response):
return patron
return patron.root_lane
[docs] def appropriate_index_for_patron_type(self):
library_short_name = flask.request.library.short_name
root_lane = self.authenticated_patron_root_lane()
if isinstance(root_lane, ProblemDetail):
return root_lane
if isinstance(root_lane, Response):
return root_lane
if root_lane is None:
return redirect(
self.cdn_url_for(
'acquisition_groups',
library_short_name=library_short_name,
)
)
return redirect(
self.cdn_url_for(
'acquisition_groups',
library_short_name=library_short_name,
lane_identifier=root_lane.id,
)
)
[docs] def public_key_document(self):
"""Serves a sitewide public key document"""
return Response(
self.manager.public_key_integration_document,
200, {'Content-Type': 'application/opds+json'}
)
[docs]class OPDSFeedController(CirculationManagerController):
[docs] def groups(self, lane_identifier, feed_class=AcquisitionFeed):
"""Build or retrieve a grouped acquisition feed.
:param lane_identifier: An identifier that uniquely identifiers
the WorkList whose feed we want.
:param feed_class: A replacement for AcquisitionFeed, for use in
tests.
"""
library = flask.request.library
# Special case: a patron with a root lane who attempts to access
# the library's top-level WorkList is redirected to their root
# lane (as though they had accessed the index controller)
# rather than being denied access.
if lane_identifier is None:
patron = self.request_patron
if patron is not None and patron.root_lane:
return redirect(
self.cdn_url_for(
'acquisition_groups',
library_short_name=library.short_name,
lane_identifier=patron.root_lane.id,
_external=True
)
)
lane = self.load_lane(lane_identifier)
if isinstance(lane, ProblemDetail):
return lane
if not lane.children:
# This lane has no children. Although we can technically
# create a grouped feed, it would be an unsatisfying
# gateway to a paginated feed. We should just serve the
# paginated feed.
return self.feed(lane_identifier, feed_class)
facet_class_kwargs = dict(
minimum_featured_quality=library.minimum_featured_quality,
)
facets = self.manager.load_facets_from_request(
worklist=lane, base_class=FeaturedFacets,
base_class_constructor_kwargs=facet_class_kwargs
)
if isinstance(facets, ProblemDetail):
return facets
search_engine = self.search_engine
if isinstance(search_engine, ProblemDetail):
return search_engine
url = self.cdn_url_for(
"acquisition_groups", lane_identifier=lane_identifier,
library_short_name=library.short_name, _facets=facets
)
annotator = self.manager.annotator(lane, facets)
return feed_class.groups(
_db=self._db, title=lane.display_name, url=url, worklist=lane,
annotator=annotator, facets=facets, search_engine=search_engine
)
[docs] def feed(self, lane_identifier, feed_class=AcquisitionFeed):
"""Build or retrieve a paginated acquisition feed.
:param lane_identifier: An identifier that uniquely identifiers
the WorkList whose feed we want.
:param feed_class: A replacement for AcquisitionFeed, for use in
tests.
"""
lane = self.load_lane(lane_identifier)
if isinstance(lane, ProblemDetail):
return lane
facets = self.manager.load_facets_from_request(worklist=lane)
if isinstance(facets, ProblemDetail):
return facets
pagination = load_pagination_from_request(SortKeyPagination)
if isinstance(pagination, ProblemDetail):
return pagination
search_engine = self.search_engine
if isinstance(search_engine, ProblemDetail):
return search_engine
library_short_name = flask.request.library.short_name
url = self.cdn_url_for(
"feed", lane_identifier=lane_identifier,
library_short_name=library_short_name, _facets=facets
)
annotator = self.manager.annotator(lane, facets=facets)
return feed_class.page(
_db=self._db, title=lane.display_name,
url=url, worklist=lane, annotator=annotator,
facets=facets, pagination=pagination,
search_engine=search_engine
)
[docs] def navigation(self, lane_identifier):
"""Build or retrieve a navigation feed, for clients that do not support groups."""
lane = self.load_lane(lane_identifier)
if isinstance(lane, ProblemDetail):
return lane
library = flask.request.library
library_short_name = library.short_name
url = self.cdn_url_for(
"navigation_feed", lane_identifier=lane_identifier, library_short_name=library_short_name,
)
title = lane.display_name
facet_class_kwargs = dict(
minimum_featured_quality=library.minimum_featured_quality,
)
facets = self.manager.load_facets_from_request(
worklist=lane, base_class=NavigationFacets,
base_class_constructor_kwargs=facet_class_kwargs
)
annotator = self.manager.annotator(lane, facets)
return NavigationFeed.navigation(
self._db, title, url, lane, annotator, facets=facets
)
[docs] def crawlable_library_feed(self):
"""Build or retrieve a crawlable acquisition feed for the
request library.
"""
library = flask.request.library
url = self.cdn_url_for(
"crawlable_library_feed",
library_short_name=library.short_name,
)
title = library.name
lane = CrawlableCollectionBasedLane()
lane.initialize(library)
return self._crawlable_feed(title=title, url=url, worklist=lane)
[docs] def crawlable_collection_feed(self, collection_name):
"""Build or retrieve a crawlable acquisition feed for the
requested collection.
"""
collection = get_one(self._db, Collection, name=collection_name)
if not collection:
return NO_SUCH_COLLECTION
title = collection.name
url = self.cdn_url_for(
"crawlable_collection_feed",
collection_name=collection.name
)
lane = CrawlableCollectionBasedLane()
lane.initialize([collection])
if collection.protocol in [ODLAPI.NAME]:
annotator = SharedCollectionAnnotator(collection, lane)
else:
# We'll get a generic CirculationManagerAnnotator.
annotator = None
return self._crawlable_feed(
title=title, url=url, worklist=lane, annotator=annotator
)
[docs] def crawlable_list_feed(self, list_name):
"""Build or retrieve a crawlable, paginated acquisition feed for the
named CustomList, sorted by update date.
"""
# TODO: A library is not strictly required here, since some
# CustomLists aren't associated with a library, but this isn't
# a use case we need to support now.
library = flask.request.library
list = CustomList.find(self._db, list_name, library=library)
if not list:
return NO_SUCH_LIST
library_short_name = library.short_name
title = list.name
url = self.cdn_url_for(
"crawlable_list_feed", list_name=list.name,
library_short_name=library_short_name,
)
lane = CrawlableCustomListBasedLane()
lane.initialize(library, list)
return self._crawlable_feed(title=title, url=url, worklist=lane)
def _crawlable_feed(self, title, url, worklist, annotator=None,
feed_class=AcquisitionFeed):
"""Helper method to create a crawlable feed.
:param title: The title to use for the feed.
:param url: The URL from which the feed will be served.
:param worklist: A crawlable Lane which controls which works show up
in the feed.
:param annotator: A custom Annotator to use when generating the feed.
:param feed_class: A drop-in replacement for AcquisitionFeed
for use in tests.
"""
pagination = load_pagination_from_request(
SortKeyPagination, default_size=Pagination.DEFAULT_CRAWLABLE_SIZE
)
if isinstance(pagination, ProblemDetail):
return pagination
search_engine = self.search_engine
if isinstance(search_engine, ProblemDetail):
return search_engine
annotator = annotator or self.manager.annotator(worklist)
# A crawlable feed has only one possible set of Facets,
# so library settings are irrelevant.
facets = CrawlableFacets.default(None)
return feed_class.page(
_db=self._db, title=title, url=url, worklist=worklist,
annotator=annotator,
facets=facets, pagination=pagination,
search_engine=search_engine
)
def _load_search_facets(self, lane):
entrypoints = list(flask.request.library.entrypoints)
if len(entrypoints) > 1:
# There is more than one enabled EntryPoint.
# By default, search them all.
default_entrypoint = EverythingEntryPoint
else:
# There is only one enabled EntryPoint,
# and no need for a special default.
default_entrypoint = None
return self.manager.load_facets_from_request(
worklist=lane, base_class=SearchFacets,
default_entrypoint=default_entrypoint,
)
[docs] def search(self, lane_identifier, feed_class=AcquisitionFeed):
"""Search for books."""
lane = self.load_lane(lane_identifier)
if isinstance(lane, ProblemDetail):
return lane
# Althoug the search query goes against Elasticsearch, we must
# use normal pagination because the results are sorted by
# match quality, not bibliographic information.
pagination = load_pagination_from_request(
Pagination, default_size=Pagination.DEFAULT_SEARCH_SIZE
)
if isinstance(pagination, ProblemDetail):
return pagination
facets = self._load_search_facets(lane)
if isinstance(facets, ProblemDetail):
return facets
search_engine = self.search_engine
if isinstance(search_engine, ProblemDetail):
return search_engine
# Check whether there is a query string -- if not, we want to
# send an OpenSearch document explaining how to search.
query = flask.request.args.get('q')
library_short_name = flask.request.library.short_name
# Create a function that, when called, generates a URL to the
# search controller.
#
# We'll call this one way if there is no query string in the
# request arguments, and another way if there is a query
# string.
make_url_kwargs = dict(list(facets.items()))
def make_url(): return self.url_for(
'lane_search', lane_identifier=lane_identifier,
library_short_name=library_short_name,
**make_url_kwargs
)
if not query:
# Send the search form
open_search_doc = OpenSearchDocument.for_lane(lane, make_url())
headers = {"Content-Type": "application/opensearchdescription+xml"}
return Response(open_search_doc, 200, headers)
# We have a query -- add it to the keyword arguments used when
# generating a URL.
make_url_kwargs['q'] = query.encode("utf8")
# Run a search.
annotator = self.manager.annotator(lane, facets)
info = OpenSearchDocument.search_info(lane)
return feed_class.search(
_db=self._db, title=info['name'],
url=make_url(), lane=lane, search_engine=search_engine,
query=query, annotator=annotator, pagination=pagination,
facets=facets
)
def _qa_feed(self, feed_factory, feed_title, controller_name, facet_class,
worklist_factory):
"""Create some kind of OPDS feed designed for consumption by an
automated QA process.
:param feed_factory: This function will be called to create the feed.
It must either be AcquisitionFeed.groups or Acquisition.page,
or it must take the same arguments as those methods.
:param feed_title: String title of the feed.
:param controller_name: Controller name to use when generating
the URL to the feed.
:param facet_class: Faceting class to load (through
load_facets_from_request).
:param worklist_factory: Function that takes (Library, Facets)
and returns a Worklist configured to generate the feed.
:return: A ProblemDetail if there's a problem loading the faceting
object; otherwise the return value of `feed_factory`.
"""
library = flask.request.library
search_engine = self.search_engine
if isinstance(search_engine, ProblemDetail):
return search_engine
url = self.url_for(
controller_name,
library_short_name=library.short_name,
)
facets = load_facets_from_request(
base_class=facet_class, default_entrypoint=EverythingEntryPoint
)
if isinstance(facets, ProblemDetail):
return facets
worklist = worklist_factory(library, facets)
annotator = self.manager.annotator(worklist)
# Since this feed will be consumed by an automated client, and
# we're choosing titles for specific purposes, there's no
# reason to put more than a single item in each group.
pagination = Pagination(size=1)
return feed_factory(
_db=self._db, title=feed_title, url=url, pagination=pagination,
worklist=worklist, annotator=annotator, search_engine=search_engine,
facets=facets, max_age=CachedFeed.IGNORE_CACHE
)
[docs] def qa_feed(self, feed_class=AcquisitionFeed):
"""Create an OPDS feed containing the information necessary to
run a full set of integration tests against this server and
the vendors it relies on.
:param feed_class: Class to substitute for AcquisitionFeed during
tests.
"""
def factory(library, facets):
return JackpotWorkList(library, facets)
return self._qa_feed(
feed_factory=feed_class.groups,
feed_title="QA test feed",
controller_name="qa_feed",
facet_class=JackpotFacets,
worklist_factory=factory
)
[docs] def qa_series_feed(self, feed_class=AcquisitionFeed):
"""Create an OPDS feed containing books that belong to _some_
series, without regard to _which_ series.
:param feed_class: Class to substitute for AcquisitionFeed during
tests.
"""
def factory(library, facets):
wl = WorkList()
wl.initialize(library)
return wl
return self._qa_feed(
feed_factory=feed_class.page,
feed_title="QA series test feed",
controller_name="qa_series_feed",
facet_class=HasSeriesFacets,
worklist_factory=factory
)
[docs]class MARCRecordController(CirculationManagerController):
DOWNLOAD_TEMPLATE = """
<html lang="en">
<head><meta charset="utf8"></head>
<body>
%(body)s
</body>
</html>"""
[docs] def download_page(self):
library = flask.request.library
body = "<h2>Download MARC files for %s</h2>" % library.name
time_format = "%B %-d, %Y"
# Check if a MARC exporter is configured so we can show a
# message if it's not.
exporter = None
try:
exporter = MARCExporter.from_config(library)
except CannotLoadConfiguration as e:
body += "<p>" + \
_("No MARC exporter is currently configured for this library.") + "</p>"
if len(library.cachedmarcfiles) < 1 and exporter:
body += "<p>" + \
_("MARC files aren't ready to download yet.") + "</p>"
files_by_lane = defaultdict(dict)
for file in library.cachedmarcfiles:
if file.start_time == None:
files_by_lane[file.lane]["full"] = file
else:
if not files_by_lane[file.lane].get("updates"):
files_by_lane[file.lane]["updates"] = []
files_by_lane[file.lane]["updates"].append(file)
# TODO: By default the MARC script only caches one level of lanes,
# so sorting by priority is good enough.
lanes = sorted(list(files_by_lane.keys()),
key=lambda x: x.priority if x else -1)
for lane in lanes:
files = files_by_lane[lane]
body += "<section>"
body += "<h3>%s</h3>" % (lane.display_name if lane else _("All Books"))
if files.get("full"):
file = files.get("full")
full_url = file.representation.mirror_url
full_label = _("Full file - last updated %(update_time)s",
update_time=file.end_time.strftime(time_format))
body += '<a href="%s">%s</a>' % (
files.get("full").representation.mirror_url, full_label)
if files.get("updates"):
body += "<h4>%s</h4>" % _("Update-only files")
body += "<ul>"
files.get("updates").sort(key=lambda x: x.end_time)
for update in files.get("updates"):
update_url = update.representation.mirror_url
update_label = _("Updates from %(start_time)s to %(end_time)s",
start_time=update.start_time.strftime(
time_format),
end_time=update.end_time.strftime(time_format))
body += '<li><a href="%s">%s</a></li>' % (
update_url, update_label)
body += "</ul>"
body += "</section>"
body += "<br />"
html = self.DOWNLOAD_TEMPLATE % dict(body=body)
headers = dict()
headers['Content-Type'] = "text/html"
return Response(html, 200, headers)
[docs]class LoanController(CirculationManagerController):
[docs] def sync(self):
"""Sync the authenticated patron's loans and holds with all third-party
providers.
:return: A Response containing an OPDS feed with up-to-date information.
"""
patron = flask.request.patron
# Save some time if we don't believe the patron's loans or holds have
# changed since the last time the client requested this feed.
response = self.handle_conditional_request(
patron.last_loan_activity_sync
)
if isinstance(response, Response):
return response
# First synchronize our local list of loans and holds with all
# third-party loan providers.
if patron.authorization_identifier:
header = self.authorization_header()
credential = self.manager.auth.get_credential_from_header(header)
try:
self.circulation.sync_bookshelf(patron, credential)
except Exception as e:
# If anything goes wrong, omit the sync step and just
# display the current active loans, as we understand them.
self.manager.log.error(
"ERROR DURING SYNC for %s: %r", patron.id, e, exc_info=e
)
# Then make the feed.
return LibraryLoanAndHoldAnnotator.active_loans_for(
self.circulation, patron
)
[docs] def borrow(self, identifier_type, identifier, mechanism_id=None):
"""Create a new loan or hold for a book.
:return: A Response containing an OPDS entry that includes a link of rel
"http://opds-spec.org/acquisition", which can be used to fetch the
book or the license file.
"""
patron = flask.request.patron
library = flask.request.library
header = self.authorization_header()
credential = self.manager.auth.get_credential_from_header(header)
result = self.best_lendable_pool(
library, patron, identifier_type, identifier, mechanism_id
)
if not result:
# No LicensePools were found and no ProblemDetail
# was returned. Send a generic ProblemDetail.
return NO_LICENSES.detailed(
_("I've never heard of this work.")
)
if isinstance(result, ProblemDetail):
# There was a problem determining the appropriate
# LicensePool to use.
return result
if isinstance(result, Loan):
# We already have a Loan, so there's no need to go to the API.
loan_or_hold = result
is_new = False
else:
# We need to actually go out to the API
# and try to take out a loan.
pool, mechanism = result
loan_or_hold, is_new = self._borrow(
patron, credential, pool, mechanism)
if isinstance(loan_or_hold, ProblemDetail):
return loan_or_hold
# At this point we have either a loan or a hold. If a loan, serve
# a feed that tells the patron how to fulfill the loan. If a hold,
# serve a feed that talks about the hold.
response_kwargs = {}
if is_new:
response_kwargs['status'] = 201
else:
response_kwargs['status'] = 200
return LibraryLoanAndHoldAnnotator.single_item_feed(
self.circulation, loan_or_hold, **response_kwargs
)
def _borrow(self, patron, credential, pool, mechanism):
"""Go out to the API, try to take out a loan, and handle errors as
problem detail documents.
:param patron: The Patron who's trying to take out the loan
:param credential: A Credential to use when authenticating
as this Patron with the external API.
:param pool: The LicensePool for the book the Patron wants.
:mechanism: The DeliveryMechanism to request when asking for
a loan.
:return: a 2-tuple (result, is_new) `result` is a Loan (if one
could be created or found), a Hold (if a Loan could not be
created but a Hold could be), or a ProblemDetail (if the
entire operation failed).
"""
result = None
is_new = False
try:
loan, hold, is_new = self.circulation.borrow(
patron, credential, pool, mechanism
)
result = loan or hold
except NoOpenAccessDownload as e:
result = NO_LICENSES.detailed(
_("Couldn't find an open-access download link for this book."),
status_code=404
)
except PatronAuthorizationFailedException as e:
result = INVALID_CREDENTIALS
except (PatronLoanLimitReached, PatronHoldLimitReached) as e:
result = e.as_problem_detail_document().with_debug(str(e))
except DeliveryMechanismError as e:
result = BAD_DELIVERY_MECHANISM.with_debug(
str(e), status_code=e.status_code
)
except OutstandingFines as e:
result = OUTSTANDING_FINES.detailed(
_("You must pay your $%(fine_amount).2f outstanding fines before you can borrow more books.",
fine_amount=patron.fines)
)
except AuthorizationExpired as e:
result = e.as_problem_detail_document(debug=False)
except AuthorizationBlocked as e:
result = e.as_problem_detail_document(debug=False)
except CannotLoan as e:
result = CHECKOUT_FAILED.with_debug(str(e))
except CannotHold as e:
result = HOLD_FAILED.with_debug(str(e))
except CannotRenew as e:
result = RENEW_FAILED.with_debug(str(e))
except NotFoundOnRemote as e:
result = NOT_FOUND_ON_REMOTE
except CirculationException as e:
# Generic circulation error.
result = CHECKOUT_FAILED.with_debug(str(e))
if result is None:
# This shouldn't happen, but if it does, it means no exception
# was raised but we just didn't get a loan or hold. Return a
# generic circulation error.
result = HOLD_FAILED
return result, is_new
[docs] def best_lendable_pool(self, library, patron, identifier_type, identifier, mechanism_id):
"""
Of the available LicensePools for the given Identifier, return the
one that's the best candidate for loaning out right now.
:return: A Loan if this patron already has an active loan, otherwise a LicensePool.
"""
# Turn source + identifier into a set of LicensePools
pools = self.load_licensepools(
library, identifier_type, identifier
)
if isinstance(pools, ProblemDetail):
# Something went wrong.
return pools
best = None
mechanism = None
problem_doc = None
existing_loans = self._db.query(Loan).filter(
Loan.license_pool_id.in_([lp.id for lp in pools]),
Loan.patron == patron
).all()
if existing_loans:
# The patron already has at least one loan on this book already.
# To make the "borrow" operation idempotent, return one of
# those loans instead of an error.
return existing_loans[0]
# We found a number of LicensePools. Try to locate one that
# we can actually loan to the patron.
for pool in pools:
problem_doc = self.apply_borrowing_policy(patron, pool)
if problem_doc:
# As a matter of policy, the patron is not allowed to borrow
# this book.
continue
# Beyond this point we know that site policy does not prohibit
# us from lending this pool to this patron.
if mechanism_id:
# But the patron has requested a license pool that
# supports a specific delivery mechanism. This pool
# must offer that mechanism.
mechanism = self.load_licensepooldelivery(pool, mechanism_id)
if isinstance(mechanism, ProblemDetail):
problem_doc = mechanism
continue
# Beyond this point we have a license pool that we can
# actually loan or put on hold.
# But there might be many such LicensePools, and we want
# to pick the one that will get the book to the patron
# with the shortest wait.
if (not best
or pool.licenses_available > best.licenses_available
or pool.patrons_in_hold_queue < best.patrons_in_hold_queue):
best = pool
if not best:
# We were unable to find any LicensePool that fit the
# criteria.
return problem_doc
return best, mechanism
[docs] def fulfill(self, license_pool_id, mechanism_id=None, part=None, do_get=None):
"""Fulfill a book that has already been checked out,
or which can be fulfilled with no active loan.
If successful, this will serve the patron a downloadable copy
of the book, a key (such as a DRM license file or bearer
token) which can be used to get the book, or an OPDS entry
containing a link to the book.
:param license_pool_id: Database ID of a LicensePool.
:param mechanism_id: Database ID of a DeliveryMechanism.
:param part: Vendor-specific identifier used when fulfilling a
specific part of a book rather than the whole thing (e.g. a
single chapter of an audiobook).
"""
do_get = do_get or Representation.simple_http_get
# Unlike most controller methods, this one has different
# behavior whether or not the patron is authenticated. This is
# why we're about to do something we don't usually do--call
# authenticated_patron_from_request from within a controller
# method.
authentication_response = self.authenticated_patron_from_request()
if isinstance(authentication_response, Patron):
# The patron is authenticated.
patron = authentication_response
else:
# The patron is not authenticated, either due to bad credentials
# (in which case authentication_response is a Response)
# or due to an integration error with the auth provider (in
# which case it is a ProblemDetail).
#
# There's still a chance this request can succeed, but if not,
# we'll be sending out authentication_response.
patron = None
library = flask.request.library
header = self.authorization_header()
credential = self.manager.auth.get_credential_from_header(header)
# Turn source + identifier into a LicensePool.
pool = self.load_licensepool(license_pool_id)
if isinstance(pool, ProblemDetail):
return pool
loan, loan_license_pool = self.get_patron_loan(patron, [pool])
requested_license_pool = loan_license_pool or pool
# Find the LicensePoolDeliveryMechanism they asked for.
mechanism = None
if mechanism_id:
mechanism = self.load_licensepooldelivery(
requested_license_pool, mechanism_id
)
if isinstance(mechanism, ProblemDetail):
return mechanism
if (not loan or not loan_license_pool) and not (
self.can_fulfill_without_loan(
library, patron, requested_license_pool, mechanism
)
):
if patron:
# Since a patron was identified, the problem is they have
# no active loan.
return NO_ACTIVE_LOAN.detailed(
_("You have no active loan for this title.")
)
else:
# Since no patron was identified, the problem is
# whatever problem was revealed by the earlier
# authenticated_patron_from_request() call -- either the
# patron didn't authenticate or there's a problem
# integrating with the auth provider.
return authentication_response
if not mechanism:
# See if the loan already has a mechanism set. We can use that.
if loan and loan.fulfillment:
mechanism = loan.fulfillment
else:
return BAD_DELIVERY_MECHANISM.detailed(
_("You must specify a delivery mechanism to fulfill this loan.")
)
# Define a function that, given a part identifier, will create
# an appropriate link to this controller.
def fulfill_part_url(part):
return url_for(
"fulfill", license_pool_id=requested_license_pool.id,
mechanism_id=mechanism.delivery_mechanism.id,
library_short_name=library.short_name,
part=str(part), _external=True
)
try:
fulfillment = self.circulation.fulfill(
patron, credential, requested_license_pool, mechanism,
part=part, fulfill_part_url=fulfill_part_url
)
except DeliveryMechanismConflict as e:
return DELIVERY_CONFLICT.detailed(str(e))
except NoActiveLoan as e:
return NO_ACTIVE_LOAN.detailed(
_('Can\'t fulfill loan because you have no active loan for this book.'),
status_code=e.status_code
)
except CannotFulfill as e:
return CANNOT_FULFILL.with_debug(
str(e), status_code=e.status_code
)
except FormatNotAvailable as e:
return NO_ACCEPTABLE_FORMAT.with_debug(
str(e), status_code=e.status_code
)
except DeliveryMechanismError as e:
return BAD_DELIVERY_MECHANISM.with_debug(
str(e), status_code=e.status_code
)
# A subclass of FulfillmentInfo may want to bypass the whole
# response creation process.
response = fulfillment.as_response
if response:
return response
headers = dict()
encoding_header = dict()
if (fulfillment.data_source_name == DataSource.ENKI
and mechanism.delivery_mechanism.drm_scheme_media_type == DeliveryMechanism.NO_DRM):
encoding_header["Accept-Encoding"] = "deflate"
if mechanism.delivery_mechanism.is_streaming:
# If this is a streaming delivery mechanism, create an OPDS entry
# with a fulfillment link to the streaming reader url.
feed = LibraryLoanAndHoldAnnotator.single_item_feed(
self.circulation, loan, fulfillment=fulfillment
)
if isinstance(feed, Response):
return feed
if isinstance(feed, OPDSFeed):
content = str(feed)
else:
content = etree.tostring(feed)
status_code = 200
headers["Content-Type"] = OPDSFeed.ACQUISITION_FEED_TYPE
else:
content = fulfillment.content
if fulfillment.content_link:
# If we have a link to the content on a remote server, web clients may not
# be able to access it if the remote server does not support CORS requests.
# If the pool is open access though, the web client can link directly to the
# file to download it, so it's safe to redirect.
if requested_license_pool.open_access:
return redirect(fulfillment.content_link)
# Otherwise, we need to fetch the content and return it instead
# of redirecting to it, since it may be downloaded through an
# indirect acquisition link.
try:
status_code, headers, content = do_get(
fulfillment.content_link, headers=encoding_header)
headers = dict(headers)
except RemoteIntegrationException as e:
return e.as_problem_detail_document(debug=False)
else:
status_code = 200
if fulfillment.content_type:
headers['Content-Type'] = fulfillment.content_type
return Response(response=content, status=status_code, headers=headers)
[docs] def can_fulfill_without_loan(self, library, patron, pool, lpdm):
"""Is it acceptable to fulfill the given LicensePoolDeliveryMechanism
for the given Patron without creating a Loan first?
This question is usually asked because no Patron has been
authenticated, and thus no Loan can be created, but somebody
wants a book anyway.
:param library: A Library.
:param patron: A Patron, probably None.
:param lpdm: A LicensePoolDeliveryMechanism.
"""
authenticator = self.manager.auth.library_authenticators.get(
library.short_name)
if authenticator and authenticator.identifies_individuals:
# This library identifies individual patrons, so there is
# no reason to fulfill books without a loan. Even if the
# books are free and the 'loans' are nominal, having a
# Loan object makes it possible for a patron to sync their
# collection across devices, so that's the way we do it.
return False
# If the library doesn't require that individual patrons
# identify themselves, it's up to the CirculationAPI object.
# Most of them will say no. (This would indicate that the
# collection is improperly associated with a library that
# doesn't identify its patrons.)
return self.circulation.can_fulfill_without_loan(patron, pool, lpdm)
[docs] def revoke(self, license_pool_id):
patron = flask.request.patron
pool = self.load_licensepool(license_pool_id)
if isinstance(pool, ProblemDetail):
return pool
loan, _ignore = self.get_patron_loan(patron, [pool])
if loan:
hold = None
else:
hold, _ignore = self.get_patron_hold(patron, [pool])
if not loan and not hold:
if not pool.work:
title = 'this book'
else:
title = '"%s"' % pool.work.title
return NO_ACTIVE_LOAN_OR_HOLD.detailed(
_('Can\'t revoke because you have no active loan or hold for "%(title)s".', title=title),
status_code=404
)
header = self.authorization_header()
credential = self.manager.auth.get_credential_from_header(header)
if loan:
try:
self.circulation.revoke_loan(patron, credential, pool)
except RemoteRefusedReturn as e:
title = _(
"Loan deleted locally but remote refused. Loan is likely to show up again on next sync.")
return COULD_NOT_MIRROR_TO_REMOTE.detailed(title, status_code=503)
except CannotReturn as e:
title = _("Loan deleted locally but remote failed.")
return COULD_NOT_MIRROR_TO_REMOTE.detailed(title, 503).with_debug(str(e))
elif hold:
if not self.circulation.can_revoke_hold(pool, hold):
title = _("Cannot release a hold once it enters reserved state.")
return CANNOT_RELEASE_HOLD.detailed(title, 400)
try:
self.circulation.release_hold(patron, credential, pool)
except CannotReleaseHold as e:
title = _("Hold released locally but remote failed.")
return CANNOT_RELEASE_HOLD.detailed(title, 503).with_debug(str(e))
work = pool.work
annotator = self.manager.annotator(None)
return AcquisitionFeed.single_entry(self._db, work, annotator)
[docs] def detail(self, identifier_type, identifier):
patron = flask.request.patron
library = flask.request.library
pools = self.load_licensepools(library, identifier_type, identifier)
if isinstance(pools, ProblemDetail):
return pools
loan, pool = self.get_patron_loan(patron, pools)
if loan:
hold = None
else:
hold, pool = self.get_patron_hold(patron, pools)
if not loan and not hold:
return NO_ACTIVE_LOAN_OR_HOLD.detailed(
_('You have no active loan or hold for "%(title)s".',
title=pool.work.title),
status_code=404
)
if flask.request.method == 'GET':
if loan:
item = loan
else:
item = hold
return LibraryLoanAndHoldAnnotator.single_item_feed(
self.circulation, item
)
[docs]class AnnotationController(CirculationManagerController):
[docs] def container(self, identifier=None, accept_post=True):
headers = dict()
if accept_post:
headers['Allow'] = 'GET,HEAD,OPTIONS,POST'
headers['Accept-Post'] = AnnotationWriter.CONTENT_TYPE
else:
headers['Allow'] = 'GET,HEAD,OPTIONS'
if flask.request.method == 'HEAD':
return Response(status=200, headers=headers)
patron = flask.request.patron
if flask.request.method == 'GET':
headers['Link'] = ['<http://www.w3.org/ns/ldp#BasicContainer>; rel="type"',
'<http://www.w3.org/TR/annotation-protocol/>; rel="http://www.w3.org/ns/ldp#constrainedBy"']
headers['Content-Type'] = AnnotationWriter.CONTENT_TYPE
container, timestamp = AnnotationWriter.annotation_container_for(
patron, identifier=identifier)
etag = 'W/""'
if timestamp:
etag = 'W/"%s"' % timestamp
headers['Last-Modified'] = format_date_time(
mktime(timestamp.timetuple()))
headers['ETag'] = etag
content = json.dumps(container)
return Response(content, status=200, headers=headers)
data = flask.request.data
annotation = AnnotationParser.parse(self._db, data, patron)
if isinstance(annotation, ProblemDetail):
return annotation
content = json.dumps(AnnotationWriter.detail(annotation))
status_code = 200
headers['Link'] = '<http://www.w3.org/ns/ldp#Resource>; rel="type"'
headers['Content-Type'] = AnnotationWriter.CONTENT_TYPE
return Response(content, status_code, headers)
[docs] def container_for_work(self, identifier_type, identifier):
id_obj, ignore = Identifier.for_foreign_id(
self._db, identifier_type, identifier)
return self.container(identifier=id_obj, accept_post=False)
[docs] def detail(self, annotation_id):
headers = dict()
headers['Allow'] = 'GET,HEAD,OPTIONS,DELETE'
if flask.request.method == 'HEAD':
return Response(status=200, headers=headers)
patron = flask.request.patron
annotation = get_one(
self._db, Annotation,
patron=patron,
id=annotation_id,
active=True)
if not annotation:
return NO_ANNOTATION
if flask.request.method == 'DELETE':
annotation.set_inactive()
return Response()
content = json.dumps(AnnotationWriter.detail(annotation))
status_code = 200
headers['Link'] = '<http://www.w3.org/ns/ldp#Resource>; rel="type"'
headers['Content-Type'] = AnnotationWriter.CONTENT_TYPE
return Response(content, status_code, headers)
[docs]class WorkController(CirculationManagerController):
def _lane_details(self, languages, audiences):
if languages:
languages = languages.split(',')
if audiences:
audiences = [urllib.parse.unquote_plus(
a) for a in audiences.split(',')]
return languages, audiences
[docs] def contributor(
self, contributor_name, languages, audiences,
feed_class=AcquisitionFeed
):
"""Serve a feed of books written by a particular author"""
library = flask.request.library
if not contributor_name:
return NO_SUCH_LANE.detailed(_("No contributor provided"))
# contributor_name is probably a display_name, but it could be a
# sort_name. Pass it in for both fields and
# ContributorData.lookup() will do its best to figure it out.
contributor = ContributorData.lookup(
self._db, sort_name=contributor_name, display_name=contributor_name
)
if not contributor:
return NO_SUCH_LANE.detailed(
_("Unknown contributor: %s") % contributor_name
)
search_engine = self.search_engine
if isinstance(search_engine, ProblemDetail):
return search_engine
languages, audiences = self._lane_details(languages, audiences)
lane = ContributorLane(
library, contributor, languages=languages, audiences=audiences
)
facets = self.manager.load_facets_from_request(
worklist=lane, base_class=ContributorFacets
)
if isinstance(facets, ProblemDetail):
return facets
pagination = load_pagination_from_request(SortKeyPagination)
if isinstance(pagination, ProblemDetail):
return pagination
annotator = self.manager.annotator(lane, facets)
url = annotator.feed_url(
lane,
facets=facets,
pagination=pagination,
)
return feed_class.page(
_db=self._db, title=lane.display_name, url=url, worklist=lane,
facets=facets, pagination=pagination,
annotator=annotator, search_engine=search_engine
)
[docs] def permalink(self, identifier_type, identifier):
"""Serve an entry for a single book.
This does not include any loan or hold-specific information for
the authenticated patron.
This is different from the /works lookup protocol, in that it
returns a single entry while the /works lookup protocol returns a
feed containing any number of entries.
"""
library = flask.request.library
work = self.load_work(library, identifier_type, identifier)
if isinstance(work, ProblemDetail):
return work
patron = flask.request.patron
if patron:
pools = self.load_licensepools(
library, identifier_type, identifier)
if isinstance(pools, ProblemDetail):
return pools
loan, pool = self.get_patron_loan(patron, pools)
hold = None
if not loan:
hold, pool = self.get_patron_hold(patron, pools)
item = loan or hold
pool = pool or pools[0]
return LibraryLoanAndHoldAnnotator.single_item_feed(
self.circulation, item or pool
)
else:
annotator = self.manager.annotator(lane=None)
return AcquisitionFeed.single_entry(
self._db, work, annotator,
max_age=OPDSFeed.DEFAULT_MAX_AGE
)
[docs] def recommendations(self, identifier_type, identifier, novelist_api=None,
feed_class=AcquisitionFeed):
"""Serve a feed of recommendations related to a given book."""
library = flask.request.library
work = self.load_work(library, identifier_type, identifier)
if isinstance(work, ProblemDetail):
return work
search_engine = self.search_engine
if isinstance(search_engine, ProblemDetail):
return search_engine
lane_name = "Recommendations for %s by %s" % (work.title, work.author)
try:
lane = RecommendationLane(
library=library, work=work, display_name=lane_name,
novelist_api=novelist_api
)
except CannotLoadConfiguration as e:
# NoveList isn't configured.
return NO_SUCH_LANE.detailed(_("Recommendations not available"))
facets = self.manager.load_facets_from_request(worklist=lane)
if isinstance(facets, ProblemDetail):
return facets
# We use a normal Pagination object because recommendations
# are looked up in a third-party API and paginated through the
# database lookup.
pagination = load_pagination_from_request(Pagination)
if isinstance(pagination, ProblemDetail):
return pagination
annotator = self.manager.annotator(lane)
url = annotator.feed_url(
lane,
facets=facets,
pagination=pagination,
)
return feed_class.page(
_db=self._db, title=lane.DISPLAY_NAME, url=url, worklist=lane,
facets=facets, pagination=pagination,
annotator=annotator, search_engine=search_engine
)
[docs] def report(self, identifier_type, identifier):
"""Report a problem with a book."""
# TODO: We don't have a reliable way of knowing whether the
# complaing is being lodged against the work or against a
# specific LicensePool.
# Turn source + identifier into a set of LicensePools
library = flask.request.library
pools = self.load_licensepools(library, identifier_type, identifier)
if isinstance(pools, ProblemDetail):
# Something went wrong.
return pools
if flask.request.method == 'GET':
# Return a list of valid URIs to use as the type of a problem detail
# document.
data = "\n".join(Complaint.VALID_TYPES)
return Response(data, 200, {"Content-Type": "text/uri-list"})
data = flask.request.data
controller = ComplaintController()
return controller.register(pools[0], data)
[docs] def series(self, series_name, languages, audiences, feed_class=AcquisitionFeed):
"""Serve a feed of books in a given series."""
library = flask.request.library
if not series_name:
return NO_SUCH_LANE.detailed(_("No series provided"))
search_engine = self.search_engine
if isinstance(search_engine, ProblemDetail):
return search_engine
languages, audiences = self._lane_details(languages, audiences)
lane = SeriesLane(
library, series_name=series_name, languages=languages,
audiences=audiences
)
facets = self.manager.load_facets_from_request(
worklist=lane, base_class=SeriesFacets
)
if isinstance(facets, ProblemDetail):
return facets
pagination = load_pagination_from_request(SortKeyPagination)
if isinstance(pagination, ProblemDetail):
return pagination
annotator = self.manager.annotator(lane)
url = annotator.feed_url(lane, facets=facets, pagination=pagination)
return feed_class.page(
_db=self._db, title=lane.display_name, url=url, worklist=lane,
facets=facets, pagination=pagination,
annotator=annotator, search_engine=search_engine
)
[docs]class ProfileController(CirculationManagerController):
"""Implement the User Profile Management Protocol."""
@property
def _controller(self):
"""Instantiate a CoreProfileController that actually does the work.
"""
# TODO: Probably better to use request_patron and check for
# None here.
patron = self.authenticated_patron_from_request()
storage = CirculationPatronProfileStorage(patron, flask.url_for)
return CoreProfileController(storage)
[docs] def protocol(self):
"""Handle a UPMP request."""
controller = self._controller
if flask.request.method == 'GET':
result = controller.get()
else:
result = controller.put(flask.request.headers, flask.request.data)
if isinstance(result, ProblemDetail):
return result
return make_response(*result)
[docs]class URNLookupController(CoreURNLookupController):
def __init__(self, manager):
self.manager = manager
super(URNLookupController, self).__init__(manager._db)
[docs] def work_lookup(self, route_name):
"""Build a CirculationManagerAnnotor based on the current library's
top-level WorkList, and use it to generate an OPDS lookup
feed.
"""
library = flask.request.library
top_level_worklist = self.manager.top_level_lanes[library.id]
annotator = CirculationManagerAnnotator(top_level_worklist)
return super(URNLookupController, self).work_lookup(
annotator, route_name
)
[docs]class AnalyticsController(CirculationManagerController):
[docs] def track_event(self, identifier_type, identifier, event_type):
# TODO: It usually doesn't matter, but there should be
# a way to distinguish between different LicensePools for the
# same book.
if event_type in CirculationEvent.CLIENT_EVENTS:
library = flask.request.library
# Authentication on the AnalyticsController is optional,
# so flask.request.patron may or may not be set.
patron = getattr(flask.request, 'patron', None)
neighborhood = None
if patron:
neighborhood = getattr(patron, 'neighborhood', None)
pools = self.load_licensepools(
library, identifier_type, identifier)
if isinstance(pools, ProblemDetail):
return pools
self.manager.analytics.collect_event(
library, pools[0], event_type, utc_now(),
neighborhood=neighborhood
)
return Response({}, 200)
else:
return INVALID_ANALYTICS_EVENT_TYPE
[docs]class ODLNotificationController(CirculationManagerController):
"""Receive notifications from an ODL distributor when the
status of a loan changes.
"""
[docs] def notify(self, loan_id):
library = flask.request.library
status_doc = flask.request.data
loan = get_one(self._db, Loan, id=loan_id)
if not loan:
return NO_ACTIVE_LOAN.detailed(_("No loan was found for this identifier."))
collection = loan.license_pool.collection
if collection.protocol != ODLAPI.NAME:
return INVALID_LOAN_FOR_ODL_NOTIFICATION
api = self.manager.circulation_apis[library.id].api_for_license_pool(
loan.license_pool)
api.update_loan(loan, json.loads(status_doc))
return Response(_('Success'), 200)
[docs]class SharedCollectionController(CirculationManagerController):
"""Enable this circulation manager to share its collections with
libraries on other circulation managers, for collection types that
support it."""
[docs] def info(self, collection_name):
"""Return an OPDS2 catalog-like document with a link to register."""
collection = get_one(self._db, Collection, name=collection_name)
if not collection:
return NO_SUCH_COLLECTION
register_url = self.url_for('shared_collection_register',
collection_name=collection_name)
register_link = dict(href=register_url, rel='register')
content = json.dumps(dict(links=[register_link]))
headers = dict()
headers["Content-Type"] = "application/opds+json;profile=https://librarysimplified.org/rel/profile/directory"
return Response(content, 200, headers)
[docs] def load_collection(self, collection_name):
collection = get_one(self._db, Collection, name=collection_name)
if not collection:
return NO_SUCH_COLLECTION
return collection
[docs] def register(self, collection_name):
collection = self.load_collection(collection_name)
if isinstance(collection, ProblemDetail):
return collection
url = flask.request.form.get("url")
try:
response = self.shared_collection.register(collection, url)
except InvalidInputException as e:
return INVALID_REGISTRATION.detailed(str(e))
except AuthorizationFailedException as e:
return INVALID_CREDENTIALS.detailed(str(e))
except RemoteInitiatedServerError as e:
return e.as_problem_detail_document(debug=False)
return Response(json.dumps(response), 200)
[docs] def authenticated_client_from_request(self):
header = flask.request.headers.get('Authorization')
if header and 'bearer' in header.lower():
shared_secret = base64.b64decode(header.split(' ')[1])
client = IntegrationClient.authenticate(self._db, shared_secret)
if client:
return client
return INVALID_CREDENTIALS
[docs] def loan_info(self, collection_name, loan_id):
collection = self.load_collection(collection_name)
if isinstance(collection, ProblemDetail):
return collection
client = self.authenticated_client_from_request()
if isinstance(client, ProblemDetail):
return client
loan = get_one(self._db, Loan, id=loan_id, integration_client=client)
if not loan or loan.license_pool.collection != collection:
return LOAN_NOT_FOUND
return SharedCollectionLoanAndHoldAnnotator.single_item_feed(
collection, loan
)
[docs] def borrow(self, collection_name, identifier_type, identifier, hold_id):
collection = self.load_collection(collection_name)
if isinstance(collection, ProblemDetail):
return collection
client = self.authenticated_client_from_request()
if isinstance(client, ProblemDetail):
return client
if identifier_type and identifier:
pools = self._db.query(LicensePool).join(
LicensePool.identifier).filter(
Identifier.type == identifier_type
).filter(
Identifier.identifier == identifier
).filter(
LicensePool.collection_id == collection.id
).all()
if not pools:
return NO_LICENSES.detailed(
_("The item you're asking about (%s/%s) isn't in this collection.") % (
identifier_type, identifier
)
)
pool = pools[0]
hold = None
elif hold_id:
hold = get_one(self._db, Hold, id=hold_id)
pool = hold.license_pool
try:
loan = self.shared_collection.borrow(
collection, client, pool, hold)
except AuthorizationFailedException as e:
return INVALID_CREDENTIALS.detailed(str(e))
except NoAvailableCopies as e:
return NO_AVAILABLE_LICENSE.detailed(str(e))
except CannotLoan as e:
return CHECKOUT_FAILED.detailed(str(e))
except RemoteIntegrationException as e:
return e.as_problem_detail_document(debug=False)
if loan:
return SharedCollectionLoanAndHoldAnnotator.single_item_feed(
collection, loan, status=201
)
[docs] def revoke_loan(self, collection_name, loan_id):
"""Revoke a loan for a given collection and loan ID.
:param collection_name: The name of the collection.
:param loan_id: The ID of the loan.
:return: A success response with a status code of 200 if the loan is successfully revoked,
or a ProblemDetail object if an error occurs.
"""
# Load the collection
collection = self.load_collection(collection_name)
# If the collection could not be loaded, return a ProblemDetail object
if isinstance(collection, ProblemDetail):
return collection
# Authenticate the client
client = self.authenticated_client_from_request()
# If the client could not be authenticated, return a ProblemDetail object
if isinstance(client, ProblemDetail):
return client
# Retrieve the loan with the given ID that belongs to the authenticated client
loan = get_one(self._db, Loan, id=loan_id, integration_client=client)
# If the loan is not found or does not belong to the collection, return the LOAN_NOT_FOUND constant
if not loan or not loan.license_pool.collection == collection:
return LOAN_NOT_FOUND
# Attempt to revoke the loan
try:
self.shared_collection.revoke_loan(collection, client, loan)
# If the authorization fails, return the INVALID_CREDENTIALS constant with a detailed error message
except AuthorizationFailedException as e:
return INVALID_CREDENTIALS.detailed(str(e))
# If the loan is not checked out, return the NO_ACTIVE_LOAN constant with a detailed error message
except NotCheckedOut as e:
return NO_ACTIVE_LOAN.detailed(str(e))
# If the loan cannot be returned, return the COULD_NOT_MIRROR_TO_REMOTE constant with a detailed error message
except CannotReturn as e:
return COULD_NOT_MIRROR_TO_REMOTE.detailed(str(e))
# If the loan is successfully revoked, return a success response with a status code of 200
return Response(_("Success"), 200)
[docs] def fulfill(self, collection_name, loan_id, mechanism_id, do_get=HTTP.get_with_timeout):
"""Fulfill a loan for a given collection, loan ID, and delivery mechanism ID.
:param collection_name: The name of the collection.
:param loan_id: The ID of the loan.
:param mechanism_id: The ID of the delivery mechanism.
:param do_get: An optional function for making HTTP GET requests.
:return: The content of the fulfillment with a status code of 200 and the appropriate headers if the loan is successfully fulfilled, or a ProblemDetail object if an error occurs.
"""
collection = self.load_collection(collection_name)
if isinstance(collection, ProblemDetail):
return collection
client = self.authenticated_client_from_request()
if isinstance(client, ProblemDetail):
return client
loan = get_one(self._db, Loan, id=loan_id)
if not loan or not loan.license_pool.collection == collection:
return LOAN_NOT_FOUND
mechanism = None
if mechanism_id:
mechanism = self.load_licensepooldelivery(
loan.license_pool, mechanism_id
)
if isinstance(mechanism, ProblemDetail):
return mechanism
if not mechanism:
# See if the loan already has a mechanism set. We can use that.
if loan and loan.fulfillment:
mechanism = loan.fulfillment
else:
return BAD_DELIVERY_MECHANISM.detailed(
_("You must specify a delivery mechanism to fulfill this loan.")
)
try:
fulfillment = self.shared_collection.fulfill(
collection, client, loan, mechanism)
except AuthorizationFailedException as e:
return INVALID_CREDENTIALS.detailed(str(e))
except CannotFulfill as e:
return CANNOT_FULFILL.detailed(str(e))
except RemoteIntegrationException as e:
return e.as_problem_detail_document(debug=False)
headers = dict()
content = fulfillment.content
if fulfillment.content_link:
# If we have a link to the content on a remote server, web clients may not
# be able to access it if the remote server does not support CORS requests.
# We need to fetch the content and return it instead of redirecting to it.
try:
response = do_get(fulfillment.content_link)
status_code = response.status_code
headers = dict(response.headers)
content = response.content
except RemoteIntegrationException as e:
return e.as_problem_detail_document(debug=False)
else:
status_code = 200
if fulfillment.content_type:
headers['Content-Type'] = fulfillment.content_type
return Response(content, status_code, headers)
[docs] def hold_info(self, collection_name, hold_id):
collection = self.load_collection(collection_name)
if isinstance(collection, ProblemDetail):
return collection
client = self.authenticated_client_from_request()
if isinstance(client, ProblemDetail):
return client
hold = get_one(self._db, Hold, id=hold_id, integration_client=client)
if not hold or not hold.license_pool.collection == collection:
return HOLD_NOT_FOUND
return SharedCollectionLoanAndHoldAnnotator.single_item_feed(
collection, hold
)
[docs] def revoke_hold(self, collection_name, hold_id):
collection = self.load_collection(collection_name)
if isinstance(collection, ProblemDetail):
return collection
client = self.authenticated_client_from_request()
if isinstance(client, ProblemDetail):
return client
hold = get_one(self._db, Hold, id=hold_id, integration_client=client)
if not hold or not hold.license_pool.collection == collection:
return HOLD_NOT_FOUND
try:
self.shared_collection.revoke_hold(collection, client, hold)
except AuthorizationFailedException as e:
return INVALID_CREDENTIALS.detailed(str(e))
except NotOnHold as e:
return NO_ACTIVE_HOLD.detailed(str(e))
except CannotReleaseHold as e:
return CANNOT_RELEASE_HOLD.detailed(str(e))
return Response(_("Success"), 200)
[docs]class StaticFileController(CirculationManagerController):
[docs] def static_file(self, directory, filename):
cache_timeout = ConfigurationSetting.sitewide(
self._db, Configuration.STATIC_FILE_CACHE_TIME
).int_value
return flask.send_from_directory(directory, filename, max_age=cache_timeout)
[docs]class RBDFulfillmentProxyController(CirculationManagerController):
def __init__(self, *args, **kwargs):
super(RBDFulfillmentProxyController, self).__init__(*args, **kwargs)
self.log = logging.getLogger("RBDigital fulfillment proxy")
[docs] def proxy(self, bearer, api_class=None):
# This method expects a proxy URL with a "url" query parameter.
# It returns a Flask response.
fulfillment_url = flask.request.values.get('url', None)
try:
response = RBDigitalFulfillmentProxy.proxy(self._db, bearer, fulfillment_url,
api_class=api_class)
except RBDProxyException as e:
status = e.args[0].get('status', 500)
message = e.args[0].get('message', 'unspecified error')
self.log.error('RBDProxyException: {} {}'.format(status, message))
response = Response(
response=json.dumps({"message": message}),
status=status, content_type='application/json;charset=UTF-8',
)
return response