diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index b23fdd2ea402..d45c11bea637 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -166,7 +166,7 @@ func (i *Ingester) flushLoop(j int) { } func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error { - instance, ok, _ := i.getInstanceByID(userID) + instance, ok := i.getInstanceByID(userID) if !ok { return nil } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index cf6f9b63ecb1..d1d77b42a57e 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -162,21 +162,19 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro instanceID, err := user.ExtractOrgID(ctx) if err != nil { return nil, err - } - - instance, readonly := i.getOrCreateInstance(instanceID) - if readonly { + } else if i.readonly { return nil, ErrReadOnly } + instance := i.getOrCreateInstance(instanceID) err = instance.Push(ctx, req) return &logproto.PushResponse{}, err } -func (i *Ingester) getOrCreateInstance(instanceID string) (instance *instance, readonly bool) { - inst, ok, readonly := i.getInstanceByID(instanceID) - if ok || readonly { - return inst, readonly +func (i *Ingester) getOrCreateInstance(instanceID string) *instance { + inst, ok := i.getInstanceByID(instanceID) + if ok || i.readonly { + return inst } i.instancesMtx.Lock() @@ -186,7 +184,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) (instance *instance, r inst = newInstance(instanceID, i.cfg.BlockSize) i.instances[instanceID] = inst } - return inst, i.readonly + return inst } // Query the ingests for log streams matching a set of matchers. @@ -196,7 +194,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie return err } - instance, _ := i.getOrCreateInstance(instanceID) + instance := i.getOrCreateInstance(instanceID) return instance.Query(req, queryServer) } @@ -207,7 +205,7 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp return nil, err } - instance, _ := i.getOrCreateInstance(instanceID) + instance := i.getOrCreateInstance(instanceID) return instance.Label(ctx, req) } @@ -236,12 +234,12 @@ func (i *Ingester) ReadinessHandler(w http.ResponseWriter, r *http.Request) { } } -func (i *Ingester) getInstanceByID(id string) (instance *instance, ok bool, readonly bool) { +func (i *Ingester) getInstanceByID(id string) (*instance, bool) { i.instancesMtx.RLock() defer i.instancesMtx.RUnlock() inst, ok := i.instances[id] - return inst, ok, i.readonly + return inst, ok } func (i *Ingester) getInstances() []*instance { @@ -268,7 +266,7 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_ return err } - instance, _ := i.getOrCreateInstance(instanceID) + instance := i.getOrCreateInstance(instanceID) tailer, err := newTailer(instanceID, req.Query, req.Regex, queryServer) if err != nil { return err diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 52f264cc9442..c329dab083b4 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -82,7 +82,7 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) lbls = append(lbls, client.LabelAdapter{Name: lbl.Name, Value: lbl.Value}) } - instance, _ := i.getOrCreateInstance(chunkSet.UserId) + instance := i.getOrCreateInstance(chunkSet.UserId) for _, chunk := range chunkSet.Chunks { if err := instance.consumeChunk(userCtx, lbls, chunk); err != nil { return err