Learn

regelum is a framework for modeling Phased Reactive Systems. What that means is best understood through a concrete example, working from the top down: starting with a continuously running system, breaking it into something executable, and then descending one level at a time until we reach the primitives the framework actually asks you to write.

The running example: an adaptive-bitrate video player

Most people have watched a video quietly drop from 1080p to 480p when the network slowed down. Let us think about how to model this process.

The feedback loop

A video player never finishes by itself. While you watch the current second of video, the player is also quietly downloading the next few seconds in the background, so that playback does not have to wait for the network on every frame. That backlog of already-downloaded but not yet shown video is what we will call the buffer in this example — measured in seconds of viewable content sitting in memory ahead of the playhead.

If the network is fast, the buffer grows and the player has slack. If the network slows down, the buffer shrinks; if it runs out completely, playback stalls and the user sees a spinner. The player's job is to make sure that does not happen — it watches how much buffer is left, decides whether the current quality is sustainable, lowers the quality if not, plays the next chunk, and then goes back to watching the buffer.

This is a feedback loop: the result of one pass becomes the input of the next, and the system runs forever as long as it is alive.

flowchart LR
    measure --> decide[decide]
    decide -->|healthy| play
    decide -->|stalling| drop_quality
    drop_quality --> play
    play -->|next pass| measure

    classDef measure fill:#2f6fed22,stroke:#2f6fed;
    classDef decide fill:#7c3aed22,stroke:#7c3aed;
    classDef dropQuality fill:#d9770622,stroke:#d97706;
    classDef play fill:#15803d22,stroke:#15803d;

    class measure measure;
    class decide decide;
    class drop_quality dropQuality;
    class play play;

Breaking the cycle into ticks

A diagram with a back-edge tells us what the system is, but not how to execute it. To run it, we cut the loop into a unit of work that has a clear start and a clear end. We declare an initial point — where every pass enters the graph — and a terminator — where every pass leaves it:

flowchart LR
    init([init]) --> measure
    measure --> decide[decide]
    decide -->|healthy| play
    decide -->|stalling| drop_quality
    drop_quality --> play
    play --> done([⊥])

    classDef measure fill:#2f6fed22,stroke:#2f6fed;
    classDef decide fill:#7c3aed22,stroke:#7c3aed;
    classDef dropQuality fill:#d9770622,stroke:#d97706;
    classDef play fill:#15803d22,stroke:#15803d;

    class measure measure;
    class decide decide;
    class drop_quality dropQuality;
    class play play;

One such pass through the graph, from the initial point to ⊥, is a tick. A tick is the unit of execution.

⊥ is not the end of the system. It is the end of one tick. The original feedback loop is recovered by running tick after tick, with the state values from the previous tick carried over into the next. The cycle now lives outside the graph, between successive ticks, instead of being drawn as an explicit edge.
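The idea of cutting the loop into ticks can be sketched in plain Python, without the framework. Everything named below (`PlayerState`, `tick`, `run`, the 4-second horizon, the ladder values) is an illustrative assumption and not regelum's API; the sketch only shows that one tick is a finite function call and that the feedback comes from passing the returned state into the next call.

```python
from dataclasses import dataclass, replace

LADDER_KBPS = (240, 480, 720, 1080, 2160)  # hypothetical bitrate ladder


@dataclass(frozen=True)
class PlayerState:
    """Values that survive from one tick to the next."""

    buffer_seconds: float = 10.0
    bitrate_kbps: int = 2160


def tick(state: PlayerState, bandwidth_kbps: float) -> tuple[PlayerState, list[str]]:
    """One pass from the initial point to the terminator."""
    path = ["measure", "decide"]
    # decide: will the buffer survive the next 4 seconds at this drain rate?
    drain = max(0.0, 1.0 - bandwidth_kbps / state.bitrate_kbps)
    stalling = drain > 0.0 and state.buffer_seconds / drain < 4.0
    if stalling:
        # drop_quality: step one rung down the ladder, never below the lowest
        index = LADDER_KBPS.index(state.bitrate_kbps)
        state = replace(state, bitrate_kbps=LADDER_KBPS[max(0, index - 1)])
        path.append("drop_quality")
    # play: one second of downloading adds bandwidth / bitrate seconds of video,
    # and one second of playback removes one second from the buffer
    fetched = bandwidth_kbps / state.bitrate_kbps
    state = replace(state, buffer_seconds=max(0.0, state.buffer_seconds + fetched - 1.0))
    path.append("play")
    return state, path


def run(bandwidths: list[float]) -> tuple[PlayerState, list[list[str]]]:
    """The feedback loop lives here: each tick starts from the previous tick's state."""
    state = PlayerState()
    paths = []
    for bandwidth in bandwidths:
        state, path = tick(state, bandwidth)
        paths.append(path)
    return state, paths
```

Calling `run([2400.0, 600.0, 600.0])` executes three ticks; no edge inside `tick` points backwards, yet the buffer level computed in one call influences the branch taken in the next.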

The high-level graph: phases and transitions

The graph above is still high-level: it tells us the shape of one tick, not yet what each box actually does.

The boxes (measure, decide, drop_quality, play) are called phases. A phase is a labelled stage of one tick.

Each arrow leaving a phase carries a predicate that is evaluated at runtime to decide whether that arrow fires:

  • if a phase has one outgoing arrow, its predicate is trivially true — the arrow always fires.
  • if a phase has several outgoing arrows, the predicates must be mutually exclusive and together cover every case, so that exactly one fires per tick.

In other words, every arrow is conditional; an unconditional transition is just the special case where the condition is True. This unifies sequential flow and branching under a single rule: at the end of each phase, evaluate the predicates and follow the one that matches.

In the player, decide has two outgoing arrows:

  • If(stalling) → drop_quality
  • Else → play

while measure, drop_quality, and play each have a single arrow that always fires.
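The rule "evaluate the predicates and follow the one that matches" can be sketched as a small lookup in plain Python. The `TRANSITIONS` table, `next_phase`, and `walk` are hypothetical names used only for illustration; this is not how regelum represents or evaluates `If`, `Else`, and `Goto`, and the state is held fixed during the walk to keep the sketch short.

```python
from typing import Callable, Mapping

State = Mapping[str, object]
# A transition is a (predicate, target phase) pair; the names are illustrative.
Transition = tuple[Callable[[State], bool], str]

TRANSITIONS: dict[str, list[Transition]] = {
    # A single outgoing arrow is just a predicate that is always true.
    "measure": [(lambda s: True, "decide")],
    "decide": [
        (lambda s: bool(s["stalling"]), "drop_quality"),
        (lambda s: not s["stalling"], "play"),
    ],
    "drop_quality": [(lambda s: True, "play")],
    "play": [(lambda s: True, "terminate")],
}


def next_phase(phase: str, state: State) -> str:
    """Evaluate the outgoing predicates of `phase` and follow the one that holds."""
    fired = [target for predicate, target in TRANSITIONS[phase] if predicate(state)]
    if len(fired) != 1:
        raise RuntimeError(f"{phase}: expected exactly one transition, got {fired}")
    return fired[0]


def walk(state: State, start: str = "measure") -> list[str]:
    """Follow transitions from the initial phase until the terminator."""
    path = [start]
    while (phase := next_phase(path[-1], state)) != "terminate":
        path.append(phase)
    return path
```

With `{"stalling": True}` the walk takes the long branch through `drop_quality`; with `{"stalling": False}` it goes straight from `decide` to `play`. The check in `next_phase` is where overlapping or missing predicates would surface.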

Phases up close: nodes

So far we have only described the shape of one tick: which phases exist, which arrows connect them, which predicates gate the arrows. That is enough to draw a diagram, but not enough to actually execute anything. To execute the system we need to make the high-level boxes concrete: what variables exist, who reads them, who writes them, and what computation happens inside each phase.

A phase is a high-level story. It can be arbitrarily complex, and it is built out of smaller primitives called nodes. A node is an atomic unit of computation.

Every node has two kinds of variables:

  • inputs — variables the node reads. An input is always the output of some other node (or the node's own output from a previous tick — that is how persistent state and feedback are expressed).
  • outputs — variables the node writes. Each output is owned by exactly one node, so there is never any ambiguity about who produced a given value.

Alongside inputs and outputs, a node defines a run method. This method is the node's basic computation: it receives the node's inputs as arguments and returns the node's outputs. In other words, inputs and outputs describe the data boundary, while run contains the logic that transforms the current input values into the next output values.

Inside a phase, several nodes can be active. They are scheduled in a topological order derived from their input/output dependencies, so that a node reading a value written in the same phase runs after the node that writes it.
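As a rough illustration of dependency-ordered scheduling, the sketch below orders three nodes of the video player's play phase (introduced in the table that follows) with the standard library's `graphlib`. The `PLAY_PHASE_READS` mapping and the `schedule` helper are assumptions made for this example; regelum's own scheduler may be implemented differently.

```python
from graphlib import TopologicalSorter

# node -> nodes whose output it reads *within the same phase*.
# Reads of a value produced in an earlier phase or in the previous tick
# (for example a node reading its own last output) add no ordering constraint.
PLAY_PHASE_READS = {
    "Decoder": set(),
    "MediaSession": {"Decoder"},
    "Logger": {"MediaSession"},
}


def schedule(reads: dict[str, set[str]]) -> list[str]:
    """Return an execution order in which every writer runs before its readers."""
    return list(TopologicalSorter(reads).static_order())
```

Here `schedule(PLAY_PHASE_READS)` yields `Decoder`, then `MediaSession`, then `Logger`. A dependency cycle inside one phase has no valid order; `TopologicalSorter` reports that case by raising `graphlib.CycleError`.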

For the video player, here is how the high-level phases decompose into nodes:

| Phase | Nodes | What happens |
|---|---|---|
| measure | Network | Sample the current bandwidth for the current clock tick. |
| decide | QualityPolicy | Compare projected drain rate against the buffer; set stalling. |
| drop_quality | BitrateController | Drop the target bitrate by one rung. |
| play | Decoder, MediaSession, Logger | Compute downloaded seconds, integrate the buffer, log. |

The same system at node level looks like this. Solid arrows show that one node reads another node's output; dashed arrows from a state circle show self-reads, where a node reads its own output from the previous tick. The node colors match the phase colors used in the diagrams above:

flowchart LR
    network["Network"]
    policy["QualityPolicy"]
    controller["BitrateController"]
    decoder["Decoder"]
    session["MediaSession"]
    logger["Logger"]
    controller_state(("state"))
    session_state(("state"))
    logger_state(("state"))

    network --> policy
    network --> decoder
    network --> logger
    controller --> policy
    controller --> decoder
    controller --> logger
    decoder --> session
    session --> logger
    session --> policy
    policy --> logger
    controller_state -.-> controller
    session_state -.-> session
    logger_state -.-> logger

    classDef measure fill:#2f6fed22,stroke:#2f6fed;
    classDef decide fill:#7c3aed22,stroke:#7c3aed;
    classDef dropQuality fill:#d9770622,stroke:#d97706;
    classDef play fill:#15803d22,stroke:#15803d;
    classDef state fill:#94a3b822,stroke:#94a3b8,stroke-dasharray:3 3;

    class network measure;
    class policy decide;
    class controller dropQuality;
    class decoder,session,logger play;
    class controller_state,session_state,logger_state state;

This is where the framework's actual work happens: writing node classes, declaring their inputs and outputs, assigning instances to phases, and attaching predicates to transitions.

Full code listing: examples/video_player.py
"""Adaptive-bitrate video player as a feedback control loop.

Each tick the player asks one question: do I have enough buffered video to
keep playing at the current quality?  If yes, the short branch just plays the
next chunk.  If no, the long branch first lowers the target bitrate, then
plays.  Buffer and bitrate persist across ticks, so the branching pattern is
driven by the closed loop between the network, the buffer, and the policy.

Phase graph::

    measure (init) -> decide -+-[healthy]--> play -> bottom
                              |
                              +-[stalling]-> drop_quality -> play -> bottom
"""

from __future__ import annotations

from regelum import (
    Clock,
    Else,
    Goto,
    If,
    Input,
    Node,
    NodeInputs,
    NodeOutputs,
    Output,
    Phase,
    PhasedReactiveSystem,
    V,
    terminate,
)

TICK_DT_SECONDS = 1.0
BITRATE_LADDER_KBPS = (240, 480, 720, 1080, 2160)
TOP_BITRATE_KBPS = BITRATE_LADDER_KBPS[-1]
STALL_HORIZON_SECONDS = 4.0


class Network(Node):
    """Stochastic-looking but deterministic bandwidth model.

    Drops to a slow link in the middle of the run so the policy has to react,
    then recovers so the buffer can refill.  The schedule is fixed to keep the
    example reproducible.
    """

    class Inputs(NodeInputs):
        tick: int = Input(source=Clock.tick)

    class Outputs(NodeOutputs):
        bandwidth_kbps: float = Output(initial=float(TOP_BITRATE_KBPS))

    def run(self, inputs: Inputs) -> Outputs:
        if inputs.tick < 6:
            value = 2400.0
        elif inputs.tick < 14:
            value = 600.0
        elif inputs.tick < 22:
            value = 1100.0
        else:
            value = 2400.0
        return self.Outputs(bandwidth_kbps=value)


class QualityPolicy(Node):
    """Decides whether the player is about to stall.

    The estimated drain rate is ``1 - bandwidth / bitrate`` seconds of video
    lost per wall-second.  If buffered seconds will not survive
    ``STALL_HORIZON_SECONDS`` at that drain rate, mark the tick as stalling.
    """

    class Inputs(NodeInputs):
        buffer_seconds: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)

    class Outputs(NodeOutputs):
        stalling: bool = Output(initial=False)

    def run(self, inputs: Inputs) -> Outputs:
        bitrate = max(inputs.bitrate_kbps, 1)
        drain = max(0.0, 1.0 - inputs.bandwidth_kbps / bitrate)
        if drain <= 0.0:
            return self.Outputs(stalling=False)
        time_to_empty = inputs.buffer_seconds / drain
        return self.Outputs(stalling=time_to_empty < STALL_HORIZON_SECONDS)


class BitrateController(Node):
    """Owns the current target bitrate.  Drops one rung when invoked."""

    class Inputs(NodeInputs):
        current: int = Input(source=lambda: BitrateController.Outputs.value)

    class Outputs(NodeOutputs):
        value: int = Output(initial=TOP_BITRATE_KBPS)

    def run(self, inputs: Inputs) -> Outputs:
        try:
            index = BITRATE_LADDER_KBPS.index(inputs.current)
        except ValueError:
            index = len(BITRATE_LADDER_KBPS) - 1
        next_index = max(0, index - 1)
        return self.Outputs(value=BITRATE_LADDER_KBPS[next_index])


class Decoder(Node):
    """Models how many seconds of video can be downloaded in one tick.

    With ``bandwidth_kbps`` of throughput and a video encoded at
    ``bitrate_kbps``, one wall-second of downloading produces
    ``bandwidth / bitrate`` seconds of playable content.
    """

    class Inputs(NodeInputs):
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)

    class Outputs(NodeOutputs):
        fetched_seconds: float

    def run(self, inputs: Inputs) -> Outputs:
        bitrate = max(inputs.bitrate_kbps, 1)
        return self.Outputs(fetched_seconds=inputs.bandwidth_kbps / bitrate * TICK_DT_SECONDS)


class MediaSession(Node):
    """The plant.  Buffer fills with newly fetched video, drains with playback."""

    class Inputs(NodeInputs):
        previous: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        fetched: float = Input(source=Decoder.Outputs.fetched_seconds)

    class Outputs(NodeOutputs):
        buffer_seconds: float = Output(initial=10.0)

    def run(self, inputs: Inputs) -> Outputs:
        next_buffer = inputs.previous + inputs.fetched - TICK_DT_SECONDS
        return self.Outputs(buffer_seconds=max(0.0, next_buffer))


class Logger(Node):
    """Appends a per-tick record so the trajectory is visible after the run."""

    Sample = tuple[int, float, int, float, bool]

    class Inputs(NodeInputs):
        tick: int = Input(source=Clock.tick)
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)
        buffer_seconds: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        stalling: bool = Input(source=QualityPolicy.Outputs.stalling)
        history: list["Logger.Sample"] = Input(source=lambda: Logger.Outputs.history)

    class Outputs(NodeOutputs):
        history: list["Logger.Sample"] = Output(initial=lambda: [])

    def run(self, inputs: Inputs) -> Outputs:
        record: Logger.Sample = (
            inputs.tick,
            inputs.bandwidth_kbps,
            inputs.bitrate_kbps,
            inputs.buffer_seconds,
            inputs.stalling,
        )
        inputs.history.append(record)
        return self.Outputs(history=inputs.history)


def build_system() -> PhasedReactiveSystem:
    network = Network()
    policy = QualityPolicy()
    controller = BitrateController()
    decoder = Decoder()
    session = MediaSession()
    logger = Logger()

    return PhasedReactiveSystem(
        phases=[
            Phase(
                "measure",
                nodes=(network,),
                transitions=(Goto("decide"),),
                is_initial=True,
            ),
            Phase(
                "decide",
                nodes=(policy,),
                transitions=(
                    If(V(policy.Outputs.stalling), "drop_quality", name="stalling"),
                    Else("play", name="healthy"),
                ),
            ),
            Phase(
                "drop_quality",
                nodes=(controller,),
                transitions=(Goto("play"),),
            ),
            Phase(
                "play",
                nodes=(decoder, session, logger),
                transitions=(Goto(terminate),),
            ),
        ],
    )


def main() -> None:
    system = build_system()
    print(f"compile_ok = {system.compile_report.ok}")
    print(
        "phase schedules: "
        + " | ".join(
            f"{name}={schedule}" for name, schedule in system.compile_report.phase_schedules.items()
        )
    )
    print()
    print("tick | bw(kbps) | bitrate | buffer(s) | stall? | path")
    print("-----+----------+---------+-----------+--------+----------------------")
    for _ in range(30):
        records = system.step()
        path_phases: list[str] = []
        for record in records:
            if not path_phases or path_phases[-1] != record.phase:
                path_phases.append(record.phase)
        path = " -> ".join(path_phases)
        snapshot = system.snapshot()
        print(
            f"{system.read(Clock.tick):4d} | "
            f"{snapshot['Network.bandwidth_kbps']:8.0f} | "
            f"{snapshot['BitrateController.value']:7d} | "
            f"{snapshot['MediaSession.buffer_seconds']:9.2f} | "
            f"{str(snapshot['QualityPolicy.stalling']):>6} | "
            f"{path}"
        )


if __name__ == "__main__":
    main()
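
To make the `QualityPolicy` rule in the listing concrete, here is the arithmetic for one operating point: the 600 kbps slow link from `Network` while the controller is still on the top 2160 kbps rung, with `STALL_HORIZON_SECONDS = 4`. The symbols B (bandwidth), R (bitrate), b (buffered seconds), and H (stall horizon) are shorthand introduced here, not names from the framework.

```latex
\begin{aligned}
\text{drain} &= \max\!\left(0,\; 1 - \frac{B}{R}\right) = 1 - \frac{600}{2160} = \frac{13}{18} \approx 0.72 \\
t_{\text{empty}} &= \frac{b}{\text{drain}}, \qquad \text{stalling} \iff t_{\text{empty}} < H \\
b_{\text{threshold}} &= H \cdot \text{drain} = 4 \cdot \frac{13}{18} \approx 2.89\ \text{s}
\end{aligned}
```

At that operating point the buffer loses about 0.72 seconds per one-second tick, and the policy reports stalling only once fewer than about 2.89 buffered seconds remain. Each drop down the ladder lowers R, which shrinks the drain and therefore the threshold.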

Where to go next

The remaining pages in this section walk through the model from the bottom up — same model, opposite direction:

  1. Nodes — declaring typed inputs, typed outputs, connections, initialization, and a run method.
  2. Phases — assembling node instances into phases, declaring transitions, and reading the compiled schedule.
  3. Continuous dynamics — declaring ODENode state, grouping ODE systems, and understanding how continuous phases update time.
  4. Create, compile, and run — constructing rg.PhasedReactiveSystem, reading the compile report, and executing ticks.