LangGraph Integration¶
Integrate Identity OS with LangGraph for behavioral control over graph-based agent workflows.
Overview¶
LangGraph excels at managing complex, state-based workflows with explicit decision points. Identity OS integrates at the node level — each node transition is an observation opportunity.
LangGraph Node Transition
↓
[Infer Behavioral Mode]
↓
Identity OS Process
↓
[Get ExecutionContract]
↓
[Guard Next Node Choice]
Installation¶
Basic Setup¶
1. Initialize Clients¶
# Step 1: initialize the Identity OS client, create an agent instance,
# and construct the LangGraph builder.
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage
from identity_os_sdk import IdentityOS, Mode

# Identity OS client
identity_client = IdentityOS(api_key="idos_sk_xxx")

# Create agent instance
agent_instance = identity_client.instances.create(
    name="LangGraphAgent",
    description="Agent with behavioral control"
)

# LangGraph setup
# NOTE(review): AgentState is defined in step 2 below — in a real module,
# define the state class before this line so the name resolves.
graph = StateGraph(AgentState)
2. Define State¶
# Fix: the original snippet used Optional and ExecutionContract without
# importing them; both imports are added here so the snippet runs as shown.
from typing import TypedDict, List, Optional
from identity_os_sdk import ExecutionContract  # NOTE(review): assumed to live in identity_os_sdk alongside IdentityOS/Mode — confirm

class AgentState(TypedDict):
    """Shared LangGraph state threaded through every node."""

    # Conversation history accumulated across nodes.
    messages: List[BaseMessage]
    # Name of the next action chosen by the thinking node.
    next_action: str
    # Latest contract returned by identity_client.engine.process;
    # None until the first observation has been sent.
    contract: Optional[ExecutionContract]
    # Identity OS instance this agent reports to.
    instance_id: str
3. Create Nodes with Behavioral Observation¶
def thinking_node(state: AgentState):
    """Decide what to do next and report the inferred mode to Identity OS.

    Stores the resulting contract in ``state["contract"]`` so downstream
    nodes can guard their actions with it.
    """
    # Early in the conversation we are still gathering information
    # (PERCEPTION); once some context exists, we explore options.
    inferred_mode = (
        Mode.PERCEPTION if len(state["messages"]) < 3 else Mode.EXPLORATION
    )

    # Send the behavioral observation and capture the returned contract.
    observation = identity_client.engine.process(
        instance_id=state["instance_id"],
        mode_target=inferred_mode,
        signal_strength=0.8,
        confidence=0.85,
    )
    state["contract"] = observation.contract
    return state
4. Create Action Node with Contract Guard¶
def action_node(state: AgentState):
    """Execute one action, restricted to what the current contract allows."""
    contract = state["contract"]
    instance_id = state["instance_id"]

    # Candidate actions, filtered down to the contract's allowed set.
    candidates = ["search", "reason", "ask_user", "synthesize"]
    permitted = [name for name in candidates if name in contract.allowed_actions]

    # With nothing permitted, fall back to the safe default of asking the
    # user; otherwise pick the best of the permitted actions.
    action = choose_best(permitted) if permitted else "ask_user"

    # Dispatch the chosen action.
    if action == "search":
        # Call search tool
        results = search_tool.invoke({})
        state["messages"].append(HumanMessage(content=results))
    elif action == "reason":
        # Call reasoning
        state["messages"].append(HumanMessage(content="Reasoning..."))
    # ... etc

    # Report the execution itself as an ASSERTION observation.
    identity_client.engine.process(
        instance_id=instance_id,
        mode_target=Mode.ASSERTION,
        signal_strength=0.9,  # High confidence in execution
        confidence=0.95,
    )
    return state
5. Build Graph with Conditional Edges¶
# Add nodes
graph.add_node("think", thinking_node)
graph.add_node("act", action_node)
# NOTE(review): reflection_node is not defined anywhere in this guide —
# supply your own recovery/reflection node implementation.
graph.add_node("reflect", reflection_node)
# Conditional routing based on contract
def route_based_contract(state: "AgentState") -> str:
    """Choose the next node from the behavioral contract in *state*.

    Returns:
        ``"reflect"`` when the agent should recover (high stress or low
        energy), otherwise ``"think"`` to continue the think/act loop.
    """
    # Fix: the original bound state["instance_id"] to an unused local;
    # the annotation is now a forward reference so this snippet does not
    # depend on AgentState being defined first.
    contract = state["contract"]

    # If stressed, focus on stabilization.
    if contract.stress_state in ["HIGH", "OVER"]:
        return "reflect"  # Go to recovery node
    # If energy is low, wind down.
    if contract.energy_level < 0.2:
        return "reflect"
    # Otherwise, continue thinking and acting.
    return "think"
# Add edges
graph.add_edge("think", "act")
# After "act", route dynamically: back to "think" for another cycle, or to
# "reflect" when the contract signals stress or low energy
# (see route_based_contract).
graph.add_conditional_edges(
    "act",
    route_based_contract,
    {
        "think": "think",
        "reflect": "reflect",
    }
)
graph.add_edge("reflect", END)
# Compile
compiled_graph = graph.compile()
Complete Example¶
# Complete example: a research agent whose search depth is governed by
# Identity OS behavioral contracts.
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.tools import tool
from identity_os_sdk import IdentityOS, Mode
from typing import TypedDict, List, Optional

# Initialize Identity OS
identity_client = IdentityOS(api_key="idos_sk_xxx")

# Create instance
agent_instance = identity_client.instances.create(
    name="ResearchAgent",
    description="Agent that researches topics"
)
# Define state
class ResearchState(TypedDict):
    """LangGraph state for the research workflow."""

    # Research topic supplied by the caller.
    topic: str
    # Conversation/progress log.
    messages: List[BaseMessage]
    # Raw result strings accumulated by search_node.
    search_results: List[str]
    # Latest contract from identity_client.engine.process (None before the
    # first observation). NOTE(review): annotated as dict here but accessed
    # via attributes (e.g. contract.stress_state) below — confirm the SDK type.
    contract: Optional[dict]
    # Identity OS instance id for this agent.
    instance_id: str
    # Number of completed search passes; drives mode selection and stopping.
    research_depth: int
# Define tools
@tool
def search(query: str) -> str:
    """Search for information"""
    # Stub implementation for the example. Note the docstring above doubles
    # as the tool description exposed to the model, so keep it accurate.
    return f"Found 5 results for '{query}'"
@tool
def synthesize(results: List[str]) -> str:
    """Synthesize research findings"""
    # Stub: joins the raw result strings. The docstring above is the tool
    # description shown to the model.
    return "Synthesis: " + ", ".join(results)
# Define nodes
def research_node(state: ResearchState) -> ResearchState:
    """Pick a research mode for this pass and fetch a fresh contract."""
    depth = state["research_depth"]

    # Shallow passes are systematic (ORDER); deeper passes shift into
    # open-ended EXPLORATION.
    target_mode = Mode.ORDER if depth < 2 else Mode.EXPLORATION

    observation = identity_client.engine.process(
        instance_id=state["instance_id"],
        mode_target=target_mode,
        signal_strength=0.75,
        context={"depth": depth},
    )
    state["contract"] = observation.contract
    return state
def search_node(state: ResearchState) -> ResearchState:
    """Run one search pass, but only if the contract permits exploration."""
    contract = state["contract"]

    # Guard: the contract must allow exploration before we search.
    if "explore" not in contract.allowed_actions:
        blocked = HumanMessage(
            content="Cannot explore new areas now, using cached results"
        )
        state["messages"].append(blocked)
        return state

    # Perform the search and record the outcome.
    results = search.invoke({"query": state["topic"]})
    state["search_results"].append(results)
    state["messages"].append(HumanMessage(content=results))
    state["research_depth"] += 1

    # Report the exploration back to Identity OS.
    identity_client.engine.process(
        instance_id=state["instance_id"],
        mode_target=Mode.EXPLORATION,
        signal_strength=0.85,
        confidence=0.9,
    )
    return state
def decide_continue(state: ResearchState) -> str:
    """Return "search" to keep researching or "synthesize" to wrap up."""
    contract = state["contract"]

    # Stop researching when the agent is stressed, drained, or has already
    # gone deep enough; checks short-circuit in that order.
    if (
        contract.stress_state in ["HIGH", "OVER"]
        or contract.energy_level < 0.3
        or state["research_depth"] >= 3
    ):
        return "synthesize"

    # Otherwise keep researching.
    return "search"
def synthesize_node(state: ResearchState) -> ResearchState:
    """Combine the accumulated search results into a final answer."""
    # Build the synthesis from everything gathered so far.
    answer = synthesize.invoke({"results": state["search_results"]})
    state["messages"].append(HumanMessage(content=answer))

    # Final observation: asserting a conclusion.
    identity_client.engine.process(
        instance_id=state["instance_id"],
        mode_target=Mode.ASSERTION,
        signal_strength=0.8,
        confidence=0.95,
    )
    return state
# Build graph
graph = StateGraph(ResearchState)
graph.add_node("research", research_node)
graph.add_node("search", search_node)
graph.add_node("synthesize", synthesize_node)
graph.set_entry_point("research")
graph.add_edge("research", "search")
# After each search pass, either loop back into "search" or move on to
# "synthesize", based on the contract (see decide_continue).
graph.add_conditional_edges(
    "search",
    decide_continue,
    {
        "search": "search",
        "synthesize": "synthesize"
    }
)
graph.add_edge("synthesize", END)

# Compile and run
compiled = graph.compile()
initial_state = {
    "topic": "climate change impacts",
    "messages": [],
    "search_results": [],
    "contract": None,  # populated by research_node on the first pass
    "instance_id": agent_instance.id,
    "research_depth": 0
}
result = compiled.invoke(initial_state)

# Print the run transcript and the final behavioral readings.
print("Final messages:")
for msg in result["messages"]:
    print(f" {msg.content}")
print(f"\nFinal stress: {result['contract'].stress_state}")
print(f"Final energy: {result['contract'].energy_level}")
Handling Stress Recovery¶
When the agent enters HIGH or OVER stress:
def recovery_node(state: ResearchState) -> ResearchState:
    """Steer the agent back toward a healthy state after stress builds up."""
    instance_id = state["instance_id"]
    contract = state["contract"]

    # OVER stress is an emergency: stop and ask a human for help.
    if contract.stress_state == "OVER":
        state["messages"].append(
            HumanMessage(content="I need help, too many errors")
        )
        return state

    # Otherwise pick a recovery style from the contract's recovery profile.
    if contract.recovery_profile["preferred_path"] == "gradual":
        # Gradual recovery: simple, structured tasks.
        recovery_mode, strength = Mode.ORDER, 0.6
    else:
        # Aggressive recovery: push harder.
        recovery_mode, strength = Mode.ASSERTION, 0.8

    identity_client.engine.process(
        instance_id=instance_id,
        mode_target=recovery_mode,
        signal_strength=strength,
        context={"recovery": True},
    )
    return state
Monitoring & Logging¶
import logging
logger = logging.getLogger("langgraph_identity")
def monitored_node(state):
    """Example node that logs behavioral telemetry on every observation."""
    observation = identity_client.engine.process(
        instance_id=state["instance_id"],
        mode_target=Mode.EXPLORATION,
        signal_strength=0.8
    )
    contract = observation.contract

    # Emit structured telemetry about the current behavioral state.
    logger.info(
        "agent_step",
        extra={
            "stress": contract.stress_state,
            "energy": contract.energy_level,
            "allowed_actions": contract.allowed_actions,
            "dominant_modes": contract.dominant_modes
        }
    )

    # Escalate loudly when stress is critical.
    if contract.stress_state == "OVER":
        logger.critical("Agent stress critical, recommend intervention")

    state["contract"] = contract
    return state
Best Practices¶
- Observe at decision points — Add observations at each critical node
- Guard action selection — Always check contract before executing actions
- Implement recovery — Have explicit recovery nodes for high stress
- Monitor energy — Track energy level and reduce complexity when low
- Use context — Pass relevant metadata to help Identity OS understand the situation
Troubleshooting¶
Contract is null in a node
Make sure the previous node (that calls engine.process) actually stores the contract in state.
Actions are too restricted
Your contract may be in HIGH/OVER stress. Check the stress state and implement stress recovery. Or, lower your signal_strength values to prevent stress.
Rapid calls are being throttled (no state change)
If you're calling engine.process in quick succession and seeing no state progression, the minimum cycle interval throttle may be dropping your calls. Pass an explicit `timestamp` argument to `engine.process` to bypass the wall-clock throttle.
Graph is too slow
Each engine.process call adds roughly 2 ms of overhead. Keep graphs fast by observing only at critical decision points rather than on every node, and batch observations where the SDK supports it.
Next Steps¶
- CrewAI Integration — Multi-agent approach
- Concepts — Understand modes and stress
- API Reference — Full endpoint documentation