Source code for core.model.coverage

# encoding: utf-8
# BaseCoverageRecord, Timestamp, CoverageRecord, WorkCoverageRecord

from sqlalchemy import (
    Column,
    DateTime,
    Enum,
    ForeignKey,
    Index,
    Integer,
    String,
    Unicode,
    UniqueConstraint,
)
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import (
    and_,
    or_,
    literal,
    literal_column,
)

from . import (
    Base,
    get_one,
    get_one_or_create,
)
from ..util.datetime_helpers import utc_now

class BaseCoverageRecord:
    """Mixin holding the status constants and query helper shared by
    CoverageRecord and WorkCoverageRecord.
    """

    # The possible values of a coverage record's `status`.
    SUCCESS = 'success'
    TRANSIENT_FAILURE = 'transient failure'
    PERSISTENT_FAILURE = 'persistent failure'
    REGISTERED = 'registered'

    ALL_STATUSES = [REGISTERED, SUCCESS, TRANSIENT_FAILURE, PERSISTENT_FAILURE]

    # Every status except 'registered' means coverage was attempted.
    PREVIOUSLY_ATTEMPTED = [SUCCESS, TRANSIENT_FAILURE, PERSISTENT_FAILURE]

    # Unless told otherwise, a record counts as coverage when it ended
    # in success or in persistent failure. A transient failure does
    # not count as coverage.
    DEFAULT_COUNT_AS_COVERED = [SUCCESS, PERSISTENT_FAILURE]

    # Database enum type used by the `status` column of both record tables.
    status_enum = Enum(
        SUCCESS,
        TRANSIENT_FAILURE,
        PERSISTENT_FAILURE,
        REGISTERED,
        name='coverage_status',
    )

    @classmethod
    def not_covered(cls, count_as_covered=None,
                    count_as_not_covered_if_covered_before=None):
        """Build a filter clause matching items that lack coverage.

        :param count_as_covered: A status constant, or a list of status
            constants, naming the kinds of coverage record that count
            as 'coverage' for this query. A falsy value selects
            DEFAULT_COUNT_AS_COVERED.

        :param count_as_not_covered_if_covered_before: If given, a
            coverage record whose timestamp is earlier than this value
            does not count as coverage.

        :return: A clause that can be passed in to Query.filter().
        """
        if isinstance(count_as_covered, (bytes, str)) and count_as_covered:
            # A single status was passed in; treat it as a list of one.
            accepted_statuses = [count_as_covered]
        else:
            accepted_statuses = count_as_covered or cls.DEFAULT_COUNT_AS_COVERED

        # No record at all means no coverage.
        no_record = cls.id == None  # noqa: E711 -- SQL "IS NULL" test

        # A record with a status outside the accepted set does not count.
        wrong_status = ~cls.status.in_(accepted_statuses)
        clause = or_(no_record, wrong_status)

        cutoff = count_as_not_covered_if_covered_before
        if not cutoff:
            return clause

        # A record older than the cutoff does not count, whatever its
        # status.
        return or_(clause, cls.timestamp < cutoff)
class Timestamp(Base):
    """Records when Monitors, CoverageProviders and general scripts
    last ran, and what happened when they did.
    """

    __tablename__ = 'timestamps'

    MONITOR_TYPE = "monitor"
    COVERAGE_PROVIDER_TYPE = "coverage_provider"
    SCRIPT_TYPE = "script"

    # Sentinel meaning "explicitly set this field to None". For most
    # fields, passing None to update() means "leave the field alone",
    # so a distinct marker is needed to clear a value.
    CLEAR_VALUE = object()

    service_type_enum = Enum(
        MONITOR_TYPE, COVERAGE_PROVIDER_TYPE, SCRIPT_TYPE,
        name="service_type",
    )

    # Unique ID
    id = Column(Integer, primary_key=True)

    # Name of the service.
    service = Column(String(255), index=True, nullable=False)

    # Type of the service -- monitor, coverage provider, or script.
    # May be left null for a service that fits none of these categories.
    service_type = Column(service_type_enum, index=True, default=None)

    # The collection, if any, associated with this service -- some
    # services run separately on a number of collections.
    collection_id = Column(
        Integer, ForeignKey('collections.id'), index=True, nullable=True
    )

    # The last time the service _started_ running.
    start = Column(DateTime(timezone=True), nullable=True)

    # The last time the service _finished_ running. In most cases this
    # is the 'timestamp' proper.
    finish = Column(DateTime(timezone=True))

    # A description of what the service achieved during its last run.
    # Each service decides for itself what counts as an 'achievement';
    # this helps distinguish services that do a lot from services that
    # do little, or that run to completion without doing anything.
    achievements = Column(Unicode, nullable=True)

    # One item of state a service may keep between runs. For example,
    # a monitor that iterates over a database table can store the last
    # database ID it processed.
    counter = Column(Integer, nullable=True)

    # The exception, if any, that stopped the service from running
    # during its previous run.
    exception = Column(Unicode, nullable=True)

    def __repr__(self):
        """Summarize the service name, collection, run times and counter."""
        time_format = '%b %d, %Y at %H:%M'
        finish = self.finish.strftime(time_format) if self.finish else None
        start = self.start.strftime(time_format) if self.start else None
        collection = self.collection.name if self.collection else None
        return "<Timestamp %s: collection=%s, start=%s finish=%s counter=%s>" % (
            self.service, collection, start, finish, self.counter
        )

    @classmethod
    def lookup(cls, _db, service, service_type, collection):
        """Fetch the Timestamp for a service/type/collection via get_one().

        :param _db: A database connection.
        :param service: The name of the service.
        :param service_type: The type of the service.
        :param collection: The Collection the service runs on, if any.
        :return: Whatever get_one() returns for these criteria.
        """
        criteria = dict(
            service=service,
            service_type=service_type,
            collection=collection,
        )
        return get_one(_db, Timestamp, **criteria)

    @classmethod
    def value(cls, _db, service, service_type, collection):
        """Return the `finish` time of the matching Timestamp, or None
        when lookup() finds nothing.
        """
        found = cls.lookup(_db, service, service_type, collection)
        return found.finish if found else None

    @classmethod
    def stamp(cls, _db, service, service_type, collection=None, start=None,
              finish=None, achievements=None, counter=None, exception=None):
        """Set a Timestamp, creating it if necessary.

        This should be called once a service has stopped running,
        whether or not it was able to complete its task.

        :param _db: A database connection.
        :param service: The name of the service associated with the
            Timestamp.
        :param service_type: The type of the service associated with
            the Timestamp. This must be one of the values in
            Timestamp.service_type_enum.
        :param collection: The Collection, if any, on which this
            service just ran.
        :param start: The time at which this service started running.
            Defaults to `finish` if that was given, otherwise to now.
        :param finish: The time at which this service stopped running.
            Defaults to `start` if that was given, otherwise to now.
        :param achievements: A human-readable description of what the
            service did during its run.
        :param counter: An integer item of state that the service may
            use to track its progress between runs.
        :param exception: A stack trace for the exception, if any,
            which stopped the service from running.
        :return: The Timestamp that was created or updated.
        """
        if start is None and finish is None:
            start = finish = utc_now()
        else:
            # At most one of the two is missing; borrow it from the other.
            start = finish if start is None else start
            finish = start if finish is None else finish

        timestamp, _ = get_one_or_create(
            _db, Timestamp,
            service=service,
            service_type=service_type,
            collection=collection,
        )
        timestamp.update(start, finish, achievements, counter, exception)

        # Committing immediately reduces the risk of contention.
        _db.commit()
        return timestamp

    def update(self, start=None, finish=None, achievements=None,
               counter=None, exception=None):
        """Set, in one call, every field that is not part of a
        Timestamp's identity.

        For `start`, `finish`, `achievements` and `counter`, None means
        "leave unchanged" and CLEAR_VALUE means "set to None".
        `exception` is always overwritten.
        """
        optional_fields = (
            ('start', start),
            ('finish', finish),
            ('achievements', achievements),
            ('counter', counter),
        )
        for field_name, new_value in optional_fields:
            if new_value is None:
                continue
            if new_value is self.CLEAR_VALUE:
                # None is normally not a valid value to pass for these
                # fields, but CLEAR_VALUE overrides that.
                new_value = None
            setattr(self, field_name, new_value)

        # None is the default value of .exception, so "use the default"
        # and "no exception" coincide and the field is always assigned.
        # CLEAR_VALUE is supported anyway for symmetry.
        self.exception = None if exception is self.CLEAR_VALUE else exception

    def to_data(self):
        """Convert this Timestamp to an unfinalized TimestampData."""
        from ..metadata_layer import TimestampData
        fields = dict(
            start=self.start,
            finish=self.finish,
            achievements=self.achievements,
            counter=self.counter,
        )
        return TimestampData(**fields)

    __table_args__ = (
        UniqueConstraint('service', 'collection_id'),
    )
class CoverageRecord(Base, BaseCoverageRecord):
    """A record of a Identifier being used as input into some process."""

    __tablename__ = 'coveragerecords'

    SET_EDITION_METADATA_OPERATION = 'set-edition-metadata'
    CHOOSE_COVER_OPERATION = 'choose-cover'
    REAP_OPERATION = 'reap'
    IMPORT_OPERATION = 'import'
    RESOLVE_IDENTIFIER_OPERATION = 'resolve-identifier'
    REPAIR_SORT_NAME_OPERATION = 'repair-sort-name'
    METADATA_UPLOAD_OPERATION = 'metadata-upload'

    id = Column(Integer, primary_key=True)
    identifier_id = Column(
        Integer, ForeignKey('identifiers.id'), index=True)

    # If applicable, this is the ID of the data source that took the
    # Identifier as input.
    data_source_id = Column(
        Integer, ForeignKey('datasources.id')
    )
    operation = Column(String(255), default=None)

    timestamp = Column(DateTime(timezone=True), index=True)

    status = Column(BaseCoverageRecord.status_enum, index=True)
    exception = Column(Unicode, index=True)

    # If applicable, this is the ID of the collection for which
    # coverage has taken place. This is currently only applicable
    # for Metadata Wrangler coverage.
    collection_id = Column(
        Integer, ForeignKey('collections.id'), nullable=True
    )

    __table_args__ = (
        # Uniqueness among records that have no collection.
        Index(
            'ix_identifier_id_data_source_id_operation',
            identifier_id, data_source_id, operation,
            unique=True, postgresql_where=collection_id.is_(None)),
        # Uniqueness among records that include a collection.
        Index(
            'ix_identifier_id_data_source_id_operation_collection_id',
            identifier_id, data_source_id, operation, collection_id,
            unique=True
        ),
    )

    def __repr__(self):
        template = '<CoverageRecord: %(timestamp)s identifier=%(identifier_type)s/%(identifier)s data_source="%(data_source)s"%(operation)s status="%(status)s" %(exception)s>'
        return self.human_readable(template)

    def human_readable(self, template):
        """Interpolate data into a human-readable template.

        :param template: A %-style format string using named fields.
            The available names are `timestamp`, `identifier_type`,
            `identifier`, `data_source`, `operation`, `status` and
            `exception`. `operation` and `exception` are pre-formatted
            as ' name="value"' fragments, or '' when unset.
        :return: The interpolated string.
        """
        if self.operation:
            operation = ' operation="%s"' % self.operation
        else:
            operation = ''
        if self.exception:
            exception = ' exception="%s"' % self.exception
        else:
            exception = ''
        return template % dict(
            timestamp=self.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            identifier_type=self.identifier.type,
            identifier=self.identifier.identifier,
            data_source=self.data_source.name,
            operation=operation,
            status=self.status,
            exception=exception,
        )

    @classmethod
    def lookup(cls, edition_or_identifier, data_source, operation=None,
               collection=None):
        """Find the CoverageRecord for an Identifier (or an Edition's
        primary identifier), data source, operation and collection.

        :param edition_or_identifier: An Identifier or an Edition.
        :param data_source: A DataSource, or the name of one (resolved
            with DataSource.lookup).
        :param operation: Optional operation name.
        :param collection: Optional Collection.
        :return: Whatever get_one() returns for these criteria.
        :raise ValueError: If `edition_or_identifier` is neither an
            Identifier nor an Edition.
        """
        from .datasource import DataSource
        from .edition import Edition
        from .identifier import Identifier

        _db = Session.object_session(edition_or_identifier)
        if isinstance(edition_or_identifier, Identifier):
            identifier = edition_or_identifier
        elif isinstance(edition_or_identifier, Edition):
            identifier = edition_or_identifier.primary_identifier
        else:
            # The message must be built from the argument actually
            # passed in; referring to an undefined name here would
            # raise NameError instead of the intended ValueError.
            raise ValueError(
                "Cannot look up a coverage record for %r."
                % edition_or_identifier
            )

        if isinstance(data_source, (bytes, str)):
            data_source = DataSource.lookup(_db, data_source)

        return get_one(
            _db, CoverageRecord,
            identifier=identifier,
            data_source=data_source,
            operation=operation,
            collection=collection,
            on_multiple='interchangeable',
        )

    @classmethod
    def add_for(cls, edition, data_source, operation=None, timestamp=None,
                status=BaseCoverageRecord.SUCCESS, collection=None):
        """Get or create the CoverageRecord for an item, then set its
        status and timestamp.

        :param edition: An Identifier or an Edition.
        :param data_source: The DataSource providing coverage.
        :param operation: Optional operation name.
        :param timestamp: Time of coverage; defaults to now.
        :param status: Status to record; defaults to SUCCESS.
        :param collection: Optional Collection.
        :return: A 2-tuple (CoverageRecord, is_new).
        :raise ValueError: If `edition` is neither an Identifier nor
            an Edition.
        """
        from .edition import Edition
        from .identifier import Identifier

        _db = Session.object_session(edition)
        if isinstance(edition, Identifier):
            identifier = edition
        elif isinstance(edition, Edition):
            identifier = edition.primary_identifier
        else:
            raise ValueError(
                "Cannot create a coverage record for %r." % edition)
        timestamp = timestamp or utc_now()
        coverage_record, is_new = get_one_or_create(
            _db, CoverageRecord,
            identifier=identifier,
            data_source=data_source,
            operation=operation,
            collection=collection,
            on_multiple='interchangeable'
        )
        coverage_record.status = status
        coverage_record.timestamp = timestamp
        return coverage_record, is_new

    @classmethod
    def bulk_add(cls, identifiers, data_source, operation=None, timestamp=None,
                 status=BaseCoverageRecord.SUCCESS, exception=None,
                 collection=None, force=False):
        """Create and update CoverageRecords so that every Identifier in
        `identifiers` has an identical record.

        :param identifiers: A list of Identifiers.
        :param data_source: The DataSource providing coverage.
        :param operation: Optional operation name.
        :param timestamp: Time of coverage; defaults to now.
        :param status: Status to record; defaults to SUCCESS.
        :param exception: Optional exception text to record.
        :param collection: Optional Collection.
        :param force: If true, existing equivalent records are updated
            with the new timestamp, status and exception. Otherwise
            identifiers that already have a record are left alone.
        :return: None if `identifiers` is empty. Otherwise a 2-tuple
            (records, ignored_identifiers): the CoverageRecords that
            were created or updated, and the Identifiers that were
            not touched.
        """
        from .identifier import Identifier

        if not identifiers:
            # Nothing to do.
            return

        _db = Session.object_session(identifiers[0])
        timestamp = timestamp or utc_now()
        identifier_ids = [i.id for i in identifiers]

        equivalent_record = and_(
            cls.operation == operation,
            cls.data_source == data_source,
            cls.collection == collection,
        )

        updated_or_created_results = []
        if force:
            # Make sure that identifiers that previously had a
            # CoverageRecord for this operation have their timestamp
            # and status updated.
            update = cls.__table__.update().where(and_(
                cls.identifier_id.in_(identifier_ids),
                equivalent_record,
            )).values(
                dict(timestamp=timestamp, status=status, exception=exception)
            ).returning(cls.id, cls.identifier_id)
            updated_or_created_results = list(_db.execute(update).fetchall())

        already_covered = _db.query(cls.id, cls.identifier_id).filter(
            equivalent_record,
            cls.identifier_id.in_(identifier_ids),
        ).subquery()

        # Make sure that any identifiers that need a CoverageRecord get one.
        # The SELECT part of the INSERT...SELECT query.
        data_source_id = data_source.id
        collection_id = None
        if collection:
            collection_id = collection.id

        records_to_insert = _db.query(
            Identifier.id.label('identifier_id'),
            literal(operation, type_=String(255)).label('operation'),
            literal(timestamp, type_=DateTime).label('timestamp'),
            literal(status, type_=BaseCoverageRecord.status_enum).label('status'),
            literal(exception, type_=Unicode).label('exception'),
            literal(data_source_id, type_=Integer).label('data_source_id'),
            literal(collection_id, type_=Integer).label('collection_id'),
        ).select_from(Identifier).outerjoin(
            already_covered,
            Identifier.id == already_covered.c.identifier_id,
        ).filter(
            # Keep only identifiers with no matching record in the join.
            already_covered.c.id.is_(None)
        ).filter(
            Identifier.id.in_(identifier_ids)
        )

        # The INSERT part.
        insert = cls.__table__.insert().from_select(
            [
                literal_column('identifier_id'),
                literal_column('operation'),
                literal_column('timestamp'),
                literal_column('status'),
                literal_column('exception'),
                literal_column('data_source_id'),
                literal_column('collection_id'),
            ],
            records_to_insert
        ).returning(cls.id, cls.identifier_id)

        inserts = _db.execute(insert).fetchall()
        updated_or_created_results.extend(inserts)
        _db.commit()

        # Default return for the case when all of the identifiers were
        # ignored.
        new_records = []
        ignored_identifiers = identifiers

        new_and_updated_record_ids = [r[0] for r in updated_or_created_results]
        # A set keeps the membership test below O(1) per identifier.
        impacted_identifier_ids = {r[1] for r in updated_or_created_results}

        if new_and_updated_record_ids:
            new_records = _db.query(cls).filter(cls.id.in_(
                new_and_updated_record_ids
            )).all()

            ignored_identifiers = [
                i for i in identifiers
                if i.id not in impacted_identifier_ids
            ]

        return new_records, ignored_identifiers
# Composite (non-unique) index over CoverageRecord's data_source_id,
# operation and identifier_id columns, declared at module level after
# the class so it can reference the mapped columns directly.
Index("ix_coveragerecords_data_source_id_operation_identifier_id", CoverageRecord.data_source_id, CoverageRecord.operation, CoverageRecord.identifier_id)
class WorkCoverageRecord(Base, BaseCoverageRecord):
    """A record of some operation that was performed on a Work.

    This is similar to CoverageRecord, which operates on Identifiers,
    but since Work identifiers have no meaning outside of the database,
    we presume that all the operations involve internal work only,
    and as such there is no data_source_id.
    """

    __tablename__ = 'workcoveragerecords'

    CHOOSE_EDITION_OPERATION = 'choose-edition'
    CLASSIFY_OPERATION = 'classify'
    SUMMARY_OPERATION = 'summary'
    QUALITY_OPERATION = 'quality'
    GENERATE_OPDS_OPERATION = 'generate-opds'
    GENERATE_MARC_OPERATION = 'generate-marc'
    UPDATE_SEARCH_INDEX_OPERATION = 'update-search-index'

    id = Column(Integer, primary_key=True)
    work_id = Column(Integer, ForeignKey('works.id'), index=True)
    operation = Column(String(255), index=True, default=None)

    timestamp = Column(DateTime(timezone=True), index=True)

    status = Column(BaseCoverageRecord.status_enum, index=True)
    exception = Column(Unicode, index=True)

    __table_args__ = (
        UniqueConstraint('work_id', 'operation'),
    )

    def __repr__(self):
        if self.exception:
            exception = ' exception="%s"' % self.exception
        else:
            exception = ''
        template = '<WorkCoverageRecord: work_id=%s operation="%s" timestamp="%s"%s>'
        return template % (
            self.work_id, self.operation,
            self.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            exception
        )

    @classmethod
    def lookup(cls, work, operation):
        """Find the WorkCoverageRecord for a Work and operation.

        :param work: The Work; its session is used for the query.
        :param operation: The operation name.
        :return: Whatever get_one() returns for these criteria.
        """
        _db = Session.object_session(work)
        return get_one(
            _db, WorkCoverageRecord,
            work=work,
            operation=operation,
            on_multiple='interchangeable',
        )

    @classmethod
    def add_for(cls, work, operation, timestamp=None,
                status=CoverageRecord.SUCCESS):
        """Get or create the WorkCoverageRecord for a Work and
        operation, then set its status and timestamp.

        :param work: The Work that was covered.
        :param operation: The operation name.
        :param timestamp: Time of coverage; defaults to now.
        :param status: Status to record; defaults to SUCCESS.
        :return: A 2-tuple (WorkCoverageRecord, is_new).
        """
        _db = Session.object_session(work)
        timestamp = timestamp or utc_now()
        coverage_record, is_new = get_one_or_create(
            _db, WorkCoverageRecord,
            work=work,
            operation=operation,
            on_multiple='interchangeable'
        )
        coverage_record.status = status
        coverage_record.timestamp = timestamp
        return coverage_record, is_new

    @classmethod
    def bulk_add(cls, works, operation, timestamp=None,
                 status=CoverageRecord.SUCCESS, exception=None):
        """Create and update WorkCoverageRecords so that every Work in
        `works` has an identical record.

        Existing records for `operation` are updated in place; Works
        without one get a new record carrying the same timestamp,
        status and exception. This method does not commit.

        :param works: A list of Works.
        :param operation: The operation name.
        :param timestamp: Time of coverage; defaults to now.
        :param status: Status to record; defaults to SUCCESS.
        :param exception: Optional exception text to record.
        :return: None.
        """
        from .work import Work

        if not works:
            # Nothing to do.
            return

        _db = Session.object_session(works[0])
        timestamp = timestamp or utc_now()
        work_ids = [w.id for w in works]

        # Make sure that works that previously had a
        # WorkCoverageRecord for this operation have their timestamp
        # and status updated.
        update = WorkCoverageRecord.__table__.update().where(
            and_(WorkCoverageRecord.work_id.in_(work_ids),
                 WorkCoverageRecord.operation == operation)
        ).values(dict(timestamp=timestamp, status=status, exception=exception))
        _db.execute(update)

        # Make sure that any works that are missing a
        # WorkCoverageRecord for this operation get one.

        # Works that already have a WorkCoverageRecord will be ignored
        # by the INSERT but handled by the UPDATE.
        already_covered = _db.query(WorkCoverageRecord.work_id).select_from(
            WorkCoverageRecord
        ).filter(
            WorkCoverageRecord.work_id.in_(work_ids)
        ).filter(
            WorkCoverageRecord.operation == operation
        )

        # The SELECT part of the INSERT...SELECT query. `exception` is
        # included so that newly created records match the updated
        # ones instead of silently dropping it.
        new_records = _db.query(
            Work.id.label('work_id'),
            literal(operation, type_=String(255)).label('operation'),
            literal(timestamp, type_=DateTime).label('timestamp'),
            literal(status, type_=BaseCoverageRecord.status_enum).label('status'),
            literal(exception, type_=Unicode).label('exception'),
        ).select_from(
            Work
        )
        new_records = new_records.filter(
            Work.id.in_(work_ids)
        ).filter(
            ~Work.id.in_(already_covered)
        )

        # The INSERT part.
        insert = WorkCoverageRecord.__table__.insert().from_select(
            [
                literal_column('work_id'),
                literal_column('operation'),
                literal_column('timestamp'),
                literal_column('status'),
                literal_column('exception'),
            ],
            new_records
        )
        _db.execute(insert)
# Composite (non-unique) index over WorkCoverageRecord's operation and
# work_id columns, declared at module level after the class so it can
# reference the mapped columns directly.
Index("ix_workcoveragerecords_operation_work_id", WorkCoverageRecord.operation, WorkCoverageRecord.work_id)