Skip to content

Commit

Permalink
Allow tests to override mapping validation
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Nov 1, 2019
1 parent 1e3983c commit 148206d
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;

public interface MappingUpdatePerformer {

/**
* Update the mappings on the master.
*/
void updateMappings(Mapping update, ShardId shardId, String type, ActionListener<Void> listener);

/**
* Validate the mapping update locally. Throws an unchecked exception of some kind if the mapping update is invalid.
*/
void validateMappings(Mapping update, String type) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
Expand Down Expand Up @@ -121,10 +122,20 @@ protected void shardOperationOnPrimary(BulkShardRequest request, IndexShard prim
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis,
(update, shardId, type, mappingListener) -> {
assert update != null;
assert shardId != null;
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update, mappingListener);
new MappingUpdatePerformer() {
@Override
public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener<Void> mappingListener) {
assert update != null;
assert shardId != null;
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update, mappingListener);
}

@Override
public void validateMappings(Mapping update, String type) throws IOException {
primary.mapperService().merge(MapperService.SINGLE_MAPPING_NAME,
new CompressedXContent(update, XContentType.JSON, ToXContent.EMPTY_PARAMS),
MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT);
}
},
mappingUpdateListener -> observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
Expand Down Expand Up @@ -267,9 +278,7 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {

try {
primary.mapperService().merge("_doc",
new CompressedXContent(result.getRequiredMappingUpdate(), XContentType.JSON, ToXContent.EMPTY_PARAMS),
MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT);
mappingUpdater.validateMappings(result.getRequiredMappingUpdate(), MapperService.SINGLE_MAPPING_NAME);
} catch (Exception e) {
logger.info("required mapping update failed during pre-flight check", e);
onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,18 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
AtomicInteger updateCalled = new AtomicInteger();
TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis,
(update, shardId, type, listener) -> {
// There should indeed be a mapping update
assertNotNull(update);
updateCalled.incrementAndGet();
listener.onResponse(null);
new MappingUpdatePerformer() {
@Override
public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener<Void> listener) {
// There should indeed be a mapping update
assertNotNull(update);
updateCalled.incrementAndGet();
listener.onResponse(null);
}

@Override
public void validateMappings(Mapping update, String type) {
}
}, listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER);
assertTrue(context.isInitial());
assertTrue(context.hasMoreOperationsToExecute());
Expand All @@ -266,7 +273,16 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
.thenReturn(success);

TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis,
(update, shardId, type, listener) -> fail("should not have had to update the mappings"), listener -> {},
new MappingUpdatePerformer() {
@Override
public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener<Void> listener) {
fail("should not have had to update the mappings");
}

@Override
public void validateMappings(Mapping update, String type) {
}
}, listener -> {},
ASSERTING_DONE_LISTENER);


Expand Down Expand Up @@ -856,6 +872,10 @@ public static class NoopMappingUpdatePerformer implements MappingUpdatePerformer
public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener<Void> listener) {
listener.onResponse(null);
}

@Override
public void validateMappings(Mapping update, String type) {
}
}

/** Always throw the given exception */
Expand All @@ -870,5 +890,9 @@ private class ThrowingMappingUpdatePerformer implements MappingUpdatePerformer {
public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener<Void> listener) {
listener.onFailure(e);
}

@Override
public void validateMappings(Mapping update, String type) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.elasticsearch.index.engine.DocIdSeqNoAndSource;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
Expand Down Expand Up @@ -776,7 +777,15 @@ private void executeShardBulkOnPrimary(IndexShard primary, BulkShardRequest requ
final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();
primary.acquirePrimaryOperationPermit(permitAcquiredFuture, ThreadPool.Names.SAME, request);
try (Releasable ignored = permitAcquiredFuture.actionGet()) {
MappingUpdatePerformer noopMappingUpdater = (update, shardId, type, listener1) -> {};
MappingUpdatePerformer noopMappingUpdater = new MappingUpdatePerformer() {
@Override
public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener<Void> listener1) {
}

@Override
public void validateMappings(Mapping update, String type) {
}
};
TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater,
null, ActionTestUtils.assertNoFailureListener(result -> {
TransportWriteActionTestHelper.performPostWriteActions(primary, request,
Expand Down

0 comments on commit 148206d

Please sign in to comment.