Background Worker Service

Asynchronous job processor for TrajectoryOS. Handles periodic tasks like background analysis, skill decay, notification generation, and data aggregation.

Status: 🚧 Beta - Basic job infrastructure exists; scheduled tasks are partially implemented

---

Quick Start

Prerequisites
- Node.js 20+
- pnpm
- Redis (for job queue)

Installation

bash
# From repository root
cd services/background-worker

# Install dependencies
pnpm install

Development

bash
# Start Redis (required)
redis-server

# Start worker
pnpm dev

# Worker connects to Redis and starts processing jobs

---

Architecture

Directory Structure

background-worker/
├── src/
│   ├── index.ts                    # Worker initialization
│   ├── scheduler.ts                # Job scheduling logic
│   ├── failureTracker.ts           # Failure detection & retry
│   ├── userFilter.ts               # User selection for jobs
│   ├── metrics.ts                  # Performance tracking
│   │
│   └── jobs/                       # Job handlers
│       └── runBackgroundGeneration.ts  # Background analysis job
│
├── package.json
└── README.md

---

Core Concepts

Job Queue

Uses Bull (Redis-backed job queue) for reliable async processing:

typescript
import Queue from 'bull';

const analysisQueue = new Queue('background-analysis', {
  redis: { host: 'localhost', port: 6379 }
});

// Add job to queue
await analysisQueue.add('analyze-user', {
  userId: 'user_123',
  reason: 'nightly_analysis'
});

// Process jobs
analysisQueue.process('analyze-user', async (job) => {
  const { userId } = job.data;
  await analyzeUser(userId);
});

Scheduling

Jobs run on schedules:

| Job | Schedule | Purpose |
|-----|----------|---------|
| Nightly Analysis | Daily 2 AM | Generate insights, detect leverage points |
| Skill Decay | Daily 4 AM | Update skill confidence based on time |
| Weekly Summary | Sunday 9 AM | Send weekly progress email |
| State Snapshots | Every 6 hours | Archive life state history |
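
The schedules in the table can be written as standard five-field cron expressions. The sketch below is one possible mapping; the constant, job keys, and helper name are illustrative and not taken from `scheduler.ts`, and it assumes repeatable jobs are registered through Bull's `repeat: { cron }` job option. The `UTC` default is also an assumption.

```typescript
// Hypothetical mapping of the schedule table to cron expressions
// (fields: minute hour day-of-month month day-of-week).
const JOB_SCHEDULES = {
  'nightly-analysis': '0 2 * * *',  // daily at 02:00
  'skill-decay': '0 4 * * *',       // daily at 04:00
  'weekly-summary': '0 9 * * 0',    // Sundays at 09:00
  'state-snapshot': '0 */6 * * *',  // every 6 hours, on the hour
} as const;

type ScheduledJob = keyof typeof JOB_SCHEDULES;

// Builds an options object that could be passed as the third argument of
// queue.add(name, data, opts) to register a repeatable job.
function repeatOptionsFor(job: ScheduledJob, tz: string = 'UTC') {
  return { repeat: { cron: JOB_SCHEDULES[job], tz } };
}

console.log(repeatOptionsFor('weekly-summary'));
```

Keeping the expressions in one table makes it easier to check that jobs which depend on each other (for example, analysis before the weekly email) do not overlap.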

---

Current Implementation

What Exists (30%)

✅ Job Infrastructure:
- Bull queue setup
- Worker process
- Failure tracking
- Metrics collection

✅ User Filtering:
- Select users for batch jobs
- Avoid overloading agent service
- Gradual rollout capability
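
One common way to get a gradual rollout is to hash each user ID into a stable bucket and compare it against a percentage. The sketch below illustrates that idea only; it is not the contents of `userFilter.ts`, and the function names and the `maxUsers` cap are hypothetical.

```typescript
// FNV-1a 32-bit hash: small, deterministic, and dependency-free.
function fnv1a(input: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash;
}

// Returns true when the user falls inside the rollout percentage (0-100).
// The same user always lands in the same bucket, so raising the percentage
// only ever adds users.
function isInRollout(userId: string, percent: number): boolean {
  if (percent <= 0) return false;
  if (percent >= 100) return true;
  return fnv1a(userId) % 100 < percent;
}

// Caps the number of users enqueued per run so a batch job cannot flood
// downstream services. `maxUsers` is a hypothetical tuning knob.
function selectUsersForJob(userIds: string[], percent: number, maxUsers: number): string[] {
  return userIds.filter((id) => isInRollout(id, percent)).slice(0, maxUsers);
}

console.log(selectUsersForJob(['user_1', 'user_2', 'user_3'], 100, 2));
```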

🚧 Jobs (Partial):
- `runBackgroundGeneration` - Exists but needs LLM integration
- State snapshots - Not started
- Skill decay - Not started
- Email notifications - Not started

---

Implemented Jobs

Background Analysis (`runBackgroundGeneration`)

Status: 🚧 Partial - Framework exists, awaiting agent-orchestrator LLM integration

Purpose: Generate nightly insights for users

Current Implementation:

typescript
// src/jobs/runBackgroundGeneration.ts

export async function runBackgroundGeneration(userId: string) {
  console.log(`Running background analysis for ${userId}`);

  // 1. Fetch user state
  const state = await trajectoryCore.getState(userId);

  // 2. TODO: Call agent-orchestrator for LLM analysis
  // const insights = await agent.generateInsights(userId, state);

  // 3. TODO: Store insights
  // await db.insights.create({ userId, insights });

  console.log(`Completed analysis for ${userId}`);
}

Planned (Q1 2026):
- Integration with agent-orchestrator LLM service
- Leverage point detection
- Conflict warnings
- Trajectory predictions

---

Planned Jobs

1. Skill Decay (Planned Q1 2026)

Purpose: Reduce skill confidence for unused skills over time

Implementation:

typescript
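async function processSkillDecay(userId: string) {
  const skills = await trajectoryCore.getSkills(userId);
  const now = Date.now();
  const MS_PER_DAY = 1000 * 60 * 60 * 24;

  for (const skill of skills) {
    // updatedAt may be a Date or a timestamp; normalize to milliseconds
    const updatedAtMs = new Date(skill.updatedAt).getTime();
    // Whole days only, so the decay reason reads "95 days", not "95.3127 days"
    const daysSinceUpdate = Math.floor((now - updatedAtMs) / MS_PER_DAY);

    if (daysSinceUpdate > 90) {
      // Decay confidence by 10%. This assumes updateSkill refreshes updatedAt,
      // so the next decay happens 90 days later rather than on every daily run.
      const newConfidence = skill.confidence * 0.9;

      await trajectoryCore.updateSkill(skill.id, {
        confidence: newConfidence,
        decayReason: `No evidence in ${daysSinceUpdate} days`
      });
    }
  }
}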

Schedule: Daily at 4 AM
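
The decay rule itself can be isolated as a pure function, which makes it easy to unit test without `trajectoryCore`. The sketch below is a variant, not the planned implementation: it compounds one 10% reduction per full 90-day period, whereas the handler above applies a single reduction per run. The constant and function names are hypothetical.

```typescript
const DECAY_PERIOD_DAYS = 90;
const DECAY_FACTOR = 0.9; // 10% reduction per elapsed period

// Returns the confidence after one 10% reduction for every full 90-day
// period without new evidence. Compounding is an assumption of this sketch.
function decayedConfidence(confidence: number, daysSinceUpdate: number): number {
  const periods = Math.max(0, Math.floor(daysSinceUpdate / DECAY_PERIOD_DAYS));
  return confidence * Math.pow(DECAY_FACTOR, periods);
}

console.log(decayedConfidence(0.8, 30));  // no full period elapsed
console.log(decayedConfidence(0.8, 200)); // two full periods elapsed
```

A compounding rule has the advantage that the result does not depend on how often the job runs or on whether an update resets the timestamp.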

---

2. State Snapshots (Planned Q1 2026)

Purpose: Archive life state history for trajectory analysis

Implementation:

typescript
async function captureStateSnapshot(userId: string) {
  const currentState = await trajectoryCore.getState(userId);

  await db.lifeState.create({
    userId,
    thrust: currentState.thrust,
    alignment: currentState.alignment,
    gravity: currentState.gravity,
    mass: currentState.mass,
    escapeIndex: currentState.escapeIndex,
    regime: currentState.regime,
    timestamp: new Date()
  });
}

Schedule: Every 6 hours

---

3. Weekly Summary Emails (Planned Q2 2026)

Purpose: Send progress reports to users

Implementation:

typescript
async function sendWeeklySummary(userId: string) {
  const user = await db.users.findUnique({ where: { id: userId } });
  const thisWeek = await trajectoryCore.getStateHistory(userId, { days: 7 });
  const lastWeek = await trajectoryCore.getStateHistory(userId, { days: 14, offset: 7 });

  // Skip users without an account record or without history in both weeks
  if (!user || thisWeek.length === 0 || lastWeek.length === 0) return;

  const summary = {
    escapeIndexDelta: thisWeek[0].escapeIndex - lastWeek[0].escapeIndex,
    // Query shape matches the findUnique({ where }) style used above
    topInsights: await db.insights.findMany({ where: { userId }, take: 3 }),
    accomplishments: await detectAccomplishments(userId, thisWeek)
  };

  const improved = summary.escapeIndexDelta > 0;

  await emailService.send({
    to: user.email,
    subject: `TrajectoryOS Weekly: ${improved ? '📈' : '📉'} η ${improved ? 'increased' : 'decreased'}`,
    template: 'weekly-summary',
    data: summary
  });
}

Schedule: Sunday 9 AM
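
Building the subject line in a small pure helper makes the wording testable and gives a place to handle a week with no change, which a plain `> 0` check would report as a decrease. This is a sketch only; the helper names, the `epsilon` tolerance, and the neutral icon are assumptions.

```typescript
type Trend = 'increased' | 'decreased' | 'unchanged';

// Classifies the week-over-week change in escape index (η).
// `epsilon` is a hypothetical tolerance so float noise counts as no change.
function classifyDelta(delta: number, epsilon: number = 1e-9): Trend {
  if (delta > epsilon) return 'increased';
  if (delta < -epsilon) return 'decreased';
  return 'unchanged';
}

// Formats the email subject in the same style as the handler above.
function weeklySubject(delta: number): string {
  const trend = classifyDelta(delta);
  const icon = trend === 'increased' ? '📈' : trend === 'decreased' ? '📉' : '➖';
  return `TrajectoryOS Weekly: ${icon} η ${trend}`;
}

console.log(weeklySubject(0.12));
console.log(weeklySubject(0));
```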

---

4. Cleanup Jobs (Planned Q2 2026)

Purpose: Delete old data, purge expired sessions

Examples:
- Delete interview sessions older than 30 days
- Purge expired JWT tokens
- Archive old state snapshots to cold storage
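
Each cleanup rule reduces to "find records older than a retention cutoff". A minimal sketch of that selection step follows; the `SessionRecord` shape and function names are hypothetical, and a real job would more likely push the cutoff into a database query than filter in memory.

```typescript
const DAY_MS = 24 * 60 * 60 * 1000;

// Returns the instant before which records count as expired.
function retentionCutoff(now: Date, retentionDays: number): Date {
  return new Date(now.getTime() - retentionDays * DAY_MS);
}

// Hypothetical record shape, for illustration only.
interface SessionRecord {
  id: string;
  createdAt: Date;
}

// Selects sessions created strictly before the cutoff (30 days by default).
function findExpiredSessions(
  sessions: SessionRecord[],
  now: Date,
  retentionDays: number = 30,
): SessionRecord[] {
  const cutoffMs = retentionCutoff(now, retentionDays).getTime();
  return sessions.filter((session) => session.createdAt.getTime() < cutoffMs);
}

const exampleNow = new Date('2026-01-31T00:00:00Z');
console.log(retentionCutoff(exampleNow, 30).toISOString());
```

Passing `now` in explicitly keeps the helper deterministic, which matters when the same rule is exercised in tests and in a scheduled job.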

---

Configuration

Environment Variables

bash
# Redis
REDIS_URL=redis://localhost:6379

# Job Concurrency
MAX_CONCURRENT_JOBS=5
JOB_TIMEOUT_MS=300000  # 5 minutes

# Trajectory Core
TRAJECTORY_CORE_URL=http://localhost:3003

# Agent Orchestrator (when available)
AGENT_ORCHESTRATOR_URL=http://localhost:3004

# Email Service (when available)
SENDGRID_API_KEY=...
FROM_EMAIL=[email]

# Monitoring
SENTRY_DSN=...
LOG_LEVEL=info
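
Validating these variables once at startup avoids a worker that boots with a malformed concurrency or timeout value. The loader below is a sketch under assumptions: the `WorkerConfig` shape and function names are hypothetical, and the defaults simply mirror the sample values listed above.

```typescript
interface WorkerConfig {
  redisUrl: string;
  maxConcurrentJobs: number;
  jobTimeoutMs: number;
  trajectoryCoreUrl: string;
  logLevel: string;
}

// Parses a positive integer, falling back when the variable is unset.
function parsePositiveInt(raw: string | undefined, fallback: number, name: string): number {
  if (raw === undefined || raw === '') return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value) || value <= 0) {
    throw new Error(`${name} must be a positive integer, got "${raw}"`);
  }
  return value;
}

// Takes the environment as an argument so it can be tested without
// touching the real process environment.
function loadConfig(env: Record<string, string | undefined>): WorkerConfig {
  return {
    redisUrl: env.REDIS_URL ?? 'redis://localhost:6379',
    maxConcurrentJobs: parsePositiveInt(env.MAX_CONCURRENT_JOBS, 5, 'MAX_CONCURRENT_JOBS'),
    jobTimeoutMs: parsePositiveInt(env.JOB_TIMEOUT_MS, 300000, 'JOB_TIMEOUT_MS'),
    trajectoryCoreUrl: env.TRAJECTORY_CORE_URL ?? 'http://localhost:3003',
    logLevel: env.LOG_LEVEL ?? 'info',
  };
}

console.log(loadConfig({ MAX_CONCURRENT_JOBS: '8' }));
```

In the worker entry point this would typically be called once as `loadConfig(process.env)`.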

---

Job Management

Adding a New Job

1. Create job handler in `src/jobs/`:

typescript
// src/jobs/myNewJob.ts
export async function myNewJob(userId: string, params: any) {
  console.log(`Running myNewJob for ${userId}`);

  // Job logic here

  return { success: true };
}

2. Register in scheduler:

typescript
// src/scheduler.ts
import { myNewJob } from './jobs/myNewJob';

scheduler.scheduleJob('my-new-job', '0 10 * * *', async () => {
  const users = await getUsersForJob();

  for (const user of users) {
    await jobQueue.add('my-new-job', {
      userId: user.id,
      params: {}
    });
  }
});

3. Add job processor:

typescript
// src/index.ts
jobQueue.process('my-new-job', async (job) => {
  const { userId, params } = job.data;
  return await myNewJob(userId, params);
});

---

Failure Handling

Automatic Retries

Jobs retry automatically on failure when `attempts` is set:

typescript
jobQueue.add('analyze-user', { userId }, {
  attempts: 3,           // 3 attempts in total (first run + up to 2 retries)
  backoff: {
    type: 'exponential',
    delay: 60000         // Base delay of 1 min; the wait grows with each retry
  }
});
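
If a specific retry spacing is required, the delay can be computed explicitly instead of relying on a built-in strategy. The sketch below shows a capped doubling schedule as plain functions; the names and the cap are hypothetical. Bull documents custom strategies registered under the queue's `settings.backoffStrategies` option, and how the built-in `exponential` type spaces retries should be checked against the installed version.

```typescript
// Doubling backoff with an upper bound: base, 2x base, 4x base, ...
// `attemptsMade` is the number of attempts that have already failed (1-based).
function doublingBackoff(attemptsMade: number, baseMs: number, capMs: number): number {
  const exponent = Math.max(0, attemptsMade - 1);
  return Math.min(capMs, baseMs * Math.pow(2, exponent));
}

// Lists the delay before each retry for a given total attempt count.
// A job with N total attempts has at most N - 1 retries.
function retryDelays(totalAttempts: number, baseMs: number, capMs: number): number[] {
  const delays: number[] = [];
  for (let failed = 1; failed < totalAttempts; failed++) {
    delays.push(doublingBackoff(failed, baseMs, capMs));
  }
  return delays;
}

console.log(retryDelays(3, 60000, 600000));
```

The cap keeps a long retry chain from pushing a job hours into the future, which is usually undesirable for nightly work.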

Failure Tracking

typescript
// src/failureTracker.ts
export class FailureTracker {
  // jobName identifies the job type (e.g. 'analyze-user'); jobId is a single run
  async recordFailure(jobName: string, jobId: string, error: Error) {
    await db.jobFailures.create({
      jobName,
      jobId,
      error: error.message,
      stack: error.stack,
      timestamp: new Date()
    });

    // Alert if >10 failures of this job type in 1 hour. Counting per job type
    // matters because a single job ID fails at most `attempts` times.
    const recentFailures = await this.getRecentFailures(jobName, 3600000);
    if (recentFailures.length > 10) {
      await alerting.sendAlert({
        title: `Job ${jobName} failing repeatedly`,
        severity: 'high'
      });
    }
  }
}
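
The "more than 10 failures in 1 hour" rule is a sliding-window count. The sketch below shows that logic in memory, keyed by job name (the job type) rather than by individual job ID, since one job ID can only fail as many times as it has attempts. The class name is hypothetical, and in-memory state is an assumption: with several worker instances the count would need to live in a shared store such as the database or Redis.

```typescript
class FailureWindow {
  private readonly failures = new Map<string, number[]>();
  private readonly windowMs: number;
  private readonly threshold: number;

  constructor(windowMs: number, threshold: number) {
    this.windowMs = windowMs;
    this.threshold = threshold;
  }

  // Records a failure and returns true when the number of failures inside
  // the window exceeds the threshold, meaning an alert should fire.
  record(jobName: string, nowMs: number = Date.now()): boolean {
    const cutoff = nowMs - this.windowMs;
    const recent = (this.failures.get(jobName) ?? []).filter((t) => t > cutoff);
    recent.push(nowMs);
    this.failures.set(jobName, recent);
    return recent.length > this.threshold;
  }
}

const failureWindow = new FailureWindow(60 * 60 * 1000, 10);
console.log(failureWindow.record('analyze-user'));
```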

---

Monitoring

Metrics Tracked

typescript
// src/metrics.ts
export class JobMetrics {
  async recordJobCompletion(jobName: string, duration: number) {
    await metrics.histogram('job_duration_seconds', duration / 1000, {
      job: jobName
    });

    await metrics.increment('job_completed_total', {
      job: jobName
    });
  }

  async recordJobFailure(jobName: string, error: Error) {
    await metrics.increment('job_failed_total', {
      job: jobName,
      error_type: error.constructor.name
    });
  }
}

Grafana Dashboard (Planned)

Metrics to track:
- Job completion rate
- Average job duration
- Queue depth (jobs waiting)
- Failure rate by job type
- Redis memory usage
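
Two of these dashboard values, queue depth and failure rate, can be derived from per-state job counts. The sketch below assumes a counts object shaped like the result of Bull's `queue.getJobCounts()`; the `QueueHealth` type and function name are hypothetical. Completed and failed counts only reflect jobs still retained in Redis, so settings such as `removeOnComplete` affect the rate.

```typescript
// Assumed to mirror the per-state counts reported by the queue.
interface JobCounts {
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
}

interface QueueHealth {
  queueDepth: number;  // jobs that have not started yet
  failureRate: number; // failed / finished, in the range [0, 1]
}

function summarizeQueue(counts: JobCounts): QueueHealth {
  const finished = counts.completed + counts.failed;
  return {
    queueDepth: counts.waiting + counts.delayed,
    failureRate: finished === 0 ? 0 : counts.failed / finished,
  };
}

console.log(summarizeQueue({ waiting: 3, active: 1, completed: 90, failed: 10, delayed: 2 }));
```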

---

Development

Running Locally

bash
# Terminal 1: Start Redis
redis-server

# Terminal 2: Start background worker
cd services/background-worker
pnpm dev

# Terminal 3: Monitor queue (optional)
redis-cli
> KEYS bull:background-analysis:*
> LRANGE bull:background-analysis:wait 0 -1

Testing Jobs

bash
# Trigger job manually (via trajectory-core)
curl -X POST http://localhost:3003/api/admin/trigger-job \
  -H "Content-Type: application/json" \
  -d '{
    "jobType": "background-analysis",
    "userId": "user_123"
  }'

# Check job status
curl http://localhost:3003/api/admin/jobs/JOB_ID

---

Deployment

Docker

dockerfile
FROM node:20-alpine
WORKDIR /app
COPY package.json pnpm-lock.yaml ./
RUN npm install -g pnpm && pnpm install --frozen-lockfile
COPY . .
RUN pnpm build
CMD ["node", "dist/index.js"]

Docker Compose (with Redis)

yaml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  background-worker:
    build: .
    depends_on:
      - redis
    environment:
      REDIS_URL: redis://redis:6379
      TRAJECTORY_CORE_URL: http://trajectory-core:3003
    restart: unless-stopped

volumes:
  redis-data:

---

Scaling

Horizontal Scaling

Multiple worker instances can process jobs concurrently:

bash
# Run 3 worker instances
docker-compose up --scale background-worker=3

Bull automatically distributes jobs across workers.

Job Prioritization

typescript
// In Bull, lower numbers mean higher priority (1 is the highest)
await jobQueue.add('critical-analysis', { userId }, { priority: 1 });
await jobQueue.add('routine-analysis', { userId }, { priority: 10 });

---

Troubleshooting

Common Issues

Issue: Worker not processing jobs
- Check: Is Redis running? `redis-cli ping` should return `PONG`
- Check: Are there jobs in the queue? `redis-cli LLEN bull:background-analysis:wait`
- Solution: Restart worker, check REDIS_URL in `.env`

Issue: Jobs timing out
- Solution: Increase `JOB_TIMEOUT_MS` in config
- Alternative: Break job into smaller sub-jobs

Issue: Memory issues with large queues
- Solution: Increase Redis max memory: `redis-cli CONFIG SET maxmemory 2gb`
- Alternative: Use job result expiration: `removeOnComplete: { age: 3600 }`

Issue: Jobs failing silently
- Check: Worker logs for errors
- Check: Job failure table in database
- Solution: Enable detailed logging: `LOG_LEVEL=debug`

---

Performance

Current Throughput

- Single worker: ~50-100 jobs/minute (depends on job complexity)
- Job queue overhead: ~5-10ms per job
- Redis latency: <1ms (local), ~5-20ms (remote)

Optimization Tips

1. Batch operations: Process multiple users per job
2. Caching: Cache trajectory-core responses
3. Parallelization: Use worker pools within jobs
4. Job prioritization: Critical jobs skip the line
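
Tips 1 and 3 can be combined: split users into batches, then process each batch with a bounded number of calls in flight. The helpers below are a dependency-free sketch; the function names are hypothetical and the right batch size and limit depend on what trajectory-core and the agent service can absorb.

```typescript
// Splits a list into consecutive batches of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (size <= 0) throw new Error('size must be positive');
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Runs `task` over all items with at most `limit` calls in flight,
// preserving input order in the result.
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  task: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;

  // Each worker repeatedly claims the next unprocessed index.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await task(items[index]);
    }
  }

  const workerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

console.log(chunk(['u1', 'u2', 'u3', 'u4', 'u5'], 2));
```

Inside a job handler, `mapWithConcurrency(batch, 3, (userId) => analyzeUser(userId))` would keep at most three analyses running at once for that batch.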

---

Future Enhancements

Q1 2026:
- [ ] Complete LLM integration for background analysis
- [ ] Implement skill decay job
- [ ] Add state snapshot archival
- [ ] Failure alerting (Slack/PagerDuty)

Q2 2026:
- [ ] Email notification system
- [ ] Weekly summary reports
- [ ] Grafana monitoring dashboard
- [ ] Job replay/debugging tools

---

Related Documentation

- [Service Architecture](../../docs/architecture/services.md) - System overview
- [Trajectory Core](../trajectory-core/README.md) - Primary integration point
- [Agent Orchestrator](../agent-orchestrator/README.md) - LLM service for analysis

---

License

MIT

---

Maintained by: TrajectoryOS Core Team
Last Updated: December 21, 2025
Service Status: 🚧 Beta (30%)
Next Milestone: LLM-powered background analysis (Q1 2026)

Source Anchor

Comp-Core/backend/cc-trajectory/services/background-worker/README.md