Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add query parameters to delete request cache on heap and disk selectively #15

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ static Request clearCache(ClearIndicesCacheRequest clearIndicesCacheRequest) {
parameters.putParam("fielddata", Boolean.toString(clearIndicesCacheRequest.fieldDataCache()));
parameters.putParam("file", Boolean.toString(clearIndicesCacheRequest.fileCache()));
parameters.putParam("request", Boolean.toString(clearIndicesCacheRequest.requestCache()));
parameters.putParam("requestOnDisk", Boolean.toString(clearIndicesCacheRequest.requestCacheOnDisk()));
parameters.putParam("requestOnHeap", Boolean.toString(clearIndicesCacheRequest.requestCacheOnHeap()));
parameters.putParam("fields", String.join(",", clearIndicesCacheRequest.fields()));
request.addParameters(parameters.asMap());
return request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,94 @@ public void testClearCache() throws IOException {
}
}

public void testClearRequestCacheOnDisk() throws IOException {
{
String index = "index";
Settings settings = Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0).build();
createIndex(index, settings);
ClearIndicesCacheRequest clearCacheRequest = new ClearIndicesCacheRequest(index);
clearCacheRequest.requestCacheOnDisk(true);
ClearIndicesCacheResponse clearCacheResponse = execute(
clearCacheRequest,
highLevelClient().indices()::clearCache,
highLevelClient().indices()::clearCacheAsync
);
assertThat(clearCacheResponse.getTotalShards(), equalTo(1));
assertThat(clearCacheResponse.getSuccessfulShards(), equalTo(1));
assertThat(clearCacheResponse.getFailedShards(), equalTo(0));
assertThat(clearCacheResponse.getShardFailures(), equalTo(BroadcastResponse.EMPTY));
}
{
String nonExistentIndex = "non_existent_index";
assertFalse(indexExists(nonExistentIndex));
ClearIndicesCacheRequest clearCacheRequest = new ClearIndicesCacheRequest(nonExistentIndex);
OpenSearchException exception = expectThrows(
OpenSearchException.class,
() -> execute(clearCacheRequest, highLevelClient().indices()::clearCache, highLevelClient().indices()::clearCacheAsync)
);
assertEquals(RestStatus.NOT_FOUND, exception.status());
}
}

public void testClearRequestCacheOnHeap() throws IOException {
{
String index = "index";
Settings settings = Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0).build();
createIndex(index, settings);
ClearIndicesCacheRequest clearCacheRequest = new ClearIndicesCacheRequest(index);
clearCacheRequest.requestCacheOnHeap(true);
ClearIndicesCacheResponse clearCacheResponse = execute(
clearCacheRequest,
highLevelClient().indices()::clearCache,
highLevelClient().indices()::clearCacheAsync
);
assertThat(clearCacheResponse.getTotalShards(), equalTo(1));
assertThat(clearCacheResponse.getSuccessfulShards(), equalTo(1));
assertThat(clearCacheResponse.getFailedShards(), equalTo(0));
assertThat(clearCacheResponse.getShardFailures(), equalTo(BroadcastResponse.EMPTY));
}
{
String nonExistentIndex = "non_existent_index";
assertFalse(indexExists(nonExistentIndex));
ClearIndicesCacheRequest clearCacheRequest = new ClearIndicesCacheRequest(nonExistentIndex);
OpenSearchException exception = expectThrows(
OpenSearchException.class,
() -> execute(clearCacheRequest, highLevelClient().indices()::clearCache, highLevelClient().indices()::clearCacheAsync)
);
assertEquals(RestStatus.NOT_FOUND, exception.status());
}
}

public void testClearRequestCacheOnHeapAndOnDisk() throws IOException {
{
String index = "index";
Settings settings = Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0).build();
createIndex(index, settings);
ClearIndicesCacheRequest clearCacheRequest = new ClearIndicesCacheRequest(index);
clearCacheRequest.requestCacheOnDisk(true);
clearCacheRequest.requestCacheOnHeap(true);
ClearIndicesCacheResponse clearCacheResponse = execute(
clearCacheRequest,
highLevelClient().indices()::clearCache,
highLevelClient().indices()::clearCacheAsync
);
assertThat(clearCacheResponse.getTotalShards(), equalTo(1));
assertThat(clearCacheResponse.getSuccessfulShards(), equalTo(1));
assertThat(clearCacheResponse.getFailedShards(), equalTo(0));
assertThat(clearCacheResponse.getShardFailures(), equalTo(BroadcastResponse.EMPTY));
}
{
String nonExistentIndex = "non_existent_index";
assertFalse(indexExists(nonExistentIndex));
ClearIndicesCacheRequest clearCacheRequest = new ClearIndicesCacheRequest(nonExistentIndex);
OpenSearchException exception = expectThrows(
OpenSearchException.class,
() -> execute(clearCacheRequest, highLevelClient().indices()::clearCache, highLevelClient().indices()::clearCacheAsync)
);
assertEquals(RestStatus.NOT_FOUND, exception.status());
}
}

public void testForceMerge() throws IOException {
{
String index = "index";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,14 @@ public void testClearCache() {
clearIndicesCacheRequest.fields(RequestConvertersTests.randomIndicesNames(1, 5));
expectedParams.put("fields", String.join(",", clearIndicesCacheRequest.fields()));
}
if (OpenSearchTestCase.randomBoolean()) {
clearIndicesCacheRequest.requestCacheOnDisk(OpenSearchTestCase.randomBoolean());
}
expectedParams.put("requestOnDisk", Boolean.toString(clearIndicesCacheRequest.requestCacheOnDisk()));
if (OpenSearchTestCase.randomBoolean()) {
clearIndicesCacheRequest.requestCacheOnHeap(OpenSearchTestCase.randomBoolean());
}
expectedParams.put("requestOnHeap", Boolean.toString(clearIndicesCacheRequest.requestCacheOnHeap()));
if (OpenSearchTestCase.randomBoolean()) {
clearIndicesCacheRequest.fileCache(OpenSearchTestCase.randomBoolean());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,14 @@ public void testClearCache() throws Exception {
request.requestCache(true); // <1>
// end::clear-cache-request-request

// tag::clear-cache-request-onDisk
request.requestCacheOnDisk(true); // <1>
// end::clear-cache-request-onDisk

// tag::clear-cache-request-onHeap
request.requestCacheOnHeap(true); // <1>
// end::clear-cache-request-onHeap

// tag::clear-cache-request-fielddata
request.fieldDataCache(true); // <1>
// end::clear-cache-request-fielddata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,54 @@ public void testDiskTierInvalidationByCleanCacheAPI() throws Exception {
assertEquals(0, entries);
}

public void testDiskTierInvalidationByCleanCacheAPI_DiskOnly() throws Exception {
int cleanupIntervalInMillis = 10_000_000; // setting this intentionally high so that we don't get background cleanups
int heapSizeBytes = 9876;
String node = internalCluster().startNode(
Settings.builder()
.put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(cleanupIntervalInMillis))
.put(IndicesService.INDICES_REQUEST_CACHE_DISK_CLEAN_THRESHOLD_SETTING.getKey(), "0%")
.put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes))
.put(DiskTierTookTimePolicy.DISK_TOOKTIME_THRESHOLD_SETTING.getKey(), TimeValue.ZERO) // allow into disk cache regardless of took time
);
Client client = client(node);

Settings.Builder indicesSettingBuilder = Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0);

assertAcked(
client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get()
);

indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
ensureSearchable("index");
client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get();
int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP);
assertTrue(heapSizeBytes > requestSize);
// If this fails, increase heapSizeBytes! We can't adjust it after getting the size of one query
// as the cache size setting is not dynamic
int numOnDisk = 2;
int numRequests = heapSizeBytes / requestSize + numOnDisk;
for (int i = 1; i < numRequests; i++) {
SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get();
assertSearchResponse(resp);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false);
IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false);
}
// make sure we have 2 entries in disk.
IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 2, TierType.DISK);

// call clear cache api
client.admin().indices().prepareClearCache().setIndices("index").setRequestCacheOnDisk(true).get();

// make sure we have 0 entries in disk.
IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 0, TierType.DISK);
// make sure we have entries on-heap were not deleted
IndicesRequestCacheIT.assertNumCacheEntries(client, "index", 14, TierType.ON_HEAP);
}

// When entire disk tier is stale, test whether cache cleaner cleans up everything from disk
public void testDiskTierInvalidationByCacheCleanerEntireDiskTier() throws Exception {
int thresholdInMillis = 4_000;
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/opensearch/action/InputValidator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action;

public interface InputValidator {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like something like this must already exist somewhere... seems like a general problem that many APIs would have to solve

void validateInput() throws IllegalArgumentException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.action.admin.indices.cache.clear;

import org.opensearch.Version;
import org.opensearch.action.InputValidator;
import org.opensearch.action.support.broadcast.BroadcastRequest;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
Expand All @@ -45,12 +46,14 @@
*
* @opensearch.internal
*/
public class ClearIndicesCacheRequest extends BroadcastRequest<ClearIndicesCacheRequest> {
public class ClearIndicesCacheRequest extends BroadcastRequest<ClearIndicesCacheRequest> implements InputValidator {

private boolean queryCache = false;
private boolean fieldDataCache = false;
private boolean requestCache = false;
private boolean fileCache = false;
private boolean requestCacheOnDisk = false;
private boolean requestCacheOnHeap = false;
private String[] fields = Strings.EMPTY_ARRAY;

public ClearIndicesCacheRequest(StreamInput in) throws IOException {
Expand All @@ -59,6 +62,12 @@ public ClearIndicesCacheRequest(StreamInput in) throws IOException {
fieldDataCache = in.readBoolean();
fields = in.readStringArray();
requestCache = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
requestCacheOnDisk = in.readBoolean();
}
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
requestCacheOnHeap = in.readBoolean();
}
if (in.getVersion().onOrAfter(Version.V_2_8_0)) {
fileCache = in.readBoolean();
}
Expand All @@ -81,11 +90,29 @@ public boolean requestCache() {
return this.requestCache;
}

public boolean requestCacheOnDisk() {
return this.requestCacheOnDisk;
}

public boolean requestCacheOnHeap() {
return this.requestCacheOnHeap;
}

public ClearIndicesCacheRequest requestCache(boolean requestCache) {
this.requestCache = requestCache;
return this;
}

public ClearIndicesCacheRequest requestCacheOnDisk(boolean requestCacheOnDisk) {
this.requestCacheOnDisk = requestCacheOnDisk;
return this;
}

public ClearIndicesCacheRequest requestCacheOnHeap(boolean requestCacheOnHeap) {
this.requestCacheOnHeap = requestCacheOnHeap;
return this;
}

public boolean fieldDataCache() {
return this.fieldDataCache;
}
Expand Down Expand Up @@ -120,8 +147,23 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(fieldDataCache);
out.writeStringArrayNullable(fields);
out.writeBoolean(requestCache);
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeBoolean(requestCacheOnDisk);
}
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeBoolean(requestCacheOnHeap);
}
kiranprakash154 marked this conversation as resolved.
Show resolved Hide resolved
if (out.getVersion().onOrAfter(Version.V_2_8_0)) {
out.writeBoolean(fileCache);
}
}

public void validateInput() {
if (requestCache && requestCacheOnDisk) {
throw new IllegalArgumentException("Invalid parameters: cannot have both request and requestCacheOnDisk set to true");
}
if (requestCache && requestCacheOnHeap) {
throw new IllegalArgumentException("Invalid parameters: cannot have both request and requestCacheOnHeap set to true");
}
}
Comment on lines +161 to +168
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets remove this for now and keep it consistent with rest of the actions. Move this logic to the actual function.
Doesn't look much useful to me.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the validity of the ClearIndicesCacheRequest should be scoped to the ClearIndicesCacheRequest class right ?
the caller of this class i.e RestClearIndicesCacheAction should not be concerned about that.

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ public ClearIndicesCacheRequestBuilder setRequestCache(boolean requestCache) {
return this;
}

public ClearIndicesCacheRequestBuilder setRequestCacheOnDisk(boolean requestCacheOnDisk) {
request.requestCacheOnDisk(requestCacheOnDisk);
return this;
}

public ClearIndicesCacheRequestBuilder setRequestCacheOnHeap(boolean requestCacheOnHeap) {
request.requestCacheOnHeap(requestCacheOnHeap);
return this;
}

public ClearIndicesCacheRequestBuilder setFileCache(boolean fileCache) {
request.fileCache(fileCache);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ protected EmptyResult shardOperation(ClearIndicesCacheRequest request, ShardRout
request.queryCache(),
request.fieldDataCache(),
request.requestCache(),
request.requestCacheOnDisk(),
request.requestCacheOnHeap(),
request.fields()
);
return EmptyResult.INSTANCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,16 +205,32 @@ public void close() {
}

void clear(CacheEntity entity) {
clear(entity, true, true);
}

void clearDiskOnly(CacheEntity entity) {
clear(entity, false, true);
}

void clearHeapOnly(CacheEntity entity) {
clear(entity, true, false);
}

private void clear(CacheEntity entity, boolean cleanHeap, boolean cleanDisk) {
CleanupKey cleanupKey = new CleanupKey(entity, null);
keysToClean.put(cleanupKey, new CleanupStatus());
updateStaleKeysInDiskCount(cleanupKey);
cleanCache();
if (cleanHeap) {
cleanCache();
}
if (cleanDisk) {
/*
this would be triggered by the cache clear API call
we need to make sure we clean the disk cache as well
hence passing threshold as 0
*/
cleanDiskCache(0);
cleanDiskCache(0);
}
}

@Override
Expand Down
30 changes: 24 additions & 6 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1959,13 +1959,31 @@ private QueryRewriteContext getRewriteContext(LongSupplier nowInMillis, boolean
/**
* Clears the caches for the given shard id if the shard is still allocated on this node
*/
public void clearIndexShardCache(ShardId shardId, boolean queryCache, boolean fieldDataCache, boolean requestCache, String... fields) {
public void clearIndexShardCache(
ShardId shardId,
boolean queryCache,
boolean fieldDataCache,
boolean requestCache,
boolean requestCacheOnDisk,
boolean requestCacheOnHeap,
String... fields) {
final IndexService service = indexService(shardId.getIndex());
if (service != null) {
IndexShard shard = service.getShardOrNull(shardId.id());
final boolean clearedAtLeastOne = service.clearCaches(queryCache, fieldDataCache, fields);
if ((requestCache || (clearedAtLeastOne == false && fields.length == 0)) && shard != null) {
indicesRequestCache.clear(new IndexShardCacheEntity(shard));
if (service == null) {
return;
}
IndexShard shard = service.getShardOrNull(shardId.id());
final boolean clearedAtLeastOne = service.clearCaches(queryCache, fieldDataCache, fields);

if ((requestCache || (clearedAtLeastOne == false && fields.length == 0)) && shard != null) {
IndexShardCacheEntity indexShardCacheEntity = new IndexShardCacheEntity(shard);
if(requestCacheOnDisk && requestCacheOnHeap) {
indicesRequestCache.clear(indexShardCacheEntity);
} else if(requestCacheOnDisk) {
indicesRequestCache.clearDiskOnly(indexShardCacheEntity);
} else if (requestCacheOnHeap) {
indicesRequestCache.clearHeapOnly(indexShardCacheEntity);
} else {
indicesRequestCache.clear(indexShardCacheEntity);
}
}
}
Expand Down
Loading
Loading