Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flushless producer supporting both comparable and non comparable offsets #873

Conversation

shrinandthakkar
Copy link
Collaborator

@shrinandthakkar shrinandthakkar commented Nov 30, 2021

In Mirrormaking, the connector job polls a list of records from the source and directs those records to be sent out to the destination via the transport provider using the flushless event producer. The flushless event producer keeps track of the in-flight messages and the acknowledged checkpoints for each source partition. These checkpoints are then used by the connector to commit the safe offsets to the source via the consumer API. And this is how the whole mirror maker process works in brief.

Before this change, this flushless event producer supported any pub-sub framework that has comparable type offsets. The offsets were compared by using a priority queue and based on that the checkpoints were moved in the flushless event producer's handler.

This new change provides support for both comparable offsets by using a priority queue and comparing offsets to move the checkpoint, and noncomparable offsets by using a simple deque-based approach that removes the dependency on comparable offsets for any underlying pub-sub framework.


Example Scenario:

Let's suppose that the connector reads messages from a source topic T, partition P from offsets L1, L2, L3 (in order) and the flushless event producer sends those messages out to the destination.

We receive a callback for the events in the order of L2, L1, L3 confirming that it is produced successfully in that order.

1. Flushless event producer with Comparable offsets

  • Callback for L2
    { L1, L2, L3 }
    Inflights: L1, L3
    Acked: L2

    Since there is no acked checkpoint that is smaller than the oldest inflight checkpoint, there is no safe checkpoint to send back to the connector here.

    Safe checkpoint: <none>

  • Callback for L1
    {L1, L2, L3}
    Inflights: L3
    Acked: L1, L2

    Following the algorithm the largest acked checkpoint that is still smaller than the oldest inflight checkpoint is L2. The FlushlessEventProducerHandler removes all checkpoints in acked list that are smaller than the oldest inflight checkpoint, and then returns the largest (L2) to the connector as the safe checkpoint for topic T partition P.

    Safe checkpoint: L2

  • Callback for L3
    {L3}
    Inflights: {}
    Acked: L3

    Safe checkpoint: L3

2. Flushless event producer with Non Comparable offsets

  • Callback for L2
    { L1, L2, L3 }

    Safe checkpoint: <none>

  • Callback for L1
    {L1, L2, L3}

    Since there is a contiguous prefix of acknowledged checkpoints, update the safe checkpoint to be the last removed checkpoint from the deque.

    Safe checkpoint: L2

  • Callback for L3
    {L3}

    Since there is a contiguous prefix of acknowledged checkpoints, update the safe checkpoint to be the last removed checkpoint from the deque.

    Safe checkpoint: L3

@@ -32,10 +32,38 @@
private static final String TOPIC = "MyTopic";
private static final Random RANDOM = new Random();

private static final String OFFSET_CHECKPOINT_TRACKING_STRATEGY_WITH_COMPARABLE_OFFSETS =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of redefining, you can reference these configs.

@@ -110,6 +114,9 @@ public KafkaBasedConnectorConfig(Properties properties) {
INCLUDE_DATASTREAM_NAME_IN_CONSUMER_CLIENT_ID, DEFAULT_INCLUDE_DATASTREAM_NAME_IN_CONSUMER_CLIENT_ID);
_enablePartitionAssignment = verifiableProperties.getBoolean(ENABLE_PARTITION_ASSIGNMENT, Boolean.FALSE);

_callbackStatusStrategy = verifiableProperties.getString(CONFIG_CALLBACK_STATUS_STRATEGY_FACTORY_CLASS,
CallbackStatusWithComparableOffsetsFactory.class.getName());

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can add verification as well here and save the object rather then saving the name and putting the onus on the caller to verify and throw exception.

Comment on lines 31 to 35
// Hashset storing all the records which are yet to be acked
private final Set<T> _inFlight = Collections.synchronizedSet(new LinkedHashSet<>());

// Deque to store all the messages which are inflight until the last consumer checkpoint is made
private final Deque<T> _inFlightUntilLastConsumerCheckpoint = new ArrayDeque<>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not clear how are both of these different. Can you please explain the logic?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This list maintains the records which are inflight until the latest checkpoint is made at the consumer side,

Example :
If we have 3 records for a topic partition that are consumed as L1, L2, L3, and we receive ack for L2, the _inFlights set would have 2 records, since we already got produce ack for L2, but the _inFlightUntilLastConsumerCheckpoint would have all 3 records as we still don't have a new consumer checkpoint to make until everything before L2 is acked from producer side.

_currentCheckpoint = _inFlightUntilLastConsumerCheckpoint.pollFirst();

if (!_acked.remove(_currentCheckpoint)) {
LOG.error("Internal state error; could not remove checkpoint {}", _currentCheckpoint);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious: How will currentCheckpoints will look in case of non-comparable offsets? How will we debug the issues, i.e. stuck partitions? Offset numbers were easy to compare.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The checkpoints in non-comparable scenarios could be of byte type.

Copy link
Collaborator

@jzakaryan jzakaryan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@vmaheshw vmaheshw self-requested a review January 5, 2022 19:20
@@ -6,11 +6,16 @@
package com.linkedin.datastream.server.callbackstatus;

/**
* Interface for CallbackStatus Factories
* Factory implementation for Callback Status With Non Comparable Offsets
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: lower case for Callback Status With Non Comparable Offsets

@jzakaryan jzakaryan merged commit 039c28c into linkedin:master Jan 5, 2022
vmaheshw pushed a commit to vmaheshw/brooklin that referenced this pull request Mar 1, 2022
…ets (linkedin#873)

* Flushless producer without comparing offsets

* Flushless Producer Supporting Both Comparable and Non Comparable Offsets for Ack-ing

* Fix comments

* Making methods abstract

* Created config support to use both strategies and added tests

* Address comments

* Address comments

Co-authored-by: Shrinand Thakkar <sthakkar@sthakkar-mn1.linkedin.biz>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants