[Kernel][Writes] Write timestamp as INT64 type to Parquet data files (#3084)

## Description
Write the `timestamp` type as the `INT64` physical format in Parquet. Currently it
is written as `INT96`, a very old way of encoding timestamps that was deprecated
long ago. Also collect statistics for `timestamp` type columns.

## How was this patch tested?
Updated the existing tests.
vkorukanti committed May 10, 2024
1 parent a5d7c69 commit f6ebe24
Showing 4 changed files with 26 additions and 45 deletions.
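
For context on the two physical formats: legacy `INT96` packs a timestamp into 12 bytes (an 8-byte nanos-of-day followed by a 4-byte Julian day, little-endian), while `INT64` annotated as `TIMESTAMP(MICROS)` stores microseconds since the Unix epoch directly. Below is a minimal sketch of the two encodings; the class and helper names are illustrative, not from this patch:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Sketch only: the two on-disk encodings of a micros-since-epoch timestamp.
public class TimestampEncodings {
    static final long MICROS_PER_DAY = 86_400L * 1_000_000L;
    // Julian day number of the Unix epoch (1970-01-01).
    static final int JULIAN_DAY_OF_EPOCH = 2_440_588;

    // Legacy INT96: 8-byte nanos-of-day followed by 4-byte Julian day, little-endian.
    static byte[] toInt96(long microsSinceEpochUTC) {
        long days = Math.floorDiv(microsSinceEpochUTC, MICROS_PER_DAY);
        long nanosOfDay = Math.floorMod(microsSinceEpochUTC, MICROS_PER_DAY) * 1_000L;
        return ByteBuffer.allocate(12)
            .order(ByteOrder.LITTLE_ENDIAN)
            .putLong(nanosOfDay)                        // timeOfDayNanos
            .putInt((int) (JULIAN_DAY_OF_EPOCH + days)) // julianDay
            .array();
    }

    // New INT64 TIMESTAMP(MICROS, isAdjustedToUTC=true): the long is stored as-is.
    static long toInt64(long microsSinceEpochUTC) {
        return microsSinceEpochUTC;
    }
}
```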
In the Parquet column writers (package `io.delta.kernel.defaults.internal.parquet`), both `TimestampType` and `TimestampNTZType` now route to a single writer that emits INT64, and the legacy INT96 path (Julian-day conversion plus byte packing) is removed along with its now-unused imports:

```diff
@@ -16,8 +16,6 @@
 package io.delta.kernel.defaults.internal.parquet;
 
 import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import static java.util.Objects.requireNonNull;
@@ -28,10 +26,8 @@
 import io.delta.kernel.data.*;
 import io.delta.kernel.types.*;
 
-import io.delta.kernel.internal.util.Tuple2;
 import static io.delta.kernel.internal.util.Preconditions.checkArgument;
 
-import io.delta.kernel.defaults.internal.DefaultKernelUtils;
 import static io.delta.kernel.defaults.internal.parquet.ParquetSchemaUtils.MAX_BYTES_PER_PRECISION;
 
@@ -134,10 +130,9 @@ private static ColumnWriter createColumnWriter(
             return new DecimalFixedBinaryWriter(colName, fieldIndex, columnVector);
         } else if (dataType instanceof DateType) {
             return new DateWriter(colName, fieldIndex, columnVector);
-        } else if (dataType instanceof TimestampType) {
+        } else if (dataType instanceof TimestampType || dataType instanceof TimestampNTZType) {
+            // for both get the input as long type from column vector and write to file as INT64
             return new TimestampWriter(colName, fieldIndex, columnVector);
-        } else if (dataType instanceof TimestampNTZType) {
-            return new TimestampNTZWriter(colName, fieldIndex, columnVector);
         } else if (dataType instanceof ArrayType) {
             return new ArrayWriter(colName, fieldIndex, columnVector);
         } else if (dataType instanceof MapType) {
@@ -338,38 +333,15 @@ void writeNonNullRowValue(RecordConsumer recordConsumer, int rowId) {
         }
     }
 
-    static class TimestampWriter extends ColumnWriter {
-        // Reuse this buffer to avoid allocating a new buffer for each row
-        private final byte[] reusedBuffer = new byte[12];
-
+    /**
+     * Writer for both timestamp and timestamp with time zone.
+     */
+    static class TimestampWriter extends ColumnWriter {
         TimestampWriter(String name, int fieldId, ColumnVector columnVector) {
             super(name, fieldId, columnVector);
         }
 
-        @Override
-        void writeNonNullRowValue(RecordConsumer recordConsumer, int rowId) {
-            // TODO: Spark has various handling mode for DateType, need to check if it is needed
-            // for Delta Kernel.
-
-            // For now write as INT96 which is the most supported format for timestamps
-            // Later on, depending upon the config, we can write either as INT64 or INT96
-            long microsSinceEpochUTC = columnVector.getLong(rowId);
-            Tuple2<Integer, Long> julianDayRemainingNanos =
-                DefaultKernelUtils.toJulianDay(microsSinceEpochUTC);
-
-            ByteBuffer buffer = ByteBuffer.wrap(reusedBuffer);
-            buffer.order(ByteOrder.LITTLE_ENDIAN)
-                .putLong(julianDayRemainingNanos._2) // timeOfDayNanos
-                .putInt(julianDayRemainingNanos._1); // julianDay
-            recordConsumer.addBinary(Binary.fromReusedByteArray(reusedBuffer));
-        }
-    }
-
-    static class TimestampNTZWriter extends ColumnWriter {
-        TimestampNTZWriter(String name, int fieldId, ColumnVector columnVector) {
-            super(name, fieldId, columnVector);
-        }
-
         @Override
         void writeNonNullRowValue(RecordConsumer recordConsumer, int rowId) {
             long microsSinceEpochUTC = columnVector.getLong(rowId);
```
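
The diff above is truncated just after the first line of the new writer's body. A minimal sketch of the complete unified writer, assuming the remaining body simply emits the long via `RecordConsumer.addLong` (consistent with the INT64 physical type declared in the schema change below; `ColumnWriter` is the abstract base from this same file):

```java
import io.delta.kernel.data.ColumnVector;
import org.apache.parquet.io.api.RecordConsumer;

// Sketch only: mirrors the TimestampWriter in the diff above; the addLong call
// is an assumption based on the declared INT64 physical type, not shown in the diff.
static class TimestampWriter extends ColumnWriter {
    TimestampWriter(String name, int fieldId, ColumnVector columnVector) {
        super(name, fieldId, columnVector);
    }

    @Override
    void writeNonNullRowValue(RecordConsumer recordConsumer, int rowId) {
        // Kernel column vectors surface both timestamp flavors as microseconds
        // since epoch in a long, so the value can be written unchanged.
        long microsSinceEpochUTC = columnVector.getLong(rowId);
        recordConsumer.addLong(microsSinceEpochUTC);
    }
}
```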
In the schema conversion (`toParquetType`), `TimestampType` switches from plain INT96 to INT64 annotated with `TIMESTAMP(MICROS, isAdjustedToUTC=true)`, mirroring the existing `TimestampNTZType` mapping:

```diff
@@ -229,8 +229,14 @@ private static Type toParquetType(
         } else if (dataType instanceof DateType) {
             type = primitive(INT32, repetition).as(LogicalTypeAnnotation.dateType()).named(name);
         } else if (dataType instanceof TimestampType) {
-            // We are supporting only the INT96 format now.
-            type = primitive(INT96, repetition).named(name);
+            // Kernel is by default going to write as INT64 with isAdjustedToUTC set to true.
+            // Delta-Spark writes as INT96 for legacy reasons (maintaining compatibility with
+            // unknown consumers with very, very old versions of Parquet reader). Kernel is a new
+            // project, and we are ok if it breaks readers (we use this opportunity to find such
+            // readers and ask them to upgrade).
+            type = primitive(INT64, repetition)
+                .as(timestampType(true /* isAdjustedToUTC */, MICROS))
+                .named(name);
         } else if (dataType instanceof TimestampNTZType) {
             // Write as INT64 with isAdjustedToUTC set to false
             type = primitive(INT64, repetition)
```
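
Standalone, the resulting annotations look like this with the parquet-mr `Types` builder (a sketch; the class name and the field names `TimestampType`/`TimestampNTZType` are illustrative, borrowed from the test table's column names):

```java
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

public class TimestampSchemaSketch {
    // Both timestamp flavors map to physical INT64; only isAdjustedToUTC differs.
    public static MessageType schema() {
        return Types.buildMessage()
            .addField(Types.optional(INT64)
                .as(timestampType(true /* isAdjustedToUTC */, MICROS))
                .named("TimestampType"))
            .addField(Types.optional(INT64)
                .as(timestampType(false /* isAdjustedToUTC */, MICROS))
                .named("TimestampNTZType"))
            .named("schemaSketch");
    }
}
```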
In the statistics reader, timestamp columns now decode their min/max from `LongStatistics`, and `TimestampType` joins the set of stats-supported types (the old TODO about the INT96-vs-INT64 storage format is resolved and removed):

```diff
@@ -181,6 +181,12 @@ private static Literal decodeMinMaxStat(
                 statistics instanceof IntStatistics,
                 "Column with DATE type contained invalid statistics: %s", statistics);
             return Literal.ofDate((Integer) statValue); // stats are stored as epoch days in Parquet
+        } else if (dataType instanceof TimestampType) {
+            // Kernel Parquet writer always writes timestamps in INT64 format
+            checkArgument(
+                statistics instanceof LongStatistics,
+                "Column with TIMESTAMP type contained invalid statistics: %s", statistics);
+            return Literal.ofTimestamp((Long) statValue);
         } else if (dataType instanceof TimestampNTZType) {
             checkArgument(
                 statistics instanceof LongStatistics,
@@ -235,11 +241,10 @@ private static boolean isStatsSupportedDataType(DataType dataType) {
             dataType instanceof DoubleType ||
             dataType instanceof DecimalType ||
             dataType instanceof DateType ||
+            dataType instanceof TimestampType ||
             dataType instanceof TimestampNTZType ||
             dataType instanceof StringType ||
             dataType instanceof BinaryType;
-        // TODO: timestamp is complicated to handle because of the storage format (INT96 or INT64).
-        // Add support later.
     }
 
     private static byte[] getBinaryStat(Statistics<?> statistics, boolean decodeMin) {
```
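
A hedged sketch of the decode path this hunk enables, assuming parquet-mr footer statistics arrive as `LongStatistics` and using Kernel's `Literal.ofTimestamp` factory shown in the diff (the class and method names here are illustrative):

```java
import org.apache.parquet.column.statistics.LongStatistics;
import org.apache.parquet.column.statistics.Statistics;

import io.delta.kernel.expressions.Literal;

public class TimestampStatsSketch {
    // With INT64 timestamps, Parquet footers expose min/max as plain longs
    // (microseconds since epoch), which decode directly into Kernel literals.
    static Literal timestampMin(Statistics<?> statistics) {
        if (!(statistics instanceof LongStatistics)) {
            throw new IllegalArgumentException(
                "Column with TIMESTAMP type contained invalid statistics: " + statistics);
        }
        return Literal.ofTimestamp(((LongStatistics) statistics).getMin());
    }
}
```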
In `ParquetFileWriterSuite`, the expected Parquet file counts at the 1024-byte target size drop by one (presumably because INT64 timestamps are smaller on disk than INT96, 8 vs 12 bytes, so the same rows fill fewer files), the expected number of stats-bearing columns rises from 14 to 15, and `TimestampType` leaves the "no stats" lists:

```diff
@@ -58,7 +58,7 @@ class ParquetFileWriterSuite extends AnyFunSuite
 
   Seq(
     // Test cases reading and writing all types of data with or without stats collection
-    Seq((200, 67), (1024, 17), (1048576, 1)).map {
+    Seq((200, 67), (1024, 16), (1048576, 1)).map {
       case (targetFileSize, expParquetFileCount) =>
         (
           "write all types (no stats)", // test name
@@ -103,7 +103,7 @@ class ParquetFileWriterSuite extends AnyFunSuite
       )
     },
     // Test cases reading and writing only a subset of data passing a predicate.
-    Seq((200, 26), (1024, 7), (1048576, 1)).map {
+    Seq((200, 26), (1024, 6), (1048576, 1)).map {
       case (targetFileSize, expParquetFileCount) =>
         (
           "write filtered all types (no stats)", // test name
@@ -118,7 +118,7 @@ class ParquetFileWriterSuite extends AnyFunSuite
       )
     },
     // Test cases reading and writing all types of data WITH stats collection
-    Seq((200, 67), (1024, 17), (1048576, 1)).map {
+    Seq((200, 67), (1024, 16), (1048576, 1)).map {
       case (targetFileSize, expParquetFileCount) =>
         (
           "write all types (with stats for all leaf-level columns)", // test name
@@ -128,11 +128,11 @@ class ParquetFileWriterSuite extends AnyFunSuite
           200, /* expected number of rows written to Parquet files */
           Option.empty[Predicate], // predicate for filtering what rows to write to parquet files
           leafLevelPrimitiveColumns(Seq.empty, tableSchema(goldenTablePath("parquet-all-types"))),
-          14 // how many columns have the stats collected from given list above
+          15 // how many columns have the stats collected from given list above
         )
     },
     // Test cases reading and writing all types of data with a partial column set stats collection
-    Seq((200, 67), (1024, 17), (1048576, 1)).map {
+    Seq((200, 67), (1024, 16), (1048576, 1)).map {
       case (targetFileSize, expParquetFileCount) =>
         (
           "write all types (with stats for a subset of leaf-level columns)", // test name
@@ -146,7 +146,6 @@ class ParquetFileWriterSuite extends AnyFunSuite
           new Column("DateType"),
           new Column(Array("nested_struct", "aa")),
           new Column(Array("nested_struct", "ac", "aca")),
-          new Column("TimestampType"), // stats are not collected for timestamp type YET.
           new Column(Array("nested_struct", "ac")), // stats are not collected for struct types
           new Column("nested_struct"), // stats are not collected for struct types
           new Column("array_of_prims"), // stats are not collected for array types
@@ -360,7 +359,6 @@ class ParquetFileWriterSuite extends AnyFunSuite
       .flatMap { statColumn =>
         val dataType = DefaultKernelUtils.getDataType(fileDataSchema, statColumn)
         dataType match {
-          case _: TimestampType => nullStats // not yet supported
           case _: StructType => nullStats // no concept of stats for struct types
           case _: ArrayType => nullStats // no concept of stats for array types
           case _: MapType => nullStats // no concept of stats for map types
```
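
To confirm the new physical type end to end, one can read back a written file's footer with parquet-mr. A minimal sketch (the file path is illustrative); the printed schema should show the timestamp column as `int64` with a `TIMESTAMP(MICROS,true)` annotation rather than `int96`:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class FooterCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        try (ParquetFileReader reader = ParquetFileReader.open(
                HadoopInputFile.fromPath(new Path("/tmp/kernel-out/part-0.parquet"), conf))) {
            // The footer schema records the physical type and logical annotation
            // of every column as they were written.
            MessageType schema = reader.getFooter().getFileMetaData().getSchema();
            System.out.println(schema);
        }
    }
}
```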
