Deep Orchestrator + Temporal (MCP Agent Server)
Implement a Deep Orchestrator workflow backed by Temporal and exposed as an MCP server.
This gives you durable, observable runs with remote MCP servers (via Streamable HTTP) and a local filesystem server.
✨ What you get
- Temporal-backed durability: pause/resume, retries, visibility UI
- Deep Orchestrator: dynamic agents, replanning, parallelism, budgets, memory
- MCP server surface: run your workflow via MCP tools like `workflows-<Name>-run` / `get_status` / `cancel`
- Remote tools: use existing MCP servers over `streamable_http` (KG / metadata / data); keep `filesystem` local
📦 Directory layout

```
deep-orchestrator-temporal/
├── pyproject.toml          # or requirements.txt
├── mcp_agent.config.yaml   # MCP + Temporal config (remote servers)
├── .env                    # API keys / endpoints
└── src/
    ├── deep_workflow.py    # Deep Orchestrator workflow
    ├── server.py           # Expose app as MCP server (SSE)
    └── worker.py           # Temporal worker for the app
```
✅ Prerequisites
- Python 3.11+
- Temporal dev server (or a managed Temporal cluster)
- Accounts/keys for your LLM provider and remote MCP servers
- (Recommended) uv for fast, reproducible Python envs
🧰 Install

Option A: uv

```bash
uv init deep-orchestrator-temporal -p 3.11
cd deep-orchestrator-temporal
# Add deps
uv add "mcp-agent" "temporalio" "openai" "python-dotenv"
```

Option B: pip

```bash
python -m venv .venv && source .venv/bin/activate
pip install mcp-agent temporalio openai python-dotenv
```
If you use a different LLM provider, install its SDK and set the corresponding variables in `.env`.
⚙️ Configuration

Create `mcp_agent.config.yaml` in the project root. All MCP servers are remote via Streamable HTTP, except `filesystem`, which is local via stdio.
```yaml
execution_engine: temporal

temporal:
  address: "localhost:7233"    # dev server default
  namespace: "default"
  task_queue: "deep-data-agent"

openai:
  default_model: "gpt-4o-mini"

mcp:
  servers:
    knowledge-graph:
      transport: streamable_http
      url: "${KG_URL}"         # e.g., https://kg.example.com/mcp
      headers:
        Authorization: "Bearer ${KG_API_KEY}"
      http_timeout_seconds: 60
      read_timeout_seconds: 120
      terminate_on_close: true
    metadata:
      transport: streamable_http
      url: "${CATALOG_URL}"
      headers:
        Authorization: "Bearer ${CATALOG_API_KEY}"
      http_timeout_seconds: 60
      read_timeout_seconds: 120
      terminate_on_close: true
    data:
      transport: streamable_http
      url: "${DATA_URL}"
      headers:
        Authorization: "Bearer ${DATA_API_KEY}"
      http_timeout_seconds: 60
      read_timeout_seconds: 120
      terminate_on_close: true
    filesystem:
      transport: stdio
      command: "npx"
      args: ["-y", "@modelcontextprotocol/server-filesystem"]
      roots:
        - uri: "file://${PWD}/"   # must start with file://
          name: "project"
```
Create `.env` with your secrets:

```bash
OPENAI_API_KEY=...
KG_URL=https://kg.example.com/mcp
KG_API_KEY=...
CATALOG_URL=https://metadata.example.com/mcp
CATALOG_API_KEY=...
DATA_URL=https://data.example.com/mcp
DATA_API_KEY=...
```

You can also configure Anthropic/Cohere/Azure/Google models in `mcp_agent.config.yaml`.
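Before starting the worker, it can help to confirm that every `${VAR}` placeholder referenced in the config actually resolves. The sketch below is not part of mcp-agent; `expand` merely mimics typical `${VAR}` substitution, and the variable list mirrors the config above:

```python
import os
from string import Template

# Variables referenced by mcp_agent.config.yaml and .env above
REQUIRED_VARS = [
    "OPENAI_API_KEY",
    "KG_URL", "KG_API_KEY",
    "CATALOG_URL", "CATALOG_API_KEY",
    "DATA_URL", "DATA_API_KEY",
]

def missing_vars(required: list[str]) -> list[str]:
    """Return the names of required env vars that are unset or empty."""
    return [name for name in required if not os.environ.get(name)]

def expand(value: str) -> str:
    """Expand ${VAR} references the way a config loader typically would."""
    return Template(value).substitute(os.environ)

if __name__ == "__main__":
    if missing := missing_vars(REQUIRED_VARS):
        raise SystemExit(f"Missing env vars: {missing}")
    print(expand("${KG_URL}"))
```

Running this before `src/worker.py` fails fast with a clear message instead of a mid-run connection error.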
🧠 Deep Orchestrator workflow
`src/deep_workflow.py`:

```python
from __future__ import annotations

from mcp_agent.app import MCPApp
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.workflows.deep_orchestrator.orchestrator import DeepOrchestrator
from mcp_agent.workflows.deep_orchestrator.config import (
    DeepOrchestratorConfig,
    ExecutionConfig,
    BudgetConfig,
)
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
from mcp_agent.workflows.llm.augmented_llm import RequestParams

# Create the application
app = MCPApp(name="deep_orch_server")


@app.workflow
class DeepDataAgentWorkflow(Workflow[str]):
    """Knowledge-Graph → Metadata → Query-Design → Execute → Synthesize"""

    @app.workflow_run
    async def run(self, objective: str) -> WorkflowResult[str]:
        # Access the app context (server registry, config, logger, etc.)
        ctx = app.context

        # Configure Deep Orchestrator (dynamic agents, budgets, parallelism)
        cfg = DeepOrchestratorConfig(
            name="DeepDataAgent",
            available_servers=list(ctx.server_registry.registry.keys()),
            execution=ExecutionConfig(
                max_iterations=20,
                max_replans=3,
                max_task_retries=3,
                enable_parallel=True,
                enable_filesystem=True,
            ),
            budget=BudgetConfig(
                max_tokens=120_000,
                max_cost=3.00,
                max_time_minutes=12,
            ),
        )

        orch = DeepOrchestrator(
            llm_factory=OpenAIAugmentedLLM,
            config=cfg,
            context=ctx,
        )

        # Canonical deep plan for a data agent
        message = f"""
        Objective: {objective}
        Steps:
        1) Traverse the domain knowledge graph to identify entities, relationships, constraints, and lineage.
        2) Query the metadata catalog for candidate datasets/columns with types, descriptions, sensitivity, and freshness.
        3) Design a safe, efficient query (SQL/Graph) with rationale; validate schema, joins, and filters.
        4) If PII/full scans or risky joins are detected, pause for human approval before execution.
        5) Execute the approved query; stream/paginate; compute quick data profile (row count, null/min/max/distinct).
        6) Adapt on failures/quality issues (replan: adjust sources/joins/filters) and synthesize final results.
        Output: final answer, executed query, sources used, caveats. Persist learned joins/paths and synonyms.
        """

        result = await orch.generate_str(
            message=message,
            request_params=RequestParams(model="gpt-4o-mini", temperature=0),
        )
        return WorkflowResult(value=result)
```
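Step 5 of the plan asks the agent to compute a quick data profile. As a rough sketch of what that profile could contain (this helper is illustrative, not part of mcp-agent's API):

```python
from typing import Any

def quick_profile(rows: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Per-column quick profile: row count, nulls, min, max, distinct count."""
    profile: dict[str, dict[str, Any]] = {}
    columns = {col for row in rows for col in row}
    for col in sorted(columns):
        values = [row.get(col) for row in rows]
        non_null = [v for v in values if v is not None]
        profile[col] = {
            "rows": len(values),
            "nulls": len(values) - len(non_null),
            "min": min(non_null) if non_null else None,
            "max": max(non_null) if non_null else None,
            "distinct": len(set(non_null)),
        }
    return profile

sample = [
    {"plan": "pro", "tickets": 3},
    {"plan": "free", "tickets": None},
    {"plan": "pro", "tickets": 1},
]
# quick_profile(sample)["tickets"] → {"rows": 3, "nulls": 1, "min": 1, "max": 3, "distinct": 2}
```

The orchestrator can attach a profile like this to the final synthesis so reviewers see data quality at a glance.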
🚀 Expose as an MCP server (SSE)

`src/server.py`:

```python
import asyncio

from mcp_agent.server.app_server import create_mcp_server_for_app

from deep_workflow import app


async def main():
    # Boot the app (loads config, connects to MCP servers lazily)
    async with app.run() as agent_app:
        # Create an SSE-based MCP server from the app
        mcp_server = create_mcp_server_for_app(agent_app)
        # Run the SSE server (default: localhost:10000; override via env/config if supported)
        await mcp_server.run_sse_async()


if __name__ == "__main__":
    asyncio.run(main())
```
This exposes tools such as:
- `workflows-DeepDataAgentWorkflow-run`
- `workflows-get_status`
- `workflows-cancel`
- `workflows-resume` (if you add signals/human checkpoints)
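These tools can also be called programmatically with the official `mcp` Python SDK. The sketch below is a minimal client, assuming the SSE server above listens on `localhost:10000` with an `/sse` endpoint and that the run tool accepts the `run_parameters` payload shown later in this guide; adjust the URL and payload to your deployment:

```python
import asyncio

RUN_TOOL = "workflows-DeepDataAgentWorkflow-run"

def run_arguments(objective: str) -> dict:
    """Build the arguments payload for the run tool (shape assumed from this guide)."""
    return {"run_parameters": {"objective": objective}}

async def main() -> None:
    # Deferred imports so the payload helper is usable without the SDK installed
    from mcp import ClientSession
    from mcp.client.sse import sse_client

    async with sse_client("http://localhost:10000/sse") as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                RUN_TOOL, run_arguments("Summarize churn risk for Q2")
            )
            print(result)

# To invoke against a running server: asyncio.run(main())
```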
🏃 Run a Temporal worker

`src/worker.py`:

```python
import asyncio

from mcp_agent.executor.temporal import create_temporal_worker_for_app

from deep_workflow import app


async def main():
    # Start a Temporal worker bound to this app's workflows
    async with create_temporal_worker_for_app(app) as worker:
        await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```
▶️ Run locally (dev)
- Start the Temporal dev server (in a separate terminal):

```bash
temporal server start-dev
```

- Set environment:

```bash
cp .env.example .env   # if you created one
# export OPENAI_API_KEY=...  (or use direnv)
```

- Install deps (choose uv or pip):

```bash
# uv
uv sync
# or pip
pip install -r requirements.txt
```

- Start the Temporal worker:

```bash
python -m src.worker
```

- Start the MCP server (SSE):

```bash
python -m src.server
```

- Invoke from an MCP client of your choice:
  - Use MCP Inspector, Claude Desktop, or a custom MCP client
  - Call `workflows-DeepDataAgentWorkflow-run` with `{"run_parameters": {"objective": "Find customer churn risk by ..."}}`
  - Poll `workflows-get_status` using the returned `run_id`
  - Optionally call `workflows-cancel` / `workflows-resume` if you use signals
📝 Example objective
Produce a churn-risk summary for Q2:
- Use KG to map entities: customer, subscription, events, tickets.
- Pick trusted datasets and fields from the catalog with types and sensitivity.
- Design SQL to join subscriptions with support tickets and product usage for last 90 days.
- Require WHERE clause on date to avoid full scans; apply safe limits for preview.
- Execute, profile, and synthesize findings with caveats and the final SQL.
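To make the guardrails above concrete, here is a hypothetical shape of the SQL the orchestrator might design for this objective; the table and column names are illustrative only, and the `has_date_filter` helper is a sketch of the "require a WHERE clause on date" check, not an mcp-agent feature:

```python
import re

# Illustrative target query: joins, a date filter, and a preview limit
CHURN_SQL = """
SELECT s.customer_id,
       COUNT(DISTINCT t.ticket_id) AS tickets_90d,
       SUM(u.events)               AS usage_90d
FROM subscriptions s
LEFT JOIN tickets t     ON t.customer_id = s.customer_id
LEFT JOIN usage_daily u ON u.customer_id = s.customer_id
WHERE s.snapshot_date >= CURRENT_DATE - INTERVAL '90' DAY
GROUP BY s.customer_id
LIMIT 1000
"""

def has_date_filter(sql: str) -> bool:
    """Rough check that the query filters on a date column (avoids full scans)."""
    return bool(re.search(r"\bWHERE\b.*date", sql, re.IGNORECASE | re.DOTALL))
```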
🧪 Tips & options
- Human-in-the-loop: before execution, pause for approval when PII or expensive operations are detected.
- Budgets: set `max_tokens`, `max_cost`, and `max_time_minutes` to bound spend.
- Parallelism: set `enable_parallel=True` to fetch stats/partitions while validating joins.
- Memory: persist approved join paths, synonyms, and dataset trust levels for faster follow-ups.
- Observability: enable OTEL in config (or env) to trace plans, tool calls, and costs.
- Security: use service-to-service auth for remote MCP servers (`Authorization` headers) and FQDN allowlists.
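The human-in-the-loop tip can be sketched as a simple approval gate. The helper below and its heuristics are illustrative (not an mcp-agent API); a real gate would use the catalog's sensitivity tags rather than substring matching:

```python
def needs_human_approval(sql: str, pii_columns: set[str]) -> bool:
    """Pause for approval when a query touches PII columns or has no WHERE clause."""
    lowered = sql.lower()
    touches_pii = any(col.lower() in lowered for col in pii_columns)
    full_scan = "where" not in lowered.split()  # crude full-scan heuristic
    return touches_pii or full_scan

# Example: a query selecting a PII column should be gated
needs_human_approval("SELECT email FROM users WHERE id = 1", {"email"})  # → True
```

Inside the workflow, a `True` result would map onto step 4 of the plan: signal the run, wait for a human decision, then resume or cancel.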
🧯 Troubleshooting
- Cannot connect to Temporal: verify `temporal.address` and that the dev server is running.
- Remote MCP 401/403: check `Authorization` headers & tokens.
- Time-outs: raise `http_timeout_seconds` or `read_timeout_seconds` in server config.
- Schema/join validation fails repeatedly: inspect agent memory & planner rationale; seed more trusted datasets.
- Large results: prefer streaming/pagination; return a sample + artifact link.
🔮 Next steps
- Add policy engine rules (block PII joins without approval, require date filters, etc.).
- Register a result-artifact writer (e.g., to blob storage) for query outputs and decision logs.
- Add a router/intent-classifier in front if you'll support non-data tasks too.
- Wire OpenTelemetry collector + dashboards for rich traces.
📄 License
MIT (example). Use your org's license as needed.
