Implement stats aggregation for string terms #47468

Merged 14 commits on Nov 14, 2019
@@ -6,10 +6,15 @@
package org.elasticsearch.xpack.analytics;

import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
+import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder;

-public class DataScienceAggregationBuilders {
+public class AnalyticsAggregationBuilders {
Contributor: Whoops, good catch, thanks for the fix :)

Contributor: And the embarrassing typo below heh :)


-public static CumulativeCardinalityPipelineAggregationBuilder cumulativeCaardinality(String name, String bucketsPath) {
+public static CumulativeCardinalityPipelineAggregationBuilder cumulativeCardinality(String name, String bucketsPath) {
return new CumulativeCardinalityPipelineAggregationBuilder(name, bucketsPath);
}

+public static StringStatsAggregationBuilder stringStats(String name) {
+return new StringStatsAggregationBuilder(name);
+}
}
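For orientation, a hedged usage sketch, not part of this PR's diff: it shows how the stringStats factory above might be attached to a search request. The aggregation and field names are made up, and field() is assumed to be inherited from the values-source builder base class that StringStatsAggregationBuilder extends.

import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.analytics.AnalyticsAggregationBuilders;

class StringStatsUsageSketch {
    // Builds a request that skips hits and returns only the string stats aggregation.
    static SearchSourceBuilder messageStats() {
        return new SearchSourceBuilder()
            .size(0) // aggregation result only, no documents
            .aggregation(AnalyticsAggregationBuilders.stringStats("message_stats")
                .field("message.keyword")); // hypothetical keyword field
    }
}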
@@ -11,15 +11,17 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
-import org.elasticsearch.xpack.core.XPackPlugin;
-import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
-import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
-import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
import org.elasticsearch.xpack.analytics.action.AnalyticsInfoTransportAction;
import org.elasticsearch.xpack.analytics.action.AnalyticsUsageTransportAction;
import org.elasticsearch.xpack.analytics.action.TransportAnalyticsStatsAction;
import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregator;
+import org.elasticsearch.xpack.analytics.stringstats.InternalStringStats;
+import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder;
+import org.elasticsearch.xpack.core.XPackPlugin;
+import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
+import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
+import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;

import java.util.Arrays;
import java.util.List;
@@ -38,11 +40,23 @@ public AnalyticsPlugin() { }

@Override
public List<PipelineAggregationSpec> getPipelineAggregations() {
-return singletonList(new PipelineAggregationSpec(
-CumulativeCardinalityPipelineAggregationBuilder.NAME,
-CumulativeCardinalityPipelineAggregationBuilder::new,
-CumulativeCardinalityPipelineAggregator::new,
-CumulativeCardinalityPipelineAggregationBuilder::parse));
+return singletonList(
+new PipelineAggregationSpec(
+CumulativeCardinalityPipelineAggregationBuilder.NAME,
+CumulativeCardinalityPipelineAggregationBuilder::new,
+CumulativeCardinalityPipelineAggregator::new,
+CumulativeCardinalityPipelineAggregationBuilder::parse)
+);
}

+@Override
+public List<AggregationSpec> getAggregations() {
+return singletonList(
+new AggregationSpec(
+StringStatsAggregationBuilder.NAME,
+StringStatsAggregationBuilder::new,
+StringStatsAggregationBuilder::parse).addResultReader(InternalStringStats::new)
+);
+}

@Override
@@ -0,0 +1,280 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.stringstats;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class InternalStringStats extends InternalAggregation {

enum Metrics {
count, min_length, max_length, avg_length, entropy;

public static Metrics resolve(String name) {
return Metrics.valueOf(name);
}
}

private static final DocValueFormat DEFAULT_FORMAT = DocValueFormat.RAW;

private DocValueFormat format = DEFAULT_FORMAT;
private final boolean showDistribution;
private final long count;
private final long totalLength;
private final int minLength;
private final int maxLength;
private final Map<String, Long> charOccurrences;

public InternalStringStats(String name, long count, long totalLength, int minLength, int maxLength,
Map<String, Long> charOccurrences, boolean showDistribution,
DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
this.format = formatter;
this.showDistribution = showDistribution;
this.count = count;
this.totalLength = totalLength;
this.minLength = minLength;
this.maxLength = maxLength;
this.charOccurrences = charOccurrences;
}

/** Read from a stream. */
public InternalStringStats(StreamInput in) throws IOException {
super(in);
format = in.readNamedWriteable(DocValueFormat.class);
showDistribution = in.readBoolean();
count = in.readVLong();
totalLength = in.readVLong();
minLength = in.readVInt();
maxLength = in.readVInt();
charOccurrences = in.<String, Long>readMap(StreamInput::readString, StreamInput::readLong);
}

@Override
protected final void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeBoolean(showDistribution);
out.writeVLong(count);
out.writeVLong(totalLength);
out.writeVInt(minLength);
out.writeVInt(maxLength);
out.writeMap(charOccurrences, StreamOutput::writeString, StreamOutput::writeLong);
}

@Override
public String getWriteableName() {
return StringStatsAggregationBuilder.NAME;
}

public long getCount() {
return count;
}

public int getMinLength() {
return minLength;
}

public int getMaxLength() {
return maxLength;
}

public double getAvgLength() {
return (double) totalLength / count;
}

public double getEntropy() {
double sum = 0.0;
double compensation = 0.0;
for (double p : getDistribution().values()) {
if (p > 0) {
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
double value = p * log2(p);
if (Double.isFinite(value) == false) {
sum += value;
} else if (Double.isFinite(sum)) {
double corrected = value - compensation;
double newSum = sum + corrected;
compensation = (newSum - sum) - corrected;
sum = newSum;
}
}
}
return -sum;
}
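For reference, not part of the diff: the sum above is the Shannon entropy of the per-character frequencies produced by getDistribution(), H = -Σ_c p_c log2(p_c), where p_c is the relative frequency of character c. It is 0 when every value repeats a single character and grows as the characters are spread more evenly.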

/**
* Convert the character occurrences map to character frequencies.
*
* @return A map with the character as key and the probability of
* this character to occur as value. The map is ordered by frequency descending.
*/
public Map<String, Double> getDistribution() {
Member: Nit: Does this need to be public? Looked to me like it was only called within the package.

return charOccurrences.entrySet().stream()
.sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
.collect(
Collectors.toMap(e -> e.getKey(), e -> (double) e.getValue() / totalLength,
(e1, e2) -> e2, LinkedHashMap::new)
);
}

/** Calculate base 2 logarithm */
static double log2(double d) {
return Math.log(d) / Math.log(2.0);
}

public String getCountAsString() {
return format.format(getCount()).toString();
}

public String getMinLengthAsString() {
return format.format(getMinLength()).toString();
}

public String getMaxLengthAsString() {
return format.format(getMaxLength()).toString();
}

public String getAvgLengthAsString() {
return format.format(getAvgLength()).toString();
}

public String getEntropyAsString() {
return format.format(getEntropy()).toString();
}

public Object value(String name) {
Metrics metrics = Metrics.valueOf(name);
switch (metrics) {
Member: Bit of a nit, but I don't love switching on an enum. If someone later adds a field to the enum, they need to remember to also update this switch. IMHO, a better solution would be to put a method on the enum, getFieldValue(InternalStringStats stats), which could then call the appropriate getter and return the value. That way, any new enum value would need to implement the method for it to compile. (A sketch of this approach follows after the value method below.)

case count: return this.count;
case min_length: return this.minLength;
case max_length: return this.maxLength;
case avg_length: return this.getAvgLength();
case entropy: return this.getEntropy();
default:
throw new IllegalArgumentException("Unknown value [" + name + "] in string stats aggregation");
}
}
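A minimal sketch of the reviewer's suggestion (hypothetical, not part of the PR): each enum constant implements the accessor itself, so the switch above disappears and a newly added metric cannot compile until it supplies its value.

// Hypothetical replacement for the Metrics enum and the switch in value(String).
enum Metrics {
    count {
        Object getFieldValue(InternalStringStats stats) { return stats.getCount(); }
    },
    min_length {
        Object getFieldValue(InternalStringStats stats) { return stats.getMinLength(); }
    },
    max_length {
        Object getFieldValue(InternalStringStats stats) { return stats.getMaxLength(); }
    },
    avg_length {
        Object getFieldValue(InternalStringStats stats) { return stats.getAvgLength(); }
    },
    entropy {
        Object getFieldValue(InternalStringStats stats) { return stats.getEntropy(); }
    };

    // Forgetting this on a new constant is a compile error, not a silent fall-through.
    abstract Object getFieldValue(InternalStringStats stats);
}

With that in place, value(String name) would reduce to return Metrics.valueOf(name).getFieldValue(this);.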

@Override
public InternalStringStats doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long count = 0;
long totalLength = 0;
int minLength = Integer.MAX_VALUE;
int maxLength = Integer.MIN_VALUE;
Map<String, Long> occurs = new HashMap<>();

for (InternalAggregation aggregation : aggregations) {
InternalStringStats stats = (InternalStringStats) aggregation;
count += stats.getCount();
minLength = Math.min(minLength, stats.getMinLength());
maxLength = Math.max(maxLength, stats.getMaxLength());
totalLength += stats.totalLength;
stats.charOccurrences.forEach((k, v) ->
occurs.merge(k, v, (oldValue, newValue) -> oldValue + newValue)
);
}

return new InternalStringStats(name, count, totalLength, minLength, maxLength, occurs,
showDistribution, format, pipelineAggregators(), getMetaData());
}

@Override
public Object getProperty(List<String> path) {
if (path.isEmpty()) {
return this;
} else if (path.size() == 1) {
return value(path.get(0));
} else {
throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
}
}

static class Fields {
public static final ParseField COUNT = new ParseField("count");
public static final ParseField MIN_LENGTH = new ParseField("min_length");
public static final ParseField MIN_LENGTH_AS_STRING = new ParseField("min_length_as_string");
public static final ParseField MAX_LENGTH = new ParseField("max_length");
public static final ParseField MAX_LENGTH_AS_STRING = new ParseField("max_as_string");
public static final ParseField AVG_LENGTH = new ParseField("avg_length");
public static final ParseField AVG_LENGTH_AS_STRING = new ParseField("avg_length_as_string");
public static final ParseField ENTROPY = new ParseField("entropy");
public static final ParseField ENTROPY_AS_STRING = new ParseField("entropy_string");
public static final ParseField DISTRIBUTION = new ParseField("distribution");
public static final ParseField DISTRIBUTION_AS_STRING = new ParseField("distribution_string");
}

@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.COUNT.getPreferredName(), count);
if (count > 0) {
builder.field(Fields.MIN_LENGTH.getPreferredName(), minLength);
builder.field(Fields.MAX_LENGTH.getPreferredName(), maxLength);
builder.field(Fields.AVG_LENGTH.getPreferredName(), getAvgLength());
builder.field(Fields.ENTROPY.getPreferredName(), getEntropy());
if (showDistribution == true) {
builder.field(Fields.DISTRIBUTION.getPreferredName(), getDistribution());
}
if (format != DocValueFormat.RAW) {
builder.field(Fields.MIN_LENGTH_AS_STRING.getPreferredName(), format.format(getMinLength()));
builder.field(Fields.MAX_LENGTH_AS_STRING.getPreferredName(), format.format(getMaxLength()));
builder.field(Fields.AVG_LENGTH_AS_STRING.getPreferredName(), format.format(getAvgLength()));
builder.field(Fields.ENTROPY_AS_STRING.getPreferredName(), format.format(getEntropy()));
if (showDistribution == true) {
builder.startObject(Fields.DISTRIBUTION_AS_STRING.getPreferredName());
for (Map.Entry<String, Double> e: getDistribution().entrySet()) {
builder.field(e.getKey(), format.format(e.getValue()).toString());
}
builder.endObject();
}
}
} else {
builder.nullField(Fields.MIN_LENGTH.getPreferredName());
builder.nullField(Fields.MAX_LENGTH.getPreferredName());
builder.nullField(Fields.AVG_LENGTH.getPreferredName());
builder.field(Fields.ENTROPY.getPreferredName(), 0.0);

if (showDistribution == true) {
builder.nullField(Fields.DISTRIBUTION.getPreferredName());
}
}
return builder;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), count, minLength, maxLength, totalLength, charOccurrences, showDistribution);
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
if (super.equals(obj) == false) return false;

InternalStringStats other = (InternalStringStats) obj;
return count == other.count &&
minLength == other.minLength &&
maxLength == other.maxLength &&
totalLength == other.totalLength &&
showDistribution == other.showDistribution &&
Objects.equals(charOccurrences, other.charOccurrences);
}
}