Allow realtime get to read from translog (#48843)
The realtime GET API currently has erratic performance in cases where a document that has just been indexed but not yet refreshed is accessed, as the implementation currently forces an internal refresh in that case. Refreshing can be an expensive operation, and it also blocks the thread that executes the GET operation, preventing other GETs from being processed. When recently indexed documents are accessed frequently, this can lead to a refresh storm and terrible GET performance.
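For context, a minimal sketch of the access pattern that triggers this behaviour, using the 7.x high-level REST client (index name, field, and host are made up for illustration): a realtime GET issued immediately after indexing, before any refresh has happened.

import org.apache.http.HttpHost;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

import java.util.Map;

public class RealtimeGetExample {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            // Index a document; it now sits in the translog but is not yet refreshed.
            client.index(new IndexRequest("my-index").id("1")
                    .source(Map.of("counter", 1)), RequestOptions.DEFAULT);
            // A realtime GET immediately afterwards. Before this change the engine
            // forced an internal refresh to serve it; with this change it can be
            // answered straight from the translog.
            GetResponse response = client.get(
                    new GetRequest("my-index", "1").realtime(true), RequestOptions.DEFAULT);
            System.out.println(response.getSourceAsString());
        }
    }
}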

While older versions of Elasticsearch (2.x and older) did not trigger refreshes and instead opted to read from the translog for the realtime GET and update APIs, this was removed in 5.0 (#20102) to avoid inconsistencies between values that were returned from the translog and those returned by the index. It was partially reverted in 6.3 (#29264) to allow _update and upsert to read from the translog again, as it was easier to guarantee consistency for these operations, and doing so also restored more predictable performance characteristics. Calls to the realtime GET API, however, would still always perform a refresh if necessary to return consistent results. This means that users who were calling realtime GET APIs to coordinate updates on the client side (realtime GET + CAS for a conditional index of the updated doc) would still see very erratic performance.
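Concretely, the GET + CAS pattern mentioned above looks roughly like this (a sketch continuing the client setup from the previous example, plus java.util.HashMap; field names are illustrative). The realtime GET returns the document together with its sequence number and primary term, and the subsequent index request only succeeds if the document has not changed in the meantime.

// Realtime GET returns the current source plus _seq_no and _primary_term.
GetResponse current = client.get(new GetRequest("my-index", "1").realtime(true),
        RequestOptions.DEFAULT);

Map<String, Object> updated = new HashMap<>(current.getSourceAsMap());
updated.put("counter", (int) updated.get("counter") + 1);

// Conditional index (CAS): fails with a version conflict if another writer
// updated the document after our GET, in which case the client retries.
client.index(new IndexRequest("my-index").id("1")
        .source(updated)
        .setIfSeqNo(current.getSeqNo())
        .setIfPrimaryTerm(current.getPrimaryTerm()), RequestOptions.DEFAULT);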

This PR (together with #48707) resolves the inconsistencies between reading from the translog and reading from the index. In particular, it fixes the inconsistencies that occur when requesting stored fields, which were not available when reading from the translog. In cases where stored fields are requested, this PR reparses the _source from the translog and derives the stored fields to be returned. With this, the realtime GET API is changed to allow reading from the translog again, which avoids refresh storms, no longer blocks the GET threadpool, and provides much better and more predictable performance overall for this API.
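For instance (again continuing the sketch above, with `tags` as an assumed stored field in the mapping), a realtime GET that asks for stored fields on a just-indexed document is now served by reparsing the translog's _source rather than by refreshing:

// Realtime GET requesting a stored field for a not-yet-refreshed document.
// The engine reads the indexing operation from the translog, reparses its
// _source through the mapping, and derives the requested stored fields.
GetResponse withFields = client.get(
        new GetRequest("my-index", "1")
                .realtime(true)
                .storedFields("tags"),
        RequestOptions.DEFAULT);
System.out.println(withFields.getField("tags").getValues());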
ywelsch committed Nov 9, 2019
1 parent ff6c121 commit 8786286
Showing 7 changed files with 181 additions and 43 deletions.
10 changes: 5 additions & 5 deletions docs/reference/docs/get.asciidoc
@@ -35,11 +35,11 @@ that it exists.
===== Realtime

By default, the get API is realtime, and is not affected by the refresh
-rate of the index (when data will become visible for search). If a document
-has been updated but is not yet refreshed, the get API will issue a refresh
-call in-place to make the document visible. This will also make other documents
-changed since the last refresh visible. In order to disable realtime GET,
-one can set the `realtime` parameter to `false`.
+rate of the index (when data will become visible for search). In case where
+stored fields are requested (see `stored_fields` parameter) and the document
+has been updated but is not yet refreshed, the get API will have to parse
+and analyze the source to extract the stored fields. In order to disable
+realtime GET, the `realtime` parameter can be set to `false`.

[float]
[[get-source-filtering]]
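To illustrate the documented flag (a sketch under the same client-setup assumptions as the earlier examples): setting `realtime` to `false` makes the GET behave like search with respect to visibility, so it never consults the translog.

// Non-realtime GET: only sees what the last refresh made visible; a document
// that is still only in the translog will not be found.
GetResponse searchVisible = client.get(
        new GetRequest("my-index", "1").realtime(false), RequestOptions.DEFAULT);
System.out.println(searchVisible.isExists());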
server/src/main/java/org/elasticsearch/index/engine/TranslogLeafReader.java
@@ -35,6 +35,7 @@
import org.apache.lucene.index.Terms;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
@@ -44,11 +45,12 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
+import java.util.Set;

/**
* Internal class that mocks a single doc read from the transaction log as a leaf reader.
*/
-final class TranslogLeafReader extends LeafReader {
+public final class TranslogLeafReader extends LeafReader {

private final Translog.Index operation;
private static final FieldInfo FAKE_SOURCE_FIELD
@@ -60,6 +62,7 @@ final class TranslogLeafReader extends LeafReader {
private static final FieldInfo FAKE_ID_FIELD
= new FieldInfo(IdFieldMapper.NAME, 3, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(),
0, 0, 0, false);
+public static Set<String> ALL_FIELD_NAMES = Sets.newHashSet(FAKE_SOURCE_FIELD.name, FAKE_ROUTING_FIELD.name, FAKE_ID_FIELD.name);

TranslogLeafReader(Translog.Index operation) {
this.operation = operation;
@@ -161,7 +164,7 @@ public void document(int docID, StoredFieldVisitor visitor) throws IOException {
BytesRef bytesRef = Uid.encodeId(operation.id());
final byte[] id = new byte[bytesRef.length];
System.arraycopy(bytesRef.bytes, bytesRef.offset, id, 0, bytesRef.length);
-visitor.stringField(FAKE_ID_FIELD, id);
+visitor.binaryField(FAKE_ID_FIELD, id);
}
}

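One way to read the stringField-to-binaryField change above: `Uid.encodeId` packs document ids into a compact binary form that is generally not valid UTF-8, so delivering the fake `_id` field through a string-valued callback could corrupt it, while the binary callback hands the visitor the raw bytes to decode. A rough, illustrative round-trip (not code from this commit):

// Illustrative only: ids round-trip through Uid.encodeId/decodeId as raw bytes
// (org.elasticsearch.index.mapper.Uid, org.apache.lucene.util.BytesRef).
BytesRef encoded = Uid.encodeId("my-doc-id");
byte[] raw = new byte[encoded.length];
System.arraycopy(encoded.bytes, encoded.offset, raw, 0, encoded.length);
// Decoding the raw bytes recovers the id; interpreting them as UTF-8 would not.
String decoded = Uid.decodeId(raw);
assert "my-doc-id".equals(decoded);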
server/src/main/java/org/elasticsearch/index/fieldvisitor/CustomFieldsVisitor.java
@@ -20,7 +20,6 @@

import org.apache.lucene.index.FieldInfo;

-import java.io.IOException;
import java.util.Set;

/**
@@ -39,7 +38,7 @@ public CustomFieldsVisitor(Set<String> fields, boolean loadSource) {
}

@Override
-public Status needsField(FieldInfo fieldInfo) throws IOException {
+public Status needsField(FieldInfo fieldInfo) {
if (super.needsField(fieldInfo) == Status.YES) {
return Status.YES;
}
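For reference, a `CustomFieldsVisitor` is driven like any Lucene stored-field visitor; a small usage sketch (the reader, doc id, and field name are assumptions):

// Load only the "tags" stored field for one document, without _source.
CustomFieldsVisitor fieldsVisitor = new CustomFieldsVisitor(Collections.singleton("tags"), false);
leafReader.document(docId, fieldsVisitor);   // Lucene invokes needsField per stored field
List<Object> tagValues = fieldsVisitor.fields().get("tags");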
server/src/main/java/org/elasticsearch/index/fieldvisitor/FieldsVisitor.java
@@ -32,7 +32,6 @@
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;

-import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
@@ -72,7 +71,7 @@ public FieldsVisitor(boolean loadSource, String sourceFieldName) {
}

@Override
-public Status needsField(FieldInfo fieldInfo) throws IOException {
+public Status needsField(FieldInfo fieldInfo) {
if (requiredFields.remove(fieldInfo.name)) {
return Status.YES;
}
@@ -108,42 +107,54 @@ public void postProcess(MapperService mapperService) {
}

@Override
-public void binaryField(FieldInfo fieldInfo, byte[] value) throws IOException {
+public void binaryField(FieldInfo fieldInfo, byte[] value) {
+binaryField(fieldInfo, new BytesRef(value));
+}
+
+public void binaryField(FieldInfo fieldInfo, BytesRef value) {
if (sourceFieldName.equals(fieldInfo.name)) {
source = new BytesArray(value);
} else if (IdFieldMapper.NAME.equals(fieldInfo.name)) {
-id = Uid.decodeId(value);
+id = Uid.decodeId(value.bytes, value.offset, value.length);
} else {
-addValue(fieldInfo.name, new BytesRef(value));
+addValue(fieldInfo.name, value);
}
}

@Override
-public void stringField(FieldInfo fieldInfo, byte[] bytes) throws IOException {
+public void stringField(FieldInfo fieldInfo, byte[] bytes) {
assert IdFieldMapper.NAME.equals(fieldInfo.name) == false : "_id field must go through binaryField";
assert sourceFieldName.equals(fieldInfo.name) == false : "source field must go through binaryField";
final String value = new String(bytes, StandardCharsets.UTF_8);
addValue(fieldInfo.name, value);
}

@Override
-public void intField(FieldInfo fieldInfo, int value) throws IOException {
+public void intField(FieldInfo fieldInfo, int value) {
addValue(fieldInfo.name, value);
}

@Override
-public void longField(FieldInfo fieldInfo, long value) throws IOException {
+public void longField(FieldInfo fieldInfo, long value) {
addValue(fieldInfo.name, value);
}

@Override
-public void floatField(FieldInfo fieldInfo, float value) throws IOException {
+public void floatField(FieldInfo fieldInfo, float value) {
addValue(fieldInfo.name, value);
}

@Override
-public void doubleField(FieldInfo fieldInfo, double value) throws IOException {
+public void doubleField(FieldInfo fieldInfo, double value) {
addValue(fieldInfo.name, value);
}

+public void objectField(FieldInfo fieldInfo, Object object) {
+assert IdFieldMapper.NAME.equals(fieldInfo.name) == false : "_id field must go through binaryField";
+assert sourceFieldName.equals(fieldInfo.name) == false : "source field must go through binaryField";
+addValue(fieldInfo.name, object);
+}

public BytesReference source() {
return source;
}
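Putting the FieldsVisitor changes in context, a sketch of how these callbacks are typically driven (the reader, doc id, and mapper service are assumptions): the caller replays a document's stored fields into the visitor, then reads back the accumulated values.

// Replay one document's stored fields into a FieldsVisitor.
FieldsVisitor visitor = new FieldsVisitor(true);   // true: also capture _source
leafReader.document(docId, visitor);               // calls needsField/binaryField/... per field
visitor.postProcess(mapperService);                // resolve values against the mappings
BytesReference source = visitor.source();          // raw _source, if present
String id = visitor.id();                          // _id decoded via Uid.decodeId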
127 changes: 106 additions & 21 deletions server/src/main/java/org/elasticsearch/index/get/ShardGetService.java
@@ -19,6 +19,12 @@

package org.elasticsearch.index.get;

+import org.apache.lucene.index.DocValuesType;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.IndexOptions;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.IndexableFieldType;
+import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.Term;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
@@ -37,24 +43,29 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.TranslogLeafReader;
import org.elasticsearch.index.fieldvisitor.CustomFieldsVisitor;
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
+import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;

import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
@@ -81,16 +92,16 @@ public GetStats stats() {
public GetResult get(String type, String id, String[] gFields, boolean realtime, long version,
VersionType versionType, FetchSourceContext fetchSourceContext) {
return
-get(type, id, gFields, realtime, version, versionType, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, fetchSourceContext, false);
+get(type, id, gFields, realtime, version, versionType, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, fetchSourceContext);
}

private GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType,
-long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext, boolean readFromTranslog) {
+long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
currentMetric.inc();
try {
long now = System.nanoTime();
GetResult getResult =
-innerGet(type, id, gFields, realtime, version, versionType, ifSeqNo, ifPrimaryTerm, fetchSourceContext, readFromTranslog);
+innerGet(type, id, gFields, realtime, version, versionType, ifSeqNo, ifPrimaryTerm, fetchSourceContext);

if (getResult.isExists()) {
existsMetric.inc(System.nanoTime() - now);
@@ -105,7 +116,7 @@ private GetResult get(String type, String id, String[] gFields, boolean realtime

public GetResult getForUpdate(String type, String id, long ifSeqNo, long ifPrimaryTerm) {
return get(type, id, new String[]{RoutingFieldMapper.NAME}, true,
-Versions.MATCH_ANY, VersionType.INTERNAL, ifSeqNo, ifPrimaryTerm, FetchSourceContext.FETCH_SOURCE, true);
+Versions.MATCH_ANY, VersionType.INTERNAL, ifSeqNo, ifPrimaryTerm, FetchSourceContext.FETCH_SOURCE);
}

/**
@@ -156,7 +167,7 @@ private FetchSourceContext normalizeFetchSourceContent(@Nullable FetchSourceCont
}

private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType,
-long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext, boolean readFromTranslog) {
+long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
if (type == null || type.equals("_all")) {
DocumentMapper mapper = mapperService.documentMapper();
Expand All @@ -166,9 +177,9 @@ private GetResult innerGet(String type, String id, String[] gFields, boolean rea
Engine.GetResult get = null;
if (type != null) {
Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
-get = indexShard.get(new Engine.Get(realtime, readFromTranslog, type, id, uidTerm)
-.version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
-assert get.isFromTranslog() == false || readFromTranslog : "should only read from translog if explicitly enabled";
+get = indexShard.get(new Engine.Get(realtime, realtime, type, id, uidTerm)
+.version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
+assert get.isFromTranslog() == false || realtime : "should only read from translog if realtime enabled";
if (get.exists() == false) {
get.close();
}
@@ -186,13 +197,33 @@ private GetResult innerGet(String type, String id, String[] gFields, boolean rea
}
}

-private GetResult innerGetLoadFromStoredFields(String type, String id, String[] gFields, FetchSourceContext fetchSourceContext,
-Engine.GetResult get, MapperService mapperService) {
+private GetResult innerGetLoadFromStoredFields(String type, String id, String[] storedFields, FetchSourceContext fetchSourceContext,
+Engine.GetResult get, MapperService mapperService) {
assert get.exists() : "method should only be called if document could be retrieved";

+// check first if stored fields to be loaded don't contain an object field
+DocumentMapper docMapper = mapperService.documentMapper();
+if (storedFields != null) {
+for (String field : storedFields) {
+Mapper fieldMapper = docMapper.mappers().getMapper(field);
+if (fieldMapper == null) {
+if (docMapper.objectMappers().get(field) != null) {
+// Only fail if we know it is a object field, missing paths / fields shouldn't fail.
+throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
+}
+}
+}
+}

Map<String, DocumentField> documentFields = null;
Map<String, DocumentField> metaDataFields = null;
BytesReference source = null;
DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
-FieldsVisitor fieldVisitor = buildFieldsVisitors(gFields, fetchSourceContext);
+// force fetching source if we read from translog and need to recreate stored fields
+boolean forceSourceForComputingTranslogStoredFields = get.isFromTranslog() && storedFields != null &&
+Stream.of(storedFields).anyMatch(f -> TranslogLeafReader.ALL_FIELD_NAMES.contains(f) == false);
+FieldsVisitor fieldVisitor = buildFieldsVisitors(storedFields,
+forceSourceForComputingTranslogStoredFields ? FetchSourceContext.FETCH_SOURCE : fetchSourceContext);
if (fieldVisitor != null) {
try {
docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);
@@ -201,6 +232,54 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[]
}
source = fieldVisitor.source();

+// in case we read from translog, some extra steps are needed to make _source consistent and to load stored fields
+if (get.isFromTranslog()) {
+// Fast path: if only asked for the source or stored fields that have been already provided by TranslogLeafReader,
+// just make source consistent by reapplying source filters from mapping (possibly also nulling the source)
+if (forceSourceForComputingTranslogStoredFields == false) {
+try {
+source = indexShard.mapperService().documentMapper().sourceMapper().applyFilters(source, null);
+} catch (IOException e) {
+throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e);
+}
+} else {
+// Slow path: recreate stored fields from original source
+assert source != null : "original source in translog must exist";
+SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), type, id, source,
+XContentHelper.xContentType(source), fieldVisitor.routing());
+ParsedDocument doc = indexShard.mapperService().documentMapper().parse(sourceToParse);
+assert doc.dynamicMappingsUpdate() == null : "mapping updates should not be required on already-indexed doc";
+// update special fields
+doc.updateSeqID(docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);
+doc.version().setLongValue(docIdAndVersion.version);
+
+// retrieve stored fields from parsed doc
+fieldVisitor = buildFieldsVisitors(storedFields, fetchSourceContext);
+for (IndexableField indexableField : doc.rootDoc().getFields()) {
+IndexableFieldType fieldType = indexableField.fieldType();
+if (fieldType.stored()) {
+FieldInfo fieldInfo = new FieldInfo(indexableField.name(), 0, false, false, false, IndexOptions.NONE,
+DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false);
+StoredFieldVisitor.Status status = fieldVisitor.needsField(fieldInfo);
+if (status == StoredFieldVisitor.Status.YES) {
+if (indexableField.binaryValue() != null) {
+fieldVisitor.binaryField(fieldInfo, indexableField.binaryValue());
+} else if (indexableField.stringValue() != null) {
+fieldVisitor.objectField(fieldInfo, indexableField.stringValue());
+} else if (indexableField.numericValue() != null) {
+fieldVisitor.objectField(fieldInfo, indexableField.numericValue());
+}
+} else if (status == StoredFieldVisitor.Status.STOP) {
+break;
+}
+}
+}
+// retrieve source (with possible transformations, e.g. source filters
+source = fieldVisitor.source();
+}
+}
+
+// put stored fields into result objects
if (!fieldVisitor.fields().isEmpty()) {
fieldVisitor.postProcess(mapperService);
documentFields = new HashMap<>();
@@ -215,16 +294,22 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[]
}
}

-DocumentMapper docMapper = mapperService.documentMapper();
-
-if (gFields != null && gFields.length > 0) {
-for (String field : gFields) {
-Mapper fieldMapper = docMapper.mappers().getMapper(field);
-if (fieldMapper == null) {
-if (docMapper.objectMappers().get(field) != null) {
-// Only fail if we know it is a object field, missing paths / fields shouldn't fail.
-throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
-}
+if (source != null) {
+// apply request-level source filtering
+if (fetchSourceContext.fetchSource() == false) {
+source = null;
+} else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
+Map<String, Object> sourceAsMap;
+// TODO: The source might be parsed and available in the sourceLookup but that one uses unordered maps so different.
+// Do we care?
+Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
+XContentType sourceContentType = typeMapTuple.v1();
+sourceAsMap = typeMapTuple.v2();
+sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
+try {
+source = BytesReference.bytes(XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap));
+} catch (IOException e) {
+throw new ElasticsearchException("Failed to get id [" + id + "] with includes/excludes set", e);
+}
}
}
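Condensed, the new translog-read logic in innerGetLoadFromStoredFields boils down to a two-way decision; a simplified pseudocode-style sketch of the control flow (not the literal code):

// Simplified: serving a realtime GET whose Engine.GetResult came from the translog.
boolean needsReparse = storedFields != null &&
        Stream.of(storedFields).anyMatch(f -> TranslogLeafReader.ALL_FIELD_NAMES.contains(f) == false);
if (get.isFromTranslog()) {
    if (needsReparse == false) {
        // Fast path: _source/_id/_routing are already available from the
        // TranslogLeafReader; just re-apply mapping-level source filters so
        // the result matches what an index read would have returned.
        source = docMapper.sourceMapper().applyFilters(source, null);
    } else {
        // Slow path: reparse the original _source through the mappings and
        // feed the stored fields of the parsed document back into the visitor.
        ParsedDocument doc = docMapper.parse(sourceToParse);
        // ... iterate doc.rootDoc().getFields(), offering each stored field
        // to fieldVisitor.needsField(...) as in the commit above.
    }
}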