Skip to content

Commit

Permalink
Json processor: allow duplicate keys (#74956)
Browse files Browse the repository at this point in the history
  • Loading branch information
felixbarny committed Jul 6, 2021
1 parent 265f783 commit 67fbc33
Show file tree
Hide file tree
Showing 15 changed files with 150 additions and 27 deletions.
10 changes: 6 additions & 4 deletions docs/reference/ingest/processors/json.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ Converts a JSON string into a structured JSON object.
.Json Options
[options="header"]
|======
| Name | Required | Default | Description
| `field` | yes | - | The field to be parsed.
| `target_field` | no | `field` | The field that the converted structured object will be written into. Any existing content in this field will be overwritten.
| `add_to_root` | no | false | Flag that forces the serialized json to be injected into the top level of the document. `target_field` must not be set when this option is chosen.
| Name | Required | Default | Description
| `field` | yes | - | The field to be parsed.
| `target_field` | no | `field` | The field that the converted structured object will be written into. Any existing content in this field will be overwritten.
| `add_to_root` | no | false | Flag that forces the serialized json to be injected into the top level of the document. `target_field` must not be set when this option is chosen.
| `allow_duplicate_keys` | no | false | When set to `true`, the JSON parser will not fail if the JSON contains duplicate keys.
Instead, the last encountered value for any duplicate key wins. Allowing duplicate keys also improves execution time.
include::common-options.asciidoc[]
|======

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public XContentType contentType() {
return in.contentType();
}

/**
 * Forwards the duplicate-key setting unchanged to the wrapped parser {@code in}.
 *
 * @param allowDuplicateKeys value passed straight through to the delegate
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
in.allowDuplicateKeys(allowDuplicateKeys);
}

@Override
public Token nextToken() throws IOException {
return in.nextToken();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ enum NumberType {

XContentType contentType();

/**
 * Sets whether this parser tolerates duplicate keys while parsing.
 *
 * @param allowDuplicateKeys {@code true} to accept duplicate keys, {@code false} to reject them
 * @throws UnsupportedOperationException if the implementation cannot change this setting on an
 *         already created parser
 */
void allowDuplicateKeys(boolean allowDuplicateKeys);

Token nextToken() throws IOException;

void skipChildren() throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public XContentType contentType() {
return parser.contentType();
}

/**
 * Forwards the duplicate-key setting unchanged to the underlying {@code parser}.
 *
 * @param allowDuplicateKeys value passed straight through to the delegate
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
parser.allowDuplicateKeys(allowDuplicateKeys);
}

@Override
public Token nextToken() throws IOException {
if (level > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,9 @@ public CborXContentParser(NamedXContentRegistry xContentRegistry,
public XContentType contentType() {
return XContentType.CBOR;
}

/**
 * Not supported for CBOR: this implementation always throws, regardless of the argument.
 *
 * @param allowDuplicateKeys ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
throw new UnsupportedOperationException("Allowing duplicate keys after the parser has been created is not possible for CBOR");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public XContentType contentType() {
return XContentType.JSON;
}

/**
 * Toggles Jackson's {@code STRICT_DUPLICATE_DETECTION} feature on the underlying parser.
 *
 * @param allowDuplicateKeys {@code true} to turn strict duplicate detection off,
 *                           {@code false} to turn it on
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
// The Jackson feature expresses the opposite of this flag, hence the inversion.
parser.configure(JsonParser.Feature.STRICT_DUPLICATE_DETECTION, allowDuplicateKeys == false);
}

@Override
public Token nextToken() throws IOException {
return convertToken(parser.nextToken());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,9 @@ public SmileXContentParser(NamedXContentRegistry xContentRegistry,
public XContentType contentType() {
return XContentType.SMILE;
}

/**
 * Not supported for Smile: this implementation always throws, regardless of the argument.
 *
 * @param allowDuplicateKeys ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
throw new UnsupportedOperationException("Allowing duplicate keys after the parser has been created is not possible for Smile");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public XContentType contentType() {
return xContentType;
}

/**
 * Not supported by this parser: this implementation always throws, regardless of the argument.
 *
 * @param allowDuplicateKeys ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
throw new UnsupportedOperationException("Allowing duplicate keys is not possible for maps");
}

@Override
public Token nextToken() throws IOException {
if (iterator == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ public final class JsonProcessor extends AbstractProcessor {
private final String field;
private final String targetField;
private final boolean addToRoot;
private final boolean allowDuplicateKeys;

JsonProcessor(String tag, String description, String field, String targetField, boolean addToRoot) {
JsonProcessor(String tag, String description, String field, String targetField, boolean addToRoot, boolean allowDuplicateKeys) {
super(tag, description);
this.field = field;
this.targetField = targetField;
this.addToRoot = addToRoot;
this.allowDuplicateKeys = allowDuplicateKeys;
}

public String getField() {
Expand All @@ -56,11 +58,12 @@ boolean isAddToRoot() {
return addToRoot;
}

public static Object apply(Object fieldValue) {
public static Object apply(Object fieldValue, boolean allowDuplicateKeys) {
BytesReference bytesRef = fieldValue == null ? new BytesArray("null") : new BytesArray(fieldValue.toString());
try (InputStream stream = bytesRef.streamInput();
XContentParser parser = JsonXContent.jsonXContent
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, stream)) {
parser.allowDuplicateKeys(allowDuplicateKeys);
XContentParser.Token token = parser.nextToken();
Object value = null;
if (token == XContentParser.Token.VALUE_NULL) {
Expand All @@ -84,12 +87,12 @@ public static Object apply(Object fieldValue) {
}
}

public static void apply(Map<String, Object> ctx, String fieldName) {
Object value = apply(ctx.get(fieldName));
public static void apply(Map<String, Object> ctx, String fieldName, boolean allowDuplicateKeys) {
Object value = apply(ctx.get(fieldName), allowDuplicateKeys);
if (value instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> map = (Map<String, Object>) value;
ctx.putAll(map);
ctx.putAll(map);
} else {
throw new IllegalArgumentException("cannot add non-map fields to root of document");
}
Expand All @@ -98,9 +101,9 @@ public static void apply(Map<String, Object> ctx, String fieldName) {
@Override
public IngestDocument execute(IngestDocument document) throws Exception {
if (addToRoot) {
apply(document.getSourceAndMetadata(), field);
apply(document.getSourceAndMetadata(), field, allowDuplicateKeys);
} else {
document.setFieldValue(targetField, apply(document.getFieldValue(field, Object.class)));
document.setFieldValue(targetField, apply(document.getFieldValue(field, Object.class), allowDuplicateKeys));
}
return document;
}
Expand All @@ -117,6 +120,7 @@ public JsonProcessor create(Map<String, Processor.Factory> registry, String proc
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
boolean addToRoot = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "add_to_root", false);
boolean allowDuplicateKeys = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "allow_duplicate_keys", false);

if (addToRoot && targetField != null) {
throw newConfigurationException(TYPE, processorTag, "target_field",
Expand All @@ -127,7 +131,7 @@ public JsonProcessor create(Map<String, Processor.Factory> registry, String proc
targetField = field;
}

return new JsonProcessor(processorTag, description, field, targetField, addToRoot);
return new JsonProcessor(processorTag, description, field, targetField, addToRoot, allowDuplicateKeys);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static String uppercase(String value) {
* @return structured JSON object
*/
public static Object json(Object fieldValue) {
return JsonProcessor.apply(fieldValue);
return JsonProcessor.apply(fieldValue, false);
}

/**
Expand All @@ -72,7 +72,7 @@ public static Object json(Object fieldValue) {
* contains the JSON string
*/
public static void json(Map<String, Object> map, String field) {
JsonProcessor.apply(map, field);
JsonProcessor.apply(map, field, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void testExecute() throws Exception {
String processorTag = randomAlphaOfLength(3);
String randomField = randomAlphaOfLength(3);
String randomTargetField = randomAlphaOfLength(2);
JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, randomField, randomTargetField, false);
JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, randomField, randomTargetField, false, false);
Map<String, Object> document = new HashMap<>();

Map<String, Object> randomJsonMap = RandomDocumentPicks.randomSource(random());
Expand All @@ -47,7 +47,7 @@ public void testExecute() throws Exception {
}

public void testInvalidValue() {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
document.put("field", "blah blah");
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Expand All @@ -58,7 +58,7 @@ public void testInvalidValue() {
}

public void testByteArray() {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
document.put("field", new byte[] { 0, 1 });
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Expand All @@ -73,7 +73,7 @@ public void testByteArray() {
}

public void testNull() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
document.put("field", null);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Expand All @@ -82,7 +82,7 @@ public void testNull() throws Exception {
}

public void testBoolean() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
boolean value = true;
document.put("field", value);
Expand All @@ -92,7 +92,7 @@ public void testBoolean() throws Exception {
}

public void testInteger() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
int value = 3;
document.put("field", value);
Expand All @@ -102,7 +102,7 @@ public void testInteger() throws Exception {
}

public void testDouble() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
double value = 3.0;
document.put("field", value);
Expand All @@ -112,7 +112,7 @@ public void testDouble() throws Exception {
}

public void testString() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
String value = "hello world";
document.put("field", "\"" + value + "\"");
Expand All @@ -122,7 +122,7 @@ public void testString() throws Exception {
}

public void testArray() throws Exception {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
List<Boolean> value = Arrays.asList(true, true, false);
document.put("field", value.toString());
Expand All @@ -132,7 +132,7 @@ public void testArray() throws Exception {
}

public void testFieldMissing() {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", false, false);
Map<String, Object> document = new HashMap<>();
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);

Expand All @@ -143,7 +143,7 @@ public void testFieldMissing() {
public void testAddToRoot() throws Exception {
String processorTag = randomAlphaOfLength(3);
String randomTargetField = randomAlphaOfLength(2);
JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, "a", randomTargetField, true);
JsonProcessor jsonProcessor = new JsonProcessor(processorTag, null, "a", randomTargetField, true, false);
Map<String, Object> document = new HashMap<>();

String json = "{\"a\": 1, \"b\": 2}";
Expand All @@ -159,8 +159,30 @@ public void testAddToRoot() throws Exception {
assertEquals("see", sourceAndMetadata.get("c"));
}

/**
 * Checks both duplicate-key modes of the processor when adding the parsed object to the root:
 * the lenient processor keeps the last value of the repeated key, while the strict processor
 * fails with an {@link IllegalArgumentException} that names the duplicated field.
 */
public void testDuplicateKeys() throws Exception {
    String tag = randomAlphaOfLength(3);

    // Shared input: field "a" holds a JSON object that repeats the key "a".
    Map<String, Object> source = new HashMap<>();
    source.put("a", "{\"a\": 1, \"a\": 2}");
    source.put("c", "see");

    // Lenient mode: parsing succeeds and the last occurrence of "a" ends up in the root.
    JsonProcessor lenient = new JsonProcessor(tag, null, "a", null, true, true);
    IngestDocument lenientDocument = RandomDocumentPicks.randomIngestDocument(random(), source);
    lenient.execute(lenientDocument);

    Map<String, Object> result = lenientDocument.getSourceAndMetadata();
    assertEquals(2, result.get("a"));
    assertEquals("see", result.get("c"));

    // Strict mode: the same input must be rejected.
    JsonProcessor strict = new JsonProcessor(tag, null, "a", null, true, false);
    Exception failure = expectThrows(
        IllegalArgumentException.class,
        () -> strict.execute(RandomDocumentPicks.randomIngestDocument(random(), source))
    );
    assertThat(failure.getMessage(), containsString("Duplicate field 'a'"));
}

public void testAddBoolToRoot() {
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", true);
JsonProcessor jsonProcessor = new JsonProcessor("tag", null, "field", "target_field", true, false);
Map<String, Object> document = new HashMap<>();
document.put("field", true);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ teardown:
ingest.delete_pipeline:
id: "1"
ignore: 404
- do:
ingest.delete_pipeline:
id: "2"
ignore: 404

---
"Test JSON Processor":
Expand Down Expand Up @@ -71,3 +75,36 @@ teardown:
- match: { _source.foo_number: 3 }
- is_true: _source.foo_boolean
- is_false: _source.foo_null

---
"Test JSON Processor duplicate keys":
- do:
ingest.put_pipeline:
id: "2"
body: {
"processors": [
{
"json" : {
"field" : "json",
"add_to_root": true,
"allow_duplicate_keys": true
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 2
pipeline: "2"
body: {
json: "{\"dupe\": 1, \"dupe\": 2}",
}

- do:
get:
index: test
id: 2
- match: { _source.dupe: 2 }
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public abstract class BaseXContentTestCase extends ESTestCase {

protected abstract XContentType xcontentType();

private XContentBuilder builder() throws IOException {
protected XContentBuilder builder() throws IOException {
return XContentBuilder.builder(xcontentType().xContent());
}

Expand Down Expand Up @@ -1149,6 +1149,18 @@ public void testChecksForDuplicates() throws Exception {
}
}

/**
 * Builds an object that writes the key {@code "key"} twice and verifies that, once duplicate
 * keys are allowed on the parser, the resulting map holds the second value.
 */
public void testAllowsDuplicates() throws Exception {
    XContentBuilder duplicated = builder();
    duplicated.startObject();
    duplicated.field("key", 1);
    duplicated.field("key", 2);
    duplicated.endObject();

    try (XContentParser lenientParser = createParser(duplicated)) {
        // Must be switched on before any token is consumed by map().
        lenientParser.allowDuplicateKeys(true);
        assertThat(lenientParser.map(), equalTo(Map.of("key", 2)));
    }
}

public void testNamedObject() throws IOException {
Object test1 = new Object();
Object test2 = new Object();
Expand Down
Loading

0 comments on commit 67fbc33

Please sign in to comment.