Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Sep 28, 2024
1 parent f43e92d commit 50fee62
Show file tree
Hide file tree
Showing 12 changed files with 511 additions and 16 deletions.
18 changes: 17 additions & 1 deletion cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/range.h"
#include "arrow/util/tracing_internal.h"
Expand All @@ -42,6 +43,7 @@
#include "parquet/arrow/writer.h"
#include "parquet/encryption/crypto_factory.h"
#include "parquet/encryption/encryption.h"
#include "parquet/encryption/internal_file_decryptor.h"
#include "parquet/encryption/kms_client.h"
#include "parquet/file_reader.h"
#include "parquet/properties.h"
Expand Down Expand Up @@ -1080,9 +1082,23 @@ Result<std::shared_ptr<DatasetFactory>> ParquetDatasetFactory::Make(

std::vector<std::pair<std::string, std::vector<int>>> paths_with_row_group_ids;
std::unordered_map<std::string, int> paths_to_index;
const bool metadata_only_file = metadata_source.path() == "_metadata" && metadata->key_value_metadata();

for (int i = 0; i < metadata->num_row_groups(); i++) {
auto row_group = metadata->RowGroup(i);
std::unique_ptr<parquet::RowGroupMetaData> row_group;

if (metadata_only_file) {
auto kvmd = metadata->key_value_metadata();
PARQUET_ASSIGN_OR_THROW(auto aad, kvmd->Get("row_group_aad_" + std::to_string(i)));
auto fd = metadata->file_decryptor();
auto file_decryptor = std::make_shared<parquet::InternalFileDecryptor>(
fd->properties(), aad, fd->algorithm(),
fd->footer_key_metadata(), fd->pool());
metadata->set_file_decryptor(file_decryptor);
row_group = metadata->RowGroup(i);
} else {
row_group = metadata->RowGroup(i);
}
ARROW_ASSIGN_OR_RAISE(auto path,
FileFromRowGroup(filesystem.get(), base_path, *row_group,
options.validate_column_chunk_paths));
Expand Down
232 changes: 231 additions & 1 deletion cpp/src/arrow/dataset/file_parquet_encryption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/type.h"
#include "arrow/util/key_value_metadata.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/encryption/crypto_factory.h"
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/kms_client.h"
#include "parquet/encryption/test_in_memory_kms.h"
#include "parquet/file_writer.h"

constexpr std::string_view kFooterKeyMasterKey = "0123456789012345";
constexpr std::string_view kFooterKeyMasterKeyId = "footer_key";
Expand All @@ -51,6 +53,62 @@ using arrow::internal::checked_pointer_cast;
namespace arrow {
namespace dataset {

class DatasetTestBase : public ::testing::Test {
public:
void SetUp() override {
// Creates a mock file system using the current time point.
EXPECT_OK_AND_ASSIGN(file_system_, fs::internal::MockFileSystem::Make(
std::chrono::system_clock::now(), {}));
ASSERT_OK(file_system_->CreateDir(std::string(kBaseDir)));

// Init dataset and partitioning.
ASSERT_NO_FATAL_FAILURE(PrepareTableAndPartitioning());

auto file_format = std::make_shared<ParquetFileFormat>();
auto parquet_file_write_options =
checked_pointer_cast<ParquetFileWriteOptions>(file_format->DefaultWriteOptions());

// Write dataset.
auto dataset = std::make_shared<InMemoryDataset>(table_);
EXPECT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());

FileSystemDatasetWriteOptions write_options;
write_options.file_write_options = parquet_file_write_options;
write_options.filesystem = file_system_;
write_options.base_dir = kBaseDir;
write_options.partitioning = partitioning_;
write_options.basename_template = "part{i}.parquet";
ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner)));
}

void PrepareTableAndPartitioning() {
// Prepare table data.
auto table_schema = schema({field("a", int64()), field("c", int64()),
field("e", int64()), field("part", utf8())});
table_ = TableFromJSON(table_schema, {R"([
[ 0, 9, 1, "a" ],
[ 1, 8, 2, "a" ],
[ 2, 7, 1, "a" ],
[ 3, 6, 2, "a" ],
[ 4, 5, 1, "a" ],
[ 5, 4, 2, "a" ],
[ 6, 3, 1, "b" ],
[ 7, 2, 2, "b" ],
[ 8, 1, 1, "b" ],
[ 9, 0, 2, "b" ]
])"});

// Use a Hive-style partitioning scheme.
partitioning_ = std::make_shared<HivePartitioning>(schema({field("part", utf8())}));
}

protected:
std::shared_ptr<fs::FileSystem> file_system_;
std::shared_ptr<Table> table_;
std::shared_ptr<Partitioning> partitioning_;
};

// Base class to test writing and reading encrypted dataset.
class DatasetEncryptionTestBase : public ::testing::Test {
public:
Expand Down Expand Up @@ -240,6 +298,178 @@ TEST_F(DatasetEncryptionTest, ReadSingleFile) {
ASSERT_EQ(checked_pointer_cast<Int64Array>(table->column(2)->chunk(0))->GetView(0), 1);
}

Result<std::vector<std::shared_ptr<parquet::FileMetaData>>> ReadMetadata(
const std::vector<std::string>& paths, std::shared_ptr<fs::FileSystem>& file_system,
const parquet::ReaderProperties& reader_properties) {
if (paths.empty()) {
return Status::Invalid("No files to read metadata from");
}

std::vector<std::shared_ptr<parquet::FileMetaData>> metadata;

for (const auto& path : paths) {
PARQUET_ASSIGN_OR_THROW(auto input, file_system->OpenInputFile(path));
auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties);
auto file_metadata = parquet_reader->metadata();
// file_metadata->set_file_path(path);
metadata.push_back(file_metadata);
}
return metadata;
}

// Write encrypted _metadata file and read the dataset using the metadata.
TEST_F(DatasetEncryptionTest, ReadDatasetFromEncryptedMetadata) {
// Create decryption properties.
auto decryption_config =
std::make_shared<parquet::encryption::DecryptionConfiguration>();
auto parquet_decryption_config = std::make_shared<ParquetDecryptionConfig>();
parquet_decryption_config->crypto_factory = crypto_factory_;
parquet_decryption_config->kms_connection_config = kms_connection_config_;
parquet_decryption_config->decryption_config = std::move(decryption_config);

// Set scan options.
auto parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
parquet_scan_options->parquet_decryption_config = std::move(parquet_decryption_config);

auto file_format = std::make_shared<ParquetFileFormat>();
file_format->default_fragment_scan_options = std::move(parquet_scan_options);

auto reader_properties = parquet::default_reader_properties();
decryption_config = std::make_shared<parquet::encryption::DecryptionConfiguration>();
reader_properties.file_decryption_properties(
crypto_factory_->GetFileDecryptionProperties(*kms_connection_config_,
*decryption_config));
auto encryption_config = std::make_shared<parquet::encryption::EncryptionConfiguration>(
std::string(kFooterKeyName));
encryption_config->column_keys = kColumnKeyMapping;

auto parquet_encryption_config = std::make_shared<ParquetEncryptionConfig>();
// Directly assign shared_ptr objects to ParquetEncryptionConfig members
parquet_encryption_config->crypto_factory = crypto_factory_;
parquet_encryption_config->kms_connection_config = kms_connection_config_;
parquet_encryption_config->encryption_config = encryption_config;

auto parquet_file_write_options =
checked_pointer_cast<ParquetFileWriteOptions>(file_format->DefaultWriteOptions());
parquet_file_write_options->parquet_encryption_config =
std::move(parquet_encryption_config);

std::vector<std::string> paths = {"part=a/part0.parquet", "part=c/part0.parquet",
"part=e/part0.parquet", "part=g/part0.parquet",
"part=i/part0.parquet"};

auto file_encryption_properties = crypto_factory_->GetFileEncryptionProperties(
*kms_connection_config_, *encryption_config);

auto table_schema =
schema({field("a", int64()), field("c", int64()), field("e", int64())});
std::shared_ptr<parquet::SchemaDescriptor> schema;
auto writer_props = parquet::WriterProperties::Builder().build();

PARQUET_ASSIGN_OR_THROW(auto metadata_list, ReadMetadata(paths, file_system_, reader_properties));
PARQUET_ASSIGN_OR_THROW(auto metadata, parquet::FileMetaData::CoalesceMetadata(metadata_list, writer_props));

// TODO: Make sure plaintext footer mode works
// encryption_config->plaintext_footer = true;
file_encryption_properties = crypto_factory_->GetFileEncryptionProperties(
*kms_connection_config_, *encryption_config);

// Write metadata to _metadata file.
std::string metadata_path = "_metadata";
ASSERT_OK_AND_ASSIGN(auto stream, file_system_->OpenOutputStream(metadata_path));
WriteEncryptedMetadataFile(*metadata, stream, file_encryption_properties);
ARROW_EXPECT_OK(stream->Close());

// Set scan options.
decryption_config = std::make_shared<parquet::encryption::DecryptionConfiguration>();
parquet_decryption_config = std::make_shared<ParquetDecryptionConfig>();
parquet_decryption_config->crypto_factory = crypto_factory_;
parquet_decryption_config->kms_connection_config = kms_connection_config_;
parquet_decryption_config->decryption_config = std::move(decryption_config);

parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
parquet_scan_options->parquet_decryption_config = std::move(parquet_decryption_config);

file_format = std::make_shared<ParquetFileFormat>();
file_format->default_fragment_scan_options = std::move(parquet_scan_options);

ParquetFactoryOptions parquet_factory_options;
parquet_factory_options.partitioning = partitioning_;
parquet_factory_options.partition_base_dir = kBaseDir;

ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile(metadata_path));
auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties);
auto file_metadata = parquet_reader->metadata();

ASSERT_TRUE(file_metadata->Equals(*metadata));

// Create parquet dataset factory
ASSERT_OK_AND_ASSIGN(auto parquet_dataset_factory,
ParquetDatasetFactory::Make(metadata_path, file_system_,
file_format, parquet_factory_options));

// Create the dataset
ASSERT_OK_AND_ASSIGN(auto dataset, parquet_dataset_factory->Finish());

// Read dataset into table
ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable());

// Verify the data was read correctly
ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks());

// Validate the table
ASSERT_OK(combined_table->ValidateFull());
AssertTablesEqual(*table_, *combined_table);
}

// Write _metadata file and read the dataset using the metadata.
TEST_F(DatasetTestBase, ReadDatasetFromMetadata) {
auto reader_properties = parquet::default_reader_properties();

std::vector<std::string> paths = {"part=a/part0.parquet", "part=b/part0.parquet"};
std::vector<std::shared_ptr<parquet::FileMetaData>> metadata;

for (const auto& path : paths) {
ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile(path));
auto parquet_reader = parquet::ParquetFileReader::Open(input, reader_properties);
auto file_metadata = parquet_reader->metadata();
// Make sure file_paths are stored in metadata
file_metadata->set_file_path(path);
metadata.push_back(file_metadata);
}
metadata[0]->AppendRowGroups(*metadata[1]);

std::string metadata_path = "_metadata";
ASSERT_OK_AND_ASSIGN(auto stream, file_system_->OpenOutputStream(metadata_path));
WriteMetaDataFile(*metadata[0], stream.get());
ARROW_EXPECT_OK(stream->Close());

auto file_format = std::make_shared<ParquetFileFormat>();
ParquetFactoryOptions factory_options;
factory_options.partitioning = partitioning_;
factory_options.partition_base_dir = kBaseDir;
ASSERT_OK_AND_ASSIGN(auto dataset_factory,
ParquetDatasetFactory::Make(metadata_path, file_system_,
file_format, factory_options));

// Create the dataset
ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish());

// Read dataset into table
ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable());

// Verify the data was read correctly
ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks());

// Validate the table
ASSERT_OK(combined_table->ValidateFull());
AssertTablesEqual(*combined_table, *table_);
}

// GH-39444: This test covers the case where parquet dataset scanner crashes when
// processing encrypted datasets over 2^15 rows in multi-threaded mode.
class LargeRowEncryptionTest : public DatasetEncryptionTestBase {
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,14 @@ Status WriteMetaDataFile(const FileMetaData& file_metadata,
return Status::OK();
}

Status WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
PARQUET_CATCH_NOT_OK(::parquet::WriteEncryptedMetadataFile(file_metadata, sink,
file_encryption_properties));
return Status::OK();
}

Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
std::shared_ptr<::arrow::io::OutputStream> sink, int64_t chunk_size,
std::shared_ptr<WriterProperties> properties,
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/parquet/arrow/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ PARQUET_EXPORT
::arrow::Status WriteMetaDataFile(const FileMetaData& file_metadata,
::arrow::io::OutputStream* sink);

/// \brief Write encrypted metadata-only Parquet file to indicated Arrow OutputStream
PARQUET_EXPORT
::arrow::Status WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties);

/// \brief Write a Table to Parquet.
///
/// This writes one table in a single shot. To write a Parquet file with
Expand Down
40 changes: 40 additions & 0 deletions cpp/src/parquet/file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,46 @@ void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,
}
}

void WriteEncryptedMetadataFile(
const FileMetaData& metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties) {
auto file_encryptor = std::make_unique<InternalFileEncryptor>(
file_encryption_properties.get(), ::arrow::default_memory_pool());

if (file_encryption_properties->encrypted_footer()) {
PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4));

PARQUET_ASSIGN_OR_THROW(int64_t position, sink->Tell());
auto metadata_start = static_cast<uint64_t>(position);

auto writer_props = parquet::WriterProperties::Builder()
.encryption(file_encryption_properties)
->build();
auto builder = FileMetaDataBuilder::Make(metadata.schema(), writer_props);

auto footer_metadata = builder->Finish(metadata.key_value_metadata());
auto crypto_metadata = builder->GetCryptoMetaData();

WriteFileCryptoMetaData(*crypto_metadata, sink.get());

auto footer_encryptor = file_encryptor->GetFooterEncryptor();
WriteEncryptedFileMetadata(metadata, sink.get(), footer_encryptor, true);

PARQUET_ASSIGN_OR_THROW(position, sink->Tell());
auto footer_and_crypto_len = static_cast<uint32_t>(position - metadata_start);
PARQUET_THROW_NOT_OK(
sink->Write(reinterpret_cast<uint8_t*>(&footer_and_crypto_len), 4));
PARQUET_THROW_NOT_OK(sink->Write(kParquetEMagic, 4));
} else {
// Encrypted file with plaintext footer mode.
PARQUET_THROW_NOT_OK(sink->Write(kParquetMagic, 4));
auto footer_signing_encryptor = file_encryptor->GetFooterSigningEncryptor();
WriteEncryptedFileMetadata(metadata, sink.get(), footer_signing_encryptor, false);
}

file_encryptor->WipeOutEncryptionKeys();
}

void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata,
ArrowOutputStream* sink) {
crypto_metadata.WriteTo(sink);
Expand Down
8 changes: 7 additions & 1 deletion cpp/src/parquet/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ PARQUET_EXPORT
void WriteMetaDataFile(const FileMetaData& file_metadata,
::arrow::io::OutputStream* sink);

PARQUET_EXPORT
void WriteEncryptedMetadataFile(
const FileMetaData& file_metadata, std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<FileEncryptionProperties> file_encryption_properties);

PARQUET_EXPORT
void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,
ArrowOutputStream* sink,
Expand All @@ -125,9 +130,10 @@ void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,

PARQUET_EXPORT
void WriteEncryptedFileMetadata(const FileMetaData& file_metadata,
::arrow::io::OutputStream* sink,
std::shared_ptr<::arrow::io::OutputStream> sink,
const std::shared_ptr<Encryptor>& encryptor = NULLPTR,
bool encrypt_footer = false);

PARQUET_EXPORT
void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata,
::arrow::io::OutputStream* sink);
Expand Down
Loading

0 comments on commit 50fee62

Please sign in to comment.