Skip to content

workstream

Workstream model for lane-level execution and integration state.

This package defines immutable workstream specifications, mutable runtime state for each workstream lane, and the workstream-level lifecycle runner with its I/O protocols.

Subpackages

core: Specs, runtime state, protocols, and runner. implementations: Concrete protocol implementations.

WorkstreamIntegrator

Bases: Protocol

Create integration PR for a workstream lane.

Source code in src/agentrelay/workstream/core/io.py
33
34
35
36
37
38
39
40
41
42
43
44
@runtime_checkable
class WorkstreamIntegrator(Protocol):
    """Create integration PR for a workstream lane."""

    def create_integration_pr(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Create a PR from the integration branch to the merge target.

        Args:
            workstream_runtime: Workstream runtime whose integration branch
                should be submitted as a PR.
        """
        ...

create_integration_pr(workstream_runtime)

Create a PR from the integration branch to the merge target.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime whose integration branch should be submitted as a PR.

required
Source code in src/agentrelay/workstream/core/io.py
37
38
39
40
41
42
43
44
def create_integration_pr(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Create a PR from the integration branch to the merge target.

    Args:
        workstream_runtime: Workstream runtime whose integration branch
            should be submitted as a PR.
    """
    ...

WorkstreamPreparer

Bases: Protocol

Provision workspace infrastructure for a workstream lane.

Source code in src/agentrelay/workstream/core/io.py
20
21
22
23
24
25
26
27
28
29
30
@runtime_checkable
class WorkstreamPreparer(Protocol):
    """Provision workspace infrastructure for a workstream lane."""

    def prepare_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Provision worktree and integration branch for this workstream.

        Args:
            workstream_runtime: Workstream runtime to provision.
        """
        ...

prepare_workstream(workstream_runtime)

Provision worktree and integration branch for this workstream.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime to provision.

required
Source code in src/agentrelay/workstream/core/io.py
24
25
26
27
28
29
30
def prepare_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Provision worktree and integration branch for this workstream.

    Args:
        workstream_runtime: Workstream runtime to provision.
    """
    ...

WorkstreamTeardown

Bases: Protocol

Clean up workstream workspace infrastructure.

Source code in src/agentrelay/workstream/core/io.py
47
48
49
50
51
52
53
54
55
56
57
58
@runtime_checkable
class WorkstreamTeardown(Protocol):
    """Clean up workstream workspace infrastructure."""

    def teardown_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Delete worktree and integration branch for this workstream.

        Args:
            workstream_runtime: Workstream runtime whose resources should be
                cleaned up.
        """
        ...

teardown_workstream(workstream_runtime)

Delete worktree and integration branch for this workstream.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime whose resources should be cleaned up.

required
Source code in src/agentrelay/workstream/core/io.py
51
52
53
54
55
56
57
58
def teardown_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Delete worktree and integration branch for this workstream.

    Args:
        workstream_runtime: Workstream runtime whose resources should be
            cleaned up.
    """
    ...

StandardWorkstreamRunner dataclass

Standard workstream lifecycle runner.

Drives workstream-level operations (prepare, integrate, teardown) by delegating to per-step protocol implementations. The orchestrator calls these methods at appropriate scheduling points.

Attributes:

Name Type Description
_preparer WorkstreamPreparer

Provision worktree and integration branch.

_integrator WorkstreamIntegrator

Create integration PR.

_teardown WorkstreamTeardown

Clean up worktree and integration branch.

Source code in src/agentrelay/workstream/core/runner.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@dataclass
class StandardWorkstreamRunner:
    """Standard workstream lifecycle runner.

    Drives workstream-level operations (prepare, integrate, teardown) by
    delegating to per-step protocol implementations. The orchestrator
    calls these methods at appropriate scheduling points.

    Attributes:
        _preparer: Provision worktree and integration branch.
        _integrator: Create integration PR.
        _teardown: Clean up worktree and integration branch.
    """

    _preparer: WorkstreamPreparer
    _integrator: WorkstreamIntegrator
    _teardown: WorkstreamTeardown

    def prepare(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Provision workspace infrastructure for a workstream.

        Creates the worktree and integration branch. Transitions from
        ``PENDING`` to ``ACTIVE`` on success, or ``FAILED`` on error.

        Args:
            workstream_runtime: Workstream runtime to provision.
        """
        try:
            self._preparer.prepare_workstream(workstream_runtime)
        except Exception as exc:
            workstream_runtime.mark_failed(f"{type(exc).__name__}: {exc}")
            raise
        workstream_runtime.mark_active()

    def integrate(self, workstream_runtime: WorkstreamRuntime) -> WorkstreamRunResult:
        """Create the integration PR for the workstream.

        Transitions to ``PR_CREATED`` on success, or ``FAILED`` on error.

        Args:
            workstream_runtime: Workstream runtime to integrate.

        Returns:
            WorkstreamRunResult: Snapshot of state after the operation.
        """
        try:
            self._integrator.create_integration_pr(workstream_runtime)
        except Exception as exc:
            workstream_runtime.mark_failed(f"{type(exc).__name__}: {exc}")
            return WorkstreamRunResult.from_runtime(workstream_runtime)

        return WorkstreamRunResult.from_runtime(workstream_runtime)

    def teardown(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Clean up workstream workspace infrastructure.

        Deletes the worktree and integration branch. Failures are recorded
        as concerns rather than changing workstream status.

        Args:
            workstream_runtime: Workstream runtime to tear down.
        """
        try:
            self._teardown.teardown_workstream(workstream_runtime)
        except Exception as exc:
            workstream_runtime.artifacts.concerns.append(f"teardown_failed: {exc}")

prepare(workstream_runtime)

Provision workspace infrastructure for a workstream.

Creates the worktree and integration branch. Transitions from PENDING to ACTIVE on success, or FAILED on error.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime to provision.

required
Source code in src/agentrelay/workstream/core/runner.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def prepare(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Provision workspace infrastructure for a workstream.

    Creates the worktree and integration branch. Transitions from
    ``PENDING`` to ``ACTIVE`` on success, or ``FAILED`` on error.

    Args:
        workstream_runtime: Workstream runtime to provision.
    """
    try:
        self._preparer.prepare_workstream(workstream_runtime)
    except Exception as exc:
        workstream_runtime.mark_failed(f"{type(exc).__name__}: {exc}")
        raise
    workstream_runtime.mark_active()

integrate(workstream_runtime)

Create the integration PR for the workstream.

Transitions to PR_CREATED on success, or FAILED on error.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime to integrate.

required

Returns:

Name Type Description
WorkstreamRunResult WorkstreamRunResult

Snapshot of state after the operation.

Source code in src/agentrelay/workstream/core/runner.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def integrate(self, workstream_runtime: WorkstreamRuntime) -> WorkstreamRunResult:
    """Create the integration PR for the workstream.

    Transitions to ``PR_CREATED`` on success, or ``FAILED`` on error.

    Args:
        workstream_runtime: Workstream runtime to integrate.

    Returns:
        WorkstreamRunResult: Snapshot of state after the operation.
    """
    try:
        self._integrator.create_integration_pr(workstream_runtime)
    except Exception as exc:
        workstream_runtime.mark_failed(f"{type(exc).__name__}: {exc}")
        return WorkstreamRunResult.from_runtime(workstream_runtime)

    return WorkstreamRunResult.from_runtime(workstream_runtime)

teardown(workstream_runtime)

Clean up workstream workspace infrastructure.

Deletes the worktree and integration branch. Failures are recorded as concerns rather than changing workstream status.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime to tear down.

required
Source code in src/agentrelay/workstream/core/runner.py
132
133
134
135
136
137
138
139
140
141
142
143
144
def teardown(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Clean up workstream workspace infrastructure.

    Deletes the worktree and integration branch. Failures are recorded
    as concerns rather than changing workstream status.

    Args:
        workstream_runtime: Workstream runtime to tear down.
    """
    try:
        self._teardown.teardown_workstream(workstream_runtime)
    except Exception as exc:
        workstream_runtime.artifacts.concerns.append(f"teardown_failed: {exc}")

WorkstreamRunner

Bases: Protocol

Protocol for the workstream runner boundary used by Orchestrator.

Different lifecycle variants (standard, dry-run) are different classes satisfying this protocol. The orchestrator does not know or care about internal step structure.

Source code in src/agentrelay/workstream/core/runner.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@runtime_checkable
class WorkstreamRunner(Protocol):
    """Protocol for the workstream runner boundary used by Orchestrator.

    Different lifecycle variants (standard, dry-run) are different classes
    satisfying this protocol. The orchestrator does not know or care about
    internal step structure.
    """

    def prepare(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Provision workspace infrastructure for a workstream."""
        ...

    def integrate(self, workstream_runtime: WorkstreamRuntime) -> WorkstreamRunResult:
        """Create the integration PR for the workstream."""
        ...

    def teardown(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Clean up workstream workspace infrastructure."""
        ...

prepare(workstream_runtime)

Provision workspace infrastructure for a workstream.

Source code in src/agentrelay/workstream/core/runner.py
66
67
68
def prepare(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Provision workspace infrastructure for a workstream."""
    ...

integrate(workstream_runtime)

Create the integration PR for the workstream.

Source code in src/agentrelay/workstream/core/runner.py
70
71
72
def integrate(self, workstream_runtime: WorkstreamRuntime) -> WorkstreamRunResult:
    """Create the integration PR for the workstream."""
    ...

teardown(workstream_runtime)

Clean up workstream workspace infrastructure.

Source code in src/agentrelay/workstream/core/runner.py
74
75
76
def teardown(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Clean up workstream workspace infrastructure."""
    ...

WorkstreamRunResult dataclass

Snapshot of workstream state after a lifecycle operation.

Attributes:

Name Type Description
workstream_id str

Workstream identifier.

status WorkstreamStatus

Current workstream status after the operation.

error Optional[str]

Error message if the operation failed, or None.

Source code in src/agentrelay/workstream/core/runner.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@dataclass(frozen=True)
class WorkstreamRunResult:
    """Snapshot of workstream state after a lifecycle operation.

    Attributes:
        workstream_id: Workstream identifier.
        status: Current workstream status after the operation.
        error: Error message if the operation failed, or ``None``.
    """

    workstream_id: str
    status: WorkstreamStatus
    error: Optional[str]

    @classmethod
    def from_runtime(cls, runtime: WorkstreamRuntime) -> WorkstreamRunResult:
        """Build a result snapshot from the current workstream runtime state.

        Args:
            runtime: Workstream runtime to snapshot.

        Returns:
            WorkstreamRunResult: Snapshot of workstream ID, status, and error.
        """
        return cls(
            workstream_id=runtime.spec.id,
            status=runtime.status,
            error=runtime.state.error,
        )

from_runtime(runtime) classmethod

Build a result snapshot from the current workstream runtime state.

Parameters:

Name Type Description Default
runtime WorkstreamRuntime

Workstream runtime to snapshot.

required

Returns:

Name Type Description
WorkstreamRunResult WorkstreamRunResult

Snapshot of workstream ID, status, and error.

Source code in src/agentrelay/workstream/core/runner.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@classmethod
def from_runtime(cls, runtime: WorkstreamRuntime) -> WorkstreamRunResult:
    """Build a result snapshot from the current workstream runtime state.

    Args:
        runtime: Workstream runtime to snapshot.

    Returns:
        WorkstreamRunResult: Snapshot of workstream ID, status, and error.
    """
    return cls(
        workstream_id=runtime.spec.id,
        status=runtime.status,
        error=runtime.state.error,
    )

TaskSummary dataclass

Per-task summary included in integration PR body.

Attributes:

Name Type Description
task_id str

Unique task identifier.

description Optional[str]

Human-readable task description, or None.

pr_url Optional[str]

URL of the task's pull request, or None.

concerns tuple[str, ...]

Design concerns recorded by the agent.

Source code in src/agentrelay/workstream/core/runtime.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@dataclass(frozen=True)
class TaskSummary:
    """Per-task summary included in integration PR body.

    Attributes:
        task_id: Unique task identifier.
        description: Human-readable task description, or ``None``.
        pr_url: URL of the task's pull request, or ``None``.
        concerns: Design concerns recorded by the agent.
    """

    task_id: str
    description: Optional[str] = None
    pr_url: Optional[str] = None
    concerns: tuple[str, ...] = ()

WorkstreamArtifacts dataclass

Outputs and observations produced while advancing a workstream.

Attributes:

Name Type Description
merge_pr_url Optional[str]

URL of the workstream integration PR, or None if absent.

concerns list[str]

List of notable observations or concerns for this lane.

task_summaries list[TaskSummary]

Per-task summaries for integration PR body.

Source code in src/agentrelay/workstream/core/runtime.py
114
115
116
117
118
119
120
121
122
123
124
125
126
@dataclass
class WorkstreamArtifacts:
    """Outputs and observations produced while advancing a workstream.

    Attributes:
        merge_pr_url: URL of the workstream integration PR, or ``None`` if absent.
        concerns: List of notable observations or concerns for this lane.
        task_summaries: Per-task summaries for integration PR body.
    """

    merge_pr_url: Optional[str] = None
    concerns: list[str] = field(default_factory=list)
    task_summaries: list[TaskSummary] = field(default_factory=list)

WorkstreamRuntime dataclass

Mutable runtime envelope for a :class:WorkstreamSpec.

Attributes:

Name Type Description
spec WorkstreamSpec

Immutable workstream specification for this lane.

state WorkstreamState

Mutable lane operational state.

artifacts WorkstreamArtifacts

Mutable lane output artifacts.

Source code in src/agentrelay/workstream/core/runtime.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
@dataclass
class WorkstreamRuntime:
    """Mutable runtime envelope for a :class:`WorkstreamSpec`.

    Attributes:
        spec: Immutable workstream specification for this lane.
        state: Mutable lane operational state.
        artifacts: Mutable lane output artifacts.
    """

    spec: WorkstreamSpec
    state: WorkstreamState = field(default_factory=WorkstreamState)
    artifacts: WorkstreamArtifacts = field(default_factory=WorkstreamArtifacts)

    @property
    def status(self) -> WorkstreamStatus:
        """Current workstream status, derived from signal files on disk.

        Falls back to ``PENDING`` if no signal directory has been set,
        unless an error has been recorded (indicating failure before
        provisioning), in which case ``FAILED`` is returned.
        """
        if self.state.signal_dir is None:
            if self.state.error is not None:
                return WorkstreamStatus.FAILED
            return WorkstreamStatus.PENDING
        return _read_status_from_signals(self.state.signal_dir)

    def _write_signal(self, name: str, content: str = "") -> None:
        """Write a signal file to the workstream signal directory."""
        assert self.state.signal_dir is not None, "signal_dir must be set"
        self.state.signal_dir.mkdir(parents=True, exist_ok=True)
        (self.state.signal_dir / name).write_text(content)

    def mark_pending(self) -> None:
        """Write the ``pending`` signal file."""
        self._write_signal("pending")

    def mark_active(self) -> None:
        """Write the ``active`` signal file."""
        self._write_signal("active")

    def mark_merge_ready(self) -> None:
        """Write the ``merge_ready`` signal file."""
        self._write_signal("merge_ready")

    def mark_pr_created(self, pr_url: str) -> None:
        """Write the ``pr_created`` signal file with the PR URL."""
        self._write_signal("pr_created", pr_url)
        self.artifacts.merge_pr_url = pr_url

    def mark_merged(self) -> None:
        """Write the ``merged`` signal file."""
        self._write_signal("merged")

    def mark_failed(self, error: str) -> None:
        """Write the ``failed`` signal file with the error message.

        If ``signal_dir`` is not set (workstream was never prepared),
        only the in-memory error is recorded without writing to disk.
        """
        if self.state.signal_dir is not None:
            self._write_signal("failed", error)
        self.state.error = error

status property

Current workstream status, derived from signal files on disk.

Falls back to PENDING if no signal directory has been set, unless an error has been recorded (indicating failure before provisioning), in which case FAILED is returned.

mark_pending()

Write the pending signal file.

Source code in src/agentrelay/workstream/core/runtime.py
163
164
165
def mark_pending(self) -> None:
    """Write the ``pending`` signal file."""
    self._write_signal("pending")

mark_active()

Write the active signal file.

Source code in src/agentrelay/workstream/core/runtime.py
167
168
169
def mark_active(self) -> None:
    """Write the ``active`` signal file."""
    self._write_signal("active")

mark_merge_ready()

Write the merge_ready signal file.

Source code in src/agentrelay/workstream/core/runtime.py
171
172
173
def mark_merge_ready(self) -> None:
    """Write the ``merge_ready`` signal file."""
    self._write_signal("merge_ready")

mark_pr_created(pr_url)

Write the pr_created signal file with the PR URL.

Source code in src/agentrelay/workstream/core/runtime.py
175
176
177
178
def mark_pr_created(self, pr_url: str) -> None:
    """Write the ``pr_created`` signal file with the PR URL."""
    self._write_signal("pr_created", pr_url)
    self.artifacts.merge_pr_url = pr_url

mark_merged()

Write the merged signal file.

Source code in src/agentrelay/workstream/core/runtime.py
180
181
182
def mark_merged(self) -> None:
    """Write the ``merged`` signal file."""
    self._write_signal("merged")

mark_failed(error)

Write the failed signal file with the error message.

If signal_dir is not set (workstream was never prepared), only the in-memory error is recorded without writing to disk.

Source code in src/agentrelay/workstream/core/runtime.py
184
185
186
187
188
189
190
191
192
def mark_failed(self, error: str) -> None:
    """Write the ``failed`` signal file with the error message.

    If ``signal_dir`` is not set (workstream was never prepared),
    only the in-memory error is recorded without writing to disk.
    """
    if self.state.signal_dir is not None:
        self._write_signal("failed", error)
    self.state.error = error

WorkstreamState dataclass

Mutable operational state of a workstream lane.

Attributes:

Name Type Description
signal_dir Optional[Path]

Path to the workstream signal directory for status files, or None if not provisioned yet.

worktree_path Optional[Path]

Filesystem path to the worktree used for this lane, or None if not provisioned yet.

branch_name Optional[str]

Primary branch name used for this lane, or None until set.

error Optional[str]

Error message if lane execution failed, else None.

Source code in src/agentrelay/workstream/core/runtime.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
@dataclass
class WorkstreamState:
    """Mutable operational state of a workstream lane.

    Attributes:
        signal_dir: Path to the workstream signal directory for status files,
            or ``None`` if not provisioned yet.
        worktree_path: Filesystem path to the worktree used for this lane, or
            ``None`` if not provisioned yet.
        branch_name: Primary branch name used for this lane, or ``None`` until set.
        error: Error message if lane execution failed, else ``None``.
    """

    signal_dir: Optional[Path] = None
    worktree_path: Optional[Path] = None
    branch_name: Optional[str] = None
    error: Optional[str] = None

WorkstreamStatus

Bases: str, Enum

Execution state of a workstream during orchestration.

Attributes:

Name Type Description
PENDING

Workstream has not started provisioning/execution.

ACTIVE

Workstream is currently active (for example has running tasks).

MERGE_READY

All tasks completed; workstream is ready for integration.

PR_CREATED

Integration PR has been created, awaiting merge.

MERGED

Integration PR has been merged into the target branch.

FAILED

Workstream execution failed.

Source code in src/agentrelay/workstream/core/runtime.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class WorkstreamStatus(str, Enum):
    """Execution state of a workstream during orchestration.

    Attributes:
        PENDING: Workstream has not started provisioning/execution.
        ACTIVE: Workstream is currently active (for example has running tasks).
        MERGE_READY: All tasks completed; workstream is ready for integration.
        PR_CREATED: Integration PR has been created, awaiting merge.
        MERGED: Integration PR has been merged into the target branch.
        FAILED: Workstream execution failed.
    """

    PENDING = "pending"
    ACTIVE = "active"
    MERGE_READY = "merge_ready"
    PR_CREATED = "pr_created"
    MERGED = "merged"
    FAILED = "failed"

WorkstreamSpec dataclass

Immutable specification for a task workstream.

A workstream models a branch/worktree execution lane that one or more tasks can target. Parent/child workstream relationships are defined at the spec level and validated by graph-building layers.

Attributes:

Name Type Description
id str

Unique workstream identifier within a task graph.

parent_workstream_id Optional[str]

Optional parent workstream ID for hierarchical stream topologies (for example A -> A.1 and A.2).

base_branch str

Branch name used as the base when creating workstream integration branches or worktrees. Defaults to "main".

merge_target_branch str

Branch name the workstream ultimately merges into. Defaults to "main".

Source code in src/agentrelay/workstream/core/workstream.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@dataclass(frozen=True)
class WorkstreamSpec:
    """Immutable specification for a task workstream.

    A workstream models a branch/worktree execution lane that one or more tasks
    can target. Parent/child workstream relationships are defined at the spec
    level and validated by graph-building layers.

    Attributes:
        id: Unique workstream identifier within a task graph.
        parent_workstream_id: Optional parent workstream ID for hierarchical
            stream topologies (for example ``A`` -> ``A.1`` and ``A.2``).
        base_branch: Branch name used as the base when creating workstream
            integration branches or worktrees. Defaults to ``"main"``.
        merge_target_branch: Branch name the workstream ultimately merges into.
            Defaults to ``"main"``.
    """

    id: str
    parent_workstream_id: Optional[str] = None
    base_branch: str = "main"
    merge_target_branch: str = "main"

GhWorkstreamIntegrator dataclass

Create the workstream integration PR via GitHub CLI.

Creates a pull request from the workstream's integration branch into its merge_target_branch. Does NOT merge the PR — that is left for human review (or a future agent-merge step).

Source code in src/agentrelay/workstream/implementations/workstream_integrator.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@dataclass
class GhWorkstreamIntegrator:
    """Create the workstream integration PR via GitHub CLI.

    Creates a pull request from the workstream's integration branch into
    its ``merge_target_branch``. Does NOT merge the PR — that is left
    for human review (or a future agent-merge step).
    """

    repo_path: Path

    def create_integration_pr(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Create a PR from the integration branch to the merge target.

        Args:
            workstream_runtime: Workstream runtime whose integration branch
                should be submitted as a PR.
        """
        spec = workstream_runtime.spec
        branch_name = workstream_runtime.state.branch_name
        assert branch_name is not None, "branch_name must be set before integration"

        body = _build_pr_body(workstream_runtime)

        pr_url = gh.pr_create(
            self.repo_path,
            title=f"Integrate workstream {spec.id}",
            body=body,
            base=spec.merge_target_branch,
            head=branch_name,
        )

        workstream_runtime.mark_pr_created(pr_url)

create_integration_pr(workstream_runtime)

Create a PR from the integration branch to the merge target.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime whose integration branch should be submitted as a PR.

required
Source code in src/agentrelay/workstream/implementations/workstream_integrator.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def create_integration_pr(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Create a PR from the integration branch to the merge target.

    Args:
        workstream_runtime: Workstream runtime whose integration branch
            should be submitted as a PR.
    """
    spec = workstream_runtime.spec
    branch_name = workstream_runtime.state.branch_name
    assert branch_name is not None, "branch_name must be set before integration"

    body = _build_pr_body(workstream_runtime)

    pr_url = gh.pr_create(
        self.repo_path,
        title=f"Integrate workstream {spec.id}",
        body=body,
        base=spec.merge_target_branch,
        head=branch_name,
    )

    workstream_runtime.mark_pr_created(pr_url)

GitWorkstreamPreparer dataclass

Provision a worktree and integration branch for a workstream lane.

Creates a git worktree rooted at <repo_path>/.worktrees/<graph_name>/<workstream_id> with a new integration branch agentrelay/<graph_name>/<workstream_id>/integration off the workstream's base_branch. Pushes the integration branch to origin so that task PRs can target it.

Source code in src/agentrelay/workstream/implementations/workstream_preparer.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@dataclass
class GitWorkstreamPreparer:
    """Provision a worktree and integration branch for a workstream lane.

    Creates a git worktree rooted at
    ``<repo_path>/.worktrees/<graph_name>/<workstream_id>`` with a new
    integration branch
    ``agentrelay/<graph_name>/<workstream_id>/integration`` off the
    workstream's ``base_branch``. Pushes the integration branch to origin
    so that task PRs can target it.
    """

    repo_path: Path
    graph_name: str

    def prepare_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Provision worktree and integration branch for this workstream.

        Args:
            workstream_runtime: Workstream runtime to provision.
        """
        spec = workstream_runtime.spec
        branch_name = f"agentrelay/{self.graph_name}/{spec.id}/integration"
        worktree_path = self.repo_path / ".worktrees" / self.graph_name / spec.id

        try:
            git.worktree_add(
                self.repo_path, worktree_path, branch_name, spec.base_branch
            )
            git.push_branch(self.repo_path, branch_name, set_upstream=True)
        except subprocess.CalledProcessError as exc:
            raise _WorkspaceIntegrationError(
                f"Failed to provision workstream {spec.id!r}: {exc}",
            ) from exc

        signal_dir = (
            self.repo_path / ".workflow" / self.graph_name / "workstreams" / spec.id
        )
        workstream_runtime.state.signal_dir = signal_dir
        workstream_runtime.state.worktree_path = worktree_path
        workstream_runtime.state.branch_name = branch_name
        workstream_runtime.mark_pending()

prepare_workstream(workstream_runtime)

Provision worktree and integration branch for this workstream.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime to provision.

required
Source code in src/agentrelay/workstream/implementations/workstream_preparer.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def prepare_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Provision worktree and integration branch for this workstream.

    Args:
        workstream_runtime: Workstream runtime to provision.
    """
    spec = workstream_runtime.spec
    branch_name = f"agentrelay/{self.graph_name}/{spec.id}/integration"
    worktree_path = self.repo_path / ".worktrees" / self.graph_name / spec.id

    try:
        git.worktree_add(
            self.repo_path, worktree_path, branch_name, spec.base_branch
        )
        git.push_branch(self.repo_path, branch_name, set_upstream=True)
    except subprocess.CalledProcessError as exc:
        raise _WorkspaceIntegrationError(
            f"Failed to provision workstream {spec.id!r}: {exc}",
        ) from exc

    signal_dir = (
        self.repo_path / ".workflow" / self.graph_name / "workstreams" / spec.id
    )
    workstream_runtime.state.signal_dir = signal_dir
    workstream_runtime.state.worktree_path = worktree_path
    workstream_runtime.state.branch_name = branch_name
    workstream_runtime.mark_pending()

GitWorkstreamTeardown dataclass

Clean up workstream worktree and integration branch.

Performs best-effort cleanup: removes the worktree directory, deletes the local integration branch, and deletes the remote integration branch. Errors during any step are silently caught so that subsequent cleanup steps still execute.

Source code in src/agentrelay/workstream/implementations/workstream_teardown.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@dataclass
class GitWorkstreamTeardown:
    """Clean up workstream worktree and integration branch.

    Performs best-effort cleanup: removes the worktree directory, deletes
    the local integration branch, and deletes the remote integration branch.
    Errors during any step are silently caught so that subsequent cleanup
    steps still execute.
    """

    repo_path: Path

    def teardown_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
        """Delete worktree and integration branch for this workstream.

        Args:
            workstream_runtime: Workstream runtime whose resources should be
                cleaned up.
        """
        worktree_path = workstream_runtime.state.worktree_path
        branch_name = workstream_runtime.state.branch_name

        if worktree_path is not None:
            try:
                git.worktree_remove(self.repo_path, worktree_path)
            except subprocess.CalledProcessError:
                pass  # Best-effort: worktree may already be gone

        # Skip branch deletion if the integration PR is open and awaiting
        # human merge — deleting the branch would auto-close the PR.
        if workstream_runtime.status == WorkstreamStatus.PR_CREATED:
            return

        if branch_name is not None:
            try:
                git.branch_delete(self.repo_path, branch_name)
            except subprocess.CalledProcessError:
                pass  # Best-effort: branch may already be deleted

            try:
                git.push_delete_branch(self.repo_path, branch_name)
            except subprocess.CalledProcessError:
                pass  # Best-effort: remote branch may already be gone

teardown_workstream(workstream_runtime)

Delete worktree and integration branch for this workstream.

Parameters:

Name Type Description Default
workstream_runtime WorkstreamRuntime

Workstream runtime whose resources should be cleaned up.

required
Source code in src/agentrelay/workstream/implementations/workstream_teardown.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def teardown_workstream(self, workstream_runtime: WorkstreamRuntime) -> None:
    """Delete worktree and integration branch for this workstream.

    Args:
        workstream_runtime: Workstream runtime whose resources should be
            cleaned up.
    """
    worktree_path = workstream_runtime.state.worktree_path
    branch_name = workstream_runtime.state.branch_name

    if worktree_path is not None:
        try:
            git.worktree_remove(self.repo_path, worktree_path)
        except subprocess.CalledProcessError:
            pass  # Best-effort: worktree may already be gone

    # Skip branch deletion if the integration PR is open and awaiting
    # human merge — deleting the branch would auto-close the PR.
    if workstream_runtime.status == WorkstreamStatus.PR_CREATED:
        return

    if branch_name is not None:
        try:
            git.branch_delete(self.repo_path, branch_name)
        except subprocess.CalledProcessError:
            pass  # Best-effort: branch may already be deleted

        try:
            git.push_delete_branch(self.repo_path, branch_name)
        except subprocess.CalledProcessError:
            pass  # Best-effort: remote branch may already be gone