openpectus.engine.engine_runner

Attributes

logger

MAX_RECONNECT_WAIT_SECONDS

RecoverState

AsyncConnectionCallback

StateChangingCallback

Classes

AsyncTimer

EngineRunner

Runner that implements error recovery and masks connection errors that may occur in the Aggregator-Engine Protocol.

Module Contents

openpectus.engine.engine_runner.logger
openpectus.engine.engine_runner.MAX_RECONNECT_WAIT_SECONDS = 10
openpectus.engine.engine_runner.RecoverState
openpectus.engine.engine_runner.AsyncConnectionCallback
openpectus.engine.engine_runner.StateChangingCallback
class openpectus.engine.engine_runner.AsyncTimer(timeout, callback)
_timeout
_callback
_task
__str__()
async _job()
start()
stop()
class openpectus.engine.engine_runner.EngineRunner(dispatcher, message_builder, emitter, loop)

Bases: openpectus.lang.exec.events.EventListener

Runner that implements error recovery and masks connection errors that may occur in the Aggregator-Engine Protocol.

Parameters:
_dispatcher
_message_builder
_state: RecoverState = 'Started'
_message_buffer: list[openpectus.protocol.engine_messages.EngineMessage] = []
_lock
_state_task: asyncio.Task[None] | None = None
_transmit_buffer_task: asyncio.Task[None] | None = None
_timer
_first_steady_state = True
_loop
connected_callback: AsyncConnectionCallback | None = None
failed_callback: AsyncConnectionCallback | None = None
catchingup_callback: AsyncConnectionCallback | None = None
reconnecting_callback: AsyncConnectionCallback | None = None
reconnected_callback: AsyncConnectionCallback | None = None
state_changing_callback: StateChangingCallback | None = None
first_steady_state_callback: AsyncConnectionCallback | None = None
__str__()
Return type:

str

on_start(run_id)

Is invoked by the Start command when method is started.

Parameters:

run_id (str)

on_stop()

Is invoked by the Stop command when method is stopped.

on_runstate_change(state_change)

Is invoked when method interpretation is complete.

on_block_start(block_info)

Invoked just after a new block is started, before on_tick, in the same engine tick.

on_notify_command(argument)
on_scope_activate(scope_info)
on_method_error(exception)

Invoked by engine when an error state is encountered which is severe enough to cause pause

on_connection_status_change(status)

Invoked by system tag Connection Status on value change

property state: RecoverState
Return type:

RecoverState

async run()
async _connect_async()
async _disconnect_async(set_state_disconnected=True)
async shutdown()
async _post_async(message)
Parameters:

message (openpectus.protocol.engine_messages.EngineMessage)

Return type:

openpectus.protocol.messages.MessageBase

async _tick()
async _set_state(state)
Parameters:

state (RecoverState)

async _send_buffered_batch()
_buffer_message(message)

Append message to the buffer

Parameters:

message (openpectus.protocol.engine_messages.EngineMessage)

_get_buffer_size()
Return type:

int

async _on_connection_established()
async _on_connected()
async buffer_messages()
async _on_failed()
async _on_reconnecting()
async _on_catchingup()
async _on_reconnected()
async steady_state_send_messages()
async on_steady_state()

Steady-State message sending loop