Alternative aggregate functions to calculate histogram values. #475
@rdsharma26, can you please take a look? Thanks.
@akalotkin Will be having a look shortly. In the meantime, could you fix the build failure?
@@ -43,7 +44,9 @@ case class Histogram(
     binningUdf: Option[UserDefinedFunction] = None,
     maxDetailBins: Integer = Histogram.MaximumAllowedDetailBins,
     where: Option[String] = None,
-    computeFrequenciesAsRatio: Boolean = true)
+    computeFrequenciesAsRatio: Boolean = true,
+    aggregateFunction: String = Histogram.Count,
Can we make this an enum? In Scala 2, we can use a sealed trait and case object to create enumerations.
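A minimal sketch of the pattern the reviewer is describing, assuming hypothetical names (`HistogramEnumSketch`, `HistogramConfig`, and a `name` member are illustrative and not taken from the PR). The `sealed` modifier restricts subtypes to the same file, which is what lets the compiler treat the set of values as closed:

```scala
object HistogramEnumSketch {
  // Hypothetical enumeration replacing the String-typed parameter.
  sealed trait AggregateFunction { def name: String }
  case object Count extends AggregateFunction { val name = "count" }
  case object Sum extends AggregateFunction { val name = "sum" }

  // Stand-in for the Histogram parameter list; only the relevant fields are modelled.
  final case class HistogramConfig(
      column: String,
      aggregateFunction: AggregateFunction = Count)
}
```

With this shape a caller cannot pass a misspelled function name, because only `Count` and `Sum` type-check.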
-    computeFrequenciesAsRatio: Boolean = true)
+    computeFrequenciesAsRatio: Boolean = true,
+    aggregateFunction: String = Histogram.Count,
+    aggregateColumn: Option[String] = None)
This is used when the aggregate function is sum. As a caller of this API, it is possible that this value is not provided when using sum. Is it possible to group this configuration into a single object?
Check this parameter for reference: ef4c308#diff-4db9bbf900d2a35e16c36a158cbf3c4641d336e1930e6f343f2fef5773d0cea6R95
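One way to read "group this configuration into a single object" is to let the sum variant carry its own column, so that a sum without a column cannot be constructed. The sketch below assumes that reading; the wrapper object and member names are hypothetical, loosely modelled on the `aggregateColumn()` and `function()` methods that appear later in this review:

```scala
object AggregateGroupingSketch {
  sealed trait AggregateFunction {
    def function: String
    def aggregateColumn: Option[String]
  }

  // Counting rows needs no extra column.
  case object Count extends AggregateFunction {
    val function = "count"
    val aggregateColumn: Option[String] = None
  }

  // Summing requires a column, so it is a constructor argument rather than an Option.
  final case class Sum(aggColumn: String) extends AggregateFunction {
    val function = "sum"
    val aggregateColumn: Option[String] = Some(aggColumn)
  }
}
```

The invalid combination "sum with no aggregate column" is then ruled out by the type rather than by a runtime check.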
aggregateFunction match {
  case Histogram.Count => countQuery(data)
  case Histogram.Sum => sumQuery(data)
  case _ => throw new IllegalStateException()
If we use an enumeration, then we won't need a default case.
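To illustrate the point about the default case, here is a sketch that matches on a sealed hierarchy without a `case _` branch. It uses an in-memory `Seq` of (bin, amount) pairs as a stand-in for the DataFrame, and all names are hypothetical:

```scala
object ExhaustiveMatchSketch {
  sealed trait AggregateFunction
  case object Count extends AggregateFunction
  case object Sum extends AggregateFunction

  // rows are (bin value, amount) pairs standing in for the grouped DataFrame.
  def histogram(rows: Seq[(String, Long)], fn: AggregateFunction): Map[String, Long] =
    fn match {
      // Number of rows per bin.
      case Count => rows.groupBy(_._1).map { case (bin, group) => bin -> group.size.toLong }
      // Total amount per bin.
      case Sum => rows.groupBy(_._1).map { case (bin, group) => bin -> group.map(_._2).sum }
      // No default branch: the trait is sealed, so the compiler can warn
      // about a missing case if a new subtype is added later.
    }
}
```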
data.select(col(column).cast(StringType))
  .na.fill(Histogram.NullFieldReplacement)
  .groupBy(column).count().withColumnRenamed("count", Analyzers.COUNT_COL)
Can we use the same formatting as the one on lines 66-73 originally?
data.select(col(column).cast(StringType), col(aggregateColumn.get).cast(LongType))
  .na.fill(Histogram.NullFieldReplacement)
  .groupBy(column).sum(aggregateColumn.get).withColumnRenamed("count", Analyzers.COUNT_COL)
Can we use the same formatting as the one on lines 66-73 originally?
@rdsharma26, can you take a look at the change, please?
def createAggregateFunction(function: String, aggregateColumn: String): AggregateFunction = {
  function match {
    case Histogram.count_function => Count
    case Histogram.sum_function => Sum(aggregateColumn)
    case _ => throw new IllegalArgumentException("Wrong aggregate function name: " + function)
  }
}
Can we remove this function and let the caller of the function provide a valid instance of AggregateFunction? The reason I say this is because if we add more subclasses of AggregateFunction in the future, we will need to update this createAggregateFunction to accept the superset of ALL parameters for all subtypes.
This function is used to deserialize the object from JSON. I can move it to AnalysisResultSerde, but there must be a piece of code that deserializes all combinations from JSON.
Sure, I think it is fine then to keep it outside of Histogram and in AnalysisResultSerde.
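The outcome of this thread can be sketched as a name-to-instance mapping that lives only in the serde layer, while callers of the analyzer construct `AggregateFunction` values directly. Everything below is a hypothetical illustration (the wrapper object, `fromFields`, and the literal names "count" and "sum" are assumptions, not the merged code):

```scala
object SerdeSketch {
  sealed trait AggregateFunction
  case object Count extends AggregateFunction
  final case class Sum(aggColumn: String) extends AggregateFunction

  // Used only when reading stored JSON fields back into an instance.
  def fromFields(function: String, aggregateColumn: Option[String]): AggregateFunction =
    (function, aggregateColumn) match {
      case ("count", _) => Count
      case ("sum", Some(col)) => Sum(col)
      case ("sum", None) =>
        throw new IllegalArgumentException("sum requires an aggregate column")
      case (other, _) =>
        throw new IllegalArgumentException("Wrong aggregate function name: " + other)
    }
}
```

The default branch is still needed here because the input is an arbitrary string from storage, unlike a match over the sealed type itself.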
// case class Min(aggColumn: String) extends AggregateFunction {
//   override def query(column: String, data: DataFrame): DataFrame = {
//     data
//       .select(col(column).cast(StringType), col(aggColumn).cast(LongType))
//       .na.fill(Histogram.NullFieldReplacement)
//       .groupBy(column)
//       .min(aggColumn)
//       .withColumnRenamed("count", Analyzers.COUNT_COL)
//   }
//
//   override def aggregateColumn(): Option[String] = {
//     Some(aggColumn)
//   }
//
//   override def function(): String = min
// }
//
// case class Max(aggColumn: String) extends AggregateFunction {
//   override def query(column: String, data: DataFrame): DataFrame = {
//     data
//       .select(col(column).cast(StringType), col(aggColumn).cast(LongType))
//       .na.fill(Histogram.NullFieldReplacement)
//       .groupBy(column)
//       .max(aggColumn)
//       .withColumnRenamed("count", Analyzers.COUNT_COL)
//   }
//
//   override def aggregateColumn(): Option[String] = {
//     Some(aggColumn)
//   }
//
//   override def function(): String = max
// }
Either we remove the commented out code, or we keep it as code and add tests. Otherwise, it adds bloat to the source code.
I guess I just forgot to remove this code. Will do.
@@ -302,6 +302,10 @@ private[deequ] object AnalyzerSerializer
       result.addProperty(ANALYZER_NAME_FIELD, "Histogram")
       result.addProperty(COLUMN_FIELD, histogram.column)
       result.addProperty("maxDetailBins", histogram.maxDetailBins)
+      if (histogram.aggregateFunction != Histogram.Count) {
Can we add a comment here explaining that:
- we are excluding Count because it is the current default
- it was the only previously supported function, and we exclude it for backwards compatibility
Will add comments.
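The backwards-compatibility argument can be sketched with a plain `Map` standing in for the JSON object. The field names and helper functions below are hypothetical; the idea shown is only that the default is never written, and a missing field is read back as the default:

```scala
object BackwardsCompatSketch {
  val DefaultFunction = "count"

  def serialize(column: String, function: String): Map[String, String] = {
    val base = Map("analyzerName" -> "Histogram", "column" -> column)
    // count was the only supported function before this change and is still the
    // default, so it is omitted to keep the output identical to older payloads.
    if (function != DefaultFunction) base + ("aggregateFunction" -> function) else base
  }

  // Payloads written before the change have no aggregateFunction field at all.
  def readFunction(fields: Map[String, String]): String =
    fields.getOrElse("aggregateFunction", DefaultFunction)
}
```

Under this scheme a payload written by an older version and one written by the newer version for a count histogram are indistinguishable.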
|{"entity":"Dataset","instance":"*","name":"Size (where: att2 == 'd')","value":1.0},
|{"entity":"Dataset","instance":"*","name":"Size","value":4.0},
Is this change required? It is not clear from the changes that the order of the fields in the serialization process is changing. If it is not, can we remove this diff to make the PR clearer?
Yes. This is required. This is the reason a test was failing.
Thanks for the information. Is it possible to understand why it is required? The reason I ask is because of backwards compatibility. Will a serialized object that has the older order fail to get deserialized because of this change?
I need to dig deeper to explain why it happens. My guess is that there is some kind of HashMap or HashSet that changes the order of the response.
Yes. That's what happens. Below are screenshots from 2 different versions (breakpoint is on this line
saveOrAppendResultsIfNecessary(
As you can see, after the map union it gives two different results. And code here (
analyzerContext.metricMap
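The ordering effect described in this thread is consistent with how Scala's general-purpose immutable `Map` behaves: it does not guarantee any iteration order, so the sequence obtained from a map union can differ between code versions even when the entries are the same. The sketch below uses hypothetical metric names alongside the two from the test fixture, and shows an order-insensitive comparison as one possible way to keep such a test stable (this is an assumption about a remedy, not what the PR did, which reordered the expected JSON):

```scala
object MapUnionOrderSketch {
  val first: Map[String, Double] =
    Map("Size" -> 4.0, "Completeness(att1)" -> 1.0, "Distinctness(att1)" -> 0.5)
  val second: Map[String, Double] =
    Map("Size (where: att2 == 'd')" -> 1.0, "Uniqueness(att1)" -> 0.25)

  // The union holds the same entries whichever side comes first,
  // but its iteration order is an implementation detail.
  val union: Map[String, Double] = first ++ second

  // Compare results as sets so the check does not depend on iteration order.
  def sameEntries(a: Iterable[(String, Double)], b: Iterable[(String, Double)]): Boolean =
    a.toSet == b.toSet
}
```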
@rdsharma26, all done.
Thanks for the PR! @akalotkin
* Alternative aggregate functions to calculate histogram values.
* Reorder expected json
* Alternative aggregate functions to calculate histogram values.
* Alternative aggregate functions to calculate histogram values
* Alternative aggregate functions to calculate histogram values
---------
Co-authored-by: Aliaksei Kalotkin <aliaksei.kalotkin@nielsen.com>
Issue #, if available:
NA. Kind of close to #381
Description of changes:
In addition to
provide a way to get histogram values
WIP. Please take a look at the changes. I'll add more tests, docs and validation logic if you think that these changes make sense.
Thanks.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.