Gary Wu
Streaming Data Pipelines with Async Generators

The Problem: Building data pipelines that can handle gigabytes of files without loading everything into memory. Traditional approaches (reactive libraries, Node streams, message queues) add API surface or infrastructure that a single-process pipeline rarely needs.

The Solution: Async generators are the right primitive for data pipelines. They’re:

  1. Natural: Look like normal sequential code, not callback hell
  2. Composable: Chain generators with simple helpers
  3. Backpressured: Consumer controls the pace (no runaway producers)
  4. Memory efficient: Process one item at a time
  5. Debuggable: Stack traces work normally

This article explains why async generators beat the alternatives, shows real examples from Replicator, and provides copy-paste ready patterns for your pipelines.

The Problem with Batch Processing

Imagine you need to process files from a 4TB external drive. Batch processing looks like:

// Batch approach (DON'T do this for large datasets)
async function processAllFiles(path: string): Promise<ProcessedFile[]> {
  // Step 1: Enumerate ALL files into memory
  const files = await enumerateAllFiles(path);
  console.log(`Found ${files.length} files`); // Could be millions!

  // Step 2: Hash ALL files
  const hashed = await hashAllFiles(files);
  console.log(`Hashed ${hashed.length} files`); // Still in memory!

  // Step 3: Deduplicate
  const deduped = deduplicateAll(hashed);

  // Step 4: Return results
  return deduped; // All in memory → OOM on large drives
}

Problems:

  1. Memory explosion: 4TB drive → millions of file objects → gigabytes of RAM
  2. No progress visibility: You can’t see progress until a step completes
  3. No cancellation: If step 2 is slow, you can’t skip it
  4. Sequential slowness: Step 2 can’t start until step 1 finishes completely
  5. Hard to test: You need fixtures with millions of files

This is why you need streaming.


Why Async Generators

Async generators let you process data as it arrives, not after it’s all collected:

// Async generator approach (BETTER)
async function* processFiles(path: string): AsyncIterable<ProcessedFile> {
  // Step 1: Enumerate and yield files one at a time
  for await (const file of enumerateFilesStream(path)) {
    // Step 2: Hash the file
    const hashed = await hashFile(file);

    // Step 3: Check for dupes
    if (!isDuplicate(hashed)) {
      // Step 4: Yield the result
      // Caller gets this ONE result, no accumulation
      yield hashed;
    }
  }
}

// Usage: process files one at a time
for await (const result of processFiles('/data')) {
  // Each result arrives as it's ready
  // Previous results become eligible for garbage collection
  // Backpressure is automatic
  console.log(result);
}

Benefits:

  1. Constant memory: Process file N while forgetting about file N-1
  2. Real-time progress: Each yield is visible progress
  3. Natural composition: Just nest loops
  4. Automatic backpressure: If consumer is slow, producer pauses
  5. Testable: Yield test data one at a time
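The first and fourth benefits are easiest to see with a producer that never ends. The sketch below uses two hypothetical names, `naturals` and `take`, neither of which comes from Replicator. Because the consumer pulls only three items, the endless producer never runs ahead of it.

```typescript
// Hypothetical endless producer: yields 0, 1, 2, ... forever.
async function* naturals(): AsyncGenerator<number> {
  let n = 0;
  while (true) {
    yield n++;
  }
}

// Hypothetical helper: pass through at most `limit` items, then stop.
async function* take<T>(
  source: AsyncIterable<T>,
  limit: number
): AsyncGenerator<T> {
  if (limit <= 0) return;
  let taken = 0;
  for await (const item of source) {
    yield item;
    if (++taken >= limit) return; // leaving the loop also closes `source`
  }
}

async function firstThree(): Promise<number[]> {
  const seen: number[] = [];
  for await (const n of take(naturals(), 3)) {
    seen.push(n);
  }
  return seen; // [0, 1, 2]
}
```

Nothing here allocates more than one pending item at a time, even though the source is unbounded.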

Async Generator Basics

Simple Example: Count to 10

// Define a generator
async function* countToTen(): AsyncIterable<number> {
  for (let i = 1; i <= 10; i++) {
    console.log(`Yielding ${i}`);
    yield i;
  }
}

// Use it
for await (const num of countToTen()) {
  console.log(`Got ${num}`);
}

// Output:
// Yielding 1
// Got 1
// Yielding 2
// Got 2
// ...

Key insight: The generator pauses at yield. The consumer controls when to call next().
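To make the pause visible without for await, here is a small sketch that drives a generator by hand. The names `twoItems` and `driveManually` are made up for illustration. The log shows that the generator body does not run until the first next() call, and that it resumes only when the consumer asks again.

```typescript
async function* twoItems(log: string[]): AsyncGenerator<string> {
  log.push('producer: before first yield');
  yield 'a';
  log.push('producer: resumed after first yield');
  yield 'b';
}

async function driveManually(): Promise<string[]> {
  const log: string[] = [];
  const it = twoItems(log);
  log.push('consumer: created generator'); // the body has not started yet

  const first = await it.next();
  log.push(`consumer: got ${first.value}`);

  const second = await it.next();
  log.push(`consumer: got ${second.value}`);

  return log;
}

// Resulting log, in order:
// consumer: created generator
// producer: before first yield
// consumer: got a
// producer: resumed after first yield
// consumer: got b
```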

Simulate Slow I/O

async function* slowNumbers(): AsyncIterable<number> {
  for (let i = 0; i < 5; i++) {
    await new Promise(resolve => setTimeout(resolve, 1000)); // Simulate I/O
    yield i;
  }
}

// Consumer receives one number per second
for await (const num of slowNumbers()) {
  console.log(`Got ${num} at ${new Date().toISOString()}`);
}

Generators with Side Effects

async function* enumerateAndLog(path: string): AsyncIterable<File> {
  const entries = await fs.promises.readdir(path);

  for (const entry of entries) {
    const fullPath = `${path}/${entry}`;
    const stat = await fs.promises.stat(fullPath);

    console.log(`Found: ${fullPath} (${stat.size} bytes)`);

    yield {
      path: fullPath,
      size: stat.size,
      mtime: stat.mtime,
    };
  }
}

// Each yield is visible
for await (const file of enumerateAndLog('/large/drive')) {
  // Process one file at a time
  console.log(`Processing ${file.path}`);
}

Building Streaming Pipelines

Simple Pipeline: Filter, Transform, Reduce

// Step 1: Generator that yields raw files
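async function* enumerateFiles(path: string): AsyncIterable<FileStat> {
  const entries = await fs.promises.readdir(path);
  for (const entry of entries) {
    const fullPath = `${path}/${entry}`;
    const stat = await fs.promises.stat(fullPath);
    // Include the full path: addHashes reads file.path later in the pipeline
    yield { name: entry, path: fullPath, size: stat.size, mtime: stat.mtime };
  }
}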

// Step 2: Filter (only large files)
async function* filterLargeFiles(
  source: AsyncIterable<FileStat>,
  minSize: number
): AsyncIterable<FileStat> {
  for await (const file of source) {
    if (file.size >= minSize) {
      yield file;
    }
  }
}

// Step 3: Transform (add hash)
async function* addHashes(
  source: AsyncIterable<FileStat>
): AsyncIterable<FileStat & { hash: string }> {
  for await (const file of source) {
    // Hash the file in chunks so a large file is never fully in memory
    const hasher = crypto.createHash('sha256');
    for await (const chunk of fs.createReadStream(file.path)) {
      hasher.update(chunk);
    }
    yield { ...file, hash: hasher.digest('hex') };
  }
}

// Step 4: Reduce (group files by hash; groups with 2+ entries are duplicates)
async function findDuplicates(
  source: AsyncIterable<FileStat & { hash: string }>
): Promise<Map<string, FileStat[]>> {
  const dupes = new Map<string, FileStat[]>();

  for await (const file of source) {
    if (!dupes.has(file.hash)) {
      dupes.set(file.hash, []);
    }
    dupes.get(file.hash)!.push(file);
  }

  return dupes;
}

// Compose the pipeline
async function findLargeFileDupes(path: string) {
  const files = enumerateFiles(path);
  const large = filterLargeFiles(files, 1024 * 1024); // 1MB+
  const hashed = addHashes(large);
  const dupes = await findDuplicates(hashed);

  return dupes;
}

Key principle: Each step yields, the next step consumes. Backpressure is automatic.
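The filter and transform steps above are specific to files, but the same shape generalizes. As a rough sketch, the hypothetical helpers below (`fromArray`, `filter`, `map`, and `collect` are illustrative names, not Replicator APIs) show how small generic stages compose the same way.

```typescript
// Turn an array into an async stream (handy for tests and demos).
async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  for (const item of items) yield item;
}

// Keep only the items for which `keep` returns true.
async function* filter<T>(
  source: AsyncIterable<T>,
  keep: (item: T) => boolean | Promise<boolean>
): AsyncGenerator<T> {
  for await (const item of source) {
    if (await keep(item)) yield item;
  }
}

// Transform each item; `fn` may be sync or async.
async function* map<T, U>(
  source: AsyncIterable<T>,
  fn: (item: T) => U | Promise<U>
): AsyncGenerator<U> {
  for await (const item of source) {
    yield await fn(item);
  }
}

// Drain a stream into an array. Only use this on small streams.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of source) out.push(item);
  return out;
}

async function largeSizeLabels(): Promise<string[]> {
  const sizes = fromArray([100, 2_000, 500, 3_000]);
  const large = filter(sizes, size => size >= 1_000);
  const labels = map(large, size => `${size} bytes`);
  return collect(labels); // ['2000 bytes', '3000 bytes']
}
```

Each stage takes an AsyncIterable and returns one, so stages can be reordered or wrapped without touching their neighbors.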

Progress Reporting

async function* addProgress<T>(
  source: AsyncIterable<T>,
  label: string
): AsyncIterable<T> {
  let count = 0;
  const startTime = Date.now();
  let lastPrint = startTime;

  for await (const item of source) {
    count++;
    const now = Date.now();

    // Print every 2 seconds
    if (now - lastPrint >= 2000) {
      const elapsed = (now - startTime) / 1000;
      const rate = count / elapsed;
      console.log(
        `${label}: ${count} items in ${elapsed.toFixed(1)}s (${rate.toFixed(0)}/s)`
      );
      lastPrint = now;
    }

    yield item;
  }

  const elapsed = (Date.now() - startTime) / 1000;
  console.log(`${label}: Done. ${count} items in ${elapsed.toFixed(1)}s`);
}

// Usage: wrap any generator
async function findLargeFileDupes(path: string) {
  const files = enumerateFiles(path);
  const withProgress = addProgress(files, 'Enumeration');
  const large = filterLargeFiles(withProgress, 1024 * 1024);
  const hashed = addHashes(large);
  const dupes = await findDuplicates(hashed);

  return dupes;
}

Batch Processing (Still Streaming)

Sometimes you need to batch items for efficiency (e.g., database inserts):

async function* batch<T>(
  source: AsyncIterable<T> | Iterable<T>, // for await accepts both
  size: number
): AsyncIterable<T[]> {
  let batch: T[] = [];

  for await (const item of source) {
    batch.push(item);

    if (batch.length >= size) {
      yield batch;
      batch = [];
    }
  }

  // Yield remaining
  if (batch.length > 0) {
    yield batch;
  }
}

// Usage: insert files in batches of up to 200
async function insertFiles(path: string, db: Database) {
  const files = enumerateFiles(path);
  const batches = batch(files, 200);

  // Prepare the statement once and reuse it for every row
  const insert = db.prepare('INSERT INTO files VALUES (?, ?)');
  // One transaction per batch is typically much cheaper than one per row
  const insertBatch = db.transaction((rows: FileStat[]) => {
    for (const file of rows) {
      insert.run(file.name, file.size);
    }
  });

  for await (const fileBatch of batches) {
    insertBatch(fileBatch);
  }
}

Backpressure Without Thinking

Backpressure means: if the consumer is slow, the producer slows down. Async generators do this automatically.

Without Backpressure (Problem)

// Promise.all: starts all operations immediately
async function slowProcessing() {
  const files = ['file1', 'file2', 'file3', /* ... 1000 more ... */];

  // Starts hashing all 1000+ files immediately
  // Memory explodes
  const hashed = await Promise.all(files.map(f => hashFile(f)));

  // Now process one at a time
  for (const hash of hashed) {
    await insertIntoDb(hash);
    await new Promise(r => setTimeout(r, 100)); // Slow insertion
  }
}

This loads all 1000+ hashes into memory before starting the slow insertion.
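There is a middle ground between "everything at once" and "strictly one at a time". The sketch below is one possible approach, not something taken from Replicator: a hypothetical `mapInChunks` helper that runs at most `size` operations concurrently and still yields results in order. `slowDouble` stands in for a real operation such as hashFile.

```typescript
// Hypothetical helper: run `fn` on up to `size` items at once,
// yielding results in source order.
async function* mapInChunks<T, U>(
  source: AsyncIterable<T>,
  size: number,
  fn: (item: T) => Promise<U>
): AsyncGenerator<U> {
  let chunk: T[] = [];
  for await (const item of source) {
    chunk.push(item);
    if (chunk.length >= size) {
      yield* await Promise.all(chunk.map(item => fn(item)));
      chunk = [];
    }
  }
  // Flush the final, possibly smaller, chunk
  if (chunk.length > 0) {
    yield* await Promise.all(chunk.map(item => fn(item)));
  }
}

async function* ids(count: number): AsyncGenerator<number> {
  for (let i = 0; i < count; i++) yield i;
}

async function demoChunks(): Promise<{ results: number[]; maxInFlight: number }> {
  let inFlight = 0;
  let maxInFlight = 0;

  // Stand-in for hashFile: pretends to do 5ms of I/O
  const slowDouble = async (n: number): Promise<number> => {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    await new Promise(resolve => setTimeout(resolve, 5));
    inFlight--;
    return n * 2;
  };

  const results: number[] = [];
  for await (const value of mapInChunks(ids(7), 3, slowDouble)) {
    results.push(value);
  }
  return { results, maxInFlight };
}
```

Memory is bounded by the chunk size rather than by the size of the input. The trade-off is that no new work starts while a chunk's results are being consumed, so a sliding-window scheduler would give better throughput at the cost of more code.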

With Backpressure (Solution)

async function streamProcessing(path: string) {
  // Enumerate one at a time
  for await (const file of enumerateFiles(path)) {
    // Hash one file
    const hash = await hashFile(file);

    // Insert one file (slow)
    await insertIntoDb(hash);
    // ^ Pauses here. enumeration doesn't start the next file
    //   until this insertion finishes.
  }
}

The loop body pauses at each await, and the generator stays suspended at its yield until the loop asks for the next item. If insertion is slow (100ms), enumeration waits 100ms before fetching the next file.

Measuring Backpressure

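async function* enumerateFiles(path: string): AsyncIterable<File> {
  // Stand-in data so the example is self-contained; `path` is unused here
  const files = [{ name: 'file1' }, { name: 'file2' }] as File[];
  for (const file of files) {
    console.log(`Yielding ${file.name}`);
    yield file; // Pauses here until consumer calls next()
    console.log(`Consumer done with ${file.name}`);
  }
}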

// Fast consumer
async function fastConsumer() {
  console.log('Start');
  for await (const file of enumerateFiles('/path')) {
    // Does nothing (very fast)
  }
  console.log('End');
}

// Output:
// Start
// Yielding file1
// Consumer done with file1
// Yielding file2
// Consumer done with file2
// End
// (immediate, producer and consumer pace matches)

// Slow consumer
async function slowConsumer() {
  console.log('Start');
  for await (const file of enumerateFiles('/path')) {
    await new Promise(r => setTimeout(r, 1000)); // 1 second per file
  }
  console.log('End');
}

// Output:
// Start
// Yielding file1
// (1 second delay as consumer sleeps)
// Consumer done with file1
// Yielding file2
// (1 second delay as consumer sleeps)
// Consumer done with file2
// End
// (slow, because consumer is slow)

Result: Memory stays constant. No buffering. Natural flow.


Real Examples from Replicator

Replicator uses async generators throughout the pipeline:

Enumerate Stream

// From src/layers/pure/scan/enumerate.ts
export async function* enumerateTree(
  rootPath: string,
  volumeId: number,
  deps: EnumerationDeps,
  emit: (event: EnumerateEvent) => void
): AsyncIterable<void> {
  // Yields progress events and discovery events
  // Caller processes one event at a time
  // Memory for pending discoveries stays bounded

  const queue: string[] = [rootPath];
  let dirsVisited = 0;

  while (queue.length > 0) {
    const dirPath = queue.shift()!;
    const entries = await deps.listDir(dirPath);

    for (const entry of entries) {
      if (entry.isDirectory) {
        queue.push(/* ... */);
      } else if (entry.isFile) {
        emit({ type: 'file', path: entry });
      }
    }

    // Yield progress (could be consumed by UI for real-time updates)
    yield undefined;
    dirsVisited++;
  }
}

Reconcile Stream

// Deduplication as a stream
export async function* reconcile(
  scans: ScanResult[],
  deps: ReconciliationDeps
): AsyncIterable<ReconciliationPlan> {
  // Group files by hash
  const dupHashes = await deps.findDuplicateHashes(scans);

  // Yield consolidation plans one at a time
  for (const [hash, files] of dupHashes.entries()) {
    const plan: ReconciliationPlan = {
      hash,
      copies: files,
      sourceOfTruth: deps.selectSourceOfTruth(files),
      consolidateVia: 'hardlink',
      estimatedRecovery: files.reduce((sum, f) => sum + f.size, 0) - files[0].size,
    };

    yield plan; // One plan at a time
  }
}

Classify Stream

// AI classification as a stream
export async function* classify(
  files: FileStat[],
  deps: ClassificationDeps
): AsyncIterable<ClassificationResult> {
  // Process files in batches for efficiency
  const batches = batch(files, 100);

  for await (const fileBatch of batches) {
    // Classify all files in batch with one AI call
    const results = await deps.inferCategories(fileBatch);

    for (const result of results) {
      yield result; // Stream results as they arrive
    }
  }
}

Composition Patterns

Pattern 1: Linear Pipeline

// filter → transform → reduce
const files = enumerateFiles(path);
const large = filterLargeFiles(files, 1024 * 1024); // 1MB+
const hashed = addHashes(large);
const dupes = await findDuplicates(hashed);

Pattern 2: Fan-Out (Multiple Consumers)

// One producer, multiple consumers
async function processAndReport(path: string) {
  const files = enumerateFiles(path);

  // Consumer 1: Count files
  const countPromise = (async () => {
    let count = 0;
    for await (const _ of files) {
      count++;
    }
    return count;
  })();

  // Consumer 2: Sum sizes
  const sizePromise = (async () => {
    let total = 0;
    for await (const file of files) {
      total += file.size;
    }
    return total;
  })();

  // Wait for both (but they share the same producer!)
  const [count, totalSize] = await Promise.all([countPromise, sizePromise]);
  return { count, totalSize };
}

// Problem: This doesn't work. Both loops pull from the same generator,
// so each item goes to only one consumer and both totals come out wrong.
// Solution: Use tee() to split the stream
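The failure is easy to reproduce in isolation. In this sketch (`numbers`, `drain`, and `shareOneGenerator` are illustrative names), two loops drain the same generator concurrently. Together they see every item exactly once, but neither sees all of them.

```typescript
async function* numbers(count: number): AsyncGenerator<number> {
  for (let i = 1; i <= count; i++) yield i;
}

async function drain(source: AsyncIterable<number>): Promise<number[]> {
  const seen: number[] = [];
  for await (const n of source) seen.push(n);
  return seen;
}

async function shareOneGenerator(): Promise<{ first: number[]; second: number[] }> {
  const shared = numbers(6);
  // Both calls iterate the SAME generator object, so they compete for items
  const [first, second] = await Promise.all([drain(shared), drain(shared)]);
  return { first, second };
}
```

Counting or summing on either side therefore gives a partial answer, which is why the stream has to be split explicitly.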

Pattern 3: Tee (Split Stream)

// Split one stream into multiple independent streams.
// Caveat: a slow branch buffers every item the faster branches have already
// pulled, so memory stays bounded only if the branches keep a similar pace.
function tee<T>(
  source: AsyncIterable<T>,
  count: number = 2
): AsyncIterable<T>[] {
  const iterator = source[Symbol.asyncIterator]();
  const buffers: T[][] = Array.from({ length: count }, () => []);
  let done = false;
  let pending: Promise<void> | null = null;

  // Pull one item from the source and copy it into every branch's buffer.
  // Branches that ask at the same time share the same in-flight pull.
  const pull = (): Promise<void> => {
    if (!pending) {
      pending = iterator.next().then(result => {
        pending = null;
        if (result.done) {
          done = true;
        } else {
          for (const buffer of buffers) buffer.push(result.value);
        }
      });
    }
    return pending;
  };

  async function* branch(index: number): AsyncIterable<T> {
    while (true) {
      if (buffers[index].length > 0) {
        yield buffers[index].shift()!;
      } else if (done) {
        return;
      } else {
        await pull();
      }
    }
  }

  return Array.from({ length: count }, (_, i) => branch(i));
}

// Usage
async function processWithMultipleConsumers(path: string) {
  const files = enumerateFiles(path);
  const [counter, sizer] = tee(files, 2);

  const [count, totalSize] = await Promise.all([
    (async () => {
      let count = 0;
      for await (const _ of counter) count++;
      return count;
    })(),
    (async () => {
      let total = 0;
      for await (const file of sizer) total += file.size;
      return total;
    })(),
  ]);

  return { count, totalSize };
}

(In practice, use a library like async-iter-tools for tee and other utilities.)


Error Handling

Try-Catch in Generators

async function* safeEnumerate(path: string) {
  try {
    for (const entry of await fs.promises.readdir(path)) {
      try {
        const stat = await fs.promises.stat(`${path}/${entry}`);
        yield { name: entry, ...stat };
      } catch (err) {
        // Log individual file errors, continue
        console.warn(`Error on ${entry}: ${(err as Error).message}`);
      }
    }
  } catch (err) {
    // Directory read failed
    console.error(`Cannot read ${path}: ${(err as Error).message}`);
  }
}
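Swallowing errors inside the generator is a choice, not a requirement. If a stage does not catch an error, it travels down the pipeline and surfaces at the consumer's for await, where an ordinary try/catch handles it. A minimal sketch with made-up names (`failingSource`, `double`, `consume`):

```typescript
async function* failingSource(): AsyncGenerator<number> {
  yield 1;
  yield 2;
  throw new Error('disk unplugged'); // simulated failure
}

// An intermediate stage with no error handling of its own
async function* double(source: AsyncIterable<number>): AsyncGenerator<number> {
  for await (const n of source) yield n * 2;
}

async function consume(): Promise<{ received: number[]; error: string | null }> {
  const received: number[] = [];
  try {
    for await (const n of double(failingSource())) {
      received.push(n);
    }
    return { received, error: null };
  } catch (err) {
    // The error thrown two stages upstream arrives here
    const message = err instanceof Error ? err.message : String(err);
    return { received, error: message };
  }
}

// consume() resolves to { received: [2, 4], error: 'disk unplugged' }
```

Items yielded before the failure are still delivered, so the consumer can decide whether to keep partial results.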

Error Events

async function* enumerateWithErrors(
  path: string
): AsyncIterable<File | Error> {
  for (const entry of await fs.promises.readdir(path)) {
    try {
      const stat = await fs.promises.stat(`${path}/${entry}`);
      yield { name: entry, ...stat };
    } catch (err) {
      // Yield error instead of throwing
      yield err as Error;
    }
  }
}

// Consumer decides how to handle
for await (const result of enumerateWithErrors(path)) {
  if (result instanceof Error) {
    console.error(`Error: ${result.message}`);
  } else {
    console.log(`File: ${result.name}`);
  }
}

Cancellation with AbortSignal

async function* cancellableEnumerate(
  path: string,
  signal: AbortSignal
): AsyncIterable<File> {
  for (const entry of await fs.promises.readdir(path)) {
    if (signal.aborted) break;

    const stat = await fs.promises.stat(`${path}/${entry}`);
    yield { name: entry, ...stat };
  }
}

// Usage
const controller = new AbortController();
setTimeout(() => controller.abort(), 5000); // Cancel after 5s

for await (const file of cancellableEnumerate(path, controller.signal)) {
  console.log(file);
}
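Cancellation also happens implicitly when the consumer leaves the loop early: break (or a thrown error) calls the generator's return(), which runs any finally blocks. That makes try/finally a natural place to release handles. A small sketch with illustrative names (`withCleanup`, `stopEarly`):

```typescript
async function* withCleanup(events: string[]): AsyncGenerator<number> {
  events.push('open'); // e.g. open a file handle or DB cursor
  try {
    for (let i = 0; ; i++) {
      yield i;
    }
  } finally {
    events.push('close'); // runs when the consumer stops early
  }
}

async function stopEarly(): Promise<string[]> {
  const events: string[] = [];
  for await (const n of withCleanup(events)) {
    if (n === 2) break; // triggers the generator's finally block
  }
  events.push('after loop');
  return events; // ['open', 'close', 'after loop']
}
```

Note that an AbortSignal check like the one above only runs between items. An operation that is already in flight is not interrupted unless it accepts the signal itself.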

Testing Async Generators

Unit Test (No I/O)

it('filters large files', async () => {
  // Create a mock generator
  async function* mockFiles() {
    yield { name: 'small.txt', size: 100 };
    yield { name: 'large.bin', size: 10_000_000 };
    yield { name: 'medium.jpg', size: 500_000 };
  }

  const large = filterLargeFiles(mockFiles(), 1_000_000);
  const results = [];

  for await (const file of large) {
    results.push(file);
  }

  expect(results).toHaveLength(1); // small.txt and medium.jpg filtered out
  expect(results[0].name).toBe('large.bin');
});

Streaming Test (Verify Backpressure)

it('respects backpressure', async () => {
  let producedCount = 0;
  let consumedCount = 0;

  async function* countingGenerator() {
    for (let i = 0; i < 10; i++) {
      producedCount++;
      yield i;
    }
  }

  const slow = async () => {
    for await (const num of countingGenerator()) {
      consumedCount++;
      await new Promise(r => setTimeout(r, 10)); // Simulate slow consumer
      // Assert producer hasn't gotten too far ahead
      expect(producedCount - consumedCount).toBeLessThanOrEqual(1);
    }
  };

  await slow();
});

Integration Test (Full Pipeline)

it('processes files end-to-end', async () => {
  // Create temp directory
  const dir = await fs.promises.mkdtemp(path.join(tmpdir(), 'test-'));
  await fs.promises.writeFile(`${dir}/file1.txt`, 'content1');
  await fs.promises.writeFile(`${dir}/file2.txt`, 'content2');

  // Run pipeline
  const dupes = await findLargeFileDupes(dir);

  // Verify results
  expect(dupes.size).toBe(0); // both files are under the 1MB threshold

  // Cleanup
  await fs.promises.rm(dir, { recursive: true });
});

Comparison with Alternatives

vs. RxJS Observables

RxJS is powerful but adds cognitive overhead:

// RxJS: declarative, functional
from(getFiles()).pipe(
  filter(f => f.size > 1024 * 1024), // 1MB
  mergeMap(f => hashFile(f)),
  tap(f => console.log(f)),
  toArray()
).subscribe(results => console.log(results));

// Async generators: imperative, normal code
const files = getFiles();
const large = filterLargeFiles(files, 1024 * 1024); // 1MB
const hashed = addHashes(large);

for await (const result of hashed) {
  console.log(result);
}

Trade-off: RxJS is better for complex combinators (merge, race, retry). Async generators are simpler for sequential pipelines.

vs. Node Streams

Node streams are low-level:

// Node streams: plumbing-heavy
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('finish', () => console.log('done'));

// Async generators: simple composition
async function* readFile(file: string) {
  const content = await fs.promises.readFile(file);
  yield content;
}

async function* gzipFiles(source: AsyncIterable<Buffer>) {
  for await (const content of source) {
    yield zlib.gzipSync(content); // Sync here for simplicity
  }
}

async function* writeToFile(source: AsyncIterable<Buffer>, file: string) {
  for await (const content of source) {
    await fs.promises.writeFile(file + '.gz', content);
    yield content;
  }
}

// Compose
for await (const _ of writeToFile(gzipFiles(readFile(inputFile)), outputFile)) {
  // Done
}

Trade-off: Node streams are more efficient for binary I/O. Async generators are simpler for business logic pipelines.
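The two models also interoperate, so the choice is not all-or-nothing. Node's Readable streams are async iterable, and Readable.from() wraps a generator as a stream. The sketch below round-trips through both directions; `lines`, `upperCase`, and `roundTrip` are illustrative names.

```typescript
import { Readable } from 'node:stream';

async function* lines(): AsyncGenerator<string> {
  yield 'alpha\n';
  yield 'beta\n';
}

// A generator stage that accepts any async iterable, including a Node stream
async function* upperCase(source: AsyncIterable<unknown>): AsyncGenerator<string> {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
}

async function roundTrip(): Promise<string> {
  // Generator -> Node stream
  const readable = Readable.from(lines());

  // Node stream -> for await
  let out = '';
  for await (const chunk of upperCase(readable)) {
    out += chunk;
  }
  return out; // 'ALPHA\nBETA\n'
}
```

This lets the binary-heavy ends of a pipeline stay on Node streams while the business logic in the middle is written as generators.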

vs. Message Queues (RabbitMQ, Kafka)

Message queues add operational overhead:

// Message queue: external infrastructure
const queue = new RabbitMQ();
await queue.connect();

// Producer
async function produceFiles() {
  for (const file of getFiles()) {
    await queue.publish('files', JSON.stringify(file));
  }
}

// Consumer
queue.consume('files', async (msg) => {
  const file = JSON.parse(msg.content);
  await processFile(file);
  msg.ack();
});

// Async generators: in-process
async function* produceFiles() {
  for (const file of getFiles()) {
    yield file;
  }
}

for await (const file of produceFiles()) {
  await processFile(file);
}

Use case: Message queues when you need distributed processing across multiple machines. Async generators when you’re processing within a single process.


Real-World Size Comparison

Replicator on 4TB Drive

Without streaming (batch):

With async generators (streaming):


Key Takeaways

  1. Use async generators for sequential pipelines: Natural syntax, automatic backpressure
  2. Yield one item at a time: Memory constant, progress visible
  3. Compose with simple helpers: filter, map, batch, progress, tee
  4. Backpressure is automatic: Consumer pace controls producer pace
  5. Test with mocks: Generators work great with test data
  6. Use libraries for advanced patterns: async-iter-tools, iterable-operators
