The Callback system in GraphAI provides a powerful mechanism for streaming data between nodes, particularly useful for handling streaming LLM outputs or other incremental data processing.
## Callback Basics
At its core, the Callback is an asyncio-based system that:
- Provides a queue for passing streaming data between components
- Handles special tokens to mark node start/end events
- Structures streaming content for easy consumption by downstream processes
- Can be integrated with any async-compatible streaming system
## Creating a Callback
The Graph automatically creates a callback when needed, but you can also create and customize one:
```python
from graphai import Callback

# Create a callback with default settings
callback = Callback()

# Create a callback with custom settings
callback = Callback(
    identifier="custom_id",  # Used in special tokens
    special_token_format="<{identifier}:{token}:{params}>",  # Format for special tokens
    token_format="{token}",  # Format for regular tokens
)
```
## Callbacks in Nodes

To use callbacks in a node, mark it with `stream=True`:
```python
from graphai import node

@node(stream=True)
async def streaming_node(input: dict, callback):
    """This node receives a callback parameter because stream=True."""
    # Process the input and stream each chunk as it is produced
    # (process_chunks is a placeholder for your own chunking logic)
    for chunk in process_chunks(input["data"]):
        await callback.acall(chunk)
    # Return the final result once streaming is complete
    return {"result": "streaming complete"}
```
Important points:

- The `stream=True` parameter tells GraphAI to inject a callback
- The node must have a `callback` parameter
- The callback can be used to stream output chunks
## Streaming from LLMs
A common use case is streaming output from an LLM:
```python
from openai import AsyncOpenAI

@node(stream=True)
async def llm_node(input: dict, callback):
    """Stream output from an LLM."""
    client = AsyncOpenAI()
    # Start a streaming chat completion
    stream = await client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": input["query"]},
        ],
        stream=True,
    )
    response_text = ""
    # Forward each chunk through the callback as it arrives
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            response_text += content
            await callback.acall(content)
    # Return the complete response
    return {"response": response_text}
```
## Callback Methods

The Callback provides several key methods:

### Streaming Content

```python
# Synchronous call (use in sync contexts)
callback(token="chunk of text")

# Async call (preferred)
await callback.acall(token="chunk of text")
```

### Node Management

```python
# Mark the start of a node
await callback.start_node(node_name="my_node")

# Mark the end of a node
await callback.end_node(node_name="my_node")

# Close the callback stream
await callback.close()
```
## Consuming a Callback Stream
You can consume a callback’s stream using its async iterator:
```python
async def consume_stream(callback):
    async for token in callback.aiter():
        # Process each token as it arrives
        print(token, end="", flush=True)
```
This is especially useful for web applications that need to provide real-time updates.
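Putting the pieces together, here is a minimal, self-contained sketch that drives a `Callback` by hand in one task and consumes it with `aiter()` in another. It assumes, as described above, that `aiter()` completes once `close()` has been called:

```python
import asyncio
from graphai import Callback

async def produce(callback: Callback):
    # Frame the output with start/end markers, then close the stream
    await callback.start_node(node_name="demo_node")
    for word in ["Hello", ", ", "world", "!"]:
        await callback.acall(word)
    await callback.end_node(node_name="demo_node")
    await callback.close()

async def main():
    callback = Callback()
    producer = asyncio.create_task(produce(callback))
    # Consume tokens (including special tokens) as they arrive
    async for token in callback.aiter():
        print(token, end="", flush=True)
    await producer

asyncio.run(main())
```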
## Special Tokens
GraphAI uses special tokens to mark events in the stream:
```
<graphai:node_name:start>  # Marks the start of a node
<graphai:node_name>        # Identifies the node
<graphai:node_name:end>    # Marks the end of a node
<graphai:END>              # Marks the end of the stream
```

These tokens can be customized using the `special_token_format` parameter.
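For illustration, the stream produced by a single node named `llm_node` might then look like this (the exact sequence depends on your graph and settings):

```
<graphai:llm_node:start>
Hello
, world!
<graphai:llm_node:end>
<graphai:END>
```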
## Example: Web Server with Streaming
Here’s how to use callbacks with a FastAPI server:
```python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from graphai import Graph, node
import asyncio

app = FastAPI()

@node(start=True, stream=True)
async def llm_stream(input: dict, callback):
    # Streaming LLM implementation...
    for chunk in get_llm_response(input["query"]):
        await callback.acall(chunk)
    return {"status": "complete"}

@node(end=True)
async def final_node(input: dict):
    return {"status": "success"}

# Create the graph
graph = Graph()
graph.add_node(llm_stream())
graph.add_node(final_node())
graph.add_edge(llm_stream, final_node)

@app.post("/stream")
async def stream_endpoint(request: Request):
    data = await request.json()
    # Get a callback from the graph
    callback = graph.get_callback()
    # Start graph execution in the background
    asyncio.create_task(graph.execute({"query": data["query"]}))
    # Return the tokens as a streaming response
    return StreamingResponse(
        callback.aiter(),
        media_type="text/event-stream",
    )
```
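On the client side, any HTTP client that supports streaming responses can consume this endpoint. A sketch using `httpx` (the URL and payload are illustrative):

```python
import asyncio
import httpx

async def main():
    async with httpx.AsyncClient(timeout=None) as client:
        # Stream the response body chunk by chunk as the server emits it
        async with client.stream(
            "POST",
            "http://localhost:8000/stream",
            json={"query": "Tell me about GraphAI"},
        ) as response:
            async for chunk in response.aiter_text():
                print(chunk, end="", flush=True)

asyncio.run(main())
```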
## Callback Configuration
You can customize the callback’s behavior:
```python
callback = Callback(
    # Custom identifier (default is "graphai")
    identifier="myapp",
    # Custom format for special tokens
    special_token_format="<[{identifier}|{token}|{params}]>",
    # Custom format for regular tokens
    token_format="TOKEN: {token}",
)
```
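These are ordinary Python format strings, so you can preview how a special token will render. Assuming the placeholders are filled as in the default examples above (the identifier, the node name as `token`, and the event as `params`):

```python
fmt = "<[{identifier}|{token}|{params}]>"
print(fmt.format(identifier="myapp", token="llm_node", params="start"))
# <[myapp|llm_node|start]>
```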
## Advanced: Custom Processing of Special Tokens
You can implement custom processing of special tokens:
```python
async def process_stream(callback):
    current_node = None
    buffer = ""
    async for token in callback.aiter():
        # Check whether this is a special token
        if token.startswith("<graphai:") and token.endswith(">"):
            # Handle the end of the stream
            if token == "<graphai:END>":
                break
            # Handle a node start, e.g. <graphai:node_name:start>
            elif token.endswith(":start>"):
                current_node = token.split(":")[1]
            # Handle a node end, e.g. <graphai:node_name:end>
            elif token.endswith(":end>"):
                current_node = None
        # Regular token
        else:
            buffer += token
            # Maybe do something with the buffer here
    return buffer
```
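Reusing the pattern from the FastAPI example above, you might wire `process_stream` to a running graph like this:

```python
import asyncio

async def run_and_collect(graph, query: str) -> str:
    # Run the graph in the background and post-process its stream
    callback = graph.get_callback()
    task = asyncio.create_task(graph.execute({"query": query}))
    text = await process_stream(callback)
    await task
    return text
```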
## Best Practices
- Use async whenever possible: The callback system is built on asyncio
- Close the callback when done: Always call `await callback.close()` when finished; a minimal pattern is sketched after this list
- Keep streaming chunks small: Don’t stream large objects; break them into manageable chunks
- Handle special tokens correctly: When consuming streams, handle special tokens appropriately
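For example, when you drive a callback yourself, a `try`/`finally` block guarantees the stream is closed even if streaming fails. A minimal sketch:

```python
async def stream_safely(callback, chunks):
    try:
        for chunk in chunks:
            await callback.acall(chunk)
    finally:
        # Always close so consumers of aiter() are not left waiting
        await callback.close()
```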
## Next Steps
- Learn about Graphs for orchestrating node execution
- Explore Nodes for processing logic
- Check out State for maintaining context across nodes