Learn The Basics Of Duplex Streams
Duplex Streams for Reading And Writing
Duplex streams implement both the Readable and Writable stream interfaces in a single object.
A duplex stream can "listen" for data events, like a read stream, and can also be written to with write(), like a write stream.
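To make the two sides concrete, here is a minimal sketch of a hand-built duplex stream using the Duplex class from the stream module. The helper name createDemoDuplex and the sample strings are hypothetical, chosen only for illustration. Note that in this sketch the readable and writable sides are independent: what is written does not come back out of the readable side.

```javascript
const { Duplex } = require('stream');

// Hypothetical helper (not from the article): a duplex stream whose
// readable side emits a fixed list of strings and whose writable side
// records whatever is written to it.
function createDemoDuplex(toEmit, received) {
  const queue = [...toEmit];
  return new Duplex({
    // Readable side: push the next queued string, or null to signal the end.
    read() {
      this.push(queue.length > 0 ? queue.shift() : null);
    },
    // Writable side: store each incoming chunk, then accept the next one.
    write(chunk, encoding, callback) {
      received.push(chunk.toString());
      callback();
    },
  });
}

const received = [];
const emitted = [];
const duplex = createDemoDuplex(['one', 'two'], received);

// Listen for data, as with a readable stream...
duplex.on('data', (chunk) => emitted.push(chunk.toString()));
duplex.on('end', () => console.log('readable side emitted:', emitted));

// ...and write to it, as with a writable stream.
duplex.on('finish', () => console.log('writable side received:', received));
duplex.write('hello');
duplex.end('goodbye');
```

Streams such as net sockets connect the two sides through some underlying resource, which is what the examples in the rest of this page rely on.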
An Example Of A Duplex Stream with the net module
Here, the net module is used as a quick example of a duplex stream: each socket it creates is both readable and writable.
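The server:
- is built with createServer
- listens for connections
- writes server beat to the socket on an interval
- uppercases any data it receives and writes it back to the client
- listens for the end event, emitted when the client disconnects, and clears the interval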
The client:
- is created with the connect method
- calls the write method to send data to the server
- listens for data events, here from the server, and logs a string with the data
- after 3250 ms (three server beats) writes all done, then calls the .end() method, signaling to the server that this client instance is done with the connection
const { createServer, connect } = require('net');

const TIMEOUTS = {
  serverBeat: 1000,
  client: {
    allDone: 3250,
    end: 250,
  },
};

// SERVER
createServer((socket) => {
  const interval = setInterval(() => {
    console.log('server writing beat');
    socket.write('server beat');
  }, TIMEOUTS.serverBeat);

  socket.on('data', (data) => {
    socket.write(data.toString().toUpperCase());
  });

  socket.on('end', () => {
    console.log('server socket ended!');
    clearInterval(interval);
  });
}).listen(3000);

// CLIENT
const netClient = connect(3000);

netClient.on('data', (data) => {
  console.log('client received data:', data.toString());
});

netClient.write('starting client');

setTimeout(() => {
  netClient.write('all done');
  setTimeout(() => {
    console.log('client .end() here');
    netClient.end();
  }, TIMEOUTS.client.end);
}, TIMEOUTS.client.allDone);
Running the above will output:
client received data: STARTING CLIENT
server writing beat
client received data: server beat
server writing beat
client received data: server beat
server writing beat
client received data: server beat
client received data: ALL DONE
client .end() here
server socket ended!
An Example Of A Duplex Stream with the native zlib module (gzip)
const { createGzip } = require('zlib');

const gzipStream = createGzip();

function onData(data) {
  const base64Data = data.toString('base64');
  console.log('base64Data :', base64Data);
}

gzipStream.on('data', onData);
gzipStream.write('a string here');

setTimeout(() => {
  gzipStream.end('another string here');
}, 500);
Running the above will output:
base64Data : H4sIAAAAAAAAEw==
base64Data : S1QoLinKzEtXyEgtSk3Myy8B0shCAA8rMZwgAAAA
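Because a gzip stream is both writable and readable, its output can be fed straight into another duplex stream. The following is a rough sketch, not part of the original example, that pipes createGzip into createGunzip (also from zlib) to check that the compressed data can be recovered. The helper name gzipRoundTrip is hypothetical.

```javascript
const { createGzip, createGunzip } = require('zlib');

// Hypothetical helper: compress a string with gzip, decompress it again,
// and resolve with the recovered text.
function gzipRoundTrip(text) {
  return new Promise((resolve, reject) => {
    const gzipStream = createGzip();
    const gunzipStream = createGunzip();
    const chunks = [];

    gzipStream.on('error', reject);
    gunzipStream.on('error', reject);

    // Collect the decompressed chunks and join them once the stream ends.
    gunzipStream.on('data', (chunk) => chunks.push(chunk));
    gunzipStream.on('end', () => resolve(Buffer.concat(chunks).toString()));

    // The readable side of gzipStream feeds the writable side of gunzipStream.
    gzipStream.pipe(gunzipStream);
    gzipStream.end(text);
  });
}

gzipRoundTrip('a string here').then((result) => {
  console.log('round trip result:', result);
});
```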
An Example Of A Duplex Stream with the native crypto module
const { Transform } = require('stream');
const { scrypt } = require('crypto');

const CRYPTO_SALT = 'random-string-here';
const dataToWrite = ['first string', 'second string', 'third string'];

const createTransformStream = () => {
  return new Transform({
    decodeStrings: false,
    encoding: 'hex',
    transform(chunk, enc, next) {
      scrypt(chunk, CRYPTO_SALT, 32, (err, key) => {
        if (err) {
          next(err);
          return;
        }
        next(null, key);
      });
    },
  });
};

const transformStream = createTransformStream();

transformStream.on('data', (data) => {
  console.log('got data:', data);
});

dataToWrite.forEach((str, itmIdx) => {
  if (itmIdx !== dataToWrite.length - 1) {
    transformStream.write(str);
  } else {
    transformStream.end(str);
  }
});
An Example using the finished utility
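This is a minor adjustment of the net server example above: the server uses finished from the stream module, instead of an end listener, to clean up its interval.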
const { createServer, connect } = require('net');
const { finished } = require('stream');

const TIMEOUTS = {
  serverBeat: 1000,
  client: {
    allDone: 3250,
    end: 250,
  },
};

// SERVER
createServer((socket) => {
  const interval = setInterval(() => {
    console.log('server writing beat');
    socket.write('server beat');
  }, TIMEOUTS.serverBeat);

  socket.on('data', (data) => {
    socket.write(data.toString().toUpperCase());
  });

  // instead of using...
  // socket.on('end', () => {
  //   console.log('server socket ended!')
  //   clearInterval(interval);
  // });

  // here is the 'finished' function in action
  finished(socket, (err) => {
    if (err) {
      console.error('there was a socket error', err);
    }
    clearInterval(interval);
  });
}).listen(3000);
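One reason to reach for finished is that its callback runs both when a stream completes normally and when it fails, so cleanup code only needs to live in one place. The sketch below illustrates this with two PassThrough streams standing in for sockets. The helper name watchStream and the error message are hypothetical, used only for this illustration.

```javascript
const { PassThrough, finished } = require('stream');

// Hypothetical helper: resolve with a label describing how the stream ended.
function watchStream(stream) {
  return new Promise((resolve) => {
    finished(stream, (err) => {
      resolve(err ? `failed: ${err.message}` : 'completed');
    });
  });
}

// Case 1: the stream ends normally.
const ok = new PassThrough();
const okResult = watchStream(ok);
ok.resume(); // drain the readable side so the stream can reach its end
ok.end('some data');

// Case 2: the stream is destroyed with an error before it completes.
const broken = new PassThrough();
const brokenResult = watchStream(broken);
broken.destroy(new Error('connection lost'));

Promise.all([okResult, brokenResult]).then((results) => {
  console.log(results);
});
```

In the server above, this means the interval is cleared whether the client disconnects cleanly or the socket errors out.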