openpectus.aggregator.data.models

Attributes

ProcessValueValueType

Classes

TZDateTime

Allows the creation of types which add additional functionality to an existing type.

DBModel

Base model for data entity classes.

RecentEngine

Base model for data entity classes.

RecentRun

Represents a historical run of a process unit.

RecentRunMethodAndState

Base model for data entity classes.

RecentRunRunLog

Base model for data entity classes.

RecentRunErrorLog

Base model for data entity classes.

RecentRunPlotConfiguration

Base model for data entity classes.

PlotLogEntryValue

Base model for data entity classes.

ProcessValueType

Enum where members are also (and must be) strings

PlotLogEntry

Base model for data entity classes.

PlotLog

Base model for data entity classes.

WebPushNotificationPreferences

Base model for data entity classes.

WebPushSubscription

Base model for data entity classes.

Functions

get_ProcessValueType_from_value(value)

Module Contents

class openpectus.aggregator.data.models.TZDateTime(*args, **kwargs)

Bases: sqlalchemy.TypeDecorator[datetime.datetime]

Allows the creation of types which add additional functionality to an existing type.

This method is preferred to direct subclassing of SQLAlchemy’s built-in types as it ensures that all required functionality of the underlying type is kept in place.

Typical usage:

import sqlalchemy.types as types


class MyType(types.TypeDecorator):
    """Prefixes Unicode values with "PREFIX:" on the way in and
    strips it off on the way out.
    """

    impl = types.Unicode

    cache_ok = True

    def process_bind_param(self, value, dialect):
        return "PREFIX:" + value

    def process_result_value(self, value, dialect):
        return value[7:]

    def copy(self, **kw):
        return MyType(self.impl.length)

The class-level impl attribute is required, and can reference any TypeEngine class. Alternatively, the load_dialect_impl() method can be used to provide different type classes based on the dialect given; in this case, the impl variable can reference TypeEngine as a placeholder.

The TypeDecorator.cache_ok class-level flag indicates if this custom TypeDecorator is safe to be used as part of a cache key. This flag defaults to None which will initially generate a warning when the SQL compiler attempts to generate a cache key for a statement that uses this type. If the TypeDecorator is not guaranteed to produce the same bind/result behavior and SQL generation every time, this flag should be set to False; otherwise if the class produces the same behavior each time, it may be set to True. See TypeDecorator.cache_ok for further notes on how this works.

Types that receive a Python type that isn’t similar to the ultimate type used may want to define the TypeDecorator.coerce_compared_value() method. This is used to give the expression system a hint when coercing Python objects into bind parameters within expressions. Consider this expression:

mytable.c.somecol + datetime.date(2009, 5, 15)

Above, if “somecol” is an Integer variant, it makes sense that we’re doing date arithmetic, which databases usually interpret as adding a number of days to the given date. The expression system does the right thing by not attempting to coerce the “date()” value into an integer-oriented bind parameter.

However, in the case of TypeDecorator, we are usually changing an incoming Python type to something new: TypeDecorator by default will “coerce” the non-typed side to be the same type as itself. For example, below we define an “epoch” type that stores a date value as an integer:

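import datetime

import sqlalchemy.types as types


class MyEpochType(types.TypeDecorator):
    impl = types.Integer

    cache_ok = True

    epoch = datetime.date(1970, 1, 1)

    def process_bind_param(self, value, dialect):
        # store the date as the number of days since the epoch
        return (value - self.epoch).days

    def process_result_value(self, value, dialect):
        # rebuild the date from the stored day count
        return self.epoch + datetime.timedelta(days=value)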

Our expression of somecol + date with the above type will coerce the “date” on the right side to also be treated as MyEpochType.

This behavior can be overridden via the coerce_compared_value() method, which returns a type that should be used for the value of the expression. Below we set it such that an integer value will be treated as an Integer, and any other value is assumed to be a date and will be treated as a MyEpochType:

def coerce_compared_value(self, op, value):
    if isinstance(value, int):
        return types.Integer()
    else:
        return self

Warning

Note that the behavior of coerce_compared_value is not inherited by default from that of the base type. If the TypeDecorator is augmenting a type that requires special logic for certain types of operators, this method must be overridden. A key example is when decorating the postgresql.JSON and postgresql.JSONB types; the default rules of TypeEngine.coerce_compared_value() should be used in order to deal with operators like index operations:

from sqlalchemy import JSON
from sqlalchemy import TypeDecorator


class MyJsonType(TypeDecorator):
    impl = JSON

    cache_ok = True

    def coerce_compared_value(self, op, value):
        return self.impl.coerce_compared_value(op, value)

Without the above step, index operations such as mycol['foo'] will cause the index value 'foo' to be JSON encoded.

Similarly, when working with the ARRAY datatype, the type coercion for index operations (e.g. mycol[5]) is also handled by TypeDecorator.coerce_compared_value(), where again a simple override is sufficient unless special rules are needed for particular operators:

from sqlalchemy import ARRAY
from sqlalchemy import TypeDecorator


class MyArrayType(TypeDecorator):
    impl = ARRAY

    cache_ok = True

    def coerce_compared_value(self, op, value):
        return self.impl.coerce_compared_value(op, value)

Parameters:
  • args (Any)

  • kwargs (Any)

impl
cache_ok = True

Indicate if statements using this ExternalType are “safe to cache”.

The default value None will emit a warning and then not allow caching of a statement which includes this type. Set to False to prevent statements using this type from being cached at all, without emitting a warning. When set to True, the object’s class and selected elements from its state will be used as part of the cache key. For example, using a TypeDecorator:

class MyType(TypeDecorator):
    impl = String

    cache_ok = True

    def __init__(self, choices):
        self.choices = tuple(choices)
        self.internal_only = True

The cache key for the above type would be equivalent to:

>>> MyType(["a", "b", "c"])._static_cache_key
(<class '__main__.MyType'>, ('choices', ('a', 'b', 'c')))

The caching scheme will extract attributes from the type that correspond to the names of parameters in the __init__() method. Above, the “choices” attribute becomes part of the cache key but “internal_only” does not, because there is no parameter named “internal_only”.

The requirements for cacheable elements are that they are hashable and that they produce the same rendered SQL for expressions using this type every time for a given cache value.

To accommodate datatypes that refer to unhashable structures such as dictionaries, sets and lists, these objects can be made “cacheable” by assigning hashable structures to the attributes whose names correspond with the names of the arguments. For example, a datatype which accepts a dictionary of lookup values may publish this as a sorted series of tuples. Given a previously un-cacheable type as:

class LookupType(UserDefinedType):
    """a custom type that accepts a dictionary as a parameter.

    this is the non-cacheable version, as "self.lookup" is not
    hashable.

    """

    def __init__(self, lookup):
        self.lookup = lookup

    def get_col_spec(self, **kw):
        return "VARCHAR(255)"

    def bind_processor(self, dialect): ...  # works with "self.lookup" ...

Here “lookup” is a dictionary, so the type will not be able to generate a cache key:

>>> type_ = LookupType({"a": 10, "b": 20})
>>> type_._static_cache_key
<stdin>:1: SAWarning: UserDefinedType LookupType({'a': 10, 'b': 20}) will not
produce a cache key because the ``cache_ok`` flag is not set to True.
Set this flag to True if this type object's state is safe to use
in a cache key, or False to disable this warning.
symbol('no_cache')

If we did set up such a cache key, it wouldn’t be usable. We would get a tuple structure that contains a dictionary inside of it, which cannot itself be used as a key in a “cache dictionary” such as SQLAlchemy’s statement cache, since Python dictionaries aren’t hashable:

>>> # set cache_ok = True
>>> type_.cache_ok = True

>>> # this is the cache key it would generate
>>> key = type_._static_cache_key
>>> key
(<class '__main__.LookupType'>, ('lookup', {'a': 10, 'b': 20}))

>>> # however this key is not hashable, will fail when used with
>>> # SQLAlchemy statement cache
>>> some_cache = {key: "some sql value"}
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'dict'

The type may be made cacheable by assigning a sorted tuple of tuples to the “.lookup” attribute:

class LookupType(UserDefinedType):
    """a custom type that accepts a dictionary as a parameter.

    The dictionary is stored both as itself in a private variable,
    and published in a public variable as a sorted tuple of tuples,
    which is hashable and will also return the same value for any
    two equivalent dictionaries.  Note it assumes the keys and
    values of the dictionary are themselves hashable.

    """

    cache_ok = True

    def __init__(self, lookup):
        self._lookup = lookup

        # assume keys/values of "lookup" are hashable; otherwise
        # they would also need to be converted in some way here
        self.lookup = tuple((key, lookup[key]) for key in sorted(lookup))

    def get_col_spec(self, **kw):
        return "VARCHAR(255)"

    def bind_processor(self, dialect): ...  # works with "self._lookup" ...

With the above, the cache key for LookupType({"a": 10, "b": 20}) will be:

>>> LookupType({"a": 10, "b": 20})._static_cache_key
(<class '__main__.LookupType'>, ('lookup', (('a', 10), ('b', 20))))

Added in version 1.4.14: Added the cache_ok flag to allow some configurability of caching for TypeDecorator classes.

Added in version 1.4.28: Added the ExternalType mixin, which generalizes the cache_ok flag to both the TypeDecorator and UserDefinedType classes.

See also

sql_caching

process_bind_param(value, dialect)

Receive a bound parameter value to be converted.

Custom subclasses of TypeDecorator should override this method to provide custom behaviors for incoming data values. This method is called at statement execution time and is passed the literal Python data value which is to be associated with a bound parameter in the statement.

The operation could be anything desired to perform custom behavior, such as transforming or serializing data. This could also be used as a hook for validation logic.

Parameters:
  • value – Data to operate upon, of any type expected by this method in the subclass. Can be None.

  • dialect – the Dialect in use.

See also

types_typedecorator

TypeDecorator.process_result_value()

process_result_value(value, dialect)

Receive a result-row column value to be converted.

Custom subclasses of TypeDecorator should override this method to provide custom behaviors for data values being received in result rows coming from the database. This method is called at result fetching time and is passed the literal Python data value that’s extracted from a database result row.

The operation could be anything desired to perform custom behavior, such as transforming or deserializing data.

Parameters:
  • value – Data to operate upon, of any type expected by this method in the subclass. Can be None.

  • dialect – the Dialect in use.

See also

types_typedecorator

TypeDecorator.process_bind_param()
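The body of TZDateTime itself is not reproduced in this reference. As a minimal sketch of how the two hooks above can work together, the hypothetical UTCDateTime type below assumes the goal is to store timezone-aware datetimes as naive UTC values and to reattach UTC when rows are read back. The class name, the "events" table and its columns are illustrative assumptions, not OpenPectus code.

```python
import datetime

import sqlalchemy as sa
from sqlalchemy.types import TypeDecorator


class UTCDateTime(TypeDecorator):
    """Hypothetical type: stores aware datetimes as naive UTC values."""

    impl = sa.DateTime
    cache_ok = True

    def process_bind_param(self, value, dialect):
        # Called at statement execution time, before the value reaches the driver.
        if value is None:
            return None
        if value.tzinfo is None:
            # Validation hook: refuse naive datetimes instead of guessing a zone.
            raise ValueError("timezone-aware datetime required")
        return value.astimezone(datetime.timezone.utc).replace(tzinfo=None)

    def process_result_value(self, value, dialect):
        # Called at result fetching time, for each value read from a row.
        if value is None:
            return None
        return value.replace(tzinfo=datetime.timezone.utc)


metadata = sa.MetaData()
events = sa.Table(
    "events",  # illustrative table, not part of OpenPectus
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("at", UTCDateTime),
)

engine = sa.create_engine("sqlite://")
metadata.create_all(engine)

# A datetime expressed in UTC+1; it should come back as the same instant in UTC.
cet = datetime.timezone(datetime.timedelta(hours=1))
original = datetime.datetime(2024, 1, 1, 12, 0, tzinfo=cet)

with engine.begin() as conn:
    conn.execute(sa.insert(events).values(at=original))
    loaded = conn.execute(sa.select(events.c.at)).scalar_one()
```

After the round trip, loaded refers to the same instant as original but carries the UTC timezone. Rejecting naive values in process_bind_param is one possible design choice; a type could instead assume a default zone.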

class openpectus.aggregator.data.models.DBModel

Bases: sqlalchemy.orm.DeclarativeBase

Base model for data entity classes.

id: sqlalchemy.orm.Mapped[int]
metadata
class openpectus.aggregator.data.models.RecentEngine

Bases: DBModel

Base model for data entity classes.

__tablename__ = 'RecentEngines'
engine_id: sqlalchemy.orm.Mapped[str]
run_id: sqlalchemy.orm.Mapped[str | None]
run_started: sqlalchemy.orm.Mapped[datetime.datetime | None]
run_stopped: sqlalchemy.orm.Mapped[datetime.datetime | None]
name: sqlalchemy.orm.Mapped[str]
system_state: sqlalchemy.orm.Mapped[str]
location: sqlalchemy.orm.Mapped[str]
last_update: sqlalchemy.orm.Mapped[datetime.datetime]
contributors: sqlalchemy.orm.Mapped[list[openpectus.aggregator.models.Contributor]]
required_roles: sqlalchemy.orm.Mapped[list[str]]
class openpectus.aggregator.data.models.RecentRun

Bases: DBModel

Represents a historical run of a process unit.

__tablename__ = 'RecentRuns'
engine_id: sqlalchemy.orm.Mapped[str]
run_id: sqlalchemy.orm.Mapped[str]
engine_computer_name: sqlalchemy.orm.Mapped[str]
engine_version: sqlalchemy.orm.Mapped[str]
engine_hardware_str: sqlalchemy.orm.Mapped[str]
aggregator_computer_name: sqlalchemy.orm.Mapped[str]
aggregator_version: sqlalchemy.orm.Mapped[str]
uod_name: sqlalchemy.orm.Mapped[str]
uod_filename: sqlalchemy.orm.Mapped[str]
uod_author_name: sqlalchemy.orm.Mapped[str]
uod_author_email: sqlalchemy.orm.Mapped[str]
started_date: sqlalchemy.orm.Mapped[datetime.datetime]
completed_date: sqlalchemy.orm.Mapped[datetime.datetime]
contributors: sqlalchemy.orm.Mapped[list[openpectus.aggregator.models.Contributor]]
required_roles: sqlalchemy.orm.Mapped[list[str]]
plot_log: sqlalchemy.orm.Mapped[PlotLog]
plot_configuration: sqlalchemy.orm.Mapped[RecentRunPlotConfiguration]
error_log: sqlalchemy.orm.Mapped[RecentRunErrorLog]
run_log: sqlalchemy.orm.Mapped[RecentRunRunLog]
method_and_state: sqlalchemy.orm.Mapped[RecentRunMethodAndState]
archive: sqlalchemy.orm.Mapped[str | None]
archive_filename: sqlalchemy.orm.Mapped[str | None]
class openpectus.aggregator.data.models.RecentRunMethodAndState

Bases: DBModel

Base model for data entity classes.

__tablename__ = 'RecentRunMethodAndStates'
run_id: sqlalchemy.orm.Mapped[str]
method: sqlalchemy.orm.Mapped[openpectus.aggregator.models.Method]
state: sqlalchemy.orm.Mapped[openpectus.aggregator.models.MethodState]
recent_run: sqlalchemy.orm.Mapped[RecentRun]
class openpectus.aggregator.data.models.RecentRunRunLog

Bases: DBModel

Base model for data entity classes.

__tablename__ = 'RecentRunRunLogs'
run_id: sqlalchemy.orm.Mapped[str]
run_log: sqlalchemy.orm.Mapped[openpectus.aggregator.models.RunLog]
recent_run: sqlalchemy.orm.Mapped[RecentRun]
class openpectus.aggregator.data.models.RecentRunErrorLog

Bases: DBModel

Base model for data entity classes.

__tablename__ = 'RecentRunErrorLogs'
run_id: sqlalchemy.orm.Mapped[str]
error_log: sqlalchemy.orm.Mapped[openpectus.aggregator.models.AggregatedErrorLog]
recent_run: sqlalchemy.orm.Mapped[RecentRun]
class openpectus.aggregator.data.models.RecentRunPlotConfiguration

Bases: DBModel

Base model for data entity classes.

__tablename__ = 'RecentRunPlotConfigurations'
run_id: sqlalchemy.orm.Mapped[str]
plot_configuration: sqlalchemy.orm.Mapped[openpectus.aggregator.models.PlotConfiguration]
recent_run: sqlalchemy.orm.Mapped[RecentRun]
class openpectus.aggregator.data.models.PlotLogEntryValue

Bases: DBModel

Base model for data entity classes.

__tablename__ = 'PlotLogEntryValues'
plot_log_entry_id: sqlalchemy.orm.Mapped[int]
tick_time: sqlalchemy.orm.Mapped[float]
value_str: sqlalchemy.orm.Mapped[str | None]
value_float: sqlalchemy.orm.Mapped[float | None]
value_int: sqlalchemy.orm.Mapped[int | None]
property value
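The reference lists three nullable value_* columns and a value property but does not say how they relate. One plausible reading, sketched here with a plain dataclass rather than the mapped class, is that the property returns whichever column is populated. The class name and the precedence order are assumptions.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class PlotLogEntryValueSketch:
    """Plain-Python stand-in for the mapped class; not the real model."""

    tick_time: float
    value_str: Optional[str] = None
    value_float: Optional[float] = None
    value_int: Optional[int] = None

    @property
    def value(self) -> Union[str, float, int, None]:
        # Assumed precedence: float, then int, then str; None if all are unset.
        for candidate in (self.value_float, self.value_int, self.value_str):
            if candidate is not None:
                return candidate
        return None


reading = PlotLogEntryValueSketch(tick_time=12.5, value_float=7.2)
label = PlotLogEntryValueSketch(tick_time=12.5, value_str="Running")
empty = PlotLogEntryValueSketch(tick_time=12.5)
```

Keeping one typed column per value kind lets the database store numbers as numbers while callers read a single value attribute.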
openpectus.aggregator.data.models.ProcessValueValueType
class openpectus.aggregator.data.models.ProcessValueType

Bases: enum.StrEnum

Enum where members are also (and must be) strings

STRING
FLOAT
INT
CHOICE
NONE
openpectus.aggregator.data.models.get_ProcessValueType_from_value(value)
Parameters:

value (ProcessValueValueType)

Return type:

ProcessValueType

class openpectus.aggregator.data.models.PlotLogEntry

Bases: DBModel

Base model for data entity classes.

__tablename__ = 'PlotLogEntries'
name: sqlalchemy.orm.Mapped[str]
values: sqlalchemy.orm.Mapped[list[PlotLogEntryValue]]
value_unit: sqlalchemy.orm.Mapped[str | None]
value_type: sqlalchemy.orm.Mapped[ProcessValueType]
plot_log_id: sqlalchemy.orm.Mapped[int]
plot_log: sqlalchemy.orm.Mapped[PlotLog]
class openpectus.aggregator.data.models.PlotLog

Bases: DBModel

Base model for data entity classes.

__tablename__ = 'PlotLogs'
engine_id: sqlalchemy.orm.Mapped[str]
run_id: sqlalchemy.orm.Mapped[str]
entries: sqlalchemy.orm.Mapped[Dict[str, PlotLogEntry]]
recent_run: sqlalchemy.orm.Mapped[RecentRun]
class openpectus.aggregator.data.models.WebPushNotificationPreferences

Bases: DBModel

Base model for data entity classes.

__tablename__ = 'WebPushNotificationPreferences'
user_id: sqlalchemy.orm.Mapped[str]
user_roles: sqlalchemy.orm.Mapped[list[str]]
scope: sqlalchemy.orm.Mapped[openpectus.aggregator.models.NotificationScope]
topics: sqlalchemy.orm.Mapped[list[openpectus.aggregator.models.NotificationTopic]]
process_units: sqlalchemy.orm.Mapped[list[str]]
webpush_subscriptions: sqlalchemy.orm.Mapped[list[WebPushSubscription]]
class openpectus.aggregator.data.models.WebPushSubscription

Bases: DBModel

Base model for data entity classes.

__tablename__ = 'WebPushSubscriptions'
user_id: sqlalchemy.orm.Mapped[str]
endpoint: sqlalchemy.orm.Mapped[str]
auth: sqlalchemy.orm.Mapped[str]
p256dh: sqlalchemy.orm.Mapped[str]
webpush_notification_preferences: sqlalchemy.orm.Mapped[WebPushNotificationPreferences]