cc-event-sourcing Specification
> Event Sourcing Standard for Comp-Core Projects
Overview
This specification defines the event sourcing patterns used across Comp-Core projects. Event sourcing captures all changes to application state as a sequence of immutable events, enabling:
- Complete audit trails — Every state change is recorded
- Time-travel debugging — Replay events to reconstruct past states
- Temporal queries — Query state at any point in time
- Event-driven architectures — React to events across system boundaries
1. Event Shape
1.1 Standard Event Structure
All events MUST conform to this shape:
interface Event<TPayload = unknown, TMetadata extends EventMetadata = EventMetadata> {
/** Globally unique event identifier (UUIDv7, see 1.4) */
id: string;
/** Event type in lowercase dotted form, domain.action or domain.entity.action (e.g., "user.created", "order.item.added") */
type: string;
/** ISO 8601 timestamp when event occurred */
timestamp: string;
/** Event-specific data */
payload: TPayload;
/** Event metadata */
metadata: TMetadata;
}
1.2 Event Metadata
Metadata provides context about the event's origin and processing:
interface EventMetadata {
/** Aggregate ID this event belongs to */
aggregateId: string;
/** Aggregate type (e.g., "User", "Order") */
aggregateType: string;
/** Sequence number within the aggregate (starts at 1) */
version: number;
/** ID shared by all events in one logical operation or workflow, across aggregates (for distributed tracing) */
correlationId?: string;
/** ID of the command or event that directly caused this event */
causationId?: string;
/** User or system that triggered the event */
actor?: string;
/** Additional context (tags, source system, etc.) */
context?: Record<string, unknown>;
}
1.3 Event Type Naming
Event types follow a hierarchical naming convention. The entity segment is optional:
<domain>[.<entity>].<action>
Examples:
- `user.created`
- `user.email.changed`
- `order.item.added`
- `payment.refunded`
Rules:
- Use lowercase with dots as separators
- Use past tense for actions (created, updated, deleted)
- Be specific and descriptive
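The first rule can be checked mechanically. A minimal sketch, assuming types have two or three dot-separated segments as in the examples above; `EVENT_TYPE_PATTERN` and `isValidEventType` are hypothetical names, not part of this specification, and the past-tense rule still needs human review:

```typescript
// Hypothetical helper, not part of the spec: validates the lowercase,
// dot-separated shape of an event type (two or three segments).
const EVENT_TYPE_PATTERN = /^[a-z][a-z0-9]*(\.[a-z][a-z0-9]*){1,2}$/;

function isValidEventType(type: string): boolean {
  return EVENT_TYPE_PATTERN.test(type);
}

// Names taken from the examples in this section pass the check.
const accepted = ['user.created', 'user.email.changed'].every(isValidEventType);

// Uppercase letters, a single segment, underscores, and four segments do not.
const rejected = ['User.Created', 'user', 'user_created', 'a.b.c.d'].some(isValidEventType);
```

Running a check like this where events are constructed keeps malformed type names out of the store, where they could not be corrected later.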
1.4 Event ID Generation
Events MUST use UUIDv7 for IDs to ensure:
- Global uniqueness
- Temporal sortability
- No coordination required
import { uuidv7 } from 'uuidv7';
const eventId = uuidv7();
2. Aggregate Patterns
2.1 Aggregate Definition
An aggregate is a cluster of domain objects treated as a single unit for data changes:
abstract class Aggregate<TState, TEvent extends Event> {
/** Unique identifier for this aggregate instance */
abstract readonly id: string;
/** Type name for this aggregate */
abstract readonly type: string;
/** Current state derived from events */
protected state: TState;
/** Current version (number of applied events) */
private _version: number = 0;
/** Uncommitted events pending persistence */
private _uncommittedEvents: TEvent[] = [];
/** Apply an event to update state (pure function) */
protected abstract apply(state: TState, event: TEvent): TState;
/** Get initial state for new aggregates */
protected abstract getInitialState(): TState;
}
2.2 Command Handling
Commands are requests to change state. They validate business rules and emit events:
// Example: User aggregate handling a ChangeEmail command
class UserAggregate extends Aggregate<UserState, UserEvent> {
changeEmail(newEmail: string): void {
// Validate business rules
if (!isValidEmail(newEmail)) {
throw new DomainError('Invalid email format');
}
if (this.state.email === newEmail) {
return; // No change needed
}
// Emit event (does not persist yet)
this.emit({
type: 'user.email.changed',
payload: {
oldEmail: this.state.email,
newEmail
}
});
}
}
2.3 Event Application
Events are applied to derive state. This MUST be a pure function:
protected apply(state: UserState, event: UserEvent): UserState {
switch (event.type) {
case 'user.created':
return {
id: event.payload.id,
email: event.payload.email,
name: event.payload.name,
createdAt: event.timestamp,
};
case 'user.email.changed':
return {
...state,
email: event.payload.newEmail,
};
default:
return state;
}
}
2.4 Aggregate Invariants
1. Single Writer — Only one aggregate instance should handle commands for a given ID
2. Transactional Boundary — All events from a single command MUST be persisted atomically
3. Version Consistency — Event versions must be sequential with no gaps
4. Idempotent Apply — Applying the same event twice should produce the same state
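The invariants above lean on pieces that sections 2.1 to 2.3 use but do not show, such as `emit` and the uncommitted-event queue. The following is a minimal sketch of how a base class might supply them, not the Comp-Core implementation. `SketchAggregate`, `SketchEvent`, `loadFromHistory`, `takeUncommitted`, and the counter-based `nextEventId` (a stand-in for `uuidv7()`) are illustrative assumptions, and `state` is exposed through a public getter only to keep the example observable:

```typescript
// Trimmed local copies of the shapes from sections 1.1 and 1.2.
interface SketchMetadata { aggregateId: string; aggregateType: string; version: number }
interface SketchEvent<TPayload = unknown> {
  id: string;
  type: string;
  timestamp: string;
  payload: TPayload;
  metadata: SketchMetadata;
}

// Stand-in ID generator; section 1.4 calls for uuidv7() here.
let idCounter = 0;
const nextEventId = (): string => `evt-${++idCounter}`;

abstract class SketchAggregate<TState, TEvent extends SketchEvent> {
  private _state: TState | undefined;
  private _version = 0;
  private _uncommitted: TEvent[] = [];

  constructor(readonly id: string, readonly type: string) {}

  protected abstract apply(state: TState, event: TEvent): TState;
  protected abstract getInitialState(): TState;

  get state(): TState {
    // Lazily initialised so the abstract method is not called from the constructor.
    if (this._state === undefined) this._state = this.getInitialState();
    return this._state;
  }
  get version(): number { return this._version; }

  /** Rebuild state from persisted events, rejecting gaps or reordering (invariant 3). */
  loadFromHistory(events: readonly TEvent[]): void {
    for (const persisted of events) {
      const expected = this._version + 1;
      if (persisted.metadata.version !== expected) {
        throw new Error(`Version gap: expected ${expected}, got ${persisted.metadata.version}`);
      }
      this._state = this.apply(this.state, persisted);
      this._version = expected;
    }
  }

  /** Stamp a new event with the next version, apply it, and queue it for persistence. */
  protected emit(partial: Pick<TEvent, 'type' | 'payload'>): void {
    const created = {
      ...partial,
      id: nextEventId(),
      timestamp: new Date().toISOString(),
      metadata: { aggregateId: this.id, aggregateType: this.type, version: this._version + 1 },
    } as unknown as TEvent;
    this._state = this.apply(this.state, created);
    this._version += 1;
    this._uncommitted.push(created);
  }

  /** Hand all queued events to the caller for one atomic append (invariant 2). */
  takeUncommitted(): TEvent[] {
    const pending = this._uncommitted;
    this._uncommitted = [];
    return pending;
  }
}

// Tiny concrete aggregate to exercise the base class.
type CounterEvent = SketchEvent<{ by: number }>;
class Counter extends SketchAggregate<number, CounterEvent> {
  protected getInitialState(): number { return 0; }
  protected apply(state: number, e: CounterEvent): number {
    return e.type === 'counter.incremented' ? state + e.payload.by : state;
  }
  increment(by: number): void {
    this.emit({ type: 'counter.incremented', payload: { by } });
  }
}
```

Applying the event inside `emit` means later checks in the same command see the updated state, and returning the whole queue from `takeUncommitted` gives the caller a single batch to pass to one `append` call.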
3. Event Store
3.1 EventStore Interface
interface EventStore<TEvent extends Event = Event> {
/**
* Append events to a stream
* @throws OptimisticConcurrencyError if expectedVersion doesn't match
*/
append(
streamId: string,
events: TEvent[],
expectedVersion: number
): Promise<void>;
/**
* Read all events from a stream
*/
read(
streamId: string,
options?: ReadOptions
): AsyncIterable<TEvent>;
/**
* Read events across all streams (for projections)
*/
readAll(options?: ReadAllOptions): AsyncIterable<TEvent>;
/**
* Subscribe to new events
*/
subscribe(
handler: (event: TEvent) => Promise<void>,
options?: SubscribeOptions
): Subscription;
/**
* Get current version of a stream
*/
getVersion(streamId: string): Promise<number>;
}
3.2 Stream Naming
Streams are named using the pattern:
<aggregateType>-<aggregateId>
Examples:
- `User-123e4567-e89b-12d3-a456-426614174000`
- `Order-ORD-2024-001`
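Because aggregate IDs may themselves contain hyphens (as both examples do), parsing a stream name has to split on the first hyphen only. A small sketch, assuming aggregate type names never contain a hyphen; `toStreamId` and `parseStreamId` are hypothetical helper names:

```typescript
// Hypothetical helpers for the <aggregateType>-<aggregateId> pattern.
function toStreamId(aggregateType: string, aggregateId: string): string {
  return `${aggregateType}-${aggregateId}`;
}

function parseStreamId(streamId: string): { aggregateType: string; aggregateId: string } {
  // Split on the FIRST hyphen: everything after it belongs to the ID.
  const separator = streamId.indexOf('-');
  if (separator <= 0 || separator === streamId.length - 1) {
    throw new Error(`Malformed stream ID: ${streamId}`);
  }
  return {
    aggregateType: streamId.slice(0, separator),
    aggregateId: streamId.slice(separator + 1),
  };
}

const parsedOrder = parseStreamId('Order-ORD-2024-001');
const roundTrip = toStreamId(parsedOrder.aggregateType, parsedOrder.aggregateId);
```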
3.3 Optimistic Concurrency
The event store MUST enforce optimistic concurrency:
// expectedVersion: -1 = stream must not exist (new aggregate)
// expectedVersion: 0+ = stream must be at this version
await store.append('User-123', [event], expectedVersion);
If the expected version doesn't match, throw `OptimisticConcurrencyError`.
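To make the check concrete, here is a minimal in-memory sketch of `append` and `getVersion`. It is illustrative only: it assumes a stream's version equals its event count and that a missing stream reports `-1`, neither of which this specification states, and the constructor shape of `OptimisticConcurrencyError` is invented for the example.

```typescript
class OptimisticConcurrencyError extends Error {
  constructor(readonly streamId: string, readonly expected: number, readonly actual: number) {
    super(`Stream ${streamId}: expected version ${expected}, found ${actual}`);
    this.name = 'OptimisticConcurrencyError';
  }
}

class InMemoryEventStore<TEvent> {
  private streams = new Map<string, TEvent[]>();

  // Assumption: -1 for a missing stream, otherwise the number of stored events.
  private versionOf(streamId: string): number {
    const stream = this.streams.get(streamId);
    return stream === undefined ? -1 : stream.length;
  }

  async getVersion(streamId: string): Promise<number> {
    return this.versionOf(streamId);
  }

  async append(streamId: string, events: TEvent[], expectedVersion: number): Promise<void> {
    // The check and the write run with no await in between, so two
    // overlapping calls cannot both pass the check for the same version.
    const actual = this.versionOf(streamId);
    if (actual !== expectedVersion) {
      throw new OptimisticConcurrencyError(streamId, expectedVersion, actual);
    }
    const existing = this.streams.get(streamId) ?? [];
    this.streams.set(streamId, [...existing, ...events]);
  }
}
```

A caller that receives the error would typically reload the aggregate, re-run the command against the fresh state, and retry the append with the new version.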
3.4 Read Options
interface ReadOptions {
/** Start reading from this version (inclusive) */
fromVersion?: number;
/** Stop reading at this version (inclusive) */
toVersion?: number;
/** Maximum number of events to read */
limit?: number;
/** Read events in reverse order */
reverse?: boolean;
}
4. Replay Semantics
4.1 Replay Function
Replay reconstructs state by applying events in sequence:
function replay<TState, TEvent extends Event>(
events: Iterable<TEvent>,
apply: (state: TState, event: TEvent) => TState,
initialState: TState
): TState {
let state = initialState;
for (const event of events) {
state = apply(state, event);
}
return state;
}
4.2 Replay Invariants
1. Deterministic — Replaying the same events MUST produce identical state
2. Order Preserving — Events MUST be applied in version order
3. Side-Effect Free — Replay MUST NOT trigger external actions (emails, API calls)
4. Idempotent — Replaying is safe to do multiple times
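One way to satisfy the side-effect-free invariant is to keep side effects out of `apply` entirely and run them only for newly committed events. The sketch below illustrates that separation; `EventProcessor`, `Reactor`, and `handleNew` are hypothetical names, not part of this specification:

```typescript
type Apply<S, E> = (state: S, domainEvent: E) => S;
type Reactor<E> = (domainEvent: E) => void;

class EventProcessor<S, E> {
  constructor(private apply: Apply<S, E>, private reactors: Reactor<E>[]) {}

  /** Rebuild state from stored events: pure, never calls reactors. */
  replay(events: readonly E[], initial: S): S {
    let state = initial;
    for (const stored of events) state = this.apply(state, stored);
    return state;
  }

  /** Handle a newly committed event: apply it, then run each side effect once. */
  handleNew(state: S, fresh: E): S {
    const next = this.apply(state, fresh);
    for (const react of this.reactors) react(fresh);
    return next;
  }
}

interface SignupEvent { type: 'user.created'; email: string }

// The outbox stands in for an external action such as sending email.
const outbox: string[] = [];
const processor = new EventProcessor<number, SignupEvent>(
  (count) => count + 1,
  [(e) => { outbox.push(`welcome:${e.email}`); }],
);

const stored: SignupEvent[] = [
  { type: 'user.created', email: 'a@example.com' },
  { type: 'user.created', email: 'b@example.com' },
];

const replayedCount = processor.replay(stored, 0);
const sentDuringReplay = outbox.length;
const liveCount = processor.handleNew(replayedCount, { type: 'user.created', email: 'c@example.com' });
```

Replaying the two stored events leaves the outbox empty, while the one live event adds a single entry, so rebuilding state any number of times never re-sends anything.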
4.3 Replay Modes
| Mode | Description | Use Case |
|---|---|---|
| Full Replay | All events from beginning | Rebuilding projections |
| Partial Replay | From snapshot + recent events | Loading aggregates |
| Selective Replay | Filtered by event type | Debugging specific flows |
| Temporal Replay | Up to a specific timestamp | Time-travel queries |
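The four modes differ only in which events reach the replay loop. A sketch under simplifying assumptions: events are already loaded into an array in version order, and `partialReplay`, `selectiveReplay`, `temporalReplay`, and the ledger example are illustrative names and data, not part of this specification:

```typescript
interface VersionedEvent {
  type: string;
  timestamp: string; // ISO 8601
  metadata: { version: number };
}
interface StateSnapshot<TState> { version: number; state: TState }
type ApplyFn<TState, TEvent> = (state: TState, e: TEvent) => TState;

// Full replay: every event, starting from the initial state.
function replay<TState, TEvent>(
  events: readonly TEvent[], apply: ApplyFn<TState, TEvent>, initialState: TState,
): TState {
  let state = initialState;
  for (const e of events) state = apply(state, e);
  return state;
}

// Partial replay: start from the snapshot and apply only newer events.
function partialReplay<TState, TEvent extends VersionedEvent>(
  snapshot: StateSnapshot<TState>, events: readonly TEvent[], apply: ApplyFn<TState, TEvent>,
): TState {
  return replay(events.filter((e) => e.metadata.version > snapshot.version), apply, snapshot.state);
}

// Selective replay: only the listed event types.
function selectiveReplay<TState, TEvent extends VersionedEvent>(
  events: readonly TEvent[], types: readonly string[],
  apply: ApplyFn<TState, TEvent>, initialState: TState,
): TState {
  return replay(events.filter((e) => types.indexOf(e.type) !== -1), apply, initialState);
}

// Temporal replay: only events at or before the cutoff.
function temporalReplay<TState, TEvent extends VersionedEvent>(
  events: readonly TEvent[], until: Date,
  apply: ApplyFn<TState, TEvent>, initialState: TState,
): TState {
  const cutoff = until.getTime();
  return replay(events.filter((e) => Date.parse(e.timestamp) <= cutoff), apply, initialState);
}

// Hypothetical ledger: the state is a running balance.
interface LedgerEvent extends VersionedEvent { payload: { amount: number } }
const applyLedger: ApplyFn<number, LedgerEvent> = (balance, e) =>
  e.type === 'account.deposited' ? balance + e.payload.amount : balance - e.payload.amount;

const ledger: LedgerEvent[] = [
  { type: 'account.deposited', timestamp: '2024-01-01T00:00:00Z', metadata: { version: 1 }, payload: { amount: 10 } },
  { type: 'account.withdrawn', timestamp: '2024-01-02T00:00:00Z', metadata: { version: 2 }, payload: { amount: 4 } },
  { type: 'account.deposited', timestamp: '2024-01-03T00:00:00Z', metadata: { version: 3 }, payload: { amount: 5 } },
];

const fullBalance = replay(ledger, applyLedger, 0);
const partialBalance = partialReplay({ version: 2, state: 6 }, ledger, applyLedger);
const depositsOnly = selectiveReplay(ledger, ['account.deposited'], applyLedger, 0);
const asOfJan2 = temporalReplay(ledger, new Date('2024-01-02T12:00:00Z'), applyLedger, 0);
```

Full and partial replay agree on the final balance, which is the property that makes snapshots safe. Selective replay yields a different number because it skips events, so it suits inspection rather than rebuilding authoritative state.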
4.4 Async Replay
For large event streams, use async iteration:
async function replayAsync<TState, TEvent extends Event>(
events: AsyncIterable<TEvent>,
apply: (state: TState, event: TEvent) => TState,
initialState: TState
): Promise<TState> {
let state = initialState;
for await (const event of events) {
state = apply(state, event);
}
return state;
}
5. Snapshots
5.1 Snapshot Purpose
Snapshots optimize replay by storing state at a point in time:
Events: [E1] → [E2] → [E3] → [SNAPSHOT @ v3] → [E4] → [E5]
Full replay: E1 → E2 → E3 → E4 → E5 (5 events)
With snapshot: SNAPSHOT → E4 → E5 (2 events)
5.2 Snapshot Structure
interface Snapshot<TState> {
/** Aggregate ID */
aggregateId: string;
/** Aggregate type */
aggregateType: string;
/** Version this snapshot represents */
version: number;
/** Serialized state */
state: TState;
/** When snapshot was created */
createdAt: string;
/** Schema version for migration support */
schemaVersion: number;
}
5.3 Snapshot Strategy
Snapshots should be created based on policy:
interface SnapshotPolicy {
/** Create snapshot every N events */
everyNEvents?: number;
/** Create snapshot if replay would take > N ms */
maxReplayTimeMs?: number;
/** Create snapshot if > N events since last snapshot */
eventThreshold?: number;
/** Create snapshot on specific event types */
triggerEvents?: string[];
}
// Example: Snapshot every 100 events or on 'order.completed'
const policy: SnapshotPolicy = {
everyNEvents: 100,
triggerEvents: ['order.completed']
};
5.4 Snapshot Store Interface
interface SnapshotStore<TState> {
/** Save a snapshot */
save(snapshot: Snapshot<TState>): Promise<void>;
/** Load latest snapshot for an aggregate */
load(
aggregateType: string,
aggregateId: string
): Promise<Snapshot<TState> | null>;
/** Delete snapshots older than a version */
prune(
aggregateType: string,
aggregateId: string,
keepAfterVersion: number
): Promise<number>;
}
6. Time-Travel Debugging
6.1 Overview
Time-travel debugging allows reconstructing system state at any point in time:
interface TimeTravel<TState> {
/** Get state at a specific timestamp */
stateAt(timestamp: Date): Promise<TState>;
/** Get state at a specific version */
stateAtVersion(version: number): Promise<TState>;
/** Get state changes between two points */
diff(from: Date, to: Date): Promise<StateDiff<TState>>;
/** List all events in a time range */
eventsInRange(from: Date, to: Date): AsyncIterable<Event>;
}
6.2 Implementation
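/** Same signature as the apply argument of replay (4.1) */
type ApplyFunction<TState, TEvent> = (state: TState, event: TEvent) => TState;
class TimeTravelDebugger<TState, TEvent extends Event> {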
constructor(
private store: EventStore<TEvent>,
private apply: ApplyFunction<TState, TEvent>,
private initialState: TState,
private snapshotStore?: SnapshotStore<TState>
) {}
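async stateAt(streamId: string, timestamp: Date): Promise<TState> {
// Replays from the start of the stream. Starting from the nearest snapshot
// before `timestamp` would need a lookup by time, which SnapshotStore (5.4)
// does not expose, so this version skips that optimization.
let state = this.initialState;
for (const event of await this.eventsBefore(streamId, timestamp)) {
state = this.apply(state, event);
}
return state;
}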
async eventsBefore(streamId: string, timestamp: Date): Promise<TEvent[]> {
const events: TEvent[] = [];
for await (const event of this.store.read(streamId)) {
if (new Date(event.timestamp) > timestamp) break;
events.push(event);
}
return events;
}
}
6.3 Debugging Workflow
1. Identify Issue — User reports bug at specific time
2. Time Query — Reconstruct state at that moment
3. Event Inspection — Examine events leading to the state
4. Root Cause — Trace back to the problematic event/command
5. Fix & Verify — Fix code and replay to verify
6.4 Audit Trail Queries
// What happened to this aggregate?
const events = store.read('User-123'); // AsyncIterable: nothing to await until iteration
// Who changed what when?
for await (const event of events) {
console.log(`${event.timestamp}: ${event.metadata.actor} - ${event.type}`);
}
// Find all events by a specific user
const userActions = store.readAll({
filter: { 'metadata.actor': '[email]' }
});
7. Best Practices
7.1 Event Design
- Events are facts — They describe what happened, not what should happen
- Events are immutable — Never modify past events
- Events are complete — Include all data needed to reconstruct state
- Events are versioned — Support schema evolution
7.2 Schema Evolution
When event schemas change:
1. Add optional fields — Backward compatible
2. Use event versioning — `user.created.v2`
3. Write upcasters — Transform old events to new format during replay
4. Never delete fields — Mark as deprecated instead
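// Upcaster example: fill in `role` for user.created events recorded before the field existed
function upcast(event: Event): Event {
if (event.type === 'user.created') {
// payload is `unknown` by default, so narrow it before reading fields
const payload = event.payload as Record<string, unknown>;
if (!payload.role) {
return { ...event, payload: { ...payload, role: 'member' } };
}
}
return event;
}
7.3 Performance Considerations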
| Concern | Solution |
|---|---|
| Large event streams | Use snapshots |
| Slow projections | Parallel processing, batch updates |
| High write volume | Event batching, async persistence |
| Query performance | Build read-optimized projections |
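For the first row, deciding when to take a snapshot can follow the policy shape from section 5.3. The sketch below covers only the `everyNEvents` and `triggerEvents` fields; `shouldSnapshot` and `SnapshotPolicySubset` are hypothetical names, and the OR combination of the two rules is an assumption based on the "every 100 events or on 'order.completed'" example:

```typescript
// Subset of the SnapshotPolicy fields from section 5.3.
interface SnapshotPolicySubset {
  everyNEvents?: number;
  triggerEvents?: string[];
}

// Returns true when a snapshot should be taken after the event that
// brought the aggregate to `version`.
function shouldSnapshot(
  policy: SnapshotPolicySubset,
  version: number,
  lastEventType: string,
): boolean {
  const triggered = (policy.triggerEvents ?? []).some((t) => t === lastEventType);
  const onInterval =
    policy.everyNEvents !== undefined &&
    policy.everyNEvents > 0 &&
    version % policy.everyNEvents === 0;
  return triggered || onInterval;
}

const orderPolicy: SnapshotPolicySubset = {
  everyNEvents: 100,
  triggerEvents: ['order.completed'],
};

const atInterval = shouldSnapshot(orderPolicy, 100, 'order.item.added');
const offInterval = shouldSnapshot(orderPolicy, 101, 'order.item.added');
const onTrigger = shouldSnapshot(orderPolicy, 57, 'order.completed');
```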
7.4 Testing
// Given-When-Then style for aggregate testing
describe('UserAggregate', () => {
it('should emit email changed event', () => {
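// Given: An existing user. `replay<UserAggregate>` and `createUserEvent` are
// assumed to be test helpers (not the replay function from 4.1): the first
// rehydrates an aggregate from events, the second builds a user.created event.
const aggregate = replay<UserAggregate>([
createUserEvent({ id: '123', email: '[email]' })
]);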
// When: Email is changed
aggregate.changeEmail('[email]');
// Then: EmailChanged event is emitted
expect(aggregate.uncommittedEvents).toContainEqual(
expect.objectContaining({
type: 'user.email.changed',
payload: { oldEmail: '[email]', newEmail: '[email]' }
})
);
});
});
8. Glossary
| Term | Definition |
|---|---|
| Aggregate | Cluster of domain objects treated as a single unit |
| Command | Request to change state (validated, may be rejected) |
| Event | Immutable fact about something that happened |
| Event Store | Append-only database for events |
| Projection | Read-optimized view derived from events |
| Replay | Reconstructing state by applying events |
| Snapshot | Cached state at a point in time |
| Stream | Sequence of events for an aggregate |
| Upcaster | Function to migrate old event schemas |
Source Anchor
Comp-Core/packages/cc-event-sourcing/SPEC.md