Content Generation Pipeline¶
This example demonstrates how to build a sophisticated content generation workflow with GraphBit, featuring research, writing, editing, and quality assurance stages.
Overview¶
We'll create a multi-agent pipeline that: 1. Researches a given topic 2. Writes initial content based on research 3. Edits for clarity and engagement 4. Reviews for quality and accuracy 5. Formats the final output
Complete Example¶
import graphbit
import os
from typing import Optional
class ContentGenerationPipeline:
def __init__(self, api_key: str, model: str = "gpt-4o-mini"):
"""Initialize the content generation pipeline."""
# Initialize GraphBit
graphbit.init(log_level="info", enable_tracing=True)
# Create LLM configuration
self.llm_config = graphbit.LlmConfig.openai(api_key, model)
# Create executor with custom settings
self.executor = graphbit.Executor(
self.llm_config,
timeout_seconds=300, # 5 minutes
debug=True
)
def create_workflow(self, content_type: str = "article") -> graphbit.Workflow:
"""Create the content generation workflow."""
# Create workflow
workflow = graphbit.Workflow("Content Generation Pipeline")
# Stage 1: Research Agent
researcher = graphbit.Node.agent(
name="Research Specialist",
prompt="""Research the topic: {topic}
Please provide:
1. Key facts and statistics
2. Current trends and developments
3. Expert opinions or quotes
4. Relevant examples or case studies
5. Important considerations or nuances
Focus on accuracy and credibility. Cite sources where possible.
Format as structured research notes.
""",
agent_id="researcher"
)
# Stage 2: Content Writer
writer = graphbit.Node.agent(
name="Content Writer",
prompt="""Write a comprehensive {content_type} about: {topic}
Based on this research: {research_data}
Requirements:
- Target length: {target_length} words
- Tone: {tone}
- Audience: {audience}
- Include relevant examples and data from research
- Use engaging headlines and subheadings
- Ensure logical flow and structure
Create compelling, informative content that captures reader attention.
""",
agent_id="writer"
)
# Stage 3: Editor
editor = graphbit.Node.agent(
name="Content Editor",
prompt="""Edit and improve the following {content_type}:
{draft_content}
Focus on:
- Clarity and readability
- Flow and structure
- Engaging language
- Grammar and style
- Consistency in tone
- Compelling headlines
Maintain the core message while making it more engaging and polished.
""",
agent_id="editor"
)
# Stage 4: Quality Reviewer
reviewer = graphbit.Node.agent(
name="Quality Reviewer",
prompt="""Review this {content_type} for quality and accuracy:
{edited_content}
Provide feedback on:
1. Factual accuracy
2. Completeness of coverage
3. Logical flow
4. Audience appropriateness
5. Areas for improvement
Rate overall quality (1-10) and provide specific suggestions.
If quality is 7 or above, mark as APPROVED, otherwise mark as NEEDS_REVISION.
""",
agent_id="reviewer"
)
# Stage 5: Quality Gate (Condition Node)
quality_gate = graphbit.Node.condition(
name="Quality Gate",
expression="quality_rating >= 7"
)
# Stage 6: Final Formatter
formatter = graphbit.Node.agent(
name="Content Formatter",
prompt="""Format this content for publication:
{approved_content}
Apply:
- Professional formatting
- Proper headings hierarchy
- Bullet points where appropriate
- Clear paragraph breaks
- Call-to-action if needed
Output clean, publication-ready content.
""",
agent_id="formatter"
)
# Add nodes to workflow
research_id = workflow.add_node(researcher)
writer_id = workflow.add_node(writer)
editor_id = workflow.add_node(editor)
reviewer_id = workflow.add_node(reviewer)
quality_id = workflow.add_node(quality_gate)
formatter_id = workflow.add_node(formatter)
# Connect the workflow: Research → Write → Edit → Review → Quality Check → Format
workflow.connect(research_id, writer_id)
workflow.connect(writer_id, editor_id)
workflow.connect(editor_id, reviewer_id)
workflow.connect(reviewer_id, quality_id)
workflow.connect(quality_id, formatter_id)
# Validate workflow
workflow.validate()
return workflow
def generate_content(
self,
topic: str,
content_type: str = "article",
target_length: int = 800,
tone: str = "professional",
audience: str = "general"
) -> dict:
"""Generate content using the pipeline."""
print(f"🚀 Starting content generation for: {topic}")
# Create workflow
workflow = self.create_workflow(content_type)
# Execute workflow with input context
result = self.executor.execute(workflow)
if result.is_success():
execution_time = result.execution_time_ms()
print(f"Content generation completed in {execution_time}ms")
return {
"status": "success",
"content": result.get_output(),
"execution_time_ms": execution_time,
"workflow_stats": self.executor.get_stats()
}
else:
error_msg = result.get_error()
print(f"Content generation failed: {error_msg}")
return {
"status": "error",
"error": error_msg,
"workflow_stats": self.executor.get_stats()
}
# Example usage
def main():
"""Run the content generation pipeline."""
# Set up API key (you can also set OPENAI_API_KEY environment variable)
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
print("❌ Please set OPENAI_API_KEY environment variable")
return
# Create pipeline
pipeline = ContentGenerationPipeline(api_key, "gpt-4o-mini")
# Generate content
result = pipeline.generate_content(
topic="Sustainable Energy Solutions",
content_type="blog post",
target_length=1200,
tone="informative yet engaging",
audience="technology enthusiasts"
)
# Display results
if result["status"] == "success":
print("\nGenerated Content:")
print("=" * 60)
print(result["content"])
print("\nPerformance Stats:")
print(f"Execution time: {result['execution_time_ms']}ms")
else:
print(f"\nGeneration failed: {result['error']}")
if __name__ == "__main__":
main()
Alternative: Using Different LLM Providers¶
Using Anthropic Claude¶
import graphbit
import os
def create_anthropic_pipeline():
"""Create pipeline using Anthropic Claude."""
graphbit.init()
# Configure for Anthropic
config = graphbit.LlmConfig.anthropic(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-5-sonnet-20241022"
)
executor = graphbit.Executor(config, debug=True)
# Create simple workflow
workflow = graphbit.Workflow("Anthropic Content Generator")
writer = graphbit.Node.agent(
name="Claude Writer",
prompt="""Write a comprehensive article about: {topic}
Requirements:
- Length: {word_count} words
- Tone: {tone}
- Include practical examples
- Structure with clear headings
Create engaging, well-researched content.
""",
agent_id="claude_writer"
)
workflow.add_node(writer)
workflow.validate()
return executor, workflow
# Usage
executor, workflow = create_anthropic_pipeline()
result = executor.execute(workflow)
Using Local Ollama Models¶
import graphbit
def create_ollama_pipeline():
"""Create pipeline using local Ollama models."""
graphbit.init()
# Configure for Ollama (no API key needed)
config = graphbit.LlmConfig.ollama("llama3.2")
executor = graphbit.Executor(
config,
timeout_seconds=180, # Longer timeout for local inference
debug=True
)
workflow = graphbit.Workflow("Local Content Generator")
writer = graphbit.Node.agent(
name="Llama Writer",
prompt="""Write about: {topic}
Keep it concise but informative.
Focus on practical insights.
""",
agent_id="llama_writer"
)
workflow.add_node(writer)
workflow.validate()
return executor, workflow
# Usage
executor, workflow = create_ollama_pipeline()
result = executor.execute(workflow)
Advanced Features¶
High-Performance Content Generation¶
import graphbit
import os
def create_high_performance_pipeline():
"""Create optimized pipeline for high-throughput content generation."""
graphbit.init()
config = graphbit.LlmConfig.openai(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o-mini" # Faster model for high throughput
)
# Use high-throughput executor
executor = graphbit.Executor.new_high_throughput(
config,
timeout_seconds=60, # Shorter timeout
debug=False # Disable debug for performance
)
return executor
def create_low_latency_pipeline():
"""Create pipeline optimized for low latency."""
graphbit.init()
config = graphbit.LlmConfig.openai(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o-mini"
)
# Use low-latency executor
executor = graphbit.Executor.new_low_latency(
config,
timeout_seconds=30, # Very short timeout
debug=False
)
return executor
def create_memory_optimized_pipeline():
"""Create pipeline optimized for memory usage."""
graphbit.init()
config = graphbit.LlmConfig.openai(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o-mini"
)
# Use memory-optimized executor
executor = graphbit.Executor.new_memory_optimized(
config,
timeout_seconds=120,
debug=False
)
return executor
Async Content Generation¶
import graphbit
import asyncio
import os
async def generate_content_async():
"""Generate content asynchronously."""
graphbit.init()
config = graphbit.LlmConfig.openai(
api_key=os.getenv("OPENAI_API_KEY"),
model="gpt-4o-mini"
)
executor = graphbit.Executor(config)
# Create workflow
workflow = graphbit.Workflow("Async Content Generator")
writer = graphbit.Node.agent(
name="Async Writer",
prompt="Write a brief article about: {topic}",
agent_id="async_writer"
)
workflow.add_node(writer)
workflow.validate()
# Execute asynchronously
result = await executor.run_async(workflow)
if result.is_success():
print("Async generation completed")
return result.get_output()
else:
print(f"Async generation failed: {result.get_error()}")
return None
# Usage
async def main_async():
content = await generate_content_async()
if content:
print(f"Generated: {content}")
# Run async
# asyncio.run(main_async())
System Information and Health Checks¶
import graphbit
def check_system_health():
"""Check GraphBit system health and capabilities."""
graphbit.init()
# Get system information
system_info = graphbit.get_system_info()
print("System Information:")
for key, value in system_info.items():
print(f" {key}: {value}")
# Perform health check
health_status = graphbit.health_check()
print(f"\nHealth Status:")
for key, value in health_status.items():
print(f" {key}: {value}")
# Check version
version = graphbit.version()
print(f"\nGraphBit Version: {version}")
# Usage
check_system_health()
Key Features¶
Content Pipeline Components¶
- Research Agent: Gathers comprehensive information on topics
- Content Writer: Creates initial drafts based on research
- Editor: Improves clarity, flow, and engagement
- Quality Reviewer: Ensures accuracy and completeness
- Formatter: Prepares content for publication
Reliability Features¶
- Multiple LLM Providers: OpenAI, Anthropic, Ollama support
- Execution Modes: High-throughput, low-latency, memory-optimized
- Error Handling: Comprehensive error reporting and recovery
- Performance Monitoring: Built-in execution statistics
- Health Checks: System health and capability monitoring
This example demonstrates GraphBit's capabilities for building production-ready content generation workflows with reliability, performance, and flexibility.