-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reduce Disk Footprint for Segment Processor Framework to Avoid Out of Disk Issues #12220
Reduce Disk Footprint for Segment Processor Framework to Avoid Out of Disk Issues #12220
Conversation
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## master #12220 +/- ##
============================================
- Coverage 61.58% 61.47% -0.11%
+ Complexity 1153 1147 -6
============================================
Files 2417 2418 +1
Lines 131611 131697 +86
Branches 20317 20327 +10
============================================
- Hits 81057 80966 -91
- Misses 44613 44790 +177
Partials 5941 5941
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
8f24cc7
to
96f1279
Compare
.../main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/FileWriter.java
Show resolved
Hide resolved
fb03ae9
to
a4e98ea
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me in high level 👍
Can we add the test for this feature?
.../main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
Outdated
Show resolved
Hide resolved
...main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveConstraintsWriter.java
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
Outdated
Show resolved
Hide resolved
0fbb2c2
to
ea8181d
Compare
Added the test. |
59e86ac
to
8076adb
Compare
pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
Outdated
Show resolved
Hide resolved
.../main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
Show resolved
Hide resolved
.../main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
Show resolved
Hide resolved
.../main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
Outdated
Show resolved
Hide resolved
...t/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
Show resolved
Hide resolved
pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
Outdated
Show resolved
Hide resolved
02cfad9
to
00f5449
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM otherwise. Thanks for adding this functionality !
.../main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
Outdated
Show resolved
Hide resolved
pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
Outdated
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
Show resolved
Hide resolved
pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
Show resolved
Hide resolved
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
Show resolved
Hide resolved
pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
Show resolved
Hide resolved
f9f16a4
to
45a4ac4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM !
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
Show resolved
Hide resolved
4ec7355
to
acfcdeb
Compare
…ll the records to recordreaderfileconfig.
- Added further tests for SegmentProcessorFramework. - Removed redundant checks for SegmentProcessorFramework. - Detected an edge case and added an additional statement in SegmentMapper.
- Pass the total count of RecordReaders through SegmentMapper Constructor instead of map() arguments.
* kept the logging for default scenario same as before * conditionally logged the progress between iterations in case the feature is enabled.
acfcdeb
to
f7f783b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Thank you for working on this feature!
Can we make sure that all tests pass?
...t/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
Outdated
Show resolved
Hide resolved
ce32e17
to
dfeb7d6
Compare
- PR apache#12290 touched dimBaseballTeams.csv on which the tests depends on. Modified the test to reflect that. - Changed import statements (addressed comment).
dfeb7d6
to
2d98e31
Compare
The test was failing because of another change. I have fixed them now. |
LGTM! Thank you for working on this 👍 |
Problem
During the segment generation process for offline ingestion through
SegmentProcessorFramework
, the size of the intermediate files during the map phase can sometimes increase drastically causing Out of Disk issues. This feature aims to aid segment generation configured by bounded size of the intermediate files in the map phase.Solution
Design Doc
Adaptive map-transform Phase (Original idea from Manish’s Design for Smart Ingestion)
During the map phase, we will keep track of the total bytes written to the disk so far, for the intermediate files (Y). We will need to keep track of the bytes written for each write for the intermediate files in the map phase.
Currently SegmentProcessorFramework does all the steps (Map/Reduce/Segment Generation) in one step. The idea is to break the process into multiple iterations bound by a configurable threshold (targetFileSize) for the size of mapper output.
In each iteration at any point we can have 2 possibilities:
If Y < targetFileSize
If Y >= targetFileSize, we do the following:
Keep track of current file record reader being ingested (i.e we are at ith recordReader, and that internally keeps track of current row via its iterator).
Terminate map phase and pass the output of the mapper to reduce phase.
The output from the reduce phase will be input to the segment generation phase and segments will be generated based on this.
Resume map phase from where we left off (RecordReader) in the next iteration.
This process ensures that the map phase is bound by the targetFileSize we set, as we ingest rows from a file.
This PR implements the adaptive segment generation configured by bounded intermediate file size.
Release Notes
segmentMapperFileSizeThresholdInBytes
is introduced to specify the threshold of the size of intermediate files during map phase in thetaskConfig
. The default value of this threshold will be Long.MAX_VALUE.SegmentProcessorFramework
will be the current behaviour i.e. do map and reduce in one step to create intermediate files for all the record readers and generate segments from them.segmentMapperFileSizeThresholdInBytes
in the taskConfig of the respective tasks.MergeRollupTask
andRealtimeToOfflineSegmentsTask
generates segments through SegmentProcessorFramework so these tasks are supported for this feature.Example Config :
set intermediate mapper output file size as 1000000000 bytes