Create, Compile, and Run a Phased Reactive System
Recap
By this point the video player has three layers of declarations:
- node instances such as
Network,QualityPolicy, andMediaSession; - phases such as
measure,decide,drop_quality, andplay; - transitions that say how one phase hands control to the next.
The running example: an adaptive-bitrate video player
The player samples network bandwidth, decides whether the current bitrate is sustainable, optionally lowers quality, and then plays the next chunk. The buffer and bitrate persist across ticks, so the next tick starts from the state left by the previous one.
flowchart LR
init([init]) --> measure
measure --> decide[decide]
decide -->|healthy| play
decide -->|stalling| drop_quality
drop_quality --> play
play --> done([⊥])
classDef measure fill:#2f6fed22,stroke:#2f6fed;
classDef decide fill:#7c3aed22,stroke:#7c3aed;
classDef dropQuality fill:#d9770622,stroke:#d97706;
classDef play fill:#15803d22,stroke:#15803d;
class measure measure;
class decide decide;
class drop_quality dropQuality;
class play play;
| Phase | Nodes | What happens |
|---|---|---|
| measure | Network |
Sample the current bandwidth from the system clock. |
| decide | QualityPolicy |
Compare projected drain rate against the buffer; set stalling. |
| drop_quality | BitrateController |
Drop the target bitrate by one rung. |
| play | Decoder, MediaSession, Logger |
Compute downloaded seconds, integrate the buffer, log. |
At node level, the same model looks like this.
Solid arrows show that one node reads another node's output; dashed
arrows from state show self-reads, where a node reads its own output from
the previous tick.
The node colors correspond to the phase colors in the table above:
flowchart LR
network["Network"]
policy["QualityPolicy"]
controller["BitrateController"]
decoder["Decoder"]
session["MediaSession"]
logger["Logger"]
controller_state(("state"))
session_state(("state"))
logger_state(("state"))
network --> policy
network --> decoder
network --> logger
controller --> policy
controller --> decoder
controller --> logger
decoder --> session
session --> logger
session --> policy
policy --> logger
controller_state -.-> controller
session_state -.-> session
logger_state -.-> logger
classDef measure fill:#2f6fed22,stroke:#2f6fed;
classDef decide fill:#7c3aed22,stroke:#7c3aed;
classDef dropQuality fill:#d9770622,stroke:#d97706;
classDef play fill:#15803d22,stroke:#15803d;
classDef state fill:#94a3b822,stroke:#94a3b8,stroke-dasharray:3 3;
class network measure;
class policy decide;
class controller dropQuality;
class decoder,session,logger play;
class controller_state,session_state,logger_state state;
Full code listing: examples/video_player.py
"""Adaptive-bitrate video player as a feedback control loop.
Each tick the player asks one question: do I have enough buffered video to
keep playing at the current quality? If yes, the short branch just plays the
next chunk. If no, the long branch first lowers the target bitrate, then
plays. Buffer and bitrate persist across ticks, so the branching pattern is
driven by the closed loop between the network, the buffer, and the policy.
Phase graph::
measure (init) -> decide -+-[healthy]--> play -> bottom
|
+-[stalling]-> drop_quality -> play -> bottom
"""
from __future__ import annotations
from regelum import (
Clock,
Else,
Goto,
If,
Input,
Node,
NodeInputs,
NodeOutputs,
Output,
Phase,
PhasedReactiveSystem,
V,
terminate,
)
TICK_DT_SECONDS = 1.0
BITRATE_LADDER_KBPS = (240, 480, 720, 1080, 2160)
TOP_BITRATE_KBPS = BITRATE_LADDER_KBPS[-1]
STALL_HORIZON_SECONDS = 4.0
class Network(Node):
"""Stochastic-looking but deterministic bandwidth model.
Drops to a slow link in the middle of the run so the policy has to react,
then recovers so the buffer can refill. The schedule is fixed to keep the
example reproducible.
"""
class Inputs(NodeInputs):
tick: int = Input(source=Clock.tick)
class Outputs(NodeOutputs):
bandwidth_kbps: float = Output(initial=float(TOP_BITRATE_KBPS))
def run(self, inputs: Inputs) -> Outputs:
if inputs.tick < 6:
value = 2400.0
elif inputs.tick < 14:
value = 600.0
elif inputs.tick < 22:
value = 1100.0
else:
value = 2400.0
return self.Outputs(bandwidth_kbps=value)
class QualityPolicy(Node):
"""Decides whether the player is about to stall.
The estimated drain rate is ``1 - bandwidth / bitrate`` seconds of video
lost per wall-second. If buffered seconds will not survive
``STALL_HORIZON_SECONDS`` at that drain rate, mark the tick as stalling.
"""
class Inputs(NodeInputs):
buffer_seconds: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)
bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)
class Outputs(NodeOutputs):
stalling: bool = Output(initial=False)
def run(self, inputs: Inputs) -> Outputs:
bitrate = max(inputs.bitrate_kbps, 1)
drain = max(0.0, 1.0 - inputs.bandwidth_kbps / bitrate)
if drain <= 0.0:
return self.Outputs(stalling=False)
time_to_empty = inputs.buffer_seconds / drain
return self.Outputs(stalling=time_to_empty < STALL_HORIZON_SECONDS)
class BitrateController(Node):
"""Owns the current target bitrate. Drops one rung when invoked."""
class Inputs(NodeInputs):
current: int = Input(source=lambda: BitrateController.Outputs.value)
class Outputs(NodeOutputs):
value: int = Output(initial=TOP_BITRATE_KBPS)
def run(self, inputs: Inputs) -> Outputs:
try:
index = BITRATE_LADDER_KBPS.index(inputs.current)
except ValueError:
index = len(BITRATE_LADDER_KBPS) - 1
next_index = max(0, index - 1)
return self.Outputs(value=BITRATE_LADDER_KBPS[next_index])
class Decoder(Node):
"""Models how many seconds of video can be downloaded in one tick.
With ``bandwidth_kbps`` of throughput and a video encoded at
``bitrate_kbps``, one wall-second of downloading produces
``bandwidth / bitrate`` seconds of playable content.
"""
class Inputs(NodeInputs):
bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)
bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)
class Outputs(NodeOutputs):
fetched_seconds: float
def run(self, inputs: Inputs) -> Outputs:
bitrate = max(inputs.bitrate_kbps, 1)
return self.Outputs(fetched_seconds=inputs.bandwidth_kbps / bitrate * TICK_DT_SECONDS)
class MediaSession(Node):
"""The plant. Buffer fills with newly fetched video, drains with playback."""
class Inputs(NodeInputs):
previous: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
fetched: float = Input(source=Decoder.Outputs.fetched_seconds)
class Outputs(NodeOutputs):
buffer_seconds: float = Output(initial=10.0)
def run(self, inputs: Inputs) -> Outputs:
next_buffer = inputs.previous + inputs.fetched - TICK_DT_SECONDS
return self.Outputs(buffer_seconds=max(0.0, next_buffer))
class Logger(Node):
"""Appends a per-tick record so the trajectory is visible after the run."""
Sample = tuple[int, float, int, float, bool]
class Inputs(NodeInputs):
tick: int = Input(source=Clock.tick)
bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)
bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)
buffer_seconds: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
stalling: bool = Input(source=QualityPolicy.Outputs.stalling)
history: list["Logger.Sample"] = Input(source=lambda: Logger.Outputs.history)
class Outputs(NodeOutputs):
history: list["Logger.Sample"] = Output(initial=lambda: [])
def run(self, inputs: Inputs) -> Outputs:
record: Logger.Sample = (
inputs.tick,
inputs.bandwidth_kbps,
inputs.bitrate_kbps,
inputs.buffer_seconds,
inputs.stalling,
)
inputs.history.append(record)
return self.Outputs(history=inputs.history)
def build_system() -> PhasedReactiveSystem:
network = Network()
policy = QualityPolicy()
controller = BitrateController()
decoder = Decoder()
session = MediaSession()
logger = Logger()
return PhasedReactiveSystem(
phases=[
Phase(
"measure",
nodes=(network,),
transitions=(Goto("decide"),),
is_initial=True,
),
Phase(
"decide",
nodes=(policy,),
transitions=(
If(V(policy.Outputs.stalling), "drop_quality", name="stalling"),
Else("play", name="healthy"),
),
),
Phase(
"drop_quality",
nodes=(controller,),
transitions=(Goto("play"),),
),
Phase(
"play",
nodes=(decoder, session, logger),
transitions=(Goto(terminate),),
),
],
)
def main() -> None:
system = build_system()
print(f"compile_ok = {system.compile_report.ok}")
print(
"phase schedules: "
+ " | ".join(
f"{name}={schedule}" for name, schedule in system.compile_report.phase_schedules.items()
)
)
print()
print("tick | bw(kbps) | bitrate | buffer(s) | stall? | path")
print("-----+----------+---------+-----------+--------+----------------------")
for _ in range(30):
records = system.step()
path_phases: list[str] = []
for record in records:
if not path_phases or path_phases[-1] != record.phase:
path_phases.append(record.phase)
path = " -> ".join(path_phases)
snapshot = system.snapshot()
print(
f"{system.read(Clock.tick):4d} | "
f"{snapshot['Network.bandwidth_kbps']:8.0f} | "
f"{snapshot['BitrateController.value']:7d} | "
f"{snapshot['MediaSession.buffer_seconds']:9.2f} | "
f"{str(snapshot['QualityPolicy.stalling']):>6} | "
f"{path}"
)
if __name__ == "__main__":
main()
Those declarations are still just Python objects.
To make them executable, wrap the phase list in rg.PhasedReactiveSystem.
System creation compiles the declarations into a concrete runtime model:
references are resolved, nodes are scheduled, transition targets are checked,
and the initial-state requirements are computed.
import regelum as rg
def build_system() -> rg.PhasedReactiveSystem:
network = Network()
policy = QualityPolicy()
controller = BitrateController()
decoder = Decoder()
session = MediaSession()
logger = Logger()
return rg.PhasedReactiveSystem(
phases=[
rg.Phase(
"measure",
nodes=(network,),
transitions=(rg.Goto("decide"),),
is_initial=True,
),
rg.Phase(
"decide",
nodes=(policy,),
transitions=(
rg.If(
rg.V(policy.Outputs.stalling),
"drop_quality",
name="stalling",
),
rg.Else("play", name="healthy"),
),
),
rg.Phase(
"drop_quality",
nodes=(controller,),
transitions=(rg.Goto("play"),),
),
rg.Phase(
"play",
nodes=(decoder, session, logger),
transitions=(rg.Goto(rg.terminate),),
),
],
)
rg.PhasedReactiveSystem is the object you keep and run.
It owns the compiled phase graph, the current runtime state, and the compile
report.
Creating a system
The constructor receives the phases that define the system.
During construction, regelum compiles the system.
If compilation succeeds, the returned object is ready for step(), run(),
snapshot(), read(...), and reset(...).
If compilation fails, the default behavior is to raise CompileError.
Use strict=False when you want to inspect a broken system instead of raising
immediately:
system = rg.PhasedReactiveSystem(
phases=phases,
strict=False,
)
print(system.compile_report.issues)
What compilation resolves
Compilation resolves:
- node names;
- input sources;
- instance connections;
- output paths;
- phase targets;
- guard references;
- phase schedules;
- dependency edges;
- required initial outputs.
For the video player, the report's phase_schedules shows the topologically
ordered nodes per phase and minimal_initial_outputs lists the six outputs
that need initial state
(Network.bandwidth_kbps, BitrateController.value,
QualityPolicy.stalling, MediaSession.buffer_seconds, Logger.history).
Compile report
Every rg.PhasedReactiveSystem stores a compile_report.
Read it before debugging runtime behavior; it tells you what the constructor
resolved and what it rejected.
system = build_system()
report = system.compile_report
print(report.ok)
print(report.issues)
print(report.warnings)
print(report.phase_schedules)
print(report.minimal_initial_outputs)
print(report.required_initial_outputs)
minimal_initial_outputs is the smallest set of outputs that must have a
tick-zero value for this compiled graph.
Those values may come from rg.Output(initial=...), from a callable
initializer, or from a runtime initial_state override.
required_initial_outputs is the subset that still has no declared initial
value.
If it is non-empty, the system is telling you exactly which outputs must be
provided before execution.
You can use that list to build an initial_state mapping:
missing = system.compile_report.required_initial_outputs
print(missing)
system.reset(
initial_state={
MediaSession.Outputs.buffer_seconds: 5.0,
BitrateController.Outputs.value: 720,
}
)
In the video player, required_initial_outputs is empty because every
required tick-zero output has either a static initial value or a callable
initializer.
Use format() for a compact text report.
Common compile issues
Typical issues include:
- input source is not connected;
- input source is unknown;
- class-level reference is ambiguous;
- output path is duplicated;
- explicit node names are duplicated;
- output without initial value is read too early;
- phase graph is incomplete;
- transition target is unknown;
- transition chain is malformed.
C1 and C3 checks
Compilation rejects cyclic dependency graphs inside a phase.
This is the C1 check.
For the video player, the only non-trivial intra-phase dependency chain is
Decoder → MediaSession → Logger in play, which is acyclic.
For finite output domains, compilation also checks C3 by requiring exactly one
enabled transition per sampled state.
The branching in decide is If(V(QualityPolicy.Outputs.stalling),
"drop_quality") plus Else("play"), with stalling: bool — boolean has a
finite domain, so C3 is verified statically.
Runtime
After construction, the same rg.PhasedReactiveSystem object is the runtime
handle.
Runtime executes the compiled phase schedules and updates system state.
It does not reinterpret declarations on every step.
The tick
A tick walks the phase graph from the initial phase until a transition reaches
terminate.
For the video player, a healthy tick visits measure → decide → play; a
stalling tick visits measure → decide → drop_quality → play.
The feedback loop closes between ticks: MediaSession.buffer_seconds written
in play of tick N is read by QualityPolicy in decide of tick N+1, and
that read is what selects the branch.
Every system also has a built-in rg.Clock.
Clock.tick is incremented after the whole tick terminates.
In a discrete-only system, Clock.time advances with the tick by base_dt.
When a tick contains a continuous phase, Clock.time advances immediately
after that continuous phase, so later phases in the same tick can observe the
new physical time.
See Continuous dynamics for the ODE resolution rules.
Step order
One step() starts at the initial phase and follows transitions until the
tick terminates.
For each phase:
- run active nodes in the compiled schedule;
- build each node input namespace;
- call
run; - normalize returned outputs;
- write outputs into state;
- choose the next phase from transitions.
Each record contains the phase, node, inputs, and outputs. A 30-tick run of the player produces records like:
for record in records:
print(record.phase, record.node, record.outputs)
# measure Network {'bandwidth_kbps': 600.0}
# decide QualityPolicy {'stalling': False}
# play Decoder {'fetched_seconds': 0.278}
# play MediaSession {'buffer_seconds': 9.11}
# play Logger {'history': [...]}
Running multiple ticks
Use run(steps=...) to execute several ticks.
Each tick starts from the initial phase again.
State persists across ticks unless reset() is called.
State access
Use snapshot() to inspect current state.
It returns user node outputs and committed ODE state values; system clock
fields are read explicitly.
snapshot = system.snapshot()
print(snapshot["MediaSession.buffer_seconds"])
print(snapshot["BitrateController.value"])
Use read(...) when code has an output reference.
buffer = system.read(session.Outputs.buffer_seconds)
tick = system.read(rg.Clock.tick)
time = system.read(rg.Clock.time)
Reset
reset() clears runtime state and history.
It then applies declared initial values and optional overrides.
Logging nodes
A logger is just another node.
It sees the values available at the point where its scheduled phase runs.
The video player puts Logger last in play so it observes the buffer
update from MediaSession and the freshly committed bitrate.
Rules
- Read the compile report before debugging runtime behavior.
- Create systems with
rg.PhasedReactiveSystem(phases=[...]). - Use
strict=Falsefor diagnostics. - Resolve ambiguous class references with instance connections.
- Add initial values only for outputs that must exist before execution.
- Runtime follows compiled phase schedules.
- State persists between ticks.
reset()clears state and history.step()returns execution records.snapshot()returns user-visible state; useread(rg.Clock.time)for clock fields.