Source code for openbrokerapi.service_broker

from enum import Enum
from typing import List, Optional, Union

from openbrokerapi.catalog import (
    ServiceDashboardClient,
    ServiceMetadata,
    ServicePlan,
)
import openbrokerapi.settings


[docs]class ProvisionDetails: def __init__( self, service_id: str, plan_id: str, organization_guid: Optional[str] = None, space_guid: Optional[str] = None, parameters: Optional[dict] = None, context: Optional[dict] = None, **kwargs, ): self.service_id = service_id self.plan_id = plan_id self.organization_guid = organization_guid self.space_guid = space_guid self.parameters = parameters self.context = context # Usage context information if isinstance(context, dict) and "organization_guid" in context: if organization_guid is not None and context["organization_guid"] != organization_guid: raise TypeError("organization_guid does not match with context.organization_guid") self.organization_guid = context["organization_guid"] if isinstance(context, dict) and "space_guid" in context: if space_guid is not None and context["space_guid"] != space_guid: raise TypeError("space_guid does not match with context.space_guid") self.space_guid = context["space_guid"] if openbrokerapi.settings.DISABLE_SPACE_ORG_GUID_CHECK: pass elif None in (self.organization_guid, self.space_guid): raise TypeError("Organization and space guid are required.") # HTTP contextual data self.authorization_username = None #: username of HTTP Basic Auth self.originating_identity = None #: decoded X-Broker-Originating-Identity HTTP Header
[docs]class ProvisionState(Enum): IS_ASYNC = "is_async" SUCCESSFUL_CREATED = "successfully created" IDENTICAL_ALREADY_EXISTS = "exists with identical config"
[docs]class ProvisionedServiceSpec: def __init__( self, state: ProvisionState = ProvisionState.SUCCESSFUL_CREATED, dashboard_url: Optional[str] = None, operation: Optional[str] = None, ): self.state = state self.dashboard_url = dashboard_url self.operation = operation @property def is_async(self): return self.state == ProvisionState.IS_ASYNC
[docs]class GetInstanceDetailsSpec: def __init__( self, service_id: str, plan_id: str, dashboard_url: Optional[str] = None, parameters: Optional[dict] = None, ): self.service_id = service_id self.plan_id = plan_id self.dashboard_url = dashboard_url self.parameters = parameters
[docs]class DeprovisionDetails: def __init__(self, service_id: str, plan_id: str): self.service_id = service_id self.plan_id = plan_id # HTTP contextual data self.authorization_username = None #: username of HTTP Basic Auth self.originating_identity = None #: decoded X-Broker-Originating-Identity HTTP Header
[docs]class DeprovisionServiceSpec: def __init__(self, is_async: bool, operation: Optional[str] = None): self.is_async = is_async self.operation = operation
[docs]class PreviousValues: def __init__( self, plan_id: Optional[str] = None, service_id: Optional[str] = None, organization_id: Optional[str] = None, space_id: Optional[str] = None, **kwargs, ): self.plan_id = plan_id self.service_id = service_id self.organization_id = organization_id self.space_id = space_id
[docs]class UpdateDetails: def __init__( self, service_id: str, plan_id: Optional[str] = None, parameters: Optional[dict] = None, previous_values: Optional[dict] = None, context: Optional[dict] = None, **kwargs, ): self.service_id = service_id self.plan_id = plan_id self.parameters = parameters self.previous_values = PreviousValues(**previous_values) if previous_values else None self.context = context # HTTP contextual data self.authorization_username = None #: username of HTTP Basic Auth self.originating_identity = None #: decoded X-Broker-Originating-Identity HTTP Header
[docs]class UpdateServiceSpec: def __init__( self, is_async: bool, operation: Optional[str] = None, dashboard_url: Optional[str] = None, ): self.is_async = is_async self.operation = operation self.dashboard_url = dashboard_url
[docs]class BindResource: def __init__(self, app_guid: Optional[str] = None, route: Optional[str] = None, **kwargs): self.app_guid = app_guid self.route = route
[docs]class BindDetails: def __init__( self, service_id: str, plan_id: str, app_guid: Optional[str] = None, bind_resource: Optional[dict] = None, parameters: Optional[dict] = None, context: Optional[dict] = None, **kwargs, ): self.app_guid = app_guid self.plan_id = plan_id self.service_id = service_id self.parameters = parameters self.bind_resource = BindResource(**bind_resource) if bind_resource else None self.context = context # HTTP contextual data self.authorization_username = None #: username of HTTP Basic Auth self.originating_identity = None #: decoded X-Broker-Originating-Identity HTTP Header
[docs]class SharedDevice: def __init__(self, volume_id: str, mount_config: Optional[dict] = None): self.volume_id = volume_id self.mount_config = mount_config
[docs]class VolumeMount: def __init__( self, driver: str, container_dir: str, mode: str, device_type: str, device: SharedDevice, ): self.driver = driver self.container_dir = container_dir self.mode = mode self.device_type = device_type self.device = device
[docs]class BindState(Enum): IS_ASYNC = "is_async" SUCCESSFUL_BOUND = "successfully created" IDENTICAL_ALREADY_EXISTS = "exists with identical config"
[docs]class Binding: def __init__( self, state=BindState.SUCCESSFUL_BOUND, credentials: Optional[dict] = None, syslog_drain_url: Optional[str] = None, route_service_url: Optional[str] = None, volume_mounts: Optional[List[VolumeMount]] = None, operation: Optional[str] = None, ): self.state = state self.credentials = credentials self.syslog_drain_url = syslog_drain_url self.route_service_url = route_service_url self.volume_mounts = volume_mounts self.operation = operation
[docs]class GetBindingSpec: def __init__( self, credentials: Optional[dict] = None, syslog_drain_url: Optional[str] = None, route_service_url: Optional[str] = None, volume_mounts: Optional[List[VolumeMount]] = None, parameters: Optional[dict] = None, **kwargs, ): self.credentials = credentials self.syslog_drain_url = syslog_drain_url self.route_service_url = route_service_url self.volume_mounts = volume_mounts self.parameters = parameters
[docs]class UnbindDetails: def __init__(self, service_id: str, plan_id: str): self.plan_id = plan_id self.service_id = service_id # HTTP contextual data self.authorization_username = None #: username of HTTP Basic Auth self.originating_identity = None #: decoded X-Broker-Originating-Identity HTTP Header
[docs]class UnbindSpec: def __init__(self, is_async: bool, operation: Optional[str] = None): self.is_async = is_async self.operation = operation
[docs]class OperationState(Enum): IN_PROGRESS = "in progress" SUCCEEDED = "succeeded" FAILED = "failed"
[docs]class LastOperation: def __init__(self, state: OperationState, description: Optional[str] = None): self.state = state self.description = description
[docs]class Service: def __init__( self, id: str, name: str, description: str, bindable: bool, plans: List[ServicePlan], tags: Optional[List[str]] = None, requires: Optional[List[str]] = None, metadata: Optional[ServiceMetadata] = None, dashboard_client: Optional[ServiceDashboardClient] = None, plan_updateable: bool = False, instances_retrievable: bool = False, bindings_retrievable: bool = False, **kwargs, ): """ :param requires: syslog_drain, route_forwarding or volume_mount """ self.id = id self.name = name self.description = description self.bindable = bindable self.plans = plans self.tags = tags self.requires = requires self.metadata = metadata self.dashboard_client = dashboard_client self.plan_updateable = plan_updateable self.instances_retrievable = instances_retrievable self.bindings_retrievable = bindings_retrievable self.__dict__.update(kwargs)
[docs]class ServiceBroker: """ Provides a service. This covers catalog, provision, update, bind, unbind, deprovision and last operation. """
[docs] def catalog(self) -> Union[Service, List[Service]]: """ Returns the services information which is provided by this broker. :return: Service or list of services """ raise NotImplementedError()
[docs] def provision( self, instance_id: str, details: ProvisionDetails, async_allowed: bool, **kwargs ) -> ProvisionedServiceSpec: """ Further readings `CF Broker API#Provisioning <https://docs.cloudfoundry.org/services/api.html#provisioning>`_ :param instance_id: Instance id provided by the platform :param details: Details about the service to create :param async_allowed: Client allows async creation :param kwargs: May contain additional information, improves compatibility with upstream versions :rtype: ProvisionedServiceSpec :raises ErrInstanceAlreadyExists: If instance already exists :raises ErrAsyncRequired: If async is required but not supported """ raise NotImplementedError()
[docs] def update(self, instance_id: str, details: UpdateDetails, async_allowed: bool, **kwargs) -> UpdateServiceSpec: """ Further readings `CF Broker API#Update <https://docs.cloudfoundry.org/services/api.html#updating_service_instance>`_ :param instance_id: Instance id provided by the platform :param details: Details about the service to update :param async_allowed: Client allows async creation :param kwargs: May contain additional information, improves compatibility with upstream versions :rtype: UpdateServiceSpec :raises ErrAsyncRequired: If async is required but not supported """ raise NotImplementedError()
[docs] def deprovision( self, instance_id: str, details: DeprovisionDetails, async_allowed: bool, **kwargs ) -> DeprovisionServiceSpec: """ Further readings `CF Broker API#Deprovisioning <https://docs.cloudfoundry.org/services/api.html#deprovisioning>`_ :param instance_id: Instance id provided by the platform :param details: Details about the service to delete :param async_allowed: Client allows async creation :param kwargs: May contain additional information, improves compatibility with upstream versions :rtype: DeprovisionServiceSpec :raises ErrInstanceDoesNotExist: If instance does not exists :raises ErrAsyncRequired: If async is required but not supported """ raise NotImplementedError()
[docs] def bind(self, instance_id: str, binding_id: str, details: BindDetails, async_allowed: bool, **kwargs) -> Binding: """ Further readings `CF Broker API#Binding <https://docs.cloudfoundry.org/services/api.html#binding>`_ :param instance_id: Instance id provided by the platform :param binding_id: Binding id provided by the platform :param details: Details about the binding to create :param async_allowed: Client allows async binding :param kwargs: May contain additional information, improves compatibility with upstream versions :rtype: Binding :raises ErrBindingAlreadyExists: If binding already exists :raises ErrAppGuidNotProvided: If AppGuid is required but not provided """ raise NotImplementedError()
[docs] def unbind( self, instance_id: str, binding_id: str, details: UnbindDetails, async_allowed: bool, **kwargs ) -> UnbindSpec: """ Further readings `CF Broker API#Unbinding <https://docs.cloudfoundry.org/services/api.html#unbinding>`_ :param instance_id: Instance id provided by the platform :param binding_id: Binding id provided by the platform :param details: Details about the binding to delete :param async_allowed: Client allows async unbind :param kwargs: May contain additional information, improves compatibility with upstream versions :rtype: UnbindDetails :raises ErrBindingAlreadyExists: If binding already exists """ raise NotImplementedError()
[docs] def get_instance(self, instance_id: str, **kwargs) -> GetInstanceDetailsSpec: """ Further readings `CF Broker API#FetchServiceInstance <https://github.com/openservicebrokerapi/servicebroker/blob/v2.14/spec.md#fetching-a-service-instance>`_ Must be implemented if `"instances_retrievable" :true` is declared for a service in `catalog`. :param instance_id: Instance id provided by the platform :param kwargs: May contain additional information, improves compatibility with upstream versions :rtype: GetInstanceDetailsSpec :raises ErrInstanceDoesNotExist: If instance does not exists :raises ErrConcurrentInstanceAccess: If instance is being updated """ raise NotImplementedError()
[docs] def get_binding(self, instance_id: str, binding_id: str, **kwargs) -> GetBindingSpec: """ Further readings `CF Broker API#FetchServiceBinding <https://github.com/openservicebrokerapi/servicebroker/blob/v2.14/spec.md#fetching-a-service-binding>`_ Must be implemented if `"bindings_retrievable" :true` is declared for a service in `catalog`. :param instance_id: Instance id provided by the platform :param binding_id: Instance id provided by the platform :param kwargs: May contain additional information, improves compatibility with upstream versions :rtype: GetBindingSpec :raises ErrBindingDoesNotExist: If binding does not exist """ raise NotImplementedError()
[docs] def last_operation( self, instance_id: str, operation_data: Optional[str], service_id: Optional[str], plan_id: Optional[str], **kwargs, ) -> LastOperation: """ Further readings `CF Broker API#LastOperation <https://docs.cloudfoundry.org/services/api.html#polling>`_ :param instance_id: Instance id provided by the platform :param operation_data: Operation data received from async operation :param service_id: service identifier :param plan_id: plan identifier :param kwargs: May contain additional information, improves compatibility with upstream versions :rtype: LastOperation """ raise NotImplementedError()
[docs] def last_binding_operation( self, instance_id: str, binding_id: str, operation_data: Optional[str], service_id: Optional[str], plan_id: Optional[str], **kwargs, ) -> LastOperation: """ Further readings `CF Broker API#LastOperationForBindings <https://github.com/openservicebrokerapi/servicebroker/blob/v2.14/spec.md#polling-last-operation-for-service-bindings>`_ Must be implemented if `Provision`, `Update`, or `Deprovision` are async. :param instance_id: Instance id provided by the platform :param binding_id: Binding id provided by the platform :param operation_data: Operation data received from async operation :param service_id: service identifier :param plan_id: plan identifier :param kwargs: May contain additional information, improves compatibility with upstream versions :rtype: LastOperation """ raise NotImplementedError()