Code examples for using Node.js Streams can be found in my GitHub repository.

What are Readable Streams?

In Node.js, a Stream is an abstract interface for working with streaming data. A Readable Stream is an abstraction for a source from which data can be consumed. Readable Streams are instances of the stream.Readable class, which is an EventEmitter and emits events that can be listened to (given Node's event-based architecture, most interfaces extend EventEmitter - you'll see this a lot, both here in the streams sections and later when we get to process and signal handling).

Readable Streams are incredibly powerful for handling large amounts of data efficiently, as they allow you to process data piece by piece as it becomes available, rather than waiting for the entire data set to be loaded into memory.
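Because they are EventEmitters, you can get a feel for Readable Streams with just a few event listeners. Here's a minimal sketch (Readable.from, available since Node 12, is simply a convenient way to conjure up a stream from an array for illustration):

const { Readable } = require('stream');

// Readable.from builds a Readable Stream from any iterable.
const stream = Readable.from(['Hello, ', 'streams', '!\n']);

// 'data' fires for each chunk as it becomes available.
stream.on('data', (chunk) => {
  process.stdout.write(chunk);
});

// 'end' fires once there is no more data to consume.
stream.on('end', () => {
  console.log('Done reading.');
});

// 'error' fires if something goes wrong while reading.
stream.on('error', (err) => {
  console.error(err);
});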


Implementing a Readable Stream

To implement a custom Readable Stream, you have two options:

Option 1: Extending the stream.Readable class:

const { Readable } = require('stream');

class MyReadableStream extends Readable {
  constructor(options) {
    super(options);
    this.readCount = 0;
  }

  // _read is called by the stream machinery whenever it wants more data.
  _read(size) {
    this.readCount++;

    if (this.readCount > 10) {
      // push(null) signals that the stream has no more data to emit.
      this.push(null);
    } else {
      // Push asynchronously; _read won't be called again until push() runs.
      setTimeout(() => {
        this.push(`Chunk ${this.readCount}\n`);
      }, 100);
    }
  }
}

const myStream = new MyReadableStream();
myStream.pipe(process.stdout);

Option 2: Passing a read method in the constructor options:

const { Readable } = require('stream');

const myStream = new Readable({
  // read() plays the same role as _read in the subclassing approach.
  read(size) {
    this.readCount = this.readCount || 0;
    this.readCount++;

    if (this.readCount > 10) {
      // Signal the end of the stream.
      this.push(null);
    } else {
      setTimeout(() => {
        this.push(`Chunk ${this.readCount}\n`);
      }, 100);
    }
  }
});

myStream.pipe(process.stdout);

In both approaches, the custom Readable Stream emits chunks of data until it reaches a certain count, after which it signals the end of the stream. In Node.js, the end of the read is signaled and observed as follows:

  • Pushing null: Calling push(null) in the _read or read method signals that there are no more data chunks to be emitted. This is how a Readable Stream indicates to its consumer that the end of the stream has been reached.

  • Listening for the 'end' event: On the consumer side, the end of the read is observed via the 'end' event. Note that you don't emit this event yourself: once push(null) has been called and the remaining buffered data has been consumed, the stream emits 'end' automatically.

When the consumer of the Readable Stream receives the end signal, it can take appropriate actions such as closing file handles, finalizing data processing, or cleaning up resources. Signaling the end of the read is crucial for proper stream handling: it lets consumers detect when all the data has been processed, rather than blocking or waiting indefinitely for more, and lets them wind down gracefully.
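To make this handshake concrete, here's a minimal sketch (reusing the shape of the Option 2 stream above, shortened to three chunks) of a consumer that finalizes its work once 'end' fires:

const { Readable } = require('stream');

const myStream = new Readable({
  read(size) {
    this.readCount = (this.readCount || 0) + 1;
    // push(null) after three chunks signals end-of-stream.
    this.push(this.readCount > 3 ? null : `Chunk ${this.readCount}\n`);
  }
});

myStream.on('data', (chunk) => {
  process.stdout.write(chunk);
});

// Emitted by the stream itself once push(null) has been called and all
// buffered data has been consumed.
myStream.on('end', () => {
  console.log('All chunks received; finalizing.');
});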


Common Readable Streams

There are numerous built-in methods that return a Readable Stream, each designed to handle a different type of data source. Here are some examples:

File System Streams (fs.createReadStream()): These streams are used for reading data from files. They allow you to process large files piece by piece, reducing memory consumption and improving performance.

const fs = require('fs');
const readStream = fs.createReadStream('input.txt'); // reads the file in chunks (64 KiB by default)

HTTP Request Streams (http.IncomingMessage): In an HTTP server, each request is a readable stream. You can process the request body as a stream of data, which is particularly useful for handling large file uploads.

const http = require('http');

const server = http.createServer((req, res) => {
    let body = '';
    // req is a Readable Stream; collect the request body chunk by chunk.
    req.on('data', (chunk) => {
        body += chunk;
    });
    // Respond once the request stream signals that the body is complete.
    req.on('end', () => {
        res.end(`Received ${body.length} bytes\n`);
    });
});

server.listen(3000);
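Since the request is itself a stream, large uploads never need to be buffered in memory at all. As a minimal sketch (the path upload.bin is just a placeholder), you could pipe the request body straight to disk:

const http = require('http');
const fs = require('fs');

const server = http.createServer((req, res) => {
    // Stream the upload to disk without holding it in memory.
    const dest = fs.createWriteStream('upload.bin');
    req.pipe(dest);
    dest.on('finish', () => res.end('Upload complete\n'));
});

server.listen(3000);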

Zlib Streams (zlib.createGunzip(), zlib.createInflate()): These streams are used for decompressing gzip or deflate compressed data. You can pipe a file stream or any other readable stream into a zlib stream to decompress data on the fly.

const fs = require('fs');
const zlib = require('zlib');

const readStream = fs.createReadStream('input.txt.gz');
const unzip = zlib.createGunzip();

// Decompress on the fly and write the result back out.
readStream.pipe(unzip).pipe(fs.createWriteStream('input.txt'));
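One caveat with chained pipe() calls is that errors do not propagate between the streams; each one needs its own 'error' handler. As a sketch of the alternative, the built-in stream.pipeline helper (available since Node 10) wires up error handling and cleanup for the whole chain:

const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream');

pipeline(
  fs.createReadStream('input.txt.gz'),
  zlib.createGunzip(),
  fs.createWriteStream('input.txt'),
  (err) => {
    // Called once, with an error if any stream in the chain failed.
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  }
);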

Crypto Streams (crypto.createDecipheriv()): These streams are used for decrypting data. You can pipe a file stream or any other readable stream into a crypto stream to decrypt data on the fly.

const fs = require('fs');
const crypto = require('crypto');

const algorithm = 'aes-256-cbc';
// NOTE: to decrypt real data, the key and IV must be the same ones used
// during encryption; the random values here are placeholders for illustration.
const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);

const readStream = fs.createReadStream('encrypted.txt');
const decipher = crypto.createDecipheriv(algorithm, key, iv);
readStream.pipe(decipher);
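To see the whole flow end to end, here's a minimal round-trip sketch (the file names are placeholders, and it assumes plain.txt exists) that encrypts a file with createCipheriv and then decrypts it with the same key and IV:

const fs = require('fs');
const crypto = require('crypto');
const { pipeline } = require('stream');

const algorithm = 'aes-256-cbc';
const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);

// Encrypt plain.txt into encrypted.txt...
pipeline(
  fs.createReadStream('plain.txt'),
  crypto.createCipheriv(algorithm, key, iv),
  fs.createWriteStream('encrypted.txt'),
  (err) => {
    if (err) return console.error('Encryption failed:', err);

    // ...then decrypt it back, reusing the same key and IV.
    pipeline(
      fs.createReadStream('encrypted.txt'),
      crypto.createDecipheriv(algorithm, key, iv),
      fs.createWriteStream('decrypted.txt'),
      (err) => {
        if (err) return console.error('Decryption failed:', err);
        console.log('Round trip complete.');
      }
    );
  }
);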

In each of these examples, we create a Readable Stream and consume it, either by piping it into another stream or by listening for its events. In real code you'll also want to handle the 'data', 'end', and 'error' events as appropriate: to process data as it becomes available, to know when there is no more, and to deal with any failures, respectively.

That’s it for this post! In the next post, we’ll take a look at Writable Streams.