workstream

Workstream model for lane-level execution and integration state.

This package defines immutable workstream specifications, mutable runtime state for each workstream lane, and the workstream-level lifecycle runner with its I/O protocols.

Subpackages

core: Specs, runtime state, protocols, and runner.
implementations: Concrete protocol implementations.

IntegrationAutoMerger

Bases: Protocol

Merge a workstream's integration PR on the hosting platform.

Source code in src/agentrelay/workstream/core/io.py
@runtime_checkable
class IntegrationAutoMerger(Protocol):
    """Merge a workstream's integration PR on the hosting platform."""

    def merge(self, workstream_runtime: WorkstreamRuntime) -> IntegrationMergeResult:
        """Merge the integration PR for this workstream.

        Args:
            workstream_runtime: Workstream runtime whose integration PR
                should be merged.

        Returns:
            IntegrationMergeResult: Pre-merge SHA for rollback support.

        Raises:
            RuntimeError: If the merge fails.
        """
        ...

merge(workstream_runtime)

Merge the integration PR for this workstream.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime whose integration PR should be merged.

required

Returns:

Name Type Description
IntegrationMergeResult IntegrationMergeResult

Pre-merge SHA for rollback support.

Raises:

Type Description
RuntimeError

If the merge fails.

Source code in src/agentrelay/workstream/core/io.py
def merge(self, workstream_runtime: WorkstreamRuntime) -> IntegrationMergeResult:
    """Merge the integration PR for this workstream.

    Args:
        workstream_runtime: Workstream runtime whose integration PR
            should be merged.

    Returns:
        IntegrationMergeResult: Pre-merge SHA for rollback support.

    Raises:
        RuntimeError: If the merge fails.
    """
    ...
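
A minimal sketch of a test double that satisfies this protocol, assuming simplified stand-ins for the agentrelay types so the example runs on its own. `FakeAutoMerger`, its `head_sha` and `should_fail` fields, and the `workstream_id` field on the stand-in runtime are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


# Simplified stand-ins for the agentrelay types (assumptions, not the real classes).
@dataclass
class WorkstreamRuntime:
    workstream_id: str


@dataclass(frozen=True)
class IntegrationMergeResult:
    target_branch_before_merge: str


@runtime_checkable
class IntegrationAutoMerger(Protocol):
    def merge(self, workstream_runtime: WorkstreamRuntime) -> IntegrationMergeResult: ...


@dataclass
class FakeAutoMerger:
    """Hypothetical in-memory merger: no hosting platform is contacted."""

    head_sha: str  # pretend tip of the target branch before merging
    should_fail: bool = False

    def merge(self, workstream_runtime: WorkstreamRuntime) -> IntegrationMergeResult:
        if self.should_fail:
            # The protocol documents RuntimeError as the failure signal.
            raise RuntimeError(f"merge failed for {workstream_runtime.workstream_id}")
        return IntegrationMergeResult(target_branch_before_merge=self.head_sha)


merger = FakeAutoMerger(head_sha="abc123")
# A runtime_checkable protocol only checks that the method exists, not its signature.
is_merger = isinstance(merger, IntegrationAutoMerger)
result = merger.merge(WorkstreamRuntime("ws-1"))
```

Because the result carries the pre-merge SHA, a caller can keep it as a rollback point without querying the platform again.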

IntegrationMergeChecker

Bases: Protocol

Check whether a workstream's integration PR has been merged.

Source code in src/agentrelay/workstream/core/io.py
@runtime_checkable
class IntegrationMergeChecker(Protocol):
    """Check whether a workstream's integration PR has been merged."""

    def is_merged(
        self, workstream_runtime: WorkstreamRuntime
    ) -> IntegrationMergeCheckResult:
        """Check whether the integration PR for this workstream is merged.

        Args:
            workstream_runtime: Workstream runtime to check.

        Returns:
            IntegrationMergeCheckResult: Merge status and pre-merge SHA
            (when merged, derived from the merge commit's first parent).
        """
        ...

is_merged(workstream_runtime)

Check whether the integration PR for this workstream is merged.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime to check.

required

Returns:

Name Type Description
IntegrationMergeCheckResult IntegrationMergeCheckResult

Merge status and pre-merge SHA (when merged, derived from the merge commit's first parent).

Source code in src/agentrelay/workstream/core/io.py
def is_merged(
    self, workstream_runtime: WorkstreamRuntime
) -> IntegrationMergeCheckResult:
    """Check whether the integration PR for this workstream is merged.

    Args:
        workstream_runtime: Workstream runtime to check.

    Returns:
        IntegrationMergeCheckResult: Merge status and pre-merge SHA
        (when merged, derived from the merge commit's first parent).
    """
    ...
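
One way a caller might poll a checker is sketched below. This is an illustration under stated assumptions, not the orchestrator's actual loop: `ScriptedChecker` and `wait_for_merge` are hypothetical names, the result class is a local copy of the documented fields, and a real loop would normally sleep between polls.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class IntegrationMergeCheckResult:  # local copy of the documented fields
    merged: bool
    target_branch_before_merge: Optional[str] = None


@dataclass
class ScriptedChecker:
    """Hypothetical checker that replays canned results, one per call."""

    script: list = field(default_factory=list)

    def is_merged(self, workstream_runtime: object) -> IntegrationMergeCheckResult:
        return self.script.pop(0)


def wait_for_merge(
    checker: ScriptedChecker, workstream_runtime: object, max_polls: int
) -> Optional[IntegrationMergeCheckResult]:
    """Poll up to max_polls times; return the merged result, or None if never merged."""
    for _ in range(max_polls):
        result = checker.is_merged(workstream_runtime)
        if result.merged:
            return result
    return None


checker = ScriptedChecker(
    script=[
        IntegrationMergeCheckResult(merged=False),
        IntegrationMergeCheckResult(merged=True, target_branch_before_merge="abc123"),
    ]
)
outcome = wait_for_merge(checker, workstream_runtime=None, max_polls=5)
```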

IntegrationMergeCheckResult dataclass

Result from polling whether a workstream integration PR is merged.

Attributes:

Name Type Description
merged bool

Whether the integration PR has been merged.

target_branch_before_merge Optional[str]

SHA of the target branch before the merge commit. Populated when merged=True, derived from the merge commit's first parent. None when merged=False or when the merge commit SHA could not be determined.

Source code in src/agentrelay/workstream/core/io.py
@dataclass(frozen=True)
class IntegrationMergeCheckResult:
    """Result from polling whether a workstream integration PR is merged.

    Attributes:
        merged: Whether the integration PR has been merged.
        target_branch_before_merge: SHA of the target branch before the
            merge commit.  Populated when ``merged=True``, derived from
            the merge commit's first parent.  ``None`` when ``merged=False``
            or when the merge commit SHA could not be determined.
    """

    merged: bool
    target_branch_before_merge: Optional[str] = None
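
The attribute description implies three distinguishable states, which a consumer could branch on as sketched here. The `describe` helper and its message strings are hypothetical; the dataclass is a local copy so the example is self-contained.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class IntegrationMergeCheckResult:  # local copy of the documented dataclass
    merged: bool
    target_branch_before_merge: Optional[str] = None


def describe(result: IntegrationMergeCheckResult) -> str:
    """Hypothetical helper that names each of the three documented states."""
    if not result.merged:
        return "not merged"
    if result.target_branch_before_merge is None:
        # Merged, but the merge commit SHA could not be determined.
        return "merged, pre-merge SHA unknown"
    return f"merged, rollback point {result.target_branch_before_merge}"


states = [
    describe(IntegrationMergeCheckResult(merged=False)),
    describe(IntegrationMergeCheckResult(merged=True)),
    describe(IntegrationMergeCheckResult(merged=True, target_branch_before_merge="abc123")),
]
```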

IntegrationMergeResult dataclass

Result from auto-merging a workstream integration PR.

Attributes:

Name Type Description
target_branch_before_merge str

SHA of the target branch immediately before the integration PR was merged.

Source code in src/agentrelay/workstream/core/io.py
@dataclass(frozen=True)
class IntegrationMergeResult:
    """Result from auto-merging a workstream integration PR.

    Attributes:
        target_branch_before_merge: SHA of the target branch immediately
            before the integration PR was merged.
    """

    target_branch_before_merge: str

IntegrationResult dataclass

Result from workstream integration PR creation.

Attributes:

Name Type Description
skipped bool

True when the integration branch had no commits ahead of the target and the PR was skipped.

target_branch_authoritative_sha Optional[str]

SHA of the target branch when skipped=True (the integrator is the authoritative source). None when skipped=False; in that case the merger or checker that performs the actual merge is the authoritative source instead.

Source code in src/agentrelay/workstream/core/io.py
@dataclass(frozen=True)
class IntegrationResult:
    """Result from workstream integration PR creation.

    Attributes:
        skipped: True when the integration branch had no commits ahead
            of the target and the PR was skipped.
        target_branch_authoritative_sha: SHA of the target branch when
            ``skipped=True`` (the integrator is the authoritative source).
            ``None`` when ``skipped=False`` — the merger or checker that
            performs the actual merge is the authoritative source instead.
    """

    skipped: bool
    target_branch_authoritative_sha: Optional[str] = None
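
The "authoritative source" rule in the docstring can be illustrated with a small helper. This is a sketch under assumptions: `resolve_pre_merge_sha` is a hypothetical function, and both dataclasses are local copies of the documented ones.

```python
from dataclasses import dataclass
from typing import Optional


# Local copies of the two documented result types so the sketch runs standalone.
@dataclass(frozen=True)
class IntegrationResult:
    skipped: bool
    target_branch_authoritative_sha: Optional[str] = None


@dataclass(frozen=True)
class IntegrationMergeResult:
    target_branch_before_merge: str


def resolve_pre_merge_sha(
    integration: IntegrationResult,
    merge: Optional[IntegrationMergeResult],
) -> Optional[str]:
    """Hypothetical helper: pick the SHA from whichever source is authoritative."""
    if integration.skipped:
        # No PR was opened, so the integrator's observation is the only source.
        return integration.target_branch_authoritative_sha
    # A PR exists; the SHA is only known once a merge result is available.
    return merge.target_branch_before_merge if merge is not None else None


skipped_sha = resolve_pre_merge_sha(IntegrationResult(True, "aaa111"), None)
pending_sha = resolve_pre_merge_sha(IntegrationResult(False), None)
merged_sha = resolve_pre_merge_sha(
    IntegrationResult(False), IntegrationMergeResult("bbb222")
)
```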

TaskPrProber

Bases: Protocol

Check and optionally merge an individual task PR.

Used by the resumption probe to normalize stale PR_CREATED tasks left behind when the orchestrator was interrupted between PR creation and merge. Kept behind a protocol so the probe module does not depend directly on ops/gh, matching the pattern used by TaskMerger, IntegrationMergeChecker, and IntegrationAutoMerger.

Source code in src/agentrelay/workstream/core/io.py
@runtime_checkable
class TaskPrProber(Protocol):
    """Check and optionally merge an individual task PR.

    Used by the resumption probe to normalize stale ``PR_CREATED`` tasks
    left behind when the orchestrator was interrupted between PR creation
    and merge.  Kept behind a protocol so the probe module does not depend
    directly on ``ops/gh``, matching the pattern used by
    :class:`TaskMerger`, :class:`IntegrationMergeChecker`, and
    :class:`IntegrationAutoMerger`.
    """

    def is_merged(self, pr_url: str) -> bool:
        """Return whether the given task PR is already merged.

        Args:
            pr_url: URL of the task PR to check.

        Returns:
            ``True`` if the PR is merged, ``False`` otherwise (including
            transient failures that should not crash the probe).
        """
        ...

    def try_merge(self, pr_url: str) -> bool:
        """Best-effort merge of the given task PR.

        Unlike :meth:`TaskMerger.merge_pr`, this does not raise on failure.
        A ``False`` return means "leave for manual review" — the caller
        treats this as retry-eligible, not as an internal error.

        Args:
            pr_url: URL of the task PR to merge.

        Returns:
            ``True`` on merge success, ``False`` on any merge failure.
        """
        ...

is_merged(pr_url)

Return whether the given task PR is already merged.

Parameters:

Name Type Description Default
pr_url str

URL of the task PR to check.

required

Returns:

Type Description
bool

True if the PR is merged, False otherwise (including transient failures that should not crash the probe).

Source code in src/agentrelay/workstream/core/io.py
def is_merged(self, pr_url: str) -> bool:
    """Return whether the given task PR is already merged.

    Args:
        pr_url: URL of the task PR to check.

    Returns:
        ``True`` if the PR is merged, ``False`` otherwise (including
        transient failures that should not crash the probe).
    """
    ...

try_merge(pr_url)

Best-effort merge of the given task PR.

Unlike TaskMerger.merge_pr, this does not raise on failure. A False return means "leave for manual review": the caller treats this as retry-eligible, not as an internal error.

Parameters:

Name Type Description Default
pr_url str

URL of the task PR to merge.

required

Returns:

Type Description
bool

True on merge success, False on any merge failure.

Source code in src/agentrelay/workstream/core/io.py
def try_merge(self, pr_url: str) -> bool:
    """Best-effort merge of the given task PR.

    Unlike :meth:`TaskMerger.merge_pr`, this does not raise on failure.
    A ``False`` return means "leave for manual review" — the caller
    treats this as retry-eligible, not as an internal error.

    Args:
        pr_url: URL of the task PR to merge.

    Returns:
        ``True`` on merge success, ``False`` on any merge failure.
    """
    ...
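
The two methods suggest a check-then-merge flow for a stale task PR. The sketch below shows one plausible shape, assuming a hypothetical `normalize_stale_pr` function, a hypothetical `FakeProber`, and made-up outcome strings; it is not the actual resumption probe.

```python
from dataclasses import dataclass, field
from typing import Protocol, runtime_checkable


@runtime_checkable
class TaskPrProber(Protocol):  # local copy of the documented protocol shape
    def is_merged(self, pr_url: str) -> bool: ...
    def try_merge(self, pr_url: str) -> bool: ...


@dataclass
class FakeProber:
    """Hypothetical in-memory prober keyed by PR URL."""

    merged_urls: set = field(default_factory=set)
    mergeable_urls: set = field(default_factory=set)

    def is_merged(self, pr_url: str) -> bool:
        return pr_url in self.merged_urls

    def try_merge(self, pr_url: str) -> bool:
        # Best effort: report failure with False instead of raising.
        if pr_url in self.mergeable_urls:
            self.merged_urls.add(pr_url)
            return True
        return False


def normalize_stale_pr(prober: TaskPrProber, pr_url: str) -> str:
    """Classify a stale PR_CREATED task: merged already, merged now, or left alone."""
    if prober.is_merged(pr_url):
        return "already_merged"
    if prober.try_merge(pr_url):
        return "merged_now"
    return "needs_manual_review"


prober = FakeProber(
    merged_urls={"https://example.invalid/pr/1"},
    mergeable_urls={"https://example.invalid/pr/2"},
)
outcomes = [
    normalize_stale_pr(prober, f"https://example.invalid/pr/{n}") for n in (1, 2, 3)
]
```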

WorkstreamIntegrator

Bases: Protocol

Create integration PR for a workstream lane.

Source code in src/agentrelay/workstream/core/io.py
@runtime_checkable
class WorkstreamIntegrator(Protocol):
    """Create integration PR for a workstream lane."""

    def create_integration_pr(
        self, workstream_runtime: WorkstreamRuntime
    ) -> IntegrationResult:
        """Create a PR from the integration branch to the merge target.

        Args:
            workstream_runtime: Workstream runtime whose integration branch
                should be submitted as a PR.

        Returns:
            IntegrationResult: Whether the PR was skipped (no commits ahead)
            and the authoritative target branch SHA when skipped.
        """
        ...

create_integration_pr(workstream_runtime)

Create a PR from the integration branch to the merge target.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime whose integration branch should be submitted as a PR.

required

Returns:

Name Type Description
IntegrationResult IntegrationResult

Whether the PR was skipped (no commits ahead) and the authoritative target branch SHA when skipped.

Source code in src/agentrelay/workstream/core/io.py
def create_integration_pr(
    self, workstream_runtime: WorkstreamRuntime
) -> IntegrationResult:
    """Create a PR from the integration branch to the merge target.

    Args:
        workstream_runtime: Workstream runtime whose integration branch
            should be submitted as a PR.

    Returns:
        IntegrationResult: Whether the PR was skipped (no commits ahead)
        and the authoritative target branch SHA when skipped.
    """
    ...

WorkstreamPreparer

Bases: Protocol

Provision workspace infrastructure for a workstream lane.

Source code in src/agentrelay/workstream/core/io.py
@runtime_checkable
class WorkstreamPreparer(Protocol):
    """Provision workspace infrastructure for a workstream lane."""

    def prepare_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Provision worktree and integration branch for this workstream.

        Args:
            workstream_runtime: Workstream runtime to provision.
        """
        ...

prepare_workstream(workstream_runtime)

Provision worktree and integration branch for this workstream.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime to provision.

required

Source code in src/agentrelay/workstream/core/io.py
def prepare_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Provision worktree and integration branch for this workstream.

    Args:
        workstream_runtime: Workstream runtime to provision.
    """
    ...

WorkstreamTeardown

Bases: Protocol

Clean up workstream workspace infrastructure.

Source code in src/agentrelay/workstream/core/io.py
@runtime_checkable
class WorkstreamTeardown(Protocol):
    """Clean up workstream workspace infrastructure."""

    def teardown_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Delete worktree and integration branch for this workstream.

        Args:
            workstream_runtime: Workstream runtime whose resources should be
                cleaned up.
        """
        ...

teardown_workstream(workstream_runtime)

Delete worktree and integration branch for this workstream.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime whose resources should be cleaned up.

required

Source code in src/agentrelay/workstream/core/io.py
def teardown_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Delete worktree and integration branch for this workstream.

    Args:
        workstream_runtime: Workstream runtime whose resources should be
            cleaned up.
    """
    ...
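
A matched preparer and teardown pair might look like the following sketch, which substitutes a plain temporary directory for a git worktree and integration branch. `DirPreparer`, `DirTeardown`, and the `worktree` field on the stand-in runtime are hypothetical; a real implementation would create and delete git resources instead.

```python
import shutil
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


@dataclass
class WorkstreamRuntime:  # simplified stand-in, not the real runtime class
    workstream_id: str
    worktree: Optional[Path] = None


class DirPreparer:
    """Hypothetical preparer: creates one directory per workstream."""

    def __init__(self, root: Path) -> None:
        self.root = root

    def prepare_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
        path = self.root / workstream_runtime.workstream_id
        # exist_ok=False so preparing the same lane twice fails loudly.
        path.mkdir(parents=True, exist_ok=False)
        workstream_runtime.worktree = path


class DirTeardown:
    """Hypothetical teardown: removes whatever the preparer created."""

    def teardown_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
        if workstream_runtime.worktree is not None:
            shutil.rmtree(workstream_runtime.worktree)
            workstream_runtime.worktree = None


root = Path(tempfile.mkdtemp())
runtime = WorkstreamRuntime("ws-1")
DirPreparer(root).prepare_workstream(runtime)
existed_after_prepare = runtime.worktree.is_dir()
DirTeardown().teardown_workstream(runtime)
exists_after_teardown = (root / "ws-1").exists()
shutil.rmtree(root)  # clean up the temporary root itself
```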

StandardWorkstreamRunner dataclass

Standard workstream lifecycle runner.

Drives workstream-level operations (prepare, integrate, teardown) by delegating to per-step protocol implementations. The orchestrator calls these methods at appropriate scheduling points.

Attributes:

Name Type Description
_preparer WorkstreamPreparer

Provision worktree and integration branch.

_integrator WorkstreamIntegrator

Create integration PR.

_teardown WorkstreamTeardown

Clean up worktree and integration branch.

Source code in src/agentrelay/workstream/core/runner.py
@dataclass
class StandardWorkstreamRunner:
    """Standard workstream lifecycle runner.

    Drives workstream-level operations (prepare, integrate, teardown) by
    delegating to per-step protocol implementations. The orchestrator
    calls these methods at appropriate scheduling points.

    Attributes:
        _preparer: Provision worktree and integration branch.
        _integrator: Create integration PR.
        _teardown: Clean up worktree and integration branch.
    """

    _preparer: WorkstreamPreparer
    _integrator: WorkstreamIntegrator
    _teardown: WorkstreamTeardown

    def prepare(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Provision workspace infrastructure for a workstream.

        Creates the worktree and integration branch. Transitions from
        ``PENDING`` to ``ACTIVE`` on success, or ``FAILED`` on error.

        Args:
            workstream_runtime: Workstream runtime to provision.
        """
        try:
            self._preparer.prepare_workstream(workstream_runtime)
        except Exception as exc:
            workstream_runtime.mark_failed(f"{type(exc).__name__}: {exc}")
            raise
        workstream_runtime.mark_active()

    def integrate(self, workstream_runtime: WorkstreamRuntime) -> WorkstreamRunResult:
        """Create the integration PR for the workstream.

        Transitions to ``PR_CREATED`` on success, or ``FAILED`` on error.
        When the integration PR is skipped (no commits ahead), stores the
        authoritative target branch SHA on the runtime artifacts.

        Args:
            workstream_runtime: Workstream runtime to integrate.

        Returns:
            WorkstreamRunResult: Snapshot of state after the operation.
        """
        try:
            result = self._integrator.create_integration_pr(workstream_runtime)
        except Exception as exc:
            workstream_runtime.mark_failed(f"{type(exc).__name__}: {exc}")
            return WorkstreamRunResult.from_runtime(workstream_runtime)

        if result.skipped:
            workstream_runtime.artifacts.target_branch_before_any_merge = (
                result.target_branch_authoritative_sha
            )

        return WorkstreamRunResult.from_runtime(workstream_runtime)

    def teardown(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Clean up workstream workspace infrastructure.

        Deletes the worktree and integration branch. Failures are recorded
        as concerns rather than changing workstream status.

        Args:
            workstream_runtime: Workstream runtime to tear down.
        """
        try:
            self._teardown.teardown_workstream(workstream_runtime)
        except Exception as exc:
            workstream_runtime.artifacts.concerns.append(f"teardown_failed: {exc}")

prepare(workstream_runtime)

Provision workspace infrastructure for a workstream.

Creates the worktree and integration branch. Transitions from PENDING to ACTIVE on success; on error, marks the workstream FAILED and re-raises the exception.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime to provision.

required

Source code in src/agentrelay/workstream/core/runner.py
def prepare(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Provision workspace infrastructure for a workstream.

    Creates the worktree and integration branch. Transitions from
    ``PENDING`` to ``ACTIVE`` on success, or ``FAILED`` on error.

    Args:
        workstream_runtime: Workstream runtime to provision.
    """
    try:
        self._preparer.prepare_workstream(workstream_runtime)
    except Exception as exc:
        workstream_runtime.mark_failed(f"{type(exc).__name__}: {exc}")
        raise
    workstream_runtime.mark_active()

integrate(workstream_runtime)

Create the integration PR for the workstream.

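Transitions to PR_CREATED on success, or FAILED on error. Unlike prepare, a failure here is not re-raised: it is recorded with mark_failed and reported through the returned snapshot. The runner code shown does not call mark_pr_created itself, so the PR_CREATED transition has to come from the integrator implementation or the caller. When the integration PR is skipped (no commits ahead), stores the authoritative target branch SHA on the runtime artifacts.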

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime to integrate.

required

Returns:

Name Type Description
WorkstreamRunResult WorkstreamRunResult

Snapshot of state after the operation.

Source code in src/agentrelay/workstream/core/runner.py
def integrate(self, workstream_runtime: WorkstreamRuntime) -> WorkstreamRunResult:
    """Create the integration PR for the workstream.

    Transitions to ``PR_CREATED`` on success, or ``FAILED`` on error.
    When the integration PR is skipped (no commits ahead), stores the
    authoritative target branch SHA on the runtime artifacts.

    Args:
        workstream_runtime: Workstream runtime to integrate.

    Returns:
        WorkstreamRunResult: Snapshot of state after the operation.
    """
    try:
        result = self._integrator.create_integration_pr(workstream_runtime)
    except Exception as exc:
        workstream_runtime.mark_failed(f"{type(exc).__name__}: {exc}")
        return WorkstreamRunResult.from_runtime(workstream_runtime)

    if result.skipped:
        workstream_runtime.artifacts.target_branch_before_any_merge = (
            result.target_branch_authoritative_sha
        )

    return WorkstreamRunResult.from_runtime(workstream_runtime)

teardown(workstream_runtime)

Clean up workstream workspace infrastructure.

Deletes the worktree and integration branch. Failures are recorded as concerns rather than changing workstream status.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime to tear down.

required

Source code in src/agentrelay/workstream/core/runner.py
def teardown(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Clean up workstream workspace infrastructure.

    Deletes the worktree and integration branch. Failures are recorded
    as concerns rather than changing workstream status.

    Args:
        workstream_runtime: Workstream runtime to tear down.
    """
    try:
        self._teardown.teardown_workstream(workstream_runtime)
    except Exception as exc:
        workstream_runtime.artifacts.concerns.append(f"teardown_failed: {exc}")
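
The three methods handle failure differently: prepare marks the failure and re-raises, integrate marks the failure and returns, and teardown only records a concern. The condensed sketch below restates that behaviour with stand-ins so the difference can be observed directly. `Runtime`, `Boom`, and `Runner` are hypothetical simplifications, and `integrate` returns the error string here instead of a WorkstreamRunResult.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Runtime:  # simplified stand-in for WorkstreamRuntime
    error: Optional[str] = None
    active: bool = False
    concerns: list = field(default_factory=list)

    def mark_failed(self, error: str) -> None:
        self.error = error

    def mark_active(self) -> None:
        self.active = True


class Boom:
    """Hypothetical step implementation in which every step fails."""

    def prepare_workstream(self, rt: Runtime) -> None:
        raise OSError("disk full")

    def create_integration_pr(self, rt: Runtime) -> None:
        raise RuntimeError("no network")

    def teardown_workstream(self, rt: Runtime) -> None:
        raise OSError("busy")


@dataclass
class Runner:  # condensed restatement of the three runner methods
    steps: Boom

    def prepare(self, rt: Runtime) -> None:
        try:
            self.steps.prepare_workstream(rt)
        except Exception as exc:
            rt.mark_failed(f"{type(exc).__name__}: {exc}")
            raise  # the caller sees the original exception
        rt.mark_active()

    def integrate(self, rt: Runtime) -> Optional[str]:
        try:
            self.steps.create_integration_pr(rt)
        except Exception as exc:
            rt.mark_failed(f"{type(exc).__name__}: {exc}")  # swallowed, not raised
        return rt.error

    def teardown(self, rt: Runtime) -> None:
        try:
            self.steps.teardown_workstream(rt)
        except Exception as exc:
            rt.concerns.append(f"teardown_failed: {exc}")  # status is untouched


runner = Runner(Boom())

prepared = Runtime()
try:
    runner.prepare(prepared)
    prepare_raised = False
except OSError:
    prepare_raised = True

integrate_error = runner.integrate(Runtime())

torn_down = Runtime()
runner.teardown(torn_down)
```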

WorkstreamRunner

Bases: Protocol

Protocol for the workstream runner boundary used by Orchestrator.

Different lifecycle variants (standard, dry-run) are different classes satisfying this protocol. The orchestrator does not know or care about internal step structure.

Source code in src/agentrelay/workstream/core/runner.py
@runtime_checkable
class WorkstreamRunner(Protocol):
    """Protocol for the workstream runner boundary used by Orchestrator.

    Different lifecycle variants (standard, dry-run) are different classes
    satisfying this protocol. The orchestrator does not know or care about
    internal step structure.
    """

    def prepare(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Provision workspace infrastructure for a workstream."""
        ...

    def integrate(self, workstream_runtime: WorkstreamRuntime) -> WorkstreamRunResult:
        """Create the integration PR for the workstream."""
        ...

    def teardown(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Clean up workstream workspace infrastructure."""
        ...

prepare(workstream_runtime)

Provision workspace infrastructure for a workstream.

Source code in src/agentrelay/workstream/core/runner.py
def prepare(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Provision workspace infrastructure for a workstream."""
    ...

integrate(workstream_runtime)

Create the integration PR for the workstream.

Source code in src/agentrelay/workstream/core/runner.py
def integrate(self, workstream_runtime: WorkstreamRuntime) -> WorkstreamRunResult:
    """Create the integration PR for the workstream."""
    ...

teardown(workstream_runtime)

Clean up workstream workspace infrastructure.

Source code in src/agentrelay/workstream/core/runner.py
def teardown(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Clean up workstream workspace infrastructure."""
    ...
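
Since the protocol is structural, a lifecycle variant needs no inheritance, only the three methods. The sketch below shows a hypothetical `DryRunRunner` that records calls without touching any infrastructure; the protocol is re-declared locally with `object` parameters so the example runs standalone, and `Incomplete` exists only to show a class that does not conform.

```python
from dataclasses import dataclass, field
from typing import Protocol, runtime_checkable


@runtime_checkable
class WorkstreamRunner(Protocol):  # same method names as the documented protocol
    def prepare(self, workstream_runtime: object) -> None: ...
    def integrate(self, workstream_runtime: object) -> object: ...
    def teardown(self, workstream_runtime: object) -> None: ...


@dataclass
class DryRunRunner:
    """Hypothetical variant: logs each lifecycle call and does nothing else."""

    calls: list = field(default_factory=list)

    def prepare(self, workstream_runtime: object) -> None:
        self.calls.append("prepare")

    def integrate(self, workstream_runtime: object) -> object:
        self.calls.append("integrate")
        return None

    def teardown(self, workstream_runtime: object) -> None:
        self.calls.append("teardown")


class Incomplete:
    """Lacks integrate and teardown, so it does not satisfy the protocol."""

    def prepare(self, workstream_runtime: object) -> None: ...


runner = DryRunRunner()
for step in (runner.prepare, runner.integrate, runner.teardown):
    step(None)

conforms = isinstance(runner, WorkstreamRunner)
incomplete_conforms = isinstance(Incomplete(), WorkstreamRunner)
```

The isinstance check only confirms that the methods are present; it does not validate their signatures or return types.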

WorkstreamRunResult dataclass

Snapshot of workstream state after a lifecycle operation.

Attributes:

Name Type Description
workstream_id str

Workstream identifier.

status WorkstreamStatus

Current workstream status after the operation.

error Optional[str]

Error message if the operation failed, or None.

Source code in src/agentrelay/workstream/core/runner.py
@dataclass(frozen=True)
class WorkstreamRunResult:
    """Snapshot of workstream state after a lifecycle operation.

    Attributes:
        workstream_id: Workstream identifier.
        status: Current workstream status after the operation.
        error: Error message if the operation failed, or ``None``.
    """

    workstream_id: str
    status: WorkstreamStatus
    error: Optional[str]

    @classmethod
    def from_runtime(cls, runtime: WorkstreamRuntime) -> WorkstreamRunResult:
        """Build a result snapshot from the current workstream runtime state.

        Args:
            runtime: Workstream runtime to snapshot.

        Returns:
            WorkstreamRunResult: Snapshot of workstream ID, status, and error.
        """
        return cls(
            workstream_id=runtime.spec.id,
            status=runtime.status,
            error=runtime.state.error,
        )

from_runtime(runtime) classmethod

Build a result snapshot from the current workstream runtime state.

Parameters:

Name Type Description Default
runtime WorkstreamRuntime

Workstream runtime to snapshot.

required

Returns:

Name Type Description
WorkstreamRunResult WorkstreamRunResult

Snapshot of workstream ID, status, and error.

Source code in src/agentrelay/workstream/core/runner.py
@classmethod
def from_runtime(cls, runtime: WorkstreamRuntime) -> WorkstreamRunResult:
    """Build a result snapshot from the current workstream runtime state.

    Args:
        runtime: Workstream runtime to snapshot.

    Returns:
        WorkstreamRunResult: Snapshot of workstream ID, status, and error.
    """
    return cls(
        workstream_id=runtime.spec.id,
        status=runtime.status,
        error=runtime.state.error,
    )
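
Because the result is a frozen dataclass built from the runtime's current values, it behaves as a point-in-time snapshot. The sketch below demonstrates this with simplified stand-ins: `Spec`, `State`, and `Runtime` are hypothetical reductions of the real classes, `status` is a plain field here instead of a property, and the enum values are assumptions.

```python
from dataclasses import FrozenInstanceError, dataclass, field
from enum import Enum
from typing import Optional


class WorkstreamStatus(Enum):  # only the members needed here; values are assumed
    ACTIVE = "active"
    FAILED = "failed"


@dataclass
class Spec:
    id: str


@dataclass
class State:
    error: Optional[str] = None


@dataclass
class Runtime:  # stand-in: the real runtime derives status from signal files
    spec: Spec
    status: WorkstreamStatus
    state: State = field(default_factory=State)


@dataclass(frozen=True)
class WorkstreamRunResult:
    workstream_id: str
    status: WorkstreamStatus
    error: Optional[str]

    @classmethod
    def from_runtime(cls, runtime: Runtime) -> "WorkstreamRunResult":
        return cls(
            workstream_id=runtime.spec.id,
            status=runtime.status,
            error=runtime.state.error,
        )


runtime = Runtime(Spec("ws-1"), WorkstreamStatus.ACTIVE)
before = WorkstreamRunResult.from_runtime(runtime)

# Later mutation of the runtime does not affect the earlier snapshot.
runtime.status = WorkstreamStatus.FAILED
runtime.state.error = "RuntimeError: boom"
after = WorkstreamRunResult.from_runtime(runtime)

try:
    before.error = "changed"  # frozen dataclasses reject attribute assignment
    snapshot_is_immutable = False
except FrozenInstanceError:
    snapshot_is_immutable = True
```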

TaskSummary dataclass

Per-task summary included in integration PR body.

Attributes:

Name Type Description
task_id str

Unique task identifier.

description Optional[str]

Human-readable task description, or None.

role Optional[str]

Agent role value string (e.g. "spec_writer"), or None.

pr_url Optional[str]

URL of the task's pull request, or None.

concerns tuple[str, ...]

Design concerns recorded by the agent.

ops_concerns tuple[str, ...]

Operational concerns recorded by the agent.

summary_text Optional[str]

Content of the agent's summary.md, or None.

Source code in src/agentrelay/workstream/core/runtime.py
@dataclass(frozen=True)
class TaskSummary:
    """Per-task summary included in integration PR body.

    Attributes:
        task_id: Unique task identifier.
        description: Human-readable task description, or ``None``.
        role: Agent role value string (e.g. ``"spec_writer"``), or ``None``.
        pr_url: URL of the task's pull request, or ``None``.
        concerns: Design concerns recorded by the agent.
        ops_concerns: Operational concerns recorded by the agent.
        summary_text: Content of the agent's ``summary.md``, or ``None``.
    """

    task_id: str
    description: Optional[str] = None
    role: Optional[str] = None
    pr_url: Optional[str] = None
    concerns: tuple[str, ...] = ()
    ops_concerns: tuple[str, ...] = ()
    summary_text: Optional[str] = None

WorkstreamArtifacts dataclass

Outputs and observations produced while advancing a workstream.

Attributes:

Name Type Description
merge_pr_url Optional[str]

URL of the workstream integration PR, or None if absent.

concerns list[str]

List of notable observations or concerns for this lane.

task_summaries list[TaskSummary]

Per-task summaries for integration PR body.

target_branch_before_any_merge Optional[str]

SHA of the target branch before any merge related to this workstream. Populated from whichever authority path detected the merge (integrator for skipped workstreams, auto-merger, or polled merge checker).

Source code in src/agentrelay/workstream/core/runtime.py
@dataclass
class WorkstreamArtifacts:
    """Outputs and observations produced while advancing a workstream.

    Attributes:
        merge_pr_url: URL of the workstream integration PR, or ``None`` if absent.
        concerns: List of notable observations or concerns for this lane.
        task_summaries: Per-task summaries for integration PR body.
        target_branch_before_any_merge: SHA of the target branch before any
            merge related to this workstream.  Populated from whichever
            authority path detected the merge (integrator for skipped
            workstreams, auto-merger, or polled merge checker).
    """

    merge_pr_url: Optional[str] = None
    concerns: list[str] = field(default_factory=list)
    task_summaries: list[TaskSummary] = field(default_factory=list)
    target_branch_before_any_merge: Optional[str] = None
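
The list fields use default_factory, so each artifacts instance gets its own lists. The sketch below shows that, along with one way task summaries might be turned into PR body lines. `render_task_lines` and its output format are hypothetical, and both dataclasses carry only a subset of the documented fields.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class TaskSummary:  # subset of the documented fields
    task_id: str
    pr_url: Optional[str] = None
    concerns: tuple[str, ...] = ()


@dataclass
class WorkstreamArtifacts:  # subset of the documented fields
    concerns: list[str] = field(default_factory=list)
    task_summaries: list[TaskSummary] = field(default_factory=list)


def render_task_lines(artifacts: WorkstreamArtifacts) -> list[str]:
    """Hypothetical formatter: one line per task for an integration PR body."""
    lines = []
    for summary in artifacts.task_summaries:
        link = summary.pr_url or "no PR"
        lines.append(f"- {summary.task_id} ({link}), {len(summary.concerns)} concern(s)")
    return lines


lane_a = WorkstreamArtifacts()
lane_b = WorkstreamArtifacts()

# Mutating one lane's lists leaves the other lane untouched.
lane_a.concerns.append("teardown_failed: busy")
lane_a.task_summaries.append(
    TaskSummary("task-1", pr_url="https://example.invalid/pr/1", concerns=("naming",))
)
lane_a.task_summaries.append(TaskSummary("task-2"))

body_lines = render_task_lines(lane_a)
```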

WorkstreamRuntime dataclass

Mutable runtime envelope for a WorkstreamSpec.

Attributes:

Name Type Description
spec WorkstreamSpec

Immutable workstream specification for this lane.

state WorkstreamState

Mutable lane operational state.

artifacts WorkstreamArtifacts

Mutable lane output artifacts.

Source code in src/agentrelay/workstream/core/runtime.py
@dataclass
class WorkstreamRuntime:
    """Mutable runtime envelope for a :class:`WorkstreamSpec`.

    Attributes:
        spec: Immutable workstream specification for this lane.
        state: Mutable lane operational state.
        artifacts: Mutable lane output artifacts.
    """

    spec: WorkstreamSpec
    state: WorkstreamState = field(default_factory=WorkstreamState)
    artifacts: WorkstreamArtifacts = field(default_factory=WorkstreamArtifacts)

    @property
    def status(self) -> WorkstreamStatus:
        """Current workstream status, derived from signal files on disk.

        Falls back to ``PENDING`` if no signal directory has been set,
        unless an error has been recorded (indicating failure before
        provisioning), in which case ``FAILED`` is returned.
        """
        if self.state.signal_dir is None:
            if self.state.error is not None:
                return WorkstreamStatus.FAILED
            return WorkstreamStatus.PENDING
        return _read_status_from_signals(self.state.signal_dir)

    def _write_signal(self, name: str, content: str = "") -> None:
        """Write a signal file to the workstream signal directory."""
        assert self.state.signal_dir is not None, "signal_dir must be set"
        self.state.signal_dir.mkdir(parents=True, exist_ok=True)
        (self.state.signal_dir / name).write_text(content)

    def mark_pending(self) -> None:
        """Write the ``pending`` signal file."""
        self._write_signal("pending")

    def mark_active(self) -> None:
        """Write the ``active`` signal file."""
        self._write_signal("active")

    def mark_merge_ready(self) -> None:
        """Write the ``merge_ready`` signal file."""
        self._write_signal("merge_ready")

    def mark_pr_created(self, pr_url: str) -> None:
        """Write the ``pr_created`` signal file with the PR URL."""
        self._write_signal("pr_created", pr_url)
        self.artifacts.merge_pr_url = pr_url

    def mark_merged(self) -> None:
        """Write the ``merged`` signal file."""
        self._write_signal("merged")

    def mark_failed(self, error: str) -> None:
        """Write the ``failed`` signal file with the error message.

        If ``signal_dir`` is not set (workstream was never prepared),
        only the in-memory error is recorded without writing to disk.
        """
        if self.state.signal_dir is not None:
            self._write_signal("failed", error)
        self.state.error = error

    def mark_reset(self) -> None:
        """Write the ``reset`` signal file."""
        self._write_signal("reset")

status property

Current workstream status, derived from signal files on disk.

Falls back to PENDING if no signal directory has been set, unless an error has been recorded (indicating failure before provisioning), in which case FAILED is returned.
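
The fallback rule can be illustrated with a minimal sketch. `State`, `derive_status`, and `read_status_from_signals` are stand-ins written for this example, and the precedence order among signal files is an assumption: the real `_read_status_from_signals` is not shown on this page.

```python
import tempfile
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Optional


class Status(str, Enum):
    PENDING = "pending"
    ACTIVE = "active"
    FAILED = "failed"


# ASSUMPTION: hypothetical precedence; the real helper may order signals differently.
_PRECEDENCE = [Status.FAILED, Status.ACTIVE, Status.PENDING]


def read_status_from_signals(signal_dir: Path) -> Status:
    """Return the first status in _PRECEDENCE whose signal file exists."""
    for status in _PRECEDENCE:
        if (signal_dir / status.value).exists():
            return status
    return Status.PENDING


@dataclass
class State:
    signal_dir: Optional[Path] = None
    error: Optional[str] = None


def derive_status(state: State) -> Status:
    # No signal directory yet: an in-memory error means failure before provisioning.
    if state.signal_dir is None:
        return Status.FAILED if state.error is not None else Status.PENDING
    return read_status_from_signals(state.signal_dir)


with tempfile.TemporaryDirectory() as tmp:
    signal_dir = Path(tmp)
    (signal_dir / "pending").write_text("")
    (signal_dir / "active").write_text("")
    on_disk = derive_status(State(signal_dir=signal_dir))

unprovisioned = derive_status(State())
failed_early = derive_status(State(error="provisioning failed"))
```

In this sketch the status is never cached: it is recomputed from the directory contents on every call, which is what lets a separate process observe transitions.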

mark_pending()

Write the pending signal file.

Source code in src/agentrelay/workstream/core/runtime.py
def mark_pending(self) -> None:
    """Write the ``pending`` signal file."""
    self._write_signal("pending")

mark_active()

Write the active signal file.

Source code in src/agentrelay/workstream/core/runtime.py
def mark_active(self) -> None:
    """Write the ``active`` signal file."""
    self._write_signal("active")

mark_merge_ready()

Write the merge_ready signal file.

Source code in src/agentrelay/workstream/core/runtime.py
def mark_merge_ready(self) -> None:
    """Write the ``merge_ready`` signal file."""
    self._write_signal("merge_ready")

mark_pr_created(pr_url)

Write the pr_created signal file with the PR URL.

Source code in src/agentrelay/workstream/core/runtime.py
def mark_pr_created(self, pr_url: str) -> None:
    """Write the ``pr_created`` signal file with the PR URL."""
    self._write_signal("pr_created", pr_url)
    self.artifacts.merge_pr_url = pr_url

mark_merged()

Write the merged signal file.

Source code in src/agentrelay/workstream/core/runtime.py
def mark_merged(self) -> None:
    """Write the ``merged`` signal file."""
    self._write_signal("merged")

mark_failed(error)

Write the failed signal file with the error message.

If signal_dir is not set (workstream was never prepared), only the in-memory error is recorded without writing to disk.
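
A minimal sketch of the two paths, using stand-in `State` and `Runtime` classes written for this example (they mirror only the fields that `mark_failed` touches):

```python
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional


@dataclass
class State:
    signal_dir: Optional[Path] = None
    error: Optional[str] = None


@dataclass
class Runtime:
    state: State = field(default_factory=State)

    def mark_failed(self, error: str) -> None:
        # Only touch the disk when a signal directory has been provisioned.
        if self.state.signal_dir is not None:
            self.state.signal_dir.mkdir(parents=True, exist_ok=True)
            (self.state.signal_dir / "failed").write_text(error)
        # The in-memory error is recorded on both paths.
        self.state.error = error


# Path 1: never prepared, so nothing is written to disk.
never_prepared = Runtime()
never_prepared.mark_failed("worktree creation failed")

# Path 2: prepared, so the message is also persisted in the signal file.
with tempfile.TemporaryDirectory() as tmp:
    prepared = Runtime(State(signal_dir=Path(tmp) / "workstreams" / "A"))
    prepared.mark_failed("task T1 failed")
    on_disk = (prepared.state.signal_dir / "failed").read_text()
```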

Source code in src/agentrelay/workstream/core/runtime.py
def mark_failed(self, error: str) -> None:
    """Write the ``failed`` signal file with the error message.

    If ``signal_dir`` is not set (workstream was never prepared),
    only the in-memory error is recorded without writing to disk.
    """
    if self.state.signal_dir is not None:
        self._write_signal("failed", error)
    self.state.error = error

mark_reset()

Write the reset signal file.

Source code in src/agentrelay/workstream/core/runtime.py
def mark_reset(self) -> None:
    """Write the ``reset`` signal file."""
    self._write_signal("reset")

WorkstreamState dataclass

Mutable operational state of a workstream lane.

Attributes:

Name Type Description
signal_dir Optional[Path]

Path to the workstream signal directory for status files, or None if not provisioned yet.

worktree_path Optional[Path]

Filesystem path to the worktree used for this lane, or None if not provisioned yet.

branch_name Optional[str]

Primary branch name used for this lane, or None until set.

error Optional[str]

Error message if lane execution failed, else None.

Source code in src/agentrelay/workstream/core/runtime.py
@dataclass
class WorkstreamState:
    """Mutable operational state of a workstream lane.

    Attributes:
        signal_dir: Path to the workstream signal directory for status files,
            or ``None`` if not provisioned yet.
        worktree_path: Filesystem path to the worktree used for this lane, or
            ``None`` if not provisioned yet.
        branch_name: Primary branch name used for this lane, or ``None`` until set.
        error: Error message if lane execution failed, else ``None``.
    """

    signal_dir: Optional[Path] = None
    worktree_path: Optional[Path] = None
    branch_name: Optional[str] = None
    error: Optional[str] = None

WorkstreamStatus

Bases: str, Enum

Execution state of a workstream during orchestration.

Attributes:

Name Type Description
PENDING

Workstream has not started provisioning/execution.

ACTIVE

Workstream is currently active (for example has running tasks).

MERGE_READY

All tasks completed; workstream is ready for integration.

PR_CREATED

Integration PR has been created, awaiting merge.

MERGED

Integration PR has been merged into the target branch.

FAILED

Workstream execution failed.

RESET

Workstream was reset via reset-workstream or teardown-workstream. Signal directory is preserved for history; workstream is logically available for re-execution.
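
Because the enum mixes in `str`, each member compares equal to its string value, and a member can be looked up from a signal file name. The sketch below redeclares the enum so it runs on its own; the `parse` helper is hypothetical and not part of the package.

```python
from enum import Enum
from typing import Optional


class WorkstreamStatus(str, Enum):
    PENDING = "pending"
    ACTIVE = "active"
    MERGE_READY = "merge_ready"
    PR_CREATED = "pr_created"
    MERGED = "merged"
    FAILED = "failed"
    RESET = "reset"


def parse(name: str) -> Optional[WorkstreamStatus]:
    """Hypothetical helper: map a file name to a status, or None if unknown."""
    try:
        return WorkstreamStatus(name)
    except ValueError:
        return None


from_file_name = WorkstreamStatus("merge_ready")       # lookup by value
equals_plain_string = WorkstreamStatus.MERGED == "merged"  # str mixin comparison
signal_names = [status.value for status in WorkstreamStatus]
unknown = parse("not_a_signal")
```

The member values are the same names that the `mark_*` methods pass to `_write_signal`, so a file name and a status round-trip without a separate mapping table.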

Source code in src/agentrelay/workstream/core/runtime.py
class WorkstreamStatus(str, Enum):
    """Execution state of a workstream during orchestration.

    Attributes:
        PENDING: Workstream has not started provisioning/execution.
        ACTIVE: Workstream is currently active (for example has running tasks).
        MERGE_READY: All tasks completed; workstream is ready for integration.
        PR_CREATED: Integration PR has been created, awaiting merge.
        MERGED: Integration PR has been merged into the target branch.
        FAILED: Workstream execution failed.
        RESET: Workstream was reset via ``reset-workstream`` or
            ``teardown-workstream``.  Signal directory is preserved for
            history; workstream is logically available for re-execution.
    """

    PENDING = "pending"
    ACTIVE = "active"
    MERGE_READY = "merge_ready"
    PR_CREATED = "pr_created"
    MERGED = "merged"
    FAILED = "failed"
    RESET = "reset"

WorkstreamSpec dataclass

Immutable specification for a task workstream.

A workstream models a branch/worktree execution lane that one or more tasks can target. Parent/child workstream relationships are defined at the spec level and validated by graph-building layers.

Attributes:

Name Type Description
id str

Unique workstream identifier within a task graph.

parent_workstream_id Optional[str]

Optional parent workstream ID for hierarchical stream topologies (for example A -> A.1 and A.2).

base_branch str

Branch name used as the base when creating workstream integration branches or worktrees. Defaults to "main".

merge_target_branch str

Branch name the workstream ultimately merges into. Defaults to "main".

auto_merge bool

When True, the orchestrator merges the integration PR automatically after creation — provided no task in the workstream recorded a design concern. Defaults to False.
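
A short sketch of how a frozen spec behaves. The class is redeclared here so the example is self-contained, and the `isolation` field is omitted because `IsolationConfig` is not described on this page. The branch name `develop` is illustrative.

```python
import dataclasses
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class WorkstreamSpec:
    id: str
    parent_workstream_id: Optional[str] = None
    base_branch: str = "main"
    merge_target_branch: str = "main"
    auto_merge: bool = False


parent = WorkstreamSpec(id="A")
child = WorkstreamSpec(
    id="A.1",
    parent_workstream_id="A",
    base_branch="develop",
    merge_target_branch="develop",
)

# Assigning to a field of a frozen dataclass raises FrozenInstanceError.
try:
    parent.auto_merge = True  # type: ignore[misc]
    mutated = True
except dataclasses.FrozenInstanceError:
    mutated = False

# To change a field, build a modified copy instead.
auto = dataclasses.replace(parent, auto_merge=True)
```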

Source code in src/agentrelay/workstream/core/workstream.py
@dataclass(frozen=True)
class WorkstreamSpec:
    """Immutable specification for a task workstream.

    A workstream models a branch/worktree execution lane that one or more tasks
    can target. Parent/child workstream relationships are defined at the spec
    level and validated by graph-building layers.

    Attributes:
        id: Unique workstream identifier within a task graph.
        parent_workstream_id: Optional parent workstream ID for hierarchical
            stream topologies (for example ``A`` -> ``A.1`` and ``A.2``).
        base_branch: Branch name used as the base when creating workstream
            integration branches or worktrees. Defaults to ``"main"``.
        merge_target_branch: Branch name the workstream ultimately merges into.
            Defaults to ``"main"``.
        auto_merge: When ``True``, the orchestrator merges the integration PR
            automatically after creation — provided no task in the workstream
            recorded a design concern.  Defaults to ``False``.
    """

    id: str
    parent_workstream_id: Optional[str] = None
    base_branch: str = "main"
    merge_target_branch: str = "main"
    auto_merge: bool = False
    isolation: Optional[IsolationConfig] = None

GhIntegrationAutoMerger dataclass

Merge a workstream's integration PR via GitHub CLI.

Captures the target branch SHA before merging for rollback support. Reads the merge PR URL from workstream artifacts and delegates to agentrelay.ops.gh.pr_merge.
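
The ordering matters: the SHA has to be read before the merge, or the rollback point is lost. A minimal sketch of that ordering follows, with an in-memory `FakeRemote` standing in for both the git repository and the hosting platform. All names, SHAs, and the URL are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class MergeResult:
    target_branch_before_merge: str


@dataclass
class FakeRemote:
    """In-memory stand-in for the repository and the hosting platform."""

    head: str = "aaa111"

    def rev_parse(self, ref: str) -> str:
        return self.head

    def pr_merge(self, pr_url: str) -> None:
        # Merging moves the target branch to a new commit.
        self.head = "bbb222"


def merge_with_rollback_point(remote: FakeRemote, pr_url: Optional[str]) -> MergeResult:
    if pr_url is None:
        raise RuntimeError("No integration PR URL available for auto-merge")
    before = remote.rev_parse("main")  # capture first ...
    remote.pr_merge(pr_url)            # ... then merge
    return MergeResult(target_branch_before_merge=before)


remote = FakeRemote()
result = merge_with_rollback_point(remote, "https://example.invalid/pr/1")
```

A fake like this can also serve in tests wherever the `IntegrationAutoMerger` protocol is expected, since the protocol only requires a `merge` method with a matching shape.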

Source code in src/agentrelay/workstream/implementations/integration_auto_merger.py
@dataclass
class GhIntegrationAutoMerger:
    """Merge a workstream's integration PR via GitHub CLI.

    Captures the target branch SHA before merging for rollback support.
    Reads the merge PR URL from workstream artifacts and delegates
    to :func:`~agentrelay.ops.gh.pr_merge`.
    """

    repo_path: Path

    def merge(self, workstream_runtime: WorkstreamRuntime) -> IntegrationMergeResult:
        """Merge the integration PR for this workstream.

        Returns:
            IntegrationMergeResult: Pre-merge SHA of the target branch.

        Raises:
            RuntimeError: If no integration PR URL is available.
            subprocess.CalledProcessError: If the ``gh pr merge`` call fails.
        """
        pr_url = workstream_runtime.artifacts.merge_pr_url
        if pr_url is None:
            raise RuntimeError("No integration PR URL available for auto-merge")

        target_branch = workstream_runtime.spec.merge_target_branch
        before_sha = git.rev_parse(self.repo_path, target_branch)

        gh.pr_merge(pr_url)

        return IntegrationMergeResult(target_branch_before_merge=before_sha)

merge(workstream_runtime)

Merge the integration PR for this workstream.

Returns:

Name Type Description
IntegrationMergeResult IntegrationMergeResult

Pre-merge SHA of the target branch.

Raises:

Type Description
RuntimeError

If no integration PR URL is available.

CalledProcessError

If the gh pr merge call fails.

Source code in src/agentrelay/workstream/implementations/integration_auto_merger.py
def merge(self, workstream_runtime: WorkstreamRuntime) -> IntegrationMergeResult:
    """Merge the integration PR for this workstream.

    Returns:
        IntegrationMergeResult: Pre-merge SHA of the target branch.

    Raises:
        RuntimeError: If no integration PR URL is available.
        subprocess.CalledProcessError: If the ``gh pr merge`` call fails.
    """
    pr_url = workstream_runtime.artifacts.merge_pr_url
    if pr_url is None:
        raise RuntimeError("No integration PR URL available for auto-merge")

    target_branch = workstream_runtime.spec.merge_target_branch
    before_sha = git.rev_parse(self.repo_path, target_branch)

    gh.pr_merge(pr_url)

    return IntegrationMergeResult(target_branch_before_merge=before_sha)

GhIntegrationMergeChecker dataclass

Check integration PR merge status via GitHub CLI.

Reads the merge PR URL from workstream artifacts and delegates to agentrelay.ops.gh.pr_is_merged. When a merge is detected, derives the pre-merge target branch SHA from the merge commit's first parent via agentrelay.ops.gh.pr_merge_commit_sha and agentrelay.ops.git.rev_parse.
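
In git revision syntax, `<sha>^1` names a commit's first parent. The sketch below isolates that derivation and its graceful degradation. The `rev_parse` callable is injected so the example runs without a repository; `pre_merge_sha`, `fake_rev_parse`, and the SHAs are hypothetical.

```python
from typing import Callable, Dict, Optional


def pre_merge_sha(
    merge_sha: Optional[str], rev_parse: Callable[[str], str]
) -> Optional[str]:
    """Resolve the first parent of the merge commit, or None if that fails."""
    if merge_sha is None:
        return None
    try:
        return rev_parse(merge_sha + "^1")
    except Exception:
        # Degrade gracefully: the caller still learns that the PR merged.
        return None


# Fake revision table: one merge commit whose first parent is "aaa111".
_REVISIONS: Dict[str, str] = {"ccc333^1": "aaa111"}


def fake_rev_parse(ref: str) -> str:
    return _REVISIONS[ref]  # raises KeyError for unknown revisions


resolved = pre_merge_sha("ccc333", fake_rev_parse)
unknown_commit = pre_merge_sha("ddd444", fake_rev_parse)
no_merge_commit = pre_merge_sha(None, fake_rev_parse)
```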

Source code in src/agentrelay/workstream/implementations/integration_merge_checker.py
@dataclass
class GhIntegrationMergeChecker:
    """Check integration PR merge status via GitHub CLI.

    Reads the merge PR URL from workstream artifacts and delegates
    to :func:`~agentrelay.ops.gh.pr_is_merged`.  When a merge is detected,
    derives the pre-merge target branch SHA from the merge commit's first
    parent via :func:`~agentrelay.ops.gh.pr_merge_commit_sha` and
    :func:`~agentrelay.ops.git.rev_parse`.
    """

    repo_path: Path

    def is_merged(
        self, workstream_runtime: WorkstreamRuntime
    ) -> IntegrationMergeCheckResult:
        """Check whether the workstream's integration PR is merged on GitHub.

        When the PR is merged, attempts to derive the pre-merge target
        branch SHA from the merge commit's first parent.  Gracefully
        degrades to ``target_branch_before_merge=None`` if the merge
        commit cannot be determined.

        Args:
            workstream_runtime: Workstream runtime to check.

        Returns:
            IntegrationMergeCheckResult: Merge status and pre-merge SHA.
        """
        pr_url = workstream_runtime.artifacts.merge_pr_url
        if pr_url is None:
            return IntegrationMergeCheckResult(merged=False)
        if not gh.pr_is_merged(pr_url):
            return IntegrationMergeCheckResult(merged=False)

        # Derive the target branch SHA from the merge commit's first parent.
        before_sha: str | None = None
        merge_sha = gh.pr_merge_commit_sha(pr_url)
        if merge_sha is not None:
            try:
                before_sha = git.rev_parse(self.repo_path, merge_sha + "^1")
            except Exception:  # noqa: BLE001
                pass  # Graceful degradation — before_sha stays None.

        return IntegrationMergeCheckResult(
            merged=True, target_branch_before_merge=before_sha
        )

is_merged(workstream_runtime)

Check whether the workstream's integration PR is merged on GitHub.

When the PR is merged, attempts to derive the pre-merge target branch SHA from the merge commit's first parent. Gracefully degrades to target_branch_before_merge=None if the merge commit cannot be determined.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime to check.

required

Returns:

Name Type Description
IntegrationMergeCheckResult IntegrationMergeCheckResult

Merge status and pre-merge SHA.

Source code in src/agentrelay/workstream/implementations/integration_merge_checker.py
def is_merged(
    self, workstream_runtime: WorkstreamRuntime
) -> IntegrationMergeCheckResult:
    """Check whether the workstream's integration PR is merged on GitHub.

    When the PR is merged, attempts to derive the pre-merge target
    branch SHA from the merge commit's first parent.  Gracefully
    degrades to ``target_branch_before_merge=None`` if the merge
    commit cannot be determined.

    Args:
        workstream_runtime: Workstream runtime to check.

    Returns:
        IntegrationMergeCheckResult: Merge status and pre-merge SHA.
    """
    pr_url = workstream_runtime.artifacts.merge_pr_url
    if pr_url is None:
        return IntegrationMergeCheckResult(merged=False)
    if not gh.pr_is_merged(pr_url):
        return IntegrationMergeCheckResult(merged=False)

    # Derive the target branch SHA from the merge commit's first parent.
    before_sha: str | None = None
    merge_sha = gh.pr_merge_commit_sha(pr_url)
    if merge_sha is not None:
        try:
            before_sha = git.rev_parse(self.repo_path, merge_sha + "^1")
        except Exception:  # noqa: BLE001
            pass  # Graceful degradation — before_sha stays None.

    return IntegrationMergeCheckResult(
        merged=True, target_branch_before_merge=before_sha
    )

GhTaskPrProber dataclass

Probe task PR merge state via the GitHub CLI.

Stateless thin wrapper over agentrelay.ops.gh.pr_is_merged and agentrelay.ops.gh.pr_merge. Used by the resumption probe to normalize stale PR_CREATED tasks without depending directly on the ops/gh module, matching the protocol-isolation pattern used by GhTaskMerger, GhIntegrationMergeChecker, and GhIntegrationAutoMerger.

Source code in src/agentrelay/workstream/implementations/task_pr_prober.py
@dataclass
class GhTaskPrProber:
    """Probe task PR merge state via the GitHub CLI.

    Stateless thin wrapper over :func:`agentrelay.ops.gh.pr_is_merged` and
    :func:`agentrelay.ops.gh.pr_merge`.  Used by the resumption probe to
    normalize stale ``PR_CREATED`` tasks without depending directly on the
    ``ops/gh`` module, matching the protocol-isolation pattern used by
    :class:`GhTaskMerger`, :class:`GhIntegrationMergeChecker`, and
    :class:`GhIntegrationAutoMerger`.
    """

    def is_merged(self, pr_url: str) -> bool:
        """Return whether the given task PR is already merged.

        Delegates to :func:`agentrelay.ops.gh.pr_is_merged`, which returns
        ``False`` on subprocess failure rather than raising — safe for use
        in the probe's normalization path.
        """
        return gh.pr_is_merged(pr_url)

    def try_merge(self, pr_url: str) -> bool:
        """Best-effort merge of the given task PR.

        Wraps :func:`agentrelay.ops.gh.pr_merge` in a try/except so merge
        failures (conflicts, branch protection, transient network errors)
        return ``False`` instead of propagating a ``CalledProcessError``.
        The probe treats ``False`` as retry-eligible ``FAILED``, not as an
        internal error.
        """
        try:
            gh.pr_merge(pr_url)
            return True
        except subprocess.CalledProcessError:
            return False

is_merged(pr_url)

Return whether the given task PR is already merged.

Delegates to agentrelay.ops.gh.pr_is_merged, which returns False on subprocess failure rather than raising, so it is safe for use in the probe's normalization path.

Source code in src/agentrelay/workstream/implementations/task_pr_prober.py
def is_merged(self, pr_url: str) -> bool:
    """Return whether the given task PR is already merged.

    Delegates to :func:`agentrelay.ops.gh.pr_is_merged`, which returns
    ``False`` on subprocess failure rather than raising — safe for use
    in the probe's normalization path.
    """
    return gh.pr_is_merged(pr_url)

try_merge(pr_url)

Best-effort merge of the given task PR.

Wraps agentrelay.ops.gh.pr_merge in a try/except so merge failures (conflicts, branch protection, transient network errors) return False instead of propagating a CalledProcessError. The probe treats False as retry-eligible FAILED, not as an internal error.
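
The same best-effort pattern can be shown with a plain subprocess call. In this sketch `try_run` is a hypothetical helper, and a child Python interpreter exiting with a chosen status stands in for the `gh` invocation.

```python
import subprocess
import sys
from typing import Sequence


def try_run(command: Sequence[str]) -> bool:
    """Run a command; report failure as False instead of raising."""
    try:
        # check=True turns a non-zero exit status into CalledProcessError.
        subprocess.run(command, check=True, capture_output=True)
        return True
    except subprocess.CalledProcessError:
        return False


succeeded = try_run([sys.executable, "-c", "raise SystemExit(0)"])
failed = try_run([sys.executable, "-c", "raise SystemExit(1)"])
```

Only `CalledProcessError` is caught here, so other exceptions, such as a missing executable, would still propagate as genuine errors.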

Source code in src/agentrelay/workstream/implementations/task_pr_prober.py
def try_merge(self, pr_url: str) -> bool:
    """Best-effort merge of the given task PR.

    Wraps :func:`agentrelay.ops.gh.pr_merge` in a try/except so merge
    failures (conflicts, branch protection, transient network errors)
    return ``False`` instead of propagating a ``CalledProcessError``.
    The probe treats ``False`` as retry-eligible ``FAILED``, not as an
    internal error.
    """
    try:
        gh.pr_merge(pr_url)
        return True
    except subprocess.CalledProcessError:
        return False

GhWorkstreamIntegrator dataclass

Create the workstream integration PR via GitHub CLI.

Creates a pull request from the workstream's integration branch into its merge_target_branch. Does NOT merge the PR — that is left for human review (or a future agent-merge step).

Source code in src/agentrelay/workstream/implementations/workstream_integrator.py
@dataclass
class GhWorkstreamIntegrator:
    """Create the workstream integration PR via GitHub CLI.

    Creates a pull request from the workstream's integration branch into
    its ``merge_target_branch``. Does NOT merge the PR — that is left
    for human review (or a future agent-merge step).
    """

    repo_path: Path

    def create_integration_pr(
        self, workstream_runtime: WorkstreamRuntime
    ) -> IntegrationResult:
        """Create a PR from the integration branch to the merge target.

        When the integration branch has no commits ahead of the target,
        the PR is skipped and the workstream transitions directly to
        ``MERGED``.  In this case, the integrator is the authoritative
        source for the target branch SHA (nothing changed).

        Args:
            workstream_runtime: Workstream runtime whose integration branch
                should be submitted as a PR.

        Returns:
            IntegrationResult: Whether the PR was skipped and the
            authoritative target branch SHA when skipped.
        """
        spec = workstream_runtime.spec
        branch_name = workstream_runtime.state.branch_name
        assert branch_name is not None, "branch_name must be set before integration"

        ahead = git.rev_list_count(
            self.repo_path, spec.merge_target_branch, branch_name
        )
        if ahead == 0:
            target_sha = git.rev_parse(self.repo_path, spec.merge_target_branch)
            workstream_runtime.mark_merged()
            return IntegrationResult(
                skipped=True, target_branch_authoritative_sha=target_sha
            )

        body = _build_pr_body(workstream_runtime)

        pr_url = gh.pr_create(
            self.repo_path,
            title=f"Integrate workstream {spec.id}",
            body=body,
            base=spec.merge_target_branch,
            head=branch_name,
        )

        workstream_runtime.mark_pr_created(pr_url)
        return IntegrationResult(skipped=False)

create_integration_pr(workstream_runtime)

Create a PR from the integration branch to the merge target.

When the integration branch has no commits ahead of the target, the PR is skipped and the workstream transitions directly to MERGED. In this case, the integrator is the authoritative source for the target branch SHA (nothing changed).
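
The decision reduces to a single comparison on the ahead count. In the sketch below, `IntegrationOutcome` and `integrate` are hypothetical stand-ins, the PR creation is an injected callable, and the SHA and URL are illustrative.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class IntegrationOutcome:
    skipped: bool
    target_sha: Optional[str] = None
    pr_url: Optional[str] = None


def integrate(
    ahead: int, target_sha: str, create_pr: Callable[[], str]
) -> IntegrationOutcome:
    if ahead == 0:
        # Nothing to merge: the target branch is unchanged, so its current
        # SHA is reported and no PR is opened.
        return IntegrationOutcome(skipped=True, target_sha=target_sha)
    return IntegrationOutcome(skipped=False, pr_url=create_pr())


calls: List[str] = []


def fake_create_pr() -> str:
    calls.append("pr_create")
    return "https://example.invalid/pr/7"


empty_branch = integrate(0, "aaa111", fake_create_pr)
calls_after_skip = list(calls)
has_commits = integrate(3, "aaa111", fake_create_pr)
```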

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime whose integration branch should be submitted as a PR.

required

Returns:

Name Type Description
IntegrationResult IntegrationResult

Whether the PR was skipped and the authoritative target branch SHA when skipped.

Source code in src/agentrelay/workstream/implementations/workstream_integrator.py
def create_integration_pr(
    self, workstream_runtime: WorkstreamRuntime
) -> IntegrationResult:
    """Create a PR from the integration branch to the merge target.

    When the integration branch has no commits ahead of the target,
    the PR is skipped and the workstream transitions directly to
    ``MERGED``.  In this case, the integrator is the authoritative
    source for the target branch SHA (nothing changed).

    Args:
        workstream_runtime: Workstream runtime whose integration branch
            should be submitted as a PR.

    Returns:
        IntegrationResult: Whether the PR was skipped and the
        authoritative target branch SHA when skipped.
    """
    spec = workstream_runtime.spec
    branch_name = workstream_runtime.state.branch_name
    assert branch_name is not None, "branch_name must be set before integration"

    ahead = git.rev_list_count(
        self.repo_path, spec.merge_target_branch, branch_name
    )
    if ahead == 0:
        target_sha = git.rev_parse(self.repo_path, spec.merge_target_branch)
        workstream_runtime.mark_merged()
        return IntegrationResult(
            skipped=True, target_branch_authoritative_sha=target_sha
        )

    body = _build_pr_body(workstream_runtime)

    pr_url = gh.pr_create(
        self.repo_path,
        title=f"Integrate workstream {spec.id}",
        body=body,
        base=spec.merge_target_branch,
        head=branch_name,
    )

    workstream_runtime.mark_pr_created(pr_url)
    return IntegrationResult(skipped=False)

GitWorkstreamPreparer dataclass

Provision a worktree and integration branch for a workstream lane.

Creates a git worktree rooted at <repo_path>/.worktrees/<graph_name>/<workstream_id> with a new integration branch agentrelay/<graph_name>/<workstream_id>/integration off the workstream's base_branch. Pushes the integration branch to origin so that task PRs can target it.
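
The naming scheme is purely a function of the repository path, graph name, and workstream ID. A minimal sketch follows; the helper functions and the example values (`/repo`, `demo`, `A`, `/runs/r1`) are hypothetical. `PurePosixPath` is used only to keep the printed separators stable across platforms.

```python
from pathlib import PurePosixPath


def integration_branch(graph_name: str, workstream_id: str) -> str:
    return f"agentrelay/{graph_name}/{workstream_id}/integration"


def worktree_path(
    repo_path: PurePosixPath, graph_name: str, workstream_id: str
) -> PurePosixPath:
    return repo_path / ".worktrees" / graph_name / workstream_id


def signal_dir(run_dir: PurePosixPath, workstream_id: str) -> PurePosixPath:
    # Signal files live under the run directory, not inside the worktree.
    return run_dir / "workstreams" / workstream_id


branch = integration_branch("demo", "A")
worktree = worktree_path(PurePosixPath("/repo"), "demo", "A")
signals = signal_dir(PurePosixPath("/runs/r1"), "A")
```

Because the worktree path is deterministic, a later run can detect an existing worktree at the same location and treat it as a resume.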

Source code in src/agentrelay/workstream/implementations/workstream_preparer.py
@dataclass
class GitWorkstreamPreparer:
    """Provision a worktree and integration branch for a workstream lane.

    Creates a git worktree rooted at
    ``<repo_path>/.worktrees/<graph_name>/<workstream_id>`` with a new
    integration branch
    ``agentrelay/<graph_name>/<workstream_id>/integration`` off the
    workstream's ``base_branch``. Pushes the integration branch to origin
    so that task PRs can target it.
    """

    repo_path: Path
    graph_name: str
    run_dir: Path

    def prepare_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Provision worktree and integration branch for this workstream.

        Args:
            workstream_runtime: Workstream runtime to provision.
        """
        spec = workstream_runtime.spec
        branch_name = f"agentrelay/{self.graph_name}/{spec.id}/integration"
        worktree_path = self.repo_path / ".worktrees" / self.graph_name / spec.id

        if worktree_path.is_dir():
            # Resume scenario: worktree already exists from a prior run.
            # Skip git operations — the integration branch and config are
            # already in place.
            pass
        else:
            try:
                # Ensure the base branch is current before creating the
                # worktree.  Integration PR merges happen on the remote; the
                # local ref may be stale unless we fetch + update before
                # branching from it.
                git.fetch_branch(self.repo_path, spec.base_branch)
                git.update_local_ref(
                    self.repo_path,
                    spec.base_branch,
                    f"origin/{spec.base_branch}",
                )

                # Check if the integration branch already exists (e.g.,
                # after reset-to rolled back the branch but didn't delete
                # it, and the prior run's teardown removed the worktree).
                branch_exists = False
                try:
                    git.rev_parse(self.repo_path, branch_name)
                    branch_exists = True
                except subprocess.CalledProcessError:
                    pass

                if branch_exists:
                    # Reuse existing branch — create worktree without -b.
                    git.worktree_add_existing(
                        self.repo_path, worktree_path, branch_name
                    )
                else:
                    git.worktree_add(
                        self.repo_path,
                        worktree_path,
                        branch_name,
                        spec.base_branch,
                    )
                    git.push_branch(self.repo_path, branch_name, set_upstream=True)
                git.set_config(worktree_path, "push.autoSetupRemote", "true")
            except subprocess.CalledProcessError as exc:
                raise _WorkspaceIntegrationError(
                    f"Failed to provision workstream {spec.id!r}: {exc}",
                ) from exc

        signal_dir = self.run_dir / "workstreams" / spec.id
        workstream_runtime.state.signal_dir = signal_dir
        workstream_runtime.state.worktree_path = worktree_path
        workstream_runtime.state.branch_name = branch_name
        workstream_runtime.mark_pending()

prepare_workstream(workstream_runtime)

Provision worktree and integration branch for this workstream.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime to provision.

required

Source code in src/agentrelay/workstream/implementations/workstream_preparer.py
def prepare_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Provision worktree and integration branch for this workstream.

    Args:
        workstream_runtime: Workstream runtime to provision.
    """
    spec = workstream_runtime.spec
    branch_name = f"agentrelay/{self.graph_name}/{spec.id}/integration"
    worktree_path = self.repo_path / ".worktrees" / self.graph_name / spec.id

    if worktree_path.is_dir():
        # Resume scenario: worktree already exists from a prior run.
        # Skip git operations — the integration branch and config are
        # already in place.
        pass
    else:
        try:
            # Ensure the base branch is current before creating the
            # worktree.  Integration PR merges happen on the remote; the
            # local ref may be stale unless we fetch + update before
            # branching from it.
            git.fetch_branch(self.repo_path, spec.base_branch)
            git.update_local_ref(
                self.repo_path,
                spec.base_branch,
                f"origin/{spec.base_branch}",
            )

            # Check if the integration branch already exists (e.g.,
            # after reset-to rolled back the branch but didn't delete
            # it, and the prior run's teardown removed the worktree).
            branch_exists = False
            try:
                git.rev_parse(self.repo_path, branch_name)
                branch_exists = True
            except subprocess.CalledProcessError:
                pass

            if branch_exists:
                # Reuse existing branch — create worktree without -b.
                git.worktree_add_existing(
                    self.repo_path, worktree_path, branch_name
                )
            else:
                git.worktree_add(
                    self.repo_path,
                    worktree_path,
                    branch_name,
                    spec.base_branch,
                )
                git.push_branch(self.repo_path, branch_name, set_upstream=True)
            git.set_config(worktree_path, "push.autoSetupRemote", "true")
        except subprocess.CalledProcessError as exc:
            raise _WorkspaceIntegrationError(
                f"Failed to provision workstream {spec.id!r}: {exc}",
            ) from exc

    signal_dir = self.run_dir / "workstreams" / spec.id
    workstream_runtime.state.signal_dir = signal_dir
    workstream_runtime.state.worktree_path = worktree_path
    workstream_runtime.state.branch_name = branch_name
    workstream_runtime.mark_pending()

GitWorkstreamTeardown dataclass

Clean up workstream worktree and integration branch.

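Performs best-effort cleanup: removes the worktree directory, deletes the local integration branch, and deletes the remote integration branch. A subprocess.CalledProcessError raised by any step is silently caught so that subsequent cleanup steps still execute. Branch deletion is skipped while the workstream status is PR_CREATED, because deleting the branch would close the open integration PR.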
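
The best-effort pattern wraps each step in its own try/except so one failure cannot prevent the next step from running. The sketch below uses a hypothetical `best_effort` helper and fake steps. Unlike the real class, which discards failures, it returns the names of failed steps to make the behavior observable.

```python
import subprocess
from typing import Callable, List, Tuple

log: List[str] = []


def remove_worktree() -> None:
    # Simulate a worktree that is already gone.
    raise subprocess.CalledProcessError(1, ["git", "worktree", "remove"])


def delete_local_branch() -> None:
    log.append("local")


def delete_remote_branch() -> None:
    log.append("remote")


def best_effort(steps: List[Tuple[str, Callable[[], None]]]) -> List[str]:
    """Run every step in order; return the names of the steps that failed."""
    failed: List[str] = []
    for name, step in steps:
        try:
            step()
        except subprocess.CalledProcessError:
            failed.append(name)  # keep going with the remaining steps
    return failed


failed_steps = best_effort(
    [
        ("worktree", remove_worktree),
        ("local", delete_local_branch),
        ("remote", delete_remote_branch),
    ]
)
```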

Source code in src/agentrelay/workstream/implementations/workstream_teardown.py
@dataclass
class GitWorkstreamTeardown:
    """Clean up workstream worktree and integration branch.

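    Performs best-effort cleanup: removes the worktree directory, deletes
    the local integration branch, and deletes the remote integration branch.
    A ``subprocess.CalledProcessError`` raised by any step is silently caught
    so that subsequent cleanup steps still execute. Branch deletion is
    skipped while the workstream status is ``PR_CREATED``, because deleting
    the branch would close the open integration PR.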
    """

    repo_path: Path

    def teardown_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Delete worktree and integration branch for this workstream.

        Args:
            workstream_runtime: Workstream runtime whose resources should be
                cleaned up.
        """
        worktree_path = workstream_runtime.state.worktree_path
        branch_name = workstream_runtime.state.branch_name

        if worktree_path is not None:
            try:
                git.worktree_remove(self.repo_path, worktree_path)
            except subprocess.CalledProcessError:
                pass  # Best-effort: worktree may already be gone

        # Skip branch deletion if the integration PR is open and awaiting
        # human merge — deleting the branch would auto-close the PR.
        if workstream_runtime.status == WorkstreamStatus.PR_CREATED:
            return

        if branch_name is not None:
            try:
                git.branch_delete(self.repo_path, branch_name)
            except subprocess.CalledProcessError:
                pass  # Best-effort: branch may already be deleted

            try:
                git.push_delete_branch(self.repo_path, branch_name)
            except subprocess.CalledProcessError:
                pass  # Best-effort: remote branch may already be gone
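
The best-effort pattern used in `teardown_workstream` can be isolated as a small loop that runs every step and swallows only `subprocess.CalledProcessError`. This is a minimal sketch under stated assumptions: `run_best_effort` and `failing_step` are hypothetical names, and unlike the class above the sketch collects the errors so a caller could log them.

```python
import subprocess
from typing import Callable, Iterable


def run_best_effort(
    steps: Iterable[Callable[[], None]],
) -> list[subprocess.CalledProcessError]:
    """Run every step in order, collecting CalledProcessError instead of raising.

    Hypothetical helper for illustration; not part of agentrelay.
    """
    failures: list[subprocess.CalledProcessError] = []
    for step in steps:
        try:
            step()
        except subprocess.CalledProcessError as exc:
            # Record the failure and keep going so later steps still execute.
            failures.append(exc)
    return failures


def failing_step() -> None:
    # Simulates a git command that exits non-zero, e.g. deleting a branch
    # that is already gone.
    raise subprocess.CalledProcessError(1, ["git", "branch", "-D", "gone"])


calls: list[str] = []
failures = run_best_effort([failing_step, lambda: calls.append("remote")])
```

Any other exception type still propagates, which matches the narrow `except subprocess.CalledProcessError` clauses in the source listing.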

teardown_workstream(workstream_runtime)

Delete worktree and integration branch for this workstream.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| workstream_runtime | WorkstreamRuntime | Workstream runtime whose resources should be cleaned up. | required |

Source code in src/agentrelay/workstream/implementations/workstream_teardown.py
def teardown_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Delete worktree and integration branch for this workstream.

    Args:
        workstream_runtime: Workstream runtime whose resources should be
            cleaned up.
    """
    worktree_path = workstream_runtime.state.worktree_path
    branch_name = workstream_runtime.state.branch_name

    if worktree_path is not None:
        try:
            git.worktree_remove(self.repo_path, worktree_path)
        except subprocess.CalledProcessError:
            pass  # Best-effort: worktree may already be gone

    # Skip branch deletion if the integration PR is open and awaiting
    # human merge — deleting the branch would auto-close the PR.
    if workstream_runtime.status == WorkstreamStatus.PR_CREATED:
        return

    if branch_name is not None:
        try:
            git.branch_delete(self.repo_path, branch_name)
        except subprocess.CalledProcessError:
            pass  # Best-effort: branch may already be deleted

        try:
            git.push_delete_branch(self.repo_path, branch_name)
        except subprocess.CalledProcessError:
            pass  # Best-effort: remote branch may already be gone