Org Status: 🟡 Dormant Cloudflare: N/A Last Audited: 2026-04-28
Most multi-agent AI frameworks assume you have a Python server, a Redis cluster, and a deployment pipeline. They assume your agents will run for minutes, consume gigabytes of memory, and need a Kubernetes cluster to orchestrate. What if you could build the same thing on a serverless platform where each agent runs in isolation, costs nothing at rest, and scales to zero between invocations?
This article shows how to build multi-agent AI processing pipelines entirely on Cloudflare Workers. Not toy chatbots: production systems where specialized AI agents evaluate data, debate each other, form consensus, remember what they learned, and make recommendations. All of it runs on edge infrastructure with near-instant cold starts (under 5 ms), D1-backed persistent memory, and per-request cost tracking measured in fractions of a cent.
What you will learn:
- How to architect a pipeline of specialized AI agent processors on Cloudflare Workers
- The Durable Object coordinator pattern (OpportunityHarness) that governs agent execution
- How to implement a multi-perspective debate system where agents argue, challenge, and reach consensus
- Convergence detection: knowing when agents have collectively figured something out
- Agent memory with core beliefs, episodic recall, and reflective updates
- LLM integration through Vercel AI SDK with Zod-validated structured output
- Budget-aware execution that adapts model selection based on remaining spend
- Durable Workflows for scatter-gather batch processing with automatic retries
- Why this Workers-native approach beats LangChain, CrewAI, and AutoGen for this class of problem
- The Problem: Why General-Purpose Agent Frameworks Fall Short
- Architecture Overview
- Core Concepts
- Pattern 1: The Durable Object Coordinator
- Pattern 2: Phased Pipeline Execution
- Pattern 3: The Debate System
- Pattern 4: Convergence Detection
- Pattern 5: Agent Memory and Core Beliefs
- Pattern 6: Budget-Aware Model Selection
- Pattern 7: Durable Workflow Orchestration
- Small Examples
- Comparisons
- Anti-Patterns
- References
You want to build a system that takes raw data — say, market opportunities, content ideas, or investment signals — and runs it through multiple AI agents that each analyze a different dimension. One agent evaluates demand. Another challenges assumptions. A third looks for hidden risks. A fourth synthesizes everything into a recommendation.
The standard approach is to reach for LangChain, CrewAI, or AutoGen. Here is what happens next:
You deploy a Python server. Now you need a VM, a container, or a serverless function with Python runtime support. Cold starts are measured in seconds. You pay for idle time. You need to manage dependencies, virtual environments, and deployment pipelines.
You add Redis for state. Agents need to share context, remember past runs, and coordinate. Redis adds another service to manage, another failure mode, and another cost line.
You hit CPU time limits. A pipeline that runs 6 agents sequentially can take 30-60 seconds of LLM inference. Most serverless platforms have execution limits that force you into background job infrastructure.
You cannot track costs. When Agent A calls GPT-4 and Agent B calls Claude, each provider bills you as one aggregate line item. You have no idea which agent consumed what, or whether the expensive model actually improved output quality.
You scale to infinity, not zero. When there is no work, your Python server still runs. When there is a burst, your single-threaded agent framework processes items sequentially.
The Workers-native approach solves all of these:
| Pain Point | Traditional Framework | Workers-Native |
|---|---|---|
| Cold start | 2-10 seconds (Python) | <5ms |
| State management | External Redis/Postgres | D1 + DO storage (built-in) |
| Idle cost | $25-100/month minimum | $0 |
| CPU limits | 30-60s before timeout | Workflows: long jobs split into checkpointed, individually retried steps |
| Cost tracking | Manual, per-provider | Automatic, per-agent, per-call |
| Isolation | Shared process, shared memory | V8 isolates; one single-threaded Durable Object per pipeline |
| Deployment | Docker, CI/CD, environment management | wrangler deploy |
Key insight: The multi-agent pipeline is not a monolithic process. It is a series of small, focused computations with state checkpoints between them. This maps perfectly to Workers + Durable Objects + Workflows.
The system is organized in layers, from trigger down to memory:
+----------------------------+
|     API / Cron Trigger     |
|   (Worker fetch handler)   |
+-------------+--------------+
              |
              v
+----------------------------+
|   OpportunityHarness DO    |
|  (Coordinator / Governor)  |
|  - Budget tracking         |
|  - Phase management        |
|  - Alarm-driven tick loop  |
+-------------+--------------+
              |
              v
+-----------------+   +-----------------+   +-----------------+   +-----------------+
| Gather          |   | Analyze         |   | Evaluate        |   | Critique        |
| data-coverage   |-->| niche-analyzer  |-->| evaluator       |-->| critic          |
| genre-scanner   |   | (LLM)           |   | lens-applicator |   | (LLM)           |
| (deterministic) |   |                 |   | (LLM)           |   |                 |
+-----------------+   +-----------------+   +-----------------+   +-----------------+
              |
              v
+----------------------------+
|       Debate System        |
|       6 agent roles        |
|      3-phase protocol      |
+-------------+--------------+
              |
              v
+----------------------------+
|        Agent Memory        |
|        Core beliefs        |
|      Episodic memory       |
|     Reflective updates     |
+----------------------------+
Each processor is a stateless function: it receives a context (database handle, AI binding, budget state, model selector) and returns an array of results. The harness calls processors one at a time, records telemetry, and advances through phases until the cycle completes.
The key primitives from Cloudflare:
- Durable Objects: The harness coordinator. One DO per pipeline instance. Persistent state, alarm-driven execution, single-threaded consistency.
- D1: Agent memory, processing ledger, evaluation results, debate messages. SQLite at the edge.
- Workers AI: Free tier for lightweight models. 10,000 Neurons/day at no cost.
- Workflows: Durable execution for batch operations. Automatic retries, step-level checkpointing.
- AI Gateway: Optional caching and logging for external LLM calls.
The Agent Processor
Every agent in the pipeline is a processor: a stateless async function registered by name, called by the harness when its phase is active.
// The processor signature — every agent implements this
export type ProcessorFn = (ctx: ProcessorContext) => Promise<ProcessorResult[]>;
export interface ProcessorContext {
db: D1Database;
ai: Ai; // Workers AI binding
apiKey: string; // External LLM API key (e.g., Gemini)
config: HarnessConfig; // Full pipeline configuration
processor: ProcessorConfig; // This processor's specific config
cycle: CycleState; // Current cycle state (budget, counters, work queues)
selectModel: () => string; // Budget-aware model selector
remainingBudget: () => number; // USD remaining for this processor
}
export interface ProcessorResult {
processor: string; // 'evaluator', 'critic', etc.
entity_type: string; // What was processed
entity_id: string; // Unique identifier
model_used: string | null; // null for deterministic processors
tokens_input: number;
tokens_output: number;
cost_usd: number;
duration_ms: number;
success: boolean;
error_message?: string;
result_summary?: string;
spawned_entities?: Array<{ type: string; id: string }>;
}
The processor registry is a simple Map:
const PROCESSOR_REGISTRY = new Map<string, ProcessorFn>();
export function registerProcessor(name: string, fn: ProcessorFn) {
PROCESSOR_REGISTRY.set(name, fn);
}
export function getProcessor(name: string): ProcessorFn | undefined {
return PROCESSOR_REGISTRY.get(name);
}
Processors self-register at module load time. When a file like evaluator.ts is imported, its registerProcessor('evaluator', async (ctx) => { ... }) call adds it to the registry. The harness looks up processors by name from the config.
Key insight: Processors are plain functions, not classes. They hold no state of their own; everything they read or write lives in D1 or the cycle object. This means any processor can be tested in isolation by constructing a ProcessorContext with a test database.
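As a sketch of that testing claim, the snippet below registers a hypothetical deterministic processor ('coverage-check') and drives it with a hand-built context. The Mini* types are trimmed stand-ins for the article's ProcessorContext and ProcessorResult (only the fields this example needs), and a plain lookup function stands in for the D1 test database, so treat it as an illustration under those assumptions rather than the project's actual test setup.

```typescript
// Trimmed stand-ins for the article's types (assumption: only the fields used here).
interface MiniCycle {
  genres_to_scan: string[];
}
interface MiniContext {
  cycle: MiniCycle;
  // Hypothetical stand-in for a D1 query: genre -> number of enriched apps.
  enrichedCount: (genre: string) => number;
}
interface MiniResult {
  processor: string;
  entity_id: string;
  success: boolean;
  result_summary?: string;
}
type MiniProcessorFn = (ctx: MiniContext) => Promise<MiniResult[]>;

// Same registry pattern as the article: a module-level Map keyed by name.
const REGISTRY = new Map<string, MiniProcessorFn>();
function registerProcessor(name: string, fn: MiniProcessorFn): void {
  REGISTRY.set(name, fn);
}

// Hypothetical processor: reports which genres have 200+ enriched apps.
registerProcessor('coverage-check', async (ctx) =>
  ctx.cycle.genres_to_scan.map((genre) => {
    const enriched = ctx.enrichedCount(genre);
    return {
      processor: 'coverage-check',
      entity_id: genre,
      success: true,
      result_summary: `${genre}: ready=${enriched >= 200}`,
    };
  }),
);

// Test-style usage: the context is built by hand, no Worker runtime involved.
const fakeCounts: Record<string, number> = { Games: 350, Finance: 40 };
const testCtx: MiniContext = {
  cycle: { genres_to_scan: ['Games', 'Finance'] },
  enrichedCount: (g) => fakeCounts[g] ?? 0,
};

const coverageResults = REGISTRY.get('coverage-check')!(testCtx);
```

Because the processor only touches what the context hands it, swapping the fake lookup for a local D1 database is the only change needed to turn this into an integration test.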
The Harness Config
The pipeline is configured entirely through a database record. This means a dashboard can change processor weights, model tiers, budget limits, and batch sizes while the pipeline is running — the harness reloads config from DB at the start of every tick.
export interface HarnessConfig {
id: number;
slug: string;
name: string;
version: number;
status: string; // 'active' | 'paused' | 'error'
// Budget
budget_mode: 'soft' | 'hard'; // soft = skip processors that used up their allocation, hard = stop the cycle
budget_max_per_cycle: number; // USD cap per processing cycle
budget_lambda: number; // decay factor for adaptive budgeting
// Models (tiered by cost)
model_default: string; // e.g., 'gemini-2.0-flash'
model_expensive: string; // e.g., 'gemini-2.5-flash'
model_premium: string; // e.g., 'gemini-2.5-pro'
model_upgrade_threshold: number; // score above which to upgrade model
// Cycle control
cycle_interval_ms: number; // time between cycles
cycle_max_iterations: number;
cycle_genres_limit: number;
cycle_niches_per_genre: number;
cycle_critique_threshold: number; // minimum score to trigger critique
// Processors (ordered)
processors: ProcessorConfig[];
// Focus (optional filtering)
focus_genres: string[] | null;
focus_niches: string[] | null;
// Memory
memory_enabled: boolean;
memory_reflect_every_n: number; // reflect after every N debates
}
export interface ProcessorConfig {
name: string; // 'data-coverage', 'evaluator', etc.
type: 'deterministic' | 'llm';
enabled: boolean;
phase: string; // 'gather' | 'analyze' | 'evaluate' | 'critique'
// LLM-specific
model_tier?: 'default' | 'expensive' | 'premium';
max_tokens?: number;
temperature?: number;
prompt_version?: number;
// Budget allocation
weight: number; // share of budget (0.0-1.0)
max_cost_per_call: number; // hard cap per invocation
// Execution
batch_size: number; // entities per tick
}
A typical config stores its processors in JSON:
{
"slug": "market-scanner",
"budget_mode": "soft",
"budget_max_per_cycle": 0.50,
"model_default": "gemini-2.0-flash",
"model_expensive": "gemini-2.5-flash",
"model_premium": "gemini-2.5-pro",
"processors": [
{
"name": "data-coverage",
"type": "deterministic",
"phase": "gather",
"enabled": true,
"weight": 0,
"max_cost_per_call": 0,
"batch_size": 10
},
{
"name": "genre-scanner",
"type": "deterministic",
"phase": "gather",
"enabled": true,
"weight": 0,
"max_cost_per_call": 0,
"batch_size": 1
},
{
"name": "niche-analyzer",
"type": "llm",
"phase": "analyze",
"enabled": true,
"model_tier": "default",
"weight": 0.3,
"max_cost_per_call": 0.02,
"batch_size": 3,
"temperature": 0.3
},
{
"name": "evaluator",
"type": "llm",
"phase": "evaluate",
"enabled": true,
"model_tier": "default",
"weight": 0.3,
"max_cost_per_call": 0.05,
"batch_size": 2,
"temperature": 0.2
},
{
"name": "critic",
"type": "llm",
"phase": "critique",
"enabled": true,
"model_tier": "expensive",
"weight": 0.25,
"max_cost_per_call": 0.05,
"batch_size": 2,
"temperature": 0.4
},
{
"name": "lens-applicator",
"type": "llm",
"phase": "evaluate",
"enabled": true,
"model_tier": "default",
"weight": 0.15,
"max_cost_per_call": 0.03,
"batch_size": 2,
"temperature": 0.3
}
]
}
Key insight: The entire pipeline topology — which agents run, in what order, with what models, at what cost — is a JSON document in D1. No code changes required to reconfigure. A dashboard writes to the DB, and the next tick picks it up.
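To make the weight field concrete: the tick loop gives each LLM processor budget_max_per_cycle × weight dollars per cycle. The helper below is a hypothetical sketch (allocateBudget is not from the article) that parses the processors JSON and computes those allocations. The check that weights sum to at most 1.0 is an assumed guard against a dashboard edit over-allocating the cycle, not something the article's code is shown doing.

```typescript
interface ProcessorBudgetConfig {
  name: string;
  type: 'deterministic' | 'llm';
  enabled: boolean;
  weight: number;
}

// Per-processor USD allocation for one cycle: budget_max_per_cycle * weight.
function allocateBudget(
  budgetMaxPerCycle: number,
  processors: ProcessorBudgetConfig[],
): Map<string, number> {
  const llm = processors.filter((p) => p.enabled && p.type === 'llm');
  const totalWeight = llm.reduce((sum, p) => sum + p.weight, 0);
  // Assumed guard: small epsilon absorbs floating-point error in the sum.
  if (totalWeight > 1 + 1e-9) {
    throw new Error(`LLM processor weights sum to ${totalWeight}, expected <= 1`);
  }
  const allocations = new Map<string, number>();
  for (const p of llm) {
    allocations.set(p.name, budgetMaxPerCycle * p.weight);
  }
  return allocations;
}

// The processors from the example config, as they might come back from D1.
const exampleProcessors = JSON.parse(`[
  {"name": "data-coverage", "type": "deterministic", "enabled": true, "weight": 0},
  {"name": "niche-analyzer", "type": "llm", "enabled": true, "weight": 0.3},
  {"name": "evaluator", "type": "llm", "enabled": true, "weight": 0.3},
  {"name": "critic", "type": "llm", "enabled": true, "weight": 0.25},
  {"name": "lens-applicator", "type": "llm", "enabled": true, "weight": 0.15}
]`) as ProcessorBudgetConfig[];

const allocations = allocateBudget(0.5, exampleProcessors);
```

With the example config ($0.50 per cycle), niche-analyzer and evaluator each get $0.15, critic gets $0.125, and lens-applicator gets $0.075. Deterministic processors have weight 0 and never draw on the budget.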
The Processing Pipeline
The pipeline moves through four phases in strict order:
gather → analyze → evaluate → critique
Each phase has zero or more processors. The harness advances to the next phase only when all processors in the current phase have no more work to do.
const PHASE_ORDER = ['gather', 'analyze', 'evaluate', 'critique'];
Within a phase, processors are tried sequentially. When a processor produces results, the tick ends (one processor per tick to stay under CPU limits). When no processor in the current phase produces work, the harness advances.
This creates a pull-based data flow:
- Gather: data-coverage checks which genres have enough data to analyze; genre-scanner identifies candidate niches from the ready genres. Both are deterministic, with zero LLM cost.
- Analyze: niche-analyzer takes the niche queue populated by gather and produces deep analysis using an LLM. It also spawns demographic sub-niches for further analysis.
- Evaluate: evaluator takes analyzed niches and produces full evaluations with deterministic tests (demand, supply, monetization, fragmentation) plus LLM tests (buildable, marketable, confidence). lens-applicator enriches evaluations with viral scoring and cross-pollination ideas.
- Critique: critic stress-tests high-scoring evaluations, challenges assumptions, and adjusts scores.
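The pull-based flow can be simulated in a few lines. This is a deliberately simplified, in-memory sketch: the two processors are hypothetical stand-ins that only move items between queues, and runTick mirrors the tick and advancePhase logic described in this article (run one processor's batch per tick; advance when the phase has no work) without budgets, D1, or LLM calls.

```typescript
const PHASE_ORDER = ['gather', 'analyze', 'evaluate', 'critique'] as const;
type Phase = (typeof PHASE_ORDER)[number];

interface SimCycle {
  phase: Phase;
  genres_to_scan: string[];
  niches_to_analyze: string[];
  evaluations: string[];
}

interface SimProcessor {
  name: string;
  phase: Phase;
  // Returns how many entities were processed; 0 means "no work this tick".
  run: (c: SimCycle) => number;
}

const processors: SimProcessor[] = [
  {
    name: 'genre-scanner',
    phase: 'gather',
    run: (c) => {
      const genre = c.genres_to_scan.shift(); // one genre per tick
      if (!genre) return 0;
      c.niches_to_analyze.push(`${genre}::a`, `${genre}::b`);
      return 1;
    },
  },
  {
    name: 'niche-analyzer',
    phase: 'analyze',
    run: (c) => {
      const batch = c.niches_to_analyze.splice(0, 2); // batch_size = 2
      c.evaluations.push(...batch);
      return batch.length;
    },
  },
];

// One tick: the first processor in the current phase that finds work ends the tick.
// Returns true once the last phase has nothing left to do (cycle complete).
function runTick(c: SimCycle): boolean {
  for (const p of processors.filter((x) => x.phase === c.phase)) {
    if (p.run(c) > 0) return false; // one processor per tick
  }
  const idx = PHASE_ORDER.indexOf(c.phase);
  if (idx >= PHASE_ORDER.length - 1) return true;
  c.phase = PHASE_ORDER[idx + 1];
  return false;
}

const sim: SimCycle = {
  phase: 'gather',
  genres_to_scan: ['Games', 'Health'],
  niches_to_analyze: [],
  evaluations: [],
};
let ticks = 0;
while (!runTick(sim)) ticks++;
```

Two genres produce four niches; the loop takes two gather ticks, two analyze ticks, and one idle tick per phase to advance. Nothing wires genre-scanner to niche-analyzer directly: the shared queue is the only connection.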
// CycleState tracks the work queues that flow between phases
export interface CycleState {
cycle_id: number | null;
cycle_number: number;
phase: string;
phase_index: number;
cost_so_far: number;
tokens_input: number;
tokens_output: number;
// Work queues populated by earlier phases
genres_to_scan: string[];
niches_to_analyze: Array<{
genre: string;
niche_id: string;
keywords: string[];
}>;
evaluations_to_critique: string[];
// Counters
genres_scanned: number;
niches_found: number;
evaluations_created: number;
evaluations_critiqued: number;
research_tasks_created: number;
// Per-processor telemetry
processor_stats: Record<
string,
{ cost: number; calls: number; total_ms: number; failures: number }
>;
}
The Agent Ledger
The agent framework tracks every processing event in an agent_ledger table. This enables incremental processing: agents only process entities they have not seen, or entities whose upstream data has changed.
export interface AgentConfig {
name: string;
version: number;
stages?: { name: string; version: number }[];
}
// An entity needs processing when:
// 1. It has never been processed by this agent
// 2. It was processed at an older agent version
// 3. It was processed at an older stage version
// 4. Its input hash has changed (upstream data changed)
export async function getUnprocessed(
db: D1Database,
agent: AgentConfig,
entityType: string,
candidateIds: string[],
stage = 'main',
inputHashes?: Map<string, string>,
): Promise<string[]> {
if (candidateIds.length === 0) return [];
const stageVersion =
agent.stages?.find((s) => s.name === stage)?.version ?? agent.version;
const existing = new Map<
string,
{ agent_version: number; stage_version: number; input_hash: string | null }
>();
// Query in chunks of 80 IDs: D1 caps bound parameters at 100 per query, and the filter uses 3 more
for (let i = 0; i < candidateIds.length; i += 80) {
const chunk = candidateIds.slice(i, i + 80);
const placeholders = chunk.map(() => '?').join(',');
const rows = await db
.prepare(
`SELECT entity_id, agent_version, stage_version, input_hash
FROM agent_ledger
WHERE agent = ? AND entity_type = ? AND stage = ?
AND entity_id IN (${placeholders})`,
)
.bind(agent.name, entityType, stage, ...chunk)
.all();
for (const row of rows.results) {
existing.set(row.entity_id as string, {
agent_version: row.agent_version as number,
stage_version: row.stage_version as number,
input_hash: row.input_hash as string | null,
});
}
}
return candidateIds.filter((id) => {
const entry = existing.get(id);
if (!entry) return true; // never processed
if (entry.agent_version < agent.version) return true; // agent upgraded
if (entry.stage_version < stageVersion) return true; // stage upgraded
if (inputHashes) {
const currentHash = inputHashes.get(id);
if (currentHash && entry.input_hash !== currentHash) return true;
}
return false;
});
}
Version bumping is the escape hatch. When you improve an agent’s prompt or logic, bump its version number. Every entity it previously processed gets flagged for reprocessing automatically. No migration scripts, no manual resets.
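The filter at the end of getUnprocessed is easiest to reason about as a pure predicate. The sketch below extracts it; the name needsProcessing and the LedgerEntry shape are illustrative, mirroring the columns the query selects, and the usage walks through what a version bump does.

```typescript
interface LedgerEntry {
  agent_version: number;
  stage_version: number;
  input_hash: string | null;
}

// Same rules as getUnprocessed: never seen, older agent or stage version,
// or an input hash that no longer matches the recorded one.
function needsProcessing(
  entry: LedgerEntry | undefined,
  agentVersion: number,
  stageVersion: number,
  currentHash?: string,
): boolean {
  if (!entry) return true; // never processed
  if (entry.agent_version < agentVersion) return true; // agent upgraded
  if (entry.stage_version < stageVersion) return true; // stage upgraded
  if (currentHash && entry.input_hash !== currentHash) return true; // upstream changed
  return false;
}

// An entity processed by version 1 of an agent, with input hash 'abc'.
const processedAtV1: LedgerEntry = { agent_version: 1, stage_version: 1, input_hash: 'abc' };

const skippedWhenUnchanged = !needsProcessing(processedAtV1, 1, 1, 'abc');
const reprocessedAfterBump = needsProcessing(processedAtV1, 2, 1, 'abc');
```

Bumping the agent from version 1 to 2 flips every previously recorded entity to "needs processing" without touching the ledger; the next upsert then stores version 2 and the entity is skipped again.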
// Simple hash for change detection
export function hashInputs(
...values: (string | number | null | undefined)[]
): string {
const str = values.map((v) => String(v ?? '')).join('|');
let hash = 0;
for (let i = 0; i < str.length; i++) {
const char = str.charCodeAt(i);
hash = ((hash << 5) - hash) + char;
hash |= 0;
}
return hash.toString(36);
}
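One property of hashInputs is worth knowing: because values are joined with '|' and null or undefined become '', different argument lists can produce the same string and therefore the same hash. The sketch below demonstrates this, then shows one possible alternative (an assumption, not the article's code): encode the arguments with JSON.stringify before hashing, which keeps argument boundaries and distinguishes null from '' and 1 from '1'. The helper names are hypothetical.

```typescript
// The article's 32-bit string hash, reproduced so this block runs on its own.
function hashString(str: string): string {
  let hash = 0;
  for (let i = 0; i < str.length; i++) {
    hash = (hash << 5) - hash + str.charCodeAt(i);
    hash |= 0; // keep it a 32-bit integer
  }
  return hash.toString(36);
}

type InputValue = string | number | null | undefined;

// The original encoding: '|' join, null/undefined mapped to ''.
function encodePiped(values: InputValue[]): string {
  return values.map((v) => String(v ?? '')).join('|');
}

// Alternative encoding (assumption): JSON keeps boundaries and types apart.
// undefined entries in an array serialize as null.
function encodeJson(values: InputValue[]): string {
  return JSON.stringify(values);
}

function hashInputsSafe(...values: InputValue[]): string {
  return hashString(encodeJson(values));
}

// 'a|b' as one value and 'a', 'b' as two values encode identically.
const pipedCollision = encodePiped(['a|b']) === encodePiped(['a', 'b']);
```

Switching encodings changes every stored hash, so the first cycle after the switch would reprocess every entity once. Pairing the change with an agent version bump makes that intent explicit. Both variants remain 32-bit non-cryptographic hashes, which is adequate for change detection but not for anything security-sensitive.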
The D1 schema for the ledger:
CREATE TABLE agent_ledger (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent TEXT NOT NULL,
agent_version INTEGER NOT NULL,
entity_type TEXT NOT NULL,
entity_id TEXT NOT NULL,
stage TEXT NOT NULL DEFAULT 'main',
stage_version INTEGER NOT NULL,
input_hash TEXT,
result_json TEXT,
duration_ms INTEGER,
computed_at TEXT NOT NULL,
UNIQUE(agent, entity_type, entity_id, stage)
);
CREATE INDEX idx_ledger_agent ON agent_ledger(agent, entity_type, stage);
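The code above reads the ledger but does not show writes to it. Given the UNIQUE(agent, entity_type, entity_id, stage) constraint, a natural approach is a SQLite upsert (INSERT ... ON CONFLICT DO UPDATE), sketched below. The function name recordProcessed and the LedgerWrite shape are hypothetical, and DbLike is a minimal interface mimicking the prepare/bind/run shape of D1's API so the block runs without the Workers runtime.

```typescript
// Minimal shape of D1's prepared-statement API used here (assumption for self-containment).
interface DbLike {
  prepare(sql: string): { bind(...values: unknown[]): { run(): Promise<unknown> } };
}

interface LedgerWrite {
  agent: string;
  agentVersion: number;
  entityType: string;
  entityId: string;
  stage?: string; // defaults to 'main', like the column default
  stageVersion: number;
  inputHash: string | null;
  result: unknown;
  durationMs: number;
}

// One row per (agent, entity_type, entity_id, stage); reprocessing overwrites it.
const UPSERT_LEDGER_SQL = `INSERT INTO agent_ledger
  (agent, agent_version, entity_type, entity_id, stage, stage_version,
   input_hash, result_json, duration_ms, computed_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(agent, entity_type, entity_id, stage) DO UPDATE SET
  agent_version = excluded.agent_version,
  stage_version = excluded.stage_version,
  input_hash = excluded.input_hash,
  result_json = excluded.result_json,
  duration_ms = excluded.duration_ms,
  computed_at = excluded.computed_at`;

// Records (or refreshes) one processing event so getUnprocessed skips it next time.
async function recordProcessed(db: DbLike, w: LedgerWrite): Promise<void> {
  await db
    .prepare(UPSERT_LEDGER_SQL)
    .bind(
      w.agent,
      w.agentVersion,
      w.entityType,
      w.entityId,
      w.stage ?? 'main',
      w.stageVersion,
      w.inputHash,
      JSON.stringify(w.result),
      w.durationMs,
      new Date().toISOString(),
    )
    .run();
}
```

Writing the ledger row only after a processor succeeds means a failed call leaves the entity "unprocessed", so the next cycle retries it automatically.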
Key insight: The agent ledger is the foundation of incremental processing. It answers “what has changed since last time?” without scanning every record. Version bumps + input hashes give you full control over what gets reprocessed.
The OpportunityHarness is a Durable Object that serves as the control plane for the entire pipeline. It does not do analysis — it governs execution.
The harness is responsible for:
- Budget gating — can we afford the next step?
- Phase selection — what processor runs next?
- Entity selection — what data to process?
- Model routing — cheap model or expensive model?
- Executing the processor — calling the registered function
- Recording telemetry — cost, tokens, success/failure
- Adapting — adjusting batch sizes based on success rate
The DO Shell
import { DurableObject } from 'cloudflare:workers';
type Env = {
DB: D1Database;
AI: Ai;
GEMINI_API_KEY: string;
OPPORTUNITY_HARNESS: DurableObjectNamespace;
};
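export class OpportunityHarness extends DurableObject<Env> {
// Copy the defaults: assigning DEFAULT_STATE directly would make later
// mutations (this.state.running = true, ...) modify the shared constant
private state: HarnessState = { ...DEFAULT_STATE };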
private config: HarnessConfig | null = null;
constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env);
ctx.blockConcurrencyWhile(async () => {
const saved = await ctx.storage.get<HarnessState>('state');
if (saved) this.state = { ...DEFAULT_STATE, ...saved };
// Resume alarm if the DO was evicted while running
if (this.state.running) {
const existing = await ctx.storage.getAlarm();
if (!existing) {
await ctx.storage.setAlarm(Date.now() + 5000);
}
}
});
}
}
The blockConcurrencyWhile call in the constructor is critical. It prevents any fetch or alarm from executing until the state is fully loaded. Without it, you get race conditions on startup.
The Alarm-Driven Main Loop
The harness uses DO alarms to drive execution. Each alarm tick runs one unit of work, then schedules the next tick. This avoids the Workers CPU time limit by breaking long pipelines into many small steps.
override async alarm() {
if (!this.state.running) return;
try {
// Reload config from DB every tick -- dashboard can change it live
this.config = await this.loadConfig();
if (!this.config) {
this.state.error_message = 'Harness config not found in DB';
this.state.running = false;
await this.save();
return;
}
if (this.config.status === 'paused') {
// Config says paused -- keep alarm alive but do not work
await this.ctx.storage.setAlarm(Date.now() + 30_000);
return;
}
// Start new cycle if needed
if (!this.state.current_cycle) {
await this.startCycle();
}
// Run one tick of work
const done = await this.tick();
if (done) {
await this.finalizeCycle();
await this.ctx.storage.setAlarm(
Date.now() + this.config.cycle_interval_ms,
);
} else {
// More work to do -- tick again in 2 seconds
await this.ctx.storage.setAlarm(Date.now() + 2000);
}
await this.save();
} catch (err: any) {
this.state.error_message = err.message || 'Unknown error';
await this.save();
// Retry in 60s on error
await this.ctx.storage.setAlarm(Date.now() + 60_000);
}
}
The 2-second interval between ticks is deliberate. Because config is reloaded on every tick, a pause written to the DB takes effect within about 2 seconds, yet the harness avoids tight polling. While paused, it checks config every 30 seconds, so resuming can take up to 30 seconds. Between cycles, the delay stretches to cycle_interval_ms (typically 1-24 hours).
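The alarm handler uses four different delays. Collecting them into one function, as in this hypothetical refactor (the TickOutcome names are illustrative; the millisecond values come from the handler above), makes the scheduling policy explicit and unit-testable.

```typescript
type TickOutcome =
  | { kind: 'paused' }
  | { kind: 'error' }
  | { kind: 'more_work' }
  | { kind: 'cycle_done'; cycleIntervalMs: number };

// Delay before the next alarm, matching the values used in alarm().
function nextAlarmDelayMs(outcome: TickOutcome): number {
  switch (outcome.kind) {
    case 'paused':
      return 30_000; // keep re-reading config while paused
    case 'error':
      return 60_000; // back off after a failed tick
    case 'more_work':
      return 2_000; // short gap keeps DB config changes responsive
    case 'cycle_done':
      return outcome.cycleIntervalMs; // idle until the next cycle
  }
}

// Usage inside the Durable Object (sketch):
// await this.ctx.storage.setAlarm(Date.now() + nextAlarmDelayMs({ kind: 'more_work' }));
```

Because the union is exhaustive, adding a new outcome without a delay becomes a compile-time error rather than a silently missing alarm, which in this design would stop the harness.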
HTTP Control API
The harness exposes a simple HTTP API for control operations:
override async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
const path = url.pathname;
if (request.method === 'GET' && path === '/status') {
return Response.json({
running: this.state.running,
harness_slug: this.state.harness_slug,
cycle_count: this.state.cycle_count,
last_cycle_at: this.state.last_cycle_at,
error: this.state.error_message,
current_cycle: this.state.current_cycle
? {
phase: this.state.current_cycle.phase,
cost_so_far: this.state.current_cycle.cost_so_far,
genres_scanned: this.state.current_cycle.genres_scanned,
niches_found: this.state.current_cycle.niches_found,
evaluations_created: this.state.current_cycle.evaluations_created,
processor_stats: this.state.current_cycle.processor_stats,
}
: null,
});
}
if (request.method === 'POST' && path === '/start') {
const body = await request.json<{ slug: string }>();
this.state.running = true;
this.state.harness_slug = body.slug;
this.state.error_message = null;
await this.save();
await this.ctx.storage.setAlarm(Date.now() + 1000);
return Response.json({ started: true, slug: body.slug });
}
if (request.method === 'POST' && path === '/stop') {
this.state.running = false;
await this.save();
return Response.json({ stopped: true });
}
if (request.method === 'POST' && path === '/tick') {
// Manual tick for testing
await this.alarm();
return Response.json({ ok: true, state: this.state });
}
return new Response('Not found', { status: 404 });
}
The Worker routes requests to the DO using idFromName:
// In your Worker's fetch handler
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
if (url.pathname.startsWith('/harness/')) {
const slug = url.pathname.split('/')[2]; // /harness/{slug}/status
const id = env.OPPORTUNITY_HARNESS.idFromName(slug);
const stub = env.OPPORTUNITY_HARNESS.get(id);
const harnessPath = '/' + url.pathname.split('/').slice(3).join('/');
return stub.fetch(new Request(url.origin + harnessPath, request));
}
return new Response('Not found', { status: 404 });
},
};
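The routing above assumes a well-formed path: a request to /harness/ with no slug would call idFromName with an empty string. A small parser, sketched here as an assumption rather than the article's code, keeps slug extraction testable and rejects malformed paths before a Durable Object stub is created. The slug character set is also an assumption; adjust it to whatever your harness configs use.

```typescript
interface HarnessRoute {
  slug: string;
  innerPath: string; // path forwarded to the Durable Object, e.g. '/status'
}

// Parses '/harness/{slug}/{rest...}'; returns null for anything else.
function parseHarnessRoute(pathname: string): HarnessRoute | null {
  const parts = pathname.split('/'); // ['', 'harness', slug, ...rest]
  if (parts[1] !== 'harness') return null;
  const slug = parts[2] ?? '';
  // Assumed slug format: lowercase letters, digits, and dashes.
  if (!/^[a-z0-9-]+$/.test(slug)) return null;
  return { slug, innerPath: '/' + parts.slice(3).join('/') };
}

// Usage in the Worker (sketch); appending url.search also forwards the
// query string, which the original new Request(url.origin + harnessPath, ...) drops:
// const route = parseHarnessRoute(url.pathname);
// if (!route) return new Response('Not found', { status: 404 });
// const stub = env.OPPORTUNITY_HARNESS.get(env.OPPORTUNITY_HARNESS.idFromName(route.slug));
// return stub.fetch(new Request(url.origin + route.innerPath + url.search, request));
```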
The Tick: One Unit of Work
Each tick executes exactly one processor’s batch, then returns. This keeps CPU usage predictable and allows the harness to check budget between every operation.
private async tick(): Promise<boolean> {
if (!this.config || !this.state.current_cycle) return true;
const cycle = this.state.current_cycle;
const config = this.config;
// Budget gate (hard mode stops everything)
if (
config.budget_mode === 'hard' &&
cycle.cost_so_far >= config.budget_max_per_cycle
) {
if (cycle.cycle_id) {
await this.env.DB.prepare(
`UPDATE harness_cycles SET status = 'budget_exceeded' WHERE id = ?`,
)
.bind(cycle.cycle_id)
.run();
}
return true; // done -- out of budget
}
// Get enabled processors for current phase
const phaseProcessors = config.processors.filter(
(p) => p.enabled && p.phase === cycle.phase,
);
if (phaseProcessors.length === 0) {
return this.advancePhase();
}
let didWork = false;
for (const proc of phaseProcessors) {
const fn = getProcessor(proc.name);
if (!fn) continue;
// Soft budget: skip processors that exhausted their allocation
if (proc.type === 'llm' && config.budget_mode === 'soft') {
const allocated = config.budget_max_per_cycle * proc.weight;
const spent = cycle.processor_stats[proc.name]?.cost || 0;
if (spent >= allocated) continue;
}
const ctx: ProcessorContext = {
db: this.env.DB,
ai: this.env.AI,
apiKey: this.env.GEMINI_API_KEY || '',
config,
processor: proc,
cycle,
selectModel: () => this.selectModel(proc, cycle),
remainingBudget: () => {
const allocated = config.budget_max_per_cycle * proc.weight;
const spent = cycle.processor_stats[proc.name]?.cost || 0;
return Math.max(0, allocated - spent);
},
};
try {
const results = await fn(ctx);
for (const r of results) {
this.recordResult(cycle, r);
}
didWork = results.length > 0;
if (didWork) break; // one processor per tick
} catch (err: any) {
this.recordResult(cycle, {
processor: proc.name,
entity_type: 'error',
entity_id: 'tick',
model_used: null,
tokens_input: 0,
tokens_output: 0,
cost_usd: 0,
duration_ms: 0,
success: false,
error_message: err.message,
});
}
}
if (!didWork) {
return this.advancePhase();
}
return false; // more work to do
}
private advancePhase(): boolean {
if (!this.state.current_cycle) return true;
const currentIdx = PHASE_ORDER.indexOf(this.state.current_cycle.phase);
if (currentIdx < 0 || currentIdx >= PHASE_ORDER.length - 1) {
return true; // last phase done
}
this.state.current_cycle.phase = PHASE_ORDER[currentIdx + 1];
this.state.current_cycle.phase_index = 0;
return false;
}
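Key insight: The “one processor per tick” rule is what keeps every invocation small and bounded. Each tick runs one batch of one processor (typically 2-3 LLM calls), which takes a few seconds, most of it spent awaiting network responses; Workers count that waiting toward wall-clock time, not CPU time, so each tick stays well inside execution limits. The alarm schedules the next tick. From the outside, it looks like a continuous process. From the inside, it is a sequence of independent, retryable steps, and a failure costs at most one batch.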
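recordResult is called throughout tick but its body is not shown. A plausible minimal version, written here as an assumption (the CycleTotals and ResultLike types are trimmed stand-ins for CycleState and ProcessorResult), folds each result into the cycle totals and into the per-processor processor_stats entry. The soft-budget check reads the cost field it maintains, and the calls counter is what the gather processors read to decide whether they have already run.

```typescript
interface StatEntry {
  cost: number;
  calls: number;
  total_ms: number;
  failures: number;
}
interface CycleTotals {
  cost_so_far: number;
  tokens_input: number;
  tokens_output: number;
  processor_stats: Record<string, StatEntry>;
}
interface ResultLike {
  processor: string;
  tokens_input: number;
  tokens_output: number;
  cost_usd: number;
  duration_ms: number;
  success: boolean;
}

// Adds one result to the cycle totals and its processor's stats entry.
function recordResult(cycle: CycleTotals, r: ResultLike): void {
  cycle.cost_so_far += r.cost_usd;
  cycle.tokens_input += r.tokens_input;
  cycle.tokens_output += r.tokens_output;

  let stats = cycle.processor_stats[r.processor];
  if (!stats) {
    stats = { cost: 0, calls: 0, total_ms: 0, failures: 0 };
    cycle.processor_stats[r.processor] = stats;
  }
  stats.cost += r.cost_usd;
  stats.calls += 1; // one per recorded result, including failures
  stats.total_ms += r.duration_ms;
  if (!r.success) stats.failures += 1;
}
```

Counting failures as calls matters for genre-scanner: it uses calls as an index into genres_to_scan, so a genre whose query throws is skipped on the next tick instead of being retried forever.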
The pipeline’s power comes from how processors in different phases feed data to each other. This is not configured with explicit wiring — it flows through the shared CycleState.
Phase 1: Gather (Deterministic)
The gather phase runs zero-cost processors that assess data readiness and identify candidates for analysis.
DataCoverage checks whether genres have sufficient data for meaningful analysis:
registerProcessor(
'data-coverage',
async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
const { db, cycle } = ctx;
const results: ProcessorResult[] = [];
// One-shot: only run once per cycle
if ((cycle.processor_stats['data-coverage']?.calls ?? 0) > 0) return [];
for (const genre of cycle.genres_to_scan) {
const stats = await db
.prepare(
`SELECT
COUNT(*) as total,
SUM(CASE WHEN enrichment_source = 'scraper' THEN 1 ELSE 0 END) as enriched,
SUM(CASE WHEN competitiveness_score IS NOT NULL THEN 1 ELSE 0 END) as scored
FROM app_registry
WHERE primary_genre = ?`,
)
.bind(genre)
.first<any>();
const enriched = stats?.enriched || 0;
const enrichedPct =
stats?.total > 0 ? Math.round((enriched / stats.total) * 100) : 0;
const scored = stats?.scored || 0;
const scoredPct =
enriched > 0 ? Math.round((scored / enriched) * 100) : 0;
// Ready: (200+ enriched apps OR 30%+ enriched) AND (50+ scored apps OR 20%+ of enriched apps scored)
const ready =
(enriched >= 200 || enrichedPct >= 30) &&
(scored >= 50 || scoredPct >= 20);
if (!ready) {
// Remove unready genres from the scan queue
cycle.genres_to_scan = cycle.genres_to_scan.filter((g) => g !== genre);
// Create a research task for the gap
await db
.prepare(
`INSERT OR IGNORE INTO research_tasks
(type, target_id, target_name, reason, priority, status)
VALUES ('app_enrich', ?, ?, ?, 'high', 'pending')`,
)
.bind(
genre,
`${genre} enrichment`,
`Genre has only ${enrichedPct}% enrichment coverage`,
)
.run();
cycle.research_tasks_created++;
}
results.push({
processor: 'data-coverage',
entity_type: 'genre',
entity_id: genre,
model_used: null,
tokens_input: 0,
tokens_output: 0,
cost_usd: 0,
duration_ms: 0,
success: true,
result_summary: `${genre}: ${enrichedPct}% enriched, ready=${ready}`,
});
}
return results;
},
);
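The readiness rule inside data-coverage is easier to test once it is pulled out into a pure function. This is a refactoring sketch (isGenreReady is a hypothetical name) that uses the same thresholds and rounding as the processor.

```typescript
interface CoverageStats {
  total: number;
  enriched: number;
  scored: number;
}

function isGenreReady(s: CoverageStats): { ready: boolean; enrichedPct: number; scoredPct: number } {
  const enrichedPct = s.total > 0 ? Math.round((s.enriched / s.total) * 100) : 0;
  const scoredPct = s.enriched > 0 ? Math.round((s.scored / s.enriched) * 100) : 0;
  // Enough enriched apps (200+ or 30%+) AND enough of those scored (50+ or 20%+).
  const ready =
    (s.enriched >= 200 || enrichedPct >= 30) && (s.scored >= 50 || scoredPct >= 20);
  return { ready, enrichedPct, scoredPct };
}

// 250 of 1,000 apps enriched clears the first clause (200+),
// but 40 scored apps is only 16% of the enriched set.
const notYet = isGenreReady({ total: 1000, enriched: 250, scored: 40 });
```

Note that the research task written for an unready genre cites only enrichedPct, even when scoring is what failed (as in this example); returning both percentages makes it easy to write a more accurate reason string.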
GenreScanner takes ready genres and finds candidate niches by clustering keywords:
registerProcessor(
'genre-scanner',
async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
const { db, cycle } = ctx;
// Process one genre per tick (heavy queries)
const alreadyScanned = cycle.processor_stats['genre-scanner']?.calls || 0;
const genre = cycle.genres_to_scan[alreadyScanned];
if (!genre) return [];
// Find vulnerable apps (stale + proven demand)
const vulnerable = await db
.prepare(
`SELECT track_id, track_name, rating_count, competitiveness_score
FROM app_registry
WHERE primary_genre = ?
AND enrichment_source = 'scraper'
AND rating_count >= 500
AND days_since_update >= 180
ORDER BY competitiveness_score DESC
LIMIT 30`,
)
.bind(genre)
.all();
// Find keyword gaps (keywords where incumbents are beatable)
const keywordGaps = await db
.prepare(
`SELECT cr.keyword,
COUNT(DISTINCT cr.track_id) as app_count,
ROUND(AVG(ar.competitiveness_score), 1) as avg_comp,
SUM(ar.rating_count) as total_ratings
FROM crawl_rankings cr
JOIN app_registry ar ON cr.track_id = ar.track_id
WHERE ar.primary_genre = ?
AND ar.competitiveness_score IS NOT NULL
GROUP BY cr.keyword
HAVING COUNT(DISTINCT cr.track_id) >= 3
AND AVG(ar.competitiveness_score) >= 30
ORDER BY avg_comp DESC
LIMIT 30`,
)
.bind(genre)
.all<KeywordGap>(); // typed rows, no cast needed
// Cluster keywords into niches using word overlap
const niches = clusterKeywords(keywordGaps.results);
// Feed niches into the analysis queue
for (const niche of niches) {
cycle.niches_to_analyze.push({
genre,
niche_id: `${genre}::${niche.label}`,
keywords: niche.keywords,
});
}
return [
{
processor: 'genre-scanner',
entity_type: 'genre',
entity_id: genre,
model_used: null,
tokens_input: 0,
tokens_output: 0,
cost_usd: 0,
duration_ms: 0,
success: true,
result_summary: `${genre}: ${niches.length} niches, ${vulnerable.results.length} vulnerable apps`,
},
];
},
);
The keyword clustering is deterministic — no LLM needed:
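// Row shape returned by the keyword-gap query in genre-scanner
interface KeywordGap {
keyword: string;
app_count: number;
avg_comp: number;
total_ratings: number;
}
interface NicheCluster {
label: string;
keywords: string[];
total_ratings: number;
avg_competitiveness: number;
}
// Greedy clustering: keywords sharing any word of 3+ letters join the same cluster
function clusterKeywords(gaps: KeywordGap[]): NicheCluster[] {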
const clusters: NicheCluster[] = [];
const used = new Set<string>();
const sorted = [...gaps].sort((a, b) => b.total_ratings - a.total_ratings);
for (const kw of sorted) {
if (used.has(kw.keyword)) continue;
const words = new Set(
kw.keyword
.toLowerCase()
.split(/\s+/)
.filter((w) => w.length > 2),
);
const cluster: KeywordGap[] = [kw];
used.add(kw.keyword);
for (const other of sorted) {
if (used.has(other.keyword)) continue;
const otherWords = other.keyword
.toLowerCase()
.split(/\s+/)
.filter((w) => w.length > 2);
const overlap = otherWords.filter((w) => words.has(w));
if (overlap.length >= 1) {
cluster.push(other);
used.add(other.keyword);
otherWords.forEach((w) => words.add(w));
}
}
clusters.push({
label: cluster[0].keyword,
keywords: cluster.map((k) => k.keyword),
total_ratings: cluster.reduce((s, k) => s + k.total_ratings, 0),
avg_competitiveness:
cluster.reduce((s, k) => s + k.avg_comp, 0) / cluster.length,
});
}
return clusters.sort((a, b) => b.total_ratings - a.total_ratings).slice(0, 15);
}
Phase 2: Analyze (LLM)
The niche-analyzer takes each niche from the queue, gathers fresh data, and uses an LLM to interpret it:
registerProcessor(
'niche-analyzer',
async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
const { db, cycle, processor, selectModel, remainingBudget } = ctx;
const results: ProcessorResult[] = [];
const batchSize = Math.min(
processor.batch_size || 3,
cycle.niches_to_analyze.length,
);
for (let i = 0; i < batchSize; i++) {
const niche = cycle.niches_to_analyze.shift();
if (!niche) break;
// Budget gate: stop if we cannot afford another call
if (remainingBudget() < (processor.max_cost_per_call || 0.05)) {
cycle.niches_to_analyze.unshift(niche);
break;
}
// Cap recursion depth
const depth = (niche.niche_id.match(/::/g) || []).length;
if (depth >= 3) continue;
const start = Date.now();
// Gather apps and reviews for this niche
const apps = await fetchNicheApps(db, niche.keywords);
const reviews = await fetchLowRatedReviews(db, apps.slice(0, 5));
// Build prompt with real data
const dataBlock = buildNicheDataBlock(niche, apps, reviews);
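const model = selectModel();
try {
const result = await llmObject(
{
apiKey: ctx.apiKey,
model,
temperature: processor.temperature ?? 0.3, // from ProcessorConfig
maxTokens: processor.max_tokens ?? 4096,
thinkingBudget: 0,
db: ctx.db,
contextType: 'harness',
contextId: ctx.config.slug,
},
nicheAnalysisSchema,
dataBlock,
NICHE_ANALYZER_SYSTEM,
`harness-niche-${niche.niche_id}`,
);
// Spawn sub-niches for demographic opportunities
const spawned = result.data.demographic_opportunities.map((d) => ({
type: 'niche' as const,
id: `${niche.niche_id}::${d.segment}`,
}));
results.push({
processor: 'niche-analyzer',
entity_type: 'niche',
entity_id: niche.niche_id,
model_used: model,
tokens_input: result.usage.inputTokens,
tokens_output: result.usage.outputTokens,
cost_usd: result.costUsd,
duration_ms: Date.now() - start,
success: true,
spawned_entities: spawned,
});
} catch (err: any) {
// Record the failure for this niche so results (and costs) already
// collected in this batch still reach the harness. The cost of the
// failed call is unknown here, so it is recorded as 0.
results.push({
processor: 'niche-analyzer',
entity_type: 'niche',
entity_id: niche.niche_id,
model_used: model,
tokens_input: 0,
tokens_output: 0,
cost_usd: 0,
duration_ms: Date.now() - start,
success: false,
error_message: err?.message ?? String(err),
});
}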
}
return results;
},
);
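The analyzer returns spawned_entities, but how those sub-niches re-enter the queue is not shown. One way to do it, sketched here as an assumption (enqueueSpawned and nicheDepth are hypothetical helpers), is to push them onto niches_to_analyze with the parent's genre and keywords, skipping IDs already queued and anything at or past the depth cap of 3 that niche-analyzer enforces.

```typescript
interface QueuedNiche {
  genre: string;
  niche_id: string;
  keywords: string[];
}

const MAX_NICHE_DEPTH = 3; // matches the `depth >= 3` check in niche-analyzer

// Depth = number of '::' separators, e.g. 'Games::puzzle::kids' has depth 2.
function nicheDepth(nicheId: string): number {
  return (nicheId.match(/::/g) || []).length;
}

// Returns how many sub-niches were added to the queue.
function enqueueSpawned(
  queue: QueuedNiche[],
  parent: QueuedNiche,
  spawned: Array<{ type: string; id: string }>,
): number {
  const queued = new Set(queue.map((n) => n.niche_id));
  let added = 0;
  for (const s of spawned) {
    if (s.type !== 'niche') continue;
    if (nicheDepth(s.id) >= MAX_NICHE_DEPTH) continue; // analyzer would skip it anyway
    if (queued.has(s.id)) continue; // avoid analyzing the same segment twice
    // Assumption: a demographic sub-niche reuses the parent's keywords.
    queue.push({ genre: parent.genre, niche_id: s.id, keywords: parent.keywords });
    queued.add(s.id);
    added++;
  }
  return added;
}
```

Filtering by depth before enqueueing keeps doomed entries from occupying batch slots, since the analyzer shifts them off the queue and then skips them.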
The Zod schema enforces structured output from the LLM:
import { z } from 'zod';
const nicheAnalysisSchema = z.object({
user_need: z.string().describe('The core user problem this niche serves'),
weakness_pattern: z
.string()
.describe('The common failure pattern across incumbents'),
entry_vector: z
.string()
.describe('The strongest way to enter this market'),
monetization_model: z
.string()
.describe('How to make money (subscription, IAP, paid upfront)'),
price_range: z
.string()
.describe('Recommended price range based on competitor data'),
buildability: z.enum(['trivial', 'moderate', 'complex', 'very_complex']),
build_weeks: z.number().describe('Estimated weeks to MVP'),
viral_score: z.number().min(0).max(5),
viral_signals: z.array(z.string()),
demographic_opportunities: z.array(
z.object({
segment: z.string(),
axis: z.string(),
reasoning: z.string(),
}),
),
risks: z.array(z.string()),
confidence: z.enum(['high', 'medium', 'low']),
});
Phase 3: Evaluate (Hybrid)
The evaluator combines deterministic tests (computed from data) with LLM judgment:
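// Tests 1-4: deterministic, computed from database queries.
// Only tests 1 and 2 are shown; tests 3 and 4 produce monetizationStatus
// and fragmentationStatus, which are used in the composite below.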
const totalRatings = appData.reduce(
(s, a) => s + (a.rating_count || 0),
0,
);
const staleApps = appData.filter(
(a) => a.days_since_update && a.days_since_update >= 180,
);
// Test 1: Demand (10K+ combined ratings)
const demandStatus =
totalRatings >= 10000
? 'pass'
: totalRatings >= 5000
? 'partial'
: 'fail';
// Test 2: Supply weakness (3+ stale apps, or avgCompScore above 40;
// avgCompScore is computed outside this excerpt)
const supplyStatus =
staleApps.length >= 3 || avgCompScore > 40
? 'pass'
: staleApps.length >= 1 || avgCompScore > 30
? 'partial'
: 'fail';
// Tests 5-7: LLM judgment
const llmResult = await llmObject(
config,
evaluationSchema,
`${dataBlock}\n\nYOUR JOB: Evaluate tests 5, 6, 7`,
EVALUATOR_SYSTEM,
);
// Combine into a composite score
const tests = [
demandStatus,
supplyStatus,
monetizationStatus,
fragmentationStatus,
llmResult.data.buildable_status,
llmResult.data.marketable_status,
llmResult.data.confidence_status,
];
const testsPassed = tests.filter((t) => t === 'pass').length;
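The deterministic tests share a two-threshold shape (pass at one level, partial at a lower one). A minimal sketch of that pattern as helpers; `tierStatus` and `countPassed` are hypothetical names, and the example thresholds are the demand test's 10K/5K ratings.

```typescript
type TestStatus = 'pass' | 'partial' | 'fail' | 'unknown';

// Map a metric onto pass/partial/fail using two thresholds, the same shape
// as the demand test (10K+ ratings = pass, 5K+ = partial).
function tierStatus(value: number, passAt: number, partialAt: number): TestStatus {
  if (value >= passAt) return 'pass';
  if (value >= partialAt) return 'partial';
  return 'fail';
}

// Count passes across deterministic and LLM-judged tests. Only 'pass'
// counts, matching testsPassed above; 'partial' and 'unknown' do not.
function countPassed(statuses: TestStatus[]): number {
  return statuses.filter((s) => s === 'pass').length;
}
```

With these, the demand test becomes `tierStatus(totalRatings, 10000, 5000)`. The supply test ORs two criteria together, so it does not reduce to a single `tierStatus` call.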
The evaluation schema demands specificity:
const evaluationSchema = z.object({
buildable_status: z.enum(['pass', 'fail', 'partial', 'unknown']),
buildable_reasoning: z.string(),
marketable_status: z.enum(['pass', 'fail', 'partial', 'unknown']),
marketable_reasoning: z.string(),
confidence_status: z.enum(['pass', 'fail', 'partial', 'unknown']),
confidence_reasoning: z.string(),
demand_summary: z.string(),
supply_summary: z.string(),
monetization_summary: z.string(),
marketing_summary: z.string(),
differentiation_angle: z.string(),
failure_reasons: z.array(
z.object({
test: z.string(),
reason: z.string(),
solvability: z.enum(['easy', 'moderate', 'hard', 'impossible']),
how_to_solve: z.string(),
}),
),
overall_score: z.number().min(0).max(100),
confidence_level: z.enum(['high', 'medium', 'low']),
status: z.enum(['promising', 'solvable', 'blocked', 'monitor', 'dead']),
});
Phase 4: Critique (Adversarial LLM)
The critic receives high-scoring evaluations and stress-tests them:
registerProcessor(
'critic',
async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
const { db, cycle, processor, selectModel, remainingBudget } = ctx;
const results: ProcessorResult[] = [];
const batchSize = Math.min(
processor.batch_size || 2,
cycle.evaluations_to_critique.length,
);
for (let i = 0; i < batchSize; i++) {
const slug = cycle.evaluations_to_critique.shift();
if (!slug) break;
if (remainingBudget() < (processor.max_cost_per_call || 0.05)) {
cycle.evaluations_to_critique.unshift(slug);
break;
}
// Load the evaluation to critique; skip slugs that no longer exist
const eval_ = await db
.prepare(`SELECT * FROM opportunity_evaluations WHERE slug = ?`)
.bind(slug)
.first<any>();
if (!eval_) continue;
const start = Date.now();
// Load competitor data and reviews
const refApps = await loadReferenceApps(db, eval_.id);
const reviews = await loadCompetitorReviews(db, refApps);
const model = selectModel();
const llmResult = await llmObject(
{
apiKey: ctx.apiKey,
model,
temperature: 0.4, // higher for adversarial thinking
maxTokens: 4096,
db: ctx.db,
contextType: 'harness',
contextId: ctx.config.slug,
},
critiqueSchema,
buildCritiqueDataBlock(eval_, refApps, reviews),
CRITIC_SYSTEM,
`harness-critic-${slug}`, // ledger label, mirroring the niche-analyzer
);
// Apply score adjustment
const adjustedScore = Math.max(
0,
Math.min(100, eval_.overall_score + llmResult.data.score_adjustment),
);
await db
.prepare(
`UPDATE opportunity_evaluations SET
overall_score = ?,
confidence = ?,
critique_json = ?,
critique_conviction = ?,
critique_verdict = ?,
critique_score_delta = ?
WHERE slug = ?`,
)
.bind(
adjustedScore,
llmResult.data.conviction,
JSON.stringify(llmResult.data),
llmResult.data.conviction,
llmResult.data.one_line_verdict,
llmResult.data.score_adjustment,
slug,
)
.run();
cycle.evaluations_critiqued++;
// Report the call so the harness can attribute its cost to this processor
results.push({
processor: 'critic',
entity_type: 'evaluation',
entity_id: slug,
model_used: model,
tokens_input: llmResult.usage.inputTokens,
tokens_output: llmResult.usage.outputTokens,
cost_usd: llmResult.costUsd,
duration_ms: Date.now() - start,
success: true,
spawned_entities: [],
});
}
return results;
},
);
The critique schema forces structured adversarial thinking:
const critiqueSchema = z.object({
challenges: z.array(
z.object({
claim: z.string().describe('The assumption being challenged'),
counter: z.string().describe('Why this might be wrong'),
severity: z.enum(['minor', 'moderate', 'critical']),
verdict: z.enum(['holds', 'weakened', 'broken']),
}),
),
hidden_risks: z.array(
z.object({
risk: z.string(),
likelihood: z.enum(['low', 'medium', 'high']),
impact: z.enum(['low', 'medium', 'high']),
mitigation: z.string(),
}),
),
market_timing: z.enum(['too_early', 'good', 'late', 'uncertain']),
platform_risk: z.enum(['low', 'medium', 'high']),
platform_risk_detail: z.string(),
score_adjustment: z
.number()
.min(-30)
.max(10)
.describe('Negative = overrated, positive = underrated'),
adjustment_reasoning: z.string(),
conviction: z.enum(['high', 'medium', 'low']),
one_line_verdict: z.string(),
});
Key insight: The evaluator and critic are adversarial by design. The evaluator tries to find opportunity; the critic tries to kill it. Because the schema bounds score_adjustment to -30..+10, the critic can downgrade a score substantially but can raise it by at most 10 points. This asymmetry biases the system against false positives rather than eliminating them.
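In the critic above, the Zod schema's `.min(-30).max(10)` is what enforces the asymmetry; the UPDATE itself only clamps the final score to 0..100. If adjustments could ever arrive by a path that skips schema validation, a defensive version would clamp both. A sketch with hypothetical helper names:

```typescript
// Bounds taken from critiqueSchema: the critic may subtract up to 30 points
// but add at most 10.
const MIN_ADJUSTMENT = -30;
const MAX_ADJUSTMENT = 10;

function clamp(value: number, lo: number, hi: number): number {
  return Math.max(lo, Math.min(hi, value));
}

// Clamp the delta to the schema's bounds (in case validation was bypassed),
// then clamp the adjusted score to the 0..100 range.
function applyCritique(score: number, adjustment: number): number {
  const delta = clamp(adjustment, MIN_ADJUSTMENT, MAX_ADJUSTMENT);
  return clamp(score + delta, 0, 100);
}
```

For example, a requested -45 on a score of 70 is limited to -30 (result 40), while a requested +25 on 50 is limited to +10 (result 60).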
Beyond sequential evaluation, the system supports multi-agent debates where 6 specialized agents argue about opportunities from different perspectives.
Agent Roles
const AGENT_ROLES = {
blue_ocean: 'Blue Ocean Scout', // Finds uncontested market spaces
vulnerability: 'Vulnerability Analyst', // Identifies competitor weaknesses
revenue: 'Revenue Strategist', // Evaluates monetization potential
timing: 'Timing Analyst', // Assesses market timing windows
contrarian: 'Contrarian', // Argues against the consensus
synthesizer: 'Synthesizer', // Integrates all perspectives
} as const;
type AgentRole = keyof typeof AGENT_ROLES;
Three-Phase Debate Protocol
The debate runs in three phases, each producing structured output that feeds the next:
Phase 1: Independent Analysis. Each agent evaluates the opportunity independently, without seeing other agents’ opinions. This prevents anchoring bias.
interface Phase1Output {
role: string;
score: number; // 0-10
confidence: number; // 0-100%
top_opportunities: Array<{
cluster_name: string;
thesis: string;
score: number;
evidence: string[];
risk: string;
}>;
market_observations: string[];
warnings: string[];
anticipated_disagreements: Array<{
from_agent: string;
their_likely_argument: string;
my_preemptive_response: string;
}>;
}
The anticipated_disagreements field forces each agent to predict the objections other agents will raise before it has seen any of them. For example, the Blue Ocean Scout might anticipate: “Revenue Strategist will argue the market is too small. My preemptive response: the adjacent health market adds $2B TAM.”
Phase 2: Debate and Critique. Each agent sees all Phase 1 outputs and responds with challenges, agreements, revised positions, and new insights.
interface Phase2Output {
role: string;
revised_score: number;
revised_confidence: number;
challenges: Array<{
target_agent: string;
their_claim: string;
steel_man: string; // best version of their argument
my_challenge: string; // why I still disagree
what_would_change_my_mind: string;
}>;
agreements: Array<{
with_agent: string;
on_what: string;
combined_insight: string; // what we learned together
}>;
revised_opportunities: Array<{
cluster_name: string;
revised_thesis: string;
revised_score: number;
changed_because: string;
}>;
new_insights: string[];
position_changes: string[];
}
The steel_man field requires a challenger to state the strongest version of the argument it disagrees with before challenging it. This discourages straw-man attacks and forces genuine engagement with opposing views.
Phase 3: Synthesis. The Synthesizer agent reads all Phase 1 and Phase 2 outputs and produces the final report.
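interface DebateReport {
// Raw per-agent outputs from Phases 1 and 2, keyed by agent role
agent_evaluations: {
phase1: Record<string, Phase1Output>;
phase2: Record<string, Phase2Output>;
};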
ranked_opportunities: Array<{
rank: number;
cluster_name: string;
composite_score: number;
app_concept_name: string;
app_concept_tagline: string;
go_no_go: string;
first_move: string;
key_evidence: string[];
key_risks: string[];
resolved_disagreements: string[];
}>;
market_narrative: string;
biggest_surprise: string;
meta_risk: string;
debate_quality: {
genuine_disagreements: number;
position_changes: number;
novel_insights: number;
consensus_areas: string[];
unresolved_tensions: string[];
};
}
Debate Chat: Making Debates Readable
Raw debate JSON is impenetrable. The debate-chat module transforms structured debate data into a Slack-like timeline of messages:
export async function extractDebateMessages(
db: D1Database,
debateId: number,
report: DebateReport,
): Promise<void> {
const stmt = db.prepare(
`INSERT INTO debate_messages
(debate_id, phase, agent_role, message_type, content,
target_agent, metadata_json, sort_order)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
);
const batch: D1PreparedStatement[] = [];
let order = 0;
// Phase 1: independent analyses become opening statements
for (const [role, p1] of Object.entries(report.agent_evaluations.phase1)) {
const name = AGENT_ROLES[role as AgentRole] ?? role;
const parts: string[] = [];
parts.push(`**${name}** -- Score: ${p1.score}/10`);
if (p1.top_opportunities?.length) {
parts.push('\n**My top opportunities:**');
for (const opp of p1.top_opportunities) {
parts.push(`- **${opp.cluster_name}** (${opp.score}/10): ${opp.thesis}`);
}
}
batch.push(
stmt.bind(
debateId, 1, role, 'analysis',
parts.join('\n'), null, null, order++,
),
);
}
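// Phase 2: challenges, agreements, and position changes become threaded replies
for (const [role, p2] of Object.entries(report.agent_evaluations.phase2)) {
if (p2.challenges?.length) {
for (const ch of p2.challenges) {
const target = AGENT_ROLES[ch.target_agent as AgentRole] ?? ch.target_agent;
const msg = [
`**Challenging ${target}:**`,
`> Their claim: "${ch.their_claim}"`,
`\n**Steel-man:** ${ch.steel_man}`,
`\n**But I disagree:** ${ch.my_challenge}`,
`\n*I'd change my mind if:* ${ch.what_would_change_my_mind}`,
].join('\n');
batch.push(
stmt.bind(
debateId, 2, role, 'challenge',
msg, ch.target_agent, null, order++,
),
);
}
}
if (p2.agreements?.length) {
for (const ag of p2.agreements) {
const partner = AGENT_ROLES[ag.with_agent as AgentRole] ?? ag.with_agent;
batch.push(
stmt.bind(
debateId, 2, role, 'agreement',
`**Agreeing with ${partner}** on ${ag.on_what}\n${ag.combined_insight}`,
ag.with_agent, null, order++,
),
);
}
}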
if (p2.position_changes?.length) {
for (const pc of p2.position_changes) {
batch.push(
stmt.bind(
debateId, 2, role, 'position_change',
`**I changed my mind:** ${pc}`, null, null, order++,
),
);
}
}
}
// Phase 3: synthesis becomes the verdict
for (const opp of report.ranked_opportunities) {
batch.push(
stmt.bind(
debateId, 3, 'synthesizer', 'verdict',
`**#${opp.rank}: ${opp.cluster_name}** -- Score: ${opp.composite_score}/10\n**Verdict:** ${opp.go_no_go}`,
null,
JSON.stringify({ rank: opp.rank, score: opp.composite_score }),
order++,
),
);
}
// Write in batches of 100 (D1 batch limit)
for (let i = 0; i < batch.length; i += 100) {
await db.batch(batch.slice(i, i + 100));
}
}
The debate messages table:
CREATE TABLE debate_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
debate_id INTEGER NOT NULL,
phase INTEGER NOT NULL, -- 1, 2, or 3
agent_role TEXT NOT NULL,
message_type TEXT NOT NULL, -- analysis, challenge, agreement, position_change, verdict
content TEXT NOT NULL,
target_agent TEXT, -- who they're responding to
metadata_json TEXT,
sort_order INTEGER NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
Key insight: The debate system is not a free-form conversation. It is a structured protocol with fixed phases and typed outputs. This makes it reliable (no infinite loops), auditable (every position change is recorded), and cost-predictable: 6 + 6 + 1 = 13 LLM calls per debate, plus any SDK-level retries of failed API calls.
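The fixed call count follows from the protocol's shape. Below is a minimal orchestration sketch, assuming each agent call is an injected async function; `runDebate`, `analyze`, `critique`, and `synthesize` are hypothetical names, not the article's API. With six roles it makes 2 x 6 + 1 = 13 calls.

```typescript
// Hypothetical orchestration sketch of the three-phase debate protocol.
// Agent calls are injected so this runs without an LLM; outputs are typed
// as unknown to keep the sketch generic.
interface DebateCalls {
  // Phase 1: an agent sees only the opportunity data, never its peers.
  analyze: (role: string) => Promise<unknown>;
  // Phase 2: an agent sees every Phase 1 output.
  critique: (role: string, phase1: Record<string, unknown>) => Promise<unknown>;
  // Phase 3: one synthesis over both phases.
  synthesize: (
    phase1: Record<string, unknown>,
    phase2: Record<string, unknown>,
  ) => Promise<unknown>;
}

async function runDebate(roles: string[], calls: DebateCalls) {
  // Phase 1 runs in parallel; agents cannot anchor on each other.
  const p1 = await Promise.all(roles.map((r) => calls.analyze(r)));
  const phase1: Record<string, unknown> = {};
  roles.forEach((r, i) => {
    phase1[r] = p1[i];
  });

  // Phase 2 also runs in parallel, but only after Phase 1 has fully finished.
  const p2 = await Promise.all(roles.map((r) => calls.critique(r, phase1)));
  const phase2: Record<string, unknown> = {};
  roles.forEach((r, i) => {
    phase2[r] = p2[i];
  });

  // Phase 3: a single call. Total = 2 * roles.length + 1 agent calls.
  const report = await calls.synthesize(phase1, phase2);
  return { phase1, phase2, report };
}
```

Running Phase 2 only after all of Phase 1 resolves is what keeps Phase 1 independent: no agent's first answer can depend on another's.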
Convergence detection answers the question: “Are the signals in our data getting stronger or weaker over time?”
In a market analysis context, convergence means two previously unrelated clusters are becoming similar — their shared keywords are growing, their app overlap is increasing. This is an opportunity signal: users are searching across categories, and no app serves the intersection.
export interface ConvergenceSignal {
lineage_a: number;
lineage_b: number;
name_a: string;
name_b: string;
genre_a: string;
genre_b: string;
current_similarity: number;
previous_similarity: number;
delta: number; // positive = growing closer
shared_keywords: string[];
cross_genre: boolean; // different genres = unexpected = higher signal
first_detected_at: string | null;
growth_rate: number;
signal_strength: number; // composite score
insight: string; // human-readable explanation
}
The detector compares current state against historical state:
export async function detectConvergence(db: D1Database): Promise<{
signals: ConvergenceSignal[];
stats: {
pairs_analyzed: number;
converging: number;
diverging: number;
stable: number;
};
}> {
// Get all current cluster relations
const currentRels = await db
.prepare(
`SELECT cr.lineage_id_a, cr.lineage_id_b, cr.similarity, cr.app_overlap,
cla.name as name_a, clb.name as name_b
FROM cluster_relations cr
JOIN cluster_lineages cla ON cr.lineage_id_a = cla.id
JOIN cluster_lineages clb ON cr.lineage_id_b = clb.id
WHERE cla.is_active = 1 AND clb.is_active = 1`,
)
.all();
const signals: ConvergenceSignal[] = [];
let converging = 0, diverging = 0, stable = 0;
for (const rel of currentRels.results) {
const currentSim = rel.similarity as number;
// Get previous similarity for this pair
const prevRel = await db
.prepare(
`SELECT similarity FROM cluster_relations
WHERE lineage_id_a = ? AND lineage_id_b = ?
ORDER BY detected_at DESC
LIMIT 1 OFFSET 1`,
)
.bind(rel.lineage_id_a, rel.lineage_id_b)
.first();
const previousSim = (prevRel?.similarity as number) || 0;
const delta = currentSim - previousSim;
if (delta > 0.01) converging++;
else if (delta < -0.01) diverging++;
else stable++;
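// genreA/genreB, ratingsA/ratingsB, shared (shared keywords), and firstSeen
// are per-pair lookups that this excerpt omits.
const la = rel.lineage_id_a as number;
const lb = rel.lineage_id_b as number;
// Signal strength = speed x unexpectedness x market size
const crossGenre = genreA !== genreB;
const unexpectedness = crossGenre ? 2.0 : 1.0;
// log10(1 + n) keeps pairs with zero ratings at 0 instead of -Infinity
const marketSize = Math.log10(1 + ratingsA + ratingsB);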
const signalStrength = Math.round(
(delta > 0 ? delta * 100 : 0) * unexpectedness * marketSize,
);
// Generate human-readable insight
let insight = '';
if (delta > 0.05 && crossGenre) {
insight = `Rapid cross-genre convergence: ${genreA} x ${genreB} merging around "${shared.slice(0, 3).join(', ')}". New app bridging both could capture untapped demand.`;
} else if (delta > 0.05) {
insight = `Same-genre clusters converging. Market subcategories increasingly share keywords. Consolidation opportunity.`;
} else if (delta > 0 && crossGenre) {
insight = `Slow cross-genre drift. Early signal -- monitor for acceleration.`;
}
signals.push({
lineage_a: la,
lineage_b: lb,
name_a: rel.name_a as string,
name_b: rel.name_b as string,
genre_a: genreA,
genre_b: genreB,
current_similarity: currentSim,
previous_similarity: previousSim,
delta,
shared_keywords: shared.slice(0, 10),
cross_genre: crossGenre,
first_detected_at: firstSeen,
growth_rate: delta,
signal_strength: signalStrength,
insight,
});
}
signals.sort((a, b) => b.signal_strength - a.signal_strength);
return {
signals,
stats: {
pairs_analyzed: currentRels.results.length,
converging,
diverging,
stable,
},
};
}
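The scoring arithmetic inside that loop is easier to check in isolation. Here is a sketch of the classification and signal-strength formulas as pure functions (the names are hypothetical). It uses log10(1 + ratings) so that a pair with no ratings scores 0 rather than -Infinity.

```typescript
type Trend = 'converging' | 'diverging' | 'stable';

// Same +/-0.01 dead band as detectConvergence.
function classifyDelta(delta: number, epsilon = 0.01): Trend {
  if (delta > epsilon) return 'converging';
  if (delta < -epsilon) return 'diverging';
  return 'stable';
}

// speed x unexpectedness x market size. Only growing similarity scores;
// cross-genre pairs count double.
function signalStrength(
  delta: number,
  genreA: string,
  genreB: string,
  ratingsA: number,
  ratingsB: number,
): number {
  const speed = delta > 0 ? delta * 100 : 0;
  const unexpectedness = genreA !== genreB ? 2.0 : 1.0;
  const marketSize = Math.log10(1 + ratingsA + ratingsB);
  return Math.round(speed * unexpectedness * marketSize);
}
```

Worked example: a cross-genre pair whose similarity rose by 0.06, with 9,999 combined ratings, has log10(1 + 9,999) = 4, so its strength is 6 x 2 x 4 = 48. The same pair within one genre would score 24.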
The convergence signal feeds back into the pipeline. High signal-strength convergences become candidates for the niche-analyzer in the next cycle, creating a feedback loop where the system automatically explores emerging opportunities.
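A minimal sketch of that feedback step, assuming the strongest signals are turned into niche candidates and deduplicated against what is already queued. The threshold, the `maxNew` cap, the `convergence:` id format, and the use of cluster names as keywords are all hypothetical choices for illustration.

```typescript
interface SignalSummary {
  name_a: string;
  name_b: string;
  signal_strength: number;
}
interface NicheCandidate {
  niche_id: string;
  keywords: string[];
}

// Hypothetical feedback step: enqueue the strongest convergences for the
// next cycle, skipping ids already queued. Returns how many were added.
function enqueueConvergences(
  signals: SignalSummary[],
  queue: NicheCandidate[],
  minStrength: number,
  maxNew = 5,
): number {
  const queued = new Set(queue.map((n) => n.niche_id));
  const ranked = [...signals].sort((a, b) => b.signal_strength - a.signal_strength);
  let added = 0;
  for (const s of ranked) {
    if (added >= maxNew || s.signal_strength < minStrength) break;
    // Sort the pair so A+B and B+A map to the same niche id.
    const id = `convergence:${[s.name_a, s.name_b].sort().join('+')}`;
    if (queued.has(id)) continue;
    queue.push({ niche_id: id, keywords: [s.name_a, s.name_b] });
    queued.add(id);
    added++;
  }
  return added;
}
```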
Convergence in Agent Debates
Convergence detection also applies to agent consensus. During a debate, you can measure convergence by tracking how agent scores change between Phase 1 and Phase 2:
function measureDebateConvergence(report: DebateReport): {
initial_variance: number;
final_variance: number;
convergence_ratio: number;
position_changes: number;
} {
const phase1Scores = Object.values(report.agent_evaluations.phase1).map(
(p) => p.score ?? 5,
);
const phase2Scores = Object.values(report.agent_evaluations.phase2).map(
(p) => p.revised_score ?? 5,
);
const variance = (scores: number[]) => {
const mean = scores.reduce((s, v) => s + v, 0) / scores.length;
return (
scores.reduce((s, v) => s + Math.pow(v - mean, 2), 0) / scores.length
);
};
const initialVar = variance(phase1Scores);
const finalVar = variance(phase2Scores);
return {
initial_variance: initialVar,
final_variance: finalVar,
convergence_ratio:
initialVar > 0 ? 1 - finalVar / initialVar : 1,
position_changes: report.debate_quality.position_changes,
};
}
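The convergence ratio is the fraction by which score variance shrank between Phase 1 and Phase 2. A ratio above 0.5 means the variance was more than halved, so agents moved significantly closer to agreement. A ratio near 0 means the debate did not change minds. A negative ratio means the debate increased disagreement, which is also valuable information. One edge case: if Phase 1 variance is already 0, the function reports 1 regardless of what happened in Phase 2.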
Key insight: Convergence detection works at two levels. At the data level, it finds markets that are merging. At the agent level, it measures whether debate is productive. Both are stored in D1 and tracked over time, creating a longitudinal record of how the system’s collective intelligence evolves.
Agents that forget everything between runs cannot learn from experience. The agent memory system gives each agent role persistent memory in three layers, inspired by cognitive architectures.
Layer 1: Core Beliefs
Short belief statements with confidence scores. Each agent's 20 highest-confidence beliefs are loaded into its system prompt on every run.
export interface CoreBelief {
id: number;
agent_role: AgentRole;
belief: string;
confidence: number; // 0.0 to 1.0
evidence_count: number; // how many times reinforced
first_formed_at: string;
last_reinforced_at: string;
}
Examples:
[c=0.90, n=12] Markets with 80%+ brand dominance never convert to indie opportunity.
[c=0.75, n=5] Cross-genre convergence signals are 3x more predictive than same-genre.
[c=0.60, n=3] Health niches have highest retention but hardest App Store review.
Layer 2: Episodic Memory
What happened in past debates — observations, predictions, and lessons. Loaded on demand.
export type MemoryType =
| 'observation' // what the agent noticed
| 'prediction' // what the agent thinks will happen
| 'lesson' // what the agent learned
| 'belief' // a core belief (stored separately)
| 'debate_report'; // synthesis from a debate
export interface AgentMemoryEntry {
id: number;
agent_role: AgentRole;
memory_type: MemoryType;
content: string;
context_json: Record<string, unknown> | null;
confidence: number;
source_run_id: string | null;
created_at: string;
}
Layer 3: Reflective Updates
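After each debate, the system records each agent's top prediction and any position changes as episodic memories, and reinforces beliefs about markets the synthesis ranked highly. Checking predictions against reality happens on later runs, when past predictions are loaded back into the agent's prompt: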
export async function reflectOnDebate(
db: D1Database,
report: DebateReport,
runId: string,
): Promise<{ memories_created: number; beliefs_updated: number }> {
let memoriesCreated = 0;
let beliefsUpdated = 0;
// 1. Record each agent's top prediction
for (const [role, p1] of Object.entries(report.agent_evaluations.phase1)) {
if (!p1.top_opportunities?.length) continue;
const topOpp = p1.top_opportunities[0];
await recordMemory(
db,
role as AgentRole,
'prediction',
`Predicted "${topOpp.cluster_name}" as top opportunity (${topOpp.score}/10): ${topOpp.thesis}`,
{ cluster: topOpp.cluster_name, score: topOpp.score },
runId,
);
memoriesCreated++;
}
// 2. Record position changes as lessons
for (const [role, p2] of Object.entries(report.agent_evaluations.phase2)) {
if (p2.position_changes) {
for (const change of p2.position_changes) {
await recordMemory(
db,
role as AgentRole,
'lesson',
change,
undefined,
runId,
);
memoriesCreated++;
}
}
}
// 3. Reinforce beliefs about consistently high-scoring markets.
// The score stays out of the belief text: beliefs are matched on exact text,
// so embedding it would create a new belief whenever the score moved.
for (const opp of report.ranked_opportunities.slice(0, 3)) {
if (opp.composite_score >= 7) {
await updateBelief(
db,
'synthesizer',
`"${opp.cluster_name}" is a high-potential market`,
0.15,
);
beliefsUpdated++;
}
}
return { memories_created: memoriesCreated, beliefs_updated: beliefsUpdated };
}
Loading Memory into Prompts
Memory is injected into the agent’s system prompt at runtime:
export async function loadAgentMemory(
db: D1Database,
role: AgentRole,
): Promise<string> {
const sections: string[] = [];
// Core beliefs (always loaded, sorted by confidence)
const beliefs = await db
.prepare(
`SELECT belief, confidence, evidence_count
FROM agent_core_beliefs
WHERE agent_role = ?
ORDER BY confidence DESC
LIMIT 20`,
)
.bind(role)
.all();
if (beliefs.results.length > 0) {
sections.push(
`## YOUR ACCUMULATED BELIEFS (${beliefs.results.length} loaded, strongest first)`,
);
sections.push(
`These are beliefs you formed through experience. Use them, but update if contradicted.`,
);
for (const b of beliefs.results) {
const conf = (b.confidence as number).toFixed(2);
const count = b.evidence_count as number;
sections.push(`- [c=${conf}, n=${count}] ${b.belief}`);
}
}
// Recent predictions (for self-evaluation)
const predictions = await db
.prepare(
`SELECT content, created_at FROM agent_memory
WHERE agent_role = ? AND memory_type = 'prediction' AND is_active = 1
ORDER BY created_at DESC LIMIT 5`,
)
.bind(role)
.all();
if (predictions.results.length > 0) {
sections.push(`\n## YOUR PAST PREDICTIONS (check if they came true)`);
for (const p of predictions.results) {
sections.push(`- [${p.created_at}] ${p.content}`);
}
}
// Lessons from past debates
const lessons = await db
.prepare(
`SELECT content FROM agent_memory
WHERE agent_role = ? AND memory_type = 'lesson' AND is_active = 1
ORDER BY created_at DESC LIMIT 5`,
)
.bind(role)
.all();
if (lessons.results.length > 0) {
sections.push(`\n## LESSONS FROM PAST DEBATES`);
for (const l of lessons.results) {
sections.push(`- ${l.content}`);
}
}
if (sections.length === 0) return '';
return `\n\n--- AGENT MEMORY ---\n${sections.join('\n')}\n--- END MEMORY ---\n`;
}
Belief Dynamics
Beliefs strengthen with confirming evidence and weaken with contradictions:
export async function updateBelief(
db: D1Database,
role: AgentRole,
belief: string,
confidenceDelta: number = 0.1,
): Promise<void> {
const existing = await db
.prepare(
`SELECT id, confidence, evidence_count FROM agent_core_beliefs
WHERE agent_role = ? AND belief = ?`,
)
.bind(role, belief)
.first();
if (existing) {
const newConf = Math.max(
0,
Math.min(1, (existing.confidence as number) + confidenceDelta),
);
await db
.prepare(
`UPDATE agent_core_beliefs
SET confidence = ?, evidence_count = evidence_count + 1,
last_reinforced_at = datetime('now')
WHERE id = ?`,
)
.bind(newConf, existing.id)
.run();
} else {
const initialConf = Math.max(0.1, Math.min(0.9, 0.5 + confidenceDelta));
await db
.prepare(
`INSERT OR IGNORE INTO agent_core_beliefs
(agent_role, belief, confidence)
VALUES (?, ?, ?)`,
)
.bind(role, belief, initialConf)
.run();
}
}
export async function weakenBelief(
db: D1Database,
role: AgentRole,
belief: string,
penalty: number = 0.15,
): Promise<void> {
await db
.prepare(
`UPDATE agent_core_beliefs
SET confidence = MAX(0, confidence - ?),
last_reinforced_at = datetime('now')
WHERE agent_role = ? AND belief = ?`,
)
.bind(penalty, role, belief)
.run();
}
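To reason about these dynamics without a database, the same rules can be modeled in memory. This is a sketch, not the article's code. `BeliefStore` is hypothetical, and its map key stands in for the `UNIQUE(agent_role, belief)` constraint.

```typescript
interface Belief {
  confidence: number;
  evidence_count: number;
}

function clamp(x: number, lo: number, hi: number): number {
  return Math.max(lo, Math.min(hi, x));
}

// In-memory model of updateBelief / weakenBelief.
class BeliefStore {
  private beliefs = new Map<string, Belief>();

  private key(role: string, belief: string): string {
    return `${role}\u0000${belief}`;
  }

  update(role: string, belief: string, delta = 0.1): Belief {
    const existing = this.beliefs.get(this.key(role, belief));
    if (existing) {
      existing.confidence = clamp(existing.confidence + delta, 0, 1);
      existing.evidence_count += 1;
      return existing;
    }
    // New beliefs start at 0.5 + delta, kept within [0.1, 0.9].
    const created = { confidence: clamp(0.5 + delta, 0.1, 0.9), evidence_count: 1 };
    this.beliefs.set(this.key(role, belief), created);
    return created;
  }

  // Like the SQL UPDATE: lowers confidence (floor 0), leaves evidence_count
  // alone, and does nothing for unknown beliefs.
  weaken(role: string, belief: string, penalty = 0.15): Belief | undefined {
    const b = this.beliefs.get(this.key(role, belief));
    if (b) b.confidence = Math.max(0, b.confidence - penalty);
    return b;
  }
}
```

With the default deltas, a new belief goes 0.6 on formation, 0.7 and 0.8 after two confirmations, then 0.65 after one weakening.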
Memory Pruning
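Old memories are deactivated to prevent unbounded growth. An active memory is pruned only if it is older than 90 days and falls outside its agent's 30 most recent active memories. Predictions are exempt, since they need time to verify: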
export async function pruneMemories(
db: D1Database,
): Promise<{ pruned: number }> {
const result = await db
.prepare(
`UPDATE agent_memory SET is_active = 0
WHERE id NOT IN (
SELECT id FROM (
SELECT id,
ROW_NUMBER() OVER (
PARTITION BY agent_role ORDER BY created_at DESC
) as rn
FROM agent_memory WHERE is_active = 1
) WHERE rn <= 30
)
AND is_active = 1
AND memory_type != 'prediction'
AND created_at < datetime('now', '-90 days')`,
)
.run();
return { pruned: result.meta.changes ?? 0 };
}
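Because the nested window query is easy to misread, here is the same rule restated over an in-memory array. It is a sketch for checking the semantics, with a hypothetical `MemoryRow` shape. Note that predictions still occupy slots in each role's 30-most-recent window even though they are never deactivated.

```typescript
interface MemoryRow {
  id: number;
  agent_role: string;
  memory_type: string;
  created_at: Date;
  is_active: boolean;
}

// In-memory restatement of pruneMemories: deactivate an active row only if it
// is outside its role's keepRecent newest active rows, is not a prediction,
// and is older than maxAgeDays. Returns the number of rows deactivated.
function pruneInMemory(rows: MemoryRow[], now: Date, keepRecent = 30, maxAgeDays = 90): number {
  const cutoff = now.getTime() - maxAgeDays * 24 * 60 * 60 * 1000;
  const byRole = new Map<string, MemoryRow[]>();
  for (const r of rows) {
    if (!r.is_active) continue;
    const list = byRole.get(r.agent_role) ?? [];
    list.push(r);
    byRole.set(r.agent_role, list);
  }
  // Equivalent of ROW_NUMBER() ... WHERE rn <= keepRecent, per agent_role.
  const protectedIds = new Set<number>();
  byRole.forEach((list) => {
    list.sort((a, b) => b.created_at.getTime() - a.created_at.getTime());
    list.slice(0, keepRecent).forEach((r) => protectedIds.add(r.id));
  });
  let pruned = 0;
  for (const r of rows) {
    if (!r.is_active || protectedIds.has(r.id)) continue;
    if (r.memory_type === 'prediction') continue;
    if (r.created_at.getTime() >= cutoff) continue; // SQL uses strict "<"
    r.is_active = false;
    pruned++;
  }
  return pruned;
}
```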
The D1 schema:
CREATE TABLE agent_memory (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_role TEXT NOT NULL,
memory_type TEXT NOT NULL,
content TEXT NOT NULL,
context_json TEXT,
confidence REAL DEFAULT 0.5,
source_run_id TEXT,
is_active INTEGER DEFAULT 1,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE agent_core_beliefs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_role TEXT NOT NULL,
belief TEXT NOT NULL,
confidence REAL DEFAULT 0.5,
evidence_count INTEGER DEFAULT 1,
first_formed_at TEXT DEFAULT (datetime('now')),
last_reinforced_at TEXT DEFAULT (datetime('now')),
UNIQUE(agent_role, belief)
);
CREATE INDEX idx_memory_role ON agent_memory(agent_role, memory_type, is_active);
CREATE INDEX idx_beliefs_role ON agent_core_beliefs(agent_role, confidence DESC);
Key insight: Agent memory is not a chat history. It is a structured knowledge base with confidence scores and temporal dynamics. Beliefs form, strengthen with confirming evidence, and weaken toward zero confidence when contradicted, while old episodic memories are pruned. This makes agents that run daily genuinely different from agents that run once: they accumulate judgment.
Every LLM call costs money. The system tracks costs at three levels: per-call, per-processor, and per-cycle. Model selection adapts based on remaining budget.
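One way to picture the three levels is a small in-memory tracker per cycle. This is a sketch only. `CycleCostTracker` is hypothetical; in the system described here, per-call figures are persisted to the D1 ledger rather than kept in memory.

```typescript
// Hypothetical per-cycle cost tracker illustrating the three levels:
// individual calls, per-processor totals, and the cycle total/budget.
class CycleCostTracker {
  private readonly budgetUsd: number;
  private readonly calls: { processor: string; costUsd: number }[] = [];

  constructor(budgetUsd: number) {
    this.budgetUsd = budgetUsd;
  }

  // Per-call level: one entry per LLM call.
  record(processor: string, costUsd: number): void {
    this.calls.push({ processor, costUsd });
  }

  // Per-processor level.
  byProcessor(): Record<string, number> {
    const totals: Record<string, number> = {};
    for (const c of this.calls) {
      totals[c.processor] = (totals[c.processor] ?? 0) + c.costUsd;
    }
    return totals;
  }

  // Per-cycle level.
  spent(): number {
    return this.calls.reduce((sum, c) => sum + c.costUsd, 0);
  }

  remainingBudget(): number {
    return Math.max(0, this.budgetUsd - this.spent());
  }

  // The same gate the processors apply before each call.
  canAfford(maxCostPerCall: number): boolean {
    return this.remainingBudget() >= maxCostPerCall;
  }
}
```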
The LLM Harness
All LLM calls go through a single entry point that handles Zod validation, retries, cost tracking, and error surfacing:
import { generateObject, NoObjectGeneratedError } from 'ai';
import { createGoogleGenerativeAI } from '@ai-sdk/google';
import { z } from 'zod';
export interface LLMConfig {
apiKey: string;
model?: string;
temperature?: number;
maxTokens?: number;
maxRetries?: number;
thinkingBudget?: number; // 0 = disable thinking
aiBinding?: unknown; // Workers AI binding for AI Gateway
db?: D1Database; // auto-record to api_calls_ledger
contextType?: string; // 'harness', 'pipeline', etc.
contextId?: string; // harness slug, pipeline ID
}
export interface LLMResult<T> {
data: T;
usage: {
inputTokens: number;
outputTokens: number;
thinkingTokens: number;
totalTokens: number;
};
costUsd: number;
finishReason: string;
durationMs: number;
warnings: string[];
}
Pricing Awareness
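The system keeps a per-model price table, with a separate rate for thinking tokens:
// USD per token. Provider prices change over time, so check these against
// current Gemini API pricing before relying on them.
const PRICING = {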
'gemini-2.5-flash': {
input: 0.15 / 1_000_000,
output: 0.60 / 1_000_000,
thinking: 3.50 / 1_000_000,
},
'gemini-2.5-pro': {
input: 1.25 / 1_000_000,
output: 10.00 / 1_000_000,
thinking: 10.00 / 1_000_000,
},
'gemini-2.0-flash': {
input: 0.10 / 1_000_000,
output: 0.40 / 1_000_000,
thinking: 0,
},
} as Record<string, { input: number; output: number; thinking: number }>;
function calculateCost(
model: string,
inputTokens: number,
outputTokens: number,
thinkingTokens = 0,
): number {
const pricing = PRICING[model] || PRICING['gemini-2.5-flash'];
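// Assumes outputTokens includes thinking tokens; verify this against the
// usage reporting of your SDK and provider version.
const textOutputTokens = Math.max(0, outputTokens - thinkingTokens);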
return (
inputTokens * pricing.input +
textOutputTokens * pricing.output +
thinkingTokens * pricing.thinking
);
}
Thinking Token Extraction
Thinking tokens (used by reasoning models like Gemini 2.5) can be priced differently from regular output (compare the gemini-2.5-flash rates above), and the SDK reports them in different places depending on the provider and SDK version:
function extractThinkingTokens(result: any): number {
// Vercel AI SDK: providerMetadata.google.usageMetadata.thoughtsTokenCount
const fromProvider =
result.providerMetadata?.google?.usageMetadata?.thoughtsTokenCount;
if (fromProvider != null) return fromProvider;
// AI SDK v6: usage.reasoningTokens
const fromUsage = result.usage?.reasoningTokens;
if (fromUsage != null) return fromUsage;
// outputTokenDetails.reasoningTokens
const fromDetails = result.usage?.outputTokenDetails?.reasoningTokens;
if (fromDetails != null) return fromDetails;
return 0;
}
Structured Output with Automatic Retry
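The llmObject function calls Vercel AI SDK’s generateObject with a Zod schema. Note what the retry covers: maxRetries re-sends failed API calls (for example, rate limits or transient server errors), but output that fails schema validation is not retried. In that case the SDK throws NoObjectGeneratedError, which the wrapper converts into an LLMError carrying the raw model text: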
export async function llmObject<T>(
config: LLMConfig,
schema: z.ZodType<T>,
prompt: string,
system?: string,
label = 'llmObject',
): Promise<LLMResult<T>> {
const model = createModel(config);
const start = Date.now();
try {
const result = await generateObject({
model,
schema,
prompt,
system,
temperature: config.temperature ?? 0.3,
maxOutputTokens: config.maxTokens ?? 8192,
maxRetries: config.maxRetries ?? 3,
...(config.thinkingBudget !== undefined && {
providerOptions: {
google: {
thinkingConfig: { thinkingBudget: config.thinkingBudget },
},
},
}),
});
const inputTokens = result.usage?.inputTokens ?? 0;
const outputTokens = result.usage?.outputTokens ?? 0;
const thinkingTokens = extractThinkingTokens(result);
const costUsd = calculateCost(
config.model || 'gemini-2.5-flash',
inputTokens,
outputTokens,
thinkingTokens,
);
// Auto-record to cost ledger
if (config.db) {
await recordToLedger(
config.db, config, label,
{ inputTokens, outputTokens, thinkingTokens },
costUsd, Date.now() - start, true,
);
}
return {
data: result.object,
usage: { inputTokens, outputTokens, thinkingTokens, totalTokens: inputTokens + outputTokens },
costUsd,
finishReason: result.finishReason,
durationMs: Date.now() - start,
warnings: [],
};
} catch (err) {
if (NoObjectGeneratedError.isInstance(err)) {
const rawText = err.text || '';
if (rawText.length > 0 && !rawText.trim().endsWith('}')) {
console.error(
`[llm:${label}] Output truncated. Increase maxTokens (current: ${config.maxTokens ?? 8192}).`,
);
}
throw new LLMError(label, 'No valid object generated (schema validation or parsing failed)', rawText);
}
throw err;
}
}
Adaptive Model Selection
The harness selects models based on the processor’s tier and the remaining budget:
private selectModel(proc: ProcessorConfig, cycle: CycleState): string {
if (!this.config) return 'gemini-2.5-flash';
const tier = proc.model_tier || 'default';
const budget = this.config.budget_max_per_cycle;
const spent = cycle.cost_so_far;
const remaining = budget - spent;
// Budget pressure: downgrade to the default model once less than 20% of the cycle budget remains
if (remaining < budget * 0.2 && tier !== 'default') {
return this.config.model_default;
}
switch (tier) {
case 'premium':
return this.config.model_premium; // gemini-2.5-pro
case 'expensive':
return this.config.model_expensive; // gemini-2.5-flash
default:
return this.config.model_default; // gemini-2.0-flash
}
}
Cost Ledger
Every LLM call is recorded in the api_calls_ledger:
CREATE TABLE api_calls_ledger (
id INTEGER PRIMARY KEY AUTOINCREMENT,
context_type TEXT, -- 'harness', 'pipeline', 'api'
context_id TEXT, -- harness slug, pipeline ID
service TEXT NOT NULL, -- 'gemini', 'openai', etc.
operation TEXT NOT NULL, -- label from the llm call
endpoint TEXT, -- model name
cost_usd REAL NOT NULL,
units INTEGER, -- total tokens
input_units INTEGER,
output_units INTEGER,
duration_ms INTEGER,
status TEXT, -- 'ok' | 'error'
error_message TEXT,
metadata TEXT, -- JSON with thinking tokens breakdown
created_at TEXT DEFAULT (datetime('now'))
);
This gives you dashboards like:
-- Cost per operation label (the llmObject label) for one harness
SELECT operation, COUNT(*) as calls,
ROUND(SUM(cost_usd), 4) as total_cost,
ROUND(AVG(duration_ms)) as avg_ms
FROM api_calls_ledger
WHERE context_type = 'harness' AND context_id = 'market-scanner'
GROUP BY operation
ORDER BY total_cost DESC;
AI Gateway Integration
When the Workers AI binding is available, the system routes through Cloudflare AI Gateway for caching and logging:
// createAiGateway and createGoogleGateway are not defined in this excerpt;
// they are assumed to come from an AI Gateway provider package.
function createModel(config: LLMConfig) {
const modelId = config.model || 'gemini-2.5-flash';
if (config.aiBinding) {
try {
const aigateway = createAiGateway({
binding: (config.aiBinding as any).gateway('aigateway'),
options: { cacheTtl: 300 },
});
const google = createGoogleGateway({ apiKey: config.apiKey });
return aigateway(google(modelId));
} catch (err) {
console.warn(`AI Gateway unavailable, falling back to direct: ${String(err)}`);
}
}
// Direct path: the @ai-sdk/google provider imported at the top of the module
const google = createGoogleGenerativeAI({ apiKey: config.apiKey });
return google(modelId);
}
Key insight: Cost control is not an afterthought; it is built into every layer. Each processor declares its cost limits (max_cost_per_call, model_tier). The harness enforces them. The LLM wrapper records actual costs. The ledger enables analysis. You can see how much each agent spends and judge whether the expensive model justified its price.
For batch operations that need to process many items with automatic retries, Cloudflare Workflows provides durable execution with step-level checkpointing.
The Scatter-Gather Pattern
The BatchResearchWorkflow processes multiple research seeds in parallel, with configurable concurrency:
```typescript
import {
  WorkflowEntrypoint,
  type WorkflowEvent,
  type WorkflowStep,
} from 'cloudflare:workers';

// runKeywordResearchPipeline and runDeepEvalPipeline are this project's pipeline
// entry points (imports omitted).

export interface BatchParams {
  batchId: string;
  seeds: string[];
  concurrency?: number; // default 3
}

// One seed's outcome; must be JSON-serializable because step.do() checkpoints it.
interface SeedResult {
  seed: string;
  pipelineId: string;
  status: 'completed' | 'failed';
  recommendation?: string;
  cost?: number;
  error?: string;
}

export class BatchResearchWorkflow extends WorkflowEntrypoint<Env, BatchParams> {
  override async run(event: WorkflowEvent<BatchParams>, step: WorkflowStep) {
    const { batchId, seeds, concurrency = 3 } = event.payload;

    // Step 1: Create pipeline records (checkpointed, so the IDs stay stable on replay)
    const pipelineIds = await step.do('create-pipelines', async () => {
      const ids: Array<{ id: string; seed: string }> = [];
      for (const seed of seeds) {
        const id = crypto.randomUUID();
        await this.env.DB.prepare(
          `INSERT INTO pipelines (id, type, seed, status, batch_id)
           VALUES (?, 'keyword_research', ?, 'pending', ?)`,
        )
          .bind(id, seed, batchId)
          .run();
        ids.push({ id, seed });
      }
      return ids;
    });

    // Step 2: Process in parallel chunks of `concurrency` seeds
    const allResults: SeedResult[] = [];
    for (let batchStart = 0; batchStart < pipelineIds.length; batchStart += concurrency) {
      const chunk = pipelineIds.slice(batchStart, batchStart + concurrency);
      const chunkIndex = Math.floor(batchStart / concurrency);

      // Each chunk is its own step. Per-seed errors are caught below, so the step-level
      // retry only fires if the step itself throws or exceeds its timeout.
      const chunkResults = await step.do(
        `process-chunk-${chunkIndex}`,
        {
          retries: { limit: 2, delay: '10 seconds', backoff: 'exponential' },
          timeout: '120 seconds',
        },
        async (): Promise<SeedResult[]> => {
          // Scatter: run every seed in the chunk concurrently
          const settled = await Promise.allSettled(
            chunk.map(async ({ id, seed }): Promise<SeedResult> => {
              await this.env.DB.prepare(`UPDATE pipelines SET status = 'running' WHERE id = ?`)
                .bind(id)
                .run();
              try {
                const result = await runKeywordResearchPipeline(this.env, id, seed);
                // Chain deep evaluation inline
                try {
                  await runDeepEvalPipeline(this.env, crypto.randomUUID(), id, seed, result);
                } catch {
                  // Non-fatal: keyword research still succeeded
                }
                return {
                  seed,
                  pipelineId: id,
                  status: 'completed',
                  recommendation: result.verdict?.recommendation,
                  cost: result.costs.total,
                };
              } catch (err: unknown) {
                const msg = err instanceof Error ? err.message : String(err);
                await this.env.DB.prepare(
                  `UPDATE pipelines SET status = 'failed', error_message = ? WHERE id = ?`,
                )
                  .bind(msg.substring(0, 1000), id)
                  .run();
                return { seed, pipelineId: id, status: 'failed', error: msg };
              }
            }),
          );

          // Gather: allSettled preserves input order, so index i maps back to chunk[i]
          return settled.map((r, i): SeedResult =>
            r.status === 'fulfilled'
              ? r.value
              : {
                  seed: chunk[i].seed,
                  pipelineId: chunk[i].id,
                  status: 'failed',
                  error: r.reason instanceof Error ? r.reason.message : String(r.reason),
                },
          );
        },
      );
      allResults.push(...chunkResults);
    }

    // Step 3: Finalize
    await step.do('finalize', async () => {
      const completed = allResults.filter((r) => r.status === 'completed');
      const totalCost = completed.reduce((s, r) => s + (r.cost ?? 0), 0);
      console.log(
        `[batch] ${batchId}: ${completed.length}/${allResults.length} completed, $${totalCost.toFixed(4)}`,
      );
    });

    return allResults;
  }
}
```
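The scatter-gather core does not depend on Workflows. Below is a minimal sketch of the same chunking logic as a plain async helper, which is useful for testing the gather step locally. mapInChunks and ChunkOutcome are hypothetical names. Inside a workflow, each chunk would still be wrapped in its own step.do():

```typescript
type ChunkOutcome<T, R> =
  | { item: T; ok: true; value: R }
  | { item: T; ok: false; error: string };

// Run fn over items, at most `concurrency` at a time. Never rejects:
// every item yields an outcome tied back to the item that produced it.
async function mapInChunks<T, R>(
  items: T[],
  concurrency: number,
  fn: (item: T) => Promise<R>,
): Promise<ChunkOutcome<T, R>[]> {
  if (concurrency < 1) throw new RangeError('concurrency must be >= 1');
  const outcomes: ChunkOutcome<T, R>[] = [];
  for (let start = 0; start < items.length; start += concurrency) {
    const chunk = items.slice(start, start + concurrency);
    // Scatter: all items in the chunk run concurrently
    const settled = await Promise.allSettled(chunk.map((item) => fn(item)));
    // Gather: allSettled keeps input order, so index i belongs to chunk[i]
    settled.forEach((r, i) => {
      outcomes.push(
        r.status === 'fulfilled'
          ? { item: chunk[i], ok: true, value: r.value }
          : {
              item: chunk[i],
              ok: false,
              error: r.reason instanceof Error ? r.reason.message : String(r.reason),
            },
      );
    });
  }
  return outcomes;
}
```

Chunks run sequentially, so a slow item holds up the next chunk. That is the same trade-off the workflow makes in exchange for one checkpoint per chunk.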
Why Not Just Use the Harness?
The harness and workflows serve different purposes:
| Concern | Harness (DO) | Workflow |
|---|---|---|
| Execution model | Continuous alarm loop | One-shot durable run |
| State | Persistent across cycles | Per-execution, checkpointed |
| Concurrency | One processor per tick | Parallel within steps |
| Retries | Manual (try/catch + reschedule) | Built-in with backoff |
| Use case | Ongoing monitoring pipeline | Batch processing jobs |
| CPU limits | 2s per tick (alarm) | Per-step (configurable timeout) |
| Identity | Singleton per pipeline | One instance per batch |
Use the harness when: you need a continuously running pipeline that wakes up periodically, processes whatever is new, and goes back to sleep. The harness adapts to load — more data means more ticks per cycle.
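Use workflows when: you have a batch of items to process right now, want parallel execution, and need automatic retries with backoff. Workflows are fire-and-forget. The caller does not have to stay connected, completed steps are checkpointed, and failed steps are retried according to their config until the run succeeds or exhausts its retries.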
NonRetryableError
Some errors should not be retried. Authentication failures, invalid input, and permanently missing resources should fail fast:
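```typescript
// NonRetryableError is exported from cloudflare:workflows
// (WorkflowEntrypoint comes from cloudflare:workers)
import { NonRetryableError } from 'cloudflare:workflows';

// Inside run(): `i`, `seed`, and callExternalApi() stand in for your own
// loop index, input, and API client.
const chunkResults = await step.do(
  `process-chunk-${i}`,
  { retries: { limit: 2, delay: '10 seconds', backoff: 'exponential' } },
  async () => {
    // Ordinary errors thrown in this callback are retried per the config above
    const result = await callExternalApi(seed);

    if (result.status === 401) {
      // No retries: authentication is broken
      throw new NonRetryableError('API key is invalid');
    }
    if (result.status === 404) {
      // No retries: the resource does not exist
      throw new NonRetryableError(`Seed "${seed}" not found`);
    }
    return result.data;
  },
);
// Uncaught, a NonRetryableError fails the workflow instance; wrap step.do()
// in try/catch if the run should continue past this step.
```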
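Deciding which failures are permanent is a policy choice. Below is a minimal sketch that assumes the external API reports plain HTTP status codes. classifyStatus is a hypothetical helper, not a Cloudflare API, and the exact code lists are assumptions. Inside step.do(), you would throw NonRetryableError for 'permanent' and an ordinary Error for 'retry':

```typescript
type FailureClass = 'ok' | 'retry' | 'permanent';

// Client errors other than timeouts and rate limits will not change on retry;
// throttling and server errors usually do.
function classifyStatus(status: number): FailureClass {
  if (status >= 200 && status < 300) return 'ok';
  if (status === 408 || status === 429) return 'retry'; // request timeout, rate limited
  if (status >= 500) return 'retry'; // transient server failure
  if (status >= 400) return 'permanent'; // 400, 401, 403, 404, 422, ...
  return 'permanent'; // 1xx/3xx are unexpected from a JSON API: a bug, not a blip
}
```

Keeping the policy in one function makes it easy to audit which statuses burn retry budget.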
Wrangler Configuration
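```jsonc
// wrangler.jsonc
// OpportunityHarness and BatchResearchWorkflow must be exported from src/index.ts
{
  "name": "multi-agent-pipeline",
  "main": "src/index.ts",
  "compatibility_date": "2025-04-01",
  "d1_databases": [
    {
      "binding": "DB",
      "database_name": "pipeline-db",
      "database_id": "your-d1-database-id"
    }
  ],
  "durable_objects": {
    "bindings": [
      {
        "name": "OPPORTUNITY_HARNESS",
        "class_name": "OpportunityHarness"
      }
    ]
  },
  // A new Durable Object class must be declared in a migration before its first deploy
  "migrations": [
    { "tag": "v1", "new_sqlite_classes": ["OpportunityHarness"] }
  ],
  "workflows": [
    {
      "name": "batch-research",
      "binding": "BATCH_RESEARCH",
      "class_name": "BatchResearchWorkflow"
    }
  ],
  "ai": {
    "binding": "AI"
  }
}
```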
Key insight: Workflows and Durable Objects complement each other. The DO is the control plane: it decides what to process and manages budget. The Workflow is the data plane: it processes a specific batch with parallelism and retries. A DO alarm tick can start a workflow instance directly through the BATCH_RESEARCH binding.
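Below is a minimal sketch of the control-plane side, assuming the BATCH_RESEARCH binding from the config above. A Workflow binding's create() accepts an optional instance id and params. Instance ids must be unique, so a deterministic id per cycle should make a repeated alarm fail to create a second batch rather than duplicate the work. WorkflowLauncher, startBatchIfNeeded, and the threshold are hypothetical. The interface mirrors only the slice of the binding used here, so the sketch runs outside Workers:

```typescript
// Only the part of the Workflow binding this sketch uses.
interface WorkflowLauncher<P> {
  create(options?: { id?: string; params?: P }): Promise<{ id: string }>;
}

interface BatchParams {
  batchId: string;
  seeds: string[];
  concurrency?: number;
}

// Called from the DO alarm handler: hand large backlogs to the workflow (data plane)
// and leave small ones to the harness's own tick loop (control plane).
async function startBatchIfNeeded(
  launcher: WorkflowLauncher<BatchParams>,
  harnessSlug: string,
  cycleNumber: number,
  pendingSeeds: string[],
  minBatchSize = 10, // hypothetical threshold
): Promise<string | null> {
  if (pendingSeeds.length < minBatchSize) return null;
  // Deterministic id: at most one batch per harness per cycle
  const batchId = `${harnessSlug}-cycle-${cycleNumber}`;
  const instance = await launcher.create({
    id: batchId,
    params: { batchId, seeds: pendingSeeds, concurrency: 3 },
  });
  return instance.id;
}
```

Inside the DO, the call would look like startBatchIfNeeded(this.env.BATCH_RESEARCH, this.config.slug, cycleNum, seeds).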
Example 1: Minimal Agent Processor
A processor that scores review sentiment using Workers AI. It costs nothing within the daily free allocation:
```typescript
import { registerProcessor } from './harness';

const MODEL = '@cf/meta/llama-3.1-8b-instruct';

registerProcessor(
  'sentiment-scorer',
  async (ctx): Promise<ProcessorResult[]> => {
    const { db, ai } = ctx;

    // Pull a batch of reviews that have not been scored yet
    const items = await db
      .prepare(
        `SELECT id, content FROM review_queue
         WHERE scored_at IS NULL LIMIT ?`,
      )
      .bind(ctx.processor.batch_size || 10)
      .all();

    const results: ProcessorResult[] = [];
    for (const item of items.results) {
      const start = Date.now();
      const response = await ai.run(MODEL, {
        prompt: `Rate this review sentiment from 0 (negative) to 10 (positive). Reply with just the number.\n\n"${item.content}"`,
        max_tokens: 5,
      });

      // Small models sometimes add words ("Score: 7"), so take the first integer and
      // clamp it to 0-10. Fall back to 5 (neutral) instead of storing NaN.
      const match = /\d+/.exec(String(response.response ?? ''));
      const score = match ? Math.min(10, Math.max(0, Number(match[0]))) : 5;

      await db
        .prepare(
          `UPDATE review_queue SET sentiment = ?, scored_at = datetime('now') WHERE id = ?`,
        )
        .bind(score, item.id)
        .run();

      results.push({
        processor: 'sentiment-scorer',
        entity_type: 'review',
        entity_id: String(item.id),
        model_used: MODEL,
        tokens_input: 0,
        tokens_output: 0,
        cost_usd: 0, // within the Workers AI free allocation (10,000 Neurons/day)
        duration_ms: Date.now() - start,
        success: true,
      });
    }
    return results;
  },
);
```
Example 2: Input Hash Change Detection
Detecting when upstream data changes so agents reprocess automatically:
```typescript
import { getUnprocessed, hashInputs, recordResult } from './agent-framework';

// Bumping `version` marks entities processed by older versions as needing work again
const AGENT: AgentConfig = { name: 'price-tracker', version: 2 };

async function checkForChanges(db: D1Database) {
  // Build an input hash per product from the fields the agent depends on
  const products = await db
    .prepare(`SELECT id, price, stock_status FROM products`)
    .all();

  const hashes = new Map<string, string>();
  for (const p of products.results) {
    hashes.set(
      String(p.id),
      hashInputs(p.price as number, p.stock_status as string),
    );
  }

  // Ask the agent ledger which products still need work given these hashes
  const needsWork = await getUnprocessed(
    db,
    AGENT,
    'product',
    products.results.map((p) => String(p.id)),
    'main',
    hashes,
  );

  console.log(`${needsWork.length} products need reprocessing`);
  return needsWork;
}
```
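The agent-framework helpers are not shown in this section. Below is a minimal sketch of what hashInputs and the staleness check inside getUnprocessed could look like. It assumes the ledger stores the agent version and the input hash for each entity. LedgerRow and needsReprocessing are hypothetical names. FNV-1a stands in for whatever hash the framework really uses; it was chosen here because it is synchronous and dependency-free, not for security:

```typescript
// FNV-1a (32-bit) over the JSON encoding of the inputs. It hashes UTF-16 code units,
// which is fine for change detection; it is not a cryptographic hash.
function hashInputs(...values: unknown[]): string {
  const text = JSON.stringify(values);
  let h = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < text.length; i++) {
    h ^= text.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0; // FNV prime, kept as unsigned 32-bit
  }
  return h.toString(16).padStart(8, '0');
}

// What the ledger remembers about the last successful run for one entity.
interface LedgerRow {
  entityId: string;
  agentVersion: number;
  inputHash: string;
}

// An entity needs work if it was never processed, was processed by an older
// agent version, or its inputs changed since the last run.
function needsReprocessing(
  entityIds: string[],
  currentHashes: Map<string, string>,
  ledger: Map<string, LedgerRow>,
  agentVersion: number,
): string[] {
  return entityIds.filter((id) => {
    const row = ledger.get(id);
    if (!row) return true;
    if (row.agentVersion < agentVersion) return true;
    return row.inputHash !== currentHashes.get(id);
  });
}
```

Because the hash covers only the fields passed in, changes to other columns (a product's description, say) do not trigger reprocessing.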
Example 3: Spawning Entities Across Phases
A processor that creates work for downstream processors:
```typescript
registerProcessor(
  'topic-splitter',
  async (ctx): Promise<ProcessorResult[]> => {
    const { cycle } = ctx;

    // Read a broad topic from the cycle's work queue. (topics_to_split is illustrative;
    // it would need to be added to CycleState alongside genres_to_scan and the others.)
    const topic = cycle.topics_to_split.shift();
    if (!topic) return [];

    // Split into sub-topics deterministically (no LLM call, so zero cost)
    const subtopics = splitByKeywordOverlap(topic);

    return [
      {
        processor: 'topic-splitter',
        entity_type: 'topic',
        entity_id: topic.id,
        model_used: null,
        tokens_input: 0,
        tokens_output: 0,
        cost_usd: 0,
        duration_ms: 0,
        success: true,
        result_summary: `Split into ${subtopics.length} sub-topics`,
        // Spawned entities become work for downstream processors. The harness's
        // recordResult() must route the 'subtopic' type to a queue, or they are ignored.
        spawned_entities: subtopics.map((st) => ({
          type: 'subtopic',
          id: st.id,
        })),
      },
    ];
  },
);
```
Example 4: Agent Tool Calling
Using the LLM agent mode with tools for data lookup:
```typescript
import { llmAgent } from './llm';
import { z } from 'zod';

// Assumes `env` (with GEMINI_API_KEY) and `db` (a D1Database) are in scope,
// for example inside a processor.
const result = await llmAgent(
  {
    apiKey: env.GEMINI_API_KEY,
    model: 'gemini-2.5-flash',
    temperature: 0.3,
    maxTokens: 4096,
  },
  'Analyze the competitive landscape for "habit tracker" apps.',
  `You are a market analyst. Use the tools to gather data before forming your opinion.`,
  {
    search_apps: {
      description: 'Search for apps by keyword',
      parameters: z.object({
        keyword: z.string().describe('Keyword to match against app names'),
      }),
      execute: async (args: { keyword: string }) => {
        const rows = await db
          .prepare(
            `SELECT track_name, rating_count, average_rating
             FROM app_registry
             WHERE track_name LIKE ? LIMIT 10`,
          )
          .bind(`%${args.keyword}%`)
          .all();
        return JSON.stringify(rows.results);
      },
    },
    get_reviews: {
      description: 'Get reviews for an app',
      parameters: z.object({
        app_name: z.string().describe('App name or part of it'),
        // The model chooses this value, so bound it
        max: z.number().int().min(1).max(50).describe('Maximum number of reviews (1-50)'),
      }),
      execute: async (args: { app_name: string; max: number }) => {
        const rows = await db
          .prepare(
            `SELECT title, content, score FROM app_reviews
             WHERE track_id IN (
               SELECT track_id FROM app_registry WHERE track_name LIKE ?
             ) LIMIT ?`,
          )
          .bind(`%${args.app_name}%`, Math.min(args.max, 50)) // clamp again in case validation is skipped
          .all();
        return JSON.stringify(rows.results);
      },
    },
  },
  8, // max 8 tool-calling steps
  'competitive-analysis', // call label
);

console.log(result.text); // Agent's analysis after using tools
console.log(`Cost: $${result.costUsd.toFixed(4)}, Steps: ${result.steps}`);
```
Example 5: Cycle State Initialization
How a cycle starts and selects its work:
```typescript
private async startCycle() {
  if (!this.config) return;
  const cycleNum = this.state.cycle_count + 1;

  this.state.current_cycle = {
    cycle_id: null,
    cycle_number: cycleNum,
    phase: 'gather',
    phase_index: 0,
    cost_so_far: 0,
    tokens_input: 0,
    tokens_output: 0,
    genres_to_scan: [],
    niches_to_analyze: [],
    evaluations_to_critique: [],
    genres_scanned: 0,
    niches_found: 0,
    evaluations_created: 0,
    evaluations_critiqued: 0,
    research_tasks_created: 0,
    processor_stats: {},
  };

  // Create the cycle record in D1 and keep its row id
  const res = await this.env.DB.prepare(
    `INSERT INTO harness_cycles
       (harness_id, cycle_number, budget_limit, phase)
     VALUES (?, ?, ?, 'gather')`,
  )
    .bind(this.config.id, cycleNum, this.config.budget_max_per_cycle)
    .run();
  this.state.current_cycle.cycle_id = res.meta.last_row_id as number;

  if (this.config.focus_genres?.length) {
    // An explicit focus list wins
    this.state.current_cycle.genres_to_scan =
      this.config.focus_genres.slice(0, this.config.cycle_genres_limit);
  } else {
    // Otherwise pick the genres with the highest share of stale apps (no update in
    // 180+ days), ignoring genres with fewer than 30 scraped apps
    const genres = await this.env.DB.prepare(
      `SELECT primary_genre FROM app_registry
       WHERE enrichment_source = 'scraper'
       GROUP BY primary_genre
       HAVING COUNT(*) >= 30
       ORDER BY (
         SUM(CASE WHEN days_since_update >= 180 THEN 1 ELSE 0 END) * 1.0
         / COUNT(*)
       ) DESC
       LIMIT ?`,
    )
      .bind(this.config.cycle_genres_limit)
      .all();
    this.state.current_cycle.genres_to_scan = genres.results.map(
      (g: any) => g.primary_genre,
    );
  }
}
```
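The genre-selection query uses the share of stale apps in a genre as a proxy for opportunity. The same rule in plain TypeScript is sketched below so the heuristic can be unit-tested outside D1. AppRow and rankGenresByStaleness are hypothetical names, and the rows are assumed to be pre-filtered to scraper-enriched apps. The 30-app minimum and the 180-day cutoff come from the SQL above:

```typescript
interface AppRow {
  primaryGenre: string;
  daysSinceUpdate: number;
}

// Mirrors the SQL: group by genre, drop genres with fewer than minApps apps,
// rank by the fraction of apps not updated in staleDays or more, keep the top `limit`.
function rankGenresByStaleness(
  apps: AppRow[],
  limit: number,
  minApps = 30,
  staleDays = 180,
): string[] {
  const counts = new Map<string, { total: number; stale: number }>();
  for (const app of apps) {
    const c = counts.get(app.primaryGenre) ?? { total: 0, stale: 0 };
    c.total++;
    if (app.daysSinceUpdate >= staleDays) c.stale++;
    counts.set(app.primaryGenre, c);
  }
  return [...counts.entries()]
    .filter(([, c]) => c.total >= minApps)
    .map(([genre, c]) => ({ genre, staleShare: c.stale / c.total }))
    .sort((a, b) => b.staleShare - a.staleShare)
    .slice(0, limit)
    .map((g) => g.genre);
}
```

The minimum sample size matters. Without it, a genre with three abandoned apps would outrank every real market.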
Example 6: Telemetry Recording
How the harness tracks per-processor metrics in real time:
```typescript
private recordResult(cycle: CycleState, result: ProcessorResult) {
  // Update cycle totals
  cycle.cost_so_far += result.cost_usd;
  cycle.tokens_input += result.tokens_input;
  cycle.tokens_output += result.tokens_output;

  // Per-processor stats
  if (!cycle.processor_stats[result.processor]) {
    cycle.processor_stats[result.processor] = {
      cost: 0,
      calls: 0,
      total_ms: 0,
      failures: 0,
    };
  }
  const stats = cycle.processor_stats[result.processor];
  stats.cost += result.cost_usd;
  stats.calls++;
  stats.total_ms += result.duration_ms;
  if (!result.success) stats.failures++;

  // Log to D1 without awaiting: telemetry must never fail or slow the tick
  if (cycle.cycle_id) {
    this.env.DB.prepare(
      `INSERT INTO harness_processor_log
         (cycle_id, processor, entity_type, entity_id, model_used,
          tokens_input, tokens_output, cost_usd, duration_ms,
          success, error_message, result_summary)
       VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
    )
      .bind(
        cycle.cycle_id,
        result.processor,
        result.entity_type,
        result.entity_id,
        result.model_used,
        result.tokens_input,
        result.tokens_output,
        result.cost_usd,
        result.duration_ms,
        result.success ? 1 : 0,
        result.error_message || null,
        result.result_summary || null,
      )
      .run()
      .catch((err) => console.warn('[harness] processor log insert failed:', err));
  }

  // Route spawned entities to downstream work queues.
  // Types without a branch here are dropped.
  if (result.spawned_entities) {
    for (const entity of result.spawned_entities) {
      if (entity.type === 'niche') {
        cycle.niches_to_analyze.push({
          genre: '',
          niche_id: entity.id,
          keywords: [],
        });
      } else if (entity.type === 'evaluation') {
        cycle.evaluations_to_critique.push(entity.id);
      }
    }
  }
}
```
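The if/else chain in recordResult silently drops entity types it does not know, such as the 'subtopic' entities spawned in Example 3. Below is a sketch of a table-driven alternative that reports unrouted types instead of losing them. It assumes a simplified queue shape; SpawnQueues, ROUTES, and routeSpawned are hypothetical names:

```typescript
interface SpawnedEntity {
  type: string;
  id: string;
}

// Simplified stand-in for the work queues on CycleState.
interface SpawnQueues {
  niches_to_analyze: Array<{ genre: string; niche_id: string; keywords: string[] }>;
  evaluations_to_critique: string[];
  subtopics_to_process: string[];
}

// One entry per entity type: adding a type is one line, not another else-if.
// A Map avoids matching inherited object keys like 'toString'.
const ROUTES = new Map<string, (q: SpawnQueues, id: string) => void>([
  ['niche', (q, id) => q.niches_to_analyze.push({ genre: '', niche_id: id, keywords: [] })],
  ['evaluation', (q, id) => q.evaluations_to_critique.push(id)],
  ['subtopic', (q, id) => q.subtopics_to_process.push(id)],
]);

// Route every entity it can and return the ones it could not, so the caller can log them.
function routeSpawned(queues: SpawnQueues, entities: SpawnedEntity[]): SpawnedEntity[] {
  const unrouted: SpawnedEntity[] = [];
  for (const e of entities) {
    const route = ROUTES.get(e.type);
    if (route) route(queues, e.id);
    else unrouted.push(e);
  }
  return unrouted;
}
```

The harness could log any unrouted entities to harness_processor_log, so a missing route shows up in telemetry instead of as silently missing work.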
Example 7: Prompt Engineering for Structured Output
How to write system prompts that work reliably with Zod schemas:
```typescript
const CRITIC_SYSTEM = `You are an adversarial opportunity critic. Your job is to stress-test evaluations and find weaknesses.
RULES:
1. CHALLENGE ASSUMPTIONS: For each major claim, ask "what if this is wrong?"
2. HIDDEN RISKS: Look for risks the evaluator missed:
   - Platform adding this feature natively
   - Policy risks (App Store guidelines)
   - Market timing (window closing?)
   - Retention challenges
3. SCORE ADJUSTMENT: Be calibrated.
   - -5 to -10: Minor concerns
   - -10 to -20: Significant issues
   - -20 to -30: Critical problems
   - 0 to +10: Evaluator was too conservative
4. Be SPECIFIC: "Apple Health added fasting in iOS 18" not "platform risk exists."
5. one_line_verdict must be actionable.`;
```
Rules for prompts that produce reliable structured output:
- Numbered rules work better than prose paragraphs for instruction following.
- Calibration examples (“score -5 means X, score -20 means Y”) reduce variance.
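- Capitalized keywords (SPECIFIC, CHALLENGE) flag the instructions the model must not treat as optional.
- Negative examples (“not ‘platform risk exists’”) are more effective than positive examples alone.
- Zod .describe() on schema fields acts as inline documentation for the model. With generateObject, the descriptions travel in the JSON Schema that the AI SDK sends with the request.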
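A schema matching the critic prompt might look like the sketch below. Apart from one_line_verdict, the field names and exact bounds are assumptions for illustration. The score bounds mirror the prompt's -30 to +10 range so the schema and the prompt agree:

```typescript
import { z } from 'zod';

// Hypothetical output schema for the critic. Each .describe() string documents
// the field for the model as well as for human readers.
const CritiqueSchema = z.object({
  score_adjustment: z
    .number()
    .int()
    .min(-30)
    .max(10)
    .describe('Change to the evaluator score: negative for problems found, up to +10 if it was too conservative'),
  hidden_risks: z
    .array(z.string())
    .describe('Specific risks the evaluator missed, each naming a concrete platform, policy, or competitor'),
  challenged_assumptions: z
    .array(z.string())
    .describe('Major claims from the evaluation that may be wrong, with the reason'),
  one_line_verdict: z
    .string()
    .describe('One actionable sentence, e.g. what to verify before building'),
});

type Critique = z.infer<typeof CritiqueSchema>;
```

Putting the numeric bounds in the schema means an out-of-range adjustment fails validation instead of silently skewing scores.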
Example 8: Testing a Processor in Isolation
You can test any processor without the harness by constructing a mock context:
```typescript
import { getProcessor } from './harness';

async function testEvaluator(db: D1Database, apiKey: string) {
  const fn = getProcessor('evaluator');
  if (!fn) throw new Error('evaluator not registered');

  // Seed the cycle with one niche for the evaluator to pick up
  const cycle: CycleState = {
    cycle_id: null,
    cycle_number: 0,
    phase: 'evaluate',
    phase_index: 0,
    cost_so_far: 0,
    tokens_input: 0,
    tokens_output: 0,
    genres_to_scan: [],
    niches_to_analyze: [
      {
        genre: 'Health & Fitness',
        niche_id: 'test::fasting-tracker',
        keywords: ['fasting tracker', 'intermittent fasting', 'fasting app'],
      },
    ],
    evaluations_to_critique: [],
    genres_scanned: 0,
    niches_found: 0,
    evaluations_created: 0,
    evaluations_critiqued: 0,
    research_tasks_created: 0,
    processor_stats: {},
  };

  const results = await fn({
    db,
    ai: null as any, // no Workers AI binding in this test
    apiKey,
    config: {
      slug: 'test',
      budget_max_per_cycle: 1.0,
      cycle_critique_threshold: 60,
    } as any,
    processor: {
      name: 'evaluator',
      type: 'llm',
      enabled: true,
      phase: 'evaluate',
      model_tier: 'default',
      weight: 1.0,
      max_cost_per_call: 0.10,
      batch_size: 1,
    },
    cycle,
    selectModel: () => 'gemini-2.5-flash', // fixed model, bypassing budget-aware selection
    remainingBudget: () => 1.0,
  });

  console.log('Results:', JSON.stringify(results, null, 2));
}
```
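Examples 5 and 8 both spell out every CycleState field by hand. A small factory keeps tests and startCycle() in sync when the state gains a new queue. This sketch redeclares a trimmed CycleState so it runs standalone. The fields are the ones shown in Examples 5 and 8, and emptyCycle is a hypothetical helper:

```typescript
interface NicheTask {
  genre: string;
  niche_id: string;
  keywords: string[];
}

// Trimmed copy of the CycleState fields used in Examples 5 and 8.
interface CycleState {
  cycle_id: number | null;
  cycle_number: number;
  phase: string;
  phase_index: number;
  cost_so_far: number;
  tokens_input: number;
  tokens_output: number;
  genres_to_scan: string[];
  niches_to_analyze: NicheTask[];
  evaluations_to_critique: string[];
  genres_scanned: number;
  niches_found: number;
  evaluations_created: number;
  evaluations_critiqued: number;
  research_tasks_created: number;
  processor_stats: Record<string, { cost: number; calls: number; total_ms: number; failures: number }>;
}

// Fresh arrays and objects on every call, so two cycles never share a queue.
function emptyCycle(overrides: Partial<CycleState> = {}): CycleState {
  return {
    cycle_id: null,
    cycle_number: 0,
    phase: 'gather',
    phase_index: 0,
    cost_so_far: 0,
    tokens_input: 0,
    tokens_output: 0,
    genres_to_scan: [],
    niches_to_analyze: [],
    evaluations_to_critique: [],
    genres_scanned: 0,
    niches_found: 0,
    evaluations_created: 0,
    evaluations_critiqued: 0,
    research_tasks_created: 0,
    processor_stats: {},
    ...overrides,
  };
}
```

With this helper, the test's cycle literal shrinks to emptyCycle({ phase: 'evaluate', niches_to_analyze: [...] }).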
Multi-Agent Frameworks
| Feature | Workers-Native (this article) | LangGraph | CrewAI | AutoGen |
|---|---|---|---|---|
| Language | TypeScript | Python | Python | Python |
| Runtime | Cloudflare Workers (V8 isolate) | Any Python server | Any Python server | Any Python server |
| Cold start | <5ms | 2-10s (Python + deps) | 2-10s | 2-10s |
| Idle cost | $0 (scale to zero) | $25-100/month (server) | $25-100/month | $25-100/month |
| State management | D1 + DO storage (built-in) | External store (Redis, Postgres) | In-memory or external | In-memory or external |
| Agent topology | Phased pipeline with harness | Graph (DAG with cycles) | Role-based crews | Conversation-based |
| Structured output | Zod schemas via AI SDK | Pydantic via with_structured_output | Pydantic | JSON schemas |
| Cost tracking | Per-call, per-agent, automatic | Manual | Manual | Manual |
| Budget enforcement | Built-in soft/hard modes | None built-in | None built-in | None built-in |
| Agent memory | D1-backed beliefs + episodic | LangGraph memory store | CrewAI memory | External |
| Concurrency | Workflows for parallel, DO for sequential | Native Python async | Task delegation | Agent chat threads |
| Durability | DO alarms + Workflow checkpoints | Checkpointers (e.g. SQLite, Postgres); managed option via LangGraph Platform | None built-in | None built-in |
| Deployment | wrangler deploy | Docker + infra | Docker + infra | Docker + infra |
| Debugging | D1 telemetry tables + AI Gateway logs | LangSmith (best-in-class) | Limited (logging issues) | Print statements |
| Best for | Budget-conscious production pipelines | Complex graph workflows | Team-metaphor tasks | Conversational agents |
When to Choose Each
Choose Workers-Native when:
- You want zero idle cost and sub-5ms cold starts
- You need per-agent cost tracking and budget enforcement
- Your pipeline has a clear phased structure (gather, analyze, evaluate, critique)
- You are already on Cloudflare or want to be
- You prefer TypeScript over Python
- Your agents process data, not chat with users
Choose LangGraph when:
- You need complex graph topologies with conditional branching and cycles
- LangSmith observability is important to you
- You are building a conversational agent with tool use
- You need the LangChain ecosystem (document loaders, vector stores, chains)
Choose CrewAI when:
- Your problem maps naturally to a team metaphor (researcher, writer, editor)
- You want the fastest path to a working multi-agent prototype
- Role-based delegation is your primary coordination pattern
Choose AutoGen when:
- You need human-in-the-loop approval at every step
- Your agents should negotiate in natural language
- You are prototyping and want maximum flexibility
Cloudflare Primitives vs External Infrastructure
| Capability | Workers-Native | External Equivalent | Advantage |
|---|---|---|---|
| Agent coordinator | Durable Object | Redis + custom code | DO: single-threaded, no race conditions, built-in persistence |
| Processing ledger | D1 | PostgreSQL | D1: zero config, auto-backup, SQL at the edge |
| Batch processing | Workflows | Temporal / Step Functions | Workflows: simpler API, no infra, built into Workers |
| LLM routing | AI Gateway | LiteLLM / OpenRouter | AI Gateway: native caching, logging, rate limiting |
| Cost tracking | Built-in api_calls_ledger | Manual instrumentation | Built-in: every call auto-recorded with context |
| Model inference | Workers AI (free tier) | Self-hosted / API calls | Workers AI: 10K free Neurons/day, no API key needed |
| Secrets management | Workers secrets | Vault / .env files | Workers: encrypted at rest, per-environment |
Anti-Patterns
| Don’t | Do Instead | Why |
|---|---|---|
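| Run all agents in a single Worker fetch | Use DO alarms with one processor per tick | One request carrying a long agent chain risks Workers CPU and duration limits, and a failure midway loses all prior work. Per-tick execution persists progress between agents. |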
| Store agent state in global variables | Use D1 or DO ctx.storage | Globals are not shared across isolates and vanish whenever an isolate is evicted, so their contents are never guaranteed to survive to the next request. |
| Use Promise.all for 10+ LLM calls | Use Workflows with chunked Promise.allSettled | One failure in Promise.all rejects everything. allSettled lets you collect partial results. |
| Let agents pick their own models | Use harness-level model selection with budget awareness | Left to choose, agents default to the most capable (and most expensive) model. Budget-aware selection prevents cost overruns. |
| Store full LLM responses in the ledger | Store result_summary (short) + full output in separate table | Ledger queries need to be fast. Full LLM output can be 10KB+ per row. |
| Use untyped JSON for LLM output | Use Zod schemas with generateObject | Untyped JSON requires manual parsing, validation, and error handling. Zod handles all three. |
| Process all entities every cycle | Use the agent ledger with version tracking | Without the ledger, you reprocess unchanged entities. Version bumps give you targeted reprocessing. |
| Use a single budget for all processors | Allocate budget by processor weight | One expensive processor can starve all others. Weighted allocation ensures fairness. |
| Retry LLM calls indefinitely | Throw NonRetryableError for permanent failures | Auth failures and invalid inputs will never succeed. Retrying wastes money and time. |
| Store agent memory as flat text | Use structured tables with confidence scores and timestamps | Flat text cannot be queried, pruned, or analyzed. Structured memory enables belief dynamics. |
| Build a custom orchestrator from scratch | Start with DO alarms + processor registry | Custom orchestrators accumulate edge cases. The alarm-tick pattern handles interruptions, errors, and pauses naturally. |
| Use in-memory caches for LLM responses | Use AI Gateway with cacheTtl | In-memory caches die with the isolate. AI Gateway caches responses on Cloudflare's network, so cached results outlive any single isolate and are reused across requests. |
| Skip cost tracking during development | Record costs from day one via api_calls_ledger | Development costs add up fast with LLMs. Tracking from the start prevents surprise bills. |
| Make processors depend on execution order within a phase | Keep processors independent — share state via CycleState | Execution order within a phase is not guaranteed. Use the cycle’s work queues for coordination. |
Official Cloudflare Documentation
- Durable Objects — persistent, single-threaded compute objects with storage
- Durable Object Alarms — scheduled wake-ups for background processing
- D1 Database — serverless SQLite at the edge
- Cloudflare Workflows — durable execution engine with step-level checkpointing
- Workflows: Sleeping and Retrying — retry configuration, NonRetryableError
- Workers AI — run AI models on Cloudflare’s GPU fleet
- Workers AI Pricing — 10,000 free Neurons/day, $0.011/1K after
- AI Gateway — caching, logging, and rate limiting for LLM calls
- Cloudflare Agents SDK — framework for building AI agents on Workers
- Agents Patterns — sequential, routing, parallel, and orchestrator patterns
- Rules of Durable Objects — best practices for DO design
- Control and Data Plane Pattern for Durable Objects — architectural reference
- Durable Objects on Workers Free Plan — DOs available on free tier since April 2025
- Workers AI Changelog — latest model additions and feature updates
Cloudflare Blog Posts
- Workflows GA: Production-Ready Durable Execution — Workflows reaching general availability
- Building Workflows: Durable Execution on Workers — design philosophy behind Workflows
- Making Cloudflare the Best Platform for Building AI Agents — AI agent capabilities overview
- Building Agents with OpenAI and Cloudflare’s Agents SDK — integration patterns
AI SDK and LLM Integration
- Vercel AI SDK: generateObject — structured output with Zod schemas
- Vercel AI SDK: Generating Structured Data — guide to structured LLM output
- @ai-sdk/google — Google/Gemini provider for Vercel AI SDK
- ai-gateway-provider — Cloudflare AI Gateway integration for AI SDK
Multi-Agent Framework Comparisons
- First-Hand Comparison of LangGraph, CrewAI and AutoGen — practical experience building with all three
- CrewAI vs LangGraph vs AutoGen (DataCamp) — structured comparison with use case guidance
- LangGraph vs AutoGen vs CrewAI: Architecture Analysis 2025 (Latenode) — deep architectural comparison
- Mastering Agents: LangGraph vs AutoGen vs CrewAI (Galileo) — evaluation framework for agent systems
- Best AI Agent Frameworks 2025 (Maxim) — practical guide for choosing frameworks
- Open-Source AI Agent Frameworks Compared 2026 (OpenAgents) — latest framework landscape
Agent Architecture and Design
- Cloudflare Agents Patterns: Using the Agents SDK — community patterns and examples
- The Ultimate Guide to Cloudflare’s Durable Objects — comprehensive DO guide
- Cloudflare AI Week 2025 — latest AI capabilities and announcements
Libraries Used
- Zod — TypeScript-first schema validation
- Vercel AI SDK (ai) — unified API for LLM providers
- Cloudflare Workers SDK — runtime APIs for Workers, DOs, and Workflows