Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream query responses from boltdb index client #5468

Merged
merged 5 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
return nil, err
}

gateway := indexgateway.NewIndexGateway(shipperIndexClient.(*shipper.Shipper))
gateway := indexgateway.NewIndexGateway(shipperIndexClient)
indexgatewaypb.RegisterIndexGatewayServer(t.Server.GRPC, gateway)
return gateway, nil
}
Expand Down
89 changes: 63 additions & 26 deletions pkg/storage/chunk/local/boltdb_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package local
import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"os"
Expand All @@ -13,6 +12,7 @@ import (
"time"

"github.com/go-kit/log/level"
"github.com/pkg/errors"
"go.etcd.io/bbolt"

"github.com/grafana/loki/pkg/storage/chunk"
Expand Down Expand Up @@ -268,7 +268,15 @@ func (b *BoltIndexClient) QueryWithCursor(_ context.Context, c *bbolt.Cursor, qu

rowPrefix := []byte(query.HashValue + separator)

var batch boltReadBatch
// sync.WaitGroup is needed to wait for the caller to finish processing all the index entries being streamed
wg := sync.WaitGroup{}
batch := newReadBatch()
defer func() {
batch.done()
wg.Wait()
}()

callbackDone := false

for k, v := c.Seek(start); k != nil; k, v = c.Next() {
if !bytes.HasPrefix(k, rowPrefix) {
Expand All @@ -282,16 +290,32 @@ func (b *BoltIndexClient) QueryWithCursor(_ context.Context, c *bbolt.Cursor, qu
continue
}

// we need to do callback only once to pass the batch iterator
if !callbackDone {
wg.Add(1)
// do the callback in a goroutine to stream back the index entries
go func() {
// wait for callback to finish processing the batch and return
defer wg.Done()
callback(query, batch)
}()
callbackDone = true
}

// make a copy since k, v are only valid for the life of the transaction.
// See: https://godoc.org/github.com/boltdb/bolt#Cursor.Seek
batch.rangeValue = make([]byte, len(k)-len(rowPrefix))
copy(batch.rangeValue, k[len(rowPrefix):])
rangeValue := make([]byte, len(k)-len(rowPrefix))
copy(rangeValue, k[len(rowPrefix):])

batch.value = make([]byte, len(v))
copy(batch.value, v)
value := make([]byte, len(v))
copy(value, v)

if !callback(query, &batch) {
break
err := batch.send(singleResponse{
rangeValue: rangeValue,
value: value,
})
if err != nil {
return errors.Wrap(err, "failed to send row while processing boltdb index query")
}
}

Expand Down Expand Up @@ -334,36 +358,49 @@ func (b *BoltWriteBatch) Add(tableName, hashValue string, rangeValue []byte, val
writes.puts[key] = value
}

type boltReadBatch struct {
type singleResponse struct {
rangeValue []byte
value []byte
}

func (b boltReadBatch) Iterator() chunk.ReadBatchIterator {
return &boltReadBatchIterator{
boltReadBatch: b,
}
type readBatch struct {
respChan chan singleResponse
curr singleResponse
}

type boltReadBatchIterator struct {
consumed bool
boltReadBatch
func newReadBatch() *readBatch {
return &readBatch{respChan: make(chan singleResponse)}
}

func (b *boltReadBatchIterator) Next() bool {
if b.consumed {
return false
}
b.consumed = true
return true
func (r *readBatch) Iterator() chunk.ReadBatchIterator {
return r
}

func (b *boltReadBatchIterator) RangeValue() []byte {
return b.rangeValue
func (r *readBatch) Next() bool {
var ok bool
r.curr, ok = <-r.respChan
return ok
}

func (b *boltReadBatchIterator) Value() []byte {
return b.value
func (r *readBatch) RangeValue() []byte {
return r.curr.rangeValue
}

func (r *readBatch) Value() []byte {
return r.curr.value
}

func (r *readBatch) done() {
close(r.respChan)
}

func (r *readBatch) send(resp singleResponse) error {
select {
case r.respChan <- resp:
return nil
case <-time.After(10 * time.Second):
return errors.New("timed out sending response")
}
}

// Open the database.
Expand Down
144 changes: 143 additions & 1 deletion pkg/storage/stores/shipper/gateway_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,29 @@ import (
"fmt"
"log"
"net"
"os"
"path/filepath"
"strconv"
"sync"
"testing"
"time"

"github.com/grafana/dskit/flagext"

"github.com/stretchr/testify/require"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/stores/shipper/downloads"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
"github.com/grafana/loki/pkg/storage/stores/shipper/util"
util_math "github.com/grafana/loki/pkg/util/math"
)

const (
Expand All @@ -30,6 +42,10 @@ const (
// response prefixes
rangeValuePrefix = "range-value"
valuePrefix = "value"

// the number of index entries for benchmarking will be divided amongst numTables
benchMarkNumEntries = 1000000
numTables = 50
)

type mockIndexGatewayServer struct{}
Expand Down Expand Up @@ -141,3 +157,129 @@ func TestGatewayClient(t *testing.T) {

require.Equal(t, len(queries), numCallbacks)
}

func buildTableName(i int) string {
return fmt.Sprintf("%s%d", tableNamePrefix, i)
}

func benchmarkIndexQueries(b *testing.B, queries []chunk.IndexQuery) {
buffer := 1024 * 1024
listener := bufconn.Listen(buffer)

// setup the grpc server
s := grpc.NewServer(grpc.ChainStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return middleware.StreamServerUserHeaderInterceptor(srv, ss, info, handler)
}))
conn, _ := grpc.DialContext(context.Background(), "", grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return listener.Dial()
}), grpc.WithInsecure())
defer func() {
s.Stop()
conn.Close()
}()

// setup test data
dir := b.TempDir()
bclient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{
Directory: dir + "/boltdb",
})
require.NoError(b, err)

for i := 0; i < numTables; i++ {
// setup directory for table in both cache and object storage
tableName := buildTableName(i)
objectStorageDir := filepath.Join(dir, "index", tableName)
cacheDir := filepath.Join(dir, "cache", tableName)
require.NoError(b, os.MkdirAll(objectStorageDir, 0777))
require.NoError(b, os.MkdirAll(cacheDir, 0777))

// add few rows at a time to the db because doing to many writes in a single transaction puts too much strain on boltdb and makes it slow
for i := 0; i < benchMarkNumEntries/numTables; i += 10000 {
end := util_math.Min(i+10000, benchMarkNumEntries/numTables)
// setup index files in both the cache directory and object storage directory so that we don't spend time syncing files at query time
testutil.AddRecordsToDB(b, filepath.Join(objectStorageDir, "db1"), bclient, i, end-i, []byte("index"))
testutil.AddRecordsToDB(b, filepath.Join(cacheDir, "db1"), bclient, i, end-i, []byte("index"))
}
}

fs, err := local.NewFSObjectClient(local.FSConfig{
Directory: dir,
})
require.NoError(b, err)
tm, err := downloads.NewTableManager(downloads.Config{
CacheDir: dir + "/cache",
SyncInterval: 15 * time.Minute,
CacheTTL: 15 * time.Minute,
QueryReadyNumDays: 30,
}, bclient, storage.NewIndexStorageClient(fs, "index/"), nil)
require.NoError(b, err)

// initialize the index gateway server
gw := indexgateway.NewIndexGateway(tm)
indexgatewaypb.RegisterIndexGatewayServer(s, gw)
go func() {
if err := s.Serve(listener); err != nil {
panic(err)
}
}()

// setup context for querying
ctx := user.InjectOrgID(context.Background(), "foo")
ctx, _ = user.InjectIntoGRPCRequest(ctx)

// initialize the gateway client
gatewayClient := GatewayClient{}
gatewayClient.grpcClient = indexgatewaypb.NewIndexGatewayClient(conn)

// build the response we expect to get from queries
expected := map[string]int{}
for i := 0; i < benchMarkNumEntries/numTables; i++ {
expected[strconv.Itoa(i)] = numTables
}

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
actual := map[string]int{}
syncMtx := sync.Mutex{}

err := gatewayClient.QueryPages(ctx, queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) {
itr := batch.Iterator()
for itr.Next() {
syncMtx.Lock()
actual[string(itr.Value())]++
syncMtx.Unlock()
}
return true
})
require.NoError(b, err)
require.Equal(b, expected, actual)
}
}

func Benchmark_QueriesMatchingSingleRow(b *testing.B) {
queries := []chunk.IndexQuery{}
// do a query per row from each of the tables
for i := 0; i < benchMarkNumEntries/numTables; i++ {
for j := 0; j < numTables; j++ {
queries = append(queries, chunk.IndexQuery{
TableName: buildTableName(j),
RangeValuePrefix: []byte(strconv.Itoa(i)),
ValueEqual: []byte(strconv.Itoa(i)),
})
}
}

benchmarkIndexQueries(b, queries)
}

func Benchmark_QueriesMatchingLargeNumOfRows(b *testing.B) {
var queries []chunk.IndexQuery
// do a query per table matching all the rows from it
for j := 0; j < numTables; j++ {
queries = append(queries, chunk.IndexQuery{
TableName: buildTableName(j),
})
}
benchmarkIndexQueries(b, queries)
}
17 changes: 11 additions & 6 deletions pkg/storage/stores/shipper/indexgateway/gateway.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,35 @@
package indexgateway

import (
"context"
"sync"

"github.com/grafana/dskit/services"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
"github.com/grafana/loki/pkg/storage/stores/shipper/util"
)

const maxIndexEntriesPerResponse = 1000

type IndexQuerier interface {
QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk.QueryPagesCallback) error
Stop()
}

type gateway struct {
services.Service

shipper chunk.IndexClient
indexQuerier IndexQuerier
}

func NewIndexGateway(shipperIndexClient *shipper.Shipper) *gateway {
func NewIndexGateway(indexQuerier IndexQuerier) *gateway {
g := &gateway{
shipper: shipperIndexClient,
indexQuerier: indexQuerier,
}
g.Service = services.NewIdleService(nil, func(failureCase error) error {
g.shipper.Stop()
g.indexQuerier.Stop()
return nil
})
return g
Expand All @@ -46,7 +51,7 @@ func (g *gateway) QueryIndex(request *indexgatewaypb.QueryIndexRequest, server i
}

sendBatchMtx := sync.Mutex{}
outerErr = g.shipper.QueryPages(server.Context(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
outerErr = g.indexQuerier.QueryPages(server.Context(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
innerErr = buildResponses(query, batch, func(response *indexgatewaypb.QueryIndexResponse) error {
// do not send grpc responses concurrently. See https://github.com/grpc/grpc-go/blob/master/stream.go#L120-L123.
sendBatchMtx.Lock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/indexgateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestGateway_QueryIndex(t *testing.T) {
}
expectedQueryKey = util.QueryKey(query)

gateway.shipper = mockIndexClient{response: &mockBatch{size: responseSize}}
gateway.indexQuerier = mockIndexClient{response: &mockBatch{size: responseSize}}
err := gateway.QueryIndex(&indexgatewaypb.QueryIndexRequest{Queries: []*indexgatewaypb.IndexQuery{{
TableName: query.TableName,
HashValue: query.HashValue,
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
)

func AddRecordsToDB(t *testing.T, path string, dbClient *local.BoltIndexClient, start, numRecords int, bucketName []byte) {
func AddRecordsToDB(t testing.TB, path string, dbClient *local.BoltIndexClient, start, numRecords int, bucketName []byte) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need testing.TB here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, since we use it with both tests and benchmarks.

t.Helper()
db, err := local.OpenBoltdbFile(path)
require.NoError(t, err)
Expand Down