CrewAI Integration¶
Integrate Identity OS with CrewAI for behavioral control over multi-agent orchestration.
Overview¶
CrewAI is designed for multi-agent collaboration where specialized agents work together on tasks. Identity OS integrates at the agent level — each agent has its own instance and behavioral profile.
CrewAI Task Assignment
↓
[Agent Processes Task]
↓
[Infer Behavioral Mode from Task]
↓
Identity OS Process
↓
[Get ExecutionContract]
↓
[Adapt Agent Behavior]
Installation¶
Basic Setup¶
1. Initialize Clients¶
from crewai import Agent, Task, Crew, Process
from identity_os_sdk import IdentityOS, Mode
# Identity OS client shared by every snippet on this page
identity_client = IdentityOS(api_key="idos_sk_xxx")

# One Identity OS instance per CrewAI agent, created in crew order
researcher_instance, writer_instance, analyst_instance = (
    identity_client.instances.create(name=label, description=summary)
    for label, summary in (
        ("Researcher", "Gathers and analyzes information"),
        ("Writer", "Synthesizes findings into content"),
        ("Analyst", "Reviews and validates content"),
    )
)

# Role key -> instance id, used when wiring the agents below
agent_instances = dict(
    researcher=researcher_instance.id,
    writer=writer_instance.id,
    analyst=analyst_instance.id,
)
2. Wrap Agent Execution¶
def get_constrained_agent(
    name: str,
    role: str,
    goal: str,
    instance_id: str
) -> Agent:
    """Create an agent whose task execution is wrapped by Identity OS.

    Before each task the wrapper sends an observation for ``instance_id``
    (the mode is inferred from keywords in the task description) and then
    rewrites the task description according to the returned contract's
    stress state and energy level. After the task it sends a success or
    failure observation.

    The wrapper is defined on a subclass created for this one agent instead
    of being assigned to ``Agent.execute``. Assigning to the shared class
    would make every call to this factory wrap the previous wrapper, so each
    agent would end up reporting to the instances of all other agents too.

    Args:
        name: Display name passed through to ``Agent``.
        role: Role passed through to ``Agent``.
        goal: Goal passed through to ``Agent``.
        instance_id: Identity OS instance that tracks this agent.

    Returns:
        An ``Agent`` (subclass) instance created with delegation disabled
        and verbose output enabled.
    """

    class _ConstrainedAgent(Agent):
        """``Agent`` whose ``execute`` reports to the enclosing ``instance_id``."""

        def execute(self, task):
            """Wrapped execute with behavioral constraints"""
            # NOTE(review): the hook name `execute` is kept from the original
            # example; confirm it is the method your CrewAI version invokes.

            # Map task to behavioral mode (lower-case once, first match wins)
            description = task.description.lower()
            if "research" in description:
                mode = Mode.EXPLORATION
            elif "analyze" in description:
                mode = Mode.PERCEPTION
            elif "write" in description:
                mode = Mode.ASSERTION
            else:
                mode = Mode.ORDER

            # Send observation. The response carries the contract used below,
            # so no separate get_contract() round-trip is made beforehand.
            result = identity_client.engine.process(
                instance_id=instance_id,
                mode_target=mode,
                signal_strength=0.8,
                context={"task": task.description}
            )
            contract = result.contract

            # Adapt the task to the agent's current state
            if contract.stress_state in ("HIGH", "OVER"):
                # Under stress, simplify task
                task.description = f"(Simplified) {task.description}"
            if contract.energy_level < 0.3:
                # Low energy, reduce scope
                task.description = f"(Quick version) {task.description}"

            # Execute with constraints
            try:
                output = super().execute(task)
                # Send success observation
                identity_client.engine.process(
                    instance_id=instance_id,
                    mode_target=Mode.ASSERTION,
                    signal_strength=0.85,
                    confidence=0.9,
                    context={"success": True}
                )
                return output
            except Exception as e:
                # Send failure observation
                identity_client.engine.process(
                    instance_id=instance_id,
                    mode_target=Mode.STRESS_RESPONSE,
                    signal_strength=0.8,
                    confidence=0.9,
                    context={"error": str(e)}
                )
                if contract.stress_state == "OVER":
                    # Too stressed, escalate
                    raise
                # Otherwise degrade gracefully: hand back an error summary
                # as the task output instead of propagating the exception.
                return f"Error encountered: {str(e)}. Recommending review."

    return _ConstrainedAgent(
        name=name,
        role=role,
        goal=goal,
        allow_delegation=False,
        verbose=True
    )
# Create agents with behavioral control, one per Identity OS instance
researcher, writer, analyst = (
    get_constrained_agent(
        name=display_name,
        role=agent_role,
        goal=agent_goal,
        instance_id=agent_instances[key],
    )
    for key, display_name, agent_role, agent_goal in (
        (
            "researcher",
            "Researcher",
            "Senior Research Analyst",
            "Find and analyze relevant information",
        ),
        (
            "writer",
            "Writer",
            "Content Writer",
            "Write clear, engaging content",
        ),
        (
            "analyst",
            "Analyst",
            "Quality Analyst",
            "Verify accuracy and completeness",
        ),
    )
)
3. Define Tasks¶
from crewai import Task
# Task briefs, kept apart from the Task wiring below
research_brief = """
Research the topic of AI agent behavioral drift.
Find 3-5 reliable sources.
Summarize key findings.
"""
writing_brief = """
Based on the research, write a 500-word article
about AI agent behavioral drift.
Make it engaging and accessible.
"""
analysis_brief = """
Review the article for:
- Accuracy of claims
- Completeness of coverage
- Clarity of writing
Provide feedback or approve.
"""

# One task per agent, in the order the crew will run them
research_task = Task(
    agent=researcher,
    description=research_brief,
    expected_output="Research summary with sources",
)
writing_task = Task(
    agent=writer,
    description=writing_brief,
    expected_output="Complete article",
)
analysis_task = Task(
    agent=analyst,
    description=analysis_brief,
    expected_output="Quality assessment",
)
4. Create and Run Crew¶
# Sequential process: one task at a time, in list order
crew = Crew(
    process=Process.sequential,
    agents=[researcher, writer, analyst],
    tasks=[research_task, writing_task, analysis_task],
    verbose=True,
)

# Run with monitoring
result = crew.kickoff()

# Report each agent's Identity OS snapshot once the crew has finished
for agent_name, instance_id in agent_instances.items():
    snapshot = identity_client.engine.get_snapshot(instance_id)
    report_lines = (
        f"\n{agent_name}:",
        f" Stress: {snapshot.stress_state}",
        f" Energy: {snapshot.energy_level}",
        f" Cycles: {snapshot.cycle_count}",
    )
    print("\n".join(report_lines))

print(f"\nFinal output:\n{result}")
Complete Multi-Agent Example¶
from crewai import Agent, Task, Crew, Process
from identity_os_sdk import IdentityOS, Mode
# Initialize the Identity OS client
identity_client = IdentityOS(api_key="idos_sk_xxx")

# Create one instance per crew role and keep only its id
instances = {
    key: identity_client.instances.create(
        name=label,
        description=summary,
    ).id
    for key, label, summary in (
        ("planner", "Planner", "Plans strategy"),
        ("executor", "Executor", "Executes plans"),
        ("reviewer", "Reviewer", "Reviews outcomes"),
    )
}
# Create agents
def create_constrained_agent(name, role, instance_id):
    """Build an agent whose task handler consults Identity OS first.

    Args:
        name: Agent display name; also used in the goal text and log line.
        role: Role passed through to ``Agent``.
        instance_id: Identity OS instance that tracks this agent.

    Returns:
        An ``Agent`` with ``_execute_task`` replaced by a handler that
        reports an observation, prints the resulting stress and energy, and
        returns a simulated result string.
    """

    def execute_task(task):
        """Report the task to Identity OS and return a simulated result."""
        # Infer mode. "execute" is tested before "plan" so that a
        # description such as "Execute the launch plan" is treated as
        # execution rather than planning.
        description = task.description.lower()
        if "execute" in description:
            mode = Mode.ASSERTION
        elif "plan" in description:
            mode = Mode.PERCEPTION
        else:
            mode = Mode.ORDER

        # Process. The response carries the contract used below, so no
        # separate get_contract() round-trip is made beforehand.
        result = identity_client.engine.process(
            instance_id=instance_id,
            mode_target=mode,
            signal_strength=0.8,
            context={"task": task.description[:50]}
        )
        contract = result.contract
        print(f"\n[{name}] Stress: {contract.stress_state}, Energy: {contract.energy_level}")

        # Guard actions
        if contract.stress_state == "OVER":
            return "Cannot complete under current stress, escalating."

        # Simulate task execution
        return f"{name} completed: {task.description[:30]}..."

    agent = Agent(
        name=name,
        role=role,
        goal=f"Complete {name.lower()} tasks",
        allow_delegation=False,
        verbose=False
    )
    # Override execute on this agent only
    agent._execute_task = execute_task
    return agent
# One constrained agent per role, each bound to its own instance id
planner = create_constrained_agent(
    name="Planner",
    role="Strategic Planner",
    instance_id=instances["planner"],
)
executor = create_constrained_agent(
    name="Executor",
    role="Task Executor",
    instance_id=instances["executor"],
)
reviewer = create_constrained_agent(
    name="Reviewer",
    role="Quality Reviewer",
    instance_id=instances["reviewer"],
)
# Define workflow: plan -> execute -> review
plan_task = Task(
    agent=planner,
    description="Create a plan for launching a new feature",
    expected_output="Detailed launch plan",
)
exec_task = Task(
    agent=executor,
    description="Execute the launch plan",
    expected_output="Execution report",
)
review_task = Task(
    agent=reviewer,
    description="Review the launch outcome",
    expected_output="Quality assessment",
)
# Assemble the crew; tasks run one after another in list order
crew = Crew(
    process=Process.sequential,
    agents=[planner, executor, reviewer],
    tasks=[plan_task, exec_task, review_task],
)

print("=== Starting Multi-Agent Crew ===")
result = crew.kickoff()

# Final health check: one summary line per agent
print("\n=== Final Agent Health ===")
for agent_name, instance_id in instances.items():
    snap = identity_client.engine.get_snapshot(instance_id)
    print(f"{agent_name}: {snap.stress_state} stress, {snap.energy_level:.2f} energy")
Modeling Task Types as Modes¶
Map CrewAI task descriptions to behavioral modes:
| Task Type | Mode | Example |
|---|---|---|
| Information gathering | PERCEPTION | "Research and summarize..." |
| Brainstorming/ideation | EXPLORATION | "Come up with creative solutions..." |
| Following procedures | ORDER | "Execute the standard process..." |
| Executing plans | ASSERTION | "Implement the approved plan..." |
| Collaboration | CONNECTION | "Work with team to..." |
| Quality/consistency | IDENTITY | "Ensure alignment with values..." |
| Troubleshooting | STRESS_RESPONSE | "Handle unexpected issues..." |
Handling Agent Conflict¶
When multiple agents have conflicting outputs:
def resolve_agent_conflict(agent_name_1, agent_name_2, instance_id_1, instance_id_2):
    """Pick which of two agents to follow by comparing their contracts.

    The less stressed agent wins ("OVER" ranks above "HIGH", which ranks
    above every other state). When both agents are equally stressed, the
    one with more "Assertion" entries in its dominant modes wins; a tie
    goes to the second agent.

    Args:
        agent_name_1: Name returned if the first agent wins.
        agent_name_2: Name returned if the second agent wins.
        instance_id_1: Identity OS instance of the first agent.
        instance_id_2: Identity OS instance of the second agent.

    Returns:
        ``agent_name_1`` or ``agent_name_2``.
    """
    # Get both contracts
    contract1 = identity_client.engine.get_contract(instance_id_1)
    contract2 = identity_client.engine.get_contract(instance_id_2)

    # Rank stress so the comparison is symmetric. Checking agent 1 first and
    # returning agent 2 unconditionally would defer to agent 2 even when it
    # is just as stressed, or more stressed, than agent 1.
    severity = {"HIGH": 1, "OVER": 2}
    stress1 = severity.get(contract1.stress_state, 0)
    stress2 = severity.get(contract2.stress_state, 0)
    if stress1 != stress2:
        return agent_name_1 if stress1 < stress2 else agent_name_2

    # Same stress level: if one has higher assertion (confidence), use that
    assertion1 = contract1.dominant_modes.count("Assertion")
    assertion2 = contract2.dominant_modes.count("Assertion")
    if assertion1 > assertion2:
        return agent_name_1
    return agent_name_2
# Use in crew
def conflict_resolution_task(agent1_result, agent2_result):
    """Announce which agent's approach wins and return its result label.

    The two result arguments are accepted but not inspected; the decision
    is delegated to ``resolve_agent_conflict`` using the instance ids stored
    under ``"agent1"`` and ``"agent2"`` in ``instances``.
    """
    first_id = instances["agent1"]
    second_id = instances["agent2"]
    winner = resolve_agent_conflict("Agent1", "Agent2", first_id, second_id)
    print(f"Using {winner}'s approach")
    return f"{winner}_result"
Monitoring Crew Health¶
def check_crew_health(instances):
    """Collect health alerts for every agent in ``instances``.

    ``instances`` maps agent names to Identity OS instance ids. For each
    one the current snapshot is fetched and checked for stress, low energy
    and degrading stability. Returns the alert messages as a list, which is
    empty when nothing was flagged.
    """
    alerts = []
    for agent_name, instance_id in instances.items():
        snap = identity_client.engine.get_snapshot(instance_id)
        # (triggered, message) pairs, in the order alerts are reported
        checks = (
            (
                snap.stress_state == "OVER",
                f"CRITICAL: {agent_name} is in OVER stress",
            ),
            (
                snap.stress_state == "HIGH",
                f"WARNING: {agent_name} in HIGH stress",
            ),
            (
                snap.energy_level < 0.2,
                f"WARNING: {agent_name} energy critical",
            ),
            (
                snap.cycle_count > 100 and snap.stability_index < 0.6,
                f"WARNING: {agent_name} stability degrading",
            ),
        )
        alerts.extend(message for triggered, message in checks if triggered)
    return alerts
# Check during execution; an empty alert list prints nothing
alerts = check_crew_health(instances)
for alert in alerts:
    print(f"🚨 {alert}")
Agent Specialization¶
Different agents should have different initial behavioral profiles:
# Use built-in persona presets for different agent roles
def _create_with_persona(name, persona_id):
    """Create an instance called ``name`` and apply ``persona_id`` to it."""
    instance = identity_client.instances.create(name=name)
    identity_client.personas.apply(instance.id, persona_id=persona_id)
    return instance


researcher_instance = _create_with_persona("Researcher", "persona_analytical")
writer_instance = _create_with_persona("Writer", "persona_creative")
analyst_instance = _create_with_persona("Analyst", "persona_analytical")
Best Practices¶
- Create one instance per agent — Each agent has its own behavioral profile
- Initialize with specialization — Start agents with mode profiles matching their role
- Monitor crew health — Regularly check for stress or energy issues
- Implement conflict resolution — Use contracts to resolve agent disagreements
- Scale gracefully — Reduce complexity when agents are stressed
Troubleshooting¶
One agent is always stressed
Check:
- Is its initial mode profile appropriate for its role?
- Is it taking on too many tasks?
- Are there cascading failures from other agents?
Solution: Adjust initial config or reduce task complexity.
Agents aren't collaborating well
Solution: Use connection-focused modes in task descriptions:
Rapid calls are being throttled (no state change)
If agents are making quick consecutive calls and seeing no state progression, the minimum cycle interval throttle may be dropping calls. Pass an explicit timestamp to bypass the wall-clock throttle:
Crew is too slow
Check:
- Are agents waiting for each other?
- Is each agent making multiple observations?
Solution: Use process_batch for multiple observations:
Next Steps¶
- LangGraph Integration — Single-agent workflows
- Concepts — Understand modes and stress
- API Reference — Full endpoint documentation