Skip to content

Commit

Permalink
chore: Remove unused stream index from RF1 ingester (#13758)
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive authored Aug 7, 2024
1 parent 7683a79 commit 638f59f
Showing 1 changed file with 3 additions and 12 deletions.
15 changes: 3 additions & 12 deletions pkg/ingester-rf1/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/ingester/index"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
Expand Down Expand Up @@ -61,7 +60,6 @@ type instance struct {
buf []byte // buffer used to compute fps.
streams *streamsMap

index *index.Multi
mapper *FpMapper // using of mapper no longer needs mutex because reading from streams is lock-free

instanceID string
Expand Down Expand Up @@ -140,18 +138,13 @@ func newInstance(
customStreamsTracker push.UsageTracker,
logger log.Logger,
) (*instance, error) {
invertedIndex, err := index.NewMultiInvertedIndex(periodConfigs, uint32(cfg.IndexShards))
if err != nil {
return nil, err
}
streams := newStreamsMap()
ownedStreamsSvc := newOwnedStreamService(instanceID, limiter)
c := config.SchemaConfig{Configs: periodConfigs}
i := &instance{
cfg: cfg,
streams: streams,
buf: make([]byte, 0, 1024),
index: invertedIndex,
instanceID: instanceID,
//
streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
Expand Down Expand Up @@ -182,7 +175,7 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre
// reducing the stream limits, for instance.
var err error

labels, err := syntax.ParseLabels(pushReqStream.Labels)
sortedLabels, err := syntax.ParseLabels(pushReqStream.Labels)
if err != nil {
if i.configs.LogStreamCreation(i.instanceID) {
level.Debug(util_log.Logger).Log(
Expand All @@ -196,12 +189,10 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre
}

if err != nil {
return i.onStreamCreationError(ctx, pushReqStream, err, labels)
return i.onStreamCreationError(ctx, pushReqStream, err, sortedLabels)
}

fp := i.getHashForLabels(labels)

sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)
fp := i.getHashForLabels(sortedLabels)

chunkfmt, headfmt, err := i.chunkFormatAt(minTs(&pushReqStream))
if err != nil {
Expand Down

0 comments on commit 638f59f

Please sign in to comment.