Middleware
aas_persistence_middleware
AasMiddleware (Middleware)
Middleware that automatically uses AAS and submodel repositories as persistence providers and consumers.
Source code in aas_middleware\middleware\aas_persistence_middleware.py
class AasMiddleware(Middleware):
    """
    Middleware that automatically uses AAS and submodel repositories as persistence providers and consumers.
    """

    def __init__(self):
        super().__init__()

    def load_aas_persistent_data_model(self, name: str, data_model: DataModel, aas_host: str, aas_port: int, submodel_host: str, submodel_port: int, persist_instances: bool = False):
        """
        Load a data model and register default persistence factories for AAS and Submodel types.

        Args:
            name (str): The name of the data model.
            data_model (DataModel): Data model containing the types and values.
            aas_host (str): The host of the AAS server.
            aas_port (int): The port of the AAS server.
            submodel_host (str): The host of the submodel server.
            submodel_port (int): The port of the submodel server.
            persist_instances (bool, optional): Whether to persist instances of the data model. Defaults to False.
        """
        # NOTE: the data model is loaded unchanged; rebuilding it for the AAS
        # structure (DataModelRebuilder) is currently disabled.
        self.load_data_model(name, data_model, persist_instances)

        # The AAS connector also receives the submodel server address.
        factories_by_type = (
            (
                AAS,
                PersistenceFactory(
                    BasyxAASConnector,
                    host=aas_host,
                    port=aas_port,
                    submodel_host=submodel_host,
                    submodel_port=submodel_port,
                ),
            ),
            (
                Submodel,
                PersistenceFactory(
                    BasyxSubmodelConnector,
                    host=submodel_host,
                    port=submodel_port,
                ),
            ),
        )
        # model_id is None: the factory is the default for the whole data model.
        for model_type, factory in factories_by_type:
            self.add_default_persistence(factory, name, None, model_type)

    def scan_aas_server(self):
        """
        Scan the AAS server for all available AAS and Submodels.

        Not implemented yet; calling it currently does nothing.
        """
        # TODO: implement function
load_aas_persistent_data_model(self, name, data_model, aas_host, aas_port, submodel_host, submodel_port, persist_instances=False)
Function to load a data model into the middleware to be used for synchronization.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the data model. |
required |
data_model |
DataModel |
Data model containing the types and values. |
required |
aas_host |
str |
The host of the AAS server. |
required |
aas_port |
int |
The port of the AAS server. |
required |
submodel_host |
str |
The host of the submodel server. |
required |
submodel_port |
int |
The port of the submodel server. |
required |
persist_instances |
bool |
Whether to persist instances of the data model. Defaults to False. |
False |
Source code in aas_middleware\middleware\aas_persistence_middleware.py
def load_aas_persistent_data_model(self, name: str, data_model: DataModel, aas_host: str, aas_port: int, submodel_host: str, submodel_port: int, persist_instances: bool = False):
    """
    Load a data model and register default persistence factories for AAS and Submodel types.

    Args:
        name (str): The name of the data model.
        data_model (DataModel): Data model containing the types and values.
        aas_host (str): The host of the AAS server.
        aas_port (int): The port of the AAS server.
        submodel_host (str): The host of the submodel server.
        submodel_port (int): The port of the submodel server.
        persist_instances (bool, optional): Whether to persist instances of the data model. Defaults to False.
    """
    # NOTE: the data model is loaded unchanged; rebuilding it for the AAS
    # structure (DataModelRebuilder) is currently disabled.
    self.load_data_model(name, data_model, persist_instances)

    # The AAS connector also receives the submodel server address.
    aas_factory = PersistenceFactory(
        BasyxAASConnector,
        host=aas_host,
        port=aas_port,
        submodel_host=submodel_host,
        submodel_port=submodel_port,
    )
    submodel_factory = PersistenceFactory(
        BasyxSubmodelConnector,
        host=submodel_host,
        port=submodel_port,
    )

    # model_id is None: each factory is the default for the whole data model.
    for factory, model_type in ((aas_factory, AAS), (submodel_factory, Submodel)):
        self.add_default_persistence(factory, name, None, model_type)
connector_router
generate_connector_endpoint(connector_id, connector, model_type)
Generates REST endpoints to describe a connector and to get or set its value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow |
Workflow |
Workflow that contains the function to be executed by the workflow. |
required |
Returns:
Type | Description |
---|---|
APIRouter |
FastAPI router with the description and value endpoints of the connector. |
Source code in aas_middleware\middleware\connector_router.py
def generate_connector_endpoint(connector_id: str, connector: Union[Consumer, Provider, Connector], model_type: Type[Any]) -> APIRouter:
    """
    Generates REST endpoints to describe a connector and to get or set its value.

    The router is mounted under ``/connectors/{connector_id}`` and always has a
    ``GET /description`` endpoint. ``POST /value`` is added when the connector
    is a ``Consumer`` and ``GET /value`` when it is a ``Provider``.

    Args:
        connector_id (str): Identifier of the connector, used in the route prefix.
        connector (Union[Consumer, Provider, Connector]): Connector the endpoints delegate to.
        model_type (Type[Any]): Type of the value that is consumed / provided by the connector.

    Returns:
        APIRouter: FastAPI router with the endpoints of the connector.
    """
    router = APIRouter(
        prefix=f"/connectors/{connector_id}",
        tags=["connectors"],
        responses={404: {"description": "Not found"}},
    )

    @router.get("/description", response_model=ConnectorDescription)
    async def describe_connector():
        # No persistence connection exists for a plain connector.
        return ConnectorDescription(
            connector_id=connector_id,
            connector_type=type(connector).__name__,
            persistence_connection=None,
            model_type=model_type.__name__
        )

    if isinstance(connector, Consumer):
        @router.post("/value", response_model=Dict[str, str])
        async def set_value(value: model_type):
            try:
                await connector.consume(value)
            except ConnectionError as e:
                # Report connection problems of the connector as a server error.
                raise HTTPException(status_code=500, detail=str(e)) from e
            return {"message": f"Set value for {connector_id}"}

    if isinstance(connector, Provider):
        @router.get("/value", response_model=model_type)
        async def get_value():
            try:
                return await connector.provide()
            except ConnectionError as e:
                raise HTTPException(status_code=500, detail=str(e)) from e

    return router
generate_persistence_connector_endpoint(connector_id, connector, connection_info, model_type)
Generates REST endpoints to describe a persistence-connected connector and to get or set its value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow |
Workflow |
Workflow that contains the function to be executed by the workflow. |
required |
Returns:
Type | Description |
---|---|
APIRouter |
FastAPI router with the description and value endpoints of the connector. |
Source code in aas_middleware\middleware\connector_router.py
def generate_persistence_connector_endpoint(connector_id: str, connector: Union[Consumer, Provider, Connector], connection_info: ConnectionInfo, model_type: Type[Any]) -> APIRouter:
    """
    Generates REST endpoints to describe a persistence-connected connector and to get or set its value.

    The router is mounted under ``/connectors/{connector_id}`` and always has a
    ``GET /description`` endpoint. ``POST /value`` is added when the connector
    is a ``Consumer`` and ``GET /value`` when it is a ``Provider``. Unlike the
    plain connector endpoint, the body of ``POST /value`` is optional.

    Args:
        connector_id (str): Identifier of the connector, used in the route prefix.
        connector (Union[Consumer, Provider, Connector]): Connector the endpoints delegate to.
        connection_info (ConnectionInfo): Persistence connection reported in the connector description.
        model_type (Type[Any]): Type of the value that is consumed / provided by the connector.

    Returns:
        APIRouter: FastAPI router with the endpoints of the connector.
    """
    router = APIRouter(
        prefix=f"/connectors/{connector_id}",
        tags=["connectors"],
        responses={404: {"description": "Not found"}},
    )

    @router.get("/description", response_model=ConnectorDescription)
    async def describe_connector():
        return ConnectorDescription(
            connector_id=connector_id,
            connector_type=type(connector).__name__,
            persistence_connection=connection_info,
            model_type=model_type.__name__
        )

    if isinstance(connector, Consumer):
        @router.post("/value", response_model=Dict[str, str])
        async def set_value(value: Optional[model_type]=None):
            try:
                await connector.consume(value)
            except ConnectionError as e:
                # Report connection problems of the connector as a server error.
                raise HTTPException(status_code=500, detail=str(e)) from e
            # Only a missing body means "use the persistence value"; falsy
            # payloads such as 0 or "" are real values and are reported as such.
            if value is None:
                return {"message": f"Set for {connector_id} persistence value."}
            return {"message": f"Set for {connector_id} value {value}"}

    if isinstance(connector, Provider):
        @router.get("/value", response_model=model_type)
        async def get_value():
            try:
                return await connector.provide()
            except ConnectionError as e:
                raise HTTPException(status_code=500, detail=str(e)) from e

    return router
graphql_routers
GraphQLRouter
Source code in aas_middleware\middleware\graphql_routers.py
class GraphQLRouter:
    """
    Builds a GraphQL query class for a data model and mounts it on the middleware app.
    """

    def __init__(self, data_model: DataModel, data_model_name: str, middleware: "Middleware"):
        self.data_model = data_model
        self.data_model_name = data_model_name
        self.middleware = middleware
        # Empty base classes that are extended with fields and resolvers per model.
        self.query, self.mutation = get_base_query_and_mutation_classes()

    def get_connector(self, item_id: str) -> Connector:
        """
        Returns the persistence connector registered for the given item of this data model.

        Args:
            item_id (str): Id of the model in the data model.

        Returns:
            Connector: Connector looked up in the persistence registry of the middleware.
        """
        connection_info = ConnectionInfo(data_model_name=self.data_model_name, model_id=item_id)
        return self.middleware.persistence_registry.get_connection(connection_info)

    def generate_graphql_endpoint(self):
        """
        Generates a GraphQL endpoint for the given data model and adds it to the middleware.
        """
        for top_level_type in self.data_model.get_top_level_types():
            self.create_query_for_model(top_level_type)
            # TODO: also make mutation possible
            # self.create_mutation_for_model(top_level_type)
        graphql_schema = graphene.Schema(query=self.query)
        self.middleware.app.mount(
            "/graphql",
            GraphQLApp(schema=graphql_schema, on_get=make_graphiql_handler()),
        )

    def create_query_for_model(self, model_type: type):
        """
        Extends the query class with list fields and resolvers for a model and its contained models.

        Args:
            model_type (type): Top level model type to add to the query.
        """
        contained_models = get_contained_models_attribute_info(model_type)
        # Create all graphene types first, then attach them to the query class.
        graphene_types = [
            create_graphe_pydantic_output_type_for_submodel_elements(contained_model)
            for _, contained_model in contained_models
        ]

        for (_, contained_model), graphene_type in zip(contained_models, graphene_types):
            # FIXME: resolve problems with optional submodels!
            contained_name = contained_model.__name__
            self.query = type(
                "Query",
                (self.query,),
                {
                    f"{contained_name}": graphene.List(graphene_type),
                    f"resolve_{contained_name}": self.get_submodel_resolve_function(contained_model),
                },
            )

        top_level_name = model_type.__name__
        top_level_graphene_type = create_graphe_pydantic_output_type_for_model(model_type)
        self.query = type(
            "Query",
            (self.query,),
            {
                f"{top_level_name}": graphene.List(top_level_graphene_type),
                f"resolve_{top_level_name}": self.get_aas_resolve_function(model_type),
            },
        )

    def get_aas_resolve_function(self, model: typing.Type[BaseModel]) -> typing.Callable:
        """
        Returns the resolve function for the given pydantic model.

        Args:
            model (Type[BaseModel]): Pydantic model for which the resolve function should be created.

        Returns:
            typing.Callable: Resolve function for the given pydantic model.
        """
        middleware = self.middleware

        async def resolve_models(self, info):
            registry = middleware.persistence_registry
            resolved = []
            for connection_info in registry.get_type_connection_info(model.__name__):
                retrieved_aas: AAS = await registry.get_connection(connection_info).provide()
                # Re-validate the retrieved data against the requested model type.
                resolved.append(model.model_validate(retrieved_aas.model_dump()))
            return resolved

        resolve_models.__name__ = f"resolve_{model.__name__}"
        return resolve_models

    def get_submodel_resolve_function(self, model: typing.Type[BaseModel]) -> typing.Callable:
        """
        Returns the resolve function for the given pydantic model.

        Args:
            model (Type[BaseModel]): Pydantic model for which the resolve function should be created.

        Returns:
            typing.Callable: Resolve function for the given pydantic model.
        """
        middleware = self.middleware

        async def resolve_models(self, info):
            registry = middleware.persistence_registry
            resolved = []
            # TODO: get the correct connectors here... no submodel connectors are available
            for connection_info in registry.get_type_connection_info(model.__name__):
                retrieved_submodel: Submodel = await registry.get_connection(connection_info).provide()
                # Re-validate the retrieved data against the requested model type.
                resolved.append(model.model_validate(retrieved_submodel.model_dump()))
            return resolved

        resolve_models.__name__ = f"resolve_{model.__name__}"
        return resolve_models
generate_graphql_endpoint(self)
Generates a GraphQL endpoint for the given data model and adds it to the middleware.
Source code in aas_middleware\middleware\graphql_routers.py
def generate_graphql_endpoint(self):
    """
    Generates a GraphQL endpoint for the given data model and adds it to the middleware.

    A query is created for every top level type of the data model; the
    resulting schema is mounted on the middleware app at ``/graphql``.
    """
    for top_level_type in self.data_model.get_top_level_types():
        self.create_query_for_model(top_level_type)
        # TODO: also make mutation possible
        # self.create_mutation_for_model(top_level_type)
    graphql_schema = graphene.Schema(query=self.query)
    self.middleware.app.mount(
        "/graphql",
        GraphQLApp(schema=graphql_schema, on_get=make_graphiql_handler()),
    )
get_aas_resolve_function(self, model)
Returns the resolve function for the given pydantic model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
Type[BaseModel] |
Pydantic model for which the resolve function should be created. |
required |
Returns:
Type | Description |
---|---|
Callable |
Resolve function for the given pydantic model. |
Source code in aas_middleware\middleware\graphql_routers.py
def get_aas_resolve_function(self, model: typing.Type[BaseModel]) -> typing.Callable:
    """
    Returns the resolve function for the given pydantic model.

    Args:
        model (Type[BaseModel]): Pydantic model for which the resolve function should be created.

    Returns:
        typing.Callable: Resolve function for the given pydantic model.
    """
    middleware = self.middleware

    async def resolve_models(self, info):
        registry = middleware.persistence_registry
        resolved = []
        for connection_info in registry.get_type_connection_info(model.__name__):
            retrieved_aas: AAS = await registry.get_connection(connection_info).provide()
            # Re-validate the retrieved data against the requested model type.
            resolved.append(model.model_validate(retrieved_aas.model_dump()))
        return resolved

    resolve_models.__name__ = f"resolve_{model.__name__}"
    return resolve_models
get_submodel_resolve_function(self, model)
Returns the resolve function for the given pydantic model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
Type[BaseModel] |
Pydantic model for which the resolve function should be created. |
required |
Returns:
Type | Description |
---|---|
Callable |
Resolve function for the given pydantic model. |
Source code in aas_middleware\middleware\graphql_routers.py
def get_submodel_resolve_function(self, model: typing.Type[BaseModel]) -> typing.Callable:
    """
    Returns the resolve function for the given pydantic model.

    Args:
        model (Type[BaseModel]): Pydantic model for which the resolve function should be created.

    Returns:
        typing.Callable: Resolve function for the given pydantic model.
    """
    middleware = self.middleware

    async def resolve_models(self, info):
        registry = middleware.persistence_registry
        resolved = []
        # TODO: get the correct connectors here... no submodel connectors are available
        for connection_info in registry.get_type_connection_info(model.__name__):
            retrieved_submodel: Submodel = await registry.get_connection(connection_info).provide()
            # Re-validate the retrieved data against the requested model type.
            resolved.append(model.model_validate(retrieved_submodel.model_dump()))
        return resolved

    resolve_models.__name__ = f"resolve_{model.__name__}"
    return resolve_models
create_graphe_pydantic_output_type_for_model(input_model, union_type=False)
Creates a Graphene object type for the given pydantic model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
Type[BaseModel] |
Pydantic model for which the Graphene Object Type should be created. |
required |
Returns:
Type | Description |
---|---|
PydanticObjectType |
Graphene Object type for the given pydantic model. |
Source code in aas_middleware\middleware\graphql_routers.py
def create_graphe_pydantic_output_type_for_model(
    input_model: typing.Type[BaseModel], union_type: bool = False
) -> PydanticObjectType:
    """
    Creates a Graphene object type for the given pydantic model.

    If the global registry already holds an entry whose model has the same
    name as ``input_model``, that entry is returned instead of creating a
    second type with the same name.

    Args:
        input_model (Type[BaseModel]): Pydantic model for which the Graphene Object Type should be created.
        union_type (bool): If True, ``add_class_method`` is applied to the newly created type. Defaults to False.

    Returns:
        PydanticObjectType: Graphene Object type for the given pydantic model.
    """
    graphene_model_registry = get_global_registry(PydanticObjectType)._registry
    for registered_model, registered_type in graphene_model_registry.items():
        # Compare names on both sides; comparing the class itself with a
        # name string never matches, which made this lookup a no-op.
        if input_model.__name__ == registered_model.__name__:
            return registered_type

    rework_default_list_to_default_factory(input_model)
    graphene_model = type(
        input_model.__name__,
        (PydanticObjectType,),
        {"Meta": type("Meta", (), {"model": input_model})},
    )
    if union_type:
        add_class_method(graphene_model)
    return graphene_model
create_graphe_pydantic_output_type_for_submodel_elements(model, union_type=False)
Create recursively graphene pydantic output types for submodels and submodel elements.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
Union[base.Submodel, base.SubmodelElementCollection, tuple, list, set] |
Submodel element for which the graphene pydantic output types should be created. |
required |
Source code in aas_middleware\middleware\graphql_routers.py
def create_graphe_pydantic_output_type_for_submodel_elements(
    model: Submodel, union_type: bool = False
) -> PydanticObjectType:
    """
    Create recursively graphene pydantic output types for submodels and submodel elements.

    Every submodel element type found on ``model`` is visited first (union
    members, submodel element collections and the type arguments of lists /
    tuples); finally the output type for ``model`` itself is created.

    Args:
        model (Submodel): Submodel or submodel element collection type for which the graphene pydantic output types should be created.
        union_type (bool): Passed on to the output type creation of ``model``. Defaults to False.

    Returns:
        PydanticObjectType: Graphene Object type for ``model``.
    """
    for attribute_type in get_all_submodel_elements_from_submodel(model).values():
        if union_type_check(attribute_type):
            for subtype in typing.get_args(attribute_type):
                create_graphe_pydantic_output_type_for_submodel_elements(
                    subtype, union_type=True
                )
        elif hasattr(attribute_type, "model_fields") and issubclass(
            attribute_type, SubmodelElementCollection
        ):
            create_graphe_pydantic_output_type_for_submodel_elements(attribute_type)
        elif is_typing_list_or_tuple(attribute_type):
            if not list_contains_any_submodel_element_collections(attribute_type):
                continue
            for nested_type in typing.get_args(attribute_type):
                if union_type_check(nested_type):
                    for subtype in typing.get_args(nested_type):
                        create_graphe_pydantic_output_type_for_submodel_elements(
                            subtype, union_type=True
                        )
                # Type arguments are not always classes (e.g. the Ellipsis in
                # Tuple[X, ...]); issubclass would raise TypeError on those.
                elif isinstance(nested_type, type) and issubclass(
                    nested_type, SubmodelElementCollection
                ):
                    create_graphe_pydantic_output_type_for_submodel_elements(
                        nested_type
                    )
    return create_graphe_pydantic_output_type_for_model(model, union_type)
get_base_query_and_mutation_classes()
Returns the base query and mutation classes for the GraphQL endpoint.
Returns:
Type | Description |
---|---|
tuple |
Tuple of the base query and mutation classes for the GraphQL endpoint. |
Source code in aas_middleware\middleware\graphql_routers.py
def get_base_query_and_mutation_classes() -> (
    typing.Tuple[typing.Type[graphene.ObjectType], typing.Type[graphene.ObjectType]]
):
    """
    Returns the base query and mutation classes for the GraphQL endpoint.

    New, empty classes are created on every call; the classes themselves
    (not instances) are returned.

    Returns:
        tuple: Tuple of the base query and mutation classes for the GraphQL endpoint.
    """

    # No docstrings on these classes on purpose: they are kept exactly as empty
    # as before so nothing about the generated schema changes.
    class Query(graphene.ObjectType):
        pass

    class Mutation(graphene.ObjectType):
        pass

    return Query, Mutation
is_typing_list_or_tuple(input_type)
Checks if the given type is a typing.List or typing.Tuple.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_type |
Any |
Type to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the given type is a typing.List or typing.Tuple, False otherwise. |
Source code in aas_middleware\middleware\graphql_routers.py
def is_typing_list_or_tuple(input_type: typing.Any) -> bool:
    """
    Checks if the given type is a parameterized list or tuple type.

    Args:
        input_type (typing.Any): Type to check.

    Returns:
        bool: True if the origin of the given type is list or tuple, False otherwise.
    """
    origin = typing.get_origin(input_type)
    return origin in (list, tuple)
middleware
Middleware
Middleware that can be used to generate a REST or GraphQL API from AAS and submodels, given either as pydantic models or in AAS object store format.
Source code in aas_middleware\middleware\middleware.py
class Middleware:
"""
Middleware that can be used to generate a REST or GraphQL API from aas' and submodels either in pydanctic models or in aas object store format.
"""
def __init__(self):
    """
    Creates an empty middleware without data models, callbacks, connectors or workflows.
    """
    # The FastAPI app is created lazily on first access of the `app` property.
    self._app: typing.Optional[FastAPI] = None
    self.meta_data: MiddlewareMetaData = MiddlewareMetaData()

    # Loaded data models, keyed by their name.
    self.data_models: typing.Dict[str, DataModel] = {}

    # Callbacks that are awaited in the lifespan of the app.
    self.on_start_up_callbacks: typing.List[typing.Callable] = []
    self.on_shutdown_callbacks: typing.List[typing.Callable] = []

    # Registries for persistence connections, connectors, workflows and mappers.
    self.persistence_registry: PersistenceConnectionRegistry = PersistenceConnectionRegistry()
    self.connection_registry: ConnectionRegistry = ConnectionRegistry()
    self.workflow_registry: WorkflowRegistry = WorkflowRegistry()
    self.mapper_registry: MapperRegistry = MapperRegistry()
def set_meta_data(self, title: str, description: str, version: str, contact: typing.Dict[str, str]):
    """
    Function to set the meta data of the middleware.

    Replaces the current meta data object with a new one built from the arguments.

    Args:
        title (str): The title of the middleware.
        description (str): The description of the middleware.
        version (str): The version of the middleware.
        contact (typing.Dict[str, str]): The contact information of the middleware.
    """
    meta_data_fields = {
        "title": title,
        "description": description,
        "version": version,
        "contact": contact,
    }
    self.meta_data = MiddlewareMetaData(**meta_data_fields)
def add_callback(self, callback_type: typing.Literal["on_start_up", "on_shutdown"], callback: typing.Callable, *args, **kwargs):
    """
    Function to add a callback to the middleware.

    The callback is stored together with the given arguments, so it can later
    be called without arguments.

    Args:
        callback_type (typing.Literal["on_start_up", "on_shutdown"]): The type of the callback.
        callback (typing.Callable): The callback function.
        *args: Positional arguments bound to the callback.
        **kwargs: Keyword arguments bound to the callback.

    Raises:
        ValueError: If callback_type is neither "on_start_up" nor "on_shutdown".
    """
    callbacks_by_type = {
        "on_start_up": self.on_start_up_callbacks,
        "on_shutdown": self.on_shutdown_callbacks,
    }
    # Reject unknown types instead of silently dropping the callback.
    if callback_type not in callbacks_by_type:
        raise ValueError(
            f"Unknown callback type {callback_type!r}. Use 'on_start_up' or 'on_shutdown'."
        )
    callbacks_by_type[callback_type].append(partial(callback, *args, **kwargs))
@asynccontextmanager
async def lifespan(self, app: FastAPI):
    """
    Function to create a lifespan for the middleware for all events on startup and shutdown.

    Startup: schedules all workflows marked ``on_startup``, awaits the start up
    callbacks and connects all connectors and persistence connectors.
    Shutdown: runs all workflows marked ``on_shutdown`` (interrupting them first
    if they are running), awaits the shutdown callbacks and disconnects all
    connectors and persistence connectors.

    Args:
        app (FastAPI): The FastAPI app that should be used for the lifespan.
    """
    # Keep references to the scheduled tasks for the whole lifespan: the event
    # loop only holds weak references, so an unreferenced task may be garbage
    # collected before it finishes.
    startup_tasks = []
    for workflow in self.workflow_registry.get_workflows():
        if workflow.on_startup:
            # TODO: make a case distinction for workflows that postpone start up or not...
            startup_tasks.append(asyncio.create_task(workflow.execute()))
    for callback in self.on_start_up_callbacks:
        await callback()
    for connector in self.connection_registry.connectors.values():
        await connector.connect()
    for persistence in self.persistence_registry.connectors.values():
        await persistence.connect()

    yield

    for workflow in self.workflow_registry.get_workflows():
        if workflow.on_shutdown:
            if workflow.running:
                await workflow.interrupt()
            await workflow.execute()
    for callback in self.on_shutdown_callbacks:
        await callback()
    for connector in self.connection_registry.connectors.values():
        await connector.disconnect()
    for persistence in self.persistence_registry.connectors.values():
        await persistence.disconnect()
@property
def app(self):
    """
    The FastAPI app of the middleware.

    The app is created on first access from the current meta data, with the
    lifespan of the middleware, a permissive CORS configuration and a root
    endpoint. Later accesses return the same app.
    """
    if self._app is None:
        app = FastAPI(
            title=self.meta_data.title,
            description=self.meta_data.description,
            version=self.meta_data.version,
            contact=self.meta_data.contact,
            license_info={
                "name": "MIT License",
                "url": "https://mit-license.org/",
            },
            lifespan=self.lifespan
        )
        app.add_middleware(
            # TODO: make CORS more sophisticated for individual connectors
            CORSMiddleware,
            allow_origins=["*"],
            # allow_credentials is a boolean flag, not a list of patterns.
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )
        self._app = app

        @app.get("/", response_model=str)
        async def root():
            return "Welcome to aas-middleware!"

    return self._app
def load_data_model(self, name: str, data_model: DataModel, persist_instances: bool = False):
    """
    Function to load a data model into the middleware to be used for synchronization.

    Args:
        name (str): The name of the data model.
        data_model (DataModel): Data model containing the types and values.
        persist_instances (bool): If True, every top level model of the data model is
            persisted by a start up callback. Defaults to False.
    """
    self.data_models[name] = data_model
    if not persist_instances:
        return
    for models_of_type in data_model.get_top_level_models().values():
        if not models_of_type:
            continue
        for model in models_of_type:
            # Persisting is deferred to start up, where it is awaited.
            self.add_callback("on_start_up", self.persist, name, model)
def load_json_models(
    self,
    json_models: typing.Optional[typing.Dict[str, typing.Any]] = None,
    all_fields_required: bool = False,
):
    """
    Function that is intended to load models from a json dict into the middleware for synchronization.

    NOTE: Not implemented yet. The arguments are currently ignored and nothing is loaded.

    Args:
        json_models (dict, optional): Dictionary of aas' and submodels. Defaults to None.
        all_fields_required (bool): If all fields are required in the models.
    """
    # TODO: use here the function to load a DataModel from a dict
    # for model_name, model_values in json_models.items():
    #     pydantic_model = get_pydantic_model_from_dict(
    #         model_values, model_name, all_fields_required
    #     )
    #     self.models.append(pydantic_model)
def load_model_instances(self, name: str, instances: typing.List[BaseModel]):
    """
    Function that builds a data model from pydantic model instances and loads it into the middleware.

    Args:
        name (str): The name of the data model.
        instances (typing.List[BaseModel]): List of pydantic model instances.
    """
    # Instances are not persisted here (persist_instances keeps its default).
    self.load_data_model(name, DataModel.from_models(*instances))
def load_pydantic_models(self, name: str, *models: typing.Type[BaseModel]):
    """
    Function that loads pydantic model types into the middleware as a data model that can be used for synchronization.

    Args:
        name (str): The name of the data model.
        *models (typing.Type[BaseModel]): Pydantic model types of the data model.
    """
    # NOTE(review): the model types are passed as one tuple, as before. Confirm
    # that DataModel.from_model_types expects a tuple and not unpacked types.
    data_model = DataModel.from_model_types(models)
    # load_data_model requires the name; omitting it raised a TypeError.
    self.load_data_model(name, data_model)
def load_aas_objectstore(self, models: model.DictObjectStore, name: str = "aas_object_store"):
    """
    Function that loads multiple aas and their submodels into the middleware that can be used for synchronization.

    Args:
        models (model.DictObjectStore): Object store of aas' and submodels.
        name (str): The name under which the resulting data model is loaded.
            Defaults to "aas_object_store".
    """
    data_model = BasyxFormatter().deserialize(models)
    # load_data_model requires a name; calling it without one raised a TypeError.
    self.load_data_model(name, data_model)
async def update_value(self, value: typing.Any, data_model_name: str, model_id: typing.Optional[str]=None, contained_model_id: typing.Optional[str]=None, field_name: typing.Optional[str]=None):
    """
    Function to update a value in the persistence.

    The value is handed to the persistence connector registered for the
    addressed entity. If a KeyError is raised on the way, the value is
    persisted as a new model of the data model instead.

    Args:
        value (typing.Any): The value to write.
        data_model_name (str): The name of the data model.
        model_id (typing.Optional[str]): The id of the model in the data model. Defaults to None.
        contained_model_id (typing.Optional[str]): The id of the contained model. Defaults to None.
        field_name (typing.Optional[str]): The name of the field, used as field id of the connection. Defaults to None.
    """
    connection_info = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_name,
    )
    try:
        await self.persistence_registry.get_connection(connection_info).consume(value)
    except KeyError:
        await self.persist(data_model_name, value)
async def get_value(self, data_model_name: str, model_id: typing.Optional[str]=None, contained_model_id: typing.Optional[str]=None, field_name: typing.Optional[str]=None) -> typing.Any:
    """
    Function to get a value from the persistence.

    Args:
        data_model_name (str): The name of the data model.
        model_id (typing.Optional[str]): The id of the model in the data model. Defaults to None.
        contained_model_id (typing.Optional[str]): The id of the contained model. Defaults to None.
        field_name (typing.Optional[str]): The name of the field, used as field id of the connection. Defaults to None.

    Returns:
        typing.Any: The value provided by the persistence connector.

    Raises:
        KeyError: If a KeyError occurs while looking up or reading from the connector.
    """
    connection_info = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_name,
    )
    try:
        provider = self.persistence_registry.get_connection(connection_info)
        provided_value = await provider.provide()
    except KeyError:
        raise KeyError(f"No provider found for {connection_info}")
    return provided_value
def add_default_persistence(self, persistence_factory: PersistenceFactory, data_model_name: typing.Optional[str], model_id: typing.Optional[Identifiable], model_type: typing.Type[typing.Any] = typing.Any):
    """
    Function to add a default persistence for a model.

    Args:
        persistence_factory (PersistenceFactory): The persistence factory to register.
        data_model_name (typing.Optional[str]): The name of the data model; it must already be loaded.
        model_id (typing.Optional[Identifiable]): Used as model id of the connection the factory is registered for.
        model_type (typing.Type[typing.Any]): The type the factory is registered for. Defaults to typing.Any.

    Raises:
        ValueError: If no data model with the given name is loaded.
    """
    if data_model_name not in self.data_models:
        raise ValueError(f"No data model {data_model_name} found.")
    self.persistence_registry.add_persistence_factory(
        ConnectionInfo(
            data_model_name=data_model_name,
            model_id=model_id,
            contained_model_id=None,
            field_id=None,
        ),
        model_type,
        persistence_factory,
    )
async def persist(self, data_model_name: str, model: typing.Optional[Identifiable]=None, persistence_factory: typing.Optional[PersistenceFactory]=None):
    """
    Function to add a model to the persistence.

    A persistence connection is registered for the model and the model is
    then written through the resulting connector.

    Args:
        data_model_name (str): The name of the data model.
        model (Identifiable): The model that should be persisted. Although the parameter
            has a default of None, a model is required.
        persistence_factory (PersistenceFactory): The persistence factory that should be used.

    Raises:
        ValueError: If no model is given or if the connection already exists.
    """
    # The connection is addressed by the id of the model, so None cannot be persisted.
    if model is None:
        raise ValueError("No model given. A model is required to persist it.")
    connection_info = ConnectionInfo(data_model_name=data_model_name, model_id=model.id, contained_model_id=None, field_id=None)
    if connection_info in self.persistence_registry.connections:
        raise ValueError(f"Connection {connection_info} already exists. Try using the existing connector or remove it first.")
    self.persistence_registry.add_to_persistence(connection_info, model, persistence_factory)
    connector = self.persistence_registry.get_connection(connection_info)
    # TODO: raise an error if consume is not possible and remove the persistence in the persistence registry
    await connector.consume(model)
def add_connector(self, connector_id: str, connector: Connector, model_type: typing.Type[typing.Any], data_model_name: typing.Optional[str]=None, model_id: typing.Optional[str]=None, contained_model_id: typing.Optional[str]=None, field_id: typing.Optional[str]=None):
    """
    Function to add a connector to the middleware.

    The connector is registered and a REST endpoint is generated for it. If a
    data model name is given, the connector is additionally connected to the
    addressed entity in the persistence.

    Args:
        connector_id (str): The name of the connector.
        connector (Connector): The connector that should be added.
        model_type (typing.Type[typing.Any]): The type of the values of the connector.
        data_model_name (typing.Optional[str]): The name of the data model to connect to. Defaults to None.
        model_id (typing.Optional[str]): The id of the model in the data model. Defaults to None.
        contained_model_id (typing.Optional[str]): The id of the contained model. Defaults to None.
        field_id (typing.Optional[str]): The id of the field in the model. Defaults to None.
    """
    self.connection_registry.add_connector(connector_id, connector, model_type)

    if not data_model_name:
        # Stand-alone connector without a persistence connection.
        self.generate_rest_endpoint_for_connector(connector_id)
        return

    self.connect_connector_to_persistence(connector_id, data_model_name, model_id, contained_model_id, field_id)
    connection_info = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_id,
    )
    self.generate_rest_endpoint_for_connector(connector_id, connection_info)
def generate_rest_endpoint_for_connector(self, connector_id: str, connection_info: typing.Optional[ConnectionInfo]=None):
    """
    Function to generate a REST endpoint for a connector.

    Args:
        connector_id (str): The name of a connector that is already registered.
        connection_info (typing.Optional[ConnectionInfo], optional): Persistence connection of the
            connector. If given, the persistence variant of the endpoint is generated. Defaults to None.

    Raises:
        ValueError: If no connector with the given id is registered.
    """
    registry = self.connection_registry
    if connector_id not in registry.connectors:
        raise ValueError(f"Connector {connector_id} not found.")

    connector = registry.get_connector(connector_id)
    model_type = registry.connection_types[connector_id]

    if connection_info:
        router = generate_persistence_connector_endpoint(connector_id, connector, connection_info, model_type)
    else:
        router = generate_connector_endpoint(connector_id, connector, model_type)
    self.app.include_router(router)
# TODO: handle also async connectors!!
def connect_connector_to_persistence(self, connector_id: str, data_model_name: str, model_id: typing.Optional[str]=None, contained_model_id: typing.Optional[str]=None, field_id: typing.Optional[str]=None, persistence_mapper: typing.Optional[Mapper]=None, external_mapper: typing.Optional[Mapper]=None, formatter: typing.Optional[Formatter]=None):
    """
    Function to connect a connector to a data entity in the middleware.

    Args:
        connector_id (str): The name of a connector that is already registered.
        data_model_name (str): The name of the data model used for identifying the data model in the middleware.
        model_id (typing.Optional[str], optional): The id of the model in the data model. Defaults to None.
        contained_model_id (typing.Optional[str], optional): The id of the contained model. Defaults to None.
        field_id (typing.Optional[str], optional): The id of the field in the model. Defaults to None.
        persistence_mapper (typing.Optional[Mapper], optional): The mapper that should be used. Defaults to None.
        external_mapper (typing.Optional[Mapper], optional): The mapper that should be used. Defaults to None.
        formatter (typing.Optional[Formatter], optional): The formatter that should be used. Defaults to None.
    """
    connection_info = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_id,
    )
    registry = self.connection_registry
    connector = registry.get_connector(connector_id)
    # The type registered together with the connector.
    registered_type = registry.connection_types[connector_id]
    registry.add_connection(connector_id, connection_info, connector, registered_type)
    synchronize_connector_with_persistence(
        connector,
        connection_info,
        self.persistence_registry,
        persistence_mapper,
        external_mapper,
        formatter,
    )
def workflow(
    self,
    *args,
    on_startup: bool = False,
    on_shutdown: bool = False,
    interval: typing.Optional[float] = None,
    **kwargs
):
    """
    Decorator factory that registers the decorated function as a workflow in the middleware.

    The decorated function is turned into a workflow with ``Workflow.define``, added to the
    workflow registry, and a router created by ``generate_workflow_endpoint`` is included
    in the app. The decorated function itself is returned unchanged.

    Args:
        *args: Positional arguments forwarded to ``Workflow.define``.
        on_startup (bool): Forwarded to ``Workflow.define``. Defaults to False.
        on_shutdown (bool): Forwarded to ``Workflow.define``. Defaults to False.
        interval (typing.Optional[float]): Forwarded to ``Workflow.define``. Defaults to None.
        **kwargs: Keyword arguments forwarded to ``Workflow.define``.

    Returns:
        The decorator that performs the registration.
    """
    def register(func):
        defined_workflow = Workflow.define(
            func,
            *args,
            on_startup=on_startup,
            on_shutdown=on_shutdown,
            interval=interval,
            **kwargs
        )
        self.workflow_registry.add_workflow(defined_workflow)
        endpoint_router = generate_workflow_endpoint(defined_workflow)
        self.app.include_router(endpoint_router)
        # Hand back the undecorated function so it stays directly callable.
        return func

    return register
def connect_workflow_to_persistence_provider(
    self,
    workflow_id: str,
    arg_name: str,
    data_model_name: str,
    model_id: typing.Optional[str]=None,
    contained_model_id: typing.Optional[str]=None,
    field_id: typing.Optional[str]=None,
    persistence_mapper: typing.Optional[Mapper]=None,
    external_mapper: typing.Optional[Mapper]=None,
    formatter: typing.Optional[Formatter]=None,
):
    """
    Function to connect a workflow to a data entity in the middleware that acts as provider.

    Args:
        workflow_id (str): The name of the workflow in the workflow registry.
        arg_name (str): Forwarded to the synchronization (presumably the workflow argument to fill - confirm).
        data_model_name (str): The name of the data model used for identifying the data model in the middleware.
        model_id (typing.Optional[str], optional): The id of the model in the data model. Defaults to None.
        contained_model_id (typing.Optional[str], optional): The id of a model contained in the model. Defaults to None.
        field_id (typing.Optional[str], optional): The id of the field in the model. Defaults to None.
        persistence_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
        external_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
        formatter (typing.Optional[Formatter], optional): Formatter forwarded to the synchronization. Defaults to None.
    """
    target = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_id,
    )
    registered_workflow = self.workflow_registry.get_workflow(workflow_id)
    synchronize_workflow_with_persistence_provider(
        registered_workflow,
        arg_name,
        target,
        self.persistence_registry,
        persistence_mapper,
        external_mapper,
        formatter,
    )
    # TODO: update workflow endpoint to have optional arguments
    # TODO: register mappers, formatters in middleware and add endpoint for them, also add connection to workflow registry
def connect_workflow_to_persistence_consumer(
    self,
    workflow_id: str,
    data_model_name: str,
    model_id: typing.Optional[str]=None,
    contained_model_id: typing.Optional[str]=None,
    field_id: typing.Optional[str]=None,
    external_mapper: typing.Optional[Mapper]=None,
    formatter: typing.Optional[Formatter]=None,
):
    """
    Function to connect a workflow to a data entity in the middleware that acts as consumer.

    Args:
        workflow_id (str): The name of the workflow in the workflow registry.
        data_model_name (str): The name of the data model used for identifying the data model in the middleware.
        model_id (typing.Optional[str], optional): The id of the model in the data model. Defaults to None.
        contained_model_id (typing.Optional[str], optional): The id of a model contained in the model. Defaults to None.
        field_id (typing.Optional[str], optional): The id of the field in the model. Defaults to None.
        external_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
        formatter (typing.Optional[Formatter], optional): Formatter forwarded to the synchronization. Defaults to None.
    """
    target = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_id,
    )
    registered_workflow = self.workflow_registry.get_workflow(workflow_id)
    synchronize_workflow_with_persistence_consumer(
        registered_workflow,
        target,
        self.persistence_registry,
        external_mapper,
        formatter,
    )
    # TODO: register mappers, formatters in middleware and add endpoint for them, also add connection to workflow registry
def generate_model_registry_api(self):
    """
    Adds a REST API so that new models can be registered and unregistered from the Middleware.

    After including the registry router, the route list of the app is reordered so that the
    registry routes come directly after the first five routes instead of at the end.
    """
    # TODO: validate if this works and add it to the admin api...
    registry_router = generate_model_api(middleware_instance=self)
    self.app.include_router(registry_router)
    registry_count = len(registry_router.routes)
    # Number of leading routes that keep their position (presumably the app's built-in routes - confirm).
    constant_count = 5
    leading_routes = self.app.router.routes[:constant_count]
    registry_routes = self.app.routes[-registry_count:]
    remaining_routes = self.app.routes[constant_count:-registry_count]
    self.app.router.routes = leading_routes + registry_routes + remaining_routes
def generate_rest_api_for_data_model(self, data_model_name: str):
    """
    Generates a REST API with CRUD operations for aas' and submodels from the loaded models.

    Args:
        data_model_name (str): The name under which the data model was loaded into the middleware.

    Raises:
        KeyError: If no data model with this name has been loaded.
    """
    loaded_model = self.data_models[data_model_name]
    router_builder = RestRouter(loaded_model, data_model_name, self)
    router_builder.generate_endpoints()
def generate_graphql_api_for_data_model(self, data_model_name: str):
    """
    Generates a GraphQL API with query operations for aas' and submodels from the loaded models.

    Args:
        data_model_name (str): The name under which the data model was loaded into the middleware.

    Raises:
        KeyError: If no data model with this name has been loaded.
    """
    loaded_model = self.data_models[data_model_name]
    router_builder = GraphQLRouter(loaded_model, data_model_name, self)
    router_builder.generate_graphql_endpoint()
add_callback(self, callback_type, callback, *args, **kwargs)
Function to add a callback to the middleware.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback_type |
Literal["on_start_up", "on_shutdown"] |
The type of the callback. |
required |
callback |
Callable |
The callback function. |
required |
Source code in aas_middleware\middleware\middleware.py
def add_callback(self, callback_type: typing.Literal["on_start_up", "on_shutdown"], callback: typing.Callable, *args, **kwargs):
    """
    Function to add a callback to the middleware.

    The callback is bound to the given arguments with ``functools.partial`` and stored in the
    list of start up or shutdown callbacks, which are awaited without arguments later on.

    Args:
        callback_type (typing.Literal["on_start_up", "on_shutdown"]): The type of the callback.
        callback (typing.Callable): The callback function.
        *args: Positional arguments bound to the callback.
        **kwargs: Keyword arguments bound to the callback.

    Raises:
        ValueError: If callback_type is neither "on_start_up" nor "on_shutdown".
    """
    functional_callback = partial(callback, *args, **kwargs)
    if callback_type == "on_start_up":
        self.on_start_up_callbacks.append(functional_callback)
    elif callback_type == "on_shutdown":
        self.on_shutdown_callbacks.append(functional_callback)
    else:
        # Fail loudly: silently ignoring a misspelled type (e.g. "on_startup")
        # would mean the callback is never executed.
        raise ValueError(
            f"Unknown callback type {callback_type!r}. Use 'on_start_up' or 'on_shutdown'."
        )
add_connector(self, connector_id, connector, model_type, data_model_name=None, model_id=None, contained_model_id=None, field_id=None)
Function to add a connector to the middleware.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connector_id |
str |
The name of the connector. |
required |
connector |
Connector |
The connector that should be added. |
required |
Source code in aas_middleware\middleware\middleware.py
def add_connector(
    self,
    connector_id: str,
    connector: Connector,
    model_type: typing.Type[typing.Any],
    data_model_name: typing.Optional[str]=None,
    model_id: typing.Optional[str]=None,
    contained_model_id: typing.Optional[str]=None,
    field_id: typing.Optional[str]=None,
):
    """
    Function to add a connector to the middleware and to generate a REST endpoint for it.

    If a data model name is given, the connector is additionally connected to the persistence
    of the addressed data entity.

    Args:
        connector_id (str): The name of the connector.
        connector (Connector): The connector that should be added.
        model_type (typing.Type[typing.Any]): The type registered together with the connector.
        data_model_name (typing.Optional[str], optional): The name of the data model to connect to. Defaults to None.
        model_id (typing.Optional[str], optional): The id of the model in the data model. Defaults to None.
        contained_model_id (typing.Optional[str], optional): The id of a model contained in the model. Defaults to None.
        field_id (typing.Optional[str], optional): The id of the field in the model. Defaults to None.
    """
    self.connection_registry.add_connector(connector_id, connector, model_type)
    if not data_model_name:
        # Stand-alone connector: only expose it via REST.
        self.generate_rest_endpoint_for_connector(connector_id)
        return
    self.connect_connector_to_persistence(connector_id, data_model_name, model_id, contained_model_id, field_id)
    target = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_id,
    )
    self.generate_rest_endpoint_for_connector(connector_id, target)
add_default_persistence(self, persistence_factory, data_model_name, model_id, model_type=typing.Any)
Function to add a default persistence for a model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_model_name |
str |
The name of the data model. |
required |
model |
Identifiable |
The model that should be persisted. |
required |
Source code in aas_middleware\middleware\middleware.py
def add_default_persistence(
    self,
    persistence_factory: PersistenceFactory,
    data_model_name: typing.Optional[str],
    model_id: typing.Optional[Identifiable],
    model_type: typing.Type[typing.Any] = typing.Any,
):
    """
    Function to add a default persistence factory for models of a loaded data model.

    Args:
        persistence_factory (PersistenceFactory): The factory registered in the persistence registry.
        data_model_name (str): The name of the data model.
        model_id: The id of the model the persistence is registered for (None is passed through as is).
        model_type (typing.Type[typing.Any]): The model type the factory is registered for. Defaults to typing.Any.

    Raises:
        ValueError: If no data model with the given name is loaded.
    """
    if data_model_name not in self.data_models:
        raise ValueError(f"No data model {data_model_name} found.")
    target = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=None,
        field_id=None,
    )
    self.persistence_registry.add_persistence_factory(target, model_type, persistence_factory)
connect_connector_to_persistence(self, connector_id, data_model_name, model_id=None, contained_model_id=None, field_id=None, persistence_mapper=None, external_mapper=None, formatter=None)
Function to connect a connector to a data entity in the middleware.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connector_id |
str |
The name of the connector. |
required |
connector |
Connector |
The connector that should be connected. |
required |
data_model_name |
str |
The name of the data model used for identifying the data model in the middleware. |
required |
model_id |
Optional[str] |
The id of the model in the data model. Defaults to None. |
None |
field_id |
Optional[str] |
The id of the field in the model. Defaults to None. |
None |
model_type |
Type[Any] |
The type of the model. Defaults to typing.Any. |
required |
persistence_mapper |
Optional[Mapper] |
The mapper that should be used. Defaults to None. |
None |
external_mapper |
Optional[Mapper] |
The mapper that should be used. Defaults to None. |
None |
formatter |
Optional[Formatter] |
The formatter that should be used. Defaults to None. |
None |
Source code in aas_middleware\middleware\middleware.py
def connect_connector_to_persistence(
    self,
    connector_id: str,
    data_model_name: str,
    model_id: typing.Optional[str]=None,
    contained_model_id: typing.Optional[str]=None,
    field_id: typing.Optional[str]=None,
    persistence_mapper: typing.Optional[Mapper]=None,
    external_mapper: typing.Optional[Mapper]=None,
    formatter: typing.Optional[Formatter]=None,
):
    """
    Function to connect an already registered connector to a data entity in the middleware.

    Args:
        connector_id (str): The id under which the connector is stored in the connection registry.
        data_model_name (str): The name of the data model used for identifying the data model in the middleware.
        model_id (typing.Optional[str], optional): The id of the model in the data model. Defaults to None.
        contained_model_id (typing.Optional[str], optional): The id of a model contained in the model. Defaults to None.
        field_id (typing.Optional[str], optional): The id of the field in the model. Defaults to None.
        persistence_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
        external_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
        formatter (typing.Optional[Formatter], optional): Formatter forwarded to the synchronization. Defaults to None.
    """
    target = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_id,
    )
    registry = self.connection_registry
    # Fetch what was registered for this id, then record the new connection next to it.
    registered_connector = registry.get_connector(connector_id)
    registered_type = registry.connection_types[connector_id]
    registry.add_connection(connector_id, target, registered_connector, registered_type)
    synchronize_connector_with_persistence(
        registered_connector,
        target,
        self.persistence_registry,
        persistence_mapper,
        external_mapper,
        formatter,
    )
connect_workflow_to_persistence_consumer(self, workflow_id, data_model_name, model_id=None, contained_model_id=None, field_id=None, external_mapper=None, formatter=None)
Function to connect a workflow to a data entity in the middleware.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow_id |
str |
The name of the workflow. |
required |
data_model_name |
str |
The name of the data model used for identifying the data model in the middleware. |
required |
model_id |
Optional[str] |
The id of the model in the data model. Defaults to None. |
None |
field_id |
Optional[str] |
The id of the field in the model. Defaults to None. |
None |
mapper |
Optional[Mapper] |
The mapper that should be used. Defaults to None. |
required |
formatter |
Optional[Formatter] |
The formatter that should be used. Defaults to None. |
None |
Source code in aas_middleware\middleware\middleware.py
def connect_workflow_to_persistence_consumer(
    self,
    workflow_id: str,
    data_model_name: str,
    model_id: typing.Optional[str]=None,
    contained_model_id: typing.Optional[str]=None,
    field_id: typing.Optional[str]=None,
    external_mapper: typing.Optional[Mapper]=None,
    formatter: typing.Optional[Formatter]=None,
):
    """
    Function to connect a workflow to a data entity in the middleware that acts as consumer.

    Args:
        workflow_id (str): The name of the workflow in the workflow registry.
        data_model_name (str): The name of the data model used for identifying the data model in the middleware.
        model_id (typing.Optional[str], optional): The id of the model in the data model. Defaults to None.
        contained_model_id (typing.Optional[str], optional): The id of a model contained in the model. Defaults to None.
        field_id (typing.Optional[str], optional): The id of the field in the model. Defaults to None.
        external_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
        formatter (typing.Optional[Formatter], optional): Formatter forwarded to the synchronization. Defaults to None.
    """
    target = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_id,
    )
    registered_workflow = self.workflow_registry.get_workflow(workflow_id)
    synchronize_workflow_with_persistence_consumer(
        registered_workflow,
        target,
        self.persistence_registry,
        external_mapper,
        formatter,
    )
    # TODO: register mappers, formatters in middleware and add endpoint for them, also add connection to workflow registry
connect_workflow_to_persistence_provider(self, workflow_id, arg_name, data_model_name, model_id=None, contained_model_id=None, field_id=None, persistence_mapper=None, external_mapper=None, formatter=None)
Function to connect a workflow to a data entity in the middleware.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow_id |
str |
The name of the workflow. |
required |
data_model_name |
str |
The name of the data model used for identifying the data model in the middleware. |
required |
model_id |
Optional[str] |
The id of the model in the data model. Defaults to None. |
None |
field_id |
Optional[str] |
The id of the field in the model. Defaults to None. |
None |
mapper |
Optional[Mapper] |
The mapper that should be used. Defaults to None. |
required |
formatter |
Optional[Formatter] |
The formatter that should be used. Defaults to None. |
None |
Source code in aas_middleware\middleware\middleware.py
def connect_workflow_to_persistence_provider(
    self,
    workflow_id: str,
    arg_name: str,
    data_model_name: str,
    model_id: typing.Optional[str]=None,
    contained_model_id: typing.Optional[str]=None,
    field_id: typing.Optional[str]=None,
    persistence_mapper: typing.Optional[Mapper]=None,
    external_mapper: typing.Optional[Mapper]=None,
    formatter: typing.Optional[Formatter]=None,
):
    """
    Function to connect a workflow to a data entity in the middleware that acts as provider.

    Args:
        workflow_id (str): The name of the workflow in the workflow registry.
        arg_name (str): Forwarded to the synchronization (presumably the workflow argument to fill - confirm).
        data_model_name (str): The name of the data model used for identifying the data model in the middleware.
        model_id (typing.Optional[str], optional): The id of the model in the data model. Defaults to None.
        contained_model_id (typing.Optional[str], optional): The id of a model contained in the model. Defaults to None.
        field_id (typing.Optional[str], optional): The id of the field in the model. Defaults to None.
        persistence_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
        external_mapper (typing.Optional[Mapper], optional): Mapper forwarded to the synchronization. Defaults to None.
        formatter (typing.Optional[Formatter], optional): Formatter forwarded to the synchronization. Defaults to None.
    """
    target = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_id,
    )
    registered_workflow = self.workflow_registry.get_workflow(workflow_id)
    synchronize_workflow_with_persistence_provider(
        registered_workflow,
        arg_name,
        target,
        self.persistence_registry,
        persistence_mapper,
        external_mapper,
        formatter,
    )
    # TODO: update workflow endpoint to have optional arguments
    # TODO: register mappers, formatters in middleware and add endpoint for them, also add connection to workflow registry
generate_graphql_api_for_data_model(self, data_model_name)
Generates a GraphQL API with query operations for aas' and submodels from the loaded models.
Source code in aas_middleware\middleware\middleware.py
def generate_graphql_api_for_data_model(self, data_model_name: str):
    """
    Generates a GraphQL API with query operations for aas' and submodels from the loaded models.

    Args:
        data_model_name (str): The name under which the data model was loaded into the middleware.

    Raises:
        KeyError: If no data model with this name has been loaded.
    """
    loaded_model = self.data_models[data_model_name]
    router_builder = GraphQLRouter(loaded_model, data_model_name, self)
    router_builder.generate_graphql_endpoint()
generate_model_registry_api(self)
Adds a REST API so that new models can be registered and unregistered from the Middleware.
Source code in aas_middleware\middleware\middleware.py
def generate_model_registry_api(self):
    """
    Adds a REST API so that new models can be registered and unregistered from the Middleware.

    After including the registry router, the route list of the app is reordered so that the
    registry routes come directly after the first five routes instead of at the end.
    """
    # TODO: validate if this works and add it to the admin api...
    registry_router = generate_model_api(middleware_instance=self)
    self.app.include_router(registry_router)
    registry_count = len(registry_router.routes)
    # Number of leading routes that keep their position (presumably the app's built-in routes - confirm).
    constant_count = 5
    leading_routes = self.app.router.routes[:constant_count]
    registry_routes = self.app.routes[-registry_count:]
    remaining_routes = self.app.routes[constant_count:-registry_count]
    self.app.router.routes = leading_routes + registry_routes + remaining_routes
generate_rest_api_for_data_model(self, data_model_name)
Generates a REST API with CRUD operations for aas' and submodels from the loaded models.
Source code in aas_middleware\middleware\middleware.py
def generate_rest_api_for_data_model(self, data_model_name: str):
    """
    Generates a REST API with CRUD operations for aas' and submodels from the loaded models.

    Args:
        data_model_name (str): The name under which the data model was loaded into the middleware.

    Raises:
        KeyError: If no data model with this name has been loaded.
    """
    loaded_model = self.data_models[data_model_name]
    router_builder = RestRouter(loaded_model, data_model_name, self)
    router_builder.generate_endpoints()
generate_rest_endpoint_for_connector(self, connector_id, connection_info=None)
Function to generate a REST endpoint for a connector.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connector_id |
str |
description |
required |
connection_info |
Optional[ConnectionInfo] |
description. Defaults to None. |
None |
Exceptions:
Type | Description |
---|---|
ValueError |
description |
Source code in aas_middleware\middleware\middleware.py
def generate_rest_endpoint_for_connector(self, connector_id: str, connection_info: typing.Optional[ConnectionInfo]=None):
    """
    Function to generate a REST endpoint for a connector and include it in the app.

    Args:
        connector_id (str): The id of a connector that is registered in the connection registry.
        connection_info (typing.Optional[ConnectionInfo], optional): If given, a persistence connector
            endpoint is generated for this connection, otherwise a plain connector endpoint. Defaults to None.

    Raises:
        ValueError: If no connector with the given id is registered.
    """
    registry = self.connection_registry
    if connector_id not in registry.connectors:
        raise ValueError(f"Connector {connector_id} not found.")
    connector = registry.get_connector(connector_id)
    model_type = registry.connection_types[connector_id]
    if connection_info:
        router = generate_persistence_connector_endpoint(connector_id, connector, connection_info, model_type)
    else:
        router = generate_connector_endpoint(connector_id, connector, model_type)
    self.app.include_router(router)
get_value(self, data_model_name, model_id=None, contained_model_id=None, field_name=None)
async
Function to get a value from the persistence.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_model_name |
str |
description |
required |
model_id |
Optional[str] |
description |
None |
field_name |
Optional[str] |
description |
None |
Returns:
Type | Description |
---|---|
Any |
description |
Source code in aas_middleware\middleware\middleware.py
async def get_value(self, data_model_name: str, model_id: typing.Optional[str]=None, contained_model_id: typing.Optional[str]=None, field_name: typing.Optional[str]=None) -> typing.Any:
    """
    Function to get a value from the persistence.

    Args:
        data_model_name (str): The name of the data model the value belongs to.
        model_id (typing.Optional[str]): The id of the model in the data model. Defaults to None.
        contained_model_id (typing.Optional[str]): The id of a model contained in the model. Defaults to None.
        field_name (typing.Optional[str]): The field of the model; used as field id of the connection. Defaults to None.

    Returns:
        typing.Any: Whatever the persistence connector of the addressed connection provides.

    Raises:
        KeyError: If a KeyError occurs while looking up or querying the persistence connector.
    """
    target = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_name,
    )
    try:
        provider = self.persistence_registry.get_connection(target)
        provided_value = await provider.provide()
    except KeyError:
        raise KeyError(f"No provider found for {target}")
    return provided_value
lifespan(self, app)
Function to create a lifespan for the middleware for all events on startup and shutdown.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app |
FastAPI |
The FastAPI app that should be used for the lifespan. |
required |
Source code in aas_middleware\middleware\middleware.py
@asynccontextmanager
async def lifespan(self, app: FastAPI):
    """
    Function to create a lifespan for the middleware for all events on startup and shutdown.

    On startup, workflows flagged with ``on_startup`` are scheduled as tasks, the start up
    callbacks are awaited and all connectors and persistence connectors are connected.
    On shutdown, workflows flagged with ``on_shutdown`` are executed (a running one is
    interrupted first), the shutdown callbacks are awaited and all connectors and
    persistence connectors are disconnected.

    Args:
        app (FastAPI): The FastAPI app that should be used for the lifespan.
    """
    # The event loop only keeps weak references to tasks. Holding them in this
    # list (alive until the lifespan ends) prevents a startup workflow from
    # being garbage collected before it has finished.
    startup_tasks = []
    for workflow in self.workflow_registry.get_workflows():
        if workflow.on_startup:
            # TODO: make a case distinction for workflows that postpone start up or not...
            startup_tasks.append(asyncio.create_task(workflow.execute()))
    for callback in self.on_start_up_callbacks:
        await callback()
    for connector in self.connection_registry.connectors.values():
        await connector.connect()
    for persistence in self.persistence_registry.connectors.values():
        await persistence.connect()
    yield
    for workflow in self.workflow_registry.get_workflows():
        if workflow.on_shutdown:
            if workflow.running:
                await workflow.interrupt()
            await workflow.execute()
    for callback in self.on_shutdown_callbacks:
        await callback()
    for connector in self.connection_registry.connectors.values():
        await connector.disconnect()
    for persistence in self.persistence_registry.connectors.values():
        await persistence.disconnect()
load_aas_objectstore(self, models)
Function that loads multiple AAS and their submodels into the middleware so that they can be used for synchronization.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
models |
List[model.DictObjectStore] |
Object store of aas' and submodels |
required |
Source code in aas_middleware\middleware\middleware.py
def load_aas_objectstore(self, models: model.DictObjectStore, name: str = "aas_objectstore"):
    """
    Function that loads multiple aas and their submodels into the middleware so that they can be used for synchronization.

    Args:
        models (model.DictObjectStore): Object store of aas' and submodels.
        name (str): The name under which the resulting data model is loaded. Defaults to "aas_objectstore".
    """
    data_model = BasyxFormatter().deserialize(models)
    # load_data_model requires the data model name as first argument; calling it
    # with the data model only (as before) always raised a TypeError.
    self.load_data_model(name, data_model)
load_data_model(self, name, data_model, persist_instances=False)
Function to load a data model into the middleware to be used for synchronization.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the data model. |
required |
data_model |
DataModel |
Data model containing the types and values. |
required |
persist_instances |
bool |
If the instances of the data model should be persisted. |
False |
Source code in aas_middleware\middleware\middleware.py
def load_data_model(self, name: str, data_model: DataModel, persist_instances: bool = False):
    """
    Function to load a data model into the middleware to be used for synchronization.

    Args:
        name (str): The name of the data model.
        data_model (DataModel): Data model containing the types and values.
        persist_instances (bool): If the instances of the data model should be persisted.
            Persisting is not done immediately: for every top level model an "on_start_up"
            callback is registered that calls ``persist``.
    """
    self.data_models[name] = data_model
    if not persist_instances:
        return
    for models_of_type in data_model.get_top_level_models().values():
        # Empty (or missing) groups simply contribute no callbacks.
        for model in models_of_type or ():
            self.add_callback("on_start_up", self.persist, name, model)
load_json_models(self, json_models=None, all_fields_required=False)
Function that loads models from a JSON dict into the middleware so that they can be used for synchronization.
The function is used with a dict that contains the objects.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
json_models |
dict |
Dictionary of aas' and submodels. |
None |
all_fields_required |
bool |
If all fields are required in the models. |
False |
Source code in aas_middleware\middleware\middleware.py
def load_json_models(
    self,
    json_models: typing.Optional[typing.Dict[str, typing.Any]] = None,
    all_fields_required: bool = False,
):
    """
    Function intended to load models from a json dict into the middleware so that they can be used for synchronization.

    NOTE: The loading logic is not implemented yet (see the TODO below). Calling this
    function currently does nothing and returns None.

    Args:
        json_models (dict): Dictionary of aas' and submodels.
        all_fields_required (bool): If all fields are required in the models.
    """
    # TODO: use here the function to load a DataModel from a dict
    # for model_name, model_values in json_models.items():
    #     pydantic_model = get_pydantic_model_from_dict(
    #         model_values, model_name, all_fields_required
    #     )
    #     self.models.append(pydantic_model)
load_model_instances(self, name, instances)
Function that loads pydantic model instances into the middleware as a data model that can be used for synchronization.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the data model. |
required |
instances |
List[BaseModel] |
List of pydantic model instances. |
required |
Source code in aas_middleware\middleware\middleware.py
def load_model_instances(self, name: str, instances: typing.List[BaseModel]):
    """
    Function that loads pydantic model instances into the middleware as a data model that can be used for synchronization.

    Args:
        name (str): The name of the data model.
        instances (typing.List[BaseModel]): List of pydantic model instances.
    """
    # The instances are unpacked into a single DataModel, which is then loaded under the given name.
    self.load_data_model(name, DataModel.from_models(*instances))
load_pydantic_models(self, name, *models)
Function that loads pydantic models into the middleware so that they can be used for synchronization.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
models |
List[Type[BaseModel]] |
List of pydantic models. |
() |
Source code in aas_middleware\middleware\middleware.py
def load_pydantic_models(self, name: str, *models: typing.Type[BaseModel]):
    """
    Function that loads pydantic models into the middleware so that they can be used for synchronization.

    Args:
        name (str): The name under which the resulting data model is loaded.
        *models (typing.Type[BaseModel]): The pydantic model types.
    """
    # NOTE(review): the tuple is handed over as a single argument, unlike
    # DataModel.from_models(*instances) - confirm which form from_model_types expects.
    data_model = DataModel.from_model_types(models)
    # load_data_model requires the name as first argument; it was omitted before,
    # which made every call fail with a TypeError.
    self.load_data_model(name, data_model)
persist(self, data_model_name, model=None, persistence_factory=None)
async
Function to add a model to the persistence.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_model_name |
str |
The name of the data model. |
required |
model |
Identifiable |
The model that should be persisted. |
None |
persistence_factory |
PersistenceFactory |
The persistence factory that should be used. |
None |
Exceptions:
Type | Description |
---|---|
ValueError |
If the connection already exists. |
Source code in aas_middleware\middleware\middleware.py
async def persist(self, data_model_name: str, model: typing.Optional[Identifiable]=None, persistence_factory: typing.Optional[PersistenceFactory]=None):
    """
    Function to add a model to the persistence.

    Args:
        data_model_name (str): The name of the data model.
        model (Identifiable): The model that should be persisted. Although the parameter
            defaults to None, a model is required because its id identifies the connection.
        persistence_factory (PersistenceFactory): The persistence factory that should be used.

    Raises:
        ValueError: If no model is given or if the connection already exists.
    """
    if model is None:
        # Without this guard the method failed with an opaque AttributeError on ``model.id``.
        raise ValueError("A model is required to persist it.")
    connection_info = ConnectionInfo(data_model_name=data_model_name, model_id=model.id, contained_model_id=None, field_id=None)
    if connection_info in self.persistence_registry.connections:
        raise ValueError(f"Connection {connection_info} already exists. Try using the existing connector or remove it first.")
    self.persistence_registry.add_to_persistence(connection_info, model, persistence_factory)
    connector = self.persistence_registry.get_connection(connection_info)
    # TODO: raise an error if consume is not possible and remove the persistence in the persistence registry
    await connector.consume(model)
set_meta_data(self, title, description, version, contact)
Function to set the meta data of the middleware.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
title |
str |
The title of the middleware. |
required |
description |
str |
The description of the middleware. |
required |
version |
str |
The version of the middleware. |
required |
contact |
Dict[str, str] |
The contact information of the middleware. |
required |
license_info |
Dict[str, str] |
The license information of the middleware. |
required |
Source code in aas_middleware\middleware\middleware.py
def set_meta_data(self, title: str, description: str, version: str, contact: typing.Dict[str, str]):
    """
    Function to set the meta data of the middleware.

    Args:
        title (str): The title of the middleware.
        description (str): The description of the middleware.
        version (str): The version of the middleware.
        contact (typing.Dict[str, str]): The contact information of the middleware.
    """
    # The license information is not set here; the meta data model fills it in itself.
    meta_data_values = {
        "title": title,
        "description": description,
        "version": version,
        "contact": contact,
    }
    self.meta_data = MiddlewareMetaData(**meta_data_values)
update_value(self, value, data_model_name, model_id=None, contained_model_id=None, field_name=None)
async
Function to update a value in the persistence.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_model_name |
str |
description |
required |
model_id |
Optional[str] |
description |
None |
field_name |
Optional[str] |
description |
None |
value |
Any |
description |
required |
Source code in aas_middleware\middleware\middleware.py
async def update_value(self, value: typing.Any, data_model_name: str, model_id: typing.Optional[str]=None, contained_model_id: typing.Optional[str]=None, field_name: typing.Optional[str]=None):
    """
    Function to update a value in the persistence.

    If a KeyError occurs while looking up or updating the persistence connector,
    the value is persisted as a new model via ``persist`` instead.

    Args:
        value (typing.Any): The new value handed to the persistence connector.
        data_model_name (str): The name of the data model the value belongs to.
        model_id (typing.Optional[str]): The id of the model in the data model. Defaults to None.
        contained_model_id (typing.Optional[str]): The id of a model contained in the model. Defaults to None.
        field_name (typing.Optional[str]): The field of the model; used as field id of the connection. Defaults to None.
    """
    target = ConnectionInfo(
        data_model_name=data_model_name,
        model_id=model_id,
        contained_model_id=contained_model_id,
        field_id=field_name,
    )
    try:
        persistence_connector = self.persistence_registry.get_connection(target)
        await persistence_connector.consume(value)
    except KeyError:
        await self.persist(data_model_name, value)
MiddlewareMetaData (BaseModel)
Meta data for the middleware.
Source code in aas_middleware\middleware\middleware.py
class MiddlewareMetaData(BaseModel):
    """
    Meta data for the middleware.

    Every field has a default, so the model can be created without arguments.
    """
    # Title of the middleware.
    title: str = "aas-middleware"
    # Description of the middleware.
    description: str = """
The aas-middleware allows to convert aas models to pydantic models and generate a REST or GraphQL API from them.
"""
    # Version of the middleware; defaults to the version of the aas_middleware package.
    version: str = Field(default=aas_middleware.VERSION)
    # Contact information of the middleware.
    contact: typing.Dict[str, str] = {
        "name": "Sebastian Behrendt",
        "email": "sebastian.behrendt@kit.edu",
    }
    # License information of the middleware; the default is produced by get_license_info.
    license_info: typing.Dict[str, str] = Field(init=False, default_factory=get_license_info)
model_registry_api
generate_model_api(middleware_instance)
Generates endpoints to register and unregister models from the middleware.
Returns:
Type | Description |
---|---|
APIRouter |
FastAPI router with endpoints to register and unregister models. |
Source code in aas_middleware\middleware\model_registry_api.py
def generate_model_api(middleware_instance: Middleware) -> APIRouter:
    """
    Generates endpoints to register, update and unregister models from the middleware.

    Args:
        middleware_instance (Middleware): The middleware whose models are managed by the endpoints.

    Returns:
        APIRouter: FastAPI router with endpoints to list, register, update and delete models.
    """
    # TODO: Also allow to retrieve and post models as JSONSchema -> with required / non-required fields.
    router = APIRouter(
        prefix="",
        tags=["Model registry"],
        responses={404: {"description": "Not found"}},
    )

    def is_registered(model_name: str) -> bool:
        # A model counts as registered if a model class with this name is known to the middleware.
        return any(
            model_name == registered.__name__
            for registered in middleware_instance.models
        )

    def ensure_ids_present(model_name: str, model: dict) -> None:
        # The model itself and every nested dict (submodel) need an "id" entry.
        if "id" not in model:
            raise HTTPException(
                403, f"Mandatory field id is missing for the model <{model_name}>."
            )
        for key, value in model.items():
            if isinstance(value, dict) and "id" not in value:
                raise HTTPException(
                    403,
                    f"Mandatory field id is missing in submodel <{key}> for model <{model_name}>.",
                )

    @router.get("/get_models", response_model=list)
    async def get_models() -> List[Dict[str, str]]:
        return [
            recursive_model_example_string_reformatter(registered.schema())
            for registered in middleware_instance.models
        ]

    @router.post(
        "/register_model",
        response_model=dict,
    )
    async def post_model(model_name: str, model: dict) -> Dict[str, str]:
        if is_registered(model_name):
            raise HTTPException(
                403,
                f"A model with the name {model_name} exists already! Please update the existing model.",
            )
        ensure_ids_present(model_name, model)
        register_model_from_middleware(model_name, model, middleware_instance)
        return {"message": f"Succesfully created API for model {model_name}."}

    @router.put("/update_model", response_model=dict)
    async def update_model(model_name: str, model: dict) -> Dict[str, str]:
        if not is_registered(model_name):
            raise HTTPException(
                403,
                f"A model with the name {model_name} does not exist yet! Please post a new model.",
            )
        ensure_ids_present(model_name, model)
        # An update replaces the model: remove the old registration, then register anew.
        delete_model_from_middleware(model_name, middleware_instance)
        register_model_from_middleware(model_name, model, middleware_instance)
        return {"message": f"Succesfully updated API for model {model_name}."}

    @router.delete("/delete_model", response_model=dict)
    async def delete_model(model_name: str):
        if not is_registered(model_name):
            raise HTTPException(
                404, f"No model registered in middleware with name <{model_name}>"
            )
        delete_model_from_middleware(model_name, middleware_instance)
        return {"message": f"Succesfully deleted API for model {model_name}."}

    return router
remove_model_routes_from_app(app, model_name)
Function removes routes from app that contain the model_name as first route separator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app |
FastAPI |
FastAPI app to remove routes from |
required |
model_name |
str |
model_name of model to remove from API of app |
required |
Source code in aas_middleware\middleware\model_registry_api.py
def remove_model_routes_from_app(app: FastAPI, model_name: str):
    """
    Removes every route of the app that belongs to the given model and refreshes the OpenAPI schema.

    Args:
        app (FastAPI): FastAPI app to remove routes from
        model_name (str): model_name of model to remove from API of app
    """
    matching_positions = [
        position
        for position, route in enumerate(app.routes)
        if route_belongs_to_model(route, model_name)
    ]
    # Delete from the back so the remaining positions stay valid while deleting.
    for position in reversed(matching_positions):
        del app.routes[position]
    update_openapi(app)
update_openapi(app)
Updates the openAPI schema of a fastAPI app during runtime to register updates.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app |
FastAPI |
app where the openapi schema should be updated. |
required |
Source code in aas_middleware\middleware\model_registry_api.py
def update_openapi(app: FastAPI):
    """
    Regenerates the OpenAPI schema of a FastAPI app at runtime and stores it on the app.

    Args:
        app (FastAPI): app where the openapi schema should be updated.
    """
    # Collect the schema inputs from the app's current state, including its current routes.
    schema_options = {
        "title": app.title,
        "version": app.version,
        "openapi_version": app.openapi_version,
        "description": app.description,
        "terms_of_service": app.terms_of_service,
        "contact": app.contact,
        "license_info": app.license_info,
        "routes": app.routes,
        "tags": app.openapi_tags,
        "servers": app.servers,
    }
    app.openapi_schema = get_openapi(**schema_options)
registries
ConnectionInfo (BaseModel)
Class that contains the information of a connection of a provider and a consumer to the persistence layer.
Source code in aas_middleware\middleware\registries.py
class ConnectionInfo(BaseModel):
    """
    Describes which part of a data model a provider or consumer is connected to in the persistence layer.

    Only ``data_model_name`` is mandatory; the optional ids narrow the connection down to a model,
    a contained model or a field.
    """

    data_model_name: str
    model_id: typing.Optional[str] = None
    contained_model_id: typing.Optional[str] = None
    field_id: typing.Optional[str] = None

    # Instances are used as dictionary keys by the registries, hence the frozen configuration.
    model_config = ConfigDict(frozen=True, protected_namespaces=())

    @property
    def connection_type(self) -> typing.Literal["data_model", "model", "contained_model", "field"]:
        """
        Most specific level addressed by this connection info.

        The levels are checked from the outside in: a ``contained_model_id`` only counts when
        ``model_id`` is set, and a ``field_id`` only counts when both other ids are set.
        """
        if not self.model_id:
            return "data_model"
        if not self.contained_model_id:
            return "model"
        if not self.field_id:
            return "contained_model"
        return "field"
ConnectionRegistry
Class that manages the connections of the middleware.
Source code in aas_middleware\middleware\registries.py
class ConnectionRegistry:
    """
    Class that manages the connections of the middleware.

    Connectors are stored by id together with the type they were registered with. Every
    connection info maps to the ids of the connectors attached to it.
    """

    def __init__(self):
        self.connectors: typing.Dict[str, Connector] = {}
        self.connection_types: typing.Dict[str, typing.Type[Connector]] = {}
        self.connections: typing.Dict[ConnectionInfo, typing.List[str]] = {}

    def get_connector_id(self, connector: Connector) -> str:
        """
        Function to get a connector id from the connection manager.

        Args:
            connector (Connector): The connector of the connection.

        Returns:
            str: The id of the first registered connector that compares equal to ``connector``.

        Raises:
            KeyError: If the connector is not in the connection manager.
        """
        for connector_id, connector_ in self.connectors.items():
            if connector == connector_:
                return connector_id
        raise KeyError(f"Connector {connector} is not in the connection manager.")

    def get_connector(self, connector_id: str) -> Connector:
        """
        Function to get a connector from the connection manager.

        Args:
            connector_id (str): The id of the connector.

        Returns:
            Connector: The connector registered under the id.

        Raises:
            KeyError: If the connector id is not in the connection manager.
        """
        return self.connectors[connector_id]

    def add_connector(self, connector_id: str, connector: Connector, connection_type: typing.Type[Connector]):
        """
        Function to add a connector to the connection manager.

        An existing entry with the same id is overwritten.

        Args:
            connector_id (str): The name of the connector.
            connector (Connector): The connector to be added.
            connection_type (typing.Type[Connector]): The type of the connector.
        """
        self.connectors[connector_id] = connector
        self.connection_types[connector_id] = connection_type

    def add_connection(self, connector_id: str, connection_info: ConnectionInfo, connector: Connector, type_connection_info: typing.Type[typing.Any]):
        """
        Function to add a connection to the connection manager.

        Args:
            connector_id (str): The id of the connector.
            connection_info (ConnectionInfo): The connection info of the connection.
            connector (Connector): The connector of the connection.
            type_connection_info (typing.Type[typing.Any]): The type of the connection info of the connection.
        """
        self.connections.setdefault(connection_info, []).append(connector_id)
        self.add_connector(connector_id, connector, type_connection_info)

    def get_connections(self, connection_info: ConnectionInfo) -> typing.List[typing.Tuple[Connector, typing.Type[typing.Any]]]:
        """
        Function to get the connections of a connection info from the connection manager.

        Args:
            connection_info (ConnectionInfo): The connection info of the connection.

        Returns:
            typing.List[typing.Tuple[Connector, typing.Type[typing.Any]]]: Each connector attached to the connection info, paired with its registered type.

        Raises:
            KeyError: If the connection info is not in the connection manager.
        """
        return [
            (self.get_connector(connector_id), self.connection_types[connector_id])
            for connector_id in self.connections[connection_info]
        ]

    def get_data_model_connection_info(self, data_model_name: str) -> typing.List[ConnectionInfo]:
        """
        Function to get the connection info of a data model.

        Args:
            data_model_name (str): The name of the data model.

        Returns:
            typing.List[ConnectionInfo]: The connection infos that refer to the data model.
        """
        return [
            connection_info
            for connection_info in self.connections
            if connection_info.data_model_name == data_model_name
        ]

    def get_model_connection_info(self, model_id: str) -> typing.List[ConnectionInfo]:
        """
        Function to get the connection info of a model.

        Args:
            model_id (str): The id of the model.

        Returns:
            typing.List[ConnectionInfo]: The connection infos that refer to the model.
        """
        return [
            connection_info
            for connection_info in self.connections
            if connection_info.model_id == model_id
        ]

    def get_field_connection_info(self, field_id: str) -> typing.List[ConnectionInfo]:
        """
        Function to get the connection info of a field.

        Args:
            field_id (str): The id of the field.

        Returns:
            typing.List[ConnectionInfo]: The connection infos that refer to the field.
        """
        return [
            connection_info
            for connection_info in self.connections
            if connection_info.field_id == field_id
        ]

    def get_type_connection_info(self, type_name: str) -> typing.List[ConnectionInfo]:
        """
        Function to get the connection info of a type.

        Args:
            type_name (str): The name of the type.

        Returns:
            typing.List[ConnectionInfo]: The connection infos with at least one connector registered for the type. Each connection info is listed at most once.
        """
        # self.connections only stores connector ids; the registered type lives in
        # self.connection_types, so it has to be looked up by id.
        return [
            connection_info
            for connection_info, connector_ids in self.connections.items()
            if any(
                self.connection_types[connector_id].__name__ == type_name
                for connector_id in connector_ids
            )
        ]
add_connection(self, connector_id, connection_info, connector, type_connection_info)
Function to add a connection to the connection manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connector_id |
str |
The id of the connector. |
required |
connection_info |
ConnectionInfo |
The connection info of the connection. |
required |
connector |
Connector |
The connector of the connection. |
required |
type_connection_info |
Type[Any] |
The type of the connection info of the connection. |
required |
Source code in aas_middleware\middleware\registries.py
def add_connection(self, connector_id: str, connection_info: ConnectionInfo, connector: Connector, type_connection_info: typing.Type[typing.Any]):
    """
    Attaches a connector to a connection info and registers the connector itself.

    Args:
        connector_id (str): The id of the connector.
        connection_info (ConnectionInfo): The connection info of the connection.
        connector (Connector): The connector of the connection.
        type_connection_info (typing.Type[typing.Any]): The type of the connection info of the connection.
    """
    # Create the id list on first use, then record the new connector id.
    attached_ids = self.connections.setdefault(connection_info, [])
    attached_ids.append(connector_id)
    self.add_connector(connector_id, connector, type_connection_info)
add_connector(self, connector_id, connector, connection_type)
Function to add a connector to the connection manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connector_id |
str |
The name of the connector. |
required |
connector |
Connector |
The connector to be added. |
required |
connection_type |
Type[Connector] |
The type of the connector. |
required |
Source code in aas_middleware\middleware\registries.py
def add_connector(self, connector_id: str, connector: Connector, connection_type: typing.Type[Connector]):
    """
    Stores a connector and its type under the given id, overwriting any previous entry.

    Args:
        connector_id (str): The name of the connector.
        connector (Connector): The connector to be added.
        connection_type (typing.Type[Connector]): The type of the connector.
    """
    # Both lookups share the connector id as key.
    self.connection_types[connector_id] = connection_type
    self.connectors[connector_id] = connector
get_connections(self, connection_info)
Function to get a connection from the connection manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connection_info |
ConnectionInfo |
The connection info of the connection. |
required |
Returns:
Type | Description |
---|---|
List[Tuple[Connector, Type[Any]]] |
The connectors registered for the connection info, each paired with the type stored for it. |
Source code in aas_middleware\middleware\registries.py
def get_connections(self, connection_info: ConnectionInfo) -> typing.List[typing.Tuple[Connector, typing.Type[typing.Any]]]:
    """
    Returns every connector attached to a connection info together with its registered type.

    Args:
        connection_info (ConnectionInfo): The connection info of the connection.

    Returns:
        typing.List[typing.Tuple[Connector, typing.Type[typing.Any]]]: Pairs of connector and registered type.

    Raises:
        KeyError: If the connection info is not in the connection manager.
    """
    return [
        (self.get_connector(attached_id), self.connection_types[attached_id])
        for attached_id in self.connections[connection_info]
    ]
get_connector(self, connector_id)
Function to get a connector from the connection manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connector_id |
str |
The id of the connector. |
required |
Returns:
Type | Description |
---|---|
Connector |
The connector of the connection. |
Exceptions:
Type | Description |
---|---|
KeyError |
If the connector id is not in the connection manager. |
Source code in aas_middleware\middleware\registries.py
def get_connector(self, connector_id: str) -> Connector:
    """
    Looks up a registered connector by its id.

    Args:
        connector_id (str): The id of the connector.

    Returns:
        Connector: The connector registered under the id.

    Raises:
        KeyError: If the connector id is not in the connection manager.
    """
    registered_connector = self.connectors[connector_id]
    return registered_connector
get_connector_id(self, connector)
Function to get a connector id from the connection manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connector |
Connector |
The connector of the connection. |
required |
Returns:
Type | Description |
---|---|
str |
The id of the connector. |
Exceptions:
Type | Description |
---|---|
KeyError |
If the connector is not in the connection manager. |
Source code in aas_middleware\middleware\registries.py
def get_connector_id(self, connector: Connector) -> str:
    """
    Finds the id under which a connector is registered.

    Args:
        connector (Connector): The connector of the connection.

    Returns:
        str: The id of the first registered connector that compares equal to ``connector``.

    Raises:
        KeyError: If the connector is not in the connection manager.
    """
    not_found = object()
    connector_id = next(
        (
            candidate_id
            for candidate_id, registered in self.connectors.items()
            if connector == registered
        ),
        not_found,
    )
    if connector_id is not_found:
        raise KeyError(f"Connector {connector} is not in the connection manager.")
    return connector_id
get_data_model_connection_info(self, data_model_name)
Function to get the connection info of a data model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_model_name |
str |
The name of the data model. |
required |
Returns:
Type | Description |
---|---|
List[ConnectionInfo] |
The connection info of the data model. |
Source code in aas_middleware\middleware\registries.py
def get_data_model_connection_info(self, data_model_name: str) -> typing.List[ConnectionInfo]:
    """
    Collects the registered connection infos that refer to a data model.

    Args:
        data_model_name (str): The name of the data model.

    Returns:
        typing.List[ConnectionInfo]: The matching connection infos, in registration order.
    """
    return [
        candidate
        for candidate in self.connections
        if candidate.data_model_name == data_model_name
    ]
get_field_connection_info(self, field_id)
Function to get the connection info of a field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
field_id |
str |
The id of the field. |
required |
Returns:
Type | Description |
---|---|
List[ConnectionInfo] |
The connection info of the field. |
Source code in aas_middleware\middleware\registries.py
def get_field_connection_info(self, field_id: str) -> typing.List[ConnectionInfo]:
    """
    Collects the registered connection infos that refer to a field.

    Args:
        field_id (str): The id of the field.

    Returns:
        typing.List[ConnectionInfo]: The matching connection infos, in registration order.
    """
    return [
        candidate
        for candidate in self.connections
        if candidate.field_id == field_id
    ]
get_model_connection_info(self, model_id)
Function to get the connection info of a model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_id |
str |
The id of the model. |
required |
Returns:
Type | Description |
---|---|
List[ConnectionInfo] |
The connection info of the model. |
Source code in aas_middleware\middleware\registries.py
def get_model_connection_info(self, model_id: str) -> typing.List[ConnectionInfo]:
    """
    Collects the registered connection infos that refer to a model.

    Args:
        model_id (str): The id of the model.

    Returns:
        typing.List[ConnectionInfo]: The matching connection infos, in registration order.
    """
    return [
        candidate
        for candidate in self.connections
        if candidate.model_id == model_id
    ]
get_type_connection_info(self, type_name)
Function to get the connection info of a type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
type_name |
str |
The name of the type. |
required |
Returns:
Type | Description |
---|---|
List[ConnectionInfo] |
The connection info of the type. |
Source code in aas_middleware\middleware\registries.py
def get_type_connection_info(self, type_name: str) -> typing.List[ConnectionInfo]:
    """
    Function to get the connection info of a type.

    Args:
        type_name (str): The name of the type.

    Returns:
        typing.List[ConnectionInfo]: The connection infos with at least one connector registered for the type. Each connection info is listed at most once.
    """
    # self.connections maps each connection info to connector ids (strings); the type a
    # connector was registered with is stored in self.connection_types under that id.
    return [
        connection_info
        for connection_info, connector_ids in self.connections.items()
        if any(
            self.connection_types[connector_id].__name__ == type_name
            for connector_id in connector_ids
        )
    ]
MapperRegistry
Class that manages the mappers of the middleware.
Source code in aas_middleware\middleware\registries.py
class MapperRegistry:
    """
    Class that manages the mappers of the middleware.

    Mappers are stored by id together with the input and output types read from the type
    hints of their ``map`` method. Optionally, a mapper is linked to a pair of input and
    output connection infos.
    """

    def __init__(self):
        self.mappers: typing.Dict[str, Mapper] = {}
        self.mapper_input_types: typing.Dict[str, typing.Type] = {}
        self.mapper_output_types: typing.Dict[str, typing.Type] = {}
        self.connections: typing.Dict[typing.Tuple[ConnectionInfo, ConnectionInfo], str] = {}

    def add_mapper(self, mapper_id: str, mapper: Mapper, input_connection: typing.Optional[ConnectionInfo] = None, output_connection: typing.Optional[ConnectionInfo] = None):
        """
        Function to add a mapper to the registry.

        All arguments are validated before the registry is modified, so a failing call
        leaves the registry unchanged.

        Args:
            mapper_id (str): The name of the mapper.
            mapper (Mapper): The mapper to be added.
            input_connection (typing.Optional[ConnectionInfo]): The input connection of the mapper.
            output_connection (typing.Optional[ConnectionInfo]): The output connection of the mapper.

        Raises:
            ValueError: If only one of input and output connection is provided.
            KeyError: If ``mapper.map`` has no type hint for ``data`` or for its return value.
        """
        if (input_connection is None) != (output_connection is None):
            raise ValueError("Either None or both input and output connection must be provided.")
        type_hints = typing.get_type_hints(mapper.map)
        input_type = type_hints["data"]
        output_type = type_hints["return"]

        self.mappers[mapper_id] = mapper
        self.mapper_input_types[mapper_id] = input_type
        self.mapper_output_types[mapper_id] = output_type
        if input_connection is not None:
            self.connections[(input_connection, output_connection)] = mapper_id

    def get_mappers(self) -> typing.List[Mapper]:
        """
        Function to get the mappers in the registry.

        Returns:
            typing.List[Mapper]: The mappers in the registry
        """
        return list(self.mappers.values())

    def get_mapper(self, mapper_id: str) -> Mapper:
        """
        Function to get a mapper from the registry.

        Args:
            mapper_id (str): The id of the mapper.

        Returns:
            Mapper: The mapper from the registry.

        Raises:
            KeyError: If the mapper is not in the registry.
        """
        return self.mappers[mapper_id]

    def get_mapper_ids(self) -> typing.List[str]:
        """
        Function to get the names of the mappers in the registry.

        Returns:
            typing.List[str]: The names of the mappers in the registry.
        """
        return list(self.mappers)

    def get_mapper_connections(self) -> typing.List[typing.Tuple[ConnectionInfo, ConnectionInfo]]:
        """
        Function to get the connections of the mappers in the registry.

        Returns:
            typing.List[typing.Tuple[ConnectionInfo, ConnectionInfo]]: The connections of the mappers in the registry.
        """
        return list(self.connections)

    def get_mapper_by_input_connection(self, input_connection: ConnectionInfo) -> typing.Optional[Mapper]:
        """
        Function to get a mapper by the input connection.

        Args:
            input_connection (ConnectionInfo): The input connection of the mapper.

        Returns:
            typing.Optional[Mapper]: The first mapper linked to the input connection, or None if there is none.
        """
        for (registered_input, _), mapper_id in self.connections.items():
            if registered_input == input_connection:
                return self.mappers[mapper_id]
        return None

    def get_mapper_by_output_connection(self, output_connection: ConnectionInfo) -> typing.Optional[Mapper]:
        """
        Function to get a mapper by the output connection.

        Args:
            output_connection (ConnectionInfo): The output connection of the mapper.

        Returns:
            typing.Optional[Mapper]: The first mapper linked to the output connection, or None if there is none.
        """
        for (_, registered_output), mapper_id in self.connections.items():
            if registered_output == output_connection:
                return self.mappers[mapper_id]
        return None

    def get_connection_of_mapper(self, mapper_id: str) -> typing.List[typing.Tuple[ConnectionInfo, ConnectionInfo]]:
        """
        Function to get the connections of a mapper.

        Args:
            mapper_id (str): The id of the mapper.

        Returns:
            typing.List[typing.Tuple[ConnectionInfo, ConnectionInfo]]: The connections of the mapper; empty if the mapper has no connections.
        """
        # self.connections stores mapper ids as values, so compare ids and not mapper objects.
        return [
            connection
            for connection, registered_mapper_id in self.connections.items()
            if registered_mapper_id == mapper_id
        ]
add_mapper(self, mapper_id, mapper, input_connection=None, output_connection=None)
Function to add a mapper to the registry.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mapper_id |
str |
The name of the mapper. |
required |
mapper |
Mapper |
The mapper to be added. |
required |
input_connection |
Optional[ConnectionInfo] |
The input connection of the mapper. |
None |
output_connection |
Optional[ConnectionInfo] |
The output connection of the mapper. |
None |
Source code in aas_middleware\middleware\registries.py
def add_mapper(self, mapper_id: str, mapper: Mapper, input_connection: typing.Optional[ConnectionInfo] = None, output_connection: typing.Optional[ConnectionInfo] = None):
    """
    Function to add a mapper to the registry.

    All arguments are validated before the registry is modified, so a failing call
    leaves the registry unchanged.

    Args:
        mapper_id (str): The name of the mapper.
        mapper (Mapper): The mapper to be added.
        input_connection (typing.Optional[ConnectionInfo]): The input connection of the mapper.
        output_connection (typing.Optional[ConnectionInfo]): The output connection of the mapper.

    Raises:
        ValueError: If only one of input and output connection is provided.
        KeyError: If ``mapper.map`` has no type hint for ``data`` or for its return value.
    """
    if (input_connection is None) != (output_connection is None):
        raise ValueError("Either None or both input and output connection must be provided.")
    type_hints = typing.get_type_hints(mapper.map)
    input_type = type_hints["data"]
    output_type = type_hints["return"]

    self.mappers[mapper_id] = mapper
    self.mapper_input_types[mapper_id] = input_type
    self.mapper_output_types[mapper_id] = output_type
    if input_connection is not None:
        self.connections[(input_connection, output_connection)] = mapper_id
get_connection_of_mapper(self, mapper_id)
Function to get the connections of a mapper.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mapper_id |
str |
The id of the mapper. |
required |
Returns:
Type | Description |
---|---|
List[Tuple[ConnectionInfo, ConnectionInfo]] |
The connections of the mapper. |
Source code in aas_middleware\middleware\registries.py
def get_connection_of_mapper(self, mapper_id: str) -> typing.List[typing.Tuple[ConnectionInfo, ConnectionInfo]]:
    """
    Function to get the connections of a mapper.

    Args:
        mapper_id (str): The id of the mapper.

    Returns:
        typing.List[typing.Tuple[ConnectionInfo, ConnectionInfo]]: The connections of the mapper; empty if the mapper has no connections.
    """
    # self.connections stores mapper ids as values, so compare ids and not mapper objects.
    return [
        connection
        for connection, registered_mapper_id in self.connections.items()
        if registered_mapper_id == mapper_id
    ]
get_mapper(self, mapper_id)
Function to get a mapper from the registry.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mapper_id |
str |
The id of the mapper. |
required |
Returns:
Type | Description |
---|---|
Mapper |
The mapper from the registry. |
Exceptions:
Type | Description |
---|---|
KeyError |
If the mapper is not in the registry. |
Source code in aas_middleware\middleware\registries.py
get_mapper_by_input_connection(self, input_connection)
Function to get a mapper by the input connection.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_connection |
ConnectionInfo |
The input connection of the mapper. |
required |
Returns:
Type | Description |
---|---|
Mapper |
The mapper of the input connection. |
Source code in aas_middleware\middleware\registries.py
def get_mapper_by_input_connection(self, input_connection: ConnectionInfo) -> typing.Optional[Mapper]:
    """
    Finds the mapper that is linked to the given input connection.

    Args:
        input_connection (ConnectionInfo): The input connection of the mapper.

    Returns:
        typing.Optional[Mapper]: The first mapper linked to the input connection, or None if there is none.
    """
    linked_mappers = (
        self.mappers[linked_id]
        for link, linked_id in self.connections.items()
        if link[0] == input_connection
    )
    return next(linked_mappers, None)
get_mapper_by_output_connection(self, output_connection)
Function to get a mapper by the output connection.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_connection |
ConnectionInfo |
The output connection of the mapper. |
required |
Returns:
Type | Description |
---|---|
Mapper |
The mapper of the output connection. |
Source code in aas_middleware\middleware\registries.py
def get_mapper_by_output_connection(self, output_connection: ConnectionInfo) -> typing.Optional[Mapper]:
    """
    Finds the mapper that is linked to the given output connection.

    Args:
        output_connection (ConnectionInfo): The output connection of the mapper.

    Returns:
        typing.Optional[Mapper]: The first mapper linked to the output connection, or None if there is none.
    """
    linked_mappers = (
        self.mappers[linked_id]
        for link, linked_id in self.connections.items()
        if link[1] == output_connection
    )
    return next(linked_mappers, None)
get_mapper_connections(self)
Function to get the connections of the mappers in the registry.
Returns:
Type | Description |
---|---|
List[Tuple[ConnectionInfo, ConnectionInfo]] |
The connections of the mappers in the registry. |
Source code in aas_middleware\middleware\registries.py
def get_mapper_connections(self) -> typing.List[typing.Tuple[ConnectionInfo, ConnectionInfo]]:
    """
    Lists every (input, output) connection pair that has a mapper assigned.

    Returns:
        typing.List[typing.Tuple[ConnectionInfo, ConnectionInfo]]: The connections of the mappers in the registry.
    """
    connection_pairs = self.connections.keys()
    return [*connection_pairs]
get_mapper_ids(self)
Function to get the names of the mappers in the registry.
Returns:
Type | Description |
---|---|
List[str] |
The names of the mappers in the registry. |
get_mappers(self)
Function to get the mappers in the registry.
Returns:
Type | Description |
---|---|
List[Mapper] |
The mappers in the registry |
PersistenceConnectionRegistry (ConnectionRegistry)
Class that manages the connections of the middleware.
Source code in aas_middleware\middleware\registries.py
class PersistenceConnectionRegistry(ConnectionRegistry):
    """
    Class that manages the persistence connections of the middleware.

    In contrast to the base registry, every connection info maps to exactly one connector id,
    which is derived from the data model name and the model id.
    """

    def __init__(self):
        super().__init__()
        # Overrides the base mapping: one connector id per connection info instead of a list.
        self.connections: typing.Dict[ConnectionInfo, str] = {}
        # Entries are stored as (model type, factory) pairs, see add_persistence_factory.
        self.persistence_factories: typing.Dict[ConnectionInfo, typing.List[typing.Tuple[typing.Type[typing.Any], PersistenceFactory]]] = {}

    @staticmethod
    def _build_connector_id(data_model_name: str, model_id: str) -> str:
        # Single place that defines the naming scheme of persistence connector ids.
        return data_model_name + model_id + "_persistence"

    def get_connector_by_data_model_and_model_id(self, data_model_name: str, model_id: str) -> Connector:
        """
        Function to get a connector from the connection manager.

        Args:
            data_model_name (str): The name of the data model.
            model_id (str): The id of the model.

        Returns:
            Connector: The connector of the connection.

        Raises:
            KeyError: If the model id is not in the connection manager.
        """
        return self.get_connector(self._build_connector_id(data_model_name, model_id))

    def add_persistence_factory(self, connection_info: ConnectionInfo, model_type: typing.Type[typing.Any], persistence_factory: PersistenceFactory):
        """
        Function to add a persistence factory to the connection manager.

        Args:
            connection_info (ConnectionInfo): The connection info of the connection.
            model_type (typing.Type[typing.Any]): The model type the factory is responsible for.
            persistence_factory (PersistenceFactory): The persistence factory of the connection.
        """
        self.persistence_factories.setdefault(connection_info, []).append((model_type, persistence_factory))

    def get_default_persistence_factory(self, connection_info: ConnectionInfo, persisted_model_type: typing.Type[typing.Any]) -> PersistenceFactory:
        """
        Function to get the default persistence factory of a connection.

        Factories are looked up on data model level only. The first registered factory whose
        model type is a base class of (or equal to) the persisted model type is used.

        Args:
            connection_info (ConnectionInfo): The connection info of the connection.
            persisted_model_type (typing.Type[typing.Any]): The type of the model to persist.

        Returns:
            PersistenceFactory: The matching persistence factory, or a ModelConnector factory if none matches.
        """
        data_model_connection_info = ConnectionInfo(data_model_name=connection_info.data_model_name)
        if data_model_connection_info not in self.persistence_factories:
            logger.warning(f"No persistence factory found for {data_model_connection_info}. Using default persistence factory.")
            return PersistenceFactory(ModelConnector)
        for model_type, persistence_factory in self.persistence_factories[data_model_connection_info]:
            if issubclass(persisted_model_type, model_type):
                return persistence_factory
        logger.warning(f"No persistence factory found for {data_model_connection_info} and model type {persisted_model_type.__name__}. Using default persistence factory.")
        return PersistenceFactory(ModelConnector)

    def add_to_persistence(self, connection_info: ConnectionInfo, model: Identifiable, persistence_factory: typing.Optional[PersistenceFactory]):
        """
        Function to add a persistent connection to the connection manager.

        Args:
            connection_info (ConnectionInfo): The connection info of the connection.
            model (Identifiable): The model a connector is created for.
            persistence_factory (typing.Optional[PersistenceFactory]): Factory that creates the connector. If not given, the default factory for the model type is used.
        """
        if not persistence_factory:
            persistence_factory = self.get_default_persistence_factory(connection_info, type(model))
        connector = persistence_factory.create(model)
        self.add_connection(connection_info, connector, type(model))

    def add_connection(self, connection_info: ConnectionInfo, connector: Connector, type_connection_info: typing.Type[typing.Any]):
        """
        Function to add a connection to the connection manager.

        Args:
            connection_info (ConnectionInfo): The connection info of the connection.
            connector (Connector): The connector of the connection.
            type_connection_info (typing.Type[typing.Any]): The type stored for the connector.

        Raises:
            TypeError: If ``connection_info.model_id`` is None, because the connector id cannot be built.
        """
        connector_id = self._build_connector_id(connection_info.data_model_name, connection_info.model_id)
        self.add_connector(connector_id, connector, type_connection_info)
        self.connections[connection_info] = connector_id

    def remove_connection(self, connection_info: ConnectionInfo):
        """
        Function to remove a connection from the connection manager.

        The connector and its type are removed as well, unless another connection still
        refers to the same connector id.

        Args:
            connection_info (ConnectionInfo): The connection info of the connection.

        Raises:
            KeyError: If the connection info is not in the connection manager.
        """
        connector_id = self.connections.pop(connection_info)
        # Connector ids only depend on data model name and model id, so several
        # connection infos can share one connector.
        if connector_id not in self.connections.values():
            self.connectors.pop(connector_id, None)
            self.connection_types.pop(connector_id, None)

    def get_connection(self, connection_info: ConnectionInfo) -> Connector:
        """
        Function to get a connection from the connection manager.

        Args:
            connection_info (ConnectionInfo): The connection info of the connection.

        Returns:
            Connector: The connector of the connection.

        Raises:
            KeyError: If the connection info is not in the connection manager.
        """
        if connection_info not in self.connections:
            raise KeyError(f"Data model Connection info {connection_info} is not in the connection manager.")
        return self.get_connector(self.connections[connection_info])

    def get_type_connection_info(self, type_name: str) -> typing.List[ConnectionInfo]:
        """
        Function to get the connection info of a type.

        Args:
            type_name (str): The name of the type.

        Returns:
            typing.List[ConnectionInfo]: The connection infos whose connector is registered for the type.
        """
        return [
            connection_info
            for connection_info, connector_id in self.connections.items()
            if self.connection_types[connector_id].__name__ == type_name
        ]
add_connection(self, connection_info, connector, type_connection_info)
Function to add a connection to the connection manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connection_info |
ConnectionInfo |
The connection info of the connection. |
required |
connector |
Connector |
The connector of the connection. |
required |
Source code in aas_middleware\middleware\registries.py
def add_connection(self, connection_info: ConnectionInfo, connector: Connector, type_connection_info: typing.Type[typing.Any]):
    """
    Registers a connector for a connection info under a derived connector id.

    The id is the data model name followed by the model id and the suffix ``_persistence``.

    Args:
        connection_info (ConnectionInfo): The connection info of the connection.
        connector (Connector): The connector of the connection.
        type_connection_info (typing.Type[typing.Any]): The type stored for the connector.
    """
    id_prefix = connection_info.data_model_name + connection_info.model_id
    derived_id = id_prefix + "_persistence"
    self.add_connector(derived_id, connector, type_connection_info)
    self.connections[connection_info] = derived_id
add_persistence_factory(self, connection_info, model_type, persistence_factory)
Function to add a persistence factory to the connection manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connection_info |
ConnectionInfo |
The connection info of the connection. |
required |
persistence_factory |
PersistenceFactory |
The persistence factory of the connection. |
required |
Source code in aas_middleware\middleware\registries.py
def add_persistence_factory(self, connection_info: ConnectionInfo, model_type: typing.Type[typing.Any], persistence_factory: PersistenceFactory):
    """
    Registers a persistence factory for a model type under a connection info.

    Several factories can be registered under the same connection info; they are kept in
    registration order as ``(model_type, persistence_factory)`` pairs.

    Args:
        connection_info (ConnectionInfo): The connection info the factory is registered under.
        model_type (typing.Type[typing.Any]): The model type the factory is registered for.
        persistence_factory (PersistenceFactory): The persistence factory of the connection.
    """
    # The first registration for a connection info creates its list.
    registered_factories = self.persistence_factories.setdefault(connection_info, [])
    registered_factories.append((model_type, persistence_factory))
add_to_persistence(self, connection_info, model, persistence_factory)
Function to add a persistent connection to the connection manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connection_info |
ConnectionInfo |
The connection info of the connection. |
required |
Source code in aas_middleware\middleware\registries.py
def add_to_persistence(self, connection_info: ConnectionInfo, model: Identifiable, persistence_factory: typing.Optional[PersistenceFactory]):
    """
    Creates a connector for a model and adds it as a connection to the connection manager.

    Args:
        connection_info (ConnectionInfo): The connection info of the connection.
        model (Identifiable): The model the connector is created for; its type is registered
            together with the connection.
        persistence_factory (typing.Optional[PersistenceFactory]): The factory that creates the
            connector. If it is falsy (e.g. None), the factory returned by
            ``get_default_persistence_factory`` for the connection info and the type of the model is used.
    """
    model_type = type(model)
    factory = persistence_factory or self.get_default_persistence_factory(connection_info, model_type)
    self.add_connection(connection_info, factory.create(model), model_type)
get_connection(self, connection_info)
Function to get a connection from the connection manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connection_info |
ConnectionInfo |
The connection info of the connection. |
required |
Returns:
Type | Description |
---|---|
Connector |
The connector of the connection. |
Exceptions:
Type | Description |
---|---|
KeyError |
If the connection info is not in the connection manager. |
Source code in aas_middleware\middleware\registries.py
def get_connection(self, connection_info: ConnectionInfo) -> Connector:
    """
    Looks up the connector that was registered for a connection info.

    Args:
        connection_info (ConnectionInfo): The connection info of the connection.

    Returns:
        Connector: The connector of the connection.

    Raises:
        KeyError: If the connection info is not in the connection manager.
    """
    # Guard clause: unknown connection infos are rejected before the connector lookup.
    if connection_info not in self.connections:
        raise KeyError(f"Data model Connection info {connection_info} is not in the connection manager.")
    connector_id = self.connections[connection_info]
    return self.get_connector(connector_id)
get_connector_by_data_model_and_model_id(self, data_model_name, model_id)
Function to get a connector from the connection manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_model_name |
str |
The name of the data model. |
required |
model_id |
str |
The id of the model. |
required |
Returns:
Type | Description |
---|---|
Connector |
The connector of the connection. |
Exceptions:
Type | Description |
---|---|
KeyError |
If the model id is not in the connection manager. |
Source code in aas_middleware\middleware\registries.py
def get_connector_by_data_model_and_model_id(self, data_model_name: str, model_id: str) -> Connector:
    """
    Looks up a connector by the name of its data model and the id of its model.

    Args:
        data_model_name (str): The name of the data model.
        model_id (str): The id of the model.

    Returns:
        Connector: The connector returned by ``get_connector`` for the derived connector id.

    Raises:
        KeyError: If the model id is not in the connection manager (presumably raised by
            ``get_connector``, which is not visible here; confirm).
    """
    # Same id scheme as used when connections are added: "<data model name><model id>_persistence".
    return self.get_connector(data_model_name + model_id + "_persistence")
get_default_persistence_factory(self, connection_info, persisted_model_type)
Function to get the default persistence factory of a connection.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connection_info |
ConnectionInfo |
The connection info of the connection. |
required |
Returns:
Type | Description |
---|---|
PersistenceFactory |
The default persistence factory of the connection. |
Source code in aas_middleware\middleware\registries.py
def get_default_persistence_factory(self, connection_info: ConnectionInfo, persisted_model_type: typing.Type[typing.Any]) -> PersistenceFactory:
    """
    Determines the persistence factory to use by default for a model type of a data model.

    The factories are looked up under a connection info that only carries the data model name
    of the given connection info. The first registered factory whose model type is a
    superclass of (or equal to) the persisted model type is returned.

    Args:
        connection_info (ConnectionInfo): The connection info of the connection; only its
            data model name is used.
        persisted_model_type (typing.Type[typing.Any]): The type of the model to be persisted.

    Returns:
        PersistenceFactory: The matching registered factory, or a new
            ``PersistenceFactory(ModelConnector)`` (after logging a warning) if nothing matches.
    """
    data_model_key = ConnectionInfo(data_model_name=connection_info.data_model_name)
    if data_model_key in self.persistence_factories:
        # Registration order decides when several registered types match.
        for registered_type, registered_factory in self.persistence_factories[data_model_key]:
            if issubclass(persisted_model_type, registered_type):
                return registered_factory
        warning_text = f"No persistence factory found for {data_model_key} and model type {persisted_model_type.__name__}. Using default persistence factory."
    else:
        warning_text = f"No persistence factory found for {data_model_key}. Using default persistence factory."
    logger.warning(warning_text)
    return PersistenceFactory(ModelConnector)
get_type_connection_info(self, type_name)
Function to get the connection info of a type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
type_name |
str |
The name of the type. |
required |
Returns:
Type | Description |
---|---|
List[ConnectionInfo] |
The connection info of the type. |
Source code in aas_middleware\middleware\registries.py
def get_type_connection_info(self, type_name: str) -> typing.List[ConnectionInfo]:
    """
    Function to get the connection infos of all connections registered for a type.

    Args:
        type_name (str): The class name (``__name__``) of the model type to look for.

    Returns:
        typing.List[ConnectionInfo]: The connection infos whose registered model type has
            exactly that name, in the iteration order of ``self.connections``. The list is
            empty if no connection matches.
    """
    # The model type is stored per connector id, so it is resolved through the id of each connection.
    return [
        connection_info
        for connection_info, connector_id in self.connections.items()
        if self.connection_types[connector_id].__name__ == type_name
    ]
remove_connection(self, connection_info)
Function to remove a connection from the connection manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connection_info |
ConnectionInfo |
The connection info of the connection. |
required |
Source code in aas_middleware\middleware\registries.py
WorkflowRegistry
Class that manages the workflows of the registry.
Source code in aas_middleware\middleware\registries.py
class WorkflowRegistry:
    """
    Registry that keeps track of workflows and of the providers and consumers attached to them.
    """

    def __init__(self):
        # Workflows keyed by the name they report through get_name().
        self.workflows: typing.Dict[str, Workflow] = {}
        # Per workflow name: the (connection info, provider) pairs attached to the workflow.
        self.workflow_providers: typing.Dict[str, typing.List[typing.Tuple[ConnectionInfo, Provider]]] = {}
        # Per workflow name: the (connection info, consumer) pairs attached to the workflow.
        self.workflow_consumers: typing.Dict[str, typing.List[typing.Tuple[ConnectionInfo, Consumer]]] = {}

    def add_workflow(self, workflow: Workflow):
        """
        Stores a workflow under its name; a workflow already stored under that name is replaced.

        Args:
            workflow (Workflow): The workflow to be added.
        """
        workflow_name = workflow.get_name()
        self.workflows[workflow_name] = workflow

    def add_provider_to_workflow(self, workflow_name: str, connection_info: ConnectionInfo, provider: Provider):
        """
        Attaches a provider to a workflow.

        Args:
            workflow_name (str): The name of the workflow.
            connection_info (ConnectionInfo): The connection info of the provider.
            provider (Provider): The provider to be added.
        """
        # The first provider of a workflow creates its list.
        attached_providers = self.workflow_providers.setdefault(workflow_name, [])
        attached_providers.append((connection_info, provider))

    def add_consumer_to_workflow(self, workflow_name: str, connection_info: ConnectionInfo, connector: Connector):
        """
        Attaches a consumer to a workflow.

        Args:
            workflow_name (str): The name of the workflow.
            connection_info (ConnectionInfo): The connection info of the consumer.
            connector (Connector): The connector to be added as consumer.
        """
        # The first consumer of a workflow creates its list.
        attached_consumers = self.workflow_consumers.setdefault(workflow_name, [])
        attached_consumers.append((connection_info, connector))

    def get_workflows(self) -> typing.List[Workflow]:
        """
        Lists all workflows of the registry.

        Returns:
            typing.List[Workflow]: The workflows in the registry.
        """
        registered_workflows = self.workflows.values()
        return list(registered_workflows)

    def get_workflow(self, workflow_name: str) -> Workflow:
        """
        Looks up a workflow by its name.

        Args:
            workflow_name (str): The name of the workflow.

        Returns:
            Workflow: The workflow from the registry.

        Raises:
            KeyError: If the workflow is not in the registry.
        """
        workflow = self.workflows[workflow_name]
        return workflow

    def get_connections_of_workflow(self, workflow_name: str) -> typing.Tuple[typing.List[typing.Tuple[ConnectionInfo, Provider]], typing.List[typing.Tuple[ConnectionInfo, Consumer]]]:
        """
        Returns the providers and the consumers attached to a workflow.

        Args:
            workflow_name (str): The name of the workflow.

        Returns:
            typing.Tuple[typing.List[typing.Tuple[ConnectionInfo, Provider]], typing.List[typing.Tuple[ConnectionInfo, Consumer]]]:
                The (connection info, provider) pairs followed by the (connection info, consumer) pairs.

        Raises:
            KeyError: If no provider or no consumer was added for the workflow name.
        """
        providers = self.workflow_providers[workflow_name]
        consumers = self.workflow_consumers[workflow_name]
        return providers, consumers

    def get_providers(self, workflow_name: str) -> typing.List[typing.Tuple[ConnectionInfo, Provider]]:
        """
        Returns the providers attached to a workflow.

        Args:
            workflow_name (str): The name of the workflow.

        Returns:
            typing.List[typing.Tuple[ConnectionInfo, Provider]]: The providers of the workflow.

        Raises:
            KeyError: If no provider was added for the workflow name.
        """
        providers = self.workflow_providers[workflow_name]
        return providers

    def get_consumers(self, workflow_name: str) -> typing.List[typing.Tuple[ConnectionInfo, Consumer]]:
        """
        Returns the consumers attached to a workflow.

        Args:
            workflow_name (str): The name of the workflow.

        Returns:
            typing.List[typing.Tuple[ConnectionInfo, Consumer]]: The consumers of the workflow.

        Raises:
            KeyError: If no consumer was added for the workflow name.
        """
        consumers = self.workflow_consumers[workflow_name]
        return consumers

    def get_workflow_names(self) -> typing.List[str]:
        """
        Lists the names of all workflows of the registry.

        Returns:
            typing.List[str]: The names of the workflows in the registry.
        """
        # Iterating a dict yields its keys.
        return list(self.workflows)

    def get_workflow_descriptions(self) -> typing.List[WorkflowDescription]:
        """
        Lists the descriptions of all workflows of the registry.

        Returns:
            typing.List[WorkflowDescription]: The descriptions of the workflows in the registry.
        """
        registered_workflows = self.workflows.values()
        return [registered.get_description() for registered in registered_workflows]
add_consumer_to_workflow(self, workflow_name, connection_info, connector)
Function to add a consumer to a workflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow_name |
str |
The name of the workflow. |
required |
connection_info |
ConnectionInfo |
The connection info of the consumer. |
required |
connector |
Connector |
The connector to be added. |
required |
Source code in aas_middleware\middleware\registries.py
def add_consumer_to_workflow(self, workflow_name: str, connection_info: ConnectionInfo, connector: Connector):
    """
    Attaches a consumer to a workflow.

    Args:
        workflow_name (str): The name of the workflow.
        connection_info (ConnectionInfo): The connection info of the consumer.
        connector (Connector): The connector to be added as consumer.
    """
    # The first consumer of a workflow creates its list.
    attached_consumers = self.workflow_consumers.setdefault(workflow_name, [])
    attached_consumers.append((connection_info, connector))
add_provider_to_workflow(self, workflow_name, connection_info, provider)
Function to add a provider to a workflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow_name |
str |
The name of the workflow. |
required |
connection_info |
ConnectionInfo |
The connection info of the provider. |
required |
provider |
Provider |
The provider to be added. |
required |
Source code in aas_middleware\middleware\registries.py
def add_provider_to_workflow(self, workflow_name: str, connection_info: ConnectionInfo, provider: Provider):
    """
    Attaches a provider to a workflow.

    Args:
        workflow_name (str): The name of the workflow.
        connection_info (ConnectionInfo): The connection info of the provider.
        provider (Provider): The provider to be added.
    """
    # The first provider of a workflow creates its list.
    attached_providers = self.workflow_providers.setdefault(workflow_name, [])
    attached_providers.append((connection_info, provider))
add_workflow(self, workflow)
Function to add a workflow to the registry.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow |
Workflow |
The workflow to be added. |
required |
get_connections_of_workflow(self, workflow_name)
Function to get the connections of a workflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow_name |
str |
The name of the workflow. |
required |
Returns:
Type | Description |
---|---|
Tuple[List[Tuple[ConnectionInfo, Provider]], List[Tuple[ConnectionInfo, Consumer]]] |
The connections of the workflow. |
Source code in aas_middleware\middleware\registries.py
def get_connections_of_workflow(self, workflow_name: str) -> typing.Tuple[typing.List[typing.Tuple[ConnectionInfo, Provider]], typing.List[typing.Tuple[ConnectionInfo, Consumer]]]:
    """
    Returns the providers and the consumers attached to a workflow.

    Args:
        workflow_name (str): The name of the workflow.

    Returns:
        typing.Tuple[typing.List[typing.Tuple[ConnectionInfo, Provider]], typing.List[typing.Tuple[ConnectionInfo, Consumer]]]:
            The (connection info, provider) pairs followed by the (connection info, consumer) pairs.

    Raises:
        KeyError: If no provider or no consumer was added for the workflow name.
    """
    providers = self.workflow_providers[workflow_name]
    consumers = self.workflow_consumers[workflow_name]
    return providers, consumers
get_consumers(self, workflow_name)
Function to get the consumers of a workflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow_name |
str |
The name of the workflow. |
required |
Returns:
Type | Description |
---|---|
List[Tuple[ConnectionInfo, Consumer]] |
The consumers of the workflow. |
Source code in aas_middleware\middleware\registries.py
def get_consumers(self, workflow_name: str) -> typing.List[typing.Tuple[ConnectionInfo, Consumer]]:
    """
    Returns the consumers attached to a workflow.

    Args:
        workflow_name (str): The name of the workflow.

    Returns:
        typing.List[typing.Tuple[ConnectionInfo, Consumer]]: The consumers of the workflow.

    Raises:
        KeyError: If no consumer was added for the workflow name.
    """
    consumers = self.workflow_consumers[workflow_name]
    return consumers
get_providers(self, workflow_name)
Function to get the providers of a workflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow_name |
str |
The name of the workflow. |
required |
Returns:
Type | Description |
---|---|
List[Tuple[ConnectionInfo, Provider]] |
The providers of the workflow. |
Source code in aas_middleware\middleware\registries.py
def get_providers(self, workflow_name: str) -> typing.List[typing.Tuple[ConnectionInfo, Provider]]:
    """
    Returns the providers attached to a workflow.

    Args:
        workflow_name (str): The name of the workflow.

    Returns:
        typing.List[typing.Tuple[ConnectionInfo, Provider]]: The providers of the workflow.

    Raises:
        KeyError: If no provider was added for the workflow name.
    """
    providers = self.workflow_providers[workflow_name]
    return providers
get_workflow(self, workflow_name)
Function to get a workflow from the registry.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow_name |
str |
The name of the workflow. |
required |
Returns:
Type | Description |
---|---|
Workflow |
The workflow from the registry. |
Exceptions:
Type | Description |
---|---|
KeyError |
If the workflow is not in the registry. |
Source code in aas_middleware\middleware\registries.py
def get_workflow(self, workflow_name: str) -> Workflow:
    """
    Looks up a workflow by its name.

    Args:
        workflow_name (str): The name of the workflow.

    Returns:
        Workflow: The workflow from the registry.

    Raises:
        KeyError: If the workflow is not in the registry.
    """
    workflow = self.workflows[workflow_name]
    return workflow
get_workflow_descriptions(self)
Function to get the descriptions of the workflows in the registry.
Returns:
Type | Description |
---|---|
List[WorkflowDescription] |
The descriptions of the workflows in the registry. |
Source code in aas_middleware\middleware\registries.py
def get_workflow_descriptions(self) -> typing.List[WorkflowDescription]:
    """
    Lists the descriptions of all workflows of the registry.

    Returns:
        typing.List[WorkflowDescription]: The descriptions of the workflows in the registry.
    """
    registered_workflows = self.workflows.values()
    return [registered.get_description() for registered in registered_workflows]
get_workflow_names(self)
Function to get the names of the workflows in the registry.
Returns:
Type | Description |
---|---|
List[str] |
The names of the workflows in the registry. |
get_workflows(self)
Function to get the workflows in the registry.
Returns:
Type | Description |
---|---|
List[Workflow] |
The workflows in the registry. |
rest_routers
RestRouter
Source code in aas_middleware\middleware\rest_routers.py
class RestRouter:
    """
    Generates FastAPI routers with CRUD endpoints for the top-level types of a data model and
    registers them on the app of a middleware.
    """

    def __init__(self, data_model: DataModel, data_model_name: str, middleware: "Middleware"):
        """
        Args:
            data_model (DataModel): Data model whose top-level types get endpoints.
            data_model_name (str): Name of the data model; used to build connection infos and to persist models.
            middleware (Middleware): Middleware whose persistence registry and app are used.
        """
        self.data_model = data_model
        self.data_model_name = data_model_name
        # Same object as self.data_model; generate_endpoints reads the top-level types from this attribute.
        self.aas_data_model = data_model
        self.middleware = middleware

    def get_connector(self, item_id: str) -> Connector:
        """
        Returns the connector registered in the persistence registry of the middleware for the
        model with the given id in this router's data model.

        Args:
            item_id (str): Id of the model.

        Returns:
            Connector: The connector returned by ``persistence_registry.get_connection``.
        """
        return self.middleware.persistence_registry.get_connection(ConnectionInfo(data_model_name=self.data_model_name, model_id=item_id))

    def generate_endpoints_from_contained_model(
        self,
        aas_model_type: Type[AAS],
        attribute_name: str,
        submodel_model_type: Type[Submodel],
    ) -> APIRouter:
        """
        Generates CRUD endpoints for a submodel of a pydantic model representing an aas.

        The routes are registered under ``/<aas type name>/{item_id}/<attribute_name>``. GET and PUT
        are always registered; POST and DELETE are only registered if
        ``check_if_attribute_is_optional_in_aas`` reports the attribute as optional.

        Args:
            aas_model_type (Type[AAS]): Pydantic model representing the aas of the submodel.
            attribute_name (str): Name of the attribute of the aas that holds the submodel.
            submodel_model_type (Type[Submodel]): Pydantic model representing the submodel.

        Returns:
            APIRouter: FastAPI router with CRUD endpoints for the given submodel that performs Middleware synchronization.
        """
        model_name = aas_model_type.__name__
        optional_submodel = check_if_attribute_is_optional_in_aas(aas_model_type, attribute_name)
        # TODO: the data model name should be used for creating the endpoint
        # TODO: adjust that no aas or submodel reference appears in the router -> should work for all models.
        router = APIRouter(
            prefix=f"/{model_name}/{{item_id}}/{attribute_name}",
            tags=[model_name],
            responses={404: {"description": "Not found"}},
        )

        @router.get(
            "/",
            response_model=submodel_model_type,
        )
        async def get_item(item_id: str):
            # Any failure (including an unknown item id) is reported as HTTP 400.
            try:
                model = await self.get_connector(item_id).provide()
                return getattr(model, attribute_name)
            except Exception as e:
                raise HTTPException(
                    status_code=400, detail=f"Submodel with id {item_id} could not be retrieved. Error: {e}"
                )

        if optional_submodel:
            @router.post("/")
            async def post_item(item_id: str, item: submodel_model_type) -> Dict[str, str]:
                # The connector lookup happens outside the try block, so an error raised here is not converted to the HTTP 400 below.
                connector = self.get_connector(item_id)
                try:
                    provided_data = await connector.provide()
                    # TODO: update that the correct type is immediately returned -> using model validate inside the connector
                    provided_data_dict = provided_data.model_dump()
                    model = aas_model_type.model_validate(provided_data_dict)
                    # The whole aas is written back with the attribute replaced.
                    setattr(model, attribute_name, item)
                    await connector.consume(model)
                    return {
                        "message": f"Succesfully created attribute {attribute_name} of aas with id {item_id}"
                    }
                except Exception as e:
                    raise HTTPException(
                        status_code=400, detail=f"Attribute {attribute_name} for model with id {item_id} could not be set. Error: {e}"
                    )

        @router.put("/")
        async def put_item(item_id: str, item: submodel_model_type) -> Dict[str, str]:
            # The connector lookup happens outside the try block, so an error raised here is not converted to the HTTP 400 below.
            connector = self.get_connector(item_id)
            try:
                model = await connector.provide()
                # Nothing is written if the stored attribute already equals the submitted item.
                if getattr(model, attribute_name) == item:
                    return {
                        "message": f"Attribute {attribute_name} of model with id {item_id} is already up to date"
                    }
                setattr(model, attribute_name, item)
                await connector.consume(model)
                return {
                    "message": f"Succesfully updated attribute {attribute_name} of model with id {item_id}"
                }
            except Exception as e:
                raise HTTPException(
                    status_code=400, detail=f"Attribute {attribute_name} of model with id {item_id} could not be updated. Error: {e}"
                )

        if optional_submodel:
            @router.delete("/")
            async def delete_item(item_id: str):
                # The connector lookup happens outside the try block, so an error raised here is not converted to the HTTP 400 below.
                connector = self.get_connector(item_id)
                try:
                    model = await connector.provide()
                    # Deleting means setting the attribute to None and writing the whole model back.
                    setattr(model, attribute_name, None)
                    await connector.consume(model)
                    return {
                        "message": f"Succesfully deleted attribute {attribute_name} of model with id {item_id}"
                    }
                except Exception as e:
                    raise HTTPException(
                        status_code=400, detail=f"attribute {attribute_name} of model with id {item_id} could not be deleted. Error: {e}"
                    )
        return router

    def generate_aas_endpoints_from_model(self, aas_model_type: Type[AAS]) -> APIRouter:
        """
        Generates CRUD endpoints for a pydantic model representing an aas.

        The routes are registered under ``/<aas type name>``: list (GET ``/``), create (POST ``/``)
        and get, update and delete by id (``/{item_id}``).

        Args:
            aas_model_type (Type[AAS]): Pydantic model representing an aas

        Returns:
            APIRouter: FastAPI router with CRUD endpoints for the given pydantic model that performs Middleware synchronization.
        """
        router = APIRouter(
            prefix=f"/{aas_model_type.__name__}",
            tags=[aas_model_type.__name__],
            responses={404: {"description": "Not found"}},
        )

        @router.get("/", response_model=List[aas_model_type])
        async def get_items():
            aas_list = []
            # The registry is queried by the class name of the aas type; every returned connection is asked to provide its model.
            connection_infos = self.middleware.persistence_registry.get_type_connection_info(aas_model_type.__name__)
            for connection_info in connection_infos:
                connector = self.middleware.persistence_registry.get_connection(connection_info)
                retrieved_aas = await connector.provide()
                aas_list.append(retrieved_aas)
            return aas_list

        @router.post(f"/", response_model=Dict[str, str])
        async def post_item(item: aas_model_type) -> Dict[str, str]:
            try:
                await self.middleware.persist(data_model_name=self.data_model_name, model=item)
                return {
                    "message": f"Succesfully created aas {aas_model_type.__name__} with id {item.id}"
                }
            # A ValueError from persist is reported to the client as "already exists"; other exceptions are not handled here.
            except ValueError:
                raise HTTPException(
                    status_code=400, detail=f"AAS with id {item.id} already exists"
                )

        @router.get("/{item_id}", response_model=aas_model_type)
        async def get_item(item_id: str):
            # Any failure (including an unknown item id) is reported as HTTP 400.
            try:
                connector = self.get_connector(item_id)
                provided_data = await connector.provide()
                # TODO: update that the correct type is immediately returned -> using model validate inside the connector
                provided_data_dict = provided_data.model_dump()
                return aas_model_type.model_validate(provided_data_dict)
            except Exception as e:
                raise HTTPException(
                    status_code=400, detail=f"AAS with id {item_id} could not be retrieved. Error: {e}"
                )

        @router.put("/{item_id}")
        async def put_item(item_id: str, item: aas_model_type) -> Dict[str, str]:
            try:
                consumer = self.get_connector(item_id)
            except KeyError as e:
                raise HTTPException(
                    status_code=400, detail=f"AAS with id {item_id} could not be retrieved. Try posting it at first."
                )
            # TODO: add some exception handling below
            if item_id == item.id:
                await consumer.consume(item)
            else:
                # The id in the body differs from the id in the path: the item is persisted under its own id
                # and the entry of the path id is removed through the delete handler defined further down.
                await self.middleware.persist(data_model_name=self.data_model_name, model=item)
                await delete_item(item_id)
            return {"message": f"Succesfully updated aas with id {item.id}"}

        @router.delete("/{item_id}")
        async def delete_item(item_id: str):
            # None is handed to the connector (presumably this clears the persisted model; confirm against the connector),
            # then the connection is removed from the persistence registry. Errors are not converted to HTTP errors here.
            await self.get_connector(item_id).consume(None)
            self.middleware.persistence_registry.remove_connection(ConnectionInfo(data_model_name=self.data_model_name, model_id=item_id))
            return {"message": f"Succesfully deleted aas with id {item_id}"}
        return router

    def generate_endpoints_from_model(self, pydantic_model: Type[BaseModel]) -> List[APIRouter]:
        """
        Generates CRUD endpoints for a pydantic model representing an aas and its submodels.

        Args:
            pydantic_model (Type[BaseModel]): Pydantic model representing an aas with submodels.

        Returns:
            List[APIRouter]: List of FastAPI routers with CRUD endpoints for the given pydantic model and its submodels that perform Middleware synchronization.
        """
        routers = []
        # The router of the model itself comes first, followed by one router per contained model attribute.
        routers.append(self.generate_aas_endpoints_from_model(pydantic_model))
        attribute_infos = get_contained_models_attribute_info(pydantic_model)
        for attribute_name, contained_model in attribute_infos:
            routers.append(self.generate_endpoints_from_contained_model(pydantic_model, attribute_name, contained_model))
        return routers

    def generate_endpoints(self):
        """
        Generates CRUD endpoints for a pydantic model representing an aas and its submodels and adds them to the middleware app.
        """
        routers = []
        for top_level_model_type in self.aas_data_model.get_top_level_types():
            routers += self.generate_endpoints_from_model(top_level_model_type)
        # Routers are only included after all of them have been generated.
        for router in routers:
            self.middleware.app.include_router(router)
generate_aas_endpoints_from_model(self, aas_model_type)
Generates CRUD endpoints for a pydantic model representing an aas.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aas_model_type |
Type[AAS] |
Pydantic model representing an aas |
required |
Returns:
Type | Description |
---|---|
APIRouter |
FastAPI router with CRUD endpoints for the given pydantic model that performs Middleware synchronization. |
Source code in aas_middleware\middleware\rest_routers.py
def generate_aas_endpoints_from_model(self, aas_model_type: Type[AAS]) -> APIRouter:
    """
    Generates CRUD endpoints for a pydantic model representing an aas.

    The routes are registered under ``/<aas type name>``: list (GET ``/``), create (POST ``/``)
    and get, update and delete by id (``/{item_id}``).

    Args:
        aas_model_type (Type[AAS]): Pydantic model representing an aas

    Returns:
        APIRouter: FastAPI router with CRUD endpoints for the given pydantic model that performs Middleware synchronization.
    """
    router = APIRouter(
        prefix=f"/{aas_model_type.__name__}",
        tags=[aas_model_type.__name__],
        responses={404: {"description": "Not found"}},
    )

    @router.get("/", response_model=List[aas_model_type])
    async def get_items():
        aas_list = []
        # The registry is queried by the class name of the aas type; every returned connection is asked to provide its model.
        connection_infos = self.middleware.persistence_registry.get_type_connection_info(aas_model_type.__name__)
        for connection_info in connection_infos:
            connector = self.middleware.persistence_registry.get_connection(connection_info)
            retrieved_aas = await connector.provide()
            aas_list.append(retrieved_aas)
        return aas_list

    @router.post(f"/", response_model=Dict[str, str])
    async def post_item(item: aas_model_type) -> Dict[str, str]:
        try:
            await self.middleware.persist(data_model_name=self.data_model_name, model=item)
            return {
                "message": f"Succesfully created aas {aas_model_type.__name__} with id {item.id}"
            }
        # A ValueError from persist is reported to the client as "already exists"; other exceptions are not handled here.
        except ValueError:
            raise HTTPException(
                status_code=400, detail=f"AAS with id {item.id} already exists"
            )

    @router.get("/{item_id}", response_model=aas_model_type)
    async def get_item(item_id: str):
        # Any failure (including an unknown item id) is reported as HTTP 400.
        try:
            connector = self.get_connector(item_id)
            provided_data = await connector.provide()
            # TODO: update that the correct type is immediately returned -> using model validate inside the connector
            provided_data_dict = provided_data.model_dump()
            return aas_model_type.model_validate(provided_data_dict)
        except Exception as e:
            raise HTTPException(
                status_code=400, detail=f"AAS with id {item_id} could not be retrieved. Error: {e}"
            )

    @router.put("/{item_id}")
    async def put_item(item_id: str, item: aas_model_type) -> Dict[str, str]:
        try:
            consumer = self.get_connector(item_id)
        except KeyError as e:
            raise HTTPException(
                status_code=400, detail=f"AAS with id {item_id} could not be retrieved. Try posting it at first."
            )
        # TODO: add some exception handling below
        if item_id == item.id:
            await consumer.consume(item)
        else:
            # The id in the body differs from the id in the path: the item is persisted under its own id
            # and the entry of the path id is removed through the delete handler defined further down.
            await self.middleware.persist(data_model_name=self.data_model_name, model=item)
            await delete_item(item_id)
        return {"message": f"Succesfully updated aas with id {item.id}"}

    @router.delete("/{item_id}")
    async def delete_item(item_id: str):
        # None is handed to the connector (presumably this clears the persisted model; confirm against the connector),
        # then the connection is removed from the persistence registry. Errors are not converted to HTTP errors here.
        await self.get_connector(item_id).consume(None)
        self.middleware.persistence_registry.remove_connection(ConnectionInfo(data_model_name=self.data_model_name, model_id=item_id))
        return {"message": f"Succesfully deleted aas with id {item_id}"}
    return router
generate_endpoints(self)
Generates CRUD endpoints for a pydantic model representing an aas and its submodels and adds them to the middleware app.
Source code in aas_middleware\middleware\rest_routers.py
def generate_endpoints(self):
    """
    Builds the CRUD routers for every top-level type of the data model and includes them in the app of the middleware.
    """
    collected_routers = []
    for model_type in self.aas_data_model.get_top_level_types():
        collected_routers.extend(self.generate_endpoints_from_model(model_type))
    # Routers are only included once all of them have been generated.
    for api_router in collected_routers:
        self.middleware.app.include_router(api_router)
generate_endpoints_from_contained_model(self, aas_model_type, attribute_name, submodel_model_type)
Generates CRUD endpoints for a submodel of a pydantic model representing an aas.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aas_model_type |
Type[BaseModel] |
Pydantic model representing the aas of the submodel. |
required |
submodel_model_type |
Type[base.Submodel] |
Pydantic model representing the submodel. |
required |
Returns:
Type | Description |
---|---|
APIRouter |
FastAPI router with CRUD endpoints for the given submodel that performs Middleware synchronization. |
Source code in aas_middleware\middleware\rest_routers.py
def generate_endpoints_from_contained_model(
    self,
    aas_model_type: Type[AAS],
    attribute_name: str,
    submodel_model_type: Type[Submodel],
) -> APIRouter:
    """
    Generates CRUD endpoints for a submodel of a pydantic model representing an aas.

    The routes are registered under ``/<aas type name>/{item_id}/<attribute_name>``. GET and PUT
    are always registered; POST and DELETE are only registered if
    ``check_if_attribute_is_optional_in_aas`` reports the attribute as optional.

    Args:
        aas_model_type (Type[AAS]): Pydantic model representing the aas of the submodel.
        attribute_name (str): Name of the attribute of the aas that holds the submodel.
        submodel_model_type (Type[Submodel]): Pydantic model representing the submodel.

    Returns:
        APIRouter: FastAPI router with CRUD endpoints for the given submodel that performs Middleware synchronization.
    """
    model_name = aas_model_type.__name__
    optional_submodel = check_if_attribute_is_optional_in_aas(aas_model_type, attribute_name)
    # TODO: the data model name should be used for creating the endpoint
    # TODO: adjust that no aas or submodel reference appears in the router -> should work for all models.
    router = APIRouter(
        prefix=f"/{model_name}/{{item_id}}/{attribute_name}",
        tags=[model_name],
        responses={404: {"description": "Not found"}},
    )

    @router.get(
        "/",
        response_model=submodel_model_type,
    )
    async def get_item(item_id: str):
        # Any failure (including an unknown item id) is reported as HTTP 400.
        try:
            model = await self.get_connector(item_id).provide()
            return getattr(model, attribute_name)
        except Exception as e:
            raise HTTPException(
                status_code=400, detail=f"Submodel with id {item_id} could not be retrieved. Error: {e}"
            )

    if optional_submodel:
        @router.post("/")
        async def post_item(item_id: str, item: submodel_model_type) -> Dict[str, str]:
            # The connector lookup happens outside the try block, so an error raised here is not converted to the HTTP 400 below.
            connector = self.get_connector(item_id)
            try:
                provided_data = await connector.provide()
                # TODO: update that the correct type is immediately returned -> using model validate inside the connector
                provided_data_dict = provided_data.model_dump()
                model = aas_model_type.model_validate(provided_data_dict)
                # The whole aas is written back with the attribute replaced.
                setattr(model, attribute_name, item)
                await connector.consume(model)
                return {
                    "message": f"Succesfully created attribute {attribute_name} of aas with id {item_id}"
                }
            except Exception as e:
                raise HTTPException(
                    status_code=400, detail=f"Attribute {attribute_name} for model with id {item_id} could not be set. Error: {e}"
                )

    @router.put("/")
    async def put_item(item_id: str, item: submodel_model_type) -> Dict[str, str]:
        # The connector lookup happens outside the try block, so an error raised here is not converted to the HTTP 400 below.
        connector = self.get_connector(item_id)
        try:
            model = await connector.provide()
            # Nothing is written if the stored attribute already equals the submitted item.
            if getattr(model, attribute_name) == item:
                return {
                    "message": f"Attribute {attribute_name} of model with id {item_id} is already up to date"
                }
            setattr(model, attribute_name, item)
            await connector.consume(model)
            return {
                "message": f"Succesfully updated attribute {attribute_name} of model with id {item_id}"
            }
        except Exception as e:
            raise HTTPException(
                status_code=400, detail=f"Attribute {attribute_name} of model with id {item_id} could not be updated. Error: {e}"
            )

    if optional_submodel:
        @router.delete("/")
        async def delete_item(item_id: str):
            # The connector lookup happens outside the try block, so an error raised here is not converted to the HTTP 400 below.
            connector = self.get_connector(item_id)
            try:
                model = await connector.provide()
                # Deleting means setting the attribute to None and writing the whole model back.
                setattr(model, attribute_name, None)
                await connector.consume(model)
                return {
                    "message": f"Succesfully deleted attribute {attribute_name} of model with id {item_id}"
                }
            except Exception as e:
                raise HTTPException(
                    status_code=400, detail=f"attribute {attribute_name} of model with id {item_id} could not be deleted. Error: {e}"
                )
    return router
generate_endpoints_from_model(self, pydantic_model)
Generates CRUD endpoints for a pydantic model representing an aas and its submodels.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pydantic_model |
Type[BaseModel] |
Pydantic model representing an aas with submodels. |
required |
Returns:
Type | Description |
---|---|
List[APIRouter] |
List of FastAPI routers with CRUD endpoints for the given pydantic model and its submodels that perform Middleware synchronization. |
Source code in aas_middleware\middleware\rest_routers.py
def generate_endpoints_from_model(self, pydantic_model: Type[BaseModel]) -> List[APIRouter]:
    """
    Builds the routers for an aas model and for every model it contains.

    Args:
        pydantic_model (Type[BaseModel]): Pydantic model representing an aas with submodels.

    Returns:
        List[APIRouter]: The router for the aas itself, followed by one router per contained
            model attribute, in the order reported by get_contained_models_attribute_info.
    """
    # The aas router is created first so it is always the first entry of the result.
    aas_router = self.generate_aas_endpoints_from_model(pydantic_model)
    contained_model_routers = [
        self.generate_endpoints_from_contained_model(pydantic_model, attribute_name, contained_model)
        for attribute_name, contained_model in get_contained_models_attribute_info(pydantic_model)
    ]
    return [aas_router, *contained_model_routers]
check_if_attribute_is_optional_in_aas(aas, attribute_name)
Checks if a submodel is an optional attribute in an aas.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aas |
Type[base.AAS] |
AAS model. |
required |
attribute_name |
str |
Name of the attribute (e.g. a submodel) to be checked. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If the submodel is not a submodel of the aas. |
Returns:
Type | Description |
---|---|
bool |
True if the submodel is an optional attribute in the aas, False otherwise. |
Source code in aas_middleware\middleware\rest_routers.py
def check_if_attribute_is_optional_in_aas(
    aas: Type[AAS], attribute_name: str
) -> bool:
    """
    Checks if an attribute (e.g. a submodel) is optional in an aas.

    An attribute counts as optional when its field is not required, or when its
    annotation is a union that contains ``None``. Both spellings of such a union are
    recognised: ``Optional[X]`` / ``Union[X, None]`` and the PEP 604 form ``X | None``.

    Args:
        aas (Type[base.AAS]): AAS model.
        attribute_name (str): Name of the attribute to be checked.

    Raises:
        ValueError: If the attribute is not a field of the aas.

    Returns:
        bool: True if the attribute is optional in the aas, False otherwise.
    """
    import types  # local import: only needed for the PEP 604 union check below

    if attribute_name not in aas.model_fields:
        raise ValueError(
            f"Submodel {attribute_name} is not a submodel attribute of {aas.__name__}."
        )
    field_info = aas.model_fields[attribute_name]
    if not field_info.is_required():
        return True
    # `X | None` reports types.UnionType as its origin on Python 3.10-3.13, which is a
    # different object than typing.Union. Fall back to Union where UnionType is missing.
    union_origins = (Union, getattr(types, "UnionType", Union))
    annotation = field_info.annotation
    return (
        typing.get_origin(annotation) in union_origins
        and type(None) in typing.get_args(annotation)
    )
synchronization
adjust_body_for_external_schema(body, persistence_mapper=None, formatter=None)
Modifies the body for an external schema.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
body |
Any |
The body to modify. |
required |
persistence_mapper |
Optional[Mapper] |
The mapper that maps the body to the external model. Defaults to None. |
None |
formatter |
Optional[Formatter] |
The formatter that serializes the body. Defaults to None. |
None |
Returns:
Type | Description |
---|---|
Any |
The modified body. |
Source code in aas_middleware\middleware\synchronization.py
def adjust_body_for_external_schema(body: Any, persistence_mapper: typing.Optional[Mapper] = None, formatter: typing.Optional[Formatter] = None) -> Any:
    """
    Converts a body coming from persistence into the external schema.

    The mapper is applied first and the formatter serializes the mapped result;
    a step is skipped when its helper is not given.

    Args:
        body (Any): The body to modify.
        persistence_mapper (typing.Optional[Mapper], optional): The mapper that maps the body to the external model. Defaults to None.
        formatter (typing.Optional[Formatter], optional): The formatter that serializes the body. Defaults to None.

    Returns:
        Any: The modified body.
    """
    mapped_body = persistence_mapper.map(body) if persistence_mapper else body
    return formatter.serialize(mapped_body) if formatter else mapped_body
adjust_body_for_persistence_schema(body, external_mapper=None, formatter=None)
Modifies the body for persistence.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
body |
Any |
The body to modify. |
required |
external_mapper |
Optional[Mapper] |
The mapper that maps the body to the persistence model. Defaults to None. |
None |
formatter |
Optional[Formatter] |
The formatter that deserializes the body. Defaults to None. |
None |
Returns:
Type | Description |
---|---|
Any |
The modified body. |
Source code in aas_middleware\middleware\synchronization.py
def adjust_body_for_persistence_schema(body: Any, external_mapper: typing.Optional[Mapper] = None, formatter: typing.Optional[Formatter] = None) -> Any:
    """
    Converts a body coming from an external source into the persistence schema.

    The formatter deserializes the body first and the mapper is applied to the
    deserialized result; a step is skipped when its helper is not given.

    Args:
        body (Any): The body to modify.
        external_mapper (typing.Optional[Mapper], optional): The mapper that maps the body to the persistence model. Defaults to None.
        formatter (typing.Optional[Formatter], optional): The formatter that deserializes the body. Defaults to None.

    Returns:
        Any: The modified body.
    """
    deserialized_body = formatter.deserialize(body) if formatter else body
    return external_mapper.map(deserialized_body) if external_mapper else deserialized_body
synchronize_connector_with_persistence(connector, connection_info, persistence_registry, persistence_mapper=None, external_mapper=None, formatter=None)
Synchronizes a connector with the persistence layer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
connector |
Union[Consumer, Provider] |
The connector to synchronize. |
required |
connection_info |
ConnectionInfo |
The connection info for the persistence layer. |
required |
persistence_registry |
PersistenceConnectionRegistry |
The registry for the persistence connectors. |
required |
Source code in aas_middleware\middleware\synchronization.py
def synchronize_connector_with_persistence(connector: Union[Consumer, Provider], connection_info: ConnectionInfo, persistence_registry: PersistenceConnectionRegistry, persistence_mapper: typing.Optional[Mapper] = None, external_mapper: typing.Optional[Mapper] = None, formatter: typing.Optional[Formatter] = None):
    """
    Synchronizes a connector with the persistence layer by wrapping its methods in place.

    For a Consumer, ``connector.consume`` is replaced by a wrapper that either pulls the
    value from persistence (when called with ``None``) or writes the given value to
    persistence, and then forwards the body to the original ``consume``.
    For a Provider, ``connector.provide`` is replaced by a wrapper that writes the provided
    value to persistence whenever it differs from the stored one and returns the provided
    value unchanged. A connector that is both gets both wrappers.

    Args:
        connector (Union[Consumer, Provider]): The connector to synchronize.
        connection_info (ConnectionInfo): The connection info for the persistence layer.
        persistence_registry (PersistenceConnectionRegistry): The registry for the persistence connectors.
        persistence_mapper (typing.Optional[Mapper], optional): Mapper applied when converting a persistence value to the external schema. Defaults to None.
        external_mapper (typing.Optional[Mapper], optional): Mapper applied when converting an external value to the persistence schema. Defaults to None.
        formatter (typing.Optional[Formatter], optional): Formatter used for (de)serialization in both directions. Defaults to None.
    """
    def lookup_persistence_connector():
        # Looked up on every wrapped call (not once at wrap time), exactly as each
        # wrapper needs it before doing any other work.
        return persistence_registry.get_connector_by_data_model_and_model_id(
            data_model_name=connection_info.data_model_name,
            model_id=connection_info.model_id,
        )

    if isinstance(connector, Consumer):
        original_consume = connector.consume

        @wraps(connector.consume)
        async def wrapped_consume(consumer_body: Any):
            persistence_connector = lookup_persistence_connector()
            if consumer_body is not None:
                # A value was supplied: store it in persistence before consuming it.
                value_for_persistence = adjust_body_for_persistence_schema(consumer_body, external_mapper, formatter)
                await update_persistence_with_value(persistence_connector, connection_info, value_for_persistence)
            else:
                # TODO: make data model connection possible here, not possible because of some conventions in the registries
                stored_value = await get_persistence_value(persistence_connector, connection_info)
                consumer_body = adjust_body_for_external_schema(stored_value, persistence_mapper, formatter)
            await original_consume(consumer_body)

        connector.consume = wrapped_consume

    if isinstance(connector, Provider):
        original_provide = connector.provide

        @wraps(connector.provide)
        async def wrapped_provide() -> Any:
            persistence_connector = lookup_persistence_connector()
            provided_value = await original_provide()
            stored_value = await get_persistence_value(persistence_connector, connection_info)
            provider_body_in_peristence_schema = adjust_body_for_persistence_schema(provided_value, external_mapper, formatter)
            if provider_body_in_peristence_schema != stored_value:
                await update_persistence_with_value(persistence_connector, connection_info, provider_body_in_peristence_schema)
                # Read the value back to verify that the update actually took effect.
                assert provider_body_in_peristence_schema == await get_persistence_value(persistence_connector, connection_info), f"Persistence value was not updated correctly. Expected {provider_body_in_peristence_schema}, got {await get_persistence_value(persistence_connector, connection_info)}"
            return provided_value

        connector.provide = wrapped_provide
synchronize_workflow_with_persistence_consumer(workflow, connection_info, persistence_registry, external_mapper=None, formatter=None)
Synchronizes a workflow with a persistence consumer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow |
Workflow |
The workflow to synchronize. |
required |
arg_name |
str |
The name of the argument in the workflow that is a consumer. |
required |
connection_info |
ConnectionInfo |
The connection info for the persistence layer. |
required |
persistence_registry |
PersistenceConnectionRegistry |
The registry for the persistence connectors. |
required |
Source code in aas_middleware\middleware\synchronization.py
def synchronize_workflow_with_persistence_consumer(workflow: Workflow, connection_info: ConnectionInfo, persistence_registry: PersistenceConnectionRegistry, external_mapper: typing.Optional[Mapper] = None, formatter: typing.Optional[Formatter] = None):
    """
    Synchronizes a workflow with a persistence consumer.

    ``workflow.execute`` is replaced in place by a wrapper that runs the original
    execution, converts its return value to the persistence schema, writes it to the
    persistence connector and finally returns the unconverted workflow result.

    Args:
        workflow (Workflow): The workflow to synchronize.
        connection_info (ConnectionInfo): The connection info for the persistence layer.
        persistence_registry (PersistenceConnectionRegistry): The registry for the persistence connectors.
        external_mapper (typing.Optional[Mapper], optional): Mapper applied when converting the workflow result to the persistence schema. Defaults to None.
        formatter (typing.Optional[Formatter], optional): Formatter used to deserialize the workflow result. Defaults to None.
    """
    import logging  # local import keeps this block self-contained

    logger = logging.getLogger(__name__)
    original_execute = workflow.execute

    @wraps(workflow.execute)
    async def wrapped_execute(*args: Any, **kwargs: Any):
        # Arguments are passed through untouched so that workflows without input
        # or with several keyword inputs can still be executed once wrapped.
        logger.debug("execute arguments: args=%s kwargs=%s", args, kwargs)
        workflow_return = await original_execute(*args, **kwargs)
        logger.debug("workflow return: %s", workflow_return)
        persistence_connector = persistence_registry.get_connector_by_data_model_and_model_id(data_model_name=connection_info.data_model_name, model_id=connection_info.model_id)
        persistence_body = adjust_body_for_persistence_schema(workflow_return, external_mapper, formatter)
        logger.debug("persistence body: %s", persistence_body)
        await update_persistence_with_value(persistence_connector, connection_info, persistence_body)
        return workflow_return

    workflow.execute = wrapped_execute
synchronize_workflow_with_persistence_provider(workflow, arg_name, connection_info, persistence_registry, persistence_mapper=None, external_mapper=None, formatter=None)
Synchronizes a workflow with a persistence provider.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow |
Workflow |
description |
required |
arg_name |
str |
description |
required |
connection_info |
ConnectionInfo |
description |
required |
persistence_registry |
PersistenceConnectionRegistry |
description |
required |
persistence_mapper |
Optional[Mapper] |
description. Defaults to None. |
None |
external_mapper |
Optional[Mapper] |
description. Defaults to None. |
None |
formatter |
Optional[Formatter] |
description. Defaults to None. |
None |
Source code in aas_middleware\middleware\synchronization.py
def synchronize_workflow_with_persistence_provider(workflow: Workflow, arg_name: str, connection_info: ConnectionInfo, persistence_registry: PersistenceConnectionRegistry, persistence_mapper: typing.Optional[Mapper] = None, external_mapper: typing.Optional[Mapper]=None, formatter: typing.Optional[Formatter] = None):
    """
    Synchronizes a workflow with a persistence provider.

    ``workflow.execute`` is replaced in place by a wrapper. When the wrapper is called
    without a body (``None``), the value is read from persistence, converted to the
    external schema and used as workflow input. Otherwise the given body is converted
    to the persistence schema, written to persistence and then passed to the workflow.

    Args:
        workflow (Workflow): The workflow to synchronize.
        arg_name (str): Name of the workflow argument fed by the provider. NOTE(review): currently not used by the wrapper, which only supports a single positional body.
        connection_info (ConnectionInfo): The connection info for the persistence layer.
        persistence_registry (PersistenceConnectionRegistry): The registry for the persistence connectors.
        persistence_mapper (typing.Optional[Mapper], optional): Mapper applied when converting a persistence value to the external schema. Defaults to None.
        external_mapper (typing.Optional[Mapper], optional): Mapper applied when converting an external value to the persistence schema. Defaults to None.
        formatter (typing.Optional[Formatter], optional): Formatter used for (de)serialization in both directions. Defaults to None.
    """
    original_execute = workflow.execute

    @wraps(workflow.execute)
    async def wrapped_execute(execute_body: Any = None):
        persistence_connector = persistence_registry.get_connector_by_data_model_and_model_id(data_model_name=connection_info.data_model_name, model_id=connection_info.model_id)
        # Only a missing body triggers the fallback to persistence; falsy but valid
        # bodies (0, "", [], {}) must be stored and executed like any other value.
        if execute_body is None:
            persistence_body = await get_persistence_value(persistence_connector, connection_info)
            execute_body = adjust_body_for_external_schema(persistence_body, persistence_mapper, formatter)
        else:
            persistence_body = adjust_body_for_persistence_schema(execute_body, external_mapper, formatter)
            await update_persistence_with_value(persistence_connector, connection_info, persistence_body)
        workflow_return = await original_execute(execute_body)
        return workflow_return

    workflow.execute = wrapped_execute
workflow_router
generate_workflow_endpoint(workflow)
Generates endpoints for a workflow to execute the workflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow |
Workflow |
Workflow that contains the function to be executed by the workflow. |
required |
Returns:
Type | Description |
---|---|
APIRouter |
FastAPI router with an endpoint to execute the workflow. |
Source code in aas_middleware\middleware\workflow_router.py
def generate_workflow_endpoint(workflow: Workflow) -> APIRouter:
    """
    Generates endpoints for a workflow to execute the workflow.

    The router is mounted under ``/workflows/<workflow name>`` and offers:

    * ``POST /execute``: runs the workflow and returns its result. Only registered when
      the workflow description has no interval.
    * ``POST /execute_background``: schedules the workflow as a background task.
    * ``GET /description``: returns the workflow description.
    * ``GET /interrupt``: interrupts the workflow.

    Both execute endpoints answer with HTTP 400 while the workflow is already running.
    The request body type is derived from the type hints of the workflow function:
    no body without arguments, the argument type itself for a single argument, and a
    generated pydantic model for several arguments.

    Args:
        workflow (Workflow): Workflow that contains the function to be executed by the workflow.

    Returns:
        APIRouter: FastAPI router with an endpoint to execute the workflow.
    """
    router = APIRouter(
        prefix=f"/workflows/{workflow.get_name()}",
        tags=["workflows"],
        responses={404: {"description": "Not found"}},
    )

    if isinstance(workflow.workflow_function, functools.partial):
        type_hints = get_partial_type_hints(workflow.workflow_function)
    else:
        type_hints = typing.get_type_hints(workflow.workflow_function)

    # After this pop, type_hints only contains the input arguments.
    return_type = type_hints.pop("return", None)
    if len(type_hints) == 0:
        input_type_hints = None
    elif len(type_hints) == 1:
        input_type_hints = next(iter(type_hints.values()))
    else:
        input_type_hints = get_base_model_from_type_hints(workflow.get_name(), type_hints)

    # The blocking /execute endpoint is only offered for workflows without an interval.
    has_no_interval = workflow.get_description().interval is None

    def raise_if_running() -> None:
        # Shared guard of all execute endpoints.
        if workflow.running:
            raise HTTPException(
                status_code=400,
                detail=f"Workflow {workflow.get_name()} is already running. Wait for it to finish or interrupt it first.",
            )

    if input_type_hints is None:
        if has_no_interval:
            @router.post("/execute", response_model=return_type)
            async def execute():
                raise_if_running()
                return await workflow.execute()

        @router.post("/execute_background", response_model=Dict[str, str])
        async def execute_background(background_tasks: BackgroundTasks):
            raise_if_running()
            background_tasks.add_task(workflow.execute)
            return {"message": f"Started exeuction of workflow {workflow.get_name()}"}
    else:
        if has_no_interval:
            # TODO: make the optional not here, but add a method that updates the POST endpoints after a connection is added, to have optional parameters...
            @router.post("/execute", response_model=return_type)
            async def execute(arg: typing.Optional[input_type_hints]=None): # type: ignore
                raise_if_running()
                if isinstance(arg, BaseModel) and len(type_hints) > 1:
                    # Several arguments were bundled into a generated model: unpack them.
                    input_value = dict(arg)
                    return await workflow.execute(**input_value)
                else:
                    return await workflow.execute(arg)

        @router.post("/execute_background", response_model=Dict[str, str])
        async def execute_background(background_tasks: BackgroundTasks, arg: typing.Optional[input_type_hints]=None): # type: ignore
            raise_if_running()
            if isinstance(arg, BaseModel) and len(type_hints) > 1:
                input_value = dict(arg)
                background_tasks.add_task(workflow.execute, **input_value)
            else:
                # Pass the argument exactly like /execute does. Calling model_dump() here
                # failed for a missing body (None) and for non-pydantic input types.
                background_tasks.add_task(workflow.execute, arg)
            return {"message": f"Started exeuction of workflow {workflow.get_name()}"}

    @router.get("/description", response_model=WorkflowDescription)
    async def describe_workflow():
        return workflow.get_description()

    @router.get("/interrupt", response_model=Dict[str, str])
    async def interrupt_workflow():
        try:
            await workflow.interrupt()
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
        return {"message": f"Stopped execution of workflow {workflow.get_name()}"}

    return router
get_base_model_from_type_hints(name, type_hints)
Get the base model from the type hints of a function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
type_hints |
Dict[str, Any] |
Type hints of a function. |
required |
Returns:
Type | Description |
---|---|
Type[Any] |
Base model of the type hints. |
Source code in aas_middleware\middleware\workflow_router.py
def get_base_model_from_type_hints(name: str, type_hints: Dict[str, typing.Any]) -> typing.Type[BaseModel]:
    """
    Creates a pydantic model whose fields mirror the arguments of a function.

    A ``"return"`` entry in the type hints is ignored. The passed dictionary is not
    modified.

    Args:
        name (str): Name used to build the model name ``body_for_<name>``.
        type_hints (Dict[str, typing.Any]): Type hints of a function.

    Returns:
        typing.Type[BaseModel]: Model with one field per argument in the type hints.
    """
    # Built as a new dict instead of popping "return" from the caller's dict.
    field_definitions = {
        argument: typing.Annotated[argument_type, Field()]
        for argument, argument_type in type_hints.items()
        if argument != "return"
    }
    return create_model(f"body_for_{name}", **field_definitions)