Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Json processor: allow duplicate keys #74956

Merged
merged 11 commits into from
Jul 6, 2021
10 changes: 6 additions & 4 deletions docs/reference/ingest/processors/json.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ Converts a JSON string into a structured JSON object.
.Json Options
[options="header"]
|======
| Name | Required | Default | Description
| `field` | yes | - | The field to be parsed.
| `target_field` | no | `field` | The field that the converted structured object will be written into. Any existing content in this field will be overwritten.
| `add_to_root` | no | false | Flag that forces the serialized json to be injected into the top level of the document. `target_field` must not be set when this option is chosen.
| Name | Required | Default | Description
| `field` | yes | - | The field to be parsed.
| `target_field` | no | `field` | The field that the converted structured object will be written into. Any existing content in this field will be overwritten.
| `add_to_root` | no | false | Flag that forces the serialized json to be injected into the top level of the document. `target_field` must not be set when this option is chosen.
| `allow_duplicate_keys` | no | false | When set to `true`, the JSON parser will not fail if the JSON contains duplicate keys.
Instead, the last encountered value for any duplicate key wins. Allowing duplicate keys also improves execution time, because the parser skips strict duplicate-key detection.
include::common-options.asciidoc[]
|======

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public XContentType contentType() {
return in.contentType();
}

/**
 * Forwards the duplicate-key setting unchanged to the wrapped parser {@code in}.
 * Any exception thrown by the wrapped parser propagates to the caller.
 *
 * @param allowDuplicateKeys value passed straight through to the delegate
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
in.allowDuplicateKeys(allowDuplicateKeys);
}

@Override
public Token nextToken() throws IOException {
return in.nextToken();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ enum NumberType {

XContentType contentType();

/**
 * Sets whether this parser should accept content that contains duplicate keys.
 * <p>
 * NOTE(review): support appears to be implementation-specific; some implementations
 * throw {@link UnsupportedOperationException} instead of changing the setting, so
 * callers should only rely on this for parsers known to support it.
 *
 * @param allowDuplicateKeys {@code true} to accept duplicate keys, {@code false} to reject them
 */
void allowDuplicateKeys(boolean allowDuplicateKeys);

Token nextToken() throws IOException;

void skipChildren() throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public XContentType contentType() {
return parser.contentType();
}

/**
 * Forwards the duplicate-key setting unchanged to the underlying {@code parser}.
 * Any exception thrown by the underlying parser propagates to the caller.
 *
 * @param allowDuplicateKeys value passed straight through to the delegate
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
parser.allowDuplicateKeys(allowDuplicateKeys);
}

@Override
public Token nextToken() throws IOException {
if (level > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,9 @@ public CborXContentParser(NamedXContentRegistry xContentRegistry,
public XContentType contentType() {
return XContentType.CBOR;
}

/**
 * Not supported for CBOR: this method always throws, regardless of the argument.
 *
 * @param allowDuplicateKeys ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
throw new UnsupportedOperationException("Allowing duplicate keys after the parser has been created is not possible for CBOR");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public XContentType contentType() {
return XContentType.JSON;
}

/**
 * Toggles duplicate-key handling on the underlying Jackson parser.
 *
 * @param allowDuplicateKeys {@code true} turns strict duplicate detection off,
 *                           {@code false} turns it on
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
// The Jackson feature expresses the opposite of "allow", hence the negation.
parser.configure(JsonParser.Feature.STRICT_DUPLICATE_DETECTION, allowDuplicateKeys == false);
}

@Override
public Token nextToken() throws IOException {
return convertToken(parser.nextToken());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,9 @@ public SmileXContentParser(NamedXContentRegistry xContentRegistry,
public XContentType contentType() {
return XContentType.SMILE;
}

/**
 * Not supported for Smile: this method always throws, regardless of the argument.
 *
 * @param allowDuplicateKeys ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
throw new UnsupportedOperationException("Allowing duplicate keys after the parser has been created is not possible for Smile");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public XContentType contentType() {
return xContentType;
}

/**
 * Not supported by this parser: this method always throws, regardless of the argument.
 *
 * @param allowDuplicateKeys ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
throw new UnsupportedOperationException("Allowing duplicate keys is not possible for maps");
}

@Override
public Token nextToken() throws IOException {
if (iterator == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ public final class JsonProcessor extends AbstractProcessor {
private final String field;
private final String targetField;
private final boolean addToRoot;
private final boolean allowDuplicateKeys;

JsonProcessor(String tag, String description, String field, String targetField, boolean addToRoot) {
JsonProcessor(String tag, String description, String field, String targetField, boolean addToRoot, boolean allowDuplicateKeys) {
super(tag, description);
this.field = field;
this.targetField = targetField;
this.addToRoot = addToRoot;
this.allowDuplicateKeys = allowDuplicateKeys;
}

public String getField() {
Expand All @@ -56,11 +58,12 @@ boolean isAddToRoot() {
return addToRoot;
}

public static Object apply(Object fieldValue) {
public static Object apply(Object fieldValue, boolean allowDuplicateKeys) {
BytesReference bytesRef = fieldValue == null ? new BytesArray("null") : new BytesArray(fieldValue.toString());
try (InputStream stream = bytesRef.streamInput();
XContentParser parser = JsonXContent.jsonXContent
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream)) {
parser.allowDuplicateKeys(allowDuplicateKeys);
XContentParser.Token token = parser.nextToken();
Object value = null;
if (token == XContentParser.Token.VALUE_NULL) {
Expand All @@ -84,12 +87,12 @@ public static Object apply(Object fieldValue) {
}
}

public static void apply(Map<String, Object> ctx, String fieldName) {
Object value = apply(ctx.get(fieldName));
public static void apply(Map<String, Object> ctx, String fieldName, boolean allowDuplicateKeys) {
Object value = apply(ctx.get(fieldName), allowDuplicateKeys);
if (value instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) value;
ctx.putAll(map);
ctx.putAll(map);
} else {
throw new IllegalArgumentException("cannot add non-map fields to root of document");
}
Expand All @@ -98,9 +101,9 @@ public static void apply(Map<String, Object> ctx, String fieldName) {
@Override
public IngestDocument execute(IngestDocument document) throws Exception {
if (addToRoot) {
apply(document.getSourceAndMetadata(), field);
apply(document.getSourceAndMetadata(), field, allowDuplicateKeys);
} else {
document.setFieldValue(targetField, apply(document.getFieldValue(field, Object.class)));
document.setFieldValue(targetField, apply(document.getFieldValue(field, Object.class), allowDuplicateKeys));
}
return document;
}
Expand All @@ -117,6 +120,7 @@ public JsonProcessor create(Map<String, Processor.Factory> registry, String proc
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
boolean addToRoot = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "add_to_root", false);
boolean allowDuplicateKeys = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "allow_duplicate_keys", false);

if (addToRoot && targetField != null) {
throw newConfigurationException(TYPE, processorTag, "target_field",
Expand All @@ -127,7 +131,7 @@ public JsonProcessor create(Map<String, Processor.Factory> registry, String proc
targetField = field;
}

return new JsonProcessor(processorTag, description, field, targetField, addToRoot);
return new JsonProcessor(processorTag, description, field, targetField, addToRoot, allowDuplicateKeys);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static String uppercase(String value) {
* @return structured JSON object
*/
public static Object json(Object fieldValue) {
return JsonProcessor.apply(fieldValue);
return JsonProcessor.apply(fieldValue, false);
}

/**
Expand All @@ -72,7 +72,7 @@ public static Object json(Object fieldValue) {
* contains the JSON string
*/
public static void json(Map<String, Object> map, String field) {
JsonProcessor.apply(map, field);
JsonProcessor.apply(map, field, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void testExecute() throws Exception {
String processorTag = randomAlphaOfLength(3);
String randomField = randomAlphaOfLength(3);
String randomTargetField = randomAlphaOfLength(2);
JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, randomField, randomTargetField, false);
JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, randomField, randomTargetField, false, false);
Map<String, Object> document = new HashMap<>();

Map<String, Object> randomJsonMap = RandomDocumentPicks.randomSource(random());
Expand All @@ -47,7 +47,7 @@ public void testExecute() throws Exception {
}

public void testInvalidValue() {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
document.put("field", "blah blah");
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Expand All @@ -58,7 +58,7 @@ public void testInvalidValue() {
}

public void testByteArray() {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
document.put("field", new byte[] { 0, 1 });
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Expand All @@ -73,7 +73,7 @@ public void testByteArray() {
}

public void testNull() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
document.put("field", null);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Expand All @@ -82,7 +82,7 @@ public void testNull() throws Exception {
}

public void testBoolean() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
boolean value = true;
document.put("field", value);
Expand All @@ -92,7 +92,7 @@ public void testBoolean() throws Exception {
}

public void testInteger() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
int value = 3;
document.put("field", value);
Expand All @@ -102,7 +102,7 @@ public void testInteger() throws Exception {
}

public void testDouble() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
double value = 3.0;
document.put("field", value);
Expand All @@ -112,7 +112,7 @@ public void testDouble() throws Exception {
}

public void testString() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
String value = "hello world";
document.put("field", "\"" + value + "\"");
Expand All @@ -122,7 +122,7 @@ public void testString() throws Exception {
}

public void testArray() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
List<Boolean> value = Arrays.asList(true, true, false);
document.put("field", value.toString());
Expand All @@ -132,7 +132,7 @@ public void testArray() throws Exception {
}

public void testFieldMissing() {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);

Expand All @@ -143,7 +143,7 @@ public void testFieldMissing() {
public void testAddToRoot() throws Exception {
String processorTag = randomAlphaOfLength(3);
String randomTargetField = randomAlphaOfLength(2);
JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, "a", randomTargetField, true);
JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, "a", randomTargetField, true, false);
Map<String, Object> document = new HashMap<>();

String json = "{\"a\": 1, \"b\": 2}";
Expand All @@ -159,8 +159,30 @@ public void testAddToRoot() throws Exception {
assertEquals("see", sourceAndMetadata.get("c"));
}

/**
 * Exercises both duplicate-key modes of the processor on the same JSON input
 * {@code {"a": 1, "a": 2}}, with the parsed object merged into the document root:
 * the lenient processor must keep the last value, the strict one must fail.
 */
public void testDuplicateKeys() throws Exception {
String processorTag = randomAlphaOfLength(3);
// Last two constructor flags: addToRoot = true, allowDuplicateKeys = true.
JsonProcessor lenientJsonProcessor = new JsonProcessor(processorTag, null, "a", null, true, true);

Map<String, Object> document = new HashMap<>();
String json = "{\"a\": 1, \"a\": 2}";
document.put("a", json);
document.put("c", "see");

IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
lenientJsonProcessor.execute(ingestDocument);

Map<String, Object> sourceAndMetadata = ingestDocument.getSourceAndMetadata();
// The second occurrence of "a" wins and replaces the original JSON string at the root.
assertEquals(2, sourceAndMetadata.get("a"));
// Unrelated fields must be left untouched.
assertEquals("see", sourceAndMetadata.get("c"));

// Same input with allowDuplicateKeys = false must be rejected.
// NOTE(review): this reuses `document` and so assumes the lenient run above did not
// mutate it (i.e. the ingest document works on its own copy) — confirm.
JsonProcessor strictJsonProcessor = new JsonProcessor(processorTag, null, "a", null, true, false);
Exception exception = expectThrows(IllegalArgumentException.class, () ->
strictJsonProcessor.execute(RandomDocumentPicks.randomIngestDocument(random(), document)));
assertThat(exception.getMessage(), containsString("Duplicate field 'a'"));
}

public void testAddBoolToRoot() {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", true);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", true, false);
Map<String, Object> document = new HashMap<>();
document.put("field", true);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ teardown:
ingest.delete_pipeline:
id: "1"
ignore: 404
- do:
ingest.delete_pipeline:
id: "2"
ignore: 404

---
"Test JSON Processor":
Expand Down Expand Up @@ -71,3 +75,36 @@ teardown:
- match: { _source.foo_number: 3 }
- is_true: _source.foo_boolean
- is_false: _source.foo_null

---
"Test JSON Processor duplicate keys":
- do:
ingest.put_pipeline:
id: "2"
body: {
"processors": [
{
"json" : {
"field" : "json",
"add_to_root": true,
martijnvg marked this conversation as resolved.
Show resolved Hide resolved
"allow_duplicate_keys": true
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 2
pipeline: "2"
body: {
json: "{\"dupe\": 1, \"dupe\": 2}",
}

- do:
get:
index: test
id: 2
- match: { _source.dupe: 2 }
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public abstract class BaseXContentTestCase extends ESTestCase {

protected abstract XContentType xcontentType();

private XContentBuilder builder() throws IOException {
protected XContentBuilder builder() throws IOException {
return XContentBuilder.builder(xcontentType().xContent());
}

Expand Down Expand Up @@ -1149,6 +1149,18 @@ public void testChecksForDuplicates() throws Exception {
}
}

/**
 * Builds an object that writes the field {@code key} twice, then checks that a parser
 * with duplicate keys allowed reads it as a map holding only the last value.
 */
public void testAllowsDuplicates() throws Exception {
XContentBuilder builder = builder()
.startObject()
.field("key", 1)
.field("key", 2)
.endObject();
try (XContentParser xParser = createParser(builder)) {
// Must be set before any token is consumed by map().
xParser.allowDuplicateKeys(true);
// The later value (2) is expected to replace the earlier one (1).
assertThat(xParser.map(), equalTo(Map.of("key", 2)));
}
}

public void testNamedObject() throws IOException {
Object test1 = new Object();
Object test2 = new Object();
Expand Down
Loading