diff --git a/README.md b/README.md index 7afd0a2f2..3ee6e7f07 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ Type-safe columns for DataFrames! ![Maven Central](https://img.shields.io/maven-central/v/org.hablapps/doric_2.12) Doric offers type-safety in DataFrame column expressions at a minimum -cost, without compromising performace. In particular, doric allows you +cost, without compromising performance. In particular, doric allows you to: * Get rid of malformed column expressions at compile time diff --git a/src/main/scala/doric/syntax/CommonColumns.scala b/src/main/scala/doric/syntax/CommonColumns.scala index 8f23aa704..bc851815b 100644 --- a/src/main/scala/doric/syntax/CommonColumns.scala +++ b/src/main/scala/doric/syntax/CommonColumns.scala @@ -60,11 +60,21 @@ trait CommonColumns extends ColGetters[DoricColumn] { * @param other * the column to compare * @return - * a reference to a Boolean DoricColumn whit the comparation + * a reference to a Boolean DoricColumn with the comparison */ def ===(other: DoricColumn[T]): BooleanColumn = (column.elem, other.elem).mapN(_ === _).toDC + /** + * Type safe inequality between Columns + * @param other + * the column to compare + * @return + * a reference to a Boolean DoricColumn with the comparison + */ + def =!=(other: DoricColumn[T]): BooleanColumn = + (column.elem, other.elem).mapN(_ =!= _).toDC + + /** + * Pipes the column with the provided transformation + * @param f diff --git a/src/main/scala/doric/syntax/StringColumns.scala b/src/main/scala/doric/syntax/StringColumns.scala index b09ab8b68..a34b14696 100644 --- a/src/main/scala/doric/syntax/StringColumns.scala +++ b/src/main/scala/doric/syntax/StringColumns.scala @@ -1,20 +1,506 @@ package doric package syntax -import cats.implicits.toTraverseOps - -import org.apache.spark.sql.{functions => f} +import cats.implicits.{catsSyntaxTuple2Semigroupal, catsSyntaxTuple3Semigroupal, catsSyntaxTuple4Semigroupal, toTraverseOps} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.{Column, functions => f} trait StringColumns { + /** + * Returns the first column that is not null, or null if all inputs are null. + * + * For example, `coalesce(a, b, c)` will return a if a is not null, or b if a + * is null and b is not null, or c if both a and b are null but c is not + * null. + * + * @group string_type + * @param cols + * the String DoricColumns to coalesce + * @return + * the first column that is not null, or null if all inputs are null. + */ + def coalesce(cols: StringColumn*): StringColumn = + cols.map(_.elem).toList.sequence.map(f.coalesce(_: _*)).toDC + /** * Concatenate string columns to form a single one + * + * @group string_type * @param cols * the String DoricColumns to concatenate * @return * a reference of a single DoricColumn with all strings concatenated. If at * least one is null will return null. */ - def concat(cols: DoricColumn[String]*): DoricColumn[String] = + def concat(cols: StringColumn*): StringColumn = cols.map(_.elem).toList.sequence.map(f.concat(_: _*)).toDC + + /** + * Formats the arguments in printf-style and returns the result as a string + * column. + * + * @group string_type + * @param format + * Printf format + * @param arguments + * the DoricColumns to format + * @return + * Formats the arguments in printf-style and returns the result as a string + * column.
+ */ + def formatString( + format: StringColumn, + arguments: DoricColumn[_]* + ): StringColumn = + (format.elem, arguments.toList.traverse(_.elem)) + .mapN((f, args) => { + new Column(FormatString((f +: args).map(_.expr): _*)) + }) + .toDC + + /** + * Unique column operations + * + * @param s + * Doric String column + */ + implicit class StringOperationsSyntax(s: StringColumn) { + + /** + * ******************************************************** SPARK SQL + * EQUIVALENT FUNCTIONS + * ******************************************************** + */ + + /** + * Computes the numeric value of the first character of the string column, + * and returns the result as an int column. + * + * @group string_type + */ + def ascii: IntegerColumn = s.elem.map(f.ascii).toDC + + /** + * Returns a new string column by converting the first letter of each word + * to uppercase. Words are delimited by whitespace. + * + * For example, "hello world" will become "Hello World". + * + * @group string_type + */ + def initcap: StringColumn = s.elem.map(f.initcap).toDC + + /** + * Locate the position of the first occurrence of substr column in the + * given string. Returns null if either of the arguments are null. + * + * @group string_type + * @note + * The position is not zero based, but 1 based index. Returns 0 if substr + * could not be found in str. + */ + def instr(substring: StringColumn): IntegerColumn = + (s.elem, substring.elem) + .mapN((str, substr) => { + new Column(StringInstr(str.expr, substr.expr)) + }) + .toDC + + /** + * Computes the character length of a given string or number of bytes of a + * binary string. The length of character strings include the trailing + * spaces. The length of binary strings includes binary zeros. + * + * @group string_type + */ + def length: IntegerColumn = s.elem.map(f.length).toDC + + /** + * Computes the Levenshtein distance of the two given string columns. + * + * @group string_type + */ + def levenshtein(dc: StringColumn): IntegerColumn = + (s.elem, dc.elem).mapN(f.levenshtein).toDC + + /** + * Locate the position of the first occurrence of substr in a string + * column, after position pos. + * + * @group string_type + * @note + * The position is not zero based, but 1 based index. returns 0 if substr + * could not be found in str. + */ + def locate( + substr: StringColumn, + pos: IntegerColumn = 1.lit + ): IntegerColumn = + (substr.elem, s.elem, pos.elem) + .mapN((substring, str, position) => { + new Column(StringLocate(substring.expr, str.expr, position.expr)) + }) + .toDC + + /** + * Converts a string column to lower case. + * + * @group string_type + */ + def lower: StringColumn = s.elem.map(f.lower).toDC + + /** + * Left-pad the string column with pad to a length of len. If the string + * column is longer than len, the return value is shortened to len + * characters. + * + * @group string_type + */ + def lpad(len: IntegerColumn, pad: StringColumn): StringColumn = + (s.elem, len.elem, pad.elem) + .mapN((str, lenCol, lpad) => { + new Column(StringLPad(str.expr, lenCol.expr, lpad.expr)) + }) + .toDC + + /** + * Trim the spaces from left end for the specified string value. + * + * @group string_type + */ + def ltrim: StringColumn = s.elem.map(f.ltrim).toDC + + /** + * Trim the specified character string from left end for the specified + * string column. 
+ * + * @group string_type + */ + def ltrim(trimString: StringColumn): StringColumn = + (s.elem, trimString.elem) + .mapN((str, trimStr) => { + new Column(StringTrimLeft(str.expr, trimStr.expr)) + }) + .toDC + + /** + * Overlay the specified portion of `src` with `replace`, starting from + * byte position `pos` of `src` and proceeding for `len` bytes. + * + * @group string_type + */ + def overlay( + replace: StringColumn, + pos: IntegerColumn, + len: IntegerColumn = (-1).lit + ): StringColumn = + (s.elem, replace.elem, pos.elem, len.elem).mapN(f.overlay).toDC + + /** + * Extract a specific group matched by a Java regex, from the specified + * string column. If the regex did not match, or the specified group did + * not match, an empty string is returned. if the specified group index + * exceeds the group count of regex, an IllegalArgumentException will be + * thrown. + * + * @group string_type + */ + def regexpExtract( + exp: StringColumn, + groupIdx: IntegerColumn + ): StringColumn = + (s.elem, exp.elem, groupIdx.elem) + .mapN((str, regexp, gIdx) => + new Column(RegExpExtract(str.expr, regexp.expr, gIdx.expr)) + ) + .toDC + + /** + * Replace all substrings of the specified string value that match regexp + * with replacement. + * + * @group string_type + */ + def regexpReplace( + pattern: StringColumn, + replacement: StringColumn + ): StringColumn = + (s.elem, pattern.elem, replacement.elem).mapN(f.regexp_replace).toDC + + /** + * Repeats a string column n times, and returns it as a new string column. + * + * @group string_type + */ + def repeat(n: IntegerColumn): StringColumn = (s.elem, n.elem) + .mapN((str, times) => new Column(StringRepeat(str.expr, times.expr))) + .toDC + + /** + * Right-pad the string column with pad to a length of len. If the string + * column is longer than len, the return value is shortened to len + * characters. + * + * @group string_type + */ + def rpad(len: IntegerColumn, pad: StringColumn): StringColumn = + (s.elem, len.elem, pad.elem) + .mapN((str, l, p) => new Column(StringRPad(str.expr, l.expr, p.expr))) + .toDC + + /** + * Trim the spaces from right end for the specified string value. + * + * @group string_type + */ + def rtrim: StringColumn = s.elem.map(f.rtrim).toDC + + /** + * Trim the specified character string from right end for the specified + * string column. + * + * @group string_type + */ + def rtrim(trimString: StringColumn): StringColumn = + (s.elem, trimString.elem) + .mapN((str, t) => new Column(StringTrimRight(str.expr, t.expr))) + .toDC + + /** + * Returns the soundex code for the specified expression. + * + * @group string_type + */ + def soundex: StringColumn = s.elem.map(f.soundex).toDC + + /** + * Splits str around matches of the given pattern. + * + * @param pattern + * a string representing a regular expression. The regex string should be + * a Java regular expression. + * + * @group string_type + */ + def split(pattern: StringColumn): ArrayColumn[String] = + split(pattern, (-1).lit) + + /** + * Splits str around matches of the given pattern. + * + * @group string_type + * @param pattern + * a string representing a regular expression. The regex string should be + * a Java regular expression. + * @param limit + * an integer expression which controls the number of times the regex is + * applied. 
+ */ + def split( + pattern: StringColumn, + limit: IntegerColumn + ): ArrayColumn[String] = + (s.elem, pattern.elem, limit.elem) + .mapN((str, p, l) => new Column(StringSplit(str.expr, p.expr, l.expr))) + .toDC + + /** + * Substring starts at `pos` and is of length `len` when str is String type + * or returns the slice of byte array that starts at `pos` in byte and is + * of length `len` when str is Binary type + * + * @group string_type + * @note + * The position is not zero based, but 1 based index. + */ + def substring(pos: IntegerColumn, len: IntegerColumn): StringColumn = + (s.elem, pos.elem, len.elem) + .mapN((str, p, l) => new Column(Substring(str.expr, p.expr, l.expr))) + .toDC + + /** + * Returns the substring from string str before count occurrences of the + * delimiter delim. If count is positive, everything the left of the final + * delimiter (counting from left) is returned. If count is negative, every + * to the right of the final delimiter (counting from the right) is + * returned. substring_index performs a case-sensitive match when searching + * for delim. + * + * @group string_type + */ + def substringIndex( + delim: StringColumn, + count: IntegerColumn + ): StringColumn = + (s.elem, delim.elem, count.elem) + .mapN((str, d, c) => + new Column(SubstringIndex(str.expr, d.expr, c.expr)) + ) + .toDC + + /** + * Translate any character in the src by a character in replaceString. The + * characters in replaceString correspond to the characters in + * matchingString. The translate will happen when any character in the + * string matches the character in the `matchingString`. + * + * @group string_type + */ + def translate( + matchingString: StringColumn, + replaceString: StringColumn + ): StringColumn = + (s.elem, matchingString.elem, replaceString.elem) + .mapN((str, m, r) => + new Column(StringTranslate(str.expr, m.expr, r.expr)) + ) + .toDC + + /** + * Trim the spaces from both ends for the specified string column. + * + * @group string_type + */ + def trim: StringColumn = s.elem.map(f.trim).toDC + + /** + * Trim the specified character from both ends for the specified string + * column (literal). + * + * @group string_type + */ + def trim(trimString: StringColumn): StringColumn = + (s.elem, trimString.elem) + .mapN((str, trimStr) => { + new Column(StringTrim(str.expr, trimStr.expr)) + }) + .toDC + + /** + * Converts a string column to upper case. + * + * @group string_type + */ + def upper: StringColumn = s.elem.map(f.upper).toDC + + /** + * ******************************************************** COLUMN + * FUNCTIONS ******************************************************** + */ + + /** + * Contains the other element. Returns a boolean column based on a string + * match. + * + * @group string_type + */ + def contains(dc: StringColumn): BooleanColumn = + (s.elem, dc.elem).mapN(_.contains(_)).toDC + + /** + * String ends with. Returns a boolean column based on a string match. + * + * @group string_type + */ + def endsWith(dc: StringColumn): BooleanColumn = + (s.elem, dc.elem).mapN(_.endsWith(_)).toDC + + /** + * SQL like expression. Returns a boolean column based on a SQL LIKE match. + * + * @group string_type + */ + def like(literal: StringColumn): BooleanColumn = + (s.elem, literal.elem) + .mapN((str, l) => new Column(new Like(str.expr, l.expr))) + .toDC + + /** + * SQL RLIKE expression (LIKE with Regex). Returns a boolean column based + * on a regex match. 
+ * + * @group string_type + */ + def rLike(literal: StringColumn): BooleanColumn = + (s.elem, literal.elem) + .mapN((str, regex) => new Column(RLike(str.expr, regex.expr))) + .toDC + + /** + * String starts with. Returns a boolean column based on a string match. + * + * @group string_type + */ + def startsWith(dc: StringColumn): BooleanColumn = + (s.elem, dc.elem).mapN(_.startsWith(_)).toDC + + /** + * Same as rLike doric function. + * + * SQL RLIKE expression (LIKE with Regex). Returns a boolean column based + * on a regex match. + * + * @group string_type + */ + def matchRegex(literal: StringColumn): BooleanColumn = rLike(literal) + + /** + * ******************************************************** DORIC FUNCTIONS + * ******************************************************** + */ + + /** + * Similar to concat doric function, but only with two columns + * + * @group string_type + */ + def +(s2: StringColumn): StringColumn = concat(s, s2) + + /** + * Converts the column into a `DateType` with a specified format + * + * See + * Datetime Patterns for valid date and time format patterns + * + * @group string_type + * @param format + * A date time pattern detailing the format of `e` when `e`is a string + * @return + * A date, or null if `e` was a string that could not be cast to a date + * or `format` was an invalid format + */ + def toDate(format: StringColumn): LocalDateColumn = + (s.elem, format.elem) + .mapN((str, dateFormat) => + new Column(new ParseToDate(str.expr, dateFormat.expr)) + ) + .toDC + + /** + * Converts time string with the given pattern to timestamp. + * + * See + * Datetime Patterns for valid date and time format patterns + * + * @group string_type + * @param format + * A date time pattern detailing the format of `s` when `s` is a string + * @return + * A timestamp, or null if `s` was a string that could not be cast to a + * timestamp or `format` was an invalid format + */ + def toTimestamp(format: StringColumn): InstantColumn = + (s.elem, format.elem) + .mapN((str, tsFormat) => + new Column(new ParseToTimestamp(str.expr, tsFormat.expr)) + ) + .toDC + } } diff --git a/src/main/scala/edu/PhoneNumberType.scala b/src/main/scala/edu/PhoneNumberType.scala new file mode 100644 index 000000000..41d368b12 --- /dev/null +++ b/src/main/scala/edu/PhoneNumberType.scala @@ -0,0 +1,14 @@ +package edu + +import doric._ +import doric.types.SparkType + +trait PhoneNumberType + +object PhoneNumberType { + implicit val phoneNumberSparkType: SparkType[String] = SparkType[String] + + implicit class phoneOps(c: DoricColumn[PhoneNumberType]) { + def getPrefix: StringColumn = ??? 
//c.cast[String] + } +} diff --git a/src/test/scala/doric/Equalities.scala b/src/test/scala/doric/Equalities.scala new file mode 100644 index 000000000..a1082ea86 --- /dev/null +++ b/src/test/scala/doric/Equalities.scala @@ -0,0 +1,36 @@ +package doric + +import org.scalactic._ +import TripleEquals._ + +import java.time.Instant + +object Equalities { + + implicit def eqOptional[O: Equality]: Equality[Option[O]] = + (a: Option[O], b: Any) => + (a, b) match { + case (Some(x), Some(y)) => x === y + case (None, None) => true + case _ => false + } + + implicit def eqList[O: Equality]: Equality[List[O]] = + (a: List[O], b: Any) => + b match { + case l :: rest => + if (a.size == rest.size + 1) { + val zipped = a.zip(l :: rest) + val bools = zipped.map(x => x._1 === x._2) + val r = bools.reduce(_ && _) + r + } else false + case _ => a === b + } + + implicit val eqInstant: Equality[Instant] = (a: Instant, b: Any) => + b match { + case x: Instant => x.equals(a) + case _ => a === b + } +} diff --git a/src/test/scala/doric/SparkSessionTestWrapper.scala b/src/test/scala/doric/SparkSessionTestWrapper.scala index add17c932..b7001a0d8 100644 --- a/src/test/scala/doric/SparkSessionTestWrapper.scala +++ b/src/test/scala/doric/SparkSessionTestWrapper.scala @@ -1,17 +1,23 @@ package doric import org.apache.log4j.{Level, Logger} - import org.apache.spark.sql.SparkSession +import java.util.TimeZone + trait SparkSessionTestWrapper { lazy val spark: SparkSession = { Logger.getLogger("org").setLevel(Level.OFF) + + val timeZone: String = "UTC" + TimeZone.setDefault(TimeZone.getTimeZone(timeZone)) + SparkSession .builder() .master("local") .config("spark.driver.bindAddress", "127.0.0.1") + .config("spark.sql.session.timeZone", timeZone) .appName("spark session") .getOrCreate() } diff --git a/src/test/scala/doric/TypedColumnTest.scala b/src/test/scala/doric/TypedColumnTest.scala index f8f73fae1..c8b17fa51 100644 --- a/src/test/scala/doric/TypedColumnTest.scala +++ b/src/test/scala/doric/TypedColumnTest.scala @@ -1,15 +1,224 @@ package doric import scala.reflect.{ClassTag, _} - +import scala.reflect.runtime.universe._ import doric.types.{Casting, SparkType} - import org.apache.spark.sql.types.DataType -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{Column, DataFrame, Encoder} +import org.scalactic._ +import org.scalatest.matchers.should.Matchers -trait TypedColumnTest { +trait TypedColumnTest extends Matchers { implicit class ValidateColumnType(df: DataFrame) { + + private lazy val doricCol = "dcol" + private lazy val sparkCol = "scol" + + /** + * @param column1 + * literal or column name + * @param dcolumn + * function to get doric column + * @param scolumn + * function to get spark column + * @param expected + * list of values + * @tparam I1 + * literal or column type + * @tparam T + * Comparing column type + */ + def testColumns[I1, T: SparkType: TypeTag: Equality]( + column1: I1 + )( + dcolumn: I1 => DoricColumn[T], + scolumn: I1 => Column, + expected: List[Option[T]] = List.empty + ): Unit = { + + val result = df.select( + dcolumn(column1).as(doricCol), + scolumn(column1).asDoric[T].as(sparkCol) + ) + + compareDifferences(result, expected) + } + + /** + * @param column1 + * literal or column name + * @param column2 + * literal or column name + * @param dcolumn + * function to get doric column + * @param scolumn + * function to get spark column + * @param expected + * list of values + * @tparam I1 + * literal or column type + * @tparam I2 + * literal or column type + * @tparam T + * Comparing 
column type + */ + def testColumns2[I1, I2, T: SparkType: TypeTag: Equality]( + column1: I1, + column2: I2 + )( + dcolumn: (I1, I2) => DoricColumn[T], + scolumn: (I1, I2) => Column, + expected: List[Option[T]] = List.empty + ): Unit = { + + val result = df.select( + dcolumn(column1, column2).as(doricCol), + scolumn(column1, column2).asDoric[T].as(sparkCol) + ) + + compareDifferences(result, expected) + } + + /** + * @param column1 + * literal or column name + * @param column2 + * literal or column name + * @param column3 + * literal or column name + * @param dcolumn + * function to get doric column + * @param scolumn + * function to get spark column + * @param expected + * list of values + * @tparam I1 + * literal or column type + * @tparam I2 + * literal or column type + * @tparam I3 + * literal or column type + * @tparam T + * Comparing column type + */ + def testColumns3[I1, I2, I3, T: SparkType: TypeTag: Equality]( + column1: I1, + column2: I2, + column3: I3 + )( + dcolumn: (I1, I2, I3) => DoricColumn[T], + scolumn: (I1, I2, I3) => Column, + expected: List[Option[T]] = List.empty + ): Unit = { + + val result = df.select( + dcolumn(column1, column2, column3).as(doricCol), + scolumn(column1, column2, column3).asDoric[T].as(sparkCol) + ) + + compareDifferences(result, expected) + } + + /** + * @param column1 + * literal or column name + * @param column2 + * literal or column name + * @param column3 + * literal or column name + * @param column4 + * literal or column name + * @param dcolumn + * function to get doric column + * @param scolumn + * function to get spark column + * @param expected + * list of values + * @tparam I1 + * literal or column type + * @tparam I2 + * literal or column type + * @tparam I3 + * literal or column type + * @tparam I4 + * literal or column type + * @tparam T + * Comparing column type + */ + def testColumns4[I1, I2, I3, I4, T: SparkType: TypeTag: Equality]( + column1: I1, + column2: I2, + column3: I3, + column4: I4 + )( + dcolumn: (I1, I2, I3, I4) => DoricColumn[T], + scolumn: (I1, I2, I3, I4) => Column, + expected: List[Option[T]] = List.empty + ): Unit = { + + val result = df.select( + dcolumn(column1, column2, column3, column4).as(doricCol), + scolumn(column1, column2, column3, column4).asDoric[T].as(sparkCol) + ) + + compareDifferences(result, expected) + } + + /** + * @param df + * Spark dataFrame + * @param expected + * list of values + * @tparam T + * Comparing column type + */ + private def compareDifferences[T: SparkType: TypeTag: Equality]( + df: DataFrame, + expected: List[Option[T]] + ): Unit = { + + val equalsColumn = "equals" + val result = df + .withColumn( + equalsColumn, + ( + col[T](doricCol) === col(sparkCol) + or ( + col(doricCol).isNull + and col(sparkCol).isNull + ) + ).as(equalsColumn) + ) + .na + .fill(Map(equalsColumn -> false)) + + implicit val enc: Encoder[(Option[T], Option[T], Boolean)] = + result.sparkSession.implicits + .newProductEncoder[(Option[T], Option[T], Boolean)] + val rows = result.as[(Option[T], Option[T], Boolean)].collect().toList + + val doricColumns = rows.map(_._1) + val sparkColumns = rows.map(_._2) + val boolResColumns = rows.map(_._3) + + assert( + boolResColumns.reduce(_ && _), + s"\nDoric function & Spark function return different values\n" + + s"Doric : $doricColumns\n" + + s"Spark : $sparkColumns}" + + s"${if (expected.nonEmpty) s"\nExpected: $expected"}" + ) + + if (expected.nonEmpty) { + import Equalities._ + assert( + doricColumns === expected, + s"\nDoric and Spark functions return different values than 
expected" + ) + } + } + def validateColumnType[T: SparkType]( column: DoricColumn[T], show: Boolean = false diff --git a/src/test/scala/doric/syntax/StringColumnsSpec.scala b/src/test/scala/doric/syntax/StringColumnsSpec.scala new file mode 100644 index 000000000..ce903840d --- /dev/null +++ b/src/test/scala/doric/syntax/StringColumnsSpec.scala @@ -0,0 +1,825 @@ +package doric +package syntax + +import Equalities._ +import org.apache.spark.sql.{functions => f} +import org.scalatest.EitherValues +import org.scalatest.matchers.should.Matchers + +import java.time.format.DateTimeFormatter +import java.time.{Instant, LocalDate, ZoneId} + +class StringColumnsSpec + extends DoricTestElements + with EitherValues + with Matchers { + + describe("coalesce doric function") { + import spark.implicits._ + + val df = List(("1", "1"), (null, "2"), ("3", null), (null, null)) + .toDF("col1", "col2") + + it("should work as spark coalesce function") { + df.testColumns2("col1", "col2")( + (col1, col2) => coalesce(colString(col1), colString(col2)), + (col1, col2) => f.coalesce(f.col(col1), f.col(col2)), + List("1", "2", "3", null).map(Option(_)) + ) + } + } + + describe("concat doric function") { + import spark.implicits._ + + val df = List(("1", "1"), (null, "2"), ("3", null), (null, null)) + .toDF("col1", "col2") + + it("should work as spark concat function") { + df.testColumns2("col1", "col2")( + (col1, col2) => concat(colString(col1), colString(col2)), + (col1, col2) => f.concat(f.col(col1), f.col(col2)), + List("11", null, null, null).map(Option(_)) + ) + } + + it("should work with + function") { + df.testColumns3("col1", "col2", "col1")( + (col1, col2, col3) => + colString(col1) + colString(col2) + colString(col3), + (col1, col2, col3) => f.concat(f.col(col1), f.col(col2), f.col(col3)), + List("111", null, null, null).map(Option(_)) + ) + } + } + + describe("formatString doric function") { + import spark.implicits._ + + it("should work as spark formatString function") { + val df = List(("1", Some(1)), (null, Some(2)), ("3", None), (null, None)) + .toDF("col1", "col2") + + df.testColumns4("Hello World %s %s %d", "col1", "col1", "col2")( + (format, col1, col2, col3) => + formatString( + format.lit, + colString(col1), + colString(col2), + colInt(col3) + ), + (format, col1, col2, col3) => + f.format_string(format, f.col(col1), f.col(col2), f.col(col3)), + List( + "Hello World 1 1 1", + "Hello World null null 2", + "Hello World 3 3 null", + "Hello World null null null" + ) + .map(Option(_)) + ) + } + } + + describe("ascii doric function") { + import spark.implicits._ + + it("should work as spark ascii function") { + val df = List("1", "a", "A", null).toDF("col1") + + df.testColumns("col1")( + c => colString(c).ascii, + c => f.ascii(f.col(c)), + List( + Some(49), + Some(97), + Some(65), + None + ) + ) + } + } + + describe("initcap doric function") { + import spark.implicits._ + + it("should work as spark initcap function") { + val df = List("hello world", "ñañeñiño", "Tú vas a ir a Álaba", "1", null) + .toDF("col1") + + df.testColumns("col1")( + c => colString(c).initcap, + c => f.initcap(f.col(c)), + List("Hello World", "Ñañeñiño", "Tú Vas A Ir A Álaba", "1", null) + .map(Option(_)) + ) + } + } + + describe("instr doric function") { + import spark.implicits._ + + it("should work as spark instr function") { + val df = List("hello world", "ñañeñiño", "Tú vas a ir a Álaba", "1", null) + .toDF("col1") + + df.testColumns2("col1", "a")( + (c, str) => colString(c).instr(str.lit), + (c, str) => f.instr(f.col(c), str), 
+ List(Some(0), Some(2), Some(5), Some(0), None) + ) + } + } + + describe("length doric function") { + import spark.implicits._ + + it("should work as spark length function") { + val df = List("hello world", "ñañeñiño", "Tú vas a ir a Álaba", "1", null) + .toDF("col1") + + df.testColumns("col1")( + c => colString(c).length, + c => f.length(f.col(c)), + List(Some(11), Some(8), Some(19), Some(1), None) + ) + } + } + + describe("levenshtein doric function") { + import spark.implicits._ + + it("should work as spark levenshtein function") { + val df = List( + ("kitten", "sitting"), + ("jander", ""), + (null, "jander"), + ("jander", null), + (null, null) + ).toDF("col1", "col2") + + df.testColumns2("col1", "col2")( + (c, right) => colString(c).levenshtein(colString(right)), + (c, right) => f.levenshtein(f.col(c), f.col(right)), + List(Some(3), Some(6), None, None, None) + ) + } + } + + describe("locate doric function") { + import org.apache.spark.sql.functions.{locate => sparkLocate} + import spark.implicits._ + + val df = List("hello world", "abcde hello hello", "other words", null) + .toDF("col1") + + it("should work as spark locate function with default position") { + df.testColumns2("col1", "hello")( + (c, substr) => colString(c).locate(substr.lit), + (c, substr) => sparkLocate(substr, f.col(c)), + List(Some(1), Some(7), Some(0), None) + ) + } + + it("should work as spark locate function setting a position") { + df.testColumns3("col1", "hello", 4)( + (c, substr, pos) => colString(c).locate(substr.lit, pos.lit), + (c, substr, pos) => sparkLocate(substr, f.col(c), pos), + List(Some(0), Some(7), Some(0), None) + ) + } + } + + describe("lower doric function") { + import spark.implicits._ + + it("should work as spark lower function") { + val df = List("Hello World", "HELLO WORLD", " 1", null).toDF("col1") + + df.testColumns("col1")( + c => colString(c).lower, + c => f.lower(f.col(c)), + List("hello world", "hello world", " 1", null).map(Option(_)) + ) + } + } + + describe("lpad doric function") { + import spark.implicits._ + + val df = List("abcd", "", "1", null).toDF("col1") + + it("should work as spark lpad function") { + df.testColumns3("col1", 7, ".")( + (c, len, pad) => colString(c).lpad(len.lit, pad.lit), + (c, len, pad) => f.lpad(f.col(c), len, pad), + List("...abcd", "." * 7, "." 
* 6 + "1", null).map(Option(_)) + ) + } + + it("should cut the text if lpad length < than text length") { + df.testColumns3("col1", 0, ".")( + (c, len, pad) => colString(c).lpad(len.lit, pad.lit), + (c, len, pad) => f.lpad(f.col(c), len, pad), + List("", "", "", null).map(Option(_)) + ) + } + + it("should do nothing if pad is empty") { + df.testColumns3("col1", 7, "")( + (c, len, pad) => colString(c).lpad(len.lit, pad.lit), + (c, len, pad) => f.lpad(f.col(c), len, pad), + List("abcd", "", "1", null).map(Option(_)) + ) + } + } + + describe("ltrim doric function") { + import spark.implicits._ + + it("should work as spark ltrim function") { + val df = List(" hello world ", "hello world ", " hello world", null) + .toDF("col1") + + df.testColumns("col1")( + c => colString(c).ltrim, + c => f.ltrim(f.col(c)), + List("hello world ", "hello world ", "hello world", null).map( + Option(_) + ) + ) + } + + it("should work as spark ltrim function with trim argument") { + val df = + List("--hello world--", "hello world--", "--hello world", null).toDF( + "col1" + ) + + df.testColumns2("col1", "-")( + (c, t) => colString(c).ltrim(t.lit), + (c, t) => f.ltrim(f.col(c), t), + List("hello world--", "hello world--", "hello world", null).map( + Option(_) + ) + ) + } + + it("should do nothing if empty trimString") { + val df = + List("--hello world--", "hello world--", "--hello world", null) + .toDF("col1") + + df.testColumns2("col1", "")( + (c, t) => colString(c).ltrim(t.lit), + (c, t) => f.ltrim(f.col(c), t), + List("--hello world--", "hello world--", "--hello world", null).map( + Option(_) + ) + ) + } + } + + describe("overlay doric function") { + import spark.implicits._ + + it("should work as spark overlay function") { + val df = List( + ("hello world", "LAMBDA WORLD", Some(7)), + ("123456", "987654", Some(0)), + ("hello world", "", Some(7)), + ("hello world", null, Some(7)), + ("hello world", "LAMBDA WORLD", None), + (null, "LAMBDA WORLD", Some(1)) + ).toDF("col1", "col2", "col3") + + df.testColumns3("col1", "col2", "col3")( + ( + str, + repl, + pos + ) => colString(str).overlay(colString(repl), colInt(pos)), + (str, repl, pos) => f.overlay(f.col(str), f.col(repl), f.col(pos)), + List( + "hello LAMBDA WORLD", + "9876546", + "hello world", + null, + null, + null + ).map(Option(_)) + ) + } + + it("should work as spark overlay function with length parameter") { + val df = List( + ("hello world", "LAMBDA WORLD", Some(7), Some(6)), + ("123456", "987654", Some(0), Some(20)), + ("hello world", "", Some(7), Some(20)), + ("hello world", null, Some(7), Some(20)), + ("hello world", "LAMBDA WORLD", None, Some(20)), + ("hello world", "LAMBDA WORLD", Some(5), None), + (null, "LAMBDA WORLD", Some(1), Some(20)) + ).toDF("col1", "col2", "col3", "col4") + + df.testColumns4("col1", "col2", "col3", "col4")( + (str, repl, pos, len) => + colString(str).overlay(colString(repl), colInt(pos), colInt(len)), + (str, repl, pos, len) => + f.overlay(f.col(str), f.col(repl), f.col(pos), f.col(len)), + List("hello LAMBDA WORLD", "987654", "hello ", null, null, null, null) + .map(Option(_)) + ) + } + } + + describe("regexpExtract doric function") { + import spark.implicits._ + + val df = List("100-200", "100-a", null).toDF("col1") + + it("should work as spark regexpExtract function") { + df.testColumns3("col1", "(\\d+)-(\\d+)", 1)( + (str, regex, group) => + colString(str).regexpExtract(regex.lit, group.lit), + (str, regex, group) => f.regexp_extract(f.col(str), regex, group), + List("100", "", null).map(Option(_)) + ) + } + + it("should 
work as spark regexpExtract function if regex is empty") { + df.testColumns3("col1", "", 0)( + (str, regex, group) => + colString(str).regexpExtract(regex.lit, group.lit), + (str, regex, group) => f.regexp_extract(f.col(str), regex, group), + List("", "", null).map(Option(_)) + ) + } + + it("should raise an error if group > regex group result") { + intercept[java.lang.IllegalArgumentException] { + df.withColumn( + "res", + colString("col1").regexpExtract("(\\d+)-(\\d+)".lit, 4.lit) + ).collect() + } + } + } + + describe("regexpReplace doric function") { + import spark.implicits._ + + val df = List( + ("hello world", "world", "everybody"), + ("hello world", null, "everybody"), + ("hello world", "world", null), + (null, "world", "everybody"), + (null, null, null) + ).toDF("str", "pattern", "replacement") + + it("should work as spark regexpReplace function") { + df.testColumns3("str", "pattern", "replacement")( + (str, p, r) => colString(str).regexpReplace(colString(p), colString(r)), + (str, p, r) => f.regexp_replace(f.col(str), f.col(p), f.col(r)), + List("hello everybody", null, null, null, null).map(Option(_)) + ) + } + } + + describe("repeat doric function") { + import spark.implicits._ + + val df = List("hello world", "12345", null).toDF("col1") + + it("should work as spark repeat function") { + df.testColumns2("col1", 2)( + (str, repeat) => colString(str).repeat(repeat.lit), + (str, repeat) => f.repeat(f.col(str), repeat), + List("hello worldhello world", "1234512345", null).map(Option(_)) + ) + } + + it("should empty the string column if repeat = 0") { + df.testColumns2("col1", 0)( + (str, repeat) => colString(str).repeat(repeat.lit), + (str, repeat) => f.repeat(f.col(str), repeat), + List("", "", null).map(Option(_)) + ) + } + } + + describe("rpad doric function") { + import spark.implicits._ + + val df = List("abcd", "", "1", null).toDF("col1") + + it("should work as spark rpad function") { + df.testColumns3("col1", 7, ".")( + (c, len, pad) => colString(c).rpad(len.lit, pad.lit), + (c, len, pad) => f.rpad(f.col(c), len, pad), + List("abcd...", "." * 7, "1" + "." 
* 6, null).map(Option(_)) + ) + } + + it("should cut the text if rpad length < than text length") { + df.testColumns3("col1", 0, ".")( + (c, len, pad) => colString(c).rpad(len.lit, pad.lit), + (c, len, pad) => f.rpad(f.col(c), len, pad), + List("", "", "", null).map(Option(_)) + ) + } + + it("should do nothing if pad is empty") { + df.testColumns3("col1", 7, "")( + (c, len, pad) => colString(c).rpad(len.lit, pad.lit), + (c, len, pad) => f.rpad(f.col(c), len, pad), + List("abcd", "", "1", null).map(Option(_)) + ) + } + } + + describe("rtrim doric function") { + import spark.implicits._ + + it("should work as spark rtrim function") { + val df = List(" hello world ", "hello world ", " hello world", null) + .toDF("col1") + + df.testColumns("col1")( + c => colString(c).rtrim, + c => f.rtrim(f.col(c)), + List(" hello world", "hello world", " hello world", null).map( + Option(_) + ) + ) + } + + it("should work as spark rtrim function with trim argument") { + val df = + List("--hello world--", "hello world--", "--hello world", null).toDF( + "col1" + ) + + df.testColumns2("col1", "-")( + (c, t) => colString(c).rtrim(t.lit), + (c, t) => f.rtrim(f.col(c), t), + List("--hello world", "hello world", "--hello world", null).map( + Option(_) + ) + ) + } + + it("should do nothing if empty trimString") { + val df = + List("--hello world--", "hello world--", "--hello world", null) + .toDF("col1") + + df.testColumns2("col1", "")( + (c, t) => colString(c).rtrim(t.lit), + (c, t) => f.rtrim(f.col(c), t), + List("--hello world--", "hello world--", "--hello world", null).map( + Option(_) + ) + ) + } + } + + describe("soundex doric function") { + import spark.implicits._ + + val df = List("hello world", "12345", null).toDF("col1") + + it("should work as spark soundex function") { + df.testColumns("col1")( + str => colString(str).soundex, + str => f.soundex(f.col(str)), + List("H464", "12345", null).map(Option(_)) + ) + } + } + + describe("split doric function") { + import spark.implicits._ + + val df = List("hello world", "12345", null).toDF("col1") + + it("should work as spark split function") { + df.testColumns2("col1", " ")( + (str, pattern) => colString(str).split(pattern.lit), + (str, pattern) => f.split(f.col(str), pattern), + List(Array("hello", "world"), Array("12345"), null).map(Option(_)) + ) + } + + it("should split every char if no pattern is empty") { + df.testColumns2("col1", "")( + (str, pattern) => colString(str).split(pattern.lit), + (str, pattern) => f.split(f.col(str), pattern), + List( + Array("h", "e", "l", "l", "o", " ", "w", "o", "r", "l", "d", ""), + Array("1", "2", "3", "4", "5", ""), + null + ).map(Option(_)) + ) + } + + it("should stop splitting if limit is set") { + val df2 = List("how are you", "hello world", "12345", null).toDF("col1") + + df2.testColumns3("col1", " ", 2)( + (str, pattern, limit) => colString(str).split(pattern.lit, limit.lit), + (str, pattern, limit) => f.split(f.col(str), pattern, limit), + List( + Array("how", "are you"), + Array("hello", "world"), + Array("12345"), + null + ).map(Option(_)) + ) + } + } + + describe("substring doric function") { + import spark.implicits._ + + val df = List("hello world", "12345", null).toDF("col1") + + it("should work as spark substring function") { + df.testColumns3("col1", 3, 5)( + (str, pos, len) => colString(str).substring(pos.lit, len.lit), + (str, pos, len) => f.substring(f.col(str), pos, len), + List("llo w", "345", null).map(Option(_)) + ) + } + + it("should generate empty string if pos > length of string") { + 
df.testColumns3("col1", 30, 5)( + (str, pos, len) => colString(str).substring(pos.lit, len.lit), + (str, pos, len) => f.substring(f.col(str), pos, len), + List("", "", null).map(Option(_)) + ) + } + + it("should generate empty string if len = 0") { + df.testColumns3("col1", 3, 0)( + (str, pos, len) => colString(str).substring(pos.lit, len.lit), + (str, pos, len) => f.substring(f.col(str), pos, len), + List("", "", null).map(Option(_)) + ) + } + } + + describe("substringIndex doric function") { + import spark.implicits._ + + val df = List("hello world", "12345", null).toDF("col1") + + it("should work as spark substringIndex function") { + df.testColumns3("col1", " ", 1)( + (str, delim, count) => + colString(str).substringIndex(delim.lit, count.lit), + (str, delim, count) => f.substring_index(f.col(str), delim, count), + List("hello", "12345", null).map(Option(_)) + ) + } + + it("should work as spark substringIndex function if count < 0") { + df.testColumns3("col1", " ", -1)( + (str, delim, count) => + colString(str).substringIndex(delim.lit, count.lit), + (str, delim, count) => f.substring_index(f.col(str), delim, count), + List("world", "12345", null).map(Option(_)) + ) + } + + it("should empty strings if delim is empty") { + df.testColumns3("col1", "", 1)( + (str, delim, count) => + colString(str).substringIndex(delim.lit, count.lit), + (str, delim, count) => f.substring_index(f.col(str), delim, count), + List("", "", null).map(Option(_)) + ) + } + + it("should empty strings if count = 0") { + df.testColumns3("col1", " ", 0)( + (str, delim, count) => + colString(str).substringIndex(delim.lit, count.lit), + (str, delim, count) => f.substring_index(f.col(str), delim, count), + List("", "", null).map(Option(_)) + ) + } + } + + describe("translate doric function") { + import spark.implicits._ + + val df = List("hello world", "123456", null).toDF("col1") + + it("should work as spark translate function") { + df.testColumns3("col1", "l wd2345eh", "L.Wd1111")( + (str, matching, replace) => + colString(str).translate(matching.lit, replace.lit), + (str, matching, replace) => f.translate(f.col(str), matching, replace), + List("LLo.WorLd", "111116", null).map(Option(_)) + ) + } + + it("should do nothing id matching string is empty") { + df.testColumns3("col1", "", "L.Wd1111")( + (str, matching, replace) => + colString(str).translate(matching.lit, replace.lit), + (str, matching, replace) => f.translate(f.col(str), matching, replace), + List("hello world", "123456", null).map(Option(_)) + ) + } + + it("should remove characters if replace is empty") { + df.testColumns3("col1", "l wd2345", "")( + (str, matching, replace) => + colString(str).translate(matching.lit, replace.lit), + (str, matching, replace) => f.translate(f.col(str), matching, replace), + List("heoor", "16", null).map(Option(_)) + ) + } + } + + describe("trim doric function") { + import spark.implicits._ + + val df = List(" hello world ", "hello world ", " hello world", null) + .toDF("col1") + + it("should work as spark trim function") { + df.testColumns("col1")( + c => colString(c).trim, + c => f.trim(f.col(c)), + List("hello world", "hello world", "hello world", null).map(Option(_)) + ) + } + + it("should work as spark trim function with trim argument") { + df.testColumns2("col1", " ")( + (c, t) => colString(c).trim(t.lit), + (c, t) => f.trim(f.col(c), t), + List("hello world", "hello world", "hello world", null).map(Option(_)) + ) + } + + it("should do nothing if trim argument is empty") { + + df.testColumns2("col1", "")( + (c, t) => 
colString(c).trim(t.lit), + (c, t) => f.trim(f.col(c), t), + List(" hello world ", "hello world ", " hello world", null).map( + Option(_) + ) + ) + } + } + + describe("upper doric function") { + import spark.implicits._ + + val df = List("hello world", "123456", null) + .toDF("col1") + + it("should work as spark upper function") { + df.testColumns("col1")( + c => colString(c).upper, + c => f.upper(f.col(c)), + List("HELLO WORLD", "123456", null).map(Option(_)) + ) + } + } + + describe("contains doric function") { + import spark.implicits._ + + val df = List("hello world", "123456", null).toDF("col1") + + it("should work as spark contains function") { + df.testColumns2("col1", "world")( + (c, str) => colString(c).contains(str.lit), + (c, str) => f.col(c).contains(str), + List(Some(true), Some(false), None) + ) + } + } + + describe("endsWith doric function") { + import spark.implicits._ + + val df = List("hello world", "123456", null).toDF("col1") + + it("should work as spark endsWith function") { + df.testColumns2("col1", "world")( + (c, str) => colString(c).endsWith(str.lit), + (c, str) => f.col(c).endsWith(str), + List(Some(true), Some(false), None) + ) + } + } + + describe("like doric function") { + import spark.implicits._ + + val df = List("hello world", "123456", null).toDF("col1") + + it("should work as spark like function") { + df.testColumns2("col1", "%45%")( + (c, regex) => colString(c).like(regex.lit), + (c, regex) => f.col(c).like(regex), + List(Some(false), Some(true), None) + ) + } + } + + describe("rLike doric function") { + import spark.implicits._ + + val df = List("hello world", "123456", null).toDF("col1") + + it("should work as spark rlike function") { + df.testColumns2("col1", "^[0-9]*$")( + (c, regex) => colString(c).rLike(regex.lit), + (c, regex) => f.col(c).rlike(regex), + List(Some(false), Some(true), None) + ) + } + + it("should work with matchRegex function") { + df.testColumns2("col1", "^[0-9]*$")( + (c, regex) => colString(c).matchRegex(regex.lit), + (c, regex) => f.col(c).rlike(regex), + List(Some(false), Some(true), None) + ) + } + } + + describe("startsWith doric function") { + import spark.implicits._ + + val df = List("hello world", "123456", null).toDF("col1") + + it("should work as spark startsWith function") { + df.testColumns2("col1", "hello")( + (c, str) => colString(c).startsWith(str.lit), + (c, str) => f.col(c).startsWith(str), + List(Some(true), Some(false), None) + ) + } + } + + describe("toDate doric function") { + import spark.implicits._ + + val df = List("28/05/2021", "28-05-21", null).toDF("col1") + + it( + "should work as spark toDate function (returning null if malformed format)" + ) { + df.testColumns2("col1", "dd/MM/yyyy")( + (c, str) => colString(c).toDate(str.lit), + (c, str) => f.to_date(f.col(c), str), + List(Some(LocalDate.of(2021, 5, 28)), None, None) + ) + } + + it("should return none if empty format") { + df.testColumns2("col1", "")( + (c, str) => colString(c).toDate(str.lit), + (c, str) => f.to_date(f.col(c), str), + List(None, None, None) + ) + } + } + + describe("toTimestamp doric function") { + import spark.implicits._ + + val formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME + .withZone(ZoneId.systemDefault()) + val expectedTime: Instant = Instant.parse("2021-05-28T10:55:23Z") + val df = List(formatter.format(expectedTime), "28/05/21 04:27:31", null) + .toDF("col1") + + it( + "should work as spark toTimestamp function (returning null if malformed date)" + ) { + df.testColumns2("col1", "yyyy-MM-dd'T'hh:mm:ss")( + (c, str) => 
colString(c).toTimestamp(str.lit), + (c, str) => f.to_timestamp(f.col(c), str), + List(Some(expectedTime), None, None) + ) + } + + it("should return none if empty format") { + df.testColumns2("col1", "")( + (c, str) => colString(c).toTimestamp(str.lit), + (c, str) => f.to_timestamp(f.col(c), str), + List(None, None, None) + ) + } + } +}
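
To illustrate how the new syntax composes, here is a minimal usage sketch (not part of the patch). It assumes `import doric._` brings the column getters and literal syntax into scope, and the DataFrame and column names (`name`, `surname`, `birth`) are invented for the example; the calls themselves (`concat`-style `+`, `coalesce`, `upper`, `toDate`, `=!=`) are the ones added by this diff and exercised in StringColumnsSpec.

```scala
import doric._
import org.apache.spark.sql.SparkSession

// Hypothetical data: a name, a possibly-null surname and a birth date kept as a string.
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark
  .createDataFrame(Seq(("Ada", "Lovelace", "10/12/1815"), ("Alan", null, "23/06/1912")))
  .toDF("name", "surname", "birth")

df.select(
  // `+` and `coalesce` come from the new string syntax; a null surname is replaced by a default
  (colString("name") + " ".lit + coalesce(colString("surname"), "<unknown>".lit)).as("full_name"),
  colString("name").upper.as("name_upper"),
  // toDate yields a LocalDateColumn, or null when the value does not match the pattern
  colString("birth").toDate("dd/MM/yyyy".lit).as("birth_date"),
  // =!= is the new type-safe inequality from CommonColumns
  (colString("name") =!= colString("surname")).as("name_differs")
).show()
```

All of the column expressions above are checked at compile time: mixing up the argument types (for example passing an IntegerColumn where a StringColumn is expected) fails to compile instead of failing at runtime.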