From 13e19e7428568e8a71dd42618bbbdd807881f131 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 28 Mar 2018 18:03:34 +0200 Subject: [PATCH] Allow _update and upsert to read from the transaction log (#29264) We historically removed reading from the transaction log to get consistent results from _GET calls. There was also the motivation that the read-modify-update principle we apply should not be hidden from the user. We still agree on the fact that we should not hide these aspects but the impact on updates is quite significant especially if the same documents is updated before it's written to disk and made serachable. This change adds back the ability to read from the transaction log but only for update calls. Calls to the _GET API will always do a refresh if necessary to return consistent results ie. if stored fields or DocValues Fields are requested. Closes #26802 --- .../explain/TransportExplainAction.java | 4 +- .../action/update/UpdateHelper.java | 6 +- .../uid/PerThreadIDVersionAndSeqNoLookup.java | 2 +- .../lucene/uid/VersionsAndSeqNoResolver.java | 9 +- .../elasticsearch/index/engine/Engine.java | 8 +- .../index/engine/InternalEngine.java | 36 ++- .../index/engine/TranslogLeafReader.java | 237 ++++++++++++++++++ .../index/engine/TranslogVersionValue.java | 71 ++++++ .../index/engine/VersionValue.java | 11 +- .../index/get/ShardGetService.java | 22 +- .../index/termvectors/TermVectorsService.java | 4 +- .../index/translog/BaseTranslogReader.java | 9 + .../index/translog/Translog.java | 27 ++ .../index/translog/TranslogSnapshot.java | 1 - .../index/engine/InternalEngineTests.java | 26 +- .../index/shard/IndexShardTests.java | 6 +- .../index/shard/RefreshListenersTests.java | 4 +- .../index/shard/ShardGetServiceTests.java | 132 ++++++++++ .../index/translog/TranslogTests.java | 23 +- .../index/engine/EngineTestCase.java | 2 +- .../index/shard/IndexShardTestCase.java | 7 +- 21 files changed, 602 insertions(+), 45 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/engine/TranslogLeafReader.java create mode 100644 server/src/main/java/org/elasticsearch/index/engine/TranslogVersionValue.java create mode 100644 server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java diff --git a/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java b/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java index 5b20b848f0b04..18c1ea41e95b9 100644 --- a/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java +++ b/server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java @@ -112,13 +112,13 @@ protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId if (uidTerm == null) { return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), false); } - result = context.indexShard().get(new Engine.Get(false, request.type(), request.id(), uidTerm)); + result = context.indexShard().get(new Engine.Get(false, false, request.type(), request.id(), uidTerm)); if (!result.exists()) { return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), false); } context.parsedQuery(context.getQueryShardContext().toQuery(request.query())); context.preProcess(true); - int topLevelDocId = result.docIdAndVersion().docId + result.docIdAndVersion().context.docBase; + int topLevelDocId = result.docIdAndVersion().docId + result.docIdAndVersion().docBase; Explanation explanation = context.searcher().explain(context.query(), topLevelDocId); for (RescoreContext ctx : context.rescore()) { Rescorer rescorer = ctx.rescorer(); diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 4ee49f2407b5d..ab10aa710cce6 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -47,7 +47,6 @@ import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; -import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.lookup.SourceLookup; import java.io.IOException; @@ -71,9 +70,8 @@ public UpdateHelper(Settings settings, ScriptService scriptService) { * Prepares an update request by converting it into an index or delete request or an update response (no action). */ public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) { - final GetResult getResult = indexShard.getService().get(request.type(), request.id(), - new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME}, - true, request.version(), request.versionType(), FetchSourceContext.FETCH_SOURCE); + final GetResult getResult = indexShard.getService().getForUpdate(request.type(), request.id(), request.version(), + request.versionType()); return prepare(indexShard.shardId(), request, getResult, nowInMillis); } diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java index f8ccd827019a4..38fcdfe5f1b62 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java @@ -100,7 +100,7 @@ public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context) if (versions.advanceExact(docID) == false) { throw new IllegalArgumentException("Document [" + docID + "] misses the [" + VersionFieldMapper.NAME + "] field"); } - return new DocIdAndVersion(docID, versions.longValue(), context); + return new DocIdAndVersion(docID, versions.longValue(), context.reader(), context.docBase); } else { return null; } diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java index 126e4dee51cc2..9db7e3716d51a 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java @@ -20,6 +20,7 @@ package org.elasticsearch.common.lucene.uid; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.Term; @@ -97,12 +98,14 @@ private VersionsAndSeqNoResolver() { public static class DocIdAndVersion { public final int docId; public final long version; - public final LeafReaderContext context; + public final LeafReader reader; + public final int docBase; - DocIdAndVersion(int docId, long version, LeafReaderContext context) { + public DocIdAndVersion(int docId, long version, LeafReader reader, int docBase) { this.docId = docId; this.version = version; - this.context = context; + this.reader = reader; + this.docBase = docBase; } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 1ca4468539da1..6cc8c4197dcd5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1232,14 +1232,16 @@ public static class Get { private final boolean realtime; private final Term uid; private final String type, id; + private final boolean readFromTranslog; private long version = Versions.MATCH_ANY; private VersionType versionType = VersionType.INTERNAL; - public Get(boolean realtime, String type, String id, Term uid) { + public Get(boolean realtime, boolean readFromTranslog, String type, String id, Term uid) { this.realtime = realtime; this.type = type; this.id = id; this.uid = uid; + this.readFromTranslog = readFromTranslog; } public boolean realtime() { @@ -1275,6 +1277,10 @@ public Get versionType(VersionType versionType) { this.versionType = versionType; return this; } + + public boolean isReadFromTranslog() { + return readFromTranslog; + } } public static class GetResult implements Releasable { diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0fda2f04ac5a4..864385667f5fe 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -78,6 +78,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -145,6 +146,7 @@ public class InternalEngine extends Engine { * being indexed/deleted. */ private final AtomicLong writingBytes = new AtomicLong(); + private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false); @Nullable private final String historyUUID; @@ -558,6 +560,27 @@ public GetResult get(Get get, BiFunction search throw new VersionConflictEngineException(shardId, get.type(), get.id(), get.versionType().explainConflictForReads(versionValue.version, get.version())); } + if (get.isReadFromTranslog()) { + // this is only used for updates - API _GET calls will always read form a reader for consistency + // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0 + if (versionValue.getLocation() != null) { + try { + Translog.Operation operation = translog.readOperation(versionValue.getLocation()); + if (operation != null) { + // in the case of a already pruned translog generation we might get null here - yet very unlikely + TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig + .getIndexSettings().getIndexVersionCreated()); + return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader)), + new VersionsAndSeqNoResolver.DocIdAndVersion(0, ((Translog.Index) operation).version(), reader, 0)); + } + } catch (IOException e) { + maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event + throw new EngineException(shardId, "failed to read operation from translog", e); + } + } else { + trackTranslogLocation.set(true); + } + } refresh("realtime_get", SearcherScope.INTERNAL); } scope = SearcherScope.INTERNAL; @@ -790,6 +813,10 @@ public IndexResult index(Index index) throws IOException { } indexResult.setTranslogLocation(location); } + if (plan.indexIntoLucene && indexResult.hasFailure() == false) { + versionMap.maybePutUnderLock(index.uid().bytes(), + getVersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm(), indexResult.getTranslogLocation())); + } if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo()); } @@ -916,8 +943,6 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); index(index.docs(), indexWriter); } - versionMap.maybePutUnderLock(index.uid().bytes(), - new VersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm())); return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } catch (Exception ex) { if (indexWriter.getTragicException() == null) { @@ -941,6 +966,13 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) } } + private VersionValue getVersionValue(long version, long seqNo, long term, Translog.Location location) { + if (location != null && trackTranslogLocation.get()) { + return new TranslogVersionValue(location, version, seqNo, term); + } + return new VersionValue(version, seqNo, term); + } + /** * returns true if the indexing operation may have already be processed by this engine. * Note that it is OK to rarely return true even if this is not the case. However a `false` diff --git a/server/src/main/java/org/elasticsearch/index/engine/TranslogLeafReader.java b/server/src/main/java/org/elasticsearch/index/engine/TranslogLeafReader.java new file mode 100644 index 0000000000000..628bfd4826935 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/TranslogLeafReader.java @@ -0,0 +1,237 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.Fields; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.LeafMetaData; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.StoredFieldVisitor; +import org.apache.lucene.index.Terms; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.Version; +import org.elasticsearch.index.fielddata.AbstractSortedDocValues; +import org.elasticsearch.index.fielddata.AbstractSortedSetDocValues; +import org.elasticsearch.index.mapper.IdFieldMapper; +import org.elasticsearch.index.mapper.ParentFieldMapper; +import org.elasticsearch.index.mapper.RoutingFieldMapper; +import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.mapper.UidFieldMapper; +import org.elasticsearch.index.translog.Translog; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; + +/** + * Internal class that mocks a single doc read from the transaction log as a leaf reader. + */ +final class TranslogLeafReader extends LeafReader { + + private final Translog.Index operation; + private static final FieldInfo FAKE_SOURCE_FIELD + = new FieldInfo(SourceFieldMapper.NAME, 1, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(), + 0,0); + private static final FieldInfo FAKE_ROUTING_FIELD + = new FieldInfo(RoutingFieldMapper.NAME, 2, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(), + 0,0); + private static final FieldInfo FAKE_ID_FIELD + = new FieldInfo(IdFieldMapper.NAME, 3, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(), + 0,0); + private static final FieldInfo FAKE_UID_FIELD + = new FieldInfo(UidFieldMapper.NAME, 4, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(), + 0,0); + private final Version indexVersionCreated; + + TranslogLeafReader(Translog.Index operation, Version indexVersionCreated) { + this.operation = operation; + this.indexVersionCreated = indexVersionCreated; + } + @Override + public CacheHelper getCoreCacheHelper() { + throw new UnsupportedOperationException(); + } + + @Override + public Terms terms(String field) { + throw new UnsupportedOperationException(); + } + + @Override + public NumericDocValues getNumericDocValues(String field) { + throw new UnsupportedOperationException(); + } + + @Override + public BinaryDocValues getBinaryDocValues(String field) { + throw new UnsupportedOperationException(); + } + + @Override + public SortedDocValues getSortedDocValues(String field) { + // TODO this can be removed in 7.0 and upwards we don't support the parent field anymore + if (field.startsWith(ParentFieldMapper.NAME + "#") && operation.parent() != null) { + return new AbstractSortedDocValues() { + @Override + public int docID() { + return 0; + } + + private final BytesRef term = new BytesRef(operation.parent()); + private int ord; + @Override + public boolean advanceExact(int docID) { + if (docID != 0) { + throw new IndexOutOfBoundsException("do such doc ID: " + docID); + } + ord = 0; + return true; + } + + @Override + public int ordValue() { + return ord; + } + + @Override + public BytesRef lookupOrd(int ord) { + if (ord == 0) { + return term; + } + return null; + } + + @Override + public int getValueCount() { + return 1; + } + }; + } + if (operation.parent() == null) { + return null; + } + assert false : "unexpected field: " + field; + return null; + } + + @Override + public SortedNumericDocValues getSortedNumericDocValues(String field) { + throw new UnsupportedOperationException(); + } + + @Override + public SortedSetDocValues getSortedSetDocValues(String field) { + throw new UnsupportedOperationException(); + } + + @Override + public NumericDocValues getNormValues(String field) { + throw new UnsupportedOperationException(); + } + + @Override + public FieldInfos getFieldInfos() { + throw new UnsupportedOperationException(); + } + + @Override + public Bits getLiveDocs() { + throw new UnsupportedOperationException(); + } + + @Override + public PointValues getPointValues(String field) { + throw new UnsupportedOperationException(); + } + + @Override + public void checkIntegrity() { + + } + + @Override + public LeafMetaData getMetaData() { + throw new UnsupportedOperationException(); + } + + @Override + public Fields getTermVectors(int docID) { + throw new UnsupportedOperationException(); + } + + @Override + public int numDocs() { + return 1; + } + + @Override + public int maxDoc() { + return 1; + } + + @Override + public void document(int docID, StoredFieldVisitor visitor) throws IOException { + if (docID != 0) { + throw new IllegalArgumentException("no such doc ID " + docID); + } + if (visitor.needsField(FAKE_SOURCE_FIELD) == StoredFieldVisitor.Status.YES) { + assert operation.source().toBytesRef().offset == 0; + assert operation.source().toBytesRef().length == operation.source().toBytesRef().bytes.length; + visitor.binaryField(FAKE_SOURCE_FIELD, operation.source().toBytesRef().bytes); + } + if (operation.routing() != null && visitor.needsField(FAKE_ROUTING_FIELD) == StoredFieldVisitor.Status.YES) { + visitor.stringField(FAKE_ROUTING_FIELD, operation.routing().getBytes(StandardCharsets.UTF_8)); + } + if (visitor.needsField(FAKE_ID_FIELD) == StoredFieldVisitor.Status.YES) { + final byte[] id; + if (indexVersionCreated.onOrAfter(Version.V_6_0_0)) { + BytesRef bytesRef = Uid.encodeId(operation.id()); + id = new byte[bytesRef.length]; + System.arraycopy(bytesRef.bytes, bytesRef.offset, id, 0, bytesRef.length); + } else { // TODO this can go away in 7.0 after backport + id = operation.id().getBytes(StandardCharsets.UTF_8); + } + visitor.stringField(FAKE_ID_FIELD, id); + } + if (visitor.needsField(FAKE_UID_FIELD) == StoredFieldVisitor.Status.YES) { + visitor.stringField(FAKE_UID_FIELD, Uid.createUid(operation.type(), operation.id()).getBytes(StandardCharsets.UTF_8)); + } + } + + @Override + protected void doClose() { + + } + + @Override + public CacheHelper getReaderCacheHelper() { + throw new UnsupportedOperationException(); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/TranslogVersionValue.java b/server/src/main/java/org/elasticsearch/index/engine/TranslogVersionValue.java new file mode 100644 index 0000000000000..67415ea6139a6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/TranslogVersionValue.java @@ -0,0 +1,71 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.index.translog.Translog; + +import java.util.Objects; + +final class TranslogVersionValue extends VersionValue { + + private static final long RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(TranslogVersionValue.class); + + private final Translog.Location translogLocation; + + TranslogVersionValue(Translog.Location translogLocation, long version, long seqNo, long term) { + super(version, seqNo, term); + this.translogLocation = translogLocation; + } + + @Override + public long ramBytesUsed() { + return RAM_BYTES_USED; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + TranslogVersionValue that = (TranslogVersionValue) o; + return Objects.equals(translogLocation, that.translogLocation); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), translogLocation); + } + + @Override + public String toString() { + return "TranslogVersionValue{" + + "version=" + version + + ", seqNo=" + seqNo + + ", term=" + term + + ", location=" + translogLocation + + '}'; + } + + @Override + public Translog.Location getLocation() { + return translogLocation; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/VersionValue.java b/server/src/main/java/org/elasticsearch/index/engine/VersionValue.java index e2a2614d6c102..d63306486732e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/VersionValue.java +++ b/server/src/main/java/org/elasticsearch/index/engine/VersionValue.java @@ -21,6 +21,8 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.index.translog.Translog; import java.util.Collection; import java.util.Collections; @@ -81,9 +83,16 @@ public int hashCode() { public String toString() { return "VersionValue{" + "version=" + version + - ", seqNo=" + seqNo + ", term=" + term + '}'; } + + /** + * Returns the translog location for this version value or null. This is optional and might not be tracked all the time. + */ + @Nullable + public Translog.Location getLocation() { + return null; + } } diff --git a/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java b/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java index dcd18c8f313f9..a6c8dbf53b395 100644 --- a/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java +++ b/server/src/main/java/org/elasticsearch/index/get/ShardGetService.java @@ -42,6 +42,7 @@ import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParentFieldMapper; +import org.elasticsearch.index.mapper.RoutingFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.IndexShard; @@ -75,10 +76,15 @@ public GetStats stats() { } public GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, FetchSourceContext fetchSourceContext) { + return get(type, id, gFields, realtime, version, versionType, fetchSourceContext, false); + } + + private GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, + FetchSourceContext fetchSourceContext, boolean readFromTranslog) { currentMetric.inc(); try { long now = System.nanoTime(); - GetResult getResult = innerGet(type, id, gFields, realtime, version, versionType, fetchSourceContext); + GetResult getResult = innerGet(type, id, gFields, realtime, version, versionType, fetchSourceContext, readFromTranslog); if (getResult.isExists()) { existsMetric.inc(System.nanoTime() - now); @@ -91,6 +97,11 @@ public GetResult get(String type, String id, String[] gFields, boolean realtime, } } + public GetResult getForUpdate(String type, String id, long version, VersionType versionType) { + return get(type, id, new String[]{RoutingFieldMapper.NAME, ParentFieldMapper.NAME}, true, version, versionType, + FetchSourceContext.FETCH_SOURCE, true); + } + /** * Returns {@link GetResult} based on the specified {@link org.elasticsearch.index.engine.Engine.GetResult} argument. * This method basically loads specified fields for the associated document in the engineGetResult. @@ -137,7 +148,8 @@ private FetchSourceContext normalizeFetchSourceContent(@Nullable FetchSourceCont return FetchSourceContext.DO_NOT_FETCH_SOURCE; } - private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, FetchSourceContext fetchSourceContext) { + private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, + FetchSourceContext fetchSourceContext, boolean readFromTranslog) { fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields); final Collection types; if (type == null || type.equals("_all")) { @@ -150,7 +162,7 @@ private GetResult innerGet(String type, String id, String[] gFields, boolean rea for (String typeX : types) { Term uidTerm = mapperService.createUidTerm(typeX, id); if (uidTerm != null) { - get = indexShard.get(new Engine.Get(realtime, typeX, id, uidTerm) + get = indexShard.get(new Engine.Get(realtime, readFromTranslog, typeX, id, uidTerm) .version(version).versionType(versionType)); if (get.exists()) { type = typeX; @@ -180,7 +192,7 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[] FieldsVisitor fieldVisitor = buildFieldsVisitors(gFields, fetchSourceContext); if (fieldVisitor != null) { try { - docIdAndVersion.context.reader().document(docIdAndVersion.docId, fieldVisitor); + docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor); } catch (IOException e) { throw new ElasticsearchException("Failed to get type [" + type + "] and id [" + id + "]", e); } @@ -197,7 +209,7 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[] DocumentMapper docMapper = mapperService.documentMapper(type); if (docMapper.parentFieldMapper().active()) { - String parentId = ParentFieldSubFetchPhase.getParentId(docMapper.parentFieldMapper(), docIdAndVersion.context.reader(), docIdAndVersion.docId); + String parentId = ParentFieldSubFetchPhase.getParentId(docMapper.parentFieldMapper(), docIdAndVersion.reader, docIdAndVersion.docId); if (fields == null) { fields = new HashMap<>(1); } diff --git a/server/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java b/server/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java index d527fa83501b3..573e75d78060a 100644 --- a/server/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java +++ b/server/src/main/java/org/elasticsearch/index/termvectors/TermVectorsService.java @@ -85,7 +85,7 @@ static TermVectorsResponse getTermVectors(IndexShard indexShard, TermVectorsRequ termVectorsResponse.setExists(false); return termVectorsResponse; } - Engine.GetResult get = indexShard.get(new Engine.Get(request.realtime(), request.type(), request.id(), uidTerm) + Engine.GetResult get = indexShard.get(new Engine.Get(request.realtime(), false, request.type(), request.id(), uidTerm) .version(request.version()).versionType(request.versionType())); Fields termVectorsByField = null; @@ -114,7 +114,7 @@ static TermVectorsResponse getTermVectors(IndexShard indexShard, TermVectorsRequ /* or from an existing document */ else if (docIdAndVersion != null) { // fields with stored term vectors - termVectorsByField = docIdAndVersion.context.reader().getTermVectors(docIdAndVersion.docId); + termVectorsByField = docIdAndVersion.reader.getTermVectors(docIdAndVersion.docId); Set selectedFields = request.selectedFields(); // generate tvs for fields where analyzer is overridden if (selectedFields == null && request.perFieldAnalyzer() != null) { diff --git a/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java b/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java index d86c4491b63e9..14ee8ecb9b3c0 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java +++ b/server/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java @@ -126,4 +126,13 @@ public Path path() { public long getLastModifiedTime() throws IOException { return Files.getLastModifiedTime(path).toMillis(); } + + /** + * Reads a single opertation from the given location. + */ + Translog.Operation read(Translog.Location location) throws IOException { + assert location.generation == this.generation : "generation mismatch expected: " + generation + " got: " + location.generation; + ByteBuffer buffer = ByteBuffer.allocate(location.size); + return read(checksummedStream(buffer, location.translogLocation, location.size, null)); + } } diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 0043472b72f7c..62e47d08ded54 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -571,6 +571,33 @@ public Snapshot newSnapshotFromGen(long minGeneration) throws IOException { } } + /** + * Reads and returns the operation from the given location if the generation it references is still available. Otherwise + * this method will return null. + */ + public Operation readOperation(Location location) throws IOException { + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen(); + if (location.generation < getMinFileGeneration()) { + return null; + } + if (current.generation == location.generation) { + // no need to fsync here the read operation will ensure that buffers are written to disk + // if they are still in RAM and we are reading onto that position + return current.read(location); + } else { + // read backwards - it's likely we need to read on that is recent + for (int i = readers.size() - 1; i >= 0; i--) { + TranslogReader translogReader = readers.get(i); + if (translogReader.generation == location.generation) { + return translogReader.read(location); + } + } + } + } + return null; + } + public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException { try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java index 656772fa8169d..5f6d14e192eb8 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogSnapshot.java @@ -104,5 +104,4 @@ public String toString() { ", reusableBuffer=" + reusableBuffer + '}'; } - } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 1ecb1829234ab..bba05401d4155 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1238,7 +1238,7 @@ public void testVersionedUpdate() throws IOException { Engine.Index create = new Engine.Index(newUid(doc), doc, Versions.MATCH_DELETED); Engine.IndexResult indexResult = engine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); - try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), create.uid()), searcherFactory)) { assertEquals(1, get.version()); } @@ -1246,7 +1246,7 @@ public void testVersionedUpdate() throws IOException { Engine.IndexResult update_1_result = engine.index(update_1); assertThat(update_1_result.getVersion(), equalTo(2L)); - try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), create.uid()), searcherFactory)) { assertEquals(2, get.version()); } @@ -1254,7 +1254,7 @@ public void testVersionedUpdate() throws IOException { Engine.IndexResult update_2_result = engine.index(update_2); assertThat(update_2_result.getVersion(), equalTo(3L)); - try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), create.uid()), searcherFactory)) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), create.uid()), searcherFactory)) { assertEquals(3, get.version()); } @@ -1765,7 +1765,7 @@ public void testVersioningPromotedReplica() throws IOException { assertOpsOnReplica(replicaOps, replicaEngine, true); final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine); final long currentSeqNo = getSequenceID(replicaEngine, - new Engine.Get(false, "type", lastReplicaOp.uid().text(), lastReplicaOp.uid())).v1(); + new Engine.Get(false, false, "type", lastReplicaOp.uid().text(), lastReplicaOp.uid())).v1(); try (Searcher searcher = engine.acquireSearcher("test")) { final TotalHitCountCollector collector = new TotalHitCountCollector(); searcher.searcher().search(new MatchAllDocsQuery(), collector); @@ -1830,9 +1830,9 @@ class OpAndVersion { throw new AssertionError(e); } for (int op = 0; op < opsPerThread; op++) { - try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory)) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), uidTerm), searcherFactory)) { FieldsVisitor visitor = new FieldsVisitor(true); - get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor); + get.docIdAndVersion().reader.document(get.docIdAndVersion().docId, visitor); List values = new ArrayList<>(Strings.commaDelimitedListToSet(visitor.source().utf8ToString())); String removed = op % 3 == 0 && values.size() > 0 ? values.remove(0) : null; String added = "v_" + idGenerator.incrementAndGet(); @@ -1872,9 +1872,9 @@ class OpAndVersion { assertTrue(op.added + " should not exist", exists); } - try (Engine.GetResult get = engine.get(new Engine.Get(true, doc.type(), doc.id(), uidTerm), searcherFactory)) { + try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), uidTerm), searcherFactory)) { FieldsVisitor visitor = new FieldsVisitor(true); - get.docIdAndVersion().context.reader().document(get.docIdAndVersion().docId, visitor); + get.docIdAndVersion().reader.document(get.docIdAndVersion().docId, visitor); List values = Arrays.asList(Strings.commaDelimitedListToStringArray(visitor.source().utf8ToString())); assertThat(currentValues, equalTo(new HashSet<>(values))); } @@ -2275,7 +2275,7 @@ public void testEnableGcDeletes() throws Exception { engine.delete(new Engine.Delete("test", "2", newUid("2"), SequenceNumbers.UNASSIGNED_SEQ_NO, 0, 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime())); // Get should not find the document (we never indexed uid=2): - getResult = engine.get(new Engine.Get(true, "type", "2", newUid("2")), searcherFactory); + getResult = engine.get(new Engine.Get(true, false, "type", "2", newUid("2")), searcherFactory); assertThat(getResult.exists(), equalTo(false)); // Try to index uid=1 with a too-old version, should fail: @@ -3450,7 +3450,7 @@ public void afterRefresh(boolean didRefresh) throws IOException { } public void testSequenceIDs() throws Exception { - Tuple seqID = getSequenceID(engine, new Engine.Get(false, "type", "2", newUid("1"))); + Tuple seqID = getSequenceID(engine, new Engine.Get(false, false, "type", "2", newUid("1"))); // Non-existent doc returns no seqnum and no primary term assertThat(seqID.v1(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); assertThat(seqID.v2(), equalTo(0L)); @@ -3665,7 +3665,7 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOExceptio } assertThat(engine.getLocalCheckpointTracker().getCheckpoint(), equalTo(expectedLocalCheckpoint)); - try (Engine.GetResult result = engine.get(new Engine.Get(true, "type", "2", uid), searcherFactory)) { + try (Engine.GetResult result = engine.get(new Engine.Get(true, false, "type", "2", uid), searcherFactory)) { assertThat(result.exists(), equalTo(exists)); } } @@ -4454,14 +4454,14 @@ public void testStressUpdateSameDocWhileGettingIt() throws IOException, Interrup CountDownLatch awaitStarted = new CountDownLatch(1); Thread thread = new Thread(() -> { awaitStarted.countDown(); - try (Engine.GetResult getResult = engine.get(new Engine.Get(true, doc3.type(), doc3.id(), doc3.uid()), + try (Engine.GetResult getResult = engine.get(new Engine.Get(true, false, doc3.type(), doc3.id(), doc3.uid()), engine::acquireSearcher)) { assertTrue(getResult.exists()); } }); thread.start(); awaitStarted.await(); - try (Engine.GetResult getResult = engine.get(new Engine.Get(true, doc.type(), doc.id(), doc.uid()), + try (Engine.GetResult getResult = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), doc.uid()), engine::acquireSearcher)) { assertFalse(getResult.exists()); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 941a967355345..7aa597c2d4d42 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1185,7 +1185,7 @@ public void testRefreshMetric() throws IOException { } long refreshCount = shard.refreshStats().getTotal(); indexDoc(shard, "test", "test"); - try (Engine.GetResult ignored = shard.get(new Engine.Get(true, "test", "test", + try (Engine.GetResult ignored = shard.get(new Engine.Get(true, false, "test", "test", new Term(IdFieldMapper.NAME, Uid.encodeId("test"))))) { assertThat(shard.refreshStats().getTotal(), equalTo(refreshCount+1)); } @@ -1833,7 +1833,7 @@ public void testSearcherWrapperIsUsed() throws IOException { indexDoc(shard, "test", "1", "{\"foobar\" : \"bar\"}"); shard.refresh("test"); - Engine.GetResult getResult = shard.get(new Engine.Get(false, "test", "1", new Term(IdFieldMapper.NAME, Uid.encodeId("1")))); + Engine.GetResult getResult = shard.get(new Engine.Get(false, false, "test", "1", new Term(IdFieldMapper.NAME, Uid.encodeId("1")))); assertTrue(getResult.exists()); assertNotNull(getResult.searcher()); getResult.release(); @@ -1867,7 +1867,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException { search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10); assertEquals(search.totalHits, 1); } - getResult = newShard.get(new Engine.Get(false, "test", "1", new Term(IdFieldMapper.NAME, Uid.encodeId("1")))); + getResult = newShard.get(new Engine.Get(false, false, "test", "1", new Term(IdFieldMapper.NAME, Uid.encodeId("1")))); assertTrue(getResult.exists()); assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 0609477dda8e5..1bd98cd1c9e69 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -323,12 +323,12 @@ public void testLotsOfThreads() throws Exception { } listener.assertNoError(); - Engine.Get get = new Engine.Get(false, "test", threadId, new Term(IdFieldMapper.NAME, threadId)); + Engine.Get get = new Engine.Get(false, false, "test", threadId, new Term(IdFieldMapper.NAME, threadId)); try (Engine.GetResult getResult = engine.get(get, engine::acquireSearcher)) { assertTrue("document not found", getResult.exists()); assertEquals(iteration, getResult.version()); SingleFieldsVisitor visitor = new SingleFieldsVisitor("test"); - getResult.docIdAndVersion().context.reader().document(getResult.docIdAndVersion().docId, visitor); + getResult.docIdAndVersion().reader.document(getResult.docIdAndVersion().docId, visitor); assertEquals(Arrays.asList(testFieldValue), visitor.fields().get("test")); } } catch (Exception t) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java new file mode 100644 index 0000000000000..c626f2d18522c --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardGetServiceTests.java @@ -0,0 +1,132 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.mapper.ParentFieldMapper; +import org.elasticsearch.index.mapper.RoutingFieldMapper; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class ShardGetServiceTests extends IndexShardTestCase { + + public void testGetForUpdate() throws IOException { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(primary); + Engine.IndexResult test = indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); + assertTrue(primary.getEngine().refreshNeeded()); + GetResult testGet = primary.getService().getForUpdate("test", "0", test.getVersion(), VersionType.INTERNAL); + assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME)); + assertEquals(new String(testGet.source(), StandardCharsets.UTF_8), "{\"foo\" : \"bar\"}"); + try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + assertEquals(searcher.reader().maxDoc(), 1); // we refreshed + } + + Engine.IndexResult test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar", null); + assertTrue(primary.getEngine().refreshNeeded()); + GetResult testGet1 = primary.getService().getForUpdate("test", "1", test1.getVersion(), VersionType.INTERNAL); + assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); + assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); + assertFalse(testGet1.getFields().containsKey(ParentFieldMapper.NAME)); + assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); + try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + assertEquals(searcher.reader().maxDoc(), 1); // we read from the translog + } + primary.getEngine().refresh("test"); + try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + assertEquals(searcher.reader().maxDoc(), 2); + } + + // now again from the reader + test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar", null); + assertTrue(primary.getEngine().refreshNeeded()); + testGet1 = primary.getService().getForUpdate("test", "1", test1.getVersion(), VersionType.INTERNAL); + assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); + assertTrue(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); + assertFalse(testGet1.getFields().containsKey(ParentFieldMapper.NAME)); + assertEquals("foobar", testGet1.getFields().get(RoutingFieldMapper.NAME).getValue()); + + closeShards(primary); + } + + public void testGetForUpdateWithParentField() throws IOException { + Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put("index.version.created", Version.V_5_6_0) // for parent field mapper + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("parent", "{ \"properties\": {}}") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}, \"_parent\": { \"type\": \"parent\"}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(primary); + Engine.IndexResult test = indexDoc(primary, "test", "0", "{\"foo\" : \"bar\"}"); + assertTrue(primary.getEngine().refreshNeeded()); + GetResult testGet = primary.getService().getForUpdate("test", "0", test.getVersion(), VersionType.INTERNAL); + assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME)); + assertEquals(new String(testGet.source(), StandardCharsets.UTF_8), "{\"foo\" : \"bar\"}"); + try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + assertEquals(searcher.reader().maxDoc(), 1); // we refreshed + } + + Engine.IndexResult test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, null, "foobar"); + assertTrue(primary.getEngine().refreshNeeded()); + GetResult testGet1 = primary.getService().getForUpdate("test", "1", test1.getVersion(), VersionType.INTERNAL); + assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); + assertTrue(testGet1.getFields().containsKey(ParentFieldMapper.NAME)); + assertFalse(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); + assertEquals("foobar", testGet1.getFields().get(ParentFieldMapper.NAME).getValue()); + try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + assertEquals(searcher.reader().maxDoc(), 1); // we read from the translog + } + primary.getEngine().refresh("test"); + try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + assertEquals(searcher.reader().maxDoc(), 2); + } + + // now again from the reader + test1 = indexDoc(primary, "test", "1", "{\"foo\" : \"baz\"}", XContentType.JSON, null, "foobar"); + assertTrue(primary.getEngine().refreshNeeded()); + testGet1 = primary.getService().getForUpdate("test", "1", test1.getVersion(), VersionType.INTERNAL); + assertEquals(new String(testGet1.source(), StandardCharsets.UTF_8), "{\"foo\" : \"baz\"}"); + assertTrue(testGet1.getFields().containsKey(ParentFieldMapper.NAME)); + assertFalse(testGet1.getFields().containsKey(RoutingFieldMapper.NAME)); + assertEquals("foobar", testGet1.getFields().get(ParentFieldMapper.NAME).getValue()); + + closeShards(primary); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 2317d8fb0d8bf..61e5cdcfd953a 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -235,9 +235,9 @@ private TranslogConfig getTranslogConfig(final Path path, final Settings setting return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize); } - private void addToTranslogAndList(Translog translog, List list, Translog.Operation op) throws IOException { + private Location addToTranslogAndList(Translog translog, List list, Translog.Operation op) throws IOException { list.add(op); - translog.add(op); + return translog.add(op); } public void testIdParsingFromFile() { @@ -579,6 +579,19 @@ public void testSnapshot() throws IOException { } } + public void testReadLocation() throws IOException { + ArrayList ops = new ArrayList<>(); + ArrayList locs = new ArrayList<>(); + locs.add(addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, new byte[]{1}))); + locs.add(addToTranslogAndList(translog, ops, new Translog.Index("test", "2", 1, new byte[]{1}))); + locs.add(addToTranslogAndList(translog, ops, new Translog.Index("test", "3", 2, new byte[]{1}))); + int i = 0; + for (Translog.Operation op : ops) { + assertEquals(op, translog.readOperation(locs.get(i++))); + } + assertNull(translog.readOperation(new Location(100, 0, 0))); + } + public void testSnapshotWithNewTranslog() throws IOException { List toClose = new ArrayList<>(); try { @@ -689,6 +702,9 @@ public void testConcurrentWritesWithVaryingSize() throws Throwable { Translog.Operation op = snapshot.next(); assertNotNull(op); Translog.Operation expectedOp = locationOperation.operation; + if (randomBoolean()) { + assertEquals(expectedOp, translog.readOperation(locationOperation.location)); + } assertEquals(expectedOp.opType(), op.opType()); switch (op.opType()) { case INDEX: @@ -1643,6 +1659,9 @@ public void run() { Translog.Location loc = add(op); writtenOperations.add(new LocationOperation(op, loc)); + if (rarely()) { // lets verify we can concurrently read this + assertEquals(op, translog.readOperation(loc)); + } afterAdd(); } } catch (Exception t) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 8a9ad3d2a76e1..667adf9d990cc 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -471,7 +471,7 @@ protected Term newUid(ParsedDocument doc) { } protected Engine.Get newGet(boolean realtime, ParsedDocument doc) { - return new Engine.Get(realtime, doc.type(), doc.id(), newUid(doc)); + return new Engine.Get(realtime, false, doc.type(), doc.id(), newUid(doc)); } protected Engine.Index indexForDoc(ParsedDocument doc) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 6d6cc36d78b1b..2656855b9fd15 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -548,12 +548,15 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id) } protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, String source) throws IOException { - return indexDoc(shard, type, id, source, XContentType.JSON); + return indexDoc(shard, type, id, source, XContentType.JSON, null, null); } - protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, String source, XContentType xContentType) + protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id, String source, XContentType xContentType, + String routing, String parentId) throws IOException { SourceToParse sourceToParse = SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source), xContentType); + sourceToParse.routing(routing); + sourceToParse.parent(parentId); if (shard.routingEntry().primary()) { final Engine.IndexResult result = shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, sourceToParse, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, getMappingUpdater(shard, type));