Skip to content

output

View module diagram

Output package — presentation layer for orchestrator events and results.

ConsoleListener dataclass

Real-time console output for orchestrator events.

Satisfies the :class:~agentrelay.orchestrator.OrchestratorListener protocol. Prints timestamped event lines to stream as events arrive.

Source code in src/agentrelay/output/console.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
@dataclass
class ConsoleListener:
    """Real-time console output for orchestrator events.

    Satisfies the :class:`~agentrelay.orchestrator.OrchestratorListener`
    protocol. Prints timestamped event lines to *stream* as events arrive.
    """

    stream: TextIO = field(default_factory=lambda: sys.stderr)
    verbose: bool = False
    _start_times: dict[str, float] = field(default_factory=dict, repr=False)

    def on_event(self, event: OrchestratorEvent) -> None:
        """Print a formatted line for each orchestrator event."""
        handler = getattr(self, f"_on_{event.kind}", None)
        if handler is not None:
            handler(event)

    def _print(self, timestamp: float, label: str, detail: str) -> None:
        ts = _format_time(timestamp)
        self.stream.write(f"[{ts}] {label:<14} {detail}\n")
        self.stream.flush()

    def _on_workstream_prepared(self, event: OrchestratorEvent) -> None:
        self._print(event.timestamp, event.workstream_id or "?", "prepared")

    def _on_workstream_prepare_failed(self, event: OrchestratorEvent) -> None:
        msg = event.message or "unknown error"
        self._print(
            event.timestamp,
            event.workstream_id or "?",
            f"prepare FAILED: {msg}",
        )

    def _on_task_started(self, event: OrchestratorEvent) -> None:
        task_id = event.task_id or "?"
        self._start_times[task_id] = event.timestamp
        attempt = event.attempt_num or 0
        parts = [f"started ({event.workstream_id}"]
        if attempt > 0:
            parts.append(f", retry {attempt}")
        parts.append(")")
        self._print(event.timestamp, task_id, "".join(parts))

    def _on_task_finished(self, event: OrchestratorEvent) -> None:
        task_id = event.task_id or "?"
        start = self._start_times.pop(task_id, None)
        duration = _format_duration(event.timestamp - start) if start else ""

        if event.outcome_class == TaskOutcomeClass.SUCCESS:
            detail = f"succeeded ({duration})"
            if event.message:
                detail += f" \u2192 {event.message}"
        elif event.outcome_class == TaskOutcomeClass.INTERNAL_ERROR:
            detail = f"INTERNAL ERROR: {event.message or ''}"
        else:
            msg = event.message or ""
            if msg == "retry_scheduled":
                detail = f"failed, retrying ({duration})"
            elif msg == "max_attempts_reached":
                detail = f"failed, no retries left ({duration})"
            else:
                detail = f"failed: {msg}"

        self._print(event.timestamp, task_id, detail)

    def _on_task_blocked(self, event: OrchestratorEvent) -> None:
        self._print(event.timestamp, event.task_id or "?", "blocked")

    def _on_workstream_pr_created(self, event: OrchestratorEvent) -> None:
        detail = "integration PR created"
        if event.message:
            detail += f" \u2192 {event.message}"
        self._print(event.timestamp, event.workstream_id or "?", detail)

    def _on_workstream_integration_failed(self, event: OrchestratorEvent) -> None:
        msg = event.message or "unknown error"
        self._print(
            event.timestamp,
            event.workstream_id or "?",
            f"integration FAILED: {msg}",
        )

    def _on_workstream_merged(self, event: OrchestratorEvent) -> None:
        detail = "integration PR merged"
        if event.message:
            detail += f" \u2192 {event.message}"
        self._print(event.timestamp, event.workstream_id or "?", detail)

    def _on_workstream_auto_merged(self, event: OrchestratorEvent) -> None:
        detail = "integration PR auto-merged"
        if event.message:
            detail += f" \u2192 {event.message}"
        self._print(event.timestamp, event.workstream_id or "?", detail)

    def _on_workstream_integration_skipped(self, event: OrchestratorEvent) -> None:
        detail = "integration skipped (no changes)"
        if event.message:
            detail += f": {event.message}"
        self._print(event.timestamp, event.workstream_id or "?", detail)

    def _on_workstream_auto_merge_skipped(self, event: OrchestratorEvent) -> None:
        detail = "auto-merge skipped"
        if event.message:
            detail += f": {event.message}"
        self._print(event.timestamp, event.workstream_id or "?", detail)

    def _on_workstream_auto_merge_failed(self, event: OrchestratorEvent) -> None:
        msg = event.message or "unknown error"
        self._print(
            event.timestamp,
            event.workstream_id or "?",
            f"auto-merge FAILED: {msg}",
        )

    def _on_waiting_for_integration_merge(self, event: OrchestratorEvent) -> None:
        self._print(
            event.timestamp,
            "orchestrator",
            "waiting for integration PR merge...",
        )

    # -- Verbose-only step events (from StandardTaskRunner) --

    def _on_task_prepared(self, event: OrchestratorEvent) -> None:
        if self.verbose:
            self._print(
                event.timestamp,
                event.task_id or "?",
                f"prepared ({event.message or ''})",
            )

    def _on_task_launched(self, event: OrchestratorEvent) -> None:
        if self.verbose:
            self._print(
                event.timestamp,
                event.task_id or "?",
                f"agent launched ({event.message or ''})",
            )

    def _on_task_waiting(self, event: OrchestratorEvent) -> None:
        if self.verbose:
            self._print(
                event.timestamp,
                event.task_id or "?",
                "waiting for completion signal",
            )

    def _on_task_gate_running(self, event: OrchestratorEvent) -> None:
        if self.verbose:
            self._print(
                event.timestamp,
                event.task_id or "?",
                f"gate running ({event.message or ''})",
            )

    def _on_task_gate_passed(self, event: OrchestratorEvent) -> None:
        if self.verbose:
            self._print(
                event.timestamp,
                event.task_id or "?",
                "gate passed",
            )

    def _on_task_gate_failed(self, event: OrchestratorEvent) -> None:
        msg = event.message or "unknown command"
        self._print(
            event.timestamp,
            event.task_id or "?",
            f"gate FAILED: {msg}",
        )

    def _on_task_pr_merging(self, event: OrchestratorEvent) -> None:
        if self.verbose:
            self._print(
                event.timestamp,
                event.task_id or "?",
                f"merging PR to integration branch",
            )

on_event(event)

Print a formatted line for each orchestrator event.

Source code in src/agentrelay/output/console.py
52
53
54
55
56
def on_event(self, event: OrchestratorEvent) -> None:
    """Print a formatted line for each orchestrator event."""
    handler = getattr(self, f"_on_{event.kind}", None)
    if handler is not None:
        handler(event)

ResumeTaskInfo dataclass

Per-task information for the resume summary table.

Attributes:

Name Type Description
task_id str

Task identifier.

status TaskStatus

Task status from the prior run probe.

frozen bool

Whether the task has a frozen resolved.json record.

Source code in src/agentrelay/output/console.py
357
358
359
360
361
362
363
364
365
366
367
368
369
@dataclass(frozen=True)
class ResumeTaskInfo:
    """Per-task information for the resume summary table.

    Attributes:
        task_id: Task identifier.
        status: Task status from the prior run probe.
        frozen: Whether the task has a frozen ``resolved.json`` record.
    """

    task_id: str
    status: TaskStatus
    frozen: bool

print_config_warnings(warnings, *, stream=sys.stderr)

Print config mismatch warnings from a prior run.

Parameters:

Name Type Description Default
warnings Sequence[str]

Warning strings from config comparison.

required
stream TextIO

Output stream (default stderr).

stderr
Source code in src/agentrelay/output/console.py
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
def print_config_warnings(
    warnings: Sequence[str],
    *,
    stream: TextIO = sys.stderr,
) -> None:
    """Print config mismatch warnings from a prior run.

    Args:
        warnings: Warning strings from config comparison.
        stream: Output stream (default stderr).
    """
    stream.write("\nConfig changed from prior run:\n")
    for warning in warnings:
        stream.write(f"  {warning}\n")
    stream.flush()

print_override_report(validation, *, stream=sys.stderr)

Print informational override report for frozen tasks.

Called only when validation.has_overrides is True.

Parameters:

Name Type Description Default
validation FrozenValidationResult

Frozen validation result from :func:~agentrelay.resolved_validation.validate_frozen_tasks.

required
stream TextIO

Output stream (default stderr).

stderr
Source code in src/agentrelay/output/console.py
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
def print_override_report(
    validation: FrozenValidationResult,
    *,
    stream: TextIO = sys.stderr,
) -> None:
    """Print informational override report for frozen tasks.

    Called only when ``validation.has_overrides`` is ``True``.

    Args:
        validation: Frozen validation result from
            :func:`~agentrelay.resolved_validation.validate_frozen_tasks`.
        stream: Output stream (default stderr).
    """
    stream.write(
        "\nFrozen task overrides (current YAML differs from executed values):\n"
    )
    for task_result in validation.task_results:
        if not task_result.has_overrides:
            continue
        stream.write(f"\n  {task_result.task_id}:\n")
        for mismatch in task_result.mismatches:
            stream.write(
                f"    {mismatch.field}: {mismatch.resolved_value}"
                f" (current: {mismatch.current_value})\n"
            )
    stream.flush()

print_resume_summary(graph_name, run_number, prior_run_number, task_infos, *, stream=sys.stderr)

Print a pre-orchestrator resume summary table.

Shows the status and planned action for each task in graph topological order.

Parameters:

Name Type Description Default
graph_name str

Name of the graph being resumed.

required
run_number int

New run number being created.

required
prior_run_number int

Run number of the prior run being resumed from.

required
task_infos Sequence[ResumeTaskInfo]

Per-task resume info in graph topological order.

required
stream TextIO

Output stream (default stderr).

stderr
Source code in src/agentrelay/output/console.py
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
def print_resume_summary(
    graph_name: str,
    run_number: int,
    prior_run_number: int,
    task_infos: Sequence[ResumeTaskInfo],
    *,
    stream: TextIO = sys.stderr,
) -> None:
    """Print a pre-orchestrator resume summary table.

    Shows the status and planned action for each task in graph topological
    order.

    Args:
        graph_name: Name of the graph being resumed.
        run_number: New run number being created.
        prior_run_number: Run number of the prior run being resumed from.
        task_infos: Per-task resume info in graph topological order.
        stream: Output stream (default stderr).
    """
    stream.write(
        f"\nResuming graph '{graph_name}' (run {run_number}, prior: run {prior_run_number})\n"
    )

    if not task_infos:
        stream.write("\n  (no tasks)\n")
        stream.flush()
        return

    headers = ("Task", "Status", "Action")
    rows: list[tuple[str, str, str]] = []
    status_labels = {
        TaskStatus.PR_MERGED: "completed",
        TaskStatus.COMPLETED: "completed",
        TaskStatus.FAILED: "failed",
        TaskStatus.RUNNING: "running",
        TaskStatus.PR_CREATED: "pr_created",
        TaskStatus.PENDING: "pending",
    }
    for info in task_infos:
        label = status_labels.get(info.status, info.status.value)
        action = _resume_action(info)
        rows.append((info.task_id, label, action))

    widths = [len(h) for h in headers]
    for row in rows:
        for i, cell in enumerate(row):
            widths[i] = max(widths[i], len(cell))

    def _fmt(cells: tuple[str, ...]) -> str:
        return "  ".join(cell.ljust(widths[i]) for i, cell in enumerate(cells))

    stream.write("\n")
    stream.write("  " + _fmt(headers) + "\n")
    stream.write("  " + "  ".join("\u2500" * w for w in widths) + "\n")
    for row in rows:
        stream.write("  " + _fmt(row) + "\n")

    stream.flush()

print_summary(result, *, stream=sys.stderr)

Print a post-run summary table.

Parameters:

Name Type Description Default
result OrchestratorResult

Terminal orchestration result.

required
stream TextIO

Output stream (default stderr).

stderr
Source code in src/agentrelay/output/console.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def print_summary(
    result: OrchestratorResult,
    *,
    stream: TextIO = sys.stderr,
) -> None:
    """Print a post-run summary table.

    Args:
        result: Terminal orchestration result.
        stream: Output stream (default stderr).
    """
    durations = _task_durations(result.events)

    stream.write(f"\nOutcome: {result.outcome.value}\n")
    if result.fatal_error:
        stream.write(f"Fatal error:\n{result.fatal_error}\n")

    if not result.task_runtimes:
        return

    # Column headers and rows.
    headers = ("Task", "Status", "Workstream", "Attempts", "Duration", "PR")
    status_labels = {
        TaskStatus.PR_MERGED: "succeeded",
        TaskStatus.COMPLETED: "succeeded",
        TaskStatus.FAILED: "failed",
        TaskStatus.RUNNING: "running",
        TaskStatus.PR_CREATED: "pr_created",
        TaskStatus.PENDING: "pending",
    }
    rows: list[tuple[str, ...]] = []
    for task_id, runtime in result.task_runtimes.items():
        status = status_labels.get(runtime.status, runtime.status.value)
        workstream = runtime.task.workstream_id
        attempts = str(runtime.state.attempt_num + 1)
        dur = durations.get(task_id)
        duration = _format_duration(dur) if dur is not None else "-"
        pr_url = runtime.artifacts.pr_url or ""
        rows.append((task_id, status, workstream, attempts, duration, pr_url))

    # Compute column widths.
    widths = [len(h) for h in headers]
    for row in rows:
        for i, cell in enumerate(row):
            widths[i] = max(widths[i], len(cell))

    def _fmt_row(cells: tuple[str, ...]) -> str:
        return "  ".join(cell.ljust(widths[i]) for i, cell in enumerate(cells))

    stream.write("\n")
    stream.write(_fmt_row(headers) + "\n")
    stream.write("  ".join("\u2500" * w for w in widths) + "\n")
    for row in rows:
        stream.write(_fmt_row(row) + "\n")

    # Total duration from first event to last event.
    if result.events:
        total = result.events[-1].timestamp - result.events[0].timestamp
        stream.write(f"\nTotal: {_format_duration(total)}\n")

    # Per-task errors for failed tasks.
    failed_errors = [
        (tid, rt.state.error)
        for tid, rt in result.task_runtimes.items()
        if rt.status is TaskStatus.FAILED and rt.state.error
    ]
    if failed_errors:
        stream.write("\nErrors:\n")
        for task_id, error in failed_errors:
            stream.write(f"  {task_id}: {error}\n")

    # Per-task concerns.
    task_concerns = [
        (tid, rt.artifacts.concerns)
        for tid, rt in result.task_runtimes.items()
        if rt.artifacts.concerns
    ]
    if task_concerns:
        stream.write("\nConcerns:\n")
        _write_concerns(stream, task_concerns)

    # Per-task ops concerns.
    task_ops_concerns = [
        (tid, rt.artifacts.ops_concerns)
        for tid, rt in result.task_runtimes.items()
        if rt.artifacts.ops_concerns
    ]
    if task_ops_concerns:
        stream.write("\nOps Concerns:\n")
        _write_concerns(stream, task_ops_concerns)

    stream.flush()