Cross build doric from Spark 3.0 to 3.2 (issue #159) (#184)
alfonsorr authored Mar 3, 2022
1 parent 7e0a7f7 commit 41f1766
Showing 33 changed files with 795 additions and 466 deletions.
2 changes: 1 addition & 1 deletion .scalafix.conf
@@ -1,7 +1,7 @@
OrganizeImports {
blankLines = Auto
coalesceToWildcardImportThreshold = 5
groups = ["habla.", "org.apache.spark.", "*"]
groups = ["doric.", "org.apache.spark.", "*"]
groupedImports = Merge
importSelectorsOrder = Ascii
removeUnused = true
127 changes: 108 additions & 19 deletions build.sbt
@@ -1,4 +1,32 @@
import sbt.Compile
import sbt.{Compile, Def}

val sparkDefaultShortVersion = "3.1"
val spark30Version = "3.0.3"
val spark31Version = "3.1.3"
val spark32Version = "3.2.1"

val versionRegex = """^(.*)\.(.*)\.(.*)$""".r

val scala212 = "2.12.15"
val scala213 = "2.13.8"

val sparkShort: String => String = {
case "3.0" => spark30Version
case "3.1" => spark31Version
case "3.2" => spark32Version
}

val sparkLong2ShortVersion: String => String = {
case versionRegex("3", "0", _) => "3.0"
case versionRegex("3", "1", _) => "3.1"
case versionRegex("3", "2", _) => "3.2"
}

val scalaVersionSelect: String => String = {
case versionRegex("3", "0", _) => scala212
case versionRegex("3", "1", _) => scala212
case versionRegex("3", "2", _) => scala212
}

ThisBuild / organization := "org.hablapps"
ThisBuild / homepage := Some(url("https://github.com/hablapps/doric"))
@@ -13,14 +41,23 @@ ThisBuild / developers := List(
url("https://github.com/alfonsorr")
),
Developer(
"AlfonsoRR",
"eruizalo",
"Eduardo Ruiz",
"",
url("https://github.com/eruizalo")
)
)

Global / scalaVersion := "2.12.15"
val sparkVersion = settingKey[String]("Spark version")
Global / sparkVersion :=
System.getProperty(
"sparkVersion",
sparkShort(
System.getProperty("sparkShortVersion", sparkDefaultShortVersion)
)
)
Global / scalaVersion := scalaVersionSelect(sparkVersion.value)
Global / publish / skip := true
Global / publishArtifact := false

// scaladoc settings
Compile / doc / scalacOptions ++= Seq("-groups")
@@ -45,20 +82,32 @@ scmInfo := Some(

updateOptions := updateOptions.value.withLatestSnapshots(false)

val sparkVersion = "3.1.3"
val configSpark = Seq(
sparkVersion := System.getProperty(
"sparkVersion",
sparkShort(
System.getProperty("sparkShortVersion", sparkDefaultShortVersion)
)
)
)

lazy val core = project
.in(file("core"))
.settings(
name := "doric",
run / fork := true,
configSpark,
name := "doric_" + sparkLong2ShortVersion(sparkVersion.value),
run / fork := true,
publish / skip := false,
publishArtifact := true,
scalaVersion := scalaVersionSelect(sparkVersion.value),
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.typelevel" %% "cats-core" % "2.7.0",
"com.lihaoyi" %% "sourcecode" % "0.2.8",
"io.monix" %% "newtypes-core" % "0.2.1",
"com.github.mrpowers" %% "spark-daria" % "1.2.3" % "test",
"com.github.mrpowers" %% "spark-fast-tests" % "1.2.0" % "test",
"org.scalatest" %% "scalatest" % "3.2.11" % "test"
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
"org.typelevel" %% "cats-core" % "2.7.0",
"com.lihaoyi" %% "sourcecode" % "0.2.8",
"io.monix" %% "newtypes-core" % "0.2.1",
"com.github.mrpowers" %% "spark-daria" % "1.2.3" % "test",
"com.github.mrpowers" %% "spark-fast-tests" % "1.2.0" % "test",
"org.scalatest" %% "scalatest" % "3.2.11" % "test"
),
// docs
run / fork := true,
@@ -68,23 +117,63 @@ lazy val core = project
"-implicits",
"-skip-packages",
"org.apache.spark"
)
),
Compile / unmanagedSourceDirectories ++= {
sparkVersion.value match {
case versionRegex("3", "0", _) =>
Seq(
(Compile / sourceDirectory)(_ / "spark_3.0_mount" / "scala"),
(Compile / sourceDirectory)(_ / "spark_3.0_3.1" / "scala")
).join.value
case versionRegex("3", "1", _) =>
Seq(
(Compile / sourceDirectory)(_ / "spark_3.0_3.1" / "scala"),
(Compile / sourceDirectory)(_ / "spark_3.1" / "scala"),
(Compile / sourceDirectory)(_ / "spark_3.1_mount" / "scala")
).join.value
case versionRegex("3", "2", _) =>
Seq(
(Compile / sourceDirectory)(_ / "spark_3.1" / "scala"),
(Compile / sourceDirectory)(_ / "spark_3.2" / "scala"),
(Compile / sourceDirectory)(_ / "spark_3.2_mount" / "scala")
).join.value
}
},
Test / unmanagedSourceDirectories ++= {
sparkVersion.value match {
case versionRegex("3", "0", _) =>
Seq.empty[Def.Initialize[File]].join.value
case versionRegex("3", "1", _) =>
Seq(
(Test / sourceDirectory)(_ / "spark_3.1" / "scala")
).join.value
case versionRegex("3", "2", _) =>
Seq(
(Test / sourceDirectory)(_ / "spark_3.1" / "scala"),
(Test / sourceDirectory)(_ / "spark_3.2" / "scala")
).join.value
}
}
)

lazy val docs = project
.in(file("docs"))
.dependsOn(core)
.settings(
run / fork := true,
configSpark,
run / fork := true,
publish / skip := true,
publishArtifact := false,
run / javaOptions += "-XX:MaxJavaStackTraceDepth=10",
mdocIn := baseDirectory.value / "docs",
scalaVersion := scalaVersionSelect(sparkVersion.value),
mdocIn := baseDirectory.value / "docs",
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion
"org.apache.spark" %% "spark-sql" % sparkVersion.value
),
mdocVariables := Map(
"VERSION" -> version.value,
"STABLE_VERSION" -> "0.0.2",
"SPARK_VERSION" -> sparkVersion
"SPARK_VERSION" -> sparkVersion.value
),
mdocExtraArguments := Seq(
"--clean-target"
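
As a reading aid (not part of the commit), here is a minimal, self-contained sketch of how the version-selection pieces in this build.sbt compose. The sbt invocations in the comments are assumptions inferred from the System.getProperty("sparkVersion", ...) and System.getProperty("sparkShortVersion", ...) calls above; the version numbers and the doric_<series> artifact naming come from the diff itself.

// Standalone sketch mirroring the version selection in build.sbt above.
// Assumed invocations (only the property names are taken from the diff):
//   sbt -DsparkVersion=3.0.3 core/test        // pick an exact Spark version
//   sbt -DsparkShortVersion=3.2 core/test     // pick a series, resolved via sparkShort
object CrossBuildSketch extends App {
  val versionRegex = """^(.*)\.(.*)\.(.*)$""".r

  val sparkShort: String => String = {
    case "3.0" => "3.0.3"
    case "3.1" => "3.1.3"
    case "3.2" => "3.2.1"
  }

  val sparkLong2ShortVersion: String => String = {
    case versionRegex("3", "0", _) => "3.0"
    case versionRegex("3", "1", _) => "3.1"
    case versionRegex("3", "2", _) => "3.2"
  }

  // Every series currently builds with Scala 2.12.15 (see scalaVersionSelect above).
  val sparkVersion: String =
    sys.props.getOrElse(
      "sparkVersion",
      sparkShort(sys.props.getOrElse("sparkShortVersion", "3.1"))
    )

  // The core module publishes one artifact per series, e.g. doric_3.1 or doric_3.2.
  println(s"Spark $sparkVersion -> doric_" + sparkLong2ShortVersion(sparkVersion))
}

The per-version unmanagedSourceDirectories settings follow the same split: each Spark series mounts its own spark_3.x source folders on top of the shared core sources.
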
76 changes: 9 additions & 67 deletions core/src/main/scala/doric/syntax/AggregationColumns.scala
@@ -2,8 +2,10 @@ package doric
package syntax

import cats.implicits.{catsSyntaxTuple2Semigroupal, toTraverseOps}
import doric.types.{DoubleC, NumericType}
import org.apache.spark.sql.{functions => f}
import doric.types.NumericType

import org.apache.spark.sql.{Column, functions => f}
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum

private[syntax] trait AggregationColumns {

@@ -252,70 +254,6 @@ private[syntax] trait AggregationColumns {
def mean[T: NumericType](col: DoricColumn[T]): DoubleColumn =
col.elem.map(f.mean).toDC

/**
* Aggregate function: returns the approximate `percentile` of the numeric column `col` which
* is the smallest value in the ordered `col` values (sorted from least to greatest) such that
* no more than `percentage` of `col` values is less than the value or equal to that value.
*
* @param percentage each value must be between 0.0 and 1.0.
* @param accuracy controls approximation accuracy at the cost of memory. Higher value of accuracy
* yields better accuracy, 1.0/accuracy is the relative error of the approximation.
* @note Support NumericType, DateType and TimestampType since their internal types are all numeric,
* and can be easily cast to double for processing.
* @group Aggregation DoubleC Type
* @see [[org.apache.spark.sql.functions.percentile_approx]]
*/
def percentileApprox[T: DoubleC](
col: DoricColumn[T],
percentage: Array[Double],
accuracy: Int
): ArrayColumn[T] = {
require(
percentage.forall(x => x >= 0.0 && x <= 1.0),
"Each value of percentage must be between 0.0 and 1.0."
)
require(
accuracy >= 0 && accuracy < Int.MaxValue,
s"The accuracy provided must be a literal between (0, ${Int.MaxValue}]" +
s" (current value = $accuracy)"
)
col.elem
.map(f.percentile_approx(_, f.lit(percentage), f.lit(accuracy)))
.toDC
}

/**
* Aggregate function: returns the approximate `percentile` of the numeric column `col` which
* is the smallest value in the ordered `col` values (sorted from least to greatest) such that
* no more than `percentage` of `col` values is less than the value or equal to that value.
*
* @param percentage must be between 0.0 and 1.0.
* @param accuracy controls approximation accuracy at the cost of memory. Higher value of accuracy
* yields better accuracy, 1.0/accuracy is the relative error of the approximation.
* @note Support NumericType, DateType and TimestampType since their internal types are all numeric,
* and can be easily cast to double for processing.
* @group Aggregation DoubleC Type
* @see [[org.apache.spark.sql.functions.percentile_approx]]
*/
def percentileApprox[T: DoubleC](
col: DoricColumn[T],
percentage: Double,
accuracy: Int
): DoricColumn[T] = {
require(
percentage >= 0.0 && percentage <= 1.0,
"Percentage must be between 0.0 and 1.0."
)
require(
accuracy >= 0 && accuracy < Int.MaxValue,
s"The accuracy provided must be a literal between (0, ${Int.MaxValue}]" +
s" (current value = $accuracy)"
)
col.elem
.map(f.percentile_approx(_, f.lit(percentage), f.lit(accuracy)))
.toDC
}

/**
* Aggregate function: returns the skewness of the values in a group.
*
@@ -361,7 +299,11 @@ private[syntax] trait AggregationColumns {
def sumDistinct[T](col: DoricColumn[T])(implicit
nt: NumericType[T]
): DoricColumn[nt.Sum] =
col.elem.map(f.sumDistinct).toDC
col.elem
.map(e =>
new Column(Sum(e.expr).toAggregateExpression(isDistinct = true))
)
.toDC

/**
* Aggregate function: alias for `var_samp`.
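
The sumDistinct change above drops the call to f.sumDistinct and builds the aggregate directly from the catalyst Sum expression, presumably to avoid the helper that newer Spark releases deprecate in favour of sum_distinct while keeping a single code path in the shared sources. A minimal sketch of the same construction outside doric; the SparkSession setup, the app name, and the column name "n" are illustrative assumptions:

// Sketch: sum(DISTINCT n) assembled from a catalyst expression, as in the diff above.
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum

object SumDistinctSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
  import spark.implicits._

  val df = Seq(1, 1, 2, 3).toDF("n")

  // Wrap the aggregate function as a distinct AggregateExpression and lift it into a Column.
  val distinctSum = new Column(Sum($"n".expr).toAggregateExpression(isDistinct = true))

  df.agg(distinctSum).show() // 6, the same result as SELECT sum(DISTINCT n)

  spark.stop()
}
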
9 changes: 6 additions & 3 deletions core/src/main/scala/doric/syntax/ArrayColumns.scala
@@ -3,7 +3,8 @@ package syntax

import cats.implicits._
import doric.types.CollectionType
import org.apache.spark.sql.{Column, Row, functions => f}

import org.apache.spark.sql.{Column, functions => f}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.LambdaFunction.identity

@@ -469,12 +470,14 @@ private[syntax] trait ArrayColumns {
* end if `start` is negative) with the specified `length`.
*
* @note
* if `start` == 0 an exception will be thrown
* if `start` == 0 an exception will be thrown
* @group Array Type
* @see [[org.apache.spark.sql.functions.slice(x:org\.apache\.spark\.sql\.Column,start:org\.apache\.spark\.sql\.Column,length* org.apache.spark.sql.functions.slice]]
*/
def slice(start: IntegerColumn, length: IntegerColumn): ArrayColumn[T] =
(col.elem, start.elem, length.elem).mapN(f.slice).toDC
(col.elem, start.elem, length.elem)
.mapN((a, b, c) => new Column(Slice(a.expr, b.expr, c.expr)))
.toDC

/**
* Merge two given arrays, element-wise, into a single array using a function.
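
The slice rewrite follows the same expression-level pattern: instead of the Column-based overload of f.slice, the diff wraps the catalyst Slice expression, presumably because that overload is not available on every Spark version this commit targets. A short sketch of the helper in isolation; the object and method names are illustrative, not part of doric:

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.Slice

object SliceSketch {
  // Column-based slice assembled from the catalyst Slice expression, as in the diff above.
  def sliceColumns(arr: Column, start: Column, length: Column): Column =
    new Column(Slice(arr.expr, start.expr, length.expr))
}

// Usage (assumed DataFrame df with an array column "xs", using org.apache.spark.sql.functions):
//   df.select(SliceSketch.sliceColumns(col("xs"), lit(2), lit(3)))
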
21 changes: 3 additions & 18 deletions core/src/main/scala/doric/syntax/BinaryColumns.scala
@@ -1,10 +1,10 @@
package doric
package syntax

import cats.implicits.{catsSyntaxTuple2Semigroupal, toTraverseOps}
import cats.implicits.toTraverseOps
import doric.types.{BinaryType, SparkType}
import org.apache.spark.sql.catalyst.expressions.Decode
import org.apache.spark.sql.{Column, functions => f}

import org.apache.spark.sql.{functions => f}

private[syntax] trait BinaryColumns {

@@ -76,21 +76,6 @@ private[syntax] trait BinaryColumns {
* @see [[org.apache.spark.sql.functions.base64]]
*/
def base64: StringColumn = column.elem.map(f.base64).toDC

/**
* Computes the first argument into a string from a binary using the provided character set
* (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16').
* If either argument is null, the result will also be null.
*
* @group Binary Type
* @see [[org.apache.spark.sql.functions.decode]]
*/
def decode(charset: StringColumn): StringColumn =
(column.elem, charset.elem)
.mapN((col, char) => {
new Column(Decode(col.expr, char.expr))
})
.toDC
}

}
21 changes: 1 addition & 20 deletions core/src/main/scala/doric/syntax/BooleanColumns.scala
@@ -1,8 +1,8 @@
package doric
package syntax

import cats.implicits._
import doric.DoricColumn.sparkFunction

import org.apache.spark.sql.{functions => f}

private[syntax] trait BooleanColumns {
@@ -61,24 +61,5 @@ private[syntax] trait BooleanColumns {
*/
def ||(other: DoricColumn[Boolean]): DoricColumn[Boolean] =
or(other)

/**
* Returns null if the condition is true, and throws an exception otherwise.
*
* @throws java.lang.RuntimeException if the condition is false
* @group Boolean Type
* @see [[org.apache.spark.sql.functions.assert_true(c:org\.apache\.spark\.sql\.Column):* org.apache.spark.sql.functions.assert_true]]
*/
def assertTrue: NullColumn = column.elem.map(f.assert_true).toDC

/**
* Returns null if the condition is true; throws an exception with the error message otherwise.
*
* @throws java.lang.RuntimeException if the condition is false
* @group Boolean Type
* @see [[org.apache.spark.sql.functions.assert_true(c:org\.apache\.spark\.sql\.Column,e:* org.apache.spark.sql.functions.assert_true]]
*/
def assertTrue(msg: StringColumn): NullColumn =
(column.elem, msg.elem).mapN(f.assert_true).toDC
}
}