Nodes

Recap

In the overview we took the adaptive-bitrate video player and turned it into a phase graph.

The running example: an adaptive-bitrate video player

Most people have watched a video quietly drop from 1080p to 480p when the network slowed down. The player keeps a few seconds of pre-downloaded video in a buffer; when the network slows, it must lower the target quality before the buffer runs out.

flowchart LR
    init([init]) --> measure
    measure --> decide[decide]
    decide -->|healthy| play
    decide -->|stalling| drop_quality
    drop_quality --> play
    play --> done([⊥])

    classDef measure fill:#2f6fed22,stroke:#2f6fed;
    classDef decide fill:#7c3aed22,stroke:#7c3aed;
    classDef dropQuality fill:#d9770622,stroke:#d97706;
    classDef play fill:#15803d22,stroke:#15803d;

    class measure measure;
    class decide decide;
    class drop_quality dropQuality;
    class play play;

One pass through this graph from init to ⊥ is a tick. The graph is high-level; each phase is itself made of smaller pieces called nodes, distributed like this:

Phase        | Nodes                         | What happens
-------------+-------------------------------+----------------------------------------------------------------
measure      | Network                       | Sample the current bandwidth from the system clock.
decide       | QualityPolicy                 | Compare projected drain rate against the buffer; set stalling.
drop_quality | BitrateController             | Drop the target bitrate by one rung.
play         | Decoder, MediaSession, Logger | Compute downloaded seconds, integrate the buffer, log.
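
The decide row can be checked by hand. The following is a plain-Python sketch that does not import regelum; it reuses the drain formula and the STALL_HORIZON_SECONDS constant from the full listing, while the helper names is_stalling and tick_path are made up for illustration.

```python
STALL_HORIZON_SECONDS = 4.0  # same constant as in examples/video_player.py


def is_stalling(buffer_seconds: float, bandwidth_kbps: float, bitrate_kbps: int) -> bool:
    """Mirror of the QualityPolicy arithmetic, outside any framework."""
    bitrate = max(bitrate_kbps, 1)
    # Seconds of buffered video lost per wall-second; never negative.
    drain = max(0.0, 1.0 - bandwidth_kbps / bitrate)
    if drain <= 0.0:
        return False  # the link keeps up, so the buffer is not shrinking
    return buffer_seconds / drain < STALL_HORIZON_SECONDS


def tick_path(stalling: bool) -> list[str]:
    """Phases visited in one tick, following the phase graph above."""
    if stalling:
        return ["measure", "decide", "drop_quality", "play"]
    return ["measure", "decide", "play"]


# 600 kbps link, 2160 kbps target: drain is 1 - 600/2160, roughly 0.72 s per second.
print(tick_path(is_stalling(10.0, 600.0, 2160)))  # 10 / 0.72 is about 13.8 s of runway
print(tick_path(is_stalling(2.0, 600.0, 2160)))   # 2 / 0.72 is about 2.8 s of runway
```

With a full buffer the slow link is tolerated for a while; only when the runway falls under the four-second horizon does the tick take the longer path through drop_quality.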

At node level, the same model looks like this. Solid arrows show that one node reads another node's output; dashed arrows from state show self-reads, where a node reads its own output from the previous tick. The node colors correspond to the phase colors in the table above:

flowchart LR
    network["Network"]
    policy["QualityPolicy"]
    controller["BitrateController"]
    decoder["Decoder"]
    session["MediaSession"]
    logger["Logger"]
    controller_state(("state"))
    session_state(("state"))
    logger_state(("state"))

    network --> policy
    network --> decoder
    network --> logger
    controller --> policy
    controller --> decoder
    controller --> logger
    decoder --> session
    session --> logger
    session --> policy
    policy --> logger
    controller_state -.-> controller
    session_state -.-> session
    logger_state -.-> logger

    classDef measure fill:#2f6fed22,stroke:#2f6fed;
    classDef decide fill:#7c3aed22,stroke:#7c3aed;
    classDef dropQuality fill:#d9770622,stroke:#d97706;
    classDef play fill:#15803d22,stroke:#15803d;
    classDef state fill:#94a3b822,stroke:#94a3b8,stroke-dasharray:3 3;

    class network measure;
    class policy decide;
    class controller dropQuality;
    class decoder,session,logger play;
    class controller_state,session_state,logger_state state;

Full code listing: examples/video_player.py

"""Adaptive-bitrate video player as a feedback control loop.

Each tick the player asks one question: do I have enough buffered video to
keep playing at the current quality?  If yes, the short branch just plays the
next chunk.  If no, the long branch first lowers the target bitrate, then
plays.  Buffer and bitrate persist across ticks, so the branching pattern is
driven by the closed loop between the network, the buffer, and the policy.

Phase graph::

    measure (init) -> decide -+-[healthy]--> play -> bottom
                              |
                              +-[stalling]-> drop_quality -> play -> bottom
"""

from __future__ import annotations

from regelum import (
    Clock,
    Else,
    Goto,
    If,
    Input,
    Node,
    NodeInputs,
    NodeOutputs,
    Output,
    Phase,
    PhasedReactiveSystem,
    V,
    terminate,
)

TICK_DT_SECONDS = 1.0
BITRATE_LADDER_KBPS = (240, 480, 720, 1080, 2160)
TOP_BITRATE_KBPS = BITRATE_LADDER_KBPS[-1]
STALL_HORIZON_SECONDS = 4.0


class Network(Node):
    """Stochastic-looking but deterministic bandwidth model.

    Drops to a slow link in the middle of the run so the policy has to react,
    then recovers so the buffer can refill.  The schedule is fixed to keep the
    example reproducible.
    """

    class Inputs(NodeInputs):
        tick: int = Input(source=Clock.tick)

    class Outputs(NodeOutputs):
        bandwidth_kbps: float = Output(initial=float(TOP_BITRATE_KBPS))

    def run(self, inputs: Inputs) -> Outputs:
        if inputs.tick < 6:
            value = 2400.0
        elif inputs.tick < 14:
            value = 600.0
        elif inputs.tick < 22:
            value = 1100.0
        else:
            value = 2400.0
        return self.Outputs(bandwidth_kbps=value)


class QualityPolicy(Node):
    """Decides whether the player is about to stall.

    The estimated drain rate is ``1 - bandwidth / bitrate`` seconds of video
    lost per wall-second.  If buffered seconds will not survive
    ``STALL_HORIZON_SECONDS`` at that drain rate, mark the tick as stalling.
    """

    class Inputs(NodeInputs):
        buffer_seconds: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)

    class Outputs(NodeOutputs):
        stalling: bool = Output(initial=False)

    def run(self, inputs: Inputs) -> Outputs:
        bitrate = max(inputs.bitrate_kbps, 1)
        drain = max(0.0, 1.0 - inputs.bandwidth_kbps / bitrate)
        if drain <= 0.0:
            return self.Outputs(stalling=False)
        time_to_empty = inputs.buffer_seconds / drain
        return self.Outputs(stalling=time_to_empty < STALL_HORIZON_SECONDS)


class BitrateController(Node):
    """Owns the current target bitrate.  Drops one rung when invoked."""

    class Inputs(NodeInputs):
        current: int = Input(source=lambda: BitrateController.Outputs.value)

    class Outputs(NodeOutputs):
        value: int = Output(initial=TOP_BITRATE_KBPS)

    def run(self, inputs: Inputs) -> Outputs:
        try:
            index = BITRATE_LADDER_KBPS.index(inputs.current)
        except ValueError:
            index = len(BITRATE_LADDER_KBPS) - 1
        next_index = max(0, index - 1)
        return self.Outputs(value=BITRATE_LADDER_KBPS[next_index])


class Decoder(Node):
    """Models how many seconds of video can be downloaded in one tick.

    With ``bandwidth_kbps`` of throughput and a video encoded at
    ``bitrate_kbps``, one wall-second of downloading produces
    ``bandwidth / bitrate`` seconds of playable content.
    """

    class Inputs(NodeInputs):
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)

    class Outputs(NodeOutputs):
        fetched_seconds: float

    def run(self, inputs: Inputs) -> Outputs:
        bitrate = max(inputs.bitrate_kbps, 1)
        return self.Outputs(fetched_seconds=inputs.bandwidth_kbps / bitrate * TICK_DT_SECONDS)


class MediaSession(Node):
    """The plant.  Buffer fills with newly fetched video, drains with playback."""

    class Inputs(NodeInputs):
        previous: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        fetched: float = Input(source=Decoder.Outputs.fetched_seconds)

    class Outputs(NodeOutputs):
        buffer_seconds: float = Output(initial=10.0)

    def run(self, inputs: Inputs) -> Outputs:
        next_buffer = inputs.previous + inputs.fetched - TICK_DT_SECONDS
        return self.Outputs(buffer_seconds=max(0.0, next_buffer))


class Logger(Node):
    """Appends a per-tick record so the trajectory is visible after the run."""

    Sample = tuple[int, float, int, float, bool]

    class Inputs(NodeInputs):
        tick: int = Input(source=Clock.tick)
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)
        buffer_seconds: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        stalling: bool = Input(source=QualityPolicy.Outputs.stalling)
        history: list["Logger.Sample"] = Input(source=lambda: Logger.Outputs.history)

    class Outputs(NodeOutputs):
        history: list["Logger.Sample"] = Output(initial=lambda: [])

    def run(self, inputs: Inputs) -> Outputs:
        record: Logger.Sample = (
            inputs.tick,
            inputs.bandwidth_kbps,
            inputs.bitrate_kbps,
            inputs.buffer_seconds,
            inputs.stalling,
        )
        inputs.history.append(record)
        return self.Outputs(history=inputs.history)


def build_system() -> PhasedReactiveSystem:
    network = Network()
    policy = QualityPolicy()
    controller = BitrateController()
    decoder = Decoder()
    session = MediaSession()
    logger = Logger()

    return PhasedReactiveSystem(
        phases=[
            Phase(
                "measure",
                nodes=(network,),
                transitions=(Goto("decide"),),
                is_initial=True,
            ),
            Phase(
                "decide",
                nodes=(policy,),
                transitions=(
                    If(V(policy.Outputs.stalling), "drop_quality", name="stalling"),
                    Else("play", name="healthy"),
                ),
            ),
            Phase(
                "drop_quality",
                nodes=(controller,),
                transitions=(Goto("play"),),
            ),
            Phase(
                "play",
                nodes=(decoder, session, logger),
                transitions=(Goto(terminate),),
            ),
        ],
    )


def main() -> None:
    system = build_system()
    print(f"compile_ok = {system.compile_report.ok}")
    print(
        "phase schedules: "
        + " | ".join(
            f"{name}={schedule}" for name, schedule in system.compile_report.phase_schedules.items()
        )
    )
    print()
    print("tick | bw(kbps) | bitrate | buffer(s) | stall? | path")
    print("-----+----------+---------+-----------+--------+----------------------")
    for _ in range(30):
        records = system.step()
        path_phases: list[str] = []
        for record in records:
            if not path_phases or path_phases[-1] != record.phase:
                path_phases.append(record.phase)
        path = " -> ".join(path_phases)
        snapshot = system.snapshot()
        print(
            f"{system.read(Clock.tick):4d} | "
            f"{snapshot['Network.bandwidth_kbps']:8.0f} | "
            f"{snapshot['BitrateController.value']:7d} | "
            f"{snapshot['MediaSession.buffer_seconds']:9.2f} | "
            f"{str(snapshot['QualityPolicy.stalling']):>6} | "
            f"{path}"
        )


if __name__ == "__main__":
    main()

This page zooms in on those nodes: what a node is, how it declares its inputs and outputs, how to instantiate it, and how its run method is wired up.

What is a Node?

A node is the atomic unit of computation in regelum: it reads named input variables, writes named output variables, and implements a run method that computes the next output values.

Each node declares its interface in terms of inputs and outputs. Inputs say which values the node reads. Outputs say which values the node writes and, when needed, how those values are initialized before the first tick. The run method is the node's one-step computation: the runtime gives it resolved inputs, and the method must return an Outputs object.

Nodes are classes; phases receive instances

A node in regelum is a Python class that inherits from rg.Node. The class describes the node's API and behavior. Concrete node instances are then placed into phases by listing them in the phase definition.

Nodes may be connected across the overall system in many ways, including through values from previous ticks. However, the nodes passed to a single phase must form a DAG with respect to the read relation: draw an edge Node X --> Node Y when at least one output of Node X is an input of Node Y in that phase. This lets the runtime compile the phase and automatically resolve an execution order.
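
To make the DAG requirement concrete, here is a minimal sketch of how an execution order can be derived from the read relation. It uses Python's standard graphlib module, not regelum's compiler, and the play_reads mapping is hand-written from the node diagram above. Treating self-reads as previous-tick reads that add no edge is an assumption of this sketch.

```python
from graphlib import CycleError, TopologicalSorter


def schedule(reads: dict[str, set[str]]) -> list[str]:
    """Order nodes so that every producer runs before its readers.

    `reads` maps each node to the nodes it reads from within the same phase.
    Raises CycleError when the read relation is not a DAG.
    """
    return list(TopologicalSorter(reads).static_order())


# Same-phase reads inside the play phase (self-reads are left out on purpose).
play_reads = {
    "Decoder": set(),
    "MediaSession": {"Decoder"},
    "Logger": {"MediaSession"},
}
print(schedule(play_reads))  # ['Decoder', 'MediaSession', 'Logger']

# Two nodes that read each other within one phase cannot be ordered.
try:
    schedule({"A": {"B"}, "B": {"A"}})
except CycleError as exc:
    print("not a DAG:", exc.args[0])
```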

A representative node in the player is MediaSession: it owns the buffer level, fills it with newly fetched video, and drains it as playback consumes seconds.

import regelum as rg


class MediaSession(rg.Node):
    """The plant. Buffer fills with newly fetched video, drains with playback."""

    class Inputs(rg.NodeInputs):
        previous: float = rg.Input(
            source=lambda: MediaSession.Outputs.buffer_seconds
        )
        fetched: float = rg.Input(source=Decoder.Outputs.fetched_seconds)

    class Outputs(rg.NodeOutputs):
        buffer_seconds: float = rg.Output(initial=10.0)

    def run(self, inputs: Inputs) -> Outputs:
        next_buffer = inputs.previous + inputs.fetched - TICK_DT_SECONDS
        return self.Outputs(buffer_seconds=max(0.0, next_buffer))


session = MediaSession()

Let us break down what happens here:

  • Inputs declares the variables the node reads. previous reads MediaSession.Outputs.buffer_seconds, so it is a self-read from the previous tick. fetched reads Decoder.Outputs.fetched_seconds, so MediaSession depends on Decoder in the play phase.
  • Outputs declares the variables the node writes. buffer_seconds is initialized with 10.0 because the first tick needs a buffer value before MediaSession has run.
  • run receives the input snapshot and returns an output snapshot: the node computes the next buffer level and writes it as buffer_seconds.
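
The update rule described above can be traced with ordinary functions. This sketch is framework-free: fetched_seconds and next_buffer are illustrative names that copy the arithmetic of Decoder.run and MediaSession.run from the full listing, and the 600 kbps link against a 2160 kbps target is just one sample point from the Network schedule.

```python
TICK_DT_SECONDS = 1.0  # same constant as in examples/video_player.py


def fetched_seconds(bandwidth_kbps: float, bitrate_kbps: int) -> float:
    """Seconds of playable video downloaded during one tick."""
    return bandwidth_kbps / max(bitrate_kbps, 1) * TICK_DT_SECONDS


def next_buffer(previous: float, fetched: float) -> float:
    """Fill with fetched video, drain one tick of playback, clamp at empty."""
    return max(0.0, previous + fetched - TICK_DT_SECONDS)


buffer_seconds = 10.0  # the declared initial value
for tick in range(3):
    buffer_seconds = next_buffer(buffer_seconds, fetched_seconds(600.0, 2160))
    print(tick, round(buffer_seconds, 3))
```

Each tick on the slow link fetches about 0.28 s of video while playback consumes 1 s, so the buffer shrinks by roughly 0.72 s per tick until the policy reacts.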

Node API

A node in regelum is a Python class that inherits from rg.Node. The class declares the node's API: which variables it reads, which variables it writes, and how one execution transforms inputs into outputs.

Every non-trivial node should make two namespaces explicit:

  • Inputs lists the variables the node reads.
  • Outputs lists the variables the node writes.

Those namespaces are ordinary nested Python classes, subclassing rg.NodeInputs and rg.NodeOutputs. This means the interface can be declared before any node instance exists: MediaSession.Outputs.buffer_seconds is a valid output reference at class declaration time, even before session = MediaSession() is constructed.

class MediaSession(rg.Node):
    class Inputs(rg.NodeInputs):
        fetched: float = rg.Input(source=Decoder.Outputs.fetched_seconds)

    class Outputs(rg.NodeOutputs):
        buffer_seconds: float = rg.Output(initial=10.0)

The runtime detects these namespaces by base class, not by name. Inputs and Outputs are the conventional names, and each node may declare at most one input namespace and at most one output namespace.
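
Detection by base class rather than by name can be pictured with a few lines of plain Python. This is only a sketch of the idea, not regelum's implementation: InputsBase, OutputsBase, find_namespace, and the Thermostat class are hypothetical stand-ins.

```python
class InputsBase:
    """Stand-in for an inputs namespace base class (hypothetical)."""


class OutputsBase:
    """Stand-in for an outputs namespace base class (hypothetical)."""


def find_namespace(node_cls: type, base: type):
    """Return the nested class of node_cls that subclasses base, or None."""
    found = [
        member
        for member in vars(node_cls).values()
        if isinstance(member, type) and issubclass(member, base)
    ]
    if len(found) > 1:
        raise TypeError(
            f"{node_cls.__name__} declares {len(found)} {base.__name__} namespaces"
        )
    return found[0] if found else None


class Thermostat:
    class Reads(InputsBase):  # unconventional name, still found by base class
        room_celsius: float

    class Outputs(OutputsBase):
        heater_on: bool


print(find_namespace(Thermostat, InputsBase).__name__)  # Reads
```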

Inputs and output references

An input is connected to the output it reads. The common form is rg.Input(source=...), where source points at an output:

class Network(rg.Node):
    ...


class Decoder(rg.Node):
    class Inputs(rg.NodeInputs):
        bandwidth_kbps: float = rg.Input(source=Network.Outputs.bandwidth_kbps)

Network.Outputs.bandwidth_kbps is a class-level reference. It is concise and works when there is exactly one Network producer in the compiled system. If multiple instances of the same node class exist, use an instance-bound reference such as network.Outputs.bandwidth_kbps, or connect the ports after instantiation.

class Network(rg.Node):
    ...


network_main = Network(name="main")
network_backup = Network(name="backup")


class Decoder(rg.Node):
    class Inputs(rg.NodeInputs):
        bandwidth_kbps: float = rg.Input(
            source=network_main.Outputs.bandwidth_kbps
        )

Here Decoder reads from the concrete network_main instance. The network_backup instance can exist in the same system without making the reference ambiguous.
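
Why a class-level reference stops working with two producers can be shown with a toy resolver. The sketch below is an assumption about the general idea, not regelum's resolution code: NodeInstance and resolve_class_reference are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class NodeInstance:
    cls_name: str  # name of the node class
    name: str      # instance name


def resolve_class_reference(cls_name: str, instances: list[NodeInstance]) -> NodeInstance:
    """Pick the single instance of cls_name, or fail loudly."""
    matches = [inst for inst in instances if inst.cls_name == cls_name]
    if not matches:
        raise LookupError(f"no producer of class {cls_name}")
    if len(matches) > 1:
        names = ", ".join(inst.name for inst in matches)
        raise LookupError(f"{cls_name} is ambiguous, candidates: {names}")
    return matches[0]


single = [NodeInstance("Network", "Network")]
print(resolve_class_reference("Network", single).name)

double = [NodeInstance("Network", "main"), NodeInstance("Network", "backup")]
try:
    resolve_class_reference("Network", double)
except LookupError as exc:
    print(exc)
```

An instance-bound reference sidesteps the lookup entirely, because it already names the producer.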

A bare input annotation is shorthand for an unconnected input:

class Inputs(rg.NodeInputs):
    value: float

This is equivalent to:

class Inputs(rg.NodeInputs):
    value: float = rg.Input()

Unconnected inputs are compile errors unless connected later with post-instantiation connections.

Lazy references

Sometimes the producer cannot be referenced directly at class-body evaluation time. This happens when a node reads its own output, or when a producer class is defined later in the file. Use a zero-argument callable for those cases:

class MediaSession(rg.Node):
    class Inputs(rg.NodeInputs):
        previous: float = rg.Input(
            source=lambda: MediaSession.Outputs.buffer_seconds
        )

The lambda is not a different kind of connection. It is a lazy reference: regelum resolves it during compilation, after Python has finished creating the relevant classes. It also keeps linters and type checkers from complaining about names that are not available yet.

Self-referential inputs are how a node carries state across ticks. Here MediaSession reads the buffer_seconds value it wrote in the previous tick, then writes the next value in run.
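
The need for the lambda comes from ordinary Python name binding, which can be demonstrated without regelum. In the sketch below, Input and Output are tiny hypothetical stand-ins, and resolve is an invented method that plays the role of the deferred lookup.

```python
class Output:
    """Stand-in for an output descriptor (hypothetical)."""

    def __init__(self, label: str) -> None:
        self.label = label


class Input:
    """Stand-in for an input descriptor that accepts a direct or lazy source."""

    def __init__(self, source) -> None:
        self._source = source

    def resolve(self):
        # A zero-argument callable is called late; anything else is used as-is.
        return self._source() if callable(self._source) else self._source


# Eager self-reference: the name Eager is not bound until the class body finishes.
try:
    class Eager:
        class Outputs:
            level = Output("Eager.level")

        class Inputs:
            previous = Input(source=Eager.Outputs.level)
except NameError as exc:
    eager_error = exc
    print("eager reference failed:", exc)


# Lazy self-reference: the lambda body only runs when resolve() is called.
class Lazy:
    class Outputs:
        level = Output("Lazy.level")

    class Inputs:
        previous = Input(source=lambda: Lazy.Outputs.level)


resolved = Lazy.Inputs.previous.resolve()
print(resolved.label)  # Lazy.level
```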

Outputs

Outputs are declared by subclassing rg.NodeOutputs. Each output is owned by exactly one node, so there is never ambiguity about who produced a value. This rules out write/write races by construction.

class Outputs(rg.NodeOutputs):
    buffer_seconds: float = rg.Output(initial=10.0)

A bare output annotation is shorthand for an output without an initial value:

class Outputs(rg.NodeOutputs):
    fetched_seconds: float

Decoder.fetched_seconds can be declared this way because it is produced in play before MediaSession reads it in the same phase. Compilation rejects bare outputs that are read before they can be produced.

Post-instantiation connections

Connections do not have to be fully described inside the node class. You can instantiate nodes first and then link an input to a concrete output with port(...).connect(...). This is useful when identity matters, especially in multi-instance systems.

session_main = MediaSession(name="main")
session_pip = MediaSession(name="pip")
policy = QualityPolicy()

rg.port(policy.Inputs.buffer_seconds).connect(session_main.Outputs.buffer_seconds)

The port(...) wrapper exposes .connect(...) while preserving the normal descriptor syntax for type checkers and readers. Both directions of .connect(...) are accepted as long as one side is an input and the other side is an output:

rg.port(policy.Inputs.buffer_seconds).connect(session_main.Outputs.buffer_seconds)
rg.port(session_main.Outputs.buffer_seconds).connect(policy.Inputs.buffer_seconds)

Input-to-input and output-to-output connections are errors. If a class-level reference becomes ambiguous because several producers match, the fix is to use an instance-bound reference or an explicit post-instantiation connection.
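
The direction rule can be summarized in a small model. This is a sketch under stated assumptions, not the regelum API: Port, connect, and the wiring dictionary are hypothetical, and the only behavior taken from the text is that one side must be an input and the other an output.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Port:
    owner: str  # instance name
    name: str   # variable name
    kind: str   # "input" or "output"


def connect(a: Port, b: Port, wiring: dict[str, str]) -> None:
    """Record input -> output, whichever side the call starts from."""
    if {a.kind, b.kind} != {"input", "output"}:
        raise TypeError(f"cannot connect {a.kind} to {b.kind}")
    inp, out = (a, b) if a.kind == "input" else (b, a)
    wiring[f"{inp.owner}.{inp.name}"] = f"{out.owner}.{out.name}"


policy_in = Port("QualityPolicy", "buffer_seconds", "input")
main_out = Port("main", "buffer_seconds", "output")

forward: dict[str, str] = {}
backward: dict[str, str] = {}
connect(policy_in, main_out, forward)
connect(main_out, policy_in, backward)
print(forward == backward)  # True

try:
    connect(policy_in, policy_in, {})
except TypeError as exc:
    print(exc)
```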

Initial values

System state stores output values. Inputs read either values produced earlier in the same phase or values already present in state. An output needs an initial value when it may be read before the first time its node runs.

class MediaSession(rg.Node):
    class Outputs(rg.NodeOutputs):
        buffer_seconds: float = rg.Output(initial=10.0)

buffer_seconds is read by QualityPolicy in decide before MediaSession writes it in play, so it needs initial=10.0. The same idea applies to persistent outputs such as Network.bandwidth_kbps, BitrateController.value, and QualityPolicy.stalling.

Initial values can be written in three forms. Use a direct value when the value is static:

buffer_seconds: float = rg.Output(initial=10.0)

Use a zero-argument callable for fresh mutable objects. This avoids accidentally sharing one list between systems:

class Logger(rg.Node):
    class Outputs(rg.NodeOutputs):
        history: list["Logger.Sample"] = rg.Output(initial=lambda: [])
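
The sharing hazard is plain Python aliasing and can be reproduced without regelum. In this sketch, make_state is a hypothetical helper that stores exactly what it is given, and the string key follows the snapshot naming seen in the full listing.

```python
def make_state(history: list) -> dict[str, list]:
    """Hypothetical stand-in for building one system's initial state."""
    return {"Logger.history": history}


# A literal list used as the initial value is one object shared by both systems.
literal: list = []
system_a = make_state(literal)
system_b = make_state(literal)
system_a["Logger.history"].append((0, 2400.0))
print(system_b["Logger.history"])  # [(0, 2400.0)]

# A factory is called once per system, so each gets its own list.
factory = lambda: []
system_c = make_state(factory())
system_d = make_state(factory())
system_c["Logger.history"].append((0, 2400.0))
print(system_d["Logger.history"])  # []
```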

Use a one-argument callable when the initial value depends on the node instance. The argument receives the concrete node instance; self is just the recommended spelling:

from typing import cast


class MediaSession(rg.Node):
    def __init__(
        self,
        *,
        initial_buffer: float = 10.0,
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self.initial_buffer = initial_buffer

    class Outputs(rg.NodeOutputs):
        buffer_seconds: float = rg.Output(
            initial=lambda self: cast(MediaSession, self).initial_buffer,
        )


short_start = MediaSession(initial_buffer=2.0)
full_start = MediaSession(initial_buffer=10.0)
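
One way to picture how the three forms can be told apart is to dispatch on the number of parameters. The following sketch uses the standard inspect module; resolve_initial and FakeSession are hypothetical names, and the text does not say that regelum resolves initial values this way.

```python
import inspect
from typing import Any


def resolve_initial(initial: Any, node: Any) -> Any:
    """Turn a declared initial into a concrete value for one node instance."""
    if not callable(initial):
        return initial            # direct value
    arity = len(inspect.signature(initial).parameters)
    if arity == 0:
        return initial()          # fresh object per call
    if arity == 1:
        return initial(node)      # value derived from the instance
    raise TypeError("initial must take zero or one argument")


class FakeSession:
    """Stand-in for a node instance with a constructor parameter."""

    def __init__(self, initial_buffer: float) -> None:
        self.initial_buffer = initial_buffer


node = FakeSession(initial_buffer=2.0)
print(resolve_initial(10.0, node))
print(resolve_initial(lambda: [], node))
print(resolve_initial(lambda self: self.initial_buffer, node))
```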

Overriding initial state for a run

initial_state can override declared initial values for a single run without mutating the node declaration.

system.reset(
    initial_state={
        MediaSession.Outputs.buffer_seconds: 5.0,
        BitrateController.Outputs.value: 720,
    }
)

Once a system is instantiated, compilation has already resolved all output paths and can tell you which outputs define the initial state surface. Use system.compile_report.minimal_initial_outputs to inspect the outputs that actually need initial values for this compiled graph, and system.compile_report.required_initial_outputs to inspect the outputs that must be supplied because no declaration provides an initial value.

system = build_system()
report = system.compile_report

print(report.minimal_initial_outputs)
print(report.required_initial_outputs)

If required_initial_outputs is non-empty, provide those values in initial_state before running or resetting the system. See Compile report for inspecting the compiled initial-state requirements and Reset for the runtime reset behavior.
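
The relationship between declared initial values, per-run overrides, and required values can be modeled with dictionaries. This is a simplified sketch: initial_state_for_run is a hypothetical helper, and it keys values by string paths rather than by output references as the reset call above does.

```python
from typing import Any, Mapping, Optional


def initial_state_for_run(
    declared: Mapping[str, Any],
    needed: set[str],
    overrides: Optional[Mapping[str, Any]] = None,
) -> dict[str, Any]:
    """Merge declared initials with overrides; fail if a needed output has neither."""
    state = {path: declared[path] for path in needed if path in declared}
    state.update(overrides or {})  # overrides win, declarations stay untouched
    missing = sorted(needed - set(state))
    if missing:
        raise ValueError(f"no initial value for: {', '.join(missing)}")
    return state


declared = {"MediaSession.buffer_seconds": 10.0, "BitrateController.value": 2160}
run_state = initial_state_for_run(
    declared,
    needed=set(declared),
    overrides={"MediaSession.buffer_seconds": 5.0, "BitrateController.value": 720},
)
print(run_state["MediaSession.buffer_seconds"], declared["MediaSession.buffer_seconds"])
```

In this model, the paths that would trigger the ValueError correspond to what the text calls required initial outputs: they are needed, but no declaration supplies a value.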

Run methods

run computes outputs for one execution of the node. The canonical form receives the input namespace and returns the output namespace:

def run(self, inputs: Inputs) -> Outputs:
    return self.Outputs(buffer_seconds=...)

No-input nodes may write run(self). Compact nodes may declare inputs directly on run; this is useful for small nodes with one or two inputs:

class TickCounter(rg.Node):
    class Outputs(rg.NodeOutputs):
        tick: int = rg.Output(initial=0)

    def run(
        self,
        tick: int = rg.Input(source=lambda: TickCounter.Outputs.tick),
    ) -> Outputs:
        return self.Outputs(tick=tick + 1)

Do not mix compact run inputs with a NodeInputs namespace in the same node.

System clock inputs

Every rg.PhasedReactiveSystem has a built-in system clock. The clock is not a Node and is not listed in any phase, and Clock is a reserved node name. Read it through rg.Clock.tick and rg.Clock.time:

class Network(rg.Node):
    class Inputs(rg.NodeInputs):
        tick: int = rg.Input(source=rg.Clock.tick)
        time: float = rg.Input(source=rg.Clock.time)

Clock.tick is the integer tick index. Clock.time is the physical time computed from tick * base_dt for discrete-only systems, or from the latest continuous integration boundary when the tick contains continuous dynamics. The clock can also be used in guards, for example rg.If(rg.V(rg.Clock.tick) >= 10, rg.terminate).

Mutating inputs

Inputs are normal Python objects. If an input value is mutable, run receives a reference to that object. The video player's Logger intentionally uses this for its own persistent history: it reads its own previous history, appends one record, and writes the same list back as the next output value.

class Logger(rg.Node):
    class Inputs(rg.NodeInputs):
        history: list["Logger.Sample"] = rg.Input(
            source=lambda: Logger.Outputs.history
        )

    class Outputs(rg.NodeOutputs):
        history: list["Logger.Sample"] = rg.Output(initial=lambda: [])

    def run(self, inputs: Inputs) -> Outputs:
        # record is the per-tick sample tuple; the full listing shows how it is built.
        inputs.history.append(record)
        return self.Outputs(history=inputs.history)

Even in this in-place pattern, run must still return an Outputs object. The returned output namespace is the only supported way to commit writes back to system state.

This pattern is acceptable when the input is a self-read: the node is mutating its own carried state. Do not mutate inputs that belong to another node. That creates hidden side effects outside the output contract, and Python cannot reliably catch it at compile time. Treat foreign inputs as read-only; mutating them inside run is against the style guide.
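
The hidden side effect is easy to reproduce with a shared list. The sketch below is plain Python with hypothetical function names; the state dictionary stands in for system state, and the sample tuples are invented.

```python
def careless_summary(history: list) -> int:
    """Anti-pattern: trims a list that another node owns."""
    del history[:-1]       # mutates the producer's stored object in place
    return len(history)


def careful_summary(history: list) -> int:
    """Read-only use: slicing makes a copy, the original list is untouched."""
    recent = history[-1:]
    return len(recent)


state = {"Logger.history": [(0, 2400.0), (1, 2400.0), (2, 600.0)]}

careful_summary(state["Logger.history"])
print(len(state["Logger.history"]))  # 3

careless_summary(state["Logger.history"])
print(len(state["Logger.history"]))  # 1
```

Neither function returned anything for Logger.history, yet the second one changed it, which is exactly the kind of write that bypasses the output contract.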

Instances and names

Each node instance has a name. The name is used in state paths, compile reports, and snapshots.

If no name is passed, the class name is used. When several unnamed instances of the same class would collide, they are renamed automatically (Plant, Plant_2, ...). Duplicate explicit names are compile errors.

session_a = MediaSession()
session_b = MediaSession()
session_c = MediaSession(name="archive")
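
The naming rule can be approximated in a few lines of plain Python. This is a guess at the behavior described above, not the library's code: assign_names is a hypothetical helper, and the exact suffix scheme beyond the Plant, Plant_2 pattern is an assumption.

```python
from __future__ import annotations


def assign_names(requests: list[tuple[str, str | None]]) -> list[str]:
    """Resolve instance names from (class_name, explicit_name) pairs.

    Hypothetical helper that imitates the documented rule.
    """
    explicit = [name for _, name in requests if name is not None]
    duplicates = sorted({name for name in explicit if explicit.count(name) > 1})
    if duplicates:
        # Explicit duplicates are an error rather than being renamed.
        raise ValueError(f"duplicate explicit node names: {duplicates}")

    taken = set(explicit)
    resolved = []
    for class_name, name in requests:
        if name is None:
            # Fall back to the class name, then try _2, _3, ... until free.
            name = class_name
            suffix = 2
            while name in taken:
                name = f"{class_name}_{suffix}"
                suffix += 1
            taken.add(name)
        resolved.append(name)
    return resolved


names = assign_names(
    [("MediaSession", None), ("MediaSession", None), ("MediaSession", "archive")]
)
print(names)
```

Under these assumptions the three instances above resolve to MediaSession, MediaSession_2, and archive.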

Full file: examples/video_player.py

"""Adaptive-bitrate video player as a feedback control loop.

Each tick the player asks one question: do I have enough buffered video to
keep playing at the current quality?  If yes, the short branch just plays the
next chunk.  If no, the long branch first lowers the target bitrate, then
plays.  Buffer and bitrate persist across ticks, so the branching pattern is
driven by the closed loop between the network, the buffer, and the policy.

Phase graph::

    measure (init) -> decide -+-[healthy]--> play -> bottom
                              |
                              +-[stalling]-> drop_quality -> play -> bottom
"""

from __future__ import annotations

from regelum import (
    Clock,
    Else,
    Goto,
    If,
    Input,
    Node,
    NodeInputs,
    NodeOutputs,
    Output,
    Phase,
    PhasedReactiveSystem,
    V,
    terminate,
)

TICK_DT_SECONDS = 1.0
BITRATE_LADDER_KBPS = (240, 480, 720, 1080, 2160)
TOP_BITRATE_KBPS = BITRATE_LADDER_KBPS[-1]
STALL_HORIZON_SECONDS = 4.0


class Network(Node):
    """Stochastic-looking but deterministic bandwidth model.

    Drops to a slow link in the middle of the run so the policy has to react,
    then recovers so the buffer can refill.  The schedule is fixed to keep the
    example reproducible.
    """

    class Inputs(NodeInputs):
        tick: int = Input(source=Clock.tick)

    class Outputs(NodeOutputs):
        bandwidth_kbps: float = Output(initial=float(TOP_BITRATE_KBPS))

    def run(self, inputs: Inputs) -> Outputs:
        if inputs.tick < 6:
            value = 2400.0
        elif inputs.tick < 14:
            value = 600.0
        elif inputs.tick < 22:
            value = 1100.0
        else:
            value = 2400.0
        return self.Outputs(bandwidth_kbps=value)


class QualityPolicy(Node):
    """Decides whether the player is about to stall.

    The estimated drain rate is ``1 - bandwidth / bitrate`` seconds of video
    lost per wall-second.  If buffered seconds will not survive
    ``STALL_HORIZON_SECONDS`` at that drain rate, mark the tick as stalling.
    """

    class Inputs(NodeInputs):
        buffer_seconds: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)

    class Outputs(NodeOutputs):
        stalling: bool = Output(initial=False)

    def run(self, inputs: Inputs) -> Outputs:
        bitrate = max(inputs.bitrate_kbps, 1)
        drain = max(0.0, 1.0 - inputs.bandwidth_kbps / bitrate)
        if drain <= 0.0:
            return self.Outputs(stalling=False)
        time_to_empty = inputs.buffer_seconds / drain
        return self.Outputs(stalling=time_to_empty < STALL_HORIZON_SECONDS)


class BitrateController(Node):
    """Owns the current target bitrate.  Drops one rung when invoked."""

    class Inputs(NodeInputs):
        current: int = Input(source=lambda: BitrateController.Outputs.value)

    class Outputs(NodeOutputs):
        value: int = Output(initial=TOP_BITRATE_KBPS)

    def run(self, inputs: Inputs) -> Outputs:
        try:
            index = BITRATE_LADDER_KBPS.index(inputs.current)
        except ValueError:
            index = len(BITRATE_LADDER_KBPS) - 1
        next_index = max(0, index - 1)
        return self.Outputs(value=BITRATE_LADDER_KBPS[next_index])


class Decoder(Node):
    """Models how many seconds of video can be downloaded in one tick.

    With ``bandwidth_kbps`` of throughput and a video encoded at
    ``bitrate_kbps``, one wall-second of downloading produces
    ``bandwidth / bitrate`` seconds of playable content.
    """

    class Inputs(NodeInputs):
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)

    class Outputs(NodeOutputs):
        fetched_seconds: float

    def run(self, inputs: Inputs) -> Outputs:
        bitrate = max(inputs.bitrate_kbps, 1)
        return self.Outputs(fetched_seconds=inputs.bandwidth_kbps / bitrate * TICK_DT_SECONDS)


class MediaSession(Node):
    """The plant.  Buffer fills with newly fetched video, drains with playback."""

    class Inputs(NodeInputs):
        previous: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        fetched: float = Input(source=Decoder.Outputs.fetched_seconds)

    class Outputs(NodeOutputs):
        buffer_seconds: float = Output(initial=10.0)

    def run(self, inputs: Inputs) -> Outputs:
        next_buffer = inputs.previous + inputs.fetched - TICK_DT_SECONDS
        return self.Outputs(buffer_seconds=max(0.0, next_buffer))


class Logger(Node):
    """Appends a per-tick record so the trajectory is visible after the run."""

    Sample = tuple[int, float, int, float, bool]

    class Inputs(NodeInputs):
        tick: int = Input(source=Clock.tick)
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)
        buffer_seconds: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        stalling: bool = Input(source=QualityPolicy.Outputs.stalling)
        history: list["Logger.Sample"] = Input(source=lambda: Logger.Outputs.history)

    class Outputs(NodeOutputs):
        history: list["Logger.Sample"] = Output(initial=lambda: [])

    def run(self, inputs: Inputs) -> Outputs:
        record: Logger.Sample = (
            inputs.tick,
            inputs.bandwidth_kbps,
            inputs.bitrate_kbps,
            inputs.buffer_seconds,
            inputs.stalling,
        )
        inputs.history.append(record)
        return self.Outputs(history=inputs.history)


def build_system() -> PhasedReactiveSystem:
    network = Network()
    policy = QualityPolicy()
    controller = BitrateController()
    decoder = Decoder()
    session = MediaSession()
    logger = Logger()

    return PhasedReactiveSystem(
        phases=[
            Phase(
                "measure",
                nodes=(network,),
                transitions=(Goto("decide"),),
                is_initial=True,
            ),
            Phase(
                "decide",
                nodes=(policy,),
                transitions=(
                    If(V(policy.Outputs.stalling), "drop_quality", name="stalling"),
                    Else("play", name="healthy"),
                ),
            ),
            Phase(
                "drop_quality",
                nodes=(controller,),
                transitions=(Goto("play"),),
            ),
            Phase(
                "play",
                nodes=(decoder, session, logger),
                transitions=(Goto(terminate),),
            ),
        ],
    )


def main() -> None:
    system = build_system()
    print(f"compile_ok = {system.compile_report.ok}")
    print(
        "phase schedules: "
        + " | ".join(
            f"{name}={schedule}" for name, schedule in system.compile_report.phase_schedules.items()
        )
    )
    print()
    print("tick | bw(kbps) | bitrate | buffer(s) | stall? | path")
    print("-----+----------+---------+-----------+--------+----------------------")
    for _ in range(30):
        records = system.step()
        path_phases: list[str] = []
        for record in records:
            if not path_phases or path_phases[-1] != record.phase:
                path_phases.append(record.phase)
        path = " -> ".join(path_phases)
        snapshot = system.snapshot()
        print(
            f"{system.read(Clock.tick):4d} | "
            f"{snapshot['Network.bandwidth_kbps']:8.0f} | "
            f"{snapshot['BitrateController.value']:7d} | "
            f"{snapshot['MediaSession.buffer_seconds']:9.2f} | "
            f"{str(snapshot['QualityPolicy.stalling']):>6} | "
            f"{path}"
        )


if __name__ == "__main__":
    main()
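
The stall check in QualityPolicy is easier to follow with numbers plugged in. The sketch below copies the arithmetic from QualityPolicy.run into a free function so it runs without the library; is_stalling is a hypothetical name, and the sample buffer values are chosen only for illustration.

```python
STALL_HORIZON_SECONDS = 4.0


def is_stalling(buffer_seconds: float, bitrate_kbps: int, bandwidth_kbps: float) -> bool:
    """Same arithmetic as QualityPolicy.run, written as a free function."""
    bitrate = max(bitrate_kbps, 1)
    # Seconds of buffered video lost per wall-second; never negative.
    drain = max(0.0, 1.0 - bandwidth_kbps / bitrate)
    if drain <= 0.0:
        return False
    return buffer_seconds / drain < STALL_HORIZON_SECONDS


# Slow link (600 kbps) at the top rung (2160 kbps): drain is about 0.72 s/s.
full_buffer = is_stalling(10.0, 2160, 600.0)   # 10 / 0.72 is about 13.8 s, above the horizon
low_buffer = is_stalling(2.0, 2160, 600.0)     # 2 / 0.72 is about 2.8 s, below the horizon
fast_link = is_stalling(2.0, 2160, 2400.0)     # bandwidth exceeds bitrate, so no drain

print(full_buffer, low_buffer, fast_link)
```

A slow link alone does not trigger the long branch: the policy flags a stall only once the buffer is small enough to run out within the horizon.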

Custom constructors should forward name to Node.

    def __init__(self, *, name: str | None = None) -> None:
        super().__init__(name=name)
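
A sketch of why forwarding matters, using a stand-in base class rather than the library's Node. SketchNode, ForwardingController, and ForgetfulController are hypothetical; the only behavior borrowed from the text is that a missing name falls back to the class name.

```python
from __future__ import annotations


class SketchNode:
    """Hypothetical stand-in for a node base class (not the library's Node)."""

    def __init__(self, *, name: str | None = None) -> None:
        # Per the text: with no explicit name, the class name is used.
        self.name = name if name is not None else type(self).__name__


class ForwardingController(SketchNode):
    def __init__(self, ladder: tuple[int, ...], *, name: str | None = None) -> None:
        super().__init__(name=name)  # forward name to the base class
        self.ladder = ladder


class ForgetfulController(SketchNode):
    def __init__(self, ladder: tuple[int, ...], *, name: str | None = None) -> None:
        super().__init__()  # bug: the caller's name is silently dropped
        self.ladder = ladder


good = ForwardingController((240, 480), name="mobile")
bad = ForgetfulController((240, 480), name="mobile")
print(good.name, bad.name)
```

The forgetful version accepts name without complaint but ends up with the class-name fallback, so state paths and snapshots would not carry the name the caller asked for.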

Rules

  • A node class declares behavior and port shape.
  • A node instance owns runtime identity and configuration.
  • run must always return the node output namespace.
  • Node instances, not node classes, are assigned to phases.
  • Inputs read outputs.
  • Outputs define state values.
  • Bare input annotations create unconnected inputs.
  • Bare output annotations create outputs without initial values.
  • Class-level references must resolve to exactly one producer.
  • Use instance-bound references or port(...).connect(...) to remove ambiguity.
  • Previous-state outputs need initial values.
  • Intermediate outputs may omit initial values.
  • Mutable defaults should use callables.
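
The last rule mirrors a general Python pitfall, which is presumably why Logger declares initial=lambda: [] instead of initial=[]. The sketch below uses the standard library's dataclasses.field(default_factory=...) as an analogy, not the library's Output; SharedHistory and FreshHistory are hypothetical names.

```python
from __future__ import annotations

from dataclasses import dataclass, field

SHARED_DEFAULT: list[int] = []


class SharedHistory:
    def __init__(self) -> None:
        # Every instance aliases the same list object.
        self.history = SHARED_DEFAULT


@dataclass
class FreshHistory:
    # The factory is called once per instance, so each gets its own list.
    history: list[int] = field(default_factory=list)


a, b = SharedHistory(), SharedHistory()
a.history.append(1)   # b.history sees the append: same object

c, d = FreshHistory(), FreshHistory()
c.history.append(1)   # d.history stays empty

print(b.history, d.history)
```

A callable default gives each instance a fresh object, so two instances of the same node class cannot leak state into each other through a shared initial value.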