From 59d99b58aae32c509e11717b14c661cdfe52a727 Mon Sep 17 00:00:00 2001 From: Przemko Robakowski Date: Fri, 6 Dec 2019 21:57:06 +0100 Subject: [PATCH] Allow list of IPs in geoip ingest processor (#49573) * Allow list of IPs in geoip ingest processor This change lets you use an array of IPs, in addition to a string, in the geoip processor's source field. The processor sets an array containing the geoip data for each element in the source, unless the first_only option is enabled, in which case only the first match found is returned. Closes #46193 --- .../ingest/processors/geoip.asciidoc | 1 + .../ingest/geoip/GeoIpProcessor.java | 79 ++++++++---- .../ingest/geoip/GeoIpProcessorTests.java | 114 ++++++++++++++++-- .../test/ingest_geoip/20_geoip_processor.yml | 81 +++++++++++++ 4 files changed, 241 insertions(+), 34 deletions(-) diff --git a/docs/reference/ingest/processors/geoip.asciidoc b/docs/reference/ingest/processors/geoip.asciidoc index 58cc32d629760..84e8ed2c41a8a 100644 --- a/docs/reference/ingest/processors/geoip.asciidoc +++ b/docs/reference/ingest/processors/geoip.asciidoc @@ -25,6 +25,7 @@ uncompressed. The `ingest-geoip` config directory is located at `$ES_CONFIG/inge | `database_file` | no | GeoLite2-City.mmdb | The database filename in the geoip config directory. The ingest-geoip module ships with the GeoLite2-City.mmdb, GeoLite2-Country.mmdb and GeoLite2-ASN.mmdb files. | `properties` | no | [`continent_name`, `country_iso_code`, `region_iso_code`, `region_name`, `city_name`, `location`] * | Controls what properties are added to the `target_field` based on the geoip lookup. 
| `ignore_missing` | no | `false` | If `true` and `field` does not exist, the processor quietly exits without modifying the document +| `first_only` | no | `true` | If `true`, only the first geoip data found will be returned, even if `field` contains an array |====== *Depends on what is available in `database_file`: diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java index 5c82c68d93032..41300f71093a0 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java @@ -41,6 +41,7 @@ import java.net.InetAddress; import java.security.AccessController; import java.security.PrivilegedAction; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; @@ -68,6 +69,7 @@ public final class GeoIpProcessor extends AbstractProcessor { private final Set properties; private final boolean ignoreMissing; private final GeoIpCache cache; + private final boolean firstOnly; /** * Construct a geo-IP processor. 
@@ -79,15 +81,17 @@ public final class GeoIpProcessor extends AbstractProcessor { * @param properties the properties; ideally this is lazily-loaded once on first use * @param ignoreMissing true if documents with a missing value for the field should be ignored * @param cache a geo-IP cache + * @param firstOnly true if only first result should be returned in case of array */ GeoIpProcessor( - final String tag, - final String field, - final DatabaseReaderLazyLoader lazyLoader, - final String targetField, - final Set properties, - final boolean ignoreMissing, - final GeoIpCache cache) { + final String tag, + final String field, + final DatabaseReaderLazyLoader lazyLoader, + final String targetField, + final Set properties, + final boolean ignoreMissing, + final GeoIpCache cache, + boolean firstOnly) { super(tag); this.field = field; this.targetField = targetField; @@ -95,6 +99,7 @@ public final class GeoIpProcessor extends AbstractProcessor { this.properties = properties; this.ignoreMissing = ignoreMissing; this.cache = cache; + this.firstOnly = firstOnly; } boolean isIgnoreMissing() { @@ -103,7 +108,7 @@ boolean isIgnoreMissing() { @Override public IngestDocument execute(IngestDocument ingestDocument) throws IOException { - String ip = ingestDocument.getFieldValue(field, String.class, ignoreMissing); + Object ip = ingestDocument.getFieldValue(field, Object.class, ignoreMissing); if (ip == null && ignoreMissing) { return ingestDocument; @@ -111,11 +116,43 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException throw new IllegalArgumentException("field [" + field + "] is null, cannot extract geoip information."); } - final InetAddress ipAddress = InetAddresses.forString(ip); + if (ip instanceof String) { + Map geoData = getGeoData((String) ip); + if (geoData.isEmpty() == false) { + ingestDocument.setFieldValue(targetField, geoData); + } + } else if (ip instanceof List) { + boolean match = false; + List> geoDataList = new ArrayList<>(((List) 
ip).size()); + for (Object ipAddr : (List) ip) { + if (ipAddr instanceof String == false) { + throw new IllegalArgumentException("array in field [" + field + "] should only contain strings"); + } + Map geoData = getGeoData((String) ipAddr); + if (geoData.isEmpty()) { + geoDataList.add(null); + continue; + } + if (firstOnly) { + ingestDocument.setFieldValue(targetField, geoData); + return ingestDocument; + } + match = true; + geoDataList.add(geoData); + } + if (match) { + ingestDocument.setFieldValue(targetField, geoDataList); + } + } else { + throw new IllegalArgumentException("field [" + field + "] should contain only string or array of strings"); + } + return ingestDocument; + } - Map geoData; + private Map getGeoData(String ip) throws IOException { String databaseType = lazyLoader.getDatabaseType(); - + final InetAddress ipAddress = InetAddresses.forString(ip); + Map geoData; if (databaseType.endsWith(CITY_DB_SUFFIX)) { try { geoData = retrieveCityGeoData(ipAddress); @@ -136,12 +173,9 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException } } else { throw new ElasticsearchParseException("Unsupported database type [" + lazyLoader.getDatabaseType() - + "]", new IllegalStateException()); - } - if (geoData.isEmpty() == false) { - ingestDocument.setFieldValue(targetField, geoData); + + "]", new IllegalStateException()); } - return ingestDocument; + return geoData; } @Override @@ -360,14 +394,15 @@ public Factory(Map databaseReaders, GeoIpCache @Override public GeoIpProcessor create( - final Map registry, - final String processorTag, - final Map config) throws IOException { + final Map registry, + final String processorTag, + final Map config) throws IOException { String ipField = readStringProperty(TYPE, processorTag, config, "field"); String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "geoip"); String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb"); List 
propertyNames = readOptionalList(TYPE, processorTag, config, "properties"); boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); + boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, "first_only", true); DatabaseReaderLazyLoader lazyLoader = databaseReaders.get(databaseFile); if (lazyLoader == null) { @@ -397,11 +432,11 @@ public GeoIpProcessor create( properties = DEFAULT_ASN_PROPERTIES; } else { throw newConfigurationException(TYPE, processorTag, "database_file", "Unsupported database type [" - + databaseType + "]"); + + databaseType + "]"); } } - return new GeoIpProcessor(processorTag, ipField, lazyLoader, targetField, properties, ignoreMissing, cache); + return new GeoIpProcessor(processorTag, ipField, lazyLoader, targetField, properties, ignoreMissing, cache, firstOnly); } } @@ -460,7 +495,7 @@ public static Property parseProperty(String databaseType, String value) { return property; } catch (IllegalArgumentException e) { throw new IllegalArgumentException("illegal property value [" + value + "]. 
valid values are " + - Arrays.toString(validProperties.toArray())); + Arrays.toString(validProperties.toArray())); } } } diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java index b136fbae0376a..f0e9b30ae69ca 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpProcessorTests.java @@ -29,9 +29,11 @@ import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.Supplier; @@ -39,13 +41,14 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; public class GeoIpProcessorTests extends ESTestCase { public void testCity() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); Map document = new HashMap<>(); document.put("source_field", "8.8.8.8"); @@ -70,7 +73,7 @@ public void testCity() throws Exception { public void testNullValueWithIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("source_field", null)); IngestDocument ingestDocument = new 
IngestDocument(originalIngestDocument); @@ -81,7 +84,7 @@ public void testNullValueWithIgnoreMissing() throws Exception { public void testNonExistentWithIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), true, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); processor.execute(ingestDocument); @@ -91,7 +94,7 @@ public void testNonExistentWithIgnoreMissing() throws Exception { public void testNullWithoutIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.singletonMap("source_field", null)); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); @@ -102,7 +105,7 @@ public void testNullWithoutIgnoreMissing() throws Exception { public void testNonExistentWithoutIgnoreMissing() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); IngestDocument originalIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); IngestDocument ingestDocument = new IngestDocument(originalIngestDocument); Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument)); @@ -112,7 +115,7 @@ public void 
testNonExistentWithoutIgnoreMissing() throws Exception { public void testCity_withIpV6() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); String address = "2602:306:33d3:8000::3257:9652"; Map document = new HashMap<>(); @@ -141,7 +144,7 @@ public void testCity_withIpV6() throws Exception { public void testCityWithMissingLocation() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); Map document = new HashMap<>(); document.put("source_field", "80.231.5.0"); @@ -158,7 +161,7 @@ public void testCityWithMissingLocation() throws Exception { public void testCountry() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-Country.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); Map document = new HashMap<>(); document.put("source_field", "82.170.213.79"); @@ -178,7 +181,7 @@ public void testCountry() throws Exception { public void testCountryWithMissingLocation() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-Country.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); Map document = new HashMap<>(); document.put("source_field", "80.231.5.0"); @@ -196,7 +199,7 @@ public void testAsn() throws Exception { String ip = "82.171.64.0"; GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-ASN.mmdb"), 
"target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); Map document = new HashMap<>(); document.put("source_field", ip); @@ -215,7 +218,7 @@ public void testAsn() throws Exception { public void testAddressIsNotInTheDatabase() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); Map document = new HashMap<>(); document.put("source_field", "127.0.0.1"); @@ -228,7 +231,7 @@ public void testAddressIsNotInTheDatabase() throws Exception { public void testInvalid() throws Exception { GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, - new GeoIpCache(1000)); + new GeoIpCache(1000), false); Map document = new HashMap<>(); document.put("source_field", "www.google.com"); @@ -237,6 +240,93 @@ public void testInvalid() throws Exception { assertThat(e.getMessage(), containsString("not an IP string literal")); } + public void testListAllValid() throws Exception { + GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000), false); + + Map document = new HashMap<>(); + document.put("source_field", Arrays.asList("8.8.8.8", "82.171.64.0")); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + + @SuppressWarnings("unchecked") + List> geoData = (List>) ingestDocument.getSourceAndMetadata().get("target_field"); + + Map location = new HashMap<>(); + location.put("lat", 37.751d); + location.put("lon", -97.822d); + 
assertThat(geoData.get(0).get("location"), equalTo(location)); + + assertThat(geoData.get(1).get("city_name"), equalTo("Hoensbroek")); + } + + public void testListPartiallyValid() throws Exception { + GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000), false); + + Map document = new HashMap<>(); + document.put("source_field", Arrays.asList("8.8.8.8", "127.0.0.1")); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + + @SuppressWarnings("unchecked") + List> geoData = (List>) ingestDocument.getSourceAndMetadata().get("target_field"); + + Map location = new HashMap<>(); + location.put("lat", 37.751d); + location.put("lon", -97.822d); + assertThat(geoData.get(0).get("location"), equalTo(location)); + + assertThat(geoData.get(1), nullValue()); + } + + public void testListNoMatches() throws Exception { + GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000), false); + + Map document = new HashMap<>(); + document.put("source_field", Arrays.asList("127.0.0.1", "127.0.0.1")); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + + assertFalse(ingestDocument.hasField("target_field")); + } + + public void testListFirstOnly() throws Exception { + GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000), true); + + Map document = new HashMap<>(); + document.put("source_field", Arrays.asList("8.8.8.8", "127.0.0.1")); + IngestDocument ingestDocument = 
RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + + @SuppressWarnings("unchecked") + Map geoData = (Map) ingestDocument.getSourceAndMetadata().get("target_field"); + + Map location = new HashMap<>(); + location.put("lat", 37.751d); + location.put("lon", -97.822d); + assertThat(geoData.get("location"), equalTo(location)); + } + + public void testListFirstOnlyNoMatches() throws Exception { + GeoIpProcessor processor = new GeoIpProcessor(randomAlphaOfLength(10), "source_field", + loader("/GeoLite2-City.mmdb"), "target_field", EnumSet.allOf(GeoIpProcessor.Property.class), false, + new GeoIpCache(1000), true); + + Map document = new HashMap<>(); + document.put("source_field", Arrays.asList("127.0.0.1", "127.0.0.2")); + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document); + processor.execute(ingestDocument); + + assertThat(ingestDocument.getSourceAndMetadata().containsKey("target_field"), is(false)); + } + private DatabaseReaderLazyLoader loader(final String path) { final Supplier databaseInputStreamSupplier = () -> GeoIpProcessor.class.getResourceAsStream(path); final CheckedSupplier loader = diff --git a/modules/ingest-geoip/src/test/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yml b/modules/ingest-geoip/src/test/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yml index 27ab1f4e8747d..f6bdce0532ace 100644 --- a/modules/ingest-geoip/src/test/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yml +++ b/modules/ingest-geoip/src/test/resources/rest-api-spec/test/ingest_geoip/20_geoip_processor.yml @@ -37,6 +37,87 @@ - match: { _source.geoip.region_name: "Minnesota" } - match: { _source.geoip.continent_name: "North America" } +--- +"Test geoip processor with list": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "geoip" : { + "field" : "field1", + "first_only" : false + } 
+ } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: {field1: ["128.101.101.101", "127.0.0.1"]} + + - do: + get: + index: test + id: 1 + - match: { _source.field1: ["128.101.101.101", "127.0.0.1"] } + - length: { _source.geoip: 2 } + - length: { _source.geoip.0: 6 } + - match: { _source.geoip.0.city_name: "Minneapolis" } + - match: { _source.geoip.0.country_iso_code: "US" } + - match: { _source.geoip.0.location.lon: -93.2548 } + - match: { _source.geoip.0.location.lat: 44.9399 } + - match: { _source.geoip.0.region_iso_code: "US-MN" } + - match: { _source.geoip.0.region_name: "Minnesota" } + - match: { _source.geoip.0.continent_name: "North America" } + - match: { _source.geoip.1: null } + +--- +"Test geoip processor with lists, first only": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "geoip" : { + "field" : "field1" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: {field1: ["127.0.0.1", "128.101.101.101", "128.101.101.101"]} + + - do: + get: + index: test + id: 1 + - match: { _source.field1: ["127.0.0.1", "128.101.101.101", "128.101.101.101"] } + - length: { _source.geoip: 6 } + - match: { _source.geoip.city_name: "Minneapolis" } + - match: { _source.geoip.country_iso_code: "US" } + - match: { _source.geoip.location.lon: -93.2548 } + - match: { _source.geoip.location.lat: 44.9399 } + - match: { _source.geoip.region_iso_code: "US-MN" } + - match: { _source.geoip.region_name: "Minnesota" } + - match: { _source.geoip.continent_name: "North America" } + --- "Test geoip processor with fields": - do: