Code examples for using Node.js streams can be found in my GitHub repository.
We previously explored Readable streams, which supply data, and Writable streams, which consume it. But what if we need a stream that can do both? Enter Duplex streams.
What are Duplex Streams?
Duplex streams, as the name suggests, are streams that implement both the Readable and Writable interfaces. That means they’re both sources and destinations of data. They’re instances of the stream.Duplex class, and because they combine the functionality of both Readable and Writable streams, they emit the events of both, such as ‘data’ and ‘end’ on the Readable side and ‘drain’ and ‘finish’ on the Writable side.
Duplex streams are useful when data flows in both directions and the two directions are independent of each other, as with TCP sockets. In a TCP connection, the server can send data to the client and the client can send data to the server, and neither flow depends on the other.
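To make that independence concrete, here is a minimal sketch. The helper name runIndependentSides and the sample data are made up for illustration. The Readable side serves its own list of chunks while the Writable side collects whatever is written, and each side signals completion with its own event.

```javascript
const { Duplex } = require('node:stream');

// Hypothetical demo helper (not part of the Node.js API).
function runIndependentSides() {
  return new Promise((resolve) => {
    const outgoing = ['one', 'two']; // data served by the Readable side
    const written = [];              // data collected by the Writable side
    const events = [];               // completion events, in the order they fire
    let read = '';

    const duplex = new Duplex({
      // Readable side: hand out the next queued chunk, or null when done.
      read() {
        this.push(outgoing.length > 0 ? outgoing.shift() : null);
      },
      // Writable side: store the chunk; it never reaches the Readable side.
      write(chunk, encoding, callback) {
        written.push(chunk.toString());
        callback();
      },
    });

    const record = (name) => {
      events.push(name);
      if (events.length === 2) resolve({ written, read, events });
    };

    duplex.on('data', (chunk) => { read += chunk.toString(); });
    duplex.on('end', () => record('end'));       // Readable side is done
    duplex.on('finish', () => record('finish')); // Writable side is done

    duplex.write('hello');
    duplex.end('world');
  });
}

runIndependentSides().then((result) => console.log(result));
```

Nothing that was written shows up in what was read, and ‘end’ and ‘finish’ are reported separately, one per side.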
Implementing a Duplex Stream
You can create a Duplex stream by passing read() and write() functions to the stream.Duplex constructor, or by extending the stream.Duplex class and implementing its _read() and _write() methods. Here’s a simple example using the constructor:
const { Duplex } = require('stream');

const duplexStream = new Duplex({
  // Readable side: emit the letters A to Z, then signal end-of-stream.
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  },
  // Writable side: log every incoming chunk.
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
});

// Start at the character code for 'A'.
duplexStream.currentCharCode = 65;

process.stdin.pipe(duplexStream).pipe(process.stdout);
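In this example, whatever is written to the duplex stream (here, the data piped in from process.stdin; a direct duplexStream.write(data) call has the same effect) is logged to the console. Independently, whenever the stream is read (here, by the pipe to process.stdout; a direct duplexStream.read() call has the same effect), it pushes the next letter of the alphabet. After Z it pushes null, which ends the Readable side of the stream. Note that the output is not derived from the input: the two sides share an object, not data.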
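The other approach mentioned above, extending the class, might look like the following sketch. The class name AlphabetDuplex, its received property, and the collectAlphabet helper are made up for illustration. Keeping the state in the constructor avoids assigning currentCharCode after the stream has been created.

```javascript
const { Duplex } = require('node:stream');

// Hypothetical class-based variant of the alphabet example.
class AlphabetDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.currentCharCode = 65; // character code for 'A'
    this.received = [];        // chunks seen by the Writable side
  }

  // Readable side: push one letter per call, then null after 'Z'.
  _read(size) {
    if (this.currentCharCode > 90) {
      this.push(null);
      return;
    }
    this.push(String.fromCharCode(this.currentCharCode++));
  }

  // Writable side: remember each chunk instead of logging it.
  _write(chunk, encoding, callback) {
    this.received.push(chunk.toString());
    callback();
  }
}

// Reads the whole Readable side and reports what the Writable side saw.
async function collectAlphabet() {
  const stream = new AlphabetDuplex();
  stream.setEncoding('utf8');
  stream.end('written input');

  let letters = '';
  for await (const chunk of stream) {
    letters += chunk;
  }
  return { letters, received: stream.received };
}

collectAlphabet().then((result) => console.log(result));
```

The letters read from the stream are the same whatever is written to it, which again shows that the two sides do not feed each other.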
Common Duplex Streams
TCP Sockets (net.Socket): A TCP socket in Node.js is an instance of a Duplex stream. It can be used to read from and write to the connected peer. Here’s an example:
const net = require('net');

const server = net.createServer((socket) => {
  socket.write('Hello from server!\n');
  socket.on('data', (data) => {
    console.log(data.toString());
  });
});

server.listen(8000);
In this example, socket is a Duplex stream. The server writes a greeting message to the client and listens for data from the client.
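The client end of the connection is a Duplex stream too. The sketch below pairs a server like the one above with a client in the same process so the exchange can be observed from both ends. The function name exchangeGreetings, the loopback address, and the use of an ephemeral port (0) are assumptions made for the demo.

```javascript
const net = require('node:net');

// Hypothetical demo helper: runs one greeting exchange over loopback.
function exchangeGreetings() {
  return new Promise((resolve, reject) => {
    const result = { clientReceived: '', serverReceived: '' };

    const server = net.createServer((socket) => {
      socket.setEncoding('utf8');
      // Writable side of the server socket.
      socket.write('Hello from server!\n');
      // Readable side of the same socket.
      socket.on('data', (data) => { result.serverReceived += data; });
      // With the default allowHalfOpen: false, the socket ends its own
      // writable side once the client has finished sending.
    });
    server.on('error', reject);

    // Port 0 asks the operating system for any free port.
    server.listen(0, '127.0.0.1', () => {
      const { port } = server.address();
      const client = net.connect(port, '127.0.0.1', () => {
        // Send one message, then close the client's writable side.
        client.end('Hello from client!\n');
      });
      client.setEncoding('utf8');
      client.on('data', (data) => { result.clientReceived += data; });
      client.on('error', reject);
      client.on('close', () => server.close(() => resolve(result)));
    });
  });
}

exchangeGreetings().then((result) => console.log(result));
```

Each side writes its greeting without waiting for the other, which is the independence that makes a socket a Duplex rather than a request/response pair.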
HTTP Server Response and Request (http.ServerResponse, http.IncomingMessage): In a Node.js HTTP/1.1 server, the response object (res) implements the Writable stream interface, while the request object (req) is a Readable stream, so neither is a Duplex on its own; the Duplex underneath them is the connection’s socket. In the core HTTP/2 API, however, each request/response exchange is represented by a single Http2Stream object, which is a Duplex stream: the server reads the request body from it and writes the response to it. HTTP/2 also allows for server push, where the server initiates an additional stream to send the client a resource it has not requested yet.
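As a rough illustration of the HTTP/2 case, the sketch below starts a plaintext HTTP/2 server on the loopback interface and echoes the request body back through the same stream object. The function name echoOverHttp2, the unencrypted setup, and the ephemeral port are assumptions made for the demo; browsers only speak HTTP/2 over TLS, so a real deployment would normally use http2.createSecureServer().

```javascript
const http2 = require('node:http2');
const { Duplex } = require('node:stream');

// Hypothetical demo helper: sends `body` and resolves with what came back.
function echoOverHttp2(body) {
  return new Promise((resolve, reject) => {
    let streamIsDuplex = false;

    const server = http2.createServer();
    server.on('stream', (stream) => {
      // One object carries both the request body and the response body.
      streamIsDuplex = stream instanceof Duplex;
      stream.respond({ ':status': 200 });
      stream.pipe(stream); // echo: read the request, write it back
    });
    server.on('error', reject);

    server.listen(0, '127.0.0.1', () => {
      const { port } = server.address();
      const client = http2.connect(`http://127.0.0.1:${port}`);
      client.on('error', reject);

      const req = client.request({ ':method': 'POST', ':path': '/' });
      req.setEncoding('utf8');

      let echoed = '';
      req.on('data', (chunk) => { echoed += chunk; });
      req.on('end', () => {
        client.close();
        server.close();
        resolve({ echoed, streamIsDuplex });
      });

      req.end(body);
    });
  });
}

echoOverHttp2('ping').then((result) => console.log(result));
```

The client-side req object is a Duplex as well, which is why the same variable is both written to with end() and read from with the ‘data’ event.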
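WebSockets: In contrast to HTTP, where the server responds only when the client sends a request, WebSockets allow for two-way communication between the client and server. In the popular ws package, a WebSocket connection is an event emitter rather than a stream instance, but it follows the same duplex pattern, and the package’s createWebSocketStream() function can wrap a connection in a real Duplex stream. Here’s an example using the event-based API of the ws package: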
const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
  ws.on('message', (message) => {
    // In ws 8 and later, messages arrive as Buffers, so convert explicitly.
    console.log('Received: %s', message.toString());
  });
  ws.send('Hello from server!');
});
In this example, ws plays the role of a Duplex stream, although it is an event emitter rather than a stream instance: the server listens for messages from the client and sends messages to the client over the same connection.
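Socket.io: socket.io is a library that allows real-time, bidirectional, and event-based communication between the browser and the server. A socket.io socket is an event emitter rather than a Duplex stream instance, but it offers the same two-way pattern on top of a bidirectional transport. Here’s an example: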
const io = require('socket.io')(3000);

io.on('connection', (socket) => {
  socket.on('client event', (data) => {
    console.log(data);
  });
  socket.emit('server event', { hello: 'world' });
});
In this example, socket is used in a duplex fashion, even though it is not a stream instance. The server listens for ‘client event’ from the client and sends ‘server event’ to the client.
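gRPC (grpc.ServerDuplexStream): gRPC is a high-performance, open-source RPC framework originally developed by Google. It is designed to enable the development of efficient and scalable microservices. In a bidirectional streaming RPC, the server handler receives a call object that is a Duplex stream: it reads the client’s messages from it and writes responses to it.
Here’s an example. It uses @grpc/grpc-js, the replacement for the deprecated grpc package, and assumes a chat.proto file that defines a Chat service with a bidirectional streaming chat method:
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');

const packageDefinition = protoLoader.loadSync('chat.proto');
const chat_proto = grpc.loadPackageDefinition(packageDefinition).chat;

const server = new grpc.Server();
server.addService(chat_proto.Chat.service, {
  chat: (call) => {
    // Readable side: messages streamed by the client.
    call.on('data', (message) => {
      console.log('Client: ' + message.text);
      // Writable side: messages streamed back to the client.
      call.write({ text: 'Hello from server' });
    });
    call.on('end', () => {
      call.end();
    });
  }
});

// bindAsync() replaces the synchronous bind() of the old grpc package.
// Older @grpc/grpc-js releases also need server.start() in the callback.
server.bindAsync('0.0.0.0:9000', grpc.ServerCredentials.createInsecure(), (err, port) => {
  if (err) throw err;
  console.log('gRPC server listening on port ' + port);
});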
In this example, call is a Duplex stream. The server listens for messages from the client and sends messages back to the client.
These examples show how Duplex streams, and the bidirectional pattern they embody, are used in popular high-level packages to provide a flexible interface for two-way data flow.
Now that we’ve covered Duplex streams, let’s take a look at the last of the streams on our list - Transform streams, which are a special type of Duplex stream.