Gary Wu

Multi-Agent AI Processing Pipelines on Cloudflare Workers

Org Status: 🟡 Dormant Cloudflare: N/A Last Audited: 2026-04-28


Most multi-agent AI frameworks assume you have a Python server, a Redis cluster, and a deployment pipeline. They assume your agents will run for minutes, consume gigabytes of memory, and need a Kubernetes cluster to orchestrate. What if you could build the same thing on a serverless platform where each agent runs in isolation, costs nothing at rest, and scales to zero between invocations?

This article shows how to build multi-agent AI processing pipelines entirely on Cloudflare Workers. These are not toy chatbots but production systems in which specialized AI agents evaluate data, debate each other, form consensus, remember what they learned, and make recommendations. Everything runs on edge infrastructure with cold starts of a few milliseconds, D1-backed persistent memory, and per-request cost tracking measured in fractions of a cent.

What you will learn:



You want to build a system that takes raw data — say, market opportunities, content ideas, or investment signals — and runs it through multiple AI agents that each analyze a different dimension. One agent evaluates demand. Another challenges assumptions. A third looks for hidden risks. A fourth synthesizes everything into a recommendation.

The standard approach is to reach for LangChain, CrewAI, or AutoGen. Here is what happens next:

You deploy a Python server. Now you need a VM, a container, or a serverless function with Python runtime support. Cold starts are measured in seconds. You pay for idle time. You need to manage dependencies, virtual environments, and deployment pipelines.

You add Redis for state. Agents need to share context, remember past runs, and coordinate. Redis adds another service to manage, another failure mode, and another cost line.

You hit CPU time limits. A pipeline that runs 6 agents sequentially can take 30-60 seconds of LLM inference. Most serverless platforms have execution limits that force you into background job infrastructure.

You cannot track costs. When Agent A calls GPT-4 and Agent B calls Claude, the bill arrives as a single line item. You have no idea which agent consumed what, or whether the expensive model actually improved output quality.

You scale to infinity, not zero. When there is no work, your Python server still runs. When there is a burst, your single-threaded agent framework processes items sequentially.

The Workers-native approach solves all of these:

| Pain Point | Traditional Framework | Workers-Native |
|---|---|---|
| Cold start | 2-10 seconds (Python) | <5ms |
| State management | External Redis/Postgres | D1 + DO storage (built-in) |
| Idle cost | $25-100/month minimum | $0 |
| CPU limits | 30-60s before timeout | Workflows: unlimited via step-based execution |
| Cost tracking | Manual, per-provider | Automatic, per-agent, per-call |
| Isolation | Shared process, shared memory | Per-request V8 isolate |
| Deployment | Docker, CI/CD, environment management | wrangler deploy |

Key insight: The multi-agent pipeline is not a monolithic process. It is a series of small, focused computations with state checkpoints between them. This maps perfectly to Workers + Durable Objects + Workflows.


The system has four layers:

                    +----------------------------+
                    |     API / Cron Trigger     |
                    |  (Worker fetch handler)    |
                    +-----------+----------------+
                                |
                                v
                    +----------------------------+
                    |   OpportunityHarness DO    |
                    |  (Coordinator / Governor)  |
                    |  - Budget tracking         |
                    |  - Phase management        |
                    |  - Alarm-driven tick loop  |
                    +-----------+----------------+
                                |
              +-----------------+------------------+
              |                 |                  |
              v                 v                  v
    +-----------------+ +----------------+ +-------------------+
    | Gather Phase    | | Analyze Phase  | | Evaluate/Critique |
    | data-coverage   | | niche-analyzer | | evaluator         |
    | genre-scanner   | | (LLM)          | | lens-applicator   |
    | (deterministic) | +----------------+ | critic            |
    +-----------------+                    | (LLM)             |
                                           +-------------------+
                                                |
                                                v
                                      +--------------------+
                                      | Debate System      |
                                      | 6 agent roles      |
                                      | 3-phase protocol   |
                                      +--------------------+
                                                |
                                                v
                                      +--------------------+
                                      | Agent Memory       |
                                      | Core beliefs       |
                                      | Episodic memory    |
                                      | Reflective updates |
                                      +--------------------+

Each processor is a plain async function: it receives a context (database handle, AI binding, budget state, model selector) and returns an array of results. The harness calls processors one at a time, records telemetry, and advances through phases until the cycle completes.

The key primitives from Cloudflare:


The Agent Processor

Every agent in the pipeline is a processor: a pure function registered by name, called by the harness when its phase is active.

// The processor signature — every agent implements this
export type ProcessorFn = (ctx: ProcessorContext) => Promise<ProcessorResult[]>;

export interface ProcessorContext {
  db: D1Database;
  ai: Ai;                          // Workers AI binding
  apiKey: string;                   // External LLM API key (e.g., Gemini)
  config: HarnessConfig;           // Full pipeline configuration
  processor: ProcessorConfig;       // This processor's specific config
  cycle: CycleState;               // Current cycle state (budget, counters, work queues)
  selectModel: () => string;       // Budget-aware model selector
  remainingBudget: () => number;   // USD remaining for this processor
}

export interface ProcessorResult {
  processor: string;               // 'evaluator', 'critic', etc.
  entity_type: string;             // What was processed
  entity_id: string;               // Unique identifier
  model_used: string | null;       // null for deterministic processors
  tokens_input: number;
  tokens_output: number;
  cost_usd: number;
  duration_ms: number;
  success: boolean;
  error_message?: string;
  result_summary?: string;
  spawned_entities?: Array<{ type: string; id: string }>;
}

The processor registry is a simple Map:

const PROCESSOR_REGISTRY = new Map<string, ProcessorFn>();

export function registerProcessor(name: string, fn: ProcessorFn) {
  PROCESSOR_REGISTRY.set(name, fn);
}

export function getProcessor(name: string): ProcessorFn | undefined {
  return PROCESSOR_REGISTRY.get(name);
}

Processors self-register at module load time. When a file like evaluator.ts is imported, its registerProcessor('evaluator', async (ctx) => { ... }) call adds it to the registry. The harness looks up processors by name from the config.

Key insight: Processors are plain functions, not classes. They hold no state of their own; all state lives in D1 or the cycle object, which they read and update through the context. This means any processor can be tested in isolation by constructing a ProcessorContext with a test database.
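To make the testing claim concrete, here is a minimal sketch of calling a registered processor directly with a hand-built context. The Mini* types, the echo-genres processor, and runEchoTest are hypothetical stand-ins, trimmed down from ProcessorContext and ProcessorResult so the snippet compiles without the Workers type packages. A real test would pass a test D1 database in the context instead.

```typescript
// Hypothetical, trimmed-down stand-ins for ProcessorResult / ProcessorContext.
interface MiniResult {
  processor: string;
  entity_id: string;
  success: boolean;
  cost_usd: number;
}

interface MiniContext {
  cycle: { genres_to_scan: string[] };
  remainingBudget: () => number;
}

type MiniProcessorFn = (ctx: MiniContext) => Promise<MiniResult[]>;

// Same registry pattern as in the article: a Map keyed by processor name.
const miniRegistry = new Map<string, MiniProcessorFn>();

function registerMini(name: string, fn: MiniProcessorFn): void {
  miniRegistry.set(name, fn);
}

// A toy deterministic processor: emits one result per queued genre.
registerMini('echo-genres', async (ctx) =>
  ctx.cycle.genres_to_scan.map((genre) => ({
    processor: 'echo-genres',
    entity_id: genre,
    success: true,
    cost_usd: 0,
  })),
);

// The "test": look the processor up by name and call it with a fake context.
async function runEchoTest(): Promise<MiniResult[]> {
  const fn = miniRegistry.get('echo-genres');
  if (!fn) throw new Error('processor not registered');
  return fn({
    cycle: { genres_to_scan: ['Games', 'Finance'] },
    remainingBudget: () => 0.1,
  });
}
```

Because the processor only touches what the context hands it, the test needs no Durable Object, no alarm, and no network.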

The Harness Config

The pipeline is configured entirely through a database record. This means a dashboard can change processor weights, model tiers, budget limits, and batch sizes while the pipeline is running — the harness reloads config from DB at the start of every tick.

export interface HarnessConfig {
  id: number;
  slug: string;
  name: string;
  version: number;
  status: string;                    // 'active' | 'paused' | 'error'

  // Budget
  budget_mode: 'soft' | 'hard';     // soft = skip expensive, hard = stop entirely
  budget_max_per_cycle: number;      // USD cap per processing cycle
  budget_lambda: number;             // decay factor for adaptive budgeting

  // Models (tiered by cost)
  model_default: string;             // e.g., 'gemini-2.0-flash'
  model_expensive: string;           // e.g., 'gemini-2.5-flash'
  model_premium: string;             // e.g., 'gemini-2.5-pro'
  model_upgrade_threshold: number;   // score above which to upgrade model

  // Cycle control
  cycle_interval_ms: number;         // time between cycles
  cycle_max_iterations: number;
  cycle_genres_limit: number;
  cycle_niches_per_genre: number;
  cycle_critique_threshold: number;  // minimum score to trigger critique

  // Processors (ordered)
  processors: ProcessorConfig[];

  // Focus (optional filtering)
  focus_genres: string[] | null;
  focus_niches: string[] | null;

  // Memory
  memory_enabled: boolean;
  memory_reflect_every_n: number;    // reflect after every N debates
}

export interface ProcessorConfig {
  name: string;                      // 'data-coverage', 'evaluator', etc.
  type: 'deterministic' | 'llm';
  enabled: boolean;
  phase: string;                     // 'gather' | 'analyze' | 'evaluate' | 'critique'

  // LLM-specific
  model_tier?: 'default' | 'expensive' | 'premium';
  max_tokens?: number;
  temperature?: number;
  prompt_version?: number;

  // Budget allocation
  weight: number;                    // share of budget (0.0-1.0)
  max_cost_per_call: number;         // hard cap per invocation

  // Execution
  batch_size: number;                // entities per tick
}

A typical config stores its processors in JSON:

{
  "slug": "market-scanner",
  "budget_mode": "soft",
  "budget_max_per_cycle": 0.50,
  "model_default": "gemini-2.0-flash",
  "model_expensive": "gemini-2.5-flash",
  "model_premium": "gemini-2.5-pro",
  "processors": [
    {
      "name": "data-coverage",
      "type": "deterministic",
      "phase": "gather",
      "enabled": true,
      "weight": 0,
      "max_cost_per_call": 0,
      "batch_size": 10
    },
    {
      "name": "genre-scanner",
      "type": "deterministic",
      "phase": "gather",
      "enabled": true,
      "weight": 0,
      "max_cost_per_call": 0,
      "batch_size": 1
    },
    {
      "name": "niche-analyzer",
      "type": "llm",
      "phase": "analyze",
      "enabled": true,
      "model_tier": "default",
      "weight": 0.3,
      "max_cost_per_call": 0.02,
      "batch_size": 3,
      "temperature": 0.3
    },
    {
      "name": "evaluator",
      "type": "llm",
      "phase": "evaluate",
      "enabled": true,
      "model_tier": "default",
      "weight": 0.3,
      "max_cost_per_call": 0.05,
      "batch_size": 2,
      "temperature": 0.2
    },
    {
      "name": "critic",
      "type": "llm",
      "phase": "critique",
      "enabled": true,
      "model_tier": "expensive",
      "weight": 0.25,
      "max_cost_per_call": 0.05,
      "batch_size": 2,
      "temperature": 0.4
    },
    {
      "name": "lens-applicator",
      "type": "llm",
      "phase": "evaluate",
      "enabled": true,
      "model_tier": "default",
      "weight": 0.15,
      "max_cost_per_call": 0.03,
      "batch_size": 2,
      "temperature": 0.3
    }
  ]
}

Key insight: The entire pipeline topology — which agents run, in what order, with what models, at what cost — is a JSON document in D1. No code changes required to reconfigure. A dashboard writes to the DB, and the next tick picks it up.
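As a worked example of how the weights in this config turn into dollar amounts, the sketch below multiplies budget_max_per_cycle by each processor's weight. WeightedProcessor, allocateBudget, and totalLlmWeight are hypothetical helper names, not part of the article's code; the weights and the $0.50 cap are taken from the sample config above.

```typescript
// Hypothetical trimmed-down view of ProcessorConfig: only the fields used here.
interface WeightedProcessor {
  name: string;
  type: 'deterministic' | 'llm';
  weight: number;
}

// Per-processor allocation in USD: the cycle cap multiplied by the weight.
function allocateBudget(
  budgetMaxPerCycle: number,
  processors: WeightedProcessor[],
): Map<string, number> {
  const allocation = new Map<string, number>();
  for (const p of processors) {
    allocation.set(p.name, budgetMaxPerCycle * p.weight);
  }
  return allocation;
}

// Sum of LLM weights. If this exceeded 1, the per-processor allocations
// would add up to more than budget_max_per_cycle.
function totalLlmWeight(processors: WeightedProcessor[]): number {
  return processors
    .filter((p) => p.type === 'llm')
    .reduce((sum, p) => sum + p.weight, 0);
}

// Weights copied from the sample config.
const sampleProcessors: WeightedProcessor[] = [
  { name: 'data-coverage', type: 'deterministic', weight: 0 },
  { name: 'genre-scanner', type: 'deterministic', weight: 0 },
  { name: 'niche-analyzer', type: 'llm', weight: 0.3 },
  { name: 'evaluator', type: 'llm', weight: 0.3 },
  { name: 'critic', type: 'llm', weight: 0.25 },
  { name: 'lens-applicator', type: 'llm', weight: 0.15 },
];

const sampleAllocation = allocateBudget(0.5, sampleProcessors);
```

With the sample config, the $0.50 cycle budget splits into $0.15 for niche-analyzer, $0.15 for evaluator, $0.125 for critic, and $0.075 for lens-applicator. The LLM weights sum to 1, so the allocations together use the whole cap.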

The Processing Pipeline

The pipeline moves through four phases in strict order:

gather → analyze → evaluate → critique

Each phase has zero or more processors. The harness advances to the next phase only when all processors in the current phase have no more work to do.

const PHASE_ORDER = ['gather', 'analyze', 'evaluate', 'critique'];

Within a phase, processors are tried sequentially. When a processor produces results, the tick ends (one processor per tick to stay under CPU limits). When no processor in the current phase produces work, the harness advances.

This creates a pull-based data flow:

  1. Gather: data-coverage checks which data sources are ready. genre-scanner identifies candidate niches from ready data. Both are deterministic — zero LLM cost.
  2. Analyze: niche-analyzer takes the niche queue populated by gather and produces deep analysis using LLM. Spawns demographic sub-niches for further analysis.
  3. Evaluate: evaluator takes analyzed niches and produces full evaluations with deterministic tests (demand, supply, monetization, fragmentation) plus LLM tests (buildable, marketable, confidence). lens-applicator enriches evaluations with viral scoring and cross-pollination ideas.
  4. Critique: critic stress-tests high-scoring evaluations, challenges assumptions, and adjusts scores.

// CycleState tracks the work queues that flow between phases
export interface CycleState {
  cycle_id: number | null;
  cycle_number: number;
  phase: string;
  phase_index: number;
  cost_so_far: number;
  tokens_input: number;
  tokens_output: number;

  // Work queues populated by earlier phases
  genres_to_scan: string[];
  niches_to_analyze: Array<{
    genre: string;
    niche_id: string;
    keywords: string[];
  }>;
  evaluations_to_critique: string[];

  // Counters
  genres_scanned: number;
  niches_found: number;
  evaluations_created: number;
  evaluations_critiqued: number;
  research_tasks_created: number;

  // Per-processor telemetry
  processor_stats: Record<
    string,
    { cost: number; calls: number; total_ms: number; failures: number }
  >;
}
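The phase rules described above (one processor per tick, advance only when nothing in the phase produces work) can be illustrated with a small in-memory simulation. This is a sketch under simplifying assumptions: SimProcessor, simulateTick, and simulateCycle are hypothetical names, and the work queues are pre-filled with counts, whereas in the real pipeline earlier phases populate them at run time.

```typescript
const PHASES = ['gather', 'analyze', 'evaluate', 'critique'] as const;
type Phase = (typeof PHASES)[number];

// Hypothetical model of a processor: how much work is pending and the batch size.
interface SimProcessor {
  name: string;
  phase: Phase;
  pending: number;
  batch: number;
}

// One tick: run the first processor in the current phase that still has work.
// Returns the phase index for the next tick, or -1 when the cycle is finished.
function simulateTick(phaseIdx: number, procs: SimProcessor[], log: string[]): number {
  const phase = PHASES[phaseIdx];
  for (const p of procs) {
    if (p.phase !== phase || p.pending === 0) continue;
    const done = Math.min(p.batch, p.pending);
    p.pending -= done;
    log.push(`${phase}:${p.name}:${done}`);
    return phaseIdx; // one processor per tick, stay in this phase
  }
  // No processor produced work: advance, or finish after the last phase.
  return phaseIdx < PHASES.length - 1 ? phaseIdx + 1 : -1;
}

function simulateCycle(procs: SimProcessor[]): { ticks: number; log: string[] } {
  const log: string[] = [];
  let phaseIdx = 0;
  let ticks = 0;
  while (phaseIdx !== -1) {
    phaseIdx = simulateTick(phaseIdx, procs, log);
    ticks++;
  }
  return { ticks, log };
}

const simulated = simulateCycle([
  { name: 'genre-scanner', phase: 'gather', pending: 2, batch: 1 },
  { name: 'niche-analyzer', phase: 'analyze', pending: 5, batch: 3 },
  { name: 'evaluator', phase: 'evaluate', pending: 2, batch: 2 },
  { name: 'critic', phase: 'critique', pending: 1, batch: 2 },
]);
```

In this run the cycle takes 10 ticks: 6 that do work and 4 that only detect an empty phase and advance (or finish). The empty ticks are the price of the pull-based design, since a phase is only known to be done once every processor in it reports no work.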

The Agent Ledger

The agent framework tracks every processing event in an agent_ledger table. This enables incremental processing: agents only process entities they have not seen, or entities whose upstream data has changed.

export interface AgentConfig {
  name: string;
  version: number;
  stages?: { name: string; version: number }[];
}

// An entity needs processing when:
//   1. It has never been processed by this agent
//   2. It was processed at an older agent version
//   3. It was processed at an older stage version
//   4. Its input hash has changed (upstream data changed)
export async function getUnprocessed(
  db: D1Database,
  agent: AgentConfig,
  entityType: string,
  candidateIds: string[],
  stage = 'main',
  inputHashes?: Map<string, string>,
): Promise<string[]> {
  if (candidateIds.length === 0) return [];

  const stageVersion =
    agent.stages?.find((s) => s.name === stage)?.version ?? agent.version;

  const existing = new Map<
    string,
    { agent_version: number; stage_version: number; input_hash: string | null }
  >();

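  // Query in chunks of 80 IDs: D1 caps bound parameters at 100 per query,
  // and this statement also binds 3 fixed values (agent, entity_type, stage)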
  for (let i = 0; i < candidateIds.length; i += 80) {
    const chunk = candidateIds.slice(i, i + 80);
    const placeholders = chunk.map(() => '?').join(',');
    const rows = await db
      .prepare(
        `SELECT entity_id, agent_version, stage_version, input_hash
         FROM agent_ledger
         WHERE agent = ? AND entity_type = ? AND stage = ?
           AND entity_id IN (${placeholders})`,
      )
      .bind(agent.name, entityType, stage, ...chunk)
      .all();

    for (const row of rows.results) {
      existing.set(row.entity_id as string, {
        agent_version: row.agent_version as number,
        stage_version: row.stage_version as number,
        input_hash: row.input_hash as string | null,
      });
    }
  }

  return candidateIds.filter((id) => {
    const entry = existing.get(id);
    if (!entry) return true;                               // never processed
    if (entry.agent_version < agent.version) return true;  // agent upgraded
    if (entry.stage_version < stageVersion) return true;   // stage upgraded
    if (inputHashes) {
      const currentHash = inputHashes.get(id);
      if (currentHash && entry.input_hash !== currentHash) return true;
    }
    return false;
  });
}

Version bumping is the escape hatch. When you improve an agent’s prompt or logic, bump its version number. Every entity it previously processed gets flagged for reprocessing automatically. No migration scripts, no manual resets.

// Simple hash for change detection
export function hashInputs(
  ...values: (string | number | null | undefined)[]
): string {
  const str = values.map((v) => String(v ?? '')).join('|');
  let hash = 0;
  for (let i = 0; i < str.length; i++) {
    const char = str.charCodeAt(i);
    hash = ((hash << 5) - hash) + char;
    hash |= 0;
  }
  return hash.toString(36);
}
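A short usage sketch may help show what this hash does and does not guarantee. The function body is copied from above so the snippet runs on its own; the sample values (a genre name, a rating, a rating count) are made up for illustration.

```typescript
// Copied from the article so the example is self-contained.
function hashInputs(
  ...values: (string | number | null | undefined)[]
): string {
  const str = values.map((v) => String(v ?? '')).join('|');
  let hash = 0;
  for (let i = 0; i < str.length; i++) {
    const char = str.charCodeAt(i);
    hash = ((hash << 5) - hash) + char;
    hash |= 0; // wrap to a signed 32-bit integer
  }
  return hash.toString(36);
}

// Change detection: the same inputs hash the same, a changed input does not.
const hashBefore = hashInputs('fitness', 4.2, 1200);
const hashSame = hashInputs('fitness', 4.2, 1200);
const hashAfter = hashInputs('fitness', 4.2, 1350);
const inputChanged = hashBefore !== hashAfter;

// Caveat 1: null, undefined, and '' all serialize to the empty string.
const nullEqualsEmpty = hashInputs(null) === hashInputs('');

// Caveat 2: values are joined with '|', so a value containing '|' is
// indistinguishable from two separate values.
const delimiterCollision = hashInputs('a|b') === hashInputs('a', 'b');
```

Both caveats evaluate to true. On top of that, a 32-bit hash can collide on unrelated inputs, so this is a cheap change detector rather than a content fingerprint; a missed reprocess is the failure mode to keep in mind.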

The D1 schema for the ledger:

CREATE TABLE agent_ledger (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  agent TEXT NOT NULL,
  agent_version INTEGER NOT NULL,
  entity_type TEXT NOT NULL,
  entity_id TEXT NOT NULL,
  stage TEXT NOT NULL DEFAULT 'main',
  stage_version INTEGER NOT NULL,
  input_hash TEXT,
  result_json TEXT,
  duration_ms INTEGER,
  computed_at TEXT NOT NULL,
  UNIQUE(agent, entity_type, entity_id, stage)
);

CREATE INDEX idx_ledger_agent ON agent_ledger(agent, entity_type, stage);
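The write side of the ledger is not shown at this point in the article. One plausible shape, given the UNIQUE(agent, entity_type, entity_id, stage) constraint, is a SQLite upsert that overwrites the previous row for the same entity. LedgerEntry and buildLedgerUpsert are hypothetical names; the sketch only builds the statement and parameters, which could then be run with db.prepare(sql).bind(...params).run() on a D1 binding.

```typescript
// Hypothetical input shape mirroring the agent_ledger columns.
interface LedgerEntry {
  agent: string;
  agentVersion: number;
  entityType: string;
  entityId: string;
  stage: string;
  stageVersion: number;
  inputHash: string | null;
  resultJson: string | null;
  durationMs: number;
}

type SqlParam = string | number | null;

// Builds an upsert keyed on the table's UNIQUE constraint.
function buildLedgerUpsert(
  e: LedgerEntry,
  now: Date = new Date(),
): { sql: string; params: SqlParam[] } {
  const sql = `INSERT INTO agent_ledger
      (agent, agent_version, entity_type, entity_id, stage, stage_version,
       input_hash, result_json, duration_ms, computed_at)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ON CONFLICT(agent, entity_type, entity_id, stage) DO UPDATE SET
      agent_version = excluded.agent_version,
      stage_version = excluded.stage_version,
      input_hash = excluded.input_hash,
      result_json = excluded.result_json,
      duration_ms = excluded.duration_ms,
      computed_at = excluded.computed_at`;
  const params: SqlParam[] = [
    e.agent, e.agentVersion, e.entityType, e.entityId, e.stage,
    e.stageVersion, e.inputHash, e.resultJson, e.durationMs,
    now.toISOString(),
  ];
  return { sql, params };
}

const sampleUpsert = buildLedgerUpsert(
  {
    agent: 'evaluator', agentVersion: 2, entityType: 'niche',
    entityId: 'Health::habit tracker', stage: 'main', stageVersion: 2,
    inputHash: 'abc123', resultJson: null, durationMs: 840,
  },
  new Date('2026-01-01T00:00:00.000Z'),
);
```

Ten parameters are well under the D1 bound-parameter limit, so one row per statement needs no chunking.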

Key insight: The agent ledger is the foundation of incremental processing. It answers “what has changed since last time?” without scanning every record. Version bumps + input hashes give you full control over what gets reprocessed.


The OpportunityHarness is a Durable Object that serves as the control plane for the entire pipeline. It does not do analysis — it governs execution.

The harness is responsible for:

  1. Budget gating — can we afford the next step?
  2. Phase selection — what processor runs next?
  3. Entity selection — what data to process?
  4. Model routing — cheap model or expensive model?
  5. Executing the processor — calling the registered function
  6. Recording telemetry — cost, tokens, success/failure
  7. Adapting — adjusting batch sizes based on success rate

The DO Shell

import { DurableObject } from 'cloudflare:workers';

type Env = {
  DB: D1Database;
  AI: Ai;
  GEMINI_API_KEY: string;
  OPPORTUNITY_HARNESS: DurableObjectNamespace;
};

export class OpportunityHarness extends DurableObject<Env> {
  // Copy the default so instances never mutate the shared DEFAULT_STATE object
  private state: HarnessState = { ...DEFAULT_STATE };
  private config: HarnessConfig | null = null;

  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env);
    ctx.blockConcurrencyWhile(async () => {
      const saved = await ctx.storage.get<HarnessState>('state');
      if (saved) this.state = { ...DEFAULT_STATE, ...saved };
      // Resume alarm if the DO was evicted while running
      if (this.state.running) {
        const existing = await ctx.storage.getAlarm();
        if (!existing) {
          await ctx.storage.setAlarm(Date.now() + 5000);
        }
      }
    });
  }
}

The blockConcurrencyWhile call in the constructor is critical. It prevents any fetch or alarm from executing until the state is fully loaded. Without it, you get race conditions on startup.

The Alarm-Driven Main Loop

The harness uses DO alarms to drive execution. Each alarm tick runs one unit of work, then schedules the next tick. This avoids the Workers CPU time limit by breaking long pipelines into many small steps.

override async alarm() {
  if (!this.state.running) return;

  try {
    // Reload config from DB every tick -- dashboard can change it live
    this.config = await this.loadConfig();
    if (!this.config) {
      this.state.error_message = 'Harness config not found in DB';
      this.state.running = false;
      await this.save();
      return;
    }

    if (this.config.status === 'paused') {
      // Config says paused -- keep alarm alive but do not work
      await this.ctx.storage.setAlarm(Date.now() + 30_000);
      return;
    }

    // Start new cycle if needed
    if (!this.state.current_cycle) {
      await this.startCycle();
    }

    // Run one tick of work
    const done = await this.tick();

    if (done) {
      await this.finalizeCycle();
      await this.ctx.storage.setAlarm(
        Date.now() + this.config.cycle_interval_ms,
      );
    } else {
      // More work to do -- tick again in 2 seconds
      await this.ctx.storage.setAlarm(Date.now() + 2000);
    }

    await this.save();
  } catch (err: any) {
    this.state.error_message = err.message || 'Unknown error';
    await this.save();
    // Retry in 60s on error
    await this.ctx.storage.setAlarm(Date.now() + 60_000);
  }
}

The 2-second interval between ticks is deliberate. It keeps the harness responsive (a pause written to the DB config takes effect at the next tick, within about 2 seconds) while avoiding a tight polling loop. While paused, the alarm re-arms every 30 seconds, so resuming can take up to 30 seconds. For idle periods between cycles, the interval stretches to cycle_interval_ms (typically 1-24 hours).
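The alarm handler's scheduling decisions can be summarized as a small pure function. This is a restatement of the intervals that appear in alarm() above, not code from the article; TickOutcome and nextAlarmDelayMs are hypothetical names.

```typescript
// The four ways a tick can end, as seen in alarm().
type TickOutcome = 'paused' | 'cycle_done' | 'more_work' | 'error';

// Delay in milliseconds before the next alarm fires.
function nextAlarmDelayMs(outcome: TickOutcome, cycleIntervalMs: number): number {
  switch (outcome) {
    case 'paused':
      return 30_000; // config says paused: poll the config again in 30s
    case 'cycle_done':
      return cycleIntervalMs; // sleep until the next cycle
    case 'more_work':
      return 2_000; // keep ticking through the current cycle
    case 'error':
      return 60_000; // back off before retrying
  }
}

// Example: a harness configured with a 6-hour cycle interval.
const sixHoursMs = 6 * 60 * 60 * 1000;
const delayAfterCycle = nextAlarmDelayMs('cycle_done', sixHoursMs);
```

Pulling the decision into one function like this also makes the intervals easy to unit test or to move into the DB config later.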

HTTP Control API

The harness exposes a simple HTTP API for control operations:

override async fetch(request: Request): Promise<Response> {
  const url = new URL(request.url);
  const path = url.pathname;

  if (request.method === 'GET' && path === '/status') {
    return Response.json({
      running: this.state.running,
      harness_slug: this.state.harness_slug,
      cycle_count: this.state.cycle_count,
      last_cycle_at: this.state.last_cycle_at,
      error: this.state.error_message,
      current_cycle: this.state.current_cycle
        ? {
            phase: this.state.current_cycle.phase,
            cost_so_far: this.state.current_cycle.cost_so_far,
            genres_scanned: this.state.current_cycle.genres_scanned,
            niches_found: this.state.current_cycle.niches_found,
            evaluations_created: this.state.current_cycle.evaluations_created,
            processor_stats: this.state.current_cycle.processor_stats,
          }
        : null,
    });
  }

  if (request.method === 'POST' && path === '/start') {
    const body = await request.json<{ slug: string }>();
    this.state.running = true;
    this.state.harness_slug = body.slug;
    this.state.error_message = null;
    await this.save();
    await this.ctx.storage.setAlarm(Date.now() + 1000);
    return Response.json({ started: true, slug: body.slug });
  }

  if (request.method === 'POST' && path === '/stop') {
    this.state.running = false;
    await this.save();
    return Response.json({ stopped: true });
  }

  if (request.method === 'POST' && path === '/tick') {
    // Manual tick for testing
    await this.alarm();
    return Response.json({ ok: true, state: this.state });
  }

  return new Response('Not found', { status: 404 });
}

The Worker routes requests to the DO using idFromName:

// In your Worker's fetch handler
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);

    if (url.pathname.startsWith('/harness/')) {
      const slug = url.pathname.split('/')[2]; // /harness/{slug}/status
      const id = env.OPPORTUNITY_HARNESS.idFromName(slug);
      const stub = env.OPPORTUNITY_HARNESS.get(id);
      const harnessPath = '/' + url.pathname.split('/').slice(3).join('/');
      // Keep the query string when forwarding to the Durable Object
      return stub.fetch(new Request(url.origin + harnessPath + url.search, request));
    }

    return new Response('Not found', { status: 404 });
  },
};
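The path handling in this router is easy to get wrong by one segment, so here is the same split logic as a standalone function with a few sample paths. splitHarnessPath is a hypothetical helper; the guard against an empty slug is an addition that the handler above does not have.

```typescript
// Splits '/harness/{slug}/{rest...}' into the DO name and the path the DO sees.
// Returns null when the path is not a harness route or the slug is empty.
function splitHarnessPath(
  pathname: string,
): { slug: string; harnessPath: string } | null {
  // '/harness/market-scanner/status' -> ['', 'harness', 'market-scanner', 'status']
  const parts = pathname.split('/');
  if (parts[1] !== 'harness' || !parts[2]) return null;
  return {
    slug: parts[2],
    harnessPath: '/' + parts.slice(3).join('/'),
  };
}

const statusRoute = splitHarnessPath('/harness/market-scanner/status');
const bareRoute = splitHarnessPath('/harness/market-scanner');
const emptySlug = splitHarnessPath('/harness/');
const otherRoute = splitHarnessPath('/health');
```

Here statusRoute is { slug: 'market-scanner', harnessPath: '/status' }, bareRoute has harnessPath '/', and the last two are null. Since idFromName maps each distinct slug to its own Durable Object instance, rejecting an empty slug avoids creating an instance named by the empty string.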

The Tick: One Unit of Work

Each tick executes at most one processor’s batch, then returns: processors in the current phase are tried in order until one produces results. This keeps CPU usage predictable and allows the harness to check budget between every operation.

private async tick(): Promise<boolean> {
  if (!this.config || !this.state.current_cycle) return true;
  const cycle = this.state.current_cycle;
  const config = this.config;

  // Budget gate (hard mode stops everything)
  if (
    config.budget_mode === 'hard' &&
    cycle.cost_so_far >= config.budget_max_per_cycle
  ) {
    if (cycle.cycle_id) {
      await this.env.DB.prepare(
        `UPDATE harness_cycles SET status = 'budget_exceeded' WHERE id = ?`,
      )
        .bind(cycle.cycle_id)
        .run();
    }
    return true; // done -- out of budget
  }

  // Get enabled processors for current phase
  const phaseProcessors = config.processors.filter(
    (p) => p.enabled && p.phase === cycle.phase,
  );

  if (phaseProcessors.length === 0) {
    return this.advancePhase();
  }

  let didWork = false;
  for (const proc of phaseProcessors) {
    const fn = getProcessor(proc.name);
    if (!fn) continue;

    // Soft budget: skip processors that exhausted their allocation
    if (proc.type === 'llm' && config.budget_mode === 'soft') {
      const allocated = config.budget_max_per_cycle * proc.weight;
      const spent = cycle.processor_stats[proc.name]?.cost || 0;
      if (spent >= allocated) continue;
    }

    const ctx: ProcessorContext = {
      db: this.env.DB,
      ai: this.env.AI,
      apiKey: this.env.GEMINI_API_KEY || '',
      config,
      processor: proc,
      cycle,
      selectModel: () => this.selectModel(proc, cycle),
      remainingBudget: () => {
        const allocated = config.budget_max_per_cycle * proc.weight;
        const spent = cycle.processor_stats[proc.name]?.cost || 0;
        return Math.max(0, allocated - spent);
      },
    };

    try {
      const results = await fn(ctx);
      for (const r of results) {
        this.recordResult(cycle, r);
      }
      didWork = results.length > 0;
      if (didWork) break; // one processor per tick
    } catch (err: any) {
      this.recordResult(cycle, {
        processor: proc.name,
        entity_type: 'error',
        entity_id: 'tick',
        model_used: null,
        tokens_input: 0,
        tokens_output: 0,
        cost_usd: 0,
        duration_ms: 0,
        success: false,
        error_message: err.message,
      });
    }
  }

  if (!didWork) {
    return this.advancePhase();
  }

  return false; // more work to do
}

private advancePhase(): boolean {
  if (!this.state.current_cycle) return true;
  const currentIdx = PHASE_ORDER.indexOf(this.state.current_cycle.phase);
  if (currentIdx < 0 || currentIdx >= PHASE_ORDER.length - 1) {
    return true; // last phase done
  }
  this.state.current_cycle.phase = PHASE_ORDER[currentIdx + 1];
  this.state.current_cycle.phase_index = 0;
  return false;
}

Key insight: The “one processor per tick” rule is the secret to staying under Workers CPU limits. Each tick runs one batch of one processor (typically 2-3 LLM calls), which takes 1-5 seconds. The alarm schedules the next tick. From the outside, it looks like a continuous process. From the inside, it is a sequence of independent, retryable steps.
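tick() relies on a recordResult method that is not shown in this section. The sketch below is one way it might accumulate telemetry, together with the soft-budget check from the loop; it is an assumption about the shape, not the article's implementation. MiniCycle, MiniTelemetry, and isOverAllocation are hypothetical names, and counting one call per result is also an assumption.

```typescript
interface ProcessorStats {
  cost: number;
  calls: number;
  total_ms: number;
  failures: number;
}

// Hypothetical subset of CycleState: only the telemetry fields.
interface MiniCycle {
  cost_so_far: number;
  tokens_input: number;
  tokens_output: number;
  processor_stats: Record<string, ProcessorStats>;
}

// Hypothetical subset of ProcessorResult.
interface MiniTelemetry {
  processor: string;
  tokens_input: number;
  tokens_output: number;
  cost_usd: number;
  duration_ms: number;
  success: boolean;
}

// Adds one result to the cycle totals and to the per-processor stats.
function recordResult(cycle: MiniCycle, r: MiniTelemetry): void {
  cycle.cost_so_far += r.cost_usd;
  cycle.tokens_input += r.tokens_input;
  cycle.tokens_output += r.tokens_output;

  let stats = cycle.processor_stats[r.processor];
  if (!stats) {
    stats = { cost: 0, calls: 0, total_ms: 0, failures: 0 };
    cycle.processor_stats[r.processor] = stats;
  }
  stats.cost += r.cost_usd;
  stats.calls += 1;
  stats.total_ms += r.duration_ms;
  if (!r.success) stats.failures += 1;
}

// Soft-budget check, as in tick(): has this processor spent its share?
function isOverAllocation(
  cycle: MiniCycle,
  name: string,
  budgetMaxPerCycle: number,
  weight: number,
): boolean {
  const spent = cycle.processor_stats[name]?.cost ?? 0;
  return spent >= budgetMaxPerCycle * weight;
}

const demoCycle: MiniCycle = {
  cost_so_far: 0, tokens_input: 0, tokens_output: 0, processor_stats: {},
};
recordResult(demoCycle, {
  processor: 'critic', tokens_input: 900, tokens_output: 300,
  cost_usd: 0.05, duration_ms: 1800, success: true,
});
const overAfterFirst = isOverAllocation(demoCycle, 'critic', 0.5, 0.25);
recordResult(demoCycle, {
  processor: 'critic', tokens_input: 1100, tokens_output: 400,
  cost_usd: 0.08, duration_ms: 2100, success: false,
});
const overAfterSecond = isOverAllocation(demoCycle, 'critic', 0.5, 0.25);
```

With a $0.50 cap and a weight of 0.25, the critic's share is $0.125. After the first call ($0.05) it may still run; after the second ($0.13 total) it is skipped for the rest of the cycle. Because the check happens before a call rather than during it, a processor can overshoot its share by up to one batch.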


The pipeline’s power comes from how processors in different phases feed data to each other. This is not configured with explicit wiring — it flows through the shared CycleState.

Phase 1: Gather (Deterministic)

The gather phase runs zero-cost processors that assess data readiness and identify candidates for analysis.

DataCoverage checks whether genres have sufficient data for meaningful analysis:

registerProcessor(
  'data-coverage',
  async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
    const { db, cycle } = ctx;
    const results: ProcessorResult[] = [];

    // One-shot: only run once per cycle
    if ((cycle.processor_stats['data-coverage']?.calls ?? 0) > 0) return [];

    for (const genre of cycle.genres_to_scan) {
      const stats = await db
        .prepare(
          `SELECT
           COUNT(*) as total,
           SUM(CASE WHEN enrichment_source = 'scraper' THEN 1 ELSE 0 END) as enriched,
           SUM(CASE WHEN competitiveness_score IS NOT NULL THEN 1 ELSE 0 END) as scored
         FROM app_registry
         WHERE primary_genre = ?`,
        )
        .bind(genre)
        .first<any>();

      const enriched = stats?.enriched || 0;
      const enrichedPct =
        stats?.total > 0 ? Math.round((enriched / stats.total) * 100) : 0;
      const scored = stats?.scored || 0;
      const scoredPct =
        enriched > 0 ? Math.round((scored / enriched) * 100) : 0;

      // Ready threshold: (200+ enriched apps OR 30%+ enrichment rate)
      // AND (50+ scored apps OR 20%+ of enriched apps scored)
      const ready =
        (enriched >= 200 || enrichedPct >= 30) &&
        (scored >= 50 || scoredPct >= 20);

      if (!ready) {
        // Remove unready genres from the scan queue
        cycle.genres_to_scan = cycle.genres_to_scan.filter((g) => g !== genre);

        // Create a research task for the gap
        await db
          .prepare(
            `INSERT OR IGNORE INTO research_tasks
             (type, target_id, target_name, reason, priority, status)
             VALUES ('app_enrich', ?, ?, ?, 'high', 'pending')`,
          )
          .bind(
            genre,
            `${genre} enrichment`,
            `Genre has only ${enrichedPct}% enrichment coverage`,
          )
          .run();
        cycle.research_tasks_created++;
      }

      results.push({
        processor: 'data-coverage',
        entity_type: 'genre',
        entity_id: genre,
        model_used: null,
        tokens_input: 0,
        tokens_output: 0,
        cost_usd: 0,
        duration_ms: 0,
        success: true,
        result_summary: `${genre}: ${enrichedPct}% enriched, ready=${ready}`,
      });
    }

    return results;
  },
);
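The readiness rule inside data-coverage is easier to reason about once it is pulled out as a pure function. The sketch below repeats the thresholds from the processor above; coverageFor is a hypothetical name, and the sample counts are invented to exercise each branch.

```typescript
interface Coverage {
  enrichedPct: number; // share of all apps that are enriched
  scoredPct: number;   // share of enriched apps that have a score
  ready: boolean;
}

// Same arithmetic and thresholds as the data-coverage processor.
function coverageFor(total: number, enriched: number, scored: number): Coverage {
  const enrichedPct = total > 0 ? Math.round((enriched / total) * 100) : 0;
  const scoredPct = enriched > 0 ? Math.round((scored / enriched) * 100) : 0;
  const ready =
    (enriched >= 200 || enrichedPct >= 30) &&
    (scored >= 50 || scoredPct >= 20);
  return { enrichedPct, scoredPct, ready };
}

// Large genre, enough enriched apps, but too few of them scored.
const bigButUnscored = coverageFor(1000, 250, 40);
// Small genre that passes on percentages alone.
const smallButCovered = coverageFor(100, 40, 10);
// Huge genre that passes on absolute counts despite a low enrichment rate.
const hugeByCounts = coverageFor(5000, 300, 60);
// Empty genre: the zero guards avoid dividing by zero.
const emptyGenre = coverageFor(0, 0, 0);
```

The first case comes out at 25% enriched and 16% scored and is not ready, because both halves of the rule must pass. The second (40% and 25%) and third (6% enriched, but 300 enriched and 60 scored apps) are ready. The two-sided rule lets small genres qualify by ratio and large genres qualify by volume.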

GenreScanner takes ready genres and finds candidate niches by clustering keywords:

registerProcessor(
  'genre-scanner',
  async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
    const { db, cycle } = ctx;

    // Process one genre per tick (heavy queries)
    const alreadyScanned = cycle.processor_stats['genre-scanner']?.calls || 0;
    const genre = cycle.genres_to_scan[alreadyScanned];
    if (!genre) return [];

    // Find vulnerable apps (stale + proven demand)
    const vulnerable = await db
      .prepare(
        `SELECT track_id, track_name, rating_count, competitiveness_score
         FROM app_registry
         WHERE primary_genre = ?
           AND enrichment_source = 'scraper'
           AND rating_count >= 500
           AND days_since_update >= 180
         ORDER BY competitiveness_score DESC
         LIMIT 30`,
      )
      .bind(genre)
      .all();

    // Find keyword gaps (keywords where incumbents are beatable)
    const keywordGaps = await db
      .prepare(
        `SELECT cr.keyword,
                COUNT(DISTINCT cr.track_id) as app_count,
                ROUND(AVG(ar.competitiveness_score), 1) as avg_comp,
                SUM(ar.rating_count) as total_ratings
         FROM crawl_rankings cr
         JOIN app_registry ar ON cr.track_id = ar.track_id
         WHERE ar.primary_genre = ?
           AND ar.competitiveness_score IS NOT NULL
         GROUP BY cr.keyword
         HAVING COUNT(DISTINCT cr.track_id) >= 3
           AND AVG(ar.competitiveness_score) >= 30
         ORDER BY avg_comp DESC
         LIMIT 30`,
      )
      .bind(genre)
      .all();

    // Cluster keywords into niches using word overlap
    const niches = clusterKeywords(keywordGaps.results as KeywordGap[]);

    // Feed niches into the analysis queue
    for (const niche of niches) {
      cycle.niches_to_analyze.push({
        genre,
        niche_id: `${genre}::${niche.label}`,
        keywords: niche.keywords,
      });
    }

    return [
      {
        processor: 'genre-scanner',
        entity_type: 'genre',
        entity_id: genre,
        model_used: null,
        tokens_input: 0,
        tokens_output: 0,
        cost_usd: 0,
        duration_ms: 0,
        success: true,
        result_summary: `${genre}: ${niches.length} niches, ${vulnerable.results.length} vulnerable apps`,
      },
    ];
  },
);

The keyword clustering is deterministic — no LLM needed:

function clusterKeywords(gaps: KeywordGap[]): NicheCluster[] {
  const clusters: NicheCluster[] = [];
  const used = new Set<string>();
  const sorted = [...gaps].sort((a, b) => b.total_ratings - a.total_ratings);

  for (const kw of sorted) {
    if (used.has(kw.keyword)) continue;

    const words = new Set(
      kw.keyword
        .toLowerCase()
        .split(/\s+/)
        .filter((w) => w.length > 2),
    );
    const cluster: KeywordGap[] = [kw];
    used.add(kw.keyword);

    for (const other of sorted) {
      if (used.has(other.keyword)) continue;
      const otherWords = other.keyword
        .toLowerCase()
        .split(/\s+/)
        .filter((w) => w.length > 2);
      const overlap = otherWords.filter((w) => words.has(w));
      if (overlap.length >= 1) {
        cluster.push(other);
        used.add(other.keyword);
        otherWords.forEach((w) => words.add(w));
      }
    }

    clusters.push({
      label: cluster[0].keyword,
      keywords: cluster.map((k) => k.keyword),
      total_ratings: cluster.reduce((s, k) => s + k.total_ratings, 0),
      avg_competitiveness:
        cluster.reduce((s, k) => s + k.avg_comp, 0) / cluster.length,
    });
  }

  return clusters.sort((a, b) => b.total_ratings - a.total_ratings).slice(0, 15);
}
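
To make the grouping behavior concrete, here is a simplified sketch of the same word-overlap idea. It is not the pipeline's code: `clusterByWordOverlap` is a hypothetical helper that works on plain strings and skips the ratings-based sort, so the result is easy to trace by hand.

```typescript
// Hypothetical, simplified variant of the clustering above (illustration only)
function clusterByWordOverlap(keywords: string[]): string[][] {
  const tokenize = (k: string): string[] =>
    k.toLowerCase().split(/\s+/).filter((w) => w.length > 2);

  const used = new Set<string>();
  const clusters: string[][] = [];

  for (const seed of keywords) {
    if (used.has(seed)) continue;
    const words = new Set(tokenize(seed));
    const cluster = [seed];
    used.add(seed);

    for (const other of keywords) {
      if (used.has(other)) continue;
      const otherWords = tokenize(other);
      if (otherWords.some((w) => words.has(w))) {
        cluster.push(other);
        used.add(other);
        // The cluster vocabulary grows, so later keywords can join transitively
        otherWords.forEach((w) => words.add(w));
      }
    }
    clusters.push(cluster);
  }
  return clusters;
}

const sampleClusters = clusterByWordOverlap([
  'sleep tracker',
  'habit tracker',
  'white noise',
  'habit builder',
  'noise machine',
]);
```

Here `sampleClusters` holds two groups. "habit builder" joins the first group only because "habit tracker" added the word "habit" to that cluster's vocabulary earlier in the pass. That transitive growth is what makes the result depend on input order, which is why the original sorts by `total_ratings` first.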

Phase 2: Analyze (LLM)

The niche-analyzer takes each niche from the queue, gathers fresh data, and uses an LLM to interpret it:

registerProcessor(
  'niche-analyzer',
  async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
    const { db, cycle, processor, selectModel, remainingBudget } = ctx;
    const results: ProcessorResult[] = [];

    const batchSize = Math.min(
      processor.batch_size || 3,
      cycle.niches_to_analyze.length,
    );

    for (let i = 0; i < batchSize; i++) {
      const niche = cycle.niches_to_analyze.shift();
      if (!niche) break;

      // Budget gate: stop if we cannot afford another call
      if (remainingBudget() < (processor.max_cost_per_call || 0.05)) {
        cycle.niches_to_analyze.unshift(niche);
        break;
      }

      // Cap recursion depth
      const depth = (niche.niche_id.match(/::/g) || []).length;
      if (depth >= 3) continue;

      const start = Date.now();

      // Gather apps and reviews for this niche
      const apps = await fetchNicheApps(db, niche.keywords);
      const reviews = await fetchLowRatedReviews(db, apps.slice(0, 5));

      // Build prompt with real data
      const dataBlock = buildNicheDataBlock(niche, apps, reviews);

      const model = selectModel();
      const result = await llmObject(
        {
          apiKey: ctx.apiKey,
          model,
          temperature: 0.3,
          maxTokens: 4096,
          thinkingBudget: 0,
          db: ctx.db,
          contextType: 'harness',
          contextId: ctx.config.slug,
        },
        nicheAnalysisSchema,
        dataBlock,
        NICHE_ANALYZER_SYSTEM,
        `harness-niche-${niche.niche_id}`,
      );

      // Spawn sub-niches for demographic opportunities
      const spawned = result.data.demographic_opportunities.map((d) => ({
        type: 'niche' as const,
        id: `${niche.niche_id}::${d.segment}`,
      }));

      results.push({
        processor: 'niche-analyzer',
        entity_type: 'niche',
        entity_id: niche.niche_id,
        model_used: model,
        tokens_input: result.usage.inputTokens,
        tokens_output: result.usage.outputTokens,
        cost_usd: result.costUsd,
        duration_ms: Date.now() - start,
        success: true,
        spawned_entities: spawned,
      });
    }

    return results;
  },
);
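
The recursion cap relies on the `niche_id` format: each spawned sub-niche appends another `::` segment, so counting separators gives the depth. A minimal sketch of that rule, using hypothetical helper names (`nicheDepth`, `canAnalyze`) and made-up niche labels:

```typescript
// Hypothetical helpers; the '::' separator matches the niche_id format above
const MAX_NICHE_DEPTH = 3;

function nicheDepth(nicheId: string): number {
  return (nicheId.match(/::/g) ?? []).length;
}

function canAnalyze(nicheId: string): boolean {
  return nicheDepth(nicheId) < MAX_NICHE_DEPTH;
}

const rootNiche = 'Health::sleep tracker';             // depth 1
const childNiche = `${rootNiche}::new parents`;        // depth 2
const grandchildNiche = `${childNiche}::night shift`;  // depth 3, skipped
```

One caveat of this encoding: a segment name that itself contains `::` would inflate the computed depth, so segment strings coming from the LLM may need sanitizing before they are used in an ID.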

The Zod schema enforces structured output from the LLM:

const nicheAnalysisSchema = z.object({
  user_need: z.string().describe('The core user problem this niche serves'),
  weakness_pattern: z
    .string()
    .describe('The common failure pattern across incumbents'),
  entry_vector: z
    .string()
    .describe('The strongest way to enter this market'),
  monetization_model: z
    .string()
    .describe('How to make money (subscription, IAP, paid upfront)'),
  price_range: z
    .string()
    .describe('Recommended price range based on competitor data'),
  buildability: z.enum(['trivial', 'moderate', 'complex', 'very_complex']),
  build_weeks: z.number().describe('Estimated weeks to MVP'),
  viral_score: z.number().min(0).max(5),
  viral_signals: z.array(z.string()),
  demographic_opportunities: z.array(
    z.object({
      segment: z.string(),
      axis: z.string(),
      reasoning: z.string(),
    }),
  ),
  risks: z.array(z.string()),
  confidence: z.enum(['high', 'medium', 'low']),
});

Phase 3: Evaluate (Hybrid)

The evaluator combines deterministic tests (computed from data) with LLM judgment:

// Tests 1-4: deterministic, computed from database queries
const totalRatings = appData.reduce(
  (s, a) => s + (a.rating_count || 0),
  0,
);
const staleApps = appData.filter(
  (a) => a.days_since_update && a.days_since_update >= 180,
);

// Test 1: Demand (10K+ combined ratings)
const demandStatus =
  totalRatings >= 10000
    ? 'pass'
    : totalRatings >= 5000
      ? 'partial'
      : 'fail';

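// Test 2: Supply weakness (3+ stale apps, or average competitiveness above 40)
// avgCompScore is computed earlier; not shown in this excerpt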
const supplyStatus =
  staleApps.length >= 3 || avgCompScore > 40
    ? 'pass'
    : staleApps.length >= 1 || avgCompScore > 30
      ? 'partial'
      : 'fail';

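// Tests 3-4 (monetizationStatus, fragmentationStatus) are also deterministic;
// they are omitted from this excerpt

// Tests 5-7: LLM judgment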
const llmResult = await llmObject(
  config,
  evaluationSchema,
  `${dataBlock}\n\nYOUR JOB: Evaluate tests 5, 6, 7`,
  EVALUATOR_SYSTEM,
);

// Combine into a composite score
const tests = [
  demandStatus,
  supplyStatus,
  monetizationStatus,
  fragmentationStatus,
  llmResult.data.buildable_status,
  llmResult.data.marketable_status,
  llmResult.data.confidence_status,
];
const testsPassed = tests.filter((t) => t === 'pass').length;
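
The nested ternaries in the deterministic tests all follow one pattern: a pass threshold and a lower partial threshold. As a sketch, that pattern could be factored into a small helper. `gradeAtLeast` and the sample numbers are hypothetical, not part of the pipeline.

```typescript
type TestStatus = 'pass' | 'partial' | 'fail';

// Hypothetical helper: grades a metric where higher is better
function gradeAtLeast(value: number, passAt: number, partialAt: number): TestStatus {
  if (value >= passAt) return 'pass';
  if (value >= partialAt) return 'partial';
  return 'fail';
}

// 7,200 combined ratings sits between the 5K and 10K thresholds
const demandGrade = gradeAtLeast(7_200, 10_000, 5_000);

// Only full passes count toward the composite, as in testsPassed above
const sampleStatuses: TestStatus[] = [demandGrade, 'pass', 'pass', 'fail'];
const samplePassed = sampleStatuses.filter((s) => s === 'pass').length;
```

Because only `'pass'` is counted, a `'partial'` result contributes nothing to the composite. That is a strict choice and worth keeping in mind when tuning thresholds.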

The evaluation schema demands specificity:

const evaluationSchema = z.object({
  buildable_status: z.enum(['pass', 'fail', 'partial', 'unknown']),
  buildable_reasoning: z.string(),
  marketable_status: z.enum(['pass', 'fail', 'partial', 'unknown']),
  marketable_reasoning: z.string(),
  confidence_status: z.enum(['pass', 'fail', 'partial', 'unknown']),
  confidence_reasoning: z.string(),

  demand_summary: z.string(),
  supply_summary: z.string(),
  monetization_summary: z.string(),
  marketing_summary: z.string(),
  differentiation_angle: z.string(),

  failure_reasons: z.array(
    z.object({
      test: z.string(),
      reason: z.string(),
      solvability: z.enum(['easy', 'moderate', 'hard', 'impossible']),
      how_to_solve: z.string(),
    }),
  ),

  overall_score: z.number().min(0).max(100),
  confidence_level: z.enum(['high', 'medium', 'low']),
  status: z.enum(['promising', 'solvable', 'blocked', 'monitor', 'dead']),
});

Phase 4: Critique (Adversarial LLM)

The critic receives high-scoring evaluations and stress-tests them:

registerProcessor(
  'critic',
  async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
    const { db, cycle, processor, selectModel, remainingBudget } = ctx;
    const results: ProcessorResult[] = [];

    const batchSize = Math.min(
      processor.batch_size || 2,
      cycle.evaluations_to_critique.length,
    );

    for (let i = 0; i < batchSize; i++) {
      const slug = cycle.evaluations_to_critique.shift();
      if (!slug) break;
      if (remainingBudget() < (processor.max_cost_per_call || 0.05)) {
        cycle.evaluations_to_critique.unshift(slug);
        break;
      }

      // Load the evaluation to critique
      const eval_ = await db
        .prepare(`SELECT * FROM opportunity_evaluations WHERE slug = ?`)
        .bind(slug)
        .first<any>();
      // first() resolves to null when no row matches; skip stale slugs
      if (!eval_) continue;

      // Load competitor data and reviews
      const refApps = await loadReferenceApps(db, eval_.id);
      const reviews = await loadCompetitorReviews(db, refApps);

      const model = selectModel();
      const llmResult = await llmObject(
        {
          apiKey: ctx.apiKey,
          model,
          temperature: 0.4, // higher for adversarial thinking
          maxTokens: 4096,
          db: ctx.db,
          contextType: 'harness',
          contextId: ctx.config.slug,
        },
        critiqueSchema,
        buildCritiqueDataBlock(eval_, refApps, reviews),
        CRITIC_SYSTEM,
      );

      // Apply score adjustment
      const adjustedScore = Math.max(
        0,
        Math.min(100, eval_.overall_score + llmResult.data.score_adjustment),
      );

      await db
        .prepare(
          `UPDATE opportunity_evaluations SET
           overall_score = ?,
           confidence = ?,
           critique_json = ?,
           critique_conviction = ?,
           critique_verdict = ?,
           critique_score_delta = ?
         WHERE slug = ?`,
        )
        .bind(
          adjustedScore,
          llmResult.data.conviction,
          JSON.stringify(llmResult.data),
          llmResult.data.conviction,
          llmResult.data.one_line_verdict,
          llmResult.data.score_adjustment,
          slug,
        )
        .run();

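      cycle.evaluations_critiqued++;

      // Report the call so its cost counts toward the cycle budget
      // (the entity_type value here is an assumption)
      results.push({
        processor: 'critic',
        entity_type: 'evaluation',
        entity_id: slug,
        model_used: model,
        tokens_input: llmResult.usage.inputTokens,
        tokens_output: llmResult.usage.outputTokens,
        cost_usd: llmResult.costUsd,
        duration_ms: llmResult.durationMs,
        success: true,
      });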
    }

    return results;
  },
);

The critique schema forces structured adversarial thinking:

const critiqueSchema = z.object({
  challenges: z.array(
    z.object({
      claim: z.string().describe('The assumption being challenged'),
      counter: z.string().describe('Why this might be wrong'),
      severity: z.enum(['minor', 'moderate', 'critical']),
      verdict: z.enum(['holds', 'weakened', 'broken']),
    }),
  ),

  hidden_risks: z.array(
    z.object({
      risk: z.string(),
      likelihood: z.enum(['low', 'medium', 'high']),
      impact: z.enum(['low', 'medium', 'high']),
      mitigation: z.string(),
    }),
  ),

  market_timing: z.enum(['too_early', 'good', 'late', 'uncertain']),
  platform_risk: z.enum(['low', 'medium', 'high']),
  platform_risk_detail: z.string(),

  score_adjustment: z
    .number()
    .min(-30)
    .max(10)
    .describe('Negative = overrated, positive = underrated'),
  adjustment_reasoning: z.string(),

  conviction: z.enum(['high', 'medium', 'low']),
  one_line_verdict: z.string(),
});

Key insight: The evaluator and critic are adversarial by design. The evaluator tries to find opportunity. The critic tries to kill it. The score adjustment is bounded to the range -30 to +10, so the critic can downgrade a score substantially but raise it only slightly. This asymmetry biases the system against false positives.
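
The asymmetric bound can be shown in isolation. This is a minimal sketch: `applyCritique` is a hypothetical helper that re-applies the schema's -30/+10 limits before clamping the final score to 0-100, in case an adjustment reaches this point without having passed validation.

```typescript
const MIN_ADJUSTMENT = -30;
const MAX_ADJUSTMENT = 10;

const clamp = (v: number, lo: number, hi: number): number =>
  Math.max(lo, Math.min(hi, v));

// Hypothetical helper: bound the critic's adjustment, then bound the score
function applyCritique(score: number, adjustment: number): number {
  const bounded = clamp(adjustment, MIN_ADJUSTMENT, MAX_ADJUSTMENT);
  return clamp(score + bounded, 0, 100);
}

const downgraded = applyCritique(72, -30); // full downgrade applies
const cappedBoost = applyCritique(72, 25); // boost is capped at +10
```

With these inputs, a 72 can fall to 42 but cannot rise above 82 in a single critique pass.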


Beyond sequential evaluation, the system supports multi-agent debates where 6 specialized agents argue about opportunities from different perspectives.

Agent Roles

const AGENT_ROLES = {
  blue_ocean: 'Blue Ocean Scout',     // Finds uncontested market spaces
  vulnerability: 'Vulnerability Analyst', // Identifies competitor weaknesses
  revenue: 'Revenue Strategist',       // Evaluates monetization potential
  timing: 'Timing Analyst',           // Assesses market timing windows
  contrarian: 'Contrarian',           // Argues against the consensus
  synthesizer: 'Synthesizer',         // Integrates all perspectives
} as const;

type AgentRole = keyof typeof AGENT_ROLES;

Three-Phase Debate Protocol

The debate runs in three phases, each producing structured output that feeds the next:

Phase 1: Independent Analysis. Each agent evaluates the opportunity independently, without seeing other agents’ opinions. This prevents anchoring bias.

interface Phase1Output {
  role: string;
  score: number;            // 0-10
  confidence: number;       // 0-100%
  top_opportunities: Array<{
    cluster_name: string;
    thesis: string;
    score: number;
    evidence: string[];
    risk: string;
  }>;
  market_observations: string[];
  warnings: string[];
  anticipated_disagreements: Array<{
    from_agent: string;
    their_likely_argument: string;
    my_preemptive_response: string;
  }>;
}

The anticipated_disagreements field is powerful. It forces each agent to consider what others will say before they say it. The Blue Ocean Scout might anticipate: “Revenue Strategist will argue the market is too small. My preemptive response: the adjacent health market adds $2B TAM.”

Phase 2: Debate and Critique. Each agent sees all Phase 1 outputs and responds with challenges, agreements, revised positions, and new insights.

interface Phase2Output {
  role: string;
  revised_score: number;
  revised_confidence: number;

  challenges: Array<{
    target_agent: string;
    their_claim: string;
    steel_man: string;          // best version of their argument
    my_challenge: string;       // why I still disagree
    what_would_change_my_mind: string;
  }>;

  agreements: Array<{
    with_agent: string;
    on_what: string;
    combined_insight: string;   // what we learned together
  }>;

  revised_opportunities: Array<{
    cluster_name: string;
    revised_thesis: string;
    revised_score: number;
    changed_because: string;
  }>;

  new_insights: string[];
  position_changes: string[];
}

The steel_man field is important. Before challenging another agent, the challenger must first present the strongest version of the argument they disagree with. This prevents straw-man attacks and forces genuine engagement with opposing views.

Phase 3: Synthesis. The Synthesizer agent reads all Phase 1 and Phase 2 outputs and produces the final report.

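interface DebateReport {
  // Raw per-agent outputs from Phases 1 and 2, keyed by agent role
  agent_evaluations: {
    phase1: Record<string, Phase1Output>;
    phase2: Record<string, Phase2Output>;
  };

  ranked_opportunities: Array<{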
    rank: number;
    cluster_name: string;
    composite_score: number;
    app_concept_name: string;
    app_concept_tagline: string;
    go_no_go: string;
    first_move: string;
    key_evidence: string[];
    key_risks: string[];
    resolved_disagreements: string[];
  }>;

  market_narrative: string;
  biggest_surprise: string;
  meta_risk: string;

  debate_quality: {
    genuine_disagreements: number;
    position_changes: number;
    novel_insights: number;
    consensus_areas: string[];
    unresolved_tensions: string[];
  };
}

Debate Chat: Making Debates Readable

Raw debate JSON is impenetrable. The debate-chat module transforms structured debate data into a Slack-like timeline of messages:

export async function extractDebateMessages(
  db: D1Database,
  debateId: number,
  report: DebateReport,
): Promise<void> {
  const stmt = db.prepare(
    `INSERT INTO debate_messages
     (debate_id, phase, agent_role, message_type, content,
      target_agent, metadata_json, sort_order)
     VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
  );

  const batch: D1PreparedStatement[] = [];
  let order = 0;

  // Phase 1: independent analyses become opening statements
  for (const [role, p1] of Object.entries(report.agent_evaluations.phase1)) {
    const name = AGENT_NAMES[role] || role;
    const parts: string[] = [];
    parts.push(`**${name}** -- Score: ${p1.score}/10`);

    if (p1.top_opportunities?.length) {
      parts.push('\n**My top opportunities:**');
      for (const opp of p1.top_opportunities) {
        parts.push(`- **${opp.cluster_name}** (${opp.score}/10): ${opp.thesis}`);
      }
    }

    batch.push(
      stmt.bind(
        debateId, 1, role, 'analysis',
        parts.join('\n'), null, null, order++,
      ),
    );
  }

  // Phase 2: challenges and position changes become threaded replies
  for (const [role, p2] of Object.entries(report.agent_evaluations.phase2)) {
    if (p2.challenges?.length) {
      for (const ch of p2.challenges) {
        const msg = [
          `**Challenging ${AGENT_NAMES[ch.target_agent] || ch.target_agent}:**`,
          `> Their claim: "${ch.their_claim}"`,
          `\n**Steel-man:** ${ch.steel_man}`,
          `\n**But I disagree:** ${ch.my_challenge}`,
          `\n*I'd change my mind if:* ${ch.what_would_change_my_mind}`,
        ].join('\n');
        batch.push(
          stmt.bind(
            debateId, 2, role, 'challenge',
            msg, ch.target_agent, null, order++,
          ),
        );
      }
    }

    if (p2.position_changes?.length) {
      for (const pc of p2.position_changes) {
        batch.push(
          stmt.bind(
            debateId, 2, role, 'position_change',
            `**I changed my mind:** ${pc}`, null, null, order++,
          ),
        );
      }
    }
  }

  // Phase 3: synthesis becomes the verdict
  for (const opp of report.ranked_opportunities) {
    batch.push(
      stmt.bind(
        debateId, 3, 'synthesizer', 'verdict',
        `**#${opp.rank}: ${opp.cluster_name}** -- Score: ${opp.composite_score}/10\n**Verdict:** ${opp.go_no_go}`,
        null,
        JSON.stringify({ rank: opp.rank, score: opp.composite_score }),
        order++,
      ),
    );
  }

  // Write in chunks of 100 statements to keep each D1 batch request small
  for (let i = 0; i < batch.length; i += 100) {
    await db.batch(batch.slice(i, i + 100));
  }
}

The debate messages table:

CREATE TABLE debate_messages (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  debate_id INTEGER NOT NULL,
  phase INTEGER NOT NULL,        -- 1, 2, or 3
  agent_role TEXT NOT NULL,
  message_type TEXT NOT NULL,    -- analysis, challenge, agreement, position_change, verdict
  content TEXT NOT NULL,
  target_agent TEXT,             -- who they're responding to
  metadata_json TEXT,
  sort_order INTEGER NOT NULL,
  created_at TEXT DEFAULT (datetime('now'))
);

Key insight: The debate system is not a free-form conversation. It is a structured protocol with fixed phases and typed outputs. This makes it reliable (no infinite loops), auditable (every position change is recorded), and cost-predictable (6 + 6 + 1 = 13 LLM calls per debate, before any retries).
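
The fixed call count follows directly from the protocol's shape. The sketch below stubs out the LLM with a counter to show the control flow. `callAgent` and `runDebate` are hypothetical names, and running each phase's agents in parallel with `Promise.all` is an assumption, not something the pipeline is confirmed to do.

```typescript
type Role =
  | 'blue_ocean' | 'vulnerability' | 'revenue'
  | 'timing' | 'contrarian' | 'synthesizer';

const ROLES: Role[] = [
  'blue_ocean', 'vulnerability', 'revenue',
  'timing', 'contrarian', 'synthesizer',
];

// Stand-in for a real LLM call; it only counts invocations
let llmCalls = 0;
async function callAgent(role: Role, phase: 1 | 2 | 3, context: string): Promise<string> {
  llmCalls++;
  return `${role}/phase${phase} (${context.length} chars of context)`;
}

async function runDebate(dataBlock: string): Promise<string> {
  // Phase 1: each agent sees only the raw data, so there is no anchoring
  const phase1 = await Promise.all(ROLES.map((r) => callAgent(r, 1, dataBlock)));

  // Phase 2: each agent sees every Phase 1 output
  const p1Context = phase1.join('\n');
  const phase2 = await Promise.all(ROLES.map((r) => callAgent(r, 2, p1Context)));

  // Phase 3: a single synthesis call over both earlier phases
  return callAgent('synthesizer', 3, [...phase1, ...phase2].join('\n'));
}
```

Since no phase loops back to an earlier one, the number of calls is fixed by the number of roles and does not depend on how the debate unfolds.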


Convergence detection answers the question: “Are the signals in our data getting stronger or weaker over time?”

In a market analysis context, convergence means two previously unrelated clusters are becoming similar — their shared keywords are growing, their app overlap is increasing. This is an opportunity signal: users are searching across categories, and no app serves the intersection.

export interface ConvergenceSignal {
  lineage_a: number;
  lineage_b: number;
  name_a: string;
  name_b: string;
  genre_a: string;
  genre_b: string;
  current_similarity: number;
  previous_similarity: number;
  delta: number;                  // positive = growing closer
  shared_keywords: string[];
  cross_genre: boolean;           // different genres = unexpected = higher signal
  first_detected_at: string | null;
  growth_rate: number;
  signal_strength: number;        // composite score
  insight: string;                // human-readable explanation
}

The detector compares current state against historical state:

export async function detectConvergence(db: D1Database): Promise<{
  signals: ConvergenceSignal[];
  stats: {
    pairs_analyzed: number;
    converging: number;
    diverging: number;
    stable: number;
  };
}> {
  // Get all current cluster relations
  const currentRels = await db
    .prepare(
      `SELECT cr.lineage_id_a, cr.lineage_id_b, cr.similarity, cr.app_overlap,
              cla.name as name_a, clb.name as name_b
       FROM cluster_relations cr
       JOIN cluster_lineages cla ON cr.lineage_id_a = cla.id
       JOIN cluster_lineages clb ON cr.lineage_id_b = clb.id
       WHERE cla.is_active = 1 AND clb.is_active = 1`,
    )
    .all();

  const signals: ConvergenceSignal[] = [];
  let converging = 0, diverging = 0, stable = 0;

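  for (const rel of currentRels.results) {
    const la = rel.lineage_id_a as number;
    const lb = rel.lineage_id_b as number;
    const currentSim = rel.similarity as number;
    // genreA/genreB, ratingsA/ratingsB, shared (keywords), and firstSeen are
    // looked up per pair; those queries are omitted from this excerpt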

    // Get previous similarity for this pair
    const prevRel = await db
      .prepare(
        `SELECT similarity FROM cluster_relations
         WHERE lineage_id_a = ? AND lineage_id_b = ?
         ORDER BY detected_at DESC
         LIMIT 1 OFFSET 1`,
      )
      .bind(rel.lineage_id_a, rel.lineage_id_b)
      .first();

    const previousSim = (prevRel?.similarity as number) || 0;
    const delta = currentSim - previousSim;

    if (delta > 0.01) converging++;
    else if (delta < -0.01) diverging++;
    else stable++;

    // Signal strength = speed x unexpectedness x market size
    const crossGenre = genreA !== genreB;
    const unexpectedness = crossGenre ? 2.0 : 1.0;
    // Floor at 10 so a pair with no ratings gets weight 1 instead of -Infinity
    const marketSize = Math.log10(Math.max(10, ratingsA + ratingsB));
    const signalStrength = Math.round(
      (delta > 0 ? delta * 100 : 0) * unexpectedness * marketSize,
    );

    // Generate human-readable insight
    let insight = '';
    if (delta > 0.05 && crossGenre) {
      insight = `Rapid cross-genre convergence: ${genreA} x ${genreB} merging around "${shared.slice(0, 3).join(', ')}". New app bridging both could capture untapped demand.`;
    } else if (delta > 0.05) {
      insight = `Same-genre clusters converging. Market subcategories increasingly share keywords. Consolidation opportunity.`;
    } else if (delta > 0 && crossGenre) {
      insight = `Slow cross-genre drift. Early signal -- monitor for acceleration.`;
    }

    signals.push({
      lineage_a: la,
      lineage_b: lb,
      name_a: rel.name_a as string,
      name_b: rel.name_b as string,
      genre_a: genreA,
      genre_b: genreB,
      current_similarity: currentSim,
      previous_similarity: previousSim,
      delta,
      shared_keywords: shared.slice(0, 10),
      cross_genre: crossGenre,
      first_detected_at: firstSeen,
      growth_rate: delta,
      signal_strength: signalStrength,
      insight,
    });
  }

  signals.sort((a, b) => b.signal_strength - a.signal_strength);

  return {
    signals,
    stats: {
      pairs_analyzed: currentRels.results.length,
      converging,
      diverging,
      stable,
    },
  };
}

The convergence signal feeds back into the pipeline. High signal-strength convergences become candidates for the niche-analyzer in the next cycle, creating a feedback loop where the system automatically explores emerging opportunities.
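
A worked example helps calibrate the signal-strength formula. This sketch pulls the scoring step into a standalone function. `signalStrength` is a hypothetical name, and the floor on the ratings total is an addition so the logarithm never receives zero.

```typescript
// Hypothetical standalone version of the scoring step (illustration only)
function signalStrength(delta: number, crossGenre: boolean, totalRatings: number): number {
  const speed = delta > 0 ? delta * 100 : 0; // only convergence scores
  const unexpectedness = crossGenre ? 2.0 : 1.0;
  const marketSize = Math.log10(Math.max(10, totalRatings));
  return Math.round(speed * unexpectedness * marketSize);
}

const crossGenreSignal = signalStrength(0.08, true, 100_000);
const sameGenreSignal = signalStrength(0.08, false, 100_000);
const divergingSignal = signalStrength(-0.05, true, 100_000);
```

For a pair with 100,000 combined ratings and a similarity gain of 0.08, the cross-genre case scores 80 and the same-genre case scores 40. A diverging pair scores 0 regardless of market size. The logarithm compresses market size, so a tenfold increase in ratings adds one unit of weight rather than multiplying the score by ten.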

Convergence in Agent Debates

Convergence detection also applies to agent consensus. During a debate, you can measure convergence by tracking how agent scores change between Phase 1 and Phase 2:

function measureDebateConvergence(report: DebateReport): {
  initial_variance: number;
  final_variance: number;
  convergence_ratio: number;
  position_changes: number;
} {
  const phase1Scores = Object.values(report.agent_evaluations.phase1).map(
    (p) => p.score ?? 5,
  );
  const phase2Scores = Object.values(report.agent_evaluations.phase2).map(
    (p) => p.revised_score ?? 5,
  );

  const variance = (scores: number[]) => {
    const mean = scores.reduce((s, v) => s + v, 0) / scores.length;
    return (
      scores.reduce((s, v) => s + Math.pow(v - mean, 2), 0) / scores.length
    );
  };

  const initialVar = variance(phase1Scores);
  const finalVar = variance(phase2Scores);

  return {
    initial_variance: initialVar,
    final_variance: finalVar,
    convergence_ratio:
      initialVar > 0 ? 1 - finalVar / initialVar : 1,
    position_changes: report.debate_quality.position_changes,
  };
}

A convergence ratio above 0.5 means agents moved significantly closer to agreement. A ratio near 0 means the debate did not change minds. A negative ratio means the debate actually increased disagreement — which is also valuable information.
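
To see how the ratio behaves, here is a small numeric sketch using made-up scores for three agents. `convergenceRatio` is a hypothetical helper that applies the same population-variance formula to plain arrays and mirrors the fallback of 1 used above when there is no initial spread.

```typescript
function scoreVariance(scores: number[]): number {
  const mean = scores.reduce((s, v) => s + v, 0) / scores.length;
  return scores.reduce((s, v) => s + (v - mean) ** 2, 0) / scores.length;
}

// Hypothetical helper operating on raw score arrays
function convergenceRatio(before: number[], after: number[]): number {
  const initial = scoreVariance(before);
  return initial > 0 ? 1 - scoreVariance(after) / initial : 1;
}

// Agents start at 2, 8, 5 and end at 4, 6, 5: variance drops from 6 to 2/3
const convergedRatio = convergenceRatio([2, 8, 5], [4, 6, 5]);

// The reverse movement: variance rises from 2/3 to 6
const polarizedRatio = convergenceRatio([4, 6, 5], [2, 8, 5]);
```

The first debate yields a ratio of about 0.89, and the second yields -8. The ratio is bounded above by 1 but unbounded below, so large negative values are expected whenever a debate starts from near-agreement and then splits.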

Key insight: Convergence detection works at two levels. At the data level, it finds markets that are merging. At the agent level, it measures whether debate is productive. Both are stored in D1 and tracked over time, creating a longitudinal record of how the system’s collective intelligence evolves.


Agents that forget everything between runs cannot learn from experience. The agent memory system gives each agent role persistent memory in three layers, inspired by cognitive architectures.

Layer 1: Core Beliefs

Short belief statements with confidence scores. The 20 highest-confidence beliefs per agent are always loaded into the agent’s system prompt.

export interface CoreBelief {
  id: number;
  agent_role: AgentRole;
  belief: string;
  confidence: number;       // 0.0 to 1.0
  evidence_count: number;   // how many times reinforced
  first_formed_at: string;
  last_reinforced_at: string;
}

Examples:

[c=0.90, n=12] Markets with 80%+ brand dominance never convert to indie opportunity.
[c=0.75, n=5]  Cross-genre convergence signals are 3x more predictive than same-genre.
[c=0.60, n=3]  Health niches have highest retention but hardest App Store review.

Layer 2: Episodic Memory

What happened in past debates — observations, predictions, and lessons. Loaded on demand.

export type MemoryType =
  | 'observation'    // what the agent noticed
  | 'prediction'    // what the agent thinks will happen
  | 'lesson'        // what the agent learned
  | 'belief'        // a core belief (stored separately)
  | 'debate_report'; // synthesis from a debate

export interface AgentMemoryEntry {
  id: number;
  agent_role: AgentRole;
  memory_type: MemoryType;
  content: string;
  context_json: Record<string, unknown> | null;
  confidence: number;
  source_run_id: string | null;
  created_at: string;
}

Layer 3: Reflective Updates

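After each debate, the system records each agent’s top prediction, stores position changes as lessons, and reinforces beliefs about high-scoring opportunities. Recorded predictions are fed back into later prompts so the agent can check whether they came true: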

export async function reflectOnDebate(
  db: D1Database,
  report: DebateReport,
  runId: string,
): Promise<{ memories_created: number; beliefs_updated: number }> {
  let memoriesCreated = 0;
  let beliefsUpdated = 0;

  // 1. Record each agent's top prediction
  for (const [role, p1] of Object.entries(report.agent_evaluations.phase1)) {
    if (!p1.top_opportunities?.length) continue;
    const topOpp = p1.top_opportunities[0];
    await recordMemory(
      db,
      role as AgentRole,
      'prediction',
      `Predicted "${topOpp.cluster_name}" as top opportunity (${topOpp.score}/10): ${topOpp.thesis}`,
      { cluster: topOpp.cluster_name, score: topOpp.score },
      runId,
    );
    memoriesCreated++;
  }

  // 2. Record position changes as lessons
  for (const [role, p2] of Object.entries(report.agent_evaluations.phase2)) {
    if (p2.position_changes) {
      for (const change of p2.position_changes) {
        await recordMemory(
          db,
          role as AgentRole,
          'lesson',
          change,
          undefined,
          runId,
        );
        memoriesCreated++;
      }
    }
  }

  // 3. Reinforce beliefs from repeated patterns
  for (const opp of report.ranked_opportunities.slice(0, 3)) {
    if (opp.composite_score >= 7) {
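      // Keep the belief text stable (no score in it) so repeat runs match the
      // same row and reinforce it instead of inserting a near-duplicate
      await updateBelief(
        db,
        'synthesizer',
        `"${opp.cluster_name}" is a high-potential market`,
        0.15,
      );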
      beliefsUpdated++;
    }
  }

  return { memories_created: memoriesCreated, beliefs_updated: beliefsUpdated };
}

Loading Memory into Prompts

Memory is injected into the agent’s system prompt at runtime:

export async function loadAgentMemory(
  db: D1Database,
  role: AgentRole,
): Promise<string> {
  const sections: string[] = [];

  // Core beliefs (always loaded, sorted by confidence)
  const beliefs = await db
    .prepare(
      `SELECT belief, confidence, evidence_count
       FROM agent_core_beliefs
       WHERE agent_role = ?
       ORDER BY confidence DESC
       LIMIT 20`,
    )
    .bind(role)
    .all();

  if (beliefs.results.length > 0) {
    sections.push(
      `## YOUR ACCUMULATED BELIEFS (top ${beliefs.results.length} by confidence)`,
    );
    sections.push(
      `These are beliefs you formed through experience. Use them, but update if contradicted.`,
    );
    for (const b of beliefs.results) {
      const conf = (b.confidence as number).toFixed(2);
      const count = b.evidence_count as number;
      sections.push(`- [c=${conf}, n=${count}] ${b.belief}`);
    }
  }

  // Recent predictions (for self-evaluation)
  const predictions = await db
    .prepare(
      `SELECT content, created_at FROM agent_memory
       WHERE agent_role = ? AND memory_type = 'prediction' AND is_active = 1
       ORDER BY created_at DESC LIMIT 5`,
    )
    .bind(role)
    .all();

  if (predictions.results.length > 0) {
    sections.push(`\n## YOUR PAST PREDICTIONS (check if they came true)`);
    for (const p of predictions.results) {
      sections.push(`- [${p.created_at}] ${p.content}`);
    }
  }

  // Lessons from past debates
  const lessons = await db
    .prepare(
      `SELECT content FROM agent_memory
       WHERE agent_role = ? AND memory_type = 'lesson' AND is_active = 1
       ORDER BY created_at DESC LIMIT 5`,
    )
    .bind(role)
    .all();

  if (lessons.results.length > 0) {
    sections.push(`\n## LESSONS FROM PAST DEBATES`);
    for (const l of lessons.results) {
      sections.push(`- ${l.content}`);
    }
  }

  if (sections.length === 0) return '';

  return `\n\n--- AGENT MEMORY ---\n${sections.join('\n')}\n--- END MEMORY ---\n`;
}

Belief Dynamics

Beliefs strengthen with confirming evidence and weaken with contradictions:

export async function updateBelief(
  db: D1Database,
  role: AgentRole,
  belief: string,
  confidenceDelta: number = 0.1,
): Promise<void> {
  const existing = await db
    .prepare(
      `SELECT id, confidence, evidence_count FROM agent_core_beliefs
       WHERE agent_role = ? AND belief = ?`,
    )
    .bind(role, belief)
    .first();

  if (existing) {
    const newConf = Math.max(
      0,
      Math.min(1, (existing.confidence as number) + confidenceDelta),
    );
    await db
      .prepare(
        `UPDATE agent_core_beliefs
         SET confidence = ?, evidence_count = evidence_count + 1,
             last_reinforced_at = datetime('now')
         WHERE id = ?`,
      )
      .bind(newConf, existing.id)
      .run();
  } else {
    const initialConf = Math.max(0.1, Math.min(0.9, 0.5 + confidenceDelta));
    await db
      .prepare(
        `INSERT OR IGNORE INTO agent_core_beliefs
         (agent_role, belief, confidence)
         VALUES (?, ?, ?)`,
      )
      .bind(role, belief, initialConf)
      .run();
  }
}

export async function weakenBelief(
  db: D1Database,
  role: AgentRole,
  belief: string,
  penalty: number = 0.15,
): Promise<void> {
  await db
    .prepare(
      `UPDATE agent_core_beliefs
       SET confidence = MAX(0, confidence - ?),
           last_reinforced_at = datetime('now')
       WHERE agent_role = ? AND belief = ?`,
    )
    .bind(penalty, role, belief)
    .run();
}
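
The confidence arithmetic is easier to follow without the SQL. The sketch below replays the same rules against an in-memory map: new beliefs start at 0.5 plus the delta, kept within 0.1 to 0.9, and later updates are clamped to 0 to 1. `reinforce`, `weaken`, and the sample belief are hypothetical stand-ins for the D1-backed functions.

```typescript
const clampTo = (v: number, lo = 0, hi = 1): number => Math.max(lo, Math.min(hi, v));

// In-memory stand-in for the agent_core_beliefs table (illustration only)
const beliefStore = new Map<string, { confidence: number; evidence: number }>();

function reinforce(belief: string, delta = 0.1): number {
  const row = beliefStore.get(belief);
  if (row) {
    row.confidence = clampTo(row.confidence + delta);
    row.evidence += 1;
    return row.confidence;
  }
  // New beliefs start near 0.5 and are kept inside [0.1, 0.9]
  const initial = clampTo(0.5 + delta, 0.1, 0.9);
  beliefStore.set(belief, { confidence: initial, evidence: 1 });
  return initial;
}

function weaken(belief: string, penalty = 0.15): number | undefined {
  const row = beliefStore.get(belief);
  if (!row) return undefined;
  row.confidence = Math.max(0, row.confidence - penalty);
  return row.confidence;
}

const sampleBelief = 'Cross-genre convergence is a strong signal';
const trajectory = [
  reinforce(sampleBelief, 0.15), // formed
  reinforce(sampleBelief, 0.15),
  reinforce(sampleBelief, 0.15),
  reinforce(sampleBelief, 0.15), // hits the ceiling
  weaken(sampleBelief),          // one contradiction
];
```

The confidence moves roughly 0.65, 0.80, 0.95, 1.00, then back to 0.85 after a single contradiction. With the default penalty larger than the default reinforcement, a belief loses confidence faster than it gains it.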

Memory Pruning

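Old memories are deactivated to prevent unbounded growth. A memory is pruned only if it falls outside the agent's 30 most recent entries and is more than 90 days old. Predictions are exempt, since they need time to verify: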

export async function pruneMemories(
  db: D1Database,
): Promise<{ pruned: number }> {
  const result = await db
    .prepare(
      `UPDATE agent_memory SET is_active = 0
       WHERE id NOT IN (
         SELECT id FROM (
           SELECT id,
             ROW_NUMBER() OVER (
               PARTITION BY agent_role ORDER BY created_at DESC
             ) as rn
           FROM agent_memory WHERE is_active = 1
         ) WHERE rn <= 30
       )
       AND is_active = 1
       AND memory_type != 'prediction'
       AND created_at < datetime('now', '-90 days')`,
    )
    .run();

  return { pruned: result.meta.changes ?? 0 };
}
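
The pruning query combines three conditions, which can be easier to reason about as a pure function. This is a sketch of the same rule over an in-memory array. `idsToPrune` and `MemoryRow` are hypothetical, and timestamps are epoch milliseconds here rather than the text timestamps the table stores.

```typescript
interface MemoryRow {
  id: number;
  agent_role: string;
  memory_type: string;
  created_at: number; // epoch ms in this sketch
}

const DAY_MS = 24 * 60 * 60 * 1000;

// Returns the ids the rule would deactivate; rows are assumed to be active
function idsToPrune(
  rows: MemoryRow[],
  now: number,
  keepPerRole = 30,
  maxAgeDays = 90,
): number[] {
  const rankByRole = new Map<string, number>();
  const pruned: number[] = [];
  const newestFirst = [...rows].sort((a, b) => b.created_at - a.created_at);

  for (const row of newestFirst) {
    // Equivalent of ROW_NUMBER() OVER (PARTITION BY agent_role ...)
    const rank = (rankByRole.get(row.agent_role) ?? 0) + 1;
    rankByRole.set(row.agent_role, rank);

    const outsideRecentWindow = rank > keepPerRole;
    const isOld = row.created_at < now - maxAgeDays * DAY_MS;
    if (outsideRecentWindow && isOld && row.memory_type !== 'prediction') {
      pruned.push(row.id);
    }
  }
  return pruned;
}
```

Note that predictions still occupy slots in the per-role ranking even though they are never pruned, so an agent with many predictions keeps fewer of its other recent memories.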

The D1 schema:

CREATE TABLE agent_memory (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  agent_role TEXT NOT NULL,
  memory_type TEXT NOT NULL,
  content TEXT NOT NULL,
  context_json TEXT,
  confidence REAL DEFAULT 0.5,
  source_run_id TEXT,
  is_active INTEGER DEFAULT 1,
  created_at TEXT DEFAULT (datetime('now'))
);

CREATE TABLE agent_core_beliefs (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  agent_role TEXT NOT NULL,
  belief TEXT NOT NULL,
  confidence REAL DEFAULT 0.5,
  evidence_count INTEGER DEFAULT 1,
  first_formed_at TEXT DEFAULT (datetime('now')),
  last_reinforced_at TEXT DEFAULT (datetime('now')),
  UNIQUE(agent_role, belief)
);

CREATE INDEX idx_memory_role ON agent_memory(agent_role, memory_type, is_active);
CREATE INDEX idx_beliefs_role ON agent_core_beliefs(agent_role, confidence DESC);

Key insight: Agent memory is not a chat history. It is a structured knowledge base with confidence scores and temporal dynamics. Beliefs form, strengthen, weaken, and disappear based on evidence. This makes agents that run daily genuinely different from agents that run once — they accumulate judgment.


Every LLM call costs money. The system tracks costs at three levels: per-call, per-processor, and per-cycle. Model selection adapts based on remaining budget.

The LLM Harness

All LLM calls go through a single entry point that handles Zod validation, retries, cost tracking, and error surfacing:

import { generateObject, NoObjectGeneratedError } from 'ai';
import { createGoogleGenerativeAI } from '@ai-sdk/google';
import { z } from 'zod';

export interface LLMConfig {
  apiKey: string;
  model?: string;
  temperature?: number;
  maxTokens?: number;
  maxRetries?: number;
  thinkingBudget?: number;  // 0 = disable thinking
  aiBinding?: unknown;      // Workers AI binding for AI Gateway
  db?: D1Database;          // auto-record to api_calls_ledger
  contextType?: string;     // 'harness', 'pipeline', etc.
  contextId?: string;       // harness slug, pipeline ID
}

export interface LLMResult<T> {
  data: T;
  usage: {
    inputTokens: number;
    outputTokens: number;
    thinkingTokens: number;
    totalTokens: number;
  };
  costUsd: number;
  finishReason: string;
  durationMs: number;
  warnings: string[];
}

Pricing Awareness

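The harness carries a per-model price table, in USD per token, with a separate rate for thinking tokens. The rates are a snapshot and need updating whenever provider pricing changes: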

const PRICING = {
  'gemini-2.5-flash': {
    input: 0.15 / 1_000_000,
    output: 0.60 / 1_000_000,
    thinking: 3.50 / 1_000_000,
  },
  'gemini-2.5-pro': {
    input: 1.25 / 1_000_000,
    output: 10.00 / 1_000_000,
    thinking: 10.00 / 1_000_000,
  },
  'gemini-2.0-flash': {
    input: 0.10 / 1_000_000,
    output: 0.40 / 1_000_000,
    thinking: 0,
  },
} as Record<string, { input: number; output: number; thinking: number }>;

function calculateCost(
  model: string,
  inputTokens: number,
  outputTokens: number,
  thinkingTokens = 0,
): number {
  const pricing = PRICING[model] || PRICING['gemini-2.5-flash'];
  const textOutputTokens = Math.max(0, outputTokens - thinkingTokens);
  return (
    inputTokens * pricing.input +
    textOutputTokens * pricing.output +
    thinkingTokens * pricing.thinking
  );
}
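To make the arithmetic concrete, here is a small worked example using the gemini-2.5-flash rates from the table above. The token counts are made up for illustration, and the sketch assumes, as calculateCost does, that thinking tokens are included in the reported output token count.

```typescript
// Rates copied from the gemini-2.5-flash entry above (USD per token).
const FLASH_RATES = {
  input: 0.15 / 1_000_000,
  output: 0.60 / 1_000_000,
  thinking: 3.50 / 1_000_000,
};

// Hypothetical usage for a single call.
const usage = { inputTokens: 10_000, outputTokens: 2_000, thinkingTokens: 500 };

// Assumption: outputTokens includes thinkingTokens, so subtract before pricing text output.
const textOutputTokens = Math.max(0, usage.outputTokens - usage.thinkingTokens);

const exampleCostUsd =
  usage.inputTokens * FLASH_RATES.input +       // 10,000 x $0.15/M = $0.0015
  textOutputTokens * FLASH_RATES.output +       //  1,500 x $0.60/M = $0.0009
  usage.thinkingTokens * FLASH_RATES.thinking;  //    500 x $3.50/M = $0.00175

console.log(`$${exampleCostUsd.toFixed(5)}`);
```

The total comes to about $0.00415. The 500 thinking tokens are roughly 4% of the tokens but about 42% of the cost, which is why the harness tracks them separately.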

Thinking Token Extraction

Thinking tokens (used by reasoning models like Gemini 2.5) are priced differently. The SDK reports them in different ways depending on the provider and version:

function extractThinkingTokens(result: any): number {
  // Vercel AI SDK: providerMetadata.google.usageMetadata.thoughtsTokenCount
  const fromProvider =
    result.providerMetadata?.google?.usageMetadata?.thoughtsTokenCount;
  if (fromProvider != null) return fromProvider;

  // AI SDK v6: usage.reasoningTokens
  const fromUsage = result.usage?.reasoningTokens;
  if (fromUsage != null) return fromUsage;

  // outputTokenDetails.reasoningTokens
  const fromDetails = result.usage?.outputTokenDetails?.reasoningTokens;
  if (fromDetails != null) return fromDetails;

  return 0;
}

Structured Output with Automatic Retry

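The llmObject function uses the Vercel AI SDK's generateObject with Zod schemas. The SDK's maxRetries setting retries failed API calls. Output that does not match the schema surfaces as NoObjectGeneratedError, which the catch block turns into an LLMError carrying the raw text: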

export async function llmObject<T>(
  config: LLMConfig,
  schema: z.ZodType<T>,
  prompt: string,
  system?: string,
  label = 'llmObject',
): Promise<LLMResult<T>> {
  const model = createModel(config);
  const start = Date.now();

  try {
    const result = await generateObject({
      model,
      schema,
      prompt,
      system,
      temperature: config.temperature ?? 0.3,
      maxOutputTokens: config.maxTokens ?? 8192,
      maxRetries: config.maxRetries ?? 3,
      ...(config.thinkingBudget !== undefined && {
        providerOptions: {
          google: {
            thinkingConfig: { thinkingBudget: config.thinkingBudget },
          },
        },
      }),
    });

    const inputTokens = result.usage?.inputTokens ?? 0;
    const outputTokens = result.usage?.outputTokens ?? 0;
    const thinkingTokens = extractThinkingTokens(result);
    const costUsd = calculateCost(
      config.model || 'gemini-2.5-flash',
      inputTokens,
      outputTokens,
      thinkingTokens,
    );

    // Auto-record to cost ledger
    if (config.db) {
      await recordToLedger(
        config.db, config, label,
        { inputTokens, outputTokens, thinkingTokens },
        costUsd, Date.now() - start, true,
      );
    }

    return {
      data: result.object,
      usage: { inputTokens, outputTokens, thinkingTokens, totalTokens: inputTokens + outputTokens },
      costUsd,
      finishReason: result.finishReason,
      durationMs: Date.now() - start,
      warnings: [],
    };
  } catch (err) {
    if (NoObjectGeneratedError.isInstance(err)) {
      const rawText = err.text || '';
      if (rawText.length > 0 && !rawText.trim().endsWith('}')) {
        console.error(
          `[llm:${label}] Output truncated. Increase maxTokens (current: ${config.maxTokens ?? 8192}).`,
        );
      }
      throw new LLMError(label, 'Schema validation failed after retries', rawText);
    }
    throw err;
  }
}
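As far as the AI SDK documentation describes it, maxRetries covers failed API calls rather than schema mismatches. If you want a second attempt when the output fails validation, one option is an explicit loop around the call. The following is a minimal sketch, not part of the harness: `generate` and `validate` are hypothetical stand-ins for the model call and a Zod safeParse step.

```typescript
// Validation result: either parsed data or an error message (hypothetical shape).
type Validation<T> = { data: T } | { error: string };

async function generateWithValidationRetry<T>(
  // Receives the attempt number and the previous validation error, if any.
  generate: (attempt: number, lastError?: string) => Promise<unknown>,
  validate: (raw: unknown) => Validation<T>,
  maxAttempts = 3,
): Promise<{ data: T; attempts: number }> {
  let lastError: string | undefined;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const raw = await generate(attempt, lastError);
    const checked = validate(raw);
    if ('data' in checked) {
      return { data: checked.data, attempts: attempt };
    }
    // Keep the failure so the next attempt can mention it in the prompt.
    lastError = checked.error;
  }

  throw new Error(`Validation failed after ${maxAttempts} attempts: ${lastError}`);
}
```

Each extra attempt is a full, billed LLM call, so a loop like this should record every attempt in the cost ledger, not only the successful one.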

Adaptive Model Selection

The harness selects models based on the processor’s tier and the remaining budget:

private selectModel(proc: ProcessorConfig, cycle: CycleState): string {
  if (!this.config) return 'gemini-2.5-flash';

  const tier = proc.model_tier || 'default';
  const budget = this.config.budget_max_per_cycle;
  const spent = cycle.cost_so_far;
  const remaining = budget - spent;

  // Budget pressure: downgrade to default when low
  if (remaining < budget * 0.2 && tier !== 'default') {
    return this.config.model_default;
  }

  switch (tier) {
    case 'premium':
      return this.config.model_premium;   // gemini-2.5-pro
    case 'expensive':
      return this.config.model_expensive; // gemini-2.5-flash
    default:
      return this.config.model_default;   // gemini-2.0-flash
  }
}
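To see the budget-pressure rule in action, here is the same decision as a standalone pure function. It is a sketch: `pickModel` and `ModelConfig` are hypothetical names, and the model IDs are the ones noted in the comments above.

```typescript
type Tier = 'default' | 'expensive' | 'premium';

interface ModelConfig {
  budget_max_per_cycle: number;
  model_default: string;
  model_expensive: string;
  model_premium: string;
}

// Same rule as selectModel, expressed as a function of tier and spend so far.
function pickModel(config: ModelConfig, tier: Tier, costSoFar: number): string {
  const remaining = config.budget_max_per_cycle - costSoFar;

  // Under 20% of the budget left: every non-default tier drops to the default model.
  if (remaining < config.budget_max_per_cycle * 0.2 && tier !== 'default') {
    return config.model_default;
  }
  if (tier === 'premium') return config.model_premium;
  if (tier === 'expensive') return config.model_expensive;
  return config.model_default;
}

const exampleConfig: ModelConfig = {
  budget_max_per_cycle: 1.0,
  model_default: 'gemini-2.0-flash',
  model_expensive: 'gemini-2.5-flash',
  model_premium: 'gemini-2.5-pro',
};

console.log(pickModel(exampleConfig, 'premium', 0.5));  // gemini-2.5-pro
console.log(pickModel(exampleConfig, 'premium', 0.85)); // gemini-2.0-flash
```

Note that the downgrade skips the middle tier: a premium processor under budget pressure goes straight to the default model, not to the expensive one.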

Cost Ledger

Every LLM call is recorded in the api_calls_ledger:

CREATE TABLE api_calls_ledger (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  context_type TEXT,         -- 'harness', 'pipeline', 'api'
  context_id TEXT,           -- harness slug, pipeline ID
  service TEXT NOT NULL,     -- 'gemini', 'openai', etc.
  operation TEXT NOT NULL,   -- label from the llm call
  endpoint TEXT,             -- model name
  cost_usd REAL NOT NULL,
  units INTEGER,             -- total tokens
  input_units INTEGER,
  output_units INTEGER,
  duration_ms INTEGER,
  status TEXT,               -- 'ok' | 'error'
  error_message TEXT,
  metadata TEXT,             -- JSON with thinking tokens breakdown
  created_at TEXT DEFAULT (datetime('now'))
);

This gives you dashboards like:

-- Cost per operation (LLM call label) for one harness, across all cycles
SELECT operation, COUNT(*) as calls,
       ROUND(SUM(cost_usd), 4) as total_cost,
       ROUND(AVG(duration_ms)) as avg_ms
FROM api_calls_ledger
WHERE context_type = 'harness' AND context_id = 'market-scanner'
GROUP BY operation
ORDER BY total_cost DESC;

AI Gateway Integration

When the Workers AI binding is available, the system routes through Cloudflare AI Gateway for caching and logging:

function createModel(config: LLMConfig) {
  const modelId = config.model || 'gemini-2.5-flash';

  if (config.aiBinding) {
    try {
      const aigateway = createAiGateway({
        binding: (config.aiBinding as any).gateway('aigateway'),
        options: { cacheTtl: 300 },
      });
      const google = createGoogleGateway({ apiKey: config.apiKey });
      return aigateway(google(modelId));
    } catch (err) {
      console.warn(`AI Gateway unavailable, falling back to direct`);
    }
  }

  const google = createGoogleDirect({ apiKey: config.apiKey });
  return google(modelId);
}

Key insight: Cost control is not an afterthought — it is built into every layer. The processor declares its budget weight. The harness enforces it. The LLM wrapper records actuals. The ledger enables analysis. You always know exactly how much each agent costs and whether the expensive model justified its price.


For batch operations that need to process many items with automatic retries, Cloudflare Workflows provides durable execution with step-level checkpointing.

The Scatter-Gather Pattern

The BatchResearchWorkflow processes multiple research seeds in parallel, with configurable concurrency:

import {
  WorkflowEntrypoint,
  type WorkflowEvent,
  type WorkflowStep,
} from 'cloudflare:workers';

export interface BatchParams {
  batchId: string;
  seeds: string[];
  concurrency?: number; // default 3
}

export class BatchResearchWorkflow extends WorkflowEntrypoint<Env, BatchParams> {
  override async run(event: WorkflowEvent<BatchParams>, step: WorkflowStep) {
    const { batchId, seeds, concurrency = 3 } = event.payload;

    // Step 1: Create pipeline records (checkpointed)
    const pipelineIds = await step.do('create-pipelines', async () => {
      const ids: Array<{ id: string; seed: string }> = [];
      for (const seed of seeds) {
        const id = crypto.randomUUID();
        await this.env.DB.prepare(
          `INSERT INTO pipelines (id, type, seed, status, batch_id)
           VALUES (?, 'keyword_research', ?, 'pending', ?)`,
        )
          .bind(id, seed, batchId)
          .run();
        ids.push({ id, seed });
      }
      return ids;
    });

    // Step 2: Process in parallel chunks
    const allResults: SeedResult[] = [];

    for (
      let batchStart = 0;
      batchStart < pipelineIds.length;
      batchStart += concurrency
    ) {
      const chunk = pipelineIds.slice(batchStart, batchStart + concurrency);
      const chunkIndex = Math.floor(batchStart / concurrency);

      // Each chunk is a retryable step
      const chunkResults = await step.do(
        `process-chunk-${chunkIndex}`,
        {
          retries: { limit: 2, delay: '10 seconds', backoff: 'exponential' },
          timeout: '120 seconds',
        },
        async () => {
          // Scatter: parallel execution within chunk
          const settled = await Promise.allSettled(
            chunk.map(async ({ id, seed }) => {
              await this.env.DB.prepare(
                `UPDATE pipelines SET status = 'running' WHERE id = ?`,
              )
                .bind(id)
                .run();

              try {
                const result = await runKeywordResearchPipeline(
                  this.env, id, seed,
                );

                // Chain deep evaluation inline
                try {
                  await runDeepEvalPipeline(
                    this.env, crypto.randomUUID(), id, seed, result,
                  );
                } catch {
                  // Non-fatal: keyword research still succeeded
                }

                return {
                  seed,
                  pipelineId: id,
                  status: 'completed' as const,
                  recommendation: result.verdict?.recommendation,
                  cost: result.costs.total,
                };
              } catch (err: unknown) {
                const msg =
                  err instanceof Error ? err.message : String(err);
                await this.env.DB.prepare(
                  `UPDATE pipelines SET status = 'failed',
                   error_message = ? WHERE id = ?`,
                )
                  .bind(msg.substring(0, 1000), id)
                  .run();
                return {
                  seed,
                  pipelineId: id,
                  status: 'failed' as const,
                  error: msg,
                };
              }
            }),
          );

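          // Gather: collect results, pairing any rejection with its input by index
          // so a failed item keeps its seed and pipeline ID
          return settled.map((r, i) =>
            r.status === 'fulfilled'
              ? r.value
              : {
                  seed: chunk[i].seed,
                  pipelineId: chunk[i].id,
                  status: 'failed' as const,
                  error: r.reason?.message ?? 'Unknown error',
                },
          );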
        },
      );

      allResults.push(...chunkResults);
    }

    // Step 3: Finalize
    await step.do('finalize', async () => {
      const completed = allResults.filter((r) => r.status === 'completed');
      const totalCost = completed.reduce((s, r) => s + (r.cost ?? 0), 0);
      console.log(
        `[batch] ${batchId}: ${completed.length}/${allResults.length} completed, $${totalCost.toFixed(4)}`,
      );
    });

    return allResults;
  }
}
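Stripped of the Workflow and D1 specifics, the scatter-gather core is a chunked Promise.allSettled loop. The sketch below isolates that pattern so it can be tested on its own; `processInChunks` and `Outcome` are hypothetical names, not part of the workflow above.

```typescript
type Outcome<T, R> =
  | { item: T; status: 'completed'; value: R }
  | { item: T; status: 'failed'; error: string };

async function processInChunks<T, R>(
  items: T[],
  concurrency: number,
  worker: (item: T) => Promise<R>,
): Promise<Outcome<T, R>[]> {
  const size = Math.max(1, Math.floor(concurrency)); // guard against 0 or fractions
  const outcomes: Outcome<T, R>[] = [];

  for (let start = 0; start < items.length; start += size) {
    const chunk = items.slice(start, start + size);

    // Scatter: run one chunk in parallel. allSettled itself never rejects.
    const settled = await Promise.allSettled(chunk.map((item) => worker(item)));

    // Gather: results arrive in input order, so the index recovers the item.
    settled.forEach((result, index) => {
      const item = chunk[index];
      if (result.status === 'fulfilled') {
        outcomes.push({ item, status: 'completed', value: result.value });
      } else {
        const reason: unknown = result.reason;
        outcomes.push({
          item,
          status: 'failed',
          error: reason instanceof Error ? reason.message : String(reason),
        });
      }
    });
  }
  return outcomes;
}
```

At most `concurrency` workers run at once, and a failure in one item never discards the results of its neighbours in the same chunk.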

Why Not Just Use the Harness?

The harness and workflows serve different purposes:

| Concern | Harness (DO) | Workflow |
| --- | --- | --- |
| Execution model | Continuous alarm loop | One-shot durable run |
| State | Persistent across cycles | Per-execution, checkpointed |
| Concurrency | One processor per tick | Parallel within steps |
| Retries | Manual (try/catch + reschedule) | Built-in with backoff |
| Use case | Ongoing monitoring pipeline | Batch processing jobs |
| CPU limits | 2s per tick (alarm) | Per-step (configurable timeout) |
| Identity | Singleton per pipeline | One instance per batch |

Use the harness when: you need a continuously running pipeline that wakes up periodically, processes whatever is new, and goes back to sleep. The harness adapts to load — more data means more ticks per cycle.

Use workflows when: you have a batch of items to process right now, want parallel execution, and need automatic retries with backoff. Workflows are fire-and-forget: once created, an instance runs on its own, with each step checkpointed and retried up to its configured limit.

NonRetryableError

Some errors should not be retried. Authentication failures, invalid input, and permanently missing resources should fail fast:

import { NonRetryableError } from 'cloudflare:workflows';

const chunkResults = await step.do(
  `process-chunk-${i}`,
  { retries: { limit: 2, delay: '10 seconds', backoff: 'exponential' } },
  async () => {
    // This will retry up to 2 times
    const result = await callExternalApi(seed);

    if (result.status === 401) {
      // This will NOT retry -- authentication is broken
      throw new NonRetryableError('API key is invalid');
    }

    if (result.status === 404) {
      // This will NOT retry -- the resource does not exist
      throw new NonRetryableError(`Seed "${seed}" not found`);
    }

    return result.data;
  },
);
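Deciding which failures are permanent is a policy choice, and it helps to keep that policy in one place. The following sketch shows one possible classification by HTTP status; `classifyStatus` and the specific status choices are assumptions for illustration, not rules taken from Cloudflare or from the pipeline above.

```typescript
type Disposition = 'ok' | 'retry' | 'fail-fast';

// Hypothetical policy: map an HTTP status to what the step should do next.
function classifyStatus(status: number): Disposition {
  if (status >= 200 && status < 300) return 'ok';

  // Timeouts, rate limits, and upstream errors may succeed on a later attempt.
  if (status === 408 || status === 429 || status >= 500) return 'retry';

  // Everything else (bad input, auth failures, missing resources) is permanent.
  return 'fail-fast';
}

console.log(classifyStatus(401)); // fail-fast
console.log(classifyStatus(429)); // retry
console.log(classifyStatus(503)); // retry
```

Inside a step, a 'fail-fast' result would map to throwing NonRetryableError, while a 'retry' result would map to throwing an ordinary Error so the step's retry policy applies.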

Wrangler Configuration

// wrangler.jsonc
{
  "name": "multi-agent-pipeline",
  "main": "src/index.ts",
  "compatibility_date": "2025-04-01",
  "d1_databases": [
    {
      "binding": "DB",
      "database_name": "pipeline-db",
      "database_id": "your-d1-database-id"
    }
  ],
  "durable_objects": {
    "bindings": [
      {
        "name": "OPPORTUNITY_HARNESS",
        "class_name": "OpportunityHarness"
      }
    ]
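  },
  // Durable Object classes must be declared in a migration before the first deploy.
  // Use "new_classes" instead if the class uses the key-value storage backend.
  "migrations": [
    { "tag": "v1", "new_sqlite_classes": ["OpportunityHarness"] }
  ],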
  "workflows": [
    {
      "name": "batch-research",
      "binding": "BATCH_RESEARCH",
      "class_name": "BatchResearchWorkflow"
    }
  ],
  "ai": {
    "binding": "AI"
  }
}

Key insight: Workflows and Durable Objects complement each other perfectly. The DO is the control plane — it decides what to process and manages budget. The Workflow is the data plane — it processes a specific batch with parallelism and retries. You can even trigger workflows from within a DO alarm tick.


Example 1: Minimal Agent Processor

A processor that scores text sentiment using Workers AI (free tier):

import { registerProcessor } from './harness';

registerProcessor(
  'sentiment-scorer',
  async (ctx): Promise<ProcessorResult[]> => {
    const { db, ai, cycle } = ctx;

    const items = await db
      .prepare(
        `SELECT id, content FROM review_queue
         WHERE scored_at IS NULL LIMIT ?`,
      )
      .bind(ctx.processor.batch_size || 10)
      .all();

    const results: ProcessorResult[] = [];

    for (const item of items.results) {
      const start = Date.now();
      const response = await ai.run('@cf/meta/llama-3.1-8b-instruct', {
        prompt: `Rate this review sentiment from 0 (negative) to 10 (positive). Reply with just the number.\n\n"${item.content}"`,
        max_tokens: 5,
      });

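      const parsed = Number.parseInt(response.response?.trim() ?? '', 10);
      // Fall back to a neutral 5 when the model replies with non-numeric text,
      // and clamp to the 0-10 range requested in the prompt
      const score = Number.isNaN(parsed) ? 5 : Math.min(10, Math.max(0, parsed));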

      await db
        .prepare(
          `UPDATE review_queue SET sentiment = ?, scored_at = datetime('now') WHERE id = ?`,
        )
        .bind(score, item.id)
        .run();

      results.push({
        processor: 'sentiment-scorer',
        entity_type: 'review',
        entity_id: String(item.id),
        model_used: '@cf/meta/llama-3.1-8b-instruct',
        tokens_input: 0,
        tokens_output: 0,
        cost_usd: 0, // Workers AI free tier
        duration_ms: Date.now() - start,
        success: true,
      });
    }

    return results;
  },
);

Example 2: Input Hash Change Detection

Detecting when upstream data changes so agents reprocess automatically:

import { getUnprocessed, hashInputs, recordResult } from './agent-framework';

const AGENT: AgentConfig = { name: 'price-tracker', version: 2 };

async function checkForChanges(db: D1Database) {
  // Build input hashes from current data
  const products = await db
    .prepare(`SELECT id, price, stock_status FROM products`)
    .all();

  const hashes = new Map<string, string>();
  for (const p of products.results) {
    hashes.set(
      String(p.id),
      hashInputs(p.price as number, p.stock_status as string),
    );
  }

  // Get products that need reprocessing
  const needsWork = await getUnprocessed(
    db,
    AGENT,
    'product',
    products.results.map((p) => String(p.id)),
    'main',
    hashes,
  );

  console.log(`${needsWork.length} products need reprocessing`);
  return needsWork;
}
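The implementation of hashInputs is not shown in this section. As a rough idea of what such a helper can look like, here is a hypothetical stand-in that serializes its arguments and applies an FNV-1a-style 32-bit hash. The name `hashInputsSketch` and the choice of hash are assumptions; any stable digest would serve the same purpose.

```typescript
// Hypothetical stand-in for a helper like hashInputs.
function hashInputsSketch(...parts: Array<string | number | boolean | null>): string {
  // JSON.stringify keeps types distinct ("1" vs 1) and makes field boundaries unambiguous.
  const canonical = JSON.stringify(parts);

  let hash = 0x811c9dc5; // FNV-1a 32-bit offset basis
  for (let i = 0; i < canonical.length; i++) {
    hash ^= canonical.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193); // multiply by the FNV prime in 32-bit arithmetic
  }

  // Unsigned, zero-padded hex so every hash has the same length.
  return (hash >>> 0).toString(16).padStart(8, '0');
}

const before = hashInputsSketch(19.99, 'in_stock');
const after = hashInputsSketch(18.99, 'in_stock');
console.log(before === after); // false: the price changed, so the entity needs reprocessing
```

A 32-bit hash is enough to illustrate change detection. For a large catalog, a longer digest would reduce the chance that a real change goes unnoticed because of a collision.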

Example 3: Spawning Entities Across Phases

A processor that creates work for downstream processors:

registerProcessor(
  'topic-splitter',
  async (ctx): Promise<ProcessorResult[]> => {
    const { cycle } = ctx;

    // Read a broad topic from the queue
    const topic = cycle.topics_to_split.shift();
    if (!topic) return [];

    // Split into sub-topics (deterministic)
    const subtopics = splitByKeywordOverlap(topic);

    return [
      {
        processor: 'topic-splitter',
        entity_type: 'topic',
        entity_id: topic.id,
        model_used: null,
        tokens_input: 0,
        tokens_output: 0,
        cost_usd: 0,
        duration_ms: 0,
        success: true,
        result_summary: `Split into ${subtopics.length} sub-topics`,
        // These spawn entities that downstream processors will pick up
        spawned_entities: subtopics.map((st) => ({
          type: 'subtopic',
          id: st.id,
        })),
      },
    ];
  },
);

Example 4: Agent Tool Calling

Using the LLM agent mode with tools for data lookup:

import { llmAgent } from './llm';
import { z } from 'zod';

const result = await llmAgent(
  {
    apiKey: env.GEMINI_API_KEY,
    model: 'gemini-2.5-flash',
    temperature: 0.3,
    maxTokens: 4096,
  },
  'Analyze the competitive landscape for "habit tracker" apps.',
  `You are a market analyst. Use the tools to gather data before forming your opinion.`,
  {
    search_apps: {
      description: 'Search for apps by keyword',
      parameters: z.object({ keyword: z.string() }),
      execute: async (args: any) => {
        const rows = await db
          .prepare(
            `SELECT track_name, rating_count, average_rating
             FROM app_registry
             WHERE track_name LIKE ? LIMIT 10`,
          )
          .bind(`%${args.keyword}%`)
          .all();
        return JSON.stringify(rows.results);
      },
    },
    get_reviews: {
      description: 'Get reviews for an app',
      parameters: z.object({ app_name: z.string(), max: z.number() }),
      execute: async (args: any) => {
        const rows = await db
          .prepare(
            `SELECT title, content, score FROM app_reviews
             WHERE track_id IN (
               SELECT track_id FROM app_registry WHERE track_name LIKE ?
             ) LIMIT ?`,
          )
          .bind(`%${args.app_name}%`, args.max)
          .all();
        return JSON.stringify(rows.results);
      },
    },
  },
  8, // max 8 tool-calling steps
  'competitive-analysis',
);

console.log(result.text); // Agent's analysis after using tools
console.log(`Cost: $${result.costUsd.toFixed(4)}, Steps: ${result.steps}`);

Example 5: Cycle State Initialization

How a cycle starts and selects its work:

private async startCycle() {
  if (!this.config) return;
  const cycleNum = this.state.cycle_count + 1;
  this.state.current_cycle = {
    cycle_id: null,
    cycle_number: cycleNum,
    phase: 'gather',
    phase_index: 0,
    cost_so_far: 0,
    tokens_input: 0,
    tokens_output: 0,
    genres_to_scan: [],
    niches_to_analyze: [],
    evaluations_to_critique: [],
    genres_scanned: 0,
    niches_found: 0,
    evaluations_created: 0,
    evaluations_critiqued: 0,
    research_tasks_created: 0,
    processor_stats: {},
  };

  // Create cycle record in D1
  const res = await this.env.DB.prepare(
    `INSERT INTO harness_cycles
     (harness_id, cycle_number, budget_limit, phase)
     VALUES (?, ?, ?, 'gather')`,
  )
    .bind(this.config.id, cycleNum, this.config.budget_max_per_cycle)
    .run();

  this.state.current_cycle.cycle_id = res.meta.last_row_id as number;

  // Auto-select genres by opportunity signal
  if (this.config.focus_genres?.length) {
    this.state.current_cycle.genres_to_scan =
      this.config.focus_genres.slice(0, this.config.cycle_genres_limit);
  } else {
    const genres = await this.env.DB.prepare(
      `SELECT primary_genre FROM app_registry
       WHERE enrichment_source = 'scraper'
       GROUP BY primary_genre
       HAVING COUNT(*) >= 30
       ORDER BY (
         SUM(CASE WHEN days_since_update >= 180 THEN 1 ELSE 0 END) * 1.0
         / COUNT(*)
       ) DESC
       LIMIT ?`,
    )
      .bind(this.config.cycle_genres_limit)
      .all();

    this.state.current_cycle.genres_to_scan = genres.results.map(
      (g: any) => g.primary_genre,
    );
  }
}

Example 6: Telemetry Recording

How the harness tracks per-processor metrics in real time:

private recordResult(cycle: CycleState, result: ProcessorResult) {
  // Update cycle totals
  cycle.cost_so_far += result.cost_usd;
  cycle.tokens_input += result.tokens_input;
  cycle.tokens_output += result.tokens_output;

  // Per-processor stats
  if (!cycle.processor_stats[result.processor]) {
    cycle.processor_stats[result.processor] = {
      cost: 0,
      calls: 0,
      total_ms: 0,
      failures: 0,
    };
  }
  const stats = cycle.processor_stats[result.processor];
  stats.cost += result.cost_usd;
  stats.calls++;
  stats.total_ms += result.duration_ms;
  if (!result.success) stats.failures++;

  // Log to DB (fire-and-forget)
  if (cycle.cycle_id) {
    this.env.DB.prepare(
      `INSERT INTO harness_processor_log
       (cycle_id, processor, entity_type, entity_id, model_used,
        tokens_input, tokens_output, cost_usd, duration_ms,
        success, error_message, result_summary)
       VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
    )
      .bind(
        cycle.cycle_id,
        result.processor,
        result.entity_type,
        result.entity_id,
        result.model_used,
        result.tokens_input,
        result.tokens_output,
        result.cost_usd,
        result.duration_ms,
        result.success ? 1 : 0,
        result.error_message || null,
        result.result_summary || null,
      )
      .run()
      .catch(() => {}); // never fail the tick on log errors
  }

  // Handle spawned entities
  if (result.spawned_entities) {
    for (const entity of result.spawned_entities) {
      if (entity.type === 'niche') {
        cycle.niches_to_analyze.push({
          genre: '',
          niche_id: entity.id,
          keywords: [],
        });
      } else if (entity.type === 'evaluation') {
        cycle.evaluations_to_critique.push(entity.id);
      }
    }
  }
}
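The processor_stats map holds raw totals, so ratios such as average latency or failure rate have to be derived when you read it. The following sketch shows one way to do that; `summarizeStats` is a hypothetical helper, and the stats shape mirrors the fields initialized in recordResult.

```typescript
interface ProcessorStats {
  cost: number;
  calls: number;
  total_ms: number;
  failures: number;
}

interface ProcessorSummary {
  processor: string;
  avgMs: number;
  failureRate: number;
  costPerCall: number;
}

// Derive per-call averages from the running totals, most expensive processor first.
function summarizeStats(stats: Record<string, ProcessorStats>): ProcessorSummary[] {
  return Object.keys(stats)
    .map((name) => {
      const s = stats[name];
      const calls = s.calls > 0 ? s.calls : 1; // avoid dividing by zero
      return {
        processor: name,
        avgMs: s.total_ms / calls,
        failureRate: s.failures / calls,
        costPerCall: s.cost / calls,
      };
    })
    .sort((a, b) => b.costPerCall - a.costPerCall);
}

const summary = summarizeStats({
  critic: { cost: 0.25, calls: 5, total_ms: 5000, failures: 0 },
  evaluator: { cost: 0.5, calls: 4, total_ms: 8000, failures: 1 },
});
console.log(summary[0].processor); // evaluator
```

Sorting by cost per call puts the processors most worth moving to a cheaper model tier at the top.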

Example 7: Prompt Engineering for Structured Output

How to write system prompts that work reliably with Zod schemas:

const CRITIC_SYSTEM = `You are an adversarial opportunity critic. Your job is to stress-test evaluations and find weaknesses.

RULES:
1. CHALLENGE ASSUMPTIONS: For each major claim, ask "what if this is wrong?"
2. HIDDEN RISKS: Look for risks the evaluator missed:
   - Platform adding this feature natively
   - Policy risks (App Store guidelines)
   - Market timing (window closing?)
   - Retention challenges
3. SCORE ADJUSTMENT: Be calibrated.
   - -5 to -10: Minor concerns
   - -10 to -20: Significant issues
   - -20 to -30: Critical problems
   - 0 to +10: Evaluator was too conservative
4. Be SPECIFIC: "Apple Health added fasting in iOS 18" not "platform risk exists."
5. one_line_verdict must be actionable.`;

Rules for prompts that produce reliable structured output:

  1. Numbered rules work better than prose paragraphs for instruction following.
  2. Calibration examples (“score -5 means X, score -20 means Y”) reduce variance.
  3. SPECIFIC in capitals tells the model you mean it.
  4. Negative examples (“not ‘platform risk exists’”) are more effective than positive examples alone.
  5. Zod .describe() on schema fields acts as inline documentation for the model.

Example 8: Testing a Processor in Isolation

You can test any processor without the harness by constructing a mock context:

import { getProcessor } from './harness';

async function testEvaluator(db: D1Database, apiKey: string) {
  const fn = getProcessor('evaluator');
  if (!fn) throw new Error('evaluator not registered');

  const cycle: CycleState = {
    cycle_id: null,
    cycle_number: 0,
    phase: 'evaluate',
    phase_index: 0,
    cost_so_far: 0,
    tokens_input: 0,
    tokens_output: 0,
    genres_to_scan: [],
    niches_to_analyze: [
      {
        genre: 'Health & Fitness',
        niche_id: 'test::fasting-tracker',
        keywords: ['fasting tracker', 'intermittent fasting', 'fasting app'],
      },
    ],
    evaluations_to_critique: [],
    genres_scanned: 0,
    niches_found: 0,
    evaluations_created: 0,
    evaluations_critiqued: 0,
    research_tasks_created: 0,
    processor_stats: {},
  };

  const results = await fn({
    db,
    ai: null as any,
    apiKey,
    config: {
      slug: 'test',
      budget_max_per_cycle: 1.0,
      cycle_critique_threshold: 60,
    } as any,
    processor: {
      name: 'evaluator',
      type: 'llm',
      enabled: true,
      phase: 'evaluate',
      model_tier: 'default',
      weight: 1.0,
      max_cost_per_call: 0.10,
      batch_size: 1,
    },
    cycle,
    selectModel: () => 'gemini-2.5-flash',
    remainingBudget: () => 1.0,
  });

  console.log('Results:', JSON.stringify(results, null, 2));
}

Multi-Agent Frameworks

| Feature | Workers-Native (this article) | LangGraph | CrewAI | AutoGen |
| --- | --- | --- | --- | --- |
| Language | TypeScript | Python | Python | Python |
| Runtime | Cloudflare Workers (V8 isolate) | Any Python server | Any Python server | Any Python server |
| Cold start | <5ms | 2-10s (Python + deps) | 2-10s | 2-10s |
| Idle cost | $0 (scale to zero) | $25-100/month (server) | $25-100/month | $25-100/month |
| State management | D1 + DO storage (built-in) | External store (Redis, Postgres) | In-memory or external | In-memory or external |
| Agent topology | Phased pipeline with harness | Directed graph (cycles allowed) | Role-based crews | Conversation-based |
| Structured output | Zod schemas via AI SDK | Pydantic via instructor | Pydantic | JSON schemas |
| Cost tracking | Per-call, per-agent, automatic | Manual | Manual | Manual |
| Budget enforcement | Built-in soft/hard modes | None built-in | None built-in | None built-in |
| Agent memory | D1-backed beliefs + episodic | LangGraph memory store | CrewAI memory | External |
| Concurrency | Workflows for parallel, DO for sequential | Native Python async | Task delegation | Agent chat threads |
| Durability | DO alarms + Workflow checkpoints | LangGraph Cloud (paid) | None built-in | None built-in |
| Deployment | wrangler deploy | Docker + infra | Docker + infra | Docker + infra |
| Debugging | D1 telemetry tables + AI Gateway logs | LangSmith (best-in-class) | Limited (logging issues) | Print statements |
| Best for | Budget-conscious production pipelines | Complex graph workflows | Team-metaphor tasks | Conversational agents |

When to Choose Each

Choose Workers-Native when:

Choose LangGraph when:

Choose CrewAI when:

Choose AutoGen when:

Cloudflare Primitives vs External Infrastructure

| Capability | Workers-Native | External Equivalent | Advantage |
| --- | --- | --- | --- |
| Agent coordinator | Durable Object | Redis + custom code | DO: single-threaded, no race conditions, built-in persistence |
| Processing ledger | D1 | PostgreSQL | D1: zero config, auto-backup, SQL at the edge |
| Batch processing | Workflows | Temporal / Step Functions | Workflows: simpler API, no infra, built into Workers |
| LLM routing | AI Gateway | LiteLLM / OpenRouter | AI Gateway: native caching, logging, rate limiting |
| Cost tracking | Built-in api_calls_ledger | Manual instrumentation | Built-in: every call auto-recorded with context |
| Model inference | Workers AI (free tier) | Self-hosted / API calls | Workers AI: 10K free Neurons/day, no API key needed |
| Secrets management | Workers secrets | Vault / .env files | Workers: encrypted at rest, per-environment |

| Don't | Do Instead | Why |
| --- | --- | --- |
| Run all agents in a single Worker fetch | Use DO alarms with one processor per tick | Workers have a 30s CPU limit. A 6-agent pipeline will time out. |
| Store agent state in global variables | Use D1 or DO ctx.storage | Global variables are not guaranteed to persist between requests in Workers. V8 isolates are ephemeral. |
| Use Promise.all for 10+ LLM calls | Use Workflows with chunked Promise.allSettled | One failure in Promise.all rejects everything. allSettled lets you collect partial results. |
| Let agents pick their own models | Use harness-level model selection with budget awareness | Agents will always pick the best model. Budget-aware selection prevents cost overruns. |
| Store full LLM responses in the ledger | Store result_summary (short) + full output in separate table | Ledger queries need to be fast. Full LLM output can be 10KB+ per row. |
| Use untyped JSON for LLM output | Use Zod schemas with generateObject | Untyped JSON requires manual parsing, validation, and error handling. Zod handles all three. |
| Process all entities every cycle | Use the agent ledger with version tracking | Without the ledger, you reprocess unchanged entities. Version bumps give you targeted reprocessing. |
| Use a single budget for all processors | Allocate budget by processor weight | One expensive processor can starve all others. Weighted allocation ensures fairness. |
| Retry LLM calls indefinitely | Throw NonRetryableError for permanent failures | Auth failures and invalid inputs will never succeed. Retrying wastes money and time. |
| Store agent memory as flat text | Use structured tables with confidence scores and timestamps | Flat text cannot be queried, pruned, or analyzed. Structured memory enables belief dynamics. |
| Build a custom orchestrator from scratch | Start with DO alarms + processor registry | Custom orchestrators accumulate edge cases. The alarm-tick pattern handles interruptions, errors, and pauses naturally. |
| Use in-memory caches for LLM responses | Use AI Gateway with cacheTtl | In-memory caches die with the isolate. AI Gateway caches persist across requests and are shared globally. |
| Skip cost tracking during development | Record costs from day one via api_calls_ledger | Development costs add up fast with LLMs. Tracking from the start prevents surprise bills. |
| Make processors depend on execution order within a phase | Keep processors independent; share state via CycleState | Execution order within a phase is not guaranteed. Use the cycle's work queues for coordination. |
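
The "allocate budget by processor weight" row can be sketched as a proportional split. This is a minimal illustration, assuming each processor carries the `name`, `weight`, and `enabled` fields seen in the processor config; `allocateBudget` is a hypothetical helper, and the harness may enforce weights differently.

```typescript
interface WeightedProcessor {
  name: string;
  weight: number;
  enabled: boolean;
}

// Split the cycle budget across enabled processors in proportion to their weights.
function allocateBudget(
  totalBudget: number,
  processors: WeightedProcessor[],
): Record<string, number> {
  const active = processors.filter((p) => p.enabled && p.weight > 0);
  const totalWeight = active.reduce((sum, p) => sum + p.weight, 0);

  const shares: Record<string, number> = {};
  for (const p of active) {
    shares[p.name] = totalBudget * (p.weight / totalWeight);
  }
  return shares;
}

const shares = allocateBudget(1.0, [
  { name: 'scanner', weight: 1, enabled: true },
  { name: 'evaluator', weight: 2, enabled: true },
  { name: 'critic', weight: 1, enabled: true },
  { name: 'researcher', weight: 4, enabled: false },
]);
console.log(shares); // { scanner: 0.25, evaluator: 0.5, critic: 0.25 }
```

Disabled processors receive no share, so turning one off redistributes its budget to the rest without any change to the weights.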


