CrewAI Integration

Integrate Identity OS with CrewAI for behavioral control over multi-agent orchestration.

Overview

CrewAI is designed for multi-agent collaboration where specialized agents work together on tasks. Identity OS integrates at the agent level — each agent has its own instance and behavioral profile.

CrewAI Task Assignment
  → Agent Processes Task
  → Infer Behavioral Mode from Task
  → Identity OS Process
  → Get ExecutionContract
  → Adapt Agent Behavior

Installation

pip install crewai identity-os-sdk

Basic Setup

1. Initialize Clients

from crewai import Agent, Task, Crew, Process
from identity_os_sdk import IdentityOS, Mode

# Identity OS client
identity_client = IdentityOS(api_key="idos_sk_xxx")

# Create instances for each agent
researcher_instance = identity_client.instances.create(
    name="Researcher",
    description="Gathers and analyzes information"
)

writer_instance = identity_client.instances.create(
    name="Writer",
    description="Synthesizes findings into content"
)

analyst_instance = identity_client.instances.create(
    name="Analyst",
    description="Reviews and validates content"
)

# Store mapping
agent_instances = {
    "researcher": researcher_instance.id,
    "writer": writer_instance.id,
    "analyst": analyst_instance.id
}

2. Wrap Agent Execution

def get_constrained_agent(
    name: str,
    role: str,
    goal: str,
    instance_id: str
) -> Agent:
    """Create an agent whose task execution is wrapped by Identity OS checks."""

    # Subclass per agent instead of patching Agent globally: a class-level
    # patch would stack every wrapper onto every agent in the crew.
    class ConstrainedAgent(Agent):
        # Assumes CrewAI's per-task entry point is execute_task(task, context, tools);
        # check the signature in your installed CrewAI version.
        def execute_task(self, task, context=None, tools=None):
            """Run the task under the agent's current behavioral contract."""
            # Map task to behavioral mode. The first match wins, so "write" is
            # checked first: writing tasks often mention the research they use.
            description = task.description.lower()
            if "write" in description:
                mode = Mode.ASSERTION
            elif "research" in description:
                mode = Mode.EXPLORATION
            elif "analyze" in description:
                mode = Mode.PERCEPTION
            else:
                mode = Mode.ORDER

            # Send observation; the result carries the updated contract
            result = identity_client.engine.process(
                instance_id=instance_id,
                mode_target=mode,
                signal_strength=0.8,
                context={"task": task.description}
            )
            contract = result.contract

            # Under stress, simplify task
            if contract.stress_state in ["HIGH", "OVER"]:
                task.description = f"(Simplified) {task.description}"

            # Low energy, reduce scope
            if contract.energy_level < 0.3:
                task.description = f"(Quick version) {task.description}"

            # Execute with constraints
            try:
                output = super().execute_task(task, context=context, tools=tools)

                # Send success observation
                identity_client.engine.process(
                    instance_id=instance_id,
                    mode_target=Mode.ASSERTION,
                    signal_strength=0.85,
                    confidence=0.9,
                    context={"success": True}
                )

                return output

            except Exception as e:
                # Send failure observation and read the contract it produces
                failure = identity_client.engine.process(
                    instance_id=instance_id,
                    mode_target=Mode.STRESS_RESPONSE,
                    signal_strength=0.8,
                    confidence=0.9,
                    context={"error": str(e)}
                )

                if failure.contract.stress_state == "OVER":
                    # Too stressed, escalate
                    raise

                # Otherwise report the error instead of failing the crew
                return f"Error encountered: {e}. Recommending review."

    return ConstrainedAgent(
        name=name,
        role=role,
        goal=goal,
        # CrewAI agents expect a backstory; this text is a placeholder
        backstory=f"{role} whose behavior is governed by Identity OS",
        allow_delegation=False,
        verbose=True
    )

# Create agents with behavioral control
researcher = get_constrained_agent(
    name="Researcher",
    role="Senior Research Analyst",
    goal="Find and analyze relevant information",
    instance_id=agent_instances["researcher"]
)

writer = get_constrained_agent(
    name="Writer",
    role="Content Writer",
    goal="Write clear, engaging content",
    instance_id=agent_instances["writer"]
)

analyst = get_constrained_agent(
    name="Analyst",
    role="Quality Analyst",
    goal="Verify accuracy and completeness",
    instance_id=agent_instances["analyst"]
)

3. Define Tasks

from crewai import Task

research_task = Task(
    description="""
    Research the topic of AI agent behavioral drift.
    Find 3-5 reliable sources.
    Summarize key findings.
    """,
    agent=researcher,
    expected_output="Research summary with sources"
)

writing_task = Task(
    description="""
    Based on the research, write a 500-word article
    about AI agent behavioral drift.
    Make it engaging and accessible.
    """,
    agent=writer,
    expected_output="Complete article"
)

analysis_task = Task(
    description="""
    Review the article for:
    - Accuracy of claims
    - Completeness of coverage
    - Clarity of writing
    Provide feedback or approve.
    """,
    agent=analyst,
    expected_output="Quality assessment"
)

4. Create and Run Crew

crew = Crew(
    agents=[researcher, writer, analyst],
    tasks=[research_task, writing_task, analysis_task],
    process=Process.sequential,  # One task at a time
    verbose=True
)

# Run with monitoring
result = crew.kickoff()

# Check agent health
for agent_name, instance_id in agent_instances.items():
    snapshot = identity_client.engine.get_snapshot(instance_id)
    print(f"\n{agent_name}:")
    print(f"  Stress: {snapshot.stress_state}")
    print(f"  Energy: {snapshot.energy_level}")
    print(f"  Cycles: {snapshot.cycle_count}")

print(f"\nFinal output:\n{result}")

Complete Multi-Agent Example

from crewai import Agent, Task, Crew, Process
from identity_os_sdk import IdentityOS, Mode

# Initialize
identity_client = IdentityOS(api_key="idos_sk_xxx")

# Create instances
instances = {
    "planner": identity_client.instances.create(
        name="Planner",
        description="Plans strategy"
    ).id,
    "executor": identity_client.instances.create(
        name="Executor",
        description="Executes plans"
    ).id,
    "reviewer": identity_client.instances.create(
        name="Reviewer",
        description="Reviews outcomes"
    ).id
}

# Create agents
def create_constrained_agent(name, role, instance_id):
    # Subclass per agent so each one keeps its own instance_id
    class ConstrainedAgent(Agent):
        # Assumes CrewAI calls execute_task(task, context, tools) for each task;
        # check the signature in your installed CrewAI version.
        def execute_task(self, task, context=None, tools=None):
            # Infer mode
            description = task.description.lower()
            if "plan" in description:
                mode = Mode.PERCEPTION
            elif "execute" in description:
                mode = Mode.ASSERTION
            else:
                mode = Mode.ORDER

            # Process; the result carries the updated contract
            result = identity_client.engine.process(
                instance_id=instance_id,
                mode_target=mode,
                signal_strength=0.8,
                context={"task": task.description[:50]}
            )

            contract = result.contract

            print(f"\n[{name}] Stress: {contract.stress_state}, Energy: {contract.energy_level}")

            # Guard actions
            if contract.stress_state == "OVER":
                return "Cannot complete under current stress, escalating."

            # Simulate task execution
            return f"{name} completed: {task.description[:30]}..."

    return ConstrainedAgent(
        name=name,
        role=role,
        goal=f"Complete {name.lower()} tasks",
        # CrewAI agents expect a backstory; this text is a placeholder
        backstory=f"{role} governed by Identity OS",
        allow_delegation=False,
        verbose=False
    )

planner = create_constrained_agent(
    "Planner",
    "Strategic Planner",
    instances["planner"]
)

executor = create_constrained_agent(
    "Executor",
    "Task Executor",
    instances["executor"]
)

reviewer = create_constrained_agent(
    "Reviewer",
    "Quality Reviewer",
    instances["reviewer"]
)

# Define workflow
plan_task = Task(
    description="Create a plan for launching a new feature",
    agent=planner,
    expected_output="Detailed launch plan"
)

exec_task = Task(
    description="Execute the launch plan",
    agent=executor,
    expected_output="Execution report"
)

review_task = Task(
    description="Review the launch outcome",
    agent=reviewer,
    expected_output="Quality assessment"
)

# Run crew
crew = Crew(
    agents=[planner, executor, reviewer],
    tasks=[plan_task, exec_task, review_task],
    process=Process.sequential
)

print("=== Starting Multi-Agent Crew ===")
result = crew.kickoff()

# Final health check
print("\n=== Final Agent Health ===")
for agent_name, instance_id in instances.items():
    snap = identity_client.engine.get_snapshot(instance_id)
    print(f"{agent_name}: {snap.stress_state} stress, {snap.energy_level:.2f} energy")

Modeling Task Types as Modes

Map CrewAI task descriptions to behavioral modes:

| Task Type | Mode | Example |
| --- | --- | --- |
| Information gathering | PERCEPTION | "Research and summarize..." |
| Brainstorming/ideation | EXPLORATION | "Come up with creative solutions..." |
| Following procedures | ORDER | "Execute the standard process..." |
| Executing plans | ASSERTION | "Implement the approved plan..." |
| Collaboration | CONNECTION | "Work with team to..." |
| Quality/consistency | IDENTITY | "Ensure alignment with values..." |
| Troubleshooting | STRESS_RESPONSE | "Handle unexpected issues..." |
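
One way to apply this table is an ordered list of keyword rules. The sketch below is illustrative and not part of the SDK: `MODE_RULES`, `infer_mode`, and the keyword lists are hypothetical, and mode names are plain strings so the snippet runs without `identity_os_sdk` installed. The first matching rule wins, so more specific rules should come first.

```python
# Hypothetical keyword rules derived from the table above; tune them for your tasks.
# Order matters: the first rule with a matching keyword decides the mode.
MODE_RULES = [
    ("STRESS_RESPONSE", ("unexpected", "troubleshoot")),
    ("CONNECTION", ("collaborate", "work with")),
    ("IDENTITY", ("alignment", "values", "consistency")),
    ("EXPLORATION", ("brainstorm", "come up with", "creative")),
    ("PERCEPTION", ("research", "summarize", "gather")),
    ("ASSERTION", ("implement", "approved plan")),
    ("ORDER", ("standard process", "procedure")),
]


def infer_mode(description: str, default: str = "ORDER") -> str:
    """Return the name of the first mode whose keywords appear in the description."""
    text = description.lower()
    for mode, keywords in MODE_RULES:
        if any(keyword in text for keyword in keywords):
            return mode
    return default
```

To pass the result to the SDK, the returned name would still need converting to a `Mode` member, for example with `getattr(Mode, infer_mode(task.description))`, assuming `Mode` exposes members under these names.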

Handling Agent Conflict

When multiple agents have conflicting outputs:

def resolve_agent_conflict(agent_name_1, agent_name_2, instance_id_1, instance_id_2):
    """Resolve conflicts by checking behavioral contracts"""

    # Get both contracts
    contract1 = identity_client.engine.get_contract(instance_id_1)
    contract2 = identity_client.engine.get_contract(instance_id_2)

    # If one is stressed, defer to the other
    if contract1.stress_state in ["HIGH", "OVER"]:
        return agent_name_2
    if contract2.stress_state in ["HIGH", "OVER"]:
        return agent_name_1

    # If one has higher assertion (confidence), use that
    if contract1.dominant_modes.count("Assertion") > contract2.dominant_modes.count("Assertion"):
        return agent_name_1
    else:
        return agent_name_2

# Use in crew
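def conflict_resolution_task(agent1_result, agent2_result):
    # "agent1" and "agent2" are placeholder keys; use the keys from your instances dict
    winner = resolve_agent_conflict(
        "Agent1", "Agent2",
        instances["agent1"],
        instances["agent2"]
    )
    print(f"Using {winner}'s approach")
    # Return the winning agent's actual result rather than a label
    return agent1_result if winner == "Agent1" else agent2_result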
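
The resolver above silently picks the second agent on a tie, including when both agents are stressed. A minimal sketch of a variant that makes those cases explicit follows. `ContractView` and `pick_winner` are hypothetical names, `ContractView` only mirrors the two contract fields used above, and returning `None` to mean "escalate to a human or a third agent" is an assumed policy.

```python
from dataclasses import dataclass, field
from typing import List, Optional

STRESSED_STATES = {"HIGH", "OVER"}


@dataclass
class ContractView:
    """Stand-in for the two contract fields the resolver reads (hypothetical type)."""
    stress_state: str
    dominant_modes: List[str] = field(default_factory=list)


def pick_winner(name_1: str, contract_1, name_2: str, contract_2) -> Optional[str]:
    """Return the agent to trust, or None when the contracts give no clear answer."""
    stressed_1 = contract_1.stress_state in STRESSED_STATES
    stressed_2 = contract_2.stress_state in STRESSED_STATES
    if stressed_1 and stressed_2:
        return None  # neither agent is in a good state to decide
    if stressed_1:
        return name_2
    if stressed_2:
        return name_1

    # Prefer the agent for which Assertion is a dominant mode, if only one has it
    assertive_1 = "Assertion" in contract_1.dominant_modes
    assertive_2 = "Assertion" in contract_2.dominant_modes
    if assertive_1 != assertive_2:
        return name_1 if assertive_1 else name_2
    return None  # tie: let the caller decide
```

Because the function only reads `stress_state` and `dominant_modes`, it should also accept the objects returned by `identity_client.engine.get_contract`, provided they expose those attributes as in the example above.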

Monitoring Crew Health

def check_crew_health(instances):
    """Monitor overall crew behavioral health"""
    alerts = []

    for agent_name, instance_id in instances.items():
        snap = identity_client.engine.get_snapshot(instance_id)

        if snap.stress_state == "OVER":
            alerts.append(f"CRITICAL: {agent_name} is in OVER stress")
        elif snap.stress_state == "HIGH":
            alerts.append(f"WARNING: {agent_name} in HIGH stress")

        if snap.energy_level < 0.2:
            alerts.append(f"WARNING: {agent_name} energy critical")

        if snap.cycle_count > 100 and snap.stability_index < 0.6:
            alerts.append(f"WARNING: {agent_name} stability degrading")

    return alerts

# Check during execution
alerts = check_crew_health(instances)
if alerts:
    for alert in alerts:
        print(f"🚨 {alert}")
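
Printing alerts is useful during development, but a running crew usually needs a decision. The sketch below groups the alert strings produced by `check_crew_health` by their severity prefix. `triage_alerts` is a hypothetical helper, and pausing on any CRITICAL alert is an assumed policy rather than something Identity OS prescribes.

```python
def triage_alerts(alerts):
    """Split alerts by severity prefix and flag whether the crew should pause."""
    critical = [a for a in alerts if a.startswith("CRITICAL")]
    warnings = [a for a in alerts if a.startswith("WARNING")]
    return {
        "pause_crew": bool(critical),  # assumed policy: any CRITICAL alert pauses the crew
        "critical": critical,
        "warnings": warnings,
    }
```

A caller could run `triage_alerts(check_crew_health(instances))` between tasks and stop scheduling new work while `pause_crew` is true.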

Agent Specialization

Different agents should have different initial behavioral profiles:

# Use built-in persona presets for different agent roles
researcher_instance = identity_client.instances.create(name="Researcher")
identity_client.personas.apply(researcher_instance.id, persona_id="persona_analytical")

writer_instance = identity_client.instances.create(name="Writer")
identity_client.personas.apply(writer_instance.id, persona_id="persona_creative")

analyst_instance = identity_client.instances.create(name="Analyst")
identity_client.personas.apply(analyst_instance.id, persona_id="persona_analytical")

Best Practices

  1. Create one instance per agent — Each agent has its own behavioral profile
  2. Initialize with specialization — Start agents with mode profiles matching their role
  3. Monitor crew health — Regularly check for stress or energy issues
  4. Implement conflict resolution — Use contracts to resolve agent disagreements
  5. Scale gracefully — Reduce complexity when agents are stressed
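
Practice 5 can be reduced to a small decision function. This sketch reuses the thresholds from the examples on this page (HIGH or OVER stress, energy below 0.3). `choose_scope` and its return labels are hypothetical, and escalating immediately on OVER follows the complete multi-agent example rather than the basic wrapper.

```python
def choose_scope(stress_state: str, energy_level: float) -> str:
    """Pick how much of a task an agent should attempt in its current state."""
    if stress_state == "OVER":
        return "escalate"    # hand the task to a human or another agent
    if stress_state == "HIGH":
        return "simplified"  # keep the goal, drop optional steps
    if energy_level < 0.3:
        return "quick"       # shorter version of the task
    return "full"
```

The returned label can then drive how the task description is rewritten before execution, so the scaling policy lives in one place instead of being scattered through the wrapper.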

Troubleshooting

One agent is always stressed

Check:

- Is its initial mode profile appropriate for its role?
- Is it taking on too many tasks?
- Are there cascading failures from other agents?

Solution: Adjust initial config or reduce task complexity.

Agents aren't collaborating well

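Solution: Use connection-focused wording in task descriptions:

task = Task(
    description="Collaborate with the team to...",
    agent=agent,
    expected_output="..."  # CrewAI tasks expect an expected_output
)
# This nudges the mode toward CONNECTION, provided your mode inference
# recognizes collaboration wording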

Rapid calls are being throttled (no state change)

If agents make rapid consecutive calls and state does not progress, the minimum cycle interval throttle may be dropping calls. Pass an explicit timestamp to bypass the wall-clock throttle (here `step_index` stands for a counter you increment on each call):

from datetime import datetime, timedelta

base = datetime.utcnow()
result = identity_client.engine.process(
    instance_id=instance_id,
    mode_target=mode,
    signal_strength=0.8,
    timestamp=base + timedelta(seconds=step_index * 2),
)

The built-in CrewAI adapter handles this automatically when using remote mode.
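
If you manage timestamps yourself, a generator keeps the spacing consistent across calls. This is a minimal sketch: `spaced_timestamps` is a hypothetical helper, the 2-second step mirrors the snippet above, and whether the SDK expects naive or timezone-aware datetimes is an assumption you should verify.

```python
from datetime import datetime, timedelta, timezone
from itertools import count


def spaced_timestamps(start=None, step_seconds=2.0):
    """Yield timestamps spaced step_seconds apart, starting at start (default: now, UTC)."""
    base = start if start is not None else datetime.now(timezone.utc)
    for step_index in count():
        yield base + timedelta(seconds=step_index * step_seconds)
```

Create one generator per instance, for example `clock = spaced_timestamps()`, and pass `timestamp=next(clock)` on each `process` call.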

Crew is too slow

Check:

- Are agents waiting for each other?
- Is each agent making multiple observations?

Solution: Use process_batch for multiple observations:

result = identity_client.engine.process_batch(
    instance_id,
    observations=[obs1, obs2, obs3]
)
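
To collect observations before sending them, a small buffer can sit in front of `process_batch`. The sketch below is an assumption-laden illustration: `ObservationBuffer` is a hypothetical class, the sender is injected as a callable so the buffer can be tested without the SDK, and the shape of each observation is whatever `process_batch` accepts, which this page does not specify.

```python
class ObservationBuffer:
    """Collect observations and hand them to a sender in batches."""

    def __init__(self, send_batch, max_size=10):
        self._send_batch = send_batch  # callable that receives a list of observations
        self._max_size = max_size
        self._pending = []

    def add(self, observation):
        """Queue an observation; flush automatically once max_size is reached."""
        self._pending.append(observation)
        if len(self._pending) >= self._max_size:
            self.flush()

    def flush(self):
        """Send any queued observations and return the sender's result (None if empty)."""
        if not self._pending:
            return None
        batch, self._pending = self._pending, []
        return self._send_batch(batch)
```

With the SDK, the sender could be `lambda obs: identity_client.engine.process_batch(instance_id, observations=obs)`. Call `flush()` at the end of each task so trailing observations are not lost.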

Next Steps