diff --git a/.chloggen/pubsubreceiver-add-cloudlogging-support.yaml b/.chloggen/pubsubreceiver-add-cloudlogging-support.yaml new file mode 100644 index 000000000000..4ae4644bd5ca --- /dev/null +++ b/.chloggen/pubsubreceiver-add-cloudlogging-support.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: googlepubsubreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for GoogleCloud logging encoding + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29299] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/googlecloudpubsubreceiver/README.md b/receiver/googlecloudpubsubreceiver/README.md index 83cb6d0da907..9a286c137770 100644 --- a/receiver/googlecloudpubsubreceiver/README.md +++ b/receiver/googlecloudpubsubreceiver/README.md @@ -23,8 +23,8 @@ The following configuration options are supported: * `subscription` (Required): The subscription name to receive OTLP data from. The subscription name should be a fully qualified resource name (eg: `projects/otel-project/subscriptions/otlp`). * `encoding` (Optional): The encoding that will be used to received data from the subscription. This can either be - `otlp_proto_trace`, `otlp_proto_metric`, `otlp_proto_log`, or `raw_text` (see `encoding`). This will only be used as - a fallback, when no `content-type` attribute is present. + `otlp_proto_trace`, `otlp_proto_metric`, `otlp_proto_log`, `cloud_logging`, or `raw_text` (see `encoding`). This will + only be used as a fallback, when no `content-type` attribute is present. * `compression` (Optional): The compression that will be used on received data from the subscription. When set it can only be `gzip`. This will only be used as a fallback, when no `content-encoding` attribute is present. * `endpoint` (Optional): Override the default Pubsub Endpoint, useful when connecting to the PubSub emulator instance @@ -46,20 +46,29 @@ You should not need to set the encoding of the subscription as the receiver will by looking at the `ce-type` and `content-type` attributes of the message. Only when those attributes are not set must the `encoding` field in the configuration be set. -| ce-type] | ce-datacontenttype | encoding | description | -| --- | --- | --- | --- | -| org.opentelemetry.otlp.traces.v1 | application/protobuf | | Decode OTLP trace message | -| org.opentelemetry.otlp.metrics.v1 | application/protobuf | | Decode OTLP metric message | -| org.opentelemetry.otlp.logs.v1 | application/json | | Decode OTLP log message | -| - | - | otlp_proto_trace | Decode OTLP trace message | -| - | - | otlp_proto_metric | Decode OTLP trace message | -| - | - | otlp_proto_log | Decode OTLP trace message | -| - | - | raw_text | Wrap in an OTLP log message | +| ce-type | ce-datacontenttype | encoding | description | +|-----------------------------------|----------------------|-------------------|------------------------------------------------| +| org.opentelemetry.otlp.traces.v1 | application/protobuf | | Decode OTLP trace message | +| org.opentelemetry.otlp.metrics.v1 | application/protobuf | | Decode OTLP metric message | +| org.opentelemetry.otlp.logs.v1 | application/json | | Decode OTLP log message | +| - | - | otlp_proto_trace | Decode OTLP trace message | +| - | - | otlp_proto_metric | Decode OTLP trace message | +| - | - | otlp_proto_log | Decode OTLP trace message | +| - | - | cloud_logging | Decode [Cloud Logging] [LogEntry] message type | +| - | - | raw_text | Wrap in an OTLP log message | When the `encoding` configuration is set, the attributes on the message are ignored. -The receiver can be used for ingesting arbitrary text message on a Pubsub subscription and wrap them in OTLP Log -message, making it a convenient way to ingest log lines from Pubsub. +With `cloud_logging`, the receiver can be used to bring Cloud Logging messages into an OpenTelemetry pipeline. You'll +first need to [set up a logging sink][sink-docs] with a Pub/Sub topic as its destination. Note that the `cloud_logging` +integration is considered **alpha** as the semantic convention on some of the conversion are not stabilized yet. + +With `raw_text`, the receiver can be used for ingesting arbitrary text message on a Pubsub subscription, wrapping them +in OTLP Log messages, making it a convenient way to ingest raw log lines from Pubsub. + +[Cloud Logging]: https://cloud.google.com/logging +[LogEntry]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry +[sink-docs]: https://cloud.google.com/logging/docs/export/configure_export_v2#creating_sink ## Pubsub subscription diff --git a/receiver/googlecloudpubsubreceiver/config.go b/receiver/googlecloudpubsubreceiver/config.go index 82cf760119e4..83ad930a29dc 100644 --- a/receiver/googlecloudpubsubreceiver/config.go +++ b/receiver/googlecloudpubsubreceiver/config.go @@ -46,8 +46,9 @@ func (config *Config) validateForLog() error { case "otlp_proto_log": case "raw_text": case "raw_json": + case "cloud_logging": default: - return fmt.Errorf("log encoding %v is not supported. supported encoding formats include [otlp_proto_log,raw_text,raw_json]", config.Encoding) + return fmt.Errorf("log encoding %v is not supported. supported encoding formats include [otlp_proto_log,raw_text,raw_json,cloud_logging]", config.Encoding) } return nil } diff --git a/receiver/googlecloudpubsubreceiver/go.mod b/receiver/googlecloudpubsubreceiver/go.mod index 4c3732a95a4d..78d0e307d2be 100644 --- a/receiver/googlecloudpubsubreceiver/go.mod +++ b/receiver/googlecloudpubsubreceiver/go.mod @@ -3,7 +3,11 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/google go 1.20 require ( + cloud.google.com/go/logging v1.9.0 cloud.google.com/go/pubsub v1.36.1 + github.com/google/go-cmp v0.6.0 + github.com/iancoleman/strcase v0.3.0 + github.com/json-iterator/go v1.1.12 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector/component v0.93.1-0.20240130182548-89388addcc7f go.opentelemetry.io/collector/confmap v0.93.1-0.20240130182548-89388addcc7f @@ -13,9 +17,13 @@ require ( go.opentelemetry.io/collector/receiver v0.93.1-0.20240130182548-89388addcc7f go.opentelemetry.io/otel/metric v1.22.0 go.opentelemetry.io/otel/trace v1.22.0 + go.uber.org/multierr v1.11.0 go.uber.org/zap v1.26.0 google.golang.org/api v0.160.0 + google.golang.org/genproto v0.0.0-20240116215550-a9fa1716bcac + google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe google.golang.org/grpc v1.61.0 + google.golang.org/protobuf v1.32.0 ) require ( @@ -23,6 +31,7 @@ require ( cloud.google.com/go/compute v1.23.3 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect cloud.google.com/go/iam v1.1.5 // indirect + cloud.google.com/go/longrunning v0.5.4 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect @@ -33,11 +42,9 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/google/go-cmp v0.6.0 // indirect github.com/google/s2a-go v0.1.7 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect - github.com/json-iterator/go v1.1.12 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.0.1 // indirect @@ -63,7 +70,6 @@ require ( go.opentelemetry.io/otel/exporters/prometheus v0.45.0 // indirect go.opentelemetry.io/otel/sdk v1.22.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.22.0 // indirect - go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.18.0 // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/oauth2 v0.16.0 // indirect @@ -72,10 +78,7 @@ require ( golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/appengine v1.6.8 // indirect - google.golang.org/genproto v0.0.0-20240116215550-a9fa1716bcac // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect - google.golang.org/protobuf v1.32.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/receiver/googlecloudpubsubreceiver/go.sum b/receiver/googlecloudpubsubreceiver/go.sum index f916986a4470..9071f90e34e1 100644 --- a/receiver/googlecloudpubsubreceiver/go.sum +++ b/receiver/googlecloudpubsubreceiver/go.sum @@ -7,6 +7,10 @@ cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGB cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/iam v1.1.5 h1:1jTsCu4bcsNsE4iiqNT5SHwrDRCfRmIaaaVFhRveTJI= cloud.google.com/go/iam v1.1.5/go.mod h1:rB6P/Ic3mykPbFio+vo7403drjlgvoWfYpJhMXEbzv8= +cloud.google.com/go/logging v1.9.0 h1:iEIOXFO9EmSiTjDmfpbRjOxECO7R8C7b8IXUGOj7xZw= +cloud.google.com/go/logging v1.9.0/go.mod h1:1Io0vnZv4onoUnsVUQY3HZ3Igb1nBchky0A0y7BBBhE= +cloud.google.com/go/longrunning v0.5.4 h1:w8xEcbZodnA2BbW6sVirkkoC+1gP8wS57EUUgGS0GVg= +cloud.google.com/go/longrunning v0.5.4/go.mod h1:zqNVncI0BOP8ST6XQD1+VcvuShMmq7+xFSzOL++V0dI= cloud.google.com/go/pubsub v1.36.1 h1:dfEPuGCHGbWUhaMCTHUFjfroILEkx55iUmKBZTP5f+Y= cloud.google.com/go/pubsub v1.36.1/go.mod h1:iYjCa9EzWOoBiTdd4ps7QoMtMln5NwaZQpK1hbRfBDE= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= @@ -73,6 +77,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfF github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= +github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI= +github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= diff --git a/receiver/googlecloudpubsubreceiver/internal/common_protos.go b/receiver/googlecloudpubsubreceiver/internal/common_protos.go new file mode 100644 index 000000000000..a1f5c2e55d98 --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/internal/common_protos.go @@ -0,0 +1,8 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal" + +import ( + _ "google.golang.org/genproto/googleapis/cloud/audit" // support decoding Cloud Audit logs +) diff --git a/receiver/googlecloudpubsubreceiver/internal/log_entry.go b/receiver/googlecloudpubsubreceiver/internal/log_entry.go new file mode 100644 index 000000000000..e2b86f4ecf1d --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/internal/log_entry.go @@ -0,0 +1,598 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal" + +import ( + "bytes" + "context" + "encoding/hex" + stdjson "encoding/json" + "errors" + "fmt" + "strconv" + "strings" + "sync" + "time" + + "cloud.google.com/go/logging/apiv2/loggingpb" + "github.com/iancoleman/strcase" + jsoniter "github.com/json-iterator/go" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/zap" + "google.golang.org/genproto/googleapis/api/monitoredres" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/reflect/protoregistry" + "google.golang.org/protobuf/types/known/anypb" +) + +var json = jsoniter.ConfigCompatibleWithStandardLibrary + +var invalidTraceID = [16]byte{} +var invalidSpanID = [8]byte{} + +func cloudLoggingTraceToTraceIDBytes(trace string) [16]byte { + // Format: projects/my-gcp-project/traces/4ebc71f1def9274798cac4e8960d0095 + lastSlashIdx := strings.LastIndex(trace, "/") + if lastSlashIdx == -1 { + return invalidTraceID + } + traceIDStr := trace[lastSlashIdx+1:] + + return traceIDStrTotraceIDBytes(traceIDStr) +} + +func traceIDStrTotraceIDBytes(traceIDStr string) [16]byte { + traceIDSlice := [16]byte{} + decoded, err := hex.Decode(traceIDSlice[:], []byte(traceIDStr)) + if err != nil || decoded != 16 { + return invalidTraceID + } + + return traceIDSlice +} + +func spanIDStrToSpanIDBytes(spanIDStr string) [8]byte { + spanIDSlice := [8]byte{} + decoded, err := hex.Decode(spanIDSlice[:], []byte(spanIDStr)) + if err != nil || decoded != 8 { + return invalidSpanID + } + + return spanIDSlice +} + +func cloudLoggingSeverityToNumber(severity string) plog.SeverityNumber { + // https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity + switch severity { + case "DEBUG": + return plog.SeverityNumberDebug + case "INFO": + return plog.SeverityNumberInfo + case "NOTICE": + return plog.SeverityNumberInfo2 + case "WARNING": + return plog.SeverityNumberWarn + case "ERROR": + return plog.SeverityNumberError + case "CRITICAL": + return plog.SeverityNumberFatal + case "ALERT": + return plog.SeverityNumberFatal2 + case "EMERGENCY": + return plog.SeverityNumberFatal4 + case "DEFAULT": + } + return plog.SeverityNumberUnspecified +} + +var desc protoreflect.MessageDescriptor +var descOnce sync.Once + +func getLogEntryDescriptor() protoreflect.MessageDescriptor { + descOnce.Do(func() { + var logEntry loggingpb.LogEntry + + desc = logEntry.ProtoReflect().Descriptor() + }) + + return desc +} + +// TranslateLogEntry translates a JSON-encoded LogEntry message into a pair of +// pcommon.Resource and plog.LogRecord, trying to keep as close as possible to +// the semantic conventions. +// +// For maximum fidelity, the decoding is done according to the protobuf message +// schema; this ensures that a numeric value in the input is correctly +// translated to either an integer or a double in the output. It falls back to +// plain JSON decoding if payload type is not available in the proto registry. +func TranslateLogEntry(_ context.Context, _ *zap.Logger, data []byte) (pcommon.Resource, plog.LogRecord, error) { + lr := plog.NewLogRecord() + res := pcommon.NewResource() + + var src map[string]stdjson.RawMessage + err := json.Unmarshal(data, &src) + + if err != nil { + return res, lr, err + } + + resAttrs := res.Attributes() + attrs := lr.Attributes() + + for k, v := range src { + // Pick out some keys for special handling, and let the rest + // pass through to be translated according to the schema. + switch k { + // Unpack as suggested in the logs data model appendix + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model-appendix.md#google-cloud-logging + case "insertId": + // timestamp -> Attributes[“log.record.uid”] + // see: https://github.com/open-telemetry/semantic-conventions/blob/main/model/logs/general.yaml + var insertID string + err = json.Unmarshal(v, &insertID) + if err != nil { + return res, lr, err + } + attrs.PutStr("log.record.uid", insertID) + delete(src, k) + case "timestamp": + // timestamp -> Timestamp + var t time.Time + err = json.Unmarshal(v, &t) + if err != nil { + return res, lr, err + } + lr.SetTimestamp(pcommon.NewTimestampFromTime(t)) + delete(src, k) + case "receiveTimestamp": + // timestamp -> Timestamp + var t time.Time + err = json.Unmarshal(v, &t) + if err != nil { + return res, lr, err + } + lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(t)) + delete(src, k) + case "resource": + // resource -> Resource + // mapping type -> gcp.resource_type + // labels -> gcp.