Introduce Local checkpoints #15111

Closed
wants to merge 29 commits

Commits
b364cf5
Introduce Primary Terms
bleskes Oct 9, 2015
9541fb0
Merge branch 'master' into feature/seq_no
bleskes Nov 2, 2015
245a402
Merge branch 'master' into feature/seq_no
bleskes Nov 10, 2015
57e36db
Merge branch 'master' into feature/seq_no
bleskes Nov 13, 2015
a91a6f6
Merge branch 'master' into feature/seq_no
bleskes Nov 13, 2015
1e5af7b
Merge branch 'master' into feature/seq_no
bleskes Nov 15, 2015
d68b810
Merge branch 'master' into feature/seq_no
bleskes Nov 19, 2015
1e67ef8
Merge branch 'master' into feature/seq_no
bleskes Nov 19, 2015
5fb0f9a
Add Sequence Numbers and enforce Primary Terms
bleskes Nov 3, 2015
693641d
Merge branch 'master' into feature/seq_no
bleskes Nov 19, 2015
bc87855
trace logging
bleskes Nov 19, 2015
2e1e430
Disable RecoveryWhileUnderLoadIT for now
bleskes Nov 20, 2015
04be4e4
Merge branch 'master' into feature/seq_no
bleskes Nov 20, 2015
ee10c5f
Merge branch 'master' into feature/seq_no
bleskes Nov 20, 2015
ba80752
Merge branch 'master' into feature/seq_no
bleskes Nov 23, 2015
8e1ef30
re enable RecoveryWhileUnderLoadIT now that #14918 is merged.
bleskes Nov 23, 2015
e2e38b3
merge master
bleskes Nov 23, 2015
8de24c2
merge master
bleskes Nov 23, 2015
3d061a8
Introduce a relocated state during primary relocation
bleskes Nov 23, 2015
bffd55d
merge from master
bleskes Nov 24, 2015
95e8a39
merge master
bleskes Dec 7, 2015
68f1a87
Merge branch 'master' into feature/seq_no
bleskes Dec 10, 2015
b836781
merge for master
bleskes Dec 11, 2015
30ad721
Initial implementation
bleskes Nov 17, 2015
9aea462
Add tests plus other changes
bleskes Nov 29, 2015
1ef8a0d
Java Docs!
bleskes Nov 30, 2015
4edb7aa
tweak
bleskes Nov 30, 2015
0b50630
Another checkpoint implementation
bleskes Dec 10, 2015
7f49c1a
feedback
bleskes Dec 11, 2015
24 changes: 21 additions & 3 deletions core/src/main/java/org/elasticsearch/action/DocWriteResponse.java
@@ -21,9 +21,9 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;

@@ -38,11 +38,13 @@ public abstract class DocWriteResponse extends ReplicationResponse implements St
private String id;
private String type;
private long version;
private long seqNo;

public DocWriteResponse(ShardId shardId, String type, String id, long version) {
public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long version) {
this.shardId = shardId;
this.type = type;
this.id = id;
this.seqNo = seqNo;
this.version = version;
}

@@ -86,19 +88,27 @@ public long getVersion() {
return this.version;
}

/**
* Returns the sequence number assigned for this change. Returns {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} if the operation wasn't
* performed (i.e., an update operation that resulted in a NOOP).
*/
public long getSeqNo() {
return seqNo;
}

/** returns the rest status for this response (based on {@link ShardInfo#status()}) */
public RestStatus status() {
return getShardInfo().status();
}


@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
type = in.readString();
id = in.readString();
version = in.readZLong();
seqNo = in.readZLong();
}

@Override
@@ -108,13 +118,16 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(type);
out.writeString(id);
out.writeZLong(version);
out.writeZLong(seqNo);
}

static final class Fields {
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
static final XContentBuilderString _SHARD_ID = new XContentBuilderString("_shard_id");
static final XContentBuilderString _SEQ_NO = new XContentBuilderString("_seq_no");
}

@Override
@@ -125,6 +138,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field(Fields._ID, id)
.field(Fields._VERSION, version);
shardInfo.toXContent(builder, params);
//nocommit: i'm not sure we want to expose it in the api but it will be handy for debugging while we work...
builder.field(Fields._SHARD_ID, shardId.id());
if (getSeqNo() >= 0) {
builder.field(Fields._SEQ_NO, getSeqNo());
}
return builder;
}
}
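
The change above adds a seqNo field to every document write response and only renders _seq_no when the operation actually received one (the getSeqNo() >= 0 guard implies that SequenceNumbersService#UNASSIGNED_SEQ_NO is negative). A minimal sketch of how caller-side code might consume the new field — hypothetical client code, not part of this PR; the index/type/id values are made up:

    // Hypothetical consumer of the new seqNo field on a write response.
    // client is a 2.x-style Client; only getSeqNo()/getVersion() come from this PR.
    IndexResponse response = client.prepareIndex("orders", "order", "1")
            .setSource("{\"total\": 42}")
            .get();
    if (response.getSeqNo() >= 0) {
        // the primary assigned a sequence number to this change
        long seqNo = response.getSeqNo();
        long version = response.getVersion();
        // ... keep seqNo/version around for debugging or follow-up requests ...
    } else {
        // e.g. an update that resulted in a NOOP never gets a sequence number assigned
    }
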
@@ -19,7 +19,6 @@

package org.elasticsearch.action.admin.cluster.stats;

import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
@@ -30,6 +29,7 @@
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject;
@@ -105,7 +105,8 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
for (IndexShard indexShard : indexService) {
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
// only report on fully started shards
shardsStats.add(new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, SHARD_STATS_FLAGS), indexShard.commitStats()));
shardsStats.add(new ShardStats(indexShard.routingEntry(), indexShard.shardPath(),
new CommonStats(indexShard, SHARD_STATS_FLAGS), indexShard.commitStats(), indexShard.seqNoStats()));
}
}
}
@@ -227,7 +227,6 @@ public static enum Flag {
RequestCache("request_cache"),
Recovery("recovery");


private final String restName;

Flag(String restName) {
@@ -28,7 +28,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.ShardPath;

import java.io.IOException;
@@ -42,20 +42,23 @@ public class ShardStats implements Streamable, ToXContent {
private CommonStats commonStats;
@Nullable
private CommitStats commitStats;
@Nullable
private SeqNoStats seqNoStats;
private String dataPath;
private String statePath;
private boolean isCustomDataPath;

ShardStats() {
}

public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonStats, CommitStats commitStats) {
public ShardStats(ShardRouting routing, ShardPath shardPath, CommonStats commonStats, CommitStats commitStats, SeqNoStats seqNoStats) {
this.shardRouting = routing;
this.dataPath = shardPath.getRootDataPath().toString();
this.statePath = shardPath.getRootStatePath().toString();
this.isCustomDataPath = shardPath.isCustomDataPath();
this.commitStats = commitStats;
this.commonStats = commonStats;
this.seqNoStats = seqNoStats;
}

/**
@@ -73,6 +76,11 @@ public CommitStats getCommitStats() {
return this.commitStats;
}

@Nullable
public SeqNoStats getSeqNoStats() {
return this.seqNoStats;
}

public String getDataPath() {
return dataPath;
}
@@ -99,6 +107,7 @@ public void readFrom(StreamInput in) throws IOException {
statePath = in.readString();
dataPath = in.readString();
isCustomDataPath = in.readBoolean();
seqNoStats = in.readOptionalStreamableReader(SeqNoStats::new);
}

@Override
@@ -109,6 +118,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(statePath);
out.writeString(dataPath);
out.writeBoolean(isCustomDataPath);
out.writeOptionalWritable(seqNoStats);
}

@Override
@@ -124,6 +134,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (commitStats != null) {
commitStats.toXContent(builder, params);
}
if (seqNoStats != null) {
seqNoStats.toXContent(builder, params);
}
builder.startObject(Fields.SHARD_PATH);
builder.field(Fields.STATE_PATH, statePath);
builder.field(Fields.DATA_PATH, dataPath);
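
ShardStats now carries an optional SeqNoStats, written with out.writeOptionalWritable(seqNoStats) and read back with in.readOptionalStreamableReader(SeqNoStats::new). That pattern presumably boils down to a boolean presence flag followed by the object itself, so a shard without stats round-trips as null. A rough sketch of the idea with hypothetical helper names — not the actual StreamInput/StreamOutput implementation:

    // Illustrative only: the "optional writable" pattern assumed by the ShardStats change.
    // Assumes SeqNoStats exposes writeTo(StreamOutput) and a StreamInput-reading constructor;
    // whether SeqNoStats::new is that constructor or a no-arg factory is not shown in this diff.
    static void writeOptionalSeqNoStats(StreamOutput out, @Nullable SeqNoStats stats) throws IOException {
        if (stats == null) {
            out.writeBoolean(false);   // nothing follows
        } else {
            out.writeBoolean(true);    // object follows
            stats.writeTo(out);
        }
    }

    @Nullable
    static SeqNoStats readOptionalSeqNoStats(StreamInput in) throws IOException {
        if (in.readBoolean() == false) {
            return null;               // sender had no seq_no stats for this shard
        }
        return new SeqNoStats(in);
    }
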
@@ -162,6 +162,7 @@ protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting sh
flags.set(CommonStatsFlags.Flag.Recovery);
}

return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(), new CommonStats(indexShard, flags), indexShard.commitStats());
return new ShardStats(indexShard.routingEntry(), indexShard.shardPath(),
new CommonStats(indexShard, flags), indexShard.commitStats(), indexShard.seqNoStats());
}
}
@@ -122,6 +122,9 @@ protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(Met
IndexResponse indexResponse = result.response();
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse));
} catch (Throwable e) {
// nocommit: since we now have RetryOnPrimaryException, retrying doesn't always mean the shard is closed.
// some operations were already performed and have a seqno assigned. we shouldn't just reindex them
// if we have a pending mapping update
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
// restore updated versions...
@@ -157,6 +160,8 @@ protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(Met
location = locationToSync(location, writeResult.location);
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, deleteResponse));
} catch (Throwable e) {
// nocommit: since we now have RetryOnPrimaryException, retrying doesn't always mean the shard is closed.
// some operations were already performed and have a seqno assigned. we shouldn't just reindex them
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
// restore updated versions...
@@ -204,7 +209,8 @@ protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(Met
BytesReference indexSourceAsBytes = indexRequest.source();
// add the response
IndexResponse indexResponse = result.response();
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated());
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(),
indexResponse.getType(), indexResponse.getId(), indexResponse.getSeqNo(), indexResponse.getVersion(), indexResponse.isCreated());
if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
@@ -216,7 +222,8 @@ protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(Met
WriteResult<DeleteResponse> writeResult = updateResult.writeResult;
DeleteResponse response = writeResult.response();
DeleteRequest deleteRequest = updateResult.request();
updateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), false);
updateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(),
response.getId(), response.getSeqNo(), response.getVersion(), false);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
// Replace the update request to the translated delete request to execute on the replica.
item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
@@ -238,6 +245,8 @@ protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(Met
new BulkItemResponse.Failure(request.index(), updateRequest.type(), updateRequest.id(), t)));
}
} else {
// nocommit: since we now have RetryOnPrimaryException, retrying doesn't always mean the shard is closed.
// some operations were already performed and have a seqno assigned. we shouldn't just reindex them
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(t)) {
// restore updated versions...
@@ -308,7 +317,7 @@ private void setResponse(BulkItemRequest request, BulkItemResponse response) {
}
}

private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, MetaData metaData,
private WriteResult<IndexResponse> shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, MetaData metaData,
IndexShard indexShard, boolean processed) throws Throwable {

// validate, if routing is required, that we got routing
@@ -43,12 +43,11 @@ public DeleteResponse() {

}

public DeleteResponse(ShardId shardId, String type, String id, long version, boolean found) {
super(shardId, type, id, version);
public DeleteResponse(ShardId shardId, String type, String id, long seqNo, long version, boolean found) {
super(shardId, type, id, seqNo, version);
this.found = found;
}


/**
* Returns <tt>true</tt> if a doc was found to delete.
*/
@@ -137,15 +137,16 @@ public static WriteResult<DeleteResponse> executeDeleteRequestOnPrimary(DeleteRe
// update the request with the version so it will go to the replicas
request.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
request.version(delete.version());
request.seqNo(delete.seqNo());

assert request.versionType().validateVersionForWrites(request.version());
return new WriteResult<>(
new DeleteResponse(indexShard.shardId(), request.type(), request.id(), delete.version(), delete.found()),
new DeleteResponse(indexShard.shardId(), request.type(), request.id(), delete.seqNo(), delete.version(), delete.found()),
delete.getTranslogLocation());
}

public static Engine.Delete executeDeleteRequestOnReplica(DeleteRequest request, IndexShard indexShard) {
Engine.Delete delete = indexShard.prepareDeleteOnReplica(request.type(), request.id(), request.version(), request.versionType());
Engine.Delete delete = indexShard.prepareDeleteOnReplica(request.type(), request.id(), request.seqNo(), request.version(), request.versionType());
indexShard.delete(delete);
return delete;
}
@@ -40,11 +40,10 @@ public class IndexResponse extends DocWriteResponse {
private boolean created;

public IndexResponse() {

}

public IndexResponse(ShardId shardId, String type, String id, long version, boolean created) {
super(shardId, type, id, version);
public IndexResponse(ShardId shardId, String type, String id, long seqNo, long version, boolean created) {
super(shardId, type, id, seqNo, version);
this.created = created;
}

@@ -84,6 +83,7 @@ public String toString() {
builder.append(",id=").append(getId());
builder.append(",version=").append(getVersion());
builder.append(",created=").append(created);
builder.append(",seqNo=").append(getSeqNo());
builder.append(",shards=").append(getShardInfo());
return builder.append("]").toString();
}
@@ -179,7 +179,7 @@ public static Engine.Index executeIndexRequestOnReplica(IndexRequest request, In
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).index(shardId.getIndex()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());

final Engine.Index operation = indexShard.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType());
final Engine.Index operation = indexShard.prepareIndexOnReplica(sourceToParse, request.seqNo(), request.version(), request.versionType());
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
@@ -219,11 +219,11 @@ public static WriteResult<IndexResponse> executeIndexRequestOnPrimary(IndexReque
final long version = operation.version();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
request.seqNo(operation.seqNo());

assert request.versionType().validateVersionForWrites(request.version());

return new WriteResult<>(new IndexResponse(shardId, request.type(), request.id(), request.version(), created), operation.getTranslogLocation());
return new WriteResult<>(new IndexResponse(shardId, request.type(), request.id(), request.seqNo(), request.version(), created), operation.getTranslogLocation());
}

}
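
Taken together, the delete and index paths above follow the same pattern: the primary executes the operation, copies the engine-assigned sequence number back onto the request (request.seqNo(operation.seqNo())), and the replica replays the operation with that exact number via prepareIndexOnReplica/prepareDeleteOnReplica. A condensed sketch of the index flow; the primary-side prepare call and surrounding plumbing are not shown in these hunks and are assumed here for illustration:

    // Primary (condensed from executeIndexRequestOnPrimary): the engine assigns the seq_no,
    // which is then stamped onto the request that gets replicated.
    Engine.Index operation = indexShard.prepareIndexOnPrimary(
            sourceToParse, request.version(), request.versionType()); // assumed signature, not shown in this diff
    indexShard.index(operation);
    request.version(operation.version());
    request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
    request.seqNo(operation.seqNo());

    // Replica (condensed from executeIndexRequestOnReplica): the replica does not assign its own
    // seq_no; it replays the primary-assigned one carried on the request.
    Engine.Index replicaOperation = indexShard.prepareIndexOnReplica(
            sourceToParse, request.seqNo(), request.version(), request.versionType());
    indexShard.index(replicaOperation);
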
