Skip to content

Commit

Permalink
Integrates KNN plugin with ConcurrentSearchRequestDecider interface
Browse files Browse the repository at this point in the history
This allows knn queries to enable concurrency when index.search.concurrent_segment_search.mode or
search.concurrent_segment_search.mode in auto mode. Without this the
default behavior of auto mode is non-concurrent search

Signed-off-by: Tejas Shah <shatejas@amazon.com>
  • Loading branch information
shatejas committed Sep 18, 2024
1 parent 004fcc0 commit 921e5d7
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
### Features
### Enhancements
* Adds concurrent segment search support for mode auto [#2111](https://github.com/opensearch-project/k-NN/pull/2111)
### Bug Fixes
* Add DocValuesProducers for releasing memory when close index [#1946](https://github.com/opensearch-project/k-NN/pull/1946)
### Infrastructure
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/opensearch/knn/plugin/KNNPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.indices.SystemIndexDescriptor;
import org.opensearch.knn.index.KNNCircuitBreaker;
import org.opensearch.knn.plugin.search.KNNConcurrentSearchRequestDecider;
import org.opensearch.knn.index.util.KNNClusterUtil;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.knn.index.KNNSettings;
Expand Down Expand Up @@ -95,6 +96,7 @@
import org.opensearch.script.ScriptContext;
import org.opensearch.script.ScriptEngine;
import org.opensearch.script.ScriptService;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -349,4 +351,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return ImmutableList.of(new SystemIndexDescriptor(MODEL_INDEX_NAME, "Index for storing models used for k-NN indices"));
}

@Override
public Optional<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactory() {
return Optional.of(new KNNConcurrentSearchRequestDecider.Factory());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.plugin.search;

import lombok.EqualsAndHashCode;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.search.deciders.ConcurrentSearchDecision;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;

import java.util.Optional;

/**
* Decides if the knn query uses concurrent segment search
* As of 2.17, this is only used when
* - "index.search.concurrent_segment_search.mode": "auto" or
* - "search.concurrent_segment_search.mode": "auto"
*
* Note: the class is not thread-safe and a new instance needs to be created for each request
*/
@EqualsAndHashCode(callSuper = true)
public class KNNConcurrentSearchRequestDecider extends ConcurrentSearchRequestDecider {

private static final ConcurrentSearchDecision DEFAULT_KNN_DECISION = new ConcurrentSearchDecision(
ConcurrentSearchDecision.DecisionStatus.NO_OP,
"Default decision"
);
private static final ConcurrentSearchDecision YES = new ConcurrentSearchDecision(
ConcurrentSearchDecision.DecisionStatus.YES,
"Enable concurrent search for knn as Query has k-NN query in it and index is k-nn index"
);

private ConcurrentSearchDecision knnDecision = DEFAULT_KNN_DECISION;

@Override
public void evaluateForQuery(final QueryBuilder queryBuilder, final IndexSettings indexSettings) {
if (queryBuilder instanceof KNNQueryBuilder && indexSettings.getValue(KNNSettings.IS_KNN_INDEX_SETTING)) {
knnDecision = YES;
} else {
knnDecision = DEFAULT_KNN_DECISION;
}
}

@Override
public ConcurrentSearchDecision getConcurrentSearchDecision() {
return knnDecision;
}

/**
* Returns {@link KNNConcurrentSearchRequestDecider} when index.knn is true
*/
public static class Factory implements ConcurrentSearchRequestDecider.Factory {
public Optional<ConcurrentSearchRequestDecider> create(final IndexSettings indexSettings) {
if (indexSettings.getValue(KNNSettings.IS_KNN_INDEX_SETTING)) {
return Optional.of(new KNNConcurrentSearchRequestDecider());
}
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.integ.search;

import com.google.common.primitives.Floats;
import lombok.SneakyThrows;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.junit.BeforeClass;
import org.opensearch.client.Response;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.knn.KNNJsonIndexMappingsBuilder;
import org.opensearch.knn.KNNRestTestCase;
import org.opensearch.knn.KNNResult;
import org.opensearch.knn.TestUtils;
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.knn.plugin.script.KNNScoringUtil;

import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import static org.opensearch.knn.common.KNNConstants.METHOD_HNSW;

/**
* Note that this is simply a sanity test to make sure that concurrent search code path is hit E2E and scores are intact
* There is no latency verification as it can be better encapsulated in nightly runs.
*/
public class ConcurrentSegmentSearchIT extends KNNRestTestCase {

static TestUtils.TestData testData;

@BeforeClass
public static void setUpClass() throws IOException {
if (ConcurrentSegmentSearchIT.class.getClassLoader() == null) {
throw new IllegalStateException("ClassLoader of ConcurrentSegmentSearchIT Class is null");
}
URL testIndexVectors = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_vectors_1000x128.json");
URL testQueries = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_queries_100x128.csv");
assert testIndexVectors != null;
assert testQueries != null;
testData = new TestUtils.TestData(testIndexVectors.getPath(), testQueries.getPath());
}

@SneakyThrows
public void testConcurrentSegmentSearch_thenSucceed() {
String indexName = "test-concurrent-segment";
String fieldName = "test-field-1";
int dimension = testData.indexData.vectors[0].length;
final XContentBuilder indexBuilder = createFaissHnswIndexMapping(fieldName, dimension);
Map<String, Object> mappingMap = xContentBuilderToMap(indexBuilder);
String mapping = indexBuilder.toString();
createKnnIndex(indexName, mapping);
assertEquals(new TreeMap<>(mappingMap), new TreeMap<>(getIndexMappingAsMap(indexName)));

// Index the test data
for (int i = 0; i < testData.indexData.docs.length; i++) {
addKnnDoc(
indexName,
Integer.toString(testData.indexData.docs[i]),
fieldName,
Floats.asList(testData.indexData.vectors[i]).toArray()
);
}
refreshAllNonSystemIndices();
updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "auto"));

// Test search queries
int k = 10;
verifySearch(indexName, fieldName, k);

updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "all"));
verifySearch(indexName, fieldName, k);

deleteKNNIndex(indexName);
}

/*
{
"properties": {
"<fieldName>": {
"type": "knn_vector",
"dimension": <dimension>,
"method": {
"name": "hnsw",
"space_type": "l2",
"engine": "faiss",
"parameters": {
"m": 16,
"ef_construction": 128,
"ef_search": 128
}
}
}
}
*/
@SneakyThrows
private XContentBuilder createFaissHnswIndexMapping(String fieldName, int dimension) {
return KNNJsonIndexMappingsBuilder.builder()
.fieldName(fieldName)
.dimension(dimension)
.method(
KNNJsonIndexMappingsBuilder.Method.builder()
.engine(KNNEngine.FAISS.getName())
.methodName(METHOD_HNSW)
.spaceType(SpaceType.L2.getValue())
.parameters(KNNJsonIndexMappingsBuilder.Method.Parameters.builder().efConstruction(128).efSearch(128).m(16).build())
.build()
)
.build()
.getIndexMappingBuilder();
}

@SneakyThrows
private void verifySearch(String indexName, String fieldName, int k) {
for (int i = 0; i < testData.queries.length; i++) {
final KNNQueryBuilder queryBuilder = KNNQueryBuilder.builder().fieldName(fieldName).vector(testData.queries[i]).k(k).build();
Response response = searchKNNIndex(indexName, queryBuilder, k);
String responseBody = EntityUtils.toString(response.getEntity());
List<KNNResult> knnResults = parseSearchResponse(responseBody, fieldName);
assertEquals(k, knnResults.size());

List<Float> actualScores = parseSearchResponseScore(responseBody, fieldName);
for (int j = 0; j < k; j++) {
float[] primitiveArray = knnResults.get(j).getVector();
assertEquals(
KNNEngine.FAISS.score(KNNScoringUtil.l2Squared(testData.queries[i], primitiveArray), SpaceType.L2),
actualScores.get(j),
0.0001
);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.plugin.search;

import org.opensearch.index.IndexSettings;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.knn.KNNTestCase;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.search.deciders.ConcurrentSearchDecision;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class KNNConcurrentSearchRequestDeciderTests extends KNNTestCase {

public void testDecider_thenSucceed() {
ConcurrentSearchDecision noop = new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO_OP, "Default decision");

KNNConcurrentSearchRequestDecider decider = new KNNConcurrentSearchRequestDecider();
assertDecision(noop, decider.getConcurrentSearchDecision());
IndexSettings indexSettingsMock = mock(IndexSettings.class);
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.FALSE);

// Non KNNQueryBuilder
decider.evaluateForQuery(new MatchAllQueryBuilder(), indexSettingsMock);
assertDecision(noop, decider.getConcurrentSearchDecision());
decider.evaluateForQuery(
KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build(),
indexSettingsMock
);
assertDecision(noop, decider.getConcurrentSearchDecision());

when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.TRUE);
decider.evaluateForQuery(
KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build(),
indexSettingsMock
);
ConcurrentSearchDecision yes = new ConcurrentSearchDecision(
ConcurrentSearchDecision.DecisionStatus.YES,
"Enable concurrent search for knn as Query has k-NN query in it and index is k-nn index"
);
assertDecision(yes, decider.getConcurrentSearchDecision());

decider.evaluateForQuery(new MatchAllQueryBuilder(), indexSettingsMock);
assertDecision(noop, decider.getConcurrentSearchDecision());
}

public void testDeciderFactory_thenSucceed() {
KNNConcurrentSearchRequestDecider.Factory factory = new KNNConcurrentSearchRequestDecider.Factory();
IndexSettings indexSettingsMock = mock(IndexSettings.class);
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.TRUE);
assertNotSame(factory.create(indexSettingsMock).get(), factory.create(indexSettingsMock).get());
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.FALSE);
assertTrue(factory.create(indexSettingsMock).isEmpty());
}

private void assertDecision(ConcurrentSearchDecision expected, ConcurrentSearchDecision actual) {
assertEquals(expected.getDecisionReason(), actual.getDecisionReason());
assertEquals(expected.getDecisionStatus(), actual.getDecisionStatus());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import lombok.NonNull;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.knn.common.KNNConstants;

import java.io.IOException;

Expand All @@ -26,7 +27,7 @@ public class KNNJsonIndexMappingsBuilder {
private String vectorDataType;
private Method method;

public String getIndexMapping() throws IOException {
public XContentBuilder getIndexMappingBuilder() throws IOException {
if (nestedFieldName != null) {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
.startObject()
Expand All @@ -40,7 +41,7 @@ public String getIndexMapping() throws IOException {
addVectorDataType(xContentBuilder);
addMethod(xContentBuilder);
xContentBuilder.endObject().endObject().endObject().endObject().endObject();
return xContentBuilder.toString();
return xContentBuilder;
} else {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
.startObject()
Expand All @@ -51,10 +52,14 @@ public String getIndexMapping() throws IOException {
addVectorDataType(xContentBuilder);
addMethod(xContentBuilder);
xContentBuilder.endObject().endObject().endObject();
return xContentBuilder.toString();
return xContentBuilder;
}
}

public String getIndexMapping() throws IOException {
return getIndexMappingBuilder().toString();
}

private void addVectorDataType(final XContentBuilder xContentBuilder) throws IOException {
if (vectorDataType == null) {
return;
Expand Down Expand Up @@ -104,6 +109,7 @@ public static class Parameters {
private Encoder encoder;
private Integer efConstruction;
private Integer efSearch;
private Integer m;

private void addTo(final XContentBuilder xContentBuilder) throws IOException {
xContentBuilder.startObject("parameters");
Expand All @@ -113,6 +119,9 @@ private void addTo(final XContentBuilder xContentBuilder) throws IOException {
if (efSearch != null) {
xContentBuilder.field("ef_search", efSearch);
}
if (m != null) {
xContentBuilder.field(KNNConstants.METHOD_PARAMETER_M, m);
}
addEncoder(xContentBuilder);
xContentBuilder.endObject();
}
Expand Down

0 comments on commit 921e5d7

Please sign in to comment.