From b54e1b998a8a49a96197b7cc0f4a038b9635036a Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Mon, 19 Aug 2024 16:17:01 -0700 Subject: [PATCH 1/4] Add deadline propagation --- receiver/otelarrowreceiver/go.mod | 1 + receiver/otelarrowreceiver/go.sum | 2 + .../otelarrowreceiver/internal/arrow/arrow.go | 48 +++++++++++-- .../internal/arrow/arrow_test.go | 67 +++++++++++++++++++ 4 files changed, 111 insertions(+), 7 deletions(-) diff --git a/receiver/otelarrowreceiver/go.mod b/receiver/otelarrowreceiver/go.mod index caa11bd9e94a..aea0bbad0c6d 100644 --- a/receiver/otelarrowreceiver/go.mod +++ b/receiver/otelarrowreceiver/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelar go 1.22.0 require ( + github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil v0.107.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow v0.107.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.107.0 github.com/open-telemetry/otel-arrow v0.24.0 diff --git a/receiver/otelarrowreceiver/go.sum b/receiver/otelarrowreceiver/go.sum index a065bc18982c..bfbe6f900b19 100644 --- a/receiver/otelarrowreceiver/go.sum +++ b/receiver/otelarrowreceiver/go.sum @@ -91,6 +91,8 @@ github.com/mostynb/go-grpc-compression v1.2.3/go.mod h1:AghIxF3P57umzqM9yz795+y1 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil v0.107.0 h1:lbJCTRmXTm6LihA7HBqS57kADZQYqFaYmtb4t3IVo8k= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil v0.107.0/go.mod h1:w0X1c3MP2PZHkQ1J/vJ+onH/cjTxKNwdDwK2OYHRrvI= github.com/open-telemetry/otel-arrow v0.24.0 h1:hNUEbwHW/1gEOUiN+HoI+ITiXe2vSBaPWlE9FRwJwDE= github.com/open-telemetry/otel-arrow v0.24.0/go.mod h1:uzoHixEh6CUBZkP+vkRvyiHYUnYsAOUwCcfByQkSMM0= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow.go b/receiver/otelarrowreceiver/internal/arrow/arrow.go index e4c8a894fd21..c6d2d0e061cc 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -41,6 +41,7 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" internalmetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata" @@ -420,8 +421,8 @@ func (r *Receiver) anyStream(serverStream anyStreamServer, method string) (retEr } } -func (r *receiverStream) newInFlightData(ctx context.Context, method string, batchID int64, pendingCh chan<- batchResp) (context.Context, *inFlightData) { - ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_inflight") +func (r *receiverStream) newInFlightData(ctx context.Context, method string, batchID int64, pendingCh chan<- batchResp) *inFlightData { + _, span := r.tracer.Start(ctx, "otel_arrow_stream_inflight") r.inFlightWG.Add(1) r.telemetryBuilder.OtelArrowReceiverInFlightRequests.Add(ctx, 1) @@ -433,7 +434,7 @@ func (r *receiverStream) newInFlightData(ctx context.Context, method string, bat span: span, } id.refs.Add(1) - return ctx, id + return id } // inFlightData is responsible for storing the resources held by one request. @@ -551,8 +552,14 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre // or plog.Logs item. req, err := serverStream.Recv() + // the incoming stream context is the parent of the in-flight context, which + // carries a span covering sequential stream-processing work. the context + // is severed at this point, with flight.span a contextless child that will be + // finished in recvDone(). + flight := r.newInFlightData(streamCtx, method, req.GetBatchId(), pendingCh) + // inflightCtx is carried through into consumeAndProcess on the success path. - inflightCtx, flight := r.newInFlightData(streamCtx, method, req.GetBatchId(), pendingCh) + inflightCtx := context.Background() defer flight.recvDone(inflightCtx, &retErr) if err != nil { @@ -607,6 +614,25 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre } } + var callerCancel context.CancelFunc + if encodedTimeout, has := authHdrs["grpc-timeout"]; has && len(encodedTimeout) == 1 { + if timeout, err := grpcutil.DecodeTimeout(encodedTimeout[0]); err != nil { + r.telemetry.Logger.Debug("grpc-timeout parse error", zap.Error(err)) + } else { + // timeout parsed successfully + inflightCtx, callerCancel = context.WithTimeout(inflightCtx, timeout) + + // if we return before the new goroutine is started below + // cancel the context. callerCancel will be non-nil until + // the new goroutine is created at the end of this function. + defer func() { + if callerCancel != nil { + callerCancel() + } + }() + } + } + // Use the bounded queue to memory limit based on incoming // uncompressed request size and waiters. Acquire will fail // immediately if there are too many waiters, or will @@ -644,16 +670,24 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre flight.refs.Add(1) // consumeAndRespond consumes the data and returns control to the sender loop. - go r.consumeAndRespond(inflightCtx, data, flight) + go func(callerCancel context.CancelFunc) { + if callerCancel != nil { + defer callerCancel() + } + r.consumeAndRespond(inflightCtx, streamCtx, data, flight) + }(callerCancel) + + // Reset callerCancel so the deferred function above does not call it here. + callerCancel = nil return nil } // consumeAndRespond finishes the span started in recvOne and logs the // result after invoking the pipeline to consume the data. -func (r *Receiver) consumeAndRespond(ctx context.Context, data any, flight *inFlightData) { +func (r *Receiver) consumeAndRespond(ctx, streamCtx context.Context, data any, flight *inFlightData) { var err error - defer flight.consumeDone(ctx, &err) + defer flight.consumeDone(streamCtx, &err) // recoverErr is a special function because it recovers panics, so we // keep it in a separate defer than the processing above, which will diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go index 70c31e9e7320..f100c1af49c6 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go @@ -135,6 +135,21 @@ func (u unhealthyTestChannel) onConsume(ctx context.Context) error { } } +type blockingTestChannel struct { + t *testing.T + cf func(context.Context) +} + +func newBlockingTestChannel(t *testing.T, cf func(context.Context)) *blockingTestChannel { + return &blockingTestChannel{t: t, cf: cf} +} + +func (h blockingTestChannel) onConsume(ctx context.Context) error { + h.cf(ctx) + <-ctx.Done() + return status.Error(codes.DeadlineExceeded, ctx.Err().Error()) +} + type recvResult struct { payload *arrowpb.BatchArrowRecords err error @@ -300,6 +315,14 @@ func statusUnavailableFor(batchID int64, msg string) *arrowpb.BatchStatus { } } +func statusDeadlineExceededFor(batchID int64, msg string) *arrowpb.BatchStatus { + return &arrowpb.BatchStatus{ + BatchId: batchID, + StatusCode: arrowpb.StatusCode_DEADLINE_EXCEEDED, + StatusMessage: msg, + } +} + func (ctc *commonTestCase) newRealConsumer() arrowRecord.ConsumerAPI { mock := arrowRecordMock.NewMockConsumerAPI(ctc.ctrl) cons := arrowRecord.NewConsumer() @@ -600,6 +623,50 @@ func TestReceiverSendError(t *testing.T) { requireUnavailableStatus(t, err) } +func TestReceiverTimeoutError(t *testing.T) { + tc := newBlockingTestChannel(t, func(ctx context.Context) { + deadline, has := ctx.Deadline() + require.True(t, has, "context has deadline") + timeout := time.Until(deadline) + require.Less(t, time.Second/2, timeout) + require.GreaterOrEqual(t, time.Second, timeout) + }) + ctc := newCommonTestCase(t, tc) + + ld := testdata.GenerateLogs(2) + batch, err := ctc.testProducer.BatchArrowRecordsFromLogs(ld) + require.NoError(t, err) + + ctc.stream.EXPECT().Send(statusDeadlineExceededFor(batch.BatchId, "context deadline exceeded")).Times(1).Return(nil) + + var hpb bytes.Buffer + hpe := hpack.NewEncoder(&hpb) + err = hpe.WriteField(hpack.HeaderField{ + Name: "grpc-timeout", + Value: "1000m", + }) + assert.NoError(t, err) + batch.Headers = make([]byte, hpb.Len()) + copy(batch.Headers, hpb.Bytes()) + + ctc.start(ctc.newRealConsumer, defaultBQ()) + ctc.putBatch(batch, nil) + + assert.EqualValues(t, ld, (<-ctc.consume).Data) + + start := time.Now() + for time.Since(start) < 5*time.Second { + if ctc.ctrl.Satisfied() { + break + } + time.Sleep(time.Second) + } + + close(ctc.receive) + err = ctc.wait() + require.NoError(t, err) +} + func TestReceiverConsumeError(t *testing.T) { stdTesting := otelAssert.NewStdUnitTest(t) From 771d5b74be812e30515e04810229cf4f89c32fd2 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Mon, 19 Aug 2024 16:28:02 -0700 Subject: [PATCH 2/4] chlog --- .chloggen/otelarrow-receiver-timeout.yaml | 27 +++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/otelarrow-receiver-timeout.yaml diff --git a/.chloggen/otelarrow-receiver-timeout.yaml b/.chloggen/otelarrow-receiver-timeout.yaml new file mode 100644 index 000000000000..540e2b379265 --- /dev/null +++ b/.chloggen/otelarrow-receiver-timeout.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: otelarrowreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add gRPC timeout propagation. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34742] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] From 6b855d5846c09d63ca776becf0d94f09d2dfaa07 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Thu, 5 Sep 2024 14:37:06 -0700 Subject: [PATCH 3/4] add e2e test --- internal/otelarrow/test/e2e_test.go | 59 +++++++++++++++++-- .../otelarrowreceiver/internal/arrow/arrow.go | 8 ++- 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/internal/otelarrow/test/e2e_test.go b/internal/otelarrow/test/e2e_test.go index 85783b00361a..ae1ca1b038aa 100644 --- a/internal/otelarrow/test/e2e_test.go +++ b/internal/otelarrow/test/e2e_test.go @@ -46,11 +46,14 @@ import ( ) type testParams struct { - threadCount int - requestUntil func(*testConsumer) bool + threadCount int + requestUntil func(*testConsumer) bool + expectDeadline bool } type testConsumer struct { + t *testing.T + sink consumertest.TracesSink sentSpans atomic.Int64 @@ -62,6 +65,8 @@ type testConsumer struct { recvSpans *tracetest.InMemoryExporter expSpans *tracetest.InMemoryExporter + + expectDeadline bool } var _ consumer.Traces = &testConsumer{} @@ -80,6 +85,19 @@ func (*testConsumer) Capabilities() consumer.Capabilities { func (tc *testConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { time.Sleep(time.Duration(float64(time.Millisecond) * (1 + rand.Float64()))) + + dead, hasDeadline := ctx.Deadline() + timeout := time.Until(dead) + + require.Equal(tc.t, tc.expectDeadline, hasDeadline, "deadline set or not set: %v", timeout) + if tc.expectDeadline { + // expect allows 1/6 of the deadline to elapse in transit, + // so 1m becomes 50s. + expect := tc.expCfg.Timeout * 5 / 6 + require.Less(tc.t, expect, timeout) + require.Greater(tc.t, tc.expCfg.Timeout, timeout) + } + return tc.sink.ConsumeTraces(ctx, td) } @@ -96,7 +114,7 @@ func testLoggerSettings(t *testing.T) (component.TelemetrySettings, *observer.Ob return tset, obslogs, exp } -func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces, receiver.Traces) { +func basicTestConfig(t *testing.T, tp testParams, cfgF CfgFunc) (*testConsumer, exporter.Traces, receiver.Traces) { ctx := context.Background() efact := otelarrowexporter.NewFactory() @@ -111,6 +129,7 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces addr := testutil.GetAvailableLocalAddress(t) receiverCfg.Protocols.GRPC.NetAddr.Endpoint = addr + exporterCfg.ClientConfig.Endpoint = addr exporterCfg.ClientConfig.WaitForReady = true exporterCfg.ClientConfig.TLSSetting.Insecure = true @@ -119,6 +138,7 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces exporterCfg.RetryConfig.Enabled = true exporterCfg.Arrow.NumStreams = 1 exporterCfg.Arrow.MaxStreamLifetime = 5 * time.Second + exporterCfg.Arrow.DisableDowngrade = true if cfgF != nil { cfgF(exporterCfg, receiverCfg) @@ -128,6 +148,8 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces recvTset, recvLogs, recvSpans := testLoggerSettings(t) testCon := &testConsumer{ + t: t, + recvCfg: receiverCfg, expCfg: exporterCfg, @@ -136,6 +158,8 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces recvSpans: recvSpans, expSpans: expSpans, + + expectDeadline: tp.expectDeadline, } receiver, err := rfact.CreateTracesReceiver(ctx, receiver.Settings{ @@ -157,7 +181,7 @@ func basicTestConfig(t *testing.T, cfgF CfgFunc) (*testConsumer, exporter.Traces func testIntegrationTraces(ctx context.Context, t *testing.T, tp testParams, cfgf CfgFunc, mkgen MkGen, errf ConsumerErrFunc, endf EndFunc) { host := componenttest.NewNopHost() - testCon, exporter, receiver := basicTestConfig(t, cfgf) + testCon, exporter, receiver := basicTestConfig(t, tp, cfgf) var startWG sync.WaitGroup var exporterShutdownWG sync.WaitGroup @@ -413,6 +437,33 @@ func TestIntegrationTracesSimple(t *testing.T) { } } +func TestIntegrationDeadlinePropagation(t *testing.T) { + for _, hasDeadline := range []bool{false, true} { + t.Run(fmt.Sprint("deadline=", hasDeadline), func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Until at least one span is written. + var params = testParams{ + threadCount: 1, + requestUntil: func(test *testConsumer) bool { + return test.sink.SpanCount() < 1 + }, + expectDeadline: hasDeadline, + } + + testIntegrationTraces(ctx, t, params, func(ecfg *ExpConfig, _ *RecvConfig) { + if !hasDeadline { + // 0 disables the exporthelper-set timeout. + ecfg.TimeoutSettings.Timeout = 0 + } else { + ecfg.TimeoutSettings.Timeout = 37 * time.Minute + } + }, func() GenFunc { return makeTestTraces }, consumerSuccess, standardEnding) + }) + } +} + func TestIntegrationMemoryLimited(t *testing.T) { // This test is flaky, it only shows on Windows. This will be // addressed in a separate PR. diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow.go b/receiver/otelarrowreceiver/internal/arrow/arrow.go index 0fec92ac9e80..717dbed42f0c 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -174,7 +174,9 @@ func newHeaderReceiver(streamCtx context.Context, as auth.Server, includeMetadat // client.Info with additional key:values associated with the arrow batch. func (h *headerReceiver) combineHeaders(ctx context.Context, hdrsBytes []byte) (context.Context, map[string][]string, error) { if len(hdrsBytes) == 0 && len(h.streamHdrs) == 0 { - return ctx, nil, nil + // Note: call newContext in this case to ensure that + // connInfo is added to the context, for Auth. + return h.newContext(ctx, nil), nil, nil } if len(hdrsBytes) == 0 { @@ -559,7 +561,9 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre flight := r.newInFlightData(streamCtx, method, req.GetBatchId(), pendingCh) // inflightCtx is carried through into consumeAndProcess on the success path. - inflightCtx := context.Background() + // this inherits the stream context so that its auth headers are present + // when the per-data Auth call is made. + inflightCtx := streamCtx defer flight.recvDone(inflightCtx, &retErr) if recvErr != nil { From 42952589db3c55fdd4ea021573232aac7f8e4305 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 6 Sep 2024 09:37:14 -0700 Subject: [PATCH 4/4] test logic fix --- internal/otelarrow/test/e2e_test.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/internal/otelarrow/test/e2e_test.go b/internal/otelarrow/test/e2e_test.go index 4717b6a501f9..c48d81801582 100644 --- a/internal/otelarrow/test/e2e_test.go +++ b/internal/otelarrow/test/e2e_test.go @@ -45,9 +45,12 @@ import ( ) type testParams struct { - threadCount int - requestUntil func(*testConsumer) bool - expectDeadline bool + threadCount int + requestUntil func(*testConsumer) bool + + // missingDeadline is configured so the zero value implies a deadline, + // which is the default. + missingDeadline bool } type testConsumer struct { @@ -162,7 +165,7 @@ func basicTestConfig(t *testing.T, tp testParams, cfgF CfgFunc) (*testConsumer, recvSpans: recvSpans, expSpans: expSpans, - expectDeadline: tp.expectDeadline, + expectDeadline: !tp.missingDeadline, } receiver, err := rfact.CreateTracesReceiver(ctx, receiver.Settings{ @@ -461,7 +464,7 @@ func TestIntegrationDeadlinePropagation(t *testing.T) { requestUntil: func(test *testConsumer) bool { return test.sink.SpanCount() < 1 }, - expectDeadline: hasDeadline, + missingDeadline: !hasDeadline, } testIntegrationTraces(ctx, t, params, func(ecfg *ExpConfig, _ *RecvConfig) {