From 0a73ba05de30b67b66e3bea84ac6e59d2730de1d Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 27 Nov 2019 09:23:42 +0100 Subject: [PATCH] Do not mutate request on scripted upsert (#49578) Fixes a bug where a scripted upsert that causes a dynamic mapping update is retried (because mapping update is still in-flight), and the request is mutated multiple times. Closes #48670 --- .../action/update/UpdateHelper.java | 9 ++--- .../action/bulk/BulkWithUpdatesIT.java | 37 +++++++++++++++++++ 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 7e0ff56f80442..dde760fefa157 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -98,8 +98,7 @@ protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult * Execute a scripted upsert, where there is an existing upsert document and a script to be executed. The script is executed and a new * Tuple of operation and updated {@code _source} is returned. */ - Tuple> executeScriptedUpsert(IndexRequest upsert, Script script, LongSupplier nowInMillis) { - Map upsertDoc = upsert.sourceAsMap(); + Tuple> executeScriptedUpsert(Map upsertDoc, Script script, LongSupplier nowInMillis) { Map ctx = new HashMap<>(3); // Tell the script that this is a create and not an update ctx.put(ContextFields.OP, UpdateOpType.CREATE.toString()); @@ -132,11 +131,11 @@ Result prepareUpsert(ShardId shardId, UpdateRequest request, final GetResult get if (request.scriptedUpsert() && request.script() != null) { // Run the script to perform the create logic IndexRequest upsert = request.upsertRequest(); - Tuple> upsertResult = executeScriptedUpsert(upsert, request.script, nowInMillis); + Tuple> upsertResult = executeScriptedUpsert(upsert.sourceAsMap(), request.script, + nowInMillis); switch (upsertResult.v1()) { case CREATE: - // Update the index request with the new "_source" - indexRequest.source(upsertResult.v2()); + indexRequest = Requests.indexRequest(request.index()).source(upsertResult.v2()); break; case NONE: UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java index 4e71860b333c0..f9f59fd9658cf 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkWithUpdatesIT.java @@ -199,6 +199,43 @@ public void testBulkUpdateSimple() throws Exception { assertThat(((Number) getResponse.getSource().get("field")).longValue(), equalTo(4L)); } + public void testBulkUpdateWithScriptedUpsertAndDynamicMappingUpdate() throws Exception { + assertAcked(prepareCreate("test").addAlias(new Alias("alias"))); + ensureGreen(); + + final Script script = new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "ctx._source.field += 1", Collections.emptyMap()); + + BulkResponse bulkResponse = client().prepareBulk() + .add(client().prepareUpdate().setIndex(indexOrAlias()).setId("1") + .setScript(script).setScriptedUpsert(true).setUpsert("field", 1)) + .add(client().prepareUpdate().setIndex(indexOrAlias()).setId("2") + .setScript(script).setScriptedUpsert(true).setUpsert("field", 1)) + .get(); + + logger.info(bulkResponse.buildFailureMessage()); + + assertThat(bulkResponse.hasFailures(), equalTo(false)); + assertThat(bulkResponse.getItems().length, equalTo(2)); + for (BulkItemResponse bulkItemResponse : bulkResponse) { + assertThat(bulkItemResponse.getIndex(), equalTo("test")); + } + assertThat(bulkResponse.getItems()[0].getResponse().getId(), equalTo("1")); + assertThat(bulkResponse.getItems()[0].getResponse().getVersion(), equalTo(1L)); + assertThat(bulkResponse.getItems()[1].getResponse().getId(), equalTo("2")); + assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(1L)); + + GetResponse getResponse = client().prepareGet().setIndex("test").setId("1").execute() + .actionGet(); + assertThat(getResponse.isExists(), equalTo(true)); + assertThat(getResponse.getVersion(), equalTo(1L)); + assertThat(((Number) getResponse.getSource().get("field")).longValue(), equalTo(2L)); + + getResponse = client().prepareGet().setIndex("test").setId("2").execute().actionGet(); + assertThat(getResponse.isExists(), equalTo(true)); + assertThat(getResponse.getVersion(), equalTo(1L)); + assertThat(((Number) getResponse.getSource().get("field")).longValue(), equalTo(2L)); + } + public void testBulkWithCAS() throws Exception { createIndex("test", Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).build()); ensureGreen();