openpectus.protocol.aggregator_dispatcher

Attributes

logger

RegisterHandler

Specific handler for register messages from engine

EngineConnectHandler

Specific handler for engine connect notifications from dispatcher

EngineDisconnectHandler

Specific handler for engine disconnect notifications from dispatcher

AggregatorMessageHandler

Handler in aggregator that handles engine messages of a given type

WEBSOCKET_RPC_TIMEOUT_SECS

Classes

AggregatorDispatcher

Aggregator dispatcher for the Aggregator-Engine Protocol using REST + WebSocket RPC.

Module Contents

openpectus.protocol.aggregator_dispatcher.logger
openpectus.protocol.aggregator_dispatcher.RegisterHandler

Specific handler for register messages from engine

openpectus.protocol.aggregator_dispatcher.EngineConnectHandler

Specific handler for engine connect notifications from dispatcher

openpectus.protocol.aggregator_dispatcher.EngineDisconnectHandler

Specific handler for engine disconnect notifications from dispatcher

openpectus.protocol.aggregator_dispatcher.AggregatorMessageHandler

Handler in aggregator that handles engine messages of a given type

openpectus.protocol.aggregator_dispatcher.WEBSOCKET_RPC_TIMEOUT_SECS: float | None = 2.0
class openpectus.protocol.aggregator_dispatcher.AggregatorDispatcher

Aggregator dispatcher for the Aggregator-Engine Protocol using REST + WebSocket RPC. Allows receiving messages via HTTP POST and sending messages via JSON-RPC.

_handlers: dict[type, AggregatorMessageHandler]
_register_handler: RegisterHandler | None = None
_connect_handler: EngineConnectHandler | None = None
_disconnect_handler: EngineDisconnectHandler | None = None
router
_engine_id_channel_map: dict[str, fastapi_websocket_rpc.RpcChannel]
_on_client_connect_tasks
endpoint
__str__()
_register_post_route(router)

Set up the POST route that handles RegisterEngineMsg, which must be handled before the WebSocket connection can be established.

Parameters:

router (fastapi.APIRouter | fastapi.FastAPI)

async _on_delayed_client_connect(channel)

The on_connect callback is deliberately ‘delayed’ because WebsocketRPCEndpoint invokes on_connect callbacks before it starts listening for responses to RPC calls. Scheduling this method with create_task() from on_client_connect() below ensures that WebsocketRPCEndpoint is already listening for responses by the time get_engine_id_async() is called over RPC.
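The ordering problem described above can be illustrated with plain asyncio. This is a minimal sketch, not the dispatcher's actual code: the string channel, the `events` list and the inline "endpoint" steps are hypothetical stand-ins. It only shows that work scheduled with `asyncio.create_task()` runs after the caller has moved on.

```python
import asyncio


async def demo() -> list:
    events = []   # records the order in which things happen
    tasks = set() # keeps references so tasks are not garbage-collected

    async def on_delayed_client_connect(channel: str) -> None:
        # Runs only once the event loop gets control back, i.e. after the
        # (stand-in) endpoint has started listening for RPC responses.
        events.append(f"delayed callback for {channel}")

    async def on_client_connect(channel: str) -> None:
        # Schedule the real work instead of awaiting it directly.
        task = asyncio.create_task(on_delayed_client_connect(channel))
        tasks.add(task)
        task.add_done_callback(tasks.discard)
        events.append("on_connect returned")

    # Hypothetical endpoint behaviour: run the callback first, listen second.
    await on_client_connect("channel-1")
    events.append("endpoint listening")

    # Let the scheduled task run to completion.
    await asyncio.gather(*tasks)
    return events
```

Running `asyncio.run(demo())` yields the callback's return first, then the listening step, and only then the delayed callback, which is the order the docstring relies on.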

Parameters:

channel (fastapi_websocket_rpc.RpcChannel)

async on_client_connect(channel)
Parameters:

channel (fastapi_websocket_rpc.RpcChannel)

async on_client_disconnect(channel)
Parameters:

channel (fastapi_websocket_rpc.RpcChannel)

has_connected_engine_id(engine_id)

Return a value indicating whether the engine_id is known. Abstract method.

Parameters:

engine_id (str)

Return type:

bool

async rpc_call(engine_id, message)
Parameters:
Return type:

openpectus.protocol.messages.MessageBase
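rpc_call() carries no description in this reference. The following sketch shows one plausible shape, under two stated assumptions: that the target channel is looked up by engine id in a map like `_engine_id_channel_map`, and that the wait is bounded by `WEBSOCKET_RPC_TIMEOUT_SECS`. `FakeChannel`, `ChannelLookupSketch`, the string messages and the error strings are all hypothetical and are not part of openpectus or fastapi_websocket_rpc.

```python
import asyncio
from typing import Optional

WEBSOCKET_RPC_TIMEOUT_SECS: Optional[float] = 2.0  # default value documented above


class FakeChannel:
    """Hypothetical stand-in for fastapi_websocket_rpc.RpcChannel."""

    def __init__(self, delay: float) -> None:
        self.delay = delay

    async def call(self, message: str) -> str:
        # Simulate a remote engine that answers after `delay` seconds.
        await asyncio.sleep(self.delay)
        return f"ack:{message}"


class ChannelLookupSketch:
    def __init__(self) -> None:
        self._engine_id_channel_map = {}  # engine_id -> channel

    def has_connected_engine_id(self, engine_id: str) -> bool:
        return engine_id in self._engine_id_channel_map

    async def rpc_call(self, engine_id: str, message: str,
                       timeout: Optional[float] = WEBSOCKET_RPC_TIMEOUT_SECS) -> str:
        # Assumption: an unknown engine id produces an error result.
        if not self.has_connected_engine_id(engine_id):
            return "error: unknown engine"
        channel = self._engine_id_channel_map[engine_id]
        try:
            # Assumption: the call is abandoned once the timeout elapses.
            return await asyncio.wait_for(channel.call(message), timeout=timeout)
        except asyncio.TimeoutError:
            return "error: timeout"
```

A `timeout` of `None` makes `asyncio.wait_for` wait indefinitely, which matches the `float | None` type of the module constant.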

async dispatch_message(message)

Dispatch incoming message to registered handler.

Parameters:

message (openpectus.protocol.engine_messages.EngineMessage)

Return type:

openpectus.protocol.messages.MessageBase

set_message_handler(message_type, handler)

Set handler for message_type.

Parameters:
  • message_type (type[openpectus.protocol.dispatch_interface.EngineMessageType])

  • handler (AggregatorMessageHandler)
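set_message_handler() and dispatch_message() together suggest a registry keyed by message type, in line with the `_handlers: dict[type, AggregatorMessageHandler]` attribute. A minimal sketch of that pattern follows. The message classes, `RegistrySketch`, and the choice to return an error message when no handler is registered are assumptions made for illustration, not the documented behaviour.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class MessageBase:
    """Hypothetical stand-in for openpectus.protocol.messages.MessageBase."""


@dataclass
class ErrorMessage(MessageBase):
    message: str = ""


@dataclass
class PingMsg(MessageBase):
    engine_id: str = ""


@dataclass
class PongMsg(MessageBase):
    engine_id: str = ""


# A handler takes one message and returns a response message.
Handler = Callable[[MessageBase], Awaitable[MessageBase]]


class RegistrySketch:
    def __init__(self) -> None:
        self._handlers = {}  # message type -> handler

    def set_message_handler(self, message_type: type, handler: Handler) -> None:
        self._handlers[message_type] = handler

    async def dispatch_message(self, message: MessageBase) -> MessageBase:
        # Look up the handler by the exact type of the incoming message.
        handler = self._handlers.get(type(message))
        if handler is None:
            # Assumption: a missing handler yields an error message.
            return ErrorMessage(message=f"no handler for {type(message).__name__}")
        return await handler(message)


async def handle_ping(msg: MessageBase) -> MessageBase:
    assert isinstance(msg, PingMsg)
    return PongMsg(engine_id=msg.engine_id)
```

Registering `handle_ping` for `PingMsg` and dispatching `PingMsg(engine_id="e1")` returns `PongMsg(engine_id="e1")`. Because the lookup uses the exact type, subclasses of a registered message type would need their own registration in this sketch.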

set_register_handler(handler)
Parameters:

handler (RegisterHandler)

set_connect_handler(handler)
Parameters:

handler (EngineConnectHandler)

set_disconnect_handler(handler)
Parameters:

handler (EngineDisconnectHandler)

async shutdown()