
Recipe: splitting objects


Problem

Frequently we need to split objects so that different properties go through different pipelines. Obviously, this could be done at the data-processing level, but it would complicate the code. It could also be implemented as separate programs that each pick different features, but then we would spend extra time parsing the same stream over and over.

The problem was stated in #99.

Solution

One way to improve the separation is to split at the stream level. For example, it can be done after parsing.

Example

We have data in data-99.json (clearly fake to demonstrate the principle):

{
  "results": [
    {"data": "data #1", "metadata": "metadata #1", "otherdata": "otherdata #1"},
    {"data": "data #2", "metadata": "metadata #2", "otherdata": "otherdata #2"},
    {"data": "data #3", "metadata": "metadata #3", "otherdata": "otherdata #3"}
  ]
}

We want to process data, metadata, and otherdata differently, ignoring the rest. For the sake of argument, we want to save them in different files as JSONL for further processing.

It can be done with this simple program:

const fs = require('fs');

const {chain} = require('stream-chain');

const {parser} = require('stream-json');
const {pick} = require('stream-json/filters/Pick');
const {streamValues} = require('stream-json/streamers/StreamValues');
const {stringer} = require('stream-json/jsonl/Stringer');

const processSection = (source, sectionName) =>
  chain([
    source,
    pick({filter: new RegExp('\\b' + sectionName + '\\b')}),
    streamValues(),
    data => (console.log(sectionName.toUpperCase() + ':', data), data.value)
    // , stringer(),
    // fs.createWriteStream(`data-99-${sectionName}-sample.jsonl`)
  ]);

const main = () => {
  const parsed = chain([
    fs.createReadStream('./data-99.json', {encoding: 'utf8'}),
    parser(),
    pick({filter: 'results'})
  ]);

  // process different parts in parallel
  processSection(parsed, 'data');
  processSection(parsed, 'metadata');
  processSection(parsed, 'otherdata');
};

try {
  main();
} catch (error) {
  console.error('ERROR:', error);
}

It produces the following output:

DATA: { key: 0, value: 'data #1' }
METADATA: { key: 0, value: 'metadata #1' }
OTHERDATA: { key: 0, value: 'otherdata #1' }
DATA: { key: 1, value: 'data #2' }
METADATA: { key: 1, value: 'metadata #2' }
OTHERDATA: { key: 1, value: 'otherdata #2' }
DATA: { key: 2, value: 'data #3' }
METADATA: { key: 2, value: 'metadata #3' }
OTHERDATA: { key: 2, value: 'otherdata #3' }

Note that in some cases the relative order of the DATA, METADATA, and OTHERDATA lines can differ. The only guarantee is that data #1 will always come before data #2, and so on. In most cases, e.g., when writing to a file, it doesn't matter.

Discussion

The idea is to separate a stream into substreams and combine them accordingly.

In this particular example we have 4 substreams:

  • The main one reads from a file, parses it, and picks the part we need.
    • Optionally it can clean the data up further using ignore(), replace(), more pick() filters, and so on (see the first sketch after this list).
    • It may have more custom steps or streamers like streamArray().
    • The idea is that it produces a stream in a format that can be processed directly by other substreams.
  • 3 processing substreams that continue processing different parts of the stream produced by the main substream.
    • In this particular case each of them picks a relevant subobject and processes it.
    • The processing can do anything. In the example we print the data; the commented-out lines show how to save it to a separate file as JSONL (see the second sketch after this list).
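
For instance, the main substream could clean the token stream up before the processing substreams are attached. Below is a minimal sketch reusing the imports from the example above; ignore() and its filter option are standard stream-json facilities, but the "unwanted" property is purely hypothetical:

const {ignore} = require('stream-json/filters/Ignore');

// The main substream with an extra cleanup step: everything under the
// hypothetical "unwanted" property is removed from the token stream
// before the processing substreams are attached.
const parsed = chain([
  fs.createReadStream('./data-99.json', {encoding: 'utf8'}),
  parser(),
  pick({filter: 'results'}),
  ignore({filter: /\bunwanted\b/})
]);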
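
In the example above the file-writing steps are commented out to keep the console output readable. Uncommenting them turns each processing substream into a complete "print and save as JSONL" pipeline:

const processSection = (source, sectionName) =>
  chain([
    source,
    pick({filter: new RegExp('\\b' + sectionName + '\\b')}),
    streamValues(),
    // log each value and pass only the value itself downstream
    data => (console.log(sectionName.toUpperCase() + ':', data), data.value),
    // serialize values as JSONL and write them to a per-section file
    stringer(),
    fs.createWriteStream(`data-99-${sectionName}-sample.jsonl`)
  ]);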

The important part here is that to split a stream we don't need to do anything special. We just connect our processing substreams in parallel to the main substream.
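
Under the hood this is ordinary Node piping: a readable stream can be piped to any number of destinations, and every destination receives all the data. A chain is itself a duplex stream, so we can pipe into it directly. A minimal sketch of the same split without the processSection() helper, reusing the parsed stream and imports from the example above:

// Each branch is a chain without a source; piping `parsed` into both
// delivers every token to both branches.
const dataBranch = chain([pick({filter: /\bdata\b/}), streamValues()]);
const metadataBranch = chain([pick({filter: /\bmetadata\b/}), streamValues()]);

parsed.pipe(dataBranch);
parsed.pipe(metadataBranch);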

While splitting a stream is trivial, joining substreams back together can be more difficult. You may consider my other micro-library for that: stream-join.
