feat: String functions #71
eruizalo committed Nov 17, 2021
1 parent 0ebf60f commit 07f1c26
Showing 6 changed files with 190 additions and 1 deletion.
28 changes: 27 additions & 1 deletion core/src/main/scala/doric/syntax/BinaryColumns.scala
@@ -1,8 +1,12 @@
package doric
package syntax

import cats.implicits.catsSyntaxTuple2Semigroupal

import doric.types.{BinaryType, SparkType}
import org.apache.spark.sql.{functions => f}

import org.apache.spark.sql.catalyst.expressions.Decode
import org.apache.spark.sql.{Column, functions => f}

private[syntax] trait BinaryColumns {

@@ -42,6 +46,28 @@ private[syntax] trait BinaryColumns {
* @group Binary Type
*/
def crc32: LongColumn = column.elem.map(f.crc32).toDC

/**
* Computes the BASE64 encoding of a binary column and returns it as a string column.
* This is the reverse of unbase64.
*
* @group Binary Type
*/
def base64: StringColumn = column.elem.map(f.base64).toDC

/**
* Decodes the binary column into a string using the provided character set
* (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16').
* If either the column value or the charset is null, the result will also be null.
*
* @group Binary Type
*/
def decode(charset: StringColumn): StringColumn =
(column.elem, charset.elem)
.mapN((col, char) => {
new Column(Decode(col.expr, char.expr))
})
.toDC
}

}
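A minimal usage sketch of the two new binary functions. Spark's own f.decode takes the charset as a plain String, which is why the doric version wraps the Decode expression and accepts it as a column. colBinary and .lit are the same constructors the specs below use; the column name "payload" and the top-level import are assumptions, not part of this commit:

import doric._

// base64: binary column -> base64-encoded string column
val asBase64: StringColumn = colBinary("payload").base64

// decode: binary column -> string column, charset supplied as a (here literal) column
val asText: StringColumn = colBinary("payload").decode("UTF-8".lit)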
19 changes: 19 additions & 0 deletions core/src/main/scala/doric/syntax/NumericColumns.scala
@@ -1,8 +1,11 @@
package doric
package syntax

import cats.implicits.catsSyntaxTuple2Semigroupal
import doric.DoricColumn.sparkFunction
import doric.types.NumericType
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.FormatNumber

private[syntax] trait NumericColumns {

@@ -64,6 +67,22 @@ private[syntax] trait NumericColumns {
def <=(other: DoricColumn[T]): BooleanColumn =
sparkFunction[T, Boolean](column, other, _ <= _)

/**
* Formats the numeric column to a format like '#,###,###.##', rounded to the given number
* of decimal places with HALF_EVEN round mode, and returns the result as a string column.
*
* If decimals is 0, the result has no decimal point or fractional part.
* If decimals is less than 0, the result will be null.
*
* @group Numeric Type
*/
def formatNumber(decimals: IntegerColumn): StringColumn =
(column.elem, decimals.elem)
.mapN((c, d) => {
new Column(FormatNumber(c.expr, d.expr))
})
.toDC

}

}
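Spark's f.format_number only takes the decimal count as a plain Int, so the doric version wraps the FormatNumber expression and accepts an IntegerColumn instead. A short sketch (colDouble and .lit as in the spec below; "price" is a hypothetical column name):

// 123.567 formatted with 1 decimal place (HALF_EVEN) -> "123.6"
val decimals: IntegerColumn = 1.lit
val pretty: StringColumn = colDouble("price").formatNumber(decimals)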
22 changes: 22 additions & 0 deletions core/src/main/scala/doric/syntax/StringColumns.scala
@@ -435,6 +435,28 @@ private[syntax] trait StringColumns {
*/
def matchRegex(literal: StringColumn): BooleanColumn = rLike(literal)

/**
* Encodes the string column into a binary value using the provided character set
* (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16').
* If either the column value or the charset is null, the result will also be null.
*
* @group String Type
*/
def encode(charset: StringColumn): BinaryColumn =
(s.elem, charset.elem)
.mapN((col, char) => {
new Column(Encode(col.expr, char.expr))
})
.toDC

/**
* Decodes a BASE64 encoded string column and returns it as a binary column.
* This is the reverse of base64.
*
* @group String Type
*/
def unbase64: BinaryColumn = s.elem.map(f.unbase64).toDC

/**
* ********************************************************
* DORIC FUNCTIONS
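The string-side additions mirror the binary ones above. A minimal sketch under the same assumptions (colString and .lit as in the specs below; "msg" and "msgB64" are hypothetical column names):

// encode: string column -> binary column in the given charset
val bytes: BinaryColumn = colString("msg").encode("UTF-8".lit)

// unbase64: base64-encoded string column -> binary column (reverse of base64)
val raw: BinaryColumn = colString("msgB64").unbase64

// Per the docs, encode followed by decode with the same charset should round-trip the value
val roundTrip: StringColumn = colString("msg").encode("UTF-8".lit).decode("UTF-8".lit)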
66 changes: 66 additions & 0 deletions core/src/test/scala/doric/syntax/BinaryColumnsSpec.scala
@@ -155,4 +155,70 @@ class BinaryColumnsSpec
}
}

describe("base64 doric function") {
import spark.implicits._

it("should work as spark base64 function with strings") {
val df = List("this is a string", null)
.toDF("col1")

df.testColumns("col1")(
c => colString(c).base64,
c => f.base64(f.col(c)),
List(
Some("dGhpcyBpcyBhIHN0cmluZw=="),
None
)
)
}

it("should work as spark base64 function with array of bytes") {
val df = List(Array[Byte](1, 2, 3, 4, 5))
.toDF("col1")

df.testColumns("col1")(
c => colBinary(c).base64,
c => f.base64(f.col(c)),
List(
Some("AQIDBAU=")
)
)
}
}

describe("decode doric function") {
import spark.implicits._

it("should work as spark decode function with strings") {
val df = List("this is a string", null)
.toDF("col1")

df.testColumns2("col1", "UTF-8")(
(c, charset) => colString(c).decode(charset.lit),
(c, charset) => f.decode(f.col(c), charset),
List(
Some("this is a string"),
None
)
)
}

it("should work as spark decode function with array of bytes") {
val df = List(
Array[Byte](116, 104, 105, 115, 32, 105, 115, 32, 97, 32, 115, 116, 114,
105, 110, 103),
null
).toDF("col1")

df.testColumns2("col1", "UTF-8")(
(c, charset) => colBinary(c).decode(charset.lit),
(c, charset) => f.decode(f.col(c), charset),
List(
Some("this is a string"),
None
)
)
}
}

}
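Note that the committed tests always pass the charset as a literal; the reason decode takes a StringColumn at all is that the charset can also come from another column. A hedged one-liner, not part of this commit (both column names hypothetical; per the doc, a null charset yields a null result):

val decoded: StringColumn = colBinary("payload").decode(colString("charsetCol"))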
16 changes: 16 additions & 0 deletions core/src/test/scala/doric/syntax/NumericOperationsSpec.scala
@@ -6,6 +6,7 @@ import scala.reflect.{classTag, ClassTag}
import doric.types.{NumericType, SparkType}
import org.scalatest.funspec.AnyFunSpecLike

import org.apache.spark.sql.{functions => f}
import org.apache.spark.sql.DataFrame

trait NumericOperationsSpec extends AnyFunSpecLike with TypedColumnTest {
@@ -78,4 +79,19 @@ class NumericSpec extends NumericOperationsSpec with SparkSessionTestWrapper {
test[Float]()
test[Long]()
test[Double]()

describe("formatNumber doric function") {
import spark.implicits._

it("should work as spark format_number function") {
val df = List(Some(123.567), Some(1.0001), None)
.toDF("col1")

df.testColumns2("col1", 1)(
(c, d) => colDouble(c.cname).formatNumber(d.lit),
(c, d) => f.format_number(f.col(c), d),
List(Some("123.6"), Some("1.0"), None)
)
}
}
}
40 changes: 40 additions & 0 deletions core/src/test/scala/doric/syntax/StringColumnsSpec.scala
@@ -834,4 +834,44 @@ class StringColumnsSpec
doricErr.getMessage shouldBe sparkErr.getMessage
}
}

describe("encode doric function") {
import spark.implicits._

it("should work as spark encode function") {
val df = List("this is a string", null)
.toDF("col1")

df.testColumns2("col1", "UTF-8")(
(c, charset) => colString(c).encode(charset.lit),
(c, charset) => f.encode(f.col(c), charset),
List(
Some(
Array[Byte](116, 104, 105, 115, 32, 105, 115, 32, 97, 32, 115, 116,
114, 105, 110, 103)
),
None
)
)
}
}

describe("unbase64 doric function") {
import spark.implicits._

it("should work as spark unbase64 function") {
val df = List("AQIDBAU=", null)
.toDF("col1")

df.testColumns("col1")(
c => colString(c).unbase64,
c => f.unbase64(f.col(c)),
List(
Some(Array[Byte](1, 2, 3, 4, 5)),
None
)
)
}
}

}
