Comprehensive AI Pipeline¶
This example demonstrates a complete AI pipeline combining GraphBit's workflow system, embedding capabilities, and LLM integration for building sophisticated AI applications.
Overview¶
We'll build an intelligent document analysis and recommendation system that:

1. Processes documents with semantic understanding
2. Analyzes content using LLM workflows
3. Embeds documents for similarity search
4. Generates intelligent recommendations
5. Monitors system performance and health
Complete System Implementation¶
```python
import asyncio
import json
import os
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import graphbit

@dataclass
class Document:
    """Document data structure."""

    id: str
    title: str
    content: str
    category: str
    metadata: Dict[str, Any]


@dataclass
class AnalysisResult:
    """Analysis result data structure."""

    document_id: str
    summary: str
    key_topics: List[str]
    sentiment: str
    quality_score: float
    recommendations: List[str]

class IntelligentDocumentPipeline:
    """Comprehensive document analysis and recommendation system."""

    def __init__(self, openai_api_key: str, anthropic_api_key: Optional[str] = None):
        """Initialize the intelligent document pipeline."""
        # Initialize GraphBit
        graphbit.init(log_level="info", enable_tracing=True)

        # Configure multiple LLM providers
        self.llm_configs = {
            'openai': graphbit.LlmConfig.openai(openai_api_key, "gpt-4o-mini"),
            'openai_fast': graphbit.LlmConfig.openai(openai_api_key, "gpt-4o-mini")
        }

        if anthropic_api_key:
            self.llm_configs['anthropic'] = graphbit.LlmConfig.anthropic(
                anthropic_api_key,
                "claude-3-5-sonnet-20241022"
            )

        # Configure embeddings
        self.embedding_config = graphbit.EmbeddingConfig.openai(
            openai_api_key,
            "text-embedding-3-small"
        )
        self.embedding_client = graphbit.EmbeddingClient(self.embedding_config)

        # Create executors for different use cases
        self.executors = {
            'analysis': graphbit.Executor(
                self.llm_configs['openai'],
                timeout_seconds=180,
                debug=True
            ),
            'batch': graphbit.Executor.new_high_throughput(
                self.llm_configs['openai_fast'],
                timeout_seconds=120,
                debug=False
            ),
            'fast': graphbit.Executor.new_low_latency(
                self.llm_configs['openai_fast'],
                timeout_seconds=60,
                debug=False
            )
        }

        # Document storage
        self.documents: List[Document] = []
        self.embeddings: List[List[float]] = []
        self.analysis_results: Dict[str, AnalysisResult] = {}

        # Create workflows
        self.workflows = self.create_workflows()
    def create_workflows(self) -> Dict[str, graphbit.Workflow]:
        """Create all workflow pipelines."""
        workflows = {}

        # 1. Document Analysis Workflow
        workflows['analysis'] = self.create_document_analysis_workflow()

        # 2. Content Enhancement Workflow
        workflows['enhancement'] = self.create_content_enhancement_workflow()

        # 3. Quality Assessment Workflow
        workflows['quality'] = self.create_quality_assessment_workflow()

        # 4. Recommendation Generation Workflow
        workflows['recommendation'] = self.create_recommendation_workflow()

        return workflows
    def create_document_analysis_workflow(self) -> graphbit.Workflow:
        """Create comprehensive document analysis workflow."""
        workflow = graphbit.Workflow("Document Analysis Pipeline")

        # Content Preprocessor
        preprocessor = graphbit.Node.agent(
            name="Content Preprocessor",
            prompt="""Preprocess this document for analysis:

            Title: {title}
            Content: {content}
            Category: {category}

            Tasks:
            1. Extract key information and structure
            2. Identify main topics and themes
            3. Note any special content (data, quotes, references)
            4. Assess content complexity and readability
            5. Identify potential quality issues

            Provide structured preprocessing results.
            """,
            agent_id="preprocessor"
        )

        # Content Analyzer
        analyzer = graphbit.Node.agent(
            name="Content Analyzer",
            prompt="""Analyze this preprocessed document content:

            {preprocessed_content}

            Perform comprehensive analysis:
            1. **Topic Analysis**: Identify and rank key topics (max 5)
            2. **Sentiment Analysis**: Determine overall sentiment (positive/negative/neutral)
            3. **Content Quality**: Rate quality 1-10 based on clarity, depth, accuracy
            4. **Key Insights**: Extract 3-5 main insights or findings
            5. **Content Type**: Classify the content type and purpose

            Format response as JSON with clear sections for each analysis type.
            """,
            agent_id="analyzer"
        )

        # Summary Generator
        summarizer = graphbit.Node.agent(
            name="Summary Generator",
            prompt="""Generate a comprehensive summary based on this analysis:

            Content Analysis: {analysis_results}
            Original Content: {preprocessed_content}

            Create:
            1. **Executive Summary**: 2-3 sentence overview
            2. **Key Points**: Bullet points of main findings
            3. **Topics**: List of main topics with brief descriptions
            4. **Insights**: Notable insights or conclusions
            5. **Context**: How this content fits into its category

            Keep summary informative yet concise.
            """,
            agent_id="summarizer"
        )

        # Connect the pipeline
        prep_id = workflow.add_node(preprocessor)
        analyze_id = workflow.add_node(analyzer)
        summary_id = workflow.add_node(summarizer)

        workflow.connect(prep_id, analyze_id)
        workflow.connect(analyze_id, summary_id)

        workflow.validate()
        return workflow
    def create_content_enhancement_workflow(self) -> graphbit.Workflow:
        """Create content enhancement and optimization workflow."""
        workflow = graphbit.Workflow("Content Enhancement Pipeline")

        # Content Reviewer
        reviewer = graphbit.Node.agent(
            name="Content Reviewer",
            prompt="""Review this content for enhancement opportunities:

            {content}

            Evaluate:
            1. **Clarity**: Is the content clear and well-structured?
            2. **Completeness**: Are there missing elements or gaps?
            3. **Engagement**: How engaging is the content?
            4. **Accuracy**: Are there any factual concerns?
            5. **Optimization**: What improvements could be made?

            Provide specific, actionable recommendations.
            """,
            agent_id="reviewer"
        )

        # Enhancement Suggester
        enhancer = graphbit.Node.agent(
            name="Enhancement Suggester",
            prompt="""Based on this review, suggest specific enhancements:

            Review Results: {review_results}
            Original Content: {content}

            Suggest improvements for:
            1. **Structure**: Better organization or formatting
            2. **Content**: Additional information or clarifications
            3. **Engagement**: Ways to make content more engaging
            4. **SEO**: Search optimization opportunities
            5. **Accessibility**: Improvements for better accessibility

            Prioritize suggestions by impact and feasibility.
            """,
            agent_id="enhancer"
        )

        # Connect pipeline
        review_id = workflow.add_node(reviewer)
        enhance_id = workflow.add_node(enhancer)

        workflow.connect(review_id, enhance_id)

        workflow.validate()
        return workflow
    def create_quality_assessment_workflow(self) -> graphbit.Workflow:
        """Create quality assessment workflow."""
        workflow = graphbit.Workflow("Quality Assessment Pipeline")

        # Quality Assessor
        assessor = graphbit.Node.agent(
            name="Quality Assessor",
            prompt="""Assess the quality of this content comprehensively:

            {content}

            Rate (1-10) and explain:
            1. **Accuracy**: Factual correctness and reliability
            2. **Clarity**: How clear and understandable the content is
            3. **Depth**: Level of detail and thoroughness
            4. **Structure**: Organization and logical flow
            5. **Relevance**: How relevant to its intended purpose
            6. **Originality**: Uniqueness and fresh insights

            Provide overall quality score and detailed feedback.
            """,
            agent_id="assessor"
        )

        # Quality Gate
        quality_gate = graphbit.Node.condition(
            name="Quality Gate",
            expression="overall_quality >= 7"
        )

        # Improvement Recommender
        improver = graphbit.Node.agent(
            name="Improvement Recommender",
            prompt="""Based on this quality assessment, recommend improvements:

            Quality Assessment: {quality_results}

            For content that scored below 7, provide:
            1. **Priority Issues**: Most critical problems to address
            2. **Quick Wins**: Easy improvements with high impact
            3. **Long-term Improvements**: Comprehensive enhancement strategies
            4. **Resources**: Suggest tools or resources for improvement

            Focus on actionable, specific recommendations.
            """,
            agent_id="improver"
        )

        # Connect pipeline
        assess_id = workflow.add_node(assessor)
        gate_id = workflow.add_node(quality_gate)
        improve_id = workflow.add_node(improver)

        workflow.connect(assess_id, gate_id)
        workflow.connect(gate_id, improve_id)

        workflow.validate()
        return workflow
    def create_recommendation_workflow(self) -> graphbit.Workflow:
        """Create intelligent recommendation workflow."""
        workflow = graphbit.Workflow("Recommendation Engine")

        # Context Analyzer
        context_analyzer = graphbit.Node.agent(
            name="Context Analyzer",
            prompt="""Analyze the context for generating recommendations:

            Current Document: {current_document}
            Similar Documents: {similar_documents}
            User Preferences: {user_preferences}
            Document Category: {category}

            Analyze:
            1. **Content Themes**: Common themes across documents
            2. **User Patterns**: What the user seems interested in
            3. **Content Gaps**: Missing information or topics
            4. **Complementary Content**: What would complement this content
            5. **Trending Topics**: Relevant trending or popular topics

            Provide context analysis for recommendation generation.
            """,
            agent_id="context_analyzer"
        )

        # Recommendation Generator
        recommender = graphbit.Node.agent(
            name="Recommendation Generator",
            prompt="""Generate intelligent recommendations based on this context:

            Context Analysis: {context_analysis}

            Generate recommendations for:
            1. **Related Content**: Documents or topics to explore next
            2. **Deep Dive**: Areas for more detailed investigation
            3. **Broad Exploration**: Related but different topics
            4. **Practical Applications**: How to apply this knowledge
            5. **Learning Path**: Suggested sequence for learning more

            Rank recommendations by relevance and provide reasoning.
            """,
            agent_id="recommender"
        )

        # Connect pipeline
        context_id = workflow.add_node(context_analyzer)
        rec_id = workflow.add_node(recommender)

        workflow.connect(context_id, rec_id)

        workflow.validate()
        return workflow
    async def add_document(self, document: Document) -> str:
        """Add a document to the system with full analysis."""
        print(f"🔄 Processing document: {document.title}")

        # Store document
        self.documents.append(document)

        # Generate embedding
        embedding = self.embedding_client.embed(
            f"{document.title}\n\n{document.content}"
        )
        self.embeddings.append(embedding)

        # Run analysis workflow
        analysis_result = await self.analyze_document(document)
        self.analysis_results[document.id] = analysis_result

        print(f"Document processed: {document.id}")
        return document.id

    async def analyze_document(self, document: Document) -> AnalysisResult:
        """Analyze document using the analysis workflow."""
        # Execute analysis workflow
        workflow = self.workflows['analysis']
        executor = self.executors['analysis']

        try:
            result = await executor.run_async(workflow)

            if result.is_success():
                # Parse results (simplified - in practice you'd parse JSON)
                output = result.get_output()

                return AnalysisResult(
                    document_id=document.id,
                    summary=output[:200] + "...",       # Simplified
                    key_topics=["topic1", "topic2"],    # Would parse from output
                    sentiment="neutral",                # Would parse from output
                    quality_score=8.0,                  # Would parse from output
                    recommendations=["rec1", "rec2"]    # Would parse from output
                )
            else:
                raise Exception(f"Analysis failed: {result.get_error()}")

        except Exception as e:
            print(f"Analysis failed for {document.id}: {e}")
            # Return default result
            return AnalysisResult(
                document_id=document.id,
                summary="Analysis failed",
                key_topics=[],
                sentiment="unknown",
                quality_score=0.0,
                recommendations=[]
            )
    def find_similar_documents(self, document_id: str, top_k: int = 5) -> List[Dict]:
        """Find similar documents using embeddings."""
        # Find document index
        doc_index = None
        for i, doc in enumerate(self.documents):
            if doc.id == document_id:
                doc_index = i
                break

        if doc_index is None:
            return []

        query_embedding = self.embeddings[doc_index]
        similarities = []

        for i, (doc, embedding) in enumerate(zip(self.documents, self.embeddings)):
            if i == doc_index:
                continue

            similarity = graphbit.EmbeddingClient.similarity(query_embedding, embedding)
            similarities.append({
                'document_id': doc.id,
                'title': doc.title,
                'similarity': similarity,
                'category': doc.category
            })

        # Sort by similarity
        similarities.sort(key=lambda x: x['similarity'], reverse=True)
        return similarities[:top_k]
    async def generate_recommendations(self, document_id: str, user_preferences: Optional[Dict] = None) -> List[str]:
        """Generate intelligent recommendations for a document."""
        # Find similar documents (in a full implementation these would be passed
        # into the recommendation workflow as context)
        similar_docs = self.find_similar_documents(document_id, top_k=3)

        # Get current document
        current_doc = None
        for doc in self.documents:
            if doc.id == document_id:
                current_doc = doc
                break

        if not current_doc:
            return []

        # Execute recommendation workflow
        workflow = self.workflows['recommendation']
        executor = self.executors['fast']

        try:
            result = await executor.run_async(workflow)

            if result.is_success():
                # Parse recommendations from output
                output = result.get_output()
                # In practice, you'd parse structured JSON output
                return ["Recommendation 1", "Recommendation 2", "Recommendation 3"]
            else:
                return []

        except Exception as e:
            print(f"Recommendation generation failed: {e}")
            return []
    async def batch_process_documents(self, documents: List[Document]) -> Dict[str, AnalysisResult]:
        """Process multiple documents in batch."""
        print(f"Batch processing {len(documents)} documents...")

        # Add all documents first
        for doc in documents:
            self.documents.append(doc)

        # Generate embeddings in batch
        texts = [f"{doc.title}\n\n{doc.content}" for doc in documents]
        embeddings = self.embedding_client.embed_many(texts)
        self.embeddings.extend(embeddings)

        # Process analysis in batches
        batch_results = {}
        batch_size = 5

        for i in range(0, len(documents), batch_size):
            batch = documents[i:i + batch_size]
            print(f"Processing batch {i//batch_size + 1}/{(len(documents) + batch_size - 1)//batch_size}")

            # Process batch concurrently
            tasks = [self.analyze_document(doc) for doc in batch]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            for doc, result in zip(batch, results):
                if isinstance(result, Exception):
                    print(f"Failed to process {doc.id}: {result}")
                else:
                    batch_results[doc.id] = result
                    self.analysis_results[doc.id] = result

        print(f"Batch processing completed: {len(batch_results)}/{len(documents)} successful")
        return batch_results
    def get_system_statistics(self) -> Dict:
        """Get comprehensive system statistics."""
        stats = {
            'documents': {
                'total': len(self.documents),
                'categories': {},
                'average_length': 0
            },
            'analysis': {
                'completed': len(self.analysis_results),
                'average_quality': 0,
                'sentiment_distribution': {'positive': 0, 'negative': 0, 'neutral': 0}
            },
            'embeddings': {
                'total': len(self.embeddings),
                'dimension': len(self.embeddings[0]) if self.embeddings else 0
            },
            'system': graphbit.get_system_info(),
            'health': graphbit.health_check()
        }

        # Calculate document statistics
        if self.documents:
            category_counts = {}
            total_length = 0

            for doc in self.documents:
                category_counts[doc.category] = category_counts.get(doc.category, 0) + 1
                total_length += len(doc.content)

            stats['documents']['categories'] = category_counts
            stats['documents']['average_length'] = total_length / len(self.documents)

        # Calculate analysis statistics
        if self.analysis_results:
            quality_scores = [r.quality_score for r in self.analysis_results.values()]
            stats['analysis']['average_quality'] = sum(quality_scores) / len(quality_scores)

            sentiment_counts = {'positive': 0, 'negative': 0, 'neutral': 0}
            for result in self.analysis_results.values():
                sentiment_counts[result.sentiment] = sentiment_counts.get(result.sentiment, 0) + 1

            stats['analysis']['sentiment_distribution'] = sentiment_counts

        return stats
    def export_analysis_results(self, filepath: str):
        """Export analysis results to JSON file."""
        export_data = {
            'documents': [
                {
                    'id': doc.id,
                    'title': doc.title,
                    'category': doc.category,
                    'metadata': doc.metadata
                }
                for doc in self.documents
            ],
            'analysis_results': {
                doc_id: {
                    'summary': result.summary,
                    'key_topics': result.key_topics,
                    'sentiment': result.sentiment,
                    'quality_score': result.quality_score,
                    'recommendations': result.recommendations
                }
                for doc_id, result in self.analysis_results.items()
            },
            'statistics': self.get_system_statistics(),
            'export_timestamp': time.time()
        }

        with open(filepath, 'w') as f:
            json.dump(export_data, f, indent=2)

        print(f"Analysis results exported to {filepath}")

# Example usage and demonstration
async def main():
    """Demonstrate the comprehensive AI pipeline."""
    # Setup
    openai_key = os.getenv("OPENAI_API_KEY")
    anthropic_key = os.getenv("ANTHROPIC_API_KEY")

    if not openai_key:
        print("OPENAI_API_KEY required")
        return

    print("Initializing Comprehensive AI Pipeline")
    print("=" * 60)

    # Create pipeline
    pipeline = IntelligentDocumentPipeline(openai_key, anthropic_key)

    # Sample documents
    sample_documents = [
        Document(
            id="doc1",
            title="Machine Learning Fundamentals",
            content="""Machine learning is a subset of artificial intelligence that enables computers to learn and improve from experience without being explicitly programmed. It encompasses various algorithms and statistical models that allow systems to automatically learn patterns from data. The field includes supervised learning, where models learn from labeled training data, unsupervised learning, which finds hidden patterns in unlabeled data, and reinforcement learning, where agents learn through interaction with an environment. Key applications include image recognition, natural language processing, recommendation systems, and predictive analytics.""",
            category="technology",
            metadata={"author": "AI Research Team", "difficulty": "beginner", "tags": ["AI", "ML", "fundamentals"]}
        ),
        Document(
            id="doc2",
            title="Sustainable Energy Solutions",
            content="""Renewable energy sources such as solar, wind, hydroelectric, and geothermal power are becoming increasingly crucial for addressing climate change and reducing dependence on fossil fuels. Solar technology has seen dramatic cost reductions, with photovoltaic panels becoming more efficient and affordable. Wind energy has also expanded rapidly, with offshore wind farms providing significant power generation capacity. Energy storage technologies, including advanced battery systems and pumped hydro storage, are essential for managing the intermittent nature of renewable sources. Smart grid technologies enable better integration and distribution of renewable energy across power networks.""",
            category="environment",
            metadata={"author": "Green Energy Council", "difficulty": "intermediate", "tags": ["renewable", "solar", "wind"]}
        ),
        Document(
            id="doc3",
            title="Digital Marketing Strategies",
            content="""Modern digital marketing encompasses a wide range of strategies and channels designed to reach and engage target audiences online. Social media marketing leverages platforms like Facebook, Instagram, LinkedIn, and Twitter to build brand awareness and drive engagement. Search engine optimization (SEO) improves website visibility in search results, while pay-per-click (PPC) advertising provides immediate visibility. Content marketing focuses on creating valuable, relevant content to attract and retain customers. Email marketing remains highly effective for nurturing leads and maintaining customer relationships. Data analytics tools enable marketers to measure campaign performance and optimize strategies for better results.""",
            category="business",
            metadata={"author": "Marketing Institute", "difficulty": "intermediate", "tags": ["marketing", "digital", "SEO"]}
        )
    ]

    # Process documents individually
    print("\nProcessing Documents Individually")
    print("-" * 40)

    for doc in sample_documents[:2]:  # Process first 2 individually
        doc_id = await pipeline.add_document(doc)
        print(f"Processed: {doc.title} (ID: {doc_id})")

    # Batch process remaining documents
    print("\nBatch Processing Remaining Documents")
    print("-" * 40)

    remaining_docs = sample_documents[2:]
    if remaining_docs:
        batch_results = await pipeline.batch_process_documents(remaining_docs)
        print(f"Batch processing completed: {len(batch_results)} documents")

    # Find similar documents
    print("\nFinding Similar Documents")
    print("-" * 40)

    similar_docs = pipeline.find_similar_documents("doc1", top_k=2)
    print(f"Documents similar to '{pipeline.documents[0].title}':")
    for sim_doc in similar_docs:
        print(f"  - {sim_doc['title']} (similarity: {sim_doc['similarity']:.3f})")

    # Generate recommendations
    print("\nGenerating Recommendations")
    print("-" * 40)

    recommendations = await pipeline.generate_recommendations("doc1")
    print(f"Recommendations for '{pipeline.documents[0].title}':")
    for i, rec in enumerate(recommendations, 1):
        print(f"  {i}. {rec}")

    # Display system statistics
    print("\nSystem Statistics")
    print("-" * 40)

    stats = pipeline.get_system_statistics()
    print(f"Documents: {stats['documents']['total']}")
    print(f"Categories: {list(stats['documents']['categories'].keys())}")
    print(f"Analysis completed: {stats['analysis']['completed']}")
    print(f"Average quality score: {stats['analysis']['average_quality']:.2f}")
    print(f"Embeddings generated: {stats['embeddings']['total']}")
    print(f"System health: {'✅' if stats['health']['overall_healthy'] else '❌'}")

    # Export results
    print("\nExporting Results")
    print("-" * 40)

    pipeline.export_analysis_results("analysis_results.json")

    print("\nComprehensive AI Pipeline Demo Completed!")


if __name__ == "__main__":
    asyncio.run(main())
```
Key System Features¶
Comprehensive Integration¶
- Multi-Workflow System: Analysis, enhancement, quality assessment, recommendations
- Multiple LLM Providers: OpenAI, Anthropic support with fallbacks
- Embedding Integration: Semantic search and similarity analysis (see the similarity sketch after this list)
- Performance Optimization: Different executors for different use cases
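
Under the hood, similarity search over embeddings is a cosine comparison between document vectors. If you want to verify scores or work with embeddings outside GraphBit, a plain-Python equivalent might look like the sketch below; it assumes `EmbeddingClient.similarity` used in the pipeline computes the same cosine measure, and the helper names here are illustrative, not part of the GraphBit API.

```python
import math
from typing import List, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity between two embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def rank_by_similarity(query: Sequence[float], candidates: List[Sequence[float]]) -> List[int]:
    """Return candidate indices sorted from most to least similar."""
    scores = [cosine_similarity(query, c) for c in candidates]
    return sorted(range(len(candidates)), key=lambda i: scores[i], reverse=True)
```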
Advanced Capabilities¶
- Batch Processing: Efficient handling of multiple documents (see the concurrency sketch after this list)
- Async Operations: Non-blocking operations for better performance
- Quality Gates: Conditional workflow execution based on quality scores
- Intelligent Recommendations: Context-aware recommendation generation
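
The `batch_process_documents` method above launches each chunk with `asyncio.gather`, so concurrency is bounded only by the chunk size. If you need a hard cap on in-flight LLM calls regardless of chunking, a semaphore-based wrapper is one option. This is a minimal sketch using only standard `asyncio`; `pipeline` and `analyze_document` refer to the class defined earlier.

```python
import asyncio


async def analyze_with_limit(pipeline, documents, max_concurrency: int = 5):
    """Analyze documents concurrently, never exceeding max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def analyze_one(doc):
        async with semaphore:
            return await pipeline.analyze_document(doc)

    results = await asyncio.gather(*(analyze_one(doc) for doc in documents), return_exceptions=True)
    # Keep only successful analyses, keyed by document id
    return {doc.id: res for doc, res in zip(documents, results) if not isinstance(res, Exception)}
```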
Production Features¶
- Error Handling: Comprehensive error management and fallbacks (see the retry sketch after this list)
- Monitoring: System health checks and performance statistics
- Export Capabilities: JSON export of analysis results
- Flexible Configuration: Multiple execution modes and provider support
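
For transient provider errors you may also want retries around workflow execution. The sketch below wraps `executor.run_async` with exponential backoff; it assumes only the `run_async`, `is_success`, and `get_error` calls already used in the pipeline above, and the wrapper itself is a hypothetical helper, not a GraphBit feature.

```python
import asyncio


async def run_with_retries(executor, workflow, max_attempts: int = 3, base_delay: float = 2.0):
    """Retry a workflow execution with exponential backoff on failure."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            result = await executor.run_async(workflow)
            if result.is_success():
                return result
            last_error = result.get_error()
        except Exception as exc:  # provider/network errors surface here
            last_error = exc
        if attempt < max_attempts:
            await asyncio.sleep(base_delay * (2 ** (attempt - 1)))
    raise RuntimeError(f"Workflow failed after {max_attempts} attempts: {last_error}")
```

In the pipeline, `analyze_document` could call a helper like this instead of invoking `executor.run_async` directly.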
Real-World Applications¶
- Document Management: Intelligent document analysis and organization
- Content Platforms: Automated content quality assessment
- Knowledge Management: Semantic search and recommendation systems
- Research Tools: Comprehensive analysis and insight generation
This comprehensive example demonstrates how GraphBit's various components work together to create sophisticated, production-ready AI applications that can handle complex workflows with reliability and performance.