Skip to content

Workflow Builder

The Workflow Builder in GraphBit provides a simple, direct approach to creating AI agent workflows. Unlike complex builder patterns, GraphBit uses straightforward workflow and node creation methods.

Overview

GraphBit workflows are built using: - Workflow - Container for nodes and connections - Node - Individual processing units with different types - Connections - Links between nodes that define data flow

Basic Usage

Creating a Workflow

import graphbit

# Initialize GraphBit
graphbit.init()

# Create a new workflow
workflow = graphbit.Workflow("My AI Pipeline")

Creating Nodes

GraphBit supports several node types:

Agent Nodes

Execute AI tasks using LLM providers:

# Basic agent node
analyzer = graphbit.Node.agent(
    name="Data Analyzer",
    prompt="Analyze this data for patterns: {input}",
    agent_id="analyzer_001"  # Optional - auto-generated if not provided
)

# Agent with explicit ID
summarizer = graphbit.Node.agent(
    name="Content Summarizer", 
    prompt="Summarize the following content: {analysis}",
    agent_id="summarizer"
)

Transform Nodes

Process and modify data:

# Text transformation
formatter = graphbit.Node.transform(
    name="Text Formatter",
    transformation="uppercase"
)

# Other available transformations
lowercase_node = graphbit.Node.transform("Lowercase", "lowercase")

Condition Nodes

Make decisions based on data evaluation:

# Quality check
quality_gate = graphbit.Node.condition(
    name="Quality Gate",
    expression="quality_score > 0.8 and confidence > 0.7"
)

# Simple condition
threshold_check = graphbit.Node.condition(
    name="Threshold Check", 
    expression="value > 100"
)

Building the Workflow

# Add nodes to workflow and get their IDs
analyzer_id = workflow.add_node(analyzer)
formatter_id = workflow.add_node(formatter)
quality_id = workflow.add_node(quality_gate)
summarizer_id = workflow.add_node(summarizer)

# Connect nodes to define data flow
workflow.connect(analyzer_id, formatter_id)
workflow.connect(formatter_id, quality_id)
workflow.connect(quality_id, summarizer_id)

# Validate workflow structure
workflow.validate()

Workflow Patterns

Sequential Processing

Create linear processing pipelines:

# Create workflow
workflow = graphbit.Workflow("Sequential Pipeline")

# Create processing steps
step1 = graphbit.Node.agent(
    name="Input Processor",
    prompt="Process the initial input: {input}",
    agent_id="step1"
)

step2 = graphbit.Node.agent(
    name="Data Enricher", 
    prompt="Enrich the processed data: {step1_output}",
    agent_id="step2"
)

step3 = graphbit.Node.agent(
    name="Final Formatter",
    prompt="Format the final output: {step2_output}",
    agent_id="step3"
)

# Add nodes and connect sequentially
id1 = workflow.add_node(step1)
id2 = workflow.add_node(step2)
id3 = workflow.add_node(step3)

workflow.connect(id1, id2)
workflow.connect(id2, id3)

workflow.validate()

Parallel Processing Branches

Create workflows with multiple parallel processing paths:

# Create workflow
workflow = graphbit.Workflow("Parallel Analysis")

# Input node
input_processor = graphbit.Node.agent(
    name="Input Processor",
    prompt="Prepare data for analysis: {input}",
    agent_id="input_proc"
)

# Parallel analysis branches
sentiment_analyzer = graphbit.Node.agent(
    name="Sentiment Analyzer",
    prompt="Analyze sentiment of: {processed_data}",
    agent_id="sentiment"
)

topic_analyzer = graphbit.Node.agent(
    name="Topic Analyzer", 
    prompt="Extract key topics from: {processed_data}",
    agent_id="topics"
)

quality_analyzer = graphbit.Node.agent(
    name="Quality Analyzer",
    prompt="Assess content quality of: {processed_data}",
    agent_id="quality"
)

# Result aggregator
aggregator = graphbit.Node.agent(
    name="Result Aggregator",
    prompt="Combine analysis results:\nSentiment: {sentiment_output}\nTopics: {topics_output}\nQuality: {quality_output}",
    agent_id="aggregator"
)

# Build parallel structure
input_id = workflow.add_node(input_processor)
sentiment_id = workflow.add_node(sentiment_analyzer)
topic_id = workflow.add_node(topic_analyzer)
quality_id = workflow.add_node(quality_analyzer)
agg_id = workflow.add_node(aggregator)

# Connect input to all analyzers
workflow.connect(input_id, sentiment_id)
workflow.connect(input_id, topic_id)
workflow.connect(input_id, quality_id)

# Connect all analyzers to aggregator
workflow.connect(sentiment_id, agg_id)
workflow.connect(topic_id, agg_id)
workflow.connect(quality_id, agg_id)

workflow.validate()

Conditional Workflows

Use condition nodes for dynamic routing:

# Create workflow
workflow = graphbit.Workflow("Conditional Processing")

# Content analyzer
analyzer = graphbit.Node.agent(
    name="Content Analyzer",
    prompt="Analyze content quality (score 1-10): {input}",
    agent_id="analyzer"
)

# Quality gate condition
quality_gate = graphbit.Node.condition(
    name="Quality Gate",
    expression="quality_score >= 7"
)

# High quality path
approver = graphbit.Node.agent(
    name="Content Approver",
    prompt="Approve high-quality content: {analyzed_content}",
    agent_id="approver"
)

# Low quality path  
improver = graphbit.Node.agent(
    name="Content Improver",
    prompt="Suggest improvements for: {analyzed_content}",
    agent_id="improver"
)

# Build conditional flow
analyzer_id = workflow.add_node(analyzer)
gate_id = workflow.add_node(quality_gate)
approve_id = workflow.add_node(approver)
improve_id = workflow.add_node(improver)

# Connect analyzer to quality gate
workflow.connect(analyzer_id, gate_id)

# Connect based on conditions (simplified - actual conditional routing 
# is handled by the executor based on the condition node evaluation)
workflow.connect(gate_id, approve_id)
workflow.connect(gate_id, improve_id)

workflow.validate()

Data Transformation Pipelines

Combine transform nodes with AI agents:

# Create workflow
workflow = graphbit.Workflow("Data Transformation Pipeline")

# Text preprocessor
preprocessor = graphbit.Node.transform(
    name="Text Preprocessor",
    transformation="lowercase"
)

# Content processor
processor = graphbit.Node.agent(
    name="Content Processor", 
    prompt="Process the cleaned text: {preprocessed_text}",
    agent_id="processor"
)

# Output formatter
formatter = graphbit.Node.transform(
    name="Output Formatter",
    transformation="uppercase"
)

# Final quality check
validator = graphbit.Node.condition(
    name="Output Validator",
    expression="length > 10"
)

# Build transformation pipeline
prep_id = workflow.add_node(preprocessor)
proc_id = workflow.add_node(processor)
format_id = workflow.add_node(formatter)
valid_id = workflow.add_node(validator)

workflow.connect(prep_id, proc_id)
workflow.connect(proc_id, format_id)
workflow.connect(format_id, valid_id)

workflow.validate()

Node Properties and Management

Accessing Node Information

# Create a node
analyzer = graphbit.Node.agent(
    name="Data Analyzer",
    prompt="Analyze: {input}",
    agent_id="analyzer"
)

# Access node properties
print(f"Node ID: {analyzer.id()}")
print(f"Node Name: {analyzer.name()}")

# Add to workflow
workflow = graphbit.Workflow("Test Workflow")
node_id = workflow.add_node(analyzer)
print(f"Workflow Node ID: {node_id}")

Node Naming Best Practices

Use descriptive, clear names for nodes:

# Good - descriptive and clear
email_analyzer = graphbit.Node.agent(
    name="Email Content Analyzer",
    prompt="Analyze email for spam indicators: {email_content}",
    agent_id="email_spam_detector"
)

# Good - indicates purpose
quality_gate = graphbit.Node.condition(
    name="Content Quality Gate",
    expression="spam_score < 0.3"
)

# Avoid - vague names
node1 = graphbit.Node.agent(
    name="Node1", 
    prompt="Do something: {input}",
    agent_id="n1"
)

Workflow Execution

Setting up Execution

# Create LLM configuration
llm_config = graphbit.LlmConfig.openai(
    api_key="your-openai-key",
    model="gpt-4o-mini"
)

# Create executor
executor = graphbit.Executor(
    config=llm_config,
    timeout_seconds=300,
    debug=False
)

# Execute workflow
result = executor.execute(workflow)

# Check results
if result.is_completed():
    print("Success:", result.output())
elif result.is_failed():
    print("Failed:", result.error())

Asynchronous Execution

import asyncio

async def run_workflow():
    # Create workflow and executor as above
    result = await executor.run_async(workflow)
    return result

# Run asynchronously
result = asyncio.run(run_workflow())

Advanced Patterns

Multi-Stage Processing

def create_multi_stage_workflow():
    workflow = graphbit.Workflow("Multi-Stage Processing")

    # Stage 1: Data Preparation
    cleaner = graphbit.Node.transform("Data Cleaner", "lowercase")
    validator = graphbit.Node.condition("Data Validator", "length > 5")

    # Stage 2: Analysis
    analyzer = graphbit.Node.agent(
        name="Content Analyzer",
        prompt="Analyze cleaned content: {cleaned_data}",
        agent_id="analyzer"
    )

    # Stage 3: Output Processing
    formatter = graphbit.Node.transform("Output Formatter", "uppercase")
    finalizer = graphbit.Node.agent(
        name="Output Finalizer",
        prompt="Finalize the analysis: {formatted_output}",
        agent_id="finalizer"
    )

    # Build multi-stage pipeline
    clean_id = workflow.add_node(cleaner)
    valid_id = workflow.add_node(validator)
    analyze_id = workflow.add_node(analyzer)
    format_id = workflow.add_node(formatter)
    final_id = workflow.add_node(finalizer)

    # Connect stages
    workflow.connect(clean_id, valid_id)
    workflow.connect(valid_id, analyze_id)
    workflow.connect(analyze_id, format_id)
    workflow.connect(format_id, final_id)

    workflow.validate()
    return workflow

Error Handling in Workflows

def create_robust_workflow():
    workflow = graphbit.Workflow("Robust Processing")

    try:
        # Add nodes
        processor = graphbit.Node.agent(
            name="Data Processor",
            prompt="Process: {input}",
            agent_id="processor"
        )

        node_id = workflow.add_node(processor)
        # Validate before execution
        workflow.validate()        
        return workflow

    except Exception as e:
        print(f"Workflow creation failed: {e}")
        return None

Best Practices

1. Workflow Organization

# Organize complex workflows into functions
def create_analysis_workflow():
    workflow = graphbit.Workflow("Content Analysis")    
    # Input processing
    input_node = create_input_processor()    
    # Analysis stages  
    sentiment_node = create_sentiment_analyzer()
    topic_node = create_topic_analyzer()    
    # Output processing
    output_node = create_output_formatter()

    # Build workflow
    input_id = workflow.add_node(input_node)
    sentiment_id = workflow.add_node(sentiment_node)
    topic_id = workflow.add_node(topic_node)
    output_id = workflow.add_node(output_node)    
    # Connect nodes
    workflow.connect(input_id, sentiment_id)
    workflow.connect(input_id, topic_id)
    workflow.connect(sentiment_id, output_id)
    workflow.connect(topic_id, output_id)    
    workflow.validate()
    return workflow

def create_input_processor():
    return graphbit.Node.agent(
        name="Input Processor",
        prompt="Prepare input for analysis: {input}",
        agent_id="input_processor"
    )

2. Validation and Testing

def test_workflow(workflow):
    """Test workflow structure before execution"""
    try:
        workflow.validate()
        print("✅ Workflow validation passed")
        return True
    except Exception as e:
        print(f"❌ Workflow validation failed: {e}")
        return False

# Always validate before execution
workflow = create_analysis_workflow()
if test_workflow(workflow):
    result = executor.execute(workflow)

3. Resource Management

# Choose appropriate executor for your use case
def get_executor_for_workload(workload_type, llm_config):
    if workload_type == "batch":
        return graphbit.Executor.new_high_throughput(
            llm_config=llm_config,
            timeout_seconds=600
        )
    elif workload_type == "realtime":
        return graphbit.Executor.new_low_latency(
            llm_config=llm_config,
            timeout_seconds=30
        )
    elif workload_type == "constrained":
        return graphbit.Executor.new_memory_optimized(
            llm_config=llm_config,
            timeout_seconds=300
        )
    else:
        return graphbit.Executor(
            config=llm_config,
            timeout_seconds=300
        )

Common Patterns Summary

Pattern Use Case Example
Sequential Linear processing Data cleaning → Analysis → Output
Parallel Independent analysis Multiple analyzers running simultaneously
Conditional Dynamic routing Quality gate directing to approval/rejection
Transform Data preprocessing Text cleaning and formatting
Multi-stage Complex pipelines Preparation → Analysis → Finalization

What's Next

  • Learn about LLM Providers for configuring different AI models
  • Explore Agents for advanced agent configuration
  • Check Performance for execution optimization
  • See Validation for workflow validation strategies