Skip to content

Commit

Permalink
Add Seq# based optimistic concurrency control to UpdateRequest (#37872)
Browse files Browse the repository at this point in the history
The update request has a lesser known support for a one off update of a known document version. This PR adds an a seq# based alternative to power these operations.

Relates #36148 
Relates #10708
  • Loading branch information
bleskes committed Jan 29, 2019
1 parent 5d1964b commit 65a9b61
Show file tree
Hide file tree
Showing 20 changed files with 450 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.rest.action.document.RestDeleteAction;
import org.elasticsearch.rest.action.document.RestGetAction;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.rest.action.document.RestMultiGetAction;
import org.elasticsearch.rest.action.document.RestUpdateAction;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
Expand All @@ -90,8 +90,10 @@
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
Expand Down Expand Up @@ -606,22 +608,46 @@ public void testUpdate() throws IOException {
IndexResponse indexResponse = highLevelClient().index(indexRequest, RequestOptions.DEFAULT);
assertEquals(RestStatus.CREATED, indexResponse.status());

UpdateRequest updateRequest = new UpdateRequest("index", "id");
updateRequest.doc(singletonMap("field", "updated"), randomFrom(XContentType.values()));

UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.OK, updateResponse.status());
assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion());

UpdateRequest updateRequestConflict = new UpdateRequest("index", "id");
updateRequestConflict.doc(singletonMap("field", "with_version_conflict"), randomFrom(XContentType.values()));
updateRequestConflict.version(indexResponse.getVersion());
long lastUpdateSeqNo;
long lastUpdatePrimaryTerm;
{
UpdateRequest updateRequest = new UpdateRequest("index", "id");
updateRequest.doc(singletonMap("field", "updated"), randomFrom(XContentType.values()));
final UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.OK, updateResponse.status());
assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion());
lastUpdateSeqNo = updateResponse.getSeqNo();
lastUpdatePrimaryTerm = updateResponse.getPrimaryTerm();
assertThat(lastUpdateSeqNo, greaterThanOrEqualTo(0L));
assertThat(lastUpdatePrimaryTerm, greaterThanOrEqualTo(1L));
}

ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () ->
execute(updateRequestConflict, highLevelClient()::update, highLevelClient()::updateAsync));
assertEquals(RestStatus.CONFLICT, exception.status());
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][id]: version conflict, " +
"current version [2] is different than the one provided [1]]", exception.getMessage());
{
final UpdateRequest updateRequest = new UpdateRequest("index", "id");
updateRequest.doc(singletonMap("field", "with_seq_no_conflict"), randomFrom(XContentType.values()));
if (randomBoolean()) {
updateRequest.setIfSeqNo(lastUpdateSeqNo + 1);
updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm);
} else {
updateRequest.setIfSeqNo(lastUpdateSeqNo + (randomBoolean() ? 0 : 1));
updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm + 1);
}
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () ->
execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync));
assertEquals(exception.toString(),RestStatus.CONFLICT, exception.status());
assertThat(exception.getMessage(), containsString("Elasticsearch exception [type=version_conflict_engine_exception"));
}
{
final UpdateRequest updateRequest = new UpdateRequest("index", "id");
updateRequest.doc(singletonMap("field", "with_seq_no"), randomFrom(XContentType.values()));
updateRequest.setIfSeqNo(lastUpdateSeqNo);
updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm);
final UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
assertEquals(RestStatus.OK, updateResponse.status());
assertEquals(lastUpdateSeqNo + 1, updateResponse.getSeqNo());
assertEquals(lastUpdatePrimaryTerm, updateResponse.getPrimaryTerm());
}
}
{
IndexRequest indexRequest = new IndexRequest("index").id("with_script");
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/docs/delete.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ The result of the above delete operation is:
[[optimistic-concurrency-control-delete]]
=== Optimistic concurrency control

Delete operations can be made optional and only be performed if the last
Delete operations can be made conditional and only be performed if the last
modification to the document was assigned the sequence number and primary
term specified by the `if_seq_no` and `if_primary_term` parameters. If a
mismatch is detected, the operation will result in a `VersionConflictException`
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/docs/index_.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ The result of the above index operation is:
[[optimistic-concurrency-control-index]]
=== Optimistic concurrency control

Index operations can be made optional and only be performed if the last
Index operations can be made conditional and only be performed if the last
modification to the document was assigned the sequence number and primary
term specified by the `if_seq_no` and `if_primary_term` parameters. If a
mismatch is detected, the operation will result in a `VersionConflictException`
Expand Down
8 changes: 8 additions & 0 deletions docs/reference/docs/update.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -349,3 +349,11 @@ version numbers being out of sync with the external system. Use the
<<docs-index_,`index` API>> instead.
=====================================================

`if_seq_no` and `if_primary_term`::

Update operations can be made conditional and only be performed if the last
modification to the document was assigned the sequence number and primary
term specified by the `if_seq_no` and `if_primary_term` parameters. If a
mismatch is detected, the operation will result in a `VersionConflictException`
and a status code of 409. See <<optimistic-concurrency-control>> for more details.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@
"type": "time",
"description": "Explicit operation timeout"
},
"if_seq_no" : {
"type" : "number",
"description" : "only perform the update operation if the last operation that has changed the document has the specified sequence number"
},
"if_primary_term" : {
"type" : "number",
"description" : "only perform the update operation if the last operation that has changed the document has the specified primary term"
},
"version": {
"type": "number",
"description": "Explicit version number for concurrency control"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
---
"Update with if_seq_no":

- skip:
version: " - 6.99.99"
reason: if_seq_no was added in 7.0

- do:
catch: missing
update:
index: test_1
id: 1
if_seq_no: 1
if_primary_term: 1
body:
doc: { foo: baz }

- do:
index:
index: test_1
id: 1
body:
foo: baz

- do:
catch: conflict
update:
index: test_1
id: 1
if_seq_no: 234
if_primary_term: 1
body:
doc: { foo: baz }

- do:
update:
index: test_1
id: 1
if_seq_no: 0
if_primary_term: 1
body:
doc: { foo: bar }

- do:
get:
index: test_1
id: 1

- match: { _source: { foo: bar } }

- do:
bulk:
body:
- update:
_index: test_1
_id: 1
if_seq_no: 100
if_primary_term: 200
- doc:
foo: baz

- match: { errors: true }
- match: { items.0.update.status: 409 }

64 changes: 64 additions & 0 deletions server/src/main/java/org/elasticsearch/action/DocWriteRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.VersionType;

import java.io.IOException;
import java.util.Locale;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

/**
* Generic interface to group ActionRequest, which perform writes to a single document
* Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest}
Expand Down Expand Up @@ -117,6 +122,39 @@ public interface DocWriteRequest<T> extends IndicesRequest {
*/
T versionType(VersionType versionType);

/**
* only perform this request if the document was last modification was assigned the given
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
*
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
T setIfSeqNo(long seqNo);

/**
* only performs this request if the document was last modification was assigned the given
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
*
* If the document last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
T setIfPrimaryTerm(long term);

/**
* If set, only perform this request if the document was last modification was assigned this sequence number.
* If the document last modification was assigned a different sequence number a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
long ifSeqNo();

/**
* If set, only perform this request if the document was last modification was assigned this primary term.
*
* If the document last modification was assigned a different term a
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
*/
long ifPrimaryTerm();

/**
* Get the requested document operation type of the request
* @return the operation type {@link OpType}
Expand Down Expand Up @@ -216,4 +254,30 @@ static void writeDocumentRequest(StreamOutput out, DocWriteRequest<?> request)
throw new IllegalStateException("invalid request [" + request.getClass().getSimpleName() + " ]");
}
}

static ActionRequestValidationException validateSeqNoBasedCASParams(
DocWriteRequest request, ActionRequestValidationException validationException) {
if (request.versionType().validateVersionForWrites(request.version()) == false) {
validationException = addValidationError("illegal version value [" + request.version() + "] for version type ["
+ request.versionType().name() + "]", validationException);
}
if (request.versionType() == VersionType.FORCE) {
validationException = addValidationError("version type [force] may no longer be used", validationException);
}

if (request.ifSeqNo() != UNASSIGNED_SEQ_NO && (
request.versionType() != VersionType.INTERNAL || request.version() != Versions.MATCH_ANY
)) {
validationException = addValidationError("compare and write operations can not use versioning", validationException);
}
if (request.ifPrimaryTerm() == UNASSIGNED_PRIMARY_TERM && request.ifSeqNo() != UNASSIGNED_SEQ_NO) {
validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException);
}
if (request.ifPrimaryTerm() != UNASSIGNED_PRIMARY_TERM && request.ifSeqNo() == UNASSIGNED_SEQ_NO) {
validationException =
addValidationError("ifSeqNo is unassigned, but primary term is [" + request.ifPrimaryTerm() + "]", validationException);
}

return validationException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
} else if ("update".equals(action)) {
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).retryOnConflict(retryOnConflict)
.version(version).versionType(versionType)
.setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.routing(routing);
// EMPTY is safe here because we never call namedObject
try (InputStream dataStream = sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType).streamInput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,27 +110,8 @@ public ActionRequestValidationException validate() {
if (Strings.isEmpty(id)) {
validationException = addValidationError("id is missing", validationException);
}
if (versionType.validateVersionForWrites(version) == false) {
validationException = addValidationError("illegal version value [" + version + "] for version type ["
+ versionType.name() + "]", validationException);
}
if (versionType == VersionType.FORCE) {
validationException = addValidationError("version type [force] may no longer be used", validationException);
}

if (ifSeqNo != UNASSIGNED_SEQ_NO && (
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
)) {
validationException = addValidationError("compare and write operations can not use versioning", validationException);
}

if (ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM && ifSeqNo != UNASSIGNED_SEQ_NO) {
validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException);
}
if (ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM && ifSeqNo == UNASSIGNED_SEQ_NO) {
validationException =
addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException);
}
validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException);

return validationException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,7 @@ public ActionRequestValidationException validate() {
addValidationError("an id is required for a " + opType() + " operation", validationException);
}

if (!versionType.validateVersionForWrites(resolvedVersion)) {
validationException = addValidationError("illegal version value [" + resolvedVersion + "] for version type ["
+ versionType.name() + "]", validationException);
}

if (versionType == VersionType.FORCE) {
validationException = addValidationError("version type [force] may no longer be used", validationException);
}
validationException = DocWriteRequest.validateSeqNoBasedCASParams(this, validationException);

if (id != null && id.getBytes(StandardCharsets.UTF_8).length > 512) {
validationException = addValidationError("id is too long, must be no longer than 512 bytes but was: " +
Expand All @@ -210,18 +203,6 @@ public ActionRequestValidationException validate() {
validationException = addValidationError("pipeline cannot be an empty string", validationException);
}

if (ifSeqNo != UNASSIGNED_SEQ_NO && (
versionType != VersionType.INTERNAL || version != Versions.MATCH_ANY
)) {
validationException = addValidationError("compare and write operations can not use versioning", validationException);
}
if (ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM && ifSeqNo != UNASSIGNED_SEQ_NO) {
validationException = addValidationError("ifSeqNo is set, but primary term is [0]", validationException);
}
if (ifPrimaryTerm != UNASSIGNED_PRIMARY_TERM && ifSeqNo == UNASSIGNED_SEQ_NO) {
validationException =
addValidationError("ifSeqNo is unassigned, but primary term is [" + ifPrimaryTerm + "]", validationException);
}

return validationException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public UpdateHelper(ScriptService scriptService) {
* Prepares an update request by converting it into an index or delete request or an update response (no action).
*/
public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) {
final GetResult getResult = indexShard.getService().getForUpdate(request.type(), request.id(), request.version(),
request.versionType());
final GetResult getResult = indexShard.getService().getForUpdate(
request.type(), request.id(), request.version(), request.versionType(), request.ifSeqNo(), request.ifPrimaryTerm());
return prepare(indexShard.shardId(), request, getResult, nowInMillis);
}

Expand Down
Loading

0 comments on commit 65a9b61

Please sign in to comment.