
Optimize segment commit to not read partition group metadata #11943

Merged
Jackie-Jiang merged 1 commit into apache:master from optimize_segment_commit on Nov 20, 2023

Conversation

Jackie-Jiang
Contributor

@Jackie-Jiang Jackie-Jiang commented Nov 3, 2023

Solves #11812
Currently, when committing a real-time segment, the controller needs to read partition group metadata for all partitions from the upstream stream, which can be very slow for streams with many partitions.
The partition group metadata is used only to extract the partition ids, which can simply be derived from the partition count, except for streams that close partitions, such as Kinesis.

In this PR, we made the following changes:

  1. Only read the partition count from the upstream stream if available
  2. If the partition count is not available, fall back to the current approach (see the sketch after this list)
  3. Log the time spent in each step for debugging
  4. In SegmentFlushThresholdComputer, remove the logic that counts only the segments with the smallest partition id towards the size ratio. It complicates the handling considerably (it is quite an anti-pattern, since any segment commit then requires info from all partitions), and I don't see much value in it. Quickly converging to the size ratio of the recent data trend should be a pro instead of a con, because this ratio is used to decide the segment size when consuming data for the same period of time.
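
To make items 1-3 concrete, here is a minimal sketch of the commit-time flow, assuming a hypothetical PartitionCountSource interface; the names below are illustrative placeholders, not the actual Pinot API:

import java.util.ArrayList;
import java.util.List;

// Illustrative only: PartitionCountSource stands in for the stream metadata provider.
interface PartitionCountSource {
  // Cheap call that returns only the partition count; a connector may not support it.
  int fetchPartitionCount(long timeoutMs);
  // Existing slow path that reads partition group metadata for every partition.
  List<Integer> fetchPartitionIdsFromPartitionGroupMetadata(long timeoutMs);
}

class PartitionIdResolver {
  static List<Integer> resolvePartitionIds(PartitionCountSource source, long timeoutMs) {
    long startMs = System.currentTimeMillis();
    try {
      // Step 1: read only the partition count; the partition ids are simply 0..count-1.
      int partitionCount = source.fetchPartitionCount(timeoutMs);
      List<Integer> partitionIds = new ArrayList<>(partitionCount);
      for (int i = 0; i < partitionCount; i++) {
        partitionIds.add(i);
      }
      return partitionIds;
    } catch (UnsupportedOperationException e) {
      // Step 2: fall back to reading partition group metadata for all partitions,
      // which is still needed for streams like Kinesis that can close partitions.
      return source.fetchPartitionIdsFromPartitionGroupMetadata(timeoutMs);
    } finally {
      // Step 3: log the time spent for debugging.
      System.out.println("Resolved partition ids in " + (System.currentTimeMillis() - startMs) + " ms");
    }
  }
}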

@Jackie-Jiang Jackie-Jiang added the release-notes label (Referenced by PRs that need attention when compiling the next release notes) on Nov 3, 2023
@Jackie-Jiang
Contributor Author

cc @jadami10

@codecov-commenter

codecov-commenter commented Nov 3, 2023

Codecov Report

Attention: 27 lines in your changes are missing coverage. Please review.

Comparison is base (09da0ea) 61.63% compared to head (b65dddf) 61.58%.

| Files | Patch % | Lines |
|---|---|---|
| ...in/stream/kafka20/KafkaStreamMetadataProvider.java | 0.00% | 10 Missing ⚠️ |
| ...in/stream/pulsar/PulsarStreamMetadataProvider.java | 11.11% | 8 Missing ⚠️ |
| .../core/realtime/PinotLLCRealtimeSegmentManager.java | 83.33% | 6 Missing and 1 partial ⚠️ |
| ...pache/pinot/spi/stream/StreamMetadataProvider.java | 0.00% | 2 Missing ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #11943      +/-   ##
============================================
- Coverage     61.63%   61.58%   -0.06%     
+ Complexity     1153     1146       -7     
============================================
  Files          2385     2385              
  Lines        129324   129359      +35     
  Branches      20022    20025       +3     
============================================
- Hits          79707    79660      -47     
- Misses        43808    43881      +73     
- Partials       5809     5818       +9     
| Flag | Coverage Δ |
|---|---|
| custom-integration1 | <0.01% <0.00%> (ø) |
| integration | <0.01% <0.00%> (ø) |
| integration1 | <0.01% <0.00%> (ø) |
| integration2 | 0.00% <0.00%> (ø) |
| java-11 | 61.57% <59.70%> (+26.54%) ⬆️ |
| java-21 | 27.56% <59.70%> (-33.96%) ⬇️ |
| skip-bytebuffers-false | 61.57% <59.70%> (-0.05%) ⬇️ |
| skip-bytebuffers-true | 27.56% <59.70%> (-33.92%) ⬇️ |
| temurin | 61.58% <59.70%> (-0.06%) ⬇️ |
| unittests | 61.57% <59.70%> (-0.06%) ⬇️ |
| unittests1 | 46.92% <0.00%> (-0.07%) ⬇️ |
| unittests2 | 27.56% <59.70%> (+<0.01%) ⬆️ |

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.

@Jackie-Jiang Jackie-Jiang force-pushed the optimize_segment_commit branch 2 times, most recently from 00ca463 to 1dc45b7 on November 3, 2023 at 22:34
@mcvsubbu
Contributor

mcvsubbu commented Nov 3, 2023

[Without looking at the code changes]
Using the smallest partition ID is because of the algorithm that optimizes the segment size. All partitions commit roughly at the same time, so we will not come up with newer values to correct the segment size soon.

@Jackie-Jiang
Contributor Author

> [Without looking at the code changes] Using the smallest partition ID is because of the algorithm that optimizes the segment size. All partitions commit roughly at the same time, so we will not come up with newer values to correct the segment size soon.

Let's say I have 8 partitions and all of them commit at roughly the same time, but partition 0 is committed last; we will lose the segment size tuning from the other 7 partitions. This PR changes it so that every segment commit contributes to the segment size tuning, and IMO it will give better segment size prediction.

@mcvsubbu
Contributor

mcvsubbu commented Nov 4, 2023

> [Without looking at the code changes] Using the smallest partition ID is because of the algorithm that optimizes the segment size. All partitions commit roughly at the same time, so we will not come up with newer values to correct the segment size soon.

> Let's say I have 8 partitions and all of them commit at roughly the same time, but partition 0 is committed last; we will lose the segment size tuning from the other 7 partitions. This PR changes it so that every segment commit contributes to the segment size tuning, and IMO it will give better segment size prediction.

No, you want to count only one segment in the group that finishes together. That is because of the formula we use where we pay most attention to the past segments and less attention to the most recent segment size (R_next = R_prev * 0.75 + R_current * 0.25).

(It could be 0.9 and 0.1, not sure).

So, the most recent segment counted 8 times (in your example) will pollute the "past" value so that it is effectively weighted as the present value.
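
As a rough illustration of that weighting concern, here is a tiny self-contained demo, assuming the 0.9 / 0.1 weights (confirmed below) and made-up ratio values:

// Illustrative only: shows how applying the EWMA update once per partition in a
// group that commits together collapses the weight kept on the "past" ratio.
public class EwmaWeightDemo {
  public static void main(String[] args) {
    double pastRatio = 1.0;     // previous size-to-rows ratio (made-up value)
    double currentRatio = 2.0;  // ratio reported by the group that just committed (made-up value)

    // Count the group once: the past still carries 90% of the weight.
    double oncePerGroup = 0.9 * pastRatio + 0.1 * currentRatio;

    // Count all 8 partitions in the group: the past keeps only 0.9^8 ≈ 43% of the weight.
    double oncePerPartition = pastRatio;
    for (int i = 0; i < 8; i++) {
      oncePerPartition = 0.9 * oncePerPartition + 0.1 * currentRatio;
    }

    // Prints roughly: once per group: 1.100, once per partition: 1.570
    System.out.printf("once per group: %.3f, once per partition: %.3f%n", oncePerGroup, oncePerPartition);
  }
}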

@Jackie-Jiang
Contributor Author

> No, you want to count only one segment in the group that finishes together. That is because of the formula we use where we pay most attention to the past segments and less attention to the most recent segment size (R_next = R_prev * 0.75 + R_current * 0.25).
>
> (It could be 0.9 and 0.1, not sure).
>
> So, the most recent segment counted 8 times (in your example) will pollute the "past" value so that it is effectively weighted as the present value.

The past weight is 0.9 and the current weight is 0.1.
In the regular case, the ratios for old and new segments should be very close. My point is: when the data distribution changes, why do we want it to pick up the new pattern so slowly? IMO we want to quickly adjust to the recent data distribution for better prediction. The current algorithm does give a more stable ratio, but when the data distribution changes, it will take ~10 rounds to adjust to the new ratio, which can take multiple days. For certain stream types such as Kinesis, the smallest partition might be fully consumed, which will prevent the ratio from being updated at all. Given these drawbacks and the extra overhead, I feel counting all segments is a better approach.

@mcvsubbu
Contributor

mcvsubbu commented Nov 6, 2023

I wrote this program to test out a hypothesis. According to this, it takes slightly more iterations to stabilize to the right segment size if we apply the algorithm for all partitions. I tried with numPartitions = 1 and numPartitions = 32.

Assumptions made here:

  • The segment size varies with numRows in a logarithmic fashion (see program). This may be wrong; open to corrections.
  • The reported segment size has a noise of 5%.

I think all partitions behave roughly the same, so we should count each group of completions only once so as to learn best from previous completions. One way we could improve (with the assumption that partitions complete roughly at the same time) is to pick the partition that completes first (which may not be partition 0, of course). This was considered at the time, but was hard to do, given that partitions may complete within minutes of each other in some cases.

public class SegSizeAlgorithmTester {
  static Double sizeToRowsRatio = null;

  static final double ALPHA = 0.9;  // weight applied to the most recently reported ratio in computeNumRows()
  static final long optimalSegmentSize = 350*1024*1024;

  // Assume segment size varies with number of rows as:
  // segmentSize = A + B * ln (numRows/C), with a noise variation of 5%
  static final double A = 4096;
  static final double B = 9.297e+7;
  static final double C = 9.7e+4;
  static final double noisePercent = 5;

  final static int nPartitions = 32;

  public static void main(String[] args) {
    int nIterations = 0;  // One iteration is completion of all partitions (roughly at the same time)

    int reportedNumRows = computeNumRows(0, 0);
    for (int s = 0; s < 50; s++) {
      double reportedSegSize = 0;
      int numRows = 0;
      for (int j = 0; j < nPartitions; j++) {
        // All partitions report the same number of rows
        reportedSegSize = A + B * Math.log((double) reportedNumRows / C);
        reportedSegSize = addNoiseToReportedSegSize(reportedSegSize, noisePercent);
        numRows = computeNumRows(reportedNumRows, (long) reportedSegSize);
      }
      nIterations++;
      double deviationPercent = Math.abs(optimalSegmentSize-reportedSegSize) *100/optimalSegmentSize;
      System.out.println(nIterations + " " + deviationPercent + " " +  reportedNumRows + " " + reportedSegSize);
      reportedNumRows = numRows;
    }
  }

  private static double addNoiseToReportedSegSize(double segSize, double noisePercent) {
    double noise = noisePercent * Math.random() * segSize/100.0;
    if (Math.random() > 0.5) {
      return segSize + noise;
    } else {
      return segSize - noise;
    }
  }

  private static int computeNumRows(int numRowsConsumed, long segmentSize) {
    if (segmentSize == 0) { // first segment of table
      return 100_000; // First guess on number of rows
    }

    double currentRatio = (double) segmentSize / numRowsConsumed;
    if (sizeToRowsRatio != null) {
      sizeToRowsRatio = sizeToRowsRatio * (1 - ALPHA) + currentRatio * ALPHA;
    } else {
      sizeToRowsRatio = currentRatio;
    }

    int newNumRows;

    if (segmentSize <= 0.5 * optimalSegmentSize) {
      // Quicker ramp up
      newNumRows = numRowsConsumed * 3/2;
    } else if (segmentSize >= 2 * optimalSegmentSize) {
      // Even quicker ramp down
      newNumRows = numRowsConsumed/2;
    } else {
      newNumRows = (int) (optimalSegmentSize / sizeToRowsRatio);
    }

    return newNumRows;
  }
}

@Jackie-Jiang
Contributor Author

Jackie-Jiang commented Nov 6, 2023

@mcvsubbu Thanks for taking the time to write this program!

> According to this, it takes slightly more iterations to stabilize to the right segment size if we apply the algorithm for all partitions. I tried with numPartitions = 1 and numPartitions = 32.

Conceptually, with numPartitions = 32, it should take far fewer iterations than numPartitions = 1 to stabilize to the desired segment size. The reason this didn't show up in the experiment is that the program assumes all partitions report the same number of rows within an iteration. Within the same iteration, a segment committed earlier can contribute a more accurate segment size ratio that is picked up by the next committing segment, which is not captured in the program.
The program also assumes all segments commit at roughly the same time, which is not always the case (and we probably want to avoid this anyway because it can cause a hotspot).
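
One simplistic way to capture that feedback would be to let each commit in an iteration hand its corrected row target to the next partition; this is an illustrative drop-in for the inner loop of the program above, not a claim about what the actual controller does:

      for (int j = 0; j < nPartitions; j++) {
        // Each commit updates the ratio, and the next committing partition in the
        // same iteration already starts from the corrected row target.
        reportedSegSize = A + B * Math.log((double) reportedNumRows / C);
        reportedSegSize = addNoiseToReportedSegSize(reportedSegSize, noisePercent);
        numRows = computeNumRows(reportedNumRows, (long) reportedSegSize);
        reportedNumRows = numRows;
      }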

Based on the above analysis, do you think we can simplify this handling to just count all committing segments?

@Jackie-Jiang Jackie-Jiang merged commit 8e84fc3 into apache:master Nov 20, 2023
19 checks passed
@Jackie-Jiang Jackie-Jiang deleted the optimize_segment_commit branch November 20, 2023 19:07
Labels
enhancement, release-notes