The Problem: Building data pipelines that can handle gigabytes of files without loading everything into memory. The usual approaches each come with a cost:
- Batch processing: Load all data, process, write results. Memory explosion on large datasets.
- RxJS Observables: Powerful but add cognitive overhead for sequential workflows.
- Node streams: Low-level, require manual pipe plumbing and error handling.
- Message queues: Operational complexity for what should be simple composition.
The Solution: Async generators are the right primitive for data pipelines. They’re:
- Natural: Look like normal sequential code, not callback hell
- Composable: Chain generators with simple helpers
- Backpressured: Consumer controls the pace (no runaway producers)
- Memory efficient: Process one item at a time
- Debuggable: Stack traces work normally
This article explains why async generators beat the alternatives, shows real examples from Replicator, and provides copy-paste ready patterns for your pipelines.
Table of Contents
- The Problem with Batch Processing
- Why Async Generators
- Async Generator Basics
- Building Streaming Pipelines
- Backpressure Without Thinking
- Real Examples from Replicator
- Composition Patterns
- Error Handling
- Testing Async Generators
- Comparison with Alternatives
- Real-World Size Comparison
- Key Takeaways
- References
The Problem with Batch Processing
Imagine you need to process files from a 4TB external drive. Batch processing looks like:
// Batch approach (DON'T do this for large datasets)
async function processAllFiles(path: string): Promise<ProcessedFile[]> {
// Step 1: Enumerate ALL files into memory
const files = await enumerateAllFiles(path);
console.log(`Found ${files.length} files`); // Could be millions!
// Step 2: Hash ALL files
const hashed = await hashAllFiles(files);
console.log(`Hashed ${hashed.length} files`); // Still in memory!
// Step 3: Deduplicate
const deduped = deduplicateAll(hashed);
// Step 4: Return results
return deduped; // All in memory → OOM on large drives
}
Problems:
- Memory explosion: 4TB drive → millions of file objects → gigabytes of RAM
- No progress visibility: You can’t see progress until a step completes
- No cancellation: If step 2 is slow, you can’t stop early and keep the results computed so far
- Sequential slowness: Step 2 can’t start until step 1 finishes completely
- Hard to test: You need fixtures with millions of files
This is why you need streaming.
Why Async Generators
Async generators let you process data as it arrives, not after it’s all collected:
// Async generator approach (BETTER)
async function* processFiles(path: string): AsyncIterable<ProcessedFile> {
// Step 1: Enumerate and yield files one at a time
for await (const file of enumerateFilesStream(path)) {
// Step 2: Hash the file
const hashed = await hashFile(file);
// Step 3: Check for dupes
if (!isDuplicate(hashed)) {
// Step 4: Yield the result
// Caller gets this ONE result, no accumulation
yield hashed;
}
}
}
// Usage: process files one at a time
for await (const result of processFiles('/data')) {
// Each result arrives as it's ready
// Previous results become eligible for garbage collection
// Backpressure is automatic
console.log(result);
}
Benefits:
- Constant memory: Process file N while forgetting about file N-1
- Real-time progress: Each yield is visible progress
- Natural composition: Just nest loops
- Automatic backpressure: If consumer is slow, producer pauses
- Testable: Yield test data one at a time
Async Generator Basics
Simple Example: Count to 10
// Define a generator
async function* countToTen(): AsyncIterable<number> {
for (let i = 1; i <= 10; i++) {
console.log(`Yielding ${i}`);
yield i;
}
}
// Use it
for await (const num of countToTen()) {
console.log(`Got ${num}`);
}
// Output:
// Yielding 1
// Got 1
// Yielding 2
// Got 2
// ...
Key insight: The generator pauses at yield. The consumer controls when to call next().
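To make the pause visible, the loop can be driven by hand instead of with for await. The sketch below is purely illustrative (the `counter` generator, the `log` array, and `driveManually` are names made up for this example): the generator body runs only when `next()` is called, and `return()` ends it early while still running its `finally` block.

```typescript
// Records what happens, in order, so the interleaving can be inspected
const log: string[] = [];

async function* counter(limit: number): AsyncGenerator<number> {
  try {
    for (let i = 1; i <= limit; i++) {
      log.push(`produce ${i}`);
      yield i; // execution stops here until the next call to next()
    }
  } finally {
    log.push('cleanup');
  }
}

async function driveManually(): Promise<string[]> {
  const it = counter(3);
  // Creating the generator object does not start its body
  log.push('created');

  const first = await it.next(); // runs the body up to the first yield
  log.push(`consume ${first.value}`);

  const second = await it.next(); // resumes and runs up to the second yield
  log.push(`consume ${second.value}`);

  // Stop early: the third value is never produced, but finally still runs
  await it.return(undefined);
  return log;
}
```

Calling `driveManually()` leaves the log as: created, produce 1, consume 1, produce 2, consume 2, cleanup. A for await loop makes the same `next()` calls for you, and calls `return()` when you break out of it.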
Simulate Slow I/O
async function* slowNumbers(): AsyncIterable<number> {
for (let i = 0; i < 5; i++) {
await new Promise(resolve => setTimeout(resolve, 1000)); // Simulate I/O
yield i;
}
}
// Consumer receives one number per second
for await (const num of slowNumbers()) {
console.log(`Got ${num} at ${new Date().toISOString()}`);
}
Generators with Side Effects
async function* enumerateAndLog(path: string): AsyncIterable<File> {
const entries = await fs.promises.readdir(path);
for (const entry of entries) {
const fullPath = `${path}/${entry}`;
const stat = await fs.promises.stat(fullPath);
console.log(`Found: ${fullPath} (${stat.size} bytes)`);
yield {
path: fullPath,
size: stat.size,
mtime: stat.mtime,
};
}
}
// Each yield is visible
for await (const file of enumerateAndLog('/large/drive')) {
// Process one file at a time
console.log(`Processing ${file.path}`);
}
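One caveat about the example above: `readdir` returns the complete listing of a directory before the first yield, and it does not descend into subdirectories. A lazier, recursive variant is possible with `fs.promises.opendir`, whose `Dir` handle is itself async-iterable. This is a minimal sketch rather than Replicator's implementation; the `walk` name is an assumption, and it yields plain path strings to keep things short.

```typescript
import { opendir } from 'node:fs/promises';
import { join } from 'node:path';

// Recursively yields the path of every regular file under `dir`.
// Directory entries are read lazily as the consumer asks for more.
async function* walk(dir: string): AsyncGenerator<string> {
  // Iterating the Dir handle with for await also closes it when the loop ends
  for await (const entry of await opendir(dir)) {
    const fullPath = join(dir, entry.name);
    if (entry.isDirectory()) {
      // Delegate to the nested generator; its items flow straight through
      yield* walk(fullPath);
    } else if (entry.isFile()) {
      yield fullPath;
    }
  }
}
```

The order of results follows whatever order the operating system returns entries in, so callers that need a stable order have to sort downstream.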
Building Streaming Pipelines
Simple Pipeline: Filter, Transform, Reduce
// Step 1: Generator that yields raw files
async function* enumerateFiles(path: string): AsyncIterable<FileStat> {
const entries = await fs.promises.readdir(path);
for (const entry of entries) {
const fullPath = `${path}/${entry}`;
const stat = await fs.promises.stat(fullPath);
// Include the full path: later stages (addHashes) read file.path
yield { name: entry, path: fullPath, ...stat };
}
}
// Step 2: Filter (only large files)
async function* filterLargeFiles(
source: AsyncIterable<FileStat>,
minSize: number
): AsyncIterable<FileStat> {
for await (const file of source) {
if (file.size >= minSize) {
yield file;
}
}
}
// Step 3: Transform (add hash)
async function* addHashes(
source: AsyncIterable<FileStat>
): AsyncIterable<FileStat & { hash: string }> {
for await (const file of source) {
const content = await fs.promises.readFile(file.path);
const hash = crypto.createHash('sha256').update(content).digest('hex');
yield { ...file, hash };
}
}
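// Step 4: Reduce (group by hash, keep only groups with more than one file)
async function findDuplicates(
source: AsyncIterable<FileStat & { hash: string }>
): Promise<Map<string, FileStat[]>> {
const byHash = new Map<string, FileStat[]>();
for await (const file of source) {
if (!byHash.has(file.hash)) {
byHash.set(file.hash, []);
}
byHash.get(file.hash)!.push(file);
}
// A hash seen only once is not a duplicate
const dupes = new Map<string, FileStat[]>();
for (const [hash, group] of byHash) {
if (group.length > 1) dupes.set(hash, group);
}
return dupes;
}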
// Compose the pipeline
async function findLargeFileDupes(path: string) {
const files = enumerateFiles(path);
const large = filterLargeFiles(files, 1024 * 1024); // 1MB+
const hashed = addHashes(large);
const dupes = await findDuplicates(hashed);
return dupes;
}
Key principle: Each step yields, the next step consumes. Backpressure is automatic.
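The hashing step above reads each file fully into memory with `readFile`, which is fine for small files but works against the goal when individual files are gigabytes in size. Node's readable streams are async-iterable, so the same for await style can hash a file chunk by chunk. The following is a sketch under that assumption; `hashFileStreaming` and `addHashesStreaming` are illustrative names, not functions from Replicator.

```typescript
import { createHash } from 'node:crypto';
import { createReadStream } from 'node:fs';

// Hash a file chunk by chunk: peak memory is one chunk, not the whole file
async function hashFileStreaming(filePath: string): Promise<string> {
  const hash = createHash('sha256');
  for await (const chunk of createReadStream(filePath)) {
    hash.update(chunk);
  }
  return hash.digest('hex');
}

// Drop-in pipeline stage: works on any item that carries a `path`
async function* addHashesStreaming<T extends { path: string }>(
  source: AsyncIterable<T>
): AsyncGenerator<T & { hash: string }> {
  for await (const file of source) {
    yield { ...file, hash: await hashFileStreaming(file.path) };
  }
}
```

The stage has the same shape as the one above it (an async iterable in, an async iterable out), so it can replace the hashing step in the pipeline without touching the other stages.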
Progress Reporting
async function* addProgress<T>(
source: AsyncIterable<T>,
label: string
): AsyncIterable<T> {
let count = 0;
const startTime = Date.now();
let lastPrint = startTime;
for await (const item of source) {
count++;
const now = Date.now();
// Print every 2 seconds
if (now - lastPrint >= 2000) {
const elapsed = (now - startTime) / 1000;
const rate = count / elapsed;
console.log(
`${label}: ${count} items in ${elapsed.toFixed(1)}s (${rate.toFixed(0)}/s)`
);
lastPrint = now;
}
yield item;
}
const elapsed = (Date.now() - startTime) / 1000;
console.log(`${label}: Done. ${count} items in ${elapsed.toFixed(1)}s`);
}
// Usage: wrap any generator
async function findLargeFileDupes(path: string) {
const files = enumerateFiles(path);
const withProgress = addProgress(files, 'Enumeration');
const large = filterLargeFiles(withProgress, 1024 * 1024);
const hashed = addHashes(large);
const dupes = await findDuplicates(hashed);
return dupes;
}
Batch Processing (Still Streaming)
Sometimes you need to batch items for efficiency (e.g., database inserts):
async function* batch<T>(
source: AsyncIterable<T>,
size: number
): AsyncIterable<T[]> {
let batch: T[] = [];
for await (const item of source) {
batch.push(item);
if (batch.length >= size) {
yield batch;
batch = [];
}
}
// Yield remaining
if (batch.length > 0) {
yield batch;
}
}
// Usage: batch 200 files, insert together
async function insertFiles(path: string, db: Database) {
const files = enumerateFiles(path);
// Prepare the statement once and reuse it for every row
const insert = db.prepare('INSERT INTO files VALUES (?, ?)');
for await (const fileBatch of batch(files, 200)) {
// One transaction per batch: up to 200 inserts committed together
db.transaction(() => {
for (const file of fileBatch) {
insert.run(file.name, file.size);
}
})();
}
}
Backpressure Without Thinking
Backpressure means: if the consumer is slow, the producer slows down. Async generators do this automatically.
Without Backpressure (Problem)
// Promise.all: starts all operations immediately
async function slowProcessing() {
const files = ['file1', 'file2', 'file3', /* ... 1000 more ... */];
// Starts hashing all 1000+ files immediately
// Every read is in flight at once, so memory and open file handles spike
const hashed = await Promise.all(files.map(f => hashFile(f)));
// Now process one at a time
for (const hash of hashed) {
await insertIntoDb(hash);
await new Promise(r => setTimeout(r, 100)); // Slow insertion
}
}
This loads all 1000+ hashes into memory before starting the slow insertion.
With Backpressure (Solution)
async function streamProcessing(path: string) {
// Enumerate one at a time
for await (const file of enumerateFiles(path)) {
// Hash one file
const hash = await hashFile(file);
// Insert one file (slow)
await insertIntoDb(hash);
// ^ Pauses here. enumeration doesn't start the next file
// until this insertion finishes.
}
}
The loop body awaits each step before for await requests the next item, so the consumer sets the pace. If insertion takes 100ms, enumeration waits those 100ms before fetching the next file.
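Strictly one-at-a-time processing is the safest default, but it leaves the disk idle while the database works. A middle ground between that and `Promise.all` is a bounded window of in-flight work. The helper below is a hypothetical sketch (`mapConcurrent` is not part of Replicator or of any library mentioned here): it runs at most `limit` calls at once, yields results in input order, and pulls more input only when the consumer asks for the next result.

```typescript
// Hypothetical helper: bounded concurrency that still respects backpressure
async function* mapConcurrent<T, R>(
  source: AsyncIterable<T>,
  limit: number,
  fn: (item: T) => Promise<R>
): AsyncGenerator<R> {
  const iterator = source[Symbol.asyncIterator]();
  const inFlight: Promise<R>[] = [];
  const windowSize = Math.max(1, limit);
  let exhausted = false;
  try {
    while (true) {
      // Top up the window of in-flight work
      while (!exhausted && inFlight.length < windowSize) {
        const next = await iterator.next();
        if (next.done) {
          exhausted = true;
        } else {
          const task = fn(next.value);
          // Mark the rejection as handled; the error still surfaces at the await below
          task.catch(() => {});
          inFlight.push(task);
        }
      }
      if (inFlight.length === 0) return;
      // Oldest task first, so output order matches input order
      yield await inFlight.shift()!;
    }
  } finally {
    // Release the source if the consumer stops early or a task fails
    await iterator.return?.();
  }
}
```

With the functions from this article, a call might look like `mapConcurrent(enumerateFiles(path), 4, hashFile)`. The right limit depends on the storage device and is something to measure rather than assume.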
Measuring Backpressure
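// Stand-in data so the demo is self-contained; the path argument is unused here
const files = [{ name: 'file1' }, { name: 'file2' }];
async function* enumerateFiles(path: string): AsyncIterable<{ name: string }> {
for (const file of files) {
console.log(`Yielding ${file.name}`);
yield file; // Pauses here until consumer calls next()
console.log(`Consumer done with ${file.name}`);
}
}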
// Fast consumer
async function fastConsumer() {
console.log('Start');
for await (const file of enumerateFiles('/path')) {
// Does nothing (very fast)
}
console.log('End');
}
// Output:
// Start
// Yielding file1
// Consumer done with file1
// Yielding file2
// Consumer done with file2
// End
// (immediate, producer and consumer pace matches)
// Slow consumer
async function slowConsumer() {
console.log('Start');
for await (const file of enumerateFiles('/path')) {
await new Promise(r => setTimeout(r, 1000)); // 1 second per file
}
console.log('End');
}
// Output:
// Start
// Yielding file1
// (1 second delay as consumer sleeps)
// Consumer done with file1
// Yielding file2
// (1 second delay as consumer sleeps)
// Consumer done with file2
// End
// (slow, because consumer is slow)
Result: Memory stays constant. No buffering. Natural flow.
Real Examples from Replicator
Replicator uses async generators throughout the pipeline:
Enumerate Stream
// From src/layers/pure/scan/enumerate.ts
export async function* enumerateTree(
rootPath: string,
volumeId: number,
deps: EnumerationDeps,
emit: (event: EnumerateEvent) => void
): AsyncIterable<void> {
// Discovery events go out through the emit callback; the generator itself
// yields once per directory as a progress tick
// Caller processes one tick at a time
// Memory for pending discoveries stays bounded
const queue: string[] = [rootPath];
let dirsVisited = 0;
while (queue.length > 0) {
const dirPath = queue.shift()!;
const entries = await deps.listDir(dirPath);
for (const entry of entries) {
if (entry.isDirectory) {
queue.push(/* ... */);
} else if (entry.isFile) {
emit({ type: 'file', path: entry });
}
}
// Yield progress (could be consumed by UI for real-time updates)
yield undefined;
dirsVisited++;
}
}
Reconcile Stream
// Deduplication as a stream
export async function* reconcile(
scans: ScanResult[],
deps: ReconciliationDeps
): AsyncIterable<ReconciliationPlan> {
// Group files by hash
const dupHashes = await deps.findDuplicateHashes(scans);
// Yield consolidation plans one at a time
for (const [hash, files] of dupHashes.entries()) {
const plan: ReconciliationPlan = {
hash,
copies: files,
sourceOfTruth: deps.selectSourceOfTruth(files),
consolidateVia: 'hardlink',
estimatedRecovery: files.reduce((sum, f) => sum + f.size, 0) - files[0].size,
};
yield plan; // One plan at a time
}
}
Classify Stream
// AI classification as a stream
export async function* classify(
// batch() consumes an AsyncIterable, so accept a stream rather than an array
files: AsyncIterable<FileStat>,
deps: ClassificationDeps
): AsyncIterable<ClassificationResult> {
// Process files in batches for efficiency
const batches = batch(files, 100);
for await (const fileBatch of batches) {
// Classify all files in batch with one AI call
const results = await deps.inferCategories(fileBatch);
for (const result of results) {
yield result; // Stream results as they arrive
}
}
}
Composition Patterns
Pattern 1: Linear Pipeline
// filter → transform → reduce
const files = enumerateFiles(path);
const large = filterLargeFiles(files, 1024 * 1024); // 1MB+
const hashed = addHashes(large);
const dupes = await findDuplicates(hashed);
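When a linear pipeline grows past three or four stages, the chain of intermediate variables can be replaced by a small `pipe` helper that threads a source through a list of stages. The helpers below are a sketch written for this article, not an existing library API: `Stage`, `filter`, `map`, `take`, and `pipe` are all assumed names, and `map` takes a synchronous function to keep the example short.

```typescript
// A stage turns one async iterable into another
type Stage<A, B> = (source: AsyncIterable<A>) => AsyncIterable<B>;

function filter<T>(keep: (item: T) => boolean): Stage<T, T> {
  return async function* (source: AsyncIterable<T>): AsyncGenerator<T> {
    for await (const item of source) {
      if (keep(item)) yield item;
    }
  };
}

function map<A, B>(fn: (item: A) => B): Stage<A, B> {
  return async function* (source: AsyncIterable<A>): AsyncGenerator<B> {
    for await (const item of source) yield fn(item);
  };
}

function take<T>(limit: number): Stage<T, T> {
  return async function* (source: AsyncIterable<T>): AsyncGenerator<T> {
    if (limit <= 0) return;
    let seen = 0;
    for await (const item of source) {
      yield item;
      // Returning from inside for await also closes the upstream generator
      if (++seen >= limit) return;
    }
  };
}

// Overloads keep the item type checked from one stage to the next
function pipe<A, B>(source: AsyncIterable<A>, s1: Stage<A, B>): AsyncIterable<B>;
function pipe<A, B, C>(source: AsyncIterable<A>, s1: Stage<A, B>, s2: Stage<B, C>): AsyncIterable<C>;
function pipe<A, B, C, D>(
  source: AsyncIterable<A>, s1: Stage<A, B>, s2: Stage<B, C>, s3: Stage<C, D>
): AsyncIterable<D>;
function pipe(source: AsyncIterable<unknown>, ...stages: Stage<any, any>[]): AsyncIterable<unknown> {
  return stages.reduce((acc, stage) => stage(acc), source);
}
```

A pipeline then reads top to bottom, for example `pipe(source, filter<number>(n => n % 2 === 0), map<number, string>(n => String(n)), take<string>(3))`. Nothing runs until the result is consumed, and `take` stops the whole chain after three items.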
Pattern 2: Fan-Out (Multiple Consumers)
// One producer, multiple consumers
async function processAndReport(path: string) {
const files = enumerateFiles(path);
// Consumer 1: Count files
const countPromise = (async () => {
let count = 0;
for await (const _ of files) {
count++;
}
return count;
})();
// Consumer 2: Sum sizes
const sizePromise = (async () => {
let total = 0;
for await (const file of files) {
total += file.size;
}
return total;
})();
// Wait for both (but they share the same producer!)
const [count, totalSize] = await Promise.all([countPromise, sizePromise]);
return { count, totalSize };
}
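// Problem: This gives wrong answers. Both loops pull from the same generator,
// so each file is delivered to only one of them and both totals come up short.
// Solution: Use tee() to split the stream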
Pattern 3: Tee (Split Stream)
// Split one stream into `count` independent streams.
// Each branch buffers items the other branches have already pulled, so memory
// grows with the gap between the fastest and the slowest consumer.
function tee<T>(source: AsyncIterable<T>, count: number = 2): AsyncIterable<T>[] {
const iterator = source[Symbol.asyncIterator]();
const buffers: T[][] = Array.from({ length: count }, () => []);
let done = false;
let pending: Promise<void> | null = null;
// Pull one item from the source and copy it into every branch's buffer.
// Sharing `pending` keeps concurrent branches from issuing overlapping reads.
const pull = (): Promise<void> => {
pending ??= iterator.next().then(result => {
pending = null;
if (result.done) {
done = true;
} else {
for (const buffer of buffers) buffer.push(result.value);
}
});
return pending;
};
async function* branch(index: number): AsyncIterable<T> {
const buffer = buffers[index];
while (true) {
if (buffer.length > 0) {
yield buffer.shift()!;
} else if (done) {
return;
} else {
await pull();
}
}
}
return Array.from({ length: count }, (_, i) => branch(i));
}
// Usage
async function processWithMultipleConsumers(path: string) {
const files = enumerateFiles(path);
const [counter, sizer] = tee(files, 2);
const [count, totalSize] = await Promise.all([
(async () => {
let count = 0;
for await (const _ of counter) count++;
return count;
})(),
(async () => {
let total = 0;
for await (const file of sizer) total += file.size;
return total;
})(),
]);
return { count, totalSize };
}
(In practice, use a library like async-iter-tools for tee and other utilities.)
Error Handling
Try-Catch in Generators
async function* safeEnumerate(path: string) {
try {
for (const entry of await fs.promises.readdir(path)) {
try {
const stat = await fs.promises.stat(`${path}/${entry}`);
yield { name: entry, ...stat };
} catch (err) {
// Log individual file errors, continue
console.warn(`Error on ${entry}: ${(err as Error).message}`);
}
}
} catch (err) {
// Directory read failed
console.error(`Cannot read ${path}: ${(err as Error).message}`);
}
}
Error Events
async function* enumerateWithErrors(
path: string
): AsyncIterable<File | Error> {
for (const entry of await fs.promises.readdir(path)) {
try {
const stat = await fs.promises.stat(`${path}/${entry}`);
yield { name: entry, ...stat };
} catch (err) {
// Yield error instead of throwing
yield err as Error;
}
}
}
// Consumer decides how to handle
for await (const result of enumerateWithErrors(path)) {
if (result instanceof Error) {
console.error(`Error: ${result.message}`);
} else {
console.log(`File: ${result.name}`);
}
}
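Yielding a bare Error works, but the consumer loses track of which input failed and has to rely on an instanceof check. One alternative is a tagged result type that carries the failing path along with the error. This is a sketch of that idea only; `Outcome`, `SizedFile`, `statEach`, and the injected `statFn` parameter are assumptions made for the example.

```typescript
// Either a value, or an error together with the input that caused it
type Outcome<T> =
  | { ok: true; value: T }
  | { ok: false; path: string; error: Error };

interface SizedFile {
  path: string;
  size: number;
}

// `statFn` is injected so the sketch can run without touching the disk;
// in real use it could be fs.promises.stat
async function* statEach(
  paths: Iterable<string>,
  statFn: (path: string) => Promise<{ size: number }>
): AsyncGenerator<Outcome<SizedFile>> {
  for (const path of paths) {
    let outcome: Outcome<SizedFile>;
    try {
      const { size } = await statFn(path);
      outcome = { ok: true, value: { path, size } };
    } catch (err) {
      const error = err instanceof Error ? err : new Error(String(err));
      outcome = { ok: false, path, error };
    }
    // Yield outside the try block so only statFn failures become outcomes
    yield outcome;
  }
}
```

The consumer branches on `result.ok`, and TypeScript narrows the type in each branch, so `result.value` and `result.error` are only accessible where they exist.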
Cancellation with AbortSignal
async function* cancellableEnumerate(
path: string,
signal: AbortSignal
): AsyncIterable<File> {
for (const entry of await fs.promises.readdir(path)) {
if (signal.aborted) break;
const stat = await fs.promises.stat(`${path}/${entry}`);
yield { name: entry, ...stat };
}
}
// Usage
const controller = new AbortController();
setTimeout(() => controller.abort(), 5000); // Cancel after 5s
for await (const file of cancellableEnumerate(path, controller.signal)) {
console.log(file);
}
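An AbortSignal stops the producer from the outside. The consumer can also stop on its own by breaking out of the loop: for await then calls the generator's `return()`, which runs any `finally` block inside it. That makes `finally` the natural place to release file handles or connections. The sketch below uses a made-up `Resource` interface and the assumed names `readAll` and `firstN` to show the mechanism.

```typescript
// Hypothetical resource: something that must be closed after use
interface Resource {
  read(): Promise<string | null>; // null signals end of data
  close(): Promise<void>;
}

async function* readAll(open: () => Promise<Resource>): AsyncGenerator<string> {
  const resource = await open();
  try {
    while (true) {
      const chunk = await resource.read();
      if (chunk === null) return;
      yield chunk;
    }
  } finally {
    // Runs on normal completion, on error, and when the consumer breaks early
    await resource.close();
  }
}

async function firstN(source: AsyncIterable<string>, n: number): Promise<string[]> {
  const out: string[] = [];
  if (n <= 0) return out;
  for await (const item of source) {
    out.push(item);
    // break makes for await call return() on the generator
    if (out.length >= n) break;
  }
  return out;
}
```

Taking the first two chunks from a four-chunk resource returns those two chunks and leaves the resource closed, even though the generator never reached the end of its data.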
Testing Async Generators
Unit Test (No I/O)
it('filters large files', async () => {
// Create a mock generator
async function* mockFiles() {
yield { name: 'small.txt', size: 100 };
yield { name: 'large.bin', size: 10_000_000 };
yield { name: 'medium.jpg', size: 500_000 };
}
const large = filterLargeFiles(mockFiles(), 1_000_000);
const results = [];
for await (const file of large) {
results.push(file);
}
expect(results).toHaveLength(1); // small.txt and medium.jpg filtered out
expect(results[0].name).toBe('large.bin');
});
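Most generator tests repeat the same two chores: turning an array into a source and draining a stage into an array. Two small helpers remove that noise. They are a sketch with assumed names (`fromArray`, `collect`), not part of any test framework.

```typescript
// Turn fixture data into an async source
async function* fromArray<T>(items: readonly T[]): AsyncGenerator<T> {
  for (const item of items) yield item;
}

// Drain any async iterable into an array (only for small test inputs)
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of source) out.push(item);
  return out;
}
```

With these, the test above shrinks to a single expression such as `await collect(filterLargeFiles(fromArray(fixtures), 1_000_000))`, where `fixtures` is whatever array of file objects the test needs.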
Streaming Test (Verify Backpressure)
it('respects backpressure', async () => {
let producedCount = 0;
let consumedCount = 0;
async function* countingGenerator() {
for (let i = 0; i < 10; i++) {
producedCount++;
yield i;
}
}
const slow = async () => {
for await (const num of countingGenerator()) {
consumedCount++;
await new Promise(r => setTimeout(r, 10)); // Simulate slow consumer
// Assert producer hasn't gotten too far ahead
expect(producedCount - consumedCount).toBeLessThanOrEqual(1);
}
};
await slow();
});
Integration Test (Full Pipeline)
it('processes files end-to-end', async () => {
// Create temp directory
const dir = await fs.promises.mkdtemp(path.join(tmpdir(), 'test-'));
await fs.promises.writeFile(`${dir}/file1.txt`, 'content1');
await fs.promises.writeFile(`${dir}/file2.txt`, 'content2');
// Run pipeline
const dupes = await findLargeFileDupes(dir);
// Verify results
expect(dupes.size).toBe(0); // both files are far below the 1MB filter, so nothing reaches hashing
// Cleanup
await fs.promises.rm(dir, { recursive: true });
});
Comparison with Alternatives
vs. RxJS Observables
RxJS is powerful but adds cognitive overhead:
// RxJS: declarative, functional
from(getFiles()).pipe(
filter(f => f.size > 1024 * 1024),
mergeMap(f => hashFile(f)),
tap(f => console.log(f)),
toArray()
).subscribe(results => console.log(results));
// Async generators: imperative, normal code
const files = getFiles();
const large = filterLargeFiles(files, 1024 * 1024);
const hashed = addHashes(large);
for await (const result of hashed) {
console.log(result);
}
Trade-off: RxJS is better for complex combinators (merge, race, retry). Async generators are simpler for sequential pipelines.
vs. Node Streams
Node streams are low-level:
// Node streams: plumbing-heavy
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + '.gz'))
.on('finish', () => console.log('done'));
// Async generators: simple composition
async function* readFile(file: string) {
const content = await fs.promises.readFile(file);
yield content;
}
async function* gzipFiles(source: AsyncIterable<Buffer>) {
for await (const content of source) {
yield zlib.gzipSync(content); // Sync here for simplicity
}
}
async function* writeToFile(source: AsyncIterable<Buffer>, file: string) {
for await (const content of source) {
await fs.promises.writeFile(file + '.gz', content);
yield content;
}
}
// Compose
for await (const _ of writeToFile(gzipFiles(readFile(inputFile)), outputFile)) {
// Done
}
Trade-off: Node streams are more efficient for binary I/O. Async generators are simpler for business logic pipelines.
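The two approaches also combine. `pipeline` from `node:stream/promises` accepts async generator functions as transform steps, so the streams handle chunked binary I/O and error propagation while the business logic stays a plain generator. A minimal sketch, assuming a text file and an upper-casing transform chosen only for illustration (`upperCase` and `upperCaseFile` are made-up names):

```typescript
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

// Transform stage written as an async generator: upper-cases text chunk by chunk
async function* upperCase(
  source: AsyncIterable<Buffer | string>
): AsyncGenerator<string> {
  for await (const chunk of source) {
    yield chunk.toString().toUpperCase();
  }
}

async function upperCaseFile(input: string, output: string): Promise<void> {
  // pipeline wires up backpressure and rejects if any stage fails
  await pipeline(
    // Decoding as UTF-8 keeps multi-byte characters intact across chunk boundaries
    createReadStream(input, { encoding: 'utf8' }),
    upperCase,
    createWriteStream(output)
  );
}
```

The file is never held in memory as a whole, which addresses the main weakness of the generator-only version above.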
vs. Message Queues (RabbitMQ, Kafka)
Message queues add operational overhead:
// Message queue: external infrastructure
const queue = new RabbitMQ();
await queue.connect();
// Producer
async function produceFiles() {
for (const file of getFiles()) {
await queue.publish('files', JSON.stringify(file));
}
}
// Consumer
queue.consume('files', async (msg) => {
const file = JSON.parse(msg.content);
await processFile(file);
msg.ack();
});
// Async generators: in-process
async function* produceFiles() {
for (const file of getFiles()) {
yield file;
}
}
for await (const file of produceFiles()) {
await processFile(file);
}
Use case: Message queues when you need distributed processing across multiple machines. Async generators when you’re processing within a single process.
Real-World Size Comparison
Replicator on 4TB Drive
Without streaming (batch):
- Memory: 4GB+ (storing millions of file objects)
- Time: Several hours (all phases sequential)
- Progress visibility: None until each phase completes
With async generators (streaming):
- Memory: ~100MB (processing one file at a time)
- Time: Same overall, but the stages are interleaved rather than run one after another (enumeration → hashing → dedup), so hashing starts as soon as the first file is found
- Progress visibility: Real-time
Key Takeaways
- Use async generators for sequential pipelines: Natural syntax, automatic backpressure
- Yield one item at a time: Memory constant, progress visible
- Compose with simple helpers: filter, map, batch, progress, tee
- Backpressure is automatic: Consumer pace controls producer pace
- Test with mocks: Generators work great with test data
- Use libraries for advanced patterns: async-iter-tools, iterable-operators
References
- async-iter-tools: https://github.com/wooorm/async-iter-tools (filter, map, batch, tee, etc.)
- MDN Async Generators: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function*
- TC39 Async Iteration: https://github.com/tc39/proposal-async-iteration
- Replicator source: src/layers/pure/scan/enumerate.ts (enumerateTree)
- “Gently Down the Stream”: https://2ality.com/2016/10/asynchronous-iteration.html (Dr. Axel Rauschmayer)