Skip to content
Gary Wu
Go back

Building an Autonomous Data Pipeline on Cloudflare Workers

Edit page

A single Cloudflare Worker discovered 1.1 million apps in 6 days, enriched them with full metadata, clustered them into market opportunities, and ran AI-powered competitive analysis — all for less than $20/month in platform costs. This article is the architecture guide, the pattern book, and the postmortem rolled into one.

What you’ll learn:


The Problem

You want to build a data pipeline that crawls external sources, enriches the raw data, scores and clusters it, and surfaces opportunities. Traditional approaches force you into one of three bad positions:

Option A: The AWS orchestration tax. AWS Step Functions charges $0.025 per 1,000 state transitions. A pipeline that processes a million items through five stages costs $125 in orchestration alone — before you pay for Lambda, DynamoDB, or SQS. You also inherit cold starts (100ms-2s per Lambda invocation), VPC configuration, IAM policy management, and CloudWatch log costs.

Option B: The dedicated server trap. You spin up an EC2 instance or a VPS, run your pipeline as a long-running process, and pay $30-100/month whether it’s working or sleeping. You get full control but lose scale-to-zero, global distribution, and automatic failover. When the process crashes at 3 AM, nobody notices until Monday.

Option C: The managed queue + TSDB approach. Google Cloud Tasks + Cloud Scheduler + BigQuery. Each component is well-designed, but the integration surface is enormous: IAM bindings between services, Pub/Sub subscriptions, Cloud Functions triggers, and a billing model that charges separately for each primitive. Cloud Scheduler alone costs $0.10 per job per month.

Cloudflare offers a fourth option: a single Worker binary that contains your entire pipeline — crawling, enrichment, scoring, clustering, and opportunity analysis — with D1 as the database, Queues for async fan-out, Durable Objects for stateful coordination, R2 for blob storage, and Workflows for multi-step durability. The Worker scales to zero when idle, cold-starts in under 5ms because it runs on V8 isolates (not containers), and deploys globally to 300+ locations.

The catch: Cloudflare’s primitives are younger and less documented than AWS/GCP equivalents. The patterns for building autonomous pipelines on Workers aren’t obvious. This article fills that gap with production code from a system that processed 1.1 million records across 64 D1 tables in its first week.


Core Concepts

The Pipeline Shape

Every autonomous data pipeline has the same abstract shape: discover raw data, enrich it, evaluate it, group it, and surface the best opportunities. The Cloudflare-native version maps each stage to a platform primitive.

// The five stages and their Cloudflare primitive mappings
interface PipelineArchitecture {
  // Stage 1: Crawl — discover raw entities from external sources
  crawl: {
    trigger: 'cron'            // Scheduled via Workers cron triggers
    execution: 'Worker'        // Stateless, one cycle per invocation
    storage: 'D1'              // app_registry, crawl_keywords, crawl_rankings
    rate_limit: 'in-code'      // Delays between API calls
  }

  // Stage 2: Enrich — add depth to discovered entities
  enrich: {
    trigger: 'DO alarm'        // Self-scheduling Durable Object
    execution: 'DurableObject'  // Stateful, adaptive rate control
    storage: 'D1 + R2'         // Metadata in D1, raw HTML in R2
    rate_limit: 'adaptive'     // DO adjusts batch size based on success rate
  }

  // Stage 3: Evaluate — score individual entities
  evaluate: {
    trigger: 'Queue'           // Event-driven, reacts to enrichment completion
    execution: 'Worker'        // Stateless scoring functions
    storage: 'D1'              // agent_ledger, competitiveness scores
    rate_limit: 'queue'        // Queue backpressure handles throttling
  }

  // Stage 4: Cluster — group entities into market segments
  cluster: {
    trigger: 'cron'            // Every 6 hours
    execution: 'Worker'        // CPU-intensive keyword clustering
    storage: 'D1'              // market_clusters, cluster_apps, cluster_keywords
    rate_limit: 'none'         // Pure computation, no external calls
  }

  // Stage 5: Opportunity — AI-powered analysis of clusters
  opportunity: {
    trigger: 'Queue'           // Reacts to cluster-ready events
    execution: 'Workflow'      // Multi-step durable execution
    storage: 'D1'              // debate_reports, opportunity_evaluations
    rate_limit: 'API Mom'      // Centralized LLM cost management
  }
}

Key insight: Each stage uses a different trigger mechanism because each has different scheduling requirements. Crawling needs regular cadence (cron). Enrichment needs adaptive pacing (DO alarm). Evaluation needs reactive fan-out (Queue). Clustering needs periodic batch processing (cron). Opportunity analysis needs durable multi-step execution (Workflow). Matching the trigger to the work pattern is the most important architectural decision.

The Binding Graph

A Cloudflare Worker’s wrangler.jsonc declares all its bindings — the resources it can access at runtime. For an autonomous pipeline, this configuration IS the architecture diagram.

// wrangler.jsonc — the entire infrastructure declared in one file
{
  "name": "pipeline-api",
  "main": "src/index.ts",
  "compatibility_date": "2025-03-07",
  "compatibility_flags": ["nodejs_compat"],

  // D1 — the pipeline's single source of truth
  "d1_databases": [
    {
      "binding": "DB",
      "database_name": "pipeline-db",
      "database_id": "98f63c80-...",
      "migrations_dir": "migrations"
    }
  ],

  // R2 — blob storage for raw HTML, screenshots, large payloads
  "r2_buckets": [
    {
      "binding": "HTML_CACHE",
      "bucket_name": "pipeline-html-cache"
    }
  ],

  // Queues — async event-driven processing
  "queues": {
    "producers": [
      { "binding": "EVENTS", "queue": "pipeline-events" },
      { "binding": "ENRICHMENT_QUEUE", "queue": "pipeline-enrichment" }
    ],
    "consumers": [
      {
        "queue": "pipeline-events",
        "max_batch_size": 10,
        "max_batch_timeout": 5,
        "dead_letter_queue": "pipeline-events-dlq"
      }
    ]
  },

  // Durable Objects — stateful coordination
  "durable_objects": {
    "bindings": [
      { "name": "ENRICHMENT_CONTROLLER", "class_name": "EnrichmentController" },
      { "name": "OPPORTUNITY_HARNESS", "class_name": "OpportunityHarness" }
    ]
  },
  "migrations": [
    { "tag": "v1", "new_classes": ["EnrichmentController"] },
    { "tag": "v2", "new_classes": ["OpportunityHarness"] }
  ],

  // Workflows — durable multi-step pipelines
  "workflows": [
    {
      "name": "batch-research",
      "binding": "BATCH_RESEARCH",
      "class_name": "BatchResearchWorkflow"
    }
  ],

  // Analytics Engine — free metrics without D1 cost
  "analytics_engine_datasets": [
    { "binding": "METRICS", "dataset": "platform_metrics" }
  ],

  // Cron triggers — the pipeline's heartbeat
  // */5 (crawler), */6h (clustering), daily (snapshots)
  "triggers": {
    "crons": ["*/5 * * * *", "0 */6 * * *", "0 6 * * *"]
  }
}

Key insight: Everything the Worker needs is declared in one file. No Terraform modules, no CloudFormation stacks, no IAM policies. Deploy with wrangler deploy and the entire pipeline — database, object storage, queues, durable objects, workflows, and cron triggers — is live. This is Cloudflare’s superpower for small teams: infrastructure-as-configuration, not infrastructure-as-code.

D1 as the Integration Layer

In this architecture, D1 is not just a database — it is the integration layer between pipeline stages. Stages don’t pass data through messages. They pass references (IDs) through Queues and read/write shared state in D1.

// The pipeline's data flow through D1
//
// Crawl stage WRITES:   app_registry, crawl_keywords, crawl_rankings
// Enrich stage READS:   app_registry (unenriched)
// Enrich stage WRITES:  app_registry (enriched fields), developers, app_reviews
// Evaluate stage READS: app_registry (enriched)
// Evaluate stage WRITES: agent_ledger (scores)
// Cluster stage READS:  crawl_rankings, crawl_keywords, app_registry
// Cluster stage WRITES: market_clusters, cluster_apps, cluster_keywords
// Opportunity READS:    market_clusters, agent_ledger
// Opportunity WRITES:   debate_reports, opportunity_evaluations

interface AppRegistry {
  track_id: number            // Primary key
  bundle_id: string | null
  track_name: string
  seller_name: string
  average_rating: number
  rating_count: number
  primary_genre: string | null
  price: number
  description: string | null

  // Enrichment fields (NULL until enriched)
  subtitle: string | null
  enriched_at: string | null
  enrichment_source: 'itunes' | 'scraper' | 'failed' | null
  rating_histogram: string | null   // JSON: [1star, 2star, 3star, 4star, 5star]
  in_app_purchases: string | null   // JSON array of IAP objects
  iap_scraped_at: string | null
  similar_apps: string | null       // JSON array of track_ids

  // Evaluation fields (NULL until evaluated)
  competitiveness_score: number | null
  rating_velocity_7d: number | null

  // Pipeline control
  enrich_priority: number       // Higher = enriched sooner
  first_seen_at: string
  last_seen_at: string
}

Key insight: The app_registry table serves all five pipeline stages. Each stage adds columns, never removes them. This is the “append-only schema” pattern — it prevents stage coupling while keeping all data about an entity in one place. The alternative (separate tables per stage) creates JOIN complexity that D1’s single-threaded query engine handles poorly at scale.


Patterns

Pattern 1: Priority-Based Work Scheduler

The most important pattern in an autonomous pipeline is the work scheduler. Each cron cycle has limited time and API budget. The scheduler must pick the highest-value work, not just the next item in a queue.

When to use it: Any pipeline stage that has more work available than it can process in one cycle. The crawler has categories to scan, keywords to search, apps to rank, and subtitles to backfill — all competing for the same 30-second cron window.

/**
 * Priority-based work scheduler.
 * Each cron cycle: assess all data gaps → pick the most critical → work on it.
 *
 * This is NOT a FIFO queue. It's a dynamic priority system that shifts
 * based on what the pipeline needs most right now.
 */

// Bandwidth allocation: weighted random selection
const TIER_WEIGHTS = { high: 90, medium: 7, low: 3 } as const
const TIER_STALE_DAYS = { high: 7, medium: 14, low: 30 } as const

export async function runCrawlCycle(db: D1Database): Promise<CrawlStats> {
  const stats: CrawlStats = {
    phase: 'adaptive',
    categories_scanned: 0,
    apps_discovered: 0,
    apps_ranked: 0,
    keywords_searched: 0,
    rankings_added: 0,
    enrichment: null,
    keywords_with_new_rankings: [],
  }

  // ── Step 1: Assess all gaps in ONE parallel query ──────────
  // This costs exactly 5 row reads. Not 21 COUNT(*) queries.
  const [pendingScan, unrankedEnriched, pendingKeywords, iapGap, enrichGap] =
    await Promise.all([
      db.prepare(`
        SELECT COUNT(*) as n FROM category_scans
        WHERE priority IN ('high', 'medium', 'low', 'active')
        AND (scanned_at IS NULL
          OR (priority IN ('high', 'active') AND scanned_at < datetime('now', '-7 days'))
          OR (priority = 'medium' AND scanned_at < datetime('now', '-14 days'))
          OR (priority = 'low' AND scanned_at < datetime('now', '-30 days'))
        )
      `).first<{ n: number }>(),
      db.prepare(`
        SELECT COUNT(*) as n FROM app_registry ar
        WHERE ar.enrichment_source = 'scraper'
          AND NOT EXISTS (
            SELECT 1 FROM crawl_rankings cr WHERE cr.track_id = ar.track_id
          )
      `).first<{ n: number }>(),
      db.prepare(`
        SELECT COUNT(*) as n FROM crawl_keywords WHERE status = 'pending'
      `).first<{ n: number }>(),
      db.prepare(`
        SELECT COUNT(*) as n FROM app_registry
        WHERE enrichment_source = 'scraper' AND iap_scraped_at IS NULL
      `).first<{ n: number }>(),
      db.prepare(`
        SELECT COUNT(*) as n FROM app_registry WHERE enriched_at IS NULL
      `).first<{ n: number }>(),
    ])

  const categoriesNeeded = (pendingScan?.n ?? 0) > 0
  const rankingsNeeded = (unrankedEnriched?.n ?? 0) > 0
  const keywordsNeeded = (pendingKeywords?.n ?? 0) > 0
  const iapBackfillNeeded = (iapGap?.n ?? 0) > 0

  // ── Step 2: Pick ONE primary task (highest priority wins) ──
  if (categoriesNeeded) {
    stats.phase = 'category_scan'
    const CATS_PER_CYCLE = 3
    for (let i = 0; i < CATS_PER_CYCLE; i++) {
      await runCategoryScan(db, stats)
    }
  } else if (keywordsNeeded) {
    stats.phase = 'keyword_mapping'
    await runKeywordMapping(db, stats)
  } else if (rankingsNeeded) {
    stats.phase = 'ranking_fill'
    await runRankingFill(db, stats)
  }

  // ── Step 3: Always-run side tasks (parallel, independent) ──
  const alwaysRun: Promise<void>[] = []

  if (iapBackfillNeeded) {
    const iapBatchSize = (iapGap?.n ?? 0) > 500 ? 20 : 10
    alwaysRun.push(
      backfillIAPs(db, iapBatchSize)
        .catch(err => console.error('[crawl] IAP backfill failed:', err))
    )
  }

  // Run the non-primary task as a side task
  if (keywordsNeeded && stats.phase !== 'keyword_mapping') {
    alwaysRun.push(
      runKeywordMapping(db, stats, 50)
        .catch(err => console.error('[crawl] keyword mapping failed:', err))
    )
  } else if (rankingsNeeded && stats.phase === 'keyword_mapping') {
    alwaysRun.push(
      runRankingFill(db, stats)
        .catch(err => console.error('[crawl] ranking fill failed:', err))
    )
  }

  await Promise.all(alwaysRun)
  return stats
}

The priority order is deliberate: category scanning discovers new entities (highest leverage), keyword mapping builds the ranking graph (needed for clustering), and ranking fill adds depth to already-known entities (lowest leverage). Within category scanning, a weighted random roll ensures high-priority categories get 90% of bandwidth while low-priority categories still make progress.

Gotcha: Starvation. Without the weighted random selection, low-priority categories would never get scanned because high-priority categories regenerate work every 7 days. The 90/7/3 split guarantees progress across all tiers while maintaining clear priorities.

Gotcha: Side-task interference. Side tasks run via Promise.all alongside the primary task. If a side task throws, the .catch() prevents it from crashing the primary task. Never use Promise.all without individual error handling on side tasks.

Pattern 2: Adaptive Durable Object Controller

The enrichment stage can’t use a fixed cron interval. External APIs have variable rate limits — Apple’s App Store returns 403s when you hit too hard, but happily serves requests at 10/second when load is low. You need a controller that ramps up when succeeding and backs off when blocked.

When to use it: Any pipeline stage that calls rate-limited external APIs where the rate limit is dynamic, unpublished, or IP-based. The Durable Object’s alarm API provides self-scheduling with persistent state — exactly what you need for adaptive rate control.

import { DurableObject } from 'cloudflare:workers'

// Safety rails (bounds, not targets)
const ABSOLUTE_MIN_BATCH = 2
const ABSOLUTE_MAX_BATCH = 50
const MIN_INTERVAL_MS = 10_000    // Fastest: every 10 seconds
const MAX_INTERVAL_MS = 120_000   // Slowest: every 2 minutes
const COOLDOWN_MS = 300_000       // 5-minute cooldown on critical failure

// Adaptive thresholds
const SUCCESS_RATE_CRITICAL = 0.2  // Below 20% → enter cooldown
const SUCCESS_RATE_LOW = 0.5       // Below 50% → halve batch, slow interval
const SUCCESS_RATE_GOOD = 0.8      // Above 80% → gentle ramp up
const SUCCESS_RATE_GREAT = 0.95    // Above 95% → aggressive ramp up
const RAMP_UP_FACTOR = 1.25       // +25% batch when great
const RAMP_DOWN_FACTOR = 0.5      // -50% batch when struggling

interface ControllerState {
  total_dispatched: number
  total_completed: number
  total_failed: number
  last_tick_at: number
  running: boolean
  current_batch_size: number
  current_interval_ms: number
  cooldown_until: number
  // Rolling metrics keyed by minute timestamp
  metrics: Record<number, {
    dispatched: number
    completed: number
    failed: number
  }>
}

export class EnrichmentController extends DurableObject<Env> {
  private state: ControllerState

  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env)
    ctx.blockConcurrencyWhile(async () => {
      const saved = await ctx.storage.get<ControllerState>('state')
      this.state = saved
        ? { ...DEFAULT_STATE, ...saved }
        : { ...DEFAULT_STATE }
      this.pruneMetrics()
      // Resume if we were running before a deploy/restart
      if (this.state.running) {
        const existing = await ctx.storage.getAlarm()
        if (!existing) {
          await ctx.storage.setAlarm(
            Date.now() + this.state.current_interval_ms
          )
        }
      }
    })
  }

  // The alarm IS the pipeline. No external trigger needed.
  override async alarm(): Promise<void> {
    if (!this.state.running) return

    // Cooldown check
    if (Date.now() < this.state.cooldown_until) {
      await this.scheduleNext()
      return
    }

    // Pick apps to enrich, prioritizing flagged items
    const toEnrich = await this.env.DB.prepare(`
      SELECT track_id FROM app_registry
      WHERE enriched_at IS NULL
        OR (enrichment_source = 'itunes'
            AND enriched_at < datetime('now', '-14 days'))
      ORDER BY enrich_priority DESC, rating_count DESC
      LIMIT ?
    `).bind(this.state.current_batch_size)
      .all<{ track_id: number }>()

    if (toEnrich.results.length === 0) {
      await this.scheduleNext()
      return
    }

    const trackIds = toEnrich.results.map(r => r.track_id)
    this.recordMetric('dispatched', trackIds.length)

    // Enrich in parallel batches of 8
    const { enriched, failed } = await this.enrichBatch(trackIds)

    this.state.total_completed += enriched
    this.state.total_failed += failed
    this.state.last_tick_at = Date.now()
    this.recordMetric('completed', enriched)
    this.recordMetric('failed', failed)

    // THE KEY: adapt based on results
    this.adapt()

    await this.persist()
    await this.scheduleNext()
  }

  private adapt() {
    const sr = this.successRate()
    if (this.recentTotal() < 5) return // not enough data

    if (sr < SUCCESS_RATE_CRITICAL) {
      // Catastrophic failure: enter cooldown
      this.state.cooldown_until = Date.now() + COOLDOWN_MS
      this.state.current_batch_size = ABSOLUTE_MIN_BATCH
      this.state.current_interval_ms = MAX_INTERVAL_MS
    } else if (sr < SUCCESS_RATE_LOW) {
      // Struggling: halve batch, slow down
      this.state.current_batch_size = Math.max(
        ABSOLUTE_MIN_BATCH,
        Math.floor(this.state.current_batch_size * RAMP_DOWN_FACTOR)
      )
      this.state.current_interval_ms = Math.min(
        MAX_INTERVAL_MS,
        Math.floor(this.state.current_interval_ms * 1.5)
      )
    } else if (sr > SUCCESS_RATE_GREAT) {
      // Thriving: ramp up
      this.state.current_batch_size = Math.min(
        ABSOLUTE_MAX_BATCH,
        Math.ceil(this.state.current_batch_size * RAMP_UP_FACTOR)
      )
      this.state.current_interval_ms = Math.max(
        MIN_INTERVAL_MS,
        Math.floor(this.state.current_interval_ms * 0.8)
      )
    } else if (sr > SUCCESS_RATE_GOOD) {
      // Good: gentle ramp
      this.state.current_batch_size = Math.min(
        ABSOLUTE_MAX_BATCH,
        Math.ceil(this.state.current_batch_size * 1.1)
      )
      this.state.current_interval_ms = Math.max(
        MIN_INTERVAL_MS,
        Math.floor(this.state.current_interval_ms * 0.95)
      )
    }
  }

  private async scheduleNext() {
    if (this.state.running) {
      await this.ctx.storage.setAlarm(
        Date.now() + this.state.current_interval_ms
      )
    }
  }

  // Rolling 5-minute success rate
  private successRate(): number {
    const windowStart = Date.now() - 5 * 60_000
    let completed = 0, failed = 0
    for (const [ts, m] of Object.entries(this.state.metrics)) {
      if (Number(ts) >= windowStart) {
        completed += m.completed
        failed += m.failed
      }
    }
    const total = completed + failed
    return total === 0 ? 1 : completed / total
  }
}

The controller exposes an HTTP API for operational control:

// GET /status — current state, success rate, projections
// POST /start — resume processing
// POST /stop  — pause processing
// POST /reset — reset all state to defaults

override async fetch(request: Request): Promise<Response> {
  const path = new URL(request.url).pathname

  if (request.method === 'GET' && path === '/status') {
    const sr = this.successRate()
    const rate = this.recentCompletionsPerMinute()
    return Response.json({
      running: this.state.running,
      total_dispatched: this.state.total_dispatched,
      total_completed: this.state.total_completed,
      total_failed: this.state.total_failed,
      completions_per_minute: rate,
      success_rate: Math.round(sr * 100),
      current_batch_size: this.state.current_batch_size,
      current_interval_ms: this.state.current_interval_ms,
      in_cooldown: Date.now() < this.state.cooldown_until,
      projected_per_day: rate * 60 * 24,
    })
  }

  if (request.method === 'POST' && path === '/stop') {
    this.state.running = false
    await this.persist()
    return Response.json({ ok: true, running: false })
  }

  // ...
}

Gotcha: Singleton identity. The DO must be addressed as idFromName('main') to ensure only one instance exists globally. If you use idFromString with different IDs, you get multiple controllers competing for the same work queue.

Gotcha: Location hints. Use locationHint: 'enam' when creating the stub if your external API is in a specific region. This pins the DO to a datacenter near the API, reducing latency by 50-200ms per request.

// In the cron handler or API route
const id = env.ENRICHMENT_CONTROLLER.idFromName('main')
const stub = env.ENRICHMENT_CONTROLLER.get(id, { locationHint: 'enam' })
const status = await stub.fetch(new Request('http://do/status'))

Connection to Pattern 1: The crawler (Pattern 1) discovers entities. The enrichment controller (Pattern 2) enriches them. They don’t coordinate directly — the crawler writes to app_registry with enriched_at IS NULL, and the controller queries for those rows. D1 is the integration layer.

Pattern 3: Event-Driven Agent Pipeline

After crawling and enrichment, the pipeline needs to evaluate, cluster, and analyze data. These stages are triggered by events, not schedules. When apps get enriched, run competitiveness scoring. When rankings update, run demand analysis. When clusters form, run fragmentation analysis.

When to use it: Any pipeline where later stages should react to earlier stage completions rather than polling on a schedule. The event-driven pattern eliminates wasted cycles (polling when nothing changed) and reduces latency (processing starts immediately after data arrives).

// The cron handler emits events after crawl work completes
async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
  if (event.cron === '*/5 * * * *') {
    const stats = await runCrawlCycle(env.DB)

    // Write lightweight metrics (NOT 21 COUNT queries)
    env.METRICS.writeDataPoint({
      blobs: ['pipeline', 'cron', stats.phase],
      doubles: [
        stats.apps_discovered,
        stats.rankings_added,
        stats.keywords_searched,
        stats.categories_scanned,
      ],
      indexes: ['pipeline'],
    })

    // Emit events for downstream stages
    const events: { body: AgentEvent }[] = []

    if (stats.enrichment?.enriched_track_ids?.length) {
      events.push({
        body: {
          type: 'app-enriched',
          track_ids: stats.enrichment.enriched_track_ids,
        },
      })
    }

    if (stats.keywords_with_new_rankings?.length) {
      events.push({
        body: {
          type: 'rankings-updated',
          keywords: stats.keywords_with_new_rankings,
        },
      })
    }

    if (events.length > 0) {
      await env.EVENTS.sendBatch(events as any)
    }
  }
}

The queue consumer routes events to the appropriate handler:

// Each event type triggers a specific analysis agent
async queue(batch: MessageBatch<AgentEvent>, env: Env) {
  for (const msg of batch.messages) {
    const event = msg.body
    try {
      switch (event.type) {
        case 'app-enriched': {
          // Enrichment complete → score competitiveness
          const result = await runCompetitivenessAgent(env.DB)
          console.log(
            `[queue] competitiveness: processed=${result.processed}`
          )
          break
        }

        case 'rankings-updated': {
          // New ranking data → analyze demand patterns
          const result = await runDemandAgent(env.DB)
          console.log(
            `[queue] demand: clusters=${result.clusters_found}`
          )
          break
        }

        case 'clusters-ready': {
          // Clusters formed → run fragmentation + competitive analysis
          await runCompetitivenessAgent(env.DB)
          const fragResult = await runFragmentationAgent(env.DB)

          // Chain: fragmentation-done triggers design agent
          if (fragResult.clusters_analyzed > 0) {
            await env.EVENTS.send({
              type: 'fragmentation-done',
              cluster_ids: fragResult.scores.map(s => s.cluster_id),
            })
          }
          break
        }

        case 'fragmentation-done': {
          // Market fragmented → generate design concepts
          const result = await runDesignAgent(env.DB)
          console.log(
            `[queue] design: concepts=${result.concepts_generated}`
          )
          break
        }
      }

      msg.ack()
    } catch (err) {
      console.error(`[queue] Failed: ${event.type}`, err)
      msg.retry()
    }
  }
}

This creates an event chain: app-enriched → competitiveness → rankings-updated → demand → clusters-ready → fragmentation → fragmentation-done → design. Each step is independently retriable, and the queue provides backpressure automatically.

Gotcha: Fan-out limits. Cloudflare Queues supports only one consumer per queue. If you need multiple consumers for the same event, use a fan-out pattern in the single consumer: receive the event, then dispatch it to multiple handlers.

Gotcha: Message ordering. Queues do not guarantee ordering. Design handlers to work regardless of arrival order. If rankings-updated arrives before app-enriched, the demand agent should still work correctly (it queries D1 for current state, not the message payload).

Pattern 4: Cron-Driven Exit Conditions

The most expensive mistake in an autonomous pipeline is running expensive queries when there’s nothing to do. A * * * * * cron fires 1,440 times per day. If each invocation runs 21 COUNT(*) queries on million-row tables, you burn billions of D1 row reads per month — and get zero useful work done.

When to use it: Every cron handler, without exception. The first thing a cron handler does is check whether there’s work to do. If not, it returns immediately.

async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
  if (event.cron === '*/5 * * * *') {
    // ── Exit early: 3 cheap queries instead of 21 expensive ones ──
    const [pendingScan, pendingKeywords, enrichGap] = await Promise.all([
      env.DB.prepare(`
        SELECT COUNT(*) as n FROM category_scans
        WHERE scanned_at IS NULL
          OR scanned_at < datetime('now', '-7 days')
        LIMIT 1
      `).first<{ n: number }>(),
      env.DB.prepare(`
        SELECT COUNT(*) as n FROM crawl_keywords
        WHERE status = 'pending'
        LIMIT 1
      `).first<{ n: number }>(),
      env.DB.prepare(`
        SELECT COUNT(*) as n FROM app_registry
        WHERE enriched_at IS NULL
        LIMIT 1
      `).first<{ n: number }>(),
    ])

    const hasWork =
      (pendingScan?.n ?? 0) > 0 ||
      (pendingKeywords?.n ?? 0) > 0 ||
      (enrichGap?.n ?? 0) > 0

    if (!hasWork) {
      // Write "idle" metric (free via Analytics Engine)
      env.METRICS.writeDataPoint({
        blobs: ['pipeline', 'cron', 'crawl_idle'],
        doubles: [0, 0, 0, 0],
        indexes: ['pipeline'],
      })
      return // Done. Cost: 3 row reads.
    }

    // Actual work follows...
    const stats = await runCrawlCycle(env.DB)
    // ...
  }
}

The $19.63 lesson. The original pipeline ran * * * * * (every minute) and computed a full health snapshot on every invocation. The health snapshot ran 21 COUNT(*) queries across tables with hundreds of thousands to millions of rows. Over one billing cycle, this accumulated 19.6 billion D1 row reads — costing $19.63 in a system that produced zero product output because the downstream pipeline stages weren’t connected yet.

The fix was three-fold:

  1. Slow the cron: * * * * * became */5 * * * *. The 5x reduction in frequency was free and immediate.
  2. Add exit conditions: Check for work before doing work. If nothing changed, return immediately.
  3. Replace D1 snapshots with Analytics Engine: Instead of writing health metrics to D1 (which costs row reads to query later), write them to Analytics Engine (which is currently free and has 100K writes/day on the included tier).
// BEFORE: 21 COUNT(*) queries every minute, stored in D1
// Cost: ~20B row reads/month → $19.63
async function computeAndStoreHealthSnapshot(db: D1Database) {
  const [totalApps, enrichedApps, pendingApps, totalKeywords,
         pendingKeywords, totalClusters, totalReviews, /* ... 14 more */] =
    await Promise.all([
      db.prepare('SELECT COUNT(*) as n FROM app_registry').first(),
      db.prepare('SELECT COUNT(*) as n FROM app_registry WHERE enrichment_source = ?').bind('scraper').first(),
      // ... 19 more COUNT(*) queries ...
    ])
  // Store in D1 scan_snapshots table (more row writes)
  await db.prepare('INSERT INTO scan_snapshots ...').bind(/* ... */).run()
}

// AFTER: 3 lightweight queries, metrics to Analytics Engine
// Cost: ~3 row reads per cycle → $0.00
env.METRICS.writeDataPoint({
  blobs: ['pipeline', 'cron', stats.phase],
  doubles: [stats.apps_discovered, stats.rankings_added, 0, 0],
  indexes: ['pipeline'],
})

Gotcha: LIMIT 1 on existence checks. When you only care whether rows exist (not how many), add LIMIT 1 to your COUNT queries. D1 stops scanning after finding one match instead of counting all matches. On a million-row table, this reduces row reads from 1,000,000 to 1.

Connection to Pattern 2: The enrichment controller (Pattern 2) doesn’t need exit conditions because it’s self-scheduling. When it finds zero apps to enrich, it simply schedules the next alarm and returns. The alarm itself is essentially free — no D1 queries run until the alarm fires.

Pattern 5: Parallel Batch Processing with D1 Batch Limits

D1 has a hard limit on batch size: 100 statements per db.batch() call. When you’re inserting or updating thousands of rows, you need to chunk your batches.

When to use it: Every D1 write path that processes variable-length input. If you INSERT 200 apps from a category scan, you need two batches of 100, not one batch of 200.

/**
 * Upsert apps into the registry, respecting D1's 100-statement batch limit.
 * Uses ON CONFLICT for idempotent upserts — safe to run on duplicates.
 */
async function upsertScrapedApps(db: D1Database, apps: any[]): Promise<void> {
  const stmt = db.prepare(`
    INSERT INTO app_registry (
      track_id, bundle_id, track_name, seller_name,
      average_rating, rating_count, primary_genre,
      price, formatted_price, artwork_url, track_view_url,
      description, release_date, version, content_rating,
      languages, developer_id, developer_website,
      last_seen_at
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))
    ON CONFLICT(track_id) DO UPDATE SET
      track_name = excluded.track_name,
      average_rating = excluded.average_rating,
      rating_count = excluded.rating_count,
      price = excluded.price,
      description = excluded.description,
      version = excluded.version,
      last_seen_at = datetime('now')
  `)

  const batch: D1PreparedStatement[] = []
  for (const app of apps) {
    batch.push(stmt.bind(
      app.id,
      app.appId || null,
      app.title || 'Unknown',
      app.developer || 'Unknown',
      app.score || 0,
      app.reviews || 0,
      app.primaryGenre || null,
      app.price || 0,
      app.free ? 'Free' : `$${app.price || 0}`,
      app.icon || null,
      app.url || null,
      app.description ? app.description.slice(0, 2000) : null,
      app.released || null,
      app.version || null,
      app.contentRating || null,
      app.languages ? JSON.stringify(app.languages) : null,
      app.developerId || null,
      app.developerWebsite || null,
    ))
  }

  // Chunk into D1-safe batches of 100
  for (let i = 0; i < batch.length; i += 100) {
    await db.batch(batch.slice(i, i + 100))
  }

  // Also upsert developers (deduped in-memory first)
  const seenDevs = new Set<number>()
  const devBatch: D1PreparedStatement[] = []
  const devStmt = db.prepare(`
    INSERT INTO developers (developer_id, name, website)
    VALUES (?, ?, ?)
    ON CONFLICT(developer_id) DO UPDATE SET
      name = excluded.name,
      website = COALESCE(excluded.website, developers.website)
  `)

  for (const app of apps) {
    if (app.developerId && !seenDevs.has(app.developerId)) {
      seenDevs.add(app.developerId)
      devBatch.push(devStmt.bind(
        app.developerId,
        app.developer || 'Unknown',
        app.developerWebsite || null,
      ))
    }
  }

  for (let i = 0; i < devBatch.length; i += 100) {
    await db.batch(devBatch.slice(i, i + 100))
  }
}

Gotcha: Prepare once, bind many. Call db.prepare() once outside the loop, then call .bind() for each row. Preparing inside the loop creates unnecessary overhead.

Gotcha: In-memory deduplication. When upserting developers alongside apps, multiple apps may share the same developer. Deduplicate in-memory with a Set before building the batch, otherwise you waste batch slots on redundant upserts.

Gotcha: Description truncation. D1 has a 1MB maximum row size. Truncate large text fields (descriptions, release notes) to a reasonable limit (2000 characters) at insert time, not at query time.

Pattern 6: Weighted Tier Selection

When scanning categories, not all categories are equal. Health & Fitness and Finance apps monetize at 10x the rate of sticker packs. The pipeline should spend most of its bandwidth on high-value categories while still making progress on lower tiers.

When to use it: Any pipeline stage where work items have different value tiers and you want to allocate bandwidth proportionally without fully starving lower tiers.

/**
 * Weighted random tier selection for category scanning.
 * High: 90% of bandwidth, re-scan every 7 days
 * Medium: 7%, re-scan every 14 days
 * Low: 3%, re-scan every 30 days
 */
async function runCategoryScan(
  db: D1Database,
  stats: CrawlStats
): Promise<void> {
  // Weighted random selection
  const roll = Math.random() * 100
  const tierOrder = roll < 90
    ? [['high', 'active'], ['medium'], ['low']]
    : roll < 97
      ? [['medium'], ['high', 'active'], ['low']]
      : [['low'], ['high', 'active'], ['medium']]

  // Try each tier in priority order until we find work
  let next: Record<string, unknown> | null = null
  for (const tierGroup of tierOrder) {
    const placeholders = tierGroup.map(() => '?').join(', ')
    const staleDays = tierGroup.includes('high') || tierGroup.includes('active')
      ? 7 : tierGroup.includes('medium') ? 14 : 30

    next = await db.prepare(`
      SELECT id, collection, category, category_name, priority
      FROM category_scans
      WHERE priority IN (${placeholders})
        AND (scanned_at IS NULL
          OR scanned_at < datetime('now', '-' || ? || ' days'))
      ORDER BY sort_order ASC, scanned_at ASC NULLS FIRST
      LIMIT 1
    `).bind(...tierGroup, staleDays).first()

    if (next) break
  }

  if (!next) return

  // Scan the category: fetch top 200 apps from the store
  const apps = await appstore.list({
    collection: collectionValue,
    category: categoryValue,
    num: 200,
    country: 'us',
  })

  if (apps.length > 0) {
    await upsertScrapedApps(db, apps)
    stats.apps_discovered += apps.length

    // Store chart positions with history
    const chartBatch: D1PreparedStatement[] = []
    const histBatch: D1PreparedStatement[] = []
    for (let i = 0; i < apps.length; i++) {
      chartBatch.push(
        db.prepare(`
          INSERT INTO chart_positions
            (track_id, collection, category, rank, snapshot_at)
          VALUES (?, ?, ?, ?, datetime('now'))
          ON CONFLICT(track_id, collection, category) DO UPDATE SET
            rank = excluded.rank, snapshot_at = excluded.snapshot_at
        `).bind(apps[i].id, collectionKey, categoryKey, i + 1)
      )
      histBatch.push(
        db.prepare(`
          INSERT INTO chart_position_history
            (track_id, collection, category, rank)
          VALUES (?, ?, ?, ?)
        `).bind(apps[i].id, collectionKey, categoryKey, i + 1)
      )
    }

    // Interleave chart + history in same batch for atomicity
    for (let i = 0; i < chartBatch.length; i += 50) {
      await db.batch([
        ...chartBatch.slice(i, i + 50),
        ...histBatch.slice(i, i + 50),
      ])
    }

    // Derive keywords from discovered app titles
    await deriveKeywordsFromApps(db, apps)
  }

  // Mark as scanned regardless of result
  await db.prepare(`
    UPDATE category_scans SET scanned_at = datetime('now'), apps_found = ?
    WHERE id = ?
  `).bind(apps.length, next.id).run()

  stats.categories_scanned++
}

Gotcha: scanned_at ASC NULLS FIRST. SQLite (and thus D1) sorts NULLs last by default. NULLS FIRST ensures never-scanned categories are picked before stale categories. Without this, new categories added later would never get scanned until all stale categories are re-scanned.

Connection to Pattern 1: The category scanner is one of the three primary tasks in the priority-based scheduler. It runs only when categoriesNeeded is true — meaning there are unscanned or stale category/collection combinations.

Pattern 7: Keyword Derivation from Discovered Data

Rather than starting with a static keyword list, the pipeline derives keywords organically from the data it discovers. App titles and subtitles contain the keywords those apps rank for. Extract them, search for them, and build a ranking graph.

When to use it: Any pipeline where you want to discover the full search landscape without pre-seeding a keyword list. The pipeline bootstraps itself: category scans → app titles → derived keywords → search results → more apps → more keywords.

/**
 * Generate search keywords from an app's title and subtitle.
 * Returns targeted terms that this app likely ranks for.
 */
function generateAppKeywords(title: string, subtitle: string): string[] {
  const terms = new Set<string>()
  const stopWords = new Set([
    'the', 'and', 'for', 'with', 'your', 'you', 'app', 'free',
    'pro', 'lite', 'plus', 'premium', 'new', 'top', 'best',
    'get', 'now', 'ios', 'iphone', 'ipad', 'by', 'my', 'de',
  ])

  const clean = (s: string) => s.toLowerCase()
    .replace(/[^a-z\s]/g, ' ')
    .replace(/\s+/g, ' ')
    .trim()

  const titleClean = clean(title)
  const subtitleClean = clean(subtitle)
  const titleWords = titleClean.split(' ')
    .filter(w => w.length > 2 && !stopWords.has(w))
  const subtitleWords = subtitleClean.split(' ')
    .filter(w => w.length > 2 && !stopWords.has(w))

  // Full title (if reasonable length)
  if (titleWords.length >= 1 && titleWords.length <= 5) {
    terms.add(titleWords.join(' '))
  }

  // Full subtitle
  if (subtitleWords.length >= 2 && subtitleWords.length <= 5) {
    terms.add(subtitleWords.join(' '))
  }

  // Bigrams from title
  for (let i = 0; i + 1 < titleWords.length; i++) {
    terms.add(`${titleWords[i]} ${titleWords[i + 1]}`)
  }

  // Bigrams from subtitle
  for (let i = 0; i + 1 < subtitleWords.length; i++) {
    terms.add(`${subtitleWords[i]} ${subtitleWords[i + 1]}`)
  }

  // Cross-pollinate: title word × subtitle word
  for (const tw of titleWords.slice(0, 3)) {
    for (const sw of subtitleWords.slice(0, 3)) {
      if (tw !== sw) terms.add(`${tw} ${sw}`)
    }
  }

  // Individual distinctive words (5+ chars)
  for (const w of [...titleWords, ...subtitleWords]) {
    if (w.length >= 5) terms.add(w)
  }

  return [...terms]
}

/**
 * Derive keywords from app titles — insert as pending crawl keywords.
 * Uses ON CONFLICT DO NOTHING to avoid re-inserting known keywords.
 */
async function deriveKeywordsFromApps(
  db: D1Database,
  apps: any[]
): Promise<void> {
  const keywords = new Set<string>()

  for (const app of apps) {
    const title = (app.title || '').toLowerCase()
      .replace(/[^a-z\s]/g, ' ')
      .replace(/\s+/g, ' ')
      .trim()
    const words = title.split(' ')
      .filter((w: string) => w.length > 2 && !stopWords.has(w))

    // Single words (5+ chars only — shorter are too generic)
    for (const w of words) {
      if (w.length >= 5) keywords.add(w)
    }
    // Bigrams
    for (let i = 0; i + 1 < words.length; i++) {
      keywords.add(`${words[i]} ${words[i + 1]}`)
    }
    // Full title if 2-4 words
    if (words.length >= 2 && words.length <= 4) {
      keywords.add(words.join(' '))
    }
  }

  if (keywords.size === 0) return

  const stmt = db.prepare(`
    INSERT INTO crawl_keywords (keyword, source, source_keyword, depth)
    VALUES (?, 'category_scan', NULL, 0)
    ON CONFLICT(keyword) DO NOTHING
  `)

  const batch = [...keywords].map(kw => stmt.bind(kw))
  for (let i = 0; i < batch.length; i += 100) {
    await db.batch(batch.slice(i, i + 100))
  }
}

The self-bootstrapping cycle works like this:

  1. Category scan discovers 200 apps per category
  2. deriveKeywordsFromApps extracts bigrams and distinctive words from titles
  3. Keywords are inserted as status = 'pending' in crawl_keywords
  4. Next cycle, runKeywordMapping searches those keywords
  5. Search results discover new apps not in any category list
  6. Those apps get enriched, their titles generate more keywords
  7. The cycle converges when most keywords return apps already in the registry

Gotcha: Keyword explosion. Without length filtering (w.length >= 5 for single words), common short words like “run” or “fit” generate thousands of low-value keywords. Each keyword costs one API call to search. The 5-character minimum eliminates noise while keeping distinctive terms like “fitness”, “tracker”, “meditation”.


Small Examples

Example 1: Parallel Search with Rate Limiting

/**
 * Search keywords in parallel batches with rate limiting.
 * 10 concurrent searches per batch, 1s between batches.
 */
async function runKeywordMapping(
  db: D1Database,
  stats: CrawlStats,
  batchSize = 100
): Promise<void> {
  const toSearch = await db.prepare(`
    SELECT keyword FROM crawl_keywords
    WHERE status = 'pending'
    ORDER BY priority DESC, created_at ASC
    LIMIT ?
  `).bind(batchSize).all()

  const PARALLEL = 10

  for (let i = 0; i < toSearch.results.length; i += PARALLEL) {
    const chunk = toSearch.results.slice(i, i + PARALLEL)

    await Promise.allSettled(chunk.map(async (row) => {
      const kw = row.keyword as string
      try {
        const apps = await searchApps(kw, 'us', 25)
        stats.keywords_searched++

        if (apps.length > 0) {
          const stmt = db.prepare(`
            INSERT INTO crawl_rankings (keyword, track_id, rank, crawled_at)
            VALUES (?, ?, ?, datetime('now'))
            ON CONFLICT(keyword, track_id) DO UPDATE SET
              rank = excluded.rank, crawled_at = datetime('now')
          `)
          const batch = apps.map((app, idx) =>
            stmt.bind(kw, app.trackId, idx + 1)
          )
          for (let j = 0; j < batch.length; j += 100) {
            await db.batch(batch.slice(j, j + 100))
          }
          stats.rankings_added += apps.length
        }

        await db.prepare(`
          UPDATE crawl_keywords
          SET status = 'searched', app_count = ?,
              last_searched_at = datetime('now')
          WHERE keyword = ?
        `).bind(apps.length, kw).run()
      } catch (err) {
        console.error(`[crawl] Search failed for "${kw}":`, err)
      }
    }))

    // Delay between batches, not between individual searches
    if (i + PARALLEL < toSearch.results.length) {
      await new Promise(r => setTimeout(r, 1000))
    }
  }
}

Example 2: IAP Backfill with Adaptive Batch Size

/**
 * Backfill in-app purchase data for enriched apps.
 * Batch size adapts to gap size: larger gap → larger batches.
 */
async function backfillIAPs(
  db: D1Database,
  batchSize: number
): Promise<void> {
  const { batchScrapeIAPs } = await import('./iap-scraper')

  const rows = await db.prepare(`
    SELECT track_id FROM app_registry
    WHERE enrichment_source = 'scraper'
      AND iap_scraped_at IS NULL
    ORDER BY rating_count DESC
    LIMIT ?
  `).bind(batchSize).all()

  if (rows.results.length === 0) return

  const trackIds = rows.results.map(r => r.track_id as number)
  const results = await batchScrapeIAPs(trackIds, {
    parallel: 3,
    delayMs: 1500,
  })

  for (const r of results) {
    const iapJson = r.has_iaps && r.iaps.length > 0
      ? JSON.stringify(r.iaps)
      : null
    await db.prepare(`
      UPDATE app_registry SET
        in_app_purchases = ?,
        iap_scraped_at = datetime('now'),
        iap_scrape_status = ?
      WHERE track_id = ?
    `).bind(iapJson, r.scrape_status, r.track_id).run()
  }
}

Example 3: Daily Data Quality Check with Alerts

/**
 * Daily data quality check — detect field coverage degradation.
 * Sends email alert if any field drops below threshold.
 */
async function cronDataQualityCheck(
  db: D1Database,
  resendApiKey?: string
): Promise<void> {
  const thresholds: Record<string, number> = {
    content_rating: 95,
    description: 90,
    developer: 90,
    languages: 85,
    screenshots: 80,
    similar: 70,
    histogram: 60,
  }

  const coverage = await db.prepare(`
    SELECT COUNT(*) as total,
      SUM(CASE WHEN content_rating IS NOT NULL THEN 1 ELSE 0 END)
        as content_rating,
      SUM(CASE WHEN description IS NOT NULL
        AND LENGTH(description) > 10 THEN 1 ELSE 0 END)
        as description,
      SUM(CASE WHEN developer_id IS NOT NULL THEN 1 ELSE 0 END)
        as developer
    FROM app_registry
    WHERE enrichment_source = 'scraper'
  `).first()

  const total = (coverage as any)?.total ?? 0
  if (total === 0) return

  const alerts: string[] = []
  for (const [field, threshold] of Object.entries(thresholds)) {
    const count = (coverage as any)?.[field] ?? 0
    const pct = Math.round((count / total) * 100)
    if (pct < threshold) {
      alerts.push(`${field}: ${pct}% (threshold: ${threshold}%)`)
    }
  }

  if (alerts.length === 0) {
    console.log('[quality] all fields healthy')
    return
  }

  console.warn(`[quality] DEGRADED: ${alerts.join(', ')}`)

  // Send alert email via Resend
  if (resendApiKey) {
    await fetch('https://api.resend.com/emails', {
      method: 'POST',
      headers: {
        Authorization: `Bearer ${resendApiKey}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        from: 'pipeline@alerts.example.com',
        to: ['team@example.com'],
        subject: `[Pipeline] Data quality degraded: ${alerts.length} fields`,
        text: `Fields below threshold:\n${alerts.join('\n')}`,
      }),
    })
  }
}

Example 4: Rating Velocity Computation

/**
 * Compute 7-day rating velocity for trend detection.
 * Compares current rating_count against snapshot from 3-10 days ago.
 */
async function computeRatingVelocity(db: D1Database): Promise<void> {
  const rows = await db.prepare(`
    SELECT s.track_id,
           s.rating_count as old_count,
           s.snapshot_at,
           ar.rating_count as current_count
    FROM app_snapshots s
    JOIN app_registry ar ON ar.track_id = s.track_id
    WHERE s.snapshot_at < datetime('now', '-3 days')
      AND s.id = (
        SELECT id FROM app_snapshots s2
        WHERE s2.track_id = s.track_id
          AND s2.snapshot_at >= datetime('now', '-10 days')
          AND s2.snapshot_at < datetime('now', '-3 days')
        ORDER BY s2.snapshot_at ASC
        LIMIT 1
      )
  `).all()

  const batch: D1PreparedStatement[] = []
  for (const r of rows.results as any[]) {
    const days = Math.max(
      (Date.now() - new Date(r.snapshot_at).getTime()) / 86400000,
      1
    )
    const velocity = Math.round(
      ((r.current_count - r.old_count) / days) * 100
    ) / 100

    batch.push(
      db.prepare(`
        UPDATE app_registry
        SET rating_velocity_7d = ?,
            rating_count_7d_ago = ?,
            rating_velocity_updated_at = datetime('now')
        WHERE track_id = ?
      `).bind(velocity, r.old_count, r.track_id)
    )

    if (batch.length >= 50) {
      await db.batch(batch.splice(0))
    }
  }

  if (batch.length > 0) await db.batch(batch)
  console.log(`[velocity] updated ${rows.results.length} apps`)
}

Example 5: Analytics Engine Instead of D1 for Metrics

/**
 * Write pipeline metrics to Analytics Engine instead of D1.
 *
 * Analytics Engine is free (100K writes/day, 10K reads/day).
 * D1 snapshots cost row reads on every insert AND every dashboard read.
 * This one change saved $19/month.
 */
function writeMetrics(
  metrics: AnalyticsEngineDataset,
  stats: CrawlStats
): void {
  metrics.writeDataPoint({
    blobs: [
      'pipeline',           // service name
      'cron',               // trigger type
      stats.phase,          // current work phase
    ],
    doubles: [
      stats.apps_discovered,
      stats.rankings_added,
      stats.keywords_searched,
      stats.categories_scanned,
    ],
    indexes: ['pipeline'],  // for efficient querying
  })
}

// Query later via Analytics Engine SQL API:
// SELECT blob2 as phase,
//        SUM(double1) as apps,
//        SUM(double2) as rankings
// FROM platform_metrics
// WHERE blob1 = 'pipeline'
//   AND timestamp > NOW() - INTERVAL '24' HOUR
// GROUP BY blob2

Example 6: Queue Message with Fan-Out Chain

/**
 * Emit events after clustering to trigger downstream agents.
 * The chain: clusters-ready → fragmentation → fragmentation-done → design
 */
async function emitClusterEvents(
  env: Env,
  clusterResult: { clusters_found: number }
): Promise<void> {
  if (clusterResult.clusters_found === 0) return

  const clusters = await env.DB.prepare(`
    SELECT id FROM market_clusters
    ORDER BY opportunity_score DESC
    LIMIT 200
  `).all()

  const ids = clusters.results.map(r => r.id as number)
  if (ids.length > 0) {
    await env.EVENTS.send({
      type: 'clusters-ready',
      cluster_ids: ids,
    })
  }
}

// In the clustering cron handler
if (event.cron === '0 */6 * * *') {
  const result = await computeClusters(env.DB)

  // Feedback loop: reprioritize crawl based on cluster value
  const reprioResult = await reprioritizeFromClusters(env.DB)
  console.log(
    `[clustering] ${result.clusters_found} clusters → ` +
    `reprioritized: ${reprioResult.apps_boosted} apps`
  )

  await emitClusterEvents(env, result)
}

Example 7: R2 as HTML Cache for Raw Scrape Data

/**
 * Cache scraped HTML pages in R2 with lifecycle management.
 * R2 costs ~$0.015/GB/month. A 30-day lifecycle rule auto-deletes stale pages.
 *
 * This serves two purposes:
 * 1. Avoid re-scraping the same page if enrichment fails and retries
 * 2. Provide raw data for debugging parser failures
 */
async function scrapeWithCache(
  id: number,
  htmlCache: R2Bucket
): Promise<string> {
  const cacheKey = `app/${id}/page.html`

  // Check R2 cache first
  const cached = await htmlCache.get(cacheKey)
  if (cached) {
    return await cached.text()
  }

  // Scrape the page
  const response = await fetch(
    `https://apps.apple.com/us/app/id${id}`,
    { headers: { 'User-Agent': 'Mozilla/5.0...' } }
  )

  if (!response.ok) {
    throw new Error(`Scrape failed: ${response.status}`)
  }

  const html = await response.text()

  // Store in R2 (lifecycle rule handles cleanup)
  await htmlCache.put(cacheKey, html, {
    httpMetadata: { contentType: 'text/html' },
    customMetadata: { scraped_at: new Date().toISOString() },
  })

  return html
}

Example 8: Reprioritization Feedback Loop

/**
 * Feedback loop: cluster quality feeds back into crawl priorities.
 * High-opportunity clusters get their apps and keywords boosted,
 * directing future crawl bandwidth toward the most valuable segments.
 */
async function reprioritizeFromClusters(
  db: D1Database
): Promise<{ apps_boosted: number; keywords_boosted: number }> {
  // Boost apps in top clusters
  const appResult = await db.prepare(`
    UPDATE app_registry SET enrich_priority = enrich_priority + 10
    WHERE track_id IN (
      SELECT ca.track_id FROM cluster_apps ca
      JOIN market_clusters mc ON mc.id = ca.cluster_id
      WHERE mc.opportunity_score > 70
    ) AND enrich_priority < 100
  `).run()

  // Boost keywords in top clusters
  const kwResult = await db.prepare(`
    UPDATE crawl_keywords SET priority = priority + 5
    WHERE keyword IN (
      SELECT ck.keyword FROM cluster_keywords ck
      JOIN market_clusters mc ON mc.id = ck.cluster_id
      WHERE mc.opportunity_score > 70
    ) AND priority < 50
  `).run()

  return {
    apps_boosted: appResult.meta.changes ?? 0,
    keywords_boosted: kwResult.meta.changes ?? 0,
  }
}

Data Model Design on D1

The Schema at Scale

The production pipeline grew to 64 tables across 36 migration files. Here’s how they decompose by pipeline stage:

Stage 1 — Crawl (8 tables)
├── app_registry          — 1.1M rows, the master entity table
├── crawl_keywords        — 180K keywords, status tracking
├── crawl_rankings        — 2.8M keyword×app ranking pairs
├── crawl_seeds           — manual seed keywords
├── crawl_log             — crawl cycle history
├── category_scans        — 93 category×collection combos
├── chart_positions       — current chart rank per app
└── chart_position_history — historical chart positions

Stage 2 — Enrich (5 tables)
├── developers            — 45K developer profiles
├── app_reviews           — 120K reviews with sentiment
├── app_version_history   — version release timelines
├── app_keyword_mentions  — keyword ↔ app association
└── (enrichment fields on app_registry)

Stage 3 — Evaluate (3 tables)
├── agent_ledger          — per-agent scoring records
├── agent_memory          — agent belief persistence
└── agent_core_beliefs    — core beliefs for each agent

Stage 4 — Cluster (6 tables)
├── market_clusters       — 450 market segments
├── cluster_apps          — app membership per cluster
├── cluster_keywords      — keyword membership per cluster
├── cluster_snapshots     — historical cluster state
├── cluster_lineages      — cluster identity tracking over time
└── cluster_relations     — inter-cluster relationships

Stage 5 — Opportunity (6 tables)
├── opportunity_evaluations — multi-agent evaluations
├── opportunity_reference_apps — reference apps per opportunity
├── opportunity_method_scores — marketing method viability
├── debate_reports         — AI debate transcripts
├── debate_messages        — extracted debate messages
└── market_tips            — synthesized market insights

Infrastructure (36 tables)
├── keyword_cache, app_cache     — D1-based query caching
├── tracked_keywords             — keywords under monitoring
├── keyword_snapshots            — daily keyword metrics
├── app_snapshots                — daily app metric snapshots
├── ranking_snapshots            — daily ranking snapshots
├── api_calls_ledger             — cost tracking per API call
├── pipelines, pipeline_steps    — pipeline execution tracking
├── dfs_keyword_cache            — DataForSEO response cache
├── users, signup_allowlist      — auth
├── scan_snapshots               — health snapshots (deprecated)
├── app_stars, app_lists         — user bookmarks
├── publisher_stars              — starred publishers
├── shared_reports, shared_report_access — report sharing
├── marketing_methods, case_studies      — intelligence
├── marketing_playbooks          — generated playbooks
├── research_tasks               — research queue
├── creators, founder_profiles   — people database
├── harness_cycles, harness_processor_log — DO harness state
├── waitlist, report_email_captures      — growth
└── app_reddit_mentions, market_reddit_intel — social intel

D1 Design Principles

1. One entity table, many attribute tables. app_registry is the single source of truth for app identity. All other tables reference it via track_id. This avoids the “which table has the latest data?” problem.

2. NULL means “not yet processed.” Enrichment fields on app_registry start as NULL. The enrichment controller queries for enriched_at IS NULL to find work. After enrichment, enriched_at is set to the current timestamp. This is simpler and faster than maintaining a separate work queue table.

-- Find unenriched apps: one indexed query
SELECT track_id FROM app_registry
WHERE enriched_at IS NULL
ORDER BY enrich_priority DESC, rating_count DESC
LIMIT 50;

3. ON CONFLICT for idempotent writes. Every INSERT uses ON CONFLICT to handle duplicates gracefully. The pipeline may discover the same app from category scans, keyword searches, similar-app expansion, and developer portfolio crawls. Each path upserts safely.

INSERT INTO app_registry (track_id, track_name, ...)
VALUES (?, ?, ...)
ON CONFLICT(track_id) DO UPDATE SET
  track_name = excluded.track_name,
  last_seen_at = datetime('now');

4. Timestamps for staleness, not booleans. Instead of is_enriched BOOLEAN, use enriched_at DATETIME. This lets you query for both “not yet enriched” (IS NULL) and “stale enrichment” (< datetime('now', '-14 days')) with the same column.

5. JSON columns for variable-width data. Rating histograms, in-app purchase lists, and language arrays are stored as JSON text. D1 (SQLite) handles json_extract() in queries, but you rarely need to query inside these columns — they’re mostly for display.

-- Example: find apps with specific IAP pricing
SELECT track_id, track_name
FROM app_registry
WHERE json_extract(in_app_purchases, '$[0].price') > 9.99;

6. Index strategy: query-driven, not table-driven. Create indexes only for columns that appear in WHERE clauses of hot queries. The most impactful indexes in the pipeline:

CREATE INDEX idx_registry_enrich ON app_registry(enriched_at, enrich_priority, rating_count);
CREATE INDEX idx_keywords_status ON crawl_keywords(status, priority, created_at);
CREATE INDEX idx_rankings_keyword ON crawl_rankings(keyword, track_id);
CREATE INDEX idx_category_priority ON category_scans(priority, scanned_at);

The 10 GB Ceiling

D1 databases are capped at 10 GB. At 6.4 GB after one week of crawling, the pipeline was on track to hit the ceiling within 10 days. The solutions:

  1. Lifecycle columns. Add last_seen_at to entity tables. Apps not seen in 90 days can be archived to R2 and deleted from D1.
  2. Truncate large text. Descriptions capped at 2000 chars. Reviews capped at 2000 chars. Release notes capped at 2000 chars. This alone saved ~1.5 GB.
  3. Prune history tables. Chart position history, ranking snapshots, and app snapshots grow linearly with time. Prune entries older than 90 days.
  4. Horizontal sharding. D1 supports thousands of databases at no extra cost — pricing is usage-based, not instance-based. Shard by entity type: one DB for app data, one for rankings, one for clusters.

The Real-World Cost Story

The $19.63 Bill

Here’s the exact breakdown of what happened:

ResourceUsageCost
D1 row reads19.6 billion$19.63
D1 row writes82 million~$0.82
Workers requests2.1 millionIncluded
R2 storage1.2 GB~$0.02
Queues operations340KIncluded
DO duration~45 hours~$0.56
Total~$21.03

The D1 row reads dominated because of one function: computeAndStoreHealthSnapshot(). It ran 21 COUNT(*) queries every minute on tables with 100K-1M rows:

// This function ran 1,440 times per day for 6 days
// 21 COUNT queries × ~100K-1M rows scanned each
// = 21 × ~500K avg × 1,440 × 6 = ~90 billion row scans
// D1 charges $1 per billion row reads → $19.63

const [totalApps, enrichedApps, pendingApps, totalKeywords,
       pendingKeywords, totalClusters, totalReviews,
       appsWithReviews, agentStats,
       appsLastHour, appsLast24h, keywordsLastHour,
       keywordsLast24h, enrichedLastHour, enrichedLast24h,
       reviewsLast24h, costTotal, costToday, costLast24h,
       costByService, costByHour] = await Promise.all([
  db.prepare('SELECT COUNT(*) as n FROM app_registry').first(),
  db.prepare('SELECT COUNT(*) as n FROM app_registry WHERE ...').first(),
  // ... 19 more queries
])

The worst part: nobody was reading the dashboard. The health snapshots were written to a scan_snapshots table that no user or system ever queried. The pipeline burned $19.63/month producing data that served no purpose.

After the Fix

ChangeBeforeAfterSavings
Cron interval* * * * **/5 * * * *5x fewer invocations
Health queries21 COUNT(*) per cycle3 cheap checks per cycle~7x fewer row reads
Metrics storageD1 scan_snapshotsAnalytics EngineD1 reads → $0
Exit conditionNoneReturn if no workVariable (0-100% cycles saved)
Estimated monthly cost$19.63~$0.5097% reduction

Cost Model for Cloudflare Primitives

PrimitiveFree TierPaid RateCost Driver
Workers100K req/day$0.30/M requestsRequest count
D15M reads/day$0.001/M reads (25B included)Row reads (scans)
R210M reads/mo$0.36/M readsObject operations
Queues1M ops/mo$0.40/M opsMessages × size
Durable ObjectsIncluded$12.50/M requests + durationRequest count + wall time
Analytics Engine100K writes/dayCurrently freeData points written
WorkflowsIncludedCurrently freeStep executions

Key insight: D1 row reads are the dominant cost driver for data pipelines. A single COUNT(*) on a 1M-row table consumes 1M row reads. Run that every minute and you hit 1.44 billion reads per day — $1.44/day just from one query. The fix is always the same: don’t scan large tables in hot loops. Use indexed lookups, cached counters, or Analytics Engine instead.


Cron-Driven vs. DO-Driven Execution

Choosing between cron triggers and Durable Object alarms is the most consequential architectural decision in an autonomous pipeline. Here’s a framework for deciding.

When to Use Cron

PropertyCron is right when…
CadenceFixed interval is acceptable (every 5 min, hourly, daily)
StateNo state needed between invocations
Rate limitingStatic or absent (you control the pace)
CostEach invocation is cheap and predictable
FailureMissed invocation is not critical (next one catches up)
ExampleCategory scanning, clustering, daily snapshots
// wrangler.jsonc
{
  "triggers": {
    "crons": [
      "*/5 * * * *",   // Crawl: fixed 5-minute cadence
      "0 */6 * * *",   // Cluster: every 6 hours
      "0 6 * * *"      // Snapshots: daily at 6 AM UTC
    ]
  }
}

When to Use Durable Object Alarms

PropertyDO alarm is right when…
CadenceMust adapt to external conditions
StateNeeds metrics, cooldown timers, config between invocations
Rate limitingDynamic (external API throttles unpredictably)
CostVariable per invocation, needs budget tracking
FailureMust enter cooldown/backoff mode on failure
ExampleEnrichment, external API scraping, LLM calls
// DO alarms schedule the next invocation dynamically
override async alarm(): Promise<void> {
  // ... do work ...
  // Schedule next based on current conditions
  const nextInterval = this.state.current_interval_ms
  await this.ctx.storage.setAlarm(Date.now() + nextInterval)
}

Comparison: Cron vs. DO Alarm vs. Queue

DimensionCronDO AlarmQueue
TriggerTime-basedSelf-scheduledEvent-based
StateStatelessStateful (persistent)Stateless
CadenceFixedAdaptiveOn-demand
ConcurrencyOne per cronOne per DOmax_batch_size
RetryNext cron cycleAutomatic (6 retries, exponential)Automatic (3 retries)
CostRequest cost onlyRequest + duration + storage$0.40/M operations
Best forRegular pollingAdaptive rate controlReactive fan-out
Worst forVariable-rate workSimple periodic tasksRegular scheduling

The Hybrid Approach

The production pipeline uses all three:

┌─────────────────────────────────────────────────────────┐
│                    Cron (*/5 * * * *)                     │
│  ┌──────────────┐                                        │
│  │ runCrawlCycle │ ──writes──→ D1: app_registry          │
│  └──────┬───────┘              (enriched_at IS NULL)     │
│         │                              │                  │
│         │ emits events                 │ reads            │
│         ▼                              ▼                  │
│  ┌──────────────┐        ┌───────────────────────────┐   │
│  │ Queue: EVENTS │        │ DO: EnrichmentController  │   │
│  └──────┬───────┘        │ alarm() every 10-120s      │   │
│         │                 │ adaptive batch + interval  │   │
│         │ triggers        └───────────────────────────┘   │
│         ▼                                                 │
│  ┌──────────────────────────────┐                        │
│  │ Queue Consumer               │                        │
│  │ app-enriched → competitiveness│                        │
│  │ rankings-updated → demand     │                        │
│  │ clusters-ready → fragmentation│                        │
│  └──────────────────────────────┘                        │
│                                                           │
│                    Cron (0 */6 * * *)                     │
│  ┌────────────────┐                                      │
│  │ computeClusters │ ──emits──→ clusters-ready event     │
│  └────────────────┘                                      │
│                                                           │
│                    Cron (0 6 * * *)                       │
│  ┌────────────────────────┐                              │
│  │ dailySnapshots + quality│                              │
│  └────────────────────────┘                              │
└─────────────────────────────────────────────────────────┘

Comparisons

Cloudflare Workers Pipeline vs. Traditional Approaches

DimensionCloudflare Workers PipelineAWS Step Functions + LambdaGCP Cloud Tasks + Cloud RunSelf-Hosted (Node + Postgres + Redis)
Cold start<5ms (V8 isolates)100ms-2s (containers)100ms-1s (containers)0ms (always running)
Scale to zeroYes (true zero cost at idle)Yes (Lambda scales to zero)Yes (Cloud Run min 0)No ($30-100/mo minimum)
Orchestration cost$0 (cron + queues included)$0.025/1K state transitions$0.10/job/month + task costs$0 (your code)
DatabaseD1 (SQLite, 10 GB cap, $0.001/M reads)DynamoDB ($1.25/M reads) or RDS ($15+/mo)Cloud SQL ($7+/mo) or FirestorePostgreSQL (self-managed)
QueueQueues ($0.40/M ops, 1M free)SQS ($0.40/M messages)Pub/Sub ($0.40/M messages)Redis/BullMQ (self-managed)
Blob storageR2 ($0.015/GB/mo, free egress)S3 ($0.023/GB + egress fees)GCS ($0.020/GB + egress)Local disk or S3
Stateful coordinationDurable Objects (built-in)Step Functions activitiesN/A (must build with Cloud Tasks)Redis + custom logic
Durable executionWorkflows (built-in, free)Step Functions (built-in)Cloud Workflows ($0.01/1K steps)Temporal.io ($25+/mo)
Deploy complexitywrangler deploy (one command)CloudFormation/SAM/CDKTerraform + gcloud CLIDocker + k8s + CI/CD
IAM / authNone (bindings are implicit)IAM policies (per-resource)IAM + service accountsSelf-managed
Global distribution300+ locations (automatic)Regional (manual multi-region)Regional (manual)Single region (manual)
Max execution time30s (Worker), unlimited (DO/Workflow)5 min (Lambda), 1 year (Step Functions)60 min (Cloud Run)Unlimited
Vendor lock-inMedium (D1=SQLite, R2=S3-compatible)High (DynamoDB, Step Functions)High (Firestore, Cloud Tasks)Low (standard stack)

Cost Comparison: 1M Items Through 5 Stages

Assuming 1M entities processed through 5 pipeline stages, with each stage doing 3 DB reads, 1 DB write, and 1 queue message:

ComponentCloudflareAWSGCP
Orchestration$0$125 (5M transitions)$50 (5M task + scheduler)
Compute~$1.50 (5M requests)~$1.00 (Lambda)~$2.00 (Cloud Run)
Database reads~$15 (15M reads)~$18.75 (DynamoDB)~$5.40 (Firestore)
Database writes~$1 (5M writes)~$6.25 (DynamoDB)~$4.50 (Firestore)
Queue~$2 (5M messages)~$2 (SQS)~$2 (Pub/Sub)
Total~$19.50~$153~$63.90

The Cloudflare advantage is stark for orchestration-heavy workloads. The savings come from three places: zero orchestration cost (cron + queues are included/cheap), zero egress fees (R2), and D1’s aggressive pricing for reads (25B included in the $5/mo plan).

Key insight: Cloudflare is cheapest for pipelines that are orchestration-heavy and compute-light. If your pipeline is CPU-heavy (ML inference, video processing), the 30-second Worker CPU limit becomes the bottleneck, and AWS Lambda or Cloud Run with longer timeouts may be more practical.

When NOT to Use Cloudflare

ScenarioBetter AlternativeWhy
>10 GB per databasePostgres (Neon, Supabase)D1’s 10 GB cap is hard
CPU-intensive steps (>30s)AWS Lambda (15 min) or Cloud Run (60 min)Workers cap at 30s CPU
Strong consistency requiredDynamoDB or Cloud SpannerD1 is eventually consistent across regions
Existing AWS/GCP investmentStay on your cloudMigration cost exceeds savings
Team needs SQL JOINs at scalePostgreSQLD1 is single-threaded SQLite
Real-time streamingKafka, KinesisQueues is batch-oriented, not streaming

Deploy Readiness

The $19.63 incident produced a formal deploy readiness standard that now gates every pipeline deployment. Here are the rules, born from pain.

The Deploy Readiness Checklist

Before any autonomous pipeline goes to production:

[ ] Smoke test: at least one route check in the smoke test suite
[ ] Cost ceiling: CF notification rule OR self-check in cron handler
[ ] Exit conditions: every cron handler returns early when no work exists
[ ] Team ownership: assigned to a team with labeled issues
[ ] First-deploy log: initial deployment logged with crons, expected cost
[ ] Cron justification: no * * * * * without documented rationale
[ ] Output validation: full pipeline tested end-to-end before scaling input
[ ] COUNT(*) caching: unbounded counts on >10K row tables are cached
[ ] Monitoring proportionality: snapshot frequency matches actual audience
[ ] Cron interval documented: rationale in wrangler.jsonc comments

Smoke Test Pattern

Every deployed Worker gets a health route and a smoke test entry:

// In the Worker
app.get('/health', (c) => {
  return c.json({
    status: 'ok',
    service: 'pipeline-api',
    timestamp: new Date().toISOString(),
  })
})
#!/bin/bash
# .tools/smoke-test.sh

check() {
  local name="$1" url="$2"
  local status=$(curl -s -o /dev/null -w "%{http_code}" "$url")
  if [ "$status" = "200" ]; then
    echo "PASS  $name"
  else
    echo "FAIL  $name (HTTP $status)"
    FAILURES=$((FAILURES + 1))
  fi
}

FAILURES=0

check "pipeline health" \
  "https://pipeline-api.example.workers.dev/health"

check "pipeline crawl status" \
  "https://pipeline-api.example.workers.dev/v1/crawl/status"

check "pipeline enrichment status" \
  "https://pipeline-api.example.workers.dev/v1/enrichment/status"

if [ $FAILURES -gt 0 ]; then
  echo ""
  echo "$FAILURES checks failed"
  exit 1
fi

Cost Ceiling Pattern

Self-monitoring in the cron handler:

async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
  // Cost ceiling: check estimated D1 reads so far this billing cycle
  const usage = await env.DB.prepare(`
    SELECT COUNT(*) as cycle_invocations
    FROM pipeline_log
    WHERE created_at >= date('now', 'start of month')
  `).first<{ cycle_invocations: number }>()

  const estimatedReads = (usage?.cycle_invocations ?? 0) * 5 * 100_000
  const CEILING_READS = 5_000_000_000 // 5B reads = $5

  if (estimatedReads > CEILING_READS) {
    console.warn(`[cost] Estimated ${estimatedReads} reads exceeds ceiling`)
    return // Skip this cycle
  }

  // ... proceed with work
}

Output Validation Before Input Scaling

The pipeline’s most insidious failure mode: the input stages (crawling, enrichment) work perfectly, but the output stages (clustering, opportunity analysis) are disconnected or broken. You accumulate millions of rows of raw data that never produce user-visible output.

Rule: Validate the full pipeline end-to-end before enabling autonomous input. Manually trigger each stage in sequence: crawl 10 items → enrich them → evaluate them → cluster them → generate opportunities. If any stage produces empty output, fix it before enabling the cron.

// Integration test: verify the full pipeline chain
app.get('/v1/pipeline-test', async (c) => {
  const db = c.env.DB

  // Stage 1: Check crawl output
  const apps = await db.prepare(
    'SELECT COUNT(*) as n FROM app_registry'
  ).first<{ n: number }>()

  // Stage 2: Check enrichment output
  const enriched = await db.prepare(
    'SELECT COUNT(*) as n FROM app_registry WHERE enrichment_source = ?'
  ).bind('scraper').first<{ n: number }>()

  // Stage 3: Check evaluation output
  const evaluated = await db.prepare(
    'SELECT COUNT(*) as n FROM agent_ledger'
  ).first<{ n: number }>()

  // Stage 4: Check cluster output
  const clusters = await db.prepare(
    'SELECT COUNT(*) as n FROM market_clusters'
  ).first<{ n: number }>()

  // Stage 5: Check opportunity output
  const opportunities = await db.prepare(
    'SELECT COUNT(*) as n FROM opportunity_evaluations'
  ).first<{ n: number }>()

  const pipeline = {
    crawl: { count: apps?.n ?? 0, healthy: (apps?.n ?? 0) > 0 },
    enrich: { count: enriched?.n ?? 0, healthy: (enriched?.n ?? 0) > 0 },
    evaluate: { count: evaluated?.n ?? 0, healthy: (evaluated?.n ?? 0) > 0 },
    cluster: { count: clusters?.n ?? 0, healthy: (clusters?.n ?? 0) > 0 },
    opportunity: {
      count: opportunities?.n ?? 0,
      healthy: (opportunities?.n ?? 0) > 0,
    },
    pipeline_healthy: [apps, enriched, evaluated, clusters, opportunities]
      .every(r => (r?.n ?? 0) > 0),
  }

  return c.json(pipeline)
})

Anti-Patterns

Don’tDo InsteadWhy
* * * * * by default*/5 * * * * or slowerEvery-minute crons burn D1 reads even when idle
21 COUNT(*) every minute on million-row tablesAnalytics Engine for metrics, cached counters for dashboardsEach COUNT(*) scans the full table — 1M reads per query
Deploy and forgetTeam label + train slot + smoke testUnmonitored Workers accumulate invisible costs
Scale input before validating outputE2E test: crawl → enrich → cluster → opportunityMillions of rows with zero product output = waste
Fixed batch size for external API callsAdaptive DO controller with success-rate feedbackExternal APIs throttle unpredictably — fixed rates either under-utilize or get blocked
Store health metrics in D1Write to Analytics EngineAnalytics Engine is free; D1 reads cost money
db.prepare() inside a loopPrepare once, .bind() per iterationPreparation has overhead; binding is cheap
Single db.batch() with 200+ statementsChunk into batches of 100D1 batch limit is 100 statements
Use booleans for processing stateUse nullable timestamps (enriched_at DATETIME)Timestamps enable both “not done” (IS NULL) and “stale” (< datetime(...)) queries
Query for count when you need existenceAdd LIMIT 1 to existence checksStops scanning after first match (1 read vs. 1M reads)
Run enrichment from both cron and DOPick one controller (DO) and disable the otherDual execution doubles API load → rate limit bans
Assume queue messages arrive in orderDesign handlers that query current D1 stateQueues provide at-least-once delivery, not ordering
Put full data objects in queue messagesPut IDs in messages, read data from D1Messages should be <10KB; D1 is the source of truth
Track API costs but ignore platform costsMonitor D1 reads, R2 ops, DO duration in CF dashboardPlatform costs can exceed API costs (the $19.63 lesson)
Use idFromString for singleton DOsUse idFromName('main') for singletonsDifferent IDs create competing controllers

References

Official Cloudflare Documentation

Cloudflare Blog Posts

Comparison Resources

Architecture References


Built from production code running an autonomous App Store intelligence pipeline on Cloudflare Workers. The pipeline discovered 1.1 million apps across 31 categories, enriched them with metadata from 4 data sources, clustered them into 450 market segments, and ran multi-agent AI analysis on the top opportunities — all on a single Worker with D1, Queues, Durable Objects, R2, and Workflows. Total platform cost after optimization: under $1/month.


Edit page
Share this post on:

Previous Post
The Recurring Run Review
Next Post
Durable Object Patterns on Cloudflare Workers