Add preflight check to dynamic mapping updates #48817

Merged
@@ -49,10 +49,12 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.Engine;
@@ -263,6 +265,17 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat
request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
}
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {

try {
primary.mapperService().merge("_doc",
new CompressedXContent(result.getRequiredMappingUpdate(), XContentType.JSON, ToXContent.EMPTY_PARAMS),
Contributor: It's unfortunate that we have to compress it here, just to uncompress and parse it again in the next step.

Contributor (Author): Yes, the interface to the MapperService is not what I expected. (A sketch of the round trip follows the try/catch block below.)

MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT);
} catch (Exception e) {
logger.info("required mapping update failed during pre-flight check", e);
Contributor: Can you log the index name here as well?

Contributor (Author): Yes, done in 891e56a. (A hypothetical sketch of such a log line follows at the end of this hunk.)

onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
return true;
}
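
A sketch, not part of the patch, of the round trip the compress/uncompress comment above points out: result.getRequiredMappingUpdate() already returns a parsed Mapping, yet merge() only accepts mapping source, so the update is serialized and compressed here just so merge() can decompress and re-parse it. The local names update, source and mapperService are stand-ins.

    Mapping update = result.getRequiredMappingUpdate();                  // already a parsed object
    CompressedXContent source =                                          // serialize and compress it ...
        new CompressedXContent(update, XContentType.JSON, ToXContent.EMPTY_PARAMS);
    mapperService.merge("_doc", source,                                  // ... merge() decompresses and
        MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT);             //     re-parses the same mapping
    // A direct overload such as merge(String, Mapping, MergeReason) would avoid the
    // round trip, but no such method exists in this change.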

mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(),
MapperService.SINGLE_MAPPING_NAME,
new ActionListener<>() {
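
The follow-up commit 891e56a is not shown in this diff. One hypothetical way the shard id (which carries the index name) could be worked into that log line, assuming log4j2's ParameterizedMessage; the actual wording may differ:

    logger.info(new ParameterizedMessage(
        "{} required mapping update failed during pre-flight check", primary.shardId()), e);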
@@ -82,6 +82,10 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
* The reason why a mapping is being merged.
*/
public enum MergeReason {
/**
* Pre-flight check before sending a mapping update to the master
*/
MAPPING_UPDATE_PREFLIGHT,
/**
* Create or update a mapping.
*/
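
A condensed sketch of how the new reason is used on the write path (mirroring the executeBulkItemRequest hunk above; mappingSource and listener are stand-ins):

    // On the primary shard, validate the dynamic mapping update locally first.
    // This throws (e.g. when the total-fields limit would be exceeded) and changes nothing:
    primary.mapperService().merge("_doc", mappingSource,
        MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT);
    // Only if the preflight merge succeeds is the update sent to the master,
    // which applies it cluster-wide with the regular MAPPING_UPDATE reason:
    mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(),
        MapperService.SINGLE_MAPPING_NAME, listener);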
@@ -306,6 +310,7 @@ public DocumentMapper merge(String type, CompressedXContent mappingSource, Merge

private synchronized Map<String, DocumentMapper> internalMerge(IndexMetaData indexMetaData,
MergeReason reason, boolean onlyUpdateIfNeeded) {
assert reason != MergeReason.MAPPING_UPDATE_PREFLIGHT;
Map<String, CompressedXContent> map = new LinkedHashMap<>();
MappingMetaData mappingMetaData = indexMetaData.mapping();
if (mappingMetaData != null) {
@@ -415,7 +420,7 @@ private synchronized Map<String, DocumentMapper> internalMerge(DocumentMapper ma

ContextMapping.validateContextPaths(indexSettings.getIndexVersionCreated(), fieldMappers, fieldTypes::get);

if (reason == MergeReason.MAPPING_UPDATE) {
if (reason == MergeReason.MAPPING_UPDATE || reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
// this check will only be performed on the master node when there is
// a call to the update mapping API. For all other cases like
// the master node restoring mappings from disk or data nodes
@@ -430,7 +435,7 @@ private synchronized Map<String, DocumentMapper> internalMerge(DocumentMapper ma
results.put(newMapper.type(), newMapper);
}

if (reason == MergeReason.MAPPING_UPDATE) {
if (reason == MergeReason.MAPPING_UPDATE || reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
// this check will only be performed on the master node when there is
// a call to the update mapping API. For all other cases like
// the master node restoring mappings from disk or data nodes
@@ -453,6 +458,10 @@ private synchronized Map<String, DocumentMapper> internalMerge(DocumentMapper ma
// make structures immutable
results = Collections.unmodifiableMap(results);

if (reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
return results;
}

// only need to immutably rewrap these if the previous reference was changed.
// if not then they are already implicitly immutable.
if (fullPathObjectMappers != this.fullPathObjectMappers) {
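
Net effect of the reason checks and the early return above: a preflight merge runs the same validations as a real mapping update but never installs the merged mappers. A minimal illustration, mirroring the unit test added further down (mapperService and mapping are stand-ins):

    mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE_PREFLIGHT);
    assert mapperService.fullName("field0") == null;    // validated, but nothing was applied
    mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE);
    assert mapperService.fullName("field0") != null;    // the real update does change the mapping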
@@ -44,8 +44,10 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.index.mapper.RootObjectMapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
@@ -233,14 +235,15 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

Engine.IndexResult mappingUpdate =
new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap()));
new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()));
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation);

IndexShard shard = mock(IndexShard.class);
when(shard.shardId()).thenReturn(shardId);
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
.thenReturn(mappingUpdate);
when(shard.mapperService()).thenReturn(mock(MapperService.class));

randomlySetIgnoredPrimaryResponse(items[0]);

@@ -761,7 +764,7 @@ public void testRetries() throws Exception {
"I'm conflicted <(;_;)>");
Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0);
Engine.IndexResult mappingUpdate =
new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap()));
new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()));
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation);

@@ -778,6 +781,7 @@ public void testRetries() throws Exception {
});
when(shard.indexSettings()).thenReturn(indexSettings);
when(shard.shardId()).thenReturn(shardId);
when(shard.mapperService()).thenReturn(mock(MapperService.class));

UpdateHelper updateHelper = mock(UpdateHelper.class);
when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
@@ -21,7 +21,13 @@
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
@@ -34,6 +40,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING;

public class DynamicMappingIT extends ESIntegTestCase {

@Override
@@ -116,4 +124,38 @@ public void run() {
assertTrue(client().prepareGet("index", Integer.toString(i)).get().isExists());
}
}

public void testPreflightCheckAvoidsMaster() throws InterruptedException {
createIndex("index", Settings.builder().put(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), 2).build());
ensureGreen("index");
client().prepareIndex("index").setId("1").setSource("field1", "value1").get();

final CountDownLatch masterBlockedLatch = new CountDownLatch(1);
final CountDownLatch indexingCompletedLatch = new CountDownLatch(1);

internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName()).submitStateUpdateTask("block-state-updates",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
masterBlockedLatch.countDown();
indexingCompletedLatch.await();
return currentState;
}

@Override
public void onFailure(String source, Exception e) {
throw new AssertionError("unexpected", e);
}
});

masterBlockedLatch.await();
final IndexRequestBuilder indexRequestBuilder = client().prepareIndex("index").setId("2").setSource("field2", "value2");
try {
assertThat(
expectThrows(IllegalArgumentException.class, () -> indexRequestBuilder.get(TimeValue.timeValueSeconds(10))).getMessage(),
Matchers.containsString("Limit of total fields [2] in index [index] has been exceeded"));
} finally {
indexingCompletedLatch.countDown();
}
}
}
@@ -55,6 +55,8 @@

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class MapperServiceTests extends ESSingleNodeTestCase {

@@ -99,6 +101,15 @@ public void testTypeValidation() {
MapperService.validateTypeName("_doc"); // no exception
}

public void testPreflightUpdateDoesNotChangeMapping() throws Throwable {
final MapperService mapperService = createIndex("test1").mapperService();
final CompressedXContent mapping = createMappingSpecifyingNumberOfFields(1);
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE_PREFLIGHT);
assertThat("field was not created by preflight check", mapperService.fullName("field0"), nullValue());
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE);
assertThat("field was not created by mapping update", mapperService.fullName("field0"), notNullValue());
}

/**
* Test that we can have at least the number of fields in new mappings that are defined by "index.mapping.total_fields.limit".
* Any additional field should trigger an IllegalArgumentException.
@@ -113,7 +124,7 @@ public void testTotalFieldsLimit() throws Throwable {
// adding one more field should trigger exception
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
createIndex("test2", settings).mapperService().merge("type",
createMappingSpecifyingNumberOfFields(totalFieldsLimit + 1), MergeReason.MAPPING_UPDATE);
createMappingSpecifyingNumberOfFields(totalFieldsLimit + 1), updateOrPreflight());
});
assertTrue(e.getMessage(),
e.getMessage().contains("Limit of total fields [" + totalFieldsLimit + "] in index [test2] has been exceeded"));
@@ -149,7 +160,7 @@ public void testMappingDepthExceedsLimit() throws Throwable {
indexService2.mapperService().merge("type", objectMapping, MergeReason.MAPPING_UPDATE);

IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> indexService1.mapperService().merge("type", objectMapping, MergeReason.MAPPING_UPDATE));
() -> indexService1.mapperService().merge("type", objectMapping, updateOrPreflight()));
assertThat(e.getMessage(), containsString("Limit of mapping depth [1] in index [test1] has been exceeded"));
}

@@ -197,7 +208,7 @@ public void testIndexSortWithNestedFields() throws IOException {
.endObject().endObject()));
invalidNestedException = expectThrows(IllegalArgumentException.class,
() -> indexService.mapperService().merge("t", nestedFieldMapping,
MergeReason.MAPPING_UPDATE));
updateOrPreflight()));
assertThat(invalidNestedException.getMessage(),
containsString("cannot have nested fields when index sort is activated"));
}
@@ -233,7 +244,7 @@ public void testFieldAliasWithMismatchedNestedScope() throws Throwable {
.endObject()));

IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> mapperService.merge("type", mappingUpdate, MergeReason.MAPPING_UPDATE));
() -> mapperService.merge("type", mappingUpdate, updateOrPreflight()));
assertThat(e.getMessage(), containsString("Invalid [path] value [nested.field] for field alias [alias]"));
}

Expand Down Expand Up @@ -261,7 +272,7 @@ public void testTotalFieldsLimitWithFieldAlias() throws Throwable {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
createIndex("test2",
Settings.builder().put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), numberOfNonAliasFields).build())
.mapperService().merge("type", new CompressedXContent(mapping), MergeReason.MAPPING_UPDATE);
.mapperService().merge("type", new CompressedXContent(mapping), updateOrPreflight());
});
assertEquals("Limit of total fields [" + numberOfNonAliasFields + "] in index [test2] has been exceeded", e.getMessage());
}
@@ -294,7 +305,7 @@ public void testFieldNameLengthLimit() throws Throwable {
.endObject()));

IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
mapperService.merge("type", mappingUpdate, MergeReason.MAPPING_UPDATE);
mapperService.merge("type", mappingUpdate, updateOrPreflight());
});

assertEquals("Field name [" + testString + "] in index [test1] is too long. " +
@@ -319,7 +330,7 @@ public void testObjectNameLengthLimit() throws Throwable {
.endObject().endObject()));

IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE);
mapperService.merge("type", mapping, updateOrPreflight());
});

assertEquals("Field name [" + testString + "] in index [test1] is too long. " +
@@ -348,7 +359,7 @@ public void testAliasFieldNameLengthLimit() throws Throwable {
.endObject().endObject()));

IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE);
mapperService.merge("type", mapping, updateOrPreflight());
});

assertEquals("Field name [" + testString + "] in index [test1] is too long. " +
@@ -439,6 +450,10 @@ private boolean assertSameContainedFilters(TokenFilterFactory[] originalTokenFil
return true;
}

private static MergeReason updateOrPreflight() {
return randomFrom(MergeReason.MAPPING_UPDATE, MergeReason.MAPPING_UPDATE_PREFLIGHT);
}

public static final class ReloadableFilterPlugin extends Plugin implements AnalysisPlugin {

@Override