Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add preflight check to dynamic mapping updates #48817

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;

public interface MappingUpdatePerformer {

/**
* Update the mappings on the master.
*/
void updateMappings(Mapping update, ShardId shardId, String type, ActionListener<Void> listener);

/**
* Validate the mapping update locally. Throws an unchecked exception of some kind if the mapping update is invalid.
*/
void validateMappings(Mapping update, String type) throws IOException;
original-brownbear marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,20 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
Expand Down Expand Up @@ -119,10 +122,20 @@ protected void shardOperationOnPrimary(BulkShardRequest request, IndexShard prim
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis,
(update, shardId, type, mappingListener) -> {
assert update != null;
assert shardId != null;
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update, mappingListener);
new MappingUpdatePerformer() {
@Override
public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener<Void> mappingListener) {
assert update != null;
assert shardId != null;
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update, mappingListener);
}

@Override
public void validateMappings(Mapping update, String type) throws IOException {
primary.mapperService().merge(MapperService.SINGLE_MAPPING_NAME,
new CompressedXContent(update, XContentType.JSON, ToXContent.EMPTY_PARAMS),
MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT);
}
},
mappingUpdateListener -> observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
Expand Down Expand Up @@ -263,6 +276,15 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat
request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
}
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {

try {
mappingUpdater.validateMappings(result.getRequiredMappingUpdate(), MapperService.SINGLE_MAPPING_NAME);
} catch (Exception e) {
logger.info("required mapping update failed during pre-flight check", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you log the index name here as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, done in 891e56a.

onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
return true;
}

mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(),
MapperService.SINGLE_MAPPING_NAME,
new ActionListener<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
* The reason why a mapping is being merged.
*/
public enum MergeReason {
/**
* Pre-flight check before sending a mapping update to the master
*/
MAPPING_UPDATE_PREFLIGHT,
/**
* Create or update a mapping.
*/
Expand Down Expand Up @@ -306,6 +310,7 @@ public DocumentMapper merge(String type, CompressedXContent mappingSource, Merge

private synchronized Map<String, DocumentMapper> internalMerge(IndexMetaData indexMetaData,
MergeReason reason, boolean onlyUpdateIfNeeded) {
assert reason != MergeReason.MAPPING_UPDATE_PREFLIGHT;
Map<String, CompressedXContent> map = new LinkedHashMap<>();
MappingMetaData mappingMetaData = indexMetaData.mapping();
if (mappingMetaData != null) {
Expand Down Expand Up @@ -415,7 +420,7 @@ private synchronized Map<String, DocumentMapper> internalMerge(DocumentMapper ma

ContextMapping.validateContextPaths(indexSettings.getIndexVersionCreated(), fieldMappers, fieldTypes::get);

if (reason == MergeReason.MAPPING_UPDATE) {
if (reason == MergeReason.MAPPING_UPDATE || reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
// this check will only be performed on the master node when there is
// a call to the update mapping API. For all other cases like
// the master node restoring mappings from disk or data nodes
Expand All @@ -430,7 +435,7 @@ private synchronized Map<String, DocumentMapper> internalMerge(DocumentMapper ma
results.put(newMapper.type(), newMapper);
}

if (reason == MergeReason.MAPPING_UPDATE) {
if (reason == MergeReason.MAPPING_UPDATE || reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
// this check will only be performed on the master node when there is
// a call to the update mapping API. For all other cases like
// the master node restoring mappings from disk or data nodes
Expand All @@ -453,6 +458,10 @@ private synchronized Map<String, DocumentMapper> internalMerge(DocumentMapper ma
// make structures immutable
results = Collections.unmodifiableMap(results);

if (reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
return results;
}

// only need to immutably rewrap these if the previous reference was changed.
// if not then they are already implicitly immutable.
if (fullPathObjectMappers != this.fullPathObjectMappers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,18 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
AtomicInteger updateCalled = new AtomicInteger();
TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis,
(update, shardId, type, listener) -> {
// There should indeed be a mapping update
assertNotNull(update);
updateCalled.incrementAndGet();
listener.onResponse(null);
new MappingUpdatePerformer() {
@Override
public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener<Void> listener) {
// There should indeed be a mapping update
assertNotNull(update);
updateCalled.incrementAndGet();
listener.onResponse(null);
}

@Override
public void validateMappings(Mapping update, String type) {
}
}, listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER);
assertTrue(context.isInitial());
assertTrue(context.hasMoreOperationsToExecute());
Expand All @@ -266,7 +273,16 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
.thenReturn(success);

TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis,
(update, shardId, type, listener) -> fail("should not have had to update the mappings"), listener -> {},
new MappingUpdatePerformer() {
@Override
public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener<Void> listener) {
fail("should not have had to update the mappings");
}

@Override
public void validateMappings(Mapping update, String type) {
}
}, listener -> {},
ASSERTING_DONE_LISTENER);


Expand Down Expand Up @@ -856,6 +872,10 @@ public static class NoopMappingUpdatePerformer implements MappingUpdatePerformer
public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener<Void> listener) {
listener.onResponse(null);
}

@Override
public void validateMappings(Mapping update, String type) {
}
}

/** Always throw the given exception */
Expand All @@ -870,5 +890,9 @@ private class ThrowingMappingUpdatePerformer implements MappingUpdatePerformer {
public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener<Void> listener) {
listener.onFailure(e);
}

@Override
public void validateMappings(Mapping update, String type) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
Expand All @@ -34,6 +40,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING;

public class DynamicMappingIT extends ESIntegTestCase {

@Override
Expand Down Expand Up @@ -116,4 +124,38 @@ public void run() {
assertTrue(client().prepareGet("index", Integer.toString(i)).get().isExists());
}
}

public void testPreflightCheckAvoidsMaster() throws InterruptedException {
createIndex("index", Settings.builder().put(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), 2).build());
ensureGreen("index");
client().prepareIndex("index").setId("1").setSource("field1", "value1").get();

final CountDownLatch masterBlockedLatch = new CountDownLatch(1);
final CountDownLatch indexingCompletedLatch = new CountDownLatch(1);

internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName()).submitStateUpdateTask("block-state-updates",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
masterBlockedLatch.countDown();
indexingCompletedLatch.await();
return currentState;
}

@Override
public void onFailure(String source, Exception e) {
throw new AssertionError("unexpected", e);
}
});

masterBlockedLatch.await();
final IndexRequestBuilder indexRequestBuilder = client().prepareIndex("index").setId("2").setSource("field2", "value2");
try {
assertThat(
expectThrows(IllegalArgumentException.class, () -> indexRequestBuilder.get(TimeValue.timeValueSeconds(10))).getMessage(),
Matchers.containsString("Limit of total fields [2] in index [index] has been exceeded"));
} finally {
indexingCompletedLatch.countDown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class MapperServiceTests extends ESSingleNodeTestCase {

Expand Down Expand Up @@ -99,6 +101,15 @@ public void testTypeValidation() {
MapperService.validateTypeName("_doc"); // no exception
}

public void testPreflightUpdateDoesNotChangeMapping() throws Throwable {
final MapperService mapperService = createIndex("test1").mapperService();
final CompressedXContent mapping = createMappingSpecifyingNumberOfFields(1);
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE_PREFLIGHT);
assertThat("field was not created by preflight check", mapperService.fullName("field0"), nullValue());
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE);
assertThat("field was not created by mapping update", mapperService.fullName("field0"), notNullValue());
}

/**
* Test that we can have at least the number of fields in new mappings that are defined by "index.mapping.total_fields.limit".
* Any additional field should trigger an IllegalArgumentException.
Expand All @@ -113,7 +124,7 @@ public void testTotalFieldsLimit() throws Throwable {
// adding one more field should trigger exception
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
createIndex("test2", settings).mapperService().merge("type",
createMappingSpecifyingNumberOfFields(totalFieldsLimit + 1), MergeReason.MAPPING_UPDATE);
createMappingSpecifyingNumberOfFields(totalFieldsLimit + 1), updateOrPreflight());
});
assertTrue(e.getMessage(),
e.getMessage().contains("Limit of total fields [" + totalFieldsLimit + "] in index [test2] has been exceeded"));
Expand Down Expand Up @@ -149,7 +160,7 @@ public void testMappingDepthExceedsLimit() throws Throwable {
indexService2.mapperService().merge("type", objectMapping, MergeReason.MAPPING_UPDATE);

IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> indexService1.mapperService().merge("type", objectMapping, MergeReason.MAPPING_UPDATE));
() -> indexService1.mapperService().merge("type", objectMapping, updateOrPreflight()));
assertThat(e.getMessage(), containsString("Limit of mapping depth [1] in index [test1] has been exceeded"));
}

Expand Down Expand Up @@ -197,7 +208,7 @@ public void testIndexSortWithNestedFields() throws IOException {
.endObject().endObject()));
invalidNestedException = expectThrows(IllegalArgumentException.class,
() -> indexService.mapperService().merge("t", nestedFieldMapping,
MergeReason.MAPPING_UPDATE));
updateOrPreflight()));
assertThat(invalidNestedException.getMessage(),
containsString("cannot have nested fields when index sort is activated"));
}
Expand Down Expand Up @@ -233,7 +244,7 @@ public void testFieldAliasWithMismatchedNestedScope() throws Throwable {
.endObject()));

IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> mapperService.merge("type", mappingUpdate, MergeReason.MAPPING_UPDATE));
() -> mapperService.merge("type", mappingUpdate, updateOrPreflight()));
assertThat(e.getMessage(), containsString("Invalid [path] value [nested.field] for field alias [alias]"));
}

Expand Down Expand Up @@ -261,7 +272,7 @@ public void testTotalFieldsLimitWithFieldAlias() throws Throwable {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
createIndex("test2",
Settings.builder().put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), numberOfNonAliasFields).build())
.mapperService().merge("type", new CompressedXContent(mapping), MergeReason.MAPPING_UPDATE);
.mapperService().merge("type", new CompressedXContent(mapping), updateOrPreflight());
});
assertEquals("Limit of total fields [" + numberOfNonAliasFields + "] in index [test2] has been exceeded", e.getMessage());
}
Expand Down Expand Up @@ -294,7 +305,7 @@ public void testFieldNameLengthLimit() throws Throwable {
.endObject()));

IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
mapperService.merge("type", mappingUpdate, MergeReason.MAPPING_UPDATE);
mapperService.merge("type", mappingUpdate, updateOrPreflight());
});

assertEquals("Field name [" + testString + "] in index [test1] is too long. " +
Expand All @@ -319,7 +330,7 @@ public void testObjectNameLengthLimit() throws Throwable {
.endObject().endObject()));

IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE);
mapperService.merge("type", mapping, updateOrPreflight());
});

assertEquals("Field name [" + testString + "] in index [test1] is too long. " +
Expand Down Expand Up @@ -348,7 +359,7 @@ public void testAliasFieldNameLengthLimit() throws Throwable {
.endObject().endObject()));

IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE);
mapperService.merge("type", mapping, updateOrPreflight());
});

assertEquals("Field name [" + testString + "] in index [test1] is too long. " +
Expand Down Expand Up @@ -439,6 +450,10 @@ private boolean assertSameContainedFilters(TokenFilterFactory[] originalTokenFil
return true;
}

private static MergeReason updateOrPreflight() {
return randomFrom(MergeReason.MAPPING_UPDATE, MergeReason.MAPPING_UPDATE_PREFLIGHT);
}

public static final class ReloadableFilterPlugin extends Plugin implements AnalysisPlugin {

@Override
Expand Down
Loading