Skip to content

Commit

Permalink
[#182] Support gRPC stream flowable
Browse files Browse the repository at this point in the history
*  _read method of readable steam learning test
* Max Buffer size
* retry pipe writableStream
* recovery readable stream
* readableStream.closed above node@18
[#177] Update minimum NodeJS version to Node@16
  • Loading branch information
feelform committed Apr 18, 2024
1 parent 719afa7 commit b1b7237
Show file tree
Hide file tree
Showing 6 changed files with 403 additions and 57 deletions.
1 change: 0 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ jobs:
strategy:
matrix:
node_version:
- 14
- 16
- 18

Expand Down
122 changes: 122 additions & 0 deletions lib/client/bounded-buffer-readable-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/**
* Pinpoint Node.js Agent
* Copyright 2021-present NAVER Corp.
* Apache License v2.0
*/

'use strict'

const { Readable } = require('node:stream')
const log = require('../utils/logger')

class BoundedBufferReadableStream {
constructor(constructorOptions) {
this.buffer = []
this.options = constructorOptions || {}
this.readableStream = this.makeReadableSteam()
this.maxBufferSize = this.options.maxBuffer || 100
}

makeReadableSteam() {
const readableStream = new Readable(Object.assign({
read: () => {
this.readStart()
}
}, this.options))

readableStream.on('error', () => {
// https://nodejs.org/api/stream.html#readablepipedestination-options
// `One important caveat is that if the Readable stream emits an error during processing,
// the Writable destination is not closed automatically.
// If an error occurs, it will be necessary to manually close each stream
// in order to prevent memory leaks.`
// for readable steam error memory leak prevention
if (this.writableSteam && typeof this.writableSteam.end === 'function') {
this.writableSteam.end()
}
})

readableStream.on('close', () => {
if (!this.writableFactory) {
return
}

this.readableStream = this.makeReadableSteam()
this._pipe()
})

return readableStream
}

push(data) {
if (this.buffer.length < this.maxBufferSize) {
this.buffer.push(data)
}

if (this.canStart()) {
this.readStart()
}
}

canStart() {
return this.readable
}

readStart() {
this.readable = true

const length = this.buffer.length
for (let index = 0; index < length; index++) {
if (!this.readableStream.push(this.buffer.shift())) {
return this.readStop()
}
}
}

readStop() {
this.readable = false
}

end() {
this.readableStream.end()
}

pipe(writableFactory) {
this.writableFactory = writableFactory
this._pipe()
}

_pipe() {
if (typeof this.writableFactory !== 'function') {
return
}

const writableSteam = this.writableFactory()
if (!writableSteam) {
return
}

writableSteam.on('error', (error) => {
if (error) {
log.error('writable steam error', error)
}
})

writableSteam.on('unpipe', () => {
this.readStop()
})

writableSteam.on('close', () => {

})

this.readableStream.pipe(writableSteam)
this.writableSteam = writableSteam
}

setEncoding(encoding) {
this.readableStream.setEncoding(encoding)
}
}

module.exports = BoundedBufferReadableStream
104 changes: 52 additions & 52 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "pinpoint-node-agent",
"version": "0.9.0-next.4",
"version": "1.0.0-next.1",
"main": "index.js",
"types": "index.d.ts",
"type": "commonjs",
Expand All @@ -23,7 +23,7 @@
},
"homepage": "https://github.com/pinpoint-apm/pinpoint-node-agent",
"engines": {
"node": ">=14.0"
"node": ">=16.0"
},
"keywords": [
"pinpoint",
Expand Down Expand Up @@ -64,7 +64,7 @@
"devDependencies": {
"@types/semver": "^7.3.13",
"@types/shimmer": "^1.0.2",
"axios": "^1.6.2",
"axios": "^1.6.8",
"eslint": "^8.43.0",
"eslint-config-prettier": "^3.6.0",
"eslint-plugin-import": "^2.25.2",
Expand Down
Loading

0 comments on commit b1b7237

Please sign in to comment.