pipeline() vs pipe() for Memory-Safe Streams

If a Node.js process’s file-descriptor count and RSS climb every time a stream chain errors out, the cause is almost always a .pipe() chain with no manual destroy() calls on its streams. The fix is stream.pipeline(), the error-handling and cleanup layer that the wider stream backpressure & memory guide (part of Node.js Server-Side Memory Management) assumes you already have in place.

| Symptom | Root Cause | Immediate Action |
| --- | --- | --- |
| RSS climbs after failed uploads | .pipe() skips destroy on error | Swap the chain to stream.pipeline() |
| EMFILE errors under load | FDs left open after write errors | Destroy every stream in error handlers |
| Transform buffer keeps growing | Nothing drains it after a downstream error | Use pipeline() to destroy all stages at once |
| Unhandled error crashes process | .pipe() needs a listener per stream | Use pipeline()'s single callback/promise |
| Duplicate 'end'/'close' handlers fire | Manual cleanup added on top of .pipe() | Remove ad-hoc listeners; let pipeline() own it |

Root Cause

.pipe() was designed in an era when Node.js streams assumed a happy path: data flows from a readable to a writable, and the writable’s finish event is the only outcome anyone cared about. It wires up data/drain listeners to respect backpressure correctly, but it does not propagate errors between the streams it connects. If the destination writable emits error (a full disk, a closed socket, a permission failure), .pipe() unpipes the stream feeding it and does nothing else: the source readable and any transform in between are never destroyed. They stall holding whatever they have already buffered, up to their highWaterMark, and that memory stays allocated for as long as anything still references the streams. The read-side file descriptor also stays open indefinitely, because nothing ever calls .destroy() on the source.

This is the failure mode covered when diagnosing unbounded buffering in stream chains: the consumer side of the pipe has vanished without formally ending the stream, so the buffers upstream of it are stranded instead of drained. Each individual buffer is still capped by its highWaterMark; the growth comes from repetition. Multiply one leaked readable plus one leaked transform per failed request across a high-throughput upload or export endpoint, and both open file descriptors and retained buffer memory grow monotonically until the process hits EMFILE or its RSS budget.
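The behaviour described above can be observed without touching the disk. The following is a minimal sketch, assuming in-memory stand-ins for the real streams: the endless source, the PassThrough "transform", the always-failing destination, and the function names buildChain and pipeAndFail are all hypothetical and chosen only for illustration.

```javascript
const { Readable, PassThrough, Writable } = require('node:stream');

// Hypothetical stand-ins: an endless source, a pass-through
// "transform", and a destination whose first write fails.
function buildChain() {
  const src = new Readable({
    read() {
      this.push(Buffer.alloc(1024));
    },
  });
  const mid = new PassThrough();
  const dst = new Writable({
    write(chunk, encoding, callback) {
      callback(new Error('simulated ENOSPC'));
    },
  });
  return { src, mid, dst };
}

// Resolves with each stream's `destroyed` flag shortly after
// the destination reports its error.
function pipeAndFail() {
  return new Promise((resolve) => {
    const { src, mid, dst } = buildChain();
    dst.on('error', () => {
      // Wait one turn of the event loop so any cleanup that
      // .pipe() was going to do has had a chance to run.
      setImmediate(() => {
        resolve({
          src: src.destroyed,
          mid: mid.destroyed,
          dst: dst.destroyed,
        });
      });
    });
    src.pipe(mid).pipe(dst);
  });
}

pipeAndFail().then((state) => console.log(state));
```

On current Node.js releases this is expected to report src and mid as not destroyed while dst is destroyed: the failed destination cleans up after itself, and the two streams upstream of it are simply left behind.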

stream.pipeline() (added in Node.js 10, and available as a Promise-returning function from node:stream/promises since Node.js 15) solves this by wrapping every stream you pass it in shared error and completion tracking. The moment any stream in the chain emits error, pipeline() calls .destroy() on every stream in the chain that has not already ended or closed, which closes file descriptors and releases internal buffers. It then reports exactly one outcome: the completion callback fires once, or the returned promise settles once. One caveat from the Node.js documentation: pipeline() leaves its own event listeners attached after the callback has run, so streams should not be reused after a failure. Backpressure handling is unchanged, because pipeline() relies on the same pause/drain signalling as .pipe(), so switching to it costs nothing in throughput and removes an entire class of leak.
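To make the cleanup guarantee concrete, here is a small sketch that fails a chain on purpose and inspects it afterwards. It assumes the same kind of in-memory stand-ins as a real file, transform, and write stream; the stream definitions and the name pipelineAndFail are hypothetical.

```javascript
const {
  Readable, PassThrough, Writable, pipeline,
} = require('node:stream');

// Runs a chain whose destination always fails, then reports how
// many times the callback fired and which streams were destroyed.
function pipelineAndFail() {
  return new Promise((resolve) => {
    // Endless in-memory source (illustrative only)
    const src = new Readable({
      read() {
        this.push(Buffer.alloc(1024));
      },
    });
    const mid = new PassThrough();
    // Destination whose first write fails
    const dst = new Writable({
      write(chunk, encoding, callback) {
        callback(new Error('simulated ENOSPC'));
      },
    });

    let calls = 0;
    pipeline(src, mid, dst, (err) => {
      calls += 1;
      // Check on a later turn of the event loop to catch any
      // (unexpected) second invocation of the callback.
      setImmediate(() => {
        resolve({
          calls,
          message: err ? err.message : null,
          destroyed: [src.destroyed, mid.destroyed, dst.destroyed],
        });
      });
    });
  });
}

pipelineAndFail().then((result) => console.log(result));
```

The point of the sketch is the shape of the result: a single callback invocation carrying the destination's error, and every stream in the chain marked as destroyed.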

The diagram below traces both code paths from the same downstream write error to their different memory outcomes:

[Figure: pipe() vs pipeline() on Downstream Error. Left panel (.pipe() chain): source.pipe(gzip).pipe(dest), where dest fails with a write error (disk full); source and gzip are never destroyed, the read fd stays open, nothing drains the gzip internal buffer after dest has died, and RSS and open FDs rise per failure unless every stream is destroyed manually. Right panel (pipeline() chain): the same three streams passed to pipeline(); when dest errors, pipeline() destroys source, gzip, and dest together, the gzip internal buffer is freed when destroy() runs on the transform, RSS and FDs return to baseline, and one callback or one rejected promise reports the failure.]

Step-by-Step Fix

Step 1 — Reproduce the leak with .pipe(). Build the failing chain exactly as it exists in your codebase, for example a file read piped through zlib.createGzip() into a write stream. Attach an error listener to the destination so the failure is reported instead of thrown, then force the downstream write to fail (point it at a read-only path or a full disk). Verification checkpoint: the process does not crash, but the read stream’s error and close events never fire.

Step 2 — Confirm the open file descriptor. While the process is still running, list its open files:

lsof -p $(pgrep -f your-app.js) | grep big.log

Expected output: the source file still appears in the listing minutes after the write failure, confirming the descriptor was never closed.

Step 3 — Replace .pipe() with stream.pipeline(). Import pipeline from node:stream (callback style) or node:stream/promises (async/await), and pass the same streams as positional arguments instead of chaining .pipe() calls. Verification checkpoint: the code change is confined to the call site — no changes needed to the individual stream implementations.

Step 4 — Trigger the identical failure again. Repeat the same disk-full or permission-denied condition used in Step 1. Expected output: the pipeline() callback fires once with the underlying error, or the promise rejects once — never both, and never silently.

Step 5 — Re-check descriptors and memory. Run the same lsof command from Step 2, and log process.memoryUsage().rss before and after the failure in a loop of repeated attempts. Expected output: the file descriptor disappears from lsof immediately after the callback fires, and RSS stays flat across repeated failed attempts instead of climbing.
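Steps 4 and 5 can be scripted so the before/after numbers come from one run. The sketch below is one possible harness, not a definitive one: the temporary file names, the helper countOpenFds, and the function measureFailures are hypothetical, and the descriptor count assumes a Linux-style /proc filesystem (it returns null elsewhere, in which case lsof remains the tool to use).

```javascript
const fs = require('node:fs');
const os = require('node:os');
const path = require('node:path');
const zlib = require('node:zlib');
const { pipeline } = require('node:stream/promises');

// Assumption: Linux exposes this process's descriptors under
// /proc/self/fd. Returns null on platforms without it.
function countOpenFds() {
  try {
    return fs.readdirSync('/proc/self/fd').length;
  } catch {
    return null;
  }
}

// Forces `attempts` failing pipelines and samples RSS and the
// descriptor count before and after the whole batch.
async function measureFailures(attempts) {
  const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'pipeline-check-'));
  const input = path.join(dir, 'input.log');
  fs.writeFileSync(input, 'x'.repeat(1024 * 1024));
  // The parent directory does not exist, so opening the
  // write stream fails on every attempt.
  const badOutput = path.join(dir, 'missing-dir', 'out.gz');

  const before = { rss: process.memoryUsage().rss, fds: countOpenFds() };
  let failures = 0;
  for (let i = 0; i < attempts; i += 1) {
    try {
      await pipeline(
        fs.createReadStream(input),
        zlib.createGzip(),
        fs.createWriteStream(badOutput)
      );
    } catch {
      failures += 1;
    }
  }
  const after = { rss: process.memoryUsage().rss, fds: countOpenFds() };

  fs.rmSync(dir, { recursive: true, force: true });
  return { failures, before, after };
}

measureFailures(100).then((report) => console.log(report));
```

Comparing before.fds with after.fds, and the two rss values, across a batch of failures gives the same signal as the manual lsof check, in a form that can be rerun after each change to the chain.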


Command & Code Reference

Use-case: a .pipe() chain that leaks a file descriptor on write failure.

// LEAKY: .pipe() has no built-in error propagation
const fs = require('node:fs');
const zlib = require('node:zlib');

const src = fs.createReadStream('big.log');
const gz = zlib.createGzip();
const dst = fs.createWriteStream('/full-disk/big.log.gz');

src.pipe(gz).pipe(dst);
// If `dst` errors (e.g. ENOSPC), `src` and `gz` are
// never destroyed. `src`'s read fd stays open, and
// both streams hold on to whatever they had buffered
// because nobody is left to drain it.
dst.on('error', (err) => {
  console.error('write failed:', err.message);
  // src.destroy() and gz.destroy() are missing here —
  // this is the leak.
});

Use-case: the same chain rewritten with stream.pipeline().

// FIXED: pipeline() destroys all streams on any error
const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');

pipeline(
  fs.createReadStream('big.log'),
  zlib.createGzip(),
  fs.createWriteStream('/full-disk/big.log.gz'),
  (err) => {
    // Fires exactly once. By the time this callback
    // runs, pipeline() has already called .destroy()
    // on every stream, closing file descriptors and
    // releasing the gzip transform's internal buffer.
    if (err) {
      console.error('pipeline failed:', err.message);
    } else {
      console.log('pipeline finished, fds released');
    }
  }
);

Use-case: the promise-based form for async/await code paths.

// stream/promises pipeline — same cleanup guarantee
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function compress(inPath, outPath) {
  try {
    await pipeline(
      fs.createReadStream(inPath),
      zlib.createGzip(),
      fs.createWriteStream(outPath)
    );
    // Resolves only once every stream has finished
    // AND been destroyed cleanly.
    console.log('compression complete');
  } catch (err) {
    // Rejects with the first error; all streams in
    // the chain are already destroyed at this point.
    console.error('compression failed:', err.message);
  }
}

compress('access.log', 'access.log.gz');

Verification & Regression Prevention

Track these two metrics before shipping a stream refactor and again after every deploy that touches stream chains:

  • Leaked file descriptors per failed request should be 0. Run lsof -p <pid> | wc -l before and after a batch of 100 forced failures against a staging endpoint; the count must return to its pre-test baseline.
  • RSS growth across 100 repeated failures should stay under 5 MB. Log process.memoryUsage().rss in a loop; a chain that still uses bare .pipe() typically shows several hundred KB to a few MB of growth per failed attempt that never recovers.

Add a lint step to catch regressions before they reach staging. eslint-plugin-n’s n/no-unsupported-features/node-builtins rule confirms that the stream APIs you import are available on the Node.js versions you support; pair it with a custom rule (or a simple grep in a pre-commit hook) that flags any chain of two or more .pipe() calls:

#!/bin/sh
# pre-commit hook: fail if a staged change adds a
# multi-stage .pipe() chain on a single line.
# Only added lines ("+...") are checked, so deleting
# an old chain does not trip the hook.
if git diff --cached -U0 -- '*.js' | \
   grep -qE '^\+[^+].*\.pipe\(.*\)\.pipe\('; then
  echo "Use stream.pipeline() for multi-stage chains" >&2
  exit 1
fi
exit 0

For runtime monitoring, alert when a Node.js process’s rss (from process.memoryUsage(), exported via your metrics agent) grows by more than 10% over a rolling 15-minute window with no corresponding increase in request volume — this is the signature of accumulating dangling buffers from unclosed stream chains rather than expected traffic-driven growth.
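The alert condition above can be expressed as a small pure function, which makes the thresholds easy to unit-test before they are wired into a metrics agent. This is a sketch under stated assumptions: the sample shape ({ t, rss, requests }), the function name rssGrowthAlert, and the simplification of "no corresponding increase in request volume" to "the latest request count is not higher than the oldest one in the window" are all illustrative choices, not part of any monitoring product.

```javascript
// samples: array sorted by time, oldest first. Each entry is
//   { t: timestamp in ms, rss: bytes, requests: count in that interval }
// Returns true when RSS grew by more than `maxGrowth` inside the
// window while request volume did not increase.
function rssGrowthAlert(samples, options = {}) {
  const { windowMs = 15 * 60 * 1000, maxGrowth = 0.1 } = options;
  if (samples.length < 2) return false;

  const latest = samples[samples.length - 1];
  const inWindow = samples.filter((s) => latest.t - s.t <= windowMs);
  const oldest = inWindow[0];
  if (oldest === latest) return false;

  const rssGrowth = (latest.rss - oldest.rss) / oldest.rss;
  const trafficGrew = latest.requests > oldest.requests;
  return rssGrowth > maxGrowth && !trafficGrew;
}

// Example: RSS up 20% in ten minutes with flat traffic.
const MiB = 1024 * 1024;
const samples = [
  { t: 0, rss: 100 * MiB, requests: 50 },
  { t: 10 * 60 * 1000, rss: 120 * MiB, requests: 50 },
];
console.log(rssGrowthAlert(samples)); // prints true
```

In a real deployment the samples would come from process.memoryUsage().rss on a timer plus whatever request counter the service already exports; the function only encodes the comparison.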


Frequently Asked Questions

Does pipeline() eliminate the need to handle backpressure manually?

It changes nothing about backpressure, and neither API asks you to manage it by hand inside the chain. Both .pipe() and pipeline() respect the same highWaterMark and drain/pause signalling internally; pipeline() builds on that plumbing rather than replacing it. What pipeline() adds is guaranteed error propagation and cleanup across every stream in the chain; the actual backpressure mechanics are unchanged.
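One way to see that pipeline() still honours highWaterMark is to pair a fast source with a deliberately slow sink and record how much data the sink ever has queued. The sketch below assumes in-memory streams; the names measureBackpressure, fastSource, and slowSink, and the chosen sizes, are hypothetical.

```javascript
const { Readable, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

async function measureBackpressure() {
  const chunkSize = 1024;
  const total = 256 * chunkSize;
  const highWaterMark = 4 * chunkSize;
  let sent = 0;
  let received = 0;
  let maxBuffered = 0;

  // Produces data as fast as the stream machinery asks for it.
  const fastSource = new Readable({
    read() {
      if (sent >= total) {
        this.push(null);
        return;
      }
      sent += chunkSize;
      this.push(Buffer.alloc(chunkSize));
    },
  });

  // Acknowledges each chunk only on a later event-loop turn,
  // so writes queue up behind it.
  const slowSink = new Writable({
    highWaterMark,
    write(chunk, encoding, callback) {
      maxBuffered = Math.max(maxBuffered, this.writableLength);
      received += chunk.length;
      setImmediate(callback);
    },
  });

  await pipeline(fastSource, slowSink);
  return { total, received, maxBuffered, highWaterMark };
}

measureBackpressure().then((stats) => console.log(stats));
```

All of the data arrives, yet the sink's queued bytes stay in the neighbourhood of its highWaterMark instead of scaling with the total, which is the same behaviour a bare .pipe() would give.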

Can I mix pipe() and pipeline() in the same codebase safely?

Yes, but only where every .pipe() chain has explicit error listeners on each stream that call .destroy() on its neighbours. In practice that discipline is easy to lose during refactors, which is why most teams standardize on pipeline() for any chain with more than one stream, or any chain touching a file descriptor or socket.
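For a sense of what that discipline involves, here is a rough sketch of the wiring a bare .pipe() chain needs to match pipeline()'s cleanup. The helper pipeWithCleanup is hypothetical (it is not a Node.js API), as are the in-memory streams used to exercise it; it is shown to illustrate the amount of bookkeeping, not as a recommended replacement.

```javascript
const { Readable, PassThrough, Writable } = require('node:stream');

// Hypothetical helper: chains streams with .pipe(), and on the
// first error (or on successful finish) destroys every stream
// and reports a single outcome.
function pipeWithCleanup(streams, onDone) {
  let settled = false;
  const finish = (err) => {
    if (settled) return; // report only the first outcome
    settled = true;
    for (const stream of streams) {
      if (!stream.destroyed) stream.destroy();
    }
    onDone(err);
  };

  // Every stream needs its own error listener.
  for (const stream of streams) stream.on('error', finish);
  // Success is signalled by the last stream finishing.
  streams[streams.length - 1].on('finish', () => finish());

  streams.reduce((from, to) => from.pipe(to));
}

function demoManualCleanup() {
  return new Promise((resolve) => {
    const src = new Readable({
      read() {
        this.push(Buffer.alloc(1024));
      },
    });
    const mid = new PassThrough();
    const dst = new Writable({
      write(chunk, encoding, callback) {
        callback(new Error('simulated ENOSPC'));
      },
    });

    let calls = 0;
    pipeWithCleanup([src, mid, dst], (err) => {
      calls += 1;
      setImmediate(() => {
        const leaked = [src, mid, dst].filter((s) => !s.destroyed);
        resolve({
          calls,
          message: err ? err.message : null,
          leaked: leaked.length,
        });
      });
    });
  });
}

demoManualCleanup().then((result) => console.log(result));
```

Every line of pipeWithCleanup is something a refactor can silently drop, which is the practical argument for letting pipeline() own this logic instead.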

Does stream/promises pipeline behave differently from the callback-based stream.pipeline()?

Functionally they are identical — the same destroy-on-error guarantee and the same backpressure handling. The node:stream/promises version returns a Promise instead of accepting a completion callback, which is preferable in async/await code because a rejected pipeline() promise cannot be silently forgotten the way an unchecked callback argument sometimes is.