Skip to content
Gary Wu
Go back

Worker Analytics — Lightweight Monitoring for Continuous Pipelines

Edit page

Org Status: 🟡 Dormant Cloudflare: N/A Last Audited: 2026-04-28


Worker Analytics — Lightweight Monitoring for Continuous Cloudflare Workers Pipelines

When you run continuous processes on Cloudflare Workers — cron jobs crawling data every minute, queue consumers enriching records, Durable Objects coordinating agent loops — you need to know what is happening. How many items processed? What is the error rate? How much are API calls costing? Is throughput increasing or flatlined? The naive answer is to write monitoring data to your D1 database. That answer will cost you $19.63 in a single billing cycle when your 21 COUNT(*) queries run every minute against a 6.4 GB database and generate 19.6 billion row reads.

This article shows how to build operational monitoring for Workers pipelines that costs essentially nothing. The core idea: use each Cloudflare storage primitive for what it is good at. D1 for transactional data. KV for operational snapshots. Analytics Engine for long-term time-series. Never COUNT(*) on a large table from a cron.

What you’ll learn:


Table of Contents

Open Table of Contents

The Problem

Continuous Workers Need Monitoring

A traditional request-response Worker is simple to monitor. It handles a request, returns a response, and Cloudflare’s built-in analytics show you request counts, error rates, and CPU time. You look at the dashboard, everything is fine.

But modern Workers architectures are not request-response. They are continuous systems:

These systems run 24/7. They process thousands of items per day. They call external APIs that cost money. They accumulate data in D1 databases that grow from megabytes to gigabytes. And Cloudflare’s built-in Worker analytics tell you almost nothing about what they are actually doing.

The built-in dashboard shows:

You need application-level monitoring. You need to know the shape of your pipeline’s work over time.

The Naive Approach Fails

The obvious solution: add monitoring queries to your cron. Every time the cron fires, run some COUNT(*) queries against your D1 tables, build a health object, and store it somewhere. You want to know how many apps you have, how many are enriched, how many keywords are pending, what the API cost trend looks like.

// The approach that seems reasonable but isn't
async function computeHealth(db: D1Database): Promise<HealthSnapshot> {
  const [totalApps, enrichedApps, pendingApps, totalKeywords] = await Promise.all([
    db.prepare('SELECT COUNT(*) as n FROM app_registry').first<{ n: number }>(),
    db.prepare("SELECT COUNT(*) as n FROM app_registry WHERE enrichment_source = 'scraper'").first<{ n: number }>(),
    db.prepare('SELECT COUNT(*) as n FROM app_registry WHERE enriched_at IS NULL').first<{ n: number }>(),
    db.prepare('SELECT COUNT(*) as n FROM crawl_keywords').first<{ n: number }>(),
    // ... 17 more queries
  ]);

  return {
    apps: { total: totalApps?.n ?? 0, enriched: enrichedApps?.n ?? 0 },
    // ...
  };
}

This looks fine. Each query is simple. They run in parallel. The results are useful. What could go wrong?

Everything, once the database grows.

D1’s Row Read Model

D1 charges by rows read, not by queries executed. A COUNT(*) on a table with 100 rows reads 100 rows. A COUNT(*) on a table with 500,000 rows reads 500,000 rows. There is no shortcut — D1 (SQLite under the hood) does a full table scan for COUNT(*) unless you have a covering index that matches the WHERE clause.

The math gets ugly fast:

A single week of monitoring can consume 80% of your monthly row read allowance. Two weeks and you are paying overages.

Key insight: COUNT(*) is not free on D1. Every row in the table is a row read. Running COUNT(*) on a cron is a time bomb that detonates proportionally to your data growth.

What We Actually Need

The monitoring system for a Workers pipeline needs to answer five questions:

  1. Is it running? — When was the last successful cycle? Is the cron firing?
  2. What is the throughput? — How many items processed in the last hour, last 24 hours?
  3. What are the costs? — How much API spend today? What is the burn rate?
  4. Are there errors? — Which agents failed? Which stages are stuck?
  5. What is the trend? — Is throughput increasing or decreasing over the last 24 hours?

None of these questions require full table scans. Most of them can be answered from a precomputed snapshot. And the snapshot itself does not need to live in D1.


The Incident That Started Everything

This is the short version. The full incident analysis is in Cost Observability for Cloudflare Workers.

What Happened

A project tracking App Store rankings had a Worker with this cron configuration:

// wrangler.jsonc
{
  "triggers": {
    "crons": ["* * * * *"]  // Every minute
  }
}

The cron handler did two things:

  1. Useful work — crawl app store categories, discover new apps, check for pending keywords
  2. Health snapshot — run 21 COUNT(*) queries across app_registry, crawl_keywords, app_reviews, agent_ledger, and api_calls_ledger, then write results to a scan_snapshots table in D1

The health snapshot ran every single minute. The database grew to 6.4 GB with over 500,000 rows across tables. After six days:

The Root Cause

The monitoring system was more expensive than the system it was monitoring. The health snapshot function was doing full table scans on every invocation:

// Each of these is a full table scan on a table with 100,000+ rows
db.prepare('SELECT COUNT(*) as n FROM app_registry')           // 160,000 rows read
db.prepare('SELECT COUNT(*) as n FROM crawl_keywords')          // 45,000 rows read
db.prepare('SELECT COUNT(*) as n FROM app_reviews')             // 280,000 rows read
db.prepare('SELECT COUNT(*) as n FROM api_calls_ledger')        // 50,000 rows read
// ... plus 17 more queries with various WHERE clauses

Running 21 queries that scan ~2 million rows, 1,440 times per day, for six days = ~17 billion row reads from monitoring alone.

The Fix

Two changes eliminated the problem:

  1. Reduce frequency — Health snapshots every 5 minutes instead of every minute (12x reduction)
  2. Move storage to KV — Write the snapshot to KV instead of D1. Read from KV for the dashboard. Zero additional D1 reads for serving the monitoring data.

The D1 queries still run to compute the snapshot, but 288 times per day instead of 1,440. And the computed result lives in KV where reading it costs $0.20 per million reads instead of scanning the entire database again.


Core Concepts

Storage Primitives for Monitoring

Cloudflare gives you three storage primitives relevant to monitoring, and each has a different cost model and access pattern.

interface StoragePrimitive {
  name: 'D1' | 'KV' | 'AnalyticsEngine';
  bestFor: string;
  costModel: string;
  readLatency: string;
  writeLatency: string;
  queryCapability: string;
}

const primitives: StoragePrimitive[] = [
  {
    name: 'D1',
    bestFor: 'Transactional data — app records, keywords, user accounts',
    costModel: 'Per row read/written — $0.001/M reads, $1.00/M writes',
    readLatency: '~5-20ms (SQLite)',
    writeLatency: '~10-30ms',
    queryCapability: 'Full SQL — joins, aggregates, window functions',
  },
  {
    name: 'KV',
    bestFor: 'Key-value lookups — config, snapshots, cached results',
    costModel: 'Per operation — $0.20/M reads, $1.00/M writes',
    readLatency: '~1-5ms (edge cached)',
    writeLatency: '~1-5ms (eventually consistent, ~60s propagation)',
    queryCapability: 'None — get by key only, list by prefix',
  },
  {
    name: 'AnalyticsEngine',
    bestFor: 'Time-series metrics — counters, gauges, histograms',
    costModel: '$0.25/M data points written, $1.00/M queries',
    readLatency: '~50-200ms (SQL API over HTTP)',
    writeLatency: '~0ms (fire-and-forget, no await needed)',
    queryCapability: 'SQL subset — aggregates, time bucketing, no joins',
  },
];

Key insight: The right storage for monitoring data depends on the access pattern. If you are writing metrics and reading dashboards, you want KV or Analytics Engine. D1 is for your application data, not your monitoring data.

The Snapshot Pattern

The core pattern in this article is the precomputed snapshot. Instead of computing monitoring data on every dashboard load (read-time computation), you compute it on a schedule and store the result (write-time computation).

interface HealthSnapshot {
  taken_at: string;  // ISO timestamp
  health: {
    counts: Record<string, number>;       // Current totals
    velocity: Record<string, number>;     // Items per time window
    costs: {
      today: { usd: number; calls: number };
      rate: { per_hour: number; per_day_estimate: number };
    };
    agents: Record<string, AgentStatus>;  // Agent-specific stats
    errors: { last_hour: number; last_24h: number };
  };
}

// Write-time: compute once, store cheaply
// Read-time: return instantly from KV

The tradeoff is freshness. A snapshot computed every 5 minutes means your dashboard data can be up to 5 minutes stale. For operational monitoring of a data pipeline, this is fine. You do not need sub-second freshness for “how many apps have been enriched.”

KV As a Metrics Store

KV is not designed to be a metrics store. It has no query language, no aggregation, no time bucketing. But for the specific use case of “store the latest health snapshot and serve it to a dashboard,” it is perfect:

// Writing a snapshot to KV — 1 write operation
await kv.put('health:latest', JSON.stringify(snapshot));

// Reading a snapshot from KV — 1 read operation
const snapshot = await kv.get<HealthSnapshot>('health:latest', 'json');

Compare this with the D1 approach: reading the same data from D1 means re-running 21 queries that scan millions of rows. The KV approach costs one read operation. Period.

Analytics Engine As a Time-Series Store

Workers Analytics Engine is Cloudflare’s built-in time-series database. It was designed for exactly this use case: recording events from Workers and querying them over time.

// Writing a data point — fire and forget, no await needed
env.METRICS.writeDataPoint({
  blobs: ['enrichment', 'app_registry', 'success'],  // dimensions
  doubles: [1, 0.023, 45],                             // count, cost_usd, duration_ms
  indexes: ['aso-mrr'],                                 // sampling key
});

Analytics Engine gives you:

The limitation is that you cannot query Analytics Engine from a binding. You must call the SQL API over HTTP, which means an external fetch with authentication.


Pattern 1: KV Rolling Snapshots

The Pattern

Store health data in two KV keys:

  1. health:latest — The most recent snapshot, overwritten every cycle
  2. health:snapshots — A rolling array of recent snapshots, trimmed to a fixed window

This gives you both “what is the current state?” (latest) and “what has the trend been?” (snapshots array) without any D1 reads on the dashboard side.

When to Use It

The Write Side

This is the function that runs on the cron. It computes the snapshot from D1 (this is the only place D1 reads happen for monitoring), then writes the result to KV.

interface Env {
  DB: D1Database;
  KV: KVNamespace;
}

interface HealthData {
  counts: {
    total_items: number;
    enriched_items: number;
    pending_items: number;
    total_keywords: number;
  };
  velocity: {
    items_last_hour: number;
    items_last_24h: number;
    enriched_last_hour: number;
    enriched_last_24h: number;
    est_days_to_completion: number | null;
  };
  costs: {
    today_usd: number;
    today_calls: number;
    last_24h_usd: number;
    rate_per_hour: number;
    rate_per_day_estimate: number;
    by_service: Record<string, { usd: number; calls: number }>;
  };
  errors: {
    last_hour: number;
    last_24h: number;
  };
}

interface Snapshot {
  health: HealthData;
  taken_at: string;
}

async function computeAndStoreHealthSnapshot(
  db: D1Database,
  kv: KVNamespace,
): Promise<HealthData> {
  // ── Compute the snapshot from D1 ──────────────────────────────
  // This is the ONLY place we run COUNT(*) queries.
  // It runs every 5 minutes, not every minute.
  // It runs on the cron, not on dashboard load.

  const [
    totalItems,
    enrichedItems,
    pendingItems,
    totalKeywords,
    itemsLastHour,
    itemsLast24h,
    enrichedLastHour,
    enrichedLast24h,
    costToday,
    costLast24h,
    costByService,
    errorsLastHour,
    errorsLast24h,
  ] = await Promise.all([
    db.prepare('SELECT COUNT(*) as n FROM items').first<{ n: number }>(),
    db.prepare("SELECT COUNT(*) as n FROM items WHERE status = 'enriched'")
      .first<{ n: number }>(),
    db.prepare("SELECT COUNT(*) as n FROM items WHERE status = 'pending'")
      .first<{ n: number }>(),
    db.prepare('SELECT COUNT(*) as n FROM keywords').first<{ n: number }>(),
    db.prepare(
      "SELECT COUNT(*) as n FROM items WHERE created_at >= datetime('now', '-1 hour')",
    ).first<{ n: number }>(),
    db.prepare(
      "SELECT COUNT(*) as n FROM items WHERE created_at >= datetime('now', '-24 hours')",
    ).first<{ n: number }>(),
    db.prepare(
      "SELECT COUNT(*) as n FROM items WHERE enriched_at >= datetime('now', '-1 hour')",
    ).first<{ n: number }>(),
    db.prepare(
      "SELECT COUNT(*) as n FROM items WHERE enriched_at >= datetime('now', '-24 hours')",
    ).first<{ n: number }>(),
    db.prepare(
      "SELECT COALESCE(SUM(cost_usd), 0) as cost, COUNT(*) as calls FROM api_calls WHERE created_at >= date('now')",
    ).first<{ cost: number; calls: number }>(),
    db.prepare(
      "SELECT COALESCE(SUM(cost_usd), 0) as cost, COUNT(*) as calls FROM api_calls WHERE created_at >= datetime('now', '-24 hours')",
    ).first<{ cost: number; calls: number }>(),
    db.prepare(`
      SELECT service,
             COALESCE(SUM(cost_usd), 0) as cost,
             COUNT(*) as calls
      FROM api_calls
      WHERE created_at >= date('now')
      GROUP BY service
      ORDER BY cost DESC
    `).all(),
    db.prepare(
      "SELECT COUNT(*) as n FROM error_log WHERE created_at >= datetime('now', '-1 hour')",
    ).first<{ n: number }>(),
    db.prepare(
      "SELECT COUNT(*) as n FROM error_log WHERE created_at >= datetime('now', '-24 hours')",
    ).first<{ n: number }>(),
  ]);

  const last24hCost = costLast24h?.cost ?? 0;
  const avgCostPerHour = last24hCost / 24;

  const health: HealthData = {
    counts: {
      total_items: totalItems?.n ?? 0,
      enriched_items: enrichedItems?.n ?? 0,
      pending_items: pendingItems?.n ?? 0,
      total_keywords: totalKeywords?.n ?? 0,
    },
    velocity: {
      items_last_hour: itemsLastHour?.n ?? 0,
      items_last_24h: itemsLast24h?.n ?? 0,
      enriched_last_hour: enrichedLastHour?.n ?? 0,
      enriched_last_24h: enrichedLast24h?.n ?? 0,
      est_days_to_completion:
        (enrichedLast24h?.n ?? 0) > 0
          ? Math.round((pendingItems?.n ?? 0) / (enrichedLast24h?.n ?? 1))
          : null,
    },
    costs: {
      today_usd: round4(costToday?.cost ?? 0),
      today_calls: costToday?.calls ?? 0,
      last_24h_usd: round4(last24hCost),
      rate_per_hour: round4(avgCostPerHour),
      rate_per_day_estimate: round4(avgCostPerHour * 24),
      by_service: Object.fromEntries(
        costByService.results.map((r) => [
          r.service as string,
          { usd: round4(r.cost as number), calls: r.calls as number },
        ]),
      ),
    },
    errors: {
      last_hour: errorsLastHour?.n ?? 0,
      last_24h: errorsLast24h?.n ?? 0,
    },
  };

  // ── Write to KV ───────────────────────────────────────────────

  const now = new Date().toISOString();
  const snapshot: Snapshot = { health, taken_at: now };

  // Latest — always overwrite
  await kv.put('health:latest', JSON.stringify(snapshot));

  // Rolling history — keep last 288 entries (~24h at 5-min intervals)
  const existing = await kv.get<Snapshot[]>('health:snapshots', 'json');
  const history = existing ?? [];
  history.push(snapshot);

  // Trim to window
  while (history.length > 288) history.shift();
  await kv.put('health:snapshots', JSON.stringify(history));

  return health;
}

function round4(n: number): number {
  return Math.round(n * 10000) / 10000;
}

The Cron Handler

The snapshot computation runs inside the cron handler, but not on every tick. Every 5 minutes is enough for operational monitoring.

export default {
  async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
    if (event.cron === '* * * * *') {
      // Useful work — runs every minute
      ctx.waitUntil(runCrawlCycle(env));

      // Health snapshot — runs every 5 minutes
      const minute = new Date().getUTCMinutes();
      if (minute % 5 === 0) {
        ctx.waitUntil(
          computeAndStoreHealthSnapshot(env.DB, env.KV).catch((e) =>
            console.error('[cron] health snapshot failed:', e),
          ),
        );
      }
    }
  },

  fetch: app.fetch,
};

Why 288 Entries

The magic number 288 comes from 24 hours divided by 5-minute intervals:

24 hours * 60 minutes / 5-minute interval = 288 snapshots

This gives you a complete 24-hour rolling window. Each snapshot is roughly 1-3 KB of JSON, so the full array is 300-900 KB — well within KV’s 25 MB value limit.

If you want 48 hours of history, use 576. If you want a week, use 2,016. The limiting factor is the KV value size, not the cost.

Gotchas

  1. KV eventual consistency — KV writes propagate globally in roughly 60 seconds. If you have dashboards in multiple regions, a snapshot written in US-East might take a minute to be visible in EU-West. For operational monitoring, this is irrelevant.

  2. JSON array manipulation — Reading the full health:snapshots array, appending to it, and writing it back is not atomic. If two cron invocations overlap (unlikely at 5-minute intervals, but possible), you could lose a snapshot. This does not matter for monitoring data.

  3. KV value size limit — A single KV value can be up to 25 MB. With 3 KB snapshots, you can store ~8,000 entries (about 28 days at 5-minute intervals). If your snapshot JSON is large, keep the window smaller.

  4. No query capability — You cannot ask KV “what was the average throughput between 2pm and 4pm yesterday.” The snapshots array is just JSON — you filter it client-side or in your Worker route. For more sophisticated queries, use Analytics Engine.


Pattern 2: Health Snapshot Design

The Pattern

Design your health snapshot to capture everything a human operator needs to assess pipeline health in a single read. Structure it as a JSON object with predictable sections: counts, velocity, costs, agents, errors.

When to Use It

Snapshot Shape

The health snapshot from a real production system (the ASO MRR pipeline) includes these sections:

interface ProductionHealthSnapshot {
  // ── Inventory ─────────────────────────────────────────────────
  apps: {
    total: number;
    enriched: number;
    pending: number;
    pct_enriched: number;  // Precomputed percentage
  };
  keywords: {
    total: number;
    pending: number;
  };
  clusters: number;
  reviews: {
    total: number;
    apps_covered: number;
  };

  // ── Velocity ──────────────────────────────────────────────────
  velocity: {
    apps_last_hour: number;
    apps_last_24h: number;
    keywords_last_hour: number;
    keywords_last_24h: number;
    enriched_last_hour: number;
    enriched_last_24h: number;
    reviews_last_24h: number;
    est_days_to_full_enrichment: number | null;  // null if enrichment is paused
  };

  // ── Agent Status ──────────────────────────────────────────────
  agents: Record<string, {
    stage: string;
    processed: number;
    last_run: string;
    avg_duration_ms: number;
    error_rate: number;
  }>;

  // ── Costs ─────────────────────────────────────────────────────
  costs: {
    all_time: { usd: number; calls: number };
    today: { usd: number; calls: number };
    last_24h: { usd: number; calls: number };
    rate: {
      per_hour: number;
      per_day_estimate: number;
    };
    by_service: Record<string, {
      all_time: number;
      today: number;
      calls: number;
      calls_today: number;
    }>;
    hourly_trend: Array<{
      hour: string;
      usd: number;
      calls: number;
    }>;
  };
}

Design Principles

1. Precompute derived values

Do not store raw data and compute percentages on the dashboard. Compute them in the snapshot:

// Good — precomputed
pct_enriched: totalApps?.n
  ? Math.round(((enrichedApps?.n ?? 0) / totalApps.n) * 100)
  : 0,

// Bad — forces the dashboard to recompute
// pct_enriched: undefined  (let the frontend do the math)

2. Include time windows, not just totals

Totals tell you “how much exists.” Velocities tell you “how fast things are moving.” Both are essential:

velocity: {
  apps_last_hour: 12,     // 12 new apps discovered in the last hour
  apps_last_24h: 287,     // 287 new apps in the last day
  enriched_last_hour: 5,  // 5 apps enriched in the last hour
  enriched_last_24h: 120, // 120 enriched in the last day
}

3. Include cost projections

Knowing “we spent $0.12 today” is less useful than “at this rate we’ll spend $2.88 by end of day”:

rate: {
  per_hour: 0.005,              // $0.005/hour current burn rate
  per_day_estimate: 0.12,       // projected daily spend
}

4. Include ETA calculations

For batch processing pipelines, the most important number is “when will it be done?”:

est_days_to_full_enrichment:
  enrichedLast24h > 0
    ? Math.round(pendingItems / enrichedLast24h)
    : null,  // null = enrichment is paused, can't estimate

5. Round monetary values

Floating point math produces numbers like 0.023499999999999997. Round to 4 decimal places for display:

function round4(n: number): number {
  return Math.round(n * 10000) / 10000;
}

// $0.0235 not $0.023499999999999997
costs.today.usd = round4(rawCost);

Minimizing D1 Cost of Snapshot Computation

Even though the snapshot runs only every 5 minutes, each computation still hits D1 with multiple queries. Here are techniques to reduce the cost:

Use indexed columns in WHERE clauses

-- Expensive: full table scan
SELECT COUNT(*) FROM app_registry WHERE enrichment_source = 'scraper'

-- Cheaper: if enrichment_source is indexed, D1 uses the index
CREATE INDEX idx_enrichment_source ON app_registry(enrichment_source);

Use time-bounded queries with indexed timestamps

-- Cheaper than COUNT(*) on the whole table
SELECT COUNT(*) FROM app_registry
WHERE created_at >= datetime('now', '-1 hour')

-- Only works well if created_at is indexed
CREATE INDEX idx_created_at ON app_registry(created_at);

Batch related queries

// Instead of 21 individual queries, use compound queries where possible
const stats = await db.prepare(`
  SELECT
    COUNT(*) as total,
    COUNT(CASE WHEN status = 'enriched' THEN 1 END) as enriched,
    COUNT(CASE WHEN status = 'pending' THEN 1 END) as pending,
    COUNT(CASE WHEN created_at >= datetime('now', '-1 hour') THEN 1 END) as last_hour,
    COUNT(CASE WHEN created_at >= datetime('now', '-24 hours') THEN 1 END) as last_24h
  FROM items
`).first();

Key insight: A single query with conditional COUNT expressions scans the table once instead of five times. Five COUNT(*) queries with different WHERE clauses = five full table scans. One query with five CASE WHEN expressions = one full table scan.

Consider maintaining a counters table

For the most expensive counts (total items, total enriched), maintain a separate counters table that is updated on writes:

// On every item insert
await db.batch([
  db.prepare('INSERT INTO items (id, name, status) VALUES (?, ?, ?)').bind(id, name, 'pending'),
  db.prepare("UPDATE counters SET value = value + 1 WHERE name = 'total_items'"),
]);

// On every enrichment
await db.batch([
  db.prepare("UPDATE items SET status = 'enriched', enriched_at = datetime('now') WHERE id = ?").bind(id),
  db.prepare("UPDATE counters SET value = value + 1 WHERE name = 'enriched_items'"),
]);

// In the snapshot — 1 row read instead of 500,000
const counts = await db.prepare('SELECT name, value FROM counters').all();

This is the precompute-on-write pattern applied to D1 internals.


Pattern 3: Dashboard Routes from KV

The Pattern

Build Hono API routes that serve monitoring data from KV instead of running live D1 queries. The dashboard calls these routes, gets instant responses, and never touches D1.

When to Use It

Route Structure

import { Hono } from 'hono';

interface Env {
  DB: D1Database;
  KV: KVNamespace;
}

const app = new Hono<{ Bindings: Env }>();

// ── Latest Health (from KV) ─────────────────────────────────────
// Cost: 1 KV read = $0.0000002
app.get('/api/health', async (c) => {
  const snapshot = await c.env.KV.get<Snapshot>('health:latest', 'json');

  if (!snapshot) {
    return c.json({ error: 'No snapshot available yet' }, 404);
  }

  return c.json({
    health: snapshot.health,
    snapshot_at: snapshot.taken_at,
    freshness_seconds: Math.round(
      (Date.now() - new Date(snapshot.taken_at).getTime()) / 1000,
    ),
  });
});

// ── Snapshot History (from KV) ──────────────────────────────────
// Cost: 1 KV read = $0.0000002
app.get('/api/health/history', async (c) => {
  const hours = parseInt(c.req.query('hours') || '24');
  const snapshots = await c.env.KV.get<Snapshot[]>('health:snapshots', 'json');

  if (!snapshots) {
    return c.json({ snapshots: [], count: 0 });
  }

  const cutoff = new Date(Date.now() - hours * 60 * 60 * 1000).toISOString();
  const filtered = snapshots.filter((s) => s.taken_at >= cutoff);

  return c.json({
    snapshots: filtered,
    count: filtered.length,
    window_hours: hours,
  });
});

// ── Velocity Chart Data (from KV) ──────────────────────────────
// Transforms snapshot history into chart-ready time series
app.get('/api/health/velocity', async (c) => {
  const snapshots = await c.env.KV.get<Snapshot[]>('health:snapshots', 'json');

  if (!snapshots || snapshots.length === 0) {
    return c.json({ series: [] });
  }

  const series = snapshots.map((s) => ({
    time: s.taken_at,
    items_per_hour: s.health.velocity.items_last_hour,
    enriched_per_hour: s.health.velocity.enriched_last_hour,
    cost_per_hour: s.health.costs.rate_per_hour,
  }));

  return c.json({ series });
});

// ── Cost Breakdown (from KV) ────────────────────────────────────
app.get('/api/health/costs', async (c) => {
  const snapshot = await c.env.KV.get<Snapshot>('health:latest', 'json');

  if (!snapshot) {
    return c.json({ error: 'No snapshot available' }, 404);
  }

  return c.json({
    today: snapshot.health.costs.today,
    last_24h: snapshot.health.costs.last_24h,
    rate: snapshot.health.costs.rate,
    by_service: snapshot.health.costs.by_service,
    hourly_trend: snapshot.health.costs.hourly_trend,
    snapshot_at: snapshot.taken_at,
  });
});

export default app;

Adding Freshness Indicators

Stale data is dangerous. If the cron stops firing, the dashboard should tell you the data is old:

app.get('/api/health', async (c) => {
  const snapshot = await c.env.KV.get<Snapshot>('health:latest', 'json');

  if (!snapshot) {
    return c.json({ status: 'no_data', error: 'No snapshot available' }, 404);
  }

  const ageSeconds = Math.round(
    (Date.now() - new Date(snapshot.taken_at).getTime()) / 1000,
  );

  // Stale if more than 10 minutes old (2x the 5-minute interval)
  const status = ageSeconds > 600 ? 'stale' : 'fresh';

  // Very stale if more than 30 minutes old (cron probably stopped)
  const warning = ageSeconds > 1800
    ? 'Health snapshot is over 30 minutes old. Check if the cron is running.'
    : undefined;

  return c.json({
    status,
    warning,
    health: snapshot.health,
    snapshot_at: snapshot.taken_at,
    age_seconds: ageSeconds,
  });
});

Response Headers for Caching

Since the data is already precomputed and at most 5 minutes stale, you can add cache headers to reduce even KV reads:

app.get('/api/health', async (c) => {
  const snapshot = await c.env.KV.get<Snapshot>('health:latest', 'json');
  if (!snapshot) return c.json({ error: 'No data' }, 404);

  // Cache for 60 seconds — the data only changes every 5 minutes anyway
  c.header('Cache-Control', 'public, max-age=60, s-maxage=60');
  c.header('CDN-Cache-Control', 'max-age=60');

  return c.json({
    health: snapshot.health,
    snapshot_at: snapshot.taken_at,
  });
});

Gotchas

  1. 404 on first deploy — The very first time you deploy, there is no snapshot in KV yet. The cron has not fired. Handle the 404 gracefully in your dashboard.

  2. Large snapshot arrays — If your snapshot JSON is 3 KB and you have 288 of them, the health:snapshots value is ~860 KB. KV reads are billed per operation regardless of value size, so this is fine cost-wise, but it means every history request downloads the full array. Consider client-side caching.

  3. No partial reads — You cannot read “just the last 6 hours” from KV. You always read the full array and filter in the Worker. If the array is large, this adds latency. The alternative is to split into multiple keys (one per hour), but that complicates the write side.


Pattern 4: Analytics Engine for Time-Series

The Pattern

Use Workers Analytics Engine to record individual events as data points. Query them with the SQL API for dashboards, alerts, or long-term trending.

When to Use It

Binding Configuration

// wrangler.jsonc
{
  "analytics_engine_datasets": [
    {
      "binding": "METRICS",
      "dataset": "pipeline_metrics"
    }
  ]
}

Writing Data Points

Analytics Engine data points have three fields: blobs (string dimensions), doubles (numeric values), and indexes (sampling keys).

interface Env {
  METRICS: AnalyticsEngineDataset;
}

// Record a crawl cycle completion
function recordCrawlCycle(env: Env, stats: CrawlStats) {
  env.METRICS.writeDataPoint({
    blobs: [
      'crawl_cycle',              // blob1: event type
      stats.source,                // blob2: data source
      stats.status,                // blob3: success/failure
    ],
    doubles: [
      stats.items_discovered,      // double1: count of new items
      stats.items_enriched,        // double2: count enriched
      stats.duration_ms,           // double3: cycle duration
      stats.api_cost_usd,          // double4: API cost
    ],
    indexes: [stats.pipeline_id],  // sampling key
  });
}

// Record an API call with cost
function recordApiCall(env: Env, service: string, cost: number, success: boolean) {
  env.METRICS.writeDataPoint({
    blobs: [
      'api_call',                  // blob1: event type
      service,                     // blob2: service name (e.g., 'dataforseo')
      success ? 'success' : 'error', // blob3: outcome
    ],
    doubles: [
      1,                           // double1: count (always 1 for individual events)
      cost,                        // double2: cost in USD
    ],
    indexes: [service],
  });
}

// Record an error
function recordError(env: Env, component: string, errorType: string) {
  env.METRICS.writeDataPoint({
    blobs: [
      'error',                     // blob1: event type
      component,                   // blob2: which component failed
      errorType,                   // blob3: error classification
    ],
    doubles: [1],                  // double1: count
    indexes: [component],
  });
}

Key insight: writeDataPoint() does not need to be awaited. It returns immediately and the Workers runtime persists the data point asynchronously. This means zero impact on your Worker’s response time or CPU budget.

Querying from a Worker

To query Analytics Engine from inside a Worker, you call the SQL API over HTTP. This requires your account ID and an API token with Analytics Read permissions.

interface Env {
  ACCOUNT_ID: string;
  ANALYTICS_API_TOKEN: string;
}

async function queryAnalyticsEngine(
  env: Env,
  sql: string,
): Promise<AnalyticsQueryResult> {
  const url = `https://api.cloudflare.com/client/v4/accounts/${env.ACCOUNT_ID}/analytics_engine/sql`;

  const response = await fetch(url, {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${env.ANALYTICS_API_TOKEN}`,
    },
    body: sql,
  });

  if (!response.ok) {
    const text = await response.text();
    throw new Error(`Analytics Engine query failed: ${response.status} ${text}`);
  }

  return response.json();
}

interface AnalyticsQueryResult {
  data: Record<string, unknown>[];
  meta: Array<{ name: string; type: string }>;
  rows: number;
}

Example Queries

Throughput by hour for the last 24 hours:

SELECT
  toStartOfHour(timestamp) AS hour,
  SUM(_sample_interval * double1) AS items_discovered,
  SUM(_sample_interval * double2) AS items_enriched
FROM pipeline_metrics
WHERE
  blob1 = 'crawl_cycle'
  AND timestamp > NOW() - INTERVAL '24' HOUR
GROUP BY hour
ORDER BY hour DESC

API cost by service for today:

SELECT
  blob2 AS service,
  SUM(_sample_interval * double2) AS total_cost_usd,
  SUM(_sample_interval * double1) AS total_calls
FROM pipeline_metrics
WHERE
  blob1 = 'api_call'
  AND timestamp > NOW() - INTERVAL '24' HOUR
GROUP BY service
ORDER BY total_cost_usd DESC

Error rate by component, last 6 hours:

SELECT
  blob2 AS component,
  SUM(_sample_interval * double1) AS error_count,
  toStartOfHour(timestamp) AS hour
FROM pipeline_metrics
WHERE
  blob1 = 'error'
  AND timestamp > NOW() - INTERVAL '6' HOUR
GROUP BY component, hour
ORDER BY hour DESC, error_count DESC

Average cycle duration by source:

SELECT
  blob2 AS source,
  AVG(double3) AS avg_duration_ms,
  MAX(double3) AS max_duration_ms,
  SUM(_sample_interval) AS total_cycles
FROM pipeline_metrics
WHERE
  blob1 = 'crawl_cycle'
  AND timestamp > NOW() - INTERVAL '7' DAY
GROUP BY source

The _sample_interval Factor

Analytics Engine samples data at high volumes. The _sample_interval column tells you how many events each row represents. At low volumes (under the sampling threshold), _sample_interval is always 1. At high volumes, it may be higher.

Always multiply numeric values by _sample_interval when computing aggregates:

-- Wrong: undercounts at high volume
SELECT COUNT(*) AS total_events FROM pipeline_metrics

-- Right: accounts for sampling
SELECT SUM(_sample_interval) AS total_events FROM pipeline_metrics

Building a Grafana Dashboard

Analytics Engine’s SQL API is compatible with Grafana’s JSON API data source. You can point Grafana at a Worker route that proxies queries to Analytics Engine.

// Grafana-compatible proxy route
app.post('/api/grafana/query', async (c) => {
  const body = await c.req.json();
  const sql = body.sql;

  // Validate the query is read-only (prevent injection)
  if (!sql.trim().toUpperCase().startsWith('SELECT')) {
    return c.json({ error: 'Only SELECT queries allowed' }, 400);
  }

  const result = await queryAnalyticsEngine(c.env, sql);
  return c.json(result);
});

Gotchas

  1. No binding for reads — You can write with env.METRICS.writeDataPoint() but you cannot query with a binding. Reads go through the HTTP SQL API, which means you need to store an API token as a secret.

  2. Field ordering mattersblob1, blob2, blob3 etc. are positional. If you change the order of your blobs, old and new data become incompatible. Define a schema early and stick with it.

  3. 92-day retention — Data older than ~3 months is automatically deleted. If you need longer retention, roll up daily aggregates into D1.

  4. Not yet billed — As of early 2026, Cloudflare has published pricing ($0.25/M writes, $1.00/M queries) but is not actively billing for Analytics Engine usage. Plan for the cost but enjoy the free ride while it lasts.

  5. Query latency — SQL API queries typically take 50-200ms. This is fine for dashboards but too slow for hot-path request handling. Never put an Analytics Engine query in a user-facing request path.

  6. Single index only — You can only provide one index per data point. If you provide multiple, the data point is silently dropped. Use the most meaningful grouping key as your index.


Pattern 5: The Hybrid Storage Approach

The Pattern

Use each Cloudflare storage primitive for what it does best. D1 for transactional application data. KV for operational snapshots. Analytics Engine for time-series metrics. Never cross the streams.

When to Use It

The Architecture

┌─────────────────────────────────────────────────────────────────┐
│                      Workers Pipeline                           │
│                                                                 │
│  ┌──────────┐   ┌──────────────┐   ┌────────────────────────┐  │
│  │  Cron     │   │  Queue       │   │  Durable Object        │  │
│  │  Handler  │   │  Consumer    │   │  Controller            │  │
│  └─────┬────┘   └──────┬───────┘   └───────────┬────────────┘  │
│        │               │                        │               │
│        ▼               ▼                        ▼               │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │              Application Logic                          │    │
│  │  (crawl, enrich, score, publish)                       │    │
│  └──────┬──────────────┬──────────────────────┬───────────┘    │
│         │              │                       │                │
│    Write app data  Write metrics          Write snapshot        │
│         │              │                       │                │
│         ▼              ▼                       ▼                │
│  ┌──────────┐   ┌──────────────┐   ┌───────────────────┐      │
│  │    D1    │   │  Analytics   │   │       KV          │      │
│  │          │   │  Engine      │   │                   │      │
│  │ items    │   │              │   │ health:latest     │      │
│  │ keywords │   │ pipeline_    │   │ health:snapshots  │      │
│  │ api_calls│   │ metrics      │   │                   │      │
│  │ agents   │   │              │   │                   │      │
│  └──────────┘   └──────────────┘   └───────────────────┘      │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                Dashboard Routes                         │    │
│  │  GET /health         → reads KV (1 read)               │    │
│  │  GET /health/history → reads KV (1 read)               │    │
│  │  GET /health/costs   → reads KV (1 read)               │    │
│  │  GET /activity       → reads D1 (time-bounded query)   │    │
│  │  GET /metrics/trend  → queries Analytics Engine (HTTP)  │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘

What Goes Where

Data TypeStorageWhy
App records, keywords, user dataD1Transactional, queryable, relational
API call ledger (individual calls)D1Needs to be queryable for billing disputes
Agent processing ledgerD1Auditable record of what each agent did
Current health snapshotKVSingle key read, no query needed
24h snapshot historyKVRolling array, no query needed
Per-event metrics (throughput, cost, errors)Analytics EngineTime-series queries, 92-day retention
Long-term daily aggregatesD1 (optional)Permanent trend data beyond 92 days

Binding Configuration

// wrangler.jsonc
{
  "name": "pipeline-worker",
  "main": "src/index.ts",
  "compatibility_date": "2025-03-07",
  "compatibility_flags": ["nodejs_compat"],

  // D1 — transactional application data
  "d1_databases": [
    {
      "binding": "DB",
      "database_name": "pipeline-db",
      "database_id": "your-database-id",
      "migrations_dir": "migrations"
    }
  ],

  // KV — operational snapshots
  "kv_namespaces": [
    {
      "binding": "KV",
      "id": "your-kv-namespace-id"
    }
  ],

  // Analytics Engine — time-series metrics
  "analytics_engine_datasets": [
    {
      "binding": "METRICS",
      "dataset": "pipeline_metrics"
    }
  ],

  // Cron triggers
  "triggers": {
    "crons": [
      "* * * * *",     // every minute: crawl cycle
      "0 */6 * * *",   // every 6 hours: deeper analysis
      "0 6 * * *"      // daily: cleanup, aggregation
    ]
  }
}

Type Definitions

interface Env {
  DB: D1Database;
  KV: KVNamespace;
  METRICS: AnalyticsEngineDataset;
  ACCOUNT_ID: string;
  ANALYTICS_API_TOKEN: string;
}

The Instrumented Pipeline

Here is how a crawl cycle writes to all three storage layers:

async function runInstrumentedCrawlCycle(env: Env): Promise<void> {
  const startTime = Date.now();
  let itemsDiscovered = 0;
  let itemsEnriched = 0;
  let apiCostUsd = 0;
  let errors = 0;

  try {
    // ── Step 1: Crawl (writes to D1) ────────────────────────────
    const crawlResult = await crawlDataSource(env.DB);
    itemsDiscovered = crawlResult.newItems;

    // Record each API call to D1 (for auditing) and Analytics Engine (for trending)
    for (const call of crawlResult.apiCalls) {
      // D1: permanent audit record
      await env.DB.prepare(
        'INSERT INTO api_calls (service, endpoint, cost_usd, created_at) VALUES (?, ?, ?, datetime("now"))',
      ).bind(call.service, call.endpoint, call.cost).run();

      // Analytics Engine: time-series metric
      env.METRICS.writeDataPoint({
        blobs: ['api_call', call.service, 'success'],
        doubles: [1, call.cost],
        indexes: [call.service],
      });

      apiCostUsd += call.cost;
    }

    // ── Step 2: Enrich (writes to D1) ───────────────────────────
    const enrichResult = await enrichPendingItems(env.DB);
    itemsEnriched = enrichResult.enriched;

    // ── Step 3: Record cycle metrics to Analytics Engine ────────
    env.METRICS.writeDataPoint({
      blobs: ['crawl_cycle', 'main', 'success'],
      doubles: [
        itemsDiscovered,
        itemsEnriched,
        Date.now() - startTime,
        apiCostUsd,
      ],
      indexes: ['main'],
    });

  } catch (error) {
    errors++;

    // Record error to Analytics Engine
    env.METRICS.writeDataPoint({
      blobs: ['error', 'crawl_cycle', (error as Error).message.slice(0, 100)],
      doubles: [1],
      indexes: ['crawl_cycle'],
    });

    // Also log to D1 error table for investigation
    await env.DB.prepare(
      'INSERT INTO error_log (component, message, created_at) VALUES (?, ?, datetime("now"))',
    ).bind('crawl_cycle', (error as Error).message).run();
  }
}

Gotchas

  1. Do not query Analytics Engine from the cron — The SQL API adds 50-200ms latency per query. In a cron that runs every minute, this overhead is significant. Use Analytics Engine for dashboard queries, not for cron-time computation.

  2. Keep D1 writes minimal in the cron — The D1 write budget is 50 million rows/month included. Each crawl cycle should write only the data that needs to be durable and queryable. Metrics go to Analytics Engine.

  3. KV eventual consistency — If your dashboard is served from a different Cloudflare region than where the cron runs, the KV value might be up to 60 seconds behind. Add a freshness_seconds field to the response so the consumer knows.


Pattern 6: Activity Feed Pattern

The Pattern

For “what just happened?” queries — recent activity, last N minutes of work — use D1 with narrow time-bounded queries. These are fundamentally different from monitoring aggregates and are appropriate for D1 because they scan a small, bounded number of rows.

When to Use It

The Key Difference

Monitoring aggregates scan the entire table: “how many apps exist?” Activity feeds scan a time window: “what happened in the last 5 minutes?” If your table has 500,000 rows but only 50 were created in the last 5 minutes, the activity query reads ~50 rows (with a timestamp index), not 500,000.

// Activity feed route — reads D1, but with bounded queries
app.get('/api/activity', async (c) => {
  const minutes = parseInt(c.req.query('minutes') || '5');

  // Cap the window to prevent accidental full scans
  const cappedMinutes = Math.min(minutes, 60);

  const [recentApiCalls, recentAgentWork, recentDiscovered, recentEnriched] =
    await Promise.all([
      c.env.DB.prepare(`
        SELECT service, context_id, COUNT(*) as calls, MAX(created_at) as last_at
        FROM api_calls
        WHERE created_at >= datetime('now', '-' || ? || ' minutes')
        GROUP BY service, context_id
        ORDER BY last_at DESC
      `).bind(cappedMinutes).all(),

      c.env.DB.prepare(`
        SELECT agent, stage, COUNT(*) as processed, MAX(computed_at) as last_at
        FROM agent_ledger
        WHERE computed_at >= datetime('now', '-' || ? || ' minutes')
        GROUP BY agent, stage
        ORDER BY last_at DESC
      `).bind(cappedMinutes).all(),

      c.env.DB.prepare(`
        SELECT COUNT(*) as n FROM items
        WHERE created_at >= datetime('now', '-' || ? || ' minutes')
      `).bind(cappedMinutes).first<{ n: number }>(),

      c.env.DB.prepare(`
        SELECT COUNT(*) as n FROM items
        WHERE enriched_at >= datetime('now', '-' || ? || ' minutes')
      `).bind(cappedMinutes).first<{ n: number }>(),
    ]);

  return c.json({
    window_minutes: cappedMinutes,
    discovered: recentDiscovered?.n ?? 0,
    enriched: recentEnriched?.n ?? 0,
    api_calls: recentApiCalls.results,
    agent_work: recentAgentWork.results,
  });
});

Index Requirements

Activity feed queries are only cheap if the timestamp columns are indexed:

CREATE INDEX idx_api_calls_created_at ON api_calls(created_at);
CREATE INDEX idx_agent_ledger_computed_at ON agent_ledger(computed_at);
CREATE INDEX idx_items_created_at ON items(created_at);
CREATE INDEX idx_items_enriched_at ON items(enriched_at);

Without these indexes, WHERE created_at >= datetime('now', '-5 minutes') becomes a full table scan — exactly what we are trying to avoid.

Cap the Time Window

Always cap the time window on activity feed queries. A user requesting ?minutes=10080 (one week) would scan every row created in the last week — potentially hundreds of thousands of rows:

const cappedMinutes = Math.min(
  parseInt(c.req.query('minutes') || '5'),
  60,  // max 1 hour
);

Gotchas

  1. GROUP BY adds costGROUP BY service, context_id means D1 sorts the results. On a small result set (5 minutes of data), this is fast. On a large result set (1 hour of high-throughput data), it adds up.

  2. datetime() function callsdatetime('now', '-5 minutes') is computed by SQLite at query time. It does not use cached results. This is fine.

  3. Time zone awarenessdatetime('now') in SQLite returns UTC. Make sure your dashboard is aware of this.


Pattern 7: Precompute on Write, Not on Read

The Pattern

Instead of computing aggregates when the dashboard asks for them (read-time), compute them when the data changes (write-time) and store the results. This inverts the cost model: you pay a tiny premium on every write instead of a large cost on every read.

When to Use It

The Counters Table

The simplest version: maintain a counters table with pre-aggregated values.

CREATE TABLE counters (
  name TEXT PRIMARY KEY,
  value INTEGER NOT NULL DEFAULT 0,
  updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);

-- Seed with initial values
INSERT INTO counters (name, value) VALUES
  ('total_items', 0),
  ('enriched_items', 0),
  ('pending_items', 0),
  ('total_keywords', 0),
  ('total_errors_today', 0);

Increment on Write

Use D1 batch operations to update counters atomically with the data write:

async function insertItem(db: D1Database, item: NewItem): Promise<void> {
  await db.batch([
    db.prepare(
      'INSERT INTO items (id, name, status, created_at) VALUES (?, ?, ?, datetime("now"))',
    ).bind(item.id, item.name, 'pending'),

    db.prepare(
      "UPDATE counters SET value = value + 1, updated_at = datetime('now') WHERE name = 'total_items'",
    ),

    db.prepare(
      "UPDATE counters SET value = value + 1, updated_at = datetime('now') WHERE name = 'pending_items'",
    ),
  ]);
}

async function markEnriched(db: D1Database, itemId: string): Promise<void> {
  await db.batch([
    db.prepare(
      "UPDATE items SET status = 'enriched', enriched_at = datetime('now') WHERE id = ?",
    ).bind(itemId),

    db.prepare(
      "UPDATE counters SET value = value + 1, updated_at = datetime('now') WHERE name = 'enriched_items'",
    ),

    db.prepare(
      "UPDATE counters SET value = value - 1, updated_at = datetime('now') WHERE name = 'pending_items'",
    ),
  ]);
}

Reading Counters

Now the health snapshot reads counters instead of running COUNT(*):

async function getCountsFromCountersTable(
  db: D1Database,
): Promise<Record<string, number>> {
  const result = await db.prepare('SELECT name, value FROM counters').all();

  return Object.fromEntries(
    result.results.map((r) => [r.name as string, r.value as number]),
  );
}

// This reads ~5 rows instead of scanning 500,000
// Cost: ~5 row reads vs ~500,000 row reads

Daily Counter Reset

Some counters need to reset daily (errors today, cost today). Use the daily cron for this:

// In the daily cron (0 0 * * *)
async function resetDailyCounters(db: D1Database): Promise<void> {
  await db.batch([
    db.prepare(
      "UPDATE counters SET value = 0, updated_at = datetime('now') WHERE name = 'total_errors_today'",
    ),
    db.prepare(
      "UPDATE counters SET value = 0, updated_at = datetime('now') WHERE name = 'cost_calls_today'",
    ),
  ]);
}

Cost Analysis

Without counters (21 COUNT queries, each scanning ~100,000 rows, every 5 minutes):

21 queries * 100,000 rows * 288 ticks/day = 604,800,000 row reads/day
Monthly: 18.1 billion row reads

With counters (1 query reading ~10 rows, every 5 minutes):

1 query * 10 rows * 288 ticks/day = 2,880 row reads/day
Monthly: 86,400 row reads

The counter approach writes a few extra rows per item processed. If you process 1,000 items per day and each insert triggers 2 counter updates:

1,000 items * 2 counter updates = 2,000 extra row writes/day
Monthly: 60,000 row writes

Row writes cost $1.00 per million. 60,000 writes cost $0.00006. The savings on reads (18.1 billion to 86,400) pay for the extra writes by a factor of approximately 200,000.

Key insight: Precomputing on write trades expensive reads for cheap writes. In D1’s pricing model, reads are $0.001/M and writes are $1.00/M — reads are 1,000x cheaper per row. But when your read queries scan 100,000 rows each, a single COUNT(*) costs the equivalent of 100 row reads (since $0.001/M * 100,000 = same cost as writing 0.1 rows at $1.00/M). The economics always favor precomputation when read queries scan large tables.

Gotchas

  1. Counter drift — If a write succeeds but the counter update fails (or vice versa), the counter becomes inaccurate. D1 batch operations are atomic within a single database, so this should not happen for same-database batches. But if you forget to batch them, you will drift.

  2. Reconciliation — Run a periodic reconciliation job (daily or weekly) that compares counter values with actual COUNT(*) results and corrects drift:

async function reconcileCounters(db: D1Database): Promise<void> {
  const actual = await db.prepare('SELECT COUNT(*) as n FROM items').first<{ n: number }>();
  await db.prepare(
    "UPDATE counters SET value = ?, updated_at = datetime('now') WHERE name = 'total_items'",
  ).bind(actual?.n ?? 0).run();
  // Repeat for other counters...
}
  1. Migration — If you add the counters table to an existing system, you need to seed the counters with the current COUNT(*) values. Run this once:
async function seedCounters(db: D1Database): Promise<void> {
  const [total, enriched, pending] = await Promise.all([
    db.prepare('SELECT COUNT(*) as n FROM items').first<{ n: number }>(),
    db.prepare("SELECT COUNT(*) as n FROM items WHERE status = 'enriched'").first<{ n: number }>(),
    db.prepare("SELECT COUNT(*) as n FROM items WHERE status = 'pending'").first<{ n: number }>(),
  ]);

  await db.batch([
    db.prepare('INSERT OR REPLACE INTO counters (name, value) VALUES (?, ?)').bind('total_items', total?.n ?? 0),
    db.prepare('INSERT OR REPLACE INTO counters (name, value) VALUES (?, ?)').bind('enriched_items', enriched?.n ?? 0),
    db.prepare('INSERT OR REPLACE INTO counters (name, value) VALUES (?, ?)').bind('pending_items', pending?.n ?? 0),
  ]);
}

Small Examples

Example 1: Minimal Health Check Endpoint

The simplest possible health endpoint — just read the latest snapshot from KV.

import { Hono } from 'hono';

const app = new Hono<{ Bindings: { KV: KVNamespace } }>();

app.get('/health', async (c) => {
  const data = await c.env.KV.get('health:latest', 'json');
  if (!data) return c.json({ status: 'initializing' }, 503);

  const age = Date.now() - new Date((data as any).taken_at).getTime();
  return c.json({
    status: age < 600_000 ? 'healthy' : 'stale',
    age_seconds: Math.round(age / 1000),
    ...(data as any).health,
  });
});

export default app;

Example 2: Recording Queue Consumer Metrics

Instrument a queue consumer to write metrics to Analytics Engine after processing each batch.

interface Env {
  METRICS: AnalyticsEngineDataset;
  DB: D1Database;
}

interface QueueMessage {
  type: string;
  payload: Record<string, unknown>;
}

export default {
  async queue(
    batch: MessageBatch<QueueMessage>,
    env: Env,
  ): Promise<void> {
    const startTime = Date.now();
    let successes = 0;
    let failures = 0;

    for (const msg of batch.messages) {
      try {
        await processMessage(env.DB, msg.body);
        msg.ack();
        successes++;
      } catch {
        msg.retry();
        failures++;
      }
    }

    // Record batch metrics — fire and forget
    env.METRICS.writeDataPoint({
      blobs: ['queue_batch', batch.queue, successes === batch.messages.length ? 'clean' : 'partial'],
      doubles: [
        batch.messages.length,  // total messages
        successes,              // successful
        failures,               // failed
        Date.now() - startTime, // batch duration ms
      ],
      indexes: [batch.queue],
    });
  },
};

async function processMessage(db: D1Database, body: QueueMessage): Promise<void> {
  // Your processing logic here
}

Example 3: Durable Object Health Reporter

A Durable Object that exposes its internal state as a health report, written to KV on every alarm.

export class PipelineController implements DurableObject {
  private state: DurableObjectState;
  private env: Env;
  private processedCount = 0;
  private errorCount = 0;
  private lastCycleMs = 0;

  constructor(state: DurableObjectState, env: Env) {
    this.state = state;
    this.env = env;

    state.blockConcurrencyWhile(async () => {
      this.processedCount = (await state.storage.get('processedCount')) ?? 0;
      this.errorCount = (await state.storage.get('errorCount')) ?? 0;
    });
  }

  async alarm(): Promise<void> {
    const start = Date.now();

    try {
      await this.runCycle();
      this.processedCount++;
      await this.state.storage.put('processedCount', this.processedCount);
    } catch {
      this.errorCount++;
      await this.state.storage.put('errorCount', this.errorCount);
    }

    this.lastCycleMs = Date.now() - start;

    // Write health to KV
    await this.env.KV.put(
      'do:pipeline-controller:health',
      JSON.stringify({
        processed: this.processedCount,
        errors: this.errorCount,
        last_cycle_ms: this.lastCycleMs,
        error_rate: this.processedCount > 0
          ? this.errorCount / (this.processedCount + this.errorCount)
          : 0,
        updated_at: new Date().toISOString(),
      }),
    );

    // Schedule next alarm
    this.state.storage.setAlarm(Date.now() + 30_000); // 30 seconds
  }

  private async runCycle(): Promise<void> {
    // Processing logic
  }
}

Example 4: Cost Alerting with Analytics Engine

Query Analytics Engine to check if API spend exceeds a threshold, and log a warning.

async function checkCostAlert(env: Env): Promise<void> {
  const sql = `
    SELECT SUM(_sample_interval * double2) AS total_cost
    FROM pipeline_metrics
    WHERE blob1 = 'api_call'
      AND timestamp > NOW() - INTERVAL '24' HOUR
  `;

  const result = await queryAnalyticsEngine(env, sql);
  const totalCost = result.data[0]?.total_cost as number ?? 0;

  const DAILY_BUDGET = 1.00; // $1.00/day

  if (totalCost > DAILY_BUDGET * 0.8) {
    console.warn(
      `[COST ALERT] 24h API spend: $${totalCost.toFixed(4)} (${Math.round(totalCost / DAILY_BUDGET * 100)}% of budget)`,
    );

    // Write alert to KV for dashboard
    await env.KV.put(
      'alert:cost',
      JSON.stringify({
        level: totalCost > DAILY_BUDGET ? 'critical' : 'warning',
        spend_24h: totalCost,
        budget: DAILY_BUDGET,
        pct: Math.round(totalCost / DAILY_BUDGET * 100),
        checked_at: new Date().toISOString(),
      }),
    );
  }
}

Example 5: Multi-Pipeline Health Aggregator

When you have multiple pipelines (crawling, enrichment, publishing), aggregate their health into a single dashboard.

interface PipelineHealth {
  name: string;
  status: 'running' | 'paused' | 'error' | 'unknown';
  last_activity: string;
  items_processed_24h: number;
  error_rate: number;
}

app.get('/api/health/pipelines', async (c) => {
  // Read health from each pipeline's KV key
  const keys = [
    'health:crawler',
    'health:enrichment',
    'health:publisher',
    'health:agents',
  ];

  const results = await Promise.all(
    keys.map(async (key) => {
      const data = await c.env.KV.get(key, 'json');
      return { key, data };
    }),
  );

  const pipelines: PipelineHealth[] = results.map(({ key, data }) => {
    const name = key.split(':')[1];

    if (!data) {
      return { name, status: 'unknown' as const, last_activity: 'never', items_processed_24h: 0, error_rate: 0 };
    }

    const health = data as any;
    const age = Date.now() - new Date(health.updated_at).getTime();
    const status = age > 600_000 ? 'error' : health.paused ? 'paused' : 'running';

    return {
      name,
      status: status as PipelineHealth['status'],
      last_activity: health.updated_at,
      items_processed_24h: health.items_24h ?? 0,
      error_rate: health.error_rate ?? 0,
    };
  });

  const overall = pipelines.every((p) => p.status === 'running')
    ? 'healthy'
    : pipelines.some((p) => p.status === 'error')
      ? 'degraded'
      : 'partial';

  return c.json({ overall, pipelines });
});

Example 6: Snapshot Diff — What Changed Since Last Check

Compare the current snapshot with the previous one to highlight changes.

app.get('/api/health/diff', async (c) => {
  const snapshots = await c.env.KV.get<Snapshot[]>('health:snapshots', 'json');

  if (!snapshots || snapshots.length < 2) {
    return c.json({ error: 'Need at least 2 snapshots for diff' }, 400);
  }

  const current = snapshots[snapshots.length - 1];
  const previous = snapshots[snapshots.length - 2];

  const diff = {
    interval_minutes: Math.round(
      (new Date(current.taken_at).getTime() - new Date(previous.taken_at).getTime()) / 60000,
    ),
    counts: {
      total_items: current.health.counts.total_items - previous.health.counts.total_items,
      enriched_items: current.health.counts.enriched_items - previous.health.counts.enriched_items,
      pending_items: current.health.counts.pending_items - previous.health.counts.pending_items,
    },
    costs: {
      usd_since_last: round4(
        current.health.costs.today_usd - previous.health.costs.today_usd,
      ),
    },
    errors: {
      new_errors: current.health.errors.last_hour - previous.health.errors.last_hour,
    },
  };

  return c.json({
    current_at: current.taken_at,
    previous_at: previous.taken_at,
    diff,
  });
});

Example 7: KV Snapshot with TTL Auto-Cleanup

Use KV’s built-in expiration to automatically clean up old snapshots without manual pruning.

async function writeSnapshotWithTTL(
  kv: KVNamespace,
  health: HealthData,
): Promise<void> {
  const now = new Date();
  const timestamp = now.toISOString();
  const hourKey = `health:hourly:${now.toISOString().slice(0, 13)}`; // health:hourly:2026-03-14T10

  // Latest — no TTL, always available
  await kv.put('health:latest', JSON.stringify({ health, taken_at: timestamp }));

  // Hourly bucket — expires after 7 days
  const existing = await kv.get<Snapshot[]>(hourKey, 'json');
  const bucket = existing ?? [];
  bucket.push({ health, taken_at: timestamp });

  await kv.put(hourKey, JSON.stringify(bucket), {
    expirationTtl: 7 * 24 * 60 * 60, // 7 days in seconds
  });
}

This eliminates the need for manual array trimming. Each hourly key contains all snapshots for that hour and automatically disappears after 7 days.

Example 8: Analytics Engine Dashboard Route

A complete dashboard route that queries Analytics Engine for trending data.

app.get('/api/metrics/throughput', async (c) => {
  const hours = parseInt(c.req.query('hours') || '24');
  const cappedHours = Math.min(hours, 168); // max 7 days

  const sql = `
    SELECT
      toStartOfHour(timestamp) AS hour,
      SUM(_sample_interval * double1) AS items_discovered,
      SUM(_sample_interval * double2) AS items_enriched,
      AVG(double3) AS avg_cycle_ms,
      SUM(_sample_interval * double4) AS total_cost_usd
    FROM pipeline_metrics
    WHERE
      blob1 = 'crawl_cycle'
      AND timestamp > NOW() - INTERVAL '${cappedHours}' HOUR
    GROUP BY hour
    ORDER BY hour ASC
  `;

  try {
    const result = await queryAnalyticsEngine(c.env, sql);

    return c.json({
      window_hours: cappedHours,
      data_points: result.data,
      total_rows: result.rows,
    });
  } catch (error) {
    return c.json(
      { error: 'Failed to query Analytics Engine', detail: (error as Error).message },
      500,
    );
  }
});

Example 9: Composite Health Score

Compute a single 0-100 health score from multiple signals.

function computeHealthScore(health: HealthData): number {
  let score = 100;

  // Freshness penalty: -20 if nothing processed in the last hour
  if (health.velocity.items_last_hour === 0) {
    score -= 20;
  }

  // Error penalty: -5 per error in the last hour, up to -30
  score -= Math.min(health.errors.last_hour * 5, 30);

  // Cost penalty: -20 if over daily budget
  const dailyBudget = 1.0;
  if (health.costs.rate_per_day_estimate > dailyBudget) {
    score -= 20;
  }

  // Backlog penalty: -10 if pending items are growing
  if (health.velocity.items_last_hour > health.velocity.enriched_last_hour * 2) {
    score -= 10;
  }

  // ETA penalty: -10 if enrichment will take more than 30 days
  if (health.velocity.est_days_to_completion !== null && health.velocity.est_days_to_completion > 30) {
    score -= 10;
  }

  return Math.max(0, Math.min(100, score));
}

app.get('/api/health/score', async (c) => {
  const snapshot = await c.env.KV.get<Snapshot>('health:latest', 'json');
  if (!snapshot) return c.json({ score: 0, status: 'no_data' });

  const score = computeHealthScore(snapshot.health);
  const status = score >= 80 ? 'healthy' : score >= 50 ? 'degraded' : 'critical';

  return c.json({ score, status, snapshot_at: snapshot.taken_at });
});

Example 10: Lightweight Uptime Ping

An external uptime monitor can hit this endpoint. It reads nothing — just confirms the Worker is alive and the cron is running.

app.get('/ping', async (c) => {
  // Read the latest snapshot timestamp from KV
  const raw = await c.env.KV.get('health:latest');
  if (!raw) {
    // No snapshot yet — Worker is running but cron has not fired
    return c.json({ status: 'starting', cron_active: false }, 200);
  }

  const snapshot = JSON.parse(raw);
  const age = Date.now() - new Date(snapshot.taken_at).getTime();

  return c.json({
    status: 'up',
    cron_active: age < 600_000,  // true if snapshot is < 10 min old
    last_snapshot: snapshot.taken_at,
    age_seconds: Math.round(age / 1000),
  });
});

Comparisons

Storage Primitives for Monitoring

FeatureD1KVAnalytics Engine
Data modelRelational (SQLite)Key-valueTime-series (columnar)
Query languageFull SQLGet by key, list by prefixSQL subset (no joins)
Write latency10-30ms1-5ms~0ms (async)
Read latency5-20ms1-5ms (edge cached)50-200ms (HTTP API)
Read cost modelPer row scannedPer key readPer query
Write cost modelPer row writtenPer key writtenPer data point
Read pricing$0.001/M rows$0.20/M reads$1.00/M queries
Write pricing$1.00/M rows$1.00/M writes$0.25/M data points
Included (paid plan)25B reads, 50M writesPaid per use (after free)10M writes, 1M queries
RetentionPermanentPermanent (or TTL)~92 days
Best for monitoringActivity feeds (bounded)Current state, snapshotsTrending, alerting
Worst for monitoringAggregates on large tablesTime-range queriesReal-time reads

Monitoring Approaches Compared

ApproachLatencyD1 CostSetup ComplexityQuery PowerData Retention
Live D1 queries5-20msVery high (full scans)NoneFull SQLPermanent
KV snapshots1-5msLow (5-min interval)LowNone (key lookup)Configurable
Analytics Engine50-200msZeroMediumSQL subset92 days
KV + Analytics Engine1-5ms / 50-200msLowMediumBoth patternsConfigurable + 92 days
External (Datadog)100-500msZeroHighFullPlan-dependent
External (Grafana Cloud)100-500msZeroHighPromQL/SQLPlan-dependent

External Monitoring vs. Built-In

FactorCloudflare Built-In (KV + AE)DatadogGrafana CloudBetter Uptime
SetupAdd bindings, write codeInstall agent, configureInstall agent, configureAdd URL
Monthly cost~$0 (within included)$15+/host$0 (free tier) - $29+$20+
Custom metricswriteDataPoint()DogStatsDPrometheus pushHTTP only
DashboardsBuild yourself (or Grafana)IncludedIncludedBasic
AlertingBuild yourselfIncludedIncludedIncluded
Workers-nativeYes (binding)No (HTTP push)No (HTTP push)No
Vendor lock-inCloudflareDatadogLow (OSS core)Low
Edge latency0ms (same worker)50-200ms (HTTP push)50-200ms (HTTP push)N/A
Retention92 days (AE) + permanent (KV/D1)15 months13 months (free)90 days

Recommendation: Start with KV snapshots for operational health + Analytics Engine for time-series. Add Grafana Cloud only if you need sophisticated alerting rules or cross-service dashboards. Datadog is overkill for most Workers-only architectures.

When to Use External Monitoring

External monitoring makes sense when:

External monitoring does NOT make sense when:


Anti-Patterns

Don’tDo InsteadWhy
Run COUNT(*) on large tables from a cronPrecompute counts on write (counters table) or snapshot to KV every 5 minCOUNT(*) scans every row. A 500K-row table = 500K row reads per invocation.
Store monitoring snapshots in D1Store in KV (health:latest + health:snapshots)D1 snapshots create a table that grows forever, and reading them requires more D1 queries.
Run health queries every minuteRun every 5 minutes (or 15 for non-critical systems)5-min intervals use 5x fewer D1 reads than 1-min. Dashboard users cannot tell the difference.
Query D1 on every dashboard loadRead from KV (precomputed snapshot)Each dashboard load re-runs expensive queries. 100 dashboard loads = 100x the D1 cost.
Use SELECT * for monitoring queriesUse SELECT COUNT(*) with specific WHERE clauses, or better, read countersSELECT * returns all columns and all rows. Monitoring needs counts, not records.
Forget timestamp indexesCreate indexes on created_at, updated_at, enriched_atWithout an index, WHERE created_at >= datetime('now', '-5 minutes') is a full table scan.
Put Analytics Engine queries in request-hot-pathQuery AE from background jobs or dedicated dashboard routesAE SQL API has 50-200ms latency. Acceptable for dashboards, not for user-facing requests.
Store unbounded history in KVTrim to a fixed window (288 entries = 24h at 5-min) or use TTLKV values have a 25 MB limit. Unbounded arrays will hit it and your writes will fail silently.
Use one blob field for everythingDesign a consistent schema with blob1=event_type, blob2=source, blob3=statusInconsistent blob ordering makes queries impossible since AE fields are positional.
Skip reconciliation for countersRun a weekly job that verifies counters against actual COUNT(*)Counters can drift if writes fail partially. Periodic reconciliation catches drift early.
Ignore staleness on dashboardsAdd freshness_seconds and status: 'stale' to health responsesA dashboard showing 2-hour-old data without indicating staleness creates false confidence.
Push metrics to external services from every Worker invocationBatch metrics in Analytics Engine, push to external on a schedulePushing per-invocation adds latency and creates a dependency on an external service.

Cost Math

Scenario: Pipeline Processing 1,000 Items Per Day

The pipeline runs a cron every minute, processes ~42 items per hour, and needs monitoring.

Approach 1: Live D1 Queries (the anti-pattern)

21 COUNT(*) queries per tick, average 100,000 rows scanned each
Ticks per day: 1,440 (every minute)

Row reads per day:  21 * 100,000 * 1,440 = 3,024,000,000
Row reads per month: 3,024,000,000 * 30 = 90,720,000,000

D1 included: 25,000,000,000 / month
Overage: 65,720,000,000 rows
Cost: 65,720,000,000 * $0.001 / 1,000,000 = $65.72/month

Plus D1 writes for snapshots:
1,440 snapshots/day * 30 = 43,200 writes/month
Cost: $0.043

Total monitoring cost: ~$65.77/month

Approach 2: KV Snapshots Every 5 Minutes

D1 reads (computing snapshots):
13 queries * 100,000 rows * 288 ticks/day = 374,400,000/day
Monthly: 11,232,000,000 rows
Within 25B included: $0.00

KV writes:
2 writes per tick (latest + history) * 288 ticks/day = 576 writes/day
Monthly: 17,280 writes
Cost: $0.00 (within free tier of 1,000/day)

KV reads (dashboard loads):
Assume 100 dashboard loads/day = 3,000/month
Cost: $0.00 (within free tier)

Total monitoring cost: ~$0.00/month

Wait — 11.2 billion row reads per month is still nearly half the included 25 billion. Let’s improve with the counters table.

Approach 3: KV Snapshots + Counters Table

D1 reads (computing snapshots with counters):
5 rows from counters + 4 time-bounded queries (~500 rows each)
Per tick: 5 + 2,000 = 2,005 rows
288 ticks/day = 577,440 rows/day
Monthly: 17,323,200 rows
Cost: $0.00 (well within 25B included)

D1 writes (counter increments):
1,000 items/day * 3 counter updates each = 3,000 writes/day
Monthly: 90,000 writes
Cost: $0.00 (within 50M included)

KV writes + reads: same as above = $0.00

Total monitoring cost: ~$0.00/month

Approach 4: KV + Analytics Engine

Same KV costs as Approach 3: $0.00

Analytics Engine writes:
1,000 items/day * 2 data points each = 2,000 writes/day
Plus 288 cycle metrics + 100 error events = ~2,400/day
Monthly: 72,000 data points
Cost: $0.00 (within 10M included)

Analytics Engine queries:
100 dashboard loads/day * 3 queries each = 300 queries/day
Monthly: 9,000 queries
Cost: $0.00 (within 1M included)

Total monitoring cost: ~$0.00/month

Breakeven Analysis

At what scale does each approach start costing money?

ApproachFree UntilMonthly Cost at 10x Scale
Live D1 queries~8 days at 1-min interval$65.77
KV snapshots (no counters)~67 days of data growth$0.00
KV + counters~4 years of data growth$0.00
KV + counters + Analytics Engine~130K items/day$0.00
External (Datadog)Never free$15-23/month minimum

Key insight: With the hybrid approach (KV snapshots + counters + Analytics Engine), monitoring is effectively free for any pipeline processing under 100,000 items per day. The included allocations on the $5/month Workers plan cover everything.

The $5 Plan Budget

The Workers paid plan ($5/month) includes:

ResourceIncludedMonitoring Budget (5%)
D1 row reads25 billion1.25 billion
D1 row writes50 million2.5 million
KV readsPaid per use ($0.20/M)~50,000 reads = $0.01
KV writesPaid per use ($1.00/M)~17,000 writes = $0.017
AE data points10 million500,000
AE queries1 million50,000
Worker invocations10 million500,000

If your monitoring uses more than 5% of any resource, something is wrong with your monitoring design. Monitoring should be a rounding error on your infrastructure bill.


Production Checklist

Before deploying monitoring for a Workers pipeline, verify:


Full Working Example

This is a complete Worker with all patterns integrated — cron with KV snapshots, Hono dashboard routes, Analytics Engine instrumentation, and an activity feed.

import { Hono } from 'hono';
import { cors } from 'hono/cors';

// ── Types ───────────────────────────────────────────────────────

interface Env {
  DB: D1Database;
  KV: KVNamespace;
  METRICS: AnalyticsEngineDataset;
  ACCOUNT_ID: string;
  ANALYTICS_API_TOKEN: string;
}

interface HealthData {
  counts: Record<string, number>;
  velocity: Record<string, number>;
  costs: {
    today_usd: number;
    today_calls: number;
    rate_per_hour: number;
    rate_per_day_estimate: number;
    by_service: Record<string, { usd: number; calls: number }>;
  };
  errors: { last_hour: number; last_24h: number };
}

interface Snapshot {
  health: HealthData;
  taken_at: string;
}

// ── Hono App ────────────────────────────────────────────────────

const app = new Hono<{ Bindings: Env }>();
app.use('*', cors());

// Health — from KV
app.get('/api/health', async (c) => {
  const snapshot = await c.env.KV.get<Snapshot>('health:latest', 'json');
  if (!snapshot) return c.json({ status: 'initializing' }, 503);

  const age = Date.now() - new Date(snapshot.taken_at).getTime();
  c.header('Cache-Control', 'public, max-age=60');

  return c.json({
    status: age < 600_000 ? 'healthy' : 'stale',
    age_seconds: Math.round(age / 1000),
    health: snapshot.health,
    snapshot_at: snapshot.taken_at,
  });
});

// History — from KV
app.get('/api/health/history', async (c) => {
  const hours = Math.min(parseInt(c.req.query('hours') || '24'), 48);
  const snapshots = await c.env.KV.get<Snapshot[]>('health:snapshots', 'json');
  if (!snapshots) return c.json({ snapshots: [], count: 0 });

  const cutoff = new Date(Date.now() - hours * 3600_000).toISOString();
  const filtered = snapshots.filter((s) => s.taken_at >= cutoff);

  return c.json({ snapshots: filtered, count: filtered.length, window_hours: hours });
});

// Activity — from D1 (bounded time window)
app.get('/api/activity', async (c) => {
  const minutes = Math.min(parseInt(c.req.query('minutes') || '5'), 60);

  const [calls, items] = await Promise.all([
    c.env.DB.prepare(`
      SELECT service, COUNT(*) as n, MAX(created_at) as last_at
      FROM api_calls
      WHERE created_at >= datetime('now', '-' || ? || ' minutes')
      GROUP BY service ORDER BY last_at DESC
    `).bind(minutes).all(),
    c.env.DB.prepare(`
      SELECT COUNT(*) as n FROM items
      WHERE created_at >= datetime('now', '-' || ? || ' minutes')
    `).bind(minutes).first<{ n: number }>(),
  ]);

  return c.json({
    window_minutes: minutes,
    new_items: items?.n ?? 0,
    api_calls: calls.results,
  });
});

// Metrics — from Analytics Engine
app.get('/api/metrics/throughput', async (c) => {
  const hours = Math.min(parseInt(c.req.query('hours') || '24'), 168);

  const sql = `
    SELECT toStartOfHour(timestamp) AS hour,
           SUM(_sample_interval * double1) AS discovered,
           SUM(_sample_interval * double2) AS enriched,
           SUM(_sample_interval * double4) AS cost_usd
    FROM pipeline_metrics
    WHERE blob1 = 'crawl_cycle'
      AND timestamp > NOW() - INTERVAL '${hours}' HOUR
    GROUP BY hour ORDER BY hour ASC
  `;

  const url = `https://api.cloudflare.com/client/v4/accounts/${c.env.ACCOUNT_ID}/analytics_engine/sql`;
  const res = await fetch(url, {
    method: 'POST',
    headers: { Authorization: `Bearer ${c.env.ANALYTICS_API_TOKEN}` },
    body: sql,
  });

  if (!res.ok) return c.json({ error: 'Analytics query failed' }, 502);
  return c.json(await res.json());
});

// ── Cron Handler ────────────────────────────────────────────────

async function handleScheduled(event: ScheduledEvent, env: Env): Promise<void> {
  if (event.cron === '* * * * *') {
    // Useful work every minute
    const start = Date.now();
    const result = await runCrawlCycle(env);

    // Record to Analytics Engine (fire-and-forget)
    env.METRICS.writeDataPoint({
      blobs: ['crawl_cycle', 'main', result.error ? 'error' : 'success'],
      doubles: [result.discovered, result.enriched, Date.now() - start, result.cost],
      indexes: ['main'],
    });

    // Health snapshot every 5 minutes
    if (new Date().getUTCMinutes() % 5 === 0) {
      await computeAndStoreSnapshot(env);
    }
  }
}

async function computeAndStoreSnapshot(env: Env): Promise<void> {
  // Read from counters table (cheap) + time-bounded queries
  const [counters, costToday, errorsHour, errors24h] = await Promise.all([
    env.DB.prepare('SELECT name, value FROM counters').all(),
    env.DB.prepare(
      "SELECT COALESCE(SUM(cost_usd),0) as cost, COUNT(*) as calls FROM api_calls WHERE created_at >= date('now')"
    ).first<{ cost: number; calls: number }>(),
    env.DB.prepare(
      "SELECT COUNT(*) as n FROM error_log WHERE created_at >= datetime('now','-1 hour')"
    ).first<{ n: number }>(),
    env.DB.prepare(
      "SELECT COUNT(*) as n FROM error_log WHERE created_at >= datetime('now','-24 hours')"
    ).first<{ n: number }>(),
  ]);

  const counts = Object.fromEntries(
    counters.results.map((r) => [r.name as string, r.value as number]),
  );

  const todayCost = costToday?.cost ?? 0;
  const health: HealthData = {
    counts,
    velocity: {
      enriched_last_hour: counts.enriched_last_hour ?? 0,
      items_last_24h: counts.items_last_24h ?? 0,
    },
    costs: {
      today_usd: Math.round(todayCost * 10000) / 10000,
      today_calls: costToday?.calls ?? 0,
      rate_per_hour: Math.round((todayCost / Math.max(new Date().getUTCHours(), 1)) * 10000) / 10000,
      rate_per_day_estimate: Math.round((todayCost / Math.max(new Date().getUTCHours(), 1)) * 24 * 10000) / 10000,
      by_service: {},
    },
    errors: {
      last_hour: errorsHour?.n ?? 0,
      last_24h: errors24h?.n ?? 0,
    },
  };

  const now = new Date().toISOString();
  const snapshot: Snapshot = { health, taken_at: now };

  await env.KV.put('health:latest', JSON.stringify(snapshot));

  const existing = await env.KV.get<Snapshot[]>('health:snapshots', 'json');
  const history = existing ?? [];
  history.push(snapshot);
  while (history.length > 288) history.shift();
  await env.KV.put('health:snapshots', JSON.stringify(history));
}

// Stub for illustration
async function runCrawlCycle(env: Env): Promise<{
  discovered: number; enriched: number; cost: number; error?: string
}> {
  return { discovered: 3, enriched: 1, cost: 0.005 };
}

// ── Export ───────────────────────────────────────────────────────

export default {
  fetch: app.fetch,
  scheduled: handleScheduled,
};

Wrangler Configuration

// wrangler.jsonc
{
  "name": "monitored-pipeline",
  "main": "src/index.ts",
  "compatibility_date": "2025-03-07",
  "compatibility_flags": ["nodejs_compat"],

  "d1_databases": [
    {
      "binding": "DB",
      "database_name": "pipeline-db",
      "database_id": "your-db-id",
      "migrations_dir": "migrations"
    }
  ],

  "kv_namespaces": [
    {
      "binding": "KV",
      "id": "your-kv-id"
    }
  ],

  "analytics_engine_datasets": [
    {
      "binding": "METRICS",
      "dataset": "pipeline_metrics"
    }
  ],

  "vars": {
    "ACCOUNT_ID": "your-account-id"
  },

  "triggers": {
    "crons": [
      "* * * * *"
    ]
  }
}

Migration for Counters Table

-- migrations/0001_counters.sql
CREATE TABLE IF NOT EXISTS counters (
  name TEXT PRIMARY KEY,
  value INTEGER NOT NULL DEFAULT 0,
  updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);

INSERT OR IGNORE INTO counters (name, value) VALUES
  ('total_items', 0),
  ('enriched_items', 0),
  ('pending_items', 0),
  ('total_keywords', 0),
  ('total_errors_today', 0);

-- Indexes for time-bounded queries
CREATE INDEX IF NOT EXISTS idx_api_calls_created_at ON api_calls(created_at);
CREATE INDEX IF NOT EXISTS idx_error_log_created_at ON error_log(created_at);
CREATE INDEX IF NOT EXISTS idx_items_created_at ON items(created_at);
CREATE INDEX IF NOT EXISTS idx_items_enriched_at ON items(enriched_at);

Monitoring Monitoring

A monitoring system that fails silently is worse than no monitoring. Here is how to monitor your monitoring.

Staleness Detection

The simplest check: is the latest snapshot too old?

app.get('/api/health/meta', async (c) => {
  const latest = await c.env.KV.get<Snapshot>('health:latest', 'json');
  const history = await c.env.KV.get<Snapshot[]>('health:snapshots', 'json');

  const now = Date.now();
  const latestAge = latest
    ? Math.round((now - new Date(latest.taken_at).getTime()) / 1000)
    : null;

  return c.json({
    latest_snapshot_age_seconds: latestAge,
    latest_snapshot_at: latest?.taken_at ?? null,
    history_count: history?.length ?? 0,
    history_oldest: history?.[0]?.taken_at ?? null,
    history_newest: history?.[history.length - 1]?.taken_at ?? null,
    monitoring_status: latestAge === null
      ? 'no_data'
      : latestAge < 600
        ? 'healthy'
        : latestAge < 1800
          ? 'delayed'
          : 'down',
  });
});

External Ping

Use an external uptime service (Better Uptime, UptimeRobot, or even a simple cron on another machine) to hit your /ping endpoint every 5 minutes. If it stops responding or reports cron_active: false, alert.

KV Write Failures

KV writes can fail silently if the value exceeds the 25 MB limit. Log the size of your snapshots array periodically:

async function computeAndStoreSnapshot(env: Env): Promise<void> {
  // ... compute health ...

  const history = (await env.KV.get<Snapshot[]>('health:snapshots', 'json')) ?? [];
  history.push(snapshot);
  while (history.length > 288) history.shift();

  const serialized = JSON.stringify(history);
  const sizeKB = Math.round(serialized.length / 1024);

  if (sizeKB > 20_000) {
    console.warn(`[monitoring] Snapshot history is ${sizeKB} KB — approaching 25 MB KV limit`);
  }

  await env.KV.put('health:snapshots', serialized);
  await env.KV.put('health:latest', JSON.stringify(snapshot));

  // Record monitoring health to Analytics Engine
  env.METRICS.writeDataPoint({
    blobs: ['monitoring', 'snapshot', 'success'],
    doubles: [history.length, sizeKB],
    indexes: ['monitoring'],
  });
}

Decision Framework

Use this flowchart to decide where to store your monitoring data:

Do you need to query arbitrary time ranges?
├── YES → Use Analytics Engine
│         ├── Need retention > 92 days? → Also roll up daily to D1
│         └── Need real-time? → Also use KV for latest snapshot
└── NO
    ├── Do you need current state only?
    │   └── YES → Use KV (health:latest)
    └── Do you need short-term history (24-48h)?
        └── YES → Use KV (health:snapshots array)
            └── Also need activity feed? → Use D1 with bounded time queries

The answer for most Workers pipelines is: KV for current state + short-term history, Analytics Engine for trending, D1 for activity feeds only.


References

Official Cloudflare Documentation

Cloudflare Blog Posts

External Tools


Built from production experience running continuous Workers pipelines. The patterns in this article eliminated $65+/month in D1 overages and reduced monitoring cost to effectively $0.


Edit page
Share this post on:

Previous Post
Event-Driven Architecture on Cloudflare Queues
Next Post
Autonomy Is Substrate Discipline