Node.js — Filter Data in Streams

Streams in Node.js are a powerful mechanism for processing data, especially when you work with large data sets and want to filter out chunks that don’t match a given criterion.

Example: Loading Lines From File to Database

Imagine the following example: you want to load a file with rows of data into a database. Your requirement is that each row has a value in the “name” column. If there’s no value, you want to skip it.

your-example-file.csv

name      , department  
-------------------------
Marcus    , streaming  
          , joking        <-- filter this line
Norman    , programming  
Christian , programming  

The task is to filter out the second data row because its name is missing.

How to Filter Data in Streams

Transform streams are a way to filter data in streams. Implement the _transform() function to pass through chunks that match your condition. In case the condition doesn’t match, omit the current chunk and proceed with the next one.

Within the _transform method, you can either call this.push() to pass data through or hand the chunk to the next callback as its second argument (the first argument is reserved for an error). Here’s a code snippet that uses the next callback:

const { Transform } = require('stream')

class Filter extends Transform {

  constructor() {
    super({
      readableObjectMode: true,
      writableObjectMode: true
    })
  }

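  _transform(chunk, encoding, next) {
    // pass the chunk downstream when it has a name
    if (this.has(chunk.name)) {
      return next(null, chunk)
    }

    // otherwise drop the chunk and ask for the next one
    next()
  }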

  has(value) {
    return !!value
  }
}

module.exports = Filter  

Notice that the chunk passes through your transform if it matches the condition. If not, you’re notifying Node.js to process the next chunk by calling next().

The object mode configuration in the constructor ensures that the chunks in _transform are JavaScript objects and not buffers. It also improves the readability of the example 😃
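For comparison, here is a minimal sketch of the same filter written with this.push() instead of the second argument of next. The class name PushFilter is only an illustrative choice and not part of the original example:

```javascript
const { Transform } = require('stream')

// Hypothetical variant of the filter that uses this.push()
class PushFilter extends Transform {

  constructor() {
    super({
      readableObjectMode: true,
      writableObjectMode: true
    })
  }

  _transform(chunk, encoding, next) {
    if (chunk.name) {
      // hand the chunk to the readable side of the transform
      this.push(chunk)
    }

    // signal that this chunk is done, whether it was pushed or not
    next()
  }
}
```

Both styles behave the same for a simple filter. this.push() becomes handy when a single incoming chunk should result in more than one outgoing chunk, because you can call it several times before calling next().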

Usage

A way of integrating the filter in your stream is to .pipe your input to the transformer and from there to the destination stream:

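const Fs = require('fs')
const Filter = require('./your-filter-transform')

// source and destination are placeholders for your own file paths
const transformer = new Filter()
const input = Fs.createReadStream(source)
// the destination must be writable, so use createWriteStream here
const output = Fs.createWriteStream(destination)

input.pipe(transformer).pipe(output)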
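Keep in mind that the filter runs in object mode, so it expects parsed row objects rather than raw file bytes. The following is a rough end-to-end sketch under a few assumptions: the file is plain comma-separated text with a single header line (and no separator line), and the database is stood in for by an in-memory array. The names parseRows and loadRows are hypothetical helpers made up for this illustration:

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream')
const readline = require('readline')

// Same filter idea as above, repeated so this sketch runs on its own
class Filter extends Transform {
  constructor() {
    super({ readableObjectMode: true, writableObjectMode: true })
  }

  _transform(chunk, encoding, next) {
    if (chunk.name) {
      return next(null, chunk)
    }

    next()
  }
}

// Hypothetical helper: turns "name , department" lines into row objects
async function* parseRows(input) {
  const lines = readline.createInterface({ input, crlfDelay: Infinity })
  let isHeader = true

  for await (const line of lines) {
    if (isHeader) {
      // assumption: the first line holds the column names
      isHeader = false
      continue
    }
    if (!line.trim()) {
      continue
    }

    const [name = '', department = ''] = line.split(',').map(value => value.trim())
    yield { name, department }
  }
}

// Hypothetical runner: resolves with the rows that passed the filter
function loadRows(input) {
  const saved = []

  // stand-in for a real database: collects rows in memory
  const database = new Writable({
    objectMode: true,
    write(row, encoding, done) {
      saved.push(row)
      done()
    }
  })

  return new Promise((resolve, reject) => {
    pipeline(Readable.from(parseRows(input)), new Filter(), database, err => {
      return err ? reject(err) : resolve(saved)
    })
  })
}
```

You could call it with loadRows(Fs.createReadStream(source)) and replace the in-memory writable with a stream that writes to your actual database. The sketch uses pipeline instead of chained .pipe calls so that an error in any stream rejects the returned promise.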

That’s it! A transform stream is a great way to receive data from an input source, read the chunk, process it, and pass the changed value to the destination.

Enjoy coding & make it rock!
