from io import BytesIO
from flask_babel import lazy_gettext as _
import re
from pymarc import (
Field,
Record,
MARCWriter
)
from .config import (
Configuration,
CannotLoadConfiguration,
)
from .lane import BaseFacets
from .external_search import (
ExternalSearchIndex,
SortKeyPagination,
)
from .model import (
get_one,
get_one_or_create,
CachedMARCFile,
Collection,
ConfigurationSetting,
DeliveryMechanism,
Edition,
ExternalIntegration,
Identifier,
Representation,
Session,
Work,
)
from .classifier import Classifier
from .mirror import MirrorUploader
from .s3 import S3Uploader
from .lane import Lane
from .util import LanguageCodes
from .util.datetime_helpers import utc_now
class Annotator(object):
"""The Annotator knows how to add information about a Work to
a MARC record."""
marc_cache_field = Work.marc_record.name
# From https://www.loc.gov/standards/valuelist/marctarget.html
AUDIENCE_TERMS = {
Classifier.AUDIENCE_CHILDREN: "Juvenile",
Classifier.AUDIENCE_YOUNG_ADULT: "Adolescent",
Classifier.AUDIENCE_ADULTS_ONLY: "Adult",
Classifier.AUDIENCE_ADULT: "General",
}
# TODO: Add remaining formats. Maybe there's a better place to
# store this so it's easier to keep up-to-date.
# There doesn't seem to be any particular vocabulary for this.
FORMAT_TERMS = {
(Representation.EPUB_MEDIA_TYPE, DeliveryMechanism.NO_DRM): "EPUB eBook",
(Representation.EPUB_MEDIA_TYPE, DeliveryMechanism.ADOBE_DRM): "Adobe EPUB eBook",
(Representation.PDF_MEDIA_TYPE, DeliveryMechanism.NO_DRM): "PDF eBook",
(Representation.PDF_MEDIA_TYPE, DeliveryMechanism.ADOBE_DRM): "Adobe PDF eBook",
}
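    # e.g. a DRM-free EPUB:
    #   FORMAT_TERMS[(Representation.EPUB_MEDIA_TYPE, DeliveryMechanism.NO_DRM)]
    #   -> "EPUB eBook"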
    def annotate_work_record(self, work, active_license_pool, edition,
                             identifier, record, integration=None, updated=None):
        """Add metadata from this work to a MARC record.

        :param work: The Work whose record is being annotated.
        :param active_license_pool: Of all the LicensePools associated with this
            Work, the client has expressed interest in this one.
        :param edition: The Edition to use when associating bibliographic
            metadata with this entry.
        :param identifier: Of all the Identifiers associated with this
            Work, the client has expressed interest in this one.
        :param record: A pymarc Record object to be annotated.
        """
self.add_distributor(record, active_license_pool)
self.add_formats(record, active_license_pool)
    @classmethod
def leader(cls, work):
# The record length is automatically updated once fields are added.
initial_record_length = "00000"
record_status = "n" # New record
if getattr(work, cls.marc_cache_field):
record_status = "c" # Corrected or revised
# Distributors consistently seem to use type "a" - language material - for
# ebooks, though there is also type "m" for computer files.
record_type = "a"
bibliographic_level = "m" # Monograph/item
leader = initial_record_length + record_status + record_type + bibliographic_level
        # The rest of the leader is always the same: the indicator count and
        # subfield code length ("22"), a placeholder base address ("00000"),
        # and the entry map ("4500").
        leader += "  2200000   4500"
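        # For a brand-new record this yields the 24-character string
        # "00000nam  2200000   4500"; pymarc recomputes the record length
        # and base address when the record is serialized.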
return leader
    @classmethod
def add_control_fields(cls, record, identifier, pool, edition):
# Unique identifier for this record.
record.add_field(
Field(tag="001", data=identifier.urn))
# Field 003 (MARC organization code) is library-specific, so it's added separately.
record.add_field(
Field(tag="005", data=utc_now().strftime("%Y%m%d%H%M%S.0")))
        # Field 006 is 18 fixed-length characters: position 00 is "m"
        # (computer file) and position 09 is "d" (the file is a document).
        record.add_field(
            Field(tag="006", data="m        d        "))
# Field 007: more details about electronic resource
# Since this depends on the pool, it might be better not to cache it.
# But it's probably not a huge problem if it's outdated.
# File formats: a=one format, m=multiple formats, u=unknown
if len(pool.delivery_mechanisms) == 1:
file_formats_code = "a"
else:
file_formats_code = "m"
record.add_field(
Field(tag="007", data="cr cn ---" + file_formats_code + "nuuu"))
# Field 008 (fixed-length data elements):
data = utc_now().strftime("%y%m%d")
publication_date = edition.issued or edition.published
if publication_date:
date_type = "s" # single known date
# Not using strftime because some years are pre-1900.
date_value = "%04i" % publication_date.year
else:
date_type = "n" # dates unknown
date_value = " "
data += date_type + date_value
data += " "
# TODO: Start tracking place of publication when available. Since we don't have
# this yet, assume everything was published in the US.
data += "xxu"
data += " "
language = "eng"
if edition.language:
language = LanguageCodes.string_to_alpha_3(edition.language)
data += language
data += " "
record.add_field(
Field(tag="008", data=data))
    @classmethod
def add_marc_organization_code(cls, record, marc_org):
record.add_field(
Field(tag="003", data=marc_org))
    @classmethod
def add_isbn(cls, record, identifier):
# Add the ISBN if we have one.
isbn = None
if identifier.type == Identifier.ISBN:
isbn = identifier
if not isbn:
_db = Session.object_session(identifier)
identifier_ids = identifier.equivalent_identifier_ids()[identifier.id]
isbn = _db.query(Identifier).filter(
Identifier.type==Identifier.ISBN).filter(
Identifier.id.in_(identifier_ids)).order_by(
Identifier.id).first()
if isbn:
record.add_field(
Field(
tag="020",
indicators=[" "," "],
subfields=[
"a", isbn.identifier,
]))
    @classmethod
def add_title(cls, record, edition):
# Non-filing characters are used to indicate when the beginning of a title
# should not be used in sorting. This code tries to identify them by comparing
# the title and the sort_title.
non_filing_characters = 0
        if (edition.title and edition.sort_title
                and edition.title != edition.sort_title
                and "," in edition.sort_title):
            stemmed = edition.sort_title[:edition.sort_title.rindex(",")]
            # find() returns -1 when the stemmed title isn't in the title at all.
            non_filing_characters = max(edition.title.find(stemmed), 0)
# MARC only supports up to 9 non-filing characters, but if we got more
# something is probably wrong anyway.
if non_filing_characters > 9:
non_filing_characters = 0
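        # e.g. title "The Alchemist" with sort_title "Alchemist, The":
        # the stemmed sort title is "Alchemist", found at index 4, so the
        # first four characters ("The ") are non-filing.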
subfields = ["a", str(edition.title or "")]
if edition.subtitle:
subfields += ["b", str(edition.subtitle)]
if edition.author:
subfields += ["c", str(edition.author)]
record.add_field(
Field(
tag="245",
indicators=["0", non_filing_characters],
subfields=subfields,
))
    @classmethod
def add_contributors(cls, record, edition):
"""Create contributor fields for this edition.
TODO: Use canonical names from LoC.
"""
# If there's one author, use the 100 field.
if edition.sort_author and len(edition.contributions) == 1:
record.add_field(
Field(
tag="100",
indicators=["1"," "],
subfields=[
"a", str(edition.sort_author),
]))
if len(edition.contributions) > 1:
for contribution in edition.contributions:
contributor = contribution.contributor
record.add_field(
Field(
tag="700",
indicators=["1", " "],
subfields=[
"a", str(contributor.sort_name),
"e", contribution.role,
]))
    @classmethod
def add_publisher(cls, record, edition):
if edition.publisher:
publication_date = edition.issued or edition.published
year = ""
if publication_date:
year = str(publication_date.year)
record.add_field(
Field(
tag="264",
indicators=[" ", "1"],
subfields=[
"a", "[Place of publication not identified]",
"b", str(edition.publisher or ""),
"c", year,
]))
    @classmethod
def add_distributor(cls, record, pool):
# Distributor
record.add_field(
Field(
tag="264",
indicators=[" ", "2"],
subfields=[
"b", str(pool.data_source.name),
]))
    @classmethod
def add_physical_description(cls, record, edition):
# These 3xx fields are for a physical description of the item.
if edition.medium == Edition.BOOK_MEDIUM:
record.add_field(
Field(
tag="300",
indicators=[" ", " "],
subfields=[
"a", "1 online resource",
]))
record.add_field(
Field(
tag="336",
indicators=[" ", " "],
subfields=[
"a", "text",
"b", "txt",
"2", "rdacontent"
]))
elif edition.medium == Edition.AUDIO_MEDIUM:
record.add_field(
Field(
tag="300",
indicators=[" ", " "],
subfields=[
"a", "1 sound file",
"b", "digital",
]))
record.add_field(
Field(
tag="336",
indicators=[" ", " "],
subfields=[
"a", "spoken word",
"b", "spw",
"2", "rdacontent"
]))
record.add_field(
Field(
tag="337",
indicators=[" ", " "],
subfields=[
"a", "computer",
"b", "c",
"2", "rdamedia"
]))
record.add_field(
Field(
tag="338",
indicators=[" ", " "],
subfields=[
"a", "online resource",
"b", "cr",
"2", "rdacarrier",
]))
file_type = None
if edition.medium == Edition.BOOK_MEDIUM:
file_type = "text file"
elif edition.medium == Edition.AUDIO_MEDIUM:
file_type = "audio file"
if file_type:
record.add_field(
Field(
tag="347",
indicators=[" ", " "],
subfields=[
"a", file_type,
"2", "rda",
]))
# Form of work
form = None
if edition.medium == Edition.BOOK_MEDIUM:
form = "eBook"
elif edition.medium == Edition.AUDIO_MEDIUM:
# This field doesn't seem to be used for audio.
pass
if form:
record.add_field(
Field(
tag="380",
indicators=[" ", " "],
subfields=[
"a", "eBook",
"2", "tlcgt",
]))
    @classmethod
def add_audience(cls, record, work):
audience = cls.AUDIENCE_TERMS.get(work.audience, "General")
record.add_field(
Field(
tag="385",
indicators=[" ", " "],
subfields=[
"a", audience,
"2", "tlctarget",
]))
    @classmethod
def add_series(cls, record, edition):
if edition.series:
subfields = ["a", str(edition.series)]
if edition.series_position:
subfields.extend(["v", str(edition.series_position)])
record.add_field(
Field(
tag="490",
indicators=["0", " "],
subfields=subfields,
))
    @classmethod
def add_system_details(cls, record):
record.add_field(
Field(
tag="538",
indicators=[" ", " "],
subfields=[
"a", "Mode of access: World Wide Web."
]))
    @classmethod
def add_summary(cls, record, work):
summary = work.summary_text
if summary:
stripped = re.sub('<[^>]+?>', ' ', summary)
record.add_field(
Field(
tag="520",
indicators=[" ", " "],
subfields=[
"a", stripped.encode('ascii', 'ignore'),
]))
    @classmethod
def add_simplified_genres(cls, record, work):
"""Create subject fields for this work."""
        genres = work.genres
for genre in genres:
record.add_field(
Field(
tag="650",
indicators=["0", "7"],
subfields=[
"a", genre.name,
"2", "Library Simplified",
]))
    @classmethod
def add_ebooks_subject(cls, record):
# This is a general subject that can be added to all records.
record.add_field(
Field(
tag="655",
indicators=[" ", "0"],
subfields=[
"a", "Electronic books.",
]))
class MARCExporterFacets(BaseFacets):
    """A faceting object used to configure the search engine so that
    it only finds works updated since a certain time.
    """
def __init__(self, start_time):
self.start_time = start_time
    def modify_search_filter(self, filter):
filter.order = self.SORT_ORDER_TO_ELASTICSEARCH_FIELD_NAME[
self.ORDER_LAST_UPDATE
]
filter.order_ascending = True
filter.updated_after = self.start_time
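    # Usage sketch (names are illustrative): these facets are paired with
    # SortKeyPagination in MARCExporter.records below to page through
    # everything updated after start_time:
    #   facets = MARCExporterFacets(start_time=last_checkpoint)
    #   works = lane.works(_db, pagination=SortKeyPagination(size=500),
    #                      facets=facets, search_engine=search_engine)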
class MARCExporter(object):
"""Turn a work into a record for a MARC file."""
NAME = ExternalIntegration.MARC_EXPORT
DESCRIPTION = _("Export metadata into MARC files that can be imported into an ILS manually.")
# This setting (in days) controls how often MARC files should be
# automatically updated. Since the crontab in docker isn't easily
# configurable, we can run a script daily but check this to decide
# whether to do anything.
UPDATE_FREQUENCY = "marc_update_frequency"
DEFAULT_UPDATE_FREQUENCY = 30
# MARC organization codes are assigned by the
# Library of Congress and can be found here:
# http://www.loc.gov/marc/organizations/org-search.php
MARC_ORGANIZATION_CODE = "marc_organization_code"
WEB_CLIENT_URL = 'marc_web_client_url'
INCLUDE_SUMMARY = 'include_summary'
INCLUDE_SIMPLIFIED_GENRES = 'include_simplified_genres'
LIBRARY_SETTINGS = [
{ "key": UPDATE_FREQUENCY,
"label": _("Update frequency (in days)"),
"description": _("The circulation manager will wait this number of days between generating MARC files."),
"type": "number",
"default": DEFAULT_UPDATE_FREQUENCY,
},
{ "key": MARC_ORGANIZATION_CODE,
"label": _("The MARC organization code for this library (003 field)."),
"description": _("MARC organization codes are assigned by the Library of Congress."),
},
{
"key": WEB_CLIENT_URL,
"label": _("The base URL for the web catalog for this library, for the 856 field."),
"description": _("If using a library registry that provides a web catalog, this can be left blank."),
},
{ "key": INCLUDE_SUMMARY,
"label": _("Include summaries in MARC records (520 field)"),
"type": "select",
"options": [
{ "key": "false", "label": _("Do not include summaries") },
{ "key": "true", "label": _("Include summaries") },
],
"default": "false",
},
{ "key": INCLUDE_SIMPLIFIED_GENRES,
"label": _("Include Library Simplified genres in MARC records (650 fields)"),
"type": "select",
"options": [
{ "key": "false", "label": _("Do not include Library Simplified genres") },
{ "key": "true", "label": _("Include Library Simplified genres") },
],
"default": "false",
},
]
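    # Each of these settings is read per library at export time; a sketch,
    # assuming the standard ConfigurationSetting lookup:
    #   ConfigurationSetting.for_library_and_externalintegration(
    #       _db, MARCExporter.UPDATE_FREQUENCY, library, integration
    #   ).int_value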
NO_MIRROR_INTEGRATION = "NO_MIRROR"
DEFAULT_MIRROR_INTEGRATION = dict(
key=NO_MIRROR_INTEGRATION,
label=_("None - Do not mirror MARC files")
)
SETTING = {
"key": "mirror_integration_id",
"label": _("MARC Mirror"),
"description": _("Storage protocol to use for uploading generated MARC files. The service must already be configured under 'Storage Services'."),
"type": "select",
"options" : [DEFAULT_MIRROR_INTEGRATION]
}
    @classmethod
def from_config(cls, library):
_db = Session.object_session(library)
integration = ExternalIntegration.lookup(
_db, ExternalIntegration.MARC_EXPORT,
ExternalIntegration.CATALOG_GOAL, library=library
)
if not integration:
raise CannotLoadConfiguration(
"No MARC export service is configured for this library"
)
return cls(_db, library, integration)
def __init__(self, _db, library, integration):
self._db = _db
self.library = library
self.integration = integration
    @classmethod
def get_storage_settings(cls, _db):
integrations = ExternalIntegration.for_goal(
_db, ExternalIntegration.STORAGE_GOAL
)
cls.SETTING['options'] = [cls.DEFAULT_MIRROR_INTEGRATION]
for integration in integrations:
# Only add an integration to choose from if it has a
# MARC File Bucket field in its settings.
configuration_settings = [s for s in integration.settings if s.key=="marc_bucket"]
if configuration_settings:
if configuration_settings[0].value:
cls.SETTING['options'].append(
dict(key=str(integration.id), label=integration.name)
)
return cls.SETTING
    @classmethod
def create_record(cls, work, annotator, force_create=False, integration=None):
"""Build a complete MARC record for a given work."""
if callable(annotator):
annotator = annotator()
pool = work.active_license_pool()
if not pool:
return None
edition = pool.presentation_edition
identifier = pool.identifier
_db = Session.object_session(work)
record = None
existing_record = getattr(work, annotator.marc_cache_field)
if existing_record and not force_create:
record = Record(data=existing_record.encode("utf-8"), force_utf8=True)
if not record:
record = Record(leader=annotator.leader(work), force_utf8=True)
annotator.add_control_fields(record, identifier, pool, edition)
annotator.add_isbn(record, identifier)
# TODO: The 240 and 130 fields are for translated works, so they can be grouped even
# though they have different titles. We do not group editions of the same work in
# different languages, so we can't use those yet.
annotator.add_title(record, edition)
annotator.add_contributors(record, edition)
annotator.add_publisher(record, edition)
annotator.add_physical_description(record, edition)
annotator.add_audience(record, work)
annotator.add_series(record, edition)
annotator.add_system_details(record)
annotator.add_ebooks_subject(record)
data = record.as_marc()
setattr(work, annotator.marc_cache_field, data.decode("utf8"))
# Add additional fields that should not be cached.
annotator.annotate_work_record(work, pool, edition, identifier, record, integration)
return record
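    # Minimal usage sketch (assumes `work` has an active license pool):
    #   record = MARCExporter.create_record(work, Annotator())
    #   if record:
    #       print(record.as_marc())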
    def records(self, lane, annotator, mirror_integration, start_time=None,
force_refresh=False, mirror=None, search_engine=None,
query_batch_size=500, upload_batch_size=7500,
):
"""
Create and export a MARC file for the books in a lane.
:param lane: The Lane to export books from.
:param annotator: The Annotator to use when creating MARC records.
:param mirror_integration: The mirror integration to use for MARC files.
:param start_time: Only include records that were created or modified after this time.
:param force_refresh: Create new records even when cached records are available.
        :param mirror: Optional mirror to use instead of loading one from configuration.
        :param search_engine: Optional search engine to use instead of an
            ExternalSearchIndex loaded from configuration.
:param query_batch_size: Number of works to retrieve with a single Elasticsearch query.
:param upload_batch_size: Number of records to mirror at a time. This is different
from query_batch_size because S3 enforces a minimum size of 5MB for all parts
of a multipart upload except the last, but 5MB of records would be too many
works for a single query.
"""
# We mirror the content, if it's not empty. If it's empty, we create a CachedMARCFile
# and Representation, but don't actually mirror it.
if not mirror:
storage_protocol = mirror_integration.protocol
mirror = MirrorUploader.implementation(mirror_integration)
if mirror.NAME != storage_protocol:
raise Exception("Mirror integration does not match configured storage protocol")
if not mirror:
raise Exception("No mirror integration is configured")
search_engine = search_engine or ExternalSearchIndex(self._db)
# End time is before we start the query, because if any records are changed
# during the processing we may not catch them, and they should be handled
# again on the next run.
end_time = utc_now()
facets = MARCExporterFacets(start_time=start_time)
pagination = SortKeyPagination(size=query_batch_size)
url = mirror.marc_file_url(self.library, lane, end_time, start_time)
representation, ignore = get_one_or_create(
self._db, Representation, url=url,
media_type=Representation.MARC_MEDIA_TYPE
)
with mirror.multipart_upload(representation, url) as upload:
this_batch = BytesIO()
this_batch_size = 0
while pagination is not None:
# Retrieve one 'page' of works from the search index.
works = lane.works(
self._db, pagination=pagination, facets=facets,
search_engine=search_engine
)
for work in works:
# Create a record for each work and add it to the
# MARC file in progress.
record = self.create_record(
work, annotator, force_refresh, self.integration
)
if record:
this_batch.write(record.as_marc())
this_batch_size += pagination.this_page_size
if this_batch_size >= upload_batch_size:
# We've reached or exceeded the upload threshold.
# Upload one part of the multi-part document.
self._upload_batch(this_batch, upload)
this_batch = BytesIO()
this_batch_size = 0
pagination = pagination.next_page
            # Upload the final part of the multipart document, if
            # necessary.
self._upload_batch(this_batch, upload)
representation.fetched_at = end_time
if not representation.mirror_exception:
cached, is_new = get_one_or_create(
self._db, CachedMARCFile, library=self.library,
lane=(lane if isinstance(lane, Lane) else None),
start_time=start_time,
create_method_kwargs=dict(representation=representation))
if not is_new:
cached.representation = representation
cached.end_time = end_time
    def _upload_batch(self, output, upload):
        """Upload a batch of MARC records as one part of a multipart upload."""
content = output.getvalue()
if content:
upload.upload_part(content)
output.close()
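# Example end-to-end export (a sketch; `library`, `lane`, and
# `mirror_integration` are assumed to be configured already):
#   exporter = MARCExporter.from_config(library)
#   exporter.records(lane, Annotator(), mirror_integration)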