Gary Wu

Layered Architecture for Distributed Data Systems


The Problem: When you build a data runner that needs to work across multiple devices (MacBook, NAS, cloud), you face a choice: either duplicate business logic for each runtime, or abstract I/O away from the core algorithms.

The Solution: A three-layer architecture (pure functions, adapters, and orchestrators) that makes your data pipeline testable, deployable to multiple runtimes, and modular enough that several people can work on different parts without constant coordination.

This article walks through the architecture pattern used in Replicator (a multi-device data indexer), shows why it beats monolithic approaches, and provides code you can adapt to your own pipeline.


The Monolithic Problem

Imagine building a file indexing system. You need to:

  1. Walk directories and catalog files
  2. Deduplicate by content hash
  3. Classify files into categories
  4. Apply consolidation strategies
  5. Query and return results

In a traditional monolithic design, your code looks like this:

// The monolithic approach (DON'T do this)
import * as fs from 'node:fs';
import { createHash } from 'node:crypto';
// Database: a SQLite handle with prepare()/run()/all()/close() (e.g. better-sqlite3)

async function indexDirectory(path: string, dbPath: string) {
  // Open the database
  const db = new Database(dbPath);

  // Walk the filesystem (tightly coupled to real I/O)
  const entries = fs.readdirSync(path);

  for (const entry of entries) {
    const fullPath = path + '/' + entry;
    const stat = fs.statSync(fullPath);
    if (!stat.isFile()) continue; // no recursion: subdirectories are skipped

    // Call database directly (business logic tangled with I/O)
    const hash = createHash('sha256').update(fs.readFileSync(fullPath)).digest('hex');
    db.prepare('INSERT INTO files VALUES (?, ?, ?)').run(
      entry,
      hash,
      stat.size
    );
  }

  // Run dedup queries (tightly coupled to one database backend)
  const dupes = db.prepare(
    'SELECT hash, COUNT(*) AS copies FROM files GROUP BY hash HAVING COUNT(*) > 1'
  ).all();

  // Process results
  for (const group of dupes) {
    // Apply consolidation logic (mixed with query logic)
  }

  db.close();
}

Problems with this approach:

  1. Untestable: You can’t test without a real filesystem and database
  2. One deployment path: Works locally with SQLite, but how do you:
    • Run on cloud infrastructure (use D1 instead of SQLite)?
    • Deploy as a worker (emit events instead of writing to the database)?
    • Share logic between platforms (Node, Rust, Python)?
  3. Tightly coupled: Business logic is intertwined with I/O specifics
  4. Hard to parallelize: No clear boundaries between what can run in parallel vs. what needs coordination
  5. Team friction: One person can’t improve hashing without coordinating with someone working on dedup

The core issue: your algorithm is imprisoned in a specific technology stack.


Three-Layer Architecture

The solution is to separate three concerns:

┌─────────────────────────────────────────────────────────────────┐
│ LAYER 3: ORCHESTRATORS                                          │
│ (Integrate all layers, provide runtime-specific entry points)   │
│                                                                 │
│  - createLocalOrchestrator(db)                                  │
│  - createWorkerOrchestrator(dbPath)                             │
│  - createTestOrchestrator(mockFs)                               │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│ LAYER 2: ADAPTERS (Dependency Injection)                        │
│ (Implement interfaces for database, filesystem, network, etc.)  │
│                                                                 │
│  - LocalFileSystem → filesystem                                 │
│  - SQLiteRepository → database queries                          │
│  - MockFileSystem → testing                                     │
│  - D1Repository → alternative database backend                  │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│ LAYER 1: PURE FUNCTIONS                                         │
│ (Algorithm that depends only on injected interfaces)            │
│                                                                 │
│  - enumerateTree(rootPath, volumeId, deps, emit)                │
│  - reconcile(scans, deps)                                       │
│  - classify(files, deps)                                        │
│                                                                 │
│ Properties:                                                     │
│  - Same input + same deps results → same events                 │
│  - No direct side effects (I/O goes through deps)               │
│  - Emits events for caller to handle                            │
└─────────────────────────────────────────────────────────────────┘

Key principle: Dependency injection flows down, data flows up.


Layer 1: Pure Functions

The bottom layer contains pure algorithms with no I/O. All side effects come through a dependency injection interface.

Example: Pure Enumeration

/**
 * Pure BFS enumeration algorithm.
 * Completely decoupled from database and filesystem implementations.
 *
 * All operations come from injected `deps` interface.
 */
export interface EnumerationDeps {
  /** Get file/directory attributes (stat) */
  stat: (path: string) => Promise<FileStat | null>;

  /** List directory contents */
  listDir: (path: string) => Promise<DirEntry[]>;

  /** Query database for mtime-skip optimization (null if never scanned) */
  getDirScanStatus: (path: string) =>
    Promise<{ id: number; mtime_ms: number | null } | null>;

  /** Get known subdirectories (for mtime skip) */
  getKnownSubdirs: (path: string) => Promise<string[]>;

  /** Pattern matching for directory auto-labels */
  matchDirPattern: (name: string) => { label: string } | null;

  /** Maximum directories to visit */
  maxDirs?: number;

  /** Whether to follow symlinks */
  followSymlinks?: boolean;
}

/**
 * Pure enumeration: BFS walk that emits events.
 *
 * Same input + same deps responses → same events (except elapsedMs timing).
 * No database writes, no direct filesystem calls.
 */
export async function* enumerateTree(
  rootPath: string,
  volumeId: number,
  deps: EnumerationDeps,
  emit: (event: EnumerateEvent) => void,
): AsyncIterable<void> {
  let dirsVisited = 0;
  let filesFound = 0;
  let errors = 0;
  const t0 = Date.now();

  // Seed root directory
  const rootStat = await deps.stat(rootPath);
  if (!rootStat) {
    emit({
      type: 'error',
      path: rootPath,
      message: 'Cannot stat root',
    });
    return;
  }

  emit({
    type: 'dir',
    path: rootPath,
    volumeId,
    ino: rootStat.ino,
    mtimeMs: rootStat.mtimeMs,
  });

  // BFS: queue of directories to visit
  const queue: string[] = [rootPath];

  while (queue.length > 0 && dirsVisited < (deps.maxDirs ?? Infinity)) {
    const dirPath = queue.shift()!;

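    // --- Optimization: mtime-based skip ---
    // If the directory's mtime matches the cached one, no entries were added,
    // removed, or renamed since the last scan, so reuse the cached listing
    const existing = await deps.getDirScanStatus(dirPath);
    const dirStat = await deps.stat(dirPath);

    if (!dirStat) {
      // The directory vanished or became unreadable after it was queued
      errors++;
      emit({ type: 'error', path: dirPath, message: 'Cannot stat directory' });
      continue;
    }

    if (
      existing &&
      existing.mtime_ms !== null &&
      Math.abs(existing.mtime_ms - dirStat.mtimeMs) <= 1
    ) {
      // Children are already indexed. Re-queue subdirectories.
      const subdirs = await deps.getKnownSubdirs(dirPath);
      for (const subdir of subdirs) {
        queue.push(subdir);
      }
      dirsVisited++; // Still count as visited for progress tracking
      continue;
    }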

    // --- List directory ---
    const entries = await deps.listDir(dirPath);
    dirsVisited++;

    // Check for pattern-based auto-labels
    const dirName = dirPath.split('/').pop() || dirPath;
    const patternMatch = deps.matchDirPattern(dirName);
    if (patternMatch) {
      emit({
        type: 'dir_label',
        path: dirPath,
        label: patternMatch.label,
      });
    }

    // Process each entry
    for (const entry of entries) {
      const entryPath = `${dirPath}/${entry.name}`;
      const entryStat = await deps.stat(entryPath);

      if (!entryStat) {
        errors++;
        continue;
      }

      if (entry.isDirectory) {
        // Queue subdirectory for BFS
        emit({
          type: 'dir',
          path: entryPath,
          volumeId,
          ino: entryStat.ino,
          mtimeMs: entryStat.mtimeMs,
        });
        queue.push(entryPath);
      } else if (entry.isFile) {
        // Emit file discovery
        emit({
          type: 'file',
          path: entryPath,
          volumeId,
          ino: entryStat.ino,
          size: entryStat.size,
          mtimeMs: entryStat.mtimeMs,
        });
        filesFound++;
      }
    }

    emit({
      type: 'dir_done',
      path: dirPath,
      mtimeMs: dirStat.mtimeMs,
    });
  }

  // Emit final stats
  emit({
    type: 'done',
    dirsVisited,
    filesFound,
    errors,
    elapsedMs: Date.now() - t0,
  });
}
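The listing above uses FileStat, DirEntry, and EnumerateEvent without defining them. The shapes below are inferred from how enumerateTree and the adapters use those names, so treat them as an assumption rather than Replicator's actual definitions. Modeling EnumerateEvent as a discriminated union on its type field lets consumers narrow each event safely; totalFileBytes is a hypothetical consumer added only to show that narrowing:

```typescript
// Inferred from usage in enumerateTree and the adapters (assumption);
// the real Replicator definitions may carry more fields.
interface FileStat {
  ino: number;
  size: number;
  mtimeMs: number;
  nlink: number;
  isDirectory: boolean;
  isFile: boolean;
}

interface DirEntry {
  name: string;
  isDirectory: boolean;
  isFile: boolean;
  isSymbolicLink: boolean;
  isValidUtf8: boolean;
}

// Discriminated union: the `type` field tells TypeScript which shape applies.
type EnumerateEvent =
  | { type: 'dir'; path: string; volumeId: number; ino: number; mtimeMs: number }
  | { type: 'file'; path: string; volumeId: number; ino: number; size: number; mtimeMs: number }
  | { type: 'dir_label'; path: string; label: string }
  | { type: 'dir_done'; path: string; mtimeMs: number }
  | { type: 'error'; path: string; message: string }
  | { type: 'done'; dirsVisited: number; filesFound: number; errors: number; elapsedMs: number };

// Hypothetical consumer: inside the `if`, `event` is narrowed to the
// 'file' variant, so `event.size` type-checks.
function totalFileBytes(events: Iterable<EnumerateEvent>): number {
  let total = 0;
  for (const event of events) {
    if (event.type === 'file') total += event.size;
  }
  return total;
}
```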

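Key properties of pure functions:

  1. Deterministic: the same input and the same responses from deps produce the same events (apart from timing fields such as elapsedMs)
  2. No direct side effects: every filesystem and database access goes through deps
  3. Event output: results go to emit, and the caller decides whether to store, print, or stream them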


Layer 2: Adapters (Dependency Injection)

Adapters implement the EnumerationDeps interface for different backends. The same enumerateTree function works with any adapter.

Local Filesystem Adapter

/**
 * Create a real filesystem adapter for local enumeration.
 */
export function createLocalFileSystem(): Omit<
  EnumerationDeps,
  'getDirScanStatus' | 'getKnownSubdirs' | 'matchDirPattern'
> {
  return {
    async stat(path: string) {
      const result = safeStat(path); // Replicator helper (not shown); null on error
      if (!result) return null;
      return {
        ino: result.ino,
        size: result.size,
        mtimeMs: result.mtimeMs,
        nlink: result.nlink,
        isDirectory: result.isDirectory,
        isFile: !result.isDirectory,
      };
    },

    async listDir(path: string) {
      const entries = safeReaddir(path); // safe: handles errors gracefully
      return entries.map((e) => ({
        name: e.name,
        isDirectory: e.isDirectory,
        isFile: !e.isDirectory,
        isSymbolicLink: e.isSymbolicLink,
        isValidUtf8: e.isValidUtf8,
      }));
    },
  };
}
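safeStat and safeReaddir are Replicator helpers that the article does not show. If you want an adapter that runs with nothing but Node's standard library, a sketch using node:fs/promises could look like the following. createNodeFileSystem is a hypothetical name; it uses lstat so symlinks are reported as links rather than followed, and it hard-codes isValidUtf8 to true (an assumption), since detecting invalid names would require reading them as Buffers:

```typescript
import { lstat, readdir } from 'node:fs/promises';

// Hypothetical stdlib-only counterpart to createLocalFileSystem().
// Errors become null / [] so the pure walker can count them instead of crashing.
function createNodeFileSystem() {
  return {
    async stat(path: string) {
      try {
        const s = await lstat(path); // lstat: report symlinks, don't follow them
        return {
          ino: s.ino,
          size: s.size,
          mtimeMs: s.mtimeMs,
          nlink: s.nlink,
          isDirectory: s.isDirectory(),
          isFile: s.isFile(),
        };
      } catch {
        return null; // missing path, permission denied, etc.
      }
    },

    async listDir(path: string) {
      try {
        const entries = await readdir(path, { withFileTypes: true });
        return entries.map((e) => ({
          name: e.name,
          isDirectory: e.isDirectory(),
          isFile: e.isFile(),
          isSymbolicLink: e.isSymbolicLink(),
          isValidUtf8: true, // assumption: names were decoded as UTF-8 strings
        }));
      } catch {
        return []; // unreadable directory: treat as empty
      }
    },
  };
}
```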

Database Repository Adapter

/**
 * Create a database repository for mtime-skip queries.
 */
export function createScanRepository(db: Database) {
  return {
    async getDirScanStatus(path: string) {
      const row = db
        .prepare(
          `SELECT id, mtime_ms, scan_status FROM directories
           WHERE path = ?`
        )
        .get(path) as { id: number; mtime_ms: number | null; scan_status: string } | undefined;
      // The driver returns undefined for "no row"; the deps contract expects null
      return row ?? null;
    },

    async getKnownSubdirs(path: string) {
      const rows = db
        .prepare(`SELECT path FROM directories WHERE parent_path = ?`)
        .all(path) as { path: string }[];
      return rows.map((r) => r.path);
    },
  };
}
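Because the repository is just two async functions, an in-memory stand-in is easy to write for tests that exercise the mtime-skip path without SQLite. createMemoryScanRepository is a hypothetical helper, not part of Replicator as described; it keeps one record per directory keyed by path and derives subdirectories from a parent_path field, mirroring the column the SQL version assumes:

```typescript
interface DirRecord {
  id: number;
  mtime_ms: number | null;
  parent_path: string | null;
}

// Hypothetical in-memory counterpart to createScanRepository(db).
function createMemoryScanRepository(records: Map<string, DirRecord>) {
  return {
    async getDirScanStatus(path: string) {
      const rec = records.get(path);
      // Return null (not undefined) to match the EnumerationDeps contract
      return rec ? { id: rec.id, mtime_ms: rec.mtime_ms } : null;
    },

    async getKnownSubdirs(path: string) {
      const subdirs: string[] = [];
      for (const [childPath, rec] of records) {
        if (rec.parent_path === path) subdirs.push(childPath);
      }
      return subdirs;
    },
  };
}
```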

Mock Filesystem for Testing

/**
 * Create an in-memory mock filesystem for testing.
 * No real I/O, no database needed.
 */
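export function createMockFileSystem(
  structure: Record<string, string[]> // dir path -> entry names; trailing '/' marks a subdirectory
) {
  // Stable inode per path, so repeated stats and repeated runs agree
  const inodes = new Map<string, number>();
  const inodeFor = (path: string) => {
    if (!inodes.has(path)) inodes.set(path, 1000 + inodes.size);
    return inodes.get(path)!;
  };
  const FIXED_MTIME = 1_700_000_000_000; // constant, unlike Date.now()

  return {
    async stat(path: string) {
      // A key in `structure` is a directory
      if (structure[path]) {
        return {
          ino: inodeFor(path),
          size: 4096,
          mtimeMs: FIXED_MTIME,
          nlink: 1,
          isDirectory: true,
          isFile: false,
        };
      }

      // Otherwise it is a file only if its own parent directory lists it
      const slash = path.lastIndexOf('/');
      const parent = path.slice(0, slash);
      const fileName = path.slice(slash + 1);
      if (structure[parent]?.includes(fileName)) {
        return {
          ino: inodeFor(path),
          size: 1024, // fixed size keeps assertions deterministic
          mtimeMs: FIXED_MTIME,
          nlink: 1,
          isDirectory: false,
          isFile: true,
        };
      }

      return null; // Path doesn't exist
    },

    async listDir(path: string) {
      if (!structure[path]) return [];

      return structure[path].map((name) => ({
        // Drop the trailing '/' marker so joined paths match `structure` keys
        name: name.replace(/\/$/, ''),
        isDirectory: name.endsWith('/'),
        isFile: !name.endsWith('/'),
        isSymbolicLink: false,
        isValidUtf8: true,
      }));
    },
  };
}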

Benefit of adapters: Change the implementation without changing the algorithm. Swap SQLite for D1, local filesystem for cloud storage, real I/O for mocks—the pure function doesn’t care.
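To make the "swap SQLite for D1" point concrete, here is a sketch of the same repository against Cloudflare D1. D1 prepared statements are asynchronous and take parameters through bind(); first() resolves to the first row or null, and all() resolves to an object with a results array. The D1Like and D1StatementLike interfaces are a minimal structural subset of that API, declared locally so the sketch does not depend on @cloudflare/workers-types; createD1ScanRepository is a hypothetical name, and the table and column names are assumed to match the SQLite schema:

```typescript
// Minimal subset of Cloudflare D1's API used here (declared locally).
interface D1StatementLike {
  bind(...values: unknown[]): D1StatementLike;
  first<T>(): Promise<T | null>;
  all<T>(): Promise<{ results: T[] }>;
}
interface D1Like {
  prepare(query: string): D1StatementLike;
}

// Same contract as createScanRepository(db), different backend.
function createD1ScanRepository(db: D1Like) {
  return {
    async getDirScanStatus(path: string) {
      return db
        .prepare('SELECT id, mtime_ms FROM directories WHERE path = ?')
        .bind(path)
        .first<{ id: number; mtime_ms: number | null }>();
    },

    async getKnownSubdirs(path: string) {
      const { results } = await db
        .prepare('SELECT path FROM directories WHERE parent_path = ?')
        .bind(path)
        .all<{ path: string }>();
      return results.map((r) => r.path);
    },
  };
}
```

The enumeration algorithm never sees the difference: only the object passed as getDirScanStatus and getKnownSubdirs changes.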


Layer 3: Orchestrators

Orchestrators wire together pure functions, adapters, and I/O to create complete, deployable features.

Local Orchestrator

/**
 * Local enumeration orchestrator.
 * Wires pure enumerate to real database and filesystem.
 */
export function createLocalOrchestrator(db: Database) {
  const dbRepo = createScanRepository(db);
  const fs = createLocalFileSystem();

  return {
    async *enumerate(
      rootPath: string,
      options?: { maxDirs?: number; followSymlinks?: boolean }
    ): AsyncIterable<EnumerateEvent> {
      const deps: EnumerationDeps = {
        stat: fs.stat,
        listDir: fs.listDir,
        getDirScanStatus: dbRepo.getDirScanStatus.bind(dbRepo),
        getKnownSubdirs: dbRepo.getKnownSubdirs.bind(dbRepo),
        matchDirPattern: (name) => {
          // Find a label whose stored LIKE pattern matches this directory
          // name (assumes dir_patterns.pattern holds LIKE patterns)
          const pattern = db
            .prepare(`SELECT label FROM dir_patterns WHERE ? LIKE pattern`)
            .get(name) as { label: string } | undefined;
          return pattern ? { label: pattern.label } : null;
        },
        maxDirs: options?.maxDirs,
        followSymlinks: options?.followSymlinks,
      };

      const events: EnumerateEvent[] = [];
      const emitter = (event: EnumerateEvent) => {
        events.push(event);
      };

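      // enumerateTree reports only through emit and yields no values,
      // so drain it first, then replay the buffered events to the caller
      for await (const _ of enumerateTree(rootPath, 1, deps, emitter)) {
        // nothing to do per step
      }
      yield* events;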
    },
  };
}
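This orchestrator buffers every event and replays them only after the walk finishes, so a consumer sees nothing until then. If you want events to reach the caller while the walk is still running, one option is a small bridge that turns an emit callback into an async iterable. streamEvents is a hypothetical helper, not part of Replicator as described, and it applies no backpressure: a slow consumer lets the internal buffer grow.

```typescript
// Bridge a callback-style producer to an async generator (hypothetical helper).
// `run` receives an emit function and resolves when production is complete.
async function* streamEvents<T>(
  run: (emit: (event: T) => void) => Promise<void>,
): AsyncGenerator<T> {
  const buffer: T[] = [];
  const state = { done: false, failed: false, error: undefined as unknown };
  let wake: (() => void) | null = null;

  // Resume the consumer if it is currently waiting for data.
  const notify = () => {
    const w = wake;
    wake = null;
    w?.();
  };

  // Start the producer; record completion or failure, then wake the consumer.
  run((event) => {
    buffer.push(event);
    notify();
  }).then(
    () => { state.done = true; notify(); },
    (err: unknown) => { state.failed = true; state.error = err; state.done = true; notify(); },
  );

  while (true) {
    // Hand out everything produced so far, in order.
    while (buffer.length > 0) yield buffer.shift()!;
    if (state.failed) throw state.error;
    if (state.done) return;
    // Nothing buffered: sleep until the producer emits or finishes.
    await new Promise<void>((resolve) => { wake = resolve; });
  }
}
```

Inside an orchestrator, the run callback would drain enumerateTree(rootPath, 1, deps, emit) with a for await loop, and enumerate would yield* streamEvents(run) instead of replaying a finished array.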

Test Orchestrator (No Real I/O)

/**
 * Test orchestrator using in-memory mock filesystem.
 */
export function createTestOrchestrator(
  mockFs: Pick<EnumerationDeps, 'stat' | 'listDir'>
) {
  return {
    async *enumerate(
      rootPath: string,
      options?: { maxDirs?: number }
    ): AsyncIterable<EnumerateEvent> {
      const deps: EnumerationDeps = {
        stat: mockFs.stat,
        listDir: mockFs.listDir,
        getDirScanStatus: async () => null, // No caching in tests
        getKnownSubdirs: async () => [],
        matchDirPattern: () => null,
        maxDirs: options?.maxDirs,
        followSymlinks: false,
      };

      const events: EnumerateEvent[] = [];
      const emitter = (event: EnumerateEvent) => events.push(event);

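      // Drain the pure walk (it yields no values), then replay its events
      for await (const _ of enumerateTree(rootPath, 1, deps, emitter)) {
        // nothing to do per step
      }
      yield* events;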
    },
  };
}

Worker Orchestrator

/**
 * Orchestrator for running as a cloud worker.
 * Writes each event to stdout as NDJSON (one JSON object per line).
 */
export async function createWorkerOrchestrator(dbPath: string) {
  const db = new Database(dbPath, { readonly: true });
  const dbRepo = createScanRepository(db);
  const fs = createLocalFileSystem();

  return {
    async *enumerate(rootPath: string) {
      const deps: EnumerationDeps = {
        stat: fs.stat,
        listDir: fs.listDir,
        getDirScanStatus: dbRepo.getDirScanStatus.bind(dbRepo),
        getKnownSubdirs: dbRepo.getKnownSubdirs.bind(dbRepo),
        matchDirPattern: () => null,
      };

      // Emit each event as NDJSON on stdout
      yield* enumerateTree(rootPath, 1, deps, (event) => {
        process.stdout.write(JSON.stringify(event) + '\n');
      });
    },
  };
}
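Whatever reads the worker's stdout has to turn the byte stream back into events, and stream chunks do not respect line boundaries, so the reader must keep any partial line buffered. parseNdjson below is a hypothetical sketch that works on any async iterable of strings; with Node, a child process's stdout after setEncoding('utf8') is one such source.

```typescript
// Parse newline-delimited JSON from a stream of string chunks.
// A line may be split across chunks, so keep the unfinished tail buffered.
async function* parseNdjson<T = unknown>(
  chunks: AsyncIterable<string>,
): AsyncGenerator<T> {
  let pending = '';
  for await (const chunk of chunks) {
    pending += chunk;
    let newline: number;
    while ((newline = pending.indexOf('\n')) !== -1) {
      const line = pending.slice(0, newline).trim();
      pending = pending.slice(newline + 1);
      if (line.length > 0) yield JSON.parse(line) as T; // skip blank lines
    }
  }
  // Input may end without a trailing newline.
  if (pending.trim().length > 0) yield JSON.parse(pending) as T;
}
```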

Real Examples from Replicator

Replicator uses this three-layer pattern for multiple operations:

Enumerate

Pure:        enumerateTree() in layers/pure/scan/enumerate.ts
Adapters:    createLocalFileSystem(), createScanRepository()
Orchestrator: createScanOrchestrator(db)

Reconcile (Deduplication)

Pure:        reconcile() in layers/pure/reconcile/
Adapters:    SQLite duplicate queries, confidence scoring
Orchestrator: createLocalReconciler(db)

Classify (AI categorization)

Pure:        classifyFiles() in layers/pure/classifier/
Adapters:    createOllamaProvider(), createClaudeProvider()
Orchestrator: createClassifyOrchestrator(db, aiProvider)

Each follows the same pattern: pure algorithm + injected dependencies + runtime-specific orchestrator.
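The classify row follows the same recipe with an AI model as the injected dependency. The article does not show classifyFiles or the provider interface, so the sketch below invents minimal ones purely for illustration: ClassifierProvider with a single categorize method, a pure loop named classifyPaths, and an extension-based test double. A real provider would wrap an Ollama or Claude call behind the same signature; the pure loop never knows which model answered.

```typescript
// Hypothetical provider contract; a model-backed provider would return this shape.
interface ClassifierProvider {
  categorize(path: string): Promise<string>;
}

type ClassifyEvent =
  | { type: 'classified'; path: string; category: string }
  | { type: 'error'; path: string; message: string };

// Pure loop: all model access goes through deps.provider.
async function classifyPaths(
  paths: string[],
  deps: { provider: ClassifierProvider },
  emit: (event: ClassifyEvent) => void,
): Promise<void> {
  for (const path of paths) {
    try {
      const category = await deps.provider.categorize(path);
      emit({ type: 'classified', path, category });
    } catch (err) {
      emit({ type: 'error', path, message: String(err) }); // one failure doesn't stop the batch
    }
  }
}

// Deterministic test double: classify by file extension.
function createExtensionProvider(): ClassifierProvider {
  const byExt: Record<string, string> = { jpg: 'image', png: 'image', txt: 'text' };
  return {
    async categorize(path) {
      const ext = path.split('.').pop()?.toLowerCase() ?? '';
      return byExt[ext] ?? 'other';
    },
  };
}
```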


Why This Matters

1. Testability

Without this pattern, every test needs a real directory tree on disk and a real database to write into.

With layers, unit tests run in milliseconds:

// Test with no I/O
it('enumerates a mock directory tree', async () => {
  const mockFs = createMockFileSystem({
    '/test': ['a.txt', 'subdir/'],
    '/test/subdir': ['b.txt', 'c.txt'],
  });

  const orch = createTestOrchestrator(mockFs);
  const events: EnumerateEvent[] = [];

  for await (const event of orch.enumerate('/test')) {
    events.push(event);
  }

  expect(events).toContainEqual(
    expect.objectContaining({ type: 'file', path: '/test/a.txt' })
  );
  expect(events).toContainEqual(
    expect.objectContaining({ type: 'file', path: '/test/subdir/b.txt' })
  );
  expect(events).toContainEqual(
    expect.objectContaining({ type: 'done' })
  );
});

No fixtures. No disk access. Just pure logic with mocked dependencies.

2. Multi-Runtime Support

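The same enumerateTree() function runs under the local orchestrator (real filesystem plus SQLite), the test orchestrator (in-memory mocks), and the worker orchestrator (events written as NDJSON to stdout).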

Same algorithm. Different orchestrators.

3. Parallel Development

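Teams can work independently along layer boundaries: one person refines the pure algorithm, another adds or tunes an adapter (a new database backend, say), and a third wires up an orchestrator for a new runtime.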

As long as the EnumerationDeps interface stays stable, changes in one layer don’t block others.
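One common way to keep that interface stable while several people write adapters is a shared contract check that every adapter must pass, so a new backend is accepted by running the same assertions the existing ones already pass. This is a general testing technique rather than something the article attributes to Replicator; FsPort, checkFsContract, and its four rules are illustrative:

```typescript
// Minimal port shape for the check (a subset of the EnumerationDeps adapters).
interface FsPort {
  stat(path: string): Promise<{ isDirectory: boolean; isFile: boolean } | null>;
  listDir(path: string): Promise<{ name: string; isDirectory: boolean }[]>;
}

// Illustrative contract: `root` must be an existing directory, `missing` must not exist.
// Returns the violated rules; an empty array means the adapter passes.
async function checkFsContract(adapter: FsPort, root: string, missing: string): Promise<string[]> {
  const failures: string[] = [];

  const rootStat = await adapter.stat(root).catch(() => null);
  if (!rootStat || !rootStat.isDirectory) failures.push('stat(root) should report a directory');

  // Missing paths must resolve to null; a rejection shows up here as undefined.
  const missingStat = await adapter.stat(missing).catch(() => undefined);
  if (missingStat !== null) failures.push('stat(missing) should resolve to null');

  const missingList = await adapter.listDir(missing).catch(() => null);
  if (missingList === null || missingList.length !== 0) failures.push('listDir(missing) should resolve to []');

  const rootList = await adapter.listDir(root).catch(() => null);
  if (rootList === null) failures.push('listDir(root) should not reject');
  for (const entry of rootList ?? []) {
    // The walker joins names with '/', so entries must be bare names.
    if (entry.name.includes('/')) failures.push(`entry "${entry.name}" must not contain "/"`);
  }
  return failures;
}
```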

4. Testability of Complex Workflows

Full pipeline tests become practical:

// Test entire enumerate → reconcile → classify flow
it('processes files end-to-end', async () => {
  const mockFs = createMockFileSystem({
    '/data': ['file1.jpg', 'file2.jpg', 'file3.txt'],
  });
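  // createMockAIProvider, createTestReconciler, createTestClassifier,
  // eventsToDocs, and plansToFiles are test helpers not shown in this post
  const mockAI = createMockAIProvider();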

  const scanOrch = createTestOrchestrator(mockFs);
  const reconcileOrch = createTestReconciler();
  const classifyOrch = createTestClassifier(mockAI);

  // Run enumeration
  const enumEvents = [];
  for await (const e of scanOrch.enumerate('/data')) {
    enumEvents.push(e);
  }

  // Run reconciliation
  const scans = eventsToDocs(enumEvents);
  const plans = [];
  for await (const p of reconcileOrch.reconcile(scans)) {
    plans.push(p);
  }

  // Run classification
  const files = plansToFiles(plans);
  const classified = [];
  for await (const c of classifyOrch.classify(files)) {
    classified.push(c);
  }

  // All in-memory, no disk access, runs in milliseconds
  expect(classified).toHaveLength(3);
  expect(classified).toContainEqual(
    expect.objectContaining({ path: '/data/file1.jpg', category: 'image' })
  );
});

Comparison with Alternatives

vs. DI Frameworks (Spring, NestJS)

DI frameworks like Spring or NestJS wire dependencies through a container configured with annotations or decorators. Passing plain functions is simpler:

// Container style (Spring-like annotations, lots of boilerplate)
@Component
class FileSystemAdapter implements FileSystem {
  stat(path) { /* ... */ }
}

@Component
class ScanRepository implements Repository {
  constructor(private db: Database) {}
  getDirScanStatus(path) { /* ... */ }
}

@Service
class EnumerationService {
  constructor(
    private fs: FileSystem,
    private repo: Repository
  ) {}
  enumerate(path) { /* ... */ }
}

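// vs. Plain functions (what we're doing)
const deps: EnumerationDeps = {
  stat: fs.stat,
  listDir: fs.listDir,
  getDirScanStatus: repo.getDirScanStatus,
  getKnownSubdirs: repo.getKnownSubdirs,
  matchDirPattern: () => null, // required by EnumerationDeps
};
for await (const _ of enumerateTree(path, 1, deps, emit)) {
  // results arrive through emit
}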

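Plain function injection needs no container, decorators, or registration step: the wiring is one object literal you can read in place, and a test swaps a dependency by passing a different object.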
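Because deps is an ordinary object, a test can take a working set of dependencies and replace a single function with object spread, which is handy for simulating one failure without writing a new adapter. The Deps shape and countEntries function here are hypothetical and trimmed down to keep the example short:

```typescript
interface Deps {
  stat: (path: string) => Promise<{ isDirectory: boolean } | null>;
  listDir: (path: string) => Promise<string[]>;
}

// Tiny pure function: count entries under root, tallying ones that fail to stat.
async function countEntries(root: string, deps: Deps): Promise<{ ok: number; failed: number }> {
  let ok = 0;
  let failed = 0;
  for (const name of await deps.listDir(root)) {
    if (await deps.stat(`${root}/${name}`)) ok++;
    else failed++;
  }
  return { ok, failed };
}

const baseDeps: Deps = {
  stat: async () => ({ isDirectory: false }),
  listDir: async () => ['a', 'b', 'c'],
};

// Override only `stat`: pretend 'b' vanished between listDir and stat.
const flakyDeps: Deps = {
  ...baseDeps,
  stat: async (path) => (path.endsWith('/b') ? null : baseDeps.stat(path)),
};
```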

vs. Monolithic “Everything in One Process”

Monolithic approaches tangle algorithm with I/O:

// Monolithic: algorithm can't be tested alone
async function enumerate(path: string, db: Database) {
  const entries = fs.readdirSync(path);
  for (const e of entries) {
    db.prepare('INSERT INTO files VALUES (?)').run(e);
  }
}

// Layered: algorithm is testable
async function* enumerateTree(path: string, deps: Deps, emit) {
  const entries = await deps.listDir(path);
  for (const e of entries) {
    emit({ type: 'file', name: e });
  }
}

vs. Microservices

Microservices separate concerns at the network level. Layers separate concerns at the code level:

Microservices:
  HTTP Scanner → RabbitMQ → HTTP Dedup → RabbitMQ → HTTP Classifier
  (high latency, hard to debug, operational overhead)

Layers:
  scanOrch.enumerate() → reconcileOrch.reconcile() → classifyOrch.classify()
  (in-process, testable, no network overhead)

Use layers within a service. Use microservices between services.
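In-process composition can be as simple as chaining async generators, each stage consuming the previous stage's output. The stage names below echo the enumerate, reconcile, and classify steps, but their bodies are toy stand-ins invented for illustration: dedupe by a precomputed hash, then tag by extension.

```typescript
type Found = { path: string; hash: string };
type Unique = Found & { duplicates: string[] };
type Tagged = Unique & { category: string };

// Stage 1 (stand-in for enumerate): emit discovered files.
async function* enumerateStage(files: Found[]): AsyncGenerator<Found> {
  for (const f of files) yield f;
}

// Stage 2 (stand-in for reconcile): keep the first file per hash and record
// later paths with the same hash as duplicates. It needs all input first.
async function* reconcileStage(input: AsyncIterable<Found>): AsyncGenerator<Unique> {
  const byHash = new Map<string, Unique>();
  for await (const f of input) {
    const seen = byHash.get(f.hash);
    if (seen) seen.duplicates.push(f.path);
    else byHash.set(f.hash, { ...f, duplicates: [] });
  }
  yield* byHash.values(); // Map preserves insertion order
}

// Stage 3 (stand-in for classify): streaming, one output per input.
async function* classifyStage(input: AsyncIterable<Unique>): AsyncGenerator<Tagged> {
  for await (const u of input) {
    const category = /\.(jpe?g|png)$/i.test(u.path) ? 'image' : 'other';
    yield { ...u, category };
  }
}
```

Each stage can be tested alone with a hand-built input iterable, and replacing one stage does not touch its neighbors.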


Practical Implementation Guide

Step 1: Identify Your Pure Algorithm

Start with the core business logic. For enumeration, it’s the BFS walk:

export async function* pureAlgorithm(
  input: string,
  deps: Deps,
  emit: (event) => void
): AsyncIterable<void> {
  // Core logic here
  // No I/O calls directly
  // Everything through deps
  // Emit events instead of returning
}
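As a concrete instance of that template, here is the deduplication step from the monolithic example rewritten as a pure algorithm. Hashing is injected through deps, so a test can supply fake hashes while production could supply SHA-256 over real file bytes. The names findDuplicates, DedupDeps, and DedupEvent are hypothetical, and the per-file yield is an added progress tick that lets the caller stop iterating to cancel:

```typescript
interface DedupDeps {
  hashFile: (path: string) => Promise<string>;
}

type DedupEvent =
  | { type: 'duplicate_group'; hash: string; paths: string[] }
  | { type: 'done'; files: number; groups: number };

async function* findDuplicates(
  paths: string[],
  deps: DedupDeps,
  emit: (event: DedupEvent) => void,
): AsyncIterable<void> {
  const byHash = new Map<string, string[]>();
  for (const path of paths) {
    const hash = await deps.hashFile(path); // the only I/O, and it goes through deps
    const group = byHash.get(hash);
    if (group) group.push(path);
    else byHash.set(hash, [path]);
    yield; // progress tick: breaking out of the caller's loop stops the scan
  }

  let groups = 0;
  for (const [hash, group] of byHash) {
    if (group.length > 1) {
      groups++;
      emit({ type: 'duplicate_group', hash, paths: group });
    }
  }
  emit({ type: 'done', files: paths.length, groups });
}
```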

Step 2: Define the Deps Interface

What does your algorithm need from the outside world?

export interface Deps {
  // File I/O
  stat: (path: string) => Promise<FileStat | null>;
  listDir: (path: string) => Promise<DirEntry[]>;

  // Database queries
  getScanStatus: (path: string) => Promise<ScanStatus | null>;

  // Options
  maxDepth?: number;
}

Step 3: Create Adapters

Implement Deps for each backend:

// Local filesystem
export function createLocalFs(): Pick<Deps, 'stat' | 'listDir'> {
  return {
    async stat(path) { /* ... */ },
    async listDir(path) { /* ... */ },
  };
}

// Mock for testing
export function createMockFs(structure): Pick<Deps, 'stat' | 'listDir'> {
  return {
    async stat(path) { /* check structure */ },
    async listDir(path) { /* return structure[path] */ },
  };
}

// Cloud filesystem (future)
export function createS3Fs(): Pick<Deps, 'stat' | 'listDir'> {
  return {
    async stat(path) { /* s3.headObject() */ },
    async listDir(path) { /* s3.listObjectsV2() */ },
  };
}

Step 4: Create Orchestrators

Wire adapters to the pure function:

export function createLocalOrchestrator(db: Database) {
  const fs = createLocalFs();
  const repo = createRepository(db);

  return {
    async *enumerate(path: string) {
      yield* pureAlgorithm(path, {
        stat: fs.stat,
        listDir: fs.listDir,
        getScanStatus: repo.getScanStatus,
      }, (event) => { /* process event */ });
    },
  };
}

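export function createTestOrchestrator(mockFs) {
  return {
    async *enumerate(path: string) {
      // Buffer events so the test can iterate over them; pureAlgorithm
      // yields nothing itself and reports only through emit
      const events = [];
      for await (const _ of pureAlgorithm(path, {
        stat: mockFs.stat,
        listDir: mockFs.listDir,
        getScanStatus: async () => null,
      }, (event) => events.push(event))) {
        // drain
      }
      yield* events;
    },
  };
}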

Step 5: Test Everything

it('enumerates directories', async () => {
  const mockFs = createMockFs({
    '/root': ['a.txt', 'b/'],
    '/root/b': ['c.txt'],
  });

  const orch = createTestOrchestrator(mockFs);
  const events = [];

  for await (const e of orch.enumerate('/root')) {
    events.push(e);
  }

  expect(events.map(e => e.name)).toEqual(['a.txt', 'b', 'c.txt']);
});

Key Takeaways

  1. Separate algorithm from I/O: Pure functions have no dependencies except their inputs
  2. Use dependency injection: Pass all external operations as interface parameters
  3. Emit events, don’t return data: Makes algorithms composable and streamable
  4. Create adapters for each backend: SQLite, cloud, mock, filesystem—different impls, same interface
  5. Orchestrate in the top layer: Wire adapters to pure functions for different deployments
  6. Test with mocks: In-memory mocks eliminate all I/O overhead from tests

This pattern scales from simple utilities to complex distributed systems. The key is discipline: resist the urge to call I/O from pure functions, even when it seems convenient.



