Connect API
connectors
special
aas_client_connector
special
aas_client
aas_is_on_server(aas_id, aas_client)
async
Function to check if an AAS with the given id is on the server
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aas_id |
str |
id of the AAS |
required |
Returns:
Type | Description |
---|---|
bool |
True if AAS is on server, False if not |
Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
async def aas_is_on_server(aas_id: str, aas_client: AASClient) -> bool:
    """
    Report whether an AAS with the given id can be fetched from the server.

    Args:
        aas_id (str): id of the AAS
        aas_client (AASClient): client used to query the AAS server

    Returns:
        bool: True if the AAS could be retrieved, False if the retrieval raised any exception
    """
    try:
        await get_basyx_aas_from_server(aas_id, aas_client)
    except Exception:
        # Every failure of the lookup is reported as "not on the server".
        return False
    else:
        return True
delete_aas_from_server(aas_id, aas_client)
async
Function to delete an AAS from the server
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aas_id |
str |
id of the AAS |
required |
Exceptions:
Type | Description |
---|---|
HTTPException |
If AAS with the given id does not exist |
Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
async def delete_aas_from_server(aas_id: str, aas_client: AASClient):
    """
    Remove the AAS with the given id from the server.

    Args:
        aas_id (str): id of the AAS
        aas_client (AASClient): client used to talk to the AAS server

    Raises:
        HTTPException: If AAS with the given id does not exist
    """
    aas_exists = await aas_is_on_server(aas_id, aas_client)
    if not aas_exists:
        raise HTTPException(
            status_code=400, detail=f"AAS with id {aas_id} does not exist. Cannot delete it."
        )

    # The delete endpoint is called with the base64 form of the id.
    encoded_id = client_utils.get_base64_from_string(aas_id)
    await delete_asset_administration_shell_by_id.asyncio(
        client=aas_client, aas_identifier=encoded_id
    )
get_aas_from_server(aas_id, aas_client, submodel_client)
async
Function to get an AAS from the server
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aas_id |
str |
id of the AAS |
required |
Returns:
Type | Description |
---|---|
aas_model.AAS |
AAS retrieved from the server |
Exceptions:
Type | Description |
---|---|
HTTPException |
If AAS with the given id does not exist |
Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
async def get_aas_from_server(aas_id: str, aas_client: AASClient, submodel_client: SubmodelClient) -> aas_model.AAS:
    """
    Fetch an AAS together with its submodels and return it as a middleware model.

    Args:
        aas_id (str): id of the AAS
        aas_client (AASClient): client used to fetch the AAS
        submodel_client (SubmodelClient): client used to fetch the submodels referenced by the AAS

    Returns:
        aas_model.AAS: AAS retrieved from the server

    Raises:
        HTTPException: If the AAS or one of its submodels could not be retrieved
    """
    try:
        basyx_aas = await get_basyx_aas_from_server(aas_id, aas_client)
    except Exception as e:
        raise HTTPException(
            status_code=400, detail=f"AAS with id {aas_id} could not be retrieved. Error: {e}"
        )

    try:
        basyx_submodels = await get_all_basyx_submodels_from_server(basyx_aas, submodel_client)
    except Exception as e:
        raise HTTPException(
            status_code=400, detail=f"Submodels of AAS with id {aas_id} could not be retrieved. Error: {e}"
        )

    # Collect the shell and its submodels in one store so the formatter can process them together.
    store = model.DictObjectStore()
    store.add(basyx_aas)
    for basyx_submodel in basyx_submodels:
        store.add(basyx_submodel)

    return BasyxFormatter().deserialize(store).get_model(aas_id)
get_all_aas_from_server(pydantic_model, aas_client, submodel_client)
async
Function to get all AAS from the server
Returns:
Type | Description |
---|---|
List[aas_model.AAS] |
List of AAS retrieved from the server |
Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
async def get_all_aas_from_server(pydantic_model: BaseModel, aas_client: AASClient, submodel_client: SubmodelClient) -> List[aas_model.AAS]:
    """
    Function to get all AAS of a given model type from the server

    Args:
        pydantic_model (BaseModel): model class; only AAS whose class name equals
            ``pydantic_model.__name__`` are returned
        aas_client (AASClient): client used to fetch the AAS
        submodel_client (SubmodelClient): client used to fetch the submodels referenced by the AAS

    Returns:
        List[aas_model.AAS]: List of AAS retrieved from the server
    """
    result_string = await get_all_asset_administration_shells.asyncio(client=aas_client)
    aas_data = result_string["result"]
    aas_list = [client_utils.transform_client_to_basyx_model(aas) for aas in aas_data]

    submodels = []
    for aas in aas_list:
        submodels.extend(await get_all_basyx_submodels_from_server(aas, submodel_client))

    obj_store = model.DictObjectStore()
    stored_ids = set()
    for aas in aas_list:
        obj_store.add(aas)
        stored_ids.add(aas.id)
    for submodel in submodels:
        # Several AAS may reference the same submodel; store each id only once.
        if submodel.id not in stored_ids:
            obj_store.add(submodel)
            stored_ids.add(submodel.id)

    # Deserialize the populated object store (the previous code passed a not-yet-assigned variable here).
    data_model = BasyxFormatter().deserialize(obj_store)
    aas_models = data_model.get_models_of_type(aas_model.AAS)
    return [
        candidate
        for candidate in aas_models
        if candidate.__class__.__name__ == pydantic_model.__name__
    ]
get_basyx_aas_from_server(aas_id, aas_client)
async
Function to get an AAS from the server
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aas_id |
str |
id of the AAS |
required |
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the AAS with the given id could not be retrieved from the server |
Returns:
Type | Description |
---|---|
model.AssetAdministrationShell |
AAS retrieved from the server |
Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
async def get_basyx_aas_from_server(aas_id: str, aas_client: AASClient) -> model.AssetAdministrationShell:
    """
    Function to get an AAS from the server in basyx format

    Args:
        aas_id (str): id of the AAS
        aas_client (AASClient): client used to fetch the AAS

    Raises:
        ConnectionError: If the request or the conversion of the response fails
            (for example because no AAS with the given id exists)

    Returns:
        model.AssetAdministrationShell: AAS retrieved from the server
    """
    base_64_id = client_utils.get_base64_from_string(aas_id)
    try:
        aas_data = await get_asset_administration_shell_by_id.asyncio(
            client=aas_client, aas_identifier=base_64_id
        )
        return client_utils.transform_client_to_basyx_model(aas_data.to_dict())
    except Exception as e:
        # Keep the original error as the explicit cause so the traceback shows what went wrong.
        raise ConnectionError(e) from e
get_submodel_from_aas_id_and_class_name(aas_id, class_name, aas_client, submodel_client)
async
Function to get a submodel from the server based on the AAS id and the class name of the submodel
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aas_id |
str |
id of the AAS |
required |
class_name |
str |
class name of the submodel |
required |
Exceptions:
Type | Description |
---|---|
HTTPException |
If submodel with the given class name does not exist for the given AAS |
Returns:
Type | Description |
---|---|
aas_model.Submodel |
submodel retrieved from the server |
Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
async def get_submodel_from_aas_id_and_class_name(aas_id: str, class_name: str, aas_client: AASClient, submodel_client: SubmodelClient) -> aas_model.Submodel:
    """
    Look up the submodel of an AAS whose model class has the given name.

    The submodels referenced by the AAS are fetched one after another and the first one
    whose class name equals ``class_name`` is returned.

    Args:
        aas_id (str): id of the AAS
        class_name (str): class name of the submodel
        aas_client (AASClient): client used to fetch the AAS
        submodel_client (SubmodelClient): client used to fetch the submodels

    Raises:
        HTTPException: If submodel with the given class name does not exist for the given AAS

    Returns:
        aas_model.Submodel: submodel retrieved from the server
    """
    shell = await get_basyx_aas_from_server(aas_id, aas_client)
    for reference in shell.submodel:
        referenced_id = reference.key[0].value
        candidate = await get_submodel_from_server(referenced_id, submodel_client)
        if candidate.__class__.__name__ != class_name:
            continue
        return candidate

    # NOTE(review): 411 is "Length Required" in HTTP; confirm whether another status was intended.
    raise HTTPException(
        status_code=411,
        detail=f"Submodel with name {class_name} does not exist for AAS with id {aas_id}",
    )
post_aas_to_server(aas, aas_client, submodel_client)
async
Function to post an AAS to the server. Also posts all submodels of the AAS to the server, if they do not exist yet.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aas |
aas_model.AAS |
AAS to post |
required |
Exceptions:
Type | Description |
---|---|
HTTPException |
If AAS with the given id already exists |
Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
async def post_aas_to_server(aas: aas_model.AAS, aas_client: AASClient, submodel_client: SubmodelClient):
    """
    Create an AAS on the server and make sure all of its submodels are stored as well.

    Submodels that are not on the server yet are posted; submodels that already exist are updated.

    Args:
        aas (aas_model.AAS): AAS to post
        aas_client (AASClient): client used to post the AAS
        submodel_client (SubmodelClient): client used to post or put the submodels

    Raises:
        HTTPException: If AAS with the given id already exists
    """
    already_exists = await aas_is_on_server(aas.id, aas_client)
    if already_exists:
        raise HTTPException(
            status_code=400, detail=f"AAS with id {aas.id} already exists"
        )

    check_aas_for_duplicate_ids(aas)
    basyx_aas = convert_model_to_aas(aas).get(aas.id)
    await post_asset_administration_shell.asyncio(
        client=aas_client, body=ClientModel(basyx_object=basyx_aas)
    )

    for submodel in get_value_attributes(aas).values():
        if await submodel_is_on_server(submodel.id, submodel_client):
            logger.info(f"Submodel with id {submodel.id} already exists on the server. Updating the value.")
            await put_submodel_to_server(submodel, submodel_client)
        else:
            await post_submodel_to_server(submodel, submodel_client)
put_aas_to_server(aas, aas_client, submodel_client)
async
Function to put an AAS to the server
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aas |
aas_model.AAS |
AAS to put |
required |
Exceptions:
Type | Description |
---|---|
HTTPException |
If AAS with the given id does not exist |
Source code in aas_middleware\connect\connectors\aas_client_connector\aas_client.py
async def put_aas_to_server(aas: aas_model.AAS, aas_client: AASClient, submodel_client: SubmodelClient):
    """
    Update an existing AAS on the server, including its submodels.

    Submodels that already exist on the server are updated; missing ones are posted.

    Args:
        aas (aas_model.AAS): AAS to put
        aas_client (AASClient): client used to put the AAS
        submodel_client (SubmodelClient): client used to put or post the submodels

    Raises:
        HTTPException: If AAS with the given id does not exist
    """
    exists = await aas_is_on_server(aas.id, aas_client)
    if not exists:
        raise HTTPException(
            status_code=400, detail=f"AAS with id {aas.id} does not exist"
        )

    basyx_aas = convert_model_to_aas(aas).get(aas.id)
    client_body = ClientModel(basyx_object=basyx_aas)
    encoded_id = client_utils.get_base64_from_string(aas.id)
    await put_asset_administration_shell_by_id.asyncio(
        aas_identifier=encoded_id, client=aas_client, body=client_body
    )

    for submodel in get_value_attributes(aas).values():
        submodel_exists = await submodel_is_on_server(submodel.id, submodel_client)
        store = put_submodel_to_server if submodel_exists else post_submodel_to_server
        await store(submodel, submodel_client)
client_utils
transform_client_to_basyx_model(client_model)
Function to transform a client model to a basyx model
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client_model |
dict |
dictionary from server client that needs to be transformed |
required |
Returns:
Type | Description |
---|---|
Union[model.AssetAdministrationShell, model.Submodel] |
basyx model from the given client model |
Source code in aas_middleware\connect\connectors\aas_client_connector\client_utils.py
def transform_client_to_basyx_model(
    client_model: dict | Any,
) -> Union[model.AssetAdministrationShell, model.Submodel]:
    """
    Convert a model delivered by the server client into a basyx model.

    Args:
        client_model (dict | Any): dictionary from the server client, or an object offering
            ``to_dict()``, that needs to be transformed

    Returns:
        Union[model.AssetAdministrationShell, model.Submodel]: basyx model from the given client model
    """
    as_dict = client_model if isinstance(client_model, dict) else client_model.to_dict()
    # The return value is ignored, so the helper presumably prunes the dict in place - verify.
    remove_empty_lists(as_dict)
    # Round-trip through JSON text so the basyx decoder can build the object.
    serialized = json.dumps(as_dict, indent=4)
    return json.loads(serialized, cls=basyx.aas.adapter.json.AASFromJsonDecoder)
submodel_client
delete_submodel_from_server(submodel_id, submodel_client)
async
Function to delete a submodel from the server
Parameters:
Name | Type | Description | Default |
---|---|---|---|
submodel_id |
str |
id of the submodel |
required |
submodel_client |
SubmodelClient |
client to connect to the server |
required |
Exceptions:
Type | Description |
---|---|
HTTPException |
If submodel with the given id does not exist |
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def delete_submodel_from_server(submodel_id: str, submodel_client: SubmodelClient):
    """
    Remove the submodel with the given id from the server.

    Args:
        submodel_id (str): id of the submodel
        submodel_client (SubmodelClient): client to connect to the server

    Raises:
        HTTPException: If submodel with the given id does not exist
    """
    exists = await submodel_is_on_server(submodel_id, submodel_client)
    if not exists:
        raise HTTPException(
            status_code=400, detail=f"Submodel with id {submodel_id} does not exist. Cannot delete it."
        )

    encoded_id = client_utils.get_base64_from_string(submodel_id)
    await delete_submodel_by_id.asyncio(client=submodel_client, submodel_identifier=encoded_id)
get_all_basyx_submodels_from_server(aas, submodel_client)
async
Function to get all submodels from an AAS in basyx format
Parameters:
Name | Type | Description | Default |
---|---|---|---|
aas |
model.AssetAdministrationShell |
AAS to get submodels from |
required |
submodel_client |
SubmodelClient |
client to connect to the server |
required |
Returns:
Type | Description |
---|---|
List[model.Submodel] |
List of basyx submodels retrieved from the server |
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def get_all_basyx_submodels_from_server(aas: model.AssetAdministrationShell, submodel_client: SubmodelClient) -> List[model.Submodel]:
    """
    Function to get all submodels from an AAS in basyx format

    Args:
        aas (model.AssetAdministrationShell): AAS to get submodels from
        submodel_client (SubmodelClient): client to connect to the server

    Returns:
        List[model.Submodel]: List of basyx submodels retrieved from the server, in the order
            in which the AAS references them
    """
    # The submodel id is taken from the first key of each reference.
    return [
        await get_basyx_submodel_from_server(submodel_reference.key[0].value, submodel_client)
        for submodel_reference in aas.submodel
    ]
get_all_submodel_data_from_server(submodel_client)
async
Function to get all submodels from the server
Returns:
Type | Description |
---|---|
List[aas_model.Submodel] |
List of submodels retrieved from the server |
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def get_all_submodel_data_from_server(submodel_client: SubmodelClient) -> List[ClientSubmodel]:
    """
    Fetch the raw data of every submodel stored on the server.

    Args:
        submodel_client (SubmodelClient): client to connect to the server

    Returns:
        List[ClientSubmodel]: ``result`` attribute of the server response
    """
    response = await get_all_submodels.asyncio(client=submodel_client)
    return response.result
get_all_submodels_of_type(model, submodel_client)
async
Function to get all submodels of a certain type from the server
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
BaseModel |
Pydantic model of the submodel |
required |
Returns:
Type | Description |
---|---|
List[aas_model.Submodel] |
List of submodels retrieved from the server |
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def get_all_submodels_of_type(model: BaseModel, submodel_client: SubmodelClient) -> List[aas_model.Submodel]:
    """
    Collect every submodel on the server whose model class has the same name as ``model``.

    Args:
        model (BaseModel): Pydantic model of the submodel; its ``__name__`` is compared
            with the class name of each converted submodel
        submodel_client (SubmodelClient): client to connect to the server

    Returns:
        List[aas_model.Submodel]: List of submodels retrieved from the server
    """
    wanted_name = model.__name__
    converted = (
        convert_submodel_to_model(client_utils.transform_client_to_basyx_model(raw))
        for raw in await get_all_submodel_data_from_server(submodel_client)
    )
    return [item for item in converted if item.__class__.__name__ == wanted_name]
get_basyx_submodel_from_server(submodel_id, submodel_client)
async
Function to get a submodel from the server
Parameters:
Name | Type | Description | Default |
---|---|---|---|
submodel_id |
str |
id of the submodel |
required |
submodel_client |
SubmodelClient |
client to connect to the server |
required |
Returns:
Type | Description |
---|---|
model.Submodel |
submodel retrieved from the server |
Exceptions:
Type | Description |
---|---|
HTTPException |
If submodel with the given id does not exist |
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def get_basyx_submodel_from_server(submodel_id: str, submodel_client: SubmodelClient) -> model.Submodel:
    """
    Fetch a submodel from the server and return it in basyx format.

    Args:
        submodel_id (str): id of the submodel
        submodel_client (SubmodelClient): client to connect to the server

    Returns:
        model.Submodel: submodel retrieved from the server

    Raises:
        HTTPException: If the request or the conversion of the response fails
    """
    encoded_id = client_utils.get_base64_from_string(submodel_id)
    try:
        raw_submodel = await get_submodel_by_id.asyncio(
            client=submodel_client, submodel_identifier=encoded_id
        )
        as_dict = raw_submodel.to_dict()
        return client_utils.transform_client_to_basyx_model(as_dict)
    except Exception as e:
        # Any failure is reported to the caller as a 400 with the original error text.
        raise HTTPException(
            status_code=400, detail=f"Submodel with id {submodel_id} could not be retrieved. Error: {e}"
        )
get_submodel_from_server(submodel_id, submodel_client)
async
Function to get a submodel from the server
Parameters:
Name | Type | Description | Default |
---|---|---|---|
submodel_id |
str |
id of the submodel |
required |
Returns:
Type | Description |
---|---|
aas_model.Submodel |
submodel retrieved from the server |
Exceptions:
Type | Description |
---|---|
HTTPException |
If submodel with the given id does not exist |
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def get_submodel_from_server(submodel_id: str, submodel_client: SubmodelClient) -> aas_model.Submodel:
    """
    Fetch a submodel from the server and convert it to a middleware model.

    Args:
        submodel_id (str): id of the submodel
        submodel_client (SubmodelClient): client to connect to the server

    Returns:
        aas_model.Submodel: submodel retrieved from the server

    Raises:
        HTTPException: If an HTTPException occurs while fetching or converting the submodel
    """
    try:
        fetched = await get_basyx_submodel_from_server(submodel_id, submodel_client)
        converted = convert_submodel_to_model(fetched)
    except HTTPException as e:
        # Only HTTPException is translated here; other exception types propagate unchanged.
        raise HTTPException(
            status_code=400, detail=f"Submodel with id {submodel_id} could not be retrieved. Error: {e}"
        )
    return converted
post_submodel_to_server(pydantic_submodel, submodel_client)
async
Function to post a submodel to the server
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pydantic_submodel |
aas_model.Submodel |
submodel to post |
required |
submodel_client |
SubmodelClient |
client to connect to the server |
required |
Exceptions:
Type | Description |
---|---|
HTTPException |
If submodel with the given id already exists |
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def post_submodel_to_server(pydantic_submodel: aas_model.Submodel, submodel_client: SubmodelClient):
    """
    Create a new submodel on the server.

    Args:
        pydantic_submodel (aas_model.Submodel): submodel to post
        submodel_client (SubmodelClient): client to connect to the server

    Raises:
        HTTPException: If submodel with the given id already exists
    """
    already_exists = await submodel_is_on_server(pydantic_submodel.id, submodel_client)
    if already_exists:
        raise HTTPException(
            status_code=400,
            detail=f"Submodel with id {pydantic_submodel.id} already exists. Try putting it instead.",
        )

    client_body = client_utils.ClientModel(
        basyx_object=convert_model_to_submodel(pydantic_submodel)
    )
    # TODO: make a try except with json.decoder.JSONDecodeError to avoid error when posting a submodel that already exists, same goes for aas
    await post_submodel.asyncio(client=submodel_client, body=client_body)
put_submodel_to_server(submodel, submodel_client)
async
Function to put a submodel to the server
Parameters:
Name | Type | Description | Default |
---|---|---|---|
submodel |
aas_model.Submodel |
submodel to put |
required |
submodel_client |
SubmodelClient |
client to connect to the server |
required |
Exceptions:
Type | Description |
---|---|
HTTPException |
If submodel with the given id does not exist |
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def put_submodel_to_server(submodel: aas_model.Submodel, submodel_client: SubmodelClient):
    """
    Update an existing submodel on the server.

    Args:
        submodel (aas_model.Submodel): submodel to put
        submodel_client (SubmodelClient): client to connect to the server

    Raises:
        HTTPException: If submodel with the given id does not exist
    """
    exists = await submodel_is_on_server(submodel.id, submodel_client)
    if not exists:
        raise HTTPException(
            status_code=400, detail=f"Submodel with id {submodel.id} does not exist. Try posting it first."
        )

    client_body = client_utils.ClientModel(basyx_object=convert_model_to_submodel(submodel))
    encoded_id = client_utils.get_base64_from_string(submodel.id)
    await put_submodel_by_id.asyncio(
        submodel_identifier=encoded_id, client=submodel_client, body=client_body
    )
submodel_is_on_server(submodel_id, submodel_client)
async
Function to check if a submodel with the given id is on the server
Parameters:
Name | Type | Description | Default |
---|---|---|---|
submodel_id |
str |
id of the submodel |
required |
submodel_client |
SubmodelClient |
client to connect to the server |
required |
Returns:
Type | Description |
---|---|
bool |
True if submodel is on server, False if not |
Source code in aas_middleware\connect\connectors\aas_client_connector\submodel_client.py
async def submodel_is_on_server(submodel_id: str, submodel_client: SubmodelClient) -> bool:
    """
    Report whether a submodel with the given id can be fetched from the server.

    Args:
        submodel_id (str): id of the submodel
        submodel_client (SubmodelClient): client to connect to the server

    Returns:
        bool: True if submodel is on server, False if fetching it raised an HTTPException
    """
    try:
        await get_submodel_from_server(submodel_id, submodel_client)
    except HTTPException:
        return False
    else:
        return True
async_connector
Publisher (Protocol)
Source code in aas_middleware\connect\connectors\async_connector.py
@runtime_checkable
class Publisher(Protocol):
    """
    Structural interface for connectors that publish messages to a topic.
    """

    async def connect(self):
        """
        Open the connection used by the publisher.

        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...

    async def disconnect(self):
        """
        Close the connection used by the publisher.

        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...

    async def publish(self, topic: str, message: Any) -> None:
        """
        Publish a message to a topic.

        Args:
            topic (str): The topic to publish the message to.
            message (Any): The message to be published.

        Raises:
            ConnectionError: If the publishing of the message failed.
        """
        ...
connect(self)
async
disconnect(self)
async
publish(self, topic, message)
async
Interfaces for a publisher to publish messages to a topic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
topic |
str |
The topic to publish the message to. |
required |
message |
Any |
The message to be published. |
required |
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the publishing of the message failed. |
Source code in aas_middleware\connect\connectors\async_connector.py
Subsciber (Protocol)
Source code in aas_middleware\connect\connectors\async_connector.py
@runtime_checkable
class Subsciber(Protocol):
async def connect(self):
"""
Raises:
ConnectionError: If the connection to the server could not be established.
"""
...
async def disconnect(self):
"""
Raises:
ConnectionError: If the connection to the server could not be established.
"""
...
async def subscribe(self, topic: str, callback: Callable[[Any], Awaitable[None] | Any]) -> None:
"""
Interfaces for a subscriber to subscribe to a topic and receive messages.
Args:
topic (str): The topic to subscribe to.
callback (Callable[[str, str], Awaitable[None]]): The callback to be executed when a message is received.
Raises:
ConnectionError: If the subscribing to the topic failed.
"""
...
connect(self)
async
disconnect(self)
async
subscribe(self, topic, callback)
async
Interfaces for a subscriber to subscribe to a topic and receive messages.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
topic |
str |
The topic to subscribe to. |
required |
callback |
Callable[[Any], Awaitable[None] | Any] |
The callback to be executed when a message is received. |
required |
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the subscribing to the topic failed. |
Source code in aas_middleware\connect\connectors\async_connector.py
async def subscribe(self, topic: str, callback: Callable[[Any], Awaitable[None] | Any]) -> None:
"""
Interfaces for a subscriber to subscribe to a topic and receive messages.
Args:
topic (str): The topic to subscribe to.
callback (Callable[[str, str], Awaitable[None]]): The callback to be executed when a message is received.
Raises:
ConnectionError: If the subscribing to the topic failed.
"""
...
connector
Consumer (Protocol)
Source code in aas_middleware\connect\connectors\connector.py
@runtime_checkable
class Consumer(Protocol):
    """
    Structural interface for connectors that take data and pass it on over their connection.
    """

    async def connect(self):
        """
        Open the connection used by the consumer.

        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...

    async def disconnect(self):
        """
        Close the connection used by the consumer.

        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...

    async def consume(self, body: Any) -> None:
        """
        Consume the given data and send it over the connection of the consumer.

        Args:
            body (Any): The data to be consumed.

        Raises:
            ConnectionError: If the consuming of the data failed.
        """
        ...
connect(self)
async
consume(self, body)
async
Interface for a consumer to consume data and send it over the connection of the consumer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
body |
Any |
The data to be consumed. |
required |
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the consuming of the data failed. |
Source code in aas_middleware\connect\connectors\connector.py
disconnect(self)
async
Provider (Protocol)
Source code in aas_middleware\connect\connectors\connector.py
@runtime_checkable
class Provider(Protocol):
    """
    Structural interface for connectors that deliver data on request.
    """

    async def connect(self):
        """
        Open the connection used by the provider.

        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...

    async def disconnect(self):
        """
        Close the connection used by the provider.

        Raises:
            ConnectionError: If the connection to the server could not be established.
        """
        ...

    async def provide(self) -> Any:
        """
        Provide data.

        Returns:
            Any: The data to be provided.

        Raises:
            ConnectionError: If the providing of the data failed.
        """
        ...
connect(self)
async
disconnect(self)
async
provide(self)
async
Interfaces for a provider to provide data.
Returns:
Type | Description |
---|---|
Any |
The data to be provided. |
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the providing of the data failed. |
web_hook_client_connector
WebHookClientConnector
Class for a WebHookClientConnector that can be used to connect to a webhook and receive data from it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
web_hook_url |
str |
address of the webhook |
required |
Attributes:
Name | Type | Description |
---|---|---|
web_hook_url |
str |
address of the webhook server |
own_url |
Optional[str] |
own url where the webhook sends data to |
hook |
Optional[anyio.Event] |
hook event for posts of the webhook |
connected_subscriber |
Optional[str] |
address of the subscriber that is connected to the webhook |
received_data |
str |
data received from the webhook |
Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
class WebHookClientConnector:
    """
    Connector that registers itself at a webhook server and hands out the data posted back to it.

    Args:
        web_hook_url (str): address of the webhook

    Attributes:
        web_hook_url (str): address of the webhook server
        own_url (Optional[str]): own url where the webhook sends data to
        hook (Optional[anyio.Event]): hook event for posts of the webhook
        connected_subscriber (Optional[str]): address of the subscriber that is connected to the webhook
        received_data (str): data received from the webhook
    """

    def __init__(self, web_hook_url: str):
        """
        Args:
            web_hook_url (str): address of the webhook
        """
        self.web_hook_url = web_hook_url
        self.own_url: Optional[str] = None
        self.hook: Optional[anyio.Event] = None
        self.connected_subscriber: Optional[str] = None
        self.received_data: str = ""

    async def set_hook(self, hook: anyio.Event, url: str):
        """
        Store the event that signals incoming webhook posts and the url the webhook should post to.

        Args:
            hook (anyio.Event): event that is set when the webhook delivers data
            url (str): own url that is sent to the webhook server on connect
        """
        # TODO: set this hook when adding the hook to a provider so that it is linked to fastAPI post route of the datamodel
        # either to this with a callback or these events.
        self.own_url = url
        self.hook = hook

    async def trigger_hook(self, body: str):
        """
        Store the received data and set the hook event so a pending ``provide`` call can return.

        Args:
            body (str): data delivered by the webhook

        Raises:
            HTTPException: If no hook has been set.
        """
        if self.hook is None or not self.hook:
            raise HTTPException(
                status_code=400,
                detail="Webhook does not have a hook set. Please use the set_hook function to set the hook.",
            )
        self.received_data = body
        self.hook.set()

    async def connect(self):
        """
        Register at the webhook server by posting the own url to it.

        Raises:
            HTTPException: If the webhook server does not answer with status 200.
        """
        payload = {"url": self.own_url}
        async with aiohttp.ClientSession() as session:
            async with session.post(self.web_hook_url, json=payload) as response:
                if response.status != 200:
                    raise HTTPException(
                        status_code=400,
                        detail="Webhook could not connect to the webhook server.",
                    )
                # TODO: add logging for somethink like this!
                await response.text()

    async def disconnect(self):
        """
        Disconnect from the webhook. Currently performs no action.
        """
        # TODO: either remove the hookevent from the provider or set it to false or
        # remove the callback from the endpoint of the provider

    async def consume(self, body: str) -> str:
        """
        Not supported: this connector only receives data.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            "WebHookClientConnector does not support sending data but only receiving data. Try to use the WebHookServerConnector instead."
        )

    async def provide(self) -> str:
        """
        Wait until the hook event is set and return the data received from the webhook.

        Returns:
            str: the most recently stored webhook data

        Raises:
            Exception: If no hook has been set.
        """
        if self.hook is None or not self.hook:
            raise Exception(
                "WebHookClientConnector does not have a hook set. Please use the set_hook function to set the hook."
            )
        await self.hook.wait()
        # Replace the used event with a fresh one so the next call waits for new data.
        self.hook = anyio.Event()
        return self.received_data
__init__(self, web_hook_url)
special
Parameters:
Name | Type | Description | Default |
---|---|---|---|
web_hook_url |
str |
address of the webhook |
required |
Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
connect(self)
async
Function connects to the webhook with a post request that sends the middleware url to the webhook.
Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
async def connect(self):
    """
    Register at the webhook server by posting the own url to it.

    Raises:
        HTTPException: If the webhook server does not answer with status 200.
    """
    payload = {"url": self.own_url}
    async with aiohttp.ClientSession() as session:
        async with session.post(self.web_hook_url, json=payload) as response:
            if response.status != 200:
                raise HTTPException(
                    status_code=400,
                    detail="Webhook could not connect to the webhook server.",
                )
            # TODO: add logging for somethink like this!
            await response.text()
disconnect(self)
async
Function disconnects from the webhook.
provide(self)
async
Function receives data from the webhook.
Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
set_hook(self, hook, url)
async
Function sets the hook event for posts of the webhook and the url where the webhook should post to.
Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
async def set_hook(self, hook: anyio.Event, url: str):
    """
    Store the event that signals incoming webhook posts and the url the webhook should post to.

    Args:
        hook (anyio.Event): event that is set when the webhook delivers data
        url (str): own url that is sent to the webhook server on connect
    """
    # TODO: set this hook when adding the hook to a provider so that it is linked to fastAPI post route of the datamodel
    # either to this with a callback or these events.
    self.own_url = url
    self.hook = hook
trigger_hook(self, body)
async
Function triggers the hook for the webhook.
Source code in aas_middleware\connect\connectors\web_hook_client_connector.py
web_hook_server_connector
WebHookServerConnector
Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
class WebHookServerConnector:
    """
    Server side of a webhook.

    Keeps a list of subscriber URLs and, once ``hook`` is set, posts
    ``received_data`` to every registered subscriber.
    """

    def __init__(self):
        # Event that ``consume`` waits on before forwarding data.
        self.hook: anyio.Event = anyio.Event()
        # URLs that receive a POST request whenever ``consume`` runs.
        self.connected_subscribers: List[str] = []
        # Payload forwarded to the subscribers by ``consume``.
        self.received_data: str = ""
        # Subscribers may only register while this flag is True.
        self.connectable: bool = False

    async def start_server(self) -> None:
        """
        Activates an endpoint to which clients can connect with a POST request
        that sends the URL to post the webhook data to. Currently a no-op placeholder.
        """
        # TODO: add functionality that when added to a consumer, an endpoint is created
        # for webhook clients to connect to, and one endpoint to remove client
        # subscriptions. The register and unregister functions should then be used.

    async def register_subscriber(self, url: str) -> None:
        """
        Function registers a subscriber to the webhook.

        Registering an already registered URL is a no-op, so a subscriber never
        receives the same message more than once.

        Args:
            url (str): URL the webhook data is posted to.

        Raises:
            HTTPException: With status code 400 if the webhook is not connectable.
        """
        if not self.connectable:
            raise HTTPException(status_code=400, detail="Webhook not connectable.")
        if url not in self.connected_subscribers:
            self.connected_subscribers.append(url)

    async def unregister_subscriber(self, url: str) -> None:
        """
        Function unregisters a subscriber from the webhook.

        Args:
            url (str): URL that was previously registered.

        Raises:
            HTTPException: With status code 400 if the URL is not registered.
        """
        try:
            self.connected_subscribers.remove(url)
        except ValueError as err:
            raise HTTPException(
                status_code=400, detail="Subscriber not registered."
            ) from err

    async def connect(self):
        """
        Function allows clients to connect to the webhook.
        """
        self.connectable = True

    async def disconnect(self):
        """
        Function disconnects all subscribed clients from the webhook.
        """
        self.connectable = False
        self.connected_subscribers = []

    async def consume(self) -> str:
        """
        Function waits for the hook and then sends the received data to all
        subscribed clients of the webhook.

        Returns:
            str: Confirmation message "Webhook message sent".

        Raises:
            HTTPException: With status code 400 if the webhook is not connectable.
        """
        await self.hook.wait()
        # Re-arm the hook with a fresh event (same pattern as
        # WebSocketServerConnector.provide). Calling set() on the already-set
        # event was a no-op and made every later consume() fire immediately.
        # NOTE(review): whoever triggers the hook must look up ``self.hook``
        # on each trigger rather than cache the event object -- confirm.
        self.hook = anyio.Event()
        if not self.connectable:
            raise HTTPException(status_code=400, detail="Webhook not connectable.")
        # Snapshot the recipients and payload: the awaits below yield control,
        # during which subscribers may register or unregister.
        subscribers = list(self.connected_subscribers)
        payload = self.received_data
        if subscribers:
            # One shared session instead of a new one per subscriber.
            async with aiohttp.ClientSession() as session:
                for subscriber in subscribers:
                    async with session.post(subscriber, data=payload) as response:
                        await response.text()
        return "Webhook message sent"

    async def provide(self) -> str:
        """
        Not supported by this connector.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError
connect(self)
async
consume(self)
async
Function sends data to all subscribed clients of the webhook.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
body |
str |
description |
required |
Returns:
Type | Description |
---|---|
str |
description |
Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
async def consume(self) -> str:
    """
    Function waits for the hook and then sends the received data to all
    subscribed clients of the webhook.

    Returns:
        str: Confirmation message "Webhook message sent".

    Raises:
        HTTPException: With status code 400 if the webhook is not connectable.
    """
    await self.hook.wait()
    # Re-arm the hook with a fresh event. Calling set() on the already-set
    # event was a no-op and made every later consume() fire immediately.
    # NOTE(review): whoever triggers the hook must look up ``self.hook`` on
    # each trigger rather than cache the event object -- confirm.
    self.hook = anyio.Event()
    if not self.connectable:
        raise HTTPException(status_code=400, detail="Webhook not connectable.")
    # Snapshot the recipients and payload: the awaits below yield control,
    # during which subscribers may register or unregister.
    subscribers = list(self.connected_subscribers)
    payload = self.received_data
    if subscribers:
        # One shared session instead of a new one per subscriber.
        async with aiohttp.ClientSession() as session:
            for subscriber in subscribers:
                async with session.post(subscriber, data=payload) as response:
                    await response.text()
    return "Webhook message sent"
disconnect(self)
async
register_subscriber(self, url)
async
Function registers a subscriber to the webhook.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
description |
required |
Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
start_server(self)
async
Activates an endpoint to which clients can connect with a POST request that sends the URL to post the webhook data to.
Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
async def start_server(self) -> None:
    """
    Activates an endpoint to which clients can connect with a POST request
    that sends the URL to post the webhook data to. Currently a no-op placeholder.
    """
    # TODO: add functionality that when added to a consumer, an endpoint is created
    # for webhook clients to connect to, and one endpoint to remove client
    # subscriptions. The register and unregister functions should then be used.
    return None
unregister_subscriber(self, url)
async
Function unregisters a subscriber from the webhook.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
description |
required |
Source code in aas_middleware\connect\connectors\web_hook_server_connector.py
web_socket_server_connector
WebSocketServerConnector
Source code in aas_middleware\connect\connectors\web_socket_server_connector.py
class WebSocketServerConnector:
    """
    Websocket server that stores the latest message received from a client and
    can push data to the most recently connected client.
    """

    def __init__(
        self, host: str, port: int, reply_function: Optional[Callable[[], str]] = None
    ):
        """
        Args:
            host (str): Host passed to ``websockets.serve``.
            port (int): Port passed to ``websockets.serve``.
            reply_function (Optional[Callable[[], str]]): Produces the reply sent
                for each received message. If None, "Message received" is sent.
        """
        self.host = host
        self.port = port
        self.server: Optional[websockets.WebSocketServer] = None
        # Most recently accepted client connection; None when no client is connected.
        self.websocket: Optional[websockets.WebSocketServerProtocol] = None
        # Set whenever a message arrives; replaced by a fresh event in ``provide``.
        self.received_data_event = anyio.Event()
        self.received_data: str = ""
        self.reply_function = reply_function
        # Incoming connections are closed immediately while this flag is False.
        self.connectable = False

    async def start_server(self) -> None:
        """
        Start the websocket server on ``host``/``port`` with ``handle_websocket``
        as the connection handler.
        """
        self.server = await websockets.serve(
            self.handle_websocket, self.host, self.port
        )

    async def handle_websocket(
        self, websocket: websockets.WebSocketServerProtocol, path: str = ""
    ) -> None:
        """
        Connection handler: stores each received message and replies to it.

        Args:
            websocket: The client connection.
            path (str): Request path. Unused; optional so the handler can be
                called with or without it.

        Raises:
            HTTPException: With status code 400 if the connection is closed
                while messages are being handled.
        """
        if not self.connectable:
            await websocket.close()
            return
        self.websocket = websocket
        try:
            async for message in websocket:
                print(f"Received message: {message}")
                if isinstance(message, bytes):
                    message = message.decode()
                self.received_data = message
                self.received_data_event.set()
                if self.reply_function:
                    await websocket.send(self.reply_function())
                else:
                    await websocket.send("Message received")
        except websockets.exceptions.ConnectionClosed as err:
            raise HTTPException(
                status_code=400, detail="Websocket connection closed."
            ) from err
        finally:
            # Drop the reference once this connection is finished so that
            # ``consume`` reports "not connected" instead of sending on a dead
            # socket. Leave it alone if a newer connection already replaced it.
            if self.websocket is websocket:
                self.websocket = None

    async def connect(self) -> None:
        """
        Function enables the server to allow and accept incoming connections.

        Raises:
            HTTPException: With status code 400 if the server has not been
                started with ``start_server``.
        """
        if not self.server:
            raise HTTPException(
                status_code=400, detail="Websocket server is not started."
            )
        # The former second ``if not self.server`` branch was unreachable after
        # the check above and has been removed.
        self.connectable = True

    async def disconnect(self) -> None:
        """
        Stop accepting connections, close the current client connection (if any)
        and shut down the server (if running).
        """
        self.connectable = False
        if self.websocket:
            await self.websocket.close()
            self.websocket = None
        if self.server:
            self.server.close()
            await self.server.wait_closed()
            self.server = None

    async def consume(self, body: str) -> Optional[str]:
        """
        Send ``body`` to the connected client.

        Args:
            body (str): Data to send.

        Raises:
            HTTPException: With status code 400 if the server is not started or
                no client is connected.
        """
        if not self.server:
            raise HTTPException(
                status_code=400, detail="Websocket server is not started."
            )
        if not self.websocket:
            raise HTTPException(
                status_code=400, detail="Websocket not connected to websocket server."
            )
        await self.websocket.send(body)

    async def provide(self) -> str:
        """
        Wait until a message has been received and return it.

        Returns:
            str: The most recently received message.
        """
        await self.received_data_event.wait()
        # Replace the event with a fresh one so the next call waits for a new message.
        self.received_data_event = anyio.Event()
        return self.received_data
connect(self)
async
Function enables the server to allow and accept incoming connections.