Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to split shards #26931

Merged
merged 23 commits into from
Nov 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions core/src/main/java/org/elasticsearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.shrink.ResizeAction;
import org.elasticsearch.action.admin.indices.shrink.ShrinkAction;
import org.elasticsearch.action.admin.indices.shrink.TransportResizeAction;
import org.elasticsearch.action.admin.indices.shrink.TransportShrinkAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
Expand Down Expand Up @@ -181,7 +183,6 @@
import org.elasticsearch.action.search.TransportMultiSearchAction;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.DestructiveOperations;
Expand All @@ -199,7 +200,6 @@
import org.elasticsearch.common.NamedRegistry;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
Expand Down Expand Up @@ -271,6 +271,7 @@
import org.elasticsearch.rest.action.admin.indices.RestRefreshAction;
import org.elasticsearch.rest.action.admin.indices.RestRolloverIndexAction;
import org.elasticsearch.rest.action.admin.indices.RestShrinkIndexAction;
import org.elasticsearch.rest.action.admin.indices.RestSplitIndexAction;
import org.elasticsearch.rest.action.admin.indices.RestSyncedFlushAction;
import org.elasticsearch.rest.action.admin.indices.RestUpdateSettingsAction;
import org.elasticsearch.rest.action.admin.indices.RestUpgradeAction;
Expand Down Expand Up @@ -324,7 +325,6 @@
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableMap;

/**
Expand Down Expand Up @@ -438,6 +438,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
actions.register(ShrinkAction.INSTANCE, TransportShrinkAction.class);
actions.register(ResizeAction.INSTANCE, TransportResizeAction.class);
actions.register(RolloverAction.INSTANCE, TransportRolloverAction.class);
actions.register(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
actions.register(GetIndexAction.INSTANCE, TransportGetIndexAction.class);
Expand Down Expand Up @@ -554,6 +555,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestIndicesAliasesAction(settings, restController));
registerHandler.accept(new RestCreateIndexAction(settings, restController));
registerHandler.accept(new RestShrinkIndexAction(settings, restController));
registerHandler.accept(new RestSplitIndexAction(settings, restController));
registerHandler.accept(new RestRolloverIndexAction(settings, restController));
registerHandler.accept(new RestDeleteIndexAction(settings, restController));
registerHandler.accept(new RestCloseIndexAction(settings, restController));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.shrink.ResizeType;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
import org.elasticsearch.cluster.block.ClusterBlock;
Expand All @@ -43,7 +44,8 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ
private final String index;
private final String providedName;
private final boolean updateAllTypes;
private Index shrinkFrom;
private Index recoverFrom;
private ResizeType resizeType;

private IndexMetaData.State state = IndexMetaData.State.OPEN;

Expand All @@ -59,7 +61,6 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ

private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;


public CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index, String providedName,
boolean updateAllTypes) {
this.originalMessage = originalMessage;
Expand Down Expand Up @@ -99,8 +100,8 @@ public CreateIndexClusterStateUpdateRequest state(IndexMetaData.State state) {
return this;
}

public CreateIndexClusterStateUpdateRequest shrinkFrom(Index shrinkFrom) {
this.shrinkFrom = shrinkFrom;
public CreateIndexClusterStateUpdateRequest recoverFrom(Index recoverFrom) {
this.recoverFrom = recoverFrom;
return this;
}

Expand All @@ -109,6 +110,11 @@ public CreateIndexClusterStateUpdateRequest waitForActiveShards(ActiveShardCount
return this;
}

public CreateIndexClusterStateUpdateRequest resizeType(ResizeType resizeType) {
this.resizeType = resizeType;
return this;
}

public TransportMessage originalMessage() {
return originalMessage;
}
Expand Down Expand Up @@ -145,8 +151,8 @@ public Set<ClusterBlock> blocks() {
return blocks;
}

public Index shrinkFrom() {
return shrinkFrom;
public Index recoverFrom() {
return recoverFrom;
}

/** True if all fields that span multiple types should be updated, false otherwise */
Expand All @@ -165,4 +171,11 @@ public String getProvidedName() {
public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}

/**
* Returns the resize type or null if this is an ordinary create index request
*/
public ResizeType resizeType() {
return resizeType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.indices.shrink;

import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;

public class ResizeAction extends Action<ResizeRequest, ResizeResponse, ResizeRequestBuilder> {

public static final ResizeAction INSTANCE = new ResizeAction();
public static final String NAME = "indices:admin/resize";
public static final Version COMPATIBILITY_VERSION = Version.V_7_0_0_alpha1; // TODO remove this once it's backported

private ResizeAction() {
super(NAME);
}

@Override
public ResizeResponse newResponse() {
return new ResizeResponse();
}

@Override
public ResizeRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new ResizeRequestBuilder(client, this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
*/
package org.elasticsearch.action.admin.indices.shrink;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -37,37 +39,41 @@
/**
* Request class to shrink an index into a single shard
*/
public class ShrinkRequest extends AcknowledgedRequest<ShrinkRequest> implements IndicesRequest {
public class ResizeRequest extends AcknowledgedRequest<ResizeRequest> implements IndicesRequest {

public static final ObjectParser<ShrinkRequest, Void> PARSER = new ObjectParser<>("shrink_request", null);
public static final ObjectParser<ResizeRequest, Void> PARSER = new ObjectParser<>("resize_request", null);
static {
PARSER.declareField((parser, request, context) -> request.getShrinkIndexRequest().settings(parser.map()),
PARSER.declareField((parser, request, context) -> request.getTargetIndexRequest().settings(parser.map()),
new ParseField("settings"), ObjectParser.ValueType.OBJECT);
PARSER.declareField((parser, request, context) -> request.getShrinkIndexRequest().aliases(parser.map()),
PARSER.declareField((parser, request, context) -> request.getTargetIndexRequest().aliases(parser.map()),
new ParseField("aliases"), ObjectParser.ValueType.OBJECT);
}

private CreateIndexRequest shrinkIndexRequest;
private CreateIndexRequest targetIndexRequest;
private String sourceIndex;
private ResizeType type = ResizeType.SHRINK;

ShrinkRequest() {}
ResizeRequest() {}

public ShrinkRequest(String targetIndex, String sourceindex) {
this.shrinkIndexRequest = new CreateIndexRequest(targetIndex);
this.sourceIndex = sourceindex;
public ResizeRequest(String targetIndex, String sourceIndex) {
this.targetIndexRequest = new CreateIndexRequest(targetIndex);
this.sourceIndex = sourceIndex;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = shrinkIndexRequest == null ? null : shrinkIndexRequest.validate();
ActionRequestValidationException validationException = targetIndexRequest == null ? null : targetIndexRequest.validate();
if (sourceIndex == null) {
validationException = addValidationError("source index is missing", validationException);
}
if (shrinkIndexRequest == null) {
validationException = addValidationError("shrink index request is missing", validationException);
if (targetIndexRequest == null) {
validationException = addValidationError("target index request is missing", validationException);
}
if (shrinkIndexRequest.settings().getByPrefix("index.sort.").isEmpty() == false) {
validationException = addValidationError("can't override index sort when shrinking index", validationException);
if (targetIndexRequest.settings().getByPrefix("index.sort.").isEmpty() == false) {
validationException = addValidationError("can't override index sort when resizing an index", validationException);
}
if (type == ResizeType.SPLIT && IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexRequest.settings()) == false) {
validationException = addValidationError("index.number_of_shards is required for split operations", validationException);
}
return validationException;
}
Expand All @@ -79,16 +85,24 @@ public void setSourceIndex(String index) {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shrinkIndexRequest = new CreateIndexRequest();
shrinkIndexRequest.readFrom(in);
targetIndexRequest = new CreateIndexRequest();
targetIndexRequest.readFrom(in);
sourceIndex = in.readString();
if (in.getVersion().onOrAfter(ResizeAction.COMPATIBILITY_VERSION)) {
type = in.readEnum(ResizeType.class);
} else {
type = ResizeType.SHRINK; // BWC this used to be shrink only
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shrinkIndexRequest.writeTo(out);
targetIndexRequest.writeTo(out);
out.writeString(sourceIndex);
if (out.getVersion().onOrAfter(ResizeAction.COMPATIBILITY_VERSION)) {
out.writeEnum(type);
}
}

@Override
Expand All @@ -101,15 +115,15 @@ public IndicesOptions indicesOptions() {
return IndicesOptions.lenientExpandOpen();
}

public void setShrinkIndex(CreateIndexRequest shrinkIndexRequest) {
this.shrinkIndexRequest = Objects.requireNonNull(shrinkIndexRequest, "shrink index request must not be null");
public void setTargetIndex(CreateIndexRequest targetIndexRequest) {
this.targetIndexRequest = Objects.requireNonNull(targetIndexRequest, "target index request must not be null");
}

/**
* Returns the {@link CreateIndexRequest} for the shrink index
*/
public CreateIndexRequest getShrinkIndexRequest() {
return shrinkIndexRequest;
public CreateIndexRequest getTargetIndexRequest() {
return targetIndexRequest;
}

/**
Expand All @@ -128,13 +142,13 @@ public String getSourceIndex() {
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Index creation will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link ShrinkResponse#isShardsAcked()} to
* to be active before returning. Check {@link ResizeResponse#isShardsAcked()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
this.getShrinkIndexRequest().waitForActiveShards(waitForActiveShards);
this.getTargetIndexRequest().waitForActiveShards(waitForActiveShards);
}

/**
Expand All @@ -145,4 +159,18 @@ public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
public void setWaitForActiveShards(final int waitForActiveShards) {
setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}

/**
* The type of the resize operation
*/
public void setResizeType(ResizeType type) {
this.type = Objects.requireNonNull(type);
}

/**
* Returns the type of the resize operation
*/
public ResizeType getResizeType() {
return type;
}
}
Loading