Python API Reference¶
Complete reference for GraphBit's Python API. This document covers all classes, methods, and their usage based on the actual Python binding implementation.
Module: graphbit¶
Core Functions¶
init(log_level=None, enable_tracing=None, debug=None)¶
Initialize the GraphBit library with optional configuration.
from graphbit import init
# Basic initialization
init()
# With debugging enabled
init(debug=True)
# With custom log level and tracing
init(log_level="info", enable_tracing=True)
Parameters:
- log_level (str, optional): Log level ("trace", "debug", "info", "warn", "error"). Default: "warn"
- enable_tracing (bool, optional): Enable tracing. Default: False
- debug (bool, optional): Enable debug mode (alias for enable_tracing). Default: False
Returns: None
Raises: RuntimeError if initialization fails
version()¶
Get the current GraphBit version.
from graphbit import version
# Get version
current_version = version()
print(f"GraphBit version: {current_version}")
Returns: str - Version string (e.g., "0.1.0")
get_system_info()¶
Get comprehensive system information and health status.
from graphbit import get_system_info
# Get system info
info = get_system_info()
print(f"CPU count: {info['cpu_count']}")
print(f"Runtime initialized: {info['runtime_initialized']}")
Returns: dict - Dictionary containing:
- version: GraphBit version
- python_binding_version: Python binding version
- runtime_uptime_seconds: Runtime uptime
- runtime_worker_threads: Number of worker threads
- cpu_count: Number of CPU cores
- runtime_initialized: Runtime initialization status
- memory_allocator: Memory allocator type
- build_target: Build target
- build_profile: Build profile (debug/release)
health_check()¶
Perform comprehensive health checks.
from graphbit import health_check
health = health_check()
if health['overall_healthy']:
    print("System is healthy")
else:
    print("System has issues")
Returns: dict - Dictionary containing health status information
configure_runtime(worker_threads=None, max_blocking_threads=None, thread_stack_size_mb=None)¶
Configure the global runtime with custom settings (advanced).
# Configure runtime before init()
from graphbit import configure_runtime, init
configure_runtime(worker_threads=8, max_blocking_threads=16)
init()
Parameters: - worker_threads (int, optional): Number of worker threads - max_blocking_threads (int, optional): Maximum blocking threads - thread_stack_size_mb (int, optional): Thread stack size in MB
shutdown()¶
Gracefully shut down the library (for testing and cleanup).
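For example, as part of test teardown:
from graphbit import init, shutdown
init()
# ... run tests ...
shutdown()  # release runtime resources once all work is finished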
LLM Configuration¶
LlmConfig¶
Configuration class for Large Language Model providers.
Static Methods¶
LlmConfig.openai(api_key, model=None)¶
Create OpenAI provider configuration.
from graphbit import LlmConfig
# Basic configuration
config = LlmConfig.openai("your-openai-api-key", "gpt-4o-mini")
# With default model
config = LlmConfig.openai("your-openai-api-key") # Uses default model "gpt-4o-mini"
Parameters: - api_key (str): OpenAI API key - model (str, optional): Model name. Default: "gpt-4o-mini"
Returns: LlmConfig instance
LlmConfig.anthropic(api_key, model=None)¶
Create Anthropic provider configuration.
config = LlmConfig.anthropic("your-anthropic-api-key", "claude-sonnet-4-20250514")
# With default model
config = LlmConfig.anthropic("your-anthropic-api-key") # Uses default model "claude-sonnet-4-20250514"
Parameters: - api_key (str): Anthropic API key - model (str, optional): Model name. Default: "claude-sonnet-4-20250514"
Returns: LlmConfig instance
LlmConfig.deepseek(api_key, model=None)¶
Create DeepSeek provider configuration.
config = LlmConfig.deepseek("your-deepseek-api-key", "deepseek-chat")
# With default model
config = LlmConfig.deepseek("your-deepseek-api-key") # Uses default model "deepseek-chat"
Parameters: - api_key (str): DeepSeek API key - model (str, optional): Model name. Default: "deepseek-chat"
Available Models:
- deepseek-chat: General conversation and instruction following
- deepseek-coder: Specialized for code generation and programming tasks
- deepseek-reasoner: Advanced reasoning and mathematical problem solving
Returns: LlmConfig instance
LlmConfig.mistralai(api_key, model=None)¶
Create MistralAI provider configuration.
config = LlmConfig.mistralai("your-mistralai-api-key", "mistral-large-latest")
# With default model
config = LlmConfig.mistralai("your-mistralai-api-key") # Uses default model "mistral-large-latest"
Parameters: - api_key (str): MistralAI API key - model (str, optional): Model name. Default: "mistral-large-latest"
Available Models:
- mistral-large-latest: Complex reasoning, multilingual tasks (128K context, function calling)
- mistral-medium-latest: Balanced performance and cost (32K context, function calling)
- mistral-small-latest: Fast responses, simple tasks (32K context, function calling)
- open-mistral-7b: Open source, lightweight (32K context)
- open-mixtral-8x7b: Open source, high performance (32K context)
- open-mixtral-8x22b: Open source, largest model (64K context)
Returns: LlmConfig instance
LlmConfig.ollama(model=None)¶
Create Ollama provider configuration.
config = LlmConfig.ollama("llama3.2")
# With default model
config = LlmConfig.ollama() # Uses default model "llama3.2"
Parameters: - model (str, optional): Model name. Default: "llama3.2"
Returns: LlmConfig instance
Instance Methods¶
provider()¶
Get the provider name.
model()¶
Get the model name.
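For example (the printed values are illustrative):
config = LlmConfig.openai("your-openai-api-key")
print(config.provider())  # e.g. "openai"
print(config.model())     # e.g. "gpt-4o-mini"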
LLM Client¶
LlmClient¶
Production-grade LLM client with resilience patterns.
Constructor¶
LlmClient(config, debug=None)¶
Create a new LLM client.
from graphbit import LlmClient
# Basic configuration
client = LlmClient(config)
# With debugging
client = LlmClient(config, debug=True)
Parameters: - config (LlmConfig): LLM configuration - debug (bool, optional): Enable debug mode. Default: False
Methods¶
complete(prompt, max_tokens=None, temperature=None)¶
Synchronous completion with resilience.
response = client.complete("Write a short story about a robot")
print(response)
# With parameters
response = client.complete(
"Explain quantum computing",
max_tokens=500,
temperature=0.7
)
Parameters: - prompt (str): Input prompt - max_tokens (int, optional): Maximum tokens to generate (1-100000) - temperature (float, optional): Sampling temperature (0.0-2.0)
Returns: str - Generated text
Raises: ValueError for invalid parameters
complete_async(prompt, max_tokens=None, temperature=None)¶
Asynchronous completion with full resilience.
import asyncio
async def generate():
    response = await client.complete_async("Tell me a joke")
    return response
result = asyncio.run(generate())
Parameters: Same as complete()
Returns: Awaitable[str] - Generated text
complete_batch(prompts, max_tokens=None, temperature=None, max_concurrency=None)¶
Ultra-fast batch processing with controlled concurrency.
import asyncio
prompts = [
"Summarize AI trends",
"Explain blockchain",
"Describe quantum computing"
]
async def batch_generate():
    responses = await client.complete_batch(prompts, max_concurrency=5)
    return responses
results = asyncio.run(batch_generate())
Parameters:
- prompts (List[str]): List of prompts (max 1000)
- max_tokens (int, optional): Maximum tokens per prompt
- temperature (float, optional): Sampling temperature
- max_concurrency (int, optional): Maximum concurrent requests. Default: CPU count * 2
Returns: Awaitable[List[str]] - List of generated responses
chat_optimized(messages, max_tokens=None, temperature=None)¶
Optimized chat completion with message validation.
import asyncio
messages = [
("system", "You are a helpful assistant"),
("user", "What is Python?"),
("assistant", "Python is a programming language"),
("user", "Tell me more about its features")
]
async def chat():
    response = await client.chat_optimized(messages)
    return response
result = asyncio.run(chat())
Parameters: - messages (List[Tuple[str, str]]): List of (role, content) tuples - max_tokens (int, optional): Maximum tokens to generate - temperature (float, optional): Sampling temperature
Returns: Awaitable[str] - Generated response
complete_stream(prompt, max_tokens=None, temperature=None)¶
Stream completion (alias for async complete).
import asyncio
async def stream():
    response = await client.complete_stream("Write a poem")
    return response
result = asyncio.run(stream())
get_stats()¶
Get comprehensive client statistics.
stats = client.get_stats()
print(f"Total requests: {stats['total_requests']}")
print(f"Success rate: {stats['success_rate']}")
print(f"Average response time: {stats['average_response_time_ms']}ms")
Returns: dict - Dictionary containing performance metrics
warmup()¶
Warm up the client to avoid initialization overhead.
import asyncio
async def prepare():
    await client.warmup()
    print("Client warmed up")
asyncio.run(prepare())
reset_stats()¶
Reset client statistics.
Document Processing¶
DocumentLoaderConfig¶
Configuration class for document loading operations.
Constructor¶
DocumentLoaderConfig(max_file_size=None, default_encoding=None, preserve_formatting=None)¶
Create a new document loader configuration.
from graphbit import DocumentLoaderConfig
# Default configuration
config = DocumentLoaderConfig()
# Custom configuration
config = DocumentLoaderConfig(
max_file_size=50_000_000, # 50MB limit
default_encoding="utf-8", # Text encoding
preserve_formatting=True # Keep document formatting
)
Parameters:
- max_file_size (int, optional): Maximum file size in bytes. Must be greater than 0
- default_encoding (str, optional): Default text encoding for text files. Cannot be empty
- preserve_formatting (bool, optional): Whether to preserve document formatting. Default: False
Properties¶
max_file_size¶
Get or set the maximum file size limit.
default_encoding¶
Get or set the default text encoding.
preserve_formatting¶
Get or set the formatting preservation flag.
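These are readable and writable attributes on the config object; a quick sketch (values are illustrative):
config = DocumentLoaderConfig()
config.max_file_size = 10_000_000
config.default_encoding = "utf-8"
config.preserve_formatting = True
print(config.max_file_size)  # 10000000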
extraction_settings¶
Get or set extraction settings as a dictionary.
# Read the current settings
settings = config.extraction_settings
# Replace them with custom values
config.set_extraction_settings({"pdf_parser": "advanced", "ocr_enabled": True})
DocumentContent¶
Contains the extracted content and metadata from a loaded document.
Properties¶
source¶
Get the source path or URL of the document.
document_type¶
Get the detected document type.
content¶
Get the extracted text content.
file_size¶
Get the file size in bytes.
extracted_at¶
Get the extraction timestamp as a UTC timestamp.
metadata¶
Get document metadata as a dictionary.
metadata = content.metadata
print(f"Author: {metadata.get('author', 'Unknown')}")
print(f"Pages: {metadata.get('pages', 'N/A')}")
Methods¶
content_length()¶
Get the length of extracted content.
is_empty()¶
Check if the extracted content is empty.
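For example:
if not content.is_empty():
    print(f"Extracted {content.content_length()} characters")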
preview(max_length=500)¶
Get a preview of the content.
preview = content.preview(200) # First 200 characters
full_preview = content.preview() # First 500 characters (default)
DocumentLoader¶
Main class for loading and processing documents from various sources.
Constructor¶
DocumentLoader(config=None)¶
Create a new document loader.
from graphbit import DocumentLoader, DocumentLoaderConfig
# With default configuration
loader = DocumentLoader()
# With custom configuration
config = DocumentLoaderConfig(max_file_size=10_000_000)
loader = DocumentLoader(config)
Methods¶
load_document(source_path, document_type)¶
Load and extract content from a document.
# Load a PDF document
content = loader.load_document("/path/to/document.pdf", "pdf")
print(f"Extracted {content.content_length()} characters")
# Load a text file
content = loader.load_document("data/report.txt", "txt")
print(content.content)
# Load a Word document
content = loader.load_document("docs/manual.docx", "docx")
print(f"Document metadata: {content.metadata}")
Parameters: - source_path (str): Path to the document file. Cannot be empty - document_type (str): Type of document. Cannot be empty
Returns: DocumentContent - The extracted content and metadata
Raises: ValueError for invalid parameters, RuntimeError for loading errors
Static Methods¶
DocumentLoader.supported_types()¶
Get list of supported document types.
types = DocumentLoader.supported_types()
print(f"Supported formats: {types}")
# Output: ['txt', 'pdf', 'docx', 'json', 'csv', 'xml', 'html']
DocumentLoader.detect_document_type(file_path)¶
Detect document type from file extension.
doc_type = DocumentLoader.detect_document_type("report.pdf")
print(f"Detected type: {doc_type}") # "pdf"
# Returns None if type cannot be detected
unknown_type = DocumentLoader.detect_document_type("file.unknown")
print(unknown_type) # None
DocumentLoader.validate_document_source(source_path, document_type)¶
Validate document source and type combination.
try:
    DocumentLoader.validate_document_source("report.pdf", "pdf")
    print("Valid document source")
except Exception as e:
    print(f"Invalid: {e}")
Text Splitting¶
TextSplitterConfig¶
Configuration class for text chunking/splitting strategies.
Static Methods¶
TextSplitterConfig.character(chunk_size, chunk_overlap=0)¶
Create a character-based splitter.
Parameters: - chunk_size (int): Target max characters per chunk. Must be > 0 - chunk_overlap (int, optional): Character overlap between chunks. Must be < chunk_size. Default: 0
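For example (the other factory methods below follow the same call pattern):
from graphbit import TextSplitterConfig
config = TextSplitterConfig.character(1000, 100)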
TextSplitterConfig.token(chunk_size, chunk_overlap=0, token_pattern=None)¶
Create a token-based splitter.
Parameters: - chunk_size (int): Target max tokens per chunk. Must be > 0 - chunk_overlap (int, optional): Token overlap. Must be < chunk_size. Default: 0 - token_pattern (str, optional): Regex for token boundaries. Default: None
TextSplitterConfig.sentence(chunk_size, chunk_overlap=0, sentence_endings=None)¶
Create a sentence-based splitter.
Parameters: - chunk_size (int): Approx. sentences per chunk. Must be > 0 - chunk_overlap (int, optional): Overlap in sentences. Default: 0 - sentence_endings (List[str], optional): Custom sentence terminators. Default: built-in
TextSplitterConfig.recursive(chunk_size, chunk_overlap=0, separators=None)¶
Create a recursive splitter (tries larger separators, then smaller).
Parameters: - chunk_size (int): Target max characters per chunk. Must be > 0 - chunk_overlap (int, optional): Character overlap. Default: 0 - separators (List[str], optional): Ordered fallback boundaries. Default: built-in
TextSplitterConfig.paragraph(chunk_size, chunk_overlap=0, min_paragraph_length=None)¶
Create a paragraph-based splitter.
Parameters: - chunk_size (int): Paragraphs per chunk. Must be > 0 - chunk_overlap (int, optional): Paragraph overlap. Default: 0 - min_paragraph_length (int, optional): Minimum chars for a standalone paragraph. Default: None
TextSplitterConfig.markdown(chunk_size, chunk_overlap=0, split_by_headers=True)¶
Create a Markdown-aware splitter.
Parameters: - chunk_size (int): Target max characters per chunk. Must be > 0 - chunk_overlap (int, optional): Character overlap. Default: 0 - split_by_headers (bool, optional): Prefer header boundaries. Default: True
TextSplitterConfig.code(chunk_size, chunk_overlap=0, language=None)¶
Create a code-aware splitter.
Parameters: - chunk_size (int): Target max characters per chunk. Must be > 0 - chunk_overlap (int, optional): Character overlap. Default: 0 - language (str, optional): Language hint (e.g., "python", "js"). Default: None
For code, whitespace trimming is disabled by default.
TextSplitterConfig.regex(pattern, chunk_size, chunk_overlap=0)¶
Create a regex-anchored splitter.
Parameters: - pattern (str): Non-empty regex used as a primary boundary - chunk_size (int): Target max characters per chunk. Must be > 0 - chunk_overlap (int, optional): Character overlap. Default: 0
Instance Methods¶
set_preserve_word_boundaries(preserve: bool), set_trim_whitespace(trim: bool), and set_include_metadata(include: bool) adjust post-processing behavior on an existing config:
config.set_preserve_word_boundaries(True)
config.set_trim_whitespace(True)
config.set_include_metadata(True)
The following are the concrete splitter classes and the chunk representation exposed by GraphBit’s Python bindings. They work seamlessly with TextSplitterConfig or can be used directly.
TextChunk¶
Representation of a single chunk produced by any splitter.
Properties¶
chunk.content # str — the chunk text
chunk.start_index # int — start offset in original text
chunk.end_index # int — end offset (exclusive) in original text
chunk.chunk_index # int — position of the chunk in sequence (0-based)
chunk.metadata # Dict[str, str] — splitter-added metadata (if any)
# String forms (helpful in logs/debugging)
str(chunk) # -> "TextChunk(index=..., start=..., end=..., length=...)"
repr(chunk) # -> "TextChunk(content='...', index=..., start=..., end=...)"
CharacterSplitter¶
Character-count–based chunker.
Constructor¶
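By analogy with the other splitter constructors (the parameters below), the import and call are presumably:
from graphbit import CharacterSplitter
splitter = CharacterSplitter(chunk_size, chunk_overlap=0)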
Parameters: - chunk_size (int): Max characters per chunk. Must be > 0. - chunk_overlap (int, optional): Overlap (chars) between consecutive chunks. Must be < chunk_size. Default: 0.
Methods¶
chunks = splitter.split_text(text) # -> List[TextChunk]
all_chunks = splitter.split_texts(texts) # -> List[List[TextChunk]]
size = splitter.chunk_size # -> int
overlap = splitter.chunk_overlap # -> int
Example
splitter = CharacterSplitter(1000, 100)
chunks = splitter.split_text("Large string ...")
print(chunks[0].content, chunks[0].start_index, chunks[0].end_index)
TokenSplitter¶
Token-count–based chunker with optional custom tokenization pattern.
Constructor¶
from graphbit import TokenSplitter
splitter = TokenSplitter(chunk_size, chunk_overlap=0, token_pattern=None)
Parameters: - chunk_size (int): Max tokens per chunk. Must be > 0. - chunk_overlap (int, optional): Overlap (tokens). Must be < chunk_size. Default: 0. - token_pattern (str, optional): Regex pattern to define token boundaries (e.g., r"\w+|[^\w\s]"). Default: None.
Methods¶
chunks = splitter.split_text(text) # -> List[TextChunk]
all_chunks = splitter.split_texts(texts) # -> List[List[TextChunk]]
size = splitter.chunk_size # -> int
overlap = splitter.chunk_overlap # -> int
Example
splitter = TokenSplitter(256, 32, r"\w+|[^\w\s]")
chunks = splitter.split_text("Tokenize me, please!")
SentenceSplitter¶
Splits by sentences; chunk sizes/counts are in sentences.
Constructor¶
from graphbit import SentenceSplitter
splitter = SentenceSplitter(chunk_size, chunk_overlap=0, sentence_endings=None)
Parameters: - chunk_size (int): Approx. sentences per chunk. Must be > 0. - chunk_overlap (int, optional): Sentence overlap. Must be < chunk_size. Default: 0. - sentence_endings (List[str], optional): Custom sentence terminators (e.g., [".", "!", "?"]). Default: built-in.
Methods¶
chunks = splitter.split_text(text) # -> List[TextChunk]
all_chunks = splitter.split_texts(texts) # -> List[List[TextChunk]]
size = splitter.chunk_size # -> int
overlap = splitter.chunk_overlap # -> int
Example
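A minimal sketch using the default sentence endings:
splitter = SentenceSplitter(5, 1)
chunks = splitter.split_text("First point. Second point. Third point. Fourth point. Fifth point. Sixth point.")
print(len(chunks), chunks[0].content)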
RecursiveSplitter¶
Hierarchical splitter that prefers large separators first, then backs off to smaller ones.
Constructor¶
from graphbit import RecursiveSplitter
splitter = RecursiveSplitter(chunk_size, chunk_overlap=0, separators=None)
Parameters: - chunk_size (int): Target max characters per chunk. Must be > 0. - chunk_overlap (int, optional): Character overlap. Must be < chunk_size. Default: 0. - separators (List[str], optional): Ordered boundaries to try (e.g., ["\n\n", "\n", ". ", " "]). Default: built-in.
Methods¶
chunks = splitter.split_text(text) # -> List[TextChunk]
all_chunks = splitter.split_texts(texts) # -> List[List[TextChunk]]
size = splitter.chunk_size # -> int
overlap = splitter.chunk_overlap # -> int
seps = splitter.separators # -> List[str]
Example
splitter = RecursiveSplitter(1000, 100, ["\n\n", "\n", ". ", " "])
chunks = splitter.split_text(readme_text)
TextSplitter (Generic, Config-Driven)¶
Creates a splitter from a TextSplitterConfig instance and exposes a common API.
Constructor¶
from graphbit import TextSplitter, TextSplitterConfig
# Example: recursive strategy via config
config = TextSplitterConfig.recursive(1000, 100, ["\n\n", "\n", ". ", " "])
config.set_trim_whitespace(True)
config.set_preserve_word_boundaries(True)
config.set_include_metadata(True)
splitter = TextSplitter(config)
Methods¶
chunks = splitter.split_text(text) # -> List[TextChunk]
all_chunks = splitter.split_texts(texts) # -> List[List[TextChunk]]
# Convenience: produce plain dicts instead of TextChunk objects
docs = splitter.create_documents(text) # -> List[Dict[str, str]]
# each dict: {"content", "start_index", "end_index", "chunk_index"}
Example
from graphbit import TextSplitter, TextSplitterConfig
config = TextSplitterConfig.token(256, 32, r"\w+|[^\w\s]")
splitter = TextSplitter(config)
docs = splitter.create_documents("Split me into token-based chunks.")
print(docs[0]["content"], docs[0]["start_index"], docs[0]["end_index"])
Embeddings¶
EmbeddingConfig¶
Configuration for embedding providers.
Static Methods¶
EmbeddingConfig.openai(api_key, model=None)¶
Create OpenAI embeddings configuration.
from graphbit import EmbeddingConfig
# Basic configuration
config = EmbeddingConfig.openai("your-openai-api-key", "text-embedding-3-small")
# With default model
config = EmbeddingConfig.openai("your-openai-api-key") # Uses default model "text-embedding-3-small"
Parameters: - api_key (str): OpenAI API key - model (str, optional): Model name. Default: "text-embedding-3-small"
EmbeddingClient¶
Client for generating text embeddings.
Constructor¶
EmbeddingClient(config)¶
Create embedding client.
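For example, reusing the OpenAI embedding config created above:
from graphbit import EmbeddingClient
client = EmbeddingClient(config)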
Methods¶
embed(text)¶
Generate embedding for single text.
Parameters: - text (str): Input text
Returns: List[float] - Embedding vector
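For example:
vector = client.embed("Hello world")
print(f"Embedding dimension: {len(vector)}")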
embed_many(texts)¶
Generate embeddings for multiple texts.
texts = ["Text 1", "Text 2", "Text 3"]
embeddings = client.embed_many(texts)
print(f"Generated {len(embeddings)} embeddings")
Parameters: - texts (List[str]): List of input texts
Returns: List[List[float]] - List of embedding vectors
similarity(a, b) (static)¶
Calculate cosine similarity between two embeddings.
Parameters: - a (List[float]): First embedding vector - b (List[float]): Second embedding vector
Returns: float - Cosine similarity (-1.0 to 1.0)
Workflow Components¶
Node¶
Factory class for creating different types of workflow nodes.
Static Methods¶
Node.agent(name, prompt, agent_id=None, output_name=None, tools=None, system_prompt=None, llm_config=None, temperature=None, max_tokens=None)¶
Create an AI agent node.
from graphbit import Node
agent = Node.agent(
name="Weather Analyzer",
prompt=f"Analyze the weather of: {input}",
agent_id="analyzer", # Optional, auto-generated if not provided
output_name="weather_report", # Optional, custom output name
tools=[get_weather], # Optional, list of tools available to the agent
system_prompt="You are a helpful assistant.", # Optional, system prompt that defines agent behavior and constraints
llm_config=my_llm_config, # Optional, custom LLM configuration for this specific agent
temperature=0.7, # Optional, sampling temperature (0.0-2.0)
max_tokens=500 # Optional, maximum tokens to generate
)
Parameters:
- name (str): Human-readable node name
- prompt (str): LLM prompt template with variables
- agent_id (str, optional): Unique agent identifier. Auto-generated if not provided
- output_name (str, optional): Custom name for the node's output
- tools (List, optional): List of tools available to the agent
- system_prompt (str, optional): System prompt that defines agent behavior and constraints
- llm_config (LlmConfig, optional): Custom LLM configuration for this specific agent
- temperature (float, optional): Sampling temperature (0.0-2.0)
- max_tokens (int, optional): Maximum tokens to generate
Returns: Node instance
Instance Methods¶
id()¶
Get the node ID.
name()¶
Get the node name.
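For example, using the agent node created above:
print(agent.id())    # unique node identifier
print(agent.name())  # "Weather Analyzer"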
Workflow¶
Represents a complete workflow.
Constructor¶
Workflow(name)¶
Create a new workflow.
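For example:
from graphbit import Workflow
workflow = Workflow("Analysis Workflow")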
Methods¶
add_node(node)¶
Add a node to the workflow.
Parameters: - node (Node): Node to add
Returns: str - Unique node ID
connect(from_id, to_id)¶
Connect two nodes.
Parameters: - from_id (str): Source node ID - to_id (str): Target node ID
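A minimal sketch wiring two hypothetical agent nodes together (the IDs come from add_node):
summarizer_id = workflow.add_node(summarizer_node)
reviewer_id = workflow.add_node(reviewer_node)
workflow.connect(summarizer_id, reviewer_id)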
validate()¶
Validate the workflow structure.
try:
    workflow.validate()
    print("Workflow is valid")
except Exception as e:
    print(f"Invalid workflow: {e}")
WorkflowResult¶
Contains workflow execution results.
Methods¶
is_success()¶
Check if workflow completed successfully.
is_failed()¶
Check if workflow failed.
state()¶
Get the workflow state.
execution_time_ms()¶
Get execution time in milliseconds.
get_variable(key)¶
Get a variable value.
get_all_variables()¶
Get all variables as a dictionary.
variables()¶
Get all variables as a list of tuples.
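A typical inspection sketch, once an Executor (documented below) has produced a result:
result = executor.execute(workflow)
if result.is_success():
    print(f"State: {result.state()}")
    print(f"Execution time: {result.execution_time_ms()} ms")
    print(result.get_all_variables())
else:
    print("Workflow failed")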
Workflow Execution¶
Executor¶
Production-grade workflow executor with comprehensive features.
Constructors¶
Executor(config, lightweight_mode=None, timeout_seconds=None, debug=None)¶
Create a basic executor.
from graphbit import Executor
# Basic executor
executor = Executor(llm_config)
# With configuration
executor = Executor(
llm_config,
lightweight_mode=False,
timeout_seconds=300,
debug=True
)
Parameters:
- config (LlmConfig): LLM configuration
- lightweight_mode (bool, optional): Enable lightweight mode (low latency)
- timeout_seconds (int, optional): Execution timeout (1-3600 seconds)
- debug (bool, optional): Enable debug mode
Configuration Methods¶
configure(timeout_seconds=None, max_retries=None, enable_metrics=None, debug=None)¶
Configure the executor with new settings.
set_lightweight_mode(enabled)¶
Legacy method for backward compatibility.
is_lightweight_mode()¶
Check if lightweight mode is enabled.
Execution Methods¶
execute(workflow)¶
Execute a workflow synchronously.
Parameters: - workflow (Workflow): Workflow to execute
Returns: WorkflowResult - Execution result
run_async(workflow)¶
Execute a workflow asynchronously.
import asyncio
async def run_workflow():
result = await executor.run_async(workflow)
return result
result = asyncio.run(run_workflow())
Statistics Methods¶
get_stats()¶
Get comprehensive execution statistics.
stats = executor.get_stats()
print(f"Total executions: {stats['total_executions']}")
print(f"Success rate: {stats['success_rate']}")
print(f"Average duration: {stats['average_duration_ms']}ms")
reset_stats()¶
Reset execution statistics.
get_execution_mode()¶
Get the current execution mode.
Error Handling¶
GraphBit uses standard Python exceptions:
- ValueError - Invalid parameters or workflow structure
- RuntimeError - Execution errors
- TimeoutError - Operation timeouts
try:
    result = executor.execute(workflow)
except ValueError as e:
    print(f"Invalid workflow: {e}")
except RuntimeError as e:
    print(f"Execution failed: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
Usage Examples¶
Basic Workflow¶
import os
from graphbit import LlmConfig, Workflow, Node, Executor
# Configure LLM
config = LlmConfig.openai(os.getenv("OPENAI_API_KEY"))
# Create workflow
workflow = Workflow("Analysis Workflow")
agent = Node.agent(
"Analyzer",
f"Analyze the following text: {input}"
)
node_id = workflow.add_node(agent)
workflow.validate()
# Execute
executor = Executor(config)
result = executor.execute(workflow)
if result.is_success():
    output = result.get_variable("output")
    print(f"Analysis result: {output}")
Advanced LLM Usage¶
from graphbit import LlmClient
# Create client with debugging
client = LlmClient(config, debug=True)
# Batch processing
prompts = [
"Summarize: AI is transforming industries",
"Explain: Machine learning algorithms",
"Analyze: The future of automation"
]
import asyncio
async def process_batch():
    responses = await client.complete_batch(prompts, max_concurrency=3)
    return responses
results = asyncio.run(process_batch())
# Chat conversation
messages = [
("system", "You are a helpful AI assistant"),
("user", "What is machine learning?"),
("assistant", "Machine learning is a subset of AI..."),
("user", "Can you give me an example?")
]
async def chat():
    response = await client.chat_optimized(messages, temperature=0.7)
    return response
chat_result = asyncio.run(chat())
High-Performance Execution¶
from graphbit import Executor
# Create high-throughput executor
executor = Executor(
llm_config,
timeout_seconds=600,
debug=False
)
# Configure for production
executor.configure(
timeout_seconds=300,
max_retries=3,
enable_metrics=True
)
# Execute workflow
result = executor.execute(workflow)
# Monitor performance
stats = executor.get_stats()
print(f"Execution stats: {stats}")
Embeddings Usage¶
import os
from graphbit import EmbeddingConfig, EmbeddingClient
# Configure embeddings
embed_config = EmbeddingConfig.openai(os.getenv("OPENAI_API_KEY"))
embed_client = EmbeddingClient(embed_config)
# Generate embeddings
texts = ["Hello world", "Goodbye world", "Machine learning"]
embeddings = embed_client.embed_many(texts)
# Calculate similarity
similarity = EmbeddingClient.similarity(embeddings[0], embeddings[1])
print(f"Similarity between texts: {similarity}")