Skip to content
Gary Wu
Go back

Composable Processor Architecture for AI Content Pipelines

Edit page

Org Status: 🟡 Dormant Cloudflare: N/A Last Audited: 2026-04-28


registerProcessor('topic-researcher', async (ctx) => {
  const pending = await ctx.db.prepare(
    "SELECT * FROM artifacts WHERE format = 'data' AND status = 'pending' AND niche = ?"
  ).bind(ctx.config.niche).all()

  for (const item of pending.results) {
    const research = await fetchResearch(item.topic, ctx)
    await ctx.db.prepare(
      "INSERT INTO artifacts (id, format, status, tags, created_by, created_at, updated_at) VALUES (?, 'data', 'ready', ?, 'topic-researcher', datetime('now'), datetime('now'))"
    ).bind(crypto.randomUUID(), JSON.stringify([item.topic])).run()
  }

  return pending.results.map(item => ({
    processor: 'topic-researcher',
    entity_id: item.id,
    success: true,
    cost_usd: 0.02,
    tokens_input: 500,
    tokens_output: 1200,
    duration_ms: 3400,
    entity_type: 'artifact',
    model_used: 'gemini-2.5-flash',
  }))
})

That’s a complete processor. It checks for work, does it, stores the result, reports cost. It doesn’t know what runs before it or after it. It doesn’t trigger anything. It just enriches the store.



AI pipelines are typically built as imperative chains:

research → generate → review → publish

Every step calls the next. This creates five problems that compound as the system grows.

Tight coupling. Step 2 knows step 1’s output shape. Change the researcher and the generator breaks.

Fragile chains. Step 3 fails and everything stops. No partial progress. Retry means re-running the whole chain.

No compounding. Each run starts fresh. A researcher that ran 100 times has the same effect as one that ran once — the output of the last run is all that matters.

Budget blindness. No cost control until you get the bill. LLM calls are fire-and-forget. One runaway generation burns through your monthly budget in an hour.

Temporal coupling. Everything must run at once. The researcher must finish before the generator starts. The generator must finish before the reviewer starts. One slow step blocks everything downstream.

// The imperative approach — every problem at once
const research = await callResearcher(topic)       // step 1 blocks
const article = await callGenerator(research)      // coupled to step 1's shape
const reviewed = await callReviewer(article)        // fails = start over
await callPublisher(reviewed)                       // no budget tracking anywhere

The fix isn’t better error handling. It’s a different architecture.


Processors don’t call each other. They share nothing at runtime. They communicate through their stores.

A researcher accumulating intelligence for 3 months produces fundamentally different output than one triggered on-demand. The architecture is the same — the depth changes.

One loop, everywhere:

1. Check input store — is there work ready for me?
2. If yes → do my thing → write output to my store
3. If no → done (try again next cycle)

That’s it. Every processor in the system — researcher, generator, adapter, reviewer, publisher, collector — runs this same loop. The only difference is what “inputs ready” means and what “do my thing” produces.

No orchestrator. No event bus between processors. No dependency graph. Each processor polls its store on its own schedule. The system converges toward completeness without coordination.


Core Interface

All seven processor types share one interface. The type system enforces the contract:

interface Processor<TInput = Artifact, TOutput = Artifact> {
  /** Unique name used for registry lookup */
  name: string

  /** One of the seven types */
  type: ProcessorType

  /** Determines execution ordering within a cycle */
  phase: Phase

  /**
   * The processor function itself.
   * Receives context (artifact refs, config, API clients).
   * Returns new or modified artifacts.
   * Must be pure relative to external state — all side effects go through ctx.
   */
  process(ctx: ProcessorContext, input: TInput[]): Promise<TOutput[]>
}

type ProcessorType =
  | 'researcher'
  | 'generator'
  | 'adapter'
  | 'reviewer'
  | 'publisher'
  | 'collector'
  | 'storer'

type Phase = 'gather' | 'generate' | 'adapt' | 'review' | 'publish' | 'collect'

1. Researcher — Gathers External Data

Researchers pull information from outside the system and store it as data artifacts. They don’t create content — they create the raw material that generators will later use.

VariantWhat it accumulatesPersistence
Topic researchstructured findings by niche/topicD1 + R2
Keyword researchvolume, difficulty, trends, SERP dataD1
Competitor scancompetitor content, gaps, positioningD1 + R2
Trend detectiontrending topics, velocity, lifecycle stageD1
Web scraperaw extracted data from any sourceR2

The longer it runs, the deeper the knowledge base. A researcher that’s been scanning a niche for 3 months knows things a fresh run never will.

import { registerProcessor, ProcessorContext, ProcessorResult } from './registry'

registerProcessor('keyword-researcher', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
  const { db, config } = ctx

  // Find topics that need keyword research
  const topics = await db.prepare(`
    SELECT id, title, niche FROM artifacts
    WHERE format = 'data' AND status = 'ready'
      AND tags LIKE '%topic%'
      AND id NOT IN (
        SELECT DISTINCT json_each.value FROM artifacts a,
        json_each(a.source_ids) WHERE a.created_by = 'keyword-researcher'
      )
    ORDER BY created_at DESC
    LIMIT ?
  `).bind(ctx.processor.batch_size).all()

  const results: ProcessorResult[] = []

  for (const topic of topics.results) {
    const startTime = Date.now()
    try {
      // Call external API (DataForSEO, Perplexity, etc.)
      const keywords = await fetchKeywordData(topic.title, topic.niche, ctx)

      // Store as a new data artifact
      await db.prepare(`
        INSERT INTO artifacts (id, format, status, title, tags, niche, created_by, source_ids, properties, created_at, updated_at)
        VALUES (?, 'data', 'ready', ?, ?, ?, 'keyword-researcher', ?, ?, datetime('now'), datetime('now'))
      `).bind(
        crypto.randomUUID(),
        `Keywords: ${topic.title}`,
        JSON.stringify(['keywords', 'research', topic.niche]),
        topic.niche,
        JSON.stringify([topic.id]),
        JSON.stringify({ keyword_count: keywords.length, source: 'dataforseo' }),
      ).run()

      results.push({
        processor: 'keyword-researcher',
        entity_type: 'keyword_research',
        entity_id: topic.id,
        model_used: null,
        tokens_input: 0,
        tokens_output: 0,
        cost_usd: 0.002,
        duration_ms: Date.now() - startTime,
        success: true,
        result_summary: `Found ${keywords.length} keywords for ${topic.title}`,
      })
    } catch (err: any) {
      results.push({
        processor: 'keyword-researcher',
        entity_type: 'keyword_research',
        entity_id: topic.id,
        model_used: null,
        tokens_input: 0, tokens_output: 0, cost_usd: 0,
        duration_ms: Date.now() - startTime,
        success: false,
        error_message: err.message,
      })
    }
  }

  return results
})

2. Generator — Creates Content

Generators are the LLM-calling processors. They query the artifact store for research data and produce content artifacts. A generator doesn’t know or care where the output goes — an article is an article whether it becomes a blog post, email body, YouTube script, or pSEO page.

VariantWhat it createsPersistence
Articlelong-form text, any lengthD1 + R2
Postshort-form text, social-lengthD1
ImagePNG/SVG compositesR2
Videoshort-form videoR2
Page (pSEO)structured page content from templatesD1
Email bodyemail HTML/text with CTAD1

Quality depends on what’s in the store. A generator pulling from 3 months of accumulated research produces fundamentally different output than one pulling from yesterday’s snapshot.

registerProcessor('article-generator', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
  const { db } = ctx
  const model = ctx.selectModel() // Respects budget + tier config
  const results: ProcessorResult[] = []

  // Find topics with keyword research but no generated article
  const topics = await db.prepare(`
    SELECT a.id, a.title, a.niche, a.properties
    FROM artifacts a
    WHERE a.format = 'data' AND a.status = 'ready'
      AND a.created_by = 'keyword-researcher'
      AND a.id NOT IN (
        SELECT json_each.value FROM artifacts g,
        json_each(g.source_ids) WHERE g.created_by = 'article-generator'
      )
    ORDER BY a.created_at DESC
    LIMIT ?
  `).bind(ctx.processor.batch_size).all()

  for (const topic of topics.results) {
    const budget = ctx.remainingBudget()
    if (budget <= 0) break // Respect budget allocation

    const startTime = Date.now()
    try {
      const article = await generateArticle(model, topic, ctx)

      // Store the generated article as a text artifact
      const articleId = crypto.randomUUID()
      await db.prepare(`
        INSERT INTO artifacts
          (id, format, status, title, summary, tags, niche, author, word_count, created_by, source_ids, created_at, updated_at)
        VALUES (?, 'text', 'draft', ?, ?, ?, ?, ?, ?, 'article-generator', ?, datetime('now'), datetime('now'))
      `).bind(
        articleId, article.title, article.summary,
        JSON.stringify(article.tags), topic.niche,
        'article-generator', article.wordCount,
        JSON.stringify([topic.id]),
      ).run()

      // Store full content in R2 (too large for D1)
      await ctx.r2.put(`artifacts/${articleId}/content.md`, article.content)

      results.push({
        processor: 'article-generator',
        entity_type: 'article',
        entity_id: articleId,
        model_used: model,
        tokens_input: article.tokensIn,
        tokens_output: article.tokensOut,
        cost_usd: article.cost,
        duration_ms: Date.now() - startTime,
        success: true,
        result_summary: `Generated "${article.title}" (${article.wordCount} words)`,
      })
    } catch (err: any) {
      results.push({
        processor: 'article-generator',
        entity_type: 'article',
        entity_id: topic.id,
        model_used: model,
        tokens_input: 0, tokens_output: 0, cost_usd: 0,
        duration_ms: Date.now() - startTime,
        success: false,
        error_message: err.message,
      })
    }
  }

  return results
})

3. Adapter — Transforms Between Formats

Adapters reshape content without creating new ideas. Two sub-types:

Voice adapter — applies brand personality (tone, vocabulary, persona, style rules) to any content. Input: raw content + brand config. Output: same content in brand voice.

Format adapter — reshapes for platform constraints (character limits, hashtag conventions, media requirements). Input: content + platform rules. Output: platform-ready content.

Adapters are stateless transforms. They don’t accumulate. Config store only (brand voices, platform rule sets).

registerProcessor('social-adapter', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
  const { db } = ctx
  const model = ctx.selectModel()
  const results: ProcessorResult[] = []

  // Find articles that haven't been adapted for social yet
  const articles = await db.prepare(`
    SELECT a.id, a.title, a.summary, a.tags, a.niche
    FROM artifacts a
    WHERE a.format = 'text' AND a.status IN ('ready', 'reviewed')
      AND a.created_by IN ('article-generator', 'voice-adapter')
      AND a.id NOT IN (
        SELECT json_each.value FROM artifacts s,
        json_each(s.source_ids) WHERE s.created_by = 'social-adapter'
      )
    ORDER BY a.created_at DESC
    LIMIT ?
  `).bind(ctx.processor.batch_size).all()

  for (const article of articles.results) {
    const startTime = Date.now()
    try {
      // Read full content from R2
      const content = await ctx.r2.get(`artifacts/${article.id}/content.md`)
      const text = content ? await content.text() : article.summary

      // Generate multiple social posts from one article
      const posts = await adaptForSocial(model, { ...article, content: text }, ctx)

      for (const post of posts) {
        const postId = crypto.randomUUID()
        await db.prepare(`
          INSERT INTO artifacts
            (id, format, status, title, tags, niche, voice, word_count, created_by, source_ids, properties, created_at, updated_at)
          VALUES (?, 'text', 'draft', ?, ?, ?, ?, ?, 'social-adapter', ?, ?, datetime('now'), datetime('now'))
        `).bind(
          postId, post.title,
          JSON.stringify([...JSON.parse(article.tags), 'social', post.platform]),
          article.niche, post.voice, post.wordCount,
          JSON.stringify([article.id]),
          JSON.stringify({ platform: post.platform, char_count: post.charCount }),
        ).run()

        // Record edge for lineage tracking
        await db.prepare(`
          INSERT INTO edges (source_id, target_id, relation, created_at)
          VALUES (?, ?, 'adapted_from', datetime('now'))
        `).bind(article.id, postId).run()
      }

      results.push({
        processor: 'social-adapter',
        entity_type: 'social_posts',
        entity_id: article.id,
        model_used: model,
        tokens_input: posts.reduce((s, p) => s + p.tokensIn, 0),
        tokens_output: posts.reduce((s, p) => s + p.tokensOut, 0),
        cost_usd: posts.reduce((s, p) => s + p.cost, 0),
        duration_ms: Date.now() - startTime,
        success: true,
        result_summary: `Created ${posts.length} social posts from "${article.title}"`,
      })
    } catch (err: any) {
      results.push({
        processor: 'social-adapter',
        entity_type: 'social_posts',
        entity_id: article.id,
        model_used: model,
        tokens_input: 0, tokens_output: 0, cost_usd: 0,
        duration_ms: Date.now() - startTime,
        success: false,
        error_message: err.message,
      })
    }
  }

  return results
})

4. Reviewer — Evaluates Quality

Quality gate. Evaluates content against criteria. Can reject with feedback, triggering re-generation.

VariantWhat it checks
Brand compliancevoice, tone, guidelines adherence
SEO checkkeyword usage, structure, meta
Factual checkclaims vs source material

Persists review history — what passed, what failed, why. Useful for improving generators over time.

registerProcessor('content-reviewer', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
  const { db } = ctx
  const model = ctx.selectModel()
  const results: ProcessorResult[] = []

  // Find draft articles that haven't been reviewed
  const drafts = await db.prepare(`
    SELECT a.id, a.title, a.summary, a.niche, a.word_count
    FROM artifacts a
    WHERE a.format = 'text' AND a.status = 'draft'
      AND a.created_by IN ('article-generator', 'social-adapter', 'voice-adapter')
      AND a.id NOT IN (
        SELECT json_each.value FROM artifacts r,
        json_each(r.source_ids) WHERE r.created_by = 'content-reviewer'
      )
    ORDER BY a.created_at ASC
    LIMIT ?
  `).bind(ctx.processor.batch_size).all()

  for (const draft of drafts.results) {
    const startTime = Date.now()
    try {
      const content = await ctx.r2.get(`artifacts/${draft.id}/content.md`)
      const text = content ? await content.text() : draft.summary

      // AI review with scoring rubric
      const review = await reviewContent(model, {
        title: draft.title,
        content: text,
        niche: draft.niche,
        wordCount: draft.word_count,
      })

      // Store the review as its own data artifact
      const reviewId = crypto.randomUUID()
      await db.prepare(`
        INSERT INTO artifacts
          (id, format, status, title, tags, created_by, source_ids, properties, created_at, updated_at)
        VALUES (?, 'data', 'ready', ?, '["review", "quality"]', 'content-reviewer', ?, ?, datetime('now'), datetime('now'))
      `).bind(
        reviewId,
        `Review: ${draft.title}`,
        JSON.stringify([draft.id]),
        JSON.stringify({
          score: review.score,
          passed: review.score >= 0.7,
          feedback: review.feedback,
          criteria: review.criteria,
        }),
      ).run()

      // Record edge
      await db.prepare(`
        INSERT INTO edges (source_id, target_id, relation, created_at)
        VALUES (?, ?, 'reviewed_by', datetime('now'))
      `).bind(draft.id, reviewId).run()

      // Update the original artifact's status based on review score
      const newStatus = review.score >= 0.7 ? 'ready' : 'draft'
      await db.prepare(`
        UPDATE artifacts SET status = ?, updated_at = datetime('now') WHERE id = ?
      `).bind(newStatus, draft.id).run()

      results.push({
        processor: 'content-reviewer',
        entity_type: 'review',
        entity_id: draft.id,
        model_used: model,
        tokens_input: review.tokensIn,
        tokens_output: review.tokensOut,
        cost_usd: review.cost,
        duration_ms: Date.now() - startTime,
        success: true,
        result_summary: `Reviewed "${draft.title}": ${review.score.toFixed(2)}${newStatus}`,
      })
    } catch (err: any) {
      results.push({
        processor: 'content-reviewer',
        entity_type: 'review',
        entity_id: draft.id,
        model_used: model,
        tokens_input: 0, tokens_output: 0, cost_usd: 0,
        duration_ms: Date.now() - startTime,
        success: false,
        error_message: err.message,
      })
    }
  }

  return results
})

5. Publisher — Deploys to Destinations

Pushes content to a destination. Thin wrapper around platform APIs. The simplest processor — just delivery.

VariantDestination
Social postX, TikTok, LinkedIn, Instagram
Site pagebrand website via SSR
Email sendinbox via transactional email

Persists publish logs: what was sent, when, where, response.

registerProcessor('site-publisher', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
  const { db } = ctx
  const results: ProcessorResult[] = []

  // Find reviewed articles ready for publishing
  const ready = await db.prepare(`
    SELECT a.id, a.title, a.summary, a.tags, a.niche
    FROM artifacts a
    WHERE a.format = 'text' AND a.status = 'ready'
      AND a.created_by IN ('article-generator', 'voice-adapter')
      AND a.id NOT IN (
        SELECT e.source_id FROM edges e WHERE e.relation = 'published_to'
      )
    ORDER BY a.created_at ASC
    LIMIT ?
  `).bind(ctx.processor.batch_size).all()

  for (const article of ready.results) {
    const startTime = Date.now()
    try {
      const content = await ctx.r2.get(`artifacts/${article.id}/content.md`)
      const text = content ? await content.text() : ''

      // Push to CMS / site API
      const publishResult = await publishToSite({
        title: article.title,
        content: text,
        tags: JSON.parse(article.tags),
        niche: article.niche,
      })

      // Record the publish event as a data artifact
      const publishId = crypto.randomUUID()
      await db.prepare(`
        INSERT INTO artifacts
          (id, format, status, title, tags, created_by, source_ids, properties, created_at, updated_at)
        VALUES (?, 'data', 'ready', ?, '["publish", "site"]', 'site-publisher', ?, ?, datetime('now'), datetime('now'))
      `).bind(
        publishId,
        `Published: ${article.title}`,
        JSON.stringify([article.id]),
        JSON.stringify({
          url: publishResult.url,
          platform: 'site',
          published_at: new Date().toISOString(),
        }),
      ).run()

      await db.prepare(`
        INSERT INTO edges (source_id, target_id, relation, created_at)
        VALUES (?, ?, 'published_to', datetime('now'))
      `).bind(article.id, publishId).run()

      // Mark original as archived (published lifecycle complete)
      await db.prepare(`
        UPDATE artifacts SET status = 'archived', updated_at = datetime('now') WHERE id = ?
      `).bind(article.id).run()

      results.push({
        processor: 'site-publisher',
        entity_type: 'publish',
        entity_id: article.id,
        model_used: null,
        tokens_input: 0, tokens_output: 0, cost_usd: 0,
        duration_ms: Date.now() - startTime,
        success: true,
        result_summary: `Published "${article.title}" to ${publishResult.url}`,
      })
    } catch (err: any) {
      results.push({
        processor: 'site-publisher',
        entity_type: 'publish',
        entity_id: article.id,
        model_used: null,
        tokens_input: 0, tokens_output: 0, cost_usd: 0,
        duration_ms: Date.now() - startTime,
        success: false,
        error_message: err.message,
      })
    }
  }

  return results
})

6. Collector — Aggregates Feedback

Captures audience signals back into the system. Feeds back into Researcher stores for the next cycle.

VariantWhat it captures
Email captureform submissions, brand, source
Engagementlikes, replies, shares, clicks
Analyticstraffic, conversions, behavior

Always listening. Continuous persistence to audience and engagement stores.

registerProcessor('engagement-collector', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
  const { db } = ctx
  const results: ProcessorResult[] = []

  // Find published artifacts that need engagement data collected
  const published = await db.prepare(`
    SELECT a.id, a.properties, a.source_ids
    FROM artifacts a
    WHERE a.created_by = 'site-publisher'
      AND a.status = 'ready'
      AND datetime(json_extract(a.properties, '$.published_at')) < datetime('now', '-1 hour')
      AND a.id NOT IN (
        SELECT json_each.value FROM artifacts c,
        json_each(c.source_ids) WHERE c.created_by = 'engagement-collector'
      )
    ORDER BY a.created_at ASC
    LIMIT ?
  `).bind(ctx.processor.batch_size).all()

  for (const pub of published.results) {
    const startTime = Date.now()
    try {
      const props = JSON.parse(pub.properties)
      const engagement = await fetchEngagement(props.url, props.platform)

      const engId = crypto.randomUUID()
      await db.prepare(`
        INSERT INTO artifacts
          (id, format, status, title, tags, created_by, source_ids, properties, created_at, updated_at)
        VALUES (?, 'data', 'ready', ?, '["engagement", "metrics"]', 'engagement-collector', ?, ?, datetime('now'), datetime('now'))
      `).bind(
        engId,
        `Engagement: ${props.url}`,
        JSON.stringify([pub.id, ...JSON.parse(pub.source_ids)]),
        JSON.stringify({
          views: engagement.views,
          clicks: engagement.clicks,
          shares: engagement.shares,
          time_on_page: engagement.avgTimeOnPage,
          bounce_rate: engagement.bounceRate,
          collected_at: new Date().toISOString(),
        }),
      ).run()

      results.push({
        processor: 'engagement-collector',
        entity_type: 'engagement',
        entity_id: pub.id,
        model_used: null,
        tokens_input: 0, tokens_output: 0, cost_usd: 0,
        duration_ms: Date.now() - startTime,
        success: true,
        result_summary: `Collected ${engagement.views} views, ${engagement.clicks} clicks`,
      })
    } catch (err: any) {
      results.push({
        processor: 'engagement-collector',
        entity_type: 'engagement',
        entity_id: pub.id,
        model_used: null,
        tokens_input: 0, tokens_output: 0, cost_usd: 0,
        duration_ms: Date.now() - startTime,
        success: false,
        error_message: err.message,
      })
    }
  }

  return results
})

7. Storer — Persists to Storage

Cross-cutting persistence layer. Every processor has its own store, but the Storer pattern defines how stores are queryable across the system. Handles the mechanics of putting large content in R2, metadata in D1, and cached lookups in KV.

VariantWhat it holds
Research storequeryable findings (D1 + R2)
Content storegenerated artifacts + metadata (D1 + R2)
Audience storecontacts, segments, history (D1)
CacheAPI responses, TTL-based (KV)
registerProcessor('artifact-storer', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
  const { db } = ctx
  const results: ProcessorResult[] = []

  // Find artifacts with content that needs R2 persistence
  const pending = await db.prepare(`
    SELECT id, format, properties FROM artifacts
    WHERE status = 'pending_store'
    ORDER BY created_at ASC
    LIMIT ?
  `).bind(ctx.processor.batch_size).all()

  for (const artifact of pending.results) {
    const startTime = Date.now()
    try {
      const props = JSON.parse(artifact.properties || '{}')

      // Move large content to R2
      if (props._content) {
        await ctx.r2.put(`artifacts/${artifact.id}/content`, props._content)
        delete props._content
        props.r2_key = `artifacts/${artifact.id}/content`
      }

      // Update artifact with storage metadata
      await db.prepare(`
        UPDATE artifacts SET status = 'ready', properties = ?, updated_at = datetime('now') WHERE id = ?
      `).bind(JSON.stringify(props), artifact.id).run()

      // Write to KV cache for hot lookups
      if (props.cache_key) {
        await ctx.kv.put(
          props.cache_key,
          JSON.stringify({ id: artifact.id, format: artifact.format }),
          { expirationTtl: 3600 },
        )
      }

      results.push({
        processor: 'artifact-storer',
        entity_type: 'store',
        entity_id: artifact.id,
        model_used: null,
        tokens_input: 0, tokens_output: 0, cost_usd: 0,
        duration_ms: Date.now() - startTime,
        success: true,
      })
    } catch (err: any) {
      results.push({
        processor: 'artifact-storer',
        entity_type: 'store',
        entity_id: artifact.id,
        model_used: null,
        tokens_input: 0, tokens_output: 0, cost_usd: 0,
        duration_ms: Date.now() - startTime,
        success: false,
        error_message: err.message,
      })
    }
  }

  return results
})

Summary Table

TypeDoesInputOutputTypical cost
ResearcherGathers external dataAPIs, web, databasesData artifactsAPI fees
GeneratorCreates contentData artifactsContent artifactsLLM tokens (high)
AdapterTransforms format/voiceContent artifactsNew content artifactsLLM tokens (low)
ReviewerEvaluates qualityDraft artifactsReview artifacts + status updatesLLM tokens (low)
PublisherDeploys to destinationsReady artifactsPublish recordsPlatform API fees
CollectorAggregates feedbackPlatform APIsEngagement artifactsUsually free
StorerPersists across tiersAny artifactsStorage metadataStorage costs

The hard problem isn’t the processors — it’s naming, tagging, and linking so any processor can find what it needs with a simple query.

Three principles:

  1. Rich metadata — artifacts carry enough context to be found without knowing who created them
  2. Encourage composition — link to parents, groups, sources
  3. Encourage reusability — describe what they ARE, never what they’re FOR

The Cooking Analogy

Artifacts are ingredients. Label them by what they are (protein, vegetable, grain), not what dish they’re for. The same chicken works in a stir-fry, a sandwich, a soup. Don’t label it “soup chicken.” The cook decides what to make when everything is ready.

A piece of text is just text. It has a topic, a length, some tags. It doesn’t know if it’ll become a blog post, email body, book chapter, tweet thread, or video script. The processor that picks it up — the cook — makes that decision at the moment of use.

Don’t pre-define categories. Describe properties richly and let patterns emerge. A processor that needs “short text in professional voice about fintech” queries:

SELECT * FROM artifacts
WHERE format = 'text'
  AND voice = 'nichefi-professional'
  AND word_count < 500
  AND tags LIKE '%fintech%'

The category “LinkedIn post” was never defined — it emerged from the query.

TypeScript Interfaces

interface Artifact {
  id: string
  format: ArtifactFormat
  status: ArtifactStatus

  // What is this about?
  title: string | null
  summary: string | null
  tags: string[]
  niche: string | null
  project: string | null

  // Who made it and how does it sound?
  author: string | null
  voice: string | null

  // Descriptive measurements (let patterns emerge)
  word_count: number | null
  paragraphs: number | null
  properties: ArtifactMetadata

  // Where does it fit in a collection?
  parent_id: string | null
  group_id: string | null
  sequence: number | null

  // Where did it come from?
  source_ids: string[]
  created_by: string

  created_at: string
  updated_at: string
}

type ArtifactFormat = 'text' | 'outline' | 'image' | 'data' | 'mixed'

type ArtifactStatus = 'draft' | 'ready' | 'reviewed' | 'published' | 'archived'

interface ArtifactMetadata {
  /** For text: reading_level, sentiment, language */
  /** For images: width, height, palette */
  /** For data: row_count, columns, freshness */
  /** For outlines: depth, node_count */
  [key: string]: unknown
}

interface ArtifactQuery {
  format?: ArtifactFormat
  status?: ArtifactStatus | ArtifactStatus[]
  tags?: string[]
  niche?: string
  project?: string
  voice?: string
  created_by?: string
  min_word_count?: number
  max_word_count?: number
  not_source_of?: string  // Exclude artifacts already consumed by this processor
  limit?: number
  order_by?: 'created_at' | 'updated_at' | 'word_count'
  order_dir?: 'asc' | 'desc'
}

D1 Schema

CREATE TABLE artifacts (
  id          TEXT PRIMARY KEY,
  format      TEXT NOT NULL,        -- 'text', 'outline', 'image', 'data', 'mixed'
  status      TEXT NOT NULL,        -- 'draft', 'ready', 'reviewed', 'published', 'archived'

  -- Metadata
  title       TEXT,
  summary     TEXT,
  tags        TEXT,                 -- JSON array: ['scramjet', 'architecture']
  niche       TEXT,
  project     TEXT,

  -- Voice & authorship
  author      TEXT,
  voice       TEXT,                 -- 'scramjet-technical', 'nichefi-casual', NULL if raw

  -- Measurements
  word_count  INTEGER,
  paragraphs  INTEGER,
  properties  TEXT,                 -- JSON: format-specific measurements
                                   --   text:    { reading_level, sentiment, language }
                                   --   image:   { width, height, palette }
                                   --   data:    { row_count, columns, freshness }
                                   --   outline: { depth, node_count }

  -- Composition
  parent_id   TEXT,                 -- part → whole
  group_id    TEXT,                 -- all parts of a collection share this
  sequence    INTEGER,             -- ordering within a group

  -- Lineage
  source_ids  TEXT,                 -- JSON: artifact IDs this was derived from
  created_by  TEXT,                 -- processor that created it

  -- Housekeeping
  created_at  TEXT NOT NULL,
  updated_at  TEXT NOT NULL
);

-- Shopping queries: find artifacts by shape and topic
CREATE INDEX idx_artifacts_shop ON artifacts(format, status, niche);
CREATE INDEX idx_artifacts_voice ON artifacts(voice, format, status);
CREATE INDEX idx_artifacts_project ON artifacts(project, format, status);
CREATE INDEX idx_artifacts_group ON artifacts(group_id, sequence);
CREATE INDEX idx_artifacts_tags ON artifacts(tags);
CREATE INDEX idx_artifacts_created_by ON artifacts(created_by, status);

Artifact Lifecycle

         ┌──────────────────────────────────────────────┐
         │                                              │
    ┌────┴───┐    ┌──────────┐    ┌──────────┐    ┌────▼────┐
    │ draft  │───►│  ready   │───►│published │───►│archived │
    └────┬───┘    └──────────┘    └──────────┘    └─────────┘
         │              ▲
         └──────────────┘
          (rejected by reviewer,
           regenerate and retry)

The Self-Enriching Flywheel

The store fills its own gaps. When a processor needs an artifact in a form that doesn’t exist yet, it creates it — and now it exists for everyone.

1. Query: "text, scramjet, professional voice, 800+ words"
2. Found:  great scramjet text — but casual voice, 1200 words
3. Gap:    no professional version exists
4. Fill:   adapter creates professional version → stores it
           (source_ids → the casual original, voice → 'professional')
5. Next time: professional scramjet text is already there

No one planned this. No rigid hierarchy predicted “we’ll need a professional version of article X.” The gap was discovered at the moment of need, filled, and the store got richer.

Every Operation Feeds the Store

Nothing is consumed. Nothing is thrown away. Every operation produces new artifacts.

Gather    → data artifacts (research, scrapes, trends)
Transform → new artifacts (originals stay untouched)
Publish   → publish records (what was sent, where, when)
Collect   → engagement artifacts (likes, clicks, replies)
Review    → review artifacts (scores, feedback, rejection reasons)
Failures  → signal artifacts (what didn't work is valuable too)

Published content itself feeds the next cycle:

publish article → engagement data flows back → becomes data artifact
  → next generation queries "what topics got most engagement?"
  → produces better content → publishes → more engagement
  → richer store → smarter generation → ...

The store doesn’t grow linearly — it compounds. Each cycle is informed by everything that came before. Research from 3 months ago + engagement data from last week + a failed draft from yesterday = a better article today. All connected through edges, all queryable by any processor.

At D1/R2 costs, storage is essentially free. Keep everything.

How Processors Shop for Ingredients

Processors find what they need through queries, not through being told.

Researcher stores findings as data:

INSERT INTO artifacts (id, format, status, tags, niche, created_by, created_at, updated_at)
VALUES (?, 'data', 'ready', '["app-store", "productivity"]', 'productivity-apps', 'topic-researcher', datetime('now'), datetime('now'))

Generator shops by format + topic:

SELECT * FROM artifacts
WHERE format = 'data' AND status = 'ready' AND niche = 'productivity-apps'
ORDER BY created_at DESC

Assembler checks if parts of a collection are ready:

SELECT COUNT(*) as ready
FROM artifacts
WHERE format = 'text' AND status = 'ready' AND group_id = 'book-xyz'

Any processor shops for reusable content:

SELECT * FROM artifacts
WHERE format = 'text' AND status = 'ready' AND tags LIKE '%scramjet%'
AND id NOT IN (SELECT source_ids FROM artifacts WHERE created_by = 'format-adapter-x')

Gap detection — find research with no generated content:

SELECT a.* FROM artifacts a
WHERE a.format = 'data' AND a.status = 'ready'
AND a.id NOT IN (SELECT e.source_id FROM edges e WHERE e.relation = 'derived_from')

Engagement-driven prioritization:

SELECT a.niche, SUM(CAST(json_extract(e.properties, '$.engagement_score') AS REAL)) as score
FROM artifacts a
JOIN artifacts e ON e.tags LIKE '%' || a.niche || '%' AND e.created_by = 'engagement-collector'
WHERE a.format = 'data' AND a.status = 'ready'
GROUP BY a.niche
ORDER BY score DESC

source_ids as JSON works for “what was this made from?” but not for “what was made from this?” or “show me the full derivation tree.” Use a proper edges table for bidirectional graph queries.

Schema

CREATE TABLE edges (
  source_id   TEXT NOT NULL,     -- The artifact that was used
  target_id   TEXT NOT NULL,     -- The artifact that was created from it
  relation    TEXT NOT NULL,     -- 'derived_from', 'adapted_from', 'part_of', 'references'
  created_at  TEXT NOT NULL,
  PRIMARY KEY (source_id, target_id, relation)
);

-- Efficient queries in both directions
CREATE INDEX idx_edges_source ON edges(source_id);
CREATE INDEX idx_edges_target ON edges(target_id);
CREATE INDEX idx_edges_relation ON edges(relation);

Edge Types

RelationMeaningExample
derived_fromCreated using this artifact as inputArticle derived from keyword research
adapted_fromSame content, different format/voiceSocial post adapted from article
part_ofThis artifact is a component of a larger oneChapter is part of a book
referencesThis artifact cites or uses data from anotherArticle references engagement data
reviewed_byA review artifact evaluating this contentReview linked to the article it scored
published_toContent deployed to a destinationArticle published to site

Querying the Graph

// What was this artifact made from?
async function getLineage(db: D1Database, artifactId: string): Promise<Edge[]> {
  return (await db.prepare(`
    SELECT * FROM edges WHERE target_id = ?
  `).bind(artifactId).all()).results as Edge[]
}

// What has been made from this artifact?
async function getDerivatives(db: D1Database, artifactId: string): Promise<Edge[]> {
  return (await db.prepare(`
    SELECT * FROM edges WHERE source_id = ?
  `).bind(artifactId).all()).results as Edge[]
}

// Full derivation tree (recursive CTE — SQLite/D1 supports this)
async function getDerivationTree(db: D1Database, rootId: string): Promise<Edge[]> {
  return (await db.prepare(`
    WITH RECURSIVE tree AS (
      SELECT source_id, target_id, relation, 0 as depth
      FROM edges WHERE source_id = ?
      UNION ALL
      SELECT e.source_id, e.target_id, e.relation, t.depth + 1
      FROM edges e
      JOIN tree t ON e.source_id = t.target_id
      WHERE t.depth < 10
    )
    SELECT * FROM tree
  `).bind(rootId).all()).results as Edge[]
}

// Find all adaptations of a specific article
async function getAdaptations(db: D1Database, articleId: string): Promise<Artifact[]> {
  return (await db.prepare(`
    SELECT a.* FROM artifacts a
    JOIN edges e ON e.target_id = a.id
    WHERE e.source_id = ? AND e.relation = 'adapted_from'
  `).bind(articleId).all()).results as Artifact[]
}

// Find articles that have been published but never adapted for social
async function findUnadaptedArticles(db: D1Database): Promise<Artifact[]> {
  return (await db.prepare(`
    SELECT a.* FROM artifacts a
    WHERE a.format = 'text' AND a.status IN ('ready', 'reviewed')
      AND a.created_by = 'article-generator'
      AND a.id NOT IN (
        SELECT e.source_id FROM edges e
        WHERE e.relation = 'adapted_from'
          AND e.target_id IN (
            SELECT id FROM artifacts WHERE created_by = 'social-adapter'
          )
      )
  `).all()).results as Artifact[]
}

Versioned Edge Sets

When you deploy a new pipeline version, edges from the old version still exist and remain queryable. New processors create new edges. The graph grows monotonically — nothing is deleted, only appended. This makes rollback safe: reverting to an old pipeline version means running old processors that create edges the same way they always did.


Processors are pure functions. They self-register and the control plane looks them up by name at runtime.

Types

export interface ProcessorConfig {
  name: string                          // 'keyword-researcher', 'article-generator', etc.
  type: 'deterministic' | 'llm'        // Does it call an LLM?
  enabled: boolean
  phase: string                         // 'gather' | 'generate' | 'adapt' | 'review' | 'publish' | 'collect'

  // LLM processors only
  model_tier?: 'default' | 'expensive' | 'premium'
  max_tokens?: number
  temperature?: number
  prompt_version?: number               // Track which prompt version is active

  // Budget allocation
  weight: number                        // Share of cycle budget (0.0-1.0, all weights sum to 1.0)
  max_cost_per_call: number             // Hard cap per invocation in USD

  // Execution
  batch_size: number                    // Entities per tick
}

export interface ProcessorResult {
  processor: string
  entity_type: string
  entity_id: string
  model_used: string | null
  tokens_input: number
  tokens_output: number
  cost_usd: number
  duration_ms: number
  success: boolean
  error_message?: string
  result_summary?: string
  spawned_entities?: Array<{ type: string; id: string }>
}

export interface ProcessorContext {
  db: D1Database
  r2: R2Bucket
  kv: KVNamespace
  config: HarnessConfig
  processor: ProcessorConfig
  cycle: CycleState
  selectModel: () => string             // Respects budget and tier
  remainingBudget: () => number         // USD remaining for this processor
}

export type ProcessorFn = (ctx: ProcessorContext) => Promise<ProcessorResult[]>

Registry Implementation

const PROCESSOR_REGISTRY = new Map<string, ProcessorFn>()

export function registerProcessor(name: string, fn: ProcessorFn): void {
  PROCESSOR_REGISTRY.set(name, fn)
}

export function getProcessor(name: string): ProcessorFn | undefined {
  return PROCESSOR_REGISTRY.get(name)
}

export function listProcessors(): string[] {
  return Array.from(PROCESSOR_REGISTRY.keys())
}

The registry is a flat map. No hierarchy, no dependency declarations, no ordering constraints. The control plane decides when to run each processor. The processor just does its work when called.

Self-Registering Processors

Each processor lives in its own file and registers itself on import:

// processors/keyword-researcher.ts
import { registerProcessor } from '../registry'
registerProcessor('keyword-researcher', async (ctx) => { /* ... */ })

// processors/article-generator.ts
import { registerProcessor } from '../registry'
registerProcessor('article-generator', async (ctx) => { /* ... */ })

The harness imports all processor files at startup, which triggers registration:

// processors/index.ts — import all to register
import './keyword-researcher'
import './article-generator'
import './social-adapter'
import './content-reviewer'
import './site-publisher'
import './engagement-collector'
import './artifact-storer'

Phase-Ordered Execution

Processors execute in phase order. Within a phase, processors run in config order. The phase sequence ensures data flows correctly:

const PHASE_ORDER = ['gather', 'generate', 'adapt', 'review', 'publish', 'collect']

The execution flow in each cycle:

gather phase:     keyword-researcher → topic-researcher → web-scraper
                  (all researchers run, accumulating data artifacts)

generate phase:   article-generator → image-generator
                  (generators query data artifacts, produce content)

adapt phase:      voice-adapter → social-adapter → email-adapter
                  (adapters reshape content for different contexts)

review phase:     content-reviewer → seo-reviewer
                  (reviewers score and gate content)

publish phase:    site-publisher → social-publisher → email-publisher
                  (publishers deploy ready content)

collect phase:    engagement-collector → analytics-collector
                  (collectors capture signals for the next cycle)

How the Harness Runs a Tick

The control plane runs one processor per tick to stay under CPU limits:

async function tick(config: HarnessConfig, cycle: CycleState): Promise<boolean> {
  // Budget gate
  if (config.budget_mode === 'hard' && cycle.cost_so_far >= config.budget_max_per_cycle) {
    return true // Cycle done — out of budget
  }

  // Get processors for current phase
  const phaseProcessors = config.processors
    .filter(p => p.enabled && p.phase === cycle.phase)

  if (phaseProcessors.length === 0) {
    return advancePhase(cycle)
  }

  // Find the next processor that has work
  let didWork = false
  for (const proc of phaseProcessors) {
    const fn = getProcessor(proc.name)
    if (!fn) continue

    // Per-processor budget gate (soft mode)
    if (proc.type === 'llm' && config.budget_mode === 'soft') {
      const allocated = config.budget_max_per_cycle * proc.weight
      const spent = cycle.processor_stats[proc.name]?.cost || 0
      if (spent >= allocated) continue
    }

    const ctx = buildContext(config, proc, cycle)
    const results = await fn(ctx)

    for (const r of results) {
      recordResult(cycle, r)
    }

    didWork = results.length > 0
    if (didWork) break // One processor per tick
  }

  if (!didWork) {
    return advancePhase(cycle) // Phase exhausted
  }

  return false // More work to do
}

function advancePhase(cycle: CycleState): boolean {
  const currentIdx = PHASE_ORDER.indexOf(cycle.phase)
  if (currentIdx >= PHASE_ORDER.length - 1) {
    return true // Last phase done → cycle complete
  }
  cycle.phase = PHASE_ORDER[currentIdx + 1]
  return false // More phases to go
}

Every LLM call costs money. The architecture tracks cost at every level and stops before it overspends.

Two Modes

Hard mode. When cost_so_far >= budget_max_per_cycle, the cycle stops. Period. No more processors run.

if (config.budget_mode === 'hard' && cycle.cost_so_far >= config.budget_max_per_cycle) {
  return true // done — out of budget
}

Soft mode. Each processor gets a weighted share of the total budget. When a processor exhausts its share, it gets skipped — but other processors keep running. This means researchers (API calls) and publishers (no LLM cost) can keep running even when generators (expensive LLM calls) are paused.

if (config.budget_mode === 'soft') {
  const allocated = config.budget_max_per_cycle * proc.weight  // e.g., 0.4 = 40%
  const spent = cycle.processor_stats[proc.name]?.cost || 0
  if (spent >= allocated) continue // skip — this processor is over budget
}

Weight Allocation

Each processor declares a weight from 0.0 to 1.0. All weights sum to 1.0:

ProcessorWeightPhaseRationale
keyword-researcher0.05gatherCheap API calls
topic-researcher0.10gatherExternal API costs
article-generator0.35generateExpensive LLM calls, most budget
image-generator0.15generateMedium LLM + image API
voice-adapter0.08adaptQuick transforms
social-adapter0.07adaptQuick transforms
content-reviewer0.10reviewShort evaluation prompts
site-publisher0.05publishNo LLM cost
engagement-collector0.05collectNo LLM cost

Model Tier Routing

Three tiers: default, expensive, premium. The control plane auto-downgrades when budget is tight:

function selectModel(proc: ProcessorConfig, cycle: CycleState, config: HarnessConfig): string {
  const remaining = config.budget_max_per_cycle - cycle.cost_so_far

  // Budget < 20% remaining → force default tier
  if (remaining < config.budget_max_per_cycle * 0.2 && proc.model_tier !== 'default') {
    return config.model_default     // e.g., gemini-2.5-flash
  }

  switch (proc.model_tier) {
    case 'premium':   return config.model_premium    // e.g., gemini-2.5-pro
    case 'expensive': return config.model_expensive  // e.g., claude-sonnet-4
    default:          return config.model_default    // e.g., gemini-2.5-flash
  }
}

Graceful Degradation in Practice

Budget: $0.50/cycle

Phase     Processor              Cost     Running total   Decision
────────  ─────────────────────  ───────  ─────────────   ────────
gather    keyword-researcher     $0.01    $0.01           run
gather    topic-researcher       $0.03    $0.04           run
generate  article-generator      $0.25    $0.29           run (within allocation)
generate  article-generator      $0.20    $0.49           skip (soft: weight exhausted)
generate  image-generator        $0.08    $0.49           skip (soft: tight budget)
adapt     voice-adapter          $0.03    $0.49           skip (soft: tight budget)
review    content-reviewer       $0.02    $0.49           run (cheap, within allocation)
publish   site-publisher         $0.00    $0.49           run (deterministic)
collect   engagement-collector   $0.00    $0.49           run (deterministic)

Per-Processor Telemetry

Every processor invocation logs cost, tokens, and duration to D1:

interface ProcessorStats {
  cost: number       // USD spent
  calls: number      // invocations
  total_ms: number   // wall clock time
  failures: number   // error count
}

// Aggregated per-cycle
interface CycleTelemetry {
  cycle_id: number
  total_cost: number
  total_tokens_input: number
  total_tokens_output: number
  processor_stats: Record<string, ProcessorStats>
}

The dashboard reads this in real time. Over time, you can answer:


The system tunes itself based on success rate. No static intervals. No manual knobs.

The Self-Driving Loop

import { DurableObject } from 'cloudflare:workers'

// Safety rails
const ABSOLUTE_MIN_BATCH = 2
const ABSOLUTE_MAX_BATCH = 50
const MIN_INTERVAL_MS = 10_000         // fastest: 10s
const MAX_INTERVAL_MS = 120_000        // slowest: 2 min
const COOLDOWN_MS = 300_000            // 5 min on critical failure
const METRICS_WINDOW_MINUTES = 5

// Adaptive thresholds
const SUCCESS_RATE_CRITICAL = 0.2      // Below 20% → cooldown
const SUCCESS_RATE_LOW = 0.5           // Below 50% → halve batch, slow down
const SUCCESS_RATE_GOOD = 0.8          // Above 80% → gentle ramp up
const SUCCESS_RATE_GREAT = 0.95        // Above 95% → aggressive ramp up

interface ControllerState {
  running: boolean
  current_batch_size: number
  current_interval_ms: number
  cooldown_until: number

  // Rolling metrics keyed by minute timestamp
  metrics: Record<number, {
    dispatched: number
    completed: number
    failed: number
  }>
}

Alarm-Based Heartbeat

The Durable Object uses Cloudflare’s alarm API to wake itself. No external cron, no queue, no scheduler. The DO is its own scheduler:

class AdaptiveController extends DurableObject<Env> {
  private state: ControllerState

  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env)
    ctx.blockConcurrencyWhile(async () => {
      const saved = await ctx.storage.get<ControllerState>('state')
      if (saved) this.state = { ...DEFAULT_STATE, ...saved }
      this.pruneMetrics()
      // Resume alarm if running
      if (this.state.running) {
        const existing = await ctx.storage.getAlarm()
        if (!existing) {
          await ctx.storage.setAlarm(Date.now() + this.state.current_interval_ms)
        }
      }
    })
  }

  async alarm(): Promise<void> {
    if (!this.state.running) return

    // Cooldown check
    if (Date.now() < this.state.cooldown_until) {
      await this.scheduleNext()
      return
    }

    // Pick work from artifact store
    const batch = await this.pickWork(this.state.current_batch_size)
    if (batch.length === 0) {
      await this.scheduleNext()
      return
    }

    // Execute the batch
    const { completed, failed } = await this.executeBatch(batch)

    // Record metrics
    this.recordMetrics(batch.length, completed, failed)

    // Adapt based on success rate
    this.adapt()

    // Persist and schedule next tick
    await this.persist()
    await this.scheduleNext()
  }

  private async scheduleNext(): Promise<void> {
    if (this.state.running) {
      await this.ctx.storage.setAlarm(Date.now() + this.state.current_interval_ms)
    }
  }
}

Rolling Metrics Window

Success rate is calculated over a rolling window (default: 5 minutes). Recent performance drives adaptation:

private successRate(): number {
  const windowStart = Date.now() - METRICS_WINDOW_MINUTES * 60_000
  let completed = 0, failed = 0

  for (const [ts, m] of Object.entries(this.state.metrics)) {
    if (Number(ts) >= windowStart) {
      completed += m.completed
      failed += m.failed
    }
  }

  const total = completed + failed
  return total === 0 ? 1 : completed / total
}

private recordMetrics(dispatched: number, completed: number, failed: number): void {
  const minuteTs = Math.floor(Date.now() / 60_000) * 60_000
  if (!this.state.metrics[minuteTs]) {
    this.state.metrics[minuteTs] = { dispatched: 0, completed: 0, failed: 0 }
  }
  this.state.metrics[minuteTs].dispatched += dispatched
  this.state.metrics[minuteTs].completed += completed
  this.state.metrics[minuteTs].failed += failed
}

private pruneMetrics(): void {
  const cutoff = Date.now() - (METRICS_WINDOW_MINUTES + 1) * 60_000
  for (const ts of Object.keys(this.state.metrics)) {
    if (Number(ts) < cutoff) {
      delete this.state.metrics[Number(ts)]
    }
  }
}

Adaptive Batch Size and Interval

Four zones, each with different behavior:

private adapt(): void {
  const sr = this.successRate()
  const recentTotal = this.recentTotal()

  // Need minimum sample size before adapting
  if (recentTotal < 5) return

  if (sr < SUCCESS_RATE_CRITICAL) {
    // CRITICAL (< 20%): Full cooldown. Something is seriously wrong.
    this.state.cooldown_until = Date.now() + COOLDOWN_MS
    this.state.current_batch_size = ABSOLUTE_MIN_BATCH
    this.state.current_interval_ms = MAX_INTERVAL_MS

  } else if (sr < SUCCESS_RATE_LOW) {
    // LOW (20-50%): Halve batch, slow down interval
    this.state.current_batch_size = Math.max(
      ABSOLUTE_MIN_BATCH,
      Math.floor(this.state.current_batch_size * 0.5),
    )
    this.state.current_interval_ms = Math.min(
      MAX_INTERVAL_MS,
      Math.floor(this.state.current_interval_ms * 1.5),
    )

  } else if (sr > SUCCESS_RATE_GREAT) {
    // GREAT (> 95%): Aggressive ramp up
    this.state.current_batch_size = Math.min(
      ABSOLUTE_MAX_BATCH,
      Math.ceil(this.state.current_batch_size * 1.25),
    )
    this.state.current_interval_ms = Math.max(
      MIN_INTERVAL_MS,
      Math.floor(this.state.current_interval_ms * 0.8),
    )

  } else if (sr > SUCCESS_RATE_GOOD) {
    // GOOD (80-95%): Gentle ramp up
    this.state.current_batch_size = Math.min(
      ABSOLUTE_MAX_BATCH,
      Math.ceil(this.state.current_batch_size * 1.1),
    )
    this.state.current_interval_ms = Math.max(
      MIN_INTERVAL_MS,
      Math.floor(this.state.current_interval_ms * 0.95),
    )
  }
  // NEUTRAL (50-80%): Hold steady
}

Adaptation Summary

Success rate     Batch size    Interval     Behavior
─────────────    ──────────    ─────────    ─────────────────────
< 20%            → 2 (min)    → 120s       COOLDOWN 5 min
20-50%           × 0.50       × 1.50       Halve batch, slow down
50-80%           (hold)       (hold)       Wait for better signal
80-95%           × 1.10       × 0.95       Gentle ramp up
> 95%            × 1.25       × 0.80       Aggressive ramp up

HTTP API for Dashboard Control

async fetch(request: Request): Promise<Response> {
  const url = new URL(request.url)

  if (request.method === 'GET' && url.pathname === '/status') {
    return Response.json({
      running: this.state.running,
      current_batch_size: this.state.current_batch_size,
      current_interval_ms: this.state.current_interval_ms,
      success_rate: Math.round(this.successRate() * 100),
      completions_per_minute: this.recentCompletionsPerMinute(),
      in_cooldown: Date.now() < this.state.cooldown_until,
      projected_per_day: this.recentCompletionsPerMinute() * 60 * 24,
    })
  }

  if (request.method === 'POST' && url.pathname === '/start') {
    this.state.running = true
    this.state.cooldown_until = 0
    await this.persist()
    await this.ctx.storage.setAlarm(Date.now() + 1000)
    return Response.json({ ok: true, running: true })
  }

  if (request.method === 'POST' && url.pathname === '/stop') {
    this.state.running = false
    await this.persist()
    return Response.json({ ok: true, running: false })
  }

  return new Response('Not found', { status: 404 })
}

Processors are the atoms. Composition patterns are the molecules. Because every processor has the same interface (input artifacts, output artifacts), you can combine them in several ways.

Linear Pipeline

The simplest pattern. One processor’s output becomes the next processor’s input:

Researcher → Generator → Reviewer → Publisher

Each processor queries the artifact store for its inputs. The pipeline emerges from the data, not from explicit wiring.

Fan-Out

One processor’s output feeds multiple downstream processors:

                  ┌→ social-adapter → social-publisher
article-generator─┼→ email-adapter  → email-publisher
                  └→ site-publisher (directly)

All three adapters/publishers query the same artifact store. The article-generator writes one article; three processors independently find it.

Conditional Routing

Route artifacts to different processors based on their state:

registerProcessor('quality-router', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
  const { db } = ctx

  const reviews = await db.prepare(`
    SELECT a.id, a.source_ids, json_extract(a.properties, '$.score') as score
    FROM artifacts a
    WHERE a.created_by = 'content-reviewer' AND a.status = 'ready'
      AND a.id NOT IN (
        SELECT json_each.value FROM artifacts r,
        json_each(r.source_ids) WHERE r.created_by = 'quality-router'
      )
    LIMIT ?
  `).bind(ctx.processor.batch_size).all()

  const results: ProcessorResult[] = []

  for (const review of reviews.results) {
    const sourceIds = JSON.parse(review.source_ids as string)
    const originalId = sourceIds[0]
    const score = review.score as number

    if (score >= 0.8) {
      // High quality → ready for publishing
      await db.prepare(`
        UPDATE artifacts SET status = 'ready', updated_at = datetime('now') WHERE id = ?
      `).bind(originalId).run()
    } else if (score >= 0.5) {
      // Medium quality → send to voice adapter for improvement
      await db.prepare(`
        UPDATE artifacts SET status = 'draft', updated_at = datetime('now'),
        properties = json_set(COALESCE(properties, '{}'), '$.needs_revision', true)
        WHERE id = ?
      `).bind(originalId).run()
    } else {
      // Low quality → archive and let generator create fresh
      await db.prepare(`
        UPDATE artifacts SET status = 'archived', updated_at = datetime('now') WHERE id = ?
      `).bind(originalId).run()
    }

    results.push({
      processor: 'quality-router',
      entity_type: 'routing',
      entity_id: originalId,
      model_used: null,
      tokens_input: 0, tokens_output: 0, cost_usd: 0,
      duration_ms: 5,
      success: true,
      result_summary: `Score ${score}: ${score >= 0.8 ? 'ready' : score >= 0.5 ? 'revise' : 'archive'}`,
    })
  }

  return results
})

Fractal Composition: A Pipeline IS a Processor

The most powerful pattern. Processors are themselves composed of sub-processors. Same pattern at every level of the tree.

Book Generator
  checks: "are all chapters ready?"
  if yes → assemble → store

  Chapter Generator
    checks: "is there an approved outline?"
    if yes → generate → store

    Outline Generator
      checks: "is there research for this topic?"
      if yes → generate outline → store

No processor triggers another. No orchestration. Each one runs the same loop:

1. Check input store — is there work ready for me?
2. If yes → do my thing → write output to my store
3. If no → done (try again next cycle)

The book assembler, the chapter generator, the outline generator, the topic researcher — they all run this loop. The only difference is what “inputs ready” means and what “do my thing” produces.

registerProcessor('book-assembler', async (ctx: ProcessorContext): Promise<ProcessorResult[]> => {
  const { db } = ctx

  // Find books with all chapters ready
  const books = await db.prepare(`
    SELECT group_id,
      COUNT(*) as total,
      SUM(CASE WHEN status = 'ready' THEN 1 ELSE 0 END) as ready
    FROM artifacts
    WHERE format = 'text' AND parent_id IS NOT NULL AND group_id IS NOT NULL
    GROUP BY group_id
    HAVING total = ready AND total > 0
  `).all()

  const results: ProcessorResult[] = []

  for (const book of books.results) {
    const chapters = await db.prepare(`
      SELECT id FROM artifacts
      WHERE group_id = ? AND format = 'text' AND parent_id IS NOT NULL
      ORDER BY sequence ASC
    `).bind(book.group_id).all()

    // Assemble chapters into book artifact
    let fullContent = ''
    for (const ch of chapters.results) {
      const content = await ctx.r2.get(`artifacts/${ch.id}/content.md`)
      if (content) fullContent += await content.text() + '\n\n---\n\n'
    }

    const bookId = crypto.randomUUID()
    await ctx.r2.put(`artifacts/${bookId}/content.md`, fullContent)
    await db.prepare(`
      INSERT INTO artifacts (id, format, status, title, group_id, word_count, created_by, source_ids, created_at, updated_at)
      VALUES (?, 'text', 'draft', ?, ?, ?, 'book-assembler', ?, datetime('now'), datetime('now'))
    `).bind(
      bookId,
      `Book: ${book.group_id}`,
      book.group_id,
      fullContent.split(/\s+/).length,
      JSON.stringify(chapters.results.map(c => c.id)),
    ).run()

    results.push({
      processor: 'book-assembler',
      entity_type: 'book',
      entity_id: bookId,
      model_used: null,
      tokens_input: 0, tokens_output: 0, cost_usd: 0,
      duration_ms: 100,
      success: true,
      result_summary: `Assembled ${chapters.results.length} chapters`,
    })
  }

  return results
})

This eliminates:

The entire engine is N instances of this one loop, each checking its own store, each on its own schedule. Some run every minute, some daily, some weekly. The system converges toward completeness without coordination.

Pipeline Versioning

Pipelines are immutable configurations. To change a pipeline, create a new version:

interface PipelineVersion {
  version: number
  status: 'active' | 'draining' | 'inactive'
  processors: ProcessorConfig[]
  created_at: string
}

// Deployment flow:
// 1. Create new version with updated processors
// 2. Mark old version as 'draining' (finish in-flight, no new work)
// 3. Mark new version as 'active' (start accepting new work)
// 4. Once old version has no in-flight work, mark 'inactive'
// 5. If problems: reactivate old version (rollback)

async function deployPipelineVersion(
  db: D1Database,
  harnessId: number,
  newProcessors: ProcessorConfig[],
): Promise<void> {
  await db.prepare(`
    UPDATE pipeline_versions SET status = 'draining'
    WHERE harness_id = ? AND status = 'active'
  `).bind(harnessId).run()

  const currentMax = await db.prepare(`
    SELECT MAX(version) as v FROM pipeline_versions WHERE harness_id = ?
  `).bind(harnessId).first()

  await db.prepare(`
    INSERT INTO pipeline_versions (harness_id, version, status, processors_json, created_at)
    VALUES (?, ?, 'active', ?, datetime('now'))
  `).bind(harnessId, ((currentMax?.v as number) || 0) + 1, JSON.stringify(newProcessors)).run()
}

This architecture maps naturally to Cloudflare’s primitives. Each component has a clear home:

┌─────────────────────────────────────────────────────────┐
│ Worker (entry point)                                     │
│   Routes HTTP requests to the correct Durable Object     │
│                                                          │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Durable Object: Pipeline Controller (control plane)  │ │
│ │   - Alarm-based heartbeat (self-scheduling)          │ │
│ │   - Budget governor                                  │ │
│ │   - Phase-ordered processor execution                │ │
│ │   - Adaptive batch size + interval                   │ │
│ │   - Config from D1 (dashboard-controllable)          │ │
│ │   - HTTP API: /status, /start, /stop, /tick          │ │
│ └──────────────┬──────────────────────────────────────┘ │
│                │ calls registered processors             │
│ ┌──────────────▼──────────────────────────────────────┐ │
│ │ Processor Registry (data plane)                      │ │
│ │   - Pure functions registered by name                │ │
│ │   - Each processor reads/writes artifact store       │ │
│ │   - No awareness of other processors                 │ │
│ └──────────────┬──────────────────────────────────────┘ │
│                │ reads/writes                             │
│ ┌──────────────▼──────────────────────────────────────┐ │
│ │ Storage                                              │ │
│ │   D1  — artifacts table, edges table, processor log  │ │
│ │   R2  — large content bodies (articles, images)      │ │
│ │   KV  — hot cache for frequently accessed artifacts  │ │
│ └─────────────────────────────────────────────────────┘ │
│                                                          │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Queues (inter-pipeline communication)                │ │
│ │   - Pipeline A publishes artifacts to a queue        │ │
│ │   - Pipeline B consumes and processes them           │ │
│ └─────────────────────────────────────────────────────┘ │
│                                                          │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Workflows (multi-step processor chains)              │ │
│ │   - Long-running processor execution                 │ │
│ │   - Automatic retry with state persistence           │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘

Durable Object as Control Plane

The DO is the singleton that governs execution. One DO per pipeline. It owns:

export class PipelineController extends DurableObject<Env> {
  private state: PipelineState = DEFAULT_STATE
  private config: HarnessConfig | null = null

  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env)
    ctx.blockConcurrencyWhile(async () => {
      const saved = await ctx.storage.get<PipelineState>('state')
      if (saved) this.state = { ...DEFAULT_STATE, ...saved }
      // Resume alarm on wake
      if (this.state.running) {
        const existing = await ctx.storage.getAlarm()
        if (!existing) {
          await ctx.storage.setAlarm(Date.now() + 5000)
        }
      }
    })
  }

  async alarm() {
    if (!this.state.running) return

    try {
      // Load config from DB every tick (dashboard can change it live)
      this.config = await this.loadConfig()
      if (!this.config) {
        this.state.error_message = 'Config not found'
        this.state.running = false
        await this.save()
        return
      }

      if (!this.state.current_cycle) {
        await this.startCycle()
      }

      const done = await this.tick()

      if (done) {
        await this.finalizeCycle()
        await this.ctx.storage.setAlarm(Date.now() + this.config.cycle_interval_ms)
      } else {
        await this.ctx.storage.setAlarm(Date.now() + 2000) // More work soon
      }

      await this.save()
    } catch (err: any) {
      this.state.error_message = err.message
      await this.save()
      await this.ctx.storage.setAlarm(Date.now() + 60_000) // Retry in 60s
    }
  }
}

Queues for Cross-Pipeline Communication

When pipelines need to communicate (e.g., “content pipeline produced articles, SEO pipeline should check them”), Queues bridge them:

// Content pipeline's publisher writes to a queue
await ctx.env.SEO_QUEUE.send({
  type: 'new_article',
  artifact_id: articleId,
  niche: article.niche,
})

// SEO pipeline's worker consumes the queue
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const msg of batch.messages) {
      const { artifact_id } = msg.body as { artifact_id: string }
      // Copy artifact reference into SEO pipeline's scope
      await env.DB.prepare(`
        INSERT INTO artifacts (id, format, status, tags, source_ids, created_by, created_at, updated_at)
        VALUES (?, 'data', 'ready', '["seo-check-needed"]', ?, 'queue-consumer', datetime('now'), datetime('now'))
      `).bind(crypto.randomUUID(), JSON.stringify([artifact_id])).run()
      msg.ack()
    }
  },
}

D1 for Everything Queryable

D1 is the source of truth for artifacts, edges, processor configs, and telemetry. SQLite foundation means recursive CTEs work for graph traversal, JSON functions work for tag queries, and it costs nearly nothing.

R2 for Large Content

Article bodies, generated images, and raw research data go in R2. Artifact records in D1 reference R2 keys. D1 rows stay small (metadata only); R2 holds arbitrarily large content.

Workflows for Multi-Step Processors

Some processors need more than 30 seconds of CPU time. Cloudflare Workflows provide multi-step execution with automatic retry:

import { WorkflowEntrypoint, WorkflowStep, WorkflowEvent } from 'cloudflare:workers'

export class ArticleGenerationWorkflow extends WorkflowEntrypoint<Env, ArticleParams> {
  async run(event: WorkflowEvent<ArticleParams>, step: WorkflowStep) {
    const research = await step.do('fetch-research', async () => {
      return await fetchResearch(event.payload.topic)
    })

    const outline = await step.do('generate-outline', async () => {
      return await generateOutline(research)
    })

    const article = await step.do('generate-article', async () => {
      return await generateArticle(outline, research)
    })

    await step.do('store-artifact', async () => {
      await storeArtifact(article)
    })
  }
}

Keyword Research Pipeline

The simplest useful pipeline. A researcher gathers keyword data, a generator creates articles from high-value keywords.

const keywordPipeline: ProcessorConfig[] = [
  { name: 'keyword-researcher',  type: 'deterministic', enabled: true, phase: 'gather',   weight: 0.15, max_cost_per_call: 0.01, batch_size: 20 },
  { name: 'article-generator',   type: 'llm',           enabled: true, phase: 'generate', weight: 0.70, max_cost_per_call: 0.10, batch_size: 3, model_tier: 'expensive' },
  { name: 'artifact-storer',     type: 'deterministic', enabled: true, phase: 'collect',  weight: 0.05, max_cost_per_call: 0,    batch_size: 50 },
]

Data flow:

keyword-researcher: queries DataForSEO → stores keyword data artifacts
article-generator:  queries keyword artifacts → generates articles → stores as drafts
artifact-storer:    moves large content to R2, updates metadata

Content Quality Gate

Generator creates content, reviewer evaluates it, router sends it to publishing or back for revision.

const qualityGatePipeline: ProcessorConfig[] = [
  { name: 'article-generator',  type: 'llm',           enabled: true, phase: 'generate', weight: 0.45, max_cost_per_call: 0.10, batch_size: 3, model_tier: 'expensive' },
  { name: 'content-reviewer',   type: 'llm',           enabled: true, phase: 'review',   weight: 0.15, max_cost_per_call: 0.02, batch_size: 10, model_tier: 'default' },
  { name: 'quality-router',     type: 'deterministic', enabled: true, phase: 'review',   weight: 0.05, max_cost_per_call: 0,    batch_size: 20 },
  { name: 'site-publisher',     type: 'deterministic', enabled: true, phase: 'publish',  weight: 0.05, max_cost_per_call: 0,    batch_size: 10 },
]

The gate is implicit: the reviewer scores articles and updates their status. The publisher only picks up articles with status ready. Articles that score below threshold stay as draft — the generator will improve them on the next cycle.

Social Media Adaptation

Collect engagement data, adapt articles into social posts, review, publish:

const socialPipeline: ProcessorConfig[] = [
  { name: 'engagement-collector', type: 'deterministic', enabled: true, phase: 'gather',  weight: 0.10, max_cost_per_call: 0,    batch_size: 50 },
  { name: 'social-adapter',      type: 'llm',           enabled: true, phase: 'adapt',    weight: 0.30, max_cost_per_call: 0.03, batch_size: 5, model_tier: 'default' },
  { name: 'content-reviewer',    type: 'llm',           enabled: true, phase: 'review',   weight: 0.15, max_cost_per_call: 0.02, batch_size: 10, model_tier: 'default' },
  { name: 'social-publisher',    type: 'deterministic', enabled: true, phase: 'publish',  weight: 0.05, max_cost_per_call: 0,    batch_size: 20 },
]

Full SEO Article Pipeline

End-to-end: research, generate, review, adapt for web, publish, collect engagement:

const seoPipeline: ProcessorConfig[] = [
  // Gather
  { name: 'keyword-researcher',   phase: 'gather',   type: 'deterministic', weight: 0.05, batch_size: 20, enabled: true, max_cost_per_call: 0.01 },
  { name: 'topic-researcher',     phase: 'gather',   type: 'deterministic', weight: 0.10, batch_size: 10, enabled: true, max_cost_per_call: 0.02 },
  // Generate
  { name: 'article-generator',    phase: 'generate', type: 'llm', model_tier: 'expensive', weight: 0.35, batch_size: 3, enabled: true, max_cost_per_call: 0.10 },
  // Adapt
  { name: 'seo-optimizer',        phase: 'adapt',    type: 'llm', model_tier: 'default', weight: 0.10, batch_size: 5, enabled: true, max_cost_per_call: 0.03 },
  // Review
  { name: 'content-reviewer',     phase: 'review',   type: 'llm', model_tier: 'default', weight: 0.10, batch_size: 10, enabled: true, max_cost_per_call: 0.02 },
  { name: 'seo-reviewer',         phase: 'review',   type: 'llm', model_tier: 'default', weight: 0.05, batch_size: 10, enabled: true, max_cost_per_call: 0.02 },
  // Publish
  { name: 'site-publisher',       phase: 'publish',  type: 'deterministic', weight: 0.05, batch_size: 10, enabled: true, max_cost_per_call: 0 },
  // Collect
  { name: 'engagement-collector', phase: 'collect',  type: 'deterministic', weight: 0.05, batch_size: 50, enabled: true, max_cost_per_call: 0 },
  { name: 'analytics-collector',  phase: 'collect',  type: 'deterministic', weight: 0.05, batch_size: 50, enabled: true, max_cost_per_call: 0 },
]

Self-Improving Feedback Loop

The collector feeds engagement data back to the researcher, which informs the next generation cycle. No code change needed — the loop emerges from the data:

Cycle 1: researcher gathers topics → generator creates articles → publisher deploys
Cycle 2: collector gathers engagement → researcher sees "fintech gets 3x clicks"
         → generator prioritizes fintech → better articles
Cycle 3: collector sees even better engagement → researcher deepens fintech research
         → generator produces expert content → ...
// In topic-researcher: use engagement data to prioritize
registerProcessor('smart-topic-researcher', async (ctx) => {
  // Find which niches have the best engagement
  const topNiches = await ctx.db.prepare(`
    SELECT a.niche, AVG(CAST(json_extract(e.properties, '$.clicks') AS REAL)) as avg_clicks
    FROM artifacts e
    JOIN edges ON edges.target_id = e.id
    JOIN artifacts a ON edges.source_id = a.id
    WHERE e.created_by = 'engagement-collector'
      AND a.format = 'text'
    GROUP BY a.niche
    ORDER BY avg_clicks DESC
    LIMIT 5
  `).all()

  const results: ProcessorResult[] = []
  for (const niche of topNiches.results) {
    const research = await deepResearch(niche.niche as string, ctx)
    // Store enhanced research...
    results.push({
      processor: 'smart-topic-researcher',
      entity_type: 'research',
      entity_id: crypto.randomUUID(),
      model_used: null,
      tokens_input: 0, tokens_output: 0,
      cost_usd: 0.005,
      duration_ms: 2000,
      success: true,
      result_summary: `Deep research on ${niche.niche} (avg ${niche.avg_clicks} clicks)`,
    })
  }
  return results
})

A/B Testing Pipeline

Generate two versions with different models, publish both, collect engagement, compare:

const abTestPipeline: ProcessorConfig[] = [
  { name: 'keyword-researcher',   phase: 'gather',   type: 'deterministic', weight: 0.05, batch_size: 10, enabled: true, max_cost_per_call: 0.01 },
  // Two generators with different models
  { name: 'article-gen-a',        phase: 'generate', type: 'llm', model_tier: 'expensive', weight: 0.25, batch_size: 3, enabled: true, max_cost_per_call: 0.10 },
  { name: 'article-gen-b',        phase: 'generate', type: 'llm', model_tier: 'default',   weight: 0.15, batch_size: 3, enabled: true, max_cost_per_call: 0.05 },
  { name: 'content-reviewer',     phase: 'review',   type: 'llm', model_tier: 'default',   weight: 0.10, batch_size: 10, enabled: true, max_cost_per_call: 0.02 },
  { name: 'ab-publisher',         phase: 'publish',  type: 'deterministic', weight: 0.05, batch_size: 10, enabled: true, max_cost_per_call: 0 },
  { name: 'engagement-collector', phase: 'collect',  type: 'deterministic', weight: 0.05, batch_size: 50, enabled: true, max_cost_per_call: 0 },
  { name: 'ab-comparator',        phase: 'collect',  type: 'deterministic', weight: 0.05, batch_size: 20, enabled: true, max_cost_per_call: 0 },
]

// The comparator processor
registerProcessor('ab-comparator', async (ctx) => {
  // Find publish records from both generators
  const results_a = await ctx.db.prepare(`
    SELECT p.id as publish_id, a.niche, a.title,
      json_extract(eng.properties, '$.clicks') as clicks,
      json_extract(eng.properties, '$.views') as views
    FROM artifacts a
    JOIN edges e1 ON e1.source_id = a.id AND e1.relation = 'published_to'
    JOIN artifacts p ON p.id = e1.target_id
    JOIN edges e2 ON e2.source_id = p.id
    JOIN artifacts eng ON eng.id = e2.target_id AND eng.created_by = 'engagement-collector'
    WHERE a.created_by = 'article-gen-a'
  `).all()

  const results_b = await ctx.db.prepare(`
    SELECT p.id as publish_id, a.niche, a.title,
      json_extract(eng.properties, '$.clicks') as clicks,
      json_extract(eng.properties, '$.views') as views
    FROM artifacts a
    JOIN edges e1 ON e1.source_id = a.id AND e1.relation = 'published_to'
    JOIN artifacts p ON p.id = e1.target_id
    JOIN edges e2 ON e2.source_id = p.id
    JOIN artifacts eng ON eng.id = e2.target_id AND eng.created_by = 'engagement-collector'
    WHERE a.created_by = 'article-gen-b'
  `).all()

  // Compare by niche
  const a_avg = results_a.results.reduce((s, r) => s + (r.clicks as number), 0) / (results_a.results.length || 1)
  const b_avg = results_b.results.reduce((s, r) => s + (r.clicks as number), 0) / (results_b.results.length || 1)
  const winner = a_avg > b_avg ? 'article-gen-a' : 'article-gen-b'

  // Store comparison result
  const compId = crypto.randomUUID()
  await ctx.db.prepare(`
    INSERT INTO artifacts (id, format, status, title, tags, created_by, properties, created_at, updated_at)
    VALUES (?, 'data', 'ready', ?, '["ab-test", "comparison"]', 'ab-comparator', ?, datetime('now'), datetime('now'))
  `).bind(
    compId,
    `A/B Result: gen-a vs gen-b`,
    JSON.stringify({ winner, a_avg_clicks: a_avg, b_avg_clicks: b_avg, a_samples: results_a.results.length, b_samples: results_b.results.length }),
  ).run()

  return [{
    processor: 'ab-comparator',
    entity_type: 'comparison',
    entity_id: compId,
    model_used: null,
    tokens_input: 0, tokens_output: 0, cost_usd: 0,
    duration_ms: 50,
    success: true,
    result_summary: `Winner: ${winner} (${a_avg.toFixed(1)} vs ${b_avg.toFixed(1)} avg clicks)`,
  }]
})

vs LangChain / LangGraph

LangChain chains are imperative: each step calls the next. LangGraph adds a state machine but steps are still coupled by explicit edges. Change one node and you must update the edges.

Composable processors have no explicit edges. Processors discover work through queries. Add a new processor and it starts finding inputs immediately. Remove one and the rest don’t notice.

LangChain/LangGraphComposable Processors
CouplingExplicit edges between stepsNone — query-based discovery
Adding a stepUpdate chain/graph definitionRegister processor, done
Failure handlingTry/catch per step, retry chainsNext cycle checks again
Cost trackingManual instrumentationBuilt into ProcessorResult
AccumulationEach run starts freshStore compounds over time
SchedulingExternal trigger or streamingSelf-driving Durable Objects
DeploymentPython server (centralized)Cloudflare edge (global, serverless)

When to choose LangChain/LangGraph: Complex agent reasoning with tool use, human-in-the-loop, or building a chatbot. LangGraph excels at multi-turn agent workflows.

When to choose Composable Processors: Production content pipeline that runs autonomously, manages its own budget, adapts to failures, and compounds knowledge over time.

vs Apache Airflow / Prefect

DAG-based orchestrators. All steps and dependencies must be defined upfront. The DAG is the bottleneck — every change touches it.

Airflow/PrefectComposable Processors
OrchestrationCentral DAG schedulerNone — each processor self-schedules
DependenciesDeclared in DAG definitionImplicit via store queries
Adding a stepModify DAG, update dependenciesRegister processor
RuntimeScheduled batch jobsContinuous, each on own cadence
StateTask-level (pass between steps)Persistent store (compounds)
InfrastructureServer-based ($50-500+/mo)Serverless (sub-$5/mo)

When to choose Airflow/Prefect: Complex data engineering workflows, visual DAG editor needed, or already on Kubernetes.

When to choose Composable Processors: Lightweight, self-adaptive system with built-in budget management running at the edge.

vs Temporal Workflows

Durable execution with automatic retries and replay. Powerful, but still imperative — you define a sequence of activities. The workflow function is the coupling point.

TemporalComposable Processors
Execution modelImperative workflow functionIndependent polling loops
DurabilityWorkflow history replayStore persistence (D1/R2)
CouplingActivities called by workflowNone between processors
ScalingWorker pools per task queuePer-processor batch sizing
Cost trackingCustom instrumentationBuilt into every result
InfrastructureTemporal Cloud or self-hosted clusterServerless (Cloudflare Workers)

When to choose Temporal: Guaranteed exactly-once execution, complex saga patterns, or workflows spanning multiple services that must not lose state.

When to choose Composable Processors: Durability benefits without operational complexity. DOs provide single-writer durability; the artifact store provides crash recovery.

vs AWS Step Functions

AWS’s serverless workflow orchestration.

AWS Step FunctionsComposable Processors
DefinitionAmazon States Language (JSON)ProcessorConfig array
ExecutionPush-based (state machine drives)Pull-based (processors query for work)
Pricing$0.025 per 1,000 transitionsWorkers pricing (~$0.50/M requests)
LatencyRegion-boundEdge (global, < 50ms)
AdaptivityManual (Choice states + CloudWatch)Built-in (success-rate driven)
Budget managementNot built-inPer-processor weight allocation

When to choose Step Functions: Already in AWS ecosystem, need visual workflow editing, native Lambda/SQS/DynamoDB integration.

When to choose Composable Processors: Edge execution, sub-cent pricing, adaptive scheduling, simpler programming model.

vs Simple Sequential Scripts

The honest comparison. Many AI content pipelines start as a single script:

const research = await callPerplexity(topic)
const article = await callGemini(research)
const review = await callClaude(article)
if (review.score > 0.7) await publishToSite(article)
Sequential ScriptComposable Processors
SimplicityDead simpleMore upfront structure
ResumabilityStart over on failureCrash-safe (artifacts persist)
Cost trackingManual or noneBuilt-in per-processor
AdaptivityManual tuningAutomatic
ReusabilityCopy-pasteProcessors compose freely
Feedback loopsRequire explicit wiringEmerge from data

When to choose the script: Prototyping, fewer than three steps, or running manually once a week. Don’t over-engineer.

When to choose Composable Processors: Pipeline has grown past 3-4 steps, needs to run autonomously, cost matters, or multiple pipelines share processors.

vs Microservices with REST APIs

Microservices + RESTComposable Processors
CouplingCoupled by API contractsDecoupled by data (artifact store)
Failure handlingCascading failures, circuit breakers neededProcessor retries independently
DiscoveryService registry / DNSQuery the store
ScalingPer-service replicasPer-processor batch size
DeploymentN services, N deploymentsSingle Worker + DO
LatencyNetwork hops between servicesIn-process function calls

When to choose microservices: Processors in different languages, different teams, or need independent scaling beyond what a single Worker provides.

When to choose Composable Processors: All TypeScript, small team, and the overhead of N separate deployments isn’t justified.


Processors That Know About Other Processors

The most common mistake. A generator that imports and calls a reviewer, or a publisher that triggers a collector.

// BAD: Generator calls reviewer directly
registerProcessor('article-generator', async (ctx) => {
  const article = await generate(ctx)
  const review = await getProcessor('content-reviewer')!(reviewCtx) // Don't do this
  if (review.score < 0.7) return regenerate(ctx)
})

// GOOD: Generator writes to store. Reviewer finds it on its own schedule.
registerProcessor('article-generator', async (ctx) => {
  const article = await generate(ctx)
  await storeArtifact(article, { status: 'draft' })
  return [/* result */]
})

Putting Business Logic in the Orchestrator

The control plane governs when and with what budget. It should not contain business logic about what to do.

// BAD: Business logic in the harness
async tick() {
  if (article.niche === 'fintech') {
    await runProcessor('premium-generator', article)
  } else {
    await runProcessor('basic-generator', article)
  }
}

// GOOD: Processor decides its own logic
registerProcessor('smart-generator', async (ctx) => {
  const model = ctx.selectModel() // Control plane handles model routing
  // Generator decides its own approach based on artifact data
})

Artifacts Without Metadata (Unlabeled Ingredients)

Storing content without rich metadata makes the store useless. Future processors can’t find what they need.

// BAD: Minimal metadata — impossible to query meaningfully
await db.prepare(`
  INSERT INTO artifacts (id, format, status, created_by, created_at, updated_at)
  VALUES (?, 'text', 'ready', 'generator', datetime('now'), datetime('now'))
`).bind(id).run()

// GOOD: Rich metadata enables discovery by any future processor
await db.prepare(`
  INSERT INTO artifacts
    (id, format, status, title, summary, tags, niche, voice, word_count,
     created_by, source_ids, properties, created_at, updated_at)
  VALUES (?, 'text', 'ready', ?, ?, ?, ?, ?, ?,
          'generator', ?, ?, datetime('now'), datetime('now'))
`).bind(id, title, summary, tags, niche, voice, wordCount, sourceIds, properties).run()

Skipping the Review Step

LLMs produce confident-sounding garbage regularly. A review step (even a cheap one) catches hallucinations, off-brand content, and structural problems before they reach your audience. The review step also creates valuable feedback data that improves generators over time.

Monolithic Processors That Do Too Much

A processor that researches, generates, reviews, and publishes in one function is just a sequential script with extra steps.

// BAD: One processor does everything
registerProcessor('do-everything', async (ctx) => {
  const research = await fetchResearch()
  const article = await generate(research)
  const review = await reviewArticle(article)
  if (review.passed) await publish(article)
  return [/* result */]
})

// GOOD: Four processors, each with one job
registerProcessor('researcher', async (ctx) => { /* gather */ })
registerProcessor('generator', async (ctx) => { /* create */ })
registerProcessor('reviewer', async (ctx) => { /* evaluate */ })
registerProcessor('publisher', async (ctx) => { /* deploy */ })

Tightly Coupled Processor Ordering

If your pipeline breaks when you reorder processors, they’re too coupled. Each processor should query the artifact store, not assume a specific predecessor.

// BAD: Assumes keyword-researcher ran in the previous tick
registerProcessor('article-generator', async (ctx) => {
  const keywords = ctx.cycle.keyword_results // Coupled to cycle state
})

// GOOD: Queries the store for what's available
registerProcessor('article-generator', async (ctx) => {
  const keywords = await ctx.db.prepare(`
    SELECT * FROM artifacts
    WHERE format = 'data' AND created_by = 'keyword-researcher' AND status = 'ready'
    LIMIT ?
  `).bind(ctx.processor.batch_size).all()
  // Works regardless of when or whether the researcher ran
})

Labeling Artifacts by Purpose

// BAD: Artifact knows it's a "blog post"
{ format: 'text', tags: ['blog-post'], destination: 'website' }

// GOOD: Artifact describes what it IS, not what it's FOR
{ format: 'text', tags: ['productivity', 'tools'], niche: 'productivity-apps', word_count: 1200, voice: 'professional' }
// The publisher decides this is a blog post. An adapter could also make it an email body.

Using Events Between Processors

// BAD: Events create coupling
eventBus.on('article-published', () => collectEngagement()) // Publisher knows collector exists

// GOOD: Poll-based discovery
// Collector queries store: "any published artifacts I haven't collected engagement for?"
// Publisher has no idea collectors exist. Collectors have no idea publishers exist.

Cloudflare Platform

Orchestration & Workflow Systems

Architecture & Patterns

MIT


Edit page
Share this post on:

Previous Post
Autonomous Agent Frameworks
Next Post
Durable Object Rate Limiting