Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream.pipeline does not wait for the last stream to flush before calling the final callback #34274

Closed
aravindanve opened this issue Jul 9, 2020 · 11 comments
Labels
stream Issues and PRs related to the stream subsystem.

Comments

@aravindanve
Copy link

  • Version: v12.8.0
  • Platform: macOS 10.15.4 (Catalina)
  • Subsystem: stream

What steps will reproduce the bug?

// reproduce.js

const util = require('util');
const stream = require('stream');

const call = async (fn, ...args) => fn(...args);

const map = (fn) => {
  const tx = new stream.Transform({ objectMode: true });

  tx._transform = (chunk, enc, cb) =>
    call(fn, chunk).then(
      (modified) => cb(null, modified),
      (error) => cb(error),
    );

  return tx;
};

const tap = (fn) => {
  const tx = new stream.Transform({ objectMode: true });

  tx._transform = (chunk, enc, cb) =>
    call(fn, chunk).then(
      () => cb(null, chunk),
      (error) => cb(error),
    );

  return tx;
};

const fork = (...t) => {
  let done;
  let doneError;
  let flush;

  const tx = new stream.Transform({ objectMode: true });
  const pt = new stream.PassThrough({ objectMode: true });

  stream.pipeline(pt, ...t, (error) => {
    done = true;
    doneError = error;
    flush && flush(doneError);
  });

  tx._flush = (cb) => {
    pt.push(null);
    flush = cb;
    done && flush(doneError);
  };

  tx._transform = (chunk, enc, cb) => {
    pt.push(chunk, enc);
    cb(null, chunk);
  };

  return tx;
};

const readableStream = new stream.PassThrough({ objectMode: true });
const pipeline = util.promisify(stream.pipeline);

async function run() {
  await pipeline(
    readableStream,
    fork(
      tap(() => console.log('fork 1: do something with obj for 2s')),
      map((obj) => new Promise((done) => setTimeout(() => done(obj), 2000))),
      tap(() => console.log('fork 1 done!')),
    ),
    fork(
      tap(() => console.log('fork 2: do something with obj for 4s')),
      map((obj) => new Promise((done) => setTimeout(() => done(obj), 4000))),
      tap(() => console.log('fork 2 done!')),
    ),
    fork(
      tap(() => console.log('fork 3: do something with obj for 6s')),
      map((obj) => new Promise((done) => setTimeout(() => done(obj), 6000))),
      tap(() => console.log('fork 3 done!')),
    ),
    // new stream.PassThrough({ objectMode: true }),
    // ^___ adding an extra stream in the pipeline seems to fix the problem
  );
  console.log('done!');
}

run().catch(console.error);

readableStream.push({ name: 'test' });
readableStream.push(null);

How often does it reproduce? Is there a required condition?

always

What is the expected behavior?

Console output should look like:

fork 1: do something with obj for 2s
fork 2: do something with obj for 4s
fork 3: do something with obj for 6s
fork 1 done!
fork 2 done!
fork 3 done!
done!

What do you see instead?

Console output actually looks like:

fork 1: do something with obj for 2s
fork 2: do something with obj for 4s
fork 3: do something with obj for 6s
fork 1 done!
fork 2 done!
done!
fork 3 done!

Additional information

As noted in the code above, adding an extra stream at the end seems to mitigate the problem for now.

await pipeline(
    ...
    new stream.PassThrough({ objectMode: true }),
);

console.log('done!);
@rickyes
Copy link
Contributor

rickyes commented Jul 9, 2020

Hi, Aravindan. Thanks for your feedback, I am trying to locate this problem, and I reproduced this problem in v14.5.0.

@aravindanve
Copy link
Author

@rickyes Just checked, I too get this in both 12.8.0 and 14.5.0. Let me know if you need anything else from me.

@ronag
Copy link
Member

ronag commented Jul 10, 2020

are you able to simplify the repro case?

@ronag ronag added the stream Issues and PRs related to the stream subsystem. label Jul 10, 2020
@aravindanve
Copy link
Author

aravindanve commented Jul 11, 2020

@ronag here you go:

const stream = require('stream');

const makeStream = (i) =>
  new stream.Transform({
    transform: (chunk, enc, cb) => cb(null, chunk),
    flush: (cb) =>
      setTimeout(() => {
        console.log('done flushing', i);
        cb(null);
      }),
  });

const input = new stream.Readable();

stream.pipeline(
  input,
  makeStream(1),
  makeStream(2),
  makeStream(3),
  () => console.log('done!'),
);

input.push('test');
input.push(null);

Prints:

done flushing 1
done flushing 2
done!
done flushing 3

@rickyes
Copy link
Contributor

rickyes commented Jul 11, 2020

I think I'm getting close to the problem and check again tonight.

@ronag
Copy link
Member

ronag commented Jul 11, 2020

Simplest repro:

const stream = require('stream');

const makeStream = (i) =>
  new stream.Transform({
    transform: (chunk, enc, cb) => cb(null, chunk),
    flush: (cb) =>
      setTimeout(() => {
        console.log('done flushing', i);
        cb(null);
      }),
  });

const input = new stream.Readable();
input.push(null);

stream.pipeline(
  input,
  makeStream(1),
  () => console.log('done!'),
);

@ronag
Copy link
Member

ronag commented Jul 11, 2020

This is actually an unfortunate case of Transform using 'prefinish' instead of _final to invoke _flush.

I don't think this is fixable without breaking something else. See notes here.

We would have to change Transform to properly use _final to fix this.

@mcollina WDYT?

ronag added a commit to nxtedition/node that referenced this issue Jul 11, 2020
Due to compat reasons Transform streams don't always wait
for flush to complete before finishing the stream.

Try to wait when possible, i.e. when the user does not
override _final.

Fixes: nodejs#34274
@ronag
Copy link
Member

ronag commented Jul 11, 2020

I think I found a solution.

@ronag ronag closed this as completed in a65218f Jul 15, 2020
ronag added a commit that referenced this issue Jul 16, 2020
finished should invoke callback on closed OutgoingMessage the
same way as for regular streams.

Fixes: #34301

PR-URL: #34313
Fixes: #34274
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
cjihrig pushed a commit that referenced this issue Jul 23, 2020
Due to compat reasons Transform streams don't always wait
for flush to complete before finishing the stream.

Try to wait when possible, i.e. when the user does not
override _final.

Fixes: #34274

PR-URL: #34314
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Zeyu Yang <himself65@outlook.com>
cjihrig pushed a commit that referenced this issue Jul 23, 2020
finished should invoke callback on closed OutgoingMessage the
same way as for regular streams.

Fixes: #34301

PR-URL: #34313
Fixes: #34274
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
@oprogramador
Copy link

I just reproduced this issue in Node.js 14.15.4 but I see it's fixed in 15.5.1

@oprogramador
Copy link

The simplest workaround seems to declare a promise somewhere outside, assign a value in flush to the promise and await the promise in the final callback.

sounisi5011 added a commit to sounisi5011/npm-packages that referenced this issue May 22, 2021
sounisi5011 added a commit to sounisi5011/npm-packages that referenced this issue May 22, 2021
* chore(stream-transform-from): create directory for `@sounisi5011/stream-transform-from` package

* test(stream-transform-from): add tests

* test(stream-transform-from): remove the `createPipeline()` function from test code

* feat(stream-transform-from): add a type definition for the public API

* feat(stream-transform-from): add `transformFrom()` function and `TransformFromAsyncIterable` class

* fix(stream-transform-from): change the complex `createSource()` method to a simple `createSource()` function

* test(stream-transform-from): fix the tests

    + Re-create the `Transform` object every time testing.
    + Simplify tests that differ only in options with `describe.each()`.

* refactor(stream-transform-from): use the `catch()` method instead of the `try ... catch` statement

* fix(stream-transform-from): don't call the callback multiple times & call the callback passed to `_flush()` method after finish

* test(stream-transform-from): use the `break` keyword while transforming a stream

* fix(stream-transform-from): use the `break` keyword while transforming a stream

* refactor(stream-transform-from): refactoring the `TransformFromAsyncIterable` class

    + Rename the `done` property to `transformCallback
    + Rename the `callDoneFn` method to `callTransformCallback`
    + Remove the `pushError` method and add `finish` method in its place

* test(stream-transform-from): add tests

    + Added a test that returns multiple chunks
    + Added a test for the type of chunks contained in the source

* fix(stream-transform-from): fix the type definition of return values

    If there is a possibility that either the `objectMode` or the `readableObjectMode` option is not `true`, the return value should not be `unknown`.

* test(stream-transform-from): add a test to merge multiple chunks

* fix(stream-transform-from): `transformCallback` should be called when the next chunk is needed, not after transforming a chunk

* test(stream-transform-from): add type definition tests

* fix(stream-transform-from): fix `InputChunkType`

    Index signatures and `boolean` types are now supported.

* test(stream-transform-from): add type definition tests for output values

* docs(stream-transform-from): add example code

* test(stream-transform-from): add a test for the timing of data flowing in the stream

* test(stream-transform-from): fix the timing test for data flowing in the stream

    I noticed that the timing of the flow changes when using the asynchronous API.

* test(stream-transform-from): eliminate the PromiseRejectionHandledWarning

    Promise objects should only be created in tests.
    Creating it during test case generation could cause Promise errors to affect unrelated code.

* test(stream-transform-from): fix the timing test for data flowing in the stream

    We have not found a way to control when data is read from the Readable stream.

* fix(stream-transform-from): create a source iterator in the constructor

* refactor(stream-transform-from): remove `src/utils.ts`

* refactor(stream-transform-from): reduce Cognitive Complexity in the `TransformFromAsyncIterable#finish()` method

    Code Climate reported:

    + Function `finish` has a Cognitive Complexity of 6 (exceeds 5 allowed). Consider refactoring.

* test(stream-transform-from): add tests that inherit from the `TransformFromAsyncIterable` class

* revert: test(stream-transform-from): add tests that inherit from the `TransformFromAsyncIterable` class

    This reverts commit 28e9901.

* test(stream-transform-from): add tests to convert strings of different encodings

* feat(stream-transform-from): can use the encoding passed with the chunk

* feat(stream-transform-from): export some types

* test(stream-transform-from): can't get the error after processing all chunks

    see nodejs/node#34274

* test(stream-transform-from): the bug with not being able to get errors after processing all chunks has been fixed in Node v15

* test(stream-transform-from): fix the bug that no error occurs after getting all the chunks

    This bug has been fixed in Node.js v15.

    + https://github.com/nodejs/node/blob/master/doc/changelogs/CHANGELOG_V15.md#15.0.0
    + nodejs/node#34314

    However, we have found a workaround that works for anything less than Node.js v15, so we will attempt to fix this bug in this package.

* fix(stream-transform-from): fix the bug that no error occurs after getting all the chunks

* test(stream-transform-from): make the conditional branches of the test code readable and organized

* test(stream-transform-from): refactoring test code

* build(stream-transform-from): disable the inlineSources option in TypeScript

    The size of the package is now smaller if `src` directory contents are included.

* docs(stream-transform-from): add `README.md`

* ci(stream-transform-from): add custom publish scripts

* docs(stream-transform-from): update `README.md`

* test(stream-transform-from): add tests for options that should be ignored

* fix(stream-transform-from): ignore some options

* fix(stream-transform-from): add "construct" in the options to ignore

* docs(stream-transform-from): update `README.md`: add fields that are disallowed in options

* refactor(stream-transform-from): don't use the global object `process`

* docs(stream-transform-from): fix the code for the example in `README.md`

* refactor(stream-transform-from): move utility type functions to the top of file

* style(stream-transform-from): update `src/index.ts`
@boutell
Copy link

boutell commented Nov 20, 2023

Thanks for reading this, I know it's a little long but I believe it is relevant. Bear with me 🙏

I implemented _final in a PassThrough stream in order to check for a special condition before invoking the callback. As it happens, I'm using the AWS SDK API, which allows me to pass in a readable stream to an upload method, but doesn't simply implement the writable stream interface.

So like many before me I wrapped it in a PassThrough stream. Now I can call await pipeline(someReadableStream, myPassThroughStream).

Except: once the AWS SDK reads the last byte from that passthrough stream, we're "done" as far as pipeline knows. So Node.js prematurely thinks the upload is complete. Boo.

"Aha!" I said. "I'll implement _final in my PassThrough stream and defer invoking the callback until both the superclass implementation of _final and the done method of the AWS Upload class are complete. That way I know that (1) all the input came through and (2) the actual upload also completed."

But... it turns out that tons of operations are still going on even after the pipelines are all 100% convinced the are finished. I couldn't figure it out until I found this thread.

The fix here in Node.js specifically decides not to wait for my _final method before finalizing the stream, and I wind up with lots of work still going on after pipeline:

a65218f

Augh!

Is there just no way to impose an additional requirement upon finalization in a PassThrough stream and have that be respected by pipeline()? Will I have to do something like returning both a stream and a promise, and awaiting the pipeline and then the promise? Or perhaps implementing a writable stream from scratch, whatever that looks like in 2023?

I did try implementing _flush() instead, but trying to pause there just interferes with the uploader's ability to know that it has received all of its input. Catch-22.

Here is my current implementation. Note that settle waits until the outcome of both the superclass _final method and the uploader's done() method are known before invoking the callback of _final. This is the technique I need to replace, somehow, in a way that is compatible with pipeline() and the change that was made to not wait for _final when it has been extended in this way.

Thanks so much for reading!

      const body = new PassThrough();
      const superFinal = body._final;
      let streamError = undefined;
      let awaitError = undefined;
      let streamFinalCallback;

      // We have to wait for both the stream to finish and the upload to finish
      body._final = function(callback) {
        superFinal.call(body, function(err) {
          streamError = err || null;
          streamFinalCallback = callback;
          return settle();
        });
      };
          
      const uploader = new Upload({
        client: self.getClient(uri),
        params: {
          // etc
          Body: body
        }
      });
      (async () => {
        try {
          // Triggers the flow of data
          await uploader.done();
          awaitError = null;
          return settle();
        } catch (err) {
          awaitError = err;
          return settle();
        }
      })();
      // return the passthrough stream for use in pipeline()
      return body;

      function settle() {
        if ((streamError !== undefined) && (awaitError !== undefined)) {
          return streamFinalCallback(streamError || awaitError);
        }
        // Not yet
      }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
stream Issues and PRs related to the stream subsystem.
Projects
None yet
5 participants