Support for product types (#251)
jserranohidalgo authored Jul 20, 2022
1 parent 888b56a commit 7ab6876
Showing 42 changed files with 1,462 additions and 238 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -128,6 +128,7 @@ lazy val core = project
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", // scala-steward:off
"org.typelevel" %% "cats-core" % catsVersion(sparkVersion.value),
"com.lihaoyi" %% "sourcecode" % "0.3.0",
"com.chuusai" %% "shapeless" % "2.3.9",
"com.github.mrpowers" %% "spark-fast-tests" % "1.3.0" % "test",
"org.scalatest" %% "scalatest" % "3.2.12" % "test"
),
18 changes: 7 additions & 11 deletions core/src/main/scala/doric/DoricColumn.scala
@@ -39,17 +39,13 @@ case class LiteralDoricColumn[T] private[doric] (
) extends DoricColumn[T]

object LiteralDoricColumn {
def apply[T: SparkType: LiteralSparkType](value: T): LiteralDoricColumn[T] = {
val colLit: Doric[Column] = new Column(
Literal(
CatalystTypeConverters.createToCatalystConverter(SparkType[T].dataType)(
LiteralSparkType[T].literalTo(value)
),
SparkType[T].dataType
)
).pure[Doric]
LiteralDoricColumn(colLit, value)
}
def apply[T: SparkType: LiteralSparkType](
value: T
)(implicit l: Location): LiteralDoricColumn[T] =
LiteralDoricColumn(
Kleisli { _ => LiteralSparkType[T].literal(value) },
value
)

implicit class LiteralGetter[T](litCol: LiteralDoricColumn[T]) {
def getColumnValueAsSparkValue(implicit
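
The reworked apply drops the direct CatalystTypeConverters call: the literal is now produced by LiteralSparkType[T].literal inside a Kleisli, deferring it until the column is resolved against a Dataset, and the call site's Location is captured implicitly. A minimal sketch of a caller, assuming doric's standard instances for Int; the value 42 is illustrative, and the Location comes from the sourcecode-based implicit shown in Errors.scala below:

import doric._

// No explicit Location is passed: the implicit in the Location companion
// derives it from the sourcecode implicits (e.g. sourcecode.Line) at this call site.
val answer: LiteralDoricColumn[Int] = LiteralDoricColumn(42)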
6 changes: 6 additions & 0 deletions core/src/main/scala/doric/sem/Errors.scala
@@ -161,6 +161,12 @@ case class SparkErrorWrapper(sparkCause: Throwable)(implicit
}
}

case class GenDoricError(
val message: String
)(implicit
val location: Location
) extends DoricSingleError(None)

object Location {
implicit def location(implicit
line: sourcecode.Line,
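
GenDoricError is a general-purpose single error that only needs a message; like the other doric errors, its Location is filled in implicitly at the construction site. A hedged sketch of raising one (the helper name and message are illustrative, not part of the PR):

import doric.sem.{GenDoricError, Location}

// The implicit Location (file and line, via sourcecode) is captured where the
// helper is called, so the reported error points back at user code.
def missingField(field: String)(implicit loc: Location): GenDoricError =
  GenDoricError(s"No field '$field' in the struct column")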
55 changes: 52 additions & 3 deletions core/src/main/scala/doric/syntax/DStructs.scala
@@ -8,10 +8,11 @@ import cats.evidence.Is
import cats.implicits._
import doric.sem.{ColumnTypeError, Location, SparkErrorWrapper}
import doric.types.SparkType

import org.apache.spark.sql.{Column, Dataset, Row}
import org.apache.spark.sql.catalyst.expressions.ExtractValue
import org.apache.spark.sql.functions.{struct => sparkStruct}
import shapeless.labelled.FieldType
import shapeless.{::, HList, LabelledGeneric, Witness}

private[syntax] trait DStructs {

@@ -26,7 +27,9 @@ private[syntax] trait DStructs {
def struct(cols: DoricColumn[_]*): RowColumn =
cols.map(_.elem).toList.sequence.map(c => sparkStruct(c: _*)).toDC

implicit class DStructOps(private val col: RowColumn) {
implicit class DStructOps[T](private val col: DoricColumn[T])(implicit
st: SparkType.Custom[T, Row]
) {

/**
* Retrieves the child row of the Struct column
@@ -83,15 +86,22 @@ private[syntax] trait DStructs {
.toDC
}
}
trait DynamicFieldAccessor[T] extends Dynamic { self: DoricColumn[T] =>

trait DynamicFieldAccessor[T] extends Dynamic {
self: DoricColumn[T] =>

/**
* Allows for accessing fields of struct columns using the syntax `rowcol.name[T]`.
* This expression stands for `rowcol.getChild[T](name)`.
*
* @param name the name of the struct field to access
* @param location the source location of the call, used to enrich error reporting
* @param st evidence that `A` is a valid Spark type
* @tparam A the expected type of the selected field
* @return The column which refers to the given field
* @throws doric.sem.ColumnTypeError if the parent column is not a struct
*/

def selectDynamic[A](name: String)(implicit
location: Location,
st: SparkType[A],
@@ -100,4 +110,43 @@ private[syntax] trait DStructs {
w.lift[DoricColumn].coerce(self).getChild[A](name)
}

@annotation.implicitNotFound(msg = "No field ${K} in record ${L}")
trait SelectorWithSparkType[L <: HList, K <: Symbol] {
type V
val st: SparkType[V]
}

object SelectorWithSparkType extends SelectorLPI {
type Aux[L <: HList, K <: Symbol, _V] = SelectorWithSparkType[L, K] {
type V = _V
}

implicit def Found[K <: Symbol, _V: SparkType, T <: HList] =
new SelectorWithSparkType[FieldType[K, _V] :: T, K] {
type V = _V
val st = SparkType[_V]
}
}

trait SelectorLPI {
implicit def KeepFinding[K1, V1, T <: HList, K <: Symbol](implicit
T: SelectorWithSparkType[T, K]
) =
new SelectorWithSparkType[FieldType[K1, V1] :: T, K] {
type V = T.V
val st = T.st
}
}

implicit class StructOps[T, L <: HList](dc: DoricColumn[T])(implicit
lg: LabelledGeneric.Aux[T, L],
st: SparkType.Custom[T, Row]
) {
def getChildSafe[K <: Symbol](k: Witness.Aux[K])(implicit
S: SelectorWithSparkType[L, K],
location: Location
): DoricColumn[S.V] =
new DStructOps(dc).getChild[S.V](k.value.name)(S.st, location)
}

}
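
Taken together, these additions give two ways to reach into a struct column: the Dynamic-based rowcol.name[T] syntax, checked at runtime with errors that carry the call-site Location, and the shapeless-based getChildSafe, which resolves the field against the case class's LabelledGeneric record and so rejects unknown fields at compile time. A hedged usage sketch: User and the column names are illustrative, it assumes DoricColumn mixes in DynamicFieldAccessor as its self-type suggests, and col[Row]/col[User] rely on doric's Row instance and on the product-type SparkType support this PR introduces.

import doric._
import org.apache.spark.sql.Row
import shapeless.Witness

case class User(name: String, age: Int)

// Dynamic access on a Row-typed struct column: `rowcol.age[Int]` expands to
// `rowcol.getChild[Int]("age")` and fails at runtime, with Location, if the
// field or its type does not match.
val rowcol: DoricColumn[Row] = col[Row]("user")
val age: DoricColumn[Int] = rowcol.age[Int]

// Type-safe access on a product-typed column: the field is looked up in
// User's LabelledGeneric record, so a misspelled name is a compile error
// ("No field ${K} in record ${L}").
val user: DoricColumn[User] = col[User]("user")
val name: DoricColumn[String] = user.getChildSafe(Witness(Symbol("name")))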
9 changes: 7 additions & 2 deletions core/src/main/scala/doric/syntax/LiteralConversions.scala
@@ -1,6 +1,7 @@
package doric
package syntax

import doric.sem.Location
import doric.types.{LiteralSparkType, SparkType}

private[syntax] trait LiteralConversions {
Expand All @@ -16,7 +17,9 @@ private[syntax] trait LiteralConversions {
* A doric column that represent the literal value and the same type as the
* value.
*/
def lit[L: SparkType: LiteralSparkType](litv: L): LiteralDoricColumn[L] = {
def lit[L: SparkType: LiteralSparkType](
litv: L
)(implicit l: Location): LiteralDoricColumn[L] = {
LiteralDoricColumn(litv)
}

Expand All @@ -29,7 +32,9 @@ private[syntax] trait LiteralConversions {
* a literal with the same type.
*/
@inline
def lit: LiteralDoricColumn[L] = LiteralDoricColumn(litv)
def lit(implicit l: Location): LiteralDoricColumn[L] = LiteralDoricColumn(
litv
)
}

}
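
For users of lit and the .lit extension nothing changes syntactically; the extra implicit Location only means the resulting LiteralDoricColumn records where it was created, so errors can point back at that line. A minimal usage sketch, assuming doric's colInt and numeric + syntax; the column name "x" is illustrative:

import doric._

// Both spellings build a LiteralDoricColumn[Int] and capture the call-site Location.
val one: LiteralDoricColumn[Int] = lit(1)
val two: LiteralDoricColumn[Int] = 2.lit

// Literal columns compose like any other DoricColumn, e.g. in arithmetic.
val shifted: DoricColumn[Int] = colInt("x") + one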