Python SDK Reference¶
Official Python SDK for Identity OS with full type hints and async support.
Installation¶
Quick Start¶
from identity_os_sdk import IdentityOS, Mode
# Initialize client
client = IdentityOS(api_key="idos_sk_xxx")
# Create instance
instance = client.instances.create(name="Aria")
# Process observation
result = client.engine.process(
instance_id=instance.id,
mode_target=Mode.EXPLORATION,
signal_strength=0.8,
confidence=0.9
)
# Read contract
contract = result.contract
print(contract.allowed_actions)
Client Initialization¶
Standard (Synchronous)¶
Async¶
from identity_os_sdk import IdentityOSAsync
async def main():
client = IdentityOSAsync(api_key="idos_sk_xxx")
instance = await client.instances.create(name="Aria")
result = await client.engine.process(
instance_id=instance.id,
mode_target=Mode.EXPLORATION,
signal_strength=0.8
)
import asyncio
asyncio.run(main())
Custom Base URL (Local Docker)¶
With Timeouts¶
Instances¶
Create¶
instance = client.instances.create(
name="Aria",
description="Research assistant",
initial_config={
"mode_strengths": {
"perception": 0.50,
"exploration": 0.50,
"order": 0.50,
"assertion": 0.50,
"connection": 0.50,
"identity": 0.50,
"stress_response": 0.50
},
"energy_level": 0.50
}
)
print(instance.id) # inst_9f190826e20c49ea
print(instance.name) # Aria
print(instance.created_at)
List¶
# Sync
instances = client.instances.list(limit=50, offset=0)
print(f"Total: {instances.pagination.total}")
# Async
instances = await async_client.instances.list(limit=50)
# Iterate through pages
offset = 0
while True:
batch = client.instances.list(limit=100, offset=offset)
for inst in batch.data:
print(inst.name)
if not batch.pagination.has_more:
break
offset += 100
Get¶
instance = client.instances.get("inst_9f190826e20c49ea")
print(f"Cycles: {instance.cycle_count}")
print(f"Energy: {instance.energy_level}")
Update¶
updated = client.instances.update(
"inst_9f190826e20c49ea",
name="Aria v2",
description="Senior research assistant"
)
Delete¶
Engine¶
Process¶
from identity_os_sdk import Mode
result = client.engine.process(
instance_id="inst_xxx",
mode_target=Mode.EXPLORATION,
signal_strength=0.8,
confidence=0.9,
context={"task": "research"}
)
# Access results
contract = result.contract
snapshot = result.snapshot
drift = result.drift
print(f"Allowed: {contract.allowed_actions}")
print(f"Stress: {contract.stress_state}")
print(f"Energy: {contract.energy_level}")
print(f"Dominant modes: {contract.dominant_modes}")
Batch Process¶
result = client.engine.process_batch(
instance_id="inst_xxx",
observations=[
{"mode_target": Mode.EXPLORATION, "signal_strength": 0.8},
{"mode_target": Mode.ORDER, "signal_strength": 0.7},
{"mode_target": Mode.ASSERTION, "signal_strength": 0.6}
],
include_intermediate=False
)
print(f"Processed {result.processed_count} observations")
print(f"Final contract: {result.contract}")
Get Snapshot¶
snapshot = client.engine.get_snapshot("inst_xxx")
print(f"Cycle: {snapshot.cycle}")
print(f"Modes: {snapshot.mode_strengths}")
print(f"Stress: {snapshot.stress_state}")
print(f"Energy: {snapshot.energy_level}")
print(f"Stability: {snapshot.stability_index}")
Get Contract¶
contract = client.engine.get_contract("inst_xxx")
if "execute" in contract.allowed_actions:
print("Can execute")
print(f"Decision tempo: {contract.decision_style['tempo']}")
print(f"Risk tolerance: {contract.decision_style['risk']}")
Get History¶
history = client.engine.get_history("inst_xxx", limit=100)
for obs in history.data:
print(f"Cycle {obs.cycle}: {obs.mode_target}")
Get Drift¶
drift_events = client.engine.get_drift("inst_xxx")
for event in drift_events.data:
print(f"Cycle {event.cycle}: {event.drift_level}")
if event.rolled_back:
print(" → Auto-rolled back")
Reset¶
# Reset to initial state
client.engine.reset("inst_xxx", mode="initial")
# Reset to a clean default state
client.engine.reset("inst_xxx", mode="default")
Personas¶
List¶
personas = client.personas.list()
for persona in personas.data:
print(f"{persona.name}: {persona.archetype}")
Get¶
persona = client.personas.get("persona_analytical")
print(f"Name: {persona.name}")
print(f"Values: {persona.core_values}")
print(f"Mode profile: {persona.mode_profile}")
print(f"Preferred actions: {persona.preferred_actions}")
Testing¶
Get Questions¶
questions = client.test.get_questions(version="v1")
for q in questions.data.questions:
print(f"{q.id}: {q.text}")
for opt in q.options:
print(f" - {opt.text}")
Submit Test¶
result = client.test.submit(
instance_id="inst_xxx",
responses=[
{"question_id": "q1", "selected_option_id": "q1_b"},
{"question_id": "q2", "selected_option_id": "q2_d"},
]
)
print(f"Matched personas: {result.matched_personas}")
print(f"Mode profile: {result.mode_profile}")
print(f"Contract: {result.contract}")
Or create new instance:
result = client.test.submit(
create_instance=True,
instance_name="NewAgent",
responses=[...]
)
print(f"Created: {result.instance_id}")
Data Types¶
Enums¶
from identity_os_sdk import Mode, StressState
# Modes
Mode.PERCEPTION
Mode.EXPLORATION
Mode.ORDER
Mode.ASSERTION
Mode.CONNECTION
Mode.IDENTITY
Mode.STRESS_RESPONSE
# Stress states
StressState.LOW
StressState.MED
StressState.HIGH
StressState.OVER
Key Models¶
from identity_os_sdk import (
Instance,
IdentitySnapshot,
ExecutionContract,
ProcessResult,
Persona,
TestResult
)
# All models are Pydantic v2 BaseModel
# Type hints available
# Full IDE support
Error Handling¶
from identity_os_sdk import IdentityOSError, AuthenticationError, NotFoundError
try:
instance = client.instances.get("invalid_id")
except NotFoundError:
print("Instance not found")
except AuthenticationError:
print("Invalid API key")
except IdentityOSError as e:
print(f"API error: {e.message}")
Async API¶
All endpoints support async:
from identity_os_sdk import IdentityOSAsync
import asyncio
async def main():
client = IdentityOSAsync(api_key="idos_sk_xxx")
# Create
instance = await client.instances.create(name="Aria")
# Process (non-blocking)
result = await client.engine.process(
instance_id=instance.id,
mode_target=Mode.EXPLORATION,
signal_strength=0.8
)
# Batch process
result = await client.engine.process_batch(
instance_id=instance.id,
observations=[...]
)
return result
result = asyncio.run(main())
Bulk Operations¶
# Create multiple instances
instances = []
for i in range(10):
instance = client.instances.create(
name=f"Agent-{i}",
description="Bulk created"
)
instances.append(instance)
# Process multiple observations in batch
result = client.engine.process_batch(
instance_id=instances[0].id,
observations=[
{"mode_target": Mode.EXPLORATION, "signal_strength": 0.8},
{"mode_target": Mode.ORDER, "signal_strength": 0.7},
{"mode_target": Mode.ASSERTION, "signal_strength": 0.6},
]
)
# List all with pagination
all_instances = []
offset = 0
while True:
batch = client.instances.list(limit=100, offset=offset)
all_instances.extend(batch.data)
if not batch.pagination.has_more:
break
offset += 100
Logging¶
import logging
# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("identity_os_sdk")
logger.setLevel(logging.DEBUG)
# SDK will log API calls, responses, etc.
client = IdentityOS(api_key="idos_sk_xxx")
Environment Variables¶
# Can also use environment variables
export IDENTITY_OS_API_KEY="idos_sk_xxx"
export IDENTITY_OS_BASE_URL="https://api.identity-os.dev/v1"
Then:
Pydantic Models¶
Full type hints with Pydantic v2:
from identity_os_sdk import ExecutionContract
contract: ExecutionContract = result.contract
# Type hints work
if contract.stress_state == "LOW":
actions = contract.allowed_actions # List[str]
tempo = contract.decision_style["tempo"] # str
energy = contract.energy_level # float
Common Patterns¶
Pattern: Monitor Agent Health¶
def monitor_agent(instance_id: str) -> dict:
"""Get current health metrics"""
snapshot = client.engine.get_snapshot(instance_id)
contract = client.engine.get_contract(instance_id)
return {
"stress": snapshot.stress_state,
"energy": snapshot.energy_level,
"stability": snapshot.stability_index,
"cycles": snapshot.cycle_count,
"allowed_actions": contract.allowed_actions,
"decision_style": contract.decision_style
}
health = monitor_agent("inst_xxx")
print(health)
Pattern: Adaptive Decision Making¶
def decide_with_constraints(
options: List[str],
instance_id: str
) -> str:
"""Choose option based on contract"""
contract = client.engine.get_contract(instance_id)
# Filter by allowed actions
available = [o for o in options if o in contract.allowed_actions]
if not available:
return "defer" # Safe fallback
# Apply decision style
if contract.decision_style["tempo"] == "decisive":
return available[0] # First option
else:
return best_option(available) # Best option
choice = decide_with_constraints(["explore", "execute", "defer"], "inst_xxx")
Rate Limiting¶
import time
try:
result = client.engine.process(...)
except RateLimitError as e:
wait_seconds = e.retry_after
print(f"Rate limited, waiting {wait_seconds}s")
time.sleep(wait_seconds)
result = client.engine.process(...) # Retry