import logging
import traceback
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.functions import func
from .model import (
get_one,
get_one_or_create,
BaseCoverageRecord,
Collection,
CollectionMissing,
CoverageRecord,
DataSource,
Edition,
ExternalIntegration,
Identifier,
LicensePool,
PresentationCalculationPolicy,
Timestamp,
Work,
WorkCoverageRecord,
)
from .metadata_layer import (
ReplacementPolicy,
TimestampData,
)
from .util.worker_pools import DatabaseJob
from .util.datetime_helpers import utc_now
from . import log # This sets the appropriate log format.
class CoverageFailure(object):
"""Object representing the failure to provide coverage."""
def __init__(self, obj, exception, data_source=None, transient=True,
collection=None):
self.obj = obj
self.data_source = data_source
self.exception = exception
self.transient = transient
self.collection = collection
def __repr__(self):
if self.data_source:
data_source = self.data_source.name
else:
data_source = None
return "<CoverageFailure: obj=%r data_source=%r transient=%r exception=%r>" % (
self.obj, data_source, self.transient, self.exception
)
    def to_coverage_record(self, operation=None):
"""Convert this failure into a CoverageRecord."""
        if not self.data_source:
            raise Exception(
                "Cannot convert coverage failure to CoverageRecord because it has no data source."
            )
record, ignore = CoverageRecord.add_for(
self.obj, self.data_source, operation=operation,
collection=self.collection
)
record.exception = self.exception
if self.transient:
record.status = CoverageRecord.TRANSIENT_FAILURE
else:
record.status = CoverageRecord.PERSISTENT_FAILURE
return record
    def to_work_coverage_record(self, operation):
"""Convert this failure into a WorkCoverageRecord."""
record, ignore = WorkCoverageRecord.add_for(
self.obj, operation=operation
)
record.exception = self.exception
if self.transient:
record.status = CoverageRecord.TRANSIENT_FAILURE
else:
record.status = CoverageRecord.PERSISTENT_FAILURE
return record
class CoverageProviderProgress(TimestampData):
"""A TimestampData optimized for the special needs of
CoverageProviders.
"""
def __init__(self, *args, **kwargs):
super(CoverageProviderProgress, self).__init__(*args, **kwargs)
# The offset is distinct from the counter, in that it's not written
# to the database -- it's used to track state that's necessary within
# a single run of the CoverageProvider.
self.offset = 0
self.successes = 0
self.transient_failures = 0
self.persistent_failures = 0
@property
def achievements(self):
"""Represent the achievements of a CoverageProvider as a
human-readable string.
"""
template = "Items processed: %d. Successes: %d, transient failures: %d, persistent failures: %d"
total = (self.successes + self.transient_failures
+ self.persistent_failures)
return template % (
total, self.successes, self.transient_failures,
self.persistent_failures
)
@achievements.setter
def achievements(self, value):
# It's not possible to set .achievements directly. Do nothing.
pass
class BaseCoverageProvider(object):
"""Run certain objects through an algorithm. If the algorithm returns
success, add a coverage record for that object, so the object
doesn't need to be processed again. If the algorithm returns a
CoverageFailure, that failure may itself be memorialized as a
coverage record.
Instead of instantiating this class directly, subclass one of its
subclasses: either IdentifierCoverageProvider or
WorkCoverageProvider.
In IdentifierCoverageProvider the 'objects' are Identifier objects
and the coverage records are CoverageRecord objects. In
WorkCoverageProvider the 'objects' are Work objects and the
coverage records are WorkCoverageRecord objects.
"""
# In your subclass, set this to the name of the service,
# e.g. "Overdrive Bibliographic Coverage Provider".
SERVICE_NAME = None
# In your subclass, you _may_ set this to a string that distinguishes
# two different CoverageProviders from the same data source.
    # (You may also override the `operation` property, if you need
    # database access to determine which operation to use.)
OPERATION = None
# The database session will be committed each time the
# BaseCoverageProvider has (attempted to) provide coverage to this
# number of Identifiers. You may change this in your subclass.
# It's also possible to change it by passing in a value for
# `batch_size` in the constructor, but generally nobody bothers
# doing this.
DEFAULT_BATCH_SIZE = 100
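    # For illustration only, a minimal sketch of a concrete provider.
    # The class name, service name, and operation string below are
    # hypothetical, not part of this module:
    #
    #   class MyWorkProvider(WorkCoverageProvider):
    #       SERVICE_NAME = "My Work Provider"
    #       OPERATION = "my-operation"
    #       DEFAULT_BATCH_SIZE = 50
    #
    #       def process_item(self, work):
    #           # Return the item on success, or a CoverageFailure
    #           # on error.
    #           return work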
def __init__(self, _db, batch_size=None, cutoff_time=None,
registered_only=False,
):
"""Constructor.
:param batch_size: The maximum number of objects that will be processed
at once.
:param cutoff_time: Coverage records created before this time
will be treated as though they did not exist.
:param registered_only: Optional. Determines whether this
CoverageProvider will only cover items that already have been
"preregistered" with a CoverageRecord with a registered or failing
status. This option is only used on the Metadata Wrangler.
"""
self._db = _db
if not self.__class__.SERVICE_NAME:
raise ValueError(
"%s must define SERVICE_NAME." % self.__class__.__name__
)
service_name = self.__class__.SERVICE_NAME
operation = self.operation
if operation:
service_name += ' (%s)' % operation
self.service_name = service_name
if not batch_size or batch_size < 0:
batch_size = self.DEFAULT_BATCH_SIZE
self.batch_size = batch_size
self.cutoff_time = cutoff_time
self.registered_only = registered_only
self.collection_id = None
@property
def log(self):
if not hasattr(self, '_log'):
self._log = logging.getLogger(self.service_name)
return self._log
@property
def collection(self):
"""Retrieve the Collection object associated with this
CoverageProvider.
"""
if not self.collection_id:
return None
return get_one(self._db, Collection, id=self.collection_id)
@property
def operation(self):
"""Which operation should this CoverageProvider use to
distinguish between multiple CoverageRecords from the same data
source?
"""
return self.OPERATION
    def run(self):
start = utc_now()
result = self.run_once_and_update_timestamp()
result = result or CoverageProviderProgress()
self.finalize_timestampdata(result, start=start)
return result
    def run_once_and_update_timestamp(self):
# First prioritize items that have never had a coverage attempt before.
# Then cover items that failed with a transient failure on a
# previous attempt.
covered_status_lists = [
BaseCoverageRecord.PREVIOUSLY_ATTEMPTED,
BaseCoverageRecord.DEFAULT_COUNT_AS_COVERED
]
start_time = utc_now()
timestamp = self.timestamp
# We'll use this TimestampData object to track our progress
# as we grant coverage to items.
progress = CoverageProviderProgress(start=start_time)
for covered_statuses in covered_status_lists:
# We may have completed our work for the previous value of
# covered_statuses, but there's more work to do. Unset the
# 'finish' date to guarantee that progress.is_complete
# starts out False.
#
# Also set the offset to zero to ensure that we always start
# at the start of the database table.
original_finish = progress.finish = None
progress.offset = 0
# Call run_once() until we get an exception or
# progress.finish is set.
while not progress.is_complete:
try:
new_progress = self.run_once(
progress, count_as_covered=covered_statuses
)
# run_once can either return a new
# CoverageProviderProgress object, or modify
# in-place the one it was passed.
if new_progress is not None:
progress = new_progress
except Exception as e:
logging.error(
"CoverageProvider %s raised uncaught exception.",
self.service_name, exc_info=e
)
                    progress.exception = traceback.format_exc()
                    progress.finish = utc_now()
# The next run_once() call might raise an exception,
# so let's write the work to the database as it's
# done.
original_finish = progress.finish
self.finalize_timestampdata(progress)
# That wrote a value for progress.finish to the
# database, which is fine, but we don't necessarily
# want that value for progress.finish to stand. It
# might incorrectly make progress.is_complete appear
# to be True, making us exit the loop before we mean
# to.
if not progress.exception:
progress.finish = original_finish
# TODO: We should be able to return a list of progress objects,
# not just one.
return progress
@property
def timestamp(self):
"""Look up the Timestamp object for this CoverageProvider."""
return Timestamp.lookup(
self._db, self.service_name, Timestamp.COVERAGE_PROVIDER_TYPE,
collection=self.collection
)
    def finalize_timestampdata(self, timestamp, **kwargs):
"""Finalize the given TimestampData and write it to the
database.
"""
timestamp.finalize(
self.service_name, Timestamp.COVERAGE_PROVIDER_TYPE,
collection=self.collection, **kwargs
)
timestamp.apply(self._db)
self._db.commit()
    def run_once(self, progress, count_as_covered=None):
"""Try to grant coverage to a number of uncovered items.
NOTE: If you override this method, it's very important that
your implementation eventually do one of the following:
* Set progress.finish
* Set progress.exception
* Raise an exception
If you don't do any of these things, run() will assume you still
have work to do, and will keep calling run_once() forever.
:param progress: A CoverageProviderProgress representing the
progress made so far, and the number of records that
need to be ignored for the rest of the run.
:param count_as_covered: Which values for CoverageRecord.status
should count as meaning 'already covered'.
:return: A CoverageProviderProgress representing whatever
additional progress has been made.
"""
count_as_covered = count_as_covered or BaseCoverageRecord.DEFAULT_COUNT_AS_COVERED
# Make it clear which class of items we're covering on this
# run.
count_as_covered_message = ' (counting %s as covered)' % (', '.join(count_as_covered))
qu = self.items_that_need_coverage(count_as_covered=count_as_covered)
self.log.info("%d items need coverage%s", qu.count(),
count_as_covered_message)
batch = qu.limit(self.batch_size).offset(progress.offset)
if not batch.count():
# The batch is empty. We're done.
progress.finish = utc_now()
return progress
(successes, transient_failures, persistent_failures), results = (
self.process_batch_and_handle_results(batch)
)
# Update the running totals so that the service's eventual timestamp
# will have a useful .achievements.
progress.successes += successes
progress.transient_failures += transient_failures
progress.persistent_failures += persistent_failures
if BaseCoverageRecord.SUCCESS not in count_as_covered:
# If any successes happened in this batch, increase the
# offset to ignore them, or they will just show up again
# the next time we run this batch.
progress.offset += successes
if BaseCoverageRecord.TRANSIENT_FAILURE not in count_as_covered:
# If any transient failures happened in this batch,
# increase the offset to ignore them, or they will
# just show up again the next time we run this batch.
progress.offset += transient_failures
if BaseCoverageRecord.PERSISTENT_FAILURE not in count_as_covered:
# If any persistent failures happened in this batch,
# increase the offset to ignore them, or they will
# just show up again the next time we run this batch.
progress.offset += persistent_failures
return progress
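    # For illustration only, a minimal sketch of a run_once() override
    # that honors the contract described in its docstring.
    # process_one_page() is a hypothetical helper, not part of this
    # module:
    #
    #   def run_once(self, progress, count_as_covered=None):
    #       done = self.process_one_page(progress.offset)
    #       if done:
    #           # No more work to do; let run() exit its loop.
    #           progress.finish = utc_now()
    #       else:
    #           progress.offset += self.batch_size
    #       return progress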
    def process_batch_and_handle_results(self, batch):
""":return: A 2-tuple (counts, records).
`counts` is a 3-tuple (successes, transient failures,
persistent_failures).
`records` is a mixed list of coverage record objects (for
successes and persistent failures) and CoverageFailure objects
(for transient failures).
"""
# Batch is a query that may not be ordered, so it may return
# different results when executed multiple times. Converting to
# a list ensures that all subsequent code will run on the same items.
batch = list(batch)
offset_increment = 0
results = self.process_batch(batch)
successes = 0
transient_failures = 0
persistent_failures = 0
num_ignored = 0
records = []
unhandled_items = set(batch)
success_items = []
for item in results:
if isinstance(item, CoverageFailure):
if item.obj in unhandled_items:
unhandled_items.remove(item.obj)
record = self.record_failure_as_coverage_record(item)
if item.transient:
self.log.warning(
"Transient failure covering %r: %s",
item.obj, item.exception
)
record.status = BaseCoverageRecord.TRANSIENT_FAILURE
transient_failures += 1
else:
self.log.error(
"Persistent failure covering %r: %s",
item.obj, item.exception
)
record.status = BaseCoverageRecord.PERSISTENT_FAILURE
persistent_failures += 1
records.append(record)
else:
# Count this as a success and prepare to add a
# coverage record for it. It won't show up anymore, on
# this run or subsequent runs.
if item in unhandled_items:
unhandled_items.remove(item)
successes += 1
success_items.append(item)
records.extend(self.add_coverage_records_for(success_items))
# Perhaps some records were ignored--they neither succeeded nor
# failed. Treat them as transient failures.
for item in unhandled_items:
self.log.warning(
"%r was ignored by a coverage provider that was supposed to cover it.", item
)
failure = self.failure_for_ignored_item(item)
record = self.record_failure_as_coverage_record(failure)
record.status = BaseCoverageRecord.TRANSIENT_FAILURE
records.append(record)
num_ignored += 1
self.log.info(
"Batch processed with %d successes, %d transient failures, %d persistent failures, %d ignored.",
successes, transient_failures, persistent_failures, num_ignored
)
# Finalize this batch before moving on to the next one.
self.finalize_batch()
# For all purposes outside this method, treat an ignored identifier
# as a transient failure.
transient_failures += num_ignored
return (successes, transient_failures, persistent_failures), records
    def process_batch(self, batch):
"""Do what it takes to give coverage records to a batch of
items.
:return: A mixed list of coverage records and CoverageFailures.
"""
results = []
for item in batch:
result = self.process_item(item)
if not isinstance(result, CoverageFailure):
self.handle_success(item)
results.append(result)
return results
    def add_coverage_records_for(self, items):
"""Add CoverageRecords for a group of items from a batch,
each of which was successful.
"""
return [self.add_coverage_record_for(item) for item in items]
    def handle_success(self, item):
"""Do something special to mark the successful coverage of the
given item.
"""
pass
    def should_update(self, coverage_record):
"""Should we do the work to update the given coverage record?"""
if coverage_record is None:
# An easy decision -- there is no existing coverage record,
# so we need to do the work.
return True
        if coverage_record.status == BaseCoverageRecord.REGISTERED:
# There's a CoverageRecord, but coverage hasn't actually
# been attempted. Try to get covered.
return True
if self.cutoff_time is None:
# An easy decision -- without a cutoff_time, once we
# create a coverage record we never update it.
return False
# We update a coverage record if it was last updated before
# cutoff_time.
return coverage_record.timestamp < self.cutoff_time
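    # For illustration only: a hypothetical subclass constructed with a
    # cutoff, e.g.
    #
    #   cutoff = utc_now() - datetime.timedelta(days=30)
    #   provider = MyProvider(_db, cutoff_time=cutoff)
    #
    # will redo coverage for any record last updated more than thirty
    # days ago. (The datetime module would need to be imported.)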
    def finalize_batch(self):
"""Do whatever is necessary to complete this batch before moving on to
the next one.
e.g. committing the database session or uploading a bunch of
assets to S3.
"""
self._db.commit()
#
# Subclasses must implement these virtual methods.
#
    def items_that_need_coverage(self, identifiers=None, **kwargs):
"""Create a database query returning only those items that
need coverage.
        :param identifiers: A list of Identifier objects. If present,
            return only items that need coverage *and* are associated
            with one of these identifiers.
        Implemented in IdentifierCoverageProvider and WorkCoverageProvider.
"""
raise NotImplementedError()
    def add_coverage_record_for(self, item):
"""Add a coverage record for the given item.
Implemented in IdentifierCoverageProvider and WorkCoverageProvider.
"""
raise NotImplementedError()
    def record_failure_as_coverage_record(self, failure):
"""Convert the given CoverageFailure to a coverage record.
Implemented in IdentifierCoverageProvider and WorkCoverageProvider.
"""
raise NotImplementedError()
    def failure_for_ignored_item(self, work):
"""Create a CoverageFailure recording the coverage provider's
failure to even try to process an item.
Implemented in IdentifierCoverageProvider and
WorkCoverageProvider.
"""
raise NotImplementedError()
    def process_item(self, item):
"""Do the work necessary to give coverage to one specific item.
Since this is where the actual work happens, this is not
implemented in IdentifierCoverageProvider or
WorkCoverageProvider, and must be handled in a subclass.
"""
raise NotImplementedError()
class IdentifierCoverageProvider(BaseCoverageProvider):
"""Run Identifiers of certain types (ISBN, Overdrive, OCLC Number,
etc.) through an algorithm associated with a certain DataSource.
This class is designed to be subclassed rather than instantiated
directly. Subclasses should define SERVICE_NAME, OPERATION
(optional), DATA_SOURCE_NAME, and
INPUT_IDENTIFIER_TYPES. SERVICE_NAME and OPERATION are described
in BaseCoverageProvider; the rest are described in appropriate
comments in this class.
"""
# In your subclass, set this to the name of the data source you
# consult when providing coverage, e.g. DataSource.OVERDRIVE.
DATA_SOURCE_NAME = None
# In your subclass, set this to a single identifier type, or a list
# of identifier types. The CoverageProvider will attempt to give
# coverage to every Identifier of this type.
#
# Setting this to None will attempt to give coverage to every single
# Identifier in the system, which is probably not what you want.
NO_SPECIFIED_TYPES = object()
INPUT_IDENTIFIER_TYPES = NO_SPECIFIED_TYPES
# Set this to False if a given Identifier needs to be run through
# this CoverageProvider once for every Collection that has this
# Identifier in its catalog. If this is set to True, a given
# Identifier will be considered completely covered the first time
# it's run through this CoverageProvider, no matter how many
# Collections the Identifier belongs to.
COVERAGE_COUNTS_FOR_EVERY_COLLECTION = True
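    # For illustration only, a minimal sketch of how a subclass
    # typically fills in these class variables. The class and service
    # names below are hypothetical, not part of this module:
    #
    #   class ISBNLookupProvider(IdentifierCoverageProvider):
    #       SERVICE_NAME = "ISBN Lookup Provider"
    #       DATA_SOURCE_NAME = DataSource.OCLC
    #       INPUT_IDENTIFIER_TYPES = [Identifier.ISBN]
    #
    #       def process_item(self, identifier):
    #           # Return the identifier on success, or
    #           # self.failure(identifier, error) on error.
    #           return identifier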
def __init__(self, _db, collection=None, input_identifiers=None,
replacement_policy=None, **kwargs
):
"""Constructor.
:param collection: Optional. If information comes in from a
third party about a license pool associated with an
Identifier, the LicensePool that belongs to this Collection
will be used to contain that data. You may pass in None for
this value, but that means that no circulation information
(such as the formats in which a book is available) will be
stored as a result of running this CoverageProvider. Only
bibliographic information will be stored.
:param input_identifiers: Optional. This CoverageProvider is
requested to provide coverage for these specific
Identifiers.
:param replacement_policy: Optional. A ReplacementPolicy to use
when updating local data with data from the third party.
"""
super(IdentifierCoverageProvider, self).__init__(_db, **kwargs)
# We store the collection ID rather than the Collection to
# avoid breakage if an app server with a scoped session ever
        # uses an IdentifierCoverageProvider.
self.collection_id = None
if collection:
self.collection_id = collection.id
self.input_identifiers = input_identifiers
self.replacement_policy = (
replacement_policy or self._default_replacement_policy(_db)
)
if not self.DATA_SOURCE_NAME:
raise ValueError(
"%s must define DATA_SOURCE_NAME" % self.__class__.__name__
)
# Get this information immediately so that an error happens immediately
# if INPUT_IDENTIFIER_TYPES is not set properly.
self.input_identifier_types = self._input_identifier_types()
def _default_replacement_policy(self, _db):
"""Unless told otherwise, assume that we are getting
this data from a reliable metadata source.
"""
return ReplacementPolicy.from_metadata_source()
@property
def collection_or_not(self):
"""If this CoverageProvider needs to be run multiple times on
the same identifier in different collections, this
returns the collection. Otherwise, this returns None.
"""
if self.COVERAGE_COUNTS_FOR_EVERY_COLLECTION:
return None
return self.collection
@classmethod
def _input_identifier_types(cls):
"""Create a normalized value for `input_identifier_types`
based on the INPUT_IDENTIFIER_TYPES class variable.
"""
value = cls.INPUT_IDENTIFIER_TYPES
# Nip in the bud a situation where someone subclassed this
# class without thinking about a value for
# INPUT_IDENTIFIER_TYPES.
if value is cls.NO_SPECIFIED_TYPES:
raise ValueError(
"%s must define INPUT_IDENTIFIER_TYPES, even if the value is None." % (cls.__name__)
)
if not value:
# We will be processing every single type of identifier in
# the system. This (hopefully) means that the identifiers
# are restricted in some other way, such as being licensed
# to a specific Collection.
return None
elif not isinstance(value, list):
# We will be processing every identifier of a given type.
return [value]
else:
            # We will be processing every identifier whose type belongs
            # to a list of types.
return value
    @classmethod
    def register(cls, identifier, data_source=None, collection=None,
                 force=False, autocreate=False):
"""Registers an identifier for future coverage.
        See `IdentifierCoverageProvider.bulk_register` for more
        information about using this method.
"""
name = cls.SERVICE_NAME or cls.__name__
log = logging.getLogger(name)
new_records, ignored_identifiers = cls.bulk_register(
[identifier], data_source=data_source, collection=collection,
force=force, autocreate=autocreate
)
was_registered = identifier not in ignored_identifiers
new_record = None
if new_records:
[new_record] = new_records
if was_registered and new_record:
log.info('CREATED %r' % new_record)
return new_record, was_registered
_db = Session.object_session(identifier)
data_source = cls._data_source_for_registration(
_db, data_source, autocreate=autocreate
)
if collection and cls.COVERAGE_COUNTS_FOR_EVERY_COLLECTION:
# There's no need for a collection when registering this
# Identifier, even if it provided the DataSource.
collection = None
existing_record = CoverageRecord.lookup(
identifier, data_source, cls.OPERATION, collection=collection
)
log.info('FOUND %r' % existing_record)
return existing_record, was_registered
    @classmethod
    def bulk_register(cls, identifiers, data_source=None, collection=None,
                      force=False, autocreate=False):
"""Registers identifiers for future coverage.
This method is primarily for use with CoverageProviders that use the
`registered_only` flag to process items. It's currently only in use
on the Metadata Wrangler.
        :param data_source: A DataSource object, or a string representing
            a DataSource name.
:param collection: Collection object to be associated with the
CoverageRecords.
:param force: When True, even existing CoverageRecords will have
their status reset to CoverageRecord.REGISTERED.
        :param autocreate: When True, a data source named by a string
            will be autocreated in the database if it didn't previously
            exist.
        :return: A 2-tuple of lists: the first contains the newly created
            REGISTERED CoverageRecords; the second contains the Identifiers
            that were ignored because they already had coverage.
TODO: Take identifier eligibility into account when registering.
"""
if not identifiers:
return list(), list()
_db = Session.object_session(identifiers[0])
data_source = cls._data_source_for_registration(
_db, data_source, autocreate=autocreate
)
if collection and cls.COVERAGE_COUNTS_FOR_EVERY_COLLECTION:
# There's no need for a collection on this CoverageRecord.
collection = None
new_records, ignored_identifiers = CoverageRecord.bulk_add(
identifiers, data_source, operation=cls.OPERATION,
status=CoverageRecord.REGISTERED, collection=collection,
force=force,
)
return new_records, ignored_identifiers
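    # For illustration only, how callers typically use these
    # registration methods. MyProvider is a hypothetical subclass, and
    # `identifiers` is assumed to be a list of Identifier objects from
    # the same database session:
    #
    #   new_records, ignored = MyProvider.bulk_register(identifiers)
    #   record, was_registered = MyProvider.register(identifiers[0])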
@classmethod
def _data_source_for_registration(cls, _db, data_source, autocreate=False):
"""Finds or creates a DataSource for the registration methods
`cls.register` and `cls.bulk_register`.
"""
if not data_source:
return DataSource.lookup(_db, cls.DATA_SOURCE_NAME)
if isinstance(data_source, DataSource):
return data_source
if isinstance(data_source, (bytes, str)):
return DataSource.lookup(_db, data_source, autocreate=autocreate)
@property
def data_source(self):
"""Look up the DataSource object corresponding to the
service we're running this data through.
Out of an excess of caution, we look up the DataSource every
time, rather than storing it, in case a CoverageProvider is
ever used in an environment where the database session is
scoped (e.g. the circulation manager).
"""
return DataSource.lookup(self._db, self.DATA_SOURCE_NAME)
    def failure(self, identifier, error, transient=True):
"""Create a CoverageFailure object to memorialize an error."""
return CoverageFailure(
identifier, error,
data_source=self.data_source,
transient=transient,
collection=self.collection_or_not,
)
    def can_cover(self, identifier):
"""Can this IdentifierCoverageProvider do anything with the given
Identifier?
This is not needed in the normal course of events, but a
caller may need to decide whether to pass an Identifier
into ensure_coverage() or register().
"""
return (not self.input_identifier_types
or identifier.type in self.input_identifier_types)
    def run_on_specific_identifiers(self, identifiers):
"""Split a specific set of Identifiers into batches and process one
batch at a time.
This is for use by IdentifierInputScript.
:return: The same (counts, records) 2-tuple as
process_batch_and_handle_results.
"""
index = 0
successes = 0
transient_failures = 0
persistent_failures = 0
records = []
# Of all the items that need coverage, find the intersection
# with the given list of items.
need_coverage = self.items_that_need_coverage(identifiers).all()
# Treat any items with up-to-date coverage records as
# automatic successes.
#
# NOTE: We won't actually be returning those coverage records
# in `records`, since items_that_need_coverage() filters them
# out, but nobody who calls this method really needs those
# records.
automatic_successes = len(identifiers) - len(need_coverage)
successes += automatic_successes
self.log.info("%d automatic successes.", successes)
# Iterate over any items that were not automatic
# successes.
while index < len(need_coverage):
batch = need_coverage[index:index+self.batch_size]
(s, t, p), r = self.process_batch_and_handle_results(batch)
successes += s
transient_failures += t
persistent_failures += p
records += r
index += self.batch_size
return (successes, transient_failures, persistent_failures), records
    def ensure_coverage(self, item, force=False):
"""Ensure coverage for one specific item.
:param item: This should always be an Identifier, but this
code will also work if it's an Edition. (The Edition's
.primary_identifier will be covered.)
:param force: Run the coverage code even if an existing
coverage record for this item was created after `self.cutoff_time`.
:return: Either a coverage record or a CoverageFailure.
TODO: This could be abstracted and moved to BaseCoverageProvider.
"""
if isinstance(item, Identifier):
identifier = item
else:
identifier = item.primary_identifier
if self.COVERAGE_COUNTS_FOR_EVERY_COLLECTION:
# We need to cover this Identifier once, and then we're
# done, for all collections.
collection = None
else:
# We need separate coverage for the specific Collection
# associated with this CoverageProvider.
collection = self.collection
coverage_record = get_one(
self._db, CoverageRecord,
identifier=identifier,
collection=collection,
data_source=self.data_source,
operation=self.operation,
on_multiple='interchangeable',
)
if not force and not self.should_update(coverage_record):
return coverage_record
counts, records = self.process_batch_and_handle_results(
[identifier]
)
if records:
coverage_record = records[0]
else:
coverage_record = None
return coverage_record
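    # For illustration only, covering a single Identifier on demand.
    # `provider` is assumed to be an instance of a concrete subclass:
    #
    #   record = provider.ensure_coverage(identifier)
    #   if isinstance(record, CoverageFailure):
    #       # Handle the failure.
    #       ...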
    def edition(self, identifier):
"""Finds or creates an Edition representing this coverage provider's
view of a given Identifier.
"""
edition, ignore = Edition.for_foreign_id(
self._db, self.data_source, identifier.type,
identifier.identifier
)
return edition
#
# Implementation of BaseCoverageProvider virtual methods.
#
    def items_that_need_coverage(self, identifiers=None, **kwargs):
"""Find all items lacking coverage from this CoverageProvider.
Items should be Identifiers, though Editions should also work.
By default, all identifiers of the `INPUT_IDENTIFIER_TYPES` which
don't already have coverage are chosen.
:param identifiers: The batch of identifier objects to test
for coverage. identifiers and self.input_identifiers can
intersect -- if this provider was created for the purpose
of running specific Identifiers, and within those
Identifiers you want to batch, you can use both
parameters.
"""
qu = Identifier.missing_coverage_from(
self._db, self.input_identifier_types, self.data_source,
count_as_missing_before=self.cutoff_time, operation=self.operation,
identifiers=self.input_identifiers, collection=self.collection_or_not,
**kwargs
)
if identifiers:
qu = qu.filter(Identifier.id.in_([x.id for x in identifiers]))
        if identifiers is not None and not identifiers:
            # An empty list was provided. The returned query should be empty.
            qu = qu.filter(Identifier.id == None)
if self.registered_only:
# Return Identifiers that have been "registered" for coverage
# or already have a failure from previous coverage attempts.
qu = qu.filter(CoverageRecord.id != None)
return qu
    def add_coverage_record_for(self, item):
"""Record this CoverageProvider's coverage for the given
Edition/Identifier, as a CoverageRecord.
"""
record, is_new = CoverageRecord.add_for(
item, data_source=self.data_source, operation=self.operation,
collection=self.collection_or_not
)
record.status = CoverageRecord.SUCCESS
record.exception = None
return record
    def record_failure_as_coverage_record(self, failure):
"""Turn a CoverageFailure into a CoverageRecord object."""
return failure.to_coverage_record(operation=self.operation)
    def failure_for_ignored_item(self, item):
"""Create a CoverageFailure recording the CoverageProvider's
failure to even try to process an item.
"""
return self.failure(
item, "Was ignored by CoverageProvider.", transient=True
)
class CollectionCoverageProvider(IdentifierCoverageProvider):
"""A CoverageProvider that covers all the Identifiers currently
licensed to a given Collection.
You should subclass this CoverageProvider if you want to create
Works (as opposed to operating on existing Works) or update the
circulation information for LicensePools. You can't use it to
create new LicensePools, since it only operates on Identifiers
    that already have a LicensePool in the given Collection.
If a book shows up in multiple Collections, the first Collection
to process it takes care of it for the others. Any books that were
processed through their membership in another Collection will be
left alone.
For this reason it's important that subclasses of this
CoverageProvider only deal with bibliographic information and
format availability information (such as links to open-access
downloads). You'll have problems if you try to use
CollectionCoverageProvider to keep track of information like the
number of licenses available for a book.
In addition to defining the class variables defined by
CoverageProvider, you must define the class variable PROTOCOL when
subclassing this class. This is the entity that provides the
licenses for this Collection. It should be one of the
collection-type provider constants defined in the
`ExternalIntegration` class, such as
ExternalIntegration.OPDS_IMPORT or ExternalIntegration.OVERDRIVE.
"""
# By default, this type of CoverageProvider will provide coverage to
# all Identifiers in the given Collection, regardless of their type.
INPUT_IDENTIFIER_TYPES = None
DEFAULT_BATCH_SIZE = 10
# Set this to the name of the protocol managed by this type of
# CoverageProvider. If this CoverageProvider can manage collections
# for any protocol, leave this as None.
PROTOCOL = None
# By default, Works calculated by a CollectionCoverageProvider update
# the ExternalSearchIndex. Set this value to True for applications that
# don't use external search, such as the Metadata Wrangler.
EXCLUDE_SEARCH_INDEX = False
def __init__(self, collection, **kwargs):
"""Constructor.
:param collection: Will provide coverage to all Identifiers with
a LicensePool licensed to the given Collection.
"""
if not isinstance(collection, Collection):
raise CollectionMissing(
"%s must be instantiated with a Collection." % (
self.__class__.__name__
)
)
if self.PROTOCOL and collection.protocol != self.PROTOCOL:
raise ValueError(
"Collection protocol (%s) does not match CoverageProvider protocol (%s)" % (
collection.protocol, self.PROTOCOL
)
)
_db = Session.object_session(collection)
super(CollectionCoverageProvider, self).__init__(
_db, collection, **kwargs
)
def _default_replacement_policy(self, _db):
"""Unless told otherwise, assume that we are getting
this data from a reliable source of both metadata and circulation
information.
"""
return ReplacementPolicy.from_license_source(_db)
    @classmethod
    def collections(cls, _db):
"""Returns a list of randomly sorted list of collections covered by the
provider.
"""
if cls.PROTOCOL:
collections = Collection.by_protocol(_db, cls.PROTOCOL)
else:
collections = _db.query(Collection)
return collections.order_by(func.random()).all()
    @classmethod
    def all(cls, _db, **kwargs):
"""Yield a sequence of CollectionCoverageProvider instances, one for
every Collection that gets its licenses from cls.PROTOCOL.
CollectionCoverageProviders will be yielded in a random order.
:param kwargs: Keyword arguments passed into the constructor for
CollectionCoverageProvider (or, more likely, one of its subclasses).
"""
for collection in cls.collections(_db):
yield cls(collection, **kwargs)
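    # For illustration only, a typical driver loop that runs a
    # hypothetical subclass against every applicable Collection.
    # `_db` is assumed to be an open database session:
    #
    #   for provider in MyCollectionProvider.all(_db):
    #       provider.run()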
    def run_once(self, *args, **kwargs):
self.log.info("Considering collection %s", self.collection.name)
return super(CollectionCoverageProvider, self).run_once(
*args, **kwargs
)
    def items_that_need_coverage(self, identifiers=None, **kwargs):
"""Find all Identifiers associated with this Collection but lacking
coverage through this CoverageProvider.
"""
qu = super(CollectionCoverageProvider, self).items_that_need_coverage(
identifiers, **kwargs
)
qu = qu.join(Identifier.licensed_through).filter(
LicensePool.collection_id==self.collection_id
)
return qu
    def license_pool(self, identifier, data_source=None):
"""Finds this Collection's LicensePool for the given Identifier,
creating one if necessary.
:param data_source: If it's necessary to create a LicensePool,
the new LicensePool will have this DataSource. The default is to
use the DataSource associated with the CoverageProvider. This
should only be needed by the metadata wrangler.
"""
license_pools = [
p for p in identifier.licensed_through
if self.collection==p.collection
]
if license_pools:
# A given Collection may have at most one LicensePool for
# a given identifier.
return license_pools[0]
data_source = data_source or self.data_source
if isinstance(data_source, (bytes, str)):
data_source = DataSource.lookup(self._db, data_source)
# This Collection has no LicensePool for the given Identifier.
# Create one.
#
# Under normal circumstances, this will never happen, because
# CollectionCoverageProvider only operates on Identifiers that
# already have a LicensePool in this Collection.
#
# However, this does happen in the metadata wrangler,
# which typically has to manage information about books it has no
# rights to.
license_pool, ignore = LicensePool.for_foreign_id(
self._db, data_source, identifier.type,
identifier.identifier, collection=self.collection
)
return license_pool
    def work(self, identifier, license_pool=None, **calculate_work_kwargs):
"""Finds or creates a Work for this Identifier as licensed through
this Collection.
If the given Identifier already has a Work associated with it,
that Work will always be used, since an Identifier can only have one
Work associated with it.
However, if there is no current Work, a Work will only be
created if the given Identifier already has a LicensePool in
the Collection associated with this CoverageProvider (or if a
LicensePool to use is provided.) This method will not create
new LicensePools.
        If there is no Work, or the existing Work is not
        presentation-ready, LicensePool.calculate_work() will be called
        to create or recalculate one. If there is an existing
        presentation-ready Work, calculate_work() will not be called;
        instead, the Work will be slated for recalculation when its
        metadata changes through Metadata.apply().
:param calculate_work_kwargs: Keyword arguments to pass into
calculate_work() if and when it is called.
:return: A Work, if possible. Otherwise, a CoverageFailure explaining
why no Work could be created.
"""
work = identifier.work
if work and work.presentation_ready:
# There is already a presentation-ready Work associated
# with this Identifier. Return it.
return work
# There is no presentation-ready Work associated with this
# Identifier. We need to create one, if possible.
error = None
if not license_pool:
license_pool, ignore = LicensePool.for_foreign_id(
self._db, self.data_source, identifier.type,
identifier.identifier, collection=self.collection,
autocreate=False
)
if license_pool:
if (not license_pool.work
or not license_pool.work.presentation_ready):
for (v, default) in (
('exclude_search', self.EXCLUDE_SEARCH_INDEX),
):
                    if v not in calculate_work_kwargs:
                        calculate_work_kwargs[v] = default
# Calling calculate_work will calculate the work's
# presentation and make it presentation-ready if
# possible.
work, created = license_pool.calculate_work(
**calculate_work_kwargs
)
if not work:
error = "Work could not be calculated"
else:
error = "Cannot locate LicensePool"
if error:
return self.failure(identifier, error, transient=True)
return work
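    # For illustration only, how callers typically consume work()'s
    # return value, since it may be either a Work or a CoverageFailure.
    # `identifier` is assumed to be an Identifier with a LicensePool in
    # this provider's Collection:
    #
    #   work = self.work(identifier)
    #   if isinstance(work, CoverageFailure):
    #       return work  # Propagate the failure.
    #   work.set_presentation_ready()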
def _set_circulationdata(self, identifier, circulationdata):
"""Finds or creates a LicensePool for an Identifier, updates it
with the given circulationdata, then creates a Work for the book.
:return: The Identifier (if successful) or an appropriate
CoverageFailure (if not).
"""
error = None
if circulationdata:
primary_identifier = circulationdata.primary_identifier(self._db)
if identifier != primary_identifier:
error = "Identifier did not match CirculationData's primary identifier."
else:
error = "Did not receive circulationdata from input source"
if error:
return self.failure(identifier, error, transient=True)
try:
circulationdata.apply(
self._db, self.collection, replace=self.replacement_policy
)
except Exception as e:
if self.collection:
collection_name = " to collection %s" % self.collection.name
else:
collection_name = ""
self.log.warning(
"Error applying circulationdata%s: %s",
collection_name, e, exc_info=e
)
return self.failure(identifier, repr(e), transient=True)
return identifier
    def set_presentation_ready(self, identifier):
"""Set a Work presentation-ready."""
work = self.work(identifier)
if isinstance(work, CoverageFailure):
return work
work.set_presentation_ready(exclude_search=self.EXCLUDE_SEARCH_INDEX)
return identifier
class CollectionCoverageProviderJob(DatabaseJob):
def __init__(self, collection, provider_class, progress,
**provider_kwargs
):
self.collection = collection
self.progress = progress
self.provider_class = provider_class
self.provider_kwargs = provider_kwargs
    def run(self, _db, **kwargs):
collection = _db.merge(self.collection)
provider = self.provider_class(collection, **self.provider_kwargs)
provider.run_once(self.progress)
provider.finalize_timestampdata(self.progress)
class CatalogCoverageProvider(CollectionCoverageProvider):
"""Most CollectionCoverageProviders provide coverage to Identifiers
that are licensed through a given Collection.
A CatalogCoverageProvider provides coverage to Identifiers that
are present in a given Collection's catalog.
"""
    def items_that_need_coverage(self, identifiers=None, **kwargs):
"""Find all Identifiers in this Collection's catalog but lacking
coverage through this CoverageProvider.
"""
qu = super(CollectionCoverageProvider, self).items_that_need_coverage(
identifiers, **kwargs
)
qu = qu.join(Identifier.collections).filter(
Collection.id==self.collection_id
)
return qu
class BibliographicCoverageProvider(CollectionCoverageProvider):
"""Fill in bibliographic metadata for all books in a Collection.
e.g. ensures that we get Overdrive coverage for all Overdrive IDs
in a collection.
Although a BibliographicCoverageProvider may gather
CirculationData for a book, it cannot guarantee equal coverage for
all Collections that contain that book. CirculationData should be
limited to things like formats that don't vary between
Collections, and you should use a CollectionMonitor to make sure
your circulation information is up-to-date for each Collection.
"""
    def handle_success(self, identifier):
"""Once a book has bibliographic coverage, it can be given a
work and made presentation ready.
"""
self.set_presentation_ready(identifier)
class WorkCoverageProvider(BaseCoverageProvider):
"""Perform coverage operations on Works rather than Identifiers."""
    @classmethod
    def register(cls, work, force=False):
"""Registers a work for future coverage.
This method is primarily for use with CoverageProviders that use the
`registered_only` flag to process items. It's currently only in use
on the Metadata Wrangler.
        :param force: Set to True to reset an existing WorkCoverageRecord's
            status to REGISTERED, regardless of its current status.
"""
was_registered = True
if not force:
record = WorkCoverageRecord.lookup(work, cls.OPERATION)
if record:
was_registered = False
return record, was_registered
# WorkCoverageRecord.add_for overwrites the status already,
# so it can be used to create and to force-register records.
record, is_new = WorkCoverageRecord.add_for(
work, cls.OPERATION, status=CoverageRecord.REGISTERED
)
return record, was_registered
#
# Implementation of BaseCoverageProvider virtual methods.
#
    def items_that_need_coverage(self, identifiers=None, **kwargs):
"""Find all Works lacking coverage from this CoverageProvider.
By default, all Works which don't already have coverage are
chosen.
        :param identifiers: Only Works connected with one of the given
            Identifiers are chosen.
"""
qu = Work.missing_coverage_from(
self._db, operation=self.operation,
count_as_missing_before=self.cutoff_time,
**kwargs
)
if identifiers:
ids = [x.id for x in identifiers]
qu = qu.join(Work.license_pools).filter(
LicensePool.identifier_id.in_(ids)
)
if self.registered_only:
# Return Identifiers that have been "registered" for coverage
# or already have a failure from previous coverage attempts.
qu = qu.filter(WorkCoverageRecord.id != None)
return qu
    def failure(self, work, error, transient=True):
"""Create a CoverageFailure object."""
return CoverageFailure(work, error, transient=transient)
    def failure_for_ignored_item(self, work):
"""Create a CoverageFailure recording the WorkCoverageProvider's
failure to even try to process a Work.
"""
return CoverageFailure(
work, "Was ignored by WorkCoverageProvider.", transient=True
)
    def add_coverage_records_for(self, works):
"""Add WorkCoverageRecords for a group of works from a batch,
each of which was successful.
"""
WorkCoverageRecord.bulk_add(
works, operation=self.operation
)
# We can't return the specific WorkCoverageRecords that were
# created, but it doesn't matter because they're not used except
# in tests.
return []
    def add_coverage_record_for(self, work):
"""Record this CoverageProvider's coverage for the given
Edition/Identifier, as a WorkCoverageRecord.
"""
return WorkCoverageRecord.add_for(work, operation=self.operation)
    def record_failure_as_coverage_record(self, failure):
"""Turn a CoverageFailure into a WorkCoverageRecord object."""
return failure.to_work_coverage_record(operation=self.operation)
class PresentationReadyWorkCoverageProvider(WorkCoverageProvider):
"""A WorkCoverageProvider that only covers presentation-ready works.
"""
    def items_that_need_coverage(self, identifiers=None, **kwargs):
qu = super(PresentationReadyWorkCoverageProvider, self).items_that_need_coverage(
identifiers, **kwargs
)
qu = qu.filter(Work.presentation_ready==True)
return qu
class WorkPresentationProvider(PresentationReadyWorkCoverageProvider):
"""Recalculate some part of presentation for works that are
presentation-ready.
A Work's presentation is set when it's made presentation-ready
(thus the name). When that happens, a number of WorkCoverageRecords
are set for that Work.
A migration script may remove a coverage record if it knows a work
needs to have some aspect of its presentation recalculated. These
providers give back the 'missing' coverage.
"""
DEFAULT_BATCH_SIZE = 100
class OPDSEntryWorkCoverageProvider(WorkPresentationProvider):
"""Make sure all presentation-ready works have an up-to-date OPDS
entry.
This is different from the OPDSEntryCacheMonitor, which sweeps
over all presentation-ready works, even ones which are already
covered.
"""
SERVICE_NAME = "OPDS Entry Work Coverage Provider"
OPERATION = WorkCoverageRecord.GENERATE_OPDS_OPERATION
DEFAULT_BATCH_SIZE = 1000
    def process_item(self, work):
work.calculate_opds_entries()
return work
class MARCRecordWorkCoverageProvider(WorkPresentationProvider):
"""Make sure all presentation-ready works have an up-to-date MARC
record.
"""
SERVICE_NAME = "MARC Record Work Coverage Provider"
OPERATION = WorkCoverageRecord.GENERATE_MARC_OPERATION
DEFAULT_BATCH_SIZE = 1000
    def process_item(self, work):
work.calculate_marc_record()
return work
class WorkPresentationEditionCoverageProvider(WorkPresentationProvider):
"""Make sure each Work has an up-to-date presentation edition.
This basically means comparing all the Editions associated with the
Work and building a composite Edition.
    Expensive operations -- calculating work quality, summary, and genre
    classification -- are reserved for WorkClassificationCoverageProvider.
    """
SERVICE_NAME = 'Calculated presentation coverage provider'
OPERATION = WorkCoverageRecord.CHOOSE_EDITION_OPERATION
POLICY = PresentationCalculationPolicy(
choose_edition=True, set_edition_metadata=True, verbose=True,
        # These are the expensive ones, and they're covered by
        # WorkClassificationCoverageProvider.
classify=False, choose_summary=False, calculate_quality=False,
# It would be better if there were a separate class for this
# operation (COVER_OPERATION), but it's a little complicated because
# that's not a WorkCoverageRecord operation.
choose_cover=True,
# We do this even though it's redundant with
# OPDSEntryWorkCoverageProvider. If you change a
# Work's presentation edition but don't update its OPDS entry,
# it effectively didn't happen.
regenerate_opds_entries=True,
# Same logic for the search index. This will flag the Work as
# needing a search index update, and SearchIndexCoverageProvider
# will take care of it.
update_search_index=True,
)
    def process_item(self, work):
"""Recalculate the presentation for a Work."""
# Calling calculate_presentation_edition won't, on its own,
# regenerate the OPDS feeds or update the search index. So we
# call calculate_presentation with a policy that ensures the
# presentation edition will be reevaluated, but nothing
# expensive will happen.
work.calculate_presentation(self.POLICY)
return work
class WorkClassificationCoverageProvider(
WorkPresentationEditionCoverageProvider
):
"""Calculates the 'expensive' parts of a work's presentation:
classifications, summary, and quality.
    We do all three at once because all three require gathering
    together the work's equivalent identifiers, which can be, by far,
    the most expensive part of the job.
This is called 'classification' because that's the most likely use
of this coverage provider. If you want to make sure a bunch of
works get their summaries recalculated, you need to remember that
the coverage record to delete is CLASSIFY_OPERATION.
"""
SERVICE_NAME = "Work classification coverage provider"
DEFAULT_BATCH_SIZE = 20
OPERATION = WorkCoverageRecord.CLASSIFY_OPERATION
# This is going to be expensive -- we might as well recalculate
# everything.
POLICY = PresentationCalculationPolicy.recalculate_everything()