The legend of the disappearing data in Node.js
Happy leap day! :D

_(Above: A nice green tree frog - source)_
Recently, I've been doing a bunch of work in Node.js streaming large amounts of data. For the most part the experience has been highly pleasurable, as Node.js makes it so easy! I have encountered a few pain points though, the most significant of which I'd like to talk about here.
In Node.js, streams come in 3 main forms:
- Readable Streams
- Writable Streams
- Transform Streams
In addition, you can either plug streams together with the .pipe() method, write to them directly with the .write() method, or any combination thereof - allowing you to build up a chain of streams that enables data to flow through your program.
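To illustrate how these fit together, here's a toy pipeline (all the names here are made up for the example): a `Readable` source pipes through a `Transform` that doubles each number, into a `Writable` sink that collects the results.

```javascript
import { Readable, Transform, Writable } from 'stream';

// A source that emits a few numbers as strings
const source = Readable.from(['1', '2', '3']);

// A transform that doubles each number as it flows through
const double = new Transform({
	transform(chunk, _encoding, callback) {
		callback(null, String(Number(chunk.toString()) * 2) + '\n');
	}
});

// A sink that collects everything written to it
const results = [];
const sink = new Writable({
	write(chunk, _encoding, callback) {
		results.push(chunk.toString());
		callback();
	}
});

// Plug the chain together: data flows source → double → sink
source.pipe(double).pipe(sink);
```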
The problems start when you try and write large amounts of data to a stream directly:
```javascript
import fs from 'fs';

import do_work from 'somewhere';
import get_some_stream from 'somewhere_else';

let stream_in = get_some_stream();
let out = fs.createWriteStream("/tmp/test.txt");

for(let i = 0; i < 1000000; i++) {
	out.write(do_work(stream_in, i));
}
```
_(Above: Just an example of writing lots of data to a stream)_
When this happens, you start to lose random chunks of data. The reason for this is not obvious, but it is buried in the Node.js docs:
> The `writable.write()` method writes some data to the stream, and calls the supplied `callback` once the data has been fully handled. If an error occurs, the `callback` may or may not be called with the error as its first argument. To reliably detect write errors, add a listener for the `'error'` event.
This is a huge pain. It means that you have to wrap all write calls like this:
```javascript
"use strict";

/**
 * Writes data to a stream, automatically waiting for the drain event if necessary.
 * @param	{stream.Writable}			stream_out	The writable stream to write to.
 * @param	{string|Buffer|Uint8Array}	data		The data to write.
 * @return	{Promise}	A promise that resolves when writing is complete.
 */
function write_safe(stream_out, data) {
	return new Promise((resolve, reject) => {
		// Handle errors
		let handler_error = (error) => {
			stream_out.off("error", handler_error);
			reject(error);
		};
		stream_out.on("error", handler_error);
		
		if(stream_out.write(data)) {
			// We're good to go
			stream_out.off("error", handler_error);
			resolve();
		}
		else {
			// We need to wait for the drain event before continuing
			stream_out.once("drain", () => {
				stream_out.off("error", handler_error);
				resolve();
			});
		}
	});
}

export { write_safe };
```
That's a lot of boilerplate for such a simple task! Basically, if the .write() method returns false, you have to wait until the drain event is fired on the writable stream before continuing to write to it. The reason for this, I think, is that a false return value signals that the stream's internal write buffer is full, and it needs to drain before writing can safely continue.
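This buffering behaviour is easy to observe directly by checking .write()'s return value. Here's a contrived sketch (the tiny highWaterMark and the artificially slow writer are purely for demonstration):

```javascript
import { Writable } from 'stream';

// A deliberately slow Writable with a tiny internal buffer,
// so that backpressure kicks in almost immediately
const slow = new Writable({
	highWaterMark: 4, // buffer at most 4 bytes
	write(_chunk, _encoding, callback) {
		setTimeout(callback, 10); // pretend each write takes a while
	}
});

const first = slow.write("ab");    // the buffer still has room
const second = slow.write("cdef"); // now we're over the high water mark
console.log(first, second);

slow.on("drain", () => {
	// The buffer has emptied: it's safe to write again
	console.log("drained");
});
```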
This is ok, but it would be nice if this was abstracted away behind a single method, such as the wrapper I've shown above. Something like an async stream.Writable.writeAsync() would be great, but it doesn't currently exist.
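With a wrapper like this in place, the lossy loop from earlier can be rewritten as an await on every write. Here's a sketch that inlines a minimal copy of write_safe so it runs standalone (the filename and line contents are arbitrary):

```javascript
import fs from 'fs';
import { once } from 'events';

// Minimal inline copy of the write_safe wrapper from above
function write_safe(stream_out, data) {
	return new Promise((resolve, reject) => {
		const handler_error = (error) => {
			stream_out.off("error", handler_error);
			reject(error);
		};
		stream_out.on("error", handler_error);
		if(stream_out.write(data)) {
			stream_out.off("error", handler_error);
			resolve();
		}
		else {
			stream_out.once("drain", () => {
				stream_out.off("error", handler_error);
				resolve();
			});
		}
	});
}

// The write loop from earlier, awaiting every write so that we
// pause whenever the stream's internal buffer fills up
const out = fs.createWriteStream("/tmp/test.txt");
for(let i = 0; i < 10000; i++) {
	await write_safe(out, `line ${i}\n`);
}
out.end();
await once(out, "finish"); // wait until everything is flushed to disk
```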
I think I'm going to open an issue about it - since it seems very doable and just silly that it doesn't exist already.