Create, Compile, and Run a Phased Reactive System

Recap

By this point the video player has three layers of declarations:

  • node instances such as Network, QualityPolicy, and MediaSession;
  • phases such as measure, decide, drop_quality, and play;
  • transitions that say how one phase hands control to the next.

The running example: an adaptive-bitrate video player

The player samples network bandwidth, decides whether the current bitrate is sustainable, optionally lowers quality, and then plays the next chunk. The buffer and bitrate persist across ticks, so the next tick starts from the state left by the previous one.

flowchart LR
    init([init]) --> measure
    measure --> decide[decide]
    decide -->|healthy| play
    decide -->|stalling| drop_quality
    drop_quality --> play
    play --> done([⊥])

    classDef measure fill:#2f6fed22,stroke:#2f6fed;
    classDef decide fill:#7c3aed22,stroke:#7c3aed;
    classDef dropQuality fill:#d9770622,stroke:#d97706;
    classDef play fill:#15803d22,stroke:#15803d;

    class measure measure;
    class decide decide;
    class drop_quality dropQuality;
    class play play;

| Phase | Nodes | What happens |
|---|---|---|
| measure | Network | Look up the current bandwidth from a fixed schedule keyed on Clock.tick. |
| decide | QualityPolicy | Compare the projected drain rate against the buffer; set stalling. |
| drop_quality | BitrateController | Drop the target bitrate by one rung. |
| play | Decoder, MediaSession, Logger | Compute downloaded seconds, integrate the buffer, log. |

At node level, the same model looks like this. Solid arrows show that one node reads another node's output; dashed arrows from state show self-reads, where a node reads its own output from the previous tick. The node colors correspond to the phase colors in the phase graph above:

flowchart LR
    network["Network"]
    policy["QualityPolicy"]
    controller["BitrateController"]
    decoder["Decoder"]
    session["MediaSession"]
    logger["Logger"]
    controller_state(("state"))
    session_state(("state"))
    logger_state(("state"))

    network --> policy
    network --> decoder
    network --> logger
    controller --> policy
    controller --> decoder
    controller --> logger
    decoder --> session
    session --> logger
    session --> policy
    policy --> logger
    controller_state -.-> controller
    session_state -.-> session
    logger_state -.-> logger

    classDef measure fill:#2f6fed22,stroke:#2f6fed;
    classDef decide fill:#7c3aed22,stroke:#7c3aed;
    classDef dropQuality fill:#d9770622,stroke:#d97706;
    classDef play fill:#15803d22,stroke:#15803d;
    classDef state fill:#94a3b822,stroke:#94a3b8,stroke-dasharray:3 3;

    class network measure;
    class policy decide;
    class controller dropQuality;
    class decoder,session,logger play;
    class controller_state,session_state,logger_state state;
Full code listing: examples/video_player.py
"""Adaptive-bitrate video player as a feedback control loop.

Each tick the player asks one question: do I have enough buffered video to
keep playing at the current quality?  If yes, the short branch just plays the
next chunk.  If no, the long branch first lowers the target bitrate, then
plays.  Buffer and bitrate persist across ticks, so the branching pattern is
driven by the closed loop between the network, the buffer, and the policy.

Phase graph::

    measure (init) -> decide -+-[healthy]--> play -> bottom
                              |
                              +-[stalling]-> drop_quality -> play -> bottom
"""

from __future__ import annotations

from regelum import (
    Clock,
    Else,
    Goto,
    If,
    Input,
    Node,
    NodeInputs,
    NodeOutputs,
    Output,
    Phase,
    PhasedReactiveSystem,
    V,
    terminate,
)

TICK_DT_SECONDS = 1.0
BITRATE_LADDER_KBPS = (240, 480, 720, 1080, 2160)
TOP_BITRATE_KBPS = BITRATE_LADDER_KBPS[-1]
STALL_HORIZON_SECONDS = 4.0


class Network(Node):
    """Stochastic-looking but deterministic bandwidth model.

    Drops to a slow link in the middle of the run so the policy has to react,
    then recovers so the buffer can refill.  The schedule is fixed to keep the
    example reproducible.
    """

    class Inputs(NodeInputs):
        tick: int = Input(source=Clock.tick)

    class Outputs(NodeOutputs):
        bandwidth_kbps: float = Output(initial=float(TOP_BITRATE_KBPS))

    def run(self, inputs: Inputs) -> Outputs:
        if inputs.tick < 6:
            value = 2400.0
        elif inputs.tick < 14:
            value = 600.0
        elif inputs.tick < 22:
            value = 1100.0
        else:
            value = 2400.0
        return self.Outputs(bandwidth_kbps=value)


class QualityPolicy(Node):
    """Decides whether the player is about to stall.

    The estimated drain rate is ``1 - bandwidth / bitrate`` seconds of video
    lost per wall-second.  If buffered seconds will not survive
    ``STALL_HORIZON_SECONDS`` at that drain rate, mark the tick as stalling.
    """

    class Inputs(NodeInputs):
        buffer_seconds: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)

    class Outputs(NodeOutputs):
        stalling: bool = Output(initial=False)

    def run(self, inputs: Inputs) -> Outputs:
        bitrate = max(inputs.bitrate_kbps, 1)
        drain = max(0.0, 1.0 - inputs.bandwidth_kbps / bitrate)
        if drain <= 0.0:
            return self.Outputs(stalling=False)
        time_to_empty = inputs.buffer_seconds / drain
        return self.Outputs(stalling=time_to_empty < STALL_HORIZON_SECONDS)


class BitrateController(Node):
    """Owns the current target bitrate.  Drops one rung when invoked."""

    class Inputs(NodeInputs):
        current: int = Input(source=lambda: BitrateController.Outputs.value)

    class Outputs(NodeOutputs):
        value: int = Output(initial=TOP_BITRATE_KBPS)

    def run(self, inputs: Inputs) -> Outputs:
        try:
            index = BITRATE_LADDER_KBPS.index(inputs.current)
        except ValueError:
            index = len(BITRATE_LADDER_KBPS) - 1
        next_index = max(0, index - 1)
        return self.Outputs(value=BITRATE_LADDER_KBPS[next_index])


class Decoder(Node):
    """Models how many seconds of video can be downloaded in one tick.

    With ``bandwidth_kbps`` of throughput and a video encoded at
    ``bitrate_kbps``, one wall-second of downloading produces
    ``bandwidth / bitrate`` seconds of playable content.
    """

    class Inputs(NodeInputs):
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)

    class Outputs(NodeOutputs):
        fetched_seconds: float

    def run(self, inputs: Inputs) -> Outputs:
        bitrate = max(inputs.bitrate_kbps, 1)
        return self.Outputs(fetched_seconds=inputs.bandwidth_kbps / bitrate * TICK_DT_SECONDS)


class MediaSession(Node):
    """The plant.  Buffer fills with newly fetched video, drains with playback."""

    class Inputs(NodeInputs):
        previous: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        fetched: float = Input(source=Decoder.Outputs.fetched_seconds)

    class Outputs(NodeOutputs):
        buffer_seconds: float = Output(initial=10.0)

    def run(self, inputs: Inputs) -> Outputs:
        next_buffer = inputs.previous + inputs.fetched - TICK_DT_SECONDS
        return self.Outputs(buffer_seconds=max(0.0, next_buffer))


class Logger(Node):
    """Appends a per-tick record so the trajectory is visible after the run."""

    Sample = tuple[int, float, int, float, bool]

    class Inputs(NodeInputs):
        tick: int = Input(source=Clock.tick)
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)
        buffer_seconds: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        stalling: bool = Input(source=QualityPolicy.Outputs.stalling)
        history: list["Logger.Sample"] = Input(source=lambda: Logger.Outputs.history)

    class Outputs(NodeOutputs):
        history: list["Logger.Sample"] = Output(initial=lambda: [])

    def run(self, inputs: Inputs) -> Outputs:
        record: Logger.Sample = (
            inputs.tick,
            inputs.bandwidth_kbps,
            inputs.bitrate_kbps,
            inputs.buffer_seconds,
            inputs.stalling,
        )
        inputs.history.append(record)
        return self.Outputs(history=inputs.history)


def build_system() -> PhasedReactiveSystem:
    network = Network()
    policy = QualityPolicy()
    controller = BitrateController()
    decoder = Decoder()
    session = MediaSession()
    logger = Logger()

    return PhasedReactiveSystem(
        phases=[
            Phase(
                "measure",
                nodes=(network,),
                transitions=(Goto("decide"),),
                is_initial=True,
            ),
            Phase(
                "decide",
                nodes=(policy,),
                transitions=(
                    If(V(policy.Outputs.stalling), "drop_quality", name="stalling"),
                    Else("play", name="healthy"),
                ),
            ),
            Phase(
                "drop_quality",
                nodes=(controller,),
                transitions=(Goto("play"),),
            ),
            Phase(
                "play",
                nodes=(decoder, session, logger),
                transitions=(Goto(terminate),),
            ),
        ],
    )


def main() -> None:
    system = build_system()
    print(f"compile_ok = {system.compile_report.ok}")
    print(
        "phase schedules: "
        + " | ".join(
            f"{name}={schedule}" for name, schedule in system.compile_report.phase_schedules.items()
        )
    )
    print()
    print("tick | bw(kbps) | bitrate | buffer(s) | stall? | path")
    print("-----+----------+---------+-----------+--------+----------------------")
    for _ in range(30):
        records = system.step()
        path_phases: list[str] = []
        for record in records:
            if not path_phases or path_phases[-1] != record.phase:
                path_phases.append(record.phase)
        path = " -> ".join(path_phases)
        snapshot = system.snapshot()
        print(
            f"{system.read(Clock.tick):4d} | "
            f"{snapshot['Network.bandwidth_kbps']:8.0f} | "
            f"{snapshot['BitrateController.value']:7d} | "
            f"{snapshot['MediaSession.buffer_seconds']:9.2f} | "
            f"{str(snapshot['QualityPolicy.stalling']):>6} | "
            f"{path}"
        )


if __name__ == "__main__":
    main()

Those declarations are still just Python objects. To make them executable, wrap the phase list in rg.PhasedReactiveSystem. System creation compiles the declarations into a concrete runtime model: references are resolved, nodes are scheduled, transition targets are checked, and the initial-state requirements are computed.

import regelum as rg


def build_system() -> rg.PhasedReactiveSystem:
    network = Network()
    policy = QualityPolicy()
    controller = BitrateController()
    decoder = Decoder()
    session = MediaSession()
    logger = Logger()

    return rg.PhasedReactiveSystem(
        phases=[
            rg.Phase(
                "measure",
                nodes=(network,),
                transitions=(rg.Goto("decide"),),
                is_initial=True,
            ),
            rg.Phase(
                "decide",
                nodes=(policy,),
                transitions=(
                    rg.If(
                        rg.V(policy.Outputs.stalling),
                        "drop_quality",
                        name="stalling",
                    ),
                    rg.Else("play", name="healthy"),
                ),
            ),
            rg.Phase(
                "drop_quality",
                nodes=(controller,),
                transitions=(rg.Goto("play"),),
            ),
            rg.Phase(
                "play",
                nodes=(decoder, session, logger),
                transitions=(rg.Goto(rg.terminate),),
            ),
        ],
    )

rg.PhasedReactiveSystem is the object you keep and run. It owns the compiled phase graph, the current runtime state, and the compile report.

Creating a system

The constructor receives the phases that define the system.

system = rg.PhasedReactiveSystem(phases=phases)

During construction, regelum compiles the system. If compilation succeeds, the returned object is ready for step(), run(), snapshot(), read(...), and reset(...). If compilation fails, the default behavior is to raise CompileError.

Use strict=False when you want to inspect a broken system instead of raising immediately:

system = rg.PhasedReactiveSystem(
    phases=phases,
    strict=False,
)

print(system.compile_report.issues)

What compilation resolves

Compilation resolves:

  • node names;
  • input sources;
  • instance connections;
  • output paths;
  • phase targets;
  • guard references;
  • phase schedules;
  • dependency edges;
  • required initial outputs.

For the video player, the report's phase_schedules shows the topologically ordered nodes for each phase, and minimal_initial_outputs lists the five outputs that need initial state (Network.bandwidth_kbps, BitrateController.value, QualityPolicy.stalling, MediaSession.buffer_seconds, Logger.history).

Compile report

Every rg.PhasedReactiveSystem stores a compile_report. Read it before debugging runtime behavior; it tells you what the constructor resolved and what it rejected.

system = build_system()
report = system.compile_report

print(report.ok)
print(report.issues)
print(report.warnings)
print(report.phase_schedules)
print(report.minimal_initial_outputs)
print(report.required_initial_outputs)

minimal_initial_outputs is the smallest set of outputs that must have a tick-zero value for this compiled graph. Those values may come from rg.Output(initial=...), from a callable initializer, or from a runtime initial_state override.

required_initial_outputs is the subset that still has no declared initial value. If it is non-empty, the system is telling you exactly which outputs must be provided before execution. You can use that list to build an initial_state mapping:

missing = system.compile_report.required_initial_outputs
print(missing)

system.reset(
    initial_state={
        MediaSession.Outputs.buffer_seconds: 5.0,
        BitrateController.Outputs.value: 720,
    }
)

In the video player, required_initial_outputs is empty because every required tick-zero output has either a static initial value or a callable initializer.
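The relationship between the two report fields can be pictured as a set difference. The sketch below is plain Python and does not call regelum; the helper names `still_required` and `uncovered` are hypothetical, and output paths are written as strings purely for illustration.

```python
# Illustrative model only: output paths as strings, report fields as sets.
MINIMAL = {
    "Network.bandwidth_kbps",
    "BitrateController.value",
    "QualityPolicy.stalling",
    "MediaSession.buffer_seconds",
    "Logger.history",
}


def still_required(minimal, declared):
    """Outputs that need a tick-zero value but declare no initial value."""
    return sorted(set(minimal) - set(declared))


def uncovered(missing, initial_state):
    """Entries of `missing` that an initial_state-style mapping fails to cover."""
    return [path for path in missing if path not in initial_state]


# In the listing, every output in the minimal set declares an initial value.
missing_none = still_required(MINIMAL, declared=MINIMAL)

# Hypothetical variant: MediaSession.buffer_seconds loses its initial value.
declared_variant = MINIMAL - {"MediaSession.buffer_seconds"}
missing_one = still_required(MINIMAL, declared=declared_variant)

# A mapping built from the missing list covers the gap.
initial_state = {"MediaSession.buffer_seconds": 5.0}
left_over = uncovered(missing_one, initial_state)

print(missing_none, missing_one, left_over)
```

Decoder.fetched_seconds never appears here: it has no initial value in the listing, but it is not in the minimal set, so nothing is required for it.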

Use format() for a compact text report.

print(system.compile_report.format())

Common compile issues

Typical issues include:

  • input source is not connected;
  • input source is unknown;
  • class-level reference is ambiguous;
  • output path is duplicated;
  • explicit node names are duplicated;
  • output without initial value is read too early;
  • phase graph is incomplete;
  • transition target is unknown;
  • transition chain is malformed.
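To make two of these issues concrete, here is a minimal standalone sketch of how duplicated names and unknown transition targets could be detected, with a strict flag that mirrors the raise-or-inspect choice described earlier. It is not regelum's compiler: `find_issues`, `compile_or_raise`, the `TERMINATE` sentinel, and the use of ValueError are all assumptions made for illustration.

```python
from collections import Counter

TERMINATE = "terminate"  # stand-in sentinel for the end of a tick


def find_issues(phases):
    """phases: list of (phase_name, [target_names]). Returns issue strings."""
    issues = []
    counts = Counter(name for name, _ in phases)
    for name, count in counts.items():
        if count > 1:
            issues.append(f"phase name {name!r} is duplicated")
    known = set(counts) | {TERMINATE}
    for name, targets in phases:
        for target in targets:
            if target not in known:
                issues.append(f"phase {name!r}: transition target {target!r} is unknown")
    return issues


def compile_or_raise(phases, strict=True):
    """Raise on the first broken graph when strict, else hand back the issues."""
    issues = find_issues(phases)
    if issues and strict:
        raise ValueError("; ".join(issues))
    return issues


good = [
    ("measure", ["decide"]),
    ("decide", ["drop_quality", "play"]),
    ("drop_quality", ["play"]),
    ("play", [TERMINATE]),
]
# Same graph with a misspelled target in the decide phase.
broken = [("decide", ["drop_qualty", "play"]) if name == "decide" else (name, targets)
          for name, targets in good]

good_issues = compile_or_raise(good)
broken_issues = compile_or_raise(broken, strict=False)
print(good_issues, broken_issues)
```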

C1 and C3 checks

Compilation rejects cyclic dependency graphs inside a phase. This is the C1 check. For the video player, the only non-trivial intra-phase dependency chain is Decoder → MediaSession → Logger in play, which is acyclic.
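An acyclicity check of this kind can be sketched with the standard library's graphlib. This is an illustration of the idea, not regelum's scheduler: `schedule_phase` is a hypothetical helper, and the dependency mapping is written by hand from the play phase of the listing.

```python
from graphlib import CycleError, TopologicalSorter


def schedule_phase(deps):
    """Return a run order for one phase, or raise ValueError naming the cycle.

    deps maps each node name to the set of same-phase nodes it reads from.
    """
    try:
        return list(TopologicalSorter(deps).static_order())
    except CycleError as exc:
        cycle = " -> ".join(exc.args[1])
        raise ValueError(f"cyclic intra-phase dependency: {cycle}") from exc


# play phase: Decoder feeds MediaSession, which feeds Logger.
# Self-reads of the previous tick are not edges, so they are left out.
play_deps = {
    "Decoder": set(),
    "MediaSession": {"Decoder"},
    "Logger": {"MediaSession"},
}
play_order = schedule_phase(play_deps)
print(play_order)

# Hypothetical broken phase: two nodes read each other within the same phase.
try:
    schedule_phase({"A": {"B"}, "B": {"A"}})
    cycle_message = None
except ValueError as exc:
    cycle_message = str(exc)
print(cycle_message)
```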

For finite output domains, compilation also checks C3 by requiring exactly one enabled transition per sampled state. The branching in decide is If(V(QualityPolicy.Outputs.stalling), "drop_quality") plus Else("play"), with stalling: bool. A boolean has a finite domain, so C3 is verified statically.
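The exactly-one-enabled-transition idea can be sketched as a brute-force enumeration over a finite domain. The code below is a standalone illustration, not the library's check: `check_exactly_one` is a hypothetical helper, guards are plain predicates, and Else is modeled as the negation of the preceding If guard.

```python
from itertools import product


def check_exactly_one(domains, guards):
    """Return (state, enabled_transitions) pairs that violate exactly-one.

    domains: variable name -> iterable of possible values.
    guards: transition name -> predicate over a state dict.
    """
    violations = []
    names = list(domains)
    for values in product(*(domains[name] for name in names)):
        state = dict(zip(names, values))
        enabled = [name for name, guard in guards.items() if guard(state)]
        if len(enabled) != 1:
            violations.append((state, enabled))
    return violations


domains = {"stalling": (False, True)}

# decide phase: an If on stalling plus an Else (modeled as the negated guard).
decide_guards = {
    "stalling": lambda s: s["stalling"],
    "healthy": lambda s: not s["stalling"],
}
decide_violations = check_exactly_one(domains, decide_guards)

# Hypothetical broken phase: no transition covers stalling == False.
partial_guards = {"stalling": lambda s: s["stalling"]}
partial_violations = check_exactly_one(domains, partial_guards)

print(decide_violations, partial_violations)
```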

Runtime

After construction, the same rg.PhasedReactiveSystem object is the runtime handle. Runtime executes the compiled phase schedules and updates system state. It does not reinterpret declarations on every step.

The tick

A tick walks the phase graph from the initial phase until a transition reaches terminate. For the video player, a healthy tick visits measure → decide → play; a stalling tick visits measure → decide → drop_quality → play.
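As a rough picture of that walk, the sketch below follows a transition table from the initial phase until it reaches a terminate sentinel. It is plain Python with hypothetical names (`walk_tick`, `TERMINATE`, the `transitions` dict) and does not use regelum's Phase or transition objects.

```python
TERMINATE = "terminate"  # stand-in sentinel for the end of a tick


def walk_tick(transitions, initial, state):
    """Return the list of phases visited during one tick."""
    path = []
    phase = initial
    while phase != TERMINATE:
        path.append(phase)
        # Each entry picks the next phase from the current state.
        phase = transitions[phase](state)
    return path


transitions = {
    "measure": lambda s: "decide",
    "decide": lambda s: "drop_quality" if s["stalling"] else "play",
    "drop_quality": lambda s: "play",
    "play": lambda s: TERMINATE,
}

healthy_path = walk_tick(transitions, "measure", {"stalling": False})
stalling_path = walk_tick(transitions, "measure", {"stalling": True})
print(" -> ".join(healthy_path))
print(" -> ".join(stalling_path))
```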

The feedback loop closes between ticks: MediaSession.buffer_seconds written in play of tick N is read by QualityPolicy in decide of tick N+1, and that read is what selects the branch.

Every system also has a built-in rg.Clock. Clock.tick is incremented after the whole tick terminates. In a discrete-only system, Clock.time advances with the tick by base_dt. When a tick contains a continuous phase, Clock.time advances immediately after that continuous phase, so later phases in the same tick can observe the new physical time. See Continuous dynamics for the ODE resolution rules.

Step order

One step() starts at the initial phase and follows transitions until the tick terminates.

For each phase, the runtime walks the compiled schedule and, for each active node:

  1. builds the node's input namespace;
  2. calls run;
  3. normalizes the returned outputs;
  4. writes the outputs into state.

Once every node in the phase has run, it chooses the next phase from the transitions.

records = system.step()

Each record contains the phase, node, inputs, and outputs. For a healthy tick on the slow link, the records look like this:

for record in records:
    print(record.phase, record.node, record.outputs)
# measure Network {'bandwidth_kbps': 600.0}
# decide  QualityPolicy {'stalling': False}
# play    Decoder {'fetched_seconds': 0.278}
# play    MediaSession {'buffer_seconds': 9.11}
# play    Logger {'history': [...]}
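The per-phase step order can be sketched as a small loop over a schedule. This is a toy model under stated assumptions, not regelum's runtime: `Record`, `run_phase`, and the tuple-based schedule are hypothetical, nodes are plain functions, "normalizing" is reduced to copying the result into a dict, and state keys follow the "Node.output" form used by snapshot().

```python
from dataclasses import dataclass


@dataclass
class Record:
    phase: str
    node: str
    inputs: dict
    outputs: dict


def run_phase(phase, schedule, state):
    """Run one phase's nodes in schedule order and return their records."""
    records = []
    for name, reads, run in schedule:
        # Build the input namespace from the current state.
        inputs = {local: state[path] for local, path in reads.items()}
        # Call the node and normalize its result to a plain dict.
        outputs = dict(run(**inputs))
        # Write outputs immediately so later nodes in the phase see them.
        state.update({f"{name}.{key}": value for key, value in outputs.items()})
        records.append(Record(phase, name, inputs, outputs))
    return records


def decoder(bandwidth_kbps, bitrate_kbps):
    return {"fetched_seconds": bandwidth_kbps / max(bitrate_kbps, 1)}


def media_session(previous, fetched):
    return {"buffer_seconds": max(0.0, previous + fetched - 1.0)}


play_schedule = [
    ("Decoder",
     {"bandwidth_kbps": "Network.bandwidth_kbps", "bitrate_kbps": "BitrateController.value"},
     decoder),
    ("MediaSession",
     {"previous": "MediaSession.buffer_seconds", "fetched": "Decoder.fetched_seconds"},
     media_session),
]

state = {
    "Network.bandwidth_kbps": 1080.0,  # illustrative value, not from the schedule
    "BitrateController.value": 2160,
    "MediaSession.buffer_seconds": 10.0,
}
records = run_phase("play", play_schedule, state)
for record in records:
    print(record.phase, record.node, record.outputs)
```

Choosing the next phase from the transitions would happen after `run_phase` returns; it is left out here to keep the sketch focused on the node loop.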

Running multiple ticks

Use run(steps=...) to execute several ticks.

system.run(steps=30)

Each tick starts from the initial phase again. State persists across ticks unless reset() is called.
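To see what persistence across ticks means for the player, the closed loop can be replayed in plain Python without the library. The sketch below copies the constants and formulas from the listing and carries buffer and bitrate from one iteration to the next; it assumes the first tick index is 0, which may not match Clock.tick in the real runtime, and `simulate` is a hypothetical helper.

```python
LADDER = (240, 480, 720, 1080, 2160)  # kbps, same ladder as the listing
HORIZON = 4.0                         # STALL_HORIZON_SECONDS
DT = 1.0                              # TICK_DT_SECONDS


def bandwidth(tick):
    """Fixed schedule from the Network node."""
    if tick < 6:
        return 2400.0
    if tick < 14:
        return 600.0
    if tick < 22:
        return 1100.0
    return 2400.0


def simulate(ticks=30, buffer=10.0, bitrate=LADDER[-1]):
    """Replay the loop; buffer and bitrate persist between iterations."""
    trace = []
    for tick in range(ticks):  # assumption: ticks are numbered from 0
        bw = bandwidth(tick)                                   # measure
        drain = max(0.0, 1.0 - bw / bitrate)                   # decide
        stalling = drain > 0.0 and buffer / drain < HORIZON
        if stalling:                                           # drop_quality
            bitrate = LADDER[max(0, LADDER.index(bitrate) - 1)]
        buffer = max(0.0, buffer + bw / bitrate * DT - DT)     # play
        trace.append((tick, bw, bitrate, buffer, stalling))
    return trace


trace = simulate()
for tick, bw, bitrate, buffer, stalling in trace:
    path = "measure -> decide -> " + ("drop_quality -> play" if stalling else "play")
    print(f"{tick:4d} | {bw:8.0f} | {bitrate:7d} | {buffer:9.2f} | {str(stalling):>6} | {path}")
```

In this sketch the buffer drains slowly through the two slower stretches before the policy reacts, which is only possible because each iteration starts from the state the previous one left behind.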

State access

Use snapshot() to inspect current state. It returns user node outputs and committed ODE state values; system clock fields are read explicitly.

snapshot = system.snapshot()
print(snapshot["MediaSession.buffer_seconds"])
print(snapshot["BitrateController.value"])

Use read(...) when code has an output reference.

buffer = system.read(MediaSession.Outputs.buffer_seconds)
tick = system.read(rg.Clock.tick)
now = system.read(rg.Clock.time)

Reset

reset() clears runtime state and history. It then applies declared initial values and optional overrides.

system.reset()
system.reset(initial_state={MediaSession.Outputs.buffer_seconds: 5.0})
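The reset order described above (clear, apply declared initial values, then apply overrides) can be modeled with a toy store. `TinyRuntime` is a hypothetical stand-in, not regelum's class; it assumes callable initializers are re-evaluated on every reset, which is why a list-valued output starts fresh each time.

```python
class TinyRuntime:
    """Toy model of runtime state handling; names and structure are assumptions."""

    def __init__(self, declared):
        self._declared = declared  # path -> value, or zero-argument callable
        self.state = {}
        self.history = []
        self.reset()

    def reset(self, initial_state=None):
        # 1. Clear runtime state and history.
        self.state.clear()
        self.history.clear()
        # 2. Apply declared initial values, calling initializers afresh.
        for path, initial in self._declared.items():
            self.state[path] = initial() if callable(initial) else initial
        # 3. Apply optional overrides last, so they win.
        self.state.update(initial_state or {})


runtime = TinyRuntime({
    "MediaSession.buffer_seconds": 10.0,
    "Logger.history": lambda: [],
})

# Simulate some execution that mutates state and accumulates history.
runtime.state["MediaSession.buffer_seconds"] = 3.2
runtime.state["Logger.history"].append("tick 0")
runtime.history.append("record")

runtime.reset()
after_plain_reset = dict(runtime.state)

runtime.reset(initial_state={"MediaSession.buffer_seconds": 5.0})
after_override = dict(runtime.state)

print(after_plain_reset)
print(after_override)
```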

Logging nodes

A logger is just another node. It sees the values available at the point where its scheduled phase runs. The video player puts Logger last in play so it observes the buffer update from MediaSession and the freshly committed bitrate.

Rules

  • Read the compile report before debugging runtime behavior.
  • Create systems with rg.PhasedReactiveSystem(phases=[...]).
  • Use strict=False for diagnostics.
  • Resolve ambiguous class references with instance connections.
  • Add initial values only for outputs that must exist before execution.
  • Runtime follows compiled phase schedules.
  • State persists between ticks.
  • reset() clears state and history.
  • step() returns execution records.
  • snapshot() returns user-visible state; use read(rg.Clock.time) for clock fields.