# encoding: utf-8
# WorkGenre, Work
import logging
from collections import Counter
from sqlalchemy import (
Boolean,
Column,
DateTime,
Enum,
Float,
ForeignKey,
Integer,
Numeric,
String,
Unicode,
)
from sqlalchemy.dialects.postgresql import INT4RANGE
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import (
contains_eager,
relationship,
)
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import (
and_,
or_,
select,
join,
literal_column,
case,
)
from sqlalchemy.sql.functions import func
from .constants import (
DataSourceConstants,
)
from .contributor import (
Contribution,
Contributor,
)
from .coverage import (
CoverageRecord,
WorkCoverageRecord,
)
from .datasource import DataSource
from .edition import Edition
from .identifier import Identifier
from .measurement import Measurement
from . import (
Base,
flush,
get_one_or_create,
numericrange_to_string,
numericrange_to_tuple,
PresentationCalculationPolicy,
tuple_to_numericrange,
)
from ..classifier import (
Classifier,
WorkClassifier,
)
from ..config import CannotLoadConfiguration
from ..util import LanguageCodes
from ..util.datetime_helpers import utc_now
class WorkGenre(Base):
    """An assignment of a genre to a work."""

    __tablename__ = 'workgenres'
    id = Column(Integer, primary_key=True)
    genre_id = Column(Integer, ForeignKey('genres.id'), index=True)
    work_id = Column(Integer, ForeignKey('works.id'), index=True)
    # Strength of the assignment; __repr__ renders it as a percentage.
    affinity = Column(Float, index=True, default=0)

    @classmethod
    def from_genre(cls, genre):
        """Build a new WorkGenre pointing at `genre`.

        :param genre: The genre to assign.
        :return: A new instance of this class with `.genre` set.
        """
        wg = cls()
        wg.genre = genre
        return wg

    def __repr__(self):
        # A column-level default is applied when the row is inserted,
        # so an object that has not been flushed yet can still have
        # affinity=None. Treat that as zero instead of raising TypeError.
        affinity = self.affinity or 0
        return "%s (%d%%)" % (self.genre.name, affinity * 100)
[docs]class Work(Base):
APPEALS_URI = "http://librarysimplified.org/terms/appeals/"
# The values below are the members of the "appeal" enum used by
# primary_appeal and secondary_appeal.
CHARACTER_APPEAL = "Character"
LANGUAGE_APPEAL = "Language"
SETTING_APPEAL = "Setting"
STORY_APPEAL = "Story"
UNKNOWN_APPEAL = "Unknown"
NOT_APPLICABLE_APPEAL = "Not Applicable"
NO_APPEAL = "None"
CURRENTLY_AVAILABLE = "currently_available"
ALL = "all"
# If no quality data is available for a work, it will be assigned
# a default quality based on where we got it.
#
# The assumption is that a librarian would not have ordered a book
# if it didn't meet a minimum level of quality.
#
# For data sources where librarians tend to order big packages of
# books instead of selecting individual titles, the default
# quality is lower. For data sources where there is no curation at
# all, the default quality is zero.
#
# If there is absolutely no way to get quality data for a curated
# data source, each work is assigned the minimum level of quality
# necessary to show up in featured feeds.
default_quality_by_data_source = {
DataSourceConstants.GUTENBERG: 0,
DataSourceConstants.RB_DIGITAL: 0.4,
DataSourceConstants.OVERDRIVE: 0.4,
DataSourceConstants.BIBLIOTHECA : 0.65,
DataSourceConstants.AXIS_360: 0.65,
DataSourceConstants.STANDARD_EBOOKS: 0.8,
DataSourceConstants.UNGLUE_IT: 0.4,
DataSourceConstants.PLYMPTON: 0.5,
}
__tablename__ = 'works'
id = Column(Integer, primary_key=True)
# One Work may have copies scattered across many LicensePools.
license_pools = relationship("LicensePool", backref="work", lazy='joined')
# A Work takes its presentation metadata from a single Edition.
# But this Edition is a composite of Editions derived from the
# provider, the metadata wrangler, the admin interface, etc.
presentation_edition_id = Column(Integer, ForeignKey('editions.id'), index=True)
# One Work may have many associated WorkCoverageRecords.
coverage_records = relationship(
"WorkCoverageRecord", backref="work",
cascade="all, delete-orphan"
)
# One Work may be associated with many CustomListEntries.
# However, a CustomListEntry may lose its Work without
# ceasing to exist.
custom_list_entries = relationship('CustomListEntry', backref='work')
# One Work may have multiple CachedFeeds, and if a CachedFeed
# loses its Work, it ceases to exist.
cached_feeds = relationship(
'CachedFeed', backref='work', cascade="all, delete-orphan"
)
# One Work may participate in many WorkGenre assignments.
# `genres` exposes the `genre` of each entry in `work_genres`; new
# entries are built with WorkGenre.from_genre.
genres = association_proxy('work_genres', 'genre',
creator=WorkGenre.from_genre)
work_genres = relationship("WorkGenre", backref="work",
cascade="all, delete-orphan")
audience = Column(Unicode, index=True)
target_age = Column(INT4RANGE, index=True)
fiction = Column(Boolean, index=True)
summary_id = Column(
Integer, ForeignKey(
'resources.id', use_alter=True, name='fk_works_summary_id'),
index=True)
# This gives us a convenient place to store a cleaned-up version of
# the content of the summary Resource.
summary_text = Column(Unicode)
# The overall suitability of this work for unsolicited
# presentation to a patron. This is a calculated value taking both
# rating and popularity into account.
quality = Column(Numeric(4,3), index=True)
# The overall rating given to this work.
rating = Column(Float, index=True)
# The overall current popularity of this work.
popularity = Column(Float, index=True)
# Enum type shared by the primary_appeal and secondary_appeal columns.
appeal_type = Enum(CHARACTER_APPEAL, LANGUAGE_APPEAL, SETTING_APPEAL,
STORY_APPEAL, NOT_APPLICABLE_APPEAL, NO_APPEAL,
UNKNOWN_APPEAL, name="appeal")
primary_appeal = Column(appeal_type, default=None, index=True)
secondary_appeal = Column(appeal_type, default=None, index=True)
appeal_character = Column(Float, default=None, index=True)
appeal_language = Column(Float, default=None, index=True)
appeal_setting = Column(Float, default=None, index=True)
appeal_story = Column(Float, default=None, index=True)
# The last time the availability or metadata changed for this Work.
last_update_time = Column(DateTime(timezone=True), index=True)
# This is set to True once all metadata and availability
# information has been obtained for this Work. Until this is True,
# the work will not show up in feeds.
presentation_ready = Column(Boolean, default=False, index=True)
# This is the last time we tried to make this work presentation ready.
presentation_ready_attempt = Column(DateTime(timezone=True), default=None, index=True)
# This is the error that occurred while trying to make this Work
# presentation ready. Until this is cleared, no further attempt
# will be made to make the Work presentation ready.
presentation_ready_exception = Column(Unicode, default=None, index=True)
# A precalculated OPDS entry containing all metadata about this
# work that would be relevant to display to a library patron.
simple_opds_entry = Column(Unicode, default=None)
# A precalculated OPDS entry containing all metadata about this
# work that would be relevant to display in a machine-to-machine
# integration context.
verbose_opds_entry = Column(Unicode, default=None)
# A precalculated MARC record containing metadata about this
# work that would be relevant to display in a library's public
# catalog.
marc_record = Column(String, default=None)
# These fields are potentially large and can be deferred if you
# don't need all the data in a Work.
LARGE_FIELDS = [
'simple_opds_entry', 'verbose_opds_entry', 'marc_record',
'summary_text',
]
@property
def title(self):
    """The presentation edition's title, or None if this Work has no
    presentation edition.
    """
    edition = self.presentation_edition
    if not edition:
        return None
    return edition.title
@property
def sort_title(self):
    """The presentation edition's sort title, falling back to its
    plain title. None if there is no presentation edition.
    """
    edition = self.presentation_edition
    if edition:
        return edition.sort_title or edition.title
    return None
@property
def subtitle(self):
    """The presentation edition's subtitle, or None if there is no
    presentation edition.
    """
    edition = self.presentation_edition
    if edition:
        return edition.subtitle
    return None
@property
def series(self):
    """The presentation edition's series, or None if there is no
    presentation edition.
    """
    edition = self.presentation_edition
    if edition:
        return edition.series
    return None
@property
def series_position(self):
    """The presentation edition's series position, or None if there is
    no presentation edition.
    """
    edition = self.presentation_edition
    if edition:
        return edition.series_position
    return None
@property
def author(self):
    """The presentation edition's author, or None if there is no
    presentation edition.
    """
    edition = self.presentation_edition
    if not edition:
        return None
    return edition.author
@property
def sort_author(self):
    """The presentation edition's sort author, falling back to its
    plain author. None if there is no presentation edition.
    """
    edition = self.presentation_edition
    if edition:
        return edition.sort_author or edition.author
    return None
@property
def language(self):
    """The presentation edition's language, or None if there is no
    presentation edition.
    """
    edition = self.presentation_edition
    if not edition:
        return None
    return edition.language
@property
def language_code(self):
    """The presentation edition's language code, or None if there is
    no presentation edition.
    """
    # NOTE(review): this class defines a second `language_code`
    # property further down. Python keeps the later definition of a
    # class attribute, so this one appears to be shadowed -- confirm
    # and consider removing it.
    edition = self.presentation_edition
    if edition:
        return edition.language_code
    return None
@property
def publisher(self):
    """The presentation edition's publisher, or None if there is no
    presentation edition.
    """
    edition = self.presentation_edition
    if edition:
        return edition.publisher
    return None
@property
def imprint(self):
    """The presentation edition's imprint, or None if there is no
    presentation edition.
    """
    edition = self.presentation_edition
    if edition:
        return edition.imprint
    return None
@property
def cover_full_url(self):
    """The presentation edition's full-size cover URL, or None if
    there is no presentation edition.
    """
    edition = self.presentation_edition
    if edition:
        return edition.cover_full_url
    return None
@property
def cover_thumbnail_url(self):
    """The presentation edition's thumbnail cover URL, or None if
    there is no presentation edition.
    """
    edition = self.presentation_edition
    if edition:
        return edition.cover_thumbnail_url
    return None
@property
def target_age_string(self):
    """This Work's `target_age` range, converted by
    `numericrange_to_string`.
    """
    age_range = self.target_age
    return numericrange_to_string(age_range)
@property
def has_open_access_license(self):
    """True if at least one of this Work's LicensePools is open-access."""
    for pool in self.license_pools:
        if pool.open_access:
            return True
    return False
@property
def complaints(self):
    """All complaints lodged against any of this Work's LicensePools.

    :return: A new list containing the complaints of every pool, in
        pool order.
    """
    # A plain loop instead of a list comprehension used only for its
    # side effects, which built and discarded a list of Nones.
    collected = []
    for pool in self.license_pools:
        collected.extend(pool.complaints)
    return collected
def __repr__(self):
    """Debugging representation: ID, title, author, genre names,
    language, and the number of LicensePools.
    """
    genre_names = ", ".join(genre.name for genre in self.genres)
    pool_count = len(self.license_pools)
    template = '<Work #%s "%s" (by %s) %s lang=%s (%s lp)>'
    return template % (
        self.id, self.title, self.author, genre_names, self.language,
        pool_count,
    )
@classmethod
def missing_coverage_from(
    cls, _db, operation=None, count_as_covered=None,
    count_as_missing_before=None
):
    """Find Works which have no WorkCoverageRecord for the given
    `operation`.

    :param _db: A database session.
    :param operation: The WorkCoverageRecord operation to look for.
    :param count_as_covered: Passed through to
        `WorkCoverageRecord.not_covered`.
    :param count_as_missing_before: Passed through to
        `WorkCoverageRecord.not_covered`.
    :return: A query for Work objects.
    """
    # Outer-join each Work to its coverage record (if any) for this
    # operation, then keep only the rows the not_covered() clause accepts.
    join_condition = and_(
        Work.id==WorkCoverageRecord.work_id,
        WorkCoverageRecord.operation==operation,
    )
    not_covered = WorkCoverageRecord.not_covered(
        count_as_covered, count_as_missing_before
    )
    works = _db.query(Work).outerjoin(WorkCoverageRecord, join_condition)
    return works.filter(not_covered)
@classmethod
def for_unchecked_subjects(cls, _db):
    """Find all Works whose LicensePools have an Identifier that
    is classified under an unchecked Subject.

    This is a good indicator that the Work needs to be
    reclassified.

    :param _db: A database session.
    :return: A query for Work objects, ordered by Subject ID.
    """
    # The docstring must be the first statement of the function; it
    # used to follow these imports, which left __doc__ empty.
    from .classification import (
        Classification,
        Subject,
    )
    from .licensing import LicensePool
    qu = _db.query(Work).join(Work.license_pools).join(
        LicensePool.identifier).join(
            Identifier.classifications).join(
                Classification.subject)
    return qu.filter(Subject.checked==False).order_by(Subject.id)
@classmethod
def _potential_open_access_works_for_permanent_work_id(
    cls, _db, pwid, medium, language
):
    """Find all Works that might be suitable for use as the
    canonical open-access Work for the given `pwid`, `medium`,
    and `language`.

    :param _db: A database session.
    :param pwid: A permanent work ID.
    :param medium: An Edition medium.
    :param language: An Edition language.
    :return: A 2-tuple (pools, counts_by_work). `pools` is a set
        containing all affected LicensePools; `counts_by_work` is a
        Counter tallying the number of affected LicensePools
        associated with a given work.
    """
    from .licensing import LicensePool
    qu = _db.query(LicensePool).join(
        LicensePool.presentation_edition).filter(
            LicensePool.open_access==True
        ).filter(
            Edition.permanent_work_id==pwid
        ).filter(
            Edition.medium==medium
        ).filter(
            Edition.language==language
        )
    pools = set(qu.all())

    # Tally the pools per Work in a single pass. The previous
    # implementation re-scanned every pool for every Work.
    pools_per_work = Counter(lp.work for lp in pools if lp.work)

    # Build the Counter of Works that are eligible to represent
    # this pwid/medium/language combination.
    affected_licensepools_for_work = Counter()
    for work, pool_count in pools_per_work.items():
        pe = work.presentation_edition
        if pe and (
            pe.language != language or pe.medium != medium
            or pe.permanent_work_id != pwid
        ):
            # This Work's presentation edition doesn't match
            # this LicensePool's presentation edition.
            # It would be better to create a brand new Work and
            # remove this LicensePool from its current Work.
            continue
        affected_licensepools_for_work[work] = pool_count
    return pools, affected_licensepools_for_work
@classmethod
def open_access_for_permanent_work_id(cls, _db, pwid, medium, language):
    """Find or create the Work encompassing all open-access LicensePools
    whose presentation Editions have the given permanent work ID,
    the given medium, and the given language.

    This may result in the consolidation or splitting of Works, if
    a book's permanent work ID has changed without
    calculate_work() being called, or if the data is in an
    inconsistent state for any other reason.

    :return: A 2-tuple (work, is_new). `work` is None when no
        LicensePool matches the combination.
    """
    pools, pool_counts = cls._potential_open_access_works_for_permanent_work_id(
        _db, pwid, medium, language
    )
    if not pools:
        # No LicensePool offers this PWID/medium/language
        # combination, so there is no Work for it.
        return None, False

    if not pool_counts:
        # None of these LicensePools have a Work. Create a new one.
        chosen = Work()
        created = True
    else:
        created = False
        # Prefer the Work that already holds the most LicensePools;
        # this minimizes the disruption if a merge is needed.
        chosen, _pool_count = pool_counts.most_common(1)[0]

        if len(pool_counts) > 1:
            # More than one Work claims this combination (probably
            # bad data caused by a bug). First make the chosen Work
            # exclusive to this combination -- otherwise the merge
            # may fail -- then fold every other Work into it.
            chosen.make_exclusive_open_access_for_permanent_work_id(
                pwid, medium, language
            )
            for candidate in list(pool_counts.keys()):
                if candidate != chosen:
                    # The Work being merged must likewise hold only
                    # LicensePools matching this combination.
                    candidate.make_exclusive_open_access_for_permanent_work_id(
                        pwid, medium, language
                    )
                    candidate.merge_into(chosen)

    # Exactly one Work now represents this combination. Assign it to
    # every matching LicensePool.
    for pool in pools:
        pool.work = chosen
    return chosen, created
def make_exclusive_open_access_for_permanent_work_id(self, pwid, medium, language):
    """Ensure that every LicensePool associated with this Work is
    open-access and has the given PWID, medium and language.

    Any non-open-access LicensePool, and any LicensePool with a
    different PWID, medium or language, is kicked out and assigned
    to a different Work. LicensePools with no presentation edition
    or no PWID are kicked out without being given another Work.

    In most cases this Work will be the _only_ work for this PWID,
    but inside open_access_for_permanent_work_id this is called as
    a preparatory step for merging two Works, and after the call
    (but before the merge) there may be two Works for a given PWID.
    """
    _db = Session.object_session(self)
    # Loop over a copy of the list, since the body reassigns pool.work.
    for pool in list(self.license_pools):
        replacement = created = None
        if not pool.open_access:
            # Open-access and commercial versions of the same book are
            # never mixed, so this pool needs a Work of its own.
            pool.work = None
            if pool.presentation_edition:
                pool.presentation_edition.work = None
            replacement, created = pool.calculate_work()
        elif not pool.presentation_edition:
            # A LicensePool with no presentation edition
            # cannot have an associated Work.
            logging.warning(
                "LicensePool %r has no presentation edition, setting .work to None.",
                pool
            )
            pool.work = None
        else:
            edition = pool.presentation_edition
            edition_pwid = edition.permanent_work_id
            if not edition_pwid:
                # A LicensePool with no permanent work ID
                # cannot have an associated Work.
                logging.warning(
                    "Presentation edition for LicensePool %r has no PWID, setting .work to None.",
                    pool
                )
                edition.work = None
                pool.work = None
            elif not (
                edition_pwid == pwid
                and edition.medium == medium
                and edition.language == language
            ):
                # This LicensePool belongs to some other Work. Detach
                # it and find (or create) the Work it should be in.
                pool.work = None
                pool.presentation_edition.work = None
                replacement, created = Work.open_access_for_permanent_work_id(
                    _db, edition_pwid, edition.medium, edition.language
                )
        if replacement and created:
            replacement.calculate_presentation()
@property
def pwids(self):
    """Return the set of permanent work IDs associated with this Work.

    There should only be one permanent work ID associated with a
    given work, but if there is more than one, this will find all
    of them. Pools without a presentation edition, and editions
    without a permanent work ID, contribute nothing.
    """
    editions = (pool.presentation_edition for pool in self.license_pools)
    return {
        edition.permanent_work_id
        for edition in editions
        if edition and edition.permanent_work_id
    }
def merge_into(self, other_work):
    """Merge this Work into another Work and delete it.

    :param other_work: The Work that will absorb this Work's
        LicensePools.
    :raises ValueError: If either Work has a non-open-access
        LicensePool, or if the two Works do not share the same set
        of permanent work IDs.
    """
    # Neither the source nor the destination work may have any
    # non-open-access LicensePools.
    for w in self, other_work:
        for pool in w.license_pools:
            if not pool.open_access:
                raise ValueError(
                    "Refusing to merge %r into %r because it would put an open-access LicensePool into the same work as a non-open-access LicensePool." %
                    (self, other_work)
                )
    my_pwids = self.pwids
    other_pwids = other_work.pwids
    if my_pwids != other_pwids:
        raise ValueError(
            "Refusing to merge %r into %r because permanent work IDs don't match: %s vs. %s" % (
                self, other_work, ",".join(sorted(my_pwids)),
                ",".join(sorted(other_pwids))
            )
        )

    # Every LicensePool associated with this work becomes
    # associated instead with the other work.
    #
    # Iterate over a copy: `license_pools` is declared with
    # backref="work", so moving a pool to the other Work takes it out
    # of this Work's list. Looping over the live list would skip pools.
    for pool in list(self.license_pools):
        other_work.license_pools.append(pool)

    # All WorkGenres and WorkCoverageRecords for this Work are
    # deleted. (WorkGenres are deleted via cascade.)
    _db = Session.object_session(self)
    for cr in self.coverage_records:
        _db.delete(cr)
    _db.delete(self)
    other_work.calculate_presentation()
def set_summary(self, resource):
    """Make `resource` this Work's summary and cache its text.

    `summary_text` becomes the unicode content of the resource's
    representation, or the empty string if there is no resource or
    no representation. A WorkCoverageRecord is added for the
    summary operation.

    :param resource: The summary Resource, or None.
    """
    self.summary = resource
    # TODO: clean up the content
    if not resource or not resource.representation:
        self.summary_text = ""
    else:
        self.summary_text = resource.representation.unicode_content
    WorkCoverageRecord.add_for(
        self, operation=WorkCoverageRecord.SUMMARY_OPERATION
    )
@classmethod
def with_genre(cls, _db, genre):
    """Find all Works classified under the given genre.

    :param _db: A database session.
    :param genre: A genre, or a `str`/`bytes` value that is resolved
        with `Genre.lookup`.
    :return: A query for Work objects.
    """
    from .classification import Genre
    if isinstance(genre, (bytes, str)):
        genre, _ignored = Genre.lookup(_db, genre)
    works = _db.query(Work).join(WorkGenre)
    return works.filter(WorkGenre.genre==genre)
@classmethod
def with_no_genres(cls, q):
    """Modify a query so it finds only Works that are not classified under
    any genre.

    :param q: A query to restrict.
    :return: The restricted query.
    """
    # The first parameter was named `self`, although a classmethod
    # receives the class. It is always bound implicitly, so the rename
    # does not affect callers.
    q = q.outerjoin(Work.work_genres)
    q = q.options(contains_eager(Work.work_genres))
    q = q.filter(WorkGenre.genre==None)
    return q
@classmethod
def from_identifiers(cls, _db, identifiers, base_query=None, policy=None):
    """Returns all of the works that have one or more license_pools
    associated with either an identifier in the given list or an
    identifier considered equivalent to one of those listed.

    :param _db: A database session.
    :param identifiers: An iterable of objects with an `.id`.
    :param base_query: An optional query to restrict. By default, a
        query joining Work to its LicensePools and their Identifiers.
    :param policy: A PresentationCalculationPolicy, used to
        determine how far to go when looking for equivalent
        Identifiers. By default, this method will be very strict
        about equivalencies.
    :return: A query, or None if `identifiers` is empty.
    """
    from .licensing import LicensePool

    identifier_ids = [identifier.id for identifier in identifiers]
    if not identifier_ids:
        return None

    if not base_query:
        # A raw base query that makes no accommodations for works that are
        # suppressed or otherwise undeliverable.
        works = _db.query(Work).join(Work.license_pools)
        base_query = works.join(LicensePool.identifier)

    if policy is None:
        policy = PresentationCalculationPolicy(
            equivalent_identifier_levels=1,
            equivalent_identifier_threshold=0.999
        )

    equivalent_ids = Identifier.recursively_equivalent_identifier_ids_query(
        Identifier.id, policy=policy
    ).where(Identifier.id.in_(identifier_ids))
    return base_query.filter(Identifier.id.in_(equivalent_ids))
@classmethod
def reject_covers(cls, _db, works_or_identifiers,
                  search_index_client=None):
    """Suppresses the currently visible covers of a number of Works.

    :param _db: A database session.
    :param works_or_identifiers: An iterable containing either Works
        or Identifiers (not a mixture). May be empty.
    :param search_index_client: Passed through to
        `calculate_presentation` for each affected Work.
    """
    works = list(set(works_or_identifiers))
    if not works:
        # Nothing to suppress. Without this guard, works[0] below
        # raised IndexError on empty input.
        return

    from .licensing import LicensePool
    from .resource import (
        Resource,
        Hyperlink,
    )

    if not isinstance(works[0], cls):
        # This assumes that everything in the provided list is the
        # same class: either Work or Identifier.
        works = cls.from_identifiers(_db, works_or_identifiers).all()
    work_ids = [w.id for w in works]

    if len(works) == 1:
        logging.info("Suppressing cover for %r", works[0])
    else:
        logging.info("Supressing covers for %i Works", len(works))

    cover_urls = list()
    for work in works:
        # Create a list of the URLs of the works' active cover images.
        edition = work.presentation_edition
        if edition:
            if edition.cover_full_url:
                cover_urls.append(edition.cover_full_url)
            if edition.cover_thumbnail_url:
                cover_urls.append(edition.cover_thumbnail_url)

    if not cover_urls:
        # All of the target Works have already had their
        # covers suppressed. Nothing to see here.
        return

    covers = _db.query(Resource).join(Hyperlink.identifier).\
        join(Identifier.licensed_through).filter(
            Resource.url.in_(cover_urls),
            LicensePool.work_id.in_(work_ids)
        )

    editions = list()
    for cover in covers:
        # Record a downvote that will dismiss the Resource.
        cover.reject()
        if len(cover.cover_editions) > 1:
            editions += cover.cover_editions
    flush(_db)

    editions = list(set(editions))
    if editions:
        # More Editions and Works have been impacted by this cover
        # suppression.
        works += [ed.work for ed in editions if ed.work]
        editions = [ed for ed in editions if not ed.work]

    # Remove the cover from the Work and its Edition and reset
    # cached OPDS entries.
    policy = PresentationCalculationPolicy.reset_cover()
    for work in works:
        work.calculate_presentation(
            policy=policy, search_index_client=search_index_client
        )
    for edition in editions:
        edition.calculate_presentation(policy=policy)
    _db.commit()
def reject_cover(self, search_index_client=None):
    """Suppresses the current cover of the Work.

    :param search_index_client: Passed through to `reject_covers`.
    """
    _db = Session.object_session(self)
    # This used to call `self.suppress_covers`. The visible part of
    # this class defines no such method, but it does define the
    # classmethod `reject_covers`, which takes exactly these arguments.
    self.reject_covers(
        _db, [self], search_index_client=search_index_client
    )
def all_editions(self, policy=None):
    """All Editions identified by an Identifier equivalent to
    the identifiers of this Work's license pools.

    :param policy: A PresentationCalculationPolicy, used to
        determine how far to go when looking for equivalent
        Identifiers.
    :return: A query for Edition objects.
    """
    from .licensing import LicensePool
    _db = Session.object_session(self)
    # Subquery: IDs of every Identifier equivalent to the identifier
    # of one of this Work's LicensePools.
    equivalent_ids = Identifier.recursively_equivalent_identifier_ids_query(
        LicensePool.identifier_id, policy=policy
    ).where(LicensePool.work_id==self.id)
    return _db.query(Edition).filter(
        Edition.primary_identifier_id.in_(equivalent_ids)
    )
@property
def _direct_identifier_ids(self):
    """Return all Identifier IDs associated with one of this
    Work's LicensePools. Pools without an identifier are skipped.
    """
    identifier_ids = []
    for pool in self.license_pools:
        if pool.identifier:
            identifier_ids.append(pool.identifier.id)
    return identifier_ids
def all_identifier_ids(self, policy=None):
    """Return all Identifier IDs associated with this Work.

    :param policy: A `PresentationCalculationPolicy`.
    :return: A set containing all Identifier IDs associated
        with this Work (as per the rules set down in `policy`).
    """
    _db = Session.object_session(self)
    # Maps each direct identifier ID to a collection of its equivalents.
    equivalents_by_id = Identifier.recursively_equivalent_identifier_ids(
        _db, self._direct_identifier_ids, policy=policy
    )
    # Flatten all of the equivalents into one set.
    return set().union(*equivalents_by_id.values())
@property
def language_code(self):
    """A single 2-letter language code for display purposes.

    Returns None if this Work has no language. A language that is
    not a key of `LanguageCodes.three_to_two` is returned unchanged.
    """
    code = self.language
    if not code:
        return None
    mapping = LanguageCodes.three_to_two
    return mapping[code] if code in mapping else code
def age_appropriate_for_patron(self, patron):
    """Is this Work age-appropriate for the given Patron?

    :param patron: A Patron, or None. With no patron, every Work is
        considered appropriate.
    :return: A boolean
    """
    if patron is not None:
        return patron.work_is_age_appropriate(self.audience, self.target_age)
    return True
def set_presentation_edition(self, new_presentation_edition):
    """Sets presentation edition and lets owned pools and editions know.

    :param new_presentation_edition: The Edition to use.
    :raises ValueError: If `new_presentation_edition` is None (or
        otherwise falsy).
    """
    if not new_presentation_edition:
        raise ValueError(
            "Trying to set presentation_edition to None on Work [%s]" % self.id
        )

    self.presentation_edition = new_presentation_edition

    # Every license pool that uses this edition as its presentation
    # edition now belongs to this Work.
    for pool in self.presentation_edition.is_presentation_for:
        pool.work = self
def calculate_presentation_edition(self, policy=None):
    """Which of this Work's Editions should be used as the default?

    First, every LicensePool associated with this work must have
    its presentation edition set.

    Then, we go through the pools, see which has the best presentation
    edition, and make it our presentation edition.

    :param policy: A PresentationCalculationPolicy. If its
        `choose_edition` is false, nothing is done.
    :return: A truthy value if a pool reported a change to its
        presentation edition or this Work's presentation edition was
        replaced; False if the policy says not to choose an edition.
    """
    policy = policy or PresentationCalculationPolicy()
    if not policy.choose_edition:
        return False

    # A superceded or suppressed LicensePool is unlikely to have the
    # best edition. (An open-access pool may be "superceded" if a
    # better-quality open-access pool is available.)
    self.mark_licensepools_as_superceded()

    previous_edition = self.presentation_edition
    chosen_edition = None
    metadata_changed = False

    for pool in self.license_pools:
        # Note: this assumes we never end up with every pool of the
        # work marked as superceded or suppressed.
        if pool.superceded or pool.suppressed:
            continue

        # Make sure the pool has an up-to-date idea of its
        # presentation edition, then ask what it is.
        pool_changed = pool.set_presentation_edition()
        metadata_changed = metadata_changed or pool_changed
        candidate = pool.presentation_edition

        # We currently have no real way to choose between
        # competing presentation editions. But it doesn't matter much
        # because in the current system there should never be more
        # than one non-superceded license pool per Work.
        #
        # So: take the first available edition, but prefer the current
        # presentation edition whenever it is still an option, to
        # avoid changing it unnecessarily.
        is_current = candidate is previous_edition and previous_edition
        if not chosen_edition or is_current:
            chosen_edition = candidate

    if self.presentation_edition != chosen_edition and chosen_edition != None:
        # A pool's presentation edition replaces the Work's.
        self.set_presentation_edition(chosen_edition)

    # Record that an attempt was made to choose the edition.
    WorkCoverageRecord.add_for(
        self, operation=WorkCoverageRecord.CHOOSE_EDITION_OPERATION
    )

    return metadata_changed or previous_edition != self.presentation_edition
def _get_default_audience(self):
    """Return the default audience.

    This is the first truthy `default_audience` found on the
    collection of one of this Work's LicensePools.

    :return: Default audience
    :rtype: Optional[str]
    """
    audiences = (
        pool.collection.default_audience for pool in self.license_pools
    )
    return next((audience for audience in audiences if audience), None)
def calculate_presentation(
    self, policy=None, search_index_client=None, exclude_search=False,
    default_fiction=None, default_audience=None
):
    """Make a Work ready to show to patrons.

    Call calculate_presentation_edition() to find the best-quality
    presentation edition that could represent this work.

    Then determine the following information, global to the work:

    * Subject-matter classifications for the work.
    * Whether or not the work is fiction.
    * The intended audience for the work.
    * The best available summary for the work.
    * The overall popularity of the work.

    :param policy: A PresentationCalculationPolicy controlling which
        steps run. Defaults to a new PresentationCalculationPolicy.
    :param search_index_client: Not used by this method.
    :param exclude_search: If true, never flag the external search
        index as needing an update.
    :param default_fiction: Passed through to `assign_genres`.
    :param default_audience: Passed through to `assign_genres`. If
        falsy, `_get_default_audience()` supplies the value.
    :return: None.
    """
    if not default_audience:
        default_audience = self._get_default_audience()

    policy = policy or PresentationCalculationPolicy()
    classification_changed = False
    edition_changed = self.calculate_presentation_edition(policy)

    if not self.presentation_edition:
        # Without a presentation edition, we can't calculate presentation
        # for the work.
        return

    if policy.choose_cover or policy.set_edition_metadata:
        cover_changed = self.presentation_edition.calculate_presentation(policy)
        edition_changed = edition_changed or cover_changed

    # Remember the current values so we can tell afterwards whether
    # anything actually changed.
    summary = self.summary
    summary_text = self.summary_text
    quality = self.quality

    # If we find a cover or description that comes direct from a
    # license source, it may short-circuit the process of finding
    # a good cover or description.
    licensed_data_sources = set()
    for pool in self.license_pools:
        # Descriptions from Gutenberg are useless, so we
        # specifically exclude it from being a privileged data
        # source.
        if pool.data_source.name != DataSourceConstants.GUTENBERG:
            licensed_data_sources.add(pool.data_source)

    if policy.classify or policy.choose_summary or policy.calculate_quality:
        # Find all related IDs that might have associated descriptions,
        # classifications, or measurements.
        direct_identifier_ids = self._direct_identifier_ids
        all_identifier_ids = self.all_identifier_ids(policy=policy)
    else:
        # Don't bother.
        direct_identifier_ids = all_identifier_ids = []

    if policy.classify:
        classification_changed = self.assign_genres(
            all_identifier_ids,
            default_fiction=default_fiction,
            default_audience=default_audience
        )
        WorkCoverageRecord.add_for(
            self, operation=WorkCoverageRecord.CLASSIFY_OPERATION
        )

    if policy.choose_summary:
        self._choose_summary(
            direct_identifier_ids, all_identifier_ids,
            licensed_data_sources
        )

    if policy.calculate_quality:
        # In the absence of other data, we will make a rough
        # judgement as to the quality of a book based on the
        # license source. Commercial data sources have higher
        # default quality, because it's presumed that a librarian
        # put some work into deciding which books to buy.
        known_qualities = [
            q for q in (
                self.default_quality_by_data_source.get(source.name, None)
                for source in licensed_data_sources
            )
            if q is not None
        ]
        # Use the best known default; if nothing is known, use an
        # integer zero rather than None.
        default_quality = max(known_qualities, default=0) or 0
        self.calculate_quality(
            all_identifier_ids, default_quality
        )

    if self.summary_text:
        if isinstance(self.summary_text, str):
            new_summary_text = self.summary_text
        else:
            new_summary_text = self.summary_text.decode("utf8")
    else:
        new_summary_text = self.summary_text

    def _as_float(value):
        # quality can be None when it has never been calculated;
        # float(None) would raise TypeError.
        return None if value is None else float(value)

    changed = (
        edition_changed or
        classification_changed or
        summary != self.summary or
        summary_text != new_summary_text or
        _as_float(quality) != _as_float(self.quality)
    )

    if changed:
        # last_update_time tracks the last time the data actually
        # changed, not the last time we checked whether or not to
        # change it.
        self.last_update_time = utc_now()

    if changed or policy.regenerate_opds_entries:
        self.calculate_opds_entries()

    if changed or policy.regenerate_marc_record:
        self.calculate_marc_record()

    if (changed or policy.update_search_index) and not exclude_search:
        self.external_index_needs_updating()

    # Now that everything's calculated, print it out.
    if policy.verbose:
        # TODO: maybe return `changed` as the method result
        if changed:
            status = "changed"
            representation = self.detailed_representation
        else:
            status = "unchanged"
            representation = repr(self)
        logging.info("Presentation %s for work: %s", status, representation)

    # We want works to be presentation-ready as soon as possible,
    # unless they are missing crucial information like language or
    # title.
    self.set_presentation_ready_based_on_content()
def _choose_summary(
    self, direct_identifier_ids, all_identifier_ids,
    licensed_data_sources
):
    """Pick a summary for this Work and store it via set_summary().

    Used as part of presentation calculation. Summaries closer to a
    LicensePool, or from a more trusted source, are preferred.

    :param direct_identifier_ids: All IDs of Identifiers of LicensePools
        directly associated with this Work. These are tried first; in
        the real world they yield a summary almost all the time.
    :param all_identifier_ids: All IDs of Identifiers of LicensePools
        associated (directly or indirectly) with this Work. These are
        tried only if `direct_identifier_ids` produced no summary.
    :param licensed_data_sources: A list of DataSources that should be
        given priority -- either because they provided the books or
        because they are trusted sources such as library staff.
    """
    session = Session.object_session(self)
    library_staff = DataSource.lookup(
        session, DataSourceConstants.LIBRARY_STAFF
    )
    # Two entries: the library staff source, then the caller's list of
    # sources passed along as a single nested entry.
    privileged_sources = [library_staff, licensed_data_sources]

    chosen = None
    for identifier_ids in (direct_identifier_ids, all_identifier_ids):
        chosen, _ = Identifier.evaluate_summary_quality(
            session, identifier_ids, privileged_sources
        )
        if chosen:
            # The closest set of identifiers gave us a summary; stop.
            break
    self.set_summary(chosen)
@property
def detailed_representation(self):
    """A multi-line description of this work, more detailed than repr().

    :return: A single string with one line per fact (title/author,
        language, quality, primary identifier, classification, genres,
        covers, downloads and a snippet of the summary).
    """

    def _as_text(value):
        # Falsy values become "", bytes are decoded leniently as UTF-8.
        if not value:
            return ""
        if isinstance(value, str):
            return value
        return value.decode("utf8", "replace")

    lines = [
        "%s (by %s)" % (self.title, self.author),
        " language=%s" % self.language,
        " quality=%s" % self.quality,
    ]

    edition = self.presentation_edition
    primary_identifier = None
    if edition and edition.primary_identifier:
        primary_identifier = edition.primary_identifier
    lines.append(" primary id=%s" % primary_identifier)

    # Three states: truthy, explicitly False, or unknown.
    if self.fiction:
        fiction = "Fiction"
    else:
        fiction = "Nonfiction" if self.fiction == False else "???"

    age_range = self.target_age
    target_age = ""
    if age_range and (age_range.upper or age_range.lower):
        target_age = " age=" + self.target_age_string
    lines.append(
        " %(fiction)s a=%(audience)s%(target_age)r" % {
            "fiction": fiction,
            "audience": self.audience,
            "target_age": target_age,
        }
    )

    lines.append(" " + ", ".join(repr(wg) for wg in self.work_genres))

    full_cover = self.cover_full_url
    lines.append(
        (" Full cover: %s" % full_cover) if full_cover
        else " No full cover."
    )
    thumbnail = self.cover_thumbnail_url
    lines.append(
        (" Cover thumbnail: %s" % thumbnail) if thumbnail
        else " No thumbnail cover."
    )

    # Superceded pools are ignored entirely.
    live_pools = [pool for pool in self.license_pools if not pool.superceded]
    expect_downloads = any(pool.open_access for pool in live_pools)
    downloads = [
        mechanism.resource
        for pool in live_pools
        for mechanism in pool.delivery_mechanisms
        if mechanism.resource and mechanism.resource.final_url
    ]
    if downloads:
        lines.append(" Open-access downloads:")
        lines.extend(" " + resource.final_url for resource in downloads)
    elif expect_downloads:
        lines.append(" Expected open-access downloads but found none.")

    summary = self.summary
    if summary and summary.representation:
        # Only the first 100 characters of the summary are shown.
        snippet = _as_text(summary.representation.content)[:100]
        lines.append(" Description (%.2f) %s" % (summary.quality, snippet))

    return "\n".join(_as_text(line) for line in lines)
def calculate_opds_entries(self, verbose=True):
    """Generate the OPDS entries for this Work and record that it was done.

    A single entry is always created with the plain Annotator. A second
    one is created with the VerboseAnnotator only when `verbose` is
    exactly True. Afterwards a WorkCoverageRecord is added for the
    GENERATE_OPDS_OPERATION.

    :param verbose: Pass anything other than True to skip the verbose
        entry.
    """
    from ..opds import (
        AcquisitionFeed,
        Annotator,
        VerboseAnnotator,
    )

    session = Session.object_session(self)
    annotators = [Annotator]
    if verbose is True:
        annotators.append(VerboseAnnotator)
    for annotator in annotators:
        # The returned entries are not needed here.
        AcquisitionFeed.single_entry(
            session, self, annotator, force_create=True
        )
    WorkCoverageRecord.add_for(
        self, operation=WorkCoverageRecord.GENERATE_OPDS_OPERATION
    )
def calculate_marc_record(self):
    """Generate the MARC record for this Work and record that it was done.

    Calls MARCExporter.create_record() with `force_create=True` and then
    adds a WorkCoverageRecord for the GENERATE_MARC_OPERATION.
    """
    from ..marc import (
        Annotator,
        MARCExporter
    )

    # The returned record is not used here, so it is not bound to a
    # name. The session lookup the method used to perform was likewise
    # never used and has been dropped.
    MARCExporter.create_record(
        self, annotator=Annotator, force_create=True)
    WorkCoverageRecord.add_for(
        self, operation=WorkCoverageRecord.GENERATE_MARC_OPERATION
    )
def active_license_pool(self):
    """Return the LicensePool that *would* be associated with a loan,
    were a loan to be issued right now.

    Superceded pools are ignored. An open-access pool with a usable
    link wins immediately. Otherwise the last pool that is either
    unlimited-access/self-hosted, or that has a titled presentation
    edition and at least one owned license, is returned.

    :return: A LicensePool, or None if no pool qualifies.
    """
    chosen = None
    for pool in self.license_pools:
        if pool.superceded:
            continue
        edition = pool.presentation_edition
        if pool.open_access:
            if pool.best_open_access_link:
                # We have an unlimited source for this book.
                # There's no need to keep looking.
                return pool
            # An open-access pool without a link is never selected.
            continue
        if pool.unlimited_access or pool.self_hosted:
            chosen = pool
            continue
        if edition and edition.title and pool.licenses_owned > 0:
            chosen = pool
    return chosen
def _reset_coverage(self, operation):
    """Put this work's WorkCoverageRecord for the given `operation`
    into the REGISTERED state.

    This is useful for erasing the record of work that was done,
    so that automated scripts know the work needs to be done
    again.

    :param operation: The WorkCoverageRecord operation to reset.
    :return: A WorkCoverageRecord.
    """
    # add_for() returns a (record, is_new) pair; only the record is of
    # interest to callers. The previously unused session lookup has
    # been removed.
    record, _ = WorkCoverageRecord.add_for(
        self, operation=operation, status=CoverageRecord.REGISTERED
    )
    return record
def external_index_needs_updating(self):
    """Flag this work so that its search document gets reindexed.

    Rather than reindexing immediately, this resets the work's
    WorkCoverageRecord for the search-index operation. That is more
    efficient, since these WorkCoverageRecords are handled in large
    batches.

    :return: The WorkCoverageRecord that was reset.
    """
    operation = WorkCoverageRecord.UPDATE_SEARCH_INDEX_OPERATION
    return self._reset_coverage(operation)
def update_external_index(self, client, add_coverage_record=True):
    """Deprecated alias for external_index_needs_updating().

    Creates a WorkCoverageRecord so that this work's entry in the
    search index can be modified or deleted. Both `client` and
    `add_coverage_record` are accepted but ignored, and nothing is
    returned. New code should call external_index_needs_updating()
    directly.
    """
    self.external_index_needs_updating()
def needs_full_presentation_recalculation(self):
    """Flag this work as needing a complete presentation recalculation.

    The time spent recalculating is shifted to a script dedicated to
    that purpose, rather than a script that interacts with APIs. It is
    also more efficient, since a work might be flagged multiple times
    before the recalculation actually happens.

    :return: The WorkCoverageRecord that was reset.
    """
    operation = WorkCoverageRecord.CLASSIFY_OPERATION
    return self._reset_coverage(operation)
def needs_new_presentation_edition(self):
    """Flag this work as needing its presentation edition regenerated.

    This is significantly less work than calling
    needs_full_presentation_recalculation, but it will not update a
    Work's quality score, summary, or genre classification.

    :return: The WorkCoverageRecord that was reset.
    """
    operation = WorkCoverageRecord.CHOOSE_EDITION_OPERATION
    return self._reset_coverage(operation)
def set_presentation_ready(
    self, as_of=None, search_index_client=None, exclude_search=False
):
    """Unconditionally mark this work as presentation-ready.

    The caller is asserting that the work has the minimal information
    needed to be found with typical queries, and that patrons will be
    able to understand what work we're talking about. In most cases
    set_presentation_ready_based_on_content should be called instead,
    since it runs those checks.

    :param as_of: Timestamp to record as the presentation-ready
        attempt; the current UTC time is used when this is falsy.
    :param search_index_client: Accepted but not used by this method.
    :param exclude_search: If true, do not flag the work for
        reindexing.
    """
    attempt_time = as_of or utc_now()
    self.presentation_ready = True
    self.presentation_ready_exception = None
    self.presentation_ready_attempt = attempt_time
    if exclude_search:
        return
    self.external_index_needs_updating()
def set_presentation_ready_based_on_content(self, search_index_client=None):
    """Mark this work presentation-ready if its data looks sufficient.

    Presentation ready means the book is ready to be shown to
    patrons and (pending availability) checked out. It doesn't
    necessarily mean the presentation is complete.

    The absolute minimum is a presentation edition, at least one
    license pool, a title, a language, and a medium. A cover or an
    author is not required -- that can be filled in later if it exists.

    TODO: search_index_client is redundant here.

    :param search_index_client: Passed through to
        set_presentation_ready() when the work qualifies.
    """
    edition = self.presentation_edition
    has_minimum_data = (
        edition
        and self.license_pools
        and self.title
        and self.language
        and edition.medium
    )
    if has_minimum_data:
        self.set_presentation_ready(search_index_client=search_index_client)
        return

    self.presentation_ready = False
    # The next time the search index WorkCoverageRecords are
    # processed, this work will be removed from the search
    # index.
    self.external_index_needs_updating()
    logging.warning("Work is not presentation ready: %r", self)
def calculate_quality(self, identifier_ids, default_quality=0):
    """Recompute `self.quality` from the most recent Measurements.

    :param identifier_ids: IDs of the Identifiers whose Measurements
        should be taken into account.
    :param default_quality: Passed to Measurement.overall_quality() as
        its `default_value`.
    """
    session = Session.object_session(self)

    # Relevant Measurements are direct measurements of popularity
    # and quality, plus any quantity that might be mappable to the 0..1
    # range -- ratings, and measurements with an associated percentile
    # score.
    relevant_quantities = {
        Measurement.POPULARITY, Measurement.QUALITY, Measurement.RATING
    }.union(Measurement.PERCENTILE_SCALES.keys())

    query = session.query(Measurement)
    query = query.filter(Measurement.identifier_id.in_(identifier_ids))
    query = query.filter(Measurement.is_most_recent==True)
    query = query.filter(
        Measurement.quantity_measured.in_(relevant_quantities)
    )

    self.quality = Measurement.overall_quality(
        query.all(), default_value=default_quality
    )
    WorkCoverageRecord.add_for(
        self, operation=WorkCoverageRecord.QUALITY_OPERATION
    )
def assign_genres(self, identifier_ids, default_fiction=False, default_audience=Classifier.AUDIENCE_ADULT):
    """Classify this work from the classifications of the given identifiers.

    Sets `fiction`, `audience`, `target_age` and the WorkGenre
    assignments on this Work.

    :param identifier_ids: IDs of the Identifiers whose classifications
        are fed to the WorkClassifier.
    :param default_fiction: Forwarded to WorkClassifier.classify().
    :param default_audience: Forwarded to WorkClassifier.classify().
    :return: A boolean explaining whether or not any data actually
        changed.
    """
    classifier = WorkClassifier(self)

    # Snapshot the current values so changes can be detected afterwards.
    previous_fiction = self.fiction
    previous_audience = self.audience
    previous_target_age = self.target_age

    session = Session.object_session(self)
    for classification in Identifier.classifications_for_identifier_ids(
        session, identifier_ids
    ):
        classifier.add(classification)

    genre_weights, fiction, audience, target_age = classifier.classify(
        default_fiction=default_fiction,
        default_audience=default_audience,
    )
    self.fiction = fiction
    self.audience = audience
    self.target_age = tuple_to_numericrange(target_age)

    _, workgenres_changed = self.assign_genres_from_weights(genre_weights)

    return (
        workgenres_changed
        or previous_fiction != self.fiction
        or previous_audience != self.audience
        # Target ages are compared as plain tuples.
        or numericrange_to_tuple(previous_target_age) != target_age
    )
def assign_genres_from_weights(self, genre_weights):
    """Bring this Work's WorkGenre assignments in line with `genre_weights`.

    Each genre's affinity is its weight divided by the sum of all
    weights. Existing WorkGenres are reused, missing ones are created,
    and WorkGenres for genres no longer present are deleted.

    :param genre_weights: A mapping of genre to weight. Keys that are
        not Genre objects are looked up by their `name`.
    :return: A 2-tuple (list of current WorkGenres, whether anything
        changed).
    """
    from .classification import Genre

    session = Session.object_session(self)
    total_weight = float(sum(genre_weights.values()))

    # Existing assignments keyed by genre. Entries are removed as they
    # are matched, so whatever remains at the end is stale.
    unmatched = {
        existing.genre: existing
        for existing in session.query(WorkGenre).filter(WorkGenre.work==self)
    }

    changed = False
    assigned = []
    for genre, weight in genre_weights.items():
        affinity = weight / total_weight
        if not isinstance(genre, Genre):
            genre, _ = Genre.lookup(session, genre.name)

        workgenre = unmatched.pop(genre, None)
        is_new = False
        if workgenre is None:
            workgenre, is_new = get_one_or_create(
                session, WorkGenre, work=self, genre=genre)

        # Differences below two decimal places do not count as a change.
        if is_new or round(workgenre.affinity,2) != round(affinity, 2):
            changed = True
        workgenre.affinity = affinity
        assigned.append(workgenre)

    # Any WorkGenre objects left over represent genres the Work
    # was once classified under, but is no longer. Delete them.
    for stale in unmatched.values():
        session.delete(stale)
        changed = True

    # ensure that work_genres is up to date without having to read from database again
    self.work_genres = assigned
    return assigned, changed
def assign_appeals(self, character, language, setting, story,
                   cutoff=0.20):
    """Store the four appeal scores and derive primary/secondary appeal.

    The two highest-scoring appeals are considered. The highest becomes
    the primary appeal if it is strictly above `cutoff`, otherwise the
    primary appeal is UNKNOWN_APPEAL. The runner-up becomes the
    secondary appeal under the same test, otherwise NO_APPEAL.

    :param character: Score for the "Character" appeal.
    :param language: Score for the "Language" appeal.
    :param setting: Score for the "Setting" appeal.
    :param story: Score for the "Story" appeal.
    :param cutoff: Minimum score (exclusive) for an appeal to count.
    """
    self.appeal_character = character
    self.appeal_language = language
    self.appeal_setting = setting
    self.appeal_story = story

    scores = Counter({
        self.CHARACTER_APPEAL: character,
        self.LANGUAGE_APPEAL: language,
        self.SETTING_APPEAL: setting,
        self.STORY_APPEAL: story,
    })
    (top_name, top_score), (next_name, next_score) = scores.most_common(2)

    self.primary_appeal = (
        top_name if top_score > cutoff else self.UNKNOWN_APPEAL
    )
    self.secondary_appeal = (
        next_name if next_score > cutoff else self.NO_APPEAL
    )
# A format string that can be passed to func.to_char to convert a SQL
# datetime into a string that Elasticsearch can parse as a date.
# NOTE(review): the search-document query in to_search_documents emits
# timestamps with func.extract("EPOCH", ...) rather than this format --
# confirm whether this constant still has users elsewhere.
ELASTICSEARCH_TIME_FORMAT = 'YYYY-MM-DD"T"HH24:MI:SS"."MS'
@classmethod
def to_search_documents(cls, works, policy=None):
    """Generate search documents for these Works.

    This is done by constructing an extremely complicated
    SQL query. The code is ugly, but it's about 100 times
    faster than using python to create documents for
    each work individually. When working on the search
    index, it's very important for this to be fast.

    :param works: The Works to generate documents for. It is indexed
        (`works[0]`) and measured (`len(works)`), so it needs to be a
        sequence rather than a one-shot iterable. The database session
        is taken from the first Work.
    :param policy: A PresentationCalculationPolicy to use when
        deciding how deep to go to find Identifiers equivalent to
        these works.
    :return: An empty list when `works` is empty; otherwise a list
        holding the first column (the JSON document) of every row the
        query returns.
    """
    if not works:
        return []
    _db = Session.object_session(works[0])
    # If this is a batch of search documents, postgres needs extra working
    # memory to process the query quickly.
    if len(works) > 50:
        _db.execute("set work_mem='200MB'")
    # This query gets relevant columns from Work and Edition for the Works we're
    # interested in. The work_id, edition_id, and identifier_id columns are used
    # by other subqueries to filter, and the remaining columns are used directly
    # to create the json document.
    # NOTE(review): a generator expression is handed to in_() below --
    # confirm the SQLAlchemy version in use accepts non-sequence iterables.
    works_alias = select(
        [Work.id.label('work_id'),
         Edition.id.label('edition_id'),
         Edition.primary_identifier_id.label('identifier_id'),
         Edition.title,
         Edition.subtitle,
         Edition.series,
         Edition.series_position,
         Edition.language,
         Edition.sort_title,
         Edition.author,
         Edition.sort_author,
         Edition.medium,
         Edition.publisher,
         Edition.imprint,
         Edition.permanent_work_id,
         Work.fiction,
         Work.audience,
         Work.summary_text,
         Work.quality,
         Work.rating,
         Work.popularity,
         Work.presentation_ready,
         Work.presentation_edition_id,
         func.extract(
             "EPOCH",
             Work.last_update_time,
         ).label('last_update_time')
        ],
        Work.id.in_((w.id for w in works))
    ).select_from(
        join(
            Work, Edition,
            Work.presentation_edition_id==Edition.id
        )
    ).alias('works_alias')
    # The subqueries below refer to columns of works_alias through their
    # qualified SQL names ("<alias name>.<column name>"), spelled out as
    # literal text.
    work_id_column = literal_column(
        works_alias.name + '.' + works_alias.c.work_id.name
    )
    work_presentation_edition_id_column = literal_column(
        works_alias.name + '.' + works_alias.c.presentation_edition_id.name
    )
    work_quality_column = literal_column(
        works_alias.name + '.' + works_alias.c.quality.name
    )
    def query_to_json(query):
        """Convert the results of a query to a JSON object."""
        return select(
            [func.row_to_json(literal_column(query.name))]
        ).select_from(query)
    def query_to_json_array(query):
        """Convert the results of a query into a JSON array."""
        return select(
            [func.array_to_json(
                func.array_agg(
                    func.row_to_json(
                        literal_column(query.name)
                    )))]
        ).select_from(query)
    # Model classes needed by the subqueries below are imported at
    # function scope (presumably to avoid circular imports between the
    # model modules -- TODO confirm).
    from .classification import (
        Genre,
        Subject,
    )
    from .customlist import CustomListEntry
    from .licensing import LicensePool
    # We need information about LicensePools for a few reasons:
    #
    # * We always want to filter out Works that are not available
    #   in any of the collections associated with a given library
    #   -- either because no licenses are owned, because the
    #   LicensePools are suppressed, or (TODO) because there are no
    #   delivery mechanisms.
    # * A patron may want to sort a list of books by availability
    #   date.
    # * A patron may want to show only books currently available,
    #   or only open-access books.
    #
    # Whenever LicensePool.open_access is changed, or
    # licenses_available moves to zero or away from zero, the
    # LicensePool signals that its Work needs reindexing.
    #
    # The work quality field is stored in the main document, but
    # it's also stored here, so that we can apply a nested filter
    # that combines quality with other fields found only in the subdocument.
    def explicit_bool(label, t):
        # Ensure we always generate True/False instead of
        # True/None. Elasticsearch can't filter on null values.
        return case([(t, True)], else_=False).label(label)
    licensepools = select(
        [
            LicensePool.id.label('licensepool_id'),
            LicensePool.data_source_id.label('data_source_id'),
            LicensePool.collection_id.label('collection_id'),
            LicensePool.open_access.label('open_access'),
            LicensePool.suppressed,
            explicit_bool(
                'available',
                or_(
                    LicensePool.unlimited_access,
                    LicensePool.self_hosted,
                    LicensePool.licenses_available > 0,
                )
            ),
            explicit_bool(
                'licensed',
                or_(
                    LicensePool.unlimited_access,
                    LicensePool.self_hosted,
                    LicensePool.licenses_owned > 0
                )
            ),
            work_quality_column,
            Edition.medium,
            func.extract(
                "EPOCH",
                LicensePool.availability_time,
            ).label('availability_time')
        ]
    ).where(
        # Only pools of the current work that are open-access,
        # unlimited-access, self-hosted, or own at least one license.
        and_(
            LicensePool.work_id==work_id_column,
            work_presentation_edition_id_column==Edition.id,
            or_(
                LicensePool.open_access,
                LicensePool.unlimited_access,
                LicensePool.self_hosted,
                LicensePool.licenses_owned>0,
            ),
        )
    ).alias("licensepools_subquery")
    licensepools_json = query_to_json_array(licensepools)
    # This subquery gets CustomList IDs for all lists
    # that contain the work.
    #
    # We also keep track of whether the work is featured on each
    # list. This is used when determining which works should be
    # featured for a lane based on CustomLists.
    #
    # And we keep track of the first time the work appears on the list.
    # This is used when generating a crawlable feed for the customlist,
    # which is ordered by a work's first appearance on the list.
    customlists = select(
        [
            CustomListEntry.list_id.label('list_id'),
            CustomListEntry.featured.label('featured'),
            func.extract(
                "EPOCH",
                CustomListEntry.first_appearance,
            ).label('first_appearance')
        ]
    ).where(
        CustomListEntry.work_id==work_id_column
    ).alias("listentries_subquery")
    customlists_json = query_to_json_array(customlists)
    # This subquery gets Contributors, filtered on edition_id.
    contributors = select(
        [Contributor.sort_name,
         Contributor.display_name,
         Contributor.family_name,
         Contributor.lc,
         Contributor.viaf,
         Contribution.role,
        ]
    ).where(
        Contribution.edition_id==literal_column(works_alias.name + "." + works_alias.c.edition_id.name)
    ).select_from(
        join(
            Contributor, Contribution,
            Contributor.id==Contribution.contributor_id
        )
    ).alias("contributors_subquery")
    contributors_json = query_to_json_array(contributors)
    # Use a subquery to get recursively equivalent Identifiers
    # for the Edition's primary_identifier_id.
    #
    # NOTE: we don't reliably reindex works when this information
    # changes, but it's not critical that this information be
    # totally up to date -- we only use it for subject searches
    # and recommendations. The index is completely rebuilt once a
    # day, and that's good enough.
    equivalent_identifiers = Identifier.recursively_equivalent_identifier_ids_query(
        literal_column(
            works_alias.name + "." + works_alias.c.identifier_id.name
        ),
        policy=policy
    ).alias("equivalent_identifiers_subquery")
    identifiers = select(
        [
            Identifier.identifier.label('identifier'),
            Identifier.type.label('type'),
        ]
    ).where(
        Identifier.id.in_(equivalent_identifiers)
    ).alias("identifier_subquery")
    identifiers_json = query_to_json_array(identifiers)
    # Map our constants for Subject type to their URIs.
    scheme_column = case(
        [(Subject.type==key, literal_column("'%s'" % val)) for key, val in list(Subject.uri_lookup.items())]
    )
    # If the Subject has a name, use that, otherwise use the Subject's identifier.
    # Also, 3M's classifications have slashes, e.g. "FICTION/Adventure". Make sure
    # we get separated words for search.
    term_column = func.replace(case([(Subject.name != None, Subject.name)], else_=Subject.identifier), "/", " ")
    # Normalize by dividing each weight by the sum of the weights for that Identifier's Classifications.
    from .classification import Classification
    weight_column = func.sum(Classification.weight) / func.sum(func.sum(Classification.weight)).over()
    # The subquery for Subjects, with those three columns. The labels will become keys in json objects.
    subjects = select(
        [scheme_column.label('scheme'),
         term_column.label('term'),
         weight_column.label('weight'),
        ],
        # Only include Subjects with terms that are useful for search.
        and_(Subject.type.in_(Subject.TYPES_FOR_SEARCH),
             term_column != None)
    ).group_by(
        scheme_column, term_column
    ).where(
        Classification.identifier_id.in_(equivalent_identifiers)
    ).select_from(
        join(Classification, Subject, Classification.subject_id==Subject.id)
    ).alias("subjects_subquery")
    subjects_json = query_to_json_array(subjects)
    # Subquery for genres.
    genres = select(
        # All Genres have the same scheme - the simplified genre URI.
        [literal_column("'%s'" % Subject.SIMPLIFIED_GENRE).label('scheme'),
         Genre.name,
         Genre.id.label('term'),
         WorkGenre.affinity.label('weight'),
        ]
    ).where(
        WorkGenre.work_id==literal_column(works_alias.name + "." + works_alias.c.work_id.name)
    ).select_from(
        join(WorkGenre, Genre, WorkGenre.genre_id==Genre.id)
    ).alias("genres_subquery")
    genres_json = query_to_json_array(genres)
    # The target age becomes a single nested JSON object (not an array).
    target_age = cls.target_age_query(
        literal_column(works_alias.name + "." + works_alias.c.work_id.name)
    ).alias('target_age_subquery')
    target_age_json = query_to_json(target_age)
    # Now, create a query that brings together everything we need for the final
    # search document.
    search_data = select(
        [works_alias.c.work_id.label("_id"),
         works_alias.c.work_id.label("work_id"),
         works_alias.c.title,
         works_alias.c.sort_title,
         works_alias.c.subtitle,
         works_alias.c.series,
         works_alias.c.series_position,
         works_alias.c.language,
         works_alias.c.author,
         works_alias.c.sort_author,
         works_alias.c.medium,
         works_alias.c.publisher,
         works_alias.c.imprint,
         works_alias.c.permanent_work_id,
         works_alias.c.presentation_ready,
         works_alias.c.last_update_time,
         # Convert true/false to "Fiction"/"Nonfiction".
         # Anything other than true (including NULL) ends up as "Nonfiction".
         case(
             [(works_alias.c.fiction==True, literal_column("'Fiction'"))],
             else_=literal_column("'Nonfiction'")
         ).label("fiction"),
         # Replace "Young Adult" with "YoungAdult" and "Adults Only" with "AdultsOnly".
         func.replace(works_alias.c.audience, " ", "").label('audience'),
         works_alias.c.summary_text.label('summary'),
         works_alias.c.quality,
         works_alias.c.rating,
         works_alias.c.popularity,
         # Here are all the subqueries.
         licensepools_json.label("licensepools"),
         customlists_json.label("customlists"),
         contributors_json.label("contributors"),
         identifiers_json.label("identifiers"),
         subjects_json.label("classifications"),
         genres_json.label('genres'),
         target_age_json.label('target_age'),
        ]
    ).select_from(
        works_alias
    ).alias("search_data_subquery")
    # Finally, convert everything to json.
    search_json = query_to_json(search_data)
    result = _db.execute(search_json)
    # NOTE(review): if `result` were ever falsy this falls through and
    # returns None rather than [] -- confirm whether callers rely on that.
    if result:
        return [r[0] for r in result]
@classmethod
def target_age_query(self, foreign_work_id_field):
    """Build a SELECT of a Work's target age as inclusive bounds.

    Note that although this is a classmethod, its first parameter is
    named `self`; it receives the class.

    :param foreign_work_id_field: A column expression compared against
        Work.id to pick the Work whose target age is selected.
    :return: A select with two labeled columns, `upper` and `lower`.
        It is meant to be used as a subquery so it can become a nested
        object in the final json.
    """
    age_range = Work.target_age

    # If the lower limit of the target age is inclusive, we leave
    # it alone. Otherwise, we add one to make it inclusive.
    raw_lower = func.lower(age_range)
    inclusive_lower = case(
        [(func.lower_inc(age_range), raw_lower)],
        else_=raw_lower+1
    ).label('lower')

    # If the upper limit of the target age is inclusive, we leave
    # it alone. Otherwise, we subtract one to make it inclusive.
    raw_upper = func.upper(age_range)
    inclusive_upper = case(
        [(func.upper_inc(age_range), raw_upper)],
        else_=raw_upper-1
    ).label('upper')

    # Column order is kept as (upper, lower).
    return select(
        [inclusive_upper, inclusive_lower]
    ).where(
        Work.id==foreign_work_id_field
    )
def to_search_document(self):
    """Generate the search document for this single Work.

    :return: The first (and only) document produced by
        Work.to_search_documents() for a one-element list.
    """
    documents = Work.to_search_documents([self])
    return documents[0]
def mark_licensepools_as_superceded(self):
    """Supercede every open-access LicensePool except the single best one.

    A non-open-access LicensePool should never be superceded, so this
    method always marks those as un-superceded. Among open-access
    pools, the winner is determined with
    LicensePool.better_open_access_pool_than().
    """
    best = None
    for pool in self.license_pools:
        if not pool.open_access:
            pool.superceded = False
            continue
        if not pool.better_open_access_pool_than(best):
            # The reigning champion stays; this pool loses.
            pool.superceded = True
            continue
        # This pool dethrones the previous champion, if there was one.
        if best:
            best.superceded = True
        best = pool
        pool.superceded = False
@classmethod
def restrict_to_custom_lists_from_data_source(
    cls, _db, base_query, data_source, on_list_as_of=None):
    """Annotate a query that joins Work against Edition to match only
    Works that are on a custom list from the given data source.

    :param _db: A database session, passed through to the helper.
    :param base_query: The query to restrict.
    :param data_source: Only lists whose `data_source` equals this
        value are considered.
    :param on_list_as_of: Optional cutoff, passed through to
        _restrict_to_customlist_subquery_condition().
    :return: The restricted query.
    """
    # CustomList is not among this module's top-level imports, so it is
    # imported at function scope like the other model classes here.
    # NOTE(review): assumes CustomList lives in .customlist next to
    # CustomListEntry -- confirm.
    from .customlist import CustomList

    condition = CustomList.data_source==data_source
    return cls._restrict_to_customlist_subquery_condition(
        _db, base_query, condition, on_list_as_of)
@classmethod
def restrict_to_custom_lists(
    cls, _db, base_query, custom_lists, on_list_as_of=None):
    """Annotate a query that joins Work against Edition to match only
    Works that are on one of the given custom lists.

    :param _db: A database session, passed through to the helper.
    :param base_query: The query to restrict.
    :param custom_lists: An iterable of objects with an `id`; only
        lists with one of these IDs are considered.
    :param on_list_as_of: Optional cutoff, passed through to
        _restrict_to_customlist_subquery_condition().
    :return: The restricted query.
    """
    # CustomList is not among this module's top-level imports, so it is
    # imported at function scope like the other model classes here.
    # NOTE(review): assumes CustomList lives in .customlist next to
    # CustomListEntry -- confirm.
    from .customlist import CustomList

    condition = CustomList.id.in_([x.id for x in custom_lists])
    return cls._restrict_to_customlist_subquery_condition(
        _db, base_query, condition, on_list_as_of)
@classmethod
def _restrict_to_customlist_subquery_condition(
    cls, _db, base_query, condition, on_list_as_of=None):
    """Annotate a query that joins Work against Edition to match only
    Works that are on a custom list meeting the given condition.

    :param _db: A database session. Not used by this method.
    :param base_query: The query to restrict. It is joined through
        LicensePool.custom_list_entries to CustomListEntry.customlist.
    :param condition: A filter expression on the custom list.
    :param on_list_as_of: If set, only entries whose
        `most_recent_appearance` is at or after this value count.
    :return: The restricted query.
    """
    # Neither class is among this module's top-level imports; import
    # them at function scope, from the same modules to_search_documents
    # uses.
    from .customlist import CustomListEntry
    from .licensing import LicensePool

    # Find works that are on a list that meets the given condition.
    qu = base_query.join(LicensePool.custom_list_entries).join(
        CustomListEntry.customlist)
    if on_list_as_of:
        qu = qu.filter(
            CustomListEntry.most_recent_appearance >= on_list_as_of)
    qu = qu.filter(condition)
    return qu
def classifications_with_genre(self):
    """Query the genre-bearing Classifications of this Work's primary identifier.

    Only the presentation edition's primary identifier is considered,
    and only Classifications whose Subject has a genre.

    :return: A query of Classification objects, heaviest weight first.
    """
    from .classification import (
        Classification,
        Subject,
    )

    session = Session.object_session(self)
    primary_identifier = self.presentation_edition.primary_identifier

    query = session.query(Classification).join(Subject)
    query = query.filter(
        Classification.identifier_id == primary_identifier.id
    )
    query = query.filter(Subject.genre_id != None)
    return query.order_by(Classification.weight.desc())
def top_genre(self):
    """Return the name of the genre with the highest affinity for this Work.

    :return: The genre's name, or None if the Work has no genres.
    """
    from .classification import Genre

    session = Session.object_session(self)
    query = session.query(Genre).join(WorkGenre)
    query = query.filter(WorkGenre.work_id == self.id)
    strongest = query.order_by(WorkGenre.affinity.desc()).first()
    if not strongest:
        return None
    return strongest.name
def delete(self, search_index=None):
    """Delete the work from both the DB and search index.

    :param search_index: The search index to remove the work from.
        When omitted, an ExternalSearchIndex is created from the
        work's session; if none is configured, the search-index step
        is skipped.
    """
    session = Session.object_session(self)
    if search_index is None:
        try:
            from ..external_search import ExternalSearchIndex
            search_index = ExternalSearchIndex(session)
        except CannotLoadConfiguration:
            # No search index is configured. This is fine -- just skip that part.
            pass
    if search_index is not None:
        search_index.remove_work(self)
    session.delete(self)