Gary Wu

Inversion of Control in Data Pipeline Architecture

The principle: Don’t hardcode how functions are called, where they run, or who calls them. Inject all of these decisions. The same code runs in a test, locally, on the cloud, or peer-to-peer without modification.

This article explains Inversion of Control (Inversion of Coupling Control) through the lens of building distributed data systems, using the Replicator / Reconciler architecture as a concrete example.


The Problem: Tight Coupling in Method Calls

Consider a traditional data scanning architecture:

// ❌ BAD: Tightly coupled
class DataRunner {
  async scan(path: string) {
    const files = fs.readdirSync(path)  // Hardcoded I/O
    const hashes = await computeHashes(files)  // Hardcoded computation

    // Push results to cloud directly
    const response = await fetch('https://cloud.example.com/sync', {
      method: 'POST',
      body: JSON.stringify({ results: hashes })
    })
    return response.json()
  }
}

// Problems:
// - Can't test without cloud connectivity
// - Can't run locally without HTTP client
// - Can't change communication protocol without rewriting
// - Can't run same code on both local and cloud

This monolithic approach creates five distinct coupling points:

  1. Arguments: scan(path) assumes you’re passing strings
  2. Method names: scan, computeHashes, and fetch are hardcoded
  3. Return types: assumes an HTTP response is returned
  4. Exceptions: network errors are thrown directly
  5. Execution context: assumes it runs in a Node environment with fs access

The system can’t evolve without breaking everything downstream.


The Five Aspects of Coupling

Based on Office Control’s framework, method calls create coupling in five dimensions:

1. Argument Coupling → Dependency Injection

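Problem: Methods take specific value types (strings, objects, IDs) and must work out for themselves how to act on them. Solution: Inject the collaborators the method needs, so it receives ready-to-use dependencies instead of raw values.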

// ❌ Coupled
async function scanDirectory(path: string) { }

// ✅ Inverted
async function scanDirectory(deps: { fileSystem: FileSystem, scanner: Scanner }) { }
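
To make the inverted signature concrete, here is a minimal sketch. The FileSystemPort and Scanner interfaces, their method shapes, and the in-memory stand-ins are assumptions made for illustration; FileSystemPort plays the role of the FileSystem type above (renamed only to avoid clashing with a browser global), and a path argument is kept so the example has something to scan.

```typescript
// Hypothetical interfaces, assumed for illustration only.
interface FileSystemPort {
  list(path: string): string[]
}
interface Scanner {
  hash(file: string): string
}

// The function receives its collaborators instead of importing them.
function scanDirectory(
  path: string,
  deps: { fileSystem: FileSystemPort; scanner: Scanner }
): Record<string, string> {
  const hashes: Record<string, string> = {}
  for (const file of deps.fileSystem.list(path)) {
    hashes[file] = deps.scanner.hash(file)
  }
  return hashes
}

// In-memory stand-ins: no disk access is needed to exercise the logic.
const fakeFs: FileSystemPort = { list: () => ['a.txt', 'b.txt'] }
const fakeScanner: Scanner = { hash: (file) => `hash(${file})` }

const scanned = scanDirectory('/data', { fileSystem: fakeFs, scanner: fakeScanner })
```

Swapping fakeFs for a disk-backed implementation would not touch scanDirectory itself.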

2. Method Name Coupling → Continuation Injection

Problem: You hardcode which function to call (scan, enumerate, list). Solution: Inject the function itself (continuation) — don’t name it.

// ❌ Coupled
if (operation === 'scan') {
  await scan(data)
} else if (operation === 'enumerate') {
  await enumerate(data)  // Hard to add new operations
}

// ✅ Inverted
const operation = injectedOperation  // Function is passed in
await operation(data)  // No hardcoded names
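
A self-contained version of the same idea might look like the sketch below. The Operation type and the two sample operations are hypothetical; the point is only that runPipeline never refers to an operation by name.

```typescript
// An operation is any function with this shape; the pipeline knows nothing else about it.
type Operation = (data: string[]) => number

// The pipeline calls whatever it was given. No if/else over operation names.
function runPipeline(data: string[], operation: Operation): number {
  return operation(data)
}

// Hypothetical operations, chosen by the caller rather than by the pipeline.
const countAll: Operation = (data) => data.length
const countHidden: Operation = (data) => data.filter((f) => f.startsWith('.')).length

const files = ['.env', 'readme.md', '.gitignore']
const total = runPipeline(files, countAll)
const hidden = runPipeline(files, countHidden)
```

Adding a third operation means writing one more function and passing it in; runPipeline stays as it is.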

3. Return Type Coupling → State Injection

Problem: Methods return specific types (JSON, HTTP response). Solution: Inject a handler that receives the result.

// ❌ Coupled
async function scan(): Promise<ScanResult[]> {
  return results  // Hardcoded return shape
}

// ✅ Inverted
async function scan(deps: { onResult: (r: ScanResult[]) => void }) {
  deps.onResult(results)  // Caller decides what to do with results
}
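
As a rough illustration of why this helps, the sketch below lets two callers consume the same scan in different ways. The ScanResult shape and the placeholder size value are assumptions, not Replicator's real types.

```typescript
// Hypothetical result shape, assumed for illustration.
interface ScanResult {
  path: string
  size: number
}

// scan hands each result to the injected handler instead of returning a fixed shape.
function scan(paths: string[], deps: { onResult: (r: ScanResult) => void }): void {
  for (const path of paths) {
    deps.onResult({ path, size: path.length }) // size is a placeholder value
  }
}

// Caller A collects every result in memory.
const collected: ScanResult[] = []
scan(['/a', '/bb'], { onResult: (r) => collected.push(r) })

// Caller B only keeps a running total and discards the individual results.
let totalSize = 0
scan(['/a', '/bb'], { onResult: (r) => { totalSize += r.size } })
```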

4. Exception Coupling → Error Handler Injection

Problem: Methods throw exceptions; callers must catch them. Solution: Inject error handlers.

// ❌ Coupled
try {
  await scan()
} catch (error) {
  logger.error(error)  // Hardcoded error handling
}

// ✅ Inverted
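async function scan(deps: { onError: (e: Error) => void }) {
  try { /* ... */ }
  catch (e) {
    // Under strict TypeScript the caught value is `unknown`, so normalize it to an Error
    deps.onError(e instanceof Error ? e : new Error(String(e)))  // Caller decides error handling
  }
}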
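
A small synchronous sketch of the pattern follows. The readSize helper and its failure condition are invented for the example; the injected handler here simply records messages, but it could equally log, retry, or abort.

```typescript
type ScanErrorHandler = (e: Error) => void

// Hypothetical reader that fails for one kind of path.
function readSize(path: string): number {
  if (path.includes('broken')) throw new Error(`cannot read ${path}`)
  return path.length
}

// scanAll keeps going after a failure; what happens to the error is the caller's choice.
function scanAll(paths: string[], deps: { onError: ScanErrorHandler }): number[] {
  const sizes: number[] = []
  for (const path of paths) {
    try {
      sizes.push(readSize(path))
    } catch (e) {
      // Caught values are `unknown` under strict TypeScript, so normalize first.
      deps.onError(e instanceof Error ? e : new Error(String(e)))
    }
  }
  return sizes
}

const errors: string[] = []
const sizes = scanAll(['/ok', '/broken', '/fine'], {
  onError: (e) => { errors.push(e.message) },
})
```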

5. Execution Context Coupling → Execution Injection

Problem: Methods assume they run in specific environments (Node, Browser, Cloudflare). Solution: Inject the execution context (file system, HTTP client, etc.).

// ❌ Coupled
async function scan(path: string) {
  const files = fs.readdirSync(path)  // Assumes Node.js fs module
  const response = await fetch(url)   // Assumes fetch exists
}

// ✅ Inverted
async function scan(
  path: string,
  url: string,
  deps: { filesystem: FileSystem, http: HttpClient }
) {
  const files = await deps.filesystem.list(path)
  const response = await deps.http.post(url, { files })
}
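
One way to exercise such a function without disk or network is sketched below. FileSystemPort and HttpPort are hypothetical stand-ins for the FileSystem and HttpClient types above, and the URL is a placeholder that is never actually contacted.

```typescript
// Hypothetical ports, assumed for illustration.
interface FileSystemPort {
  list(path: string): Promise<string[]>
}
interface HttpPort {
  post(url: string, body: unknown): Promise<{ status: number }>
}

async function scanAndUpload(
  path: string,
  url: string,
  deps: { filesystem: FileSystemPort; http: HttpPort }
): Promise<number> {
  const files = await deps.filesystem.list(path)
  const response = await deps.http.post(url, { files })
  return response.status
}

// In-memory context: records requests instead of touching disk or network.
const sent: Array<{ url: string; body: unknown }> = []
const memoryContext = {
  filesystem: { list: async () => ['a.txt', 'b.txt'] },
  http: {
    post: async (url: string, body: unknown) => {
      sent.push({ url, body })
      return { status: 200 }
    },
  },
}

const statusPromise = scanAndUpload('/data', 'https://example.invalid/sync', memoryContext)
```

A Node runtime would supply an fs-backed filesystem and a fetch-backed http object with the same two methods.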

Inversion of Coupling Control: The Solution

Inversion of Control means:

Instead of your function deciding how to call other functions, pass those functions in. Instead of hardcoding where it runs, inject the environment. The function becomes a pure description of logic; the runtime becomes pluggable.

The result: Same code, different runtimes.

// The core logic — no hardcoded calls, no hardcoded environment
// Declared as an async generator (function*) because the body uses yield
async function* reconcile(
  scans: ScanResult[],
  deps: {
    dedup: DedupService,
    confidence: ConfidenceService,
    storage: StorageLayer
  }
): AsyncIterable<DedupPlan> {
  for (const scan of scans) {
    const duplicates = await deps.dedup.find(scan)
    const plans = await deps.confidence.score(duplicates)
    yield deps.storage.save(plans)
  }
}

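// reconcile() returns an async iterable, so every runtime consumes it with for await

// Usage 1: TEST (in-memory)
const testDeps = {
  dedup: new InMemoryDedup(),
  confidence: new MockConfidence(),
  storage: new MemoryStorage()
}
for await (const plan of reconcile(testScans, testDeps)) { /* assert on plan */ }

// Usage 2: LOCAL (SQLite + logging)
const localDeps = {
  dedup: new SQLiteDedup(db),
  confidence: new ConfidenceService(db),
  storage: new FileSystemStorage('./plans')
}
for await (const plan of reconcile(localScans, localDeps)) { /* use plan */ }

// Usage 3: CLOUD (D1 + R2)
const cloudDeps = {
  dedup: new D1Dedup(d1),
  confidence: new CloudflareAIConfidence(),
  storage: new R2Storage(r2)
}
for await (const plan of reconcile(cloudScans, cloudDeps)) { /* use plan */ }

// Usage 4: P2P (between two runners)
const peerDeps = {
  dedup: new PeerDedup(otherRunner),
  confidence: new LocalConfidence(),
  storage: new PeerStorage(otherRunner)
}
for await (const plan of reconcile(peerScans, peerDeps)) { /* use plan */ }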

Same function, four different deployments. No rewrites. No branching. Just injection.
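
For readers who want to run something end to end, here is a cut-down sketch of the in-memory case. The service shapes, the DedupPlan fields, and the hash-to-paths index are assumptions (the article does not define them), and the confidence step is left out to keep the example short.

```typescript
interface ScanResult { path: string; treeHash: string }
interface DedupPlan { keep: string; remove: string[] }

// Hypothetical service shapes, assumed for illustration.
interface Deps {
  dedup: { find(scan: ScanResult): Promise<string[]> }
  storage: { save(plan: DedupPlan): Promise<DedupPlan> }
}

async function* reconcile(scans: ScanResult[], deps: Deps): AsyncGenerator<DedupPlan> {
  for (const scan of scans) {
    const duplicates = await deps.dedup.find(scan)
    if (duplicates.length === 0) continue // nothing to plan for unique content
    yield await deps.storage.save({ keep: scan.path, remove: duplicates })
  }
}

// In-memory deps: an index from tree hash to paths, and an array acting as storage.
const savedPlans: DedupPlan[] = []
function createMemoryDeps(index: Record<string, string[]>): Deps {
  return {
    dedup: {
      find: async (scan) => (index[scan.treeHash] ?? []).filter((p) => p !== scan.path),
    },
    storage: {
      save: async (plan) => { savedPlans.push(plan); return plan },
    },
  }
}

async function runInMemory(): Promise<DedupPlan[]> {
  const deps = createMemoryDeps({ abc: ['/a', '/copy-of-a'], xyz: ['/b'] })
  const scans = [{ path: '/a', treeHash: 'abc' }, { path: '/b', treeHash: 'xyz' }]
  const plans: DedupPlan[] = []
  for await (const plan of reconcile(scans, deps)) plans.push(plan)
  return plans
}
```

Only createMemoryDeps knows that storage is an array; a SQLite or R2 variant would implement the same two methods.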


Replicator Architecture: A Real Example

The Replicator system applies this principle across a distributed data pipeline:

The Three-Layer Design

LAYER 3: Testable Core (Pure Functions + Injection)
├─ core/scanning.ts      — enumerate, tree-hash (algorithms)
├─ core/dedup.ts         — find duplicates, scoring
├─ core/reconcile.ts     — cross-device reconciliation
└─ core/types.ts         — shared data structures

LAYER 2: Service Wrappers (Dependency Resolution)
├─ runner/service.ts     — wraps core/scanning for local use
├─ reconciler/service.ts — wraps core/dedup for analysis
└─ All services: async functions that don't care where they run

LAYER 1: Runtime Adapters (How You Invoke It)
├─ runner/cli.ts         — local: `npm run enumerate`
├─ runner/http.ts        — HTTP: POST /enumerate
├─ reconciler/cli.ts     — local: `npm run reconcile`
├─ reconciler/http.ts    — HTTP: POST /reconcile
└─ reconciler/p2p.ts     — peer-to-peer: direct Runner↔Runner
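
The three layers can be compressed into a single file to show how they relate. Everything in this sketch (countEntries, createCountService, the two adapters) is hypothetical and far simpler than the real modules; it only mirrors the direction of the dependencies.

```typescript
// LAYER 3 (core): pure logic with injected dependencies.
interface Lister {
  list(path: string): string[]
}
function countEntries(path: string, deps: { lister: Lister }): number {
  return deps.lister.list(path).length
}

// LAYER 2 (service): resolves dependencies once and exposes a plain function.
type CountService = { count(path: string): number }
function createCountService(lister: Lister): CountService {
  return { count: (path) => countEntries(path, { lister }) }
}

// LAYER 1 (adapters): translate an invocation style into a service call.
function cliAdapter(service: CountService, path: string): string {
  return `${path}: ${service.count(path)} entries`
}
function httpAdapter(service: CountService, body: { path: string }) {
  return { status: 200, json: { count: service.count(body.path) } }
}

// Both adapters share one service, which shares one core function.
const service = createCountService({ list: () => ['x', 'y', 'z'] })
const cliOutput = cliAdapter(service, '/data')
const httpOutput = httpAdapter(service, { path: '/data' })
```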

Example: The enumerate Function

// CORE: Pure logic, fully testable, completely generic
export async function* enumerate(
  path: string,
  deps: {
    filesystem: FileSystem,
    hasher: Hasher,
    db: Database
  }
): AsyncIterable<EnumerateResult> {
  const stack = [path]

  while (stack.length > 0) {
    const current = stack.pop()!
    const children = await deps.filesystem.list(current)

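    // Hash each directory once, from its children, and report it under its own path
    const treeHash = await deps.hasher.computeTreeHash(children)
    yield { path: current, treeHash, childCount: children.length }

    // Queue subdirectories so they get the same treatment
    for (const child of children) {
      if (child.isDirectory) {
        stack.push(child.path)
      }
    }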
  }
}

// RUNNER SERVICE: Wraps core logic for the local machine
export function createRunnerService(db: Database) {
  return {
    async *enumerate(path: string) {
      const filesystem = new LocalFileSystem()
      const hasher = new BLAKEHasher()

      yield* enumerate(path, { filesystem, hasher, db })
    }
  }
}

// RUNNER CLI: Local invocation
const service = createRunnerService(db)
for await (const result of service.enumerate('/Volumes/Samsung4T')) {
  console.log(result)
}

// RUNNER HTTP: Remote invocation
app.post('/enumerate', async (req, res) => {
  const service = createRunnerService(db)
  const results = []
  for await (const result of service.enumerate(req.body.path)) {
    results.push(result)
  }
  res.json(results)
})

// RECONCILER: Cloud processing (same core logic, different deps)
const reconcilerService = {
  async *reconcile(scans: EnumerateResult[]) {
    const dedup = new D1Dedup(d1)  // Cloud storage
    const confidence = new CloudflareAIConfidence()
    const storage = new R2Storage(r2)

    yield* reconcileCore(scans, { dedup, confidence, storage })
  }
}

// TEST: Same code, memory only
const testService = {
  async *enumerate(path: string) {
    const filesystem = new MockFileSystem(mockData)
    const hasher = new MockHasher()
    const db = new MemoryDatabase()

    yield* enumerate(path, { filesystem, hasher, db })
  }
}

const results = []
for await (const result of testService.enumerate('/test')) {
  results.push(result)
}
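
The test wiring above relies on mocks that are not shown. A minimal runnable sketch is given below, assuming hypothetical Entry, FileSystemPort, and Hasher shapes and a toy hasher. It is a simplified variant that yields one result per directory and omits the db dependency.

```typescript
// Hypothetical shapes, assumed for illustration.
interface Entry { path: string; isDirectory: boolean }
interface FileSystemPort { list(path: string): Promise<Entry[]> }
interface Hasher { computeTreeHash(entries: Entry[]): Promise<string> }
interface EnumerateResult { path: string; treeHash: string; childCount: number }

async function* enumerate(
  root: string,
  deps: { filesystem: FileSystemPort; hasher: Hasher }
): AsyncGenerator<EnumerateResult> {
  const stack = [root]
  while (stack.length > 0) {
    const current = stack.pop()!
    const children = await deps.filesystem.list(current)
    const treeHash = await deps.hasher.computeTreeHash(children)
    yield { path: current, treeHash, childCount: children.length }
    for (const child of children) {
      if (child.isDirectory) stack.push(child.path)
    }
  }
}

// In-memory tree: directory path -> entries.
const tree: Record<string, Entry[]> = {
  '/test': [
    { path: '/test/a.txt', isDirectory: false },
    { path: '/test/sub', isDirectory: true },
  ],
  '/test/sub': [{ path: '/test/sub/b.txt', isDirectory: false }],
}
const mockFs: FileSystemPort = { list: async (path) => tree[path] ?? [] }
// Toy hasher: joins child paths. A real hasher would use a cryptographic digest.
const mockHasher: Hasher = {
  computeTreeHash: async (entries) => entries.map((e) => e.path).join('|'),
}

async function collectEnumeration(): Promise<EnumerateResult[]> {
  const results: EnumerateResult[] = []
  for await (const r of enumerate('/test', { filesystem: mockFs, hasher: mockHasher })) {
    results.push(r)
  }
  return results
}
```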

The Pattern in Action

  1. Test environment: Everything in memory, instant feedback

    npm test  # runner + reconciler in same process

  2. Development environment: Local runner, local reconciler (SQLite)

    npm run enumerate -- /path
    npm run reconcile -- scan.json

  3. Single-machine environment: One machine handles both scanning and reconciliation

    node runner/cli.ts  # OR node runner/http-server.ts
    node reconciler/cli.ts

  4. Cloud environment: Runner on local machine, reconciler on Cloudflare Workers

    node runner/http-server.ts  # Listens on :7850
    # Deploy reconciler/ to Workers with D1 + R2

  5. P2P environment: Two machines communicate directly

    const peerReconciler = {
      dedup: new RemoteDedup('runner@192.168.1.100:7850'),
      storage: new PeerStorage('runner@192.168.1.100:7850')
    }

Not one line of application code changes.


From Monolithic to Distributed (Without Rewriting)

Traditional approach:

Week 1-2: Build monolithic system
Week 3-4: Discover it won't scale
Week 5-8: Rewrite everything for distributed architecture

With Inversion of Control:

Week 1-2: Build pure core functions (core/), testable without external dependencies
Week 3: Wrap them in local services (runner/, reconciler/)
Week 4: Test everything in a single process
Week 5: Deploy runner to local machine, reconciler to cloud
  → Zero application code changes
  → Same test suite passes
  → Same business logic runs

The architecture supports all deployment modes from day one.


The Benefits

1. Testability Without Infrastructure

// Test the entire pipeline in a unit test
const testScans = [{ path: '/test', treeHash: 'abc' }]
const results = []

for await (const plan of reconciler.reconcile(testScans)) {
  results.push(plan)
}

expect(results).toHaveLength(1)
expect(results[0].dedupGroups).toEqual([...])

No Docker, no cloud account, no database setup.

2. Deployment Flexibility

Same code runs in a test process, on a local machine, on Cloudflare Workers, and peer-to-peer between two runners.

3. Zero Breaking Changes During Evolution

// Old deployment
const runner = createRunnerService(db, logger)

// New deployment with caching (replaces the line above; only the wiring changes)
// const runner = createRunnerService(db, logger, { cache: redisCache })

// Same code works; caller doesn't change
for await (const result of runner.enumerate(path)) { }
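
One way to get this property is an optional options argument with a safe default. The sketch below is an assumption about how such a factory could be written, not Replicator's actual createRunnerService; the hash function, HashCache interface, and Map-backed cache are all hypothetical.

```typescript
// Hypothetical cache shape, assumed for illustration.
interface HashCache {
  get(key: string): string | undefined
  set(key: string, value: string): void
}

// `options` is optional, so call sites written before caching existed keep compiling.
function createRunnerService(
  hash: (path: string) => string,
  options: { cache?: HashCache } = {}
) {
  return {
    enumerate(path: string): string {
      const cached = options.cache?.get(path)
      if (cached !== undefined) return cached
      const value = hash(path)
      options.cache?.set(path, value)
      return value
    },
  }
}

let hashCalls = 0
const slowHash = (path: string) => { hashCalls += 1; return `h:${path}` }

// Old call site: unchanged.
const plainRunner = createRunnerService(slowHash)

// New call site: opts into caching with a Map-backed cache.
const store = new Map<string, string>()
const cachedRunner = createRunnerService(slowHash, {
  cache: { get: (k) => store.get(k), set: (k, v) => { store.set(k, v) } },
})

const first = plainRunner.enumerate('/a')
cachedRunner.enumerate('/b')
cachedRunner.enumerate('/b') // served from the cache; slowHash is not called again
```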

4. Independent Team Development

5. Progressive Adoption

// Day 1: Run locally
const day1Deps = localDeps

// Day 10: Add caching
const day10Deps = { ...localDeps, cache: redisCache }

// Day 20: Migrate to cloud storage
const day20Deps = { ...localDeps, storage: d1Storage }

// No application code changes; only the deps passed in change
for await (const plan of reconcile(scans, day20Deps)) { }
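
The override-by-spread step can be tried in isolation. In this sketch reconcileOnce, PlanStorage, and the file:// and r2:// prefixes are made up for the example; the only thing it demonstrates is that replacing one dependency leaves the others, and the function, untouched.

```typescript
// Hypothetical dependency shapes, assumed for illustration.
interface PlanStorage {
  save(plan: string): string
}
interface Deps {
  storage: PlanStorage
  log: (msg: string) => void
}

function reconcileOnce(scan: string, deps: Deps): string {
  deps.log(`reconciling ${scan}`)
  return deps.storage.save(`plan-for-${scan}`)
}

const logLines: string[] = []
const localDeps: Deps = {
  storage: { save: (plan) => `file://${plan}` },
  log: (msg) => { logLines.push(msg) },
}

// Later: override a single dependency; the logger is inherited as-is.
const cloudDeps: Deps = { ...localDeps, storage: { save: (plan) => `r2://${plan}` } }

const localResult = reconcileOnce('scan-1', localDeps)
const cloudResult = reconcileOnce('scan-1', cloudDeps)
```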

How Office Control Enables This

The Office Control framework provides the toolkit for expressing these patterns cleanly:

  1. ManagedFunction — Base type for injectable functions
  2. Container — Registry for resolving dependencies
  3. Continuation Injection — Functions as first-class values
  4. Error Handler Injection — Central error policies
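
A container in this sense does not have to be elaborate. The sketch below is a rough plain-TypeScript approximation of item 2; it is not Office Control's API, and the Container class, its method names, and the 'hasher' registration are all hypothetical.

```typescript
// A tiny registry: maps a name to a factory and builds each instance lazily, once.
class Container {
  private factories = new Map<string, () => unknown>()
  private instances = new Map<string, unknown>()

  register<T>(name: string, factory: () => T): void {
    this.factories.set(name, factory)
  }

  resolve<T>(name: string): T {
    if (!this.instances.has(name)) {
      const factory = this.factories.get(name)
      if (!factory) throw new Error(`No registration for "${name}"`)
      this.instances.set(name, factory())
    }
    // The caller states the expected type; this sketch does not verify it at runtime.
    return this.instances.get(name) as T
  }
}

const container = new Container()
container.register('hasher', () => ({ hash: (s: string) => `h:${s}` }))

const hasher = container.resolve<{ hash(s: string): string }>('hasher')
const sameInstance = container.resolve('hasher') === hasher
```

A test can register an in-memory hasher under the same name and the code that resolves it stays unchanged.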

For Replicator, you don’t necessarily need the full Office Control framework; you can implement these patterns with plain TypeScript.


Conclusion: Function Injection as the Primitive

The principle is simple:

Never hardcode dependencies, method names, return shapes, error handling, or execution context. Always inject them.

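This transforms your system from a rigid monolith into a flexible, testable, deployable architecture.

The Replicator architecture demonstrates this at scale: the same enumerate() and reconcile() functions power the in-memory tests, the local CLI, the HTTP endpoints, the Cloudflare Workers deployment, and the peer-to-peer mode.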

All without branching, all without specialized versions, all from one canonical implementation.

This is the power of Inversion of Control.


Status: Published 2026-03-27 Last Updated: 2026-03-27 Author: Gary Wu

