Skip to content

Commit

Permalink
Do not mutate request on scripted upsert (#49578)
Browse files Browse the repository at this point in the history
Fixes a bug where a scripted upsert that causes a dynamic mapping update is retried (because
mapping update is still in-flight), and the request is mutated multiple times.

Closes #48670
  • Loading branch information
ywelsch committed Nov 27, 2019
1 parent bd00727 commit 0a73ba0
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult
* Execute a scripted upsert, where there is an existing upsert document and a script to be executed. The script is executed and a new
* Tuple of operation and updated {@code _source} is returned.
*/
Tuple<UpdateOpType, Map<String, Object>> executeScriptedUpsert(IndexRequest upsert, Script script, LongSupplier nowInMillis) {
Map<String, Object> upsertDoc = upsert.sourceAsMap();
Tuple<UpdateOpType, Map<String, Object>> executeScriptedUpsert(Map<String, Object> upsertDoc, Script script, LongSupplier nowInMillis) {
Map<String, Object> ctx = new HashMap<>(3);
// Tell the script that this is a create and not an update
ctx.put(ContextFields.OP, UpdateOpType.CREATE.toString());
Expand Down Expand Up @@ -132,11 +131,11 @@ Result prepareUpsert(ShardId shardId, UpdateRequest request, final GetResult get
if (request.scriptedUpsert() && request.script() != null) {
// Run the script to perform the create logic
IndexRequest upsert = request.upsertRequest();
Tuple<UpdateOpType, Map<String, Object>> upsertResult = executeScriptedUpsert(upsert, request.script, nowInMillis);
Tuple<UpdateOpType, Map<String, Object>> upsertResult = executeScriptedUpsert(upsert.sourceAsMap(), request.script,
nowInMillis);
switch (upsertResult.v1()) {
case CREATE:
// Update the index request with the new "_source"
indexRequest.source(upsertResult.v2());
indexRequest = Requests.indexRequest(request.index()).source(upsertResult.v2());
break;
case NONE:
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,43 @@ public void testBulkUpdateSimple() throws Exception {
assertThat(((Number) getResponse.getSource().get("field")).longValue(), equalTo(4L));
}

public void testBulkUpdateWithScriptedUpsertAndDynamicMappingUpdate() throws Exception {
assertAcked(prepareCreate("test").addAlias(new Alias("alias")));
ensureGreen();

final Script script = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "ctx._source.field += 1", Collections.emptyMap());

BulkResponse bulkResponse = client().prepareBulk()
.add(client().prepareUpdate().setIndex(indexOrAlias()).setId("1")
.setScript(script).setScriptedUpsert(true).setUpsert("field", 1))
.add(client().prepareUpdate().setIndex(indexOrAlias()).setId("2")
.setScript(script).setScriptedUpsert(true).setUpsert("field", 1))
.get();

logger.info(bulkResponse.buildFailureMessage());

assertThat(bulkResponse.hasFailures(), equalTo(false));
assertThat(bulkResponse.getItems().length, equalTo(2));
for (BulkItemResponse bulkItemResponse : bulkResponse) {
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
}
assertThat(bulkResponse.getItems()[0].getResponse().getId(), equalTo("1"));
assertThat(bulkResponse.getItems()[0].getResponse().getVersion(), equalTo(1L));
assertThat(bulkResponse.getItems()[1].getResponse().getId(), equalTo("2"));
assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(1L));

GetResponse getResponse = client().prepareGet().setIndex("test").setId("1").execute()
.actionGet();
assertThat(getResponse.isExists(), equalTo(true));
assertThat(getResponse.getVersion(), equalTo(1L));
assertThat(((Number) getResponse.getSource().get("field")).longValue(), equalTo(2L));

getResponse = client().prepareGet().setIndex("test").setId("2").execute().actionGet();
assertThat(getResponse.isExists(), equalTo(true));
assertThat(getResponse.getVersion(), equalTo(1L));
assertThat(((Number) getResponse.getSource().get("field")).longValue(), equalTo(2L));
}

public void testBulkWithCAS() throws Exception {
createIndex("test", Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).build());
ensureGreen();
Expand Down

0 comments on commit 0a73ba0

Please sign in to comment.