# DEP — Pipeline Protocol Architecture Audit
Date: 2026-02-23
Scope: Full architecture review — security, schema, code quality, dependencies, observability bridge
Systems: Supabase Edge Functions, Pipeline Protocol DB, Nexus Portal, Nexus Exporter, Prefect Watcher, Grafana, Alert Rules

---

## Executive Summary

The Pipeline Protocol is a 3-table Supabase schema (`pipeline_definitions`, `pipeline_runs`, `pipeline_step_logs`) with 2 VIEWs, 1 trigger, and a shared TypeScript module consumed by 3 edge functions. It bridges to Nexus observability via a Prometheus exporter, Grafana dashboard, Prefect watcher, and a Next.js portal page.

Overall status: Functionally complete but not production-hardened.

The system works end-to-end (verified: run created, step logged, run completed, metrics collected). However, the audit uncovered 6 critical, 19 high, 18 medium, and 5 low severity findings (48 in total across Sections I to III) spanning security, schema integrity, code quality, and dependency management.

The most urgent issues are:
1. Hardcoded Supabase keys in committed source (nexus-portal)
2. `db-migrate-temp` edge function still deployed (arbitrary DDL execution)
3. Pipeline core tables have no migration files (schema drift)
4. `ensureRun` race condition under concurrent dispatch
5. Column name drift between migrations and edge functions (`sweep_id` vs `campaign_id`, `base_quantity` vs `boxes_per_delivery`)

---

## I. Security Findings (19 total)

### CRITICAL (3)

| # | Finding | Location | Risk |
|---|---------|----------|------|
| S1 | **Hardcoded Supabase anon JWT + project ref in client-side source.** 9-year expiry token committed to git. | `nexus-portal/src/lib/api.ts:87-89` | Key extraction from browser bundle grants read access to all pipeline data |
| S2 | **`toggle_pipeline_run_pause` RPC has no auth guard.** SECURITY DEFINER function callable by anon role; anyone with the leaked key can pause/unpause any pipeline run. | RPC function (applied via temp edge fn) | State manipulation by unauthenticated callers |
| S3 | **Pipeline tables may lack RLS policies.** Tables created via Management API, not tracked in migrations. Comment says "SELECT RLS for anon" but no migration confirms this. If RLS is not enabled, anon key grants full table access. | `pipeline_runs`, `pipeline_step_logs`, `pipeline_definitions` | Full data exposure via REST API |

Remediation priority: Rotate anon key, add auth guard to RPC, verify and formalize RLS policies in migration.

### HIGH (7)

| # | Finding | Location |
|---|---------|----------|
| S4 | **`db-migrate-temp` edge function not deleted.** Accepts raw DDL via `sql.unsafe()` with direct Postgres connection. | `supabase/functions/db-migrate-temp/` |
| S5 | **Wildcard CORS (`*`) on all 4 edge functions.** Combined with bearer token auth, enables cross-origin mutation from any domain. | All function `index.ts` files |
| S6 | **No auth header validation before creating service-role Supabase client.** Functions rely entirely on Supabase's `verify_jwt` flag (which was deployed as `false`). | `reconstruct-delivery-grid`, `territory-pipeline`, `territory-scheduler` |
| S7 | **Unvalidated `agent_id` passed directly to DB query.** No UUID format check. `select('*')` leaks full agent config in error responses. | `territory-pipeline/index.ts:84-101` |
| S8 | **Unbounded `agent_ids` array accepted with no length cap.** Combined with `force=true`, enables DoS via mass parallel dispatch. | `territory-scheduler/index.ts:17-34` |
| S9 | **Nexus exporter binds to `[ip]:9120` with no authentication.** Exposes service names, agent counts, rate limits, pipeline slugs to any network host. | `nexus_exporter.py:362` |
| S10 | **Service role key used without validation in pipeline watcher.** Error messages from pipeline runs posted to Discord may contain stack traces and table names. | `pipeline_watcher.py:30` |

### MEDIUM (6)

| # | Finding | Location |
|---|---------|----------|
| S11 | **Discord webhook URL redirectable via DB setting manipulation in trigger function.** | `notify_pipeline_step_failure()` |
| S12 | **PostgREST filter params interpolated without URL encoding or allowlist.** | `nexus-portal/api.ts:169-173` |
| S13 | **`runId` interpolated into step logs URL without UUID validation.** | `nexus-portal/api.ts:177-180` |
| S14 | **Raw `Error.message` (with DB table/constraint names) returned in HTTP 500 bodies.** | All edge function error handlers |
| S15 | **Internal Tailscale IPs (`[ip]`) and ports hardcoded in committed source.** | `nexus-portal/config.ts:8-9` |
| S16 | **Silent RLS deny on `sweep_campaigns`/`sweep_prospects`.** Returns an empty set, not 403. May mask permission failures as empty data. | Migrations 004, 005 |

### LOW (3)

| # | Finding | Location |
|---|---------|----------|
| S17 | **Discord `fetch()` error may leak webhook URL in scheduler response body.** | `territory-scheduler/index.ts:127` |
| S18 | **Alert rule runbooks expose internal SSH hostnames, file paths, and SQL queries via Prometheus/Alertmanager API.** | `alert-rules.yml` |
| S19 | **`ensureRun` error propagates raw DB error text into step log storage and portal UI.** | `pipeline-protocol.ts:123` |
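
Findings S7, S8, and S13 share one gap: identifiers from the request reach a query or URL without a format check or a size limit. A minimal sketch of that validation follows, assuming agent and run IDs are UUIDs as the findings imply. The helper names `isUuid` and `parseAgentIds` and the default cap of 50 are hypothetical and do not appear in the audited code.

```typescript
// Hypothetical helpers: names and the default cap are illustrative assumptions.
// Accepts any 8-4-4-4-12 hex string, matching what Postgres accepts for uuid input.
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

function isUuid(value: unknown): value is string {
  return typeof value === "string" && UUID_RE.test(value);
}

type ParseResult =
  | { ok: true; agentIds: string[] }
  | { ok: false; error: string };

// Validates an `agent_ids` request field: must be an array, bounded in length,
// and contain only UUID strings. Duplicates are collapsed after lowercasing.
function parseAgentIds(input: unknown, maxAgents = 50): ParseResult {
  if (!Array.isArray(input)) {
    return { ok: false, error: "agent_ids must be an array" };
  }
  if (input.length > maxAgents) {
    return { ok: false, error: `agent_ids exceeds cap of ${maxAgents}` };
  }
  const ids: string[] = [];
  for (const item of input) {
    if (!isUuid(item)) {
      return { ok: false, error: "agent_ids contains a non-UUID value" };
    }
    ids.push(item.toLowerCase());
  }
  return { ok: true, agentIds: Array.from(new Set(ids)) };
}
```

A handler would return HTTP 400 with the `error` string when `ok` is false, before any service-role client is created. The same `isUuid` check would apply to `runId` in the portal before it is interpolated into a URL.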

---

## II. Schema & Data Integrity Findings (14 total)

### CRITICAL (3)

| # | Finding | Impact |
|---|---------|--------|
| D1 | **Pipeline core tables have no migration file.** Created via Management API, not in version control. `supabase db reset`, branch deploy, or fresh setup will fail. | Cannot reproduce environment |
| D2 | **`sweep_campaigns`: TABLE vs VIEW conflict.** Migration 004 creates it as a table; migration 100004 creates it as a VIEW. On fresh deploy one of these will fail. | Broken migration chain |
| D3 | **`sweep_prospects.sweep_id` vs `campaign_id` column name drift.** Migrations define `campaign_id`; all territory-pipeline queries use `sweep_id`. Either queries return 0 rows silently, or column was added manually and is untracked. | All territory pipeline steps may be no-ops |

### HIGH (6)

| # | Finding | Impact |
|---|---------|--------|
| D4 | **No FK from `pipeline_runs.pipeline_slug` to `pipeline_definitions.slug`.** Orphaned runs with invalid slugs can accumulate. | Data integrity gap |
| D5 | **Missing indexes on `pipeline_runs`** for `ensureRun()` lookup `(pipeline_slug, run_key, status)`, metrics VIEW `GROUP BY`, and scheduler `next_run_at` filter. | Full table scans on every invocation |
| D6 | **Missing index on `pipeline_step_logs.run_id`.** VIEW join and step log queries do full scans. | O(n) query cost grows with log volume |
| D7 | **`sweep_territory_agents.pipeline_run_id` FK not indexed.** Postgres FK enforcement requires scanning this on `pipeline_runs` DELETE. | Slow cascading deletes |
| D8 | **`partner_delivery_configurations` and `weekly_delivery_tracking` have no migration files.** The upsert on `(partner_id, week_start_date)` requires a unique constraint that may not exist. | Upsert may fail at runtime |
| D9 | **`base_quantity` vs `boxes_per_delivery` column name drift.** `territory-pipeline` writes `boxes_per_delivery`; `reconstruct-delivery-grid` reads `base_quantity`. One of these silently fails. | Data loss or silent insert failure |

### MEDIUM (5)

| # | Finding | Impact |
|---|---------|--------|
| D10 | **`pipeline_definitions.steps` JSONB has no CHECK constraint.** Non-array values cause `steps_total = null` for all runs, breaking progress percentages. | UI shows incorrect progress |
| D11 | **Trigger does cross-table SELECT inside transaction.** `notify_pipeline_step_failure` looks up `pipeline_slug` from `pipeline_runs` on every failed step insert. | Latency on INSERT |
| D12 | **`territory-scheduler` has no ORDER BY.** Agents returned in arbitrary heap order. Later-inserted agents may be starved when pool is full. | Unfair scheduling |
| D13 | **`failStep()` read-then-write race condition.** Concurrent agents read same `error_count`, both increment to same value. Under parallel dispatch, error counts are undercounted. | Pause threshold never triggers |
| D14 | **`name` vs `business_name` column drift on `sweep_prospects`.** `reconstruct-delivery-grid` queries `.ilike('name', legacy.title)` but migration defines `business_name`. | Legacy match always returns 0 results |
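
Finding D13 is a lost-update problem. The sketch below is an illustration only: it uses an in-memory `Counter` class (a hypothetical stand-in for the `error_count` column, not the Supabase client) to show why two interleaved read-then-write callers record one failure instead of two, and how a single atomic increment avoids that.

```typescript
// In-memory stand-in for a row's error_count. Hypothetical, for illustration only.
class Counter {
  private value = 0;

  read(): number {
    return this.value;
  }

  write(next: number): void {
    this.value = next;
  }

  // Models a single-statement increment such as
  // `UPDATE ... SET error_count = error_count + 1 RETURNING error_count`.
  increment(): number {
    this.value += 1;
    return this.value;
  }
}

// Read-then-write: both callers read before either writes,
// so the second write overwrites the first and one failure is lost.
function racyFailures(counter: Counter): number {
  const seenByA = counter.read();
  const seenByB = counter.read();
  counter.write(seenByA + 1);
  counter.write(seenByB + 1);
  return counter.read();
}

// Atomic increment: each failure is applied to the current value,
// so interleaving cannot drop an update.
function atomicFailures(counter: Counter): number {
  counter.increment();
  counter.increment();
  return counter.read();
}
```

With a pause threshold of 3, the racy path can need many more than three real failures before the stored count reaches the threshold, which is the behaviour described in the Impact column.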

---

## III. Code Quality Findings (15 total)

### HIGH (6)

| # | Finding | File:Line |
|---|---------|-----------|
| C1 | **`ensureRun` TOCTOU race condition.** SELECT-then-INSERT without unique constraint. Concurrent callers create duplicate runs. | `pipeline-protocol.ts:73-124` |
| C2 | **`shouldDeliverInWeek` uses `Math.abs`.** Hides negative week differences when `weekStart < cycleStart`. Dates before the cycle start incorrectly return `true`. | `reconstruct-delivery-grid:27-33` |
| C3 | **N+1 query in `match_legacy` loop.** Individual `ilike` query per legacy location. Hundreds of round-trips possible. | `reconstruct-delivery-grid:115-151` |
| C4 | **No timeout on `callEdgeFunction` fetch.** Downstream function hangs propagate to caller and chain up through sequential loops. | `territory-pipeline:53-72` |
| C5 | **Dual `ensureRun` ownership.** Both `territory-scheduler` and `territory-pipeline` call `ensureRun` for the same agent, creating a window for duplicate runs. | Scheduler:58-71, Pipeline:116-143 |
| C6 | **No timeout on scheduler dispatch fetch.** A single hanging `territory-pipeline` call blocks the entire `Promise.allSettled`, losing all other results at function timeout. | `territory-scheduler:76-88` |
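
For C2, the fix is to keep the week difference signed and reject negative values before applying the modulo. The audit does not quote the function body, so the sketch below is a reconstruction under assumptions: the three-argument signature, the `frequencyWeeks` parameter name, and the choice to compare UTC calendar days are all hypothetical.

```typescript
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Converts a Date to a whole-day index in UTC so DST shifts cannot skew the difference.
function utcDayNumber(d: Date): number {
  return Date.UTC(d.getUTCFullYear(), d.getUTCMonth(), d.getUTCDate()) / MS_PER_DAY;
}

// Assumed signature: returns true when `weekStart` falls on a delivery week of a cycle
// that began at `cycleStart` and repeats every `frequencyWeeks` weeks.
function shouldDeliverInWeek(
  weekStart: Date,
  cycleStart: Date,
  frequencyWeeks: number,
): boolean {
  if (!Number.isInteger(frequencyWeeks) || frequencyWeeks < 1) return false;

  // Signed difference: no Math.abs, so weeks before the cycle start stay negative.
  const days = utcDayNumber(weekStart) - utcDayNumber(cycleStart);
  if (days < 0) return false;

  const weeksSinceStart = Math.floor(days / 7);
  return weeksSinceStart % frequencyWeeks === 0;
}
```

With a biweekly cycle starting 2026-01-05, the week of 2025-12-22 is two weeks before the start. A `Math.abs` version would treat it as week 2 and return `true`; the signed version returns `false`.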

### MEDIUM (7)

| # | Finding | File:Line |
|---|---------|-----------|
| C7 | **`completeStep` and `failStep` don't check UPDATE errors.** If the UPDATE fails silently, steps are stuck in `started` status forever. | `pipeline-protocol.ts:174-184, 209-240` |
| C8 | **Pipeline runs left open on early exit.** When `match_legacy` or `backfill_tracking` returns early (no data), `completeStep` is called but `completeRun` is not. Run stays `running` indefinitely. | `reconstruct-delivery-grid:96-99, 185-188` |
| C9 | **`verify` Promise.all discards query errors.** If any count query errors, `count` is `null`, and `null` … | … |
| C10 | **`nextStep` uninitialized variable.** If a new step is added without a matching `case`, `default` silently resets to `idle` (latent reset loop). | `territory-pipeline:147, 389` |
| C11 | **`enrich` step filter possibly inverted.** Queries prospects with `enrichment_status='pending'` AND `website NOT NULL`. If website is populated by enrichment itself, this query returns 0 rows permanently. | `territory-pipeline:206-214` |
| C12 | **Partner config creation is a side-effect in `ai_generate`.** State mutation mixed into content generation step. Re-runs create configs with stale `cycle_start_date`. | `territory-pipeline:307-333` |
| C13 | **Non-null assertion `!` on env vars at request time.** Missing env vars cause cryptic `TypeError` instead of meaningful error. | All 3 edge functions |
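
For C13, one option is to replace the `!` assertions with an explicit check that names every missing variable. The sketch below is a minimal version of that idea; `requireEnv` and `EnvReader` are hypothetical names, and the reader is injected so the helper does not depend on a particular runtime.

```typescript
// Hypothetical helper: reads a set of required variables and fails with one clear message.
type EnvReader = (name: string) => string | undefined;

function requireEnv(
  names: readonly string[],
  read: EnvReader,
): Record<string, string> {
  const missing: string[] = [];
  const values: Record<string, string> = {};

  for (const name of names) {
    const value = read(name);
    if (value === undefined || value.trim() === "") {
      missing.push(name);
    } else {
      values[name] = value;
    }
  }

  if (missing.length > 0) {
    // Names only: never echo values, since these are secrets.
    throw new Error(
      `Missing required environment variables: ${missing.join(", ")}`,
    );
  }
  return values;
}
```

In a Supabase edge function the reader would presumably be `(name) => Deno.env.get(name)`, called once at module load so that a misconfigured deployment fails before it handles any request.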

### LOW (2)

| # | Finding | File:Line |
|---|---------|-----------|
| C14 | **`batchInserts` typed as `any[]`.** Defeats type safety for the entire backfill loop. | `reconstruct-delivery-grid:204` |
| C15 | **CORS headers copy-pasted across 3 files.** Should be in `_shared/cors.ts`. | All 3 edge functions |
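
C15 and S5 can be addressed together: a single shared helper that echoes the request origin only when it is on an allowlist. The sketch below shows what a `_shared/cors.ts` might contain. The function name, the header list, and the example origin used in the usage note are assumptions, not values taken from the audited code.

```typescript
// Hypothetical shared helper for `_shared/cors.ts`.
// Returns CORS headers; the Allow-Origin header is present only for allowlisted origins.
function buildCorsHeaders(
  requestOrigin: string | null,
  allowedOrigins: readonly string[],
): Record<string, string> {
  const headers: Record<string, string> = {
    "Access-Control-Allow-Headers":
      "authorization, x-client-info, apikey, content-type",
    "Access-Control-Allow-Methods": "POST, OPTIONS",
    // Responses differ per origin, so caches must key on it.
    "Vary": "Origin",
  };

  if (requestOrigin !== null && allowedOrigins.indexOf(requestOrigin) !== -1) {
    headers["Access-Control-Allow-Origin"] = requestOrigin;
  }
  return headers;
}
```

Each function would call `buildCorsHeaders(req.headers.get("Origin"), ["https://portal.example.com"])` in place of the wildcard object, with the real portal origin substituted for the placeholder.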

---

## IV. Dependency Map

### Edge Function Call Graph

territory-scheduler
  └─► territory-pipeline (×19 parallel)
        ├─► market-sweep-search        ✅ exists
        ├─► market-sweep-enrich        ✅ exists
        ├─► market-sweep-prospeo-enrich ❌ MISSING (skipped gracefully)
        ├─► market-sweep-visual-score   ✅ exists
        ├─► market-sweep-ai-generate    ✅ exists
        ├─► market-sweep-email          ✅ exists
        └─► market-sweep-followup       ✅ exists

reconstruct-delivery-grid (manual trigger)
  └─► (no edge function calls — self-contained)

### External Service Dependencies
| Service | Used By | Env Var | Failure Mode |
|---------|---------|---------|--------------|
| SerpAPI | market-sweep-search | `SERPAPI_KEY` | Step fails, retries via backoff |
| OpenAI (gpt-4o-mini) | visual-score, ai-generate | `OPENAI_API_KEY` | Step fails, agent pauses after 3 |
| Resend | email, followup | `RESEND_API_KEY` | Step fails, no emails sent |
| Discord | scheduler, trigger, watcher | `DISCORD_WEBHOOK_URL` / `app.settings` | Notifications lost, pipeline continues |

### Database Table Dependency Matrix

| Table | Written By | Read By | Migration Exists |
|-------|------------|---------|------------------|
| `pipeline_definitions` | Management API (manual) | Portal, `ensureRun` | ❌ No |
| `pipeline_runs` | pipeline-protocol helpers | Portal, exporter, watcher | ❌ No |
| `pipeline_step_logs` | pipeline-protocol helpers | Portal, exporter, trigger | ❌ No |
| `sweep_territory_agents` | scheduler, pipeline | scheduler, pipeline | ❌ No (Management API) |
| `market_sweeps` | sweep-search, orchestrator | pipeline, discord bot | ✅ Yes |
| `sweep_prospects` | search, enrich, score, email | pipeline, discord bot, recon | ✅ Yes (but column drift) |
| `partner_delivery_configurations` | pipeline (auto-create) | recon (backfill) | ❌ No |
| `weekly_delivery_tracking` | recon (backfill) | | ❌ No |
| `locations` | recon (import) | | ❌ No |
| `locations_legacy` | (archive) | recon (match) | ❌ No |

6 of 10 core tables have no migration file. This is the single biggest infrastructure risk.

### Nexus Bridge Dependencies

Prometheus (Mac3:9090)
  └─ scrapes nexus_exporter (Mac3:9120)
       └─ queries Supabase REST: pipeline_metrics_summary VIEW, pipeline_step_metrics VIEW
            └─ requires: SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY env vars

Grafana (Mac3:3000)
  └─ reads Prometheus metrics: nexus_pipeline_*

Alertmanager (Mac3:9093)
  └─ evaluates alert-rules.yml: pipeline_health group (3 rules)

Prefect (Mac3:4200)
  └─ runs pipeline_watcher.py every 5 min
       └─ queries Supabase REST: pipeline_runs, pipeline_step_logs
       └─ posts to Discord webhook

Nexus Portal (Mac3:3001)
  └─ reads Supabase REST: pipeline_definitions, pipeline_runs, pipeline_step_logs
  └─ calls RPC: toggle_pipeline_run_pause

---

## V. Architecture Assessment

### What Works Well
- Pipeline Protocol is a clean abstraction. `ensureRun/startStep/completeStep/failStep/advanceRun/completeRun` provide a composable interface any edge function can adopt.
- Dual-write pattern preserves backward compatibility. Domain tables (sweep_territory_agents) and protocol tables (pipeline_runs) are written independently — the protocol layer is additive, not replacing existing state.
- Server-side VIEWs (Evo 1) eliminate Python-side joins and reduce REST calls from 3-4 to 2.
- Definition versioning (Evo 4) snapshots step definitions at run creation — in-flight runs are immune to definition changes.
- Graceful degradation throughout: all protocol operations are wrapped in try/catch with `/* non-fatal */` guards. A Supabase outage doesn't stop the pipeline.
- Observability bridge is lightweight — Prometheus scrapes a single exporter endpoint; no Supabase SDK dependency in the monitoring stack.
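
The graceful-degradation pattern described above can be expressed as one small wrapper instead of repeated try/catch blocks. This is a sketch only: `nonFatal` and its `onError` callback are hypothetical names, and the audited module may structure its guards differently. Routing every swallowed error through a callback also gives a single place to record the silent failures noted in C7.

```typescript
// Hypothetical wrapper: runs a protocol operation and never lets its failure
// propagate to the business logic. Errors are reported, not thrown.
async function nonFatal<T>(
  label: string,
  op: () => Promise<T>,
  onError: (label: string, err: unknown) => void,
): Promise<T | undefined> {
  try {
    return await op();
  } catch (err) {
    // Report with the operation name so a swallowed failure is still visible.
    onError(label, err);
    return undefined;
  }
}
```

A call site would look like `await nonFatal("completeStep", () => completeStep(stepId), report)`, where `stepId` and `report` are placeholders. The pipeline step continues whether or not the protocol write succeeded.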

### What Needs Work
- Schema is not reproducible. The gap between "what's live in the DB" and "what's in migration files" is critical. A fresh environment cannot be stood up from the repo alone.
- Concurrency model is fragile. The 19-agent parallel dispatch creates race conditions in `ensureRun`, `failStep` error counting, and dual run-ID ownership between scheduler and pipeline.
- No timeouts on downstream calls. `callEdgeFunction` and scheduler dispatch `fetch` calls have no `AbortController`. One hanging downstream function cascades to total scheduler timeout.
- Column name drift between migration definitions and edge function queries is a correctness risk. Silent failures (0-row returns, swallowed insert errors) make this hard to diagnose.
- Error messages flow from DB → step logs → portal UI. Internal table names and constraint details are exposed at every layer.
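
The missing-timeout item above (C4, C6) is usually handled with an `AbortController` per call. A minimal sketch follows, assuming the downstream call is a plain `fetch`. `fetchWithTimeout` and the `FetchLike` type are hypothetical names, and the fetch function is passed in so the helper can be exercised without a network.

```typescript
// Minimal shape of a fetch-style function; the global `fetch` satisfies it.
type FetchLike<R> = (
  url: string,
  init: { signal: AbortSignal },
) => Promise<R>;

// Hypothetical helper: aborts the request if it has not settled within `timeoutMs`.
async function fetchWithTimeout<R>(
  fetchImpl: FetchLike<R>,
  url: string,
  timeoutMs: number,
): Promise<R> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    return await fetchImpl(url, { signal: controller.signal });
  } finally {
    // Always clear the timer so a fast response does not leave it pending.
    clearTimeout(timer);
  }
}
```

In the scheduler, wrapping each dispatch this way means a hung `territory-pipeline` call rejects on its own deadline, so `Promise.allSettled` can still return the results of the other agents. The per-call timeout has to be shorter than the function's own execution limit for that to hold.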

---

## VI. Remediation Priority Matrix

### Immediate (do now)

| # | Action | Effort | Findings |
|---|--------|--------|----------|
| 1 | Delete `db-migrate-temp` edge function | 1 min | S4 |
| 2 | Move Supabase keys to env vars in nexus-portal | 15 min | S1 |
| 3 | Add auth guard to `toggle_pipeline_run_pause` RPC | 10 min | S2 |
| 4 | Verify RLS is enabled on pipeline tables (check live DB) | 5 min | S3 |

### Short-term (this week)

| # | Action | Effort | Findings |
|---|--------|--------|----------|
| 5 | Create migration files for all 6 untracked tables | 2 hr | D1, D8 |
| 6 | Resolve `sweep_id` vs `campaign_id` column drift | 30 min | D3 |
| 7 | Resolve `base_quantity` vs `boxes_per_delivery` column drift | 15 min | D9 |
| 8 | Add missing indexes (3 on pipeline_runs, 2 on step_logs, 1 on agents) | 30 min | D5, D6, D7 |
| 9 | Add AbortController timeouts to `callEdgeFunction` and scheduler dispatch | 30 min | C4, C6 |
| 10 | Fix `ensureRun` race condition: add unique partial index or use upsert | 30 min | C1 |
| 11 | Remove dual `ensureRun` from scheduler; pipeline owns run creation | 20 min | C5 |
| 12 | Restrict CORS to known portal origin | 15 min | S5 |
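
Action 10 relies on the database, not the caller, deciding whether a run already exists. The sketch below models that contract in memory: `RunStore` is a hypothetical stand-in for a table with a unique index on `(pipeline_slug, run_key)` for active runs, and `insertIfAbsent` plays the role of an insert that does nothing on conflict. It is not the audited `ensureRun` implementation.

```typescript
interface Run {
  id: string;
  pipelineSlug: string;
  runKey: string;
  status: "running" | "completed" | "failed";
}

// In-memory stand-in for a table with a unique key on (pipeline_slug, run_key).
class RunStore {
  private active = new Map<string, Run>();
  private nextId = 1;

  // Single-step get-or-create: there is no window between "check" and "insert".
  insertIfAbsent(
    pipelineSlug: string,
    runKey: string,
  ): { run: Run; created: boolean } {
    const key = `${pipelineSlug}\u0000${runKey}`;
    const existing = this.active.get(key);
    if (existing !== undefined) return { run: existing, created: false };

    const run: Run = {
      id: `run-${this.nextId++}`,
      pipelineSlug,
      runKey,
      status: "running",
    };
    this.active.set(key, run);
    return { run, created: true };
  }
}

// Callers never SELECT first; they always go through the idempotent insert.
function ensureRun(store: RunStore, pipelineSlug: string, runKey: string): Run {
  return store.insertIfAbsent(pipelineSlug, runKey).run;
}
```

With this shape, action 11 becomes a simplification rather than a correctness fix: even if both scheduler and pipeline call `ensureRun` for the same agent, they receive the same run.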

### Medium-term (next sprint)

| # | Action | Effort | Findings |
|---|--------|--------|----------|
| 13 | Fix `shouldDeliverInWeek` math: remove `Math.abs`, guard negative weeks | 15 min | C2 |
| 14 | Batch `match_legacy` queries: pull prospects once, match in memory | 30 min | C3 |
| 15 | Atomic `failStep` error increment: use Postgres function with RETURNING | 30 min | D13 |
| 16 | Add UUID validation to all request body params | 20 min | S7, S8, S13 |
| 17 | Sanitize error messages before HTTP responses and step log storage | 30 min | S14, S19 |
| 18 | Close pipeline runs on early exit paths in reconstruct-delivery-grid | 15 min | C8 |
| 19 | Check UPDATE errors in completeStep/failStep/advanceRun/completeRun | 20 min | C7 |
| 20 | Bind exporter to loopback or Tailscale IP only | 5 min | S9 |
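
Action 17 amounts to separating what is logged from what is returned. A minimal sketch follows, assuming a correlation ID is available per request; `sanitizeError`, `SafeError`, and the generic message text are hypothetical.

```typescript
interface SafeError {
  publicMessage: string;
  correlationId: string;
}

// Hypothetical helper: keeps the raw error text in server-side logs only
// and hands callers a generic message plus an ID for lookup.
function sanitizeError(
  err: unknown,
  correlationId: string,
  log: (line: string) => void,
): SafeError {
  const detail = err instanceof Error ? err.message : String(err);
  log(`[${correlationId}] ${detail}`);
  return { publicMessage: "Internal error", correlationId };
}
```

The HTTP 500 body and the step log row would both store only `publicMessage` and `correlationId`. Table and constraint names stay in the function logs, where the ID can be used to find them.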

---

## VII. Risk Register

| Risk | Likelihood | Impact | Mitigation |
|------|------------|--------|------------|
| Leaked anon key exploited for data access | Medium | High | Rotate key, move to env vars |
| `db-migrate-temp` used for arbitrary DDL | Low (requires key) | Critical | Delete function immediately |
| Duplicate pipeline runs from concurrent dispatch | High (happens every 15min) | Medium | Fix `ensureRun` race condition |
| Schema can't be reproduced in new environment | Certain (already true) | High | Create missing migration files |
| Column drift causes silent data loss | High | High | Audit live schema vs code, reconcile |
| Downstream function timeout cascades | Medium | Medium | Add per-call AbortController timeouts |
| Error count undercounted, agents never pause | Medium | Medium | Atomic increment via Postgres function |

---

## VIII. Architecture Diagram

┌─────────────────────────────────────────────────────────────────────┐
│                        NEXUS OBSERVABILITY                          │
│  ┌──────────┐  ┌──────────┐  ┌──────────────┐  ┌───────────────┐  │
│  │ Grafana  │  │Prometheus│  │ Alertmanager │  │ Nexus Portal  │  │
│  │ :3000    │←─│ :9090    │──│ :9093        │  │ :3001         │  │
│  └──────────┘  └────┬─────┘  └──────────────┘  └───────┬───────┘  │
│                     │                                    │          │
│              scrapes│:9120                    reads REST │API       │
│                     │                                    │          │
│  ┌──────────────────┴─────────────┐                     │          │
│  │ nexus_exporter.py              │                     │          │
│  │ pipeline_metrics_summary VIEW  │─────────────────────┤          │
│  │ pipeline_step_metrics VIEW     │                     │          │
│  └────────────────────────────────┘                     │          │
│                                                          │          │
│  ┌──────────────────────────────┐                       │          │
│  │ pipeline_watcher.py (Prefect)│───── Discord ◄────────┘          │
│  └──────────────────────────────┘         ▲                        │
└───────────────────────────────────────────┼────────────────────────┘
                                            │
                         pg_net trigger ─────┘

┌─────────────────────────────────────────────────────────────────────┐
│                     SUPABASE (wovknbcvxjvayfdflxkn)                 │
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │ _shared/pipeline-protocol.ts                                 │   │
│  │ ensureRun → startStep → completeStep/failStep → completeRun │   │
│  └────────────────────────┬────────────────────────────────────┘   │
│                           │                                         │
│  ┌────────────────────────┼────────────────────────────────┐       │
│  │ EDGE FUNCTIONS         │                                 │       │
│  │                        ▼                                 │       │
│  │  territory-scheduler ──► territory-pipeline (×19)        │       │
│  │                           ├─► market-sweep-search        │       │
│  │                           ├─► market-sweep-enrich        │       │
│  │                           ├─► market-sweep-prospeo ❌    │       │
│  │                           ├─► market-sweep-visual-score  │       │
│  │                           ├─► market-sweep-ai-generate   │       │
│  │                           ├─► market-sweep-email         │       │
│  │                           └─► market-sweep-followup      │       │
│  │                                                          │       │
│  │  reconstruct-delivery-grid (match/backfill/verify)       │       │
│  └──────────────────────────────────────────────────────────┘       │
│                                                                     │
│  ┌──────────────────────────────────────────────────────────┐       │
│  │ DATABASE                                                  │       │
│  │                                                          │       │
│  │  Protocol:    pipeline_definitions                        │       │
│  │               pipeline_runs (+ definition snapshots)      │       │
│  │               pipeline_step_logs → trigger → Discord      │       │
│  │               pipeline_metrics_summary (VIEW)             │       │
│  │               pipeline_step_metrics (VIEW)                │       │
│  │                                                          │       │
│  │  Domain:      sweep_territory_agents (19 agents)          │       │
│  │               market_sweeps                               │       │
│  │               sweep_prospects (column drift ⚠️)           │       │
│  │               partner_delivery_configurations             │       │
│  │               weekly_delivery_tracking                    │       │
│  │               locations / locations_legacy                │       │
│  │               email_outreach / inbound_leads              │       │
│  └──────────────────────────────────────────────────────────┘       │
│                                                                     │
│  External: SerpAPI, OpenAI, Resend, Discord Webhooks               │
└─────────────────────────────────────────────────────────────────────┘

---

## IX. Migration Inventory

### Tracked (in `supabase/migrations/`)
| File | Content | Status |
|------|---------|--------|
| `20260215*` – `20260222000003` | Core business tables (accounts, locations, contacts, products, etc.) | ✅ |
| `20260222000004` | `sweep_campaigns` as TABLE | ⚠️ Conflicts with 100004 |
| `20260222000005` | `sweep_prospects` with `campaign_id` column | ⚠️ Code uses `sweep_id` |
| `20260222100004` | `sweep_campaigns` as VIEW over `market_sweeps` | ⚠️ Conflicts with 000004 |
| `20260223000001` | Pipeline VIEWs + trigger (references untracked tables) | ⚠️ Fails on fresh deploy |

### Untracked (live in DB, no migration file)
| Table/Object | How Created | Urgency |
|--------------|-------------|---------|
| `pipeline_definitions` | Management API | Critical |
| `pipeline_runs` | Management API | Critical |
| `pipeline_step_logs` | Management API | Critical |
| `sweep_territory_agents` | Management API | Critical |
| `partner_delivery_configurations` | Unknown | High |
| `weekly_delivery_tracking` | Unknown | High |
| `locations` (extended columns) | Unknown | Medium |
| `locations_legacy` | Archive import | Medium |
| All pipeline indexes (4) | Management API | High |
| RLS policies (6) | Management API | Critical |
| `toggle_pipeline_run_pause` RPC | Temp edge function | High |
| `notify_pipeline_step_failure` + trigger | Temp edge function | High |

---

## X. Verification Checklist

When remediations are applied, verify:

- [ ] `supabase db reset` succeeds from migration files alone (clean environment test)
- [ ] `territory-scheduler` dispatch creates exactly 1 pipeline_run per agent (not duplicates)
- [ ] `territory-pipeline` step queries return >0 rows (column names match live schema)
- [ ] `reconstruct-delivery-grid verify` returns `verified: true` with real counts (not `null || 0`)
- [ ] Nexus Portal loads pipeline data without hardcoded keys in bundle
- [ ] `db-migrate-temp` returns 404 (deleted)
- [ ] `toggle_pipeline_run_pause` returns 403 for anon role
- [ ] Exporter metrics flow to Prometheus (`curl :9120/metrics | grep nexus_pipeline`)
- [ ] Failed step insertion triggers Discord webhook within 5 seconds

---

Generated by Claw Mesh Daemon — Pipeline Protocol DEP Audit 2026-02-23
