Skip to content

Connect API

connectors special

aas_client_connector special

aas_client

aas_is_on_server(aas_id, aas_client) async

Function to check if an AAS with the given id is on the server

Parameters:

Name Type Description Default
aas_id str

id of the AAS

required

Returns:

Type Description
bool

True if AAS is on server, False if not

Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
async def aas_is_on_server(aas_id: str, aas_client: AASClient) -> bool:
    """
    Check whether an AAS with the given id can be retrieved from the server.

    Any exception raised while fetching the AAS is treated as "not on the server".

    Args:
        aas_id (str): id of the AAS
        aas_client (AASClient): client to connect to the server
    Returns:
        bool: True if the AAS could be retrieved, False otherwise
    """
    try:
        await get_basyx_aas_from_server(aas_id, aas_client)
    except Exception:
        return False
    return True
delete_aas_from_server(aas_id, aas_client) async

Function to delete an AAS from the server

Parameters:

Name Type Description Default
aas_id str

id of the AAS

required

Exceptions:

Type Description
HTTPException

If AAS with the given id does not exist

Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
async def delete_aas_from_server(aas_id: str, aas_client: AASClient):
    """
    Delete the AAS with the given id from the server.

    Args:
        aas_id (str): id of the AAS
        aas_client (AASClient): client to connect to the server

    Raises:
        HTTPException: (status code 400) If no AAS with the given id is found on the server
    """
    aas_exists = await aas_is_on_server(aas_id, aas_client)
    if not aas_exists:
        raise HTTPException(
            status_code=400, detail=f"AAS with id {aas_id} does not exist. Cannot delete it."
        )
    # the server is addressed with the base64 encoded id
    encoded_id = client_utils.get_base64_from_string(aas_id)
    await delete_asset_administration_shell_by_id.asyncio(
        client=aas_client, aas_identifier=encoded_id
    )
get_aas_from_server(aas_id, aas_client, submodel_client) async

Function to get an AAS from the server

Parameters:

Name Type Description Default
aas_id str

id of the AAS

required

Returns:

Type Description
aas_model.AAS

AAS retrieved from the server

Exceptions:

Type Description
HTTPException

If AAS with the given id does not exist

Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
async def get_aas_from_server(aas_id: str, aas_client: AASClient, submodel_client: SubmodelClient) -> aas_model.AAS:
    """
    Retrieve an AAS together with its submodels from the server and convert it to a model.

    Args:
        aas_id (str): id of the AAS
        aas_client (AASClient): client to connect to the AAS server
        submodel_client (SubmodelClient): client to connect to the submodel server
    Returns:
        aas_model.AAS: AAS retrieved from the server

    Raises:
        HTTPException: (status code 400) If the AAS or one of its submodels could not be retrieved
    """
    try:
        basyx_aas = await get_basyx_aas_from_server(aas_id, aas_client)
    except Exception as e:
        raise HTTPException(
            status_code=400, detail=f"AAS with id {aas_id} could not be retrieved. Error: {e}"
        )
    try:
        basyx_submodels = await get_all_basyx_submodels_from_server(basyx_aas, submodel_client)
    except Exception as e:
        raise HTTPException(
            status_code=400, detail=f"Submodels of AAS with id {aas_id} could not be retrieved. Error: {e}"
        )

    # collect the AAS and its submodels in one store, which the formatter deserializes as a whole
    object_store = model.DictObjectStore()
    object_store.add(basyx_aas)
    for basyx_submodel in basyx_submodels:
        object_store.add(basyx_submodel)

    return BasyxFormatter().deserialize(object_store).get_model(aas_id)
get_all_aas_from_server(pydantic_model, aas_client, submodel_client) async

Function to get all AAS from the server

Returns:

Type Description
List[aas_model.AAS]

List of AAS retrieved from the server

Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
async def get_all_aas_from_server(pydantic_model: BaseModel, aas_client: AASClient, submodel_client: SubmodelClient) -> List[aas_model.AAS]:
    """
    Function to get all AAS of a given type from the server

    All AAS and the submodels they reference are fetched, deserialized together and
    then filtered by the class name of the given pydantic model.

    Args:
        pydantic_model (BaseModel): pydantic model class (its ``__name__`` is compared with the class name of each retrieved AAS)
        aas_client (AASClient): client to connect to the AAS server
        submodel_client (SubmodelClient): client to connect to the submodel server
    Returns:
        List[aas_model.AAS]: List of AAS retrieved from the server whose class name matches the given model
    """
    # NOTE(review): the submodel counterpart reads ``.result`` as an attribute; confirm that
    # the AAS response really supports item access.
    result_string = await get_all_asset_administration_shells.asyncio(client=aas_client)
    aas_data = result_string["result"]
    aas_list = [client_utils.transform_client_to_basyx_model(aas) for aas in aas_data]

    submodels = []
    for aas in aas_list:
        submodels.extend(await get_all_basyx_submodels_from_server(aas, submodel_client))

    obj_store = model.DictObjectStore()
    stored_ids = set()
    for aas in aas_list:
        obj_store.add(aas)
        stored_ids.add(aas.id)
    for submodel in submodels:
        # skip ids that are already in the store (e.g. a submodel referenced by more than one AAS)
        if submodel.id in stored_ids:
            continue
        obj_store.add(submodel)
        stored_ids.add(submodel.id)

    # the formatter has to be fed with the object store; the former code passed the
    # not yet assigned result variable here, which raised an UnboundLocalError on every call
    data_model = BasyxFormatter().deserialize(obj_store)
    all_aas = data_model.get_models_of_type(aas_model.AAS)
    return [aas for aas in all_aas if aas.__class__.__name__ == pydantic_model.__name__]
get_basyx_aas_from_server(aas_id, aas_client) async

Function to get an AAS from the server

Parameters:

Name Type Description Default
aas_id str

id of the AAS

required

Exceptions:

Type Description
ConnectionError

If the AAS could not be retrieved from the server

Returns:

Type Description
model.AssetAdministrationShell

AAS retrieved from the server

Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
async def get_basyx_aas_from_server(aas_id: str, aas_client: AASClient) -> model.AssetAdministrationShell:
    """
    Function to get an AAS in basyx format from the server
    Args:
        aas_id (str): id of the AAS
        aas_client (AASClient): client to connect to the server
    Raises:
        ConnectionError: If the AAS could not be requested from the server or the response could not be transformed to a basyx model
    Returns:
        model.AssetAdministrationShell: AAS retrieved from the server
    """
    base_64_id = client_utils.get_base64_from_string(aas_id)
    try:
        aas_data = await get_asset_administration_shell_by_id.asyncio(
            client=aas_client, aas_identifier=base_64_id
        )
        return client_utils.transform_client_to_basyx_model(aas_data.to_dict())
    except Exception as e:
        # chain explicitly so the root failure is reported as the cause of the ConnectionError
        raise ConnectionError(e) from e
get_submodel_from_aas_id_and_class_name(aas_id, class_name, aas_client, submodel_client) async

Function to get a submodel from the server based on the AAS id and the class name of the submodel

Parameters:

Name Type Description Default
aas_id str

id of the AAS

required
class_name str

class name of the submodel

required

Exceptions:

Type Description
HTTPException

If submodel with the given class name does not exist for the given AAS

Returns:

Type Description
aas_model.Submodel

submodel retrieved from the server

Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
async def get_submodel_from_aas_id_and_class_name(aas_id: str, class_name: str, aas_client: AASClient, submodel_client: SubmodelClient) -> aas_model.Submodel:
    """
    Find the submodel of an AAS whose class name equals the given class name.

    The submodels referenced by the AAS are fetched one after another; the first one
    with a matching class name is returned.

    Args:
        aas_id (str): id of the AAS
        class_name (str): class name of the submodel
        aas_client (AASClient): client to connect to the AAS server
        submodel_client (SubmodelClient): client to connect to the submodel server
    Raises:
        HTTPException: (status code 411) If submodel with the given class name does not exist for the given AAS
    Returns:
        aas_model.Submodel: submodel retrieved from the server
    """
    shell = await get_basyx_aas_from_server(aas_id, aas_client)
    for submodel_reference in shell.submodel:
        # the first key of the reference holds the id of the referenced submodel
        candidate = await get_submodel_from_server(submodel_reference.key[0].value, submodel_client)
        if candidate.__class__.__name__ != class_name:
            continue
        return candidate
    raise HTTPException(
        status_code=411,
        detail=f"Submodel with name {class_name} does not exist for AAS with id {aas_id}",
    )
post_aas_to_server(aas, aas_client, submodel_client) async

Function to post an AAS to the server. Also posts all submodels of the AAS to the server, if they do not exist yet.

Parameters:

Name Type Description Default
aas aas_model.AAS

AAS to post

required

Exceptions:

Type Description
HTTPException

If AAS with the given id already exists

Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
async def post_aas_to_server(aas: aas_model.AAS, aas_client: AASClient, submodel_client: SubmodelClient):
    """
    Post an AAS to the server and bring all of its submodels onto the server as well.

    Submodels that are not on the server yet are posted; submodels that already exist
    are updated with a put request.

    Args:
        aas (aas_model.AAS): AAS to post
        aas_client (AASClient): client to connect to the AAS server
        submodel_client (SubmodelClient): client to connect to the submodel server
    Raises:
        HTTPException: (status code 400) If AAS with the given id already exists
    """
    already_on_server = await aas_is_on_server(aas.id, aas_client)
    if already_on_server:
        raise HTTPException(
            status_code=400, detail=f"AAS with id {aas.id} already exists"
        )
    check_aas_for_duplicate_ids(aas)
    basyx_aas = convert_model_to_aas(aas).get(aas.id)
    client_body = ClientModel(basyx_object=basyx_aas)
    await post_asset_administration_shell.asyncio(
        client=aas_client, body=client_body
    )

    for submodel in get_value_attributes(aas).values():
        if await submodel_is_on_server(submodel.id, submodel_client):
            logger.info(f"Submodel with id {submodel.id} already exists on the server. Updating the value.")
            await put_submodel_to_server(submodel, submodel_client)
        else:
            await post_submodel_to_server(submodel, submodel_client)
put_aas_to_server(aas, aas_client, submodel_client) async

Function to put an AAS to the server

Parameters:

Name Type Description Default
aas aas_model.AAS

AAS to put

required

Exceptions:

Type Description
HTTPException

If AAS with the given id does not exist

Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
async def put_aas_to_server(aas: aas_model.AAS, aas_client: AASClient, submodel_client: SubmodelClient):
    """
    Update an existing AAS on the server and synchronize its submodels.

    Submodels that already exist on the server are updated with a put request, all
    others are posted.

    Args:
        aas (aas_model.AAS): AAS to put
        aas_client (AASClient): client to connect to the AAS server
        submodel_client (SubmodelClient): client to connect to the submodel server
    Raises:
        HTTPException: (status code 400) If AAS with the given id does not exist
    """
    aas_exists = await aas_is_on_server(aas.id, aas_client)
    if not aas_exists:
        raise HTTPException(
            status_code=400, detail=f"AAS with id {aas.id} does not exist"
        )
    basyx_aas = convert_model_to_aas(aas).get(aas.id)
    client_body = ClientModel(basyx_object=basyx_aas)
    # the server is addressed with the base64 encoded id
    encoded_id = client_utils.get_base64_from_string(aas.id)
    await put_asset_administration_shell_by_id.asyncio(
        aas_identifier=encoded_id, client=aas_client, body=client_body
    )

    submodels = get_value_attributes(aas).values()
    for submodel in submodels:
        submodel_exists = await submodel_is_on_server(submodel.id, submodel_client)
        if not submodel_exists:
            await post_submodel_to_server(submodel, submodel_client)
        else:
            await put_submodel_to_server(submodel, submodel_client)

client_utils

transform_client_to_basyx_model(client_model)

Function to transform a client model to a basyx model

Parameters:

Name Type Description Default
client_model dict

dictionary from server client that needs to be transformed

required

Returns:

Type Description
Union[model.AssetAdministrationShell, model.Submodel]

basyx model from the given client model

Source code in aas_middleware\connect\connectors\aas_client_connector\client_utils.py
def transform_client_to_basyx_model(
    client_model: dict | Any,
) -> Union[model.AssetAdministrationShell, model.Submodel]:
    """
    Function to transform a client model to a basyx model
    Args:
        client_model (dict | Any): dictionary from the server client, or an object offering ``to_dict()``, that needs to be transformed
    Returns:
        Union[model.AssetAdministrationShell, model.Submodel]: basyx model from the given client model
    """
    if not isinstance(client_model, dict):
        client_model = client_model.to_dict()
    remove_empty_lists(client_model)
    # round-trip through JSON so that the basyx decoder builds the model objects;
    # the intermediate string is never shown, so no pretty-printing is needed
    json_model = json.dumps(client_model)
    return json.loads(json_model, cls=basyx.aas.adapter.json.AASFromJsonDecoder)

submodel_client

delete_submodel_from_server(submodel_id, submodel_client) async

Function to delete a submodel from the server

Parameters:

Name Type Description Default
submodel_id str

id of the submodel

required
submodel_client SubmodelClient

client to connect to the server

required

Exceptions:

Type Description
HTTPException

If submodel with the given id does not exist

Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def delete_submodel_from_server(submodel_id: str, submodel_client: SubmodelClient):
    """
    Delete the submodel with the given id from the server.

    Args:
        submodel_id (str): id of the submodel
        submodel_client (SubmodelClient): client to connect to the server

    Raises:
        HTTPException: (status code 400) If submodel with the given id does not exist
    """
    submodel_exists = await submodel_is_on_server(submodel_id, submodel_client)
    if not submodel_exists:
        raise HTTPException(
            status_code=400, detail=f"Submodel with id {submodel_id} does not exist. Cannot delete it."
        )
    # the server is addressed with the base64 encoded id
    encoded_id = client_utils.get_base64_from_string(submodel_id)
    await delete_submodel_by_id.asyncio(submodel_identifier=encoded_id, client=submodel_client)
get_all_basyx_submodels_from_server(aas, submodel_client) async

Function to get all submodels from an AAS in basyx format

Parameters:

Name Type Description Default
aas model.AssetAdministrationShell

AAS to get submodels from

required
submodel_client SubmodelClient

client to connect to the server

required

Returns:

Type Description
List[model.Submodel]

List of basyx submodels retrieved from the server

Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def get_all_basyx_submodels_from_server(aas: model.AssetAdministrationShell, submodel_client: SubmodelClient) -> List[model.Submodel]:
    """
    Function to get all submodels from an AAS in basyx format

    The submodels are requested one after another, in the order of the submodel
    references of the AAS.

    Args:
        aas (model.AssetAdministrationShell): AAS to get submodels from
        submodel_client (SubmodelClient): client to connect to the server

    Returns:
        List[model.Submodel]: List of basyx submodels retrieved from the server
    """
    return [
        # the first key of the reference holds the id of the referenced submodel
        await get_basyx_submodel_from_server(submodel_reference.key[0].value, submodel_client)
        for submodel_reference in aas.submodel
    ]
get_all_submodel_data_from_server(submodel_client) async

Function to get all submodels from the server

Returns:

Type Description
List[ClientSubmodel]

List of submodels retrieved from the server

Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def get_all_submodel_data_from_server(submodel_client: SubmodelClient) -> List[ClientSubmodel]:
    """
    Request all submodels from the server and return them in client format.

    Args:
        submodel_client (SubmodelClient): client to connect to the server
    Returns:
        List[ClientSubmodel]: ``result`` entry of the server response
    """
    response = await get_all_submodels.asyncio(client=submodel_client)
    return response.result
get_all_submodels_of_type(model, submodel_client) async

Function to get all submodels of a certain type from the server

Parameters:

Name Type Description Default
model BaseModel

Pydantic model of the submodel

required

Returns:

Type Description
List[aas_model.Submodel]

List of submodels retrieved from the server

Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def get_all_submodels_of_type(model: BaseModel, submodel_client: SubmodelClient) -> List[aas_model.Submodel]:
    """
    Retrieve all submodels from the server and keep those matching the given model type.

    Args:
        model (BaseModel): Pydantic model class of the submodel (its ``__name__`` is compared with the class name of each converted submodel)
        submodel_client (SubmodelClient): client to connect to the server
    Returns:
        List[aas_model.Submodel]: List of submodels retrieved from the server
    """
    client_submodels = await get_all_submodel_data_from_server(submodel_client)
    # lazily convert client format -> basyx format -> pydantic model, one submodel at a time
    converted_submodels = (
        convert_submodel_to_model(client_utils.transform_client_to_basyx_model(client_submodel))
        for client_submodel in client_submodels
    )
    return [
        converted
        for converted in converted_submodels
        if converted.__class__.__name__ == model.__name__
    ]
get_basyx_submodel_from_server(submodel_id, submodel_client) async

Function to get a submodel from the server

Parameters:

Name Type Description Default
submodel_id str

id of the submodel

required
submodel_client SubmodelClient

client to connect to the server

required

Returns:

Type Description
model.Submodel

submodel retrieved from the server

Exceptions:

Type Description
HTTPException

If submodel with the given id does not exist

Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def get_basyx_submodel_from_server(submodel_id: str, submodel_client: SubmodelClient) -> model.Submodel:
    """
    Retrieve a submodel from the server and transform it to a basyx submodel.

    Args:
        submodel_id (str): id of the submodel
        submodel_client (SubmodelClient): client to connect to the server

    Returns:
        model.Submodel: submodel retrieved from the server

    Raises:
        HTTPException: (status code 400) If the submodel could not be requested or transformed
    """
    # the server is addressed with the base64 encoded id
    encoded_id = client_utils.get_base64_from_string(submodel_id)
    try:
        client_submodel = await get_submodel_by_id.asyncio(
            client=submodel_client, submodel_identifier=encoded_id
        )
        basyx_submodel = client_utils.transform_client_to_basyx_model(client_submodel.to_dict())
    except Exception as e:
        raise HTTPException(
            status_code=400, detail=f"Submodel with id {submodel_id} could not be retrieved. Error: {e}"
        )
    return basyx_submodel
get_submodel_from_server(submodel_id, submodel_client) async

Function to get a submodel from the server

Parameters:

Name Type Description Default
submodel_id str

id of the submodel

required

Returns:

Type Description
aas_model.Submodel

submodel retrieved from the server

Exceptions:

Type Description
HTTPException

If submodel with the given id does not exist

Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def get_submodel_from_server(submodel_id: str, submodel_client: SubmodelClient) -> aas_model.Submodel:
    """
    Retrieve a submodel from the server and convert it to a model.

    Args:
        submodel_id (str): id of the submodel
        submodel_client (SubmodelClient): client to connect to the server
    Returns:
        aas_model.Submodel: submodel retrieved from the server

    Raises:
        HTTPException: (status code 400) If an HTTPException occurs while retrieving or converting the submodel
    """
    try:
        return convert_submodel_to_model(
            await get_basyx_submodel_from_server(submodel_id, submodel_client)
        )
    except HTTPException as e:
        raise HTTPException(
            status_code=400, detail=f"Submodel with id {submodel_id} could not be retrieved. Error: {e}"
        )
post_submodel_to_server(pydantic_submodel, submodel_client) async

Function to post a submodel to the server

Parameters:

Name Type Description Default
pydantic_submodel aas_model.Submodel

submodel to post

required
submodel_client SubmodelClient

client to connect to the server

required

Exceptions:

Type Description
HTTPException

If submodel with the given id already exists

Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def post_submodel_to_server(pydantic_submodel: aas_model.Submodel, submodel_client: SubmodelClient):
    """
    Post a new submodel to the server.

    Args:
        pydantic_submodel (aas_model.Submodel): submodel to post
        submodel_client (SubmodelClient): client to connect to the server

    Raises:
        HTTPException: (status code 400) If submodel with the given id already exists
    """
    already_on_server = await submodel_is_on_server(pydantic_submodel.id, submodel_client)
    if already_on_server:
        raise HTTPException(
            status_code=400,
            detail=f"Submodel with id {pydantic_submodel.id} already exists. Try putting it instead.",
        )
    client_body = client_utils.ClientModel(
        basyx_object=convert_model_to_submodel(pydantic_submodel)
    )
    # TODO: make a try except with json.decoder.JSONDecodeError to avoid error when posting a submodel that already exists, same goes for aas
    await post_submodel.asyncio(client=submodel_client, body=client_body)
put_submodel_to_server(submodel, submodel_client) async

Function to put a submodel to the server

Parameters:

Name Type Description Default
submodel aas_model.Submodel

submodel to put

required
submodel_client SubmodelClient

client to connect to the server

required

Exceptions:

Type Description
HTTPException

If submodel with the given id does not exist

Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def put_submodel_to_server(submodel: aas_model.Submodel, submodel_client: SubmodelClient):
    """
    Update an existing submodel on the server.

    Args:
        submodel (aas_model.Submodel): submodel to put
        submodel_client (SubmodelClient): client to connect to the server

    Raises:
        HTTPException: (status code 400) If submodel with the given id does not exist
    """
    submodel_exists = await submodel_is_on_server(submodel.id, submodel_client)
    if not submodel_exists:
        raise HTTPException(
            status_code=400, detail=f"Submodel with id {submodel.id} does not exist. Try posting it first."
        )
    client_body = client_utils.ClientModel(basyx_object=convert_model_to_submodel(submodel))
    # the server is addressed with the base64 encoded id
    encoded_id = client_utils.get_base64_from_string(submodel.id)
    await put_submodel_by_id.asyncio(
        submodel_identifier=encoded_id, client=submodel_client, body=client_body
    )
submodel_is_on_server(submodel_id, submodel_client) async

Function to check if a submodel with the given id is on the server

Parameters:

Name Type Description Default
submodel_id str

id of the submodel

required
submodel_client SubmodelClient

client to connect to the server

required

Returns:

Type Description
bool

True if submodel is on server, False if not

Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def submodel_is_on_server(submodel_id: str, submodel_client: SubmodelClient) -> bool:
    """
    Check whether a submodel with the given id can be retrieved from the server.

    An HTTPException raised while fetching the submodel is treated as "not on the
    server"; other exceptions are not handled here.

    Args:
        submodel_id (str): id of the submodel
        submodel_client (SubmodelClient): client to connect to the server

    Returns:
        bool: True if submodel is on server, False if not
    """
    try:
        await get_submodel_from_server(submodel_id, submodel_client)
    except HTTPException:
        return False
    return True

async_connector

Publisher (Protocol)

Source code in aas_middleware\connect\connectors\async_connector.py
@runtime_checkable
class Publisher(Protocol):
    """
    Protocol for connectors that publish messages to a topic.

    The protocol is runtime checkable, so it can be used in isinstance checks.
    """
    async def connect(self):
        """
        Establish the connection to the server.

        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...

    async def disconnect(self):
        """
        Close the connection to the server.

        Raises:
            ConnectionError: If the connection to the server could not be closed.
        """
        ...

    async def publish(self, topic: str, message: Any) -> None:
        """
        Interface for a publisher to publish a message to a topic.

        Args:
            topic (str): The topic to publish the message to.
            message (Any): The message to be published.

        Raises:
            ConnectionError: If the publishing of the message failed.
        """
        ...
connect(self) async

Exceptions:

Type Description
ConnectionError

If the connection to the server could not be established.

Source code in aas_middleware\connect\connectors\async_connector.py
async def connect(self):
    """
    Establish the connection to the server.

    Raises:
        ConnectionError: If the connection to the server could not be established.
    """
    ...
disconnect(self) async

Exceptions:

Type Description
ConnectionError

If the connection to the server could not be closed.

Source code in aas_middleware\connect\connectors\async_connector.py
async def disconnect(self):
    """
    Close the connection to the server.

    Raises:
        ConnectionError: If the connection to the server could not be closed.
    """
    ...
publish(self, topic, message) async

Interfaces for a publisher to publish messages to a topic.

Parameters:

Name Type Description Default
topic str

The topic to publish the message to.

required
message Any

The message to be published.

required

Exceptions:

Type Description
ConnectionError

If the publishing of the message failed.

Source code in aas_middleware\connect\connectors\async_connector.py
async def publish(self, topic: str, message: Any) -> None:
    """
    Interface for a publisher to publish a message to a topic.

    Args:
        topic (str): The topic to publish the message to.
        message (Any): The message to be published.

    Raises:
        ConnectionError: If the publishing of the message failed.
    """
    ...

Subsciber (Protocol)

Source code in aas_middleware\connect\connectors\async_connector.py
@runtime_checkable
class Subsciber(Protocol):
    """
    Protocol for connectors that subscribe to a topic and receive messages.

    The protocol is runtime checkable, so it can be used in isinstance checks.
    """
    async def connect(self):
        """
        Establish the connection to the server.

        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...

    async def disconnect(self):
        """
        Close the connection to the server.

        Raises:
            ConnectionError: If the connection to the server could not be closed.
        """
        ...

    async def subscribe(self, topic: str, callback: Callable[[Any], Awaitable[None] | Any]) -> None:
        """
        Interface for a subscriber to subscribe to a topic and receive messages.

        Args:
            topic (str): The topic to subscribe to.
            callback (Callable[[Any], Awaitable[None] | Any]): The callback to be executed when a message is received.

        Raises:
            ConnectionError: If the subscribing to the topic failed.
        """
        ...
connect(self) async

Exceptions:

Type Description
ConnectionError

If the connection to the server could not be established.

Source code in aas_middleware\connect\connectors\async_connector.py
async def connect(self):
    """
    Establish the connection to the server.

    Raises:
        ConnectionError: If the connection to the server could not be established.
    """
    ...
disconnect(self) async

Exceptions:

Type Description
ConnectionError

If the connection to the server could not be closed.

Source code in aas_middleware\connect\connectors\async_connector.py
async def disconnect(self):
    """
    Close the connection to the server.

    Raises:
        ConnectionError: If the connection to the server could not be closed.
    """
    ...
subscribe(self, topic, callback) async

Interfaces for a subscriber to subscribe to a topic and receive messages.

Parameters:

Name Type Description Default
topic str

The topic to subscribe to.

required
callback Callable[[Any], Awaitable[None] | Any]

The callback to be executed when a message is received.

required

Exceptions:

Type Description
ConnectionError

If the subscribing to the topic failed.

Source code in aas_middleware\connect\connectors\async_connector.py
async def subscribe(self, topic: str, callback: Callable[[Any], Awaitable[None] | Any]) -> None:
    """
    Interface for a subscriber to subscribe to a topic and receive messages.

    Args:
        topic (str): The topic to subscribe to.
        callback (Callable[[Any], Awaitable[None] | Any]): The callback to be executed when a message is received.

    Raises:
        ConnectionError: If the subscribing to the topic failed.
    """
    ...

connector

Consumer (Protocol)

Source code in aas_middleware\connect\connectors\connector.py
@runtime_checkable
class Consumer(Protocol):
    """
    Protocol for connectors that consume data.

    The protocol is runtime checkable, so it can be used in isinstance checks.
    """
    async def connect(self):
        """
        Establish the connection to the server.

        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...

    async def disconnect(self):
        """
        Close the connection to the server.

        Raises:
            ConnectionError: If the connection to the server could not be closed.
        """
        ...

    async def consume(self, body: Any) -> None:
        """
        Interface for a consumer to consume data and send it over the connection of the consumer.

        Args:
            body (Any): The data to be consumed.

        Raises:
            ConnectionError: If the consuming of the data failed.
        """
        ...
connect(self) async

Exceptions:

Type Description
ConnectionError

If the connection to the server could not be established.

Source code in aas_middleware\connect\connectors\connector.py
async def connect(self):
    """
    Establish the connection to the server.

    Raises:
        ConnectionError: If the connection to the server could not be established.
    """
    ...
consume(self, body) async

Interface for a consumer to consume data and send it over the connection of the consumer.

Parameters:

Name Type Description Default
body Any

The data to be consumed.

required

Exceptions:

Type Description
ConnectionError

If the consuming of the data failed.

Source code in aas_middleware\connect\connectors\connector.py
async def consume(self, body: Any) -> None:
    """
    Interface for a consumer to consume data and send it over the connection of the consumer.

    Args:
        body (Any): The data to be consumed.

    Raises:
        ConnectionError: If the consuming of the data failed.
    """
    ...
disconnect(self) async

Exceptions:

Type Description
ConnectionError

If the connection to the server could not be closed.

Source code in aas_middleware\connect\connectors\connector.py
async def disconnect(self):
    """
    Close the connection to the server.

    Raises:
        ConnectionError: If the connection to the server could not be closed.
    """
    ...

Provider (Protocol)

Source code in aas_middleware\connect\connectors\connector.py
@runtime_checkable
class Provider(Protocol):
    """
    Protocol for connectors that provide data.

    The protocol is runtime checkable, so it can be used in isinstance checks.
    """
    async def connect(self):
        """
        Establish the connection to the server.

        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...

    async def disconnect(self):
        """
        Close the connection to the server.

        Raises:
            ConnectionError: If the connection to the server could not be closed.
        """
        ...

    async def provide(self) -> Any:
        """
        Interface for a provider to provide data.

        Returns:
            Any: The data to be provided.

        Raises:
            ConnectionError: If the providing of the data failed.
        """
        ...
connect(self) async

Exceptions:

Type Description
ConnectionError

If the connection to the server could not be established.

Source code in aas_middleware\connect\connectors\connector.py
async def connect(self):
    """
    Establish the connection to the server.

    Raises:
        ConnectionError: If the connection to the server could not be established.
    """
    ...
disconnect(self) async

Exceptions:

Type Description
ConnectionError

If the connection to the server could not be closed.

Source code in aas_middleware\connect\connectors\connector.py
async def disconnect(self):
    """
    Close the connection to the server.

    Raises:
        ConnectionError: If the connection to the server could not be closed.
    """
    ...
provide(self) async

Interfaces for a provider to provide data.

Returns:

Type Description
Any

The data to be provided.

Exceptions:

Type Description
ConnectionError

If the providing of the data failed.

Source code in aas_middleware\connect\connectors\connector.py
async def provide(self) -> Any:
    """
    Interface for a provider to provide data.

    Returns:
        Any: The data to be provided.

    Raises:
        ConnectionError: If the providing of the data failed.
    """
    ...

web_hook_client_connector

WebHookClientConnector

Class for a WebHookClientConnector that can be used to connect to a webhook and receive data from it.

Parameters:

Name Type Description Default
web_hook_url str

address of the webhook

required

Attributes:

Name Type Description
web_hook_url str

address of the webhook server

own_url Optional[str]

own url where the webhook sends data to

hook Optional[anyio.Event]

hook event for posts of the webhook

connected_subscriber Optional[str]

address of the subscriber that is connected to the webhook

received_data str

data received from the webhook

Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
class WebHookClientConnector:
    """
    Class for a WebHookClientConnector that can be used to connect to a webhook and receive data from it.

    Args:
        web_hook_url (str): address of the webhook

    Attributes:
        web_hook_url (str): address of the webhook server
        own_url (Optional[str]): own url where the webhook sends data to
        hook (Optional[anyio.Event]): hook event for posts of the webhook
        connected_subscriber (Optional[str]): address of the subscriber that is connected to the webhook
        received_data (str): data received from the webhook
    """

    def __init__(self, web_hook_url: str):
        """
        Args:
            web_hook_url (str): address of the webhook
        """
        self.web_hook_url = web_hook_url
        self.own_url: Optional[str] = None
        self.hook: Optional[anyio.Event] = None
        self.connected_subscriber: Optional[str] = None
        self.received_data: str = ""

    async def set_hook(self, hook: anyio.Event, url: str):
        """
        Function sets the hook event for posts of the webhook and the url where the webhook should post to.

        Args:
            hook (anyio.Event): event that is set by trigger_hook when data was posted
            url (str): own url that is sent to the webhook server on connect
        """
        # TODO: set this hook when adding the hook to a provider so that it is linked to fastAPI post route of the datamodel
        # either to this with a callback or these events.
        self.hook = hook
        self.own_url = url

    async def trigger_hook(self, body: str):
        """
        Function triggers the hook for the webhook.

        Stores the posted body as received data and sets the hook event.

        Args:
            body (str): data posted by the webhook

        Raises:
            HTTPException: If no hook was set with set_hook.
        """
        if not self.hook:
            raise HTTPException(
                status_code=400,
                detail="Webhook does not have a hook set. Please use the set_hook function to set the hook.",
            )
        self.received_data = body
        self.hook.set()

    async def connect(self):
        """
        Function connects to the webhook with a post request that sends the middleware url to the webhook.

        Raises:
            HTTPException: If no own url was set with set_hook or if the webhook
                server does not answer with status 200.
        """
        # Without an own url the webhook server would be registered with a null
        # callback address, so fail early with a clear message instead.
        if not self.own_url:
            raise HTTPException(
                status_code=400,
                detail="Webhook does not have an own url set. Please use the set_hook function to set the url.",
            )
        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.web_hook_url, json={"url": self.own_url}
            ) as response:
                if response.status != 200:
                    raise HTTPException(
                        status_code=400,
                        detail="Webhook could not connect to the webhook server.",
                    )
                # TODO: add logging for something like this!
                await response.text()

    async def disconnect(self):
        """
        Function disconnects from the webhook.

        Currently a no-op.
        """
        # TODO: either remove the hook event from the provider or set it to false or
        # remove the callback from the endpoint of the provider

    async def consume(self, body: str) -> str:
        """
        Sending data is not supported by this connector.

        Raises:
            NotImplementedError: Always, since this connector only receives data.
        """
        raise NotImplementedError(
            "WebHookClientConnector does not support sending data but only receiving data. Try to use the WebHookServerConnector instead."
        )

    async def provide(self) -> str:
        """
        Function receives data from the webhook.

        Waits until the hook event is set, replaces it with a fresh event for the
        next post and returns the data stored by trigger_hook.

        Returns:
            str: data received from the webhook

        Raises:
            Exception: If no hook was set with set_hook.
        """
        if not self.hook:
            raise Exception(
                "WebHookClientConnector does not have a hook set. Please use the set_hook function to set the hook."
            )
        await self.hook.wait()
        # A set event cannot be reused, so a new one is created for the next post.
        self.hook = anyio.Event()
        return self.received_data
__init__(self, web_hook_url) special

Parameters:

Name Type Description Default
web_hook_url str

address of the webhook

required
Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
def __init__(self, web_hook_url: str):
    """
    Set up a connector for the given webhook.

    Args:
        web_hook_url (str): address of the webhook
    """
    # Nothing has been received yet and neither a hook, a callback url nor a
    # subscriber is known at construction time.
    self.received_data: str = ""
    self.connected_subscriber: Optional[str] = None
    self.hook: Optional[anyio.Event] = None
    self.own_url: Optional[str] = None
    self.web_hook_url = web_hook_url
connect(self) async

Function connects to the webhook with a post request that sends the middleware url to the webhook.

Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
async def connect(self):
    """
    Function connects to the webhook with a post request that sends the middleware url to the webhook.

    Raises:
        HTTPException: If no own url is set or if the webhook server does not
            answer with status 200.
    """
    # Without an own url the webhook server would be registered with a null
    # callback address, so fail early with a clear message instead.
    if not self.own_url:
        raise HTTPException(
            status_code=400,
            detail="Webhook does not have an own url set. Please use the set_hook function to set the url.",
        )
    async with aiohttp.ClientSession() as session:
        async with session.post(
            self.web_hook_url, json={"url": self.own_url}
        ) as response:
            if response.status != 200:
                raise HTTPException(
                    status_code=400,
                    detail="Webhook could not connect to the webhook server.",
                )
            # TODO: add logging for something like this!
            await response.text()
disconnect(self) async

Function disconnects from the webhook.

Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
async def disconnect(self):
    """
    Function disconnects from the webhook.

    Currently a no-op: the body contains no statements besides this docstring.
    """
    # TODO: either remove the hook event from the provider or set it to false or
    # remove the callback from the endpoint of the provider
provide(self) async

Function receives data from the webhook.

Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
async def provide(self) -> str:
    """
    Wait for the next post of the webhook and return the data it delivered.

    Returns:
        str: data received from the webhook

    Raises:
        Exception: If no hook is set.
    """
    pending_hook = self.hook
    if not pending_hook:
        raise Exception(
            "WebHookClientConnector does not have a hook set. Please use the set_hook function to set the hook."
        )
    await pending_hook.wait()
    # The set event is replaced by a fresh one for the next post.
    self.hook = anyio.Event()
    return self.received_data
set_hook(self, hook, url) async

Function sets the hook event for posts of the webhook and the url where the webhook should post to.

Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
async def set_hook(self, hook: anyio.Event, url: str):
    """
    Store the hook event for posts of the webhook together with the url the webhook should post to.

    Args:
        hook (anyio.Event): hook event for posts of the webhook
        url (str): url where the webhook should post to
    """
    # TODO: set this hook when adding the hook to a provider so that it is linked to fastAPI post route of the datamodel
    # either to this with a callback or these events.
    self.hook, self.own_url = hook, url
trigger_hook(self, body) async

Function triggers the hook for the webhook.

Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
async def trigger_hook(self, body: str):
    """
    Store the posted body as received data and fire the hook event.

    Args:
        body (str): data posted by the webhook

    Raises:
        HTTPException: If no hook is set.
    """
    hook_event = self.hook
    if not hook_event:
        raise HTTPException(
            status_code=400,
            detail="Webhook does not have a hook set. Please use the set_hook function to set the hook.",
        )
    # The data is stored before the event fires.
    self.received_data = body
    hook_event.set()

web_hook_server_connector

WebHookServerConnector

Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
class WebHookServerConnector:
    """
    Connector that posts data to all subscribers that registered their url at the webhook.

    Attributes:
        hook (anyio.Event): event that consume waits for before posting data
        connected_subscribers (List[str]): urls of the registered subscribers
        received_data (str): data that is posted to the subscribers
        connectable (bool): whether subscribers may currently register
    """

    def __init__(self):
        self.hook: anyio.Event = anyio.Event()
        self.connected_subscribers: List[str] = []
        self.received_data: str = ""
        self.connectable: bool = False

    async def start_server(self) -> None:
        """
        Activates an endpoint where clients can connect to with a post request that sends the url to post the webhook data to.

        Not implemented yet: the body is currently a no-op.
        """
        # TODO: add functionality that when added to a consumer, an endpoint is created for webhook clients to connect to,
        # and one endpoint to remove client subscriptions. The register and unregister functions should then be used.
        pass

    async def register_subscriber(self, url: str) -> None:
        """
        Function registers a subscriber to the webhook.

        Args:
            url (str): url of the subscriber to register

        Raises:
            HTTPException: If the webhook is not connectable.
        """
        if not self.connectable:
            raise HTTPException(status_code=400, detail="Webhook not connectable.")
        self.connected_subscribers.append(url)

    async def unregister_subscriber(self, url: str) -> None:
        """
        Function unregisters a subscriber from the webhook.

        Args:
            url (str): url of the subscriber to unregister

        Raises:
            HTTPException: If the url is not registered.
        """
        try:
            self.connected_subscribers.remove(url)
        except ValueError:
            raise HTTPException(status_code=400, detail="Subscriber not registered.")

    async def connect(self):
        """
        Function allows clients to connect to the webhook.
        """
        self.connectable = True

    async def disconnect(self):
        """
        Function disconnects all subscribed clients from the webhook.
        """
        self.connectable = False
        self.connected_subscribers = []

    async def consume(self) -> str:
        """
        Function sends data to all subscribed clients of the webhook.

        Waits until the hook event is set and then posts ``received_data`` to
        every registered subscriber url.

        Returns:
            str: confirmation message after the data was posted to all subscribers

        Raises:
            HTTPException: If the webhook is not connectable when the hook fires.
        """
        await self.hook.wait()
        if not self.connectable:
            raise HTTPException(status_code=400, detail="Webhook not connectable.")
        # Re-arm the hook: a set anyio event stays set and cannot be cleared, so
        # it is replaced. Otherwise every later call would return immediately.
        self.hook = anyio.Event()
        # Iterate over a snapshot so that subscribers registering or unregistering
        # while the posts are awaited do not disturb the loop.
        subscribers = tuple(self.connected_subscribers)
        # One session is shared by all posts instead of opening one per subscriber.
        async with aiohttp.ClientSession() as session:
            for subscriber in subscribers:
                async with session.post(
                    subscriber, data=self.received_data
                ) as response:
                    await response.text()
        return "Webhook message sent"

    async def provide(self) -> str:
        """
        Receiving data is not supported by this connector.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError
connect(self) async

Function allows clients to connect to the webhook.

Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
async def connect(self):
    """
    Function allows clients to connect to the webhook.

    Only sets the ``connectable`` flag to True.
    """
    self.connectable = True
consume(self) async

Function sends data to all subscribed clients of the webhook.

Parameters:

Name Type Description Default
body str

description

required

Returns:

Type Description
str

confirmation message "Webhook message sent"

Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
async def consume(self) -> str:
    """
    Function sends data to all subscribed clients of the webhook.

    Waits until the hook event is set and then posts ``received_data`` to
    every registered subscriber url.

    Returns:
        str: confirmation message after the data was posted to all subscribers

    Raises:
        HTTPException: If the webhook is not connectable when the hook fires.
    """
    await self.hook.wait()
    if not self.connectable:
        raise HTTPException(status_code=400, detail="Webhook not connectable.")
    # Re-arm the hook: a set anyio event stays set and cannot be cleared, so
    # it is replaced. Otherwise every later call would return immediately.
    self.hook = anyio.Event()
    # Iterate over a snapshot so that subscribers registering or unregistering
    # while the posts are awaited do not disturb the loop.
    subscribers = tuple(self.connected_subscribers)
    # One session is shared by all posts instead of opening one per subscriber.
    async with aiohttp.ClientSession() as session:
        for subscriber in subscribers:
            async with session.post(
                subscriber, data=self.received_data
            ) as response:
                await response.text()
    return "Webhook message sent"
disconnect(self) async

Function disconnects all subscribed clients from the webhook.

Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
async def disconnect(self):
    """
    Stop accepting subscribers and drop every subscriber registered so far.
    """
    # The subscriber list is replaced by a fresh, empty one.
    self.connectable, self.connected_subscribers = False, []
register_subscriber(self, url) async

Function registers a subscriber to the webhook.

Parameters:

Name Type Description Default
url str

url of the subscriber to register

required
Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
async def register_subscriber(self, url: str) -> None:
    """
    Add a subscriber url to the subscribers of the webhook.

    Args:
        url (str): url of the subscriber to register

    Raises:
        HTTPException: If the webhook is not connectable.
    """
    if self.connectable:
        self.connected_subscribers.append(url)
        return
    raise HTTPException(status_code=400, detail="Webhook not connectable.")
start_server(self) async

Activates an endpoint that clients can connect to with a POST request that sends the URL to post the webhook data to.

Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
async def start_server(self) -> None:
    """
    Activates an endpoint where clients can connect to with a post request that sends the url to post the webhook data to.

    Not implemented yet: the body is currently a no-op.
    """
    # TODO: add functionality that when added to a consumer, an endpoint is created for webhook clients to connect to,
    # and one endpoint to remove client subscriptions. The register and unregister functions should then be used.
    pass
unregister_subscriber(self, url) async

Function unregisters a subscriber from the webhook.

Parameters:

Name Type Description Default
url str

url of the subscriber to unregister

required
Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
async def unregister_subscriber(self, url: str) -> None:
    """
    Remove a subscriber url from the subscribers of the webhook.

    Args:
        url (str): url of the subscriber to unregister

    Raises:
        HTTPException: If the url is not registered.
    """
    subscribers = self.connected_subscribers
    try:
        # Only the lookup can fail, so only the lookup is guarded.
        position = subscribers.index(url)
    except ValueError:
        raise HTTPException(status_code=400, detail="Subscriber not registered.")
    # Drops the first matching entry only.
    del subscribers[position]

web_socket_server_connector

WebSocketServerConnector

Source code in aas_middleware\connect\connectors\web_socket_server_connector.py
class WebSocketServerConnector:
    """
    Connector that serves a websocket, stores the messages it receives and can send data to the connected client.

    Args:
        host (str): host passed to the websocket server
        port (int): port passed to the websocket server
        reply_function (Optional[Callable[[], str]]): function that creates the reply
            sent for every received message; a fixed reply is sent if it is not given

    Attributes:
        server (Optional[websockets.WebSocketServer]): running server, None if not started
        websocket (Optional[websockets.WebSocketServerProtocol]): most recently accepted connection
        received_data_event (anyio.Event): event that is set when a message was received
        received_data (str): last received message
        connectable (bool): whether incoming connections are accepted
    """

    def __init__(
        self, host: str, port: int, reply_function: Optional[Callable[[], str]] = None
    ):
        self.host = host
        self.port = port
        self.server: Optional[websockets.WebSocketServer] = None
        self.websocket: Optional[websockets.WebSocketServerProtocol] = None
        self.received_data_event = anyio.Event()
        self.received_data: str = ""
        self.reply_function = reply_function
        self.connectable = False

    async def start_server(self) -> None:
        """
        Function starts the websocket server on the configured host and port.
        """
        self.server = await websockets.serve(
            self.handle_websocket, self.host, self.port
        )

    async def handle_websocket(
        self, websocket: websockets.WebSocketServerProtocol, path: str
    ) -> None:
        """
        Function handles one websocket connection.

        The connection is closed right away if the connector is not connectable.
        Otherwise every received message is stored, the received data event is
        set and a reply is sent back.

        Args:
            websocket (websockets.WebSocketServerProtocol): the accepted connection
            path (str): not used by this handler

        Raises:
            HTTPException: If the connection is closed while messages are handled.
        """
        if not self.connectable:
            await websocket.close()
            return
        self.websocket = websocket

        try:
            async for message in websocket:
                print(f"Received message: {message}")
                # Binary frames are decoded so that received_data is always text.
                if isinstance(message, bytes):
                    message = message.decode()
                self.received_data = message
                self.received_data_event.set()
                if self.reply_function:
                    await websocket.send(self.reply_function())
                else:
                    await websocket.send("Message received")

        except websockets.exceptions.ConnectionClosed:
            raise HTTPException(status_code=400, detail="Websocket connection closed.")

    async def connect(self) -> None:
        """
        Function enables incoming connections and starts the server if it is not running yet.
        """
        # The flag is set before the server starts, so that the first incoming
        # connection is not rejected by handle_websocket.
        self.connectable = True
        # Previously an unconditional raise made this branch unreachable.
        if not self.server:
            await self.start_server()

    async def disconnect(self) -> None:
        """
        Function stops accepting connections, closes the current connection and shuts the server down.
        """
        self.connectable = False
        if self.websocket:
            await self.websocket.close()
            self.websocket = None
        if self.server:
            self.server.close()
            await self.server.wait_closed()
            self.server = None

    async def consume(self, body: str) -> Optional[str]:
        """
        Function sends data to the connected websocket client.

        Args:
            body (str): data to send

        Raises:
            HTTPException: If the server is not started or no client is connected.
        """
        if not self.server:
            raise HTTPException(
                status_code=400, detail="Websocket server is not started."
            )
        if not self.websocket:
            raise HTTPException(
                status_code=400, detail="Websocket not connected to websocket server."
            )
        await self.websocket.send(body)

    async def provide(self) -> str:
        """
        Function waits for the next received message and returns it.

        Returns:
            str: last received message
        """
        await self.received_data_event.wait()
        # A set event cannot be reused, so a new one is created for the next message.
        self.received_data_event = anyio.Event()
        return self.received_data
connect(self) async

Function enables incoming connections to the server so that they are accepted.

Source code in aas_middleware\connect\connectors\web_socket_server_connector.py
async def connect(self) -> None:
    """
    Function enables incoming connections and starts the server if it is not running yet.
    """
    # The flag is set before the server starts, so that the first incoming
    # connection is not rejected.
    self.connectable = True
    # Previously an unconditional raise made this branch unreachable.
    if not self.server:
        await self.start_server()