
Enable Consistent Data Push for Standalone Segment Push Job Runners #9295

Merged
merged 11 commits into apache:master on Sep 7, 2022

Conversation

Contributor

@yuanbenson yuanbenson commented Aug 29, 2022

Description:

This PR addresses #9268 for the segment push job runners under the standalone execution framework: SegmentMetadataPushJobRunner, SegmentTarPushJobRunner, and SegmentUriPushJobRunner.

This is accomplished by introducing a new class, ConsistentDataPushUtils, which contains the APIs and helpers that the *PushJobRunner classes call to invoke the consistent push protocol.

Since there is a large overlap in the code across the *PushJobRunner classes, this PR also takes the opportunity to refactor and extract the common logic into a new abstract class, BaseSegmentPushJobRunner.

To enable consistent data push, this PR also introduces a new boolean config in the table config, under
TableConfig -> IngestionConfig -> BatchIngestionConfig -> consistentDataPush.

Users can enable consistent data push by setting the consistentDataPush config to true, as shown below, before invoking ingestion jobs:

...
    "batchIngestionConfig": {
      "segmentIngestionType": "REFRESH",
      "segmentIngestionFrequency": "DAILY",
      "consistentDataPush": true
    },
...

Enabling the config will:

  1. In the segment generation phase: inject a timestamp into segment names (via the segment name postfix) so that, for the REFRESH use case, newly generated segments do not collide with and directly overwrite existing segments.
  2. In the segment push/upload phase: wrap the segment replacement protocol around the segment upload to achieve an atomic switch between the old and new segment data. On failure, the swap is reverted and aborted, ensuring the broker keeps routing queries only to the old segments (see the sketch below).
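
Below is a minimal, self-contained sketch of that push-phase flow. The class and helper method names are illustrative placeholders, not the actual ConsistentDataPushUtils API; only the four protocol steps (start, upload, end, revert replace segments) mirror the behavior described above.

import java.util.List;

// Minimal sketch of the consistent data push flow; method names and bodies are
// illustrative placeholders, not the actual ConsistentDataPushUtils API.
public class ConsistentPushSketch {

  public void pushWithConsistency(List<String> newSegmentNames) throws Exception {
    // 1. Start the replacement: register a segment lineage entry recording which
    //    existing segments the new segments will replace.
    String lineageEntryId = startReplaceSegments(newSegmentNames);
    try {
      // 2. Upload the new segments; the broker still routes queries to the old ones.
      uploadSegments(newSegmentNames);
      // 3. End the replacement: atomically switch routing from old to new segments.
      endReplaceSegments(lineageEntryId);
    } catch (Exception e) {
      // 4. On any failure, revert the replacement so the broker keeps serving only
      //    the old segments, then propagate the error.
      revertReplaceSegments(lineageEntryId);
      throw e;
    }
  }

  // Placeholder stubs; in the PR these steps go through the controller's segment
  // replacement endpoints via FileUploadDownloadClient.
  private String startReplaceSegments(List<String> segments) { return "lineage-entry-id"; }
  private void uploadSegments(List<String> segments) { }
  private void endReplaceSegments(String lineageEntryId) { }
  private void revertReplaceSegments(String lineageEntryId) { }
}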

Testing Done:

Added a new test, testUploadAndQueryWithConsistentPush, in SegmentUploadIntegrationTest, which (outlined in the sketch after this list):

  1. Runs SegmentMetadataPushJobRunner with consistent push enabled.
    [] -> [v1 segments]

  2. Checks that the segment lineage entry is in the expected COMPLETED state.

  3. Checks that count-star queries return the expected results.

  4. Runs SegmentTarPushJobRunner with consistent push enabled.
    [v1 segments] -> [v2 segments]

  5. Checks again that the segment lineage entry is in the expected COMPLETED state.

  6. Checks again that count-star queries return the expected results (i.e., the original set of segments has been successfully bulk-replaced).
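
For orientation, a rough, self-contained outline of that test flow; the helper methods below are placeholders standing in for the actual job runner invocations and assertions, not the real test code.

import org.testng.annotations.Test;

// Illustrative outline of the new integration test.
public class SegmentUploadIntegrationTestOutline {

  @Test
  public void testUploadAndQueryWithConsistentPush() throws Exception {
    runMetadataPushWithConsistentPush();   // []            -> [v1 segments]
    assertLineageEntryCompleted();         // lineage entry in COMPLETED state
    assertCountStarMatchesExpected();      // queries served from v1 segments

    runTarPushWithConsistentPush();        // [v1 segments] -> [v2 segments]
    assertLineageEntryCompleted();         // new lineage entry in COMPLETED state
    assertCountStarMatchesExpected();      // v1 segments fully replaced by v2
  }

  // Placeholder stubs for the job runner invocations and assertions.
  private void runMetadataPushWithConsistentPush() { }
  private void runTarPushWithConsistentPush() { }
  private void assertLineageEntryCompleted() { }
  private void assertCountStarMatchesExpected() { }
}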

@yuanbenson yuanbenson changed the title Consistent Data Push: Standalone Job Runners Consistent Data Push: Standalone Segment Push Job Runners Aug 29, 2022
@yuanbenson yuanbenson changed the title Consistent Data Push: Standalone Segment Push Job Runners Enable Consistent Data Push for Standalone Segment Push Job Runners Aug 29, 2022
@yuanbenson yuanbenson closed this Aug 30, 2022
@yuanbenson yuanbenson reopened this Aug 30, 2022
Contributor

@snleee snleee left a comment

Partial Review

@@ -81,10 +89,15 @@ protected List<String> getBloomFilterColumns() {
return null;
}

@BeforeMethod
public void setUpTest()
Contributor

I think that this is not actually necessary.

We can call the TestUtils.ensureDirectoriesExistAndEmpty() function in setUp() and clean those up in cleanUp().

For 2 different tests, we can use different table names.

Contributor Author

@yuanbenson yuanbenson Aug 30, 2022

Calling TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir) in setUp is insufficient as there is another test case testUploadAndQuery that leaves data in the tmp directories.

Using two table names will also lead to more code for now, since it seems like BaseClusterIntegrationTest is written in a way that assumes one table/config per subclass, e.g. getTableName, createOfflineTableConfig.

Hence, I prefer to keep the test structure as is, with the before- and after-method annotations, unless this is an anti-pattern. Another option would be a separate test class altogether, but I combined the two here to avoid slowing things down with too many test classes. A rough sketch of the setup/teardown shape is below.
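
The TestUtils call and directory fields come from the comments above; the import path, directory locations, and the table-drop step are assumptions made for illustration.

import java.io.File;
import org.apache.pinot.util.TestUtils;   // import path assumed
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;

public class SegmentUploadTestSetupSketch {
  // In the real test these directories come from the base integration test class.
  private final File _tempDir = new File("/tmp/SegmentUploadIntegrationTest");
  private final File _segmentDir = new File(_tempDir, "segmentDir");
  private final File _tarDir = new File(_tempDir, "tarDir");

  @BeforeMethod
  public void setUpTest() throws Exception {
    // Recreate the working directories before every test method so data left
    // behind by another test (e.g. testUploadAndQuery) cannot leak in.
    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
  }

  @AfterMethod
  public void tearDownTest() throws Exception {
    // Drop the offline table created by the test method so the next method can
    // reuse the same table name (placeholder; the real test uses the controller API).
  }
}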

@@ -217,10 +358,15 @@ public Boolean apply(@Nullable Void aVoid) {
}, 100L, 300_000, "Failed to load " + countStarResult + " documents", true);
}

@AfterMethod
Contributor

This is also not necessary if we use different table names.


codecov-commenter commented Aug 30, 2022

Codecov Report

Merging #9295 (deae0ca) into master (0f4bcfc) will decrease coverage by 0.03%.
The diff coverage is 26.96%.

❗ Current head deae0ca differs from pull request most recent head f3aa9fb. Consider uploading reports for the commit f3aa9fb to get more accurate results

@@             Coverage Diff              @@
##             master    #9295      +/-   ##
============================================
- Coverage     69.80%   69.76%   -0.04%     
+ Complexity     4777     4703      -74     
============================================
  Files          1875     1878       +3     
  Lines         99860    99930      +70     
  Branches      15194    15192       -2     
============================================
+ Hits          69706    69718      +12     
- Misses        25231    25285      +54     
- Partials       4923     4927       +4     
Flag Coverage Δ
integration1 26.14% <13.31%> (-0.07%) ⬇️
integration2 24.84% <1.70%> (-0.15%) ⬇️
unittests1 66.99% <18.10%> (-0.10%) ⬇️
unittests2 15.28% <1.70%> (+0.02%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
...gestion/batch/common/BaseSegmentPushJobRunner.java 0.00% <0.00%> (ø)
...batch/standalone/SegmentMetadataPushJobRunner.java 0.00% <0.00%> (ø)
...tion/batch/standalone/SegmentTarPushJobRunner.java 0.00% <0.00%> (ø)
...tion/batch/standalone/SegmentUriPushJobRunner.java 0.00% <0.00%> (ø)
...t/segment/local/utils/ConsistentDataPushUtils.java 0.00% <0.00%> (ø)
...ingestion/batch/spec/SegmentNameGeneratorSpec.java 0.00% <0.00%> (ø)
...g/apache/pinot/spi/utils/IngestionConfigUtils.java 69.13% <0.00%> (-5.54%) ⬇️
...spi/utils/builder/ControllerRequestURLBuilder.java 0.00% <0.00%> (ø)
...i/config/table/ingestion/BatchIngestionConfig.java 60.00% <71.42%> (+2.85%) ⬆️
...e/pinot/common/utils/FileUploadDownloadClient.java 60.27% <74.41%> (+3.45%) ⬆️
... and 37 more


@yuanbenson yuanbenson requested a review from snleee August 30, 2022 23:49
String responseString = response.getResponse();
JsonNode responseJsonNode = JsonUtils.stringToJsonNode(responseString);
Iterator<JsonNode> responseElements = responseJsonNode.elements();
while (responseElements.hasNext()) {
Contributor

Can we encapsulate the logic of fetching the segment names into a method, or is there any existing logic doing that right now?

Contributor Author

Could you clarify a bit more what input parameters/behavior you have in mind for the method we are talking about? I'll be glad to extract it out if it does not already exist.

Ohh I see — I'll look into reusing the getSelectedSegments API that you previously brought in. That may also be used when we support APPEND tables anyway! Thanks.

AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
LOGGER.info("Start replace segment URIs: " + segmentsUris);

int attempts = 1;
Contributor

Can we use a number larger than 1 for the retry attempts? What's the reasoning behind retrying only once?

for (URI uri : uriToLineageEntryIdMap.keySet()) {
String segmentLineageEntryId = uriToLineageEntryIdMap.get(uri);
try {
FILE_UPLOAD_DOWNLOAD_CLIENT.endReplaceSegments(
Contributor

@snleee snleee Aug 31, 2022

Can we add the retry here as well, to keep consistency with the startReplaceSegments call?


private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPushUtils.class);
private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = new FileUploadDownloadClient();
private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 10_000L, 2.0);
Contributor Author

@snleee @jtao15
Please lmk if these params are appropriate.
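
For reference, the parameters mean: up to 5 attempts, a 10,000 ms initial delay, and the delay scaled by 2.0 after each failure. Below is a rough sketch of wrapping a replace-segments call with this policy, assuming the usual Pinot RetryPolicy.attempt(Callable<Boolean>) contract (return true to stop, false to back off and retry); the wrapper itself is illustrative, not the PR's actual code.

import java.util.concurrent.Callable;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;

public class ReplaceSegmentsRetrySketch {
  // 5 attempts max, 10s initial delay, delay doubled (x2.0) after each failure.
  private static final RetryPolicy DEFAULT_RETRY_POLICY =
      RetryPolicies.exponentialBackoffRetryPolicy(5, 10_000L, 2.0);

  // Retries the given call until it reports success or the attempts are exhausted.
  public static void callWithRetry(Callable<Boolean> replaceSegmentsCall) throws Exception {
    DEFAULT_RETRY_POLICY.attempt(() -> {
      try {
        return replaceSegmentsCall.call();  // true = success, stop retrying
      } catch (Exception e) {
        return false;                       // treat failures as retriable in this sketch
      }
    });
  }
}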

Contributor

lgtm

tableTypeToFilter, excludeReplacedSegments);
RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
HttpClient.setTimeout(requestBuilder, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
RetryPolicies.exponentialBackoffRetryPolicy(5, 10_000L, 2.0).attempt(() -> {
Contributor Author

Same for this.

@yuanbenson yuanbenson requested a review from snleee August 31, 2022 22:52

private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPushUtils.class);
private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = new FileUploadDownloadClient();
private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 10_000L, 2.0);
Contributor

lgtm

}
public void uploadSegments(Map<String, String> segmentsUriToTarPathMap)
throws AttemptsExceededException, RetriableOperationException {
SegmentPushUtils.pushSegments(_spec, _outputDirFS, new ArrayList<>(segmentsUriToTarPathMap.values()));
Contributor Author

@snleee Do you think this might cause some performance loss (in the case of having many segments to push)? If so, we might need to change pushSegments to take a Collection<String> instead of a List<String>.

Contributor

@snleee snleee Sep 7, 2022

You mean copying the elements from map.values() into a List?

In our case, the worst-case scenario is tens of thousands of segments per table, and I think it's OK to keep it this way. I don't think this is a performance-critical path.

}
public void uploadSegments(Map<String, String> segmentsUriToTarPathMap)
throws AttemptsExceededException, RetriableOperationException {
SegmentPushUtils.sendSegmentUris(_spec, new ArrayList<>(segmentsUriToTarPathMap.keySet()));
Contributor Author

same here

Contributor

@snleee snleee left a comment

Looks good to me otherwise! Thank you for addressing all the comments.

* tarPaths (values), or both may be used depending on upload mode.
*/
public List<String> getSegmentsToReplace(Map<String, String> segmentsUriToTarPathMap) {
return SegmentPushUtils.getSegmentNames(segmentsUriToTarPathMap.values());
Contributor

Let's put the logic here instead of in the Util class. We usually extract logic into a Util class when the same code needs to be called from multiple places, to avoid duplication. In this case, the function is only used here.

@snleee snleee added the release-notes and refactor labels on Sep 7, 2022
@snleee snleee merged commit 334c978 into apache:master Sep 7, 2022
@Jackie-Jiang
Contributor

This is a great feature! Can you please also help update the Pinot documentation for this new feature? https://github.com/pinot-contrib/pinot-docs
