Skip to content

Phases

Recap

In the overview we took the adaptive-bitrate video player and cut its feedback loop into ticks. Each tick enters the phase graph at init, visits one or more named phases, and leaves at .

The running example: an adaptive-bitrate video player

The player samples network bandwidth, decides whether the current bitrate is sustainable, optionally lowers quality, and then plays the next chunk. The buffer and bitrate persist across ticks, so the next tick starts from the state left by the previous one.

flowchart LR
    init([init]) --> measure
    measure --> decide[decide]
    decide -->|healthy| play
    decide -->|stalling| drop_quality
    drop_quality --> play
    play --> done([⊥])

    classDef measure fill:#2f6fed22,stroke:#2f6fed;
    classDef decide fill:#7c3aed22,stroke:#7c3aed;
    classDef dropQuality fill:#d9770622,stroke:#d97706;
    classDef play fill:#15803d22,stroke:#15803d;

    class measure measure;
    class decide decide;
    class drop_quality dropQuality;
    class play play;

The graph has four phases:

Phase Nodes What happens
measure Network Sample the current bandwidth from the system clock.
decide QualityPolicy Compare projected drain rate against the buffer; set stalling.
drop_quality BitrateController Drop the target bitrate by one rung.
play Decoder, MediaSession, Logger Compute downloaded seconds, integrate the buffer, log.

At node level, the same model looks like this. Solid arrows show that one node reads another node's output; dashed arrows from state show self-reads, where a node reads its own output from the previous tick. The node colors correspond to the phase colors in the table above:

flowchart LR
    network["Network"]
    policy["QualityPolicy"]
    controller["BitrateController"]
    decoder["Decoder"]
    session["MediaSession"]
    logger["Logger"]
    controller_state(("state"))
    session_state(("state"))
    logger_state(("state"))

    network --> policy
    network --> decoder
    network --> logger
    controller --> policy
    controller --> decoder
    controller --> logger
    decoder --> session
    session --> logger
    session --> policy
    policy --> logger
    controller_state -.-> controller
    session_state -.-> session
    logger_state -.-> logger

    classDef measure fill:#2f6fed22,stroke:#2f6fed;
    classDef decide fill:#7c3aed22,stroke:#7c3aed;
    classDef dropQuality fill:#d9770622,stroke:#d97706;
    classDef play fill:#15803d22,stroke:#15803d;
    classDef state fill:#94a3b822,stroke:#94a3b8,stroke-dasharray:3 3;

    class network measure;
    class policy decide;
    class controller dropQuality;
    class decoder,session,logger play;
    class controller_state,session_state,logger_state state;
Full code listing: examples/video_player.py
"""Adaptive-bitrate video player as a feedback control loop.

Each tick the player asks one question: do I have enough buffered video to
keep playing at the current quality?  If yes, the short branch just plays the
next chunk.  If no, the long branch first lowers the target bitrate, then
plays.  Buffer and bitrate persist across ticks, so the branching pattern is
driven by the closed loop between the network, the buffer, and the policy.

Phase graph::

    measure (init) -> decide -+-[healthy]--> play -> bottom
                              |
                              +-[stalling]-> drop_quality -> play -> bottom
"""

from __future__ import annotations

from regelum import (
    Clock,
    Else,
    Goto,
    If,
    Input,
    Node,
    NodeInputs,
    NodeOutputs,
    Output,
    Phase,
    PhasedReactiveSystem,
    V,
    terminate,
)

TICK_DT_SECONDS = 1.0
BITRATE_LADDER_KBPS = (240, 480, 720, 1080, 2160)
TOP_BITRATE_KBPS = BITRATE_LADDER_KBPS[-1]
STALL_HORIZON_SECONDS = 4.0


class Network(Node):
    """Stochastic-looking but deterministic bandwidth model.

    Drops to a slow link in the middle of the run so the policy has to react,
    then recovers so the buffer can refill.  The schedule is fixed to keep the
    example reproducible.
    """

    class Inputs(NodeInputs):
        tick: int = Input(source=Clock.tick)

    class Outputs(NodeOutputs):
        bandwidth_kbps: float = Output(initial=float(TOP_BITRATE_KBPS))

    def run(self, inputs: Inputs) -> Outputs:
        if inputs.tick < 6:
            value = 2400.0
        elif inputs.tick < 14:
            value = 600.0
        elif inputs.tick < 22:
            value = 1100.0
        else:
            value = 2400.0
        return self.Outputs(bandwidth_kbps=value)


class QualityPolicy(Node):
    """Decides whether the player is about to stall.

    The estimated drain rate is ``1 - bandwidth / bitrate`` seconds of video
    lost per wall-second.  If buffered seconds will not survive
    ``STALL_HORIZON_SECONDS`` at that drain rate, mark the tick as stalling.
    """

    class Inputs(NodeInputs):
        buffer_seconds: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)

    class Outputs(NodeOutputs):
        stalling: bool = Output(initial=False)

    def run(self, inputs: Inputs) -> Outputs:
        bitrate = max(inputs.bitrate_kbps, 1)
        drain = max(0.0, 1.0 - inputs.bandwidth_kbps / bitrate)
        if drain <= 0.0:
            return self.Outputs(stalling=False)
        time_to_empty = inputs.buffer_seconds / drain
        return self.Outputs(stalling=time_to_empty < STALL_HORIZON_SECONDS)


class BitrateController(Node):
    """Owns the current target bitrate.  Drops one rung when invoked."""

    class Inputs(NodeInputs):
        current: int = Input(source=lambda: BitrateController.Outputs.value)

    class Outputs(NodeOutputs):
        value: int = Output(initial=TOP_BITRATE_KBPS)

    def run(self, inputs: Inputs) -> Outputs:
        try:
            index = BITRATE_LADDER_KBPS.index(inputs.current)
        except ValueError:
            index = len(BITRATE_LADDER_KBPS) - 1
        next_index = max(0, index - 1)
        return self.Outputs(value=BITRATE_LADDER_KBPS[next_index])


class Decoder(Node):
    """Models how many seconds of video can be downloaded in one tick.

    With ``bandwidth_kbps`` of throughput and a video encoded at
    ``bitrate_kbps``, one wall-second of downloading produces
    ``bandwidth / bitrate`` seconds of playable content.
    """

    class Inputs(NodeInputs):
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)

    class Outputs(NodeOutputs):
        fetched_seconds: float

    def run(self, inputs: Inputs) -> Outputs:
        bitrate = max(inputs.bitrate_kbps, 1)
        return self.Outputs(fetched_seconds=inputs.bandwidth_kbps / bitrate * TICK_DT_SECONDS)


class MediaSession(Node):
    """The plant.  Buffer fills with newly fetched video, drains with playback."""

    class Inputs(NodeInputs):
        previous: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        fetched: float = Input(source=Decoder.Outputs.fetched_seconds)

    class Outputs(NodeOutputs):
        buffer_seconds: float = Output(initial=10.0)

    def run(self, inputs: Inputs) -> Outputs:
        next_buffer = inputs.previous + inputs.fetched - TICK_DT_SECONDS
        return self.Outputs(buffer_seconds=max(0.0, next_buffer))


class Logger(Node):
    """Appends a per-tick record so the trajectory is visible after the run."""

    Sample = tuple[int, float, int, float, bool]

    class Inputs(NodeInputs):
        tick: int = Input(source=Clock.tick)
        bandwidth_kbps: float = Input(source=Network.Outputs.bandwidth_kbps)
        bitrate_kbps: int = Input(source=lambda: BitrateController.Outputs.value)
        buffer_seconds: float = Input(source=lambda: MediaSession.Outputs.buffer_seconds)
        stalling: bool = Input(source=QualityPolicy.Outputs.stalling)
        history: list["Logger.Sample"] = Input(source=lambda: Logger.Outputs.history)

    class Outputs(NodeOutputs):
        history: list["Logger.Sample"] = Output(initial=lambda: [])

    def run(self, inputs: Inputs) -> Outputs:
        record: Logger.Sample = (
            inputs.tick,
            inputs.bandwidth_kbps,
            inputs.bitrate_kbps,
            inputs.buffer_seconds,
            inputs.stalling,
        )
        inputs.history.append(record)
        return self.Outputs(history=inputs.history)


def build_system() -> PhasedReactiveSystem:
    network = Network()
    policy = QualityPolicy()
    controller = BitrateController()
    decoder = Decoder()
    session = MediaSession()
    logger = Logger()

    return PhasedReactiveSystem(
        phases=[
            Phase(
                "measure",
                nodes=(network,),
                transitions=(Goto("decide"),),
                is_initial=True,
            ),
            Phase(
                "decide",
                nodes=(policy,),
                transitions=(
                    If(V(policy.Outputs.stalling), "drop_quality", name="stalling"),
                    Else("play", name="healthy"),
                ),
            ),
            Phase(
                "drop_quality",
                nodes=(controller,),
                transitions=(Goto("play"),),
            ),
            Phase(
                "play",
                nodes=(decoder, session, logger),
                transitions=(Goto(terminate),),
            ),
        ],
    )


def main() -> None:
    system = build_system()
    print(f"compile_ok = {system.compile_report.ok}")
    print(
        "phase schedules: "
        + " | ".join(
            f"{name}={schedule}" for name, schedule in system.compile_report.phase_schedules.items()
        )
    )
    print()
    print("tick | bw(kbps) | bitrate | buffer(s) | stall? | path")
    print("-----+----------+---------+-----------+--------+----------------------")
    for _ in range(30):
        records = system.step()
        path_phases: list[str] = []
        for record in records:
            if not path_phases or path_phases[-1] != record.phase:
                path_phases.append(record.phase)
        path = " -> ".join(path_phases)
        snapshot = system.snapshot()
        print(
            f"{system.read(Clock.tick):4d} | "
            f"{snapshot['Network.bandwidth_kbps']:8.0f} | "
            f"{snapshot['BitrateController.value']:7d} | "
            f"{snapshot['MediaSession.buffer_seconds']:9.2f} | "
            f"{str(snapshot['QualityPolicy.stalling']):>6} | "
            f"{path}"
        )


if __name__ == "__main__":
    main()

This page zooms in on how those phases are declared: which node instances belong to each phase, where a tick starts, how nodes inside a phase are scheduled, and how transitions choose the next phase.

What a phase is

A phase is defined by two things:

  • nodes — the node instances that are active together;
  • transitions — the outgoing edges that choose the next phase, or terminate the current tick.

The nodes inside one phase must form a DAG with respect to their input/output dependencies. This is what lets the compiler resolve a deterministic topological order for the phase. Across the whole system there must also be exactly one initial phase, so every tick has a single unambiguous entry point.

When a tick runs, it starts at the initial phase, executes that phase's nodes, and then evaluates the phase's transitions. Exactly one effective transition should be selected: zero true transitions means the tick has nowhere to go, and more than one true transition means the next phase is ambiguous. With symbolic predicates, regelum can inspect the phase graph during compilation and report structural problems before runtime where possible.

The video player has four phases — measure, decide, drop_quality, and play. measure and play carry several nodes each; decide and drop_quality each hold one.

Phase API

A phase declaration has three main parts:

  • a name, such as "measure" or "play";
  • nodes, a tuple of node instances that run together in that phase;
  • transitions, a tuple of transition rules that choose what happens after the phase has executed.

It may also set is_initial=True. The initial phase is where every tick enters the phase graph. For the player, that is measure: a tick begins by sampling the network before any decision is taken.

Phase names must be unique inside one PhasedReactiveSystem, because transitions target phases by name. Node instances also have runtime names, either explicit or derived by regelum, and those names must be unique in the compiled system so diagnostics and state paths are unambiguous.

Exactly one initial phase

Exactly one phase in a PhasedReactiveSystem must be marked is_initial=True. There is no fallback to the first phase, and more than one initial phase would make tick entry ambiguous.

All nodes live in phases

Every node instance that belongs to the system must be listed in at least one phase. The system's node set is derived from phase declarations. The same node instance may be reused in multiple phases when it should run in more than one part of the tick.

rg.Phase(
    "measure",
    nodes=(network,),
    transitions=(rg.Goto("decide"),),
    is_initial=True,
)

Here nodes=(network,) is the set of node instances active in the measure phase. transitions=(rg.Goto("decide"),) is the phase's transition rule set: in this case, the rule is unconditional and sends control to decide after measure finishes.

Transitions

Transitions choose the next phase after the current phase has executed. They may also terminate the current tick.

The video player uses symbolic predicates for the branching decision, plus unconditional Goto transitions for the linear segments and Goto(terminate) to end the tick.

Goto

Goto is an unconditional transition. It says where control goes after the current phase has finished. There is no predicate to evaluate: if the phase reaches this transition, the transition is taken.

Pass one of three target forms to Goto:

  • a phase name, such as "decide";
  • a phase instance, when you already have the target object;
  • rg.terminate, which ends the current tick.
rg.Phase(
    "measure",
    nodes=(network,),
    transitions=(rg.Goto("decide"),),  # (1)
    is_initial=True,
)

rg.Phase(
    "play",
    nodes=(decoder, session, logger),
    transitions=(rg.Goto(rg.terminate),),  # (2)
)
  1. rg.Goto("decide") sends control to the phase named "decide" after measure has executed. The same target could also be passed as a phase instance when the object is already available.
  2. rg.Goto(rg.terminate) does not choose another phase. It marks the current tick as complete.

Branch chains

For branching, regelum uses ordinary if / elif / else semantics. If starts a branch chain. Elif continues the current chain. Else closes the current chain and is taken only when none of the previous predicates in that chain were true. ElseIf is also available as an alias for Elif.

Every If and Elif transition takes a predicate as its first argument. This predicate is the transition's guard. A guard is a boolean condition over system variables: after the current phase has executed, it evaluates to either True or False. If the guard is True, that transition is enabled.

rg.If(predicate, target, name=None)
rg.Elif(predicate, target, name=None)
  • target is the phase to enter when the guard is true: a phase name, a phase instance, or rg.terminate;
  • name is optional and gives the transition a stable diagnostic label.

The point of the guard is to make the outgoing edge explicit. For the video player, the question is: did the policy detect that playback is stalling? If yes, the tick should enter drop_quality. If no, the tick should continue to play.

The video player only needs If plus Else, because the policy publishes a single boolean:

rg.Phase(
    "decide",
    nodes=(policy,),
    transitions=(
        rg.If(rg.V(policy.Outputs.stalling), "drop_quality", name="stalling"),
        rg.Else("play", name="healthy"),
    ),
)

policy.Outputs.stalling is an output reference, not the current boolean value. rg.V(policy.Outputs.stalling) means: read this output value when the transition is evaluated.

Expressions that contain rg.V(...) build symbolic guards under the hood. Use this form for transition guards by default: if a guard depends on a node output, wrap that output reference with rg.V(...). Symbolic guards can also compare output values or terminate a tick:

rg.If(rg.V(MediaSession.Outputs.buffer_seconds) < 2.0, "buffer_warning")
rg.If(rg.V(policy.Outputs.force_stop), rg.terminate, name="force_stop")

Keeping guards symbolic is what lets regelum reason about transitions during graph compilation instead of treating them as opaque Python code. The compiler can see which outputs the guard depends on and can check whether the transition graph has structural problems, such as an ambiguous next phase or a path that may fail to terminate where this can be proven. Under the hood, this analysis is backed by Z3, an SMT solver.

V(...) accepts the same kinds of output references as inputs:

  • output descriptors, such as rg.V(MediaSession.Outputs.buffer_seconds);
  • instance-bound output ports, such as rg.V(session.Outputs.buffer_seconds);
  • lazy callables, such as rg.V(lambda: MediaSession.Outputs.buffer_seconds);
  • string references, such as rg.V("MediaSession.buffer_seconds").

Enum outputs can be compared directly inside symbolic predicates:

from enum import Enum


class PlaybackMode(Enum):
    PLAYING = "playing"
    PAUSED = "paused"


rg.If(rg.V(state.Outputs.mode) == PlaybackMode.PLAYING, "play")

Python callables, including lambdas, can also be used as predicates when the guard cannot be expressed symbolically:

rg.If(lambda state: state.buffer_seconds < 2.0, "buffer_warning")

Callable predicates are an escape hatch. Prefer symbolic V(...) predicates when possible, because they keep guard dependencies visible to the compiler and produce clearer diagnostics.

A larger system might extend the chain with Elif:

transitions = (
    rg.If(rg.V(policy.Outputs.stalling), "drop_quality"),
    rg.Elif(rg.V(policy.Outputs.healthy_steady), "upgrade_quality"),
    rg.Else("play"),
)

Elif after Else is invalid, because Else closes the current chain.

A phase may contain several independent If chains. Compilation checks that the transition structure is well formed, and runtime evaluates effective transitions in order.

For example, this transition tuple contains three chains: a single If, an If/Elif chain, and another single If.

transitions = (
    rg.If(rg.V(network.Outputs.disconnected), "pause", name="offline"),
    rg.If(rg.V(policy.Outputs.stalling), "drop_quality", name="stalling"),
    rg.Elif(rg.V(policy.Outputs.can_upgrade), "upgrade_quality", name="upgrade"),
    rg.If(rg.V(logger.Outputs.should_flush), "flush_logs", name="flush_logs"),
)

Each If starts a chain. Each Elif continues the chain that immediately precedes it. If guards are written with rg.V(...), the compiler can see the guard dependencies and verify the transition structure. At execution time, the effective transitions should resolve to exactly one selected next step; zero or multiple selected transitions make the next phase ambiguous.

Avoid multiple Else branches in one transition tuple

A tuple like this is hard to reason about:

transitions = (
    rg.If(rg.V(network.Outputs.disconnected), "pause", name="offline"),
    rg.Else("decide_quality", name="online"),

    rg.If(rg.V(policy.Outputs.stalling), "drop_quality", name="stalling"),
    rg.Else("play", name="healthy"),
)

Each Else closes the chain immediately before it, but multiple Else branches in one phase usually hide the control flow rather than clarify it. Prefer explicit If / Elif chains and keep the phase's transition rules easy to audit.

Rules

  • Phases list node instances, not classes.
  • A system is defined by its phases.
  • Exactly one initial phase is expected.
  • Each phase is scheduled topologically.
  • Phase coverage must include input and guard producers.
  • Use Goto for unconditional jumps.
  • Use If / Elif / Else for ordered branches.
  • Use rg.terminate to end a tick.
  • Prefer symbolic V(...) predicates.