[Remove] Segment memory estimation and tracking (#2029)
Lucene 9 removed CodecReader#ramBytesUsed, and its file formats no longer
consume large amounts of memory. As a result, RAM estimation for segments is no
longer possible and is removed by this commit. Backwards compatibility is
retained for the transport layer.

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
nknize committed Feb 4, 2022
1 parent 1e5d983 commit fc0d3a3
Showing 21 changed files with 80 additions and 591 deletions.
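
None of the hunks shown below touch the wire format, but the commit message's note about transport-layer backwards compatibility suggests the usual OpenSearch pattern: nodes on older versions still exchange the removed memory field, pinned to zero. A minimal, hypothetical sketch of that pattern (the version cutoff, field names, and class are assumptions, not this commit's literal code):

```java
import java.io.IOException;

import org.opensearch.Version;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

// Hypothetical sketch: newer nodes no longer track segment memory, but still
// read/write a zero placeholder when the remote node is older and expects it.
final class SegmentsStatsWireSketch {
    private long count; // the segment count survives; memoryInBytes does not

    void readFrom(StreamInput in) throws IOException {
        count = in.readVLong();
        if (in.getVersion().before(Version.V_2_0_0)) { // assumed cutover version
            in.readLong(); // discard memory_in_bytes sent by an older node
        }
    }

    void writeTo(StreamOutput out) throws IOException {
        out.writeVLong(count);
        if (out.getVersion().before(Version.V_2_0_0)) { // assumed cutover version
            out.writeLong(0L); // placeholder for older nodes that expect the field
        }
    }
}
```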
@@ -36,8 +36,6 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
-import org.opensearch.action.admin.cluster.node.stats.NodeStats;
-import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
@@ -57,7 +55,6 @@
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.Strings;
import org.opensearch.common.UUIDs;
-import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.common.settings.Settings;
@@ -76,7 +73,6 @@
import org.opensearch.index.engine.CommitStats;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.NoOpEngine;
-import org.opensearch.index.engine.SegmentsStats;
import org.opensearch.index.flush.FlushStats;
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
@@ -86,10 +82,8 @@
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.breaker.CircuitBreakerService;
-import org.opensearch.indices.breaker.CircuitBreakerStats;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
-import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.DummyShardLock;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
@@ -122,7 +116,6 @@
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
-import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.BREAKER;
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.NONE;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
@@ -135,13 +128,11 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;

import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.hamcrest.Matchers.notNullValue;

public class IndexShardIT extends OpenSearchSingleNodeTestCase {

@@ -643,86 +634,6 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResul
}
}

-/** Check that the accounting breaker correctly matches the segments API for memory usage */
-private void checkAccountingBreaker() {
-CircuitBreakerService breakerService = getInstanceFromNode(CircuitBreakerService.class);
-CircuitBreaker acctBreaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING);
-long usedMem = acctBreaker.getUsed();
-assertThat(usedMem, greaterThan(0L));
-NodesStatsResponse response = client().admin().cluster().prepareNodesStats().setIndices(true).addMetric(BREAKER.metricName()).get();
-NodeStats stats = response.getNodes().get(0);
-assertNotNull(stats);
-SegmentsStats segmentsStats = stats.getIndices().getSegments();
-CircuitBreakerStats breakerStats = stats.getBreaker().getStats(CircuitBreaker.ACCOUNTING);
-assertEquals(usedMem, segmentsStats.getMemoryInBytes());
-assertEquals(usedMem, breakerStats.getEstimated());
-}
-
-public void testCircuitBreakerIncrementedByIndexShard() throws Exception {
-client().admin()
-.cluster()
-.prepareUpdateSettings()
-.setTransientSettings(Settings.builder().put("network.breaker.inflight_requests.overhead", 0.0))
-.get();
-
-// Generate a couple of segments
-client().prepareIndex("test", "_doc", "1")
-.setSource("{\"foo\":\"" + randomAlphaOfLength(100) + "\"}", XContentType.JSON)
-.setRefreshPolicy(IMMEDIATE)
-.get();
-// Use routing so 2 documents are guaranteed to be on the same shard
-String routing = randomAlphaOfLength(5);
-client().prepareIndex("test", "_doc", "2")
-.setSource("{\"foo\":\"" + randomAlphaOfLength(100) + "\"}", XContentType.JSON)
-.setRefreshPolicy(IMMEDIATE)
-.setRouting(routing)
-.get();
-client().prepareIndex("test", "_doc", "3")
-.setSource("{\"foo\":\"" + randomAlphaOfLength(100) + "\"}", XContentType.JSON)
-.setRefreshPolicy(IMMEDIATE)
-.setRouting(routing)
-.get();
-
-checkAccountingBreaker();
-// Test that force merging causes the breaker to be correctly adjusted
-logger.info("--> force merging to a single segment");
-client().admin().indices().prepareForceMerge("test").setMaxNumSegments(1).setFlush(randomBoolean()).get();
-client().admin().indices().prepareRefresh().get();
-checkAccountingBreaker();
-
-client().admin()
-.cluster()
-.prepareUpdateSettings()
-.setTransientSettings(Settings.builder().put("indices.breaker.total.limit", "1kb"))
-.get();
-
-// Test that we're now above the parent limit due to the segments
-Exception e = expectThrows(
-Exception.class,
-() -> client().prepareSearch("test").addAggregation(AggregationBuilders.terms("foo_terms").field("foo.keyword")).get()
-);
-logger.info("--> got an expected exception", e);
-assertThat(e.getCause(), notNullValue());
-assertThat(e.getCause().getMessage(), containsString("[parent] Data too large, data for [<agg [foo_terms]>]"));
-
-client().admin()
-.cluster()
-.prepareUpdateSettings()
-.setTransientSettings(
-Settings.builder().putNull("indices.breaker.total.limit").putNull("network.breaker.inflight_requests.overhead")
-)
-.get();
-
-// Test that deleting the index causes the breaker to correctly be decremented
-logger.info("--> deleting index");
-client().admin().indices().prepareDelete("test").get();
-
-// Accounting breaker should now be 0
-CircuitBreakerService breakerService = getInstanceFromNode(CircuitBreakerService.class);
-CircuitBreaker acctBreaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING);
-assertThat(acctBreaker.getUsed(), equalTo(0L));
-}

public static final IndexShard recoverShard(IndexShard newShard) throws IOException {
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));

@@ -828,7 +828,6 @@ public void testSegmentsStats() {

assertThat(stats.getTotal().getSegments(), notNullValue());
assertThat(stats.getTotal().getSegments().getCount(), equalTo((long) test1.totalNumShards));
-assertThat(stats.getTotal().getSegments().getMemoryInBytes(), greaterThan(0L));
}

public void testAllFlags() throws Exception {

@@ -139,7 +139,7 @@ protected void addCustomXContentFields(XContentBuilder builder, Params params) t
builder.field(Fields.NUM_DOCS, segment.getNumDocs());
builder.field(Fields.DELETED_DOCS, segment.getDeletedDocs());
builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, segment.getSize());
-builder.humanReadableField(Fields.MEMORY_IN_BYTES, Fields.MEMORY, new ByteSizeValue(segment.getMemoryInBytes()));
+builder.humanReadableField(Fields.MEMORY_IN_BYTES, Fields.MEMORY, segment.getZeroMemory());
builder.field(Fields.COMMITTED, segment.isCommitted());
builder.field(Fields.SEARCH, segment.isSearch());
if (segment.getVersion() != null) {
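
The segment.getZeroMemory() accessor introduced above is not defined in any hunk shown here; presumably it keeps the memory_in_bytes field in the segments API response for compatibility while pinning it to zero. A minimal sketch of what such an accessor might look like (assumed, not the commit's literal code):

```java
// Assumed shape of the accessor used above: the REST field survives for
// response compatibility but always reports zero, since per-segment RAM
// is no longer estimated.
public ByteSizeValue getZeroMemory() {
    return new ByteSizeValue(0L);
}
```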

@@ -493,7 +493,7 @@ public RecoveryStats getRecoveryStats() {

/**
* Utility method which computes total memory by adding
-* FieldData, PercolatorCache, Segments (memory, index writer, version map)
+* FieldData, PercolatorCache, Segments (index writer, version map)
*/
public ByteSizeValue getTotalMemory() {
long size = 0;
@@ -504,8 +504,7 @@ public ByteSizeValue getTotalMemory() {
size += this.getQueryCache().getMemorySizeInBytes();
}
if (this.getSegments() != null) {
-size += this.getSegments().getMemoryInBytes() + this.getSegments().getIndexWriterMemoryInBytes() + this.getSegments()
-.getVersionMapMemoryInBytes();
+size += this.getSegments().getIndexWriterMemoryInBytes() + this.getSegments().getVersionMapMemoryInBytes();
}

return new ByteSizeValue(size);
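
Pieced together from the two fragments above, the post-change method presumably reads as follows; the FieldData branch is folded away in the hunk and reconstructed here from the javadoc, so treat this as a sketch rather than the literal file contents:

```java
public ByteSizeValue getTotalMemory() {
    long size = 0;
    if (this.getFieldData() != null) { // reconstructed from the javadoc
        size += this.getFieldData().getMemorySizeInBytes();
    }
    if (this.getQueryCache() != null) {
        size += this.getQueryCache().getMemorySizeInBytes();
    }
    if (this.getSegments() != null) {
        // segment memory itself is gone; only the index-writer and
        // version-map buffers still contribute
        size += this.getSegments().getIndexWriterMemoryInBytes() + this.getSegments().getVersionMapMemoryInBytes();
    }
    return new ByteSizeValue(size);
}
```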
19 changes: 1 addition & 18 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
@@ -51,7 +51,6 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
-import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.SetOnce;
import org.opensearch.ExceptionsHelper;
@@ -161,14 +160,6 @@ protected Engine(EngineConfig engineConfig) {
this.eventListener = engineConfig.getEventListener();
}

-/** Returns 0 in the case where accountable is null, otherwise returns {@code ramBytesUsed()} */
-protected static long guardedRamBytesUsed(Accountable a) {
-if (a == null) {
-return 0;
-}
-return a.ramBytesUsed();
-}

public final EngineConfig config() {
return engineConfig;
}
@@ -875,14 +866,7 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl
}

protected void fillSegmentStats(SegmentReader segmentReader, boolean includeSegmentFileSizes, SegmentsStats stats) {
-stats.add(1, segmentReader.ramBytesUsed());
-stats.addTermsMemoryInBytes(guardedRamBytesUsed(segmentReader.getPostingsReader()));
-stats.addStoredFieldsMemoryInBytes(guardedRamBytesUsed(segmentReader.getFieldsReader()));
-stats.addTermVectorsMemoryInBytes(guardedRamBytesUsed(segmentReader.getTermVectorsReader()));
-stats.addNormsMemoryInBytes(guardedRamBytesUsed(segmentReader.getNormsReader()));
-stats.addPointsMemoryInBytes(guardedRamBytesUsed(segmentReader.getPointsReader()));
-stats.addDocValuesMemoryInBytes(guardedRamBytesUsed(segmentReader.getDocValuesReader()));
-
+stats.add(1);
if (includeSegmentFileSizes) {
// TODO: consider moving this to StoreStats
stats.addFileSizes(getSegmentFileSizes(segmentReader));
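
The matching change to SegmentsStats is not among the hunks shown, but the call-site change from stats.add(1, segmentReader.ramBytesUsed()) to stats.add(1) implies the overload lost its memory argument, presumably something like:

```java
// Assumed post-change overload in SegmentsStats: only the segment count is
// accumulated, since per-segment RAM is no longer estimated.
public void add(long count) {
    this.count += count;
}
```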
@@ -1048,7 +1032,6 @@ private void fillSegmentInfo(SegmentReader segmentReader, boolean verbose, boole
} catch (IOException e) {
logger.trace(() -> new ParameterizedMessage("failed to get size for [{}]", info.info.name), e);
}
-segment.memoryInBytes = segmentReader.ramBytesUsed();
segment.segmentSort = info.info.getIndexSort();
if (verbose) {
segment.ramTree = Accountables.namedAccountable("root", segmentReader);

@@ -701,10 +701,7 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external
DirectoryReader.open(indexWriter),
shardId
);
-internalReaderManager = new OpenSearchReaderManager(
-directoryReader,
-new RamAccountingRefreshListener(engineConfig.getCircuitBreakerService())
-);
+internalReaderManager = new OpenSearchReaderManager(directoryReader);
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
ExternalReaderManager externalReaderManager = new ExternalReaderManager(internalReaderManager, externalRefreshListener);
success = true;

@@ -33,7 +33,6 @@
package org.opensearch.index.engine;

import java.io.IOException;
-import java.util.function.BiConsumer;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.ReferenceManager;
@@ -52,23 +51,15 @@
*/
@SuppressForbidden(reason = "reference counting is required here")
class OpenSearchReaderManager extends ReferenceManager<OpenSearchDirectoryReader> {
-private final BiConsumer<OpenSearchDirectoryReader, OpenSearchDirectoryReader> refreshListener;
-
/**
* Creates and returns a new OpenSearchReaderManager from the given
* already-opened {@link OpenSearchDirectoryReader}, stealing
* the incoming reference.
*
* @param reader the directoryReader to use for future reopens
-* @param refreshListener A consumer that is called every time a new reader is opened
*/
-OpenSearchReaderManager(
-OpenSearchDirectoryReader reader,
-BiConsumer<OpenSearchDirectoryReader, OpenSearchDirectoryReader> refreshListener
-) {
+OpenSearchReaderManager(OpenSearchDirectoryReader reader) {
this.current = reader;
-this.refreshListener = refreshListener;
-refreshListener.accept(current, null);
}

@Override
@@ -79,9 +70,6 @@ protected void decRef(OpenSearchDirectoryReader reference) throws IOException {
@Override
protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader referenceToRefresh) throws IOException {
final OpenSearchDirectoryReader reader = (OpenSearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh);
-if (reader != null) {
-refreshListener.accept(reader, referenceToRefresh);
-}
return reader;
}
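
With the refresh listener gone, the class behaves like a plain Lucene ReferenceManager. A hedged usage sketch of the simplified lifecycle (the directoryReader variable is assumed):

```java
// No accounting hook runs on reopen anymore; standard ReferenceManager
// semantics apply. The constructor steals the incoming reader reference.
OpenSearchReaderManager manager = new OpenSearchReaderManager(directoryReader);
manager.maybeRefresh(); // reopens via DirectoryReader.openIfChanged when needed
OpenSearchDirectoryReader reader = manager.acquire();
try {
    // run searches against the acquired reader
} finally {
    manager.release(reader);
}
```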


This file was deleted.

