Skip to content

orchestrator

Orchestrator package — async graph scheduling over task and workstream runtimes.

Re-exports all public names so that `from agentrelay.orchestrator import X` continues to work after the promotion from a single module to a package.

TaskRuntimeBuilder

Builder for initializing per-task runtime envelopes from a task graph.

Source code in src/agentrelay/orchestrator/builders.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class TaskRuntimeBuilder:
    """Factory for the initial per-task runtime envelopes of a task graph."""

    @classmethod
    def from_graph(cls, graph: TaskGraph) -> dict[str, TaskRuntime]:
        """Create one fresh ``TaskRuntime`` for every task in ``graph``.

        Entries are inserted in the order yielded by ``graph.task_ids()``
        (the graph's stable topological task order). Each runtime is
        constructed from its task alone, so every other runtime field keeps
        its default value.

        Args:
            graph: Validated immutable task graph.

        Returns:
            Task runtimes keyed by task ID.
        """
        return {
            task_id: TaskRuntime(task=graph.task(task_id))
            for task_id in graph.task_ids()
        }

from_graph(graph) classmethod

Build initial runtimes for all tasks in a graph.

Runtimes are returned in the graph's stable topological task order. Each runtime starts with default mutable state/artifacts and no agent.

Parameters:

Name Type Description Default
graph TaskGraph

Validated immutable task graph.

required

Returns:

Type Description
dict[str, TaskRuntime]

dict[str, TaskRuntime]: Task runtimes keyed by task ID.

Source code in src/agentrelay/orchestrator/builders.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@classmethod
def from_graph(cls, graph: TaskGraph) -> dict[str, TaskRuntime]:
    """Create one fresh ``TaskRuntime`` for every task in ``graph``.

    Entries are inserted in the order yielded by ``graph.task_ids()``
    (the graph's stable topological task order). Each runtime is
    constructed from its task alone, so every other runtime field keeps
    its default value.

    Args:
        graph: Validated immutable task graph.

    Returns:
        Task runtimes keyed by task ID.
    """
    return {
        task_id: TaskRuntime(task=graph.task(task_id))
        for task_id in graph.task_ids()
    }

WorkstreamRuntimeBuilder

Builder for initializing per-workstream runtime envelopes from a graph.

Source code in src/agentrelay/orchestrator/builders.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class WorkstreamRuntimeBuilder:
    """Factory for the initial per-workstream runtime envelopes of a graph."""

    @classmethod
    def from_graph(cls, graph: TaskGraph) -> dict[str, WorkstreamRuntime]:
        """Create one fresh ``WorkstreamRuntime`` per workstream in ``graph``.

        Entries are inserted in the order yielded by
        ``graph.workstream_ids()`` (the graph's stable sorted workstream
        order). Each runtime is constructed from its workstream spec alone,
        so every other runtime field keeps its default value.

        Args:
            graph: Validated immutable task graph.

        Returns:
            Workstream runtimes keyed by workstream ID.
        """
        return {
            workstream_id: WorkstreamRuntime(spec=graph.workstream(workstream_id))
            for workstream_id in graph.workstream_ids()
        }

from_graph(graph) classmethod

Build initial runtimes for all workstreams in a graph.

Runtimes are returned in the graph's stable sorted workstream order. Each runtime starts with default mutable state/artifacts.

Parameters:

Name Type Description Default
graph TaskGraph

Validated immutable task graph.

required

Returns:

Type Description
dict[str, WorkstreamRuntime]

dict[str, WorkstreamRuntime]: Workstream runtimes keyed by ID.

Source code in src/agentrelay/orchestrator/builders.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@classmethod
def from_graph(cls, graph: TaskGraph) -> dict[str, WorkstreamRuntime]:
    """Create one fresh ``WorkstreamRuntime`` per workstream in ``graph``.

    Entries are inserted in the order yielded by
    ``graph.workstream_ids()`` (the graph's stable sorted workstream
    order). Each runtime is constructed from its workstream spec alone,
    so every other runtime field keeps its default value.

    Args:
        graph: Validated immutable task graph.

    Returns:
        Workstream runtimes keyed by workstream ID.
    """
    return {
        workstream_id: WorkstreamRuntime(spec=graph.workstream(workstream_id))
        for workstream_id in graph.workstream_ids()
    }

Orchestrator dataclass

Async graph scheduler using dependency + workstream constraints.

Immutable configuration holder. The `run` method creates an `_OrchestratorRun` that owns all mutable execution state.

Source code in src/agentrelay/orchestrator/orchestrator.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
@dataclass
class Orchestrator:
    """Async scheduler for a task graph under dependency + workstream constraints.

    This object only carries the configuration of a run and is meant to be
    treated as immutable (note that the dataclass itself is not declared
    ``frozen``). All mutable execution state lives in the
    :class:`_OrchestratorRun` that each call to :meth:`run` creates.
    """

    graph: TaskGraph
    task_runner: TaskRunner
    workstream_runner: WorkstreamRunner
    config: OrchestratorConfig = field(default_factory=OrchestratorConfig)
    listener: Optional[OrchestratorListener] = None

    async def run(
        self,
        task_runtimes: Optional[Mapping[str, TaskRuntime]] = None,
        workstream_runtimes: Optional[Mapping[str, WorkstreamRuntime]] = None,
    ) -> OrchestratorResult:
        """Drive the graph to a terminal success/failure and return the result.

        A new :class:`_OrchestratorRun` is created for every call and its
        ``execute()`` coroutine is awaited.

        Args:
            task_runtimes: Prebuilt task runtime map for resume-like runs.
                When omitted, runtimes are built fresh from the graph.
            workstream_runtimes: Prebuilt workstream runtime map for
                resume-like runs. When omitted, runtimes are built fresh
                from the graph.

        Returns:
            Terminal orchestration result carrying the mutated runtimes.

        Raises:
            ValueError: If configuration values are invalid or the runtime
                maps do not align with the graph's IDs.
        """
        return await _OrchestratorRun(
            self, task_runtimes, workstream_runtimes
        ).execute()

run(task_runtimes=None, workstream_runtimes=None) async

Run graph orchestration until terminal success/failure.

Parameters:

Name Type Description Default
task_runtimes Optional[Mapping[str, TaskRuntime]]

Optional prebuilt task runtime map for resume-like runs. If omitted, built fresh from graph.

None
workstream_runtimes Optional[Mapping[str, WorkstreamRuntime]]

Optional prebuilt workstream runtime map for resume-like runs. If omitted, built fresh from graph.

None

Returns:

Name Type Description
OrchestratorResult OrchestratorResult

Terminal orchestration result with mutated runtimes.

Raises:

Type Description
ValueError

If configuration values are invalid or runtime maps do not align with graph IDs.

Source code in src/agentrelay/orchestrator/orchestrator.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
async def run(
    self,
    task_runtimes: Optional[Mapping[str, TaskRuntime]] = None,
    workstream_runtimes: Optional[Mapping[str, WorkstreamRuntime]] = None,
) -> OrchestratorResult:
    """Drive the graph to a terminal success/failure and return the result.

    A new :class:`_OrchestratorRun` is created for every call and its
    ``execute()`` coroutine is awaited.

    Args:
        task_runtimes: Prebuilt task runtime map for resume-like runs.
            When omitted, runtimes are built fresh from the graph.
        workstream_runtimes: Prebuilt workstream runtime map for
            resume-like runs. When omitted, runtimes are built fresh
            from the graph.

    Returns:
        Terminal orchestration result carrying the mutated runtimes.

    Raises:
        ValueError: If configuration values are invalid or the runtime
            maps do not align with the graph's IDs.
    """
    return await _OrchestratorRun(
        self, task_runtimes, workstream_runtimes
    ).execute()

OrchestratorConfig dataclass

Configuration for one orchestrator scheduling run.

Attributes:

Name Type Description
max_concurrency int

Maximum number of task attempts to run concurrently.

max_task_attempts int

Maximum attempts per task (including first attempt).

task_teardown_mode TearDownMode

Teardown policy forwarded to TaskRunner.run(...).

fail_fast_on_internal_error bool

Stop scheduling immediately when a task run raises (internal/system failure).

fail_fast_on_workstream_error bool

When True, a workstream-level failure prevents preparing any new (PENDING) workstreams. In-flight work in already-active workstreams is not cancelled.

Source code in src/agentrelay/orchestrator/orchestrator.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@dataclass(frozen=True)
class OrchestratorConfig:
    """Configuration for one orchestrator scheduling run.

    Instances are frozen (fields cannot be reassigned after construction).
    Every field has a default, so ``OrchestratorConfig()`` describes a run
    with one task attempt at a time, one attempt per task, and both
    fail-fast switches enabled.

    Attributes:
        max_concurrency: Maximum number of task attempts to run concurrently.
            Defaults to ``1``.
        max_task_attempts: Maximum attempts per task (including first attempt).
            Defaults to ``1``, i.e. no retries.
        task_teardown_mode: Teardown policy forwarded to ``TaskRunner.run(...)``.
            Defaults to ``TearDownMode.ON_SUCCESS``.
        fail_fast_on_internal_error: Stop scheduling immediately when a task run
            raises (internal/system failure). Defaults to ``True``.
        fail_fast_on_workstream_error: When ``True``, a workstream-level failure
            prevents preparing any new (PENDING) workstreams. In-flight work
            in already-active workstreams is not cancelled. Defaults to
            ``True``.
    """

    max_concurrency: int = 1
    max_task_attempts: int = 1
    task_teardown_mode: TearDownMode = TearDownMode.ON_SUCCESS
    fail_fast_on_internal_error: bool = True
    fail_fast_on_workstream_error: bool = True

OrchestratorEvent dataclass

Structured event emitted by the orchestration loop.

Attributes:

Name Type Description
kind str

Event type identifier.

task_id Optional[str]

Optional task ID associated with the event.

workstream_id Optional[str]

Optional workstream ID associated with the event.

attempt_num Optional[int]

Optional 0-indexed attempt number for task-run events.

outcome_class Optional[TaskOutcomeClass]

Optional classified task outcome.

message Optional[str]

Optional event detail.

Source code in src/agentrelay/orchestrator/orchestrator.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
@dataclass(frozen=True)
class OrchestratorEvent:
    """Structured event emitted by the orchestration loop.

    Instances are frozen. Only ``kind`` is required: ``timestamp`` is filled
    in automatically and every other field defaults to ``None``.

    Attributes:
        kind: Event type identifier.
        timestamp: Creation time in seconds since the epoch. Unless passed
            explicitly, it is taken from ``time.time()`` when the event is
            constructed.
        task_id: Optional task ID associated with the event.
        workstream_id: Optional workstream ID associated with the event.
        attempt_num: Optional 0-indexed attempt number for task-run events.
        outcome_class: Optional classified task outcome.
        message: Optional event detail.
    """

    kind: str
    timestamp: float = field(default_factory=time.time)
    task_id: Optional[str] = None
    workstream_id: Optional[str] = None
    attempt_num: Optional[int] = None
    outcome_class: Optional[TaskOutcomeClass] = None
    message: Optional[str] = None

OrchestratorListener

Bases: Protocol

Callback protocol for real-time orchestration event observation.

Source code in src/agentrelay/orchestrator/orchestrator.py
107
108
109
110
111
112
113
114
115
116
117
@runtime_checkable
class OrchestratorListener(Protocol):
    """Callback protocol for real-time orchestration event observation.

    Any object that provides a compatible ``on_event`` method satisfies this
    protocol; subclassing is not required. Because the protocol is
    ``runtime_checkable``, ``isinstance(obj, OrchestratorListener)`` is
    supported, but that check only verifies that an ``on_event`` attribute
    exists, not that its signature matches.
    """

    def on_event(self, event: OrchestratorEvent) -> None:
        """Called each time the orchestrator produces an event.

        This is a plain (non-async) method that returns nothing.
        NOTE(review): presumably invoked inline from the scheduling loop, so
        implementations should return quickly — confirm against the caller.

        Args:
            event: The event that was just produced.
        """
        ...

on_event(event)

Called each time the orchestrator produces an event.

Parameters:

Name Type Description Default
event OrchestratorEvent

The event that was just produced.

required
Source code in src/agentrelay/orchestrator/orchestrator.py
111
112
113
114
115
116
117
def on_event(self, event: OrchestratorEvent) -> None:
    """Called each time the orchestrator produces an event.

    This is a plain (non-async) method that returns nothing.
    NOTE(review): presumably invoked inline from the scheduling loop, so
    implementations should return quickly — confirm against the caller.

    Args:
        event: The event that was just produced.
    """
    ...

OrchestratorOutcome

Bases: str, Enum

Terminal outcome for one orchestrator run.

Attributes:

Name Type Description
SUCCEEDED

All tasks reached PR_MERGED.

COMPLETED_WITH_FAILURES

No fatal internal error, but one or more tasks ended FAILED.

FATAL_INTERNAL_ERROR

A task run raised and orchestration failed fast.

Source code in src/agentrelay/orchestrator/orchestrator.py
48
49
50
51
52
53
54
55
56
57
58
59
60
class OrchestratorOutcome(str, Enum):
    """Terminal outcome for one orchestrator run.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value (e.g. ``OrchestratorOutcome.SUCCEEDED ==
    "succeeded"``).

    Attributes:
        SUCCEEDED: All tasks reached ``PR_MERGED``.
        COMPLETED_WITH_FAILURES: No fatal internal error, but one or more tasks
            ended ``FAILED``.
        FATAL_INTERNAL_ERROR: A task run raised and orchestration failed fast.
    """

    SUCCEEDED = "succeeded"
    COMPLETED_WITH_FAILURES = "completed_with_failures"
    FATAL_INTERNAL_ERROR = "fatal_internal_error"

OrchestratorResult dataclass

Terminal result for one orchestrator run.

Attributes:

Name Type Description
outcome OrchestratorOutcome

Overall run outcome classification.

task_runtimes Mapping[str, TaskRuntime]

Final task runtimes keyed by task ID.

workstream_runtimes Mapping[str, WorkstreamRuntime]

Final workstream runtimes keyed by workstream ID.

events tuple[OrchestratorEvent, ...]

Ordered structured events produced during scheduling/execution.

fatal_error Optional[str]

Traceback text for fatal internal errors, if any.

Source code in src/agentrelay/orchestrator/orchestrator.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
@dataclass(frozen=True)
class OrchestratorResult:
    """Terminal result for one orchestrator run.

    The dataclass is frozen, which prevents reassigning its fields; it does
    not copy or freeze the runtime objects that the mappings refer to.

    Attributes:
        outcome: Overall run outcome classification.
        task_runtimes: Final task runtimes keyed by task ID.
        workstream_runtimes: Final workstream runtimes keyed by workstream ID.
        events: Ordered structured events produced during scheduling/execution.
        fatal_error: Traceback text for fatal internal errors, if any.
            Defaults to ``None``.
    """

    outcome: OrchestratorOutcome
    task_runtimes: Mapping[str, TaskRuntime]
    workstream_runtimes: Mapping[str, WorkstreamRuntime]
    events: tuple[OrchestratorEvent, ...]
    fatal_error: Optional[str] = None

TaskOutcomeClass

Bases: str, Enum

Classification of one task attempt outcome from the orchestrator boundary.

Attributes:

Name Type Description
SUCCESS

Task reached PR_MERGED.

EXPECTED_FAILURE

Task run returned FAILED.

INTERNAL_ERROR

Task run raised an exception.

Source code in src/agentrelay/orchestrator/orchestrator.py
34
35
36
37
38
39
40
41
42
43
44
45
class TaskOutcomeClass(str, Enum):
    """Classification of one task attempt outcome from the orchestrator boundary.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value (e.g. ``TaskOutcomeClass.SUCCESS == "success"``).

    Attributes:
        SUCCESS: Task reached ``PR_MERGED``.
        EXPECTED_FAILURE: Task run returned ``FAILED``.
        INTERNAL_ERROR: Task run raised an exception.
    """

    SUCCESS = "success"
    EXPECTED_FAILURE = "expected_failure"
    INTERNAL_ERROR = "internal_error"

build_standard_runner(repo_path, graph_name, graph, keep_panes=False, poll_interval=2.0, context_content=None)

Build the standard runner for worktree + tmux + Claude Code.

All steps use StepDispatch default since only one framework/environment combo currently exists.

When adding a new `AgentFramework` or `AgentEnvironment`, add keyed entries to the `StepDispatch` tables for steps that have distinct implementations. Refer to the sensitivity table in the `StandardTaskRunner` docstring.

Dispatch table (current):

| Step | Implementation | Notes |
|------|----------------|-------|
| preparer | WorktreeTaskPreparer | env/fw agnostic |
| launcher | TmuxTaskLauncher | will need entries |
| kickoff | TmuxTaskKickoff | will need entries |
| completion_checker | SignalCompletionChecker | may need entries |
| merger | GhTaskMerger | env/fw agnostic |
| teardown | WorktreeTaskTeardown | will need entries |

Parameters:

Name Type Description Default
repo_path Path

Path to the bare/main repository.

required
graph_name str

Name of the task graph being executed.

required
graph TaskGraph

The task graph (used to compute dependency descriptions).

required
keep_panes bool

Whether to keep tmux panes after teardown.

False
poll_interval float

Seconds between completion signal polls.

2.0
context_content Optional[str]

Optional context content to write to the signal dir.

None

Returns:

Type Description
StandardTaskRunner

A fully wired `StandardTaskRunner`.

Source code in src/agentrelay/orchestrator/builders.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def build_standard_runner(
    repo_path: Path,
    graph_name: str,
    graph: TaskGraph,
    keep_panes: bool = False,
    poll_interval: float = 2.0,
    context_content: Optional[str] = None,
) -> StandardTaskRunner:
    """Assemble the standard task runner (worktree + tmux + Claude Code).

    Every step is registered only as the ``default`` entry of its
    ``StepDispatch``, because a single framework/environment combination
    exists today.

    When a new ``AgentFramework`` or ``AgentEnvironment`` is introduced, add
    keyed entries to the ``StepDispatch`` tables for the steps whose
    implementation differs. The sensitivity table in
    :class:`StandardTaskRunner`'s docstring lists which steps those are.

    Dispatch table (current):

    +--------------------+-------------------------+-------------------+
    | Step               | Implementation          | Notes             |
    +====================+=========================+===================+
    | preparer           | WorktreeTaskPreparer    | env/fw agnostic   |
    +--------------------+-------------------------+-------------------+
    | launcher           | TmuxTaskLauncher        | will need entries |
    +--------------------+-------------------------+-------------------+
    | kickoff            | TmuxTaskKickoff         | will need entries |
    +--------------------+-------------------------+-------------------+
    | completion_checker | SignalCompletionChecker | may need entries  |
    +--------------------+-------------------------+-------------------+
    | merger             | GhTaskMerger            | env/fw agnostic   |
    +--------------------+-------------------------+-------------------+
    | teardown           | WorktreeTaskTeardown    | will need entries |
    +--------------------+-------------------------+-------------------+

    Args:
        repo_path: Path to the bare/main repository.
        graph_name: Name of the task graph being executed.
        graph: The task graph (used to compute dependency descriptions).
        keep_panes: Whether to keep tmux panes after teardown.
        poll_interval: Seconds between completion signal polls.
        context_content: Optional context content to write to the signal dir.

    Returns:
        A fully wired :class:`StandardTaskRunner`.
    """
    # The launcher and kickoff are created once and the same instance is
    # handed out for every runtime; all other steps are built per runtime.
    shared_launcher = TmuxTaskLauncher()
    shared_kickoff = TmuxTaskKickoff()

    def _reuse_launcher(rt):
        return shared_launcher

    def _reuse_kickoff(rt):
        return shared_kickoff

    def _make_preparer(runtime: TaskRuntime) -> TaskPreparer:
        # Map each dependency ID reported by the graph for this task to the
        # description of the corresponding task.
        descriptions = {
            dep_id: graph.task(dep_id).description
            for dep_id in graph.dependency_ids(runtime.task.id)
        }
        return WorktreeTaskPreparer(
            repo_path=repo_path,
            graph_name=graph_name,
            dependency_descriptions=descriptions,
            context_content=context_content,
        )

    def _make_completion_checker(runtime: TaskRuntime) -> TaskCompletionChecker:
        return SignalCompletionChecker(poll_interval=poll_interval)

    def _make_merger(runtime: TaskRuntime) -> TaskMerger:
        return GhTaskMerger(repo_path=repo_path)

    def _make_teardown(runtime: TaskRuntime) -> TaskTeardown:
        return WorktreeTaskTeardown(repo_path=repo_path, keep_panes=keep_panes)

    return StandardTaskRunner(
        _preparer=StepDispatch(default=_make_preparer),
        _launcher=StepDispatch(default=_reuse_launcher),
        _kickoff=StepDispatch(default=_reuse_kickoff),
        _completion_checker=StepDispatch(default=_make_completion_checker),
        _merger=StepDispatch(default=_make_merger),
        _teardown=StepDispatch(default=_make_teardown),
    )

build_standard_workstream_runner(repo_path, graph_name)

Build the standard workstream runner for git worktree + GitHub CLI.

Wires the three workstream lifecycle steps with concrete implementations:

| Step | Implementation |
|------|----------------|
| preparer | GitWorkstreamPreparer |
| integrator | GhWorkstreamIntegrator |
| teardown | GitWorkstreamTeardown |

Parameters:

Name Type Description Default
repo_path Path

Path to the bare/main repository.

required
graph_name str

Name of the task graph being executed.

required

Returns:

Type Description
StandardWorkstreamRunner

A fully wired `StandardWorkstreamRunner`.

Source code in src/agentrelay/orchestrator/builders.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def build_standard_workstream_runner(
    repo_path: Path,
    graph_name: str,
) -> StandardWorkstreamRunner:
    """Assemble the standard workstream runner (git worktree + GitHub CLI).

    The three workstream lifecycle steps are wired to these concrete
    implementations:

    +------------+------------------------+
    | Step       | Implementation         |
    +============+========================+
    | preparer   | GitWorkstreamPreparer  |
    +------------+------------------------+
    | integrator | GhWorkstreamIntegrator |
    +------------+------------------------+
    | teardown   | GitWorkstreamTeardown  |
    +------------+------------------------+

    Args:
        repo_path: Path to the bare/main repository. Passed to all three
            steps.
        graph_name: Name of the task graph being executed. Passed to the
            preparer only.

    Returns:
        A fully wired :class:`StandardWorkstreamRunner`.
    """
    preparer = GitWorkstreamPreparer(repo_path=repo_path, graph_name=graph_name)
    integrator = GhWorkstreamIntegrator(repo_path=repo_path)
    teardown = GitWorkstreamTeardown(repo_path=repo_path)
    return StandardWorkstreamRunner(
        _preparer=preparer,
        _integrator=integrator,
        _teardown=teardown,
    )