Gary Wu
Event-Driven Architecture on Cloudflare Queues

Org Status: 🟡 Dormant Cloudflare: N/A Last Audited: 2026-04-28


Cloudflare Queues is the simplest way to connect Workers that don’t need to talk to each other in real time. You get at-least-once delivery, automatic retries, dead letter queues, and consumer auto-scaling — all configured in a single wrangler.jsonc file, with zero infrastructure to manage. This article shows how to design queue topologies for multi-service architectures, implement idempotent consumers, handle fan-out patterns, and avoid the mistakes that turn a clean event-driven system into an unreliable mess.

What you’ll learn:


The Problem

Most Workers architectures start synchronous. Worker A calls Worker B with fetch(). Worker B calls Worker C. Everything works until it doesn’t:

// The synchronous anti-pattern
export default {
  async fetch(request: Request, env: Env) {
    // Step 1: Research the keyword
    const research = await fetch("https://gatherfeed.example.com/api/v1/research", {
      method: "POST",
      body: JSON.stringify({ keyword: "best budgeting apps" }),
    });

    // Step 2: Generate content from research
    const content = await fetch("https://content-engine.example.com/generate", {
      method: "POST",
      body: JSON.stringify({ research: await research.json() }),
    });

    // Step 3: Publish the content
    const published = await fetch("https://pages-plus.example.com/publish", {
      method: "POST",
      body: JSON.stringify({ content: await content.json() }),
    });

    // If any step fails, everything fails.
    // If step 3 takes 45 seconds, the whole chain blocks.
    // If you retry, you re-run steps 1 and 2 unnecessarily.
    return new Response("Done");
  },
};

The problems compound:

  1. Cascading failures. If GatherFeed is slow, every upstream caller blocks. One service’s problem becomes everyone’s problem.
  2. Wasted compute. When you retry a failed chain, you re-execute work that already succeeded. Research costs money (API calls to Brave, Perplexity, etc.). Regenerating it because the publish step failed is waste.
  3. Tight coupling. Worker A must know Worker B’s URL, its API contract, and its error semantics. Change B’s interface and A breaks.
  4. No backpressure. If you send 100 keywords for research simultaneously, GatherFeed gets 100 concurrent requests. It can’t say “slow down” — it just times out or drops requests.
  5. No visibility. When a message disappears into a fetch() chain, you have no idea where it is. Did research complete? Is generation running? Did publish fail? You need distributed tracing just to answer basic questions.

Event-driven architecture solves all of these by replacing synchronous call chains with asynchronous message passing. Each service owns an inbound queue. Each service processes messages at its own pace. Each service reports what happened by emitting events. No service waits for another.

The result: a system where GatherFeed being slow for 30 seconds doesn’t cascade to the content generator, the publisher, or the social poster. They each process their own queue independently, at their own pace, with their own retry logic.


Core Concepts

The Queue Primitive

A Cloudflare Queue is a durable message buffer between a producer (the Worker that writes messages) and a consumer (the Worker that reads them). Messages are stored durably — once a write succeeds, the message won’t be lost. Consumers receive messages in batches, process them, and acknowledge or retry each one individually.

// The Queue<Body> interface — what your producer binding gives you
interface Queue<Body = unknown> {
  send(body: Body, options?: QueueSendOptions): Promise<void>;
  sendBatch(
    messages: Iterable<MessageSendRequest<Body>>,
    options?: QueueSendBatchOptions
  ): Promise<void>;
}

interface QueueSendOptions {
  contentType?: QueuesContentType; // "json" | "text" | "bytes" | "v8"
  delaySeconds?: number;           // delay delivery up to 12 hours (43,200 seconds)
}

interface MessageSendRequest<Body = unknown> {
  body: Body;
  contentType?: QueuesContentType;
  delaySeconds?: number;
}

The consumer side is a queue() handler exported from your Worker, alongside the usual fetch() handler:

// The consumer receives a MessageBatch
interface MessageBatch<Body = unknown> {
  readonly queue: string;
  readonly messages: readonly Message<Body>[];
  ackAll(): void;
  retryAll(options?: QueueRetryOptions): void;
}

interface Message<Body = unknown> {
  readonly id: string;        // unique message ID assigned by the queue
  readonly timestamp: Date;   // when the message was written
  readonly body: Body;        // your payload
  readonly attempts: number;  // how many times delivery has been attempted
  ack(): void;
  retry(options?: QueueRetryOptions): void;
}

interface QueueRetryOptions {
  delaySeconds?: number; // delay before next attempt, up to 12 hours (43,200 seconds)
}

Key insight: A queue decouples when work happens from who requests it. The producer says “do this eventually.” The consumer decides when and how to do it. This is the fundamental shift from synchronous RPC to asynchronous messaging.

At-Least-Once Delivery

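Cloudflare Queues guarantees at-least-once delivery. This means every message that is successfully written will be delivered to a consumer, but the same message may occasionally be delivered more than once.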

This has a critical implication: every consumer must be idempotent. Processing the same message twice must produce the same result as processing it once. This is not optional — it’s a fundamental requirement of at-least-once systems.

// What at-least-once means in practice:
// This message WILL be delivered. It MIGHT be delivered twice.
await env.RESEARCH_QUEUE.send({
  event_id: crypto.randomUUID(),
  type: "research.requested",
  source: "scalable-media",
  timestamp: new Date().toISOString(),
  payload: { brand_slug: "llc-tax", keyword: "llc tax deductions" },
});
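To make the duplicate-delivery risk concrete, here is a minimal in-memory sketch. It uses no Cloudflare APIs: `applyNaive` and `applyIdempotent` are hypothetical names, and the `Set` stands in for a durable store such as a database table keyed by event_id.

```typescript
// In-memory illustration only: a redelivered message must not be counted twice.
interface CounterMessage {
  event_id: string;
  amount: number;
}

// Naive consumer: every delivery changes state, so duplicates double-count.
function applyNaive(total: number, deliveries: CounterMessage[]): number {
  return deliveries.reduce((sum, m) => sum + m.amount, total);
}

// Idempotent consumer: remembers which event_ids it has already applied.
function applyIdempotent(
  total: number,
  deliveries: CounterMessage[],
  seen: Set<string> = new Set()
): number {
  for (const m of deliveries) {
    if (seen.has(m.event_id)) continue; // duplicate delivery: skip it
    seen.add(m.event_id);
    total += m.amount;
  }
  return total;
}

// The same message delivered twice, as at-least-once delivery permits.
const duplicated: CounterMessage = { event_id: "evt-1", amount: 5 };
const naiveTotal = applyNaive(0, [duplicated, duplicated]);
const idempotentTotal = applyIdempotent(0, [duplicated, duplicated]);
```

The naive consumer ends up with twice the intended total, while the idempotent one applies the message once no matter how many times it arrives.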

Commands vs. Events

There are two kinds of messages in an event-driven system. Conflating them leads to architectural confusion.

Commands are directed — they tell a specific service to do something:

// Command: "GatherFeed, research this keyword"
interface ResearchRequestedPayload {
  brand_slug: string;
  keyword: string;
  priority: "high" | "normal" | "low";
}

// Sent to: gatherfeed-commands queue
// Consumer: GatherFeed (and only GatherFeed)

Events are broadcast — they announce that something happened, and any interested service can react:

// Event: "Research for this keyword is complete"
interface ResearchCompletedPayload {
  brand_slug: string;
  research_id: string;
  keyword: string;
}

// Sent to: brand-events queue
// Consumers: anyone who cares (SM routes it to BrandAgent)

The naming convention makes this explicit:

| Message Type | Direction | Naming Pattern | Example |
| --- | --- | --- | --- |
| Command | One-to-one | <noun>.<verb-past> imperative | research.requested, content.publish |
| Event | One-to-many | <noun>.<verb-past> descriptive | research.completed, content.published |

Key insight: Commands go to service-specific queues (gatherfeed-commands). Events go to a shared events queue (brand-events). This separation means you can add new event subscribers without touching the producer.
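One way to keep the command/event separation honest is to encode it in a small lookup. The sketch below is illustrative: the queue names and message types come from this article, but `queueFor` and `isCommand` are hypothetical helpers, and only a subset of message types is listed.

```typescript
// Message types taken from the article; the helper functions are hypothetical.
type CommandType = "research.requested" | "content.publish";
type EventType = "research.completed" | "content.generated" | "content.published";
type MessageType = CommandType | EventType;

// Each command type is owned by exactly one service-specific queue.
const COMMAND_QUEUES: Record<CommandType, string> = {
  "research.requested": "gatherfeed-commands",
  "content.publish": "publish-commands",
};

// All events share one queue, so new subscribers never touch producers.
const SHARED_EVENTS_QUEUE = "brand-events";

function isCommand(type: MessageType): type is CommandType {
  return Object.prototype.hasOwnProperty.call(COMMAND_QUEUES, type);
}

// Commands go to the owning service's queue; events go to the shared queue.
function queueFor(type: MessageType): string {
  return isCommand(type) ? COMMAND_QUEUES[type] : SHARED_EVENTS_QUEUE;
}
```

Because `COMMAND_QUEUES` is typed as `Record<CommandType, string>`, adding a new command type without assigning it a queue becomes a compile-time error.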

The Message Envelope

Every message through every queue uses the same envelope. This makes routing, logging, and debugging consistent across the entire system.

interface DomainMessage<T = unknown> {
  event_id: string;        // UUID v4 — deduplication key
  type: string;            // dot-notation: "research.requested"
  source: string;          // emitting service: "scalable-media"
  timestamp: string;       // ISO 8601: "2026-03-11T10:00:00Z"
  correlation_id?: string; // traces a chain of related messages
  payload: T;              // reference data — IDs, not full objects
}

The event_id is the most important field. It’s the deduplication key. When a consumer sees a message, it checks: “Have I already processed this event_id?” If yes, acknowledge and move on. If no, process it and record the event_id.

The correlation_id ties related messages together across services. When SM requests research for a brand cycle, every downstream message — the research command, the research completion event, the content generation, the publish — shares the same correlation_id. This is your distributed tracing without a tracing system.

// Helper function to create consistently-shaped messages
function createMessage<T>(
  type: string,
  source: string,
  payload: T,
  correlationId?: string
): DomainMessage<T> {
  return {
    event_id: crypto.randomUUID(),
    type,
    source,
    timestamp: new Date().toISOString(),
    correlation_id: correlationId,
    payload,
  };
}

Key insight: Keep payloads small — references (IDs), not full objects. The 128 KB message size limit exists, but you should aim for under 10 KB. Put the actual data in D1 or R2 and pass the ID in the message. This also means consumers can always fetch the latest state rather than working with a stale snapshot.
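A quick size check before sending can catch payloads that drift from references toward full objects. This is a rough sketch: `checkMessageSize` is a hypothetical helper, the 128 KB limit is treated conservatively as 128,000 bytes, and the measured size is only the JSON body, so the size the queue itself counts may differ slightly.

```typescript
// Assumed budgets: the 128 KB limit (taken conservatively as 128,000 bytes)
// and the 10 KB soft target suggested above.
const HARD_LIMIT_BYTES = 128_000;
const SOFT_TARGET_BYTES = 10_000;

type SizeVerdict = "ok" | "over-target" | "over-limit";

// Measures the UTF-8 size of the JSON-serialized message.
function checkMessageSize(message: unknown): { bytes: number; verdict: SizeVerdict } {
  const bytes = new TextEncoder().encode(JSON.stringify(message)).length;
  const verdict: SizeVerdict =
    bytes > HARD_LIMIT_BYTES ? "over-limit" : bytes > SOFT_TARGET_BYTES ? "over-target" : "ok";
  return { bytes, verdict };
}

// Reference-style payload: IDs only, the article body stays in D1 or R2.
const byReference = checkMessageSize({
  type: "content.generated",
  payload: { brand_slug: "llc-tax", content_id: "c_123" },
});

// Embedded payload: the full article text travels inside the message.
const embedded = checkMessageSize({
  type: "content.generated",
  payload: { brand_slug: "llc-tax", body: "x".repeat(20_000) },
});
```

Here `byReference` stays well under the soft target, while `embedded` is flagged as over-target even though it would still fit under the hard limit.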


Patterns

Pattern 1: The Idempotent Consumer

This is the foundational pattern. Every other pattern builds on it. Without idempotency, at-least-once delivery becomes at-least-once-and-maybe-duplicated execution.

When to use it: Every queue consumer. No exceptions.

import type { Env } from "./types";
import type { DomainMessage } from "./messages";

/**
 * Queue consumer with idempotency check.
 *
 * Flow:
 * 1. For each message in the batch, check if event_id exists in processed_events
 * 2. If already processed → ack immediately (skip)
 * 3. If new → process the message
 * 4. Record event_id in processed_events
 * 5. Ack the message
 * 6. On failure → retry with exponential backoff
 */
export async function handleQueue(
  batch: MessageBatch<DomainMessage>,
  env: Env
): Promise<void> {
  for (const msg of batch.messages) {
    const { event_id, type } = msg.body;

    // Step 1: Idempotency check — have we seen this message before?
    const existing = await env.DB.prepare(
      "SELECT event_id FROM processed_events WHERE event_id = ?"
    )
      .bind(event_id)
      .first();

    if (existing) {
      // Already processed — acknowledge and skip
      msg.ack();
      continue;
    }

    try {
      // Step 2: Route and process the message
      await routeMessage(msg.body, env);

      // Step 3: Record as processed (INSERT OR IGNORE for extra safety)
      await env.DB.prepare(
        `INSERT OR IGNORE INTO processed_events (event_id, type, source, processed_at)
         VALUES (?, ?, ?, datetime('now'))`
      )
        .bind(event_id, type, msg.body.source)
        .run();

      // Step 4: Acknowledge — message will not be redelivered
      msg.ack();
    } catch (err) {
      console.error(`Queue error processing ${type} (${event_id}):`, err);
      // Step 5: Retry with delay — exponential backoff based on attempts
      const delay = Math.min(30 * Math.pow(2, msg.attempts - 1), 600);
      msg.retry({ delaySeconds: delay });
    }
  }
}

async function routeMessage(
  message: DomainMessage,
  env: Env
): Promise<void> {
  switch (message.type) {
    case "research.completed":
      await handleResearchCompleted(message, env);
      break;
    case "content.generated":
      await handleContentGenerated(message, env);
      break;
    case "content.published":
      await handleContentPublished(message, env);
      break;
    default:
      console.warn(`Unknown message type: ${message.type}`);
      // Don't retry unknown types — they'll never succeed
  }
}

The processed_events table is simple:

CREATE TABLE IF NOT EXISTS processed_events (
  event_id TEXT PRIMARY KEY,
  type TEXT NOT NULL,
  source TEXT,
  processed_at TEXT NOT NULL
);

-- Prune events older than 7 days (run periodically)
DELETE FROM processed_events
WHERE processed_at < datetime('now', '-7 days');

Gotchas:

Pattern 2: Producer-Side Fan-Out

Fan-out is when one action triggers multiple downstream messages. The BrandAgent pattern demonstrates this: when a user requests research for 10 keywords, the agent sends 10 individual messages to the research queue — one per keyword.

When to use it: When a single request maps to N independent units of work that should be processed, retried, and acknowledged independently.

import { Agent } from "agents";
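import { createMessage, MessageTypes } from "./messages";
import type {
  DomainMessage,
  ResearchCompletedPayload,
  ContentGeneratedPayload,
} from "./messages";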
import type { Env } from "./types";

interface BrandState {
  brand_slug: string;
  brand_id: string;
  pending_research: number;
  total_content_generated: number;
  total_content_published: number;
}

export class BrandAgent extends Agent<Env, BrandState> {
  /**
   * Fan-out pattern: one request becomes N independent queue messages.
   *
   * Why fan-out at the producer instead of sending one batch message?
   * 1. Each keyword retries independently — one failure doesn't block others
   * 2. Consumer processes one keyword at a time — no batch timeout risk
   * 3. Queue concurrency auto-scales — 10 messages can process in parallel
   * 4. Each keyword has its own event_id — individual idempotency tracking
   */
  async requestResearch(
    keywords: string[],
    priority: "high" | "normal" | "low" = "normal"
  ) {
    const correlationId = `brand-cycle-${this.state.brand_slug}-${new Date().toISOString().slice(0, 10)}`;

    // Fan-out: one message per keyword
    for (const keyword of keywords) {
      const message = createMessage(
        MessageTypes.RESEARCH_REQUESTED,
        "scalable-media",
        {
          brand_slug: this.state.brand_slug,
          keyword,
          priority,
        },
        correlationId
      );

      await this.env.RESEARCH_QUEUE.send(message);
    }

    // Update agent state — track how many we sent
    this.setState({
      ...this.state,
      pending_research: this.state.pending_research + keywords.length,
    });
  }

  /**
   * React to research completion — trigger content generation.
   * This is the other side of fan-out: each completed keyword
   * triggers its own generation workflow.
   */
  async onResearchCompleted(event: DomainMessage<ResearchCompletedPayload>) {
    const { brand_slug, research_id, keyword } = event.payload;

    // Start a content generation workflow for this keyword
    const instance = await this.env.CONTENT_WORKFLOW.create({
      params: {
        brand_slug,
        keyword,
        research_id,
        template: "blog",
      },
    });

    this.setState({
      ...this.state,
      pending_research: Math.max(0, this.state.pending_research - 1),
    });
  }

  /**
   * After content is generated, fan-out to both publish and social queues.
   * One generated article triggers two independent downstream actions.
   */
  async onContentGenerated(event: DomainMessage<ContentGeneratedPayload>) {
    const { brand_slug, content_id, keyword } = event.payload;

    // Fan-out to publish queue
    const publishMessage = createMessage(
      MessageTypes.CONTENT_PUBLISH,
      "scalable-media",
      {
        brand_slug,
        content_id,
        target: "pages-plus",
      },
      event.correlation_id
    );
    await this.env.PUBLISH_QUEUE.send(publishMessage);

    // Fan-out to social queue
    const socialMessage = createMessage(
      MessageTypes.SOCIAL_CREATE,
      "scalable-media",
      {
        brand_slug,
        content_id,
        article_context: {
          title: `Article about ${keyword}`,
          keyword,
          meta_description: "",
          key_points: [],
        },
      },
      event.correlation_id
    );
    await this.env.SOCIAL_QUEUE.send(socialMessage);

    this.setState({
      ...this.state,
      total_content_generated: this.state.total_content_generated + 1,
    });
  }
}
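The loop in `requestResearch` performs one write per keyword. If write latency matters, the same fan-out could go through `sendBatch` in chunks. The sketch below is one possible shape, not the article's implementation: `BatchQueue` is a minimal structural stand-in for the producer binding, `chunk` and `sendAll` are hypothetical helpers, and the cap of 100 messages per call is an assumption to verify against the current Queues limits.

```typescript
// Minimal structural type for the producer binding; only sendBatch is needed.
interface BatchQueue<Body> {
  sendBatch(messages: Iterable<{ body: Body }>): Promise<void>;
}

// Assumed per-call cap on messages; check the current Cloudflare Queues limits.
const MAX_BATCH_MESSAGES = 100;

// Splits a list into consecutive chunks of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError("size must be a positive integer");
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Fan-out with one sendBatch call per chunk instead of one send per message.
async function sendAll<Body>(queue: BatchQueue<Body>, bodies: readonly Body[]): Promise<void> {
  for (const group of chunk(bodies, MAX_BATCH_MESSAGES)) {
    await queue.sendBatch(group.map((body) => ({ body })));
  }
}
```

Batching only changes how messages are written. Each message still carries its own event_id and is retried and acknowledged independently by the consumer.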

Why fan-out at the producer, not the consumer?

You have two choices for where to split work:

  1. Producer-side fan-out (send N messages): The producer creates one message per unit of work. Each message is independent — its own event_id, its own retry lifecycle, its own acknowledgment.

  2. Consumer-side fan-out (send 1 message with N items): The producer sends one message with an array of keywords. The consumer loops over them. If any fails, the whole message retries — including the ones that already succeeded.

Producer-side fan-out is almost always better:

| Concern | Producer-side (N messages) | Consumer-side (1 message) |
| --- | --- | --- |
| Independent retries | Each keyword retries independently | All keywords retry if any fails |
| Parallel processing | Queue auto-scales across messages | One consumer processes all sequentially |
| Idempotency | Each has unique event_id | One event_id for all; partial success is ambiguous |
| Batch timeout risk | None; each message is small | If processing N keywords exceeds timeout, batch fails |
| Message size | Each message is small (~1 KB) | Message grows with N (risk hitting 128 KB limit) |

Key insight: Fan-out at the producer means each unit of work gets its own lifecycle. This is the “control plane / data plane” split: the BrandAgent (control plane) decides what work to do and fans it out. GatherFeed (data plane) processes one keyword at a time without knowing about the batch.

Pattern 3: The Autonomous Consumer

In a traditional request-response model, the consumer sends a response back to the caller. In an event-driven model, the consumer has no idea who called it. Instead, it processes the message, stores the result in its own database, and emits an event saying “this happened.”

When to use it: Every consumer that produces results other services care about.

import type { Env, DomainMessage } from "../types";
import {
  ResearchRequestedSchema,
  ResearchCompletedSchema,
  type ResearchCompletedPayload,
} from "../types";
import { deepResearch } from "../lib/deep-research";
import { newId } from "../lib/id";

/**
 * GatherFeed's queue consumer — the autonomous consumer pattern.
 *
 * GatherFeed doesn't know who sent the message. It doesn't care.
 * It processes the keyword, stores the result in its own D1, and
 * emits a research.completed event. Anyone can subscribe to that event.
 *
 * This is the "Eyes" service — it sees what's out there and reports back.
 * It never asks "who wants to know?" It just publishes what it found.
 */
export async function handleResearchCommands(
  batch: MessageBatch<DomainMessage>,
  env: Env
): Promise<void> {
  for (const msg of batch.messages) {
    const { event_id, type } = msg.body;

    // Idempotency check
    const existing = await env.DB.prepare(
      "SELECT event_id FROM processed_events WHERE event_id = ?"
    )
      .bind(event_id)
      .first();

    if (existing) {
      msg.ack();
      continue;
    }

    try {
      if (type !== "research.requested") {
        console.warn(`GatherFeed: unknown message type: ${type}`);
        msg.ack(); // Don't retry messages we can't handle
        continue;
      }

      // Validate the payload with Zod
      const parsed = ResearchRequestedSchema.safeParse(msg.body.payload);
      if (!parsed.success) {
        console.error("GatherFeed: invalid payload", parsed.error);
        msg.ack(); // Malformed messages will never succeed — don't retry
        continue;
      }

      const { brand_slug, keyword } = parsed.data;

      // Check if fresh research already exists (cost savings)
      const cached = await env.DB.prepare(
        `SELECT id FROM research
         WHERE topic = ? AND project = ?
         AND created_at > datetime('now', '-7 days')
         LIMIT 1`
      )
        .bind(keyword, brand_slug)
        .first<{ id: string }>();

      let researchId: string;

      if (cached) {
        // Cache hit — no API cost
        researchId = cached.id;
        console.log(`GatherFeed: cached "${keyword}" (no API cost)`);
      } else {
        // Cache miss — call Brave Search + Workers AI
        const researchData = await deepResearch(keyword, env);
        researchId = newId();
        await env.DB.prepare(
          `INSERT INTO research (id, topic, title, content, source, tags, project, created_at, updated_at)
           VALUES (?, ?, ?, ?, 'brave+workersai', ?, ?, datetime('now'), datetime('now'))`
        )
          .bind(
            researchId,
            keyword,
            `Research: ${keyword}`,
            JSON.stringify(researchData),
            JSON.stringify(["auto-research", "v2-pipeline", brand_slug]),
            brand_slug
          )
          .run();
      }

      // Emit completion event — anyone can subscribe
      const completedPayload: ResearchCompletedPayload = {
        brand_slug,
        research_id: researchId,
        keyword,
      };

      // Validate outbound event shape (catch bugs before they propagate)
      ResearchCompletedSchema.parse(completedPayload);

      const completedEvent: DomainMessage<ResearchCompletedPayload> = {
        event_id: crypto.randomUUID(),
        type: "research.completed",
        source: "gatherfeed",
        timestamp: new Date().toISOString(),
        correlation_id: msg.body.correlation_id,
        payload: completedPayload,
      };

      await env.EVENTS_QUEUE.send(completedEvent);

      // Record original message as processed
      await env.DB.prepare(
        `INSERT OR IGNORE INTO processed_events (event_id, type, source, processed_at)
         VALUES (?, ?, ?, datetime('now'))`
      )
        .bind(event_id, type, msg.body.source)
        .run();

      msg.ack();
    } catch (err) {
      console.error(`GatherFeed: error processing ${type} (${event_id}):`, err);
      msg.retry({ delaySeconds: 30 });
    }
  }
}

Connection to Pattern 1: The autonomous consumer always includes the idempotency check. The two patterns are layered — idempotency is the base, autonomy is the behavior.

Connection to Pattern 2: The autonomous consumer emits events that trigger the next fan-out. GatherFeed emits research.completed, which the BrandAgent receives and fans out into content generation commands. The chain is: fan-out → autonomous processing → event emission → next fan-out.
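One subtlety in the consumer above: if the completion event is sent but the consumer fails before recording the inbound message as processed, the retry emits a second research.completed with a fresh random event_id, which downstream deduplication cannot recognize. A possible mitigation, offered here as an assumption rather than the article's approach, is to derive the outbound event_id from the inbound one. `derivedEventId` is a hypothetical helper, and the resulting ID is deliberately not a UUID v4.

```typescript
// Hypothetical helper: derive the outbound event_id from the inbound one so a
// retried consumer re-emits the SAME id and downstream deduplication still works.
function derivedEventId(inboundEventId: string, outboundType: string): string {
  return `${inboundEventId}:${outboundType}`;
}

// Two processing attempts for the same inbound message yield the same id.
const firstAttemptId = derivedEventId("3f2a", "research.completed");
const retryAttemptId = derivedEventId("3f2a", "research.completed");

// A different outbound type derived from the same message gets a different id.
const otherTypeId = derivedEventId("3f2a", "research.failed");
```

The trade-off is that event_id is no longer opaque, so any code that validates it as a UUID would need to be relaxed.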

Gotchas:

Pattern 4: Queue Topology Design

A queue topology is the map of all queues, producers, and consumers in your system. Getting this right is an architectural decision that’s hard to change later.

When to use it: Before you write any code. Design the topology first.

The fundamental rule: each service owns its inbound command queue. No service writes to another service’s internal queue. Commands go from producer to the consumer’s queue. Events go to the shared events queue.

┌──────────────────────────────────────────────────────────────────────┐
│                        Queue Topology                                │
│                                                                      │
│  ┌─────────────┐     gatherfeed-commands     ┌─────────────┐        │
│  │  Scalable    │ ─────────────────────────▶ │  GatherFeed  │        │
│  │  Media (SM)  │                            │  (Eyes)      │        │
│  │  [Brain]     │     publish-commands       ├─────────────┤        │
│  │              │ ─────────────────────────▶ │  Pages-Plus  │        │
│  │              │                            │  (Mouth)     │        │
│  │              │     social-commands         ├─────────────┤        │
│  │              │ ─────────────────────────▶ │  Social-Good │        │
│  │              │                            │  (Hands)     │        │
│  │              │                            └─────────────┘        │
│  │              │                                                    │
│  │              │ ◀─┐  brand-events                                  │
│  │              │   │  (shared)              ┌─────────────┐        │
│  └──────────────┘   ├──────────────────────  │  GatherFeed  │        │
│         ▲           │                        └─────────────┘        │
│         │           ├──────────────────────  Pages-Plus              │
│         │           │                                                │
│    sm-commands      └──────────────────────  Social-Good             │
│    (inbound)                                                         │
│                                                                      │
│  DLQs: sm-commands-dlq, gatherfeed-commands-dlq,                    │
│         brand-events-dlq, social-commands-dlq                        │
└──────────────────────────────────────────────────────────────────────┘

Here’s how each service declares its topology in wrangler.jsonc:

Scalable Media (the brain — produces to all, consumes from sm-commands and brand-events):

{
  "queues": {
    "producers": [
      { "queue": "gatherfeed-commands", "binding": "RESEARCH_QUEUE" },
      { "queue": "publish-commands", "binding": "PUBLISH_QUEUE" },
      { "queue": "brand-events", "binding": "EVENTS_QUEUE" },
      { "queue": "social-commands", "binding": "SOCIAL_QUEUE" }
    ],
    "consumers": [
      {
        "queue": "sm-commands",
        "max_batch_size": 10,
        "max_batch_timeout": 5,
        "dead_letter_queue": "sm-commands-dlq"
      },
      {
        "queue": "brand-events",
        "max_batch_size": 10,
        "max_batch_timeout": 5,
        "dead_letter_queue": "brand-events-dlq"
      }
    ]
  }
}

GatherFeed (the eyes — consumes research commands, produces events):

{
  "queues": {
    "producers": [
      { "queue": "brand-events", "binding": "EVENTS_QUEUE" }
    ],
    "consumers": [
      {
        "queue": "gatherfeed-commands",
        "max_batch_size": 5,
        "max_batch_timeout": 10,
        "dead_letter_queue": "gatherfeed-commands-dlq"
      }
    ]
  }
}

Pages-Plus (the mouth — consumes publish commands):

{
  "queues": {
    "producers": [
      { "queue": "brand-events", "binding": "EVENTS_QUEUE" }
    ],
    "consumers": [
      {
        "queue": "publish-commands",
        "max_batch_size": 10,
        "max_batch_timeout": 5,
        "dead_letter_queue": "publish-commands-dlq"
      }
    ]
  }
}

The complete topology for 5 services and 8 queues:

| Queue | Producer(s) | Consumer | DLQ | Purpose |
| --- | --- | --- | --- | --- |
| gatherfeed-commands | SM | GatherFeed | gatherfeed-commands-dlq | Research requests (one keyword per message) |
| publish-commands | SM | Pages-Plus | publish-commands-dlq | Content publish commands |
| social-commands | SM | Social-Good | social-commands-dlq | Social post creation commands |
| sm-commands | External / API | SM | sm-commands-dlq | Inbound commands to SM |
| brand-events | SM, GF, PP, SG | SM | brand-events-dlq | Shared domain events (multi-producer) |
| sm-commands-dlq | Queue infra | (monitoring) | | Failed SM commands |
| gatherfeed-commands-dlq | Queue infra | (monitoring) | | Failed research commands |
| brand-events-dlq | Queue infra | (monitoring) | | Failed event processing |
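A topology like this can be written down as data and checked mechanically before deploying. The sketch below is a hypothetical validator, not part of any Cloudflare tooling: `QueueSpec` and `validateTopology` are invented for illustration, and only a few rows of the table are included.

```typescript
// Hypothetical description of a topology; names follow the table above.
interface QueueSpec {
  queue: string;
  producers: string[];
  consumers: string[];
  dlq?: string;
}

// Returns human-readable problems; an empty array means the checks passed.
function validateTopology(specs: QueueSpec[]): string[] {
  const problems: string[] = [];
  const declared = new Set(specs.map((s) => s.queue));
  for (const spec of specs) {
    // Rule 1: every queue has exactly one consumer.
    if (spec.consumers.length !== 1) {
      problems.push(`${spec.queue}: expected exactly 1 consumer, found ${spec.consumers.length}`);
    }
    // Rule 2: a referenced DLQ must itself be a declared queue.
    if (spec.dlq !== undefined && !declared.has(spec.dlq)) {
      problems.push(`${spec.queue}: DLQ ${spec.dlq} is not declared as a queue`);
    }
  }
  return problems;
}

const sampleTopology: QueueSpec[] = [
  { queue: "gatherfeed-commands", producers: ["SM"], consumers: ["GatherFeed"], dlq: "gatherfeed-commands-dlq" },
  { queue: "brand-events", producers: ["SM", "GF", "PP", "SG"], consumers: ["SM"], dlq: "brand-events-dlq" },
  { queue: "gatherfeed-commands-dlq", producers: ["Queue infra"], consumers: ["monitoring"] },
  { queue: "brand-events-dlq", producers: ["Queue infra"], consumers: ["monitoring"] },
];

const topologyProblems = validateTopology(sampleTopology);
```

Running a check like this in CI would flag a queue whose dead_letter_queue was referenced in a consumer config but never listed as a queue in its own right.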

Gotchas:

Pattern 5: Event Router (Single Consumer, Multiple Handlers)

Because Cloudflare Queues supports only one push consumer per queue, you need a router pattern when multiple message types flow through the same queue. The shared brand-events queue carries research.completed, content.generated, content.published, and potentially many more event types.

When to use it: Any queue that carries more than one message type — especially the shared events queue.

import type { Env } from "./types";
import type {
  DomainMessage,
  ResearchCompletedPayload,
  ContentGeneratedPayload,
  ContentPublishedPayload,
} from "./messages";
import { MessageTypes } from "./messages";

/**
 * Event router for the brand-events queue.
 *
 * SM is the sole consumer of brand-events. It routes each event
 * to the appropriate handler, which typically delegates to a BrandAgent.
 *
 * This is the "fan-in" counterpart to producer-side fan-out:
 * multiple producers emit events → one router → per-brand agents.
 */
export async function handleQueue(
  batch: MessageBatch<DomainMessage>,
  env: Env
): Promise<void> {
  for (const msg of batch.messages) {
    const { event_id, type } = msg.body;

    // Idempotency check (Pattern 1)
    const existing = await env.DB.prepare(
      "SELECT event_id FROM processed_events WHERE event_id = ?"
    )
      .bind(event_id)
      .first();

    if (existing) {
      msg.ack();
      continue;
    }

    try {
      // Route by message type
      await routeMessage(msg.body, env);

      // Record as processed
      await env.DB.prepare(
        `INSERT OR IGNORE INTO processed_events (event_id, type, source, processed_at)
         VALUES (?, ?, ?, datetime('now'))`
      )
        .bind(event_id, type, msg.body.source)
        .run();

      msg.ack();
    } catch (err) {
      console.error(`Queue error processing ${type} (${event_id}):`, err);
      msg.retry({ delaySeconds: 30 });
    }
  }
}

async function routeMessage(
  message: DomainMessage,
  env: Env
): Promise<void> {
  switch (message.type) {
    case MessageTypes.RESEARCH_COMPLETED:
      await handleResearchCompleted(
        message as DomainMessage<ResearchCompletedPayload>,
        env
      );
      break;

    case MessageTypes.CONTENT_GENERATED:
      await handleContentGenerated(
        message as DomainMessage<ContentGeneratedPayload>,
        env
      );
      break;

    case MessageTypes.CONTENT_PUBLISHED:
      await handleContentPublished(
        message as DomainMessage<ContentPublishedPayload>,
        env
      );
      break;

    default:
      // Log unknown types but don't retry — they'll never succeed
      console.warn(`Unknown message type: ${message.type}`);
  }
}

/**
 * Route research.completed → BrandAgent.
 * The agent decides what to do next (generate content, skip, etc.)
 */
async function handleResearchCompleted(
  event: DomainMessage<ResearchCompletedPayload>,
  env: Env
): Promise<void> {
  const agent = getBrandAgent(event.payload.brand_slug, env);
  await agent.onResearchCompleted(event);
}

/**
 * Route content.generated → BrandAgent.
 * Agent triggers publish + social fan-out (Pattern 2).
 */
async function handleContentGenerated(
  event: DomainMessage<ContentGeneratedPayload>,
  env: Env
): Promise<void> {
  const agent = getBrandAgent(event.payload.brand_slug, env);
  await agent.onContentGenerated(event);
}

/**
 * Route content.published → update D1 + BrandAgent.
 * Direct D1 update + agent state tracking.
 */
async function handleContentPublished(
  event: DomainMessage<ContentPublishedPayload>,
  env: Env
): Promise<void> {
  const { content_id, published_url } = event.payload;

  // Update content status in D1
  await env.DB.prepare(
    `UPDATE content SET status = 'published', published_at = datetime('now'), published_to = ? WHERE id = ?`
  )
    .bind(JSON.stringify([published_url]), content_id)
    .run();

  // Route to BrandAgent for state counter update
  const agent = getBrandAgent(event.payload.brand_slug, env);
  await agent.onContentPublished(event);
}

/**
 * Get a BrandAgent Durable Object stub with name set.
 * Agents SDK requires .setName() before RPC calls.
 */
function getBrandAgent(brandSlug: string, env: Env) {
  const agentId = env.BRAND_AGENT.idFromName(brandSlug);
  const stub = env.BRAND_AGENT.get(agentId) as any;
  if (typeof stub.setName === "function") {
    stub.setName(brandSlug);
  }
  return stub;
}

Connection to Pattern 4: The topology determines which queues exist. The router determines how messages within a queue are dispatched. Together, they form the messaging backbone.
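As the number of event types grows, the switch statement can be swapped for a handler registry. This is an alternative sketch rather than the router shown above: `createRouter`, `RoutedMessage`, and `Handler` are hypothetical names, and the handlers here only record what they saw.

```typescript
// Hypothetical registry-based router: adding an event type is one map entry.
interface RoutedMessage {
  type: string;
  payload: unknown;
}

type Handler = (message: RoutedMessage) => Promise<void>;

function createRouter(handlers: Record<string, Handler>) {
  // Own-property lookup so inherited keys like "toString" never match.
  const lookup = (type: string): Handler | undefined =>
    Object.prototype.hasOwnProperty.call(handlers, type) ? handlers[type] : undefined;

  return {
    // True when a handler is registered for this type.
    canHandle: (type: string): boolean => lookup(type) !== undefined,
    // Runs the matching handler; resolves to false for unknown types so the
    // caller can log and ack them instead of retrying.
    dispatch: async (message: RoutedMessage): Promise<boolean> => {
      const handler = lookup(message.type);
      if (!handler) return false;
      await handler(message);
      return true;
    },
  };
}

const seenTypes: string[] = [];
const router = createRouter({
  "research.completed": async (m) => { seenTypes.push(m.type); },
  "content.generated": async (m) => { seenTypes.push(m.type); },
});
```

Inside the queue handler, `await router.dispatch(msg.body)` would replace the call to `routeMessage`, and a `false` result signals an unknown type that should be logged rather than retried.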

Gotchas:

Pattern 6: The Worker Export Shape

Every Worker that participates in the queue topology exports up to three handlers: fetch, queue, and (optionally) scheduled. The shape is always the same.

When to use it: Every Worker with queue bindings.

import { Hono } from "hono";
import type { Env } from "./types";
import type { DomainMessage } from "./messages";
import { handleQueue } from "./queue";

// Re-export Durable Object and Workflow classes (required for their bindings)
export { BrandAgent } from "./agents/brand-agent";
export { ContentWorkflow } from "./agents/content-workflow";

const app = new Hono<{ Bindings: Env }>();

app.get("/health", (c) => c.json({ status: "ok" }));

// ... your routes here ...

export default {
  // HTTP handler — serves your API
  async fetch(
    request: Request,
    env: Env,
    ctx: ExecutionContext
  ): Promise<Response> {
    return app.fetch(request, env, ctx);
  },

  // Queue handler — processes messages from bound queues
  async queue(
    batch: MessageBatch<DomainMessage>,
    env: Env
  ): Promise<void> {
    await handleQueue(batch, env);
  },

  // Cron handler — triggers periodic work
  async scheduled(
    event: ScheduledEvent,
    env: Env,
    ctx: ExecutionContext
  ): Promise<void> {
    // Example: weekly brand audits
    // Type the rows so brand.slug is a string rather than unknown
    const brands = await env.DB.prepare(
      "SELECT id, slug FROM brands WHERE fqdn IS NOT NULL"
    ).all<{ slug: string }>();

    for (const brand of brands.results) {
      try {
        const agentId = env.BRAND_AGENT.idFromName(brand.slug);
        const agent = env.BRAND_AGENT.get(agentId) as any;
        if (typeof agent.setName === "function") agent.setName(brand.slug);
        await agent.runAudit();
      } catch (err) {
        console.error(`Cron failed for ${brand.slug}:`, err);
      }
    }
  },
};

The Env type binds everything together:

interface Env {
  // D1 database
  DB: D1Database;

  // R2 storage
  STORAGE: R2Bucket;

  // Queue producer bindings
  RESEARCH_QUEUE: Queue;
  PUBLISH_QUEUE: Queue;
  EVENTS_QUEUE: Queue;
  SOCIAL_QUEUE: Queue;

  // Durable Object bindings
  BRAND_AGENT: DurableObjectNamespace;

  // Workflow bindings
  CONTENT_WORKFLOW: Workflow;

  // Workers AI
  AI: Ai;

  // Service bindings (sync RPC — infrastructure only)
  APIMOM: Fetcher;

  // Secrets
  SERVICE_KEY: string;
}

Key insight: The queue() handler runs in the same Worker as your fetch() handler. They share the same Env, the same D1 database, the same R2 bucket. This is powerful — your consumer has full access to your service’s state without any additional configuration.

Pattern 7: Delayed Messages and Backoff

Cloudflare Queues supports delaying messages — both at send time and at retry time. This enables rate limiting, cooldown periods, and exponential backoff.

When to use it: Rate-limited APIs, cooldown between operations, gradually increasing retry delays.

// Delay at send time — schedule a message for later
await env.RESEARCH_QUEUE.send(
  createMessage("research.requested", "scalable-media", {
    brand_slug: "llc-tax",
    keyword: "llc annual fees by state",
    priority: "low",
  }),
  {
    delaySeconds: 3600, // Process in 1 hour — low priority
  }
);

// Batch send with per-message delays — stagger processing
const keywords = ["llc taxes", "s-corp vs llc", "llc operating agreement"];
await env.RESEARCH_QUEUE.sendBatch(
  keywords.map((keyword, index) => ({
    body: createMessage("research.requested", "scalable-media", {
      brand_slug: "llc-tax",
      keyword,
      priority: "normal",
    }),
    delaySeconds: index * 60, // Stagger: 0s, 60s, 120s
  }))
);

// Exponential backoff in the consumer
async function handleWithBackoff(
  msg: Message<DomainMessage>,
  env: Env
): Promise<void> {
  try {
    await processMessage(msg.body, env);
    msg.ack();
  } catch (err) {
    // Exponential backoff: 30s, 60s, 120s, 240s, 480s, ...
    // Capped at 600s (10 minutes) to avoid excessive delays
    const delay = Math.min(30 * Math.pow(2, msg.attempts - 1), 600);

    console.error(
      `Attempt ${msg.attempts} failed, retrying in ${delay}s:`,
      err
    );
    msg.retry({ delaySeconds: delay });
  }
}
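
The delay formula in handleWithBackoff can be checked in isolation. The following is a minimal sketch, assuming the same 30-second base and 600-second cap; `backoffDelaySeconds` is a hypothetical helper name, not part of the Queues API.

```typescript
// Hypothetical helper: exponential backoff delay for a given delivery attempt.
// `attempts` starts at 1 for the first delivery, matching msg.attempts.
function backoffDelaySeconds(
  attempts: number,
  baseSeconds = 30,
  capSeconds = 600
): number {
  const exponent = Math.max(attempts, 1) - 1;
  return Math.min(baseSeconds * 2 ** exponent, capSeconds);
}

// Delays for attempts 1 through 7.
const schedule = [1, 2, 3, 4, 5, 6, 7].map((n) => backoffDelaySeconds(n));
console.log(schedule); // [30, 60, 120, 240, 480, 600, 600]
```

The cap takes effect from the sixth attempt onward, so the formula only matters in full if max_retries is set high enough to reach it.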

Rate limiting with delayed sends for external API calls:

/**
 * Rate-limited fan-out: send N research requests with staggered delays
 * to avoid overwhelming the Brave Search API.
 *
 * At 5 requests/second, 100 keywords take 20 seconds of staggered sends.
 */
async function rateLimitedFanOut(
  keywords: string[],
  brandSlug: string,
  env: Env
): Promise<void> {
  const REQUESTS_PER_SECOND = 5;
  const DELAY_BETWEEN_BATCHES = 1; // 1 second

  for (let i = 0; i < keywords.length; i += REQUESTS_PER_SECOND) {
    const batch = keywords.slice(i, i + REQUESTS_PER_SECOND);
    const batchDelay = Math.floor(i / REQUESTS_PER_SECOND) * DELAY_BETWEEN_BATCHES;

    await env.RESEARCH_QUEUE.sendBatch(
      batch.map((keyword) => ({
        body: createMessage("research.requested", "scalable-media", {
          brand_slug: brandSlug,
          keyword,
          priority: "normal",
        }),
        delaySeconds: batchDelay,
      }))
    );
  }
}

Key insight: Delayed sends are a primitive form of backpressure. The producer controls the pace by staggering delivery times. This is simpler than implementing backpressure in the consumer, because the queue handles the timing.
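
The arithmetic behind the staggering can be sketched as a pure function. This is an illustration only: `staggerDelays` is a hypothetical helper, and it models when messages become deliverable, not how fast the consumer actually processes them.

```typescript
// Hypothetical helper: compute a delaySeconds value for each item so that
// at most `perSecond` items become deliverable in any one-second window.
function staggerDelays(count: number, perSecond: number): number[] {
  if (!Number.isInteger(perSecond) || perSecond < 1) {
    throw new RangeError("perSecond must be a positive integer");
  }
  return Array.from({ length: count }, (_, i) => Math.floor(i / perSecond));
}

const delays = staggerDelays(100, 5);
console.log(delays.slice(0, 7)); // [0, 0, 0, 0, 0, 1, 1]
console.log(Math.max(...delays)); // 19
```

With 100 keywords at 5 per second there are 20 groups, released at delays of 0 through 19 seconds.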

Pattern 8: Multi-Producer Events

The brand-events queue accepts messages from multiple producers. This is the broadcast pattern — any service can announce what happened, and the central router (SM) dispatches to the right handler.

When to use it: Domain events that multiple services emit but one service needs to react to.

// GatherFeed emits research.completed
// In gatherfeed/src/consumers/research-commands.ts:
await env.EVENTS_QUEUE.send({
  event_id: crypto.randomUUID(),
  type: "research.completed",
  source: "gatherfeed",
  timestamp: new Date().toISOString(),
  correlation_id: originalMessage.correlation_id,
  payload: {
    brand_slug: "llc-tax",
    research_id: "r_001",
    keyword: "llc tax deductions",
  },
});

// Pages-Plus emits content.published
// In pages-plus/src/consumers/publish-commands.ts:
await env.EVENTS_QUEUE.send({
  event_id: crypto.randomUUID(),
  type: "content.published",
  source: "pages-plus",
  timestamp: new Date().toISOString(),
  correlation_id: originalMessage.correlation_id,
  payload: {
    brand_slug: "llc-tax",
    content_id: "c_001",
    published_url: "https://llctax.co/blog/llc-tax-deductions",
  },
});

// Social-Good emits social.posted
// In social-good/src/consumers/social-commands.ts:
await env.EVENTS_QUEUE.send({
  event_id: crypto.randomUUID(),
  type: "social.posted",
  source: "social-good",
  timestamp: new Date().toISOString(),
  payload: {
    brand_slug: "llc-tax",
    content_id: "c_001",
    platform: "twitter",
    post_url: "https://x.com/llctax/status/123456",
  },
});

// All three events land in the same brand-events queue.
// SM's queue consumer routes each to the appropriate handler.

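The key constraint is that all producers target the same queue, brand-events. The binding name (EVENTS_QUEUE here) is local to each Worker, but keeping it identical across services makes the producer code portable. Each service declares it in its wrangler.jsonc: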

// Every service that emits events has this:
{
  "queues": {
    "producers": [
      { "queue": "brand-events", "binding": "EVENTS_QUEUE" }
    ]
  }
}

Gotchas:


Small Examples

Example 1: Sending a Single Message

// The simplest possible producer — one message, one queue
async function triggerResearch(keyword: string, brandSlug: string, env: Env) {
  await env.RESEARCH_QUEUE.send({
    event_id: crypto.randomUUID(),
    type: "research.requested",
    source: "scalable-media",
    timestamp: new Date().toISOString(),
    payload: {
      brand_slug: brandSlug,
      keyword,
      priority: "normal",
    },
  });
}

Example 2: Batch Sending

// Send up to 100 messages in a single API call
// Each message can have its own delay
async function triggerBatchResearch(
  keywords: string[],
  brandSlug: string,
  env: Env
) {
  const messages = keywords.map((keyword) => ({
    body: {
      event_id: crypto.randomUUID(),
      type: "research.requested",
      source: "scalable-media",
      timestamp: new Date().toISOString(),
      payload: { brand_slug: brandSlug, keyword, priority: "normal" },
    },
  }));

  // sendBatch handles up to 100 messages or 256 KB total
  await env.RESEARCH_QUEUE.sendBatch(messages);
}
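
Example 2 passes the whole keywords array to a single sendBatch call, which only works while the array fits within the 100-message limit noted in the comment. A minimal sketch of splitting a longer list first follows; `chunk` is a hypothetical helper, and the 256 KB size limit is not checked here.

```typescript
// Hypothetical helper: split an array into consecutive groups of at most `size`.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("size must be a positive integer");
  }
  const groups: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    groups.push(items.slice(i, i + size));
  }
  return groups;
}

// 250 keywords become three groups: 100, 100, and 50.
const manyKeywords = Array.from({ length: 250 }, (_, i) => `keyword-${i}`);
const keywordGroups = chunk(manyKeywords, 100);
console.log(keywordGroups.map((g) => g.length)); // [100, 100, 50]
```

In triggerBatchResearch, each group would be mapped to message objects and passed to its own sendBatch call.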

Example 3: Acknowledging vs. Retrying Individual Messages

// Per-message ack/retry within a batch
async function handleBatch(batch: MessageBatch<DomainMessage>, env: Env) {
  for (const msg of batch.messages) {
    try {
      await processMessage(msg.body, env);
      msg.ack(); // This message succeeded — remove from queue
    } catch (err) {
      if (isPermanentError(err)) {
        // Malformed data, missing references — will never succeed
        console.error(`Permanent failure for ${msg.body.event_id}:`, err);
        msg.ack(); // Ack so it is not retried (and never reaches the DLQ); the log line is the record
      } else {
        // Transient error — retry with backoff
        msg.retry({ delaySeconds: 30 });
      }
    }
  }
}

function isPermanentError(err: unknown): boolean {
  if (err instanceof Error) {
    return (
      err.message.includes("not found") ||
      err.message.includes("invalid payload") ||
      err.message.includes("schema validation")
    );
  }
  return false;
}
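
Matching on error message text is fragile, because a reworded message silently changes the classification. One alternative, sketched here on the assumption that you control the code that throws, is a dedicated error class. `PermanentError`, `assertEnvelope`, and `classify` are hypothetical names used for illustration.

```typescript
// Hypothetical marker class for failures that will never succeed on retry.
class PermanentError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "PermanentError";
    // Keeps instanceof working even when compiled to older targets.
    Object.setPrototypeOf(this, PermanentError.prototype);
  }
}

// Classify by type instead of by message text.
function isPermanent(err: unknown): boolean {
  return err instanceof PermanentError;
}

// Hypothetical validation step: reject envelopes missing required fields.
function assertEnvelope(body: unknown): void {
  const candidate = body as { event_id?: unknown; type?: unknown } | null;
  if (
    typeof candidate !== "object" ||
    candidate === null ||
    typeof candidate.event_id !== "string" ||
    typeof candidate.type !== "string"
  ) {
    throw new PermanentError("invalid payload: missing event_id or type");
  }
}

function classify(body: unknown): "ok" | "permanent" | "transient" {
  try {
    assertEnvelope(body);
    return "ok";
  } catch (err) {
    return isPermanent(err) ? "permanent" : "transient";
  }
}

console.log(classify({ event_id: "e1", type: "research.requested" })); // "ok"
console.log(classify({ type: "research.requested" })); // "permanent"
```

Anything that is not a PermanentError falls through to the transient branch, so unknown failures are retried by default.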

Example 4: ackAll / retryAll for Homogeneous Batches

// When all messages in a batch should be treated the same
async function handleHomogeneousBatch(
  batch: MessageBatch<DomainMessage>,
  env: Env
) {
  try {
    // Process all messages — if any fails, we retry all
    for (const msg of batch.messages) {
      await processMessage(msg.body, env);
    }
    batch.ackAll(); // All succeeded
  } catch (err) {
    // One failed — retry the whole batch
    // Note: messages already acked individually are NOT retried
    batch.retryAll({ delaySeconds: 30 });
  }
}

Example 5: Correlation ID Chain

// Tracing a request across 4 services with correlation_id
// Step 1: SM creates the chain
const correlationId = `brand-cycle-llc-tax-2026-03-14`;

await env.RESEARCH_QUEUE.send({
  event_id: crypto.randomUUID(),
  type: "research.requested",
  source: "scalable-media",
  timestamp: new Date().toISOString(),
  correlation_id: correlationId, // Born here
  payload: { brand_slug: "llc-tax", keyword: "llc taxes" },
});

// Step 2: GatherFeed carries it forward
await env.EVENTS_QUEUE.send({
  event_id: crypto.randomUUID(),
  type: "research.completed",
  source: "gatherfeed",
  timestamp: new Date().toISOString(),
  correlation_id: correlationId, // Propagated
  payload: { brand_slug: "llc-tax", research_id: "r_001", keyword: "llc taxes" },
});

// Step 3: SM carries it to publish
await env.PUBLISH_QUEUE.send({
  event_id: crypto.randomUUID(),
  type: "content.publish",
  source: "scalable-media",
  timestamp: new Date().toISOString(),
  correlation_id: correlationId, // Still the same chain
  payload: { brand_slug: "llc-tax", content_id: "c_001", target: "pages-plus" },
});

// Now you can search logs for correlation_id to trace the full chain
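
Propagating the ID by hand at every step is easy to forget. A minimal sketch of a helper that derives a follow-up message from its parent is shown here. `Envelope` mirrors the message fields used in this article; `followUp` and its fallback to the parent's event_id are assumptions made for illustration.

```typescript
interface Envelope<P> {
  event_id: string;
  type: string;
  source: string;
  timestamp: string;
  correlation_id?: string;
  payload: P;
}

// Hypothetical helper: build a follow-up message that inherits the parent's
// correlation_id. If the parent has none, its event_id starts the chain.
function followUp<P>(
  parent: Envelope<unknown>,
  type: string,
  source: string,
  payload: P,
  newId: () => string // in a Worker: () => crypto.randomUUID()
): Envelope<P> {
  return {
    event_id: newId(),
    type,
    source,
    timestamp: new Date().toISOString(),
    correlation_id: parent.correlation_id ?? parent.event_id,
    payload,
  };
}

// Deterministic ID generator so the sketch runs anywhere.
let idCounter = 0;
const nextId = () => `evt-${++idCounter}`;

const requested: Envelope<{ keyword: string }> = {
  event_id: nextId(),
  type: "research.requested",
  source: "scalable-media",
  timestamp: new Date().toISOString(),
  correlation_id: "brand-cycle-llc-tax-2026-03-14",
  payload: { keyword: "llc taxes" },
};

const completed = followUp(
  requested,
  "research.completed",
  "gatherfeed",
  { research_id: "r_001" },
  nextId
);
console.log(completed.correlation_id === requested.correlation_id); // true
```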

Example 6: Consumer with CPU-Time Awareness

// Cloudflare Queue consumers have a 15-minute wall-clock limit
// and configurable CPU time (default 30s, max 5 min).
// This example tracks elapsed wall-clock time (via Date.now), not CPU time, and bails early.
async function handleHeavyBatch(
  batch: MessageBatch<DomainMessage>,
  env: Env
) {
  const startTime = Date.now();
  const MAX_PROCESSING_MS = 10 * 60 * 1000; // 10 minutes safe margin

  for (const msg of batch.messages) {
    // Check if we're running out of time
    if (Date.now() - startTime > MAX_PROCESSING_MS) {
      console.warn("Approaching time limit — retrying remaining messages");
      msg.retry({ delaySeconds: 5 });
      continue;
    }

    try {
      await processMessage(msg.body, env);
      msg.ack();
    } catch (err) {
      msg.retry({ delaySeconds: 30 });
    }
  }
}
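
The cutoff logic is easier to test when the clock is injectable. A minimal sketch, assuming the same 10-minute budget as above, is shown here; `createDeadline` is a hypothetical helper. In the Workers runtime, Date.now() only advances after I/O, so a budget like this measures time spent across awaited calls rather than pure CPU time.

```typescript
// Hypothetical helper: a time budget with an injectable clock, so the
// cutoff logic can be tested without waiting.
function createDeadline(budgetMs: number, now: () => number = Date.now) {
  const startedAt = now();
  return {
    expired: (): boolean => now() - startedAt > budgetMs,
    remainingMs: (): number => Math.max(0, budgetMs - (now() - startedAt)),
  };
}

// Simulated clock: starts at 0 and is advanced manually.
let fakeNow = 0;
const deadline = createDeadline(10 * 60 * 1000, () => fakeNow);

fakeNow = 9 * 60 * 1000;
console.log(deadline.expired()); // false
fakeNow = 11 * 60 * 1000;
console.log(deadline.expired()); // true
console.log(deadline.remainingMs()); // 0
```

In handleHeavyBatch, the `Date.now() - startTime > MAX_PROCESSING_MS` check would become `deadline.expired()`.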

Example 7: Typed Queue Bindings

// Type your queue bindings for compile-time safety
interface Env {
  RESEARCH_QUEUE: Queue<DomainMessage<ResearchRequestedPayload>>;
  PUBLISH_QUEUE: Queue<DomainMessage<ContentPublishPayload>>;
  EVENTS_QUEUE: Queue<DomainMessage>; // multiple payload types
  SOCIAL_QUEUE: Queue<DomainMessage<SocialCreatePayload>>;
}

// Now TypeScript catches payload mismatches at compile time
async function send(env: Env) {
  // Correct — type matches
  await env.RESEARCH_QUEUE.send({
    event_id: crypto.randomUUID(),
    type: "research.requested",
    source: "scalable-media",
    timestamp: new Date().toISOString(),
    payload: {
      brand_slug: "llc-tax",
      keyword: "llc taxes",
      priority: "normal",
    },
  });

  // TypeScript error — wrong payload type for RESEARCH_QUEUE
  // await env.RESEARCH_QUEUE.send({
  //   ...envelope,
  //   payload: { brand_slug: "llc-tax", content_id: "c_001" }
  // });
}
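
For a queue like EVENTS_QUEUE that carries several payload types, a discriminated union on the type field lets the compiler narrow the payload inside each branch. The shapes below are a sketch that follows the field names in this article's examples; `BrandEvent` and `summarizeEvent` are hypothetical names.

```typescript
// Hypothetical event shapes keyed by the `type` discriminant.
type BrandEvent =
  | {
      type: "research.completed";
      payload: { brand_slug: string; research_id: string; keyword: string };
    }
  | {
      type: "content.published";
      payload: { brand_slug: string; content_id: string; published_url: string };
    }
  | {
      type: "social.posted";
      payload: { brand_slug: string; content_id: string; platform: string; post_url: string };
    };

function summarizeEvent(event: BrandEvent): string {
  switch (event.type) {
    case "research.completed":
      return `research ${event.payload.research_id} done for "${event.payload.keyword}"`;
    case "content.published":
      return `content ${event.payload.content_id} live at ${event.payload.published_url}`;
    case "social.posted":
      return `content ${event.payload.content_id} posted to ${event.payload.platform}`;
    default: {
      // Exhaustiveness check: a new event type without a case fails to compile.
      const unreachable: never = event;
      throw new Error(`Unhandled event: ${JSON.stringify(unreachable)}`);
    }
  }
}

console.log(
  summarizeEvent({
    type: "content.published",
    payload: {
      brand_slug: "llc-tax",
      content_id: "c_001",
      published_url: "https://llctax.co/blog/llc-tax-deductions",
    },
  })
); // content c_001 live at https://llctax.co/blog/llc-tax-deductions
```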

Example 8: Processed Events Cleanup

// Periodically prune old processed events to prevent table bloat.
// Run this from a cron trigger or as part of a scheduled audit.
async function pruneProcessedEvents(env: Env): Promise<number> {
  const result = await env.DB.prepare(
    `DELETE FROM processed_events
     WHERE processed_at < datetime('now', '-7 days')`
  ).run();

  console.log(`Pruned ${result.meta.changes} processed events`);
  return result.meta.changes ?? 0;
}

// In your scheduled handler:
async function scheduled(event: ScheduledEvent, env: Env) {
  await pruneProcessedEvents(env);
}

Example 9: Pull Consumer for External Systems

// Cloudflare Queues also supports HTTP pull consumers.
// This lets any external system consume from a queue over HTTP.
// Useful for: monitoring dashboards, external processors, testing.

// Pulling messages (requires API token with Queues read+write permissions)
const response = await fetch(
  `https://api.cloudflare.com/client/v4/accounts/${accountId}/queues/${queueId}/messages/pull`,
  {
    method: "POST",
    headers: {
      Authorization: `Bearer ${apiToken}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      batch_size: 10,
      visibility_timeout_ms: 30000, // 30 seconds to process
    }),
  }
);

const { result } = await response.json();
// result.messages contains the pulled messages

// After processing, acknowledge them:
await fetch(
  `https://api.cloudflare.com/client/v4/accounts/${accountId}/queues/${queueId}/messages/ack`,
  {
    method: "POST",
    headers: {
      Authorization: `Bearer ${apiToken}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      acks: result.messages.map((m: any) => ({
        lease_id: m.lease_id,
      })),
    }),
  }
);
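
The example above acknowledges every pulled message. A pull consumer usually needs to acknowledge some and retry others. The sketch below builds such a request body; it assumes the ack endpoint accepts a retries array with lease_id and delay_seconds next to acks, so check the field names against the current API reference before relying on it. `buildAckBody` and the simplified `PulledMessage` shape are hypothetical.

```typescript
// Minimal shapes for illustration; real pulled messages carry more fields.
interface PulledMessage {
  lease_id: string;
  body: string;
}

interface AckRequestBody {
  acks: { lease_id: string }[];
  retries: { lease_id: string; delay_seconds: number }[];
}

// Hypothetical helper: sort pulled messages into acks and retries using a
// caller-supplied predicate that reports whether processing succeeded.
function buildAckBody(
  messages: PulledMessage[],
  succeeded: (m: PulledMessage) => boolean,
  retryDelaySeconds = 30
): AckRequestBody {
  const body: AckRequestBody = { acks: [], retries: [] };
  for (const m of messages) {
    if (succeeded(m)) {
      body.acks.push({ lease_id: m.lease_id });
    } else {
      body.retries.push({ lease_id: m.lease_id, delay_seconds: retryDelaySeconds });
    }
  }
  return body;
}

const pulled: PulledMessage[] = [
  { lease_id: "lease-a", body: '{"keyword":"llc taxes"}' },
  { lease_id: "lease-b", body: "not json" },
];

// Stand-in for real processing: a message "succeeds" if its body parses.
const parsesAsJson = (m: PulledMessage): boolean => {
  try {
    JSON.parse(m.body);
    return true;
  } catch {
    return false;
  }
};

console.log(JSON.stringify(buildAckBody(pulled, parsesAsJson)));
// {"acks":[{"lease_id":"lease-a"}],"retries":[{"lease_id":"lease-b","delay_seconds":30}]}
```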

Example 10: Sending Messages via HTTP (No Worker Required)

// As of May 2025, you can publish to queues via HTTP.
// No Worker producer needed — any system can send messages.

const response = await fetch(
  `https://api.cloudflare.com/client/v4/accounts/${accountId}/queues/${queueId}/messages`,
  {
    method: "POST",
    headers: {
      Authorization: `Bearer ${apiToken}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      // With content_type "json", pass the object itself, not a pre-serialized string
      body: {
        event_id: crypto.randomUUID(),
        type: "research.requested",
        source: "external-script",
        timestamp: new Date().toISOString(),
        payload: {
          brand_slug: "llc-tax",
          keyword: "llc formation costs",
          priority: "low",
        },
      },
      content_type: "json",
      delay_seconds: 0,
    }),
  }
);

Service Bindings vs. Queues

Cloudflare offers two ways for Workers to communicate: service bindings (synchronous RPC) and queues (asynchronous messaging). Choosing the wrong one creates either unnecessary complexity or unnecessary coupling.

Service Bindings

A service binding is a direct, in-process call from one Worker to another. It uses the same fetch() API but bypasses the public internet — the call happens within Cloudflare’s network with near-zero latency.

// wrangler.jsonc — GatherFeed binds to API Mom
{
  "services": [
    {
      "binding": "APIMOM",
      "service": "apimom-router"
    }
  ]
}

// Using a service binding — synchronous, in-process
const response = await env.APIMOM.fetch("https://apimom/v1/brave/search", {
  method: "POST",
  body: JSON.stringify({ query: "llc tax deductions" }),
});
const data = await response.json();

When to Use Which

| Criterion | Service Binding | Queue |
| --- | --- | --- |
| Latency requirement | Need the result NOW (< 50ms) | Result can wait (seconds to minutes) |
| Coupling | Tight: caller blocks until response | Loose: fire and forget |
| Failure handling | Caller must handle errors inline | Queue retries automatically |
| Use case | Infrastructure: key lookup, auth, API proxy | Business logic: research, generate, publish |
| Direction | Request/response (sync) | Fire-and-forget (async) |
| Scaling | One-to-one | Queue auto-scales consumers |
| Example | GatherFeed → API Mom (fetch Brave API key) | SM → GatherFeed (research a keyword) |

The rule: Use service bindings for infrastructure concerns where you need the result immediately. Use queues for business logic where the work can happen independently.

// Service binding: GatherFeed calls API Mom to get a Brave search result
// This is infrastructure — GatherFeed needs the search result NOW to continue processing
const braveResponse = await env.APIMOM.fetch(
  "https://apimom/v1/brave/web_search?q=" + encodeURIComponent(keyword),
  { headers: { "X-Service-Key": env.SERVICE_KEY } }
);
const searchResults = await braveResponse.json();

// Queue: SM tells GatherFeed to research a keyword
// This is business logic — SM doesn't need to wait for the research to complete
await env.RESEARCH_QUEUE.send(
  createMessage("research.requested", "scalable-media", {
    brand_slug: "llc-tax",
    keyword,
    priority: "normal",
  })
);

Key insight: Service bindings are for “I need this value to continue.” Queues are for “this needs to happen eventually.” If you find yourself waiting for a queue response via polling, you probably want a service binding. If you find yourself building retry logic around a service binding, you probably want a queue.

The Hybrid Pattern

In practice, most services use both. GatherFeed uses a service binding to call API Mom (synchronous key proxy) while consuming from a queue (asynchronous research commands):

// GatherFeed's wrangler.jsonc — hybrid: service binding + queue
{
  // Service binding for synchronous infrastructure
  "services": [
    { "binding": "APIMOM", "service": "apimom-router" }
  ],

  // Queues for asynchronous business logic
  "queues": {
    "producers": [
      { "queue": "brand-events", "binding": "EVENTS_QUEUE" }
    ],
    "consumers": [
      {
        "queue": "gatherfeed-commands",
        "max_batch_size": 5,
        "max_batch_timeout": 10,
        "dead_letter_queue": "gatherfeed-commands-dlq"
      }
    ]
  }
}

Consumer Tuning

The three consumer configuration parameters — max_batch_size, max_batch_timeout, and max_concurrency — interact in ways that aren’t obvious. Getting them right affects throughput, latency, and cost.

max_batch_size

The maximum number of messages delivered in a single batch. Default: 10. Range: 1-100.

max_batch_timeout

The maximum seconds to wait before delivering a batch (even if it’s not full). Default: 5. Range: 0-60.

max_concurrency

The maximum concurrent consumer invocations. Default: auto-scale up to 250. Range: 1-250.
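
One way to reason about how max_batch_size and max_batch_timeout interact is a back-of-envelope model. The sketch below assumes a batch is delivered when it is full or when the timeout elapses, whichever comes first, and ignores concurrency and platform scheduling. It is a simplification for intuition, not a platform guarantee, and `worstCaseBatchWait` is a hypothetical helper.

```typescript
// Simplified model: worst-case wait, in seconds, for the first message in a
// batch, given a steady arrival rate. The batch ships when it fills or when
// max_batch_timeout elapses, whichever comes first.
function worstCaseBatchWait(
  messagesPerSecond: number,
  maxBatchSize: number,
  maxBatchTimeout: number
): number {
  if (messagesPerSecond <= 0) return maxBatchTimeout;
  const secondsToFill = (maxBatchSize - 1) / messagesPerSecond;
  return Math.min(secondsToFill, maxBatchTimeout);
}

// Low traffic: the timeout fires before the batch fills.
console.log(worstCaseBatchWait(0.2, 5, 10)); // 10
// High traffic: the batch fills long before the timeout.
console.log(worstCaseBatchWait(50, 100, 30)); // 1.98
// In between: filling takes 4.5 s, just under the 5 s timeout.
console.log(worstCaseBatchWait(2, 10, 5)); // 4.5
```

Under this model, at low traffic the timeout dominates added latency, while at high traffic the batch size dominates how much work each invocation receives.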

Tuning Guide

| Scenario | max_batch_size | max_batch_timeout | max_concurrency | Why |
| --- | --- | --- | --- | --- |
| Research (API calls) | 5 | 10 | Auto | Small batches: each message calls an external API, limiting concurrency naturally |
| Publish (DB writes) | 10 | 5 | Auto | Medium batches: DB writes are fast, optimize for throughput |
| Events (routing) | 10 | 5 | Auto | Route quickly: events trigger downstream work |
| Bulk import | 100 | 30 | Auto | Max throughput: process as many as possible per invocation |
| Rate-limited API | 1 | 0 | 1 | Serial: one message at a time to respect rate limits |
| DLQ monitoring | 10 | 60 | 1 | Low priority: batch up errors for periodic review |

// Research consumer — small batches, longer timeout
// GatherFeed processes one keyword at a time, each takes 2-10 seconds
{
  "queues": {
    "consumers": [
      {
        "queue": "gatherfeed-commands",
        "max_batch_size": 5,
        "max_batch_timeout": 10,
        "max_retries": 3,
        "dead_letter_queue": "gatherfeed-commands-dlq"
      }
    ]
  }
}

// Event router — standard batching
// SM routes events to BrandAgents — fast in-memory routing
{
  "queues": {
    "consumers": [
      {
        "queue": "brand-events",
        "max_batch_size": 10,
        "max_batch_timeout": 5,
        "max_retries": 3,
        "dead_letter_queue": "brand-events-dlq"
      }
    ]
  }
}

The Cost Math

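Cloudflare Queues charges per operation. One operation is counted for each 64 KB chunk of data written, read, or deleted, so a typical small message costs three operations over its lifetime.

For context, using the rates implied by the table below ($0.40 per million operations, with the first 1 million operations each month included): 100,000 messages per month is 300,000 operations and stays inside the free allowance. Even at 10 million messages per month, you’re looking at about $11.60. The cost is effectively zero for most architectures.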

| Scale | Messages/Month | Operations | Monthly Cost |
| --- | --- | --- | --- |
| Hobby | 10,000 | 30,000 | Free (under 1M) |
| Small | 100,000 | 300,000 | Free (under 1M) |
| Medium | 1,000,000 | 3,000,000 | $0.80 |
| Large | 10,000,000 | 30,000,000 | $11.60 |
| Very large | 100,000,000 | 300,000,000 | $119.60 |

Key insight: At Cloudflare Queues pricing, the question is never “can we afford another queue?” It’s “does another queue make the architecture cleaner?” The answer is almost always yes.
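
The figures in the cost table can be reproduced with a few lines of arithmetic. The sketch below uses rates read off that table (three operations per message, $0.40 per million operations, first 1 million operations free); treat them as assumptions and check current pricing before budgeting. `monthlyCostUsd` is a hypothetical helper.

```typescript
// Assumptions taken from the cost table: 3 operations per message
// (write, read, delete), $0.40 per million operations, first 1M free.
const OPS_PER_MESSAGE = 3;
const FREE_OPS = 1_000_000;
const USD_PER_MILLION_OPS = 0.4;
const CHUNK_KB = 64;

function monthlyCostUsd(messagesPerMonth: number, messageSizeKb = 1): number {
  // Each 64 KB chunk of a message counts as its own operation.
  const chunks = Math.max(1, Math.ceil(messageSizeKb / CHUNK_KB));
  const operations = messagesPerMonth * OPS_PER_MESSAGE * chunks;
  const billable = Math.max(0, operations - FREE_OPS);
  const cost = (billable / 1_000_000) * USD_PER_MILLION_OPS;
  return Math.round(cost * 100) / 100; // round to cents
}

console.log(monthlyCostUsd(100_000)); // 0
console.log(monthlyCostUsd(1_000_000)); // 0.8
console.log(monthlyCostUsd(10_000_000)); // 11.6
console.log(monthlyCostUsd(100_000_000)); // 119.6
```

Message size matters once payloads exceed 64 KB: under the same assumptions, a 100 KB message counts as two chunks per operation type, doubling its cost.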


Dead Letter Queues

A dead letter queue (DLQ) receives messages that fail processing after exhausting all retries. Without a DLQ, failed messages are permanently deleted. With a DLQ, failed messages are preserved for investigation, replay, or manual processing.

Configuration

Every command queue should have a DLQ. It’s one line in your wrangler.jsonc:

{
  "queues": {
    "consumers": [
      {
        "queue": "gatherfeed-commands",
        "max_batch_size": 5,
        "max_batch_timeout": 10,
        "max_retries": 3,                              // Try 3 times
        "dead_letter_queue": "gatherfeed-commands-dlq"  // Then send here
      }
    ]
  }
}

If the dead letter queue doesn’t exist, Cloudflare creates it automatically. You should still create it explicitly and attach a consumer, so that failures are monitored instead of silently expiring.

DLQ Behavior

  1. A message fails processing (consumer throws an error or calls msg.retry()).
  2. The queue redelivers the message (up to max_retries times, default 3).
  3. After max_retries failures, the message is moved to the DLQ.
  4. DLQ messages persist for 4 days if no consumer is attached to the DLQ.
  5. After those 4 days, unconsumed DLQ messages are permanently deleted.

DLQ Consumer for Monitoring

interface DLQMessage {
  original_event_id: string;
  type: string;
  source: string;
  timestamp: string;
  error_context?: string;
  payload: unknown;
}

/**
 * DLQ consumer — monitors failed messages and alerts.
 *
 * Options for handling DLQ messages:
 * 1. Log and alert (minimum viable DLQ)
 * 2. Store in a "dead messages" D1 table for later analysis
 * 3. Attempt automated repair and re-queue
 * 4. Send to Telegram/Slack for human review
 */
export async function handleDLQ(
  batch: MessageBatch<DomainMessage>,
  env: Env
): Promise<void> {
  for (const msg of batch.messages) {
    const { event_id, type, source, payload } = msg.body;

    // Log with full context
    console.error(`DLQ: Failed message after ${msg.attempts} attempts`, {
      event_id,
      type,
      source,
      payload,
      queue: batch.queue,
      timestamp: msg.timestamp.toISOString(),
    });

    // Store in dead_messages table for investigation
    await env.DB.prepare(
      `INSERT OR IGNORE INTO dead_messages
       (event_id, queue, type, source, payload, attempts, failed_at)
       VALUES (?, ?, ?, ?, ?, ?, datetime('now'))`
    )
      .bind(
        event_id,
        batch.queue,
        type,
        source,
        JSON.stringify(payload),
        msg.attempts
      )
      .run();

    // Alert via Telegram (using the coordinator)
    try {
      await notifyDLQ(env, {
        queue: batch.queue,
        event_id,
        type,
        source,
        attempts: msg.attempts,
      });
    } catch {
      // Don't let notification failures prevent acking
    }

    // Always ack DLQ messages — we've recorded them
    msg.ack();
  }
}

async function notifyDLQ(
  env: Env,
  details: {
    queue: string;
    event_id: string;
    type: string;
    source: string;
    attempts: number;
  }
): Promise<void> {
  // Example: send alert via coordinator Telegram bot
  const text = [
    `[DLQ ALERT] Message failed after ${details.attempts} attempts`,
    `Queue: ${details.queue}`,
    `Type: ${details.type}`,
    `Source: ${details.source}`,
    `Event ID: ${details.event_id}`,
  ].join("\n");

  await env.COORDINATOR.fetch("https://coordinator/send", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      machine: "dlq-monitor",
      agent: "queue-monitor",
      text,
    }),
  });
}

DLQ Schema

CREATE TABLE IF NOT EXISTS dead_messages (
  event_id TEXT PRIMARY KEY,
  queue TEXT NOT NULL,
  type TEXT NOT NULL,
  source TEXT,
  payload TEXT,            -- JSON string of the original payload
  attempts INTEGER,
  failed_at TEXT NOT NULL,
  replayed_at TEXT,         -- set when message is replayed
  resolution TEXT           -- 'replayed', 'discarded', 'fixed'
);

Message Replay

When you’ve fixed the bug that caused the failure, replay DLQ messages back to the original queue:

/**
 * Replay failed messages from the dead_messages table back to their
 * original queue. Only replays messages that haven't been replayed yet.
 */
async function replayDeadMessages(
  queue: Queue,
  queueName: string,
  env: Env,
  limit: number = 10
): Promise<number> {
  const messages = await env.DB.prepare(
    `SELECT event_id, type, source, payload
     FROM dead_messages
     WHERE queue = ? AND replayed_at IS NULL
     ORDER BY failed_at ASC
     LIMIT ?`
  )
    .bind(queueName, limit)
    .all<{
      event_id: string;
      type: string;
      source: string;
      payload: string;
    }>();

  let replayed = 0;

  for (const row of messages.results) {
    // Re-send with a NEW event_id (old one is in processed_events)
    const replayMessage: DomainMessage = {
      event_id: crypto.randomUUID(),
      type: row.type,
      source: row.source,
      timestamp: new Date().toISOString(),
      correlation_id: `replay-${row.event_id}`,
      payload: JSON.parse(row.payload),
    };

    await queue.send(replayMessage);

    // Mark as replayed
    await env.DB.prepare(
      `UPDATE dead_messages SET replayed_at = datetime('now'), resolution = 'replayed'
       WHERE event_id = ?`
    )
      .bind(row.event_id)
      .run();

    replayed++;
  }

  return replayed;
}

Key insight: When replaying DLQ messages, generate a new event_id. The old event_id may already be in processed_events: if the consumer recorded it before failing, a replay under the same ID would be skipped as a duplicate. If the consumer crashed before recording it, the old ID would still work, but you usually can’t tell which case applies. A new event_id ensures the replayed message goes through normal processing without hitting the idempotency check.
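
The effect of the idempotency check on a replay can be shown with an in-memory stand-in for processed_events. This is a sketch for illustration only: the article’s consumers use D1, and `consumeOnce` is a hypothetical name.

```typescript
// In-memory stand-in for the processed_events table, so the sketch runs anywhere.
const processedEvents = new Set<string>();
const handledKeywords: string[] = [];

function consumeOnce(eventId: string, keyword: string): "processed" | "skipped" {
  if (processedEvents.has(eventId)) return "skipped";
  processedEvents.add(eventId);
  handledKeywords.push(keyword);
  return "processed";
}

// The original delivery recorded its event_id, then failed downstream.
consumeOnce("evt-1", "llc taxes");

// Replaying with the same event_id is dropped by the idempotency check.
console.log(consumeOnce("evt-1", "llc taxes")); // "skipped"

// Replaying with a fresh event_id goes through normal processing.
console.log(consumeOnce("evt-1-replay", "llc taxes")); // "processed"
```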


Real-World Topology: The Brand Engine

This is the actual queue topology running in production across 5 Workers. It powers an autonomous content pipeline that researches keywords, generates articles, publishes to websites, and creates social media posts. The business services communicate only through queue messages; the one synchronous call is GatherFeed’s service binding to API Mom.

The Flow

User Request


┌─────────────┐
│  Scalable    │  1. BrandAgent receives keyword list
│  Media (SM)  │  2. Fan-out: one message per keyword → RESEARCH_QUEUE
│              │
│  BrandAgent  │──────────── gatherfeed-commands ──────────────┐
│  (DO)        │                                                │
│              │                                                ▼
│              │                                          ┌───────────┐
│              │                                          │ GatherFeed│
│              │                                          │           │
│              │                                          │ Brave API │
│              │                                          │ Workers AI│
│              │  ◀──── brand-events ────────────────────  │           │
│              │         (research.completed)               └───────────┘
│              │
│              │  3. Agent starts ContentWorkflow per keyword
│              │  4. Workflow generates article via Workers AI
│              │  5. Fan-out: publish + social messages
│              │
│              │──────────── publish-commands ──────────────┐
│              │                                            │
│              │──────────── social-commands ────────────┐  │
│              │                                         │  │
│              │  ◀──── brand-events ───────────────┐   │  │
│              │         (content.published)         │   │  │
│              │         (social.posted)             │   │  │
└──────────────┘                                     │   │  │
                                                     │   │  │
                                              ┌──────┘   │  │
                                              │          │  │
                                              │   ┌──────┘  │
                                              │   │         │
                                              │   │  ┌──────┘
                                              │   │  │
                                              │   ▼  ▼
                                        ┌──────────┐ ┌──────────┐
                                        │Social-   │ │Pages-    │
                                        │Good      │ │Plus      │
                                        │          │ │          │
                                        │X, IG,    │ │Multi-site│
                                        │LinkedIn  │ │SSR       │
                                        └──────────┘ └──────────┘

Message Flow for “Research 3 Keywords”

Here’s the exact sequence of queue messages when SM processes 3 keywords for a brand:

Time    Queue                   Message Type            Source → Consumer
────    ─────                   ────────────            ────────────────
T+0     gatherfeed-commands     research.requested      SM → GF (keyword 1)
T+0     gatherfeed-commands     research.requested      SM → GF (keyword 2)
T+0     gatherfeed-commands     research.requested      SM → GF (keyword 3)

T+5s    brand-events            research.completed      GF → SM (keyword 1)
T+8s    brand-events            research.completed      GF → SM (keyword 3)
T+12s   brand-events            research.completed      GF → SM (keyword 2)
        ↑ Note: out of order — this is expected

T+13s   (ContentWorkflow starts for keyword 1)
T+14s   (ContentWorkflow starts for keyword 3)
T+15s   (ContentWorkflow starts for keyword 2)

T+45s   brand-events            content.generated       SM → SM (keyword 1)
T+52s   brand-events            content.generated       SM → SM (keyword 3)

T+46s   publish-commands        content.publish         SM → PP (keyword 1)
T+46s   social-commands         social.create           SM → SG (keyword 1)
T+53s   publish-commands        content.publish         SM → PP (keyword 3)
T+53s   social-commands         social.create           SM → SG (keyword 3)

T+50s   brand-events            content.published       PP → SM (keyword 1)
T+57s   brand-events            content.published       PP → SM (keyword 3)
T+60s   brand-events            content.generated       SM → SM (keyword 2)
T+61s   publish-commands        content.publish         SM → PP (keyword 2)
T+61s   social-commands         social.create           SM → SG (keyword 2)
T+65s   brand-events            content.published       PP → SM (keyword 2)

Total queue messages for 3 keywords: 18 (3 research.requested + 3 research.completed + 3 content.generated + 3 content.publish + 3 social.create + 3 content.published), plus the social.posted events that the trace omits. At $0.0000012 per message, this costs about $0.00002. The external API calls cost orders of magnitude more than the messaging.

The Configuration Files

All five services declare their topology in wrangler.jsonc. The two central ones, SM and GatherFeed, are shown here:

Scalable Media (SM) — The Brain:

{
  "name": "scalable-media-api",
  "main": "src/index.ts",
  "compatibility_flags": ["nodejs_compat"],

  "ai": { "binding": "AI" },

  "d1_databases": [
    {
      "binding": "DB",
      "database_name": "scalable-media-db",
      "database_id": "061fb3fb-7147-44fc-b99e-e99ee479fc51",
      "migrations_dir": "migrations"
    }
  ],

  "r2_buckets": [
    { "binding": "STORAGE", "bucket_name": "scalable-media-data" }
  ],

  "durable_objects": {
    "bindings": [
      { "name": "BRAND_AGENT", "class_name": "BrandAgent" },
      { "name": "COORDINATOR_AGENT", "class_name": "CoordinatorAgent" }
    ]
  },

  "migrations": [
    { "tag": "v1", "new_sqlite_classes": ["BrandAgent"] },
    { "tag": "v2", "new_sqlite_classes": ["CoordinatorAgent"] }
  ],

  "workflows": [
    {
      "name": "content-workflow",
      "binding": "CONTENT_WORKFLOW",
      "class_name": "ContentWorkflow"
    }
  ],

  "triggers": {
    "crons": ["0 6 * * SUN"]
  },

  "queues": {
    "producers": [
      { "queue": "gatherfeed-commands", "binding": "RESEARCH_QUEUE" },
      { "queue": "publish-commands", "binding": "PUBLISH_QUEUE" },
      { "queue": "brand-events", "binding": "EVENTS_QUEUE" },
      { "queue": "social-commands", "binding": "SOCIAL_QUEUE" }
    ],
    "consumers": [
      {
        "queue": "sm-commands",
        "max_batch_size": 10,
        "max_batch_timeout": 5,
        "dead_letter_queue": "sm-commands-dlq"
      },
      {
        "queue": "brand-events",
        "max_batch_size": 10,
        "max_batch_timeout": 5,
        "dead_letter_queue": "brand-events-dlq"
      }
    ]
  }
}

GatherFeed (GF) — The Eyes:

{
  "name": "gatherfeed-api",
  "main": "src/index.ts",
  "compatibility_flags": ["nodejs_compat"],

  "ai": { "binding": "AI" },

  "d1_databases": [
    {
      "binding": "DB",
      "database_name": "gatherfeed-db",
      "database_id": "277bbcbe-b83c-4b52-8ce0-4d37ddaf5b10",
      "migrations_dir": "migrations"
    }
  ],

  "r2_buckets": [
    { "binding": "STORAGE", "bucket_name": "gatherfeed-data" }
  ],

  "services": [
    { "binding": "APIMOM", "service": "apimom-router" }
  ],

  "queues": {
    "producers": [
      { "queue": "brand-events", "binding": "EVENTS_QUEUE" }
    ],
    "consumers": [
      {
        "queue": "gatherfeed-commands",
        "max_batch_size": 5,
        "max_batch_timeout": 10,
        "dead_letter_queue": "gatherfeed-commands-dlq"
      }
    ]
  }
}
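
To make the GatherFeed bindings concrete, here is a minimal sketch of a consumer that takes a command from gatherfeed-commands and emits an event to brand-events through EVENTS_QUEUE. The interfaces are local stand-ins for the Workers runtime types, the "gatherfeed" source label is a guess, and runResearch and newId are hypothetical callbacks injected for illustration. Only the binding name and the message fields come from this article.

```typescript
// Local stand-ins for the Workers runtime types (assumed shapes, not the real typings).
interface DomainMessage<T = unknown> {
  event_id: string;
  type: string;
  source: string;
  timestamp: string;
  payload: T;
}
interface QueueLike {
  send(body: DomainMessage): Promise<void>;
}
interface CommandMessage {
  body: DomainMessage<{ brand_slug: string; keyword: string }>;
  ack(): void;
  retry(): void;
}
interface GatherFeedEnv {
  EVENTS_QUEUE: QueueLike;
}

// Command in, event out: GatherFeed never calls Scalable Media directly.
async function handleResearchCommand(
  msg: CommandMessage,
  env: GatherFeedEnv,
  runResearch: (keyword: string) => Promise<string>, // hypothetical: resolves to a research_id
  newId: () => string // hypothetical ID generator, e.g. () => crypto.randomUUID()
): Promise<void> {
  try {
    const { brand_slug, keyword } = msg.body.payload;
    const research_id = await runResearch(keyword);
    await env.EVENTS_QUEUE.send({
      event_id: newId(),
      type: "research.completed",
      source: "gatherfeed", // assumed source label
      timestamp: new Date().toISOString(),
      payload: { brand_slug, keyword, research_id },
    });
    msg.ack(); // ack only after the event is safely enqueued
  } catch {
    msg.retry(); // leave the command on the queue for another attempt
  }
}
```

Injecting runResearch and newId keeps the handler testable without a Workers runtime; in a real consumer they would be the service's own research pipeline and crypto.randomUUID().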

Why This Topology Works

  1. No circular dependencies. SM produces to GF, PP, SG. They produce events back to the shared queue. SM consumes events. The flow is always: command out → event back.

  2. Each service is independently deployable. Updating GatherFeed’s research pipeline doesn’t require touching SM, PP, or SG. The queue contract (message schema) is the only integration point.

  3. Failure is isolated. If Pages-Plus goes down, publish-commands messages accumulate in the queue. GatherFeed and Social-Good keep working. When PP comes back, it processes the backlog automatically.

  4. Scaling is automatic. If SM fans out 100 keywords, gatherfeed-commands gets 100 messages. The queue then scales GatherFeed’s consumer up to as many as 250 concurrent invocations to work through the backlog.

  5. Cost is negligible. Even at 10,000 messages per day (a busy day), the queue cost is under $1/month. The external API calls (Brave, Workers AI, Gemini) dominate the cost by 100x.
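
Point 4 depends on the producer sending one message per keyword. A minimal sketch of that fan-out, assuming a sendBatch-style producer and the 100-messages-per-call limit: BatchQueueLike is a local stand-in for the binding type, ResearchCommand is a simplified message shape, and fanOutResearch is a hypothetical helper name.

```typescript
// Local stand-ins for the producer binding (assumed shapes, not the real typings).
interface SendRequest<T> {
  body: T;
}
interface BatchQueueLike<T> {
  sendBatch(messages: SendRequest<T>[]): Promise<void>;
}
// Simplified command shape for illustration
interface ResearchCommand {
  type: "research.requested";
  brand_slug: string;
  keyword: string;
}

const MAX_MESSAGES_PER_SEND_BATCH = 100;

// Sends one message per keyword, chunked to respect the per-call message limit.
// Returns the number of sendBatch() calls made.
async function fanOutResearch(
  queue: BatchQueueLike<ResearchCommand>,
  brandSlug: string,
  keywords: string[]
): Promise<number> {
  const requests: SendRequest<ResearchCommand>[] = keywords.map((keyword) => ({
    body: { type: "research.requested", brand_slug: brandSlug, keyword },
  }));
  let calls = 0;
  for (let i = 0; i < requests.length; i += MAX_MESSAGES_PER_SEND_BATCH) {
    await queue.sendBatch(requests.slice(i, i + MAX_MESSAGES_PER_SEND_BATCH));
    calls++;
  }
  return calls;
}
```

This sketch chunks by message count only. Payloads large enough to approach the 256 KB per-call limit would also need chunking by size.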


Platform Limits

Before designing your topology, know the limits. These are the documented constraints as of 2025:

| Limit | Value | Notes |
| --- | --- | --- |
| Max queues per account | 10,000 | More than enough for any architecture |
| Max message size | 128 KB | ~100 bytes overhead counted toward limit |
| Max batch size (send) | 100 messages OR 256 KB | Per sendBatch() call |
| Max batch size (consumer) | 100 messages | Per consumer invocation |
| Max batch timeout | 60 seconds | How long to wait for a full batch |
| Max retries | 100 | Per message |
| Throughput per queue | 5,000 messages/second | Per queue; add more queues to scale |
| Max backlog per queue | 25 GB | Far more than you’ll ever need |
| Consumer wall-clock time | 15 minutes | Maximum time to process a batch |
| Consumer CPU time | Up to 5 minutes | Configurable (default 30s) |
| Max concurrent consumers | 250 | Per queue, push-based only |
| Message retention | Up to 14 days | 24 hours on free plan |
| Message delay | Up to 24 hours | Both send-time and retry-time |
| DLQ retention (no consumer) | 4 days | Messages deleted after 4 days without consumer |
| Pull consumer visibility timeout | Up to 12 hours | Time before unacked message is redelivered |

Key insight: The 5,000 messages/second per-queue throughput means you should design around multiple queues rather than routing everything through one. Our topology uses 5 command/event queues plus 3 DLQs. Even our busiest queue (brand-events) is nowhere near the 5,000/s limit.
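
One limit from the table that is easy to guard in code is the 128 KB message size. A rough sketch, assuming JSON-serialized bodies, treating 128 KB conservatively as 128,000 bytes, and reserving the roughly 100 bytes of overhead noted above; the helper name and both constants are illustrative.

```typescript
const MAX_MESSAGE_BYTES = 128_000; // assumption: 128 KB read conservatively as 128,000 bytes
const OVERHEAD_BYTES = 100; // assumption: approximate per-message overhead from the table

// Returns true when the JSON-encoded body, plus overhead, fits in one queue message.
function fitsInQueueMessage(body: unknown): boolean {
  // Measure UTF-8 bytes rather than string length: multi-byte characters count extra.
  const bytes = new TextEncoder().encode(JSON.stringify(body)).length;
  return bytes + OVERHEAD_BYTES <= MAX_MESSAGE_BYTES;
}
```

A producer can call this before send() and, when it returns false, store the payload in D1 or R2 and enqueue only its ID.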


Comparisons

Cloudflare Queues vs. AWS SQS vs. GCP Pub/Sub vs. RabbitMQ vs. Kafka

| Feature | Cloudflare Queues | AWS SQS | GCP Pub/Sub | RabbitMQ | Kafka |
| --- | --- | --- | --- | --- | --- |
| Deployment | Zero-config (in wrangler.jsonc) | Console/CLI/IaC | Console/CLI/IaC | Self-hosted or managed | Self-hosted or managed |
| Delivery guarantee | At-least-once | At-least-once (Standard) or exactly-once (FIFO) | At-least-once | At-least-once or at-most-once | At-least-once (configurable) |
| Ordering | No ordering guarantee | FIFO queues: ordered within group | Ordered within partition | Queue-level ordering | Partition-level ordering |
| Message size | 128 KB | 256 KB (up to 2 GB via S3) | 10 MB | No hard limit (practical ~128 MB) | 1 MB default (configurable) |
| Retention | Up to 14 days | Up to 14 days | 31 days | Until consumed | Configurable (forever possible) |
| Consumer model | Push (1 per queue) or Pull (HTTP) | Pull-based | Push (subscription) or Pull | Push or Pull | Pull (consumer groups) |
| Multi-consumer | 1 push consumer per queue | 1 consumer group per queue | Multiple subscriptions | Multiple consumers with exchanges | Multiple consumer groups |
| DLQ | Built-in (1 line config) | Built-in | Built-in (dead-letter topic) | Built-in (per-queue) | Manual (via consumer logic) |
| Auto-scaling | Up to 250 concurrent consumers | Unlimited (Lambda trigger) | Unlimited | Manual | Consumer group rebalancing |
| Batching | Up to 100 messages | Up to 10 messages | Up to 1,000 messages | Prefetch count | Configurable fetch size |
| Throughput | 5,000 msg/s per queue | Nearly unlimited | Nearly unlimited | ~50,000 msg/s (varies) | Millions msg/s per cluster |
| Pricing (per 1M msgs) | ~$1.20 (3 ops each) | ~$0.40 (standard) | ~$0.04 (per 10 KB units) | Self-hosted or $varies | Self-hosted or $varies |
| Egress fees | None | Yes ($0.09/GB+) | Yes ($0.12/GB) | N/A (self-hosted) | N/A (self-hosted) |
| Workers integration | Native (binding in wrangler.jsonc) | Lambda trigger or SDK | Cloud Functions or SDK | AMQP client | Client library |
| Best for | Workers-native architectures | AWS ecosystem, high scale | GCP ecosystem, pub/sub patterns | Flexible routing, protocols | Event streaming, log aggregation |

Honest Assessment

Cloudflare Queues is best when:

Cloudflare Queues is worst when:

Cloudflare Queues fills a specific niche: it’s the “just enough” messaging system for Workers-native architectures. It’s not trying to compete with Kafka’s throughput or Pub/Sub’s subscription model. It’s trying to be the simplest possible way to decouple Workers, and it succeeds at that.

When You’d Switch Away

| Trigger | Switch To | Why |
| --- | --- | --- |
| Need ordering guarantees | AWS SQS FIFO | FIFO queues guarantee message ordering within a group |
| Need multiple subscribers per event | GCP Pub/Sub | Native multi-subscription model |
| Need event replay/rewind | Kafka | Log-based storage; consumers can rewind to any offset |
| Throughput > 5,000 msg/s sustained | Kafka or Pub/Sub | Built for high-throughput streaming |
| Need complex routing rules | RabbitMQ | Exchanges, routing keys, topic matching |
| Need cross-cloud messaging | Any managed service | Queues is Cloudflare-only |

The “Just Enough” Argument

The 1-consumer-per-queue limitation is the most common objection to Cloudflare Queues. In systems like Pub/Sub or Kafka, you can have multiple independent subscribers to the same topic. In Cloudflare Queues, you work around this by having one consumer that routes events to multiple handlers (Pattern 5 above).

This is a deliberate trade-off. One consumer per queue means:

If you find yourself needing 5+ independent subscribers to the same event stream, you’ve outgrown Cloudflare Queues. That’s fine — switch to Pub/Sub or Kafka for that specific queue while keeping Cloudflare Queues for point-to-point command queues.
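
The workaround described here, one consumer that routes each event to several handlers, can be sketched as a dispatch table. The class name, handler names, and event shape below are hypothetical. Every handler registered for a type runs, and a failure in any of them surfaces as an error so the caller can retry the message, which is why each handler must be idempotent.

```typescript
// Simplified event shape for illustration
interface EventMessage {
  event_id: string;
  type: string;
  payload: unknown;
}
type Handler = (event: EventMessage) => Promise<void>;

class EventRouter {
  private handlers = new Map<string, Handler[]>();

  // Register an additional subscriber for an event type.
  on(type: string, handler: Handler): this {
    const existing = this.handlers.get(type) ?? [];
    this.handlers.set(type, [...existing, handler]);
    return this;
  }

  // Runs every subscriber for the event's type and returns how many ran.
  // Throws if any subscriber failed, so the caller can retry the message.
  async dispatch(event: EventMessage): Promise<number> {
    const subscribers = this.handlers.get(event.type) ?? [];
    const results = await Promise.allSettled(subscribers.map((handler) => handler(event)));
    const failures = results.filter((result) => result.status === "rejected");
    if (failures.length > 0) {
      throw new Error(`${failures.length} handler(s) failed for ${event.type}`);
    }
    return subscribers.length;
  }
}
```

In the queue handler this would be used as: await router.dispatch(msg.body), then msg.ack() on success or msg.retry() in the catch branch.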


Anti-Patterns

| Don’t | Do Instead | Why |
| --- | --- | --- |
| fetch(other_worker_url, { method: 'POST', body }) between Workers | env.QUEUE.send(message) | Synchronous calls couple services, cascade failures, prevent independent retry |
| Put full article HTML in queue messages | Put content_id in message, HTML in D1/R2 | Messages have a 128 KB limit; large payloads slow serialization; consumers should read latest state |
| Assume messages arrive in order | Design handlers to work regardless of order | Cloudflare Queues has no ordering guarantee |
| Process without checking event_id | Always check processed_events table first | At-least-once delivery means duplicates happen |
| Use batch.retryAll() when one message fails | Use per-message msg.ack() / msg.retry() | retryAll re-processes already-succeeded messages (wasteful, risks side effects) |
| Ack before processing | Process → record → ack (in that order) | If you ack first and crash, the message is lost forever |
| One giant queue for all message types | Separate command queues per service + shared events queue | Mixing concerns makes routing complex, tuning impossible, debugging painful |
| Ignore the DLQ | Configure DLQ for every queue, build a consumer for monitoring | Without DLQ monitoring, failed messages disappear silently |
| Set max_concurrency: 1 everywhere | Leave it on auto (default) | Auto-scaling handles backlog; setting to 1 creates bottlenecks |
| Retry malformed/invalid messages | Ack them and log the error | Malformed messages will never succeed; retrying wastes resources and fills the DLQ |
| Store processing state in messages | Store state in D1 or DO SQLite; messages are ephemeral signals | Messages are transient; state must survive message deletion |
| Poll for results after sending a command | React to the completion event when it arrives | Polling adds latency, cost, and complexity; events are the right pattern |
| Send a batch message with N keywords | Fan out: send N individual messages, one per keyword | Individual messages get independent retry, acknowledgment, and concurrency |
| Call external APIs directly from BrandAgent | Agent emits command → queue → consumer calls via API Mom | Agents should be decision-makers (control plane), not executors (data plane) |
| Use service bindings for business logic | Use service bindings for infrastructure (key lookup, auth); use queues for business logic | Service bindings create synchronous coupling, which is exactly what queues are designed to eliminate |
| Create deeply nested queue chains (A → B → C → D → E) | Keep chains shallow (max 3 hops); use correlation_id to trace | Deep chains are hard to debug, have compounding failure rates, and increase end-to-end latency |
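
Several rows in this table (per-message acknowledgment, acking malformed messages, and process-then-ack ordering) combine into one consumer loop. A sketch under stated assumptions: MessageLike mirrors the ack(), retry(), and attempts members used elsewhere in this article, while isValid and process are hypothetical callbacks supplied by the service.

```typescript
// Local stand-ins for the consumer-side runtime types (assumed shapes).
interface MessageLike<T> {
  body: T;
  attempts: number;
  ack(): void;
  retry(options?: { delaySeconds?: number }): void;
}
interface BatchLike<T> {
  messages: MessageLike<T>[];
}

async function consumeBatch<T>(
  batch: BatchLike<unknown>,
  isValid: (body: unknown) => body is T, // hypothetical schema check
  process: (body: T) => Promise<void> // hypothetical business logic
): Promise<{ acked: number; retried: number; dropped: number }> {
  const outcome = { acked: 0, retried: 0, dropped: 0 };
  for (const msg of batch.messages) {
    if (!isValid(msg.body)) {
      // Malformed messages can never succeed: log, ack, and move on.
      console.error("dropping malformed message", JSON.stringify(msg.body));
      msg.ack();
      outcome.dropped++;
      continue;
    }
    try {
      await process(msg.body); // process first...
      msg.ack(); // ...ack only after success
      outcome.acked++;
    } catch {
      msg.retry({ delaySeconds: 30 }); // retry this message only, not the whole batch
      outcome.retried++;
    }
  }
  return outcome;
}
```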

Migration Checklist: From Synchronous to Event-Driven

If you’re converting an existing synchronous architecture to event-driven:

  1. Identify all fetch() calls between Workers. Each one becomes a queue message.
  2. Create command queues — one per consuming service (<service>-commands).
  3. Add processed_events table to each service’s D1 schema.
  4. Implement queue consumer (async queue(batch)) with idempotency check.
  5. Replace fetch() with env.QUEUE.send() in the producing service.
  6. Add event emission — after completing work, emit a domain event to brand-events.
  7. Wire your orchestrator (BrandAgent or equivalent) to react to events.
  8. Add DLQ configuration to wrangler.jsonc for every command queue.
  9. Remove synchronous HTTP endpoints used for inter-service actions (keep read-only GETs).
  10. Test with duplicate messages: send the same event_id twice and verify idempotency.

-- Step 3: The processed_events table (same in every service)
CREATE TABLE IF NOT EXISTS processed_events (
  event_id TEXT PRIMARY KEY,
  type TEXT NOT NULL,
  source TEXT,
  processed_at TEXT NOT NULL
);

-- Index for cleanup queries
CREATE INDEX IF NOT EXISTS idx_processed_events_processed_at
ON processed_events (processed_at);
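
Steps 3, 4, and 10 fit together as a small wrapper around the handler. The sketch below hides the table behind a hypothetical ProcessedEventStore interface so it can run without D1; the in-memory class is a test double, and the SQL in the comments is one assumed way to back the same interface with the processed_events table.

```typescript
// Hypothetical abstraction over the processed_events table
interface ProcessedEventStore {
  has(eventId: string): Promise<boolean>;
  record(eventId: string, type: string, source: string): Promise<void>;
}
interface IncomingEvent {
  event_id: string;
  type: string;
  source: string;
}

// In-memory stand-in for local tests. A D1-backed store might run:
//   SELECT 1 FROM processed_events WHERE event_id = ?
//   INSERT OR IGNORE INTO processed_events (event_id, type, source, processed_at)
//     VALUES (?, ?, ?, ?)
class InMemoryProcessedEvents implements ProcessedEventStore {
  private seen = new Set<string>();
  async has(eventId: string) {
    return this.seen.has(eventId);
  }
  async record(eventId: string) {
    this.seen.add(eventId);
  }
}

// Check, then process, then record. The caller acks the message afterwards.
async function processOnce<E extends IncomingEvent>(
  store: ProcessedEventStore,
  event: E,
  handler: (event: E) => Promise<void>
): Promise<"processed" | "duplicate"> {
  if (await store.has(event.event_id)) return "duplicate";
  await handler(event);
  await store.record(event.event_id, event.type, event.source);
  return "processed";
}
```

Check-then-record is not atomic: two concurrent deliveries of the same event can both pass the check, so the handler's own writes should also tolerate a repeat.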

Advanced: Queue Patterns with Durable Objects

Durable Objects and Queues complement each other. DOs provide per-entity state and decision-making. Queues provide cross-service communication. The BrandAgent pattern combines both.

The DO as Event Reactor

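import { Agent } from "agents";

// Env, BrandState, DomainMessage, and ResearchCompletedPayload are assumed to be
// declared elsewhere in the project.
export class BrandAgent extends Agent<Env, BrandState> {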
  /**
   * The BrandAgent is a control plane entity. It:
   * 1. Receives events from the queue router
   * 2. Makes decisions based on its state
   * 3. Emits commands to downstream queues
   *
   * It never does heavy work itself — it delegates via queues.
   */

  async onResearchCompleted(
    event: DomainMessage<ResearchCompletedPayload>
  ) {
    const { brand_slug, research_id, keyword } = event.payload;

    // Decision: should we generate content for this keyword?
    const alreadyHasContent = await this.env.DB.prepare(
      "SELECT id FROM content WHERE brand_id = ? AND keyword = ? AND status != 'archived'"
    )
      .bind(this.state.brand_id, keyword)
      .first();

    if (alreadyHasContent) {
      console.log(`Skipping "${keyword}" — content already exists`);
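      // The research still finished, so release its pending slot before returning.
      this.setState({
        ...this.state,
        pending_research: Math.max(0, this.state.pending_research - 1),
      });
      return;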
    }

    // Decision: start content workflow
    await this.env.CONTENT_WORKFLOW.create({
      params: {
        brand_slug,
        keyword,
        research_id,
        template: "blog",
      },
    });

    this.setState({
      ...this.state,
      pending_research: Math.max(0, this.state.pending_research - 1),
      last_research_cycle: new Date().toISOString(),
    });
  }

  /**
   * Safeguard: don't auto-action if pipeline is busy.
   * This prevents overwhelming downstream services.
   */
  async canAutoAction(): Promise<boolean> {
    const pending = await this.env.DB.prepare(
      "SELECT COUNT(*) as count FROM content WHERE brand_id = ? AND status = 'draft'"
    )
      .bind(this.state.brand_id)
      .first<{ count: number }>();

    return (pending?.count ?? 0) < 5;
  }
}

Workflow Integration

Cloudflare Workflows provide durable multi-step execution. Combined with queues, they handle the “research → generate → publish” pipeline with automatic step-level retry:

import {
  WorkflowEntrypoint,
  type WorkflowEvent,
  type WorkflowStep,
} from "cloudflare:workers";

interface ContentWorkflowParams {
  brand_slug: string;
  keyword: string;
  research_id: string;
  template: string;
}

export class ContentWorkflow extends WorkflowEntrypoint<
  Env,
  ContentWorkflowParams
> {
  async run(event: WorkflowEvent<ContentWorkflowParams>, step: WorkflowStep) {
    const { brand_slug, keyword, research_id, template } = event.payload;

    // Step 1: Fetch research from GatherFeed (read-only API — OK to use fetch)
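    const research = await step.do("fetch-research", async () => {
      const response = await this.env.GATHERFEED.fetch(
        `https://gatherfeed/api/v1/research?topic=${encodeURIComponent(keyword)}&project=${encodeURIComponent(brand_slug)}`
      );
      // Throw on non-2xx so the step is retried instead of persisting an error body
      if (!response.ok) {
        throw new Error(`GatherFeed research lookup failed: ${response.status}`);
      }
      return response.json();
    });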

    // Step 2: Generate article with Workers AI
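    const content = await step.do("generate-article", async () => {
      const result = await this.env.AI.run("@cf/meta/llama-3.1-70b-instruct", {
        prompt: `Write an article about "${keyword}" for the ${brand_slug} brand...`,
      });
      // Text-generation models return { response: string }, so shape the output into
      // the { title, html } object that the later steps read.
      // Assumption: the keyword doubles as the title; swap in real title extraction.
      const html = (result as { response?: string }).response ?? "";
      return { title: keyword, html };
    });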

    // Step 3: Store in D1
    const contentId = await step.do("store-content", async () => {
      const id = crypto.randomUUID();
      await this.env.DB.prepare(
        "INSERT INTO content (id, brand_id, keyword, title, content_html, status) VALUES (?, ?, ?, ?, ?, 'draft')"
      )
        .bind(id, brand_slug, keyword, content.title, content.html)
        .run();
      return id;
    });

    // Step 4: Emit content.generated event (triggers BrandAgent fan-out)
    await step.do("emit-event", async () => {
      await this.env.EVENTS_QUEUE.send({
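        // Derive the ID from contentId so a retried step re-sends the same event_id
        // and downstream idempotency checks can deduplicate it
        event_id: `content.generated:${contentId}`,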
        type: "content.generated",
        source: "scalable-media",
        timestamp: new Date().toISOString(),
        payload: {
          brand_slug,
          content_id: contentId,
          keyword,
          word_count: content.html.split(/\s+/).length,
        },
      });
    });
  }
}

Each step.do() is durable — if the Worker crashes between steps, the workflow resumes from the last completed step. This is why you don’t need to build your own retry logic for multi-step processes.


Testing Queue Consumers Locally

Wrangler dev supports local queue testing. When you run wrangler dev, both the producer and consumer run in the same process, and messages flow through an in-memory queue.

# Run the producer and consumer in the same wrangler dev session
npx wrangler dev

# In another terminal, send a test message via the API
curl -X POST http://localhost:8787/v1/brands/llc-tax/research \
  -H "Content-Type: application/json" \
  -H "X-Service-Key: test-key" \
  -d '{"keywords": ["llc tax deductions"]}'

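For testing separate producer and consumer Workers locally, run both from a single wrangler dev session so they share the in-memory queue:

# One session, two Workers: pass each Worker's config with -c (producer first)
npx wrangler dev \
  -c ~/Work/scalable-media/wrangler.jsonc \
  -c ~/Work/gather-feed/wrangler.jsonc

# Two separate wrangler dev sessions each get their own in-memory queues,
# so messages don't cross Worker boundaries between them

Cloudflare's Queues documentation has listed wrangler dev --remote as unsupported for Queues, so check your Wrangler version before relying on it. For integration testing against real queues, deploy both Workers to a non-production environment and send test messages there:

# Deploy to a staging environment (assumes an "env.staging" block in wrangler.jsonc)
npx wrangler deploy --env staging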

Testing Idempotency

// Test: send the same message twice, verify it's processed only once
async function testIdempotency(env: Env) {
  const eventId = crypto.randomUUID();
  const message: DomainMessage = {
    event_id: eventId,
    type: "research.requested",
    source: "test",
    timestamp: new Date().toISOString(),
    payload: { brand_slug: "test-brand", keyword: "test keyword" },
  };

  // Send the same message twice
  await env.RESEARCH_QUEUE.send(message);
  await env.RESEARCH_QUEUE.send(message);

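  // Delivery is asynchronous, so wait for the consumer to drain both copies.
  // 15s covers the 10s max_batch_timeout configured on gatherfeed-commands.
  await new Promise((resolve) => setTimeout(resolve, 15_000));

  // event_id is the primary key, so this confirms the event was recorded once.
  // To prove the handler ran once, also count the rows the handler itself writes.
  const count = await env.DB.prepare(
    "SELECT COUNT(*) as c FROM processed_events WHERE event_id = ?"
  )
    .bind(eventId)
    .first<{ c: number }>();

  if (count?.c !== 1) {
    throw new Error(`Expected 1 processed_events row, found ${count?.c ?? 0}`);
  }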
}
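
Because queue delivery is asynchronous, a test like this has to wait for the consumer before asserting. A fixed delay is the simplest approach. A polling helper returns as soon as the condition holds and fails loudly on timeout. The sketch below is a generic helper, not part of Wrangler or the Workers runtime, and its name and default timings are illustrative.

```typescript
// Polls `check` until it resolves to true, or throws once `timeoutMs` has elapsed.
async function waitFor(
  check: () => Promise<boolean>,
  timeoutMs = 30_000,
  intervalMs = 500
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await check()) return;
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error(`Condition not met within ${timeoutMs} ms`);
}
```

In the idempotency test, the check callback would run the processed_events query and return true once the row for eventId exists.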

Monitoring and Observability

Built-in Observability

Cloudflare’s Workers observability captures queue consumer invocations automatically when you enable it:

{
  "observability": {
    "enabled": true,
    "head_sampling_rate": 1  // 1 = capture all invocations
  }
}

This gives you:

Custom Logging

Add structured logging to your queue consumers for debugging:

function logMessage(
  action: string,
  msg: Message<DomainMessage>,
  extra?: Record<string, unknown>
) {
  console.log(
    JSON.stringify({
      action,
      event_id: msg.body.event_id,
      type: msg.body.type,
      source: msg.body.source,
      correlation_id: msg.body.correlation_id,
      attempt: msg.attempts,
      queue_timestamp: msg.timestamp.toISOString(),
      processing_delay_ms: Date.now() - msg.timestamp.getTime(),
      ...extra,
    })
  );
}

// Usage in consumer:
for (const msg of batch.messages) {
  logMessage("received", msg);

  try {
    await processMessage(msg.body, env);
    logMessage("processed", msg, { success: true });
    msg.ack();
  } catch (err) {
    logMessage("failed", msg, {
      error: err instanceof Error ? err.message : String(err),
      will_retry: msg.attempts < 3,
    });
    msg.retry({ delaySeconds: 30 });
  }
}
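
The consumer above retries with a fixed 30-second delay. A common refinement is exponential backoff derived from msg.attempts. The sketch below is one way to compute it yourself, not something Queues does for you; it assumes attempts is 1 on the first delivery, and the base and cap values are illustrative.

```typescript
// Doubles the delay on each attempt, starting at `baseSeconds`, capped at `capSeconds`.
function retryDelaySeconds(attempts: number, baseSeconds = 30, capSeconds = 3600): number {
  const exponent = Math.max(0, attempts - 1); // assumed: attempts is 1 on first delivery
  return Math.min(capSeconds, baseSeconds * 2 ** exponent);
}

console.log(retryDelaySeconds(1)); // 30
console.log(retryDelaySeconds(3)); // 120
console.log(retryDelaySeconds(10)); // 3600 (capped)
```

In the catch branch this replaces the constant: msg.retry({ delaySeconds: retryDelaySeconds(msg.attempts) }).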

Queue Health Dashboard Query

Monitor your queue health by tracking the processed_events and dead_messages tables:

-- Messages processed in the last hour, by type
-- datetime(...) normalizes ISO-8601 values (with 'T' and 'Z') so the comparison
-- against datetime('now', ...) is not a raw string comparison
SELECT type, source, COUNT(*) as count
FROM processed_events
WHERE datetime(processed_at) > datetime('now', '-1 hour')
GROUP BY type, source
ORDER BY count DESC;

-- DLQ messages in the last 24 hours
SELECT queue, type, COUNT(*) as count
FROM dead_messages
WHERE datetime(failed_at) > datetime('now', '-1 day')
GROUP BY queue, type;

-- Processing throughput (messages per minute, last 30 minutes)
SELECT
  strftime('%H:%M', processed_at) as minute,
  COUNT(*) as messages
FROM processed_events
WHERE datetime(processed_at) > datetime('now', '-30 minutes')
GROUP BY minute
ORDER BY minute;

Design Principles Summary

  1. Services are autonomous. Each service owns its database, makes its own decisions, and runs on its own schedule. No service waits for another.

  2. Commands are directed, events are broadcast. A command says “do this” (message to a specific consumer). An event says “this happened” (anyone can observe).

  3. Databases are the integration layer. Services exchange references (IDs), not data. Data lives in D1/R2. Messages are signals, databases are state.

  4. Design for at-least-once. Every consumer is idempotent. Duplicate processing is harmless. This is not optional.

  5. No service knows its callers. GatherFeed doesn’t know SM asked for research. Services react to what’s in their queue, not who sent it.

  6. Async by default, sync only at boundaries. The only synchronous HTTP is user-facing APIs and infrastructure service bindings. All business logic communication is async via queues.

  7. Fan-out at the producer. One request becomes N independent messages. Each gets its own lifecycle.

  8. Keep chains shallow. Max 3 hops. If your chain is deeper, your services are doing too little per hop.

  9. Monitor the DLQ. Failed messages are data — they tell you what’s broken. Don’t ignore them.

  10. The cost argument is over. At $1.20 per million messages, the question is never “can we afford a queue?” It’s “does a queue make the architecture cleaner?”



