Fix performance problem of base chunk forward index write #7930

Merged

Conversation

sajjad-moradi (Contributor)

Description

This PR fixes the performance issue of BaseChunkSVForwardIndexWriter by using a byte buffer instead of memory mapping each compressed chunk. Please refer to #7929 for more details.
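
Conceptually, the change replaces a per-chunk memory mapping with one reusable direct buffer. A minimal sketch of the idea (not the exact diff; it assumes the _dataChannel, _compressedBuffer, and _chunkCompressor names that appear in the review thread below):

// Before: map a fresh region of the file for every compressed chunk and
// compress straight into the mapping (expensive when chunks are small).
//   ByteBuffer mapped = _dataChannel.map(FileChannel.MapMode.READ_WRITE, _dataOffset, maxSize);
//   _chunkCompressor.compress(uncompressedChunk, mapped);

// After: compress into a single reusable direct buffer, then write it
// through the channel at the current offset.
// (compress is assumed to leave _compressedBuffer flipped and ready to write)
_compressedBuffer.clear();
_chunkCompressor.compress(uncompressedChunk, _compressedBuffer);
_dataChannel.write(_compressedBuffer, _dataOffset);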

Testing Done

For one table with a non-dictionary column, building the index takes more than 30 minutes with the existing code but around one minute with the fix.

richardstartin (Member) left a comment:

This makes sense; the impact on fixed-width data, which always uses tiny buffers, wasn't considered when the buffer for compressed chunks was removed.

codecov-commenter commented Dec 18, 2021

Codecov Report

Merging #7930 (aa8477e) into master (6037cac) will increase coverage by 0.00%.
The diff coverage is 100.00%.


@@            Coverage Diff            @@
##             master    #7930   +/-   ##
=========================================
  Coverage     71.15%   71.15%           
- Complexity     4111     4113    +2     
=========================================
  Files          1593     1593           
  Lines         82365    82362    -3     
  Branches      12270    12269    -1     
=========================================
- Hits          58609    58607    -2     
  Misses        19806    19806           
+ Partials       3950     3949    -1     
Flag           Coverage Δ
integration1   29.12% <0.00%> (+0.01%) ⬆️
integration2   27.62% <0.00%> (+0.03%) ⬆️
unittests1     68.07% <100.00%> (+<0.01%) ⬆️
unittests2     14.32% <0.00%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
.../io/writer/impl/BaseChunkSVForwardIndexWriter.java 85.96% <100.00%> (+0.96%) ⬆️
...core/query/executor/ServerQueryExecutorV1Impl.java 83.33% <0.00%> (-4.42%) ⬇️
...pinot/core/query/request/context/TimerContext.java 91.66% <0.00%> (-4.17%) ⬇️
.../pinot/core/query/scheduler/PriorityScheduler.java 80.82% <0.00%> (-2.74%) ⬇️
.../org/apache/pinot/core/startree/StarTreeUtils.java 69.89% <0.00%> (-2.16%) ⬇️
...not/broker/broker/helix/ClusterChangeMediator.java 77.65% <0.00%> (-2.13%) ⬇️
.../helix/core/realtime/SegmentCompletionManager.java 72.00% <0.00%> (-1.02%) ⬇️
...ata/manager/realtime/RealtimeTableDataManager.java 67.88% <0.00%> (-0.82%) ⬇️
...a/org/apache/pinot/core/common/DataBlockCache.java 91.44% <0.00%> (-0.66%) ⬇️
.../aggregation/function/ModeAggregationFunction.java 88.37% <0.00%> (ø)
... and 8 more

Continue to review the full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 6037cac...aa8477e.

richardstartin (Member):

For posterity, we do have a benchmark that would catch this regression if it were to happen again:

master:

Benchmark                                  (_chunkCompressionType)      (_distribution)  (_maxChunkSize)  (_records)  Mode  Cnt           Score      Error  Units
BenchmarkRawForwardIndexWriter.writeV3                         LZ4  UNIFORM(1000,10000)          1048576      100000    ss    5       81529.035 ± 2089.707  ms/op
BenchmarkRawForwardIndexWriter.writeV3:b                       LZ4  UNIFORM(1000,10000)          1048576      100000    ss    5  7796873815.000                 #
BenchmarkRawForwardIndexWriter.writeV3:kb                      LZ4  UNIFORM(1000,10000)          1048576      100000    ss    5     7614130.000                 #
BenchmarkRawForwardIndexWriter.writeV3:mb                      LZ4  UNIFORM(1000,10000)          1048576      100000    ss    5        7435.000                 #
BenchmarkRawForwardIndexWriter.writeV3                         LZ4           EXP(0.001)          1048576      100000    ss    5       13314.818 ±  302.889  ms/op
BenchmarkRawForwardIndexWriter.writeV3:b                       LZ4           EXP(0.001)          1048576      100000    ss    5  1482381585.000                 #
BenchmarkRawForwardIndexWriter.writeV3:kb                      LZ4           EXP(0.001)          1048576      100000    ss    5     1447635.000                 #
BenchmarkRawForwardIndexWriter.writeV3:mb                      LZ4           EXP(0.001)          1048576      100000    ss    5        1410.000                 #

branch:

Benchmark                                  (_chunkCompressionType)      (_distribution)  (_maxChunkSize)  (_records)  Mode  Cnt           Score    Error  Units
BenchmarkRawForwardIndexWriter.writeV3                         LZ4  UNIFORM(1000,10000)          1048576      100000    ss    5        7690.783 ± 87.355  ms/op
BenchmarkRawForwardIndexWriter.writeV3:b                       LZ4  UNIFORM(1000,10000)          1048576      100000    ss    5  7796873815.000               #
BenchmarkRawForwardIndexWriter.writeV3:kb                      LZ4  UNIFORM(1000,10000)          1048576      100000    ss    5     7614130.000               #
BenchmarkRawForwardIndexWriter.writeV3:mb                      LZ4  UNIFORM(1000,10000)          1048576      100000    ss    5        7435.000               #
BenchmarkRawForwardIndexWriter.writeV3                         LZ4           EXP(0.001)          1048576      100000    ss    5        1438.566 ± 39.583  ms/op
BenchmarkRawForwardIndexWriter.writeV3:b                       LZ4           EXP(0.001)          1048576      100000    ss    5  1482381585.000               #
BenchmarkRawForwardIndexWriter.writeV3:kb                      LZ4           EXP(0.001)          1048576      100000    ss    5     1447635.000               #
BenchmarkRawForwardIndexWriter.writeV3:mb                      LZ4           EXP(0.001)          1048576      100000    ss    5        1410.000               #

For comparison, the mmap approach does work well for the V4 raw index writer, which is slightly slower to build than V3 on this branch but is guaranteed never to use more than 1MB memory (and is slightly faster to read according to benchmarks):

Benchmark                                  (_chunkCompressionType)      (_distribution)  (_maxChunkSize)  (_records)  Mode  Cnt           Score     Error  Units
BenchmarkRawForwardIndexWriter.writeV4                         LZ4  UNIFORM(1000,10000)          1048576      100000    ss    5        9857.203 ± 167.084  ms/op
BenchmarkRawForwardIndexWriter.writeV4:b                       LZ4  UNIFORM(1000,10000)          1048576      100000    ss    5  7781048545.000                #
BenchmarkRawForwardIndexWriter.writeV4:kb                      LZ4  UNIFORM(1000,10000)          1048576      100000    ss    5     7598680.000                #
BenchmarkRawForwardIndexWriter.writeV4:mb                      LZ4  UNIFORM(1000,10000)          1048576      100000    ss    5        7420.000                #
BenchmarkRawForwardIndexWriter.writeV4                         LZ4           EXP(0.001)          1048576      100000    ss    5        1802.487 ± 112.871  ms/op
BenchmarkRawForwardIndexWriter.writeV4:b                       LZ4           EXP(0.001)          1048576      100000    ss    5  1416542875.000                #
BenchmarkRawForwardIndexWriter.writeV4:kb                      LZ4           EXP(0.001)          1048576      100000    ss    5     1383340.000                #
BenchmarkRawForwardIndexWriter.writeV4:mb                      LZ4           EXP(0.001)          1048576      100000    ss    5        1350.000                #

Inline review thread on the following lines of BaseChunkSVForwardIndexWriter:

_dataChannel = new RandomAccessFile(file, "rw").getChannel();
_header = _dataChannel.map(FileChannel.MapMode.READ_WRITE, 0, _dataOffset);
writeHeader(compressionType, totalDocs, numDocsPerChunk, sizeOfEntry, version);
_compressedBuffer = ByteBuffer.allocateDirect(chunkSize * 2);
richardstartin (Member):
I suggest using ByteBuffer.allocateDirect(_chunkCompressor.maxCompressedSize(chunkSize)) instead, because most compressors can produce a much tighter bound than 2x. E.g. LZ4 (the default) produces 8240 for 8192 bytes and 1052704 for 1MB.
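
A minimal sketch of the suggested allocation, assuming a ChunkCompressor that exposes the maxCompressedSize(int) method discussed in this thread:

// Size the reusable buffer to the codec's published worst-case bound for
// chunkSize uncompressed bytes, instead of a blanket 2x.
_compressedBuffer = ByteBuffer.allocateDirect(_chunkCompressor.maxCompressedSize(chunkSize));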

sajjad-moradi (Contributor, Author):
Good suggestion. Refactored.

siddharthteotia (Contributor):
@sajjad-moradi @richardstartin, I think 2x was being used to account for the worst case, where negative compression is a possibility (although unlikely) imo. If we are 100% sure that our compression codecs (LZ4, Snappy, ZSTD) are never going to result in negative compression for any kind of data, using chunkSize is fine. Otherwise, I suggest moving it back to 2x (the old code), or at least to something larger than chunkSize.

richardstartin (Member):
Each compression codec provides a bound for the worst case, i.e. when the data is incompressible and the result is larger than the input.

siddharthteotia (Contributor):
Do we have these bounds available at run time (preferably as an API)? Otherwise, instead of trying to optimize for something we don't know, we should revert to the code that we know works in a production environment.

richardstartin (Member) commented Dec 20, 2021:
These bounds are specified by the compression codecs themselves. They take the size of the uncompressed input as a parameter and produce the largest possible compressed size for that input size. This isn't trying to optimise something we don't know, but making proper use of compression libraries. Playing devil's advocate, I'm curious how you know 2x is large enough for any data? Wouldn't you prefer to use the compression library's specified bounds?

richardstartin (Member) commented Dec 20, 2021:
For posterity, here are the documented bound functions we call into via JNI:

LZ4_compressBound (see the LZ4 documentation)
Snappy::MaxCompressedLength (see the Snappy documentation)
ZSTD_compressBound (see the Zstandard documentation)
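
Illustrative only: the same bounds can be queried at run time through the Java bindings that wrap these native functions (this assumes the lz4-java, snappy-java, and zstd-jni libraries; the wrapper class here is hypothetical):

import com.github.luben.zstd.Zstd;
import net.jpountz.lz4.LZ4Factory;
import org.xerial.snappy.Snappy;

public class CompressBoundDemo {
  public static void main(String[] args) throws Exception {
    int chunkSize = 8192;
    // lz4-java wraps LZ4_compressBound: prints 8240 for an 8192-byte input.
    System.out.println(LZ4Factory.fastestInstance().fastCompressor().maxCompressedLength(chunkSize));
    // snappy-java wraps snappy::MaxCompressedLength.
    System.out.println(Snappy.maxCompressedLength(chunkSize));
    // zstd-jni wraps ZSTD_compressBound.
    System.out.println(Zstd.compressBound(chunkSize));
  }
}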

sajjad-moradi (Contributor, Author):
@siddharthteotia What do you mean by negative compression? If you mean the compressed size being larger than the input size, then this maxCompressedSize function returns the upper bound for exactly that case, so we don't always need to go with 2x the size.
BTW, here are the max values for an input size of 1000 bytes with the different compressors:

pass through: 1000
snappy: 1198
zstandard: 1066
lz4: 1019
lz4 with length compressor: 1023
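
These values line up with the worst-case formulas the codecs publish. As a quick sanity check (formulas transcribed from the upstream headers; they are not part of this PR):

int n = 1000;
int lz4Bound = n + n / 255 + 16;                         // LZ4_COMPRESSBOUND: 1019
int snappyBound = 32 + n + n / 6;                        // snappy::MaxCompressedLength: 1198
int zstdBound = n + (n >> 8) + ((128 * 1024 - n) >> 11); // ZSTD_COMPRESSBOUND (n < 128 KB): 1066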

mcvsubbu merged commit f27cf0b into apache:master on Dec 20, 2021.