Learn The Basics Of Duplex Streams

Duplex Streams for Reading And Writing

duplex streams implement both the readable and writable streams in one interface.
Duplex streams can both "listen" for data events, like a readStream, and can also write() to the stream, like writeStreams.

An Example Of A Duplex Stream with the net module

Here, the net module is used as a quick example of a duplex stream.
The server:

  • built with createServer
  • listens for connections
  • on an interval writes server beat
  • calls the end internally when the client disconnects

The client

  • written with the connect method
  • includes the write method, essentially writing to itself
  • listens for data, here from the server, and logs a string with the data
  • after 2 intervals will signal to itself all done, then call the .end() method, signaling to the server that this client instance is done connecting to the server
const { createServer, connect } = require('net');

const TIMEOUTS = {
  serverBeat: 1000,
  client: {
    allDone: 3250,
    end: 250
  },
};

// SERVER
createServer((socket) => {
  
  const interval = setInterval(() => {
    console.log('server writing beat')
    
    socket.write('server beat');
  }, TIMEOUTS.serverBeat);

  socket.on('data', (data) => {
    socket.write(data.toString().toUpperCase());
  });
  
  socket.on('end', () => {
    console.log('server socket ended!')
    
    clearInterval(interval);
  });
})
  .listen(3000);


// CLIENT
const netClient = connect(3000)

netClient.on('data', (data) => {
  console.log('client received data:', data.toString());
});
netClient.write('starting client');

setTimeout(() => {
  netClient.write('all done');
  setTimeout(() => {
    console.log('client .end() here')
    
    netClient.end();
  }, TIMEOUTS.client.end);
}, TIMEOUTS.client.allDone);

Running the above will output...

client received data: STARTING CLIENT
server writing beat
client received data: server beat
server writing beat
client received data: server beat
server writing beat
client received data: server beat
client received data: ALL DONE
client .end() here
server socket ended!

An Example Of A Duplex Stream with the native gzip module

const { createGzip } = require('zlib');

const gzipStream = createGzip();

function onData(data) {
  const base64Data = data.toString('base64');
  console.log('base64Data :', base64Data);
  
}

gzipStream.on('data', onData);
gzipStream.write('a string here');
setTimeout(() => {
  gzipStream.end('another string here');
}, 500);

Running the above will output:

base64Data : H4sIAAAAAAAAEw==
base64Data : S1QoLinKzEtXyEgtSk3Myy8B0shCAA8rMZwgAAAA

An Example of A Duplex Stream with the native crypto module

const { Transform } = require('stream');
const { scrypt } = require('crypto');
const CRYPTO_SALT = 'random-string-here'
const dataToWrite = ['first string', 'second string', 'third string'];

const createTransformStream = () => {
  return new Transform({
    decodeStrings: false,
    encoding: 'hex',
    transform(chunk, enc, next) {
      scrypt(chunk, CRYPTO_SALT, 32, (err, key) => {
        if (err) {
          next(err);
          return;
        }
        next(null, key);
      });
    },
  });
};
const transformStream = createTransformStream();

transformStream.on('data', (data) => {
  console.log('got data:', data);
});

dataToWrite.forEach((str, itmIdx) => {
  if (itmIdx !== dataToWrite.length - 1) {
    transformStream.write(str);
  } else {
    transformStream.end(str);
  }
});

An Example using the finished utility

This is a minor adjustment of the above net server example

const { createServer, connect } = require('net');
const { finished } = require('stream')
const TIMEOUTS = {
  serverBeat: 1000,
  client: {
    allDone: 3250,
    end: 250
  },
};

// SERVER
createServer((socket) => {
  
  const interval = setInterval(() => {
    console.log('server writing beat')
    
    socket.write('server beat');
  }, TIMEOUTS.serverBeat);

  socket.on('data', (data) => {
    socket.write(data.toString().toUpperCase());
  });
  
  // instead of using...
  // socket.on('end', () => {
  //   console.log('server socket ended!') 
  //   clearInterval(interval);
  // });

  // here is the 'finished' function in action
  finished(socket, (err) => {
    if (err) {
      console.error('there was a socket error', err)
    }
    clearInterval(interval) 
  })
})
  .listen(3000);
Page Tags:
node
streams
core