Source code for api.admin.controller.storage_services

import flask
from flask import Response

from api.admin.problem_details import *
from core.mirror import MirrorUploader
from core.model import (
    ExternalIntegration,
    get_one
)
from core.util.problem_detail import ProblemDetail
from . import SettingsController

# NOTE: We need to import it explicitly to initialize MirrorUploader.IMPLEMENTATION_REGISTRY
from api.lcp import mirror


[docs]class StorageServicesController(SettingsController): def __init__(self, manager): super(StorageServicesController, self).__init__(manager) self.goal = ExternalIntegration.STORAGE_GOAL self.protocols = self._get_integration_protocols( list(MirrorUploader.IMPLEMENTATION_REGISTRY.values()), protocol_name_attr="NAME" )
[docs] def process_services(self): if flask.request.method == 'GET': return self.process_get() else: return self.process_post()
[docs] def process_get(self): services = self._get_integration_info(self.goal, self.protocols) return dict( storage_services=services, protocols=self.protocols )
[docs] def process_post(self): protocol = flask.request.form.get("protocol") name = flask.request.form.get("name") is_new = False protocol_error = self.validate_protocol() if protocol_error: return protocol_error id = flask.request.form.get("id") if id: # Find an existing service to edit storage_service = get_one(self._db, ExternalIntegration, id=id, goal=self.goal) if not storage_service: return MISSING_SERVICE if protocol != storage_service.protocol: return CANNOT_CHANGE_PROTOCOL else: # Create a new service storage_service, is_new = self._create_integration( self.protocols, protocol, self.goal ) if isinstance(storage_service, ProblemDetail): self._db.rollback() return storage_service protocol_error = self.set_protocols(storage_service, protocol, self.protocols) if protocol_error: self._db.rollback() return protocol_error storage_service.name = name if is_new: return Response(str(storage_service.id), 201) else: return Response(str(storage_service.id), 200)
[docs] def process_delete(self, service_id): return self._delete_integration( service_id, ExternalIntegration.STORAGE_GOAL )