Mageflow
A support package for task manager orchestration.
Installation
npx mageflow
MageFlow
Manage Graph Execution Flow - A unified interface for task orchestration across different task managers.
Why MageFlow?
Instead of spreading workflow logic throughout your codebase, MageFlow centralizes task orchestration with a clean, unified API. Switch between task managers (Hatchet, Taskiq, etc.) without rewriting your orchestration code.
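The idea of writing orchestration code once and swapping the task manager underneath can be pictured with a small adapter sketch. Everything here is hypothetical and for illustration only: `TaskBackend`, `InMemoryBackend`, `PrintingBackend`, and `onboard_user` are not part of MageFlow, Hatchet, or Taskiq, and MageFlow's real internals may look quite different.

```python
from typing import Any, Protocol


class TaskBackend(Protocol):
    """Hypothetical minimal interface a backend adapter would implement."""

    def submit(self, task_name: str, payload: dict[str, Any]) -> str: ...


class InMemoryBackend:
    """Toy backend that records submissions in a list."""

    def __init__(self) -> None:
        self.submitted: list[tuple[str, dict[str, Any]]] = []

    def submit(self, task_name: str, payload: dict[str, Any]) -> str:
        self.submitted.append((task_name, payload))
        return f"memory-{len(self.submitted)}"


class PrintingBackend:
    """Toy backend that prints each submission instead of storing it."""

    def __init__(self) -> None:
        self.count = 0

    def submit(self, task_name: str, payload: dict[str, Any]) -> str:
        self.count += 1
        print(f"submit {task_name}: {payload}")
        return f"print-{self.count}"


def onboard_user(backend: TaskBackend, user_id: str) -> list[str]:
    """Orchestration logic written against the interface, not a specific backend."""
    payload = {"user_id": user_id}
    return [
        backend.submit(name, payload)
        for name in ("create-profile", "send-welcome-email")
    ]


if __name__ == "__main__":
    # The same orchestration function runs unchanged on either backend.
    print(onboard_user(InMemoryBackend(), "u1"))
    print(onboard_user(PrintingBackend(), "u1"))
```

Because `onboard_user` depends only on the `submit` method, changing the backend is a one-line change at the call site.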
Key Features
- Task Chaining - Sequential workflows where tasks depend on previous completions
- Task Swarms - Parallel execution with intelligent coordination
- Callback System - Robust success/error handling
- Task Signatures - Flexible task definition with validation
- Lifecycle Control - Pause, resume, and monitor task execution
- Persistent State - Redis-backed state management with recovery
Installation
pip install mageflow[hatchet] # For Hatchet backend
Quick Setup
import asyncio
import redis
from hatchet_sdk import Hatchet, ClientConfig
import mageflow
# Configure backend and Redis
config = ClientConfig(token="your-hatchet-token")
redis_client = redis.asyncio.from_url("redis://localhost", decode_responses=True)
hatchet_client = Hatchet(config=config)
# Create MageFlow instance
mf = mageflow.Mageflow(hatchet_client, redis_client=redis_client)
Example Usage
Define Tasks
from pydantic import BaseModel

class ProcessData(BaseModel):
    data: str

@mf.task(name="process-data", input_validator=ProcessData)
async def process_data(msg: ProcessData):
    return {"processed": msg.data}

@mf.task(name="send-notification")
async def send_notification(msg):
    print(f"Notification sent: {msg}")
    return {"status": "sent"}
Chain Tasks
# Sequential execution
workflow = await mageflow.chain([
    process_data_task,
    send_notification_task
], name="data-pipeline")
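To make the sequential semantics concrete, here is a minimal plain-asyncio sketch of what chaining means conceptually. It does not use MageFlow and is not how MageFlow implements `chain`. The helper `run_chain` is hypothetical, and the sketch assumes each step's return value is passed as input to the next step, which the documentation above does not state explicitly.

```python
import asyncio
from typing import Any, Awaitable, Callable

Step = Callable[[Any], Awaitable[Any]]


async def run_chain(steps: list[Step], initial: Any) -> Any:
    """Run steps one after another, feeding each result into the next step."""
    result = initial
    for step in steps:
        # A step starts only after the previous one has completed.
        result = await step(result)
    return result


async def process_data(msg: dict) -> dict:
    return {"processed": msg["data"]}


async def send_notification(msg: dict) -> dict:
    return {"status": "sent", "payload": msg}


if __name__ == "__main__":
    final = asyncio.run(
        run_chain([process_data, send_notification], {"data": "hello"})
    )
    print(final)
```

If any step raises, the loop stops and later steps never run, which is the usual expectation for a dependent pipeline.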
Parallel Swarms
# Parallel execution
swarm = await mageflow.swarm([
    process_user_task,
    update_cache_task,
    send_email_task
], task_name="user-onboarding")
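For comparison, parallel execution with a simple form of coordination can be sketched in plain asyncio. This is a conceptual illustration only, not MageFlow's swarm implementation. `run_swarm`, its `max_concurrency` parameter, and the three stand-in tasks are assumptions made for the example.

```python
import asyncio
from typing import Any, Awaitable, Callable

SwarmTask = Callable[[Any], Awaitable[Any]]


async def run_swarm(
    tasks: list[SwarmTask], msg: Any, max_concurrency: int = 2
) -> list[Any]:
    """Run all tasks concurrently on the same input, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(task: SwarmTask) -> Any:
        async with semaphore:
            return await task(msg)

    # return_exceptions=True collects failures as values instead of
    # propagating the first exception; results keep the input order.
    return await asyncio.gather(
        *(guarded(t) for t in tasks), return_exceptions=True
    )


async def process_user(msg: dict) -> str:
    return f"processed {msg['user_id']}"


async def update_cache(msg: dict) -> str:
    return f"cached {msg['user_id']}"


async def send_email(msg: dict) -> str:
    raise RuntimeError("mail server unavailable")


if __name__ == "__main__":
    results = asyncio.run(
        run_swarm([process_user, update_cache, send_email], {"user_id": "u1"})
    )
    print(results)
```

Collecting exceptions as results lets the caller decide what a partial failure means, instead of losing the output of the tasks that succeeded.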
Task Signatures with Callbacks
task_signature = await mageflow.sign(
    task_name="process-order",
    task_identifiers={"order_id": "12345"},
    success_callbacks=[send_confirmation_task],
    error_callbacks=[handle_error_task]
)
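The success/error callback pattern itself can be sketched without any task manager. The code below is a conceptual illustration, not MageFlow's callback machinery. `run_with_callbacks` and the handlers are hypothetical, and the sketch assumes success callbacks receive the task result while error callbacks receive the exception.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]


async def run_with_callbacks(
    task: Callable[[Any], Awaitable[Any]],
    msg: Any,
    success_callbacks: list[Handler],
    error_callbacks: list[Handler],
) -> Any:
    """Run a task, then dispatch to success or error callbacks based on the outcome."""
    try:
        result = await task(msg)
    except Exception as exc:
        # On failure, every error callback receives the exception.
        for callback in error_callbacks:
            await callback(exc)
        return None
    # On success, every success callback receives the result.
    for callback in success_callbacks:
        await callback(result)
    return result


events: list[str] = []


async def process_order(msg: dict) -> dict:
    if "order_id" not in msg:
        raise ValueError("missing order_id")
    return {"order_id": msg["order_id"], "state": "processed"}


async def send_confirmation(result: dict) -> None:
    events.append(f"confirmed {result['order_id']}")


async def handle_error(exc: Exception) -> None:
    events.append(f"error: {exc}")


if __name__ == "__main__":
    asyncio.run(run_with_callbacks(
        process_order, {"order_id": "12345"}, [send_confirmation], [handle_error]
    ))
    asyncio.run(run_with_callbacks(
        process_order, {}, [send_confirmation], [handle_error]
    ))
    print(events)
```

Keeping the callbacks outside the task body means the task only describes the work, while the caller decides what happens afterwards.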
Use Cases
- Data Pipelines - ETL operations with error handling
- Microservice Coordination - Orchestrate distributed service calls
- Batch Processing - Parallel processing of large datasets
- User Workflows - Multi-step onboarding and registration
- Content Processing - Media processing with multiple stages
MageFlow Viewer
A desktop app for visualizing your workflows as interactive task graphs. See the full docs for details.
Homebrew (macOS):
brew install imaginary-cherry/mageflow/mage-voyance # stable
brew install imaginary-cherry/mageflow/mage-voyance-beta # beta
Direct download: GitHub Releases
Documentation
License
MIT
