Introduce global checkpoint background sync #26591

Merged
merged 42 commits into elastic:master from the global-checkpoint-sync branch on Sep 21, 2017

Conversation

@jasontedor jasontedor (Member) commented Sep 11, 2017

It is the exciting return of the global checkpoint background sync. Long, long ago, in a snapshot version far, far away, we had and only had a global checkpoint background sync. This sync would fire periodically and send the global checkpoint from the primary shard to the replicas so that they could update their local knowledge of the global checkpoint. Later, as we sped ahead towards finalizing the initial version of sequence IDs, we realized that we needed the global checkpoint updates to be inline. This means that on a replication operation, the primary shard would piggyback the global checkpoint on the replication operation to the replicas. The replicas would update their local knowledge of the global checkpoint and reply with their local checkpoint. However, this could allow the global checkpoint on the primary to advance again and the replicas would fall behind in their local knowledge of the global checkpoint. If another replication operation never fired, then the replicas would be permanently behind. To account for this, we added one more sync that would fire when the primary shard fell idle. However, this approach has problems:

  • the shard idle timer defaults to five minutes, a long time to wait for the replicas to learn of the new global checkpoint
  • if a replica missed the sync, there was no follow-up sync to catch them up
  • there is an inherent race condition where the primary shard could fall idle mid-operation (after having sent the replication request to the replicas); in this case, there would never be a background sync after the operation completes
  • tying the global checkpoint sync to the idle timer was never natural

To fix this, we add two additional mechanisms for syncing the global checkpoint to the replicas. The first is a post-operation sync that fires only if there are no operations in flight and there is a lagging replica. This gives us a chance to sync the global checkpoint to the replicas immediately after an operation so that they are always kept up to date. The second is that we add back a global checkpoint background sync that fires on a timer. This timer fires every thirty seconds and is not configurable (for simplicity). This background sync is smarter than what we had previously in the sense that it only sends a sync if the global checkpoint on at least one replica is lagging behind that of the primary. When the timer fires, we can compare the global checkpoint on the primary to its knowledge of the global checkpoint on the replicas and only send a sync if there is a shard behind.

Closes #26573, relates #26630, relates #26666
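
The decision logic can be pictured with a short sketch; the class and method names below are illustrative assumptions, not the exact code in this PR. The primary compares its own global checkpoint against its tracked knowledge of each replica's global checkpoint and only triggers a sync when at least one copy is behind and, for the post-operation hook, when no operations are in flight.

// Illustrative sketch only; names and structure are assumptions, not the exact code in this PR.
import java.util.Map;

final class GlobalCheckpointSyncSketch {

    // shared check: is any tracked copy behind the primary's global checkpoint?
    static boolean replicaLagging(final long primaryGlobalCheckpoint,
                                  final Map<String, Long> knownReplicaGlobalCheckpoints) {
        return knownReplicaGlobalCheckpoints.values().stream()
                .anyMatch(replicaCheckpoint -> replicaCheckpoint < primaryGlobalCheckpoint);
    }

    // post-operation hook: only sync when nothing is in flight and a copy is behind
    static boolean postOperationSyncNeeded(final int operationsInFlight,
                                           final long primaryGlobalCheckpoint,
                                           final Map<String, Long> knownReplicaGlobalCheckpoints) {
        return operationsInFlight == 0 && replicaLagging(primaryGlobalCheckpoint, knownReplicaGlobalCheckpoints);
    }

    // background timer (every thirty seconds): sync whenever a copy is behind
    static boolean backgroundSyncNeeded(final long primaryGlobalCheckpoint,
                                        final Map<String, Long> knownReplicaGlobalCheckpoints) {
        return replicaLagging(primaryGlobalCheckpoint, knownReplicaGlobalCheckpoints);
    }
}

Both hooks share the same lag check; the timer merely guarantees that a lagging replica is eventually caught up even when no further operations arrive.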

It is the exciting return of the global checkpoint background
sync. Long, long ago, in a snapshot version far, far away, we had and only
had a global checkpoint background sync. This sync would fire
periodically and send the global checkpoint from the primary shard to
the replicas so that they could update their local knowledge of the
global checkpoint. Later in time, as we sped ahead towards finalizing
the initial version of sequence IDs, we realized that we needed the global
checkpoint updates to be inline. This means that on a replication
operation, the primary shard would piggyback the global checkpoint with
the replication operation to the replicas. The replicas would update
their local knowledge of the global checkpoint and reply with their
local checkpoint. However, this could allow the global checkpoint on the
primary to advance again and the replicas would fall behind in their
local knowledge of the global checkpoint. If another replication
operation never fired, then the replicas would be permanently behind. To
account for this, we added one more sync that would fire when the
primary shard fell idle. However, this has problems:
 - the shard idle timer defaults to five minutes, a long time to wait
   for the replicas to learn of the new global checkpoint
 - if a replica missed the sync, there was no follow-up sync to catch
   them up
 - there is an inherent race condition where the primary shard could
   fall idle mid-operation (after having sent the replication request to
   the replicas); in this case, there would never be a background sync
   after the operation completes
 - tying the global checkpoint sync to the idle timer was never natural

To fix this, we add back a global checkpoint background sync that fires
on a timer. This timer fires every thirty seconds, and is not
configurable (for simplicity). This background sync is smarter in the
sense that it only sends a sync if the global checkpoint on at least one
replica is lagging behind that of the primary. This necessitates the
primary shard tracking its knowledge of each replica's local knowledge
of the global checkpoint. When the timer fires, we can compare
the global checkpoint on the primary to its knowledge of the global
checkpoint on the replicas and only send a sync if there is a shard
behind. During replication operations it can be the case that the timer
fires and sends a sync that would be covered by an in-flight
operation. This is okay; the extra sync does not hurt, and we do not
need the complexity of optimizing away this duplicate sync.
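
A minimal sketch of the tracking this implies on the primary, assuming a simple map keyed by allocation ID (the class here is hypothetical; updateGlobalCheckpointForShard mirrors the method added in the PR): whenever a replica reports the global checkpoint it has applied, the primary records it so that the timer has something to compare against.

// Illustrative sketch; this is not the PR's GlobalCheckpointTracker itself.
import java.util.HashMap;
import java.util.Map;

final class ReplicaCheckpointKnowledge {

    // allocation ID -> last global checkpoint the primary knows that copy has observed
    private final Map<String, Long> globalCheckpoints = new HashMap<>();

    // called when a replica reports the global checkpoint it has applied
    synchronized void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) {
        globalCheckpoints.merge(allocationId, globalCheckpoint, Math::max);
    }

    // snapshot used by the background timer to decide whether a sync is needed
    synchronized Map<String, Long> copyOfGlobalCheckpoints() {
        return new HashMap<>(globalCheckpoints);
    }
}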
@ywelsch ywelsch (Contributor) left a comment

I think I found some issues regarding recoveries and failing shards. I will have to give this another close look; just some initial thoughts to share.

*
* @return the global checkpoints for all shards
*/
synchronized ObjectLongMap<String> getGlobalCheckpoints(final String allocationId) {
ywelsch (Contributor)

I think we can set the allocation id of the primary when creating the tracker in the constructor (we then also don't need to pass it anymore to the activatePrimaryMode method).

jasontedor (Member Author)

Addressed by #26630.

assert primaryMode;
assert handoffInProgress == false;
final ObjectLongMap<String> copy = new ObjectLongHashMap<>(globalCheckpoints);
copy.put(allocationId, globalCheckpoint);
ywelsch (Contributor)

with the above suggestion (allocation id of primary set during construction), we can just have a globalCheckpoints map that replaces the current globalCheckpoint variable. The global checkpoint of the primary is then tracked in the same map as the other shard copies. An alternative is to generalize "localCheckPoints" to just "checkPoints" and put the global checkpoint info into that same map entry used for the local checkpoint tracking. This would also allow that info to be transferred during primary relocation handoff (neat?).
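
A rough sketch of the generalization being suggested, with hypothetical names: one entry per shard copy carrying both checkpoints, which could then also travel with the primary relocation handoff.

// Hypothetical shape of a combined per-copy entry; names are illustrative.
final class CheckpointState {
    long localCheckpoint;   // highest sequence number this copy has processed contiguously
    long globalCheckpoint;  // the primary's knowledge of this copy's global checkpoint

    CheckpointState(final long localCheckpoint, final long globalCheckpoint) {
        this.localCheckpoint = localCheckpoint;
        this.globalCheckpoint = globalCheckpoint;
    }
}

// e.g. a single Map<String, CheckpointState> keyed by allocation ID replaces the separate maps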

jasontedor (Member Author)

Addressed by #26666.

@@ -585,6 +619,7 @@ public synchronized void completeRelocationHandoff() {
lcps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
});
globalCheckpoints.clear();
ywelsch (Contributor)

what about removing map entries when replicas fail etc. and are removed by the cluster state (and cleaned from the localCheckpoints map)?

Note that in the above scenario maybeSyncGlobalCheckpoint will execute the sync again and again (also a good test case?).

jasontedor (Member Author)

Addressed by #26666.

assert globalCheckpoints.containsKey(allocationId);
final long globalCheckpoint = globalCheckpoints.get(allocationId);
final boolean syncNeeded =
StreamSupport.stream(globalCheckpoints.values().spliterator(), false).anyMatch(v -> v.value < globalCheckpoint);
ywelsch (Contributor)

this check is very coarse. With the info available in GlobalCheckpointTracker, we could actually only care for global checkpoints of in-sync copies. In case of a recovering shard, we don't care about running the global checkpoint sync again and again.
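
A hedged sketch of the narrower check being asked for, assuming a per-copy entry that knows whether the copy is in-sync (the shape of TrackedCopy is an assumption, not the tracker's actual data structure): only in-sync copies count toward deciding whether a sync is needed, so a still-recovering copy does not keep retriggering it.

// Hypothetical sketch: only in-sync copies are considered when deciding whether to sync.
import java.util.Collection;

final class InSyncAwareCheck {

    static final class TrackedCopy {
        final boolean inSync;         // whether the copy is in the in-sync set
        final long globalCheckpoint;  // the primary's knowledge of this copy's global checkpoint

        TrackedCopy(final boolean inSync, final long globalCheckpoint) {
            this.inSync = inSync;
            this.globalCheckpoint = globalCheckpoint;
        }
    }

    static boolean syncNeeded(final long primaryGlobalCheckpoint, final Collection<TrackedCopy> copies) {
        return copies.stream()
                .filter(copy -> copy.inSync)  // ignore recovering copies
                .anyMatch(copy -> copy.globalCheckpoint < primaryGlobalCheckpoint);
    }
}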

ywelsch (Contributor)

Thinking some more about this, I think that the information for non in-sync shard copies is even incorrect at the moment.

In case where we replicated a document while the target shard was recovering, it might not have set the global checkpoint information on that shard (see IndexShard#updateGlobalCheckpointOnReplica). We can therefore not consider the global checkpoint on the recovering shard to be correctly updated. Note that we rely on the recovery finalization to "fix" the global checkpoint info on the target by explicitly calling updateGlobalCheckpointOnReplica during recovery finalization.

jasontedor (Member Author)

Addressed by #26666.

@@ -42,9 +42,22 @@
Setting.timeSetting("index.translog.retention.check_interval", new TimeValue(10, TimeUnit.MINUTES),
new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope);

public static final Setting<TimeValue> GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING =
Setting.timeSetting(
"index.global_checkpoint_sync.interval",
ywelsch (Contributor)

how about making this a String constant?

jasontedor (Member Author)

This setting is removed; it's not needed in the test case anymore.

ywelsch (Contributor)

I think this setting should be defined in the main source set (even if not registered), not here. Yeah, it follows a pattern here, but that pattern is horrible imo.

jasontedor (Member Author)

Okay, I pushed 7e6d1bf.

* @param allocationId the allocation ID to update the global checkpoint for
* @param globalCheckpoint the global checkpoint
*/
public void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) {
ywelsch (Contributor)

should this also be called from RecoverySourceHandler after calling recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint())?

jasontedor (Member Author)

Addressed by #26666.

public void testGlobalCheckpointSync() throws Exception {
internalCluster().startNode();
prepareCreate("test", Settings.builder().put(GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")).get();
ensureGreen("test");
ywelsch (Contributor)

why ensure green? I think the test becomes more interesting when we index while the cluster might not be green yet (see my comments on recovery)?

jasontedor (Member Author)

I removed this.

* master: (67 commits)
  Restoring from snapshot should force generation of a new history uuid (elastic#26694)
  test: Use a single primary shard so that the exception can caught in the same way
  Move pre-6.0 node checkpoint to SequenceNumbers
  Invalid JSON request body caused endless loop (elastic#26680)
  added comment
  fix line length violation
  Moved the check to fetch phase. This basically means that we throw a better error message instead of an AOBE and not adding more restrictions.
  inner hits: Do not allow inner hits that use _source and have a non nested object field as parent
  Separate Painless Whitelist Loading from the Painless Definition (elastic#26540)
  convert more admin requests to writeable (elastic#26566)
  Handle release of 5.6.1
  Allow `InputStreamStreamInput` array size validation where applicable (elastic#26692)
  Update global checkpoint with permit after recovery
  Filter pre-6.0 nodes for checkpoint invariants
  Skip bad request REST test on pre-6.0
  Reenable BWC tests after disabling for backport
  Add global checkpoint tracking on the primary
  [Test] Fix reference/cat/allocation/line_8 test failure
  [Docs] improved description for fs.total.available_in_bytes (elastic#26657)
  Fix discovery-file plugin to use custom config path
  ...
* master:
  Remove assertion from checkpoint tracker invariants
  Upgrade API: fix excessive logging and unnecessary template updates (elastic#26698)
* master:
  [DOCS] Added index-shared4 and index-shared5.asciidoc
  BulkProcessor flush runnable preserves the thread context from creation time (elastic#26718)
  Catch exceptions and inform handler in RemoteClusterConnection#collectNodes (elastic#26725)
  [Docs] Fix name of character filter in example. (elastic#26724)
  Remove parse field deprecations in query builders (elastic#26711)
  elastic#26720: Set the correct bwc version after backport to 6.0
  Remove deprecated type and slop field in MatchQueryBuilder (elastic#26720)
  Refactoring of Gateway*** classes (elastic#26706)
  Make RestHighLevelClient's Request class public (elastic#26627)
  Deguice ActionFilter (elastic#26691)
  aggs: Allow aggregation sorting via nested aggregation.
  Build: Set bwc builds to always set snapshot (elastic#26704)
  File Discovery: Remove fallback with zen discovery (elastic#26667)
@jasontedor jasontedor closed this Sep 20, 2017
@jasontedor jasontedor deleted the global-checkpoint-sync branch September 20, 2017 22:24
@jasontedor jasontedor reopened this Sep 20, 2017
@jasontedor jasontedor removed the request for review from bleskes September 20, 2017 22:29
@jasontedor (Member Author)

retest this please

@ywelsch ywelsch (Contributor) left a comment

LGTM - yay for super-fast gcp syncs

.setSettings(Settings.builder()
    .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2))
    .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
));
ywelsch (Contributor)

IDE gone rogue in this file? :D

jasontedor (Member Author)

I concurrently pushed b640b10. 😄

@@ -330,7 +330,8 @@ public IndexService newIndexService(
IndicesQueryCache indicesQueryCache,
MapperRegistry mapperRegistry,
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry)
NamedWriteableRegistry namedWriteableRegistry,
Consumer<ShardId> globalCheckpointSyncer)
ywelsch (Contributor)

remove this and remove all changes to IndicesService.createIndex etc.

jasontedor (Member Author)

I pushed 26e4c76.

Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference) {
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference) {
ywelsch (Contributor)

IDE gone rogue

jasontedor (Member Author)

I pushed b8adcce.

final Consumer<Client> afterIndexing) throws Exception {
final int numberOfReplicas = randomIntBetween(1, 4);
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
// set the sync interval high so it does not execute during this test
ywelsch (Contributor)

this comment belongs to the calling method.

jasontedor (Member Author)

I pushed b80b728.

@jasontedor (Member Author)

> Finally, we can also think about putting the same checks we have in RelocationIT into other classes (e.g. testAckedIndexing).

I pushed f3b04dc.

* master:
  Add permission checks before reading from HDFS stream (elastic#26716)
  muted test
  [Docs] Fixed typo of *configuration* (elastic#25058)
  Add azure storage endpoint suffix elastic#26432 (elastic#26568)
@jasontedor jasontedor merged commit f35d1de into elastic:master Sep 21, 2017
jasontedor added a commit that referenced this pull request Sep 21, 2017
jasontedor added a commit that referenced this pull request Sep 21, 2017
jasontedor added a commit that referenced this pull request Sep 21, 2017
This commit reenables the BWC tests after the introduction of the
post-operation and background global checkpoint sync.

Relates #26591
jasontedor added a commit that referenced this pull request Sep 21, 2017
jasontedor added a commit that referenced this pull request Sep 21, 2017
@jasontedor (Member Author)

Thanks @ywelsch.

@lcawl lcawl removed the v6.1.0 label Dec 12, 2017
@clintongormley clintongormley added :Distributed/Engine Anything around managing Lucene and the Translog in an open shard. and removed :Sequence IDs labels Feb 14, 2018
Labels
:Distributed/Engine Anything around managing Lucene and the Translog in an open shard. >enhancement v6.0.0-rc1 v7.0.0-beta1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Decouple global checkpoint sync from a shard falling idle
5 participants