Shared State Schema
This document defines the shared state format for agent coordination, including access patterns, locking mechanisms, and event bus design.
Full Public Reader
Shared State Schema
> Version 1.0.0 | Agent Swarm State Management
This document defines the shared state format for agent coordination, including access patterns, locking mechanisms, and event bus design.
---
Table of Contents
1. [State Architecture](#state-architecture)
2. [State Schema](#state-schema)
3. [Agent-Specific State](#agent-specific-state)
4. [Access Control](#access-control)
5. [Concurrency & Locking](#concurrency--locking)
6. [Event Bus Design](#event-bus-design)
7. [Persistence](#persistence)
8. [Appendix](#appendix)
---
State Architecture
Overview
┌─────────────────────────────────────────────────────────────┐
│ Coordinator │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ State Store │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌────────┐ │ │
│ │ │ Global │ │ Pulse │ │Heartbeat │ │ Dream │ │ │
│ │ │ State │ │ State │ │ State │ │ Weaver │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────┴─────────────────────────┐ │
│ │ Event Bus │ │
│ │ [state.updated] [task.completed] [handoff.requested] │ │
│ └───────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│ │ │ │
┌────┴────┐ ┌────┴────┐ ┌────┴────┐ ┌────┴────┐
│ Pulse │ │Heartbeat│ │ Dream │ │ Guardian│
│ Agent │ │ Agent │ │ Weaver │ │ Agent │
└─────────┘ └─────────┘ └─────────┘ └─────────┘

State Hierarchy
/**
 * Root of the shared swarm state: the global state, per-agent state
 * namespaces, the lock registry, and the event history.
 */
interface SwarmState {
// Global state (shared across all agents)
global: GlobalState;
// Per-agent state namespaces
// Keyed first by agent type, then by session ID.
agents: {
[agentType: string]: {
[sessionId: string]: AgentState;
};
};
// Resource locks
locks: LockRegistry;
// Event history (ring buffer)
events: EventHistory;
}---
State Schema
Global State
/**
 * State shared by every agent: system configuration, the agent registry,
 * active tasks, aggregate metrics, and system health.
 *
 * `Priority` and `AgentRegistration` are referenced here but are not declared
 * in this section.
 */
interface GlobalState {
// System configuration
config: {
maxConcurrentAgents: number;
defaultPriority: Priority;
timeouts: {
taskDefault: number; // ms
handoffDefault: number; // ms
lockDefault: number; // ms
};
features: {
autoHandoff: boolean;
preemption: boolean;
eventPersistence: boolean;
};
};
// Agent registry
// Keyed by agent ID.
registry: {
[agentId: string]: AgentRegistration;
};
// Active tasks across all agents
// Keyed by task ID.
tasks: {
[taskId: string]: TaskState;
};
// System metrics
metrics: {
totalTasksCompleted: number;
totalTasksFailed: number;
totalHandoffs: number;
// NOTE(review): unit of averageTaskDuration is not stated (the timeouts above use ms) — confirm.
averageTaskDuration: number;
// NOTE(review): timestamps are plain strings; presumably ISO 8601 — confirm.
lastUpdated: string;
};
// System health
health: {
status: 'healthy' | 'degraded' | 'critical';
issues: string[];
lastCheck: string;
};
}
/**
 * A single task record: its goal, status, priority, assignee, and timestamps.
 *
 * `startedAt`, `completedAt`, and `assignedTo` are nullable; the conditions
 * under which they are set are not specified in this section.
 */
interface TaskState {
id: string;
goal: string;
status: 'pending' | 'running' | 'complete' | 'failed' | 'cancelled';
priority: Priority;
assignedTo: AgentIdentity | null;
// NOTE(review): timestamp format is not specified; presumably ISO 8601 — confirm.
createdAt: string;
startedAt: string | null;
completedAt: string | null;
parentTaskId?: string; // For subtasks
metadata: Record<string, unknown>;
}Agent State (Base)
/**
 * Base state shape for one agent session: identity, lifecycle, current work,
 * progress, resource usage, error tracking, and free-form agent data.
 *
 * The agent-specific state interfaces in this document extend this one and
 * narrow `agentType` and `data`.
 */
interface AgentState {
// Identity
agentId: string;
agentType: AgentType;
sessionId: string;
// Lifecycle
status: AgentStatus;
startedAt: string;
lastActivityAt: string;
// Current work
currentTask: TaskState | null;
taskQueue: TaskState[];
// Progress tracking
progress: {
current: number; // 0-100
message: string;
eta?: string; // Estimated completion
};
// Resource usage
// NOTE(review): units of memoryUsed and cpuTime are not stated — confirm.
resources: {
tokensUsed: number;
tokensRemaining: number;
memoryUsed: number;
cpuTime: number;
};
// Error tracking
errors: {
count: number;
// Most recent error, if one has been recorded.
last?: {
code: string;
message: string;
timestamp: string;
};
};
// Arbitrary agent-specific data
data: Record<string, unknown>;
}
/**
 * Lifecycle status values used by `AgentState.status`.
 * The meaning of each value and the allowed transitions are not defined in
 * this section.
 */
type AgentStatus =
| 'initializing'
| 'idle'
| 'busy'
| 'paused'
| 'draining'
| 'offline'
| 'error';---
Agent-Specific State
Pulse State
/**
 * State for a Pulse agent session: extends `AgentState`, fixing `agentType`
 * to `'pulse'` and giving `data` a concrete shape (session configuration,
 * iteration tracking, work products, and a progress log).
 *
 * `ProgressEntry` is referenced here but is not declared in this section.
 */
interface PulseState extends AgentState {
agentType: 'pulse';
data: {
// Session configuration
projectName: string;
projectPath: string;
goal: string;
branchName: string | null;
// Iteration tracking
currentIteration: number;
maxIterations: number;
lastSignal: PulseSignal;
// Work products
iterations: PulseIteration[];
totalCommits: number;
commitHashes: string[];
filesModified: string[];
// Progress log
progressLog: ProgressEntry[];
};
}
/**
 * Record of one Pulse iteration, as stored in `PulseState.data.iterations`:
 * its status, signal, summary, commit, timing, and the files it modified.
 */
interface PulseIteration {
number: number;
status: 'pending' | 'running' | 'complete' | 'failed';
signal: PulseSignal;
summary: string;
commitHash: string | null;
startedAt: string | null;
completedAt: string | null;
// Duration in seconds (per the field name); nullable like the timestamps above.
durationSeconds: number | null;
filesModified: string[];
}
/**
 * Signal values used by `PulseIteration.signal` and
 * `PulseState.data.lastSignal`.
 * What each signal causes is not defined in this section.
 */
type PulseSignal =
| 'CONTINUE'
| 'COMPLETE'
| 'BLOCKED'
| 'PAUSE'
| 'UNKNOWN';Heartbeat State
/**
 * State for a Heartbeat agent session: extends `AgentState`, fixing
 * `agentType` to `'heartbeat'` and giving `data` a concrete shape (check
 * schedule, check history, monitoring targets, findings, and pending
 * notifications).
 */
interface HeartbeatState extends AgentState {
agentType: 'heartbeat';
data: {
// Check schedule
checkInterval: number; // ms
lastCheck: string;
nextCheck: string;
// Check history
checks: HeartbeatCheck[];
// Monitoring targets
targets: {
email: boolean;
calendar: boolean;
weather: boolean;
mentions: boolean;
custom: string[];
};
// Findings
findings: HeartbeatFinding[];
// NOTE(review): `Notification` is not declared in this section. If it is not imported,
// it would resolve to the DOM global `Notification` when the DOM lib is enabled — confirm.
pendingNotifications: Notification[];
};
}
/**
 * One entry in the Heartbeat check history (`HeartbeatState.data.checks`).
 */
interface HeartbeatCheck {
id: string;
type: string;
timestamp: string;
// NOTE(review): unit of duration is not stated (checkInterval elsewhere uses ms) — confirm.
duration: number;
status: 'success' | 'failed' | 'skipped';
// NOTE(review): presumably the number of findings this check produced — confirm.
findings: number;
}
/**
 * One finding recorded by the Heartbeat agent
 * (`HeartbeatState.data.findings`), with a priority, a summary, free-form
 * details, and a flag for whether it has been notified.
 */
interface HeartbeatFinding {
id: string;
type: string;
priority: Priority;
summary: string;
details: Record<string, unknown>;
notified: boolean;
timestamp: string;
}Dream Weaver State
/**
 * State for a Dream Weaver agent session: extends `AgentState`, fixing
 * `agentType` to `'dream_weaver'` and giving `data` a concrete shape (the
 * current dream, generation phase and artifacts, context window usage, and
 * quality metrics).
 */
interface DreamWeaverState extends AgentState {
agentType: 'dream_weaver';
data: {
// Current dream/task
dreamId: string | null;
dreamType: DreamType;
prompt: string;
// Generation state
phase: 'planning' | 'generating' | 'refining' | 'complete';
artifacts: Artifact[];
// Memory/context
contextWindow: {
// NOTE(review): whether these strings are document IDs, paths, or contents is not stated — confirm.
documents: string[];
maxTokens: number;
usedTokens: number;
};
// Quality metrics
quality: {
coherence: number; // 0-1
relevance: number; // 0-1
creativity: number; // 0-1
};
};
}
/**
 * Kinds of dream/task used by `DreamWeaverState.data.dreamType`.
 */
type DreamType =
| 'analysis'
| 'creative_writing'
| 'code_generation'
| 'research'
| 'synthesis';
/**
 * An artifact listed in `DreamWeaverState.data.artifacts`: its kind, path,
 * size, creation time, and free-form metadata.
 */
interface Artifact {
id: string;
type: 'document' | 'code' | 'diagram' | 'data';
path: string;
// NOTE(review): unit of size is not stated; presumably bytes — confirm.
size: number;
createdAt: string;
metadata: Record<string, unknown>;
}Conductor State
/**
 * State for a Conductor agent session: extends `AgentState`, fixing
 * `agentType` to `'conductor'` and giving `data` a concrete shape (active
 * sessions, pending handoffs, routing rules, per-agent load, and decision
 * history).
 *
 * `HandoffRequest` is referenced here but is not declared in this section.
 */
interface ConductorState extends AgentState {
agentType: 'conductor';
data: {
// Orchestration state
// NOTE(review): presumably session IDs — confirm.
activeSessions: string[];
pendingHandoffs: HandoffRequest[];
// Routing rules
routingTable: RoutingRule[];
// Load balancing
// Keyed by agent ID.
agentLoads: {
[agentId: string]: {
currentTasks: number;
queuedTasks: number;
capacity: number;
};
};
// Decision history
decisions: ConductorDecision[];
};
}
/**
 * One entry of the Conductor routing table: a condition, the agent type it
 * targets, a load-balancing weight, and an enabled flag.
 */
interface RoutingRule {
id: string;
// All condition fields are optional; how multiple fields combine is not specified here.
condition: {
taskType?: string;
priority?: Priority;
keywords?: string[];
// NOTE(review): the expression language for `custom` is not defined in this section.
custom?: string; // Expression
};
target: AgentType;
weight: number; // For load balancing
enabled: boolean;
}
/**
 * One entry of the Conductor decision history
 * (`ConductorState.data.decisions`): the decision type, its input and output,
 * the stated reason, and when it was made.
 */
interface ConductorDecision {
id: string;
type: 'route' | 'handoff' | 'preempt' | 'reject';
input: Record<string, unknown>;
output: Record<string, unknown>;
reason: string;
timestamp: string;
}---
Access Control
Permission Matrix
| State Path | Pulse | Heartbeat | Dream Weaver | Conductor | Guardian |
|---|---|---|---|---|---|
| `global.config` | R | R | R | RW | R |
| `global.registry` | R | R | R | RW | R |
| `global.tasks` | R | R | R | RW | R |
| `global.metrics` | R | R | R | RW | R |
| `global.health` | R | R | R | RW | RW |
| `agents.pulse.*` | RW¹ | R | R | RW | R |
| `agents.heartbeat.*` | R | RW¹ | R | RW | R |
| `agents.dream_weaver.*` | R | R | RW¹ | RW | R |
| `agents.conductor.*` | R | R | R | RW | R |
| `locks.*` | RW² | RW² | RW² | RW | R |
¹ Only own sessions
² Only own locks
Access Control Implementation
/**
 * Contract for checking agent access to state paths and validating updates.
 *
 * NOTE(review): the operations include `'delete'`, but the permission matrix
 * in this document only distinguishes R and RW — confirm how delete is
 * authorised.
 */
interface AccessControl {
// Check if agent can perform operation
canAccess(
agent: AgentIdentity,
path: string,
operation: 'read' | 'write' | 'delete'
): boolean;
// Get accessible paths for agent
getAccessiblePaths(
agent: AgentIdentity,
operation: 'read' | 'write' | 'delete'
): string[];
// Validate state update
// Receives both the old and the proposed value for `path`.
validateUpdate(
agent: AgentIdentity,
path: string,
oldValue: unknown,
newValue: unknown
): ValidationResult;
}
/**
 * Result of `AccessControl.validateUpdate`: whether the update is allowed,
 * an optional reason, and an optional transformed value.
 */
interface ValidationResult {
allowed: boolean;
reason?: string;
sanitizedValue?: unknown; // If transformation needed
}---
Concurrency & Locking
Lock Types
/**
 * Lock modes used by `Lock.type` and `LockRequest.type`.
 */
type LockType =
| 'exclusive' // Only one holder
| 'shared' // Multiple readers
| 'upgradeable'; // Shared → Exclusive
/**
 * A held lock on a resource: its mode, holder, acquisition and expiry times,
 * original TTL, and renewal count.
 */
interface Lock {
id: string;
resource: string; // Resource path/identifier
type: LockType;
holder: AgentIdentity;
acquiredAt: string;
expiresAt: string;
// NOTE(review): unit of ttl is not stated; `timeouts.lockDefault` elsewhere uses ms — confirm.
ttl: number; // Original TTL
renewCount: number;
metadata?: Record<string, unknown>;
}
/**
 * Registry of held locks (keyed by lock ID) and pending lock requests
 * (keyed by resource).
 *
 * NOTE(review): this type is mounted at `SwarmState.locks`, so a held lock
 * lives at `locks.locks.<lockId>`; the appendix example path
 * `locks.lock-456.holder.id` omits the inner `locks` level — confirm which is
 * intended.
 */
interface LockRegistry {
locks: { [lockId: string]: Lock };
waitQueue: { [resource: string]: LockRequest[] };
}
/**
 * A request to acquire a lock: the resource, the lock mode, the requester,
 * when it was requested, a timeout, and a priority.
 * Instances are queued per resource in `LockRegistry.waitQueue`.
 */
interface LockRequest {
id: string;
resource: string;
type: LockType;
requester: AgentIdentity;
requestedAt: string;
// NOTE(review): unit of timeout is not stated; presumably ms — confirm.
timeout: number;
priority: Priority;
}Locking API
/**
 * API for acquiring, releasing, renewing, converting, and querying locks.
 *
 * Mutating operations are asynchronous; the query operations return
 * synchronously.
 */
interface LockManager {
// Acquire a lock
// NOTE(review): the meaning of a `null` result is not specified; presumably the lock could not be obtained — confirm.
acquire(request: LockRequest): Promise<Lock | null>;
// Release a lock
release(lockId: string, holder: AgentIdentity): Promise<boolean>;
// Renew a lock (extend TTL)
renew(lockId: string, holder: AgentIdentity, ttl: number): Promise<boolean>;
// Upgrade shared → exclusive
upgrade(lockId: string, holder: AgentIdentity): Promise<boolean>;
// Downgrade exclusive → shared
downgrade(lockId: string, holder: AgentIdentity): Promise<boolean>;
// Force release (coordinator only)
// NOTE(review): takes no caller identity, so the restriction cannot be checked from this signature;
// "coordinator" presumably refers to the Conductor agent in the permission matrix — confirm.
forceRelease(lockId: string, reason: string): Promise<boolean>;
// Query locks
// With no argument, `resource` is omitted; presumably all locks are returned — confirm.
getLocks(resource?: string): Lock[];
getWaitQueue(resource: string): LockRequest[];
}Optimistic Concurrency
For state updates without explicit locks:
/**
 * A state value wrapped with a version counter and last-modification
 * metadata, used for updates without explicit locks.
 */
interface VersionedState<T> {
value: T;
version: number; // Monotonic counter
lastModified: string;
lastModifiedBy: AgentIdentity;
}
/**
 * A requested update to the value at `path`, conditional on the caller's
 * expected version.
 */
interface StateUpdate<T> {
path: string;
expectedVersion: number; // Fails if version mismatch
newValue: T;
// NOTE(review): behaviour when `merge` is omitted is not stated; presumably replace — confirm.
merge?: boolean; // Deep merge vs replace
}
// Update result
/**
 * Outcome of applying a `StateUpdate`: a success flag, plus either the new
 * version or details of the conflicting current state.
 * Both `newVersion` and `conflict` are optional; the type itself does not tie
 * them to the value of `success`.
 */
interface UpdateResult {
success: boolean;
newVersion?: number;
conflict?: {
currentVersion: number;
currentValue: unknown;
};
}Conflict Resolution
/**
 * Contract for resolving conflicting state updates, either through one of the
 * named strategies or through `resolve`.
 */
interface ConflictResolver {
// Automatic resolution strategies
// Each strategy takes two candidates and returns a single value.
strategies: {
'last-write-wins': (a: unknown, b: unknown) => unknown;
'first-write-wins': (a: unknown, b: unknown) => unknown;
'merge-objects': (a: object, b: object) => object;
'merge-arrays': (a: unknown[], b: unknown[]) => unknown[];
'higher-priority-wins': (
a: { value: unknown; priority: Priority },
b: { value: unknown; priority: Priority }
) => unknown;
};
// Resolve conflict
// `strategy` must be one of the keys of `strategies`; the default when omitted is not specified here.
resolve(
path: string,
conflictingUpdates: StateUpdate<unknown>[],
strategy?: keyof ConflictResolver['strategies']
): unknown;
}---
Event Bus Design
Event Types
/**
 * Envelope for an event on the bus: ID, type, source agent, timestamp, a
 * typed payload, and optional correlation/causation/trace metadata.
 *
 * `TraceContext` is referenced here but is not declared in this section.
 */
interface SwarmEvent<T = unknown> {
id: string;
type: EventType;
source: AgentIdentity;
timestamp: string;
payload: T;
metadata: {
correlationId?: string;
causationId?: string; // Event that caused this
trace?: TraceContext;
};
}
/**
 * All event type names used by `SwarmEvent.type`, grouped by a
 * `<category>.<action>` naming pattern (state, task, handoff, agent, lock,
 * system).
 */
type EventType =
// State events
| 'state.created'
| 'state.updated'
| 'state.deleted'
// Task events
| 'task.created'
| 'task.assigned'
| 'task.started'
| 'task.progress'
| 'task.completed'
| 'task.failed'
| 'task.cancelled'
// Handoff events
| 'handoff.requested'
| 'handoff.accepted'
| 'handoff.rejected'
| 'handoff.completed'
// Agent events
| 'agent.registered'
| 'agent.started'
| 'agent.stopped'
| 'agent.health_changed'
// Lock events
| 'lock.acquired'
| 'lock.released'
| 'lock.expired'
| 'lock.contention'
// System events
| 'system.config_changed'
| 'system.alert'
| 'system.maintenance';Event Bus API
/**
 * API for publishing, subscribing to, querying, and replaying swarm events.
 *
 * `EventHandler` is referenced here but is not declared in this section.
 */
interface EventBus {
// Publish event
// The caller supplies everything except `id` and `timestamp`.
// NOTE(review): the returned string is presumably the generated event ID — confirm.
emit<T>(event: Omit<SwarmEvent<T>, 'id' | 'timestamp'>): string;
// Subscribe to events
subscribe(
pattern: EventPattern,
handler: EventHandler,
options?: SubscribeOptions
): Subscription;
// Unsubscribe
unsubscribe(subscriptionId: string): void;
// Query event history
query(filter: EventFilter): SwarmEvent[];
// Replay events (for recovery)
replay(
filter: EventFilter,
handler: EventHandler
): Promise<void>;
}
/**
 * Pattern selecting which events a subscription receives. All fields are
 * optional; how multiple fields combine is not specified in this section.
 */
interface EventPattern {
// A single event type, a list of types, or a regular expression.
type?: EventType | EventType[] | RegExp;
source?: AgentIdentity | AgentType;
custom?: (event: SwarmEvent) => boolean;
}
/**
 * Options for `EventBus.subscribe`: delivery guarantee, ordering, filtering,
 * batching, and error handling.
 *
 * NOTE(review): `delivery`, `ordered`, and `onError` are required, so any
 * caller that passes an options object must set all three; the defaults that
 * apply when `options` is omitted entirely are not specified here — confirm.
 */
interface SubscribeOptions {
// Delivery guarantees
delivery: 'at-most-once' | 'at-least-once' | 'exactly-once';
// Ordering
ordered: boolean;
// Filtering
filter?: (event: SwarmEvent) => boolean;
// Batching
batch?: {
maxSize: number;
maxWait: number; // ms
};
// Error handling
onError: 'ignore' | 'retry' | 'dead-letter';
// NOTE(review): presumably only meaningful when onError is 'retry' — confirm.
maxRetries?: number;
}
/**
 * Handle returned by `EventBus.subscribe`: the subscription's ID, pattern,
 * status, delivery counters, and pause/resume/cancel controls.
 */
interface Subscription {
id: string;
pattern: EventPattern;
status: 'active' | 'paused' | 'cancelled';
stats: {
received: number;
processed: number;
errors: number;
};
pause(): void;
resume(): void;
cancel(): void;
}Event History
/**
 * Stored event history: ring-buffer limits, the events themselves, and
 * lookup indexes from type, source, and correlation ID to event IDs.
 *
 * NOTE(review): the indexes are `Map`s, which `JSON.stringify` does not
 * serialise by default. This type is part of `SwarmState`, which is embedded
 * in snapshots and may be persisted as JSON files — confirm how the indexes
 * are persisted or rebuilt.
 */
interface EventHistory {
// Ring buffer configuration
maxEvents: number;
retentionMs: number;
// Storage
events: SwarmEvent[];
// Indexes for efficient queries
byType: Map<EventType, string[]>; // Event IDs
bySource: Map<string, string[]>; // Agent ID → Event IDs
byCorrelation: Map<string, string[]>; // Correlation ID → Event IDs
}
/**
 * Filter for `EventBus.query` and `EventBus.replay`. All fields are optional.
 */
interface EventFilter {
types?: EventType[];
// NOTE(review): presumably agent IDs, matching the `bySource` index — confirm.
sources?: string[];
after?: string; // Timestamp
// NOTE(review): presumably a timestamp like `after`; inclusivity of both bounds is not stated — confirm.
before?: string;
correlationId?: string;
limit?: number;
offset?: number;
}---
Persistence
Storage Backend
/**
 * Asynchronous key-value storage backend contract: single-key CRUD, batch
 * operations, key listing/scanning, and atomic operations.
 */
interface StateStorage {
// CRUD operations
get<T>(key: string): Promise<T | null>;
set<T>(key: string, value: T, options?: SetOptions): Promise<void>;
delete(key: string): Promise<boolean>;
// Batch operations
getMany<T>(keys: string[]): Promise<Map<string, T>>;
setMany<T>(entries: Map<string, T>): Promise<void>;
// NOTE(review): the returned number is presumably the count of keys deleted — confirm.
deleteMany(keys: string[]): Promise<number>;
// Query operations
list(prefix: string): Promise<string[]>;
// Yields [key, value] pairs; the pattern syntax is not defined in this section.
scan(pattern: string): AsyncIterable<[string, unknown]>;
// Atomic operations
// `expected` may be null; presumably this means "key is absent" — confirm.
compareAndSet<T>(
key: string,
expected: T | null,
value: T
): Promise<boolean>;
increment(key: string, delta: number): Promise<number>;
}
/**
 * Optional modifiers for `StateStorage.set`.
 */
interface SetOptions {
ttl?: number; // Time-to-live in ms
ifNotExists?: boolean; // Only set if key doesn't exist
version?: number; // Optimistic locking
}Persistence Strategies
/**
 * Available persistence backends, selected via `PersistenceConfig.strategy`.
 */
type PersistenceStrategy =
| 'memory' // In-memory only (fast, volatile)
| 'file' // JSON files (simple, portable)
| 'sqlite' // SQLite (reliable, embedded)
| 'redis' // Redis (distributed, fast)
| 'hybrid'; // Memory + async persistence
/**
 * Persistence configuration: the chosen strategy plus an optional options
 * object per strategy.
 *
 * The type does not require the options object matching `strategy` to be
 * present; every per-strategy block is optional.
 */
interface PersistenceConfig {
strategy: PersistenceStrategy;
// File strategy options
file?: {
directory: string;
syncInterval: number; // ms
compression: boolean;
};
// SQLite options
sqlite?: {
path: string;
wal: boolean; // Write-ahead logging
};
// Redis options
redis?: {
url: string;
keyPrefix: string;
};
// Hybrid options
// NOTE(review): `primary` and `secondary` accept any PersistenceStrategy, including 'hybrid'
// itself — confirm whether nesting is intended or should be excluded.
hybrid?: {
primary: PersistenceStrategy;
secondary: PersistenceStrategy;
// NOTE(review): unit not stated here; `file.syncInterval` above uses ms — confirm.
syncInterval: number;
};
}Snapshotting
/**
 * A point-in-time capture of the full `SwarmState`, with the last included
 * event sequence number, a checksum, and the reason it was taken.
 */
interface Snapshot {
id: string;
timestamp: string;
state: SwarmState;
// NOTE(review): `SwarmEvent` as declared in this document has a string `id` but no numeric
// sequence field — confirm what this number refers to.
eventSequence: number; // Last event included
// NOTE(review): checksum algorithm and covered fields are not specified — confirm.
checksum: string;
metadata: {
reason: 'scheduled' | 'manual' | 'recovery';
agentId?: string;
};
}
/**
 * API for creating, restoring, listing, and pruning snapshots.
 */
interface SnapshotManager {
// Create snapshot
// NOTE(review): `reason` is a free-form string here, while `Snapshot.metadata.reason` only
// allows 'scheduled' | 'manual' | 'recovery' — confirm how other strings are mapped.
create(reason?: string): Promise<Snapshot>;
// Restore from snapshot
restore(snapshotId: string): Promise<void>;
// List snapshots
list(limit?: number): Promise<Snapshot[]>;
// Delete old snapshots
// NOTE(review): the returned number is presumably the count of snapshots deleted — confirm.
prune(keepCount: number): Promise<number>;
}---
Appendix
State Path Syntax
Paths use dot notation with array indexing; a `*` segment is a wildcard that matches every key at that level (for example, every session ID):
global.config.maxConcurrentAgents
agents.pulse.session-123.data.currentIteration
agents.heartbeat.*.data.lastCheck
locks.lock-456.holder.id

Related Documents
- [PROTOCOL.md](./PROTOCOL.md) — Agent coordination protocol
- [src/](./src/) — TypeScript type definitions
Promotion Decision
Attach run IDs, datasets, metrics, and reproduction commands.
Source Anchor
Comp-Core/docs/agent-protocol/STATE_SCHEMA.md
Detected Structure
Method · Evaluation · Code Anchors · Architecture