Skip to content

orchestrator

View module diagram

Orchestrator package — async graph scheduling over task and workstream runtimes.

Re-exports all public names so that `from agentrelay.orchestrator import X` continues to work after the promotion from a single module to a package.

TaskRuntimeBuilder

Builder for initializing per-task runtime envelopes from a task graph.

Source code in src/agentrelay/orchestrator/builders.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class TaskRuntimeBuilder:
    """Factory for the per-task runtime envelopes of a task graph.

    Both constructors return a plain ``dict`` keyed by task ID whose
    insertion order follows ``graph.task_ids()``.
    """

    @classmethod
    def from_graph(cls, graph: TaskGraph) -> dict[str, TaskRuntime]:
        """Create a fresh runtime for every task in ``graph``.

        Each runtime is constructed from its task spec alone, so its mutable
        state/artifacts start at their defaults and no agent is attached.
        Entries appear in the graph's stable topological task order.

        Args:
            graph: Validated immutable task graph.

        Returns:
            Task runtimes keyed by task ID.
        """
        return {
            task_id: TaskRuntime(task=graph.task(task_id))
            for task_id in graph.task_ids()
        }

    @classmethod
    def from_probe(cls, graph: TaskGraph, probe: GraphProbe) -> dict[str, TaskRuntime]:
        """Create task runtimes seeded with state recovered from disk.

        For every task whose signal directory exists, the runtime's
        :class:`TaskState` receives the probe's ``signal_dir``,
        ``branch_name`` and ``attempt_num``, and its :class:`TaskArtifacts`
        receives ``pr_url``.  A task without a signal directory never
        started and keeps the defaults :meth:`from_graph` would give it.

        Deliberately left untouched:

        * ``integration_branch`` and ``workstream_worktree_path`` on
          ``TaskState`` — the orchestrator fills these in at dispatch time
          from the corresponding workstream runtime.
        * The transient per-attempt fields ``agent_address``, ``sandbox``
          and ``sandbox_context`` — they belong to a live agent session
          and are discarded across orchestrator restarts.

        NOTE(review): the probe's ``status`` and ``resolved`` fields are not
        copied by this method; confirm they are consumed elsewhere.

        Args:
            graph: Validated immutable task graph.
            probe: Result of :func:`probe_graph_state` for the same graph.

        Returns:
            Task runtimes keyed by task ID.

        Raises:
            KeyError: If ``probe.task_probes`` lacks an entry for a task ID
                present in ``graph``.
        """
        recovered: dict[str, TaskRuntime] = {}
        for task_id in graph.task_ids():
            task = graph.task(task_id)
            snapshot = probe.task_probes[task_id]
            envelope = TaskRuntime(task=task)
            recovered[task_id] = envelope
            if not snapshot.signal_dir.is_dir():
                # Never started: keep the default state and artifacts.
                continue
            state = envelope.state
            state.signal_dir = snapshot.signal_dir
            state.branch_name = snapshot.branch_name
            state.attempt_num = snapshot.attempt_num
            envelope.artifacts.pr_url = snapshot.pr_url
        return recovered

from_graph(graph) classmethod

Build initial runtimes for all tasks in a graph.

Runtimes are returned in the graph's stable topological task order. Each runtime starts with default mutable state/artifacts and no agent.

Parameters:

Name Type Description Default
graph TaskGraph

Validated immutable task graph.

required

Returns:

Type Description
dict[str, TaskRuntime]

Task runtimes keyed by task ID.

Source code in src/agentrelay/orchestrator/builders.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
@classmethod
def from_graph(cls, graph: TaskGraph) -> dict[str, TaskRuntime]:
    """Create a fresh runtime for every task in ``graph``.

    Each runtime is constructed from its task spec alone, so its mutable
    state/artifacts start at their defaults and no agent is attached.
    Entries appear in the graph's stable topological task order.

    Args:
        graph: Validated immutable task graph.

    Returns:
        Task runtimes keyed by task ID.
    """
    return {
        task_id: TaskRuntime(task=graph.task(task_id))
        for task_id in graph.task_ids()
    }

from_probe(graph, probe) classmethod

Build runtimes pre-populated from a disk probe.

Each runtime's `TaskState` and `TaskArtifacts` are populated from the corresponding `TaskProbe` when the task's signal directory exists on disk. Tasks that never started (no signal dir) get fresh default state equivalent to `from_graph`.

integration_branch and workstream_worktree_path on TaskState are not set here — the orchestrator sets those at dispatch time from the corresponding workstream runtime. Transient per-attempt fields (agent_address, sandbox, sandbox_context) are also intentionally left at their defaults; they belong to a live agent session and are discarded across orchestrator restarts.

Parameters:

Name Type Description Default
graph TaskGraph

Validated immutable task graph.

required
probe GraphProbe

Result of `probe_graph_state` for the same graph.

required

Returns:

Type Description
dict[str, TaskRuntime]

Task runtimes keyed by task ID.

Source code in src/agentrelay/orchestrator/builders.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
@classmethod
def from_probe(cls, graph: TaskGraph, probe: GraphProbe) -> dict[str, TaskRuntime]:
    """Create task runtimes seeded with state recovered from disk.

    For every task whose signal directory exists, the runtime's
    :class:`TaskState` receives the probe's ``signal_dir``,
    ``branch_name`` and ``attempt_num``, and its :class:`TaskArtifacts`
    receives ``pr_url``.  A task without a signal directory never
    started and keeps the defaults :meth:`from_graph` would give it.

    Deliberately left untouched:

    * ``integration_branch`` and ``workstream_worktree_path`` on
      ``TaskState`` — the orchestrator fills these in at dispatch time
      from the corresponding workstream runtime.
    * The transient per-attempt fields ``agent_address``, ``sandbox``
      and ``sandbox_context`` — they belong to a live agent session
      and are discarded across orchestrator restarts.

    NOTE(review): the probe's ``status`` and ``resolved`` fields are not
    copied by this method; confirm they are consumed elsewhere.

    Args:
        graph: Validated immutable task graph.
        probe: Result of :func:`probe_graph_state` for the same graph.

    Returns:
        Task runtimes keyed by task ID.

    Raises:
        KeyError: If ``probe.task_probes`` lacks an entry for a task ID
            present in ``graph``.
    """
    recovered: dict[str, TaskRuntime] = {}
    for task_id in graph.task_ids():
        task = graph.task(task_id)
        snapshot = probe.task_probes[task_id]
        envelope = TaskRuntime(task=task)
        recovered[task_id] = envelope
        if not snapshot.signal_dir.is_dir():
            # Never started: keep the default state and artifacts.
            continue
        state = envelope.state
        state.signal_dir = snapshot.signal_dir
        state.branch_name = snapshot.branch_name
        state.attempt_num = snapshot.attempt_num
        envelope.artifacts.pr_url = snapshot.pr_url
    return recovered

WorkstreamRuntimeBuilder

Builder for initializing per-workstream runtime envelopes from a graph.

Source code in src/agentrelay/orchestrator/builders.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
class WorkstreamRuntimeBuilder:
    """Factory for the per-workstream runtime envelopes of a task graph.

    Both constructors return a plain ``dict`` keyed by workstream ID whose
    insertion order follows ``graph.workstream_ids()``.
    """

    @classmethod
    def from_graph(cls, graph: TaskGraph) -> dict[str, WorkstreamRuntime]:
        """Create a fresh runtime for every workstream in ``graph``.

        Each runtime is constructed from its workstream spec alone, so its
        mutable state/artifacts start at their defaults.  Entries appear in
        the graph's stable sorted workstream order.

        Args:
            graph: Validated immutable task graph.

        Returns:
            Workstream runtimes keyed by workstream ID.
        """
        return {
            workstream_id: WorkstreamRuntime(spec=graph.workstream(workstream_id))
            for workstream_id in graph.workstream_ids()
        }

    @classmethod
    def from_probe(
        cls, graph: TaskGraph, probe: GraphProbe
    ) -> dict[str, WorkstreamRuntime]:
        """Create workstream runtimes seeded with state recovered from disk.

        For every workstream whose signal directory exists, the runtime's
        :class:`WorkstreamState` receives the probe's ``signal_dir``,
        ``worktree_path`` and ``branch_name``, and its
        :class:`WorkstreamArtifacts` receives ``merge_pr_url``.  A
        workstream without a signal directory never prepared and keeps the
        defaults :meth:`from_graph` would give it.

        When the probe also carries a frozen :class:`ResolvedWorkstream`
        record, its ``target_branch_before_any_merge`` is copied into
        ``runtime.artifacts`` so downstream rollback logic can reach it
        without re-reading ``resolved.json``.

        NOTE(review): the probe's ``status`` field is not copied by this
        method; confirm it is consumed elsewhere.

        Args:
            graph: Validated immutable task graph.
            probe: Result of :func:`probe_graph_state` for the same graph.

        Returns:
            Workstream runtimes keyed by workstream ID.

        Raises:
            KeyError: If ``probe.workstream_probes`` lacks an entry for a
                workstream ID present in ``graph``.
        """
        recovered: dict[str, WorkstreamRuntime] = {}
        for workstream_id in graph.workstream_ids():
            snapshot = probe.workstream_probes[workstream_id]
            envelope = WorkstreamRuntime(spec=graph.workstream(workstream_id))
            recovered[workstream_id] = envelope
            if not snapshot.signal_dir.is_dir():
                # Never prepared: keep the default state and artifacts.
                continue
            state = envelope.state
            state.signal_dir = snapshot.signal_dir
            state.worktree_path = snapshot.worktree_path
            state.branch_name = snapshot.branch_name
            envelope.artifacts.merge_pr_url = snapshot.merge_pr_url
            if snapshot.resolved is None:
                continue
            envelope.artifacts.target_branch_before_any_merge = (
                snapshot.resolved.target_branch_before_any_merge
            )
        return recovered

from_graph(graph) classmethod

Build initial runtimes for all workstreams in a graph.

Runtimes are returned in the graph's stable sorted workstream order. Each runtime starts with default mutable state/artifacts.

Parameters:

Name Type Description Default
graph TaskGraph

Validated immutable task graph.

required

Returns:

Type Description
dict[str, WorkstreamRuntime]

Workstream runtimes keyed by ID.

Source code in src/agentrelay/orchestrator/builders.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
@classmethod
def from_graph(cls, graph: TaskGraph) -> dict[str, WorkstreamRuntime]:
    """Create a fresh runtime for every workstream in ``graph``.

    Each runtime is constructed from its workstream spec alone, so its
    mutable state/artifacts start at their defaults.  Entries appear in
    the graph's stable sorted workstream order.

    Args:
        graph: Validated immutable task graph.

    Returns:
        Workstream runtimes keyed by workstream ID.
    """
    return {
        workstream_id: WorkstreamRuntime(spec=graph.workstream(workstream_id))
        for workstream_id in graph.workstream_ids()
    }

from_probe(graph, probe) classmethod

Build runtimes pre-populated from a disk probe.

Each runtime's `WorkstreamState` and `WorkstreamArtifacts` are populated from the corresponding `WorkstreamProbe` when the workstream's signal directory exists on disk. Workstreams that never prepared (no signal dir) get fresh default state equivalent to `from_graph`.

If the probe carries a frozen `ResolvedWorkstream` record, its `target_branch_before_any_merge` value is copied into `runtime.artifacts` so downstream rollback logic can reach it without re-reading `resolved.json`.

Parameters:

Name Type Description Default
graph TaskGraph

Validated immutable task graph.

required
probe GraphProbe

Result of `probe_graph_state` for the same graph.

required

Returns:

Type Description
dict[str, WorkstreamRuntime]

Workstream runtimes keyed by ID.

Source code in src/agentrelay/orchestrator/builders.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
@classmethod
def from_probe(
    cls, graph: TaskGraph, probe: GraphProbe
) -> dict[str, WorkstreamRuntime]:
    """Create workstream runtimes seeded with state recovered from disk.

    For every workstream whose signal directory exists, the runtime's
    :class:`WorkstreamState` receives the probe's ``signal_dir``,
    ``worktree_path`` and ``branch_name``, and its
    :class:`WorkstreamArtifacts` receives ``merge_pr_url``.  A
    workstream without a signal directory never prepared and keeps the
    defaults :meth:`from_graph` would give it.

    When the probe also carries a frozen :class:`ResolvedWorkstream`
    record, its ``target_branch_before_any_merge`` is copied into
    ``runtime.artifacts`` so downstream rollback logic can reach it
    without re-reading ``resolved.json``.

    NOTE(review): the probe's ``status`` field is not copied by this
    method; confirm it is consumed elsewhere.

    Args:
        graph: Validated immutable task graph.
        probe: Result of :func:`probe_graph_state` for the same graph.

    Returns:
        Workstream runtimes keyed by workstream ID.

    Raises:
        KeyError: If ``probe.workstream_probes`` lacks an entry for a
            workstream ID present in ``graph``.
    """
    recovered: dict[str, WorkstreamRuntime] = {}
    for workstream_id in graph.workstream_ids():
        snapshot = probe.workstream_probes[workstream_id]
        envelope = WorkstreamRuntime(spec=graph.workstream(workstream_id))
        recovered[workstream_id] = envelope
        if not snapshot.signal_dir.is_dir():
            # Never prepared: keep the default state and artifacts.
            continue
        state = envelope.state
        state.signal_dir = snapshot.signal_dir
        state.worktree_path = snapshot.worktree_path
        state.branch_name = snapshot.branch_name
        envelope.artifacts.merge_pr_url = snapshot.merge_pr_url
        if snapshot.resolved is None:
            continue
        envelope.artifacts.target_branch_before_any_merge = (
            snapshot.resolved.target_branch_before_any_merge
        )
    return recovered

Orchestrator dataclass

Async graph scheduler using dependency + workstream constraints.

Immutable configuration holder. The `run` method creates an `_OrchestratorRun` that owns all mutable execution state.

Source code in src/agentrelay/orchestrator/orchestrator.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@dataclass
class Orchestrator:
    """Async graph scheduler using dependency + workstream constraints.

    Holds configuration only; each call to :meth:`run` creates an
    :class:`_OrchestratorRun` that owns all mutable execution state.  The
    instance is meant to be treated as immutable, although the dataclass
    itself is not declared ``frozen``.

    Attributes:
        graph: Task graph to orchestrate.
        task_runner: Runner used for tasks.
        workstream_runner: Runner used for workstreams.
        config: Scheduling configuration; a default
            :class:`OrchestratorConfig` is created when omitted.
        listener: Optional :class:`OrchestratorListener`; ``None`` by default.
        integration_merge_checker: Optional :class:`IntegrationMergeChecker`;
            ``None`` by default.
        integration_auto_merger: Optional :class:`IntegrationAutoMerger`;
            ``None`` by default.
    """

    graph: TaskGraph
    task_runner: TaskRunner
    workstream_runner: WorkstreamRunner
    config: OrchestratorConfig = field(default_factory=OrchestratorConfig)
    listener: Optional[OrchestratorListener] = None
    integration_merge_checker: Optional[IntegrationMergeChecker] = None
    integration_auto_merger: Optional[IntegrationAutoMerger] = None

    async def run(
        self,
        task_runtimes: Optional[Mapping[str, TaskRuntime]] = None,
        workstream_runtimes: Optional[Mapping[str, WorkstreamRuntime]] = None,
    ) -> OrchestratorResult:
        """Run graph orchestration until terminal success/failure.

        A new :class:`_OrchestratorRun` session is created for this call and
        awaited to completion; the orchestrator itself stores no run state.

        Args:
            task_runtimes: Optional prebuilt task runtime map for resume-like
                runs. If omitted, built fresh from graph.
            workstream_runtimes: Optional prebuilt workstream runtime map for
                resume-like runs. If omitted, built fresh from graph.

        Returns:
            Terminal orchestration result with mutated runtimes.

        Raises:
            ValueError: If configuration values are invalid or runtime maps do
                not align with graph IDs.
        """
        return await _OrchestratorRun(
            self, task_runtimes, workstream_runtimes
        ).execute()

run(task_runtimes=None, workstream_runtimes=None) async

Run graph orchestration until terminal success/failure.

Parameters:

Name Type Description Default
task_runtimes Optional[Mapping[str, TaskRuntime]]

Optional prebuilt task runtime map for resume-like runs. If omitted, built fresh from graph.

None
workstream_runtimes Optional[Mapping[str, WorkstreamRuntime]]

Optional prebuilt workstream runtime map for resume-like runs. If omitted, built fresh from graph.

None

Returns:

Name Type Description
OrchestratorResult OrchestratorResult

Terminal orchestration result with mutated runtimes.

Raises:

Type Description
ValueError

If configuration values are invalid or runtime maps do not align with graph IDs.

Source code in src/agentrelay/orchestrator/orchestrator.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
async def run(
    self,
    task_runtimes: Optional[Mapping[str, TaskRuntime]] = None,
    workstream_runtimes: Optional[Mapping[str, WorkstreamRuntime]] = None,
) -> OrchestratorResult:
    """Run graph orchestration until terminal success/failure.

    A new :class:`_OrchestratorRun` session is created for this call and
    awaited to completion; the orchestrator itself stores no run state.

    Args:
        task_runtimes: Optional prebuilt task runtime map for resume-like
            runs. If omitted, built fresh from graph.
        workstream_runtimes: Optional prebuilt workstream runtime map for
            resume-like runs. If omitted, built fresh from graph.

    Returns:
        Terminal orchestration result with mutated runtimes.

    Raises:
        ValueError: If configuration values are invalid or runtime maps do
            not align with graph IDs.
    """
    return await _OrchestratorRun(
        self, task_runtimes, workstream_runtimes
    ).execute()

OrchestratorConfig dataclass

Configuration for one orchestrator scheduling run.

Attributes:

Name Type Description
max_concurrency int

Maximum number of task attempts to run concurrently.

max_task_attempts int

Maximum attempts per task (including first attempt).

task_teardown_mode TearDownMode

Teardown policy forwarded to TaskRunner.run(...).

fail_fast_on_internal_error bool

Stop scheduling immediately when a task run raises (internal/system failure).

fail_fast_on_workstream_error bool

When True, a workstream-level failure prevents preparing any new (PENDING) workstreams. In-flight work in already-active workstreams is not cancelled. Defaults to False.

merge_poll_interval float

Seconds between polls for integration PR merge status. Only used when an `IntegrationMergeChecker` is configured on the `Orchestrator`.

Source code in src/agentrelay/orchestrator/orchestrator.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@dataclass(frozen=True)
class OrchestratorConfig:
    """Configuration for one orchestrator scheduling run.

    Instances are frozen: fields cannot be reassigned after construction.
    This class performs no validation of its own when constructed.

    Attributes:
        max_concurrency: Maximum number of task attempts to run concurrently.
            Defaults to ``1``.
        max_task_attempts: Maximum attempts per task (including first attempt).
            Defaults to ``1``.
        task_teardown_mode: Teardown policy forwarded to ``TaskRunner.run(...)``.
            Defaults to ``TearDownMode.ALWAYS``.
        fail_fast_on_internal_error: Stop scheduling immediately when a task run
            raises (internal/system failure).  Defaults to ``True``.
        fail_fast_on_workstream_error: When ``True``, a workstream-level failure
            prevents preparing any new (PENDING) workstreams. In-flight work
            in already-active workstreams is not cancelled.  Defaults to
            ``False``.
        merge_poll_interval: Seconds between polls for integration PR merge
            status.  Only used when an :class:`IntegrationMergeChecker` is
            configured on the :class:`Orchestrator`.  Defaults to ``30.0``.
    """

    max_concurrency: int = 1
    max_task_attempts: int = 1
    task_teardown_mode: TearDownMode = TearDownMode.ALWAYS
    fail_fast_on_internal_error: bool = True
    fail_fast_on_workstream_error: bool = False
    merge_poll_interval: float = 30.0

OrchestratorEvent dataclass

Structured event emitted by the orchestration loop.

Attributes:

Name Type Description
kind str

Event type identifier.

task_id Optional[str]

Optional task ID associated with the event.

workstream_id Optional[str]

Optional workstream ID associated with the event.

attempt_num Optional[int]

Optional 0-indexed attempt number for task-run events.

outcome_class Optional[TaskOutcomeClass]

Optional classified task outcome.

message Optional[str]

Optional event detail.

Source code in src/agentrelay/orchestrator/orchestrator.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
@dataclass(frozen=True)
class OrchestratorEvent:
    """Structured event emitted by the orchestration loop.

    Only ``kind`` is required.  Because ``timestamp`` is declared second,
    pass the optional fields by keyword so that a value is not bound to
    ``timestamp`` positionally by accident.

    Attributes:
        kind: Event type identifier.
        timestamp: Time the event object was created, as returned by
            ``time.time()``.  Filled in automatically when not supplied.
        task_id: Optional task ID associated with the event.
        workstream_id: Optional workstream ID associated with the event.
        attempt_num: Optional 0-indexed attempt number for task-run events.
        outcome_class: Optional classified task outcome.
        message: Optional event detail.
    """

    kind: str
    # default_factory so each event gets its own creation time rather than
    # one value fixed when the class is defined.
    timestamp: float = field(default_factory=time.time)
    task_id: Optional[str] = None
    workstream_id: Optional[str] = None
    attempt_num: Optional[int] = None
    outcome_class: Optional[TaskOutcomeClass] = None
    message: Optional[str] = None

OrchestratorListener

Bases: Protocol

Callback protocol for real-time orchestration event observation.

Source code in src/agentrelay/orchestrator/orchestrator.py
117
118
119
120
121
122
123
124
125
126
127
@runtime_checkable
class OrchestratorListener(Protocol):
    """Callback protocol for real-time orchestration event observation.

    Any object with a compatible ``on_event`` method satisfies this protocol;
    inheriting from it is not required.  Because the protocol is
    ``runtime_checkable``, ``isinstance(obj, OrchestratorListener)`` works,
    but it only checks that ``on_event`` exists, not its signature.
    """

    def on_event(self, event: OrchestratorEvent) -> None:
        """Called each time the orchestrator produces an event.

        Declared as a plain (non-async) method that returns nothing.

        Args:
            event: The event that was just produced.
        """
        # Protocol stub: implementers provide the body.
        ...

on_event(event)

Called each time the orchestrator produces an event.

Parameters:

Name Type Description Default
event OrchestratorEvent

The event that was just produced.

required
Source code in src/agentrelay/orchestrator/orchestrator.py
121
122
123
124
125
126
127
def on_event(self, event: OrchestratorEvent) -> None:
    """Called each time the orchestrator produces an event.

    Declared as a plain (non-async) method that returns nothing.

    Args:
        event: The event that was just produced.
    """
    # Stub body: implementers provide the behavior.
    ...

OrchestratorOutcome

Bases: str, Enum

Terminal outcome for one orchestrator run.

Attributes:

Name Type Description
SUCCEEDED

All tasks reached a terminal success status.

COMPLETED_WITH_FAILURES

No fatal internal error, but one or more tasks ended FAILED.

FATAL_INTERNAL_ERROR

A task run raised and orchestration failed fast.

Source code in src/agentrelay/orchestrator/orchestrator.py
53
54
55
56
57
58
59
60
61
62
63
64
65
class OrchestratorOutcome(str, Enum):
    """Terminal outcome for one orchestrator run.

    Members are also ``str`` instances, so each compares equal to its
    lowercase string value (for example ``"succeeded"``).

    Attributes:
        SUCCEEDED: All tasks reached a terminal success status.
        COMPLETED_WITH_FAILURES: No fatal internal error, but one or more tasks
            ended ``FAILED``.
        FATAL_INTERNAL_ERROR: A task run raised and orchestration failed fast.
    """

    SUCCEEDED = "succeeded"
    COMPLETED_WITH_FAILURES = "completed_with_failures"
    FATAL_INTERNAL_ERROR = "fatal_internal_error"

OrchestratorResult dataclass

Terminal result for one orchestrator run.

Attributes:

Name Type Description
outcome OrchestratorOutcome

Overall run outcome classification.

task_runtimes Mapping[str, TaskRuntime]

Final task runtimes keyed by task ID.

workstream_runtimes Mapping[str, WorkstreamRuntime]

Final workstream runtimes keyed by workstream ID.

events tuple[OrchestratorEvent, ...]

Ordered structured events produced during scheduling/execution.

fatal_error Optional[str]

Traceback text for fatal internal errors, if any.

Source code in src/agentrelay/orchestrator/orchestrator.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
@dataclass(frozen=True)
class OrchestratorResult:
    """Terminal result for one orchestrator run.

    The dataclass is frozen, so its fields cannot be reassigned.  Only
    ``fatal_error`` has a default; every other field must be supplied.

    Attributes:
        outcome: Overall run outcome classification.
        task_runtimes: Final task runtimes keyed by task ID.
        workstream_runtimes: Final workstream runtimes keyed by workstream ID.
        events: Ordered structured events produced during scheduling/execution.
        fatal_error: Traceback text for fatal internal errors, if any.
            Defaults to ``None``.
    """

    outcome: OrchestratorOutcome
    task_runtimes: Mapping[str, TaskRuntime]
    workstream_runtimes: Mapping[str, WorkstreamRuntime]
    events: tuple[OrchestratorEvent, ...]
    fatal_error: Optional[str] = None

TaskOutcomeClass

Bases: str, Enum

Classification of one task attempt outcome from the orchestrator boundary.

Attributes:

Name Type Description
SUCCESS

Task reached a terminal success status (PR_MERGED or COMPLETED).

EXPECTED_FAILURE

Task run returned FAILED.

INTERNAL_ERROR

Task run raised an exception.

Source code in src/agentrelay/orchestrator/orchestrator.py
39
40
41
42
43
44
45
46
47
48
49
50
class TaskOutcomeClass(str, Enum):
    """Classification of one task attempt outcome from the orchestrator boundary.

    Members are also ``str`` instances, so each compares equal to its
    lowercase string value (for example ``"success"``).

    Attributes:
        SUCCESS: Task reached a terminal success status (``PR_MERGED`` or ``COMPLETED``).
        EXPECTED_FAILURE: Task run returned ``FAILED``.
        INTERNAL_ERROR: Task run raised an exception.
    """

    SUCCESS = "success"  # Task reached a terminal success status
    EXPECTED_FAILURE = "expected_failure"  # Task run returned FAILED
    INTERNAL_ERROR = "internal_error"  # Task run raised an exception

GraphProbe dataclass

Aggregate probe result covering all tasks and workstreams in a graph.

Attributes:

Name Type Description
task_probes dict[str, TaskProbe]

Reconstructed task state keyed by task ID. Every task in the current graph has an entry, even tasks that never ran (signal dir absent).

workstream_probes dict[str, WorkstreamProbe]

Reconstructed workstream state keyed by workstream ID. Every workstream in the current graph has an entry.

Source code in src/agentrelay/orchestrator/probe.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@dataclass(frozen=True)
class GraphProbe:
    """Aggregate probe result covering all tasks and workstreams in a graph.

    The dataclass is frozen, which prevents rebinding the two attributes;
    the dictionaries they refer to are ordinary mutable ``dict`` objects.

    Attributes:
        task_probes: Reconstructed task state keyed by task ID.  Every
            task in the current graph has an entry, even tasks that
            never ran (signal dir absent).
        workstream_probes: Reconstructed workstream state keyed by
            workstream ID.  Every workstream in the current graph has
            an entry.
    """

    task_probes: dict[str, TaskProbe]
    workstream_probes: dict[str, WorkstreamProbe]

TaskProbe dataclass

Reconstructed on-disk state for a single task.

Attributes:

Name Type Description
task_id str

Task identifier.

status TaskStatus

Resolved task status after stale-state normalization. Guaranteed never to be RUNNING or PR_CREATED.

signal_dir Path

Path to the task signal directory (run_dir/signals/<task_id>). May or may not exist on disk — check signal_dir.is_dir() to distinguish "never started" from "started then progressed".

attempt_num int

Highest attempt number found on disk (0 if no attempt directories exist).

branch_name str

Task feature branch name, derived from convention as agentrelay/<graph_name>/<task_id>.

pr_url Optional[str]

PR URL recovered from the latest attempt's .done file, or None for PR-less tasks / tasks that never reached the .done stage.

resolved Optional[ResolvedTask]

Frozen execution record loaded from signal_dir/resolved.json if present, else None.

Source code in src/agentrelay/orchestrator/probe.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@dataclass(frozen=True)
class TaskProbe:
    """Reconstructed on-disk state for a single task.

    Frozen record with no defaults: every field must be supplied, and
    ``pr_url`` / ``resolved`` are passed as ``None`` when absent.

    Attributes:
        task_id: Task identifier.
        status: Resolved task status after stale-state normalization.
            Guaranteed never to be ``RUNNING`` or ``PR_CREATED``.
        signal_dir: Path to the task signal directory
            (``run_dir/signals/<task_id>``).  May or may not exist on
            disk — check ``signal_dir.is_dir()`` to distinguish "never
            started" from "started then progressed".
        attempt_num: Highest attempt number found on disk (0 if no
            attempt directories exist).
        branch_name: Task feature branch name, derived from convention
            as ``agentrelay/<graph_name>/<task_id>``.
        pr_url: PR URL recovered from the latest attempt's ``.done``
            file, or ``None`` for PR-less tasks / tasks that never
            reached the ``.done`` stage.
        resolved: Frozen execution record loaded from
            ``signal_dir/resolved.json`` if present, else ``None``.
    """

    task_id: str
    status: TaskStatus
    signal_dir: Path
    attempt_num: int
    branch_name: str
    pr_url: Optional[str]
    resolved: Optional[ResolvedTask]

WorkstreamProbe dataclass

Reconstructed on-disk state for a single workstream.

Attributes:

Name Type Description
workstream_id str

Workstream identifier.

status WorkstreamStatus

Current workstream status (no normalization needed — workstream transient states are handled by the orchestrator's main loop via _poll_integration_merges and _process_merge_ready_workstreams).

signal_dir Path

Path to the workstream signal directory (run_dir/workstreams/<ws_id>).

worktree_path Path

Path to the workstream worktree (repo_path/.worktrees/<graph_name>/<ws_id>). Worktrees are not per-run and may still exist from prior runs.

branch_name str

Workstream integration branch name, derived as agentrelay/<graph_name>/<ws_id>/integration.

merge_pr_url Optional[str]

Integration PR URL recovered from the pr_created signal file, or None if no PR was created.

resolved Optional[ResolvedWorkstream]

Frozen execution record loaded from signal_dir/resolved.json if present, else None.

Source code in src/agentrelay/orchestrator/probe.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
@dataclass(frozen=True)
class WorkstreamProbe:
    """Reconstructed on-disk state for a single workstream.

    Frozen record with no defaults: every field must be supplied, and
    ``merge_pr_url`` / ``resolved`` are passed as ``None`` when absent.

    Attributes:
        workstream_id: Workstream identifier.
        status: Current workstream status (no normalization needed —
            workstream transient states are handled by the orchestrator's
            main loop via ``_poll_integration_merges`` and
            ``_process_merge_ready_workstreams``).
        signal_dir: Path to the workstream signal directory
            (``run_dir/workstreams/<ws_id>``).
        worktree_path: Path to the workstream worktree
            (``repo_path/.worktrees/<graph_name>/<ws_id>``).  Worktrees
            are not per-run and may still exist from prior runs.
        branch_name: Workstream integration branch name, derived as
            ``agentrelay/<graph_name>/<ws_id>/integration``.
        merge_pr_url: Integration PR URL recovered from the
            ``pr_created`` signal file, or ``None`` if no PR was
            created.
        resolved: Frozen execution record loaded from
            ``signal_dir/resolved.json`` if present, else ``None``.
    """

    workstream_id: str
    status: WorkstreamStatus
    signal_dir: Path
    worktree_path: Path
    branch_name: str
    merge_pr_url: Optional[str]
    resolved: Optional[ResolvedWorkstream]

build_integration_auto_merger(repo_path)

Build the standard integration auto-merger for GitHub CLI.

Parameters:

Name Type Description Default
repo_path Path

Path to the repository (for pre-merge SHA capture).

required

Returns:

Type Description
GhIntegrationAutoMerger

A GhIntegrationAutoMerger instance.

Source code in src/agentrelay/orchestrator/builders.py
367
368
369
370
371
372
373
374
375
376
def build_integration_auto_merger(repo_path: Path) -> GhIntegrationAutoMerger:
    """Create the default GitHub-CLI integration auto-merger.

    Args:
        repo_path: Path to the repository (for pre-merge SHA capture).

    Returns:
        A ``GhIntegrationAutoMerger`` instance constructed with
        ``repo_path``.
    """
    auto_merger = GhIntegrationAutoMerger(repo_path=repo_path)
    return auto_merger

build_integration_merge_checker(repo_path)

Build the standard integration merge checker for GitHub CLI.

Parameters:

Name Type Description Default
repo_path Path

Path to the repository (for git ref resolution).

required

Returns:

Type Description
GhIntegrationMergeChecker

A GhIntegrationMergeChecker instance.

Source code in src/agentrelay/orchestrator/builders.py
355
356
357
358
359
360
361
362
363
364
def build_integration_merge_checker(repo_path: Path) -> GhIntegrationMergeChecker:
    """Create the default GitHub-CLI integration merge checker.

    Args:
        repo_path: Path to the repository (for git ref resolution).

    Returns:
        A ``GhIntegrationMergeChecker`` instance constructed with
        ``repo_path``.
    """
    merge_checker = GhIntegrationMergeChecker(repo_path=repo_path)
    return merge_checker

build_run_repo_manager(repo_path, graph_name)

Build the standard run repo manager for git repositories.

Parameters:

Name Type Description Default
repo_path Path

Path to the repository root.

required
graph_name str

Name of the graph being executed.

required

Returns:

Type Description
GitRunRepoManager

A GitRunRepoManager instance.

Source code in src/agentrelay/orchestrator/builders.py
428
429
430
431
432
433
434
435
436
437
438
def build_run_repo_manager(repo_path: Path, graph_name: str) -> GitRunRepoManager:
    """Create the default git-backed run repo manager.

    Args:
        repo_path: Path to the repository root.
        graph_name: Name of the graph being executed.

    Returns:
        A ``GitRunRepoManager`` instance constructed with ``repo_path``
        and ``graph_name``.
    """
    repo_manager = GitRunRepoManager(
        repo_path=repo_path,
        graph_name=graph_name,
    )
    return repo_manager

build_sandbox_infrastructure_manager(graph)

Build the appropriate sandbox infrastructure manager for a graph.

Inspects the graph's tasks for OCI sandbox usage. Returns an OciSandboxInfrastructureManager when any task uses OCI isolation, otherwise returns a NullSandboxInfrastructureManager.

Parameters:

Name Type Description Default
graph TaskGraph

Validated immutable task graph.

required

Returns:

Type Description
OciSandboxInfrastructureManager | NullSandboxInfrastructureManager

Appropriate infrastructure manager implementation.

Source code in src/agentrelay/orchestrator/builders.py
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
def build_sandbox_infrastructure_manager(
    graph: TaskGraph,
) -> OciSandboxInfrastructureManager | NullSandboxInfrastructureManager:
    """Build the appropriate sandbox infrastructure manager for a graph.

    Inspects the isolation settings of each task's primary agent.
    Returns an ``OciSandboxInfrastructureManager`` (constructed with the
    graph name) as soon as any task is found to use OCI isolation,
    otherwise returns a ``NullSandboxInfrastructureManager``.

    Args:
        graph: Validated immutable task graph.

    Returns:
        Appropriate infrastructure manager implementation.

    Raises:
        ValueError: If a task uses OCI isolation but ``graph.name`` is
            ``None``.
    """

    def _uses_oci(task_id: str) -> bool:
        # A task without isolation settings never counts as OCI.
        isolation = graph.task(task_id).primary_agent.isolation
        return isolation is not None and isolation.sandbox_type == SandboxType.OCI

    # any() stops at the first OCI task, like the early break it replaces.
    if not any(_uses_oci(task_id) for task_id in graph.task_ids()):
        return NullSandboxInfrastructureManager()

    # Explicit check rather than ``assert``: assertions are stripped under
    # ``python -O``, which would let ``None`` reach the OCI manager.
    if graph.name is None:
        raise ValueError(
            "graph.name must be set to build an OCI sandbox infrastructure manager"
        )
    return OciSandboxInfrastructureManager(graph.name)

build_session_resolver()

Build the standard session resolver for tmux environments.

Returns:

Type Description
TmuxSessionResolver

A TmuxSessionResolver instance.

Source code in src/agentrelay/orchestrator/builders.py
419
420
421
422
423
424
425
def build_session_resolver() -> TmuxSessionResolver:
    """Create the default session resolver for tmux environments.

    Returns:
        A ``TmuxSessionResolver`` instance constructed with no arguments.
    """
    resolver = TmuxSessionResolver()
    return resolver

build_standard_runner(repo_path, graph_name, run_dir, graph, keep_panes=False, poll_interval=2.0, context_content=None, tools=(), credential_provider=None, anthropic_credential=None)

Build the standard runner for worktree + tmux + Claude Code.

All steps use StepDispatch default since only one framework/environment combo currently exists.

When adding a new AgentFramework or AgentEnvironment, add keyed entries to the StepDispatch tables for steps that have distinct implementations. Refer to the sensitivity table in the StandardTaskRunner docstring.

Dispatch table (current):

| Step | Implementation | Notes |
|---|---|---|
| preparer | WorktreeTaskPreparer | env/fw agnostic |
| launcher | TmuxTaskLauncher | will need entries |
| kickoff | TmuxTaskKickoff | will need entries |
| completion_checker | SignalCompletionChecker | may need entries |
| gate_checker | ShellGateChecker | env/fw agnostic |
| merger | GhTaskMerger | env/fw agnostic |
| log_capture | WorktreeTaskLogCapture | will need entries |
| teardown | WorktreeTaskTeardown | will need entries |

Parameters:

Name Type Description Default
repo_path Path

Path to the bare/main repository.

required
graph_name str

Name of the task graph being executed.

required
graph TaskGraph

The task graph (used to compute dependency descriptions).

required
keep_panes bool

Whether to keep tmux panes after teardown.

False
poll_interval float

Seconds between completion signal polls.

2.0
context_content Optional[str]

Optional context content to write to the signal dir.

None
tools tuple[str, ...]

Declared tool names from the graph YAML.

()

Returns:

Type Description
StandardTaskRunner

A fully wired StandardTaskRunner.

Source code in src/agentrelay/orchestrator/builders.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def build_standard_runner(
    repo_path: Path,
    graph_name: str,
    run_dir: Path,
    graph: TaskGraph,
    keep_panes: bool = False,
    poll_interval: float = 2.0,
    context_content: Optional[str] = None,
    tools: tuple[str, ...] = (),
    credential_provider: Optional[CredentialProvider] = None,
    anthropic_credential: Optional[AnthropicCredential] = None,
) -> StandardTaskRunner:
    """Wire up the standard task runner (worktree + tmux + Claude Code).

    Every dispatched step is registered as the ``default`` entry of its
    ``StepDispatch`` because only one framework/environment combination
    currently exists.  The gate checker is the exception: it is passed
    as a single ``ShellGateChecker`` instance rather than through a
    ``StepDispatch``.

    When adding a new ``AgentFramework`` or ``AgentEnvironment``, add keyed
    entries to the ``StepDispatch`` tables for steps that have distinct
    implementations. Refer to the sensitivity table in the
    ``StandardTaskRunner`` docstring.

    Dispatch table (current):

    +-----------------------+----------------------------+-----------------------+
    | Step                  | Implementation             | Notes                 |
    +=======================+============================+=======================+
    | preparer              | WorktreeTaskPreparer       | env/fw agnostic       |
    +-----------------------+----------------------------+-----------------------+
    | launcher              | TmuxTaskLauncher           | will need entries     |
    +-----------------------+----------------------------+-----------------------+
    | kickoff               | TmuxTaskKickoff            | will need entries     |
    +-----------------------+----------------------------+-----------------------+
    | completion_checker    | SignalCompletionChecker    | may need entries      |
    +-----------------------+----------------------------+-----------------------+
    | gate_checker          | ShellGateChecker           | env/fw agnostic       |
    +-----------------------+----------------------------+-----------------------+
    | merger                | GhTaskMerger               | env/fw agnostic       |
    +-----------------------+----------------------------+-----------------------+
    | log_capture           | WorktreeTaskLogCapture     | will need entries     |
    +-----------------------+----------------------------+-----------------------+
    | teardown              | WorktreeTaskTeardown       | will need entries     |
    +-----------------------+----------------------------+-----------------------+

    Args:
        repo_path: Path to the bare/main repository.
        graph_name: Name of the task graph being executed.
        run_dir: Run directory handed to each ``WorktreeTaskPreparer``.
        graph: The task graph (used to compute dependency descriptions).
        keep_panes: Whether to keep tmux panes after teardown.
        poll_interval: Seconds between completion signal polls.
        context_content: Optional context content to write to the signal dir.
        tools: Declared tool names from the graph YAML.
        credential_provider: Credential provider handed to each
            ``TmuxTaskLauncher``.  A ``NullCredentialProvider`` is
            substituted when this is ``None`` (or otherwise falsy).
        anthropic_credential: Credential forwarded to the ``OciSandbox``
            built for tasks whose primary agent uses OCI isolation; it
            is not used for other tasks.

    Returns:
        A fully wired ``StandardTaskRunner``.
    """
    # A single kickoff object is created up front and returned for every task.
    shared_kickoff = TmuxTaskKickoff()

    def _preparer_for(runtime: TaskRuntime) -> TaskPreparer:
        # Map each direct dependency ID to that task's description.
        descriptions = {}
        for dep_id in graph.dependency_ids(runtime.task.id):
            descriptions[dep_id] = graph.task(dep_id).description
        return WorktreeTaskPreparer(
            run_dir=run_dir,
            graph_name=graph_name,
            dependency_descriptions=descriptions,
            context_content=context_content,
            tools=tools,
        )

    def _launcher_for(runtime: TaskRuntime) -> TmuxTaskLauncher:
        isolation = runtime.task.primary_agent.isolation
        # Only tasks that explicitly request OCI isolation get a real sandbox.
        if isolation is not None and isolation.sandbox_type == SandboxType.OCI:
            sandbox = OciSandbox(
                image=isolation.image,
                runtime=isolation.runtime,
                anthropic_credential=anthropic_credential,
            )
        else:
            sandbox = NullSandbox()
        return TmuxTaskLauncher(
            adapter=ClaudeCodeAdapter(),
            sandbox=sandbox,
            credential_provider=credential_provider or NullCredentialProvider(),
            repo_path=repo_path,
            graph_name=graph_name,
        )

    def _kickoff_for(rt: TaskRuntime) -> TmuxTaskKickoff:
        return shared_kickoff

    def _completion_checker_for(runtime: TaskRuntime) -> TaskCompletionChecker:
        return SignalCompletionChecker(poll_interval=poll_interval)

    def _merger_for(runtime: TaskRuntime) -> TaskMerger:
        return GhTaskMerger(repo_path=repo_path)

    def _log_capture_for(runtime: TaskRuntime) -> TaskLogCapture:
        return WorktreeTaskLogCapture()

    def _teardown_for(runtime: TaskRuntime) -> TaskTeardown:
        return WorktreeTaskTeardown(repo_path=repo_path, keep_panes=keep_panes)

    return StandardTaskRunner(
        _preparer=StepDispatch(default=_preparer_for),
        _launcher=StepDispatch(default=_launcher_for),
        _kickoff=StepDispatch(default=_kickoff_for),
        _completion_checker=StepDispatch(default=_completion_checker_for),
        _gate_checker=ShellGateChecker(),
        _merger=StepDispatch(default=_merger_for),
        _log_capture=StepDispatch(default=_log_capture_for),
        _teardown=StepDispatch(default=_teardown_for),
    )

build_standard_workstream_runner(repo_path, graph_name, run_dir)

Build the standard workstream runner for git worktree + GitHub CLI.

Wires the three workstream lifecycle steps with concrete implementations:

| Step | Implementation |
|---|---|
| preparer | GitWorkstreamPreparer |
| integrator | GhWorkstreamIntegrator |
| teardown | GitWorkstreamTeardown |

Parameters:

Name Type Description Default
repo_path Path

Path to the bare/main repository.

required
graph_name str

Name of the task graph being executed.

required

Returns:

Type Description
StandardWorkstreamRunner

A fully wired StandardWorkstreamRunner.

Source code in src/agentrelay/orchestrator/builders.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def build_standard_workstream_runner(
    repo_path: Path,
    graph_name: str,
    run_dir: Path,
) -> StandardWorkstreamRunner:
    """Wire up the standard workstream runner (git worktree + GitHub CLI).

    Each of the three workstream lifecycle steps gets one concrete
    implementation:

    +------------+------------------------+
    | Step       | Implementation         |
    +============+========================+
    | preparer   | GitWorkstreamPreparer  |
    +------------+------------------------+
    | integrator | GhWorkstreamIntegrator |
    +------------+------------------------+
    | teardown   | GitWorkstreamTeardown  |
    +------------+------------------------+

    Args:
        repo_path: Path to the bare/main repository.
        graph_name: Name of the task graph being executed.
        run_dir: Run directory handed to the ``GitWorkstreamPreparer``.

    Returns:
        A fully wired ``StandardWorkstreamRunner``.
    """
    # Steps are constructed in lifecycle order: prepare, integrate, tear down.
    preparer = GitWorkstreamPreparer(
        repo_path=repo_path,
        graph_name=graph_name,
        run_dir=run_dir,
    )
    integrator = GhWorkstreamIntegrator(repo_path=repo_path)
    teardown = GitWorkstreamTeardown(repo_path=repo_path)
    return StandardWorkstreamRunner(
        _preparer=preparer,
        _integrator=integrator,
        _teardown=teardown,
    )

build_task_pr_prober()

Build the standard task PR prober for GitHub CLI.

Used during graph resumption to normalize stale PR_CREATED tasks by checking whether the PR was merged and optionally merging it.

Returns:

Type Description
GhTaskPrProber

A GhTaskPrProber instance.

Source code in src/agentrelay/orchestrator/builders.py
379
380
381
382
383
384
385
386
387
388
def build_task_pr_prober() -> GhTaskPrProber:
    """Create the default GitHub-CLI task PR prober.

    Used during graph resumption to normalize stale ``PR_CREATED`` tasks
    by checking whether the PR was merged and optionally merging it.

    Returns:
        A ``GhTaskPrProber`` instance constructed with no arguments.
    """
    prober = GhTaskPrProber()
    return prober

probe_graph_state(repo_path, graph_name, graph, run_dir, pr_prober)

Reconstruct runtime state from on-disk signal files.

Normalizes stale transient states in place before returning. A task that was RUNNING at orchestrator-crash time is resolved by inspecting the attempt directory for terminal signals (.done or .failed); a task that was PR_CREATED is resolved by probing the hosting platform via pr_prober.

Chained normalization path when a RUNNING task has a .done file containing a PR URL::

RUNNING
  └─ .done + PR URL ──► PR_CREATED
                          ├─ is_merged=True ─────────────► PR_MERGED
                          ├─ is_merged=F, try_merge=T ──► PR_MERGED
                          ├─ is_merged=F, try_merge=F ──► FAILED
                          └─ pr_url=None (malformed) ───► FAILED

Parameters:

Name Type Description Default
repo_path Path

Path to the target repository (used to compute worktree paths).

required
graph_name str

Name of the graph being resumed.

required
graph TaskGraph

Current graph definition, used to enumerate task and workstream IDs.

required
run_dir Path

Path to the run directory being probed (typically .workflow/<graph>/runs/<N>/).

required
pr_prober TaskPrProber

Protocol implementation for checking and merging stale task PRs.

required

Returns:

Name Type Description
GraphProbe GraphProbe

Reconstructed task and workstream state.

Source code in src/agentrelay/orchestrator/probe.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def probe_graph_state(
    repo_path: Path,
    graph_name: str,
    graph: TaskGraph,
    run_dir: Path,
    pr_prober: TaskPrProber,
) -> GraphProbe:
    """Rebuild task and workstream state for a run from its signal files.

    Every task ID in ``graph`` is probed with ``_probe_task_state`` and
    every workstream ID with ``_probe_workstream_state``; the results
    are collected into a single ``GraphProbe``.

    Stale transient states are normalized in place before returning.  A
    task that was ``RUNNING`` at orchestrator-crash time is resolved by
    inspecting the attempt directory for terminal signals (``.done`` or
    ``.failed``); a task that was ``PR_CREATED`` is resolved by probing
    the hosting platform via ``pr_prober``.

    Chained normalization path when a ``RUNNING`` task has a ``.done``
    file containing a PR URL::

        RUNNING
          └─ .done + PR URL ──► PR_CREATED
                                  ├─ is_merged=True ─────────────► PR_MERGED
                                  ├─ is_merged=F, try_merge=T ──► PR_MERGED
                                  ├─ is_merged=F, try_merge=F ──► FAILED
                                  └─ pr_url=None (malformed) ───► FAILED

    Args:
        repo_path: Path to the target repository (used to compute
            worktree paths).
        graph_name: Name of the graph being resumed.
        graph: Current graph definition, used to enumerate task and
            workstream IDs.
        run_dir: Path to the run directory being probed
            (typically ``.workflow/<graph>/runs/<N>/``).
        pr_prober: Protocol implementation for checking and merging
            stale task PRs.

    Returns:
        GraphProbe: Reconstructed task and workstream state.
    """
    # Tasks are probed first, in the order the graph reports their IDs.
    probed_tasks = {}
    for task_id in graph.task_ids():
        probed_tasks[task_id] = _probe_task_state(
            run_dir, task_id, graph_name, pr_prober
        )

    # Workstreams are probed afterwards; they need no PR prober.
    probed_workstreams = {}
    for ws_id in graph.workstream_ids():
        probed_workstreams[ws_id] = _probe_workstream_state(
            run_dir, ws_id, repo_path, graph_name
        )

    return GraphProbe(
        task_probes=probed_tasks,
        workstream_probes=probed_workstreams,
    )