From d34b1f74508076f55b3b1aed78ad9b9fc3d02140 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Mon, 22 Jul 2019 13:49:20 -0400 Subject: [PATCH 1/6] ingester: support chunk transfers on ingester shutdown. This commit introduces chunk transfers, borrowing the mechanism from Cortex's implementation: when an ingester is shut down with claim on rollout enabled, the ingester will find a pending ingester and transfer all of its chunks to it. --- pkg/chunkenc/gzip.go | 5 +- pkg/ingester/client/client.go | 8 +- pkg/ingester/flush.go | 2 +- pkg/ingester/flush_test.go | 3 +- pkg/ingester/ingester.go | 71 +- pkg/ingester/ingester_test.go | 3 +- pkg/ingester/instance.go | 19 + pkg/ingester/stream.go | 15 + pkg/ingester/transfer.go | 228 ++++++ pkg/logproto/logproto.pb.go | 1332 +++++++++++++++++++++++++++++++-- pkg/logproto/logproto.proto | 24 + pkg/loki/modules.go | 2 +- 12 files changed, 1592 insertions(+), 120 deletions(-) create mode 100644 pkg/ingester/transfer.go diff --git a/pkg/chunkenc/gzip.go b/pkg/chunkenc/gzip.go index 3b3bb67ee354..fd8580fdcd46 100644 --- a/pkg/chunkenc/gzip.go +++ b/pkg/chunkenc/gzip.go @@ -155,8 +155,9 @@ func NewMemChunk(enc Encoding) *MemChunk { // NewByteChunk returns a MemChunk on the passed bytes. func NewByteChunk(b []byte) (*MemChunk, error) { bc := &MemChunk{ - cPool: &Gzip, - head: &headBlock{}, // Dummy, empty headblock. + cPool: &Gzip, + encoding: EncGZIP, + head: &headBlock{}, // Dummy, empty headblock. } db := decbuf{b: b} diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 9c08be7fbdf2..ea8f0a72263e 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -55,11 +55,13 @@ func New(cfg Config, addr string) (grpc_health_v1.HealthClient, error) { return struct { logproto.PusherClient logproto.QuerierClient + logproto.IngesterClient grpc_health_v1.HealthClient io.Closer }{ - PusherClient: logproto.NewPusherClient(conn), - QuerierClient: logproto.NewQuerierClient(conn), - Closer: conn, + PusherClient: logproto.NewPusherClient(conn), + QuerierClient: logproto.NewQuerierClient(conn), + IngesterClient: logproto.NewIngesterClient(conn), + Closer: conn, }, nil } diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index d45c11bea637..b23fdd2ea402 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -166,7 +166,7 @@ func (i *Ingester) flushLoop(j int) { } func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error { - instance, ok := i.getInstanceByID(userID) + instance, ok, _ := i.getInstanceByID(userID) if !ok { return nil } diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index aab3f96f3862..c75a7c8fd414 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -11,6 +11,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/prometheus/common/model" @@ -61,7 +62,7 @@ func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) { chunks: map[string][]chunk.Chunk{}, } - ing, err := New(cfg, store) + ing, err := New(cfg, client.Config{}, store) require.NoError(t, err) return store, ing diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 05638f37174d..cf6f9b63ecb1 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -17,9 +17,14 @@ import ( 
"github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util" + "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/logproto" ) +// ErrReadOnly is returned when the ingester is shutting down and a push was +// attempted. +var ErrReadOnly = errors.New("Ingester is shutting down") + var readinessProbeSuccess = []byte("Ready") var flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{ @@ -31,18 +36,25 @@ var flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{ type Config struct { LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"` + // Config for transferring chunks. + MaxTransferRetries int `yaml:"max_transfer_retries,omitempty"` + ConcurrentFlushes int `yaml:"concurrent_flushes"` FlushCheckPeriod time.Duration `yaml:"flush_check_period"` FlushOpTimeout time.Duration `yaml:"flush_op_timeout"` RetainPeriod time.Duration `yaml:"chunk_retain_period"` MaxChunkIdle time.Duration `yaml:"chunk_idle_period"` BlockSize int `yaml:"chunk_block_size"` + + // For testing, you can override the address and ID of this ingester. + ingesterClientFactory func(cfg client.Config, addr string) (grpc_health_v1.HealthClient, error) } // RegisterFlags registers the flags. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.LifecyclerConfig.RegisterFlags(f) + f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing.") f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushed", 16, "") f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "") f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Second, "") @@ -53,10 +65,12 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // Ingester builds chunks for incoming log streams. type Ingester struct { - cfg Config + cfg Config + clientConfig client.Config instancesMtx sync.RWMutex instances map[string]*instance + readonly bool lifecycler *ring.Lifecycler store ChunkStore @@ -77,14 +91,19 @@ type ChunkStore interface { } // New makes a new Ingester. -func New(cfg Config, store ChunkStore) (*Ingester, error) { +func New(cfg Config, clientConfig client.Config, store ChunkStore) (*Ingester, error) { + if cfg.ingesterClientFactory == nil { + cfg.ingesterClientFactory = client.New + } + i := &Ingester{ - cfg: cfg, - instances: map[string]*instance{}, - store: store, - quit: make(chan struct{}), - flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), - quitting: make(chan struct{}), + cfg: cfg, + clientConfig: clientConfig, + instances: map[string]*instance{}, + store: store, + quit: make(chan struct{}), + flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), + quitting: make(chan struct{}), } i.flushQueuesDone.Add(cfg.ConcurrentFlushes) @@ -138,16 +157,6 @@ func (i *Ingester) Stopping() { } } -// StopIncomingRequests implements ring.Lifecycler. -func (i *Ingester) StopIncomingRequests() { - -} - -// TransferOut implements ring.Lifecycler. -func (i *Ingester) TransferOut(context.Context) error { - return nil -} - // Push implements logproto.Pusher. 
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) { instanceID, err := user.ExtractOrgID(ctx) @@ -155,15 +164,19 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro return nil, err } - instance := i.getOrCreateInstance(instanceID) + instance, readonly := i.getOrCreateInstance(instanceID) + if readonly { + return nil, ErrReadOnly + } + err = instance.Push(ctx, req) return &logproto.PushResponse{}, err } -func (i *Ingester) getOrCreateInstance(instanceID string) *instance { - inst, ok := i.getInstanceByID(instanceID) - if ok { - return inst +func (i *Ingester) getOrCreateInstance(instanceID string) (instance *instance, readonly bool) { + inst, ok, readonly := i.getInstanceByID(instanceID) + if ok || readonly { + return inst, readonly } i.instancesMtx.Lock() @@ -173,7 +186,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance { inst = newInstance(instanceID, i.cfg.BlockSize) i.instances[instanceID] = inst } - return inst + return inst, i.readonly } // Query the ingests for log streams matching a set of matchers. @@ -183,7 +196,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie return err } - instance := i.getOrCreateInstance(instanceID) + instance, _ := i.getOrCreateInstance(instanceID) return instance.Query(req, queryServer) } @@ -194,7 +207,7 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp return nil, err } - instance := i.getOrCreateInstance(instanceID) + instance, _ := i.getOrCreateInstance(instanceID) return instance.Label(ctx, req) } @@ -223,12 +236,12 @@ func (i *Ingester) ReadinessHandler(w http.ResponseWriter, r *http.Request) { } } -func (i *Ingester) getInstanceByID(id string) (*instance, bool) { +func (i *Ingester) getInstanceByID(id string) (instance *instance, ok bool, readonly bool) { i.instancesMtx.RLock() defer i.instancesMtx.RUnlock() inst, ok := i.instances[id] - return inst, ok + return inst, ok, i.readonly } func (i *Ingester) getInstances() []*instance { @@ -255,7 +268,7 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_ return err } - instance := i.getOrCreateInstance(instanceID) + instance, _ := i.getOrCreateInstance(instanceID) tailer, err := newTailer(instanceID, req.Query, req.Regex, queryServer) if err != nil { return err diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 16f395b5becb..4c6949138227 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/logproto" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" @@ -21,7 +22,7 @@ func TestIngester(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, store) + i, err := New(ingesterConfig, client.Config{}, store) require.NoError(t, err) defer i.Shutdown() diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index be38c6400aaf..2cd75c100952 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -71,6 +71,25 @@ func newInstance(instanceID string, blockSize int) *instance { } } +// consumeChunk manually adds a chunk that was received during ingester chunk +// transfer. 
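+// If no stream exists for the chunk's label set, a new stream is created and
+// registered in the index before the chunk is appended.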
+func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapter, chunk *logproto.Chunk) error { + i.streamsMtx.Lock() + defer i.streamsMtx.Unlock() + + fp := client.FastFingerprint(labels) + stream, ok := i.streams[fp] + if !ok { + stream = newStream(fp, labels, i.blockSize) + i.index.Add(labels, fp) + i.streams[fp] = stream + i.streamsCreatedTotal.Inc() + i.addTailersToNewStream(stream) + } + + return stream.consumeChunk(ctx, chunk) +} + func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { i.streamsMtx.Lock() defer i.streamsMtx.Unlock() diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index f3aab3d79760..0e131ea9f948 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -73,6 +73,21 @@ func newStream(fp model.Fingerprint, labels []client.LabelAdapter, blockSize int } } +// consumeChunk manually adds a chunk to the stream that was received during +// ingester chunk transfer. +func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error { + c, err := chunkenc.NewByteChunk(chunk.Data) + if err != nil { + return err + } + + s.chunks = append(s.chunks, chunkDesc{ + chunk: c, + }) + chunksCreatedTotal.Inc() + return nil +} + func (s *stream) Push(_ context.Context, entries []logproto.Entry) error { if len(s.chunks) == 0 { s.chunks = append(s.chunks, chunkDesc{ diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go new file mode 100644 index 000000000000..9a4872371051 --- /dev/null +++ b/pkg/ingester/transfer.go @@ -0,0 +1,228 @@ +package ingester + +import ( + "fmt" + "io" + "os" + "time" + + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" + "github.com/grafana/loki/pkg/logproto" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/user" + "golang.org/x/net/context" +) + +var ( + sentChunks = promauto.NewCounter(prometheus.CounterOpts{ + Name: "loki_ingester_sent_chunks", + Help: "The total number of chunks sent by this ingester whilst leaving.", + }) + receivedChunks = promauto.NewCounter(prometheus.CounterOpts{ + Name: "loki_ingester_received_chunks", + Help: "The total number of chunks received by this ingester whilst joining.", + }) +) + +// TransferChunks receieves all chunks from another ingester. The Ingester +// must be in PENDING state or else the call will fail. +func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error { + // Entry JOINING state (only valid from PENDING) + if err := i.lifecycler.ChangeState(stream.Context(), ring.JOINING); err != nil { + return err + } + + // The ingesters state effectively works as a giant mutex around this + // whole method, and as such we have to ensure we unlock the mutex. 
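+	// If the transfer fails, the deferred function below moves the ingester
+	// back to PENDING (from JOINING) so it can accept a subsequent transfer.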
+ defer func() { + state := i.lifecycler.GetState() + if i.lifecycler.GetState() == ring.ACTIVE { + return + } + + level.Error(util.Logger).Log("msg", "TransferChunks failed, not in ACTIVE state.", "state", state) + + // Enter PENDING state (only valid from JOINING) + if i.lifecycler.GetState() == ring.JOINING { + if err := i.lifecycler.ChangeState(stream.Context(), ring.PENDING); err != nil { + level.Error(util.Logger).Log("msg", "error rolling back failed TransferChunks", "err", err) + os.Exit(1) + } + } + }() + + fromIngesterID := "" + seriesReceived := 0 + + for { + chunkSet, err := stream.Recv() + if err == io.EOF { + break + } else if err != nil { + return err + } + + // We can't send "extra" fields with a streaming call, so we repeat + // chunkSet.FromIngesterId and assume it is the same every time around + // this loop. + if fromIngesterID == "" { + fromIngesterID = chunkSet.FromIngesterId + level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID) + } + + userCtx := user.InjectOrgID(stream.Context(), chunkSet.UserId) + + lbls := []client.LabelAdapter{} + for _, lbl := range chunkSet.Labels { + lbls = append(lbls, client.LabelAdapter{Name: lbl.Name, Value: lbl.Value}) + } + + instance, _ := i.getOrCreateInstance(chunkSet.UserId) + for _, chunk := range chunkSet.Chunks { + if err := instance.consumeChunk(userCtx, lbls, chunk); err != nil { + return err + } + } + + seriesReceived++ + receivedChunks.Add(float64(len(chunkSet.Chunks))) + } + + if seriesReceived == 0 { + level.Error(util.Logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID) + return fmt.Errorf("no series") + } else if fromIngesterID == "" { + level.Error(util.Logger).Log("msg", "received TransferChunks request with no ID from ingester") + return fmt.Errorf("no ingester id") + } + + if err := i.lifecycler.ClaimTokensFor(stream.Context(), fromIngesterID); err != nil { + return err + } + + if err := i.lifecycler.ChangeState(stream.Context(), ring.ACTIVE); err != nil { + return err + } + + // Close the stream last, as this is what tells the "from" ingester that + // it's OK to shut down. + if err := stream.SendAndClose(&logproto.TransferChunksResponse{}); err != nil { + level.Error(util.Logger).Log("msg", "Error closing TransferChunks stream", "from_ingester", fromIngesterID, "err", err) + return err + } + level.Info(util.Logger).Log("msg", "Successfully transferred chunks", "from_ingester", fromIngesterID, "series_received", seriesReceived) + return nil +} + +// StopIncomingRequests implements ring.Lifecycler. +func (i *Ingester) StopIncomingRequests() { + i.instancesMtx.Lock() + defer i.instancesMtx.Unlock() + i.readonly = true +} + +// TransferOut implements ring.Lifecycler. 
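+// It attempts to send all chunks to a PENDING ingester, retrying with backoff
+// up to MaxTransferRetries times before the lifecycler falls back to flushing
+// chunks to the store.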
+func (i *Ingester) TransferOut(ctx context.Context) error { + backoff := util.NewBackoff(ctx, util.BackoffConfig{ + MinBackoff: 100 * time.Millisecond, + MaxBackoff: 5 * time.Second, + MaxRetries: i.cfg.MaxTransferRetries, + }) + + for backoff.Ongoing() { + err := i.transferOut(ctx) + if err == nil { + return nil + } + + level.Error(util.Logger).Log("msg", "transfer failed", "err", err) + backoff.Wait() + } + + return backoff.Err() +} + +func (i *Ingester) transferOut(ctx context.Context) error { + targetIngester, err := i.findTransferTarget(ctx) + if err != nil { + return fmt.Errorf("cannot find ingester to transfer chunks to: %v", err) + } + + level.Info(util.Logger).Log("msg", "sending chunks", "to_ingester", targetIngester.Addr) + c, err := i.cfg.ingesterClientFactory(i.clientConfig, targetIngester.Addr) + if err != nil { + return err + } + if c, ok := c.(io.Closer); ok { + defer c.Close() + } + ic := c.(logproto.IngesterClient) + + ctx = user.InjectOrgID(ctx, "-1") + stream, err := ic.TransferChunks(ctx) + + _, err = stream.CloseAndRecv() + if err != nil { + return errors.Wrap(err, "CloseAndRecv") + } + + for instanceID, inst := range i.instances { + for _, istream := range inst.streams { + chunks := make([]*logproto.Chunk, 0, len(istream.chunks)) + + for _, c := range istream.chunks { + bb, err := c.chunk.Bytes() + if err != nil { + return err + } + + chunks = append(chunks, &logproto.Chunk{ + Data: bb, + }) + } + + lbls := []*logproto.LabelPair{} + for _, lbl := range istream.labels { + lbls = append(lbls, &logproto.LabelPair{Name: lbl.Name, Value: lbl.Value}) + } + + stream.Send(&logproto.TimeSeriesChunk{ + Chunks: chunks, + UserId: instanceID, + Labels: lbls, + FromIngesterId: i.lifecycler.ID, + }) + + sentChunks.Add(float64(len(chunks))) + } + } + + for _, flushQueue := range i.flushQueues { + flushQueue.DiscardAndClose() + } + i.flushQueuesDone.Wait() + + level.Info(util.Logger).Log("msg", "successfully sent chunks", "to_ingester", targetIngester.Addr) + return nil +} + +// findTransferTarget finds an ingester in a PENDING state to use for transferring +// chunks to. 
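+// It returns the first PENDING ingester found in the ring, or an error when
+// none is available (in which case TransferOut retries).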
+func (i *Ingester) findTransferTarget(ctx context.Context) (*ring.IngesterDesc, error) { + ringDesc, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey) + if err != nil { + return nil, err + } + + ingesters := ringDesc.(*ring.Desc).FindIngestersByState(ring.PENDING) + if len(ingesters) == 0 { + return nil, fmt.Errorf("no pending ingesters") + } + + return &ingesters[0], nil +} diff --git a/pkg/logproto/logproto.pb.go b/pkg/logproto/logproto.pb.go index 08cbd487fae0..e86d56f010f0 100644 --- a/pkg/logproto/logproto.pb.go +++ b/pkg/logproto/logproto.pb.go @@ -4,6 +4,7 @@ package logproto import ( + bytes "bytes" context "context" fmt "fmt" _ "github.com/gogo/protobuf/gogoproto" @@ -656,6 +657,202 @@ func (m *DroppedStream) GetLabels() string { return "" } +type TimeSeriesChunk struct { + FromIngesterId string `protobuf:"bytes,1,opt,name=from_ingester_id,json=fromIngesterId,proto3" json:"from_ingester_id,omitempty"` + UserId string `protobuf:"bytes,2,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` + Labels []*LabelPair `protobuf:"bytes,3,rep,name=labels,proto3" json:"labels,omitempty"` + Chunks []*Chunk `protobuf:"bytes,4,rep,name=chunks,proto3" json:"chunks,omitempty"` +} + +func (m *TimeSeriesChunk) Reset() { *m = TimeSeriesChunk{} } +func (*TimeSeriesChunk) ProtoMessage() {} +func (*TimeSeriesChunk) Descriptor() ([]byte, []int) { + return fileDescriptor_7a8976f235a02f79, []int{11} +} +func (m *TimeSeriesChunk) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TimeSeriesChunk) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TimeSeriesChunk.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TimeSeriesChunk) XXX_Merge(src proto.Message) { + xxx_messageInfo_TimeSeriesChunk.Merge(m, src) +} +func (m *TimeSeriesChunk) XXX_Size() int { + return m.Size() +} +func (m *TimeSeriesChunk) XXX_DiscardUnknown() { + xxx_messageInfo_TimeSeriesChunk.DiscardUnknown(m) +} + +var xxx_messageInfo_TimeSeriesChunk proto.InternalMessageInfo + +func (m *TimeSeriesChunk) GetFromIngesterId() string { + if m != nil { + return m.FromIngesterId + } + return "" +} + +func (m *TimeSeriesChunk) GetUserId() string { + if m != nil { + return m.UserId + } + return "" +} + +func (m *TimeSeriesChunk) GetLabels() []*LabelPair { + if m != nil { + return m.Labels + } + return nil +} + +func (m *TimeSeriesChunk) GetChunks() []*Chunk { + if m != nil { + return m.Chunks + } + return nil +} + +type LabelPair struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` +} + +func (m *LabelPair) Reset() { *m = LabelPair{} } +func (*LabelPair) ProtoMessage() {} +func (*LabelPair) Descriptor() ([]byte, []int) { + return fileDescriptor_7a8976f235a02f79, []int{12} +} +func (m *LabelPair) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *LabelPair) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_LabelPair.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *LabelPair) XXX_Merge(src proto.Message) { + xxx_messageInfo_LabelPair.Merge(m, src) +} +func (m *LabelPair) XXX_Size() int { + return m.Size() +} +func (m *LabelPair) XXX_DiscardUnknown() { + 
xxx_messageInfo_LabelPair.DiscardUnknown(m) +} + +var xxx_messageInfo_LabelPair proto.InternalMessageInfo + +func (m *LabelPair) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *LabelPair) GetValue() string { + if m != nil { + return m.Value + } + return "" +} + +type Chunk struct { + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` +} + +func (m *Chunk) Reset() { *m = Chunk{} } +func (*Chunk) ProtoMessage() {} +func (*Chunk) Descriptor() ([]byte, []int) { + return fileDescriptor_7a8976f235a02f79, []int{13} +} +func (m *Chunk) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Chunk) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Chunk.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Chunk) XXX_Merge(src proto.Message) { + xxx_messageInfo_Chunk.Merge(m, src) +} +func (m *Chunk) XXX_Size() int { + return m.Size() +} +func (m *Chunk) XXX_DiscardUnknown() { + xxx_messageInfo_Chunk.DiscardUnknown(m) +} + +var xxx_messageInfo_Chunk proto.InternalMessageInfo + +func (m *Chunk) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +type TransferChunksResponse struct { +} + +func (m *TransferChunksResponse) Reset() { *m = TransferChunksResponse{} } +func (*TransferChunksResponse) ProtoMessage() {} +func (*TransferChunksResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_7a8976f235a02f79, []int{14} +} +func (m *TransferChunksResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *TransferChunksResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_TransferChunksResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *TransferChunksResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_TransferChunksResponse.Merge(m, src) +} +func (m *TransferChunksResponse) XXX_Size() int { + return m.Size() +} +func (m *TransferChunksResponse) XXX_DiscardUnknown() { + xxx_messageInfo_TransferChunksResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_TransferChunksResponse proto.InternalMessageInfo + func init() { proto.RegisterEnum("logproto.Direction", Direction_name, Direction_value) proto.RegisterType((*PushRequest)(nil), "logproto.PushRequest") @@ -669,60 +866,74 @@ func init() { proto.RegisterType((*TailRequest)(nil), "logproto.TailRequest") proto.RegisterType((*TailResponse)(nil), "logproto.TailResponse") proto.RegisterType((*DroppedStream)(nil), "logproto.DroppedStream") + proto.RegisterType((*TimeSeriesChunk)(nil), "logproto.TimeSeriesChunk") + proto.RegisterType((*LabelPair)(nil), "logproto.LabelPair") + proto.RegisterType((*Chunk)(nil), "logproto.Chunk") + proto.RegisterType((*TransferChunksResponse)(nil), "logproto.TransferChunksResponse") } func init() { proto.RegisterFile("logproto.proto", fileDescriptor_7a8976f235a02f79) } var fileDescriptor_7a8976f235a02f79 = []byte{ - // 754 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0x4f, 0x4f, 0x13, 0x4f, - 0x18, 0xde, 0xe9, 0xff, 0xbe, 0xfd, 0x03, 0x99, 0xdf, 0x4f, 0x68, 0x1a, 0xb3, 0x6d, 0xf6, 0xa0, - 0x0d, 0x89, 0x45, 0x2b, 0x11, 0x45, 0x13, 0x43, 0x45, 0x62, 0xa2, 0x89, 0x3a, 0x90, 0x78, 0xde, - 0xd2, 0xa1, 0x6c, 
0xb2, 0xed, 0x94, 0xdd, 0xa9, 0xb1, 0x37, 0x3f, 0x02, 0x37, 0x3f, 0x82, 0x9e, - 0xfc, 0x08, 0x9e, 0x39, 0x72, 0xe4, 0x54, 0x65, 0xb9, 0x18, 0x4e, 0xdc, 0xbc, 0x9a, 0xf9, 0xb3, - 0xdd, 0x05, 0x8c, 0x80, 0x97, 0x76, 0x9e, 0x99, 0xf7, 0x9d, 0x7d, 0x9f, 0x67, 0x9e, 0xf7, 0x85, - 0xb2, 0xcb, 0x7a, 0x43, 0x8f, 0x71, 0xd6, 0x94, 0xbf, 0x38, 0x17, 0xe2, 0x6a, 0xad, 0xc7, 0x58, - 0xcf, 0xa5, 0x8b, 0x12, 0x75, 0x46, 0xdb, 0x8b, 0xdc, 0xe9, 0x53, 0x9f, 0xdb, 0xfd, 0xa1, 0x0a, - 0xad, 0xde, 0xe9, 0x39, 0x7c, 0x67, 0xd4, 0x69, 0x6e, 0xb1, 0xfe, 0x62, 0x8f, 0xf5, 0x58, 0x14, - 0x29, 0x90, 0x04, 0x72, 0xa5, 0xc2, 0xad, 0x75, 0x28, 0xbc, 0x19, 0xf9, 0x3b, 0x84, 0xee, 0x8e, - 0xa8, 0xcf, 0xf1, 0x32, 0x64, 0x7d, 0xee, 0x51, 0xbb, 0xef, 0x57, 0x50, 0x3d, 0xd9, 0x28, 0xb4, - 0x66, 0x9b, 0xd3, 0x52, 0x36, 0xe4, 0x41, 0xbb, 0x70, 0x32, 0xa9, 0x85, 0x41, 0x24, 0x5c, 0x58, - 0x65, 0x28, 0xaa, 0x7b, 0xfc, 0x21, 0x1b, 0xf8, 0xd4, 0xfa, 0x85, 0xa0, 0xf8, 0x76, 0x44, 0xbd, - 0x71, 0x78, 0xf3, 0xff, 0x90, 0xde, 0x15, 0xb8, 0x82, 0xea, 0xa8, 0x91, 0x27, 0x0a, 0x88, 0x5d, - 0xd7, 0xe9, 0x3b, 0xbc, 0x92, 0xa8, 0xa3, 0x46, 0x89, 0x28, 0x80, 0x57, 0x20, 0xed, 0x73, 0xdb, - 0xe3, 0x95, 0x64, 0x1d, 0x35, 0x0a, 0xad, 0x6a, 0x53, 0x91, 0x6e, 0x86, 0x54, 0x9a, 0x9b, 0x21, - 0xe9, 0x76, 0x6e, 0x7f, 0x52, 0x33, 0xf6, 0xbe, 0xd7, 0x10, 0x51, 0x29, 0xf8, 0x01, 0x24, 0xe9, - 0xa0, 0x5b, 0x49, 0x5d, 0x23, 0x53, 0x24, 0xe0, 0x7b, 0x90, 0xef, 0x3a, 0x1e, 0xdd, 0xe2, 0x0e, - 0x1b, 0x54, 0xd2, 0x75, 0xd4, 0x28, 0xb7, 0xfe, 0x8b, 0xb8, 0xaf, 0x85, 0x47, 0x24, 0x8a, 0x12, - 0xc5, 0x7b, 0xb4, 0x47, 0x3f, 0x54, 0x32, 0x8a, 0x92, 0x04, 0xd6, 0x63, 0x28, 0x69, 0xe2, 0x4a, - 0x0a, 0xbc, 0x70, 0xa9, 0xa6, 0x91, 0x8c, 0x5f, 0x11, 0x14, 0x5f, 0xd9, 0x1d, 0xea, 0x86, 0xb2, - 0x61, 0x48, 0x0d, 0xec, 0x3e, 0xd5, 0xaa, 0xc9, 0x35, 0x9e, 0x83, 0xcc, 0x7b, 0xdb, 0x1d, 0x51, - 0x5f, 0xaa, 0x96, 0x23, 0x1a, 0x5d, 0x57, 0x36, 0xf4, 0xcf, 0xb2, 0xa1, 0xa9, 0x6c, 0xd6, 0x6d, - 0x28, 0xe9, 0x7a, 0x35, 0xdb, 0xa8, 0x38, 0x41, 0x36, 0x1f, 0x16, 0x67, 0xed, 0x40, 0x46, 0x91, - 0xc5, 0x16, 0x64, 0x5c, 0x91, 0xe2, 0x2b, 0x52, 0x6d, 0x38, 0x99, 0xd4, 0xf4, 0x0e, 0xd1, 0xff, - 0x78, 0x05, 0xb2, 0x74, 0xc0, 0x3d, 0x47, 0x72, 0x14, 0x9a, 0xcd, 0x44, 0x9a, 0x3d, 0x1f, 0x70, - 0x6f, 0xdc, 0x9e, 0x11, 0xcf, 0x27, 0xac, 0xa8, 0xe3, 0x48, 0xb8, 0xb0, 0x18, 0xa4, 0x65, 0x08, - 0x7e, 0x01, 0xf9, 0x69, 0x77, 0xc8, 0x6f, 0xfd, 0x9d, 0x59, 0x59, 0xdf, 0x98, 0xe0, 0xbe, 0xe4, - 0x17, 0x25, 0xe3, 0x9b, 0x90, 0x72, 0x9d, 0x01, 0x95, 0x7a, 0xe7, 0xdb, 0xb9, 0x93, 0x49, 0x4d, - 0x62, 0x22, 0x7f, 0xad, 0xcf, 0x08, 0x0a, 0x9b, 0xb6, 0xe3, 0x5e, 0x6a, 0x75, 0xe5, 0x96, 0x44, - 0xcc, 0x2d, 0xb8, 0x0a, 0xb9, 0x2e, 0x75, 0xed, 0xf1, 0x3a, 0xf3, 0xe4, 0xb3, 0x95, 0xc8, 0x14, - 0x47, 0xcd, 0x91, 0xfa, 0x63, 0x73, 0xa4, 0xaf, 0xdd, 0x1c, 0xd6, 0x18, 0x8a, 0xaa, 0x50, 0xfd, - 0x58, 0x0d, 0xc8, 0x28, 0xe7, 0x69, 0x79, 0x2e, 0x3a, 0x53, 0x9f, 0xe3, 0xa7, 0x50, 0xee, 0x7a, - 0x6c, 0x38, 0xa4, 0xdd, 0x0d, 0xed, 0x65, 0xf5, 0x2e, 0xf3, 0xb1, 0x1e, 0x89, 0x9f, 0x93, 0x73, - 0xe1, 0xd6, 0x27, 0x04, 0xa5, 0x33, 0x11, 0xf8, 0x21, 0xa4, 0xb6, 0x3d, 0xd6, 0xbf, 0xc2, 0xcb, - 0x44, 0x3c, 0x64, 0x06, 0x5e, 0x82, 0x04, 0x67, 0x52, 0xc7, 0xab, 0xe6, 0x25, 0x38, 0x13, 0xce, - 0xd4, 0xbe, 0x4b, 0xca, 0x17, 0xd0, 0x68, 0xe1, 0x16, 0xe4, 0xa7, 0xed, 0x8d, 0x0b, 0x90, 0x5d, - 0x7f, 0x4d, 0xde, 0xad, 0x92, 0xb5, 0x59, 0x03, 0x17, 0x21, 0xd7, 0x5e, 0x7d, 0xf6, 0x52, 0x22, - 0xd4, 0x5a, 0x85, 0x8c, 0x18, 0x71, 0xd4, 0xc3, 0xcb, 0x90, 0x12, 0x2b, 0x7c, 0x23, 0x22, 0x1f, - 0x1b, 0xa2, 0xd5, 0xb9, 0xf3, 0xdb, 0x7a, 
0x26, 0x1a, 0xad, 0x6f, 0x08, 0xb2, 0x62, 0x38, 0x38, - 0xd4, 0xc3, 0x4f, 0x20, 0x2d, 0xe7, 0x04, 0x8e, 0x85, 0xc7, 0x27, 0x66, 0x75, 0xfe, 0xc2, 0x7e, - 0x78, 0xcf, 0x5d, 0x24, 0x5c, 0x20, 0xfb, 0x2e, 0x9e, 0x1d, 0x1f, 0x1c, 0xf1, 0xec, 0x33, 0x0d, - 0x6a, 0x19, 0xf8, 0x11, 0xa4, 0x84, 0x0b, 0xe2, 0xe5, 0xc7, 0xec, 0x1b, 0x2f, 0x3f, 0x6e, 0x16, - 0xf1, 0xd9, 0xf6, 0xd2, 0xc1, 0x91, 0x69, 0x1c, 0x1e, 0x99, 0xc6, 0xe9, 0x91, 0x89, 0x3e, 0x06, - 0x26, 0xfa, 0x12, 0x98, 0x68, 0x3f, 0x30, 0xd1, 0x41, 0x60, 0xa2, 0x1f, 0x81, 0x89, 0x7e, 0x06, - 0xa6, 0x71, 0x1a, 0x98, 0x68, 0xef, 0xd8, 0x34, 0x0e, 0x8e, 0x4d, 0xe3, 0xf0, 0xd8, 0x34, 0x3a, - 0x19, 0x79, 0xdb, 0xfd, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x25, 0xbd, 0x8d, 0xbf, 0xd7, 0x06, - 0x00, 0x00, + // 916 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0x4f, 0x6f, 0x1b, 0x45, + 0x14, 0xdf, 0x71, 0xd6, 0x6b, 0xfb, 0xf9, 0x4f, 0xa2, 0xa1, 0x24, 0x8b, 0x41, 0x6b, 0x6b, 0x0e, + 0xd4, 0x2a, 0xc2, 0x01, 0x53, 0x28, 0x14, 0x24, 0x14, 0xb7, 0x44, 0x44, 0x20, 0xd1, 0x6e, 0x22, + 0x71, 0x42, 0xd5, 0x26, 0x3b, 0x71, 0x56, 0xac, 0x77, 0xdd, 0xd9, 0x59, 0x44, 0x6e, 0x7c, 0x84, + 0xde, 0xf8, 0x08, 0x20, 0x0e, 0x7c, 0x04, 0xce, 0x3d, 0xe6, 0xd8, 0x53, 0x20, 0xce, 0x05, 0xe5, + 0xd4, 0x1b, 0x57, 0x34, 0x7f, 0xd6, 0x3b, 0x4e, 0x23, 0xda, 0x70, 0xb1, 0xe7, 0xcd, 0xbc, 0x37, + 0xf3, 0x7e, 0xbf, 0xf7, 0x7b, 0x6f, 0xa1, 0x13, 0xa7, 0x93, 0x19, 0x4b, 0x79, 0x3a, 0x94, 0xbf, + 0xb8, 0x5e, 0xd8, 0xdd, 0xde, 0x24, 0x4d, 0x27, 0x31, 0xdd, 0x94, 0xd6, 0x7e, 0x7e, 0xb8, 0xc9, + 0xa3, 0x29, 0xcd, 0x78, 0x30, 0x9d, 0x29, 0xd7, 0xee, 0xbb, 0x93, 0x88, 0x1f, 0xe5, 0xfb, 0xc3, + 0x83, 0x74, 0xba, 0x39, 0x49, 0x27, 0x69, 0xe9, 0x29, 0x2c, 0x69, 0xc8, 0x95, 0x72, 0x27, 0xdb, + 0xd0, 0x7c, 0x90, 0x67, 0x47, 0x3e, 0x7d, 0x9c, 0xd3, 0x8c, 0xe3, 0x3b, 0x50, 0xcb, 0x38, 0xa3, + 0xc1, 0x34, 0x73, 0x51, 0x7f, 0x65, 0xd0, 0x1c, 0xad, 0x0d, 0x17, 0xa9, 0xec, 0xca, 0x83, 0x71, + 0xf3, 0xe2, 0xb4, 0x57, 0x38, 0xf9, 0xc5, 0x82, 0x74, 0xa0, 0xa5, 0xee, 0xc9, 0x66, 0x69, 0x92, + 0x51, 0xf2, 0x0f, 0x82, 0xd6, 0xc3, 0x9c, 0xb2, 0xe3, 0xe2, 0xe6, 0x1b, 0x50, 0x7d, 0x2c, 0x6c, + 0x17, 0xf5, 0xd1, 0xa0, 0xe1, 0x2b, 0x43, 0xec, 0xc6, 0xd1, 0x34, 0xe2, 0x6e, 0xa5, 0x8f, 0x06, + 0x6d, 0x5f, 0x19, 0xf8, 0x2e, 0x54, 0x33, 0x1e, 0x30, 0xee, 0xae, 0xf4, 0xd1, 0xa0, 0x39, 0xea, + 0x0e, 0x15, 0xe8, 0x61, 0x01, 0x65, 0xb8, 0x57, 0x80, 0x1e, 0xd7, 0x9f, 0x9e, 0xf6, 0xac, 0x27, + 0x7f, 0xf6, 0x90, 0xaf, 0x42, 0xf0, 0x47, 0xb0, 0x42, 0x93, 0xd0, 0xb5, 0xaf, 0x11, 0x29, 0x02, + 0xf0, 0xfb, 0xd0, 0x08, 0x23, 0x46, 0x0f, 0x78, 0x94, 0x26, 0x6e, 0xb5, 0x8f, 0x06, 0x9d, 0xd1, + 0x6b, 0x25, 0xf6, 0xfb, 0xc5, 0x91, 0x5f, 0x7a, 0x89, 0xe4, 0x19, 0x9d, 0xd0, 0x1f, 0x5d, 0x47, + 0x41, 0x92, 0x06, 0xf9, 0x14, 0xda, 0x1a, 0xb8, 0xa2, 0x02, 0xdf, 0x7a, 0x29, 0xa7, 0x25, 0x8d, + 0xbf, 0x23, 0x68, 0x7d, 0x1d, 0xec, 0xd3, 0xb8, 0xa0, 0x0d, 0x83, 0x9d, 0x04, 0x53, 0xaa, 0x59, + 0x93, 0x6b, 0xbc, 0x0e, 0xce, 0x0f, 0x41, 0x9c, 0xd3, 0x4c, 0xb2, 0x56, 0xf7, 0xb5, 0x75, 0x5d, + 0xda, 0xd0, 0xff, 0xa6, 0x0d, 0x2d, 0x68, 0x23, 0x37, 0xa1, 0xad, 0xf3, 0xd5, 0x68, 0xcb, 0xe4, + 0x04, 0xd8, 0x46, 0x91, 0x1c, 0x39, 0x02, 0x47, 0x81, 0xc5, 0x04, 0x9c, 0x58, 0x84, 0x64, 0x0a, + 0xd4, 0x18, 0x2e, 0x4e, 0x7b, 0x7a, 0xc7, 0xd7, 0xff, 0xf8, 0x2e, 0xd4, 0x68, 0xc2, 0x59, 0x24, + 0x31, 0x0a, 0xce, 0x56, 0x4b, 0xce, 0xbe, 0x48, 0x38, 0x3b, 0x1e, 0xaf, 0x8a, 0xf2, 0x09, 0x29, + 0x6a, 0x3f, 0xbf, 0x58, 0x90, 0x14, 0xaa, 0xd2, 0x05, 0x7f, 0x09, 0x8d, 0x45, 0x77, 0xc8, 0xb7, + 0xfe, 
0x1b, 0x59, 0x47, 0xdf, 0x58, 0xe1, 0x99, 0xc4, 0x57, 0x06, 0xe3, 0xb7, 0xc0, 0x8e, 0xa3, + 0x84, 0x4a, 0xbe, 0x1b, 0xe3, 0xfa, 0xc5, 0x69, 0x4f, 0xda, 0xbe, 0xfc, 0x25, 0xbf, 0x20, 0x68, + 0xee, 0x05, 0x51, 0xfc, 0x52, 0xa9, 0x2b, 0xb5, 0x54, 0x0c, 0xb5, 0xe0, 0x2e, 0xd4, 0x43, 0x1a, + 0x07, 0xc7, 0xdb, 0x29, 0x93, 0x65, 0x6b, 0xfb, 0x0b, 0xbb, 0x6c, 0x0e, 0xfb, 0xca, 0xe6, 0xa8, + 0x5e, 0xbb, 0x39, 0xc8, 0x31, 0xb4, 0x54, 0xa2, 0xba, 0x58, 0x03, 0x70, 0x94, 0xf2, 0x34, 0x3d, + 0x2f, 0x2a, 0x53, 0x9f, 0xe3, 0xcf, 0xa1, 0x13, 0xb2, 0x74, 0x36, 0xa3, 0xe1, 0xae, 0xd6, 0xb2, + 0xaa, 0xcb, 0x86, 0xd1, 0x23, 0xe6, 0xb9, 0x7f, 0xc9, 0x9d, 0xfc, 0x8c, 0xa0, 0xbd, 0xe4, 0x81, + 0x3f, 0x06, 0xfb, 0x90, 0xa5, 0xd3, 0x57, 0xa8, 0x4c, 0x89, 0x43, 0x46, 0xe0, 0xdb, 0x50, 0xe1, + 0xa9, 0xe4, 0xf1, 0x55, 0xe3, 0x2a, 0x3c, 0x15, 0xca, 0xd4, 0xba, 0x5b, 0x91, 0x15, 0xd0, 0x16, + 0xf9, 0x0d, 0xc1, 0xaa, 0x88, 0xd9, 0xa5, 0x42, 0x3e, 0xf7, 0x8e, 0xf2, 0xe4, 0x7b, 0x3c, 0x80, + 0x35, 0xf1, 0xd2, 0xa3, 0x28, 0x99, 0xd0, 0x8c, 0x53, 0xf6, 0x28, 0x0a, 0x75, 0x35, 0x3b, 0x62, + 0x7f, 0x47, 0x6f, 0xef, 0x84, 0x78, 0x03, 0x6a, 0x79, 0xa6, 0x1c, 0x54, 0x61, 0x1d, 0x61, 0xee, + 0x84, 0xf8, 0x1d, 0xe3, 0x39, 0xc1, 0x94, 0x31, 0x4d, 0x64, 0xc7, 0x3c, 0x08, 0x22, 0xb6, 0xd0, + 0xfb, 0x4d, 0x70, 0x0e, 0xc4, 0xc3, 0x99, 0x6b, 0x5f, 0x96, 0xbb, 0x4c, 0xc8, 0xd7, 0xc7, 0xe4, + 0x43, 0x68, 0x2c, 0xa2, 0xaf, 0x1c, 0x0e, 0x37, 0xa0, 0x2a, 0x3b, 0xae, 0x90, 0x99, 0x34, 0xc8, + 0x9b, 0x50, 0x55, 0xc0, 0x30, 0xd8, 0x61, 0xc0, 0x03, 0x19, 0xd2, 0xf2, 0xe5, 0x9a, 0xb8, 0xb0, + 0xbe, 0xc7, 0x82, 0x24, 0x3b, 0xa4, 0x4c, 0x3a, 0x65, 0x85, 0x3e, 0x6e, 0xbd, 0x0d, 0x8d, 0xc5, + 0xe4, 0xc3, 0x4d, 0xa8, 0x6d, 0x7f, 0xe3, 0x7f, 0xbb, 0xe5, 0xdf, 0x5f, 0xb3, 0x70, 0x0b, 0xea, + 0xe3, 0xad, 0x7b, 0x5f, 0x49, 0x0b, 0x8d, 0xb6, 0xc0, 0x11, 0xd3, 0x9f, 0x32, 0x7c, 0x07, 0x6c, + 0xb1, 0xc2, 0xaf, 0x97, 0x00, 0x8c, 0xef, 0x4b, 0x77, 0xfd, 0xf2, 0xb6, 0xfe, 0x5c, 0x58, 0xa3, + 0x3f, 0x10, 0xd4, 0xc4, 0xdc, 0x8c, 0x28, 0xc3, 0x9f, 0x41, 0x55, 0x8e, 0x50, 0x6c, 0xb8, 0x9b, + 0x1f, 0x93, 0xee, 0xc6, 0x0b, 0xfb, 0xc5, 0x3d, 0xef, 0x21, 0xd1, 0x20, 0x92, 0x22, 0x33, 0xda, + 0x9c, 0xa9, 0x66, 0xf4, 0xd2, 0xec, 0x22, 0x16, 0xfe, 0x04, 0x6c, 0xd1, 0x20, 0x66, 0xfa, 0x46, + 0x67, 0x9b, 0xe9, 0x9b, 0x7d, 0x24, 0x9e, 0x1d, 0x7d, 0x07, 0xf5, 0x42, 0x16, 0xf8, 0x21, 0x74, + 0x96, 0x19, 0xc5, 0x6f, 0x18, 0x91, 0xcb, 0x5a, 0xeb, 0xf6, 0x8d, 0xa3, 0x2b, 0xcb, 0x40, 0xac, + 0x01, 0x1a, 0xdf, 0x3e, 0x39, 0xf3, 0xac, 0x67, 0x67, 0x9e, 0xf5, 0xfc, 0xcc, 0x43, 0x3f, 0xcd, + 0x3d, 0xf4, 0xeb, 0xdc, 0x43, 0x4f, 0xe7, 0x1e, 0x3a, 0x99, 0x7b, 0xe8, 0xaf, 0xb9, 0x87, 0xfe, + 0x9e, 0x7b, 0xd6, 0xf3, 0xb9, 0x87, 0x9e, 0x9c, 0x7b, 0xd6, 0xc9, 0xb9, 0x67, 0x3d, 0x3b, 0xf7, + 0xac, 0x7d, 0x47, 0xde, 0xfb, 0xc1, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xe3, 0xdf, 0x07, 0x94, + 0x51, 0x08, 0x00, 0x00, } func (x Direction) String() string { @@ -1077,6 +1288,121 @@ func (this *DroppedStream) Equal(that interface{}) bool { } return true } +func (this *TimeSeriesChunk) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*TimeSeriesChunk) + if !ok { + that2, ok := that.(TimeSeriesChunk) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.FromIngesterId != that1.FromIngesterId { + return false + } + if this.UserId != that1.UserId { + return false + } + if len(this.Labels) != len(that1.Labels) { + return false + } + for i := range 
this.Labels { + if !this.Labels[i].Equal(that1.Labels[i]) { + return false + } + } + if len(this.Chunks) != len(that1.Chunks) { + return false + } + for i := range this.Chunks { + if !this.Chunks[i].Equal(that1.Chunks[i]) { + return false + } + } + return true +} +func (this *LabelPair) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*LabelPair) + if !ok { + that2, ok := that.(LabelPair) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Name != that1.Name { + return false + } + if this.Value != that1.Value { + return false + } + return true +} +func (this *Chunk) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Chunk) + if !ok { + that2, ok := that.(Chunk) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !bytes.Equal(this.Data, that1.Data) { + return false + } + return true +} +func (this *TransferChunksResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*TransferChunksResponse) + if !ok { + that2, ok := that.(TransferChunksResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + return true +} func (this *PushRequest) GoString() string { if this == nil { return "nil" @@ -1217,6 +1543,53 @@ func (this *DroppedStream) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *TimeSeriesChunk) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 8) + s = append(s, "&logproto.TimeSeriesChunk{") + s = append(s, "FromIngesterId: "+fmt.Sprintf("%#v", this.FromIngesterId)+",\n") + s = append(s, "UserId: "+fmt.Sprintf("%#v", this.UserId)+",\n") + if this.Labels != nil { + s = append(s, "Labels: "+fmt.Sprintf("%#v", this.Labels)+",\n") + } + if this.Chunks != nil { + s = append(s, "Chunks: "+fmt.Sprintf("%#v", this.Chunks)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *LabelPair) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&logproto.LabelPair{") + s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n") + s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *Chunk) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&logproto.Chunk{") + s = append(s, "Data: "+fmt.Sprintf("%#v", this.Data)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *TransferChunksResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, "&logproto.TransferChunksResponse{") + s = append(s, "}") + return strings.Join(s, "") +} func valueToGoStringLogproto(v interface{}, typ string) string { rv := reflect.ValueOf(v) if rv.IsNil() { @@ -1505,17 +1878,123 @@ var _Querier_serviceDesc = grpc.ServiceDesc{ Metadata: "logproto.proto", } -func (m *PushRequest) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalTo(dAtA) - if err != nil { - return nil, err - } - return dAtA[:n], nil +// IngesterClient is the client API for Ingester service. 
+// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type IngesterClient interface { + TransferChunks(ctx context.Context, opts ...grpc.CallOption) (Ingester_TransferChunksClient, error) } -func (m *PushRequest) MarshalTo(dAtA []byte) (int, error) { +type ingesterClient struct { + cc *grpc.ClientConn +} + +func NewIngesterClient(cc *grpc.ClientConn) IngesterClient { + return &ingesterClient{cc} +} + +func (c *ingesterClient) TransferChunks(ctx context.Context, opts ...grpc.CallOption) (Ingester_TransferChunksClient, error) { + stream, err := c.cc.NewStream(ctx, &_Ingester_serviceDesc.Streams[0], "/logproto.Ingester/TransferChunks", opts...) + if err != nil { + return nil, err + } + x := &ingesterTransferChunksClient{stream} + return x, nil +} + +type Ingester_TransferChunksClient interface { + Send(*TimeSeriesChunk) error + CloseAndRecv() (*TransferChunksResponse, error) + grpc.ClientStream +} + +type ingesterTransferChunksClient struct { + grpc.ClientStream +} + +func (x *ingesterTransferChunksClient) Send(m *TimeSeriesChunk) error { + return x.ClientStream.SendMsg(m) +} + +func (x *ingesterTransferChunksClient) CloseAndRecv() (*TransferChunksResponse, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(TransferChunksResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// IngesterServer is the server API for Ingester service. +type IngesterServer interface { + TransferChunks(Ingester_TransferChunksServer) error +} + +// UnimplementedIngesterServer can be embedded to have forward compatible implementations. +type UnimplementedIngesterServer struct { +} + +func (*UnimplementedIngesterServer) TransferChunks(srv Ingester_TransferChunksServer) error { + return status.Errorf(codes.Unimplemented, "method TransferChunks not implemented") +} + +func RegisterIngesterServer(s *grpc.Server, srv IngesterServer) { + s.RegisterService(&_Ingester_serviceDesc, srv) +} + +func _Ingester_TransferChunks_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(IngesterServer).TransferChunks(&ingesterTransferChunksServer{stream}) +} + +type Ingester_TransferChunksServer interface { + SendAndClose(*TransferChunksResponse) error + Recv() (*TimeSeriesChunk, error) + grpc.ServerStream +} + +type ingesterTransferChunksServer struct { + grpc.ServerStream +} + +func (x *ingesterTransferChunksServer) SendAndClose(m *TransferChunksResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *ingesterTransferChunksServer) Recv() (*TimeSeriesChunk, error) { + m := new(TimeSeriesChunk) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _Ingester_serviceDesc = grpc.ServiceDesc{ + ServiceName: "logproto.Ingester", + HandlerType: (*IngesterServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "TransferChunks", + Handler: _Ingester_TransferChunks_Handler, + ClientStreams: true, + }, + }, + Metadata: "logproto.proto", +} + +func (m *PushRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PushRequest) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int @@ -1922,6 +2401,132 @@ func (m *DroppedStream) MarshalTo(dAtA []byte) (int, error) { return i, nil } +func (m 
*TimeSeriesChunk) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TimeSeriesChunk) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.FromIngesterId) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintLogproto(dAtA, i, uint64(len(m.FromIngesterId))) + i += copy(dAtA[i:], m.FromIngesterId) + } + if len(m.UserId) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintLogproto(dAtA, i, uint64(len(m.UserId))) + i += copy(dAtA[i:], m.UserId) + } + if len(m.Labels) > 0 { + for _, msg := range m.Labels { + dAtA[i] = 0x1a + i++ + i = encodeVarintLogproto(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if len(m.Chunks) > 0 { + for _, msg := range m.Chunks { + dAtA[i] = 0x22 + i++ + i = encodeVarintLogproto(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *LabelPair) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *LabelPair) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Name) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintLogproto(dAtA, i, uint64(len(m.Name))) + i += copy(dAtA[i:], m.Name) + } + if len(m.Value) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintLogproto(dAtA, i, uint64(len(m.Value))) + i += copy(dAtA[i:], m.Value) + } + return i, nil +} + +func (m *Chunk) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Chunk) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Data) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintLogproto(dAtA, i, uint64(len(m.Data))) + i += copy(dAtA[i:], m.Data) + } + return i, nil +} + +func (m *TransferChunksResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TransferChunksResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + func encodeVarintLogproto(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -2131,6 +2736,74 @@ func (m *DroppedStream) Size() (n int) { return n } +func (m *TimeSeriesChunk) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.FromIngesterId) + if l > 0 { + n += 1 + l + sovLogproto(uint64(l)) + } + l = len(m.UserId) + if l > 0 { + n += 1 + l + sovLogproto(uint64(l)) + } + if len(m.Labels) > 0 { + for _, e := range m.Labels { + l = e.Size() + n += 1 + l + sovLogproto(uint64(l)) + } + } + if len(m.Chunks) > 0 { + for _, e := range m.Chunks { + l = e.Size() + n += 1 + l + sovLogproto(uint64(l)) + } + } + return n +} + +func (m *LabelPair) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Name) + if l > 0 { + n += 1 + l + sovLogproto(uint64(l)) + } + l = len(m.Value) + if l > 0 { + n += 1 + l + sovLogproto(uint64(l)) + } + return n +} + +func (m *Chunk) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Data) + if l > 0 { 
+ n += 1 + l + sovLogproto(uint64(l)) + } + return n +} + +func (m *TransferChunksResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + func sovLogproto(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -2283,33 +2956,86 @@ func (this *DroppedStream) String() string { }, "") return s } -func valueToStringLogproto(v interface{}) string { - rv := reflect.ValueOf(v) - if rv.IsNil() { +func (this *TimeSeriesChunk) String() string { + if this == nil { return "nil" } - pv := reflect.Indirect(rv).Interface() - return fmt.Sprintf("*%v", pv) + repeatedStringForLabels := "[]*LabelPair{" + for _, f := range this.Labels { + repeatedStringForLabels += strings.Replace(f.String(), "LabelPair", "LabelPair", 1) + "," + } + repeatedStringForLabels += "}" + repeatedStringForChunks := "[]*Chunk{" + for _, f := range this.Chunks { + repeatedStringForChunks += strings.Replace(f.String(), "Chunk", "Chunk", 1) + "," + } + repeatedStringForChunks += "}" + s := strings.Join([]string{`&TimeSeriesChunk{`, + `FromIngesterId:` + fmt.Sprintf("%v", this.FromIngesterId) + `,`, + `UserId:` + fmt.Sprintf("%v", this.UserId) + `,`, + `Labels:` + repeatedStringForLabels + `,`, + `Chunks:` + repeatedStringForChunks + `,`, + `}`, + }, "") + return s } -func (m *PushRequest) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowLogproto - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } +func (this *LabelPair) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&LabelPair{`, + `Name:` + fmt.Sprintf("%v", this.Name) + `,`, + `Value:` + fmt.Sprintf("%v", this.Value) + `,`, + `}`, + }, "") + return s +} +func (this *Chunk) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Chunk{`, + `Data:` + fmt.Sprintf("%v", this.Data) + `,`, + `}`, + }, "") + return s +} +func (this *TransferChunksResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&TransferChunksResponse{`, + `}`, + }, "") + return s +} +func valueToStringLogproto(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *PushRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } } fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) @@ -3700,6 +4426,448 @@ func (m *DroppedStream) Unmarshal(dAtA []byte) error { } return nil } +func (m *TimeSeriesChunk) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TimeSeriesChunk: wiretype end group for non-group") 
+ } + if fieldNum <= 0 { + return fmt.Errorf("proto: TimeSeriesChunk: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field FromIngesterId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.FromIngesterId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field UserId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.UserId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Labels = append(m.Labels, &LabelPair{}) + if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Chunks", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Chunks = append(m.Chunks, &Chunk{}) + if err := m.Chunks[len(m.Chunks)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipLogproto(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *LabelPair) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 
{ + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LabelPair: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LabelPair: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Value = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipLogproto(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogproto + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Chunk) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Chunk: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Chunk: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthLogproto + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthLogproto + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) 
+			if m.Data == nil {
+				m.Data = []byte{}
+			}
+			iNdEx = postIndex
+		default:
+			iNdEx = preIndex
+			skippy, err := skipLogproto(dAtA[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthLogproto
+			}
+			if (iNdEx + skippy) < 0 {
+				return ErrInvalidLengthLogproto
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+func (m *TransferChunksResponse) Unmarshal(dAtA []byte) error {
+	l := len(dAtA)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowLogproto
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := dAtA[iNdEx]
+			iNdEx++
+			wire |= uint64(b&0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: TransferChunksResponse: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: TransferChunksResponse: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		default:
+			iNdEx = preIndex
+			skippy, err := skipLogproto(dAtA[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthLogproto
+			}
+			if (iNdEx + skippy) < 0 {
+				return ErrInvalidLengthLogproto
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
 func skipLogproto(dAtA []byte) (n int, err error) {
 	l := len(dAtA)
 	iNdEx := 0
diff --git a/pkg/logproto/logproto.proto b/pkg/logproto/logproto.proto
index 016e8f0b9fc9..b520e162c2d9 100644
--- a/pkg/logproto/logproto.proto
+++ b/pkg/logproto/logproto.proto
@@ -15,6 +15,10 @@ service Querier {
   rpc Tail(TailRequest) returns (stream TailResponse) {};
 }
 
+service Ingester {
+  rpc TransferChunks(stream TimeSeriesChunk) returns (TransferChunksResponse) {};
+}
+
 message PushRequest {
   repeated Stream streams = 1 [(gogoproto.jsontag) = "streams"];
 }
@@ -79,3 +83,23 @@ message DroppedStream {
   google.protobuf.Timestamp to = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
   string labels = 3;
 }
+
+message TimeSeriesChunk {
+  string from_ingester_id = 1;
+  string user_id = 2;
+  repeated LabelPair labels = 3;
+  repeated Chunk chunks = 4;
+}
+
+message LabelPair {
+  string name = 1;
+  string value = 2;
+}
+
+message Chunk {
+  bytes data = 1;
+}
+
+message TransferChunksResponse {
+
+}
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index b5f05aef791f..7e9c97919bc9 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -151,7 +151,7 @@ func (t *Loki) initQuerier() (err error) {
 
 func (t *Loki) initIngester() (err error) {
 	t.cfg.Ingester.LifecyclerConfig.ListenPort = &t.cfg.Server.GRPCListenPort
-	t.ingester, err = ingester.New(t.cfg.Ingester, t.store)
+	t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store)
 	if err != nil {
 		return
 	}
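The proto additions above are what the generated code earlier in this patch implements: a client-streaming RPC that sends one TimeSeriesChunk per log stream and is acknowledged with an empty TransferChunksResponse. As a rough illustration of how the sending side is meant to be driven, here is a minimal sketch that is not part of the patch: the address, ingester ID, tenant, and labels are placeholders, and the ring/lifecycler work the real transferOut performs is skipped.

    package example

    import (
    	"github.com/grafana/loki/pkg/logproto"
    	"github.com/weaveworks/common/user"
    	"golang.org/x/net/context"
    	"google.golang.org/grpc"
    )

    // transferTo streams a single batch of chunks to the ingester at addr and
    // waits for its acknowledgement. Field values are placeholders.
    func transferTo(addr string, chunks []*logproto.Chunk) error {
    	conn, err := grpc.Dial(addr, grpc.WithInsecure())
    	if err != nil {
    		return err
    	}
    	defer conn.Close()

    	// Transfers are not scoped to one tenant, so the stream context carries
    	// a fake org ID; each TimeSeriesChunk names its real tenant in UserId.
    	ctx := user.InjectOrgID(context.Background(), "-1")

    	stream, err := logproto.NewIngesterClient(conn).TransferChunks(ctx)
    	if err != nil {
    		return err
    	}

    	// One message per log stream: its labels, tenant, source ingester, and
    	// encoded chunk payloads.
    	err = stream.Send(&logproto.TimeSeriesChunk{
    		FromIngesterId: "ingester-leaving",
    		UserId:         "tenant-1",
    		Labels:         []*logproto.LabelPair{{Name: "foo", Value: "bar"}},
    		Chunks:         chunks,
    	})
    	if err != nil {
    		return err
    	}

    	// Closing the send side signals that the transfer is complete; the
    	// receiver answers with an empty TransferChunksResponse.
    	_, err = stream.CloseAndRecv()
    	return err
    }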
From 2eb666b0a8b53fb24e315b29a84c2bfdffa33532 Mon Sep 17 00:00:00 2001
From: Robert Fratto
Date: Mon, 22 Jul 2019 14:04:01 -0400
Subject: [PATCH 2/6] ingester: fix lint issues for chunk transfers

---
 pkg/ingester/transfer.go | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go
index 9a4872371051..52f264cc9442 100644
--- a/pkg/ingester/transfer.go
+++ b/pkg/ingester/transfer.go
@@ -165,10 +165,8 @@ func (i *Ingester) transferOut(ctx context.Context) error {
 	ctx = user.InjectOrgID(ctx, "-1")
 
 	stream, err := ic.TransferChunks(ctx)
-
-	_, err = stream.CloseAndRecv()
 	if err != nil {
-		return errors.Wrap(err, "CloseAndRecv")
+		return errors.Wrap(err, "TransferChunks")
 	}
 
 	for instanceID, inst := range i.instances {
@@ -191,17 +189,26 @@ func (i *Ingester) transferOut(ctx context.Context) error {
 				lbls = append(lbls, &logproto.LabelPair{Name: lbl.Name, Value: lbl.Value})
 			}
 
-			stream.Send(&logproto.TimeSeriesChunk{
+			err := stream.Send(&logproto.TimeSeriesChunk{
 				Chunks:         chunks,
 				UserId:         instanceID,
 				Labels:         lbls,
 				FromIngesterId: i.lifecycler.ID,
 			})
+			if err != nil {
+				level.Error(util.Logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err)
+				return err
+			}
 
 			sentChunks.Add(float64(len(chunks)))
 		}
 	}
 
+	_, err = stream.CloseAndRecv()
+	if err != nil {
+		return errors.Wrap(err, "CloseAndRecv")
+	}
+
 	for _, flushQueue := range i.flushQueues {
 		flushQueue.DiscardAndClose()
 	}

From f8155a1109fd6196ee7e362cdf8408b6abcb947a Mon Sep 17 00:00:00 2001
From: Robert Fratto
Date: Mon, 22 Jul 2019 16:52:55 -0400
Subject: [PATCH 3/6] ingester: Add test for chunk transfers

---
 pkg/ingester/transfer_test.go | 210 ++++++++++++++++++++++++++++++++++
 1 file changed, 210 insertions(+)
 create mode 100644 pkg/ingester/transfer_test.go

diff --git a/pkg/ingester/transfer_test.go b/pkg/ingester/transfer_test.go
new file mode 100644
index 000000000000..b0ea3f0d8ba2
--- /dev/null
+++ b/pkg/ingester/transfer_test.go
@@ -0,0 +1,210 @@
+package ingester
+
+import (
+	"fmt"
+	"io"
+	"io/ioutil"
+	"sort"
+	"testing"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/health/grpc_health_v1"
+
+	"github.com/cortexproject/cortex/pkg/ring"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/weaveworks/common/user"
+
+	"github.com/grafana/loki/pkg/ingester/client"
+	"github.com/grafana/loki/pkg/logproto"
+	"golang.org/x/net/context"
+)
+
+func TestTransferOut(t *testing.T) {
+	f := newTestIngesterFactory(t)
+
+	ing := f.getIngester(time.Duration(0))
+
+	// Push some data into our original ingester
+	ctx := user.InjectOrgID(context.Background(), "test")
+	_, err := ing.Push(ctx, &logproto.PushRequest{
+		Streams: []*logproto.Stream{
+			{
+				Entries: []logproto.Entry{
+					{Line: "line 0", Timestamp: time.Unix(0, 0)},
+					{Line: "line 1", Timestamp: time.Unix(1, 0)},
+				},
+				Labels: `{foo="bar",bar="baz1"}`,
+			},
+			{
+				Entries: []logproto.Entry{
+					{Line: "line 2", Timestamp: time.Unix(2, 0)},
+					{Line: "line 3", Timestamp: time.Unix(3, 0)},
+				},
+				Labels: `{foo="bar",bar="baz2"}`,
+			},
+		},
+	})
+	require.NoError(t, err)
+
+	assert.Len(t, ing.instances, 1)
+	if assert.Contains(t, ing.instances, "test") {
+		assert.Len(t, ing.instances["test"].streams, 2)
+	}
+
+	// Create a new ingester and transfer data to it
+	ing2 := f.getIngester(time.Second * 60)
+	ing.Shutdown()
+
+	assert.Len(t, ing2.instances, 1)
+	if assert.Contains(t, ing2.instances, "test") {
+		assert.Len(t, ing2.instances["test"].streams, 2)
+
+		lines := []string{}
+
+		// Get all the lines back and make sure the blocks transferred successfully
+		for _, stream := range ing2.instances["test"].streams {
+			it, err := stream.Iterator(
+				time.Unix(0, 0),
+				time.Unix(10, 0),
+				logproto.FORWARD,
+				func([]byte) bool { return true },
+			)
+			if !assert.NoError(t, err) {
+				continue
+			}
+
+			for it.Next() {
+				entry := it.Entry()
+				lines = append(lines, entry.Line)
+			}
+		}
+
+		sort.Strings(lines)
+
+		assert.Equal(
+			t,
[]string{"line 0", "line 1", "line 2", "line 3"}, + lines, + ) + } +} + +type testIngesterFactory struct { + t *testing.T + store ring.KVClient + n int + ingesters map[string]*Ingester +} + +func newTestIngesterFactory(t *testing.T) *testIngesterFactory { + return &testIngesterFactory{ + t: t, + store: ring.NewInMemoryKVClient(ring.ProtoCodec{Factory: ring.ProtoDescFactory}), + ingesters: make(map[string]*Ingester), + } +} + +func (f *testIngesterFactory) getIngester(joinAfter time.Duration) *Ingester { + f.n++ + + cfg := defaultIngesterTestConfig() + cfg.MaxTransferRetries = 1 + cfg.LifecyclerConfig.ClaimOnRollout = true + cfg.LifecyclerConfig.ID = fmt.Sprintf("localhost-%d", f.n) + cfg.LifecyclerConfig.RingConfig.KVStore.Mock = f.store + cfg.LifecyclerConfig.JoinAfter = joinAfter + cfg.LifecyclerConfig.Addr = cfg.LifecyclerConfig.ID + + cfg.ingesterClientFactory = func(cfg client.Config, addr string) (grpc_health_v1.HealthClient, error) { + ingester, ok := f.ingesters[addr] + if !ok { + return nil, fmt.Errorf("no ingester %s", addr) + } + + return struct { + logproto.PusherClient + logproto.QuerierClient + logproto.IngesterClient + grpc_health_v1.HealthClient + io.Closer + }{ + PusherClient: nil, + QuerierClient: nil, + IngesterClient: &testIngesterClient{t: f.t, i: ingester}, + Closer: ioutil.NopCloser(nil), + }, nil + } + + _, ing := newTestStore(f.t, cfg) + f.ingesters[fmt.Sprintf("%s:0", cfg.LifecyclerConfig.ID)] = ing + + // NB there's some kind of race condition with the in-memory KV client when + // we don't give the ingester a little bit of time to initialize. a 100ms + // wait time seems effective. + time.Sleep(time.Millisecond * 100) + return ing +} + +type testIngesterClient struct { + t *testing.T + i *Ingester +} + +func (c *testIngesterClient) TransferChunks(context.Context, ...grpc.CallOption) (logproto.Ingester_TransferChunksClient, error) { + chunkCh := make(chan *logproto.TimeSeriesChunk) + respCh := make(chan *logproto.TransferChunksResponse) + + client := &testTransferChunksClient{ch: chunkCh, resp: respCh} + go func() { + server := &testTransferChunksServer{ch: chunkCh, resp: respCh} + err := c.i.TransferChunks(server) + require.NoError(c.t, err) + }() + + return client, nil +} + +type testTransferChunksClient struct { + ch chan *logproto.TimeSeriesChunk + resp chan *logproto.TransferChunksResponse + + grpc.ClientStream +} + +func (c *testTransferChunksClient) Send(chunk *logproto.TimeSeriesChunk) error { + c.ch <- chunk + return nil +} + +func (c *testTransferChunksClient) CloseAndRecv() (*logproto.TransferChunksResponse, error) { + close(c.ch) + resp := <-c.resp + close(c.resp) + return resp, nil +} + +type testTransferChunksServer struct { + ch chan *logproto.TimeSeriesChunk + resp chan *logproto.TransferChunksResponse + + grpc.ServerStream +} + +func (s *testTransferChunksServer) Context() context.Context { + return context.Background() +} + +func (s *testTransferChunksServer) SendAndClose(resp *logproto.TransferChunksResponse) error { + s.resp <- resp + return nil +} + +func (s *testTransferChunksServer) Recv() (*logproto.TimeSeriesChunk, error) { + chunk, ok := <-s.ch + if !ok { + return nil, io.EOF + } + return chunk, nil +} From 15cbba724205289b93923b2e9f1699be378bff47 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 23 Jul 2019 10:17:59 -0400 Subject: [PATCH 4/6] ingester: clean up chunk transfer code --- pkg/ingester/flush.go | 2 +- pkg/ingester/ingester.go | 26 ++++++++++++-------------- pkg/ingester/transfer.go | 2 +- 3 files changed, 14 
From 15cbba724205289b93923b2e9f1699be378bff47 Mon Sep 17 00:00:00 2001
From: Robert Fratto
Date: Tue, 23 Jul 2019 10:17:59 -0400
Subject: [PATCH 4/6] ingester: clean up chunk transfer code

---
 pkg/ingester/flush.go    |  2 +-
 pkg/ingester/ingester.go | 26 ++++++++++++--------------
 pkg/ingester/transfer.go |  2 +-
 3 files changed, 14 insertions(+), 16 deletions(-)

diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go
index b23fdd2ea402..d45c11bea637 100644
--- a/pkg/ingester/flush.go
+++ b/pkg/ingester/flush.go
@@ -166,7 +166,7 @@ func (i *Ingester) flushLoop(j int) {
 }
 
 func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
-	instance, ok, _ := i.getInstanceByID(userID)
+	instance, ok := i.getInstanceByID(userID)
 	if !ok {
 		return nil
 	}
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index cf6f9b63ecb1..d1d77b42a57e 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -162,21 +162,19 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
 	instanceID, err := user.ExtractOrgID(ctx)
 	if err != nil {
 		return nil, err
-	}
-
-	instance, readonly := i.getOrCreateInstance(instanceID)
-	if readonly {
+	} else if i.readonly {
 		return nil, ErrReadOnly
 	}
 
+	instance := i.getOrCreateInstance(instanceID)
 	err = instance.Push(ctx, req)
 	return &logproto.PushResponse{}, err
 }
 
-func (i *Ingester) getOrCreateInstance(instanceID string) (instance *instance, readonly bool) {
-	inst, ok, readonly := i.getInstanceByID(instanceID)
-	if ok || readonly {
-		return inst, readonly
+func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
+	inst, ok := i.getInstanceByID(instanceID)
+	if ok || i.readonly {
+		return inst
 	}
 
 	i.instancesMtx.Lock()
@@ -186,7 +184,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) (instance *instance, r
 		inst = newInstance(instanceID, i.cfg.BlockSize)
 		i.instances[instanceID] = inst
 	}
-	return inst, i.readonly
+	return inst
 }
 
 // Query the ingests for log streams matching a set of matchers.
@@ -196,7 +194,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
 		return err
 	}
 
-	instance, _ := i.getOrCreateInstance(instanceID)
+	instance := i.getOrCreateInstance(instanceID)
 	return instance.Query(req, queryServer)
 }
 
@@ -207,7 +205,7 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
 		return nil, err
 	}
 
-	instance, _ := i.getOrCreateInstance(instanceID)
+	instance := i.getOrCreateInstance(instanceID)
 	return instance.Label(ctx, req)
 }
 
@@ -236,12 +234,12 @@ func (i *Ingester) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
 	}
 }
 
-func (i *Ingester) getInstanceByID(id string) (instance *instance, ok bool, readonly bool) {
+func (i *Ingester) getInstanceByID(id string) (*instance, bool) {
 	i.instancesMtx.RLock()
 	defer i.instancesMtx.RUnlock()
 
 	inst, ok := i.instances[id]
-	return inst, ok, i.readonly
+	return inst, ok
 }
 
 func (i *Ingester) getInstances() []*instance {
@@ -268,7 +266,7 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_
 		return err
 	}
 
-	instance, _ := i.getOrCreateInstance(instanceID)
+	instance := i.getOrCreateInstance(instanceID)
 	tailer, err := newTailer(instanceID, req.Query, req.Regex, queryServer)
 	if err != nil {
 		return err
diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go
index 52f264cc9442..c329dab083b4 100644
--- a/pkg/ingester/transfer.go
+++ b/pkg/ingester/transfer.go
@@ -82,7 +82,7 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
 			lbls = append(lbls, client.LabelAdapter{Name: lbl.Name, Value: lbl.Value})
 		}
 
-		instance, _ := i.getOrCreateInstance(chunkSet.UserId)
+		instance := i.getOrCreateInstance(chunkSet.UserId)
 		for _, chunk := range chunkSet.Chunks {
 			if err := instance.consumeChunk(userCtx, lbls, chunk); err != nil {
 				return err
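After this cleanup the read-only check lives in Push itself: once the ingester has started handing off its chunks, writes are rejected with ErrReadOnly instead of silently reaching a read-only instance. The sketch below is a hypothetical in-process caller, shown only to illustrate that contract; the helper name and retry policy are invented, and real callers reach the ingester over gRPC.

    package example

    import (
    	"golang.org/x/net/context"

    	"github.com/grafana/loki/pkg/ingester"
    	"github.com/grafana/loki/pkg/logproto"
    )

    // pushOnce illustrates the caller-visible contract: a transferring ingester
    // fails writes fast with ErrReadOnly, which should be treated as "retry on
    // another ingester" rather than as a permanent failure.
    func pushOnce(ctx context.Context, ing *ingester.Ingester, req *logproto.PushRequest) (retryElsewhere bool, err error) {
    	_, err = ing.Push(ctx, req)
    	if err == ingester.ErrReadOnly {
    		return true, err
    	}
    	return false, err
    }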
From e4e5e3c1f136045d91e93fcff68c756c745576fc Mon Sep 17 00:00:00 2001
From: Robert Fratto
Date: Thu, 25 Jul 2019 16:48:09 -0400
Subject: [PATCH 5/6] ingester: fix feedback from PR review

---
 pkg/ingester/ingester.go | 2 +-
 pkg/ingester/transfer.go | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index d1d77b42a57e..cc78087dd5d2 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -173,7 +173,7 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
 
 func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
 	inst, ok := i.getInstanceByID(instanceID)
-	if ok || i.readonly {
+	if ok {
 		return inst
 	}
 
diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go
index c329dab083b4..a249ef2a0b27 100644
--- a/pkg/ingester/transfer.go
+++ b/pkg/ingester/transfer.go
@@ -29,7 +29,7 @@ var (
 	})
 )
 
-// TransferChunks receieves all chunks from another ingester. The Ingester
+// TransferChunks receives all chunks from another ingester. The Ingester
 // must be in PENDING state or else the call will fail.
 func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error {
 	// Entry JOINING state (only valid from PENDING)

From e64c7e5faf2db42080310624166cf6d9bfb93e8e Mon Sep 17 00:00:00 2001
From: Robert Fratto
Date: Thu, 25 Jul 2019 16:57:42 -0400
Subject: [PATCH 6/6] ingester: log error if closing client after transfer fails

---
 pkg/ingester/transfer.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go
index a249ef2a0b27..ab3f1398990d 100644
--- a/pkg/ingester/transfer.go
+++ b/pkg/ingester/transfer.go
@@ -10,6 +10,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/ring"
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/go-kit/kit/log/level"
+	"github.com/grafana/loki/pkg/helpers"
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
@@ -159,7 +160,7 @@ func (i *Ingester) transferOut(ctx context.Context) error {
 		return err
 	}
 	if c, ok := c.(io.Closer); ok {
-		defer c.Close()
+		defer helpers.LogError("closing client", c.Close)
 	}
 
 	ic := c.(logproto.IngesterClient)