task_runner

Task runner package — one-task lifecycle state machine, dispatch, and I/O protocols.

Re-exports all public names so that from agentrelay.task_runner import X continues to work.

Subpackages

core: Protocols, state machine, dispatch, I/O boundary composition.
implementations: Concrete protocol implementations.

StepDispatch dataclass

Bases: Generic[T]

Per-step dispatch table for one lifecycle step.

Selects the right protocol implementation based on the task's agentrelay.task.AgentFramework and agentrelay.environments.AgentEnvironment type. Each entry maps a (framework, env_type) key to a factory callable that receives the TaskRuntime and returns a protocol implementation. The default fallback handles steps that don't vary by framework/environment.

Instances are callable: use self._preparer(runtime) directly, which goes through __call__.

Dispatch resolution order

  1. Exact match in entries for (framework, type(environment))
  2. default fallback
  3. KeyError if neither matches

Extension guide

To add support for a new AgentFramework or AgentEnvironment, add an entry to the entries dict for each step that has a distinct implementation for that combo. Steps that don't vary (e.g. preparer, merger) can continue using default.

Attributes:

entries (dict[DispatchKey, Callable[[TaskRuntime], T]]): Mapping of (AgentFramework, type) dispatch keys to factory callables returning protocol implementations.

default (Callable[[TaskRuntime], T] | None): Fallback factory used when no exact key match exists.
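
The resolution order can be tried out in isolation. The following is a minimal sketch, not the package's code: MiniDispatch copies the lookup shown in StepDispatch.__call__, while Framework, LocalEnv, RemoteEnv, and the SimpleNamespace-based runtime are hypothetical stand-ins for AgentFramework, the AgentEnvironment types, and TaskRuntime.

```python
from dataclasses import dataclass, field
from enum import Enum
from types import SimpleNamespace
from typing import Any, Callable, Dict, Generic, Optional, Tuple, TypeVar

T = TypeVar("T")


class Framework(Enum):
    """Hypothetical stand-in for AgentFramework."""
    ALPHA = "alpha"
    BETA = "beta"


class LocalEnv:
    """Hypothetical stand-in for one AgentEnvironment type."""


class RemoteEnv:
    """Hypothetical stand-in for another AgentEnvironment type."""


@dataclass(frozen=True)
class MiniDispatch(Generic[T]):
    """Simplified copy of the lookup logic in StepDispatch.__call__."""

    entries: Dict[Tuple[Framework, type], Callable[[Any], T]] = field(
        default_factory=dict
    )
    default: Optional[Callable[[Any], T]] = None

    def __call__(self, runtime: Any) -> T:
        agent = runtime.task.primary_agent
        key = (agent.framework, type(agent.environment))
        factory = self.entries.get(key)
        if factory is not None:
            return factory(runtime)       # 1. exact match
        if self.default is not None:
            return self.default(runtime)  # 2. default fallback
        raise KeyError(f"No implementation registered for {key}")  # 3. no match


def make_runtime(framework: Framework, environment: object) -> SimpleNamespace:
    """Build a fake runtime exposing only runtime.task.primary_agent."""
    agent = SimpleNamespace(framework=framework, environment=environment)
    return SimpleNamespace(task=SimpleNamespace(primary_agent=agent))


# One step with a specific entry plus a default, and one with no default.
launcher = MiniDispatch(
    entries={(Framework.ALPHA, LocalEnv): lambda rt: "alpha-local launcher"},
    default=lambda rt: "generic launcher",
)
strict = MiniDispatch(entries={(Framework.ALPHA, LocalEnv): lambda rt: "only one"})
```

Calling launcher with an ALPHA/LocalEnv runtime takes the exact entry, any other combination falls back to default, and strict raises KeyError for anything but its single registered key.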

Source code in src/agentrelay/task_runner/core/dispatch.py
@dataclass(frozen=True)
class StepDispatch(Generic[T]):
    """Per-step dispatch table for one lifecycle step.

    Selects the right protocol implementation based on the task's
    :class:`~agentrelay.task.AgentFramework` and
    :class:`~agentrelay.environments.AgentEnvironment` type. Each entry
    maps a ``(framework, env_type)`` key to a factory callable that
    receives the :class:`TaskRuntime` and returns a protocol implementation.
    The ``default`` fallback handles steps that don't vary by
    framework/environment.

    Callable — use as ``self._preparer(runtime)`` directly via
    :meth:`__call__`.

    Dispatch resolution order:
      1. Exact match in ``entries`` for ``(framework, type(environment))``
      2. ``default`` fallback
      3. ``KeyError`` if neither matches

    Extension guide:
      To add support for a new ``AgentFramework`` or ``AgentEnvironment``,
      add an entry to the ``entries`` dict for each step that has a
      distinct implementation for that combo. Steps that don't vary
      (e.g. preparer, merger) can continue using ``default``.

    Attributes:
        entries: Mapping of ``(AgentFramework, type)`` dispatch keys to
            factory callables returning protocol implementations.
        default: Fallback factory used when no exact key match exists.
    """

    entries: dict[DispatchKey, Callable[[TaskRuntime], T]] = field(default_factory=dict)
    default: Callable[[TaskRuntime], T] | None = None

    def __call__(self, runtime: TaskRuntime) -> T:
        """Resolve and return the protocol implementation for this runtime.

        Args:
            runtime: Task runtime used to extract the dispatch key from
                ``runtime.task.primary_agent``.

        Returns:
            The resolved protocol implementation instance.

        Raises:
            KeyError: If no entry matches and no default is provided.
        """
        key = (
            runtime.task.primary_agent.framework,
            type(runtime.task.primary_agent.environment),
        )
        factory = self.entries.get(key)
        if factory is not None:
            return factory(runtime)
        if self.default is not None:
            return self.default(runtime)
        raise KeyError(
            f"No implementation registered for {key} and no default provided"
        )

__call__(runtime)

Resolve and return the protocol implementation for this runtime.

Parameters:

runtime (TaskRuntime, required): Task runtime used to extract the dispatch key from runtime.task.primary_agent.

Returns:

T: The resolved protocol implementation instance.

Raises:

KeyError: If no entry matches and no default is provided.
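
One consequence of building the key with type(environment) and looking it up in a plain dict is that a subclass instance does not match an entry registered for its base class. A small illustration, using hypothetical BaseEnv and SpecialEnv classes and a string in place of the framework value (none of these are part of the package):

```python
class BaseEnv:
    """Hypothetical environment type used only for this illustration."""


class SpecialEnv(BaseEnv):
    """Hypothetical subclass of BaseEnv."""


# Registry keyed the same way as the dispatch key: (framework, env_type).
entries = {("some-framework", BaseEnv): "base implementation"}


def lookup(framework: str, environment: object) -> str:
    """Mimic the exact-match-then-default lookup with plain values."""
    key = (framework, type(environment))
    return entries.get(key, "default implementation")
```

Here lookup("some-framework", SpecialEnv()) returns the default even though SpecialEnv() is an instance of BaseEnv. If subclasses should share an implementation, each concrete type would presumably need its own entry, or the step can rely on default.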

Source code in src/agentrelay/task_runner/core/dispatch.py
def __call__(self, runtime: TaskRuntime) -> T:
    """Resolve and return the protocol implementation for this runtime.

    Args:
        runtime: Task runtime used to extract the dispatch key from
            ``runtime.task.primary_agent``.

    Returns:
        The resolved protocol implementation instance.

    Raises:
        KeyError: If no entry matches and no default is provided.
    """
    key = (
        runtime.task.primary_agent.framework,
        type(runtime.task.primary_agent.environment),
    )
    factory = self.entries.get(key)
    if factory is not None:
        return factory(runtime)
    if self.default is not None:
        return self.default(runtime)
    raise KeyError(
        f"No implementation registered for {key} and no default provided"
    )

TaskCompletionChecker

Bases: Protocol

Wait for a terminal task completion signal.

Source code in src/agentrelay/task_runner/core/io.py
@runtime_checkable
class TaskCompletionChecker(Protocol):
    """Wait for a terminal task completion signal."""

    async def wait_for_completion(self, runtime: TaskRuntime) -> TaskCompletionSignal:
        """Wait for terminal task signal from the execution boundary.

        Args:
            runtime: Runtime envelope being observed.

        Returns:
            TaskCompletionSignal: Terminal signal payload with outcome data.
        """
        ...

wait_for_completion(runtime) async

Wait for terminal task signal from the execution boundary.

Parameters:

runtime (TaskRuntime, required): Runtime envelope being observed.

Returns:

TaskCompletionSignal: Terminal signal payload with outcome data.
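
As a rough illustration of what an implementation might look like, the sketch below waits on an asyncio.Queue for the terminal signal. Signal and CompletionChecker are local stand-ins for TaskCompletionSignal and TaskCompletionChecker, and QueueCompletionChecker is a hypothetical implementation, not one shipped with the package.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Literal, Optional, Protocol, Tuple, runtime_checkable


@dataclass(frozen=True)
class Signal:
    """Stand-in with the same fields as TaskCompletionSignal."""
    outcome: Literal["done", "failed"]
    pr_url: Optional[str] = None
    error: Optional[str] = None
    concerns: Tuple[str, ...] = ()


@runtime_checkable
class CompletionChecker(Protocol):
    """Stand-in for the TaskCompletionChecker protocol."""

    async def wait_for_completion(self, runtime: Any) -> Signal: ...


class QueueCompletionChecker:
    """Hypothetical checker that waits for a signal placed on a queue."""

    def __init__(self, queue: "asyncio.Queue[Signal]") -> None:
        self._queue = queue

    async def wait_for_completion(self, runtime: Any) -> Signal:
        # Suspends until some producer puts a terminal signal on the queue.
        return await self._queue.get()


async def demo() -> Signal:
    """Put one signal on the queue, then let the checker pick it up."""
    queue: "asyncio.Queue[Signal]" = asyncio.Queue()
    checker = QueueCompletionChecker(queue)
    await queue.put(Signal(outcome="done", pr_url="https://example.invalid/pr/1"))
    return await checker.wait_for_completion(runtime=None)
```

Running asyncio.run(demo()) returns the queued signal. Because the method is async, the runner can await it without blocking the event loop while the agent works.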

Source code in src/agentrelay/task_runner/core/io.py
async def wait_for_completion(self, runtime: TaskRuntime) -> TaskCompletionSignal:
    """Wait for terminal task signal from the execution boundary.

    Args:
        runtime: Runtime envelope being observed.

    Returns:
        TaskCompletionSignal: Terminal signal payload with outcome data.
    """
    ...

TaskCompletionSignal dataclass

Signal payload returned by the completion checker.

Attributes:

outcome (Literal['done', 'failed']): Terminal completion signal from external execution. "done" indicates the task finished and produced a PR. "failed" indicates the task failed before successful completion.

pr_url (Optional[str]): Pull request URL for a "done" outcome, if available.

error (Optional[str]): Failure detail for a "failed" outcome, if available.

concerns (tuple[str, ...]): Semantic concerns captured during execution.
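
How the fields combine may be easier to see in code. The sketch below redefines the dataclass locally with the documented fields and adds a hypothetical helper, classify, that mirrors the checks StandardTaskRunner.run applies to a signal. Note that a "done" outcome without pr_url is treated as a failure.

```python
from dataclasses import dataclass
from typing import Literal, Optional, Tuple


@dataclass(frozen=True)
class TaskCompletionSignal:
    """Local copy of the documented fields, for illustration only."""
    outcome: Literal["done", "failed"]
    pr_url: Optional[str] = None
    error: Optional[str] = None
    concerns: Tuple[str, ...] = ()


def classify(signal: TaskCompletionSignal) -> str:
    """Hypothetical helper mirroring the checks in StandardTaskRunner.run."""
    if signal.outcome == "failed":
        return "failed: " + (signal.error or "Task failed without an error message.")
    if not signal.pr_url:
        return "failed: Task completion signaled 'done' but did not include pr_url."
    return "ready to merge: " + signal.pr_url


# Three representative signals (the URL is a placeholder).
ok = TaskCompletionSignal(outcome="done", pr_url="https://example.invalid/pr/7")
no_pr = TaskCompletionSignal(outcome="done")
failed = TaskCompletionSignal(outcome="failed", error="tests did not pass")
```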

Source code in src/agentrelay/task_runner/core/io.py
@dataclass(frozen=True)
class TaskCompletionSignal:
    """Signal payload returned by the completion checker.

    Attributes:
        outcome: Terminal completion signal from external execution.
            ``"done"`` indicates the task finished and produced a PR.
            ``"failed"`` indicates the task failed before successful completion.
        pr_url: Pull request URL for a ``"done"`` outcome, if available.
        error: Failure detail for a ``"failed"`` outcome, if available.
        concerns: Semantic concerns captured during execution.
    """

    outcome: Literal["done", "failed"]
    pr_url: Optional[str] = None
    error: Optional[str] = None
    concerns: tuple[str, ...] = ()

TaskKickoff

Bases: Protocol

Send kickoff instructions to a launched agent.

Source code in src/agentrelay/task_runner/core/io.py
@runtime_checkable
class TaskKickoff(Protocol):
    """Send kickoff instructions to a launched agent."""

    def kickoff(self, runtime: TaskRuntime, agent: Agent) -> None:
        """Send kickoff instructions to the launched task agent.

        Args:
            runtime: Runtime envelope for the task being kicked off.
            agent: Live agent handle to send instructions to.
        """
        ...

kickoff(runtime, agent)

Send kickoff instructions to the launched task agent.

Parameters:

runtime (TaskRuntime, required): Runtime envelope for the task being kicked off.

agent (Agent, required): Live agent handle to send instructions to.

Source code in src/agentrelay/task_runner/core/io.py
def kickoff(self, runtime: TaskRuntime, agent: Agent) -> None:
    """Send kickoff instructions to the launched task agent.

    Args:
        runtime: Runtime envelope for the task being kicked off.
        agent: Live agent handle to send instructions to.
    """
    ...

TaskLauncher

Bases: Protocol

Launch and return the primary agent for a task.

Source code in src/agentrelay/task_runner/core/io.py
@runtime_checkable
class TaskLauncher(Protocol):
    """Launch and return the primary agent for a task."""

    def launch(self, runtime: TaskRuntime) -> Agent:
        """Launch and return the primary agent for this task runtime.

        Args:
            runtime: Runtime envelope to launch against.

        Returns:
            Agent: Live agent handle bound to this task.
        """
        ...

launch(runtime)

Launch and return the primary agent for this task runtime.

Parameters:

runtime (TaskRuntime, required): Runtime envelope to launch against.

Returns:

Agent: Live agent handle bound to this task.
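
Since the protocol is decorated with runtime_checkable, any object with a launch method passes an isinstance check; that check looks only at method presence, not at signatures or return types. A minimal sketch, where FakeAgent, Launcher, and InMemoryLauncher are hypothetical stand-ins rather than package classes:

```python
from dataclasses import dataclass
from typing import Any, Protocol, runtime_checkable


@dataclass
class FakeAgent:
    """Hypothetical stand-in for Agent; only `address` is modeled."""
    address: str


@runtime_checkable
class Launcher(Protocol):
    """Stand-in for the TaskLauncher protocol."""

    def launch(self, runtime: Any) -> FakeAgent: ...


class InMemoryLauncher:
    """Hypothetical launcher that fabricates an agent handle without I/O."""

    def launch(self, runtime: Any) -> FakeAgent:
        return FakeAgent(address=f"in-memory:{runtime}")


class NotALauncher:
    """Has no launch() method, so it does not satisfy the protocol."""
```

No inheritance from the protocol is needed. An implementation along these lines could be handy in tests, where launching a real agent is undesirable.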

Source code in src/agentrelay/task_runner/core/io.py
def launch(self, runtime: TaskRuntime) -> Agent:
    """Launch and return the primary agent for this task runtime.

    Args:
        runtime: Runtime envelope to launch against.

    Returns:
        Agent: Live agent handle bound to this task.
    """
    ...

TaskMerger

Bases: Protocol

Merge the completed task PR into the integration target.

Source code in src/agentrelay/task_runner/core/io.py
@runtime_checkable
class TaskMerger(Protocol):
    """Merge the completed task PR into the integration target."""

    def merge_pr(self, runtime: TaskRuntime, pr_url: str) -> None:
        """Merge the completed task PR.

        Args:
            runtime: Runtime envelope being merged.
            pr_url: Pull request URL to merge.
        """
        ...

merge_pr(runtime, pr_url)

Merge the completed task PR.

Parameters:

runtime (TaskRuntime, required): Runtime envelope being merged.

pr_url (str, required): Pull request URL to merge.

Source code in src/agentrelay/task_runner/core/io.py
def merge_pr(self, runtime: TaskRuntime, pr_url: str) -> None:
    """Merge the completed task PR.

    Args:
        runtime: Runtime envelope being merged.
        pr_url: Pull request URL to merge.
    """
    ...

TaskPreparer

Bases: Protocol

Prepare runtime execution prerequisites before agent launch.

Source code in src/agentrelay/task_runner/core/io.py
@runtime_checkable
class TaskPreparer(Protocol):
    """Prepare runtime execution prerequisites before agent launch."""

    def prepare(self, runtime: TaskRuntime) -> None:
        """Prepare runtime execution prerequisites.

        Args:
            runtime: Runtime envelope to prepare (e.g. branch, signal files).
        """
        ...

prepare(runtime)

Prepare runtime execution prerequisites.

Parameters:

runtime (TaskRuntime, required): Runtime envelope to prepare (e.g. branch, signal files).

Source code in src/agentrelay/task_runner/core/io.py
def prepare(self, runtime: TaskRuntime) -> None:
    """Prepare runtime execution prerequisites.

    Args:
        runtime: Runtime envelope to prepare (e.g. branch, signal files).
    """
    ...

TaskTeardown

Bases: Protocol

Release runtime resources after terminal completion.

Source code in src/agentrelay/task_runner/core/io.py
@runtime_checkable
class TaskTeardown(Protocol):
    """Release runtime resources after terminal completion."""

    def teardown(self, runtime: TaskRuntime) -> None:
        """Release runtime resources after terminal completion.

        Args:
            runtime: Runtime envelope whose resources should be cleaned up.
        """
        ...

teardown(runtime)

Release runtime resources after terminal completion.

Parameters:

runtime (TaskRuntime, required): Runtime envelope whose resources should be cleaned up.

Source code in src/agentrelay/task_runner/core/io.py
def teardown(self, runtime: TaskRuntime) -> None:
    """Release runtime resources after terminal completion.

    Args:
        runtime: Runtime envelope whose resources should be cleaned up.
    """
    ...

StandardTaskRunner dataclass

Standard task lifecycle: prepare → launch → kickoff → wait → merge → teardown.

Uses StepDispatch tables for per-step implementation selection based on the task's AgentFramework and AgentEnvironment.

Step sensitivity reference:

+-----------------------+----------------+----------------------+
| Step                  | Varies by env? | Varies by framework? |
+=======================+================+======================+
| preparer              | No             | No                   |
+-----------------------+----------------+----------------------+
| launcher              | Yes            | Yes                  |
+-----------------------+----------------+----------------------+
| kickoff               | Yes            | Yes                  |
+-----------------------+----------------+----------------------+
| completion_checker    | Maybe          | No                   |
+-----------------------+----------------+----------------------+
| merger                | No             | No                   |
+-----------------------+----------------+----------------------+
| teardown              | Partially      | No                   |
+-----------------------+----------------+----------------------+

Extension guide (adding a new framework or environment): Add entries to the StepDispatch tables for steps that have distinct implementations. Steps that don't vary can keep using default.

Extension guide (different lifecycle, e.g., adding a review step): Create a new class satisfying the TaskRunner protocol. It can reuse StepDispatch tables and per-step protocol implementations.

Attributes:

_preparer (StepDispatch[TaskPreparer]): Dispatch table for TaskPreparer selection.

_launcher (StepDispatch[TaskLauncher]): Dispatch table for TaskLauncher selection.

_kickoff (StepDispatch[TaskKickoff]): Dispatch table for TaskKickoff selection.

_completion_checker (StepDispatch[TaskCompletionChecker]): Dispatch table for TaskCompletionChecker selection.

_merger (StepDispatch[TaskMerger]): Dispatch table for TaskMerger selection.

_teardown (StepDispatch[TaskTeardown]): Dispatch table for TaskTeardown selection.
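
The lifecycle is guarded by a transition table (ALLOWED_TASK_TRANSITIONS in the source), whose contents are not shown on this page. The sketch below is therefore an assumption: it lists only the edges that run() is seen to take, uses a local Status enum in place of TaskStatus, and the enum values are invented for illustration.

```python
from enum import Enum
from typing import Dict, FrozenSet, List


class Status(Enum):
    """Stand-in for TaskStatus, limited to the members used in run()."""
    PENDING = "pending"
    RUNNING = "running"
    PR_CREATED = "pr_created"
    PR_MERGED = "pr_merged"
    FAILED = "failed"


# Assumed table: only the edges that run() is seen to take.
ALLOWED: Dict[Status, FrozenSet[Status]] = {
    Status.PENDING: frozenset({Status.RUNNING}),
    Status.RUNNING: frozenset({Status.PR_CREATED, Status.FAILED}),
    Status.PR_CREATED: frozenset({Status.PR_MERGED, Status.FAILED}),
    Status.PR_MERGED: frozenset(),
    Status.FAILED: frozenset(),
}


def transition(current: Status, target: Status) -> Status:
    """Return target if the edge is legal, otherwise raise RuntimeError."""
    if target not in ALLOWED[current]:
        raise RuntimeError(
            f"Illegal task status transition: {current.value} -> {target.value}"
        )
    return target


def happy_path() -> List[Status]:
    """Walk the success path: PENDING -> RUNNING -> PR_CREATED -> PR_MERGED."""
    status = Status.PENDING
    visited = [status]
    for target in (Status.RUNNING, Status.PR_CREATED, Status.PR_MERGED):
        status = transition(status, target)
        visited.append(status)
    return visited
```

Centralizing the legal edges in one table means a skipped step, such as jumping from PENDING straight to PR_MERGED, fails loudly instead of leaving the runtime in an inconsistent state.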

Source code in src/agentrelay/task_runner/core/runner.py
@dataclass
class StandardTaskRunner:
    """Standard task lifecycle: prepare → launch → kickoff → wait → merge → teardown.

    Uses :class:`StepDispatch` tables for per-step implementation selection
    based on the task's ``AgentFramework`` and ``AgentEnvironment``.

    Step sensitivity reference:

    +-----------------------+----------------+----------------------+
    | Step                  | Varies by env? | Varies by framework? |
    +=======================+================+======================+
    | preparer              | No             | No                   |
    +-----------------------+----------------+----------------------+
    | launcher              | Yes            | Yes                  |
    +-----------------------+----------------+----------------------+
    | kickoff               | Yes            | Yes                  |
    +-----------------------+----------------+----------------------+
    | completion_checker    | Maybe          | No                   |
    +-----------------------+----------------+----------------------+
    | merger                | No             | No                   |
    +-----------------------+----------------+----------------------+
    | teardown              | Partially      | No                   |
    +-----------------------+----------------+----------------------+

    Extension guide — adding a new framework or environment:
      Add entries to the ``StepDispatch`` tables for steps that have distinct
      implementations. Steps that don't vary can keep using ``default``.

    Extension guide — different lifecycle (e.g., adding a review step):
      Create a new class satisfying the :class:`TaskRunner` protocol. It can
      reuse ``StepDispatch`` tables and per-step protocol implementations.

    Attributes:
        _preparer: Dispatch table for :class:`TaskPreparer` selection.
        _launcher: Dispatch table for :class:`TaskLauncher` selection.
        _kickoff: Dispatch table for :class:`TaskKickoff` selection.
        _completion_checker: Dispatch table for :class:`TaskCompletionChecker` selection.
        _merger: Dispatch table for :class:`TaskMerger` selection.
        _teardown: Dispatch table for :class:`TaskTeardown` selection.
    """

    _preparer: StepDispatch[TaskPreparer]
    _launcher: StepDispatch[TaskLauncher]
    _kickoff: StepDispatch[TaskKickoff]
    _completion_checker: StepDispatch[TaskCompletionChecker]
    _merger: StepDispatch[TaskMerger]
    _teardown: StepDispatch[TaskTeardown]
    on_event: Optional[Callable[..., None]] = field(default=None, repr=False)

    async def run(
        self,
        runtime: TaskRuntime,
        *,
        teardown_mode: TearDownMode = TearDownMode.ALWAYS,
    ) -> TaskRunResult:
        """Execute one task lifecycle run and return a result snapshot.

        Args:
            runtime: Mutable task runtime to execute. Must enter in ``PENDING``
                state.
            teardown_mode: Resource teardown policy after run completion.

        Returns:
            TaskRunResult: Convenience snapshot of terminal runtime fields.

        Raises:
            ValueError: If ``runtime`` does not enter in ``PENDING`` status.
        """
        if runtime.state.status != TaskStatus.PENDING:
            raise ValueError(
                "StandardTaskRunner.run() requires runtime.state.status == PENDING. "
                f"Received {runtime.state.status!r}."
            )

        self._transition(runtime, TaskStatus.RUNNING)
        try:
            try:
                self._preparer(runtime).prepare(runtime)
            except Exception as exc:
                fc = self._record_io_failure(runtime, exc)
                return TaskRunResult.from_runtime(runtime, failure_class=fc)
            self._emit_step(
                "task_prepared",
                runtime,
                f"branch={runtime.state.branch_name}",
            )

            try:
                agent = self._launcher(runtime).launch(runtime)
                runtime.artifacts.agent_address = agent.address
            except Exception as exc:
                fc = self._record_io_failure(runtime, exc)
                return TaskRunResult.from_runtime(runtime, failure_class=fc)
            addr = runtime.artifacts.agent_address
            self._emit_step(
                "task_launched",
                runtime,
                addr.label if addr else None,
            )

            try:
                self._kickoff(runtime).kickoff(runtime, agent)
            except Exception as exc:
                fc = self._record_io_failure(runtime, exc)
                return TaskRunResult.from_runtime(runtime, failure_class=fc)
            self._emit_step("task_waiting", runtime)

            try:
                signal = await self._completion_checker(runtime).wait_for_completion(
                    runtime
                )
            except Exception as exc:
                fc = self._record_io_failure(runtime, exc)
                return TaskRunResult.from_runtime(runtime, failure_class=fc)

            runtime.artifacts.concerns.extend(signal.concerns)

            if signal.outcome == "failed":
                self._transition(runtime, TaskStatus.FAILED)
                runtime.state.error = (
                    signal.error or "Task failed without an error message."
                )
                return TaskRunResult.from_runtime(runtime)

            if not signal.pr_url:
                self._transition(runtime, TaskStatus.FAILED)
                runtime.state.error = (
                    "Task completion signaled 'done' but did not include pr_url."
                )
                return TaskRunResult.from_runtime(runtime)

            runtime.artifacts.pr_url = signal.pr_url
            self._transition(runtime, TaskStatus.PR_CREATED)

            self._emit_step("task_pr_merging", runtime, signal.pr_url)
            try:
                self._merger(runtime).merge_pr(runtime, signal.pr_url)
            except Exception as exc:
                fc = self._record_io_failure(runtime, exc)
                return TaskRunResult.from_runtime(runtime, failure_class=fc)

            self._transition(runtime, TaskStatus.PR_MERGED)
        finally:
            if self._should_teardown(teardown_mode, runtime.state.status):
                try:
                    self._teardown(runtime).teardown(runtime)
                except Exception as exc:
                    runtime.artifacts.concerns.append(f"teardown_failed: {exc}")

        return TaskRunResult.from_runtime(runtime)

    def _emit_step(
        self,
        kind: str,
        runtime: TaskRuntime,
        message: Optional[str] = None,
    ) -> None:
        """Emit a step-level event if an event callback is registered."""
        if self.on_event is None:
            return
        # Local import to avoid circular dependency (orchestrator → task_runner).
        from agentrelay.orchestrator.orchestrator import OrchestratorEvent

        self.on_event(
            OrchestratorEvent(
                kind=kind,
                task_id=runtime.task.id,
                workstream_id=runtime.task.workstream_id,
                message=message,
            )
        )

    def _transition(self, runtime: TaskRuntime, target: TaskStatus) -> None:
        """Transition runtime status while enforcing legal lifecycle edges."""
        current = runtime.state.status
        allowed = ALLOWED_TASK_TRANSITIONS[current]
        if target not in allowed:
            raise RuntimeError(
                f"Illegal task status transition: {current.value} -> {target.value}"
            )
        runtime.state.status = target

    def _transition_to_failed(self, runtime: TaskRuntime) -> None:
        """Move runtime to ``FAILED``, enforcing the transition table."""
        if runtime.state.status == TaskStatus.FAILED:
            return
        self._transition(runtime, TaskStatus.FAILED)

    def _record_io_failure(
        self, runtime: TaskRuntime, exc: Exception
    ) -> IntegrationFailureClass:
        """Record an I/O boundary failure and return its classification."""
        self._transition_to_failed(runtime)
        runtime.state.error = f"{type(exc).__name__}: {exc}"
        return classify_integration_error(exc)

    def _should_teardown(
        self, teardown_mode: TearDownMode, terminal_status: TaskStatus
    ) -> bool:
        """Return whether teardown should run for the given policy/status."""
        if teardown_mode == TearDownMode.ALWAYS:
            return True
        if teardown_mode == TearDownMode.NEVER:
            return False
        return terminal_status == TaskStatus.PR_MERGED

run(runtime, *, teardown_mode=TearDownMode.ALWAYS) async

Execute one task lifecycle run and return a result snapshot.

Parameters:

runtime (TaskRuntime, required): Mutable task runtime to execute. Must enter in PENDING state.

teardown_mode (TearDownMode, default ALWAYS): Resource teardown policy after run completion.

Returns:

TaskRunResult: Convenience snapshot of terminal runtime fields.

Raises:

ValueError: If runtime does not enter in PENDING status.
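
The effect of teardown_mode can be summarized with a small decision function. This is a sketch mirroring _should_teardown in the source: Mode stands in for TearDownMode, ALWAYS and NEVER appear in the source, and ON_SUCCESS is a hypothetical name for the remaining policy, whose real member name is not shown on this page. Statuses are plain strings here for brevity.

```python
from enum import Enum


class Mode(Enum):
    """Stand-in for TearDownMode; ON_SUCCESS is a hypothetical member name."""
    ALWAYS = "always"
    NEVER = "never"
    ON_SUCCESS = "on_success"


def should_teardown(mode: Mode, status: str) -> bool:
    """Mirror of the decision in StandardTaskRunner._should_teardown."""
    if mode is Mode.ALWAYS:
        return True
    if mode is Mode.NEVER:
        return False
    # Any other mode tears down only after a successful merge.
    return status == "PR_MERGED"
```

With the default ALWAYS, resources are released whatever the outcome. A policy that skips teardown on failure leaves the task's resources in place, which may help when inspecting a failed run.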

Source code in src/agentrelay/task_runner/core/runner.py
async def run(
    self,
    runtime: TaskRuntime,
    *,
    teardown_mode: TearDownMode = TearDownMode.ALWAYS,
) -> TaskRunResult:
    """Execute one task lifecycle run and return a result snapshot.

    Args:
        runtime: Mutable task runtime to execute. Must enter in ``PENDING``
            state.
        teardown_mode: Resource teardown policy after run completion.

    Returns:
        TaskRunResult: Convenience snapshot of terminal runtime fields.

    Raises:
        ValueError: If ``runtime`` does not enter in ``PENDING`` status.
    """
    if runtime.state.status != TaskStatus.PENDING:
        raise ValueError(
            "StandardTaskRunner.run() requires runtime.state.status == PENDING. "
            f"Received {runtime.state.status!r}."
        )

    self._transition(runtime, TaskStatus.RUNNING)
    try:
        try:
            self._preparer(runtime).prepare(runtime)
        except Exception as exc:
            fc = self._record_io_failure(runtime, exc)
            return TaskRunResult.from_runtime(runtime, failure_class=fc)
        self._emit_step(
            "task_prepared",
            runtime,
            f"branch={runtime.state.branch_name}",
        )

        try:
            agent = self._launcher(runtime).launch(runtime)
            runtime.artifacts.agent_address = agent.address
        except Exception as exc:
            fc = self._record_io_failure(runtime, exc)
            return TaskRunResult.from_runtime(runtime, failure_class=fc)
        addr = runtime.artifacts.agent_address
        self._emit_step(
            "task_launched",
            runtime,
            addr.label if addr else None,
        )

        try:
            self._kickoff(runtime).kickoff(runtime, agent)
        except Exception as exc:
            fc = self._record_io_failure(runtime, exc)
            return TaskRunResult.from_runtime(runtime, failure_class=fc)
        self._emit_step("task_waiting", runtime)

        try:
            signal = await self._completion_checker(runtime).wait_for_completion(
                runtime
            )
        except Exception as exc:
            fc = self._record_io_failure(runtime, exc)
            return TaskRunResult.from_runtime(runtime, failure_class=fc)

        runtime.artifacts.concerns.extend(signal.concerns)

        if signal.outcome == "failed":
            self._transition(runtime, TaskStatus.FAILED)
            runtime.state.error = (
                signal.error or "Task failed without an error message."
            )
            return TaskRunResult.from_runtime(runtime)

        if not signal.pr_url:
            self._transition(runtime, TaskStatus.FAILED)
            runtime.state.error = (
                "Task completion signaled 'done' but did not include pr_url."
            )
            return TaskRunResult.from_runtime(runtime)

        runtime.artifacts.pr_url = signal.pr_url
        self._transition(runtime, TaskStatus.PR_CREATED)

        self._emit_step("task_pr_merging", runtime, signal.pr_url)
        try:
            self._merger(runtime).merge_pr(runtime, signal.pr_url)
        except Exception as exc:
            fc = self._record_io_failure(runtime, exc)
            return TaskRunResult.from_runtime(runtime, failure_class=fc)

        self._transition(runtime, TaskStatus.PR_MERGED)
    finally:
        if self._should_teardown(teardown_mode, runtime.state.status):
            try:
                self._teardown(runtime).teardown(runtime)
            except Exception as exc:
                runtime.artifacts.concerns.append(f"teardown_failed: {exc}")

    return TaskRunResult.from_runtime(runtime)

TaskRunner

Bases: Protocol

Protocol for the task runner boundary used by Orchestrator.

Different lifecycle variants (standard, reviewing, dry-run) are different classes satisfying this protocol. The orchestrator does not know or care about internal step structure.
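
To make the idea of lifecycle variants concrete, here is a minimal sketch of a dry-run style runner. Result and Runner are local stand-ins for TaskRunResult and TaskRunner, DryRunRunner and orchestrate are hypothetical, and reporting "PR_MERGED" from a dry run is an arbitrary choice made for the example.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List, Optional, Protocol, runtime_checkable


@dataclass(frozen=True)
class Result:
    """Stand-in for TaskRunResult with a reduced field set."""
    task_id: str
    status: str
    pr_url: Optional[str] = None
    error: Optional[str] = None


@runtime_checkable
class Runner(Protocol):
    """Stand-in for the TaskRunner protocol."""

    async def run(self, runtime: Any, *, teardown_mode: Any = None) -> Result: ...


class DryRunRunner:
    """Hypothetical variant that performs no I/O and reports success."""

    async def run(self, runtime: Any, *, teardown_mode: Any = None) -> Result:
        # A real variant would drive its own steps here; this one only reports.
        return Result(task_id=str(runtime), status="PR_MERGED")


async def orchestrate(runner: Runner, task_ids: List[str]) -> List[Result]:
    """Toy caller: it depends only on run(), not on the runner's steps."""
    return [await runner.run(task_id) for task_id in task_ids]
```

The caller never inspects how the runner is built, so swapping DryRunRunner for another class with the same run signature requires no change on the calling side.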

Source code in src/agentrelay/task_runner/core/runner.py
@runtime_checkable
class TaskRunner(Protocol):
    """Protocol for the task runner boundary used by Orchestrator.

    Different lifecycle variants (standard, reviewing, dry-run) are
    different classes satisfying this protocol. The orchestrator does
    not know or care about internal step structure.
    """

    async def run(
        self,
        runtime: TaskRuntime,
        *,
        teardown_mode: TearDownMode = TearDownMode.ALWAYS,
    ) -> TaskRunResult:
        """Execute one task attempt and return terminal task fields."""
        ...

run(runtime, *, teardown_mode=TearDownMode.ALWAYS) async

Execute one task attempt and return terminal task fields.

Source code in src/agentrelay/task_runner/core/runner.py
async def run(
    self,
    runtime: TaskRuntime,
    *,
    teardown_mode: TearDownMode = TearDownMode.ALWAYS,
) -> TaskRunResult:
    """Execute one task attempt and return terminal task fields."""
    ...

TaskRunResult dataclass

Convenience return value mirroring terminal fields on TaskRuntime.

TaskRuntime remains the single source of truth; this result is a snapshot for ergonomic call-site consumption.

Attributes:

task_id (str): Task identifier.

status (TaskStatus): Terminal task status after TaskRunner.run(...).

pr_url (Optional[str]): Task PR URL, if one was recorded.

error (Optional[str]): Task failure message, if one was recorded.

failure_class (Optional[IntegrationFailureClass]): Integration error classification for I/O boundary failures. None for agent-signaled failures and successes.

Source code in src/agentrelay/task_runner/core/runner.py
@dataclass(frozen=True)
class TaskRunResult:
    """Convenience return value mirroring terminal fields on :class:`TaskRuntime`.

    ``TaskRuntime`` remains the single source of truth; this result is a snapshot
    for ergonomic call-site consumption.

    Attributes:
        task_id: Task identifier.
        status: Terminal task status after ``TaskRunner.run(...)``.
        pr_url: Task PR URL, if one was recorded.
        error: Task failure message, if one was recorded.
        failure_class: Integration error classification for I/O boundary
            failures.  ``None`` for agent-signaled failures and successes.
    """

    task_id: str
    status: TaskStatus
    pr_url: Optional[str]
    error: Optional[str]
    failure_class: Optional[IntegrationFailureClass] = None

    @classmethod
    def from_runtime(
        cls,
        runtime: TaskRuntime,
        failure_class: Optional[IntegrationFailureClass] = None,
    ) -> TaskRunResult:
        """Build a result snapshot from the current runtime state.

        Args:
            runtime: Runtime envelope to snapshot.
            failure_class: Optional integration error classification.

        Returns:
            TaskRunResult: Snapshot of task ID, status, PR URL, and error.
        """
        return cls(
            task_id=runtime.task.id,
            status=runtime.state.status,
            pr_url=runtime.artifacts.pr_url,
            error=runtime.state.error,
            failure_class=failure_class,
        )

from_runtime(runtime, failure_class=None) classmethod

Build a result snapshot from the current runtime state.

Parameters:

runtime (TaskRuntime, required): Runtime envelope to snapshot.
failure_class (Optional[IntegrationFailureClass], default None): Optional integration error classification.

Returns:

TaskRunResult: Snapshot of task ID, status, PR URL, and error.

Source code in src/agentrelay/task_runner/core/runner.py
@classmethod
def from_runtime(
    cls,
    runtime: TaskRuntime,
    failure_class: Optional[IntegrationFailureClass] = None,
) -> TaskRunResult:
    """Build a result snapshot from the current runtime state.

    Args:
        runtime: Runtime envelope to snapshot.
        failure_class: Optional integration error classification.

    Returns:
        TaskRunResult: Snapshot of task ID, status, PR URL, and error.
    """
    return cls(
        task_id=runtime.task.id,
        status=runtime.state.status,
        pr_url=runtime.artifacts.pr_url,
        error=runtime.state.error,
        failure_class=failure_class,
    )
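
To illustrate the snapshot semantics, here is a small sketch, assuming simplified stand-ins for the runtime envelope (`FakeRuntime`, `FakeTask`, `FakeState`, and `FakeArtifacts` are hypothetical, and status and failure class are plain strings rather than the real enum types). It shows that a result built by `from_runtime` does not follow later changes to the runtime.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class FakeTask:
    id: str


@dataclass
class FakeState:
    status: str = "running"
    error: Optional[str] = None


@dataclass
class FakeArtifacts:
    pr_url: Optional[str] = None


@dataclass
class FakeRuntime:
    # Hypothetical stand-in exposing only the fields from_runtime reads.
    task: FakeTask
    state: FakeState = field(default_factory=FakeState)
    artifacts: FakeArtifacts = field(default_factory=FakeArtifacts)


@dataclass(frozen=True)
class TaskRunResult:
    task_id: str
    status: str
    pr_url: Optional[str]
    error: Optional[str]
    failure_class: Optional[str] = None

    @classmethod
    def from_runtime(cls, runtime, failure_class=None):
        return cls(
            task_id=runtime.task.id,
            status=runtime.state.status,
            pr_url=runtime.artifacts.pr_url,
            error=runtime.state.error,
            failure_class=failure_class,
        )


runtime = FakeRuntime(task=FakeTask(id="t1"))
runtime.state.status = "failed"
runtime.state.error = "agent reported failure"
snapshot = TaskRunResult.from_runtime(runtime)

# Mutating the runtime afterwards leaves the frozen snapshot unchanged.
runtime.state.error = "overwritten"
```

This is why the runtime, not the result, should be consulted whenever the current state matters.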

TearDownMode

Bases: str, Enum

Policy controlling whether runtime resources are torn down after run.

Attributes:

ALWAYS: Always call teardown at end of run.
NEVER: Never call teardown.
ON_SUCCESS: Call teardown only when task reaches PR_MERGED.

Source code in src/agentrelay/task_runner/core/runner.py
class TearDownMode(str, Enum):
    """Policy controlling whether runtime resources are torn down after run.

    Attributes:
        ALWAYS: Always call teardown at end of run.
        NEVER: Never call teardown.
        ON_SUCCESS: Call teardown only when task reaches ``PR_MERGED``.
    """

    ALWAYS = "always"
    NEVER = "never"
    ON_SUCCESS = "on_success"
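
The policy described above can be sketched as a single decision function. This is only an illustration: `should_teardown` is a hypothetical helper, and the task status is modeled as a plain string instead of the real `TaskStatus` enum.

```python
from enum import Enum


class TearDownMode(str, Enum):
    ALWAYS = "always"
    NEVER = "never"
    ON_SUCCESS = "on_success"


def should_teardown(mode: TearDownMode, status: str) -> bool:
    """Return True when the policy calls for teardown (hypothetical helper)."""
    if mode is TearDownMode.ALWAYS:
        return True
    if mode is TearDownMode.NEVER:
        return False
    # ON_SUCCESS: tear down only once the task has reached PR_MERGED.
    return status == "PR_MERGED"


# Mixing in str lets a raw config value be parsed by value lookup.
parsed = TearDownMode("on_success")
```

Subclassing `str` also means members compare equal to their string values, which is convenient when the mode comes from a CLI flag or a config file.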

GhTaskMerger dataclass

Merge a task PR via GitHub CLI and update the local integration branch.

After merging, fetches the updated integration branch and writes a .merged signal file to the task's signal directory.

Reads integration_branch from runtime.state (set by the orchestrator before dispatch).

Source code in src/agentrelay/task_runner/implementations/task_merger.py
@dataclass
class GhTaskMerger:
    """Merge a task PR via GitHub CLI and update the local integration branch.

    After merging, fetches the updated integration branch and writes a
    ``.merged`` signal file to the task's signal directory.

    Reads ``integration_branch`` from ``runtime.state`` (set by the
    orchestrator before dispatch).
    """

    repo_path: Path

    def merge_pr(self, runtime: TaskRuntime, pr_url: str) -> None:
        """Merge the completed task PR.

        Args:
            runtime: Runtime envelope being merged.
            pr_url: Pull request URL to merge.

        Raises:
            ValueError: If ``runtime.state.integration_branch`` is None.
        """
        integration_branch = runtime.state.integration_branch
        if integration_branch is None:
            raise ValueError(
                "runtime.state.integration_branch must be set before merge_pr()"
            )

        gh.pr_merge(pr_url)

        git.fetch_branch(self.repo_path, integration_branch)
        git.update_local_ref(
            self.repo_path,
            integration_branch,
            f"origin/{integration_branch}",
        )

        if runtime.state.signal_dir is not None:
            timestamp = datetime.now(timezone.utc).isoformat() + "\n"
            signals.write_text(runtime.state.signal_dir, ".merged", timestamp)

merge_pr(runtime, pr_url)

Merge the completed task PR.

Parameters:

runtime (TaskRuntime, required): Runtime envelope being merged.
pr_url (str, required): Pull request URL to merge.

Raises:

ValueError: If runtime.state.integration_branch is None.

Source code in src/agentrelay/task_runner/implementations/task_merger.py
def merge_pr(self, runtime: TaskRuntime, pr_url: str) -> None:
    """Merge the completed task PR.

    Args:
        runtime: Runtime envelope being merged.
        pr_url: Pull request URL to merge.

    Raises:
        ValueError: If ``runtime.state.integration_branch`` is None.
    """
    integration_branch = runtime.state.integration_branch
    if integration_branch is None:
        raise ValueError(
            "runtime.state.integration_branch must be set before merge_pr()"
        )

    gh.pr_merge(pr_url)

    git.fetch_branch(self.repo_path, integration_branch)
    git.update_local_ref(
        self.repo_path,
        integration_branch,
        f"origin/{integration_branch}",
    )

    if runtime.state.signal_dir is not None:
        timestamp = datetime.now(timezone.utc).isoformat() + "\n"
        signals.write_text(runtime.state.signal_dir, ".merged", timestamp)
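
The `.merged` signal written at the end of `merge_pr` is a single line holding a UTC ISO 8601 timestamp. A rough sketch of that file format follows; `write_merged_signal` is a hypothetical stand-in for the `signals.write_text` call, whose actual implementation is not shown here.

```python
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def write_merged_signal(signal_dir: Path) -> Path:
    """Write a `.merged` file containing one timestamp line (hypothetical helper)."""
    signal_dir.mkdir(parents=True, exist_ok=True)
    path = signal_dir / ".merged"
    timestamp = datetime.now(timezone.utc).isoformat() + "\n"
    path.write_text(timestamp, encoding="utf-8")
    return path


with tempfile.TemporaryDirectory() as tmp:
    merged = write_merged_signal(Path(tmp) / "signals" / "t1")
    raw = merged.read_text(encoding="utf-8")
    # The timezone-aware value round-trips through fromisoformat().
    merged_at = datetime.fromisoformat(raw.strip())
```

Using a timezone-aware `datetime` keeps the `+00:00` offset in the file, so readers do not have to guess which zone the timestamp refers to.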

SignalCompletionChecker dataclass

Poll signal files and parse the result into a completion signal.

Watches the task's signal directory for .done or .failed files, then parses the signal file content and any concerns.log entries into a TaskCompletionSignal.

Source code in src/agentrelay/task_runner/implementations/task_completion_checker.py
@dataclass
class SignalCompletionChecker:
    """Poll signal files and parse the result into a completion signal.

    Watches the task's signal directory for ``.done`` or ``.failed`` files,
    then parses the signal file content and any ``concerns.log`` entries
    into a :class:`TaskCompletionSignal`.
    """

    poll_interval: float = 2.0

    async def wait_for_completion(self, runtime: TaskRuntime) -> TaskCompletionSignal:
        """Wait for terminal task signal from the execution boundary.

        Args:
            runtime: Runtime envelope being observed.

        Returns:
            TaskCompletionSignal: Terminal signal payload with outcome data.

        Raises:
            ValueError: If ``signal_dir`` is not set on the runtime state.
        """
        if runtime.state.signal_dir is None:
            raise ValueError("runtime.state.signal_dir must be set before polling")

        signal_dir = runtime.state.signal_dir
        found = await signals.poll_signal_files(
            signal_dir, (".done", ".failed"), self.poll_interval
        )

        content = signals.read_signal_file(signal_dir, found) or ""
        lines = content.splitlines()

        # Line 1 = ISO timestamp (informational), Line 2 = payload
        payload = lines[1].strip() if len(lines) > 1 else None

        concerns_text = signals.read_signal_file(signal_dir, "concerns.log")
        concerns = _parse_concerns(concerns_text)

        if found == ".done":
            return TaskCompletionSignal(
                outcome="done",
                pr_url=payload,
                concerns=concerns,
            )

        return TaskCompletionSignal(
            outcome="failed",
            error=payload,
            concerns=concerns,
        )

wait_for_completion(runtime) async

Wait for terminal task signal from the execution boundary.

Parameters:

runtime (TaskRuntime, required): Runtime envelope being observed.

Returns:

TaskCompletionSignal: Terminal signal payload with outcome data.

Raises:

ValueError: If signal_dir is not set on the runtime state.

Source code in src/agentrelay/task_runner/implementations/task_completion_checker.py
async def wait_for_completion(self, runtime: TaskRuntime) -> TaskCompletionSignal:
    """Wait for terminal task signal from the execution boundary.

    Args:
        runtime: Runtime envelope being observed.

    Returns:
        TaskCompletionSignal: Terminal signal payload with outcome data.

    Raises:
        ValueError: If ``signal_dir`` is not set on the runtime state.
    """
    if runtime.state.signal_dir is None:
        raise ValueError("runtime.state.signal_dir must be set before polling")

    signal_dir = runtime.state.signal_dir
    found = await signals.poll_signal_files(
        signal_dir, (".done", ".failed"), self.poll_interval
    )

    content = signals.read_signal_file(signal_dir, found) or ""
    lines = content.splitlines()

    # Line 1 = ISO timestamp (informational), Line 2 = payload
    payload = lines[1].strip() if len(lines) > 1 else None

    concerns_text = signals.read_signal_file(signal_dir, "concerns.log")
    concerns = _parse_concerns(concerns_text)

    if found == ".done":
        return TaskCompletionSignal(
            outcome="done",
            pr_url=payload,
            concerns=concerns,
        )

    return TaskCompletionSignal(
        outcome="failed",
        error=payload,
        concerns=concerns,
    )
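
The two-line signal file layout that `wait_for_completion` relies on (line 1 is a timestamp, line 2 is the payload) can be sketched in isolation. `parse_signal_content` is a hypothetical helper written for illustration; unlike the method above, it also returns the timestamp.

```python
from typing import Optional, Tuple


def parse_signal_content(content: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
    """Split signal file content into (timestamp, payload); hypothetical helper."""
    lines = (content or "").splitlines()
    timestamp = lines[0].strip() if lines else None
    # A missing second line means no payload was recorded.
    payload = lines[1].strip() if len(lines) > 1 else None
    return timestamp, payload


done = parse_signal_content("2024-01-01T00:00:00+00:00\nhttps://example.invalid/pr/1\n")
timestamp_only = parse_signal_content("2024-01-01T00:00:00+00:00\n")
missing = parse_signal_content(None)
blank_payload = parse_signal_content("2024-01-01T00:00:00+00:00\n   \n")
```

Note the last case: a whitespace-only second line yields an empty string rather than `None`, so a caller that treats the payload as a PR URL may want to normalize empty strings itself.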

TmuxTaskKickoff dataclass

Send kickoff instructions to a launched agent.

Delegates to Agent.send_kickoff with the path to the instructions.md file in the task's signal directory.

Source code in src/agentrelay/task_runner/implementations/task_kickoff.py
@dataclass
class TmuxTaskKickoff:
    """Send kickoff instructions to a launched agent.

    Delegates to :meth:`Agent.send_kickoff` with the path to the
    ``instructions.md`` file in the task's signal directory.
    """

    def kickoff(self, runtime: TaskRuntime, agent: Agent) -> None:
        """Send kickoff instructions to the launched task agent.

        Args:
            runtime: Runtime envelope for the task being kicked off.
            agent: Live agent handle to send instructions to.

        Raises:
            ValueError: If ``signal_dir`` is not set on the runtime state.
        """
        if runtime.state.signal_dir is None:
            raise ValueError("runtime.state.signal_dir must be set before kickoff")

        instructions_path = str(runtime.state.signal_dir / "instructions.md")
        agent.send_kickoff(instructions_path)

kickoff(runtime, agent)

Send kickoff instructions to the launched task agent.

Parameters:

runtime (TaskRuntime, required): Runtime envelope for the task being kicked off.
agent (Agent, required): Live agent handle to send instructions to.

Raises:

ValueError: If signal_dir is not set on the runtime state.

Source code in src/agentrelay/task_runner/implementations/task_kickoff.py
def kickoff(self, runtime: TaskRuntime, agent: Agent) -> None:
    """Send kickoff instructions to the launched task agent.

    Args:
        runtime: Runtime envelope for the task being kicked off.
        agent: Live agent handle to send instructions to.

    Raises:
        ValueError: If ``signal_dir`` is not set on the runtime state.
    """
    if runtime.state.signal_dir is None:
        raise ValueError("runtime.state.signal_dir must be set before kickoff")

    instructions_path = str(runtime.state.signal_dir / "instructions.md")
    agent.send_kickoff(instructions_path)
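
Since kickoff only needs an object with a `send_kickoff` method, the step is easy to exercise without tmux. A minimal sketch, assuming a hypothetical `RecordingAgent` test double and a free function `kickoff` that takes the signal directory directly instead of a full `TaskRuntime`:

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional


@dataclass
class RecordingAgent:
    """Hypothetical test double that records the paths it is sent."""

    sent: List[str] = field(default_factory=list)

    def send_kickoff(self, instructions_path: str) -> None:
        self.sent.append(instructions_path)


def kickoff(signal_dir: Optional[Path], agent: RecordingAgent) -> None:
    # Same guard as the method above: the preparer must have set signal_dir.
    if signal_dir is None:
        raise ValueError("signal_dir must be set before kickoff")
    agent.send_kickoff(str(signal_dir / "instructions.md"))


agent = RecordingAgent()
kickoff(Path("signals") / "t1", agent)

try:
    kickoff(None, agent)
    guard_raised = False
except ValueError:
    guard_raised = True
```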

TmuxTaskLauncher dataclass

Launch a Claude Code agent in a tmux pane.

Delegates to TmuxAgent.from_config to create a tmux window, launch the Claude Code process, and return a live agent handle.

Source code in src/agentrelay/task_runner/implementations/task_launcher.py
@dataclass
class TmuxTaskLauncher:
    """Launch a Claude Code agent in a tmux pane.

    Delegates to :meth:`TmuxAgent.from_config` to create a tmux window,
    launch the Claude Code process, and return a live agent handle.
    """

    def launch(self, runtime: TaskRuntime) -> Agent:
        """Launch and return the primary agent for this task runtime.

        Args:
            runtime: Runtime envelope to launch against.  Must have
                ``state.worktree_path`` and ``state.signal_dir`` set
                (typically by a prior :class:`TaskPreparer` step).

        Returns:
            Agent: Live agent handle bound to this task.

        Raises:
            ValueError: If ``worktree_path`` or ``signal_dir`` are not set.
        """
        if runtime.state.worktree_path is None:
            raise ValueError("runtime.state.worktree_path must be set before launch")
        if runtime.state.signal_dir is None:
            raise ValueError("runtime.state.signal_dir must be set before launch")

        return TmuxAgent.from_config(
            config=runtime.task.primary_agent,
            task_id=runtime.task.id,
            worktree_path=runtime.state.worktree_path,
            signal_dir=runtime.state.signal_dir,
        )

launch(runtime)

Launch and return the primary agent for this task runtime.

Parameters:

runtime (TaskRuntime, required): Runtime envelope to launch against. Must have state.worktree_path and state.signal_dir set (typically by a prior TaskPreparer step).

Returns:

Agent: Live agent handle bound to this task.

Raises:

ValueError: If worktree_path or signal_dir are not set.

Source code in src/agentrelay/task_runner/implementations/task_launcher.py
def launch(self, runtime: TaskRuntime) -> Agent:
    """Launch and return the primary agent for this task runtime.

    Args:
        runtime: Runtime envelope to launch against.  Must have
            ``state.worktree_path`` and ``state.signal_dir`` set
            (typically by a prior :class:`TaskPreparer` step).

    Returns:
        Agent: Live agent handle bound to this task.

    Raises:
        ValueError: If ``worktree_path`` or ``signal_dir`` are not set.
    """
    if runtime.state.worktree_path is None:
        raise ValueError("runtime.state.worktree_path must be set before launch")
    if runtime.state.signal_dir is None:
        raise ValueError("runtime.state.signal_dir must be set before launch")

    return TmuxAgent.from_config(
        config=runtime.task.primary_agent,
        task_id=runtime.task.id,
        worktree_path=runtime.state.worktree_path,
        signal_dir=runtime.state.signal_dir,
    )

WorktreeTaskPreparer dataclass

Create a task branch in a shared workstream worktree and write protocol files.

Creates a task-specific branch off the integration branch, checks it out in the workstream worktree, writes manifest.json, policies.json, and instructions.md into the signal directory, and updates the runtime state with computed paths.

The worktree itself is owned by the workstream preparer — this class only creates and checks out branches within it.

Source code in src/agentrelay/task_runner/implementations/task_preparer.py
@dataclass
class WorktreeTaskPreparer:
    """Create a task branch in a shared workstream worktree and write protocol files.

    Creates a task-specific branch off the integration branch, checks it out
    in the workstream worktree, writes ``manifest.json``, ``policies.json``,
    and ``instructions.md`` into the signal directory, and updates the runtime
    state with computed paths.

    The worktree itself is owned by the workstream preparer — this class only
    creates and checks out branches within it.
    """

    repo_path: Path
    graph_name: str
    dependency_descriptions: dict[str, Optional[str]] = field(default_factory=dict)
    context_content: Optional[str] = None

    def prepare(self, runtime: TaskRuntime) -> None:
        """Prepare runtime execution prerequisites.

        Creates a task branch in the workstream worktree, checks it out,
        and writes protocol files to the signal directory.

        Reads ``integration_branch`` and ``workstream_worktree_path`` from
        ``runtime.state`` (set by the orchestrator before dispatch).

        Args:
            runtime: Runtime envelope to prepare (e.g. branch, signal files).

        Raises:
            ValueError: If ``runtime.state.integration_branch`` or
                ``runtime.state.workstream_worktree_path`` is None.
        """
        integration_branch = runtime.state.integration_branch
        workstream_worktree_path = runtime.state.workstream_worktree_path
        if integration_branch is None:
            raise ValueError(
                "runtime.state.integration_branch must be set before prepare()"
            )
        if workstream_worktree_path is None:
            raise ValueError(
                "runtime.state.workstream_worktree_path must be set before prepare()"
            )

        task = runtime.task
        branch_name = f"agentrelay/{self.graph_name}/{task.id}"
        signal_dir = self.repo_path / f".workflow/{self.graph_name}/signals/{task.id}"

        try:
            git.branch_create(
                workstream_worktree_path,
                branch_name,
                integration_branch,
                force=True,
            )
            git.checkout(workstream_worktree_path, branch_name)
        except subprocess.CalledProcessError as exc:
            raise _WorkspaceIntegrationError(
                f"Failed to create branch for task {task.id!r}: {exc}",
            ) from exc

        signals.ensure_signal_dir(signal_dir)

        manifest = build_manifest(
            task=task,
            branch_name=branch_name,
            integration_branch=integration_branch,
            graph_name=self.graph_name,
            attempt_num=runtime.state.attempt_num,
            dependency_descriptions=self.dependency_descriptions,
        )
        signals.write_json(signal_dir, "manifest.json", manifest_to_dict(manifest))

        policies = build_policies(task, integration_branch)
        signals.write_json(signal_dir, "policies.json", policies_to_dict(policies))

        instructions = resolve_instructions(task.role, manifest)
        signals.write_text(signal_dir, "instructions.md", instructions)

        if self.context_content is not None:
            signals.write_text(signal_dir, "context.md", self.context_content)

        runtime.state.worktree_path = workstream_worktree_path
        runtime.state.branch_name = branch_name
        runtime.state.signal_dir = signal_dir

prepare(runtime)

Prepare runtime execution prerequisites.

Creates a task branch in the workstream worktree, checks it out, and writes protocol files to the signal directory.

Reads integration_branch and workstream_worktree_path from runtime.state (set by the orchestrator before dispatch).

Parameters:

runtime (TaskRuntime, required): Runtime envelope to prepare (e.g. branch, signal files).

Raises:

ValueError: If runtime.state.integration_branch or runtime.state.workstream_worktree_path is None.

Source code in src/agentrelay/task_runner/implementations/task_preparer.py
def prepare(self, runtime: TaskRuntime) -> None:
    """Prepare runtime execution prerequisites.

    Creates a task branch in the workstream worktree, checks it out,
    and writes protocol files to the signal directory.

    Reads ``integration_branch`` and ``workstream_worktree_path`` from
    ``runtime.state`` (set by the orchestrator before dispatch).

    Args:
        runtime: Runtime envelope to prepare (e.g. branch, signal files).

    Raises:
        ValueError: If ``runtime.state.integration_branch`` or
            ``runtime.state.workstream_worktree_path`` is None.
    """
    integration_branch = runtime.state.integration_branch
    workstream_worktree_path = runtime.state.workstream_worktree_path
    if integration_branch is None:
        raise ValueError(
            "runtime.state.integration_branch must be set before prepare()"
        )
    if workstream_worktree_path is None:
        raise ValueError(
            "runtime.state.workstream_worktree_path must be set before prepare()"
        )

    task = runtime.task
    branch_name = f"agentrelay/{self.graph_name}/{task.id}"
    signal_dir = self.repo_path / f".workflow/{self.graph_name}/signals/{task.id}"

    try:
        git.branch_create(
            workstream_worktree_path,
            branch_name,
            integration_branch,
            force=True,
        )
        git.checkout(workstream_worktree_path, branch_name)
    except subprocess.CalledProcessError as exc:
        raise _WorkspaceIntegrationError(
            f"Failed to create branch for task {task.id!r}: {exc}",
        ) from exc

    signals.ensure_signal_dir(signal_dir)

    manifest = build_manifest(
        task=task,
        branch_name=branch_name,
        integration_branch=integration_branch,
        graph_name=self.graph_name,
        attempt_num=runtime.state.attempt_num,
        dependency_descriptions=self.dependency_descriptions,
    )
    signals.write_json(signal_dir, "manifest.json", manifest_to_dict(manifest))

    policies = build_policies(task, integration_branch)
    signals.write_json(signal_dir, "policies.json", policies_to_dict(policies))

    instructions = resolve_instructions(task.role, manifest)
    signals.write_text(signal_dir, "instructions.md", instructions)

    if self.context_content is not None:
        signals.write_text(signal_dir, "context.md", self.context_content)

    runtime.state.worktree_path = workstream_worktree_path
    runtime.state.branch_name = branch_name
    runtime.state.signal_dir = signal_dir
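
The naming scheme used by `prepare` can be isolated into a small pure function, which makes the branch and signal directory layout easy to see. `task_paths` is a hypothetical helper, and the repository path, graph name, and task ID below are made-up values.

```python
from pathlib import Path
from typing import Tuple


def task_paths(repo_path: Path, graph_name: str, task_id: str) -> Tuple[str, Path]:
    """Compute the task branch name and signal directory (hypothetical helper)."""
    branch_name = f"agentrelay/{graph_name}/{task_id}"
    signal_dir = repo_path / f".workflow/{graph_name}/signals/{task_id}"
    return branch_name, signal_dir


branch, signal_dir = task_paths(Path("/repo"), "demo", "t1")
```

Note that the signal directory is rooted at `repo_path`, not at the workstream worktree, so the protocol files live outside the checkout the agent works in.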

WorktreeTaskTeardown dataclass

Delegate agent-address cleanup and delete the task branch.

Performs best-effort cleanup: delegates log capture and environment teardown to the agent address, then deletes the task branch. The workstream worktree is owned by the workstream teardown handler and is not touched here. Errors during teardown are caught and not propagated.

Source code in src/agentrelay/task_runner/implementations/task_teardown.py
@dataclass
class WorktreeTaskTeardown:
    """Delegate agent-address cleanup and delete the task branch.

    Performs best-effort cleanup: delegates log capture and environment
    teardown to the agent address, then deletes the task branch.
    The workstream worktree is owned by the workstream teardown handler
    and is not touched here. Errors during teardown are caught and not
    propagated.
    """

    repo_path: Path
    keep_panes: bool = False

    def teardown(self, runtime: TaskRuntime) -> None:
        """Release runtime resources after terminal completion.

        Args:
            runtime: Runtime envelope whose resources should be cleaned up.
        """
        agent_address = runtime.artifacts.agent_address

        if agent_address is not None:
            agent_address.teardown(
                signal_dir=runtime.state.signal_dir,
                keep_panes=self.keep_panes,
            )

        if runtime.state.branch_name is not None:
            try:
                git.branch_delete(self.repo_path, runtime.state.branch_name)
            except subprocess.CalledProcessError:
                pass  # Best-effort: branch may have been deleted by GitHub

teardown(runtime)

Release runtime resources after terminal completion.

Parameters:

runtime (TaskRuntime, required): Runtime envelope whose resources should be cleaned up.

Source code in src/agentrelay/task_runner/implementations/task_teardown.py
def teardown(self, runtime: TaskRuntime) -> None:
    """Release runtime resources after terminal completion.

    Args:
        runtime: Runtime envelope whose resources should be cleaned up.
    """
    agent_address = runtime.artifacts.agent_address

    if agent_address is not None:
        agent_address.teardown(
            signal_dir=runtime.state.signal_dir,
            keep_panes=self.keep_panes,
        )

    if runtime.state.branch_name is not None:
        try:
            git.branch_delete(self.repo_path, runtime.state.branch_name)
        except subprocess.CalledProcessError:
            pass  # Best-effort: branch may have been deleted by GitHub
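
The best-effort pattern in the branch deletion step can be sketched as a small wrapper that swallows only `subprocess.CalledProcessError`. `best_effort` and `delete_missing_branch` are hypothetical names used for illustration; the failing step simulates a git command for a branch that no longer exists.

```python
import subprocess
from typing import Callable


def best_effort(step: Callable[[], None]) -> bool:
    """Run a cleanup step, reporting failure instead of raising (hypothetical)."""
    try:
        step()
    except subprocess.CalledProcessError:
        # Only command failures are swallowed; other exceptions still propagate.
        return False
    return True


def delete_missing_branch() -> None:
    # Simulates `git branch -D` failing because the branch is already gone.
    raise subprocess.CalledProcessError(returncode=1, cmd=["git", "branch", "-D", "gone"])


deleted = best_effort(delete_missing_branch)
noop_ok = best_effort(lambda: None)
```

Catching the narrow exception type keeps programming errors (for example a `TypeError` from a bad argument) visible, while tolerating the expected case of a branch already removed on the remote side.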