Source code for api.admin.controller.admin_auth_services
import flask
from flask import Response
from flask_babel import lazy_gettext as _
from core.model import (
ExternalIntegration,
get_one,
get_one_or_create,
)
from core.util.problem_detail import ProblemDetail
from . import SettingsController
from api.admin.google_oauth_admin_authentication_provider import GoogleOAuthAdminAuthenticationProvider
from api.admin.problem_details import *
[docs]class AdminAuthServicesController(SettingsController):
def __init__(self, manager):
super(AdminAuthServicesController, self).__init__(manager)
provider_apis = [GoogleOAuthAdminAuthenticationProvider]
self.protocols = self._get_integration_protocols(
provider_apis, protocol_name_attr="NAME")
[docs] def process_admin_auth_services(self):
"""Fetch, create, or update admin_auth_services
Returns:
dict: if Get request returns a dict of auth services and protocols
Response: If POST request updates or creates auth services and protocols.
"""
self.require_system_admin()
if flask.request.method == 'GET':
return self.process_get()
else:
return self.process_post()
[docs] def process_get(self):
"""Return dict of auth services and protocols available to library
Returns:
dict: auth_services and protocols available
"""
auth_services = self._get_integration_info(
ExternalIntegration.ADMIN_AUTH_GOAL, self.protocols)
return dict(
admin_auth_services=auth_services,
protocols=self.protocols,
)
[docs] def process_post(self):
"""Create new auth_service if none exists and set service and protocol
form: 'protocol'
form: 'id'
form: 'name'
Returns:
Response: ProblemDetail or string of auth_service.protocol and 200, or 201 if newly created service
"""
protocol = flask.request.form.get("protocol")
id = flask.request.form.get("id")
auth_service = ExternalIntegration.admin_authentication(self._db)
fields = {"protocol": protocol, "id": id, "auth_service": auth_service}
error = self.validate_form_fields(**fields)
if error:
return error
is_new = False
if not auth_service:
if protocol:
auth_service, is_new = get_one_or_create(
self._db, ExternalIntegration, protocol=protocol,
goal=ExternalIntegration.ADMIN_AUTH_GOAL
)
else:
return NO_PROTOCOL_FOR_NEW_SERVICE
name = flask.request.form.get("name")
auth_service.name = name
[protocol] = [p for p in self.protocols if p.get("name") == protocol]
result = self._set_integration_settings_and_libraries(
auth_service, protocol)
if isinstance(result, ProblemDetail):
return result
if is_new:
return Response(str(auth_service.protocol), 201)
else:
return Response(str(auth_service.protocol), 200)
[docs] def process_delete(self, protocol):
"""Delete an auth service from the database
Args:
protocol (string): Name of protocol to search for the service to be deleted.
Returns:
Response: ('Deleted', 200)
"""
self.require_system_admin()
service = get_one(self._db, ExternalIntegration,
protocol=protocol, goal=ExternalIntegration.ADMIN_AUTH_GOAL)
if not service:
return MISSING_SERVICE
self._db.delete(service)
return Response(str(_("Deleted")), 200)