Skip to content

task_graph

Task graph model and construction surface.

This package exposes the class-based task-graph API used by orchestration code: an immutable TaskGraph plus TaskGraphBuilder for loading and validating graph definitions. Internal indexing/validation helpers live in private submodules (_indexing, _validation).

TaskGraphBuilder

Builder for constructing `TaskGraph` from validated input specs.

Source code in src/agentrelay/task_graph/builder.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
class TaskGraphBuilder:
    """Builder that assembles validated :class:`TaskGraph` instances."""

    @classmethod
    def from_yaml(cls, path: Path) -> TaskGraph:
        """Load a graph definition from a YAML file and validate it.

        Args:
            path: Location of the graph YAML document.

        Raises:
            ValueError: If YAML cannot be parsed or schema validation fails.

        Returns:
            TaskGraph: Validated immutable task graph.
        """
        try:
            parsed = yaml.safe_load(path.read_text())
        except yaml.YAMLError as exc:
            raise ValueError(f"Invalid YAML in {path}: {exc}") from exc
        return cls.from_dict(parsed)

    @classmethod
    def from_dict(cls, data: Mapping[str, Any] | Any) -> TaskGraph:
        """Assemble a validated :class:`TaskGraph` from parsed mapping data.

        Args:
            data: Parsed graph data mapping.

        Raises:
            ValueError: If the graph schema is invalid.

        Returns:
            TaskGraph: Validated immutable task graph.
        """
        spec = _require_mapping(data, "graph")
        allowed_keys = {"name", "tasks", "workstreams", "max_workstream_depth"}
        _reject_unknown_keys(spec, "graph", allowed_keys)
        graph_name = _require_non_empty_string(
            _read_required(spec, "name", "graph"), "name"
        )
        workstream_specs = _parse_workstreams(
            spec.get("workstreams"),
            "graph.workstreams",
        )
        depth = _parse_optional_positive_int(
            spec.get("max_workstream_depth"),
            "graph.max_workstream_depth",
        )
        # Absent depth falls back to the single-level default.
        depth = 1 if depth is None else depth

        entries = _read_required(spec, "tasks", "graph")
        if not isinstance(entries, list):
            raise _schema_error("tasks", "must be a list")
        if not entries:
            raise _schema_error("tasks", "must contain at least one task")

        parsed_specs: list[_RawTaskSpec] = []
        seen: set[str] = set()
        for position, entry in enumerate(entries):
            entry_path = f"tasks[{position}]"
            parsed = _parse_task(entry, entry_path)
            if parsed.id in seen:
                raise _schema_error(
                    entry_path + ".id", f"duplicate task id '{parsed.id}'"
                )
            seen.add(parsed.id)
            parsed_specs.append(parsed)

        specs_by_id = {item.id: item for item in parsed_specs}
        _validate_dependencies_exist(parsed_specs, specs_by_id)
        ordered_ids = _topological_task_ids(parsed_specs)

        def _to_task(item: _RawTaskSpec) -> Task:
            # Convert one raw spec into its canonical immutable Task.
            return Task(
                id=item.id,
                role=item.role,
                description=item.description,
                paths=item.paths,
                dependencies=item.dependency_ids,
                completion_gate=item.completion_gate,
                max_gate_attempts=item.max_gate_attempts,
                primary_agent=item.primary_agent,
                review=item.review,
                workstream_id=item.workstream_id,
            )

        tasks = {tid: _to_task(specs_by_id[tid]) for tid in ordered_ids}
        return TaskGraph.from_tasks(
            (tasks[tid] for tid in ordered_ids),
            name=graph_name,
            workstreams=workstream_specs,
            max_workstream_depth=depth,
        )

from_yaml(path) classmethod

Parse YAML and build a validated `TaskGraph`.

Parameters:

Name Type Description Default
path Path

Path to a graph YAML file.

required

Raises:

Type Description
ValueError

If YAML cannot be parsed or schema validation fails.

Returns:

Name Type Description
TaskGraph TaskGraph

Validated immutable task graph.

Source code in src/agentrelay/task_graph/builder.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@classmethod
def from_yaml(cls, path: Path) -> TaskGraph:
    """Load a graph definition from a YAML file and validate it.

    Args:
        path: Location of the graph YAML document.

    Raises:
        ValueError: If YAML cannot be parsed or schema validation fails.

    Returns:
        TaskGraph: Validated immutable task graph.
    """
    try:
        parsed = yaml.safe_load(path.read_text())
    except yaml.YAMLError as exc:
        raise ValueError(f"Invalid YAML in {path}: {exc}") from exc
    return cls.from_dict(parsed)

from_dict(data) classmethod

Build a validated `TaskGraph` from mapping data.

Parameters:

Name Type Description Default
data Mapping[str, Any] | Any

Parsed graph data mapping.

required

Raises:

Type Description
ValueError

If the graph schema is invalid.

Returns:

Name Type Description
TaskGraph TaskGraph

Validated immutable task graph.

Source code in src/agentrelay/task_graph/builder.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
@classmethod
def from_dict(cls, data: Mapping[str, Any] | Any) -> TaskGraph:
    """Assemble a validated :class:`TaskGraph` from parsed mapping data.

    Args:
        data: Parsed graph data mapping.

    Raises:
        ValueError: If the graph schema is invalid.

    Returns:
        TaskGraph: Validated immutable task graph.
    """
    spec = _require_mapping(data, "graph")
    allowed_keys = {"name", "tasks", "workstreams", "max_workstream_depth"}
    _reject_unknown_keys(spec, "graph", allowed_keys)
    graph_name = _require_non_empty_string(
        _read_required(spec, "name", "graph"), "name"
    )
    workstream_specs = _parse_workstreams(
        spec.get("workstreams"),
        "graph.workstreams",
    )
    depth = _parse_optional_positive_int(
        spec.get("max_workstream_depth"),
        "graph.max_workstream_depth",
    )
    # Absent depth falls back to the single-level default.
    depth = 1 if depth is None else depth

    entries = _read_required(spec, "tasks", "graph")
    if not isinstance(entries, list):
        raise _schema_error("tasks", "must be a list")
    if not entries:
        raise _schema_error("tasks", "must contain at least one task")

    parsed_specs: list[_RawTaskSpec] = []
    seen: set[str] = set()
    for position, entry in enumerate(entries):
        entry_path = f"tasks[{position}]"
        parsed = _parse_task(entry, entry_path)
        if parsed.id in seen:
            raise _schema_error(
                entry_path + ".id", f"duplicate task id '{parsed.id}'"
            )
        seen.add(parsed.id)
        parsed_specs.append(parsed)

    specs_by_id = {item.id: item for item in parsed_specs}
    _validate_dependencies_exist(parsed_specs, specs_by_id)
    ordered_ids = _topological_task_ids(parsed_specs)

    def _to_task(item: _RawTaskSpec) -> Task:
        # Convert one raw spec into its canonical immutable Task.
        return Task(
            id=item.id,
            role=item.role,
            description=item.description,
            paths=item.paths,
            dependencies=item.dependency_ids,
            completion_gate=item.completion_gate,
            max_gate_attempts=item.max_gate_attempts,
            primary_agent=item.primary_agent,
            review=item.review,
            workstream_id=item.workstream_id,
        )

    tasks = {tid: _to_task(specs_by_id[tid]) for tid in ordered_ids}
    return TaskGraph.from_tasks(
        (tasks[tid] for tid in ordered_ids),
        name=graph_name,
        workstreams=workstream_specs,
        max_workstream_depth=depth,
    )

TaskGraph dataclass

Immutable DAG of `Task` specifications keyed by task ID.

Attributes:

Name Type Description
name Optional[str]

Optional human-readable graph name.

max_workstream_depth int

Maximum allowed parent-depth for workstream hierarchies. 1 allows parent -> child only.

_tasks_by_id Mapping[str, Task]

Canonical task objects keyed by task ID.

_dependency_ids Mapping[str, tuple[str, ...]]

Dependency IDs for each task ID.

_dependent_ids Mapping[str, tuple[str, ...]]

Reverse dependency IDs for each task ID.

_topological_order tuple[str, ...]

Dependency-first stable task ordering.

_workstreams_by_id Mapping[str, WorkstreamSpec]

Workstream specs keyed by workstream ID.

_task_ids_by_workstream Mapping[str, tuple[str, ...]]

Task IDs grouped by workstream ID.

_child_workstream_ids Mapping[str, tuple[str, ...]]

Child workstream IDs grouped by parent workstream ID.

Source code in src/agentrelay/task_graph/graph.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
@dataclass(frozen=True)
class TaskGraph:
    """Immutable DAG of :class:`Task` specifications keyed by task ID.

    Attributes:
        name: Optional human-readable graph name.
        max_workstream_depth: Maximum allowed parent-depth for workstream
            hierarchies. ``1`` allows parent -> child only.
        _tasks_by_id: Canonical task objects keyed by task ID.
        _dependency_ids: Dependency IDs for each task ID.
        _dependent_ids: Reverse dependency IDs for each task ID.
        _topological_order: Dependency-first stable task ordering.
        _workstreams_by_id: Workstream specs keyed by workstream ID.
        _task_ids_by_workstream: Task IDs grouped by workstream ID.
        _child_workstream_ids: Child workstream IDs grouped by parent workstream ID.
    """

    name: Optional[str]
    _tasks_by_id: Mapping[str, Task]
    _dependency_ids: Mapping[str, tuple[str, ...]]
    _dependent_ids: Mapping[str, tuple[str, ...]]
    _topological_order: tuple[str, ...]
    max_workstream_depth: int
    _workstreams_by_id: Mapping[str, WorkstreamSpec]
    _task_ids_by_workstream: Mapping[str, tuple[str, ...]]
    _child_workstream_ids: Mapping[str, tuple[str, ...]]

    def __init__(
        self,
        tasks_by_id: Mapping[str, Task],
        name: Optional[str] = None,
        workstreams_by_id: Optional[Mapping[str, WorkstreamSpec]] = None,
        max_workstream_depth: int = 1,
    ):
        """Initialize and validate an immutable task graph.

        Args:
            tasks_by_id: Mapping of task IDs to canonical :class:`Task` objects.
                Each key must match its corresponding ``Task.id``.
            name: Optional human-readable name for this graph.
            workstreams_by_id: Optional mapping of workstream IDs to
                :class:`WorkstreamSpec`. If omitted, a default single workstream
                ``"default"`` is assumed.
            max_workstream_depth: Maximum parent-depth allowed for workstream
                hierarchies. ``1`` allows parent -> child only.

        Raises:
            ValueError: If the graph is empty, contains mismatched key/ID pairs,
                contains conflicting task definitions for a shared task ID, contains
                invalid dependency declarations, contains dependency cycles, or
                contains invalid workstream declarations.
        """
        if max_workstream_depth < 1:
            raise ValueError("max_workstream_depth must be >= 1.")

        if not tasks_by_id:
            raise ValueError("TaskGraph requires at least one task.")

        # Defensive copy: later mutation of the caller's mapping cannot
        # change this graph's state.
        canonical = dict(tasks_by_id)
        for key, task in canonical.items():
            if key != task.id:
                raise ValueError(
                    f"TaskGraph key '{key}' does not match task.id '{task.id}'."
                )

        # Derive and validate the full graph structure before freezing any
        # state below; each helper raises ValueError on invalid input.
        dependency_ids = _build_dependency_ids(canonical)
        _validate_dependencies_exist(canonical, dependency_ids)
        dependent_ids = _build_dependent_ids(canonical, dependency_ids)
        topo = _topological_order_or_raise(dependency_ids, dependent_ids)
        workstreams = _normalize_workstreams(workstreams_by_id)
        _validate_task_workstream_ids(canonical, workstreams)
        _validate_workstream_parent_ids_exist(workstreams)
        _validate_workstream_hierarchy_acyclic(workstreams)
        _validate_workstream_max_depth(workstreams, max_depth=max_workstream_depth)
        task_ids_by_workstream = _build_task_ids_by_workstream(
            canonical, topo, workstreams
        )
        child_workstream_ids = _build_child_workstream_ids(workstreams)

        # frozen=True forbids ordinary assignment, so attributes are installed
        # via object.__setattr__; MappingProxyType exposes the internal dicts
        # as read-only views.
        object.__setattr__(self, "name", name)
        object.__setattr__(self, "_tasks_by_id", MappingProxyType(canonical))
        object.__setattr__(self, "_dependency_ids", MappingProxyType(dependency_ids))
        object.__setattr__(self, "_dependent_ids", MappingProxyType(dependent_ids))
        object.__setattr__(self, "_topological_order", topo)
        object.__setattr__(self, "max_workstream_depth", max_workstream_depth)
        object.__setattr__(self, "_workstreams_by_id", MappingProxyType(workstreams))
        object.__setattr__(
            self,
            "_task_ids_by_workstream",
            MappingProxyType(task_ids_by_workstream),
        )
        object.__setattr__(
            self,
            "_child_workstream_ids",
            MappingProxyType(child_workstream_ids),
        )

    @classmethod
    def from_tasks(
        cls,
        tasks: Iterable[Task],
        name: Optional[str] = None,
        workstreams: Optional[Iterable[WorkstreamSpec]] = None,
        max_workstream_depth: int = 1,
    ) -> "TaskGraph":
        """Build a :class:`TaskGraph` from an iterable of tasks.

        Args:
            tasks: Iterable of canonical graph tasks.
            name: Optional human-readable name for this graph.
            workstreams: Optional iterable of workstream specifications.
            max_workstream_depth: Maximum parent-depth allowed for workstream
                hierarchies. ``1`` allows parent -> child only.

        Raises:
            ValueError: If duplicate task IDs or duplicate workstream IDs are
                provided in input iterables.

        Returns:
            TaskGraph: A validated immutable task graph.
        """
        tasks_by_id: dict[str, Task] = {}
        for task in tasks:
            if task.id in tasks_by_id:
                raise ValueError(f"Duplicate task id '{task.id}' in input tasks.")
            tasks_by_id[task.id] = task
        workstreams_by_id: Optional[dict[str, WorkstreamSpec]] = None
        if workstreams is not None:
            workstreams_by_id = {}
            for workstream in workstreams:
                if workstream.id in workstreams_by_id:
                    raise ValueError(
                        f"Duplicate workstream id '{workstream.id}' in input workstreams."
                    )
                workstreams_by_id[workstream.id] = workstream
        return cls(
            tasks_by_id=tasks_by_id,
            name=name,
            workstreams_by_id=workstreams_by_id,
            max_workstream_depth=max_workstream_depth,
        )

    def task(self, task_id: str) -> Task:
        """Return the task specification for a task ID.

        Args:
            task_id: Task identifier to retrieve.

        Raises:
            KeyError: If ``task_id`` is not present in this graph.

        Returns:
            Task: The immutable task specification.
        """
        self._require_task_id(task_id)
        return self._tasks_by_id[task_id]

    def task_ids(self) -> tuple[str, ...]:
        """Return all task IDs in dependency-first topological order.

        Returns:
            tuple[str, ...]: All task IDs in stable topological order.
        """
        return self._topological_order

    def dependency_ids(self, task_id: str) -> tuple[str, ...]:
        """Return dependency IDs for a task in declared order.

        Args:
            task_id: Task identifier to inspect.

        Raises:
            KeyError: If ``task_id`` is not present in this graph.

        Returns:
            tuple[str, ...]: Dependency task IDs for ``task_id``.
        """
        self._require_task_id(task_id)
        return self._dependency_ids[task_id]

    def dependent_ids(self, task_id: str) -> tuple[str, ...]:
        """Return task IDs that depend on a task.

        Args:
            task_id: Task identifier to inspect.

        Raises:
            KeyError: If ``task_id`` is not present in this graph.

        Returns:
            tuple[str, ...]: Dependent task IDs in stable sorted order.
        """
        self._require_task_id(task_id)
        return self._dependent_ids[task_id]

    def roots(self) -> tuple[str, ...]:
        """Return IDs of tasks with no dependencies.

        Returns:
            tuple[str, ...]: Root task IDs in stable topological order.
        """
        return tuple(
            task_id
            for task_id in self._topological_order
            if not self._dependency_ids[task_id]
        )

    def leaves(self) -> tuple[str, ...]:
        """Return IDs of tasks with no dependents.

        Returns:
            tuple[str, ...]: Leaf task IDs in stable topological order.
        """
        return tuple(
            task_id
            for task_id in self._topological_order
            if not self._dependent_ids[task_id]
        )

    def topological_order(self) -> tuple[str, ...]:
        """Return dependency-first stable ordering of task IDs.

        Returns:
            tuple[str, ...]: Task IDs in stable topological order.
        """
        return self._topological_order

    def ready_ids(
        self,
        completed_ids: Iterable[str],
        running_ids: Iterable[str] = (),
    ) -> tuple[str, ...]:
        """Return runnable task IDs from pure set inputs.

        A task is ready when:
        - it is not in ``completed_ids``
        - it is not in ``running_ids``
        - every dependency ID is in ``completed_ids``

        Args:
            completed_ids: IDs currently considered completed by the caller.
            running_ids: IDs currently considered in progress by the caller.

        Raises:
            ValueError: If ``completed_ids`` or ``running_ids`` contains an
                unknown task ID.

        Returns:
            tuple[str, ...]: Runnable task IDs in stable topological order.
        """
        completed = set(completed_ids)
        running = set(running_ids)
        _validate_known_ids(completed, self._tasks_by_id, "completed_ids")
        _validate_known_ids(running, self._tasks_by_id, "running_ids")

        # A task in either set is never offered again.
        blocked = completed | running
        ready: list[str] = []
        for task_id in self._topological_order:
            if task_id in blocked:
                continue
            deps = self._dependency_ids[task_id]
            if all(dep_id in completed for dep_id in deps):
                ready.append(task_id)
        return tuple(ready)

    def workstream_ids(self) -> tuple[str, ...]:
        """Return all workstream IDs in stable sorted order.

        Returns:
            tuple[str, ...]: All workstream IDs in sorted order.
        """
        return tuple(sorted(self._workstreams_by_id))

    def workstream(self, workstream_id: str) -> WorkstreamSpec:
        """Return workstream specification for a workstream ID.

        Args:
            workstream_id: Workstream identifier to retrieve.

        Raises:
            KeyError: If ``workstream_id`` is not present in this graph.

        Returns:
            WorkstreamSpec: Immutable workstream specification.
        """
        self._require_workstream_id(workstream_id)
        return self._workstreams_by_id[workstream_id]

    def tasks_in_workstream(self, workstream_id: str) -> tuple[str, ...]:
        """Return task IDs in a workstream in graph topological order.

        Args:
            workstream_id: Workstream identifier to inspect.

        Raises:
            KeyError: If ``workstream_id`` is not present in this graph.

        Returns:
            tuple[str, ...]: Task IDs in the given workstream.
        """
        self._require_workstream_id(workstream_id)
        return self._task_ids_by_workstream[workstream_id]

    def child_workstream_ids(self, workstream_id: str) -> tuple[str, ...]:
        """Return child workstream IDs for a parent workstream.

        Args:
            workstream_id: Parent workstream identifier.

        Raises:
            KeyError: If ``workstream_id`` is not present in this graph.

        Returns:
            tuple[str, ...]: Child workstream IDs in stable sorted order.
        """
        self._require_workstream_id(workstream_id)
        return self._child_workstream_ids[workstream_id]

    def _require_task_id(self, task_id: str) -> None:
        """Assert that a task ID exists in this graph.

        Args:
            task_id: Task identifier to validate.

        Raises:
            KeyError: If ``task_id`` is not present in this graph.
        """
        if task_id not in self._tasks_by_id:
            raise KeyError(f"Unknown task id '{task_id}'.")

    def _require_workstream_id(self, workstream_id: str) -> None:
        """Assert that a workstream ID exists in this graph.

        Args:
            workstream_id: Workstream identifier to validate.

        Raises:
            KeyError: If ``workstream_id`` is not present in this graph.
        """
        if workstream_id not in self._workstreams_by_id:
            raise KeyError(f"Unknown workstream id '{workstream_id}'.")

__init__(tasks_by_id, name=None, workstreams_by_id=None, max_workstream_depth=1)

Initialize and validate an immutable task graph.

Parameters:

Name Type Description Default
tasks_by_id Mapping[str, Task]

Mapping of task IDs to canonical :class:Task objects. Each key must match its corresponding Task.id.

required
name Optional[str]

Optional human-readable name for this graph.

None
workstreams_by_id Optional[Mapping[str, WorkstreamSpec]]

Optional mapping of workstream IDs to :class:WorkstreamSpec. If omitted, a default single workstream "default" is assumed.

None
max_workstream_depth int

Maximum parent-depth allowed for workstream hierarchies. 1 allows parent -> child only.

1

Raises:

Type Description
ValueError

If the graph is empty, contains mismatched key/ID pairs, contains conflicting task definitions for a shared task ID, contains invalid dependency declarations, contains dependency cycles, or contains invalid workstream declarations.

Source code in src/agentrelay/task_graph/graph.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def __init__(
    self,
    tasks_by_id: Mapping[str, Task],
    name: Optional[str] = None,
    workstreams_by_id: Optional[Mapping[str, WorkstreamSpec]] = None,
    max_workstream_depth: int = 1,
):
    """Initialize and validate an immutable task graph.

    Args:
        tasks_by_id: Mapping of task IDs to canonical :class:`Task` objects.
            Each key must match its corresponding ``Task.id``.
        name: Optional human-readable name for this graph.
        workstreams_by_id: Optional mapping of workstream IDs to
            :class:`WorkstreamSpec`. If omitted, a default single workstream
            ``"default"`` is assumed.
        max_workstream_depth: Maximum parent-depth allowed for workstream
            hierarchies. ``1`` allows parent -> child only.

    Raises:
        ValueError: If the graph is empty, contains mismatched key/ID pairs,
            contains conflicting task definitions for a shared task ID, contains
            invalid dependency declarations, contains dependency cycles, or
            contains invalid workstream declarations.
    """
    if max_workstream_depth < 1:
        raise ValueError("max_workstream_depth must be >= 1.")

    if not tasks_by_id:
        raise ValueError("TaskGraph requires at least one task.")

    # Defensive copy: later mutation of the caller's mapping cannot
    # change this graph's state.
    canonical = dict(tasks_by_id)
    for key, task in canonical.items():
        if key != task.id:
            raise ValueError(
                f"TaskGraph key '{key}' does not match task.id '{task.id}'."
            )

    # Derive and validate the full graph structure before freezing any
    # state below; each helper raises ValueError on invalid input.
    dependency_ids = _build_dependency_ids(canonical)
    _validate_dependencies_exist(canonical, dependency_ids)
    dependent_ids = _build_dependent_ids(canonical, dependency_ids)
    topo = _topological_order_or_raise(dependency_ids, dependent_ids)
    workstreams = _normalize_workstreams(workstreams_by_id)
    _validate_task_workstream_ids(canonical, workstreams)
    _validate_workstream_parent_ids_exist(workstreams)
    _validate_workstream_hierarchy_acyclic(workstreams)
    _validate_workstream_max_depth(workstreams, max_depth=max_workstream_depth)
    task_ids_by_workstream = _build_task_ids_by_workstream(
        canonical, topo, workstreams
    )
    child_workstream_ids = _build_child_workstream_ids(workstreams)

    # frozen=True forbids ordinary assignment, so attributes are installed
    # via object.__setattr__; MappingProxyType exposes the internal dicts
    # as read-only views.
    object.__setattr__(self, "name", name)
    object.__setattr__(self, "_tasks_by_id", MappingProxyType(canonical))
    object.__setattr__(self, "_dependency_ids", MappingProxyType(dependency_ids))
    object.__setattr__(self, "_dependent_ids", MappingProxyType(dependent_ids))
    object.__setattr__(self, "_topological_order", topo)
    object.__setattr__(self, "max_workstream_depth", max_workstream_depth)
    object.__setattr__(self, "_workstreams_by_id", MappingProxyType(workstreams))
    object.__setattr__(
        self,
        "_task_ids_by_workstream",
        MappingProxyType(task_ids_by_workstream),
    )
    object.__setattr__(
        self,
        "_child_workstream_ids",
        MappingProxyType(child_workstream_ids),
    )

from_tasks(tasks, name=None, workstreams=None, max_workstream_depth=1) classmethod

Build a `TaskGraph` from an iterable of tasks.

Parameters:

Name Type Description Default
tasks Iterable[Task]

Iterable of canonical graph tasks.

required
name Optional[str]

Optional human-readable name for this graph.

None
workstreams Optional[Iterable[WorkstreamSpec]]

Optional iterable of workstream specifications.

None
max_workstream_depth int

Maximum parent-depth allowed for workstream hierarchies. 1 allows parent -> child only.

1

Raises:

Type Description
ValueError

If duplicate task IDs or duplicate workstream IDs are provided in input iterables.

Returns:

Name Type Description
TaskGraph 'TaskGraph'

A validated immutable task graph.

Source code in src/agentrelay/task_graph/graph.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
@classmethod
def from_tasks(
    cls,
    tasks: Iterable[Task],
    name: Optional[str] = None,
    workstreams: Optional[Iterable[WorkstreamSpec]] = None,
    max_workstream_depth: int = 1,
) -> "TaskGraph":
    """Create a :class:`TaskGraph` from task and workstream iterables.

    Args:
        tasks: Iterable of canonical graph tasks.
        name: Optional human-readable name for this graph.
        workstreams: Optional iterable of workstream specifications.
        max_workstream_depth: Maximum parent-depth allowed for workstream
            hierarchies. ``1`` allows parent -> child only.

    Raises:
        ValueError: If duplicate task IDs or duplicate workstream IDs are
            provided in input iterables.

    Returns:
        TaskGraph: A validated immutable task graph.
    """
    collected: dict[str, Task] = {}
    for item in tasks:
        if item.id in collected:
            raise ValueError(f"Duplicate task id '{item.id}' in input tasks.")
        collected[item.id] = item

    stream_map: Optional[dict[str, WorkstreamSpec]] = None
    if workstreams is not None:
        stream_map = {}
        for stream in workstreams:
            if stream.id in stream_map:
                raise ValueError(
                    f"Duplicate workstream id '{stream.id}' in input workstreams."
                )
            stream_map[stream.id] = stream

    return cls(
        tasks_by_id=collected,
        name=name,
        workstreams_by_id=stream_map,
        max_workstream_depth=max_workstream_depth,
    )

task(task_id)

Return the task specification for a task ID.

Parameters:

Name Type Description Default
task_id str

Task identifier to retrieve.

required

Raises:

Type Description
KeyError

If task_id is not present in this graph.

Returns:

Name Type Description
Task Task

The immutable task specification.

Source code in src/agentrelay/task_graph/graph.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def task(self, task_id: str) -> Task:
    """Look up the immutable task specification for ``task_id``.

    Args:
        task_id: Task identifier to retrieve.

    Raises:
        KeyError: If ``task_id`` is not present in this graph.

    Returns:
        Task: The immutable task specification.
    """
    self._require_task_id(task_id)
    registry = self._tasks_by_id
    return registry[task_id]

task_ids()

Return all task IDs in dependency-first topological order.

Returns:

Type Description
tuple[str, ...]

tuple[str, ...]: All task IDs in stable topological order.

Source code in src/agentrelay/task_graph/graph.py
209
210
211
212
213
214
215
def task_ids(self) -> tuple[str, ...]:
    """Expose every task ID, ordered dependency-first.

    Returns:
        tuple[str, ...]: All task IDs in stable topological order.
    """
    ordering = self._topological_order
    return ordering

dependency_ids(task_id)

Return dependency IDs for a task in declared order.

Parameters:

Name Type Description Default
task_id str

Task identifier to inspect.

required

Raises:

Type Description
KeyError

If task_id is not present in this graph.

Returns:

Type Description
tuple[str, ...]

tuple[str, ...]: Dependency task IDs for task_id.

Source code in src/agentrelay/task_graph/graph.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def dependency_ids(self, task_id: str) -> tuple[str, ...]:
    """Fetch the declared dependency IDs of one task.

    Args:
        task_id: Identifier of the task to inspect.

    Raises:
        KeyError: If no task with ``task_id`` exists in this graph.

    Returns:
        tuple[str, ...]: Dependency task IDs in declaration order.
    """
    self._require_task_id(task_id)
    deps = self._dependency_ids[task_id]
    return deps

dependent_ids(task_id)

Return task IDs that depend on a task.

Parameters:

Name Type Description Default
task_id str

Task identifier to inspect.

required

Raises:

Type Description
KeyError

If task_id is not present in this graph.

Returns:

Type Description
tuple[str, ...]

tuple[str, ...]: Dependent task IDs in stable sorted order.

Source code in src/agentrelay/task_graph/graph.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def dependent_ids(self, task_id: str) -> tuple[str, ...]:
    """Fetch the IDs of tasks that list a given task as a dependency.

    Args:
        task_id: Identifier of the task to inspect.

    Raises:
        KeyError: If no task with ``task_id`` exists in this graph.

    Returns:
        tuple[str, ...]: Dependent task IDs in stable sorted order.
    """
    self._require_task_id(task_id)
    dependents = self._dependent_ids[task_id]
    return dependents

roots()

Return IDs of tasks with no dependencies.

Returns:

Type Description
tuple[str, ...]

tuple[str, ...]: Root task IDs in stable topological order.

Source code in src/agentrelay/task_graph/graph.py
247
248
249
250
251
252
253
254
255
256
257
def roots(self) -> tuple[str, ...]:
    """List the tasks that have an empty dependency set.

    Returns:
        tuple[str, ...]: Root task IDs in stable topological order.
    """
    root_ids: list[str] = []
    # Walk in topological order so the result ordering is deterministic.
    for candidate in self._topological_order:
        if not self._dependency_ids[candidate]:
            root_ids.append(candidate)
    return tuple(root_ids)

leaves()

Return IDs of tasks with no dependents.

Returns:

Type Description
tuple[str, ...]

tuple[str, ...]: Leaf task IDs in stable topological order.

Source code in src/agentrelay/task_graph/graph.py
259
260
261
262
263
264
265
266
267
268
269
def leaves(self) -> tuple[str, ...]:
    """List the tasks that nothing else depends on.

    Returns:
        tuple[str, ...]: Leaf task IDs in stable topological order.
    """
    leaf_ids: list[str] = []
    # Walk in topological order so the result ordering is deterministic.
    for candidate in self._topological_order:
        if not self._dependent_ids[candidate]:
            leaf_ids.append(candidate)
    return tuple(leaf_ids)

topological_order()

Return dependency-first stable ordering of task IDs.

Returns:

Type Description
tuple[str, ...]

tuple[str, ...]: Task IDs in stable topological order.

Source code in src/agentrelay/task_graph/graph.py
271
272
273
274
275
276
277
def topological_order(self) -> tuple[str, ...]:
    """Expose the precomputed dependency-first ordering of task IDs.

    Returns:
        tuple[str, ...]: Task IDs in stable topological order.
    """
    return self._topological_order

ready_ids(completed_ids, running_ids=())

Return runnable task IDs from pure set inputs.

A task is ready when: - it is not in completed_ids - it is not in running_ids - every dependency ID is in completed_ids

Parameters:

Name Type Description Default
completed_ids Iterable[str]

IDs currently considered completed by the caller.

required
running_ids Iterable[str]

IDs currently considered in progress by the caller.

()

Raises:

Type Description
ValueError

If completed_ids or running_ids contains an unknown task ID.

Returns:

Type Description
tuple[str, ...]

tuple[str, ...]: Runnable task IDs in stable topological order.

Source code in src/agentrelay/task_graph/graph.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
def ready_ids(
    self,
    completed_ids: Iterable[str],
    running_ids: Iterable[str] = (),
) -> tuple[str, ...]:
    """Compute which tasks can start, given caller-owned progress sets.

    A task qualifies when it is neither completed nor running and every
    one of its dependencies appears in ``completed_ids``.

    Args:
        completed_ids: IDs the caller considers finished.
        running_ids: IDs the caller considers currently executing.

    Raises:
        ValueError: If ``completed_ids`` or ``running_ids`` contains an
            unknown task ID.

    Returns:
        tuple[str, ...]: Runnable task IDs in stable topological order.
    """
    completed = set(completed_ids)
    running = set(running_ids)
    # Fail fast on IDs that do not belong to this graph.
    _validate_known_ids(completed, self._tasks_by_id, "completed_ids")
    _validate_known_ids(running, self._tasks_by_id, "running_ids")

    unavailable = completed | running
    return tuple(
        task_id
        for task_id in self._topological_order
        if task_id not in unavailable
        and all(dep_id in completed for dep_id in self._dependency_ids[task_id])
    )

workstream_ids()

Return all workstream IDs in stable sorted order.

Returns:

Type Description
tuple[str, ...]

tuple[str, ...]: All workstream IDs in sorted order.

Source code in src/agentrelay/task_graph/graph.py
317
318
319
320
321
322
323
def workstream_ids(self) -> tuple[str, ...]:
    """List every workstream ID known to this graph.

    Returns:
        tuple[str, ...]: All workstream IDs in stable sorted order.
    """
    ordered = sorted(self._workstreams_by_id)
    return tuple(ordered)

workstream(workstream_id)

Return workstream specification for a workstream ID.

Parameters:

Name Type Description Default
workstream_id str

Workstream identifier to retrieve.

required

Raises:

Type Description
KeyError

If workstream_id is not present in this graph.

Returns:

Name Type Description
WorkstreamSpec WorkstreamSpec

Immutable workstream specification.

Source code in src/agentrelay/task_graph/graph.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
def workstream(self, workstream_id: str) -> WorkstreamSpec:
    """Look up a single workstream by its identifier.

    Args:
        workstream_id: Identifier of the workstream to fetch.

    Raises:
        KeyError: If no workstream with ``workstream_id`` exists here.

    Returns:
        WorkstreamSpec: Immutable workstream specification.
    """
    self._require_workstream_id(workstream_id)
    found = self._workstreams_by_id[workstream_id]
    return found

tasks_in_workstream(workstream_id)

Return task IDs in a workstream in graph topological order.

Parameters:

Name Type Description Default
workstream_id str

Workstream identifier to inspect.

required

Raises:

Type Description
KeyError

If workstream_id is not present in this graph.

Returns:

Type Description
tuple[str, ...]

tuple[str, ...]: Task IDs in the given workstream.

Source code in src/agentrelay/task_graph/graph.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
def tasks_in_workstream(self, workstream_id: str) -> tuple[str, ...]:
    """List the tasks assigned to one workstream.

    Args:
        workstream_id: Identifier of the workstream to inspect.

    Raises:
        KeyError: If no workstream with ``workstream_id`` exists here.

    Returns:
        tuple[str, ...]: Member task IDs in graph topological order.
    """
    self._require_workstream_id(workstream_id)
    members = self._task_ids_by_workstream[workstream_id]
    return members

child_workstream_ids(workstream_id)

Return child workstream IDs for a parent workstream.

Parameters:

Name Type Description Default
workstream_id str

Parent workstream identifier.

required

Raises:

Type Description
KeyError

If workstream_id is not present in this graph.

Returns:

Type Description
tuple[str, ...]

tuple[str, ...]: Child workstream IDs in stable sorted order.

Source code in src/agentrelay/task_graph/graph.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
def child_workstream_ids(self, workstream_id: str) -> tuple[str, ...]:
    """List the direct children of one workstream.

    Args:
        workstream_id: Identifier of the parent workstream.

    Raises:
        KeyError: If no workstream with ``workstream_id`` exists here.

    Returns:
        tuple[str, ...]: Child workstream IDs in stable sorted order.
    """
    self._require_workstream_id(workstream_id)
    children = self._child_workstream_ids[workstream_id]
    return children

agentrelay.task_graph._indexing

Index-building and ordering helpers for TaskGraph.

This module contains pure functions that build task/workstream indexes and compute topological order for agentrelay.task_graph.TaskGraph.

build_dependency_ids(tasks_by_id)

Build dependency ID index and validate per-task dependency declarations.

Parameters:

Name Type Description Default
tasks_by_id Mapping[str, Task]

Canonical mapping of graph task IDs to tasks.

required

Raises:

Type Description
ValueError

If a task depends on itself or repeats dependency IDs.

Returns:

Type Description
dict[str, tuple[str, ...]]

dict[str, tuple[str, ...]]: Dependency IDs keyed by task ID.

Source code in src/agentrelay/task_graph/_indexing.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def build_dependency_ids(
    tasks_by_id: Mapping[str, Task],
) -> dict[str, tuple[str, ...]]:
    """Index each task's declared dependencies, rejecting malformed ones.

    Args:
        tasks_by_id: Canonical mapping of graph task IDs to tasks.

    Raises:
        ValueError: If a task depends on itself or repeats dependency IDs.

    Returns:
        dict[str, tuple[str, ...]]: Dependency IDs keyed by task ID.
    """
    index: dict[str, tuple[str, ...]] = {}
    for task_id, task in tasks_by_id.items():
        declared = task.dependencies
        # A task may neither require itself nor list a dependency twice.
        if task_id in declared:
            raise ValueError(f"Task '{task_id}' has a self-dependency.")
        if len(set(declared)) != len(declared):
            raise ValueError(f"Task '{task_id}' has duplicate dependency IDs.")
        index[task_id] = declared
    return index

build_dependent_ids(tasks_by_id, dependency_ids)

Build reverse dependency index.

Parameters:

Name Type Description Default
tasks_by_id Mapping[str, Task]

Canonical mapping of graph task IDs to tasks.

required
dependency_ids Mapping[str, tuple[str, ...]]

Dependency ID index keyed by task ID.

required

Returns:

Type Description
dict[str, tuple[str, ...]]

dict[str, tuple[str, ...]]: Dependent IDs keyed by task ID.

Source code in src/agentrelay/task_graph/_indexing.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def build_dependent_ids(
    tasks_by_id: Mapping[str, Task],
    dependency_ids: Mapping[str, tuple[str, ...]],
) -> dict[str, tuple[str, ...]]:
    """Invert the dependency index into a dependents index.

    Args:
        tasks_by_id: Canonical mapping of graph task IDs to tasks.
        dependency_ids: Dependency ID index keyed by task ID.

    Returns:
        dict[str, tuple[str, ...]]: Dependent IDs keyed by task ID.
    """
    # Seed every task with an empty list so leaves still get an entry.
    reverse: dict[str, list[str]] = {task_id: [] for task_id in tasks_by_id}
    for task_id, dep_ids in dependency_ids.items():
        for dep_id in dep_ids:
            reverse[dep_id].append(task_id)
    # Sort each list for a stable, order-independent result.
    return {task_id: tuple(sorted(names)) for task_id, names in reverse.items()}

topological_order_or_raise(dependency_ids, dependent_ids)

Compute stable topological order or raise on cyclic graphs.

Parameters:

Name Type Description Default
dependency_ids Mapping[str, tuple[str, ...]]

Dependency ID index keyed by task ID.

required
dependent_ids Mapping[str, tuple[str, ...]]

Reverse dependency ID index keyed by task ID.

required

Raises:

Type Description
ValueError

If the graph contains one or more cycles.

Returns:

Type Description
tuple[str, ...]

tuple[str, ...]: Stable dependency-first topological order.

Source code in src/agentrelay/task_graph/_indexing.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def topological_order_or_raise(
    dependency_ids: Mapping[str, tuple[str, ...]],
    dependent_ids: Mapping[str, tuple[str, ...]],
) -> tuple[str, ...]:
    """Compute stable topological order or raise on cyclic graphs.

    Uses Kahn's algorithm with a min-heap frontier: ``heappop`` always
    yields the lexicographically smallest ready task ID, producing the
    same deterministic order as the previous sorted-list approach while
    replacing its O(n) ``pop(0)``/``insort`` steps with O(log n) ones.

    Args:
        dependency_ids: Dependency ID index keyed by task ID.
        dependent_ids: Reverse dependency ID index keyed by task ID.

    Raises:
        ValueError: If the graph contains one or more cycles.

    Returns:
        tuple[str, ...]: Stable dependency-first topological order.
    """
    import heapq  # local import keeps this helper self-contained

    in_degree = {task_id: len(dep_ids) for task_id, dep_ids in dependency_ids.items()}
    heap = [task_id for task_id, degree in in_degree.items() if degree == 0]
    heapq.heapify(heap)
    ordered: list[str] = []

    while heap:
        task_id = heapq.heappop(heap)
        ordered.append(task_id)
        for dependent_id in dependent_ids[task_id]:
            in_degree[dependent_id] -= 1
            if in_degree[dependent_id] == 0:
                heapq.heappush(heap, dependent_id)

    if len(ordered) == len(dependency_ids):
        return tuple(ordered)

    # Some tasks never reached in-degree zero: the graph has a cycle.
    cycle = _find_cycle(dependency_ids)
    if cycle:
        raise ValueError(f"Task graph contains a cycle: {' -> '.join(cycle)}")
    raise ValueError("Task graph contains one or more cycles.")

build_task_ids_by_workstream(tasks_by_id, topological_order, workstreams_by_id)

Build a topological-order task ID index for each workstream.

Parameters:

Name Type Description Default
tasks_by_id Mapping[str, Task]

Canonical mapping of graph task IDs to tasks.

required
topological_order tuple[str, ...]

Graph topological task ID order.

required
workstreams_by_id Mapping[str, WorkstreamSpec]

Canonical mapping of workstream IDs to specs.

required

Returns:

Type Description
dict[str, tuple[str, ...]]

dict[str, tuple[str, ...]]: Task IDs keyed by workstream ID.

Source code in src/agentrelay/task_graph/_indexing.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def build_task_ids_by_workstream(
    tasks_by_id: Mapping[str, Task],
    topological_order: tuple[str, ...],
    workstreams_by_id: Mapping[str, WorkstreamSpec],
) -> dict[str, tuple[str, ...]]:
    """Group task IDs by workstream, preserving topological order.

    Args:
        tasks_by_id: Canonical mapping of graph task IDs to tasks.
        topological_order: Graph topological task ID order.
        workstreams_by_id: Canonical mapping of workstream IDs to specs.

    Returns:
        dict[str, tuple[str, ...]]: Task IDs keyed by workstream ID.
    """
    # Seed every workstream so empty ones still appear in the index.
    by_workstream: dict[str, list[str]] = {
        ws_id: [] for ws_id in workstreams_by_id
    }
    for task_id in topological_order:
        owner = tasks_by_id[task_id].workstream_id
        by_workstream[owner].append(task_id)
    return {ws_id: tuple(members) for ws_id, members in by_workstream.items()}

build_child_workstream_ids(workstreams_by_id)

Build an index of child workstream IDs for each workstream.

Parameters:

Name Type Description Default
workstreams_by_id Mapping[str, WorkstreamSpec]

Canonical mapping of workstream IDs to specs.

required

Returns:

Type Description
dict[str, tuple[str, ...]]

dict[str, tuple[str, ...]]: Child IDs keyed by parent workstream ID.

Source code in src/agentrelay/task_graph/_indexing.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def build_child_workstream_ids(
    workstreams_by_id: Mapping[str, WorkstreamSpec],
) -> dict[str, tuple[str, ...]]:
    """Invert parent references into a child-ID index per workstream.

    Args:
        workstreams_by_id: Canonical mapping of workstream IDs to specs.

    Returns:
        dict[str, tuple[str, ...]]: Child IDs keyed by parent workstream ID.
    """
    # Seed every workstream so childless ones still appear in the index.
    child_lists: dict[str, list[str]] = {
        ws_id: [] for ws_id in workstreams_by_id
    }
    for spec in workstreams_by_id.values():
        if spec.parent_workstream_id is not None:
            child_lists[spec.parent_workstream_id].append(spec.id)
    return {
        ws_id: tuple(sorted(members)) for ws_id, members in child_lists.items()
    }

agentrelay.task_graph._validation

Validation helpers for TaskGraph construction and query inputs.

This module contains pure validation and normalization functions used by agentrelay.task_graph.TaskGraph. Keeping these helpers separate allows the TaskGraph type to stay focused on immutable graph state and query APIs.

normalize_workstreams(workstreams_by_id)

Normalize input workstream mapping with compatibility defaults.

Parameters:

Name Type Description Default
workstreams_by_id Optional[Mapping[str, WorkstreamSpec]]

Optional input mapping of workstreams.

required

Raises:

Type Description
ValueError

If workstream mapping keys mismatch WorkstreamSpec.id.

Returns:

Type Description
dict[str, WorkstreamSpec]

dict[str, WorkstreamSpec]: Canonical workstream mapping.

Source code in src/agentrelay/task_graph/_validation.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def normalize_workstreams(
    workstreams_by_id: Optional[Mapping[str, WorkstreamSpec]],
) -> dict[str, WorkstreamSpec]:
    """Normalize input workstream mapping with compatibility defaults.

    Args:
        workstreams_by_id: Optional input mapping of workstreams.

    Raises:
        ValueError: If a mapping key disagrees with its spec's ``id``.

    Returns:
        dict[str, WorkstreamSpec]: Canonical workstream mapping.
    """
    if workstreams_by_id is None:
        # Compatibility default: a single implicit workstream.
        return {"default": WorkstreamSpec(id="default")}

    normalized = dict(workstreams_by_id)
    for key, spec in normalized.items():
        if key == spec.id:
            continue
        raise ValueError(
            f"TaskGraph workstream key '{key}' does not match "
            f"WorkstreamSpec.id '{spec.id}'."
        )
    return normalized

validate_task_workstream_ids(tasks_by_id, workstreams_by_id)

Validate that every task references a known workstream ID.

Parameters:

Name Type Description Default
tasks_by_id Mapping[str, Task]

Canonical mapping of graph task IDs to tasks.

required
workstreams_by_id Mapping[str, WorkstreamSpec]

Canonical mapping of workstream IDs to specs.

required

Raises:

Type Description
ValueError

If one or more task workstream IDs are unknown.

Source code in src/agentrelay/task_graph/_validation.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def validate_task_workstream_ids(
    tasks_by_id: Mapping[str, Task],
    workstreams_by_id: Mapping[str, WorkstreamSpec],
) -> None:
    """Ensure every task points at a workstream that actually exists.

    Args:
        tasks_by_id: Canonical mapping of graph task IDs to tasks.
        workstreams_by_id: Canonical mapping of workstream IDs to specs.

    Raises:
        ValueError: If one or more task workstream IDs are unknown.
    """
    valid_ids = set(workstreams_by_id)
    # Collect every distinct bad reference so the error lists them all.
    bad = sorted(
        {
            task.workstream_id
            for task in tasks_by_id.values()
            if task.workstream_id not in valid_ids
        }
    )
    if bad:
        raise ValueError(
            f"Unknown workstream id(s) referenced by tasks: {', '.join(bad)}."
        )

validate_workstream_parent_ids_exist(workstreams_by_id)

Validate that each parent workstream reference exists.

Parameters:

Name Type Description Default
workstreams_by_id Mapping[str, WorkstreamSpec]

Canonical mapping of workstream IDs to specs.

required

Raises:

Type Description
ValueError

If any parent_workstream_id is unknown.

Source code in src/agentrelay/task_graph/_validation.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def validate_workstream_parent_ids_exist(
    workstreams_by_id: Mapping[str, WorkstreamSpec],
) -> None:
    """Ensure every declared parent workstream actually exists.

    Args:
        workstreams_by_id: Canonical mapping of workstream IDs to specs.

    Raises:
        ValueError: If any ``parent_workstream_id`` is unknown.
    """
    valid_ids = set(workstreams_by_id)
    for spec in workstreams_by_id.values():
        parent_id = spec.parent_workstream_id
        if parent_id is None or parent_id in valid_ids:
            continue
        raise ValueError(
            f"Workstream '{spec.id}' references unknown parent_workstream_id "
            f"'{parent_id}'."
        )

validate_workstream_hierarchy_acyclic(workstreams_by_id)

Validate that the workstream parent hierarchy has no cycles.

Parameters:

Name Type Description Default
workstreams_by_id Mapping[str, WorkstreamSpec]

Canonical mapping of workstream IDs to specs.

required

Raises:

Type Description
ValueError

If a parent cycle exists.

Source code in src/agentrelay/task_graph/_validation.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def validate_workstream_hierarchy_acyclic(
    workstreams_by_id: Mapping[str, WorkstreamSpec],
) -> None:
    """Ensure the workstream parent hierarchy contains no cycles.

    Walks each parent chain once; previously verified nodes short-circuit
    later walks so the overall pass stays linear.

    Args:
        workstreams_by_id: Canonical mapping of workstream IDs to specs.

    Raises:
        ValueError: If a parent cycle exists.
    """
    verified: set[str] = set()
    for start_id in sorted(workstreams_by_id):
        if start_id in verified:
            continue
        chain: list[str] = []
        position: dict[str, int] = {}
        node: Optional[str] = start_id

        while node is not None:
            if node in position:
                # Revisiting a node on the current chain means a cycle.
                loop = chain[position[node]:] + [node]
                raise ValueError(
                    "Workstream hierarchy contains a cycle: " f"{' -> '.join(loop)}"
                )
            if node in verified:
                # This ancestor chain was already proven acyclic.
                break
            position[node] = len(chain)
            chain.append(node)
            node = workstreams_by_id[node].parent_workstream_id

        verified.update(chain)

validate_workstream_max_depth(workstreams_by_id, max_depth)

Validate that workstream hierarchy depth does not exceed max_depth.

Parameters:

Name Type Description Default
workstreams_by_id Mapping[str, WorkstreamSpec]

Canonical mapping of workstream IDs to specs.

required
max_depth int

Maximum allowed ancestry depth.

required

Raises:

Type Description
ValueError

If any workstream has ancestry depth greater than max_depth.

Source code in src/agentrelay/task_graph/_validation.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def validate_workstream_max_depth(
    workstreams_by_id: Mapping[str, WorkstreamSpec],
    max_depth: int,
) -> None:
    """Ensure no workstream's ancestry is deeper than ``max_depth``.

    Args:
        workstreams_by_id: Canonical mapping of workstream IDs to specs.
        max_depth: Maximum allowed ancestry depth.

    Raises:
        ValueError: If any workstream has ancestry depth greater than
            ``max_depth``.
    """
    for spec in workstreams_by_id.values():
        ancestors = 0
        parent = spec.parent_workstream_id
        # Climb the parent chain, counting ancestors as we go.
        while parent is not None:
            ancestors += 1
            if ancestors > max_depth:
                raise ValueError(
                    f"Workstream '{spec.id}' exceeds maximum supported depth "
                    f"of {max_depth}."
                )
            parent = workstreams_by_id[parent].parent_workstream_id

validate_dependencies_exist(tasks_by_id, dependency_ids)

Validate that every dependency ID exists as a graph task.

Parameters:

Name Type Description Default
tasks_by_id Mapping[str, Task]

Canonical mapping of graph task IDs to tasks.

required
dependency_ids Mapping[str, tuple[str, ...]]

Dependency ID index keyed by task ID.

required

Raises:

Type Description
ValueError

If any dependency ID is unknown.

Source code in src/agentrelay/task_graph/_validation.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def validate_dependencies_exist(
    tasks_by_id: Mapping[str, Task],
    dependency_ids: Mapping[str, tuple[str, ...]],
) -> None:
    """Ensure every declared dependency resolves to a known task.

    Args:
        tasks_by_id: Canonical mapping of graph task IDs to tasks.
        dependency_ids: Dependency ID index keyed by task ID.

    Raises:
        ValueError: If any dependency ID is unknown.
    """
    known_ids = set(tasks_by_id)
    # Gather every distinct bad reference so the error lists them all.
    unknown = {
        dep_id
        for dep_ids in dependency_ids.values()
        for dep_id in dep_ids
        if dep_id not in known_ids
    }
    if unknown:
        raise ValueError(f"Unknown dependency id(s): {', '.join(sorted(unknown))}.")

validate_known_ids(ids, tasks_by_id, source_name)

Validate that an input ID set only references known graph tasks.

Parameters:

Name Type Description Default
ids set[str]

Input task IDs to validate.

required
tasks_by_id Mapping[str, Task]

Canonical mapping of graph task IDs to tasks.

required
source_name str

Human-readable source label for error reporting.

required

Raises:

Type Description
ValueError

If ids contains unknown task IDs.

Source code in src/agentrelay/task_graph/_validation.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def validate_known_ids(
    ids: set[str],
    tasks_by_id: Mapping[str, Task],
    source_name: str,
) -> None:
    """Ensure a caller-supplied ID set references only known tasks.

    Args:
        ids: Input task IDs to validate.
        tasks_by_id: Canonical mapping of graph task IDs to tasks.
        source_name: Human-readable source label for error reporting.

    Raises:
        ValueError: If ``ids`` contains unknown task IDs.
    """
    # Sort so the error message is deterministic regardless of set order.
    missing = sorted(candidate for candidate in ids if candidate not in tasks_by_id)
    if missing:
        joined = ", ".join(missing)
        raise ValueError(f"{source_name} contains unknown task id(s): {joined}.")