Grand Diomande Research · Full HTML Reader

cc-event-sourcing Specification

This specification defines the event sourcing patterns used across Comp-Core projects. Event sourcing captures all changes to application state as a sequence of immutable events, enabling:

Business Systems proposal experiment writeup candidate score 50 .md

Full Public Reader

cc-event-sourcing Specification

> Event Sourcing Standard for Comp-Core Projects

Overview

This specification defines the event sourcing patterns used across Comp-Core projects. Event sourcing captures all changes to application state as a sequence of immutable events, enabling:

  • Complete audit trails — Every state change is recorded
  • Time-travel debugging — Replay events to reconstruct past states
  • Temporal queries — Query state at any point in time
  • Event-driven architectures — React to events across system boundaries

1. Event Shape

1.1 Standard Event Structure

All events MUST conform to this shape:

typescript
interface Event<TPayload = unknown, TMetadata extends EventMetadata = EventMetadata> {
  /** Globally unique event identifier (UUIDv7 recommended for sortability) */
  id: string;

  /** Event type in domain.action format (e.g., "user.created", "order.shipped") */
  type: string;

  /** ISO 8601 timestamp when event occurred */
  timestamp: string;

  /** Event-specific data */
  payload: TPayload;

  /** Event metadata */
  metadata: TMetadata;
}

1.2 Event Metadata

Metadata provides context about the event's origin and processing:

typescript
interface EventMetadata {
  /** Aggregate ID this event belongs to */
  aggregateId: string;

  /** Aggregate type (e.g., "User", "Order") */
  aggregateType: string;

  /** Sequence number within the aggregate (starts at 1) */
  version: number;

  /** ID of the command/action that caused this event (for correlation) */
  correlationId?: string;

  /** ID linking related events across aggregates (for distributed tracing) */
  causationId?: string;

  /** User or system that triggered the event */
  actor?: string;

  /** Additional context (tags, source system, etc.) */
  context?: Record<string, unknown>;
}

1.3 Event Type Naming

Event types follow a hierarchical naming convention:

<domain>.<entity>.<action>

Examples:
- `user.created`
- `user.email.changed`
- `order.item.added`
- `payment.refunded`

Rules:
- Use lowercase with dots as separators
- Use past tense for actions (created, updated, deleted)
- Be specific and descriptive

1.4 Event ID Generation

Events MUST use UUIDv7 for IDs to ensure:
- Global uniqueness
- Temporal sortability
- No coordination required

typescript
import { uuidv7 } from 'uuidv7';
const eventId = uuidv7();

2. Aggregate Patterns

2.1 Aggregate Definition

An aggregate is a cluster of domain objects treated as a single unit for data changes:

typescript
abstract class Aggregate<TState, TEvent extends Event> {
  /** Unique identifier for this aggregate instance */
  abstract readonly id: string;

  /** Type name for this aggregate */
  abstract readonly type: string;

  /** Current state derived from events */
  protected state: TState;

  /** Current version (number of applied events) */
  private _version: number = 0;

  /** Uncommitted events pending persistence */
  private _uncommittedEvents: TEvent[] = [];

  /** Apply an event to update state (pure function) */
  protected abstract apply(state: TState, event: TEvent): TState;

  /** Get initial state for new aggregates */
  protected abstract getInitialState(): TState;
}

2.2 Command Handling

Commands are requests to change state. They validate business rules and emit events:

typescript
// Example: User aggregate handling a ChangeEmail command
class UserAggregate extends Aggregate<UserState, UserEvent> {
  changeEmail(newEmail: string): void {
    // Validate business rules
    if (!isValidEmail(newEmail)) {
      throw new DomainError('Invalid email format');
    }
    if (this.state.email === newEmail) {
      return; // No change needed
    }

    // Emit event (does not persist yet)
    this.emit({
      type: 'user.email.changed',
      payload: {
        oldEmail: this.state.email,
        newEmail
      }
    });
  }
}

2.3 Event Application

Events are applied to derive state. This MUST be a pure function:

typescript
protected apply(state: UserState, event: UserEvent): UserState {
  switch (event.type) {
    case 'user.created':
      return {
        id: event.payload.id,
        email: event.payload.email,
        name: event.payload.name,
        createdAt: event.timestamp,
      };

    case 'user.email.changed':
      return {
        ...state,
        email: event.payload.newEmail,
      };

    default:
      return state;
  }
}

2.4 Aggregate Invariants

1. Single Writer — Only one aggregate instance should handle commands for a given ID
2. Transactional Boundary — All events from a single command MUST be persisted atomically
3. Version Consistency — Event versions must be sequential with no gaps
4. Idempotent Apply — Applying the same event twice should produce the same state

3. Event Store

3.1 EventStore Interface

typescript
interface EventStore<TEvent extends Event = Event> {
  /**
   * Append events to a stream
   * @throws OptimisticConcurrencyError if expectedVersion doesn't match
   */
  append(
    streamId: string,
    events: TEvent[],
    expectedVersion: number
  ): Promise<void>;

  /**
   * Read all events from a stream
   */
  read(
    streamId: string,
    options?: ReadOptions
  ): AsyncIterable<TEvent>;

  /**
   * Read events across all streams (for projections)
   */
  readAll(options?: ReadAllOptions): AsyncIterable<TEvent>;

  /**
   * Subscribe to new events
   */
  subscribe(
    handler: (event: TEvent) => Promise<void>,
    options?: SubscribeOptions
  ): Subscription;

  /**
   * Get current version of a stream
   */
  getVersion(streamId: string): Promise<number>;
}

3.2 Stream Naming

Streams are named using the pattern:

<aggregateType>-<aggregateId>

Examples:
- `User-123e4567-e89b-12d3-a456-426614174000`
- `Order-ORD-2024-001`

3.3 Optimistic Concurrency

The event store MUST enforce optimistic concurrency:

typescript
// expectedVersion: -1 = stream must not exist (new aggregate)
// expectedVersion: 0+ = stream must be at this version
await store.append('User-123', [event], expectedVersion);

If the expected version doesn't match, throw `OptimisticConcurrencyError`.

3.4 Read Options

typescript
interface ReadOptions {
  /** Start reading from this version (inclusive) */
  fromVersion?: number;

  /** Stop reading at this version (inclusive) */
  toVersion?: number;

  /** Maximum number of events to read */
  limit?: number;

  /** Read events in reverse order */
  reverse?: boolean;
}

4. Replay Semantics

4.1 Replay Function

Replay reconstructs state by applying events in sequence:

typescript
function replay<TState, TEvent extends Event>(
  events: Iterable<TEvent>,
  apply: (state: TState, event: TEvent) => TState,
  initialState: TState
): TState {
  let state = initialState;
  for (const event of events) {
    state = apply(state, event);
  }
  return state;
}

4.2 Replay Invariants

1. Deterministic — Replaying the same events MUST produce identical state
2. Order Preserving — Events MUST be applied in version order
3. Side-Effect Free — Replay MUST NOT trigger external actions (emails, API calls)
4. Idempotent — Replaying is safe to do multiple times

4.3 Replay Modes

ModeDescriptionUse Case
Full ReplayAll events from beginningRebuilding projections
Partial ReplayFrom snapshot + recent eventsLoading aggregates
Selective ReplayFiltered by event typeDebugging specific flows
Temporal ReplayUp to a specific timestampTime-travel queries

4.4 Async Replay

For large event streams, use async iteration:

typescript
async function replayAsync<TState, TEvent extends Event>(
  events: AsyncIterable<TEvent>,
  apply: (state: TState, event: TEvent) => TState,
  initialState: TState
): Promise<TState> {
  let state = initialState;
  for await (const event of events) {
    state = apply(state, event);
  }
  return state;
}

5. Snapshots

5.1 Snapshot Purpose

Snapshots optimize replay by storing state at a point in time:

Events: [E1] → [E2] → [E3] → [SNAPSHOT @ v3] → [E4] → [E5]

Full replay:   E1 → E2 → E3 → E4 → E5  (5 events)
With snapshot: SNAPSHOT → E4 → E5     (2 events)

5.2 Snapshot Structure

typescript
interface Snapshot<TState> {
  /** Aggregate ID */
  aggregateId: string;

  /** Aggregate type */
  aggregateType: string;

  /** Version this snapshot represents */
  version: number;

  /** Serialized state */
  state: TState;

  /** When snapshot was created */
  createdAt: string;

  /** Schema version for migration support */
  schemaVersion: number;
}

5.3 Snapshot Strategy

Snapshots should be created based on policy:

typescript
interface SnapshotPolicy {
  /** Create snapshot every N events */
  everyNEvents?: number;

  /** Create snapshot if replay would take > N ms */
  maxReplayTimeMs?: number;

  /** Create snapshot if > N events since last snapshot */
  eventThreshold?: number;

  /** Create snapshot on specific event types */
  triggerEvents?: string[];
}

// Example: Snapshot every 100 events or on 'order.completed'
const policy: SnapshotPolicy = {
  everyNEvents: 100,
  triggerEvents: ['order.completed']
};

5.4 Snapshot Store Interface

typescript
interface SnapshotStore<TState> {
  /** Save a snapshot */
  save(snapshot: Snapshot<TState>): Promise<void>;

  /** Load latest snapshot for an aggregate */
  load(
    aggregateType: string,
    aggregateId: string
  ): Promise<Snapshot<TState> | null>;

  /** Delete snapshots older than a version */
  prune(
    aggregateType: string,
    aggregateId: string,
    keepAfterVersion: number
  ): Promise<number>;
}

6. Time-Travel Debugging

6.1 Overview

Time-travel debugging allows reconstructing system state at any point in time:

typescript
interface TimeTravel<TState> {
  /** Get state at a specific timestamp */
  stateAt(timestamp: Date): Promise<TState>;

  /** Get state at a specific version */
  stateAtVersion(version: number): Promise<TState>;

  /** Get state changes between two points */
  diff(from: Date, to: Date): Promise<StateDiff<TState>>;

  /** List all events in a time range */
  eventsInRange(from: Date, to: Date): AsyncIterable<Event>;
}

6.2 Implementation

typescript
class TimeTravelDebugger<TState, TEvent extends Event> {
  constructor(
    private store: EventStore<TEvent>,
    private apply: ApplyFunction<TState, TEvent>,
    private initialState: TState,
    private snapshotStore?: SnapshotStore<TState>
  ) {}

  async stateAt(streamId: string, timestamp: Date): Promise<TState> {
    // 1. Find nearest snapshot before timestamp
    // 2. Replay events from snapshot to timestamp
    // 3. Return reconstructed state
  }

  async eventsBefore(streamId: string, timestamp: Date): Promise<TEvent[]> {
    const events: TEvent[] = [];
    for await (const event of this.store.read(streamId)) {
      if (new Date(event.timestamp) > timestamp) break;
      events.push(event);
    }
    return events;
  }
}

6.3 Debugging Workflow

1. Identify Issue — User reports bug at specific time
2. Time Query — Reconstruct state at that moment
3. Event Inspection — Examine events leading to the state
4. Root Cause — Trace back to the problematic event/command
5. Fix & Verify — Fix code and replay to verify

6.4 Audit Trail Queries

typescript
// What happened to this aggregate?
const events = await store.read('User-123');

// Who changed what when?
for await (const event of events) {
  console.log(`${event.timestamp}: ${event.metadata.actor} - ${event.type}`);
}

// Find all events by a specific user
const userActions = store.readAll({
  filter: { 'metadata.actor': '[email]' }
});

7. Best Practices

7.1 Event Design

  • Events are facts — They describe what happened, not what should happen
  • Events are immutable — Never modify past events
  • Events are complete — Include all data needed to reconstruct state
  • Events are versioned — Support schema evolution

7.2 Schema Evolution

When event schemas change:

1. Add optional fields — Backward compatible
2. Use event versioning — `user.created.v2`
3. Write upcasters — Transform old events to new format during replay
4. Never delete fields — Mark as deprecated instead

typescript
// Upcaster example
function upcast(event: Event): Event {
  if (event.type === 'user.created' && !event.payload.role) {
    return {
      ...event,
      payload: { ...event.payload, role: 'member' }
    };
  }
  return event;
}

7.3 Performance Considerations

ConcernSolution
Large event streamsUse snapshots
Slow projectionsParallel processing, batch updates
High write volumeEvent batching, async persistence
Query performanceBuild read-optimized projections

7.4 Testing

typescript
// Given-When-Then style for aggregate testing
describe('UserAggregate', () => {
  it('should emit email changed event', () => {
    // Given: An existing user
    const aggregate = replay<UserAggregate>([
      createUserEvent({ id: '123', email: '[email]' })
    ]);

    // When: Email is changed
    aggregate.changeEmail('[email]');

    // Then: EmailChanged event is emitted
    expect(aggregate.uncommittedEvents).toContainEqual(
      expect.objectContaining({
        type: 'user.email.changed',
        payload: { oldEmail: '[email]', newEmail: '[email]' }
      })
    );
  });
});

8. Glossary

TermDefinition
AggregateCluster of domain objects treated as a single unit
CommandRequest to change state (validated, may be rejected)
EventImmutable fact about something that happened
Event StoreAppend-only database for events
ProjectionRead-optimized view derived from events
ReplayReconstructing state by applying events
SnapshotCached state at a point in time
StreamSequence of events for an aggregate
UpcasterFunction to migrate old event schemas

References

  • [Event Sourcing Pattern](https://martinfowler.com/eaaDev/EventSourcing.html) — Martin Fowler
  • [CQRS Journey](https://docs.microsoft.com/en-us/previous-versions/msp-n-p/jj554200(v=pandp.10)) — Microsoft Patterns & Practices
  • [Versioning in an Event Sourced System](https://leanpub.com/esversioning) — Greg Young

Promotion Decision

Attach run IDs, datasets, metrics, and reproduction commands.

Source Anchor

Comp-Core/packages/cc-event-sourcing/SPEC.md

Detected Structure

Abstract · Method · Evaluation · References · Architecture