Skip to content

run_graph

View module diagram

Composition and CLI entry point for running an agentrelay task graph.

Usage::

python -m agentrelay.run_graph graphs/demo.yaml
python -m agentrelay.run_graph graphs/demo.yaml --dry-run
python -m agentrelay.run_graph graphs/demo.yaml --max-concurrency 4 --model claude-opus-4-6

This module provides:

  • run_graph(): async composition function that wires all components and runs the orchestrator.
  • dry_run(): validates a graph YAML and prints the execution plan.
  • main(): CLI entry point with argparse.

OperationalConfig dataclass

Graph-level operational settings extracted from YAML.

These keys are popped from the raw YAML dict before graph parsing (which rejects unknown keys). None means "not specified in YAML; fall back to CLI or default."

Source code in src/agentrelay/run_graph.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@dataclasses.dataclass(frozen=True)
class OperationalConfig:
    """Graph-level operational settings extracted from YAML.

    These keys are popped from the raw YAML dict before graph parsing
    (which rejects unknown keys).  ``None`` means "not specified in YAML;
    fall back to CLI or default."
    """

    keep_panes: bool = False
    model: Optional[str] = None
    tools: tuple[str, ...] = ()
    anthropic_credential: Optional[str] = None
    fail_fast_on_workstream_error: Optional[bool] = None
    fail_fast_on_internal_error: Optional[bool] = None
    max_concurrency: Optional[int] = None
    max_task_attempts: Optional[int] = None
    teardown_mode: Optional[str] = None

RunOptions dataclass

CLI-level options for run_graph().

Bundles the keyword arguments that callers pass to control orchestration behavior. None for optional fields means "not specified; fall back to graph YAML, then OrchestratorConfig default."

Attributes:

Name Type Description
tmux_session Optional[str]

Override tmux session name, or None for auto-detection.

keep_panes bool

Keep tmux panes open after task completion.

model_override Optional[str]

Override model for all agents.

max_concurrency Optional[int]

Maximum concurrent tasks.

max_task_attempts Optional[int]

Maximum attempts per task.

teardown_mode Optional[str]

When to tear down task resources ("always", "never", or "on_success").

fail_fast_on_workstream_error Optional[bool]

Stop on workstream failure.

fail_fast_on_internal_error Optional[bool]

Stop on internal errors.

credential_provider Optional[CredentialProvider]

Credential provider for sandboxed agents.

anthropic_credential_name Optional[str]

Anthropic credential name from the credentials YAML.

sandbox_override Optional[str]

Override sandbox type for all tasks ("oci" or "none").

verbose bool

Show detailed step-level output.

Source code in src/agentrelay/run_graph.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
@dataclasses.dataclass(frozen=True)
class RunOptions:
    """CLI-level options for ``run_graph()``.

    Bundles the keyword arguments that callers pass to control
    orchestration behavior.  ``None`` for optional fields means
    "not specified; fall back to graph YAML, then
    ``OrchestratorConfig`` default."

    Attributes:
        tmux_session: Override tmux session name, or ``None`` for
            auto-detection.
        keep_panes: Keep tmux panes open after task completion.
        model_override: Override model for all agents.
        max_concurrency: Maximum concurrent tasks.
        max_task_attempts: Maximum attempts per task.
        teardown_mode: When to tear down task resources
            (``"always"``, ``"never"``, or ``"on_success"``).
        fail_fast_on_workstream_error: Stop on workstream failure.
        fail_fast_on_internal_error: Stop on internal errors.
        credential_provider: Credential provider for sandboxed agents.
        anthropic_credential_name: Anthropic credential name from
            the credentials YAML.
        sandbox_override: Override sandbox type for all tasks
            (``"oci"`` or ``"none"``).
        verbose: Show detailed step-level output.
    """

    tmux_session: Optional[str] = None
    keep_panes: bool = False
    model_override: Optional[str] = None
    max_concurrency: Optional[int] = None
    max_task_attempts: Optional[int] = None
    teardown_mode: Optional[str] = None
    fail_fast_on_workstream_error: Optional[bool] = None
    fail_fast_on_internal_error: Optional[bool] = None
    credential_provider: Optional[CredentialProvider] = None
    anthropic_credential_name: Optional[str] = None
    sandbox_override: Optional[str] = None
    verbose: bool = False

run_graph(graph_path, repo_path, *, options=RunOptions()) async

Build all components from a graph YAML and run the orchestrator.

This is the top-level composition function that wires :func:build_standard_runner, :func:build_standard_workstream_runner, and :class:Orchestrator together.

Parameters:

Name Type Description Default
graph_path Path

Path to the graph YAML file.

required
repo_path Path

Path to the repository root.

required
options RunOptions

CLI-level run options controlling orchestration behavior.

RunOptions()

Returns:

Name Type Description
OrchestratorResult OrchestratorResult

Terminal orchestration result.

Source code in src/agentrelay/run_graph.py
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
async def run_graph(
    graph_path: Path,
    repo_path: Path,
    *,
    options: RunOptions = RunOptions(),
) -> OrchestratorResult:
    """Build all components from a graph YAML and run the orchestrator.

    This is the top-level composition function that wires
    :func:`build_standard_runner`, :func:`build_standard_workstream_runner`,
    and :class:`Orchestrator` together.

    Args:
        graph_path: Path to the graph YAML file.
        repo_path: Path to the repository root.
        options: CLI-level run options controlling orchestration behavior.

    Returns:
        OrchestratorResult: Terminal orchestration result.
    """
    # Phase 1: Session resolution (before graph loading — needed for overrides).
    session_resolver = build_session_resolver()
    tmux_session = session_resolver.resolve(options.tmux_session)

    # Phase 2: Load graph and resolve config.
    graph, ops = _load_and_prepare_graph(
        graph_path,
        tmux_session=tmux_session,
        model_override=options.model_override,
        sandbox_override=options.sandbox_override,
    )
    config, effective_keep_panes = _resolve_config(ops, options)

    assert graph.name is not None, "Graph must have a name"

    # Phase 3: Session validation + tool validation.
    session_resolver.validate(graph)
    validate_tools(ops.tools)

    # Phase 4: Run context + resume setup.
    ctx = _resolve_run_context(repo_path, graph.name)
    repo_manager = build_run_repo_manager(repo_path, graph.name)
    task_runtimes, workstream_runtimes = _setup_resume(
        ctx,
        graph,
        graph.name,
        config,
        repo_path=repo_path,
        repo_manager=repo_manager,
        model_override=options.model_override,
        sandbox_override=options.sandbox_override,
    )

    # Phase 5: Record run metadata.
    if ctx.is_resume and ctx.prior_run_dir is not None:
        start_head = _read_prior_start_head(ctx.prior_run_dir)
    else:
        start_head = repo_manager.current_head()
    _record_run_start(ctx.run_dir, start_head)
    _copy_graph_yaml(ctx.run_dir, graph_path)

    # Resolve Anthropic credential: CLI flag > graph YAML default > auto-select.
    effective_anthropic_name = (
        options.anthropic_credential_name or ops.anthropic_credential
    )
    anthropic_credential: Optional[AnthropicCredential] = None
    if isinstance(options.credential_provider, FileCredentialProvider):
        anthropic_credential = options.credential_provider.resolve_anthropic(
            effective_anthropic_name
        )

    _record_run_config(
        ctx.run_dir,
        config,
        keep_panes=effective_keep_panes,
        model=options.model_override,
        sandbox=options.sandbox_override,
        anthropic_credential=effective_anthropic_name,
        verbose=options.verbose,
    )

    # Phase 6: Infrastructure setup + orchestrator wiring.
    infra_manager = build_sandbox_infrastructure_manager(graph)
    infra_manager.setup()

    try:
        task_runner = build_standard_runner(
            repo_path=repo_path,
            graph_name=graph.name,
            run_dir=ctx.run_dir,
            graph=graph,
            keep_panes=effective_keep_panes,
            tools=ops.tools,
            credential_provider=options.credential_provider,
            anthropic_credential=anthropic_credential,
        )
        workstream_runner = build_standard_workstream_runner(
            repo_path=repo_path,
            graph_name=graph.name,
            run_dir=ctx.run_dir,
        )
        orchestrator = Orchestrator(
            graph=graph,
            task_runner=task_runner,
            workstream_runner=workstream_runner,
            config=config,
            listener=ConsoleListener(verbose=options.verbose),
            integration_merge_checker=build_integration_merge_checker(repo_path),
            integration_auto_merger=build_integration_auto_merger(repo_path),
        )
        return await orchestrator.run(
            task_runtimes=task_runtimes,
            workstream_runtimes=workstream_runtimes,
        )
    finally:
        infra_manager.teardown()

dry_run(graph_path)

Validate a graph YAML and print the execution plan.

Parameters:

Name Type Description Default
graph_path Path

Path to the graph YAML file.

required
Source code in src/agentrelay/run_graph.py
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
def dry_run(graph_path: Path) -> None:
    """Validate a graph YAML and print the execution plan.

    Args:
        graph_path: Path to the graph YAML file.
    """
    graph, ops = _load_and_prepare_graph(graph_path)

    print(f"Graph: {graph.name}")
    print(f"Tasks: {len(graph.task_ids())}")
    print(f"Workstreams: {len(graph.workstream_ids())}")
    if ops.tools:
        print(f"Tools: {', '.join(ops.tools)}")

    print("\nWorkstreams:")
    for ws_id in graph.workstream_ids():
        ws = graph.workstream(ws_id)
        parent = ws.parent_workstream_id or "(root)"
        auto = "  auto_merge" if ws.auto_merge else ""
        print(
            f"  {ws_id}  parent={parent}  base={ws.base_branch}  target={ws.merge_target_branch}{auto}"
        )

    print("\nExecution order:")
    for task_id in graph.task_ids():
        task = graph.task(task_id)
        deps = graph.dependency_ids(task_id)
        dep_str = ", ".join(deps) if deps else "(none)"
        desc = task.description or ""
        if len(desc) > 60:
            desc = desc[:57] + "..."
        print(f"  {task_id}")
        print(f"    role={task.role.value}  workstream={task.workstream_id}")
        print(f"    deps: {dep_str}")
        if desc:
            print(f"    desc: {desc}")

    roots = graph.roots()
    leaves = graph.leaves()
    print(f"\nRoots (no deps): {', '.join(roots)}")
    print(f"Leaves (no dependents): {', '.join(leaves)}")

main()

CLI entry point for running an agentrelay task graph.

Source code in src/agentrelay/run_graph.py
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
def main() -> None:
    """CLI entry point for running an agentrelay task graph."""
    parser = _build_parser()
    args = parser.parse_args()

    graph_path = Path(args.graph).resolve()
    if not graph_path.is_file():
        print(f"Error: graph file not found: {graph_path}", file=sys.stderr)
        sys.exit(1)

    repo_path = Path.cwd()

    if args.dry_run:
        dry_run(graph_path)
        # Also check for conflicts in the current directory.
        raw = yaml.safe_load(graph_path.read_text())
        graph_name = raw.get("name")
        if graph_name:
            _dry_run_conflict_check(repo_path, graph_name)
        return

    credential_provider: Optional[CredentialProvider] = None
    if args.credentials is not None:
        creds_path = Path(args.credentials).resolve()
        if not creds_path.is_file():
            print(f"Error: credentials file not found: {creds_path}", file=sys.stderr)
            sys.exit(1)
        credential_provider = FileCredentialProvider(creds_path)

    options = RunOptions(
        tmux_session=args.tmux_session,
        keep_panes=args.keep_panes,
        model_override=args.model,
        max_concurrency=args.max_concurrency,
        max_task_attempts=args.max_task_attempts,
        teardown_mode=args.teardown_mode,
        fail_fast_on_workstream_error=args.fail_fast_workstream,
        fail_fast_on_internal_error=args.fail_fast_internal,
        credential_provider=credential_provider,
        anthropic_credential_name=args.anthropic_credential,
        sandbox_override=args.sandbox,
        verbose=args.verbose,
    )

    try:
        result = asyncio.run(
            run_graph(
                graph_path=graph_path,
                repo_path=repo_path,
                options=options,
            )
        )
    except (
        SessionError,
        ToolValidationError,
        ValueError,
        RuntimeError,
    ) as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)

    _print_result(result)

    if result.outcome != OrchestratorOutcome.SUCCEEDED:
        sys.exit(1)