Source code for api.lcp.server

import json
import os
import urllib.parse

import requests
from flask_babel import lazy_gettext as _
from requests.auth import HTTPBasicAuth

from api.lcp import utils
from api.lcp.encrypt import LCPEncryptionResult, LCPEncryptorResultJSONEncoder
from api.lcp.hash import HashingAlgorithm
from core.model.configuration import ConfigurationGrouping, ConfigurationMetadata, ConfigurationAttributeType, \
    ConfigurationOption


[docs]class LCPServerConfiguration(ConfigurationGrouping): """Contains LCP License Server's settings""" DEFAULT_PAGE_SIZE = 100 DEFAULT_PASSPHRASE_HINT = 'If you do not remember your passphrase, please contact your administrator' DEFAULT_ENCRYPTION_ALGORITHM = HashingAlgorithm.SHA256.value lcpserver_url = ConfigurationMetadata( key='lcpserver_url', label=_('LCP License Server\'s URL'), description=_('URL of the LCP License Server'), type=ConfigurationAttributeType.TEXT, required=True ) lcpserver_user = ConfigurationMetadata( key='lcpserver_user', label=_('LCP License Server\'s user'), description=_('Name of the user used to connect to the LCP License Server'), type=ConfigurationAttributeType.TEXT, required=True ) lcpserver_password = ConfigurationMetadata( key='lcpserver_password', label=_('LCP License Server\'s password'), description=_('Password of the user used to connect to the LCP License Server'), type=ConfigurationAttributeType.TEXT, required=True ) lcpserver_input_directory = ConfigurationMetadata( key='lcpserver_input_directory', label=_('LCP License Server\'s input directory'), description=_( 'Full path to the directory containing encrypted books. ' 'This directory should be the same as lcpencrypt\'s output directory' ), type=ConfigurationAttributeType.TEXT, required=True ) lcpserver_page_size = ConfigurationMetadata( key='lcpserver_page_size', label=_('LCP License Server\'s page size'), description=_('Number of licences returned by the server'), type=ConfigurationAttributeType.NUMBER, required=False, default=DEFAULT_PAGE_SIZE ) provider_name = ConfigurationMetadata( key='provider_name', label=_('LCP service provider\'s identifier'), description=_( 'URI that identifies the provider in an unambiguous way' ), type=ConfigurationAttributeType.TEXT, required=True ) passphrase_hint = ConfigurationMetadata( key='passphrase_hint', label=_('Passphrase hint'), description=_('Hint proposed to the user for selecting their passphrase'), type=ConfigurationAttributeType.TEXT, required=False, default=DEFAULT_PASSPHRASE_HINT ) encryption_algorithm = ConfigurationMetadata( key='encryption_algorithm', label=_('Passphrase encryption algorithm'), description=_('Algorithm used for encrypting the passphrase'), type=ConfigurationAttributeType.SELECT, required=False, default=DEFAULT_ENCRYPTION_ALGORITHM, options=ConfigurationOption.from_enum(HashingAlgorithm) ) max_printable_pages = ConfigurationMetadata( key='max_printable_pages', label=_('Maximum number or printable pages'), description=_('Maximum number of pages that can be printed over the lifetime of the license'), type=ConfigurationAttributeType.NUMBER, required=False ) max_copiable_pages = ConfigurationMetadata( key='max_copiable_pages', label=_('Maximum number or copiable characters'), description=_('Maximum number of characters that can be copied to the clipboard'), type=ConfigurationAttributeType.NUMBER, required=False )
[docs]class LCPServer(object): """Wrapper around LCP License Server's API""" def __init__(self, configuration_storage, configuration_factory, hasher_factory, credential_factory): """Initializes a new instance of LCPServer class :param configuration_storage: ConfigurationStorage object :type configuration_storage: ConfigurationStorage :param configuration_factory: Factory creating LCPEncryptionConfiguration instance :type configuration_factory: api.config.ConfigurationFactory :param hasher_factory: Factory responsible for creating Hasher implementations :type hasher_factory: hash.HasherFactory :param credential_factory: Factory responsible for creating Hasher implementations :type credential_factory: credential.CredentialFactory """ self._configuration_storage = configuration_storage self._configuration_factory = configuration_factory self._hasher_factory = hasher_factory self._credential_factory = credential_factory self._hasher_instance = None def _get_hasher(self, configuration): """Returns a Hasher instance :param configuration: Configuration object :type configuration: LCPServerConfiguration :return: Hasher instance :rtype: hash.Hasher """ if self._hasher_instance is None: self._hasher_instance = self._hasher_factory.create( configuration.encryption_algorithm) return self._hasher_instance def _create_partial_license(self, db, configuration, patron, license_start=None, license_end=None): """Creates a partial LCP license used an input by the LCP License Server for generation of LCP licenses :param configuration: Configuration object :type configuration: LCPServerConfiguration :param patron: Patron object :type patron: Patron :param license_start: Date and time when the license begins :type license_start: Optional[datetime.datetime] :param license_end: Date and time when the license ends :type license_end: Optional[datetime.datetime] :return: Partial LCP license :rtype: Dict """ hasher = self._get_hasher(configuration) hashed_passphrase = hasher.hash(self._credential_factory.get_patron_passphrase(db, patron)) self._credential_factory.set_hashed_passphrase(db, patron, hashed_passphrase) partial_license = { 'provider': configuration.provider_name, 'encryption': { 'user_key': { 'text_hint': configuration.passphrase_hint, 'hex_value': hashed_passphrase, } } } if patron: partial_license['user'] = { 'id': self._credential_factory.get_patron_id(db, patron) } rights_fields = [ license_start, license_end, configuration.max_printable_pages, configuration.max_copiable_pages] if any([rights_field is not None and rights_field != '' for rights_field in rights_fields]): partial_license['rights'] = {} if license_start: partial_license['rights']['start'] = utils.format_datetime(license_start) if license_end: partial_license['rights']['end'] = utils.format_datetime(license_end) if configuration.max_printable_pages is not None and configuration.max_printable_pages != '': partial_license['rights']['print'] = int(configuration.max_printable_pages) if configuration.max_copiable_pages is not None and configuration.max_copiable_pages != '': partial_license['rights']['copy'] = int(configuration.max_copiable_pages) return partial_license @staticmethod def _send_request(configuration, method, path, payload, json_encoder=None): """Sends a request to the LCP License Server :param configuration: Configuration object :type configuration: LCPServerConfiguration :param path: URL path part :type path: string :param payload: Dictionary containing request's payload (should be JSON compatible) :type payload: Union[Dict, object] :param json_encoder: JSON encoder :type json_encoder: JSONEncoder :return: Dictionary containing LCP License Server's response :rtype: Dict """ json_payload = json.dumps(payload, cls=json_encoder) url = urllib.parse.urljoin(configuration.lcpserver_url, path) response = requests.request( method, url, data=json_payload, headers={'Content-Type': 'application/json'}, auth=HTTPBasicAuth( configuration.lcpserver_user, configuration.lcpserver_password ) ) response.raise_for_status() return response
[docs] def add_content(self, db, encrypted_content): """Notifies LCP License Server about new encrypted content :param db: Database session :type db: sqlalchemy.orm.session.Session :param encrypted_content: LCPEncryptionResult object containing information about encrypted content :type encrypted_content: LCPEncryptionResult """ with self._configuration_factory.create( self._configuration_storage, db, LCPServerConfiguration) as configuration: content_location = os.path.join( configuration.lcpserver_input_directory, encrypted_content.protected_content_disposition) payload = LCPEncryptionResult( content_id=encrypted_content.content_id, content_encryption_key=encrypted_content.content_encryption_key, protected_content_location=content_location, protected_content_disposition=encrypted_content.protected_content_disposition, protected_content_type=encrypted_content.protected_content_type, protected_content_length=encrypted_content.protected_content_length, protected_content_sha256=encrypted_content.protected_content_sha256 ) path = '/contents/{0}'.format(encrypted_content.content_id) self._send_request(configuration, 'put', path, payload, LCPEncryptorResultJSONEncoder)
[docs] def generate_license(self, db, content_id, patron, license_start, license_end): """Generates a new LCP license :param db: Database session :type db: sqlalchemy.orm.session.Session :param content_id: Unique content ID :type content_id: string :param patron: Patron object :type patron: Patron :param license_start: Unique patron ID :type license_start: string :param license_start: Date and time when the license begins :type license_start: datetime.datetime :param license_end: Date and time when the license ends :type license_end: datetime.datetime :return: LCP license :rtype: Dict """ with self._configuration_factory.create( self._configuration_storage, db, LCPServerConfiguration) as configuration: partial_license_payload = self._create_partial_license( db, configuration, patron, license_start, license_end) path = 'contents/{0}/license'.format(content_id) response = self._send_request(configuration, 'post', path, partial_license_payload) return response.json()
[docs] def get_license(self, db, license_id, patron): """Returns an existing license :param db: Database session :type db: sqlalchemy.orm.session.Session :param license_id: License's ID :type license_id: int :param patron: Patron object :type patron: Patron :return: Existing license :rtype: string """ with self._configuration_factory.create( self._configuration_storage, db, LCPServerConfiguration) as configuration: partial_license_payload = self._create_partial_license(db, configuration, patron) path = 'licenses/{0}'.format(license_id) response = self._send_request(configuration, 'post', path, partial_license_payload) return response.json()