Skip to content

Commit

Permalink
Use pgzip in the compactor. (#3672)
Browse files Browse the repository at this point in the history
Since index file are expected to be large, this will help to compress/decompress in parallel.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored May 3, 2021
1 parent dbeb984 commit a454f7e
Show file tree
Hide file tree
Showing 11 changed files with 1,388 additions and 6 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible
github.com/json-iterator/go v1.1.10
github.com/klauspost/compress v1.11.3
github.com/klauspost/pgzip v1.2.5
github.com/mitchellh/mapstructure v1.4.1
github.com/modern-go/reflect2 v1.0.1
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,8 @@ github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s
github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4=
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg=
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/klauspost/pgzip v1.2.5 h1:qnWYvvKqedOF2ulHpMG72XQol4ILEJ8k2wwRl/Km8oE=
github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/knq/sysutil v0.0.0-20191005231841-15668db23d08/go.mod h1:dFWs1zEqDjFtnBXsd1vPOZaLsESovai349994nHx3e0=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down
54 changes: 48 additions & 6 deletions pkg/storage/stores/shipper/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,60 @@ import (
"os"
"runtime/debug"
"strings"
"sync"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
gzip "github.com/klauspost/pgzip"
"go.etcd.io/bbolt"

"github.com/grafana/loki/pkg/chunkenc"
)

const delimiter = "/"

var (
gzipReader = sync.Pool{}
gzipWriter = sync.Pool{}
)

// getGzipReader gets or creates a new CompressionReader and reset it to read from src
func getGzipReader(src io.Reader) io.Reader {
if r := gzipReader.Get(); r != nil {
reader := r.(*gzip.Reader)
err := reader.Reset(src)
if err != nil {
panic(err)
}
return reader
}
reader, err := gzip.NewReader(src)
if err != nil {
panic(err)
}
return reader
}

// putGzipReader places back in the pool a CompressionReader
func putGzipReader(reader io.Reader) {
gzipReader.Put(reader)
}

// getGzipWriter gets or creates a new CompressionWriter and reset it to write to dst
func getGzipWriter(dst io.Writer) io.WriteCloser {
if w := gzipWriter.Get(); w != nil {
writer := w.(*gzip.Writer)
writer.Reset(dst)
return writer
}
return gzip.NewWriter(dst)
}

// PutWriter places back in the pool a CompressionWriter
func putGzipWriter(writer io.WriteCloser) {
gzipWriter.Put(writer)
}

type StorageClient interface {
GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error)
}
Expand All @@ -44,8 +86,8 @@ func GetFileFromStorage(ctx context.Context, storageClient StorageClient, object

var objectReader io.Reader = readCloser
if strings.HasSuffix(objectKey, ".gz") {
decompressedReader := chunkenc.Gzip.GetReader(readCloser)
defer chunkenc.Gzip.PutReader(decompressedReader)
decompressedReader := getGzipReader(readCloser)
defer putGzipReader(decompressedReader)

objectReader = decompressedReader
}
Expand Down Expand Up @@ -108,8 +150,8 @@ func CompressFile(src, dest string) error {
}
}()

compressedWriter := chunkenc.Gzip.GetWriter(compressedFile)
defer chunkenc.Gzip.PutWriter(compressedWriter)
compressedWriter := getGzipWriter(compressedFile)
defer putGzipWriter(compressedWriter)

_, err = io.Copy(compressedWriter, uncompressedFile)
if err != nil {
Expand Down
24 changes: 24 additions & 0 deletions vendor/github.com/klauspost/pgzip/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions vendor/github.com/klauspost/pgzip/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions vendor/github.com/klauspost/pgzip/GO_LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions vendor/github.com/klauspost/pgzip/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

135 changes: 135 additions & 0 deletions vendor/github.com/klauspost/pgzip/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a454f7e

Please sign in to comment.