Org Status: 🟡 Dormant | Cloudflare: N/A | Last Audited: 2026-04-28
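import { registerProcessor, ProcessorContext, ProcessorResult } from './registry'
registerProcessor('topic-researcher', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
// Check for work: pending research requests in this niche
const pending = await ctx.db.prepare(
"SELECT id, title FROM artifacts WHERE format = 'data' AND status = 'pending' AND niche = ?"
).bind(ctx.config.niche).all()
const results: ProcessorResult[] = []
for (const item of pending.results) {
const startTime = Date.now()
// Do the work
const research = await fetchResearch(item.title, ctx)
// Store the result: a new data artifact that records which request it answers
await ctx.db.prepare(
"INSERT INTO artifacts (id, format, status, title, tags, niche, created_by, source_ids, properties, created_at, updated_at) VALUES (?, 'data', 'ready', ?, ?, ?, 'topic-researcher', ?, ?, datetime('now'), datetime('now'))"
).bind(
crypto.randomUUID(),
item.title,
JSON.stringify(['topic', ctx.config.niche]),
ctx.config.niche,
JSON.stringify([item.id]),
JSON.stringify(research.findings),
).run()
// Retire the request so the next cycle doesn't research it again
await ctx.db.prepare(
"UPDATE artifacts SET status = 'archived', updated_at = datetime('now') WHERE id = ?"
).bind(item.id).run()
// Report what it actually cost
results.push({
processor: 'topic-researcher',
entity_type: 'artifact',
entity_id: item.id,
success: true,
cost_usd: research.cost,
tokens_input: research.tokensIn,
tokens_output: research.tokensOut,
duration_ms: Date.now() - startTime,
model_used: 'gemini-2.5-flash',
})
}
return results
})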
That’s a complete processor. It checks for work, does it, stores the result, reports cost. It doesn’t know what runs before it or after it. It doesn’t trigger anything. It just enriches the store.
- The Problem
- The Core Insight
- The 7 Processor Types
- The Artifact Store
- The Edges Table
- The Processor Registry
- Budget Management
- Adaptive Scheduling
- Composition Patterns
- Implementation on Cloudflare
- Practical Pipeline Examples
- Comparisons
- Anti-Patterns
- References
The Problem
AI pipelines are typically built as imperative chains:
research → generate → review → publish
Every step calls the next. This creates five problems that compound as the system grows.
Tight coupling. Step 2 knows step 1’s output shape. Change the researcher and the generator breaks.
Fragile chains. Step 3 fails and everything stops. No partial progress. Retry means re-running the whole chain.
No compounding. Each run starts fresh. A researcher that ran 100 times has the same effect as one that ran once — the output of the last run is all that matters.
Budget blindness. No cost control until you get the bill. LLM calls are fire-and-forget. One runaway generation burns through your monthly budget in an hour.
Temporal coupling. Everything must run at once. The researcher must finish before the generator starts. The generator must finish before the reviewer starts. One slow step blocks everything downstream.
// The imperative approach — every problem at once
const research = await callResearcher(topic) // step 1 blocks
const article = await callGenerator(research) // coupled to step 1's shape
const reviewed = await callReviewer(article) // fails = start over
await callPublisher(reviewed) // no budget tracking anywhere
The fix isn’t better error handling. It’s a different architecture.
The Core Insight
Processors don’t call each other. They share nothing at runtime. They communicate through their stores.
A researcher accumulating intelligence for 3 months produces fundamentally different output than one triggered on-demand. The architecture is the same — the depth changes.
One loop, everywhere:
1. Check input store — is there work ready for me?
2. If yes → do my thing → write output to my store
3. If no → done (try again next cycle)
That’s it. Every processor in the system — researcher, generator, adapter, reviewer, publisher, collector — runs this same loop. The only difference is what “inputs ready” means and what “do my thing” produces.
No orchestrator. No event bus between processors. No dependency graph. Each processor polls its store on its own schedule. The system converges toward completeness without coordination.
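Here is a minimal in-memory sketch of that loop. It assumes a plain array stands in for the artifact store, and that a processor is just two things: "which items are my input" and "what I make from one". The names Item, LoopProcessor, runOnce, and runCycle are hypothetical and are not the system's API. Consumption is tracked the same way the SQL examples track it: an input counts as used once some artifact created by this processor lists it as a source.

```typescript
// Stand-in for an artifact: what it is, who made it, what it came from.
interface Item {
  id: string
  kind: string // e.g. 'research', 'draft', 'published'
  createdBy: string
  sourceId?: string
}

// Hypothetical processor shape: "what counts as my input" + "do my thing".
interface LoopProcessor {
  name: string
  isInput(item: Item): boolean
  process(item: Item): Item
}

// One pass of the loop for one processor. Returns how many inputs it handled.
function runOnce(store: Item[], p: LoopProcessor): number {
  // 1. Check the store: inputs this processor hasn't already turned into output.
  const consumed = new Set(store.filter((i) => i.createdBy === p.name).map((i) => i.sourceId))
  const ready = store.filter((i) => p.isInput(i) && !consumed.has(i.id))
  // 3. Nothing ready: done until the next cycle.
  if (ready.length === 0) return 0
  // 2. Do the work and write each output back to the store.
  for (const input of ready) store.push(p.process(input))
  return ready.length
}

// A cycle runs every processor once, in whatever order they were registered.
function runCycle(store: Item[], processors: LoopProcessor[]): number {
  return processors.reduce((handled, p) => handled + runOnce(store, p), 0)
}

let nextId = 0
const makeProcessor = (name: string, from: string, to: string): LoopProcessor => ({
  name,
  isInput: (i) => i.kind === from,
  process: (i) => ({ id: `${name}-${nextId++}`, kind: to, createdBy: name, sourceId: i.id }),
})

const store: Item[] = [
  { id: 'r1', kind: 'research', createdBy: 'seed' },
  { id: 'r2', kind: 'research', createdBy: 'seed' },
]
// Deliberately registered "backwards": neither processor knows the other exists.
const processors = [
  makeProcessor('publisher', 'draft', 'published'),
  makeProcessor('generator', 'research', 'draft'),
]

let cycles = 0
while (runCycle(store, processors) > 0) cycles++
```

The store converges after two working cycles, and the third cycle finds nothing to do. It ends up holding two published items despite the backwards registration. On the first pass the publisher simply finds no drafts and does nothing.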
The 7 Processor Types
Core Interface
All seven processor types share one interface. The type system enforces the contract:
interface Processor<TInput = Artifact, TOutput = Artifact> {
/** Unique name used for registry lookup */
name: string
/** One of the seven types */
type: ProcessorType
/** Determines execution ordering within a cycle */
phase: Phase
/**
* The processor function itself.
* Receives context (artifact refs, config, API clients).
* Returns new or modified artifacts.
* Must be pure relative to external state — all side effects go through ctx.
*/
process(ctx: ProcessorContext, input: TInput[]): Promise<TOutput[]>
}
type ProcessorType =
| 'researcher'
| 'generator'
| 'adapter'
| 'reviewer'
| 'publisher'
| 'collector'
| 'storer'
type Phase = 'gather' | 'generate' | 'adapt' | 'review' | 'publish' | 'collect'
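The phase field is documented as determining execution order within a cycle. Below is a sketch of how a scheduler might apply it. It assumes that phases run in the order the Phase union declares them, and that processors sharing a phase keep their registration order. PHASE_ORDER and orderForCycle are hypothetical names. Storers are left out because Phase has no slot for them.

```typescript
type Phase = 'gather' | 'generate' | 'adapt' | 'review' | 'publish' | 'collect'

// Assumed cycle order: the order the Phase union is declared in.
const PHASE_ORDER: readonly Phase[] = ['gather', 'generate', 'adapt', 'review', 'publish', 'collect']

interface Scheduled {
  name: string
  phase: Phase
}

// Sort a copy by phase. Array.prototype.sort is stable (ES2019+), so
// processors within one phase keep their registration order.
function orderForCycle<T extends Scheduled>(processors: readonly T[]): T[] {
  return [...processors].sort((a, b) => PHASE_ORDER.indexOf(a.phase) - PHASE_ORDER.indexOf(b.phase))
}

const registered: Scheduled[] = [
  { name: 'site-publisher', phase: 'publish' },
  { name: 'keyword-researcher', phase: 'gather' },
  { name: 'content-reviewer', phase: 'review' },
  { name: 'article-generator', phase: 'generate' },
  { name: 'topic-researcher', phase: 'gather' },
]

const order = orderForCycle(registered).map((p) => p.name)
// ['keyword-researcher', 'topic-researcher', 'article-generator', 'content-reviewer', 'site-publisher']
```

Ordering here is an optimization, not a dependency. If the generator ran before the researchers, it would find nothing new and would pick up the fresh research on the next cycle. Running gather before generate just lets new input flow through in a single pass.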
1. Researcher — Gathers External Data
Researchers pull information from outside the system and store it as data artifacts. They don’t create content — they create the raw material that generators will later use.
| Variant | What it accumulates | Persistence |
|---|---|---|
| Topic research | structured findings by niche/topic | D1 + R2 |
| Keyword research | volume, difficulty, trends, SERP data | D1 |
| Competitor scan | competitor content, gaps, positioning | D1 + R2 |
| Trend detection | trending topics, velocity, lifecycle stage | D1 |
| Web scrape | raw extracted data from any source | R2 |
The longer it runs, the deeper the knowledge base. A researcher that’s been scanning a niche for 3 months knows things a fresh run never will.
import { registerProcessor, ProcessorContext, ProcessorResult } from './registry'
registerProcessor('keyword-researcher', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
const { db, config } = ctx
// Find topics that need keyword research
const topics = await db.prepare(`
SELECT id, title, niche FROM artifacts
WHERE format = 'data' AND status = 'ready'
AND tags LIKE '%topic%'
AND id NOT IN (
SELECT DISTINCT json_each.value FROM artifacts a,
json_each(a.source_ids) WHERE a.created_by = 'keyword-researcher'
)
ORDER BY created_at DESC
LIMIT ?
`).bind(ctx.processor.batch_size).all()
const results: ProcessorResult[] = []
for (const topic of topics.results) {
const startTime = Date.now()
try {
// Call external API (DataForSEO, Perplexity, etc.)
const keywords = await fetchKeywordData(topic.title, topic.niche, ctx)
// Store as a new data artifact
await db.prepare(`
INSERT INTO artifacts (id, format, status, title, tags, niche, created_by, source_ids, properties, created_at, updated_at)
VALUES (?, 'data', 'ready', ?, ?, ?, 'keyword-researcher', ?, ?, datetime('now'), datetime('now'))
`).bind(
crypto.randomUUID(),
`Keywords: ${topic.title}`,
JSON.stringify(['keywords', 'research', topic.niche]),
topic.niche,
JSON.stringify([topic.id]),
JSON.stringify({ keyword_count: keywords.length, source: 'dataforseo' }),
).run()
results.push({
processor: 'keyword-researcher',
entity_type: 'keyword_research',
entity_id: topic.id,
model_used: null,
tokens_input: 0,
tokens_output: 0,
cost_usd: 0.002,
duration_ms: Date.now() - startTime,
success: true,
result_summary: `Found ${keywords.length} keywords for ${topic.title}`,
})
} catch (err: any) {
results.push({
processor: 'keyword-researcher',
entity_type: 'keyword_research',
entity_id: topic.id,
model_used: null,
tokens_input: 0, tokens_output: 0, cost_usd: 0,
duration_ms: Date.now() - startTime,
success: false,
error_message: err.message,
})
}
}
return results
})
2. Generator — Creates Content
Generators are the LLM-calling processors. They query the artifact store for research data and produce content artifacts. A generator doesn’t know or care where the output goes — an article is an article whether it becomes a blog post, email body, YouTube script, or pSEO page.
| Variant | What it creates | Persistence |
|---|---|---|
| Article | long-form text, any length | D1 + R2 |
| Post | short-form text, social-length | D1 |
| Image | PNG/SVG composites | R2 |
| Video | short-form video | R2 |
| Page (pSEO) | structured page content from templates | D1 |
| Email body | email HTML/text with CTA | D1 |
Quality depends on what’s in the store. A generator pulling from 3 months of accumulated research produces fundamentally different output than one pulling from yesterday’s snapshot.
registerProcessor('article-generator', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
const { db } = ctx
const model = ctx.selectModel() // Respects budget + tier config
const results: ProcessorResult[] = []
// Find topics with keyword research but no generated article
const topics = await db.prepare(`
SELECT a.id, a.title, a.niche, a.properties
FROM artifacts a
WHERE a.format = 'data' AND a.status = 'ready'
AND a.created_by = 'keyword-researcher'
AND a.id NOT IN (
SELECT json_each.value FROM artifacts g,
json_each(g.source_ids) WHERE g.created_by = 'article-generator'
)
ORDER BY a.created_at DESC
LIMIT ?
`).bind(ctx.processor.batch_size).all()
for (const topic of topics.results) {
const budget = ctx.remainingBudget()
if (budget <= 0) break // Respect budget allocation
const startTime = Date.now()
try {
const article = await generateArticle(model, topic, ctx)
// Write the body to R2 first (too large for D1). If this throws, no D1 row exists
// yet, so the topic still counts as unconsumed and is retried next cycle.
const articleId = crypto.randomUUID()
await ctx.r2.put(`artifacts/${articleId}/content.md`, article.content)
// Then store the metadata as a text artifact plus its lineage edge, atomically
await db.batch([
db.prepare(`
INSERT INTO artifacts
(id, format, status, title, summary, tags, niche, author, word_count, created_by, source_ids, created_at, updated_at)
VALUES (?, 'text', 'draft', ?, ?, ?, ?, ?, ?, 'article-generator', ?, datetime('now'), datetime('now'))
`).bind(
articleId, article.title, article.summary,
JSON.stringify(article.tags), topic.niche,
'article-generator', article.wordCount,
JSON.stringify([topic.id]),
),
db.prepare(`
INSERT INTO edges (source_id, target_id, relation, created_at)
VALUES (?, ?, 'derived_from', datetime('now'))
`).bind(topic.id, articleId),
])
results.push({
processor: 'article-generator',
entity_type: 'article',
entity_id: articleId,
model_used: model,
tokens_input: article.tokensIn,
tokens_output: article.tokensOut,
cost_usd: article.cost,
duration_ms: Date.now() - startTime,
success: true,
result_summary: `Generated "${article.title}" (${article.wordCount} words)`,
})
} catch (err: any) {
results.push({
processor: 'article-generator',
entity_type: 'article',
entity_id: topic.id,
model_used: model,
tokens_input: 0, tokens_output: 0, cost_usd: 0,
duration_ms: Date.now() - startTime,
success: false,
error_message: err.message,
})
}
}
return results
})
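The generator checks ctx.remainingBudget() before each item. Below is one way that helper could work. It assumes a fixed per-cycle allocation in USD, with spend tallied from the cost_usd value that each ProcessorResult already reports. createBudget and its methods are hypothetical.

```typescript
// Only the field the tally needs; the examples' ProcessorResult carries more.
interface CostReport {
  cost_usd: number
}

interface Budget {
  remaining(): number
  record(result: CostReport): void
}

// Hypothetical per-cycle budget: a fixed allocation minus reported spend.
function createBudget(allocationUsd: number): Budget {
  let spentUsd = 0
  return {
    remaining: () => allocationUsd - spentUsd,
    record: (result) => {
      spentUsd += result.cost_usd
    },
  }
}

// Mirrors the generator loop: check before each item, record after it.
const budget = createBudget(0.1)
const itemCosts = [0.04, 0.04, 0.04, 0.04]
let processed = 0
for (const cost of itemCosts) {
  if (budget.remaining() <= 0) break
  budget.record({ cost_usd: cost })
  processed++
}
```

With a $0.10 allocation and $0.04 items, three items run and spend ends at about $0.12. Checking before each item, as the generator does, lets the last item overshoot by up to one item's cost. A stricter variant would compare an estimated cost against remaining() before making the call.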
3. Adapter — Transforms Between Formats
Adapters reshape content without creating new ideas. Two sub-types:
Voice adapter — applies brand personality (tone, vocabulary, persona, style rules) to any content. Input: raw content + brand config. Output: same content in brand voice.
Format adapter — reshapes for platform constraints (character limits, hashtag conventions, media requirements). Input: content + platform rules. Output: platform-ready content.
Adapters are stateless transforms. They don’t accumulate anything; their only store is configuration (brand voices, platform rule sets).
registerProcessor('social-adapter', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
const { db } = ctx
const model = ctx.selectModel()
const results: ProcessorResult[] = []
// Find articles that haven't been adapted for social yet
const articles = await db.prepare(`
SELECT a.id, a.title, a.summary, a.tags, a.niche
FROM artifacts a
WHERE a.format = 'text' AND a.status IN ('ready', 'reviewed')
AND a.created_by IN ('article-generator', 'voice-adapter')
AND a.id NOT IN (
SELECT json_each.value FROM artifacts s,
json_each(s.source_ids) WHERE s.created_by = 'social-adapter'
)
ORDER BY a.created_at DESC
LIMIT ?
`).bind(ctx.processor.batch_size).all()
for (const article of articles.results) {
const startTime = Date.now()
try {
// Read full content from R2
const content = await ctx.r2.get(`artifacts/${article.id}/content.md`)
const text = content ? await content.text() : article.summary
// Generate multiple social posts from one article
const posts = await adaptForSocial(model, { ...article, content: text }, ctx)
// Build every post row plus its lineage edge, then commit them as one batch so a
// mid-way failure can't leave the article half-adapted (and already marked consumed)
const statements = posts.flatMap((post) => {
const postId = crypto.randomUUID()
return [
db.prepare(`
INSERT INTO artifacts
(id, format, status, title, summary, tags, niche, voice, word_count, created_by, source_ids, properties, created_at, updated_at)
VALUES (?, 'text', 'draft', ?, ?, ?, ?, ?, ?, 'social-adapter', ?, ?, datetime('now'), datetime('now'))
`).bind(
postId, post.title,
post.text, // the post body (assumed field); short posts live in D1, and the reviewer falls back to summary
JSON.stringify([...JSON.parse(article.tags), 'social', post.platform]),
article.niche, post.voice, post.wordCount,
JSON.stringify([article.id]),
JSON.stringify({ platform: post.platform, char_count: post.charCount }),
),
// Record edge for lineage tracking
db.prepare(`
INSERT INTO edges (source_id, target_id, relation, created_at)
VALUES (?, ?, 'adapted_from', datetime('now'))
`).bind(article.id, postId),
]
})
if (statements.length > 0) await db.batch(statements)
results.push({
processor: 'social-adapter',
entity_type: 'social_posts',
entity_id: article.id,
model_used: model,
tokens_input: posts.reduce((s, p) => s + p.tokensIn, 0),
tokens_output: posts.reduce((s, p) => s + p.tokensOut, 0),
cost_usd: posts.reduce((s, p) => s + p.cost, 0),
duration_ms: Date.now() - startTime,
success: true,
result_summary: `Created ${posts.length} social posts from "${article.title}"`,
})
} catch (err: any) {
results.push({
processor: 'social-adapter',
entity_type: 'social_posts',
entity_id: article.id,
model_used: model,
tokens_input: 0, tokens_output: 0, cost_usd: 0,
duration_ms: Date.now() - startTime,
success: false,
error_message: err.message,
})
}
}
return results
})
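adaptForSocial is left undefined above. The format-adapter half of that job, fitting text to platform constraints, does not need a model at all. Below is a sketch under stated assumptions: PlatformRules and fitToPlatform are hypothetical, and the limits in the example are placeholders rather than any real platform's rules.

```typescript
// Hypothetical rule set; real limits would come from the adapter's config store.
interface PlatformRules {
  platform: string
  maxChars: number // assumed >= 1
  maxHashtags: number
}

// Fit a body plus hashtags inside the platform's limits, with no model call:
// keep up to maxHashtags tags, then trim the body at a word boundary.
function fitToPlatform(body: string, hashtags: string[], rules: PlatformRules): string {
  const tags = hashtags.slice(0, rules.maxHashtags).map((t) => `#${t.replace(/\s+/g, '')}`)
  const suffix = tags.length > 0 ? ` ${tags.join(' ')}` : ''
  const room = rules.maxChars - suffix.length
  // Hashtags alone would exceed the limit: drop them rather than the body.
  if (room < 1 && tags.length > 0) return fitToPlatform(body, [], rules)
  let text = body.trim()
  if (text.length > room) {
    // Cut at the last space that still leaves room for the ellipsis.
    const cut = text.lastIndexOf(' ', room - 1)
    text = `${text.slice(0, cut > 0 ? cut : room - 1)}…`
  }
  return text + suffix
}

const rules: PlatformRules = { platform: 'example-short', maxChars: 40, maxHashtags: 1 }
const post = fitToPlatform('Processors communicate through stores, not calls.', ['ai pipelines', 'cloudflare'], rules)
// post === 'Processors communicate… #aipipelines'
```

Because this half is deterministic, only the voice half of adaptation needs to spend tokens.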
4. Reviewer — Evaluates Quality
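Quality gate. Evaluates content against criteria and can reject it with feedback. Rejection doesn’t trigger anything directly: the draft stays in draft status and its review artifact records why, so a generator that looks for rejected drafts can regenerate them.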
| Variant | What it checks |
|---|---|
| Brand compliance | voice, tone, guidelines adherence |
| SEO check | keyword usage, structure, meta |
| Factual check | claims vs source material |
Persists review history — what passed, what failed, why. Useful for improving generators over time.
registerProcessor('content-reviewer', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
const { db } = ctx
const model = ctx.selectModel()
const results: ProcessorResult[] = []
// Find draft articles that haven't been reviewed
const drafts = await db.prepare(`
SELECT a.id, a.title, a.summary, a.niche, a.word_count
FROM artifacts a
WHERE a.format = 'text' AND a.status = 'draft'
AND a.created_by IN ('article-generator', 'social-adapter', 'voice-adapter')
AND a.id NOT IN (
SELECT json_each.value FROM artifacts r,
json_each(r.source_ids) WHERE r.created_by = 'content-reviewer'
)
ORDER BY a.created_at ASC
LIMIT ?
`).bind(ctx.processor.batch_size).all()
for (const draft of drafts.results) {
const startTime = Date.now()
try {
const content = await ctx.r2.get(`artifacts/${draft.id}/content.md`)
const text = content ? await content.text() : draft.summary
// AI review with scoring rubric
const review = await reviewContent(model, {
title: draft.title,
content: text,
niche: draft.niche,
wordCount: draft.word_count,
})
// Score once; the same verdict drives the stored review and the status change
const passed = review.score >= 0.7
const newStatus = passed ? 'ready' : 'draft'
const reviewId = crypto.randomUUID()
// Review artifact, lineage edge, and status update commit together (a D1 batch is one transaction)
await db.batch([
// Store the review as its own data artifact
db.prepare(`
INSERT INTO artifacts
(id, format, status, title, tags, created_by, source_ids, properties, created_at, updated_at)
VALUES (?, 'data', 'ready', ?, '["review", "quality"]', 'content-reviewer', ?, ?, datetime('now'), datetime('now'))
`).bind(
reviewId,
`Review: ${draft.title}`,
JSON.stringify([draft.id]),
JSON.stringify({
score: review.score,
passed,
feedback: review.feedback,
criteria: review.criteria,
}),
),
// Record edge
db.prepare(`
INSERT INTO edges (source_id, target_id, relation, created_at)
VALUES (?, ?, 'reviewed_by', datetime('now'))
`).bind(draft.id, reviewId),
// Promote the original artifact if it passed; a rejected draft stays in draft
db.prepare(`
UPDATE artifacts SET status = ?, updated_at = datetime('now') WHERE id = ?
`).bind(newStatus, draft.id),
])
results.push({
processor: 'content-reviewer',
entity_type: 'review',
entity_id: draft.id,
model_used: model,
tokens_input: review.tokensIn,
tokens_output: review.tokensOut,
cost_usd: review.cost,
duration_ms: Date.now() - startTime,
success: true,
result_summary: `Reviewed "${draft.title}": ${review.score.toFixed(2)} → ${newStatus}`,
})
} catch (err: any) {
results.push({
processor: 'content-reviewer',
entity_type: 'review',
entity_id: draft.id,
model_used: model,
tokens_input: 0, tokens_output: 0, cost_usd: 0,
duration_ms: Date.now() - startTime,
success: false,
error_message: err.message,
})
}
}
return results
})
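reviewContent is not shown, but the reviewer's verdict reduces to a single number compared against 0.7. Below is a sketch of one way to get that number. It assumes the model returns a score in [0, 1] for each criterion and takes a weighted mean. The criteria mirror the variant table; their weights and the helper scoreReview are hypothetical.

```typescript
// Hypothetical rubric mirroring the reviewer variants; weights are illustrative.
interface Criterion {
  name: string
  weight: number
}

const RUBRIC: Criterion[] = [
  { name: 'brand_compliance', weight: 0.4 },
  { name: 'seo', weight: 0.3 },
  { name: 'factual', weight: 0.3 },
]

const PASS_THRESHOLD = 0.7 // the cutoff content-reviewer uses

interface Verdict {
  score: number
  passed: boolean
  failed: string[] // criteria that individually scored below the threshold
}

// Weighted mean of per-criterion scores, each clamped to [0, 1].
// A criterion the model didn't score counts as 0, so omission can't pass.
function scoreReview(scores: Record<string, number>, rubric: Criterion[] = RUBRIC): Verdict {
  const clamp = (x: number) => Math.min(1, Math.max(0, x))
  const totalWeight = rubric.reduce((sum, c) => sum + c.weight, 0)
  const weighted = rubric.reduce((sum, c) => sum + c.weight * clamp(scores[c.name] ?? 0), 0)
  const score = totalWeight > 0 ? weighted / totalWeight : 0
  const failed = rubric.filter((c) => clamp(scores[c.name] ?? 0) < PASS_THRESHOLD).map((c) => c.name)
  return { score, passed: score >= PASS_THRESHOLD, failed }
}

const verdict = scoreReview({ brand_compliance: 0.9, seo: 0.6, factual: 0.8 })
```

Here the weighted score is about 0.78, so the draft passes even though SEO alone fell short. The list of failed criteria is the kind of detail that belongs in the review artifact's feedback, where generators can learn from it.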
5. Publisher — Deploys to Destinations
Pushes content to a destination. Thin wrapper around platform APIs. The simplest processor — just delivery.
| Variant | Destination |
|---|---|
| Social post | X, TikTok, LinkedIn, Instagram |
| Site page | brand website via SSR |
| Email send | inbox via transactional email |
Persists publish logs: what was sent, when, where, response.
registerProcessor('site-publisher', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
const { db } = ctx
const results: ProcessorResult[] = []
// Find reviewed articles ready for publishing
const ready = await db.prepare(`
SELECT a.id, a.title, a.summary, a.tags, a.niche
FROM artifacts a
WHERE a.format = 'text' AND a.status = 'ready'
AND a.created_by IN ('article-generator', 'voice-adapter')
AND a.id NOT IN (
SELECT e.source_id FROM edges e WHERE e.relation = 'published_to'
)
ORDER BY a.created_at ASC
LIMIT ?
`).bind(ctx.processor.batch_size).all()
for (const article of ready.results) {
const startTime = Date.now()
try {
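const content = await ctx.r2.get(`artifacts/${article.id}/content.md`)
// Never publish an empty page: a missing body is an error, retried next cycle
if (!content) throw new Error(`No content in R2 for ${article.id}`)
const text = await content.text()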
// Push to CMS / site API
const publishResult = await publishToSite({
title: article.title,
content: text,
tags: JSON.parse(article.tags),
niche: article.niche,
})
// Record the publish event, its edge, and the status change in one batch (one transaction)
const publishId = crypto.randomUUID()
await db.batch([
// The publish event as a data artifact
db.prepare(`
INSERT INTO artifacts
(id, format, status, title, tags, created_by, source_ids, properties, created_at, updated_at)
VALUES (?, 'data', 'ready', ?, '["publish", "site"]', 'site-publisher', ?, ?, datetime('now'), datetime('now'))
`).bind(
publishId,
`Published: ${article.title}`,
JSON.stringify([article.id]),
JSON.stringify({
url: publishResult.url,
platform: 'site',
published_at: new Date().toISOString(),
}),
),
db.prepare(`
INSERT INTO edges (source_id, target_id, relation, created_at)
VALUES (?, ?, 'published_to', datetime('now'))
`).bind(article.id, publishId),
// Mark the original as published; archiving comes once its lifecycle is complete
db.prepare(`
UPDATE artifacts SET status = 'published', updated_at = datetime('now') WHERE id = ?
`).bind(article.id),
])
results.push({
processor: 'site-publisher',
entity_type: 'publish',
entity_id: article.id,
model_used: null,
tokens_input: 0, tokens_output: 0, cost_usd: 0,
duration_ms: Date.now() - startTime,
success: true,
result_summary: `Published "${article.title}" to ${publishResult.url}`,
})
} catch (err: any) {
results.push({
processor: 'site-publisher',
entity_type: 'publish',
entity_id: article.id,
model_used: null,
tokens_input: 0, tokens_output: 0, cost_usd: 0,
duration_ms: Date.now() - startTime,
success: false,
error_message: err.message,
})
}
}
return results
})
6. Collector — Aggregates Feedback
Brings audience signals back into the system, where they feed researcher stores for the next cycle.
| Variant | What it captures |
|---|---|
| Email capture | form submissions, brand, source |
| Engagement | likes, replies, shares, clicks |
| Analytics | traffic, conversions, behavior |
Always listening. Continuous persistence to audience and engagement stores.
registerProcessor('engagement-collector', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
const { db } = ctx
const results: ProcessorResult[] = []
// Find published artifacts that need engagement data collected
const published = await db.prepare(`
SELECT a.id, a.properties, a.source_ids
FROM artifacts a
WHERE a.created_by = 'site-publisher'
AND a.status = 'ready'
AND datetime(json_extract(a.properties, '$.published_at')) < datetime('now', '-1 hour')
AND a.id NOT IN (
SELECT json_each.value FROM artifacts c,
json_each(c.source_ids) WHERE c.created_by = 'engagement-collector'
)
ORDER BY a.created_at ASC
LIMIT ?
`).bind(ctx.processor.batch_size).all()
for (const pub of published.results) {
const startTime = Date.now()
try {
const props = JSON.parse(pub.properties)
const engagement = await fetchEngagement(props.url, props.platform)
const engId = crypto.randomUUID()
await db.prepare(`
INSERT INTO artifacts
(id, format, status, title, tags, created_by, source_ids, properties, created_at, updated_at)
VALUES (?, 'data', 'ready', ?, '["engagement", "metrics"]', 'engagement-collector', ?, ?, datetime('now'), datetime('now'))
`).bind(
engId,
`Engagement: ${props.url}`,
JSON.stringify([pub.id, ...JSON.parse(pub.source_ids)]),
JSON.stringify({
views: engagement.views,
clicks: engagement.clicks,
shares: engagement.shares,
time_on_page: engagement.avgTimeOnPage,
bounce_rate: engagement.bounceRate,
collected_at: new Date().toISOString(),
}),
).run()
results.push({
processor: 'engagement-collector',
entity_type: 'engagement',
entity_id: pub.id,
model_used: null,
tokens_input: 0, tokens_output: 0, cost_usd: 0,
duration_ms: Date.now() - startTime,
success: true,
result_summary: `Collected ${engagement.views} views, ${engagement.clicks} clicks`,
})
} catch (err: any) {
results.push({
processor: 'engagement-collector',
entity_type: 'engagement',
entity_id: pub.id,
model_used: null,
tokens_input: 0, tokens_output: 0, cost_usd: 0,
duration_ms: Date.now() - startTime,
success: false,
error_message: err.message,
})
}
}
return results
})
7. Storer — Persists to Storage
Cross-cutting persistence layer. Every processor has its own store, but the Storer pattern defines how stores are queryable across the system. Handles the mechanics of putting large content in R2, metadata in D1, and cached lookups in KV.
| Variant | What it holds |
|---|---|
| Research store | queryable findings (D1 + R2) |
| Content store | generated artifacts + metadata (D1 + R2) |
| Audience store | contacts, segments, history (D1) |
| Cache | API responses, TTL-based (KV) |
registerProcessor('artifact-storer', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
const { db } = ctx
const results: ProcessorResult[] = []
// Find artifacts with content that needs R2 persistence
const pending = await db.prepare(`
SELECT id, format, properties FROM artifacts
WHERE status = 'pending_store'
ORDER BY created_at ASC
LIMIT ?
`).bind(ctx.processor.batch_size).all()
for (const artifact of pending.results) {
const startTime = Date.now()
try {
const props = JSON.parse(artifact.properties || '{}')
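// Move large content to R2 (text bodies under content.md, the key other processors read)
if (props._content) {
const r2Key = artifact.format === 'text'
? `artifacts/${artifact.id}/content.md`
: `artifacts/${artifact.id}/content`
await ctx.r2.put(r2Key, props._content)
delete props._content
props.r2_key = r2Key
}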
// Update artifact with storage metadata
await db.prepare(`
UPDATE artifacts SET status = 'ready', properties = ?, updated_at = datetime('now') WHERE id = ?
`).bind(JSON.stringify(props), artifact.id).run()
// Write to KV cache for hot lookups
if (props.cache_key) {
await ctx.kv.put(
props.cache_key,
JSON.stringify({ id: artifact.id, format: artifact.format }),
{ expirationTtl: 3600 },
)
}
results.push({
processor: 'artifact-storer',
entity_type: 'store',
entity_id: artifact.id,
model_used: null,
tokens_input: 0, tokens_output: 0, cost_usd: 0,
duration_ms: Date.now() - startTime,
success: true,
})
} catch (err: any) {
results.push({
processor: 'artifact-storer',
entity_type: 'store',
entity_id: artifact.id,
model_used: null,
tokens_input: 0, tokens_output: 0, cost_usd: 0,
duration_ms: Date.now() - startTime,
success: false,
error_message: err.message,
})
}
}
return results
})
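The storer's routing decision (what stays inline in D1, what goes to R2, what gets a KV pointer) can be isolated as a pure function. Below is a sketch that assumes a hypothetical 4 KiB inline threshold and sends images to R2 regardless of size. placeArtifact and INLINE_LIMIT_BYTES are illustrative names. Text bodies get the content.md key because that is the key the reviewer and publishers read.

```typescript
// Where an artifact's body should live. Metadata always stays in D1 so it remains queryable.
interface Placement {
  body: 'inline' | 'r2' | 'none'
  r2Key: string | null
  cacheKv: boolean // mirror a small { id, format } pointer into KV
}

// Hypothetical cutoff for keeping a body inline in D1 instead of R2.
const INLINE_LIMIT_BYTES = 4 * 1024

function placeArtifact(
  id: string,
  format: string,
  body: string | Uint8Array | null,
  cacheKey?: string,
): Placement {
  const cacheKv = cacheKey !== undefined
  if (body === null) return { body: 'none', r2Key: null, cacheKv }
  // Measure bytes, not characters: UTF-8 text can be larger than its length.
  const bytes = typeof body === 'string' ? new TextEncoder().encode(body).length : body.byteLength
  // Assumption: images always go to R2; other formats only when large.
  if (format !== 'image' && bytes <= INLINE_LIMIT_BYTES) {
    return { body: 'inline', r2Key: null, cacheKv }
  }
  // Text bodies use content.md, the key the reviewer and publishers read.
  const r2Key = format === 'text' ? `artifacts/${id}/content.md` : `artifacts/${id}/content`
  return { body: 'r2', r2Key, cacheKv }
}

const longText = placeArtifact('a1', 'text', 'x'.repeat(10_000))
const smallData = placeArtifact('a2', 'data', '{"rows":3}', 'hot:a2')
const image = placeArtifact('a3', 'image', new Uint8Array(16))
```

Keeping the decision separate from the I/O makes the tiering rule easy to change, and easy to test, without touching the storer loop.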
Summary Table
| Type | Does | Input | Output | Typical cost |
|---|---|---|---|---|
| Researcher | Gathers external data | APIs, web, databases | Data artifacts | API fees |
| Generator | Creates content | Data artifacts | Content artifacts | LLM tokens (high) |
| Adapter | Transforms format/voice | Content artifacts | New content artifacts | LLM tokens (low) |
| Reviewer | Evaluates quality | Draft artifacts | Review artifacts + status updates | LLM tokens (low) |
| Publisher | Deploys to destinations | Ready artifacts | Publish records | Platform API fees |
| Collector | Aggregates feedback | Platform APIs | Engagement artifacts | Usually free |
| Storer | Persists across tiers | Any artifacts | Storage metadata | Storage costs |
The Artifact Store
The hard problem isn’t the processors. It’s naming, tagging, and linking artifacts so that any processor can find what it needs with a simple query.
Three principles:
- Rich metadata — artifacts carry enough context to be found without knowing who created them
- Encourage composition — link to parents, groups, sources
- Encourage reusability — describe what they ARE, never what they’re FOR
The Cooking Analogy
Artifacts are ingredients. Label them by what they are (protein, vegetable, grain), not what dish they’re for. The same chicken works in a stir-fry, a sandwich, a soup. Don’t label it “soup chicken.” The cook decides what to make when everything is ready.
A piece of text is just text. It has a topic, a length, some tags. It doesn’t know if it’ll become a blog post, email body, book chapter, tweet thread, or video script. The processor that picks it up — the cook — makes that decision at the moment of use.
Don’t pre-define categories. Describe properties richly and let patterns emerge. A processor that needs “short text in professional voice about fintech” queries:
SELECT * FROM artifacts
WHERE format = 'text'
AND voice = 'nichefi-professional'
AND word_count < 500
AND tags LIKE '%fintech%'
The category “LinkedIn post” was never defined — it emerged from the query.
TypeScript Interfaces
interface Artifact {
id: string
format: ArtifactFormat
status: ArtifactStatus
// What is this about?
title: string | null
summary: string | null
tags: string[]
niche: string | null
project: string | null
// Who made it and how does it sound?
author: string | null
voice: string | null
// Descriptive measurements (let patterns emerge)
word_count: number | null
paragraphs: number | null
properties: ArtifactMetadata
// Where does it fit in a collection?
parent_id: string | null
group_id: string | null
sequence: number | null
// Where did it come from?
source_ids: string[]
created_by: string
created_at: string
updated_at: string
}
type ArtifactFormat = 'text' | 'outline' | 'image' | 'data' | 'mixed'
type ArtifactStatus = 'draft' | 'ready' | 'reviewed' | 'published' | 'archived'
interface ArtifactMetadata {
/** For text: reading_level, sentiment, language */
/** For images: width, height, palette */
/** For data: row_count, columns, freshness */
/** For outlines: depth, node_count */
[key: string]: unknown
}
interface ArtifactQuery {
format?: ArtifactFormat
status?: ArtifactStatus | ArtifactStatus[]
tags?: string[]
niche?: string
project?: string
voice?: string
created_by?: string
min_word_count?: number
max_word_count?: number
not_source_of?: string // Exclude artifacts already consumed by this processor
limit?: number
order_by?: 'created_at' | 'updated_at' | 'word_count'
order_dir?: 'asc' | 'desc'
}
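The ArtifactQuery interface maps naturally onto a parameterized WHERE clause. Below is a sketch of that translation, assuming SQLite/D1 JSON functions. Two choices differ from the inline queries in this document. First, tags use an exact json_each membership test instead of LIKE '%tag%', which for 'fintech' would also match 'fintech-apps'. Second, order_by is checked against an allow-list at runtime, because column names can't be bound as parameters. buildArtifactQuery and the default limit of 50 are hypothetical.

```typescript
type ArtifactFormat = 'text' | 'outline' | 'image' | 'data' | 'mixed'
type ArtifactStatus = 'draft' | 'ready' | 'reviewed' | 'published' | 'archived'

// Same shape as ArtifactQuery above, repeated so the sketch stands alone.
interface ArtifactQuery {
  format?: ArtifactFormat
  status?: ArtifactStatus | ArtifactStatus[]
  tags?: string[]
  niche?: string
  project?: string
  voice?: string
  created_by?: string
  min_word_count?: number
  max_word_count?: number
  not_source_of?: string
  limit?: number
  order_by?: 'created_at' | 'updated_at' | 'word_count'
  order_dir?: 'asc' | 'desc'
}

const ORDER_COLUMNS: string[] = ['created_at', 'updated_at', 'word_count']

function buildArtifactQuery(q: ArtifactQuery): { sql: string; params: unknown[] } {
  const where: string[] = []
  const params: unknown[] = []
  const add = (clause: string, value: unknown) => {
    where.push(clause)
    params.push(value)
  }
  if (q.format !== undefined) add('format = ?', q.format)
  if (q.niche !== undefined) add('niche = ?', q.niche)
  if (q.project !== undefined) add('project = ?', q.project)
  if (q.voice !== undefined) add('voice = ?', q.voice)
  if (q.created_by !== undefined) add('created_by = ?', q.created_by)
  const statuses: ArtifactStatus[] = q.status === undefined ? [] : Array.isArray(q.status) ? q.status : [q.status]
  if (statuses.length > 0) {
    where.push(`status IN (${statuses.map(() => '?').join(', ')})`)
    params.push(...statuses)
  }
  // Exact tag membership; LIKE '%fintech%' would also match 'fintech-apps'.
  for (const tag of q.tags ?? []) {
    add('EXISTS (SELECT 1 FROM json_each(artifacts.tags) WHERE value = ?)', tag)
  }
  if (q.min_word_count !== undefined) add('word_count >= ?', q.min_word_count)
  if (q.max_word_count !== undefined) add('word_count <= ?', q.max_word_count)
  // Same "not yet consumed by this processor" test the processors use.
  if (q.not_source_of !== undefined) {
    add('id NOT IN (SELECT j.value FROM artifacts s, json_each(s.source_ids) j WHERE s.created_by = ?)', q.not_source_of)
  }
  // Identifiers can't be bound as parameters, so allow-list them at runtime.
  const orderBy = q.order_by !== undefined && ORDER_COLUMNS.includes(q.order_by) ? q.order_by : 'created_at'
  const orderDir = q.order_dir === 'asc' ? 'ASC' : 'DESC'
  const whereSql = where.length > 0 ? ` WHERE ${where.join(' AND ')}` : ''
  params.push(q.limit ?? 50) // hypothetical default page size
  return { sql: `SELECT * FROM artifacts${whereSql} ORDER BY ${orderBy} ${orderDir} LIMIT ?`, params }
}

// The cooking-analogy query ("short text in professional voice about fintech")
const { sql, params } = buildArtifactQuery({
  format: 'text',
  voice: 'nichefi-professional',
  max_word_count: 499,
  tags: ['fintech'],
})
// params: ['text', 'nichefi-professional', 'fintech', 499, 50]
```

The result can be passed straight to D1 as db.prepare(sql).bind(...params).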
D1 Schema
CREATE TABLE artifacts (
id TEXT PRIMARY KEY,
format TEXT NOT NULL, -- 'text', 'outline', 'image', 'data', 'mixed'
status TEXT NOT NULL, -- 'draft', 'ready', 'reviewed', 'published', 'archived'
-- Metadata
title TEXT,
summary TEXT,
tags TEXT, -- JSON array: ['scramjet', 'architecture']
niche TEXT,
project TEXT,
-- Voice & authorship
author TEXT,
voice TEXT, -- 'scramjet-technical', 'nichefi-casual', NULL if raw
-- Measurements
word_count INTEGER,
paragraphs INTEGER,
properties TEXT, -- JSON: format-specific measurements
-- text: { reading_level, sentiment, language }
-- image: { width, height, palette }
-- data: { row_count, columns, freshness }
-- outline: { depth, node_count }
-- Composition
parent_id TEXT, -- part → whole
group_id TEXT, -- all parts of a collection share this
sequence INTEGER, -- ordering within a group
-- Lineage
source_ids TEXT, -- JSON: artifact IDs this was derived from
created_by TEXT, -- processor that created it
-- Housekeeping
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
-- Shopping queries: find artifacts by shape and topic
CREATE INDEX idx_artifacts_shop ON artifacts(format, status, niche);
CREATE INDEX idx_artifacts_voice ON artifacts(voice, format, status);
CREATE INDEX idx_artifacts_project ON artifacts(project, format, status);
CREATE INDEX idx_artifacts_group ON artifacts(group_id, sequence);
CREATE INDEX idx_artifacts_tags ON artifacts(tags); -- only helps exact matches on the whole JSON string; LIKE '%tag%' can't use it
CREATE INDEX idx_artifacts_created_by ON artifacts(created_by, status);
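Because tags, source_ids, and properties are stored as JSON TEXT, every processor ends up wrapping query results in JSON.parse(...), as the examples do. A small decoding helper keeps that logic in one place. This is a sketch: parseJson, hydrateArtifact, and encodeJsonColumns are hypothetical. Falling back to an empty value on malformed JSON is a design choice, not something the schema requires.

```typescript
type Row = Record<string, unknown>

// Parse a JSON TEXT column, falling back when it is NULL, empty, or malformed.
function parseJson<T>(value: unknown, fallback: T): T {
  if (typeof value !== 'string' || value === '') return fallback
  try {
    return JSON.parse(value) as T
  } catch {
    return fallback
  }
}

// Decode the three JSON columns of an artifacts row; other columns pass through.
function hydrateArtifact(row: Row) {
  return {
    ...row,
    tags: parseJson<string[]>(row.tags, []),
    source_ids: parseJson<string[]>(row.source_ids, []),
    properties: parseJson<Record<string, unknown>>(row.properties, {}),
  }
}

// The reverse direction, for values bound into INSERT/UPDATE statements.
function encodeJsonColumns(a: { tags: string[]; source_ids: string[]; properties: Record<string, unknown> }) {
  return {
    tags: JSON.stringify(a.tags),
    source_ids: JSON.stringify(a.source_ids),
    properties: JSON.stringify(a.properties),
  }
}

const artifact = hydrateArtifact({
  id: 'a1',
  tags: '["fintech","social"]',
  source_ids: null,
  properties: '{"platform":"linkedin"}',
})
```

A NULL source_ids column decodes to an empty array, so lineage code never needs a null check.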
Artifact Lifecycle
     ┌─────────────────────────────────────────────┐
     │                                             │
┌────┴───┐    ┌──────────┐    ┌──────────┐    ┌────▼────┐
│ draft  │───►│  ready   │───►│published │───►│archived │
└────┬───┘    └──────────┘    └──────────┘    └─────────┘
     │              ▲
     └──────────────┘
   (rejected by reviewer,
    regenerate and retry)
- draft — just created by a generator or adapter, not yet reviewed
- ready — passed review, available for publishing or further adaptation
- published — deployed to a destination by a publisher
- archived — lifecycle complete, kept for lineage and analytics
The Self-Enriching Flywheel
The store fills its own gaps. When a processor needs an artifact in a form that doesn’t exist yet, it creates it — and now it exists for everyone.
1. Query: "text, scramjet, professional voice, 800+ words"
2. Found: great scramjet text — but casual voice, 1200 words
3. Gap: no professional version exists
4. Fill: adapter creates professional version → stores it
(source_ids → the casual original, voice → 'professional')
5. Next time: professional scramjet text is already there
No one planned this. No rigid hierarchy predicted “we’ll need a professional version of article X.” The gap was discovered at the moment of need, filled, and the store got richer.
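The flywheel steps above can be sketched as a find-or-adapt lookup over an in-memory store. This rests on several assumptions. Art, Want, shop, and the stand-in voice adapter are hypothetical. A "near match" means the same niche and length but a different voice. A real adapter would call a model; this one only relabels the voice and records lineage.

```typescript
interface Art {
  id: string
  niche: string
  voice: string
  wordCount: number
  sourceIds: string[]
}

interface Want {
  niche: string
  voice: string
  minWords: number
}

type VoiceAdapter = (source: Art, voice: string) => Art

// Shop for an artifact; if only a wrong-voice version exists, adapt it,
// store the result, and return it. Returns null for a true gap.
function shop(store: Art[], want: Want, adapt: VoiceAdapter): { art: Art; created: boolean } | null {
  const fits = (a: Art) => a.niche === want.niche && a.wordCount >= want.minWords
  const exact = store.find((a) => fits(a) && a.voice === want.voice)
  if (exact) return { art: exact, created: false }
  const near = store.find(fits) // right topic and length, different voice
  if (!near) return null // nothing to adapt from: a research/generation gap
  const filled = adapt(near, want.voice)
  store.push(filled) // now it exists for every later query
  return { art: filled, created: true }
}

// Stand-in adapter: a real one would rewrite the text with a model.
let adaptedCount = 0
const relabelVoice: VoiceAdapter = (source, voice) => ({
  ...source,
  id: `${source.id}-${voice}-${adaptedCount++}`,
  voice,
  sourceIds: [source.id], // lineage back to the casual original
})

const store: Art[] = [{ id: 'scramjet-casual', niche: 'scramjet', voice: 'casual', wordCount: 1200, sourceIds: [] }]
const want: Want = { niche: 'scramjet', voice: 'professional', minWords: 800 }
const first = shop(store, want, relabelVoice) // gap found and filled
const second = shop(store, want, relabelVoice) // served from the store
```

The second request never touches the adapter. That is the compounding the section describes: each gap is paid for once.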
Every Operation Feeds the Store
Nothing is consumed. Nothing is thrown away. Every operation produces new artifacts.
Gather → data artifacts (research, scrapes, trends)
Transform → new artifacts (originals stay untouched)
Publish → publish records (what was sent, where, when)
Collect → engagement artifacts (likes, clicks, replies)
Review → review artifacts (scores, feedback, rejection reasons)
Failures → signal artifacts (what didn't work is valuable too)
Published content itself feeds the next cycle:
publish article → engagement data flows back → becomes data artifact
→ next generation queries "what topics got most engagement?"
→ produces better content → publishes → more engagement
→ richer store → smarter generation → ...
The store doesn’t grow linearly — it compounds. Each cycle is informed by everything that came before. Research from 3 months ago + engagement data from last week + a failed draft from yesterday = a better article today. All connected through edges, all queryable by any processor.
At D1/R2 costs, storage is essentially free. Keep everything.
How Processors Shop for Ingredients
Processors find what they need through queries, not through being told.
Researcher stores findings as data:
INSERT INTO artifacts (id, format, status, tags, niche, created_by, created_at, updated_at)
VALUES (?, 'data', 'ready', '["app-store", "productivity"]', 'productivity-apps', 'topic-researcher', datetime('now'), datetime('now'))
Generator shops by format + topic:
SELECT * FROM artifacts
WHERE format = 'data' AND status = 'ready' AND niche = 'productivity-apps'
ORDER BY created_at DESC
Assembler checks whether every existing part of a collection is ready (the group is complete when ready = total):
SELECT COUNT(*) AS total,
SUM(status = 'ready') AS ready
FROM artifacts
WHERE format = 'text' AND group_id = 'book-xyz'
Any processor shops for reusable content:
SELECT * FROM artifacts
WHERE format = 'text' AND status = 'ready' AND tags LIKE '%scramjet%'
AND id NOT IN (
SELECT json_each.value FROM artifacts a, json_each(a.source_ids)
WHERE a.created_by = 'format-adapter-x'
)
Gap detection — find research with no generated content:
SELECT a.* FROM artifacts a
WHERE a.format = 'data' AND a.status = 'ready'
AND a.id NOT IN (SELECT e.source_id FROM edges e WHERE e.relation = 'derived_from')
Engagement-driven prioritization (each engagement artifact counted once per niche):
SELECT n.niche, SUM(CAST(json_extract(eng.properties, '$.engagement_score') AS REAL)) as score
FROM (SELECT DISTINCT niche FROM artifacts WHERE format = 'data' AND status = 'ready') n
JOIN artifacts eng ON eng.tags LIKE '%' || n.niche || '%' AND eng.created_by = 'engagement-collector'
GROUP BY n.niche
ORDER BY score DESC
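The ranking can also be written in plain TypeScript, which makes the intended arithmetic easy to test: each engagement artifact contributes its score once to each niche it is tagged with. This is a sketch over hypothetical in-memory rows (`Row`, `NicheScore`, and `rankNiches` are illustrative names, not part of the harness), and the per-tag substring match only approximates the `LIKE` against the JSON tags text.

```typescript
// Hypothetical in-memory row: a parsed subset of an artifacts record.
interface Row {
  id: string
  format: string
  status: string
  niche?: string
  tags: string[]            // parsed from the JSON tags column
  created_by: string
  engagement_score?: number // stands in for properties.$.engagement_score
}

interface NicheScore { niche: string; score: number }

// Rank niches that have ready research by total engagement. Each engagement
// artifact is counted once per niche, however many research rows the niche has.
function rankNiches(rows: Row[]): NicheScore[] {
  const niches = new Set<string>()
  for (const r of rows) {
    if (r.format === 'data' && r.status === 'ready' && r.niche) niches.add(r.niche)
  }
  const ranked: NicheScore[] = []
  niches.forEach(niche => {
    let score = 0
    for (const r of rows) {
      // Per-tag substring match, approximating tags LIKE '%' || niche || '%'
      if (r.created_by === 'engagement-collector' && r.tags.some(t => t.includes(niche))) {
        score += r.engagement_score ?? 0
      }
    }
    ranked.push({ niche, score })
  })
  return ranked.sort((a, b) => b.score - a.score)
}
```

With two research rows in a "fitness" niche and one engagement artifact scoring 10, fitness scores 10, not 20.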
The Edges Table
source_ids as JSON works for “what was this made from?” but not for “what was made from this?” or “show me the full derivation tree.” Use a proper edges table for bidirectional graph queries.
Schema
CREATE TABLE edges (
source_id TEXT NOT NULL, -- The artifact that was used
target_id TEXT NOT NULL, -- The artifact that was created from it
relation TEXT NOT NULL, -- 'derived_from', 'adapted_from', 'part_of', 'references', 'reviewed_by', 'published_to'
created_at TEXT NOT NULL,
PRIMARY KEY (source_id, target_id, relation)
);
-- Efficient queries in both directions. The primary key's index already serves
-- lookups by source_id (its leftmost column), so only these two are needed.
CREATE INDEX idx_edges_target ON edges(target_id);
CREATE INDEX idx_edges_relation ON edges(relation);
Edge Types
| Relation | Meaning | Example |
|---|---|---|
| derived_from | Created using this artifact as input | Article derived from keyword research |
| adapted_from | Same content, different format/voice | Social post adapted from article |
| part_of | This artifact is a component of a larger one | Chapter is part of a book |
| references | This artifact cites or uses data from another | Article references engagement data |
| reviewed_by | A review artifact evaluating this content | Review linked to the article it scored |
| published_to | Content deployed to a destination | Article published to site |
Querying the Graph
// What was this artifact made from?
async function getLineage(db: D1Database, artifactId: string): Promise<Edge[]> {
return (await db.prepare(`
SELECT * FROM edges WHERE target_id = ?
`).bind(artifactId).all()).results as Edge[]
}
// What has been made from this artifact?
async function getDerivatives(db: D1Database, artifactId: string): Promise<Edge[]> {
return (await db.prepare(`
SELECT * FROM edges WHERE source_id = ?
`).bind(artifactId).all()).results as Edge[]
}
// Full derivation tree (recursive CTE — SQLite/D1 supports this)
async function getDerivationTree(db: D1Database, rootId: string): Promise<Edge[]> {
return (await db.prepare(`
WITH RECURSIVE tree AS (
SELECT source_id, target_id, relation, 0 as depth
FROM edges WHERE source_id = ?
UNION ALL
SELECT e.source_id, e.target_id, e.relation, t.depth + 1
FROM edges e
JOIN tree t ON e.source_id = t.target_id
WHERE t.depth < 10
)
SELECT * FROM tree
`).bind(rootId).all()).results as Edge[]
}
// Find all adaptations of a specific article
async function getAdaptations(db: D1Database, articleId: string): Promise<Artifact[]> {
return (await db.prepare(`
SELECT a.* FROM artifacts a
JOIN edges e ON e.target_id = a.id
WHERE e.source_id = ? AND e.relation = 'adapted_from'
`).bind(articleId).all()).results as Artifact[]
}
// Find generated articles (ready or reviewed) that have never been adapted for social
async function findUnadaptedArticles(db: D1Database): Promise<Artifact[]> {
return (await db.prepare(`
SELECT a.* FROM artifacts a
WHERE a.format = 'text' AND a.status IN ('ready', 'reviewed')
AND a.created_by = 'article-generator'
AND a.id NOT IN (
SELECT e.source_id FROM edges e
WHERE e.relation = 'adapted_from'
AND e.target_id IN (
SELECT id FROM artifacts WHERE created_by = 'social-adapter'
)
)
`).all()).results as Artifact[]
}
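To see what the recursive CTE in getDerivationTree returns, here is the same traversal as a breadth-first walk over an in-memory edge list. It is a sketch for reasoning and testing, not a replacement for the query; `EdgeRow`, `TreeRow`, and `derivationTree` are hypothetical names, and `maxDepth` plays the role of `t.depth < 10`.

```typescript
// Minimal edge shape: the edges-table columns the traversal uses.
interface EdgeRow { source_id: string; target_id: string; relation: string }
interface TreeRow extends EdgeRow { depth: number }

// Walk from rootId along source_id → target_id, level by level.
// Depth-0 rows are the root's direct derivatives, as in the CTE's anchor.
function derivationTree(edges: EdgeRow[], rootId: string, maxDepth = 10): TreeRow[] {
  const out: TreeRow[] = []
  let frontier: TreeRow[] = edges
    .filter(e => e.source_id === rootId)
    .map(e => ({ ...e, depth: 0 }))
  while (frontier.length > 0) {
    out.push(...frontier)
    const next: TreeRow[] = []
    for (const t of frontier) {
      // Only expand rows below the cap, mirroring WHERE t.depth < 10
      if (t.depth >= maxDepth) continue
      for (const e of edges) {
        if (e.source_id === t.target_id) next.push({ ...e, depth: t.depth + 1 })
      }
    }
    frontier = next
  }
  return out
}
```

Like `UNION ALL`, this does not de-duplicate: an artifact reachable along two paths appears twice. The depth cap is also what keeps an accidental cycle in the graph from looping forever.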
Versioned Edge Sets
When you deploy a new pipeline version, edges from the old version still exist and remain queryable. New processors create new edges. The graph grows monotonically — nothing is deleted, only appended. This makes rollback safe: reverting to an old pipeline version means running old processors that create edges the same way they always did.
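The Processor Registry
Processors are stateless functions: they read their inputs from the artifact store, write their outputs back to it, and hold nothing between calls. They self-register, and the control plane looks them up by name at runtime.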
Types
export interface ProcessorConfig {
name: string // 'keyword-researcher', 'article-generator', etc.
type: 'deterministic' | 'llm' // Does it call an LLM?
enabled: boolean
phase: 'gather' | 'generate' | 'adapt' | 'review' | 'publish' | 'collect'
// LLM processors only
model_tier?: 'default' | 'expensive' | 'premium'
max_tokens?: number
temperature?: number
prompt_version?: number // Track which prompt version is active
// Budget allocation
weight: number // Share of cycle budget (0.0-1.0; a pipeline's weights should sum to at most 1.0)
max_cost_per_call: number // Hard cap per invocation in USD
// Execution
batch_size: number // Entities per tick
}
export interface ProcessorResult {
processor: string
entity_type: string
entity_id: string
model_used: string | null
tokens_input: number
tokens_output: number
cost_usd: number
duration_ms: number
success: boolean
error_message?: string
result_summary?: string
spawned_entities?: Array<{ type: string; id: string }>
}
export interface ProcessorContext {
db: D1Database
r2: R2Bucket
kv: KVNamespace
config: HarnessConfig
processor: ProcessorConfig
cycle: CycleState
selectModel: () => string // Respects budget and tier
remainingBudget: () => number // USD remaining for this processor
}
export type ProcessorFn = (ctx: ProcessorContext) => Promise<ProcessorResult[]>
Registry Implementation
const PROCESSOR_REGISTRY = new Map<string, ProcessorFn>()
export function registerProcessor(name: string, fn: ProcessorFn): void {
PROCESSOR_REGISTRY.set(name, fn)
}
export function getProcessor(name: string): ProcessorFn | undefined {
return PROCESSOR_REGISTRY.get(name)
}
export function listProcessors(): string[] {
return Array.from(PROCESSOR_REGISTRY.keys())
}
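A trimmed, runnable version of the lookup path shows the point of the flat map: the caller needs nothing but a name. The types here are cut-down stand-ins for ProcessorContext and ProcessorResult, and `register`, `runByName`, and both processor bodies are placeholders, not the harness's code.

```typescript
// Cut-down stand-ins for ProcessorContext / ProcessorResult.
interface MiniCtx { batch_size: number }
interface MiniResult { processor: string; entity_id: string; success: boolean; cost_usd: number }
type MiniFn = (ctx: MiniCtx) => Promise<MiniResult[]>

const REGISTRY = new Map<string, MiniFn>()

function register(procName: string, fn: MiniFn): void {
  REGISTRY.set(procName, fn)
}

// Placeholder processors; neither knows the other exists.
register('keyword-researcher', async ({ batch_size }) =>
  Array.from({ length: batch_size }, (_, i) => ({
    processor: 'keyword-researcher',
    entity_id: `kw-${i}`,
    success: true,
    cost_usd: 0,
  })))
register('article-generator', async () => []) // no inputs in the store yet: no work

// The control plane resolves processors by name; unknown names do nothing,
// matching the `if (!fn) continue` in the tick loop.
async function runByName(procName: string, batch_size: number): Promise<MiniResult[]> {
  const fn = REGISTRY.get(procName)
  return fn ? fn({ batch_size }) : []
}
```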
The registry is a flat map. No hierarchy, no dependency declarations, no ordering constraints. The control plane decides when to run each processor. The processor just does its work when called.
Self-Registering Processors
Each processor lives in its own file and registers itself on import:
// processors/keyword-researcher.ts
import { registerProcessor } from '../registry'
registerProcessor('keyword-researcher', async (ctx) => { /* ... */ })
// processors/article-generator.ts
import { registerProcessor } from '../registry'
registerProcessor('article-generator', async (ctx) => { /* ... */ })
The harness imports all processor files at startup, which triggers registration:
// processors/index.ts — import all to register
import './keyword-researcher'
import './article-generator'
import './social-adapter'
import './content-reviewer'
import './site-publisher'
import './engagement-collector'
import './artifact-storer'
Phase-Ordered Execution
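Processors execute in phase order. Within a phase, each tick runs the first processor in config order that has work, so later processors get their turn once earlier ones are drained or over budget. The phase sequence ensures data flows correctly: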
const PHASE_ORDER = ['gather', 'generate', 'adapt', 'review', 'publish', 'collect']
The execution flow in each cycle:
gather phase: keyword-researcher → topic-researcher → web-scraper
(all researchers run, accumulating data artifacts)
generate phase: article-generator → image-generator
(generators query data artifacts, produce content)
adapt phase: voice-adapter → social-adapter → email-adapter
(adapters reshape content for different contexts)
review phase: content-reviewer → seo-reviewer
(reviewers score and gate content)
publish phase: site-publisher → social-publisher → email-publisher
(publishers deploy ready content)
collect phase: engagement-collector → analytics-collector
(collectors capture signals for the next cycle)
How the Harness Runs a Tick
The control plane runs one processor per tick to stay under CPU limits:
async function tick(config: HarnessConfig, cycle: CycleState): Promise<boolean> {
// Budget gate
if (config.budget_mode === 'hard' && cycle.cost_so_far >= config.budget_max_per_cycle) {
return true // Cycle done — out of budget
}
// Get processors for current phase
const phaseProcessors = config.processors
.filter(p => p.enabled && p.phase === cycle.phase)
if (phaseProcessors.length === 0) {
return advancePhase(cycle)
}
// Find the next processor that has work
let didWork = false
for (const proc of phaseProcessors) {
const fn = getProcessor(proc.name)
if (!fn) continue
// Per-processor budget gate (soft mode)
if (proc.type === 'llm' && config.budget_mode === 'soft') {
const allocated = config.budget_max_per_cycle * proc.weight
const spent = cycle.processor_stats[proc.name]?.cost || 0
if (spent >= allocated) continue
}
const ctx = buildContext(config, proc, cycle)
const results = await fn(ctx)
for (const r of results) {
recordResult(cycle, r)
}
didWork = results.length > 0
if (didWork) break // One processor per tick
}
if (!didWork) {
return advancePhase(cycle) // Phase exhausted
}
return false // More work to do
}
function advancePhase(cycle: CycleState): boolean {
const currentIdx = PHASE_ORDER.indexOf(cycle.phase)
if (currentIdx >= PHASE_ORDER.length - 1) {
return true // Last phase done → cycle complete
}
cycle.phase = PHASE_ORDER[currentIdx + 1]
return false // More phases to go
}
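The interaction between one-processor-per-tick and phase advancement is easy to check with a small simulation. Everything here is hypothetical: `SimProc`'s `pending` counter stands in for "has work in the artifact store", there is no budget gate, and `simTick`/`runCycle` only mirror the control flow of tick() and advancePhase().

```typescript
const PHASES = ['gather', 'generate', 'adapt', 'review', 'publish', 'collect']

// A simulated processor: its phase and how many ticks of work it has left.
interface SimProc { name: string; phase: string; pending: number }
interface SimCycle { phase: string; log: string[] }

// One tick: run the first processor in the current phase that has work,
// otherwise advance the phase. Returns true when the cycle is complete.
function simTick(procs: SimProc[], cycle: SimCycle): boolean {
  for (const p of procs) {
    if (p.phase !== cycle.phase || p.pending === 0) continue
    p.pending -= 1
    cycle.log.push(p.name)
    return false // one processor per tick
  }
  const idx = PHASES.indexOf(cycle.phase)
  if (idx >= PHASES.length - 1) return true // last phase exhausted
  cycle.phase = PHASES[idx + 1]
  return false
}

// Tick until the cycle reports completion; count every tick, including the last.
function runCycle(procs: SimProc[]): { ticks: number; log: string[] } {
  const cycle: SimCycle = { phase: PHASES[0], log: [] }
  let ticks = 1
  while (!simTick(procs, cycle)) ticks++
  return { ticks, log: cycle.log }
}
```

With two ticks of keyword research, one article, and one publish, the cycle takes ten ticks: four do work and six only advance the phase (five transitions plus the final "cycle complete"), so every empty phase still costs one tick.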
Budget Management
Every LLM call costs money. The architecture tracks cost at every level and stops before it overspends.
Two Modes
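Hard mode. When cost_so_far >= budget_max_per_cycle, the cycle stops. Period. No more processors run. Because the check happens at the start of each tick, the final tick can carry spend slightly past the cap, by at most that tick's cost.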
if (config.budget_mode === 'hard' && cycle.cost_so_far >= config.budget_max_per_cycle) {
return true // done — out of budget
}
Soft mode. Each LLM processor gets a weighted share of the total budget. When a processor exhausts its share, it gets skipped, but other processors keep running. This means researchers (API calls) and publishers (no LLM cost) can keep running even when generators (expensive LLM calls) are paused.
if (proc.type === 'llm' && config.budget_mode === 'soft') {
const allocated = config.budget_max_per_cycle * proc.weight // e.g., 0.4 = 40%
const spent = cycle.processor_stats[proc.name]?.cost || 0
if (spent >= allocated) continue // skip — this processor is over budget
}
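The soft-mode gate is easiest to reason about by replaying a sequence of calls against it. In this sketch the call plan, costs, and `llm` flags are hypothetical inputs and `simulateSoftBudget` is an illustrative helper; the rule itself follows the tick loop: only LLM processors are gated, and only on what they had spent before the call.

```typescript
// One attempted call: which processor, whether it is LLM-backed,
// its weight, and what the call would cost if it ran.
interface PlannedCall { proc: string; llm: boolean; weight: number; cost: number }
interface SoftRun { total: number; spent: Record<string, number>; decisions: string[] }

function simulateSoftBudget(budget: number, plan: PlannedCall[]): SoftRun {
  const spent: Record<string, number> = {}
  const decisions: string[] = []
  let total = 0
  for (const c of plan) {
    const before = spent[c.proc] ?? 0
    // Gate on spend *before* this call, as the tick loop does.
    if (c.llm && before >= budget * c.weight) {
      decisions.push(`${c.proc}: skip`)
      continue
    }
    spent[c.proc] = before + c.cost
    total += c.cost
    decisions.push(`${c.proc}: run`)
  }
  return { total, spent, decisions }
}
```

Replaying a $0.50 cycle with the weights from the allocation table, the first article-generator call spends $0.25 against a $0.175 share and the second call is skipped. Because the gate only looks at past spend, one expensive call can overshoot its share; a per-call cap such as max_cost_per_call is the natural way to bound that.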
Weight Allocation
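Each processor declares a weight from 0.0 to 1.0. A pipeline's weights should sum to at most 1.0 so the allocations never promise more than the cycle budget; the allocation below uses exactly 1.0: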
| Processor | Weight | Phase | Rationale |
|---|---|---|---|
| keyword-researcher | 0.05 | gather | Cheap API calls |
| topic-researcher | 0.10 | gather | External API costs |
| article-generator | 0.35 | generate | Expensive LLM calls, most budget |
| image-generator | 0.15 | generate | Medium LLM + image API |
| voice-adapter | 0.08 | adapt | Quick transforms |
| social-adapter | 0.07 | adapt | Quick transforms |
| content-reviewer | 0.10 | review | Short evaluation prompts |
| site-publisher | 0.05 | publish | No LLM cost |
| engagement-collector | 0.05 | collect | No LLM cost |
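Turning weights into dollar allocations is one multiplication per processor, but the invariants are worth validating when the config is loaded. A sketch, assuming the weights arrive as a plain list; `WeightEntry` and `allocate` are hypothetical names.

```typescript
interface WeightEntry { name: string; weight: number }

// Convert weights into per-processor dollar allocations, rejecting configs
// that break the invariants. The 1e-9 tolerance absorbs binary rounding:
// decimal weights like 0.05 and 0.07 need not sum to exactly 1.0 in floating point.
function allocate(budgetUsd: number, entries: WeightEntry[]): Record<string, number> {
  const out: Record<string, number> = {}
  let sum = 0
  for (const e of entries) {
    if (e.weight < 0 || e.weight > 1) throw new Error(`${e.name}: weight ${e.weight} outside 0.0-1.0`)
    sum += e.weight
    out[e.name] = budgetUsd * e.weight
  }
  if (sum > 1 + 1e-9) throw new Error(`weights sum to ${sum.toFixed(3)}, above 1.0`)
  return out
}
```

At $0.50 per cycle, the table above gives article-generator $0.175 and each deterministic processor $0.025.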
Model Tier Routing
Three tiers: default, expensive, premium. The control plane auto-downgrades when budget is tight:
function selectModel(proc: ProcessorConfig, cycle: CycleState, config: HarnessConfig): string {
const remaining = config.budget_max_per_cycle - cycle.cost_so_far
// Budget < 20% remaining → force default tier
if (remaining < config.budget_max_per_cycle * 0.2 && proc.model_tier !== 'default') {
return config.model_default // e.g., gemini-2.5-flash
}
switch (proc.model_tier) {
case 'premium': return config.model_premium // e.g., gemini-2.5-pro
case 'expensive': return config.model_expensive // e.g., claude-sonnet-4
default: return config.model_default // e.g., gemini-2.5-flash
}
}
Graceful Degradation in Practice
Budget: $0.50/cycle, soft mode, weights from the allocation table above
Phase     Processor             Allocation  Cost    Running total  Decision
────────  ────────────────────  ──────────  ──────  ─────────────  ────────
gather    keyword-researcher    $0.025      $0.01   $0.01          run (deterministic, not gated)
gather    topic-researcher      $0.050      $0.03   $0.04          run
generate  article-generator     $0.175      $0.25   $0.29          run (nothing spent yet; gate checks before the call)
generate  article-generator     $0.175      -       $0.29          skip (soft: $0.25 spent >= $0.175 allocation)
generate  image-generator       $0.075      $0.08   $0.37          run (within allocation)
adapt     voice-adapter         $0.040      $0.03   $0.40          run (within allocation)
review    content-reviewer      $0.050      $0.02   $0.42          run (cheap, within allocation)
publish   site-publisher        $0.025      $0.00   $0.42          run (deterministic)
collect   engagement-collector  $0.025      $0.00   $0.42          run (deterministic)
Per-Processor Telemetry
Every processor invocation logs cost, tokens, and duration to D1:
interface ProcessorStats {
cost: number // USD spent
calls: number // invocations
total_ms: number // wall clock time
failures: number // error count
}
// Aggregated per-cycle
interface CycleTelemetry {
cycle_id: number
total_cost: number
total_tokens_input: number
total_tokens_output: number
processor_stats: Record<string, ProcessorStats>
}
The dashboard reads this in real time. Over time, you can answer:
- Which processor consumes the most budget?
- What’s the cost per published article?
- Are generators getting more efficient with better research data?
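The questions above reduce to simple aggregations over ProcessorResult rows. A sketch of the first two; `ResultRow`, `aggregate`, `costPerPublished`, and `topSpender` are hypothetical helpers, and treating one successful site-publisher result as one published article is an assumption.

```typescript
// The ProcessorResult fields this aggregation needs.
interface ResultRow { processor: string; cost_usd: number; duration_ms: number; success: boolean }
// Same shape as ProcessorStats above.
interface Stats { cost: number; calls: number; total_ms: number; failures: number }

function aggregate(results: ResultRow[]): Record<string, Stats> {
  const stats: Record<string, Stats> = {}
  for (const r of results) {
    const s = (stats[r.processor] ??= { cost: 0, calls: 0, total_ms: 0, failures: 0 })
    s.cost += r.cost_usd
    s.calls += 1
    s.total_ms += r.duration_ms
    if (!r.success) s.failures += 1
  }
  return stats
}

// Total spend across all processors (failed calls included, since their
// cost is real) divided by successful publisher calls.
function costPerPublished(stats: Record<string, Stats>, publisher = 'site-publisher'): number | null {
  const p = stats[publisher]
  const published = p ? p.calls - p.failures : 0
  if (published === 0) return null
  let total = 0
  for (const key of Object.keys(stats)) total += stats[key].cost
  return total / published
}

// Which processor consumes the most budget?
function topSpender(stats: Record<string, Stats>): string | null {
  let best: string | null = null
  for (const key of Object.keys(stats)) {
    if (best === null || stats[key].cost > stats[best].cost) best = key
  }
  return best
}
```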
Adaptive Scheduling
The system tunes itself based on success rate. No static intervals. No manual knobs.
The Self-Driving Loop
import { DurableObject } from 'cloudflare:workers'
// Safety rails
const ABSOLUTE_MIN_BATCH = 2
const ABSOLUTE_MAX_BATCH = 50
const MIN_INTERVAL_MS = 10_000 // fastest: 10s
const MAX_INTERVAL_MS = 120_000 // slowest: 2 min
const COOLDOWN_MS = 300_000 // 5 min on critical failure
const METRICS_WINDOW_MINUTES = 5
// Adaptive thresholds
const SUCCESS_RATE_CRITICAL = 0.2 // Below 20% → cooldown
const SUCCESS_RATE_LOW = 0.5 // Below 50% → halve batch, slow down
const SUCCESS_RATE_GOOD = 0.8 // Above 80% → gentle ramp up
const SUCCESS_RATE_GREAT = 0.95 // Above 95% → aggressive ramp up
interface ControllerState {
running: boolean
current_batch_size: number
current_interval_ms: number
cooldown_until: number
// Rolling metrics keyed by minute timestamp
metrics: Record<number, {
dispatched: number
completed: number
failed: number
}>
}
Alarm-Based Heartbeat
The Durable Object uses Cloudflare’s alarm API to wake itself. No external cron, no queue, no scheduler. The DO is its own scheduler:
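class AdaptiveController extends DurableObject<Env> {
// Start from the defaults (DEFAULT_STATE, not shown) so this.state is never undefined;
// a fresh metrics object keeps mutations from leaking into the shared default
private state: ControllerState = { ...DEFAULT_STATE, metrics: {} }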
constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env)
ctx.blockConcurrencyWhile(async () => {
const saved = await ctx.storage.get<ControllerState>('state')
if (saved) this.state = { ...DEFAULT_STATE, ...saved }
this.pruneMetrics()
// Resume alarm if running
if (this.state.running) {
const existing = await ctx.storage.getAlarm()
if (!existing) {
await ctx.storage.setAlarm(Date.now() + this.state.current_interval_ms)
}
}
})
}
async alarm(): Promise<void> {
if (!this.state.running) return
// Cooldown check
if (Date.now() < this.state.cooldown_until) {
await this.scheduleNext()
return
}
// Pick work from artifact store
const batch = await this.pickWork(this.state.current_batch_size)
if (batch.length === 0) {
await this.scheduleNext()
return
}
// Execute the batch
const { completed, failed } = await this.executeBatch(batch)
// Record metrics
this.recordMetrics(batch.length, completed, failed)
// Adapt based on success rate
this.adapt()
// Drop expired metric buckets, then persist and schedule next tick
this.pruneMetrics()
await this.persist()
await this.scheduleNext()
}
private async scheduleNext(): Promise<void> {
if (this.state.running) {
await this.ctx.storage.setAlarm(Date.now() + this.state.current_interval_ms)
}
}
}
Rolling Metrics Window
Success rate is calculated over a rolling window (default: 5 minutes). Recent performance drives adaptation:
private successRate(): number {
const windowStart = Date.now() - METRICS_WINDOW_MINUTES * 60_000
let completed = 0, failed = 0
for (const [ts, m] of Object.entries(this.state.metrics)) {
if (Number(ts) >= windowStart) {
completed += m.completed
failed += m.failed
}
}
const total = completed + failed
return total === 0 ? 1 : completed / total
}
private recordMetrics(dispatched: number, completed: number, failed: number): void {
const minuteTs = Math.floor(Date.now() / 60_000) * 60_000
if (!this.state.metrics[minuteTs]) {
this.state.metrics[minuteTs] = { dispatched: 0, completed: 0, failed: 0 }
}
this.state.metrics[minuteTs].dispatched += dispatched
this.state.metrics[minuteTs].completed += completed
this.state.metrics[minuteTs].failed += failed
}
private pruneMetrics(): void {
const cutoff = Date.now() - (METRICS_WINDOW_MINUTES + 1) * 60_000
for (const ts of Object.keys(this.state.metrics)) {
if (Number(ts) < cutoff) {
delete this.state.metrics[Number(ts)]
}
}
}
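Because these methods read Date.now() and this.state directly, they are awkward to test. Here is the same bucketing and window logic as free functions with an injected clock; `bucketKey`, `recordAt`, and `rateAt` are hypothetical names for this sketch.

```typescript
interface Bucket { dispatched: number; completed: number; failed: number }
type Buckets = Record<number, Bucket>

const WINDOW_MINUTES = 5

// Round down to the start of the minute, as recordMetrics does.
function bucketKey(nowMs: number): number {
  return Math.floor(nowMs / 60_000) * 60_000
}

function recordAt(b: Buckets, nowMs: number, dispatched: number, completed: number, failed: number): void {
  const key = bucketKey(nowMs)
  const m = (b[key] ??= { dispatched: 0, completed: 0, failed: 0 })
  m.dispatched += dispatched
  m.completed += completed
  m.failed += failed
}

// Success rate over buckets that start inside the window; 1 when there
// are no samples, matching successRate().
function rateAt(b: Buckets, nowMs: number): number {
  const windowStart = nowMs - WINDOW_MINUTES * 60_000
  let completed = 0
  let failed = 0
  for (const [ts, m] of Object.entries(b)) {
    if (Number(ts) >= windowStart) {
      completed += m.completed
      failed += m.failed
    }
  }
  const total = completed + failed
  return total === 0 ? 1 : completed / total
}
```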
Adaptive Batch Size and Interval
Five zones: four that adjust batch size and interval, and one that holds steady:
private adapt(): void {
const sr = this.successRate()
const recentTotal = this.recentTotal()
// Need minimum sample size before adapting
if (recentTotal < 5) return
if (sr < SUCCESS_RATE_CRITICAL) {
// CRITICAL (< 20%): Full cooldown. Something is seriously wrong.
this.state.cooldown_until = Date.now() + COOLDOWN_MS
this.state.current_batch_size = ABSOLUTE_MIN_BATCH
this.state.current_interval_ms = MAX_INTERVAL_MS
} else if (sr < SUCCESS_RATE_LOW) {
// LOW (20-50%): Halve batch, slow down interval
this.state.current_batch_size = Math.max(
ABSOLUTE_MIN_BATCH,
Math.floor(this.state.current_batch_size * 0.5),
)
this.state.current_interval_ms = Math.min(
MAX_INTERVAL_MS,
Math.floor(this.state.current_interval_ms * 1.5),
)
} else if (sr > SUCCESS_RATE_GREAT) {
// GREAT (> 95%): Aggressive ramp up
this.state.current_batch_size = Math.min(
ABSOLUTE_MAX_BATCH,
Math.ceil(this.state.current_batch_size * 1.25),
)
this.state.current_interval_ms = Math.max(
MIN_INTERVAL_MS,
Math.floor(this.state.current_interval_ms * 0.8),
)
} else if (sr > SUCCESS_RATE_GOOD) {
// GOOD (80-95%): Gentle ramp up
this.state.current_batch_size = Math.min(
ABSOLUTE_MAX_BATCH,
Math.ceil(this.state.current_batch_size * 1.1),
)
this.state.current_interval_ms = Math.max(
MIN_INTERVAL_MS,
Math.floor(this.state.current_interval_ms * 0.95),
)
}
// NEUTRAL (50-80%): Hold steady
}
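A pure-function restatement of adapt() makes each zone directly testable. It uses the same thresholds, multipliers, and clamps as the controller; `Knobs` and `adaptStep` are hypothetical names, and the minimum-sample check (`recentTotal < 5`) is left to the caller.

```typescript
const MIN_BATCH = 2
const MAX_BATCH = 50
const MIN_INTERVAL_MS = 10_000
const MAX_INTERVAL_MS = 120_000
const COOLDOWN_MS = 300_000

interface Knobs { batch: number; interval: number; cooldownUntil: number }

// Same logic as adapt(), with the success rate and the clock passed in.
function adaptStep(k: Knobs, sr: number, nowMs: number): Knobs {
  if (sr < 0.2) {
    // CRITICAL: minimum batch, slowest interval, 5-minute cooldown
    return { batch: MIN_BATCH, interval: MAX_INTERVAL_MS, cooldownUntil: nowMs + COOLDOWN_MS }
  }
  if (sr < 0.5) {
    // LOW: halve the batch, stretch the interval by 1.5x
    return {
      ...k,
      batch: Math.max(MIN_BATCH, Math.floor(k.batch * 0.5)),
      interval: Math.min(MAX_INTERVAL_MS, Math.floor(k.interval * 1.5)),
    }
  }
  if (sr > 0.95) {
    // GREAT: +25% batch, -20% interval
    return {
      ...k,
      batch: Math.min(MAX_BATCH, Math.ceil(k.batch * 1.25)),
      interval: Math.max(MIN_INTERVAL_MS, Math.floor(k.interval * 0.8)),
    }
  }
  if (sr > 0.8) {
    // GOOD: +10% batch, -5% interval
    return {
      ...k,
      batch: Math.min(MAX_BATCH, Math.ceil(k.batch * 1.1)),
      interval: Math.max(MIN_INTERVAL_MS, Math.floor(k.interval * 0.95)),
    }
  }
  return k // NEUTRAL (50-80%): hold steady
}
```

Starting from the minimum batch of 2, thirteen consecutive steps above 95% reach the cap of 50; a single step below 20% drops straight back to 2.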
Adaptation Summary
Success rate Batch size Interval Behavior
───────────── ────────── ───────── ─────────────────────
< 20% → 2 (min) → 120s COOLDOWN 5 min
20-50% × 0.50 × 1.50 Halve batch, slow down
50-80% (hold) (hold) Wait for better signal
80-95% × 1.10 × 0.95 Gentle ramp up
> 95% × 1.25 × 0.80 Aggressive ramp up
HTTP API for Dashboard Control
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url)
if (request.method === 'GET' && url.pathname === '/status') {
return Response.json({
running: this.state.running,
current_batch_size: this.state.current_batch_size,
current_interval_ms: this.state.current_interval_ms,
success_rate: Math.round(this.successRate() * 100),
completions_per_minute: this.recentCompletionsPerMinute(),
in_cooldown: Date.now() < this.state.cooldown_until,
projected_per_day: this.recentCompletionsPerMinute() * 60 * 24,
})
}
if (request.method === 'POST' && url.pathname === '/start') {
this.state.running = true
this.state.cooldown_until = 0
await this.persist()
await this.ctx.storage.setAlarm(Date.now() + 1000)
return Response.json({ ok: true, running: true })
}
if (request.method === 'POST' && url.pathname === '/stop') {
this.state.running = false
await this.ctx.storage.deleteAlarm() // cancel the pending wake-up
await this.persist()
return Response.json({ ok: true, running: false })
}
return new Response('Not found', { status: 404 })
}
Composition Patterns
Processors are the atoms. Composition patterns are the molecules. Because every processor has the same interface (input artifacts, output artifacts), you can combine them in several ways.
Linear Pipeline
The simplest pattern. One processor’s output becomes the next processor’s input:
Researcher → Generator → Reviewer → Publisher
Each processor queries the artifact store for its inputs. The pipeline emerges from the data, not from explicit wiring.
Fan-Out
One processor’s output feeds multiple downstream processors:
┌→ social-adapter → social-publisher
article-generator─┼→ email-adapter → email-publisher
└→ site-publisher (directly)
All three adapters/publishers query the same artifact store. The article-generator writes one article; three processors independently find it.
Conditional Routing
Route artifacts to different processors based on their state:
registerProcessor('quality-router', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
const { db } = ctx
// Reviews not yet routed: the router flags each review it handles
const reviews = await db.prepare(`
SELECT a.id, a.source_ids, json_extract(a.properties, '$.score') as score
FROM artifacts a
WHERE a.created_by = 'content-reviewer' AND a.status = 'ready'
AND json_extract(a.properties, '$.routed') IS NULL
LIMIT ?
`).bind(ctx.processor.batch_size).all()
const results: ProcessorResult[] = []
for (const review of reviews.results) {
const sourceIds = JSON.parse(review.source_ids as string)
const originalId = sourceIds[0]
const score = review.score as number
if (score >= 0.8) {
// High quality → ready for publishing
await db.prepare(`
UPDATE artifacts SET status = 'ready', updated_at = datetime('now') WHERE id = ?
`).bind(originalId).run()
} else if (score >= 0.5) {
// Medium quality → send to voice adapter for improvement
await db.prepare(`
UPDATE artifacts SET status = 'draft', updated_at = datetime('now'),
properties = json_set(COALESCE(properties, '{}'), '$.needs_revision', true)
WHERE id = ?
`).bind(originalId).run()
} else {
// Low quality → archive and let generator create fresh
await db.prepare(`
UPDATE artifacts SET status = 'archived', updated_at = datetime('now') WHERE id = ?
`).bind(originalId).run()
}
// Flag the review as routed so the next tick does not pick it up again
await db.prepare(`
UPDATE artifacts SET updated_at = datetime('now'),
properties = json_set(COALESCE(properties, '{}'), '$.routed', 1)
WHERE id = ?
`).bind(review.id).run()
results.push({
processor: 'quality-router',
entity_type: 'routing',
entity_id: originalId,
model_used: null,
tokens_input: 0, tokens_output: 0, cost_usd: 0,
duration_ms: 5,
success: true,
result_summary: `Score ${score}: ${score >= 0.8 ? 'ready' : score >= 0.5 ? 'revise' : 'archive'}`,
})
}
return results
})
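The router's thresholds are worth pinning down with tests, since flipping a `>=` to `>` would silently change which articles ship. Here they are extracted as a pure function; `Route`, `routeForScore`, and `STATUS_FOR` are hypothetical names for this sketch.

```typescript
type Route = 'ready' | 'revise' | 'archive'

// Same thresholds as quality-router: >= 0.8 ships, >= 0.5 is revised, the rest is archived.
function routeForScore(score: number): Route {
  if (score >= 0.8) return 'ready'
  if (score >= 0.5) return 'revise'
  return 'archive'
}

// What the router writes back onto the original artifact for each route.
const STATUS_FOR: Record<Route, { status: string; needs_revision?: boolean }> = {
  ready: { status: 'ready' },
  revise: { status: 'draft', needs_revision: true },
  archive: { status: 'archived' },
}
```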
Fractal Composition: A Pipeline IS a Processor
The most powerful pattern. Processors are themselves composed of sub-processors. Same pattern at every level of the tree.
Book Generator
  checks: "are all chapters ready?"
  if yes → assemble → store
  Chapter Generator
    checks: "is there an approved outline?"
    if yes → generate → store
    Outline Generator
      checks: "is there research for this topic?"
      if yes → generate outline → store
No processor triggers another. No orchestration. Each one runs the same loop:
1. Check input store — is there work ready for me?
2. If yes → do my thing → write output to my store
3. If no → done (try again next cycle)
The book assembler, the chapter generator, the outline generator, the topic researcher — they all run this loop. The only difference is what “inputs ready” means and what “do my thing” produces.
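The convergence claim can be checked with a toy model. In this sketch all names are hypothetical: the store is an array, "all chapters ready" is simplified to "chapters exist", three chapters per outline is arbitrary, and each cycle runs every processor once in a given order.

```typescript
// Hypothetical in-memory store: each item is an artifact with a kind and a topic.
interface Item { kind: 'research' | 'outline' | 'chapter' | 'book'; topic: string }

// A processor reads the store, appends what it produced, and reports how many items.
type Loop = (store: Item[]) => number

const has = (s: Item[], kind: Item['kind'], topic: string): boolean =>
  s.some(i => i.kind === kind && i.topic === topic)

// outline: needs research, produces one outline per topic
const outlineGen: Loop = s => {
  const todo = s.filter(i => i.kind === 'research' && !has(s, 'outline', i.topic))
  todo.forEach(i => s.push({ kind: 'outline', topic: i.topic }))
  return todo.length
}

// chapters: need an outline, produce three chapters per topic
const chapterGen: Loop = s => {
  const todo = s.filter(i => i.kind === 'outline' && !has(s, 'chapter', i.topic))
  todo.forEach(i => {
    for (let n = 0; n < 3; n++) s.push({ kind: 'chapter', topic: i.topic })
  })
  return todo.length * 3
}

// book: needs chapters, produces one book per topic
const bookAsm: Loop = s => {
  const todo = s.filter(i => i.kind === 'outline' && has(s, 'chapter', i.topic) && !has(s, 'book', i.topic))
  todo.forEach(i => s.push({ kind: 'book', topic: i.topic }))
  return todo.length
}

// Run every processor once per cycle, in the given order, until a cycle
// produces nothing. Returns how many cycles that took.
function runUntilQuiet(store: Item[], order: Loop[], maxCycles = 20): number {
  for (let cycle = 1; cycle <= maxCycles; cycle++) {
    let produced = 0
    for (const p of order) produced += p(store)
    if (produced === 0) return cycle
  }
  return maxCycles
}
```

Running the loops in dependency order settles in 2 cycles; running them in reverse settles in 4. Either way the store ends with one outline, three chapters, and one book: order changes how fast the system converges, not where it ends up.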
registerProcessor('book-assembler', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
const { db } = ctx
// Find books with all chapters ready that have not been assembled yet
const books = await db.prepare(`
SELECT group_id,
COUNT(*) as total,
SUM(CASE WHEN status = 'ready' THEN 1 ELSE 0 END) as ready
FROM artifacts
WHERE format = 'text' AND parent_id IS NOT NULL AND group_id IS NOT NULL
AND group_id NOT IN (
SELECT group_id FROM artifacts
WHERE created_by = 'book-assembler' AND group_id IS NOT NULL
)
GROUP BY group_id
HAVING total = ready AND total > 0
`).all()
const results: ProcessorResult[] = []
for (const book of books.results) {
const chapters = await db.prepare(`
SELECT id FROM artifacts
WHERE group_id = ? AND format = 'text' AND parent_id IS NOT NULL
ORDER BY sequence ASC
`).bind(book.group_id).all()
// Assemble chapters into book artifact, separated by horizontal rules
const parts: string[] = []
for (const ch of chapters.results) {
const content = await ctx.r2.get(`artifacts/${ch.id}/content.md`)
if (content) parts.push(await content.text())
}
const fullContent = parts.join('\n\n---\n\n')
const bookId = crypto.randomUUID()
await ctx.r2.put(`artifacts/${bookId}/content.md`, fullContent)
await db.prepare(`
INSERT INTO artifacts (id, format, status, title, group_id, word_count, created_by, source_ids, created_at, updated_at)
VALUES (?, 'text', 'draft', ?, ?, ?, 'book-assembler', ?, datetime('now'), datetime('now'))
`).bind(
bookId,
`Book: ${book.group_id}`,
book.group_id,
fullContent.split(/\s+/).filter(Boolean).length, // ignore empty strings from leading/trailing whitespace
JSON.stringify(chapters.results.map(c => c.id)),
).run()
results.push({
processor: 'book-assembler',
entity_type: 'book',
entity_id: bookId,
model_used: null,
tokens_input: 0, tokens_output: 0, cost_usd: 0,
duration_ms: 100,
success: true,
result_summary: `Assembled ${chapters.results.length} chapters`,
})
}
return results
})
This eliminates:
- Orchestration / workflow engines between processors
- Event systems / triggers / webhooks
- Dependency graphs
- Failure cascading — if a chapter generator fails, the book generator simply doesn’t see all chapters ready yet. Nothing breaks. Next cycle it checks again.
The entire engine is N instances of this one loop, each checking its own store, each on its own schedule. Some run every minute, some daily, some weekly. The system converges toward completeness without coordination.
Pipeline Versioning
Pipelines are immutable configurations. To change a pipeline, create a new version:
interface PipelineVersion {
version: number
status: 'active' | 'draining' | 'inactive'
processors: ProcessorConfig[]
created_at: string
}
// Deployment flow:
// 1. Create new version with updated processors
// 2. Mark old version as 'draining' (finish in-flight, no new work)
// 3. Mark new version as 'active' (start accepting new work)
// 4. Once old version has no in-flight work, mark 'inactive'
// 5. If problems: reactivate old version (rollback)
async function deployPipelineVersion(
db: D1Database,
harnessId: number,
newProcessors: ProcessorConfig[],
): Promise<void> {
// Drain the old version and insert the new one in a single D1 batch, which
// runs as one transaction: if the insert fails, the drain is rolled back too
await db.batch([
db.prepare(`
UPDATE pipeline_versions SET status = 'draining'
WHERE harness_id = ? AND status = 'active'
`).bind(harnessId),
db.prepare(`
INSERT INTO pipeline_versions (harness_id, version, status, processors_json, created_at)
SELECT ?, COALESCE(MAX(version), 0) + 1, 'active', ?, datetime('now')
FROM pipeline_versions WHERE harness_id = ?
`).bind(harnessId, JSON.stringify(newProcessors), harnessId),
])
}
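The deployment flow in the comments is a small state machine, and an in-memory model makes its invariant (at most one active version) easy to test. This is a sketch: `Version`, `deploy`, `settle`, and `rollback` are hypothetical, and `inFlight` stands in for however in-flight work is actually tracked.

```typescript
type VersionStatus = 'active' | 'draining' | 'inactive'
// inFlight: work this version started but has not finished.
interface Version { version: number; status: VersionStatus; inFlight: number }

// Steps 1-3: drain whatever is active, then add the next version as active.
function deploy(versions: Version[]): Version {
  for (const v of versions) if (v.status === 'active') v.status = 'draining'
  const next: Version = {
    version: versions.reduce((max, v) => Math.max(max, v.version), 0) + 1,
    status: 'active',
    inFlight: 0,
  }
  versions.push(next)
  return next
}

// Step 4: a draining version with nothing in flight becomes inactive.
function settle(versions: Version[]): void {
  for (const v of versions) if (v.status === 'draining' && v.inFlight === 0) v.status = 'inactive'
}

// Step 5 (rollback): reactivate an earlier version and drain the current one.
// Selecting the target by version number is an assumption of this sketch.
function rollback(versions: Version[], to: number): void {
  for (const v of versions) {
    if (v.version === to) v.status = 'active'
    else if (v.status === 'active') v.status = 'draining'
  }
}
```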
Implementation on Cloudflare
This architecture maps naturally to Cloudflare’s primitives. Each component has a clear home:
┌─────────────────────────────────────────────────────────┐
│ Worker (entry point) │
│ Routes HTTP requests to the correct Durable Object │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Durable Object: Pipeline Controller (control plane) │ │
│ │ - Alarm-based heartbeat (self-scheduling) │ │
│ │ - Budget governor │ │
│ │ - Phase-ordered processor execution │ │
│ │ - Adaptive batch size + interval │ │
│ │ - Config from D1 (dashboard-controllable) │ │
│ │ - HTTP API: /status, /start, /stop, /tick │ │
│ └──────────────┬──────────────────────────────────────┘ │
│ │ calls registered processors │
│ ┌──────────────▼──────────────────────────────────────┐ │
│ │ Processor Registry (data plane) │ │
│ │ - Pure functions registered by name │ │
│ │ - Each processor reads/writes artifact store │ │
│ │ - No awareness of other processors │ │
│ └──────────────┬──────────────────────────────────────┘ │
│ │ reads/writes │
│ ┌──────────────▼──────────────────────────────────────┐ │
│ │ Storage │ │
│ │ D1 — artifacts table, edges table, processor log │ │
│ │ R2 — large content bodies (articles, images) │ │
│ │ KV — hot cache for frequently accessed artifacts │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Queues (inter-pipeline communication) │ │
│ │ - Pipeline A publishes artifacts to a queue │ │
│ │ - Pipeline B consumes and processes them │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Workflows (multi-step processor chains) │ │
│ │ - Long-running processor execution │ │
│ │ - Automatic retry with state persistence │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
Durable Object as Control Plane
The DO is the singleton that governs execution. One DO per pipeline. It owns:
- Self-scheduling: alarms fire the heartbeat. No cron, no external trigger.
- State: current cycle, budget spent, processor stats. Persisted in DO storage.
- Config: loaded from D1 every tick, so a dashboard can change behavior live.
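- Identity: idFromName('pipeline-slug') gives a deterministic ID, so the same pipeline always reaches the same DO and resumes its stored state.
- Location hint: if latency to upstream APIs matters, pass a locationHint (such as 'wnam') when getting the DO stub. Hints are best-effort and region-level; they do not pin the DO to a specific colo.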
export class PipelineController extends DurableObject<Env> {
private state: PipelineState = { ...DEFAULT_STATE } // copy, so mutations don't alter the shared default
private config: HarnessConfig | null = null
constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env)
ctx.blockConcurrencyWhile(async () => {
const saved = await ctx.storage.get<PipelineState>('state')
if (saved) this.state = { ...DEFAULT_STATE, ...saved }
// Resume alarm on wake
if (this.state.running) {
const existing = await ctx.storage.getAlarm()
if (!existing) {
await ctx.storage.setAlarm(Date.now() + 5000)
}
}
})
}
async alarm() {
if (!this.state.running) return
try {
// Load config from DB every tick (dashboard can change it live)
this.config = await this.loadConfig()
if (!this.config) {
this.state.error_message = 'Config not found'
this.state.running = false
await this.save()
return
}
if (!this.state.current_cycle) {
await this.startCycle()
}
const done = await this.tick()
if (done) {
await this.finalizeCycle()
await this.ctx.storage.setAlarm(Date.now() + this.config.cycle_interval_ms)
} else {
await this.ctx.storage.setAlarm(Date.now() + 2000) // More work soon
}
await this.save()
} catch (err: any) {
this.state.error_message = err.message
await this.save()
await this.ctx.storage.setAlarm(Date.now() + 60_000) // Retry in 60s
}
}
}
Queues for Cross-Pipeline Communication
When pipelines need to communicate (e.g., “content pipeline produced articles, SEO pipeline should check them”), Queues bridge them:
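// Content pipeline's publisher writes to a queue. ProcessorContext as defined above
// has no queue binding; this assumes it is extended with the Worker env.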
await ctx.env.SEO_QUEUE.send({
type: 'new_article',
artifact_id: articleId,
niche: article.niche,
})
// SEO pipeline's worker consumes the queue
export default {
async queue(batch: MessageBatch, env: Env): Promise<void> {
for (const msg of batch.messages) {
const { artifact_id } = msg.body as { artifact_id: string }
// Copy artifact reference into SEO pipeline's scope
await env.DB.prepare(`
INSERT INTO artifacts (id, format, status, tags, source_ids, created_by, created_at, updated_at)
VALUES (?, 'data', 'ready', '["seo-check-needed"]', ?, 'queue-consumer', datetime('now'), datetime('now'))
`).bind(crypto.randomUUID(), JSON.stringify([artifact_id])).run()
msg.ack()
}
},
}
D1 for Everything Queryable
D1 is the source of truth for artifacts, edges, processor configs, and telemetry. SQLite foundation means recursive CTEs work for graph traversal, JSON functions work for tag queries, and it costs nearly nothing.
R2 for Large Content
Article bodies, generated images, and raw research data go in R2. Artifact records in D1 reference R2 keys. D1 rows stay small (metadata only); R2 holds arbitrarily large content.
Workflows for Multi-Step Processors
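Some processors need more time than one tick should take, typically because they chain several slow LLM calls. Cloudflare Workflows split that work into steps: each step's result is persisted, and a failed step is retried without re-running the steps that already completed: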
import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers'
export class ArticleGenerationWorkflow extends WorkflowEntrypoint<Env, ArticleParams> {
async run(event: WorkflowEvent<ArticleParams>, step: WorkflowStep) {
const research = await step.do('fetch-research', async () => {
return await fetchResearch(event.payload.topic)
})
const outline = await step.do('generate-outline', async () => {
return await generateOutline(research)
})
const article = await step.do('generate-article', async () => {
return await generateArticle(outline, research)
})
await step.do('store-artifact', async () => {
await storeArtifact(article)
})
}
}
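The value of step.do() is that a step which already completed is not executed again when the workflow retries or resumes. The local class below imitates only that caching idea so it runs without the Workers runtime; `StepCache`, `generate`, and `withRetries` are hypothetical and are not the Workflows API, which also handles per-step retry policies and durable persistence.

```typescript
// Results are cached by step name, so a re-run skips steps that already succeeded.
class StepCache {
  private done = new Map<string, unknown>()
  calls: string[] = [] // names of step bodies that actually executed

  async do<T>(stepName: string, fn: () => Promise<T>): Promise<T> {
    if (this.done.has(stepName)) return this.done.get(stepName) as T
    this.calls.push(stepName)
    const value = await fn()
    this.done.set(stepName, value)
    return value
  }
}

// Hypothetical run(): the third step fails on its first attempt only.
async function generate(step: StepCache, failOnce: { armed: boolean }): Promise<string> {
  const research = await step.do('fetch-research', async () => 'notes')
  const outline = await step.do('generate-outline', async () => `outline(${research})`)
  return step.do('generate-article', async () => {
    if (failOnce.armed) {
      failOnce.armed = false
      throw new Error('LLM timeout')
    }
    return `article(${outline})`
  })
}

// Replay the whole run on failure, reusing the same cache.
async function withRetries(step: StepCache, attempts: number, failOnce: { armed: boolean }): Promise<string> {
  for (let i = 1; ; i++) {
    try {
      return await generate(step, failOnce)
    } catch (err) {
      if (i >= attempts) throw err
    }
  }
}
```

On the retry, research and outline come back from the cache; only the failed article step runs again.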
Practical Pipeline Examples
Keyword Research Pipeline
The simplest useful pipeline. A researcher gathers keyword data, a generator creates articles from high-value keywords.
const keywordPipeline: ProcessorConfig[] = [
{ name: 'keyword-researcher', type: 'deterministic', enabled: true, phase: 'gather', weight: 0.15, max_cost_per_call: 0.01, batch_size: 20 },
{ name: 'article-generator', type: 'llm', enabled: true, phase: 'generate', weight: 0.70, max_cost_per_call: 0.10, batch_size: 3, model_tier: 'expensive' },
{ name: 'artifact-storer', type: 'deterministic', enabled: true, phase: 'collect', weight: 0.05, max_cost_per_call: 0, batch_size: 50 },
]
Data flow:
keyword-researcher: queries DataForSEO → stores keyword data artifacts
article-generator: queries keyword artifacts → generates articles → stores as drafts
artifact-storer: moves large content to R2, updates metadata
Content Quality Gate
Generator creates content, reviewer evaluates it, router sends it to publishing or back for revision.
const qualityGatePipeline: ProcessorConfig[] = [
{ name: 'article-generator', type: 'llm', enabled: true, phase: 'generate', weight: 0.45, max_cost_per_call: 0.10, batch_size: 3, model_tier: 'expensive' },
{ name: 'content-reviewer', type: 'llm', enabled: true, phase: 'review', weight: 0.15, max_cost_per_call: 0.02, batch_size: 10, model_tier: 'default' },
{ name: 'quality-router', type: 'deterministic', enabled: true, phase: 'review', weight: 0.05, max_cost_per_call: 0, batch_size: 20 },
{ name: 'site-publisher', type: 'deterministic', enabled: true, phase: 'publish', weight: 0.05, max_cost_per_call: 0, batch_size: 10 },
]
The gate is implicit: the reviewer scores articles, and the quality router updates their status based on that score. The publisher only picks up articles with status ready. Articles that score below the threshold stay as draft, where the generator can pick them up for revision on the next cycle.
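Here is a minimal sketch of the deterministic quality-router step. It assumes the reviewer writes a numeric review_score (0 to 1) onto each draft. That field name, the 0.7 default threshold, and routeByScore are hypothetical. The function only computes status changes, which keeps it easy to test. The processor would then apply them with an UPDATE against the artifacts table.

```typescript
type ArtifactStatus = 'draft' | 'ready'

// Hypothetical shape: review_score is null until the reviewer has run
interface ReviewedArticle { id: string; status: ArtifactStatus; review_score: number | null }

interface StatusUpdate { id: string; status: ArtifactStatus }

// Promote drafts whose score clears the threshold. Unreviewed (null) and
// low-scoring drafts produce no update, so they simply stay 'draft'.
function routeByScore(articles: ReviewedArticle[], threshold = 0.7): StatusUpdate[] {
  return articles
    .filter((a) => a.status === 'draft' && a.review_score !== null && a.review_score >= threshold)
    .map((a): StatusUpdate => ({ id: a.id, status: 'ready' }))
}
```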
Social Media Adaptation
Collect engagement data, adapt articles into social posts, review, publish:
const socialPipeline: ProcessorConfig[] = [
{ name: 'engagement-collector', type: 'deterministic', enabled: true, phase: 'gather', weight: 0.10, max_cost_per_call: 0, batch_size: 50 },
{ name: 'social-adapter', type: 'llm', enabled: true, phase: 'adapt', weight: 0.30, max_cost_per_call: 0.03, batch_size: 5, model_tier: 'default' },
{ name: 'content-reviewer', type: 'llm', enabled: true, phase: 'review', weight: 0.15, max_cost_per_call: 0.02, batch_size: 10, model_tier: 'default' },
{ name: 'social-publisher', type: 'deterministic', enabled: true, phase: 'publish', weight: 0.05, max_cost_per_call: 0, batch_size: 20 },
]
Full SEO Article Pipeline
End-to-end: research, generate, review, adapt for web, publish, collect engagement:
const seoPipeline: ProcessorConfig[] = [
// Gather
{ name: 'keyword-researcher', phase: 'gather', type: 'deterministic', weight: 0.05, batch_size: 20, enabled: true, max_cost_per_call: 0.01 },
{ name: 'topic-researcher', phase: 'gather', type: 'deterministic', weight: 0.10, batch_size: 10, enabled: true, max_cost_per_call: 0.02 },
// Generate
{ name: 'article-generator', phase: 'generate', type: 'llm', model_tier: 'expensive', weight: 0.35, batch_size: 3, enabled: true, max_cost_per_call: 0.10 },
// Adapt
{ name: 'seo-optimizer', phase: 'adapt', type: 'llm', model_tier: 'default', weight: 0.10, batch_size: 5, enabled: true, max_cost_per_call: 0.03 },
// Review
{ name: 'content-reviewer', phase: 'review', type: 'llm', model_tier: 'default', weight: 0.10, batch_size: 10, enabled: true, max_cost_per_call: 0.02 },
{ name: 'seo-reviewer', phase: 'review', type: 'llm', model_tier: 'default', weight: 0.05, batch_size: 10, enabled: true, max_cost_per_call: 0.02 },
// Publish
{ name: 'site-publisher', phase: 'publish', type: 'deterministic', weight: 0.05, batch_size: 10, enabled: true, max_cost_per_call: 0 },
// Collect
{ name: 'engagement-collector', phase: 'collect', type: 'deterministic', weight: 0.05, batch_size: 50, enabled: true, max_cost_per_call: 0 },
{ name: 'analytics-collector', phase: 'collect', type: 'deterministic', weight: 0.05, batch_size: 50, enabled: true, max_cost_per_call: 0 },
]
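Here is one plausible reading of weight and max_cost_per_call. It assumes weight is a processor's share of a daily spend cap: the share bounds total spend, and the per-call cap bounds how many calls fit inside it. BudgetedProcessor, allocateBudget, the treatment of zero-cost processors, and the micro-dollar rounding are illustrative assumptions, not the control plane's actual code.

```typescript
// Subset of ProcessorConfig fields used here (assumed to match the configs above)
interface BudgetedProcessor {
  name: string
  enabled: boolean
  weight: number
  max_cost_per_call: number
}

interface Allocation { name: string; budget_usd: number; max_calls: number }

const MICRO = 1_000_000 // work in micro-dollars to avoid floating-point drift

function allocateBudget(processors: BudgetedProcessor[], dailyBudgetUsd: number): Allocation[] {
  return processors.map((p) => {
    if (!p.enabled) return { name: p.name, budget_usd: 0, max_calls: 0 }
    const budgetMicros = Math.round(p.weight * dailyBudgetUsd * MICRO)
    const costMicros = Math.round(p.max_cost_per_call * MICRO)
    // Assumption: zero-cost (deterministic) processors are limited by batch_size, not money
    const maxCalls = costMicros === 0 ? Infinity : Math.floor(budgetMicros / costMicros)
    return { name: p.name, budget_usd: budgetMicros / MICRO, max_calls: maxCalls }
  })
}
```

With a $10 daily cap, the article-generator above (weight 0.35, $0.10 per call) would get $3.50, or at most 35 calls per day.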
Self-Improving Feedback Loop
The collector feeds engagement data back to the researcher, which informs the next generation cycle. No code change needed — the loop emerges from the data:
Cycle 1: researcher gathers topics → generator creates articles → publisher deploys
Cycle 2: collector gathers engagement → researcher sees "fintech gets 3x clicks"
→ generator prioritizes fintech → better articles
Cycle 3: collector sees even better engagement → researcher deepens fintech research
→ generator produces expert content → ...
// In topic-researcher: use engagement data to prioritize
registerProcessor('smart-topic-researcher', async (ctx) => {
  // Rank niches by average clicks: join engagement records (eng) back to the
  // articles (a) they were measured for through the edges table
  const topNiches = await ctx.db.prepare(`
    SELECT a.niche, AVG(CAST(json_extract(eng.properties, '$.clicks') AS REAL)) AS avg_clicks
    FROM artifacts eng
    JOIN edges ON edges.target_id = eng.id
    JOIN artifacts a ON edges.source_id = a.id
    WHERE eng.created_by = 'engagement-collector'
      AND a.format = 'text'
    GROUP BY a.niche
    ORDER BY avg_clicks DESC
    LIMIT 5
  `).all()
  const results: ProcessorResult[] = []
  for (const row of topNiches.results) {
    const started = Date.now()
    const niche = row.niche as string
    const avgClicks = Number(row.avg_clicks)
    const research = await deepResearch(niche, ctx)
    const researchId = crypto.randomUUID()
    // Store enhanced research under researchId...
    results.push({
      processor: 'smart-topic-researcher',
      entity_type: 'research',
      entity_id: researchId, // ID of the stored research artifact
      model_used: null,
      tokens_input: 0, tokens_output: 0,
      cost_usd: 0.005,
      duration_ms: Date.now() - started, // measured per niche
      success: true,
      result_summary: `Deep research on ${niche} (avg ${avgClicks.toFixed(1)} clicks)`,
    })
  }
  return results
})
A/B Testing Pipeline
Generate two versions with different models, publish both, collect engagement, compare:
const abTestPipeline: ProcessorConfig[] = [
{ name: 'keyword-researcher', phase: 'gather', type: 'deterministic', weight: 0.05, batch_size: 10, enabled: true, max_cost_per_call: 0.01 },
// Two generators with different models
{ name: 'article-gen-a', phase: 'generate', type: 'llm', model_tier: 'expensive', weight: 0.25, batch_size: 3, enabled: true, max_cost_per_call: 0.10 },
{ name: 'article-gen-b', phase: 'generate', type: 'llm', model_tier: 'default', weight: 0.15, batch_size: 3, enabled: true, max_cost_per_call: 0.05 },
{ name: 'content-reviewer', phase: 'review', type: 'llm', model_tier: 'default', weight: 0.10, batch_size: 10, enabled: true, max_cost_per_call: 0.02 },
{ name: 'ab-publisher', phase: 'publish', type: 'deterministic', weight: 0.05, batch_size: 10, enabled: true, max_cost_per_call: 0 },
{ name: 'engagement-collector', phase: 'collect', type: 'deterministic', weight: 0.05, batch_size: 50, enabled: true, max_cost_per_call: 0 },
{ name: 'ab-comparator', phase: 'collect', type: 'deterministic', weight: 0.05, batch_size: 20, enabled: true, max_cost_per_call: 0 },
]
// The comparator processor
registerProcessor('ab-comparator', async (ctx) => {
  // One query for both arms: follow article --published_to--> publish record --> engagement
  // record, then aggregate clicks per generator. AVG and COUNT skip rows without a clicks value.
  const stats = await ctx.db.prepare(`
    SELECT a.created_by AS generator,
           AVG(CAST(json_extract(eng.properties, '$.clicks') AS REAL)) AS avg_clicks,
           COUNT(json_extract(eng.properties, '$.clicks')) AS samples
    FROM artifacts a
    JOIN edges e1 ON e1.source_id = a.id AND e1.relation = 'published_to'
    JOIN artifacts p ON p.id = e1.target_id
    JOIN edges e2 ON e2.source_id = p.id
    JOIN artifacts eng ON eng.id = e2.target_id AND eng.created_by = 'engagement-collector'
    WHERE a.created_by IN ('article-gen-a', 'article-gen-b')
    GROUP BY a.created_by
  `).all()
  // Overall comparison across all niches (not broken down per niche)
  const arm = (name: string) => stats.results.find(r => r.generator === name)
  const a_avg = Number(arm('article-gen-a')?.avg_clicks ?? 0)
  const b_avg = Number(arm('article-gen-b')?.avg_clicks ?? 0)
  const a_samples = Number(arm('article-gen-a')?.samples ?? 0)
  const b_samples = Number(arm('article-gen-b')?.samples ?? 0)
  // Only name a winner when both arms have data and their averages differ
  const winner =
    a_samples === 0 || b_samples === 0 || a_avg === b_avg
      ? 'inconclusive'
      : a_avg > b_avg ? 'article-gen-a' : 'article-gen-b'
  // Store comparison result
  const compId = crypto.randomUUID()
  await ctx.db.prepare(`
    INSERT INTO artifacts (id, format, status, title, tags, created_by, properties, created_at, updated_at)
    VALUES (?, 'data', 'ready', ?, '["ab-test", "comparison"]', 'ab-comparator', ?, datetime('now'), datetime('now'))
  `).bind(
    compId,
    'A/B Result: gen-a vs gen-b',
    JSON.stringify({ winner, a_avg_clicks: a_avg, b_avg_clicks: b_avg, a_samples, b_samples }),
  ).run()
  return [{
    processor: 'ab-comparator',
    entity_type: 'comparison',
    entity_id: compId,
    model_used: null,
    tokens_input: 0, tokens_output: 0, cost_usd: 0,
    duration_ms: 50,
    success: true,
    result_summary: `Winner: ${winner} (${a_avg.toFixed(1)} vs ${b_avg.toFixed(1)} avg clicks)`,
  }]
})
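Average clicks per article can flip on a handful of samples. If engagement records carry both clicks and views (the comparator's lineage join reaches those records), a two-proportion z-test on click-through rate is one standard way to decide whether a difference is real. This is a swapped-in technique, not what ab-comparator does. ArmStats, ctrZScore, and pickWinner are hypothetical names, and each arm's clicks and views are assumed to have been summed beforehand.

```typescript
interface ArmStats { clicks: number; views: number }

// Two-proportion z-test on click-through rate (clicks / views), pooled variance
function ctrZScore(a: ArmStats, b: ArmStats): number {
  if (a.views === 0 || b.views === 0) return 0 // no evidence either way
  const pA = a.clicks / a.views
  const pB = b.clicks / b.views
  const pooled = (a.clicks + b.clicks) / (a.views + b.views)
  const se = Math.sqrt(pooled * (1 - pooled) * (1 / a.views + 1 / b.views))
  return se === 0 ? 0 : (pA - pB) / se
}

// |z| >= 1.96 corresponds to roughly 95% two-sided confidence
function pickWinner(
  a: ArmStats,
  b: ArmStats,
  zCritical = 1.96,
): 'article-gen-a' | 'article-gen-b' | 'inconclusive' {
  const z = ctrZScore(a, b)
  if (Math.abs(z) < zCritical) return 'inconclusive'
  return z > 0 ? 'article-gen-a' : 'article-gen-b'
}
```

For example, 120 clicks on 1,000 views against 80 on 1,000 gives z of about 2.98, which clears the bar. By contrast, 105 against 100 on the same traffic stays inconclusive.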
Comparisons
vs LangChain / LangGraph
LangChain chains are imperative: each step calls the next. LangGraph adds a state machine, but nodes are still coupled by explicit edges: add or remove a node and you must rewire the graph.
Composable processors have no explicit edges. Processors discover work through queries. Add a new processor and it starts finding inputs immediately. Remove one and the rest don’t notice.
| | LangChain/LangGraph | Composable Processors |
|---|---|---|
| Coupling | Explicit edges between steps | None — query-based discovery |
| Adding a step | Update chain/graph definition | Register processor, done |
| Failure handling | Try/catch per step, retry chains | Next cycle checks again |
| Cost tracking | Manual instrumentation | Built into ProcessorResult |
| Accumulation | Each run starts fresh | Store compounds over time |
| Scheduling | External trigger or streaming | Self-driving Durable Objects |
| Deployment | Application server, typically Python (centralized) | Cloudflare edge (global, serverless) |
When to choose LangChain/LangGraph: Complex agent reasoning with tool use, human-in-the-loop, or building a chatbot. LangGraph excels at multi-turn agent workflows.
When to choose Composable Processors: Production content pipeline that runs autonomously, manages its own budget, adapts to failures, and compounds knowledge over time.
vs Apache Airflow / Prefect
DAG-based orchestrators. Steps and their dependencies live in one central flow definition (in Airflow, a DAG declared upfront), so that definition becomes the bottleneck: every change touches it.
| | Airflow/Prefect | Composable Processors |
|---|---|---|
| Orchestration | Central DAG scheduler | None — each processor self-schedules |
| Dependencies | Declared in DAG definition | Implicit via store queries |
| Adding a step | Modify DAG, update dependencies | Register processor |
| Runtime | Scheduled batch jobs | Continuous, each on own cadence |
| State | Task-level (pass between steps) | Persistent store (compounds) |
| Infrastructure | Server-based ($50-500+/mo) | Serverless (sub-$5/mo) |
When to choose Airflow/Prefect: Complex data engineering workflows, visual DAG editor needed, or already on Kubernetes.
When to choose Composable Processors: Lightweight, self-adaptive system with built-in budget management running at the edge.
vs Temporal Workflows
Durable execution with automatic retries and replay. Powerful, but still imperative — you define a sequence of activities. The workflow function is the coupling point.
| | Temporal | Composable Processors |
|---|---|---|
| Execution model | Imperative workflow function | Independent polling loops |
| Durability | Workflow history replay | Store persistence (D1/R2) |
| Coupling | Activities called by workflow | None between processors |
| Scaling | Worker pools per task queue | Per-processor batch sizing |
| Cost tracking | Custom instrumentation | Built into every result |
| Infrastructure | Temporal Cloud or self-hosted cluster | Serverless (Cloudflare Workers) |
When to choose Temporal: Guaranteed exactly-once execution, complex saga patterns, or workflows spanning multiple services that must not lose state.
When to choose Composable Processors: Durability benefits without operational complexity. DOs provide single-writer durability; the artifact store provides crash recovery.
vs AWS Step Functions
AWS’s serverless workflow orchestration.
| | AWS Step Functions | Composable Processors |
|---|---|---|
| Definition | Amazon States Language (JSON) | ProcessorConfig array |
| Execution | Push-based (state machine drives) | Pull-based (processors query for work) |
| Pricing | $0.025 per 1,000 state transitions | Workers pricing (~$0.30/M requests plus CPU time) |
| Latency | Region-bound | Edge (global, < 50ms) |
| Adaptivity | Manual (Choice states + CloudWatch) | Built-in (success-rate driven) |
| Budget management | Not built-in | Per-processor weight allocation |
When to choose Step Functions: Already in AWS ecosystem, need visual workflow editing, native Lambda/SQS/DynamoDB integration.
When to choose Composable Processors: Edge execution, sub-cent pricing, adaptive scheduling, simpler programming model.
vs Simple Sequential Scripts
The honest comparison. Many AI content pipelines start as a single script:
const research = await callPerplexity(topic)
const article = await callGemini(research)
const review = await callClaude(article)
if (review.score > 0.7) await publishToSite(article)
| | Sequential Script | Composable Processors |
|---|---|---|
| Simplicity | Dead simple | More upfront structure |
| Resumability | Start over on failure | Crash-safe (artifacts persist) |
| Cost tracking | Manual or none | Built-in per-processor |
| Adaptivity | Manual tuning | Automatic |
| Reusability | Copy-paste | Processors compose freely |
| Feedback loops | Require explicit wiring | Emerge from data |
When to choose the script: Prototyping, fewer than three steps, or running manually once a week. Don’t over-engineer.
When to choose Composable Processors: Pipeline has grown past 3-4 steps, needs to run autonomously, cost matters, or multiple pipelines share processors.
vs Microservices with REST APIs
| | Microservices + REST | Composable Processors |
|---|---|---|
| Coupling | Coupled by API contracts | Decoupled by data (artifact store) |
| Failure handling | Cascading failures, circuit breakers needed | Processor retries independently |
| Discovery | Service registry / DNS | Query the store |
| Scaling | Per-service replicas | Per-processor batch size |
| Deployment | N services, N deployments | Single Worker + DO |
| Latency | Network hops between services | In-process function calls |
When to choose microservices: Processors in different languages, different teams, or need independent scaling beyond what a single Worker provides.
When to choose Composable Processors: All TypeScript, small team, and the overhead of N separate deployments isn’t justified.
Anti-Patterns
Processors That Know About Other Processors
The most common mistake. A generator that imports and calls a reviewer, or a publisher that triggers a collector.
// BAD: Generator calls reviewer directly
registerProcessor('article-generator', async (ctx) => {
const article = await generate(ctx)
const review = await getProcessor('content-reviewer')!(reviewCtx) // Don't do this
if (review.score < 0.7) return regenerate(ctx)
})
// GOOD: Generator writes to store. Reviewer finds it on its own schedule.
registerProcessor('article-generator', async (ctx) => {
const article = await generate(ctx)
await storeArtifact(article, { status: 'draft' })
return [/* result */]
})
Putting Business Logic in the Orchestrator
The control plane governs when and with what budget. It should not contain business logic about what to do.
// BAD: Business logic in the harness
async tick() {
if (article.niche === 'fintech') {
await runProcessor('premium-generator', article)
} else {
await runProcessor('basic-generator', article)
}
}
// GOOD: Processor decides its own logic
registerProcessor('smart-generator', async (ctx) => {
const model = ctx.selectModel() // Control plane handles model routing
// Generator decides its own approach based on artifact data
})
Artifacts Without Metadata (Unlabeled Ingredients)
Storing content without rich metadata makes the store useless. Future processors can’t find what they need.
// BAD: Minimal metadata — impossible to query meaningfully
await db.prepare(`
INSERT INTO artifacts (id, format, status, created_by, created_at, updated_at)
VALUES (?, 'text', 'ready', 'generator', datetime('now'), datetime('now'))
`).bind(id).run()
// GOOD: Rich metadata enables discovery by any future processor
await db.prepare(`
INSERT INTO artifacts
(id, format, status, title, summary, tags, niche, voice, word_count,
created_by, source_ids, properties, created_at, updated_at)
VALUES (?, 'text', 'ready', ?, ?, ?, ?, ?, ?,
'generator', ?, ?, datetime('now'), datetime('now'))
`).bind(id, title, summary, tags, niche, voice, wordCount, sourceIds, properties).run()
Skipping the Review Step
LLMs produce confident-sounding garbage regularly. A review step (even a cheap one) catches hallucinations, off-brand content, and structural problems before they reach your audience. The review step also creates valuable feedback data that improves generators over time.
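Even a review step with no LLM call can catch structural problems. Below is a sketch of a deterministic pre-check that could run before an LLM reviewer, assuming markdown article bodies. The word minimum, the red-flag phrases, and precheckDraft are illustrative assumptions. It will not catch hallucinations, which still need a model or human reviewer.

```typescript
interface DraftCheckInput { body: string; min_words?: number }

// Hypothetical red-flag phrases; tune per brand and voice
const RED_FLAGS = ['as an ai language model', 'lorem ipsum', '[insert', 'todo:']

// Returns a list of human-readable issues; an empty list means the draft passed
function precheckDraft({ body, min_words = 600 }: DraftCheckInput): string[] {
  const issues: string[] = []
  const words = body.split(/\s+/).filter(Boolean).length
  if (words < min_words) issues.push(`too short: ${words} words (min ${min_words})`)
  // Require at least one markdown heading (#, ##, or ###) at the start of a line
  if (!/^#{1,3}\s/m.test(body)) issues.push('no markdown headings')
  const lower = body.toLowerCase()
  for (const flag of RED_FLAGS) {
    if (lower.includes(flag)) issues.push(`red flag: "${flag}"`)
  }
  return issues
}
```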
Monolithic Processors That Do Too Much
A processor that researches, generates, reviews, and publishes in one function is just a sequential script with extra steps.
// BAD: One processor does everything
registerProcessor('do-everything', async (ctx) => {
const research = await fetchResearch()
const article = await generate(research)
const review = await reviewArticle(article)
if (review.passed) await publish(article)
return [/* result */]
})
// GOOD: Four processors, each with one job
registerProcessor('researcher', async (ctx) => { /* gather */ })
registerProcessor('generator', async (ctx) => { /* create */ })
registerProcessor('reviewer', async (ctx) => { /* evaluate */ })
registerProcessor('publisher', async (ctx) => { /* deploy */ })
Tightly Coupled Processor Ordering
If your pipeline breaks when you reorder processors, they’re too coupled. Each processor should query the artifact store, not assume a specific predecessor.
// BAD: Assumes keyword-researcher ran in the previous tick
registerProcessor('article-generator', async (ctx) => {
const keywords = ctx.cycle.keyword_results // Coupled to cycle state
})
// GOOD: Queries the store for what's available
registerProcessor('article-generator', async (ctx) => {
const keywords = await ctx.db.prepare(`
SELECT * FROM artifacts
WHERE format = 'data' AND created_by = 'keyword-researcher' AND status = 'ready'
LIMIT ?
`).bind(ctx.processor.batch_size).all()
// Works regardless of when or whether the researcher ran
})
Labeling Artifacts by Purpose
// BAD: Artifact knows it's a "blog post"
{ format: 'text', tags: ['blog-post'], destination: 'website' }
// GOOD: Artifact describes what it IS, not what it's FOR
{ format: 'text', tags: ['productivity', 'tools'], niche: 'productivity-apps', word_count: 1200, voice: 'professional' }
// The publisher decides this is a blog post. An adapter could also make it an email body.
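To illustrate the consumer side, here is a hypothetical rule set that a publisher or adapter might apply to descriptive metadata. The channel names and word-count thresholds are assumptions. The decision lives in the consumer, so supporting a new channel means adding a consumer rather than relabeling artifacts.

```typescript
interface DescribedArtifact {
  format: string
  tags: string[]
  niche?: string
  word_count?: number
  voice?: string
}

type Channel = 'blog' | 'email' | 'social'

// Consumer-side rules: decide what an artifact is FOR from what it IS.
// Thresholds are illustrative only.
function candidateChannels(a: DescribedArtifact): Channel[] {
  if (a.format !== 'text') return []
  const words = a.word_count ?? 0
  const channels: Channel[] = []
  if (words >= 800) channels.push('blog')
  if (words >= 300 && words <= 1500) channels.push('email')
  channels.push('social') // the social adapter rewrites any text, so length does not matter
  return channels
}
```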
Using Events Between Processors
// BAD: Events create coupling
eventBus.on('article-published', () => collectEngagement()) // Publisher knows collector exists
// GOOD: Poll-based discovery
// Collector queries store: "any published artifacts I haven't collected engagement for?"
// Publisher has no idea collectors exist. Collectors have no idea publishers exist.
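The collector's question ("any published artifacts I haven't collected engagement for?") is an anti-join. Below is a sketch in both SQL and plain TypeScript. It assumes the lineage used by ab-comparator: an article, then a publish record via a published_to edge, then an engagement record linked from the publish record. UNCOLLECTED_SQL and findUncollected are hypothetical names.

```typescript
interface ArtifactRow { id: string; created_by: string }
interface EdgeRow { source_id: string; target_id: string; relation: string }

// SQL form for D1: publish records with no engagement record hanging off them
const UNCOLLECTED_SQL = `
  SELECT DISTINCT p.id
  FROM artifacts p
  JOIN edges pub ON pub.target_id = p.id AND pub.relation = 'published_to'
  WHERE NOT EXISTS (
    SELECT 1 FROM edges e
    JOIN artifacts eng ON eng.id = e.target_id
    WHERE e.source_id = p.id AND eng.created_by = 'engagement-collector'
  )
  LIMIT ?
`

// The same anti-join in memory, to show the semantics
function findUncollected(artifacts: ArtifactRow[], edges: EdgeRow[], limit: number): string[] {
  const byId = new Map(artifacts.map((a) => [a.id, a]))
  // Publish records: targets of published_to edges (a Set removes duplicates, like DISTINCT)
  const published = new Set(edges.filter((e) => e.relation === 'published_to').map((e) => e.target_id))
  // Publish records that already point at an engagement record
  const collected = new Set(
    edges.filter((e) => byId.get(e.target_id)?.created_by === 'engagement-collector').map((e) => e.source_id),
  )
  return Array.from(published)
    .filter((id) => byId.has(id) && !collected.has(id))
    .slice(0, limit)
}
```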
References
Cloudflare Platform
- Durable Objects — Single-writer, globally unique objects. The primitive behind the control plane: self-scheduling, budget governance, adaptive execution.
- Cloudflare Queues — At-least-once message delivery between Workers. Inter-pipeline communication when processors span multiple services.
- Cloudflare Workflows — Multi-step, durable execution engine. For processors that need more than 30 seconds of CPU time or automatic retry with state persistence.
- D1 Database — SQLite at the edge. Artifact store, edges table, processor registry, and telemetry log. Supports recursive CTEs for graph traversal.
- R2 Storage — S3-compatible object storage with zero egress fees. Large artifact content (article bodies, images, raw research data).
- Cloudflare Agents SDK — Framework for building AI agents on Cloudflare. Extends Durable Objects with agent-specific patterns.
- Agents SDK Source — Open source implementation of the Cloudflare Agents SDK.
Orchestration & Workflow Systems
- LangGraph — Stateful, multi-actor orchestration for LLM applications. Adds cycles, branching, and persistence to LangChain.
- Prefect — Modern workflow orchestration. Pythonic API, hybrid execution model, native async support.
- Temporal — Durable execution engine for long-running, reliable workflows. Guarantees exactly-once execution with automatic retries.
- Apache Airflow — The original workflow orchestration platform. DAG-based, battle-tested, extensive ecosystem of operators and providers.
Architecture & Patterns
- Event-Driven.io — Oskar Dudycz’s patterns for event-driven systems. Influenced the artifact lifecycle and edge-based lineage design.
- Watermill — Go library for working with message streams. Consumer middleware pattern inspired the processor registry’s approach.
- Emmett — Event sourcing library implementing the Decide/Evolve/Project pattern. Influenced the separation between control plane (decide), artifact mutations (evolve), and store queries (project).
- Enterprise Integration Patterns — Gregor Hohpe and Bobby Woolf’s foundational catalog. The composable processor architecture draws from Content-Based Router, Pipes and Filters, and Message Store.
- Microservices Patterns — Chris Richardson’s catalog. Saga, Event Sourcing, and CQRS patterns inform the artifact store design and cross-pipeline coordination.
MIT