Nodes
Recap
In the overview we took the adaptive-bitrate video player and turned it into a phase graph.
The running example: an adaptive-bitrate video player
Most people have watched a video quietly drop from 1080p to 480p when the network slowed down. The player keeps a few seconds of pre-downloaded video in a buffer; when the network slows, it must lower the target quality before the buffer runs out.
flowchart LR
init([init]) --> measure
measure --> decide[decide]
decide -->|healthy| play
decide -->|stalling| drop_quality
drop_quality --> play
play --> done([⊥])
classDef measure fill:#2f6fed22,stroke:#2f6fed;
classDef decide fill:#7c3aed22,stroke:#7c3aed;
classDef dropQuality fill:#d9770622,stroke:#d97706;
classDef play fill:#15803d22,stroke:#15803d;
class measure measure;
class decide decide;
class drop_quality dropQuality;
class play play;
One pass through this graph from init to ⊥ is a tick.
The graph is high-level; each phase is itself made of smaller pieces called
nodes, distributed like this:
| Phase | Nodes | What happens |
|---|---|---|
| measure | Network | Sample the current bandwidth from the system clock. |
| decide | QualityPolicy | Compare projected drain rate against the buffer; set stalling. |
| drop_quality | BitrateController | Drop the target bitrate by one rung. |
| play | Decoder, MediaSession, Logger | Compute downloaded seconds, integrate the buffer, log. |
At node level, the same model looks like this.
Solid arrows show that one node reads another node's output; dashed
arrows from state show self-reads, where a node reads its own output from
the previous tick.
The node colors correspond to the phase colors in the table above:
flowchart LR
network["Network"]
policy["QualityPolicy"]
controller["BitrateController"]
decoder["Decoder"]
session["MediaSession"]
logger["Logger"]
controller_state(("state"))
session_state(("state"))
logger_state(("state"))
network --> policy
network --> decoder
network --> logger
controller --> policy
controller --> decoder
controller --> logger
decoder --> session
session --> logger
session --> policy
policy --> logger
controller_state -.-> controller
session_state -.-> session
logger_state -.-> logger
classDef measure fill:#2f6fed22,stroke:#2f6fed;
classDef decide fill:#7c3aed22,stroke:#7c3aed;
classDef dropQuality fill:#d9770622,stroke:#d97706;
classDef play fill:#15803d22,stroke:#15803d;
classDef state fill:#94a3b822,stroke:#94a3b8,stroke-dasharray:3 3;
class network measure;
class policy decide;
class controller dropQuality;
class decoder,session,logger play;
class controller_state,session_state,logger_state state;
Full code listing: examples/video_player.py
"""Adaptive-bitrate video player as a feedback control loop.

Each tick the player asks one question: do I have enough buffered video to
keep playing at the current quality? If yes, the short branch just plays the
next chunk. If no, the long branch first lowers the target bitrate, then
plays. Buffer and bitrate persist across ticks, so the branching pattern is
driven by the closed loop between the network, the buffer, and the policy.

Phase graph::

    measure (init) -> decide -+-[healthy]--> play -> bottom
                              |
                              +-[stalling]-> drop_quality -> play -> bottom
"""

from __future__ import annotations

from regelum import (
    Clock,
    Else,
    Goto,
    If,
    Input,
    Node,
    NodeInputs,
    NodeOutputs,
    Output,
    Phase,
    PhasedReactiveSystem,
    V,
    terminate,
)

TICK_DT_SECONDS = 1.0
BITRATE_LADDER_KBPS = (240, 480, 720, 1080, 2160)
TOP_BITRATE_KBPS = BITRATE_LADDER_KBPS[-1]
STALL_HORIZON_SECONDS = 4.0


class Network(Node):
    """Stochastic-looking but deterministic bandwidth model.

    Drops to a slow link in the middle of the run so the policy has to react,
    then recovers so the buffer can refill. The schedule is fixed to keep the
    example reproducible.
    """

    class Inputs(NodeInputs):
        tick: int = Input(source=Clock.tick)

    class Outputs(NodeOutputs):
        bandwidth_kbps: float = Output(initial=float(TOP_BITRATE_KBPS))

    def run(self, inputs: Inputs) -> Outputs:
        if inputs.tick < 6:
            value = 2400.0
        elif inputs.tick < 14:
            value = 600.0
        elif inputs.tick < 22:
            value = 1100.0
        else:
            value = 2400.0
        return self.Outputs(bandwidth_kbps=value)


class QualityPolicy(Node):
    """Decides whether the player is about to stall.

    The estimated drain rate is ``1 - bandwidth / bitrate`` seconds of video
    lost per wall-second. If buffered seconds will not survive
    ``STALL_HORIZON_SECONDS`` at that drain rate, mark the tick as stalling.
    """

    class Inputs(NodeInputs):
        buffer_seconds: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)

    class Outputs(NodeOutputs):
        stalling: bool = Output(initial=False)

    def run(self, inputs: Inputs) -> Outputs:
        bitrate = max(inputs.bitrate_kbps, 1)
        drain = max(0.0, 1.0 - inputs.bandwidth_kbps / bitrate)
        if drain <= 0.0:
            return self.Outputs(stalling=False)
        time_to_empty = inputs.buffer_seconds / drain
        return self.Outputs(stalling=time_to_empty < STALL_HORIZON_SECONDS)


class BitrateController(Node):
    """Owns the current target bitrate. Drops one rung when invoked."""

    class Inputs(NodeInputs):
        current: int = Input(source=lambda: BitrateController.Outputs.value)

    class Outputs(NodeOutputs):
        value: int = Output(initial=TOP_BITRATE_KBPS)

    def run(self, inputs: Inputs) -> Outputs:
        try:
            index = BITRATE_LADDER_KBPS.index(inputs.current)
        except ValueError:
            index = len(BITRATE_LADDER_KBPS) - 1
        next_index = max(0, index - 1)
        return self.Outputs(value=BITRATE_LADDER_KBPS[next_index])


class Decoder(Node):
    """Models how many seconds of video can be downloaded in one tick.

    With ``bandwidth_kbps`` of throughput and a video encoded at
    ``bitrate_kbps``, one wall-second of downloading produces
    ``bandwidth / bitrate`` seconds of playable content.
    """

    class Inputs(NodeInputs):
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)

    class Outputs(NodeOutputs):
        fetched_seconds: float

    def run(self, inputs: Inputs) -> Outputs:
        bitrate = max(inputs.bitrate_kbps, 1)
        return self.Outputs(fetched_seconds=inputs.bandwidth_kbps / bitrate * TICK_DT_SECONDS)


class MediaSession(Node):
    """The plant. Buffer fills with newly fetched video, drains with playback."""

    class Inputs(NodeInputs):
        previous: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        fetched: float = Input(source=Decoder.Outputs.fetched_seconds)

    class Outputs(NodeOutputs):
        buffer_seconds: float = Output(initial=10.0)

    def run(self, inputs: Inputs) -> Outputs:
        next_buffer = inputs.previous + inputs.fetched - TICK_DT_SECONDS
        return self.Outputs(buffer_seconds=max(0.0, next_buffer))


class Logger(Node):
    """Appends a per-tick record so the trajectory is visible after the run."""

    Sample = tuple[int, float, int, float, bool]

    class Inputs(NodeInputs):
        tick: int = Input(source=Clock.tick)
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)
        buffer_seconds: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        stalling: bool = Input(source=QualityPolicy.Outputs.stalling)
        history: list["Logger.Sample"] = Input(source=lambda: Logger.Outputs.history)

    class Outputs(NodeOutputs):
        history: list["Logger.Sample"] = Output(initial=lambda: [])

    def run(self, inputs: Inputs) -> Outputs:
        record: Logger.Sample = (
            inputs.tick,
            inputs.bandwidth_kbps,
            inputs.bitrate_kbps,
            inputs.buffer_seconds,
            inputs.stalling,
        )
        inputs.history.append(record)
        return self.Outputs(history=inputs.history)


def build_system() -> PhasedReactiveSystem:
    network = Network()
    policy = QualityPolicy()
    controller = BitrateController()
    decoder = Decoder()
    session = MediaSession()
    logger = Logger()
    return PhasedReactiveSystem(
        phases=[
            Phase(
                "measure",
                nodes=(network,),
                transitions=(Goto("decide"),),
                is_initial=True,
            ),
            Phase(
                "decide",
                nodes=(policy,),
                transitions=(
                    If(V(policy.Outputs.stalling), "drop_quality", name="stalling"),
                    Else("play", name="healthy"),
                ),
            ),
            Phase(
                "drop_quality",
                nodes=(controller,),
                transitions=(Goto("play"),),
            ),
            Phase(
                "play",
                nodes=(decoder, session, logger),
                transitions=(Goto(terminate),),
            ),
        ],
    )


def main() -> None:
    system = build_system()
    print(f"compile_ok = {system.compile_report.ok}")
    print(
        "phase schedules: "
        + " | ".join(
            f"{name}={schedule}" for name, schedule in system.compile_report.phase_schedules.items()
        )
    )
    print()
    print("tick | bw(kbps) | bitrate | buffer(s) | stall? | path")
    print("-----+----------+---------+-----------+--------+----------------------")
    for _ in range(30):
        records = system.step()
        path_phases: list[str] = []
        for record in records:
            if not path_phases or path_phases[-1] != record.phase:
                path_phases.append(record.phase)
        path = " -> ".join(path_phases)
        snapshot = system.snapshot()
        print(
            f"{system.read(Clock.tick):4d} | "
            f"{snapshot['Network.bandwidth_kbps']:8.0f} | "
            f"{snapshot['BitrateController.value']:7d} | "
            f"{snapshot['MediaSession.buffer_seconds']:9.2f} | "
            f"{str(snapshot['QualityPolicy.stalling']):>6} | "
            f"{path}"
        )


if __name__ == "__main__":
    main()
This page zooms in on those nodes — what a node is, how it declares its
inputs and outputs, how to instantiate it, and how its run method is
wired up.
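Before zooming in, it may help to see one tick of the phase graph without the framework. The sketch below is plain Python, not regelum: `PlayerState`, `bandwidth_at`, `is_stalling`, and `run_tick` are names invented for illustration, and it assumes the tick index starts at 0. The formulas are copied from the listing above.

```python
from dataclasses import dataclass

TICK_DT_SECONDS = 1.0
BITRATE_LADDER_KBPS = (240, 480, 720, 1080, 2160)
STALL_HORIZON_SECONDS = 4.0


@dataclass
class PlayerState:
    """Values that persist from one tick to the next (illustrative only)."""

    bitrate_kbps: int = BITRATE_LADDER_KBPS[-1]
    buffer_seconds: float = 10.0


def bandwidth_at(tick: int) -> float:
    """measure: the same fixed bandwidth schedule as Network.run."""
    if tick < 6:
        return 2400.0
    if tick < 14:
        return 600.0
    if tick < 22:
        return 1100.0
    return 2400.0


def is_stalling(buffer_seconds: float, bitrate_kbps: int, bandwidth_kbps: float) -> bool:
    """decide: the same test as QualityPolicy.run."""
    drain = max(0.0, 1.0 - bandwidth_kbps / max(bitrate_kbps, 1))
    if drain <= 0.0:
        return False
    return buffer_seconds / drain < STALL_HORIZON_SECONDS


def run_tick(state: PlayerState, tick: int) -> list:
    """Walk the phase graph once and return the names of the phases visited."""
    path = ["measure", "decide"]
    bandwidth = bandwidth_at(tick)
    if is_stalling(state.buffer_seconds, state.bitrate_kbps, bandwidth):
        # drop_quality: step one rung down the ladder, never below the bottom.
        path.append("drop_quality")
        index = BITRATE_LADDER_KBPS.index(state.bitrate_kbps)
        state.bitrate_kbps = BITRATE_LADDER_KBPS[max(0, index - 1)]
    # play: fetch at the (possibly lowered) bitrate, then integrate the buffer.
    path.append("play")
    fetched = bandwidth / state.bitrate_kbps * TICK_DT_SECONDS
    state.buffer_seconds = max(0.0, state.buffer_seconds + fetched - TICK_DT_SECONDS)
    return path


state = PlayerState()
paths = [run_tick(state, tick) for tick in range(30)]
for tick, path in enumerate(paths):
    print(tick, " -> ".join(path))
```

Under these assumptions the sketch takes the long branch exactly once, at tick 20, where the bitrate drops from 2160 to 1080. That describes the sketch only; the trace printed by the real example may differ if the library numbers ticks differently.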
What is a Node?
A node is the atomic unit of computation in regelum: it reads named
input variables, writes named output variables, and implements a run method
that computes the next output values.
Each node declares its interface in terms of inputs and outputs.
Inputs say which values the node reads.
Outputs say which values the node writes and, when needed, how those values
are initialized before the first tick.
The run method is the node's one-step computation: the runtime gives it
resolved inputs, and the method must return an Outputs object.
Nodes are classes; phases receive instances
A node in regelum is a Python class inherited from rg.Node.
The class describes the node API and behavior.
Concrete node instances are then placed into phases by literally listing
them in the phase definition.
Nodes may be connected across the overall system in many ways, including
through values from previous ticks.
However, the nodes passed to a single phase must form a DAG with respect
to the read relation: draw an edge Node X --> Node Y when at least one
output of Node X is an input of Node Y in that phase.
This lets the runtime compile the phase and automatically resolve an
execution order.
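The ordering rule can be illustrated with Python's standard `graphlib` module. This is a sketch of the idea rather than regelum's scheduler: the dictionary hand-encodes the same-phase reads of the play phase from the listing, and it assumes that self-reads and reads from nodes in other phases are served from state and therefore add no edge.

```python
from graphlib import CycleError, TopologicalSorter

# Map each node to the nodes whose outputs it reads within the same phase.
play_phase_reads = {
    "Decoder": set(),
    "MediaSession": {"Decoder"},   # reads Decoder.fetched_seconds
    "Logger": {"MediaSession"},    # reads MediaSession.buffer_seconds
}

# static_order() yields every node after all of the nodes it reads from.
order = list(TopologicalSorter(play_phase_reads).static_order())
print(order)

# A read cycle inside one phase has no valid execution order.
cyclic_reads = {"A": {"B"}, "B": {"A"}}
try:
    list(TopologicalSorter(cyclic_reads).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```

Because the play phase is a simple chain, there is only one valid order: Decoder, then MediaSession, then Logger.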
A representative node in the player is MediaSession: it owns the buffer
level, fills it with newly fetched video, and drains it as playback consumes
seconds.
import regelum as rg


class MediaSession(rg.Node):
    """The plant. Buffer fills with newly fetched video, drains with playback."""

    class Inputs(rg.NodeInputs):
        previous: float = rg.Input(
            source=lambda: MediaSession.Outputs.buffer_seconds
        )
        fetched: float = rg.Input(source=Decoder.Outputs.fetched_seconds)

    class Outputs(rg.NodeOutputs):
        buffer_seconds: float = rg.Output(initial=10.0)

    def run(self, inputs: Inputs) -> Outputs:
        next_buffer = inputs.previous + inputs.fetched - TICK_DT_SECONDS
        return self.Outputs(buffer_seconds=max(0.0, next_buffer))


session = MediaSession()
Let us break down what happens here.
Inputs declares variables the node reads.
previous reads MediaSession.Outputs.buffer_seconds, so this is a
self-read from the previous tick.
fetched reads Decoder.Outputs.fetched_seconds, so MediaSession depends
on Decoder in the play phase.
Outputs declares variables the node writes.
buffer_seconds is initialized with 10.0 because the first tick needs a
buffer value before MediaSession has run.
Finally, run receives the input snapshot and returns an output snapshot:
the node computes the next buffer level and writes it as buffer_seconds.
Node API
A node in regelum is a Python class inherited from rg.Node.
The class declares the node's API: which variables it reads, which variables
it writes, and how one execution transforms inputs into outputs.
Every non-trivial node should make two namespaces explicit:
- Inputs lists the variables the node reads.
- Outputs lists the variables the node writes.
Those namespaces are ordinary nested Python classes, subclassing
rg.NodeInputs and rg.NodeOutputs.
This means the interface can be declared before any node instance exists:
MediaSession.Outputs.buffer_seconds is a valid output reference at class
declaration time, even before session = MediaSession() is constructed.
class MediaSession(rg.Node):
    class Inputs(rg.NodeInputs):
        fetched: float = rg.Input(source=Decoder.Outputs.fetched_seconds)

    class Outputs(rg.NodeOutputs):
        buffer_seconds: float = rg.Output(initial=10.0)
The runtime detects these namespaces by base class, not by name.
Inputs and Outputs are the conventional names, and each node may declare
at most one input namespace and at most one output namespace.
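Detection by base class can be pictured with a few lines of plain Python. The sketch below does not use regelum: `NodeInputs` and `NodeOutputs` are empty stand-ins, and `find_namespace` is a hypothetical helper, not a library function. It shows that a nested class is found through `issubclass`, whatever it is called, and that a second namespace of the same kind can be rejected.

```python
import inspect
from typing import Optional


class NodeInputs:
    """Stand-in for rg.NodeInputs (illustration only)."""


class NodeOutputs:
    """Stand-in for rg.NodeOutputs (illustration only)."""


def find_namespace(node_cls: type, base: type) -> Optional[type]:
    """Return the single nested class deriving from `base`, or None."""
    matches = [
        member
        for member in vars(node_cls).values()
        if inspect.isclass(member) and issubclass(member, base)
    ]
    if len(matches) > 1:
        raise TypeError(
            f"{node_cls.__name__} declares more than one {base.__name__} namespace"
        )
    return matches[0] if matches else None


class Sensor:
    class Reads(NodeInputs):  # unconventional name, still detected
        level: float

    class Outputs(NodeOutputs):
        value: float


inputs_ns = find_namespace(Sensor, NodeInputs)
outputs_ns = find_namespace(Sensor, NodeOutputs)
print(inputs_ns.__name__, outputs_ns.__name__)
```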
Inputs and output references
An input is connected to the output it reads.
The common form is rg.Input(source=...), where source points at an output:
class Network(rg.Node):
    ...


class Decoder(rg.Node):
    class Inputs(rg.NodeInputs):
        bandwidth_kbps: float = rg.Input(source=Network.Outputs.bandwidth_kbps)
Network.Outputs.bandwidth_kbps is a class-level reference.
It is concise and works when there is exactly one Network producer in the
compiled system.
If multiple instances of the same node class exist, use an instance-bound
reference such as network.Outputs.bandwidth_kbps, or connect the ports after
instantiation.
class Network(rg.Node):
    ...


network_main = Network(name="main")
network_backup = Network(name="backup")


class Decoder(rg.Node):
    class Inputs(rg.NodeInputs):
        bandwidth_kbps: float = rg.Input(
            source=network_main.Outputs.bandwidth_kbps
        )
Here Decoder reads from the concrete network_main instance.
The network_backup instance can exist in the same system without making the
reference ambiguous.
A bare input annotation (a name and a type with no rg.Input(...) assigned) is shorthand for an unconnected input.
Unconnected inputs are compile errors unless they are connected later with post-instantiation connections.
Lazy references
Sometimes the producer cannot be referenced directly at class-body evaluation time. This happens when a node reads its own output, or when a producer class is defined later in the file. Use a zero-argument callable for those cases:
class MediaSession(rg.Node):
    class Inputs(rg.NodeInputs):
        previous: float = rg.Input(
            source=lambda: MediaSession.Outputs.buffer_seconds
        )
The lambda is not a different kind of connection.
It is a lazy reference: regelum resolves it during compilation, after Python
has finished creating the relevant classes.
It also keeps linters and type checkers from complaining about names that are
not available yet.
Self-referential inputs are how a node carries state across ticks.
Here MediaSession reads the buffer_seconds value it wrote in the previous
tick, then writes the next value in run.
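The reason a lambda is needed for self-reads is ordinary Python name resolution, which can be shown without regelum. In the sketch below, `Ref` is a hypothetical stand-in for an input descriptor that stores its source; the real descriptor is assumed to defer the call in a similar way.

```python
class Ref:
    """Hypothetical stand-in for an input descriptor that stores its source."""

    def __init__(self, source):
        self._source = source

    def resolve(self):
        # A callable source is treated as lazy and is only called here.
        return self._source() if callable(self._source) else self._source


# Eager self-reference: the name Counter is not bound yet while its own
# class body is still executing, so Python raises NameError.
try:
    class Counter:
        value = 10.0
        previous = Ref(source=Counter.value)
    eager_failed = False
except NameError:
    eager_failed = True


# Lazy self-reference: the lambda body runs only when resolve() is called,
# after the class statement has finished and the name exists.
class LazyCounter:
    value = 10.0
    previous = Ref(source=lambda: LazyCounter.value)


resolved = LazyCounter.previous.resolve()
print(eager_failed, resolved)
```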
Outputs
Outputs are declared by subclassing rg.NodeOutputs.
Each output is owned by exactly one node, so there is never ambiguity about
who produced a value.
This rules out write/write races by construction.
A bare output annotation, such as fetched_seconds: float in Decoder.Outputs, is shorthand for an output without an initial value.
Decoder.fetched_seconds can be declared this way because it is produced in
play before MediaSession reads it in the same phase.
Compilation rejects bare outputs that are read before they can be produced.
Post-instantiation connections
Connections do not have to be fully described inside the node class.
You can instantiate nodes first and then link an input to a concrete output
with port(...).connect(...).
This is useful when identity matters, especially in multi-instance systems.
session_main = MediaSession(name="main")
session_pip = MediaSession(name="pip")
policy = QualityPolicy()
rg.port(policy.Inputs.buffer_seconds).connect(session_main.Outputs.buffer_seconds)
The port(...) wrapper exposes .connect(...) while preserving the normal
descriptor syntax for type checkers and readers.
Both directions of .connect(...) are accepted as long as one side is an
input and the other side is an output:
rg.port(policy.Inputs.buffer_seconds).connect(session_main.Outputs.buffer_seconds)
rg.port(session_main.Outputs.buffer_seconds).connect(policy.Inputs.buffer_seconds)
Input-to-input and output-to-output connections are errors. If a class-level reference becomes ambiguous because several producers match, the fix is to use an instance-bound reference or an explicit post-instantiation connection.
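The direction rule can be sketched in a few lines. The `Port` dataclass and the `connect` function below are hypothetical and only model the rule stated above (exactly one input and one output, in either order); they are not how `rg.port(...).connect(...)` is implemented.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Port:
    """Hypothetical port record: a state path plus its direction."""

    path: str
    kind: str  # "input" or "output"


def connect(a: Port, b: Port) -> tuple:
    """Return (input_path, output_path) regardless of argument order."""
    if {a.kind, b.kind} != {"input", "output"}:
        raise ValueError(f"cannot connect {a.kind} to {b.kind}")
    return (a.path, b.path) if a.kind == "input" else (b.path, a.path)


policy_in = Port("QualityPolicy.buffer_seconds", "input")
session_out = Port("main.buffer_seconds", "output")

# Both argument orders describe the same link.
forward = connect(policy_in, session_out)
backward = connect(session_out, policy_in)

# Output-to-output is rejected.
try:
    connect(session_out, session_out)
    rejected = False
except ValueError:
    rejected = True
```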
Initial values
System state stores output values. Inputs read either values produced earlier in the same phase or values already present in state. An output needs an initial value when it may be read before the first time its node runs.
class MediaSession(rg.Node):
    class Outputs(rg.NodeOutputs):
        buffer_seconds: float = rg.Output(initial=10.0)
buffer_seconds is read by QualityPolicy in decide before
MediaSession writes it in play, so it needs initial=10.0.
The same idea applies to persistent outputs such as
Network.bandwidth_kbps, BitrateController.value, and
QualityPolicy.stalling.
Initial values can be written in three forms. Use a direct value when the value is static, as in rg.Output(initial=10.0) above.
Use a zero-argument callable for fresh mutable objects. This avoids accidentally sharing one list between systems:
class Logger(rg.Node):
    class Outputs(rg.NodeOutputs):
        history: list[Logger.Sample] = rg.Output(initial=lambda: [])
Use a one-argument callable when the initial value depends on the node
instance.
The argument receives the concrete node instance; self is just the
recommended spelling:
from typing import cast


class MediaSession(rg.Node):
    def __init__(
        self,
        *,
        initial_buffer: float = 10.0,
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self.initial_buffer = initial_buffer

    class Outputs(rg.NodeOutputs):
        buffer_seconds: float = rg.Output(
            initial=lambda self: cast(MediaSession, self).initial_buffer,
        )


short_start = MediaSession(initial_buffer=2.0)
full_start = MediaSession(initial_buffer=10.0)
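The three forms can be told apart by inspecting the declared value. The sketch below is one plausible way to do that in plain Python; `resolve_initial` and `FakeSession` are hypothetical names, and nothing here is taken from regelum's implementation. It also shows why a factory avoids sharing one list between systems.

```python
import inspect
from typing import Any


def resolve_initial(initial: Any, node: Any) -> Any:
    """Turn a declared initial into a concrete value (hypothetical helper)."""
    if not callable(initial):
        return initial  # direct value
    params = inspect.signature(initial).parameters
    if len(params) == 0:
        return initial()  # zero-argument factory: a fresh object per call
    return initial(node)  # one-argument callable receives the node instance


class FakeSession:
    """Stand-in for a node instance with per-instance configuration."""

    def __init__(self, initial_buffer: float) -> None:
        self.initial_buffer = initial_buffer


node = FakeSession(initial_buffer=2.0)

direct = resolve_initial(10.0, node)
history_a = resolve_initial(lambda: [], node)
history_b = resolve_initial(lambda: [], node)
per_instance = resolve_initial(lambda self: self.initial_buffer, node)

# Each factory call produced its own list, so this append is not shared.
history_a.append("record")
print(direct, history_a, history_b, per_instance)
```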
Overriding initial state for a run
initial_state can override declared initial values for a single run
without mutating the node declaration.
system.reset(
    initial_state={
        MediaSession.Outputs.buffer_seconds: 5.0,
        BitrateController.Outputs.value: 720,
    }
)
Once a system is instantiated, compilation has already resolved all output
paths and can tell you which outputs define the initial state surface.
Use system.compile_report.minimal_initial_outputs to inspect the outputs
that actually need initial values for this compiled graph, and
system.compile_report.required_initial_outputs to inspect the outputs
that must be supplied because no declaration provides an initial value.
system = build_system()
report = system.compile_report
print(report.minimal_initial_outputs)
print(report.required_initial_outputs)
If required_initial_outputs is non-empty, provide those values in
initial_state before running or resetting the system.
See Compile report for inspecting the
compiled initial-state requirements and Reset for
the runtime reset behavior.
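The idea behind "outputs that actually need initial values" can be approximated by walking an execution order and recording every output that is read before anything has written it. The sketch below is a simplified model, not the library's analysis: the read and write sets are hand-copied from the video player listing, clock inputs are omitted because the clock is not a node output, and `read_before_written` is a hypothetical helper. It makes no claim about what `compile_report` prints for this system.

```python
# Reads and writes per node, hand-copied from the video player listing.
NODES = {
    "Network": {"reads": set(), "writes": {"Network.bandwidth_kbps"}},
    "QualityPolicy": {
        "reads": {
            "MediaSession.buffer_seconds",
            "BitrateController.value",
            "Network.bandwidth_kbps",
        },
        "writes": {"QualityPolicy.stalling"},
    },
    "BitrateController": {
        "reads": {"BitrateController.value"},
        "writes": {"BitrateController.value"},
    },
    "Decoder": {
        "reads": {"Network.bandwidth_kbps", "BitrateController.value"},
        "writes": {"Decoder.fetched_seconds"},
    },
    "MediaSession": {
        "reads": {"MediaSession.buffer_seconds", "Decoder.fetched_seconds"},
        "writes": {"MediaSession.buffer_seconds"},
    },
    "Logger": {
        "reads": {
            "Network.bandwidth_kbps",
            "BitrateController.value",
            "MediaSession.buffer_seconds",
            "QualityPolicy.stalling",
            "Logger.history",
        },
        "writes": {"Logger.history"},
    },
}


def read_before_written(execution_order: list) -> set:
    """Outputs read on the first tick before any node has written them."""
    written = set()
    needs_initial = set()
    for name in execution_order:
        node = NODES[name]
        needs_initial |= node["reads"] - written
        written |= node["writes"]
    return needs_initial


healthy = ["Network", "QualityPolicy", "Decoder", "MediaSession", "Logger"]
stalling = ["Network", "QualityPolicy", "BitrateController", "Decoder", "MediaSession", "Logger"]

# Either branch may be taken on the first tick, so take the union.
needs_initial = read_before_written(healthy) | read_before_written(stalling)
print(sorted(needs_initial))
```

In this simplified model, Decoder.fetched_seconds does not appear in the result, which matches the earlier remark that it can be declared without an initial value.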
Run methods
run computes outputs for one execution of the node.
The canonical form, def run(self, inputs: Inputs) -> Outputs, receives the input
namespace and returns the output namespace.
No-input nodes may write run(self).
Compact nodes may declare inputs directly on run; this is useful for small
nodes with one or two inputs:
class TickCounter(rg.Node):
    class Outputs(rg.NodeOutputs):
        tick: int = rg.Output(initial=0)

    def run(
        self,
        tick: int = rg.Input(source=lambda: TickCounter.Outputs.tick),
    ) -> Outputs:
        return self.Outputs(tick=tick + 1)
Do not mix compact run inputs with a NodeInputs namespace in the same
node.
System clock inputs
Every rg.PhasedReactiveSystem has a built-in system clock. It is not a
Node, is not listed in any phase, and Clock is a reserved node name.
Read it through rg.Clock.tick and rg.Clock.time:
class Network(rg.Node):
    class Inputs(rg.NodeInputs):
        tick: int = rg.Input(source=rg.Clock.tick)
        time: float = rg.Input(source=rg.Clock.time)
Clock.tick is the integer tick index. Clock.time is the physical time
computed from tick * base_dt for discrete-only systems, or from the latest
continuous integration boundary when the tick contains continuous dynamics.
The clock can also be used in guards, for example
rg.If(rg.V(rg.Clock.tick) >= 10, rg.terminate).
Mutating inputs
Inputs are normal Python objects.
If an input value is mutable, run receives a reference to that object.
The video player's Logger intentionally uses this for its own persistent
history: it reads its own previous history, appends one record, and writes
the same list back as the next output value.
class Logger(rg.Node):
    class Inputs(rg.NodeInputs):
        history: list[Logger.Sample] = rg.Input(
            source=lambda: Logger.Outputs.history
        )

    class Outputs(rg.NodeOutputs):
        history: list[Logger.Sample] = rg.Output(initial=lambda: [])

    def run(self, inputs: Inputs) -> Outputs:
        # Excerpt: record is built from the other inputs, as in the full listing.
        inputs.history.append(record)
        return self.Outputs(history=inputs.history)
Even in this in-place pattern, run must still return an Outputs object.
The returned output namespace is the only supported way to commit writes back
to system state.
This pattern is acceptable when the input is a self-read: the node is
mutating its own carried state.
Do not mutate inputs that belong to another node.
That creates hidden side effects outside the output contract, and Python
cannot reliably catch it at compile time.
Treat foreign inputs as read-only; mutating them inside run is against the
style guide.
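The hazard is plain Python aliasing and can be reproduced without the framework. In the sketch below, the `state` dictionary is a stand-in for system state and both reader functions are hypothetical; the point is only that a consumer holding a reference can change a value its owner never wrote.

```python
import copy

# Stand-in for system state: one list object stored per output path.
state = {"Logger.history": [(0, 2400.0)]}


def well_behaved_reader(history: list) -> int:
    """Reads a foreign input without touching it."""
    return len(history)


def misbehaving_reader(history: list) -> int:
    """Mutates a foreign input: the owning node never wrote this change."""
    history.clear()
    return 0


before = copy.deepcopy(state)

well_behaved_reader(state["Logger.history"])
unchanged = state == before

misbehaving_reader(state["Logger.history"])
corrupted = state != before

print(unchanged, corrupted, state)
```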
Instances and names
Each node instance has a name. The name is used in state paths, compile reports, and snapshots.
If no name is passed, the class name is used.
Implicit duplicates are deduplicated automatically (Plant, Plant_2, ...).
Explicit duplicate names are compile errors.
Custom constructors should forward name to Node.
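The naming rules and the advice about forwarding name can be sketched together. Everything below is hypothetical: `NamedNode` stands in for the node base class, and `assign_names` is one possible scheme that is consistent with the Plant, Plant_2 pattern above. How the library numbers further duplicates, or resolves a clash between an explicit and an implicit name, is an assumption here.

```python
from typing import Optional


class NamedNode:
    """Hypothetical stand-in for a node base class that stores a name."""

    def __init__(self, *, name: Optional[str] = None) -> None:
        self.explicit_name = name


class Plant(NamedNode):
    def __init__(self, *, gain: float = 1.0, name: Optional[str] = None) -> None:
        super().__init__(name=name)  # forward name to the base class
        self.gain = gain


def assign_names(nodes: list) -> list:
    """Give every node a unique name; explicit duplicates are an error."""
    used = set()
    for node in nodes:
        if node.explicit_name is not None:
            if node.explicit_name in used:
                raise ValueError(f"duplicate explicit name: {node.explicit_name}")
            used.add(node.explicit_name)
    names = []
    for node in nodes:
        if node.explicit_name is not None:
            names.append(node.explicit_name)
            continue
        base = type(node).__name__
        candidate, counter = base, 1
        while candidate in used:
            counter += 1
            candidate = f"{base}_{counter}"
        used.add(candidate)
        names.append(candidate)
    return names


names = assign_names([Plant(), Plant(), Plant(name="main")])
print(names)

try:
    assign_names([Plant(name="main"), Plant(name="main")])
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
```

If `Plant.__init__` did not pass name on to the base class, the explicit name would be silently lost and the instance would fall back to the class name.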
Rules
- A node class declares behavior and port shape.
- A node instance owns runtime identity and configuration.
- run must always return the node output namespace.
- Node instances, not node classes, are assigned to phases.
- Inputs read outputs.
- Outputs define state values.
- Bare input annotations create unconnected inputs.
- Bare output annotations create outputs without initial values.
- Class-level references must resolve to exactly one producer.
- Use instance-bound references or port(...).connect(...) to remove ambiguity.
- Previous-state outputs need initial values.
- Intermediate outputs may omit initial values.
- Mutable defaults should use callables.