From 07f1c264eebe53953a01134e59edd70ecaa90f7e Mon Sep 17 00:00:00 2001
From: Eduardo Ruiz
Date: Sun, 7 Nov 2021 19:28:43 +0100
Subject: [PATCH] feat: String functions #71

---
 .../scala/doric/syntax/BinaryColumns.scala    | 28 +++++++-
 .../scala/doric/syntax/NumericColumns.scala   | 19 ++++++
 .../scala/doric/syntax/StringColumns.scala    | 22 +++++++
 .../doric/syntax/BinaryColumnsSpec.scala      | 66 +++++++++++++++++++
 .../doric/syntax/NumericOperationsSpec.scala  | 16 +++++
 .../doric/syntax/StringColumnsSpec.scala      | 40 +++++++++++
 6 files changed, 190 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/doric/syntax/BinaryColumns.scala b/core/src/main/scala/doric/syntax/BinaryColumns.scala
index bb07d4979..5e4b73fbe 100644
--- a/core/src/main/scala/doric/syntax/BinaryColumns.scala
+++ b/core/src/main/scala/doric/syntax/BinaryColumns.scala
@@ -1,8 +1,12 @@
 package doric
 package syntax
 
+import cats.implicits.catsSyntaxTuple2Semigroupal
+
 import doric.types.{BinaryType, SparkType}
-import org.apache.spark.sql.{functions => f}
+
+import org.apache.spark.sql.catalyst.expressions.Decode
+import org.apache.spark.sql.{Column, functions => f}
 
 private[syntax] trait BinaryColumns {
@@ -42,6 +46,28 @@
     * @group Binary Type
     */
   def crc32: LongColumn = column.elem.map(f.crc32).toDC
+
+  /**
+    * Computes the BASE64 encoding of a binary column and returns it as a string column.
+    * This is the reverse of unbase64.
+    *
+    * @group Binary Type
+    */
+  def base64: StringColumn = column.elem.map(f.base64).toDC
+
+  /**
+    * Computes the first argument into a string from a binary using the provided character set
+    * (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16').
+    * If either argument is null, the result will also be null.
+    *
+    * @group Binary Type
+    */
+  def decode(charset: StringColumn): StringColumn =
+    (column.elem, charset.elem)
+      .mapN((col, char) => {
+        new Column(Decode(col.expr, char.expr))
+      })
+      .toDC
   }
 }
diff --git a/core/src/main/scala/doric/syntax/NumericColumns.scala b/core/src/main/scala/doric/syntax/NumericColumns.scala
index eb7d1d4ba..500bada33 100644
--- a/core/src/main/scala/doric/syntax/NumericColumns.scala
+++ b/core/src/main/scala/doric/syntax/NumericColumns.scala
@@ -1,8 +1,11 @@
 package doric
 package syntax
 
+import cats.implicits.catsSyntaxTuple2Semigroupal
 import doric.DoricColumn.sparkFunction
 import doric.types.NumericType
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.expressions.FormatNumber
 
 private[syntax] trait NumericColumns {
@@ -64,6 +67,22 @@
     def <=(other: DoricColumn[T]): BooleanColumn =
       sparkFunction[T, Boolean](column, other, _ <= _)
 
+    /**
+      * Formats numeric column x to a format like '#,###,###.##', rounded to d decimal places
+      * with HALF_EVEN round mode, and returns the result as a string column.
+      *
+      * If d is 0, the result has no decimal point or fractional part.
+      * If d is less than 0, the result will be null.
+      *
+      * @group Numeric Type
+      */
+    def formatNumber(decimals: IntegerColumn): StringColumn =
+      (column.elem, decimals.elem)
+        .mapN((c, d) => {
+          new Column(FormatNumber(c.expr, d.expr))
+        })
+        .toDC
+
   }
 }
diff --git a/core/src/main/scala/doric/syntax/StringColumns.scala b/core/src/main/scala/doric/syntax/StringColumns.scala
index 1dfa76825..5ed4b307c 100644
--- a/core/src/main/scala/doric/syntax/StringColumns.scala
+++ b/core/src/main/scala/doric/syntax/StringColumns.scala
@@ -435,6 +435,28 @@
     */
   def matchRegex(literal: StringColumn): BooleanColumn = rLike(literal)
 
+  /**
+    * Computes the first argument into a binary from a string using the provided character set
+    * (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16').
+    * If either argument is null, the result will also be null.
+    *
+    * @group String Type
+    */
+  def encode(charset: StringColumn): BinaryColumn =
+    (s.elem, charset.elem)
+      .mapN((col, char) => {
+        new Column(Encode(col.expr, char.expr))
+      })
+      .toDC
+
+  /**
+    * Decodes a BASE64 encoded string column and returns it as a binary column.
+    * This is the reverse of base64.
+    *
+    * @group String Type
+    */
+  def unbase64: BinaryColumn = s.elem.map(f.unbase64).toDC
+
   /**
     * ********************************************************
     * DORIC FUNCTIONS
diff --git a/core/src/test/scala/doric/syntax/BinaryColumnsSpec.scala b/core/src/test/scala/doric/syntax/BinaryColumnsSpec.scala
index 5ed67f134..285833304 100644
--- a/core/src/test/scala/doric/syntax/BinaryColumnsSpec.scala
+++ b/core/src/test/scala/doric/syntax/BinaryColumnsSpec.scala
@@ -155,4 +155,70 @@
     }
   }
 
+  describe("base64 doric function") {
+    import spark.implicits._
+
+    it("should work as spark base64 function with strings") {
+      val df = List("this is a string", null)
+        .toDF("col1")
+
+      df.testColumns("col1")(
+        c => colString(c).base64,
+        c => f.base64(f.col(c)),
+        List(
+          Some("dGhpcyBpcyBhIHN0cmluZw=="),
+          None
+        )
+      )
+    }
+
+    it("should work as spark base64 function with array of bytes") {
+      val df = List(Array[Byte](1, 2, 3, 4, 5))
+        .toDF("col1")
+
+      df.testColumns("col1")(
+        c => colBinary(c).base64,
+        c => f.base64(f.col(c)),
+        List(
+          Some("AQIDBAU=")
+        )
+      )
+    }
+  }
+
+  describe("decode doric function") {
+    import spark.implicits._
+
+    it("should work as spark decode function with strings") {
+      val df = List("this is a string", null)
+        .toDF("col1")
+
+      df.testColumns2("col1", "UTF-8")(
+        (c, charset) => colString(c).decode(charset.lit),
+        (c, charset) => f.decode(f.col(c), charset),
+        List(
+          Some("this is a string"),
+          None
+        )
+      )
+    }
+
+    it("should work as spark decode function with array of bytes") {
+      val df = List(
+        Array[Byte](116, 104, 105, 115, 32, 105, 115, 32, 97, 32, 115, 116,
+          114, 105, 110, 103),
+        null
+      ).toDF("col1")
+
+      df.testColumns2("col1", "UTF-8")(
+        (c, charset) => colBinary(c).decode(charset.lit),
+        (c, charset) => f.decode(f.col(c), charset),
+        List(
+          Some("this is a string"),
+          None
+        )
+      )
+    }
+  }
 }
diff --git a/core/src/test/scala/doric/syntax/NumericOperationsSpec.scala b/core/src/test/scala/doric/syntax/NumericOperationsSpec.scala
index a689ca0e8..33e665b2d 100644
--- a/core/src/test/scala/doric/syntax/NumericOperationsSpec.scala
+++ b/core/src/test/scala/doric/syntax/NumericOperationsSpec.scala
@@ -6,6 +6,7 @@ import scala.reflect.{classTag, ClassTag}
 
 import doric.types.{NumericType, SparkType}
 import org.scalatest.funspec.AnyFunSpecLike
+import org.apache.spark.sql.{functions => f}
 import org.apache.spark.sql.DataFrame
 
 trait NumericOperationsSpec extends AnyFunSpecLike with TypedColumnTest {
@@ -78,4 +79,19 @@ class NumericSpec extends NumericOperationsSpec with SparkSessionTestWrapper {
   test[Float]()
   test[Long]()
   test[Double]()
+
+  describe("formatNumber doric function") {
+    import spark.implicits._
+
+    it("should work as spark format_number function") {
+      val df = List(Some(123.567), Some(1.0001), None)
+        .toDF("col1")
+
+      df.testColumns2("col1", 1)(
+        (c, d) => colDouble(c.cname).formatNumber(d.lit),
+        (c, d) => f.format_number(f.col(c), d),
+        List(Some("123.6"), Some("1.0"), None)
+      )
+    }
+  }
 }
diff --git a/core/src/test/scala/doric/syntax/StringColumnsSpec.scala b/core/src/test/scala/doric/syntax/StringColumnsSpec.scala
index 99da0dce6..707d1d621 100644
--- a/core/src/test/scala/doric/syntax/StringColumnsSpec.scala
+++ b/core/src/test/scala/doric/syntax/StringColumnsSpec.scala
@@ -834,4 +834,44 @@ class StringColumnsSpec
       doricErr.getMessage shouldBe sparkErr.getMessage
     }
   }
+
+  describe("encode doric function") {
+    import spark.implicits._
+
+    it("should work as spark encode function") {
+      val df = List("this is a string", null)
+        .toDF("col1")
+
+      df.testColumns2("col1", "UTF-8")(
+        (c, charset) => colString(c).encode(charset.lit),
+        (c, charset) => f.encode(f.col(c), charset),
+        List(
+          Some(
+            Array[Byte](116, 104, 105, 115, 32, 105, 115, 32, 97, 32, 115, 116,
+              114, 105, 110, 103)
+          ),
+          None
+        )
+      )
+    }
+  }
+
+  describe("unbase64 doric function") {
+    import spark.implicits._
+
+    it("should work as spark unbase64 function") {
+      val df = List("AQIDBAU=", null)
+        .toDF("col1")
+
+      df.testColumns("col1")(
+        c => colString(c).unbase64,
+        c => f.unbase64(f.col(c)),
+        List(
+          Some(Array[Byte](1, 2, 3, 4, 5)),
+          None
+        )
+      )
+    }
+  }
 }