Source code for openbrokerapi.api

import logging
import warnings
from http import HTTPStatus
from json.decoder import JSONDecodeError
from typing import List, Union, Optional

from flask import Blueprint, Request
from flask import json, request

from openbrokerapi import errors, constants
from openbrokerapi.helper import to_json_response, ensure_list
from openbrokerapi.request_filter import (
    print_request,
    check_originating_identity,
    check_version,
    requires_application_json,
)
from openbrokerapi.auth import BrokerAuthenticator, BasicBrokerAuthenticator, NoneBrokerAuthenticator, BrokerCredentials
from openbrokerapi.response import (
    BindResponse,
    CatalogResponse,
    DeprovisionResponse,
    EmptyResponse,
    ErrorResponse,
    LastOperationResponse,
    ProvisioningResponse,
    UpdateResponse,
    UnbindResponse,
    GetInstanceResponse,
    GetBindingResponse,
)
from openbrokerapi.router import Router
from openbrokerapi.service_broker import (
    BindDetails,
    BindState,
    DeprovisionDetails,
    ProvisionDetails,
    ProvisionState,
    UnbindDetails,
    UpdateDetails,
    ServiceBroker,
    OperationState,
)
import openbrokerapi.settings


def _check_plan_id(broker: ServiceBroker, plan_id) -> bool:
    """
    Tell whether any service in the broker's catalog offers a plan with the given id.

    :param broker: broker whose catalog is searched
    :param plan_id: id of the plan to look for
    :return: True if a plan with a matching id is found, False otherwise
    """
    services = ensure_list(broker.catalog())
    # Flatten services -> plans; any() stops at the first matching plan.
    return any(plan.id == plan_id for service in services for plan in service.plans)


def _accepts_incomplete() -> bool:
    """Return True only if the current request carries ``accepts_incomplete=true``."""
    return request.args.get("accepts_incomplete", "false") == "true"


def _bad_request_response(exc):
    """Build the ``400 Bad Request`` reply whose description is the text of ``exc``."""
    return (
        to_json_response(ErrorResponse(description=str(exc))),
        HTTPStatus.BAD_REQUEST,
    )


def _async_required_response():
    """Build the ``422 AsyncRequired`` reply shared by provision, update and deprovision."""
    return (
        to_json_response(
            ErrorResponse(
                error="AsyncRequired",
                description="This service plan requires client support for asynchronous service operations.",
            )
        ),
        HTTPStatus.UNPROCESSABLE_ENTITY,
    )


def _concurrency_error_response():
    """Build the ``422 ConcurrencyError`` reply shared by all handlers that can hit it."""
    error_response = ErrorResponse(
        error="ConcurrencyError",
        description="The Service Broker does not support concurrent requests that mutate the same resource.",
    )
    return to_json_response(error_response), HTTPStatus.UNPROCESSABLE_ENTITY


def get_blueprint(
    service_broker: ServiceBroker,
    broker_credentials: Union[None, List[BrokerCredentials], BrokerCredentials],
    logger: logging.Logger,
    *,
    authenticator: Optional[BrokerAuthenticator] = None,
) -> Blueprint:
    """
    Returns the blueprint with service broker api.

    :param service_broker: Services that this broker exposes
    :param broker_credentials: Optional Usernames and passwords that will be required to communicate with service broker
    :param logger: Used for api logs. This will not influence Flasks logging behavior.
    :param authenticator: provide an authenticator to secure endpoints, broker_credentials will be ignored
    :return: Blueprint to register with Flask app instance
    """
    openbroker = Blueprint("open_broker", __name__)

    # Apply filters
    logger.debug("Apply print_request filter for debugging")
    openbroker.before_request(print_request)

    if openbrokerapi.settings.DISABLE_VERSION_CHECK:
        logger.warning(
            "Minimum API version is not checked, this can cause illegal contracts between service broker and platform!"
        )
    else:
        logger.debug("Apply check_version filter for version %s", openbrokerapi.settings.MIN_VERSION)
        openbroker.before_request(check_version)

    # apply filter: global load X-Broker-API-Originating-Identity
    logger.debug("Apply check_originating_identity filter")
    openbroker.before_request(check_originating_identity)

    # apply filter: authentication
    if authenticator is None and broker_credentials is not None:
        # setup BasicAuthenticator with broker_credentials
        broker_credentials = ensure_list(broker_credentials)
        # Log only how many credentials are configured, never the credential objects themselves.
        logger.debug("Apply check_auth filter with %d credentials", len(broker_credentials))
        # TODO: remove type: ignore
        authenticator = BasicBrokerAuthenticator(*broker_credentials)  # type: ignore
    elif authenticator and broker_credentials:
        warnings.warn("Provided authenticator and broker_credential, only the authenticator is used")
    elif authenticator is None and broker_credentials is None:
        logger.warning("No authentication set, endpoints are not secured!")
        authenticator = NoneBrokerAuthenticator()

    logger.debug("Apply authentication filter")
    # TODO: remove type: ignore
    openbroker.before_request(authenticator)  # type: ignore

    def extract_authorization_username(request: Request):
        """Return the username from the request's Authorization data, or None if there is none."""
        if request.authorization is not None:
            return request.authorization.username
        else:
            return None

    @openbroker.errorhandler(Exception)
    def error_handler(e):
        """Fallback: log the exception and answer 500 with the default error message."""
        logger.exception(e)
        return (
            to_json_response(ErrorResponse(description=constants.DEFAULT_EXCEPTION_ERROR_MESSAGE)),
            HTTPStatus.INTERNAL_SERVER_ERROR,
        )

    @openbroker.errorhandler(NotImplementedError)
    def error_handler_not_implemented(e):
        """Log the exception and answer 501 with the default not-implemented message."""
        logger.exception(e)
        return (
            to_json_response(ErrorResponse(description=constants.DEFAULT_NOT_IMPLEMENTED_ERROR_MESSAGE)),
            HTTPStatus.NOT_IMPLEMENTED,
        )

    @openbroker.errorhandler(errors.ErrBadRequest)
    def error_handler_bad_request(e):
        """Log the exception and answer 400 with the exception text as description."""
        logger.exception(e)
        return _bad_request_response(e)

    @openbroker.route("/v2/catalog", methods=["GET"])
    def catalog():
        """
        :return: Catalog of broker (List of services)
        """
        services = ensure_list(service_broker.catalog())
        return to_json_response(CatalogResponse(list(services)))

    @openbroker.route("/v2/service_instances/<instance_id>", methods=["PUT"])
    @requires_application_json
    def provision(instance_id):
        """Provision a service instance; 400 on an invalid body or an unknown plan_id."""
        try:
            accepts_incomplete = _accepts_incomplete()

            provision_details = ProvisionDetails(**json.loads(request.data))
            provision_details.originating_identity = request.originating_identity
            provision_details.authorization_username = extract_authorization_username(request)

            if not _check_plan_id(service_broker, provision_details.plan_id):
                raise TypeError("plan_id not found in this service.")
        except (TypeError, KeyError, JSONDecodeError) as e:
            logger.exception(e)
            return _bad_request_response(e)

        try:
            result = service_broker.provision(
                instance_id=instance_id, details=provision_details, async_allowed=accepts_incomplete
            )
            if result is None:
                warnings.warn("Provision has to return ProvisionedServiceSpec")
                raise errors.ServiceException("Internal broker error")
        except errors.ErrInstanceAlreadyExists as e:
            logger.exception(e)
            return to_json_response(EmptyResponse()), HTTPStatus.CONFLICT
        except errors.ErrInvalidParameters as e:
            return (
                to_json_response(ErrorResponse("InvalidParameters", str(e))),
                HTTPStatus.BAD_REQUEST,
            )
        except errors.ErrAsyncRequired as e:
            logger.exception(e)
            return _async_required_response()

        if result.state == ProvisionState.IS_ASYNC:
            return (
                to_json_response(ProvisioningResponse(result.dashboard_url, result.operation)),
                HTTPStatus.ACCEPTED,
            )
        elif result.state == ProvisionState.IDENTICAL_ALREADY_EXISTS:
            return (
                to_json_response(ProvisioningResponse(result.dashboard_url, result.operation)),
                HTTPStatus.OK,
            )
        elif result.state == ProvisionState.SUCCESSFUL_CREATED:
            return (
                to_json_response(ProvisioningResponse(result.dashboard_url, result.operation)),
                HTTPStatus.CREATED,
            )
        else:
            raise errors.ServiceException("IllegalState, ProvisioningState unknown.")

    @openbroker.route("/v2/service_instances/<instance_id>", methods=["PATCH"])
    @requires_application_json
    def update(instance_id):
        """Update a service instance; plan_id is validated only when the request supplies one."""
        try:
            accepts_incomplete = _accepts_incomplete()

            update_details = UpdateDetails(**json.loads(request.data))
            update_details.originating_identity = request.originating_identity
            update_details.authorization_username = extract_authorization_username(request)

            plan_id = update_details.plan_id
            if plan_id and not _check_plan_id(service_broker, plan_id):
                raise TypeError("plan_id not found in this service.")
        except (TypeError, KeyError, JSONDecodeError) as e:
            logger.exception(e)
            return _bad_request_response(e)

        try:
            result = service_broker.update(
                instance_id=instance_id, details=update_details, async_allowed=accepts_incomplete
            )
            if result is None:
                warnings.warn("Update has to return UpdateServiceSpec")
                raise errors.ServiceException("Internal broker error")
        except errors.ErrInvalidParameters as e:
            return (
                to_json_response(ErrorResponse("InvalidParameters", str(e))),
                HTTPStatus.BAD_REQUEST,
            )
        except errors.ErrAsyncRequired as e:
            logger.exception(e)
            return _async_required_response()
        except errors.ErrConcurrentInstanceAccess as e:
            logger.exception(e)
            return _concurrency_error_response()

        if result.is_async:
            return (
                to_json_response(UpdateResponse(result.operation, result.dashboard_url)),
                HTTPStatus.ACCEPTED,
            )
        else:
            return (
                to_json_response(UpdateResponse(None, result.dashboard_url)),
                HTTPStatus.OK,
            )

    @openbroker.route(
        "/v2/service_instances/<instance_id>/service_bindings/<binding_id>",
        methods=["PUT"],
    )
    @requires_application_json
    def bind(instance_id, binding_id):
        """Create a service binding; 400 on an invalid body or an unknown plan_id."""
        try:
            accepts_incomplete = _accepts_incomplete()

            binding_details = BindDetails(**json.loads(request.data))
            binding_details.originating_identity = request.originating_identity
            binding_details.authorization_username = extract_authorization_username(request)

            if not _check_plan_id(service_broker, binding_details.plan_id):
                raise TypeError("plan_id not found in this service.")
        except (TypeError, KeyError, JSONDecodeError) as e:
            logger.exception(e)
            return _bad_request_response(e)

        try:
            result = service_broker.bind(
                instance_id=instance_id,
                binding_id=binding_id,
                details=binding_details,
                async_allowed=accepts_incomplete,
            )
            if result is None:
                warnings.warn("Bind has to return a Binding")
                raise errors.ServiceException("Internal broker error")
        except errors.ErrBindingAlreadyExists as e:
            logger.exception(e)
            return to_json_response(EmptyResponse()), HTTPStatus.CONFLICT
        except errors.ErrAppGuidNotProvided as e:
            logger.exception(e)
            return (
                to_json_response(
                    ErrorResponse(
                        error="RequiresApp",
                        description="This service supports generation of credentials through binding an application only.",
                    )
                ),
                HTTPStatus.UNPROCESSABLE_ENTITY,
            )
        except errors.ErrConcurrentInstanceAccess as e:
            logger.exception(e)
            return _concurrency_error_response()

        response = BindResponse(
            credentials=result.credentials,
            syslog_drain_url=result.syslog_drain_url,
            route_service_url=result.route_service_url,
            volume_mounts=result.volume_mounts,
        )
        if result.state == BindState.SUCCESSFUL_BOUND:
            return to_json_response(response), HTTPStatus.CREATED
        elif result.state == BindState.IDENTICAL_ALREADY_EXISTS:
            return to_json_response(response), HTTPStatus.OK
        elif result.state == BindState.IS_ASYNC:
            # An async bind only reports the operation handle, not the binding data.
            return (
                to_json_response(BindResponse(operation=result.operation)),
                HTTPStatus.ACCEPTED,
            )
        else:
            raise errors.ServiceException("IllegalState, BindState unknown.")

    @openbroker.route(
        "/v2/service_instances/<instance_id>/service_bindings/<binding_id>",
        methods=["DELETE"],
    )
    def unbind(instance_id, binding_id):
        """Delete a service binding; plan_id and service_id are required query arguments."""
        try:
            accepts_incomplete = _accepts_incomplete()

            plan_id = request.args["plan_id"]
            service_id = request.args["service_id"]

            unbind_details = UnbindDetails(service_id=service_id, plan_id=plan_id)
            unbind_details.originating_identity = request.originating_identity
            unbind_details.authorization_username = extract_authorization_username(request)

            if not _check_plan_id(service_broker, unbind_details.plan_id):
                raise TypeError("plan_id not found in this service.")
        except (TypeError, KeyError) as e:
            logger.exception(e)
            return _bad_request_response(e)

        try:
            result = service_broker.unbind(
                instance_id=instance_id, binding_id=binding_id, details=unbind_details, async_allowed=accepts_incomplete
            )
            if result is None:
                warnings.warn("Unbind has to return a UnbindSpec")
                raise errors.ServiceException("Internal broker error")
        except errors.ErrBindingDoesNotExist as e:
            logger.exception(e)
            return to_json_response(EmptyResponse()), HTTPStatus.GONE
        except errors.ErrConcurrentInstanceAccess as e:
            logger.exception(e)
            return _concurrency_error_response()

        if result.is_async:
            return (
                to_json_response(UnbindResponse(result.operation)),
                HTTPStatus.ACCEPTED,
            )
        else:
            return to_json_response(EmptyResponse()), HTTPStatus.OK

    @openbroker.route("/v2/service_instances/<instance_id>", methods=["DELETE"])
    def deprovision(instance_id):
        """Delete a service instance; plan_id and service_id are required query arguments."""
        try:
            plan_id = request.args["plan_id"]
            service_id = request.args["service_id"]
            accepts_incomplete = _accepts_incomplete()

            deprovision_details = DeprovisionDetails(service_id=service_id, plan_id=plan_id)
            deprovision_details.originating_identity = request.originating_identity
            deprovision_details.authorization_username = extract_authorization_username(request)

            if not _check_plan_id(service_broker, deprovision_details.plan_id):
                raise TypeError("plan_id not found in this service.")
        except (TypeError, KeyError) as e:
            logger.exception(e)
            return _bad_request_response(e)

        try:
            result = service_broker.deprovision(
                instance_id=instance_id, details=deprovision_details, async_allowed=accepts_incomplete
            )
            if result is None:
                warnings.warn("Deprovision has to return a DeprovisionServiceSpec")
                raise errors.ServiceException("Internal broker error")
        except errors.ErrInstanceDoesNotExist as e:
            logger.exception(e)
            return to_json_response(EmptyResponse()), HTTPStatus.GONE
        except errors.ErrAsyncRequired as e:
            logger.exception(e)
            return _async_required_response()
        except errors.ErrConcurrentInstanceAccess as e:
            logger.exception(e)
            return _concurrency_error_response()

        if result.is_async:
            return (
                to_json_response(DeprovisionResponse(result.operation)),
                HTTPStatus.ACCEPTED,
            )
        else:
            return to_json_response(EmptyResponse()), HTTPStatus.OK

    @openbroker.route("/v2/service_instances/<instance_id>/last_operation", methods=["GET"])
    def last_operation(instance_id):
        """Report the state of the last operation on an instance; 410 if the instance is gone."""
        service_id = request.args.get("service_id", None)
        plan_id = request.args.get("plan_id", None)
        operation_data = request.args.get("operation", None)

        try:
            result = service_broker.last_operation(
                instance_id=instance_id, operation_data=operation_data, service_id=service_id, plan_id=plan_id
            )
            if result is None:
                warnings.warn("Last Operation has to return a LastOperation")
                raise errors.ServiceException("Internal broker error")
            return (
                to_json_response(LastOperationResponse(result.state, result.description)),
                HTTPStatus.OK,
            )
        except errors.ErrInstanceDoesNotExist:
            return (
                to_json_response(LastOperationResponse(OperationState.SUCCEEDED, "")),
                HTTPStatus.GONE,
            )

    @openbroker.route(
        "/v2/service_instances/<instance_id>/service_bindings/<binding_id>/last_operation",
        methods=["GET"],
    )
    def last_binding_operation(instance_id, binding_id):
        """Report the state of the last operation on a binding; 410 if the binding is gone."""
        service_id = request.args.get("service_id", None)
        plan_id = request.args.get("plan_id", None)
        operation_data = request.args.get("operation", None)

        try:
            result = service_broker.last_binding_operation(
                instance_id=instance_id,
                binding_id=binding_id,
                operation_data=operation_data,
                service_id=service_id,
                plan_id=plan_id,
            )
            if result is None:
                warnings.warn("Last Binding Operation has to return a LastOperation")
                raise errors.ServiceException("Internal broker error")
            return (
                to_json_response(LastOperationResponse(result.state, result.description)),
                HTTPStatus.OK,
            )
        except errors.ErrBindingDoesNotExist:
            # Mirror last_operation: a vanished binding is answered with 410 Gone
            # instead of falling through to the generic 500 handler.
            return (
                to_json_response(LastOperationResponse(OperationState.SUCCEEDED, "")),
                HTTPStatus.GONE,
            )

    @openbroker.route("/v2/service_instances/<instance_id>", methods=["GET"])
    def get_instance(instance_id):
        """Fetch a service instance; 404 if the broker reports that it does not exist."""
        try:
            result = service_broker.get_instance(instance_id=instance_id)
            if result is None:
                warnings.warn("Get Instance has to return GetInstanceDetailsSpec")
                raise errors.ServiceException("Internal broker error")
            response = GetInstanceResponse(
                service_id=result.service_id,
                plan_id=result.plan_id,
                dashboard_url=result.dashboard_url,
                parameters=result.parameters,
            )
            return to_json_response(response), HTTPStatus.OK
        except errors.ErrInstanceDoesNotExist:
            return to_json_response(EmptyResponse()), HTTPStatus.NOT_FOUND
        except errors.ErrConcurrentInstanceAccess:
            return _concurrency_error_response()

    @openbroker.route(
        "/v2/service_instances/<instance_id>/service_bindings/<binding_id>",
        methods=["GET"],
    )
    def get_binding(instance_id, binding_id):
        """Fetch a service binding; 404 if the broker reports that it does not exist."""
        try:
            result = service_broker.get_binding(instance_id=instance_id, binding_id=binding_id)
            if result is None:
                warnings.warn("Get Binding has to return a GetBindingSpec")
                raise errors.ServiceException("Internal broker error")
            response = GetBindingResponse(
                credentials=result.credentials,
                syslog_drain_url=result.syslog_drain_url,
                route_service_url=result.route_service_url,
                volume_mounts=result.volume_mounts,
                parameters=result.parameters,
            )
            return to_json_response(response), HTTPStatus.OK
        except errors.ErrBindingDoesNotExist:
            return to_json_response(EmptyResponse()), HTTPStatus.NOT_FOUND

    return openbroker
def serve_multiple(
    service_brokers: List[ServiceBroker],
    credentials: Union[List[BrokerCredentials], BrokerCredentials, None],
    logger: logging.Logger = logging.root,
    host="0.0.0.0",
    port=5000,
    debug=False,
    *,
    authenticator: Optional[BrokerAuthenticator] = None,
):
    """
    Starts flask with several brokers, wrapped in a single Router that is passed to ``serve``.

    :param service_brokers: ServiceBrokers to wrap in a Router
    :param credentials: Username and password that will be required to communicate with service broker
    :param logger: Used for api logs. This will not influence Flasks logging behavior
    :param host: Host, defaults to all interfaces (0.0.0.0)
    :param port: Port
    :param debug: Enables debugging in flask app
    :param authenticator: provide an authenticator to secure endpoints; forwarded unchanged to ``serve``
    """
    router = Router(*service_brokers)
    serve(
        service_broker=router,
        credentials=credentials,
        logger=logger,
        authenticator=authenticator,
        host=host,
        port=port,
        debug=debug,
    )
def serve(
    service_broker: ServiceBroker,
    credentials: Union[List[BrokerCredentials], BrokerCredentials, None],
    logger: logging.Logger = logging.root,
    authenticator: Optional[BrokerAuthenticator] = None,
    host="0.0.0.0",
    port=5000,
    debug=False,
):
    """
    Starts flask with the given brokers.
    You can provide a list or just one ServiceBroker

    :param service_broker: ServicesBroker for services to provide
    :param credentials: Username and password that will be required to communicate with service broker
    :param logger: Used for api logs. This will not influence Flasks logging behavior
    :param authenticator: provide an authenticator to secure endpoints, broker_credentials will be ignored
    :param host: Host, defaults to all interfaces (0.0.0.0)
    :param port: Port
    :param debug: Enables debugging in flask app
    """
    from flask import Flask

    app = Flask(__name__)
    app.debug = debug

    blueprint = get_blueprint(
        service_broker=service_broker, broker_credentials=credentials, logger=logger, authenticator=authenticator
    )

    logger.debug("Register openbrokerapi blueprint")
    app.register_blueprint(blueprint)

    # Only the import is guarded: an ImportError raised later, while the gevent
    # server is being built or is running, must not be mistaken for "gevent is
    # not installed" and trigger a second server start.
    try:
        # see: https://github.com/gevent/gevent/issues/1900
        from gevent.pywsgi import WSGIServer  # type: ignore
    except ImportError:
        logger.info(f"Start Flask on {host}:{port}")
        logger.warning("Use a server like gevent or gunicorn for production!")
        app.run(host, port)
    else:
        logger.info(f"Start Gevent server on {host}:{port}")
        http_server = WSGIServer((host, port), app)
        http_server.serve_forever()