Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BMM Restart Improvements: Bug fixes and logging improvements #926

Merged
merged 3 commits into from
Mar 2, 2023

Conversation

jzakaryan
Copy link
Collaborator

@jzakaryan jzakaryan commented Feb 28, 2023

This pull request addresses bugs and makes logging improvements in Assignment Tokens Feature (#919 #921 #922 #924)

  • Addressed a bug where a stream with no tasks causes the assignment handler to fail.
  • Addressed a bug where for tokens assigned to itself, the leader would print "Claiming token" instead of "Revoking token".
  • Addressed a bug which was causing a failure in followers' code for inferring stopping streams.
  • Added additional log messages for debugging.

@@ -268,8 +271,7 @@ public void start() {
// Initializing executor services
_scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("CoordinatorScheduledExecutor-%d").build());
// TODO Assess whether having a single threaded executor for token claim tasks is sufficient or it will be exhausted
_tokenClaimExecutor = Executors.newSingleThreadExecutor(
_tokenClaimExecutor = Executors.newFixedThreadPool(TOKEN_CLAIM_THREAD_POOL_SIZE,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A single threaded token claim executor will get backed up in the following cases:

  • The leader will try to schedule 2 tasks in this executor upon a stream stop: (1) a task for polling the Zookeeper to check whether stop has propagated and completed across the cluster (2) a task for claiming the token assigned to itself.
  • Concurrent requests to stop different streams.

A single threaded executor won't work here. For now I increased the thread pool size to 8 threads. That will let the cluster process up to 8 concurrent requests without being backed up (note that token claim tasks are short lived unlike the poll tasks, so they won't be the bottleneck here). I'll monitor how this performs in EI and make adjustments accordingly. A config property can also be added in the future if we end up tinkering with this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great catch!

@@ -869,15 +890,18 @@ boolean connectorTasksHaveStopped(String connectorName, Set<String> stoppingTask
@VisibleForTesting
static List<Datastream> inferStoppingDatastreamsFromAssignment(List<DatastreamTask> newAssignment,
List<DatastreamTask> removedTasks) {
Map<String, List<Datastream>> taskPrefixToDatastream = removedTasks.stream().
collect(Collectors.toMap(DatastreamTask::getTaskPrefix, DatastreamTask::getDatastreams));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For streams with >1 task assigned to a host, this was causing the task claim thread to silently fail with a DuplicateKeyException

collect(Collectors.toList());
List<String> newAssignmentTaskNames = newAssignment.stream().map(DatastreamTask::getDatastreamTaskName).
collect(Collectors.toList());
_log.debug("Claiming assignment tokens. Old assignment: {}", oldAssignmentTaskNames);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: shouldn't you rather want these logs after / (inside when) a thread is scheduled for claiming the assignment tokens?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted these printed before scheduling the task for claiming tokens. These logs helped me figure out the bug with the dying tokens claim thread. I'm also thinking to clean these up once the feature is tested more and rolled out in production.

@@ -268,8 +271,7 @@ public void start() {
// Initializing executor services
_scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("CoordinatorScheduledExecutor-%d").build());
// TODO Assess whether having a single threaded executor for token claim tasks is sufficient or it will be exhausted
_tokenClaimExecutor = Executors.newSingleThreadExecutor(
_tokenClaimExecutor = Executors.newFixedThreadPool(TOKEN_CLAIM_THREAD_POOL_SIZE,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great catch!

@jzakaryan jzakaryan merged commit 31502b2 into linkedin:master Mar 2, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants