Gary Wu

Cost Observability for Cloudflare Workers


Org Status: 🟡 Dormant | Cloudflare: N/A | Last Audited: 2026-04-28


Cost Observability for Cloudflare Workers — How We Burned $19.63 in D1 Row Reads and Built a Free Monitoring System

A single cron job ran 21 COUNT(*) queries every minute on a 6.4 GB D1 database for six days. Nobody noticed. The billing dashboard showed 19.6 billion row reads and a $19.63 charge — on a platform where 25 billion reads per month are included free. This article explains what went wrong, why platform costs are invisible by default, and how to build a zero-cost observability system using Workers Analytics Engine and D1 that catches runaway costs before they become invoices.

What you’ll learn:



The Incident

The Setup

A project called aso-mrr tracked App Store rankings. It had a Cloudflare Worker with a cron trigger set to * * * * * — every minute. The cron handler did two things:

  1. Crawl cycle — check for pending keywords, scan app store categories, enrich app metadata. This was the useful work.
  2. Health snapshot — run 21 COUNT(*) queries across multiple tables and write the results to a scan_snapshots table. This was the monitoring.

Here is the actual health snapshot function that ran every minute:

async function computeAndStoreHealthSnapshot(db: D1Database) {
  const [
    totalApps, enrichedApps, pendingApps, totalKeywords, pendingKeywords,
    totalClusters, totalReviews, appsWithReviews, agentStats,
    appsLastHour, appsLast24h, keywordsLastHour, keywordsLast24h,
    enrichedLastHour, enrichedLast24h, reviewsLast24h,
    costTotal, costToday, costLast24h, costByService, costByHour,
  ] = await Promise.all([
    db.prepare('SELECT COUNT(*) as n FROM app_registry').first<{ n: number }>(),
    db.prepare("SELECT COUNT(*) as n FROM app_registry WHERE enrichment_source = 'scraper'").first<{ n: number }>(),
    db.prepare('SELECT COUNT(*) as n FROM app_registry WHERE enriched_at IS NULL').first<{ n: number }>(),
    db.prepare('SELECT COUNT(*) as n FROM crawl_keywords').first<{ n: number }>(),
    db.prepare("SELECT COUNT(*) as n FROM crawl_keywords WHERE status = 'pending'").first<{ n: number }>(),
    db.prepare('SELECT COUNT(*) as n FROM market_clusters').first<{ n: number }>(),
    db.prepare('SELECT COUNT(*) as n FROM app_reviews').first<{ n: number }>(),
    db.prepare('SELECT COUNT(*) as n FROM app_registry WHERE reviews_scraped_at IS NOT NULL').first<{ n: number }>(),
    getAgentStats(db),
    db.prepare("SELECT COUNT(*) as n FROM app_registry WHERE first_seen_at >= datetime('now', '-1 hour')").first<{ n: number }>(),
    db.prepare("SELECT COUNT(*) as n FROM app_registry WHERE first_seen_at >= datetime('now', '-24 hours')").first<{ n: number }>(),
    db.prepare("SELECT COUNT(*) as n FROM crawl_keywords WHERE created_at >= datetime('now', '-1 hour')").first<{ n: number }>(),
    db.prepare("SELECT COUNT(*) as n FROM crawl_keywords WHERE created_at >= datetime('now', '-24 hours')").first<{ n: number }>(),
    db.prepare("SELECT COUNT(*) as n FROM app_registry WHERE enriched_at >= datetime('now', '-1 hour')").first<{ n: number }>(),
    db.prepare("SELECT COUNT(*) as n FROM app_registry WHERE enriched_at >= datetime('now', '-24 hours')").first<{ n: number }>(),
    db.prepare("SELECT COUNT(*) as n FROM app_reviews WHERE scraped_at >= datetime('now', '-24 hours')").first<{ n: number }>(),
    db.prepare('SELECT COALESCE(SUM(cost_usd), 0) as cost, COUNT(*) as calls FROM api_calls_ledger').first<{ cost: number; calls: number }>(),
    db.prepare("SELECT COALESCE(SUM(cost_usd), 0) as cost, COUNT(*) as calls FROM api_calls_ledger WHERE created_at >= date('now')").first<{ cost: number; calls: number }>(),
    db.prepare("SELECT COALESCE(SUM(cost_usd), 0) as cost, COUNT(*) as calls FROM api_calls_ledger WHERE created_at >= datetime('now', '-24 hours')").first<{ cost: number; calls: number }>(),
    db.prepare(`
      SELECT service, COALESCE(SUM(cost_usd), 0) as cost, COUNT(*) as calls,
        COALESCE(SUM(CASE WHEN created_at >= date('now') THEN cost_usd ELSE 0 END), 0) as cost_today,
        COUNT(CASE WHEN created_at >= date('now') THEN 1 END) as calls_today
      FROM api_calls_ledger GROUP BY service ORDER BY cost DESC
    `).all(),
    db.prepare(`
      SELECT strftime('%Y-%m-%d %H:00', created_at) as hour,
        COALESCE(SUM(cost_usd), 0) as cost, COUNT(*) as calls
      FROM api_calls_ledger
      WHERE created_at >= datetime('now', '-24 hours')
      GROUP BY hour ORDER BY hour DESC LIMIT 24
    `).all(),
  ]);

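  // Assemble the health object from the 21 results above (elided in this excerpt)
  const health = { /* ... */ };

  // Write snapshot to D1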
  await db.prepare(`
    INSERT INTO scan_snapshots (
      apps_total, apps_enriched, apps_pending,
      keywords_total, keywords_pending, clusters,
      reviews_total, reviews_apps, cost_total_usd, cost_calls,
      health_json
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  `).bind(/* ... 11 values ... */).run();

  return health;
}

Twenty-one queries. Every minute. On tables with hundreds of thousands of rows.

The Math

The app_registry table had ~180,000 rows. The crawl_keywords table had ~120,000. The app_reviews table had ~500,000. The api_calls_ledger table had ~50,000 rows and was hit by several GROUP BY aggregations.

Each COUNT(*) without a covering index performs a full table scan. D1 counts every row it reads, regardless of what the query returns.

Per invocation:
  8 queries on app_registry    × 180,000 rows  = 1,440,000 row reads
  4 queries on crawl_keywords  × 120,000 rows  =   480,000 row reads
  2 queries on app_reviews     × 500,000 rows  = 1,000,000 row reads
  4 queries on api_calls_ledger × 50,000 rows  =   200,000 row reads
  3 queries (GROUP BY, JOIN)   × ~100,000 rows  =   300,000 row reads
  ─────────────────────────────────────────────
  Total per invocation:                          ~3,420,000 row reads

Per day (1,440 invocations):
  3,420,000 × 1,440 = ~4,924,800,000 row reads

Per 6 days:
  ~4,924,800,000 × 6 ≈ 29,548,800,000 row reads

Some queries had date-filtered WHERE clauses that reduced the scan, so the actual number was ~19.6 billion. The D1 pricing is $0.001 per million row reads after the included 25 billion/month. The bill: $19.63.
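This projection is simple enough to automate as a pre-deploy check. The sketch below assumes the $0.001-per-million read price quoted in this article. projectDailyReads and listPriceUsd are hypothetical helpers. The included monthly allowance is an optional parameter, not something the code looks up.

```typescript
// Price quoted in this article: $0.001 per million D1 rows read.
const USD_PER_MILLION_READS = 0.001;

// Rows read per day by a job that runs `runsPerDay` times a day.
function projectDailyReads(readsPerInvocation: number, runsPerDay: number): number {
  return readsPerInvocation * runsPerDay;
}

// Cost of the reads above an optional included allowance (defaults to none).
function listPriceUsd(rowReads: number, includedReads = 0): number {
  const billable = Math.max(0, rowReads - includedReads);
  return (billable / 1_000_000) * USD_PER_MILLION_READS;
}

// The health snapshot from the incident: ~3,420,000 reads, 1,440 runs per day.
const perDay = projectDailyReads(3_420_000, 1_440);
console.log(perDay);                          // 4924800000
console.log(listPriceUsd(perDay).toFixed(2)); // 4.92
```

Running the same projection with the crawler's ~5,000 reads per invocation makes the gap between the work and the monitoring visible before anything ships.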

Key insight: The monitoring cost more than the work it was monitoring. The crawl cycle itself — the actual business logic — used perhaps 5,000 row reads per invocation. The health snapshot used 3,400,000. The monitoring was 680x more expensive than the work.

The Timeline

| Day | What Happened | Row Reads | Cumulative Cost |
|-----|---------------|-----------|-----------------|
| 1 | Deployed with * * * * * cron. Health snapshot runs. Nobody checks. | ~4.9B | $0.00 (within free tier) |
| 2 | Cron keeps running. No smoke test. Not in any team. | ~4.9B | $0.00 |
| 3 | Crawl work completes. Cron keeps running because there’s no exit condition. | ~4.9B | $0.00 |
| 4 | Passed the 25B free-tier threshold. Billing starts. | ~4.9B | ~$4.90 |
| 5 | Still running. No alerts configured. | ~4.9B | ~$9.80 |
| 6 | Discovered during manual dashboard check. Killed the cron. | ~4.9B | $19.63 |

Six days. Zero alerts. Zero product output after day 2 (the crawl work was done). The Worker ran 8,640 invocations doing nothing but counting rows in a database nobody was watching.

What Changed

The fix was immediate and multi-layered:

1. Cron interval: * * * * * became */5 * * * *

// wrangler.jsonc — before
"triggers": {
  "crons": ["* * * * *"]
}

// wrangler.jsonc — after: 3 crons with distinct responsibilities
"triggers": {
  "crons": [
    "*/5 * * * *",   // Crawler (was every minute)
    "0 */6 * * *",   // Clustering (every 6 hours)
    "0 6 * * *"      // Daily snapshots (once per day)
  ]
}
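The per-day multiplier is what turned a 3.4 million-read function into a multi-billion-read day, so it is worth deriving it from the cron string itself. The sketch below assumes standard five-field cron syntax. invocationsPerDay is a hypothetical helper that understands only *, */n, and single values in the minute and hour fields. It rejects anything else rather than guessing.

```typescript
// Number of values a single cron field matches within a range of `size` values.
// Only "*", "*/n" and a single integer are supported; anything else throws.
function fieldMatches(field: string, size: number): number {
  if (field === '*') return size;
  const step = /^\*\/(\d+)$/.exec(field);
  if (step) {
    const n = Number(step[1]);
    if (n === 0) throw new Error(`invalid step: ${field}`);
    return Math.ceil(size / n); // values 0, n, 2n, ... below size
  }
  if (/^\d+$/.test(field)) return 1;
  throw new Error(`unsupported cron field: ${field}`);
}

// Runs per day for "minute hour * * *" schedules (day, month, weekday must be *).
function invocationsPerDay(cron: string): number {
  const [minute, hour, ...rest] = cron.trim().split(/\s+/);
  if (rest.length !== 3 || rest.some((f) => f !== '*')) {
    throw new Error(`unsupported schedule: ${cron}`);
  }
  return fieldMatches(minute, 60) * fieldMatches(hour, 24);
}

for (const cron of ['* * * * *', '*/5 * * * *', '0 */6 * * *', '0 6 * * *']) {
  console.log(cron, invocationsPerDay(cron)); // 1440, 288, 4, 1 respectively
}
```

On the */5 schedule alone, the 3,420,000-read snapshot would still cost about 985 million reads a day (3,420,000 × 288). That is why it also moved to the daily cron.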

2. Exit condition before expensive queries

async scheduled(event: ScheduledEvent, env: Bindings, ctx: ExecutionContext) {
  if (event.cron === '*/5 * * * *') {
    // Exit early if there is no work. With the filtered columns indexed, these
    // probes cost a few row reads, not 3,400,000. SELECT 1 ... LIMIT 1 stops at
    // the first match; COUNT(*) ... LIMIT 1 would count every match first.
    // Without an index, a probe that finds nothing still scans the whole table.
    const [pendingScan, pendingKeywords, enrichGap] = await Promise.all([
      env.DB.prepare(
        "SELECT 1 AS found FROM category_scans WHERE scanned_at IS NULL OR scanned_at < datetime('now', '-7 days') LIMIT 1"
      ).first<{ found: number }>(),
      env.DB.prepare(
        "SELECT 1 AS found FROM crawl_keywords WHERE status = 'pending' LIMIT 1"
      ).first<{ found: number }>(),
      env.DB.prepare(
        "SELECT 1 AS found FROM app_registry WHERE enriched_at IS NULL LIMIT 1"
      ).first<{ found: number }>(),
    ]);

    const hasWork = pendingScan !== null
      || pendingKeywords !== null
      || enrichGap !== null;

    if (!hasWork) {
      // Write "idle" metric to Analytics Engine and return
      env.METRICS.writeDataPoint({
        blobs: ['aso-mrr', 'cron', 'crawl_idle'],
        doubles: [0, 0, 0, 0],
        indexes: ['aso-mrr'],
      });
      return;
    }

    // Only now do the actual work
    const stats = await runCrawlCycle(env.DB);

    // Write metrics to Analytics Engine (replaces 21 COUNT queries)
    env.METRICS.writeDataPoint({
      blobs: ['aso-mrr', 'cron', stats.phase],
      doubles: [
        stats.apps_discovered,
        stats.rankings_added,
        stats.keywords_searched,
        stats.categories_scanned,
      ],
      indexes: ['aso-mrr'],
    });
  }
}
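The early-exit check is easier to trust when it can be exercised without a real database. The sketch below assumes existence probes like the ones above and indexes on the filtered columns. ProbeDb, WORK_PROBES, and hasPendingWork are hypothetical names. ProbeDb models only the one D1 method the check uses, so a fake can stand in for it in tests.

```typescript
// The only part of D1's API this check needs.
interface ProbeDb {
  prepare(sql: string): { first(): Promise<unknown> };
}

// SELECT 1 ... LIMIT 1 can stop at the first matching row. Assumes each
// filtered column is indexed; otherwise an idle table is scanned in full.
const WORK_PROBES = [
  "SELECT 1 AS found FROM category_scans WHERE scanned_at IS NULL OR scanned_at < datetime('now', '-7 days') LIMIT 1",
  "SELECT 1 AS found FROM crawl_keywords WHERE status = 'pending' LIMIT 1",
  'SELECT 1 AS found FROM app_registry WHERE enriched_at IS NULL LIMIT 1',
];

// True if any probe returns a row.
async function hasPendingWork(db: ProbeDb): Promise<boolean> {
  const rows = await Promise.all(WORK_PROBES.map((sql) => db.prepare(sql).first()));
  return rows.some((row) => row !== null && row !== undefined);
}

// A fake database in which only crawl_keywords has pending rows.
const fakeDb: ProbeDb = {
  prepare: (sql) => ({
    first: async () => (sql.includes('crawl_keywords') ? { found: 1 } : null),
  }),
};

hasPendingWork(fakeDb).then((busy) => console.log(busy)); // true
```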

3. Health snapshot moved to daily cron

The 21-query health snapshot now runs once per day at 06:00 UTC instead of 1,440 times per day. Same data, 1/1440th the cost.

4. Analytics Engine added for real-time metrics

// wrangler.jsonc — Analytics Engine binding (free, zero-setup)
"analytics_engine_datasets": [
  {
    "binding": "METRICS",
    "dataset": "platform_metrics"
  }
]

5. A deploy-readiness standard was written to prevent this from happening to any other Worker.

The result: excluding the daily snapshot, row reads dropped from ~4.9 billion/day to ~50,000/day, a 99.999% reduction. The idle check costs 3 row reads per invocation. The daily snapshot adds 3.4 million reads once per day. Total: ~3.4 million/day instead of ~4.9 billion/day, which is still a reduction of more than 99.9%.


Why Platform Costs Are Invisible

The Visibility Gap

Most teams track API costs carefully. If you call OpenAI, you log the token count. If you call DataForSEO, you log the credits. These are application-level costs — you wrote the code that makes the call, you see the charge.

Platform costs are different. They happen below your application code:

| Cost Type | Visibility | How You Find Out |
|-----------|------------|------------------|
| API calls (OpenAI, Stripe, etc.) | High (you wrote the fetch()) | Your own logging |
| D1 row reads | None (happens inside db.prepare().run()) | Monthly invoice |
| D1 row writes | None (same) | Monthly invoice |
| R2 Class A ops (PUT, POST, LIST) | None (happens inside bucket.put()) | Monthly invoice |
| R2 Class B ops (GET, HEAD) | None (same) | Monthly invoice |
| R2 storage | Low (you know what you stored) | Monthly invoice |
| Workers invocations | Low (you know your crons exist) | Monthly invoice |
| Workers CPU time | None (you don’t see milliseconds) | Monthly invoice |
| KV reads/writes | None (transparent to your code) | Monthly invoice |
| Durable Object duration | Low (you know DOs are active) | Monthly invoice |
| Queue messages | Low (you know message volume) | Monthly invoice |

Key insight: D1 is the most dangerous cost center on Cloudflare because a single query can scan millions of rows, the cost accumulates silently, and there is no built-in query-level cost reporting. You have to extract rows_read from the query metadata yourself.

Why D1 Is Uniquely Dangerous

D1 charges per row read, not per query. A query that returns 1 row but scans 500,000 rows to find it costs 500,000 row reads. The pricing model punishes:

  1. Full table scans: SELECT COUNT(*) FROM large_table reads every row
  2. Missing indexes: WHERE clauses without indexes scan the entire table
  3. Frequent queries: a cron running every minute multiplies the base cost by 1,440
  4. Wide scans with narrow results: WHERE created_at > datetime('now', '-1 hour') on a table with 500,000 rows and no index on created_at reads all 500,000 rows

The D1 query response includes a meta object with rows_read and rows_written:

const result = await db.prepare('SELECT COUNT(*) as n FROM app_registry').run();
console.log(result.meta);
// {
//   duration: 12.5,
//   rows_read: 180000,    <-- THIS is your cost signal
//   rows_written: 0,
//   last_row_id: 0,
//   changed_db: false,
//   size_after: 6442450944,
//   changes: 0
// }

That rows_read: 180000 is telling you “this query just cost you 0.00018 dollars.” Nobody reads it. Nobody logs it. It happens inside the D1 driver and disappears.
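Turning that meta object into dollars takes one line per counter. The sketch below assumes the list prices used elsewhere in this article: $0.001 per million rows read and $1.00 per million rows written. It ignores monthly included allowances. metaCostUsd is a hypothetical helper.

```typescript
// List prices used in this article, in USD per million rows.
const READ_USD_PER_MILLION = 0.001;
const WRITE_USD_PER_MILLION = 1.0;

// The two D1 meta fields that drive cost.
interface CostMeta {
  rows_read: number;
  rows_written: number;
}

// Cost of one query at list price, ignoring monthly included allowances.
function metaCostUsd(meta: CostMeta): number {
  return (meta.rows_read / 1_000_000) * READ_USD_PER_MILLION
    + (meta.rows_written / 1_000_000) * WRITE_USD_PER_MILLION;
}

console.log(metaCostUsd({ rows_read: 180_000, rows_written: 0 }).toFixed(5)); // 0.00018
```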

The Cloudflare Dashboard Gap

The Cloudflare dashboard shows aggregate metrics per Worker:

It does not show:

You get a single number on your monthly invoice. If that number is surprising, you have to work backwards through your code to figure out which queries caused it.

What Other Platforms Do Better

AWS CloudWatch shows you DynamoDB read capacity units per table per minute. Google Cloud shows you BigQuery bytes scanned per query. Azure shows you Cosmos DB request units per operation. All of these are visible in real-time dashboards, have built-in alarms, and are included in the platform’s free monitoring tier.

Cloudflare gives you the raw data in the query meta object but does not aggregate, store, or alert on it. That is the gap this article fills.


Core Concepts

Workers Analytics Engine

Workers Analytics Engine is a time-series data store built into the Cloudflare Workers runtime. You write data points from your Worker code, and you query them via a SQL API. It is designed for metrics, counters, and event tracking — exactly the use case for cost observability.

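interface AnalyticsEngineDataPoint {
  // Simplified from @cloudflare/workers-types, where every field is optional
  // and blobs/indexes also accept ArrayBuffer (and null) entries.
  // Up to 20 strings for dimensions (grouping, filtering)
  blobs?: string[];
  // Up to 20 numbers for values (the things you measure)
  doubles?: number[];
  // At most 1 string, used as the sampling key
  indexes?: string[];
}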

// Binding type in your Worker
interface Env {
  METRICS: AnalyticsEngineDataset;
}

Writing a data point is a single method call that returns immediately. The runtime handles persistence in the background — it does not block your Worker.

env.METRICS.writeDataPoint({
  blobs: ['aso-mrr', 'query', 'app_registry'],
  doubles: [180000, 0, 12.5],  // rows_read, rows_written, duration_ms
  indexes: ['aso-mrr'],
});

Key insight: Analytics Engine writes are fire-and-forget. They add zero latency to your Worker’s response. The runtime batches and ships them asynchronously. This makes it safe to instrument every query without performance impact.

Limits and pricing:

| Metric | Free Plan | Workers Paid ($5/mo) |
|--------|-----------|----------------------|
| Data points written | 100,000/day | 10M/month included, then $0.25/M |
| Read queries | 10,000/day | 1M/month included, then $1.00/M |
| Data retention | 3 months (92 days) | 3 months (92 days) |
| Max blobs per data point | 20 | 20 |
| Max doubles per data point | 20 | 20 |
| Max blob size (total) | 16 KB | 16 KB |
| Max index size | 96 bytes | 96 bytes |
| Max writes per invocation | 250 | 250 |

As of the time of writing, Cloudflare has published the pricing but is not yet billing for Analytics Engine usage. It is effectively free.
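These limits are easy to exceed by accident when a data point is built from dynamic values such as table names or SQL fragments. The sketch below is a pre-write check that assumes the limits in the table above and string-only blobs and indexes. It treats 16 KB as 16 × 1024 bytes, which is also an assumption. checkDataPoint is a hypothetical helper. It reports problems instead of throwing, so the caller can decide whether to drop or truncate.

```typescript
// writeDataPoint() input, narrowed to string blobs/indexes for this sketch.
interface DataPoint {
  blobs?: string[];
  doubles?: number[];
  indexes?: string[];
}

// Limits from the table above (16 KB assumed to mean 16 * 1024 bytes).
const MAX_BLOBS = 20;
const MAX_DOUBLES = 20;
const MAX_BLOB_BYTES = 16 * 1024;
const MAX_INDEX_BYTES = 96;

const utf8Bytes = (s: string): number => new TextEncoder().encode(s).length;

// Returns human-readable limit violations; an empty array means the point fits.
function checkDataPoint(p: DataPoint): string[] {
  const blobs = p.blobs ?? [];
  const doubles = p.doubles ?? [];
  const indexes = p.indexes ?? [];
  const problems: string[] = [];

  if (blobs.length > MAX_BLOBS) problems.push(`${blobs.length} blobs (max ${MAX_BLOBS})`);
  if (doubles.length > MAX_DOUBLES) problems.push(`${doubles.length} doubles (max ${MAX_DOUBLES})`);
  const blobBytes = blobs.reduce((sum, b) => sum + utf8Bytes(b), 0);
  if (blobBytes > MAX_BLOB_BYTES) problems.push(`${blobBytes} blob bytes (max ${MAX_BLOB_BYTES})`);
  if (indexes.length > 1) problems.push(`${indexes.length} indexes (max 1)`);
  for (const index of indexes) {
    const bytes = utf8Bytes(index);
    if (bytes > MAX_INDEX_BYTES) problems.push(`index of ${bytes} bytes (max ${MAX_INDEX_BYTES})`);
  }
  return problems;
}

console.log(checkDataPoint({
  blobs: ['aso-mrr', 'query', 'app_registry'],
  doubles: [180000, 0, 12.5],
  indexes: ['aso-mrr'],
})); // []
```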

D1 Query Metadata

Every D1 query returns metadata about what it cost. This is the foundation of self-instrumented cost tracking.

interface D1Meta {
  duration: number;      // Query execution time in milliseconds
  rows_read: number;     // Number of rows scanned (THIS IS YOUR COST)
  rows_written: number;  // Number of rows modified
  last_row_id: number;   // Auto-increment ID of last inserted row
  changed_db: boolean;   // Whether the database was modified
  size_after: number;    // Database size in bytes after the query
  changes: number;       // Number of rows changed
}

// Access via .run() result
const result = await db.prepare('SELECT ...').run();
const meta: D1Meta = result.meta;

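The key fields for cost tracking are rows_read and rows_written. D1 pricing on the Workers Paid plan:

  - Rows read: first 25 billion per month included, then $0.001 per million
  - Rows written: first 50 million per month included, then $1.00 per million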

Row reads are 1,000x cheaper than row writes per unit, but they accumulate much faster because every query reads rows and only writes touch the write counter.

The GFS Retention Model

GFS (Grandfather-Father-Son) is a data retention strategy borrowed from backup rotation schemes. RRDtool's round-robin archives apply the same multi-resolution idea to time-series data. The idea: keep high-resolution data for a short window, medium-resolution data for a longer window, and low-resolution data forever.

interface GFSRetentionPolicy {
  son: {
    resolution: '15min';
    retention: '48h';
    purpose: 'Real-time debugging, anomaly detection';
  };
  father: {
    resolution: '1h';
    retention: '30d';
    purpose: 'Daily trends, weekly comparisons';
  };
  grandfather: {
    resolution: '1d';
    retention: 'forever';
    purpose: 'Monthly reports, capacity planning, billing reconciliation';
  };
}

In practice, this means:

Time axis: ──────────────────────────────────────────────►
            │← 48 hours →│← 30 days ──→│← forever ──────
Resolution: 15-minute      hourly        daily
Table:      metrics_15min  metrics_hour  metrics_daily
Row count:  ~192           ~720          grows by 1/day

Key insight: The GFS model solves a fundamental problem with time-series on relational databases: unbounded growth. Without it, a table that records a metric every 15 minutes grows by 35,040 rows per year per metric. With GFS, the son and father tables have fixed maximum sizes, and only the grandfather table grows — at 365 rows per year per metric.
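Every write into the son and father tables, and every pruning pass, reduces to two calculations: which bucket a timestamp belongs to, and how old a bucket may get. The sketch below assumes UTC buckets aligned to the Unix epoch, plus the resolutions and retention windows from the policy above. bucketStart and pruneCutoff are hypothetical helpers.

```typescript
const MINUTE = 60_000;
const HOUR = 60 * MINUTE;
const DAY = 24 * HOUR;

// Resolution and retention per tier, from the GFS policy above (milliseconds).
const TIERS = {
  son: { resolution: 15 * MINUTE, retention: 48 * HOUR },
  father: { resolution: HOUR, retention: 30 * DAY },
  grandfather: { resolution: DAY, retention: Infinity },
} as const;

type Tier = keyof typeof TIERS;

// Start of the bucket a timestamp falls into; buckets are aligned to the Unix
// epoch, so daily buckets start at 00:00 UTC.
function bucketStart(tsMs: number, tier: Tier): number {
  const size = TIERS[tier].resolution;
  return Math.floor(tsMs / size) * size;
}

// Buckets that start before the returned time can be pruned; null = keep forever.
function pruneCutoff(nowMs: number, tier: Tier): number | null {
  const keep = TIERS[tier].retention;
  return Number.isFinite(keep) ? nowMs - keep : null;
}

const now = Date.parse('2026-04-28T06:07:00Z');
console.log(new Date(bucketStart(now, 'son')).toISOString());  // 2026-04-28T06:00:00.000Z
console.log(new Date(pruneCutoff(now, 'son')!).toISOString()); // 2026-04-26T06:07:00.000Z
console.log(pruneCutoff(now, 'grandfather'));                  // null
```

A pruning job would then delete rows whose bucket start is older than pruneCutoff(now, tier). Forty-eight hours of 15-minute buckets gives the ~192 son rows shown in the diagram.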

The Hybrid Architecture

Analytics Engine provides 92-day retention with zero setup. D1 with GFS provides permanent storage with controlled growth. Together they cover every use case:

┌──────────────────────────────────────────────────────────┐
│                    Worker Runtime                         │
│                                                          │
│  ┌─────────┐  writeDataPoint()   ┌───────────────────┐  │
│  │  Your   │ ───────────────────►│ Analytics Engine   │  │
│  │  Code   │                     │ (92-day retention) │  │
│  │         │  db.prepare().run() │                    │  │
│  │         │ ──────┐             └───────────────────┘  │
│  └─────────┘       │                                    │
│                    ▼                                    │
│            ┌──────────────┐                             │
│            │     D1       │                             │
│            │              │                             │
│            │ metrics_15m  │ ← son (48h, auto-pruned)    │
│            │ metrics_hour │ ← father (30d, auto-pruned) │
│            │ metrics_daily│ ← grandfather (forever)     │
│            └──────────────┘                             │
└──────────────────────────────────────────────────────────┘

         ┌──────────────┐
         │  SQL API      │ ← Query Analytics Engine
         │  (HTTP POST)  │    from dashboards, alerts,
         │               │    or other Workers
         └──────────────┘

When to use which:

| Need | Use | Why |
|------|-----|-----|
| Last 5 minutes of query costs | Analytics Engine | Real-time, zero-latency writes |
| Today’s cost by Worker | Analytics Engine | SQL API with WHERE timestamp > NOW() - INTERVAL '1' DAY |
| Cost trend over 90 days | Analytics Engine | Within retention window |
| Cost trend over 1 year | D1 grandfather table | Beyond 92-day Analytics Engine retention |
| Budget alert (per-invocation) | Analytics Engine write + D1 daily rollup | Check daily total against threshold |
| Monthly billing reconciliation | D1 grandfather table | Permanent record |
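The budget-alert row is the one that would have ended the incident on day 1. The sketch below assumes the rollup produces one row per Worker per day. budgetAlerts, DailyCost, and the 100 million-read budget in the usage example are hypothetical. The check compares row reads rather than dollars, because for the first days of the incident the dollar figure was still $0.00 inside the included allowance.

```typescript
// One row per Worker per day, as produced by a daily rollup (assumed shape).
interface DailyCost {
  worker: string;
  rows_read: number;
  cost_usd: number;
}

interface BudgetAlert {
  worker: string;
  reason: string;
}

// Flags every Worker whose daily row reads exceed the budget, regardless of cost_usd.
function budgetAlerts(rows: DailyCost[], maxRowsReadPerDay: number): BudgetAlert[] {
  return rows
    .filter((r) => r.rows_read > maxRowsReadPerDay)
    .map((r) => ({
      worker: r.worker,
      reason: `${r.rows_read} rows read, budget is ${maxRowsReadPerDay}`,
    }));
}

const flagged = budgetAlerts(
  [
    { worker: 'aso-mrr', rows_read: 4_924_800_000, cost_usd: 0 },
    { worker: 'pages-plus', rows_read: 40_000, cost_usd: 0 },
  ],
  100_000_000,
);
console.log(flagged.map((a) => a.worker)); // [ 'aso-mrr' ]
```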

Workers Analytics Engine Deep Dive

Configuration

Add the Analytics Engine binding to your wrangler.jsonc:

{
  "analytics_engine_datasets": [
    {
      "binding": "METRICS",
      "dataset": "platform_metrics"
    }
  ]
}

Add the type to your environment bindings:

interface Env {
  METRICS: AnalyticsEngineDataset;
  DB: D1Database;
  // ... other bindings
}

No database to provision. No tables to create. No schema to define. The dataset is created on first write.

Writing Data Points

Every data point has three components:

env.METRICS.writeDataPoint({
  // blobs: up to 20 strings for dimensions
  // These are the columns you GROUP BY and filter with WHERE
  blobs: [
    'aso-mrr',           // blob1: worker name
    'query',             // blob2: operation type
    'app_registry',      // blob3: table name
    'SELECT COUNT(*)',   // blob4: query pattern
  ],

  // doubles: up to 20 numbers for values
  // These are the columns you SUM, AVG, MAX, MIN
  doubles: [
    180000,   // double1: rows_read
    0,        // double2: rows_written
    12.5,     // double3: duration_ms
    0.00018,  // double4: estimated_cost_usd
  ],

  // indexes: exactly 1 string for sampling
  // Analytics Engine uses this for consistent downsampling at high volumes
  indexes: ['aso-mrr'],
});

The call returns immediately. writeDataPoint() returns void, so there is nothing to await. The runtime persists the data point in the background.

Querying via SQL API

Query your data with SQL over HTTP:

curl -X POST \
  "https://api.cloudflare.com/client/v4/accounts/${ACCOUNT_ID}/analytics_engine/sql" \
  -H "Authorization: Bearer ${API_TOKEN}" \
  -H "Content-Type: text/plain" \
  --data "
    SELECT
      blob1 AS worker,
      blob2 AS operation,
      blob3 AS table_name,
      SUM(double1) AS total_rows_read,
      SUM(double4) AS total_cost_usd,
      COUNT() AS query_count
    FROM platform_metrics
    WHERE timestamp > NOW() - INTERVAL '24' HOUR
    GROUP BY worker, operation, table_name
    ORDER BY total_rows_read DESC
    FORMAT JSON
  "

Response:

{
  "data": [
    {
      "worker": "aso-mrr",
      "operation": "query",
      "table_name": "app_registry",
      "total_rows_read": 2073600000,
      "total_cost_usd": 2.0736,
      "query_count": 11520
    }
  ],
  "meta": [
    { "name": "worker", "type": "String" },
    { "name": "operation", "type": "String" },
    { "name": "table_name", "type": "String" },
    { "name": "total_rows_read", "type": "UInt64" },
    { "name": "total_cost_usd", "type": "Float64" },
    { "name": "query_count", "type": "UInt64" }
  ],
  "rows": 1
}

Querying from a Worker

You can query Analytics Engine from within a Worker using fetch:

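// Shape of a FORMAT JSON response from the Analytics Engine SQL API
interface AnalyticsQueryResult {
  data: Record<string, unknown>[];
  meta: { name: string; type: string }[];
  rows: number;
}

async function queryMetrics(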
  env: Env,
  sql: string
): Promise<AnalyticsQueryResult> {
  const response = await fetch(
    `https://api.cloudflare.com/client/v4/accounts/${env.CF_ACCOUNT_ID}/analytics_engine/sql`,
    {
      method: 'POST',
      headers: {
        Authorization: `Bearer ${env.CF_ANALYTICS_TOKEN}`,
        'Content-Type': 'text/plain',
      },
      body: sql,
    }
  );

  if (!response.ok) {
    const text = await response.text();
    console.error('[analytics] query failed', {
      status: response.status,
      body: text.slice(0, 500),
    });
    throw new Error(`Analytics Engine query failed: ${response.status}`);
  }

  return (await response.json()) as AnalyticsQueryResult;
}

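// Usage: D1 read cost by worker over the last 24 hours (list price, before included allowances)
const todayCost = await queryMetrics(env, `
  SELECT
    blob1 AS worker,
    SUM(double1 * _sample_interval) / 1000000 * 0.001 AS cost_usd,
    SUM(double1 * _sample_interval) AS rows_read,
    SUM(_sample_interval) AS queries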
  FROM platform_metrics
  WHERE
    blob2 = 'query'
    AND timestamp > NOW() - INTERVAL '24' HOUR
  GROUP BY worker
  ORDER BY cost_usd DESC
  FORMAT JSON
`);

The _sample_interval Column

At high write volumes, Analytics Engine downsamples data. When querying, you must account for this with the _sample_interval column:

-- WRONG: undercounts when data is sampled
SELECT SUM(double1) AS total_rows_read, COUNT() AS query_count
FROM platform_metrics

-- CORRECT: accounts for sampling
SELECT SUM(double1 * _sample_interval) AS total_rows_read,
       SUM(_sample_interval) AS query_count
FROM platform_metrics

At low write volumes, _sample_interval is typically 1, so both queries return the same totals. Build the multiplication into your queries from the start anyway. That way they stay correct once volume grows enough for sampling to kick in.
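Rather than remembering the multiplication in every hand-written query, you can generate the sampling-aware aggregates. The sketch below assumes the platform_metrics blob/double layout used in this article. sampledSum and dailyReadsQuery are hypothetical helpers. SUM(_sample_interval) stands in for COUNT() because each stored row represents _sample_interval original events.

```typescript
// SUM(column) scaled by _sample_interval, so totals survive downsampling.
function sampledSum(column: string, alias: string): string {
  return `SUM(${column} * _sample_interval) AS ${alias}`;
}

// Per-Worker D1 read totals for the last `hours` hours, sampling-aware throughout.
// The dataset name is interpolated into SQL, so only simple identifiers are accepted.
function dailyReadsQuery(dataset: string, hours: number): string {
  if (!/^\w+$/.test(dataset)) throw new Error(`bad dataset name: ${dataset}`);
  if (!Number.isInteger(hours) || hours <= 0) throw new Error('hours must be a positive integer');
  return [
    'SELECT',
    '  blob1 AS worker,',
    `  ${sampledSum('double1', 'rows_read')},`,
    '  SUM(_sample_interval) AS queries',
    `FROM ${dataset}`,
    `WHERE blob2 = 'query' AND timestamp > NOW() - INTERVAL '${hours}' HOUR`,
    'GROUP BY worker',
    'ORDER BY rows_read DESC',
    'FORMAT JSON',
  ].join('\n');
}

console.log(dailyReadsQuery('platform_metrics', 24));
```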

Data Model Design

Design your blob assignments for the queries you will actually run. Here is a schema that covers D1, R2, KV, and Workers costs:

// Standard data point schema for platform cost tracking
interface CostDataPoint {
  blobs: [
    string,  // blob1: worker_name     (e.g., 'aso-mrr', 'pages-plus')
    string,  // blob2: resource_type   (e.g., 'query', 'r2_put', 'kv_get')
    string,  // blob3: resource_name   (e.g., table name, bucket name, KV namespace)
    string,  // blob4: operation       (e.g., 'SELECT', 'INSERT', 'put', 'get')
    string,  // blob5: cron_schedule   (e.g., '*/5 * * * *', 'http', 'queue')
  ];
  doubles: [
    number,  // double1: rows_read (D1) or ops_count (R2/KV)
    number,  // double2: rows_written (D1) or bytes (R2/KV)
    number,  // double3: duration_ms
    number,  // double4: estimated_cost_usd
  ];
  indexes: [string];  // worker_name (for consistent sampling)
}

Key insight: Choose your blob layout based on your GROUP BY patterns. If you will always filter by worker_name and resource_type, put those in blob1 and blob2. Analytics Engine queries are fastest when filtering on the first few blobs.
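Because blobs and doubles are positional, two Workers that disagree about slot order silently corrupt each other's GROUP BY results. One way to avoid that is a single mapping function shared by every Worker. The sketch below assumes the five-blob, four-double layout above. CostEvent and toDataPoint are hypothetical names.

```typescript
// A named view of one cost event; fields follow the blob/double comments above.
interface CostEvent {
  worker: string;        // blob1
  resourceType: string;  // blob2
  resourceName: string;  // blob3
  operation: string;     // blob4
  trigger: string;       // blob5 (cron schedule, 'http', or 'queue')
  rowsRead: number;      // double1
  rowsWritten: number;   // double2
  durationMs: number;    // double3
  costUsd: number;       // double4
}

interface AePoint {
  blobs: string[];
  doubles: number[];
  indexes: string[];
}

// The only place that decides which field lands in which slot.
function toDataPoint(e: CostEvent): AePoint {
  return {
    blobs: [e.worker, e.resourceType, e.resourceName, e.operation, e.trigger],
    doubles: [e.rowsRead, e.rowsWritten, e.durationMs, e.costUsd],
    indexes: [e.worker],
  };
}

const point = toDataPoint({
  worker: 'aso-mrr', resourceType: 'query', resourceName: 'app_registry',
  operation: 'SELECT', trigger: '*/5 * * * *',
  rowsRead: 180000, rowsWritten: 0, durationMs: 12.5, costUsd: 0.00018,
});
console.log(point.blobs[2], point.doubles[0]); // app_registry 180000
```

Each Worker then calls env.METRICS.writeDataPoint(toDataPoint(event)) and never builds blob arrays by hand.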


The Hybrid Architecture

Why Two Stores?

Analytics Engine has a hard 92-day retention limit. After 92 days, your data is gone. For cost observability, you need:

The hybrid architecture writes to both stores simultaneously and uses each for what it is best at.

Data Flow

// Every D1 query writes to both stores
async function trackedQuery<T>(
  db: D1Database,
  metrics: AnalyticsEngineDataset,
  workerName: string,
  tableName: string,
  sql: string,
  params: unknown[] = []
): Promise<D1Result<T>> {
  const stmt = params.length > 0
    ? db.prepare(sql).bind(...params)
    : db.prepare(sql);

  const result = await stmt.run<T>();

  // 1. Write to Analytics Engine (real-time, 92-day)
  const costUsd = (result.meta.rows_read / 1_000_000) * 0.001
    + (result.meta.rows_written / 1_000_000) * 1.0;

  metrics.writeDataPoint({
    blobs: [workerName, 'query', tableName, sql.trim().split(/\s+/)[0]],
    doubles: [
      result.meta.rows_read,
      result.meta.rows_written,
      result.meta.duration,
      costUsd,
    ],
    indexes: [workerName],
  });

  // 2. Accumulate for D1 rollup (permanent)
  // This happens in a daily cron, not per-query
  // See Pattern 2: GFS Retention

  return result;
}

The Daily Rollup

A daily cron queries Analytics Engine and writes the aggregate to D1’s grandfather table:

async function dailyRollup(env: Env): Promise<void> {
  // Roll up the previous UTC calendar day (not a rolling 24-hour window) so
  // each metrics_daily row covers exactly the date it is labeled with.
  const todayStart = new Date();
  todayStart.setUTCHours(0, 0, 0, 0);
  const yesterdayStart = new Date(todayStart.getTime() - 24 * 60 * 60 * 1000);
  const toSqlTime = (d: Date) => d.toISOString().slice(0, 19).replace('T', ' ');
  const date = yesterdayStart.toISOString().split('T')[0];

  // Query Analytics Engine for yesterday's totals (sampling-aware)
  const yesterday = await queryMetrics(env, `
    SELECT
      blob1 AS worker,
      blob2 AS resource_type,
      blob3 AS resource_name,
      SUM(double1 * _sample_interval) AS total_rows_read,
      SUM(double2 * _sample_interval) AS total_rows_written,
      SUM(double3 * _sample_interval) AS total_duration_ms,
      SUM(double4 * _sample_interval) AS total_cost_usd,
      SUM(_sample_interval) AS total_operations
    FROM platform_metrics
    WHERE
      timestamp >= toDateTime('${toSqlTime(yesterdayStart)}')
      AND timestamp < toDateTime('${toSqlTime(todayStart)}')
    GROUP BY worker, resource_type, resource_name
    FORMAT JSON
  `);

  // Write to D1 grandfather table, labeled with yesterday's date
  const batch = yesterday.data.map((row: Record<string, unknown>) =>
    env.DB.prepare(`
      INSERT INTO metrics_daily (
        date, worker, resource_type, resource_name,
        rows_read, rows_written, duration_ms, cost_usd, operations
      ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
      ON CONFLICT (date, worker, resource_type, resource_name)
      DO UPDATE SET
        rows_read = excluded.rows_read,
        rows_written = excluded.rows_written,
        duration_ms = excluded.duration_ms,
        cost_usd = excluded.cost_usd,
        operations = excluded.operations
    `).bind(
      date,
      row.worker,
      row.resource_type,
      row.resource_name,
      row.total_rows_read,
      row.total_rows_written,
      row.total_duration_ms,
      row.total_cost_usd,
      row.total_operations
    )
  );

  // Nothing was recorded yesterday: skip rather than send an empty batch
  if (batch.length === 0) return;
  await env.DB.batch(batch);
}

This produces one row per worker per resource per day. For a system with 5 workers and 10 resources each, that is 50 rows per day, or 18,250 rows per year. Trivial to query, trivial to store.
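The ON CONFLICT clause in dailyRollup() only works if metrics_daily has a unique constraint on exactly those four columns. SQLite rejects an upsert whose conflict target matches no PRIMARY KEY or UNIQUE constraint. Below is a sketch of a matching schema. The column types are assumptions, not the article's actual migration.

```typescript
// Hypothetical schema for the grandfather table that dailyRollup() upserts into.
// The composite PRIMARY KEY is the unique constraint that the
// ON CONFLICT (date, worker, resource_type, resource_name) target needs.
const METRICS_DAILY_DDL = `
CREATE TABLE IF NOT EXISTS metrics_daily (
  date          TEXT    NOT NULL,  -- 'YYYY-MM-DD', UTC
  worker        TEXT    NOT NULL,
  resource_type TEXT    NOT NULL,
  resource_name TEXT    NOT NULL,
  rows_read     INTEGER NOT NULL DEFAULT 0,
  rows_written  INTEGER NOT NULL DEFAULT 0,
  duration_ms   REAL    NOT NULL DEFAULT 0,
  cost_usd      REAL    NOT NULL DEFAULT 0,
  operations    INTEGER NOT NULL DEFAULT 0,
  PRIMARY KEY (date, worker, resource_type, resource_name)
)`;

console.log(METRICS_DAILY_DDL.includes('PRIMARY KEY')); // true
```

Apply it once, for example as a D1 migration or with env.DB.prepare(METRICS_DAILY_DDL).run().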


Pattern 1: Self-Instrumented D1 Wrapper

The Problem

D1’s prepare().run() returns rows_read in the metadata, but nobody captures it. The information exists — it is just discarded.

The Solution

Wrap D1 in a thin wrapper class that captures the metadata of every query and writes it to Analytics Engine.

import type {
  AnalyticsEngineDataset,
  D1Database,
  D1PreparedStatement,
  D1Result,
} from '@cloudflare/workers-types';

interface TrackedD1Options {
  db: D1Database;
  metrics: AnalyticsEngineDataset;
  workerName: string;
  /** Maximum queries to track per invocation (Analytics Engine limit: 250) */
  maxTrackedQueries?: number;
}

interface QueryStats {
  totalRowsRead: number;
  totalRowsWritten: number;
  totalDurationMs: number;
  queryCount: number;
  estimatedCostUsd: number;
}

class TrackedD1 {
  private db: D1Database;
  private metrics: AnalyticsEngineDataset;
  private workerName: string;
  private maxTracked: number;
  private tracked: number = 0;
  private stats: QueryStats = {
    totalRowsRead: 0,
    totalRowsWritten: 0,
    totalDurationMs: 0,
    queryCount: 0,
    estimatedCostUsd: 0,
  };

  constructor(options: TrackedD1Options) {
    this.db = options.db;
    this.metrics = options.metrics;
    this.workerName = options.workerName;
    this.maxTracked = options.maxTrackedQueries ?? 200;
  }

  prepare(sql: string): TrackedD1PreparedStatement {
    return new TrackedD1PreparedStatement(
      this.db.prepare(sql),
      sql,
      this,
    );
  }

  /** Run a batch of statements with tracking */
  async batch<T = unknown>(
    statements: TrackedD1PreparedStatement[]
  ): Promise<D1Result<T>[]> {
    const rawStatements = statements.map((s) => s.getRawStatement());
    const results = await this.db.batch<T>(rawStatements);

    // Track aggregate metadata for the batch
    let batchRowsRead = 0;
    let batchRowsWritten = 0;
    let batchDuration = 0;

    for (const result of results) {
      batchRowsRead += result.meta.rows_read;
      batchRowsWritten += result.meta.rows_written;
      batchDuration += result.meta.duration;
    }

    this.recordMetric('batch', batchRowsRead, batchRowsWritten, batchDuration);
    return results;
  }

  /** Record a metric for a single query or batch */
  recordMetric(
    tableName: string,
    rowsRead: number,
    rowsWritten: number,
    durationMs: number
  ): void {
    // Update running totals
    this.stats.totalRowsRead += rowsRead;
    this.stats.totalRowsWritten += rowsWritten;
    this.stats.totalDurationMs += durationMs;
    this.stats.queryCount += 1;

    const costUsd =
      (rowsRead / 1_000_000) * 0.001 +
      (rowsWritten / 1_000_000) * 1.0;
    this.stats.estimatedCostUsd += costUsd;

    // Write to Analytics Engine (respecting per-invocation limit)
    if (this.tracked < this.maxTracked) {
      this.metrics.writeDataPoint({
        blobs: [this.workerName, 'query', tableName],
        doubles: [rowsRead, rowsWritten, durationMs, costUsd],
        indexes: [this.workerName],
      });
      this.tracked++;
    }
  }

  /** Get accumulated stats for this invocation */
  getStats(): QueryStats {
    return { ...this.stats };
  }

  /** Write a summary data point at end of invocation */
  writeSummary(context: string): void {
    this.metrics.writeDataPoint({
      blobs: [this.workerName, 'invocation_summary', context],
      doubles: [
        this.stats.totalRowsRead,
        this.stats.totalRowsWritten,
        this.stats.totalDurationMs,
        this.stats.estimatedCostUsd,
      ],
      indexes: [this.workerName],
    });
  }
}

class TrackedD1PreparedStatement {
  private stmt: D1PreparedStatement;
  private sql: string;
  private tracker: TrackedD1;
  private boundStmt: D1PreparedStatement;

  constructor(
    stmt: D1PreparedStatement,
    sql: string,
    tracker: TrackedD1
  ) {
    this.stmt = stmt;
    this.sql = sql;
    this.tracker = tracker;
    this.boundStmt = stmt;
  }

  bind(...values: unknown[]): TrackedD1PreparedStatement {
    this.boundStmt = this.stmt.bind(...values);
    return this;
  }

  async run<T = unknown>(): Promise<D1Result<T>> {
    const result = await this.boundStmt.run<T>();
    this.tracker.recordMetric(
      this.extractTableName(),
      result.meta.rows_read,
      result.meta.rows_written,
      result.meta.duration
    );
    return result;
  }

  async first<T = unknown>(column?: string): Promise<T | null> {
    // first() doesn't return meta, so we use run() internally
    const result = await this.boundStmt.run<T>();
    this.tracker.recordMetric(
      this.extractTableName(),
      result.meta.rows_read,
      result.meta.rows_written,
      result.meta.duration
    );

    if (!result.results.length) return null;
    if (column) {
      return (result.results[0] as Record<string, unknown>)[column] as T;
    }
    return result.results[0] as T;
  }

  async all<T = unknown>(): Promise<D1Result<T>> {
    const result = await this.boundStmt.all<T>();
    this.tracker.recordMetric(
      this.extractTableName(),
      result.meta.rows_read,
      result.meta.rows_written,
      result.meta.duration
    );
    return result;
  }

  /** Get the underlying D1 statement for batch operations */
  getRawStatement(): D1PreparedStatement {
    return this.boundStmt;
  }

  private extractTableName(): string {
    // Extract table name from SQL for grouping
    const match = this.sql.match(
      /(?:FROM|INTO|UPDATE|TABLE)\s+(\w+)/i
    );
    return match?.[1] ?? 'unknown';
  }
}

// Factory function
function createTrackedD1(options: TrackedD1Options): TrackedD1 {
  return new TrackedD1(options);
}

export { TrackedD1, createTrackedD1, type QueryStats };
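The wrapper groups each query under the first identifier that follows FROM, INTO, UPDATE, or TABLE. A standalone copy of that regex makes the heuristic easy to test. Two things it shows: a join is attributed only to the first table it names, and because the pattern has no word boundary, a column name ending in `_from` can be mistaken for the keyword. The `bounded` variant with a leading `\b` is a suggested tweak, not part of the original wrapper.

```typescript
// Standalone copies of the table-name heuristic, for testing in isolation.
// `original` mirrors extractTableName() in the wrapper above; `bounded`
// adds a \b word boundary so keywords inside column names do not match.
const original = /(?:FROM|INTO|UPDATE|TABLE)\s+(\w+)/i;
const bounded = /\b(?:FROM|INTO|UPDATE|TABLE)\s+(\w+)/i;

function tableName(sql: string, pattern: RegExp = original): string {
  return sql.match(pattern)?.[1] ?? 'unknown';
}

// Joins are attributed to the first table named.
console.log(tableName('SELECT a.id FROM app_registry a JOIN reviews r ON r.app_id = a.id')); // app_registry

// A column ending in "_from" fools the unbounded pattern.
console.log(tableName('SELECT valid_from FROM offers'));          // FROM
console.log(tableName('SELECT valid_from FROM offers', bounded)); // offers
```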

Usage

export default {
  async fetch(request: Request, env: Env, ctx: ExecutionContext) {
    const db = createTrackedD1({
      db: env.DB,
      metrics: env.METRICS,
      workerName: 'aso-mrr',
    });

    // Use exactly like D1 — the API is identical
    const apps = await db
      .prepare('SELECT * FROM app_registry WHERE status = ? LIMIT 10')
      .bind('active')
      .all<App>();

    const count = await db
      .prepare('SELECT COUNT(*) as n FROM crawl_keywords')
      .first<{ n: number }>();

    // At end of request, write summary. writeDataPoint() is non-blocking
    // and returns nothing, so it needs neither await nor ctx.waitUntil().
    db.writeSummary('fetch');
    const stats = db.getStats();
    if (stats.estimatedCostUsd > 0.01) {
      console.warn('[cost] expensive request', {
        url: request.url,
        ...stats,
      });
    }

    return Response.json({ apps: apps.results, count: count?.n });
  },
};
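The per-query estimate in recordMetric() prices every row at D1's overage rates ($0.001 per million rows read, $1.00 per million rows written) and ignores the monthly allowance included with Workers Paid (25 billion reads, 50 million writes), so it is an upper bound on what you are billed. A minimal sketch of both calculations follows; the function names are illustrative. It also makes the $0.01 threshold in the usage example concrete: ten million rows read, or ten thousand rows written, in one request.

```typescript
// D1 prices on Workers Paid (verify against the current pricing page).
const READ_USD_PER_MILLION = 0.001;
const WRITE_USD_PER_MILLION = 1.0;
const INCLUDED_READS_PER_MONTH = 25_000_000_000;
const INCLUDED_WRITES_PER_MONTH = 50_000_000;

// Same formula as recordMetric(): every row priced at the overage rate.
function estimateCostUsd(rowsRead: number, rowsWritten: number): number {
  return (
    (rowsRead / 1_000_000) * READ_USD_PER_MILLION +
    (rowsWritten / 1_000_000) * WRITE_USD_PER_MILLION
  );
}

// What a month's totals would actually bill after the included allowance.
function monthlyBillUsd(readsThisMonth: number, writesThisMonth: number): number {
  const billableReads = Math.max(0, readsThisMonth - INCLUDED_READS_PER_MONTH);
  const billableWrites = Math.max(0, writesThisMonth - INCLUDED_WRITES_PER_MONTH);
  return estimateCostUsd(billableReads, billableWrites);
}

console.log(estimateCostUsd(10_000_000, 0).toFixed(4));    // 0.0100
console.log(estimateCostUsd(0, 10_000).toFixed(4));        // 0.0100
console.log(monthlyBillUsd(30_000_000_000, 0).toFixed(2)); // 5.00
```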

What This Gives You

After deploying the wrapper, you can answer questions like:

-- Which table is the most expensive?
SELECT
  blob3 AS table_name,
  SUM(double1 * _sample_interval) AS total_rows_read,
  SUM(double4 * _sample_interval) AS total_cost_usd
FROM platform_metrics
WHERE blob2 = 'query'
  AND timestamp > NOW() - INTERVAL '7' DAY
GROUP BY table_name
ORDER BY total_cost_usd DESC

-- Which worker is burning the most reads, hour by hour?
SELECT
  blob1 AS worker,
  toStartOfHour(timestamp) AS hour,
  SUM(double1 * _sample_interval) AS rows_read,
  SUM(_sample_interval) AS queries
FROM platform_metrics
WHERE blob2 = 'query'
  AND timestamp > NOW() - INTERVAL '24' HOUR
GROUP BY worker, hour
ORDER BY hour DESC

-- Cost per invocation over time (detect anomalies)
SELECT
  blob1 AS worker,
  blob3 AS context,
  toStartOfInterval(timestamp, INTERVAL '15' MINUTE) AS bucket,
  SUM(double4 * _sample_interval) / SUM(_sample_interval) AS avg_cost_per_invocation,
  MAX(double4) AS max_cost_per_invocation,
  SUM(_sample_interval) AS invocations
FROM platform_metrics
WHERE blob2 = 'invocation_summary'
  AND timestamp > NOW() - INTERVAL '6' HOUR
GROUP BY worker, context, bucket
ORDER BY bucket DESC
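Analytics Engine may sample data points when write volume is high. Each stored row carries a `_sample_interval` saying how many original points it stands for, which is why these queries multiply every double by `_sample_interval` before summing. The same rule applies to counting: a plain `COUNT()` counts stored rows, while `SUM(_sample_interval)` estimates original events. A small in-memory illustration of that arithmetic, using hypothetical rows:

```typescript
// In-memory illustration of Analytics Engine sampling arithmetic.
// Each stored row stands for `sampleInterval` original data points.
interface SampledRow {
  rowsRead: number;       // double1
  sampleInterval: number; // _sample_interval
}

function estimateTotals(rows: SampledRow[]) {
  let rowsRead = 0;
  let events = 0;
  for (const r of rows) {
    rowsRead += r.rowsRead * r.sampleInterval; // SUM(double1 * _sample_interval)
    events += r.sampleInterval;                // SUM(_sample_interval)
  }
  return { rowsRead, events, storedRows: rows.length }; // storedRows is what COUNT() sees
}

// Hypothetical data: one unsampled row, one row standing in for 10 points.
const totals = estimateTotals([
  { rowsRead: 180_000, sampleInterval: 1 },
  { rowsRead: 3, sampleInterval: 10 },
]);
console.log(totals); // { rowsRead: 180030, events: 11, storedRows: 2 }
```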

Pattern 2: GFS Retention on D1

Schema

-- Son: 15-minute granularity, 48-hour retention
CREATE TABLE IF NOT EXISTS metrics_15min (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  bucket TEXT NOT NULL,           -- ISO 8601 timestamp rounded to 15 min
  worker TEXT NOT NULL,
  resource_type TEXT NOT NULL,    -- 'query', 'r2_put', 'kv_get', etc.
  resource_name TEXT NOT NULL,    -- table name, bucket name, etc.
  rows_read INTEGER DEFAULT 0,
  rows_written INTEGER DEFAULT 0,
  duration_ms REAL DEFAULT 0,
  cost_usd REAL DEFAULT 0,
  operations INTEGER DEFAULT 0,
  created_at TEXT DEFAULT (datetime('now')),
  UNIQUE(bucket, worker, resource_type, resource_name)
);

-- Father: hourly granularity, 30-day retention
CREATE TABLE IF NOT EXISTS metrics_hourly (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  bucket TEXT NOT NULL,           -- ISO 8601 timestamp rounded to hour
  worker TEXT NOT NULL,
  resource_type TEXT NOT NULL,
  resource_name TEXT NOT NULL,
  rows_read INTEGER DEFAULT 0,
  rows_written INTEGER DEFAULT 0,
  duration_ms REAL DEFAULT 0,
  cost_usd REAL DEFAULT 0,
  operations INTEGER DEFAULT 0,
  created_at TEXT DEFAULT (datetime('now')),
  UNIQUE(bucket, worker, resource_type, resource_name)
);

-- Grandfather: daily granularity, forever
CREATE TABLE IF NOT EXISTS metrics_daily (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  date TEXT NOT NULL,             -- YYYY-MM-DD
  worker TEXT NOT NULL,
  resource_type TEXT NOT NULL,
  resource_name TEXT NOT NULL,
  rows_read INTEGER DEFAULT 0,
  rows_written INTEGER DEFAULT 0,
  duration_ms REAL DEFAULT 0,
  cost_usd REAL DEFAULT 0,
  operations INTEGER DEFAULT 0,
  created_at TEXT DEFAULT (datetime('now')),
  UNIQUE(date, worker, resource_type, resource_name)
);

-- Indexes for efficient queries and pruning
CREATE INDEX IF NOT EXISTS idx_15min_bucket ON metrics_15min(bucket);
CREATE INDEX IF NOT EXISTS idx_hourly_bucket ON metrics_hourly(bucket);
CREATE INDEX IF NOT EXISTS idx_daily_date ON metrics_daily(date);
CREATE INDEX IF NOT EXISTS idx_daily_worker ON metrics_daily(worker);
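The schema expects `bucket` values floored to 15 minutes, but the write path that fills `metrics_15min` is not shown. A minimal sketch of a bucket helper it could use, assuming the `YYYY-MM-DDTHH:MM:00Z` format: SQLite's strftime() in the rollup accepts the trailing `Z`, and fixed-width ISO strings compare correctly as TEXT against other values in the same format. The function name is hypothetical.

```typescript
// Floor a timestamp to its 15-minute bucket, formatted as
// "YYYY-MM-DDTHH:MM:00Z" (UTC).
function fifteenMinuteBucket(date: Date): string {
  const FIFTEEN_MIN_MS = 15 * 60 * 1000;
  const floored = new Date(Math.floor(date.getTime() / FIFTEEN_MIN_MS) * FIFTEEN_MIN_MS);
  // toISOString() gives "2026-04-28T13:45:00.000Z"; drop the milliseconds.
  return floored.toISOString().replace(/\.\d{3}Z$/, 'Z');
}

console.log(fifteenMinuteBucket(new Date('2026-04-28T13:47:12.345Z'))); // 2026-04-28T13:45:00Z
```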

Rollup and Pruning

The rollup process is three jobs on two schedules (pruning runs alongside the hourly rollup):

// 15min → hourly rollup: runs every hour
async function rollupToHourly(db: D1Database): Promise<void> {
  // Truncate to the top of the hour:
  // "2026-04-28T13:45:12.345Z" -> "2026-04-28T13:00:00Z"
  const toHour = (ms: number) =>
    new Date(ms).toISOString().replace(/:\d{2}:\d{2}\.\d{3}Z$/, ':00:00Z');

  const hourAgo = toHour(Date.now() - 60 * 60 * 1000);
  // Only recompute the last 24 hours. The son table keeps 48 hours, so these
  // hours are still complete; recomputing hours near the prune boundary would
  // overwrite them with partial sums.
  const dayAgo = toHour(Date.now() - 24 * 60 * 60 * 1000);

  await db.prepare(`
    INSERT INTO metrics_hourly (bucket, worker, resource_type, resource_name,
      rows_read, rows_written, duration_ms, cost_usd, operations)
    SELECT
      strftime('%Y-%m-%dT%H:00:00Z', bucket) AS hour_bucket,
      worker, resource_type, resource_name,
      SUM(rows_read), SUM(rows_written), SUM(duration_ms),
      SUM(cost_usd), SUM(operations)
    FROM metrics_15min
    WHERE bucket >= ? AND bucket < ?
    GROUP BY hour_bucket, worker, resource_type, resource_name
    ON CONFLICT (bucket, worker, resource_type, resource_name)
    DO UPDATE SET
      rows_read = excluded.rows_read,
      rows_written = excluded.rows_written,
      duration_ms = excluded.duration_ms,
      cost_usd = excluded.cost_usd,
      operations = excluded.operations
  `).bind(dayAgo, hourAgo).run();
}

// hourly → daily rollup: runs once per day
async function rollupToDaily(db: D1Database): Promise<void> {
  const DAY_MS = 24 * 60 * 60 * 1000;
  const toDay = (ms: number) => new Date(ms).toISOString().split('T')[0];

  const yesterday = toDay(Date.now() - DAY_MS);
  // Only recompute the last week. The father table keeps 30 days, so these
  // days are still complete; recomputing days near the prune boundary would
  // overwrite them with partial sums.
  const weekAgo = toDay(Date.now() - 7 * DAY_MS);

  await db.prepare(`
    INSERT INTO metrics_daily (date, worker, resource_type, resource_name,
      rows_read, rows_written, duration_ms, cost_usd, operations)
    SELECT
      strftime('%Y-%m-%d', bucket) AS day,
      worker, resource_type, resource_name,
      SUM(rows_read), SUM(rows_written), SUM(duration_ms),
      SUM(cost_usd), SUM(operations)
    FROM metrics_hourly
    WHERE bucket >= ? AND bucket < ?
    GROUP BY day, worker, resource_type, resource_name
    ON CONFLICT (date, worker, resource_type, resource_name)
    DO UPDATE SET
      rows_read = excluded.rows_read,
      rows_written = excluded.rows_written,
      duration_ms = excluded.duration_ms,
      cost_usd = excluded.cost_usd,
      operations = excluded.operations
  `).bind(weekAgo + 'T00:00:00Z', yesterday + 'T00:00:00Z').run();
}

// Pruning: delete old data from son and father tables
async function pruneOldMetrics(db: D1Database): Promise<void> {
  const twoDaysAgo = new Date(Date.now() - 48 * 60 * 60 * 1000)
    .toISOString();
  const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)
    .toISOString();

  await db.batch([
    // Prune son: keep 48 hours
    db.prepare('DELETE FROM metrics_15min WHERE bucket < ?')
      .bind(twoDaysAgo),
    // Prune father: keep 30 days
    db.prepare('DELETE FROM metrics_hourly WHERE bucket < ?')
      .bind(thirtyDaysAgo),
    // Grandfather: never prune
  ]);
}
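The rollup SQL is a GROUP BY on a truncated bucket followed by an overwrite-style upsert (`col = excluded.col`, not `col = col + excluded.col`), so re-running it over the same window gives the same result as long as every source row for that hour is still present. An in-memory sketch of the 15-minute-to-hourly step, trimmed to a few columns and using hypothetical rows, makes the grouping and the idempotency visible:

```typescript
// In-memory sketch of the 15min -> hourly rollup: group by hour, sum the
// counters, and overwrite (not add to) any existing hourly row.
interface MetricRow {
  bucket: string; // "YYYY-MM-DDTHH:MM:00Z"
  worker: string;
  resourceName: string;
  rowsRead: number;
  operations: number;
}

function rollupToHourlyInMemory(fifteenMin: MetricRow[], hourly: Map<string, MetricRow>): void {
  const sums = new Map<string, MetricRow>();
  for (const row of fifteenMin) {
    const hour = row.bucket.slice(0, 13) + ':00:00Z'; // like strftime('%Y-%m-%dT%H:00:00Z')
    const key = `${hour}|${row.worker}|${row.resourceName}`; // the UNIQUE columns
    const acc = sums.get(key) ?? { ...row, bucket: hour, rowsRead: 0, operations: 0 };
    acc.rowsRead += row.rowsRead;
    acc.operations += row.operations;
    sums.set(key, acc);
  }
  // ON CONFLICT ... DO UPDATE SET col = excluded.col: overwrite, so re-runs are idempotent.
  for (const [key, row] of sums) hourly.set(key, row);
}

const hourly = new Map<string, MetricRow>();
const son: MetricRow[] = ['00', '15', '30', '45'].map((m) => ({
  bucket: `2026-04-28T13:${m}:00Z`,
  worker: 'aso-mrr',
  resourceName: 'app_registry',
  rowsRead: 100,
  operations: 2,
}));

rollupToHourlyInMemory(son, hourly);
rollupToHourlyInMemory(son, hourly); // running twice changes nothing
console.log([...hourly.values()]);
```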

Cron Handler

Wire the rollup and pruning into your cron:

async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
  // Every hour: rollup 15min → hourly, prune old 15min data
  if (event.cron === '0 * * * *') {
    await rollupToHourly(env.DB);
    await pruneOldMetrics(env.DB);
  }

  // Once per day at 01:00 UTC: rollup hourly → daily
  if (event.cron === '0 1 * * *') {
    await rollupToDaily(env.DB);
  }
}

Table Size Bounds

| Table | Resolution | Retention | Max rows (5 workers, 10 resources) |
|---|---|---|---|
| metrics_15min | 15 min | 48 hours | 192 buckets x 50 combos = 9,600 |
| metrics_hourly | 1 hour | 30 days | 720 buckets x 50 combos = 36,000 |
| metrics_daily | 1 day | Forever | 365/year x 50 combos = 18,250/year |

The son and father tables are bounded. Only the grandfather grows, and at 18,250 rows per year for a moderately complex system, it will take decades to become a performance concern.
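The bounds in the table are retained buckets multiplied by distinct (worker, resource_type, resource_name) combinations. A quick sketch to recompute them for your own counts; the 5 x 10 combination count is the table's example, not a measurement:

```typescript
// Upper bound on rows in a GFS tier: buckets retained x distinct key combos.
function maxRows(bucketMinutes: number, retentionHours: number, combos: number): number {
  const buckets = (retentionHours * 60) / bucketMinutes;
  return buckets * combos;
}

const combos = 5 * 10; // 5 workers x 10 resources
console.log(maxRows(15, 48, combos));            // 9600  (metrics_15min)
console.log(maxRows(60, 30 * 24, combos));       // 36000 (metrics_hourly)
console.log(maxRows(24 * 60, 365 * 24, combos)); // 18250 (metrics_daily, per year)
```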

Key insight: The GFS names come from backup rotation, and the storage idea is the one RRDtool (released in 1999) is built around: fixed-size storage for time-series data. RRDtool uses circular buffers (Round Robin Archives); we use SQL DELETE with time predicates. The principle is the same: high-resolution data is expensive to keep, so downsample it progressively.


Pattern 3: Cost Dashboard from Analytics Engine

The API Route

Expose a cost dashboard endpoint from your Worker:

interface CostDashboard {
  period: string;
  totals: {
    rows_read: number;
    rows_written: number;
    cost_usd: number;
    queries: number;
  };
  by_worker: Array<{
    worker: string;
    rows_read: number;
    cost_usd: number;
    queries: number;
  }>;
  by_table: Array<{
    table_name: string;
    rows_read: number;
    cost_usd: number;
    queries: number;
  }>;
  hourly_trend: Array<{
    hour: string;
    rows_read: number;
    cost_usd: number;
  }>;
  alerts: Array<{
    type: string;
    message: string;
    value: number;
    threshold: number;
  }>;
}

async function getCostDashboard(
  env: Env,
  period: '1h' | '24h' | '7d' | '30d' = '24h'
): Promise<CostDashboard> {
  const intervalMap = {
    '1h': "INTERVAL '1' HOUR",
    '24h': "INTERVAL '1' DAY",
    '7d': "INTERVAL '7' DAY",
    '30d': "INTERVAL '30' DAY",
  };
  const interval = intervalMap[period];

  const [totals, byWorker, byTable, hourlyTrend] = await Promise.all([
    queryMetrics(env, `
      SELECT
        SUM(double1 * _sample_interval) AS rows_read,
        SUM(double2 * _sample_interval) AS rows_written,
        SUM(double4 * _sample_interval) AS cost_usd,
        SUM(_sample_interval) AS queries
      FROM platform_metrics
      WHERE blob2 = 'query'
        AND timestamp > NOW() - ${interval}
      FORMAT JSON
    `),
    queryMetrics(env, `
      SELECT
        blob1 AS worker,
        SUM(double1 * _sample_interval) AS rows_read,
        SUM(double4 * _sample_interval) AS cost_usd,
        SUM(_sample_interval) AS queries
      FROM platform_metrics
      WHERE blob2 = 'query'
        AND timestamp > NOW() - ${interval}
      GROUP BY worker
      ORDER BY cost_usd DESC
      FORMAT JSON
    `),
    queryMetrics(env, `
      SELECT
        blob3 AS table_name,
        SUM(double1 * _sample_interval) AS rows_read,
        SUM(double4 * _sample_interval) AS cost_usd,
        SUM(_sample_interval) AS queries
      FROM platform_metrics
      WHERE blob2 = 'query'
        AND timestamp > NOW() - ${interval}
      GROUP BY table_name
      ORDER BY cost_usd DESC
      LIMIT 20
      FORMAT JSON
    `),
    queryMetrics(env, `
      SELECT
        toStartOfHour(timestamp) AS hour,
        SUM(double1 * _sample_interval) AS rows_read,
        SUM(double4 * _sample_interval) AS cost_usd
      FROM platform_metrics
      WHERE blob2 = 'query'
        AND timestamp > NOW() - ${interval}
      GROUP BY hour
      ORDER BY hour ASC
      FORMAT JSON
    `),
  ]);

  // Generate alerts
  const alerts: CostDashboard['alerts'] = [];
  const totalCost = totals.data[0]?.cost_usd ?? 0;
  const totalReads = totals.data[0]?.rows_read ?? 0;

  // Alert: daily cost exceeds $1
  if (period === '24h' && totalCost > 1.0) {
    alerts.push({
      type: 'cost_high',
      message: `Daily D1 cost is $${totalCost.toFixed(2)}, exceeding $1.00 threshold`,
      value: totalCost,
      threshold: 1.0,
    });
  }

  // Alert: daily reads exceed 1B
  if (period === '24h' && totalReads > 1_000_000_000) {
    alerts.push({
      type: 'reads_high',
      message: `Daily row reads at ${(totalReads / 1e9).toFixed(1)}B, exceeding 1B threshold`,
      value: totalReads,
      threshold: 1_000_000_000,
    });
  }

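  // Alert: single table dominates (>80% of reads).
  // byTable is sorted by cost, so find the table with the most reads,
  // and skip the check when there were no reads at all.
  if (byTable.data.length > 0 && totalReads > 0) {
    const topTable = byTable.data.reduce((a, b) =>
      b.rows_read > a.rows_read ? b : a
    );
    const topPct = (topTable.rows_read / totalReads) * 100;
    if (topPct > 80) {
      alerts.push({
        type: 'table_hotspot',
        message: `Table '${topTable.table_name}' accounts for ${topPct.toFixed(0)}% of all reads`,
        value: topPct,
        threshold: 80,
      });
    }
  }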

  return {
    period,
    totals: totals.data[0] ?? { rows_read: 0, rows_written: 0, cost_usd: 0, queries: 0 },
    by_worker: byWorker.data,
    by_table: byTable.data,
    hourly_trend: hourlyTrend.data,
    alerts,
  };
}

// Route handler
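app.get('/v1/cost-dashboard', async (c) => {
  const periods = ['1h', '24h', '7d', '30d'] as const;
  const period = c.req.query('period') ?? '24h';
  // Reject unknown periods instead of building SQL with an undefined interval
  if (!(periods as readonly string[]).includes(period)) {
    return c.json({ error: `period must be one of: ${periods.join(', ')}` }, 400);
  }
  const dashboard = await getCostDashboard(c.env, period as (typeof periods)[number]);
  return c.json(dashboard);
});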
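Because the alert rules only look at a few aggregates, they can be pulled out of getCostDashboard() into a pure function and unit-tested without querying Analytics Engine. A sketch of that refactor, with the same thresholds as above; the names and the sample numbers are hypothetical:

```typescript
// The dashboard's alert rules as a pure function.
interface TableRow { table_name: string; rows_read: number; cost_usd: number }
interface Alert { type: string; value: number; threshold: number }

function evaluateAlerts(
  period: string,
  totalCost: number,
  totalReads: number,
  byTable: TableRow[],
): Alert[] {
  const alerts: Alert[] = [];
  if (period === '24h' && totalCost > 1.0) {
    alerts.push({ type: 'cost_high', value: totalCost, threshold: 1.0 });
  }
  if (period === '24h' && totalReads > 1_000_000_000) {
    alerts.push({ type: 'reads_high', value: totalReads, threshold: 1_000_000_000 });
  }
  // Hotspot: the table with the most reads; skip when there were no reads.
  if (byTable.length > 0 && totalReads > 0) {
    const top = byTable.reduce((a, b) => (b.rows_read > a.rows_read ? b : a));
    const pct = (top.rows_read / totalReads) * 100;
    if (pct > 80) alerts.push({ type: 'table_hotspot', value: pct, threshold: 80 });
  }
  return alerts;
}

// Hypothetical day: 6.7B reads, most of them on one table.
const dayAlerts = evaluateAlerts('24h', 6.7, 6_700_000_000, [
  { table_name: 'app_registry', rows_read: 6_000_000_000, cost_usd: 6.0 },
  { table_name: 'crawl_keywords', rows_read: 700_000_000, cost_usd: 0.7 },
]);
console.log(dayAlerts.map((a) => a.type)); // [ 'cost_high', 'reads_high', 'table_hotspot' ]
```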

Grafana Integration

Analytics Engine data can be visualized in Grafana using the Cloudflare datasource plugin:

Data source: Cloudflare
Authentication: API Token (Account Analytics:Read)
Query: SELECT toStartOfHour(timestamp) AS time,
              blob1 AS worker,
              SUM(double1 * _sample_interval) AS rows_read
       FROM platform_metrics
       WHERE $__timeFilter(timestamp)
       GROUP BY time, worker
       ORDER BY time ASC

You can also use the JSON API datasource to query your /v1/cost-dashboard endpoint directly.


Pattern 4: Budget Governor

The Idea

A budget governor is a per-invocation check that stops expensive operations before they run. It queries recent cost data and refuses to proceed if the budget is exhausted.

interface BudgetConfig {
  /** Maximum cost in USD per day */
  dailyCostLimit: number;
  /** Maximum row reads per day */
  dailyReadsLimit: number;
  /** Maximum cost per single invocation */
  perInvocationLimit: number;
}

const DEFAULT_BUDGET: BudgetConfig = {
  dailyCostLimit: 1.0,       // $1/day
  dailyReadsLimit: 5_000_000_000,  // 5B reads/day
  perInvocationLimit: 0.05,  // $0.05 per invocation
};

interface BudgetCheck {
  allowed: boolean;
  reason?: string;
  todayCost: number;
  todayReads: number;
  remainingBudget: number;
}

async function checkBudget(
  env: Env,
  workerName: string,
  config: BudgetConfig = DEFAULT_BUDGET
): Promise<BudgetCheck> {
  // Query today's accumulated cost from Analytics Engine
  const result = await queryMetrics(env, `
    SELECT
      COALESCE(SUM(double4 * _sample_interval), 0) AS cost_usd,
      COALESCE(SUM(double1 * _sample_interval), 0) AS rows_read
    FROM platform_metrics
    WHERE blob1 = '${workerName}'
      AND blob2 = 'invocation_summary'
      AND timestamp > toStartOfDay(NOW())
    FORMAT JSON
  `);

  const todayCost = result.data[0]?.cost_usd ?? 0;
  const todayReads = result.data[0]?.rows_read ?? 0;
  const remainingBudget = config.dailyCostLimit - todayCost;

  if (todayCost >= config.dailyCostLimit) {
    return {
      allowed: false,
      reason: `Daily cost limit reached: $${todayCost.toFixed(4)} >= $${config.dailyCostLimit}`,
      todayCost,
      todayReads,
      remainingBudget: 0,
    };
  }

  if (todayReads >= config.dailyReadsLimit) {
    return {
      allowed: false,
      reason: `Daily reads limit reached: ${todayReads.toLocaleString()} >= ${config.dailyReadsLimit.toLocaleString()}`,
      todayCost,
      todayReads,
      remainingBudget,
    };
  }

  return {
    allowed: true,
    todayCost,
    todayReads,
    remainingBudget,
  };
}

Usage in a Cron Handler

async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
  // Budget check before any work
  const budget = await checkBudget(env, 'aso-mrr');

  if (!budget.allowed) {
    console.warn('[budget] skipping cron cycle', {
      reason: budget.reason,
      todayCost: budget.todayCost,
    });

    // Still write a metric so we can see the skip in dashboards
    env.METRICS.writeDataPoint({
      blobs: ['aso-mrr', 'budget_skip', event.cron],
      // Same layout as other points: double1 = rows read, double4 = cost
      doubles: [budget.todayReads, 0, 0, budget.todayCost],
      indexes: ['aso-mrr'],
    });

    return;
  }

  // Proceed with work
  const db = createTrackedD1({
    db: env.DB,
    metrics: env.METRICS,
    workerName: 'aso-mrr',
  });

  await runCrawlCycle(db);

  // Write invocation summary
  db.writeSummary(event.cron);

  // Check if this invocation was unexpectedly expensive
  const stats = db.getStats();
  if (stats.estimatedCostUsd > DEFAULT_BUDGET.perInvocationLimit) {
    console.error('[budget] expensive invocation', {
      cron: event.cron,
      cost: stats.estimatedCostUsd,
      limit: DEFAULT_BUDGET.perInvocationLimit,
      rowsRead: stats.totalRowsRead,
    });
  }
}

Key insight: The budget governor creates a cost feedback loop. Without it, cost grows linearly with invocations and nobody notices. With it, you get automatic circuit-breaking. The tradeoff is one Analytics Engine read query per invocation — cheap (free, currently) but not zero-latency. For crons running every 5 minutes, the ~50ms query time is negligible.

Limitations

The budget governor queries Analytics Engine for today's accumulated cost. That has two consequences:

  1. Analytics Engine writes are eventually consistent: there may be a few seconds of delay between writeDataPoint() and the data being queryable, and an invocation's cost is only recorded when it writes its summary at the end, so two concurrent invocations might both pass the budget check before either's cost is recorded
  2. The budget check itself costs one Analytics Engine read query

For most cost observability use cases, this eventual consistency is fine. You are protecting against runaway costs over hours, not millisecond precision. If you need hard real-time limits, use a Durable Object as a singleton cost gate with in-memory counters.
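The Durable Object approach works because one object instance runs its JavaScript single-threaded, so a synchronous check-and-reserve on an in-memory counter cannot interleave with another request to the same object. A minimal sketch of the counter such an object might hold; the class and its reserve/settle split are hypothetical, and the DurableObject wiring (every invocation routed to one instance obtained with `idFromName()`) is left out so the logic runs anywhere. A real gate would also persist the counter to Durable Object storage so it survives eviction.

```typescript
// Hypothetical in-memory daily budget counter for a singleton Durable Object.
// reserve() checks and updates synchronously, so two requests handled by
// the same object cannot both pass the limit.
class DailyCostGate {
  private readonly dailyLimitUsd: number;
  private day = '';
  private spentUsd = 0;

  constructor(dailyLimitUsd: number) {
    this.dailyLimitUsd = dailyLimitUsd;
  }

  /** Reserve an estimated cost before doing work; false means "skip". */
  reserve(estimatedUsd: number, now: Date = new Date()): boolean {
    const today = now.toISOString().slice(0, 10); // UTC date; resets at 00:00 UTC
    if (today !== this.day) {
      this.day = today;
      this.spentUsd = 0;
    }
    if (this.spentUsd + estimatedUsd > this.dailyLimitUsd) return false;
    this.spentUsd += estimatedUsd;
    return true;
  }

  /** After the work, swap the reservation for the measured cost. */
  settle(estimatedUsd: number, actualUsd: number): void {
    this.spentUsd += actualUsd - estimatedUsd;
  }
}

const gate = new DailyCostGate(1.0);
const noon = new Date('2026-04-28T12:00:00Z');
console.log(gate.reserve(0.6, noon)); // true
console.log(gate.reserve(0.6, noon)); // false: would exceed $1.00
console.log(gate.reserve(0.6, new Date('2026-04-29T00:00:01Z'))); // true: new UTC day
```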


Pattern 5: Cron Exit Conditions

The Rule

Every cron handler must answer “is there work to do?” before doing any work. The check must be cheaper than the work by at least 100x.

Before: No Exit Condition

// BAD: runs every minute, 21 COUNT(*) queries whether there's work or not
async scheduled(event: ScheduledEvent, env: Env) {
  if (event.cron === '* * * * *') {
    const stats = await runCrawlCycle(env.DB);       // 5 queries
    await computeAndStoreHealthSnapshot(env.DB);     // 21 queries
    // Total: 26 queries × ~180K rows = ~4.7M reads per invocation
    // Per day: 4.7M × 1,440 = ~6.7B reads
  }
}

After: Exit Condition + Separate Schedules

async scheduled(event: ScheduledEvent, env: Env, ctx: ExecutionContext) {
  if (event.cron === '*/5 * * * *') {
    // 3 cheap queries with LIMIT 1 — stops after finding 1 matching row
    const [a, b, c] = await Promise.all([
      env.DB.prepare(
        "SELECT 1 FROM category_scans WHERE scanned_at IS NULL LIMIT 1"
      ).first(),
      env.DB.prepare(
        "SELECT 1 FROM crawl_keywords WHERE status = 'pending' LIMIT 1"
      ).first(),
      env.DB.prepare(
        "SELECT 1 FROM app_registry WHERE enriched_at IS NULL LIMIT 1"
      ).first(),
    ]);

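    if (!a && !b && !c) {
      env.METRICS.writeDataPoint({
        blobs: ['aso-mrr', 'cron', 'idle'],
        doubles: [3, 0, 0, 0],  // ~3 rows read, assuming indexed columns
        indexes: ['aso-mrr'],
      });
      // Cost: ~3 row reads if each WHERE column is indexed. Without an
      // index, a check that finds no match scans its whole table.
      return;
    }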
    }

    await runCrawlCycle(env.DB);
  }

  // Health snapshot: once per day, not 1,440 times
  if (event.cron === '0 6 * * *') {
    await computeAndStoreHealthSnapshot(env.DB);
  }
}
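The savings are plain arithmetic. A sketch using the figures from the comments above (26 queries at about 180K rows each, every minute) against three existence checks every five minutes, assuming the checked columns are indexed so each idle check reads about one row:

```typescript
// Daily D1 row reads for a cron, given reads per run and runs per day.
function dailyReads(readsPerRun: number, runsPerDay: number): number {
  return readsPerRun * runsPerDay;
}

const MINUTES_PER_DAY = 24 * 60;

// Before: 26 queries x ~180K rows, every minute.
const before = dailyReads(26 * 180_000, MINUTES_PER_DAY);
// After (idle): 3 indexed existence checks, every 5 minutes.
const afterIdle = dailyReads(3, MINUTES_PER_DAY / 5);

console.log(before);    // 6739200000
console.log(afterIdle); // 864
```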

The SELECT 1 … LIMIT 1 Pattern

The cheapest possible existence check on D1 is:

SELECT 1 FROM table WHERE condition LIMIT 1

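This query stops as soon as it finds one matching row. With an index on the filtered column, it costs about 1 row read. Without an index, it reads rows until it reaches the first match, and if no row matches, it reads the entire table. That matters for cron exit conditions: the idle case (no work) is exactly the case where nothing matches, so index the columns you check.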

Compare to COUNT(*):

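-- Reads every row it scans and never stops early: the whole table
-- without an index, every matching index entry with one
SELECT COUNT(*) FROM app_registry WHERE enriched_at IS NULL

-- Stops at the first match: ~1 row read with an index, up to the
-- whole table without one (the whole table when nothing matches)
SELECT 1 FROM app_registry WHERE enriched_at IS NULL LIMIT 1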

For an existence check (“is there any work?”), you never need COUNT(*). You just need to know if at least one row matches.
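Why the "no work" case matters so much: without an index, the database evaluates the WHERE clause row by row, so `LIMIT 1` only saves reads when a match exists. A toy model of rows visited by an unindexed scan (the shape of the cost, not D1's exact accounting), over a hypothetical table:

```typescript
// Toy model of rows visited by an unindexed existence check.
// `SELECT 1 ... LIMIT 1` stops at the first match, but visits every
// row when nothing matches, exactly like COUNT(*).
function rowsVisitedByExistsCheck<T>(rows: T[], matches: (row: T) => boolean): number {
  for (let i = 0; i < rows.length; i++) {
    if (matches(rows[i])) return i + 1; // stopped early at the first match
  }
  return rows.length; // no match: full scan
}

// Hypothetical 180,000-row table where every app is already enriched.
const apps = Array.from({ length: 180_000 }, (_, id) => ({ id, enriched: true }));
const isPending = (app: { enriched: boolean }) => !app.enriched;

console.log(rowsVisitedByExistsCheck(apps, isPending)); // 180000 (idle: full scan)

apps[10].enriched = false; // one pending row near the front
console.log(rowsVisitedByExistsCheck(apps, isPending)); // 11
```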

Stats Table Pattern

For cases where you genuinely need counts (not just existence checks), maintain a stats table updated by triggers or incremental updates:

CREATE TABLE IF NOT EXISTS table_stats (
  table_name TEXT PRIMARY KEY,
  row_count INTEGER DEFAULT 0,
  pending_count INTEGER DEFAULT 0,
  updated_at TEXT DEFAULT (datetime('now'))
);

-- Update on INSERT
CREATE TRIGGER IF NOT EXISTS trg_app_registry_insert
AFTER INSERT ON app_registry
BEGIN
  INSERT INTO table_stats (table_name, row_count, pending_count)
  VALUES ('app_registry', 1, CASE WHEN NEW.enriched_at IS NULL THEN 1 ELSE 0 END)
  ON CONFLICT (table_name) DO UPDATE SET
    row_count = row_count + 1,
    pending_count = pending_count + CASE WHEN NEW.enriched_at IS NULL THEN 1 ELSE 0 END,
    updated_at = datetime('now');
END;

-- Update on UPDATE (enrichment complete)
CREATE TRIGGER IF NOT EXISTS trg_app_registry_enrich
AFTER UPDATE OF enriched_at ON app_registry
WHEN OLD.enriched_at IS NULL AND NEW.enriched_at IS NOT NULL
BEGIN
  UPDATE table_stats SET
    pending_count = MAX(0, pending_count - 1),
    updated_at = datetime('now')
  WHERE table_name = 'app_registry';
END;

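Now SELECT row_count FROM table_stats WHERE table_name = 'app_registry' costs 1 row read, always, regardless of table size. The tradeoff is extra writes: every insert and every enrichment now also updates a table_stats row, and D1 charges 1,000x more per row written than per row read. One COUNT(*) over a 180K-row table costs about as much as 180 of those trigger writes, so the stats table wins unless rows change far more often than you read the counts. The triggers only see changes made after they exist, so seed table_stats once with a real COUNT(*), and add an AFTER DELETE trigger if rows are ever deleted, or the counts will drift.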


Pattern 6: Billing Notifications as Alert Layer

The Last Resort

Cloudflare offers usage-based billing notifications that send email when a metric exceeds a threshold. This is your last line of defense — it catches what your code-level instrumentation misses.

Setting Up D1 Read Notifications

  1. Log into the Cloudflare dashboard
  2. Navigate to Account Home > Notifications
  3. Click Create
  4. Select Usage-Based Billing from the Event Type dropdown
  5. Configure:
    • Product: D1
    • Threshold: 5,000,000,000 (5B reads — 20% of your included 25B)
    • Notification email: your ops email

| Resource | Threshold | Why |
|---|---|---|
| D1 row reads | 5B/month (20% of included 25B) | Early warning before you eat through the free tier |
| D1 row writes | 10M/month (20% of included 50M) | Write costs are 1000x reads per unit |
| R2 Class A operations | 200K/month (20% of included 1M) | PUT/POST/LIST operations |
| R2 storage | 8 GB (80% of included 10 GB) | Storage grows monotonically |
| Workers requests | 8M/month (80% of included 10M) | Runaway crons or bots |

Limitations

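Billing notifications are email-only, and they fire only after usage has already crossed the threshold, so they report spend after it has happened rather than preventing it.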

This is why billing notifications are the last resort, not the primary alert. Use them as a safety net alongside code-level budget governors and Analytics Engine dashboards.

Wiring Billing Notifications to Telegram/Slack

Since billing notifications are email-only, you can route them to your team chat via:

// Cloudflare Email Worker — receives billing notification emails
// and forwards them to Telegram
export default {
  async email(message: ForwardableEmailMessage, env: Env) {
    const subject = message.headers.get('subject') ?? '';

    // Only process billing alerts (case-insensitive: subjects may say "Usage")
    const lower = subject.toLowerCase();
    if (!lower.includes('usage') && !lower.includes('billing')) {
      return;
    }

    // Forward to Telegram as plain text (no parse_mode), so characters
    // like '<' or '&' in the subject cannot break Telegram's HTML parsing
    await fetch(
      `https://api.telegram.org/bot${env.TELEGRAM_BOT_TOKEN}/sendMessage`,
      {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          chat_id: env.TELEGRAM_CHAT_ID,
          text: `[BILLING ALERT] ${subject}`,
        }),
      }
    );
  },
};

Small Examples

Example 1: Track R2 Operations

function trackR2Op(
  metrics: AnalyticsEngineDataset,
  worker: string,
  bucket: string,
  operation: 'put' | 'get' | 'delete' | 'list',
  bytes: number
): void {
  // R2 pricing: Class A (put/list) = $4.50/M, Class B (get) = $0.36/M.
  // DeleteObject is a free operation.
  const costPer =
    operation === 'delete' ? 0
    : operation === 'get' ? 0.36 / 1_000_000
    : 4.5 / 1_000_000;

  metrics.writeDataPoint({
    blobs: [worker, `r2_${operation}`, bucket],
    doubles: [1, bytes, 0, costPer],
    indexes: [worker],
  });
}

// Usage
const object = await env.MY_BUCKET.put(key, body);
// put() resolves to an R2Object (or null if a conditional put fails); use its size
trackR2Op(env.METRICS, 'my-worker', 'my-bucket', 'put', object?.size ?? 0);

const gotten = await env.MY_BUCKET.get(key);
trackR2Op(env.METRICS, 'my-worker', 'my-bucket', 'get', gotten?.size ?? 0);

Example 2: Track KV Operations

function trackKVOp(
  metrics: AnalyticsEngineDataset,
  worker: string,
  namespace: string,
  operation: 'get' | 'put' | 'delete' | 'list',
  bytes: number = 0
): void {
  // KV pricing: reads $0.50/M, writes $5.00/M, deletes $5.00/M, lists $5.00/M
  const costMap: Record<string, number> = {
    get: 0.5 / 1_000_000,
    put: 5.0 / 1_000_000,
    delete: 5.0 / 1_000_000,
    list: 5.0 / 1_000_000,
  };

  metrics.writeDataPoint({
    blobs: [worker, `kv_${operation}`, namespace],
    doubles: [1, bytes, 0, costMap[operation]],
    indexes: [worker],
  });
}

// Usage
const value = await env.KV.get('my-key');
trackKVOp(env.METRICS, 'my-worker', 'MY_KV', 'get');

const payload = JSON.stringify(data);
await env.KV.put('my-key', payload);
// Count UTF-8 bytes, not UTF-16 string length
trackKVOp(env.METRICS, 'my-worker', 'MY_KV', 'put', new TextEncoder().encode(payload).byteLength);

Example 3: Track Workers AI Inference

async function trackedAIRun(
  ai: Ai,
  metrics: AnalyticsEngineDataset,
  worker: string,
  model: string,
  input: unknown
): Promise<unknown> {
  const start = Date.now();
  const result = await ai.run(model, input as AiOptions);
  const durationMs = Date.now() - start;

  // Workers AI bills in Neurons: $0.011 per 1,000 Neurons beyond the daily
  // free allocation. Neurons per token (or per image step) vary by model,
  // so cost is left at 0 here and estimated externally.
  metrics.writeDataPoint({
    blobs: [worker, 'ai_inference', model],
    doubles: [1, 0, durationMs, 0], // cost estimated externally
    indexes: [worker],
  });

  return result;
}

// Usage
const response = await trackedAIRun(
  env.AI,
  env.METRICS,
  'my-worker',
  '@cf/meta/llama-3.1-8b-instruct',
  { messages: [{ role: 'user', content: 'Hello' }] }
);

Example 4: Track Queue Message Costs

function trackQueueSend(
  metrics: AnalyticsEngineDataset,
  worker: string,
  queue: string,
  messageCount: number,
  totalBytes: number
): void {
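  // Queues pricing: $0.40 per million operations. Writing, reading, and
  // deleting a message are one operation each per 64 KB, so a small message
  // delivered once costs ~3 operations. This attributes all three to the
  // send and ignores retries and messages over 64 KB.
  const costUsd = (messageCount * 3 * 0.40) / 1_000_000;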

  metrics.writeDataPoint({
    blobs: [worker, 'queue_send', queue],
    doubles: [messageCount, totalBytes, 0, costUsd],
    indexes: [worker],
  });
}

function trackQueueConsume(
  metrics: AnalyticsEngineDataset,
  worker: string,
  queue: string,
  batchSize: number,
  processingMs: number
): void {
  metrics.writeDataPoint({
    blobs: [worker, 'queue_consume', queue],
    doubles: [batchSize, 0, processingMs, 0],
    indexes: [worker],
  });
}

// Producer
await env.MY_QUEUE.send({ type: 'process', id: 123 });
trackQueueSend(env.METRICS, 'producer-worker', 'my-queue', 1, 50);

// Consumer
async queue(batch: MessageBatch<QueueMessage>, env: Env) {
  const start = Date.now();
  for (const msg of batch.messages) {
    await processMessage(msg.body);
    msg.ack();
  }
  trackQueueConsume(
    env.METRICS, 'consumer-worker', 'my-queue',
    batch.messages.length, Date.now() - start
  );
}
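Because Queues bills per operation rather than per message, message size and retries change the estimate. A sketch of the per-message operation count, assuming each 64 KB chunk written, read, or deleted is one operation and each delivery attempt adds a read; the helper names are hypothetical and the price should be checked against the current pricing page:

```typescript
// Rough Cloudflare Queues cost model: $0.40 per million operations;
// each 64 KB chunk written, read, or deleted is one operation.
const USD_PER_MILLION_OPS = 0.4;
const CHUNK_BYTES = 64 * 1024;

function opsPerMessage(messageBytes: number, deliveries = 1): number {
  const chunks = Math.max(1, Math.ceil(messageBytes / CHUNK_BYTES));
  // 1 write + one read per delivery attempt + 1 delete (ack), per chunk.
  return chunks * (1 + deliveries + 1);
}

function queueCostUsd(messages: number, messageBytes: number): number {
  return (messages * opsPerMessage(messageBytes) * USD_PER_MILLION_OPS) / 1_000_000;
}

console.log(opsPerMessage(50));                       // 3
console.log(opsPerMessage(100 * 1024));               // 6
console.log(queueCostUsd(1_000_000, 50).toFixed(2));  // 1.20
```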

Example 5: Track Durable Object Duration

class MyDurableObject extends DurableObject {
  private metrics: AnalyticsEngineDataset;

  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env);
    this.metrics = env.METRICS;
  }

  async fetch(request: Request): Promise<Response> {
    const start = Date.now();
    // ... handle the request ...
    const response = new Response('OK');
    // Date.now() only advances across I/O in Workers, so CPU-only work
    // may record 0 ms here.
    const durationMs = Date.now() - start;

    // DO pricing: 1M requests included, then $0.15/M requests, plus
    // duration: 400K GB-s included, then $12.50/M GB-s. Duration is billed
    // on wall-clock time the object stays active, so this per-request
    // cost covers only the request charge.
    this.metrics.writeDataPoint({
      blobs: ['my-worker', 'do_request', this.constructor.name],
      doubles: [1, 0, durationMs, 0.15 / 1_000_000],
      indexes: ['my-worker'],
    });

    return response;
  }

  async alarm(): Promise<void> {
    this.metrics.writeDataPoint({
      blobs: ['my-worker', 'do_alarm', this.constructor.name],
      doubles: [1, 0, 0, 0],
      indexes: ['my-worker'],
    });
  }
}

Example 6: Cost Summary in Response Headers

export default {
  async fetch(request: Request, env: Env, ctx: ExecutionContext) {
    const db = createTrackedD1({
      db: env.DB,
      metrics: env.METRICS,
      workerName: 'api',
    });

    // ... handle request using db ...
    const result = await handleRequest(request, db);

    // Add cost headers for debugging
    const stats = db.getStats();
    const response = Response.json(result);
    response.headers.set('X-D1-Rows-Read', stats.totalRowsRead.toString());
    response.headers.set('X-D1-Rows-Written', stats.totalRowsWritten.toString());
    response.headers.set('X-D1-Cost-USD', stats.estimatedCostUsd.toFixed(6));
    response.headers.set('X-D1-Queries', stats.queryCount.toString());
    response.headers.set('X-D1-Duration-Ms', stats.totalDurationMs.toFixed(1));

    ctx.waitUntil(Promise.resolve().then(() => db.writeSummary('fetch')));

    return response;
  },
};

// Now you can see costs in curl:
// $ curl -s -D - https://my-worker.dev/api/data | head -10
// HTTP/2 200
// x-d1-rows-read: 5042
// x-d1-rows-written: 0
// x-d1-cost-usd: 0.000005
// x-d1-queries: 3
// x-d1-duration-ms: 8.2

Example 7: Anomaly Detection via Moving Average

async function detectCostAnomalies(env: Env): Promise<Array<{
  worker: string;
  current_cost: number;
  avg_cost: number;
  deviation: number;
  is_anomaly: boolean;
}>> {
  const result = await queryMetrics(env, `
    SELECT
      blob1 AS worker,
      SUM(CASE
        WHEN timestamp > NOW() - INTERVAL '1' HOUR
        THEN double4 * _sample_interval ELSE 0
      END) AS current_hour_cost,
      SUM(CASE
        WHEN timestamp > NOW() - INTERVAL '24' HOUR
        AND timestamp <= NOW() - INTERVAL '1' HOUR
        THEN double4 * _sample_interval ELSE 0
      END) / 23.0 AS avg_hourly_cost
    FROM platform_metrics
    WHERE blob2 = 'invocation_summary'
      AND timestamp > NOW() - INTERVAL '24' HOUR
    GROUP BY worker
    FORMAT JSON
  `);

  return result.data.map((row: Record<string, number>) => {
    const avg = row.avg_hourly_cost || 0.001; // avoid division by zero
    const deviation = row.current_hour_cost / avg;
    return {
      worker: row.worker as unknown as string,
      current_cost: row.current_hour_cost,
      avg_cost: avg,
      deviation,
      is_anomaly: deviation > 3.0, // 3x average = anomaly
    };
  });
}
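A pure ratio over-reacts to tiny baselines: with the 0.001 floor above, a worker that spent nothing yesterday and $0.004 this hour scores 4x and is flagged. A variant that also requires a minimum absolute cost avoids that; the $0.05 floor is an arbitrary illustration, not a recommendation:

```typescript
// Ratio-based anomaly check with an absolute floor, so near-zero
// baselines do not flag trivially small spend. Thresholds are illustrative.
function isCostAnomaly(
  currentHourUsd: number,
  avgHourlyUsd: number,
  ratio = 3.0,
  minCurrentUsd = 0.05,
): boolean {
  if (currentHourUsd < minCurrentUsd) return false; // too small to matter
  const baseline = Math.max(avgHourlyUsd, 0.001);   // avoid division by zero
  return currentHourUsd / baseline > ratio;
}

console.log(isCostAnomaly(0.004, 0)); // false: below the $0.05 floor
console.log(isCostAnomaly(0.8, 0.2)); // true: 4x the hourly average
console.log(isCostAnomaly(0.5, 0.2)); // false: 2.5x
```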

Example 8: Weekly Cost Report via Cron

async function generateWeeklyReport(env: Env): Promise<string> {
  const result = await queryMetrics(env, `
    SELECT
      blob1 AS worker,
      toDate(timestamp) AS day,
      SUM(double1 * _sample_interval) AS rows_read,
      SUM(double4 * _sample_interval) AS cost_usd,
      SUM(_sample_interval) AS queries
    FROM platform_metrics
    WHERE blob2 = 'query'
      AND timestamp > NOW() - INTERVAL '7' DAY
    GROUP BY worker, day
    ORDER BY day ASC, cost_usd DESC
    FORMAT JSON
  `);

  let report = '=== Weekly Cost Report ===\n\n';
  let totalCost = 0;

  const byDay = new Map<string, typeof result.data>();
  for (const row of result.data) {
    const day = row.day as string;
    if (!byDay.has(day)) byDay.set(day, []);
    byDay.get(day)!.push(row);
    totalCost += row.cost_usd as number;
  }

  for (const [day, rows] of byDay) {
    const dayCost = rows.reduce((sum, r) => sum + (r.cost_usd as number), 0);
    report += `${day}: $${dayCost.toFixed(4)}\n`;
    for (const row of rows) {
      report += `  ${row.worker}: ${(row.rows_read as number).toLocaleString()} reads, $${(row.cost_usd as number).toFixed(4)}\n`;
    }
  }

  report += `\nTotal: $${totalCost.toFixed(4)}`;
  return report;
}
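The report still needs a trigger. Cloudflare passes the cron expression that fired to the `scheduled` handler as `controller.cron`, so a single Worker can branch between several schedules. Below is a minimal sketch of that dispatch. It assumes a hypothetical weekly trigger `0 9 * * 1` (Mondays, 09:00 UTC) alongside the work and rollup crons. `CRON_TASKS`, `taskForCron`, and `runScheduled` are illustrative names. The two small interfaces stand in for the runtime's `ScheduledController` and `ExecutionContext`.

```typescript
type CronTask = 'crawl' | 'rollup' | 'weekly_report';

// Keys should match the cron strings in wrangler.jsonc exactly.
const CRON_TASKS: Record<string, CronTask> = {
  '*/5 * * * *': 'crawl',
  '0 * * * *': 'rollup',
  '0 9 * * 1': 'weekly_report', // assumed weekly trigger (Mondays 09:00 UTC)
};

function taskForCron(cron: string): CronTask | null {
  return CRON_TASKS[cron] ?? null;
}

// Minimal stand-ins for ScheduledController and ExecutionContext.
interface CronController { cron: string }
interface WaitUntilContext { waitUntil(promise: Promise<unknown>): void }

function runScheduled(
  controller: CronController,
  ctx: WaitUntilContext,
  jobs: Record<CronTask, () => Promise<void>>
): CronTask | null {
  const task = taskForCron(controller.cron);
  if (task === null) {
    // An unmapped cron means config and code have drifted; do nothing
    console.warn(`No task mapped for cron "${controller.cron}"`);
    return null;
  }
  // Keep the invocation alive until the job settles
  ctx.waitUntil(jobs[task]());
  return task;
}
```

Inside the Worker's `scheduled(controller, env, ctx)` handler, the `weekly_report` job would call `generateWeeklyReport(env)` and deliver the string wherever the team reads it.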

Example 9: Per-Route Cost Tracking Middleware

import { Hono } from 'hono';

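type CostEnv = {
  DB: D1Database;
  METRICS: AnalyticsEngineDataset;
};

// Declare the per-request 'db' variable so c.set('db') and c.get('db') type-check
const app = new Hono<{ Bindings: CostEnv; Variables: { db: TrackedD1 } }>();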

// Middleware: track D1 cost per route
app.use('*', async (c, next) => {
  const db = createTrackedD1({
    db: c.env.DB,
    metrics: c.env.METRICS,
    workerName: 'api',
  });

  // Expose the tracked wrapper to route handlers (c.env.DB itself is unchanged)
  c.set('db', db);

  await next();

  // After response: write route-level summary
  const stats = db.getStats();
  c.env.METRICS.writeDataPoint({
    blobs: [
      'api',
      'route',
      c.req.method,
      new URL(c.req.url).pathname,
    ],
    doubles: [
      stats.totalRowsRead,
      stats.totalRowsWritten,
      stats.totalDurationMs,
      stats.estimatedCostUsd,
    ],
    indexes: ['api'],
  });
});

// Now every route automatically gets cost tracking
app.get('/api/apps', async (c) => {
  const db = c.get('db') as TrackedD1;
  const apps = await db
    .prepare('SELECT * FROM app_registry LIMIT 20')
    .all();
  return c.json(apps.results);
});
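The middleware writes the raw pathname as a blob, so `/api/apps/123` and `/api/apps/456` are reported as different routes. Analytics Engine accepts that cardinality, but per-route cost reports are easier to read when identifiers are collapsed. Below is a minimal sketch of a hypothetical `normalizeRoutePath` helper that could be applied to the pathname before `writeDataPoint`. It assumes that purely numeric and UUID-shaped segments are identifiers.

```typescript
// Segments treated as identifiers (assumption: numeric IDs and UUIDs only)
const NUMERIC_SEGMENT = /^\d+$/;
const UUID_SEGMENT =
  /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

// Hypothetical helper: collapse identifier segments so per-route cost
// metrics group by route shape instead of by individual resource.
function normalizeRoutePath(pathname: string): string {
  return pathname
    .split('/')
    .map((segment) =>
      NUMERIC_SEGMENT.test(segment) || UUID_SEGMENT.test(segment) ? ':id' : segment
    )
    .join('/');
}
```

With this helper, `/api/apps/12345` is recorded as `/api/apps/:id`, and static paths such as `/api/apps` pass through unchanged.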

Example 10: D1 Query Optimizer Hints

// Detect expensive queries at runtime and suggest optimizations
function analyzeQueryCost(
  sql: string,
  meta: D1Meta,
  thresholds = { rowsRead: 10_000, durationMs: 100 }
): string | null {
  // A query is "expensive" if it crosses EITHER threshold
  if (meta.rows_read < thresholds.rowsRead && meta.duration < thresholds.durationMs) {
    return null;
  }

  // Case-insensitive keyword checks ('where' and 'WHERE' are the same)
  const upper = sql.toUpperCase();
  const suggestions: string[] = [];

  // Slow query detection
  if (meta.duration >= thresholds.durationMs) {
    suggestions.push(
      `Query took ${meta.duration}ms (threshold: ${thresholds.durationMs}ms).`
    );
  }

  // Full table scan detection
  if (upper.includes('COUNT(*)') && !upper.includes('WHERE')) {
    suggestions.push(
      'COUNT(*) without WHERE is a full table scan. Use a stats table or cached counter.'
    );
  }

  // Missing index hint. D1Meta has no "rows returned" field; `changes` counts
  // only modified rows, so for a SELECT this fires on any large filtered scan.
  if (upper.includes('WHERE') && meta.rows_read > 10 * (meta.changes || 1)) {
    suggestions.push(
      `Query read ${meta.rows_read.toLocaleString()} rows but modified ${meta.changes || 0}. Consider adding an index on the WHERE clause columns.`
    );
  }

  // Date range without index
  if (/datetime\('now'/i.test(sql) && meta.rows_read > 1000) {
    suggestions.push(
      'Date-filtered query scanning many rows. Add an index on the datetime column.'
    );
  }

  // LIMIT without ORDER BY
  if (upper.includes('LIMIT') && !upper.includes('ORDER BY') && meta.rows_read > 1000) {
    suggestions.push(
      'LIMIT without ORDER BY + index still scans until enough rows match.'
    );
  }

  return suggestions.length > 0
    ? `[EXPENSIVE QUERY] ${meta.rows_read.toLocaleString()} rows read, ${meta.duration}ms\n  SQL: ${sql.slice(0, 200)}\n  ${suggestions.join('\n  ')}`
    : null;
}

Comparisons

Analytics Engine vs External Monitoring Systems

| Feature | Analytics Engine | Prometheus + Thanos | Datadog | InfluxDB Cloud | TimescaleDB |
| --- | --- | --- | --- | --- | --- |
| Setup | Zero: one binding in wrangler.jsonc | Deploy Prometheus server + Thanos sidecars | SaaS signup + agent install | SaaS signup | Deploy PostgreSQL + TimescaleDB extension |
| Cost | Free (not yet billing) | Free OSS + infra costs ($50-500/mo for Thanos storage) | $15/host/mo + $0.10/M custom metrics | Free tier: 5 writes/s, 300 reads/5min; paid from $0.002/write | PostgreSQL hosting ($20-200/mo) |
| Write latency | <1ms (fire-and-forget in runtime) | ~10ms (local scrape) | ~5ms (agent buffered) | ~5ms (HTTP POST) | ~2ms (SQL INSERT) |
| Query language | SQL subset (familiar) | PromQL (specialized, learning curve) | Datadog Query Language (proprietary) | Flux or InfluxQL (specialized) | Full SQL (PostgreSQL) |
| Retention | 92 days fixed | Configurable (Thanos: unlimited with object storage) | 15 months (standard) | 30 days free, configurable on paid | Unlimited (PostgreSQL storage) |
| Cardinality | Unlimited (designed for high cardinality) | Limited (high cardinality kills performance) | Unlimited (but expensive) | Configurable | Unlimited (SQL) |
| Cloudflare integration | Native: zero network hops | Requires exporter + egress | Requires agent + egress | Requires HTTP exporter | Requires HTTP exporter |
| Alerting | DIY (query in cron) | Alertmanager (powerful rules) | Built-in (monitors, SLOs, PagerDuty) | Built-in (checks, notification rules) | DIY or pg_cron + triggers |
| Dashboarding | Grafana plugin or custom API | Grafana (native) | Built-in (excellent) | Built-in (Chronograf) | Grafana (native via PostgreSQL) |
| Multi-tenancy | Per-account datasets | Label-based (DIY) | Per-org (built-in) | Per-org with buckets | Schema-based (DIY) |
| Downsampling | Automatic (_sample_interval) | Recording rules + Thanos compaction | Automatic on ingest | Automatic with retention policies | Continuous aggregates (manual setup) |

When to Use What

Use Analytics Engine when:

Use Prometheus + Thanos when:

Use Datadog when:

Use InfluxDB when:

Use TimescaleDB when:

The Analytics Engine Advantage for Cloudflare-Native Workloads

For cost observability specifically (tracking D1 reads, R2 ops, KV operations, and Workers invocations), Analytics Engine has five structural advantages over the alternatives:

  1. Zero network hops: writeDataPoint() happens inside the Workers runtime. No HTTP requests, no DNS resolution, no TLS handshakes. Every other option requires egress traffic.

  2. Zero setup: One binding in wrangler.jsonc. No databases to provision, no schemas to define, no agents to install, no credentials to manage.

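  3. Zero cost: Not billing yet. Even when Cloudflare starts billing, the included allowances (100K data points written and 10K queries per day on Workers Free; 10M data points and 1M queries per month on Workers Paid) cover most cost-tracking workloads.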

  4. Zero latency: Fire-and-forget writes. The runtime batches and ships them. Your Worker’s response time is unaffected.

  5. Zero maintenance: No upgrades, no capacity planning, no disk space monitoring, no backup strategy.

The tradeoff is limited retention (92 days), limited alerting (DIY), and a SQL subset (not full PostgreSQL). The hybrid architecture with D1 covers the retention gap. The budget governor pattern covers alerting. And the SQL subset is more than sufficient for SUM, COUNT, GROUP BY, and WHERE — which is all cost dashboards need.

GFS vs RRDtool vs Prometheus Recording Rules vs TimescaleDB Continuous Aggregates

| Feature | GFS on D1 (our approach) | RRDtool | Prometheus Recording Rules | TimescaleDB Continuous Aggregates |
| --- | --- | --- | --- | --- |
| Storage model | SQL tables with DELETE pruning | Circular buffer (fixed-size file) | New time series from PromQL expressions | Materialized views with auto-refresh |
| Resolution tiers | 3 tables (15min, hourly, daily) | Multiple RRAs in one file | Multiple rules at different intervals | Multiple continuous aggregates |
| Pruning | Explicit DELETE in cron | Automatic (circular buffer overwrites) | Single global TSDB retention (per-resolution retention needs Thanos compaction) | Retention policies on hypertables |
| Growth | Grandfather table grows indefinitely, but slowly (one row per worker and resource per day) | Fixed size (defined at creation) | Depends on cardinality | Depends on retention policy |
| Setup | SQL DDL + cron handler | rrdtool create with RRA definitions | YAML recording rules | CREATE MATERIALIZED VIEW ... WITH (timescaledb.continuous) |
| Query | Standard SQL | rrdtool fetch or rrdtool graph | PromQL (same as live data) | Standard SQL (same as raw data) |
| Cloudflare native | Yes (D1) | No (file-based, needs filesystem) | No (needs Prometheus server) | No (needs PostgreSQL server) |
| Best for | Workers cost tracking with permanent history | Network monitoring (MRTG, Cacti) | Kubernetes metrics aggregation | Hybrid time-series + relational workloads |

Key insight: RRDtool solved this problem in 1999 with circular buffers that guarantee constant storage size. We do the same thing on D1 with SQL INSERT + DELETE, except that the daily tier is kept indefinitely instead of being overwritten. The principle is identical: recent data at high resolution, older data at progressively lower resolution. The implementation adapts to the serverless constraint: no filesystem, no long-running processes, just SQL and crons.
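The tiering rule can be made concrete with two small functions. One maps a timestamp to its bucket key in each tier. The other computes the cutoff below which a tier's rows are deleted. This is a minimal sketch with two assumptions. First, bucket keys are UTC strings of the form `YYYY-MM-DD HH:MM` (15-minute), `YYYY-MM-DD HH:00` (hourly), and `YYYY-MM-DD` (daily); the article's actual key format may differ. Second, retention follows the migration comments: 48 hours, 30 days, and forever. The helper names are hypothetical.

```typescript
// Hypothetical GFS helpers. Keys are assumed to be lexicographically
// sortable UTC strings so SQL can prune with `WHERE bucket < ?`.
type Tier = '15min' | 'hourly' | 'daily';

const RETENTION_MS: Record<Tier, number | null> = {
  '15min': 48 * 60 * 60 * 1000,     // son: 48 hours
  hourly: 30 * 24 * 60 * 60 * 1000, // father: 30 days
  daily: null,                      // grandfather: kept forever
};

function bucketKey(date: Date, tier: Tier): string {
  const iso = date.toISOString(); // e.g. "2026-04-28T13:47:12.000Z"
  const day = iso.slice(0, 10);
  const hour = iso.slice(11, 13);
  if (tier === 'daily') return day;
  if (tier === 'hourly') return `${day} ${hour}:00`;
  // Round minutes down to the start of the 15-minute window
  const minute = Math.floor(date.getUTCMinutes() / 15) * 15;
  return `${day} ${hour}:${String(minute).padStart(2, '0')}`;
}

// Rows whose bucket sorts below this key are older than the tier's
// retention and can be deleted; null means the tier is never pruned.
function pruneCutoff(now: Date, tier: Tier): string | null {
  const retention = RETENTION_MS[tier];
  if (retention === null) return null;
  return bucketKey(new Date(now.getTime() - retention), tier);
}
```

A rollup cron could then run `DELETE FROM metrics_15min WHERE bucket < ?` bound to `pruneCutoff(new Date(), '15min')`. Because the keys sort in time order, that DELETE can use an index whose leading column is `bucket`.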


The Deploy-Readiness Standard

This standard was born from the aso-mrr incident. Every Worker with cron triggers, queue consumers, or Durable Object alarms must meet these 10 rules before shipping.

The 10 Rules

Rule 1: Every Deployed Worker MUST Have a Smoke Test

Add at least one route check to your smoke test script. If a Worker is running, it must be tested.

# .tools/smoke-test.sh
check "aso-mrr health" "https://aso-mrr-api.apiservices.workers.dev/health"
check "pages-plus health" "https://pages-plus-api.apiservices.workers.dev/health"

If a Worker is not in the smoke test, it does not exist to your operations process. It can run unmonitored for days (as aso-mrr did).
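The `check` helper used by `smoke-test.sh` is not shown here. For teams that prefer TypeScript, a comparable check might look like the sketch below. `checkHealth` and its injectable `fetchImpl` parameter are hypothetical. The only assumption about the endpoint is that a healthy Worker answers `/health` with a 2xx status.

```typescript
// Structural type so the check can run against global fetch or a test double.
type FetchLike = (url: string) => Promise<{ ok: boolean; status: number }>;

interface CheckResult {
  name: string;
  ok: boolean;
  detail: string;
}

// Hypothetical TypeScript counterpart of the shell `check` helper.
async function checkHealth(
  name: string,
  url: string,
  fetchImpl: FetchLike,
  timeoutMs = 5_000
): Promise<CheckResult> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`timed out after ${timeoutMs}ms`)), timeoutMs);
  });
  try {
    const res = await Promise.race([fetchImpl(url), timeout]);
    // Any 2xx counts as healthy; anything else fails the check
    return { name, ok: res.ok, detail: `HTTP ${res.status}` };
  } catch (err) {
    // Network errors and timeouts fail this check without aborting the run
    return { name, ok: false, detail: err instanceof Error ? err.message : String(err) };
  } finally {
    clearTimeout(timer);
  }
}
```

In a real run, the global `fetch` can be passed directly, for example `checkHealth('aso-mrr health', 'https://aso-mrr-api.apiservices.workers.dev/health', fetch)`.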

Rule 2: Every Worker with Cron Triggers MUST Have a Cost Ceiling

Either:

// Option A: Budget governor (code-level)
const budget = await checkBudget(env, 'my-worker');
if (!budget.allowed) return;

// Option B: CF notification rule (dashboard-level)
// Set threshold for D1 reads > 5B/month

Rule 3: Every Worker with Crons MUST Have Exit Conditions

If the work queue is empty, return early BEFORE running monitoring/snapshot queries. “Nothing to do” should cost nothing to confirm.

// Good: 3 row reads when idle
const hasWork = await checkForWork(db);
if (!hasWork) return;

// Bad: 3,400,000 row reads when idle
await computeAndStoreHealthSnapshot(db);
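`checkForWork` is not defined in this excerpt. One cheap way to write it is as a set of existence probes. `SELECT 1 ... LIMIT 1` stops at the first matching row instead of counting every row. The filtered columns must be indexed, though. Without an index, an empty queue (exactly the idle case) forces a full scan to prove that nothing matches. This is a minimal sketch. It assumes a hypothetical `keywords` table with a `status` column, plus the `app_registry.enriched_at` column used by the health snapshot. `D1Like` is a narrowed stand-in for the D1 binding so the function can be tested outside Workers.

```typescript
// Narrow structural type covering only the D1 method used here.
interface D1Like {
  prepare(sql: string): { first<T = unknown>(): Promise<T | null> };
}

// Hypothetical work-queue probes; each needs an index on its WHERE column.
const WORK_PROBES = [
  "SELECT 1 FROM keywords WHERE status = 'pending' LIMIT 1",
  'SELECT 1 FROM app_registry WHERE enriched_at IS NULL LIMIT 1',
];

async function checkForWork(db: D1Like): Promise<boolean> {
  for (const sql of WORK_PROBES) {
    // first() resolves to null when the query returns no rows
    const row = await db.prepare(sql).first();
    if (row !== null) return true; // stop at the first queue that has work
  }
  return false;
}
```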

Rule 4: Every Deployed Worker MUST Belong to a Team

Create a team label and assign issues. If it does not appear in planning, it does not exist.

gh issue create --title "aso-mrr: cost monitoring" --label "team-3-infra"

Rule 5: Every Worker MUST Be in the Train System

Deployed Workers without a train slot get reviewed at each planning session. Orphan Workers are candidates for shutdown.

Rule 6: No * * * * * Crons Without Justification

Every-minute crons must document why minute-level latency matters. Default to */5 * * * * or slower.

{
  "triggers": {
    "crons": [
      // Crawler: 5-min is fast enough — rankings don't change per-minute
      "*/5 * * * *",
      // Clustering: 6-hour intervals — clusters are stable
      "0 */6 * * *",
      // Snapshots: daily — nobody watches more than once a day
      "0 6 * * *"
    ]
  }
}
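The cost of a schedule is plain multiplication: rows read per run times runs per day. Below is a minimal sketch for every-N-minutes schedules. It uses the 3,400,000 idle reads per run quoted under Rule 3 and the 25 billion monthly D1 read allowance from the cost quick reference. The helper names are hypothetical.

```typescript
// Runs per day for a cron that fires every `intervalMinutes` minutes
// ("* * * * *" is 1, "*/5 * * * *" is 5). Assumes the interval divides 60.
function runsPerDay(intervalMinutes: number): number {
  return (24 * 60) / intervalMinutes;
}

function projectDailyReads(rowsPerRun: number, intervalMinutes: number): number {
  return rowsPerRun * runsPerDay(intervalMinutes);
}

// D1 row reads included per month on Workers Paid
const INCLUDED_READS_PER_MONTH = 25_000_000_000;

// Days until this schedule alone consumes the monthly read allowance
function daysToExhaustAllowance(rowsPerRun: number, intervalMinutes: number): number {
  return INCLUDED_READS_PER_MONTH / projectDailyReads(rowsPerRun, intervalMinutes);
}
```

At 3,400,000 reads per run, `projectDailyReads(3_400_000, 1)` is 4,896,000,000 reads per day, which uses up the monthly allowance in about 5.1 days. The same work every five minutes is 979,200,000 reads per day, or about 25.5 days of allowance.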

Rule 7: Output Validation Before Input Scaling

Before scaling data collection, verify the full pipeline end-to-end. An empty output table after 24 hours of collection is a P0 bug.

// Before enabling autonomous crawling:
// 1. Crawl 10 apps manually
// 2. Verify they appear in the enriched table
// 3. Verify the dashboard shows them
// 4. THEN enable the cron

Rule 8: COUNT(*) on Large Tables MUST Be Cached

Never run unbounded COUNT(*) queries in a recurring cron. Use materialized counters.

// Bad: 180,000 row reads every invocation
const total = await db.prepare('SELECT COUNT(*) FROM app_registry').first();

// Good: 1 row read every invocation
const total = await db.prepare(
  "SELECT row_count FROM table_stats WHERE table_name = 'app_registry'"
).first();
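The `table_stats` row still has to be refreshed somehow. One option is triggers that adjust the counter on every write. A simpler option, sketched below, pays for each full COUNT(*) once a day instead of once a minute. The sketch assumes a day-old count is accurate enough for dashboards. `refreshTableStats` and the narrowed interfaces are hypothetical. `prepare`, `bind`, and `batch` mirror the D1 binding's methods, and D1 runs a batch as a single transaction. Identifiers cannot be bound as parameters, so table names are interpolated into the SQL and must come from a fixed allow-list.

```typescript
// Narrow structural types covering only the D1 methods used here.
interface StatementLike {
  bind(...values: unknown[]): StatementLike;
}
interface D1BatchLike {
  prepare(sql: string): StatementLike;
  batch(statements: StatementLike[]): Promise<unknown[]>;
}

// Fixed allow-list: never build this list from user input.
const COUNTED_TABLES = ['app_registry'] as const;

async function refreshTableStats(db: D1BatchLike): Promise<number> {
  const statements = COUNTED_TABLES.map((table) =>
    db
      .prepare(
        // "WHERE 1" avoids SQLite's ON-clause ambiguity in INSERT ... SELECT upserts
        `INSERT INTO table_stats (table_name, row_count, updated_at)
         SELECT ?, COUNT(*), datetime('now') FROM ${table} WHERE 1
         ON CONFLICT(table_name) DO UPDATE SET
           row_count = excluded.row_count,
           updated_at = excluded.updated_at`
      )
      .bind(table)
  );
  // One round trip for all tables
  await db.batch(statements);
  return statements.length;
}
```

Run from a daily cron, this performs one full scan per table per day. Every other read of the count costs a single row.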

Rule 9: Health Snapshots MUST Be Proportional to Audience

If nobody is watching the dashboard, the snapshot interval should be hourly or daily, not per-minute. Match monitoring frequency to how often a human actually looks at the data.

Rule 10: New Deploys MUST Log to the Session Log

First deploy of a new Worker gets a session log entry noting what is running, what crons are active, and expected cost.

The Checklist

- [ ] Worker has at least one route in smoke test
- [ ] Worker with crons has a CF notification rule OR budget governor
- [ ] All cron handlers return early when work queue is empty
- [ ] Worker has a team label on at least one GitHub issue
- [ ] Worker's first deploy was logged to session log
- [ ] No `* * * * *` crons without documented justification
- [ ] COUNT(*) on tables > 10K rows is cached
- [ ] Health snapshot interval matches audience frequency
- [ ] Output pipeline validated end-to-end before autonomous input
- [ ] Cron interval documented with rationale in wrangler.jsonc comment

Anti-Patterns

| Don't | Do Instead | Why |
| --- | --- | --- |
| * * * * * by default | */5 * * * * or slower; justify if faster needed | Every-minute crons multiply cost by 1,440x per day vs daily |
| 21 COUNT(*) every minute on million-row tables | Stats table with triggers, or KV-cached counters | Each COUNT(*) is a full table scan; multiply by invocation frequency |
| Deploy and forget | Team label + train slot + smoke test | Orphan Workers run forever with nobody watching |
| Scale input before validating output | E2E test: input → output → user value, then scale | You might be burning money collecting data nobody uses |
| Monitor per-minute when nobody watches | Match snapshot frequency to actual dashboard usage | The monitoring should not cost more than the work |
| Track API costs but ignore platform costs | Instrument D1 reads, R2 ops, KV ops, DO duration | Platform costs accumulate silently: no per-query visibility by default |
| Assume cron will stop when work is done | Explicit exit condition before expensive queries | Crons run on schedule regardless of whether there is work |
| Use COUNT(*) for existence checks | SELECT 1 FROM table WHERE condition LIMIT 1 | Existence checks need 1 row read, not a full table scan |
| Query Analytics Engine without _sample_interval | Always multiply by _sample_interval | At high volume, data is downsampled; omitting this undercounts |
| Hard-code cost thresholds | Configure via environment variables or KV | Thresholds change as your system scales |
| Alert only on absolute cost | Also alert on rate-of-change (3x average = anomaly) | Absolute thresholds miss gradual degradation and sudden spikes on small workers |
| Build cost dashboards without time context | Always include toStartOfHour(timestamp) grouping | Point-in-time totals are useless without trend context |
| Log rows_read from D1 meta and ignore it | Write it to Analytics Engine for aggregation and alerting | Logged data in Workers Logs expires in 7-30 days and is not aggregatable |
| Run the health snapshot in the same cron as the work | Separate crons: frequent for work, infrequent for monitoring | Mixing monitoring with work means monitoring runs at work frequency |
| Rely on billing notifications as your only alert | Use billing notifications as last resort; budget governor as primary | Billing notifications are monthly aggregate, email-only, after-the-fact |


Appendix: Complete Wrangler Configuration

This is the full wrangler.jsonc for a Worker with cost observability:

{
  "name": "my-worker",
  "main": "src/index.ts",
  "compatibility_date": "2025-03-07",
  "compatibility_flags": ["nodejs_compat"],

  // Observability: structured logging + traces
  "observability": {
    "enabled": true,
    "head_sampling_rate": 1
  },

  // D1 database
  "d1_databases": [
    {
      "binding": "DB",
      "database_name": "my-db",
      "database_id": "your-database-id",
      "migrations_dir": "migrations"
    }
  ],

  // Analytics Engine — cost tracking (free, zero-setup)
  "analytics_engine_datasets": [
    {
      "binding": "METRICS",
      "dataset": "platform_metrics"
    }
  ],

  // R2 bucket (if applicable)
  "r2_buckets": [
    {
      "binding": "STORAGE",
      "bucket_name": "my-bucket"
    }
  ],

  // KV namespace (if applicable)
  "kv_namespaces": [
    {
      "binding": "KV",
      "id": "your-kv-namespace-id"
    }
  ],

  // Crons: work + monitoring on separate schedules
  "triggers": {
    "crons": [
      // Main work: every 5 minutes (not every minute — see Rule 6)
      "*/5 * * * *",
      // GFS rollup: every hour
      "0 * * * *",
      // Daily snapshot + daily rollup: once at 01:00 UTC
      "0 1 * * *"
    ]
  },

  // Environment variables
  "vars": {
    "ENVIRONMENT": "production",
    "DAILY_COST_LIMIT": "1.00",
    "DAILY_READS_LIMIT": "5000000000"
  }
}
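These `vars` are declared as JSON strings, so they reach the Worker as strings. `DAILY_COST_LIMIT` and `DAILY_READS_LIMIT` therefore need parsing before they can gate anything. This minimal sketch assumes that a missing, zero, negative, or malformed value should fall back to a default limit rather than silently disable the check. `parseBudgetLimits` and the fallback values are hypothetical; the fallbacks simply repeat the values configured above.

```typescript
interface LimitVars {
  DAILY_COST_LIMIT?: string;
  DAILY_READS_LIMIT?: string;
}

interface BudgetLimits {
  dailyCostUsd: number;
  dailyRowsRead: number;
}

// Hypothetical fallbacks, mirroring the configured vars
const DEFAULT_LIMITS: BudgetLimits = { dailyCostUsd: 1, dailyRowsRead: 5_000_000_000 };

// Accept only finite, strictly positive numbers; anything else uses the fallback
function parsePositive(raw: string | undefined, fallback: number): number {
  const value = raw === undefined ? NaN : Number(raw);
  return Number.isFinite(value) && value > 0 ? value : fallback;
}

function parseBudgetLimits(vars: LimitVars): BudgetLimits {
  return {
    dailyCostUsd: parsePositive(vars.DAILY_COST_LIMIT, DEFAULT_LIMITS.dailyCostUsd),
    dailyRowsRead: parsePositive(vars.DAILY_READS_LIMIT, DEFAULT_LIMITS.dailyRowsRead),
  };
}
```

A typo such as `"1,00"` then yields the default limit instead of `NaN`. Every comparison against `NaN` is false, so a `NaN` limit would silently let everything through.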

Appendix: Migration Files

Create GFS Tables

-- migrations/0001_create_metrics_tables.sql

-- Son: 15-minute granularity, 48-hour retention
CREATE TABLE IF NOT EXISTS metrics_15min (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  bucket TEXT NOT NULL,
  worker TEXT NOT NULL,
  resource_type TEXT NOT NULL,
  resource_name TEXT NOT NULL,
  rows_read INTEGER DEFAULT 0,
  rows_written INTEGER DEFAULT 0,
  duration_ms REAL DEFAULT 0,
  cost_usd REAL DEFAULT 0,
  operations INTEGER DEFAULT 0,
  created_at TEXT DEFAULT (datetime('now')),
  UNIQUE(bucket, worker, resource_type, resource_name)
);

-- Father: hourly granularity, 30-day retention
CREATE TABLE IF NOT EXISTS metrics_hourly (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  bucket TEXT NOT NULL,
  worker TEXT NOT NULL,
  resource_type TEXT NOT NULL,
  resource_name TEXT NOT NULL,
  rows_read INTEGER DEFAULT 0,
  rows_written INTEGER DEFAULT 0,
  duration_ms REAL DEFAULT 0,
  cost_usd REAL DEFAULT 0,
  operations INTEGER DEFAULT 0,
  created_at TEXT DEFAULT (datetime('now')),
  UNIQUE(bucket, worker, resource_type, resource_name)
);

-- Grandfather: daily granularity, forever
CREATE TABLE IF NOT EXISTS metrics_daily (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  date TEXT NOT NULL,
  worker TEXT NOT NULL,
  resource_type TEXT NOT NULL,
  resource_name TEXT NOT NULL,
  rows_read INTEGER DEFAULT 0,
  rows_written INTEGER DEFAULT 0,
  duration_ms REAL DEFAULT 0,
  cost_usd REAL DEFAULT 0,
  operations INTEGER DEFAULT 0,
  created_at TEXT DEFAULT (datetime('now')),
  UNIQUE(date, worker, resource_type, resource_name)
);

-- Indexes
-- The UNIQUE constraints above already create indexes whose leading column is
-- bucket (or date), so separate single-column indexes on those columns would be
-- redundant and would add index writes to every insert.
CREATE INDEX IF NOT EXISTS idx_daily_worker ON metrics_daily(worker);
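The `UNIQUE(bucket, worker, resource_type, resource_name)` constraints are what make rollups idempotent. Re-running a rollup for the same hour overwrites its rows instead of duplicating them. Below is a minimal sketch of the son-to-father step. It assumes 15-minute bucket keys look like `YYYY-MM-DD HH:MM` and hourly keys like `YYYY-MM-DD HH:00`. `rollupHour` and the narrowed `D1RunLike` interface are hypothetical. The SQL relies on D1's support for numbered `?NNN` parameters.

```typescript
// Narrow structural types covering only the D1 methods used here.
interface RunnableLike {
  bind(...values: unknown[]): RunnableLike;
  run(): Promise<unknown>;
}
interface D1RunLike {
  prepare(sql: string): RunnableLike;
}

// ?1 is the hour prefix, e.g. '2026-04-28 13'. BETWEEN on the raw bucket
// string lets SQLite use the UNIQUE index, whose leading column is bucket.
const ROLLUP_HOUR_SQL = `
INSERT INTO metrics_hourly
  (bucket, worker, resource_type, resource_name,
   rows_read, rows_written, duration_ms, cost_usd, operations)
SELECT ?1 || ':00', worker, resource_type, resource_name,
       SUM(rows_read), SUM(rows_written), SUM(duration_ms),
       SUM(cost_usd), SUM(operations)
FROM metrics_15min
WHERE bucket BETWEEN ?1 || ':00' AND ?1 || ':59'
GROUP BY worker, resource_type, resource_name
ON CONFLICT(bucket, worker, resource_type, resource_name) DO UPDATE SET
  rows_read = excluded.rows_read,
  rows_written = excluded.rows_written,
  duration_ms = excluded.duration_ms,
  cost_usd = excluded.cost_usd,
  operations = excluded.operations`;

// Re-running for the same hour overwrites the hourly rows instead of duplicating them
async function rollupHour(db: D1RunLike, hourPrefix: string): Promise<void> {
  await db.prepare(ROLLUP_HOUR_SQL).bind(hourPrefix).run();
}
```

The hourly cron would run this for the hour that just ended and then prune `metrics_15min` rows older than 48 hours.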

Create Stats Table

-- migrations/0002_create_table_stats.sql

CREATE TABLE IF NOT EXISTS table_stats (
  table_name TEXT PRIMARY KEY,
  row_count INTEGER DEFAULT 0,
  pending_count INTEGER DEFAULT 0,
  updated_at TEXT DEFAULT (datetime('now'))
);

Appendix: Cost Quick Reference

Cloudflare Platform Costs (Workers Paid, $5/mo)

| Resource | Included free | Overage rate | Unit |
| --- | --- | --- | --- |
| D1 row reads | 25B/month | $0.001/M | per million rows scanned |
| D1 row writes | 50M/month | $1.00/M | per million rows modified |
| D1 storage | 5 GB | $0.75/GB/month | database size on disk |
| R2 Class A (PUT, POST, LIST) | 1M/month | $4.50/M | per million operations |
| R2 Class B (GET, HEAD) | 10M/month | $0.36/M | per million operations |
| R2 storage | 10 GB | $0.015/GB/month | stored bytes |
| KV reads | 10M/month | $0.50/M | per million reads |
| KV writes | 1M/month | $5.00/M | per million writes |
| Workers requests | 10M/month | $0.30/M | per million invocations |
| Workers CPU time | 30M ms/month | $0.02/M ms | CPU milliseconds (not wall time) |
| Queues standard | 1M/month | $0.40/M | per million operations (a message is typically 3: write, read, delete) |
| DO requests | 1M/month | $0.15/M | per million requests to DO |
| DO duration | 400K GB-s/month | $12.50/M GB-s | gigabyte-seconds of active wall time |
| Analytics Engine writes | 100K/day (free), 10M/month (paid) | $0.25/M | per million data points |
| Analytics Engine reads | 10K/day (free), 1M/month (paid) | $1.00/M | per million SQL queries |

Cost Formulas

// D1 cost estimator
function estimateD1Cost(rowsRead: number, rowsWritten: number): number {
  const readCost = Math.max(0, rowsRead - 25_000_000_000) / 1_000_000 * 0.001;
  const writeCost = Math.max(0, rowsWritten - 50_000_000) / 1_000_000 * 1.0;
  return readCost + writeCost;
}

// R2 cost estimator
function estimateR2Cost(
  classAOps: number,
  classBOps: number,
  storageGB: number
): number {
  const aCost = Math.max(0, classAOps - 1_000_000) / 1_000_000 * 4.50;  // 1M Class A ops/month included
  const bCost = Math.max(0, classBOps - 10_000_000) / 1_000_000 * 0.36; // 10M Class B ops/month included
  const storageCost = Math.max(0, storageGB - 10) * 0.015;
  return aCost + bCost + storageCost;
}

// Workers invocation cost estimator
function estimateWorkersCost(
  requests: number,
  cpuTimeMs: number
): number {
  const reqCost = Math.max(0, requests - 10_000_000) / 1_000_000 * 0.30;
  const cpuCost = Math.max(0, cpuTimeMs - 30_000_000) / 1_000_000 * 0.02;
  return reqCost + cpuCost;
}

Summary

The aso-mrr incident was a $19.63 lesson in cost observability. The root cause was not technical complexity — it was invisibility. Platform costs (D1 row reads, R2 operations, Workers invocations) accumulate silently because Cloudflare does not surface per-query or per-invocation cost data in its dashboard. You have to build the visibility yourself.

The solution has four layers:

  1. Self-instrumented D1 wrapper — captures rows_read from every query’s metadata and writes it to Analytics Engine. Zero-latency, fire-and-forget.

  2. Analytics Engine for real-time — free metrics store with 92-day retention, SQL query API, and Grafana integration. Covers “is something burning right now?”

  3. D1 with GFS retention for permanent history — daily rollups from Analytics Engine to D1 grandfather table. Covers “what did we spend last quarter?”

  4. Deploy-readiness standard — 10 rules that prevent the incident from recurring. Exit conditions, cost ceilings, smoke tests, team ownership.

The total cost of this observability system is zero. Analytics Engine is free. The GFS tables add negligible storage to D1. The instrumentation wrapper adds zero latency. The deploy-readiness standard is documentation.

The $19.63 bill was the cheapest possible tuition for learning that monitoring your monitoring is not optional.

