Org Status: 🟡 Dormant Cloudflare: N/A Last Audited: 2026-04-28
Event-Driven Architecture on Cloudflare Queues
Cloudflare Queues is the simplest way to connect Workers that don’t need to talk to each other in real time. You get at-least-once delivery, automatic retries, dead letter queues, and consumer auto-scaling — all configured in a single wrangler.jsonc file, with zero infrastructure to manage. This article shows how to design queue topologies for multi-service architectures, implement idempotent consumers, handle fan-out patterns, and avoid the mistakes that turn a clean event-driven system into an unreliable mess.
What you’ll learn:
- How Cloudflare Queues works: producers, consumers, batching, retries, and dead letter queues
- How to design queue topologies that connect multiple Workers without coupling them
- The fan-out pattern: one message triggers work across multiple downstream services
- Idempotency: why at-least-once delivery demands deduplication, and how to implement it with D1
- When to use service bindings vs. queues for Worker-to-Worker communication
- Consumer tuning: max_batch_size, max_batch_timeout, max_concurrency, error handling
- Dead letter queue design: monitoring, alerting, and message replay
- A real-world 8-queue topology connecting 5 Workers in production
The Problem
Most Workers architectures start synchronous. Worker A calls Worker B with fetch(). Worker B calls Worker C. Everything works until it doesn’t:
// The synchronous anti-pattern
export default {
async fetch(request: Request, env: Env) {
// Step 1: Research the keyword
const research = await fetch("https://gatherfeed.example.com/api/v1/research", {
method: "POST",
body: JSON.stringify({ keyword: "best budgeting apps" }),
});
// Step 2: Generate content from research
const content = await fetch("https://content-engine.example.com/generate", {
method: "POST",
body: JSON.stringify({ research: await research.json() }),
});
// Step 3: Publish the content
const published = await fetch("https://pages-plus.example.com/publish", {
method: "POST",
body: JSON.stringify({ content: await content.json() }),
});
// If any step fails, everything fails.
// If step 3 takes 45 seconds, the whole chain blocks.
// If you retry, you re-run steps 1 and 2 unnecessarily.
return new Response("Done");
},
};
The problems compound:
- Cascading failures. If GatherFeed is slow, every upstream caller blocks. One service’s problem becomes everyone’s problem.
- Wasted compute. When you retry a failed chain, you re-execute work that already succeeded. Research costs money (API calls to Brave, Perplexity, etc.). Regenerating it because the publish step failed is waste.
- Tight coupling. Worker A must know Worker B’s URL, its API contract, and its error semantics. Change B’s interface and A breaks.
- No backpressure. If you send 100 keywords for research simultaneously, GatherFeed gets 100 concurrent requests. It can’t say “slow down” — it just times out or drops requests.
- No visibility. When a message disappears into a fetch() chain, you have no idea where it is. Did research complete? Is generation running? Did publish fail? You need distributed tracing just to answer basic questions.
Event-driven architecture solves all of these by replacing synchronous call chains with asynchronous message passing. Each service owns an inbound queue. Each service processes messages at its own pace. Each service reports what happened by emitting events. No service waits for another.
The result: a system where GatherFeed being slow for 30 seconds doesn’t cascade to the content generator, the publisher, or the social poster. They each process their own queue independently, at their own pace, with their own retry logic.
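As a rough sketch of the asynchronous alternative, the entry point can enqueue a single command and return straight away instead of awaiting three downstream calls. The names `QueueLike`, `ResearchCommand`, and `acceptKeyword` are illustrative assumptions, and the ID generator is injected so the sketch runs outside the Workers runtime; in a Worker you would likely pass `() => crypto.randomUUID()`.

```typescript
// Minimal stand-in for a Queue producer binding (only the method used here).
interface QueueLike<Body> {
  send(body: Body): Promise<void>;
}

interface ResearchCommand {
  event_id: string;
  type: "research.requested";
  payload: { keyword: string };
}

// Accept the request, enqueue one command, and return immediately.
// Generation and publishing are triggered later by downstream events.
async function acceptKeyword(
  keyword: string,
  queue: QueueLike<ResearchCommand>,
  newId: () => string
): Promise<{ status: 202; event_id: string }> {
  const command: ResearchCommand = {
    event_id: newId(),
    type: "research.requested",
    payload: { keyword },
  };
  await queue.send(command);
  // 202 Accepted: the work is queued, not finished.
  return { status: 202, event_id: command.event_id };
}
```

The caller gets an `event_id` it can use to look up progress later, and a slow research step no longer holds the request open.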
Core Concepts
The Queue Primitive
A Cloudflare Queue is a durable message buffer between a producer (the Worker that writes messages) and a consumer (the Worker that reads them). Messages are stored durably — once a write succeeds, the message won’t be lost. Consumers receive messages in batches, process them, and acknowledge or retry each one individually.
// The Queue<Body> interface — what your producer binding gives you
interface Queue<Body = unknown> {
send(body: Body, options?: QueueSendOptions): Promise<void>;
sendBatch(
messages: Iterable<MessageSendRequest<Body>>,
options?: QueueSendBatchOptions
): Promise<void>;
}
interface QueueSendOptions {
contentType?: QueuesContentType; // "json" | "text" | "bytes" | "v8"
delaySeconds?: number; // delay delivery by up to 12 hours (43,200 seconds)
}
interface MessageSendRequest<Body = unknown> {
body: Body;
contentType?: QueuesContentType;
delaySeconds?: number;
}
The consumer side is a queue() handler exported from your Worker, alongside the usual fetch() handler:
// The consumer receives a MessageBatch
interface MessageBatch<Body = unknown> {
readonly queue: string;
readonly messages: readonly Message<Body>[];
ackAll(): void;
retryAll(options?: QueueRetryOptions): void;
}
interface Message<Body = unknown> {
readonly id: string; // unique message ID assigned by the queue
readonly timestamp: Date; // when the message was written
readonly body: Body; // your payload
readonly attempts: number; // how many times delivery has been attempted
ack(): void;
retry(options?: QueueRetryOptions): void;
}
interface QueueRetryOptions {
delaySeconds?: number; // delay before next attempt, up to 12 hours (43,200 seconds)
}
Key insight: A queue decouples when work happens from who requests it. The producer says “do this eventually.” The consumer decides when and how to do it. This is the fundamental shift from synchronous RPC to asynchronous messaging.
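To make the per-message acknowledgment concrete, here is a minimal consumer loop, assuming simplified stand-ins (`MessageLike`, `BatchLike`) for the interfaces above and a hypothetical `consumeBatch` helper. It leaves out the idempotency check that a production consumer needs.

```typescript
// Simplified stand-ins for the Message and MessageBatch shapes shown above.
interface MessageLike<Body> {
  readonly id: string;
  readonly body: Body;
  readonly attempts: number;
  ack(): void;
  retry(options?: { delaySeconds?: number }): void;
}

interface BatchLike<Body> {
  readonly queue: string;
  readonly messages: readonly MessageLike<Body>[];
}

// Settle each message on its own: ack on success, retry on failure.
// One bad message does not force the rest of the batch to be redelivered.
async function consumeBatch<Body>(
  batch: BatchLike<Body>,
  handle: (body: Body) => Promise<void>
): Promise<void> {
  for (const msg of batch.messages) {
    try {
      await handle(msg.body);
      msg.ack();
    } catch (err) {
      console.error(
        `[${batch.queue}] message ${msg.id} failed on attempt ${msg.attempts}:`,
        err
      );
      msg.retry({ delaySeconds: 30 });
    }
  }
}
```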
At-Least-Once Delivery
Cloudflare Queues guarantees at-least-once delivery. This means:
- Every message written to a queue will be delivered to the consumer at least once.
- A message might be delivered more than once — after a consumer crash, network timeout, or retry.
- Messages are never silently lost (once the write succeeds).
- Messages are not delivered in order. The documentation is explicit: “Queues does not guarantee that messages will be delivered to a consumer in the same order in which they are published.”
This has a critical implication: every consumer must be idempotent. Processing the same message twice must produce the same result as processing it once. This is not optional — it’s a fundamental requirement of at-least-once systems.
// What at-least-once means in practice:
// This message WILL be delivered. It MIGHT be delivered twice.
await env.RESEARCH_QUEUE.send({
event_id: crypto.randomUUID(),
type: "research.requested",
source: "scalable-media",
timestamp: new Date().toISOString(),
payload: { brand_slug: "llc-tax", keyword: "llc tax deductions" },
});
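The effect of a duplicate delivery is easy to see in a small in-memory simulation. This is only an illustration: `processedIds` and `articlesCreated` are hypothetical stand-ins for state that a real consumer would keep in durable storage such as D1.

```typescript
// In-memory stand-ins; a real consumer would keep both in durable storage.
const processedIds = new Set<string>();
let articlesCreated = 0;

interface DeliveredMessage {
  event_id: string;
  keyword: string;
}

// Naive handler: every delivery creates another article.
function handleNaively(_msg: DeliveredMessage): void {
  articlesCreated += 1;
}

// Idempotent handler: a repeated event_id is a no-op.
function handleIdempotently(msg: DeliveredMessage): void {
  if (processedIds.has(msg.event_id)) return;
  articlesCreated += 1;
  processedIds.add(msg.event_id);
}

// Simulate one message being delivered twice and report the side effects.
function deliverTwice(handler: (m: DeliveredMessage) => void): number {
  articlesCreated = 0;
  processedIds.clear();
  const msg: DeliveredMessage = { event_id: "evt-1", keyword: "llc tax deductions" };
  handler(msg); // first delivery
  handler(msg); // redelivery after a timeout or crash
  return articlesCreated;
}
```

`deliverTwice(handleNaively)` returns 2, while `deliverTwice(handleIdempotently)` returns 1.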
Commands vs. Events
There are two kinds of messages in an event-driven system. Conflating them leads to architectural confusion.
Commands are directed — they tell a specific service to do something:
// Command: "GatherFeed, research this keyword"
interface ResearchRequestedPayload {
brand_slug: string;
keyword: string;
priority: "high" | "normal" | "low";
}
// Sent to: gatherfeed-commands queue
// Consumer: GatherFeed (and only GatherFeed)
Events are broadcast — they announce that something happened, and any interested service can react:
// Event: "Research for this keyword is complete"
interface ResearchCompletedPayload {
brand_slug: string;
research_id: string;
keyword: string;
}
// Sent to: brand-events queue
// Consumers: anyone who cares (SM routes it to BrandAgent)
The naming convention makes this explicit:
| Message Type | Direction | Naming Pattern | Example |
|---|---|---|---|
| Command | One-to-one | <noun>.<verb> imperative, or <noun>.requested | research.requested, content.publish |
| Event | One-to-many | <noun>.<verb-past> descriptive | research.completed, content.published |
Key insight: Commands go to service-specific queues (gatherfeed-commands). Events go to a shared events queue (brand-events). This separation means you can add new event subscribers without touching the producer.
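One way to encode that separation is a small lookup that resolves a message type to a queue name. The mapping below is a hypothetical sketch: `COMMAND_QUEUES`, `isCommand`, and `queueNameFor` are illustrative names, and only the queue names come from this article.

```typescript
// Hypothetical mapping from command type to the owning service's inbound queue.
const COMMAND_QUEUES = {
  "research.requested": "gatherfeed-commands",
  "content.publish": "publish-commands",
} as const;

const EVENTS_QUEUE_NAME = "brand-events";

type CommandType = keyof typeof COMMAND_QUEUES;

function isCommand(type: string): type is CommandType {
  return Object.prototype.hasOwnProperty.call(COMMAND_QUEUES, type);
}

// Commands resolve to exactly one service queue; anything else is treated as an event.
function queueNameFor(type: string): string {
  return isCommand(type) ? COMMAND_QUEUES[type] : EVENTS_QUEUE_NAME;
}
```

Adding a subscriber for an existing event needs no change here, because events always resolve to the shared queue.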
The Message Envelope
Every message through every queue uses the same envelope. This makes routing, logging, and debugging consistent across the entire system.
interface DomainMessage<T = unknown> {
event_id: string; // UUID v4 — deduplication key
type: string; // dot-notation: "research.requested"
source: string; // emitting service: "scalable-media"
timestamp: string; // ISO 8601: "2026-03-11T10:00:00Z"
correlation_id?: string; // traces a chain of related messages
payload: T; // reference data — IDs, not full objects
}
The event_id is the most important field. It’s the deduplication key. When a consumer sees a message, it checks: “Have I already processed this event_id?” If yes, acknowledge and move on. If no, process it and record the event_id.
The correlation_id ties related messages together across services. When SM requests research for a brand cycle, every downstream message — the research command, the research completion event, the content generation, the publish — shares the same correlation_id. This is your distributed tracing without a tracing system.
// Helper function to create consistently-shaped messages
function createMessage<T>(
type: string,
source: string,
payload: T,
correlationId?: string
): DomainMessage<T> {
return {
event_id: crypto.randomUUID(),
type,
source,
timestamp: new Date().toISOString(),
correlation_id: correlationId,
payload,
};
}
Key insight: Keep payloads small — references (IDs), not full objects. The 128 KB message size limit exists, but you should aim for under 10 KB. Put the actual data in D1 or R2 and pass the ID in the message. This also means consumers can always fetch the latest state rather than working with a stale snapshot.
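A producer can check payload size before sending. The sketch below measures the UTF-8 length of the JSON encoding, which only approximates what the queue stores (the real figure depends on content type and metadata); `checkMessageSize` and the constant names are assumptions for illustration.

```typescript
const MAX_MESSAGE_BYTES = 128 * 1024; // hard limit cited above
const TARGET_MESSAGE_BYTES = 10 * 1024; // soft target suggested above

type SizeVerdict = "ok" | "over-target" | "over-limit";

// Measure the UTF-8 size of the JSON encoding and classify it.
function checkMessageSize(message: unknown): { bytes: number; verdict: SizeVerdict } {
  const bytes = new TextEncoder().encode(JSON.stringify(message)).length;
  if (bytes > MAX_MESSAGE_BYTES) return { bytes, verdict: "over-limit" };
  if (bytes > TARGET_MESSAGE_BYTES) return { bytes, verdict: "over-target" };
  return { bytes, verdict: "ok" };
}
```

An "over-target" verdict is a hint to move the data into D1 or R2 and send only its ID.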
Patterns
Pattern 1: The Idempotent Consumer
This is the foundational pattern. Every other pattern builds on it. Without idempotency, at-least-once delivery becomes at-least-once-and-maybe-duplicated execution.
When to use it: Every queue consumer. No exceptions.
import type { Env } from "./types";
import type { DomainMessage } from "./messages";
/**
* Queue consumer with idempotency check.
*
* Flow:
* 1. For each message in the batch, check if event_id exists in processed_events
* 2. If already processed → ack immediately (skip)
* 3. If new → process the message
* 4. Record event_id in processed_events
* 5. Ack the message
* 6. On failure → retry with exponential backoff
*/
export async function handleQueue(
batch: MessageBatch<DomainMessage>,
env: Env
): Promise<void> {
for (const msg of batch.messages) {
const { event_id, type } = msg.body;
// Step 1: Idempotency check — have we seen this message before?
const existing = await env.DB.prepare(
"SELECT event_id FROM processed_events WHERE event_id = ?"
)
.bind(event_id)
.first();
if (existing) {
// Already processed — acknowledge and skip
msg.ack();
continue;
}
try {
// Step 2: Route and process the message
await routeMessage(msg.body, env);
// Step 3: Record as processed (INSERT OR IGNORE for extra safety)
await env.DB.prepare(
`INSERT OR IGNORE INTO processed_events (event_id, type, source, processed_at)
VALUES (?, ?, ?, datetime('now'))`
)
.bind(event_id, type, msg.body.source)
.run();
// Step 4: Acknowledge — message will not be redelivered
msg.ack();
} catch (err) {
console.error(`Queue error processing ${type} (${event_id}):`, err);
// Step 5: Retry with delay — exponential backoff based on attempts
const delay = Math.min(30 * Math.pow(2, msg.attempts - 1), 600);
msg.retry({ delaySeconds: delay });
}
}
}
async function routeMessage(
message: DomainMessage,
env: Env
): Promise<void> {
switch (message.type) {
case "research.completed":
await handleResearchCompleted(message, env);
break;
case "content.generated":
await handleContentGenerated(message, env);
break;
case "content.published":
await handleContentPublished(message, env);
break;
default:
console.warn(`Unknown message type: ${message.type}`);
// Don't retry unknown types — they'll never succeed
}
}
The processed_events table is simple:
CREATE TABLE IF NOT EXISTS processed_events (
event_id TEXT PRIMARY KEY,
type TEXT NOT NULL,
source TEXT,
processed_at TEXT NOT NULL
);
-- Prune events older than 7 days (run periodically)
DELETE FROM processed_events
WHERE processed_at < datetime('now', '-7 days');
Gotchas:
- The idempotency check and the processing are not atomic. If the Worker crashes after processing but before recording the event_id, the message will be redelivered and processed again. This means your business logic must also be idempotent: use INSERT OR IGNORE, ON CONFLICT DO NOTHING, or check-before-write patterns.
- The 7-day prune window matters. Messages in the DLQ persist for 4 days. If you prune processed_events faster than that, a DLQ replay could cause duplicates. Keep the prune window longer than your DLQ retention.
- Don’t ack before processing. If you ack first and then crash, the message is lost forever. Always: process, record, ack, in that order.
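The prune-window rule can be enforced in code rather than left to convention. This sketch assumes minimal stand-ins (`DbLike`, `PreparedLike`) for the D1 binding and a hypothetical `pruneProcessedEvents` helper that you might call from a scheduled handler.

```typescript
// Minimal stand-ins for the parts of the D1 binding used here.
interface PreparedLike {
  bind(...values: unknown[]): PreparedLike;
  run(): Promise<unknown>;
}

interface DbLike {
  prepare(sql: string): PreparedLike;
}

const DLQ_RETENTION_DAYS = 4; // retention figure quoted above

// Delete old dedup records, refusing windows that a DLQ replay could outlive.
async function pruneProcessedEvents(db: DbLike, windowDays: number): Promise<void> {
  if (!Number.isInteger(windowDays) || windowDays <= DLQ_RETENTION_DAYS) {
    throw new Error(
      `Prune window must be a whole number of days greater than ${DLQ_RETENTION_DAYS}`
    );
  }
  await db
    .prepare("DELETE FROM processed_events WHERE processed_at < datetime('now', ?)")
    .bind(`-${windowDays} days`)
    .run();
}
```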
Pattern 2: Producer-Side Fan-Out
Fan-out is when one action triggers multiple downstream messages. The BrandAgent pattern demonstrates this: when a user requests research for 10 keywords, the agent sends 10 individual messages to the research queue — one per keyword.
When to use it: When a single request maps to N independent units of work that should be processed, retried, and acknowledged independently.
import { Agent } from "agents";
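import { createMessage, MessageTypes } from "./messages";
import type {
DomainMessage,
ResearchCompletedPayload,
ContentGeneratedPayload,
} from "./messages";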
import type { Env } from "./types";
interface BrandState {
brand_slug: string;
brand_id: string;
pending_research: number;
total_content_generated: number;
total_content_published: number;
}
export class BrandAgent extends Agent<Env, BrandState> {
/**
* Fan-out pattern: one request becomes N independent queue messages.
*
* Why fan-out at the producer instead of sending one batch message?
* 1. Each keyword retries independently — one failure doesn't block others
* 2. Consumer processes one keyword at a time — no batch timeout risk
* 3. Queue concurrency auto-scales — 10 messages can process in parallel
* 4. Each keyword has its own event_id — individual idempotency tracking
*/
async requestResearch(
keywords: string[],
priority: "high" | "normal" | "low" = "normal"
) {
const correlationId = `brand-cycle-${this.state.brand_slug}-${new Date().toISOString().slice(0, 10)}`;
// Fan-out: one message per keyword
for (const keyword of keywords) {
const message = createMessage(
MessageTypes.RESEARCH_REQUESTED,
"scalable-media",
{
brand_slug: this.state.brand_slug,
keyword,
priority,
},
correlationId
);
await this.env.RESEARCH_QUEUE.send(message);
}
// Update agent state — track how many we sent
this.setState({
...this.state,
pending_research: this.state.pending_research + keywords.length,
});
}
/**
* React to research completion — trigger content generation.
* This is the other side of fan-out: each completed keyword
* triggers its own generation workflow.
*/
async onResearchCompleted(event: DomainMessage<ResearchCompletedPayload>) {
const { brand_slug, research_id, keyword } = event.payload;
// Start a content generation workflow for this keyword
const instance = await this.env.CONTENT_WORKFLOW.create({
params: {
brand_slug,
keyword,
research_id,
template: "blog",
},
});
this.setState({
...this.state,
pending_research: Math.max(0, this.state.pending_research - 1),
});
}
/**
* After content is generated, fan-out to both publish and social queues.
* One generated article triggers two independent downstream actions.
*/
async onContentGenerated(event: DomainMessage<ContentGeneratedPayload>) {
const { brand_slug, content_id, keyword } = event.payload;
// Fan-out to publish queue
const publishMessage = createMessage(
MessageTypes.CONTENT_PUBLISH,
"scalable-media",
{
brand_slug,
content_id,
target: "pages-plus",
},
event.correlation_id
);
await this.env.PUBLISH_QUEUE.send(publishMessage);
// Fan-out to social queue
const socialMessage = createMessage(
MessageTypes.SOCIAL_CREATE,
"scalable-media",
{
brand_slug,
content_id,
article_context: {
title: `Article about ${keyword}`,
keyword,
meta_description: "",
key_points: [],
},
},
event.correlation_id
);
await this.env.SOCIAL_QUEUE.send(socialMessage);
this.setState({
...this.state,
total_content_generated: this.state.total_content_generated + 1,
});
}
}
Why fan-out at the producer, not the consumer?
You have two choices for where to split work:
- Producer-side fan-out (send N messages): The producer creates one message per unit of work. Each message is independent: its own event_id, its own retry lifecycle, its own acknowledgment.
- Consumer-side fan-out (send 1 message with N items): The producer sends one message with an array of keywords. The consumer loops over them. If any fails, the whole message retries, including the ones that already succeeded.
Producer-side fan-out is almost always better:
| Concern | Producer-side (N messages) | Consumer-side (1 message) |
|---|---|---|
| Independent retries | Each keyword retries independently | All keywords retry if any fails |
| Parallel processing | Queue auto-scales across messages | One consumer processes all sequentially |
| Idempotency | Each has unique event_id | One event_id for all — partial success is ambiguous |
| Batch timeout risk | None — each message is small | If processing N keywords exceeds timeout, batch fails |
| Message size | Each message is small (~1 KB) | Message grows with N (risk hitting 128 KB limit) |
Key insight: Fan-out at the producer means each unit of work gets its own lifecycle. This is the “control plane / data plane” split: the BrandAgent (control plane) decides what work to do and fans it out. GatherFeed (data plane) processes one keyword at a time without knowing about the batch.
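Producer-side fan-out does not have to mean one network call per message. The sketch below keeps one message per keyword but groups the sends with sendBatch. The limit of 100 messages per call is an assumption to check against the current Queues limits, and `BatchQueueLike`, `chunk`, and `sendAll` are illustrative names.

```typescript
// Minimal stand-in for the sendBatch side of a Queue producer binding.
interface BatchQueueLike<Body> {
  sendBatch(messages: Iterable<{ body: Body }>): Promise<void>;
}

// Assumed per-call limit for sendBatch; verify against the current Queues limits.
const MAX_MESSAGES_PER_BATCH = 100;

function chunk<T>(items: readonly T[], size: number): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Still one message per unit of work; only the number of send calls shrinks.
async function sendAll<Body>(
  queue: BatchQueueLike<Body>,
  bodies: readonly Body[]
): Promise<number> {
  let calls = 0;
  for (const group of chunk(bodies, MAX_MESSAGES_PER_BATCH)) {
    await queue.sendBatch(group.map((body) => ({ body })));
    calls += 1;
  }
  return calls;
}
```

Each message keeps its own event_id and retry lifecycle, so the comparison in the table above is unchanged.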
Pattern 3: The Autonomous Consumer
In a traditional request-response model, the consumer sends a response back to the caller. In an event-driven model, the consumer has no idea who called it. Instead, it processes the message, stores the result in its own database, and emits an event saying “this happened.”
When to use it: Every consumer that produces results other services care about.
import type { Env, DomainMessage } from "../types";
import {
ResearchRequestedSchema,
ResearchCompletedSchema,
type ResearchCompletedPayload,
} from "../types";
import { deepResearch } from "../lib/deep-research";
import { newId } from "../lib/id";
/**
* GatherFeed's queue consumer — the autonomous consumer pattern.
*
* GatherFeed doesn't know who sent the message. It doesn't care.
* It processes the keyword, stores the result in its own D1, and
* emits a research.completed event. Anyone can subscribe to that event.
*
* This is the "Eyes" service — it sees what's out there and reports back.
* It never asks "who wants to know?" It just publishes what it found.
*/
export async function handleResearchCommands(
batch: MessageBatch<DomainMessage>,
env: Env
): Promise<void> {
for (const msg of batch.messages) {
const { event_id, type } = msg.body;
// Idempotency check
const existing = await env.DB.prepare(
"SELECT event_id FROM processed_events WHERE event_id = ?"
)
.bind(event_id)
.first();
if (existing) {
msg.ack();
continue;
}
try {
if (type !== "research.requested") {
console.warn(`GatherFeed: unknown message type: ${type}`);
msg.ack(); // Don't retry messages we can't handle
continue;
}
// Validate the payload with Zod
const parsed = ResearchRequestedSchema.safeParse(msg.body.payload);
if (!parsed.success) {
console.error("GatherFeed: invalid payload", parsed.error);
msg.ack(); // Malformed messages will never succeed — don't retry
continue;
}
const { brand_slug, keyword } = parsed.data;
// Check if fresh research already exists (cost savings)
const cached = await env.DB.prepare(
`SELECT id FROM research
WHERE topic = ? AND project = ?
AND created_at > datetime('now', '-7 days')
LIMIT 1`
)
.bind(keyword, brand_slug)
.first<{ id: string }>();
let researchId: string;
if (cached) {
// Cache hit — no API cost
researchId = cached.id;
console.log(`GatherFeed: cached "${keyword}" (no API cost)`);
} else {
// Cache miss — call Brave Search + Workers AI
const researchData = await deepResearch(keyword, env);
researchId = newId();
await env.DB.prepare(
`INSERT INTO research (id, topic, title, content, source, tags, project, created_at, updated_at)
VALUES (?, ?, ?, ?, 'brave+workersai', ?, ?, datetime('now'), datetime('now'))`
)
.bind(
researchId,
keyword,
`Research: ${keyword}`,
JSON.stringify(researchData),
JSON.stringify(["auto-research", "v2-pipeline", brand_slug]),
brand_slug
)
.run();
}
// Emit completion event — anyone can subscribe
const completedPayload: ResearchCompletedPayload = {
brand_slug,
research_id: researchId,
keyword,
};
// Validate outbound event shape (catch bugs before they propagate)
ResearchCompletedSchema.parse(completedPayload);
const completedEvent: DomainMessage<ResearchCompletedPayload> = {
event_id: crypto.randomUUID(),
type: "research.completed",
source: "gatherfeed",
timestamp: new Date().toISOString(),
correlation_id: msg.body.correlation_id,
payload: completedPayload,
};
await env.EVENTS_QUEUE.send(completedEvent);
// Record original message as processed
await env.DB.prepare(
`INSERT OR IGNORE INTO processed_events (event_id, type, source, processed_at)
VALUES (?, ?, ?, datetime('now'))`
)
.bind(event_id, type, msg.body.source)
.run();
msg.ack();
} catch (err) {
console.error(`GatherFeed: error processing ${type} (${event_id}):`, err);
msg.retry({ delaySeconds: 30 });
}
}
}
Connection to Pattern 1: The autonomous consumer always includes the idempotency check. The two patterns are layered — idempotency is the base, autonomy is the behavior.
Connection to Pattern 2: The autonomous consumer emits events that trigger the next fan-out. GatherFeed emits research.completed, which the BrandAgent receives and fans out into content generation commands. The chain is: fan-out → autonomous processing → event emission → next fan-out.
Gotchas:
- Validate outbound events. If GatherFeed emits a malformed research.completed event, every downstream consumer fails. Use Zod or similar to validate outbound payloads before sending.
- Cache checks are part of idempotency. The 7-day cache check for existing research is a form of business-level idempotency: it prevents duplicate API calls even when the event_id is different. This saves real money.
- Ack malformed messages; don’t retry them. A message with an invalid payload will never succeed. Acking it means you skip it (correct). Retrying it means it fails max_retries times and goes to the DLQ (wasteful).
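The ack-versus-retry decision can be centralized by marking failures that no retry will fix. This is a sketch under assumptions: `permanentFailure`, `isPermanent`, and `settle` are hypothetical helpers, not part of the Queues API.

```typescript
// Hypothetical marker for failures that no retry can fix (bad payload, unknown type).
interface PermanentFailure extends Error {
  permanent: true;
}

function permanentFailure(message: string): PermanentFailure {
  return Object.assign(new Error(message), { permanent: true as const });
}

function isPermanent(err: unknown): err is PermanentFailure {
  return err instanceof Error && (err as Partial<PermanentFailure>).permanent === true;
}

interface SettleableMessage {
  ack(): void;
  retry(options?: { delaySeconds?: number }): void;
}

// Ack on success and on permanent failure; retry only when a later attempt could succeed.
async function settle(
  msg: SettleableMessage,
  work: () => Promise<void>
): Promise<"done" | "dropped" | "retried"> {
  try {
    await work();
    msg.ack();
    return "done";
  } catch (err) {
    if (isPermanent(err)) {
      console.error("Dropping message that can never succeed:", err.message);
      msg.ack();
      return "dropped";
    }
    msg.retry({ delaySeconds: 30 });
    return "retried";
  }
}
```

A validation step would throw `permanentFailure("invalid payload")`, while a timeout from an upstream API would surface as an ordinary error and be retried.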
Pattern 4: Queue Topology Design
A queue topology is the map of all queues, producers, and consumers in your system. Getting this right is an architectural decision that’s hard to change later.
When to use it: Before you write any code. Design the topology first.
The fundamental rule: each service owns its inbound command queue. No service writes to another service’s internal queue. Commands go from producer to the consumer’s queue. Events go to the shared events queue.
┌──────────────────────────────────────────────────────────────────────┐
│ Queue Topology │
│ │
│ ┌─────────────┐ gatherfeed-commands ┌─────────────┐ │
│ │ Scalable │ ─────────────────────────▶ │ GatherFeed │ │
│ │ Media (SM) │ │ (Eyes) │ │
│ │ [Brain] │ publish-commands ├─────────────┤ │
│ │ │ ─────────────────────────▶ │ Pages-Plus │ │
│ │ │ │ (Mouth) │ │
│ │ │ social-commands ├─────────────┤ │
│ │ │ ─────────────────────────▶ │ Social-Good │ │
│ │ │ │ (Hands) │ │
│ │ │ └─────────────┘ │
│ │ │ │
│ │ │ ◀─┐ brand-events │
│ │ │ │ (shared) ┌─────────────┐ │
│ └──────────────┘ ├────────────────────── │ GatherFeed │ │
│ ▲ │ └─────────────┘ │
│ │ ├────────────────────── Pages-Plus │
│ │ │ │
│ sm-commands └────────────────────── Social-Good │
│ (inbound) │
│ │
│ DLQs: sm-commands-dlq, gatherfeed-commands-dlq, │
│ brand-events-dlq, social-commands-dlq │
└──────────────────────────────────────────────────────────────────────┘
Here’s how each service declares its topology in wrangler.jsonc:
Scalable Media (the brain — produces to all, consumes from sm-commands and brand-events):
{
"queues": {
"producers": [
{ "queue": "gatherfeed-commands", "binding": "RESEARCH_QUEUE" },
{ "queue": "publish-commands", "binding": "PUBLISH_QUEUE" },
{ "queue": "brand-events", "binding": "EVENTS_QUEUE" },
{ "queue": "social-commands", "binding": "SOCIAL_QUEUE" }
],
"consumers": [
{
"queue": "sm-commands",
"max_batch_size": 10,
"max_batch_timeout": 5,
"dead_letter_queue": "sm-commands-dlq"
},
{
"queue": "brand-events",
"max_batch_size": 10,
"max_batch_timeout": 5,
"dead_letter_queue": "brand-events-dlq"
}
]
}
}
GatherFeed (the eyes — consumes research commands, produces events):
{
"queues": {
"producers": [
{ "queue": "brand-events", "binding": "EVENTS_QUEUE" }
],
"consumers": [
{
"queue": "gatherfeed-commands",
"max_batch_size": 5,
"max_batch_timeout": 10,
"dead_letter_queue": "gatherfeed-commands-dlq"
}
]
}
}
Pages-Plus (the mouth — consumes publish commands):
{
"queues": {
"producers": [
{ "queue": "brand-events", "binding": "EVENTS_QUEUE" }
],
"consumers": [
{
"queue": "publish-commands",
"max_batch_size": 10,
"max_batch_timeout": 5,
"dead_letter_queue": "publish-commands-dlq"
}
]
}
}
The complete topology for 5 services and 8 queues:
| Queue | Producer(s) | Consumer | DLQ | Purpose |
|---|---|---|---|---|
| gatherfeed-commands | SM | GatherFeed | gatherfeed-commands-dlq | Research requests (one keyword per message) |
| publish-commands | SM | Pages-Plus | publish-commands-dlq | Content publish commands |
| social-commands | SM | Social-Good | social-commands-dlq | Social post creation commands |
| sm-commands | External / API | SM | sm-commands-dlq | Inbound commands to SM |
| brand-events | SM, GF, PP, SG | SM | brand-events-dlq | Shared domain events (multi-producer) |
| sm-commands-dlq | Queue infra | (monitoring) | — | Failed SM commands |
| gatherfeed-commands-dlq | Queue infra | (monitoring) | — | Failed research commands |
| brand-events-dlq | Queue infra | (monitoring) | — | Failed event processing |
Gotchas:
- Cloudflare Queues supports only one push consumer per queue. You cannot have two Workers consuming from the same queue. If multiple services need the same event, route it through a single consumer that fans out (SM consumes brand-events and routes to the appropriate BrandAgent).
- Queue names are global within your account. Use a consistent naming convention: <service>-<purpose> for command queues, <domain>-events for event queues.
- DLQ queues are created automatically if they don’t exist when referenced in the config. But you should still explicitly create them and configure consumers for monitoring.
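Because the topology is hard to change later, it can help to keep it as data and lint it before deploying. The following is a hypothetical sketch: `QueueSpec` and `validateTopology` are illustrative, and the single `consumer` field mirrors the one-consumer-per-queue rule.

```typescript
interface QueueSpec {
  name: string;
  producers: string[];
  consumer: string; // a single field: one push consumer per queue
  dlq?: string;
}

// Return human-readable problems; an empty array means these checks passed.
function validateTopology(queues: QueueSpec[]): string[] {
  const problems: string[] = [];
  const names = new Set<string>();
  for (const q of queues) {
    if (names.has(q.name)) problems.push(`duplicate queue name: ${q.name}`);
    names.add(q.name);
  }
  for (const q of queues) {
    if (q.dlq === undefined) continue;
    if (q.dlq === q.name) {
      problems.push(`${q.name} uses itself as its DLQ`);
    } else if (!names.has(q.dlq)) {
      problems.push(`${q.name} references undeclared DLQ ${q.dlq}`);
    }
  }
  return problems;
}
```

Running a check like this in CI catches a DLQ that is referenced by a consumer but never declared or monitored.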
Pattern 5: Event Router (Single Consumer, Multiple Handlers)
Because Cloudflare Queues supports only one push consumer per queue, you need a router pattern when multiple message types flow through the same queue. The shared brand-events queue carries research.completed, content.generated, content.published, and potentially many more event types.
When to use it: Any queue that carries more than one message type — especially the shared events queue.
import type { Env } from "./types";
import type {
DomainMessage,
ResearchCompletedPayload,
ContentGeneratedPayload,
ContentPublishedPayload,
} from "./messages";
import { MessageTypes } from "./messages";
/**
* Event router for the brand-events queue.
*
* SM is the sole consumer of brand-events. It routes each event
* to the appropriate handler, which typically delegates to a BrandAgent.
*
* This is the "fan-in" counterpart to producer-side fan-out:
* multiple producers emit events → one router → per-brand agents.
*/
export async function handleQueue(
batch: MessageBatch<DomainMessage>,
env: Env
): Promise<void> {
for (const msg of batch.messages) {
const { event_id, type } = msg.body;
// Idempotency check (Pattern 1)
const existing = await env.DB.prepare(
"SELECT event_id FROM processed_events WHERE event_id = ?"
)
.bind(event_id)
.first();
if (existing) {
msg.ack();
continue;
}
try {
// Route by message type
await routeMessage(msg.body, env);
// Record as processed
await env.DB.prepare(
`INSERT OR IGNORE INTO processed_events (event_id, type, source, processed_at)
VALUES (?, ?, ?, datetime('now'))`
)
.bind(event_id, type, msg.body.source)
.run();
msg.ack();
} catch (err) {
console.error(`Queue error processing ${type} (${event_id}):`, err);
msg.retry({ delaySeconds: 30 });
}
}
}
async function routeMessage(
message: DomainMessage,
env: Env
): Promise<void> {
switch (message.type) {
case MessageTypes.RESEARCH_COMPLETED:
await handleResearchCompleted(
message as DomainMessage<ResearchCompletedPayload>,
env
);
break;
case MessageTypes.CONTENT_GENERATED:
await handleContentGenerated(
message as DomainMessage<ContentGeneratedPayload>,
env
);
break;
case MessageTypes.CONTENT_PUBLISHED:
await handleContentPublished(
message as DomainMessage<ContentPublishedPayload>,
env
);
break;
default:
// Log unknown types but don't retry — they'll never succeed
console.warn(`Unknown message type: ${message.type}`);
}
}
/**
* Route research.completed → BrandAgent.
* The agent decides what to do next (generate content, skip, etc.)
*/
async function handleResearchCompleted(
event: DomainMessage<ResearchCompletedPayload>,
env: Env
): Promise<void> {
const agent = getBrandAgent(event.payload.brand_slug, env);
await agent.onResearchCompleted(event);
}
/**
* Route content.generated → BrandAgent.
* Agent triggers publish + social fan-out (Pattern 2).
*/
async function handleContentGenerated(
event: DomainMessage<ContentGeneratedPayload>,
env: Env
): Promise<void> {
const agent = getBrandAgent(event.payload.brand_slug, env);
await agent.onContentGenerated(event);
}
/**
* Route content.published → update D1 + BrandAgent.
* Direct D1 update + agent state tracking.
*/
async function handleContentPublished(
event: DomainMessage<ContentPublishedPayload>,
env: Env
): Promise<void> {
const { content_id, published_url } = event.payload;
// Update content status in D1
await env.DB.prepare(
`UPDATE content SET status = 'published', published_at = datetime('now'), published_to = ? WHERE id = ?`
)
.bind(JSON.stringify([published_url]), content_id)
.run();
// Route to BrandAgent for state counter update
const agent = getBrandAgent(event.payload.brand_slug, env);
await agent.onContentPublished(event);
}
/**
* Get a BrandAgent Durable Object stub with name set.
* Agents SDK requires .setName() before RPC calls.
*/
function getBrandAgent(brandSlug: string, env: Env) {
const agentId = env.BRAND_AGENT.idFromName(brandSlug);
const stub = env.BRAND_AGENT.get(agentId) as any;
if (typeof stub.setName === "function") {
stub.setName(brandSlug);
}
return stub;
}
Connection to Pattern 4: The topology determines which queues exist. The router determines how messages within a queue are dispatched. Together, they form the messaging backbone.
Gotchas:
- Every new event type requires a new case. If you add content.archived events, you need to add a handler here. Forgetting this means the event is silently ignored (since the default case warns but doesn’t retry).
- The router is itself a single point of failure for events. If SM’s queue consumer crashes, no events get processed. This is acceptable because SM is the brain: if the brain is down, nothing useful happens anyway. But monitor it.
- Consider message type registries. As the system grows, you’ll want a centralized registry of all message types, their schemas, and which services produce/consume them.
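A typed handler map is one possible step toward such a registry. In this sketch, `KnownType`, `RoutedMessage`, and `createRouter` are assumed names; the point is that `Record<KnownType, Handler>` turns a forgotten handler into a compile-time error rather than a silent warning at runtime.

```typescript
// Hypothetical registry: every known message type must have a handler entry.
type KnownType = "research.completed" | "content.generated" | "content.published";

interface RoutedMessage {
  type: string;
  payload: unknown;
}

type Handler = (message: RoutedMessage) => Promise<void>;

// Record<KnownType, Handler> makes a missing handler a compile-time error.
function createRouter(handlers: Record<KnownType, Handler>) {
  return async function route(message: RoutedMessage): Promise<"handled" | "unknown"> {
    if (!Object.prototype.hasOwnProperty.call(handlers, message.type)) {
      // Types that are not registered are logged and skipped, as in the switch above.
      console.warn(`Unknown message type: ${message.type}`);
      return "unknown";
    }
    await handlers[message.type as KnownType](message);
    return "handled";
  };
}
```

Adding "content.archived" to `KnownType` then fails the build until a handler for it is supplied.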
Pattern 6: The Worker Export Shape
Every Worker that participates in the queue topology exports the same handlers: fetch, queue, and optionally scheduled. The shape is always the same.
When to use it: Every Worker with queue bindings.
import { Hono } from "hono";
import type { Env } from "./types";
import type { DomainMessage } from "./messages";
import { handleQueue } from "./queue";
// Re-export Durable Object and Workflow classes (required for their bindings)
export { BrandAgent } from "./agents/brand-agent";
export { ContentWorkflow } from "./agents/content-workflow";
const app = new Hono<{ Bindings: Env }>();
app.get("/health", (c) => c.json({ status: "ok" }));
// ... your routes here ...
export default {
// HTTP handler — serves your API
async fetch(
request: Request,
env: Env,
ctx: ExecutionContext
): Promise<Response> {
return app.fetch(request, env, ctx);
},
// Queue handler — processes messages from bound queues
async queue(
batch: MessageBatch<DomainMessage>,
env: Env
): Promise<void> {
await handleQueue(batch, env);
},
// Cron handler — triggers periodic work
async scheduled(
event: ScheduledEvent,
env: Env,
ctx: ExecutionContext
): Promise<void> {
// Example: weekly brand audits
const brands = await env.DB.prepare(
"SELECT id, slug FROM brands WHERE fqdn IS NOT NULL"
).all<{ id: string; slug: string }>(); // type the rows so brand.slug is a string
for (const brand of brands.results) {
try {
const agentId = env.BRAND_AGENT.idFromName(brand.slug);
const agent = env.BRAND_AGENT.get(agentId) as any;
if (typeof agent.setName === "function") agent.setName(brand.slug);
await agent.runAudit();
} catch (err) {
console.error(`Cron failed for ${brand.slug}:`, err);
}
}
},
};
The Env type binds everything together:
interface Env {
// D1 database
DB: D1Database;
// R2 storage
STORAGE: R2Bucket;
// Queue producer bindings
RESEARCH_QUEUE: Queue;
PUBLISH_QUEUE: Queue;
EVENTS_QUEUE: Queue;
SOCIAL_QUEUE: Queue;
// Durable Object bindings
BRAND_AGENT: DurableObjectNamespace;
// Workflow bindings
CONTENT_WORKFLOW: Workflow;
// Workers AI
AI: Ai;
// Service bindings (sync RPC — infrastructure only)
APIMOM: Fetcher;
// Secrets
SERVICE_KEY: string;
}
Key insight: The queue() handler runs in the same Worker as your fetch() handler. They share the same Env, the same D1 database, the same R2 bucket. This is powerful: your consumer has full access to your service’s state without any additional configuration.
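A Worker that consumes more than one queue (SM consumes both sm-commands and brand-events) still has a single queue() handler, so it needs to branch on the batch's queue name. The following is a minimal sketch of that dispatch. MessageLike and BatchLike are local stand-ins for the runtime's message and batch types, and the handler bodies are placeholders, not the article's real handlers.

```typescript
// Minimal stand-ins for the Workers runtime types, declared locally so the sketch runs anywhere.
interface MessageLike {
  body: unknown;
}
interface BatchLike {
  queue: string; // name of the queue this batch came from
  messages: readonly MessageLike[];
}

type BatchHandler = (batch: BatchLike) => string;

// Queue names match SM's consumer config in this article; the handler bodies are placeholders.
const batchHandlers: Record<string, BatchHandler> = {
  "sm-commands": (batch) => `commands:${batch.messages.length}`,
  "brand-events": (batch) => `events:${batch.messages.length}`,
};

function dispatchBatch(batch: BatchLike): string {
  const handler = Object.prototype.hasOwnProperty.call(batchHandlers, batch.queue)
    ? batchHandlers[batch.queue]
    : undefined;
  if (!handler) {
    // In a real queue() handler, an uncaught error fails the batch,
    // so unacknowledged messages are retried instead of being dropped.
    throw new Error(`No handler registered for queue "${batch.queue}"`);
  }
  return handler(batch);
}
```

Failing loudly on an unknown queue name is a deliberate choice here: a consumer binding added in wrangler.jsonc without a matching handler shows up as retries and DLQ traffic rather than as silently acknowledged messages.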
Pattern 7: Delayed Messages and Backoff
Cloudflare Queues supports delaying messages — both at send time and at retry time. This enables rate limiting, cooldown periods, and exponential backoff.
When to use it: Rate-limited APIs, cooldown between operations, gradually increasing retry delays.
// Delay at send time — schedule a message for later
await env.RESEARCH_QUEUE.send(
createMessage("research.requested", "scalable-media", {
brand_slug: "llc-tax",
keyword: "llc annual fees by state",
priority: "low",
}),
{
delaySeconds: 3600, // Process in 1 hour — low priority
}
);
// Batch send with per-message delays — stagger processing
const keywords = ["llc taxes", "s-corp vs llc", "llc operating agreement"];
await env.RESEARCH_QUEUE.sendBatch(
keywords.map((keyword, index) => ({
body: createMessage("research.requested", "scalable-media", {
brand_slug: "llc-tax",
keyword,
priority: "normal",
}),
delaySeconds: index * 60, // Stagger: 0s, 60s, 120s
}))
);
// Exponential backoff in the consumer
async function handleWithBackoff(
msg: Message<DomainMessage>,
env: Env
): Promise<void> {
try {
await processMessage(msg.body, env);
msg.ack();
} catch (err) {
// Exponential backoff: 30s, 60s, 120s, 240s, 480s, ...
// Capped at 600s (10 minutes) to avoid excessive delays
const delay = Math.min(30 * Math.pow(2, msg.attempts - 1), 600);
console.error(
`Attempt ${msg.attempts} failed, retrying in ${delay}s:`,
err
);
msg.retry({ delaySeconds: delay });
}
}
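To sanity-check the backoff schedule without deploying anything, the delay calculation can be pulled into a pure function. This is a small sketch of the same formula used in the consumer above; the function name and constants are illustrative.

```typescript
const BASE_DELAY_SECONDS = 30;
const MAX_DELAY_SECONDS = 600;

// attempts is 1 on the first delivery, matching msg.attempts in the consumer above.
function backoffDelay(attempts: number): number {
  const exponent = Math.max(attempts, 1) - 1;
  return Math.min(BASE_DELAY_SECONDS * Math.pow(2, exponent), MAX_DELAY_SECONDS);
}

// Delays for attempts 1 through 7.
const schedule = [1, 2, 3, 4, 5, 6, 7].map(backoffDelay);
console.log(schedule.join(", ")); // 30, 60, 120, 240, 480, 600, 600
```

Note that the cap only matters if max_retries is high enough to reach it. With max_retries set to 3, as in the consumer configs in this article, a message reaches the DLQ after only the first few delays.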
Rate limiting with delayed sends for external API calls:
/**
* Rate-limited fan-out: send N research requests with staggered delays
* to avoid overwhelming the Brave Search API.
*
* At 5 requests/second, 100 keywords take 20 seconds of staggered sends.
*/
async function rateLimitedFanOut(
keywords: string[],
brandSlug: string,
env: Env
): Promise<void> {
const REQUESTS_PER_SECOND = 5;
const DELAY_BETWEEN_BATCHES = 1; // 1 second
for (let i = 0; i < keywords.length; i += REQUESTS_PER_SECOND) {
const batch = keywords.slice(i, i + REQUESTS_PER_SECOND);
const batchDelay = Math.floor(i / REQUESTS_PER_SECOND) * DELAY_BETWEEN_BATCHES;
await env.RESEARCH_QUEUE.sendBatch(
batch.map((keyword) => ({
body: createMessage("research.requested", "scalable-media", {
brand_slug: brandSlug,
keyword,
priority: "normal",
}),
delaySeconds: batchDelay,
}))
);
}
}
Key insight: Delayed sends are a primitive form of backpressure. The producer controls the pace by staggering delivery times. This is simpler than implementing backpressure in the consumer, because the queue handles the timing.
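The stagger arithmetic is easy to get off by one, so it can help to compute the plan separately from the sends. The sketch below is one possible shape: planStaggeredSends and its maxDelaySeconds parameter are hypothetical names, and the cap is passed in by the caller rather than hard-coded, since it should come from the platform's documented maximum delay.

```typescript
interface PlannedSend {
  keyword: string;
  delaySeconds: number;
}

// Assigns each keyword a delay so that at most `perSecond` messages become
// deliverable in any one-second slot.
function planStaggeredSends(
  keywords: readonly string[],
  perSecond: number,
  maxDelaySeconds: number
): PlannedSend[] {
  if (perSecond < 1 || Math.floor(perSecond) !== perSecond) {
    throw new RangeError("perSecond must be a positive integer");
  }
  return keywords.map((keyword, index) => {
    const delaySeconds = Math.floor(index / perSecond);
    if (delaySeconds > maxDelaySeconds) {
      throw new RangeError(
        `keyword ${index} would need a ${delaySeconds}s delay, above the ${maxDelaySeconds}s cap`
      );
    }
    return { keyword, delaySeconds };
  });
}
```

For 100 keywords at 5 per second, the delays run from 0 to 19 seconds, which matches the 20 seconds of staggered sends described above. Each planned entry maps directly onto one `{ body, delaySeconds }` element of a sendBatch call.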
Pattern 8: Multi-Producer Events
The brand-events queue accepts messages from multiple producers. This is the broadcast pattern — any service can announce what happened, and the central router (SM) dispatches to the right handler.
When to use it: Domain events that multiple services emit but one service needs to react to.
// GatherFeed emits research.completed
// In gatherfeed/src/consumers/research-commands.ts:
await env.EVENTS_QUEUE.send({
event_id: crypto.randomUUID(),
type: "research.completed",
source: "gatherfeed",
timestamp: new Date().toISOString(),
correlation_id: originalMessage.correlation_id,
payload: {
brand_slug: "llc-tax",
research_id: "r_001",
keyword: "llc tax deductions",
},
});
// Pages-Plus emits content.published
// In pages-plus/src/consumers/publish-commands.ts:
await env.EVENTS_QUEUE.send({
event_id: crypto.randomUUID(),
type: "content.published",
source: "pages-plus",
timestamp: new Date().toISOString(),
correlation_id: originalMessage.correlation_id,
payload: {
brand_slug: "llc-tax",
content_id: "c_001",
published_url: "https://llctax.co/blog/llc-tax-deductions",
},
});
// Social-Good emits social.posted
// In social-good/src/consumers/social-commands.ts:
await env.EVENTS_QUEUE.send({
event_id: crypto.randomUUID(),
type: "social.posted",
source: "social-good",
timestamp: new Date().toISOString(),
payload: {
brand_slug: "llc-tax",
content_id: "c_001",
platform: "twitter",
post_url: "https://x.com/llctax/status/123456",
},
});
// All three events land in the same brand-events queue.
// SM's queue consumer routes each to the appropriate handler.
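The key constraint is that all producers target the same queue, brand-events. The binding name (EVENTS_QUEUE here) is local to each Worker, but keeping it identical across services makes the code easier to share. Each service declares it in its wrangler.jsonc: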
// Every service that emits events has this:
{
"queues": {
"producers": [
{ "queue": "brand-events", "binding": "EVENTS_QUEUE" }
]
}
}
Gotchas:
- All producers must use the same envelope schema. If GatherFeed sends messages without event_id, SM’s idempotency check fails. The DomainMessage interface must be shared across all services.
- The source field is critical for debugging. When you see a failed message in the DLQ, source tells you which service emitted it. Always set it accurately.
- Order is not guaranteed across producers. GatherFeed might emit research.completed before SM finishes writing the brand to D1. Your handlers must be order-independent.
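Sharing a TypeScript interface only protects producers that compile against it. A runtime check at the consumer catches everything else. The guard below is a sketch under assumptions: DomainEnvelope mirrors the envelope fields visible in the examples in this article (the real DomainMessage definition may differ), and the function names are illustrative.

```typescript
// Assumed envelope shape, inferred from the producer examples in this article.
interface DomainEnvelope {
  event_id: string;
  type: string;
  source: string;
  timestamp: string;
  correlation_id?: string;
  payload: unknown;
}

function isNonEmptyString(value: unknown): value is string {
  return typeof value === "string" && value.length > 0;
}

// Runtime guard for messages arriving from any producer.
function isDomainEnvelope(value: unknown): value is DomainEnvelope {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  const { event_id, type, source, timestamp, correlation_id } = candidate;

  if (!isNonEmptyString(event_id)) return false;
  if (!isNonEmptyString(type)) return false;
  if (!isNonEmptyString(source)) return false;
  // The timestamp must be a string that Date.parse understands.
  if (!isNonEmptyString(timestamp) || isNaN(Date.parse(timestamp))) return false;
  // correlation_id is optional, but if present it must be a non-empty string.
  if (correlation_id !== undefined && !isNonEmptyString(correlation_id)) return false;
  return "payload" in candidate;
}
```

A message that fails this check will never succeed on retry, so it is a natural candidate for the permanent-failure path: log it with whatever source it carries and acknowledge it.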
Small Examples
Example 1: Sending a Single Message
// The simplest possible producer — one message, one queue
async function triggerResearch(keyword: string, brandSlug: string, env: Env) {
await env.RESEARCH_QUEUE.send({
event_id: crypto.randomUUID(),
type: "research.requested",
source: "scalable-media",
timestamp: new Date().toISOString(),
payload: {
brand_slug: brandSlug,
keyword,
priority: "normal",
},
});
}
Example 2: Batch Sending
// Send up to 100 messages in a single API call
// Each message can have its own delay
async function triggerBatchResearch(
keywords: string[],
brandSlug: string,
env: Env
) {
const messages = keywords.map((keyword) => ({
body: {
event_id: crypto.randomUUID(),
type: "research.requested",
source: "scalable-media",
timestamp: new Date().toISOString(),
payload: { brand_slug: brandSlug, keyword, priority: "normal" },
},
}));
// sendBatch handles up to 100 messages or 256 KB total
await env.RESEARCH_QUEUE.sendBatch(messages);
}
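Example 2 assumes the keyword list fits in one call. If it can exceed 100 entries, the list needs to be split first. The helper below is a generic sketch (the name chunk and the constant are illustrative); it only enforces the message-count limit, so very large payloads would still need a separate check against the 256 KB total.

```typescript
const MAX_MESSAGES_PER_BATCH = 100;

// Splits a list into consecutive groups of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (size < 1 || Math.floor(size) !== size) {
    throw new RangeError("size must be a positive integer");
  }
  const groups: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    groups.push(items.slice(i, i + size));
  }
  return groups;
}

// Usage inside a Worker (not executed here):
//   for (const group of chunk(messages, MAX_MESSAGES_PER_BATCH)) {
//     await env.RESEARCH_QUEUE.sendBatch(group);
//   }
```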
Example 3: Acknowledging vs. Retrying Individual Messages
// Per-message ack/retry within a batch
async function handleBatch(batch: MessageBatch<DomainMessage>, env: Env) {
for (const msg of batch.messages) {
try {
await processMessage(msg.body, env);
msg.ack(); // This message succeeded — remove from queue
} catch (err) {
if (isPermanentError(err)) {
// Malformed data, missing references — will never succeed
console.error(`Permanent failure for ${msg.body.event_id}:`, err);
msg.ack(); // Ack to prevent infinite retries — log for investigation
} else {
// Transient error — retry with backoff
msg.retry({ delaySeconds: 30 });
}
}
}
}
function isPermanentError(err: unknown): boolean {
if (err instanceof Error) {
return (
err.message.includes("not found") ||
err.message.includes("invalid payload") ||
err.message.includes("schema validation")
);
}
return false;
}
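Matching on error message text is fragile: a reworded message silently turns a permanent failure into an endless retry. If you control the code that throws, a marker on the error is sturdier. This is an alternative sketch, not the article's implementation; PermanentError and dispositionFor are hypothetical names.

```typescript
// Errors that will never succeed on retry carry an explicit marker.
class PermanentError extends Error {
  readonly permanent = true;
  constructor(message: string) {
    super(message);
    this.name = "PermanentError";
  }
}

// Checks the marker field rather than instanceof, so the test still works
// if two copies of the class end up in different bundles.
function isPermanentError(err: unknown): boolean {
  return (
    typeof err === "object" &&
    err !== null &&
    (err as { permanent?: unknown }).permanent === true
  );
}

type Disposition = "ack" | "retry";

// Permanent failures are acknowledged (and should be logged); everything else is retried.
function dispositionFor(err: unknown): Disposition {
  return isPermanentError(err) ? "ack" : "retry";
}
```

The processing code then throws `new PermanentError("invalid payload")` at the point where it knows the input is unrecoverable, and the consumer loop no longer needs to know any message strings.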
Example 4: ackAll / retryAll for Homogeneous Batches
// When all messages in a batch should be treated the same
async function handleHomogeneousBatch(
batch: MessageBatch<DomainMessage>,
env: Env
) {
try {
// Process all messages — if any fails, we retry all
for (const msg of batch.messages) {
await processMessage(msg.body, env);
}
batch.ackAll(); // All succeeded
} catch (err) {
// One failed — retry the whole batch
// Note: messages already acked individually are NOT retried
batch.retryAll({ delaySeconds: 30 });
}
}
Example 5: Correlation ID Chain
// Tracing a request across 4 services with correlation_id
// Step 1: SM creates the chain
const correlationId = `brand-cycle-llc-tax-2026-03-14`;
await env.RESEARCH_QUEUE.send({
event_id: crypto.randomUUID(),
type: "research.requested",
source: "scalable-media",
timestamp: new Date().toISOString(),
correlation_id: correlationId, // Born here
payload: { brand_slug: "llc-tax", keyword: "llc taxes" },
});
// Step 2: GatherFeed carries it forward
await env.EVENTS_QUEUE.send({
event_id: crypto.randomUUID(),
type: "research.completed",
source: "gatherfeed",
timestamp: new Date().toISOString(),
correlation_id: correlationId, // Propagated
payload: { brand_slug: "llc-tax", research_id: "r_001", keyword: "llc taxes" },
});
// Step 3: SM carries it to publish
await env.PUBLISH_QUEUE.send({
event_id: crypto.randomUUID(),
type: "content.publish",
source: "scalable-media",
timestamp: new Date().toISOString(),
correlation_id: correlationId, // Still the same chain
payload: { brand_slug: "llc-tax", content_id: "c_001", target: "pages-plus" },
});
// Now you can search logs for correlation_id to trace the full chain
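Copying correlation_id by hand at every hop is easy to forget. A small helper that derives the next message from the one being handled keeps the chain intact. This sketch assumes the envelope fields used in the examples above; deriveMessage and NextFields are hypothetical names, and the caller supplies event_id and timestamp (for example from crypto.randomUUID() and new Date().toISOString(), as in the article) so the helper stays pure.

```typescript
interface Envelope<P = unknown> {
  event_id: string;
  type: string;
  source: string;
  timestamp: string;
  correlation_id?: string;
  payload: P;
}

interface NextFields<P> {
  event_id: string;
  type: string;
  source: string;
  timestamp: string;
  payload: P;
}

// Builds the follow-up message for `parent`, carrying the correlation chain forward.
function deriveMessage<P>(parent: Envelope, next: NextFields<P>): Envelope<P> {
  return {
    event_id: next.event_id,
    type: next.type,
    source: next.source,
    timestamp: next.timestamp,
    // Assumption: when the parent has no correlation_id, its event_id starts the chain.
    correlation_id: parent.correlation_id ?? parent.event_id,
    payload: next.payload,
  };
}
```

Falling back to the parent's event_id is a design choice, not something the article prescribes. It means even messages from producers that never set correlation_id (like the social.posted example) become traceable from the first hop onward.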
Example 6: Consumer with CPU-Time Awareness
// Cloudflare Queue consumers have a 15-minute wall-clock limit
// and configurable CPU time (default 30s, max 5 min).
// For heavy processing, track elapsed wall-clock time and bail early.
// Date.now() only advances across I/O in Workers, so this does not measure CPU time.
async function handleHeavyBatch(
batch: MessageBatch<DomainMessage>,
env: Env
) {
const startTime = Date.now();
const MAX_PROCESSING_MS = 10 * 60 * 1000; // 10 minutes safe margin
for (const msg of batch.messages) {
// Check if we're running out of time
if (Date.now() - startTime > MAX_PROCESSING_MS) {
console.warn("Approaching time limit — retrying remaining messages");
msg.retry({ delaySeconds: 5 });
continue;
}
try {
await processMessage(msg.body, env);
msg.ack();
} catch (err) {
msg.retry({ delaySeconds: 30 });
}
}
}
Example 7: Typed Queue Bindings
// Type your queue bindings for compile-time safety
interface Env {
RESEARCH_QUEUE: Queue<DomainMessage<ResearchRequestedPayload>>;
PUBLISH_QUEUE: Queue<DomainMessage<ContentPublishPayload>>;
EVENTS_QUEUE: Queue<DomainMessage>; // multiple payload types
SOCIAL_QUEUE: Queue<DomainMessage<SocialCreatePayload>>;
}
// Now TypeScript catches payload mismatches at compile time
async function send(env: Env) {
// Correct — type matches
await env.RESEARCH_QUEUE.send({
event_id: crypto.randomUUID(),
type: "research.requested",
source: "scalable-media",
timestamp: new Date().toISOString(),
payload: {
brand_slug: "llc-tax",
keyword: "llc taxes",
priority: "normal",
},
});
// TypeScript error — wrong payload type for RESEARCH_QUEUE
// await env.RESEARCH_QUEUE.send({
// ...envelope,
// payload: { brand_slug: "llc-tax", content_id: "c_001" }
// });
}
Example 8: Processed Events Cleanup
// Periodically prune old processed events to prevent table bloat.
// Run this from a cron trigger or as part of a scheduled audit.
async function pruneProcessedEvents(env: Env): Promise<number> {
const result = await env.DB.prepare(
`DELETE FROM processed_events
WHERE processed_at < datetime('now', '-7 days')`
).run();
console.log(`Pruned ${result.meta.changes} processed events`);
return result.meta.changes ?? 0;
}
// In your scheduled handler:
async function scheduled(event: ScheduledEvent, env: Env) {
await pruneProcessedEvents(env);
}
Example 9: Pull Consumer for External Systems
// Cloudflare Queues also supports HTTP pull consumers.
// This lets any external system consume from a queue over HTTP.
// Useful for: monitoring dashboards, external processors, testing.
// Pulling messages (requires API token with Queues read+write permissions)
const response = await fetch(
`https://api.cloudflare.com/client/v4/accounts/${accountId}/queues/${queueId}/messages/pull`,
{
method: "POST",
headers: {
Authorization: `Bearer ${apiToken}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
batch_size: 10,
visibility_timeout_ms: 30000, // 30 seconds to process
}),
}
);
const { result } = await response.json();
// result.messages contains the pulled messages
// After processing, acknowledge them:
await fetch(
`https://api.cloudflare.com/client/v4/accounts/${accountId}/queues/${queueId}/messages/ack`,
{
method: "POST",
headers: {
Authorization: `Bearer ${apiToken}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
acks: result.messages.map((m: any) => ({
lease_id: m.lease_id,
})),
}),
}
);
Example 10: Sending Messages via HTTP (No Worker Required)
// As of May 2025, you can publish to queues via HTTP.
// No Worker producer needed — any system can send messages.
const response = await fetch(
`https://api.cloudflare.com/client/v4/accounts/${accountId}/queues/${queueId}/messages`,
{
method: "POST",
headers: {
Authorization: `Bearer ${apiToken}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
// With content_type "json", body is the JSON value itself, not a pre-serialized string
body: {
event_id: crypto.randomUUID(),
type: "research.requested",
source: "external-script",
timestamp: new Date().toISOString(),
payload: {
brand_slug: "llc-tax",
keyword: "llc formation costs",
priority: "low",
},
},
content_type: "json",
delay_seconds: 0,
}),
}
);
Service Bindings vs. Queues
Cloudflare offers two ways for Workers to communicate: service bindings (synchronous RPC) and queues (asynchronous messaging). Choosing the wrong one creates either unnecessary complexity or unnecessary coupling.
Service Bindings
A service binding is a direct, in-process call from one Worker to another. It uses the same fetch() API but bypasses the public internet — the call happens within Cloudflare’s network with near-zero latency.
// wrangler.jsonc — GatherFeed binds to API Mom
{
"services": [
{
"binding": "APIMOM",
"service": "apimom-router"
}
]
}
// Using a service binding — synchronous, in-process
const response = await env.APIMOM.fetch("https://apimom/v1/brave/search", {
method: "POST",
body: JSON.stringify({ query: "llc tax deductions" }),
});
const data = await response.json();
When to Use Which
| Criterion | Service Binding | Queue |
|---|---|---|
| Latency requirement | Need the result NOW (< 50ms) | Result can wait (seconds to minutes) |
| Coupling | Tight — caller blocks until response | Loose — fire and forget |
| Failure handling | Caller must handle errors inline | Queue retries automatically |
| Use case | Infrastructure: key lookup, auth, API proxy | Business logic: research, generate, publish |
| Direction | Request/response (sync) | Fire-and-forget (async) |
| Scaling | One-to-one | Queue auto-scales consumers |
| Example | GatherFeed → API Mom (fetch Brave API key) | SM → GatherFeed (research a keyword) |
The rule: Use service bindings for infrastructure concerns where you need the result immediately. Use queues for business logic where the work can happen independently.
// Service binding: GatherFeed calls API Mom to get a Brave search result
// This is infrastructure — GatherFeed needs the search result NOW to continue processing
const braveResponse = await env.APIMOM.fetch(
"https://apimom/v1/brave/web_search?q=" + encodeURIComponent(keyword),
{ headers: { "X-Service-Key": env.SERVICE_KEY } }
);
const searchResults = await braveResponse.json();
// Queue: SM tells GatherFeed to research a keyword
// This is business logic — SM doesn't need to wait for the research to complete
await env.RESEARCH_QUEUE.send(
createMessage("research.requested", "scalable-media", {
brand_slug: "llc-tax",
keyword,
priority: "normal",
})
);
Key insight: Service bindings are for “I need this value to continue.” Queues are for “this needs to happen eventually.” If you find yourself waiting for a queue response via polling, you probably want a service binding. If you find yourself building retry logic around a service binding, you probably want a queue.
The Hybrid Pattern
In practice, most services use both. GatherFeed uses a service binding to call API Mom (synchronous key proxy) while consuming from a queue (asynchronous research commands):
// GatherFeed's wrangler.jsonc — hybrid: service binding + queue
{
// Service binding for synchronous infrastructure
"services": [
{ "binding": "APIMOM", "service": "apimom-router" }
],
// Queues for asynchronous business logic
"queues": {
"producers": [
{ "queue": "brand-events", "binding": "EVENTS_QUEUE" }
],
"consumers": [
{
"queue": "gatherfeed-commands",
"max_batch_size": 5,
"max_batch_timeout": 10,
"dead_letter_queue": "gatherfeed-commands-dlq"
}
]
}
}
Consumer Tuning
The three consumer configuration parameters — max_batch_size, max_batch_timeout, and max_concurrency — interact in ways that aren’t obvious. Getting them right affects throughput, latency, and cost.
max_batch_size
The maximum number of messages delivered in a single batch. Default: 10. Range: 1-100.
- Higher values mean fewer consumer invocations (lower cost) but higher per-invocation latency.
- Lower values mean more invocations (higher cost) but lower processing latency per message.
- The queue delivers a batch as soon as max_batch_size is reached OR max_batch_timeout expires, whichever comes first.
max_batch_timeout
The maximum seconds to wait before delivering a batch (even if it’s not full). Default: 5. Range: 0-60.
- Higher values mean better batching efficiency (more messages per batch) but higher latency for the first message in the batch.
- Lower values mean faster processing but potentially many small batches (higher cost).
- Set to 0 for near-instant delivery (messages delivered as soon as they arrive).
max_concurrency
The maximum concurrent consumer invocations. Default: auto-scale up to 250. Range: 1-250.
- Auto (default): The queue automatically scales consumers based on backlog and error rates. This is almost always what you want.
- Set to 1: Forces serial processing. Use this when your consumer must process messages one at a time (e.g., rate-limited external APIs).
- Don’t set it unless you have a specific reason. Auto-scaling is smarter than you think.
Tuning Guide
| Scenario | max_batch_size | max_batch_timeout | max_concurrency | Why |
|---|---|---|---|---|
| Research (API calls) | 5 | 10 | Auto | Small batches — each msg calls external API, limit concurrency naturally |
| Publish (DB writes) | 10 | 5 | Auto | Medium batches — DB writes are fast, optimize for throughput |
| Events (routing) | 10 | 5 | Auto | Route quickly — events trigger downstream work |
| Bulk import | 100 | 30 | Auto | Max throughput — process as many as possible per invocation |
| Rate-limited API | 1 | 0 | 1 | Serial — one message at a time to respect rate limits |
| DLQ monitoring | 10 | 60 | 1 | Low priority — batch up errors for periodic review |
// Research consumer — small batches, longer timeout
// GatherFeed processes one keyword at a time, each takes 2-10 seconds
{
"queues": {
"consumers": [
{
"queue": "gatherfeed-commands",
"max_batch_size": 5,
"max_batch_timeout": 10,
"max_retries": 3,
"dead_letter_queue": "gatherfeed-commands-dlq"
}
]
}
}
// Event router — standard batching
// SM routes events to BrandAgents — fast in-memory routing
{
"queues": {
"consumers": [
{
"queue": "brand-events",
"max_batch_size": 10,
"max_batch_timeout": 5,
"max_retries": 3,
"dead_letter_queue": "brand-events-dlq"
}
]
}
}
The Cost Math
Cloudflare Queues charges per operation, where an operation is per 64 KB chunk of data written, read, or deleted.
- Pricing: $0.40 per million operations after the first 1 million free per month.
- Per-message cost: 3 operations (write + read + delete) = ~$0.0000012 per message.
- Per million delivered messages: ~$1.20.
- No egress fees. No bandwidth charges.
For context: 100,000 messages per month is 300,000 operations, which fits inside the free tier; at list price it would be about $0.12. Even at 10 million messages per month, you’re looking at $11.60. The cost is effectively zero for most architectures.
| Scale | Messages/Month | Operations | Monthly Cost |
|---|---|---|---|
| Hobby | 10,000 | 30,000 | Free (under 1M) |
| Small | 100,000 | 300,000 | Free (under 1M) |
| Medium | 1,000,000 | 3,000,000 | $0.80 |
| Large | 10,000,000 | 30,000,000 | $11.60 |
| Very large | 100,000,000 | 300,000,000 | $119.60 |
Key insight: At Cloudflare Queues pricing, the question is never “can we afford another queue?” It’s “does another queue make the architecture cleaner?” The answer is almost always yes.
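The table above follows from three numbers: 3 operations per message, 1 million free operations per month, and $0.40 per million after that. The sketch below reproduces it. The function names are illustrative, and it deliberately ignores retries, which add further read operations.

```typescript
const PRICE_PER_MILLION_OPS_USD = 0.4;
const FREE_OPS_PER_MONTH = 1_000_000;
const CHUNK_BYTES = 64 * 1024; // an operation is billed per 64 KB chunk

// Write + read + delete, each billed once per 64 KB chunk of the message.
function operationsPerMessage(messageBytes: number): number {
  const chunks = Math.max(1, Math.ceil(messageBytes / CHUNK_BYTES));
  return 3 * chunks;
}

// Estimated monthly cost in USD, rounded to cents. Retries are not modelled.
function monthlyCostUsd(messages: number, messageBytes: number = 1024): number {
  const operations = messages * operationsPerMessage(messageBytes);
  const billable = Math.max(0, operations - FREE_OPS_PER_MONTH);
  const cost = (billable / 1_000_000) * PRICE_PER_MILLION_OPS_USD;
  return Math.round(cost * 100) / 100;
}

console.log(monthlyCostUsd(100_000));     // 0 (inside the free tier)
console.log(monthlyCostUsd(1_000_000));   // 0.8
console.log(monthlyCostUsd(10_000_000));  // 11.6
console.log(monthlyCostUsd(100_000_000)); // 119.6
```

The one input that changes the picture is message size: a 100 KB message spans two chunks, so it costs 6 operations instead of 3. Keeping large artifacts in R2 and sending only a reference keeps every message at the 3-operation floor.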
Dead Letter Queues
A dead letter queue (DLQ) receives messages that fail processing after exhausting all retries. Without a DLQ, failed messages are permanently deleted. With a DLQ, failed messages are preserved for investigation, replay, or manual processing.
Configuration
Every command queue should have a DLQ. It’s one line in your wrangler.jsonc:
{
"queues": {
"consumers": [
{
"queue": "gatherfeed-commands",
"max_batch_size": 5,
"max_batch_timeout": 10,
"max_retries": 3, // Try 3 times
"dead_letter_queue": "gatherfeed-commands-dlq" // Then send here
}
]
}
}
If the DLQ queue doesn’t exist, Cloudflare creates it automatically. But you should create it explicitly and configure a consumer for monitoring.
DLQ Behavior
- A message fails processing (consumer throws an error or calls msg.retry()).
- The queue redelivers the message (up to max_retries times, default 3).
- After max_retries failures, the message is moved to the DLQ.
- DLQ messages persist for 4 days without an active consumer.
- After 4 days without a consumer, DLQ messages are permanently deleted.
DLQ Consumer for Monitoring
interface DLQMessage {
original_event_id: string;
type: string;
source: string;
timestamp: string;
error_context?: string;
payload: unknown;
}
/**
* DLQ consumer — monitors failed messages and alerts.
*
* Options for handling DLQ messages:
* 1. Log and alert (minimum viable DLQ)
* 2. Store in a "dead messages" D1 table for later analysis
* 3. Attempt automated repair and re-queue
* 4. Send to Telegram/Slack for human review
*/
export async function handleDLQ(
batch: MessageBatch<DomainMessage>,
env: Env
): Promise<void> {
for (const msg of batch.messages) {
const { event_id, type, source, payload } = msg.body;
// Log with full context
console.error(`DLQ: Failed message after ${msg.attempts} attempts`, {
event_id,
type,
source,
payload,
queue: batch.queue,
timestamp: msg.timestamp.toISOString(),
});
// Store in dead_messages table for investigation
await env.DB.prepare(
`INSERT OR IGNORE INTO dead_messages
(event_id, queue, type, source, payload, attempts, failed_at)
VALUES (?, ?, ?, ?, ?, ?, datetime('now'))`
)
.bind(
event_id,
batch.queue,
type,
source,
JSON.stringify(payload),
msg.attempts
)
.run();
// Alert via Telegram (using the coordinator)
try {
await notifyDLQ(env, {
queue: batch.queue,
event_id,
type,
source,
attempts: msg.attempts,
});
} catch {
// Don't let notification failures prevent acking
}
// Always ack DLQ messages — we've recorded them
msg.ack();
}
}
async function notifyDLQ(
env: Env,
details: {
queue: string;
event_id: string;
type: string;
source: string;
attempts: number;
}
): Promise<void> {
// Example: send alert via coordinator Telegram bot
const text = [
`[DLQ ALERT] Message failed after ${details.attempts} attempts`,
`Queue: ${details.queue}`,
`Type: ${details.type}`,
`Source: ${details.source}`,
`Event ID: ${details.event_id}`,
].join("\n");
await env.COORDINATOR.fetch("https://coordinator/send", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
machine: "dlq-monitor",
agent: "queue-monitor",
text,
}),
});
}
DLQ Schema
CREATE TABLE IF NOT EXISTS dead_messages (
event_id TEXT PRIMARY KEY,
queue TEXT NOT NULL,
type TEXT NOT NULL,
source TEXT,
payload TEXT, -- JSON string of the original payload
attempts INTEGER,
failed_at TEXT NOT NULL,
replayed_at TEXT, -- set when message is replayed
resolution TEXT -- 'replayed', 'discarded', 'fixed'
);
Message Replay
When you’ve fixed the bug that caused the failure, replay DLQ messages back to the original queue:
/**
* Replay failed messages from the dead_messages table back to their
* original queue. Only replays messages that haven't been replayed yet.
*/
async function replayDeadMessages(
queue: Queue,
queueName: string,
env: Env,
limit: number = 10
): Promise<number> {
const messages = await env.DB.prepare(
`SELECT event_id, type, source, payload
FROM dead_messages
WHERE queue = ? AND replayed_at IS NULL
ORDER BY failed_at ASC
LIMIT ?`
)
.bind(queueName, limit)
.all<{
event_id: string;
type: string;
source: string;
payload: string;
}>();
let replayed = 0;
for (const row of messages.results) {
// Re-send with a NEW event_id (old one is in processed_events)
const replayMessage: DomainMessage = {
event_id: crypto.randomUUID(),
type: row.type,
source: row.source,
timestamp: new Date().toISOString(),
correlation_id: `replay-${row.event_id}`,
payload: JSON.parse(row.payload),
};
await queue.send(replayMessage);
// Mark as replayed
await env.DB.prepare(
`UPDATE dead_messages SET replayed_at = datetime('now'), resolution = 'replayed'
WHERE event_id = ?`
)
.bind(row.event_id)
.run();
replayed++;
}
return replayed;
}
Key insight: When replaying DLQ messages, generate a new event_id. The old event_id may already be in processed_events (if an attempt recorded it before failing), in which case the idempotency check would skip the replay. A new event_id ensures the replayed message goes through normal processing.
Real-World Topology: The Brand Engine
This is the actual queue topology running in production across 5 Workers. It powers an autonomous content pipeline that researches keywords, generates articles, publishes to websites, and creates social media posts — all driven by queue messages with zero synchronous coupling.
The Flow
User Request
│
▼
┌─────────────┐
│ Scalable │ 1. BrandAgent receives keyword list
│ Media (SM) │ 2. Fan-out: one message per keyword → RESEARCH_QUEUE
│ │
│ BrandAgent │──────────── gatherfeed-commands ──────────────┐
│ (DO) │ │
│ │ ▼
│ │ ┌───────────┐
│ │ │ GatherFeed│
│ │ │ │
│ │ │ Brave API │
│ │ │ Workers AI│
│ │ ◀──── brand-events ──────────────────── │ │
│ │ (research.completed) └───────────┘
│ │
│ │ 3. Agent starts ContentWorkflow per keyword
│ │ 4. Workflow generates article via Workers AI
│ │ 5. Fan-out: publish + social messages
│ │
│ │──────────── publish-commands ──────────────┐
│ │ │
│ │──────────── social-commands ────────────┐ │
│ │ │ │
│ │ ◀──── brand-events ───────────────┐ │ │
│ │ (content.published) │ │ │
│ │ (social.posted) │ │ │
└──────────────┘ │ │ │
│ │ │
┌──────┘ │ │
│ │ │
│ ┌──────┘ │
│ │ │
│ │ ┌──────┘
│ │ │
│ ▼ ▼
┌──────────┐ ┌──────────┐
│Social- │ │Pages- │
│Good │ │Plus │
│ │ │ │
│X, IG, │ │Multi-site│
│LinkedIn │ │SSR │
└──────────┘ └──────────┘
Message Flow for “Research 3 Keywords”
Here’s the exact sequence of queue messages when SM processes 3 keywords for a brand:
Time Queue Message Type Source → Consumer
──── ───── ──────────── ────────────────
T+0 gatherfeed-commands research.requested SM → GF (keyword 1)
T+0 gatherfeed-commands research.requested SM → GF (keyword 2)
T+0 gatherfeed-commands research.requested SM → GF (keyword 3)
T+5s brand-events research.completed GF → SM (keyword 1)
T+8s brand-events research.completed GF → SM (keyword 3)
T+12s brand-events research.completed GF → SM (keyword 2)
↑ Note: out of order — this is expected
T+13s (ContentWorkflow starts for keyword 1)
T+14s (ContentWorkflow starts for keyword 3)
T+15s (ContentWorkflow starts for keyword 2)
T+45s brand-events content.generated SM → SM (keyword 1)
T+46s publish-commands content.publish SM → PP (keyword 1)
T+46s social-commands social.create SM → SG (keyword 1)
T+50s brand-events content.published PP → SM (keyword 1)
T+52s brand-events content.generated SM → SM (keyword 3)
T+53s publish-commands content.publish SM → PP (keyword 3)
T+53s social-commands social.create SM → SG (keyword 3)
T+57s brand-events content.published PP → SM (keyword 3)
T+60s brand-events content.generated SM → SM (keyword 2)
T+61s publish-commands content.publish SM → PP (keyword 2)
T+61s social-commands social.create SM → SG (keyword 2)
T+65s brand-events content.published PP → SM (keyword 2)
Total queue messages for 3 keywords: 18 messages in this trace (3 research.requested + 3 research.completed + 3 content.generated + 3 content.publish + 3 social.create + 3 content.published). At $0.0000012 per message, this costs about $0.0000216. The external API calls cost orders of magnitude more than the messaging.
The Configuration Files
All five services declare their part of the topology in wrangler.jsonc. Two of them are shown here:
Scalable Media (SM) — The Brain:
{
"name": "scalable-media-api",
"main": "src/index.ts",
"compatibility_flags": ["nodejs_compat"],
"ai": { "binding": "AI" },
"d1_databases": [
{
"binding": "DB",
"database_name": "scalable-media-db",
"database_id": "061fb3fb-7147-44fc-b99e-e99ee479fc51",
"migrations_dir": "migrations"
}
],
"r2_buckets": [
{ "binding": "STORAGE", "bucket_name": "scalable-media-data" }
],
"durable_objects": {
"bindings": [
{ "name": "BRAND_AGENT", "class_name": "BrandAgent" },
{ "name": "COORDINATOR_AGENT", "class_name": "CoordinatorAgent" }
]
},
"migrations": [
{ "tag": "v1", "new_sqlite_classes": ["BrandAgent"] },
{ "tag": "v2", "new_sqlite_classes": ["CoordinatorAgent"] }
],
"workflows": [
{
"name": "content-workflow",
"binding": "CONTENT_WORKFLOW",
"class_name": "ContentWorkflow"
}
],
"triggers": {
"crons": ["0 6 * * SUN"]
},
"queues": {
"producers": [
{ "queue": "gatherfeed-commands", "binding": "RESEARCH_QUEUE" },
{ "queue": "publish-commands", "binding": "PUBLISH_QUEUE" },
{ "queue": "brand-events", "binding": "EVENTS_QUEUE" },
{ "queue": "social-commands", "binding": "SOCIAL_QUEUE" }
],
"consumers": [
{
"queue": "sm-commands",
"max_batch_size": 10,
"max_batch_timeout": 5,
"dead_letter_queue": "sm-commands-dlq"
},
{
"queue": "brand-events",
"max_batch_size": 10,
"max_batch_timeout": 5,
"dead_letter_queue": "brand-events-dlq"
}
]
}
}
GatherFeed (GF) — The Eyes:
{
"name": "gatherfeed-api",
"main": "src/index.ts",
"compatibility_flags": ["nodejs_compat"],
"ai": { "binding": "AI" },
"d1_databases": [
{
"binding": "DB",
"database_name": "gatherfeed-db",
"database_id": "277bbcbe-b83c-4b52-8ce0-4d37ddaf5b10",
"migrations_dir": "migrations"
}
],
"r2_buckets": [
{ "binding": "STORAGE", "bucket_name": "gatherfeed-data" }
],
"services": [
{ "binding": "APIMOM", "service": "apimom-router" }
],
"queues": {
"producers": [
{ "queue": "brand-events", "binding": "EVENTS_QUEUE" }
],
"consumers": [
{
"queue": "gatherfeed-commands",
"max_batch_size": 5,
"max_batch_timeout": 10,
"dead_letter_queue": "gatherfeed-commands-dlq"
}
]
}
}
Why This Topology Works
- No circular dependencies. SM produces to GF, PP, SG. They produce events back to the shared queue. SM consumes events. The flow is always: command out → event back.
- Each service is independently deployable. Updating GatherFeed’s research pipeline doesn’t require touching SM, PP, or SG. The queue contract (message schema) is the only integration point.
- Failure is isolated. If Pages-Plus goes down, publish-commands messages accumulate in the queue. GatherFeed and Social-Good keep working. When PP comes back, it processes the backlog automatically.
- Scaling is automatic. If SM fans out 100 keywords, gatherfeed-commands gets 100 messages. The queue auto-scales GatherFeed’s consumer from 1 to up to 250 concurrent invocations to handle the backlog.
- Cost is negligible. Even at 10,000 messages per day (a busy day), the queue cost is under $1/month. The external API calls (Brave, Workers AI, Gemini) dominate the cost by 100x.
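To sanity-check the cost claim for your own volume, here is a rough estimator. It is only a sketch: the three constants are the pricing figures quoted elsewhere in this article ($0.40 per million operations, 1M free operations per month, roughly 3 operations per message) and should be treated as assumptions to verify against the current pricing page. The function name is hypothetical.

```typescript
// Pricing inputs quoted in this article; treat them as assumptions and
// re-check the current Queues pricing page before budgeting.
const PRICE_PER_MILLION_OPS = 0.4; // USD
const FREE_OPS_PER_MONTH = 1_000_000;
const OPS_PER_MESSAGE = 3; // roughly: one write, one read, one delete

/** Estimate the monthly Queues bill in USD for a steady daily message volume. */
function estimateMonthlyQueueCost(messagesPerDay: number, daysPerMonth = 30): number {
  const totalOps = messagesPerDay * daysPerMonth * OPS_PER_MESSAGE;
  const billableOps = Math.max(0, totalOps - FREE_OPS_PER_MONTH);
  return (billableOps / 1_000_000) * PRICE_PER_MILLION_OPS;
}
```

At 10,000 messages per day this works out to 900,000 operations a month, which stays inside the free allowance. Retries add extra read operations, so a queue with frequent failures will cost somewhat more than the estimate.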
Platform Limits
Before designing your topology, know the limits. These are the documented constraints as of 2025:
| Limit | Value | Notes |
|---|---|---|
| Max queues per account | 10,000 | More than enough for any architecture |
| Max message size | 128 KB | ~100 bytes overhead counted toward limit |
| Max batch size (send) | 100 messages OR 256 KB | Per sendBatch() call |
| Max batch size (consumer) | 100 messages | Per consumer invocation |
| Max batch timeout | 60 seconds | How long to wait for a full batch |
| Max retries | 100 | Per message |
| Throughput per queue | 5,000 messages/second | Per queue — add more queues to scale |
| Max backlog per queue | 25 GB | Far more than you’ll ever need |
| Consumer wall-clock time | 15 minutes | Maximum time to process a batch |
| Consumer CPU time | Up to 5 minutes | Configurable (default 30s) |
| Max concurrent consumers | 250 | Per queue, push-based only |
| Message retention | Up to 14 days | 24 hours on free plan |
| Message delay | Up to 12 hours | Both send-time and retry-time |
| DLQ retention (no consumer) | 4 days | Messages deleted after 4 days without consumer |
| Pull consumer visibility timeout | Up to 12 hours | Time before unacked message is redelivered |
Key insight: The 5,000 messages/second per-queue throughput means you should design around multiple queues rather than routing everything through one. Our topology uses 5 command/event queues plus 3 DLQs. Even our busiest queue (brand-events) is nowhere near the 5,000/s limit.
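The send-side limits matter most during fan-out, when one request turns into hundreds of messages. A minimal sketch of splitting a list of message bodies into groups that each fit a single sendBatch() call, assuming the limits in the table above are still current. The helper names are hypothetical, and the size check ignores the ~100 bytes of per-message overhead, so leave some headroom.

```typescript
// Limits copied from the table above; verify against the current Queues docs.
const MAX_MESSAGES_PER_BATCH = 100;
const MAX_BATCH_BYTES = 256 * 1024;
const MAX_MESSAGE_BYTES = 128 * 1024;

/** Serialized size of a JSON body in bytes (per-message overhead not included). */
function byteSize(body: unknown): number {
  return new TextEncoder().encode(JSON.stringify(body)).length;
}

/** Split bodies into groups that each respect the count and size limits. */
function chunkForSendBatch<T>(bodies: T[]): T[][] {
  const chunks: T[][] = [];
  let current: T[] = [];
  let currentBytes = 0;

  for (const body of bodies) {
    const size = byteSize(body);
    if (size > MAX_MESSAGE_BYTES) {
      throw new Error(`Message is ${size} bytes; store the payload in D1/R2 and send an ID instead`);
    }
    const wouldOverflow =
      current.length >= MAX_MESSAGES_PER_BATCH || currentBytes + size > MAX_BATCH_BYTES;
    if (wouldOverflow && current.length > 0) {
      chunks.push(current);
      current = [];
      currentBytes = 0;
    }
    current.push(body);
    currentBytes += size;
  }
  if (current.length > 0) chunks.push(current);
  return chunks;
}
```

In a producer you would then loop over the chunks and call something like `env.RESEARCH_QUEUE.sendBatch(chunk.map((body) => ({ body })))` for each one.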
Comparisons
Cloudflare Queues vs. AWS SQS vs. GCP Pub/Sub vs. RabbitMQ vs. Kafka
| Feature | Cloudflare Queues | AWS SQS | GCP Pub/Sub | RabbitMQ | Kafka |
|---|---|---|---|---|---|
| Deployment | Zero-config (in wrangler.jsonc) | Console/CLI/IaC | Console/CLI/IaC | Self-hosted or managed | Self-hosted or managed |
| Delivery guarantee | At-least-once | At-least-once (Standard) or exactly-once (FIFO) | At-least-once | At-least-once or at-most-once | At-least-once (configurable) |
| Ordering | No ordering guarantee | FIFO queues: ordered within group | Ordered per ordering key (opt-in) | Queue-level ordering | Partition-level ordering |
| Message size | 128 KB | 256 KB (up to 2 GB via S3) | 10 MB | No hard limit (practical ~128 MB) | 1 MB default (configurable) |
| Retention | Up to 14 days | Up to 14 days | 31 days | Until consumed | Configurable (forever possible) |
| Consumer model | Push (1 per queue) or Pull (HTTP) | Pull-based | Push (subscription) or Pull | Push or Pull | Pull (consumer groups) |
| Multi-consumer | 1 push consumer per queue | Competing consumers on one queue (fan-out via SNS) | Multiple subscriptions | Multiple consumers with exchanges | Multiple consumer groups |
| DLQ | Built-in (1 line config) | Built-in | Built-in (dead-letter topic) | Built-in (per-queue) | Manual (via consumer logic) |
| Auto-scaling | Up to 250 concurrent consumers | Unlimited (Lambda trigger) | Unlimited | Manual | Consumer group rebalancing |
| Batching | Up to 100 messages | Up to 10 messages | Up to 1,000 messages | Prefetch count | Configurable fetch size |
| Throughput | 5,000 msg/s per queue | Nearly unlimited | Nearly unlimited | ~50,000 msg/s (varies) | Millions msg/s per cluster |
| Pricing (per 1M msgs) | ~$1.20 (3 ops each) | ~$0.40 (standard) | ~$0.04 (per 10 KB units) | Self-hosted or $varies | Self-hosted or $varies |
| Egress fees | None | Yes ($0.09/GB+) | Yes ($0.12/GB) | N/A (self-hosted) | N/A (self-hosted) |
| Workers integration | Native (binding in wrangler.jsonc) | Lambda trigger or SDK | Cloud Functions or SDK | AMQP client | Client library |
| Best for | Workers-native architectures | AWS ecosystem, high scale | GCP ecosystem, pub/sub patterns | Flexible routing, protocols | Event streaming, log aggregation |
Honest Assessment
Cloudflare Queues is best when:
- Your producers and consumers are all Cloudflare Workers
- You want zero infrastructure management (no servers, no clusters, no tuning)
- Your throughput is moderate (under 5,000 msg/s per queue)
- You value simplicity over features (no FIFO, no exactly-once)
- You want predictable, low cost with no egress fees
Cloudflare Queues is worst when:
- You need strict message ordering (use SQS FIFO or Kafka)
- You need exactly-once processing guarantees (use SQS FIFO)
- You need multiple push consumers per queue (use Pub/Sub or Kafka consumer groups)
- You need millions of messages per second (use Kafka)
- You need messages larger than 128 KB (use SQS with S3 or Kafka)
- Your consumers aren’t on Cloudflare Workers (use any managed queue service)
Cloudflare Queues fills a specific niche: it’s the “just enough” messaging system for Workers-native architectures. It’s not trying to compete with Kafka’s throughput or Pub/Sub’s subscription model. It’s trying to be the simplest possible way to decouple Workers, and it succeeds at that.
When You’d Switch Away
| Trigger | Switch To | Why |
|---|---|---|
| Need ordering guarantees | AWS SQS FIFO | FIFO queues guarantee message ordering within a group |
| Need multiple subscribers per event | GCP Pub/Sub | Native multi-subscription model |
| Need event replay/rewind | Kafka | Log-based storage — consumers can rewind to any offset |
| Throughput > 5,000 msg/s sustained | Kafka or Pub/Sub | Built for high-throughput streaming |
| Need complex routing rules | RabbitMQ | Exchanges, routing keys, topic matching |
| Need cross-cloud messaging | Any managed service | Queues is Cloudflare-only |
The “Just Enough” Argument
The 1-consumer-per-queue limitation is the most common objection to Cloudflare Queues. In systems like Pub/Sub or Kafka, you can have multiple independent subscribers to the same topic. In Cloudflare Queues, you work around this by having one consumer that routes events to multiple handlers (Pattern 5 above).
This is a deliberate trade-off. One consumer per queue means:
- Simpler debugging (one place to look)
- No consumer group coordination overhead
- Deterministic message flow (you know exactly which Worker processes each queue)
- The router pattern is 20 lines of code — it’s not a burden
If you find yourself needing 5+ independent subscribers to the same event stream, you’ve outgrown Cloudflare Queues. That’s fine — switch to Pub/Sub or Kafka for that specific queue while keeping Cloudflare Queues for point-to-point command queues.
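For reference, the router workaround amounts to something like the sketch below. The DomainEvent shape and the createRouter name are hypothetical stand-ins for whatever your consumer already uses; the point is only that one consumer can dispatch an event type to several handlers.

```typescript
// Hypothetical message shape mirroring the fields used in this article.
interface DomainEvent {
  event_id: string;
  type: string;
  payload: Record<string, unknown>;
}

type Handler = (event: DomainEvent) => Promise<void> | void;

/** Build a router that passes each event to every handler registered for its type. */
function createRouter(routes: Record<string, Handler[]>) {
  return async function route(event: DomainEvent): Promise<number> {
    const handlers = routes[event.type] ?? [];
    // Run handlers one after another so a failure surfaces as a single thrown
    // error and the caller can retry the message.
    for (const handler of handlers) {
      await handler(event);
    }
    // 0 means nobody subscribes to this type; the caller can simply ack.
    return handlers.length;
  };
}
```

Because a retry re-runs every handler for that message, including the ones that already succeeded, each handler needs its own idempotency check.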
Anti-Patterns
| Don’t | Do Instead | Why |
|---|---|---|
| fetch(other_worker_url, { method: 'POST', body }) between Workers | env.QUEUE.send(message) | Synchronous calls couple services, cascade failures, prevent independent retry |
| Put full article HTML in queue messages | Put content_id in message, HTML in D1/R2 | Messages have a 128 KB limit; large payloads slow serialization; consumers should read latest state |
| Assume messages arrive in order | Design handlers to work regardless of order | Cloudflare Queues has no ordering guarantee |
| Process without checking event_id | Always check processed_events table first | At-least-once delivery means duplicates happen |
| Use batch.retryAll() when one message fails | Use per-message msg.ack() / msg.retry() | retryAll re-processes already-succeeded messages (wasteful, risks side effects) |
| Ack before processing | Process → record → ack (in that order) | If you ack first and crash, the message is lost forever |
| One giant queue for all message types | Separate command queues per service + shared events queue | Mixing concerns makes routing complex, tuning impossible, debugging painful |
| Ignore the DLQ | Configure DLQ for every queue, build a consumer for monitoring | Without DLQ monitoring, failed messages disappear silently |
| Set max_concurrency: 1 everywhere | Leave it on auto (default) | Auto-scaling handles backlog; setting to 1 creates bottlenecks |
| Retry malformed/invalid messages | Ack them and log the error | Malformed messages will never succeed — retrying wastes resources and fills the DLQ |
| Store processing state in messages | Store state in D1 or DO SQLite; messages are ephemeral signals | Messages are transient; state must survive message deletion |
| Poll for results after sending a command | React to the completion event when it arrives | Polling adds latency, cost, and complexity; events are the right pattern |
| Send a batch message with N keywords | Fan out: send N individual messages, one per keyword | Individual messages get independent retry, acknowledgment, and concurrency |
| Call external APIs directly from BrandAgent | Agent emits command → queue → consumer calls via API Mom | Agents should be decision-makers (control plane), not executors (data plane) |
| Use service bindings for business logic | Use service bindings for infrastructure (key lookup, auth); use queues for business logic | Service bindings create synchronous coupling — exactly what queues are designed to eliminate |
| Create deeply nested queue chains (A → B → C → D → E) | Keep chains shallow (max 3 hops); use correlation_id to trace | Deep chains are hard to debug, have compounding failure rates, and increase end-to-end latency |
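Three rows of the table (per-message ack/retry, ack only after processing, and acking malformed input) fit into one small handler. This is a sketch under assumptions: QueueMessage is a minimal local stand-in for the runtime's Message type, and handleOne, isValid, and work are hypothetical names.

```typescript
// Minimal stand-in for the runtime's Message type (assumption: only the
// members used here are declared).
interface QueueMessage {
  body: unknown;
  attempts: number;
  ack(): void;
  retry(options?: { delaySeconds?: number }): void;
}

type Outcome = "acked" | "dropped" | "retried";

/** Process one message and decide its fate independently of the rest of the batch. */
async function handleOne<T>(
  msg: QueueMessage,
  isValid: (body: unknown) => body is T,
  work: (body: T) => Promise<void>,
): Promise<Outcome> {
  const body = msg.body;
  if (!isValid(body)) {
    // Bad input will never succeed: log it and ack so it does not loop into the DLQ.
    console.error("Dropping malformed message", JSON.stringify(body));
    msg.ack();
    return "dropped";
  }
  try {
    await work(body);
    msg.ack(); // ack only after the work (and its idempotency record) is done
    return "acked";
  } catch (err) {
    console.error("Transient failure", err instanceof Error ? err.message : String(err));
    msg.retry({ delaySeconds: 30 }); // retry this message only
    return "retried";
  }
}
```

Inside `queue(batch, env)` you would call this once per entry in `batch.messages`, so one failing message never forces its neighbors to be re-processed.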
Migration Checklist: From Synchronous to Event-Driven
If you’re converting an existing synchronous architecture to event-driven:
1. Identify all fetch() calls between Workers. Each one becomes a queue message.
2. Create command queues, one per consuming service (<service>-commands).
3. Add processed_events table to each service’s D1 schema.
4. Implement queue consumer (async queue(batch)) with idempotency check.
5. Replace fetch() with env.QUEUE.send() in the producing service.
6. Add event emission: after completing work, emit a domain event to brand-events.
7. Wire your orchestrator (BrandAgent or equivalent) to react to events.
8. Add DLQ configuration to wrangler.jsonc for every command queue.
9. Remove synchronous HTTP endpoints used for inter-service actions (keep read-only GETs).
10. Test with duplicate messages: send the same event_id twice, verify idempotency.
-- Step 3: The processed_events table (same in every service)
CREATE TABLE IF NOT EXISTS processed_events (
event_id TEXT PRIMARY KEY,
type TEXT NOT NULL,
source TEXT,
processed_at TEXT NOT NULL
);
-- Index for cleanup queries
CREATE INDEX IF NOT EXISTS idx_processed_events_processed_at
ON processed_events (processed_at);
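The idempotency check from the checklist can be wrapped around any handler. The sketch below is one possible shape, not the article's exact consumer: EventEnvelope, ProcessedEventStore, D1Like, d1Store, and processOnce are hypothetical names, and D1Like declares only the D1 methods the sketch calls.

```typescript
// Hypothetical minimal shapes; the real types come from @cloudflare/workers-types.
interface EventEnvelope {
  event_id: string;
  type: string;
  source: string;
}

interface ProcessedEventStore {
  has(eventId: string): Promise<boolean>;
  record(event: EventEnvelope): Promise<void>;
}

interface D1Like {
  prepare(sql: string): {
    bind(...values: unknown[]): { first(): Promise<unknown>; run(): Promise<unknown> };
  };
}

/** Store backed by the processed_events table defined above. */
function d1Store(db: D1Like): ProcessedEventStore {
  return {
    async has(eventId) {
      const row = await db
        .prepare("SELECT 1 FROM processed_events WHERE event_id = ?")
        .bind(eventId)
        .first();
      return row != null;
    },
    async record(event) {
      // INSERT OR IGNORE keeps a concurrent duplicate from failing on the primary key.
      await db
        .prepare(
          "INSERT OR IGNORE INTO processed_events (event_id, type, source, processed_at) VALUES (?, ?, ?, datetime('now'))"
        )
        .bind(event.event_id, event.type, event.source)
        .run();
    },
  };
}

/** Run work at most once per event_id: check, process, record. The caller acks afterwards. */
async function processOnce<E extends EventEnvelope>(
  event: E,
  store: ProcessedEventStore,
  work: (event: E) => Promise<void>,
): Promise<"processed" | "duplicate"> {
  if (await store.has(event.event_id)) return "duplicate";
  await work(event);
  await store.record(event);
  return "processed";
}
```

The check and the record are separate statements, so two concurrent deliveries of the same message can both pass the check. Treat this as a filter for the common case and keep the work itself safe to repeat.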
Advanced: Queue Patterns with Durable Objects
Durable Objects and Queues complement each other. DOs provide per-entity state and decision-making. Queues provide cross-service communication. The BrandAgent pattern combines both.
The DO as Event Reactor
export class BrandAgent extends Agent<Env, BrandState> {
/**
* The BrandAgent is a control plane entity. It:
* 1. Receives events from the queue router
* 2. Makes decisions based on its state
* 3. Emits commands to downstream queues
*
* It never does heavy work itself — it delegates via queues.
*/
async onResearchCompleted(
event: DomainMessage<ResearchCompletedPayload>
) {
const { brand_slug, research_id, keyword } = event.payload;
// Decision: should we generate content for this keyword?
const alreadyHasContent = await this.env.DB.prepare(
"SELECT id FROM content WHERE brand_id = ? AND keyword = ? AND status != 'archived'"
)
.bind(this.state.brand_id, keyword)
.first();
if (alreadyHasContent) {
console.log(`Skipping "${keyword}" — content already exists`);
return;
}
// Decision: start content workflow
await this.env.CONTENT_WORKFLOW.create({
params: {
brand_slug,
keyword,
research_id,
template: "blog",
},
});
this.setState({
...this.state,
pending_research: Math.max(0, this.state.pending_research - 1),
last_research_cycle: new Date().toISOString(),
});
}
/**
* Safeguard: don't auto-action if pipeline is busy.
* This prevents overwhelming downstream services.
*/
async canAutoAction(): Promise<boolean> {
const pending = await this.env.DB.prepare(
"SELECT COUNT(*) as count FROM content WHERE brand_id = ? AND status = 'draft'"
)
.bind(this.state.brand_id)
.first<{ count: number }>();
return (pending?.count ?? 0) < 5;
}
}
Workflow Integration
Cloudflare Workflows provide durable multi-step execution. Combined with queues, they handle the “research → generate → publish” pipeline with automatic step-level retry:
import {
WorkflowEntrypoint,
type WorkflowEvent,
type WorkflowStep,
} from "cloudflare:workers";
interface ContentWorkflowParams {
brand_slug: string;
keyword: string;
research_id: string;
template: string;
}
export class ContentWorkflow extends WorkflowEntrypoint<
Env,
ContentWorkflowParams
> {
async run(event: WorkflowEvent<ContentWorkflowParams>, step: WorkflowStep) {
const { brand_slug, keyword, research_id, template } = event.payload;
// Step 1: Fetch research from GatherFeed (read-only API — OK to use fetch)
const research = await step.do("fetch-research", async () => {
const response = await this.env.GATHERFEED.fetch(
`https://gatherfeed/api/v1/research?topic=${encodeURIComponent(keyword)}&project=${brand_slug}`
);
return response.json();
});
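// Step 2: Generate article with Workers AI
const content = await step.do("generate-article", async () => {
const result = await this.env.AI.run("@cf/meta/llama-3.1-70b-instruct", {
prompt: `Write an article about "${keyword}" for the ${brand_slug} brand...`,
});
// Text-generation models return { response: string }, not { title, html }.
// Assumption: the keyword doubles as the title and the raw output is the body.
const html = (result as { response?: string }).response ?? "";
return { title: keyword, html };
});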
// Step 3: Store in D1
const contentId = await step.do("store-content", async () => {
const id = crypto.randomUUID();
await this.env.DB.prepare(
"INSERT INTO content (id, brand_id, keyword, title, content_html, status) VALUES (?, ?, ?, ?, ?, 'draft')"
)
.bind(id, brand_slug, keyword, content.title, content.html)
.run();
return id;
});
// Step 4: Emit content.generated event (triggers BrandAgent fan-out)
await step.do("emit-event", async () => {
await this.env.EVENTS_QUEUE.send({
event_id: crypto.randomUUID(),
type: "content.generated",
source: "scalable-media",
timestamp: new Date().toISOString(),
payload: {
brand_slug,
content_id: contentId,
keyword,
word_count: content.html.split(/\s+/).length,
},
});
});
}
}
Each step.do() is durable — if the Worker crashes between steps, the workflow resumes from the last completed step. This is why you don’t need to build your own retry logic for multi-step processes.
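One caveat follows from at-least-once semantics: if the emit-event step is ever re-run after a send that actually went through, a fresh crypto.randomUUID() gives the duplicate a new event_id, and downstream deduplication will not catch it. A possible refinement, sketched with hypothetical helper names, is to derive the ID from the event type and the entity it describes.

```typescript
interface ContentGeneratedPayload {
  brand_slug: string;
  content_id: string;
  keyword: string;
  word_count: number;
}

/** Same inputs always yield the same ID, so a re-sent event is recognized as a duplicate. */
function deterministicEventId(type: string, entityId: string): string {
  return `${type}:${entityId}`;
}

/** Build the content.generated message with a stable event_id. */
function buildContentGeneratedEvent(payload: ContentGeneratedPayload, now: Date = new Date()) {
  return {
    event_id: deterministicEventId("content.generated", payload.content_id),
    type: "content.generated",
    source: "scalable-media",
    timestamp: now.toISOString(),
    payload,
  };
}
```

This only works when an entity emits a given event type once. If the same content can be regenerated, add a version or revision number to the ID.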
Testing Queue Consumers Locally
Wrangler dev supports local queue testing. When you run wrangler dev, both the producer and consumer run in the same process, and messages flow through an in-memory queue.
# Run the producer and consumer in the same wrangler dev session
npx wrangler dev
# In another terminal, send a test message via the API
curl -X POST http://localhost:8787/v1/brands/llc-tax/research \
-H "Content-Type: application/json" \
-H "X-Service-Key: test-key" \
-d '{"keywords": ["llc tax deductions"]}'
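Separate producer and consumer Workers need more care. Two independent wrangler dev sessions each get their own in-memory queue, so a message sent in one session never reaches the consumer in the other. Recent Wrangler versions accept multiple -c flags, which runs both Workers in one session with a shared local queue:
# Run Scalable Media (producer) and GatherFeed (consumer) in one dev session
npx wrangler dev -c ~/Work/scalable-media/wrangler.jsonc -c ~/Work/gather-feed/wrangler.jsonc
For integration testing against real queues, do not rely on wrangler dev --remote: Cloudflare’s local development documentation has listed remote mode as unsupported for Queues. Deploying both Workers to a staging environment and sending test messages there is the safer route. Check the current Wrangler docs before depending on either behavior.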
Testing Idempotency
// Test: send the same message twice, verify it's processed only once
async function testIdempotency(env: Env) {
const eventId = crypto.randomUUID();
const message: DomainMessage = {
event_id: eventId,
type: "research.requested",
source: "test",
timestamp: new Date().toISOString(),
payload: { brand_slug: "test-brand", keyword: "test keyword" },
};
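// Send the same message twice
await env.RESEARCH_QUEUE.send(message);
await env.RESEARCH_QUEUE.send(message);
// Delivery is asynchronous: poll until the consumer has recorded the event
// (up to ~30 s, which covers the consumer's max_batch_timeout of 10 s).
let count: { c: number } | null = null;
for (let i = 0; i < 30 && !count?.c; i++) {
await new Promise((resolve) => setTimeout(resolve, 1000));
count = await env.DB.prepare(
"SELECT COUNT(*) as c FROM processed_events WHERE event_id = ?"
)
.bind(eventId)
.first<{ c: number }>();
}
// event_id is the primary key, so this confirms the event was recorded once.
// To prove the work itself ran once, also count the side effect it produces.
console.assert(count?.c === 1, "Message should be processed exactly once");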
}
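The same check can run as a plain unit test, with no queue and no wrangler session, by handing the consumer a fake batch that records what happened to each message. The sketch below assumes your handler only touches id, timestamp, body, attempts, ack(), and retry(). FakeMessage and fakeBatch are hypothetical helpers, not part of any Cloudflare package.

```typescript
/** In-memory stand-in for a queue message that records what the handler did with it. */
class FakeMessage<T> {
  readonly id: string;
  readonly timestamp: Date;
  readonly body: T;
  attempts: number;
  acked = false;
  retried = false;

  constructor(body: T, attempts = 1) {
    this.id = `fake-${Math.random().toString(36).slice(2)}`;
    this.timestamp = new Date();
    this.body = body;
    this.attempts = attempts;
  }

  ack(): void {
    this.acked = true;
  }

  retry(_options?: { delaySeconds?: number }): void {
    this.retried = true;
  }
}

/** Build a batch-like object; pass the same body twice to simulate a duplicate delivery. */
function fakeBatch<T>(queue: string, bodies: T[]) {
  return { queue, messages: bodies.map((body) => new FakeMessage(body)) };
}
```

Feed `fakeBatch("gatherfeed-commands", [message, message])` to your handler, then assert that the side effect happened once and that both messages ended up acked rather than retried.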
Monitoring and Observability
Built-in Observability
Cloudflare’s Workers observability captures queue consumer invocations automatically when you enable it:
{
"observability": {
"enabled": true,
"head_sampling_rate": 1 // 1 = capture all invocations
}
}
This gives you:
- Consumer invocation logs in the Cloudflare dashboard
- Error rates and latency metrics
- Automatic correlation of queue consumer invocations
Custom Logging
Add structured logging to your queue consumers for debugging:
function logMessage(
action: string,
msg: Message<DomainMessage>,
extra?: Record<string, unknown>
) {
console.log(
JSON.stringify({
action,
event_id: msg.body.event_id,
type: msg.body.type,
source: msg.body.source,
correlation_id: msg.body.correlation_id,
attempt: msg.attempts,
queue_timestamp: msg.timestamp.toISOString(),
processing_delay_ms: Date.now() - msg.timestamp.getTime(),
...extra,
})
);
}
// Usage in consumer:
for (const msg of batch.messages) {
logMessage("received", msg);
try {
await processMessage(msg.body, env);
logMessage("processed", msg, { success: true });
msg.ack();
} catch (err) {
logMessage("failed", msg, {
error: err instanceof Error ? err.message : String(err),
will_retry: msg.attempts < 3,
});
msg.retry({ delaySeconds: 30 });
}
}
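A fixed 30-second delay retries a struggling upstream at the same pace every time. One common alternative is exponential backoff driven by msg.attempts, sketched below. The helper name is hypothetical and the 12-hour ceiling is an assumption you should set from the current limits page.

```typescript
// Assumed ceiling for delaySeconds (12 hours); confirm against the limits page.
const MAX_DELAY_SECONDS = 43_200;

/**
 * Delay for the next retry: base, 2x base, 4x base, and so on, capped at the ceiling.
 * `attempts` is the value of msg.attempts, which starts at 1 on the first delivery.
 */
function retryDelaySeconds(attempts: number, baseSeconds = 30): number {
  const exponent = Math.max(0, attempts - 1);
  return Math.min(MAX_DELAY_SECONDS, baseSeconds * Math.pow(2, exponent));
}
```

In the consumer loop above this would replace the literal: `msg.retry({ delaySeconds: retryDelaySeconds(msg.attempts) })`.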
Queue Health Dashboard Query
Monitor your queue health by tracking the processed_events and dead_messages tables:
-- Messages processed in the last hour, by type
SELECT type, COUNT(*) as count, source
FROM processed_events
WHERE processed_at > datetime('now', '-1 hour')
GROUP BY type, source
ORDER BY count DESC;
-- DLQ messages in the last 24 hours
SELECT queue, type, COUNT(*) as count
FROM dead_messages
WHERE failed_at > datetime('now', '-1 day')
GROUP BY queue, type;
-- Processing throughput (messages per minute, last 30 minutes)
SELECT
strftime('%H:%M', processed_at) as minute,
COUNT(*) as messages
FROM processed_events
WHERE processed_at > datetime('now', '-30 minutes')
GROUP BY minute
ORDER BY minute;
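These queries compare processed_at, a TEXT column, against datetime('now', ...), which SQLite returns as "YYYY-MM-DD HH:MM:SS". If your consumer writes processed_at from JavaScript with toISOString(), the "T" separator sorts after a space, so same-day comparisons come out wrong. This section does not show how the column is written, so treat the following as a sketch for that case only. The helper name is hypothetical.

```typescript
/** Format a Date the way SQLite's datetime() does (UTC, second precision). */
function toSqliteDatetime(date: Date): string {
  // "2025-01-02T03:04:05.678Z" -> "2025-01-02 03:04:05"
  return date.toISOString().slice(0, 19).replace("T", " ");
}
```

Binding `toSqliteDatetime(new Date())` when inserting, or writing `datetime('now')` directly in the INSERT statement, keeps the stored values comparable with the dashboard queries.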
Design Principles Summary
- Services are autonomous. Each service owns its database, makes its own decisions, and runs on its own schedule. No service waits for another.
- Commands are directed, events are broadcast. A command says “do this” (message to a specific consumer). An event says “this happened” (anyone can observe).
- Databases are the integration layer. Services exchange references (IDs), not data. Data lives in D1/R2. Messages are signals, databases are state.
- Design for at-least-once. Every consumer is idempotent. Duplicate processing is harmless. This is not optional.
- No service knows its callers. GatherFeed doesn’t know SM asked for research. Services react to what’s in their queue, not who sent it.
- Async by default, sync only at boundaries. The only synchronous HTTP is user-facing APIs and infrastructure service bindings. All business logic communication is async via queues.
- Fan-out at the producer. One request becomes N independent messages. Each gets its own lifecycle.
- Keep chains shallow. Max 3 hops. If your chain is deeper, your services are doing too little per hop.
- Monitor the DLQ. Failed messages are data: they tell you what’s broken. Don’t ignore them.
- The cost argument is over. At $1.20 per million messages, the question is never “can we afford a queue?” It’s “does a queue make the architecture cleaner?”
References
Official Documentation
- Cloudflare Queues Overview — landing page with quick start and feature overview
- Getting Started with Cloudflare Queues — step-by-step setup guide
- Queue Configuration — wrangler.jsonc settings for producers and consumers
- JavaScript APIs — TypeScript types for Queue, MessageBatch, Message
- Batching, Retries, and Delays — max_batch_size, max_retries, delaySeconds behavior
- Dead Letter Queues — DLQ configuration, behavior, retention (4 days)
- Consumer Concurrency — auto-scaling, max_concurrency (1-250)
- Pull Consumers — HTTP-based consumption for external systems
- How Queues Works — at-least-once delivery, no ordering guarantee
- Queues Limits — 128 KB messages, 5,000 msg/s, 250 concurrent consumers
- Queues Pricing — $0.40 per million operations, 1M free/month
- Local Development — testing queues with wrangler dev
- Wrangler Commands for Queues — CLI reference for queue management
- Publish to Queues via HTTP — May 2025 feature: HTTP-based message publishing
- Pull Consumer Limits Increased — April 2025: 5,000 msg/s pull throughput
- Service Bindings — Worker-to-Worker synchronous RPC
- Cloudflare Workflows — durable multi-step execution for Workers
Architecture & Design
- Introducing Cloudflare Queues — Cloudflare blog post announcing Queues with design rationale
- Architecting on Cloudflare: Queues — Chapter 8 of the community guide to Cloudflare architecture patterns
- Cloudflare Queues: globally distributed queues without egress fees — InfoQ coverage of the Queues beta launch
Comparisons
- Amazon SQS Pricing — AWS SQS pricing details ($0.40/million standard, FIFO higher)
- Amazon SQS vs Google Cloud Pub/Sub — Ably’s comparison of managed queue services
- Kafka vs. JMS, RabbitMQ, SQS — 2025 Edition — comprehensive messaging comparison
- Google Cloud Pub/Sub vs Kafka vs RabbitMQ — StackShare community comparison
- Cloudflare’s alternative to Amazon SQS — community discussion on Cloudflare vs AWS messaging
Community & Troubleshooting
- Messages Not Moving to DLQ After Max Retries — Cloudflare community troubleshooting thread
- Running separate producer/consumer Workers locally — GitHub issue clarifying local dev with multiple Workers