# encoding: utf-8
# Collection, CollectionIdentifier, CollectionMissing
from abc import ABCMeta, abstractmethod
from sqlalchemy import (
Column,
exists,
ForeignKey,
func,
Integer,
Table,
Unicode,
UniqueConstraint,
Boolean
)
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import (
backref,
contains_eager,
joinedload,
mapper,
relationship,
)
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import (
and_,
or_,
)
from .configuration import (
    BaseConfigurationStorage,
    ConfigurationSetting,
    ExternalIntegration,
)
from .constants import EditionConstants
from .coverage import (
CoverageRecord,
WorkCoverageRecord,
)
from .datasource import DataSource
from .edition import Edition
from .hasfulltablecache import HasFullTableCache
from .identifier import Identifier
from .integrationclient import IntegrationClient
from .library import Library
from .licensing import (
LicensePool,
LicensePoolDeliveryMechanism,
)
from .work import Work
from . import (
Base,
create,
get_one,
get_one_or_create,
)
from ..util.string_helpers import base64
class Collection(Base, HasFullTableCache):
"""A Collection is a set of LicensePools obtained through some mechanism.
"""
__tablename__ = 'collections'
id = Column(Integer, primary_key=True)
name = Column(Unicode, unique=True, nullable=False, index=True)
DATA_SOURCE_NAME_SETTING = 'data_source'
# For use in forms that edit Collections.
EXTERNAL_ACCOUNT_ID_KEY = 'external_account_id'
# How does the provider of this collection distinguish it from
    # other collections it provides? On the provider's side this is
    # usually called a "library ID".
external_account_id = Column(Unicode, nullable=True)
# How do we connect to the provider of this collection? Any url,
# authentication information, or additional configuration goes
# into the external integration, as does the 'protocol', which
# designates the integration technique we will use to actually get
# the metadata and licenses. Each Collection has a distinct
# ExternalIntegration.
external_integration_id = Column(
Integer, ForeignKey('externalintegrations.id'), unique=True, index=True)
# A Collection may specialize some other Collection. For instance,
# an Overdrive Advantage collection is a specialization of an
# ordinary Overdrive collection. It uses the same access key and
# secret as the Overdrive collection, but it has a distinct
# external_account_id.
parent_id = Column(Integer, ForeignKey('collections.id'), index=True)
# When deleting a collection, this flag is set to True so that the deletion
    # script can take care of deleting it in the background. This is
    # useful for large collections, whose deletion can otherwise time out.
marked_for_deletion = Column(Boolean, default=False)
# A collection may have many child collections. For example,
    # an Overdrive collection may have many children corresponding
# to Overdrive Advantage collections.
children = relationship(
"Collection", backref=backref("parent", remote_side = [id]),
uselist=True
)
# A Collection can provide books to many Libraries.
libraries = relationship(
"Library", secondary=lambda: collections_libraries,
backref="collections"
)
# A Collection can include many LicensePools.
licensepools = relationship(
"LicensePool", backref="collection",
cascade="all, delete-orphan"
)
# A Collection can have many associated Credentials.
credentials = relationship("Credential", backref="collection", cascade="delete")
# A Collection can be monitored by many Monitors, each of which
# will have its own Timestamp.
timestamps = relationship("Timestamp", backref="collection")
catalog = relationship(
"Identifier", secondary=lambda: collections_identifiers,
backref="collections"
)
# A Collection can be associated with multiple CoverageRecords
# for Identifiers in its catalog.
coverage_records = relationship(
"CoverageRecord", backref="collection",
cascade="all"
)
# A collection may be associated with one or more custom lists.
# When a new license pool is added to the collection, it will
# also be added to the list. Admins can remove items from the
    # list and they won't be added back, so the list doesn't
# necessarily match the collection.
customlists = relationship(
"CustomList", secondary=lambda: collections_customlists,
backref="collections"
)
_cache = HasFullTableCache.RESET
_id_cache = HasFullTableCache.RESET
# Most data sources offer different catalogs to different
# libraries. Data sources in this list offer the same catalog to
# every library.
GLOBAL_COLLECTION_DATA_SOURCES = [DataSource.ENKI]
def __repr__(self):
return '<Collection "%s"/"%s" ID=%d>' % (
self.name, self.protocol, self.id
)
    def cache_key(self):
return (self.name, self.external_integration.protocol)
    @classmethod
def by_name_and_protocol(cls, _db, name, protocol):
"""Find or create a Collection with the given name and the given
protocol.
This method uses the full-table cache if possible.
:return: A 2-tuple (collection, is_new)
"""
key = (name, protocol)
def lookup_hook():
return cls._by_name_and_protocol(_db, key)
return cls.by_cache_key(_db, key, lookup_hook)
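    # Illustrative usage sketch (not part of the original module): find or
    # create a Collection by name and protocol. The session variable `_db`
    # and the collection name are hypothetical; the protocol constant comes
    # from ExternalIntegration.
    #
    #     collection, is_new = Collection.by_name_and_protocol(
    #         _db, "Sample OPDS Feed", ExternalIntegration.OPDS_IMPORT
    #     )
    #     if is_new:
    #         collection.external_account_id = "https://example.org/feed"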
@classmethod
def _by_name_and_protocol(cls, _db, cache_key):
"""Find or create a Collection with the given name and the given
protocol.
We can't use get_one_or_create because the protocol is kept in
        a separate database object (an ExternalIntegration).
:return: A 2-tuple (collection, is_new)
"""
name, protocol = cache_key
qu = cls.by_protocol(_db, protocol)
qu = qu.filter(Collection.name==name)
try:
collection = qu.one()
is_new = False
except NoResultFound as e:
# Make a new Collection.
collection, is_new = get_one_or_create(_db, Collection, name=name)
if not is_new and collection.protocol != protocol:
# The collection already exists, it just uses a different
# protocol than the one we asked about.
raise ValueError(
'Collection "%s" does not use protocol "%s".' % (
name, protocol
)
)
integration = collection.create_external_integration(
protocol=protocol
)
collection.external_integration.protocol=protocol
return collection, is_new
    @classmethod
def by_protocol(cls, _db, protocol):
"""Query collections that get their licenses through the given protocol.
Collections marked for deletion are not included.
:param protocol: Protocol to use. If this is None, all
Collections will be returned except those marked for deletion.
"""
        qu = _db.query(Collection).filter(
            Collection.marked_for_deletion==False
        )
        if protocol:
            qu = qu.join(
                ExternalIntegration,
                ExternalIntegration.id==Collection.external_integration_id
            ).filter(
                ExternalIntegration.goal==ExternalIntegration.LICENSE_GOAL
            ).filter(ExternalIntegration.protocol==protocol)
        return qu
    @classmethod
def by_datasource(cls, _db, data_source):
"""Query collections that are associated with the given DataSource.
Collections marked for deletion are not included.
"""
if isinstance(data_source, DataSource):
data_source = data_source.name
qu = _db.query(cls).join(ExternalIntegration,
cls.external_integration_id==ExternalIntegration.id)\
.join(ExternalIntegration.settings)\
.filter(ConfigurationSetting.key==Collection.DATA_SOURCE_NAME_SETTING)\
.filter(ConfigurationSetting.value==data_source).filter(
Collection.marked_for_deletion==False
)
return qu
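    # Illustrative sketch (assumes an open SQLAlchemy session `_db`): both
    # helpers above return Query objects that can be chained or iterated.
    #
    #     opds_collections = Collection.by_protocol(
    #         _db, ExternalIntegration.OPDS_IMPORT
    #     ).all()
    #     enki_collections = Collection.by_datasource(_db, DataSource.ENKI).all()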
@hybrid_property
def protocol(self):
"""What protocol do we need to use to get licenses for this
collection?
"""
return self.external_integration.protocol
@protocol.setter
def protocol(self, new_protocol):
"""Modify the protocol in use by this Collection."""
if self.parent and self.parent.protocol != new_protocol:
raise ValueError(
"Proposed new protocol (%s) contradicts parent collection's protocol (%s)." % (
new_protocol, self.parent.protocol
)
)
self.external_integration.protocol = new_protocol
for child in self.children:
child.protocol = new_protocol
@hybrid_property
def primary_identifier_source(self):
""" Identify if should try to use another identifier than <id> """
return self.external_integration.primary_identifier_source
@primary_identifier_source.setter
def primary_identifier_source(self, new_primary_identifier_source):
""" Modify the primary identifier source in use by this Collection."""
self.external_integration.primary_identifier_source = new_primary_identifier_source
# For collections that can control the duration of the loans they
# create, the durations are stored in these settings and new loans are
# expected to be created using these settings. For collections
    # where loan duration is negotiated out-of-band, all loans are
# _assumed_ to have these durations unless we hear otherwise from
# the server.
AUDIOBOOK_LOAN_DURATION_KEY = 'audio_loan_duration'
EBOOK_LOAN_DURATION_KEY = 'ebook_loan_duration'
STANDARD_DEFAULT_LOAN_PERIOD = 21
    def default_loan_period(self, library, medium=EditionConstants.BOOK_MEDIUM):
"""Until we hear otherwise from the license provider, we assume
that someone who borrows a non-open-access item from this
collection has it for this number of days.
"""
return self.default_loan_period_setting(
library, medium).int_value or self.STANDARD_DEFAULT_LOAN_PERIOD
    def default_loan_period_setting(self, library, medium=EditionConstants.BOOK_MEDIUM):
"""Until we hear otherwise from the license provider, we assume
that someone who borrows a non-open-access item from this
collection has it for this number of days.
"""
_db = Session.object_session(library)
if medium == EditionConstants.AUDIO_MEDIUM:
key = self.AUDIOBOOK_LOAN_DURATION_KEY
else:
key = self.EBOOK_LOAN_DURATION_KEY
if isinstance(library, Library):
return (
ConfigurationSetting.for_library_and_externalintegration(
_db, key, library, self.external_integration
)
)
elif isinstance(library, IntegrationClient):
return self.external_integration.setting(key)
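    # Illustrative sketch (hypothetical `collection` and `library` objects):
    # loan durations are ordinary ConfigurationSettings, so they can be read
    # or overridden per library and per medium.
    #
    #     setting = collection.default_loan_period_setting(
    #         library, EditionConstants.AUDIO_MEDIUM
    #     )
    #     setting.value = "14"
    #     assert collection.default_loan_period(
    #         library, EditionConstants.AUDIO_MEDIUM
    #     ) == 14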
DEFAULT_RESERVATION_PERIOD_KEY = 'default_reservation_period'
STANDARD_DEFAULT_RESERVATION_PERIOD = 3
@hybrid_property
def default_reservation_period(self):
"""Until we hear otherwise from the license provider, we assume
that someone who puts an item on hold has this many days to
check it out before it goes to the next person in line.
"""
return (
self.external_integration.setting(
self.DEFAULT_RESERVATION_PERIOD_KEY,
).int_value or self.STANDARD_DEFAULT_RESERVATION_PERIOD
)
@default_reservation_period.setter
def default_reservation_period(self, new_value):
new_value = int(new_value)
self.external_integration.setting(
self.DEFAULT_RESERVATION_PERIOD_KEY).value = str(new_value)
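    # Illustrative sketch: the reservation period works the same way, as a
    # hybrid property backed by a ConfigurationSetting.
    #
    #     collection.default_reservation_period = 5
    #     assert collection.default_reservation_period == 5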
# When you import an OPDS feed, you may know the intended audience of the works (e.g. children or researchers),
# even though the OPDS feed may not contain that information.
# It should be possible to configure a collection with a default audience,
# so that books imported from the OPDS feed end up with the right audience.
DEFAULT_AUDIENCE_KEY = 'default_audience'
@hybrid_property
def default_audience(self):
"""Return the default audience set up for this collection.
:return: Default audience
:rtype: Optional[str]
"""
setting = self.external_integration.setting(self.DEFAULT_AUDIENCE_KEY)
return setting.value_or_default(None)
@default_audience.setter
def default_audience(self, new_value):
"""Set the default audience for this collection.
:param new_value: New default audience
:type new_value: Optional[str]
"""
setting = self.external_integration.setting(self.DEFAULT_AUDIENCE_KEY)
setting.value = str(new_value)
    def create_external_integration(self, protocol):
"""Create an ExternalIntegration for this Collection.
To be used immediately after creating a new Collection,
e.g. in by_name_and_protocol, from_metadata_identifier, and
various test methods that create mock Collections.
If an external integration already exists, return it instead
of creating another one.
:param protocol: The protocol known to be in use when getting
licenses for this collection.
"""
_db = Session.object_session(self)
goal = ExternalIntegration.LICENSE_GOAL
external_integration, is_new = get_one_or_create(
_db, ExternalIntegration, id=self.external_integration_id,
create_method_kwargs=dict(protocol=protocol, goal=goal)
)
if external_integration.protocol != protocol:
raise ValueError(
"Located ExternalIntegration, but its protocol (%s) does not match desired protocol (%s)." % (
external_integration.protocol, protocol
)
)
self.external_integration_id = external_integration.id
return external_integration
@property
def external_integration(self):
"""Find the external integration for this Collection, assuming
it already exists.
This is generally a safe assumption since by_name_and_protocol and
from_metadata_identifier both create ExternalIntegrations for the
Collections they create.
"""
# We don't enforce this on the database level because it is
# legitimate for a newly created Collection to have no
# ExternalIntegration. But by the time it's being used for real,
# it needs to have one.
if not self.external_integration_id:
raise ValueError(
"No known external integration for collection %s" % self.name
)
return self._external_integration
@property
def unique_account_id(self):
"""Identifier that uniquely represents this Collection of works"""
if (self.data_source
and self.data_source.name in self.GLOBAL_COLLECTION_DATA_SOURCES
and not self.parent):
# Every top-level collection from this data source has the
# same catalog. Treat them all as one collection named
# after the data source.
unique_account_id = self.data_source.name
else:
unique_account_id = self.external_account_id
if not unique_account_id:
raise ValueError("Unique account identifier not set")
if self.parent:
return self.parent.unique_account_id + '+' + unique_account_id
return unique_account_id
@hybrid_property
def data_source(self):
"""Find the data source associated with this Collection.
Bibliographic metadata obtained through the collection
protocol is recorded as coming from this data source. A
LicensePool inserted into this collection will be associated
with this data source, unless its bibliographic metadata
indicates some other data source.
For most Collections, the integration protocol sets the data
source. For collections that use the OPDS import protocol,
the data source is a Collection-specific setting.
"""
data_source = None
name = ExternalIntegration.DATA_SOURCE_FOR_LICENSE_PROTOCOL.get(
self.protocol
)
if not name:
name = self.external_integration.setting(
Collection.DATA_SOURCE_NAME_SETTING
).value
_db = Session.object_session(self)
if name:
data_source = DataSource.lookup(_db, name, autocreate=True)
return data_source
@data_source.setter
def data_source(self, new_value):
if isinstance(new_value, DataSource):
new_value = new_value.name
if self.protocol == new_value:
return
# Only set a DataSource for Collections that don't have an
# implied source.
if self.protocol not in ExternalIntegration.DATA_SOURCE_FOR_LICENSE_PROTOCOL:
setting = self.external_integration.setting(
Collection.DATA_SOURCE_NAME_SETTING
)
if new_value is not None:
new_value = str(new_value)
setting.value = new_value
@property
def parents(self):
if self.parent_id:
_db = Session.object_session(self)
parent = Collection.by_id(_db, self.parent_id)
yield parent
for collection in parent.parents:
yield collection
@property
def metadata_identifier(self):
"""Identifier based on collection details that uniquely represents
this Collection on the metadata wrangler. This identifier is
composed of the Collection protocol and account identifier.
A circulation manager provides a Collection's metadata
identifier as part of collection registration. The metadata
wrangler creates a corresponding Collection on its side,
*named after* the metadata identifier -- regardless of the name
of that collection on the circulation manager side.
"""
account_id = self.unique_account_id
if self.protocol == ExternalIntegration.OPDS_IMPORT:
            # Strip trailing slashes from the OPDS URL; otherwise the same
            # feed could show up as a duplicate collection on the Metadata Wrangler.
while account_id.endswith('/'):
account_id = account_id[:-1]
encode = base64.urlsafe_b64encode
account_id = encode(account_id)
protocol = encode(self.protocol)
metadata_identifier = protocol + ':' + account_id
return encode(metadata_identifier)
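    # Illustrative sketch of the identifier layout (values hypothetical):
    # the protocol and account id are each base64-encoded, joined with ':',
    # and the result is base64-encoded again, mirroring the code above.
    #
    #     protocol = base64.urlsafe_b64encode("OPDS Import")
    #     account_id = base64.urlsafe_b64encode("https://example.org/feed")
    #     metadata_identifier = base64.urlsafe_b64encode(protocol + ':' + account_id)
    #
    # _decode_metadata_identifier() below inverts this transformation.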
    def disassociate_library(self, library):
"""Disassociate a Library from this Collection and delete any relevant
ConfigurationSettings.
"""
        if library is None or library not in self.libraries:
# No-op.
return
self.libraries.remove(library)
_db = Session.object_session(self)
qu = _db.query(
ConfigurationSetting
).filter(
ConfigurationSetting.library==library
).filter(
ConfigurationSetting.external_integration==self.external_integration
)
qu.delete()
@classmethod
def _decode_metadata_identifier(cls, metadata_identifier):
"""Invert the metadata_identifier property."""
if not metadata_identifier:
raise ValueError("No metadata identifier provided.")
try:
decode = base64.urlsafe_b64decode
details = decode(metadata_identifier)
encoded_details = details.split(':', 1)
[protocol, account_id] = [decode(d) for d in encoded_details]
except (TypeError, ValueError) as e:
raise ValueError(
"Metadata identifier '%s' is invalid: %s" % (
metadata_identifier, str(e)
)
)
return protocol, account_id
@property
def pools_with_no_delivery_mechanisms(self):
"""Find all LicensePools in this Collection that have no delivery
mechanisms whatsoever.
:return: A query object.
"""
_db = Session.object_session(self)
qu = LicensePool.with_no_delivery_mechanisms(_db)
return qu.filter(LicensePool.collection==self)
    def explain(self, include_secrets=False):
"""Create a series of human-readable strings to explain a collection's
settings.
:param include_secrets: For security reasons,
sensitive settings such as passwords are not displayed by default.
:return: A list of explanatory strings.
"""
lines = []
if self.name:
lines.append('Name: "%s"' % self.name)
if self.parent:
lines.append('Parent: %s' % self.parent.name)
integration = self.external_integration
if integration.protocol:
lines.append('Protocol: "%s"' % integration.protocol)
for library in self.libraries:
lines.append('Used by library: "%s"' % library.short_name)
if self.external_account_id:
lines.append('External account ID: "%s"' % self.external_account_id)
for setting in sorted(integration.settings, key=lambda x: x.key):
if (include_secrets or not setting.is_secret) and setting.value is not None:
lines.append('Setting "%s": "%s"' % (setting.key, setting.value))
return lines
    def catalog_identifier(self, identifier):
"""Inserts an identifier into a catalog"""
self.catalog_identifiers([identifier])
    def catalog_identifiers(self, identifiers):
"""Inserts identifiers into the catalog"""
if not identifiers:
# Nothing to do.
return
_db = Session.object_session(identifiers[0])
already_in_catalog = _db.query(Identifier).join(
CollectionIdentifier
).filter(
CollectionIdentifier.collection_id==self.id
).filter(
Identifier.id.in_([x.id for x in identifiers])
).all()
new_catalog_entries = [
dict(collection_id=self.id, identifier_id=identifier.id)
for identifier in identifiers
if identifier not in already_in_catalog
]
_db.bulk_insert_mappings(CollectionIdentifier, new_catalog_entries)
_db.commit()
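    # Illustrative sketch (assumes `identifiers` is a list of persisted
    # Identifier objects): identifiers already in the catalog are skipped,
    # so repeated calls are safe.
    #
    #     collection.catalog_identifiers(identifiers)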
    def unresolved_catalog(self, _db, data_source_name, operation):
"""Returns a query with all identifiers in a Collection's catalog that
have unsuccessfully attempted resolution. This method is used on the
metadata wrangler.
:return: a sqlalchemy.Query
"""
coverage_source = DataSource.lookup(_db, data_source_name)
is_not_resolved = and_(
CoverageRecord.operation==operation,
CoverageRecord.data_source_id==coverage_source.id,
CoverageRecord.status!=CoverageRecord.SUCCESS,
)
query = _db.query(Identifier)\
.outerjoin(Identifier.licensed_through)\
.outerjoin(Identifier.coverage_records)\
.outerjoin(LicensePool.work).outerjoin(Identifier.collections)\
.filter(
Collection.id==self.id, is_not_resolved, Work.id==None
).order_by(Identifier.id)
return query
    def licensepools_with_works_updated_since(self, _db, timestamp):
"""Finds all LicensePools in a collection's catalog whose Works' OPDS
entries have been updated since the timestamp. Used by the
metadata wrangler.
        :param _db: A database connection.
        :param timestamp: A datetime object.
:return: a Query that yields LicensePools. The Work and
Identifier associated with each LicensePool have been
pre-loaded, giving the caller all the information
necessary to create full OPDS entries for the works.
"""
opds_operation = WorkCoverageRecord.GENERATE_OPDS_OPERATION
qu = _db.query(
LicensePool
).join(
LicensePool.work,
).join(
LicensePool.identifier,
).join(
Work.coverage_records,
).join(
CollectionIdentifier,
Identifier.id==CollectionIdentifier.identifier_id
)
qu = qu.filter(
WorkCoverageRecord.operation==opds_operation,
CollectionIdentifier.collection_id==self.id
)
qu = qu.options(
contains_eager(LicensePool.work),
contains_eager(LicensePool.identifier),
)
if timestamp:
qu = qu.filter(
WorkCoverageRecord.timestamp > timestamp
)
qu = qu.order_by(WorkCoverageRecord.timestamp)
return qu
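    # Illustrative sketch (metadata wrangler context; `since` is a
    # hypothetical datetime and `build_opds_entry` a hypothetical helper):
    # the Work and Identifier on each LicensePool are already eager-loaded,
    # so OPDS entries can be built without further queries.
    #
    #     for pool in collection.licensepools_with_works_updated_since(_db, since):
    #         build_opds_entry(pool.work, pool.identifier)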
    def isbns_updated_since(self, _db, timestamp):
"""Finds all ISBNs in a collection's catalog that have been updated
        since the timestamp but don't have a Work to show for them. Used in
the metadata wrangler.
:return: a Query
"""
isbns = _db.query(Identifier, func.max(CoverageRecord.timestamp).label('latest'))\
.join(Identifier.collections)\
.join(Identifier.coverage_records)\
.outerjoin(Identifier.licensed_through)\
.group_by(Identifier.id).order_by('latest')\
.filter(
Collection.id==self.id,
LicensePool.work_id==None,
CoverageRecord.status==CoverageRecord.SUCCESS,
).enable_eagerloads(False).options(joinedload(Identifier.coverage_records))
if timestamp:
isbns = isbns.filter(CoverageRecord.timestamp > timestamp)
return isbns
    @classmethod
def restrict_to_ready_deliverable_works(
cls, query, collection_ids=None, show_suppressed=False,
allow_holds=True,
):
"""Restrict a query to show only presentation-ready works present in
an appropriate collection which the default client can
fulfill.
Note that this assumes the query has an active join against
LicensePool and Edition.
:param query: The query to restrict.
:param show_suppressed: Include titles that have nothing but
suppressed LicensePools.
:param collection_ids: Only include titles in the given
collections.
:param allow_holds: If false, pools with no available copies
will be hidden.
"""
# Only find presentation-ready works.
query = query.filter(Work.presentation_ready == True)
# Only find books that have some kind of DeliveryMechanism.
LPDM = LicensePoolDeliveryMechanism
exists_clause = exists().where(
and_(LicensePool.data_source_id==LPDM.data_source_id,
LicensePool.identifier_id==LPDM.identifier_id)
)
query = query.filter(exists_clause)
# Some sources of audiobooks may be excluded because the
# server can't fulfill them or the expected client can't play
# them.
_db = query.session
excluded = ConfigurationSetting.excluded_audio_data_sources(_db)
if excluded:
audio_excluded_ids = [
DataSource.lookup(_db, x).id for x in excluded
]
query = query.filter(
or_(Edition.medium != EditionConstants.AUDIO_MEDIUM,
~LicensePool.data_source_id.in_(audio_excluded_ids))
)
# Only find books with unsuppressed LicensePools.
if not show_suppressed:
query = query.filter(LicensePool.suppressed==False)
# Only find books with available licenses or books from self-hosted collections using MirrorUploader
query = query.filter(
or_(
LicensePool.licenses_owned > 0,
LicensePool.open_access,
LicensePool.unlimited_access,
LicensePool.self_hosted
)
)
# Only find books in an appropriate collection.
if collection_ids is not None:
query = query.filter(
LicensePool.collection_id.in_(collection_ids)
)
# If we don't allow holds, hide any books with no available copies.
if not allow_holds:
query = query.filter(
or_(
LicensePool.licenses_available > 0,
LicensePool.open_access,
LicensePool.self_hosted,
LicensePool.unlimited_access
)
)
return query
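    # Illustrative sketch (the join attributes are assumptions, not taken
    # from this module): the query must already join Work to LicensePool
    # and Edition before being restricted.
    #
    #     qu = _db.query(Work).join(Work.license_pools).join(
    #         LicensePool.presentation_edition
    #     )
    #     qu = Collection.restrict_to_ready_deliverable_works(
    #         qu, collection_ids=[collection.id], allow_holds=False
    #     )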
    def delete(self, search_index=None):
"""Delete a collection.
Collections can have hundreds of thousands of
LicensePools. This deletes a collection gradually in a way
that can be confined to the background and survive interruption.
"""
if not self.marked_for_deletion:
raise Exception(
"Cannot delete %s: it is not marked for deletion." % self.name
)
_db = Session.object_session(self)
# Disassociate all libraries from this collection.
for library in self.libraries:
self.disassociate_library(library)
# Delete all the license pools. This should be the only part
# of the application where LicensePools are permanently
# deleted.
for i, pool in enumerate(self.licensepools):
work = pool.work
_db.delete(pool)
if not i % 100:
_db.commit()
if work and not work.license_pools:
work.delete(search_index)
# Delete the ExternalIntegration associated with this
# Collection, assuming it wasn't deleted already.
if self.external_integration:
_db.delete(self.external_integration)
# Now delete the Collection itself.
_db.delete(self)
_db.commit()
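    # Illustrative sketch: deletion is a two-step process, so that large
    # collections can be removed by a background maintenance script.
    #
    #     collection.marked_for_deletion = True
    #     _db.commit()
    #     collection.delete()   # typically run later, from the deletion script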
collections_libraries = Table(
'collections_libraries', Base.metadata,
Column(
'collection_id', Integer, ForeignKey('collections.id'),
index=True, nullable=False
),
Column(
'library_id', Integer, ForeignKey('libraries.id'),
index=True, nullable=False
),
UniqueConstraint('collection_id', 'library_id'),
)
collections_identifiers = Table(
'collections_identifiers', Base.metadata,
Column(
'collection_id', Integer, ForeignKey('collections.id'),
index=True, nullable=False
),
Column(
'identifier_id', Integer, ForeignKey('identifiers.id'),
index=True, nullable=False
),
UniqueConstraint('collection_id', 'identifier_id'),
)
# Create an ORM model for the collections_identifiers join table
# so it can be used in a bulk_insert_mappings call.
class CollectionIdentifier(object):
pass
class CollectionMissing(Exception):
"""An operation was attempted that can only happen within the context
of a Collection, but there was no Collection available.
"""
mapper(
CollectionIdentifier, collections_identifiers,
primary_key=(
collections_identifiers.columns.collection_id,
collections_identifiers.columns.identifier_id
)
)
collections_customlists = Table(
'collections_customlists', Base.metadata,
Column(
'collection_id', Integer, ForeignKey('collections.id'),
index=True, nullable=False,
),
Column(
'customlist_id', Integer, ForeignKey('customlists.id'),
index=True, nullable=False,
),
UniqueConstraint('collection_id', 'customlist_id'),
)
class HasExternalIntegrationPerCollection(metaclass=ABCMeta):
"""Interface allowing to get access to an external integration"""
    @abstractmethod
def collection_external_integration(self, collection):
"""Returns an external integration associated with the collection
:param collection: Collection
:type collection: core.model.Collection
:return: External integration associated with the collection
:rtype: core.model.configuration.ExternalIntegration
"""
raise NotImplementedError()
class CollectionConfigurationStorage(BaseConfigurationStorage):
"""Serializes and deserializes values as library's configuration settings"""
def __init__(self, external_integration_association, collection):
"""Initializes a new instance of ConfigurationStorage class
:param external_integration_association: Association with an external integtation
:type external_integration_association: HasExternalIntegrationPerCollection
:param collection: Collection object
:type collection: Collection
"""
self._integration_owner = external_integration_association
self._collection_id = collection.id
    def save(self, db, setting_name, value):
"""Save the value as as a new configuration setting
:param db: Database session
:type db: sqlalchemy.orm.session.Session
        :param setting_name: Name of the configuration setting
:type setting_name: string
:param value: Value to be saved
:type value: Any
"""
collection = Collection.by_id(db, self._collection_id)
integration = self._integration_owner.collection_external_integration(collection)
ConfigurationSetting.for_externalintegration(
setting_name,
integration).value = value
    def load(self, db, setting_name):
"""Loads and returns the library's configuration setting
:param db: Database session
:type db: sqlalchemy.orm.session.Session
        :param setting_name: Name of the configuration setting
:type setting_name: string
:return: Any
"""
collection = Collection.by_id(db, self._collection_id)
integration = self._integration_owner.collection_external_integration(collection)
value = ConfigurationSetting.for_externalintegration(
setting_name,
integration).value
return value
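    # Illustrative sketch (`MyAPI` is a hypothetical implementer of
    # HasExternalIntegrationPerCollection): the storage reads and writes
    # ConfigurationSettings scoped to the collection's external integration.
    #
    #     storage = CollectionConfigurationStorage(MyAPI(), collection)
    #     storage.save(_db, "token_url", "https://example.org/token")
    #     assert storage.load(_db, "token_url") == "https://example.org/token"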