
add a freshness based consumption status checker #9244

Merged (12 commits, Aug 27, 2022)

Conversation

jadami10
Contributor

This is a new feature that allows setting a freshness threshold that servers must catch up to before going healthy and serving queries.

  • This supersedes the offset based checker if that is set
  • This will wait until either:
    • your current offset is within 1 of the latest offset, or
    • your latest ingested timestamp is non-negative and within the desired threshold
  • It will not take effect if you set a zero or negative freshness

This still requires and respects the overall realtime "wait" variable so we never get stuck forever. A rough sketch of the per-partition readiness check is shown below.
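
For illustration, here is a minimal sketch of that per-partition readiness check; the method and parameter names are made up for this example and are not the exact ones in the PR:

// Illustrative sketch only: a consuming partition is treated as caught up when its
// offset is within 1 of the latest stream offset, or when its most recently
// ingested event is fresh enough.
private boolean isCaughtUp(long currentOffset, long latestOffset, long latestIngestionTimeMs,
    long minFreshnessMs, long nowMs) {
  // Within 1 of the latest offset (covers streams whose reported "latest" offset is exclusive).
  if (latestOffset - currentOffset <= 1) {
    return true;
  }
  // Freshness check: the ingestion timestamp must be valid (non-negative) and recent enough.
  return latestIngestionTimeMs >= 0 && nowMs - latestIngestionTimeMs <= minFreshnessMs;
}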

This was tested in an internal QA cluster. We hit all the cases:

  • partitions at offset 0 with no messages were treated as caught up
  • partitions in an error state waited the full time before going healthy
  • partitions that were not fresh initially eventually caught up

@navina
Contributor

navina commented Aug 18, 2022

@jadami10 IIUC, this check basically happens during server startup, but it pertains to a table catching up to some point in its source. This approach tightly couples table query readiness with server liveness. Is there a reason why this "freshness based consumption catch-up" can't be done at a per-table level?

@jadami10
Contributor Author

Hey @navina, thank you for looking!

I agree that this would likely be better as a table level check and maybe even happen continuously through the lifecycle of the server so the broker is aware what servers are fresh enough to serve data (I think Kafka does something similar with its brokers).

That said, I think Pinot currently lacks some of the protection needed to do something like that. I know at the scale of our events, even 1 table catching up utilizes a large amount of CPU for example. And while tables can have their consumption rate limited, it's up to the user to make sure that's tuned correctly rather than being able to rely on Pinot to ensure there's always enough CPU to serve queries quickly.

This was also much easier to implement as a starting point. Kudos to whoever wrote the Offset based startup checker, because I was able to follow that pattern to do this easily as well. I realize the description also lacked some motivation. But we've seen cases where the Offset based checker takes 10-15 minutes to catch up to that offset at server start, the server starts serving queries, but then we are still 10-15 minutes behind and the server is slow to serve queries until it actually finishes catching up.

@Jackie-Jiang added the feature and release-notes labels on Aug 19, 2022
Contributor

@Jackie-Jiang left a comment

Can you please add a release-notes section to the PR description about the newly added config keys?

public static final boolean DEFAULT_ENABLE_REALTIME_FRESHNESS_BASED_CONSUMPTION_STATUS_CHECKER = false;
public static final String CONFIG_OF_STARTUP_REALTIME_MIN_FRESHNESS_MS =
"pinot.server.starter.realtimeMinFreshnessMs";
public static final int DEFAULT_STARTUP_REALTIME_MIN_FRESHNESS_MS = 0;
Contributor

For a stream with high throughput, can this make it never able to catch up? I think a higher default is better, e.g. 10s

Contributor Author

The default of 0 was actually "off". But given the feedback above, I've set the default to 10s, and that value will also be used if a user misconfigures this.

@@ -1440,6 +1440,17 @@ private void fetchLatestStreamOffset() {
}
}

public StreamPartitionMsgOffset fetchLatestStreamOffset(Long maxWaitTimeMs) {
Contributor

(minor)

Suggested change
public StreamPartitionMsgOffset fetchLatestStreamOffset(Long maxWaitTimeMs) {
public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) {

@@ -1440,6 +1440,17 @@ private void fetchLatestStreamOffset() {
}
}

public StreamPartitionMsgOffset fetchLatestStreamOffset(Long maxWaitTimeMs) {
Contributor

Seems we are always using 5s as the wait time. So we should either:

  1. Make fetchLatestStreamOffset() public and use it
  2. Remove fetchLatestStreamOffset(), and add a DEFAULT_MAX_WAIT_TIME_MS constant

Contributor

Looks like there is already a public method called getLatestStreamOffsetAtStartupTime in LLRealtimeSegmentDataManager. This is also used by the offset checker. I think it is the same information that you are looking for.

Contributor Author

agreed on making it public

.createPartitionMetadataProvider(_clientId, _partitionGroupId)) {
return metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, maxWaitTimeMs);
} catch (Exception e) {
_segmentLogger.warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {}", _clientId,
Contributor

Let's also log the maxWaitTime

Contributor Author

done

continue;
}
LLRealtimeSegmentDataManager rtSegmentDataManager = (LLRealtimeSegmentDataManager) segmentDataManager;
Long now = now();
Contributor

(minor) Change boxed Long to primitive, same for other places

Contributor Author

good catch, done

// constructor parameters
private final InstanceDataManager _instanceDataManager;
private final Set<String> _consumingSegments;
private final Long _minFreshnessMs;
Contributor

(minor) Same for other boxed Long

Suggested change
private final Long _minFreshnessMs;
private final long _minFreshnessMs;

@navina
Contributor

navina commented Aug 19, 2022

I agree that this would likely be better as a table level check and maybe even happen continuously through the lifecycle of the server so the broker is aware what servers are fresh enough to serve data (I think Kafka does something similar with its brokers).

Hmm... I didn't know Kafka does something like this. I know that it tries to load the active partitions. However, a while back, it was made asynchronous. Perhaps I need to go back and see the latest code.

That said, I think Pinot currently lacks some of the protection needed to do something like that. I know at the scale of our events, even 1 table catching up utilizes a large amount of CPU for example. And while tables can have their consumption rate limited, it's up to the user to make sure that's tuned correctly rather than being able to rely on Pinot to ensure there's always enough CPU to serve queries quickly.

Thanks for explaining. I see where you are coming from. Basically, the user ends up tuning the per-table rate limit according to steady state and not peak/catch-up traffic. I see the imbalance that can happen during startup on a cluster with multiple tables.

But we've seen cases where the Offset based checker takes 10-15 minutes to catch up to that offset at server start, the server starts serving queries, but then we are still 10-15 minutes behind and the server is slow to serve queries until it actually finishes catching up.

I see. My concern was around this checker not allowing the server to start during the startup timeout. I have noticed cases where a segment sometimes doesn't load and the status checker just hangs while the consumer is going full throttle to catch up. Eventually, the server times out and reboots. That's why I felt it is better to decouple the start-up validation from server health.

We can tackle this in a separate PR. But thanks for the context!


Comment on lines 250 to 256
if (isFreshnessStatusCheckerEnabled && realtimeMinFreshnessMs <= 0) {
LOGGER.warn("Realtime min freshness {} must be > 0. Will not setup freshness based checker.", realtimeMinFreshnessMs);
}
if (isFreshnessStatusCheckerEnabled && isOffsetBasedConsumptionStatusCheckerEnabled) {
LOGGER.warn("Offset and Freshness checkers both enabled. Will only setup freshness based checker.");
}
boolean checkRealtimeFreshness = realtimeMinFreshnessMs > 0;
Contributor

These condition checks are confusing.
The first condition checks whether a valid realtimeFreshness is set; the second checks whether both checkers are enabled and warns that the freshness based checker will be chosen over the offset based checker.

So, if my config has offsetBasedConsumptionStatusChecker = true, freshnessBasedConsumptionStatusCheckerEnabled = true and realtimeMinFreshness = -50, the logs will be very confusing.

It seems simpler to do this: if both are set up, choose the freshness based check and use the default freshness value if minFreshness is not configured correctly. Also, the check for the correct config value can be moved further down to line 298: if (isFreshnessStatusCheckerEnabled) {

nit: the boolean checkRealtimeFreshness is unused

Contributor Author

Good call, it's definitely confusing. I've changed this to (rough sketch below):

  • use the default freshness setting if it's not set correctly
  • always use the freshness checker regardless of other configs
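
A rough sketch of what that resolution logic looks like after the change (the variable and constant names follow the snippet above; this is an approximation, not the merged code):

// Sketch: the freshness checker wins when both checkers are enabled, and an invalid
// freshness value falls back to the default instead of disabling the checker.
if (isFreshnessStatusCheckerEnabled && realtimeMinFreshnessMs <= 0) {
  LOGGER.warn("Realtime min freshness {} must be > 0. Using default {} instead.", realtimeMinFreshnessMs,
      DEFAULT_STARTUP_REALTIME_MIN_FRESHNESS_MS);
  realtimeMinFreshnessMs = DEFAULT_STARTUP_REALTIME_MIN_FRESHNESS_MS;
}
if (isFreshnessStatusCheckerEnabled && isOffsetBasedConsumptionStatusCheckerEnabled) {
  LOGGER.warn("Offset and freshness consumption status checkers are both enabled. Using only the freshness based checker.");
}
if (isFreshnessStatusCheckerEnabled) {
  // set up the freshness based checker with realtimeMinFreshnessMs
} else if (isOffsetBasedConsumptionStatusCheckerEnabled) {
  // set up the offset based checker
}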

Contributor

@sajjad-moradi left a comment

The issue with offset based status checker is that we don't know how many events from the stream we're behind. That's because the StreamPartitionMsgOffset interface only tells us if we're caught up or we're behind, but it doesn't tell us by how many events we are behind. That was the reason we couldn't define a threshold for catchup when offset based status checker was added.

Anyways, the current PR has duplicate code between OffsetBasedStatusChecker and FreshnessBasedStatusChecker. I think by making a small change to the OffsetBasedStatusChecker, we can achieve the goal of this PR. We can pass in a config for freshness threshold to OffsetBasedStatusChecker and where we get the stream latest offset at startup, if the config is provided, we can get stream latest offset at the current moment and check the freshness then.

Comment on lines 68 to 69
long currentOffsetLong = ((LongMsgOffset) currentOffset).getOffset();
long latestOffsetLong = ((LongMsgOffset) latestOffset).getOffset();
Contributor

This we can't do. Not all streams have long offsets like Kafka does.

Contributor Author

My bad, I missed the type check before it. We already do this in LLRealtimeSegmentDataManager, and I'd prefer to avoid changing the StreamPartitionMsgOffset interface in this PR.

@mcvsubbu
Contributor

The issue with offset based status checker is that we don't know how many events from the stream we're behind. That's because the StreamPartitionMsgOffset interface only tells us if we're caught up or we're behind, but it doesn't tell us by how many events we are behind. That was the reason we couldn't define a threshold for catchup when offset based status checker was added.

Anyways, the current PR has duplicate code between OffsetBasedStatusChecker and FreshnessBasedStatusChecker. I think by making a small change to the OffsetBasedStatusChecker, we can achieve the goal of this PR. We can pass in a config for freshness threshold to OffsetBasedStatusChecker and where we get the stream latest offset at startup, if the config is provided, we can get stream latest offset at the current moment and check the freshness then.

Good point. Can we enhance the stream interface to determine the lag and then emit a metric?

@jadami10
Contributor Author

The issue with offset based status checker is that we don't know how many events from the stream we're behind. That's because the StreamPartitionMsgOffset interface only tells us if we're caught up or we're behind, but it doesn't tell us by how many events we are behind. That was the reason we couldn't define a threshold for catchup when offset based status checker was added.

Even if we had this, I'm not sure it would help. I feel most folks set their SLAs based on time freshness, not "number of events". So even if we knew we were 10,000 events behind, that might be fine if you have 1,000 events per second but not fine if you have 10 events per second.

Anyways, the current PR has duplicate code between OffsetBasedStatusChecker and FreshnessBasedStatusChecker. I think by making a small change to the OffsetBasedStatusChecker, we can achieve the goal of this PR. We can pass in a config for freshness threshold to OffsetBasedStatusChecker and where we get the stream latest offset at startup, if the config is provided, we can get stream latest offset at the current moment and check the freshness then.

Fair feedback. I opted to use a parent class to keep the functionality separate rather than try to merge them. I feel like this is a better approach at the moment to keep things more easily testable and singularly responsible.

Good point. Can we enhance the stream interface to decide the lag and then emit a metric ?

I'd really prefer not to make such a big change in this PR if it's not necessary.

_segmentLogger
.info("Stopping consumption due to time limit start={} now={} numRowsConsumed={} numRowsIndexed={}",
_startTimeMs, now, _numRowsConsumed, _numRowsIndexed);
_segmentLogger.info(
Contributor Author

Sorry for all the changes here. "Reformat Code" in the IDE did all this. Happy to undo it if preferred.

Contributor

Could you please undo the reformatting? It's a bit hard to follow the code change.

Contributor Author

done

@sajjad-moradi
Contributor

I feel most folks set their SLAs based on time freshness, not "number of events". So even if we knew we were 10,000 events behind, that might be fine if you have 1,000 events per second but not fine if you have 10 events per second.

The purpose of this status checker is to consume as many events as possible before enabling query execution. We shouldn't care if the messages are fresh or not. For example, let's say the freshness threshold is 15s and assume there's a huge spike in the topic in the last 15s and there are thousands of events available in the topic. The ingestion time for these events is less than the threshold, so the status checker announces the consumption has caught up while there are a lot of messages available in the topic to be consumed.

Anyways, the current PR has duplicate code between OffsetBasedStatusChecker and FreshnessBasedStatusChecker. I think by making a small change to the OffsetBasedStatusChecker, we can achieve the goal of this PR. We can pass in a config for freshness threshold to OffsetBasedStatusChecker and where we get the stream latest offset at startup, if the config is provided, we can get stream latest offset at the current moment and check the freshness then.

Fair feedback. I opted to use a parent class to keep the functionality separate rather than try to merge them. I feel like this is a better approach at the moment to keep things more easily testable and singularly responsible.

I agree it's easier to develop and test this way, but in the long run duplicated code has a high maintenance cost. Let's iron out the issues and consolidate the two classes into one.

@jadami10
Contributor Author

The purpose of this status checker is to consume as many events as possible before enabling query execution. We shouldn't care if the messages are fresh or not. For example, let's say the freshness threshold is 15s and assume there's a huge spike in the topic in the last 15s and there are thousands of events available in the topic. The ingestion time for these events is less than the threshold, so the status checker announces the consumption has caught up while there are a lot of messages available in the topic to be consumed.

Others can chime in here as well about their SLAs/use cases. But for our use case, we promise an internal freshness SLA, not anything else based on the number of events. So if there are thousands of events that are < N seconds old, we are fine starting the system up. I assumed this was a generic enough streaming system SLA to provide as a feature in Pinot.

I agree it's easier to develop and test this way, but in the long run duplicated code has a high maintenance cost. Let's iron out the issues and consolidate the two classes into one.

There's no longer any duplicated code as it's in a parent class, so I don't see the value in merging the classes when they'll be controlled by separate config values anyway.
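
For readers following along, the shared-parent structure is roughly as sketched below; the class and method names here are simplified approximations, not the exact ones in the PR:

import java.util.Set;

// Simplified sketch: the parent owns the iteration over consuming segments, and each
// subclass only supplies its own notion of "caught up" (offset based or freshness based).
abstract class ConsumptionStatusCheckerSketch {
  protected final Set<String> _consumingSegments;

  protected ConsumptionStatusCheckerSketch(Set<String> consumingSegments) {
    _consumingSegments = consumingSegments;
  }

  // Counts the consuming segments that have not yet met the subclass-specific criteria.
  public int getNumSegmentsNotCaughtUp() {
    int notCaughtUp = 0;
    for (String segmentName : _consumingSegments) {
      if (!isSegmentCaughtUp(segmentName)) {
        notCaughtUp++;
      }
    }
    return notCaughtUp;
  }

  // Each checker implements only this method.
  protected abstract boolean isSegmentCaughtUp(String segmentName);
}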

@sajjad-moradi
Contributor

Others can chime in here as well about their SLAs/use cases. But for our use case, we promise an internal freshness SLA, not anything else based on the number of events. So if there are thousands of events that are < N seconds old, we are fine starting the system up. I assumed this was a generic enough streaming system SLA to provide as a feature in Pinot.

I thought your requirement is to not have many uningested messages in the topic because ingestion interferes with query execution:

we've seen cases where the Offset based checker takes 10-15 minutes to catch up to that offset at server start, the server starts serving queries, but then we are still 10-15 minutes behind and the server is slow to serve queries until it actually finishes catching up

@jadami10
Contributor Author

I thought your requirement is to not have many uningested messages in the topic because ingestion interferes with query execution:

Both are a concern, but the ingestion lag is much more of a problem than the slow execution. For a topic with ~4,000 events/second, it takes us ~1-2 minutes per hour of ingestion lag to catch up when a server is restarted. Our segment seal threshold is 6 hours, so each restart takes ~3-6 minutes to reach the offset at startup. That brief period where we're then serving queries at 3-6 minutes behind is a bigger issue than the latency. Using freshness as the threshold happens to help with both. Even if we start serving queries at 15 seconds behind, that's relatively few remaining events to consume, so I don't expect query latency to be impacted any longer.

@sajjad-moradi
Contributor

I thought your requirement is to not have many uningested messages in the topic because ingestion interferes with query execution:

Both are a concern, but the ingestion lag is much more of a problem than the slow execution. For a topic with ~4,000 events/second, it takes us ~1-2 minutes per hour of ingestion lag to catch up when a server is restarted. Our segment seal threshold is 6 hours, so each restart takes ~3-6 minutes to reach the offset at startup. That brief period where we're then serving queries at 3-6 minutes behind is a bigger issue than the latency. Using freshness as the threshold happens to help with both. Even if we start serving queries at 15 seconds behind, that's relatively few remaining events to consume, so I don't expect query latency to be impacted any longer.

Makes sense. Even in the case of a huge spike in the last 15s, the consumption rate limiter doesn't allow immediate consumption, which in turn prevents latency in query execution.

if (currentOffset.compareTo(latestOffset) == 0) {
return true;
}
if (currentOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset) {
Contributor

Please change the interface to do this. We don't want any Kafka-specific code in Pinot. It is not too hard to change the interface to add a comparator, with a default method.

Please, let us keep it clean.

Contributor Author

I can't think of how else to do this, even in the interface. This issue only arises with Kafka because the "latest offset" there is actually +1 of whatever your current offset is. The Kinesis and Pulsar implementations use a string and a MessageId, respectively, to store the offset.

I can make a getPreviousOffset method in the LLRealtimeSegmentDataManager, but then I just move this logic there.

I can put a fetchPreviousOffset in StreamMetadataProvider, but then I'll just have to raise a NotImplementedException by default and later go update our own custom StreamMetadataProvider.

Contributor

Here is one option: add a method called isNextOffset(streamMsgOffset). You can also add a method (if need be) like isNextOffsetSupported() or some such, so that you can make sure you call it only if it is supported. The default can throw an exception.
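
To make that suggestion concrete, it could look roughly like the sketch below; the interface name is deliberately not the real StreamPartitionMsgOffset, and throwing by default is just one reading of "the default can throw an exception":

// Illustrative sketch of the suggested methods, using default methods so existing
// offset implementations would keep compiling unchanged.
interface MsgOffsetWithNextSketch {
  default boolean isNextOffsetSupported() {
    return false;
  }

  // True if 'other' is exactly the next offset after this one (e.g. a long offset + 1).
  default boolean isNextOffset(MsgOffsetWithNextSketch other) {
    throw new UnsupportedOperationException("isNextOffset is not supported for this offset type");
  }
}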

Contributor

@sajjad-moradi Seems we will have the same problem in the offset based status checker when the ingestion rate is very low? If we never consume anything since the server restarts, it will never reach the latest offset since it is exclusive.

I feel we run into this issue because of the fetchStreamPartitionOffset() implementation for kafka 2.0. For all other implementations, it will return the last available offset, but for kafka 2.0 it returns the next offset after the last available offset. We should fix the contract for this method.

IMO we should have a way to get the last available offset, and return null if there is no message available in the stream. That's the only way to ensure the consumption already catches up

Contributor Author

This feels like a bigger issue than I want to tackle in this PR. I've removed the Kafka-specific code for now. We have our own stream ingestion plugin where I will update fetchStreamPartitionOffset to return - 1.

Contributor

@sajjad-moradi Seems we will have the same problem in the offset based status checker when the ingestion rate is very low? If we never consume anything since the server restarts, it will never reach the latest offset since it is exclusive.

I feel we run into this issue because of the fetchStreamPartitionOffset() implementation for kafka 2.0. For all other implementations, it will return the last available offset, but for kafka 2.0 it returns the next offset after the last available offset. We should fix the contract for this method.

IMO we should have a way to get the last available offset, and return null if there is no message available in the stream. That's the only way to ensure the consumption already catches up

That's correct. I created this issue for it: #9291
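
For reference, one way the contract discussed above could be expressed is sketched below; the interface and method names are hypothetical, and the actual change is tracked in #9291:

import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

// Hypothetical sketch of the proposed contract: return the offset of the last message
// actually available in the partition, or null when the partition currently has no
// messages, so callers can distinguish "caught up" from "nothing to consume yet".
interface LastAvailableOffsetProviderSketch {
  StreamPartitionMsgOffset fetchLastAvailableOffset(long maxWaitTimeMs);
}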

@codecov-commenter

codecov-commenter commented Aug 24, 2022

Codecov Report

Merging #9244 (26bda92) into master (0adf5ef) will decrease coverage by 2.25%.
The diff coverage is 47.84%.

❗ Current head 26bda92 differs from pull request most recent head 5e3e804. Consider uploading reports for the commit 5e3e804 to get more accurate results

@@             Coverage Diff              @@
##             master    #9244      +/-   ##
============================================
- Coverage     63.64%   61.39%   -2.26%     
+ Complexity     4984     4548     -436     
============================================
  Files          1809     1854      +45     
  Lines         96993    99160    +2167     
  Branches      14832    15101     +269     
============================================
- Hits          61735    60882     -853     
- Misses        30707    33651    +2944     
- Partials       4551     4627      +76     
Flag Coverage Δ
integration1 26.24% <16.64%> (?)
unittests1 67.09% <66.85%> (-0.02%) ⬇️
unittests2 ?

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
...er/api/access/ZkBasicAuthAccessControlFactory.java 0.00% <ø> (ø)
...ache/pinot/controller/api/resources/Constants.java 21.05% <ø> (-21.06%) ⬇️
...sources/PinotAccessControlUserRestletResource.java 0.00% <ø> (-60.61%) ⬇️
...ler/api/resources/PinotControllerAuthResource.java 0.00% <ø> (ø)
...oller/api/resources/PinotRunningQueryResource.java 0.00% <0.00%> (ø)
...ller/api/resources/PinotSchemaRestletResource.java 17.72% <0.00%> (-50.70%) ⬇️
...ces/PinotSegmentUploadDownloadRestletResource.java 50.41% <ø> (+9.58%) ⬆️
...oller/api/resources/PinotTableRestletResource.java 22.25% <0.00%> (-36.81%) ⬇️
...ler/api/resources/TableConfigsRestletResource.java 0.00% <0.00%> (-75.44%) ⬇️
...che/pinot/core/query/scheduler/QueryScheduler.java 84.56% <ø> (+14.23%) ⬆️
... and 607 more


@jadami10
Contributor Author

@mcvsubbu and @sajjad-moradi, are you ok to accept this PR as is?

Contributor

@Jackie-Jiang left a comment

LGTM. We can address the latest offset inconsistency problem for kafka 2.0 separately

_latestStreamOffsetAtStartupTime =
metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, /*maxWaitTimeMs*/5000);
public StreamPartitionMsgOffset fetchLatestStreamOffset() {
long maxWaitTimeMs = 5000;
Contributor

Can we make this wait time an argument, maxWaitTimeMillis? We seem to be calling this from different places, and each caller may want to set its own wait time.

Contributor

I'd prefer to leave it as is for now, since this is how it was working. It's a minimal change if/when anyone else needs it configurable.

Contributor

I am not worried about the minimal change afterwards. We have turned what was a private method in a class into a public method. At the minimum, the javadoc could indicate that there is a 5s wait. At best, take a timeout parameter so that any other class that calls it does so knowingly.

What we don't want is another class calling this method without knowing that there could be a 5s delay and getting surprised when there is.

Contributor Author

Fair, updated to make it a function parameter.
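
Pieced together from the diff fragments above, the method presumably ends up looking roughly like this; it is a sketch rather than the merged code, and the null return on failure is an assumption:

// Sketch: the wait time is an explicit primitive parameter, and the timeout is included
// in the warning when the fetch fails.
public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) {
  try (StreamMetadataProvider metadataProvider =
      _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId)) {
    return metadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, maxWaitTimeMs);
  } catch (Exception e) {
    _segmentLogger.warn("Cannot fetch latest stream offset for clientId {} and partitionGroupId {} with maxWaitTime {}",
        _clientId, _partitionGroupId, maxWaitTimeMs);
    return null;  // assumption: treat the latest offset as unknown on failure
  }
}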

@@ -386,6 +386,15 @@ public static class Server {
"pinot.server.starter.enableRealtimeOffsetBasedConsumptionStatusChecker";
public static final boolean DEFAULT_ENABLE_REALTIME_OFFSET_BASED_CONSUMPTION_STATUS_CHECKER = false;

public static final String CONFIG_OF_ENABLE_REALTIME_FRESHNESS_BASED_CONSUMPTION_STATUS_CHECKER =
Contributor

Can we ensure that exactly one of the configs is set (maybe in the starter), and throw an exception if not?

Contributor

I had it this way in a previous iteration but went back for a few reasons:

  • @navina suggested this approach
  • the behaviors here aren't orthogonal. The freshness checker is strictly more robust than the offset checker. If you enable both, getting the more robust one is perfectly acceptable from an operational standpoint (in my opinion)
  • both of these are disabled by default. Anyone running Pinot today is at most using the offset based checker, so they would be taking a tangible action to enable the freshness checker, meaning it's what they wanted enabled anyway. Having the server fail at startup because of this seems like poor UX

Contributor

What happens if/when someone comes up with yet another way of doing this?

Contributor Author

If someone wants an orthogonal way to do this, then it's worth further discussion there. The previous code was already doing something like this:

  • if nothing is set, just start
  • if the realtime wait is set, wait a static amount of time before starting
  • if only offset is set, do nothing
  • if offset and realtime wait are both set, run the offset checker

At this point we've somewhat exhausted the "am I caught up" methods with either the freshness or offset based approach. If someone eventually wants something more complex like "freshness and offset based together", we can migrate this config to dynamically point to the class name of the checker they want.

Contributor

@sajjad-moradi left a comment

LGTM

continue;
}

if (isSegmentCaughtUp(segName, segmentDataManager)) {
Contributor

You can change the type of segmentDataManager passed into the isSegmentCaughtUp to LLRealtimeSegmentDataManager so there's no need to cast it in each subclass.

Contributor Author

good catch. done
