Skip to main content

Workflow Patterns

PraisonAI provides four powerful workflow patterns that can be combined to create complex, production-ready workflows.

Quick Comparison

PatternPurposeUse When
route()Decision-based branchingOutput determines next steps
parallel()Concurrent executionIndependent tasks can run together
loop()Iterate over dataProcessing lists, CSV files
repeat()Repeat until conditionIterative improvement

Import

Copy
from praisonaiagents import Workflow, WorkflowContext, StepResult
# Or use Pipeline (alias for Workflow)
from praisonaiagents import Pipeline
from praisonaiagents.workflows import route, parallel, loop, repeat
Pipeline and Workflow are the same class. Use whichever term you prefer!

Pattern Overview

1. Routing (Decision Branching)

Route to different steps based on previous output:
Copy
workflow = Workflow(steps=[
    classifier,
    route({
        "approve": [approve_handler],
        "reject": [reject_handler],
        "default": [fallback]
    })
])
📖 Full Documentation →

2. Parallel (Concurrent Execution)

Execute multiple steps at the same time:
Copy
workflow = Workflow(steps=[
    parallel([research_a, research_b, research_c]),
    aggregator  # Combines results
])
📖 Full Documentation →

3. Loop (Iterate Over Data)

Process each item in a list or file:
Copy
# From list
workflow = Workflow(
    steps=[loop(processor, over="items")],
    variables={"items": ["a", "b", "c"]}
)

# From CSV
workflow = Workflow(steps=[
    loop(processor, from_csv="data.csv")
])
📖 Full Documentation →

4. Repeat (Evaluator-Optimizer)

Repeat until a condition is met:
Copy
workflow = Workflow(steps=[
    repeat(
        generator,
        until=lambda ctx: "done" in ctx.previous_result,
        max_iterations=5
    )
])
📖 Full Documentation →

Combining Patterns

Patterns can be combined for complex workflows:
Copy
workflow = Workflow(steps=[
    # Step 1: Parallel research from multiple sources
    parallel([
        research_market,
        research_competitors,
        research_customers
    ]),
    
    # Step 2: Route based on findings
    analyze_findings,
    route({
        "positive": [expand_analysis],
        "negative": [summarize_concerns],
        "default": [standard_report]
    }),
    
    # Step 3: Process each recommendation
    loop(process_recommendation, over="recommendations"),
    
    # Step 4: Refine until quality threshold
    repeat(
        refine_output,
        until=meets_quality_threshold,
        max_iterations=3
    ),
    
    # Step 5: Final output
    format_final_report
])

Common Workflow Architectures

Orchestrator-Worker

Copy
workflow = Workflow(steps=[
    orchestrator,  # Decides which workers to use
    route({
        "type_a": [worker_a],
        "type_b": [worker_b],
        "type_c": [worker_c]
    }),
    synthesizer  # Combines worker outputs
])

Fan-Out/Fan-In

Copy
workflow = Workflow(steps=[
    splitter,  # Splits work into parts
    parallel([processor_1, processor_2, processor_3]),
    aggregator  # Combines results
])

Batch Processing Pipeline

Copy
workflow = Workflow(steps=[
    loop(validate_item, from_csv="input.csv"),
    loop(transform_item, over="validated_items"),
    loop(load_item, over="transformed_items"),
    generate_report
])

Self-Improving Agent

Copy
workflow = Workflow(steps=[
    initial_generator,
    repeat(
        improve_output,
        until=quality_check,
        max_iterations=5
    ),
    final_polish
])

Pattern Selection Guide

Best Practices

1. Start Simple

Copy
# Start with sequential
workflow = Workflow(steps=[step1, step2, step3])

# Add patterns as needed
workflow = Workflow(steps=[
    step1,
    parallel([step2a, step2b]),  # Optimize with parallel
    step3
])

2. Handle Errors

Copy
def safe_step(ctx: WorkflowContext) -> StepResult:
    try:
        # Your logic
        return StepResult(output="Success")
    except Exception as e:
        return StepResult(output=f"Error: {e}")

3. Use Verbose Mode

Copy
result = workflow.start("input", verbose=True)
# Shows step-by-step progress

4. Track State with Variables

Copy
def my_step(ctx: WorkflowContext) -> StepResult:
    count = ctx.variables.get("count", 0) + 1
    return StepResult(
        output=f"Count: {count}",
        variables={"count": count}
    )

API Reference

FunctionSignature
route()route(routes: Dict[str, List], default: Optional[List] = None)
parallel()parallel(steps: List)
loop()loop(step, over=None, from_csv=None, from_file=None, var_name="item")
repeat()repeat(step, until=None, max_iterations=10)

See Also

Morty Proxy This is a proxified and sanitized view of the page, visit original site.