Skip to content

LangGraph Integration

Integrate Identity OS with LangGraph for behavioral control over graph-based agent workflows.

Overview

LangGraph excels at managing complex, state-based workflows with explicit decision points. Identity OS integrates at the node level — each node transition is an observation opportunity.

LangGraph Node Transition
[Infer Behavioral Mode]
Identity OS Process
[Get ExecutionContract]
[Guard Next Node Choice]

Installation

pip install langgraph identity-os-sdk

Basic Setup

1. Initialize Clients

from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage
from identity_os_sdk import IdentityOS, Mode

# Identity OS client
identity_client = IdentityOS(api_key="idos_sk_xxx")

# Create agent instance
agent_instance = identity_client.instances.create(
    name="LangGraphAgent",
    description="Agent with behavioral control"
)

# LangGraph setup
graph = StateGraph(AgentState)

2. Define State

from typing import TypedDict, List

class AgentState(TypedDict):
    messages: List[BaseMessage]
    next_action: str
    contract: Optional[ExecutionContract]
    instance_id: str

3. Create Nodes with Behavioral Observation

def thinking_node(state: AgentState):
    """Node where agent decides what to do"""
    instance_id = state["instance_id"]

    # Infer behavioral mode from current state
    if len(state["messages"]) < 3:
        mode = Mode.PERCEPTION  # Early, gathering info
    else:
        mode = Mode.EXPLORATION  # Have context, exploring options

    # Send observation to Identity OS
    result = identity_client.engine.process(
        instance_id=instance_id,
        mode_target=mode,
        signal_strength=0.8,
        confidence=0.85
    )

    # Store contract in state for downstream use
    state["contract"] = result.contract

    return state

4. Create Action Node with Contract Guard

def action_node(state: AgentState):
    """Node where agent takes action, guarded by contract"""
    contract = state["contract"]
    instance_id = state["instance_id"]

    # Decide action
    possible_actions = ["search", "reason", "ask_user", "synthesize"]

    # Filter by contract
    allowed = [a for a in possible_actions if a in contract.allowed_actions]

    if not allowed:
        # No allowed actions, defer or stabilize
        action = "ask_user"  # Safe fallback
    else:
        # Choose best action from allowed set
        action = choose_best(allowed)

    # Execute action
    if action == "search":
        # Call search tool
        results = search_tool.invoke({})
        state["messages"].append(HumanMessage(content=results))
    elif action == "reason":
        # Call reasoning
        state["messages"].append(HumanMessage(content="Reasoning..."))
    # ... etc

    # Send execution observation
    identity_client.engine.process(
        instance_id=instance_id,
        mode_target=Mode.ASSERTION,
        signal_strength=0.9,  # High confidence in execution
        confidence=0.95
    )

    return state

5. Build Graph with Conditional Edges

# Add nodes
graph.add_node("think", thinking_node)
graph.add_node("act", action_node)
graph.add_node("reflect", reflection_node)

# Conditional routing based on contract
def route_based_contract(state: AgentState):
    """Route next node based on execution contract"""
    contract = state["contract"]
    instance_id = state["instance_id"]

    # If stressed, focus on stabilization
    if contract.stress_state in ["HIGH", "OVER"]:
        return "reflect"  # Go to recovery node

    # If energy is low, wind down
    if contract.energy_level < 0.2:
        return "reflect"

    # Otherwise, continue thinking and acting
    return "think"

# Add edges
graph.add_edge("think", "act")
graph.add_conditional_edges(
    "act",
    route_based_contract,
    {
        "think": "think",
        "reflect": "reflect",
    }
)
graph.add_edge("reflect", END)

# Compile
compiled_graph = graph.compile()

Complete Example

from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.tools import tool
from identity_os_sdk import IdentityOS, Mode
from typing import TypedDict, List, Optional

# Initialize Identity OS
identity_client = IdentityOS(api_key="idos_sk_xxx")

# Create instance
agent_instance = identity_client.instances.create(
    name="ResearchAgent",
    description="Agent that researches topics"
)

# Define state
class ResearchState(TypedDict):
    topic: str
    messages: List[BaseMessage]
    search_results: List[str]
    contract: Optional[dict]
    instance_id: str
    research_depth: int

# Define tools
@tool
def search(query: str) -> str:
    """Search for information"""
    return f"Found 5 results for '{query}'"

@tool
def synthesize(results: List[str]) -> str:
    """Synthesize research findings"""
    return "Synthesis: " + ", ".join(results)

# Define nodes
def research_node(state: ResearchState) -> ResearchState:
    """Determine research direction"""
    instance_id = state["instance_id"]

    # Shallow research = more order, deep = more exploration
    if state["research_depth"] < 2:
        mode = Mode.ORDER  # Systematic
    else:
        mode = Mode.EXPLORATION  # Going deeper

    result = identity_client.engine.process(
        instance_id=instance_id,
        mode_target=mode,
        signal_strength=0.75,
        context={"depth": state["research_depth"]}
    )

    state["contract"] = result.contract
    return state

def search_node(state: ResearchState) -> ResearchState:
    """Execute search, guard with contract"""
    contract = state["contract"]

    # Check if search is allowed
    if "explore" not in contract.allowed_actions:
        state["messages"].append(
            HumanMessage(
                content="Cannot explore new areas now, using cached results"
            )
        )
        return state

    # Perform search
    query = state["topic"]
    results = search.invoke({"query": query})
    state["search_results"].append(results)
    state["messages"].append(HumanMessage(content=results))
    state["research_depth"] += 1

    # Send observation
    identity_client.engine.process(
        instance_id=state["instance_id"],
        mode_target=Mode.EXPLORATION,
        signal_strength=0.85,
        confidence=0.9
    )

    return state

def decide_continue(state: ResearchState) -> str:
    """Decide if research should continue"""
    contract = state["contract"]

    # Too stressed or low energy
    if contract.stress_state in ["HIGH", "OVER"]:
        return "synthesize"

    if contract.energy_level < 0.3:
        return "synthesize"

    # Enough depth or energy running low
    if state["research_depth"] >= 3:
        return "synthesize"

    # Continue researching
    return "search"

def synthesize_node(state: ResearchState) -> ResearchState:
    """Synthesize findings"""
    results = state["search_results"]
    final_answer = synthesize.invoke({"results": results})
    state["messages"].append(HumanMessage(content=final_answer))

    # Final observation
    identity_client.engine.process(
        instance_id=state["instance_id"],
        mode_target=Mode.ASSERTION,
        signal_strength=0.8,
        confidence=0.95
    )

    return state

# Build graph
graph = StateGraph(ResearchState)

graph.add_node("research", research_node)
graph.add_node("search", search_node)
graph.add_node("synthesize", synthesize_node)

graph.set_entry_point("research")
graph.add_edge("research", "search")
graph.add_conditional_edges(
    "search",
    decide_continue,
    {
        "search": "search",
        "synthesize": "synthesize"
    }
)
graph.add_edge("synthesize", END)

# Compile and run
compiled = graph.compile()

initial_state = {
    "topic": "climate change impacts",
    "messages": [],
    "search_results": [],
    "contract": None,
    "instance_id": agent_instance.id,
    "research_depth": 0
}

result = compiled.invoke(initial_state)

print("Final messages:")
for msg in result["messages"]:
    print(f"  {msg.content}")

print(f"\nFinal stress: {result['contract'].stress_state}")
print(f"Final energy: {result['contract'].energy_level}")

Handling Stress Recovery

When the agent enters HIGH or OVER stress:

def recovery_node(state: ResearchState) -> ResearchState:
    """Help agent recover from stress"""
    instance_id = state["instance_id"]
    contract = state["contract"]

    if contract.stress_state == "OVER":
        # Emergency: ask for help
        state["messages"].append(
            HumanMessage(content="I need help, too many errors")
        )
        return state

    # Send recovery observation
    recovery_profile = contract.recovery_profile

    if recovery_profile["preferred_path"] == "gradual":
        # Gradual recovery: simple, structured tasks
        mode = Mode.ORDER
        signal = 0.6
    else:
        # Aggressive recovery: push harder
        mode = Mode.ASSERTION
        signal = 0.8

    identity_client.engine.process(
        instance_id=instance_id,
        mode_target=mode,
        signal_strength=signal,
        context={"recovery": True}
    )

    return state

Monitoring & Logging

import logging

logger = logging.getLogger("langgraph_identity")

def monitored_node(state):
    """Node with monitoring"""
    result = identity_client.engine.process(
        instance_id=state["instance_id"],
        mode_target=Mode.EXPLORATION,
        signal_strength=0.8
    )

    # Log behavioral state
    logger.info(
        "agent_step",
        extra={
            "stress": result.contract.stress_state,
            "energy": result.contract.energy_level,
            "allowed_actions": result.contract.allowed_actions,
            "dominant_modes": result.contract.dominant_modes
        }
    )

    # Alert on critical conditions
    if result.contract.stress_state == "OVER":
        logger.critical("Agent stress critical, recommend intervention")

    state["contract"] = result.contract
    return state

Best Practices

  1. Observe at decision points — Add observations at each critical node
  2. Guard action selection — Always check contract before executing actions
  3. Implement recovery — Have explicit recovery nodes for high stress
  4. Monitor energy — Track energy level and reduce complexity when low
  5. Use context — Pass relevant metadata to help Identity OS understand the situation

Troubleshooting

Contract is null in a node

Make sure the previous node (that calls engine.process) actually stores the contract in state.

Actions are too restricted

Your contract may be in HIGH/OVER stress. Check the stress state and implement stress recovery. Or, lower your signal_strength values to prevent stress.

Rapid calls are being throttled (no state change)

If you're calling engine.process in quick succession and seeing no state progression, the minimum cycle interval throttle may be dropping your calls. Pass an explicit timestamp to bypass wall-clock throttle:

from datetime import datetime, timedelta

base = datetime.utcnow()
for i, obs in enumerate(observations):
    result = identity_client.engine.process(
        instance_id=instance_id,
        mode_target=obs.mode,
        signal_strength=obs.strength,
        timestamp=base + timedelta(seconds=i * 2),
    )
The built-in LangGraph adapter handles this automatically when using remote mode.

Graph is too slow

Each engine.process call adds ~2ms. Use batch processing:

# Instead of 3 calls
observations = [...]
result = identity_client.engine.process_batch(
    instance_id,
    observations=observations,
    include_intermediate=False
)

Next Steps