Skip to content

Python SDK Reference

Official Python SDK for Identity OS with full type hints and async support.

Installation

pip install identity-os-sdk

Quick Start

from identity_os_sdk import IdentityOS, Mode

# Initialize client
client = IdentityOS(api_key="idos_sk_xxx")

# Create instance
instance = client.instances.create(name="Aria")

# Process observation
result = client.engine.process(
    instance_id=instance.id,
    mode_target=Mode.EXPLORATION,
    signal_strength=0.8,
    confidence=0.9
)

# Read contract
contract = result.contract
print(contract.allowed_actions)

Client Initialization

Standard (Synchronous)

from identity_os_sdk import IdentityOS

client = IdentityOS(api_key="idos_sk_xxx")

Async

import asyncio

from identity_os_sdk import IdentityOSAsync, Mode


async def main():
    """Create an instance and process one observation with the async client.

    Returns:
        The result object returned by ``client.engine.process``.
    """
    client = IdentityOSAsync(api_key="idos_sk_xxx")
    instance = await client.instances.create(name="Aria")
    # `Mode` has to be imported alongside the client (see the import above);
    # without it this call fails with NameError.
    result = await client.engine.process(
        instance_id=instance.id,
        mode_target=Mode.EXPLORATION,
        signal_strength=0.8
    )
    return result


asyncio.run(main())

Custom Base URL (Local Docker)

client = IdentityOS(
    api_key="",  # Not needed for local
    base_url="http://localhost:8000"
)

With Timeouts

client = IdentityOS(
    api_key="idos_sk_xxx",
    timeout=30.0  # seconds
)

Instances

Create

instance = client.instances.create(
    name="Aria",
    description="Research assistant",
    initial_config={
        "mode_strengths": {
            "perception": 0.50,
            "exploration": 0.50,
            "order": 0.50,
            "assertion": 0.50,
            "connection": 0.50,
            "identity": 0.50,
            "stress_response": 0.50
        },
        "energy_level": 0.50
    }
)

print(instance.id)       # inst_9f190826e20c49ea
print(instance.name)     # Aria
print(instance.created_at)

List

# Sync
instances = client.instances.list(limit=50, offset=0)
print(f"Total: {instances.pagination.total}")

# Async (inside a coroutine; async_client is an IdentityOSAsync instance)
instances = await async_client.instances.list(limit=50)

# Iterate through pages
offset = 0
while True:
    batch = client.instances.list(limit=100, offset=offset)
    for inst in batch.data:
        print(inst.name)
    if not batch.pagination.has_more:
        break
    offset += 100

Get

instance = client.instances.get("inst_9f190826e20c49ea")
print(f"Cycles: {instance.cycle_count}")
print(f"Energy: {instance.energy_level}")

Update

updated = client.instances.update(
    "inst_9f190826e20c49ea",
    name="Aria v2",
    description="Senior research assistant"
)

Delete

client.instances.delete("inst_9f190826e20c49ea")

Engine

Process

from identity_os_sdk import Mode

result = client.engine.process(
    instance_id="inst_xxx",
    mode_target=Mode.EXPLORATION,
    signal_strength=0.8,
    confidence=0.9,
    context={"task": "research"}
)

# Access results
contract = result.contract
snapshot = result.snapshot
drift = result.drift

print(f"Allowed: {contract.allowed_actions}")
print(f"Stress: {contract.stress_state}")
print(f"Energy: {contract.energy_level}")
print(f"Dominant modes: {contract.dominant_modes}")

Batch Process

result = client.engine.process_batch(
    instance_id="inst_xxx",
    observations=[
        {"mode_target": Mode.EXPLORATION, "signal_strength": 0.8},
        {"mode_target": Mode.ORDER, "signal_strength": 0.7},
        {"mode_target": Mode.ASSERTION, "signal_strength": 0.6}
    ],
    include_intermediate=False
)

print(f"Processed {result.processed_count} observations")
print(f"Final contract: {result.contract}")

Get Snapshot

snapshot = client.engine.get_snapshot("inst_xxx")

print(f"Cycle: {snapshot.cycle}")
print(f"Modes: {snapshot.mode_strengths}")
print(f"Stress: {snapshot.stress_state}")
print(f"Energy: {snapshot.energy_level}")
print(f"Stability: {snapshot.stability_index}")

Get Contract

contract = client.engine.get_contract("inst_xxx")

if "execute" in contract.allowed_actions:
    print("Can execute")

print(f"Decision tempo: {contract.decision_style['tempo']}")
print(f"Risk tolerance: {contract.decision_style['risk']}")

Get History

history = client.engine.get_history("inst_xxx", limit=100)

for obs in history.data:
    print(f"Cycle {obs.cycle}: {obs.mode_target}")

Get Drift

drift_events = client.engine.get_drift("inst_xxx")

for event in drift_events.data:
    print(f"Cycle {event.cycle}: {event.drift_level}")
    if event.rolled_back:
        print("  → Auto-rolled back")

Reset

# Reset to initial state
client.engine.reset("inst_xxx", mode="initial")

# Reset to a clean default state
client.engine.reset("inst_xxx", mode="default")

Personas

List

personas = client.personas.list()

for persona in personas.data:
    print(f"{persona.name}: {persona.archetype}")

Get

persona = client.personas.get("persona_analytical")

print(f"Name: {persona.name}")
print(f"Values: {persona.core_values}")
print(f"Mode profile: {persona.mode_profile}")
print(f"Preferred actions: {persona.preferred_actions}")

Testing

Get Questions

questions = client.test.get_questions(version="v1")

for q in questions.data.questions:
    print(f"{q.id}: {q.text}")
    for opt in q.options:
        print(f"  - {opt.text}")

Submit Test

result = client.test.submit(
    instance_id="inst_xxx",
    responses=[
        {"question_id": "q1", "selected_option_id": "q1_b"},
        {"question_id": "q2", "selected_option_id": "q2_d"},
    ]
)

print(f"Matched personas: {result.matched_personas}")
print(f"Mode profile: {result.mode_profile}")
print(f"Contract: {result.contract}")

Or create a new instance:

result = client.test.submit(
    create_instance=True,
    instance_name="NewAgent",
    responses=[...]
)

print(f"Created: {result.instance_id}")

Data Types

Enums

from identity_os_sdk import Mode, StressState

# Modes
Mode.PERCEPTION
Mode.EXPLORATION
Mode.ORDER
Mode.ASSERTION
Mode.CONNECTION
Mode.IDENTITY
Mode.STRESS_RESPONSE

# Stress states
StressState.LOW
StressState.MED
StressState.HIGH
StressState.OVER

Key Models

from identity_os_sdk import (
    Instance,
    IdentitySnapshot,
    ExecutionContract,
    ProcessResult,
    Persona,
    TestResult
)

# All models are Pydantic v2 BaseModel subclasses,
# so type hints are available throughout
# and IDEs can offer full autocompletion and type checking.

Error Handling

from identity_os_sdk import IdentityOSError, AuthenticationError, NotFoundError

try:
    instance = client.instances.get("invalid_id")
except NotFoundError:
    print("Instance not found")
except AuthenticationError:
    print("Invalid API key")
except IdentityOSError as e:
    print(f"API error: {e.message}")

Async API

All endpoints support async:

import asyncio

from identity_os_sdk import IdentityOSAsync, Mode


async def main():
    """Run create, process and batch-process calls on the async client.

    Returns:
        The result of the final ``process_batch`` call.
    """
    client = IdentityOSAsync(api_key="idos_sk_xxx")

    # Create
    instance = await client.instances.create(name="Aria")

    # Process (non-blocking); `Mode` comes from the import at the top
    result = await client.engine.process(
        instance_id=instance.id,
        mode_target=Mode.EXPLORATION,
        signal_strength=0.8
    )

    # Batch process (replaces the single-observation result above)
    result = await client.engine.process_batch(
        instance_id=instance.id,
        observations=[...]
    )

    return result


result = asyncio.run(main())

Bulk Operations

# Create ten instances in one pass
instances = [
    client.instances.create(name=f"Agent-{i}", description="Bulk created")
    for i in range(10)
]

# Send several observations to the first instance in a single batch call
observations = [
    {"mode_target": Mode.EXPLORATION, "signal_strength": 0.8},
    {"mode_target": Mode.ORDER, "signal_strength": 0.7},
    {"mode_target": Mode.ASSERTION, "signal_strength": 0.6},
]
result = client.engine.process_batch(
    instance_id=instances[0].id,
    observations=observations
)

# Collect every instance by walking the paginated listing
PAGE_SIZE = 100
all_instances = []
offset = 0
while True:
    page = client.instances.list(limit=PAGE_SIZE, offset=offset)
    all_instances += page.data
    if page.pagination.has_more:
        # More pages remain: advance by exactly one page and fetch again
        offset += PAGE_SIZE
        continue
    break

Logging

import logging

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("identity_os_sdk")
logger.setLevel(logging.DEBUG)

# SDK will log API calls, responses, etc.
client = IdentityOS(api_key="idos_sk_xxx")

Environment Variables

# Alternatively, configure the client through environment variables (run in your shell)
export IDENTITY_OS_API_KEY="idos_sk_xxx"
export IDENTITY_OS_BASE_URL="https://api.identity-os.dev/v1"

Then:

from identity_os_sdk import IdentityOS

# Reads from env vars automatically
client = IdentityOS()

Pydantic Models

Full type hints with Pydantic v2:

from identity_os_sdk import ExecutionContract

contract: ExecutionContract = result.contract

# Type hints work
if contract.stress_state == "LOW":
    actions = contract.allowed_actions  # List[str]
    tempo = contract.decision_style["tempo"]  # str
    energy = contract.energy_level  # float

Common Patterns

Pattern: Monitor Agent Health

def monitor_agent(instance_id: str) -> dict:
    """Return a point-in-time health summary for one instance.

    Fetches the current snapshot and contract through the module-level
    ``client`` and flattens the fields of interest into a plain dict.

    Args:
        instance_id: ID of the instance to inspect, e.g. ``"inst_xxx"``.

    Returns:
        A dict with the keys ``stress``, ``energy``, ``stability``,
        ``cycles``, ``allowed_actions`` and ``decision_style``.
    """
    snapshot = client.engine.get_snapshot(instance_id)
    contract = client.engine.get_contract(instance_id)

    return {
        "stress": snapshot.stress_state,
        "energy": snapshot.energy_level,
        "stability": snapshot.stability_index,
        # "Get Snapshot" documents the counter as `snapshot.cycle`;
        # `cycle_count` is only shown on Instance objects.
        "cycles": snapshot.cycle,
        "allowed_actions": contract.allowed_actions,
        "decision_style": contract.decision_style
    }

health = monitor_agent("inst_xxx")
print(health)

Pattern: Adaptive Decision Making

from typing import List


def decide_with_constraints(
    options: List[str],
    instance_id: str
) -> str:
    """Choose one of ``options`` subject to the instance's current contract.

    Args:
        options: Candidate action names, in caller-preferred order.
        instance_id: ID of the instance whose contract constrains the choice.

    Returns:
        ``"defer"`` when none of the options is currently allowed; the first
        allowed option when the contract's decision tempo is ``"decisive"``;
        otherwise whatever ``best_option`` picks among the allowed options.
    """
    contract = client.engine.get_contract(instance_id)

    # Filter by allowed actions, preserving the caller's ordering
    available = [o for o in options if o in contract.allowed_actions]

    if not available:
        return "defer"  # Safe fallback

    # Apply decision style
    if contract.decision_style["tempo"] == "decisive":
        return available[0]  # First option

    # `best_option` is a placeholder for your own ranking function;
    # it is not defined in this example.
    return best_option(available)  # Best option

choice = decide_with_constraints(["explore", "execute", "defer"], "inst_xxx")

Rate Limiting

import time

# NOTE(review): RateLimitError is assumed to be exported from identity_os_sdk
# like IdentityOSError / AuthenticationError / NotFoundError — confirm.
from identity_os_sdk import RateLimitError

try:
    result = client.engine.process(...)
except RateLimitError as e:
    # The exception carries the wait time, in seconds, before retrying
    wait_seconds = e.retry_after
    print(f"Rate limited, waiting {wait_seconds}s")
    time.sleep(wait_seconds)
    result = client.engine.process(...)  # Retry once; a second failure propagates

Reference