Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sort dictionary data alphabetically in the ORC writer #14295

Merged
merged 17 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions cpp/include/cudf/io/orc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,8 @@ class orc_writer_options {
std::map<std::string, std::string> _user_data;
// Optional compression statistics
std::shared_ptr<writer_compression_statistics> _compression_stats;
// Specify whether string dictionaries should be alphabetically sorted
bool _enable_dictionary_sort = true;

friend orc_writer_options_builder;

Expand Down Expand Up @@ -572,6 +574,13 @@ class orc_writer_options {
return _compression_stats;
}

/**
* @brief Returns whether string dictionaries should be sorted.
*
* @return `true` if string dictionaries should be sorted
*/
[[nodiscard]] bool get_enable_dictionary_sort() const { return _enable_dictionary_sort; }

// Setters

/**
Expand Down Expand Up @@ -670,6 +679,13 @@ class orc_writer_options {
{
_compression_stats = std::move(comp_stats);
}

/**
* @brief Sets whether string dictionaries should be sorted.
*
* @param val Boolean value to enable/disable
*/
void set_enable_dictionary_sort(bool val) { _enable_dictionary_sort = val; }
};

/**
Expand Down Expand Up @@ -810,6 +826,18 @@ class orc_writer_options_builder {
return *this;
}

/**
* @brief Sets whether string dictionaries should be sorted.
*
* @param val Boolean value to enable/disable
* @return this for chaining
*/
orc_writer_options_builder& enable_dictionary_sort(bool val)
{
options._enable_dictionary_sort = val;
return *this;
}

/**
* @brief move orc_writer_options member once it's built.
*/
Expand Down Expand Up @@ -866,6 +894,8 @@ class chunked_orc_writer_options {
std::map<std::string, std::string> _user_data;
// Optional compression statistics
std::shared_ptr<writer_compression_statistics> _compression_stats;
// Specify whether string dictionaries should be alphabetically sorted
bool _enable_dictionary_sort = true;

friend chunked_orc_writer_options_builder;

Expand Down Expand Up @@ -966,6 +996,13 @@ class chunked_orc_writer_options {
return _compression_stats;
}

/**
* @brief Returns whether string dictionaries should be sorted.
*
* @return `true` if string dictionaries should be sorted
*/
[[nodiscard]] bool get_enable_dictionary_sort() const { return _enable_dictionary_sort; }

// Setters

/**
Expand Down Expand Up @@ -1057,6 +1094,13 @@ class chunked_orc_writer_options {
{
_compression_stats = std::move(comp_stats);
}

/**
* @brief Sets whether string dictionaries should be sorted.
*
* @param val Boolean value to enable/disable
*/
void set_enable_dictionary_sort(bool val) { _enable_dictionary_sort = val; }
};

/**
Expand Down Expand Up @@ -1183,6 +1227,18 @@ class chunked_orc_writer_options_builder {
return *this;
}

/**
* @brief Sets whether string dictionaries should be sorted.
*
* @param val Boolean value to enable/disable
* @return this for chaining
*/
chunked_orc_writer_options_builder& enable_dictionary_sort(bool val)
{
options._enable_dictionary_sort = val;
return *this;
}

/**
* @brief move chunked_orc_writer_options member once it's built.
*/
Expand Down
14 changes: 8 additions & 6 deletions cpp/src/io/orc/orc_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ struct EncChunk {
uint8_t dtype_len; // data type length
int32_t scale; // scale for decimals or timestamps

uint32_t* dict_index; // dictionary index from row index
uint32_t* dict_index; // dictionary index from row index
uint32_t* dict_data_order; // map from data to sorted data indices
uint32_t* decimal_offsets;
orc_column_device_view const* column;
};
Expand Down Expand Up @@ -191,11 +192,12 @@ struct stripe_dictionary {
size_type num_rows = 0; // number of rows in the stripe

// output
device_span<uint32_t> data; // index of elements in the column to include in the dictionary
device_span<uint32_t> index; // index into the dictionary for each row in the column
size_type entry_count = 0; // number of entries in the dictionary
size_type char_count = 0; // number of characters in the dictionary
bool is_enabled = false; // true if dictionary encoding is enabled for this stripe
device_span<uint32_t> data; // index of elements in the column to include in the dictionary
device_span<uint32_t> index; // index into the dictionary for each row in the column
device_span<uint32_t> data_order; // map from data to sorted data indices
size_type entry_count = 0; // number of entries in the dictionary
size_type char_count = 0; // number of characters in the dictionary
bool is_enabled = false; // true if dictionary encoding is enabled for this stripe
};

/**
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,10 @@ __global__ void __launch_bounds__(block_size)
if (dict_idx > 0x7fff'ffffu) {
dict_idx = s->chunk.dict_index[dict_idx & 0x7fff'ffffu];
}
// translate dictionary index to sorted order, if enabled
if (s->chunk.dict_data_order != nullptr) {
dict_idx = s->chunk.dict_data_order[dict_idx];
}
s->vals.u32[nz_idx] = dict_idx;
} else {
string_view value = column.element<string_view>(row);
Expand Down
113 changes: 92 additions & 21 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/pinned_host_vector.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/strings/strings_column_view.hpp>
#include <cudf/utilities/bit.hpp>
Expand All @@ -50,6 +51,8 @@
#include <thrust/pair.h>
#include <thrust/reduce.h>
#include <thrust/scan.h>
#include <thrust/sequence.h>
#include <thrust/sort.h>
#include <thrust/tabulate.h>
#include <thrust/transform.h>

Expand Down Expand Up @@ -867,16 +870,15 @@ encoded_data encode_columns(orc_table_view const& orc_table,
ck.null_mask_num_rows = aligned_rowgroups[rg_idx][column.index()].size();
ck.encoding_kind = column.orc_encoding();
ck.type_kind = column.orc_kind();
if (ck.type_kind == TypeKind::STRING) {
ck.dict_index = (ck.encoding_kind == DICTIONARY_V2)
? column.host_stripe_dict(stripe.id).index.data()
: nullptr;
ck.dtype_len = 1;
} else {
ck.dtype_len = column.type_width();
}
ck.scale = column.scale();
if (ck.type_kind == TypeKind::DECIMAL) { ck.decimal_offsets = column.decimal_offsets(); }
auto const is_str_dict =
ck.type_kind == TypeKind::STRING and ck.encoding_kind == DICTIONARY_V2;
ck.dict_index = is_str_dict ? column.host_stripe_dict(stripe.id).index.data() : nullptr;
ck.dict_data_order =
is_str_dict ? column.host_stripe_dict(stripe.id).data_order.data() : nullptr;
ck.dtype_len = (ck.type_kind == TypeKind::STRING) ? 1 : column.type_width();
ck.scale = column.scale();
ck.decimal_offsets =
(ck.type_kind == TypeKind::DECIMAL) ? column.decimal_offsets() : nullptr;
Comment on lines +873 to +881
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of these were left uninitialized when unused, changed to always initialize.

}
}
}
Expand Down Expand Up @@ -2012,24 +2014,41 @@ struct stripe_dictionaries {
hostdevice_2dvector<gpu::stripe_dictionary> views; // descriptors [string_column][stripe]
std::vector<rmm::device_uvector<uint32_t>> data_owner; // dictionary data owner, per stripe
std::vector<rmm::device_uvector<uint32_t>> index_owner; // dictionary index owner, per stripe
std::vector<rmm::device_uvector<uint32_t>> order_owner; // dictionary order owner, per stripe

// Should be called after encoding is complete to deallocate the dictionary buffers.
void on_encode_complete(rmm::cuda_stream_view stream)
{
data_owner.clear();
index_owner.clear();
order_owner.clear();

for (auto& sd : views.host_view().flat_view()) {
sd.data = {};
sd.index = {};
sd.data = {};
sd.index = {};
sd.data_order = {};
}
views.host_to_device_async(stream);
}
};

/**
* @brief Compares two rows in a strings column
*/
struct string_rows_less {
device_span<orc_column_device_view> cols;
uint32_t col_idx;
__device__ bool operator()(size_type lhs_idx, size_type rhs_idx) const
{
auto const& col = cols[col_idx];
return col.element<string_view>(lhs_idx) < col.element<string_view>(rhs_idx);
}
};

// Build stripe dictionaries for string columns
stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
file_segmentation const& segmentation,
bool sort_dictionaries,
rmm::cuda_stream_view stream)
{
std::vector<std::vector<rmm::device_uvector<gpu::slot_type>>> hash_maps_storage(
Expand Down Expand Up @@ -2080,6 +2099,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
// Data owners; can be cleared after encode
std::vector<rmm::device_uvector<uint32_t>> dict_data_owner;
std::vector<rmm::device_uvector<uint32_t>> dict_index_owner;
std::vector<rmm::device_uvector<uint32_t>> dict_order_owner;
// Make decision about which stripes to encode with dictionary encoding
for (auto col_idx : orc_table.string_column_indices) {
auto& str_column = orc_table.column(col_idx);
Expand Down Expand Up @@ -2122,15 +2142,61 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
gpu::collect_map_entries(stripe_dicts, stream);
gpu::get_dictionary_indices(stripe_dicts, orc_table.d_columns, stream);

// Clear map slots; hash map storage is deallocated at the end of this function
auto device_dicts_flat = stripe_dicts.device_view().flat_view();
thrust::for_each(rmm::exec_policy(stream),
device_dicts_flat.begin(),
device_dicts_flat.end(),
[] __device__(auto& sd) { sd.map_slots = {}; });
stripe_dicts.device_to_host_async(stream);
// deallocate hash map storage, unused after this point
hash_maps_storage.clear();

// Clear map slots and attach order buffers
auto dictionaries_flat = stripe_dicts.host_view().flat_view();
for (auto& sd : dictionaries_flat) {
if (not sd.is_enabled) { continue; }

sd.map_slots = {};
if (sort_dictionaries) {
dict_order_owner.emplace_back(sd.entry_count, stream);
sd.data_order = dict_order_owner.back();
} else {
sd.data_order = {};
}
}
stripe_dicts.host_to_device_async(stream);

// Sort stripe dictionaries alphabetically
if (sort_dictionaries) {
auto streams = cudf::detail::fork_streams(stream, std::min<size_t>(dict_order_owner.size(), 8));
auto stream_idx = 0;
for (auto& sd : dictionaries_flat) {
if (not sd.is_enabled) { continue; }

auto const& current_stream = streams[stream_idx];

// Sort the dictionary data and create a mapping from the sorted order to the original
thrust::sequence(
rmm::exec_policy_nosync(current_stream), sd.data_order.begin(), sd.data_order.end());
thrust::sort_by_key(rmm::exec_policy_nosync(current_stream),
sd.data.begin(),
sd.data.end(),
sd.data_order.begin(),
string_rows_less{orc_table.d_columns, sd.column_idx});

// Create the inverse permutation - i.e. the mapping from the original order to the sorted
auto order_copy = cudf::detail::make_device_uvector_async<uint32_t>(
sd.data_order, current_stream, rmm::mr::get_current_device_resource());
thrust::scatter(rmm::exec_policy_nosync(current_stream),
thrust::counting_iterator<uint32_t>(0),
thrust::counting_iterator<uint32_t>(sd.data_order.size()),
order_copy.begin(),
sd.data_order.begin());

stream_idx = (stream_idx + 1) % streams.size();
}

cudf::detail::join_streams(streams, stream);
}

return {std::move(stripe_dicts), std::move(dict_data_owner), std::move(dict_index_owner)};
return {std::move(stripe_dicts),
std::move(dict_data_owner),
std::move(dict_index_owner),
std::move(dict_order_owner)};
}

/**
Expand All @@ -2142,6 +2208,7 @@ stripe_dictionaries build_dictionaries(orc_table_view& orc_table,
* @param max_stripe_size Maximum size of stripes in the output file
* @param row_index_stride The row index stride
* @param enable_dictionary Whether dictionary is enabled
* @param sort_dictionaries Whether to sort the dictionaries
* @param compression_kind The compression kind
* @param compression_blocksize The block size used for compression
* @param stats_freq Column statistics granularity type for parquet/orc writers
Expand All @@ -2156,6 +2223,7 @@ auto convert_table_to_orc_data(table_view const& input,
stripe_size_limits max_stripe_size,
size_type row_index_stride,
bool enable_dictionary,
bool sort_dictionaries,
CompressionKind compression_kind,
size_t compression_blocksize,
statistics_freq stats_freq,
Expand All @@ -2180,7 +2248,7 @@ auto convert_table_to_orc_data(table_view const& input,
auto segmentation =
calculate_segmentation(orc_table.columns, std::move(rowgroup_bounds), max_stripe_size);

auto stripe_dicts = build_dictionaries(orc_table, segmentation, stream);
auto stripe_dicts = build_dictionaries(orc_table, segmentation, sort_dictionaries, stream);
auto dec_chunk_sizes = decimal_chunk_sizes(orc_table, segmentation, stream);

auto const uncompressed_block_align = uncomp_block_alignment(compression_kind);
Expand Down Expand Up @@ -2314,6 +2382,7 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
_compression_blocksize(compression_block_size(_compression_kind)),
_compression_statistics(options.get_compression_statistics()),
_stats_freq(options.get_statistics_freq()),
_sort_dictionaries{options.get_enable_dictionary_sort()},
_single_write_mode(mode),
_kv_meta(options.get_key_value_metadata()),
_out_sink(std::move(sink))
Expand All @@ -2335,6 +2404,7 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
_compression_blocksize(compression_block_size(_compression_kind)),
_compression_statistics(options.get_compression_statistics()),
_stats_freq(options.get_statistics_freq()),
_sort_dictionaries{options.get_enable_dictionary_sort()},
_single_write_mode(mode),
_kv_meta(options.get_key_value_metadata()),
_out_sink(std::move(sink))
Expand Down Expand Up @@ -2382,6 +2452,7 @@ void writer::impl::write(table_view const& input)
_max_stripe_size,
_row_index_stride,
_enable_dictionary,
_sort_dictionaries,
_compression_kind,
_compression_blocksize,
_stats_freq,
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/orc/writer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ class writer::impl {
size_t const _compression_blocksize;
std::shared_ptr<writer_compression_statistics> _compression_statistics; // Optional output
statistics_freq const _stats_freq;
bool const _sort_dictionaries;
single_write_mode const _single_write_mode; // Special parameter only used by `write()` to
// indicate that we are guaranteeing a single table
// write. This enables some internal optimizations.
Expand Down
Loading