
ingester: support chunk transfers on ingester shutdown. #794

Merged · 6 commits · Jul 26, 2019
Changes from 4 commits
5 changes: 3 additions & 2 deletions pkg/chunkenc/gzip.go
@@ -155,8 +155,9 @@ func NewMemChunk(enc Encoding) *MemChunk {
// NewByteChunk returns a MemChunk on the passed bytes.
func NewByteChunk(b []byte) (*MemChunk, error) {
bc := &MemChunk{
-        cPool: &Gzip,
-        head:  &headBlock{}, // Dummy, empty headblock.
+        cPool:    &Gzip,
+        encoding: EncGZIP,
+        head:     &headBlock{}, // Dummy, empty headblock.
}

db := decbuf{b: b}
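A note on why this gzip.go hunk matters for transfers: NewByteChunk rebuilds a chunk from its wire bytes on the receiving ingester, and without the encoding field set, the rebuilt chunk could not report how it was compressed. A minimal round-trip sketch, assuming the Encoding accessor that this era of pkg/chunkenc appears to expose (treat the exact method name as an assumption, not part of this diff):

func roundTrip(data []byte) error {
    // Rebuild a chunk from bytes received over a transfer stream.
    c, err := chunkenc.NewByteChunk(data)
    if err != nil {
        return err
    }
    // Before this patch the rebuilt chunk's encoding was the zero value;
    // with the fix it reports EncGZIP and can be re-serialized on flush.
    if c.Encoding() != chunkenc.EncGZIP {
        return fmt.Errorf("unexpected encoding %v", c.Encoding())
    }
    return nil
}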
8 changes: 5 additions & 3 deletions pkg/ingester/client/client.go
@@ -55,11 +55,13 @@ func New(cfg Config, addr string) (grpc_health_v1.HealthClient, error) {
return struct {
logproto.PusherClient
logproto.QuerierClient
+        logproto.IngesterClient
grpc_health_v1.HealthClient
io.Closer
}{
-        PusherClient:  logproto.NewPusherClient(conn),
-        QuerierClient: logproto.NewQuerierClient(conn),
-        Closer:        conn,
+        PusherClient:   logproto.NewPusherClient(conn),
+        QuerierClient:  logproto.NewQuerierClient(conn),
+        IngesterClient: logproto.NewIngesterClient(conn),
+        Closer:         conn,
}, nil
}
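Embedding logproto.IngesterClient here means the one gRPC connection an ingester already dials for pushes and queries can also carry a chunk transfer. A hedged sketch of the sending side (TransferChunks is the generated streaming RPC the PR's transfer code is expected to call; openTransfer, clientCfg, and targetAddr are placeholder names, not part of this diff):

func openTransfer(ctx context.Context, clientCfg client.Config, targetAddr string) (logproto.Ingester_TransferChunksClient, io.Closer, error) {
    // client.New returns the composite struct above, which satisfies
    // logproto.IngesterClient and io.Closer in addition to HealthClient.
    c, err := client.New(clientCfg, targetAddr)
    if err != nil {
        return nil, nil, err
    }
    stream, err := c.(logproto.IngesterClient).TransferChunks(ctx)
    if err != nil {
        c.(io.Closer).Close()
        return nil, nil, err
    }
    // The caller sends one message per queued chunk, then CloseAndRecv.
    return stream, c.(io.Closer), nil
}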
3 changes: 2 additions & 1 deletion pkg/ingester/flush_test.go
@@ -11,6 +11,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
@@ -61,7 +62,7 @@ func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
chunks: map[string][]chunk.Chunk{},
}

-    ing, err := New(cfg, store)
+    ing, err := New(cfg, client.Config{}, store)
require.NoError(t, err)

return store, ing
49 changes: 30 additions & 19 deletions pkg/ingester/ingester.go
@@ -17,9 +17,14 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
)

+// ErrReadOnly is returned when the ingester is shutting down and a push was
+// attempted.
+var ErrReadOnly = errors.New("Ingester is shutting down")

var readinessProbeSuccess = []byte("Ready")

var flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{
@@ -31,18 +36,25 @@ var flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{
type Config struct {
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`

+    // Config for transferring chunks.
+    MaxTransferRetries int `yaml:"max_transfer_retries,omitempty"`

ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
BlockSize int `yaml:"chunk_block_size"`

+    // For testing, you can override the address and ID of this ingester.
+    ingesterClientFactory func(cfg client.Config, addr string) (grpc_health_v1.HealthClient, error)
}

// RegisterFlags registers the flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.LifecyclerConfig.RegisterFlags(f)

+    f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing.")
f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushed", 16, "")
f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "")
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Second, "")
@@ -53,10 +65,12 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

// Ingester builds chunks for incoming log streams.
type Ingester struct {
-    cfg Config
+    cfg          Config
+    clientConfig client.Config

instancesMtx sync.RWMutex
instances map[string]*instance
+    readonly     bool

lifecycler *ring.Lifecycler
store ChunkStore
@@ -77,14 +91,19 @@ type ChunkStore interface {
}

// New makes a new Ingester.
-func New(cfg Config, store ChunkStore) (*Ingester, error) {
+func New(cfg Config, clientConfig client.Config, store ChunkStore) (*Ingester, error) {
+    if cfg.ingesterClientFactory == nil {
+        cfg.ingesterClientFactory = client.New
+    }
+
i := &Ingester{
-        cfg:         cfg,
-        instances:   map[string]*instance{},
-        store:       store,
-        quit:        make(chan struct{}),
-        flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
-        quitting:    make(chan struct{}),
+        cfg:          cfg,
+        clientConfig: clientConfig,
+        instances:    map[string]*instance{},
+        store:        store,
+        quit:         make(chan struct{}),
+        flushQueues:  make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
+        quitting:     make(chan struct{}),
}

i.flushQueuesDone.Add(cfg.ConcurrentFlushes)
@@ -138,21 +157,13 @@ func (i *Ingester) Stopping() {
}
}

-// StopIncomingRequests implements ring.Lifecycler.
-func (i *Ingester) StopIncomingRequests() {
-
-}
-
-// TransferOut implements ring.Lifecycler.
-func (i *Ingester) TransferOut(context.Context) error {
-    return nil
-}

// Push implements logproto.Pusher.
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
+    } else if i.readonly {
+        return nil, ErrReadOnly
}

instance := i.getOrCreateInstance(instanceID)
@@ -162,7 +173,7 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {

func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
inst, ok := i.getInstanceByID(instanceID)
-    if ok {
+    if ok || i.readonly {
return inst
}

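The StopIncomingRequests and no-op TransferOut stubs deleted above are the lifecycler hooks invoked on shutdown; the PR reimplements them in a transfer file that this four-commit view does not show. A sketch of the read-only flip that gives the new Push guard its meaning (illustrative, assuming the real implementation takes instancesMtx like its neighbors in this file):

// StopIncomingRequests marks the ingester read-only before a transfer,
// so concurrent Pushes fail fast with ErrReadOnly instead of mutating
// chunks that are about to be streamed to the joining ingester.
func (i *Ingester) StopIncomingRequests() {
    i.instancesMtx.Lock()
    defer i.instancesMtx.Unlock()
    i.readonly = true
}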
3 changes: 2 additions & 1 deletion pkg/ingester/ingester_test.go
@@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
@@ -21,7 +22,7 @@ func TestIngester(t *testing.T) {
chunks: map[string][]chunk.Chunk{},
}

-    i, err := New(ingesterConfig, store)
+    i, err := New(ingesterConfig, client.Config{}, store)
require.NoError(t, err)
defer i.Shutdown()

19 changes: 19 additions & 0 deletions pkg/ingester/instance.go
@@ -71,6 +71,25 @@ func newInstance(instanceID string, blockSize int) *instance {
}
}

+// consumeChunk manually adds a chunk that was received during ingester chunk
+// transfer.
+func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapter, chunk *logproto.Chunk) error {
+    i.streamsMtx.Lock()
+    defer i.streamsMtx.Unlock()
+
+    fp := client.FastFingerprint(labels)
+    stream, ok := i.streams[fp]
+    if !ok {
+        stream = newStream(fp, labels, i.blockSize)
+        i.index.Add(labels, fp)
+        i.streams[fp] = stream
+        i.streamsCreatedTotal.Inc()
+        i.addTailersToNewStream(stream)
+    }
+
+    return stream.consumeChunk(ctx, chunk)
+}
+
func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()
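consumeChunk is the receiving half of a transfer: the joining ingester replays streamed chunks into per-tenant instances without re-parsing log lines. A hedged sketch of the server-side loop that would drive it (acceptTransfer is a hypothetical name, and the TimeSeriesChunk field names are assumptions from logproto, not part of this diff):

func (i *Ingester) acceptTransfer(ctx context.Context, stream logproto.Ingester_TransferChunksServer) error {
    for {
        ts, err := stream.Recv()
        if err == io.EOF {
            // Sender closed the stream; the real handler would also
            // SendAndClose a response here.
            return nil
        } else if err != nil {
            return err
        }
        inst := i.getOrCreateInstance(ts.UserId)
        for j := range ts.Chunks {
            // Index by &ts.Chunks[j] to pass a stable pointer.
            if err := inst.consumeChunk(ctx, ts.Labels, &ts.Chunks[j]); err != nil {
                return err
            }
        }
    }
}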
15 changes: 15 additions & 0 deletions pkg/ingester/stream.go
@@ -73,6 +73,21 @@ func newStream(fp model.Fingerprint, labels []client.LabelAdapter, blockSize int) *stream {
}
}

+// consumeChunk manually adds a chunk to the stream that was received during
+// ingester chunk transfer.
+func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error {
+    c, err := chunkenc.NewByteChunk(chunk.Data)
+    if err != nil {
+        return err
+    }
+
+    s.chunks = append(s.chunks, chunkDesc{
+        chunk: c,
+    })
+    chunksCreatedTotal.Inc()
+    return nil
+}

func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
if len(s.chunks) == 0 {
s.chunks = append(s.chunks, chunkDesc{
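A design note on the stream-level consumeChunk: the transferred chunk is appended as a fully formed chunkDesc rather than replayed entry by entry, so the compressed bytes are never unpacked on the receiving side, and chunksCreatedTotal stays consistent with chunks created via Push. A toy check of the append semantics (hedged; replayOne and transferred are placeholder names):

func replayOne(ctx context.Context, s *stream, transferred *logproto.Chunk) error {
    // consumeChunk grows the stream's chunk list by one closed chunk;
    // a later Push may still cut a fresh head chunk after it.
    before := len(s.chunks)
    if err := s.consumeChunk(ctx, transferred); err != nil {
        return err
    }
    if len(s.chunks) != before+1 {
        return fmt.Errorf("expected one appended chunk, got %d", len(s.chunks)-before)
    }
    return nil
}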