Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative aggregate functions to calculate histogram values. #475

Merged
merged 6 commits into from
Jul 7, 2023

Conversation

akalotkin
Copy link
Contributor

@akalotkin akalotkin commented Apr 29, 2023

Issue #, if available:
NA. Kind of close to #381

Description of changes:

  • Histogram analyzer changes
    • Provide a way to calculate histogram values using different aggregate functions (sum).
    • Add aggregateColumn - a column castable to Long. Aggregate function is applied to this column
  • Histogram serialization changes
    • Add aggregateFunction property ("count" is default in case of missing property)
    • Add aggregateColumn property

In addition to

select(col(column).cast(StringType)).groupBy(column).count()

provide a way get histogram values

select(col(column).cast(StringType), col(aggregateColumn.get).cast(LongType)).groupBy(column).sum(aggregateColumn.get)

WIP. Please take a look at the changes. I'll add more tests, docs and validation logic if you think that these changes make sense.

Thanks.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

@akalotkin
Copy link
Contributor Author

@rdsharma26, can you please take a look? Thanks.

@rdsharma26
Copy link
Contributor

@akalotkin Will be having a look shortly. In the meantime, could you fix the build failure?

@@ -43,7 +44,9 @@ case class Histogram(
binningUdf: Option[UserDefinedFunction] = None,
maxDetailBins: Integer = Histogram.MaximumAllowedDetailBins,
where: Option[String] = None,
computeFrequenciesAsRatio: Boolean = true)
computeFrequenciesAsRatio: Boolean = true,
aggregateFunction: String = Histogram.Count,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this an enum? In Scala 2, we can use a sealed trait and case object to create enumerations.

computeFrequenciesAsRatio: Boolean = true)
computeFrequenciesAsRatio: Boolean = true,
aggregateFunction: String = Histogram.Count,
aggregateColumn: Option[String] = None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used when the aggregate function is sum. As a caller of this API, it is possible that this value is not provided when using sum. Is it possible to group this configuration into a single object?

Check this parameter for reference: ef4c308#diff-4db9bbf900d2a35e16c36a158cbf3c4641d336e1930e6f343f2fef5773d0cea6R95

aggregateFunction match {
case Histogram.Count => countQuery(data)
case Histogram.Sum => sumQuery(data)
case _ => throw new IllegalStateException()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use an enumeration, then we won't need a default case.

Comment on lines 137 to 139
data.select(col(column).cast(StringType))
.na.fill(Histogram.NullFieldReplacement)
.groupBy(column).count().withColumnRenamed("count", Analyzers.COUNT_COL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the same formatting as the one on lines 66-73 originally?

Comment on lines 143 to 145
data.select(col(column).cast(StringType), col(aggregateColumn.get).cast(LongType))
.na.fill(Histogram.NullFieldReplacement)
.groupBy(column).sum(aggregateColumn.get).withColumnRenamed("count", Analyzers.COUNT_COL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the same formatting as the one on lines 66-73 originally?

@akalotkin akalotkin marked this pull request as ready for review June 12, 2023 21:25
@akalotkin
Copy link
Contributor Author

@rdsharma26, can you take a look at the change, please?

Comment on lines 142 to 148
def createAggregateFunction(function: String, aggregateColumn: String): AggregateFunction = {
function match {
case Histogram.count_function => Count
case Histogram.sum_function => Sum(aggregateColumn)
case _ => throw new IllegalArgumentException("Wrong aggregate function name: " + function)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this function and let the caller of the function provide a valid instance of AggregateFunction ? The reason I say this is because if we add more subclasses of AggregateFunction in the future, we will need to update this createAggregateFunction to accept the superset of ALL parameters for all subtypes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is used to deserialize object from json. I can move it to AnalysisResultSerde, but there must be a piece of code that deserializes all combinations from json.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I think it is fine then to keep it outside of Histogram and in AnalysisResultSerde.

Comment on lines 200 to 232
// case class Min(aggColumn: String) extends AggregateFunction {
// override def query(column: String, data: DataFrame): DataFrame = {
// data
// .select(col(column).cast(StringType), col(aggColumn).cast(LongType))
// .na.fill(Histogram.NullFieldReplacement)
// .groupBy(column)
// .min(aggColumn)
// .withColumnRenamed("count", Analyzers.COUNT_COL)
// }
//
// override def aggregateColumn(): Option[String] = {
// Some(aggColumn)
// }
//
// override def function(): String = min
// }
//
// case class Max(aggColumn: String) extends AggregateFunction {
// override def query(column: String, data: DataFrame): DataFrame = {
// data
// .select(col(column).cast(StringType), col(aggColumn).cast(LongType))
// .na.fill(Histogram.NullFieldReplacement)
// .groupBy(column)
// .max(aggColumn)
// .withColumnRenamed("count", Analyzers.COUNT_COL)
// }
//
// override def aggregateColumn(): Option[String] = {
// Some(aggColumn)
// }
//
// override def function(): String = max
// }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either we remove the commented out code, or we keep it as code and add tests. Otherwise, it adds bloat to the source code.

Copy link
Contributor Author

@akalotkin akalotkin Jul 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess, I just forgot to remove this code. Will do

@@ -302,6 +302,10 @@ private[deequ] object AnalyzerSerializer
result.addProperty(ANALYZER_NAME_FIELD, "Histogram")
result.addProperty(COLUMN_FIELD, histogram.column)
result.addProperty("maxDetailBins", histogram.maxDetailBins)
if (histogram.aggregateFunction != Histogram.Count) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment here that:

  • the reason we are excluding Count since it is the current default
  • It was the only previously supported function in the past, and we exclude it for backwards compatibility

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add comments.

Comment on lines +89 to +90
|{"entity":"Dataset","instance":"*","name":"Size (where: att2 == 'd')","value":1.0},
|{"entity":"Dataset","instance":"*","name":"Size","value":4.0},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change required? It is not clear from the changes that the order of the fields in the serialization process is changing. If it is not, can we remove this diff to make the PR clearer ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This is required. This is the reason a test was failing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the information. Is it possible to understand why it is required? The reason I ask is because of backwards compatibility. Will a serialized object that has the older order fail to get deserialized because of this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to dig deeper to explain why it happens. My guess is that there is some kind of HashMap, HashSet that changes order of response.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. That's what happens. Below are screenshots from 2 different versions (breakpoint is on this line

)

As you can see after map union it gives two different results. And code here (

) iterates through the HashMap.

Master branch
master
feature/histogram-aggregate branch
feature:histogram-aggregate

@akalotkin
Copy link
Contributor Author

@rdsharma26, all done.

@rdsharma26
Copy link
Contributor

Thanks for the PR! @akalotkin

@rdsharma26 rdsharma26 merged commit d3dbe2d into awslabs:master Jul 7, 2023
1 check passed
eycho-am pushed a commit that referenced this pull request Aug 7, 2023
* Alternative aggregate functions to calculate histogram values.

* Reorder expected json

* Alternative aggregate functions to calculate histogram values.

* Alternative aggregate functions to calculate histogram values

* Alternative aggregate functions to calculate histogram values

---------

Co-authored-by: Aliaksei Kalotkin <aliaksei.kalotkin@nielsen.com>
rdsharma26 pushed a commit that referenced this pull request Apr 16, 2024
* Alternative aggregate functions to calculate histogram values.

* Reorder expected json

* Alternative aggregate functions to calculate histogram values.

* Alternative aggregate functions to calculate histogram values

* Alternative aggregate functions to calculate histogram values

---------

Co-authored-by: Aliaksei Kalotkin <aliaksei.kalotkin@nielsen.com>
rdsharma26 pushed a commit that referenced this pull request Apr 16, 2024
* Alternative aggregate functions to calculate histogram values.

* Reorder expected json

* Alternative aggregate functions to calculate histogram values.

* Alternative aggregate functions to calculate histogram values

* Alternative aggregate functions to calculate histogram values

---------

Co-authored-by: Aliaksei Kalotkin <aliaksei.kalotkin@nielsen.com>
rdsharma26 pushed a commit that referenced this pull request Apr 16, 2024
* Alternative aggregate functions to calculate histogram values.

* Reorder expected json

* Alternative aggregate functions to calculate histogram values.

* Alternative aggregate functions to calculate histogram values

* Alternative aggregate functions to calculate histogram values

---------

Co-authored-by: Aliaksei Kalotkin <aliaksei.kalotkin@nielsen.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants