
[Feature] Deduplication #8708

Merged
merged 19 commits into apache:master from dedupConfig
Jun 2, 2022

Conversation

Contributor

@saurabhd336 saurabhd336 commented May 16, 2022

This PR adds support for enabling deduplication for realtime tables via a top-level table config. At a high level, hashes of the primaryKey (as defined in the table schema) are stored in in-memory data structures, and each incoming row is validated against them; duplicate rows are dropped. The expectation while using this feature is that the stream is partitioned by the primary key, strictReplicaGroup routing is enabled, and the configured stream consumer type is lowLevel. These requirements are therefore mandated via the tableConfig API's input validations.

Design doc: https://docs.google.com/document/d/17sOSRQ1slff30z7jDc0ec5qKwv0xSfPkDjpMOY07POQ/edit?usp=sharing

How to use

https://docs.pinot.apache.org/basics/data-import/dedup
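The high-level mechanism can be sketched as a concurrent map keyed by the primary-key hash, mirroring the `checkRecordPresentOrUpdate()` method discussed in the review below. This is a minimal illustration with simplified types, not Pinot's actual implementation:

```java
import java.util.concurrent.ConcurrentHashMap;

// Minimal dedup sketch: the first row seen for a primary key wins; any later
// row with the same key is reported as a duplicate and can be dropped.
class DedupSketch {
    private final ConcurrentHashMap<Object, String> primaryKeyToSegment = new ConcurrentHashMap<>();

    /** Returns true if the key was already present (i.e. the row is a duplicate). */
    boolean checkRecordPresentOrUpdate(Object primaryKeyHash, String segmentName) {
        // putIfAbsent is atomic, so concurrent consumers cannot both "win" a key.
        return primaryKeyToSegment.putIfAbsent(primaryKeyHash, segmentName) != null;
    }
}
```

Because the stream is required to be partitioned by primary key, each partition's map only ever sees the keys for its own partition, which keeps the in-memory footprint bounded per partition.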

@saurabhd336 saurabhd336 marked this pull request as draft May 16, 2022 11:26

codecov-commenter commented May 16, 2022

Codecov Report

Merging #8708 (38f4b55) into master (9abf15f) will decrease coverage by 55.68%.
The diff coverage is 0.00%.

@@              Coverage Diff              @@
##             master    #8708       +/-   ##
=============================================
- Coverage     69.81%   14.13%   -55.69%     
+ Complexity     4622      168     -4454     
=============================================
  Files          1735     1695       -40     
  Lines         91320    89448     -1872     
  Branches      13644    13440      -204     
=============================================
- Hits          63759    12642    -51117     
- Misses        23144    75868    +52724     
+ Partials       4417      938     -3479     
Flag Coverage Δ
integration1 ?
integration2 ?
unittests1 ?
unittests2 14.13% <0.00%> (-0.02%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
...a/org/apache/pinot/common/metrics/ServerGauge.java 0.00% <0.00%> (-95.66%) ⬇️
...a/org/apache/pinot/common/metrics/ServerMeter.java 0.00% <0.00%> (-100.00%) ⬇️
...he/pinot/common/utils/config/TableConfigUtils.java 0.00% <0.00%> (-85.93%) ⬇️
...manager/realtime/LLRealtimeSegmentDataManager.java 0.00% <0.00%> (-71.12%) ⬇️
...ata/manager/realtime/RealtimeTableDataManager.java 0.00% <0.00%> (-67.76%) ⬇️
...ent/local/dedup/PartitionDedupMetadataManager.java 0.00% <0.00%> (ø)
...segment/local/dedup/TableDedupMetadataManager.java 0.00% <0.00%> (ø)
...l/indexsegment/immutable/ImmutableSegmentImpl.java 0.00% <0.00%> (-68.12%) ⬇️
...local/indexsegment/mutable/MutableSegmentImpl.java 0.00% <0.00%> (-59.29%) ⬇️
...ent/local/realtime/impl/RealtimeSegmentConfig.java 0.00% <0.00%> (-92.37%) ⬇️
... and 1380 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 9abf15f...38f4b55.

@saurabhd336 saurabhd336 force-pushed the dedupConfig branch 2 times, most recently from f684376 to 3d42ad5 Compare May 23, 2022 07:13
@saurabhd336 saurabhd336 marked this pull request as ready for review May 23, 2022 08:43
@saurabhd336 saurabhd336 changed the title from "(WIP) Dedup config" to "Dedup config" May 24, 2022
@KKcorps KKcorps requested a review from Jackie-Jiang May 25, 2022 06:04
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Good job extracting several common properties from upsert and dedup

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TableState {
Contributor

Suggest modeling it as a util class (TableStateUtils) and have one static method public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType). The _allSegmentsLoaded can still be tracked within the metadata manager. We don't want this util class to track the loaded flag, instead it should always re-calculate the state.
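The suggested shape might look like the sketch below. The real method would take `(HelixManager helixManager, String tableNameWithType)` and recompute the answer from Helix state on every call; the segment-state lookup is abstracted here purely for illustration:

```java
import java.util.List;
import java.util.function.Predicate;

// Stateless util per the suggestion: it recomputes the state on every call and
// leaves the "check once then always true" caching to the metadata manager.
final class TableStateUtils {
    private TableStateUtils() {
    }

    // In the actual suggestion this would read Helix ideal state / external view;
    // the Predicate stands in for that lookup.
    static boolean isAllSegmentsLoaded(List<String> segments, Predicate<String> isLoaded) {
        return segments.stream().allMatch(isLoaded);
    }
}
```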

Contributor Author

_allSegmentsLoaded will need to be present in both the upsert and dedup metadata classes separately. Here it's with just one instance of this class. Is that okay?

Contributor

I suggest modeling this class as a util and not tracking _allSegmentsLoaded within it because we may reuse this util method for other features, and we don't want to couple this "check once then always true" semantic into the util method/class

Contributor Author

Ack.

Contributor

(minor) Let's rename it to TableStateUtils

Contributor Author

Ack

@@ -538,6 +539,7 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
}
for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
try {
// TODO(saurabh): we may have dropped the record due to dedup. Should we increment indexedMessageCount?
Contributor

IMO it is okay to increase the value since we are just tracking the row count fed into index(). We should use another metric to track the rows ignored because of dedup

Contributor

(minor) Since we already track the dropped records, we can remove this TODO and consider changing it to a comment

Contributor Author

Ack. I think with the metric added, this is no longer needed

@@ -1307,6 +1310,7 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
Set<String> fstIndexColumns = indexLoadingConfig.getFSTIndexColumns();
_fstIndexColumns = new ArrayList<>(fstIndexColumns);

boolean dedupEnabled = (tableConfig.getDedupConfig() != null && tableConfig.getDedupConfig().isDedupEnabled());
Contributor

This flag is redundant. It is implicit on the presence of partitionDedupMetadataManager

Contributor Author

Ack

@@ -111,7 +116,9 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
private static final int MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES = 30;

private UpsertConfig.Mode _upsertMode;
private boolean _isDedupEnabled;
Contributor

This flag is redundant, and is implicit on the presence of _tableDedupMetadataManager

Contributor Author

Ack

Integer partitionGroupId = SegmentUtils
.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _primaryKeyColumns.get(0));
Preconditions.checkNotNull(partitionGroupId, String
.format("PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName,
Contributor

Suggested change
.format("PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName,
.format("PartitionGroupId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,

Contributor Author

Ack

@@ -288,8 +303,8 @@ public void setTierConfigsList(List<TierConfig> tierConfigsList) {
}

@JsonIgnore
public UpsertConfig.HashFunction getHashFunction() {
return _upsertConfig == null ? UpsertConfig.HashFunction.NONE : _upsertConfig.getHashFunction();
public HashFunction getHashFunction() {
Contributor

We should remove this method. The hash function can come from both upsert config and dedup config

Contributor Author

Ack

private final HashFunction _hashFunction;

@JsonCreator
public DedupConfig(@JsonProperty(value = "dedupEnabled", required = true) final boolean dedupEnabled,
Contributor

(minor) We don't usually put final for local variables or parameters

"Upsert/Dedup table must use strict replica-group (i.e. strictReplicaGroup) based routing");

// specifically for upsert
if (tableConfig.getUpsertConfig() != null) {
Contributor

Non-null config doesn't mean it is enabled

Suggested change
if (tableConfig.getUpsertConfig() != null) {
if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) {

Contributor Author

Ack

static void validateUpsertConfig(TableConfig tableConfig, Schema schema) {
if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE) {
static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema) {
if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE && tableConfig.getDedupConfig() == null) {
Contributor

Non-null dedup config doesn't mean it is enabled. We either remove the dedupEnabled field and treat non-null dedup as dedup-enabled, or check the flag.

Contributor Author

Idea here is, if the config json doesn't have the dedupConfig field, there's no need to run the validation

Contributor

Understood. We should also skip the validation when DedupConfig is available, but dedup is not enabled

Contributor Author

Ack
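The gating the two sides agree on here can be expressed as follows. This is an illustrative sketch with simplified types, not the actual TableConfigUtils code:

```java
// Validation runs only when a DedupConfig is present AND dedup is enabled:
// an absent field and a present-but-disabled config are both skipped.
class DedupValidationSketch {
    static final class DedupConfig {
        private final boolean dedupEnabled;

        DedupConfig(boolean dedupEnabled) {
            this.dedupEnabled = dedupEnabled;
        }

        boolean isDedupEnabled() {
            return dedupEnabled;
        }
    }

    static boolean shouldValidateDedup(DedupConfig config) {
        // Null: the dedupConfig field was absent from the table config JSON.
        return config != null && config.isDedupEnabled();
    }
}
```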

@saurabhd336 saurabhd336 force-pushed the dedupConfig branch 7 times, most recently from d41aa14 to f594261 Compare June 1, 2022 09:01
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

LGTM with some non-blocking comments. Good job!

@@ -538,6 +539,7 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
}
for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
try {
// TODO(saurabh): we may have dropped the record due to dedup. Should we increment indexedMessageCount?
Contributor

(minor) Since we already track the dropped records, we can remove this TODO and consider changing it to a comment

private final HashFunction _hashFunction;
private boolean _allSegmentsLoaded;

// TODO(saurabh) : We can replace this with a ocncurrent Set
Contributor

(minor) Remove this TODO

Contributor Author

Ack


// TODO(saurabh) : We can replace this with a ocncurrent Set
@VisibleForTesting
final ConcurrentHashMap<Object, IndexSegment> _primaryKeySet = new ConcurrentHashMap<>();
Contributor

Suggested change
final ConcurrentHashMap<Object, IndexSegment> _primaryKeySet = new ConcurrentHashMap<>();
final ConcurrentHashMap<Object, IndexSegment> _primaryKeyToSegmentMap = new ConcurrentHashMap<>();

Contributor Author

Ack

}

public boolean checkRecordPresentOrUpdate(RecordInfo recordInfo, IndexSegment indexSegment) {
if (!_allSegmentsLoaded) {
Contributor

Let's move the if check into the waitTillAllSegmentsLoaded() for thread safety. It is single threaded now, but in case that changes

Contributor Author

@saurabhd336 saurabhd336 commented Jun 2, 2022

Could you help me understand the thread safety concerns with this? I don't see any, single threaded or multi threaded.

In fact, moving this if check inside waitTillAllSegmentsLoaded() would lead to unnecessary serialization even when all segments have already been loaded. Even in a single threaded env, that's a heavy lock acquisition cost when _allSegmentsLoaded is already true.

To the point where I think we should reduce the critical section here https://github.com/apache/pinot/blob/master/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java#L74 : once _allSegmentsLoaded has been set to true, there's no need to enter a synchronized block.

Do let me know your thoughts

Contributor

I misused the word thread-safety. I was suggesting adding an extra if check in the synchronized block to avoid potential unnecessary checks when multiple threads invoke waitTillAllSegmentsLoaded().
Good point on reducing the critical section in PartialUpsertHandler. We should first check the flag, then enter the critical section
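The shape both sides converge on is the classic check-then-lock (double-checked) fast path: read a volatile flag first, and only enter the synchronized block while it is still false. A self-contained sketch with illustrative names:

```java
// Double-checked fast path: once the volatile flag is set, callers skip the
// lock entirely; before that, at most one thread does the expensive wait.
class SegmentLoadTracker {
    private volatile boolean allSegmentsLoaded;

    boolean isLoaded() {
        return allSegmentsLoaded;
    }

    void waitTillAllSegmentsLoaded() {
        if (allSegmentsLoaded) {
            return; // fast path: no lock acquisition once loading has completed
        }
        synchronized (this) {
            if (allSegmentsLoaded) {
                return; // another thread finished while we were blocked on the lock
            }
            // ... actual wait for all segments to be loaded goes here ...
            allSegmentsLoaded = true;
        }
    }
}
```

The `volatile` qualifier is what makes the unsynchronized first read safe: writes to the flag inside the lock are visible to readers on the fast path.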

}

if (isDedupEnabled() && _partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo, this)) {
_logger.info("Dropped row {} since its primary key already exists", row);
Contributor

Don't log anything here, it can flood the log

Contributor Author

Ack

if (_serverMetrics != null) {
_serverMetrics.addMeteredTableValue(_realtimeTableName, ServerMeter.REALTIME_DEDUP_DROPPED, 1);
}
return numDocsIndexed < _capacity;
Contributor

(minor)

Suggested change
return numDocsIndexed < _capacity;
return true;

Contributor Author

Ack

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TableState {
Contributor

(minor) Let's rename it to TableStateUtils

"description" : "second",
"secondsSinceEpoch": 1567205392
}
]
Contributor

(nit) empty line

Contributor Author

Ack

}
},
"primaryKeyColumns": ["event_id"]
}
Contributor

(nit) empty line

Contributor Author

Ack

}

@VisibleForTesting
public static Iterator<RecordInfo> getRecordInfoIterator(IndexSegment segment, List<String> primaryKeyColumns) {
Contributor

Suggest returning an iterator of PrimaryKey. For dedup, we don't need the docId and comparisonValue information from the RecordInfo. Similar for the checkRecordPresentOrUpdate() which can just take the PrimaryKey object. This is not a blocker, so maybe put a TODO and address it later

Contributor Author

Ack. Didn't see any big impact of changing the method signature to accept PK, hence made that change too.

@saurabhd336 saurabhd336 force-pushed the dedupConfig branch 2 times, most recently from 693659f to b56bfe2 Compare June 2, 2022 05:57
@Jackie-Jiang Jackie-Jiang added the feature and release-notes (Referenced by PRs that need attention when compiling the next release notes) labels Jun 2, 2022
@Jackie-Jiang Jackie-Jiang changed the title from "Dedup config" to "[Feature] Deduplication" Jun 2, 2022
@Jackie-Jiang Jackie-Jiang merged commit 2b82366 into apache:master Jun 2, 2022
Contributor

KKcorps commented Jun 3, 2022

@saurabhd336 Please add documentation for this in Import Data section. You can create a new page titled Stream Ingestion with Deduplication
