Home

Duplex Streams for Reading And Writing

Duplex streams implement both the Readable and Writable stream interfaces in a single object.
A duplex stream can "listen" for data events, like a read stream, and can also be written to with write(), like a write stream.

An Example Of A Duplex Stream with the net module

Here, the net module is used as a quick example of a duplex stream.
The server:

  • built with createServer
  • listens for connections
  • writes server beat to each connected client on an interval
  • stops that interval when the socket's end event fires, i.e. when the client disconnects

The client

  • written with the connect method
  • calls the write method on its own socket, which sends the data to the server
  • listens for data, here from the server, and logs a string with the data
  • after a first timeout writes all done, then after a second, shorter timeout calls the .end() method, signaling to the server that this client instance is done writing to the server
const { createServer, connect } = require('net');

// Delays, in milliseconds, that pace the server and the client in this demo.
const TIMEOUTS = {
  // gap between consecutive 'server beat' writes
  serverBeat: 1000,
  client: {
    // delay before the client writes 'all done'
    allDone: 3250,
    // further delay before the client calls .end()
    end: 250,
  },
};

// SERVER
// Every incoming connection gets its own heartbeat timer and an upper-casing echo.
createServer((socket) => {
  // Write a heartbeat to this client once per TIMEOUTS.serverBeat milliseconds.
  const interval = setInterval(() => {
    console.log('server writing beat');

    socket.write('server beat');
  }, TIMEOUTS.serverBeat);

  // Echo whatever the client sends back to it, upper-cased.
  socket.on('data', (data) => {
    socket.write(data.toString().toUpperCase());
  });

  // The client has finished writing (it called .end()).
  socket.on('end', () => {
    console.log('server socket ended!');

    clearInterval(interval);
  });

  // An 'error' event with no listener is thrown as an uncaught exception, so a
  // single reset connection would otherwise take the whole server down.
  socket.on('error', (err) => {
    console.error('server socket error:', err.message);
  });

  // 'close' is emitted after 'end' and also directly after 'error', so the
  // heartbeat is stopped even when the connection never ends cleanly.
  socket.on('close', () => {
    clearInterval(interval);
  });
})
  .listen(3000);


// CLIENT
// One socket is used for both directions: it is read from and written to.
const netClient = connect(3000);

netClient.on('data', (chunk) => {
  const text = chunk.toString();
  console.log('client received data:', text);
});
netClient.write('starting client');

// Last step of the demo: log, then call .end() on the socket.
const hangUp = () => {
  console.log('client .end() here');

  netClient.end();
};

// Send the goodbye message first, and only hang up a little later.
setTimeout(() => {
  netClient.write('all done');
  setTimeout(hangUp, TIMEOUTS.client.end);
}, TIMEOUTS.client.allDone);

Running the above will output...

client received data: STARTING CLIENT
server writing beat
client received data: server beat
server writing beat
client received data: server beat
server writing beat
client received data: server beat
client received data: ALL DONE
client .end() here
server socket ended!

An Example Of A Duplex Stream with the native zlib module (gzip)

const { createGzip } = require('zlib');

// Logs each compressed chunk the gzip stream emits, rendered as base64 text.
function onData(data) {
  console.log('base64Data :', data.toString('base64'));
}

// The gzip stream is written to (plain text in) and read from (compressed bytes out).
const gzipStream = createGzip();
gzipStream.on('data', onData);

gzipStream.write('a string here');

// Half a second later, write the final string and end the stream.
setTimeout(() => gzipStream.end('another string here'), 500);

Running the above will output:

base64Data : H4sIAAAAAAAAEw==
base64Data : S1QoLinKzEtXyEgtSk3Myy8B0shCAA8rMZwgAAAA

An Example of A Duplex Stream with the native crypto module

const { Transform } = require('stream');
const { scrypt } = require('crypto');
// Salt passed to scrypt for every chunk.
const CRYPTO_SALT = 'random-string-here';
// Sample inputs that are written to the transform stream.
const dataToWrite = ['first string', 'second string', 'third string'];

/**
 * Builds a Transform stream that replaces each chunk written to it with a
 * 32-byte scrypt key derived from that chunk and CRYPTO_SALT.
 *
 * - `decodeStrings: false` hands written strings to `transform` as strings
 *   instead of converting them to Buffers first.
 * - `encoding: 'hex'` makes the readable side emit hex strings.
 *
 * @returns {Transform} a new, independent transform stream
 */
const createTransformStream = () => {
  const options = {
    decodeStrings: false,
    encoding: 'hex',
    transform(chunk, _encoding, done) {
      // scrypt is asynchronous; `done` is called only once the key is ready,
      // so chunks are processed one at a time and in order.
      scrypt(chunk, CRYPTO_SALT, 32, (error, derivedKey) => {
        if (error) {
          done(error);
        } else {
          done(null, derivedKey);
        }
      });
    },
  };

  return new Transform(options);
};
const transformStream = createTransformStream();

// Print every derived key as it comes out of the readable side.
transformStream.on('data', (data) => {
  console.log('got data:', data);
});

// Write every string; the last one goes through end() so the stream is
// closed for writing in the same call.
for (const [position, text] of dataToWrite.entries()) {
  const isLast = position === dataToWrite.length - 1;
  if (isLast) {
    transformStream.end(text);
  } else {
    transformStream.write(text);
  }
}

An Example using the finished utility

This is a minor adjustment of the net server example above: the socket's 'end' listener is replaced with the finished utility from the stream module.

const { createServer, connect } = require('net');
const { finished } = require('stream')
// Delays, in milliseconds, that pace this demo.
const TIMEOUTS = {
  // gap between consecutive 'server beat' writes
  serverBeat: 1000,
  client: {
    allDone: 3250,
    end: 250,
  },
};

// SERVER
createServer((socket) => {
  // Heartbeat: write to this client once per TIMEOUTS.serverBeat milliseconds.
  const beatTimer = setInterval(() => {
    console.log('server writing beat');

    socket.write('server beat');
  }, TIMEOUTS.serverBeat);

  // Echo incoming data back, upper-cased.
  socket.on('data', (chunk) => {
    const shouted = chunk.toString().toUpperCase();
    socket.write(shouted);
  });

  // finished() takes the place of a hand-written
  //   socket.on('end', () => { ...; clearInterval(...) })
  // listener. Its callback runs once the socket is done, and receives an
  // error when the socket did not finish cleanly.
  const onSocketDone = (err) => {
    if (err) {
      console.error('there was a socket error', err);
    }
    clearInterval(beatTimer);
  };
  finished(socket, onSocketDone);
})
  .listen(3000);
Tags: