Background Worker Service
Asynchronous job processor for TrajectoryOS. Handles periodic tasks like background analysis, skill decay, notification generation, and data aggregation.
Status: 🚧 Beta - Basic job infrastructure exists, scheduled tasks partially implemented
---
Quick Start
### Prerequisites
- Node.js 20+
- pnpm
- Redis (for job queue)
Installation
```bash
# From repository root
cd services/background-worker

# Install dependencies
pnpm install
```
Development
```bash
# Start Redis (required)
redis-server

# Start worker
pnpm dev
# Worker connects to Redis and starts processing jobs
```
---
Architecture
Directory Structure
```
background-worker/
├── src/
│   ├── index.ts                        # Worker initialization
│   ├── scheduler.ts                    # Job scheduling logic
│   ├── failureTracker.ts               # Failure detection & retry
│   ├── userFilter.ts                   # User selection for jobs
│   ├── metrics.ts                      # Performance tracking
│   │
│   └── jobs/                           # Job handlers
│       └── runBackgroundGeneration.ts  # Background analysis job
│
├── package.json
└── README.md
```
---
Core Concepts
Job Queue
Uses Bull (Redis-backed job queue) for reliable async processing:
```typescript
import Queue from 'bull';

const analysisQueue = new Queue('background-analysis', {
  redis: { host: 'localhost', port: 6379 }
});

// Add a named job to the queue (top-level await requires an ES module)
await analysisQueue.add('analyze-user', {
  userId: 'user_123',
  reason: 'nightly_analysis'
});

// Process jobs with that name; analyzeUser() stands in for the job logic
analysisQueue.process('analyze-user', async (job) => {
  const { userId } = job.data;
  await analyzeUser(userId);
});
```
Scheduling
Jobs run on schedules:
| Job | Schedule | Purpose |
|---|---|---|
| Nightly Analysis | Daily 2 AM | Generate insights, detect leverage points |
| Skill Decay | Daily 4 AM | Update skill confidence based on time |
| Weekly Summary | Sunday 9 AM | Send weekly progress email |
| State Snapshots | Every 6 hours | Archive life state history |
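The schedule table maps onto standard five-field cron expressions (minute, hour, day of month, month, day of week). A deliberately simplified sketch, assuming the worker's local time zone; `SCHEDULES`, `matchesField` and `matchesCron` are hypothetical names, and the matcher only understands `*`, `*/n` and plain numbers (real cron also supports ranges, lists, and OR-combines day-of-month with day-of-week).

```typescript
// Hypothetical cron strings for the schedule table (worker's local time zone).
const SCHEDULES: Record<string, string> = {
  'nightly-analysis': '0 2 * * *',  // daily at 02:00
  'skill-decay': '0 4 * * *',       // daily at 04:00
  'weekly-summary': '0 9 * * 0',    // Sundays (day 0) at 09:00
  'state-snapshots': '0 */6 * * *'  // 00:00, 06:00, 12:00, 18:00
};

// Matches one field: "*", "*/n" (counted from 0, so only valid for
// minute, hour and day-of-week fields), or a plain number.
function matchesField(field: string, value: number): boolean {
  if (field === '*') return true;
  if (field.startsWith('*/')) {
    const step = Number(field.slice(2));
    return Number.isInteger(step) && step > 0 && value % step === 0;
  }
  return Number(field) === value;
}

// True if `date` (local time, minute precision) matches the 5-field expression.
function matchesCron(expr: string, date: Date): boolean {
  const fields = expr.trim().split(/\s+/);
  if (fields.length !== 5) throw new Error(`Expected 5 cron fields: "${expr}"`);
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields;
  return (
    matchesField(minute, date.getMinutes()) &&
    matchesField(hour, date.getHours()) &&
    matchesField(dayOfMonth, date.getDate()) &&
    matchesField(month, date.getMonth() + 1) && // cron months are 1-12
    matchesField(dayOfWeek, date.getDay())      // 0 = Sunday
  );
}
```

With Bull, each expression could be registered once as a repeatable job, for example `queue.add('skill-decay', {}, { repeat: { cron: SCHEDULES['skill-decay'] } })`; Bull's `repeat` options also accept a `tz` if the schedule should not follow the server's zone.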
---
Current Implementation
What Exists (30
✅ Job Infrastructure:
- Bull queue setup
- Worker process
- Failure tracking
- Metrics collection
✅ User Filtering:
- Select users for batch jobs
- Avoid overloading agent service
- Gradual rollout capability
🚧 Jobs (Partial):
- `runBackgroundGeneration` - Exists but needs LLM integration
- State snapshots - Not started
- Skill decay - Not started
- Email notifications - Not started
---
Implemented Jobs
Background Analysis (`runBackgroundGeneration`)
Status: 🚧 Partial - Framework exists, awaiting agent-orchestrator LLM integration
Purpose: Generate nightly insights for users
Current Implementation:
```typescript
// src/jobs/runBackgroundGeneration.ts
export async function runBackgroundGeneration(userId: string) {
  console.log(`Running background analysis for ${userId}`);

  // 1. Fetch user state
  const state = await trajectoryCore.getState(userId);

  // 2. TODO: Call agent-orchestrator for LLM analysis
  // const insights = await agent.generateInsights(userId, state);

  // 3. TODO: Store insights
  // await db.insights.create({ userId, insights });

  console.log(`Completed analysis for ${userId}`);
}
```
Planned (Q1 2026):
- Integration with agent-orchestrator LLM service
- Leverage point detection
- Conflict warnings
- Trajectory predictions
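Once agent-orchestrator exposes an insights call, the three TODO steps could be wired through small interfaces so the job can be unit-tested without live services. A sketch only: `StateClient`, `InsightGenerator`, `InsightStore`, the `Insight` shape and `generateAndStoreInsights` are hypothetical names, not the service's actual types, and returning the insight count is an illustrative choice.

```typescript
// Hypothetical shapes; the real trajectory-core / agent-orchestrator types may differ.
interface Insight { kind: string; text: string }
interface StateClient { getState(userId: string): Promise<Record<string, unknown>> }
interface InsightGenerator {
  generateInsights(userId: string, state: Record<string, unknown>): Promise<Insight[]>;
}
interface InsightStore { save(userId: string, insights: Insight[]): Promise<void> }
interface Deps { core: StateClient; agent: InsightGenerator; store: InsightStore }

// The same three steps, with dependencies passed in instead of imported.
async function generateAndStoreInsights(userId: string, deps: Deps): Promise<number> {
  const state = await deps.core.getState(userId);                    // 1. fetch user state
  const insights = await deps.agent.generateInsights(userId, state); // 2. LLM analysis
  if (insights.length > 0) {
    await deps.store.save(userId, insights);                         // 3. store insights
  }
  return insights.length;
}
```

In tests the three dependencies can be in-memory fakes; in the worker they would wrap the trajectory-core and agent-orchestrator HTTP clients.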
---
Planned Jobs
1. Skill Decay (Planned Q1 2026)
Purpose: Reduce skill confidence for unused skills over time
Implementation:
```typescript
async function processSkillDecay(userId: string) {
  const MS_PER_DAY = 1000 * 60 * 60 * 24;
  const skills = await trajectoryCore.getSkills(userId);
  const now = Date.now();

  for (const skill of skills) {
    // Whole days since the last update (updatedAt may be a Date or epoch ms)
    const daysSinceUpdate = Math.floor((now - new Date(skill.updatedAt).getTime()) / MS_PER_DAY);

    if (daysSinceUpdate > 90) {
      // Decay confidence by 10%. Assumes updateSkill() refreshes updatedAt;
      // otherwise this daily job would re-apply the decay every day.
      const newConfidence = skill.confidence * 0.9;

      await trajectoryCore.updateSkill(skill.id, {
        confidence: newConfidence,
        decayReason: `No evidence in ${daysSinceUpdate} days`
      });
    }
  }
}
```
Schedule: Daily at 4 AM
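Because the job runs daily, one way to avoid compounding the decay on every run is to derive the decayed value from the time since the last evidence, applying 10% per full 90-day period. A sketch assuming each skill keeps an undecayed `baseConfidence` and a `lastEvidenceAt` timestamp (both hypothetical fields); `decayedConfidence` is a hypothetical helper, not part of trajectory-core.

```typescript
const DECAY_PERIOD_DAYS = 90;
const DECAY_FACTOR = 0.9; // keep 90% of confidence per idle period
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Hypothetical fields: baseConfidence is the confidence at the last evidence,
// lastEvidenceAt is when that evidence was recorded.
interface DecayableSkill {
  baseConfidence: number;
  lastEvidenceAt: Date;
}

// base * 0.9^(number of full 90-day periods since the last evidence)
function decayedConfidence(skill: DecayableSkill, now: Date): number {
  const idleDays = Math.floor((now.getTime() - skill.lastEvidenceAt.getTime()) / MS_PER_DAY);
  const periods = Math.max(0, Math.floor(idleDays / DECAY_PERIOD_DAYS));
  return skill.baseConfidence * Math.pow(DECAY_FACTOR, periods);
}
```

Since the result depends only on `now` and the stored fields, a retried or repeated 4 AM run writes the same value. This version applies the first decay on day 90, while the `> 90` check in the job above first fires on day 91.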
---
2. State Snapshots (Planned Q1 2026)
Purpose: Archive life state history for trajectory analysis
Implementation:
```typescript
async function captureStateSnapshot(userId: string) {
  const currentState = await trajectoryCore.getState(userId);

  await db.lifeState.create({
    userId,
    thrust: currentState.thrust,
    alignment: currentState.alignment,
    gravity: currentState.gravity,
    mass: currentState.mass,
    escapeIndex: currentState.escapeIndex,
    regime: currentState.regime,
    timestamp: new Date()
  });
}
```
Schedule: Every 6 hours
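Retries or overlapping scheduler runs could capture the same 6-hour window twice. One option, sketched here as an assumption rather than current behavior, is to derive a deterministic ID from the user and the window start and pass it as Bull's `jobId` job option: Bull does not add a job whose ID already exists while the earlier job is still stored in Redis. `snapshotWindowStart` and `snapshotJobId` are hypothetical helpers; the ID avoids `:` to stay clear of Redis key separators.

```typescript
const SNAPSHOT_WINDOW_MS = 6 * 60 * 60 * 1000; // 6 hours

// Start of the UTC-aligned 6-hour window containing `at` (00, 06, 12, 18 UTC).
function snapshotWindowStart(at: Date): Date {
  return new Date(Math.floor(at.getTime() / SNAPSHOT_WINDOW_MS) * SNAPSHOT_WINDOW_MS);
}

// Deterministic job ID: at most one snapshot job per user per window.
function snapshotJobId(userId: string, at: Date): string {
  return `snapshot_${userId}_${snapshotWindowStart(at).getTime()}`;
}
```

Usage: `await jobQueue.add('state-snapshot', { userId }, { jobId: snapshotJobId(userId, new Date()) });`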
---
3. Weekly Summary Emails (Planned Q2 2026)
Purpose: Send progress reports to users
Implementation:
```typescript
async function sendWeeklySummary(userId: string) {
  const user = await db.users.findUnique({ where: { id: userId } });
  if (!user) return; // skip users that no longer exist

  const thisWeek = await trajectoryCore.getStateHistory(userId, { days: 7 });
  const lastWeek = await trajectoryCore.getStateHistory(userId, { days: 14, offset: 7 });

  const escapeIndexDelta = thisWeek[0].escapeIndex - lastWeek[0].escapeIndex;
  const summary = {
    escapeIndexDelta,
    topInsights: await db.insights.findMany({ where: { userId }, take: 3 }),
    accomplishments: await detectAccomplishments(userId, thisWeek)
  };

  // A zero delta reads as "unchanged" rather than "decreased"
  const trend =
    escapeIndexDelta > 0 ? '📈 η increased' :
    escapeIndexDelta < 0 ? '📉 η decreased' :
    'η unchanged';

  await emailService.send({
    to: user.email,
    subject: `TrajectoryOS Weekly: ${trend}`,
    template: 'weekly-summary',
    data: summary
  });
}
```
Schedule: Sunday 9 AM
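`thisWeek[0]` and `lastWeek[0]` only compare the right two points if `getStateHistory` returns snapshots newest-first, which the snippet leaves implicit. A sketch that makes the comparison explicit, assuming each snapshot carries `escapeIndex` and `timestamp` as in the state snapshot job; `latestIn` and `weeklyEscapeIndexDelta` are hypothetical helpers that compare the latest snapshot in each of the two 7-day windows before `now`.

```typescript
interface Snapshot { escapeIndex: number; timestamp: Date }

const WEEK_MS = 7 * 24 * 60 * 60 * 1000;

// Latest snapshot with start <= timestamp < end, in any input order.
function latestIn(history: Snapshot[], start: number, end: number): Snapshot | undefined {
  let best: Snapshot | undefined;
  for (const s of history) {
    const t = s.timestamp.getTime();
    if (t >= start && t < end && (!best || t > best.timestamp.getTime())) best = s;
  }
  return best;
}

// Escape index change from last week's latest snapshot to this week's.
// Returns null when either week has no snapshot (e.g. a new user).
function weeklyEscapeIndexDelta(history: Snapshot[], now: Date): number | null {
  const end = now.getTime();
  const current = latestIn(history, end - WEEK_MS, end + 1); // include `now` itself
  const previous = latestIn(history, end - 2 * WEEK_MS, end - WEEK_MS);
  if (!current || !previous) return null;
  return current.escapeIndex - previous.escapeIndex;
}
```

A `null` result could skip the email or fall back to a neutral subject line.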
---
4. Cleanup Jobs (Planned Q2 2026)
Purpose: Delete old data, purge expired sessions
Examples:
- Delete interview sessions older than 30 days
- Purge expired JWT tokens
- Archive old state snapshots to cold storage
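The retention rules above reduce to cutoff comparisons. A sketch with hypothetical record shapes: the 30-day window for interview sessions comes from the list, while token expiry is read from each stored token's own `expiresAt` (an assumed field). The archive age for state snapshots is not specified in this README, so it is left out.

```typescript
const DAY_MS = 24 * 60 * 60 * 1000;
const INTERVIEW_RETENTION_DAYS = 30;

// Hypothetical record shapes for the cleanup targets.
interface InterviewSession { id: string; createdAt: Date }
interface StoredToken { id: string; expiresAt: Date }

// Instant before which records are considered expired.
function cutoff(now: Date, days: number): Date {
  return new Date(now.getTime() - days * DAY_MS);
}

// IDs of interview sessions older than the retention window.
function expiredSessionIds(sessions: InterviewSession[], now: Date): string[] {
  const limit = cutoff(now, INTERVIEW_RETENTION_DAYS).getTime();
  return sessions.filter((s) => s.createdAt.getTime() < limit).map((s) => s.id);
}

// IDs of stored tokens whose own expiry time has passed.
function expiredTokenIds(tokens: StoredToken[], now: Date): string[] {
  return tokens.filter((t) => t.expiresAt.getTime() <= now.getTime()).map((t) => t.id);
}
```

With a Prisma-style client like the `db.users.findUnique` call in the weekly summary job, the session rule would become a `deleteMany` with a `where: { createdAt: { lt: cutoff(now, 30) } }` filter instead of an in-memory scan.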
---
Configuration
Environment Variables
```bash
# Redis
REDIS_URL=redis://localhost:6379

# Job Concurrency
MAX_CONCURRENT_JOBS=5
JOB_TIMEOUT_MS=300000 # 5 minutes

# Trajectory Core
TRAJECTORY_CORE_URL=http://localhost:3003

# Agent Orchestrator (when available)
AGENT_ORCHESTRATOR_URL=http://localhost:3004

# Email Service (when available)
SENDGRID_API_KEY=...
FROM_EMAIL=[email]

# Monitoring
SENTRY_DSN=...
LOG_LEVEL=info
```
---
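The Configuration variables above could be parsed once at startup so that a bad value, such as a non-numeric `MAX_CONCURRENT_JOBS`, fails fast instead of surfacing mid-job. A sketch with hypothetical names (`WorkerConfig`, `positiveInt`, `loadConfig`); treating the example values above as defaults is an assumption.

```typescript
interface WorkerConfig {
  redisUrl: string;
  maxConcurrentJobs: number;
  jobTimeoutMs: number;
  trajectoryCoreUrl: string;
  agentOrchestratorUrl?: string; // optional until the service exists
  logLevel: string;
}

type Env = Record<string, string | undefined>;

// Parse a positive integer variable, using `fallback` when it is unset.
function positiveInt(env: Env, key: string, fallback: number): number {
  const raw = env[key];
  if (raw === undefined || raw === '') return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value) || value <= 0) {
    throw new Error(`${key} must be a positive integer, got "${raw}"`);
  }
  return value;
}

function loadConfig(env: Env): WorkerConfig {
  return {
    redisUrl: env.REDIS_URL ?? 'redis://localhost:6379',
    maxConcurrentJobs: positiveInt(env, 'MAX_CONCURRENT_JOBS', 5),
    jobTimeoutMs: positiveInt(env, 'JOB_TIMEOUT_MS', 300000),
    trajectoryCoreUrl: env.TRAJECTORY_CORE_URL ?? 'http://localhost:3003',
    agentOrchestratorUrl: env.AGENT_ORCHESTRATOR_URL,
    logLevel: env.LOG_LEVEL ?? 'info'
  };
}
```

Usage: `const config = loadConfig(process.env);` at the top of `src/index.ts`.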
Job Management
Adding a New Job
1. Create job handler in `src/jobs/`:
```typescript
// src/jobs/myNewJob.ts
export async function myNewJob(userId: string, params: any) {
  console.log(`Running myNewJob for ${userId}`);
  // Job logic here
  return { success: true };
}
```
2. Register in scheduler:
```typescript
// src/scheduler.ts
// The scheduler only enqueues work, so it does not import the handler itself.
scheduler.scheduleJob('my-new-job', '0 10 * * *', async () => {
  const users = await getUsersForJob();

  for (const user of users) {
    await jobQueue.add('my-new-job', {
      userId: user.id,
      params: {}
    });
  }
});
```
3. Add job processor:
```typescript
// src/index.ts
import { myNewJob } from './jobs/myNewJob';

jobQueue.process('my-new-job', async (job) => {
  const { userId, params } = job.data;
  return await myNewJob(userId, params);
});
```
---
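The three steps for adding a job spell the name `'my-new-job'` in two different files. One way to keep them in sync, sketched under assumptions, is a single typed registry that both the scheduler and the processor read; `JOB_HANDLERS`, `JobName`, `JobData` and `dispatch` are hypothetical names, and `myNewJob` here is a stand-in for the handler from step 1.

```typescript
// Stand-in for the handler from step 1 (src/jobs/myNewJob.ts).
async function myNewJob(userId: string, params: Record<string, unknown>) {
  console.log(`Running myNewJob for ${userId}`);
  return { success: true };
}

interface JobData { userId: string; params: Record<string, unknown> }

// Hypothetical registry: the single place a job name is spelled out.
const JOB_HANDLERS = {
  'my-new-job': myNewJob
};

type JobName = keyof typeof JOB_HANDLERS;

// Route a job's payload to its handler; misspelled literal names fail to compile.
function dispatch(name: JobName, data: JobData) {
  return JOB_HANDLERS[name](data.userId, data.params);
}
```

The processor could then register every entry in a loop, e.g. `for (const name of Object.keys(JOB_HANDLERS) as JobName[]) jobQueue.process(name, (job) => dispatch(name, job.data));`, and the scheduler would take its names from `JobName` instead of free-form strings.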
Failure Handling
Automatic Retries
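Jobs retry on failure when they are added with an `attempts` option:
```typescript
jobQueue.add('analyze-user', { userId }, {
  attempts: 3, // 3 attempts in total: the first run plus up to 2 retries
  backoff: {
    type: 'exponential',
    delay: 60000 // base delay of 1 min, growing exponentially between attempts
  }
});
```
Failure Tracking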
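The alert rule in `src/failureTracker.ts`, more than 10 failures in an hour, is a sliding-window count. An in-memory sketch of just that rule, useful for unit-testing the threshold without a database; `FailureWindow` is a hypothetical name, it keys counts by job name, and its state lives in one process, so several worker instances would still need the shared `db.jobFailures` table the tracker writes to.

```typescript
// In-memory sliding window: alert when a job type exceeds `threshold`
// failures within the last `windowMs` milliseconds.
class FailureWindow {
  private readonly failures = new Map<string, number[]>();
  private readonly windowMs: number;
  private readonly threshold: number;

  constructor(windowMs = 60 * 60 * 1000, threshold = 10) {
    this.windowMs = windowMs;
    this.threshold = threshold;
  }

  // Records one failure at nowMs; returns true if the alert should fire.
  record(jobName: string, nowMs: number): boolean {
    const cutoff = nowMs - this.windowMs;
    // Drop failures that have aged out of the window, then add this one.
    const recent = (this.failures.get(jobName) ?? []).filter((t) => t > cutoff);
    recent.push(nowMs);
    this.failures.set(jobName, recent);
    return recent.length > this.threshold;
  }
}
```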
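```typescript
// src/failureTracker.ts
// db, alerting and this.getRecentFailures() are service internals not shown here.
export class FailureTracker {
  async recordFailure(jobId: string, error: Error) {
    await db.jobFailures.create({
      jobId,
      error: error.message,
      stack: error.stack,
      timestamp: new Date()
    });

    // Alert if >10 failures in 1 hour. For this to fire, jobId has to name the
    // job type (e.g. 'analyze-user'): a single Bull job fails at most `attempts` times.
    const recentFailures = await this.getRecentFailures(jobId, 3600000);
    if (recentFailures.length > 10) {
      await alerting.sendAlert({
        title: `Job ${jobId} failing repeatedly`,
        severity: 'high'
      });
    }
  }
}
```
---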
Monitoring
Metrics Tracked
```typescript
// src/metrics.ts
export class JobMetrics {
  async recordJobCompletion(jobName: string, duration: number) {
    await metrics.histogram('job_duration_seconds', duration / 1000, {
      job: jobName
    });

    await metrics.increment('job_completed_total', {
      job: jobName
    });
  }

  async recordJobFailure(jobName: string, error: Error) {
    await metrics.increment('job_failed_total', {
      job: jobName,
      error_type: error.constructor.name
    });
  }
}
```
Grafana Dashboard (Planned)
Metrics to track:
- Job completion rate
- Average job duration
- Queue depth (jobs waiting)
- Failure rate by job type
- Redis memory usage
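Completion rate and failure rate by job type can both be derived from the two counters `JobMetrics` emits. A sketch assuming the counter totals have already been read into plain objects keyed by job name (how they are read depends on the metrics backend, which this README does not specify); `CounterByJob` and `failureRates` are hypothetical.

```typescript
// Counter totals keyed by job name, e.g. from job_completed_total / job_failed_total.
type CounterByJob = Record<string, number>;

// Failure rate per job type: failed / (completed + failed).
// Job types with no recorded runs are left out.
function failureRates(completed: CounterByJob, failed: CounterByJob): Record<string, number> {
  const rates: Record<string, number> = {};
  const jobs = Object.keys({ ...completed, ...failed }); // union of job names
  for (const job of jobs) {
    const ok = completed[job] ?? 0;
    const bad = failed[job] ?? 0;
    const total = ok + bad;
    if (total > 0) rates[job] = bad / total;
  }
  return rates;
}
```

The completion rate for a job type is then `1 - rate`.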
---
Development
Running Locally
```bash
# Terminal 1: Start Redis
redis-server

# Terminal 2: Start background worker
cd services/background-worker
pnpm dev

# Terminal 3: Monitor queue (optional)
redis-cli
> KEYS bull:background-analysis:*
> LRANGE bull:background-analysis:wait 0 -1
```
Testing Jobs
```bash
# Trigger job manually (via trajectory-core)
curl -X POST http://localhost:3003/api/admin/trigger-job \
  -H "Content-Type: application/json" \
  -d '{
    "jobType": "background-analysis",
    "userId": "user_123"
  }'

# Check job status
curl http://localhost:3003/api/admin/jobs/JOB_ID
```
---
Deployment
Docker
```dockerfile
FROM node:20-alpine

WORKDIR /app

COPY package.json pnpm-lock.yaml ./
RUN npm install -g pnpm && pnpm install --frozen-lockfile

COPY . .
RUN pnpm build

CMD ["node", "dist/index.js"]
```
Docker Compose (with Redis)
```yaml
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  background-worker:
    build: .
    depends_on:
      - redis
    environment:
      REDIS_URL: redis://redis:6379
      TRAJECTORY_CORE_URL: http://trajectory-core:3003
    restart: unless-stopped

volumes:
  redis-data:
```
---
Scaling
Horizontal Scaling
Multiple worker instances can process jobs concurrently:
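```bash
# Run 3 worker instances
docker-compose up --scale background-worker=3
```
Bull distributes jobs across workers automatically: every instance consumes from the same Redis queue, and each job is handed to one worker at a time.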
Job Prioritization
```typescript
// In Bull, priority 1 is the highest; larger numbers are processed later
await jobQueue.add('critical-analysis', { userId }, { priority: 1 });
await jobQueue.add('routine-analysis', { userId }, { priority: 10 });
```
---
Troubleshooting
Common Issues
Issue: Worker not processing jobs
- Check: Is Redis running? `redis-cli ping` should return `PONG`
- Check: Are there jobs in the queue? `redis-cli LLEN bull:background-analysis:wait`
- Solution: Restart worker, check REDIS_URL in `.env`
Issue: Jobs timing out
- Solution: Increase `JOB_TIMEOUT_MS` in config
- Alternative: Break job into smaller sub-jobs
Issue: Memory issues with large queues
- Solution: Increase Redis max memory: `redis-cli CONFIG SET maxmemory 2gb`, keeping `maxmemory-policy` at `noeviction` so Redis never evicts queue keys
- Alternative: Use job result expiration: `removeOnComplete: { age: 3600 }`
Issue: Jobs failing silently
- Check: Worker logs for errors
- Check: Job failure table in database
- Solution: Enable detailed logging: `LOG_LEVEL=debug`
---
Performance
Current Throughput
- Single worker: ~50-100 jobs/minute (depends on job complexity)
- Job queue overhead: ~5-10ms per job
- Redis latency: <1ms (local), ~5-20ms (remote)
Optimization Tips
1. Batch operations: Process multiple users per job
2. Caching: Cache trajectory-core responses
3. Parallelization: Use worker pools within jobs
4. Job prioritization: Critical jobs skip the line
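Tips 1 and 3 combine naturally: one job processes a batch of users while capping how many calls to trajectory-core are in flight at once. A minimal sketch of a bounded-concurrency map; `mapWithConcurrency` is a hypothetical helper, and where its limit comes from (for example `MAX_CONCURRENT_JOBS` or a separate setting) is an open choice.

```typescript
// Runs `fn` over `items` with at most `limit` calls in flight,
// returning results in input order. The first rejection rejects the batch.
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>
): Promise<R[]> {
  if (!Number.isInteger(limit) || limit < 1) throw new Error('limit must be a positive integer');
  const results = new Array<R>(items.length);
  let next = 0;

  // Each worker claims the next unprocessed index until none remain.
  // JavaScript is single-threaded, so `next++` cannot race between awaits.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await fn(items[index]);
    }
  }

  const workers = Array.from({ length: Math.min(limit, items.length) }, () => worker());
  await Promise.all(workers);
  return results;
}
```

Usage: `await mapWithConcurrency(userIds, 5, (id) => trajectoryCore.getState(id));` fetches state for a batch with at most five requests outstanding.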
---
Future Enhancements
Q1 2026:
- [ ] Complete LLM integration for background analysis
- [ ] Implement skill decay job
- [ ] Add state snapshot archival
- [ ] Failure alerting (Slack/PagerDuty)
Q2 2026:
- [ ] Email notification system
- [ ] Weekly summary reports
- [ ] Grafana monitoring dashboard
- [ ] Job replay/debugging tools
---
Related Documentation
- [Service Architecture](../../docs/architecture/services.md) - System overview
- [Trajectory Core](../trajectory-core/README.md) - Primary integration point
- [Agent Orchestrator](../agent-orchestrator/README.md) - LLM service for analysis
---
License
MIT
---
Maintained by: TrajectoryOS Core Team
Last Updated: December 21, 2025
Service Status: 🚧 Beta (30
Next Milestone: LLM-powered background analysis (Q1 2026)
Source Anchor
Comp-Core/backend/cc-trajectory/services/background-worker/README.md