
Add ability to split shards #26931

Merged
merged 23 commits into from Nov 6, 2017

Conversation

Contributor

s1monw commented Oct 9, 2017

This change adds a new _split API that allows splitting indices into a new
index with a power of two more shards than the source index. This API works
alongside the _shrink API but doesn't require any shard relocation before
indices can be split.

The split operation is conceptually an inverse _shrink operation since we
initialize the index with a synthetic number of routing shards that are used
for the consistent hashing at index time. Compared to indices created with
earlier versions this might produce slightly different shard distributions but
has no impact on the per-index backwards compatibility. For now, the user is
required to prepare an index to be splittable by setting
index.number_of_routing_shards at index creation time. The setting allows the
user to prepare the index to be splittable in factors of
index.number_of_routing_shards, i.e. if the index is created with
index.number_of_routing_shards: 16 and index.number_of_shards: 2 it can be
split into 4, 8, or 16 shards. This is an intermediate step until we can make
this the default. This also allows us to safely backport this change to 6.x.

The _split operation is implemented internally as a DeleteByQuery on the
Lucene level that is executed while the primary shards execute their initial
recovery. Subsequent merges that are triggered due to this operation will not be
executed immediately. All merges will be deferred until the shards are started
and will then be throttled accordingly.

This change is intended for the 6.1 feature release but will not support pre-6.1
indices to be split unless these indices have been shrunk before. In that case
these indices can be split back into their original number of shards.

This change adds a new `_split` API that allows splitting indices
into a new index with a power of two more shards than the source index.
This API works alongside the `_shrink` API but doesn't require any shard
relocation before indices can be split.

The split operation is conceptually an inverse `_shrink` operation since
we initialize the index with a _synthetic_ number of routing shards that
are used for the consistent hashing at index time. Compared to indices
created with earlier versions this might produce slightly different shard
distributions but has no impact on the per-index backwards compatibility.
For now, the user is required to prepare an index to be splittable by
setting the split factor at index creation time. Users can decide by what
factor they want to split the index, i.e. if an index should be splittable
by a factor of 2, setting `index.routing_shards_factor: 1024` allows
splitting the index 10 times, doubling the number of shards each time. This is
an intermediate step until we can make this the default. This also allows
us to safely backport this change to 6.x.

The `_split` operation is implemented internally as a DeleteByQuery on
the Lucene level that is executed while the primary shards execute their
initial recovery. Subsequent merges that are triggered due to this operation
will not be executed immediately. All merges will be deferred until the shards
are started and will then be throttled accordingly.

This change is intended for the 6.1 feature release but will not support
pre-6.1 indices to be split unless these indices have been shrunk before. In that
case these indices can be split back into their original number of shards.
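
To make the factor rule concrete, here is a minimal sketch (not code from this PR) of the arithmetic the description lays out: starting from `index.number_of_shards`, the index can be split by powers of two as long as the target shard count evenly divides `index.number_of_routing_shards`.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the split-factor rule described in the PR text; this is an
// illustration, not the actual validation code added by the change.
final class SplitTargets {

    static List<Integer> validTargets(int numberOfShards, int numberOfRoutingShards) {
        List<Integer> targets = new ArrayList<>();
        // double the shard count until we reach the routing shards, keeping only
        // counts that evenly divide number_of_routing_shards
        for (int target = numberOfShards * 2; target <= numberOfRoutingShards; target *= 2) {
            if (numberOfRoutingShards % target == 0) {
                targets.add(target);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        // number_of_shards: 2, number_of_routing_shards: 16 -> [4, 8, 16]
        System.out.println(validTargets(2, 16));
    }
}
```
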
Contributor Author

s1monw commented Oct 9, 2017

@ywelsch can you please review the allocation decider / StoreRecovery work
@jpountz please take a look at the lucene level parts
@bleskes / @nik9000 can you do a general review

Member

dadoonet left a comment

I wonder if at some point we should have only one endpoint _resize instead of _split and _shrink?

Contributor Author

s1monw commented Oct 10, 2017

I wonder if at some point we should have only one endpoint _resize instead of _split and _shrink?

what's wrong with two endpoints? It makes it very clear what you want to do, no? There's no implicit / hidden parameter you need to specify.

@dadoonet
Member

what's wrong with two endpoints?

Nothing. Just that the internal classes named Resize* made me think about it. Was just a thought.

Contributor Author

s1monw commented Oct 11, 2017

I took a look at whether I can break this PR into smaller pieces since it's quite big. I think it's possible to do several parts in core independently and I will open a branch for the guts. I will leave this one open for reference.

@jasontedor
Member

@s1monw It's not clear to me that splitting this PR will make it easier to review. I think that this PR looks big, but it has a conceptual nut that once you grok, the rest of the PR is ceremony/piping around that, and docs and tests. I sometimes (not always) find these larger changes easier to digest if I can see the entire picture at once (like in the initial CCS PR).

Contributor

jpountz left a comment

I left some comments about the query impl.

* This DISI visits every live doc and selects all docs that don't belong to this
* shard based on their id and routing value. This is only used in a routing partitioned index.
*/
private final class RoutingPartitionedDocIdSetIterator extends DocIdSetIterator {
Contributor

It is theoretically illegal to use live docs in a query since it prevents caching on the core key. However you could implement this more efficiently with a two-phase iterator, and the good news is that Lucene will make sure to check live docs before calling matches(). You could pass a DocIdSetIterator.all(maxDoc) as an approximation.
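
For illustration, a minimal sketch of what such a two-phase iterator could look like (this is not the PR's implementation, and `shardOf` is a hypothetical stand-in for the id/routing hashing):

```java
import java.io.IOException;

import org.apache.lucene.index.LeafReader;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.TwoPhaseIterator;

// Hypothetical sketch of the suggested approach: approximate with all docs and
// decide per document in matches(); Lucene consults live docs before calling
// matches(), so deleted documents never reach the per-document check.
final class SplitShardFilter {

    static TwoPhaseIterator newIterator(LeafReader reader, int targetShardId) {
        DocIdSetIterator approximation = DocIdSetIterator.all(reader.maxDoc());
        return new TwoPhaseIterator(approximation) {
            @Override
            public boolean matches() throws IOException {
                // shardOf stands in for hashing the doc's _id/_routing the same way
                // index-time routing does (not shown here)
                return shardOf(reader, approximation.docID()) == targetShardId;
            }

            @Override
            public float matchCost() {
                return 100f; // rough per-document cost of loading the id and hashing it
            }
        };
    }

    private static int shardOf(LeafReader reader, int doc) throws IOException {
        throw new UnsupportedOperationException("hypothetical helper for illustration only");
    }
}
```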

Contributor Author

good stuff I will explore this.

return leftToVisit == 0 ? Status.STOP : Status.NO;

}
}
Contributor

Do you need to support indices created in 5.x? If yes you will need to read the _uid field too (which is a string field).

Contributor

FWIW indices will have either _id or _uid but never both.

Contributor Author

Yeah, no, we don't support 5.x. I will add comments.

return shardId == targetShardId;
};
if (terms == null) { // this is the common case - no partitioning and no _routing values
findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, bitSet::set);
Contributor

beware that 5.x indices will have a _uid field rather than _id

Contributor Author

yeah so I prevent this feature from being used on 5.x indices for that reason

Contributor Author

I added a check to the ctor to make sure we are only running this on 6.0 onwards indices
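
For illustration, a hedged sketch of such a constructor guard (the class name and exact version constant are assumptions, not the PR's code):

```java
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;

// Hypothetical illustration of the check described above: reject pre-6.0 indices,
// which still use the _uid field instead of _id. The version constant is an assumption.
final class SplitShardCheck {

    static void ensureSplittable(IndexMetaData metaData) {
        if (metaData.getCreationVersion().before(Version.V_6_0_0_beta1)) {
            throw new IllegalArgumentException("splitting is only supported for indices created on or after 6.0, but ["
                + metaData.getIndex().getName() + "] was created on " + metaData.getCreationVersion());
        }
    }
}
```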


@Override
public boolean equals(Object o) {
return sameClassAs(o);
Contributor

Let's make equals/hashcode correct or throw an UOE so that we don't have caching bugs? Even if queries passed to IndexWriter.deleteDocuments are not supposed to be cached I'd like to make sure there are no issues if it occurs.
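
For illustration, a hedged sketch of field-aware equals/hashCode built on Query's sameClassAs/classHash helpers (the class and field names are assumptions, not the PR's exact code):

```java
import java.util.Objects;

import org.apache.lucene.search.Query;

// Hypothetical query with two fields, showing equals/hashCode that respect both the
// concrete class and the per-instance state, as suggested above.
final class SplitShardQuery extends Query {
    private final int targetShardId;
    private final int numShards;

    SplitShardQuery(int targetShardId, int numShards) {
        this.targetShardId = targetShardId;
        this.numShards = numShards;
    }

    @Override
    public boolean equals(Object o) {
        if (sameClassAs(o) == false) {
            return false;
        }
        SplitShardQuery other = (SplitShardQuery) o;
        return targetShardId == other.targetShardId && numShards == other.numShards;
    }

    @Override
    public int hashCode() {
        return 31 * classHash() + Objects.hash(targetShardId, numShards);
    }

    @Override
    public String toString(String field) {
        return "SplitShardQuery(shard=" + targetShardId + "/" + numShards + ")";
    }
}
```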

Contributor Author

s1monw commented Oct 11, 2017

@jasontedor fair enough. Let's do a review round here and see if it's ok.

s1monw added a commit to s1monw/elasticsearch that referenced this pull request Oct 12, 2017
Today we only allow decoding byte arrays where the data has a 0 offset
and the same length as the array. Allowing decoding from a slice will
make decoding IDs cheaper if the ID is for instance coming from a term dictionary
or BytesRef.

Relates to elastic#26931
Contributor Author

s1monw commented Oct 12, 2017

@jpountz I pushed changes for your comments here 68d2fa6

Contributor

ywelsch left a comment

I've left a few comments, questions and suggestions.

return new CreateIndexClusterStateUpdateRequest(targetIndex,
cause, targetIndex.index(), targetIndexName, true)
// mappings are updated on the node when merging in the shards, this prevents race-conditions since all mapping must be
// applied once we took the snapshot and if somebody fucks things up and switches the index read/write and adds docs we miss
Contributor

language please

Contributor Author

Good catch, Yannick!

shrinkIndexRequest.timeout(request.paramAsTime("timeout", shrinkIndexRequest.timeout()));
shrinkIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", shrinkIndexRequest.masterNodeTimeout()));
shrinkIndexRequest.setWaitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards")));
return channel -> client.admin().indices().shrinkIndex(shrinkIndexRequest, new AcknowledgedRestListener<ShrinkResponse>(channel) {
return channel -> client.admin().indices().resizeIndex(shrinkIndexRequest, new AcknowledgedRestListener<ResizeResponse>(channel) {
Contributor

How does BWC work here for shrink?
If you send a shrink request to a v7.0 node in a mixed-version cluster where the master is v6.x, the master won't be able to understand the resize action?

Contributor Author

There is a TransportShrinkAction that is registered to deal with the inverse situation. I have to do the same (register a dummy resize there) for 6.x in order to allow this, but this is not part of this PR. I can also just stick with shrink, but that can be decided when we approach the backport.

Contributor Author

I added bwc code to the TransportResizeAction here; all should be sorted out now. /cc @bleskes

}
int numSourceShards = sourceIndexMetadata.getNumberOfShards();
if (numSourceShards > numTargetShards) {
throw new IllegalArgumentException("the number of source shards must be less that the number of target shards");
Contributor

can you add numSourceShards and numTargetShards to the message?
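
For example, a hedged sketch of the message the comment asks for (not necessarily the PR's final wording):

```java
// Illustration only: include both shard counts in the validation error.
static void validateShardCounts(int numSourceShards, int numTargetShards) {
    if (numSourceShards > numTargetShards) {
        throw new IllegalArgumentException("the number of source shards [" + numSourceShards
            + "] must be less than the number of target shards [" + numTargetShards + "]");
    }
}
```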

}

/**
* Selects the source shards fro a local shard recovery. This might either be a split or a shrink operation.
Contributor

for

} else if (sourceIndexMetadata.getNumberOfShards() < numTargetShards) {
return Collections.singleton(selectSplitShard(shardId, sourceIndexMetadata, numTargetShards));
}
throw new IllegalArgumentException("can't select recover from shards if both indices have the same number of shards");
Contributor

if we have shrink and split, why not zero-copy clone ;)

Contributor Author

valid request, different PR IMO

return Decision.ALWAYS;
}
ShardId shardId = IndexMetaData.selectSplitShard(shardRouting.id(), sourceIndexMetaData, indexMetaData.getNumberOfShards());
ShardRouting sourceShardRouting = allocation.routingTable().shardRoutingTable(shardId).primaryShard();
Contributor

use (the mutable) RoutingNodes instead here: allocation.routingNodes().activePrimary(shardRouting.shardId()).
The routingTable can be arbitrarily stale at this point and should practically never be used (I wanted to remove it from the RoutingAllocation interface, but there are a few usages that are difficult to replace by RoutingNodes).

}
ShardId shardId = IndexMetaData.selectSplitShard(shardRouting.id(), sourceIndexMetaData, indexMetaData.getNumberOfShards());
ShardRouting sourceShardRouting = allocation.routingTable().shardRoutingTable(shardId).primaryShard();
if (sourceShardRouting.active() == false) {
Contributor

Not needed with the above change; just check for null.
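
Putting the two comments above together, a hedged sketch of what the decider could do instead (identifiers are reused from the snippet above; this is not necessarily the exact final code):

```java
// Sketch of the suggestion: resolve the source primary via the mutable RoutingNodes
// rather than the (possibly stale) routing table, and treat null as "not active".
ShardId sourceShardId = IndexMetaData.selectSplitShard(shardRouting.id(), sourceIndexMetaData,
        indexMetaData.getNumberOfShards());
ShardRouting sourcePrimary = allocation.routingNodes().activePrimary(sourceShardId);
if (sourcePrimary == null) {
    return allocation.decision(Decision.NO, NAME, "source primary shard [%s] is not active", sourceShardId);
}
```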

public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
assert shardRouting.primary() : "must not call canForceAllocatePrimary on a non-primary shard " + shardRouting;
// check if we have passed the maximum retry threshold through canAllocate,
// if so, we don't want to force the primary allocation here
Contributor

this comment can be removed

final List<IndexShard> startedShards = new ArrayList<>();
final IndexService sourceIndexService = indicesService.indexService(mergeSourceIndex);
final int numShards = sourceIndexService != null ? sourceIndexService.getIndexSettings().getNumberOfShards() : -1;
final Set<ShardId> requiredShards = IndexMetaData.selectRecoverFromShards(shardId().id(),
sourceIndexService.getMetaData(), indexMetaData.getNumberOfShards());
Contributor

sourceIndexService could be null? (you're doing the null check in the next line)

if (sourceShardRouting.active() == false) {
return allocation.decision(Decision.NO, NAME, "source primary shard [%s] is not active", sourceShardRouting.shardId());
}
if (node != null) { // we might get called from the 2 param canAllocate method..
Contributor

maybe we should also check if the ES version on the node is capable of splitting a shard.

Contributor Author

Good point, I will add that.
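
For illustration, a hedged sketch of such a check (the exact version constant is an assumption; the feature targets 6.1, and the identifiers come from the snippet above):

```java
// Hypothetical sketch: refuse to allocate the split target to a node that is too old
// to perform a local split recovery. The version constant is an assumption.
if (node != null && node.node().getVersion().before(Version.V_6_1_0)) {
    return allocation.decision(Decision.NO, NAME,
            "node [%s] is too old to perform a local split recovery", node.nodeId());
}
```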

s1monw added a commit that referenced this pull request Oct 12, 2017
Today we only allow decoding byte arrays where the data has a 0 offset
and the same length as the array. Allowing decoding from a slice will
make decoding IDs cheaper if the ID is for instance coming from a term dictionary
or BytesRef.

Relates to #26931
bleskes pushed a commit that referenced this pull request Nov 7, 2017
This change adds a new `_split` API that allows splitting indices into a new
index with a power of two more shards than the source index. This API works
alongside the `_shrink` API but doesn't require any shard relocation before
indices can be split.

The split operation is conceptually an inverse `_shrink` operation since we
initialize the index with a _synthetic_ number of routing shards that are used
for the consistent hashing at index time. Compared to indices created with
earlier versions this might produce slightly different shard distributions but
has no impact on the per-index backwards compatibility. For now, the user is
required to prepare an index to be splittable by setting
`index.number_of_routing_shards` at index creation time. The setting allows the
user to prepare the index to be splittable in factors of
`index.number_of_routing_shards`, i.e. if the index is created with
`index.number_of_routing_shards: 16` and `index.number_of_shards: 2` it can be
split into `4, 8, 16` shards. This is an intermediate step until we can make
this the default. This also allows us to safely backport this change to 6.x.

The `_split` operation is implemented internally as a DeleteByQuery on the
Lucene level that is executed while the primary shards execute their initial
recovery. Subsequent merges that are triggered due to this operation will not be
executed immediately. All merges will be deferred until the shards are started
and will then be throttled accordingly.

This change is intended for the 6.1 feature release but will not support pre-6.1
indices to be split unless these indices have been shrunk before. In that case
these indices can be split back into their original number of shards.
bleskes added a commit that referenced this pull request Nov 7, 2017
jasontedor added a commit that referenced this pull request Nov 7, 2017
* master: (25 commits)
  Disable bwc tests in preparation of backporting #26931
  TemplateUpgradeService should only run on the master (#27294)
  Die with dignity while merging
  Fix profiling naming issues (#27133)
  Correctly encode warning headers
  Fixed references to Multi Index Syntax (#27283)
  Add an active Elasticsearch WordPress plugin link (#27279)
  Setting url parts as required to reflect the code base (#27263)
  keys in aggs percentiles need to be in quotes. (#26905)
  Align routing param type with search.json (#26958)
  Update to support bulk updates by query (#27172)
  Remove duplicated SnapshotStatus (#27276)
  add split index reference in indices.asciidoc
  Add ability to split shards (#26931)
  [Docs] Fix minor paragraph indentation error for multiple Indices params (#25535)
  Upgrade to Jackson 2.8.10 (#27230)
  Fix inconsistencies in the rest api specs for `tasks` (#27163)
  Adjust RestHighLevelClient method modifiers (#27238)
  Remove unused parameters in AnalysisRegistry (#27232)
  Add more information on `_failed_to_convert_` exception (#27034)
  ...
jasontedor added a commit to martijnvg/elasticsearch that referenced this pull request Nov 7, 2017
* ccr: (127 commits)
  Disable bwc tests in preparation of backporting elastic#26931
  TemplateUpgradeService should only run on the master (elastic#27294)
  Die with dignity while merging
  Fix profiling naming issues (elastic#27133)
  Correctly encode warning headers
  Fixed references to Multi Index Syntax (elastic#27283)
  Add an active Elasticsearch WordPress plugin link (elastic#27279)
  Setting url parts as required to reflect the code base (elastic#27263)
  keys in aggs percentiles need to be in quotes. (elastic#26905)
  Align routing param type with search.json (elastic#26958)
  Update to support bulk updates by query (elastic#27172)
  Remove duplicated SnapshotStatus (elastic#27276)
  add split index reference in indices.asciidoc
  Add ability to split shards (elastic#26931)
  [Docs] Fix minor paragraph indentation error for multiple Indices params (elastic#25535)
  Upgrade to Jackson 2.8.10 (elastic#27230)
  Fix inconsistencies in the rest api specs for `tasks` (elastic#27163)
  Adjust RestHighLevelClient method modifiers (elastic#27238)
  Remove unused parameters in AnalysisRegistry (elastic#27232)
  Add more information on `_failed_to_convert_` exception (elastic#27034)
  ...
jasontedor added a commit that referenced this pull request Nov 7, 2017
* 6.x:
  Update shrink's bwc version to 6.1.0
  add split index reference in indices.asciidoc
  Add ability to split shards (#26931)
  TemplateUpgradeService should only run on the master (#27294)
  Die with dignity while merging
  Fix profiling naming issues (#27133)
  Correctly encode warning headers
  Fixed references to Multi Index Syntax (#27283)
  Add an active Elasticsearch WordPress plugin link (#27279)
  Setting url parts as required to reflect the code base (#27263)
  keys in aggs percentiles need to be in quotes. (#26905)
  Align routing param type with search.json (#26958)
  Update to support bulk updates by query (#27172)
  Remove duplicated SnapshotStatus (#27276)
Contributor

bleskes commented Nov 7, 2017

this is now backported to 6.x as well.

jasontedor added a commit to glefloch/elasticsearch that referenced this pull request Nov 9, 2017
* master: (556 commits)
  Fix find remote when building BWC
  Remove colons from task and configuration names
  Add unreleased 5.6.5 version number
  testCreateSplitIndexToN: do not set `routing_partition_size` to >= `number_of_routing_shards`
  Snapshot/Restore: better handle incorrect chunk_size settings in FS repo (elastic#26844)
  Add limits for ngram and shingle settings (elastic#27211) (elastic#27318)
  Correct comment in index shard test
  Roll translog generation on primary promotion
  ObjectParser: Replace IllegalStateException with ParsingException (elastic#27302)
  scripted_metric _agg parameter disappears if params are provided (elastic#27159)
  Update discovery-ec2.asciidoc
  Update shrink's bwc version to 6.1.0 and enabled bwc tests
  Add limits for ngram and shingle settings (elastic#27211)
  Disable bwc tests in preparation of backporting elastic#26931
  TemplateUpgradeService should only run on the master (elastic#27294)
  Die with dignity while merging
  Fix profiling naming issues (elastic#27133)
  Correctly encode warning headers
  Fixed references to Multi Index Syntax (elastic#27283)
  Add an active Elasticsearch WordPress plugin link (elastic#27279)
  ...
ruflin added a commit to ruflin/beats that referenced this pull request Nov 17, 2017
With elastic/elasticsearch#26931 the possibility of splitting shards was introduced. To make use of this feature for indices created with ES >= 6.1, the config option `index.number_of_routing_shards` is required. This adds this config option, currently set to 30 as it's a multiple of 1, 3 and 5, our current default numbers of shards in Beats and ES. This allows users with default configs to scale by splitting their shards.

The `number_of_routing_shards` can also be overridden in the config file.
tsg pushed a commit to elastic/beats that referenced this pull request Nov 17, 2017
With elastic/elasticsearch#26931 the possibility of splitting shards was introduced. To make use of this feature for indices created with ES >= 6.1, the config option `index.number_of_routing_shards` is required. This adds this config option, currently set to 30 as it's a multiple of 1, 3 and 5, our current default numbers of shards in Beats and ES. This allows users with default configs to scale by splitting their shards.

The `number_of_routing_shards` can also be overridden in the config file.
s1monw added a commit to s1monw/elasticsearch that referenced this pull request Nov 21, 2017
While we have an assertion that checks that the number of routing shards is a multiple
of the number of shards, we need a real hard exception that checks this much earlier.
This change adds a check and test that is executed before we create the index.

Relates to elastic#26931
s1monw added a commit that referenced this pull request Nov 21, 2017
While we have an assertion that checks that the number of routing shards is a multiple
of the number of shards, we need a real hard exception that checks this much earlier.
This change adds a check and test that is executed before we create the index.

Relates to #26931
s1monw added a commit that referenced this pull request Nov 21, 2017
While we have an assertion that checks that the number of routing shards is a multiple
of the number of shards, we need a real hard exception that checks this much earlier.
This change adds a check and test that is executed before we create the index.

Relates to #26931
ywelsch added a commit that referenced this pull request Apr 3, 2018
DiskThresholdDecider currently assumes that the source index of a resize operation (e.g. shrink)
is available, and throws an IndexNotFoundException otherwise, thereby breaking any kind of shard
allocation. This can be quite harmful if the source index is deleted during a shrink, or if the source
index is unavailable during state recovery.

While this behavior has been partly fixed in 6.1 and above (due to #26931), it relies on the order in
which AllocationDeciders are executed (i.e. that ResizeAllocationDecider returns NO, ensuring that
DiskThresholdDecider does not run, something that for example does not hold for the allocation
explain API).

This change adds a more complete fix, and also solves the situation for 5.6.
ywelsch added a commit that referenced this pull request Apr 3, 2018
DiskThresholdDecider currently assumes that the source index of a resize operation (e.g. shrink)
is available, and throws an IndexNotFoundException otherwise, thereby breaking any kind of shard
allocation. This can be quite harmful if the source index is deleted during a shrink, or if the source
index is unavailable during state recovery.

While this behavior has been partly fixed in 6.1 and above (due to #26931), it relies on the order in
which AllocationDeciders are executed (i.e. that ResizeAllocationDecider returns NO, ensuring that
DiskThresholdDecider does not run, something that for example does not hold for the allocation
explain API).

This change adds a more complete fix, and also solves the situation for 5.6.
ywelsch added a commit that referenced this pull request Apr 3, 2018
DiskThresholdDecider currently assumes that the source index of a resize operation (e.g. shrink)
is available, and throws an IndexNotFoundException otherwise, thereby breaking any kind of shard
allocation. This can be quite harmful if the source index is deleted during a shrink, or if the source
index is unavailable during state recovery.

While this behavior has been partly fixed in 6.1 and above (due to #26931), it relies on the order in
which AllocationDeciders are executed (i.e. that ResizeAllocationDecider returns NO, ensuring that
DiskThresholdDecider does not run, something that for example does not hold for the allocation
explain API).

This change adds a more complete fix, and also solves the situation for 5.6.