Improve Schema Diff Error Message #160

Open
wants to merge 10 commits into
base: main
Changes from 5 commits
@@ -38,7 +38,7 @@ Expected DataFrame Row Count: '$expectedCount'
/**
* Raises an error unless `actualDS` and `expectedDS` are equal
*/
def assertSmallDatasetEquality[T](
def assertSmallDatasetEquality[T: ClassTag](
actualDS: Dataset[T],
expectedDS: Dataset[T],
ignoreNullable: Boolean = false,
@@ -53,7 +53,7 @@ Expected DataFrame Row Count: '$expectedCount'
assertSmallDatasetContentEquality(actual, expectedDS, orderedComparison, truncate, equals)
}

def assertSmallDatasetContentEquality[T](
def assertSmallDatasetContentEquality[T: ClassTag](
actualDS: Dataset[T],
expectedDS: Dataset[T],
orderedComparison: Boolean,
@@ -66,12 +66,12 @@ Expected DataFrame Row Count: '$expectedCount'
assertSmallDatasetContentEquality(defaultSortDataset(actualDS), defaultSortDataset(expectedDS), truncate, equals)
}

def assertSmallDatasetContentEquality[T](actualDS: Dataset[T], expectedDS: Dataset[T], truncate: Int, equals: (T, T) => Boolean): Unit = {
def assertSmallDatasetContentEquality[T: ClassTag](actualDS: Dataset[T], expectedDS: Dataset[T], truncate: Int, equals: (T, T) => Boolean): Unit = {
val a = actualDS.collect().toSeq
val e = expectedDS.collect().toSeq
if (!a.approximateSameElements(e, equals)) {
val arr = ("Actual Content", "Expected Content")
Comment on lines 70 to 73
Collaborator

I don't see the improvement. Any Dataset[T] can be transformed into a DataFrame; the only change here is that it works with the original class T that was provided, but the result is the same, only with the class name prefixing each line. If the only goal was to avoid the .asRows method, we could have done this, and no other change would be needed.

Suggested change
val a = actualDS.collect().toSeq
val e = expectedDS.collect().toSeq
if (!a.approximateSameElements(e, equals)) {
val arr = ("Actual Content", "Expected Content")
val a = actualDS.toDF.collect() // now it's a Row
val e = expectedDS.toDF.collect()
if (!a.approximateSameElements(e, equals)) {
val arr = ("Actual Content", "Expected Content")
val msg = "Diffs\n" ++ DataframeUtil.showDataframeDiff(arr, a, e, truncate)

I see this as more unified, and we won't have to check whether it's a Product or not.

Collaborator Author

showProductDiff is an update/extension of the original showDataframeDiff, so that we get a nicely formatted table display for any case class as well as for Row. This was done so that we also have a nicely formatted table for StructField.

And since showProductDiff can now also display differences for any case class, I figured we no longer need to convert a Dataset[T] into a DataFrame.
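
To illustrate the idea, here is a minimal sketch in plain Scala (no Spark dependency) of how a case class instance can be flattened into its fields through the standard Product interface. `Person` and `toFields` are hypothetical names used only for illustration; the helper in this PR additionally handles Row.

```scala
object ProductFieldsSketch {
  // Hypothetical case class used only for this illustration.
  final case class Person(name: String, age: Int)

  // Flatten a value into its fields:
  // - null becomes an empty sequence
  // - every case class is a Product, so productIterator yields its fields in order
  // - any other value is treated as a single field
  def toFields(value: Any): Seq[Any] = value match {
    case null       => Seq.empty
    case p: Product => p.productIterator.toSeq
    case other      => Seq(other)
  }
}
```

Note that standard types such as tuples, Option, and non-empty List are also Products, so a sketch like this would flatten them as well.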

Collaborator

All these changes could be simplified by adding just a single parameter to the function header, because in the end the processing doesn't care whether it's a Row or a case class; it only requires the list of elements:

private[mrpowers] def showDataframeDiff(
      header: (String, String),
      actual: Seq[Row],
      expected: Seq[Row],
      truncate: Int = 20,
      minColWidth: Int = 3,
      className: Option[String] = None
  ): String = {
    // Row keeps the "[...]" rendering; a named class renders as "Name(...)"
    val (prefix, lBracket, rBracket) = className match {
      case None     => ("", "[", "]")
      case Some(cn) => (cn, "(", ")")
    }

Use these three elements to build the representation, and when you call this function, pass the class name if it's not a Row.
I don't see any other benefit in getting the element as T, because the comparison is not done here.
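
As a rough, self-contained sketch of the suggestion above (plain Scala, no Spark), the optional class name can drive both the prefix and the bracket style. `render` is a hypothetical helper name, not a function from the library.

```scala
object BracketStyleSketch {
  // Render a record's fields. className is None for Row-like data,
  // following the convention proposed in the comment above.
  def render(fields: Seq[Any], className: Option[String] = None): String = {
    val (prefix, lBracket, rBracket) = className match {
      case None     => ("", "[", "]")
      case Some(cn) => (cn, "(", ")")
    }
    fields.mkString(s"$prefix$lBracket", ",", rBracket)
  }
}
```

With this shape, `BracketStyleSketch.render(Seq(1, "word"))` gives a Row-style string, while passing `Some("Person")` gives a case-class-style string.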

Collaborator Author

Since we also use it with Seq[StructField] to display schema fields, won't we have to make actual and expected Seq[Any]?

private[mrpowers] def showDataframeDiff(
      header: (String, String),
      actual: Seq[Any],
      expected: Seq[Any],
      truncate: Int = 20,
      minColWidth: Int = 3,
      className: Option[String] = None
  ): String = {
    val (prefix, lBracket, rBracket) = className match {
      case None     => ("", "[", "]")
      case Some(cn) => (cn, "(", ")")
    }

Collaborator

def assertSmallDatasetContentEquality[T: ClassTag](actualDS: Dataset[T], expectedDS: Dataset[T], truncate: Int, equals: (T, T) => Boolean): Unit = {
    val a = actualDS.collect().toSeq
    val e = expectedDS.collect().toSeq
    if (!a.approximateSameElements(e, equals)) {
      val arr = ("Actual Content", "Expected Content")
      val runTimeClass = implicitly[ClassTag[T]].runtimeClass
      val prefix = if (runTimeClass == classOf[Row]) None else Some(runTimeClass.getSimpleName)
      val msg = "Diffs\n" ++ DataframeUtil.showDataframeDiff(arr, a.asRows, e.asRows, truncate, className = prefix)
      throw DatasetContentMismatch(msg)
    }
  }

If you don't want to do this here, move it inside showDataframeDiff; you can add the type parameter, but you are not forced to use it. But I find it weird for your function to have this signature:

private[mrpowers] def showDataframeDiff[T:ClassTag](
      header: (String, String),
      actual: Seq[Any],
      expected: Seq[Any],
      truncate: Int = 20,
      minColWidth: Int = 3
  ): String
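
For readers unfamiliar with the ClassTag approach discussed above, here is a minimal sketch that runs without Spark. `FakeRow` is a hypothetical stand-in for org.apache.spark.sql.Row, and `prefixFor` is an illustrative name, not part of the library.

```scala
import scala.reflect.ClassTag

object ClassTagPrefixSketch {
  // Hypothetical stand-in for org.apache.spark.sql.Row, so the sketch runs without Spark.
  final class FakeRow

  // None for the row type (rendered with plain brackets),
  // otherwise the simple name of the runtime class of T.
  def prefixFor[T: ClassTag]: Option[String] = {
    val runtimeClass = implicitly[ClassTag[T]].runtimeClass
    if (runtimeClass == classOf[FakeRow]) None else Some(runtimeClass.getSimpleName)
  }
}
```

A ClassTag only captures the erased class, so the element type of a collection such as Seq[Int] is not visible to this check.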

Collaborator Author

@zeotuan, Oct 2, 2024

I prefer putting it in showProductDiff, since it will also be needed in betterSchemaMismatchMessage. Do you think we should still use Seq[T]? I think that would at least make the input type safer, unless of course someone decides to use Any.

Collaborator

@alfonsorr, Oct 3, 2024

OK to change the name; it's called from a dataset method. As for keeping the [T]: this is not a public method, it carries the type, and the compiler will validate the schema of the actual and expected DataFrames. The only problem is if the conversion to Dataset[T] is not correct, e.g.:

assertSmallDataset(spark.table("aTable").as[MyType], expected)

If the table doesn't match, the error will be raised as an exception in the .as[T] method at runtime.
That's the reason I say that, if we detect a non-Row Dataset, we should skip the schema validation, because it's already done.

I find it simpler if you know the exact type of the data as a Row only, so you only have one way to work with it. If you still think this could be good, I would suggest first adding more tests, not only with products but also with datasets of a single-value type, like Dataset[String], Dataset[Int], Dataset[Timestamp], and Dataset[Array[Int]]. These types are not products, and in some cases I don't know how it will work.

Collaborator Author

Ah, I see. I forgot that people can totally do Dataset[Array[Int]]. I did some tests, and it supports single-value types correctly, but not Array[Int].
Actually, our main branch currently also has an issue with Dataset[Seq[Int]]. Let me try fixing that.
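
One JVM detail that may be relevant here (this is an assumption; the thread does not state the root cause): arrays compare by reference and do not print their contents, unlike Seq. A minimal plain-Scala sketch of the difference:

```scala
object ArrayEqualitySketch {
  val a: Array[Int] = Array(1, 2, 3)
  val b: Array[Int] = Array(1, 2, 3)

  // Arrays use reference equality, so two arrays with equal contents are not ==.
  val sameByEquals: Boolean = a == b

  // Comparing element by element, or converting to Seq first, gives structural equality.
  val sameByElements: Boolean = a.sameElements(b)
  val sameAsSeq: Boolean = a.toSeq == b.toSeq

  // mkString shows the contents, whereas Array.toString shows only a type tag and hash code.
  val rendered: String = a.mkString("[", ",", "]")
}
```

Any diff helper that relies on == and toString would therefore need to special-case arrays, for example by converting them to Seq before comparing and rendering.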

Collaborator Author

@alfonsorr I fixed the issue with Dataset[Array[Int]] and also added test cases for the String, Int, Array, and Seq cases.

val msg = "Diffs\n" ++ DataframeUtil.showDataframeDiff(arr, a.asRows, e.asRows, truncate)
val msg = "Diffs\n" ++ ProductUtil.showProductDiff[T](arr, a, e, truncate)
throw DatasetContentMismatch(msg)
}
}
@@ -1,35 +1,49 @@
package com.github.mrpowers.spark.fast.tests

import com.github.mrpowers.spark.fast.tests.ufansi.Color.{DarkGray, Green, Red}
import com.github.mrpowers.spark.fast.tests.ufansi.FansiExtensions.StrOps
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.Row
import com.github.mrpowers.spark.fast.tests.ufansi.FansiExtensions.StrOps
object DataframeUtil {

private[mrpowers] def showDataframeDiff(
import scala.reflect.ClassTag

object ProductUtil {
private[mrpowers] def productOrRowToSeq(product: Any): Seq[Any] = {
product match {
case r: Row => r.toSeq
case p: Product => p.productIterator.toSeq
case null => Seq.empty
case s => Seq(s)
}
}
private[mrpowers] def showProductDiff[T: ClassTag](
zeotuan marked this conversation as resolved.
header: (String, String),
actual: Seq[Row],
expected: Seq[Row],
actual: Seq[T],
expected: Seq[T],
truncate: Int = 20,
minColWidth: Int = 3
minColWidth: Int = 3,
defaultVal: T = null.asInstanceOf[T]
): String = {

val runTimeClass = implicitly[ClassTag[T]].runtimeClass
val (className, lBracket, rBracket) = if (runTimeClass == classOf[Row]) ("", "[", "]") else (runTimeClass.getSimpleName, "(", ")")
val prodToString: Seq[Any] => String = s => s.mkString(s"$className$lBracket", ",", rBracket)
val emptyProd = "MISSING"

val sb = new StringBuilder

val fullJoin = actual.zipAll(expected, Row(), Row())
val fullJoin = actual.zipAll(expected, defaultVal, defaultVal)
zeotuan marked this conversation as resolved.

val diff = fullJoin.map { case (actualRow, expectedRow) =>
if (equals(actualRow, expectedRow)) {
if (actualRow == expectedRow) {
List(DarkGray(actualRow.toString), DarkGray(expectedRow.toString))
} else {
val actualSeq = actualRow.toSeq
val expectedSeq = expectedRow.toSeq
val actualSeq = productOrRowToSeq(actualRow)
val expectedSeq = productOrRowToSeq(expectedRow)
if (actualSeq.isEmpty)
List(
Red("[]"),
Green(expectedSeq.mkString("[", ",", "]"))
)
List(Red(emptyProd), Green(prodToString(expectedSeq)))
else if (expectedSeq.isEmpty)
List(Red(actualSeq.mkString("[", ",", "]")), Green("[]"))
List(Red(prodToString(actualSeq)), Green(emptyProd))
else {
val withEquals = actualSeq
Collaborator

I would suggest skipping this part if the type is not Row. If it's T, the schema will be checked at compile time, and if the type T does not match the Dataset's actual data, it will fail at runtime when we execute .collect().

Collaborator Author

I thought that by this point we only work with in-memory data (Seq[T])?
Or do you mean in the case of showProductDiff[Dataset[...]]?

.zip(expectedSeq)
@@ -38,22 +52,18 @@ object DataframeUtil {
}
val allFieldsAreNotEqual = !withEquals.exists(_._3)
if (allFieldsAreNotEqual) {
List(
Red(actualSeq.mkString("[", ",", "]")),
Green(expectedSeq.mkString("[", ",", "]"))
)
List(Red(prodToString(actualSeq)), Green(prodToString(expectedSeq)))
} else {

val coloredDiff = withEquals
.map {
case (actualRowField, expectedRowField, true) =>
(DarkGray(actualRowField.toString), DarkGray(expectedRowField.toString))
case (actualRowField, expectedRowField, false) =>
(Red(actualRowField.toString), Green(expectedRowField.toString))
}
val start = DarkGray("[")
val start = DarkGray(s"$className$lBracket")
val sep = DarkGray(",")
val end = DarkGray("]")
val end = DarkGray(rBracket)
List(
coloredDiff.map(_._1).mkStr(start, sep, end),
coloredDiff.map(_._2).mkStr(start, sep, end)
@@ -69,11 +79,12 @@ object DataframeUtil {
val colWidths = Array.fill(numCols)(minColWidth)

// Compute the width of each column
for ((cell, i) <- headerSeq.zipWithIndex) {
headerSeq.zipWithIndex.foreach({ case (cell, i) =>
colWidths(i) = math.max(colWidths(i), cell.length)
}
for (row <- diff) {
for ((cell, i) <- row.zipWithIndex) {
})

diff.foreach { row =>
row.zipWithIndex.foreach { case (cell, i) =>
colWidths(i) = math.max(colWidths(i), cell.length)
}
}
@@ -117,5 +128,4 @@ object DataframeUtil {

sb.toString
}

}
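
The field-by-field step in showProductDiff above can be sketched in plain Scala as follows. `compareFields` and `allFieldsDiffer` are hypothetical names, and a Boolean flag stands in for the Red/Green/DarkGray coloring used in the real code.

```scala
object FieldDiffSketch {
  // Pair up fields positionally and flag each pair as equal or not.
  def compareFields(actual: Seq[Any], expected: Seq[Any]): Seq[(Any, Any, Boolean)] =
    actual.zip(expected).map { case (a, e) => (a, e, a == e) }

  // True when no field matches, in which case the whole row can be highlighted at once.
  def allFieldsDiffer(flags: Seq[(Any, Any, Boolean)]): Boolean =
    !flags.exists(_._3)
}
```

Because zip stops at the shorter sequence, rows with different numbers of fields would need separate handling in a sketch like this.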
@@ -1,29 +1,18 @@
package com.github.mrpowers.spark.fast.tests

import com.github.mrpowers.spark.fast.tests.ProductUtil.showProductDiff
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, NullType, StructField, StructType}

object SchemaComparer {

case class DatasetSchemaMismatch(smth: String) extends Exception(smth)
private def betterSchemaMismatchMessage[T](actualDS: Dataset[T], expectedDS: Dataset[T]): String = {
"\nActual Schema Field | Expected Schema Field\n" + actualDS.schema
.zipAll(
expectedDS.schema,
"",
""
)
.map {
case (sf1, sf2) if sf1 == sf2 =>
ufansi.Color.Blue(s"$sf1 | $sf2")
case ("", sf2) =>
ufansi.Color.Red(s"MISSING | $sf2")
case (sf1, "") =>
ufansi.Color.Red(s"$sf1 | MISSING")
case (sf1, sf2) =>
ufansi.Color.Red(s"$sf1 | $sf2")
}
.mkString("\n")
showProductDiff(
("Actual Schema", "Expected Schema"),
actualDS.schema.fields,
expectedDS.schema.fields,
truncate = 200
)
}

def assertSchemaEqual[T](
@@ -36,7 +25,7 @@ object SchemaComparer {
require((ignoreColumnNames, ignoreColumnOrder) != (true, true), "Cannot set both ignoreColumnNames and ignoreColumnOrder to true.")
if (!SchemaComparer.equals(actualDS.schema, expectedDS.schema, ignoreNullable, ignoreColumnNames, ignoreColumnOrder)) {
throw DatasetSchemaMismatch(
betterSchemaMismatchMessage(actualDS, expectedDS)
"Diffs\n" + betterSchemaMismatchMessage(actualDS, expectedDS)
)
}
}
@@ -76,5 +65,4 @@ object SchemaComparer {
case _ => dt1 == dt2
}
}

}
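
The schema comparison above depends on padding the shorter field list so that extra fields show up as MISSING on the other side. A minimal sketch of that pairing in plain Scala, assuming a hypothetical `Field` case class as a stand-in for Spark's StructField and using Option rather than a null default:

```scala
object SchemaPairingSketch {
  // Hypothetical stand-in for org.apache.spark.sql.types.StructField.
  final case class Field(name: String, dataType: String, nullable: Boolean)

  // Pair fields positionally; zipAll pads the shorter side with None.
  def pairFields(actual: Seq[Field], expected: Seq[Field]): Seq[(Option[Field], Option[Field])] =
    actual.map(Option(_)).zipAll(expected.map(Option(_)), None, None)

  // Render one side of a pair, using "MISSING" for a padded slot.
  def render(field: Option[Field]): String =
    field.map(f => s"${f.name}:${f.dataType}").getOrElse("MISSING")
}
```

Using Option makes the padded slot explicit in the types, which avoids passing null through the rendering code.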
@@ -1,6 +1,6 @@
package com.github.mrpowers.spark.fast.tests

import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType}
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType}
import SparkSessionExt._
import com.github.mrpowers.spark.fast.tests.SchemaComparer.DatasetSchemaMismatch
import com.github.mrpowers.spark.fast.tests.StringExt.StringOps
@@ -310,6 +310,41 @@ class DataFrameComparerTest extends AnyFreeSpec with DataFrameComparer with Spar
)
assertLargeDataFrameEquality(sourceDF, expectedDF, ignoreColumnOrder = true)
}

"correctly mark unequal schema field" in {
val sourceDF = spark.createDF(
List(
(1, 2.0),
(5, 3.0)
),
List(
("number", IntegerType, true),
("float", DoubleType, true)
)
)

val expectedDF = spark.createDF(
List(
(1, "word", 1L),
(5, "word", 2L)
),
List(
("number", IntegerType, true),
("word", StringType, true),
("long", LongType, true)
)
)

val e = intercept[DatasetSchemaMismatch] {
assertSmallDataFrameEquality(sourceDF, expectedDF)
}

val colourGroup = e.getMessage.extractColorGroup
val expectedColourGroup = colourGroup.get(Console.GREEN)
val actualColourGroup = colourGroup.get(Console.RED)
assert(expectedColourGroup.contains(Seq("word", "StringType", "StructField(long,LongType,true,{})")))
assert(actualColourGroup.contains(Seq("float", "DoubleType", "MISSING")))
}
}

"assertApproximateDataFrameEquality" - {