Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#221] Support PCmdRequest #222

Merged
merged 1 commit into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 3 additions & 18 deletions lib/client/bounded-buffer-readable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ class BoundedBufferReadableStream {
this.buffer = []
this.options = constructorOptions || {}
this.readableStream = this.makeReadableSteam()
this.maxBufferSize = this.options.maxBuffer || 100
}

makeReadableSteam() {
const readableStream = new Readable(Object.assign({
objectMode: true,
read: () => {
this.readStart()
}
Expand All @@ -44,12 +44,8 @@ class BoundedBufferReadableStream {
}

push(data) {
if (this.buffer.length < this.maxBufferSize) {
this.buffer.push(data)
}

if (this.canStart()) {
this.readStart()
if(!this.readableStream.push(data)) {
return this.readStop()
}
}

Expand All @@ -64,13 +60,6 @@ class BoundedBufferReadableStream {
}

this.readable = true

const length = this.buffer.length
for (let index = 0; index < length; index++) {
if (!this.readableStream.push(this.buffer.shift())) {
return this.readStop()
}
}
}

readStop() {
Expand Down Expand Up @@ -113,10 +102,6 @@ class BoundedBufferReadableStream {
this.readableStream.pipe(writableSteam)
this.writableSteam = writableSteam
}

setEncoding(encoding) {
this.readableStream.setEncoding(encoding)
}
}

module.exports = BoundedBufferReadableStream
24 changes: 21 additions & 3 deletions lib/client/command/command-type.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
const cmdMessages = require('../../data/v1/Cmd_pb')

class CommandType {
static ECHO = new CommandType('ECHO', cmdMessages.PCommandType.ECHO)
static ACTIVE_THREAD_COUNT = new CommandType('ACTIVE_THREAD_COUNT', cmdMessages.PCommandType.ACTIVE_THREAD_COUNT)
static supportedServices = [CommandType.ECHO, CommandType.ACTIVE_THREAD_COUNT]
static echo = new CommandType('ECHO', cmdMessages.PCommandType.ECHO)
static activeThreadCount = new CommandType('ACTIVE_THREAD_COUNT', cmdMessages.PCommandType.ACTIVE_THREAD_COUNT)
static none = new CommandType('NONE', cmdMessages.PCommandType.NONE)
static supportedServices = [CommandType.echo, CommandType.activeThreadCount]

constructor(name, value) {
this.name = name
Expand All @@ -27,6 +28,23 @@ class CommandType {
message.setHandshakemessage(command)
return message
}

static valueOf(command) {
for (const commandType of CommandType.supportedServices) {
if (commandType.value === command) {
return commandType
}
}
return CommandType.none
}

send(senders) {
if (typeof senders[this.name] !== 'function') {
return
}

senders[this.name]()
}
}

module.exports = CommandType
58 changes: 51 additions & 7 deletions lib/client/grpc-data-sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ const grpcBuiltInRetryHeaderInterceptor = require('./interceptor/grpc-built-in-r
const CallArgumentsBuilder = require('./call-arguments-builder')
const OptionsBuilder = require('./retry/options-builder')
const CommandType = require('./command/command-type')
const GrpcReadableStream = require('./grpc-readable-stream')
const cmdMessages = require('../data/v1/Cmd_pb')
const wrappers = require('google-protobuf/google/protobuf/wrappers_pb')

// AgentInfoSender.java
// refresh daily
Expand All @@ -33,7 +36,11 @@ class GrpcDataSender {
this.initializeStatStream(collectorIp, collectorStatPort, config)
this.initializePingStream()
this.initializeAgentInfoScheduler()
this.initializeProfilerClients()
this.initializeProfilerClients(collectorIp, collectorTcpPort, config)
}

setCommandEchoCallArguments(callArguments) {
this.commandEchoCallArguments = callArguments
}

close() {
Expand All @@ -59,17 +66,17 @@ class GrpcDataSender {
if (this.statClient) {
this.statClient.close()
}
if (this.profilerStream) {
this.profilerStream.end()
}
if (this.profilerClient) {
this.profilerClient.close()
}
if (this.profilerStream) {
this.profilerStream.grpcStream.end()
}
}

initializeClients(collectorIp, collectorTcpPort, config) {
const agentBuilder = new OptionsBuilder()
.addInterceptor(makeAgentInformationMetadataInterceptor(this.agentInfo))
.addInterceptor(makeAgentInformationMetadataInterceptor(this.agentInfo))
.addInterceptor(socketIdInterceptor)
.addInterceptor(grpcBuiltInRetryHeaderInterceptor)

Expand Down Expand Up @@ -129,7 +136,44 @@ class GrpcDataSender {
profilerBuilder.setGrpcServiceConfig(config.grpcServiceConfig.getProfiler())
}
this.profilerClient = new services.ProfilerCommandServiceClient(collectorIp + ":" + collectorTcpPort, grpc.credentials.createInsecure(), profilerBuilder.build())
this.profilerStream = new GrpcBidirectionalStream('profilerStream', this.profilerClient, this.profilerClient.handleCommandV2)
this.profilerStream = new GrpcReadableStream(() => {
const writable = this.profilerClient.handleCommandV2()
writable.on('data', (cmdRequest) => {
const requestId = cmdRequest.getRequestid()
const command = cmdRequest.getCommandCase()
CommandType.valueOf(command).send({
'ECHO': () => {
const response = new cmdMessages.PCmdResponse()
response.setResponseid(requestId)
response.setStatus(0)
const stringValue = new wrappers.StringValue()
stringValue.setValue('')
response.setMessage(stringValue) // error message

const cmdEchoResponse = new cmdMessages.PCmdEchoResponse()
cmdEchoResponse.setCommonresponse(response)
const message = cmdRequest.getCommandecho().getMessage()
cmdEchoResponse.setMessage(message)

const callArguments = guardCallArguments(this.commandEchoCallArguments)
const metadata = callArguments.getMetadata()
let options = callArguments.getOptions()
const callback = callArguments.getCallback()
this.profilerClient.commandEcho(cmdEchoResponse, metadata, options, (err, response) => {
if (err) {
log.error(err)
}
if (callback) {
callback(err, response)
}
})
},
'ACTIVE_THREAD_COUNT': () => {
}
})
})
return writable
})
}

agentInfoRefreshInterval() {
Expand Down Expand Up @@ -291,7 +335,7 @@ class GrpcDataSender {
if (log.isDebug()) {
log.debug(`sendControlHandshake pCmdMessage: ${JSON.stringify(pCmdMessage.toObject())}`)
}
this.profilerStream.write(pCmdMessage)
this.profilerStream.push(pCmdMessage)
}

sendPing() {
Expand Down
67 changes: 67 additions & 0 deletions lib/client/grpc-readable-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Pinpoint Node.js Agent
* Copyright 2021-present NAVER Corp.
* Apache License v2.0
*/

'use strict'

const { Transform } = require('node:stream')
const log = require('../utils/logger')

// Java Agent queue size private static final int DEFAULT_AGENT_SENDER_EXECUTOR_QUEUE_SIZE = 1000;
class GrpcReadableStream {
constructor(makeCall, options = {}) {
this.makeCall = makeCall
this.readableStream = this.makeTransform(options)
this._pipe()
}

makeTransform(options) {
const readableStream = new Transform(Object.assign({
readableObjectMode: true,
transform(chunk, encoding, callback) {
callback(null, chunk)
}
}, options))

readableStream.on('error', () => {
// https://nodejs.org/api/stream.html#readablepipedestination-options
// `One important caveat is that if the Readable stream emits an error during processing,
// the Writable destination is not closed automatically.
// If an error occurs, it will be necessary to manually close each stream
// in order to prevent memory leaks.`
// for readable steam error memory leak prevention
if (this.writableSteam && typeof this.writableSteam.end === 'function') {
this.writableSteam.end()
}
})

return readableStream
}

push(data) {
this.readableStream.push(data)
}

_pipe() {
if (typeof this.makeCall !== 'function') {
return
}
const writableStream = this.makeCall()
writableStream.on('error', (error) => {
if (error) {
log.error('writable steam error', error)
}
})

this.readableStream.pipe(writableStream)
this.writableSteam = writableStream
}

end() {
this.readableStream.end()
}
}

module.exports = GrpcReadableStream
32 changes: 5 additions & 27 deletions lib/instrumentation/context/span-builder.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,60 +6,38 @@

const TransactionId = require("../../context/transaction-id")
const TraceId = require("../../context/trace-id")
const Span = require("../../context/span")

class SpanBuilder {
constructor(traceId, agentInfo) {
this.traceId = traceId
this.agentId = agentInfo.agentId
this.applicationName = agentInfo.applicationName
this.agentStartTime = agentInfo.agentStartTime
this.serviceType = agentInfo.serviceType
this.spanId = traceId.spanId
this.parentSpanId = traceId.parentSpanId
this.agentInfo = agentInfo
this.startTime = Date.now()
this.elapsedTime = 0
this.rpc = null
this.endPoint = null
this.remoteAddr = null
this.annotations = []
this.flag = traceId.flag
this.err = null
this.spanEventList = []
this.apiId = null
this.exceptionInfo = null
this.applicationServiceType = agentInfo.applicationServiceType
this.loggingTransactionInfo = null
this.version = 1
this.acceptorHost = undefined
this.parentApplicationName = undefined
this.parentApplicationType = undefined
}

static makeSpanBuilderWithSpanId(spanId, agentInfo, parentSpanId = '-1') {
static makeWithSpanId(spanId, agentInfo, parentSpanId = '-1') {
const transactionId = new TransactionId(agentInfo.agentId, agentInfo.agentStartTime)
const traceId = new TraceId(transactionId, spanId, parentSpanId)
const builder = new SpanBuilder(traceId, agentInfo)
return builder
}

static valueOf(span) {
const builder = new SpanBuilder(span.traceId, span.agentId, span.applicationName, span.applicationServiceType, span.agentStartTime, span.serviceType, span.acceptorHost, span.parentApplicationName, span.parentApplicationType)
builder.spanId = span.spanId
builder.parentSpanId = span.parentSpanId
builder.startTime = span.startTime
builder.elapsedTime = span.elapsedTime
builder.rpc = span.rpc
builder.endPoint = span.endPoint
builder.remoteAddr = span.remoteAddr
builder.annotations = span.annotations
builder.flag = span.flag
builder.err = span.err
builder.spanEventList = span.spanEventList
builder.apiId = span.apiId
builder.exceptionInfo = span.exceptionInfo
builder.loggingTransactionInfo = span.loggingTransactionInfo
builder.version = span.version
return builder
build() {
return new Span(this.traceId, this.agentInfo)
}
}

Expand Down
10 changes: 10 additions & 0 deletions test/client/grpc-data-sender-client-side-stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

const test = require('tape')
const grpc = require('@grpc/grpc-js')
const cmdMessage = require('../../lib/data/v1/Cmd_pb')
const services = require('../../lib/data/v1/Service_grpc_pb')
const { Empty } = require('google-protobuf/google/protobuf/empty_pb')
const { log } = require('../test-helper')
Expand Down Expand Up @@ -113,6 +114,12 @@ function pingSession(call) {
})
}

const handleCommandV2Service = (call, callback) => {
const result = new cmdMessage.PResult()
callback(null, result)
}


// https://github.com/grpc/grpc-node/issues/1542
// https://github.com/grpc/grpc-node/pull/1616/files
// https://github.com/agreatfool/grpc_tools_node_protoc_ts/blob/v5.0.0/examples/src/grpcjs/client.ts
Expand All @@ -133,6 +140,9 @@ test('client side streaming with deadline and cancellation', function (t) {
server.addService(services.SpanService, {
sendSpan: sendSpan
})
server.addService(services.ProfilerCommandServiceService, {
handleCommandV2: handleCommandV2Service
})

server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (err, port) => {
actuals.dataCount = 1
Expand Down
Loading
Loading