[Feature] Deduplication #8708
Conversation
Codecov Report
@@ Coverage Diff @@
## master #8708 +/- ##
=============================================
- Coverage 69.81% 14.13% -55.69%
+ Complexity 4622 168 -4454
=============================================
Files 1735 1695 -40
Lines 91320 89448 -1872
Branches 13644 13440 -204
=============================================
- Hits 63759 12642 -51117
- Misses 23144 75868 +52724
+ Partials 4417 938 -3479
Force-pushed from f684376 to 3d42ad5
Good job extracting several common properties from upsert and dedup
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TableState {
Suggest modeling it as a util class (`TableStateUtils`) with one static method: `public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType)`. The `_allSegmentsLoaded` flag can still be tracked within the metadata manager. We don't want this util class to track the loaded flag; instead it should always re-calculate the state.
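The suggested shape could look roughly like the sketch below. The real method would take `(HelixManager, tableNameWithType)` and read the ideal state and current states from Helix; here plain maps stand in for those views so only the recompute-on-every-call structure is shown, and all names beyond `TableStateUtils`/`isAllSegmentsLoaded` are illustrative.

```java
import java.util.Map;

// Hypothetical sketch of the suggested stateless util. Plain maps stand in for
// the Helix ideal-state and current-state views; the real signature would be
// isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType).
final class TableStateUtils {
  private TableStateUtils() {
  }

  // Recomputes the answer on every call instead of caching a "loaded" flag,
  // so callers other than the dedup metadata manager can reuse it safely.
  static boolean isAllSegmentsLoaded(Map<String, String> idealSegmentStates,
      Map<String, String> currentSegmentStates) {
    for (Map.Entry<String, String> entry : idealSegmentStates.entrySet()) {
      String segment = entry.getKey();
      String idealState = entry.getValue();
      if (!idealState.equals(currentSegmentStates.get(segment))) {
        return false; // segment missing or not yet in its ideal state
      }
    }
    return true;
  }
}
```

Any "check once then always true" caching then stays in the caller (the metadata manager), not in the util.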
`_allSegmentsLoaded` will need to be present in both the upsert and dedup metadata classes separately. Here it's just one instance of this class. Is that okay?
The reason I suggest modeling this class as a util and not tracking `_allSegmentsLoaded` within it is that we may reuse this util method for other features, and we don't want to couple the "check once then always true" semantic into the util method/class.
Ack.
(minor) Let's rename it to TableStateUtils
Ack
@@ -538,6 +539,7 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
    }
    for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
      try {
        // TODO(saurabh): we may have dropped the record due to dedup. Should we increment indexedMessageCount?
IMO it is okay to increase the value since we are just tracking the row count fed into `index()`. We should use another metric to track the rows ignored because of dedup.
(minor) Since we already track the dropped records, we can remove this TODO and consider changing it to a comment
Ack. I think with the metric added, this is no longer needed
@@ -1307,6 +1310,7 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
    Set<String> fstIndexColumns = indexLoadingConfig.getFSTIndexColumns();
    _fstIndexColumns = new ArrayList<>(fstIndexColumns);

    boolean dedupEnabled = (tableConfig.getDedupConfig() != null && tableConfig.getDedupConfig().isDedupEnabled());
This flag is redundant; it is implicit in the presence of `partitionDedupMetadataManager`.
Ack
@@ -111,7 +116,9 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
    private static final int MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES = 30;

    private UpsertConfig.Mode _upsertMode;
    private boolean _isDedupEnabled;
This flag is redundant; it is implicit in the presence of `_tableDedupMetadataManager`.
Ack
Integer partitionGroupId = SegmentUtils
    .getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _primaryKeyColumns.get(0));
Preconditions.checkNotNull(partitionGroupId, String
    .format("PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName,
Suggested change:
- .format("PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName,
+ .format("PartitionGroupId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,
Ack
@@ -288,8 +303,8 @@ public void setTierConfigsList(List<TierConfig> tierConfigsList) {
    }

    @JsonIgnore
    public UpsertConfig.HashFunction getHashFunction() {
      return _upsertConfig == null ? UpsertConfig.HashFunction.NONE : _upsertConfig.getHashFunction();
    public HashFunction getHashFunction() {
We should remove this method. The hash function can come from both upsert config and dedup config
Ack
private final HashFunction _hashFunction;

@JsonCreator
public DedupConfig(@JsonProperty(value = "dedupEnabled", required = true) final boolean dedupEnabled,
(minor) We don't usually put `final` for local variables or parameters.
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
"Upsert/Dedup table must use strict replica-group (i.e. strictReplicaGroup) based routing");

// specifically for upsert
if (tableConfig.getUpsertConfig() != null) {
Non-null config doesn't mean it is enabled
Suggested change:
- if (tableConfig.getUpsertConfig() != null) {
+ if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) {
Ack
static void validateUpsertConfig(TableConfig tableConfig, Schema schema) {
  if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE) {
static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema) {
  if (tableConfig.getUpsertMode() == UpsertConfig.Mode.NONE && tableConfig.getDedupConfig() == null) {
Non-null dedup config doesn't mean it is enabled. We should either remove the `dedupEnabled` field and treat a non-null dedup config as dedup-enabled, or check the flag.
The idea here is: if the config JSON doesn't have the dedupConfig field, there's no need to run the validation.
Understood. We should also skip the validation when `DedupConfig` is available but dedup is not enabled.
Ack
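The gating agreed on above (skip dedup validation when the config is absent, or present but disabled) reduces to a small predicate. A minimal sketch, with a stand-in for the real `DedupConfig` class and a hypothetical helper name:

```java
// Sketch of the agreed validation gating. DedupConfig here is a minimal
// stand-in for the real Pinot config class; shouldValidateDedup is a
// hypothetical helper name, not an actual Pinot method.
final class DedupValidationGate {
  static final class DedupConfig {
    private final boolean _dedupEnabled;

    DedupConfig(boolean dedupEnabled) {
      _dedupEnabled = dedupEnabled;
    }

    boolean isDedupEnabled() {
      return _dedupEnabled;
    }
  }

  // Run dedup-specific validation only when a config is present AND enabled:
  // a non-null dedup config alone is not enough.
  static boolean shouldValidateDedup(DedupConfig dedupConfig) {
    return dedupConfig != null && dedupConfig.isDedupEnabled();
  }
}
```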
Force-pushed from d41aa14 to f594261
LGTM with some non-blocking comments. Good job!
private final HashFunction _hashFunction;
private boolean _allSegmentsLoaded;

// TODO(saurabh) : We can replace this with a ocncurrent Set
(minor) Remove this TODO
Ack
// TODO(saurabh) : We can replace this with a ocncurrent Set
@VisibleForTesting
final ConcurrentHashMap<Object, IndexSegment> _primaryKeySet = new ConcurrentHashMap<>();
Suggested change:
- final ConcurrentHashMap<Object, IndexSegment> _primaryKeySet = new ConcurrentHashMap<>();
+ final ConcurrentHashMap<Object, IndexSegment> _primaryKeyToSegmentMap = new ConcurrentHashMap<>();
Ack
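With a `ConcurrentHashMap` keyed by primary key, the present-or-record check can be a single atomic `putIfAbsent`. A sketch of that idea (not the exact Pinot implementation; `Object` stands in for the hashed primary key and `String` for `IndexSegment`):

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of an atomic "present or update" dedup check on the renamed map.
// Types are simplified stand-ins; this mirrors the idea discussed above,
// not the actual PartitionDedupMetadataManager code.
class DedupCheckSketch {
  final ConcurrentHashMap<Object, String> _primaryKeyToSegmentMap = new ConcurrentHashMap<>();

  // Returns true if the primary key was already seen (row should be dropped);
  // otherwise records the key against the given segment in one atomic step.
  boolean checkRecordPresentOrUpdate(Object primaryKey, String segment) {
    return _primaryKeyToSegmentMap.putIfAbsent(primaryKey, segment) != null;
  }
}
```

Because `putIfAbsent` is atomic, two consumers racing on the same key cannot both see "absent", which is what makes the drop decision safe without extra locking.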
}

public boolean checkRecordPresentOrUpdate(RecordInfo recordInfo, IndexSegment indexSegment) {
  if (!_allSegmentsLoaded) {
Let's move the if check into `waitTillAllSegmentsLoaded()` for thread safety. It is single threaded now, but in case that changes.
Could you help me understand the thread safety concerns with this? I don't see any, single threaded or multi threaded.
In fact, moving this if check inside `waitTillAllSegmentsLoaded()` would lead to unnecessary serialization even when all segments have already been loaded. Even in a single threaded env, that's a heavy lock acquisition cost when `_allSegmentsLoaded` is already true.
To the point where I think we should reduce the critical section here https://github.com/apache/pinot/blob/master/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java#L74: once `_allSegmentsLoaded` has been set to true, there's no need to enter a synchronized block.
Do let me know your thoughts.
I misused the word thread-safety. I was suggesting adding an extra if check in the synchronized block to avoid potentially unnecessary checks when multiple threads invoke `waitTillAllSegmentsLoaded()`.
Good point on reducing the critical section in `PartialUpsertHandler`. We should first check the flag, then enter the critical section.
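The pattern the two comments converge on is the classic double-checked flag: test a volatile flag before taking the lock, and re-test inside the synchronized block. A generic sketch under those assumptions (not the actual `PartialUpsertHandler` code; the class and method names are illustrative):

```java
// Generic sketch of the agreed pattern: fast-path check on a volatile flag,
// then a second check inside the synchronized block so concurrent callers
// don't redo the work. Mirrors the discussion, not the exact Pinot code.
class SegmentsLoadedGate {
  private volatile boolean _allSegmentsLoaded;

  void waitTillAllSegmentsLoaded() {
    if (_allSegmentsLoaded) {
      return; // fast path: no lock acquisition once the flag is true
    }
    synchronized (this) {
      if (_allSegmentsLoaded) {
        return; // another thread finished while we waited for the lock
      }
      // ... recompute segment state here (e.g. via a TableStateUtils-style check) ...
      _allSegmentsLoaded = true;
    }
  }

  boolean isAllSegmentsLoaded() {
    return _allSegmentsLoaded;
  }
}
```

The `volatile` qualifier matters: it makes the fast-path read safe without the lock, while the inner check keeps the expensive recompute from running more than once.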
}

if (isDedupEnabled() && _partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo, this)) {
  _logger.info("Dropped row {} since its primary key already exists", row);
Don't log anything here, it can flood the log
Ack
if (_serverMetrics != null) {
  _serverMetrics.addMeteredTableValue(_realtimeTableName, ServerMeter.REALTIME_DEDUP_DROPPED, 1);
}
return numDocsIndexed < _capacity;
(minor) Suggested change:
- return numDocsIndexed < _capacity;
+ return true;
Ack
    "description" : "second",
    "secondsSinceEpoch": 1567205392
  }
]
(nit) empty line
Ack
  }
},
"primaryKeyColumns": ["event_id"]
}
(nit) empty line
Ack
}

@VisibleForTesting
public static Iterator<RecordInfo> getRecordInfoIterator(IndexSegment segment, List<String> primaryKeyColumns) {
Suggest returning an iterator of `PrimaryKey`. For dedup, we don't need the docId and comparisonValue information from the `RecordInfo`. Similar for `checkRecordPresentOrUpdate()`, which can just take the `PrimaryKey` object. This is not a blocker, so maybe put a TODO and address it later.
Ack. Didn't see any big impact from changing the method signature to accept PrimaryKey, hence made that change too.
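The agreed signature change can be sketched as below: yield only the primary key per document, since dedup needs neither docId nor comparisonValue. A `List<Object[]>` of per-document primary-key column values stands in for the real segment column readers, and the nested `PrimaryKey` class is a minimal stand-in for Pinot's, so everything here is illustrative.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch of the agreed change: iterate PrimaryKey objects, not full RecordInfo.
// A List<Object[]> stands in for reading primary-key columns from a segment.
class PrimaryKeyIteratorSketch {
  // Minimal stand-in for org.apache.pinot.spi.data.readers.PrimaryKey:
  // value-based equality so keys can be used in a hash map.
  static final class PrimaryKey {
    final Object[] _values;

    PrimaryKey(Object[] values) {
      _values = values;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof PrimaryKey && Arrays.equals(_values, ((PrimaryKey) o)._values);
    }

    @Override
    public int hashCode() {
      return Arrays.hashCode(_values);
    }
  }

  static Iterator<PrimaryKey> getPrimaryKeyIterator(List<Object[]> primaryKeyValuesPerDoc) {
    Iterator<Object[]> valuesIterator = primaryKeyValuesPerDoc.iterator();
    return new Iterator<PrimaryKey>() {
      @Override
      public boolean hasNext() {
        return valuesIterator.hasNext();
      }

      @Override
      public PrimaryKey next() {
        // Wrap each doc's key column values; no docId or comparisonValue needed
        return new PrimaryKey(valuesIterator.next());
      }
    };
  }
}
```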
Force-pushed from 693659f to b56bfe2
@saurabhd336 Please add documentation for this in
This PR adds support for enabling deduplication for realtime tables via a top-level table config. At a high level, primary key (as defined in the table schema) hashes are stored in in-memory data structures, and each incoming row is validated against them. Duplicate rows are dropped. The expectation while using this feature is for the stream to be partitioned by the primary key, strictReplicaGroup routing to be enabled, and the configured stream consumer type to be lowLevel. These requirements are therefore mandated via the table config API's input validations. Design doc: https://docs.google.com/document/d/17sOSRQ1slff30z7jDc0ec5qKwv0xSfPkDjpMOY07POQ/edit?usp=sharing
How to use
https://docs.pinot.apache.org/basics/data-import/dedup
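A minimal realtime table config fragment enabling the feature might look like the following. The `dedupConfig` field names (`dedupEnabled`, `hashFunction`) follow the config class discussed in this PR, and `strictReplicaGroup` routing is the requirement stated above; treat the exact shape as illustrative and consult the linked docs for the authoritative form.

```json
{
  "tableName": "events",
  "tableType": "REALTIME",
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  },
  "dedupConfig": {
    "dedupEnabled": true,
    "hashFunction": "NONE"
  }
}
```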