# Dynamic Workflow Creation
GraphBit supports dynamic workflow creation, allowing you to build and modify workflows at runtime based on data, conditions, and business logic. This powerful feature enables adaptive workflows that can respond to changing requirements.
## Overview
Dynamic workflow creation allows you to:

- Create workflows that adapt to input data
- Generate nodes and connections programmatically
- Modify workflow structure based on runtime conditions
- Build self-organizing processing pipelines
- Implement conditional workflow branches
## Basic Dynamic Workflow Creation
### Simple Dynamic Workflow
import graphbit
# Initialize the GraphBit library at module load, ahead of the workflow
# factories defined below.
graphbit.init()
def create_dynamic_workflow(input_data):
    """Build the workflow that matches the detected kind of ``input_data``.

    The input is classified with ``detect_data_type`` and the matching
    factory is called. Any kind without a dedicated factory falls back to
    the generic workflow.
    """
    factories = {
        "text": create_text_processing_workflow,
        "numerical": create_numerical_analysis_workflow,
        "mixed": create_mixed_data_workflow,
    }
    detected_kind = detect_data_type(input_data)
    build_workflow = factories.get(detected_kind, create_generic_workflow)
    return build_workflow()
def detect_data_type(data):
    """Classify ``data`` as "text", "numerical", "mixed" or "unknown".

    Strings are "text", ints and floats are "numerical", dicts and lists are
    "mixed"; everything else is "unknown". Because ``bool`` is a subclass of
    ``int``, booleans are reported as "numerical".
    """
    if isinstance(data, str):
        return "text"
    if isinstance(data, (int, float)):
        return "numerical"
    if isinstance(data, (dict, list)):
        return "mixed"
    return "unknown"
def create_text_processing_workflow():
    """Return a two-node text workflow: analysis feeding sentiment detection."""
    workflow = graphbit.Workflow("Text Processing Workflow")

    text_analyzer = graphbit.Node.agent(
        name="Text Analyzer",
        prompt="Analyze this text: {input}",
        agent_id="text_analyzer",
    )
    sentiment_detector = graphbit.Node.agent(
        name="Sentiment Detector",
        prompt="Determine sentiment of: {analyzed_text}",
        agent_id="sentiment_detector",
    )

    # Register both nodes, then wire analyzer -> sentiment detector.
    upstream_id, downstream_id = (
        workflow.add_node(text_analyzer),
        workflow.add_node(sentiment_detector),
    )
    workflow.connect(upstream_id, downstream_id)
    return workflow
def create_numerical_analysis_workflow():
    """Return a two-node numerical workflow: statistics feeding trend detection."""
    workflow = graphbit.Workflow("Numerical Analysis Workflow")

    statistical_analyzer = graphbit.Node.agent(
        name="Statistical Analyzer",
        prompt="Perform statistical analysis on: {input}",
        agent_id="stats_analyzer",
    )
    trend_detector = graphbit.Node.agent(
        name="Trend Detector",
        prompt="Identify trends in: {stats_results}",
        agent_id="trend_detector",
    )

    # Register both nodes, then wire statistics -> trend detection.
    upstream_id, downstream_id = (
        workflow.add_node(statistical_analyzer),
        workflow.add_node(trend_detector),
    )
    workflow.connect(upstream_id, downstream_id)
    return workflow
def create_mixed_data_workflow():
    """Return a two-node workflow for mixed data: classification, then processing."""
    workflow = graphbit.Workflow("Mixed Data Workflow")

    data_classifier = graphbit.Node.agent(
        name="Data Classifier",
        prompt="Classify this mixed data: {input}",
        agent_id="classifier",
    )
    multimodal_processor = graphbit.Node.agent(
        name="Multi-Modal Processor",
        prompt="Process classified data: {classified_data}",
        agent_id="multimodal_processor",
    )

    # Register both nodes, then wire classifier -> processor.
    upstream_id, downstream_id = (
        workflow.add_node(data_classifier),
        workflow.add_node(multimodal_processor),
    )
    workflow.connect(upstream_id, downstream_id)
    return workflow
def create_generic_workflow():
    """Return a single-node fallback workflow for unrecognised data types."""
    workflow = graphbit.Workflow("Generic Workflow")
    # A lone agent node; there is nothing to connect it to.
    workflow.add_node(
        graphbit.Node.agent(
            name="Generic Processor",
            prompt="Process this input: {input}",
            agent_id="generic_processor",
        )
    )
    return workflow
## Advanced Dynamic Generation
### Data-Driven Node Creation
def create_data_driven_workflow(schema):
    """Build a fan-in workflow with one processor node per schema field.

    ``schema`` is a mapping whose optional "fields" list holds mappings with
    "type" and "name" entries. Each field gets a type-specific processor
    node ("string", "number", "date", otherwise generic), and every
    processor is connected to a single "Data Aggregator" node.
    """
    workflow = graphbit.Workflow("Data-Driven Workflow")

    # Scanned in order; the first matching type wins, mirroring an if/elif chain.
    typed_factories = (
        ("string", create_text_processing_node),
        ("number", create_numerical_processing_node),
        ("date", create_date_processing_node),
    )

    processor_ids = []
    for field in schema.get("fields", []):
        field_type = field.get("type")
        field_name = field.get("name")
        make_node = next(
            (factory for kind, factory in typed_factories if field_type == kind),
            create_generic_processing_node,
        )
        processor_ids.append(workflow.add_node(make_node(field_name)))

    aggregator_id = workflow.add_node(
        graphbit.Node.agent(
            name="Data Aggregator",
            prompt="Combine and analyze these processed fields: {all_results}",
            agent_id="aggregator",
        )
    )

    # Fan every field processor into the aggregator.
    for processor_id in processor_ids:
        workflow.connect(processor_id, aggregator_id)
    return workflow
def create_text_processing_node(field_name):
    """Return an agent node that processes the text field ``field_name``.

    The prompt ends with a literal ``{<field_name>_input}`` placeholder.
    """
    node_name = f"{field_name} Text Processor"
    node_prompt = f"Process {field_name} text field: {{{field_name}_input}}"
    node_agent_id = f"{field_name}_text_processor"
    return graphbit.Node.agent(
        name=node_name,
        prompt=node_prompt,
        agent_id=node_agent_id,
    )
def create_numerical_processing_node(field_name):
    """Return an agent node that analyzes the numerical field ``field_name``.

    The prompt ends with a literal ``{<field_name>_input}`` placeholder.
    """
    node_name = f"{field_name} Numerical Processor"
    node_prompt = f"Analyze {field_name} numerical data: {{{field_name}_input}}"
    node_agent_id = f"{field_name}_num_processor"
    return graphbit.Node.agent(
        name=node_name,
        prompt=node_prompt,
        agent_id=node_agent_id,
    )
def create_date_processing_node(field_name):
    """Return an agent node that analyzes the date field ``field_name``.

    The prompt ends with a literal ``{<field_name>_input}`` placeholder.
    """
    node_name = f"{field_name} Date Processor"
    node_prompt = f"Analyze {field_name} date patterns: {{{field_name}_input}}"
    node_agent_id = f"{field_name}_date_processor"
    return graphbit.Node.agent(
        name=node_name,
        prompt=node_prompt,
        agent_id=node_agent_id,
    )
def create_generic_processing_node(field_name):
    """Return a catch-all agent node for the field ``field_name``.

    The prompt ends with a literal ``{<field_name>_input}`` placeholder.
    """
    node_name = f"{field_name} Generic Processor"
    node_prompt = f"Process {field_name} field: {{{field_name}_input}}"
    node_agent_id = f"{field_name}_generic_processor"
    return graphbit.Node.agent(
        name=node_name,
        prompt=node_prompt,
        agent_id=node_agent_id,
    )
### Conditional Workflow Generation
def create_conditional_workflow(requirements):
    """Create a workflow with one conditional branch per requirement.

    Args:
        requirements: Iterable of mappings. Each may carry a "type"
            ("quality_check", "complexity_check" or "priority_check";
            anything else selects the default branch) and a "value" that
            is passed to the matching branch factory.

    Returns:
        A ``graphbit.Workflow`` in which an "Input Processor" node feeds
        every branch. For a branch with a condition node the layout is
        input -> condition -> each branch node; for a branch without one
        it is input -> each branch node.
    """
    workflow = graphbit.Workflow("Conditional Workflow")

    input_id = workflow.add_node(
        graphbit.Node.agent(
            name="Input Processor",
            prompt="Process and analyze input: {input}",
            agent_id="input_processor",
        )
    )

    for requirement in requirements:
        condition_type = requirement.get("type")
        condition_value = requirement.get("value")

        if condition_type == "quality_check":
            branch = create_quality_branch(condition_value)
        elif condition_type == "complexity_check":
            branch = create_complexity_branch(condition_value)
        elif condition_type == "priority_check":
            branch = create_priority_branch(condition_value)
        else:
            branch = create_default_branch()

        condition = branch["condition"]
        if condition is not None:
            # Register and wire the gate exactly once per branch. Handling it
            # per branch node would add the same condition node repeatedly
            # and duplicate the input -> condition edge.
            upstream_id = workflow.add_node(condition)
            workflow.connect(input_id, upstream_id)
        else:
            upstream_id = input_id

        # Every node of the branch hangs off the gate (or the input directly).
        for node in branch["nodes"]:
            workflow.connect(upstream_id, workflow.add_node(node))

    return workflow
def create_quality_branch(threshold):
    """Return the quality branch: a gate on ``quality_score > threshold`` plus two processors.

    The result is a dict with the condition node under "condition" and the
    high- and low-quality agent nodes (in that order) under "nodes".
    """
    gate = graphbit.Node.condition(
        name="Quality Gate",
        expression=f"quality_score > {threshold}",
    )
    processors = [
        graphbit.Node.agent(
            name="High Quality Processor",
            prompt="Process high-quality input: {input}",
            agent_id="high_quality_proc",
        ),
        graphbit.Node.agent(
            name="Low Quality Processor",
            prompt="Enhance and process low-quality input: {input}",
            agent_id="low_quality_proc",
        ),
    ]
    return {"condition": gate, "nodes": processors}
def create_complexity_branch(complexity_level):
    """Return the complexity branch: a gate on ``complexity_level <= <value>`` plus two processors.

    The result is a dict with the condition node under "condition" and the
    simple and complex agent nodes (in that order) under "nodes".
    """
    gate = graphbit.Node.condition(
        name="Complexity Check",
        expression=f"complexity_level <= {complexity_level}",
    )
    processors = [
        graphbit.Node.agent(
            name="Simple Processor",
            prompt="Quick processing for simple input: {input}",
            agent_id="simple_proc",
        ),
        graphbit.Node.agent(
            name="Complex Processor",
            prompt="Detailed processing for complex input: {input}",
            agent_id="complex_proc",
        ),
    ]
    return {"condition": gate, "nodes": processors}
def create_priority_branch(priority_level):
    """Return the priority branch: a gate on ``priority >= priority_level`` plus two processors.

    The result is a dict with the condition node under "condition" and the
    urgent and standard agent nodes (in that order) under "nodes".
    """
    gate = graphbit.Node.condition(
        name="Priority Check",
        expression=f"priority >= {priority_level}",
    )
    processors = [
        graphbit.Node.agent(
            name="Urgent Processor",
            prompt="Fast processing for urgent input: {input}",
            agent_id="urgent_proc",
        ),
        graphbit.Node.agent(
            name="Standard Processor",
            prompt="Standard processing: {input}",
            agent_id="standard_proc",
        ),
    ]
    return {"condition": gate, "nodes": processors}
def create_default_branch():
    """Return the unconditional branch: no condition node and one default processor."""
    fallback_processor = graphbit.Node.agent(
        name="Default Processor",
        prompt="Default processing: {input}",
        agent_id="default_proc",
    )
    # "condition" is None so callers can tell there is no gate to wire.
    return {"condition": None, "nodes": [fallback_processor]}
## Runtime Workflow Modification
### Dynamic Node Addition
class DynamicWorkflowBuilder:
    """Incrementally assemble a workflow from named processing stages.

    Stages are registered by name in ``node_registry`` so that they can be
    connected or used as branch targets later.
    """

    def __init__(self, base_workflow=None):
        # Reuse the supplied workflow when it is truthy; otherwise start fresh.
        self.workflow = (
            base_workflow if base_workflow else graphbit.Workflow("Dynamic Workflow")
        )
        # Maps stage name -> node id returned by Workflow.add_node().
        self.node_registry = {}

    def add_processing_stage(self, stage_type, stage_config):
        """Create a node for ``stage_type``, add it, and register it by name.

        The registry key is ``stage_config["name"]`` or, when absent,
        ``node_<current registry size>``. Returns the new node id.
        """
        stage_node = self._build_stage_node(stage_type, stage_config)
        new_id = self.workflow.add_node(stage_node)
        registry_key = stage_config.get("name", f"node_{len(self.node_registry)}")
        self.node_registry[registry_key] = new_id
        return new_id

    def _build_stage_node(self, stage_type, config):
        """Pick the node factory for ``stage_type``; unknown types get a generic node."""
        if stage_type == "validation":
            return self._create_validation_node(config)
        if stage_type == "transformation":
            return self._create_transformation_node(config)
        if stage_type == "analysis":
            return self._create_analysis_node(config)
        if stage_type == "aggregation":
            return self._create_aggregation_node(config)
        return self._create_generic_node(config)

    def connect_stages(self, source_stage, target_stage):
        """Connect two registered stages; return False if either is unknown."""
        source_id = self.node_registry.get(source_stage)
        target_id = self.node_registry.get(target_stage)
        if not (source_id and target_id):
            return False
        self.workflow.connect(source_id, target_id)
        return True

    def add_conditional_branch(self, condition_expr, true_stage, false_stage):
        """Add a condition node and link it to whichever target stages are registered.

        Targets that are not in the registry are skipped. Returns the id of
        the condition node.
        """
        condition_id = self.workflow.add_node(
            graphbit.Node.condition(
                name="Dynamic Condition",
                expression=condition_expr,
            )
        )
        # True branch first, then false branch.
        for stage_name in (true_stage, false_stage):
            if stage_name in self.node_registry:
                self.workflow.connect(condition_id, self.node_registry[stage_name])
        return condition_id

    def _create_validation_node(self, config):
        """Return an agent node that validates input against ``config["rules"]``."""
        node_name = config.get("name", "Validator")
        node_agent_id = config.get("agent_id", "validator")
        return graphbit.Node.agent(
            name=node_name,
            prompt=f"Validate input according to rules: {config.get('rules', 'standard validation')} - Input: {{input}}",
            agent_id=node_agent_id,
        )

    def _create_transformation_node(self, config):
        """Return a transform node; the transformation defaults to "uppercase"."""
        return graphbit.Node.transform(
            name=config.get("name", "Transformer"),
            transformation=config.get("transformation", "uppercase"),
        )

    def _create_analysis_node(self, config):
        """Return an agent node that analyzes input for ``config["analysis_type"]``."""
        node_name = config.get("name", "Analyzer")
        node_agent_id = config.get("agent_id", "analyzer")
        return graphbit.Node.agent(
            name=node_name,
            prompt=f"Analyze input for: {config.get('analysis_type', 'general analysis')} - Input: {{input}}",
            agent_id=node_agent_id,
        )

    def _create_aggregation_node(self, config):
        """Return an agent node that aggregates inputs via ``config["aggregation_method"]``."""
        node_name = config.get("name", "Aggregator")
        node_agent_id = config.get("agent_id", "aggregator")
        return graphbit.Node.agent(
            name=node_name,
            prompt=f"Aggregate multiple inputs: {config.get('aggregation_method', 'combine all')} - Inputs: {{inputs}}",
            agent_id=node_agent_id,
        )

    def _create_generic_node(self, config):
        """Return an agent node built directly from ``config`` with generic defaults."""
        return graphbit.Node.agent(
            name=config.get("name", "Generic Node"),
            prompt=config.get("prompt", "Process input: {input}"),
            agent_id=config.get("agent_id", "generic"),
        )

    def get_workflow(self):
        """Return the workflow assembled so far."""
        return self.workflow
def create_dynamic_pipeline(pipeline_config):
    """Assemble a workflow from a declarative pipeline description.

    ``pipeline_config`` may contain three optional lists, processed in
    this order: "stages" (items with "type" and "config"), "connections"
    (items with "source" and "target") and "branches" (items with
    "condition", "true_stage" and "false_stage").
    """
    builder = DynamicWorkflowBuilder()

    for stage_spec in pipeline_config.get("stages", []):
        builder.add_processing_stage(stage_spec["type"], stage_spec["config"])

    for link in pipeline_config.get("connections", []):
        builder.connect_stages(link["source"], link["target"])

    for branch_spec in pipeline_config.get("branches", []):
        builder.add_conditional_branch(
            branch_spec["condition"],
            branch_spec["true_stage"],
            branch_spec["false_stage"],
        )

    return builder.get_workflow()
# Example usage
def example_dynamic_pipeline():
    """Build a sample four-stage linear pipeline: validate, analyze, transform, aggregate."""
    stages = [
        {
            "type": "validation",
            "config": {
                "name": "input_validator",
                "rules": "check data completeness and format",
                "agent_id": "validator",
            },
        },
        {
            "type": "analysis",
            "config": {
                "name": "content_analyzer",
                "analysis_type": "content quality and relevance",
                "agent_id": "content_analyzer",
            },
        },
        {
            "type": "transformation",
            "config": {
                "name": "data_transformer",
                "transformation": "uppercase",
            },
        },
        {
            "type": "aggregation",
            "config": {
                "name": "result_aggregator",
                "aggregation_method": "combine analysis and transformation results",
                "agent_id": "aggregator",
            },
        },
    ]

    # The pipeline is a straight chain, so each stage links to the next one.
    stage_names = [stage["config"]["name"] for stage in stages]
    connections = [
        {"source": upstream, "target": downstream}
        for upstream, downstream in zip(stage_names, stage_names[1:])
    ]

    pipeline_config = {
        "stages": stages,
        "connections": connections,
        "branches": [],
    }
    return create_dynamic_pipeline(pipeline_config)
## Adaptive Workflow Patterns
### Self-Optimizing Workflows
class AdaptiveWorkflow:
    """Workflow wrapper that records execution metrics and applies optimization rules.

    After every run the rolling metrics are recomputed and each
    optimization rule whose condition holds is applied. A rule is applied
    at most once, so a condition that stays true does not keep adding
    nodes to the workflow on every execution.
    """

    # Number of most recent executions used for the rolling metrics.
    _METRICS_WINDOW = 10

    def __init__(self, name):
        self.name = name
        self.workflow = graphbit.Workflow(name)
        # One dict per run, appended by execute_and_adapt().
        self.execution_history = []
        # Rolling metrics, rebuilt by _update_performance_metrics().
        self.performance_metrics = {}
        # Dicts with "condition", "action" and an "applied" flag.
        self.optimization_rules = []

    def add_optimization_rule(self, condition, action):
        """Register a rule that runs ``action`` once ``condition`` first holds.

        Args:
            condition: Mapping with "type" of "performance_threshold"
                (plus "metric", "operator", "threshold") or
                "execution_count" (plus "count").
            action: Mapping with "type" of "add_caching_layer",
                "add_parallel_processing" or "optimize_prompts".
        """
        self.optimization_rules.append({
            "condition": condition,
            "action": action,
            "applied": False,
        })

    def execute_and_adapt(self, executor, input_data):
        """Execute the workflow, record metrics, and apply due optimizations.

        Args:
            executor: Object exposing ``execute(workflow)``.
            input_data: Used only to record the "input_size" metric; it is
                not forwarded to ``executor.execute``.

        Returns:
            The result object returned by ``executor.execute``.
        """
        import time

        # perf_counter() is monotonic, so the measured duration cannot go
        # negative or jump when the wall clock is adjusted.
        started = time.perf_counter()
        result = executor.execute(self.workflow)
        execution_time = (time.perf_counter() - started) * 1000

        completed = result.is_completed()
        self.execution_history.append({
            "timestamp": time.time(),
            "execution_time_ms": execution_time,
            "success": completed,
            "input_size": len(str(input_data)) if input_data else 0,
            "output_size": len(result.output()) if completed else 0,
        })

        self._update_performance_metrics()
        self._apply_optimizations()
        return result

    def _update_performance_metrics(self):
        """Recompute rolling metrics over the most recent executions.

        "throughput" is executions per second between the first and last
        record in the window. It is 0 when the window holds a single record
        or when its timestamps do not advance, which avoids dividing by zero.
        """
        if not self.execution_history:
            return

        recent = self.execution_history[-self._METRICS_WINDOW:]
        sample_count = len(recent)
        window_seconds = recent[-1]["timestamp"] - recent[0]["timestamp"]

        self.performance_metrics = {
            "average_execution_time": sum(e["execution_time_ms"] for e in recent) / sample_count,
            "success_rate": sum(1 for e in recent if e["success"]) / sample_count,
            "total_executions": len(self.execution_history),
            "throughput": sample_count / window_seconds if window_seconds > 0 else 0,
        }

    def _apply_optimizations(self):
        """Apply every not-yet-applied rule whose condition currently holds."""
        for rule in self.optimization_rules:
            # .get() tolerates rule dicts appended without the "applied" key.
            if rule.get("applied"):
                continue
            if self._evaluate_condition(rule["condition"]):
                self._execute_action(rule["action"])
                rule["applied"] = True

    def _evaluate_condition(self, condition):
        """Return True when ``condition`` holds for the current metrics.

        Unknown condition types evaluate to False. A metric that has not
        been computed yet is treated as 0.
        """
        metrics = self.performance_metrics
        condition_type = condition["type"]

        if condition_type == "performance_threshold":
            metric_value = metrics.get(condition["metric"], 0)
            return self._compare_values(metric_value, condition["operator"], condition["threshold"])
        if condition_type == "execution_count":
            return metrics.get("total_executions", 0) >= condition["count"]
        return False

    def _compare_values(self, value, operator, threshold):
        """Compare ``value`` with ``threshold``; unknown operators yield False."""
        if operator == ">":
            return value > threshold
        if operator == "<":
            return value < threshold
        if operator == ">=":
            return value >= threshold
        if operator == "<=":
            return value <= threshold
        if operator == "==":
            return value == threshold
        return False

    def _execute_action(self, action):
        """Dispatch an optimization action; unknown action types are ignored."""
        action_type = action["type"]
        if action_type == "add_caching_layer":
            self._add_caching_layer()
        elif action_type == "add_parallel_processing":
            self._add_parallel_processing()
        elif action_type == "optimize_prompts":
            self._optimize_prompts()

    def _add_caching_layer(self):
        """Add a "Cache Manager" agent node to the workflow."""
        cache_node = graphbit.Node.agent(
            name="Cache Manager",
            prompt="Check cache for input: {input}. If found, return cached result, otherwise process normally.",
            agent_id="cache_manager"
        )
        # NOTE: the node is only registered here; it is not connected to any
        # existing node, so it is not yet placed in front of the workflow.
        self.workflow.add_node(cache_node)
        print(f"Added caching layer to workflow {self.name}")

    def _add_parallel_processing(self):
        """Add a "Parallel Processor" agent node to the workflow."""
        parallel_processor = graphbit.Node.agent(
            name="Parallel Processor",
            prompt="Process input in parallel: {input}",
            agent_id="parallel_proc"
        )
        # NOTE: as with the caching layer, the node is added but not connected.
        self.workflow.add_node(parallel_processor)
        print(f"Added parallel processing to workflow {self.name}")

    def _optimize_prompts(self):
        """Placeholder: only reports the optimization; no node is modified."""
        print(f"Optimized prompts for workflow {self.name}")
def create_adaptive_text_processor():
    """Return an ``AdaptiveWorkflow`` for text processing with two optimization rules.

    The workflow starts with a single "Text Processor" node. The rules add a
    caching layer when the average execution time exceeds 5000 ms, and
    optimize prompts once 10 executions have been recorded.
    """
    adaptive_workflow = AdaptiveWorkflow("Adaptive Text Processor")

    # Seed the workflow with its one initial node.
    adaptive_workflow.workflow.add_node(
        graphbit.Node.agent(
            name="Text Processor",
            prompt="Process and analyze this text: {input}",
            agent_id="text_proc",
        )
    )

    rules = (
        (
            {
                "type": "performance_threshold",
                "metric": "average_execution_time",
                "operator": ">",
                "threshold": 5000,  # 5 seconds
            },
            {"type": "add_caching_layer"},
        ),
        (
            {"type": "execution_count", "count": 10},
            {"type": "optimize_prompts"},
        ),
    )
    for rule_condition, rule_action in rules:
        adaptive_workflow.add_optimization_rule(
            condition=rule_condition,
            action=rule_action,
        )

    return adaptive_workflow
## Dynamic Workflow Templates
### Template-Based Generation
class WorkflowTemplate:
    """Reusable workflow blueprint with ``${param}`` placeholders.

    A template structure is a mapping with a "nodes" list (each node has an
    "id", a "type" and type-specific fields) and a "connections" list (each
    with "source" and "target" node ids). ``instantiate`` replaces every
    ``${param}`` placeholder in the nodes' string fields with the supplied
    parameter values.
    """

    def __init__(self, template_name):
        self.template_name = template_name
        # Populated by define_template().
        self.template_structure = {}
        self.parameter_mappings = {}

    def define_template(self, structure, parameter_mappings):
        """Store the template structure and the description of its parameters."""
        self.template_structure = structure
        self.parameter_mappings = parameter_mappings

    def instantiate(self, parameters):
        """Create a workflow instance from the template.

        Args:
            parameters: Mapping of placeholder name to value. The optional
                "instance_id" entry is appended to the workflow name
                ("default" when absent).

        Returns:
            A new ``graphbit.Workflow``. Connections that reference a node
            id not present in the template are skipped.

        Raises:
            KeyError: If a node has no "id" or a connection lacks "source"
                or "target".
        """
        workflow = graphbit.Workflow(f"{self.template_name}_{parameters.get('instance_id', 'default')}")
        node_map = {}

        for node_config in self.template_structure.get("nodes", []):
            node = self._create_node_from_template(node_config, parameters)
            node_map[node_config["id"]] = workflow.add_node(node)

        for connection in self.template_structure.get("connections", []):
            source_id = node_map.get(connection["source"])
            target_id = node_map.get(connection["target"])
            # Test for presence explicitly rather than relying on the
            # truthiness of whatever id type add_node() returns.
            if source_id is not None and target_id is not None:
                workflow.connect(source_id, target_id)

        return workflow

    @staticmethod
    def _substitute(text, parameters):
        """Replace each ``${name}`` in ``text`` with ``str(parameters[name])``.

        Non-string values are returned unchanged. Placeholders with no
        matching parameter are left as they are.
        """
        if not isinstance(text, str):
            return text
        for param, value in parameters.items():
            text = text.replace(f"${{{param}}}", str(value))
        return text

    def _create_node_from_template(self, node_config, parameters):
        """Create a node from one template entry, resolving its placeholders.

        Placeholders are resolved in every string field passed to the node
        (name, prompt, agent_id, expression, transformation). Resolving them
        only in the prompt and expression would leave literal ``${...}`` text
        in node names and transformations.
        """
        node_type = node_config.get("type")

        if node_type == "agent":
            return graphbit.Node.agent(
                name=self._substitute(node_config.get("name", "Agent"), parameters),
                prompt=self._substitute(node_config.get("prompt", ""), parameters),
                agent_id=self._substitute(node_config.get("agent_id", "agent"), parameters),
            )

        if node_type == "transform":
            return graphbit.Node.transform(
                name=self._substitute(node_config.get("name", "Transform"), parameters),
                transformation=self._substitute(
                    node_config.get("transformation", "uppercase"), parameters
                ),
            )

        if node_type == "condition":
            return graphbit.Node.condition(
                name=self._substitute(node_config.get("name", "Condition"), parameters),
                expression=self._substitute(node_config.get("expression", "true"), parameters),
            )

        # Unknown or missing type: fall back to a generic agent node.
        return graphbit.Node.agent(
            name="Default Agent",
            prompt="Process input: {input}",
            agent_id="default"
        )
def create_data_processing_template():
    """Return a ``WorkflowTemplate`` for a validate -> process -> quality gate -> format pipeline.

    The template is parameterised by ``domain``, ``validation_rules``,
    ``processing_method``, ``quality_threshold`` and ``output_format``.
    """
    nodes = [
        {
            "id": "validator",
            "type": "agent",
            "name": "${domain} Data Validator",
            "prompt": "Validate ${domain} data according to ${validation_rules}: {input}",
            "agent_id": "validator",
        },
        {
            "id": "processor",
            "type": "agent",
            "name": "${domain} Processor",
            "prompt": "Process ${domain} data using ${processing_method}: {validated_data}",
            "agent_id": "processor",
        },
        {
            "id": "quality_check",
            "type": "condition",
            "name": "Quality Gate",
            "expression": "quality_score >= ${quality_threshold}",
        },
        {
            "id": "formatter",
            "type": "transform",
            "name": "Output Formatter",
            "transformation": "${output_format}",
        },
    ]

    # The nodes form a straight chain, so each one feeds the next.
    chain = [node["id"] for node in nodes]
    connections = [
        {"source": upstream, "target": downstream}
        for upstream, downstream in zip(chain, chain[1:])
    ]

    # Human-readable description of every placeholder used above.
    parameter_mappings = {
        "domain": "Application domain (e.g., financial, medical, scientific)",
        "validation_rules": "Specific validation rules for the domain",
        "processing_method": "Method used for processing data",
        "quality_threshold": "Minimum quality score threshold",
        "output_format": "Format for output transformation",
    }

    template = WorkflowTemplate("Data Processing Template")
    template.define_template(
        {"nodes": nodes, "connections": connections},
        parameter_mappings,
    )
    return template
def create_workflows_from_template():
    """Instantiate the data-processing template for the financial and medical domains.

    Returns a dict mapping "financial" and "medical" to their workflows.
    """
    template = create_data_processing_template()

    # One parameter set per workflow, keyed by the name used in the result.
    parameter_sets = {
        "financial": {
            "instance_id": "financial",
            "domain": "financial",
            "validation_rules": "GAAP compliance and data integrity checks",
            "processing_method": "financial analysis algorithms",
            "quality_threshold": "0.95",
            "output_format": "uppercase",
        },
        "medical": {
            "instance_id": "medical",
            "domain": "medical",
            "validation_rules": "HIPAA compliance and medical data standards",
            "processing_method": "clinical analysis procedures",
            "quality_threshold": "0.98",
            "output_format": "lowercase",
        },
    }

    return {
        key: template.instantiate(parameters)
        for key, parameters in parameter_sets.items()
    }
## Configuration-Driven Workflows
### JSON-Based Workflow Definition
import json
def create_workflow_from_json(json_config):
    """Create a workflow from a JSON document or an already-parsed mapping.

    Args:
        json_config: JSON text as ``str``, ``bytes`` or ``bytearray`` (for
            example the raw content of a file opened in binary mode), or a
            mapping that has already been parsed. The configuration may
            contain "name", a "nodes" list (each node with an "id") and a
            "connections" list (each with "source" and "target").

    Returns:
        A ``graphbit.Workflow`` named after ``config["name"]`` ("JSON
        Workflow" when absent). Connections that reference an unknown node
        id are skipped.

    Raises:
        json.JSONDecodeError: If the JSON text cannot be parsed.
        KeyError: If a node has no "id" or a connection lacks "source" or
            "target".
    """
    # json.loads() accepts bytes and bytearray as well as str, so parse all
    # three here; anything else is taken to be an already-parsed config.
    if isinstance(json_config, (str, bytes, bytearray)):
        config = json.loads(json_config)
    else:
        config = json_config

    workflow = graphbit.Workflow(config.get("name", "JSON Workflow"))
    node_map = {}

    for node_config in config.get("nodes", []):
        node = _create_node_from_json(node_config)
        node_map[node_config["id"]] = workflow.add_node(node)

    for connection in config.get("connections", []):
        source_id = node_map.get(connection["source"])
        target_id = node_map.get(connection["target"])
        # Test for presence explicitly rather than relying on the truthiness
        # of whatever id type add_node() returns.
        if source_id is not None and target_id is not None:
            workflow.connect(source_id, target_id)

    return workflow
def _create_node_from_json(node_config):
    """Build a node from one entry of a JSON workflow configuration.

    The "type" entry selects an agent, transform or condition node; missing
    fields fall back to per-type defaults. Any other type yields a generic
    "Default Agent" node.
    """
    kind = node_config.get("type")

    if kind == "agent":
        agent_name = node_config.get("name", "Agent")
        agent_prompt = node_config.get("prompt", "Process input: {input}")
        agent_identifier = node_config.get("agent_id", "agent")
        return graphbit.Node.agent(
            name=agent_name,
            prompt=agent_prompt,
            agent_id=agent_identifier,
        )

    if kind == "transform":
        transform_name = node_config.get("name", "Transform")
        transform_kind = node_config.get("transformation", "uppercase")
        return graphbit.Node.transform(
            name=transform_name,
            transformation=transform_kind,
        )

    if kind == "condition":
        condition_name = node_config.get("name", "Condition")
        condition_expression = node_config.get("expression", "true")
        return graphbit.Node.condition(
            name=condition_name,
            expression=condition_expression,
        )

    # Unrecognised or missing type.
    return graphbit.Node.agent(
        name="Default Agent",
        prompt="Process input: {input}",
        agent_id="default",
    )
# Example JSON configurations
def get_example_workflow_configs():
    """Return two sample workflow configurations keyed "simple" and "complex".

    "simple" is an analyzer feeding a formatter. "complex" routes processed
    input through a quality gate into two alternative processors whose
    results meet in an aggregator.
    """

    def agent(node_id, name, prompt, agent_id):
        # Key order matches the hand-written literal form of an agent entry.
        return {
            "id": node_id,
            "type": "agent",
            "name": name,
            "prompt": prompt,
            "agent_id": agent_id,
        }

    def edge(source, target):
        return {"source": source, "target": target}

    simple_config = {
        "name": "Simple Analysis Workflow",
        "nodes": [
            agent("analyzer", "Data Analyzer", "Analyze this data: {input}", "analyzer"),
            {
                "id": "formatter",
                "type": "transform",
                "name": "Output Formatter",
                "transformation": "uppercase",
            },
        ],
        "connections": [edge("analyzer", "formatter")],
    }

    complex_config = {
        "name": "Complex Processing Workflow",
        "nodes": [
            agent(
                "input_processor",
                "Input Processor",
                "Process and prepare input: {input}",
                "input_proc",
            ),
            {
                "id": "quality_check",
                "type": "condition",
                "name": "Quality Gate",
                "expression": "quality_score > 0.8",
            },
            agent(
                "high_quality_processor",
                "High Quality Processor",
                "Process high-quality data: {processed_input}",
                "hq_proc",
            ),
            agent(
                "enhancement_processor",
                "Enhancement Processor",
                "Enhance and process lower-quality data: {processed_input}",
                "enhancement_proc",
            ),
            agent(
                "aggregator",
                "Result Aggregator",
                "Combine processing results: {results}",
                "aggregator",
            ),
        ],
        "connections": [
            edge("input_processor", "quality_check"),
            edge("quality_check", "high_quality_processor"),
            edge("quality_check", "enhancement_processor"),
            edge("high_quality_processor", "aggregator"),
            edge("enhancement_processor", "aggregator"),
        ],
    }

    return {"simple": simple_config, "complex": complex_config}
## Best Practices
### 1. Dynamic Workflow Design Principles
def get_dynamic_workflow_best_practices():
    """Print and return the best practices for dynamic workflow creation.

    Each practice is printed on its own line as "✅ <Title-cased key>:
    <description>". The returned dict maps practice key to description.
    """
    entries = [
        ("modularity", "Design workflows with modular, reusable components"),
        ("parameterization", "Use parameters and templates for flexibility"),
        ("validation", "Always validate dynamically created workflows"),
        ("performance", "Monitor and optimize dynamic workflow performance"),
        ("maintainability", "Keep dynamic generation logic simple and readable"),
        ("error_handling", "Implement robust error handling for dynamic creation"),
        ("testing", "Thoroughly test dynamic workflows with various inputs"),
    ]
    best_practices = dict(entries)

    for practice in best_practices:
        print(f"✅ {practice.title()}: {best_practices[practice]}")

    return best_practices
### 2. Error Handling and Validation
def validate_dynamic_workflow(workflow):
    """Return True if ``workflow.validate()`` succeeds, False if it raises.

    The outcome is also reported on stdout. Any exception raised by the
    validation call is caught and reported rather than propagated.
    """
    try:
        workflow.validate()
        print("✅ Dynamic workflow validation passed")
    except Exception as error:
        print(f"❌ Dynamic workflow validation failed: {error}")
        return False
    return True
def safe_dynamic_workflow_creation(creation_func, *args, **kwargs):
    """Call ``creation_func(*args, **kwargs)`` and return a validated workflow.

    If creation raises or the resulting workflow fails validation, the
    error is printed and a single-node "Fallback Workflow" is returned
    instead.
    """
    try:
        candidate = creation_func(*args, **kwargs)
        if not validate_dynamic_workflow(candidate):
            # Route validation failures through the same fallback path.
            raise ValueError("Dynamic workflow validation failed")
        return candidate
    except Exception as error:
        print(f"Error creating dynamic workflow: {error}")

        fallback_workflow = graphbit.Workflow("Fallback Workflow")
        fallback_workflow.add_node(
            graphbit.Node.agent(
                name="Fallback Processor",
                prompt="Process input safely: {input}",
                agent_id="fallback",
            )
        )
        return fallback_workflow
## Usage Examples
### Complete Dynamic Workflow Example
def example_complete_dynamic_workflow():
    """Create, validate and execute a dynamic workflow end to end.

    The OpenAI API key is read from the ``OPENAI_API_KEY`` environment
    variable. Progress and the final output (or error) are printed.
    """
    # Imported locally because the surrounding module does not import ``os``;
    # without it the os.getenv() call below raises NameError.
    import os

    graphbit.init()

    # A dict is classified as "mixed" by detect_data_type().
    input_data = {
        "type": "mixed",
        "content": "Sample text with numerical data: 123, 456",
        "requirements": ["quality_check", "fast_processing"]
    }

    workflow = create_dynamic_workflow(input_data)

    if validate_dynamic_workflow(workflow):
        print("✅ Dynamic workflow created and validated successfully")

        llm_config = graphbit.LlmConfig.openai(
            api_key=os.getenv("OPENAI_API_KEY"),
            model="gpt-4o-mini"
        )
        executor = graphbit.Executor(llm_config)

        result = executor.execute(workflow)

        if result.is_completed():
            print("✅ Dynamic workflow executed successfully")
            print(f"Output: {result.output()}")
        else:
            print(f"❌ Dynamic workflow execution failed: {result.error()}")
    else:
        print("❌ Dynamic workflow validation failed")


if __name__ == "__main__":
    example_complete_dynamic_workflow()
## What's Next
- Learn about Performance optimization for dynamic workflows
- Explore Monitoring for tracking dynamic workflow execution
- Check Validation for comprehensive dynamic workflow testing
- See Workflow Builder for static workflow patterns