From 148206d857d834e56b1e27862e2d2bce51a805c0 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 1 Nov 2019 17:36:57 +0000 Subject: [PATCH] Allow tests to override mapping validation --- .../action/bulk/MappingUpdatePerformer.java | 6 ++++ .../action/bulk/TransportShardBulkAction.java | 23 ++++++++---- .../bulk/TransportShardBulkActionTests.java | 36 +++++++++++++++---- .../ESIndexLevelReplicationTestCase.java | 11 +++++- 4 files changed, 62 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java b/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java index 5a38f0f43e070..65cf2465f5388 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java @@ -23,6 +23,8 @@ import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.shard.ShardId; +import java.io.IOException; + public interface MappingUpdatePerformer { /** @@ -30,4 +32,8 @@ public interface MappingUpdatePerformer { */ void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener); + /** + * Validate the mapping update locally. Throws an unchecked exception of some kind if the mapping update is invalid. + */ + void validateMappings(Mapping update, String type) throws IOException; } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 264595dd3a06f..c58af1479d640 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -62,6 +62,7 @@ import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -121,10 +122,20 @@ protected void shardOperationOnPrimary(BulkShardRequest request, IndexShard prim ActionListener> listener) { ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext()); performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, - (update, shardId, type, mappingListener) -> { - assert update != null; - assert shardId != null; - mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update, mappingListener); + new MappingUpdatePerformer() { + @Override + public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener mappingListener) { + assert update != null; + assert shardId != null; + mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update, mappingListener); + } + + @Override + public void validateMappings(Mapping update, String type) throws IOException { + primary.mapperService().merge(MapperService.SINGLE_MAPPING_NAME, + new CompressedXContent(update, XContentType.JSON, ToXContent.EMPTY_PARAMS), + MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT); + } }, mappingUpdateListener -> observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override @@ -267,9 +278,7 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { try { - primary.mapperService().merge("_doc", - new CompressedXContent(result.getRequiredMappingUpdate(), XContentType.JSON, ToXContent.EMPTY_PARAMS), - MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT); + mappingUpdater.validateMappings(result.getRequiredMappingUpdate(), MapperService.SINGLE_MAPPING_NAME); } catch (Exception e) { logger.info("required mapping update failed during pre-flight check", e); onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 5d0affeb02b3c..69988430b0d73 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -248,11 +248,18 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); AtomicInteger updateCalled = new AtomicInteger(); TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, - (update, shardId, type, listener) -> { - // There should indeed be a mapping update - assertNotNull(update); - updateCalled.incrementAndGet(); - listener.onResponse(null); + new MappingUpdatePerformer() { + @Override + public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { + // There should indeed be a mapping update + assertNotNull(update); + updateCalled.incrementAndGet(); + listener.onResponse(null); + } + + @Override + public void validateMappings(Mapping update, String type) { + } }, listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER); assertTrue(context.isInitial()); assertTrue(context.hasMoreOperationsToExecute()); @@ -266,7 +273,16 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { .thenReturn(success); TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, - (update, shardId, type, listener) -> fail("should not have had to update the mappings"), listener -> {}, + new MappingUpdatePerformer() { + @Override + public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { + fail("should not have had to update the mappings"); + } + + @Override + public void validateMappings(Mapping update, String type) { + } + }, listener -> {}, ASSERTING_DONE_LISTENER); @@ -856,6 +872,10 @@ public static class NoopMappingUpdatePerformer implements MappingUpdatePerformer public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { listener.onResponse(null); } + + @Override + public void validateMappings(Mapping update, String type) { + } } /** Always throw the given exception */ @@ -870,5 +890,9 @@ private class ThrowingMappingUpdatePerformer implements MappingUpdatePerformer { public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { listener.onFailure(e); } + + @Override + public void validateMappings(Mapping update, String type) { + } } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 7a757f5d2ab66..b3391a1a8055e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -69,6 +69,7 @@ import org.elasticsearch.index.engine.DocIdSeqNoAndSource; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; @@ -776,7 +777,15 @@ private void executeShardBulkOnPrimary(IndexShard primary, BulkShardRequest requ final PlainActionFuture permitAcquiredFuture = new PlainActionFuture<>(); primary.acquirePrimaryOperationPermit(permitAcquiredFuture, ThreadPool.Names.SAME, request); try (Releasable ignored = permitAcquiredFuture.actionGet()) { - MappingUpdatePerformer noopMappingUpdater = (update, shardId, type, listener1) -> {}; + MappingUpdatePerformer noopMappingUpdater = new MappingUpdatePerformer() { + @Override + public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener1) { + } + + @Override + public void validateMappings(Mapping update, String type) { + } + }; TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater, null, ActionTestUtils.assertNoFailureListener(result -> { TransportWriteActionTestHelper.performPostWriteActions(primary, request,