openpectus.protocol.aggregator_dispatcher
Attributes
Specific handler for register messages from engine |
|
Specific handler for engine disconnect notifications from dispatcher |
|
Specific handler for engine disconnect notifications from dispatcher |
|
Handler in aggregator that handles engine messages of a given type |
|
Classes
Aggregator dispatcher for the Aggregator-Engine Protocol using REST + WebSocket RPC. |
Module Contents
- openpectus.protocol.aggregator_dispatcher.logger
- openpectus.protocol.aggregator_dispatcher.RegisterHandler
Specific handler for register messages from engine
- openpectus.protocol.aggregator_dispatcher.EngineConnectHandler
Specific handler for engine disconnect notifications from dispatcher
- openpectus.protocol.aggregator_dispatcher.EngineDisconnectHandler
Specific handler for engine disconnect notifications from dispatcher
- openpectus.protocol.aggregator_dispatcher.AggregatorMessageHandler
Handler in aggregator that handles engine messages of a given type
- openpectus.protocol.aggregator_dispatcher.WEBSOCKET_RPC_TIMEOUT_SECS: float | None = 2.0
- class openpectus.protocol.aggregator_dispatcher.AggregatorDispatcher
Aggregator dispatcher for the Aggregator-Engine Protocol using REST + WebSocket RPC. Allows receiving message via HTTP POST and sending messages via JSON-RPC.
- _handlers: dict[type, AggregatorMessageHandler]
- _register_handler: RegisterHandler | None = None
- _connect_handler: EngineConnectHandler | None = None
- _disconnect_handler: EngineDisconnectHandler | None = None
- router
- _engine_id_channel_map: dict[str, fastapi_websocket_rpc.RpcChannel]
- _on_client_connect_tasks
- endpoint
- __str__()
- _register_post_route(router)
Set up post route to handle RegisterEngineMsg which must be handled before websocket can be established
- Parameters:
router (fastapi.APIRouter | fastapi.FastAPI)
- async _on_delayed_client_connect(channel)
We ‘delay’ our on_connect callback because the WebsocketRPCEndpoint calls on_connect callbacks before it starts listening to responses to rpc calls. When we use create_task(), in on_client_connect() below, to call this method, we ensure that WebsocketRPCEndpoint starts listening to responses before we call get_engine_id_async() over rpc.
- Parameters:
channel (fastapi_websocket_rpc.RpcChannel)
- async on_client_connect(channel)
- Parameters:
channel (fastapi_websocket_rpc.RpcChannel)
- async on_client_disconnect(channel)
- Parameters:
channel (fastapi_websocket_rpc.RpcChannel)
- has_connected_engine_id(engine_id)
Return a value indicating whether the engine_id is known. Abstract method.
- Parameters:
engine_id (str)
- Return type:
bool
- async rpc_call(engine_id, message)
- Parameters:
engine_id (str)
message (openpectus.protocol.messages.MessageBase)
- Return type:
- async dispatch_message(message)
Dispatch incoming message to registered handler.
- Parameters:
- Return type:
- set_message_handler(message_type, handler)
Set handler for message_type.
- Parameters:
message_type (type[openpectus.protocol.dispatch_interface.EngineMessageType])
handler (AggregatorMessageHandler)
- set_register_handler(handler)
- Parameters:
handler (RegisterHandler)
- set_connect_handler(handler)
- Parameters:
handler (EngineConnectHandler)
- set_disconnect_handler(handler)
- Parameters:
handler (EngineDisconnectHandler)
- async shutdown()