# LLM Integration and Advanced Usage

This example demonstrates comprehensive LLM integration with GraphBit, showcasing multiple providers, execution modes, and advanced features.

## Overview

We'll explore:

1. **Multiple LLM Providers**: OpenAI, Anthropic, DeepSeek, Azure OpenAI, and Ollama
2. **Execution Modes**: Sync, async, batch, and streaming
3. **Performance Optimization**: High-throughput and low-latency configurations, warmup, and batching
4. **Error Handling**: Resilience patterns and fallbacks
5. **Monitoring**: Performance metrics and health checks

## Complete LLM Client Example
```python
from graphbit import init, LlmConfig, LlmClient, health_check, get_system_info
import os
import asyncio
import time


class AdvancedLLMSystem:
    def __init__(self):
        """Initialize the advanced LLM system."""
        # Initialize GraphBit
        init(enable_tracing=True)

        # Store multiple provider clients
        self.clients = {}
        self.initialize_providers()

    def initialize_providers(self):
        """Initialize all available LLM providers."""
        print("Initializing LLM providers...")

        # OpenAI client
        if os.getenv("OPENAI_API_KEY"):
            openai_config = LlmConfig.openai(
                api_key=os.getenv("OPENAI_API_KEY"),
                model="gpt-4o-mini"
            )
            self.clients['openai'] = LlmClient(openai_config, debug=True)
            print("OpenAI client initialized")

        # Anthropic client
        if os.getenv("ANTHROPIC_API_KEY"):
            anthropic_config = LlmConfig.anthropic(
                api_key=os.getenv("ANTHROPIC_API_KEY"),
                model="claude-sonnet-4-20250514"
            )
            self.clients['anthropic'] = LlmClient(anthropic_config, debug=True)
            print("Anthropic client initialized")

        # DeepSeek client
        if os.getenv("DEEPSEEK_API_KEY"):
            deepseek_config = LlmConfig.deepseek(
                api_key=os.getenv("DEEPSEEK_API_KEY"),
                model="deepseek-chat"
            )
            self.clients['deepseek'] = LlmClient(deepseek_config, debug=True)
            print("DeepSeek client initialized")

        # Ollama client (no API key required)
        try:
            ollama_config = LlmConfig.ollama("llama3.2")
            self.clients['ollama'] = LlmClient(ollama_config, debug=True)
            print("Ollama client initialized")
        except Exception as e:
            print(f"Ollama client failed: {e}")

        if not self.clients:
            raise Exception("No LLM providers available. Please set API keys or install Ollama.")

    def test_basic_completion(self, provider: str = 'openai'):
        """Test basic text completion."""
        if provider not in self.clients:
            print(f"Provider '{provider}' not available")
            return None

        client = self.clients[provider]
        prompt = "Explain quantum computing in simple terms."

        print(f"\nTesting basic completion with {provider}...")
        print(f"Prompt: {prompt}")

        try:
            start_time = time.time()
            response = client.complete(
                prompt=prompt,
                max_tokens=200,
                temperature=0.7
            )
            duration = (time.time() - start_time) * 1000

            print(f"Completed in {duration:.2f}ms")
            print(f"Response: {response[:200]}...")
            return response
        except Exception as e:
            print(f"Completion failed: {e}")
            return None

    async def test_async_completion(self, provider: str = 'openai'):
        """Test asynchronous completion."""
        if provider not in self.clients:
            print(f"Provider '{provider}' not available")
            return None

        client = self.clients[provider]
        prompt = "Write a haiku about artificial intelligence."

        print(f"\nTesting async completion with {provider}...")
        print(f"Prompt: {prompt}")

        try:
            start_time = time.time()
            response = await client.complete_async(
                prompt=prompt,
                max_tokens=100,
                temperature=0.8
            )
            duration = (time.time() - start_time) * 1000

            print(f"Async completed in {duration:.2f}ms")
            print(f"Response: {response}")
            return response
        except Exception as e:
            print(f"Async completion failed: {e}")
            return None

    async def test_batch_completion(self, provider: str = 'openai'):
        """Test batch completion for multiple prompts."""
        if provider not in self.clients:
            print(f"Provider '{provider}' not available")
            return None

        client = self.clients[provider]
        prompts = [
            "What is machine learning?",
            "Explain neural networks briefly.",
            "What are the benefits of cloud computing?",
            "How does blockchain work?",
            "What is the future of AI?"
        ]

        print(f"\nTesting batch completion with {provider}...")
        print(f"Processing {len(prompts)} prompts...")

        try:
            start_time = time.time()
            responses = await client.complete_batch(
                prompts=prompts,
                max_tokens=100,
                temperature=0.6,
                max_concurrency=3
            )
            duration = (time.time() - start_time) * 1000

            print(f"Batch completed in {duration:.2f}ms")
            print(f"Average per prompt: {duration/len(prompts):.2f}ms")

            for i, (prompt, response) in enumerate(zip(prompts, responses)):
                print(f"\n{i+1}. {prompt}")
                print(f"   → {response[:100]}...")

            return responses
        except Exception as e:
            print(f"Batch completion failed: {e}")
            return None

    async def test_chat_optimized(self, provider: str = 'openai'):
        """Test optimized chat completion."""
        if provider not in self.clients:
            print(f"Provider '{provider}' not available")
            return None

        client = self.clients[provider]
        messages = [
            ("system", "You are a helpful AI assistant specialized in technology."),
            ("user", "What's the difference between AI and ML?"),
            ("assistant", "AI is the broader concept of machines being able to carry out tasks in a smart way, while ML is a specific subset of AI that involves training algorithms on data."),
            ("user", "Can you give me a practical example?")
        ]

        print(f"\nTesting chat-optimized completion with {provider}...")

        try:
            start_time = time.time()
            response = await client.chat_optimized(
                messages=messages,
                max_tokens=150,
                temperature=0.7
            )
            duration = (time.time() - start_time) * 1000

            print(f"Chat completed in {duration:.2f}ms")
            print(f"Response: {response}")
            return response
        except Exception as e:
            print(f"Chat completion failed: {e}")
            return None

    async def test_streaming_completion(self, provider: str = 'openai'):
        """Test streaming completion."""
        if provider not in self.clients:
            print(f"Provider '{provider}' not available")
            return None

        client = self.clients[provider]
        prompt = "Write a detailed explanation of how machine learning works, covering the key concepts step by step."

        print(f"\nTesting streaming completion with {provider}...")
        print(f"Prompt: {prompt}")
        print("Streaming response:")

        try:
            start_time = time.time()

            # Note: Streaming returns an async iterator
            stream = await client.complete_stream(
                prompt=prompt,
                max_tokens=300,
                temperature=0.7
            )

            full_response = ""
            async for chunk in stream:
                print(chunk, end='', flush=True)
                full_response += chunk

            duration = (time.time() - start_time) * 1000
            print(f"\n\nStreaming completed in {duration:.2f}ms")
            print(f"Total tokens: ~{len(full_response.split())}")
            return full_response
        except Exception as e:
            print(f"Streaming completion failed: {e}")
            return None

    async def test_client_warmup(self, provider: str = 'openai'):
        """Test client warmup for improved performance."""
        if provider not in self.clients:
            print(f"Provider '{provider}' not available")
            return

        client = self.clients[provider]
        print(f"\nTesting client warmup with {provider}...")

        try:
            # Warmup the client (awaited here; calling asyncio.run() from
            # inside an already-running event loop would raise RuntimeError)
            start_time = time.time()
            await client.warmup()
            warmup_duration = (time.time() - start_time) * 1000
            print(f"Warmup completed in {warmup_duration:.2f}ms")

            # Test performance after warmup
            start_time = time.time()
            response = client.complete("Quick test after warmup", max_tokens=50)
            completion_duration = (time.time() - start_time) * 1000
            print(f"Post-warmup completion: {completion_duration:.2f}ms")
        except Exception as e:
            print(f"Warmup failed: {e}")

    def get_client_statistics(self, provider: str = 'openai'):
        """Get detailed client statistics."""
        if provider not in self.clients:
            print(f"Provider '{provider}' not available")
            return None

        client = self.clients[provider]
        print(f"\nGetting statistics for {provider}...")

        try:
            stats = client.get_stats()
            print(f"Client Statistics for {provider}:")
            for key, value in stats.items():
                if isinstance(value, float):
                    print(f"  {key}: {value:.3f}")
                else:
                    print(f"  {key}: {value}")
            return stats
        except Exception as e:
            print(f"Failed to get statistics: {e}")
            return None

    def reset_client_statistics(self, provider: str = 'openai'):
        """Reset client statistics."""
        if provider not in self.clients:
            print(f"Provider '{provider}' not available")
            return

        client = self.clients[provider]
        try:
            client.reset_stats()
            print(f"Statistics reset for {provider}")
        except Exception as e:
            print(f"Failed to reset statistics: {e}")

    def compare_providers(self, prompt: str = "Explain the concept of recursion in programming."):
        """Compare responses from all available providers."""
        print("\nComparing providers...")
        print(f"Prompt: {prompt}")

        results = {}
        for provider_name, client in self.clients.items():
            print(f"\n--- Testing {provider_name} ---")
            try:
                start_time = time.time()
                response = client.complete(
                    prompt=prompt,
                    max_tokens=150,
                    temperature=0.7
                )
                duration = (time.time() - start_time) * 1000

                results[provider_name] = {
                    'response': response,
                    'duration_ms': duration,
                    'success': True
                }
                print(f"{provider_name}: {duration:.2f}ms")
                print(f"Response: {response[:100]}...")
            except Exception as e:
                results[provider_name] = {
                    'error': str(e),
                    'success': False
                }
                print(f"{provider_name}: {e}")

        return results


# Performance-optimized clients
def create_performance_optimized_clients():
    """Create clients optimized for different performance characteristics."""
    init()

    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("OpenAI API key required for performance tests")
        return None

    # High-throughput client
    high_throughput_config = LlmConfig.openai(api_key, "gpt-4o-mini")
    high_throughput_client = LlmClient(high_throughput_config, debug=False)

    # Low-latency client
    low_latency_config = LlmConfig.openai(api_key, "gpt-4o-mini")
    low_latency_client = LlmClient(low_latency_config, debug=False)

    return {
        'high_throughput': high_throughput_client,
        'low_latency': low_latency_client
    }


async def performance_benchmark():
    """Benchmark different client configurations."""
    clients = create_performance_optimized_clients()
    if not clients:
        return

    test_prompt = "Summarize the benefits of renewable energy in one paragraph."

    print("\n🏃 Performance Benchmark")
    print("=" * 50)

    for config_name, client in clients.items():
        print(f"\nTesting {config_name} configuration...")

        # Single completion test
        start_time = time.time()
        try:
            response = client.complete(test_prompt, max_tokens=100)
            single_duration = (time.time() - start_time) * 1000
            print(f"Single completion: {single_duration:.2f}ms")
        except Exception as e:
            print(f"Single completion failed: {e}")
            continue

        # Batch test
        batch_prompts = [test_prompt] * 5
        start_time = time.time()
        try:
            batch_responses = await client.complete_batch(
                batch_prompts,
                max_tokens=100,
                max_concurrency=3
            )
            batch_duration = (time.time() - start_time) * 1000
            avg_per_prompt = batch_duration / len(batch_prompts)
            print(f"Batch completion (5 prompts): {batch_duration:.2f}ms total, {avg_per_prompt:.2f}ms avg")
        except Exception as e:
            print(f"Batch completion failed: {e}")

        # Get statistics
        try:
            stats = client.get_stats()
            print(f"Stats: {stats.get('total_requests', 0)} requests, "
                  f"{stats.get('average_response_time_ms', 0):.2f}ms avg response time")
        except Exception:
            pass  # Statistics are optional; ignore failures here


# Error handling and resilience examples
async def test_error_handling():
    """Test error handling and resilience features."""
    init()

    # Test with invalid API key
    print("\nTesting Error Handling")
    print("=" * 40)

    try:
        invalid_config = LlmConfig.openai("invalid-key", "gpt-4o-mini")
        invalid_client = LlmClient(invalid_config)

        print("Testing with invalid API key...")
        response = invalid_client.complete("Test prompt", max_tokens=50)
        print("Expected error but got response")
    except Exception as e:
        print(f"Correctly handled invalid API key: {type(e).__name__}")

    # Test timeout handling
    if os.getenv("OPENAI_API_KEY"):
        try:
            config = LlmConfig.openai(os.getenv("OPENAI_API_KEY"), "gpt-4o-mini")
            client = LlmClient(config)

            print("\nTesting very long prompt (potential timeout)...")
            very_long_prompt = "Write a comprehensive essay about " + "technology " * 1000
            response = client.complete(very_long_prompt, max_tokens=2000)
            print("Long prompt handled successfully")
        except Exception as e:
            print(f"Timeout/limit handled: {type(e).__name__}")


# System health monitoring
def monitor_llm_system_health():
    """Monitor LLM system health and performance."""
    init()

    print("\nSystem Health Check")
    print("=" * 40)

    # Check GraphBit health
    health = health_check()
    print("GraphBit Health:")
    for key, value in health.items():
        status = "Ok!" if value else "Not Ok!"
        print(f"  {status} {key}: {value}")

    # Get system information
    info = get_system_info()
    print("\nSystem Information:")
    print(f"  Version: {info.get('version', 'unknown')}")
    print(f"  Runtime threads: {info.get('runtime_worker_threads', 'unknown')}")
    print(f"  Memory allocator: {info.get('memory_allocator', 'unknown')}")

    # Test provider connectivity
    print("\nProvider Connectivity:")
    providers_to_test = [
        ('OpenAI', lambda: LlmConfig.openai(os.getenv("OPENAI_API_KEY", "test"), "gpt-4o-mini")),
        ('Azure OpenAI', lambda: LlmConfig.azure_openai(
            os.getenv("AZURE_OPENAI_API_KEY", "test"),
            os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4o"),
            os.getenv("AZURE_OPENAI_ENDPOINT", "https://test.openai.azure.com")
        )),
        ('Anthropic', lambda: LlmConfig.anthropic(os.getenv("ANTHROPIC_API_KEY", "test"), "claude-sonnet-4-20250514")),
        ('Ollama', lambda: LlmConfig.ollama("llama3.2"))
    ]

    for provider_name, config_func in providers_to_test:
        try:
            config = config_func()
            client = LlmClient(config)
            print(f"  {provider_name}: Configuration valid")
        except Exception as e:
            print(f"  {provider_name}: {str(e)[:50]}...")


# Example usage
async def main():
    """Run comprehensive LLM system demonstration."""
    print("GraphBit LLM Integration Demo")
    print("=" * 60)

    try:
        # Initialize system
        llm_system = AdvancedLLMSystem()

        # Test basic functionality
        for provider in llm_system.clients.keys():
            llm_system.test_basic_completion(provider)
            await llm_system.test_async_completion(provider)
            break  # Just test first available provider for demo

        # Test advanced features with primary provider
        primary_provider = list(llm_system.clients.keys())[0]
        await llm_system.test_batch_completion(primary_provider)
        await llm_system.test_chat_optimized(primary_provider)

        # Test performance features
        await llm_system.test_client_warmup(primary_provider)
        llm_system.get_client_statistics(primary_provider)

        # Compare providers if multiple available
        if len(llm_system.clients) > 1:
            llm_system.compare_providers()

        # Performance benchmark
        await performance_benchmark()

        # Error handling tests
        await test_error_handling()

        # System health check
        monitor_llm_system_health()

        print("\nDemo completed successfully!")

    except Exception as e:
        print(f"\nDemo failed: {e}")


if __name__ == "__main__":
    asyncio.run(main())
```
## Simplified Examples

### Quick OpenAI Integration
```python
from graphbit import init, LlmConfig, LlmClient
import os


def quick_openai_example():
    """Simple OpenAI integration example."""
    # Initialize
    init()

    # Configure and create client
    config = LlmConfig.openai(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4o-mini"
    )
    client = LlmClient(config)

    # Simple completion
    response = client.complete(
        "Explain quantum computing in 3 sentences.",
        max_tokens=100,
        temperature=0.7
    )
    print(f"Response: {response}")

    # Get statistics
    stats = client.get_stats()
    print(f"Total requests: {stats.get('total_requests', 0)}")


# Usage
quick_openai_example()
```
### Local Ollama Integration
```python
from graphbit import init, LlmConfig, LlmClient


def quick_ollama_example():
    """Simple Ollama integration example."""
    # Initialize
    init()

    # Configure Ollama (no API key needed)
    config = LlmConfig.ollama("llama3.2")
    client = LlmClient(config, debug=True)

    # Test completion
    try:
        response = client.complete(
            "What are the benefits of local AI models?",
            max_tokens=150,
            temperature=0.8
        )
        print(f"Ollama response: {response}")
    except Exception as e:
        print(f"Ollama error (make sure Ollama is running): {e}")


# Usage
quick_ollama_example()
```
### Anthropic Claude Integration
```python
from graphbit import init, LlmConfig, LlmClient
import os


def quick_anthropic_example():
    """Simple Anthropic Claude integration example."""
    # Initialize
    init()

    # Configure Anthropic
    config = LlmConfig.anthropic(
        api_key=os.getenv("ANTHROPIC_API_KEY"),
        model="claude-sonnet-4-20250514"
    )
    client = LlmClient(config)

    # Complex reasoning task
    response = client.complete(
        """Analyze the pros and cons of remote work from both
        employee and employer perspectives. Be balanced and thorough.""",
        max_tokens=300,
        temperature=0.6
    )
    print(f"Claude's analysis: {response}")


# Usage (requires ANTHROPIC_API_KEY)
quick_anthropic_example()
```
## Best Practices

### Configuration Management
```python
from graphbit import init, LlmConfig, LlmClient
import time
import os


def setup_production_llm_config():
    """Set up production-ready LLM configuration."""
    init(log_level="warn", enable_tracing=False)

    # Primary provider with fallback
    providers = []

    if os.getenv("OPENAI_API_KEY"):
        providers.append(('openai', LlmConfig.openai(
            os.getenv("OPENAI_API_KEY"),
            "gpt-4o-mini"
        )))

    if all([os.getenv("AZURE_OPENAI_API_KEY"), os.getenv("AZURE_OPENAI_ENDPOINT"), os.getenv("AZURE_OPENAI_DEPLOYMENT")]):
        providers.append(('azure_openai', LlmConfig.azure_openai(
            os.getenv("AZURE_OPENAI_API_KEY"),
            os.getenv("AZURE_OPENAI_DEPLOYMENT"),
            os.getenv("AZURE_OPENAI_ENDPOINT"),
            os.getenv("AZURE_OPENAI_API_VERSION", "2024-10-21")
        )))

    if os.getenv("ANTHROPIC_API_KEY"):
        providers.append(('anthropic', LlmConfig.anthropic(
            os.getenv("ANTHROPIC_API_KEY"),
            "claude-sonnet-4-20250514"
        )))

    # Add local fallback
    try:
        providers.append(('ollama', LlmConfig.ollama("llama3.2")))
    except Exception:
        pass

    if not providers:
        raise Exception("No LLM providers configured")

    return providers


def robust_completion(prompt: str, max_retries: int = 3):
    """Completion with provider fallback."""
    providers = setup_production_llm_config()

    for provider_name, config in providers:
        for attempt in range(max_retries):
            try:
                client = LlmClient(config, debug=False)
                return client.complete(prompt, max_tokens=200)
            except Exception as e:
                print(f"Attempt {attempt + 1} with {provider_name} failed: {e}")
                if attempt == max_retries - 1:
                    break  # Retries exhausted; move on to the next provider
                time.sleep(2 ** attempt)  # Exponential backoff

    raise Exception("All providers failed")
```
## Key Features

### Provider Flexibility

- **Multiple Providers**: OpenAI, Anthropic, DeepSeek, Azure OpenAI, and Ollama support
- **Easy Switching**: Consistent API across providers (see the sketch below)
- **Fallback Support**: Automatic provider failover
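
Switching providers only means swapping the config object; the client call itself is unchanged. A minimal sketch, reusing the configuration and client calls from the examples above (the prompt and model names are illustrative):

```python
from graphbit import init, LlmConfig, LlmClient
import os

init()

# The same LlmClient API works regardless of which provider backs it
configs = [
    LlmConfig.openai(os.getenv("OPENAI_API_KEY"), "gpt-4o-mini"),
    LlmConfig.ollama("llama3.2"),  # local model, no API key
]
for config in configs:
    client = LlmClient(config)
    print(client.complete("Say hello in one word.", max_tokens=10))
```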
### Performance Optimization

- **Async Operations**: Non-blocking completions
- **Batch Processing**: Efficient handling of multiple prompts
- **Streaming**: Real-time response streaming
- **Client Warmup**: Improved initial response times (the sketch below combines warmup with batching)
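
These features compose. Below is a compact sketch, assuming `OPENAI_API_KEY` is set, that warms a client up once and then fans out an async batch with bounded concurrency, using only the `warmup` and `complete_batch` calls demonstrated above:

```python
import asyncio
import os

from graphbit import init, LlmConfig, LlmClient


async def warm_then_batch():
    init()
    client = LlmClient(LlmConfig.openai(os.getenv("OPENAI_API_KEY"), "gpt-4o-mini"))

    # Warm up once so the first real request avoids cold-start latency
    await client.warmup()

    # Fan out a small batch, capping in-flight requests at 2
    responses = await client.complete_batch(
        ["Define latency.", "Define throughput."],
        max_tokens=50,
        max_concurrency=2
    )
    for response in responses:
        print(response)


asyncio.run(warm_then_batch())
```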
### Monitoring and Reliability

- **Statistics Tracking**: Detailed performance metrics
- **Health Checks**: System health monitoring
- **Error Handling**: Comprehensive error management
- **Resilience Patterns**: Retry logic (shown in `robust_completion` above) and circuit breakers (sketched below)
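
`robust_completion` above covers retries with exponential backoff. A circuit breaker adds a cool-down so a provider that keeps failing is skipped for a while rather than retried immediately. The sketch below is minimal and framework-agnostic; the `CircuitBreaker` class and its thresholds are illustrative, not part of the GraphBit API:

```python
import time


class CircuitBreaker:
    """Open the circuit after N consecutive failures; allow a retry after a cool-down."""

    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed (provider healthy)

    def allow(self) -> bool:
        """Return True if a call may be attempted."""
        if self.opened_at is None:
            return True
        # Half-open: permit a trial call once the cool-down has elapsed
        return (time.time() - self.opened_at) >= self.reset_timeout

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.time()


def guarded_completion(clients, breakers, prompt):
    """Try each provider whose circuit is closed; skip providers that are cooling down."""
    for name, client in clients.items():
        breaker = breakers.setdefault(name, CircuitBreaker())
        if not breaker.allow():
            continue  # Provider is cooling down after repeated failures
        try:
            response = client.complete(prompt, max_tokens=200)
            breaker.record_success()
            return response
        except Exception:
            breaker.record_failure()
    raise Exception("All providers failed or are cooling down")
```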
This example demonstrates GraphBit's comprehensive LLM integration capabilities for building production-ready AI applications.