Source code for api.local_analytics_exporter

import logging
import unicodecsv as csv
from io import BytesIO

from sqlalchemy.sql import (
    func,
    select,
)
from sqlalchemy.sql.expression import (
    and_,
    case,
    literal_column,
    join,
    or_,
)

from core.model import (
    CirculationEvent,
    Edition,
    Genre,
    Identifier,
    LicensePool,
    Work,
    WorkGenre,
)

class LocalAnalyticsExporter(object):
    """Export large numbers of analytics events in CSV format."""
    def export(self, _db, start, end, locations=None, library=None):
        # Get the results from the database.
        query = self.analytics_query(start, end, locations, library)
        results = _db.execute(query)

        # Write the CSV file to a BytesIO.
        header = [
            "time", "event", "identifier", "identifier_type", "title",
            "author", "fiction", "audience", "publisher", "imprint",
            "language", "target_age", "genres", "location"
        ]
        output = BytesIO()
        writer = csv.writer(output, encoding="utf-8")
        writer.writerow(header)
        writer.writerows(results)
        return output.getvalue().decode("utf-8")
    def analytics_query(self, start, end, locations=None, library=None):
        """Build a database query that fetches rows of analytics data.

        This method uses low-level SQLAlchemy code to do all
        calculations and data conversions in the database. It's
        modeled after Work.to_search_documents, which generates a
        large JSON document entirely in the database.

        :return: An iterator of results, each of which can be written
            directly to a CSV file.
        """
        clauses = [
            CirculationEvent.start >= start,
            CirculationEvent.start < end,
        ]

        if locations:
            event_types = [
                CirculationEvent.CM_CHECKOUT,
                CirculationEvent.CM_FULFILL,
                CirculationEvent.OPEN_BOOK,
            ]
            locations = locations.strip().split(",")

            clauses += [
                CirculationEvent.type.in_(event_types),
                CirculationEvent.location.in_(locations),
            ]

        if library:
            clauses += [
                CirculationEvent.library == library
            ]

        # Build the primary query. This is a query against the
        # CirculationEvent table and a few other tables joined against
        # it. This makes up the bulk of the data.
        events_alias = select(
            [
                func.to_char(
                    CirculationEvent.start, "YYYY-MM-DD HH24:MI:SS"
                ).label("start"),
                CirculationEvent.type.label("event_type"),
                Identifier.identifier,
                Identifier.type.label("identifier_type"),
                Edition.sort_title,
                Edition.sort_author,
                case(
                    [(Work.fiction == True, literal_column("'fiction'"))],
                    else_=literal_column("'nonfiction'")
                ).label("fiction"),
                Work.id.label("work_id"),
                Work.audience,
                Edition.publisher,
                Edition.imprint,
                Edition.language,
                CirculationEvent.location,
            ],
        ).select_from(
            join(
                CirculationEvent, LicensePool,
                CirculationEvent.license_pool_id == LicensePool.id
            ).join(
                Identifier,
                LicensePool.identifier_id == Identifier.id
            ).join(
                Work,
                Work.id == LicensePool.work_id
            ).join(
                Edition,
                Work.presentation_edition_id == Edition.id
            )
        ).where(
            and_(*clauses)
        ).order_by(
            CirculationEvent.start.asc()
        ).alias("events_alias")

        # A subquery can hook into the main query by referencing its
        # 'work_id' field in its WHERE clause.
        work_id_column = literal_column(
            events_alias.name + "." + events_alias.c.work_id.name
        )

        # This subquery gets the names of a Work's genres as a single
        # comma-separated string.
        #
        # This Alias selects some number of rows, each containing one
        # string column (Genre.name). Genres with a higher affinity for
        # this Work go first.
        genres_alias = select(
            [Genre.name.label("genre_name")]
        ).select_from(
            join(
                WorkGenre, Genre,
                WorkGenre.genre_id == Genre.id
            )
        ).where(
            WorkGenre.work_id == work_id_column
        ).order_by(
            WorkGenre.affinity.desc(), Genre.name
        ).alias("genres_subquery")

        # Use array_agg() to consolidate the rows into one row -- this
        # gives us a single value, an array of strings, for each
        # Work. Then use array_to_string to convert the array into a
        # single comma-separated string.
        genres = select(
            [
                func.array_to_string(
                    func.array_agg(genres_alias.c.genre_name), ","
                )
            ]
        ).select_from(genres_alias)

        # This subquery gets a Work's target age as a single string.
        #
        # This Alias selects two fields: the lower and upper bounds of
        # the Work's target age. This reuses code originally written
        # for Work.to_search_documents().
        target_age = Work.target_age_query(work_id_column).alias(
            "target_age_subquery"
        )

        # Concatenate the lower and upper bounds with a dash in the
        # middle. If both lower and upper bound are empty, just give
        # the empty string. This simulates the behavior of
        # Work.target_age_string.
        target_age_string = select([
            case(
                [
                    (
                        or_(
                            target_age.c.lower != None,
                            target_age.c.upper != None,
                        ),
                        func.concat(
                            target_age.c.lower, "-", target_age.c.upper
                        )
                    )
                ],
                else_=literal_column("''")
            )
        ]).select_from(target_age)

        # Build the main query out of the subqueries.
        events = events_alias.c
        query = select(
            [
                events.start,
                events.event_type,
                events.identifier,
                events.identifier_type,
                events.sort_title,
                events.sort_author,
                events.fiction,
                events.audience,
                events.publisher,
                events.imprint,
                events.language,
                target_age_string.label('target_age'),
                genres.label('genres'),
                events.location,
            ]
        ).select_from(
            events_alias
        )
        return query
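

# ---------------------------------------------------------------------
# Usage sketch (not part of the original module): a minimal example of
# how the exporter might be driven. `get_session` is a placeholder for
# however the surrounding application obtains a SQLAlchemy session bound
# to the circulation manager database; the seven-day window is likewise
# illustrative.
# ---------------------------------------------------------------------

def example_export_last_week(get_session):
    """Export the last seven days of circulation events as CSV text.

    :param get_session: Zero-argument callable returning a SQLAlchemy
        session (placeholder for the application's session factory).
    :return: The CSV document as a unicode string, as produced by
        LocalAnalyticsExporter.export.
    """
    from datetime import datetime, timedelta

    _db = get_session()
    end = datetime.utcnow()
    start = end - timedelta(days=7)
    exporter = LocalAnalyticsExporter()
    # `locations` and `library` are optional filters; leaving them out
    # exports events for all locations and libraries.
    return exporter.export(_db, start, end)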