Source code for api.lcp.encrypt

import json
import logging
import os
import re
import subprocess
from json import JSONEncoder

from flask_babel import lazy_gettext as _

from api.lcp import utils
from core.exceptions import BaseError
from core.model.configuration import ConfigurationGrouping, ConfigurationMetadata, ConfigurationAttributeType


class LCPEncryptionException(BaseError):
    """Signals that something went wrong while encrypting content with LCP."""
class LCPEncryptionConfiguration(ConfigurationGrouping):
    """Groups the settings LCPEncryptor reads to locate and run lcpencrypt."""

    # Default location of the lcpencrypt binary.
    DEFAULT_LCPENCRYPT_LOCATION = '/go/bin/lcpencrypt'
    # Name of the lcpencrypt Docker image (not referenced by the code in this module).
    DEFAULT_LCPENCRYPT_DOCKER_IMAGE = 'readium/lcpencrypt'

    # Path to the lcpencrypt executable; optional, falls back to the default above.
    lcpencrypt_location = ConfigurationMetadata(
        key='lcpencrypt_location',
        label=_("lcpencrypt's location"),
        description=_(
            'Full path to the local lcpencrypt binary. The default value is {0}'.format(
                DEFAULT_LCPENCRYPT_LOCATION
            )
        ),
        type=ConfigurationAttributeType.TEXT,
        required=False,
        default=DEFAULT_LCPENCRYPT_LOCATION
    )

    # Directory receiving the encrypted files; optional, no default value.
    lcpencrypt_output_directory = ConfigurationMetadata(
        key='lcpencrypt_output_directory',
        label=_("lcpencrypt's output directory"),
        description=_(
            'Full path to the directory where lcpencrypt stores encrypted content. '
            "If not set encrypted books will be stored in lcpencrypt's working directory"
        ),
        type=ConfigurationAttributeType.TEXT,
        required=False
    )
class LCPEncryptionResult(object):
    """Represents an output sent by lcpencrypt

    Instances are read-only value objects: two results are equal when all
    seven fields are equal, and equal results share the same hash.
    """

    # Keys of the JSON document printed by lcpencrypt.
    CONTENT_ID = 'content-id'
    CONTENT_ENCRYPTION_KEY = 'content-encryption-key'
    PROTECTED_CONTENT_LOCATION = 'protected-content-location'
    PROTECTED_CONTENT_LENGTH = 'protected-content-length'
    PROTECTED_CONTENT_SHA256 = 'protected-content-sha256'
    PROTECTED_CONTENT_DISPOSITION = 'protected-content-disposition'
    PROTECTED_CONTENT_TYPE = 'protected-content-type'

    def __init__(
            self,
            content_id,
            content_encryption_key,
            protected_content_location,
            protected_content_disposition,
            protected_content_type,
            protected_content_length,
            protected_content_sha256):
        """Initializes a new instance of LCPEncryptionResult class

        :param content_id: Content identifier
        :type content_id: Optional[string]

        :param content_encryption_key: Content encryption key
        :type content_encryption_key: Optional[string]

        :param protected_content_location: Complete file path of the encrypted content
        :type protected_content_location: Optional[string]

        :param protected_content_disposition: File name of the encrypted content
        :type protected_content_disposition: Optional[string]

        :param protected_content_type: Media type of the encrypted content
        :type protected_content_type: Optional[string]

        :param protected_content_length: Size of the encrypted content
        :type protected_content_length: Optional[string]

        :param protected_content_sha256: Hash of the encrypted content
        :type protected_content_sha256: Optional[string]
        """
        self._content_id = content_id
        self._content_encryption_key = content_encryption_key
        self._protected_content_location = protected_content_location
        self._protected_content_disposition = protected_content_disposition
        self._protected_content_type = protected_content_type
        self._protected_content_length = protected_content_length
        self._protected_content_sha256 = protected_content_sha256

    @property
    def content_id(self):
        """Returns a content identifier

        :return: Content identifier
        :rtype: Optional[string]
        """
        return self._content_id

    @property
    def content_encryption_key(self):
        """Returns a content encryption key

        :return: Content encryption key
        :rtype: Optional[string]
        """
        return self._content_encryption_key

    @property
    def protected_content_location(self):
        """Returns a complete file path of the encrypted content

        :return: Complete file path of the encrypted content
        :rtype: Optional[string]
        """
        return self._protected_content_location

    @property
    def protected_content_disposition(self):
        """Returns a file name of the encrypted content

        :return: File name of the encrypted content
        :rtype: Optional[string]
        """
        return self._protected_content_disposition

    @property
    def protected_content_type(self):
        """Returns a media type of the encrypted content

        :return: Media type of the encrypted content
        :rtype: Optional[string]
        """
        return self._protected_content_type

    @property
    def protected_content_length(self):
        """Returns a size of the encrypted content

        :return: Size of the encrypted content
        :rtype: Optional[string]
        """
        return self._protected_content_length

    @property
    def protected_content_sha256(self):
        """Returns a hash of the encrypted content

        :return: Hash of the encrypted content
        :rtype: Optional[string]
        """
        return self._protected_content_sha256

    @classmethod
    def from_dict(cls, result_dict):
        """Creates an LCPEncryptionResult object from a Python dictionary

        Keys missing from the dictionary produce ``None`` fields.

        :param result_dict: Python dictionary containing an lcpencrypt output
        :type result_dict: Dict

        :return: LCPEncryptionResult object
        :rtype: LCPEncryptionResult
        """
        return cls(
            content_id=result_dict.get(cls.CONTENT_ID),
            content_encryption_key=result_dict.get(cls.CONTENT_ENCRYPTION_KEY),
            protected_content_location=result_dict.get(cls.PROTECTED_CONTENT_LOCATION),
            protected_content_disposition=result_dict.get(cls.PROTECTED_CONTENT_DISPOSITION),
            protected_content_type=result_dict.get(cls.PROTECTED_CONTENT_TYPE),
            protected_content_length=result_dict.get(cls.PROTECTED_CONTENT_LENGTH),
            protected_content_sha256=result_dict.get(cls.PROTECTED_CONTENT_SHA256)
        )

    def _comparison_key(self):
        """Returns the fields that define the identity of a result

        :return: Tuple containing all the fields taking part in comparison and hashing
        :rtype: Tuple
        """
        return (
            self.content_id,
            self.content_encryption_key,
            self.protected_content_location,
            self.protected_content_length,
            self.protected_content_sha256,
            self.protected_content_disposition,
            self.protected_content_type
        )

    def __eq__(self, other):
        """Compares two LCPEncryptionResult objects

        :param other: LCPEncryptionResult object
        :type other: LCPEncryptionResult

        :return: Boolean value indicating whether two items are equal
        :rtype: bool
        """
        if not isinstance(other, LCPEncryptionResult):
            return False

        return self._comparison_key() == other._comparison_key()

    def __ne__(self, other):
        """Compares two LCPEncryptionResult objects for inequality

        Defined explicitly because Python 2 does not derive ``!=`` from ``__eq__``.

        :param other: LCPEncryptionResult object
        :type other: LCPEncryptionResult

        :return: Boolean value indicating whether two items are different
        :rtype: bool
        """
        return not self.__eq__(other)

    def __hash__(self):
        """Returns a hash consistent with ``__eq__``

        Defined explicitly because Python 3 makes a class unhashable
        when it overrides ``__eq__`` without ``__hash__``.

        :return: Hash of all the fields taking part in comparison
        :rtype: int
        """
        return hash(self._comparison_key())

    def __repr__(self):
        """Returns a string representation of a LCPEncryptionResult object

        :return: string representation of a LCPEncryptionResult object
        :rtype: string
        """
        return \
            '<LCPEncryptor.Result(' \
            'content_id={0}, ' \
            'content_encryption_key={1}, ' \
            'protected_content_location={2}, ' \
            'protected_content_length={3}, ' \
            'protected_content_sha256={4}, ' \
            'protected_content_disposition={5}, ' \
            'protected_content_type={6})>'.format(*self._comparison_key())
class LCPEncryptorResultJSONEncoder(JSONEncoder):
    """JSON encoder that knows how to serialize LCPEncryptionResult objects"""

    def default(self, result):
        """Converts an LCPEncryptionResult object into a JSON-serializable dictionary

        :param result: LCPEncryptionResult object
        :type result: LCPEncryptionResult

        :return: Dictionary keyed by the lcpencrypt output field names
        :rtype: Dict

        :raises ValueError: if ``result`` is not an LCPEncryptionResult instance
        """
        if isinstance(result, LCPEncryptionResult):
            return {
                'content-id': result.content_id,
                'content-encryption-key': result.content_encryption_key,
                'protected-content-location': result.protected_content_location,
                'protected-content-length': result.protected_content_length,
                'protected-content-sha256': result.protected_content_sha256,
                'protected-content-disposition': result.protected_content_disposition,
                'protected-content-type': result.protected_content_type
            }

        raise ValueError('result must have type LCPEncryptorResult')
class LCPEncryptor(object):
    """Wrapper around lcpencrypt tool

    Only running a local lcpencrypt binary is implemented: ``encrypt`` raises
    NotImplementedError when the configured binary does not exist locally.
    """

    class Parameters(object):
        """Parses input parameters for lcpencrypt"""

        def __init__(self, file_path, identifier, configuration):
            """Initializes a new instance of Parameters class

            :param file_path: File path to the book to be encrypted
            :type file_path: string

            :param identifier: Book's identifier
            :type identifier: string

            :param configuration: LCPEncryptionConfiguration instance
            :type configuration: instance
            """
            self._lcpencrypt_location = configuration.lcpencrypt_location
            self._input_file_path = str(file_path)
            self._content_id = str(identifier)
            self._output_file_path = None

            output_directory = configuration.lcpencrypt_output_directory

            if output_directory:
                # The stringified values are used here as well, so that a non-string
                # identifier or path is handled the same way as in the command line.
                input_extension = os.path.splitext(self._input_file_path)[1]
                target_extension = utils.get_target_extension(input_extension)

                # NOTE(review): this is a substring test, not a suffix test, so an identifier
                # containing the extension anywhere is left untouched - confirm this is intended.
                if target_extension in self._content_id:
                    output_file_name = self._content_id
                else:
                    output_file_name = self._content_id + target_extension

                self._output_file_path = os.path.join(output_directory, output_file_name)

        @property
        def lcpencrypt_location(self):
            """Returns location of lcpencrypt binary

            :return: Location of lcpencrypt binary
            :rtype: string
            """
            return self._lcpencrypt_location

        @property
        def input_file_path(self):
            """Returns path of the input file

            :return: Path of the input file
            :rtype: string
            """
            return self._input_file_path

        @property
        def content_id(self):
            """Returns content ID

            :return: Content ID
            :rtype: string
            """
            return self._content_id

        @property
        def output_file_path(self):
            """Returns path of the output file

            :return: Path of the output file or None when no output directory is configured
            :rtype: Optional[string]
            """
            return self._output_file_path

        def to_array(self):
            """Returns parameters in an array

            The first item is the lcpencrypt location, so the array
            can be passed to subprocess functions as is.

            :return: Parameters in an array
            :rtype: List
            """
            parameters = [
                self._lcpencrypt_location,
                '-input',
                self._input_file_path,
                '-contentid',
                self._content_id
            ]

            if self._output_file_path:
                parameters.extend([
                    '-output',
                    self._output_file_path
                ])

            return parameters

    # Group 1: optional JSON object (greedy, up to the last closing brace that is still
    # followed by at least one character); group 2: the remaining text.
    OUTPUT_REGEX = re.compile(r'(\{.+\})?(.+)', re.DOTALL)

    def __init__(self, configuration_storage, configuration_factory):
        """Initializes a new instance of LCPEncryptor class

        :param configuration_storage: ConfigurationStorage object
        :type configuration_storage: ConfigurationStorage

        :param configuration_factory: Factory creating LCPEncryptionConfiguration instance
        :type configuration_factory: api.config.ConfigurationFactory
        """
        self._logger = logging.getLogger(__name__)
        self._configuration_storage = configuration_storage
        self._configuration_factory = configuration_factory

    def _lcpencrypt_exists_locally(self, configuration):
        """Returns a Boolean value indicating whether lcpencrypt exists locally

        :param configuration: LCPEncryptionConfiguration instance
        :type configuration: instance

        :return: Boolean value indicating whether lcpencrypt exists locally
        :rtype: bool
        """
        return os.path.isfile(configuration.lcpencrypt_location)

    def _parse_output(self, output):
        """Parses lcpencrypt's output

        :param output: lcpencrypt's output (text or the raw bytes returned by subprocess)
        :type output: Union[string, bytes]

        :return: Encryption result
        :rtype: LCPEncryptionResult

        :raises LCPEncryptionException: if the output is empty, contains no JSON object
            or the JSON object lacks the content length, hash or encryption key
        """
        # subprocess.check_output returns bytes on Python 3; the string operations below
        # need text. On Python 2 bytes is str, so the condition is never true there.
        if isinstance(output, bytes) and not isinstance(output, str):
            output = output.decode('utf-8')

        # Drop everything lcpencrypt printed before the JSON object.
        bracket_index = output.find('{')

        if bracket_index > 0:
            output = output[bracket_index:]

        match = self.OUTPUT_REGEX.match(output)

        if not match:
            raise LCPEncryptionException('Output has a wrong format')

        json_output, remaining_output = match.groups()

        if not json_output:
            # No JSON object found: the text printed by lcpencrypt is used as the error message.
            raise LCPEncryptionException(remaining_output.strip())

        result = LCPEncryptionResult.from_dict(json.loads(json_output))

        if not result.protected_content_length or \
                not result.protected_content_sha256 or \
                not result.content_encryption_key:
            raise LCPEncryptionException('Encryption failed')

        return result

    def _run_lcpencrypt_locally(self, file_path, identifier, configuration):
        """Runs lcpencrypt using a local binary

        :param file_path: File path to the book to be encrypted
        :type file_path: string

        :param identifier: Book's identifier
        :type identifier: string

        :param configuration: LCPEncryptionConfiguration instance
        :type configuration: instance

        :return: Encryption result
        :rtype: LCPEncryptionResult

        :raises LCPEncryptionException: if creating the output directory, running lcpencrypt
            or parsing its output fails
        """
        self._logger.info(
            'Started running a local lcpencrypt binary. File path: {0}. Identifier: {1}'.format(
                file_path, identifier
            )
        )

        parameters = LCPEncryptor.Parameters(file_path, identifier, configuration)

        try:
            if parameters.output_file_path:
                self._logger.info('Creating a directory tree for {0}'.format(parameters.output_file_path))

                output_directory = os.path.dirname(parameters.output_file_path)

                if not os.path.exists(output_directory):
                    os.makedirs(output_directory)

                self._logger.info('Directory tree {0} has been successfully created'.format(output_directory))

            command = parameters.to_array()

            self._logger.info('Running lcpencrypt using the following parameters: {0}'.format(command))

            output = subprocess.check_output(command)
            result = self._parse_output(output)
        except Exception as exception:
            # Every failure is logged and reported to callers as LCPEncryptionException.
            self._logger.exception('An unhandled exception occurred during running a local lcpencrypt binary')

            raise LCPEncryptionException(str(exception), inner_exception=exception)

        self._logger.info(
            'Finished running a local lcpencrypt binary. File path: {0}. Identifier: {1}. Result: {2}'.format(
                file_path, identifier, result
            )
        )

        return result

    def encrypt(self, db, file_path, identifier):
        """Encrypts a book

        :param db: Database session
        :type db: sqlalchemy.orm.session.Session

        :param file_path: File path to the book to be encrypted
        :type file_path: string

        :param identifier: Book's identifier
        :type identifier: string

        :return: Encryption result
        :rtype: LCPEncryptionResult

        :raises NotImplementedError: if the configured lcpencrypt binary does not exist locally
        :raises LCPEncryptionException: if the local encryption fails
        """
        with self._configuration_factory.create(
                self._configuration_storage, db, LCPEncryptionConfiguration) as configuration:
            if not self._lcpencrypt_exists_locally(configuration):
                raise NotImplementedError()

            return self._run_lcpencrypt_locally(file_path, identifier, configuration)