Source code for core.model.integrationclient

# encoding: utf-8
# IntegrationClient

import os
import re
from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    Integer,
    Unicode,
)
from sqlalchemy.orm import (
    relationship,
)

from . import (
    Base,
    get_one,
    get_one_or_create,
)
from ..util.string_helpers import random_string
from ..util.datetime_helpers import utc_now

class IntegrationClient(Base):
    """A client that has authenticated access to this application.

    Currently used to represent circulation managers that have access
    to the metadata wrangler.
    """
    __tablename__ = 'integrationclients'

    id = Column(Integer, primary_key=True)

    # URL (or human readable name) to represent the server.
    url = Column(Unicode, unique=True)

    # Shared secret
    shared_secret = Column(Unicode, unique=True, index=True)

    # It may be necessary to disable an integration client until it
    # upgrades to fix a known bug.
    enabled = Column(Boolean, default=True)

    created = Column(DateTime(timezone=True))
    last_accessed = Column(DateTime(timezone=True))

    loans = relationship('Loan', backref='integration_client')
    holds = relationship('Hold', backref='integration_client')

    def __repr__(self):
        return "<IntegrationClient: URL=%s ID=%s>" % (self.url, self.id)

    @classmethod
    def for_url(cls, _db, url):
        """Find or create the IntegrationClient for the given server URL.

        The URL is normalized with `normalize_url` before the lookup,
        and the client's `last_accessed` time is updated to now.

        :param _db: A database session.
        :param url: The server URL (or human-readable name).
        :return: A 2-tuple (IntegrationClient, is_new). If the client
            didn't already exist, it will be created. If it didn't
            already have a secret, no secret will be set.
        """
        url = cls.normalize_url(url)
        now = utc_now()
        client, is_new = get_one_or_create(
            _db, cls, url=url, create_method_kwargs=dict(created=now)
        )
        client.last_accessed = now
        return client, is_new

    @classmethod
    def register(cls, _db, url, submitted_secret=None):
        """Create a new client for a server, or rotate an existing
        client's shared secret.

        :param _db: A database session.
        :param url: The server URL (or human-readable name).
        :param submitted_secret: The current shared secret. Required
            (and must match) when the client already exists.
        :return: A 2-tuple (IntegrationClient, is_new).
        :raise ValueError: If the client already exists and
            `submitted_secret` is missing or does not match its
            current shared secret.
        """
        client, is_new = cls.for_url(_db, url)

        if not is_new:
            secret_is_valid = (
                submitted_secret
                and submitted_secret == client.shared_secret
            )
            if not secret_is_valid:
                raise ValueError(
                    'Cannot update existing IntegrationClient '
                    'without valid shared_secret'
                )

        # A client with no secret gets one; a client that proved
        # knowledge of its current secret gets a fresh one.
        generate_secret = (client.shared_secret is None) or submitted_secret
        if generate_secret:
            client.randomize_secret()

        return client, is_new

    @classmethod
    def normalize_url(cls, url):
        """Reduce a server URL to a canonical lowercase form.

        Removes a leading 'http://' or 'https://', then a leading
        'www.', then a single trailing slash.

        :param url: The URL string to normalize.
        :return: The normalized URL as a string.
        """
        # Lowercase first so the scheme and 'www.' prefixes are
        # recognized regardless of the case they were submitted in;
        # otherwise 'HTTP://Example.com' and 'http://example.com'
        # would normalize to different values.
        url = url.lower()
        url = re.sub(r'^(http://|https://)', '', url)
        url = re.sub(r'^www\.', '', url)
        if url.endswith('/'):
            url = url[:-1]
        return str(url)

    @classmethod
    def authenticate(cls, _db, shared_secret):
        """Look up the client that owns the given shared secret.

        On success, the client's `last_accessed` time is updated and
        the session is committed.

        :param _db: A database session.
        :param shared_secret: The secret presented by the caller.
        :return: The matching IntegrationClient, or None if there is
            no match.
        """
        if shared_secret is None:
            # str(None) would otherwise search for the literal
            # string 'None'.
            return None

        client = get_one(_db, cls, shared_secret=str(shared_secret))
        if not client:
            return None

        client.last_accessed = utc_now()
        # Committing immediately reduces the risk of contention.
        _db.commit()
        return client

    def randomize_secret(self):
        """Replace this client's shared secret with a new random string."""
        self.shared_secret = random_string(24)