Skip to content

Middleware

aas_persistence_middleware

AasMiddleware (Middleware)

Middleware that automatically uses AAS and submodel repositories as persistence providers and consumers.

Source code in aas_middleware\middleware\aas_persistence_middleware.py
class AasMiddleware(Middleware):
    """
    Middleware that uses AAS and submodel repositories as default persistence providers and consumers.
    """

    def __init__(self):
        super().__init__()

    def load_aas_persistent_data_model(self, name: str, data_model: DataModel, aas_host: str, aas_port: int, submodel_host: str, submodel_port: int, persist_instances: bool = False):
        """
        Loads a data model into the middleware and registers default persistence factories
        for the AAS and Submodel types of that data model.

        Args:
            name (str): The name of the data model.
            data_model (DataModel): Data model containing the types and values.
            aas_host (str): The host of the AAS server.
            aas_port (int): The port of the AAS server.
            submodel_host (str): The host of the submodel server.
            submodel_port (int): The port of the submodel server.
            persist_instances (bool, optional): Whether to persist instances of the data model. Defaults to False.
        """
        # NOTE: rebuilding the data model for the AAS structure (DataModelRebuilder) is currently
        # disabled, so the data model is loaded exactly as provided.
        self.load_data_model(name, data_model, persist_instances)

        # Both factories are created before any of them is registered.
        default_factories = (
            (
                PersistenceFactory(BasyxAASConnector, host=aas_host, port=aas_port, submodel_host=submodel_host, submodel_port=submodel_port),
                AAS,
            ),
            (
                PersistenceFactory(BasyxSubmodelConnector, host=submodel_host, port=submodel_port),
                Submodel,
            ),
        )
        for persistence_factory, model_type in default_factories:
            self.add_default_persistence(persistence_factory, name, None, model_type)

    def scan_aas_server(self):
        """
        Placeholder for scanning the AAS server for all available AAS and Submodels (not implemented yet).
        """
        # TODO: implement function
        return None

load_aas_persistent_data_model(self, name, data_model, aas_host, aas_port, submodel_host, submodel_port, persist_instances=False)

Function to load a data model into the middleware to be used for synchronization.

Parameters:

Name Type Description Default
name str

The name of the data model.

required
data_model DataModel

Data model containing the types and values.

required
aas_host str

The host of the AAS server.

required
aas_port int

The port of the AAS server.

required
submodel_host str

The host of the submodel server.

required
submodel_port int

The port of the submodel server.

required
persist_instances bool

Whether to persist instances of the data model. Defaults to False.

False
Source code in aas_middleware\middleware\aas_persistence_middleware.py
def load_aas_persistent_data_model(self, name: str, data_model: DataModel, aas_host: str, aas_port: int, submodel_host: str, submodel_port: int, persist_instances: bool = False):
    """
    Loads a data model into the middleware and registers default persistence factories
    for the AAS and Submodel types of that data model.

    Args:
        name (str): The name of the data model.
        data_model (DataModel): Data model containing the types and values.
        aas_host (str): The host of the AAS server.
        aas_port (int): The port of the AAS server.
        submodel_host (str): The host of the submodel server.
        submodel_port (int): The port of the submodel server.
        persist_instances (bool, optional): Whether to persist instances of the data model. Defaults to False.
    """
    # NOTE: rebuilding the data model for the AAS structure (DataModelRebuilder) is currently
    # disabled, so the data model is loaded exactly as provided.
    self.load_data_model(name, data_model, persist_instances)

    # Both factories are created before any of them is registered.
    default_factories = (
        (
            PersistenceFactory(BasyxAASConnector, host=aas_host, port=aas_port, submodel_host=submodel_host, submodel_port=submodel_port),
            AAS,
        ),
        (
            PersistenceFactory(BasyxSubmodelConnector, host=submodel_host, port=submodel_port),
            Submodel,
        ),
    )
    for persistence_factory, model_type in default_factories:
        self.add_default_persistence(persistence_factory, name, None, model_type)

scan_aas_server(self)

Function to scan the AAS server for all available AAS and Submodels.

Source code in aas_middleware\middleware\aas_persistence_middleware.py
def scan_aas_server(self):
    """
    Placeholder for scanning the AAS server for all available AAS and Submodels (not implemented yet).
    """
    # TODO: implement function
    return None

connector_router

generate_connector_endpoint(connector_id, connector, model_type)

Generates endpoints to describe a connector and to send values to or retrieve values from it.

Parameters:

Name Type Description Default
workflow Workflow

Workflow that contains the function to be executed by the workflow.

required

Returns:

Type Description
APIRouter

FastAPI router with an endpoint to execute the workflow.

Source code in aas_middleware\middleware\connector_router.py
def generate_connector_endpoint(connector_id: str, connector: Union[Consumer, Provider, Connector], model_type: Type[Any]) -> APIRouter:
    """
    Generates a router with endpoints to describe a connector and to exchange values with it.

    Args:
        connector_id (str): Identifier of the connector; used in the route prefix "/connectors/{connector_id}".
        connector (Union[Consumer, Provider, Connector]): Connector the endpoints delegate to.
        model_type (Type[Any]): Type of the exchanged value; used as request body type for consumers
            and as response model for providers.

    Returns:
        APIRouter: FastAPI router with a GET "/description" endpoint, a POST "/value" endpoint if the
            connector is a Consumer and a GET "/value" endpoint if the connector is a Provider.
    """
    router = APIRouter(
        prefix=f"/connectors/{connector_id}",
        tags=["connectors"],
        responses={404: {"description": "Not found"}},
    )

    @router.get("/description", response_model=ConnectorDescription)
    async def describe_connector():
        # Plain connectors are not bound to a persistence, hence no persistence connection is reported.
        return ConnectorDescription(
            connector_id=connector_id,
            connector_type=type(connector).__name__,
            persistence_connection=None,
            model_type=model_type.__name__
        )

    if isinstance(connector, Consumer):
        @router.post("/value", response_model=Dict[str, str])
        async def set_value(value: model_type):
            try:
                await connector.consume(value)
            except ConnectionError as e:
                # Connection problems of the connector are reported as server errors.
                raise HTTPException(status_code=500, detail=str(e)) from e
            return {"message": f"Set value for {connector_id}"}

    if isinstance(connector, Provider):
        @router.get("/value", response_model=model_type)
        async def get_value():
            try:
                return await connector.provide()
            except ConnectionError as e:
                raise HTTPException(status_code=500, detail=str(e)) from e

    return router

generate_persistence_connector_endpoint(connector_id, connector, connection_info, model_type)

Generates endpoints to describe a persistence connector and to send values to or retrieve values from it.

Parameters:

Name Type Description Default
workflow Workflow

Workflow that contains the function to be executed by the workflow.

required

Returns:

Type Description
APIRouter

FastAPI router with an endpoint to execute the workflow.

Source code in aas_middleware\middleware\connector_router.py
def generate_persistence_connector_endpoint(connector_id: str, connector: Union[Consumer, Provider, Connector], connection_info: ConnectionInfo, model_type: Type[Any]) -> APIRouter:
    """
    Generates a router with endpoints to describe a persistence-bound connector and to exchange values with it.

    Args:
        connector_id (str): Identifier of the connector; used in the route prefix "/connectors/{connector_id}".
        connector (Union[Consumer, Provider, Connector]): Connector the endpoints delegate to.
        connection_info (ConnectionInfo): Persistence connection reported by the "/description" endpoint.
        model_type (Type[Any]): Type of the exchanged value; used as (optional) request body type for
            consumers and as response model for providers.

    Returns:
        APIRouter: FastAPI router with a GET "/description" endpoint, a POST "/value" endpoint if the
            connector is a Consumer and a GET "/value" endpoint if the connector is a Provider.
    """
    router = APIRouter(
        prefix=f"/connectors/{connector_id}",
        tags=["connectors"],
        responses={404: {"description": "Not found"}},
    )

    @router.get("/description", response_model=ConnectorDescription)
    async def describe_connector():
        return ConnectorDescription(
            connector_id=connector_id,
            connector_type=type(connector).__name__,
            persistence_connection=connection_info,
            model_type=model_type.__name__
        )

    if isinstance(connector, Consumer):
        @router.post("/value", response_model=Dict[str, str])
        async def set_value(value: Optional[model_type]=None):
            try:
                await connector.consume(value)
            except ConnectionError as e:
                # Connection problems of the connector are reported as server errors.
                raise HTTPException(status_code=500, detail=str(e)) from e
            # Only a missing body means "no explicit value"; falsy values such as 0 or "" are real values.
            if value is None:
                return {"message": f"Set for {connector_id} persistence value."}
            return {"message": f"Set for {connector_id} value {value}"}

    if isinstance(connector, Provider):
        @router.get("/value", response_model=model_type)
        async def get_value():
            try:
                return await connector.provide()
            except ConnectionError as e:
                raise HTTPException(status_code=500, detail=str(e)) from e

    return router

graphql_routers

GraphQLRouter

Source code in aas_middleware\middleware\graphql_routers.py
class GraphQLRouter:
    """
    Builds a GraphQL query schema for a data model and mounts it on the app of the middleware.
    """

    def __init__(self, data_model: DataModel, data_model_name: str, middleware: "Middleware"):
        self.data_model = data_model
        self.data_model_name = data_model_name

        self.middleware = middleware
        # Only the query class is extended so far; the mutation class is kept for later use.
        self.query, self.mutation = get_base_query_and_mutation_classes()

    def get_connector(self, item_id: str) -> Connector:
        """
        Returns the persistence connector registered for an item of this data model.

        Args:
            item_id (str): Id of the model the connector is registered for.

        Returns:
            Connector: Connector returned by the persistence registry of the middleware.
        """
        connection_info = ConnectionInfo(data_model_name=self.data_model_name, model_id=item_id)
        return self.middleware.persistence_registry.get_connection(connection_info)

    def generate_graphql_endpoint(self):
        """
        Generates a GraphQL endpoint for the given data model and adds it to the middleware.
        """
        for top_level_model_type in self.data_model.get_top_level_types():
            self.create_query_for_model(top_level_model_type)
            # TODO: also make mutation possible
            # self.create_mutation_for_model(top_level_model_type)
        schema = graphene.Schema(query=self.query)
        graphql_app = GraphQLApp(schema=schema, on_get=make_graphiql_handler())
        self.middleware.app.mount("/graphql", graphql_app)

    def create_query_for_model(self, model_type: type):
        """
        Extends the query class with list fields and resolvers for a model and its contained submodels.

        Args:
            model_type (type): Top level model type for which query fields should be created.
        """
        model_name = model_type.__name__

        submodels = get_contained_models_attribute_info(model_type)
        # Create all graphene types of the submodels first, before any query field is added.
        graphene_submodels = [
            create_graphe_pydantic_output_type_for_submodel_elements(submodel)
            for _, submodel in submodels
        ]

        for (_, submodel), graphene_submodel in zip(submodels, graphene_submodels):
            # FIXME: resolve problems with optional submodels!
            submodel_name = submodel.__name__
            class_dict = {
                f"{submodel_name}": graphene.List(graphene_submodel),
                f"resolve_{submodel_name}": self.get_submodel_resolve_function(submodel),
            }
            # Each new field is added by subclassing the query class built so far.
            self.query = type("Query", (self.query,), class_dict)

        graphene_model = create_graphe_pydantic_output_type_for_model(model_type)

        class_dict = {
            f"{model_name}": graphene.List(graphene_model),
            f"resolve_{model_name}": self.get_aas_resolve_function(model_type),
        }
        self.query = type("Query", (self.query,), class_dict)

    def _create_resolve_function(self, model: typing.Type[BaseModel]) -> typing.Callable:
        """
        Builds the resolver shared by AAS and submodel queries.

        The resolver looks up all connections registered for the name of the model type, retrieves
        the value of each connector and validates it against the given pydantic model.
        """
        middleware_instance = self.middleware

        async def resolve_models(self, info):
            resolved_models = []
            connection_infos = middleware_instance.persistence_registry.get_type_connection_info(model.__name__)
            for connection_info in connection_infos:
                connector = middleware_instance.persistence_registry.get_connection(connection_info)
                retrieved_model = await connector.provide()
                # Round trip through a dump so that the result is an instance of `model`.
                resolved_models.append(model.model_validate(retrieved_model.model_dump()))
            return resolved_models

        resolve_models.__name__ = f"resolve_{model.__name__}"
        return resolve_models

    def get_aas_resolve_function(self, model: typing.Type[BaseModel]) -> typing.Callable:
        """
        Returns the resolve function for the given pydantic model.

        Args:
            model (Type[BaseModel]): Pydantic model for which the resolve function should be created.

        Returns:
            typing.Callable: Resolve function for the given pydantic model.
        """
        return self._create_resolve_function(model)

    def get_submodel_resolve_function(self, model: typing.Type[BaseModel]) -> typing.Callable:
        """
        Returns the resolve function for the given pydantic model.

        Args:
            model (Type[BaseModel]): Pydantic model for which the resolve function should be created.

        Returns:
            typing.Callable: Resolve function for the given pydantic model.
        """
        # TODO: get the correct connectors here... no submodel connectors are available
        return self._create_resolve_function(model)

generate_graphql_endpoint(self)

Generates a GraphQL endpoint for the given data model and adds it to the middleware.

Source code in aas_middleware\middleware\graphql_routers.py
def generate_graphql_endpoint(self):
    """
    Creates the query fields for all top level types of the data model and mounts the
    resulting GraphQL app under "/graphql" on the app of the middleware.
    """
    # TODO: also make mutation possible
    # (self.create_mutation_for_model would be called per top level type as well)
    for model_type in self.data_model.get_top_level_types():
        self.create_query_for_model(model_type)

    graphql_app = GraphQLApp(
        schema=graphene.Schema(query=self.query),
        on_get=make_graphiql_handler(),
    )
    self.middleware.app.mount("/graphql", graphql_app)

get_aas_resolve_function(self, model)

Returns the resolve function for the given pydantic model.

Parameters:

Name Type Description Default
model Type[BaseModel]

Pydantic model for which the resolve function should be created.

required

Returns:

Type Description
Callable

Resolve function for the given pydantic model.

Source code in aas_middleware\middleware\graphql_routers.py
def get_aas_resolve_function(self, model: typing.Type[BaseModel]) -> typing.Callable:
    """
    Creates the GraphQL resolve function for an AAS model type.

    Args:
        model (Type[BaseModel]): Pydantic model for which the resolve function should be created.

    Returns:
        typing.Callable: Async resolver named "resolve_<model name>" that returns a list of validated models.
    """
    middleware_instance = self.middleware

    async def resolve_models(self, info):
        # All connections registered for the type name of the model are queried one after another.
        connection_infos = middleware_instance.persistence_registry.get_type_connection_info(model.__name__)
        resolved = []
        for connection_info in connection_infos:
            connector = middleware_instance.persistence_registry.get_connection(connection_info)
            provided = await connector.provide()
            resolved.append(model.model_validate(provided.model_dump()))
        return resolved

    resolve_models.__name__ = f"resolve_{model.__name__}"
    return resolve_models

get_submodel_resolve_function(self, model)

Returns the resolve function for the given pydantic model.

Parameters:

Name Type Description Default
model Type[BaseModel]

Pydantic model for which the resolve function should be created.

required

Returns:

Type Description
Callable

Resolve function for the given pydantic model.

Source code in aas_middleware\middleware\graphql_routers.py
def get_submodel_resolve_function(self, model: typing.Type[BaseModel]) -> typing.Callable:
    """
    Creates the GraphQL resolve function for a submodel type.

    Args:
        model (Type[BaseModel]): Pydantic model for which the resolve function should be created.

    Returns:
        typing.Callable: Async resolver named "resolve_<model name>" that returns a list of validated models.
    """
    middleware_instance = self.middleware

    async def resolve_models(self, info):
        # TODO: get the correct connectors here... no submodel connectors are available
        connection_infos = middleware_instance.persistence_registry.get_type_connection_info(model.__name__)
        resolved = []
        for connection_info in connection_infos:
            connector = middleware_instance.persistence_registry.get_connection(connection_info)
            provided = await connector.provide()
            resolved.append(model.model_validate(provided.model_dump()))
        return resolved

    resolve_models.__name__ = f"resolve_{model.__name__}"
    return resolve_models

create_graphe_pydantic_output_type_for_model(input_model, union_type=False)

Creates a Graphene object type for the given pydantic model.

Parameters:

Name Type Description Default
model Type[BaseModel]

Pydantic model for which the Graphene Object Type should be created.

required

Returns:

Type Description
PydanticObjectType

Graphene Object type for the given pydantic model.

Source code in aas_middleware\middleware\graphql_routers.py
def create_graphe_pydantic_output_type_for_model(
    input_model: typing.Type[BaseModel], union_type: bool = False
) -> PydanticObjectType:
    """
    Creates a Graphene object type for the given pydantic model.

    If a graphene type is already registered for a model with the same name, the registered
    type is returned instead of creating a new one.

    Args:
        input_model (Type[BaseModel]): Pydantic model for which the Graphene Object Type should be created.
        union_type (bool): If True, `add_class_method` is applied to a newly created graphene type.
            Defaults to False.

    Returns:
        PydanticObjectType: Graphene Object type for the given pydantic model.
    """
    graphene_model_registry = get_global_registry(PydanticObjectType)._registry
    for registered_model, registered_graphene_type in graphene_model_registry.items():
        # Compare names with names: comparing the model class itself with a name string never matches.
        # NOTE(review): assumes the registry keys are the pydantic model classes -- confirm.
        if input_model.__name__ == registered_model.__name__:
            return registered_graphene_type

    rework_default_list_to_default_factory(input_model)
    # Equivalent to declaring `class <ModelName>(PydanticObjectType)` with a Meta class pointing to the model.
    graphene_model = type(
        input_model.__name__,
        (PydanticObjectType,),
        {"Meta": type("Meta", (), {"model": input_model})},
    )
    if union_type:
        add_class_method(graphene_model)

    return graphene_model

create_graphe_pydantic_output_type_for_submodel_elements(model, union_type=False)

Create recursively graphene pydantic output types for submodels and submodel elements.

Parameters:

Name Type Description Default
model Union[base.Submodel, base.SubmodelElementCollectiontuple, list, set, ]

Submodel element for which the graphene pydantic output types should be created.

required
Source code in aas_middleware\middleware\graphql_routers.py
def create_graphe_pydantic_output_type_for_submodel_elements(
    model: Submodel, union_type: bool = False
) -> PydanticObjectType:
    """
    Create recursively graphene pydantic output types for submodels and submodel elements.

    The types of nested submodel element collections (direct attributes, union members and
    list/tuple items) are created before the type of the given model itself.

    Args:
        model (Submodel): Submodel or submodel element collection type for which the graphene pydantic
            output types should be created.
        union_type (bool): Passed on to the creation of the graphene type of `model`. Defaults to False.

    Returns:
        PydanticObjectType: Graphene Object type for the given model.
    """
    for attribute_name, attribute_value in get_all_submodel_elements_from_submodel(
        model
    ).items():
        if union_type_check(attribute_value):
            for subtype in typing.get_args(attribute_value):
                create_graphe_pydantic_output_type_for_submodel_elements(
                    subtype, union_type=True
                )
        elif hasattr(attribute_value, "model_fields") and issubclass(
            attribute_value, SubmodelElementCollection
        ):
            create_graphe_pydantic_output_type_for_submodel_elements(attribute_value)
        elif is_typing_list_or_tuple(attribute_value):
            if not list_contains_any_submodel_element_collections(attribute_value):
                continue
            for nested_type in typing.get_args(attribute_value):
                if union_type_check(nested_type):
                    for subtype in typing.get_args(nested_type):
                        create_graphe_pydantic_output_type_for_submodel_elements(
                            subtype, union_type=True
                        )
                # Type arguments are not necessarily classes (e.g. `...` in Tuple[X, ...]);
                # issubclass would raise a TypeError for those, so they are skipped.
                elif isinstance(nested_type, type) and issubclass(
                    nested_type, SubmodelElementCollection
                ):
                    create_graphe_pydantic_output_type_for_submodel_elements(
                        nested_type
                    )
    return create_graphe_pydantic_output_type_for_model(model, union_type)

get_base_query_and_mutation_classes()

Returns the base query and mutation classes for the GraphQL endpoint.

Returns:

Type Description
tuple

Tuple of the base query and mutation classes for the GraphQL endpoint.

Source code in aas_middleware\middleware\graphql_routers.py
def get_base_query_and_mutation_classes() -> (
    typing.Tuple[graphene.ObjectType, graphene.ObjectType]
):
    """
    Creates fresh, empty base classes for the query and the mutation of a GraphQL endpoint.

    Returns:
        tuple: Tuple of the base query and mutation classes for the GraphQL endpoint.
    """

    # New classes are defined on every call; fields are added later by subclassing them.
    class Mutation(graphene.ObjectType):
        ...

    class Query(graphene.ObjectType):
        ...

    return Query, Mutation

is_typing_list_or_tuple(input_type)

Checks if the given type is a typing.List or typing.Tuple.

Parameters:

Name Type Description Default
input_type Any

Type to check.

required

Returns:

Type Description
bool

True if the given type is a typing.List or typing.Tuple, False otherwise.

Source code in aas_middleware\middleware\graphql_routers.py
def is_typing_list_or_tuple(input_type: typing.Any) -> bool:
    """
    Tells whether the origin of a type annotation is `list` or `tuple`.

    Args:
        input_type (typing.Any): Type to check.

    Returns:
        bool: True if the given type is a typing.List or typing.Tuple, False otherwise.
    """
    # Unparameterized types (e.g. plain `list` or `int`) have no origin and yield False.
    origin = typing.get_origin(input_type)
    return origin in (list, tuple)

middleware

Middleware

Middleware that can be used to generate a REST or GraphQL API from AAS and submodels, provided either as pydantic models or in AAS object store format.

Source code in aas_middleware\middleware\middleware.py
class Middleware:
    """
    Middleware that can be used to generate a REST or GraphQL API from aas' and submodels either in pydanctic models or in aas object store format.
    """

    def __init__(self):
        # The FastAPI app is created lazily by the `app` property.
        self._app: typing.Optional[FastAPI] = None
        self.meta_data: MiddlewareMetaData = MiddlewareMetaData()

        # Loaded data models, keyed by their name.
        self.data_models: typing.Dict[str, DataModel] = {}

        # Callbacks awaited by the lifespan on startup and on shutdown.
        self.on_shutdown_callbacks: typing.List[typing.Callable] = []
        self.on_start_up_callbacks: typing.List[typing.Callable] = []

        # Registries for persistence connections, connectors, workflows and mappers.
        self.persistence_registry: PersistenceConnectionRegistry = PersistenceConnectionRegistry()
        self.connection_registry: ConnectionRegistry = ConnectionRegistry()
        self.workflow_registry: WorkflowRegistry = WorkflowRegistry()
        self.mapper_registry: MapperRegistry = MapperRegistry()

    def set_meta_data(self, title: str, description: str, version: str, contact: typing.Dict[str, str]):
        """
        Replaces the meta data of the middleware.

        Args:
            title (str): The title of the middleware.
            description (str): The description of the middleware.
            version (str): The version of the middleware.
            contact (typing.Dict[str, str]): The contact information of the middleware.
        """
        meta_data_values = {
            "title": title,
            "description": description,
            "version": version,
            "contact": contact,
        }
        self.meta_data = MiddlewareMetaData(**meta_data_values)

    def add_callback(self, callback_type: typing.Literal["on_start_up", "on_shutdown"], callback: typing.Callable, *args, **kwargs):
        """
        Registers a callback that is awaited on startup or on shutdown of the middleware.

        Any other callback type is ignored without an error.

        Args:
            callback_type (typing.Literal["on_start_up", "on_shutdown"]): The type of the callback.
            callback (typing.Callable): The callback function.
            *args: Positional arguments bound to the callback.
            **kwargs: Keyword arguments bound to the callback.
        """
        # Bind the arguments now so that the lifespan can call the callback without arguments.
        bound_callback = partial(callback, *args, **kwargs)
        if callback_type == "on_shutdown":
            self.on_shutdown_callbacks.append(bound_callback)
        elif callback_type == "on_start_up":
            self.on_start_up_callbacks.append(bound_callback)

    @asynccontextmanager
    async def lifespan(self, app: FastAPI):
        """
        Function to create a lifespan for the middleware for all events on startup and shutdown.

        On startup, workflows flagged with `on_startup` are scheduled as background tasks, the start-up
        callbacks are awaited and all connectors and persistence connectors are connected.
        On shutdown, workflows flagged with `on_shutdown` are executed (running ones are interrupted
        first), the shutdown callbacks are awaited and all connectors are disconnected.

        Args:
            app (FastAPI): The FastAPI app that should be used for the lifespan.
        """
        # The event loop only keeps weak references to tasks. Holding the tasks here for the whole
        # lifespan prevents them from being garbage collected while they are still running.
        startup_tasks = []
        for workflow in self.workflow_registry.get_workflows():
            if workflow.on_startup:
                # TODO: make a case distinction for workflows that postpone start up or not...
                startup_tasks.append(asyncio.create_task(workflow.execute()))
        for callback in self.on_start_up_callbacks:
            await callback()
        for connector in self.connection_registry.connectors.values():
            await connector.connect()
        for persistence in self.persistence_registry.connectors.values():
            await persistence.connect()
        yield
        for workflow in self.workflow_registry.get_workflows():
            if workflow.on_shutdown:
                if workflow.running:
                    await workflow.interrupt()
                await workflow.execute()

        for callback in self.on_shutdown_callbacks:
            await callback()

        for connector in self.connection_registry.connectors.values():
            await connector.disconnect()
        for persistence in self.persistence_registry.connectors.values():
            await persistence.disconnect()

    @property
    def app(self):
        """
        FastAPI app of the middleware; it is created on first access and reused afterwards.

        The app is configured with the meta data of the middleware, the lifespan of the middleware,
        a permissive CORS configuration and a root endpoint returning a welcome message.
        """
        if self._app is None:
            app = FastAPI(
                title=self.meta_data.title,
                description=self.meta_data.description,
                version=self.meta_data.version,
                contact=self.meta_data.contact,
                license_info={
                    "name": "MIT License",
                    "url": "https://mit-license.org/",
                },
                lifespan=self.lifespan
            )

            app.add_middleware(
                # TODO: make CORS more sophisticated for individual connectors
                CORSMiddleware,
                allow_origins=["*"],
                # `allow_credentials` is a boolean flag; the former list value was only used for its truthiness.
                allow_credentials=True,
                allow_methods=["*"],
                allow_headers=["*"],
            )
            self._app = app

            @app.get("/", response_model=str)
            async def root():
                return "Welcome to aas-middleware!"

        return self._app

    def load_data_model(self, name: str, data_model: DataModel, persist_instances: bool = False):
        """
        Function to load a data model into the middleware to be used for synchronization.

        If `persist_instances` is set, every top level model of the data model is scheduled to be
        persisted by a start-up callback.

        Args:
            name (str): The name of the data model.
            data_model (DataModel): Data model containing the types and values.
            persist_instances (bool): If the instances of the data model should be persisted.
        """
        self.data_models[name] = data_model

        if not persist_instances:
            return

        for models_of_type in data_model.get_top_level_models().values():
            # Skip types without instances.
            if not models_of_type:
                continue
            for model in models_of_type:
                # Persisting is deferred to the start-up of the app.
                self.add_callback("on_start_up", self.persist, name, model)

    def load_json_models(
        self,
        json_models: typing.Optional[typing.Dict[str, typing.Any]] = None,
        all_fields_required: bool = False,
    ):
        """
        Functions that loads models from a json dict into the middleware that can be used for synchronization.

        The function can either be used with a dict that contains the objects.

        NOTE: not implemented yet. The function currently has no effect and returns None.

        Args:
            json_models (dict): Dictionary of aas' and submodels.
            all_fields_required (bool): If all fields are required in the models.
        """
        # TODO: use here the function to load a DataModel from a dict
        # for model_name, model_values in json_models.items():
        #     pydantic_model = get_pydantic_model_from_dict(
        #         model_values, model_name, all_fields_required
        #     )
        #     self.models.append(pydantic_model)

    def load_model_instances(self, name: str, instances: typing.List[BaseModel]):
        """
        Builds a data model from pydantic model instances and loads it into the middleware.

        Args:
            name (str): The name of the data model.
            instances (typing.List[BaseModel]): List of pydantic model instances.
        """
        # The instances are passed to the data model as individual positional arguments.
        self.load_data_model(name, DataModel.from_models(*instances))


    def load_pydantic_models(self, name: str, *models: typing.Type[BaseModel]):
        """
        Functions that loads pydantic models into the middleware that can be used for synchronization.

        Args:
            name (str): The name of the data model.
            *models (typing.Type[BaseModel]): Pydantic model types of the data model.
        """
        # NOTE(review): the model types are passed as one tuple, whereas `DataModel.from_models` is
        # called with unpacked instances elsewhere -- confirm the signature of `from_model_types`.
        data_model = DataModel.from_model_types(models)
        # The data model has to be registered under its name; the name was missing in this call before.
        self.load_data_model(name, data_model)

    def load_aas_objectstore(self, models: model.DictObjectStore, name: str = "aas_objectstore"):
        """
        Functions that loads multiple aas and their submodels into the middleware that can be used for synchronization.

        Args:
            models (model.DictObjectStore): Object store of aas' and submodels
            name (str): The name under which the resulting data model is loaded. Defaults to "aas_objectstore".
        """
        data_model = BasyxFormatter().deserialize(models)
        # `load_data_model` requires a name; calling it with the data model only raised a TypeError.
        self.load_data_model(name, data_model)

    async def update_value(self, value: typing.Any, data_model_name: str, model_id: typing.Optional[str]=None, contained_model_id: typing.Optional[str]=None, field_name: typing.Optional[str]=None):
        """
        Function to update a value in the persistence.

        If no connection is registered for the addressed value, the value is persisted as a new
        model of the data model instead.

        Args:
            value (typing.Any): The new value passed to the connector.
            data_model_name (str): The name of the data model.
            model_id (typing.Optional[str]): The id of the model the value belongs to.
            contained_model_id (typing.Optional[str]): The id of the contained model the value belongs to.
            field_name (typing.Optional[str]): The name of the field the value belongs to.
        """
        connection_info = ConnectionInfo(data_model_name=data_model_name, model_id=model_id, contained_model_id=contained_model_id, field_id=field_name)
        try:
            # Only the lookup is guarded: a KeyError raised while consuming must not trigger persisting.
            connector = self.persistence_registry.get_connection(connection_info)
        except KeyError:
            await self.persist(data_model_name, value)
        else:
            await connector.consume(value)

    async def get_value(self, data_model_name: str, model_id: typing.Optional[str]=None, contained_model_id: typing.Optional[str]=None, field_name: typing.Optional[str]=None) -> typing.Any:
        """
        Function to get a value from the persistence.

        Args:
            data_model_name (str): The name of the data model.
            model_id (typing.Optional[str]): The id of the model the value belongs to.
            contained_model_id (typing.Optional[str]): The id of the contained model the value belongs to.
            field_name (typing.Optional[str]): The name of the field the value belongs to.

        Raises:
            KeyError: If no connection is registered for the addressed value.

        Returns:
            typing.Any: The value provided by the connector of the connection.
        """
        connection_info = ConnectionInfo(data_model_name=data_model_name, model_id=model_id, contained_model_id=contained_model_id, field_id=field_name)
        try:
            # Only the lookup is guarded so that a KeyError raised by the provider is not relabeled.
            connector = self.persistence_registry.get_connection(connection_info)
        except KeyError as e:
            raise KeyError(f"No provider found for {connection_info}") from e
        return await connector.provide()

    def add_default_persistence(self, persistence_factory: PersistenceFactory, data_model_name: typing.Optional[str], model_id: typing.Optional[Identifiable], model_type: typing.Type[typing.Any] = typing.Any):
        """
        Registers a persistence factory as default persistence for a data model.

        Args:
            persistence_factory (PersistenceFactory): The persistence factory to register.
            data_model_name (str): The name of the data model.
            model_id (typing.Optional[Identifiable]): Model id used in the connection info of the registration.
            model_type (typing.Type[typing.Any]): The model type the factory is registered for. Defaults to typing.Any.

        Raises:
            ValueError: If no data model with the given name is loaded.
        """
        if data_model_name not in self.data_models:
            raise ValueError(f"No data model {data_model_name} found.")

        # Default persistences are registered without contained model and field.
        default_connection = ConnectionInfo(
            data_model_name=data_model_name,
            model_id=model_id,
            contained_model_id=None,
            field_id=None,
        )
        self.persistence_registry.add_persistence_factory(default_connection, model_type, persistence_factory)


    async def persist(self, data_model_name: str, model: typing.Optional[Identifiable]=None, persistence_factory: typing.Optional[PersistenceFactory]=None):
        """
        Function to add a model to the persistence.

        Args:
            data_model_name (str): The name of the data model.
            model (Identifiable): The model that should be persisted. Must not be None, although the signature allows it.
            persistence_factory (PersistenceFactory): The persistence factory that should be used. Defaults to None.

        Raises:
            ValueError: If no model is given or if the connection already exists.
        """
        # The model id is needed to address the connection, so fail with a clear message
        # instead of an AttributeError on None.
        if model is None:
            raise ValueError("A model is required to be persisted, but None was given.")
        connection_info = ConnectionInfo(data_model_name=data_model_name, model_id=model.id, contained_model_id=None, field_id=None)
        if connection_info in self.persistence_registry.connections:
            raise ValueError(f"Connection {connection_info} already exists. Try using the existing connector or remove it first.")
        self.persistence_registry.add_to_persistence(connection_info, model, persistence_factory)
        connector = self.persistence_registry.get_connection(connection_info)
        # TODO: raise an error if consume is not possible and remove the persistence in the persistence registry
        await connector.consume(model)


    def add_connector(
        self,
        connector_id: str,
        connector: Connector,
        model_type: typing.Type[typing.Any],
        data_model_name: typing.Optional[str] = None,
        model_id: typing.Optional[str] = None,
        contained_model_id: typing.Optional[str] = None,
        field_id: typing.Optional[str] = None,
    ):
        """
        Register a connector in the middleware and generate a REST endpoint for it.

        If a data model name is given, the connector is additionally connected to the
        addressed persistence and the endpoint is generated for that connection.

        Args:
            connector_id (str): The name of the connector.
            connector (Connector): The connector that should be added.
            model_type (typing.Type[typing.Any]): The type the connector is registered with.
            data_model_name (typing.Optional[str], optional): The name of the data model to connect to. Defaults to None.
            model_id (typing.Optional[str], optional): The id of the model in the data model. Defaults to None.
            contained_model_id (typing.Optional[str], optional): The id of a model contained in the model. Defaults to None.
            field_id (typing.Optional[str], optional): The id of the field in the model. Defaults to None.
        """
        self.connection_registry.add_connector(connector_id, connector, model_type)

        if not data_model_name:
            # Stand-alone connector without a link to the persistence.
            self.generate_rest_endpoint_for_connector(connector_id)
            return

        self.connect_connector_to_persistence(
            connector_id, data_model_name, model_id, contained_model_id, field_id
        )
        persistence_target = ConnectionInfo(
            data_model_name=data_model_name,
            model_id=model_id,
            contained_model_id=contained_model_id,
            field_id=field_id,
        )
        self.generate_rest_endpoint_for_connector(connector_id, persistence_target)


    def generate_rest_endpoint_for_connector(
        self,
        connector_id: str,
        connection_info: typing.Optional[ConnectionInfo] = None,
    ):
        """
        Generate a REST endpoint for a registered connector and include it in the app.

        Args:
            connector_id (str): The id of the registered connector.
            connection_info (typing.Optional[ConnectionInfo], optional): If given, a persistence connector endpoint is generated for this connection instead of a plain connector endpoint. Defaults to None.

        Raises:
            ValueError: If no connector with the given id is registered.
        """
        registry = self.connection_registry
        if connector_id not in registry.connectors:
            raise ValueError(f"Connector {connector_id} not found.")

        registered_connector = registry.get_connector(connector_id)
        registered_type = registry.connection_types[connector_id]
        if connection_info:
            endpoint_router = generate_persistence_connector_endpoint(
                connector_id, registered_connector, connection_info, registered_type
            )
        else:
            endpoint_router = generate_connector_endpoint(
                connector_id, registered_connector, registered_type
            )
        self.app.include_router(endpoint_router)


    # TODO: handle also async connectors!!


    def connect_connector_to_persistence(
        self,
        connector_id: str,
        data_model_name: str,
        model_id: typing.Optional[str] = None,
        contained_model_id: typing.Optional[str] = None,
        field_id: typing.Optional[str] = None,
        persistence_mapper: typing.Optional[Mapper] = None,
        external_mapper: typing.Optional[Mapper] = None,
        formatter: typing.Optional[Formatter] = None,
    ):
        """
        Connect a registered connector to a data entity in the middleware and synchronize it with the persistence.

        Args:
            connector_id (str): The name of the registered connector.
            data_model_name (str): The name of the data model used for identifying the data model in the middleware.
            model_id (typing.Optional[str], optional): The id of the model in the data model. Defaults to None.
            contained_model_id (typing.Optional[str], optional): The id of a model contained in the model. Defaults to None.
            field_id (typing.Optional[str], optional): The id of the field in the model. Defaults to None.
            persistence_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
            external_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
            formatter (typing.Optional[Formatter], optional): Formatter forwarded to the synchronization. Defaults to None.
        """
        target = ConnectionInfo(
            data_model_name=data_model_name,
            model_id=model_id,
            contained_model_id=contained_model_id,
            field_id=field_id,
        )
        registry = self.connection_registry
        registered_connector = registry.get_connector(connector_id)
        # The type the connector was registered with is reused for the connection.
        registered_type = registry.connection_types[connector_id]
        registry.add_connection(connector_id, target, registered_connector, registered_type)

        synchronize_connector_with_persistence(
            registered_connector,
            target,
            self.persistence_registry,
            persistence_mapper,
            external_mapper,
            formatter,
        )

    def workflow(
        self,
        *args,
        on_startup: bool = False,
        on_shutdown: bool = False,
        interval: typing.Optional[float] = None,
        **kwargs
    ):
        """
        Decorator factory that registers the decorated function as a workflow of the middleware.

        The function is turned into a workflow, added to the workflow registry and exposed
        through a generated endpoint. The decorated function itself is returned unchanged.

        Args:
            *args: Positional arguments forwarded to `Workflow.define`.
            on_startup (bool, optional): Forwarded to `Workflow.define`. Defaults to False.
            on_shutdown (bool, optional): Forwarded to `Workflow.define`. Defaults to False.
            interval (typing.Optional[float], optional): Forwarded to `Workflow.define`. Defaults to None.
            **kwargs: Keyword arguments forwarded to `Workflow.define`.

        Returns:
            The decorator that performs the registration.
        """
        def register_workflow(func):
            defined_workflow = Workflow.define(
                func,
                *args,
                on_startup=on_startup,
                on_shutdown=on_shutdown,
                interval=interval,
                **kwargs
            )
            self.workflow_registry.add_workflow(defined_workflow)
            self.app.include_router(generate_workflow_endpoint(defined_workflow))
            return func

        return register_workflow

    def connect_workflow_to_persistence_provider(
        self,
        workflow_id: str,
        arg_name: str,
        data_model_name: str,
        model_id: typing.Optional[str] = None,
        contained_model_id: typing.Optional[str] = None,
        field_id: typing.Optional[str] = None,
        persistence_mapper: typing.Optional[Mapper] = None,
        external_mapper: typing.Optional[Mapper] = None,
        formatter: typing.Optional[Formatter] = None,
    ):
        """
        Connect an argument of a workflow to a data entity in the middleware that provides its value.

        Args:
            workflow_id (str): The name of the workflow.
            arg_name (str): The name of the workflow argument that is connected to the persistence.
            data_model_name (str): The name of the data model used for identifying the data model in the middleware.
            model_id (typing.Optional[str], optional): The id of the model in the data model. Defaults to None.
            contained_model_id (typing.Optional[str], optional): The id of a model contained in the model. Defaults to None.
            field_id (typing.Optional[str], optional): The id of the field in the model. Defaults to None.
            persistence_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
            external_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
            formatter (typing.Optional[Formatter], optional): Formatter forwarded to the synchronization. Defaults to None.
        """
        provider_target = ConnectionInfo(
            data_model_name=data_model_name,
            model_id=model_id,
            contained_model_id=contained_model_id,
            field_id=field_id,
        )
        registered_workflow = self.workflow_registry.get_workflow(workflow_id)
        synchronize_workflow_with_persistence_provider(
            registered_workflow,
            arg_name,
            provider_target,
            self.persistence_registry,
            persistence_mapper,
            external_mapper,
            formatter,
        )
        # TODO: update workflow endpoint to have optional arguments
        # TODO: register mappers, formatters in middleware and add endpoint for them, also add connection to workflow registry

    def connect_workflow_to_persistence_consumer(
        self,
        workflow_id: str,
        data_model_name: str,
        model_id: typing.Optional[str] = None,
        contained_model_id: typing.Optional[str] = None,
        field_id: typing.Optional[str] = None,
        external_mapper: typing.Optional[Mapper] = None,
        formatter: typing.Optional[Formatter] = None,
    ):
        """
        Connect a workflow to a data entity in the middleware that consumes its result.

        Args:
            workflow_id (str): The name of the workflow.
            data_model_name (str): The name of the data model used for identifying the data model in the middleware.
            model_id (typing.Optional[str], optional): The id of the model in the data model. Defaults to None.
            contained_model_id (typing.Optional[str], optional): The id of a model contained in the model. Defaults to None.
            field_id (typing.Optional[str], optional): The id of the field in the model. Defaults to None.
            external_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
            formatter (typing.Optional[Formatter], optional): Formatter forwarded to the synchronization. Defaults to None.
        """
        consumer_target = ConnectionInfo(
            data_model_name=data_model_name,
            model_id=model_id,
            contained_model_id=contained_model_id,
            field_id=field_id,
        )
        registered_workflow = self.workflow_registry.get_workflow(workflow_id)
        synchronize_workflow_with_persistence_consumer(
            registered_workflow,
            consumer_target,
            self.persistence_registry,
            external_mapper,
            formatter,
        )
        # TODO: register mappers, formatters in middleware and add endpoint for them, also add connection to workflow registry


    def generate_model_registry_api(self):
        """
        Add a REST API for registering and unregistering models and move its routes to the front.

        The registry routes are appended by `include_router` and afterwards re-ordered so
        that they directly follow the first five (constant) routes of the app.
        """
        # TODO: validate if this works and add it to the admin api...
        registry_router = generate_model_api(middleware_instance=self)
        self.app.include_router(registry_router)

        registry_route_count = len(registry_router.routes)
        constant_route_count = 5

        constant_routes = self.app.router.routes[:constant_route_count]
        registry_routes = self.app.routes[-registry_route_count:]
        remaining_routes = self.app.routes[constant_route_count:-registry_route_count]
        self.app.router.routes = constant_routes + registry_routes + remaining_routes

    def generate_rest_api_for_data_model(self, data_model_name: str):
        """
        Generate the REST endpoints for a loaded data model.

        Args:
            data_model_name (str): The name of the loaded data model.
        """
        loaded_model = self.data_models[data_model_name]
        RestRouter(loaded_model, data_model_name, self).generate_endpoints()

    def generate_graphql_api_for_data_model(self, data_model_name: str):
        """
        Generate the GraphQL endpoint for a loaded data model.

        Args:
            data_model_name (str): The name of the loaded data model.
        """
        loaded_model = self.data_models[data_model_name]
        GraphQLRouter(loaded_model, data_model_name, self).generate_graphql_endpoint()

add_callback(self, callback_type, callback, *args, **kwargs)

Function to add a callback to the middleware.

Parameters:

Name Type Description Default
callback_type Literal["on_start_up", "on_shutdown"]

The type of the callback.

required
callback Callable

The callback function.

required
Source code in aas_middleware\middleware\middleware.py
def add_callback(self, callback_type: typing.Literal["on_start_up", "on_shutdown"], callback: typing.Callable, *args, **kwargs):
    """
    Function to add a callback to the middleware.

    The callback is stored together with the given arguments, so that it can later be
    called without any arguments.

    Args:
        callback_type (typing.Literal["on_start_up", "on_shutdown"]): The type of the callback.
        callback (typing.Callable): The callback function.
        *args: Positional arguments that are bound to the callback.
        **kwargs: Keyword arguments that are bound to the callback.

    Raises:
        ValueError: If the callback type is neither "on_start_up" nor "on_shutdown".
    """
    functional_callback = partial(callback, *args, **kwargs)
    if callback_type == "on_start_up":
        self.on_start_up_callbacks.append(functional_callback)
    elif callback_type == "on_shutdown":
        self.on_shutdown_callbacks.append(functional_callback)
    else:
        # A misspelled type would otherwise drop the callback without any notice.
        raise ValueError(f"Unknown callback type {callback_type!r}. Use 'on_start_up' or 'on_shutdown'.")

add_connector(self, connector_id, connector, model_type, data_model_name=None, model_id=None, contained_model_id=None, field_id=None)

Function to add a connector to the middleware.

Parameters:

Name Type Description Default
connector_id str

The name of the connector.

required
connector Connector

The connector that should be added.

required
Source code in aas_middleware\middleware\middleware.py
def add_connector(
    self,
    connector_id: str,
    connector: Connector,
    model_type: typing.Type[typing.Any],
    data_model_name: typing.Optional[str] = None,
    model_id: typing.Optional[str] = None,
    contained_model_id: typing.Optional[str] = None,
    field_id: typing.Optional[str] = None,
):
    """
    Register a connector in the middleware and generate a REST endpoint for it.

    If a data model name is given, the connector is additionally connected to the
    addressed persistence and the endpoint is generated for that connection.

    Args:
        connector_id (str): The name of the connector.
        connector (Connector): The connector that should be added.
        model_type (typing.Type[typing.Any]): The type the connector is registered with.
        data_model_name (typing.Optional[str], optional): The name of the data model to connect to. Defaults to None.
        model_id (typing.Optional[str], optional): The id of the model in the data model. Defaults to None.
        contained_model_id (typing.Optional[str], optional): The id of a model contained in the model. Defaults to None.
        field_id (typing.Optional[str], optional): The id of the field in the model. Defaults to None.
    """
    self.connection_registry.add_connector(connector_id, connector, model_type)

    if not data_model_name:
        # Stand-alone connector without a link to the persistence.
        self.generate_rest_endpoint_for_connector(connector_id)
        return

    self.connect_connector_to_persistence(
        connector_id, data_model_name, model_id, contained_model_id, field_id
    )
    persistence_target = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_id,
    )
    self.generate_rest_endpoint_for_connector(connector_id, persistence_target)

add_default_persistence(self, persistence_factory, data_model_name, model_id, model_type=typing.Any)

Function to add a default persistence for a model.

Parameters:

Name Type Description Default
data_model_name str

The name of the data model.

required
model Identifiable

The model that should be persisted.

required
Source code in aas_middleware\middleware\middleware.py
def add_default_persistence(
    self,
    persistence_factory: PersistenceFactory,
    data_model_name: typing.Optional[str],
    model_id: typing.Optional[Identifiable],
    model_type: typing.Type[typing.Any] = typing.Any,
):
    """
    Register a persistence factory as default persistence for a data model (or one of its models).

    Args:
        persistence_factory (PersistenceFactory): The factory that is registered in the persistence registry.
        data_model_name (typing.Optional[str]): The name of an already loaded data model.
        model_id (typing.Optional[Identifiable]): The id of the model the default applies to, passed on as model id of the connection info.
        model_type (typing.Type[typing.Any], optional): The type the factory is registered for. Defaults to typing.Any.

    Raises:
        ValueError: If no data model with the given name is loaded.
    """
    if data_model_name not in self.data_models:
        raise ValueError(f"No data model {data_model_name} found.")

    default_target = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=None,
        field_id=None,
    )
    self.persistence_registry.add_persistence_factory(
        default_target, model_type, persistence_factory
    )

connect_connector_to_persistence(self, connector_id, data_model_name, model_id=None, contained_model_id=None, field_id=None, persistence_mapper=None, external_mapper=None, formatter=None)

Function to connect a connector to a data entity in the middleware.

Parameters:

Name Type Description Default
connector_id str

The name of the connector.

required
connector Connector

The connector that should be connected.

required
data_model_name str

The name of the data model used for identifying the data model in the middleware.

required
model_id Optional[str]

The id of the model in the data model. Defaults to None.

None
field_id Optional[str]

The id of the field in the model. Defaults to None.

None
model_type Type[Any]

The type of the model. Defaults to typing.Any.

required
persistence_mapper Optional[Mapper]

The mapper that should be used. Defaults to None.

None
external_mapper Optional[Mapper]

The mapper that should be used. Defaults to None.

None
formatter Optional[Formatter]

The formatter that should be used. Defaults to None.

None
Source code in aas_middleware\middleware\middleware.py
def connect_connector_to_persistence(
    self,
    connector_id: str,
    data_model_name: str,
    model_id: typing.Optional[str] = None,
    contained_model_id: typing.Optional[str] = None,
    field_id: typing.Optional[str] = None,
    persistence_mapper: typing.Optional[Mapper] = None,
    external_mapper: typing.Optional[Mapper] = None,
    formatter: typing.Optional[Formatter] = None,
):
    """
    Connect a registered connector to a data entity in the middleware and synchronize it with the persistence.

    Args:
        connector_id (str): The name of the registered connector.
        data_model_name (str): The name of the data model used for identifying the data model in the middleware.
        model_id (typing.Optional[str], optional): The id of the model in the data model. Defaults to None.
        contained_model_id (typing.Optional[str], optional): The id of a model contained in the model. Defaults to None.
        field_id (typing.Optional[str], optional): The id of the field in the model. Defaults to None.
        persistence_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
        external_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
        formatter (typing.Optional[Formatter], optional): Formatter forwarded to the synchronization. Defaults to None.
    """
    target = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_id,
    )
    registry = self.connection_registry
    registered_connector = registry.get_connector(connector_id)
    # The type the connector was registered with is reused for the connection.
    registered_type = registry.connection_types[connector_id]
    registry.add_connection(connector_id, target, registered_connector, registered_type)

    synchronize_connector_with_persistence(
        registered_connector,
        target,
        self.persistence_registry,
        persistence_mapper,
        external_mapper,
        formatter,
    )

connect_workflow_to_persistence_consumer(self, workflow_id, data_model_name, model_id=None, contained_model_id=None, field_id=None, external_mapper=None, formatter=None)

Function to connect a workflow to a data entity in the middleware.

Parameters:

Name Type Description Default
workflow_id str

The name of the workflow.

required
data_model_name str

The name of the data model used for identifying the data model in the middleware.

required
model_id Optional[str]

The id of the model in the data model. Defaults to None.

None
field_id Optional[str]

The id of the field in the model. Defaults to None.

None
mapper Optional[Mapper]

The mapper that should be used. Defaults to None.

required
formatter Optional[Formatter]

The formatter that should be used. Defaults to None.

None
Source code in aas_middleware\middleware\middleware.py
def connect_workflow_to_persistence_consumer(
    self,
    workflow_id: str,
    data_model_name: str,
    model_id: typing.Optional[str] = None,
    contained_model_id: typing.Optional[str] = None,
    field_id: typing.Optional[str] = None,
    external_mapper: typing.Optional[Mapper] = None,
    formatter: typing.Optional[Formatter] = None,
):
    """
    Connect a workflow to a data entity in the middleware that consumes its result.

    Args:
        workflow_id (str): The name of the workflow.
        data_model_name (str): The name of the data model used for identifying the data model in the middleware.
        model_id (typing.Optional[str], optional): The id of the model in the data model. Defaults to None.
        contained_model_id (typing.Optional[str], optional): The id of a model contained in the model. Defaults to None.
        field_id (typing.Optional[str], optional): The id of the field in the model. Defaults to None.
        external_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
        formatter (typing.Optional[Formatter], optional): Formatter forwarded to the synchronization. Defaults to None.
    """
    consumer_target = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_id,
    )
    registered_workflow = self.workflow_registry.get_workflow(workflow_id)
    synchronize_workflow_with_persistence_consumer(
        registered_workflow,
        consumer_target,
        self.persistence_registry,
        external_mapper,
        formatter,
    )
    # TODO: register mappers, formatters in middleware and add endpoint for them, also add connection to workflow registry

connect_workflow_to_persistence_provider(self, workflow_id, arg_name, data_model_name, model_id=None, contained_model_id=None, field_id=None, persistence_mapper=None, external_mapper=None, formatter=None)

Function to connect a workflow to a data entity in the middleware.

Parameters:

Name Type Description Default
workflow_id str

The name of the workflow.

required
data_model_name str

The name of the data model used for identifying the data model in the middleware.

required
model_id Optional[str]

The id of the model in the data model. Defaults to None.

None
field_id Optional[str]

The id of the field in the model. Defaults to None.

None
mapper Optional[Mapper]

The mapper that should be used. Defaults to None.

required
formatter Optional[Formatter]

The formatter that should be used. Defaults to None.

None
Source code in aas_middleware\middleware\middleware.py
def connect_workflow_to_persistence_provider(
    self,
    workflow_id: str,
    arg_name: str,
    data_model_name: str,
    model_id: typing.Optional[str] = None,
    contained_model_id: typing.Optional[str] = None,
    field_id: typing.Optional[str] = None,
    persistence_mapper: typing.Optional[Mapper] = None,
    external_mapper: typing.Optional[Mapper] = None,
    formatter: typing.Optional[Formatter] = None,
):
    """
    Connect an argument of a workflow to a data entity in the middleware that provides its value.

    Args:
        workflow_id (str): The name of the workflow.
        arg_name (str): The name of the workflow argument that is connected to the persistence.
        data_model_name (str): The name of the data model used for identifying the data model in the middleware.
        model_id (typing.Optional[str], optional): The id of the model in the data model. Defaults to None.
        contained_model_id (typing.Optional[str], optional): The id of a model contained in the model. Defaults to None.
        field_id (typing.Optional[str], optional): The id of the field in the model. Defaults to None.
        persistence_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
        external_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
        formatter (typing.Optional[Formatter], optional): Formatter forwarded to the synchronization. Defaults to None.
    """
    provider_target = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_id,
    )
    registered_workflow = self.workflow_registry.get_workflow(workflow_id)
    synchronize_workflow_with_persistence_provider(
        registered_workflow,
        arg_name,
        provider_target,
        self.persistence_registry,
        persistence_mapper,
        external_mapper,
        formatter,
    )
    # TODO: update workflow endpoint to have optional arguments
    # TODO: register mappers, formatters in middleware and add endpoint for them, also add connection to workflow registry

generate_graphql_api_for_data_model(self, data_model_name)

Generates a GraphQL API with query operations for AAS and submodels from the loaded models.

Source code in aas_middleware\middleware\middleware.py
def generate_graphql_api_for_data_model(self, data_model_name: str):
    """
    Generate the GraphQL endpoint for a loaded data model.

    Args:
        data_model_name (str): The name of the loaded data model.
    """
    loaded_model = self.data_models[data_model_name]
    GraphQLRouter(loaded_model, data_model_name, self).generate_graphql_endpoint()

generate_model_registry_api(self)

Adds a REST API so that new models can be registered and unregistered from the Middleware.

Source code in aas_middleware\middleware\middleware.py
def generate_model_registry_api(self):
    """
    Add a REST API for registering and unregistering models and move its routes to the front.

    The registry routes are appended by `include_router` and afterwards re-ordered so
    that they directly follow the first five (constant) routes of the app.
    """
    # TODO: validate if this works and add it to the admin api...
    registry_router = generate_model_api(middleware_instance=self)
    self.app.include_router(registry_router)

    registry_route_count = len(registry_router.routes)
    constant_route_count = 5

    constant_routes = self.app.router.routes[:constant_route_count]
    registry_routes = self.app.routes[-registry_route_count:]
    remaining_routes = self.app.routes[constant_route_count:-registry_route_count]
    self.app.router.routes = constant_routes + registry_routes + remaining_routes

generate_rest_api_for_data_model(self, data_model_name)

Generates a REST API with CRUD operations for AAS and submodels from the loaded models.

Source code in aas_middleware\middleware\middleware.py
def generate_rest_api_for_data_model(self, data_model_name: str):
    """
    Generate the REST endpoints for a loaded data model.

    Args:
        data_model_name (str): The name of the loaded data model.
    """
    loaded_model = self.data_models[data_model_name]
    RestRouter(loaded_model, data_model_name, self).generate_endpoints()

generate_rest_endpoint_for_connector(self, connector_id, connection_info=None)

Function to generate a REST endpoint for a connector.

Parameters:

Name Type Description Default
connector_id str

The id of the registered connector for which the endpoint is generated.

required
connection_info Optional[ConnectionInfo]

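The connection to the persistence; if given, a persistence connector endpoint is generated instead of a plain connector endpoint. Defaults to None.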

None

Exceptions:

Type Description
ValueError

If no connector with the given id is registered.

Source code in aas_middleware\middleware\middleware.py
def generate_rest_endpoint_for_connector(
    self,
    connector_id: str,
    connection_info: typing.Optional[ConnectionInfo] = None,
):
    """
    Generate a REST endpoint for a registered connector and include it in the app.

    Args:
        connector_id (str): The id of the registered connector.
        connection_info (typing.Optional[ConnectionInfo], optional): If given, a persistence connector endpoint is generated for this connection instead of a plain connector endpoint. Defaults to None.

    Raises:
        ValueError: If no connector with the given id is registered.
    """
    registry = self.connection_registry
    if connector_id not in registry.connectors:
        raise ValueError(f"Connector {connector_id} not found.")

    registered_connector = registry.get_connector(connector_id)
    registered_type = registry.connection_types[connector_id]
    if connection_info:
        endpoint_router = generate_persistence_connector_endpoint(
            connector_id, registered_connector, connection_info, registered_type
        )
    else:
        endpoint_router = generate_connector_endpoint(
            connector_id, registered_connector, registered_type
        )
    self.app.include_router(endpoint_router)

get_value(self, data_model_name, model_id=None, contained_model_id=None, field_name=None) async

Function to get a value from the persistence.

Parameters:

Name Type Description Default
data_model_name str

The name of the data model the value belongs to.

required
model_id Optional[str]

The id of the addressed model.

None
field_name Optional[str]

The name of the addressed field.

None

Returns:

Type Description
Any

The value provided by the persistence connector.

Source code in aas_middleware\middleware\middleware.py
async def get_value(self, data_model_name: str, model_id: typing.Optional[str]=None, contained_model_id: typing.Optional[str]=None, field_name: typing.Optional[str]=None) -> typing.Any:
    """
    Function to get a value from the persistence.

    Args:
        data_model_name (str): The name of the data model the value belongs to.
        model_id (typing.Optional[str]): The id of the addressed model. Defaults to None.
        contained_model_id (typing.Optional[str]): The id of a model contained in the addressed model. Defaults to None.
        field_name (typing.Optional[str]): The name of the addressed field. Defaults to None.

    Returns:
        typing.Any: The value provided by the persistence connector.

    Raises:
        KeyError: If no persistence connection is registered for the given identifiers.
    """
    connection_info = ConnectionInfo(data_model_name=data_model_name, model_id=model_id, contained_model_id=contained_model_id, field_id=field_name)
    # Only the registry lookup is guarded: a KeyError raised inside provide() is a
    # different failure and must not be reported as a missing provider.
    try:
        connector = self.persistence_registry.get_connection(connection_info)
    except KeyError as err:
        raise KeyError(f"No provider found for {connection_info}") from err
    return await connector.provide()

lifespan(self, app)

Function to create a lifespan for the middleware for all events on startup and shutdown.

Parameters:

Name Type Description Default
app FastAPI

The FastAPI app that should be used for the lifespan.

required
Source code in aas_middleware\middleware\middleware.py
@asynccontextmanager
async def lifespan(self, app: FastAPI):
    """
    Function to create a lifespan for the middleware for all events on startup and shutdown.

    On startup, the startup workflows are scheduled as background tasks, the start up
    callbacks are awaited and all connectors and persistences are connected. On shutdown,
    the shutdown workflows are executed (running ones are interrupted first), the shutdown
    callbacks are awaited and all connectors and persistences are disconnected.

    Args:
        app (FastAPI): The FastAPI app that should be used for the lifespan.
    """
    # Keep strong references to the startup tasks for the whole lifespan: the event loop
    # only holds weak references, so an unreferenced task can be garbage collected
    # before it has finished.
    startup_tasks = []
    for workflow in self.workflow_registry.get_workflows():
        if workflow.on_startup:
            # TODO: make a case distinction for workflows that postpone start up or not...
            startup_tasks.append(asyncio.create_task(workflow.execute()))
    for callback in self.on_start_up_callbacks:
        await callback()
    for connector in self.connection_registry.connectors.values():
        await connector.connect()
    for persistence in self.persistence_registry.connectors.values():
        await persistence.connect()
    try:
        yield
    finally:
        # Run the shutdown steps even if an exception is thrown into the context manager.
        for workflow in self.workflow_registry.get_workflows():
            if workflow.on_shutdown:
                if workflow.running:
                    await workflow.interrupt()
                await workflow.execute()

        for callback in self.on_shutdown_callbacks:
            await callback()

        for connector in self.connection_registry.connectors.values():
            await connector.disconnect()
        for persistence in self.persistence_registry.connectors.values():
            await persistence.disconnect()
        startup_tasks.clear()

load_aas_objectstore(self, models)

Function that loads multiple AAS and their submodels into the middleware so that they can be used for synchronization.

Parameters:

Name Type Description Default
models List[model.DictObjectStore]

Object store of aas' and submodels

required
Source code in aas_middleware\middleware\middleware.py
def load_aas_objectstore(self, models: model.DictObjectStore, name: str = "aas_objectstore"):
    """
    Function that loads multiple aas and their submodels into the middleware so that they can be used for synchronization.

    Args:
        models (model.DictObjectStore): Object store of aas and submodels.
        name (str, optional): The name under which the resulting data model is loaded. Defaults to "aas_objectstore".
    """
    data_model = BasyxFormatter().deserialize(models)
    # load_data_model needs the data model name as first argument; passing only the
    # data model fails with a TypeError for the missing argument.
    self.load_data_model(name, data_model)

load_data_model(self, name, data_model, persist_instances=False)

Function to load a data model into the middleware to be used for synchronization.

Parameters:

Name Type Description Default
name str

The name of the data model.

required
data_model DataModel

Data model containing the types and values.

required
persist_instances bool

If the instances of the data model should be persisted.

False
Source code in aas_middleware\middleware\middleware.py
def load_data_model(self, name: str, data_model: DataModel, persist_instances: bool = False):
    """
    Function to load a data model into the middleware to be used for synchronization.

    Args:
        name (str): The name of the data model.
        data_model (DataModel): Data model containing the types and values.
        persist_instances (bool): If the instances of the data model should be persisted.
    """
    self.data_models[name] = data_model

    if not persist_instances:
        return

    # Persisting is deferred to start up: every top level model gets its own callback.
    for models_of_type in data_model.get_top_level_models().values():
        for model in models_of_type:
            self.add_callback("on_start_up", self.persist, name, model)

load_json_models(self, json_models=None, all_fields_required=False)

Function that loads models from a JSON dict into the middleware so that they can be used for synchronization.

The function is used with a dict that contains the objects.

Parameters:

Name Type Description Default
json_models dict

Dictionary of aas' and submodels.

None
all_fields_required bool

If all fields are required in the models.

False
Source code in aas_middleware\middleware\middleware.py
def load_json_models(
    self,
    json_models: typing.Optional[typing.Dict[str, typing.Any]] = None,
    all_fields_required: bool = False,
) -> None:
    """
    Function that loads models from a json dict into the middleware that can be used for synchronization.

    NOTE: This function is not implemented yet. It currently ignores its arguments and
    does nothing; the intended logic is only sketched in the commented-out code below.

    Args:
        json_models (dict): Dictionary of aas and submodels. Defaults to None.
        all_fields_required (bool): If all fields are required in the models.
    """
    # TODO: use here the function to load a DataModel from a dict
    # for model_name, model_values in json_models.items():
    #     pydantic_model = get_pydantic_model_from_dict(
    #         model_values, model_name, all_fields_required
    #     )
    #     self.models.append(pydantic_model)

load_model_instances(self, name, instances)

Function that loads pydantic model instances into the middleware as a data model that can be used for synchronization.

Parameters:

Name Type Description Default
name str

The name of the data model.

required
instances List[BaseModel]

List of pydantic model instances.

required
Source code in aas_middleware\middleware\middleware.py
def load_model_instances(self, name: str, instances: typing.List[BaseModel]):
    """
    Build a data model from pydantic model instances and load it into the middleware.

    The instances are handed to ``DataModel.from_models`` and the resulting data model
    is registered via ``load_data_model`` under the given name.

    Args:
        name (str): The name under which the data model is loaded.
        instances (typing.List[BaseModel]): Pydantic model instances that form the data model.
    """
    self.load_data_model(name, DataModel.from_models(*instances))

load_pydantic_models(self, name, *models)

Function that loads pydantic model types into the middleware so that they can be used for synchronization.

Parameters:

Name Type Description Default
models List[Type[BaseModel]]

List of pydantic models.

()
Source code in aas_middleware\middleware\middleware.py
def load_pydantic_models(self, name: str, *models: typing.Type[BaseModel]):
    """
    Function that loads pydantic model types into the middleware as a data model that can be used for synchronization.

    Args:
        name (str): The name under which the data model is loaded.
        *models (typing.Type[BaseModel]): The pydantic model types, passed as positional arguments.
    """
    # NOTE(review): the types are unpacked on the assumption that
    # DataModel.from_model_types is variadic like DataModel.from_models -- confirm.
    data_model = DataModel.from_model_types(*models)
    # load_data_model expects the data model name first; omitting it raised a TypeError.
    self.load_data_model(name, data_model)

persist(self, data_model_name, model=None, persistence_factory=None) async

Function to add a model to the persistence.

Parameters:

Name Type Description Default
data_model_name str

The name of the data model.

required
model Identifiable

The model that should be persisted.

None
persistence_factory PersistenceFactory

The persistence factory that should be used.

None

Exceptions:

Type Description
ValueError

If the connection already exists.

Source code in aas_middleware\middleware\middleware.py
async def persist(self, data_model_name: str, model: typing.Optional[Identifiable]=None, persistence_factory: typing.Optional[PersistenceFactory]=None):
    """
    Register a model in the persistence registry and push it to its connector.

    A model-level connection info is built from ``data_model_name`` and ``model.id``.
    Although ``model`` defaults to None, its ``id`` attribute is read unconditionally,
    so an actual model has to be passed.

    Args:
        data_model_name (str): The name of the data model.
        model (Identifiable): The model that should be persisted.
        persistence_factory (PersistenceFactory): The persistence factory that should be used.

    Raises:
        ValueError: If the connection already exists.
    """
    info = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model.id,
        contained_model_id=None,
        field_id=None,
    )
    registry = self.persistence_registry
    if info in registry.connections:
        raise ValueError(f"Connection {info} already exists. Try using the existing connector or remove it first.")
    registry.add_to_persistence(info, model, persistence_factory)
    # TODO: raise an error if consume is not possible and remove the persistence in the persistence registry
    await registry.get_connection(info).consume(model)

set_meta_data(self, title, description, version, contact)

Function to set the meta data of the middleware.

Parameters:

Name Type Description Default
title str

The title of the middleware.

required
description str

The description of the middleware.

required
version str

The version of the middleware.

required
contact Dict[str, str]

The contact information of the middleware.

required
license_info Dict[str, str]

The license information of the middleware.

required
Source code in aas_middleware\middleware\middleware.py
def set_meta_data(self, title: str, description: str, version: str, contact: typing.Dict[str, str]):
    """
    Replace the meta data of the middleware.

    Only the four values below are passed on to ``MiddlewareMetaData``; every other
    field of the meta data keeps the default defined on that model.

    Args:
        title (str): The title of the middleware.
        description (str): The description of the middleware.
        version (str): The version of the middleware.
        contact (typing.Dict[str, str]): The contact information of the middleware.
    """
    meta_data_fields = {
        "title": title,
        "description": description,
        "version": version,
        "contact": contact,
    }
    self.meta_data = MiddlewareMetaData(**meta_data_fields)

update_value(self, value, data_model_name, model_id=None, contained_model_id=None, field_name=None) async

Function to update a value in the persistence.

Parameters:

Name Type Description Default
data_model_name str

description

required
model_id Optional[str]

description

None
field_name Optional[str]

description

None
value Any

description

required
Source code in aas_middleware\middleware\middleware.py
async def update_value(self, value: typing.Any, data_model_name: str, model_id: typing.Optional[str]=None, contained_model_id: typing.Optional[str]=None, field_name: typing.Optional[str]=None):
    """
    Function to update a value in the persistence.

    The connector registered for the addressed connection consumes the value. If a
    KeyError is raised while looking up the connector or consuming the value, the
    value is persisted as a new model via ``persist`` instead.

    Args:
        value (typing.Any): The value handed to the connector (or to ``persist`` on fallback).
        data_model_name (str): The name of the data model the value belongs to.
        model_id (typing.Optional[str]): The id of the addressed model.
        contained_model_id (typing.Optional[str]): The id of the addressed contained model.
        field_name (typing.Optional[str]): The name of the addressed field; used as ``field_id`` of the connection info.
    """
    target = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_name,
    )
    try:
        await self.persistence_registry.get_connection(target).consume(value)
    except KeyError:
        await self.persist(data_model_name, value)

MiddlewareMetaData (BaseModel)

Meta data for the middleware.

Source code in aas_middleware\middleware\middleware.py
class MiddlewareMetaData(BaseModel):
    """
    Meta data for the middleware.

    Every field has a default, so the model can be created without arguments.
    """
    # Title of the middleware.
    title: str = "aas-middleware"
    # Free-text description of the middleware.
    description: str = """
    The aas-middleware allows to convert aas models to pydantic models and generate a REST or GraphQL API from them.
    """
    # Defaults to the version constant of the aas_middleware package.
    version: str = Field(default=aas_middleware.VERSION)
    # Contact information given as a mapping with the keys "name" and "email".
    contact: typing.Dict[str, str] = {
        "name": "Sebastian Behrendt",
        "email": "sebastian.behrendt@kit.edu",
    }
    # Filled by get_license_info() when no value is given.
    # NOTE(review): init=False presumably aims to keep this out of the constructor -- confirm it has that effect on a BaseModel.
    license_info: typing.Dict[str, str] = Field(init=False, default_factory=get_license_info)

model_registry_api

generate_model_api(middleware_instance)

Generates endpoints to register and unregister models from the middleware.

Returns:

Type Description
APIRouter

FastAPI router with endpoints to register and unregister models.

Source code in aas_middleware\middleware\model_registry_api.py
def generate_model_api(middleware_instance: Middleware) -> APIRouter:
    """
    Generates endpoints to list, register, update and unregister models of the middleware.

    Args:
        middleware_instance (Middleware): The middleware whose ``models`` are inspected and changed by the endpoints.

    Returns:
        APIRouter: FastAPI router with the model registry endpoints.
    """
    # TODO: Also allow to retrieve and post models as JSONSchema -> with required / non-required fields.
    router = APIRouter(
        prefix="",
        tags=["Model registry"],
        responses={404: {"description": "Not found"}},
    )

    def _is_registered(name: str) -> bool:
        # True if a model class with this name is already known to the middleware.
        return any(
            name == registered.__name__ for registered in middleware_instance.models
        )

    def _require_ids(name: str, payload: dict) -> None:
        # The model itself and every nested dict (treated as submodel) must carry an "id".
        if "id" not in payload.keys():
            raise HTTPException(
                403, f"Mandatory field id is missing for the model <{name}>."
            )
        for key, value in payload.items():
            if isinstance(value, dict) and "id" not in value.keys():
                raise HTTPException(
                    403,
                    f"Mandatory field id is missing in submodel <{key}> for model <{name}>.",
                )

    @router.get("/get_models", response_model=list)
    async def get_models() -> List[Dict[str, str]]:
        return [
            recursive_model_example_string_reformatter(registered.schema())
            for registered in middleware_instance.models
        ]

    @router.post(
        "/register_model",
        response_model=dict,
    )
    async def post_model(model_name: str, model: dict) -> Dict[str, str]:
        if _is_registered(model_name):
            raise HTTPException(
                403,
                f"A model with the name {model_name} exists already! Please update the existing model.",
            )
        _require_ids(model_name, model)
        register_model_from_middleware(model_name, model, middleware_instance)
        return {"message": f"Succesfully created API for model {model_name}."}

    @router.put("/update_model", response_model=dict)
    async def update_model(model_name: str, model: dict) -> Dict[str, str]:
        if not _is_registered(model_name):
            raise HTTPException(
                403,
                f"A model with the name {model_name} does not exist yet! Please post a new model.",
            )
        _require_ids(model_name, model)
        # An update is carried out as delete followed by a fresh registration.
        delete_model_from_middleware(model_name, middleware_instance)
        register_model_from_middleware(model_name, model, middleware_instance)
        return {"message": f"Succesfully updated API for model {model_name}."}

    @router.delete("/delete_model", response_model=dict)
    async def delete_model(model_name: str):
        if not _is_registered(model_name):
            raise HTTPException(
                404, f"No model registered in middleware with name <{model_name}>"
            )
        delete_model_from_middleware(model_name, middleware_instance)
        return {"message": f"Succesfully deleted API for model {model_name}."}

    return router

remove_model_routes_from_app(app, model_name)

Function removes routes from the app that contain the model_name as the first route separator.

Parameters:

Name Type Description Default
app FastAPI

FastAPI app to remove routes from

required
model_name str

model_name of model to remove from API of app

required
Source code in aas_middleware\middleware\model_registry_api.py
def remove_model_routes_from_app(app: FastAPI, model_name: str):
    """
    Remove every route of the app that ``route_belongs_to_model`` attributes to the model,
    then regenerate the OpenAPI schema of the app.

    Args:
        app (FastAPI): FastAPI app to remove routes from
        model_name (str): model_name of model to remove from API of app
    """
    matching_positions = [
        position
        for position, route in enumerate(app.routes)
        if route_belongs_to_model(route, model_name)
    ]
    # Delete from the back so that the remaining positions stay valid.
    for position in reversed(matching_positions):
        del app.routes[position]
    update_openapi(app)

update_openapi(app)

Updates the openAPI schema of a fastAPI app during runtime to register updates.

Parameters:

Name Type Description Default
app FastAPI

app where the openapi schema should be updated.

required
Source code in aas_middleware\middleware\model_registry_api.py
def update_openapi(app: FastAPI):
    """
    Rebuild the OpenAPI schema of a FastAPI app at runtime and store it on the app.

    The schema is generated by ``get_openapi`` from the app's current metadata and
    routes and assigned to ``app.openapi_schema``.

    Args:
        app (FastAPI): app where the openapi schema should be updated.
    """
    schema_arguments = {
        "title": app.title,
        "version": app.version,
        "openapi_version": app.openapi_version,
        "description": app.description,
        "terms_of_service": app.terms_of_service,
        "contact": app.contact,
        "license_info": app.license_info,
        "routes": app.routes,
        "tags": app.openapi_tags,
        "servers": app.servers,
    }
    app.openapi_schema = get_openapi(**schema_arguments)

registries

ConnectionInfo (BaseModel)

Class that contains the information of a connection of a provider and a consumer to the persistence layer.

Source code in aas_middleware\middleware\registries.py
class ConnectionInfo(BaseModel):
    """
    Describes what a provider or consumer is connected to in the persistence layer.

    Only the data model name is mandatory; the optional ids narrow the connection
    down to a model, a contained model and finally a field.
    """
    data_model_name: str
    model_id: typing.Optional[str] = None
    contained_model_id: typing.Optional[str] = None
    field_id: typing.Optional[str] = None

    # Frozen instances; the registries use ConnectionInfo objects as dictionary keys.
    # NOTE(review): protected_namespaces=() presumably permits the "model_id" field name -- confirm.
    model_config = ConfigDict(frozen=True, protected_namespaces=())

    @property
    def connection_type(self) -> typing.Literal["data_model", "model", "contained_model", "field"]:
        # The ids are checked from the outermost level inwards; the first one that is
        # not set determines the level, regardless of any deeper id.
        if not self.model_id:
            return "data_model"
        if not self.contained_model_id:
            return "model"
        if not self.field_id:
            return "contained_model"
        return "field"

ConnectionRegistry

Class that manages the connections of the middleware.

Source code in aas_middleware\middleware\registries.py
class ConnectionRegistry:
    """
    Class that manages the connections of the middleware.

    Three mappings are kept:
        connectors: connector id -> connector
        connection_types: connector id -> type registered together with the connector
        connections: connection info -> ids of the connectors attached to it
    """

    def __init__(self):
        self.connectors: typing.Dict[str, Connector] = {}
        self.connection_types: typing.Dict[str, typing.Type[Connector]] = {}
        self.connections: typing.Dict[ConnectionInfo, typing.List[str]] = {}

    def get_connector_id(self, connector: Connector) -> str:
        """
        Function to get a connector id from the connection manager.

        Args:
            connector (Connector): The connector of the connection.

        Returns:
            str: The id of the first registered connector that compares equal.

        Raises:
            KeyError: If the connector is not in the connection manager.
        """
        for connector_id, registered in self.connectors.items():
            if connector == registered:
                return connector_id
        raise KeyError(f"Connector {connector} is not in the connection manager.")

    def get_connector(self, connector_id: str) -> Connector:
        """
        Function to get a connector from the connection manager.

        Args:
            connector_id (str): The id of the connector.

        Returns:
            Connector: The connector of the connection.

        Raises:
            KeyError: If the connector id is not in the connection manager.
        """
        return self.connectors[connector_id]

    def add_connector(self, connector_id: str, connector: Connector, connection_type: typing.Type[Connector]):
        """
        Function to add a connector to the connection manager.

        An existing entry with the same id is overwritten.

        Args:
            connector_id (str): The name of the connector.
            connector (Connector): The connector to be added.
            connection_type (typing.Type[Connector]): The type of the connector.
        """
        self.connectors[connector_id] = connector
        self.connection_types[connector_id] = connection_type

    def add_connection(self, connector_id: str, connection_info: ConnectionInfo, connector: Connector, type_connection_info: typing.Type[typing.Any]):
        """
        Function to add a connection to the connection manager.

        Args:
            connector_id (str): The id of the connector.
            connection_info (ConnectionInfo): The connection info of the connection.
            connector (Connector): The connector of the connection.
            type_connection_info (typing.Type[typing.Any]): The type of the connection info of the connection.
        """
        self.connections.setdefault(connection_info, []).append(connector_id)
        self.add_connector(connector_id, connector, type_connection_info)

    def get_connections(self, connection_info: ConnectionInfo) -> typing.List[typing.Tuple[Connector, typing.Type[typing.Any]]]:
        """
        Function to get the connections registered for a connection info.

        Args:
            connection_info (ConnectionInfo): The connection info of the connection.

        Returns:
            typing.List[typing.Tuple[Connector, typing.Type[typing.Any]]]: One (connector, type) pair per connector attached to the connection info.

        Raises:
            KeyError: If the connection info is not in the connection manager.
        """
        return [
            (self.get_connector(connector_id), self.connection_types[connector_id])
            for connector_id in self.connections[connection_info]
        ]

    def get_data_model_connection_info(self, data_model_name: str) -> typing.List[ConnectionInfo]:
        """
        Function to get the connection info of a data model.

        Args:
            data_model_name (str): The name of the data model.

        Returns:
            typing.List[ConnectionInfo]: The connection infos of the data model.
        """
        return [
            connection_info
            for connection_info in self.connections
            if connection_info.data_model_name == data_model_name
        ]

    def get_model_connection_info(self, model_id: str) -> typing.List[ConnectionInfo]:
        """
        Function to get the connection info of a model.

        Args:
            model_id (str): The id of the model.

        Returns:
            typing.List[ConnectionInfo]: The connection infos of the model.
        """
        return [
            connection_info
            for connection_info in self.connections
            if connection_info.model_id == model_id
        ]

    def get_field_connection_info(self, field_id: str) -> typing.List[ConnectionInfo]:
        """
        Function to get the connection info of a field.

        Args:
            field_id (str): The id of the field.

        Returns:
            typing.List[ConnectionInfo]: The connection infos of the field.
        """
        return [
            connection_info
            for connection_info in self.connections
            if connection_info.field_id == field_id
        ]

    def get_type_connection_info(self, type_name: str) -> typing.List[ConnectionInfo]:
        """
        Function to get the connection info of a type.

        Args:
            type_name (str): The name of the type.

        Returns:
            typing.List[ConnectionInfo]: The connection infos that have at least one connector registered with a type of that name. Each connection info is listed once.
        """
        # self.connections only stores connector ids, so the type has to be looked up
        # in self.connection_types; indexing into the id string itself was a bug.
        return [
            connection_info
            for connection_info, connector_ids in self.connections.items()
            if any(
                self.connection_types[connector_id].__name__ == type_name
                for connector_id in connector_ids
            )
        ]

add_connection(self, connector_id, connection_info, connector, type_connection_info)

Function to add a connection to the connection manager.

Parameters:

Name Type Description Default
connector_id str

The id of the connector.

required
connection_info ConnectionInfo

The connection info of the connection.

required
connector Connector

The connector of the connection.

required
type_connection_info Type[Any]

The type of the connection info of the connection.

required
Source code in aas_middleware\middleware\registries.py
def add_connection(self, connector_id: str, connection_info: ConnectionInfo, connector: Connector, type_connection_info: typing.Type[typing.Any]):
    """
    Attach a connector to a connection info and register the connector itself.

    The connector id is appended to the id list of the connection info (the list is
    created on first use); the connector and its type are then stored via ``add_connector``.

    Args:
        connector_id (str): The id of the connector.
        connection_info (ConnectionInfo): The connection info of the connection.
        connector (Connector): The connector of the connection.
        type_connection_info (typing.Type[typing.Any]): The type of the connection info of the connection.
    """
    self.connections.setdefault(connection_info, []).append(connector_id)
    self.add_connector(connector_id, connector, type_connection_info)

add_connector(self, connector_id, connector, connection_type)

Function to add a connector to the connection manager.

Parameters:

Name Type Description Default
connector_id str

The name of the connector.

required
connector Connector

The connector to be added.

required
connection_type Type[Connector]

The type of the connector.

required
Source code in aas_middleware\middleware\registries.py
def add_connector(self, connector_id: str, connector: Connector, connection_type: typing.Type[Connector]):
    """
    Store a connector and its type under the given id, overwriting an existing entry.

    Args:
        connector_id (str): The name of the connector.
        connector (Connector): The connector to be added.
        connection_type (typing.Type[Connector]): The type of the connector.
    """
    self.connectors.update({connector_id: connector})
    self.connection_types.update({connector_id: connection_type})

get_connections(self, connection_info)

Function to get a connection from the connection manager.

Parameters:

Name Type Description Default
connection_info ConnectionInfo

The connection info of the connection.

required

Returns:

Type Description
Connector

The connector of the connection.

Source code in aas_middleware\middleware\registries.py
def get_connections(self, connection_info: ConnectionInfo) -> typing.List[typing.Tuple[Connector, typing.Type[typing.Any]]]:
    """
    Return the connectors attached to a connection info together with their registered types.

    Args:
        connection_info (ConnectionInfo): The connection info of the connection.

    Returns:
        typing.List[typing.Tuple[Connector, typing.Type[typing.Any]]]: One (connector, type) pair per attached connector id.

    Raises:
        KeyError: If the connection info is not in the connection manager.
    """
    return [
        (self.get_connector(attached_id), self.connection_types[attached_id])
        for attached_id in self.connections[connection_info]
    ]

get_connector(self, connector_id)

Function to get a connector from the connection manager.

Parameters:

Name Type Description Default
connector_id str

The id of the connector.

required

Returns:

Type Description
Connector

The connector of the connection.

Exceptions:

Type Description
KeyError

If the connector id is not in the connection manager.

Source code in aas_middleware\middleware\registries.py
def get_connector(self, connector_id: str) -> Connector:
    """
    Look up a connector by its id.

    Args:
        connector_id (str): The id of the connector.

    Returns:
        Connector: The connector registered under the id.

    Raises:
        KeyError: If the connector id is not in the connection manager.
    """
    registered_connector = self.connectors[connector_id]
    return registered_connector

get_connector_id(self, connector)

Function to get a connector id from the connection manager.

Parameters:

Name Type Description Default
connector Connector

The connector of the connection.

required

Returns:

Type Description
str

The id of the connector.

Exceptions:

Type Description
KeyError

If the connector is not in the connection manager.

Source code in aas_middleware\middleware\registries.py
def get_connector_id(self, connector: Connector) -> str:
    """
    Find the id under which a connector is registered.

    Args:
        connector (Connector): The connector of the connection.

    Returns:
        str: The id of the first registered connector that compares equal to the given one.

    Raises:
        KeyError: If the connector is not in the connection manager.
    """
    not_found = object()
    matching_id = next(
        (
            candidate_id
            for candidate_id, candidate in self.connectors.items()
            if connector == candidate
        ),
        not_found,
    )
    if matching_id is not_found:
        raise KeyError(f"Connector {connector} is not in the connection manager.")
    return matching_id

get_data_model_connection_info(self, data_model_name)

Function to get the connection info of a data model.

Parameters:

Name Type Description Default
data_model_name str

The name of the data model.

required

Returns:

Type Description
Set[ConnectionInfo]

The connection info of the data model.

Source code in aas_middleware\middleware\registries.py
def get_data_model_connection_info(self, data_model_name: str) -> typing.List[ConnectionInfo]:
    """
    Collect all registered connection infos that belong to a data model.

    Args:
        data_model_name (str): The name of the data model.

    Returns:
        typing.List[ConnectionInfo]: The connection infos whose data model name matches, in registration order.
    """
    return [
        registered_info
        for registered_info in self.connections
        if registered_info.data_model_name == data_model_name
    ]

get_field_connection_info(self, field_id)

Function to get the connection info of a field.

Parameters:

Name Type Description Default
field_id str

The id of the field.

required

Returns:

Type Description
Set[ConnectionInfo]

The connection info of the field.

Source code in aas_middleware\middleware\registries.py
def get_field_connection_info(self, field_id: str) -> typing.List[ConnectionInfo]:
    """
    Collect all registered connection infos that address a field.

    Args:
        field_id (str): The id of the field.

    Returns:
        typing.List[ConnectionInfo]: The connection infos whose field id matches, in registration order.
    """
    return [
        registered_info
        for registered_info in self.connections
        if registered_info.field_id == field_id
    ]

get_model_connection_info(self, model_id)

Function to get the connection info of a model.

Parameters:

Name Type Description Default
model_id str

The id of the model.

required

Returns:

Type Description
Set[ConnectionInfo]

The connection info of the model.

Source code in aas_middleware\middleware\registries.py
def get_model_connection_info(self, model_id: str) -> typing.List[ConnectionInfo]:
    """
    Collect all registered connection infos that address a model.

    Args:
        model_id (str): The id of the model.

    Returns:
        typing.List[ConnectionInfo]: The connection infos whose model id matches, in registration order.
    """
    return [
        registered_info
        for registered_info in self.connections
        if registered_info.model_id == model_id
    ]

get_type_connection_info(self, type_name)

Function to get the connection info of a type.

Parameters:

Name Type Description Default
type_name str

The name of the type.

required

Returns:

Type Description
Set[ConnectionInfo]

The connection info of the type.

Source code in aas_middleware\middleware\registries.py
def get_type_connection_info(self, type_name: str) -> typing.List[ConnectionInfo]:
    """
    Function to get the connection info of a type.

    Args:
        type_name (str): The name of the type.

    Returns:
        typing.List[ConnectionInfo]: The connection infos that have at least one connector registered with a type of that name. Each connection info is listed once.
    """
    # self.connections only stores connector ids, so the type has to be looked up
    # in self.connection_types; indexing into the id string itself was a bug.
    return [
        connection_info
        for connection_info, connector_ids in self.connections.items()
        if any(
            self.connection_types[connector_id].__name__ == type_name
            for connector_id in connector_ids
        )
    ]

MapperRegistry

Class that manages the mappers of the middleware.

Source code in aas_middleware\middleware\registries.py
class MapperRegistry:
    """
    Class that manages the mappers of the middleware.

    Besides the mappers themselves, the input and output types taken from the type
    hints of each mapper's ``map`` method are stored, as well as the
    (input connection, output connection) pairs a mapper is attached to.
    """

    def __init__(self):
        self.mappers: typing.Dict[str, Mapper] = {}
        self.mapper_input_types: typing.Dict[str, typing.Type] = {}
        self.mapper_output_types: typing.Dict[str, typing.Type] = {}

        self.connections: typing.Dict[typing.Tuple[ConnectionInfo, ConnectionInfo], str] = {}

    def add_mapper(self, mapper_id: str, mapper: Mapper, input_connection: typing.Optional[ConnectionInfo] = None, output_connection: typing.Optional[ConnectionInfo] = None):
        """
        Function to add a mapper to the registry.

        Args:
            mapper_id (str): The name of the mapper.
            mapper (Mapper): The mapper to be added. Its ``map`` method must annotate a ``data`` parameter and a return type.
            input_connection (typing.Optional[ConnectionInfo]): The input connection of the mapper.
            output_connection (typing.Optional[ConnectionInfo]): The output connection of the mapper.

        Raises:
            KeyError: If ``mapper.map`` lacks a type hint for ``data`` or for the return value.
            ValueError: If only one of input and output connection is provided.
        """
        # Resolve and validate everything first so that a failing call does not
        # leave a half-registered mapper behind.
        type_hints = typing.get_type_hints(mapper.map)
        input_type = type_hints["data"]
        output_type = type_hints["return"]
        has_both_connections = bool(input_connection and output_connection)
        if not has_both_connections and (input_connection or output_connection):
            raise ValueError("Either None or both input and output connection must be provided.")

        self.mappers[mapper_id] = mapper
        self.mapper_input_types[mapper_id] = input_type
        self.mapper_output_types[mapper_id] = output_type
        if has_both_connections:
            self.connections[(input_connection, output_connection)] = mapper_id

    def get_mappers(self) -> typing.List[Mapper]:
        """
        Function to get the mappers in the registry.

        Returns:
            typing.List[Mapper]: The mappers in the registry
        """
        return list(self.mappers.values())

    def get_mapper(self, mapper_id: str) -> Mapper:
        """
        Function to get a mapper from the registry.

        Args:
            mapper_id (str): The id of the mapper.

        Returns:
            Mapper: The mapper from the registry.

        Raises:
            KeyError: If the mapper is not in the registry.
        """
        return self.mappers[mapper_id]

    def get_mapper_ids(self) -> typing.List[str]:
        """
        Function to get the names of the mappers in the registry.

        Returns:
            typing.List[str]: The names of the mappers in the registry.
        """
        return list(self.mappers.keys())

    def get_mapper_connections(self) -> typing.List[typing.Tuple[ConnectionInfo, ConnectionInfo]]:
        """
        Function to get the connections of the mappers in the registry.

        Returns:
            typing.List[typing.Tuple[ConnectionInfo, ConnectionInfo]]: The connections of the mappers in the registry.
        """
        return list(self.connections.keys())

    def get_mapper_by_input_connection(self, input_connection: ConnectionInfo) -> typing.Optional[Mapper]:
        """
        Function to get a mapper by the input connection.

        Args:
            input_connection (ConnectionInfo): The input connection of the mapper.

        Returns:
            typing.Optional[Mapper]: The first mapper attached to the input connection, or None if there is none.
        """
        for (registered_input, _), mapper_id in self.connections.items():
            if registered_input == input_connection:
                return self.mappers[mapper_id]
        return None

    def get_mapper_by_output_connection(self, output_connection: ConnectionInfo) -> typing.Optional[Mapper]:
        """
        Function to get a mapper by the output connection.

        Args:
            output_connection (ConnectionInfo): The output connection of the mapper.

        Returns:
            typing.Optional[Mapper]: The first mapper attached to the output connection, or None if there is none.
        """
        for (_, registered_output), mapper_id in self.connections.items():
            if registered_output == output_connection:
                return self.mappers[mapper_id]
        return None

    def get_connection_of_mapper(self, mapper_id: str) -> typing.List[typing.Tuple[ConnectionInfo, ConnectionInfo]]:
        """
        Function to get the connections of a mapper.

        Args:
            mapper_id (str): The id of the mapper.

        Returns:
            typing.List[typing.Tuple[ConnectionInfo, ConnectionInfo]]: The connections of the mapper; empty if none are registered for the id.
        """
        # self.connections maps to mapper ids, so the ids are compared directly;
        # comparing an id with the Mapper object never matched.
        return [
            connection
            for connection, registered_id in self.connections.items()
            if registered_id == mapper_id
        ]

add_mapper(self, mapper_id, mapper, input_connection=None, output_connection=None)

Function to add a mapper to the registry.

Parameters:

Name Type Description Default
mapper_id str

The name of the mapper.

required
mapper Mapper

The mapper to be added.

required
input_connection Optional[ConnectionInfo]

The input connection of the mapper.

None
output_connection Optional[ConnectionInfo]

The output connection of the mapper.

None
Source code in aas_middleware\middleware\registries.py
def add_mapper(self, mapper_id: str, mapper: Mapper, input_connection: typing.Optional[ConnectionInfo] = None, output_connection: typing.Optional[ConnectionInfo] = None):
    """
    Function to add a mapper to the registry.

    The arguments are validated and the type hints of ``mapper.map`` are resolved
    before anything is stored, so a failing call leaves the registry unchanged.

    Args:
        mapper_id (str): The id under which the mapper is registered.
        mapper (Mapper): The mapper to be added.
        input_connection (typing.Optional[ConnectionInfo]): The input connection of the mapper.
        output_connection (typing.Optional[ConnectionInfo]): The output connection of the mapper.

    Raises:
        ValueError: If only one of input and output connection is provided.
        KeyError: If ``mapper.map`` has no type hint for ``data`` or for its return value.
    """
    has_input = bool(input_connection)
    has_output = bool(output_connection)
    if has_input != has_output:
        raise ValueError("Either None or both input and output connection must be provided.")

    # Resolve the hints once; both the input and the output type come from the same lookup.
    map_type_hints = typing.get_type_hints(mapper.map)
    input_type = map_type_hints["data"]
    output_type = map_type_hints["return"]

    self.mappers[mapper_id] = mapper
    self.mapper_input_types[mapper_id] = input_type
    self.mapper_output_types[mapper_id] = output_type
    if has_input:
        self.connections[(input_connection, output_connection)] = mapper_id

get_connection_of_mapper(self, mapper_id)

Function to get the connections of a mapper.

Parameters:

Name Type Description Default
mapper_id str

The id of the mapper.

required

Returns:

Type Description
List[Tuple[ConnectionInfo, ConnectionInfo]]

The connections of the mapper.

Source code in aas_middleware\middleware\registries.py
def get_connection_of_mapper(self, mapper_id: str) -> typing.List[typing.Tuple[ConnectionInfo, ConnectionInfo]]:
    """
    Function to get the connections of a mapper.

    ``self.connections`` maps a connection pair to the id of the mapper that
    serves it, so the pairs are selected by comparing the stored id with
    ``mapper_id``.

    Args:
        mapper_id (str): The id of the mapper.

    Returns:
        typing.List[typing.Tuple[ConnectionInfo, ConnectionInfo]]: The connections of the mapper.
            The list is empty if no connection is registered for the id.
    """
    # The values of self.connections are mapper ids, not mapper objects; the
    # previous comparison against self.mappers[mapper_id] could never match.
    return [
        connection
        for connection, registered_mapper_id in self.connections.items()
        if registered_mapper_id == mapper_id
    ]

get_mapper(self, mapper_id)

Function to get a mapper from the registry.

Parameters:

Name Type Description Default
mapper_id str

The id of the mapper.

required

Returns:

Type Description
Mapper

The mapper from the registry.

Exceptions:

Type Description
KeyError

If the mapper is not in the registry.

Source code in aas_middleware\middleware\registries.py
def get_mapper(self, mapper_id: str) -> Mapper:
    """
    Return the mapper stored in the registry under the given id.

    Args:
        mapper_id (str): The id of the mapper.

    Returns:
        Mapper: The mapper registered under ``mapper_id``.

    Raises:
        KeyError: If no mapper is registered under ``mapper_id``.
    """
    registered_mappers = self.mappers
    # Plain subscript access on purpose: an unknown id surfaces as KeyError.
    return registered_mappers[mapper_id]

get_mapper_by_input_connection(self, input_connection)

Function to get a mapper by the input connection.

Parameters:

Name Type Description Default
input_connection ConnectionInfo

The input connection of the mapper.

required

Returns:

Type Description
Optional[Mapper]

The mapper of the input connection.

Source code in aas_middleware\middleware\registries.py
def get_mapper_by_input_connection(self, input_connection: ConnectionInfo) -> typing.Optional[Mapper]:
    """
    Look up the mapper whose registered connection starts at the given input connection.

    The first connection pair (in iteration order of ``self.connections``) whose
    first element equals ``input_connection`` decides the result.

    Args:
        input_connection (ConnectionInfo): The input connection to search for.

    Returns:
        typing.Optional[Mapper]: The mapper of the first matching connection, or None if no pair matches.

    Raises:
        KeyError: If the matching connection refers to a mapper id that is not in ``self.mappers``.
    """
    for connection_pair, registered_mapper_id in self.connections.items():
        if not connection_pair[0] == input_connection:
            continue
        return self.mappers[registered_mapper_id]
    return None

get_mapper_by_output_connection(self, output_connection)

Function to get a mapper by the output connection.

Parameters:

Name Type Description Default
output_connection ConnectionInfo

The output connection of the mapper.

required

Returns:

Type Description
Optional[Mapper]

The mapper of the output connection.

Source code in aas_middleware\middleware\registries.py
def get_mapper_by_output_connection(self, output_connection: ConnectionInfo) -> typing.Optional[Mapper]:
    """
    Look up the mapper whose registered connection ends in the given output connection.

    The first connection pair (in iteration order of ``self.connections``) whose
    second element equals ``output_connection`` decides the result.

    Args:
        output_connection (ConnectionInfo): The output connection to search for.

    Returns:
        typing.Optional[Mapper]: The mapper of the first matching connection, or None if no pair matches.

    Raises:
        KeyError: If the matching connection refers to a mapper id that is not in ``self.mappers``.
    """
    for connection_pair in self.connections:
        # Index 1 of a stored pair is compared against the requested output connection.
        if connection_pair[1] == output_connection:
            return self.mappers[self.connections[connection_pair]]
    return None

get_mapper_connections(self)

Function to get the connections of the mappers in the registry.

Returns:

Type Description
List[Tuple[ConnectionInfo, ConnectionInfo]]

The connections of the mappers in the registry.

Source code in aas_middleware\middleware\registries.py
def get_mapper_connections(self) -> typing.List[typing.Tuple[ConnectionInfo, ConnectionInfo]]:
    """
    Return all connection pairs that currently have a mapper registered.

    Returns:
        typing.List[typing.Tuple[ConnectionInfo, ConnectionInfo]]: A new list with the keys of ``self.connections``.
    """
    return [connection_pair for connection_pair in self.connections]

get_mapper_ids(self)

Function to get the names of the mappers in the registry.

Returns:

Type Description
List[str]

The names of the mappers in the registry.

Source code in aas_middleware\middleware\registries.py
def get_mapper_ids(self) -> typing.List[str]:
    """
    Return the ids under which mappers are registered.

    Returns:
        typing.List[str]: A new list with the keys of ``self.mappers``.
    """
    return [registered_id for registered_id in self.mappers]

get_mappers(self)

Function to get the mappers in the registry.

Returns:

Type Description
List[Mapper]

The mappers in the registry.

Source code in aas_middleware\middleware\registries.py
def get_mappers(self) -> typing.List[Mapper]:
    """
    Return all mappers held by the registry.

    Returns:
        typing.List[Mapper]: A new list with the values of ``self.mappers``.
    """
    return [*self.mappers.values()]

PersistenceConnectionRegistry (ConnectionRegistry)

Class that manages the connections of the middleware.

Source code in aas_middleware\middleware\registries.py
class PersistenceConnectionRegistry(ConnectionRegistry):
    """
    Class that manages the persistence connections of the middleware.

    It keeps a mapping from connection infos to connector ids and the
    persistence factories that were registered per connection info.
    """

    def __init__(self):
        super().__init__()
        self.connectors: typing.Dict[str, Connector] = {}
        self.connection_types: typing.Dict[str, typing.Type[Connector]] = {}
        # Maps a connection info to the id of the connector created for it.
        self.connections: typing.Dict[ConnectionInfo, str] = {}
        # Entries are (model type, persistence factory) tuples, in the order in
        # which add_persistence_factory appended them.
        self.persistence_factories: typing.Dict[ConnectionInfo, typing.List[typing.Tuple[typing.Type[typing.Any], PersistenceFactory]]] = {}


    def get_connector_by_data_model_and_model_id(self, data_model_name: str, model_id: str) -> Connector:
        """
        Function to get a connector from the connection manager.

        The connector id is built the same way as in add_connection.

        Args:
            data_model_name (str): The name of the data model.
            model_id (str): The id of the model.

        Returns:
            Connector: The connector of the connection.

        Raises:
            KeyError: If the model id is not in the connection manager.
        """
        connector_id = data_model_name + model_id + "_persistence"
        return self.get_connector(connector_id)

    def add_persistence_factory(self, connection_info: ConnectionInfo, model_type: typing.Type[typing.Any], persistence_factory: PersistenceFactory):
        """
        Function to add a persistence factory to the connection manager.

        Args:
            connection_info (ConnectionInfo): The connection info of the connection.
            model_type (typing.Type[typing.Any]): The model type the factory is registered for.
            persistence_factory (PersistenceFactory): The persistence factory of the connection.
        """
        if connection_info not in self.persistence_factories:
            self.persistence_factories[connection_info] = []
        self.persistence_factories[connection_info].append((model_type, persistence_factory))

    def get_default_persistence_factory(self, connection_info: ConnectionInfo, persisted_model_type: typing.Type[typing.Any]) -> PersistenceFactory:
        """
        Function to get the default persistence factory of a connection.

        Factories are looked up under a connection info that only carries the
        data model name. The first registered factory whose model type is a
        superclass of ``persisted_model_type`` is returned; otherwise a warning
        is logged and a ``PersistenceFactory(ModelConnector)`` is returned.

        Args:
            connection_info (ConnectionInfo): The connection info of the connection.
            persisted_model_type (typing.Type[typing.Any]): The type of the model to be persisted.

        Returns:
            PersistenceFactory: The default persistence factory of the connection.
        """
        data_model_connection_info = ConnectionInfo(data_model_name=connection_info.data_model_name)
        if data_model_connection_info not in self.persistence_factories:
            logger.warning(f"No persistence factory found for {data_model_connection_info}. Using default persistence factory.")
            return PersistenceFactory(ModelConnector)
        for model_type, persistence_factory in self.persistence_factories[data_model_connection_info]:
            if issubclass(persisted_model_type, model_type):
                return persistence_factory
        logger.warning(f"No persistence factory found for {data_model_connection_info} and model type {persisted_model_type.__name__}. Using default persistence factory.")
        return PersistenceFactory(ModelConnector)

    def add_to_persistence(self, connection_info: ConnectionInfo, model: Identifiable, persistence_factory: typing.Optional[PersistenceFactory]):
        """
        Function to add a persistent connection to the connection manager.

        Args:
            connection_info (ConnectionInfo): The connection info of the connection.
            model (Identifiable): The model for which a connector is created.
            persistence_factory (typing.Optional[PersistenceFactory]): The factory used to create the connector.
                If it is falsy, the default persistence factory for the type of the model is used.
        """
        if not persistence_factory:
            persistence_factory = self.get_default_persistence_factory(connection_info, type(model))
        connector = persistence_factory.create(model)
        self.add_connection(connection_info, connector, type(model))

    def add_connection(self, connection_info: ConnectionInfo, connector: Connector, type_connection_info: typing.Type[typing.Any]):
        """
        Function to add a connection to the connection manager.

        Args:
            connection_info (ConnectionInfo): The connection info of the connection.
            connector (Connector): The connector of the connection.
            type_connection_info (typing.Type[typing.Any]): The type passed on to add_connector together with the connector.
        """
        connector_id = connection_info.data_model_name + connection_info.model_id + "_persistence"
        self.add_connector(connector_id, connector, type_connection_info)
        self.connections[connection_info] = connector_id


    def remove_connection(self, connection_info: ConnectionInfo):
        """
        Function to remove a connection from the connection manager.

        Only the entry in ``self.connections`` is removed.

        Args:
            connection_info (ConnectionInfo): The connection info of the connection.

        Raises:
            KeyError: If the connection info is not in the connection manager.
        """
        del self.connections[connection_info]
        # TODO: also delete connector and connection type

    def get_connection(self, connection_info: ConnectionInfo) -> Connector:
        """
        Function to get a connection from the connection manager.

        Args:
            connection_info (ConnectionInfo): The connection info of the connection.

        Returns:
            Connector: The connector of the connection.

        Raises:
            KeyError: If the connection info is not in the connection manager.
        """
        if connection_info in self.connections:
            return self.get_connector(self.connections[connection_info])
        raise KeyError(f"Data model Connection info {connection_info} is not in the connection manager.")

    def get_type_connection_info(self, type_name: str) -> typing.List[ConnectionInfo]:
        """
        Function to get the connection info of a type.

        Args:
            type_name (str): The name of the type.

        Returns:
            typing.List[ConnectionInfo]: The connection infos whose registered type has the given name.
        """
        connection_infos = []
        for connection_info, connector_id in self.connections.items():
            model_type = self.connection_types[connector_id]
            if model_type.__name__ != type_name:
                continue
            connection_infos.append(connection_info)
        return connection_infos

add_connection(self, connection_info, connector, type_connection_info)

Function to add a connection to the connection manager.

Parameters:

Name Type Description Default
connection_info ConnectionInfo

The connection info of the connection.

required
connector Connector

The connector of the connection.

required
Source code in aas_middleware\middleware\registries.py
def add_connection(self, connection_info: ConnectionInfo, connector: Connector, type_connection_info: typing.Type[typing.Any]):
    """
    Register a connector for a connection info.

    The connector id is the data model name followed by the model id and the
    suffix ``_persistence``. The connector is handed to ``add_connector`` under
    that id, then the id is stored in ``self.connections``.

    Args:
        connection_info (ConnectionInfo): The connection info of the connection.
        connector (Connector): The connector of the connection.
        type_connection_info (typing.Type[typing.Any]): The type passed on to ``add_connector``.
    """
    data_model_name = connection_info.data_model_name
    model_id = connection_info.model_id
    persistence_id = data_model_name + model_id + "_persistence"
    self.add_connector(persistence_id, connector, type_connection_info)
    self.connections[connection_info] = persistence_id

add_persistence_factory(self, connection_info, model_type, persistence_factory)

Function to add a persistence factory to the connection manager.

Parameters:

Name Type Description Default
connection_info ConnectionInfo

The connection info of the connection.

required
persistence_factory PersistenceFactory

The persistence factory of the connection.

required
Source code in aas_middleware\middleware\registries.py
def add_persistence_factory(self, connection_info: ConnectionInfo, model_type: typing.Type[typing.Any], persistence_factory: PersistenceFactory):
    """
    Register a persistence factory for a model type under a connection info.

    Factories of the same connection info are collected in a list, in the
    order in which they were added.

    Args:
        connection_info (ConnectionInfo): The connection info of the connection.
        model_type (typing.Type[typing.Any]): The model type the factory is registered for.
        persistence_factory (PersistenceFactory): The persistence factory of the connection.
    """
    registered = self.persistence_factories.setdefault(connection_info, [])
    # Stored as (model type, factory).
    registered.append((model_type, persistence_factory))

add_to_persistence(self, connection_info, model, persistence_factory)

Function to add a persistent connection to the connection manager.

Parameters:

Name Type Description Default
connection_info ConnectionInfo

The connection info of the connection.

required
Source code in aas_middleware\middleware\registries.py
def add_to_persistence(self, connection_info: ConnectionInfo, model: Identifiable, persistence_factory: typing.Optional[PersistenceFactory]):
    """
    Create a connector for a model and register it as a connection.

    Args:
        connection_info (ConnectionInfo): The connection info of the connection.
        model (Identifiable): The model for which a connector is created.
        persistence_factory (typing.Optional[PersistenceFactory]): The factory used to create the connector.
            If it is falsy, the default persistence factory for the type of the model is used.
    """
    factory = persistence_factory
    if not factory:
        # No usable factory given: fall back to the one registered for the model's type.
        factory = self.get_default_persistence_factory(connection_info, type(model))
    created_connector = factory.create(model)
    self.add_connection(connection_info, created_connector, type(model))

get_connection(self, connection_info)

Function to get a connection from the connection manager.

Parameters:

Name Type Description Default
connection_info ConnectionInfo

The connection info of the connection.

required

Returns:

Type Description
Connector

The connector of the connection.

Exceptions:

Type Description
KeyError

If the connection info is not in the connection manager.

Source code in aas_middleware\middleware\registries.py
def get_connection(self, connection_info: ConnectionInfo) -> Connector:
    """
    Return the connector registered for a connection info.

    Args:
        connection_info (ConnectionInfo): The connection info of the connection.

    Returns:
        Connector: The connector returned by ``get_connector`` for the stored connector id.

    Raises:
        KeyError: If the connection info is not in the connection manager.
    """
    if connection_info not in self.connections:
        raise KeyError(f"Data model Connection info {connection_info} is not in the connection manager.")
    connector_id = self.connections[connection_info]
    return self.get_connector(connector_id)

get_connector_by_data_model_and_model_id(self, data_model_name, model_id)

Function to get a connector from the connection manager.

Parameters:

Name Type Description Default
data_model_name str

The name of the data model.

required
model_id str

The id of the model.

required

Returns:

Type Description
Connector

The connector of the connection.

Exceptions:

Type Description
KeyError

If the model id is not in the connection manager.

Source code in aas_middleware\middleware\registries.py
def get_connector_by_data_model_and_model_id(self, data_model_name: str, model_id: str) -> Connector:
    """
    Return the persistence connector of a model in a data model.

    The connector id is the data model name followed by the model id and the
    suffix ``_persistence``; the lookup itself is done by ``get_connector``.

    Args:
        data_model_name (str): The name of the data model.
        model_id (str): The id of the model.

    Returns:
        Connector: The connector of the connection.

    Raises:
        KeyError: If the model id is not in the connection manager.
    """
    id_suffix = "_persistence"
    return self.get_connector(data_model_name + model_id + id_suffix)

get_default_persistence_factory(self, connection_info, persisted_model_type)

Function to get the default persistence factory of a connection.

Parameters:

Name Type Description Default
connection_info ConnectionInfo

The connection info of the connection.

required

Returns:

Type Description
PersistenceFactory

The default persistence factory of the connection.

Source code in aas_middleware\middleware\registries.py
def get_default_persistence_factory(self, connection_info: ConnectionInfo, persisted_model_type: typing.Type[typing.Any]) -> PersistenceFactory:
    """
    Select the persistence factory to use for a model type of a data model.

    Factories are looked up under a connection info that only carries the data
    model name. The first registered factory whose model type is a superclass
    of ``persisted_model_type`` is returned. If nothing fits, a warning is
    logged and a ``PersistenceFactory(ModelConnector)`` is returned.

    Args:
        connection_info (ConnectionInfo): The connection info of the connection.
        persisted_model_type (typing.Type[typing.Any]): The type of the model to be persisted.

    Returns:
        PersistenceFactory: The default persistence factory of the connection.
    """
    data_model_connection_info = ConnectionInfo(data_model_name=connection_info.data_model_name)
    if data_model_connection_info in self.persistence_factories:
        for registered_type, registered_factory in self.persistence_factories[data_model_connection_info]:
            if issubclass(persisted_model_type, registered_type):
                return registered_factory
        # Factories exist for the data model, but none covers this model type.
        logger.warning(f"No persistence factory found for {data_model_connection_info} and model type {persisted_model_type.__name__}. Using default persistence factory.")
    else:
        logger.warning(f"No persistence factory found for {data_model_connection_info}. Using default persistence factory.")
    return PersistenceFactory(ModelConnector)

get_type_connection_info(self, type_name)

Function to get the connection info of a type.

Parameters:

Name Type Description Default
type_name str

The name of the type.

required

Returns:

Type Description
List[ConnectionInfo]

The connection info of the type.

Source code in aas_middleware\middleware\registries.py
def get_type_connection_info(self, type_name: str) -> typing.List[ConnectionInfo]:
    """
    Collect the connection infos whose registered type has the given name.

    For every entry of ``self.connections`` the type stored in
    ``self.connection_types`` under the connector id is compared by its
    ``__name__``.

    Args:
        type_name (str): The name of the type.

    Returns:
        typing.List[ConnectionInfo]: The connection infos of the type.
    """
    return [
        connection_info
        for connection_info, connector_id in self.connections.items()
        if self.connection_types[connector_id].__name__ == type_name
    ]

remove_connection(self, connection_info)

Function to remove a connection from the connection manager.

Parameters:

Name Type Description Default
connection_info ConnectionInfo

The connection info of the connection.

required
Source code in aas_middleware\middleware\registries.py
def remove_connection(self, connection_info: ConnectionInfo):
    """
    Drop the entry of a connection info from ``self.connections``.

    Args:
        connection_info (ConnectionInfo): The connection info of the connection.

    Raises:
        KeyError: If the connection info is not in ``self.connections``.
    """
    # pop() without a default raises KeyError for a missing key.
    self.connections.pop(connection_info)
    # TODO: also delete connector and connection type

WorkflowRegistry

Class that manages the workflows of the registry.

Source code in aas_middleware\middleware\registries.py
class WorkflowRegistry:
    """
    Registry of workflows and of the providers and consumers attached to them.
    """

    def __init__(self):
        # Workflows keyed by the name returned by their get_name().
        self.workflows: typing.Dict[str, Workflow] = {}

        # Per workflow name: the (connection info, provider / consumer) pairs added so far.
        self.workflow_providers: typing.Dict[str, typing.List[typing.Tuple[ConnectionInfo, Provider]]] = {}
        self.workflow_consumers: typing.Dict[str, typing.List[typing.Tuple[ConnectionInfo, Consumer]]] = {}

    def add_workflow(self, workflow: Workflow):
        """
        Store a workflow under the name it reports via ``get_name()``.

        Args:
            workflow (Workflow): The workflow to be added.
        """
        workflow_name = workflow.get_name()
        self.workflows[workflow_name] = workflow

    def add_provider_to_workflow(self, workflow_name: str, connection_info: ConnectionInfo, provider: Provider):
        """
        Attach a provider to a workflow.

        Args:
            workflow_name (str): The name of the workflow.
            connection_info (ConnectionInfo): The connection info of the provider.
            provider (Provider): The provider to be added.
        """
        providers = self.workflow_providers.setdefault(workflow_name, [])
        providers.append((connection_info, provider))

    def add_consumer_to_workflow(self, workflow_name: str, connection_info: ConnectionInfo, connector: Connector):
        """
        Attach a consumer to a workflow.

        Args:
            workflow_name (str): The name of the workflow.
            connection_info (ConnectionInfo): The connection info of the consumer.
            connector (Connector): The connector to be added.
        """
        consumers = self.workflow_consumers.setdefault(workflow_name, [])
        consumers.append((connection_info, connector))

    def get_workflows(self) -> typing.List[Workflow]:
        """
        Return all registered workflows.

        Returns:
            typing.List[Workflow]: The workflows in the registry.
        """
        return [*self.workflows.values()]

    def get_workflow(self, workflow_name: str) -> Workflow:
        """
        Return the workflow registered under a name.

        Args:
            workflow_name (str): The name of the workflow.

        Returns:
            Workflow: The workflow from the registry.

        Raises:
            KeyError: If the workflow is not in the registry.
        """
        registered_workflows = self.workflows
        return registered_workflows[workflow_name]

    def get_connections_of_workflow(self, workflow_name: str) -> typing.Tuple[typing.List[typing.Tuple[ConnectionInfo, Provider]], typing.List[typing.Tuple[ConnectionInfo, Consumer]]]:
        """
        Return the providers and the consumers attached to a workflow.

        Args:
            workflow_name (str): The name of the workflow.

        Returns:
            typing.Tuple[typing.List[typing.Tuple[ConnectionInfo, Provider]], typing.List[typing.Tuple[ConnectionInfo, Consumer]]]:
                The provider pairs followed by the consumer pairs of the workflow.

        Raises:
            KeyError: If no provider or no consumer was added for the workflow name.
        """
        providers = self.workflow_providers[workflow_name]
        consumers = self.workflow_consumers[workflow_name]
        return providers, consumers


    def get_providers(self, workflow_name: str) -> typing.List[typing.Tuple[ConnectionInfo, Provider]]:
        """
        Return the providers attached to a workflow.

        Args:
            workflow_name (str): The name of the workflow.

        Returns:
            typing.List[typing.Tuple[ConnectionInfo, Provider]]: The providers of the workflow.

        Raises:
            KeyError: If no provider was added for the workflow name.
        """
        providers = self.workflow_providers[workflow_name]
        return providers

    def get_consumers(self, workflow_name: str) -> typing.List[typing.Tuple[ConnectionInfo, Consumer]]:
        """
        Return the consumers attached to a workflow.

        Args:
            workflow_name (str): The name of the workflow.

        Returns:
            typing.List[typing.Tuple[ConnectionInfo, Consumer]]: The consumers of the workflow.

        Raises:
            KeyError: If no consumer was added for the workflow name.
        """
        consumers = self.workflow_consumers[workflow_name]
        return consumers

    def get_workflow_names(self) -> typing.List[str]:
        """
        Return the names under which workflows are registered.

        Returns:
            typing.List[str]: The names of the workflows in the registry.
        """
        return [workflow_name for workflow_name in self.workflows]

    def get_workflow_descriptions(self) -> typing.List[WorkflowDescription]:
        """
        Return the description of every registered workflow.

        Returns:
            typing.List[WorkflowDescription]: The results of ``get_description()`` of the workflows in the registry.
        """
        descriptions = []
        for registered_workflow in self.workflows.values():
            descriptions.append(registered_workflow.get_description())
        return descriptions

add_consumer_to_workflow(self, workflow_name, connection_info, connector)

Function to add a consumer to a workflow.

Parameters:

Name Type Description Default
workflow_name str

The name of the workflow.

required
connection_info ConnectionInfo

The connection info of the consumer.

required
connector Connector

The connector to be added.

required
Source code in aas_middleware\middleware\registries.py
def add_consumer_to_workflow(self, workflow_name: str, connection_info: ConnectionInfo, connector: Connector):
    """
    Attach a consumer to a workflow.

    The pair is appended to the list kept for the workflow name in
    ``self.workflow_consumers``; the list is created on first use.

    Args:
        workflow_name (str): The name of the workflow.
        connection_info (ConnectionInfo): The connection info of the consumer.
        connector (Connector): The connector to be added.
    """
    consumers = self.workflow_consumers.setdefault(workflow_name, [])
    consumers.append((connection_info, connector))

add_provider_to_workflow(self, workflow_name, connection_info, provider)

Function to add a provider to a workflow.

Parameters:

Name Type Description Default
workflow_name str

The name of the workflow.

required
connection_info ConnectionInfo

The connection info of the provider.

required
provider Provider

The provider to be added.

required
Source code in aas_middleware\middleware\registries.py
def add_provider_to_workflow(self, workflow_name: str, connection_info: ConnectionInfo, provider: Provider):
    """
    Attach a provider to a workflow.

    The pair is appended to the list kept for the workflow name in
    ``self.workflow_providers``; the list is created on first use.

    Args:
        workflow_name (str): The name of the workflow.
        connection_info (ConnectionInfo): The connection info of the provider.
        provider (Provider): The provider to be added.
    """
    providers = self.workflow_providers.setdefault(workflow_name, [])
    providers.append((connection_info, provider))

add_workflow(self, workflow)

Function to add a workflow to the registry.

Parameters:

Name Type Description Default
workflow Workflow

The workflow to be added.

required
Source code in aas_middleware\middleware\registries.py
def add_workflow(self, workflow: Workflow):
    """
    Store a workflow under the name it reports via ``get_name()``.

    A workflow already stored under the same name is replaced.

    Args:
        workflow (Workflow): The workflow to be added.
    """
    workflow_name = workflow.get_name()
    self.workflows[workflow_name] = workflow

get_connections_of_workflow(self, workflow_name)

Function to get the connections of a workflow.

Parameters:

Name Type Description Default
workflow_name str

The name of the workflow.

required

Returns:

Type Description
Tuple[List[Tuple[ConnectionInfo, Provider]], List[Tuple[ConnectionInfo, Consumer]]]

The connections of the workflow.

Source code in aas_middleware\middleware\registries.py
def get_connections_of_workflow(self, workflow_name: str) -> typing.Tuple[typing.List[typing.Tuple[ConnectionInfo, Provider]], typing.List[typing.Tuple[ConnectionInfo, Consumer]]]:
    """
    Return the providers and the consumers attached to a workflow.

    Args:
        workflow_name (str): The name of the workflow.

    Returns:
        typing.Tuple[typing.List[typing.Tuple[ConnectionInfo, Provider]], typing.List[typing.Tuple[ConnectionInfo, Consumer]]]:
            The provider pairs followed by the consumer pairs of the workflow.

    Raises:
        KeyError: If the workflow name has no entry in the providers or in the consumers.
    """
    providers = self.workflow_providers[workflow_name]
    consumers = self.workflow_consumers[workflow_name]
    return providers, consumers

get_consumers(self, workflow_name)

Function to get the consumers of a workflow.

Parameters:

Name Type Description Default
workflow_name str

The name of the workflow.

required

Returns:

Type Description
List[Tuple[ConnectionInfo, Consumer]]

The consumers of the workflow.

Source code in aas_middleware\middleware\registries.py
def get_consumers(self, workflow_name: str) -> typing.List[typing.Tuple[ConnectionInfo, Consumer]]:
    """
    Return the consumers attached to a workflow.

    Args:
        workflow_name (str): The name of the workflow.

    Returns:
        typing.List[typing.Tuple[ConnectionInfo, Consumer]]: The consumers of the workflow.

    Raises:
        KeyError: If the workflow name has no entry in ``self.workflow_consumers``.
    """
    consumers = self.workflow_consumers[workflow_name]
    return consumers

get_providers(self, workflow_name)

Function to get the providers of a workflow.

Parameters:

Name Type Description Default
workflow_name str

The name of the workflow.

required

Returns:

Type Description
List[Tuple[ConnectionInfo, Provider]]

The providers of the workflow.

Source code in aas_middleware\middleware\registries.py
def get_providers(self, workflow_name: str) -> typing.List[typing.Tuple[ConnectionInfo, Provider]]:
    """
    Return the providers attached to a workflow.

    Args:
        workflow_name (str): The name of the workflow.

    Returns:
        typing.List[typing.Tuple[ConnectionInfo, Provider]]: The providers of the workflow.

    Raises:
        KeyError: If the workflow name has no entry in ``self.workflow_providers``.
    """
    providers = self.workflow_providers[workflow_name]
    return providers

get_workflow(self, workflow_name)

Function to get a workflow from the registry.

Parameters:

Name Type Description Default
workflow_name str

The name of the workflow.

required

Returns:

Type Description
Workflow

The workflow from the registry.

Exceptions:

Type Description
KeyError

If the workflow is not in the registry.

Source code in aas_middleware\middleware\registries.py
def get_workflow(self, workflow_name: str) -> Workflow:
    """
    Return the workflow registered under a name.

    Args:
        workflow_name (str): The name of the workflow.

    Returns:
        Workflow: The workflow from the registry.

    Raises:
        KeyError: If the workflow is not in the registry.
    """
    registered_workflows = self.workflows
    # Plain subscript access on purpose: an unknown name surfaces as KeyError.
    return registered_workflows[workflow_name]

get_workflow_descriptions(self)

Function to get the descriptions of the workflows in the registry.

Returns:

Type Description
List[WorkflowDescription]

The descriptions of the workflows in the registry.

Source code in aas_middleware\middleware\registries.py
def get_workflow_descriptions(self) -> typing.List[WorkflowDescription]:
    """
    Return the description of every registered workflow.

    Returns:
        typing.List[WorkflowDescription]: The results of ``get_description()`` of the workflows in the registry.
    """
    descriptions = []
    for registered_workflow in self.workflows.values():
        descriptions.append(registered_workflow.get_description())
    return descriptions

get_workflow_names(self)

Function to get the names of the workflows in the registry.

Returns:

Type Description
List[str]

The names of the workflows in the registry.

Source code in aas_middleware\middleware\registries.py
def get_workflow_names(self) -> typing.List[str]:
    """
    Return the names under which workflows are registered.

    Returns:
        typing.List[str]: A new list with the keys of ``self.workflows``.
    """
    return [workflow_name for workflow_name in self.workflows]

get_workflows(self)

Function to get the workflows in the registry.

Returns:

Type Description
List[Workflow]

The workflows in the registry.

Source code in aas_middleware\middleware\registries.py
def get_workflows(self) -> typing.List[Workflow]:
    """
    Return all registered workflows.

    Returns:
        typing.List[Workflow]: A new list with the values of ``self.workflows``.
    """
    return [*self.workflows.values()]

rest_routers

RestRouter

Source code in aas_middleware\middleware\rest_routers.py
class RestRouter:
    """
    Generates FastAPI routers with CRUD endpoints for the top-level types of a data model
    and includes them in the app of the middleware.
    """

    def __init__(self, data_model: DataModel, data_model_name: str, middleware: "Middleware"):
        """
        Args:
            data_model (DataModel): Data model whose top-level types get endpoints.
            data_model_name (str): Name of the data model; used to look up connections in the persistence registry.
            middleware (Middleware): Middleware whose persistence registry and app are used by the endpoints.
        """
        self.data_model = data_model
        self.data_model_name = data_model_name
        # Same object as data_model; generate_endpoints() reads the top-level types from this attribute.
        self.aas_data_model = data_model

        self.middleware = middleware

    def get_connector(self, item_id: str) -> Connector:
        """
        Looks up the persistence connector of a model of this data model.

        Args:
            item_id (str): Id of the model.

        Returns:
            Connector: The connection returned by the persistence registry for the model.
        """
        connection_info = ConnectionInfo(data_model_name=self.data_model_name, model_id=item_id)
        return self.middleware.persistence_registry.get_connection(connection_info)

    def generate_endpoints_from_contained_model(
        self,
        aas_model_type: Type[AAS],
        attribute_name: str,
        submodel_model_type: Type[Submodel],
    ) -> APIRouter:
        """
        Generates CRUD endpoints for a submodel of a pydantic model representing an aas.

        The POST and DELETE endpoints are only generated when the attribute is optional in the aas model.

        Args:
            aas_model_type (Type[AAS]): Pydantic model representing the aas of the submodel.
            attribute_name (str): Name of the attribute of the aas model that holds the submodel.
            submodel_model_type (Type[Submodel]): Pydantic model representing the submodel.

        Returns:
            APIRouter: FastAPI router with CRUD endpoints for the given submodel that performs Middleware synchronization.
        """
        model_name = aas_model_type.__name__
        optional_submodel = check_if_attribute_is_optional_in_aas(aas_model_type, attribute_name)
        # TODO: the data model name should be used for creating the endpoint
        # TODO: adjust that no aas or submodel reference appears in the router -> should work for all models.
        router = APIRouter(
            prefix=f"/{model_name}/{{item_id}}/{attribute_name}",
            tags=[model_name],
            responses={404: {"description": "Not found"}},
        )

        @router.get(
            "/",
            response_model=submodel_model_type,
        )
        async def get_item(item_id: str):
            try:
                model = await self.get_connector(item_id).provide()
                return getattr(model, attribute_name)
            except Exception as e:
                raise HTTPException(
                    status_code=400, detail=f"Submodel with id {item_id} could not be retrieved. Error: {e}"
                ) from e

        if optional_submodel:
            @router.post("/")
            async def post_item(item_id: str, item: submodel_model_type) -> Dict[str, str]:
                try:
                    # Looked up inside the try block so that an unknown id yields the 400 response
                    # below, as in get_item, instead of an unhandled error.
                    connector = self.get_connector(item_id)
                    provided_data = await connector.provide()
                    # TODO: update that the correct type is immediately returned -> using model validate inside the connector
                    provided_data_dict = provided_data.model_dump()
                    model = aas_model_type.model_validate(provided_data_dict)
                    setattr(model, attribute_name, item)
                    await connector.consume(model)
                    return {
                        "message": f"Succesfully created attribute {attribute_name} of aas with id {item_id}"
                    }
                except Exception as e:
                    raise HTTPException(
                        status_code=400, detail=f"Attribute {attribute_name} for model with id {item_id} could not be set. Error: {e}"
                    ) from e

        @router.put("/")
        async def put_item(item_id: str, item: submodel_model_type) -> Dict[str, str]:
            try:
                # Lookup is part of the guarded section so that an unknown id is reported as 400.
                connector = self.get_connector(item_id)
                model = await connector.provide()
                # Nothing is written when the stored attribute already equals the sent one.
                if getattr(model, attribute_name) == item:
                    return {
                        "message": f"Attribute {attribute_name} of model with id {item_id} is already up to date"
                    }
                setattr(model, attribute_name, item)
                await connector.consume(model)
                return {
                    "message": f"Succesfully updated attribute {attribute_name} of model with id {item_id}"
                }
            except Exception as e:
                raise HTTPException(
                    status_code=400, detail=f"Attribute {attribute_name} of model with id {item_id} could not be updated. Error: {e}"
                ) from e

        if optional_submodel:

            @router.delete("/")
            async def delete_item(item_id: str):
                try:
                    # Lookup is part of the guarded section so that an unknown id is reported as 400.
                    connector = self.get_connector(item_id)
                    model = await connector.provide()
                    # Deleting the attribute means storing the model with the attribute set to None.
                    setattr(model, attribute_name, None)
                    await connector.consume(model)
                    return {
                        "message": f"Succesfully deleted attribute {attribute_name} of model with id {item_id}"
                    }
                except Exception as e:
                    raise HTTPException(
                        status_code=400, detail=f"attribute {attribute_name} of model with id {item_id} could not be deleted. Error: {e}"
                    ) from e

        return router

    def generate_aas_endpoints_from_model(self, aas_model_type: Type[AAS]) -> APIRouter:
        """
        Generates CRUD endpoints for a pydantic model representing an aas.

        Args:
            aas_model_type (Type[AAS]): Pydantic model representing an aas

        Returns:
            APIRouter: FastAPI router with CRUD endpoints for the given pydantic model that performs Middleware synchronization.
        """
        router = APIRouter(
            prefix=f"/{aas_model_type.__name__}",
            tags=[aas_model_type.__name__],
            responses={404: {"description": "Not found"}},
        )

        @router.get("/", response_model=List[aas_model_type])
        async def get_items():
            aas_list = []
            connection_infos = self.middleware.persistence_registry.get_type_connection_info(aas_model_type.__name__)
            for connection_info in connection_infos:
                connector = self.middleware.persistence_registry.get_connection(connection_info)
                retrieved_aas = await connector.provide()
                aas_list.append(retrieved_aas)
            return aas_list

        @router.post("/", response_model=Dict[str, str])
        async def post_item(item: aas_model_type) -> Dict[str, str]:
            try:
                await self.middleware.persist(data_model_name=self.data_model_name, model=item)
                return {
                    "message": f"Succesfully created aas {aas_model_type.__name__} with id {item.id}"
                }
            except ValueError as e:
                raise HTTPException(
                    status_code=400, detail=f"AAS with id {item.id} already exists"
                ) from e

        @router.get("/{item_id}", response_model=aas_model_type)
        async def get_item(item_id: str):
            try:
                connector = self.get_connector(item_id)
                provided_data = await connector.provide()
                # TODO: update that the correct type is immediately returned -> using model validate inside the connector
                provided_data_dict = provided_data.model_dump()
                return aas_model_type.model_validate(provided_data_dict)
            except Exception as e:
                raise HTTPException(
                    status_code=400, detail=f"AAS with id {item_id} could not be retrieved. Error: {e}"
                ) from e

        @router.put("/{item_id}")
        async def put_item(item_id: str, item: aas_model_type) -> Dict[str, str]:
            try:
                consumer = self.get_connector(item_id)
            except KeyError as e:
                raise HTTPException(
                    status_code=400, detail=f"AAS with id {item_id} could not be retrieved. Try posting it at first."
                ) from e

            # TODO: errors raised by consume() are still propagated unhandled
            if item_id == item.id:
                await consumer.consume(item)
            else:
                # The id changed: store the item under its new id first, then remove the old entry.
                try:
                    await self.middleware.persist(data_model_name=self.data_model_name, model=item)
                except ValueError as e:
                    # Same handling as in post_item, which treats ValueError from persist as a duplicate id.
                    raise HTTPException(
                        status_code=400, detail=f"AAS with id {item.id} already exists"
                    ) from e
                await delete_item(item_id)

            return {"message": f"Succesfully updated aas with id {item.id}"}

        @router.delete("/{item_id}")
        async def delete_item(item_id: str):
            try:
                connector = self.get_connector(item_id)
            except KeyError as e:
                # Mirrors put_item, which expects KeyError from the lookup of an unknown id.
                raise HTTPException(
                    status_code=400, detail=f"AAS with id {item_id} could not be retrieved."
                ) from e
            # NOTE(review): consume(None) is presumably the connector's way of deleting the stored model - confirm.
            await connector.consume(None)
            self.middleware.persistence_registry.remove_connection(ConnectionInfo(data_model_name=self.data_model_name, model_id=item_id))
            return {"message": f"Succesfully deleted aas with id {item_id}"}

        return router

    def generate_endpoints_from_model(self, pydantic_model: Type[BaseModel]) -> List[APIRouter]:
        """
        Generates CRUD endpoints for a pydantic model representing an aas and its submodels.

        Args:
            pydantic_model (Type[BaseModel]): Pydantic model representing an aas with submodels.

        Returns:
            List[APIRouter]: List of FastAPI routers with CRUD endpoints for the given pydantic model and its submodels that perform Middleware synchronization.
        """
        routers = []
        routers.append(self.generate_aas_endpoints_from_model(pydantic_model))
        attribute_infos = get_contained_models_attribute_info(pydantic_model)
        for attribute_name, contained_model in attribute_infos:
            routers.append(self.generate_endpoints_from_contained_model(pydantic_model, attribute_name, contained_model))
        return routers

    def generate_endpoints(self):
        """
        Generates CRUD endpoints for a pydantic model representing an aas and its submodels and adds them to the middleware app.
        """
        routers = []

        # All routers are built before the first one is included in the app.
        for top_level_model_type in self.aas_data_model.get_top_level_types():
            routers += self.generate_endpoints_from_model(top_level_model_type)
        for router in routers:
            self.middleware.app.include_router(router)

generate_aas_endpoints_from_model(self, aas_model_type)

Generates CRUD endpoints for a pydantic model representing an aas.

Parameters:

Name Type Description Default
aas_model_type Type[AAS]

Pydantic model representing an aas

required

Returns:

Type Description
APIRouter

FastAPI router with CRUD endpoints for the given pydantic model that performs Middleware synchronization.

Source code in aas_middleware\middleware\rest_routers.py
def generate_aas_endpoints_from_model(self, aas_model_type: Type[AAS]) -> APIRouter:
    """
    Generates CRUD endpoints for a pydantic model representing an aas.

    Args:
        aas_model_type (Type[AAS]): Pydantic model representing an aas

    Returns:
        APIRouter: FastAPI router with CRUD endpoints for the given pydantic model that performs Middleware synchronization.
    """
    router = APIRouter(
        prefix=f"/{aas_model_type.__name__}",
        tags=[aas_model_type.__name__],
        responses={404: {"description": "Not found"}},
    )

    @router.get("/", response_model=List[aas_model_type])
    async def get_items():
        aas_list = []
        connection_infos = self.middleware.persistence_registry.get_type_connection_info(aas_model_type.__name__)
        for connection_info in connection_infos:
            connector = self.middleware.persistence_registry.get_connection(connection_info)
            retrieved_aas = await connector.provide()
            aas_list.append(retrieved_aas)
        return aas_list

    @router.post("/", response_model=Dict[str, str])
    async def post_item(item: aas_model_type) -> Dict[str, str]:
        try:
            await self.middleware.persist(data_model_name=self.data_model_name, model=item)
            return {
                "message": f"Succesfully created aas {aas_model_type.__name__} with id {item.id}"
            }
        except ValueError as e:
            raise HTTPException(
                status_code=400, detail=f"AAS with id {item.id} already exists"
            ) from e

    @router.get("/{item_id}", response_model=aas_model_type)
    async def get_item(item_id: str):
        try:
            connector = self.get_connector(item_id)
            provided_data = await connector.provide()
            # TODO: update that the correct type is immediately returned -> using model validate inside the connector
            provided_data_dict = provided_data.model_dump()
            return aas_model_type.model_validate(provided_data_dict)
        except Exception as e:
            raise HTTPException(
                status_code=400, detail=f"AAS with id {item_id} could not be retrieved. Error: {e}"
            ) from e

    @router.put("/{item_id}")
    async def put_item(item_id: str, item: aas_model_type) -> Dict[str, str]:
        try:
            consumer = self.get_connector(item_id)
        except KeyError as e:
            raise HTTPException(
                status_code=400, detail=f"AAS with id {item_id} could not be retrieved. Try posting it at first."
            ) from e

        # TODO: errors raised by consume() are still propagated unhandled
        if item_id == item.id:
            await consumer.consume(item)
        else:
            # The id changed: store the item under its new id first, then remove the old entry.
            try:
                await self.middleware.persist(data_model_name=self.data_model_name, model=item)
            except ValueError as e:
                # Same handling as in post_item, which treats ValueError from persist as a duplicate id.
                raise HTTPException(
                    status_code=400, detail=f"AAS with id {item.id} already exists"
                ) from e
            await delete_item(item_id)

        return {"message": f"Succesfully updated aas with id {item.id}"}

    @router.delete("/{item_id}")
    async def delete_item(item_id: str):
        try:
            connector = self.get_connector(item_id)
        except KeyError as e:
            # Mirrors put_item, which expects KeyError from the lookup of an unknown id.
            raise HTTPException(
                status_code=400, detail=f"AAS with id {item_id} could not be retrieved."
            ) from e
        # NOTE(review): consume(None) is presumably the connector's way of deleting the stored model - confirm.
        await connector.consume(None)
        self.middleware.persistence_registry.remove_connection(ConnectionInfo(data_model_name=self.data_model_name, model_id=item_id))
        return {"message": f"Succesfully deleted aas with id {item_id}"}

    return router

generate_endpoints(self)

Generates CRUD endpoints for a pydantic model representing an aas and its submodels and adds them to the middleware app.

Source code in aas_middleware\middleware\rest_routers.py
def generate_endpoints(self):
    """
    Builds the CRUD routers for every top-level type of the data model and includes them in the middleware app.
    """
    # All routers are built before the first one is included in the app.
    collected_routers = [
        model_router
        for model_type in self.aas_data_model.get_top_level_types()
        for model_router in self.generate_endpoints_from_model(model_type)
    ]
    for model_router in collected_routers:
        self.middleware.app.include_router(model_router)

generate_endpoints_from_contained_model(self, aas_model_type, attribute_name, submodel_model_type)

Generates CRUD endpoints for a submodel of a pydantic model representing an aas.

Parameters:

Name Type Description Default
aas_model_type Type[BaseModel]

Pydantic model representing the aas of the submodel.

required
submodel_model_type Type[base.Submodel]

Pydantic model representing the submodel.

required

Returns:

Type Description
APIRouter

FastAPI router with CRUD endpoints for the given submodel that performs Middleware synchronization.

Source code in aas_middleware\middleware\rest_routers.py
def generate_endpoints_from_contained_model(
    self,
    aas_model_type: Type[AAS],
    attribute_name: str,
    submodel_model_type: Type[Submodel],
) -> APIRouter:
    """
    Generates CRUD endpoints for a submodel of a pydantic model representing an aas.

    The POST and DELETE endpoints are only generated when the attribute is optional in the aas model.

    Args:
        aas_model_type (Type[AAS]): Pydantic model representing the aas of the submodel.
        attribute_name (str): Name of the attribute of the aas model that holds the submodel.
        submodel_model_type (Type[Submodel]): Pydantic model representing the submodel.

    Returns:
        APIRouter: FastAPI router with CRUD endpoints for the given submodel that performs Middleware synchronization.
    """
    model_name = aas_model_type.__name__
    optional_submodel = check_if_attribute_is_optional_in_aas(aas_model_type, attribute_name)
    # TODO: the data model name should be used for creating the endpoint
    # TODO: adjust that no aas or submodel reference appears in the router -> should work for all models.
    router = APIRouter(
        prefix=f"/{model_name}/{{item_id}}/{attribute_name}",
        tags=[model_name],
        responses={404: {"description": "Not found"}},
    )

    @router.get(
        "/",
        response_model=submodel_model_type,
    )
    async def get_item(item_id: str):
        try:
            model = await self.get_connector(item_id).provide()
            return getattr(model, attribute_name)
        except Exception as e:
            raise HTTPException(
                status_code=400, detail=f"Submodel with id {item_id} could not be retrieved. Error: {e}"
            ) from e

    if optional_submodel:
        @router.post("/")
        async def post_item(item_id: str, item: submodel_model_type) -> Dict[str, str]:
            try:
                # Looked up inside the try block so that an unknown id yields the 400 response
                # below, as in get_item, instead of an unhandled error.
                connector = self.get_connector(item_id)
                provided_data = await connector.provide()
                # TODO: update that the correct type is immediately returned -> using model validate inside the connector
                provided_data_dict = provided_data.model_dump()
                model = aas_model_type.model_validate(provided_data_dict)
                setattr(model, attribute_name, item)
                await connector.consume(model)
                return {
                    "message": f"Succesfully created attribute {attribute_name} of aas with id {item_id}"
                }
            except Exception as e:
                raise HTTPException(
                    status_code=400, detail=f"Attribute {attribute_name} for model with id {item_id} could not be set. Error: {e}"
                ) from e

    @router.put("/")
    async def put_item(item_id: str, item: submodel_model_type) -> Dict[str, str]:
        try:
            # Lookup is part of the guarded section so that an unknown id is reported as 400.
            connector = self.get_connector(item_id)
            model = await connector.provide()
            # Nothing is written when the stored attribute already equals the sent one.
            if getattr(model, attribute_name) == item:
                return {
                    "message": f"Attribute {attribute_name} of model with id {item_id} is already up to date"
                }
            setattr(model, attribute_name, item)
            await connector.consume(model)
            return {
                "message": f"Succesfully updated attribute {attribute_name} of model with id {item_id}"
            }
        except Exception as e:
            raise HTTPException(
                status_code=400, detail=f"Attribute {attribute_name} of model with id {item_id} could not be updated. Error: {e}"
            ) from e

    if optional_submodel:

        @router.delete("/")
        async def delete_item(item_id: str):
            try:
                # Lookup is part of the guarded section so that an unknown id is reported as 400.
                connector = self.get_connector(item_id)
                model = await connector.provide()
                # Deleting the attribute means storing the model with the attribute set to None.
                setattr(model, attribute_name, None)
                await connector.consume(model)
                return {
                    "message": f"Succesfully deleted attribute {attribute_name} of model with id {item_id}"
                }
            except Exception as e:
                raise HTTPException(
                    status_code=400, detail=f"attribute {attribute_name} of model with id {item_id} could not be deleted. Error: {e}"
                ) from e

    return router

generate_endpoints_from_model(self, pydantic_model)

Generates CRUD endpoints for a pydantic model representing an aas and its submodels.

Parameters:

Name Type Description Default
pydantic_model Type[BaseModel]

Pydantic model representing an aas with submodels.

required

Returns:

Type Description
List[APIRouter]

List of FastAPI routers with CRUD endpoints for the given pydantic model and its submodels that perform Middleware synchronization.

Source code in aas_middleware\middleware\rest_routers.py
def generate_endpoints_from_model(self, pydantic_model: Type[BaseModel]) -> List[APIRouter]:
    """
    Builds the routers for an aas model: one for the model itself and one per contained model attribute.

    Args:
        pydantic_model (Type[BaseModel]): Pydantic model representing an aas with submodels.

    Returns:
        List[APIRouter]: The router of the model itself, followed by one router for each contained model attribute.
    """
    # The router of the model itself always comes first.
    model_routers = [self.generate_aas_endpoints_from_model(pydantic_model)]
    contained_attributes = get_contained_models_attribute_info(pydantic_model)
    model_routers.extend(
        self.generate_endpoints_from_contained_model(pydantic_model, name, contained_type)
        for name, contained_type in contained_attributes
    )
    return model_routers

check_if_attribute_is_optional_in_aas(aas, attribute_name)

Checks if a submodel is an optional attribute in an aas.

Parameters:

Name Type Description Default
aas Type[base.AAS]

AAS model.

required
attribute_name str

Name of the attribute to be checked.

required

Exceptions:

Type Description
ValueError

If the submodel is not a submodel of the aas.

Returns:

Type Description
bool

True if the submodel is an optional attribute in the aas, False otherwise.

Source code in aas_middleware\middleware\rest_routers.py
def check_if_attribute_is_optional_in_aas(
    aas: Type[AAS], attribute_name: str
    ) -> bool:
    """
    Checks if an attribute is optional in an aas.

    An attribute counts as optional when its field is not required, or when its
    annotation is a union that contains None. Both spellings of such a union are
    recognized: Optional[X] / Union[X, None] and X | None.

    Args:
        aas (Type[base.AAS]): AAS model.
        attribute_name (str): Name of the attribute to be checked.

    Raises:
        ValueError: If the aas has no field with the given name.

    Returns:
        bool: True if the attribute is optional in the aas, False otherwise.
    """
    # Local import: types.UnionType (Python >= 3.10) is the origin of "X | None" annotations.
    import types

    if attribute_name not in aas.model_fields:
        raise ValueError(
            f"Submodel {attribute_name} is not a submodel attribute of {aas.__name__}."
        )
    field_info = aas.model_fields[attribute_name]
    if not field_info.is_required():
        return True

    # On interpreters without types.UnionType the fallback makes the tuple contain typing.Union twice.
    union_origins = (typing.Union, getattr(types, "UnionType", typing.Union))
    annotation = field_info.annotation
    return typing.get_origin(annotation) in union_origins and type(None) in typing.get_args(annotation)

synchronization

adjust_body_for_external_schema(body, persistence_mapper=None, formatter=None)

Modifies the body for an external schema.

Parameters:

Name Type Description Default
body Any

The body to modify.

required
persistence_mapper Optional[Mapper]

The mapper that maps the body to the external model. Defaults to None.

None
formatter Optional[Formatter]

The formatter that serializes the body. Defaults to None.

None

Returns:

Type Description
Any

The modified body.

Source code in aas_middleware\middleware\synchronization.py
def adjust_body_for_external_schema(body: Any, persistence_mapper: typing.Optional[Mapper] = None, formatter: typing.Optional[Formatter] = None) -> Any:
    """
    Converts a body into the form used by an external schema.

    The body is first mapped with the persistence mapper and the result is then
    serialized with the formatter. A step is skipped when its argument is falsy.

    Args:
        body (Any): The body to modify.
        persistence_mapper (typing.Optional[Mapper], optional): The mapper that maps the body to the external model. Defaults to None.
        formatter (typing.Optional[Formatter], optional): The formatter that serializes the body. Defaults to None.

    Returns:
        Any: The modified body.
    """
    mapped_body = persistence_mapper.map(body) if persistence_mapper else body
    return formatter.serialize(mapped_body) if formatter else mapped_body

adjust_body_for_persistence_schema(body, external_mapper=None, formatter=None)

Modifies the body for persistence.

Parameters:

Name Type Description Default
body Any

The body to modify.

required
external_mapper Optional[Mapper]

The mapper that maps the body to the persistence model. Defaults to None.

None
formatter Optional[Formatter]

The formatter that deserializes the body. Defaults to None.

None

Returns:

Type Description
Any

The modified body.

Source code in aas_middleware\middleware\synchronization.py
def adjust_body_for_persistence_schema(body: Any, external_mapper: typing.Optional[Mapper] = None, formatter: typing.Optional[Formatter] = None) -> Any:
    """
    Converts a body into the form used for persistence.

    The body is first deserialized with the formatter and the result is then
    mapped with the external mapper. A step is skipped when its argument is falsy.

    Args:
        body (Any): The body to modify.
        external_mapper (typing.Optional[Mapper], optional): The mapper that maps the body to the persistence model. Defaults to None.
        formatter (typing.Optional[Formatter], optional): The formatter that deserializes the body. Defaults to None.

    Returns:
        Any: The modified body.
    """
    deserialized_body = formatter.deserialize(body) if formatter else body
    return external_mapper.map(deserialized_body) if external_mapper else deserialized_body

synchronize_connector_with_persistence(connector, connection_info, persistence_registry, persistence_mapper=None, external_mapper=None, formatter=None)

Synchronizes a connector with the persistence layer.

Parameters:

Name Type Description Default
connector Union[Consumer, Provider]

The connector to synchronize.

required
connection_info ConnectionInfo

The connection info for the persistence layer.

required
persistence_registry PersistenceConnectionRegistry

The registry for the persistence connectors.

required
Source code in aas_middleware\middleware\synchronization.py
def synchronize_connector_with_persistence(connector: Union[Consumer, Provider], connection_info: ConnectionInfo, persistence_registry: PersistenceConnectionRegistry, persistence_mapper: typing.Optional[Mapper] = None, external_mapper: typing.Optional[Mapper] = None, formatter: typing.Optional[Formatter] = None):
    """
    Replaces the consume and/or provide method of a connector by wrappers that keep the persistence layer in sync.

    Consumer: a body of None is replaced by the current persistence value, converted for the
    external schema, before it is consumed. Any other body is converted for the persistence
    schema and written to persistence before it is consumed.

    Provider: the provided value is converted for the persistence schema and compared with the
    persistence value. If they differ, persistence is updated and the update is checked with an
    assertion. The unconverted provided value is returned.

    Args:
        connector (Union[Consumer, Provider]): The connector to synchronize.
        connection_info (ConnectionInfo): The connection info for the persistence layer.
        persistence_registry (PersistenceConnectionRegistry): The registry for the persistence connectors.
        persistence_mapper (typing.Optional[Mapper], optional): Mapper applied when a persistence value is converted for the external schema. Defaults to None.
        external_mapper (typing.Optional[Mapper], optional): Mapper applied when a body is converted for the persistence schema. Defaults to None.
        formatter (typing.Optional[Formatter], optional): Formatter used for both conversions. Defaults to None.
    """
    def find_persistence_connector():
        # Resolved on every wrapped call, not once when the wrappers are installed.
        return persistence_registry.get_connector_by_data_model_and_model_id(data_model_name=connection_info.data_model_name, model_id=connection_info.model_id)

    if isinstance(connector, Consumer):
        unwrapped_consume = connector.consume

        @wraps(connector.consume)
        async def consume_with_persistence(consumer_body: Any):
            persistence_connector = find_persistence_connector()
            if consumer_body is not None:
                body_for_persistence = adjust_body_for_persistence_schema(consumer_body, external_mapper, formatter)
                await update_persistence_with_value(persistence_connector, connection_info, body_for_persistence)
            else:
                # TODO: make data model connection possible here, not possible because of some conventions in the registries
                stored_body = await get_persistence_value(persistence_connector, connection_info)
                consumer_body = adjust_body_for_external_schema(stored_body, persistence_mapper, formatter)
            await unwrapped_consume(consumer_body)

        connector.consume = consume_with_persistence

    if isinstance(connector, Provider):
        unwrapped_provide = connector.provide

        @wraps(connector.provide)
        async def provide_with_persistence() -> Any:
            persistence_connector = find_persistence_connector()
            provider_body = await unwrapped_provide()
            stored_body = await get_persistence_value(persistence_connector, connection_info)
            provider_body_in_peristence_schema = adjust_body_for_persistence_schema(provider_body, external_mapper, formatter)
            if provider_body_in_peristence_schema == stored_body:
                return provider_body
            await update_persistence_with_value(persistence_connector, connection_info, provider_body_in_peristence_schema)
            # Read the value back to check that the update was stored.
            assert provider_body_in_peristence_schema == await get_persistence_value(persistence_connector, connection_info), f"Persistence value was not updated correctly. Expected {provider_body_in_peristence_schema}, got {await get_persistence_value(persistence_connector, connection_info)}"
            return provider_body

        connector.provide = provide_with_persistence

synchronize_workflow_with_persistence_consumer(workflow, connection_info, persistence_registry, external_mapper=None, formatter=None)

Synchronizes a workflow with a persistence consumer.

Parameters:

Name Type Description Default
workflow Workflow

The workflow to synchronize.

required
arg_name str

The name of the argument in the workflow that is a consumer.

required
connection_info ConnectionInfo

The connection info for the persistence layer.

required
persistence_registry PersistenceConnectionRegistry

The registry for the persistence connectors.

required
Source code in aas_middleware\middleware\synchronization.py
def synchronize_workflow_with_persistence_consumer(workflow: Workflow, connection_info: ConnectionInfo, persistence_registry: PersistenceConnectionRegistry, external_mapper: typing.Optional[Mapper] = None, formatter: typing.Optional[Formatter] = None):
    """
    Synchronizes a workflow with a persistence consumer.

    The execute method of the workflow is replaced by a wrapper that runs the original
    execute, converts its return value for the persistence schema and writes it to the
    persistence layer. The wrapper returns the unconverted return value of the workflow.

    Args:
        workflow (Workflow): The workflow to synchronize.
        connection_info (ConnectionInfo): The connection info for the persistence layer.
        persistence_registry (PersistenceConnectionRegistry): The registry for the persistence connectors.
        external_mapper (typing.Optional[Mapper], optional): The mapper that maps the workflow return value to the persistence model. Defaults to None.
        formatter (typing.Optional[Formatter], optional): The formatter that deserializes the workflow return value. Defaults to None.
    """
    # Local import so that this function does not depend on a module-level logging import.
    import logging

    logger = logging.getLogger(__name__)
    original_execute = workflow.execute

    @wraps(workflow.execute)
    async def wrapped_execute(execute_body: Any):
        # Diagnostic output goes to the logger at debug level instead of being printed to stdout.
        logger.debug("execute body: %s", execute_body)
        workflow_return = await original_execute(execute_body)
        logger.debug("workflow return: %s", workflow_return)
        persistence_connector = persistence_registry.get_connector_by_data_model_and_model_id(data_model_name=connection_info.data_model_name, model_id=connection_info.model_id)
        persistence_body = adjust_body_for_persistence_schema(workflow_return, external_mapper, formatter)
        logger.debug("persistence body: %s", persistence_body)
        await update_persistence_with_value(persistence_connector, connection_info, persistence_body)
        return workflow_return

    workflow.execute = wrapped_execute

synchronize_workflow_with_persistence_provider(workflow, arg_name, connection_info, persistence_registry, persistence_mapper=None, external_mapper=None, formatter=None)

Synchronizes a workflow with a persistence provider.

Parameters:

Name Type Description Default
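workflow Workflow

The workflow whose execute method is synchronized with the persistence layer.

required
arg_name str

The name of the argument in the workflow that is a provider. It is not read by the implementation shown below.

required
connection_info ConnectionInfo

The connection info for the persistence layer.

required
persistence_registry PersistenceConnectionRegistry

The registry for the persistence connectors.

required
persistence_mapper Optional[Mapper]

The mapper that maps the persistence value to the external model. Defaults to None.

None
external_mapper Optional[Mapper]

The mapper that maps the execute body to the persistence model. Defaults to None.

None
formatter Optional[Formatter]

The formatter that serializes or deserializes the body. Defaults to None.

None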
Source code in aas_middleware\middleware\synchronization.py
def synchronize_workflow_with_persistence_provider(workflow: Workflow, arg_name: str, connection_info: ConnectionInfo, persistence_registry: PersistenceConnectionRegistry, persistence_mapper: typing.Optional[Mapper] = None, external_mapper: typing.Optional[Mapper]=None, formatter: typing.Optional[Formatter] = None):
    """
    Wrap ``workflow.execute`` so that its input is kept in sync with a persistence provider.

    The replacement ``execute`` behaves as follows:

    * With a truthy body, the body is converted with ``adjust_body_for_persistence_schema``
      and written to persistence; the workflow then runs with the body as given.
    * With a falsy body, the value is read from persistence, converted with
      ``adjust_body_for_external_schema`` and used as the workflow input.

    Args:
        workflow (Workflow): The workflow whose ``execute`` attribute is replaced in place.
        arg_name (str): Name of the workflow argument associated with the provider.
            It is not referenced inside this function.
        connection_info (ConnectionInfo): Its ``data_model_name`` and ``model_id`` select
            the connector; it is also passed to the read/update helpers.
        persistence_registry (PersistenceConnectionRegistry): The registry the connector is
            looked up in on every execution.
        persistence_mapper (typing.Optional[Mapper], optional): Forwarded to
            ``adjust_body_for_external_schema``. Defaults to None.
        external_mapper (typing.Optional[Mapper], optional): Forwarded to
            ``adjust_body_for_persistence_schema``. Defaults to None.
        formatter (typing.Optional[Formatter], optional): Forwarded to both adjustment
            helpers. Defaults to None.
    """
    inner_execute = workflow.execute

    @wraps(inner_execute)
    async def execute_with_provider(execute_body: Any):
        connector = persistence_registry.get_connector_by_data_model_and_model_id(
            data_model_name=connection_info.data_model_name,
            model_id=connection_info.model_id,
        )

        # NOTE(review): this is a truthiness test, so falsy bodies such as 0, "" or {}
        # are handled like a missing body -- confirm that this is intended.
        if execute_body:
            # A body was supplied: store it before the workflow runs.
            body_to_store = adjust_body_for_persistence_schema(execute_body, external_mapper, formatter)
            await update_persistence_with_value(connector, connection_info, body_to_store)
        else:
            # No body supplied: fall back to the persisted value as workflow input.
            stored_value = await get_persistence_value(connector, connection_info)
            execute_body = adjust_body_for_external_schema(stored_value, persistence_mapper, formatter)

        return await inner_execute(execute_body)

    workflow.execute = execute_with_provider

workflow_router

generate_workflow_endpoint(workflow)

Generates endpoints for a workflow to execute the workflow.

Parameters:

Name Type Description Default
workflow Workflow

Workflow that contains the function to be executed by the workflow.

required

Returns:

Type Description
APIRouter

FastAPI router with an endpoint to execute the workflow.

Source code in aas_middleware\middleware\workflow_router.py
def generate_workflow_endpoint(workflow: Workflow) -> APIRouter:
    """
    Generates endpoints for a workflow to execute the workflow.

    The router is mounted under ``/workflows/<workflow name>`` and exposes:

    * ``POST /execute`` -- awaits the workflow and returns its result. Only registered
      when the workflow description has no ``interval``.
    * ``POST /execute_background`` -- schedules the workflow as a background task.
    * ``GET /description`` -- returns the workflow description.
    * ``GET /interrupt`` -- interrupts the workflow.

    The request body is derived from the type hints of the workflow function: no
    arguments means no body, one argument uses its type directly, and several
    arguments are bundled into a generated model whose fields are unpacked into
    keyword arguments. The body is optional; if it is omitted, ``None`` is passed
    to ``workflow.execute``.

    Args:
        workflow (Workflow): Workflow that contains the function to be executed by the workflow.

    Returns:
        APIRouter: FastAPI router with an endpoint to execute the workflow.

    Raises:
        HTTPException: Raised by the generated endpoints (status 400) when the workflow
            is already running, or when interrupting raises a ``ValueError``.
    """
    router = APIRouter(
        prefix=f"/workflows/{workflow.get_name()}",
        tags=["workflows"],
        responses={404: {"description": "Not found"}},
    )

    if isinstance(workflow.workflow_function, functools.partial):
        type_hints = get_partial_type_hints(workflow.workflow_function)
    else:
        type_hints = typing.get_type_hints(workflow.workflow_function)

    # After removing the return annotation only the input arguments remain.
    return_type = type_hints.pop("return", None)
    if not type_hints:
        input_type_hints = None
    elif len(type_hints) == 1:
        input_type_hints = next(iter(type_hints.values()))
    else:
        input_type_hints = get_base_model_from_type_hints(workflow.get_name(), type_hints)

    # Only a generated multi-argument body is unpacked into keyword arguments.
    has_multiple_arguments = len(type_hints) > 1

    def _ensure_not_running() -> None:
        # Shared guard of all execute endpoints: reject a second concurrent execution.
        if workflow.running:
            raise HTTPException(
                status_code=400,
                detail=f"Workflow {workflow.get_name()} is already running. Wait for it to finish or interrupt it first.",
            )

    if input_type_hints is None:
        if workflow.get_description().interval is None:
            @router.post("/execute", response_model=return_type)
            async def execute():
                _ensure_not_running()
                return await workflow.execute()

        @router.post("/execute_background", response_model=Dict[str, str])
        async def execute_background(background_tasks: BackgroundTasks):
            _ensure_not_running()
            background_tasks.add_task(workflow.execute)
            return {"message": f"Started exeuction of workflow {workflow.get_name()}"}
    else:
        if workflow.get_description().interval is None:
            @router.post("/execute", response_model=return_type)
            # TODO: make the optional not here, but add a method that updates the POST endpoints after a connection is added, to have optional parameters...
            async def execute(arg: typing.Optional[input_type_hints]=None): # type: ignore
                _ensure_not_running()
                if isinstance(arg, BaseModel) and has_multiple_arguments:
                    return await workflow.execute(**dict(arg))
                return await workflow.execute(arg)

        @router.post("/execute_background", response_model=Dict[str, str])
        async def execute_background(background_tasks: BackgroundTasks, arg: typing.Optional[input_type_hints]=None): # type: ignore
            _ensure_not_running()
            if isinstance(arg, BaseModel) and has_multiple_arguments:
                background_tasks.add_task(workflow.execute, **dict(arg))
            else:
                # Pass the body through unchanged, exactly like "/execute" does.
                # Calling arg.model_dump() here failed for an omitted body (None)
                # and for single arguments that are not pydantic models.
                background_tasks.add_task(workflow.execute, arg)
            return {"message": f"Started exeuction of workflow {workflow.get_name()}"}


    @router.get("/description", response_model=WorkflowDescription)
    async def describe_workflow():
        return workflow.get_description()

    @router.get("/interrupt", response_model=Dict[str, str])
    async def interrupt_workflow():
        try:
            await workflow.interrupt()
        except ValueError as e:
            # Surface the interrupt failure as a client error with its message.
            raise HTTPException(status_code=400, detail=str(e))
        return {"message": f"Stopped execution of workflow {workflow.get_name()}"}

    return router

get_base_model_from_type_hints(name, type_hints)

Get the base model from the type hints of a function.

Parameters:

Name Type Description Default
name str

Name used to build the name of the generated model (body_for_<name>).

required
type_hints Dict[str, Any]

Type hints of a function.

required

Returns:

Type Description
Type[BaseModel]

Base model of the type hints.

Source code in aas_middleware\middleware\workflow_router.py
def get_base_model_from_type_hints(name: str, type_hints: Dict[str, typing.Any]) -> typing.Type[BaseModel]:
    """
    Get the base model from the type hints of a function.

    Every entry of ``type_hints`` except ``"return"`` becomes a field of a dynamically
    created pydantic model, annotated with the hinted type and a plain ``Field()``.

    Args:
        name (str): Name used to build the model name ``body_for_<name>``.
        type_hints (Dict[str, typing.Any]): Type hints of a function. A ``"return"``
            entry is ignored; the passed dictionary is not modified.

    Returns:
        typing.Type[BaseModel]: Base model of the type hints.
    """
    # Build the field definitions without popping "return" from the caller's dict,
    # so the argument is left untouched.
    field_definitions = {
        argument: typing.Annotated[argument_type, Field()]
        for argument, argument_type in type_hints.items()
        if argument != "return"
    }
    return create_model(f"body_for_{name}", **field_definitions)