Nodes are the fundamental processing units in GraphAI. They encapsulate discrete pieces of functionality and can be connected to form complex workflows.
A node in GraphAI is created by decorating an async function with the @node decorator:
```python
from graphai import node

@node
async def process_data(input: dict):
    # Process the input data
    result = do_something(input["data"])
    return {"output": result}
```
All nodes must be async functions that:
- Accept at least an input dictionary
- Return a dictionary containing the processed results
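In its smallest form, that contract looks like the following (`echo` is a hypothetical node, shown only to distill the contract):

```python
from graphai import node

@node
async def echo(input: dict):
    # Accepts an input dictionary, returns a results dictionary
    return {"output": input["data"]}
```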
Nodes that need to stream data (like LLM outputs) can use the stream parameter:
```python
@node(stream=True)
async def streaming_node(input: dict, callback):
    # Process with streaming
    for chunk in process_chunks(input["data"]):
        await callback.acall(chunk)
    return {"result": "streaming complete"}
```
Streaming nodes receive a `callback` parameter; each `await callback.acall(chunk)` pushes a chunk to the stream's consumer while the node continues running.
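A minimal stand-in makes the flow concrete. Everything below is illustrative: `CollectingCallback`, `fake_llm_stream`, and `streaming_logic` are hypothetical stand-ins for GraphAI's real callback and a real LLM stream, not part of the library:

```python
import asyncio

class CollectingCallback:
    """Hypothetical stand-in for GraphAI's callback object."""
    def __init__(self):
        self.chunks = []

    async def acall(self, chunk):
        # A real callback would forward the chunk to the stream's consumer
        self.chunks.append(chunk)

async def fake_llm_stream(prompt: str):
    # Stand-in for a streaming LLM: yields the prompt word by word
    for word in prompt.split():
        yield word

async def streaming_logic(input: dict, callback):
    # Same shape as streaming_node above, minus the decorator
    async for chunk in fake_llm_stream(input["data"]):
        await callback.acall(chunk)
    return {"result": "streaming complete"}

cb = CollectingCallback()
result = asyncio.run(streaming_logic({"data": "hello streaming world"}, cb))
print(cb.chunks)   # ['hello', 'streaming', 'world']
print(result)      # {'result': 'streaming complete'}
```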
Nodes can access the graph’s state by adding a state parameter:
```python
@node
async def stateful_node(input: dict, state: dict):
    # Access state
    history = state.get("history", [])
    # Process with state awareness
    result = process_with_history(input["data"], history)
    # Return updated state (will be merged with current state)
    return {"result": result, "history": history + [result]}
```
Routers are special nodes that determine the next node to execute:
```python
from graphai import router

@router
async def route_based_on_content(input: dict):
    # Analyze the input and decide on the next node
    if "query" in input and "question" in input["query"].lower():
        return {"choice": "question_node", "query": input["query"]}
    else:
        return {"choice": "statement_node", "statement": input["query"]}
```
Routers must return a dictionary with a "choice" key containing the name of the next node to execute.
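Conceptually, the graph runtime uses this "choice" value to look up the next node by name and forwards the remaining keys as its input. The sketch below is illustrative only, not GraphAI's actual internals; `run_from_router` and `node_registry` are assumed names:

```python
# Illustrative dispatch loop; not GraphAI's real implementation.
# node_registry is assumed to map node names to node callables.
async def run_from_router(router_fn, input: dict, node_registry: dict):
    decision = await router_fn(input)  # e.g. {"choice": "question_node", "query": "..."}
    next_node = node_registry[decision["choice"]]  # resolve the next node by name
    payload = {k: v for k, v in decision.items() if k != "choice"}
    return await next_node(payload)  # execute the chosen node with the remaining keys
```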
Routers are often implemented using LLMs for intelligent routing:
```python
@router
async def llm_router(input: dict):
    import ast
    import openai
    from pydantic import BaseModel, Field
    from semantic_router.llms import OpenAILLM
    from semantic_router.schema import Message

    class SearchRoute(BaseModel):
        query: str = Field(description="Route to search when needing external information")

    class MemoryRoute(BaseModel):
        query: str = Field(description="Route to memory when information is likely known")

    llm = OpenAILLM(name="gpt-4")
    messages = [
        Message(role="system", content="Select the best route for the user query."),
        Message(role="user", content=input["query"]),
    ]
    response = llm(
        messages=messages,
        function_schemas=[
            openai.pydantic_function_tool(SearchRoute),
            openai.pydantic_function_tool(MemoryRoute),
        ],
    )
    # Parse the response to get the route choice
    choice = ast.literal_eval(response)[0]
    return {
        "choice": choice["function_name"].lower(),
        "input": {**input, **choice["arguments"]},
    }
```
GraphAI automatically handles parameter mapping, so you only need to declare the parameters your node uses:
```python
@node
async def selective_processor(query: str, metadata: dict = None):
    # Only uses query and metadata from the input;
    # other fields in the input dictionary are ignored
    result = process(query, metadata)
    return {"result": result}
```
The node will only receive the parameters it declares in its signature.
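One way to picture this mapping is to filter the input dictionary against the function's signature. This is an illustrative sketch using `inspect`, not GraphAI's actual code; `call_with_mapped_params` is an assumed name:

```python
import inspect

async def call_with_mapped_params(fn, input: dict):
    # Keep only the keys that appear in the node's signature
    accepted = set(inspect.signature(fn).parameters)
    kwargs = {k: v for k, v in input.items() if k in accepted}
    return await fn(**kwargs)

# An input like {"query": "hi", "metadata": {}, "extra": 1} would reach
# selective_processor as query="hi", metadata={}; "extra" is dropped.
```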
GraphAI validates that nodes receive the required parameters:
```python
@node
async def validated_node(required_param: str, optional_param: int = 0):
    # Will raise an error if required_param is not provided
    return {"result": process(required_param, optional_param)}
```
If a node’s required parameters are missing, the graph execution will fail with a detailed error message.
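A pre-call check along these lines could produce such an error; this is a hedged sketch of the idea using `inspect`, not GraphAI's actual validation logic, and `check_required_params` is an assumed name:

```python
import inspect

def check_required_params(fn, input: dict):
    # Flag declared parameters that have no default and no value in the input
    missing = [
        name
        for name, p in inspect.signature(fn).parameters.items()
        if p.default is inspect.Parameter.empty and name not in input
    ]
    if missing:
        raise ValueError(f"{fn.__name__} missing required parameters: {missing}")
```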