Skip to content

Commit

Permalink
Allow list of IPs in geoip ingest processor (#49573)
Browse files Browse the repository at this point in the history
* Allow list of IPs in geoip ingest processor

This change lets you use array of IPs in addition to string in geoip processor source field.
It will set array containing geoip data for each element in source, unless first_only parameter
option is enabled, then only first found will be returned.

Closes #46193
  • Loading branch information
probakowski committed Dec 6, 2019
1 parent 356d1a2 commit c57032f
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 34 deletions.
1 change: 1 addition & 0 deletions docs/reference/ingest/processors/geoip.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ uncompressed. The `ingest-geoip` config directory is located at `$ES_CONFIG/inge
| `database_file` | no | GeoLite2-City.mmdb | The database filename in the geoip config directory. The ingest-geoip module ships with the GeoLite2-City.mmdb, GeoLite2-Country.mmdb and GeoLite2-ASN.mmdb files.
| `properties` | no | [`continent_name`, `country_iso_code`, `region_iso_code`, `region_name`, `city_name`, `location`] * | Controls what properties are added to the `target_field` based on the geoip lookup.
| `ignore_missing` | no | `false` | If `true` and `field` does not exist, the processor quietly exits without modifying the document
| `first_only` | no | `true` | If `true` only first found geoip data will be returned, even if `field` contains array
|======

*Depends on what is available in `database_file`:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.net.InetAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
Expand Down Expand Up @@ -68,6 +69,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
private final Set<Property> properties;
private final boolean ignoreMissing;
private final GeoIpCache cache;
private final boolean firstOnly;

/**
* Construct a geo-IP processor.
Expand All @@ -79,22 +81,25 @@ public final class GeoIpProcessor extends AbstractProcessor {
* @param properties the properties; ideally this is lazily-loaded once on first use
* @param ignoreMissing true if documents with a missing value for the field should be ignored
* @param cache a geo-IP cache
* @param firstOnly true if only first result should be returned in case of array
*/
GeoIpProcessor(
final String tag,
final String field,
final DatabaseReaderLazyLoader lazyLoader,
final String targetField,
final Set<Property> properties,
final boolean ignoreMissing,
final GeoIpCache cache) {
final String tag,
final String field,
final DatabaseReaderLazyLoader lazyLoader,
final String targetField,
final Set<Property> properties,
final boolean ignoreMissing,
final GeoIpCache cache,
boolean firstOnly) {
super(tag);
this.field = field;
this.targetField = targetField;
this.lazyLoader = lazyLoader;
this.properties = properties;
this.ignoreMissing = ignoreMissing;
this.cache = cache;
this.firstOnly = firstOnly;
}

boolean isIgnoreMissing() {
Expand All @@ -103,19 +108,51 @@ boolean isIgnoreMissing() {

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws IOException {
String ip = ingestDocument.getFieldValue(field, String.class, ignoreMissing);
Object ip = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);

if (ip == null && ignoreMissing) {
return ingestDocument;
} else if (ip == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract geoip information.");
}

final InetAddress ipAddress = InetAddresses.forString(ip);
if (ip instanceof String) {
Map<String, Object> geoData = getGeoData((String) ip);
if (geoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, geoData);
}
} else if (ip instanceof List) {
boolean match = false;
List<Map<String, Object>> geoDataList = new ArrayList<>(((List) ip).size());
for (Object ipAddr : (List) ip) {
if (ipAddr instanceof String == false) {
throw new IllegalArgumentException("array in field [" + field + "] should only contain strings");
}
Map<String, Object> geoData = getGeoData((String) ipAddr);
if (geoData.isEmpty()) {
geoDataList.add(null);
continue;
}
if (firstOnly) {
ingestDocument.setFieldValue(targetField, geoData);
return ingestDocument;
}
match = true;
geoDataList.add(geoData);
}
if (match) {
ingestDocument.setFieldValue(targetField, geoDataList);
}
} else {
throw new IllegalArgumentException("field [" + field + "] should contain only string or array of strings");
}
return ingestDocument;
}

Map<String, Object> geoData;
private Map<String, Object> getGeoData(String ip) throws IOException {
String databaseType = lazyLoader.getDatabaseType();

final InetAddress ipAddress = InetAddresses.forString(ip);
Map<String, Object> geoData;
if (databaseType.endsWith(CITY_DB_SUFFIX)) {
try {
geoData = retrieveCityGeoData(ipAddress);
Expand All @@ -136,12 +173,9 @@ public IngestDocument execute(IngestDocument ingestDocument) throws IOException
}
} else {
throw new ElasticsearchParseException("Unsupported database type [" + lazyLoader.getDatabaseType()
+ "]", new IllegalStateException());
}
if (geoData.isEmpty() == false) {
ingestDocument.setFieldValue(targetField, geoData);
+ "]", new IllegalStateException());
}
return ingestDocument;
return geoData;
}

@Override
Expand Down Expand Up @@ -360,14 +394,15 @@ public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders, GeoIpCache

@Override
public GeoIpProcessor create(
final Map<String, Processor.Factory> registry,
final String processorTag,
final Map<String, Object> config) throws IOException {
final Map<String, Processor.Factory> registry,
final String processorTag,
final Map<String, Object> config) throws IOException {
String ipField = readStringProperty(TYPE, processorTag, config, "field");
String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "geoip");
String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb");
List<String> propertyNames = readOptionalList(TYPE, processorTag, config, "properties");
boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, "first_only", true);

DatabaseReaderLazyLoader lazyLoader = databaseReaders.get(databaseFile);
if (lazyLoader == null) {
Expand Down Expand Up @@ -397,11 +432,11 @@ public GeoIpProcessor create(
properties = DEFAULT_ASN_PROPERTIES;
} else {
throw newConfigurationException(TYPE, processorTag, "database_file", "Unsupported database type ["
+ databaseType + "]");
+ databaseType + "]");
}
}

return new GeoIpProcessor(processorTag, ipField, lazyLoader, targetField, properties, ignoreMissing, cache);
return new GeoIpProcessor(processorTag, ipField, lazyLoader, targetField, properties, ignoreMissing, cache, firstOnly);
}
}

Expand Down Expand Up @@ -460,7 +495,7 @@ public static Property parseProperty(String databaseType, String value) {
return property;
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("illegal property value [" + value + "]. valid values are " +
Arrays.toString(validProperties.toArray()));
Arrays.toString(validProperties.toArray()));
}
}
}
Expand Down
Loading

0 comments on commit c57032f

Please sign in to comment.