Source code for api.testing

import contextlib
import datetime
import json
import logging
from collections import defaultdict

from core.testing import DatabaseTest

from core.model import (
    ConfigurationSetting,
    DataSource,
    ExternalIntegration,
    Identifier,
    Library,
    Loan,
    Hold,
    Session,
)
from api.circulation import (
    BaseCirculationAPI,
    CirculationAPI,
    LoanInfo,
    HoldInfo,
)
from api.shared_collection import (
    SharedCollectionAPI,
)
from api.config import (
    Configuration,
    temp_config,
)

from api.util.short_client_token import ShortClientTokenUtility

[docs]class VendorIDTest(DatabaseTest): """A DatabaseTest that knows how to set up an Adobe Vendor ID integration. """ TEST_VENDOR_ID = "vendor id" TEST_NODE_VALUE = 114740953091845
[docs] def initialize_adobe(self, vendor_id_library, short_token_libraries=[]): """Initialize an Adobe Vendor ID integration and a Short Client Token integration with a number of libraries. :param vendor_id_library: The Library that should have an Adobe Vendor ID integration. :param short_token_libraries: The Libraries that should have a Short Client Token integration. """ short_token_libraries = list(short_token_libraries) if not vendor_id_library in short_token_libraries: short_token_libraries.append(vendor_id_library) # The first library acts as an Adobe Vendor ID server. self.adobe_vendor_id = self._external_integration( ExternalIntegration.ADOBE_VENDOR_ID, ExternalIntegration.DRM_GOAL, username=self.TEST_VENDOR_ID, libraries=[vendor_id_library] ) # The other libraries will share a registry integration. self.registry = self._external_integration( ExternalIntegration.OPDS_REGISTRATION, ExternalIntegration.DISCOVERY_GOAL, libraries=short_token_libraries ) # The integration knows which Adobe Vendor ID server it # gets its Adobe IDs from. self.registry.set_setting( ShortClientTokenUtility.VENDOR_ID_KEY, self.adobe_vendor_id.username ) # As we give libraries their Short Client Token settings, # we build the 'other_libraries' setting we'll apply to the # Adobe Vendor ID integration. other_libraries = dict() # Every library in the system can generate Short Client # Tokens. for library in short_token_libraries: # Each library will get a slightly different short # name and secret for generating Short Client Tokens. library_uri = self._url short_name = library.short_name + "token" secret = library.short_name + " token secret" ConfigurationSetting.for_library_and_externalintegration( self._db, ExternalIntegration.USERNAME, library, self.registry ).value = short_name ConfigurationSetting.for_library_and_externalintegration( self._db, ExternalIntegration.PASSWORD, library, self.registry ).value = secret library.setting(Configuration.WEBSITE_URL).value = library_uri # Each library's Short Client Token configuration will be registered # with that Adobe Vendor ID server. if library != vendor_id_library: other_libraries[library_uri] = (short_name, secret) # Tell the Adobe Vendor ID server about the other libraries. other_libraries = json.dumps(other_libraries) self.adobe_vendor_id.set_setting( ShortClientTokenUtility.OTHER_LIBRARIES_KEY, other_libraries )
[docs]class MonitorTest(DatabaseTest): @property def ts(self): """Make the timestamp used by run() when calling run_once(). This makes it easier to test run_once() in isolation. """ return self.monitor.timestamp().to_data()
[docs]class AnnouncementTest(object): """A test that needs to create announcements.""" # Create raw data to be used in tests. format = '%Y-%m-%d' today = datetime.date.today() yesterday = (today - datetime.timedelta(days=1)).strftime(format) tomorrow = (today + datetime.timedelta(days=1)).strftime(format) a_week_ago = (today - datetime.timedelta(days=7)).strftime(format) in_a_week = (today + datetime.timedelta(days=7)).strftime(format) today = today.strftime(format) # This announcement is active. active = dict( id="active", start=today, finish=tomorrow, content="A sample announcement." ) # This announcement expired yesterday. expired = dict(active) expired['id'] = 'expired' expired['start'] = a_week_ago expired['finish'] = yesterday # This announcement should be displayed starting tomorrow. forthcoming = dict(active) forthcoming['id'] = 'forthcoming' forthcoming['start'] = tomorrow forthcoming['finish'] = in_a_week
[docs]class MockRemoteAPI(BaseCirculationAPI): def __init__(self, set_delivery_mechanism_at, can_revoke_hold_when_reserved): self.SET_DELIVERY_MECHANISM_AT = set_delivery_mechanism_at self.CAN_REVOKE_HOLD_WHEN_RESERVED = can_revoke_hold_when_reserved self.responses = defaultdict(list) self.log = logging.getLogger("Mock remote API") self.availability_updated_for = []
[docs] def checkout( self, patron_obj, patron_password, licensepool, delivery_mechanism ): # Should be a LoanInfo. return self._return_or_raise('checkout')
[docs] def update_availability(self, licensepool): """Simply record the fact that update_availability was called.""" self.availability_updated_for.append(licensepool)
[docs] def place_hold(self, patron, pin, licensepool, hold_notification_email=None): # Should be a HoldInfo. return self._return_or_raise('hold')
[docs] def fulfill(self, patron, pin, licensepool, internal_format=None, part=None, fulfill_part_url=None): # Should be a FulfillmentInfo. return self._return_or_raise('fulfill')
[docs] def checkin(self, patron, pin, licensepool): # Return value is not checked. return self._return_or_raise('checkin')
[docs] def release_hold(self, patron, pin, licensepool): # Return value is not checked. return self._return_or_raise('release_hold')
[docs] def internal_format(self, delivery_mechanism): return delivery_mechanism
[docs] def update_loan(self, loan, status_doc): self.availability_updated_for.append(loan.license_pool)
[docs] def queue_checkout(self, response): self._queue('checkout', response)
[docs] def queue_hold(self, response): self._queue('hold', response)
[docs] def queue_fulfill(self, response): self._queue('fulfill', response)
[docs] def queue_checkin(self, response): self._queue('checkin', response)
[docs] def queue_release_hold(self, response): self._queue('release_hold', response)
def _queue(self, k, v): self.responses[k].append(v) def _return_or_raise(self, k): self.log.debug(k) l = self.responses[k] v = l.pop() if isinstance(v, Exception): raise v return v
[docs]class MockCirculationAPI(CirculationAPI): def __init__(self, *args, **kwargs): super(MockCirculationAPI, self).__init__(*args, **kwargs) self.responses = defaultdict(list) self.remote_loans = [] self.remote_holds = [] self.remotes = {}
[docs] def local_loans(self, patron): return self._db.query(Loan).filter(Loan.patron==patron)
[docs] def local_holds(self, patron): return self._db.query(Hold).filter(Hold.patron==patron)
[docs] def add_remote_loan(self, *args, **kwargs): self.remote_loans.append(LoanInfo(*args, **kwargs))
[docs] def add_remote_hold(self, *args, **kwargs): self.remote_holds.append(HoldInfo(*args, **kwargs))
[docs] def patron_activity(self, patron, pin): """Return a 3-tuple (loans, holds, completeness).""" return self.remote_loans, self.remote_holds, True
[docs] def queue_checkout(self, licensepool, response): self._queue('checkout', licensepool, response)
[docs] def queue_hold(self, licensepool, response): self._queue('hold', licensepool, response)
[docs] def queue_fulfill(self, licensepool, response): self._queue('fulfill', licensepool, response)
[docs] def queue_checkin(self, licensepool, response): self._queue('checkin', licensepool, response)
[docs] def queue_release_hold(self, licensepool, response): self._queue('release_hold', licensepool, response)
def _queue(self, method, licensepool, response): mock = self.api_for_license_pool(licensepool) return mock._queue(method, response)
[docs] def api_for_license_pool(self, licensepool): source = licensepool.data_source.name if source not in self.remotes: set_delivery_mechanism_at = BaseCirculationAPI.FULFILL_STEP can_revoke_hold_when_reserved = True if source == DataSource.AXIS_360: set_delivery_mechanism_at = BaseCirculationAPI.BORROW_STEP if source == DataSource.THREEM: can_revoke_hold_when_reserved = False remote = MockRemoteAPI( set_delivery_mechanism_at, can_revoke_hold_when_reserved ) self.remotes[source] = remote return self.remotes[source]
[docs]class MockSharedCollectionAPI(SharedCollectionAPI): def __init__(self, *args, **kwargs): super(MockSharedCollectionAPI, self).__init__(*args, **kwargs) self.responses = defaultdict(list) def _queue(self, k, v): self.responses[k].append(v) def _return_or_raise(self, k): self.log.debug(k) l = self.responses[k] v = l.pop(0) if isinstance(v, Exception): raise v return v
[docs] def queue_register(self, response): self._queue('register', response)
[docs] def register(self, collection, url): return self._return_or_raise('register')
[docs] def queue_borrow(self, response): self._queue('borrow', response)
[docs] def borrow(self, collection, client, pool, hold=None): return self._return_or_raise('borrow')
[docs] def queue_revoke_loan(self, response): self._queue('revoke-loan', response)
[docs] def revoke_loan(self, collection, client, loan): return self._return_or_raise('revoke-loan')
[docs] def queue_fulfill(self, response): self._queue('fulfill', response)
[docs] def fulfill(self, patron, pin, licensepool, internal_format=None, part=None, fulfill_part_url=None): return self._return_or_raise('fulfill')
[docs] def queue_revoke_hold(self, response): self._queue('revoke-hold', response)
[docs] def revoke_hold(self, collection, client, hold): return self._return_or_raise('revoke-hold')