
task_runner

Task runner package — one-task lifecycle state machine, dispatch, and I/O protocols.

Re-exports all public names so that from agentrelay.task_runner import X continues to work.

Subpackages

core: Protocols, state machine, dispatch, I/O boundary composition.
implementations: Concrete protocol implementations.

StepDispatch dataclass

Bases: Generic[T]

Per-step dispatch table for one lifecycle step.

Selects the right protocol implementation based on the task's agentrelay.task.AgentFramework and the type of its agentrelay.environments.AgentEnvironment. Each entry maps a (framework, env_type) key to a factory callable that receives the TaskRuntime and returns a protocol implementation. The default fallback handles steps that don't vary by framework/environment.

Instances are callable: self._preparer(runtime) resolves the implementation directly via __call__.

Dispatch resolution order
  1. Exact match in entries for (framework, type(environment))
  2. default fallback
  3. KeyError if neither matches

Extension guide

To add support for a new AgentFramework or AgentEnvironment, add an entry to the entries dict for each step that has a distinct implementation for that combo. Steps that don't vary (e.g. preparer, merger) can continue using default.

Attributes:

Name Type Description
entries dict[DispatchKey, Callable[[TaskRuntime], T]]

Mapping of (AgentFramework, type) dispatch keys to factory callables returning protocol implementations.

default Callable[[TaskRuntime], T] | None

Fallback factory used when no exact key match exists.
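
The resolution order above can be tried out with a small standalone sketch. The StepDispatch class below is a trimmed copy of the logic shown in this section; FakeRuntime, FakeTask, FakeAgent, TmuxEnv, DockerEnv, and the string framework name "claude" are hypothetical stand-ins for the real agentrelay types, which are not reproduced here.

```python
from dataclasses import dataclass, field
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


# Hypothetical environment types standing in for AgentEnvironment subclasses.
class TmuxEnv: ...
class DockerEnv: ...


@dataclass
class FakeAgent:
    framework: str  # assumption: the real key uses an AgentFramework value
    environment: object


@dataclass
class FakeTask:
    primary_agent: FakeAgent


@dataclass
class FakeRuntime:
    task: FakeTask


@dataclass(frozen=True)
class StepDispatch(Generic[T]):
    """Trimmed copy of the dispatch logic documented in this section."""

    entries: dict = field(default_factory=dict)
    default: Optional[Callable[[FakeRuntime], T]] = None

    def __call__(self, runtime: FakeRuntime) -> T:
        agent = runtime.task.primary_agent
        key = (agent.framework, type(agent.environment))
        factory = self.entries.get(key)
        if factory is not None:  # 1. exact match
            return factory(runtime)
        if self.default is not None:  # 2. default fallback
            return self.default(runtime)
        raise KeyError(  # 3. nothing registered
            f"No implementation registered for {key} and no default provided"
        )


def runtime_for(framework: str, env: object) -> FakeRuntime:
    return FakeRuntime(FakeTask(FakeAgent(framework, env)))


launcher = StepDispatch(
    entries={("claude", TmuxEnv): lambda rt: "tmux-launcher"},
    default=lambda rt: "generic-launcher",
)
strict = StepDispatch(entries={("claude", TmuxEnv): lambda rt: "tmux-launcher"})

exact = launcher(runtime_for("claude", TmuxEnv()))
fallback = launcher(runtime_for("claude", DockerEnv()))
```

Calling strict with a DockerEnv runtime raises KeyError, because that table has neither a matching entry nor a default. Note that the key uses type(environment), so a subclass of a registered environment type does not match its parent's entry.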

Source code in src/agentrelay/task_runner/core/dispatch.py
@dataclass(frozen=True)
class StepDispatch(Generic[T]):
    """Per-step dispatch table for one lifecycle step.

    Selects the right protocol implementation based on the task's
    :class:`~agentrelay.task.AgentFramework` and
    :class:`~agentrelay.environments.AgentEnvironment` type. Each entry
    maps a ``(framework, env_type)`` key to a factory callable that
    receives the :class:`TaskRuntime` and returns a protocol implementation.
    The ``default`` fallback handles steps that don't vary by
    framework/environment.

    Callable — use as ``self._preparer(runtime)`` directly via
    :meth:`__call__`.

    Dispatch resolution order:
      1. Exact match in ``entries`` for ``(framework, type(environment))``
      2. ``default`` fallback
      3. ``KeyError`` if neither matches

    Extension guide:
      To add support for a new ``AgentFramework`` or ``AgentEnvironment``,
      add an entry to the ``entries`` dict for each step that has a
      distinct implementation for that combo. Steps that don't vary
      (e.g. preparer, merger) can continue using ``default``.

    Attributes:
        entries: Mapping of ``(AgentFramework, type)`` dispatch keys to
            factory callables returning protocol implementations.
        default: Fallback factory used when no exact key match exists.
    """

    entries: dict[DispatchKey, Callable[[TaskRuntime], T]] = field(default_factory=dict)
    default: Callable[[TaskRuntime], T] | None = None

    def __call__(self, runtime: TaskRuntime) -> T:
        """Resolve and return the protocol implementation for this runtime.

        Args:
            runtime: Task runtime used to extract the dispatch key from
                ``runtime.task.primary_agent``.

        Returns:
            The resolved protocol implementation instance.

        Raises:
            KeyError: If no entry matches and no default is provided.
        """
        key = (
            runtime.task.primary_agent.framework,
            type(runtime.task.primary_agent.environment),
        )
        factory = self.entries.get(key)
        if factory is not None:
            return factory(runtime)
        if self.default is not None:
            return self.default(runtime)
        raise KeyError(
            f"No implementation registered for {key} and no default provided"
        )

__call__(runtime)

Resolve and return the protocol implementation for this runtime.

Parameters:

Name Type Description Default
runtime TaskRuntime

Task runtime used to extract the dispatch key from runtime.task.primary_agent.

required

Returns:

Type Description
T

The resolved protocol implementation instance.

Raises:

Type Description
KeyError

If no entry matches and no default is provided.

Source code in src/agentrelay/task_runner/core/dispatch.py
def __call__(self, runtime: TaskRuntime) -> T:
    """Resolve and return the protocol implementation for this runtime.

    Args:
        runtime: Task runtime used to extract the dispatch key from
            ``runtime.task.primary_agent``.

    Returns:
        The resolved protocol implementation instance.

    Raises:
        KeyError: If no entry matches and no default is provided.
    """
    key = (
        runtime.task.primary_agent.framework,
        type(runtime.task.primary_agent.environment),
    )
    factory = self.entries.get(key)
    if factory is not None:
        return factory(runtime)
    if self.default is not None:
        return self.default(runtime)
    raise KeyError(
        f"No implementation registered for {key} and no default provided"
    )

GateCheckResult dataclass

Result of a single completion gate check attempt.

Attributes:

Name Type Description
passed bool

Whether the gate command exited successfully (exit code 0).

output str

Combined stdout and stderr from the gate command.

Source code in src/agentrelay/task_runner/core/io.py
@dataclass(frozen=True)
class GateCheckResult:
    """Result of a single completion gate check attempt.

    Attributes:
        passed: Whether the gate command exited successfully (exit code 0).
        output: Combined stdout and stderr from the gate command.
    """

    passed: bool
    output: str

TaskCompletionChecker

Bases: Protocol

Wait for a terminal task completion signal.

Source code in src/agentrelay/task_runner/core/io.py
@runtime_checkable
class TaskCompletionChecker(Protocol):
    """Wait for a terminal task completion signal."""

    async def wait_for_completion(self, runtime: TaskRuntime) -> TaskCompletionSignal:
        """Wait for terminal task signal from the execution boundary.

        Args:
            runtime: Runtime envelope being observed.

        Returns:
            TaskCompletionSignal: Terminal signal payload with outcome data.
        """
        ...

wait_for_completion(runtime) async

Wait for terminal task signal from the execution boundary.

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope being observed.

required

Returns:

Name Type Description
TaskCompletionSignal TaskCompletionSignal

Terminal signal payload with outcome data.

Source code in src/agentrelay/task_runner/core/io.py
async def wait_for_completion(self, runtime: TaskRuntime) -> TaskCompletionSignal:
    """Wait for terminal task signal from the execution boundary.

    Args:
        runtime: Runtime envelope being observed.

    Returns:
        TaskCompletionSignal: Terminal signal payload with outcome data.
    """
    ...

TaskCompletionSignal dataclass

Signal payload returned by the completion checker.

Attributes:

Name Type Description
outcome Literal['done', 'failed']

Terminal completion signal from external execution. "done" indicates the task finished, normally with a PR (StandardTaskRunner treats a "done" signal without pr_url as a PR-less completion). "failed" indicates the task failed before successful completion.

pr_url Optional[str]

Pull request URL for a "done" outcome, if available.

error Optional[str]

Failure detail for a "failed" outcome, if available.

concerns tuple[str, ...]

Design concerns captured during execution.

ops_concerns tuple[str, ...]

Operational concerns (build errors, tooling friction) captured during execution.

Source code in src/agentrelay/task_runner/core/io.py
@dataclass(frozen=True)
class TaskCompletionSignal:
    """Signal payload returned by the completion checker.

    Attributes:
        outcome: Terminal completion signal from external execution.
            ``"done"`` indicates the task finished and produced a PR.
            ``"failed"`` indicates the task failed before successful completion.
        pr_url: Pull request URL for a ``"done"`` outcome, if available.
        error: Failure detail for a ``"failed"`` outcome, if available.
        concerns: Design concerns captured during execution.
        ops_concerns: Operational concerns (build errors, tooling friction)
            captured during execution.
    """

    outcome: Literal["done", "failed"]
    pr_url: Optional[str] = None
    error: Optional[str] = None
    concerns: tuple[str, ...] = ()
    ops_concerns: tuple[str, ...] = ()
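
As a rough illustration of how the fields combine, the sketch below names the branch that StandardTaskRunner.run() takes for a given signal, following the run() source later on this page. The dataclass is copied so the block runs standalone; next_step is a hypothetical helper, not part of agentrelay, and the URL is a placeholder.

```python
from dataclasses import dataclass
from typing import Literal, Optional


@dataclass(frozen=True)
class TaskCompletionSignal:
    """Copy of the dataclass shown above so the sketch runs standalone."""

    outcome: Literal["done", "failed"]
    pr_url: Optional[str] = None
    error: Optional[str] = None
    concerns: tuple[str, ...] = ()
    ops_concerns: tuple[str, ...] = ()


def next_step(signal: TaskCompletionSignal) -> str:
    """Hypothetical helper: name the branch run() takes for this signal."""
    if signal.outcome == "failed":
        # run() falls back to a generic message when no error text is given.
        return "mark_failed: " + (
            signal.error or "Task failed without an error message."
        )
    if signal.pr_url:
        return "merge: " + signal.pr_url
    # "done" without a PR URL is treated as a PR-less completion.
    return "complete_without_pr"


merged = next_step(
    TaskCompletionSignal("done", pr_url="https://example.invalid/pr/1")
)
pr_less = next_step(TaskCompletionSignal("done"))
failed = next_step(TaskCompletionSignal("failed"))
```

In all three cases the runner first copies concerns and ops_concerns onto the runtime artifacts; the helper omits that step.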

TaskGateChecker

Bases: Protocol

Run a completion gate command and return the result.

Source code in src/agentrelay/task_runner/core/io.py
@runtime_checkable
class TaskGateChecker(Protocol):
    """Run a completion gate command and return the result."""

    def check_gate(self, runtime: TaskRuntime) -> GateCheckResult:
        """Execute the completion gate command for a task.

        Args:
            runtime: Runtime envelope with gate command and worktree path.

        Returns:
            GateCheckResult: Pass/fail outcome and captured output.
        """
        ...

check_gate(runtime)

Execute the completion gate command for a task.

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope with gate command and worktree path.

required

Returns:

Name Type Description
GateCheckResult GateCheckResult

Pass/fail outcome and captured output.

Source code in src/agentrelay/task_runner/core/io.py
def check_gate(self, runtime: TaskRuntime) -> GateCheckResult:
    """Execute the completion gate command for a task.

    Args:
        runtime: Runtime envelope with gate command and worktree path.

    Returns:
        GateCheckResult: Pass/fail outcome and captured output.
    """
    ...
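
One way an implementation of this protocol might look is sketched below, assuming the gate is a shell command run in the task's worktree. FakeRuntime and its gate_command and worktree_path attributes are hypothetical; the real TaskRuntime exposes this information under names not shown on this page. ShellGateChecker is likewise an illustrative name.

```python
import subprocess
from dataclasses import dataclass


@dataclass(frozen=True)
class GateCheckResult:
    """Copy of the result dataclass so the sketch runs standalone."""

    passed: bool
    output: str


@dataclass
class FakeRuntime:
    """Hypothetical stand-in; real TaskRuntime attribute names may differ."""

    gate_command: str
    worktree_path: str


class ShellGateChecker:
    """Illustrative checker that satisfies TaskGateChecker structurally."""

    def check_gate(self, runtime: FakeRuntime) -> GateCheckResult:
        proc = subprocess.run(
            runtime.gate_command,
            shell=True,  # the gate is a shell command string
            cwd=runtime.worktree_path,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # fold stderr into the captured output
            text=True,
        )
        # Exit code 0 means the gate passed.
        return GateCheckResult(passed=proc.returncode == 0, output=proc.stdout)


ok = ShellGateChecker().check_gate(FakeRuntime("echo gate-ok", "."))
bad = ShellGateChecker().check_gate(FakeRuntime("exit 3", "."))
```

Redirecting stderr to stdout yields the "combined stdout and stderr" that GateCheckResult.output describes. A production checker would probably also want a timeout, which this sketch leaves out.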

TaskKickoff

Bases: Protocol

Send kickoff instructions to a launched agent.

Source code in src/agentrelay/task_runner/core/io.py
@runtime_checkable
class TaskKickoff(Protocol):
    """Send kickoff instructions to a launched agent."""

    def kickoff(self, runtime: TaskRuntime, agent: Agent) -> None:
        """Send kickoff instructions to the launched task agent.

        Args:
            runtime: Runtime envelope for the task being kicked off.
            agent: Live agent handle to send instructions to.
        """
        ...

kickoff(runtime, agent)

Send kickoff instructions to the launched task agent.

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope for the task being kicked off.

required
agent Agent

Live agent handle to send instructions to.

required
Source code in src/agentrelay/task_runner/core/io.py
def kickoff(self, runtime: TaskRuntime, agent: Agent) -> None:
    """Send kickoff instructions to the launched task agent.

    Args:
        runtime: Runtime envelope for the task being kicked off.
        agent: Live agent handle to send instructions to.
    """
    ...

TaskLauncher

Bases: Protocol

Launch and return the primary agent for a task.

Source code in src/agentrelay/task_runner/core/io.py
@runtime_checkable
class TaskLauncher(Protocol):
    """Launch and return the primary agent for a task."""

    def launch(self, runtime: TaskRuntime) -> Agent:
        """Launch and return the primary agent for this task runtime.

        Args:
            runtime: Runtime envelope to launch against.

        Returns:
            Agent: Live agent handle bound to this task.
        """
        ...

launch(runtime)

Launch and return the primary agent for this task runtime.

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope to launch against.

required

Returns:

Name Type Description
Agent Agent

Live agent handle bound to this task.

Source code in src/agentrelay/task_runner/core/io.py
def launch(self, runtime: TaskRuntime) -> Agent:
    """Launch and return the primary agent for this task runtime.

    Args:
        runtime: Runtime envelope to launch against.

    Returns:
        Agent: Live agent handle bound to this task.
    """
    ...
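
Because the step protocols are decorated with @runtime_checkable, an implementation does not need to inherit from them. The sketch below illustrates this with a reduced TaskLauncher; Agent, EchoLauncher, and NotALauncher are hypothetical stand-ins, and the runtime parameter is typed as object to keep the block self-contained.

```python
from typing import Protocol, runtime_checkable


class Agent:
    """Hypothetical stand-in for the real Agent handle."""

    def __init__(self, address: str) -> None:
        self.address = address


@runtime_checkable
class TaskLauncher(Protocol):
    """Reduced copy of the protocol shown above."""

    def launch(self, runtime: object) -> Agent: ...


class EchoLauncher:
    """Does not inherit from TaskLauncher; having launch() is enough."""

    def launch(self, runtime: object) -> Agent:
        return Agent(address=f"echo:{runtime}")


class NotALauncher:
    """Has no launch() method, so it does not satisfy the protocol."""

    def start(self, runtime: object) -> Agent:
        return Agent(address="never-used")


is_launcher = isinstance(EchoLauncher(), TaskLauncher)
is_not_launcher = isinstance(NotALauncher(), TaskLauncher)
agent = EchoLauncher().launch("task-1")
```

The isinstance check only confirms that a launch attribute exists; it does not validate the signature or return type, so a static type checker is still needed for that.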

TaskLogCapture

Bases: Protocol

Capture agent execution log unconditionally at task termination.

Source code in src/agentrelay/task_runner/core/io.py
@runtime_checkable
class TaskLogCapture(Protocol):
    """Capture agent execution log unconditionally at task termination."""

    def capture_log(self, runtime: TaskRuntime) -> None:
        """Capture the agent's execution log to the signal directory.

        Called unconditionally when a task reaches a terminal status,
        regardless of teardown mode.

        Args:
            runtime: Runtime envelope whose agent log should be captured.
        """
        ...

capture_log(runtime)

Capture the agent's execution log to the signal directory.

Called unconditionally when a task reaches a terminal status, regardless of teardown mode.

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope whose agent log should be captured.

required
Source code in src/agentrelay/task_runner/core/io.py
def capture_log(self, runtime: TaskRuntime) -> None:
    """Capture the agent's execution log to the signal directory.

    Called unconditionally when a task reaches a terminal status,
    regardless of teardown mode.

    Args:
        runtime: Runtime envelope whose agent log should be captured.
    """
    ...

TaskMerger

Bases: Protocol

Merge the completed task PR into the integration target.

Source code in src/agentrelay/task_runner/core/io.py
@runtime_checkable
class TaskMerger(Protocol):
    """Merge the completed task PR into the integration target."""

    def merge_pr(self, runtime: TaskRuntime, pr_url: str) -> TaskMergeResult:
        """Merge the completed task PR.

        Args:
            runtime: Runtime envelope being merged.
            pr_url: Pull request URL to merge.

        Returns:
            TaskMergeResult: Pre-merge SHA for rollback support.
        """
        ...

merge_pr(runtime, pr_url)

Merge the completed task PR.

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope being merged.

required
pr_url str

Pull request URL to merge.

required

Returns:

Name Type Description
TaskMergeResult TaskMergeResult

Pre-merge SHA for rollback support.

Source code in src/agentrelay/task_runner/core/io.py
def merge_pr(self, runtime: TaskRuntime, pr_url: str) -> TaskMergeResult:
    """Merge the completed task PR.

    Args:
        runtime: Runtime envelope being merged.
        pr_url: Pull request URL to merge.

    Returns:
        TaskMergeResult: Pre-merge SHA for rollback support.
    """
    ...

TaskMergeResult dataclass

Result from merging a task PR.

Attributes:

Name Type Description
integration_branch_before_merge str

SHA of the integration branch immediately before the task PR was merged into it.

Source code in src/agentrelay/task_runner/core/io.py
@dataclass(frozen=True)
class TaskMergeResult:
    """Result from merging a task PR.

    Attributes:
        integration_branch_before_merge: SHA of the integration branch
            immediately before the task PR was merged into it.
    """

    integration_branch_before_merge: str
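
To show why the pre-merge SHA is worth returning, here is a toy sketch in which the integration branch is an in-memory list of commit identifiers. FakeIntegrationBranch, FakeMerger, and roll_back are hypothetical and do not reflect how agentrelay talks to git or a hosting service; the point is only the ordering of "record the SHA, then merge".

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class TaskMergeResult:
    """Copy of the result dataclass so the sketch runs standalone."""

    integration_branch_before_merge: str


@dataclass
class FakeIntegrationBranch:
    """Hypothetical in-memory branch: commit SHAs, newest last."""

    commits: list[str] = field(default_factory=lambda: ["a1b2c3"])

    @property
    def head(self) -> str:
        return self.commits[-1]


class FakeMerger:
    """Illustrative merger with the same method shape as TaskMerger."""

    def __init__(self, branch: FakeIntegrationBranch) -> None:
        self.branch = branch

    def merge_pr(self, runtime: object, pr_url: str) -> TaskMergeResult:
        before = self.branch.head  # record the SHA before changing anything
        self.branch.commits.append(f"merge-of-{pr_url}")
        return TaskMergeResult(integration_branch_before_merge=before)


def roll_back(branch: FakeIntegrationBranch, result: TaskMergeResult) -> None:
    """Drop every commit after the recorded pre-merge SHA."""
    index = branch.commits.index(result.integration_branch_before_merge)
    del branch.commits[index + 1 :]


branch = FakeIntegrationBranch()
result = FakeMerger(branch).merge_pr(None, "pr-7")
head_after_merge = branch.head
roll_back(branch, result)
head_after_rollback = branch.head
```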

TaskPreparer

Bases: Protocol

Prepare runtime execution prerequisites before agent launch.

Source code in src/agentrelay/task_runner/core/io.py
@runtime_checkable
class TaskPreparer(Protocol):
    """Prepare runtime execution prerequisites before agent launch."""

    def prepare(self, runtime: TaskRuntime) -> None:
        """Prepare runtime execution prerequisites.

        Args:
            runtime: Runtime envelope to prepare (e.g. branch, signal files).
        """
        ...

prepare(runtime)

Prepare runtime execution prerequisites.

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope to prepare (e.g. branch, signal files).

required
Source code in src/agentrelay/task_runner/core/io.py
def prepare(self, runtime: TaskRuntime) -> None:
    """Prepare runtime execution prerequisites.

    Args:
        runtime: Runtime envelope to prepare (e.g. branch, signal files).
    """
    ...

TaskTeardown

Bases: Protocol

Release runtime resources after terminal completion.

Source code in src/agentrelay/task_runner/core/io.py
@runtime_checkable
class TaskTeardown(Protocol):
    """Release runtime resources after terminal completion."""

    def teardown(self, runtime: TaskRuntime) -> None:
        """Release runtime resources after terminal completion.

        Args:
            runtime: Runtime envelope whose resources should be cleaned up.
        """
        ...

teardown(runtime)

Release runtime resources after terminal completion.

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope whose resources should be cleaned up.

required
Source code in src/agentrelay/task_runner/core/io.py
def teardown(self, runtime: TaskRuntime) -> None:
    """Release runtime resources after terminal completion.

    Args:
        runtime: Runtime envelope whose resources should be cleaned up.
    """
    ...
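
In StandardTaskRunner.run(), log capture and teardown both run in a finally block, and a failure in either is recorded as a concern instead of being raised. The sketch below reproduces that pattern in isolation. FakeRuntime, finalize, and the should_teardown flag are hypothetical simplifications; the real runner decides teardown from the teardown mode and the final task status.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class FakeRuntime:
    """Hypothetical stand-in that holds only the concerns list."""

    concerns: list[str] = field(default_factory=list)


def finalize(
    runtime: FakeRuntime,
    capture_log: Callable[[FakeRuntime], None],
    teardown: Callable[[FakeRuntime], None],
    should_teardown: bool,
) -> None:
    """Run log capture, then teardown, without letting either one raise."""
    try:
        capture_log(runtime)
    except Exception as exc:
        runtime.concerns.append(f"log_capture_failed: {exc}")
    if should_teardown:
        try:
            teardown(runtime)
        except Exception as exc:
            runtime.concerns.append(f"teardown_failed: {exc}")


def broken_capture(runtime: FakeRuntime) -> None:
    raise OSError("disk full")


calls: list[str] = []
rt = FakeRuntime()
finalize(rt, broken_capture, lambda r: calls.append("teardown"), should_teardown=True)
```

Even though log capture fails here, teardown still runs, and the failure shows up in rt.concerns rather than as an exception.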

StandardTaskRunner dataclass

Standard task lifecycle: prepare → launch → kickoff → wait → gate → merge → teardown.

Uses StepDispatch tables for per-step implementation selection based on the task's AgentFramework and AgentEnvironment.

Step sensitivity reference:

+-----------------------+----------------+----------------------+
| Step                  | Varies by env? | Varies by framework? |
+=======================+================+======================+
| preparer              | No             | No                   |
+-----------------------+----------------+----------------------+
| launcher              | Yes            | Yes                  |
+-----------------------+----------------+----------------------+
| kickoff               | Yes            | Yes                  |
+-----------------------+----------------+----------------------+
| completion_checker    | Maybe          | No                   |
+-----------------------+----------------+----------------------+
| gate_checker          | No             | No                   |
+-----------------------+----------------+----------------------+
| merger                | No             | No                   |
+-----------------------+----------------+----------------------+
| log_capture           | Partially      | No                   |
+-----------------------+----------------+----------------------+
| teardown              | Partially      | No                   |
+-----------------------+----------------+----------------------+

Extension guide (adding a new framework or environment): Add entries to the StepDispatch tables for the steps that have distinct implementations. Steps that don't vary can keep using default.

Extension guide (different lifecycle, e.g. adding a review step): Create a new class satisfying the TaskRunner protocol. It can reuse StepDispatch tables and per-step protocol implementations.

Attributes:

Name Type Description
_preparer StepDispatch[TaskPreparer]

Dispatch table for TaskPreparer selection.

_launcher StepDispatch[TaskLauncher]

Dispatch table for TaskLauncher selection.

_kickoff StepDispatch[TaskKickoff]

Dispatch table for TaskKickoff selection.

_completion_checker StepDispatch[TaskCompletionChecker]

Dispatch table for TaskCompletionChecker selection.

_gate_checker TaskGateChecker

Completion gate checker (not dispatch-based: always a shell command, does not vary by framework/environment).

_merger StepDispatch[TaskMerger]

Dispatch table for TaskMerger selection.

_log_capture StepDispatch[TaskLogCapture]

Dispatch table for TaskLogCapture selection.

_teardown StepDispatch[TaskTeardown]

Dispatch table for TaskTeardown selection.
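
The runner enforces legal status transitions through a lookup table (ALLOWED_TASK_TRANSITIONS in the source below), whose contents are not shown on this page. The sketch below illustrates the mechanism with an assumed table: the edges are inferred from the order of _transition() calls in run(), and the enum string values are guesses, so neither should be read as the real definitions.

```python
from enum import Enum


class TaskStatus(Enum):
    """Assumed members; the string values are illustrative guesses."""

    PENDING = "pending"
    RUNNING = "running"
    PR_CREATED = "pr_created"
    PR_MERGED = "pr_merged"
    COMPLETED = "completed"
    FAILED = "failed"


# Assumed edges, inferred from the _transition() calls in run().
# FAILED is reached via mark_failed(), not via this table.
ALLOWED = {
    TaskStatus.PENDING: {TaskStatus.RUNNING},
    TaskStatus.RUNNING: {TaskStatus.PR_CREATED, TaskStatus.COMPLETED},
    TaskStatus.PR_CREATED: {TaskStatus.PR_MERGED},
    TaskStatus.PR_MERGED: set(),
    TaskStatus.COMPLETED: set(),
    TaskStatus.FAILED: set(),
}


def transition(current: TaskStatus, target: TaskStatus) -> TaskStatus:
    """Return the new status, or raise if the edge is not allowed."""
    if target not in ALLOWED[current]:
        raise RuntimeError(
            f"Illegal task status transition: {current.value} -> {target.value}"
        )
    return target


def walk(path: list[TaskStatus]) -> TaskStatus:
    """Apply a sequence of transitions starting from PENDING."""
    status = TaskStatus.PENDING
    for target in path:
        status = transition(status, target)
    return status


pr_path = walk([TaskStatus.RUNNING, TaskStatus.PR_CREATED, TaskStatus.PR_MERGED])
pr_less_path = walk([TaskStatus.RUNNING, TaskStatus.COMPLETED])
```

Under these assumptions, skipping a step (for example PENDING directly to PR_MERGED) raises RuntimeError, which matches the error message format used by _transition().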

Source code in src/agentrelay/task_runner/core/runner.py
@dataclass
class StandardTaskRunner:
    """Standard task lifecycle: prepare → launch → kickoff → wait → gate → merge → teardown.

    Uses :class:`StepDispatch` tables for per-step implementation selection
    based on the task's ``AgentFramework`` and ``AgentEnvironment``.

    Step sensitivity reference:

    +-----------------------+----------------+----------------------+
    | Step                  | Varies by env? | Varies by framework? |
    +=======================+================+======================+
    | preparer              | No             | No                   |
    +-----------------------+----------------+----------------------+
    | launcher              | Yes            | Yes                  |
    +-----------------------+----------------+----------------------+
    | kickoff               | Yes            | Yes                  |
    +-----------------------+----------------+----------------------+
    | completion_checker    | Maybe          | No                   |
    +-----------------------+----------------+----------------------+
    | gate_checker          | No             | No                   |
    +-----------------------+----------------+----------------------+
    | merger                | No             | No                   |
    +-----------------------+----------------+----------------------+
    | log_capture           | Partially      | No                   |
    +-----------------------+----------------+----------------------+
    | teardown              | Partially      | No                   |
    +-----------------------+----------------+----------------------+

    Extension guide — adding a new framework or environment:
      Add entries to the ``StepDispatch`` tables for steps that have distinct
      implementations. Steps that don't vary can keep using ``default``.

    Extension guide — different lifecycle (e.g., adding a review step):
      Create a new class satisfying the :class:`TaskRunner` protocol. It can
      reuse ``StepDispatch`` tables and per-step protocol implementations.

    Attributes:
        _preparer: Dispatch table for :class:`TaskPreparer` selection.
        _launcher: Dispatch table for :class:`TaskLauncher` selection.
        _kickoff: Dispatch table for :class:`TaskKickoff` selection.
        _completion_checker: Dispatch table for :class:`TaskCompletionChecker` selection.
        _gate_checker: Completion gate checker (not dispatch-based — always
            a shell command, does not vary by framework/environment).
        _merger: Dispatch table for :class:`TaskMerger` selection.
        _log_capture: Dispatch table for :class:`TaskLogCapture` selection.
        _teardown: Dispatch table for :class:`TaskTeardown` selection.
    """

    _preparer: StepDispatch[TaskPreparer]
    _launcher: StepDispatch[TaskLauncher]
    _kickoff: StepDispatch[TaskKickoff]
    _completion_checker: StepDispatch[TaskCompletionChecker]
    _gate_checker: TaskGateChecker
    _merger: StepDispatch[TaskMerger]
    _log_capture: StepDispatch[TaskLogCapture]
    _teardown: StepDispatch[TaskTeardown]
    on_event: Optional[Callable[..., None]] = field(default=None, repr=False)

    async def run(
        self,
        runtime: TaskRuntime,
        *,
        teardown_mode: TearDownMode = TearDownMode.ALWAYS,
    ) -> TaskRunResult:
        """Execute one task lifecycle run and return a result snapshot.

        Args:
            runtime: Mutable task runtime to execute. Must enter in ``PENDING``
                state.
            teardown_mode: Resource teardown policy after run completion.

        Returns:
            TaskRunResult: Convenience snapshot of terminal runtime fields.

        Raises:
            ValueError: If ``runtime`` does not enter in ``PENDING`` status.
        """
        if runtime.status != TaskStatus.PENDING:
            raise ValueError(
                "StandardTaskRunner.run() requires runtime.status == PENDING. "
                f"Received {runtime.status!r}."
            )

        try:
            try:
                self._preparer(runtime).prepare(runtime)
            except Exception as exc:
                fc = self._record_io_failure(runtime, exc)
                return TaskRunResult.from_runtime(runtime, failure_class=fc)
            self._transition(runtime, TaskStatus.RUNNING)
            self._emit_step(
                "task_prepared",
                runtime,
                f"branch={runtime.state.branch_name}",
            )

            try:
                agent = self._launcher(runtime).launch(runtime)
                runtime.artifacts.agent_address = agent.address
            except Exception as exc:
                fc = self._record_io_failure(runtime, exc)
                return TaskRunResult.from_runtime(runtime, failure_class=fc)
            addr = runtime.artifacts.agent_address
            self._emit_step(
                "task_launched",
                runtime,
                addr.label if addr else None,
            )

            try:
                self._kickoff(runtime).kickoff(runtime, agent)
            except Exception as exc:
                fc = self._record_io_failure(runtime, exc)
                return TaskRunResult.from_runtime(runtime, failure_class=fc)
            self._emit_step("task_waiting", runtime)

            try:
                signal = await self._completion_checker(runtime).wait_for_completion(
                    runtime
                )
            except Exception as exc:
                fc = self._record_io_failure(runtime, exc)
                return TaskRunResult.from_runtime(runtime, failure_class=fc)

            runtime.artifacts.concerns.extend(signal.concerns)
            runtime.artifacts.ops_concerns.extend(signal.ops_concerns)

            if signal.outcome == "failed":
                runtime.mark_failed(
                    signal.error or "Task failed without an error message."
                )
                return TaskRunResult.from_runtime(runtime)

            if signal.pr_url:
                # Standard path: merge the PR.
                runtime.artifacts.pr_url = signal.pr_url
                self._transition(runtime, TaskStatus.PR_CREATED)

                self._save_pr_summary(runtime, signal.pr_url)

                # Completion gate check (optional — only if task declares one).
                if runtime.task.completion_gate is not None:
                    gate_failed = self._run_gate(runtime)
                    if gate_failed is not None:
                        return gate_failed

                self._emit_step("task_pr_merging", runtime, signal.pr_url)
                try:
                    merge_result = self._merger(runtime).merge_pr(
                        runtime, signal.pr_url
                    )
                except Exception as exc:
                    fc = self._record_io_failure(runtime, exc)
                    return TaskRunResult.from_runtime(runtime, failure_class=fc)

                self._transition(runtime, TaskStatus.PR_MERGED)
                self._write_resolved_task(
                    runtime, merge_result.integration_branch_before_merge
                )
            else:
                # PR-less completion (e.g., review-only task with no changes).
                self._transition(runtime, TaskStatus.COMPLETED)
                self._write_resolved_task(runtime, None)
        finally:
            try:
                self._log_capture(runtime).capture_log(runtime)
            except Exception as exc:
                runtime.artifacts.concerns.append(f"log_capture_failed: {exc}")
            if self._should_teardown(teardown_mode, runtime.status):
                try:
                    self._teardown(runtime).teardown(runtime)
                except Exception as exc:
                    runtime.artifacts.concerns.append(f"teardown_failed: {exc}")

        return TaskRunResult.from_runtime(runtime)

    def _run_gate(self, runtime: TaskRuntime) -> Optional[TaskRunResult]:
        """Run the completion gate retry loop.

        Returns ``None`` if the gate passes, or a ``TaskRunResult`` if the
        gate fails all attempts or an exception occurs.
        """
        max_attempts = runtime.task.max_gate_attempts or _DEFAULT_MAX_GATE_ATTEMPTS
        command = runtime.task.completion_gate

        for attempt in range(max_attempts):
            self._emit_step(
                "task_gate_running",
                runtime,
                f"attempt {attempt + 1}/{max_attempts}: {command}",
            )
            try:
                gate_result = self._gate_checker.check_gate(runtime)
            except Exception as exc:
                fc = self._record_io_failure(runtime, exc)
                return TaskRunResult.from_runtime(runtime, failure_class=fc)
            if gate_result.passed:
                self._emit_step("task_gate_passed", runtime)
                return None
        # All attempts exhausted.
        self._emit_step("task_gate_failed", runtime, command)
        runtime.mark_failed(
            f"Completion gate failed after {max_attempts} attempt(s): {command}"
        )
        return TaskRunResult.from_runtime(runtime)

    def _emit_step(
        self,
        kind: str,
        runtime: TaskRuntime,
        message: Optional[str] = None,
    ) -> None:
        """Emit a step-level event if an event callback is registered."""
        if self.on_event is None:
            return
        # Local import to avoid circular dependency (orchestrator → task_runner).
        from agentrelay.orchestrator.orchestrator import OrchestratorEvent

        self.on_event(
            OrchestratorEvent(
                kind=kind,
                task_id=runtime.task.id,
                workstream_id=runtime.task.workstream_id,
                message=message,
            )
        )

    def _transition(self, runtime: TaskRuntime, target: TaskStatus) -> None:
        """Transition runtime status while enforcing legal lifecycle edges.

        Handles non-failure transitions; ``FAILED`` is written directly by
        callers via ``runtime.mark_failed(error)``.
        """
        current = runtime.status
        allowed = ALLOWED_TASK_TRANSITIONS[current]
        if target not in allowed:
            raise RuntimeError(
                f"Illegal task status transition: {current.value} -> {target.value}"
            )
        method_name = _MARK_DISPATCH[target]
        getattr(runtime, method_name)()

    def _save_pr_summary(self, runtime: TaskRuntime, pr_url: str) -> None:
        """Fetch PR body and write summary.md to the attempt directory.

        Silently skips if the fetch fails or the body is empty — saving
        the summary must not block the merge.
        """
        if runtime.attempt_dir is None:
            return
        try:
            body = gh.pr_body(pr_url)
        except Exception:
            return
        if body:
            signals.write_text(runtime.attempt_dir, "summary.md", body)

    def _write_resolved_task(
        self,
        runtime: TaskRuntime,
        integration_branch_before_merge: Optional[str],
    ) -> None:
        """Write ``resolved.json`` for a successfully completed task.

        Failures are recorded as concerns rather than crashing the task.
        """
        if runtime.state.signal_dir is None:
            return
        try:
            resolved = build_resolved_task(runtime, integration_branch_before_merge)
            signals.write_json(
                runtime.state.signal_dir, "resolved.json", resolved.to_dict()
            )
        except Exception as exc:  # noqa: BLE001
            runtime.artifacts.concerns.append(f"resolved_json_failed: {exc}")

    def _record_io_failure(
        self, runtime: TaskRuntime, exc: Exception
    ) -> IntegrationFailureClass:
        """Record an I/O boundary failure and return its classification."""
        if runtime.status != TaskStatus.FAILED:
            runtime.mark_failed(f"{type(exc).__name__}: {exc}")
        return classify_integration_error(exc)

    def _should_teardown(
        self, teardown_mode: TearDownMode, terminal_status: TaskStatus
    ) -> bool:
        """Return whether teardown should run for the given policy/status."""
        if teardown_mode == TearDownMode.ALWAYS:
            return True
        if teardown_mode == TearDownMode.NEVER:
            return False
        return terminal_status in SUCCESS_STATUSES

run(runtime, *, teardown_mode=TearDownMode.ALWAYS) async

Execute one task lifecycle run and return a result snapshot.

Parameters:

Name Type Description Default
runtime TaskRuntime

Mutable task runtime to execute. Must enter in PENDING state.

required
teardown_mode TearDownMode

Resource teardown policy after run completion.

ALWAYS

Returns:

Name Type Description
TaskRunResult TaskRunResult

Convenience snapshot of terminal runtime fields.

Raises:

Type Description
ValueError

If runtime does not enter in PENDING status.

Source code in src/agentrelay/task_runner/core/runner.py
async def run(
    self,
    runtime: TaskRuntime,
    *,
    teardown_mode: TearDownMode = TearDownMode.ALWAYS,
) -> TaskRunResult:
    """Execute one task lifecycle run and return a result snapshot.

    Args:
        runtime: Mutable task runtime to execute. Must enter in ``PENDING``
            state.
        teardown_mode: Resource teardown policy after run completion.

    Returns:
        TaskRunResult: Convenience snapshot of terminal runtime fields.

    Raises:
        ValueError: If ``runtime`` does not enter in ``PENDING`` status.
    """
    if runtime.status != TaskStatus.PENDING:
        raise ValueError(
            "StandardTaskRunner.run() requires runtime.status == PENDING. "
            f"Received {runtime.status!r}."
        )

    try:
        try:
            self._preparer(runtime).prepare(runtime)
        except Exception as exc:
            fc = self._record_io_failure(runtime, exc)
            return TaskRunResult.from_runtime(runtime, failure_class=fc)
        self._transition(runtime, TaskStatus.RUNNING)
        self._emit_step(
            "task_prepared",
            runtime,
            f"branch={runtime.state.branch_name}",
        )

        try:
            agent = self._launcher(runtime).launch(runtime)
            runtime.artifacts.agent_address = agent.address
        except Exception as exc:
            fc = self._record_io_failure(runtime, exc)
            return TaskRunResult.from_runtime(runtime, failure_class=fc)
        addr = runtime.artifacts.agent_address
        self._emit_step(
            "task_launched",
            runtime,
            addr.label if addr else None,
        )

        try:
            self._kickoff(runtime).kickoff(runtime, agent)
        except Exception as exc:
            fc = self._record_io_failure(runtime, exc)
            return TaskRunResult.from_runtime(runtime, failure_class=fc)
        self._emit_step("task_waiting", runtime)

        try:
            signal = await self._completion_checker(runtime).wait_for_completion(
                runtime
            )
        except Exception as exc:
            fc = self._record_io_failure(runtime, exc)
            return TaskRunResult.from_runtime(runtime, failure_class=fc)

        runtime.artifacts.concerns.extend(signal.concerns)
        runtime.artifacts.ops_concerns.extend(signal.ops_concerns)

        if signal.outcome == "failed":
            runtime.mark_failed(
                signal.error or "Task failed without an error message."
            )
            return TaskRunResult.from_runtime(runtime)

        if signal.pr_url:
            # Standard path: merge the PR.
            runtime.artifacts.pr_url = signal.pr_url
            self._transition(runtime, TaskStatus.PR_CREATED)

            self._save_pr_summary(runtime, signal.pr_url)

            # Completion gate check (optional — only if task declares one).
            if runtime.task.completion_gate is not None:
                gate_failed = self._run_gate(runtime)
                if gate_failed is not None:
                    return gate_failed

            self._emit_step("task_pr_merging", runtime, signal.pr_url)
            try:
                merge_result = self._merger(runtime).merge_pr(
                    runtime, signal.pr_url
                )
            except Exception as exc:
                fc = self._record_io_failure(runtime, exc)
                return TaskRunResult.from_runtime(runtime, failure_class=fc)

            self._transition(runtime, TaskStatus.PR_MERGED)
            self._write_resolved_task(
                runtime, merge_result.integration_branch_before_merge
            )
        else:
            # PR-less completion (e.g., review-only task with no changes).
            self._transition(runtime, TaskStatus.COMPLETED)
            self._write_resolved_task(runtime, None)
    finally:
        try:
            self._log_capture(runtime).capture_log(runtime)
        except Exception as exc:
            runtime.artifacts.concerns.append(f"log_capture_failed: {exc}")
        if self._should_teardown(teardown_mode, runtime.status):
            try:
                self._teardown(runtime).teardown(runtime)
            except Exception as exc:
                runtime.artifacts.concerns.append(f"teardown_failed: {exc}")

    return TaskRunResult.from_runtime(runtime)
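
The status changes made by run() can be read as a small state machine. The sketch below is a minimal illustration, assuming a transition table shaped like ALLOWED_TASK_TRANSITIONS. The `Status` enum, its string values, and the contents of `ALLOWED` are inferred from the calls in run() and are not the library's actual definitions.

```python
from enum import Enum


class Status(str, Enum):
    """Stand-in for TaskStatus; the string values are assumptions."""

    PENDING = "pending"
    RUNNING = "running"
    PR_CREATED = "pr_created"
    PR_MERGED = "pr_merged"
    COMPLETED = "completed"
    FAILED = "failed"


# Hypothetical table mirroring only the edges that run() exercises.
# FAILED never appears as a target because run() writes it through
# runtime.mark_failed(...) rather than through the transition guard.
ALLOWED = {
    Status.PENDING: {Status.RUNNING},
    Status.RUNNING: {Status.PR_CREATED, Status.COMPLETED},
    Status.PR_CREATED: {Status.PR_MERGED},
    Status.PR_MERGED: set(),
    Status.COMPLETED: set(),
    Status.FAILED: set(),
}


def transition(current: Status, target: Status) -> Status:
    """Return target if the edge is legal, otherwise raise RuntimeError."""
    if target not in ALLOWED[current]:
        raise RuntimeError(
            f"Illegal task status transition: {current.value} -> {target.value}"
        )
    return target


def standard_path() -> list[Status]:
    """Walk the PR path: PENDING -> RUNNING -> PR_CREATED -> PR_MERGED."""
    status = Status.PENDING
    seen = [status]
    for target in (Status.RUNNING, Status.PR_CREATED, Status.PR_MERGED):
        status = transition(status, target)
        seen.append(status)
    return seen
```

The PR-less path differs only in its last step: RUNNING goes straight to COMPLETED, which the same guard accepts.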

TaskRunner

Bases: Protocol

Protocol for the task runner boundary used by Orchestrator.

Different lifecycle variants (standard, reviewing, dry-run) are different classes satisfying this protocol. The orchestrator does not know or care about internal step structure.

Source code in src/agentrelay/task_runner/core/runner.py
@runtime_checkable
class TaskRunner(Protocol):
    """Protocol for the task runner boundary used by Orchestrator.

    Different lifecycle variants (standard, reviewing, dry-run) are
    different classes satisfying this protocol. The orchestrator does
    not know or care about internal step structure.
    """

    async def run(
        self,
        runtime: TaskRuntime,
        *,
        teardown_mode: TearDownMode = TearDownMode.ALWAYS,
    ) -> TaskRunResult:
        """Execute one task attempt and return terminal task fields."""
        ...

run(runtime, *, teardown_mode=TearDownMode.ALWAYS) async

Execute one task attempt and return terminal task fields.

Source code in src/agentrelay/task_runner/core/runner.py
async def run(
    self,
    runtime: TaskRuntime,
    *,
    teardown_mode: TearDownMode = TearDownMode.ALWAYS,
) -> TaskRunResult:
    """Execute one task attempt and return terminal task fields."""
    ...
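
Because TaskRunner is a runtime-checkable Protocol, a lifecycle variant only needs a matching run method and does not have to inherit from anything. The following is a sketch under stated assumptions: `FakeRuntime`, `FakeResult`, `Runner`, and `DryRunRunner` are hypothetical stand-ins invented for illustration, and the teardown_mode keyword is omitted to keep the example short.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional, Protocol, runtime_checkable


@dataclass
class FakeRuntime:
    """Hypothetical stand-in for TaskRuntime with only the fields used here."""

    task_id: str
    status: str = "pending"


@dataclass(frozen=True)
class FakeResult:
    """Hypothetical stand-in for TaskRunResult."""

    task_id: str
    status: str
    error: Optional[str] = None


@runtime_checkable
class Runner(Protocol):
    """Structural contract analogous to TaskRunner."""

    async def run(self, runtime: FakeRuntime) -> FakeResult: ...


class DryRunRunner:
    """Satisfies Runner structurally; performs no I/O at all."""

    async def run(self, runtime: FakeRuntime) -> FakeResult:
        runtime.status = "completed"
        return FakeResult(task_id=runtime.task_id, status=runtime.status)


def demo() -> FakeResult:
    runner = DryRunRunner()
    # runtime_checkable only verifies that a "run" attribute exists;
    # it does not check the signature or that run is a coroutine function.
    assert isinstance(runner, Runner)
    return asyncio.run(runner.run(FakeRuntime(task_id="t1")))
```

The isinstance check is a coarse guard at best, so a static type checker remains the more reliable way to confirm that a runner matches the protocol.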

TaskRunResult dataclass

Convenience return value mirroring terminal fields on TaskRuntime.

TaskRuntime remains the single source of truth; this result is a snapshot for ergonomic call-site consumption.

Attributes:

Name Type Description
task_id str

Task identifier.

status TaskStatus

Terminal task status after TaskRunner.run(...).

pr_url Optional[str]

Task PR URL, if one was recorded.

error Optional[str]

Task failure message, if one was recorded.

failure_class Optional[IntegrationFailureClass]

Integration error classification for I/O boundary failures. None for agent-signaled failures and successes.

Source code in src/agentrelay/task_runner/core/runner.py
@dataclass(frozen=True)
class TaskRunResult:
    """Convenience return value mirroring terminal fields on :class:`TaskRuntime`.

    ``TaskRuntime`` remains the single source of truth; this result is a snapshot
    for ergonomic call-site consumption.

    Attributes:
        task_id: Task identifier.
        status: Terminal task status after ``TaskRunner.run(...)``.
        pr_url: Task PR URL, if one was recorded.
        error: Task failure message, if one was recorded.
        failure_class: Integration error classification for I/O boundary
            failures.  ``None`` for agent-signaled failures and successes.
    """

    task_id: str
    status: TaskStatus
    pr_url: Optional[str]
    error: Optional[str]
    failure_class: Optional[IntegrationFailureClass] = None

    @classmethod
    def from_runtime(
        cls,
        runtime: TaskRuntime,
        failure_class: Optional[IntegrationFailureClass] = None,
    ) -> TaskRunResult:
        """Build a result snapshot from the current runtime state.

        Args:
            runtime: Runtime envelope to snapshot.
            failure_class: Optional integration error classification.

        Returns:
            TaskRunResult: Snapshot of task ID, status, PR URL, and error.
        """
        return cls(
            task_id=runtime.task.id,
            status=runtime.status,
            pr_url=runtime.artifacts.pr_url,
            error=runtime.state.error,
            failure_class=failure_class,
        )

from_runtime(runtime, failure_class=None) classmethod

Build a result snapshot from the current runtime state.

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope to snapshot.

required
failure_class Optional[IntegrationFailureClass]

Optional integration error classification.

None

Returns:

Name Type Description
TaskRunResult TaskRunResult

Snapshot of task ID, status, PR URL, and error.

Source code in src/agentrelay/task_runner/core/runner.py
@classmethod
def from_runtime(
    cls,
    runtime: TaskRuntime,
    failure_class: Optional[IntegrationFailureClass] = None,
) -> TaskRunResult:
    """Build a result snapshot from the current runtime state.

    Args:
        runtime: Runtime envelope to snapshot.
        failure_class: Optional integration error classification.

    Returns:
        TaskRunResult: Snapshot of task ID, status, PR URL, and error.
    """
    return cls(
        task_id=runtime.task.id,
        status=runtime.status,
        pr_url=runtime.artifacts.pr_url,
        error=runtime.state.error,
        failure_class=failure_class,
    )
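
The snapshot semantics can be illustrated with a reduced model. This is a minimal sketch, assuming flattened stand-ins: `MiniRuntime` and `MiniResult` are hypothetical and collapse the nested task, state, and artifacts objects of the real runtime into plain fields.

```python
from dataclasses import FrozenInstanceError, dataclass
from typing import Optional


@dataclass
class MiniRuntime:
    """Hypothetical, flattened stand-in for TaskRuntime."""

    task_id: str
    status: str = "running"
    error: Optional[str] = None


@dataclass(frozen=True)
class MiniResult:
    """Hypothetical stand-in for TaskRunResult."""

    task_id: str
    status: str
    error: Optional[str]

    @classmethod
    def from_runtime(cls, runtime: MiniRuntime) -> "MiniResult":
        # Copies the current field values; later runtime changes are not seen.
        return cls(runtime.task_id, runtime.status, runtime.error)


def snapshot_demo() -> tuple[str, str, bool]:
    runtime = MiniRuntime(task_id="t1")
    result = MiniResult.from_runtime(runtime)
    runtime.status = "failed"  # the runtime keeps evolving after the snapshot
    try:
        result.status = "failed"  # type: ignore[misc]
        frozen = False
    except FrozenInstanceError:
        frozen = True
    return result.status, runtime.status, frozen
```

A result taken before a later status change keeps the old value, which is why the runtime, not the result, should be consulted for current state.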

TearDownMode

Bases: str, Enum

Policy controlling whether runtime resources are torn down after run.

Attributes:

Name Type Description
ALWAYS

Always call teardown at end of run.

NEVER

Never call teardown.

ON_SUCCESS

Call teardown only when task reaches a terminal success status (PR_MERGED or COMPLETED).

Source code in src/agentrelay/task_runner/core/runner.py
class TearDownMode(str, Enum):
    """Policy controlling whether runtime resources are torn down after run.

    Attributes:
        ALWAYS: Always call teardown at end of run.
        NEVER: Never call teardown.
        ON_SUCCESS: Call teardown only when task reaches a terminal success
            status (``PR_MERGED`` or ``COMPLETED``).
    """

    ALWAYS = "always"
    NEVER = "never"
    ON_SUCCESS = "on_success"
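
The three policies reduce to a small decision function. The sketch below mirrors the enum values shown above; `Mode`, `SUCCESS`, and `should_teardown` are illustrative names, and statuses are modelled as plain strings, which is an assumption rather than how TaskStatus is defined.

```python
from enum import Enum


class Mode(str, Enum):
    """Mirror of TearDownMode's three values."""

    ALWAYS = "always"
    NEVER = "never"
    ON_SUCCESS = "on_success"


# Assumed stand-in for SUCCESS_STATUSES; statuses are plain strings here.
SUCCESS = frozenset({"pr_merged", "completed"})


def should_teardown(mode: Mode, terminal_status: str) -> bool:
    """Return whether teardown should run for the given policy and status."""
    if mode is Mode.ALWAYS:
        return True
    if mode is Mode.NEVER:
        return False
    # ON_SUCCESS: keep resources around for inspection after a failure.
    return terminal_status in SUCCESS
```

Because the enum also subclasses str, a value read from configuration can be converted directly, for example `Mode("on_success")`.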

GhTaskMerger dataclass

Merge a task PR via GitHub CLI and update the local integration branch.

After merging, fetches the updated integration branch and writes a .merged signal file to the task's signal directory.

Reads integration_branch from runtime.state (set by the orchestrator before dispatch).

Source code in src/agentrelay/task_runner/implementations/task_merger.py
@dataclass
class GhTaskMerger:
    """Merge a task PR via GitHub CLI and update the local integration branch.

    After merging, fetches the updated integration branch and writes a
    ``.merged`` signal file to the task's signal directory.

    Reads ``integration_branch`` from ``runtime.state`` (set by the
    orchestrator before dispatch).
    """

    repo_path: Path

    def merge_pr(self, runtime: TaskRuntime, pr_url: str) -> TaskMergeResult:
        """Merge the completed task PR.

        Captures the integration branch SHA before merging for rollback
        support. Returns a :class:`TaskMergeResult` with the pre-merge SHA.

        Args:
            runtime: Runtime envelope being merged.
            pr_url: Pull request URL to merge.

        Returns:
            TaskMergeResult: Pre-merge SHA of the integration branch.

        Raises:
            ValueError: If ``runtime.state.integration_branch`` is None.
        """
        integration_branch = runtime.state.integration_branch
        if integration_branch is None:
            raise ValueError(
                "runtime.state.integration_branch must be set before merge_pr()"
            )

        before_sha = git.rev_parse(self.repo_path, integration_branch)

        gh.pr_merge(pr_url)

        git.fetch_branch(self.repo_path, integration_branch)
        git.update_local_ref(
            self.repo_path,
            integration_branch,
            f"origin/{integration_branch}",
        )

        if runtime.state.signal_dir is not None:
            timestamp = datetime.now(timezone.utc).isoformat() + "\n"
            signals.write_text(runtime.state.signal_dir, ".merged", timestamp)

        return TaskMergeResult(
            integration_branch_before_merge=before_sha,
        )

merge_pr(runtime, pr_url)

Merge the completed task PR.

Captures the integration branch SHA before merging for rollback support. Returns a TaskMergeResult with the pre-merge SHA.

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope being merged.

required
pr_url str

Pull request URL to merge.

required

Returns:

Name Type Description
TaskMergeResult TaskMergeResult

Pre-merge SHA of the integration branch.

Raises:

Type Description
ValueError

If runtime.state.integration_branch is None.

Source code in src/agentrelay/task_runner/implementations/task_merger.py
def merge_pr(self, runtime: TaskRuntime, pr_url: str) -> TaskMergeResult:
    """Merge the completed task PR.

    Captures the integration branch SHA before merging for rollback
    support. Returns a :class:`TaskMergeResult` with the pre-merge SHA.

    Args:
        runtime: Runtime envelope being merged.
        pr_url: Pull request URL to merge.

    Returns:
        TaskMergeResult: Pre-merge SHA of the integration branch.

    Raises:
        ValueError: If ``runtime.state.integration_branch`` is None.
    """
    integration_branch = runtime.state.integration_branch
    if integration_branch is None:
        raise ValueError(
            "runtime.state.integration_branch must be set before merge_pr()"
        )

    before_sha = git.rev_parse(self.repo_path, integration_branch)

    gh.pr_merge(pr_url)

    git.fetch_branch(self.repo_path, integration_branch)
    git.update_local_ref(
        self.repo_path,
        integration_branch,
        f"origin/{integration_branch}",
    )

    if runtime.state.signal_dir is not None:
        timestamp = datetime.now(timezone.utc).isoformat() + "\n"
        signals.write_text(runtime.state.signal_dir, ".merged", timestamp)

    return TaskMergeResult(
        integration_branch_before_merge=before_sha,
    )
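
The ordering in merge_pr matters: the rollback point has to be read before the merge changes the branch. The following sketch isolates that ordering with injected callables so it can run without git or the GitHub CLI. `MergeSteps`, `merge_with_rollback_point`, the SHA, and the URL are all hypothetical and exist only for illustration.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class MergeSteps:
    """Hypothetical injection points standing in for the git/gh helpers."""

    rev_parse: Callable[[str], str]
    pr_merge: Callable[[str], None]
    fetch_and_update: Callable[[str], None]


def merge_with_rollback_point(steps: MergeSteps, pr_url: str, branch: str) -> str:
    before_sha = steps.rev_parse(branch)  # 1. record the rollback point
    steps.pr_merge(pr_url)                # 2. merge the PR remotely
    steps.fetch_and_update(branch)        # 3. sync the local branch ref
    return before_sha


def demo() -> tuple[str, list[str]]:
    calls: list[str] = []

    def rev_parse(branch: str) -> str:
        calls.append("rev_parse")
        return "abc123"  # made-up SHA for illustration

    def pr_merge(pr_url: str) -> None:
        calls.append("pr_merge")

    def fetch_and_update(branch: str) -> None:
        calls.append("fetch_and_update")

    steps = MergeSteps(rev_parse, pr_merge, fetch_and_update)
    sha = merge_with_rollback_point(
        steps, "https://example.invalid/pr/1", "integration"
    )
    return sha, calls
```

Injecting the three steps also makes the sequence easy to assert in a unit test, as the recorded call list shows.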

ShellGateChecker dataclass

Run a completion gate shell command and return the result.

Executes the gate command from runtime.task.completion_gate in the task's workstream worktree. Captures stdout followed by stderr and writes the result to gate_last_output.txt in the attempt directory; the write is skipped when runtime.attempt_dir is None.

The checker does NOT raise on command failure — a non-zero exit code is returned as GateCheckResult(passed=False, ...).

Source code in src/agentrelay/task_runner/implementations/task_gate_checker.py
@dataclass
class ShellGateChecker:
    """Run a completion gate shell command and return the result.

    Executes the gate command from ``runtime.task.completion_gate`` in the
    task's workstream worktree.  Captures stdout followed by stderr and
    writes the result to ``gate_last_output.txt`` in the attempt directory;
    the write is skipped when ``runtime.attempt_dir`` is ``None``.

    The checker does NOT raise on command failure — a non-zero exit code
    is returned as ``GateCheckResult(passed=False, ...)``.
    """

    def check_gate(self, runtime: TaskRuntime) -> GateCheckResult:
        """Execute the completion gate command for a task.

        Args:
            runtime: Runtime envelope with gate command and worktree path.

        Returns:
            GateCheckResult: Pass/fail outcome and captured output.
        """
        command = runtime.task.completion_gate
        assert command is not None, "check_gate called without completion_gate"
        cwd = runtime.state.worktree_path

        try:
            result = subprocess.run(
                command,
                shell=True,
                cwd=cwd,
                capture_output=True,
                text=True,
                timeout=600,
            )
            output = result.stdout + result.stderr
            passed = result.returncode == 0
        except subprocess.TimeoutExpired as exc:
            output = f"Gate command timed out after 600s: {command}\n"
            if exc.stdout:
                output += (
                    exc.stdout if isinstance(exc.stdout, str) else exc.stdout.decode()
                )
            if exc.stderr:
                output += (
                    exc.stderr if isinstance(exc.stderr, str) else exc.stderr.decode()
                )
            passed = False
        except Exception as exc:
            output = f"Gate command error: {type(exc).__name__}: {exc}\n"
            passed = False

        if runtime.attempt_dir is not None:
            signals.write_text(runtime.attempt_dir, GATE_OUTPUT_FILE, output)

        return GateCheckResult(passed=passed, output=output)

check_gate(runtime)

Execute the completion gate command for a task.

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope with gate command and worktree path.

required

Returns:

Name Type Description
GateCheckResult GateCheckResult

Pass/fail outcome and captured output.

Source code in src/agentrelay/task_runner/implementations/task_gate_checker.py
def check_gate(self, runtime: TaskRuntime) -> GateCheckResult:
    """Execute the completion gate command for a task.

    Args:
        runtime: Runtime envelope with gate command and worktree path.

    Returns:
        GateCheckResult: Pass/fail outcome and captured output.
    """
    command = runtime.task.completion_gate
    assert command is not None, "check_gate called without completion_gate"
    cwd = runtime.state.worktree_path

    try:
        result = subprocess.run(
            command,
            shell=True,
            cwd=cwd,
            capture_output=True,
            text=True,
            timeout=600,
        )
        output = result.stdout + result.stderr
        passed = result.returncode == 0
    except subprocess.TimeoutExpired as exc:
        output = f"Gate command timed out after 600s: {command}\n"
        if exc.stdout:
            output += (
                exc.stdout if isinstance(exc.stdout, str) else exc.stdout.decode()
            )
        if exc.stderr:
            output += (
                exc.stderr if isinstance(exc.stderr, str) else exc.stderr.decode()
            )
        passed = False
    except Exception as exc:
        output = f"Gate command error: {type(exc).__name__}: {exc}\n"
        passed = False

    if runtime.attempt_dir is not None:
        signals.write_text(runtime.attempt_dir, GATE_OUTPUT_FILE, output)

    return GateCheckResult(passed=passed, output=output)
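
The contract that a failing command yields a result rather than an exception can be sketched on its own, together with a bounded retry loop like the one the runner wraps around it. This is a minimal sketch, not the module's code: `GateResult`, `check`, and `run_gate` are illustrative names, and the commands used are arbitrary shell one-liners.

```python
import subprocess
from dataclasses import dataclass


@dataclass(frozen=True)
class GateResult:
    """Stand-in for GateCheckResult with the two fields shown above."""

    passed: bool
    output: str


def check(command: str, timeout: float = 600.0) -> GateResult:
    """Run command in a shell; report failure as data instead of raising."""
    try:
        proc = subprocess.run(
            command, shell=True, capture_output=True, text=True, timeout=timeout
        )
    except subprocess.TimeoutExpired:
        return GateResult(
            False, f"Gate command timed out after {timeout}s: {command}\n"
        )
    # stdout is followed by stderr; the two streams are not interleaved.
    return GateResult(proc.returncode == 0, proc.stdout + proc.stderr)


def run_gate(command: str, max_attempts: int) -> bool:
    """Retry the gate up to max_attempts times; True on the first pass."""
    for _ in range(max_attempts):
        if check(command).passed:
            return True
    return False
```

Note that shell=True passes the string to the system shell, so the gate command should come from a trusted task definition.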

SignalCompletionChecker dataclass

Poll signal files and parse the result into a completion signal.

Watches the task's attempt directory (runtime.attempt_dir) for .done or .failed files, then parses the signal file content and any concerns.log / ops_concerns.log entries into a TaskCompletionSignal.

Source code in src/agentrelay/task_runner/implementations/task_completion_checker.py
@dataclass
class SignalCompletionChecker:
    """Poll signal files and parse the result into a completion signal.

    Watches the task's attempt directory (``runtime.attempt_dir``) for
    ``.done`` or ``.failed`` files, then parses the signal file content and
    any ``concerns.log`` / ``ops_concerns.log`` entries into a
    :class:`TaskCompletionSignal`.
    """

    poll_interval: float = 2.0

    async def wait_for_completion(self, runtime: TaskRuntime) -> TaskCompletionSignal:
        """Wait for terminal task signal from the execution boundary.

        Args:
            runtime: Runtime envelope being observed.

        Returns:
            TaskCompletionSignal: Terminal signal payload with outcome data.

        Raises:
            ValueError: If ``runtime.attempt_dir`` is not set.
        """
        if runtime.attempt_dir is None:
            raise ValueError("runtime.attempt_dir must be set before polling")

        attempt_dir = runtime.attempt_dir
        found = await signals.poll_signal_files(
            attempt_dir, (".done", ".failed"), self.poll_interval
        )

        content = signals.read_signal_file(attempt_dir, found) or ""
        lines = content.splitlines()

        # Line 1 = ISO timestamp (informational), Line 2 = payload
        payload = lines[1].strip() if len(lines) > 1 else None

        concerns_text = signals.read_signal_file(attempt_dir, "concerns.log")
        concerns = _parse_concerns(concerns_text)

        ops_concerns_text = signals.read_signal_file(attempt_dir, "ops_concerns.log")
        ops_concerns = _parse_concerns(ops_concerns_text)

        if found == ".done":
            pr_url = None if payload == NO_PR_SENTINEL else payload
            return TaskCompletionSignal(
                outcome="done",
                pr_url=pr_url,
                concerns=concerns,
                ops_concerns=ops_concerns,
            )

        return TaskCompletionSignal(
            outcome="failed",
            error=payload,
            concerns=concerns,
            ops_concerns=ops_concerns,
        )

wait_for_completion(runtime) async

Wait for terminal task signal from the execution boundary.

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope being observed.

required

Returns:

Name Type Description
TaskCompletionSignal TaskCompletionSignal

Terminal signal payload with outcome data.

Raises:

Type Description
ValueError

If runtime.attempt_dir is not set.

Source code in src/agentrelay/task_runner/implementations/task_completion_checker.py
async def wait_for_completion(self, runtime: TaskRuntime) -> TaskCompletionSignal:
    """Wait for terminal task signal from the execution boundary.

    Args:
        runtime: Runtime envelope being observed.

    Returns:
        TaskCompletionSignal: Terminal signal payload with outcome data.

    Raises:
        ValueError: If ``runtime.attempt_dir`` is not set.
    """
    if runtime.attempt_dir is None:
        raise ValueError("runtime.attempt_dir must be set before polling")

    attempt_dir = runtime.attempt_dir
    found = await signals.poll_signal_files(
        attempt_dir, (".done", ".failed"), self.poll_interval
    )

    content = signals.read_signal_file(attempt_dir, found) or ""
    lines = content.splitlines()

    # Line 1 = ISO timestamp (informational), Line 2 = payload
    payload = lines[1].strip() if len(lines) > 1 else None

    concerns_text = signals.read_signal_file(attempt_dir, "concerns.log")
    concerns = _parse_concerns(concerns_text)

    ops_concerns_text = signals.read_signal_file(attempt_dir, "ops_concerns.log")
    ops_concerns = _parse_concerns(ops_concerns_text)

    if found == ".done":
        pr_url = None if payload == NO_PR_SENTINEL else payload
        return TaskCompletionSignal(
            outcome="done",
            pr_url=pr_url,
            concerns=concerns,
            ops_concerns=ops_concerns,
        )

    return TaskCompletionSignal(
        outcome="failed",
        error=payload,
        concerns=concerns,
        ops_concerns=ops_concerns,
    )
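
The two-line signal file format (timestamp, then payload) can be exercised without any polling. The sketch below is a simplified, synchronous model of the parsing step only. `parse_signal` is a hypothetical helper, it returns a plain dict instead of a TaskCompletionSignal, and the `NO_PR` value is a placeholder because the real NO_PR_SENTINEL is not shown on this page.

```python
from typing import Optional

# Hypothetical sentinel; the real NO_PR_SENTINEL value is not shown here.
NO_PR = "NO_PR"


def parse_signal(found: str, content: str) -> dict[str, Optional[str]]:
    """Map a signal file name and its content to outcome fields."""
    lines = content.splitlines()
    # Line 1 is the timestamp; line 2, when present, is the payload.
    payload = lines[1].strip() if len(lines) > 1 else None
    if found == ".done":
        pr_url = None if payload == NO_PR else payload
        return {"outcome": "done", "pr_url": pr_url, "error": None}
    return {"outcome": "failed", "pr_url": None, "error": payload}
```

One consequence of this parsing is that a .done file containing only the timestamp line also produces pr_url=None, so it is handled the same way as an explicit sentinel.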

TmuxTaskKickoff dataclass

Send kickoff instructions to a launched agent.

Delegates to Agent.send_kickoff with the path to the instructions.md file in the task's signal directory.

Source code in src/agentrelay/task_runner/implementations/task_kickoff.py
@dataclass
class TmuxTaskKickoff:
    """Send kickoff instructions to a launched agent.

    Delegates to :meth:`Agent.send_kickoff` with the path to the
    ``instructions.md`` file in the task's signal directory.
    """

    def kickoff(self, runtime: TaskRuntime, agent: Agent) -> None:
        """Send kickoff instructions to the launched task agent.

        Args:
            runtime: Runtime envelope for the task being kicked off.
            agent: Live agent handle to send instructions to.

        Raises:
            ValueError: If ``signal_dir`` is not set on the runtime state.
        """
        if runtime.state.signal_dir is None:
            raise ValueError("runtime.state.signal_dir must be set before kickoff")

        instructions_path = str(runtime.state.signal_dir / "instructions.md")
        agent.send_kickoff(instructions_path)

kickoff(runtime, agent)

Send kickoff instructions to the launched task agent.

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope for the task being kicked off.

required
agent Agent

Live agent handle to send instructions to.

required

Raises:

Type Description
ValueError

If signal_dir is not set on the runtime state.

Source code in src/agentrelay/task_runner/implementations/task_kickoff.py
def kickoff(self, runtime: TaskRuntime, agent: Agent) -> None:
    """Send kickoff instructions to the launched task agent.

    Args:
        runtime: Runtime envelope for the task being kicked off.
        agent: Live agent handle to send instructions to.

    Raises:
        ValueError: If ``signal_dir`` is not set on the runtime state.
    """
    if runtime.state.signal_dir is None:
        raise ValueError("runtime.state.signal_dir must be set before kickoff")

    instructions_path = str(runtime.state.signal_dir / "instructions.md")
    agent.send_kickoff(instructions_path)

TmuxTaskLauncher dataclass

Launch an agent in a tmux pane using a framework adapter and sandbox.

Orchestrates the command-building pipeline: the adapter builds the framework-specific CLI command, the sandbox wraps it with isolation, and TmuxAgent.from_config sends the final command to a tmux pane.

Attributes:

Name Type Description
adapter AgentFrameworkAdapter

Framework adapter that builds the raw CLI command.

sandbox AgentSandbox

Sandbox that wraps the command with isolation.

credential_provider CredentialProvider

Credential provider that resolves token tier to environment variables.

repo_path Path

Path to the main repository.

graph_name str

Name of the task graph being executed.

Source code in src/agentrelay/task_runner/implementations/task_launcher.py
@dataclass
class TmuxTaskLauncher:
    """Launch an agent in a tmux pane using a framework adapter and sandbox.

    Orchestrates the command-building pipeline: the adapter builds the
    framework-specific CLI command, the sandbox wraps it with isolation,
    and :meth:`TmuxAgent.from_config` sends the final command to a tmux pane.

    Attributes:
        adapter: Framework adapter that builds the raw CLI command.
        sandbox: Sandbox that wraps the command with isolation.
        credential_provider: Credential provider that resolves token tier
            to environment variables.
        repo_path: Path to the main repository.
        graph_name: Name of the task graph being executed.
    """

    adapter: AgentFrameworkAdapter
    sandbox: AgentSandbox
    credential_provider: CredentialProvider
    repo_path: Path
    graph_name: str

    def launch(self, runtime: TaskRuntime) -> Agent:
        """Launch and return the primary agent for this task runtime.

        Builds the agent command via the adapter, wraps it with the sandbox,
        and sends it to a new tmux pane.

        Args:
            runtime: Runtime envelope to launch against.  Must have
                ``state.worktree_path`` and ``state.signal_dir`` set
                (typically by a prior :class:`TaskPreparer` step).

        Returns:
            Agent: Live agent handle bound to this task.

        Raises:
            ValueError: If ``worktree_path`` or ``signal_dir`` is not set.
        """
        if runtime.state.worktree_path is None:
            raise ValueError("runtime.state.worktree_path must be set before launch")
        if runtime.state.signal_dir is None:
            raise ValueError("runtime.state.signal_dir must be set before launch")

        config = runtime.task.primary_agent
        cmd = self.adapter.build_command(config, runtime.state.signal_dir)

        isolation = config.isolation
        tier = isolation.token_tier if isolation is not None else TokenTier.STANDARD
        env_vars = self.credential_provider.resolve(tier)

        context = SandboxContext(
            worktree_path=runtime.state.worktree_path,
            signal_dir=runtime.state.signal_dir,
            repo_path=self.repo_path,
            task_id=runtime.task.id,
            graph_name=self.graph_name,
            attempt_num=runtime.state.attempt_num,
            env_vars=env_vars,
        )
        self.sandbox.setup(context)
        cmd = self.sandbox.wrap_command(cmd, context)

        runtime.artifacts.sandbox = self.sandbox
        runtime.artifacts.sandbox_context = context

        attempt = runtime.state.attempt_num
        return TmuxAgent.from_config(
            config=config,
            task_id=f"{self.graph_name}-{runtime.task.id}-{attempt}",
            worktree_path=runtime.state.worktree_path,
            cmd=cmd,
        )

launch(runtime)

Launch and return the primary agent for this task runtime.

Builds the agent command via the adapter, wraps it with the sandbox, and sends it to a new tmux pane.

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope to launch against. Must have state.worktree_path and state.signal_dir set (typically by a prior TaskPreparer step).

required

Returns:

Name Type Description
Agent Agent

Live agent handle bound to this task.

Raises:

Type Description
ValueError

If state.worktree_path or state.signal_dir is not set.
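
The command-building pipeline described above (adapter builds, sandbox wraps, result goes to a tmux pane under a per-attempt id) can be sketched in isolation. This is a minimal illustration, not the package's implementation: `EchoAdapter`, `PrefixSandbox`, and `build_launch` are hypothetical stand-ins, commands are assumed to be plain strings, and the stand-in `wrap_command` takes the environment variables directly rather than a `SandboxContext`.

```python
from dataclasses import dataclass
from pathlib import Path


@dataclass
class EchoAdapter:
    """Hypothetical adapter: builds the raw, framework-specific CLI command."""

    def build_command(self, config: dict, signal_dir: Path) -> str:
        return f"agent --model {config['model']} --signals {signal_dir}"


@dataclass
class PrefixSandbox:
    """Hypothetical sandbox: wraps a command with env vars and an isolation prefix."""

    prefix: str = "sandbox-exec"

    def wrap_command(self, cmd: str, env_vars: dict) -> str:
        exports = " ".join(f"{k}={v}" for k, v in sorted(env_vars.items()))
        return f"{exports} {self.prefix} -- {cmd}".strip()


def build_launch(graph_name, task_id, attempt, config, signal_dir, env_vars):
    """Mirror the order used by launch(): build, then wrap, then name the agent."""
    cmd = EchoAdapter().build_command(config, signal_dir)
    cmd = PrefixSandbox().wrap_command(cmd, env_vars)
    # Same id format as the f-string passed to TmuxAgent.from_config in the source.
    agent_id = f"{graph_name}-{task_id}-{attempt}"
    return agent_id, cmd
```

Because the attempt number is part of the id, a retried task gets a distinct agent id from its earlier attempt.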

Source code in src/agentrelay/task_runner/implementations/task_launcher.py
def launch(self, runtime: TaskRuntime) -> Agent:
    """Launch and return the primary agent for this task runtime.

    Builds the agent command via the adapter, wraps it with the sandbox,
    and sends it to a new tmux pane.

    Args:
        runtime: Runtime envelope to launch against.  Must have
            ``state.worktree_path`` and ``state.signal_dir`` set
            (typically by a prior :class:`TaskPreparer` step).

    Returns:
        Agent: Live agent handle bound to this task.

    Raises:
        ValueError: If ``worktree_path`` or ``signal_dir`` are not set.
    """
    if runtime.state.worktree_path is None:
        raise ValueError("runtime.state.worktree_path must be set before launch")
    if runtime.state.signal_dir is None:
        raise ValueError("runtime.state.signal_dir must be set before launch")

    config = runtime.task.primary_agent
    cmd = self.adapter.build_command(config, runtime.state.signal_dir)

    isolation = config.isolation
    tier = isolation.token_tier if isolation is not None else TokenTier.STANDARD
    env_vars = self.credential_provider.resolve(tier)

    context = SandboxContext(
        worktree_path=runtime.state.worktree_path,
        signal_dir=runtime.state.signal_dir,
        repo_path=self.repo_path,
        task_id=runtime.task.id,
        graph_name=self.graph_name,
        attempt_num=runtime.state.attempt_num,
        env_vars=env_vars,
    )
    self.sandbox.setup(context)
    cmd = self.sandbox.wrap_command(cmd, context)

    runtime.artifacts.sandbox = self.sandbox
    runtime.artifacts.sandbox_context = context

    attempt = runtime.state.attempt_num
    return TmuxAgent.from_config(
        config=config,
        task_id=f"{self.graph_name}-{runtime.task.id}-{attempt}",
        worktree_path=runtime.state.worktree_path,
        cmd=cmd,
    )

WorktreeTaskLogCapture dataclass

Delegate agent log capture to the agent address.

Calls agent_address.capture_log() to write the agent's execution log (e.g. tmux scrollback) to the signal directory. This step runs unconditionally at task termination, before the teardown decision.
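
The delegation is guarded: nothing happens when no agent address was recorded (for example, if the task failed before launch). A minimal sketch of that guard follows, using a hypothetical `RecordingAddress` stand-in and a hypothetical `capture_if_present` helper; the real agent address type is not shown in this section.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional


@dataclass
class RecordingAddress:
    """Hypothetical agent address that records where it was asked to write."""

    calls: List[Path] = field(default_factory=list)

    def capture_log(self, signal_dir: Path) -> None:
        self.calls.append(signal_dir)


def capture_if_present(address: Optional[RecordingAddress], attempt_dir: Path) -> bool:
    """Delegate log capture when an address exists; report whether it ran."""
    if address is None:
        return False  # No agent was launched, so there is no log to capture.
    address.capture_log(signal_dir=attempt_dir)
    return True
```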

Source code in src/agentrelay/task_runner/implementations/task_log_capture.py
@dataclass
class WorktreeTaskLogCapture:
    """Delegate agent log capture to the agent address.

    Calls ``agent_address.capture_log()`` to write the agent's execution
    log (e.g. tmux scrollback) to the signal directory. This step runs
    unconditionally at task termination, before the teardown decision.
    """

    def capture_log(self, runtime: TaskRuntime) -> None:
        """Capture the agent's execution log to the signal directory.

        Args:
            runtime: Runtime envelope whose agent log should be captured.
        """
        agent_address = runtime.artifacts.agent_address

        if agent_address is not None:
            agent_address.capture_log(signal_dir=runtime.attempt_dir)

capture_log(runtime)

Capture the agent's execution log to the signal directory.

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope whose agent log should be captured.

required
Source code in src/agentrelay/task_runner/implementations/task_log_capture.py
def capture_log(self, runtime: TaskRuntime) -> None:
    """Capture the agent's execution log to the signal directory.

    Args:
        runtime: Runtime envelope whose agent log should be captured.
    """
    agent_address = runtime.artifacts.agent_address

    if agent_address is not None:
        agent_address.capture_log(signal_dir=runtime.attempt_dir)

WorktreeTaskPreparer dataclass

Create a task branch in a shared workstream worktree and write protocol files.

Creates a task-specific branch off the integration branch, checks it out in the workstream worktree, writes manifest.json, policies.json, and instructions.md into the signal directory, and updates the runtime state with computed paths.

The worktree itself is owned by the workstream preparer — this class only creates and checks out branches within it.
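
The branch name and signal directory are derived from the graph name, task id, and run directory. The sketch below restates those two derivations as standalone functions; `task_branch_name`, `task_signal_dir`, and `PROTOCOL_FILES` are hypothetical names introduced here for illustration, while the formats themselves are taken from the source listing.

```python
from pathlib import Path

# Files always written by prepare(); context.md is added only when
# context_content is provided.
PROTOCOL_FILES = ("manifest.json", "policies.json", "instructions.md")


def task_branch_name(graph_name: str, task_id: str) -> str:
    """Branch naming scheme used for task branches."""
    return f"agentrelay/{graph_name}/{task_id}"


def task_signal_dir(run_dir: Path, task_id: str) -> Path:
    """Per-task signal directory under the run directory."""
    return run_dir / "signals" / task_id
```

For a graph named `demo` and a task with id `build`, this gives the branch `agentrelay/demo/build` and a signal directory of `signals/build` under the run directory.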

Source code in src/agentrelay/task_runner/implementations/task_preparer.py
@dataclass
class WorktreeTaskPreparer:
    """Create a task branch in a shared workstream worktree and write protocol files.

    Creates a task-specific branch off the integration branch, checks it out
    in the workstream worktree, writes ``manifest.json``, ``policies.json``,
    and ``instructions.md`` into the signal directory, and updates the runtime
    state with computed paths.

    The worktree itself is owned by the workstream preparer — this class only
    creates and checks out branches within it.
    """

    run_dir: Path
    graph_name: str
    dependency_descriptions: dict[str, Optional[str]] = field(default_factory=dict)
    context_content: Optional[str] = None
    tools: tuple[str, ...] = ()

    def prepare(self, runtime: TaskRuntime) -> None:
        """Prepare runtime execution prerequisites.

        Creates a task branch in the workstream worktree, checks it out,
        and writes protocol files to the signal directory.

        Reads ``integration_branch`` and ``workstream_worktree_path`` from
        ``runtime.state`` (set by the orchestrator before dispatch).

        Args:
            runtime: Runtime envelope to prepare (e.g. branch, signal files).

        Raises:
            ValueError: If ``runtime.state.integration_branch`` or
                ``runtime.state.workstream_worktree_path`` is None.
        """
        integration_branch = runtime.state.integration_branch
        workstream_worktree_path = runtime.state.workstream_worktree_path
        if integration_branch is None:
            raise ValueError(
                "runtime.state.integration_branch must be set before prepare()"
            )
        if workstream_worktree_path is None:
            raise ValueError(
                "runtime.state.workstream_worktree_path must be set before prepare()"
            )

        task = runtime.task
        branch_name = f"agentrelay/{self.graph_name}/{task.id}"
        signal_dir = self.run_dir / "signals" / task.id

        try:
            if git.current_branch(workstream_worktree_path) == branch_name:
                # Retry: branch already checked out from a prior attempt.
                # Keep the agent's prior commits so they can fix their code.
                pass
            else:
                git.branch_create(
                    workstream_worktree_path,
                    branch_name,
                    integration_branch,
                    force=True,
                )
                git.checkout(workstream_worktree_path, branch_name)
        except subprocess.CalledProcessError as exc:
            raise _WorkspaceIntegrationError(
                f"Failed to create branch for task {task.id!r}: {exc}",
            ) from exc

        signals.ensure_signal_dir(signal_dir)

        input_files = _resolve_input_files(task, self.run_dir)

        manifest = build_manifest(
            task=task,
            branch_name=branch_name,
            integration_branch=integration_branch,
            graph_name=self.graph_name,
            attempt_num=runtime.state.attempt_num,
            dependency_descriptions=self.dependency_descriptions,
            tools=self.tools,
            input_files=input_files,
        )
        signals.write_json(signal_dir, "manifest.json", manifest_to_dict(manifest))

        policies = build_policies(task, integration_branch)
        signals.write_json(signal_dir, "policies.json", policies_to_dict(policies))

        isolation = task.primary_agent.isolation
        instructions = resolve_instructions(
            task.role,
            manifest,
            adr_verbosity=task.primary_agent.adr_verbosity,
            sandbox_type=isolation.sandbox_type if isolation is not None else None,
            worktree_path=workstream_worktree_path,
            graph_yaml_path=self.run_dir / "graph.yaml",
            signals_base_path=self.run_dir / "signals",
        )
        signals.write_text(signal_dir, "instructions.md", instructions)

        if self.context_content is not None:
            signals.write_text(signal_dir, "context.md", self.context_content)

        runtime.state.worktree_path = workstream_worktree_path
        runtime.state.branch_name = branch_name
        runtime.state.signal_dir = signal_dir

prepare(runtime)

Prepare runtime execution prerequisites.

Creates a task branch in the workstream worktree, checks it out, and writes protocol files to the signal directory.

Reads integration_branch and workstream_worktree_path from runtime.state (set by the orchestrator before dispatch).

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope to prepare (e.g. branch, signal files).

required

Raises:

Type Description
ValueError

If runtime.state.integration_branch or runtime.state.workstream_worktree_path is None.
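
Two decisions in `prepare` are easy to miss when reading the listing: the precondition checks that raise `ValueError`, and the retry case in which the task branch is already checked out and is reused so that the agent's earlier commits survive. The following is a simplified sketch of both; `require_prepared_state` and `plan_branch_action` are hypothetical helpers, and the real method calls into the package's `git` module rather than returning a label.

```python
from typing import Optional


def require_prepared_state(
    integration_branch: Optional[str], workstream_worktree_path: Optional[str]
) -> None:
    """Raise ValueError if the orchestrator has not populated the runtime state."""
    if integration_branch is None:
        raise ValueError("integration_branch must be set before prepare()")
    if workstream_worktree_path is None:
        raise ValueError("workstream_worktree_path must be set before prepare()")


def plan_branch_action(current_branch: Optional[str], branch_name: str) -> str:
    """Decide what to do with the task branch in the shared worktree."""
    if current_branch == branch_name:
        # Retry: keep the branch and the commits from the prior attempt.
        return "reuse"
    # First attempt: (re)create the branch off the integration branch, then check it out.
    return "create-and-checkout"
```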

Source code in src/agentrelay/task_runner/implementations/task_preparer.py
def prepare(self, runtime: TaskRuntime) -> None:
    """Prepare runtime execution prerequisites.

    Creates a task branch in the workstream worktree, checks it out,
    and writes protocol files to the signal directory.

    Reads ``integration_branch`` and ``workstream_worktree_path`` from
    ``runtime.state`` (set by the orchestrator before dispatch).

    Args:
        runtime: Runtime envelope to prepare (e.g. branch, signal files).

    Raises:
        ValueError: If ``runtime.state.integration_branch`` or
            ``runtime.state.workstream_worktree_path`` is None.
    """
    integration_branch = runtime.state.integration_branch
    workstream_worktree_path = runtime.state.workstream_worktree_path
    if integration_branch is None:
        raise ValueError(
            "runtime.state.integration_branch must be set before prepare()"
        )
    if workstream_worktree_path is None:
        raise ValueError(
            "runtime.state.workstream_worktree_path must be set before prepare()"
        )

    task = runtime.task
    branch_name = f"agentrelay/{self.graph_name}/{task.id}"
    signal_dir = self.run_dir / "signals" / task.id

    try:
        if git.current_branch(workstream_worktree_path) == branch_name:
            # Retry: branch already checked out from a prior attempt.
            # Keep the agent's prior commits so they can fix their code.
            pass
        else:
            git.branch_create(
                workstream_worktree_path,
                branch_name,
                integration_branch,
                force=True,
            )
            git.checkout(workstream_worktree_path, branch_name)
    except subprocess.CalledProcessError as exc:
        raise _WorkspaceIntegrationError(
            f"Failed to create branch for task {task.id!r}: {exc}",
        ) from exc

    signals.ensure_signal_dir(signal_dir)

    input_files = _resolve_input_files(task, self.run_dir)

    manifest = build_manifest(
        task=task,
        branch_name=branch_name,
        integration_branch=integration_branch,
        graph_name=self.graph_name,
        attempt_num=runtime.state.attempt_num,
        dependency_descriptions=self.dependency_descriptions,
        tools=self.tools,
        input_files=input_files,
    )
    signals.write_json(signal_dir, "manifest.json", manifest_to_dict(manifest))

    policies = build_policies(task, integration_branch)
    signals.write_json(signal_dir, "policies.json", policies_to_dict(policies))

    isolation = task.primary_agent.isolation
    instructions = resolve_instructions(
        task.role,
        manifest,
        adr_verbosity=task.primary_agent.adr_verbosity,
        sandbox_type=isolation.sandbox_type if isolation is not None else None,
        worktree_path=workstream_worktree_path,
        graph_yaml_path=self.run_dir / "graph.yaml",
        signals_base_path=self.run_dir / "signals",
    )
    signals.write_text(signal_dir, "instructions.md", instructions)

    if self.context_content is not None:
        signals.write_text(signal_dir, "context.md", self.context_content)

    runtime.state.worktree_path = workstream_worktree_path
    runtime.state.branch_name = branch_name
    runtime.state.signal_dir = signal_dir

WorktreeTaskTeardown dataclass

Delegate agent-address cleanup and delete the task branch.

Performs best-effort cleanup: delegates log capture and environment teardown to the agent address, tears down the sandbox recorded at launch (if any), then deletes the task branch. The workstream worktree is owned by the workstream teardown handler and is not touched here. Errors from sandbox teardown and branch deletion are caught and not propagated.
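
The best-effort pattern means one failed cleanup step does not prevent the later ones from running. A minimal sketch of that idea follows, assuming a hypothetical `run_best_effort` helper. Unlike the source, which discards the exceptions silently, this sketch returns the names of the failed steps so the behaviour is observable.

```python
import subprocess
from typing import Callable, Iterable, List, Tuple


def run_best_effort(steps: Iterable[Tuple[str, Callable[[], None]]]) -> List[str]:
    """Run every cleanup step in order; collect the names of those that failed."""
    failed: List[str] = []
    for name, step in steps:
        try:
            step()
        except Exception:
            # Swallow the error so later steps still run.
            failed.append(name)
    return failed


def delete_missing_branch() -> None:
    """Stand-in for a branch deletion that fails because the branch is already gone."""
    raise subprocess.CalledProcessError(1, ["git", "branch", "-D", "task-branch"])
```

Calling `run_best_effort([("branch", delete_missing_branch), ("sandbox", other_step)])` still runs `other_step` even though the branch deletion raises.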

Source code in src/agentrelay/task_runner/implementations/task_teardown.py
@dataclass
class WorktreeTaskTeardown:
    """Delegate agent-address cleanup and delete the task branch.

    Performs best-effort cleanup: delegates log capture and environment
    teardown to the agent address, tears down the sandbox recorded at
    launch (if any), then deletes the task branch. The workstream worktree
    is owned by the workstream teardown handler and is not touched here.
    Errors from sandbox teardown and branch deletion are caught and not
    propagated.
    """

    repo_path: Path
    keep_panes: bool = False

    def teardown(self, runtime: TaskRuntime) -> None:
        """Release runtime resources after terminal completion.

        Args:
            runtime: Runtime envelope whose resources should be cleaned up.
        """
        agent_address = runtime.artifacts.agent_address

        if agent_address is not None:
            agent_address.teardown(
                signal_dir=runtime.attempt_dir,
                keep_panes=self.keep_panes,
            )

        sandbox = runtime.artifacts.sandbox
        if sandbox is not None and runtime.artifacts.sandbox_context is not None:
            try:
                sandbox.teardown(runtime.artifacts.sandbox_context)
            except Exception:
                pass  # Best-effort: container may already be gone

        if runtime.state.branch_name is not None:
            try:
                git.branch_delete(self.repo_path, runtime.state.branch_name)
            except subprocess.CalledProcessError:
                pass  # Best-effort: branch may have been deleted by GitHub

teardown(runtime)

Release runtime resources after terminal completion.

Parameters:

Name Type Description Default
runtime TaskRuntime

Runtime envelope whose resources should be cleaned up.

required
Source code in src/agentrelay/task_runner/implementations/task_teardown.py
def teardown(self, runtime: TaskRuntime) -> None:
    """Release runtime resources after terminal completion.

    Args:
        runtime: Runtime envelope whose resources should be cleaned up.
    """
    agent_address = runtime.artifacts.agent_address

    if agent_address is not None:
        agent_address.teardown(
            signal_dir=runtime.attempt_dir,
            keep_panes=self.keep_panes,
        )

    sandbox = runtime.artifacts.sandbox
    if sandbox is not None and runtime.artifacts.sandbox_context is not None:
        try:
            sandbox.teardown(runtime.artifacts.sandbox_context)
        except Exception:
            pass  # Best-effort: container may already be gone

    if runtime.state.branch_name is not None:
        try:
            git.branch_delete(self.repo_path, runtime.state.branch_name)
        except subprocess.CalledProcessError:
            pass  # Best-effort: branch may have been deleted by GitHub