
Commit

Merge branch 'branch-24.10' of https://github.com/mhaseeb123/cudf into fea-pq-reader-mismatched-schema
mhaseeb123 committed Aug 6, 2024
2 parents 3fdb874 + 8068a2d commit c5eb3b6
Showing 82 changed files with 1,839 additions and 466 deletions.
1 change: 1 addition & 0 deletions conda/environments/all_cuda-118_arch-x86_64.yaml
@@ -30,6 +30,7 @@ dependencies:
 - dlpack>=0.8,<1.0
 - doxygen=1.9.1
 - fastavro>=0.22.9
+- flatbuffers==24.3.25
 - fmt>=10.1.1,<11
 - fsspec>=0.6.0
 - gcc_linux-64=11.*
1 change: 1 addition & 0 deletions conda/environments/all_cuda-125_arch-x86_64.yaml
@@ -31,6 +31,7 @@ dependencies:
 - dlpack>=0.8,<1.0
 - doxygen=1.9.1
 - fastavro>=0.22.9
+- flatbuffers==24.3.25
 - fmt>=10.1.1,<11
 - fsspec>=0.6.0
 - gcc_linux-64=11.*
3 changes: 3 additions & 0 deletions conda/recipes/libcudf/conda_build_config.yaml
@@ -31,6 +31,9 @@ librdkafka_version:
 fmt_version:
 - ">=10.1.1,<11"
 
+flatbuffers_version:
+- "=24.3.25"
+
 spdlog_version:
 - ">=1.12.0,<1.13"
 
1 change: 1 addition & 0 deletions conda/recipes/libcudf/meta.yaml
@@ -68,6 +68,7 @@ requirements:
 - dlpack {{ dlpack_version }}
 - librdkafka {{ librdkafka_version }}
 - fmt {{ fmt_version }}
+- flatbuffers {{ flatbuffers_version }}
 - spdlog {{ spdlog_version }}
 - zlib {{ zlib_version }}
 
5 changes: 5 additions & 0 deletions cpp/benchmarks/CMakeLists.txt
@@ -353,6 +353,11 @@ ConfigureNVBench(JSON_READER_NVBENCH io/json/nested_json.cpp io/json/json_reader
 ConfigureNVBench(JSON_READER_OPTION_NVBENCH io/json/json_reader_option.cpp)
 ConfigureNVBench(JSON_WRITER_NVBENCH io/json/json_writer.cpp)
 
+# ##################################################################################################
+# * multi buffer memset benchmark
+# ----------------------------------------------------------------------
+ConfigureNVBench(BATCHED_MEMSET_BENCH io/utilities/batched_memset_bench.cpp)
+
 # ##################################################################################################
 # * io benchmark ---------------------------------------------------------------------
 ConfigureNVBench(MULTIBYTE_SPLIT_NVBENCH io/text/multibyte_split.cpp)
101 changes: 101 additions & 0 deletions cpp/benchmarks/io/utilities/batched_memset_bench.cpp
@@ -0,0 +1,101 @@
/*
 * Copyright (c) 2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <benchmarks/common/generate_input.hpp>
#include <benchmarks/fixture/benchmark_fixture.hpp>
#include <benchmarks/io/cuio_common.hpp>
#include <benchmarks/io/nvbench_helpers.hpp>

#include <cudf/io/parquet.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <nvbench/nvbench.cuh>

// Size of the data in the benchmark dataframe; chosen to be low enough to allow benchmarks to
// run on most GPUs, but large enough to allow highest throughput
constexpr size_t data_size = 512 << 20;

void parquet_read_common(cudf::size_type num_rows_to_read,
                         cudf::size_type num_cols_to_read,
                         cuio_source_sink_pair& source_sink,
                         nvbench::state& state)
{
  cudf::io::parquet_reader_options read_opts =
    cudf::io::parquet_reader_options::builder(source_sink.make_source_info());

  auto mem_stats_logger = cudf::memory_stats_logger();
  state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value()));
  state.exec(
    nvbench::exec_tag::sync | nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) {
      try_drop_l3_cache();

      timer.start();
      auto const result = cudf::io::read_parquet(read_opts);
      timer.stop();

      CUDF_EXPECTS(result.tbl->num_columns() == num_cols_to_read, "Unexpected number of columns");
      CUDF_EXPECTS(result.tbl->num_rows() == num_rows_to_read, "Unexpected number of rows");
    });

  auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
  state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
  state.add_buffer_size(
    mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
  state.add_buffer_size(source_sink.size(), "encoded_file_size", "encoded_file_size");
}

template <data_type DataType>
void bench_batched_memset(nvbench::state& state, nvbench::type_list<nvbench::enum_type<DataType>>)
{
  auto const d_type      = get_type_or_group(static_cast<int32_t>(DataType));
  auto const num_cols    = static_cast<cudf::size_type>(state.get_int64("num_cols"));
  auto const cardinality = static_cast<cudf::size_type>(state.get_int64("cardinality"));
  auto const run_length  = static_cast<cudf::size_type>(state.get_int64("run_length"));
  auto const source_type = retrieve_io_type_enum(state.get_string("io_type"));
  auto const compression = cudf::io::compression_type::NONE;
  cuio_source_sink_pair source_sink(source_type);
  auto const tbl =
    create_random_table(cycle_dtypes(d_type, num_cols),
                        table_size_bytes{data_size},
                        data_profile_builder().cardinality(cardinality).avg_run_length(run_length));
  auto const view = tbl->view();

  cudf::io::parquet_writer_options write_opts =
    cudf::io::parquet_writer_options::builder(source_sink.make_sink_info(), view)
      .compression(compression);
  cudf::io::write_parquet(write_opts);
  auto const num_rows = view.num_rows();

  parquet_read_common(num_rows, num_cols, source_sink, state);
}

using d_type_list = nvbench::enum_type_list<data_type::INTEGRAL,
                                            data_type::FLOAT,
                                            data_type::DECIMAL,
                                            data_type::TIMESTAMP,
                                            data_type::DURATION,
                                            data_type::STRING,
                                            data_type::LIST,
                                            data_type::STRUCT>;

NVBENCH_BENCH_TYPES(bench_batched_memset, NVBENCH_TYPE_AXES(d_type_list))
  .set_name("batched_memset")
  .set_type_axes_names({"data_type"})
  .add_int64_axis("num_cols", {1000})
  .add_string_axis("io_type", {"DEVICE_BUFFER"})
  .set_min_samples(4)
  .add_int64_axis("cardinality", {0, 1000})
  .add_int64_axis("run_length", {1, 32});
17 changes: 4 additions & 13 deletions cpp/include/cudf/detail/cuco_helpers.hpp
@@ -36,19 +36,10 @@ static double constexpr CUCO_DESIRED_LOAD_FACTOR = 0.5;
  * later expects a standard C++ `Allocator` interface. This allocator helper provides a simple way
  * to handle cuco memory allocation/deallocation with the given `stream` and the rmm default memory
  * resource.
+ *
+ * @tparam T The allocator's value type.
  */
-class cuco_allocator
-  : public rmm::mr::stream_allocator_adaptor<rmm::mr::polymorphic_allocator<char>> {
-  /// Default stream-ordered allocator type
-  using default_allocator = rmm::mr::polymorphic_allocator<char>;
-  /// The base allocator adaptor type
-  using base_type = rmm::mr::stream_allocator_adaptor<default_allocator>;
-
- public:
-  /**
-   * @brief Constructs the allocator adaptor with the given `stream`
-   */
-  cuco_allocator(rmm::cuda_stream_view stream) : base_type{default_allocator{}, stream} {}
-};
+template <typename T>
+using cuco_allocator = rmm::mr::stream_allocator_adaptor<rmm::mr::polymorphic_allocator<T>>;
 
 }  // namespace cudf::detail
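With the class rewritten as an alias template, call sites can no longer construct the allocator from a stream alone: the value type becomes a template parameter and the upstream allocator is passed explicitly, as the groupby.cu change below shows. A minimal before/after sketch (assuming an rmm::cuda_stream_view named stream):

// Before: the class supplied its upstream polymorphic_allocator internally.
auto alloc_old = cudf::detail::cuco_allocator{stream};

// After: the value type (char, for raw-byte allocations) and the upstream
// allocator are both explicit at the call site.
auto alloc_new =
  cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream};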
2 changes: 1 addition & 1 deletion cpp/include/cudf/detail/distinct_hash_join.cuh
@@ -99,7 +99,7 @@ struct distinct_hash_join {
     cuda::thread_scope_device,
     comparator_adapter<d_equal_type>,
     probing_scheme_type,
-    cudf::detail::cuco_allocator,
+    cudf::detail::cuco_allocator<char>,
     cuco_storage_type>;
 
   bool _has_nulls;  ///< true if nulls are present in either build table or probe table
2 changes: 1 addition & 1 deletion cpp/include/cudf/detail/hash_reduce_by_row.cuh
@@ -32,7 +32,7 @@
 namespace cudf::detail {
 
 using hash_map_type = cuco::legacy::
-  static_map<size_type, size_type, cuda::thread_scope_device, cudf::detail::cuco_allocator>;
+  static_map<size_type, size_type, cuda::thread_scope_device, cudf::detail::cuco_allocator<char>>;
 
 /**
  * @brief The base struct for customized reduction functor to perform reduce-by-key with keys are
2 changes: 1 addition & 1 deletion cpp/include/cudf/detail/join.hpp
@@ -59,7 +59,7 @@ struct hash_join {
   cuco::static_multimap<hash_value_type,
                         cudf::size_type,
                         cuda::thread_scope_device,
-                        cudf::detail::cuco_allocator,
+                        cudf::detail::cuco_allocator<char>,
                         cuco::legacy::double_hashing<DEFAULT_JOIN_CG_SIZE, Hasher, Hasher>>;
 
   hash_join() = delete;
82 changes: 82 additions & 0 deletions cpp/include/cudf/io/detail/batched_memset.hpp
@@ -0,0 +1,82 @@
/*
 * Copyright (c) 2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <cudf/detail/iterator.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/resource_ref.hpp>

#include <cub/device/device_copy.cuh>
#include <cuda/functional>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/transform.h>

namespace CUDF_EXPORT cudf {
namespace io::detail {

/**
 * @brief A helper function that takes in a vector of device spans and memsets them to the
 * value provided using batches sent to the GPU.
 *
 * @param bufs Vector with device spans of data
 * @param value Value to memset all device spans to
 * @param stream Stream used for device memory operations and kernel launches
 */
template <typename T>
void batched_memset(std::vector<cudf::device_span<T>> const& bufs,
                    T const value,
                    rmm::cuda_stream_view stream)
{
  // number of buffers to fill
  auto const num_bufs = bufs.size();

  // copy bufs into device memory and then get sizes
  auto gpu_bufs =
    cudf::detail::make_device_uvector_async(bufs, stream, rmm::mr::get_current_device_resource());

  // an iterator producing the size of each buffer
  auto sizes = cudf::detail::make_counting_transform_iterator(
    static_cast<std::size_t>(0),
    cuda::proclaim_return_type<std::size_t>(
      [gpu_bufs = gpu_bufs.data()] __device__(std::size_t i) { return gpu_bufs[i].size(); }));

  // an iterator with a constant value to memset
  auto iter_in = thrust::make_constant_iterator(thrust::make_constant_iterator(value));

  // an iterator pointing to the start of each device span
  auto iter_out = thrust::make_transform_iterator(
    thrust::counting_iterator<std::size_t>(0),
    cuda::proclaim_return_type<T*>(
      [gpu_bufs = gpu_bufs.data()] __device__(std::size_t i) { return gpu_bufs[i].data(); }));

  size_t temp_storage_bytes = 0;

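  // CUB's two-phase pattern: the first call with a null temp-storage pointer only
  // computes temp_storage_bytes; the second call performs the actual batched copy.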
  cub::DeviceCopy::Batched(nullptr, temp_storage_bytes, iter_in, iter_out, sizes, num_bufs, stream);

  rmm::device_buffer d_temp_storage(
    temp_storage_bytes, stream, rmm::mr::get_current_device_resource());

  cub::DeviceCopy::Batched(
    d_temp_storage.data(), temp_storage_bytes, iter_in, iter_out, sizes, num_bufs, stream);
}

} // namespace io::detail
} // namespace CUDF_EXPORT cudf
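A minimal usage sketch for this helper (hypothetical buffer sizes; assumes a valid CUDA device plus <rmm/device_uvector.hpp> in addition to the headers above):

// Zero-fill three device buffers of different lengths with a single batched
// launch instead of one fill kernel per buffer.
auto stream = cudf::get_default_stream();
rmm::device_uvector<int32_t> a(100, stream), b(42, stream), c(4096, stream);
std::vector<cudf::device_span<int32_t>> bufs;
bufs.emplace_back(a.data(), a.size());
bufs.emplace_back(b.data(), b.size());
bufs.emplace_back(c.data(), c.size());
cudf::io::detail::batched_memset(bufs, int32_t{0}, stream);

Batching is the point: a reader may need to initialize thousands of small output buffers, and a single cub::DeviceCopy::Batched launch avoids per-buffer launch overhead, which is what BATCHED_MEMSET_BENCH above measures.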
14 changes: 7 additions & 7 deletions cpp/include/cudf_test/column_wrapper.hpp
@@ -1337,7 +1337,7 @@ class lists_column_wrapper : public detail::column_wrapper {
   lists_column_wrapper(std::initializer_list<SourceElementT> elements) : column_wrapper{}
   {
     build_from_non_nested(
-      std::move(cudf::test::fixed_width_column_wrapper<T, SourceElementT>(elements).release()));
+      cudf::test::fixed_width_column_wrapper<T, SourceElementT>(elements).release());
   }
 
   /**
@@ -1361,7 +1361,7 @@ class lists_column_wrapper : public detail::column_wrapper {
   lists_column_wrapper(InputIterator begin, InputIterator end) : column_wrapper{}
   {
     build_from_non_nested(
-      std::move(cudf::test::fixed_width_column_wrapper<T, SourceElementT>(begin, end).release()));
+      cudf::test::fixed_width_column_wrapper<T, SourceElementT>(begin, end).release());
   }
 
   /**
@@ -1386,7 +1386,7 @@ class lists_column_wrapper : public detail::column_wrapper {
     : column_wrapper{}
   {
     build_from_non_nested(
-      std::move(cudf::test::fixed_width_column_wrapper<T, SourceElementT>(elements, v).release()));
+      cudf::test::fixed_width_column_wrapper<T, SourceElementT>(elements, v).release());
   }
 
   /**
@@ -1413,8 +1413,8 @@ class lists_column_wrapper : public detail::column_wrapper {
   lists_column_wrapper(InputIterator begin, InputIterator end, ValidityIterator v)
     : column_wrapper{}
   {
-    build_from_non_nested(std::move(
-      cudf::test::fixed_width_column_wrapper<T, SourceElementT>(begin, end, v).release()));
+    build_from_non_nested(
+      cudf::test::fixed_width_column_wrapper<T, SourceElementT>(begin, end, v).release());
   }
 
   /**
@@ -1435,7 +1435,7 @@ class lists_column_wrapper : public detail::column_wrapper {
   lists_column_wrapper(std::initializer_list<std::string> elements) : column_wrapper{}
   {
     build_from_non_nested(
-      std::move(cudf::test::strings_column_wrapper(elements.begin(), elements.end()).release()));
+      cudf::test::strings_column_wrapper(elements.begin(), elements.end()).release());
   }
 
   /**
@@ -1460,7 +1460,7 @@ class lists_column_wrapper : public detail::column_wrapper {
     : column_wrapper{}
   {
     build_from_non_nested(
-      std::move(cudf::test::strings_column_wrapper(elements.begin(), elements.end(), v).release()));
+      cudf::test::strings_column_wrapper(elements.begin(), elements.end(), v).release());
   }
 
   /**
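Each change here drops a std::move wrapped around the prvalue returned by release(). Since C++17, a prvalue initializes its destination directly (guaranteed copy elision), so the std::move only forces an extra move construction, and compilers can warn that moving a temporary prevents elision. A standalone illustration with hypothetical make/consume names:

#include <memory>
#include <utility>

std::unique_ptr<int> make() { return std::make_unique<int>(42); }
void consume(std::unique_ptr<int>) {}

int main()
{
  consume(std::move(make()));  // before: materializes the temporary, then move-constructs
  consume(make());             // after: the prvalue initializes the parameter directly
}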
19 changes: 10 additions & 9 deletions cpp/src/groupby/hash/groupby.cu
@@ -568,15 +568,16 @@ std::unique_ptr<table> groupby(table_view const& keys,
   cudf::detail::result_cache sparse_results(requests.size());
 
   auto const comparator_helper = [&](auto const d_key_equal) {
-    auto const set = cuco::static_set{num_keys,
-                                      0.5,  // desired load factor
-                                      cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
-                                      d_key_equal,
-                                      probing_scheme_type{d_row_hash},
-                                      cuco::thread_scope_device,
-                                      cuco::storage<1>{},
-                                      cudf::detail::cuco_allocator{stream},
-                                      stream.value()};
+    auto const set = cuco::static_set{
+      num_keys,
+      0.5,  // desired load factor
+      cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
+      d_key_equal,
+      probing_scheme_type{d_row_hash},
+      cuco::thread_scope_device,
+      cuco::storage<1>{},
+      cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream},
+      stream.value()};
 
     // Compute all single pass aggs first
     compute_single_pass_aggs(keys,
