Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CustomAggregator #572

Merged
merged 7 commits into from
Jul 31, 2024
Merged

CustomAggregator #572

merged 7 commits into from
Jul 31, 2024

Conversation

joshuazexter
Copy link
Contributor

@joshuazexter joshuazexter commented Jul 29, 2024

This pull request introduces the CustomAggregator, a tool designed for dynamic data aggregation based on user-specified conditions within Apache Spark DataFrames. This addition can preform customized metric calculations and aggregations, making it applicable where conditional data aggregation is required.

Core Features:

  • Custom Aggregation Logic: Users can pass a lambda function that specifies how data should be aggregated. This function is applied to a DataFrame to compute a state representing the aggregation result.
  • Generic Metric Computation: Post aggregation, the analyzer computes metrics from the aggregated data state

How It Can Be Used:
To use the CustomAggregator, developers will need to:

  • Define a lambda function that describes the aggregation logic specific to their data and requirements on a specific column.
  • Instantiate the analyzer with this function, specifying the relevant metric names and instances.
  • Apply the analyzer to a DataFrame within a Spark session to compute and retrieve metrics.

Usage Examples:
Included in the pull request are unit tests that demonstrate potential use cases:

Content Engagement Metrics:

  • Scenario Description: A media company wants to assess how different types of content perform across various social media platforms to guide content strategy and investment.
  • Data: Assume the company has data in the form of a DataFrame that includes columns for content_type, platform, views, likes, and shares.
  • Analysis Logic: The company uses the CustomAggregator to aggregate engagement metrics (views, likes, shares) for each content type across platforms.
  • Implementation Example:
val contentEngagementLambda: DataFrame => AggregatedMetricState = df => {
  val counts = df
    .groupBy("content_type")
    .agg(
      (sum("views") + sum("likes") + sum("shares")).cast("int").alias("totalEngagements")
    )
    .collect()
    .map(row =>
      row.getString(0) -> row.getInt(1)
    )
    .toMap
  val totalEngagements = counts.values.sum
  AggregatedMetricState(counts, totalEngagements)
}

val analyzer = CustomAggregator(contentEngagementLambda, "ContentEngagement", "AllPlatforms")

val data = session.read.format("csv").option("header", "true").load("path_to_data_file")
val state = analyzer.computeStateFrom(data)
val metric = analyzer.computeMetricFrom(state)

println("Content Engagement Metrics: " + metric.value.get)
//  Content Engagement Metrics: Map(Video -> 0.81, Article -> 0.18)

Resource Utilization in Cloud Services:

  • Scenario Description: An IT administrator needs to monitor and analyze resource utilization across different cloud services to ensure efficient usage and cost management.
  • Data: The organization collects usage data for each cloud service, including CPU hours, memory GBs used, and storage GBs used, stored in a DataFrame.
  • Analysis Logic: The analyzer is used to aggregate and compute the total and percentage utilization of each resource type across services.
  • Implementation Example:
val resourceUtilizationLambda: DataFrame => AggregatedMetricState = df => {
  val totalResources = df.groupBy("service_type")
    .agg(
      ((sum("cpu_hours") + sum("memory_gbs") + sum("storage_gbs")).cast("int") / df.count()).alias("percentageResources")
    )
    .collect()
    .map(row =>
      row.getString(0) -> row.getDouble(1)
    )
    .toMap
  val totalSum = totalResources.values.sum
  AggregatedMetricState(resourceUtilizationLambda, totalSum.toInt)
}

val analyzer = CustomAggregator(resourceUtilizationLambda, "ResourceUtilization", "CloudServices")

val data = session.read.format("csv").option("header", "true").load("path_to_usage_data_file")
val state = analyzer.computeStateFrom(data)
val metric = analyzer.computeMetricFrom(state)

println("Resource Utilization Metrics: " + metric.value.get)
//  Resource Utilization Metrics: Map(Compute -> 0.51, Database -> 0.27, Storage -> 0.21)

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

@joshuazexter joshuazexter changed the title Entity types ConditionalAggregationAnalyzer Jul 29, 2024
@rdsharma26
Copy link
Contributor

Can we add a unit test that shows the usage of this analyzer along with other analyzers? See ColumnProfilerRunner and this readme

instance: String)
extends Analyzer[AggregatedMetricState, AttributeDoubleMetric] {

def computeStateFrom(data: DataFrame, filterCondition: Option[String] = None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add the override keyword here and in front of computeMetricFrom?

@rdsharma26
Copy link
Contributor

Great PR description! Can you also add the output of the println statements ?

// Define the analyzer
case class ConditionalAggregationAnalyzer(aggregatorFunc: DataFrame => AggregatedMetricState,
metricName: String,
instance: String)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are running the aggregator on the entire dataframe, we can probably use Dataset for the instance (like how we do in other analyzers like rowcount). That way, we do not need to ask for this parameter from the user. We should keep the public facing API as simple as possible.

@joshuazexter joshuazexter changed the title ConditionalAggregationAnalyzer CustomAggregator Jul 29, 2024
Copy link
Contributor

@eycho-am eycho-am left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great PR on both the implementation and description

@eycho-am eycho-am merged commit d45db61 into awslabs:master Jul 31, 2024
1 check passed
@joshuazexter joshuazexter deleted the entityTypes branch July 31, 2024 21:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants