Skip to content

Commit

Permalink
[receiver/otelarrow] use mdatagen for metrics
Browse files Browse the repository at this point in the history
Fixes open-telemetry#33666

Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com>
  • Loading branch information
codeboten committed Jul 3, 2024
1 parent 851f054 commit cd428d2
Showing 1 changed file with 33 additions and 57 deletions.
90 changes: 33 additions & 57 deletions receiver/otelarrowreceiver/internal/arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.opentelemetry.io/otel"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
Expand All @@ -43,12 +42,13 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

internalmetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata"
)

const (
streamFormat = "arrow"
hpackMaxDynamicSize = 4096
scopeName = "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver"
)

var (
Expand All @@ -71,17 +71,15 @@ type Receiver struct {
arrowpb.UnsafeArrowLogsServiceServer
arrowpb.UnsafeArrowMetricsServiceServer

telemetry component.TelemetrySettings
tracer trace.Tracer
obsrecv *receiverhelper.ObsReport
gsettings configgrpc.ServerConfig
authServer auth.Server
newConsumer func() arrowRecord.ConsumerAPI
netReporter netstats.Interface
recvInFlightBytes metric.Int64UpDownCounter
recvInFlightItems metric.Int64UpDownCounter
recvInFlightRequests metric.Int64UpDownCounter
boundedQueue *admission.BoundedQueue
telemetry component.TelemetrySettings
tracer trace.Tracer
obsrecv *receiverhelper.ObsReport
gsettings configgrpc.ServerConfig
authServer auth.Server
newConsumer func() arrowRecord.ConsumerAPI
netReporter netstats.Interface
telemetryBuilder *internalmetadata.TelemetryBuilder
boundedQueue *admission.BoundedQueue
}

// receiverStream holds the inFlightWG for a single stream.
Expand All @@ -102,44 +100,22 @@ func New(
netReporter netstats.Interface,
) (*Receiver, error) {
tracer := set.TelemetrySettings.TracerProvider.Tracer("otel-arrow-receiver")
var errors, err error
recv := &Receiver{
Consumers: cs,
obsrecv: obsrecv,
telemetry: set.TelemetrySettings,
tracer: tracer,
authServer: authServer,
newConsumer: newConsumer,
gsettings: gsettings,
netReporter: netReporter,
boundedQueue: bq,
}

meter := recv.telemetry.MeterProvider.Meter(scopeName)
recv.recvInFlightBytes, err = meter.Int64UpDownCounter(
"otel_arrow_receiver_in_flight_bytes",
metric.WithDescription("Number of bytes in flight"),
metric.WithUnit("By"),
)
errors = multierr.Append(errors, err)

recv.recvInFlightItems, err = meter.Int64UpDownCounter(
"otel_arrow_receiver_in_flight_items",
metric.WithDescription("Number of items in flight"),
)
errors = multierr.Append(errors, err)

recv.recvInFlightRequests, err = meter.Int64UpDownCounter(
"otel_arrow_receiver_in_flight_requests",
metric.WithDescription("Number of requests in flight"),
)
errors = multierr.Append(errors, err)

if errors != nil {
return nil, errors
}

return recv, nil
telemetryBuilder, err := internalmetadata.NewTelemetryBuilder(set.TelemetrySettings)
if err != nil {
return nil, err
}
return &Receiver{
Consumers: cs,
obsrecv: obsrecv,
telemetry: set.TelemetrySettings,
tracer: tracer,
authServer: authServer,
newConsumer: newConsumer,
gsettings: gsettings,
netReporter: netReporter,
boundedQueue: bq,
telemetryBuilder: telemetryBuilder,
}, nil
}

// headerReceiver contains the state necessary to decode per-request metadata
Expand Down Expand Up @@ -445,7 +421,7 @@ func (r *receiverStream) newInFlightData(ctx context.Context, method string, bat
ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_inflight")

r.inFlightWG.Add(1)
r.recvInFlightRequests.Add(ctx, 1)
r.telemetryBuilder.OtelArrowReceiverInFlightRequests.Add(ctx, 1)
id := &inFlightData{
receiverStream: r,
method: method,
Expand Down Expand Up @@ -525,10 +501,10 @@ func (id *inFlightData) anyDone(ctx context.Context) {
}

if id.uncompSize != 0 {
id.recvInFlightBytes.Add(ctx, -id.uncompSize)
id.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(ctx, -id.uncompSize)
}
if id.numItems != 0 {
id.recvInFlightItems.Add(ctx, int64(-id.numItems))
id.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(ctx, int64(-id.numItems))
}

// The netstats code knows that uncompressed size is
Expand All @@ -540,7 +516,7 @@ func (id *inFlightData) anyDone(ctx context.Context) {
sized.Length = id.uncompSize
id.netReporter.CountReceive(ctx, sized)

id.recvInFlightRequests.Add(ctx, -1)
id.telemetryBuilder.OtelArrowReceiverInFlightRequests.Add(ctx, -1)
id.inFlightWG.Done()
}

Expand Down Expand Up @@ -638,8 +614,8 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre
flight.uncompSize = uncompSize
flight.numItems = numItems

r.recvInFlightBytes.Add(inflightCtx, uncompSize)
r.recvInFlightItems.Add(inflightCtx, int64(numItems))
r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(inflightCtx, uncompSize)
r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(inflightCtx, int64(numItems))

numAcquired, err := r.acquireAdditionalBytes(inflightCtx, prevAcquiredBytes, uncompSize, hrcv.connInfo.Addr, uncompSizeHeaderFound)

Expand Down

0 comments on commit cd428d2

Please sign in to comment.