Skip to content

Commit

Permalink
feat: Date time functions #65
Browse files Browse the repository at this point in the history
  • Loading branch information
eruizalo committed Nov 16, 2021
1 parent 7e0482a commit e28b3a7
Show file tree
Hide file tree
Showing 9 changed files with 381 additions and 60 deletions.
83 changes: 60 additions & 23 deletions core/src/main/scala/doric/syntax/DateColumns.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package syntax

import cats.implicits._
import doric.types.{DateType, SparkType}
import org.apache.spark.sql.catalyst.expressions.{DateFormatClass, MonthsBetween, NextDay, TruncDate}
import org.apache.spark.sql.catalyst.expressions.{DateFormatClass, MonthsBetween, NextDay, TruncDate, TruncTimestamp, UnixTimestamp}
import org.apache.spark.sql.{Column, functions => f}

import java.sql.Date
Expand All @@ -30,17 +30,20 @@ private[syntax] trait DateColumns {
* the number of months to add, can be negative to subtract.
* @return
* Date column after adding months
* @note Timestamp columns will be truncated to Date column
* @note
* Timestamp columns will be truncated to Date column
*/
def addMonths(nMonths: IntegerColumn): DateColumn =
(column.elem, nMonths.elem).mapN(f.add_months).toDC

/**
* Returns the date that is `days` days after date column
*
* @param days A column of the number of days to add to date column, can be negative to subtract days
* @param days
* A column of the number of days to add to date column, can be negative to subtract days
* @note
* Timestamp columns will be truncated to Date column
* @group Date & Timestamp Type
* @note Timestamp columns will be truncated to Date column
*/
def addDays(days: IntegerColumn): DateColumn =
(column.elem, days.elem).mapN(f.date_add).toDC
Expand All @@ -49,9 +52,11 @@ private[syntax] trait DateColumns {
* Converts a date to a value of string in the format specified by the date
* format given by the second argument.
*
* @param format A pattern `dd.MM.yyyy` would return a string like `18.03.1993`
* @note Use specialized functions like 'year' whenever possible as they benefit from a
* specialized implementation.
* @param format
* A pattern `dd.MM.yyyy` would return a string like `18.03.1993`
* @note
* Use specialized functions like 'year' whenever possible as they benefit from a
* specialized implementation.
* @group Date & Timestamp Type
*/
def format(format: StringColumn): StringColumn =
Expand All @@ -64,18 +69,20 @@ private[syntax] trait DateColumns {
/**
* Returns the date that is `days` days before date column
*
* @param days A column of the number of days to subtract from date column, can be negative to add
* days
* @param days
* A column of the number of days to subtract from date column, can be negative to add days
* @note
* Timestamp columns will be truncated to Date column
* @group Date & Timestamp Type
* @note Timestamp columns will be truncated to Date column
*/
def subDays(days: IntegerColumn): DateColumn =
(column.elem, days.elem).mapN(f.date_sub).toDC

/**
* Returns the number of days from date column to `dateCol`.
*
* @param dateCol A Date or Timestamp column
* @param dateCol
* A Date or Timestamp column
* @group Date & Timestamp Type
*/
def diff(dateCol: DoricColumn[T]): IntegerColumn =
Expand Down Expand Up @@ -141,7 +148,8 @@ private[syntax] trait DateColumns {
* Timestamp("2017-06-01 00:00:00").monthsBetween(Timestamp("2017-06-16 12:00:00")) // returns -0.5
* }}}
*
* @param dateCol Date or Timestamp column
* @param dateCol
* Date or Timestamp column
* @group Date & Timestamp Type
*/
def monthsBetween(dateCol: DoricColumn[T]): DoubleColumn =
Expand All @@ -150,9 +158,11 @@ private[syntax] trait DateColumns {
/**
* Returns number of months between dates `dateCol` and date column.
*
* @param dateCol Date or Timestamp column
* @param roundOff If `roundOff` is set to true, the result is rounded off to 8 digits;
* it is not rounded otherwise.
* @param dateCol
* Date or Timestamp column
* @param roundOff
* If `roundOff` is set to true, the result is rounded off to 8 digits;
* it is not rounded otherwise.
* @group Date & Timestamp Type
*/
def monthsBetween(
Expand All @@ -172,9 +182,11 @@ private[syntax] trait DateColumns {
* For example, `Date("2015-07-27").nextDay("Sunday")` returns Date("2015-08-02") because
* that is the first Sunday after 2015-07-27.
*
* @param dayOfWeek Case insensitive, and accepts: "Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"
* @param dayOfWeek
* Case insensitive, and accepts: "Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"
* @note
* Timestamp columns will be truncated to Date column
* @group Date & Timestamp Type
* @note Timestamp columns will be truncated to Date column
*/
def nextDay(dayOfWeek: StringColumn): DateColumn =
(column.elem, dayOfWeek.elem)
Expand All @@ -195,19 +207,44 @@ private[syntax] trait DateColumns {
*
* For example, `Timestamp("2018-11-19 12:01:19").trunc("year")` returns Date("2018-01-01")
*
* @param format 'year', 'yyyy', 'yy' to truncate by year,
* or 'month', 'mon', 'mm' to truncate by month
* Other options are: 'week', 'quarter'
* @param format
* if date:
* * 'year', 'yyyy', 'yy' to truncate by year,
* * 'month', 'mon', 'mm' to truncate by month
* Other options are: 'week', 'quarter'
* if timestamp:
* * 'year', 'yyyy', 'yy' to truncate by year,
* * 'month', 'mon', 'mm' to truncate by month,
* * 'day', 'dd' to truncate by day,
* Other options are:
* * 'microsecond', 'millisecond', 'second', 'minute', 'hour', 'week', 'quarter'
* @note
* Timestamp columns will be truncated to Date column
* @group Date & Timestamp Type
* @note Timestamp columns will be truncated to Date column
*/
def trunc(format: StringColumn): DateColumn =
def truncate(format: StringColumn): DoricColumn[T] =
(column.elem, format.elem)
.mapN((c, fmt) => {
new Column(TruncDate(c.expr, fmt.expr))
new Column(SparkType[T].dataType match {
case org.apache.spark.sql.types.DateType =>
TruncDate(c.expr, fmt.expr)
case org.apache.spark.sql.types.TimestampType =>
TruncTimestamp(fmt.expr, c.expr)
})
})
.toDC

/**
* Converts date/timestamp to Unix timestamp (in seconds),
* using the default timezone and the default locale.
*
* @return
* A long
*
* @group Date & Timestamp Type
*/
def unixTimestamp: LongColumn = column.elem.map(f.unix_timestamp).toDC

/**
* Extracts the week number as an integer from a given date.
*
Expand Down
47 changes: 46 additions & 1 deletion core/src/main/scala/doric/syntax/NumericColumns.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ import cats.implicits.catsSyntaxTuple2Semigroupal
import doric.DoricColumn.sparkFunction
import doric.types.NumericType
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.FormatNumber
import org.apache.spark.sql.{functions => f}
import org.apache.spark.sql.catalyst.expressions.{FormatNumber, FromUnixTime}

private[syntax] trait NumericColumns {

def unixTimestamp(): LongColumn = {
DoricColumn(f.unix_timestamp())
}

implicit class NumericOperationsSyntax[T: NumericType](
column: DoricColumn[T]
) {
Expand Down Expand Up @@ -83,6 +88,46 @@ private[syntax] trait NumericColumns {
})
.toDC

/**
* Creates timestamp from the number of seconds since UTC epoch.
*
* @group Numeric Type
*/
def timestampSeconds: TimestampColumn =
column.elem.map(f.timestamp_seconds).toDC

}

implicit class LongOperationsSyntax(
column: LongColumn
) {

/**
* Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string
* representing the timestamp of that moment in the current system time zone in the
* yyyy-MM-dd HH:mm:ss format.
*
* @group Numeric Type
*/
def fromUnixTime: StringColumn = column.elem.map(f.from_unixtime).toDC

/**
* Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string
* representing the timestamp of that moment in the current system time zone in the given
* format.
*
* @note
*   An IllegalArgumentException will be thrown if the pattern is invalid
*
* @group Numeric Type
*/
def fromUnixTime(format: StringColumn): StringColumn =
(column.elem, format.elem)
.mapN((c, f) => {
new Column(FromUnixTime(c.expr, f.expr))
})
.toDC

}

}
26 changes: 26 additions & 0 deletions core/src/main/scala/doric/syntax/StringColumns.scala
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,32 @@ private[syntax] trait StringColumns {
*/
def unbase64: BinaryColumn = s.elem.map(f.unbase64).toDC

/**
* Converts date/timestamp to Unix timestamp (in seconds),
* using the default timezone and the default locale.
*
* @return
* A long
*
* @group String Type
*/
def unixTimestamp: LongColumn = s.elem.map(f.unix_timestamp).toDC

/**
* Converts date/timestamp with given pattern to Unix timestamp (in seconds).
*
* @return
* A long, or null if the input was a string not of the correct format
*
* @group String Type
*/
def unixTimestamp(pattern: StringColumn): LongColumn =
(s.elem, pattern.elem)
.mapN((c, p) => {
new Column(UnixTimestamp(c.expr, p.expr))
})
.toDC

/**
* ********************************************************
* DORIC FUNCTIONS
Expand Down
69 changes: 53 additions & 16 deletions core/src/main/scala/doric/syntax/TimestampColumns.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package doric
package syntax

import cats.implicits.catsSyntaxTuple2Semigroupal
import doric.types.TimestampType
import org.apache.spark.sql.{functions => f}
import org.apache.spark.sql.catalyst.expressions.{FromUTCTimestamp, ToUTCTimestamp}
import org.apache.spark.sql.{Column, functions => f}

import java.sql.Timestamp

Expand All @@ -21,6 +23,34 @@ private[syntax] trait TimestampColumns {
column: DoricColumn[T]
) {

/**
* Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in UTC, and renders
* that time as a timestamp in the given time zone. For example, 'GMT+1' would yield
* '2017-07-14 03:40:00.0'.
*
* @group Timestamp Type
*/
def fromUtc(timeZone: StringColumn): TimestampColumn =
(column.elem, timeZone.elem)
.mapN((c, tz) => {
new Column(FromUTCTimestamp(c.expr, tz.expr))
})
.toDC

/**
* Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in the given time
* zone, and renders that time as a timestamp in UTC. For example, 'GMT+1' would yield
* '2017-07-14 01:40:00.0'.
*
* @group Timestamp Type
*/
def toUtc(timeZone: StringColumn): TimestampColumn =
(column.elem, timeZone.elem)
.mapN((c, tz) => {
new Column(ToUTCTimestamp(c.expr, tz.expr))
})
.toDC

/**
* Extracts the seconds as an integer from a given timestamp.
*
Expand All @@ -44,21 +74,24 @@ private[syntax] trait TimestampColumns {
* Generates tumbling time windows given a timestamp specifying column. Window
* starts are inclusive but the window ends are exclusive.
*
* @param windowDuration A string specifying the width of the window, e.g. `10 minutes`,
* `1 second`. Check `org.apache.spark.unsafe.types.CalendarInterval` for
* valid duration identifiers. Note that the duration is a fixed length of
* time, and does not vary over time according to a calendar. For example,
* `1 day` always means 86,400,000 milliseconds, not a calendar day.
* @param slideDuration A string specifying the sliding interval of the window, e.g. `1 minute`.
* A new window will be generated every `slideDuration`. Must be less than
* or equal to the `windowDuration`. Check
* `org.apache.spark.unsafe.types.CalendarInterval` for valid duration
* identifiers. This duration is likewise absolute, and does not vary
* according to a calendar.
* @param startTime The offset with respect to 1970-01-01 00:00:00 UTC with which to start
* window intervals. For example, in order to have hourly tumbling windows that
* start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide
* `startTime` as `15 minutes`.
* @param windowDuration
* A string specifying the width of the window, e.g. `10 minutes`,
* `1 second`. Check `org.apache.spark.unsafe.types.CalendarInterval` for
* valid duration identifiers. Note that the duration is a fixed length of
* time, and does not vary over time according to a calendar. For example,
* `1 day` always means 86,400,000 milliseconds, not a calendar day.
* @param slideDuration
* A string specifying the sliding interval of the window, e.g. `1 minute`.
* A new window will be generated every `slideDuration`. Must be less than
* or equal to the `windowDuration`. Check
* `org.apache.spark.unsafe.types.CalendarInterval` for valid duration
* identifiers. This duration is likewise absolute, and does not vary
* according to a calendar.
* @param startTime
* The offset with respect to 1970-01-01 00:00:00 UTC with which to start
* window intervals. For example, in order to have hourly tumbling windows that
* start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide
* `startTime` as `15 minutes`.
* @group Timestamp Type
*/
def window(
Expand All @@ -71,13 +104,17 @@ private[syntax] trait TimestampColumns {
.toDC

/**
* Safe casting to Date column
*
* @group Timestamp Type
* @return
* a Date Column without the hour
*/
def toDate: DateColumn = column.elem.map(f.to_date).toDC

/**
* Safe casting to LocalDate column
*
* @group Timestamp Type
* @return
* a LocalDate Column without the hour
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/doric/syntax/WhenBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ final private[doric] case class WhenBuilder[T](
/**
* adds a case that, if the condition is matched, returns the value
* @param cond
* BooleanColumn with the condition to satify
* BooleanColumn with the condition to satisfy
* @param elem
* the returned element if the condition is true
* @return
Expand Down
Loading

0 comments on commit e28b3a7

Please sign in to comment.