Org Status: 🟡 Dormant Cloudflare: N/A Last Audited: 2026-04-28
Worker Analytics — Lightweight Monitoring for Continuous Cloudflare Workers Pipelines
When you run continuous processes on Cloudflare Workers — cron jobs crawling data every minute, queue consumers enriching records, Durable Objects coordinating agent loops — you need to know what is happening. How many items processed? What is the error rate? How much are API calls costing? Is throughput increasing or flatlined? The naive answer is to write monitoring data to your D1 database. That answer will cost you $19.63 in a single billing cycle when your 21 COUNT(*) queries run every minute against a 6.4 GB database and generate 19.6 billion row reads.
This article shows how to build operational monitoring for Workers pipelines that costs essentially nothing. The core idea: use each Cloudflare storage primitive for what it is good at. D1 for transactional data. KV for operational snapshots. Analytics Engine for long-term time-series. Never COUNT(*) on a large table from a cron.
What you’ll learn:
- Why D1 is the wrong place for time-series monitoring data
- How to use KV rolling snapshots to serve dashboards with zero D1 reads
- When Analytics Engine is the right choice for metrics collection
- The hybrid storage approach: D1 + KV + Analytics Engine, each in its lane
- How to design health snapshots that are cheap to compute and cheap to read
- How to build Hono API routes that read from KV instead of D1
- Cost math comparing D1, KV, and Analytics Engine for monitoring workloads
- Anti-patterns that turn your monitoring into your biggest line item
Table of Contents
- The Problem
- The Incident That Started Everything
- Core Concepts
- Pattern 1: KV Rolling Snapshots
- Pattern 2: Health Snapshot Design
- Pattern 3: Dashboard Routes from KV
- Pattern 4: Analytics Engine for Time-Series
- Pattern 5: The Hybrid Storage Approach
- Pattern 6: Activity Feed Pattern
- Pattern 7: Precompute on Write, Not on Read
- Small Examples
- Example 1: Minimal Health Check Endpoint
- Example 2: Recording Queue Consumer Metrics
- Example 3: Durable Object Health Reporter
- Example 4: Cost Alerting with Analytics Engine
- Example 5: Multi-Pipeline Health Aggregator
- Example 6: Snapshot Diff — What Changed Since Last Check
- Example 7: KV Snapshot with TTL Auto-Cleanup
- Example 8: Analytics Engine Dashboard Route
- Example 9: Composite Health Score
- Example 10: Lightweight Uptime Ping
- Comparisons
- Anti-Patterns
- Cost Math
- Production Checklist
- Full Working Example
- Monitoring Monitoring
- Decision Framework
- References
The Problem
Continuous Workers Need Monitoring
A traditional request-response Worker is simple to monitor. It handles a request, returns a response, and Cloudflare’s built-in analytics show you request counts, error rates, and CPU time. You look at the dashboard, everything is fine.
But modern Workers architectures are not request-response. They are continuous systems:
- Cron triggers running every minute to crawl external APIs, sync data, compute aggregates
- Queue consumers processing messages in batches — enrichment pipelines, content publishing, notification dispatch
- Durable Objects maintaining state across requests — rate limiters, coordinators, budget governors
- Workflows executing multi-step durable pipelines — research chains, approval flows, data transformation
These systems run 24/7. They process thousands of items per day. They call external APIs that cost money. They accumulate data in D1 databases that grow from megabytes to gigabytes. And Cloudflare’s built-in Worker analytics tell you almost nothing about what they are actually doing.
The built-in dashboard shows:
- Total requests (but not “items enriched” or “crawl cycles completed”)
- CPU time (but not “API dollars spent” or “queue messages processed”)
- Error rate (but not “which agent failed on which stage”)
You need application-level monitoring. You need to know the shape of your pipeline’s work over time.
The Naive Approach Fails
The obvious solution: add monitoring queries to your cron. Every time the cron fires, run some COUNT(*) queries against your D1 tables, build a health object, and store it somewhere. You want to know how many apps you have, how many are enriched, how many keywords are pending, what the API cost trend looks like.
// The approach that seems reasonable but isn't
async function computeHealth(db: D1Database): Promise<HealthSnapshot> {
const [totalApps, enrichedApps, pendingApps, totalKeywords] = await Promise.all([
db.prepare('SELECT COUNT(*) as n FROM app_registry').first<{ n: number }>(),
db.prepare("SELECT COUNT(*) as n FROM app_registry WHERE enrichment_source = 'scraper'").first<{ n: number }>(),
db.prepare('SELECT COUNT(*) as n FROM app_registry WHERE enriched_at IS NULL').first<{ n: number }>(),
db.prepare('SELECT COUNT(*) as n FROM crawl_keywords').first<{ n: number }>(),
// ... 17 more queries
]);
return {
apps: { total: totalApps?.n ?? 0, enriched: enrichedApps?.n ?? 0 },
// ...
};
}
This looks fine. Each query is simple. They run in parallel. The results are useful. What could go wrong?
Everything, once the database grows.
D1’s Row Read Model
D1 charges by rows read, not by queries executed. A COUNT(*) on a table with 100 rows reads 100 rows. A COUNT(*) on a table with 500,000 rows reads 500,000 rows. There is no shortcut — D1 (SQLite under the hood) does a full table scan for COUNT(*) unless you have a covering index that matches the WHERE clause.
The math gets ugly fast:
- 21 queries per cron tick
- Each query scans a portion of a database with 500,000+ total rows
- Average rows read per tick: ~2,000,000
- Ticks per day: 1,440 (every minute)
- Rows read per day: 2.88 billion
- Rows read per week: 20.16 billion
- D1 included free: 25 billion per month
A single week of monitoring can consume 80% of your monthly row read allowance. At 2.88 billion reads per day, the 25 billion allowance is gone during the ninth day, and from then on you are paying overages.
Key insight: COUNT(*) is not free on D1. Every row in the table is a row read. Running COUNT(*) on a cron is a time bomb that detonates proportionally to your data growth.
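The arithmetic in this section can be checked with a small projection helper. This is only a sketch: `projectRowReads` and its field names are hypothetical, and the inputs are the figures quoted above (about 2 million rows per tick, 25 billion included reads per month).

```typescript
// Hypothetical helper: projects D1 row reads for a monitor that polls on a cron.
interface ReadProjection {
  rowsPerDay: number;
  rowsPerWeek: number;
  weeklyShareOfQuota: number;      // fraction of the monthly included reads used in 7 days
  daysUntilQuotaExhausted: number; // days until the included reads run out
}

function projectRowReads(
  rowsPerTick: number,
  ticksPerDay: number,
  includedRowsPerMonth: number,
): ReadProjection {
  const rowsPerDay = rowsPerTick * ticksPerDay;
  const rowsPerWeek = rowsPerDay * 7;
  return {
    rowsPerDay,
    rowsPerWeek,
    weeklyShareOfQuota: rowsPerWeek / includedRowsPerMonth,
    daysUntilQuotaExhausted: includedRowsPerMonth / rowsPerDay,
  };
}

// Figures from this section: every-minute ticks versus every-5-minute ticks.
const everyMinute = projectRowReads(2_000_000, 1_440, 25_000_000_000);
const everyFiveMinutes = projectRowReads(2_000_000, 288, 25_000_000_000);
console.log(everyMinute, everyFiveMinutes);
```

With these inputs the helper gives 2.88 billion reads per day at one-minute ticks and 576 million per day at five-minute ticks, which is why tick frequency matters as much as query shape.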
What We Actually Need
The monitoring system for a Workers pipeline needs to answer five questions:
- Is it running? — When was the last successful cycle? Is the cron firing?
- What is the throughput? — How many items processed in the last hour, last 24 hours?
- What are the costs? — How much API spend today? What is the burn rate?
- Are there errors? — Which agents failed? Which stages are stuck?
- What is the trend? — Is throughput increasing or decreasing over the last 24 hours?
None of these questions require full table scans. Most of them can be answered from a precomputed snapshot. And the snapshot itself does not need to live in D1.
The Incident That Started Everything
This is the short version. The full incident analysis is in Cost Observability for Cloudflare Workers.
What Happened
A project tracking App Store rankings had a Worker with this cron configuration:
// wrangler.jsonc
{
"triggers": {
"crons": ["* * * * *"] // Every minute
}
}
The cron handler did two things:
- Useful work: crawl app store categories, discover new apps, check for pending keywords
- Health snapshot: run 21 COUNT(*) queries across app_registry, crawl_keywords, app_reviews, agent_ledger, and api_calls_ledger, then write results to a scan_snapshots table in D1
The health snapshot ran every single minute. The database grew to 6.4 GB with over 500,000 rows across tables. After six days:
- 19.6 billion row reads consumed
- $19.63 in D1 overages on an account where the included 25 billion reads should last a full month
- Zero alerts — nobody noticed until checking the billing dashboard
The Root Cause
The monitoring system was more expensive than the system it was monitoring. The health snapshot function was doing full table scans on every invocation:
// Each of these is a full table scan on a table with 100,000+ rows
db.prepare('SELECT COUNT(*) as n FROM app_registry') // 160,000 rows read
db.prepare('SELECT COUNT(*) as n FROM crawl_keywords') // 45,000 rows read
db.prepare('SELECT COUNT(*) as n FROM app_reviews') // 280,000 rows read
db.prepare('SELECT COUNT(*) as n FROM api_calls_ledger') // 50,000 rows read
// ... plus 17 more queries with various WHERE clauses
Running 21 queries that scan ~2 million rows, 1,440 times per day, for six days = ~17 billion row reads from monitoring alone.
The Fix
Two changes eliminated the problem:
- Reduce frequency: Health snapshots every 5 minutes instead of every minute (5x reduction)
- Move storage to KV: Write the snapshot to KV instead of D1. Read from KV for the dashboard. Zero additional D1 reads for serving the monitoring data.
The D1 queries still run to compute the snapshot, but 288 times per day instead of 1,440. And the computed result lives in KV where reading it costs $0.20 per million reads instead of scanning the entire database again.
Core Concepts
Storage Primitives for Monitoring
Cloudflare gives you three storage primitives relevant to monitoring, and each has a different cost model and access pattern.
interface StoragePrimitive {
name: 'D1' | 'KV' | 'AnalyticsEngine';
bestFor: string;
costModel: string;
readLatency: string;
writeLatency: string;
queryCapability: string;
}
const primitives: StoragePrimitive[] = [
{
name: 'D1',
bestFor: 'Transactional data — app records, keywords, user accounts',
costModel: 'Per row read/written — $0.001/M reads, $1.00/M writes',
readLatency: '~5-20ms (SQLite)',
writeLatency: '~10-30ms',
queryCapability: 'Full SQL — joins, aggregates, window functions',
},
{
name: 'KV',
bestFor: 'Key-value lookups — config, snapshots, cached results',
costModel: 'Per operation — $0.20/M reads, $1.00/M writes',
readLatency: '~1-5ms (edge cached)',
writeLatency: '~1-5ms (eventually consistent, ~60s propagation)',
queryCapability: 'None — get by key only, list by prefix',
},
{
name: 'AnalyticsEngine',
bestFor: 'Time-series metrics — counters, gauges, histograms',
costModel: '$0.25/M data points written, $1.00/M queries',
readLatency: '~50-200ms (SQL API over HTTP)',
writeLatency: '~0ms (fire-and-forget, no await needed)',
queryCapability: 'SQL subset — aggregates, time bucketing, no joins',
},
];
Key insight: The right storage for monitoring data depends on the access pattern. If you are writing metrics and reading dashboards, you want KV or Analytics Engine. D1 is for your application data, not your monitoring data.
The Snapshot Pattern
The core pattern in this article is the precomputed snapshot. Instead of computing monitoring data on every dashboard load (read-time computation), you compute it on a schedule and store the result (write-time computation).
interface HealthSnapshot {
taken_at: string; // ISO timestamp
health: {
counts: Record<string, number>; // Current totals
velocity: Record<string, number>; // Items per time window
costs: {
today: { usd: number; calls: number };
rate: { per_hour: number; per_day_estimate: number };
};
agents: Record<string, AgentStatus>; // Agent-specific stats
errors: { last_hour: number; last_24h: number };
};
}
// Write-time: compute once, store cheaply
// Read-time: return instantly from KV
The tradeoff is freshness. A snapshot computed every 5 minutes means your dashboard data can be up to 5 minutes stale. For operational monitoring of a data pipeline, this is fine. You do not need sub-second freshness for “how many apps have been enriched.”
KV As a Metrics Store
KV is not designed to be a metrics store. It has no query language, no aggregation, no time bucketing. But for the specific use case of “store the latest health snapshot and serve it to a dashboard,” it is perfect:
- Writes are cheap — $1.00 per million writes. At 288 writes per day (5-minute intervals), you use 8,640 writes per month. That is 0.86% of one million.
- Reads are cheap — $0.20 per million reads. Even if you hit the dashboard 1,000 times a day, that is 30,000 reads per month.
- Reads are fast — KV values are cached at the edge. A dashboard loading from KV gets sub-5ms response times.
- No query cost — Unlike D1, there is no concept of “rows scanned.” You pay per key read, regardless of the value size.
// Writing a snapshot to KV — 1 write operation
await kv.put('health:latest', JSON.stringify(snapshot));
// Reading a snapshot from KV — 1 read operation
const snapshot = await kv.get<HealthSnapshot>('health:latest', 'json');
Compare this with the D1 approach: reading the same data from D1 means re-running 21 queries that scan millions of rows. The KV approach costs one read operation. Period.
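To make the KV numbers above reproducible, here is a minimal sketch of the monthly operation count. `monthlyKvUsage` and its option names are hypothetical. The per-million rates are passed in as parameters rather than hard-coded, so they can be checked against current pricing, and the sketch ignores any included monthly allowance.

```typescript
// Hypothetical helper: estimates monthly KV operations for the snapshot pattern.
interface KvUsage {
  writesPerMonth: number;
  readsPerMonth: number;
  estimatedUsd: number; // before any included allowance is subtracted
}

function monthlyKvUsage(opts: {
  snapshotsPerDay: number;
  writesPerSnapshot: number;   // 1 if only health:latest is written
  dashboardReadsPerDay: number;
  usdPerMillionReads: number;  // assumption: supplied by the caller
  usdPerMillionWrites: number; // assumption: supplied by the caller
  daysPerMonth?: number;
}): KvUsage {
  const days = opts.daysPerMonth ?? 30;
  const writesPerMonth = opts.snapshotsPerDay * opts.writesPerSnapshot * days;
  const readsPerMonth = opts.dashboardReadsPerDay * days;
  const estimatedUsd =
    (readsPerMonth / 1_000_000) * opts.usdPerMillionReads +
    (writesPerMonth / 1_000_000) * opts.usdPerMillionWrites;
  return { writesPerMonth, readsPerMonth, estimatedUsd };
}

// Inputs taken from the bullets above: 288 snapshots/day, 1,000 dashboard loads/day.
const usage = monthlyKvUsage({
  snapshotsPerDay: 288,
  writesPerSnapshot: 1,
  dashboardReadsPerDay: 1_000,
  usdPerMillionReads: 0.2,
  usdPerMillionWrites: 1.0,
});
console.log(usage);
```

If a cycle writes more than one key, raise `writesPerSnapshot` accordingly; the total stays a small fraction of one million operations either way.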
Analytics Engine As a Time-Series Store
Workers Analytics Engine is Cloudflare’s built-in time-series database. It was designed for exactly this use case: recording events from Workers and querying them over time.
// Writing a data point — fire and forget, no await needed
env.METRICS.writeDataPoint({
blobs: ['enrichment', 'app_registry', 'success'], // dimensions
doubles: [1, 0.023, 45], // count, cost_usd, duration_ms
indexes: ['aso-mrr'], // sampling key
});
Analytics Engine gives you:
- Free writes: 10 million data points per month included on the paid plan (not yet billed as of early 2026)
- SQL queries: Real aggregation, time bucketing, filtering by dimensions
- Automatic timestamps: Every data point gets a timestamp, no schema management
- 92-day retention: Data is kept for approximately 3 months
- Zero-latency writes: writeDataPoint() returns immediately; the runtime handles persistence in the background
The limitation is that you cannot query Analytics Engine from a binding. You must call the SQL API over HTTP, which means an external fetch with authentication.
Pattern 1: KV Rolling Snapshots
The Pattern
Store health data in two KV keys:
- health:latest holds the most recent snapshot, overwritten every cycle
- health:snapshots holds a rolling array of recent snapshots, trimmed to a fixed window
This gives you both “what is the current state?” (latest) and “what has the trend been?” (snapshots array) without any D1 reads on the dashboard side.
When to Use It
- You need a dashboard showing current pipeline health
- You want trend data for the last 24-48 hours
- You do not need arbitrary time-range queries or SQL aggregation
- You want zero D1 reads when loading the dashboard
The Write Side
This is the function that runs on the cron. It computes the snapshot from D1 (this is the only place D1 reads happen for monitoring), then writes the result to KV.
interface Env {
DB: D1Database;
KV: KVNamespace;
}
interface HealthData {
counts: {
total_items: number;
enriched_items: number;
pending_items: number;
total_keywords: number;
};
velocity: {
items_last_hour: number;
items_last_24h: number;
enriched_last_hour: number;
enriched_last_24h: number;
est_days_to_completion: number | null;
};
costs: {
today_usd: number;
today_calls: number;
last_24h_usd: number;
rate_per_hour: number;
rate_per_day_estimate: number;
by_service: Record<string, { usd: number; calls: number }>;
};
errors: {
last_hour: number;
last_24h: number;
};
}
interface Snapshot {
health: HealthData;
taken_at: string;
}
async function computeAndStoreHealthSnapshot(
db: D1Database,
kv: KVNamespace,
): Promise<HealthData> {
// ── Compute the snapshot from D1 ──────────────────────────────
// This is the ONLY place we run COUNT(*) queries.
// It runs every 5 minutes, not every minute.
// It runs on the cron, not on dashboard load.
const [
totalItems,
enrichedItems,
pendingItems,
totalKeywords,
itemsLastHour,
itemsLast24h,
enrichedLastHour,
enrichedLast24h,
costToday,
costLast24h,
costByService,
errorsLastHour,
errorsLast24h,
] = await Promise.all([
db.prepare('SELECT COUNT(*) as n FROM items').first<{ n: number }>(),
db.prepare("SELECT COUNT(*) as n FROM items WHERE status = 'enriched'")
.first<{ n: number }>(),
db.prepare("SELECT COUNT(*) as n FROM items WHERE status = 'pending'")
.first<{ n: number }>(),
db.prepare('SELECT COUNT(*) as n FROM keywords').first<{ n: number }>(),
db.prepare(
"SELECT COUNT(*) as n FROM items WHERE created_at >= datetime('now', '-1 hour')",
).first<{ n: number }>(),
db.prepare(
"SELECT COUNT(*) as n FROM items WHERE created_at >= datetime('now', '-24 hours')",
).first<{ n: number }>(),
db.prepare(
"SELECT COUNT(*) as n FROM items WHERE enriched_at >= datetime('now', '-1 hour')",
).first<{ n: number }>(),
db.prepare(
"SELECT COUNT(*) as n FROM items WHERE enriched_at >= datetime('now', '-24 hours')",
).first<{ n: number }>(),
db.prepare(
"SELECT COALESCE(SUM(cost_usd), 0) as cost, COUNT(*) as calls FROM api_calls WHERE created_at >= date('now')",
).first<{ cost: number; calls: number }>(),
db.prepare(
"SELECT COALESCE(SUM(cost_usd), 0) as cost, COUNT(*) as calls FROM api_calls WHERE created_at >= datetime('now', '-24 hours')",
).first<{ cost: number; calls: number }>(),
db.prepare(`
SELECT service,
COALESCE(SUM(cost_usd), 0) as cost,
COUNT(*) as calls
FROM api_calls
WHERE created_at >= date('now')
GROUP BY service
ORDER BY cost DESC
`).all(),
db.prepare(
"SELECT COUNT(*) as n FROM error_log WHERE created_at >= datetime('now', '-1 hour')",
).first<{ n: number }>(),
db.prepare(
"SELECT COUNT(*) as n FROM error_log WHERE created_at >= datetime('now', '-24 hours')",
).first<{ n: number }>(),
]);
const last24hCost = costLast24h?.cost ?? 0;
const avgCostPerHour = last24hCost / 24;
const health: HealthData = {
counts: {
total_items: totalItems?.n ?? 0,
enriched_items: enrichedItems?.n ?? 0,
pending_items: pendingItems?.n ?? 0,
total_keywords: totalKeywords?.n ?? 0,
},
velocity: {
items_last_hour: itemsLastHour?.n ?? 0,
items_last_24h: itemsLast24h?.n ?? 0,
enriched_last_hour: enrichedLastHour?.n ?? 0,
enriched_last_24h: enrichedLast24h?.n ?? 0,
est_days_to_completion:
(enrichedLast24h?.n ?? 0) > 0
? Math.round((pendingItems?.n ?? 0) / (enrichedLast24h?.n ?? 1))
: null,
},
costs: {
today_usd: round4(costToday?.cost ?? 0),
today_calls: costToday?.calls ?? 0,
last_24h_usd: round4(last24hCost),
rate_per_hour: round4(avgCostPerHour),
rate_per_day_estimate: round4(avgCostPerHour * 24),
by_service: Object.fromEntries(
costByService.results.map((r) => [
r.service as string,
{ usd: round4(r.cost as number), calls: r.calls as number },
]),
),
},
errors: {
last_hour: errorsLastHour?.n ?? 0,
last_24h: errorsLast24h?.n ?? 0,
},
};
// ── Write to KV ───────────────────────────────────────────────
const now = new Date().toISOString();
const snapshot: Snapshot = { health, taken_at: now };
// Latest — always overwrite
await kv.put('health:latest', JSON.stringify(snapshot));
// Rolling history — keep last 288 entries (~24h at 5-min intervals)
const existing = await kv.get<Snapshot[]>('health:snapshots', 'json');
const history = existing ?? [];
history.push(snapshot);
// Trim to window
while (history.length > 288) history.shift();
await kv.put('health:snapshots', JSON.stringify(history));
return health;
}
function round4(n: number): number {
return Math.round(n * 10000) / 10000;
}
The Cron Handler
The snapshot computation runs inside the cron handler, but not on every tick. Every 5 minutes is enough for operational monitoring.
export default {
async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
if (event.cron === '* * * * *') {
// Useful work — runs every minute
ctx.waitUntil(runCrawlCycle(env));
// Health snapshot — runs every 5 minutes
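// Use the scheduled time, not the wall clock, so a late-starting invocation
// still lands on the intended 5-minute boundary
const minute = new Date(event.scheduledTime).getUTCMinutes();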
if (minute % 5 === 0) {
ctx.waitUntil(
computeAndStoreHealthSnapshot(env.DB, env.KV).catch((e) =>
console.error('[cron] health snapshot failed:', e),
),
);
}
}
},
fetch: app.fetch,
};
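The "every Nth tick" check can be pulled out into a pure function, which makes it easy to test without deploying. The sketch below assumes the handler passes in the event's scheduled time in milliseconds (the `scheduledTime` property of the scheduled event); the name `isSnapshotTick` is hypothetical.

```typescript
// Hypothetical helper: true when a tick's scheduled time falls on an
// interval boundary. Counting minutes since the Unix epoch (instead of
// minute-of-hour) keeps the spacing regular even for intervals that do
// not divide 60 evenly.
function isSnapshotTick(scheduledTimeMs: number, intervalMinutes: number): boolean {
  const minutesSinceEpoch = Math.floor(scheduledTimeMs / 60_000);
  return minutesSinceEpoch % intervalMinutes === 0;
}

// 12:05 UTC is on a 5-minute boundary, 12:06 UTC is not.
const onBoundary = isSnapshotTick(Date.UTC(2026, 3, 28, 12, 5, 0), 5);
const offBoundary = isSnapshotTick(Date.UTC(2026, 3, 28, 12, 6, 0), 5);
console.log(onBoundary, offBoundary);
```

Because the decision depends only on the scheduled time, an invocation that starts a few seconds late still makes the same choice it would have made on time.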
Why 288 Entries
The magic number 288 comes from 24 hours divided by 5-minute intervals:
24 hours * 60 minutes / 5-minute interval = 288 snapshots
This gives you a complete 24-hour rolling window. Each snapshot is roughly 1-3 KB of JSON, so the full array is 300-900 KB — well within KV’s 25 MB value limit.
If you want 48 hours of history, use 576. If you want a week, use 2,016. The limiting factor is the KV value size, not the cost.
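The window arithmetic and the trim step can be sketched as three small pure functions. All three names are hypothetical, and the size estimate assumes a fixed average snapshot size, which real snapshots will only approximate.

```typescript
// Number of snapshots needed to cover `hours` at a fixed interval.
function windowSize(hours: number, intervalMinutes: number): number {
  return Math.ceil((hours * 60) / intervalMinutes);
}

// Append an entry and keep only the newest `maxEntries`, without mutating the input.
function appendToWindow<T>(history: readonly T[], entry: T, maxEntries: number): T[] {
  if (maxEntries <= 0) return [];
  return [...history, entry].slice(-maxEntries);
}

// Rough payload estimate for the rolling array, assuming a fixed average size.
function estimatedBytes(entries: number, avgSnapshotBytes: number): number {
  return entries * avgSnapshotBytes;
}

const dayWindow = windowSize(24, 5);
const weekWindow = windowSize(24 * 7, 5);
const trimmed = appendToWindow([1, 2, 3], 4, 3);
console.log(dayWindow, weekWindow, trimmed, estimatedBytes(dayWindow, 3_000));
```

Estimating the payload before widening the window is a cheap way to confirm the array stays comfortably under the KV value size limit.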
Gotchas
- KV eventual consistency: KV writes propagate globally in roughly 60 seconds. If you have dashboards in multiple regions, a snapshot written in US-East might take a minute to be visible in EU-West. For operational monitoring, this is irrelevant.
- JSON array manipulation: Reading the full health:snapshots array, appending to it, and writing it back is not atomic. If two cron invocations overlap (unlikely at 5-minute intervals, but possible), you could lose a snapshot. This does not matter for monitoring data.
- KV value size limit: A single KV value can be up to 25 MB. With 3 KB snapshots, you can store ~8,000 entries (about 28 days at 5-minute intervals). If your snapshot JSON is large, keep the window smaller.
- No query capability: You cannot ask KV “what was the average throughput between 2pm and 4pm yesterday.” The snapshots array is just JSON, so you filter it client-side or in your Worker route. For more sophisticated queries, use Analytics Engine.
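For simple questions, filtering the array in the Worker is often enough. The sketch below averages one velocity field over a time range. `averageItemsPerHour` and the trimmed-down `VelocitySnapshot` type are hypothetical, and the comparison assumes every `taken_at` value was produced by `toISOString()` so that string order matches time order.

```typescript
// Only the fields this example needs from a stored snapshot.
interface VelocitySnapshot {
  taken_at: string; // ISO 8601 UTC, e.g. "2026-04-27T14:00:00.000Z"
  health: { velocity: { items_last_hour: number } };
}

// Average items_last_hour over snapshots taken in [fromIso, toIso).
// Returns null when no snapshot falls inside the range.
function averageItemsPerHour(
  snapshots: readonly VelocitySnapshot[],
  fromIso: string,
  toIso: string,
): number | null {
  const inRange = snapshots.filter((s) => s.taken_at >= fromIso && s.taken_at < toIso);
  if (inRange.length === 0) return null;
  const sum = inRange.reduce((acc, s) => acc + s.health.velocity.items_last_hour, 0);
  return sum / inRange.length;
}

const sample: VelocitySnapshot[] = [
  { taken_at: '2026-04-27T13:55:00.000Z', health: { velocity: { items_last_hour: 10 } } },
  { taken_at: '2026-04-27T14:00:00.000Z', health: { velocity: { items_last_hour: 20 } } },
  { taken_at: '2026-04-27T15:00:00.000Z', health: { velocity: { items_last_hour: 40 } } },
  { taken_at: '2026-04-27T16:00:00.000Z', health: { velocity: { items_last_hour: 99 } } },
];
const afternoonAvg = averageItemsPerHour(
  sample,
  '2026-04-27T14:00:00.000Z',
  '2026-04-27T16:00:00.000Z',
);
console.log(afternoonAvg);
```

This only works for ranges inside the rolling window; anything older has already been trimmed, which is where Analytics Engine takes over.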
Pattern 2: Health Snapshot Design
The Pattern
Design your health snapshot to capture everything a human operator needs to assess pipeline health in a single read. Structure it as a JSON object with predictable sections: counts, velocity, costs, agents, errors.
When to Use It
- You are building a dashboard for a data processing pipeline
- You want to answer “is everything OK?” with one API call
- Multiple subsystems contribute to the health picture (crawlers, enrichers, agents)
Snapshot Shape
The health snapshot from a real production system (the ASO MRR pipeline) includes these sections:
interface ProductionHealthSnapshot {
// ── Inventory ─────────────────────────────────────────────────
apps: {
total: number;
enriched: number;
pending: number;
pct_enriched: number; // Precomputed percentage
};
keywords: {
total: number;
pending: number;
};
clusters: number;
reviews: {
total: number;
apps_covered: number;
};
// ── Velocity ──────────────────────────────────────────────────
velocity: {
apps_last_hour: number;
apps_last_24h: number;
keywords_last_hour: number;
keywords_last_24h: number;
enriched_last_hour: number;
enriched_last_24h: number;
reviews_last_24h: number;
est_days_to_full_enrichment: number | null; // null if enrichment is paused
};
// ── Agent Status ──────────────────────────────────────────────
agents: Record<string, {
stage: string;
processed: number;
last_run: string;
avg_duration_ms: number;
error_rate: number;
}>;
// ── Costs ─────────────────────────────────────────────────────
costs: {
all_time: { usd: number; calls: number };
today: { usd: number; calls: number };
last_24h: { usd: number; calls: number };
rate: {
per_hour: number;
per_day_estimate: number;
};
by_service: Record<string, {
all_time: number;
today: number;
calls: number;
calls_today: number;
}>;
hourly_trend: Array<{
hour: string;
usd: number;
calls: number;
}>;
};
}
Design Principles
1. Precompute derived values
Do not store raw data and compute percentages on the dashboard. Compute them in the snapshot:
// Good — precomputed
pct_enriched: totalApps?.n
? Math.round(((enrichedApps?.n ?? 0) / totalApps.n) * 100)
: 0,
// Bad — forces the dashboard to recompute
// pct_enriched: undefined (let the frontend do the math)
2. Include time windows, not just totals
Totals tell you “how much exists.” Velocities tell you “how fast things are moving.” Both are essential:
velocity: {
apps_last_hour: 12, // 12 new apps discovered in the last hour
apps_last_24h: 287, // 287 new apps in the last day
enriched_last_hour: 5, // 5 apps enriched in the last hour
enriched_last_24h: 120, // 120 enriched in the last day
}
3. Include cost projections
Knowing “we spent $0.05 so far today” is less useful than “at the current $0.005/hour we will spend about $0.12 by end of day”:
rate: {
per_hour: 0.005, // $0.005/hour current burn rate
per_day_estimate: 0.12, // projected daily spend
}
4. Include ETA calculations
For batch processing pipelines, the most important number is “when will it be done?”:
est_days_to_full_enrichment:
enrichedLast24h > 0
? Math.round(pendingItems / enrichedLast24h)
: null, // null = enrichment is paused, can't estimate
5. Round monetary values
Floating point math produces numbers like 0.023499999999999997. Round to 4 decimal places for display:
function round4(n: number): number {
return Math.round(n * 10000) / 10000;
}
// $0.0235 not $0.023499999999999997
costs.today.usd = round4(rawCost);
Minimizing D1 Cost of Snapshot Computation
Even though the snapshot runs only every 5 minutes, each computation still hits D1 with multiple queries. Here are techniques to reduce the cost:
Use indexed columns in WHERE clauses
-- Expensive: full table scan
SELECT COUNT(*) FROM app_registry WHERE enrichment_source = 'scraper'
-- Cheaper: if enrichment_source is indexed, D1 uses the index
CREATE INDEX idx_enrichment_source ON app_registry(enrichment_source);
Use time-bounded queries with indexed timestamps
-- Cheaper than COUNT(*) on the whole table
SELECT COUNT(*) FROM app_registry
WHERE created_at >= datetime('now', '-1 hour')
-- Only works well if created_at is indexed
CREATE INDEX idx_created_at ON app_registry(created_at);
Batch related queries
// Instead of 21 individual queries, use compound queries where possible
const stats = await db.prepare(`
SELECT
COUNT(*) as total,
COUNT(CASE WHEN status = 'enriched' THEN 1 END) as enriched,
COUNT(CASE WHEN status = 'pending' THEN 1 END) as pending,
COUNT(CASE WHEN created_at >= datetime('now', '-1 hour') THEN 1 END) as last_hour,
COUNT(CASE WHEN created_at >= datetime('now', '-24 hours') THEN 1 END) as last_24h
FROM items
`).first();
Key insight: A single query with conditional COUNT expressions scans the table once instead of five times. Five COUNT(*) queries with different WHERE clauses = five full table scans. One query with five CASE WHEN expressions = one full table scan.
Consider maintaining a counters table
For the most expensive counts (total items, total enriched), maintain a separate counters table that is updated on writes:
// On every item insert
await db.batch([
db.prepare('INSERT INTO items (id, name, status) VALUES (?, ?, ?)').bind(id, name, 'pending'),
db.prepare("UPDATE counters SET value = value + 1 WHERE name = 'total_items'"),
]);
// On every enrichment
await db.batch([
db.prepare("UPDATE items SET status = 'enriched', enriched_at = datetime('now') WHERE id = ?").bind(id),
db.prepare("UPDATE counters SET value = value + 1 WHERE name = 'enriched_items'"),
]);
// In the snapshot: one row read per counter instead of 500,000
const counts = await db.prepare('SELECT name, value FROM counters').all();
This is the precompute-on-write pattern applied to D1 internals.
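Two details are easy to miss with a counters table: the rows must exist before the `UPDATE ... SET value = value + 1` statements can change anything, and the snapshot code needs the rows as a lookup object. The sketch below covers both. The schema, the backfill statement, and the `countersToRecord` helper are assumptions for illustration, not the original project's migration.

```typescript
// Assumed schema: one row per named counter.
const CREATE_COUNTERS_SQL =
  'CREATE TABLE IF NOT EXISTS counters (name TEXT PRIMARY KEY, value INTEGER NOT NULL DEFAULT 0)';

// Assumed one-time backfill: seeds a counter from the current table size.
// An UPDATE on a missing row changes nothing, so seed before relying on increments.
const BACKFILL_TOTAL_ITEMS_SQL =
  "INSERT OR REPLACE INTO counters (name, value) SELECT 'total_items', COUNT(*) FROM items";

interface CounterRow {
  name: string;
  value: number;
}

// Turn the rows from `SELECT name, value FROM counters` into a lookup object.
function countersToRecord(rows: readonly CounterRow[]): Record<string, number> {
  const out: Record<string, number> = {};
  for (const row of rows) {
    out[row.name] = row.value;
  }
  return out;
}

const lookup = countersToRecord([
  { name: 'total_items', value: 500_000 },
  { name: 'enriched_items', value: 120_000 },
]);
console.log(CREATE_COUNTERS_SQL, BACKFILL_TOTAL_ITEMS_SQL, lookup);
```

The backfill itself is a full scan, but it runs once rather than on every cron tick.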
Pattern 3: Dashboard Routes from KV
The Pattern
Build Hono API routes that serve monitoring data from KV instead of running live D1 queries. The dashboard calls these routes, gets instant responses, and never touches D1.
When to Use It
- You have a web dashboard or CLI tool that displays pipeline health
- You want sub-5ms response times for monitoring endpoints
- You want to serve monitoring data without consuming D1 row reads
Route Structure
import { Hono } from 'hono';
interface Env {
DB: D1Database;
KV: KVNamespace;
}
const app = new Hono<{ Bindings: Env }>();
// ── Latest Health (from KV) ─────────────────────────────────────
// Cost: 1 KV read = $0.0000002
app.get('/api/health', async (c) => {
const snapshot = await c.env.KV.get<Snapshot>('health:latest', 'json');
if (!snapshot) {
return c.json({ error: 'No snapshot available yet' }, 404);
}
return c.json({
health: snapshot.health,
snapshot_at: snapshot.taken_at,
freshness_seconds: Math.round(
(Date.now() - new Date(snapshot.taken_at).getTime()) / 1000,
),
});
});
// ── Snapshot History (from KV) ──────────────────────────────────
// Cost: 1 KV read = $0.0000002
app.get('/api/health/history', async (c) => {
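// Parse defensively: a missing or non-numeric value falls back to 24 hours
// (a NaN here would make toISOString() below throw)
const hours = Number.parseInt(c.req.query('hours') ?? '24', 10) || 24;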
const snapshots = await c.env.KV.get<Snapshot[]>('health:snapshots', 'json');
if (!snapshots) {
return c.json({ snapshots: [], count: 0 });
}
const cutoff = new Date(Date.now() - hours * 60 * 60 * 1000).toISOString();
const filtered = snapshots.filter((s) => s.taken_at >= cutoff);
return c.json({
snapshots: filtered,
count: filtered.length,
window_hours: hours,
});
});
// ── Velocity Chart Data (from KV) ──────────────────────────────
// Transforms snapshot history into chart-ready time series
app.get('/api/health/velocity', async (c) => {
const snapshots = await c.env.KV.get<Snapshot[]>('health:snapshots', 'json');
if (!snapshots || snapshots.length === 0) {
return c.json({ series: [] });
}
const series = snapshots.map((s) => ({
time: s.taken_at,
items_per_hour: s.health.velocity.items_last_hour,
enriched_per_hour: s.health.velocity.enriched_last_hour,
cost_per_hour: s.health.costs.rate_per_hour,
}));
return c.json({ series });
});
// ── Cost Breakdown (from KV) ────────────────────────────────────
app.get('/api/health/costs', async (c) => {
const snapshot = await c.env.KV.get<Snapshot>('health:latest', 'json');
if (!snapshot) {
return c.json({ error: 'No snapshot available' }, 404);
}
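// HealthData stores cost fields flat (today_usd, rate_per_hour, ...),
// so reshape them here for the dashboard
const costs = snapshot.health.costs;
return c.json({
today: { usd: costs.today_usd, calls: costs.today_calls },
last_24h_usd: costs.last_24h_usd,
rate: { per_hour: costs.rate_per_hour, per_day_estimate: costs.rate_per_day_estimate },
by_service: costs.by_service,
snapshot_at: snapshot.taken_at,
});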
});
export default app;
Adding Freshness Indicators
Stale data is dangerous. If the cron stops firing, the dashboard should tell you the data is old:
app.get('/api/health', async (c) => {
const snapshot = await c.env.KV.get<Snapshot>('health:latest', 'json');
if (!snapshot) {
return c.json({ status: 'no_data', error: 'No snapshot available' }, 404);
}
const ageSeconds = Math.round(
(Date.now() - new Date(snapshot.taken_at).getTime()) / 1000,
);
// Stale if more than 10 minutes old (2x the 5-minute interval)
const status = ageSeconds > 600 ? 'stale' : 'fresh';
// Very stale if more than 30 minutes old (cron probably stopped)
const warning = ageSeconds > 1800
? 'Health snapshot is over 30 minutes old. Check if the cron is running.'
: undefined;
return c.json({
status,
warning,
health: snapshot.health,
snapshot_at: snapshot.taken_at,
age_seconds: ageSeconds,
});
});
Response Headers for Caching
Since the data is already precomputed and at most 5 minutes stale, you can add cache headers so that browsers and any cache in front of the Worker can reuse a response instead of triggering another KV read:
app.get('/api/health', async (c) => {
const snapshot = await c.env.KV.get<Snapshot>('health:latest', 'json');
if (!snapshot) return c.json({ error: 'No data' }, 404);
// Cache for 60 seconds — the data only changes every 5 minutes anyway
c.header('Cache-Control', 'public, max-age=60, s-maxage=60');
c.header('CDN-Cache-Control', 'max-age=60');
return c.json({
health: snapshot.health,
snapshot_at: snapshot.taken_at,
});
});
Gotchas
- 404 on first deploy: The very first time you deploy, there is no snapshot in KV yet. The cron has not fired. Handle the 404 gracefully in your dashboard.
- Large snapshot arrays: If your snapshot JSON is 3 KB and you have 288 of them, the health:snapshots value is ~860 KB. KV reads are billed per operation regardless of value size, so this is fine cost-wise, but it means every history request downloads the full array. Consider client-side caching.
- No partial reads: You cannot read “just the last 6 hours” from KV. You always read the full array and filter in the Worker. If the array is large, this adds latency. The alternative is to split into multiple keys (one per hour), but that complicates the write side.
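If you do split the history into one key per hour, the key naming can be sketched as below. The key scheme and both function names are hypothetical. The write side would append each snapshot to its hour's key, and a history route would read only the keys covering the requested window.

```typescript
// Hypothetical key scheme: one KV key per UTC hour.
// "2026-04-28T14:05:00.000Z" maps to "health:snapshots:2026-04-28T14".
function hourBucketKey(takenAtIso: string, prefix = 'health:snapshots:'): string {
  return prefix + takenAtIso.slice(0, 13);
}

// Keys needed to fully cover the last `hours` hours, oldest first.
// The window usually starts mid-hour, so this returns hours + 1 keys.
function keysForLastHours(
  nowMs: number,
  hours: number,
  prefix = 'health:snapshots:',
): string[] {
  const keys: string[] = [];
  for (let i = hours; i >= 0; i--) {
    const bucketTime = new Date(nowMs - i * 3_600_000).toISOString();
    keys.push(hourBucketKey(bucketTime, prefix));
  }
  return keys;
}

const bucket = hourBucketKey('2026-04-28T14:05:00.000Z');
const lastTwoHours = keysForLastHours(Date.UTC(2026, 3, 28, 14, 30, 0), 2);
console.log(bucket, lastTwoHours);
```

The tradeoff is more KV reads per history request (one per hour bucket) in exchange for smaller payloads, so it only pays off once the single array becomes noticeably slow to download.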
Pattern 4: Analytics Engine for Time-Series
The Pattern
Use Workers Analytics Engine to record individual events as data points. Query them with the SQL API for dashboards, alerts, or long-term trending.
When to Use It
- You need to query arbitrary time ranges (“show me throughput between 2pm and 4pm last Tuesday”)
- You want SQL aggregation over time-series data (averages, percentiles, bucketing)
- You need data retention beyond 24-48 hours (Analytics Engine retains ~92 days)
- You want to build Grafana dashboards with custom queries
Binding Configuration
// wrangler.jsonc
{
"analytics_engine_datasets": [
{
"binding": "METRICS",
"dataset": "pipeline_metrics"
}
]
}
Writing Data Points
Analytics Engine data points have three fields: blobs (string dimensions), doubles (numeric values), and indexes (sampling keys).
interface Env {
METRICS: AnalyticsEngineDataset;
}
// Record a crawl cycle completion
function recordCrawlCycle(env: Env, stats: CrawlStats) {
env.METRICS.writeDataPoint({
blobs: [
'crawl_cycle', // blob1: event type
stats.source, // blob2: data source
stats.status, // blob3: success/failure
],
doubles: [
stats.items_discovered, // double1: count of new items
stats.items_enriched, // double2: count enriched
stats.duration_ms, // double3: cycle duration
stats.api_cost_usd, // double4: API cost
],
indexes: [stats.pipeline_id], // sampling key
});
}
// Record an API call with cost
function recordApiCall(env: Env, service: string, cost: number, success: boolean) {
env.METRICS.writeDataPoint({
blobs: [
'api_call', // blob1: event type
service, // blob2: service name (e.g., 'dataforseo')
success ? 'success' : 'error', // blob3: outcome
],
doubles: [
1, // double1: count (always 1 for individual events)
cost, // double2: cost in USD
],
indexes: [service],
});
}
// Record an error
function recordError(env: Env, component: string, errorType: string) {
env.METRICS.writeDataPoint({
blobs: [
'error', // blob1: event type
component, // blob2: which component failed
errorType, // blob3: error classification
],
doubles: [1], // double1: count
indexes: [component],
});
}
Key insight: writeDataPoint() does not need to be awaited. It returns immediately and the Workers runtime persists the data point asynchronously, so it adds essentially no latency to your Worker’s response.
Querying from a Worker
To query Analytics Engine from inside a Worker, you call the SQL API over HTTP. This requires your account ID and an API token with the Account Analytics: Read permission, stored as a Worker secret.
interface Env {
ACCOUNT_ID: string;
ANALYTICS_API_TOKEN: string;
}
async function queryAnalyticsEngine(
env: Env,
sql: string,
): Promise<AnalyticsQueryResult> {
const url = `https://api.cloudflare.com/client/v4/accounts/${env.ACCOUNT_ID}/analytics_engine/sql`;
const response = await fetch(url, {
method: 'POST',
headers: {
Authorization: `Bearer ${env.ANALYTICS_API_TOKEN}`,
},
body: sql,
});
if (!response.ok) {
const text = await response.text();
throw new Error(`Analytics Engine query failed: ${response.status} ${text}`);
}
return response.json();
}
interface AnalyticsQueryResult {
data: Record<string, unknown>[];
meta: Array<{ name: string; type: string }>;
rows: number;
}
Example Queries
Throughput by hour for the last 24 hours:
SELECT
toStartOfHour(timestamp) AS hour,
SUM(_sample_interval * double1) AS items_discovered,
SUM(_sample_interval * double2) AS items_enriched
FROM pipeline_metrics
WHERE
blob1 = 'crawl_cycle'
AND timestamp > NOW() - INTERVAL '24' HOUR
GROUP BY hour
ORDER BY hour DESC
API cost by service for the last 24 hours:
SELECT
blob2 AS service,
SUM(_sample_interval * double2) AS total_cost_usd,
SUM(_sample_interval * double1) AS total_calls
FROM pipeline_metrics
WHERE
blob1 = 'api_call'
AND timestamp > NOW() - INTERVAL '24' HOUR
GROUP BY service
ORDER BY total_cost_usd DESC
Error rate by component, last 6 hours:
SELECT
blob2 AS component,
SUM(_sample_interval * double1) AS error_count,
toStartOfHour(timestamp) AS hour
FROM pipeline_metrics
WHERE
blob1 = 'error'
AND timestamp > NOW() - INTERVAL '6' HOUR
GROUP BY component, hour
ORDER BY hour DESC, error_count DESC
Average cycle duration by source:
SELECT
blob2 AS source,
SUM(_sample_interval * double3) / SUM(_sample_interval) AS avg_duration_ms,
MAX(double3) AS max_duration_ms,
SUM(_sample_interval) AS total_cycles
FROM pipeline_metrics
WHERE
blob1 = 'crawl_cycle'
AND timestamp > NOW() - INTERVAL '7' DAY
GROUP BY source
The _sample_interval Factor
Analytics Engine samples data at high volumes. The _sample_interval column tells you how many events each row represents. At low volumes (under the sampling threshold), _sample_interval is always 1. At high volumes, it may be higher.
Always multiply numeric values by _sample_interval when computing aggregates:
-- Wrong: undercounts at high volume
SELECT COUNT(*) AS total_events FROM pipeline_metrics
-- Right: accounts for sampling
SELECT SUM(_sample_interval) AS total_events FROM pipeline_metrics
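To make the weighting concrete, here is a small worked example in TypeScript that mirrors what `SUM(_sample_interval * x)` and `SUM(_sample_interval)` compute. The `SampledRow` shape and the `estimateFromSamples` name are illustrative assumptions; the rows stand in for what the SQL API would return.

```typescript
// One stored row; `_sample_interval` says how many real events it represents.
interface SampledRow {
  _sample_interval: number;
  value: number; // e.g. double3, the cycle duration in ms
}

function estimateFromSamples(rows: SampledRow[]) {
  let estimatedEvents = 0;
  let estimatedSum = 0;
  for (const row of rows) {
    estimatedEvents += row._sample_interval;          // SUM(_sample_interval)
    estimatedSum += row._sample_interval * row.value; // SUM(_sample_interval * value)
  }
  return {
    storedRows: rows.length, // what COUNT(*) would report
    estimatedEvents,
    estimatedSum,
    // Weighted average: each row counts as many times as the events it stands for.
    weightedAverage: estimatedEvents > 0 ? estimatedSum / estimatedEvents : 0,
  };
}
```

With two unsampled rows and one row that represents ten events, the stored row count is 3 but the estimated event count is 12, which is why counting rows undercounts at high volume.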
Building a Grafana Dashboard
Grafana can chart Analytics Engine data through the SQL API. One approach is to point a Grafana data source at a Worker route that proxies queries to Analytics Engine.
// Grafana-compatible proxy route
app.post('/api/grafana/query', async (c) => {
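const body = await c.req.json();
const sql = body.sql;
// Coarse guard: accept only strings that start with SELECT. This is not a
// security boundary on its own, so put authentication in front of this route.
if (typeof sql !== 'string' || !sql.trim().toUpperCase().startsWith('SELECT')) {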
return c.json({ error: 'Only SELECT queries allowed' }, 400);
}
const result = await queryAnalyticsEngine(c.env, sql);
return c.json(result);
});
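A prefix check alone lets through stacked statements and queries against other datasets. The sketch below shows one way the guard could be tightened; `validateProxyQuery` and `ALLOWED_DATASETS` are hypothetical names, and the regex only inspects the first FROM clause, so treat it as a coarse filter rather than a SQL parser.

```typescript
// Hypothetical allowlist: the only dataset this proxy may read.
const ALLOWED_DATASETS = new Set(['pipeline_metrics']);

type QueryValidation = { ok: true } | { ok: false; reason: string };

function validateProxyQuery(sql: unknown): QueryValidation {
  if (typeof sql !== 'string') return { ok: false, reason: 'sql must be a string' };
  // Tolerate a single trailing semicolon, then reject any other one.
  const trimmed = sql.trim().replace(/;\s*$/, '');
  if (!/^select\b/i.test(trimmed)) return { ok: false, reason: 'only SELECT is allowed' };
  if (trimmed.includes(';')) return { ok: false, reason: 'multiple statements are not allowed' };
  // Coarse check: look at the first FROM target only.
  const match = /\bfrom\s+([A-Za-z_][A-Za-z0-9_]*)/i.exec(trimmed);
  const dataset = match?.[1] ?? '';
  if (!ALLOWED_DATASETS.has(dataset)) return { ok: false, reason: 'unknown or missing dataset' };
  return { ok: true };
}
```

The proxy route would return a 400 with `reason` when `ok` is false. Authentication on the route is still needed, since any caller who passes validation can run arbitrary read queries against the dataset.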
Gotchas
- No binding for reads: You can write with env.METRICS.writeDataPoint() but you cannot query with a binding. Reads go through the HTTP SQL API, which means you need to store an API token as a secret.
- Field ordering matters: blob1, blob2, blob3, etc. are positional. If you change the order of your blobs, old and new data become incompatible. Define a schema early and stick with it.
- 92-day retention: Data older than ~3 months is automatically deleted. If you need longer retention, roll up daily aggregates into D1.
- Not yet billed: As of early 2026, Cloudflare has published pricing ($0.25/M writes, $1.00/M queries) but is not actively billing for Analytics Engine usage. Plan for the cost but enjoy the free ride while it lasts.
- Query latency: SQL API queries typically take 50-200ms. This is fine for dashboards but too slow for hot-path request handling. Never put an Analytics Engine query in a user-facing request path.
- Single index only: You can only provide one index per data point. If you provide multiple, the data point is silently dropped. Use the most meaningful grouping key as your index.
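Two of the gotchas above (positional fields and the single index) can be guarded against by building data points in one place. The following is a sketch under assumptions: the `CrawlStats` fields are inferred from the earlier `recordCrawlCycle` example, and `DataPoint` and `crawlCyclePoint` are illustrative names.

```typescript
// Assumed shape of the stats object used by recordCrawlCycle.
interface CrawlStats {
  source: string;
  status: 'success' | 'failure';
  items_discovered: number;
  items_enriched: number;
  duration_ms: number;
  api_cost_usd: number;
  pipeline_id: string;
}

// Same structure that writeDataPoint() accepts.
interface DataPoint {
  blobs: string[];
  doubles: number[];
  indexes: [string]; // tuple type: the compiler rejects zero or multiple indexes
}

// Single source of truth for the crawl_cycle column positions.
function crawlCyclePoint(stats: CrawlStats): DataPoint {
  return {
    blobs: ['crawl_cycle', stats.source, stats.status], // blob1..blob3
    doubles: [
      stats.items_discovered, // double1
      stats.items_enriched,   // double2
      stats.duration_ms,      // double3
      stats.api_cost_usd,     // double4
    ],
    indexes: [stats.pipeline_id],
  };
}
```

Call sites then reduce to `env.METRICS.writeDataPoint(crawlCyclePoint(stats))`, and a change to the column order happens in exactly one function.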
Pattern 5: The Hybrid Storage Approach
The Pattern
Use each Cloudflare storage primitive for what it does best. D1 for transactional application data. KV for operational snapshots. Analytics Engine for time-series metrics. Never cross the streams.
When to Use It
- You have a Workers pipeline with multiple storage needs
- You want to minimize cost while maximizing observability
- You have been bitten by putting monitoring data in D1
The Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Workers Pipeline │
│ │
│ ┌──────────┐ ┌──────────────┐ ┌────────────────────────┐ │
│ │ Cron │ │ Queue │ │ Durable Object │ │
│ │ Handler │ │ Consumer │ │ Controller │ │
│ └─────┬────┘ └──────┬───────┘ └───────────┬────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Application Logic │ │
│ │ (crawl, enrich, score, publish) │ │
│ └──────┬──────────────┬──────────────────────┬───────────┘ │
│ │ │ │ │
│ Write app data Write metrics Write snapshot │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────────┐ ┌───────────────────┐ │
│ │ D1 │ │ Analytics │ │ KV │ │
│ │ │ │ Engine │ │ │ │
│ │ items │ │ │ │ health:latest │ │
│ │ keywords │ │ pipeline_ │ │ health:snapshots │ │
│ │ api_calls│ │ metrics │ │ │ │
│ │ agents │ │ │ │ │ │
│ └──────────┘ └──────────────┘ └───────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Dashboard Routes │ │
│ │ GET /health → reads KV (1 read) │ │
│ │ GET /health/history → reads KV (1 read) │ │
│ │ GET /health/costs → reads KV (1 read) │ │
│ │ GET /activity → reads D1 (time-bounded query) │ │
│ │ GET /metrics/trend → queries Analytics Engine (HTTP) │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
What Goes Where
| Data Type | Storage | Why |
|---|---|---|
| App records, keywords, user data | D1 | Transactional, queryable, relational |
| API call ledger (individual calls) | D1 | Needs to be queryable for billing disputes |
| Agent processing ledger | D1 | Auditable record of what each agent did |
| Current health snapshot | KV | Single key read, no query needed |
| 24h snapshot history | KV | Rolling array, no query needed |
| Per-event metrics (throughput, cost, errors) | Analytics Engine | Time-series queries, 92-day retention |
| Long-term daily aggregates | D1 (optional) | Permanent trend data beyond 92 days |
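The optional last row (daily aggregates kept in D1 beyond the 92-day window) implies a small rollup step. As a rough sketch, rows from a daily Analytics Engine aggregate query could be mapped to bind parameters like this. The `daily_metrics` table, its columns, and the row aliases are all hypothetical.

```typescript
// One row of a daily aggregate query result (column aliases are assumptions).
interface DailyAggregateRow {
  day: string; // e.g. '2026-03-14 00:00:00'
  items_discovered: number | string;
  items_enriched: number | string;
  total_cost_usd: number | string;
}

// Hypothetical target table in D1.
const INSERT_DAILY_SQL =
  'INSERT OR REPLACE INTO daily_metrics (day, items_discovered, items_enriched, total_cost_usd) VALUES (?, ?, ?, ?)';

type DailyParams = [string, number, number, number];

// Convert query rows into bind parameters, one tuple per day.
// Number() coerces defensively in case a numeric column arrives as a string.
function toDailyParams(rows: DailyAggregateRow[]): DailyParams[] {
  return rows.map((row): DailyParams => [
    row.day.slice(0, 10), // keep only YYYY-MM-DD
    Number(row.items_discovered),
    Number(row.items_enriched),
    Number(row.total_cost_usd),
  ]);
}
```

In the daily cron, each tuple would be bound with `db.prepare(INSERT_DAILY_SQL).bind(...params)` and the statements sent together through `db.batch()`.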
Binding Configuration
// wrangler.jsonc
{
"name": "pipeline-worker",
"main": "src/index.ts",
"compatibility_date": "2025-03-07",
"compatibility_flags": ["nodejs_compat"],
// D1 — transactional application data
"d1_databases": [
{
"binding": "DB",
"database_name": "pipeline-db",
"database_id": "your-database-id",
"migrations_dir": "migrations"
}
],
// KV — operational snapshots
"kv_namespaces": [
{
"binding": "KV",
"id": "your-kv-namespace-id"
}
],
// Analytics Engine — time-series metrics
"analytics_engine_datasets": [
{
"binding": "METRICS",
"dataset": "pipeline_metrics"
}
],
// Cron triggers
"triggers": {
"crons": [
"* * * * *", // every minute: crawl cycle
"0 */6 * * *", // every 6 hours: deeper analysis
"0 6 * * *" // daily: cleanup, aggregation
]
}
}
Type Definitions
interface Env {
DB: D1Database;
KV: KVNamespace;
METRICS: AnalyticsEngineDataset;
ACCOUNT_ID: string;
ANALYTICS_API_TOKEN: string;
}
The Instrumented Pipeline
Here is how a crawl cycle writes to all three storage layers:
async function runInstrumentedCrawlCycle(env: Env): Promise<void> {
const startTime = Date.now();
let itemsDiscovered = 0;
let itemsEnriched = 0;
let apiCostUsd = 0;
let errors = 0;
try {
// ── Step 1: Crawl (writes to D1) ────────────────────────────
const crawlResult = await crawlDataSource(env.DB);
itemsDiscovered = crawlResult.newItems;
// Record each API call to D1 (for auditing) and Analytics Engine (for trending)
for (const call of crawlResult.apiCalls) {
// D1: permanent audit record
await env.DB.prepare(
"INSERT INTO api_calls (service, endpoint, cost_usd, created_at) VALUES (?, ?, ?, datetime('now'))",
).bind(call.service, call.endpoint, call.cost).run();
// Analytics Engine: time-series metric
env.METRICS.writeDataPoint({
blobs: ['api_call', call.service, 'success'],
doubles: [1, call.cost],
indexes: [call.service],
});
apiCostUsd += call.cost;
}
// ── Step 2: Enrich (writes to D1) ───────────────────────────
const enrichResult = await enrichPendingItems(env.DB);
itemsEnriched = enrichResult.enriched;
// ── Step 3: Record cycle metrics to Analytics Engine ────────
env.METRICS.writeDataPoint({
blobs: ['crawl_cycle', 'main', 'success'],
doubles: [
itemsDiscovered,
itemsEnriched,
Date.now() - startTime,
apiCostUsd,
],
indexes: ['main'],
});
} catch (error) {
errors++;
// Record error to Analytics Engine
env.METRICS.writeDataPoint({
blobs: ['error', 'crawl_cycle', (error as Error).message.slice(0, 100)],
doubles: [1],
indexes: ['crawl_cycle'],
});
// Also log to D1 error table for investigation
await env.DB.prepare(
"INSERT INTO error_log (component, message, created_at) VALUES (?, ?, datetime('now'))",
).bind('crawl_cycle', (error as Error).message).run();
}
}
Gotchas
- Do not query Analytics Engine from the cron: The SQL API adds 50-200ms latency per query. In a cron that runs every minute, this overhead is significant. Use Analytics Engine for dashboard queries, not for cron-time computation.
- Keep D1 writes minimal in the cron: The D1 write budget is 50 million rows/month included. Each crawl cycle should write only the data that needs to be durable and queryable. Metrics go to Analytics Engine.
- KV eventual consistency: If your dashboard is served from a different Cloudflare region than where the cron runs, the KV value might be up to 60 seconds behind. Add a freshness_seconds field to the response so the consumer knows.
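The `freshness_seconds` field suggested in the last gotcha can be derived from the snapshot's own timestamp. A minimal sketch, assuming the stored value has the `{ health, taken_at }` shape used elsewhere in this article; `withFreshness` is an illustrative name.

```typescript
interface StoredSnapshot<T> {
  health: T;
  taken_at: string; // ISO 8601 timestamp written by the cron
}

// Attach the snapshot age so consumers can judge staleness themselves.
function withFreshness<T>(snapshot: StoredSnapshot<T>, nowMs: number = Date.now()) {
  const takenMs = new Date(snapshot.taken_at).getTime();
  // Clamp at zero in case of small clock differences between writer and reader.
  const freshness_seconds = Math.max(0, Math.round((nowMs - takenMs) / 1000));
  return { ...snapshot, freshness_seconds };
}
```

A route would return `c.json(withFreshness(snapshot))`, which keeps the existing fields and adds the age alongside them.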
Pattern 6: Activity Feed Pattern
The Pattern
For “what just happened?” queries — recent activity, last N minutes of work — use D1 with narrow time-bounded queries. These are fundamentally different from monitoring aggregates and are appropriate for D1 because they scan a small, bounded number of rows.
When to Use It
- You want to show “recent activity” on a dashboard
- The time window is small (5-30 minutes)
- You want to see individual events, not aggregates
- The data is already in D1 for application purposes
The Key Difference
Monitoring aggregates scan the entire table: “how many items exist?” Activity feeds scan a time window: “what happened in the last 5 minutes?” If your table has 500,000 rows but only 50 were created in the last 5 minutes, the activity query reads ~50 rows (with a timestamp index), not 500,000.
// Activity feed route — reads D1, but with bounded queries
app.get('/api/activity', async (c) => {
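const requested = Number.parseInt(c.req.query('minutes') ?? '5', 10);
// Fall back to 5 on non-numeric or non-positive input, and cap the window
// to prevent accidental full scans
const cappedMinutes = Number.isFinite(requested) && requested > 0 ? Math.min(requested, 60) : 5;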
const [recentApiCalls, recentAgentWork, recentDiscovered, recentEnriched] =
await Promise.all([
c.env.DB.prepare(`
SELECT service, context_id, COUNT(*) as calls, MAX(created_at) as last_at
FROM api_calls
WHERE created_at >= datetime('now', '-' || ? || ' minutes')
GROUP BY service, context_id
ORDER BY last_at DESC
`).bind(cappedMinutes).all(),
c.env.DB.prepare(`
SELECT agent, stage, COUNT(*) as processed, MAX(computed_at) as last_at
FROM agent_ledger
WHERE computed_at >= datetime('now', '-' || ? || ' minutes')
GROUP BY agent, stage
ORDER BY last_at DESC
`).bind(cappedMinutes).all(),
c.env.DB.prepare(`
SELECT COUNT(*) as n FROM items
WHERE created_at >= datetime('now', '-' || ? || ' minutes')
`).bind(cappedMinutes).first<{ n: number }>(),
c.env.DB.prepare(`
SELECT COUNT(*) as n FROM items
WHERE enriched_at >= datetime('now', '-' || ? || ' minutes')
`).bind(cappedMinutes).first<{ n: number }>(),
]);
return c.json({
window_minutes: cappedMinutes,
discovered: recentDiscovered?.n ?? 0,
enriched: recentEnriched?.n ?? 0,
api_calls: recentApiCalls.results,
agent_work: recentAgentWork.results,
});
});
Index Requirements
Activity feed queries are only cheap if the timestamp columns are indexed:
CREATE INDEX idx_api_calls_created_at ON api_calls(created_at);
CREATE INDEX idx_agent_ledger_computed_at ON agent_ledger(computed_at);
CREATE INDEX idx_items_created_at ON items(created_at);
CREATE INDEX idx_items_enriched_at ON items(enriched_at);
Without these indexes, WHERE created_at >= datetime('now', '-5 minutes') becomes a full table scan — exactly what we are trying to avoid.
Cap the Time Window
Always cap the time window on activity feed queries. A user requesting ?minutes=10080 (one week) would scan every row created in the last week — potentially hundreds of thousands of rows:
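const requested = Number.parseInt(c.req.query('minutes') ?? '5', 10);
const cappedMinutes = Number.isFinite(requested) && requested > 0
? Math.min(requested, 60) // max 1 hour
: 5; // default when the parameter is missing or invalid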
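Because the same capping logic applies to every time-window parameter (minutes here, hours in the Analytics Engine routes), it may be worth factoring into one tested helper. This is a sketch; `parseWindow` is a hypothetical name, not part of Hono or the Workers runtime.

```typescript
// Parse a query-string window, falling back on bad input and capping at `max`.
function parseWindow(raw: string | undefined, fallback: number, max: number): number {
  const parsed = Number.parseInt(raw ?? '', 10);
  // NaN (missing or non-numeric), zero, and negative values all use the fallback.
  if (!Number.isFinite(parsed) || parsed <= 0) return fallback;
  return Math.min(parsed, max);
}
```

In a route this would read `parseWindow(c.req.query('minutes'), 5, 60)`. Rejecting non-numeric input matters most where the value is interpolated into a SQL string rather than bound as a parameter.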
Gotchas
- GROUP BY adds cost: GROUP BY service, context_id means D1 sorts the results. On a small result set (5 minutes of data), this is fast. On a large result set (1 hour of high-throughput data), it adds up.
- datetime() function calls: datetime('now', '-5 minutes') is computed by SQLite at query time. It does not use cached results. This is fine.
- Time zone awareness: datetime('now') in SQLite returns UTC. Make sure your dashboard is aware of this.
Pattern 7: Precompute on Write, Not on Read
The Pattern
Instead of computing aggregates when the dashboard asks for them (read-time), compute them when the data changes (write-time) and store the results. This inverts the cost model: you pay a tiny premium on every write instead of a large cost on every read.
When to Use It
- You have expensive aggregates (COUNT, SUM, AVG) that are queried frequently
- The write rate is predictable and manageable
- You can tolerate slight staleness (the aggregate might be one write behind)
The Counters Table
The simplest version: maintain a counters table with pre-aggregated values.
CREATE TABLE counters (
name TEXT PRIMARY KEY,
value INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
-- Seed with initial values
INSERT INTO counters (name, value) VALUES
('total_items', 0),
('enriched_items', 0),
('pending_items', 0),
('total_keywords', 0),
('total_errors_today', 0);
Increment on Write
Use D1 batch operations to update counters atomically with the data write:
async function insertItem(db: D1Database, item: NewItem): Promise<void> {
await db.batch([
db.prepare(
"INSERT INTO items (id, name, status, created_at) VALUES (?, ?, ?, datetime('now'))",
).bind(item.id, item.name, 'pending'),
db.prepare(
"UPDATE counters SET value = value + 1, updated_at = datetime('now') WHERE name = 'total_items'",
),
db.prepare(
"UPDATE counters SET value = value + 1, updated_at = datetime('now') WHERE name = 'pending_items'",
),
]);
}
async function markEnriched(db: D1Database, itemId: string): Promise<void> {
await db.batch([
db.prepare(
"UPDATE items SET status = 'enriched', enriched_at = datetime('now') WHERE id = ?",
).bind(itemId),
db.prepare(
"UPDATE counters SET value = value + 1, updated_at = datetime('now') WHERE name = 'enriched_items'",
),
db.prepare(
"UPDATE counters SET value = value - 1, updated_at = datetime('now') WHERE name = 'pending_items'",
),
]);
}
Reading Counters
Now the health snapshot reads counters instead of running COUNT(*):
async function getCountsFromCountersTable(
db: D1Database,
): Promise<Record<string, number>> {
const result = await db.prepare('SELECT name, value FROM counters').all();
return Object.fromEntries(
result.results.map((r) => [r.name as string, r.value as number]),
);
}
// This reads ~5 rows instead of scanning 500,000
// Cost: ~5 row reads vs ~500,000 row reads
Daily Counter Reset
Some counters need to reset daily (errors today, cost today). Use the daily cron for this:
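// In the daily cron. The wrangler.jsonc shown earlier schedules it as 0 6 * * * (06:00 UTC);
// use 0 0 * * * instead if "today" should mean the UTC calendar day.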
async function resetDailyCounters(db: D1Database): Promise<void> {
await db.batch([
db.prepare(
"UPDATE counters SET value = 0, updated_at = datetime('now') WHERE name = 'total_errors_today'",
),
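// Note: this UPDATE is a no-op unless a 'cost_calls_today' row exists
// (the seed shown earlier does not create one)
db.prepare(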
"UPDATE counters SET value = 0, updated_at = datetime('now') WHERE name = 'cost_calls_today'",
),
]);
}
Cost Analysis
Without counters (21 COUNT queries, each scanning ~100,000 rows, every 5 minutes):
21 queries * 100,000 rows * 288 ticks/day = 604,800,000 row reads/day
Monthly: 18.1 billion row reads
With counters (1 query reading ~10 rows, every 5 minutes):
1 query * 10 rows * 288 ticks/day = 2,880 row reads/day
Monthly: 86,400 row reads
The counter approach writes a few extra rows per item processed. If you process 1,000 items per day and each insert triggers 2 counter updates:
1,000 items * 2 counter updates = 2,000 extra row writes/day
Monthly: 60,000 row writes
Row writes cost $1.00 per million, so 60,000 writes cost $0.06. The same change cuts monthly row reads from 18.1 billion to 86,400, a reduction of roughly 210,000x. At $0.001 per million rows, and ignoring included allowances, that replaces about $18.14 of reads with about $0.06 of writes, a saving of roughly 300x.
Key insight: Precomputing on write trades expensive reads for cheap writes. In D1’s pricing model, reads are $0.001/M and writes are $1.00/M, so a single row read is 1,000x cheaper than a single row write. But a COUNT(*) that scans 100,000 rows costs as much as 100 row writes (100,000 * $0.001/M = $0.0001 = 100 * $1.00/M). Precomputation wins whenever the scans it removes cost more than the counter writes it adds, which is nearly always the case when frequent read queries scan large tables.
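The arithmetic in this section can be reproduced with a few lines of TypeScript. The prices are the D1 list prices quoted in this article and included plan allowances are ignored; the constant and function names are illustrative.

```typescript
// D1 list prices quoted in this article (USD per million rows).
const D1_READ_USD_PER_MILLION = 0.001;
const D1_WRITE_USD_PER_MILLION = 1.0;
const TICKS_PER_DAY = 288; // one snapshot every 5 minutes
const DAYS_PER_MONTH = 30;

function monthlyRowReads(queriesPerTick: number, rowsPerQuery: number): number {
  return queriesPerTick * rowsPerQuery * TICKS_PER_DAY * DAYS_PER_MONTH;
}

function rowsToUsd(rows: number, usdPerMillion: number): number {
  return (rows / 1_000_000) * usdPerMillion;
}

// Without counters: 21 COUNT(*) queries scanning ~100,000 rows each.
const scanReads = monthlyRowReads(21, 100_000);
// With counters: 1 query reading ~10 rows, plus 2 counter writes for each of 1,000 items/day.
const counterReads = monthlyRowReads(1, 10);
const counterWrites = 1_000 * 2 * DAYS_PER_MONTH;

const scanCostUsd = rowsToUsd(scanReads, D1_READ_USD_PER_MILLION);
const counterCostUsd =
  rowsToUsd(counterReads, D1_READ_USD_PER_MILLION) +
  rowsToUsd(counterWrites, D1_WRITE_USD_PER_MILLION);

console.log({ scanReads, counterReads, counterWrites, scanCostUsd, counterCostUsd });
```

Changing the row count per scan or the item rate shows how quickly the gap widens: scan cost grows with table size, while counter cost grows only with write volume.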
Gotchas
- Counter drift: If a write succeeds but the counter update fails (or vice versa), the counter becomes inaccurate. D1 batch operations are atomic within a single database, so this should not happen for same-database batches. But if you forget to batch them, you will drift.
- Reconciliation: Run a periodic reconciliation job (daily or weekly) that compares counter values with actual COUNT(*) results and corrects drift:
async function reconcileCounters(db: D1Database): Promise<void> {
const actual = await db.prepare('SELECT COUNT(*) as n FROM items').first<{ n: number }>();
await db.prepare(
"UPDATE counters SET value = ?, updated_at = datetime('now') WHERE name = 'total_items'",
).bind(actual?.n ?? 0).run();
// Repeat for other counters...
}
- Migration: If you add the counters table to an existing system, you need to seed the counters with the current COUNT(*) values. Run this once:
async function seedCounters(db: D1Database): Promise<void> {
const [total, enriched, pending] = await Promise.all([
db.prepare('SELECT COUNT(*) as n FROM items').first<{ n: number }>(),
db.prepare("SELECT COUNT(*) as n FROM items WHERE status = 'enriched'").first<{ n: number }>(),
db.prepare("SELECT COUNT(*) as n FROM items WHERE status = 'pending'").first<{ n: number }>(),
]);
await db.batch([
db.prepare('INSERT OR REPLACE INTO counters (name, value) VALUES (?, ?)').bind('total_items', total?.n ?? 0),
db.prepare('INSERT OR REPLACE INTO counters (name, value) VALUES (?, ?)').bind('enriched_items', enriched?.n ?? 0),
db.prepare('INSERT OR REPLACE INTO counters (name, value) VALUES (?, ?)').bind('pending_items', pending?.n ?? 0),
]);
}
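Before a reconciliation job overwrites counters, it can be useful to report how far each one had drifted, since recurring drift usually points at a write path that skips the batch. A minimal sketch; `findCounterDrift` and the `Drift` shape are illustrative, and the inputs are plain name-to-value maps such as the one returned by `getCountsFromCountersTable`.

```typescript
type Counts = Record<string, number>;

interface Drift {
  name: string;
  counter: number; // value stored in the counters table
  actual: number;  // value from the COUNT(*) query
  delta: number;   // actual - counter
}

// Compare stored counters with freshly counted values and list the mismatches.
function findCounterDrift(counters: Counts, actual: Counts): Drift[] {
  return Object.keys(actual)
    .map((name) => {
      const counter = counters[name] ?? 0;
      const real = actual[name] ?? 0;
      return { name, counter, actual: real, delta: real - counter };
    })
    .filter((d) => d.delta !== 0);
}
```

Logging the returned list (or writing it to KV) before applying the corrections gives a simple signal for when drift starts appearing.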
Small Examples
Example 1: Minimal Health Check Endpoint
The simplest possible health endpoint — just read the latest snapshot from KV.
import { Hono } from 'hono';
const app = new Hono<{ Bindings: { KV: KVNamespace } }>();
app.get('/health', async (c) => {
const data = await c.env.KV.get('health:latest', 'json');
if (!data) return c.json({ status: 'initializing' }, 503);
const age = Date.now() - new Date((data as any).taken_at).getTime();
return c.json({
status: age < 600_000 ? 'healthy' : 'stale',
age_seconds: Math.round(age / 1000),
...(data as any).health,
});
});
export default app;
Example 2: Recording Queue Consumer Metrics
Instrument a queue consumer to write metrics to Analytics Engine after processing each batch.
interface Env {
METRICS: AnalyticsEngineDataset;
DB: D1Database;
}
interface QueueMessage {
type: string;
payload: Record<string, unknown>;
}
export default {
async queue(
batch: MessageBatch<QueueMessage>,
env: Env,
): Promise<void> {
const startTime = Date.now();
let successes = 0;
let failures = 0;
for (const msg of batch.messages) {
try {
await processMessage(env.DB, msg.body);
msg.ack();
successes++;
} catch {
msg.retry();
failures++;
}
}
// Record batch metrics — fire and forget
env.METRICS.writeDataPoint({
blobs: ['queue_batch', batch.queue, successes === batch.messages.length ? 'clean' : 'partial'],
doubles: [
batch.messages.length, // total messages
successes, // successful
failures, // failed
Date.now() - startTime, // batch duration ms
],
indexes: [batch.queue],
});
},
};
async function processMessage(db: D1Database, body: QueueMessage): Promise<void> {
// Your processing logic here
}
Example 3: Durable Object Health Reporter
A Durable Object that exposes its internal state as a health report, written to KV on every alarm.
export class PipelineController implements DurableObject {
private state: DurableObjectState;
private env: Env;
private processedCount = 0;
private errorCount = 0;
private lastCycleMs = 0;
constructor(state: DurableObjectState, env: Env) {
this.state = state;
this.env = env;
state.blockConcurrencyWhile(async () => {
this.processedCount = (await state.storage.get<number>('processedCount')) ?? 0;
this.errorCount = (await state.storage.get<number>('errorCount')) ?? 0;
});
}
async alarm(): Promise<void> {
const start = Date.now();
try {
await this.runCycle();
this.processedCount++;
await this.state.storage.put('processedCount', this.processedCount);
} catch {
this.errorCount++;
await this.state.storage.put('errorCount', this.errorCount);
}
this.lastCycleMs = Date.now() - start;
// Write health to KV
await this.env.KV.put(
'do:pipeline-controller:health',
JSON.stringify({
processed: this.processedCount,
errors: this.errorCount,
last_cycle_ms: this.lastCycleMs,
// Guard on the total so that "all cycles failed" reports 1, not 0
error_rate: this.processedCount + this.errorCount > 0
? this.errorCount / (this.processedCount + this.errorCount)
: 0,
updated_at: new Date().toISOString(),
}),
);
// Schedule next alarm
await this.state.storage.setAlarm(Date.now() + 30_000); // 30 seconds
}
private async runCycle(): Promise<void> {
// Processing logic
}
}
Example 4: Cost Alerting with Analytics Engine
Query Analytics Engine to check if API spend exceeds a threshold, and log a warning.
async function checkCostAlert(env: Env): Promise<void> {
const sql = `
SELECT SUM(_sample_interval * double2) AS total_cost
FROM pipeline_metrics
WHERE blob1 = 'api_call'
AND timestamp > NOW() - INTERVAL '24' HOUR
`;
const result = await queryAnalyticsEngine(env, sql);
// Number() also covers an empty result set and numeric values returned as strings
const totalCost = Number(result.data[0]?.total_cost ?? 0);
const DAILY_BUDGET = 1.00; // $1.00/day
if (totalCost > DAILY_BUDGET * 0.8) {
console.warn(
`[COST ALERT] 24h API spend: $${totalCost.toFixed(4)} (${Math.round(totalCost / DAILY_BUDGET * 100)}% of budget)`,
);
// Write alert to KV for dashboard
await env.KV.put(
'alert:cost',
JSON.stringify({
level: totalCost > DAILY_BUDGET ? 'critical' : 'warning',
spend_24h: totalCost,
budget: DAILY_BUDGET,
pct: Math.round(totalCost / DAILY_BUDGET * 100),
checked_at: new Date().toISOString(),
}),
);
}
}
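Note that the function above writes `alert:cost` when spend is high but never clears it once spend drops again. One way to handle that is to separate the threshold decision into a pure function that returns `null` when no alert applies. This is a sketch with assumed names (`classifyCostAlert`, `CostAlert`); the thresholds match the example above.

```typescript
type AlertLevel = 'warning' | 'critical';

interface CostAlert {
  level: AlertLevel;
  spend_24h: number;
  budget: number;
  pct: number;
}

// Returns null when spend is at or below the warning threshold.
function classifyCostAlert(
  spend24h: number,
  dailyBudget: number,
  warnRatio = 0.8,
): CostAlert | null {
  if (spend24h <= dailyBudget * warnRatio) return null;
  return {
    level: spend24h > dailyBudget ? 'critical' : 'warning',
    spend_24h: spend24h,
    budget: dailyBudget,
    pct: Math.round((spend24h / dailyBudget) * 100),
  };
}
```

The caller can then `put` the alert when the result is non-null and `delete` the `alert:cost` key otherwise, so the dashboard stops showing an alert that no longer applies.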
Example 5: Multi-Pipeline Health Aggregator
When you have multiple pipelines (crawling, enrichment, publishing), aggregate their health into a single dashboard.
interface PipelineHealth {
name: string;
status: 'running' | 'paused' | 'error' | 'unknown';
last_activity: string;
items_processed_24h: number;
error_rate: number;
}
app.get('/api/health/pipelines', async (c) => {
// Read health from each pipeline's KV key
const keys = [
'health:crawler',
'health:enrichment',
'health:publisher',
'health:agents',
];
const results = await Promise.all(
keys.map(async (key) => {
const data = await c.env.KV.get(key, 'json');
return { key, data };
}),
);
const pipelines: PipelineHealth[] = results.map(({ key, data }) => {
const name = key.split(':')[1];
if (!data) {
return { name, status: 'unknown' as const, last_activity: 'never', items_processed_24h: 0, error_rate: 0 };
}
const health = data as any;
const age = Date.now() - new Date(health.updated_at).getTime();
const status = age > 600_000 ? 'error' : health.paused ? 'paused' : 'running';
return {
name,
status: status as PipelineHealth['status'],
last_activity: health.updated_at,
items_processed_24h: health.items_24h ?? 0,
error_rate: health.error_rate ?? 0,
};
});
const overall = pipelines.every((p) => p.status === 'running')
? 'healthy'
: pipelines.some((p) => p.status === 'error')
? 'degraded'
: 'partial';
return c.json({ overall, pipelines });
});
Example 6: Snapshot Diff — What Changed Since Last Check
Compare the current snapshot with the previous one to highlight changes.
app.get('/api/health/diff', async (c) => {
const snapshots = await c.env.KV.get<Snapshot[]>('health:snapshots', 'json');
if (!snapshots || snapshots.length < 2) {
return c.json({ error: 'Need at least 2 snapshots for diff' }, 400);
}
const current = snapshots[snapshots.length - 1];
const previous = snapshots[snapshots.length - 2];
const diff = {
interval_minutes: Math.round(
(new Date(current.taken_at).getTime() - new Date(previous.taken_at).getTime()) / 60000,
),
counts: {
total_items: current.health.counts.total_items - previous.health.counts.total_items,
enriched_items: current.health.counts.enriched_items - previous.health.counts.enriched_items,
pending_items: current.health.counts.pending_items - previous.health.counts.pending_items,
},
costs: {
usd_since_last: round4(
current.health.costs.today_usd - previous.health.costs.today_usd,
),
},
errors: {
new_errors: current.health.errors.last_hour - previous.health.errors.last_hour,
},
};
return c.json({
current_at: current.taken_at,
previous_at: previous.taken_at,
diff,
});
});
Example 7: KV Snapshot with TTL Auto-Cleanup
Use KV’s built-in expiration to automatically clean up old snapshots without manual pruning.
async function writeSnapshotWithTTL(
kv: KVNamespace,
health: HealthData,
): Promise<void> {
const now = new Date();
const timestamp = now.toISOString();
const hourKey = `health:hourly:${now.toISOString().slice(0, 13)}`; // health:hourly:2026-03-14T10
// Latest — no TTL, always available
await kv.put('health:latest', JSON.stringify({ health, taken_at: timestamp }));
// Hourly bucket — expires after 7 days
const existing = await kv.get<Snapshot[]>(hourKey, 'json');
const bucket = existing ?? [];
bucket.push({ health, taken_at: timestamp });
await kv.put(hourKey, JSON.stringify(bucket), {
expirationTtl: 7 * 24 * 60 * 60, // 7 days in seconds
});
}
This eliminates the need for manual array trimming. Each hourly key contains all snapshots for that hour and automatically disappears after 7 days.
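The read side of hourly buckets needs the list of keys that cover the requested window. A minimal sketch, assuming the `health:hourly:YYYY-MM-DDTHH` key format used by `writeSnapshotWithTTL`; `hourlyKeys` is an illustrative name.

```typescript
// Key prefix used by the hourly buckets in this example.
const HOURLY_PREFIX = 'health:hourly:';

// Build the bucket keys covering the last `hours` hours, oldest first.
// toISOString() is always UTC, which matches how the keys are written.
function hourlyKeys(hours: number, now: Date = new Date()): string[] {
  const keys: string[] = [];
  for (let i = hours - 1; i >= 0; i--) {
    const t = new Date(now.getTime() - i * 60 * 60 * 1000);
    keys.push(HOURLY_PREFIX + t.toISOString().slice(0, 13));
  }
  return keys;
}
```

A history route would fetch these keys with `Promise.all` and concatenate the arrays, skipping keys that return null. Each key is a separate billed KV read, so a 24-hour history costs 24 reads instead of 1.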
Example 8: Analytics Engine Dashboard Route
A complete dashboard route that queries Analytics Engine for trending data.
app.get('/api/metrics/throughput', async (c) => {
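const requested = Number.parseInt(c.req.query('hours') ?? '24', 10);
// Sanitize before interpolating into SQL: fall back to 24 on invalid input, cap at 7 days
const cappedHours = Number.isFinite(requested) && requested > 0 ? Math.min(requested, 168) : 24;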
const sql = `
SELECT
toStartOfHour(timestamp) AS hour,
SUM(_sample_interval * double1) AS items_discovered,
SUM(_sample_interval * double2) AS items_enriched,
SUM(_sample_interval * double3) / SUM(_sample_interval) AS avg_cycle_ms,
SUM(_sample_interval * double4) AS total_cost_usd
FROM pipeline_metrics
WHERE
blob1 = 'crawl_cycle'
AND timestamp > NOW() - INTERVAL '${cappedHours}' HOUR
GROUP BY hour
ORDER BY hour ASC
`;
try {
const result = await queryAnalyticsEngine(c.env, sql);
return c.json({
window_hours: cappedHours,
data_points: result.data,
total_rows: result.rows,
});
} catch (error) {
return c.json(
{ error: 'Failed to query Analytics Engine', detail: (error as Error).message },
500,
);
}
});
Example 9: Composite Health Score
Compute a single 0-100 health score from multiple signals.
function computeHealthScore(health: HealthData): number {
let score = 100;
// Freshness penalty: -20 if nothing processed in the last hour
if (health.velocity.items_last_hour === 0) {
score -= 20;
}
// Error penalty: -5 per error in the last hour, up to -30
score -= Math.min(health.errors.last_hour * 5, 30);
// Cost penalty: -20 if over daily budget
const dailyBudget = 1.0;
if (health.costs.rate_per_day_estimate > dailyBudget) {
score -= 20;
}
// Backlog penalty: -10 if pending items are growing
if (health.velocity.items_last_hour > health.velocity.enriched_last_hour * 2) {
score -= 10;
}
// ETA penalty: -10 if enrichment will take more than 30 days
if (health.velocity.est_days_to_completion !== null && health.velocity.est_days_to_completion > 30) {
score -= 10;
}
return Math.max(0, Math.min(100, score));
}
app.get('/api/health/score', async (c) => {
const snapshot = await c.env.KV.get<Snapshot>('health:latest', 'json');
if (!snapshot) return c.json({ score: 0, status: 'no_data' });
const score = computeHealthScore(snapshot.health);
const status = score >= 80 ? 'healthy' : score >= 50 ? 'degraded' : 'critical';
return c.json({ score, status, snapshot_at: snapshot.taken_at });
});
Example 10: Lightweight Uptime Ping
An external uptime monitor can hit this endpoint. It never touches D1: a single KV read confirms that the Worker is alive and that the cron is still writing snapshots.
app.get('/ping', async (c) => {
// Read the latest snapshot timestamp from KV
const raw = await c.env.KV.get('health:latest');
if (!raw) {
// No snapshot yet — Worker is running but cron has not fired
return c.json({ status: 'starting', cron_active: false }, 200);
}
const snapshot = JSON.parse(raw);
const age = Date.now() - new Date(snapshot.taken_at).getTime();
return c.json({
status: 'up',
cron_active: age < 600_000, // true if snapshot is < 10 min old
last_snapshot: snapshot.taken_at,
age_seconds: Math.round(age / 1000),
});
});
Comparisons
Storage Primitives for Monitoring
| Feature | D1 | KV | Analytics Engine |
|---|---|---|---|
| Data model | Relational (SQLite) | Key-value | Time-series (columnar) |
| Query language | Full SQL | Get by key, list by prefix | SQL subset (no joins) |
| Write latency | 10-30ms | 1-5ms | ~0ms (async) |
| Read latency | 5-20ms | 1-5ms (edge cached) | 50-200ms (HTTP API) |
| Read cost model | Per row scanned | Per key read | Per query |
| Write cost model | Per row written | Per key written | Per data point |
| Read pricing | $0.001/M rows | $0.50/M reads | $1.00/M queries |
| Write pricing | $1.00/M rows | $5.00/M writes | $0.25/M data points |
| Included (paid plan) | 25B reads, 50M writes | 10M reads, 1M writes | 10M writes, 1M queries |
| Retention | Permanent | Permanent (or TTL) | ~92 days |
| Best for monitoring | Activity feeds (bounded) | Current state, snapshots | Trending, alerting |
| Worst for monitoring | Aggregates on large tables | Time-range queries | Real-time reads |
Monitoring Approaches Compared
| Approach | Latency | D1 Cost | Setup Complexity | Query Power | Data Retention |
|---|---|---|---|---|---|
| Live D1 queries | 5-20ms | Very high (full scans) | None | Full SQL | Permanent |
| KV snapshots | 1-5ms | Low (5-min interval) | Low | None (key lookup) | Configurable |
| Analytics Engine | 50-200ms | Zero | Medium | SQL subset | 92 days |
| KV + Analytics Engine | 1-5ms / 50-200ms | Low | Medium | Both patterns | Configurable + 92 days |
| External (Datadog) | 100-500ms | Zero | High | Full | Plan-dependent |
| External (Grafana Cloud) | 100-500ms | Zero | High | PromQL/SQL | Plan-dependent |
External Monitoring vs. Built-In
| Factor | Cloudflare Built-In (KV + AE) | Datadog | Grafana Cloud | Better Uptime |
|---|---|---|---|---|
| Setup | Add bindings, write code | Install agent, configure | Install agent, configure | Add URL |
| Monthly cost | ~$0 (within included) | $15+/host | $0 (free tier) - $29+ | $20+ |
| Custom metrics | writeDataPoint() | DogStatsD | Prometheus push | HTTP only |
| Dashboards | Build yourself (or Grafana) | Included | Included | Basic |
| Alerting | Build yourself | Included | Included | Included |
| Workers-native | Yes (binding) | No (HTTP push) | No (HTTP push) | No |
| Vendor lock-in | Cloudflare | Datadog | Low (OSS core) | Low |
| Edge latency | 0ms (same worker) | 50-200ms (HTTP push) | 50-200ms (HTTP push) | N/A |
| Retention | 92 days (AE) + permanent (KV/D1) | 15 months | 13 months (free) | 90 days |
Recommendation: Start with KV snapshots for operational health + Analytics Engine for time-series. Add Grafana Cloud only if you need sophisticated alerting rules or cross-service dashboards. Datadog is overkill for most Workers-only architectures.
When to Use External Monitoring
External monitoring makes sense when:
- You have services outside Cloudflare that need unified dashboards
- You need PagerDuty/OpsGenie integration for on-call alerting
- You need anomaly detection beyond simple threshold alerting
- Your team already uses Datadog/Grafana and wants one pane of glass
- You need compliance-grade audit logging
External monitoring does NOT make sense when:
- Your entire stack is on Cloudflare Workers
- You have fewer than 5 production Workers
- You are solo or a small team that checks dashboards manually
- Budget is tight (external monitoring costs more than the Workers themselves)
Anti-Patterns
| Don’t | Do Instead | Why |
|---|---|---|
| Run COUNT(*) on large tables from a cron | Precompute counts on write (counters table) or snapshot to KV every 5 min | COUNT(*) scans every row. A 500K-row table = 500K row reads per invocation. |
| Store monitoring snapshots in D1 | Store in KV (health:latest + health:snapshots) | D1 snapshots create a table that grows forever, and reading them requires more D1 queries. |
| Run health queries every minute | Run every 5 minutes (or 15 for non-critical systems) | 5-min intervals use 5x fewer D1 reads than 1-min. Dashboard users cannot tell the difference. |
| Query D1 on every dashboard load | Read from KV (precomputed snapshot) | Each dashboard load re-runs expensive queries. 100 dashboard loads = 100x the D1 cost. |
| Use SELECT * for monitoring queries | Use SELECT COUNT(*) with specific WHERE clauses, or better, read counters | SELECT * returns all columns and all rows. Monitoring needs counts, not records. |
| Forget timestamp indexes | Create indexes on created_at, updated_at, enriched_at | Without an index, WHERE created_at >= datetime('now', '-5 minutes') is a full table scan. |
| Put Analytics Engine queries in request-hot-path | Query AE from background jobs or dedicated dashboard routes | AE SQL API has 50-200ms latency. Acceptable for dashboards, not for user-facing requests. |
| Store unbounded history in KV | Trim to a fixed window (288 entries = 24h at 5-min) or use TTL | KV values have a 25 MB limit. An unbounded array will eventually exceed it, and the rejected write is easy to miss in an unattended cron. |
| Use one blob field for everything | Design a consistent schema with blob1=event_type, blob2=source, blob3=status | Inconsistent blob ordering makes queries impossible since AE fields are positional. |
| Skip reconciliation for counters | Run a weekly job that verifies counters against actual COUNT(*) | Counters can drift if writes fail partially. Periodic reconciliation catches drift early. |
| Ignore staleness on dashboards | Add freshness_seconds and status: 'stale' to health responses | A dashboard showing 2-hour-old data without indicating staleness creates false confidence. |
| Push metrics to external services from every Worker invocation | Batch metrics in Analytics Engine, push to external on a schedule | Pushing per-invocation adds latency and creates a dependency on an external service. |
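The first row above recommends precomputing counts on write. Here is a minimal sketch of what that could look like, assuming the `counters` table from the migration later in this article and a hypothetical `items` table with `id` and `url` columns. `buildItemInsert` and the two small interfaces are illustrative names, not part of any Cloudflare API; they only describe the `prepare`/`bind` shape that the D1 binding already exposes.

```typescript
// Structural stand-ins for the D1 binding (assumption: only prepare/bind are needed here).
interface Bindable<S> {
  bind(...values: unknown[]): S;
}
interface Preparer<S> {
  prepare(sql: string): S;
}

// Build the statements for "insert one item and bump its counters".
// The caller runs them together, so dashboards never need COUNT(*) on items.
function buildItemInsert<S extends Bindable<S>>(
  db: Preparer<S>,
  item: { id: string; url: string }, // hypothetical item shape
): S[] {
  const bump =
    "UPDATE counters SET value = value + 1, updated_at = datetime('now') WHERE name = ?";
  return [
    db.prepare('INSERT INTO items (id, url) VALUES (?, ?)').bind(item.id, item.url),
    db.prepare(bump).bind('total_items'),
    db.prepare(bump).bind('pending_items'),
  ];
}
```

In a Worker you would pass the result to `env.DB.batch(...)`. D1 runs a batch as one transaction, so the row and its counters should move together, which keeps drift small between reconciliation runs.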
Cost Math
Scenario: Pipeline Processing 1,000 Items Per Day
The pipeline runs a cron every minute, processes ~42 items per hour, and needs monitoring.
Approach 1: Live D1 Queries (the anti-pattern)
21 COUNT(*) queries per tick, average 100,000 rows scanned each
Ticks per day: 1,440 (every minute)
Row reads per day: 21 * 100,000 * 1,440 = 3,024,000,000
Row reads per month: 3,024,000,000 * 30 = 90,720,000,000
D1 included: 25,000,000,000 / month
Overage: 65,720,000,000 rows
Cost: 65,720,000,000 * $0.001 / 1,000,000 = $65.72/month
Plus D1 writes for snapshots:
1,440 snapshots/day * 30 = 43,200 writes/month
Cost: $0.043
Total monitoring cost: ~$65.77/month
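The read-cost arithmetic above can be reproduced with a few lines of code, which is handy when you want to plug in your own table sizes. This is only a sketch: `monthlyD1ReadCost` is a hypothetical helper, and the constants are the D1 figures quoted in this article (25 billion included row reads, $0.001 per million beyond that).

```typescript
const D1_INCLUDED_READS = 25_000_000_000; // included row reads per month (from this article)
const D1_READ_PRICE_PER_MILLION = 0.001; // USD per million rows beyond the included amount

// Estimate monthly D1 read overage for a polling-style monitor.
function monthlyD1ReadCost(
  queriesPerTick: number,
  rowsPerQuery: number,
  ticksPerDay: number,
  days = 30,
): { rowsPerMonth: number; overage: number; usd: number } {
  const rowsPerMonth = queriesPerTick * rowsPerQuery * ticksPerDay * days;
  const overage = Math.max(0, rowsPerMonth - D1_INCLUDED_READS);
  const usd = Math.round((overage / 1_000_000) * D1_READ_PRICE_PER_MILLION * 100) / 100;
  return { rowsPerMonth, overage, usd };
}

// Approach 1: 21 queries, ~100,000 rows each, every minute.
const liveQueries = monthlyD1ReadCost(21, 100_000, 1_440);
console.log(liveQueries); // rowsPerMonth 90,720,000,000, overage 65,720,000,000, usd 65.72
```

The helper ignores any other D1 reads your application makes, so treat the result as the monitoring share only.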
Approach 2: KV Snapshots Every 5 Minutes
D1 reads (computing snapshots):
13 queries * 100,000 rows * 288 ticks/day = 374,400,000/day
Monthly: 11,232,000,000 rows
Within 25B included: $0.00
KV writes:
2 writes per tick (latest + history) * 288 ticks/day = 576 writes/day
Monthly: 17,280 writes
Cost: $0.00 (within free tier of 1,000/day)
KV reads (dashboard loads):
Assume 100 dashboard loads/day = 3,000/month
Cost: $0.00 (within free tier)
Total monitoring cost: ~$0.00/month
Wait — 11.2 billion row reads per month is still nearly half the included 25 billion. Let’s improve with the counters table.
Approach 3: KV Snapshots + Counters Table
D1 reads (computing snapshots with counters):
5 rows from counters + 4 time-bounded queries (~500 rows each)
Per tick: 5 + 2,000 = 2,005 rows
288 ticks/day = 577,440 rows/day
Monthly: 17,323,200 rows
Cost: $0.00 (well within 25B included)
D1 writes (counter increments):
1,000 items/day * 3 counter updates each = 3,000 writes/day
Monthly: 90,000 writes
Cost: $0.00 (within 50M included)
KV writes + reads: same as above = $0.00
Total monitoring cost: ~$0.00/month
Approach 4: KV + Analytics Engine
Same KV costs as Approach 3: $0.00
Analytics Engine writes:
1,000 items/day * 2 data points each = 2,000 writes/day
Plus 288 cycle metrics + 100 error events = ~2,400/day
Monthly: 72,000 data points
Cost: $0.00 (within 10M included)
Analytics Engine queries:
100 dashboard loads/day * 3 queries each = 300 queries/day
Monthly: 9,000 queries
Cost: $0.00 (within 1M included)
Total monitoring cost: ~$0.00/month
Breakeven Analysis
At what scale does each approach start costing money?
| Approach | Free Until | Monthly Cost at 10x Scale |
|---|---|---|
| Live D1 queries | ~8 days at 1-min interval | $65.77 |
| KV snapshots (no counters) | ~67 days of data growth | $0.00 |
| KV + counters | ~4 years of data growth | $0.00 |
| KV + counters + Analytics Engine | ~130K items/day | $0.00 |
| External (Datadog) | Never free | $15-23/month minimum |
Key insight: With the hybrid approach (KV snapshots + counters + Analytics Engine), monitoring is effectively free for any pipeline processing under 100,000 items per day. The included allocations on the $5/month Workers plan cover everything.
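A quick way to sanity-check the first two rows of the table is to divide the included reads by the daily read volume from the scenarios above. The helper below is a hypothetical sketch; it assumes the daily volume stays constant and a 30-day billing cycle.

```typescript
// Days until the included D1 row reads would be used up at a constant daily rate.
function daysUntilExhausted(
  rowsPerDay: number,
  includedPerMonth = 25_000_000_000, // included D1 row reads (from this article)
  cycleDays = 30, // assumed billing cycle length
): { days: number; exhaustsWithinCycle: boolean } {
  const days = includedPerMonth / rowsPerDay;
  return { days, exhaustsWithinCycle: days < cycleDays };
}

// Live D1 queries: 3,024,000,000 rows/day, roughly 8.3 days
console.log(daysUntilExhausted(3_024_000_000));
// KV snapshots without counters: 374,400,000 rows/day, roughly 67 days
console.log(daysUntilExhausted(374_400_000));
```

A result longer than the billing cycle means the allocation is never exhausted at the current table size; it only becomes a problem as the scanned tables grow.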
The $5 Plan Budget
The Workers paid plan ($5/month) includes:
| Resource | Included | Monitoring Budget (5%) |
|---|---|---|
| D1 row reads | 25 billion | 1.25 billion |
| D1 row writes | 50 million | 2.5 million |
| KV reads | Paid per use ($0.20/M) | ~50,000 reads = $0.01 |
| KV writes | Paid per use ($1.00/M) | ~17,000 writes = $0.017 |
| AE data points | 10 million | 500,000 |
| AE queries | 1 million | 50,000 |
| Worker invocations | 10 million | 500,000 |
If your monitoring uses more than 5% of any resource, something is wrong with your monitoring design. Monitoring should be a rounding error on your infrastructure bill.
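The 5% rule is easy to turn into an automated check. The sketch below uses the included amounts from the table above; `resourcesOverBudget` and the resource keys are hypothetical names chosen for illustration.

```typescript
// Included monthly allocations on the paid plan, as listed in the table above.
const INCLUDED = {
  d1RowReads: 25_000_000_000,
  d1RowWrites: 50_000_000,
  aeDataPoints: 10_000_000,
  aeQueries: 1_000_000,
  workerInvocations: 10_000_000,
};
type Resource = keyof typeof INCLUDED;

// Return every resource where monitoring alone exceeds its share of the allocation.
function resourcesOverBudget(
  usage: Partial<Record<Resource, number>>,
  share = 0.05,
): Resource[] {
  return (Object.keys(INCLUDED) as Resource[]).filter(
    (resource) => (usage[resource] ?? 0) > INCLUDED[resource] * share,
  );
}

// Approach 2 (KV snapshots, no counters) still spends ~45% of the D1 read allocation.
console.log(resourcesOverBudget({ d1RowReads: 11_232_000_000 })); // [ 'd1RowReads' ]
```

Running Approach 3's numbers (17,323,200 row reads, 90,000 row writes) through the same check returns an empty list.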
Production Checklist
Before deploying monitoring for a Workers pipeline, verify:
- Health snapshot runs every 5 minutes, not every minute
- Snapshot is stored in KV, not D1
- Dashboard routes read from KV, not from live D1 queries
- All timestamp columns used in WHERE clauses are indexed
- COUNT(*) queries use compound CASE expressions or a counters table
- Activity feed queries have a capped time window (max 60 minutes)
- Analytics Engine binding is configured for time-series metrics
- Health endpoint includes freshness_seconds and a staleness indicator
- KV snapshot history is trimmed to a fixed window (288 entries)
- Error handling exists for first-deploy 404 (no snapshot yet)
- Cost projection (rate_per_day_estimate) is included in the snapshot
- Counter reconciliation job runs weekly
- writeDataPoint() calls are NOT awaited
- Analytics Engine queries are NOT in the request hot path
- Dashboard has Cache-Control headers to reduce KV reads
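The weekly counter reconciliation item is the one most often skipped, so here is a minimal sketch of its core comparison. It assumes you have already loaded the stored counters and run the real `COUNT(*)` queries once; `findCounterDrift` and the `Drift` shape are hypothetical names.

```typescript
interface Drift {
  name: string;
  counter: number; // value stored in the counters table
  actual: number; // value from the real COUNT(*) query
  delta: number; // actual - counter
}

// Compare stored counters against freshly counted values and report mismatches.
function findCounterDrift(
  counters: Record<string, number>,
  actual: Record<string, number>,
): Drift[] {
  const drift: Drift[] = [];
  for (const [name, actualValue] of Object.entries(actual)) {
    const counter = counters[name] ?? 0;
    if (counter !== actualValue) {
      drift.push({ name, counter, actual: actualValue, delta: actualValue - counter });
    }
  }
  return drift;
}

console.log(
  findCounterDrift({ total_items: 1000, enriched_items: 400 }, { total_items: 1003, enriched_items: 400 }),
); // one entry: total_items is 3 behind
```

For each reported entry, the weekly job can write the `actual` value back to the counters table and log the delta so you can see whether drift is growing.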
Full Working Example
This is a complete Worker with all patterns integrated — cron with KV snapshots, Hono dashboard routes, Analytics Engine instrumentation, and an activity feed.
import { Hono } from 'hono';
import { cors } from 'hono/cors';
// ── Types ───────────────────────────────────────────────────────
interface Env {
DB: D1Database;
KV: KVNamespace;
METRICS: AnalyticsEngineDataset;
ACCOUNT_ID: string; // plain var, set in wrangler.jsonc
ANALYTICS_API_TOKEN: string; // secret, set with `wrangler secret put ANALYTICS_API_TOKEN`
}
interface HealthData {
counts: Record<string, number>;
velocity: Record<string, number>;
costs: {
today_usd: number;
today_calls: number;
rate_per_hour: number;
rate_per_day_estimate: number;
by_service: Record<string, { usd: number; calls: number }>;
};
errors: { last_hour: number; last_24h: number };
}
interface Snapshot {
health: HealthData;
taken_at: string;
}
// ── Hono App ────────────────────────────────────────────────────
const app = new Hono<{ Bindings: Env }>();
app.use('*', cors());
// Health — from KV
app.get('/api/health', async (c) => {
const snapshot = await c.env.KV.get<Snapshot>('health:latest', 'json');
if (!snapshot) return c.json({ status: 'initializing' }, 503);
const age = Date.now() - new Date(snapshot.taken_at).getTime();
c.header('Cache-Control', 'public, max-age=60');
return c.json({
status: age < 600_000 ? 'healthy' : 'stale',
age_seconds: Math.round(age / 1000),
health: snapshot.health,
snapshot_at: snapshot.taken_at,
});
});
// History — from KV
app.get('/api/health/history', async (c) => {
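// Fall back to 24 when the query value is missing or not a positive integer
const requestedHours = Number.parseInt(c.req.query('hours') ?? '', 10);
const hours = Math.min(requestedHours > 0 ? requestedHours : 24, 48);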
const snapshots = await c.env.KV.get<Snapshot[]>('health:snapshots', 'json');
if (!snapshots) return c.json({ snapshots: [], count: 0 });
const cutoff = new Date(Date.now() - hours * 3600_000).toISOString();
const filtered = snapshots.filter((s) => s.taken_at >= cutoff);
return c.json({ snapshots: filtered, count: filtered.length, window_hours: hours });
});
// Activity — from D1 (bounded time window)
app.get('/api/activity', async (c) => {
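// Fall back to 5 when the query value is missing or not a positive integer
const requestedMinutes = Number.parseInt(c.req.query('minutes') ?? '', 10);
const minutes = Math.min(requestedMinutes > 0 ? requestedMinutes : 5, 60);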
const [calls, items] = await Promise.all([
c.env.DB.prepare(`
SELECT service, COUNT(*) as n, MAX(created_at) as last_at
FROM api_calls
WHERE created_at >= datetime('now', '-' || ? || ' minutes')
GROUP BY service ORDER BY last_at DESC
`).bind(minutes).all(),
c.env.DB.prepare(`
SELECT COUNT(*) as n FROM items
WHERE created_at >= datetime('now', '-' || ? || ' minutes')
`).bind(minutes).first<{ n: number }>(),
]);
return c.json({
window_minutes: minutes,
new_items: items?.n ?? 0,
api_calls: calls.results,
});
});
// Metrics — from Analytics Engine
app.get('/api/metrics/throughput', async (c) => {
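// Fall back to 24 when the query value is missing or not a positive integer,
// so only a clamped integer is ever interpolated into the SQL below
const requestedHours = Number.parseInt(c.req.query('hours') ?? '', 10);
const hours = Math.min(requestedHours > 0 ? requestedHours : 24, 168);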
const sql = `
SELECT toStartOfHour(timestamp) AS hour,
SUM(_sample_interval * double1) AS discovered,
SUM(_sample_interval * double2) AS enriched,
SUM(_sample_interval * double4) AS cost_usd
FROM pipeline_metrics
WHERE blob1 = 'crawl_cycle'
AND timestamp > NOW() - INTERVAL '${hours}' HOUR
GROUP BY hour ORDER BY hour ASC
`;
const url = `https://api.cloudflare.com/client/v4/accounts/${c.env.ACCOUNT_ID}/analytics_engine/sql`;
const res = await fetch(url, {
method: 'POST',
headers: { Authorization: `Bearer ${c.env.ANALYTICS_API_TOKEN}` },
body: sql,
});
if (!res.ok) return c.json({ error: 'Analytics query failed' }, 502);
return c.json(await res.json());
});
// ── Cron Handler ────────────────────────────────────────────────
async function handleScheduled(event: ScheduledEvent, env: Env): Promise<void> {
if (event.cron === '* * * * *') {
// Useful work every minute
const start = Date.now();
const result = await runCrawlCycle(env);
// Record to Analytics Engine (fire-and-forget)
env.METRICS.writeDataPoint({
blobs: ['crawl_cycle', 'main', result.error ? 'error' : 'success'],
doubles: [result.discovered, result.enriched, Date.now() - start, result.cost],
indexes: ['main'],
});
// Health snapshot every 5 minutes, keyed to the scheduled time so a slow crawl cycle cannot skip it
if (new Date(event.scheduledTime).getUTCMinutes() % 5 === 0) {
await computeAndStoreSnapshot(env);
}
}
}
async function computeAndStoreSnapshot(env: Env): Promise<void> {
// Read from counters table (cheap) + time-bounded queries
const [counters, costToday, errorsHour, errors24h] = await Promise.all([
env.DB.prepare('SELECT name, value FROM counters').all(),
env.DB.prepare(
"SELECT COALESCE(SUM(cost_usd),0) as cost, COUNT(*) as calls FROM api_calls WHERE created_at >= date('now')"
).first<{ cost: number; calls: number }>(),
env.DB.prepare(
"SELECT COUNT(*) as n FROM error_log WHERE created_at >= datetime('now','-1 hour')"
).first<{ n: number }>(),
env.DB.prepare(
"SELECT COUNT(*) as n FROM error_log WHERE created_at >= datetime('now','-24 hours')"
).first<{ n: number }>(),
]);
const counts = Object.fromEntries(
counters.results.map((r) => [r.name as string, r.value as number]),
);
const todayCost = costToday?.cost ?? 0;
const health: HealthData = {
counts,
velocity: {
// Assumes the pipeline maintains these rolling counters; the migration below does not seed them
enriched_last_hour: counts.enriched_last_hour ?? 0,
items_last_24h: counts.items_last_24h ?? 0,
},
costs: {
today_usd: Math.round(todayCost * 10000) / 10000,
today_calls: costToday?.calls ?? 0,
rate_per_hour: Math.round((todayCost / Math.max(new Date().getUTCHours(), 1)) * 10000) / 10000,
rate_per_day_estimate: Math.round((todayCost / Math.max(new Date().getUTCHours(), 1)) * 24 * 10000) / 10000,
by_service: {},
},
errors: {
last_hour: errorsHour?.n ?? 0,
last_24h: errors24h?.n ?? 0,
},
};
const now = new Date().toISOString();
const snapshot: Snapshot = { health, taken_at: now };
await env.KV.put('health:latest', JSON.stringify(snapshot));
const existing = await env.KV.get<Snapshot[]>('health:snapshots', 'json');
const history = existing ?? [];
history.push(snapshot);
while (history.length > 288) history.shift();
await env.KV.put('health:snapshots', JSON.stringify(history));
}
// Stub for illustration
async function runCrawlCycle(env: Env): Promise<{
discovered: number; enriched: number; cost: number; error?: string
}> {
return { discovered: 3, enriched: 1, cost: 0.005 };
}
// ── Export ───────────────────────────────────────────────────────
export default {
fetch: app.fetch,
scheduled: handleScheduled,
};
Wrangler Configuration
// wrangler.jsonc
{
"name": "monitored-pipeline",
"main": "src/index.ts",
"compatibility_date": "2025-03-07",
"compatibility_flags": ["nodejs_compat"],
"d1_databases": [
{
"binding": "DB",
"database_name": "pipeline-db",
"database_id": "your-db-id",
"migrations_dir": "migrations"
}
],
"kv_namespaces": [
{
"binding": "KV",
"id": "your-kv-id"
}
],
"analytics_engine_datasets": [
{
"binding": "METRICS",
"dataset": "pipeline_metrics"
}
],
"vars": {
"ACCOUNT_ID": "your-account-id"
},
"triggers": {
"crons": [
"* * * * *"
]
}
}
Migration for Counters Table
-- migrations/0001_counters.sql
CREATE TABLE IF NOT EXISTS counters (
name TEXT PRIMARY KEY,
value INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
INSERT OR IGNORE INTO counters (name, value) VALUES
('total_items', 0),
('enriched_items', 0),
('pending_items', 0),
('total_keywords', 0),
('total_errors_today', 0);
-- Indexes for time-bounded queries
CREATE INDEX IF NOT EXISTS idx_api_calls_created_at ON api_calls(created_at);
CREATE INDEX IF NOT EXISTS idx_error_log_created_at ON error_log(created_at);
CREATE INDEX IF NOT EXISTS idx_items_created_at ON items(created_at);
CREATE INDEX IF NOT EXISTS idx_items_enriched_at ON items(enriched_at);
Monitoring Monitoring
A monitoring system that fails silently is worse than no monitoring. Here is how to monitor your monitoring.
Staleness Detection
The simplest check: is the latest snapshot too old?
app.get('/api/health/meta', async (c) => {
const latest = await c.env.KV.get<Snapshot>('health:latest', 'json');
const history = await c.env.KV.get<Snapshot[]>('health:snapshots', 'json');
const now = Date.now();
const latestAge = latest
? Math.round((now - new Date(latest.taken_at).getTime()) / 1000)
: null;
return c.json({
latest_snapshot_age_seconds: latestAge,
latest_snapshot_at: latest?.taken_at ?? null,
history_count: history?.length ?? 0,
history_oldest: history?.[0]?.taken_at ?? null,
history_newest: history?.[history.length - 1]?.taken_at ?? null,
monitoring_status: latestAge === null
? 'no_data'
: latestAge < 600
? 'healthy'
: latestAge < 1800
? 'delayed'
: 'down',
});
});
External Ping
Use an external uptime service (Better Uptime, UptimeRobot, or even a simple cron on another machine) to hit your /ping endpoint every 5 minutes. If it stops responding or reports cron_active: false, alert.
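If your external checker lets you run custom logic (or you use a small cron on another machine), the alert decision could be expressed as a pure function over the `/ping` response. This is a sketch under that assumption; `evaluatePing` and `PingBody` are hypothetical names, and the body shape mirrors the `/ping` route shown earlier.

```typescript
interface PingBody {
  status: string;
  cron_active: boolean;
  last_snapshot?: string;
  age_seconds?: number;
}

// Decide whether a /ping result should raise an alert.
// httpStatus is null when the request timed out or failed at the network level.
function evaluatePing(
  httpStatus: number | null,
  body: PingBody | null,
): { verdict: 'ok' | 'alert'; reason: string } {
  if (httpStatus === null) return { verdict: 'alert', reason: 'no response' };
  if (httpStatus !== 200) return { verdict: 'alert', reason: `unexpected status ${httpStatus}` };
  if (body === null) return { verdict: 'alert', reason: 'unparseable body' };
  if (!body.cron_active) return { verdict: 'alert', reason: 'cron inactive or snapshot stale' };
  return { verdict: 'ok', reason: 'worker up and cron active' };
}

console.log(evaluatePing(200, { status: 'up', cron_active: true, age_seconds: 120 }));
console.log(evaluatePing(200, { status: 'starting', cron_active: false }));
```

Note that the `starting` state also reports `cron_active: false`, so this check will alert during the first few minutes after an initial deploy. You may want a grace period there.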
KV Write Failures
KV rejects a write when the value exceeds the 25 MB limit, and in an unattended cron that failure is easy to miss. Log the size of your snapshots array periodically:
async function computeAndStoreSnapshot(env: Env): Promise<void> {
// ... compute health ...
const history = (await env.KV.get<Snapshot[]>('health:snapshots', 'json')) ?? [];
history.push(snapshot);
while (history.length > 288) history.shift();
const serialized = JSON.stringify(history);
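// Measure UTF-8 bytes; string.length counts UTF-16 code units and can under-report
const sizeKB = Math.round(new TextEncoder().encode(serialized).byteLength / 1024);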
if (sizeKB > 20_000) {
console.warn(`[monitoring] Snapshot history is ${sizeKB} KB — approaching 25 MB KV limit`);
}
await env.KV.put('health:snapshots', serialized);
await env.KV.put('health:latest', JSON.stringify(snapshot));
// Record monitoring health to Analytics Engine
env.METRICS.writeDataPoint({
blobs: ['monitoring', 'snapshot', 'success'],
doubles: [history.length, sizeKB],
indexes: ['monitoring'],
});
}
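Entry count is only a proxy for size. If individual snapshots grow (for example, a larger `by_service` map), 288 entries can still creep toward the limit. One possible guard, sketched here with a hypothetical `trimToByteBudget` helper, drops the oldest entries until the serialized array fits a byte budget you choose.

```typescript
// Drop the oldest entries until the JSON-serialized array fits within maxBytes.
// Size is measured in UTF-8 bytes, which is what ends up stored.
function trimToByteBudget<T>(history: T[], maxBytes: number): { kept: T[]; bytes: number } {
  const encoder = new TextEncoder();
  const kept = [...history]; // do not mutate the caller's array
  let bytes = encoder.encode(JSON.stringify(kept)).byteLength;
  while (kept.length > 0 && bytes > maxBytes) {
    kept.shift(); // oldest entry first
    bytes = encoder.encode(JSON.stringify(kept)).byteLength;
  }
  return { kept, bytes };
}

// Example with a deliberately tiny budget
const sample = Array.from({ length: 10 }, (_, i) => ({ taken_at: `t${i}`, note: 'x'.repeat(50) }));
const trimmed = trimToByteBudget(sample, 300);
console.log(trimmed.kept.length, trimmed.bytes);
```

Re-serializing on every iteration is wasteful for large arrays, but at a few hundred entries every five minutes it is unlikely to matter. A budget well below 25 MB leaves headroom for growth between checks.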
Decision Framework
Use this flowchart to decide where to store your monitoring data:
Do you need to query arbitrary time ranges?
├── YES → Use Analytics Engine
│ ├── Need retention > 92 days? → Also roll up daily to D1
│ └── Need real-time? → Also use KV for latest snapshot
└── NO
├── Do you need current state only?
│ └── YES → Use KV (health:latest)
└── Do you need short-term history (24-48h)?
└── YES → Use KV (health:snapshots array)
└── Also need activity feed? → Use D1 with bounded time queries
The answer for most Workers pipelines is: KV for current state + short-term history, Analytics Engine for trending, D1 for activity feeds only.
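The same decision tree can be written as a small function, which is sometimes easier to review than a diagram. This is an illustrative sketch only; `chooseStorage` and the `Needs` fields are hypothetical names that map one-to-one onto the questions in the flowchart above.

```typescript
interface Needs {
  arbitraryTimeRanges: boolean;
  retentionBeyond92Days: boolean;
  realTime: boolean;
  currentState: boolean;
  shortTermHistory: boolean;
  activityFeed: boolean;
}

// Walk the flowchart and collect the storage choices it recommends.
function chooseStorage(needs: Needs): string[] {
  const choices: string[] = [];
  if (needs.arbitraryTimeRanges) {
    choices.push('Analytics Engine');
    if (needs.retentionBeyond92Days) choices.push('D1 daily rollup');
    if (needs.realTime) choices.push('KV health:latest');
    return choices;
  }
  if (needs.currentState) choices.push('KV health:latest');
  if (needs.shortTermHistory) {
    choices.push('KV health:snapshots');
    if (needs.activityFeed) choices.push('D1 bounded time queries');
  }
  return choices;
}

console.log(
  chooseStorage({
    arbitraryTimeRanges: false,
    retentionBeyond92Days: false,
    realTime: false,
    currentState: true,
    shortTermHistory: true,
    activityFeed: true,
  }),
); // [ 'KV health:latest', 'KV health:snapshots', 'D1 bounded time queries' ]
```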
References
Official Cloudflare Documentation
- Workers Analytics Engine — Overview — Overview of the time-series analytics system built into Workers
- Workers Analytics Engine — Get Started — Binding configuration, writeDataPoint API, data point structure
- Workers Analytics Engine — SQL API — HTTP API endpoint, authentication, query format
- Workers Analytics Engine — SQL Reference — Supported SQL functions, time bucketing, sampling
- Workers Analytics Engine — Querying from a Worker — Fetch-based querying from inside a Worker
- Workers Analytics Engine — Pricing — $0.25/M writes, $1.00/M queries, not currently billed
- D1 Pricing — $0.001/M row reads, $1.00/M row writes, 25B reads included
- KV Pricing — $0.20/M reads, $1.00/M writes, $0.20/GB storage
- KV Limits — 25 MB max value size, eventual consistency model
- Workers Pricing — 10M requests included, CPU time limits
- Workers Metrics and Analytics — Built-in Worker analytics dashboard
Cloudflare Blog Posts
- Introducing Workers Analytics Engine — Original announcement with design rationale
- How Cloudflare instruments services using Workers Analytics Engine — Dogfooding AE for internal monitoring
- Making state easy with D1 GA, Hyperdrive, Queues and Workers Analytics Engine updates — D1 GA announcement with pricing details
- Workers Analytics Engine SQL enhancements (Sept 2025) — New mathematical, aggregate, and bit functions
Related Articles
- Cost Observability for Cloudflare Workers — Deep dive on the D1 row reads incident, Analytics Engine as metrics store, GFS retention pattern, budget governor
- Event-Driven Architecture on Cloudflare Workers — Queue patterns, fan-out, consumer middleware
- Durable Object Patterns on Cloudflare Workers — Control plane/data plane, adaptive controller, event reactor
- Building an Autonomous Data Pipeline on Cloudflare Workers — Complete pipeline architecture with D1, Queues, DOs, R2, Workflows
External Tools
- Grafana Cloud — Free tier available, can query Analytics Engine via JSON API datasource
- Better Uptime — External uptime monitoring, can ping Worker health endpoints
- Hono — Lightweight web framework for Cloudflare Workers, used for all route examples in this article
Built from production experience running continuous Workers pipelines. The patterns in this article eliminated $65+/month in D1 overages and reduced monitoring cost to effectively $0.