Nodes
Recap
In the overview we took the adaptive-bitrate video player and turned it into a phase graph.
The running example: an adaptive-bitrate video player
Most people have watched a video quietly drop from 1080p to 480p when the network slowed down. The player keeps a few seconds of pre-downloaded video in a buffer; when the network slows, it must lower the target quality before the buffer runs out.
flowchart LR
init([init]) --> measure
measure --> decide[decide]
decide -->|healthy| play
decide -->|stalling| drop_quality
drop_quality --> play
play --> done([⊥])
classDef measure fill:#2f6fed22,stroke:#2f6fed;
classDef decide fill:#7c3aed22,stroke:#7c3aed;
classDef dropQuality fill:#d9770622,stroke:#d97706;
classDef play fill:#15803d22,stroke:#15803d;
class measure measure;
class decide decide;
class drop_quality dropQuality;
class play play;
One pass through this graph from init to ⊥ is a tick.
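To make the notion of a tick concrete, here is a minimal, framework-free sketch that walks the phase graph above. The `TRANSITIONS` table and the `tick_path` helper are hypothetical names used only for illustration; they are not part of regelum.

```python
from typing import Callable, Optional

# Hypothetical model of the phase graph: each phase maps to a function that
# picks the next phase from the stalling flag (None stands for the ⊥ terminal).
TRANSITIONS: dict[str, Callable[[bool], Optional[str]]] = {
    "measure": lambda stalling: "decide",
    "decide": lambda stalling: "drop_quality" if stalling else "play",
    "drop_quality": lambda stalling: "play",
    "play": lambda stalling: None,
}


def tick_path(stalling: bool, initial: str = "measure") -> list[str]:
    """Return the phases visited in one tick, from the initial phase to ⊥."""
    path: list[str] = []
    phase: Optional[str] = initial
    while phase is not None:
        path.append(phase)
        phase = TRANSITIONS[phase](stalling)
    return path


healthy_path = tick_path(stalling=False)
stalling_path = tick_path(stalling=True)
print(" -> ".join(healthy_path))
print(" -> ".join(stalling_path))
```

The healthy branch visits three phases and the stalling branch visits four, which matches the short and long paths in the diagram.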
The graph is high-level; each phase is itself made of smaller pieces called
nodes, distributed like this:
| Phase | Nodes | What happens |
|---|---|---|
| measure | Network | Sample the current bandwidth from the system clock. |
| decide | QualityPolicy | Compare projected drain rate against the buffer; set stalling. |
| drop_quality | BitrateController | Drop the target bitrate by one rung. |
| play | Decoder, MediaSession, Logger | Compute downloaded seconds, integrate the buffer, log. |
At node level, the same model looks like this.
Solid arrows show that one node reads another node's state variable; dashed
arrows from state show self-reads, where a node reads its own state variable from
the previous tick.
The node colors correspond to the phase colors in the table above:
flowchart LR
network["Network"]
policy["QualityPolicy"]
controller["BitrateController"]
decoder["Decoder"]
session["MediaSession"]
logger["Logger"]
controller_state(("state"))
session_state(("state"))
logger_state(("state"))
network --> policy
network --> decoder
network --> logger
controller --> policy
controller --> decoder
controller --> logger
decoder --> session
session --> logger
session --> policy
policy --> logger
controller_state -.-> controller
session_state -.-> session
logger_state -.-> logger
classDef measure fill:#2f6fed22,stroke:#2f6fed;
classDef decide fill:#7c3aed22,stroke:#7c3aed;
classDef dropQuality fill:#d9770622,stroke:#d97706;
classDef play fill:#15803d22,stroke:#15803d;
classDef state fill:#94a3b822,stroke:#94a3b8,stroke-dasharray:3 3;
class network measure;
class policy decide;
class controller dropQuality;
class decoder,session,logger play;
class controller_state,session_state,logger_state state;
Full code listing: examples/video_player/video_player.py
"""Adaptive-bitrate video player as a feedback control loop.

Each tick the player asks one question: do I have enough buffered video to
keep playing at the current quality? If yes, the short branch just plays the
next chunk. If no, the long branch first lowers the target bitrate, then
plays. Buffer and bitrate persist across ticks, so the branching pattern is
driven by the closed loop between the network, the buffer, and the policy.

Phase graph::

    measure (init) -> decide -+-[healthy]--> play -> bottom
                              |
                              +-[stalling]-> drop_quality -> play -> bottom
"""

from __future__ import annotations

import regelum as rg

TICK_DT_SECONDS = 1.0
BITRATE_LADDER_KBPS = (240, 480, 720, 1080, 2160)
TOP_BITRATE_KBPS = BITRATE_LADDER_KBPS[-1]
STALL_HORIZON_SECONDS = 4.0


class Network(rg.Node):
    """Stochastic-looking but deterministic bandwidth model.

    Drops to a slow link in the middle of the run so the policy has to react,
    then recovers so the buffer can refill. The schedule is fixed to keep the
    example reproducible.
    """

    class Inputs(rg.NodeInputs):
        tick: int = rg.src(rg.Clock.tick)

    class State(rg.NodeState):
        bandwidth_kbps: float = rg.var(init=float(TOP_BITRATE_KBPS))

    def update(self, inputs: Inputs) -> State:
        if inputs.tick < 6:
            value = 2400.0
        elif inputs.tick < 14:
            value = 600.0
        elif inputs.tick < 22:
            value = 1100.0
        else:
            value = 2400.0
        return self.State(bandwidth_kbps=value)


class QualityPolicy(rg.Node):
    """Decides whether the player is about to stall.

    The estimated drain rate is ``1 - bandwidth / bitrate`` seconds of video
    lost per wall-second. If buffered seconds will not survive
    ``STALL_HORIZON_SECONDS`` at that drain rate, mark the tick as stalling.
    """

    class Inputs(rg.NodeInputs):
        buffer_seconds: float = rg.src(lambda: MediaSession.State.buffer_seconds)
        bitrate_kbps: int = rg.src(lambda: BitrateController.State.value)
        bandwidth_kbps: float = rg.src(Network.State.bandwidth_kbps)

    class State(rg.NodeState):
        stalling: bool = rg.var(init=False)

    def update(self, inputs: Inputs) -> State:
        bitrate = max(inputs.bitrate_kbps, 1)
        drain = max(0.0, 1.0 - inputs.bandwidth_kbps / bitrate)
        if drain <= 0.0:
            return self.State(stalling=False)
        time_to_empty = inputs.buffer_seconds / drain
        return self.State(stalling=time_to_empty < STALL_HORIZON_SECONDS)


class BitrateController(rg.Node):
    """Owns the current target bitrate. Drops one rung when invoked."""

    class Inputs(rg.NodeInputs):
        current: int = rg.src(lambda: BitrateController.State.value)

    class State(rg.NodeState):
        value: int = rg.var(init=TOP_BITRATE_KBPS)

    def update(self, inputs: Inputs) -> State:
        try:
            index = BITRATE_LADDER_KBPS.index(inputs.current)
        except ValueError:
            index = len(BITRATE_LADDER_KBPS) - 1
        next_index = max(0, index - 1)
        return self.State(value=BITRATE_LADDER_KBPS[next_index])


class Decoder(rg.Node):
    """Models how many seconds of video can be downloaded in one tick.

    With ``bandwidth_kbps`` of throughput and a video encoded at
    ``bitrate_kbps``, one wall-second of downloading produces
    ``bandwidth / bitrate`` seconds of playable content.
    """

    class Inputs(rg.NodeInputs):
        bandwidth_kbps: float = rg.src(Network.State.bandwidth_kbps)
        bitrate_kbps: int = rg.src(lambda: BitrateController.State.value)

    class State(rg.NodeState):
        fetched_seconds: float

    def update(self, inputs: Inputs) -> State:
        bitrate = max(inputs.bitrate_kbps, 1)
        return self.State(fetched_seconds=inputs.bandwidth_kbps / bitrate * TICK_DT_SECONDS)


class MediaSession(rg.Node):
    """The plant. Buffer fills with newly fetched video, drains with playback."""

    class Inputs(rg.NodeInputs):
        previous: float = rg.src(lambda: MediaSession.State.buffer_seconds)
        fetched: float = rg.src(Decoder.State.fetched_seconds)

    class State(rg.NodeState):
        buffer_seconds: float = rg.var(init=10.0)

    def update(self, inputs: Inputs) -> State:
        next_buffer = inputs.previous + inputs.fetched - TICK_DT_SECONDS
        return self.State(buffer_seconds=max(0.0, next_buffer))


class Logger(rg.Node):
    """Appends a per-tick record so the trajectory is visible after the run."""

    Sample = tuple[int, float, int, float, bool]

    class Inputs(rg.NodeInputs):
        tick: int = rg.src(rg.Clock.tick)
        bandwidth_kbps: float = rg.src(Network.State.bandwidth_kbps)
        bitrate_kbps: int = rg.src(lambda: BitrateController.State.value)
        buffer_seconds: float = rg.src(lambda: MediaSession.State.buffer_seconds)
        stalling: bool = rg.src(QualityPolicy.State.stalling)
        history: list["Logger.Sample"] = rg.src(lambda: Logger.State.history)

    class State(rg.NodeState):
        history: list["Logger.Sample"] = rg.var(init=lambda: [])

    def update(self, inputs: Inputs) -> State:
        record: Logger.Sample = (
            inputs.tick,
            inputs.bandwidth_kbps,
            inputs.bitrate_kbps,
            inputs.buffer_seconds,
            inputs.stalling,
        )
        inputs.history.append(record)
        return self.State(history=inputs.history)


def build_system() -> rg.PhasedReactiveSystem:
    network = Network()
    policy = QualityPolicy()
    controller = BitrateController()
    decoder = Decoder()
    session = MediaSession()
    logger = Logger()
    return rg.PhasedReactiveSystem(
        phases=[
            rg.Phase(
                "measure",
                nodes=(network,),
                transitions=(rg.Goto("decide"),),
                is_initial=True,
            ),
            rg.Phase(
                "decide",
                nodes=(policy,),
                transitions=(
                    rg.If(rg.V(policy.State.stalling), "drop_quality", name="stalling"),
                    rg.Else("play", name="healthy"),
                ),
            ),
            rg.Phase(
                "drop_quality",
                nodes=(controller,),
                transitions=(rg.Goto("play"),),
            ),
            rg.Phase(
                "play",
                nodes=(decoder, session, logger),
                transitions=(rg.Goto(rg.terminate),),
            ),
        ],
    )


def main() -> None:
    system = build_system()
    print(f"compile_ok = {system.compile_report.ok}")
    print(
        "phase schedules: "
        + " | ".join(
            f"{name}={schedule}" for name, schedule in system.compile_report.phase_schedules.items()
        )
    )
    print()
    print("tick | bw(kbps) | bitrate | buffer(s) | stall? | path")
    print("-----+----------+---------+-----------+--------+----------------------")
    for _ in range(30):
        records = system.step()
        path_phases: list[str] = []
        for record in records:
            if not path_phases or path_phases[-1] != record.phase:
                path_phases.append(record.phase)
        path = " -> ".join(path_phases)
        snapshot = system.snapshot()
        print(
            f"{system.read(rg.Clock.tick):4d} | "
            f"{snapshot['Network.bandwidth_kbps']:8.0f} | "
            f"{snapshot['BitrateController.value']:7d} | "
            f"{snapshot['MediaSession.buffer_seconds']:9.2f} | "
            f"{str(snapshot['QualityPolicy.stalling']):>6} | "
            f"{path}"
        )


if __name__ == "__main__":
    main()
This page zooms in on those nodes — what a node is, how it declares its
inputs and state variables, how to instantiate it, and how its update method is
wired up.
What is a Node?
A node is the atomic unit of computation in regelum: it reads named
input variables, writes named state variables, and implements an update method
that computes the next state variable values.
Each node declares its interface in terms of inputs and state variables.
Inputs says which values the node reads.
State says which values the node writes and, when needed, how those values
are initialized before the first tick.
The update method is the node's one-step computation: the runtime gives it
resolved inputs, and the method must return a State object.
Nodes are classes; phases receive instances
A node in regelum is a Python class inherited from rg.Node.
The class describes the node API and behavior.
Concrete node instances are then placed into phases by literally listing
them in the phase definition.
Nodes may be connected across the overall system in many ways, including
through values from previous ticks.
However, the nodes passed to a single phase must form a DAG with respect
to the read relation: draw an edge Node X --> Node Y when at least one
state variable of Node X is an input of Node Y in that phase.
This lets the runtime compile the phase and automatically resolve an
execution order.
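The DAG requirement is what makes an execution order computable. As a rough illustration (not regelum's actual compiler), the play phase can be ordered with the standard library's `graphlib`. The `schedule` helper and the `play_reads` mapping are hypothetical; only reads between nodes of the same phase are listed, so self-reads and reads of nodes from earlier phases are left out.

```python
from graphlib import CycleError, TopologicalSorter


def schedule(reads: dict[str, set[str]]) -> list[str]:
    """Order nodes so that every producer runs before the nodes that read it.

    `reads` maps each node to the set of same-phase nodes whose state it reads.
    """
    return list(TopologicalSorter(reads).static_order())


# Read relation inside the play phase of the video player.
play_reads = {
    "Decoder": set(),              # reads only nodes from earlier phases
    "MediaSession": {"Decoder"},   # reads Decoder.State.fetched_seconds
    "Logger": {"MediaSession"},    # reads MediaSession.State.buffer_seconds
}
play_order = schedule(play_reads)
print(play_order)

# A cyclic read relation inside one phase cannot be ordered.
try:
    schedule({"A": {"B"}, "B": {"A"}})
    cycle_detected = False
except CycleError:
    cycle_detected = True
print(cycle_detected)
```

A cycle inside a single phase has no valid order, which is why such a phase cannot be compiled; reads across ticks do not count as edges here because they are served from state written earlier.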
A representative node in the player is MediaSession: it owns the buffer
level, fills it with newly fetched video, and drains it as playback consumes
seconds.
import regelum as rg


class MediaSession(rg.Node):
    """The plant. Buffer fills with newly fetched video, drains with playback."""

    class Inputs(rg.NodeInputs):
        previous: float = rg.src(lambda: MediaSession.State.buffer_seconds)
        fetched: float = rg.src(Decoder.State.fetched_seconds)

    class State(rg.NodeState):
        buffer_seconds: float = rg.var(init=10.0)

    def update(self, inputs: Inputs) -> State:
        next_buffer = inputs.previous + inputs.fetched - TICK_DT_SECONDS
        return self.State(buffer_seconds=max(0.0, next_buffer))


session = MediaSession()
Let us break down what happens here.
Inputs declares variables the node reads.
previous reads MediaSession.State.buffer_seconds, so this is a
self-read from the previous tick.
fetched reads Decoder.State.fetched_seconds, so MediaSession depends
on Decoder in the play phase.
State declares variables the node writes.
buffer_seconds is initialized with 10.0 because the first tick needs a
buffer value before MediaSession has updated.
Finally, update receives the input snapshot and returns a state variable snapshot:
the node computes the next buffer level and writes it as buffer_seconds.
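The arithmetic behind that update can be checked without the framework. The sketch below copies the Decoder and MediaSession formulas into plain functions (the function names are illustrative, not regelum API) and runs three ticks on the slow 600 kbps link at the top 2160 kbps bitrate.

```python
TICK_DT_SECONDS = 1.0  # same constant as in the listing


def fetched_seconds(bandwidth_kbps: float, bitrate_kbps: int) -> float:
    """Seconds of playable video downloaded in one tick (Decoder formula)."""
    bitrate = max(bitrate_kbps, 1)
    return bandwidth_kbps / bitrate * TICK_DT_SECONDS


def next_buffer(previous: float, fetched: float) -> float:
    """Buffer level after one tick of download and playback (MediaSession formula)."""
    return max(0.0, previous + fetched - TICK_DT_SECONDS)


buffer = 10.0  # the declared initial value of buffer_seconds
trace = []
for _ in range(3):
    buffer = next_buffer(buffer, fetched_seconds(600.0, 2160))
    trace.append(round(buffer, 3))
print(trace)
```

Each tick downloads about 0.278 s of video and plays 1 s, so the buffer loses roughly 0.722 s per tick until the policy reacts.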
Node API
A node in regelum is a Python class inherited from rg.Node.
The class declares the node's API: which variables it reads, which variables
it writes, and how one execution transforms inputs into state variables.
Every non-trivial node should make two namespaces explicit:
- Inputs lists the variables the node reads.
- State lists the variables the node writes.
Those namespaces are ordinary nested Python classes, subclassing
rg.NodeInputs and rg.NodeState.
This means the interface can be declared before any node instance exists:
MediaSession.State.buffer_seconds is a valid state reference at class
declaration time, even before session = MediaSession() is constructed.
class MediaSession(rg.Node):
    class Inputs(rg.NodeInputs):
        fetched: float = rg.src(Decoder.State.fetched_seconds)

    class State(rg.NodeState):
        buffer_seconds: float = rg.var(init=10.0)
The runtime detects these namespaces by base class, not by name.
Inputs and State are the conventional names, and each node may declare
at most one input namespace and at most one state namespace.
Inputs and state references
An input is connected to the state variable it reads.
The common form is rg.src(...), where src points at a state variable:
class Network(rg.Node):
    ...


class Decoder(rg.Node):
    class Inputs(rg.NodeInputs):
        bandwidth_kbps: float = rg.src(Network.State.bandwidth_kbps)
Network.State.bandwidth_kbps is a class-level reference.
It is concise and works when there is exactly one Network producer in the
compiled system.
If multiple instances of the same node class exist, use an instance-bound
reference such as network.State.bandwidth_kbps, or connect the ports after
instantiation.
class Network(rg.Node):
    ...


network_main = Network(name="main")
network_backup = Network(name="backup")


class Decoder(rg.Node):
    class Inputs(rg.NodeInputs):
        bandwidth_kbps: float = rg.src(network_main.State.bandwidth_kbps)
Here Decoder reads from the concrete network_main instance.
The network_backup instance can exist in the same system without making the
reference ambiguous.
A bare input annotation, that is, a typed field with no rg.src(...) assignment, declares an unconnected input.
Unconnected inputs are compile errors unless connected later with post-instantiation connections.
Lazy references
Sometimes the producer cannot be referenced directly at class-body evaluation time. This happens when a node reads its own state variable, or when a producer class is defined later in the file. Use a zero-argument callable for those cases:
class MediaSession(rg.Node):
    class Inputs(rg.NodeInputs):
        previous: float = rg.src(lambda: MediaSession.State.buffer_seconds)
The lambda is not a different kind of connection.
It is a lazy reference: regelum resolves it during compilation, after Python
has finished creating the relevant classes.
It also keeps linters and type checkers from complaining about names that are
not available yet.
Self-referential inputs are how a node carries state across ticks.
Here MediaSession reads the buffer_seconds value it wrote in the previous
tick, then writes the next value in update.
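Why the lambda is needed follows from how Python evaluates class bodies. The sketch below uses a hypothetical `LazyRef` stand-in, not regelum's `rg.src`, to show that a direct self-reference fails while the class is still being created, whereas a zero-argument callable postpones the lookup until someone resolves it.

```python
class LazyRef:
    """Hypothetical stand-in for a lazy source: keeps a callable, resolves on demand."""

    def __init__(self, target):
        self._target = target

    def resolve(self):
        return self._target()


# Eager self-reference: the name EagerSession is not bound until the whole
# class statement has finished, so the nested class body cannot see it.
try:
    class EagerSession:
        class State:
            buffer_seconds = "EagerSession.State.buffer_seconds"

        class Inputs:
            previous = EagerSession.State.buffer_seconds

    eager_failed = False
except NameError:
    eager_failed = True


# Lazy self-reference: the lambda body runs only when resolve() is called,
# by which time the class exists.
class Session:
    class State:
        buffer_seconds = "Session.State.buffer_seconds"

    class Inputs:
        previous = LazyRef(lambda: Session.State.buffer_seconds)


resolved = Session.Inputs.previous.resolve()
print(eager_failed, resolved)
```

The same reasoning covers producers defined later in the file: the name only has to exist at resolution time, not at class-body evaluation time.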
State
State variables are declared in a nested class that subclasses rg.NodeState.
Each state variable is owned by exactly one node, so there is never ambiguity about
who produced a value.
This rules out write/write races by construction.
A bare state variable annotation, such as fetched_seconds: float in Decoder.State, is shorthand for a state variable without an initial value.
Decoder.fetched_seconds can be declared this way because it is produced in
play before MediaSession reads it in the same phase.
Compilation rejects bare state variables that are read before they can be produced.
Post-instantiation connections
Connections do not have to be fully described inside the node class.
You can instantiate nodes first and then link an input to a concrete state variable
with port(...).connect(...).
This is useful when identity matters, especially in multi-instance systems.
session_main = MediaSession(name="main")
session_pip = MediaSession(name="pip")
policy = QualityPolicy()
rg.port(policy.Inputs.buffer_seconds).connect(session_main.State.buffer_seconds)
The port(...) wrapper exposes .connect(...) while preserving the normal
descriptor syntax for type checkers and readers.
Both directions of .connect(...) are accepted as long as one side is an
input and the other side is a state variable:
rg.port(policy.Inputs.buffer_seconds).connect(session_main.State.buffer_seconds)
rg.port(session_main.State.buffer_seconds).connect(policy.Inputs.buffer_seconds)
Input-to-input and state variable-to-state variable connections are errors. If a class-level reference becomes ambiguous because several producers match, the fix is to use an instance-bound reference or an explicit post-instantiation connection.
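The direction rule can be modelled in a few lines. This is a hypothetical sketch of the validation described above, not regelum's implementation: `Port`, `connect`, and the `wiring` dictionary are illustrative names, and the choice of `TypeError` is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Port:
    """Hypothetical port descriptor: kind is either "input" or "state"."""

    owner: str
    name: str
    kind: str


def connect(a: Port, b: Port, wiring: dict[Port, Port]) -> None:
    """Record that an input reads a state variable, whichever side comes first."""
    if {a.kind, b.kind} != {"input", "state"}:
        raise TypeError(f"cannot connect {a.kind} to {b.kind}")
    input_port, state_port = (a, b) if a.kind == "input" else (b, a)
    wiring[input_port] = state_port


policy_in = Port("QualityPolicy", "buffer_seconds", "input")
session_out = Port("main", "buffer_seconds", "state")

forward: dict[Port, Port] = {}
backward: dict[Port, Port] = {}
connect(policy_in, session_out, forward)   # input.connect(state)
connect(session_out, policy_in, backward)  # state.connect(input)

try:
    connect(policy_in, policy_in, {})
    rejected = False
except TypeError:
    rejected = True
print(forward == backward, rejected)
```

Normalising both call orders to the same input-to-state mapping is what makes the two spellings interchangeable.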
Initial values
System state stores state variable values. Inputs read either values produced earlier in the same phase or values already present in state. A state variable needs an initial value when it may be read before the first time its node runs.
buffer_seconds is read by QualityPolicy in decide before
MediaSession writes it in play, so it needs init=10.0.
The same idea applies to persistent state variables such as
Network.bandwidth_kbps, BitrateController.value, and
QualityPolicy.stalling.
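A framework-free sketch of the first tick shows why the initial value matters. The dictionary below is a hypothetical stand-in for system state before play has ever run, and `is_stalling` is a plain-function copy of the QualityPolicy logic from the listing.

```python
STALL_HORIZON_SECONDS = 4.0  # same constant as in the listing


def is_stalling(buffer_seconds: float, bitrate_kbps: int, bandwidth_kbps: float) -> bool:
    """Plain-function copy of the QualityPolicy.update decision."""
    bitrate = max(bitrate_kbps, 1)
    drain = max(0.0, 1.0 - bandwidth_kbps / bitrate)
    if drain <= 0.0:
        return False
    return buffer_seconds / drain < STALL_HORIZON_SECONDS


def decide(state: dict) -> bool:
    """Read the three inputs of QualityPolicy from a state dictionary."""
    return is_stalling(
        state["MediaSession.buffer_seconds"],
        state["BitrateController.value"],
        state["Network.bandwidth_kbps"],
    )


# Hypothetical state during the first decide phase: MediaSession has not run.
state = {"Network.bandwidth_kbps": 600.0, "BitrateController.value": 2160}
try:
    decide(state)
    read_failed = False
except KeyError:
    read_failed = True

# With the declared initial value the read succeeds.
state["MediaSession.buffer_seconds"] = 10.0
full_buffer_decision = decide(state)

# A nearly empty buffer on the same slow link trips the stall horizon.
state["MediaSession.buffer_seconds"] = 2.0
low_buffer_decision = decide(state)
print(read_failed, full_buffer_decision, low_buffer_decision)
```

With 10 s buffered the projected time to empty is about 13.8 s, above the 4 s horizon; with 2 s buffered it is about 2.8 s, so the tick is marked as stalling.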
Initial values can be written in three forms. Use a direct value when the value is static, for example rg.var(init=10.0) for buffer_seconds.
Use a zero-argument callable for fresh mutable objects. This avoids accidentally sharing one list between systems:
class Logger(rg.Node):
    class State(rg.NodeState):
        history: list[Logger.Sample] = rg.var(init=lambda: [])
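The sharing hazard is ordinary Python aliasing. In this minimal sketch, `initial_value` is a hypothetical helper that mimics "use the value as is, or call the factory"; it is not regelum's code.

```python
def initial_value(init):
    """Return init itself, or the result of calling it when it is a factory."""
    return init() if callable(init) else init


# Direct mutable value: both "systems" receive the very same list object.
shared_default = []
system_a_history = initial_value(shared_default)
system_b_history = initial_value(shared_default)
system_a_history.append("tick 0")

# Zero-argument callable: every call builds a fresh list.
fresh_a_history = initial_value(lambda: [])
fresh_b_history = initial_value(lambda: [])
fresh_a_history.append("tick 0")

print(system_b_history, fresh_b_history)
```

The first pair leaks a record from one system into the other, while the factory form keeps the histories independent. Python's own dataclasses enforce the same idea through `default_factory`.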
Use a one-argument callable when the initial value depends on the node
instance.
The argument receives the concrete node instance; self is just the
recommended spelling:
from typing import cast


class MediaSession(rg.Node):
    def __init__(
        self,
        *,
        initial_buffer: float = 10.0,
        name: str | None = None,
    ) -> None:
        super().__init__(name=name)
        self.initial_buffer = initial_buffer

    class State(rg.NodeState):
        buffer_seconds: float = rg.var(
            init=lambda self: cast(MediaSession, self).initial_buffer,
        )


short_start = MediaSession(initial_buffer=2.0)
full_start = MediaSession(initial_buffer=10.0)
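One plausible way to tell the three forms apart is to count the parameters of the callable. The sketch below is an assumption about how such a dispatch could work, not a description of regelum internals; `resolve_init` and `FakeNode` are hypothetical names.

```python
import inspect


def resolve_init(init, node):
    """Resolve an initial value given as a direct value, a factory, or a per-node factory."""
    if not callable(init):
        return init  # direct value
    parameter_count = len(inspect.signature(init).parameters)
    if parameter_count == 0:
        return init()  # fresh object per call
    if parameter_count == 1:
        return init(node)  # value derived from the node instance
    raise TypeError("init callables take zero or one argument")


class FakeNode:
    """Hypothetical node stand-in that only carries configuration."""

    def __init__(self, initial_buffer: float) -> None:
        self.initial_buffer = initial_buffer


short_start = FakeNode(initial_buffer=2.0)

direct = resolve_init(10.0, short_start)
fresh = resolve_init(lambda: [], short_start)
per_node = resolve_init(lambda self: self.initial_buffer, short_start)
print(direct, fresh, per_node)
```

Only the one-argument form can see instance configuration, which is why it is the form to use when two instances of the same class must start from different values.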
Overriding initial state for a run
initial_state can override declared initial values for a single run
without mutating the node declaration.
system.reset(
    initial_state={
        MediaSession.State.buffer_seconds: 5.0,
        BitrateController.State.value: 720,
    }
)
Once a system is instantiated, compilation has already resolved all state variable
paths and can tell you which state variables define the initial state surface.
Use system.compile_report.minimal_initial_state_vars to inspect the state variables
that actually need initial values for this compiled graph, and
system.compile_report.required_initial_state_vars to inspect the state variables
that must be supplied because no declaration provides an initial value.
system = build_system()
report = system.compile_report
print(report.minimal_initial_state_vars)
print(report.required_initial_state_vars)
If required_initial_state_vars is non-empty, provide those values in
initial_state before running or resetting the system.
See Compile report for inspecting the
compiled initial-state requirements and Reset for
the runtime reset behavior.
Run methods
update computes state variables for one execution of the node.
The canonical form receives the input namespace and returns the state
namespace, as in def update(self, inputs: Inputs) -> State.
No-input nodes may write update(self).
Compact nodes may declare inputs directly on update; this is useful for small
nodes with one or two inputs:
class TickCounter(rg.Node):
    class State(rg.NodeState):
        tick: int = rg.var(init=0)

    def update(
        self,
        tick: int = rg.src(lambda: TickCounter.State.tick),
    ) -> State:
        return self.State(tick=tick + 1)
Do not mix compact update inputs with a NodeInputs namespace in the same
node.
System clock inputs
Every rg.PhasedReactiveSystem has a built-in system clock. It is not a
Node, is not listed in any phase, and Clock is a reserved node name.
Read it through rg.Clock.tick and rg.Clock.time:
class Network(rg.Node):
    class Inputs(rg.NodeInputs):
        tick: int = rg.src(rg.Clock.tick)
        time: float = rg.src(rg.Clock.time)
Clock.tick is the integer tick index. Clock.time is the physical time
computed from tick * base_dt for discrete-only systems, or from the latest
continuous integration boundary when the tick contains continuous dynamics.
The clock can also be used in guards, for example
rg.If(rg.V(rg.Clock.tick) >= 10, rg.terminate).
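For the discrete-only case, the relation between the two clock values is plain arithmetic. The sketch below assumes ticks start at 0 and that the guard is checked before each tick runs; both are assumptions made for illustration, and `clock_time` and `run_until` are hypothetical helpers.

```python
def clock_time(tick: int, base_dt: float) -> float:
    """Physical time of a tick in a discrete-only system: tick * base_dt."""
    return tick * base_dt


def run_until(limit_tick: int, base_dt: float) -> list[tuple[int, float]]:
    """Collect (tick, time) pairs until a tick >= limit_tick guard fires."""
    samples = []
    tick = 0  # assumption: the first tick has index 0
    while not tick >= limit_tick:
        samples.append((tick, clock_time(tick, base_dt)))
        tick += 1
    return samples


samples = run_until(limit_tick=10, base_dt=0.5)
print(samples[0], samples[-1], len(samples))
```

With a base step of 0.5 s, the guard at tick 10 stops the run after ten ticks, the last of which starts at 4.5 s.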
Mutating inputs
Inputs are normal Python objects.
If an input value is mutable, update receives a reference to that object.
The video player's Logger intentionally uses this for its own persistent
history: it reads its own previous history, appends one record, and writes
the same list back as the next state variable value.
class Logger(rg.Node):
    class Inputs(rg.NodeInputs):
        history: list[Logger.Sample] = rg.src(lambda: Logger.State.history)

    class State(rg.NodeState):
        history: list[Logger.Sample] = rg.var(init=lambda: [])

    def update(self, inputs: Inputs) -> State:
        # record is the per-tick tuple built from the other inputs,
        # as in the full listing; those inputs are omitted here for brevity.
        inputs.history.append(record)
        return self.State(history=inputs.history)
Even in this in-place pattern, update must still return a State object.
The returned state namespace is the only supported way to commit writes back
to system state.
This pattern is acceptable when the input is a self-read: the node is
mutating its own carried state.
Do not mutate inputs that belong to another node.
That creates hidden side effects outside the state variable contract, and Python
cannot reliably catch it at compile time.
Treat foreign inputs as read-only; mutating them inside update is against the
style guide.
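The hidden side effect is easy to reproduce with plain Python objects. In this sketch the dictionary is a hypothetical stand-in for system state, and the two reader functions play the role of a node that receives another node's list as an input.

```python
def careless_reader(history: list) -> int:
    """Mutates the list it was given, so the owner's state changes silently."""
    history.append("scratch")
    return len(history)


def careful_reader(history: list) -> int:
    """Works on a shallow copy and leaves the owner's list untouched."""
    local = list(history)
    local.append("scratch")
    return len(local)


# Hypothetical system state owned by Logger.
owner_state = {"Logger.history": [(0, 2400.0)]}

careful_reader(owner_state["Logger.history"])
after_careful = list(owner_state["Logger.history"])

careless_reader(owner_state["Logger.history"])
after_careless = list(owner_state["Logger.history"])

print(after_careful)
print(after_careless)
```

The careless version changes Logger's history even though it never returned a State object, which is exactly the kind of write that bypasses the state variable contract.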
Instances and names
Each node instance has a name. The name is used in state paths, compile reports, and snapshots.
If no name is passed, the class name is used.
Implicit duplicates are deduplicated automatically (Plant, Plant_2, ...).
Explicit duplicate names are compile errors.
Custom constructors should forward name to Node.
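The naming rules can be illustrated with a small stand-alone function. Only the `Plant`, `Plant_2` pattern comes from the text above; how later suffixes are chosen and how implicit names interact with explicit ones are assumptions of this sketch, and `assign_names` is a hypothetical helper.

```python
from typing import Optional


def assign_names(requests: list[tuple[str, Optional[str]]]) -> list[str]:
    """Assign a unique name to each (class_name, explicit_name_or_None) request."""
    explicit_names = [name for _, name in requests if name is not None]
    if len(explicit_names) != len(set(explicit_names)):
        raise ValueError("explicit duplicate node names")
    used = set(explicit_names)
    result = []
    for class_name, explicit in requests:
        if explicit is not None:
            result.append(explicit)
            continue
        # Implicit name: start from the class name, then try _2, _3, ...
        candidate, suffix = class_name, 1
        while candidate in used:
            suffix += 1
            candidate = f"{class_name}_{suffix}"
        used.add(candidate)
        result.append(candidate)
    return result


implicit = assign_names([("Plant", None), ("Plant", None), ("Plant", None)])
explicit = assign_names([("Network", "main"), ("Network", "backup")])

try:
    assign_names([("Network", "main"), ("Network", "main")])
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
print(implicit, explicit, duplicate_rejected)
```

Because names appear in state paths and snapshots, giving explicit names to multi-instance nodes keeps those paths stable when instances are added or reordered.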
Rules
- A node class declares behavior and port shape.
- A node instance owns runtime identity and configuration.
- update must always return the node state namespace.
- Node instances, not node classes, are assigned to phases.
- Inputs read state variables.
- State declares the variables the node writes.
- Bare input annotations create unconnected inputs.
- Bare state variable annotations create state variables without initial values.
- Class-level references must resolve to exactly one producer.
- Use instance-bound references or port(...).connect(...) to remove ambiguity.
- Previous-state variables need initial values.
- Intermediate state variables may omit initial values.
- Mutable defaults should use callables.