Skip to content

Commit

Permalink
Update protocol when adding timestamp column in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Mar 4, 2024
1 parent 59db1f7 commit 66da857
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1428,14 +1428,33 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
ADD_COLUMN_OPERATION,
session,
Optional.ofNullable(handle.getMetadataEntry().getDescription()),
protocolEntry);
buildProtocolEntryForNewColumn(protocolEntry, newColumnMetadata.getType()));
transactionLogWriter.flush();
}
catch (Exception e) {
throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to add '%s' column for: %s.%s", newColumnMetadata.getName(), handle.getSchemaName(), handle.getTableName()), e);
}
}

private ProtocolEntry buildProtocolEntryForNewColumn(ProtocolEntry protocolEntry, Type type)
{
if (!containsTimestampType(type)) {
return protocolEntry;
}

return new ProtocolEntry(
max(protocolEntry.getMinReaderVersion(), TIMESTAMP_NTZ_SUPPORTED_READER_VERSION),
max(protocolEntry.getMinWriterVersion(), TIMESTAMP_NTZ_SUPPORTED_WRITER_VERSION),
Optional.of(ImmutableSet.<String>builder()
.addAll(protocolEntry.getReaderFeatures().orElse(ImmutableSet.of()))
.add(TIMESTAMP_NTZ_FEATURE_NAME)
.build()),
Optional.of(ImmutableSet.<String>builder()
.addAll(protocolEntry.getWriterFeatures().orElse(ImmutableSet.of()))
.add(TIMESTAMP_NTZ_FEATURE_NAME)
.build()));
}

@Override
public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,37 @@ private void testTimestampNtzPartitioned(ZoneId sessionZone)
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testAddTimestampNtzColumn()
throws Exception
{
String tableName = "test_add_timestamp_ntz_column" + randomNameSuffix();

assertUpdate("CREATE TABLE " + tableName + "(id INT)");
assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN ts timestamp(6)");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, TIMESTAMP '2023-01-02 03:04:05.123456')", 1);
assertQuery("SELECT * FROM " + tableName, "VALUES (1, TIMESTAMP '2023-01-02 03:04:05.123456')");

String tableLocation = getTableLocation(tableName);
List<DeltaLakeTransactionLogEntry> transactionLogsByCreateTable = getEntriesFromJson(0, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow();
ProtocolEntry protocolEntryByCreateTable = transactionLogsByCreateTable.get(1).getProtocol();
assertThat(protocolEntryByCreateTable).isNotNull();
assertThat(protocolEntryByCreateTable.getMinReaderVersion()).isEqualTo(1);
assertThat(protocolEntryByCreateTable.getMinWriterVersion()).isEqualTo(2);
assertThat(protocolEntryByCreateTable.getReaderFeatures()).isEmpty();
assertThat(protocolEntryByCreateTable.getWriterFeatures()).isEmpty();

List<DeltaLakeTransactionLogEntry> transactionLogsByAddColumn = getEntriesFromJson(1, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow();
ProtocolEntry protocolEntryByAddColumn = transactionLogsByAddColumn.get(1).getProtocol();
assertThat(protocolEntryByAddColumn).isNotNull();
assertThat(protocolEntryByAddColumn.getMinReaderVersion()).isEqualTo(3);
assertThat(protocolEntryByAddColumn.getMinWriterVersion()).isEqualTo(7);
assertThat(protocolEntryByAddColumn.getReaderFeatures()).isEqualTo(Optional.of(ImmutableSet.of("timestampNtz")));
assertThat(protocolEntryByAddColumn.getWriterFeatures()).isEqualTo(Optional.of(ImmutableSet.of("timestampNtz")));

assertUpdate("DROP TABLE " + tableName);
}

/**
* @see databricks122.identity_columns
*/
Expand Down

0 comments on commit 66da857

Please sign in to comment.