The Problem: When you build a data runner that needs to work across multiple devices (MacBook, NAS, cloud), you face a choice: either duplicate business logic for each runtime, or abstract I/O away from the core algorithms.
The Solution: A three-layer architecture—pure functions, adapters, and orchestrators—that makes your data pipeline testable, deployable to any runtime, and independent enough that multiple teams can work on different parts without constant coordination.
This article walks through the architecture pattern used in Replicator (a multi-device data indexer), shows why it beats monolithic approaches, and provides copy-paste ready code.
The Monolithic Problem
Imagine building a file indexing system. You need to:
- Walk directories and catalog files
- Deduplicate by content hash
- Classify files into categories
- Apply consolidation strategies
- Query and return results
In a traditional monolithic design, your code looks like this:
// The monolithic approach (DON'T do this)
async function indexDirectory(path: string, dbPath: string) {
// Open the database
const db = new Database(dbPath);
// Walk the filesystem (tightly coupled to real I/O)
const entries = fs.readdirSync(path);
for (const entry of entries) {
const stat = fs.statSync(path + '/' + entry);
// Call database directly (business logic tangled with I/O)
const hash = computeHash(fs.readFileSync(path + '/' + entry));
db.prepare('INSERT INTO files VALUES (?, ?, ?)').run(
entry,
hash,
stat.size
);
}
// Run dedup queries (tightly coupled to one database backend)
const dupes = db.prepare(
'SELECT hash, COUNT(*) AS copies FROM files GROUP BY hash HAVING COUNT(*) > 1'
).all();
// Process results
for (const group of dupes) {
// Apply consolidation logic (mixed with query logic)
}
db.close();
}
Problems with this approach:
- Untestable: You can’t test without a real filesystem and database
- One deployment path: Works locally with SQLite, but how do you:
  - Run on cloud infrastructure (use D1 instead of SQLite)?
  - Deploy as a worker (emit events instead of writing DB)?
  - Share logic between platforms (Node, Rust, Python)?
- Tightly coupled: Business logic is intertwined with I/O specifics
- Hard to parallelize: No clear boundaries between what can run in parallel vs. what needs coordination
- Team friction: One person can’t improve hashing without coordinating with someone working on dedup
The core issue: your algorithm is imprisoned in a specific technology stack.
Three-Layer Architecture
The solution is to separate three concerns:
┌─────────────────────────────────────────────────────────────────┐
│ LAYER 3: ORCHESTRATORS │
│ (Integrate all layers, provide runtime-specific entry points) │
│ │
│ - createLocalOrchestrator(db) │
│ - createHttpOrchestrator() │
│ - createTestOrchestrator(mockFs, mockDb) │
└─────────────────────────────────────────────────────────────────┘
↑
┌─────────────────────────────────────────────────────────────────┐
│ LAYER 2: ADAPTERS (Dependency Injection) │
│ (Implement interfaces for database, filesystem, network, etc.) │
│ │
│ - LocalFileSystem → filesystem │
│ - SQLiteRepository → database queries │
│ - MockFileSystem → testing │
│ - CloudFileSystem (D1) → alternative backend │
└─────────────────────────────────────────────────────────────────┘
↑
┌─────────────────────────────────────────────────────────────────┐
│ LAYER 1: PURE FUNCTIONS │
│ (Algorithm that depends only on injected interfaces) │
│ │
│ - enumerateTree(rootPath, deps, emit) │
│ - reconcile(scans, deps) │
│ - classify(files, deps) │
│ │
│ Properties: │
│ - Same input → always same output │
│ - No side effects (I/O happens through deps) │
│ - Emits events for caller to handle │
└─────────────────────────────────────────────────────────────────┘
Key principle: Dependency injection flows down, data flows up.
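To make the direction of flow concrete, here is a minimal sketch of all three layers in one file. The names `countFiles`, `ListDeps`, `createMemoryFs`, and `runCount` are hypothetical and exist only for this illustration; they are not part of Replicator, and the in-memory adapter stands in for a real filesystem.

```typescript
// Layer 1: an algorithm that only talks to the injected deps (hypothetical example).
interface ListDeps {
  listDir: (path: string) => Promise<{ name: string; isDirectory: boolean }[]>;
}

type CountEvent =
  | { type: 'file'; path: string }
  | { type: 'done'; filesFound: number };

async function countFiles(
  root: string,
  deps: ListDeps,
  emit: (event: CountEvent) => void,
): Promise<void> {
  const queue = [root];
  let filesFound = 0;
  while (queue.length > 0) {
    const dir = queue.shift()!;
    for (const entry of await deps.listDir(dir)) {
      const path = `${dir}/${entry.name}`;
      if (entry.isDirectory) {
        queue.push(path); // visit later (breadth-first)
      } else {
        filesFound++;
        emit({ type: 'file', path });
      }
    }
  }
  emit({ type: 'done', filesFound });
}

// Layer 2: an in-memory adapter implementing the same interface.
// A name counts as a directory when the tree has an entry for its full path.
function createMemoryFs(tree: Record<string, string[]>): ListDeps {
  return {
    async listDir(path) {
      return (tree[path] ?? []).map((name) => ({
        name,
        isDirectory: tree[`${path}/${name}`] !== undefined,
      }));
    },
  };
}

// Layer 3: an orchestrator that passes the adapter down
// and collects the events that come back up.
async function runCount(
  tree: Record<string, string[]>,
  root: string,
): Promise<CountEvent[]> {
  const events: CountEvent[] = [];
  await countFiles(root, createMemoryFs(tree), (e) => events.push(e));
  return events;
}
```

Swapping `createMemoryFs` for an adapter backed by a real filesystem would leave `countFiles` untouched, which is the property the rest of this article relies on.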
Layer 1: Pure Functions
The bottom layer contains pure algorithms with no I/O. All side effects come through a dependency injection interface.
Example: Pure Enumeration
/**
* Pure BFS enumeration algorithm.
* Completely decoupled from database and filesystem implementations.
*
* All operations come from injected `deps` interface.
*/
export interface EnumerationDeps {
/** Get file/directory attributes (stat) */
stat: (path: string) => Promise<FileStat | null>;
/** List directory contents */
listDir: (path: string) => Promise<DirEntry[]>;
/** Query database for mtime-skip optimization */
getDirScanStatus: (path: string) =>
Promise<{ id: number; mtime_ms: number | null } | null>;
/** Get known subdirectories (for mtime skip) */
getKnownSubdirs: (path: string) => Promise<string[]>;
/** Pattern matching for directory auto-labels */
matchDirPattern: (name: string) => { label: string } | null;
/** Maximum directories to visit */
maxDirs?: number;
/** Whether to follow symlinks */
followSymlinks?: boolean;
}
/**
* Pure enumeration: BFS walk that emits events.
*
* Same input → same output, always.
* No database writes, no direct filesystem calls.
*/
export async function* enumerateTree(
rootPath: string,
volumeId: number,
deps: EnumerationDeps,
emit: (event: EnumerateEvent) => void,
): AsyncIterable<void> {
let dirsVisited = 0;
let filesFound = 0;
let errors = 0;
const t0 = Date.now();
// Seed root directory
const rootStat = await deps.stat(rootPath);
if (!rootStat) {
emit({
type: 'error',
path: rootPath,
message: 'Cannot stat root',
});
return;
}
emit({
type: 'dir',
path: rootPath,
volumeId,
ino: rootStat.ino,
mtimeMs: rootStat.mtimeMs,
});
// BFS: queue of directories to visit
const queue: string[] = [rootPath];
while (queue.length > 0 && dirsVisited < (deps.maxDirs ?? Infinity)) {
const dirPath = queue.shift()!;
// --- Optimization: mtime-based skip ---
// If directory hasn't changed, reuse cached listing
const existing = await deps.getDirScanStatus(dirPath);
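const dirStat = await deps.stat(dirPath);
if (!dirStat) {
// Directory disappeared between being queued and being visited
errors++;
continue;
}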
if (
existing &&
existing.mtime_ms !== null &&
Math.abs(existing.mtime_ms - dirStat.mtimeMs) <= 1
) {
// Children are already indexed. Re-queue subdirectories.
const subdirs = await deps.getKnownSubdirs(dirPath);
for (const subdir of subdirs) {
queue.push(subdir);
}
dirsVisited++; // Still count as visited for progress tracking
continue;
}
// --- List directory ---
const entries = await deps.listDir(dirPath);
dirsVisited++;
// Check for pattern-based auto-labels
const dirName = dirPath.split('/').pop() || dirPath;
const patternMatch = deps.matchDirPattern(dirName);
if (patternMatch) {
emit({
type: 'dir_label',
path: dirPath,
label: patternMatch.label,
});
}
// Process each entry
for (const entry of entries) {
const entryPath = `${dirPath}/${entry.name}`;
const entryStat = await deps.stat(entryPath);
if (!entryStat) {
errors++;
continue;
}
if (entry.isDirectory) {
// Queue subdirectory for BFS
emit({
type: 'dir',
path: entryPath,
volumeId,
ino: entryStat.ino,
mtimeMs: entryStat.mtimeMs,
});
queue.push(entryPath);
} else if (entry.isFile) {
// Emit file discovery
emit({
type: 'file',
path: entryPath,
volumeId,
ino: entryStat.ino,
size: entryStat.size,
mtimeMs: entryStat.mtimeMs,
});
filesFound++;
}
}
emit({
type: 'dir_done',
path: dirPath,
mtimeMs: dirStat.mtimeMs,
});
}
// Emit final stats
emit({
type: 'done',
dirsVisited,
filesFound,
errors,
elapsedMs: Date.now() - t0,
});
}
Key properties of pure functions:
- Stateless: No hidden dependencies. All state comes through deps.
- Deterministic: Given the same responses from deps, it emits the same events (only the elapsedMs timing field varies).
- Composable: Events can be fed to multiple consumers.
- Testable: You can mock deps entirely without touching disk or database.
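The composability point can be sketched with a small fan-out helper. This is an illustration under assumptions: `ScanEvent` is a simplified stand-in for the real EnumerateEvent type, and `fanOut`, `collect`, and `sumSizes` are hypothetical names.

```typescript
// Simplified stand-in for EnumerateEvent (assumption: the real type has more variants).
type ScanEvent =
  | { type: 'file'; path: string; size: number }
  | { type: 'done'; filesFound: number };

type Emit = (event: ScanEvent) => void;

/** Combine several consumers into a single emit callback. */
function fanOut(...consumers: Emit[]): Emit {
  return (event) => {
    for (const consume of consumers) consume(event);
  };
}

// Consumer 1: keep every event, e.g. for a later batch insert.
const collected: ScanEvent[] = [];
const collect: Emit = (event) => {
  collected.push(event);
};

// Consumer 2: keep a running total of bytes seen.
let totalBytes = 0;
const sumSizes: Emit = (event) => {
  if (event.type === 'file') totalBytes += event.size;
};

// The algorithm only ever sees one emit function.
const emit = fanOut(collect, sumSizes);
emit({ type: 'file', path: '/data/a.txt', size: 10 });
emit({ type: 'file', path: '/data/b.txt', size: 32 });
emit({ type: 'done', filesFound: 2 });
```

Because the algorithm receives a single callback, adding a progress bar or a metrics counter becomes a change in the caller rather than in the algorithm.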
Layer 2: Adapters (Dependency Injection)
Adapters implement the EnumerationDeps interface for different backends. The same enumerateTree function works with any adapter.
Local Filesystem Adapter
/**
* Create a real filesystem adapter for local enumeration.
*/
export function createLocalFileSystem(): Omit<
EnumerationDeps,
'getDirScanStatus' | 'getKnownSubdirs' | 'matchDirPattern'
> {
return {
async stat(path: string) {
const result = safeStat(path);
if (!result) return null;
return {
ino: result.ino,
size: result.size,
mtimeMs: result.mtimeMs,
nlink: result.nlink,
isDirectory: result.isDirectory,
isFile: !result.isDirectory,
};
},
async listDir(path: string) {
const entries = safeReaddir(path); // safe: handles errors gracefully
return entries.map((e) => ({
name: e.name,
isDirectory: e.isDirectory,
isFile: !e.isDirectory,
isSymbolicLink: e.isSymbolicLink,
isValidUtf8: e.isValidUtf8,
}));
},
};
}
Database Repository Adapter
/**
* Create a database repository for mtime-skip queries.
*/
export function createScanRepository(db: Database) {
return {
async getDirScanStatus(path: string) {
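const row = db
.prepare(
`SELECT id, mtime_ms, scan_status FROM directories
WHERE path = ?`
)
.get(path) as { id: number; mtime_ms: number | null; scan_status: string } | undefined;
// EnumerationDeps expects null, not undefined, when the directory is unknown
return row ?? null;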
},
async getKnownSubdirs(path: string) {
const rows = db
.prepare(`SELECT path FROM directories WHERE parent_path = ?`)
.all(path) as { path: string }[];
return rows.map((r) => r.path);
},
};
}
Mock Filesystem for Testing
/**
* Create an in-memory mock filesystem for testing.
* No real I/O, no database needed.
*/
export function createMockFileSystem(
structure: Record<string, string[]> // map of paths to entry names
) {
let fileCounter = 1000;
return {
async stat(path: string) {
// Check if this path exists in our structure
if (structure[path]) {
return {
ino: fileCounter++,
size: 4096,
mtimeMs: Date.now(),
nlink: 1,
isDirectory: true,
isFile: false,
};
}
// Check if path is a file listed in its own parent directory
const slash = path.lastIndexOf('/');
const parentPath = path.slice(0, slash) || '/';
const fileName = path.slice(slash + 1);
if (structure[parentPath]?.includes(fileName)) {
return {
ino: fileCounter++,
size: 1024, // fixed size keeps test runs reproducible
mtimeMs: Date.now(),
nlink: 1,
isDirectory: false,
isFile: true,
};
}
return null; // Path doesn't exist
},
async listDir(path: string) {
if (!structure[path]) return [];
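return structure[path].map((name) => ({
// A trailing '/' only marks directories in the fixture; strip it from the reported name
name: name.replace(/\/$/, ''),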
isDirectory: name.endsWith('/'),
isFile: !name.endsWith('/'),
isSymbolicLink: false,
isValidUtf8: true,
}));
},
};
}
Benefit of adapters: Change the implementation without changing the algorithm. Swap SQLite for D1, local filesystem for cloud storage, real I/O for mocks—the pure function doesn’t care.
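As one more illustration of swapping implementations, the database side can be replaced by an in-memory repository that exposes the same two methods as createScanRepository. This is a sketch, not Replicator code: `createMemoryScanRepository` and the `DirRow` shape are assumptions made for the example.

```typescript
// Row shape covering only what the mtime-skip check reads (assumed for this sketch).
interface DirRow {
  id: number;
  path: string;
  parentPath: string | null;
  mtime_ms: number | null;
}

/**
 * In-memory stand-in for a SQLite-backed scan repository.
 * Exposes the same method names as createScanRepository.
 */
function createMemoryScanRepository(rows: DirRow[]) {
  const byPath = new Map<string, DirRow>();
  for (const row of rows) byPath.set(row.path, row);

  return {
    async getDirScanStatus(
      path: string,
    ): Promise<{ id: number; mtime_ms: number | null } | null> {
      const row = byPath.get(path);
      // Return null (not undefined) for unknown directories
      return row ? { id: row.id, mtime_ms: row.mtime_ms } : null;
    },
    async getKnownSubdirs(path: string): Promise<string[]> {
      return rows.filter((r) => r.parentPath === path).map((r) => r.path);
    },
  };
}
```

An adapter like this would let a test exercise the mtime-skip branch of enumerateTree by seeding rows with known mtime values, without opening a database.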
Layer 3: Orchestrators
Orchestrators wire together pure functions, adapters, and I/O to create complete, deployable features.
Local Orchestrator
/**
* Local enumeration orchestrator.
* Wires pure enumerate to real database and filesystem.
*/
export function createLocalOrchestrator(db: Database) {
const dbRepo = createScanRepository(db);
const fs = createLocalFileSystem();
return {
async *enumerate(
rootPath: string,
options?: { maxDirs?: number; followSymlinks?: boolean }
): AsyncIterable<EnumerateEvent> {
const deps: EnumerationDeps = {
stat: fs.stat,
listDir: fs.listDir,
getDirScanStatus: dbRepo.getDirScanStatus.bind(dbRepo),
getKnownSubdirs: dbRepo.getKnownSubdirs.bind(dbRepo),
matchDirPattern: (name) => {
// Load patterns from database
const pattern = db
.prepare(`SELECT label FROM dir_patterns WHERE pattern LIKE ?`)
.get(`%${name}%`) as { label: string } | undefined;
return pattern ? { label: pattern.label } : null;
},
maxDirs: options?.maxDirs,
followSymlinks: options?.followSymlinks,
};
const events: EnumerateEvent[] = [];
const emitter = (event: EnumerateEvent) => {
events.push(event);
};
yield* enumerateTree(rootPath, 1, deps, emitter);
yield* events;
},
};
}
Test Orchestrator (No Real I/O)
/**
* Test orchestrator using in-memory mock filesystem.
*/
export function createTestOrchestrator(
mockFs: Pick<EnumerationDeps, 'stat' | 'listDir'>
) {
return {
async *enumerate(
rootPath: string,
options?: { maxDirs?: number }
): AsyncIterable<EnumerateEvent> {
const deps: EnumerationDeps = {
stat: mockFs.stat,
listDir: mockFs.listDir,
getDirScanStatus: async () => null, // No caching in tests
getKnownSubdirs: async () => [],
matchDirPattern: () => null,
maxDirs: options?.maxDirs,
followSymlinks: false,
};
const events: EnumerateEvent[] = [];
const emitter = (event: EnumerateEvent) => events.push(event);
yield* enumerateTree(rootPath, 1, deps, emitter);
yield* events;
},
};
}
HTTP Worker Orchestrator
/**
* Orchestrator for running as a cloud worker.
* Takes the root path as an argument and writes each event to stdout as NDJSON.
*/
export async function createWorkerOrchestrator(dbPath: string) {
const db = new Database(dbPath, { readonly: true });
const dbRepo = createScanRepository(db);
const fs = createLocalFileSystem();
return {
async *enumerate(rootPath: string) {
const deps: EnumerationDeps = {
stat: fs.stat,
listDir: fs.listDir,
getDirScanStatus: dbRepo.getDirScanStatus.bind(dbRepo),
getKnownSubdirs: dbRepo.getKnownSubdirs.bind(dbRepo),
matchDirPattern: () => null,
};
// Emit each event as NDJSON on stdout
yield* enumerateTree(rootPath, 1, deps, (event) => {
process.stdout.write(JSON.stringify(event) + '\n');
});
},
};
}
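The local and test orchestrators above push every event into an array and only yield once enumerateTree has finished. Where streaming matters, one possible approach is a small channel that turns an emit callback into an async iterator. The sketch below is an assumption about how that could look; `createEventChannel` is a hypothetical helper, not something Replicator is known to contain.

```typescript
/**
 * Bridge between a push-style emit callback and a pull-style async iterator.
 * Supports a single consumer; events are buffered until that consumer reads them.
 */
function createEventChannel<T>() {
  const buffered: T[] = [];
  let wake: (() => void) | null = null; // resolves the consumer's pending wait
  let closed = false;

  const notify = () => {
    if (wake) {
      const resume = wake;
      wake = null;
      resume();
    }
  };

  return {
    /** Producer side: hand this to the algorithm as its emit callback. */
    emit(event: T): void {
      buffered.push(event);
      notify();
    },
    /** Producer side: call once the algorithm has finished. */
    close(): void {
      closed = true;
      notify();
    },
    /** Consumer side: yields events as they arrive, ends after close(). */
    async *events(): AsyncGenerator<T> {
      while (true) {
        while (buffered.length > 0) yield buffered.shift()!;
        if (closed) return;
        await new Promise<void>((resolve) => {
          wake = resolve;
        });
      }
    },
  };
}
```

An orchestrator could start the algorithm without awaiting it, call `close()` when its promise settles, and `yield*` the channel's `events()` in the meantime, so consumers see events while the walk is still running.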
Real Examples from Replicator
Replicator uses this three-layer pattern for multiple operations:
Enumerate
Pure: enumerateTree() in layers/pure/scan/enumerate.ts
Adapters: createLocalFileSystem(), createScanRepository()
Orchestrator: createScanOrchestrator(db)
Reconcile (Deduplication)
Pure: reconcile() in layers/pure/reconcile/
Adapters: SQLite duplicate queries, confidence scoring
Orchestrator: createLocalReconciler(db)
Classify (AI categorization)
Pure: classifyFiles() in layers/pure/classifier/
Adapters: createOllamaProvider(), createClaudeProvider()
Orchestrator: createClassifyOrchestrator(db, aiProvider)
Each follows the same pattern: pure algorithm + injected dependencies + runtime-specific orchestrator.
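To show the same shape outside enumeration, here is a deliberately simplified sketch of a pure deduplication step that groups files by content hash. It is not Replicator's reconcile(): the real function takes injected deps and applies confidence scoring, while `findDuplicates`, `ScannedFile`, and `DuplicateGroup` are hypothetical names, and "keep the alphabetically first path" is an arbitrary rule chosen for the example.

```typescript
interface ScannedFile {
  path: string;
  hash: string;
  size: number;
}

interface DuplicateGroup {
  hash: string;
  keep: string;         // path chosen to survive
  duplicates: string[]; // remaining paths with the same hash
}

/**
 * Group files by content hash and report every hash that occurs more than once.
 * No I/O: the caller supplies the scanned files and decides what to do with the groups.
 */
function findDuplicates(files: ScannedFile[]): DuplicateGroup[] {
  const byHash = new Map<string, string[]>();
  for (const file of files) {
    const paths = byHash.get(file.hash) ?? [];
    paths.push(file.path);
    byHash.set(file.hash, paths);
  }

  const groups: DuplicateGroup[] = [];
  byHash.forEach((paths, hash) => {
    if (paths.length < 2) return; // unique content, nothing to consolidate
    const [keep, ...duplicates] = paths.slice().sort();
    if (keep !== undefined) groups.push({ hash, keep, duplicates });
  });

  // Sort by hash so the output order does not depend on input order
  groups.sort((a, b) => (a.hash < b.hash ? -1 : a.hash > b.hash ? 1 : 0));
  return groups;
}
```

Because the function takes plain data and returns plain data, it can be checked with a three-element array rather than a populated database.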
Why This Matters
1. Testability
Without this pattern, testing requires:
- A real filesystem
- A real database
- Network calls to external services
- Hours of test setup
With layers, unit tests run in milliseconds:
// Test with no I/O
it('enumerates a mock directory tree', async () => {
const mockFs = createMockFileSystem({
'/test': ['a.txt', 'subdir/'],
'/test/subdir': ['b.txt', 'c.txt'],
});
const orch = createTestOrchestrator(mockFs);
const events: EnumerateEvent[] = [];
for await (const event of orch.enumerate('/test')) {
events.push(event);
}
expect(events).toContainEqual(
expect.objectContaining({ type: 'file', path: '/test/a.txt' })
);
expect(events).toContainEqual(
expect.objectContaining({ type: 'file', path: '/test/subdir/b.txt' })
);
expect(events).toContainEqual(
expect.objectContaining({ type: 'done' })
);
});
No fixtures. No disk access. Just pure logic with mocked dependencies.
2. Multi-Runtime Support
The same enumerateTree() function runs as:
- Local CLI: npm run scan /path
- Daemon worker: Spawned subprocess, outputs NDJSON
- HTTP endpoint: Cloud worker receives request, streams results
- P2P: Between two devices over WebSocket
Same algorithm. Different orchestrators.
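For the daemon-worker case, the parent process has to turn the subprocess's NDJSON output back into events. A subprocess's stdout arrives in arbitrary chunks, so a line can be split across two reads. The decoder below is a minimal sketch of how that could be handled; `createNdjsonDecoder` is a hypothetical helper and the event shape in the usage lines is an assumption.

```typescript
/**
 * Incremental NDJSON decoder: feed it raw text chunks,
 * get back the values of every line completed so far.
 */
function createNdjsonDecoder<T>() {
  let buffer = '';
  return {
    push(chunk: string): T[] {
      buffer += chunk;
      const lines = buffer.split('\n');
      buffer = lines.pop() ?? ''; // the last piece may be an incomplete line
      return lines
        .filter((line) => line.trim() !== '')
        .map((line) => JSON.parse(line) as T);
    },
    /** Call at end of stream to flush a final line that lacks a trailing newline. */
    end(): T[] {
      const rest = buffer.trim();
      buffer = '';
      return rest === '' ? [] : [JSON.parse(rest) as T];
    },
  };
}

// Usage: the second event is split across two chunks.
const decoder = createNdjsonDecoder<{ type: string; path?: string }>();
const firstBatch = decoder.push('{"type":"file","path":"/a"}\n{"type":"do');
const secondBatch = decoder.push('ne"}\n');
```

After these two calls, `firstBatch` holds the file event and `secondBatch` holds the done event, even though the done line arrived in two pieces.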
3. Parallel Development
Teams can work independently:
- Algorithm team: Improves enumerateTree() without touching I/O
- Database team: Optimizes repository queries without touching the algorithm
- DevOps team: Deploys to different runtimes without changing code
As long as the EnumerationDeps interface stays stable, changes in one layer don’t block others.
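One way to keep an interface stable in practice is a shared contract check that every adapter has to pass. The sketch below assumes a reduced `FsPort` interface rather than the full EnumerationDeps, and `checkFsContract` is a hypothetical helper. The first two rules come from how this article uses stat(); the rule that entry names contain no "/" is an assumption derived from the way enumerateTree joins paths.

```typescript
// Reduced port for the sketch; the real EnumerationDeps has more members.
interface FsPort {
  stat: (path: string) => Promise<{ isDirectory: boolean } | null>;
  listDir: (path: string) => Promise<{ name: string }[]>;
}

/**
 * Run the same behavioural checks against any adapter.
 * Returns a list of violations; an empty list means the adapter conforms.
 */
async function checkFsContract(
  fs: FsPort,
  existingDir: string,
  missingPath: string,
): Promise<string[]> {
  const problems: string[] = [];

  const dirStat = await fs.stat(existingDir);
  if (!dirStat) {
    problems.push('stat() returned null for an existing directory');
  } else if (!dirStat.isDirectory) {
    problems.push('stat() did not report the directory as a directory');
  }

  if ((await fs.stat(missingPath)) !== null) {
    problems.push('stat() must return null for a missing path');
  }

  const entries = await fs.listDir(existingDir);
  if (entries.some((entry) => entry.name.includes('/'))) {
    problems.push('listDir() names must not contain "/"');
  }

  return problems;
}
```

Running one check function against the local, mock, and cloud adapters gives each team a concrete definition of "the interface stayed stable".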
4. Testability of Complex Workflows
Full pipeline tests become practical:
// Test entire enumerate → reconcile → classify flow
it('processes files end-to-end', async () => {
const mockFs = createMockFileSystem({
'/data': ['file1.jpg', 'file2.jpg', 'file3.txt'],
});
const mockDb = createInMemoryDatabase();
const mockAI = createMockAIProvider();
const scanOrch = createTestOrchestrator(mockFs);
const reconcileOrch = createTestReconciler();
const classifyOrch = createTestClassifier(mockAI);
// Run enumeration
const enumEvents = [];
for await (const e of scanOrch.enumerate('/data')) {
enumEvents.push(e);
}
// Run reconciliation
const scans = eventsToDocs(enumEvents);
const plans = [];
for await (const p of reconcileOrch.reconcile(scans)) {
plans.push(p);
}
// Run classification
const files = plansToFiles(plans);
const classified = [];
for await (const c of classifyOrch.classify(files)) {
classified.push(c);
}
// All in-memory, no disk access, runs in milliseconds
expect(classified).toHaveLength(3);
expect(classified).toContainEqual(
expect.objectContaining({ path: '/data/file1.jpg', category: 'image' })
);
});
Comparison with Alternatives
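vs. DI Frameworks (Spring, NestJS)
DI frameworks like Spring or NestJS resolve dependencies through a container, driven by annotations or decorators. Passing a plain object of functions is simpler:
// Container-based DI in the style of Spring (pseudocode, lots of boilerplate)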
@Component
class FileSystemAdapter implements FileSystem {
stat(path) { /* ... */ }
}
@Component
class ScanRepository implements Repository {
constructor(private db: Database) {}
getDirScanStatus(path) { /* ... */ }
}
@Service
class EnumerationService {
constructor(
private fs: FileSystem,
private repo: Repository
) {}
enumerate(path) { /* ... */ }
}
// vs. Plain functions (what we're doing)
const deps = {
stat: fs.stat,
listDir: fs.listDir,
getDirScanStatus: repo.getDirScanStatus,
getKnownSubdirs: repo.getKnownSubdirs,
matchDirPattern: () => null,
};
// Inside an async generator, such as an orchestrator's enumerate():
yield* enumerateTree(path, 1, deps, emit);
Plain function injection is:
- Explicit (no magic)
- Lightweight (no runtime overhead)
- Testable (just pass objects)
- Language-agnostic (works in any language with first-class functions or interfaces)
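Because the dependencies are plain objects, they can also be wrapped by ordinary functions with no container involved. The sketch below records every call made through a deps object, which is handy for asserting how often an algorithm touches the filesystem. `FsDeps` is a reduced interface assumed for the example and `withCallLog` is a hypothetical helper.

```typescript
// Reduced deps shape for the sketch (assumption, not the article's EnumerationDeps).
interface FsDeps {
  stat: (path: string) => Promise<{ size: number } | null>;
  listDir: (path: string) => Promise<string[]>;
}

/**
 * Wrap a deps object so that every call is appended to `log`
 * before being forwarded to the original implementation.
 */
function withCallLog(deps: FsDeps, log: string[]): FsDeps {
  return {
    stat: (path) => {
      log.push(`stat ${path}`);
      return deps.stat(path);
    },
    listDir: (path) => {
      log.push(`listDir ${path}`);
      return deps.listDir(path);
    },
  };
}
```

The wrapped object has the same type as the original, so it can be passed anywhere the original was accepted.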
vs. Monolithic “Everything in One Process”
Monolithic approaches tangle algorithm with I/O:
// Monolithic: algorithm can't be tested alone
async function enumerate(path: string, db: Database) {
const entries = fs.readdirSync(path);
for (const e of entries) {
db.prepare('INSERT INTO files VALUES (?)').run(e);
}
}
// Layered: algorithm is testable
async function* enumerateTree(path: string, deps: Deps, emit) {
const entries = await deps.listDir(path);
for (const e of entries) {
emit({ type: 'file', name: e });
}
}
vs. Microservices
Microservices separate concerns at the network level. Layers separate concerns at the code level:
Microservices:
HTTP Scanner → RabbitMQ → HTTP Dedup → RabbitMQ → HTTP Classifier
(high latency, hard to debug, operational overhead)
Layers:
scanOrch.enumerate() → reconcileOrch.reconcile() → classifyOrch.classify()
(in-process, testable, no network overhead)
Use layers within a service. Use microservices between services.
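The in-process chain can be sketched with async generators, where each stage consumes the previous stage's output. This is an illustration only: the `scan`, `dedupe`, and `classify` stages below are toy stand-ins with hypothetical signatures, not the Replicator orchestrators, and the extension-based category rule is invented for the example.

```typescript
interface FileRecord {
  path: string;
  hash: string;
}

type ClassifiedFile = FileRecord & { category: string };

// Stage 1: stand-in for enumeration; yields records one at a time.
async function* scan(files: FileRecord[]): AsyncGenerator<FileRecord> {
  for (const file of files) yield file;
}

// Stage 2: drop any record whose hash has already been seen.
async function* dedupe(
  source: AsyncIterable<FileRecord>,
): AsyncGenerator<FileRecord> {
  const seen = new Set<string>();
  for await (const file of source) {
    if (seen.has(file.hash)) continue;
    seen.add(file.hash);
    yield file;
  }
}

// Stage 3: toy classifier based on file extension.
async function* classify(
  source: AsyncIterable<FileRecord>,
): AsyncGenerator<ClassifiedFile> {
  for await (const file of source) {
    const category = file.path.endsWith('.jpg') ? 'image' : 'other';
    yield { ...file, category };
  }
}

// Compose the stages with ordinary function calls: no queue, no network hop.
async function runPipeline(files: FileRecord[]): Promise<ClassifiedFile[]> {
  const results: ClassifiedFile[] = [];
  for await (const item of classify(dedupe(scan(files)))) {
    results.push(item);
  }
  return results;
}
```

Each stage pulls from the one before it, so a record flows through all three stages before the next record is read, and a debugger can step through the whole chain in one process.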
Practical Implementation Guide
Step 1: Identify Your Pure Algorithm
Start with the core business logic. For enumeration, it’s the BFS walk:
export async function* pureAlgorithm(
input: string,
deps: Deps,
emit: (event) => void
): AsyncIterable<void> {
// Core logic here
// No I/O calls directly
// Everything through deps
// Emit events instead of returning
}
Step 2: Define the Deps Interface
What does your algorithm need from the outside world?
export interface Deps {
// File I/O
stat: (path: string) => Promise<FileStat | null>;
listDir: (path: string) => Promise<DirEntry[]>;
// Database queries
getScanStatus: (path: string) => Promise<ScanStatus | null>;
// Options
maxDepth?: number;
}
Step 3: Create Adapters
Implement Deps for each backend:
// Local filesystem
export function createLocalFs(): Pick<Deps, 'stat' | 'listDir'> {
return {
async stat(path) { /* ... */ },
async listDir(path) { /* ... */ },
};
}
// Mock for testing
export function createMockFs(structure): Pick<Deps, 'stat' | 'listDir'> {
return {
async stat(path) { /* check structure */ },
async listDir(path) { /* return structure[path] */ },
};
}
// Cloud filesystem (future)
export function createS3Fs(): Pick<Deps, 'stat' | 'listDir'> {
return {
async stat(path) { /* s3.headObject() */ },
async listDir(path) { /* s3.listObjectsV2() */ },
};
}
Step 4: Create Orchestrators
Wire adapters to the pure function:
export function createLocalOrchestrator(db: Database) {
const fs = createLocalFs();
const repo = createRepository(db);
return {
async *enumerate(path: string) {
yield* pureAlgorithm(path, {
stat: fs.stat,
listDir: fs.listDir,
getScanStatus: repo.getScanStatus,
}, (event) => { /* process event */ });
},
};
}
export function createTestOrchestrator(mockFs) {
return {
async *enumerate(path: string) {
yield* pureAlgorithm(path, {
stat: mockFs.stat,
listDir: mockFs.listDir,
getScanStatus: async () => null,
}, (event) => { /* process event */ });
},
};
}
Step 5: Test Everything
it('enumerates directories', async () => {
const mockFs = createMockFs({
'/root': ['a.txt', 'b/'],
'/root/b': ['c.txt'],
});
const orch = createTestOrchestrator(mockFs);
const events = [];
for await (const e of orch.enumerate('/root')) {
events.push(e);
}
expect(events.map(e => e.name)).toEqual(['a.txt', 'b', 'c.txt']);
});
Key Takeaways
- Separate algorithm from I/O: Pure functions have no dependencies except their inputs
- Use dependency injection: Pass all external operations as interface parameters
- Emit events, don’t return data: Makes algorithms composable and streamable
- Create adapters for each backend: SQLite, cloud, mock, filesystem—different impls, same interface
- Orchestrate in the handler layer: Wire adapters to pure functions for different deployments
- Test with mocks: In-memory mocks eliminate all I/O overhead from tests
This pattern scales from simple utilities to complex distributed systems. The key is discipline: resist the urge to call I/O from pure functions, even when it seems convenient.
References
- RFC-RE-001: Drive Roles and deduplication architecture
- Replicator source: src/layers/ (pure, io, orchestrators)
- Testing patterns: src/__tests__/ and *.test.ts files
- Dependency injection: “Dependency Injection Principles, Practices, and Patterns” by Steven van Deursen and Mark Seemann
- Hexagonal Architecture: Alistair Cockburn’s “Hexagonal Architecture” (also called Ports & Adapters)