Connect API
        connectors
  
      special
  
    
        aas_client_connector
  
      special
  
    
        aas_client
    
aas_is_on_server(aas_id, aas_client)
  
      async
  
    Function to check if an AAS with the given id is on the server
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
aas_id | 
        str | 
        id of the AAS  | 
        required | 
Returns:
| Type | Description | 
|---|---|
bool | 
      True if AAS is on server, False if not  | 
    
Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
          async def aas_is_on_server(aas_id: str, aas_client: AASClient) -> bool:
    """
    Function to check if an AAS with the given id is on the server
    Args:
        aas_id (str): id of the AAS
    Returns:
        bool: True if AAS is on server, False if not
    """
    try:
        await get_basyx_aas_from_server(aas_id, aas_client)
        return True
    except Exception as e:
        return False
delete_aas_from_server(aas_id, aas_client)
  
      async
  
    Function to delete an AAS from the server
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
aas_id | 
        str | 
        id of the AAS  | 
        required | 
Exceptions:
| Type | Description | 
|---|---|
HTTPException | 
        If AAS with the given id does not exist  | 
      
Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
          async def delete_aas_from_server(aas_id: str, aas_client: AASClient):
    """
    Function to delete an AAS from the server
    Args:
        aas_id (str): id of the AAS
    Raises:
        HTTPException: If AAS with the given id does not exist
    """
    if not await aas_is_on_server(aas_id, aas_client):
        raise HTTPException(
            status_code=400, detail=f"AAS with id {aas_id} does not exist. Cannot delete it."
        )
    base_64_id = client_utils.get_base64_from_string(aas_id)
    response = await delete_asset_administration_shell_by_id.asyncio(
        client=aas_client, aas_identifier=base_64_id
    )
get_aas_from_server(aas_id, aas_client, submodel_client)
  
      async
  
    Function to get an AAS from the server
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
aas_id | 
        str | 
        id of the AAS  | 
        required | 
Returns:
| Type | Description | 
|---|---|
aas_model.AAS | 
      AAS retrieved from the server  | 
    
Exceptions:
| Type | Description | 
|---|---|
HTTPException | 
        If AAS with the given id does not exist  | 
      
Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
          async def get_aas_from_server(aas_id: str, aas_client: AASClient, submodel_client: SubmodelClient) -> aas_model.AAS:
    """
    Function to get an AAS from the server
    Args:
        aas_id (str): id of the AAS
    Returns:
        aas_model.AAS: AAS retrieved from the server
    Raises:
        HTTPException: If AAS with the given id does not exist
    """
    try: 
        aas = await get_basyx_aas_from_server(aas_id, aas_client)
    except Exception as e:
        raise HTTPException(
            status_code=400, detail=f"AAS with id {aas_id} could not be retrieved. Error: {e}"
        )
    try:
        aas_submodels = await get_all_basyx_submodels_from_server(aas, submodel_client)
    except Exception as e:
        raise HTTPException(
            status_code=400, detail=f"Submodels of AAS with id {aas_id} could not be retrieved. Error: {e}"
        )
    obj_store = model.DictObjectStore()
    obj_store.add(aas)
    [obj_store.add(submodel) for submodel in aas_submodels]
    data_model = BasyxFormatter().deserialize(obj_store)
    model_data = data_model.get_model(aas_id)
    return model_data
get_all_aas_from_server(pydantic_model, aas_client, submodel_client)
  
      async
  
    Function to get all AAS from the server
Returns:
| Type | Description | 
|---|---|
List[aas_model.AAS] | 
      List of AAS retrieved from the server  | 
    
Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
          async def get_all_aas_from_server(pydantic_model: BaseModel, aas_client: AASClient, submodel_client: SubmodelClient) -> List[aas_model.AAS]:
    """
    Function to get all AAS from the server
    Returns:
        List[aas_model.AAS]: List of AAS retrieved from the server
    """
    result_string = await get_all_asset_administration_shells.asyncio(client=aas_client)
    aas_data = result_string["result"]
    aas_list = [client_utils.transform_client_to_basyx_model(aas) for aas in aas_data]
    submodels = []
    for aas in aas_list:
        aas_submodels = await get_all_basyx_submodels_from_server(aas, submodel_client)
        submodels.extend(aas_submodels)
    obj_store = model.DictObjectStore()
    [obj_store.add(aas) for aas in aas_list]
    [obj_store.add(submodel) for submodel in submodels if not any(submodel.id == other_sm.id for other_sm in obj_store)]
    data_model = BasyxFormatter().deserialize(model_data)
    model_data = data_model.get_models_of_type(aas_model.AAS)
    model_data = [model for model in model_data if model.__class__.__name__ == pydantic_model.__name__]
    return model_data
get_basyx_aas_from_server(aas_id, aas_client)
  
      async
  
    Function to get an AAS from the server
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
aas_id | 
        str | 
        id of the AAS  | 
        required | 
Exceptions:
| Type | Description | 
|---|---|
HTTPException | 
        If AAS with the given id does not exist  | 
      
Returns:
| Type | Description | 
|---|---|
model.AssetAdministrationShell | 
      AAS retrieved from the server  | 
    
Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
          async def get_basyx_aas_from_server(aas_id: str, aas_client: AASClient) -> model.AssetAdministrationShell:
    """
    Function to get an AAS from the server
    Args:
        aas_id (str): id of the AAS
    Raises:
        HTTPException: If AAS with the given id does not exist
    Returns:
        model.AssetAdministrationShell: AAS retrieved from the server
    """
    base_64_id = client_utils.get_base64_from_string(aas_id)
    try:
        aas_data = await get_asset_administration_shell_by_id.asyncio(
            client=aas_client, aas_identifier=base_64_id
        )
        return client_utils.transform_client_to_basyx_model(aas_data.to_dict())
    except Exception as e:
        raise ConnectionError(
            e
        )
get_submodel_from_aas_id_and_class_name(aas_id, class_name, aas_client, submodel_client)
  
      async
  
    Function to get a submodel from the server based on the AAS id and the class name of the submodel
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
aas_id | 
        str | 
        id of the AAS  | 
        required | 
class_name | 
        str | 
        class name of the submodel  | 
        required | 
Exceptions:
| Type | Description | 
|---|---|
HTTPException | 
        If submodel with the given class name does not exist for the given AAS  | 
      
Returns:
| Type | Description | 
|---|---|
aas_model.Submodel | 
      submodel retrieved from the server  | 
    
Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
          async def get_submodel_from_aas_id_and_class_name(aas_id: str, class_name: str, aas_client: AASClient, submodel_client: SubmodelClient) -> aas_model.Submodel:
    """
    Function to get a submodel from the server based on the AAS id and the class name of the submodel
    Args:
        aas_id (str): id of the AAS
        class_name (str): class name of the submodel
    Raises:
        HTTPException: If submodel with the given class name does not exist for the given AAS
    Returns:
        aas_model.Submodel: submodel retrieved from the server
    """
    basyx_aas = await get_basyx_aas_from_server(aas_id, aas_client)
    for basyx_submodel in basyx_aas.submodel:
        submodel_id = basyx_submodel.key[0].value
        submodel = await get_submodel_from_server(submodel_id, submodel_client)
        if submodel.__class__.__name__ == class_name:
            return submodel
    raise HTTPException(
        status_code=411,
        detail=f"Submodel with name {class_name} does not exist for AAS with id {aas_id}",
    )
post_aas_to_server(aas, aas_client, submodel_client)
  
      async
  
    Function to post an AAS to the server. Also posts all submodels of the AAS to the server, if they do not exist yet.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
aas | 
        aas_model.AAS | 
        AAS to post  | 
        required | 
Exceptions:
| Type | Description | 
|---|---|
HTTPException | 
        If AAS with the given id already exists  | 
      
Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
          async def post_aas_to_server(aas: aas_model.AAS, aas_client: AASClient, submodel_client: SubmodelClient):
    """
    Function to post an AAS to the server. Also posts all submodels of the AAS to the server, if they do not exist yet.
    Args:
        aas (aas_model.AAS): AAS to post
    Raises:
        HTTPException: If AAS with the given id already exists
    """
    if await aas_is_on_server(aas.id, aas_client):
        raise HTTPException(
            status_code=400, detail=f"AAS with id {aas.id} already exists"
        )
    check_aas_for_duplicate_ids(aas)
    obj_store = convert_model_to_aas(aas)
    basyx_aas = obj_store.get(aas.id)
    aas_for_client = ClientModel(basyx_object=basyx_aas)
    response = await post_asset_administration_shell.asyncio(
        client=aas_client, body=aas_for_client
    )
    aas_attributes = get_value_attributes(aas)
    for submodel in aas_attributes.values():
        if not await submodel_is_on_server(submodel.id, submodel_client):
            await post_submodel_to_server(submodel, submodel_client)
        else:
            logger.info(f"Submodel with id {submodel.id} already exists on the server. Updating the value.")
            await put_submodel_to_server(submodel, submodel_client)
put_aas_to_server(aas, aas_client, submodel_client)
  
      async
  
    Function to put an AAS to the server
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
aas | 
        aas_model.AAS | 
        AAS to put  | 
        required | 
Exceptions:
| Type | Description | 
|---|---|
HTTPException | 
        If AAS with the given id does not exist  | 
      
Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
          async def put_aas_to_server(aas: aas_model.AAS, aas_client: AASClient, submodel_client: SubmodelClient):
    """
    Function to put an AAS to the server
    Args:
        aas (aas_model.AAS): AAS to put
    Raises:
        HTTPException: If AAS with the given id does not exist
    """
    if not await aas_is_on_server(aas.id, aas_client):
        raise HTTPException(
            status_code=400, detail=f"AAS with id {aas.id} does not exist"
        )
    obj_store = convert_model_to_aas(aas)
    basyx_aas = obj_store.get(aas.id)
    aas_for_client = ClientModel(basyx_object=basyx_aas)
    base_64_id = client_utils.get_base64_from_string(aas.id)
    await put_asset_administration_shell_by_id.asyncio(
        aas_identifier=base_64_id, client=aas_client, body=aas_for_client
    )
    for submodel in get_value_attributes(aas).values():
        if await submodel_is_on_server(submodel.id, submodel_client):
            await put_submodel_to_server(submodel, submodel_client)
        else:
            await post_submodel_to_server(submodel, submodel_client)
        client_utils
    
transform_client_to_basyx_model(client_model)
    Function to transform a client model to a basyx model
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
response_model | 
        dict | 
        dictionary from server client that needs to be transformed  | 
        required | 
Returns:
| Type | Description | 
|---|---|
Union[model.AssetAdministrationShell, model.Submodel] | 
      basyx model from the given client model  | 
    
Source code in aas_middleware\connect\connectors\aas_client_connector\client_utils.py
          def transform_client_to_basyx_model(
    client_model: dict | Any,
) -> Union[model.AssetAdministrationShell, model.Submodel]:
    """
    Function to transform a client model to a basyx model
    Args:
        response_model (dict): dictionary from server client that needs to be transformed
    Returns:
        Union[model.AssetAdministrationShell, model.Submodel]: basyx model from the given client model
    """
    if not isinstance(client_model, dict):
        client_model = client_model.to_dict()
    remove_empty_lists(client_model)
    json_model = json.dumps(client_model, indent=4)
    basyx_model = json.loads(json_model, cls=basyx.aas.adapter.json.AASFromJsonDecoder)
    return basyx_model
        submodel_client
    
delete_submodel_from_server(submodel_id, submodel_client)
  
      async
  
    Function to delete a submodel from the server
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
submodel_id | 
        str | 
        id of the submodel  | 
        required | 
submodel_client | 
        SubmodelClient | 
        client to connect to the server  | 
        required | 
Exceptions:
| Type | Description | 
|---|---|
HTTPException | 
        If submodel with the given id does not exist  | 
      
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
          async def delete_submodel_from_server(submodel_id: str, submodel_client: SubmodelClient):
    """
    Function to delete a submodel from the server
    Args:
        submodel_id (str): id of the submodel
        submodel_client (SubmodelClient): client to connect to the server
    Raises:
        HTTPException: If submodel with the given id does not exist
    """
    if not await submodel_is_on_server(submodel_id, submodel_client):
        raise HTTPException(
            status_code=400, detail=f"Submodel with id {submodel_id} does not exist. Cannot delete it."
        )
    base_64_id = client_utils.get_base64_from_string(submodel_id)
    await delete_submodel_by_id.asyncio(client=submodel_client, submodel_identifier=base_64_id)
get_all_basyx_submodels_from_server(aas, submodel_client)
  
      async
  
    Function to get all submodels from an AAS in basyx format
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
aas | 
        model.AssetAdministrationShell | 
        AAS to get submodels from  | 
        required | 
submodel_client | 
        SubmodelClient | 
        client to connect to the server  | 
        required | 
Returns:
| Type | Description | 
|---|---|
List[model.Submodel] | 
      List of basyx submodels retrieved from the server  | 
    
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
          async def get_all_basyx_submodels_from_server(aas: model.AssetAdministrationShell, submodel_client: SubmodelClient) -> List[ClientSubmodel]:
    """
    Function to get all submodels from an AAS in basyx format
    Args:
        aas (model.AssetAdministrationShell): AAS to get submodels from
        submodel_client (SubmodelClient): client to connect to the server
    Returns:
        List[model.Submodel]: List of basyx submodels retrieved from the server
    """
    submodels = []
    for submodel_reference in aas.submodel:
        basyx_submodel = await get_basyx_submodel_from_server(submodel_reference.key[0].value, submodel_client)
        submodels.append(basyx_submodel)
    return submodels
get_all_submodel_data_from_server(submodel_client)
  
      async
  
    Function to get all submodels from the server
Returns:
| Type | Description | 
|---|---|
List[aas_model.Submodel] | 
      List of submodels retrieved from the server  | 
    
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
          async def get_all_submodel_data_from_server(submodel_client: SubmodelClient) -> List[ClientSubmodel]:
    """
    Function to get all submodels from the server
    Returns:
        List[aas_model.Submodel]: List of submodels retrieved from the server
    """
    submodel_data = await get_all_submodels.asyncio(client=submodel_client)
    submodel_data = submodel_data.result
    return submodel_data
get_all_submodels_of_type(model, submodel_client)
  
      async
  
    Function to get all submodels of a certain type from the server
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
model | 
        BaseModel | 
        Pydantic model of the submodel  | 
        required | 
Returns:
| Type | Description | 
|---|---|
List[aas_model.Submodel] | 
      List of submodels retrieved from the server  | 
    
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
          async def get_all_submodels_of_type(model: BaseModel, submodel_client: SubmodelClient) -> List[aas_model.Submodel]:
    """
    Function to get all submodels of a certain type from the server
    Args:
        model (BaseModel): Pydantic model of the submodel
    Returns:
        List[aas_model.Submodel]: List of submodels retrieved from the server
    """
    submodels_data = await get_all_submodel_data_from_server(submodel_client)
    submodels_of_type = []
    for submodel_data in submodels_data:
        basyx_submodel = client_utils.transform_client_to_basyx_model(submodel_data)
        submodel = convert_submodel_to_model(basyx_submodel)
        if submodel.__class__.__name__ == model.__name__:
            submodels_of_type.append(submodel)
    return submodels_of_type
get_basyx_submodel_from_server(submodel_id, submodel_client)
  
      async
  
    Function to get a submodel from the server
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
submodel_id | 
        str | 
        id of the submodel  | 
        required | 
submodel_client | 
        SubmodelClient | 
        client to connect to the server  | 
        required | 
Returns:
| Type | Description | 
|---|---|
model.Submodel | 
      submodel retrieved from the server  | 
    
Exceptions:
| Type | Description | 
|---|---|
HTTPException | 
        If submodel with the given id does not exist  | 
      
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
          async def get_basyx_submodel_from_server(submodel_id: str, submodel_client: SubmodelClient) -> model.Submodel:
    """
    Function to get a submodel from the server
    Args:
        submodel_id (str): id of the submodel
        submodel_client (SubmodelClient): client to connect to the server
    Returns:
        model.Submodel: submodel retrieved from the server
    Raises:
        HTTPException: If submodel with the given id does not exist
    """
    base_64_id = client_utils.get_base64_from_string(submodel_id)
    try:
        submodel_data = await get_submodel_by_id.asyncio(
            client=submodel_client, submodel_identifier=base_64_id
        )
        return client_utils.transform_client_to_basyx_model(submodel_data.to_dict())
    except Exception as e:
        raise HTTPException(
            status_code=400, detail=f"Submodel with id {submodel_id} could not be retrieved. Error: {e}"
        )
get_submodel_from_server(submodel_id, submodel_client)
  
      async
  
    Function to get a submodel from the server
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
submodel_id | 
        str | 
        id of the submodel  | 
        required | 
Returns:
| Type | Description | 
|---|---|
aas_model.Submodel | 
      submodel retrieved from the server  | 
    
Exceptions:
| Type | Description | 
|---|---|
HTTPException | 
        If submodel with the given id does not exist  | 
      
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
          async def get_submodel_from_server(submodel_id: str, submodel_client: SubmodelClient) -> aas_model.Submodel:
    """
    Function to get a submodel from the server
    Args:
        submodel_id (str): id of the submodel
    Returns:
        aas_model.Submodel: submodel retrieved from the server
    Raises:
        HTTPException: If submodel with the given id does not exist
    """
    try:
        basyx_submodel = await get_basyx_submodel_from_server(submodel_id, submodel_client)
        return convert_submodel_to_model(basyx_submodel)
    except HTTPException as e:
        raise HTTPException(
            status_code=400, detail=f"Submodel with id {submodel_id} could not be retrieved. Error: {e}"
        )
post_submodel_to_server(pydantic_submodel, submodel_client)
  
      async
  
    Function to post a submodel to the server
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
pydantic_submodel | 
        aas_model.Submodel | 
        submodel to post  | 
        required | 
submodel_client | 
        SubmodelClient | 
        client to connect to the server  | 
        required | 
Exceptions:
| Type | Description | 
|---|---|
HTTPException | 
        If submodel with the given id already exists  | 
      
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
          async def post_submodel_to_server(pydantic_submodel: aas_model.Submodel, submodel_client: SubmodelClient):
    """
    Function to post a submodel to the server
    Args:
        pydantic_submodel (aas_model.Submodel): submodel to post
        submodel_client (SubmodelClient): client to connect to the server
    Raises:
        HTTPException: If submodel with the given id already exists
    """
    if await submodel_is_on_server(pydantic_submodel.id, submodel_client):
        raise HTTPException(
            status_code=400,
            detail=f"Submodel with id {pydantic_submodel.id} already exists. Try putting it instead.",
        )
    basyx_submodel = convert_model_to_submodel(pydantic_submodel)
    submodel_for_client = client_utils.ClientModel(basyx_object=basyx_submodel)
    # TODO: make a try except with json.decoder.JSONDecodeError to avoid error when posting a submodel that already exists, same goes for aas
    response = await post_submodel.asyncio(client=submodel_client, body=submodel_for_client)
put_submodel_to_server(submodel, submodel_client)
  
      async
  
    Function to put a submodel to the server
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
submodel | 
        aas_model.Submodel | 
        submodel to put  | 
        required | 
submodel_client | 
        SubmodelClient | 
        client to connect to the server  | 
        required | 
Exceptions:
| Type | Description | 
|---|---|
HTTPException | 
        If submodel with the given id does not exist  | 
      
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
          async def put_submodel_to_server(submodel: aas_model.Submodel, submodel_client: SubmodelClient):
    """
    Function to put a submodel to the server
    Args:
        submodel (aas_model.Submodel): submodel to put
        submodel_client (SubmodelClient): client to connect to the server
    Raises:
        HTTPException: If submodel with the given id does not exist
    """
    if not await submodel_is_on_server(submodel.id, submodel_client):
        raise HTTPException(
            status_code=400, detail=f"Submodel with id {submodel.id} does not exist. Try posting it first."
        )
    basyx_submodel = convert_model_to_submodel(submodel)
    submodel_for_client = client_utils.ClientModel(basyx_object=basyx_submodel)
    base_64_id = client_utils.get_base64_from_string(submodel.id)
    response = await put_submodel_by_id.asyncio(
        submodel_identifier=base_64_id, client=submodel_client, body=submodel_for_client
    )
submodel_is_on_server(submodel_id, submodel_client)
  
      async
  
    Function to check if a submodel with the given id is on the server
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
submodel_id | 
        str | 
        id of the submodel  | 
        required | 
submodel_client | 
        SubmodelClient | 
        client to connect to the server  | 
        required | 
Returns:
| Type | Description | 
|---|---|
bool | 
      True if submodel is on server, False if not  | 
    
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
          async def submodel_is_on_server(submodel_id: str, submodel_client: SubmodelClient) -> bool:
    """
    Function to check if a submodel with the given id is on the server
    Args:
        submodel_id (str): id of the submodel
        submodel_client (SubmodelClient): client to connect to the server
    Returns:
        bool: True if submodel is on server, False if not
    """
    try:
        await get_submodel_from_server(submodel_id, submodel_client)
        return True
    except HTTPException as e:
        return False
        async_connector
    
        
Publisher            (Protocol)
        
    Source code in aas_middleware\connect\connectors\async_connector.py
          @runtime_checkable
class Publisher(Protocol):
    async def connect(self):
        """
        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...
    async def disconnect(self):
        """
        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...
    async def publish(self, topic: str, message: Any) -> None:
        """
        Interfaces for a publisher to publish messages to a topic.
        Args:
            topic (str): The topic to publish the message to.
            message (Any): The message to be published.
        Raises:
            ConnectionError: If the publishing of the message failed.
        """
        ...
connect(self)
  
      async
  
    
  
disconnect(self)
  
      async
  
    
  
publish(self, topic, message)
  
      async
  
    Interfaces for a publisher to publish messages to a topic.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
topic | 
        str | 
        The topic to publish the message to.  | 
        required | 
message | 
        Any | 
        The message to be published.  | 
        required | 
Exceptions:
| Type | Description | 
|---|---|
ConnectionError | 
        If the publishing of the message failed.  | 
      
Source code in aas_middleware\connect\connectors\async_connector.py
          
        
        
Subsciber            (Protocol)
        
    Source code in aas_middleware\connect\connectors\async_connector.py
          @runtime_checkable
class Subsciber(Protocol):
    async def connect(self):
        """
        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...
    async def disconnect(self):
        """
        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...
    async def subscribe(self, topic: str, callback: Callable[[Any], Awaitable[None] | Any]) -> None:
        """
        Interfaces for a subscriber to subscribe to a topic and receive messages.
        Args:
            topic (str): The topic to subscribe to.
            callback (Callable[[str, str], Awaitable[None]]): The callback to be executed when a message is received.
        Raises:
            ConnectionError: If the subscribing to the topic failed.
        """
        ...
connect(self)
  
      async
  
    
  
disconnect(self)
  
      async
  
    
  
subscribe(self, topic, callback)
  
      async
  
    Interfaces for a subscriber to subscribe to a topic and receive messages.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
topic | 
        str | 
        The topic to subscribe to.  | 
        required | 
callback | 
        Callable[[str, str], Awaitable[None]] | 
        The callback to be executed when a message is received.  | 
        required | 
Exceptions:
| Type | Description | 
|---|---|
ConnectionError | 
        If the subscribing to the topic failed.  | 
      
Source code in aas_middleware\connect\connectors\async_connector.py
          async def subscribe(self, topic: str, callback: Callable[[Any], Awaitable[None] | Any]) -> None:
    """
    Interfaces for a subscriber to subscribe to a topic and receive messages.
    Args:
        topic (str): The topic to subscribe to.
        callback (Callable[[str, str], Awaitable[None]]): The callback to be executed when a message is received.
    Raises:
        ConnectionError: If the subscribing to the topic failed.
    """
    ...
        connector
    
        
Consumer            (Protocol)
        
    Source code in aas_middleware\connect\connectors\connector.py
          @runtime_checkable
class Consumer(Protocol):
    async def connect(self):
        """
        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...
    async def disconnect(self):
        """
        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...
    async def consume(self, body: Any) -> None:
        """
        Interfaces for a consumer to consume data and send in with the connection to the consumer.
        Args:
            body (Any): The data to be consumed.
        Raises:
            ConnectionError: If the consuming of the data failed.
        """
        ...
connect(self)
  
      async
  
    
  
consume(self, body)
  
      async
  
    Interfaces for a consumer to consume data and send in with the connection to the consumer.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
body | 
        Any | 
        The data to be consumed.  | 
        required | 
Exceptions:
| Type | Description | 
|---|---|
ConnectionError | 
        If the consuming of the data failed.  | 
      
Source code in aas_middleware\connect\connectors\connector.py
          
        
disconnect(self)
  
      async
  
    
  
        
Provider            (Protocol)
        
    Source code in aas_middleware\connect\connectors\connector.py
          @runtime_checkable
class Provider(Protocol):
    async def connect(self):
        """
        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...
    async def disconnect(self):
        """
        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...
    async def provide(self) -> Any:
        """
        Interfaces for a provider to provide data.
        Returns:
            Any: The data to be provided.
        Raises:
            ConnectionError: If the providing of the data failed.
        """
        ...
connect(self)
  
      async
  
    
  
disconnect(self)
  
      async
  
    
  
provide(self)
  
      async
  
    Interfaces for a provider to provide data.
Returns:
| Type | Description | 
|---|---|
Any | 
      The data to be provided.  | 
    
Exceptions:
| Type | Description | 
|---|---|
ConnectionError | 
        If the providing of the data failed.  | 
      
        web_hook_client_connector
    
        
WebHookClientConnector        
    Class for a WebHookClientConnector that can be used to connect to a webhook and receive data from it.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
web_hook_url | 
        str | 
        adress of the webhook  | 
        required | 
Attributes:
| Name | Type | Description | 
|---|---|---|
web_hook_url | 
        str | 
        adress of the webhook server  | 
      
own_url | 
        Optional[str] | 
        own url where the webhook sends data to  | 
      
hook | 
        Optional[anyio.Event] | 
        hook event for posts of the webhook  | 
      
connected_subscriber | 
        Optional[str] | 
        adress of the subscriber that is connected to the webhook  | 
      
received_data | 
        str | 
        data received from the webhook  | 
      
Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
          class WebHookClientConnector:
    """
    Class for a WebHookClientConnector that can be used to connect to a webhook and receive data from it.
    Args:
        web_hook_url (str): adress of the webhook
    Attributes:
        web_hook_url (str): adress of the webhook server
        own_url (Optional[str]): own url where the webhook sends data to
        hook (Optional[anyio.Event]): hook event for posts of the webhook
        connected_subscriber (Optional[str]): adress of the subscriber that is connected to the webhook
        received_data (str): data received from the webhook
    """
    def __init__(self, web_hook_url: str):
        """
        Args:
            web_hook_url (str): adress of the webhook
        """
        self.web_hook_url = web_hook_url
        self.own_url: Optional[str] = None
        self.hook: Optional[anyio.Event] = None
        self.connected_subscriber: Optional[str] = None
        self.received_data: str = ""
    async def set_hook(self, hook: anyio.Event, url: str):
        """
        Function sets the hook event for posts of the webhook and the url where the webhook should post to.
        """
        # TODO: set this hook when adding the hook to a provider so that it is linked to fastAPI post route of the datamodel
        # either to this with a callback or these events.
        self.hook = hook
        self.own_url = url
    async def trigger_hook(self, body: str):
        """
        Function triggers the hook for the webhook.
        """
        if not self.hook:
            raise HTTPException(
                status_code=400,
                detail="Webhook does not have a hook set. Please use the set_hook function to set the hook.",
            )
        self.received_data = body
        self.hook.set()
    async def connect(self):
        """
        Function connects to the webhook with a post request that sends the middleware url to the webhook.
        """
        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.web_hook_url, json={"url": self.own_url}
            ) as response:
                status = response.status
                if status != 200:
                    raise HTTPException(
                        status_code=400,
                        detail="Webhook could not connect to the webhook server.",
                    )
                # TODO: add logging for somethink like this!
                await response.text()
    async def disconnect(self):
        """
        Function disconnects from the webhook.
        """
        # TODO: either remove the hookevent from the provider or set it to false or
        # remove the callback from the endpoint of the provider
    async def consume(self, body: str) -> str:
        raise NotImplementedError(
            "WebHookClientConnector does not support sending data but only receiving data. Try to use the WebHookServerConnector instead."
        )
    async def provide(self) -> str:
        """
        Function receives data from the webhook.
        """
        if not self.hook:
            raise Exception(
                "WebHookClientConnector does not have a hook set. Please use the set_hook function to set the hook."
            )
        await self.hook.wait()
        self.hook = anyio.Event()
        return self.received_data
__init__(self, web_hook_url)
  
      special
  
    Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
web_hook_url | 
        str | 
        adress of the webhook  | 
        required | 
Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
          
        
connect(self)
  
      async
  
    Function connects to the webhook with a post request that sends the middleware url to the webhook.
Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
          async def connect(self):
    """
    Function connects to the webhook with a post request that sends the middleware url to the webhook.
    """
    async with aiohttp.ClientSession() as session:
        async with session.post(
            self.web_hook_url, json={"url": self.own_url}
        ) as response:
            status = response.status
            if status != 200:
                raise HTTPException(
                    status_code=400,
                    detail="Webhook could not connect to the webhook server.",
                )
            # TODO: add logging for somethink like this!
            await response.text()
disconnect(self)
  
      async
  
    Function disconnects from the webhook.
provide(self)
  
      async
  
    Function receives data from the webhook.
Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
          
        
set_hook(self, hook, url)
  
      async
  
    Function sets the hook event for posts of the webhook and the url where the webhook should post to.
Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
          async def set_hook(self, hook: anyio.Event, url: str):
    """
    Function sets the hook event for posts of the webhook and the url where the webhook should post to.
    """
    # TODO: set this hook when adding the hook to a provider so that it is linked to fastAPI post route of the datamodel
    # either to this with a callback or these events.
    self.hook = hook
    self.own_url = url
trigger_hook(self, body)
  
      async
  
    Function triggers the hook for the webhook.
Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
          
        
        web_hook_server_connector
    
        
WebHookServerConnector        
    Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
          class WebHookServerConnector:
    def __init__(self):
        self.hook: anyio.Event = anyio.Event()
        self.connected_subscribers: List[str] = []
        self.received_data: str = ""
        self.connectable: bool = False
    async def start_server(self) -> None:
        """
        Acticates an endpoint where clients can connect to with a post request that sends the url to post the webhook data to.
        """
        # TODO: add functionality that when added to a consumer, an endpoint is created for webhook clients to connect to.
        # and one endpoint to remove client subsciptions. THe register and unregister functions should then be used.
        pass
    async def register_subscriber(self, url: str) -> None:
        """
        Function registers a subscriber to the webhook.
        Args:
            url (str): _description_
        """
        if not self.connectable:
            raise HTTPException(status_code=400, detail="Webhook not connectable.")
        self.connected_subscribers.append(url)
    async def unregister_subscriber(self, url: str) -> None:
        """
        Function unregisters a subscriber from the webhook.
        Args:
            url (str): _description_
        """
        try:
            self.connected_subscribers.remove(url)
        except ValueError:
            raise HTTPException(status_code=400, detail="Subscriber not registered.")
    async def connect(self):
        """
        Function allows clients to connect to the webhook.
        """
        self.connectable = True
    async def disconnect(self):
        """
        Function disconnects all subscribed clients from the webhook.
        """
        self.connectable = False
        self.connected_subscribers = []
    async def consume(self) -> str:
        """
        Function sends data to all subscribed clients of the webhook.
        Args:
            body (str): _description_
        Returns:
            str: _description_
        """
        await self.hook.wait()
        if not self.connectable:
            raise HTTPException(status_code=400, detail="Webhook not connectable.")
        self.hook.set()
        for subscriber in self.connected_subscribers:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    subscriber, data=self.received_data
                ) as response:
                    await response.text()
        return "Webhook message sent"
    async def provide(self) -> str:
        raise NotImplementedError
connect(self)
  
      async
  
    
  
consume(self)
  
      async
  
    Function sends data to all subscribed clients of the webhook.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
body | 
        str | 
        description  | 
        required | 
Returns:
| Type | Description | 
|---|---|
str | 
      description  | 
    
Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
          async def consume(self) -> str:
    """
    Function sends data to all subscribed clients of the webhook.
    Args:
        body (str): _description_
    Returns:
        str: _description_
    """
    await self.hook.wait()
    if not self.connectable:
        raise HTTPException(status_code=400, detail="Webhook not connectable.")
    self.hook.set()
    for subscriber in self.connected_subscribers:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                subscriber, data=self.received_data
            ) as response:
                await response.text()
    return "Webhook message sent"
disconnect(self)
  
      async
  
    
  
register_subscriber(self, url)
  
      async
  
    Function registers a subscriber to the webhook.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
url | 
        str | 
        description  | 
        required | 
Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
          
        
start_server(self)
  
      async
  
    Acticates an endpoint where clients can connect to with a post request that sends the url to post the webhook data to.
Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
          async def start_server(self) -> None:
    """
    Acticates an endpoint where clients can connect to with a post request that sends the url to post the webhook data to.
    """
    # TODO: add functionality that when added to a consumer, an endpoint is created for webhook clients to connect to.
    # and one endpoint to remove client subsciptions. THe register and unregister functions should then be used.
    pass
unregister_subscriber(self, url)
  
      async
  
    Function unregisters a subscriber from the webhook.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
url | 
        str | 
        description  | 
        required | 
Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
          
        
        web_socket_server_connector
    
        
WebSocketServerConnector        
    Source code in aas_middleware\connect\connectors\web_socket_server_connector.py
          class WebSocketServerConnector:
    def __init__(
        self, host: str, port: int, reply_function: Optional[Callable[[], str]] = None
    ):
        self.host = host
        self.port = port
        self.server: Optional[websockets.WebSocketServer] = None
        self.websocket: Optional[websockets.WebSocketServerProtocol] = None
        self.received_data_event = anyio.Event()
        self.received_data: str = ""
        self.reply_function = reply_function
        self.connectable = False
    async def start_server(self) -> None:
        self.server = await websockets.serve(
            self.handle_websocket, self.host, self.port
        )
    async def handle_websocket(
        self, websocket: websockets.WebSocketServerProtocol, path: str
    ) -> None:
        if not self.connectable:
            await websocket.close()
            return
        self.websocket = websocket
        try:
            async for message in websocket:
                print(f"Received message: {message}")
                if isinstance(message, bytes):
                    message = message.decode()
                self.received_data = message
                self.received_data_event.set()
                if self.reply_function:
                    await websocket.send(self.reply_function())
                else:
                    await websocket.send("Message received")
        except websockets.exceptions.ConnectionClosed:
            raise HTTPException(status_code=400, detail="Websocket connection closed.")
    async def connect(self) -> None:
        """
        Function activates that connection to the server are allowed and accepted.
        """
        if not self.server:
            raise HTTPException(
                status_code=400, detail="Websocket server is not started."
            )
        self.connectable = True
        if not self.server:
            await self.start_server()
    async def disconnect(self) -> None:
        self.connectable = False
        if self.websocket:
            await self.websocket.close()
            self.websocket = None
        if self.server:
            self.server.close()
            await self.server.wait_closed()
            self.server = None
    async def consume(self, body: str) -> Optional[str]:
        if not self.server:
            raise HTTPException(
                status_code=400, detail="Websocket server is not started."
            )
        if not self.websocket:
            raise HTTPException(
                status_code=400, detail="Websocket not connected to websocket server."
            )
        await self.websocket.send(body)
    async def provide(self) -> str:
        await self.received_data_event.wait()
        self.received_data_event = anyio.Event()
        return self.received_data
connect(self)
  
      async
  
    Function activates that connection to the server are allowed and accepted.