diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 41b358906e0a..f892a2a7a89c 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -34,6 +34,8 @@ import ( "github.com/prometheus/prometheus/model/labels" "google.golang.org/grpc/health/grpc_health_v1" + server_util "github.com/grafana/loki/v3/pkg/util/server" + "github.com/grafana/loki/v3/pkg/analytics" "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/distributor/writefailures" @@ -1041,6 +1043,13 @@ func (i *Ingester) asyncStoreMaxLookBack() time.Duration { // GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb. func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) { + gcr, err := i.getChunkIDs(ctx, req) + err = server_util.ClientGrpcStatusAndError(err) + return gcr, err +} + +// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb. +func (i *Ingester) getChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) { orgID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1168,6 +1177,12 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp // Series queries the ingester for log stream identifiers (label sets) matching a set of matchers func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) { + sr, err := i.series(ctx, req) + err = server_util.ClientGrpcStatusAndError(err) + return sr, err +} + +func (i *Ingester) series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) { instanceID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1331,6 +1346,11 @@ func (i *Ingester) getInstances() []*instance { // Tail logs matching given query func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error { + err := i.tail(req, queryServer) + err = server_util.ClientGrpcStatusAndError(err) + return err +} +func (i *Ingester) tail(req *logproto.TailRequest, queryServer logproto.Querier_TailServer) error { select { case <-i.tailersQuit: return errors.New("Ingester is stopping") @@ -1376,6 +1396,12 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_ // TailersCount returns count of active tail requests from a user func (i *Ingester) TailersCount(ctx context.Context, _ *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error) { + tcr, err := i.tailersCount(ctx) + err = server_util.ClientGrpcStatusAndError(err) + return tcr, err +} + +func (i *Ingester) tailersCount(ctx context.Context) (*logproto.TailersCountResponse, error) { instanceID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -1431,6 +1457,12 @@ func (i *Ingester) GetDetectedFields(_ context.Context, r *logproto.DetectedFiel // GetDetectedLabels returns map of detected labels and unique values from this ingester func (i *Ingester) GetDetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.LabelToValuesResponse, error) { + lvr, err := i.getDetectedLabels(ctx, req) + err = server_util.ClientGrpcStatusAndError(err) + return lvr, err +} + +func (i *Ingester) getDetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.LabelToValuesResponse, error) { userID, err := tenant.TenantID(ctx) if err != nil { return nil, err diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 7f1ec78601ff..ecef3f10347b 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -49,6 +49,7 @@ import ( "github.com/grafana/loki/v3/pkg/util/deletion" util_log "github.com/grafana/loki/v3/pkg/util/log" mathutil "github.com/grafana/loki/v3/pkg/util/math" + server_util "github.com/grafana/loki/v3/pkg/util/server" "github.com/grafana/loki/v3/pkg/validation" ) @@ -441,6 +442,12 @@ func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels } func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) { + it, err := i.query(ctx, req) + err = server_util.ClientGrpcStatusAndError(err) + return it, err +} + +func (i *instance) query(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) { expr, err := req.LogSelector() if err != nil { return nil, err @@ -495,6 +502,12 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E } func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) { + it, err := i.querySample(ctx, req) + err = server_util.ClientGrpcStatusAndError(err) + return it, err +} + +func (i *instance) querySample(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) { expr, err := req.Expr() if err != nil { return nil, err @@ -556,6 +569,12 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams // If label matchers are given only the matching streams are fetched from the index. // The label names or values are then retrieved from those matching streams. func (i *instance) Label(ctx context.Context, req *logproto.LabelRequest, matchers ...*labels.Matcher) (*logproto.LabelResponse, error) { + lr, err := i.label(ctx, req, matchers...) + err = server_util.ClientGrpcStatusAndError(err) + return lr, err +} + +func (i *instance) label(ctx context.Context, req *logproto.LabelRequest, matchers ...*labels.Matcher) (*logproto.LabelResponse, error) { if len(matchers) == 0 { var labels []string if req.Values { @@ -709,6 +728,12 @@ func (i *instance) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo } func (i *instance) GetStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error) { + isr, err := i.getStats(ctx, req) + err = server_util.ClientGrpcStatusAndError(err) + return isr, err +} + +func (i *instance) getStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error) { matchers, err := syntax.ParseMatchers(req.Matchers, true) if err != nil { return nil, err @@ -765,6 +790,12 @@ func (i *instance) GetStats(ctx context.Context, req *logproto.IndexStatsRequest } func (i *instance) GetVolume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error) { + vr, err := i.getVolume(ctx, req) + err = server_util.ClientGrpcStatusAndError(err) + return vr, err +} + +func (i *instance) getVolume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error) { matchers, err := syntax.ParseMatchers(req.Matchers, true) if err != nil && req.Matchers != seriesvolume.MatchAny { return nil, err diff --git a/pkg/util/server/error.go b/pkg/util/server/error.go index c120a79176f8..7326f7cecb6c 100644 --- a/pkg/util/server/error.go +++ b/pkg/util/server/error.go @@ -27,6 +27,15 @@ const ( ErrDeadlineExceeded = "Request timed out, decrease the duration of the request or add more label matchers (prefer exact match over regex match) to reduce the amount of data processed." ) +func ClientGrpcStatusAndError(err error) error { + if err == nil { + return nil + } + + status, newErr := ClientHTTPStatusAndError(err) + return httpgrpc.Errorf(status, "%s", newErr.Error()) +} + // WriteError write a go error with the correct status code. func WriteError(err error, w http.ResponseWriter) { status, cerr := ClientHTTPStatusAndError(err)