NodeProtocol Objects

class NodeProtocol(Protocol)
Protocol defining the interface of a decorated node.

Graph Objects

class Graph()

get_state

def get_state() -> dict[str, Any]
Get the current graph state. Returns: The current graph state.

set_state

def set_state(state: dict[str, Any]) -> Graph
Set the graph state. Arguments:
  • state - The new state to set for the graph.
Returns: The graph instance.

update_state

def update_state(values: dict[str, Any]) -> Graph
Update the graph state with new values. Arguments:
  • values - The new values to update the graph state with.
Returns: The graph instance.

reset_state

def reset_state() -> Graph
Reset the graph state to an empty dict.

add_node

def add_node(node: NodeProtocol) -> Graph
Adds a node to the graph. Arguments:
  • node - The node to add to the graph.
Raises:
  • Exception - If a node with the same name already exists in the graph.

add_edge

def add_edge(source: NodeProtocol | str,
             destination: NodeProtocol | str) -> Graph
Adds an edge between two nodes that already exist in the graph. Arguments:
  • source - The source node or its name.
  • destination - The destination node or its name.

add_router

def add_router(sources: list[NodeProtocol], router: NodeProtocol,
               destinations: list[NodeProtocol]) -> Graph
Adds a router node, allowing for a decision to be made on which branch to follow based on the choice output of the router node. Arguments:
  • sources - The list of source nodes for the router.
  • router - The router node.
  • destinations - The list of destination nodes for the router.

compile

def compile(*, strict: bool = False) -> Graph
Validate the graph:
  • exactly one start node present (or Graph.start_node set)
  • at least one end node present
  • all edges reference known nodes
  • all nodes reachable from the start (optional) no cycles when strict=True Returns self on success; raises GraphCompileError otherwise.

execute_many

async def execute_many(inputs: Iterable[dict[str, Any]],
                       *,
                       concurrency: int = 5) -> list[Any]
Execute the graph on many inputs concurrently. Arguments:
  • inputs (Iterable[dict]): An iterable of input dicts to feed into the graph.
  • concurrency (int): Maximum number of graph executions to run at once.
  • state (Optional[Any]): Optional shared state to pass to each execution. If you want isolated state per execution, pass None and the graph’s normal semantics will apply.
Returns: list[Any]: The list of results in the same order as inputs.

get_callback

def get_callback()
Get a new instance of the callback class. Returns: Callback: A new instance of the callback class.

set_callback

def set_callback(callback_class: type[Callback]) -> "Graph"
Set the callback class that is returned by the get_callback method and used as the default callback when no callback is passed to the execute method. Arguments:
  • callback_class (type[Callback]): The callback class to use as the default callback.

add_parallel

def add_parallel(source: NodeProtocol | str,
                 destinations: list[NodeProtocol | str])
Add multiple outgoing edges from a single source node to be executed in parallel. Arguments:
  • source - The source node for the parallel branches.
  • destinations - The list of destination nodes for the parallel branches.

add_join

def add_join(sources: list[NodeProtocol | str],
             destination: NodeProtocol | str)
Joins multiple parallel branches into a single branch. Arguments:
  • sources - The list of source nodes for the join.
  • destination - The destination node for the join.

visualize

def visualize(*, save_path: str | None = None)
Render the current graph. If matplotlib is not installed, raise a helpful error telling users to install the viz extra. Optionally save to a file via save_path.