feat: Misc functions resolves #67
eruizalo committed Nov 7, 2021
1 parent 972ac29 commit 6a39667
Showing 10 changed files with 567 additions and 57 deletions.
48 changes: 48 additions & 0 deletions core/src/main/scala/doric/syntax/BinaryColumns.scala
@@ -0,0 +1,48 @@
package doric
package syntax

import doric.types.{BinaryType, SparkType}
import org.apache.spark.sql.{functions => f}

private[syntax] trait BinaryColumns {

implicit class BinaryOperationsSyntax[T: BinaryType: SparkType](
column: DoricColumn[T]
) {

/**
* Calculates the MD5 digest of a binary column and returns the value
* as a 32 character hex string.
*
* @group All Types
*/
def md5: StringColumn = column.elem.map(f.md5).toDC

/**
* Calculates the SHA-1 digest of a binary column and returns the value
* as a 40 character hex string.
*
* @group All Types
*/
def sha1: StringColumn = column.elem.map(f.sha1).toDC

/**
* Calculates the SHA-2 family of hash functions of a binary column and
* returns the value as a hex string.
*
* @group All Types
*/
def sha2(numBits: Int): StringColumn =
column.elem.map(x => f.sha2(x, numBits)).toDC

/**
* Calculates the cyclic redundancy check value (CRC32) of a binary column and
* returns the value as a long column.
*
* @group All Types
*/
def crc32: LongColumn =
column.elem.map(f.crc32).toDC
}

}
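For orientation, here is a minimal usage sketch of the new digest functions. It is not part of the commit: the DataFrame and the column name "data" are hypothetical, and it assumes doric's BinaryType instance covers Array[Byte].

import doric._
import doric.implicitConversions.stringCname
import org.apache.spark.sql.DataFrame

// Hypothetical sketch: "data" is assumed to be a binary (Array[Byte]) column.
def digests(df: DataFrame): DataFrame =
  df.select(
    col[Array[Byte]]("data").md5.as("md5"),       // 32-character hex string
    col[Array[Byte]]("data").sha1.as("sha1"),     // 40-character hex string
    col[Array[Byte]]("data").sha2(256).as("sha256"),
    col[Array[Byte]]("data").crc32.as("crc32")    // checksum as a long
  )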
25 changes: 23 additions & 2 deletions core/src/main/scala/doric/syntax/BooleanColumns.scala
@@ -1,7 +1,9 @@
package doric
package syntax

import cats.implicits._
import doric.DoricColumn.sparkFunction
import org.apache.spark.sql.{functions => f}

private[syntax] trait BooleanColumns {

@@ -14,30 +16,49 @@ private[syntax] trait BooleanColumns {

/**
* Boolean AND
*
* @group Boolean Type
*/
def and(other: DoricColumn[Boolean]): DoricColumn[Boolean] =
sparkFunction(column, other, _ && _)

/**
* Boolean AND
*
* @group Boolean Type
*/
def &&(other: DoricColumn[Boolean]): DoricColumn[Boolean] =
- sparkFunction(column, other, _ && _)
+ and(other)

/**
* Boolean OR
*
* @group Boolean Type
*/
def or(other: DoricColumn[Boolean]): DoricColumn[Boolean] =
sparkFunction(column, other, _ || _)

/**
* Boolean OR
*
* @group Boolean Type
*/
def ||(other: DoricColumn[Boolean]): DoricColumn[Boolean] =
- sparkFunction(column, other, _ || _)
+ or(other)

/**
* Returns null if the condition is true, and throws an exception otherwise.
*
* @group Boolean Type
*/
def assertTrue: NullColumn = column.elem.map(f.assert_true).toDC

/**
* Returns null if the condition is true; throws an exception with the error message otherwise.
*
* @group Boolean Type
*/
def assertTrue(msg: StringColumn): NullColumn =
(column.elem, msg.elem).mapN(f.assert_true).toDC
}
}
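A hedged sketch of the boolean additions follows. It is not part of the commit; "paid" and "shipped" are hypothetical boolean columns.

import doric._
import doric.implicitConversions.stringCname
import org.apache.spark.sql.DataFrame

// Hypothetical sketch: and/&& and or/|| are now aliases of each other,
// and assertTrue yields a null column or throws at evaluation time.
def audit(df: DataFrame): DataFrame =
  df.select(
    (colBoolean("paid") and colBoolean("shipped")).as("done"),
    (colBoolean("paid") || colBoolean("shipped")).as("started"),
    colBoolean("paid").assertTrue.as("mustBePaid")
  )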
15 changes: 15 additions & 0 deletions core/src/main/scala/doric/syntax/ColGetters.scala
@@ -106,6 +106,21 @@ private[doric] trait ColGetters[F[_]] {
): F[Double] =
col[Double](colName)

/**
* Retrieves a column with the provided name expecting it to be of boolean type.
*
* @param colName
* the name of the column to find.
* @param location
* error location.
* @return
* the boolean column reference
*/
def colBoolean(colName: CName)(implicit
location: Location
): F[Boolean] =
col[Boolean](colName)

/**
* Retrieves a column with the provided name expecting it to be of instant type.
*
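A short sketch of the new getter (the column name is hypothetical; colBoolean is just a typed shorthand for col[Boolean], as the diff above shows):

import doric._
import doric.implicitConversions.stringCname

// Hypothetical sketch: both references point at the same column.
val active = colBoolean("active")    // expects a boolean column named "active"
val sameRef = col[Boolean]("active") // equivalent, fully explicit form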
22 changes: 20 additions & 2 deletions core/src/main/scala/doric/syntax/CommonColumns.scala
@@ -26,14 +26,32 @@ private[syntax] trait CommonColumns extends ColGetters[NamedDoricColumn] {
* is null and b is not null, or c if both a and b are null but c is not
* null.
*
* @group All Types
* @param cols
- the String DoricColumns to coalesce
+ the DoricColumns to coalesce
* @return
* the first column that is not null, or null if all inputs are null.
*/
def coalesce[T](cols: DoricColumn[T]*): DoricColumn[T] =
cols.map(_.elem).toList.sequence.map(f.coalesce(_: _*)).toDC

/**
* Calculates the hash code of given columns, and returns the result as an integer column.
*
* @group All Types
*/
def hash(cols: DoricColumn[_]*): IntegerColumn =
cols.map(_.elem).toList.sequence.map(f.hash(_: _*)).toDC

/**
* Calculates the hash code of given columns using the 64-bit
* variant of the xxHash algorithm, and returns the result as a long column.
*
* @group All Types
*/
def xxhash64(cols: DoricColumn[_]*): LongColumn =
cols.map(_.elem).toList.sequence.map(f.xxhash64(_: _*)).toDC

override protected def constructSide[T](
column: Doric[Column],
colName: CName
@@ -49,7 +67,7 @@ private[syntax] trait CommonColumns extends ColGetters[NamedDoricColumn] {
* @tparam T
* The expected type that should have the column.
* @return
- * A DoricColumn referece of the provided type T
+ * A DoricColumn reference of the provided type T
*/
@inline def asDoric[T: SparkType](implicit
location: Location
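A hedged usage sketch of the three variadic helpers above. It is not part of the commit, and the column names are hypothetical.

import doric._
import doric.implicitConversions.stringCname
import org.apache.spark.sql.DataFrame

// Hypothetical sketch: coalesce picks the first non-null value,
// hash returns a 32-bit hash, xxhash64 a 64-bit one.
def fingerprint(df: DataFrame): DataFrame =
  df.select(
    coalesce(colString("nick"), colString("name")).as("label"),
    hash(colString("name"), colInt("age")).as("hash32"),
    xxhash64(colString("name"), colInt("age")).as("hash64")
  )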
13 changes: 13 additions & 0 deletions core/src/main/scala/doric/syntax/StringColumns.scala
@@ -489,5 +489,18 @@ private[syntax] trait StringColumns {
new Column(new ParseToTimestamp(str.expr, tsFormat.expr))
)
.toDC

/**
* ********************************************************
* MISC FUNCTIONS
* ********************************************************
*/

/**
* Throws an exception with the provided error message.
*
* @group String Type
*/
def raiseError: StringColumn = s.elem.map(f.raise_error).toDC
}
}
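A hedged sketch of raiseError. It is not part of the commit; "errorMsg" is a hypothetical string column, and since evaluating the column throws, in practice it sits behind a condition.

import doric._
import doric.implicitConversions.stringCname
import org.apache.spark.sql.DataFrame

// Hypothetical sketch: any row that reaches raiseError aborts the job
// with the column's value as the exception message.
def failFast(df: DataFrame): DataFrame =
  df.select(colString("errorMsg").raiseError.as("unreachable"))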
142 changes: 89 additions & 53 deletions core/src/test/scala/doric/TypedColumnTest.scala
@@ -2,20 +2,77 @@ package doric

import scala.reflect._
import scala.reflect.runtime.universe._

import doric.types.{Casting, SparkType}
import org.scalactic._
import org.scalatest.matchers.should.Matchers
import org.apache.spark.sql.{Column, DataFrame, Encoder, functions => f}
import org.apache.spark.sql.types._

import org.apache.spark.sql.{Column, DataFrame, Encoder}
import org.apache.spark.sql.types.DataType
import doric.implicitConversions.stringCname

trait TypedColumnTest extends Matchers {

implicit class ValidateColumnType(df: DataFrame) {
private lazy val doricCol = "dcol".cname
private lazy val sparkCol = "scol".cname

/**
* Compares two columns (doric & spark).
* If `expected` is defined, it is also compared.
*
* @param df
* Spark dataFrame
* @param expected
* list of expected values
* @tparam T
* the type of the columns being compared
*/
def compareDifferences[T: SparkType: TypeTag: Equality](
df: DataFrame,
expected: List[Option[T]]
): Unit = {

val equalsColumn = "equals".cname
val result = df
.withColumn(
equalsColumn,
(
col[T](doricCol) === col(sparkCol)
or (
col(doricCol).isNull
and col(sparkCol).isNull
)
).as(equalsColumn)
)
.na
.fill(Map(equalsColumn.value -> false))

implicit val enc: Encoder[(Option[T], Option[T], Boolean)] =
result.sparkSession.implicits
.newProductEncoder[(Option[T], Option[T], Boolean)]
val rows = result.as[(Option[T], Option[T], Boolean)].collect().toList

val doricColumns = rows.map(_._1)
val sparkColumns = rows.map(_._2)
val boolResColumns = rows.map(_._3)

assert(
boolResColumns.reduce(_ && _),
s"\nDoric function & Spark function return different values\n" +
s"Doric : $doricColumns\n" +
s"Spark : $sparkColumns}" +
s"${if (expected.nonEmpty) s"\nExpected: $expected"}"
)

if (expected.nonEmpty) {
import Equalities._
assert(
doricColumns === expected,
s"\nDoric and Spark functions return different values than expected"
)
}
}

private lazy val doricCol = "dcol".cname
private lazy val sparkCol = "scol".cname
implicit class ValidateColumnType(df: DataFrame) {

/**
* Tests doric & spark functions without parameters or columns
@@ -203,58 +260,37 @@ trait TypedColumnTest extends Matchers {
compareDifferences(result, expected)
}

/**
* @param df
* Spark dataFrame
* @param expected
* list of values
* @tparam T
* Comparing column type
*/
private def compareDifferences[T: SparkType: TypeTag: Equality](
df: DataFrame,
expected: List[Option[T]]
def testColumnsN[T: SparkType: TypeTag: Equality](
struct: StructType
)(
dcolumn: Seq[DoricColumn[_]] => DoricColumn[T],
scolumn: Seq[Column] => Column,
expected: List[Option[T]] = List.empty
): Unit = {

val equalsColumn = "equals".cname
val result = df
.withColumn(
equalsColumn,
(
col[T](doricCol) === col(sparkCol)
or (
col(doricCol).isNull
and col(sparkCol).isNull
)
).as(equalsColumn)
)
.na
.fill(Map(equalsColumn.value -> false))

implicit val enc: Encoder[(Option[T], Option[T], Boolean)] =
result.sparkSession.implicits
.newProductEncoder[(Option[T], Option[T], Boolean)]
val rows = result.as[(Option[T], Option[T], Boolean)].collect().toList

val doricColumns = rows.map(_._1)
val sparkColumns = rows.map(_._2)
val boolResColumns = rows.map(_._3)
val doricColumns: Seq[DoricColumn[_]] = struct.map {
case StructField(name, dataType, _, _) =>
dataType match {
case StringType => colString(name)
case IntegerType => colInt(name)
case LongType => colLong(name)
case DoubleType => colDouble(name)
case BooleanType => colBoolean(name)
case DateType => colDate(name)
case TimestampType => colTimestamp(name)
// TODO
// case ArrayType => colArray(name.cname)
// case StructType => colStruct(name.cname)
// case MapType => colMap(name.cname)
}
}

assert(
boolResColumns.reduce(_ && _),
s"\nDoric function & Spark function return different values\n" +
s"Doric : $doricColumns\n" +
s"Spark : $sparkColumns}" +
s"${if (expected.nonEmpty) s"\nExpected: $expected"}"
val result = df.select(
dcolumn(doricColumns).as(doricCol),
scolumn(struct.map(x => f.col(x.name))).asDoric[T].as(sparkCol)
)

if (expected.nonEmpty) {
import Equalities._
assert(
doricColumns === expected,
s"\nDoric and Spark functions return different values than expected"
)
}
compareDifferences(result, expected)
}

def validateColumnType[T: SparkType](
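To illustrate the new testColumnsN helper, here is a hedged sketch of a call site inside a spec that mixes in TypedColumnTest. The schema, the DataFrame df, and the compared functions are illustrative, not taken from the commit.

import org.apache.spark.sql.types._
import org.apache.spark.sql.{functions => f}

// Hypothetical sketch: checks that doric's hash agrees with Spark's f.hash
// over both columns of a two-field schema.
df.testColumnsN[Int](
  StructType(Seq(StructField("name", StringType), StructField("age", IntegerType)))
)(
  dcols => hash(dcols: _*),
  scols => f.hash(scols: _*),
  expected = List.empty
)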
