Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remove] type support from Bulk API #2215

Merged
merged 4 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
bulkRequest.add(
request.requiredContent(),
defaultIndex,
defaultType,
defaultRouting,
null,
defaultPipeline,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,8 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.rest.action.document.RestBulkAction;
import org.opensearch.search.SearchHit;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -69,9 +66,7 @@
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.fieldFromSource;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.hasId;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.hasIndex;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.hasProperty;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.hasType;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.either;
Expand All @@ -96,17 +91,6 @@ private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.List
);
}

private static BulkProcessor.Builder initBulkProcessorBuilderUsingTypes(BulkProcessor.Listener listener) {
return BulkProcessor.builder(
(request, bulkListener) -> highLevelClient().bulkAsync(
request,
expectWarningsOnce(RestBulkAction.TYPES_DEPRECATION_MESSAGE),
bulkListener
),
listener
);
}

public void testThatBulkProcessorCountIsCorrect() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
Expand Down Expand Up @@ -210,7 +194,6 @@ public void testBulkProcessorConcurrentRequests() throws Exception {
for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
assertThat(bulkItemResponse.getFailureMessage(), bulkItemResponse.isFailed(), equalTo(false));
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
assertThat(bulkItemResponse.getType(), equalTo("_doc"));
// with concurrent requests > 1 we can't rely on the order of the bulk requests
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(numDocs)));
// we do want to check that we don't get duplicate ids back
Expand Down Expand Up @@ -317,7 +300,6 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
Set<String> readOnlyIds = new HashSet<>();
for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro")));
assertThat(bulkItemResponse.getType(), equalTo("_doc"));
if (bulkItemResponse.getIndex().equals("test")) {
assertThat(bulkItemResponse.isFailed(), equalTo(false));
// with concurrent requests > 1 we can't rely on the order of the bulk requests
Expand Down Expand Up @@ -346,7 +328,6 @@ public void testGlobalParametersAndSingleRequest() throws Exception {
// tag::bulk-processor-mix-parameters
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
.setGlobalIndex("tweets")
.setGlobalType("_doc")
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()) {
Expand All @@ -373,87 +354,13 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
createIndexWithMultipleShards("test");

createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ");
final String customType = "testType";
final String ignoredType = "ignoredType";

int numDocs = randomIntBetween(10, 10);
{
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
// Check that untyped document additions inherit the global type
String globalType = customType;
String localType = null;
try (
BulkProcessor processor = initBulkProcessorBuilderUsingTypes(listener)
// let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1))
.setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24))
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setGlobalIndex("test")
.setGlobalType(globalType)
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()
) {

indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, globalType);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(globalType))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}

}
{
// Check that typed document additions don't inherit the global type
String globalType = ignoredType;
String localType = customType;
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
try (
BulkProcessor processor = initBulkProcessorBuilderUsingTypes(listener)
// let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1))
.setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24))
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setGlobalIndex("test")
.setGlobalType(globalType)
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()
) {
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, localType);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(localType))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}
}
{
// Check that untyped document additions and untyped global inherit the established custom type
// (the custom document type introduced to the mapping by the earlier code in this test)
String globalType = null;
String localType = null;
final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
try (
BulkProcessor processor = initBulkProcessorBuilder(listener)
// let's make sure that the bulk action limit trips, one single execution will index all the documents
Expand All @@ -462,23 +369,22 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
.setFlushInterval(TimeValue.timeValueHours(24))
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setGlobalIndex("test")
.setGlobalType(globalType)
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()
) {
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");

indexDocs(processor, numDocs, null, localType, "test", "pipeline_id");
latch.await();

assertThat(listener.beforeCounts.get(), equalTo(1));
assertThat(listener.afterCounts.get(), equalTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs, MapperService.SINGLE_MAPPING_NAME);
assertResponseItems(listener.bulkItems, numDocs);

Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));

assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(customType))));
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
}
}
Expand All @@ -495,7 +401,6 @@ private MultiGetRequest indexDocs(
String localIndex,
String localType,
String globalIndex,
String globalType,
String globalPipeline
) throws Exception {
MultiGetRequest multiGetRequest = new MultiGetRequest();
Expand All @@ -510,7 +415,7 @@ private MultiGetRequest indexDocs(
);
} else {
BytesArray data = bytesBulkRequest(localIndex, localType, i);
processor.add(data, globalIndex, globalType, globalPipeline, XContentType.JSON);
processor.add(data, globalIndex, globalPipeline, XContentType.JSON);
}
multiGetRequest.add(localIndex, Integer.toString(i));
}
Expand Down Expand Up @@ -538,19 +443,14 @@ private static BytesArray bytesBulkRequest(String localIndex, String localType,
}

private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
return indexDocs(processor, numDocs, "test", null, null, null, null);
return indexDocs(processor, numDocs, "test", null, null, null);
}

private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
assertResponseItems(bulkItemResponses, numDocs, MapperService.SINGLE_MAPPING_NAME);
}

private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs, String expectedType) {
assertThat(bulkItemResponses.size(), is(numDocs));
int i = 1;
for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
assertThat(bulkItemResponse.getType(), equalTo(expectedType));
assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++)));
assertThat(
"item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.rest.action.document.RestBulkAction;
import org.opensearch.search.SearchHit;

import java.io.IOException;
Expand All @@ -46,7 +45,6 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.hasId;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.hasIndex;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.hasProperty;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.hasType;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
Expand Down Expand Up @@ -117,7 +115,7 @@ public void testMixPipelineOnRequestAndGlobal() throws IOException {
}

public void testGlobalIndex() throws IOException {
BulkRequest request = new BulkRequest("global_index", null);
BulkRequest request = new BulkRequest("global_index");
request.add(new IndexRequest().id("1").source(XContentType.JSON, "field", "bulk1"));
request.add(new IndexRequest().id("2").source(XContentType.JSON, "field", "bulk2"));

Expand All @@ -129,7 +127,7 @@ public void testGlobalIndex() throws IOException {

@SuppressWarnings("unchecked")
public void testIndexGlobalAndPerRequest() throws IOException {
BulkRequest request = new BulkRequest("global_index", null);
BulkRequest request = new BulkRequest("global_index");
request.add(new IndexRequest("local_index").id("1").source(XContentType.JSON, "field", "bulk1"));
request.add(
new IndexRequest().id("2") // will take global index
Expand All @@ -142,31 +140,6 @@ public void testIndexGlobalAndPerRequest() throws IOException {
assertThat(hits, containsInAnyOrder(both(hasId("1")).and(hasIndex("local_index")), both(hasId("2")).and(hasIndex("global_index"))));
}

public void testGlobalType() throws IOException {
BulkRequest request = new BulkRequest(null, "global_type");
request.add(new IndexRequest("index").id("1").source(XContentType.JSON, "field", "bulk1"));
request.add(new IndexRequest("index").id("2").source(XContentType.JSON, "field", "bulk2"));

bulkWithTypes(request);

Iterable<SearchHit> hits = searchAll("index");
assertThat(hits, everyItem(hasType("global_type")));
}

public void testTypeGlobalAndPerRequest() throws IOException {
BulkRequest request = new BulkRequest(null, "global_type");
request.add(new IndexRequest("index1", "local_type", "1").source(XContentType.JSON, "field", "bulk1"));
request.add(
new IndexRequest("index2").id("2") // will take global type
.source(XContentType.JSON, "field", "bulk2")
);

bulkWithTypes(request);

Iterable<SearchHit> hits = searchAll("index1", "index2");
assertThat(hits, containsInAnyOrder(both(hasId("1")).and(hasType("local_type")), both(hasId("2")).and(hasType("global_type"))));
}

public void testGlobalRouting() throws IOException {
createIndexWithMultipleShards("index");
BulkRequest request = new BulkRequest((String) null);
Expand Down Expand Up @@ -194,28 +167,6 @@ public void testMixLocalAndGlobalRouting() throws IOException {
assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2")));
}

public void testGlobalIndexNoTypes() throws IOException {
BulkRequest request = new BulkRequest("global_index");
request.add(new IndexRequest().id("1").source(XContentType.JSON, "field", "bulk1"));
request.add(new IndexRequest().id("2").source(XContentType.JSON, "field", "bulk2"));

bulk(request);

Iterable<SearchHit> hits = searchAll("global_index");
assertThat(hits, everyItem(hasIndex("global_index")));
}

private BulkResponse bulkWithTypes(BulkRequest request) throws IOException {
BulkResponse bulkResponse = execute(
request,
highLevelClient()::bulk,
highLevelClient()::bulkAsync,
expectWarningsOnce(RestBulkAction.TYPES_DEPRECATION_MESSAGE)
);
assertFalse(bulkResponse.hasFailures());
return bulkResponse;
}

private BulkResponse bulk(BulkRequest request) throws IOException {
BulkResponse bulkResponse = execute(request, highLevelClient()::bulk, highLevelClient()::bulkAsync, RequestOptions.DEFAULT);
assertFalse(bulkResponse.hasFailures());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.opensearch.index.VersionType;
import org.opensearch.index.get.GetResult;
import org.opensearch.rest.RestStatus;
import org.opensearch.rest.action.document.RestBulkAction;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.search.fetch.subphase.FetchSourceContext;
Expand Down Expand Up @@ -441,10 +440,9 @@ public void testMultiGet() throws IOException {
public void testMultiGetWithIds() throws IOException {
BulkRequest bulk = new BulkRequest();
bulk.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
bulk.add(new IndexRequest("index", "type", "id1").source("{\"field\":\"value1\"}", XContentType.JSON));
bulk.add(new IndexRequest("index", "type", "id2").source("{\"field\":\"value2\"}", XContentType.JSON));
bulk.add(new IndexRequest("index", "id1").source("{\"field\":\"value1\"}", XContentType.JSON));
bulk.add(new IndexRequest("index", "id2").source("{\"field\":\"value2\"}", XContentType.JSON));

highLevelClient().bulk(bulk, expectWarningsOnce(RestBulkAction.TYPES_DEPRECATION_MESSAGE));
MultiGetRequest multiGetRequest = new MultiGetRequest();
multiGetRequest.add("index", "id1");
multiGetRequest.add("index", "id2");
Expand Down Expand Up @@ -1016,7 +1014,6 @@ private void validateBulkResponses(int nbItems, boolean[] errors, BulkResponse b

assertEquals(i, bulkItemResponse.getItemId());
assertEquals("index", bulkItemResponse.getIndex());
assertEquals("_doc", bulkItemResponse.getType());
assertEquals(String.valueOf(i), bulkItemResponse.getId());

DocWriteRequest.OpType requestOpType = bulkRequest.requests().get(i).opType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ setup:
bulk:
refresh: true
body:
- '{"index": {"_index": "test-0", "_type": "_doc"}}'
- '{"index": {"_index": "test-0"}}'
- '{"ip": "10.0.0.1", "integer": 38, "float": 12.5713, "name": "Ruth", "bool": true}'
- '{"index": {"_index": "test-0", "_type": "_doc"}}'
- '{"index": {"_index": "test-0"}}'
- '{"ip": "10.0.0.2", "integer": 42, "float": 15.3393, "name": "Jackie", "surname": "Bowling", "bool": false}'
- '{"index": {"_index": "test-1", "_type": "_doc"}}'
- '{"index": {"_index": "test-1"}}'
- '{"ip": "10.0.0.3", "integer": 29, "float": 19.0517, "name": "Stephanie", "bool": true}'
- '{"index": {"_index": "test-1", "_type": "_doc"}}'
- '{"index": {"_index": "test-1"}}'
- '{"ip": "10.0.0.4", "integer": 19, "float": 19.3717, "surname": "Hamilton", "bool": true}'
- '{"index": {"_index": "test-2", "_type": "_doc"}}'
- '{"index": {"_index": "test-2"}}'
- '{"ip": "10.0.0.5", "integer": 0, "float": 17.3349, "name": "Natalie", "bool": false}'

---
Expand Down
Loading