Commit: CUDF_KVIKIO_S3

madsbk committed Aug 6, 2024
1 parent e0d1ac1 commit 708b0fc
Showing 5 changed files with 143 additions and 14 deletions.
73 changes: 73 additions & 0 deletions cpp/src/io/utilities/datasource.cpp
@@ -24,6 +24,7 @@
#include <cudf/utilities/span.hpp>

#include <kvikio/file_handle.hpp>
#include <kvikio/remote_handle.hpp>

#include <rmm/device_buffer.hpp>

@@ -427,12 +428,84 @@ class user_datasource_wrapper : public datasource {
datasource* const source; ///< A non-owning pointer to the user-implemented datasource
};

/**
 * @brief A datasource backed by KvikIO's RemoteHandle, for remote files such as S3 objects.
 */
class remote_file_source : public datasource {
public:
explicit remote_file_source(char const* filepath)
{
detail::force_init_cuda_context();
CUDF_EXPECTS(cufile_integration::is_kvikio_enabled(),
"Please enable kvikio to access S3 files.");
_kvikio_file = kvikio::RemoteHandle(filepath);
}

~remote_file_source() override = default;

[[nodiscard]] bool supports_device_read() const override { return true; }

[[nodiscard]] bool is_device_read_preferred(size_t size) const override { return true; }

[[nodiscard]] size_t size() const override { return _kvikio_file.nbytes(); }

std::future<size_t> device_read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(supports_device_read(), "Remote reads are not supported for this file.");

// Clamp the read so requests extending past EOF are shortened rather than failing.
auto const read_size = std::min(size, this->size() - offset);
return _kvikio_file.pread(dst, read_size, offset);
}

size_t device_read(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
return device_read_async(offset, size, dst, stream).get();
}

std::unique_ptr<datasource::buffer> device_read(size_t offset,
size_t size,
rmm::cuda_stream_view stream) override
{
rmm::device_buffer out_data(size, stream);
size_t read = device_read(offset, size, reinterpret_cast<uint8_t*>(out_data.data()), stream);
out_data.resize(read, stream);
return datasource::buffer::create(std::move(out_data));
}

size_t host_read(size_t offset, size_t size, uint8_t* dst) override
{
auto const read_size = std::min(size, this->size() - offset);
return _kvikio_file.pread(dst, read_size, offset).get();
}

std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
{
auto const count = std::min(size, this->size() - offset);
std::vector<uint8_t> h_data(count);
this->host_read(offset, count, h_data.data());
return datasource::buffer::create(std::move(h_data));
}

private:
kvikio::RemoteHandle _kvikio_file;
};

} // namespace

std::unique_ptr<datasource> datasource::create(std::string const& filepath,
size_t offset,
size_t size)
{
// Dispatch s3:// URLs to the KvikIO-backed remote datasource.
if (filepath.size() > 5 && filepath.substr(0, 5) == "s3://") {
return std::make_unique<remote_file_source>(filepath.c_str());
}
#ifdef CUFILE_FOUND
if (cufile_integration::is_always_enabled()) {
// avoid mmap as GDS is expected to be used for most reads
2 changes: 1 addition & 1 deletion python/cudf/cudf/_lib/pylibcudf/io/types.pyx
@@ -157,7 +157,7 @@ cdef class SourceInfo:
for src in sources:
if not isinstance(src, (os.PathLike, str)):
raise ValueError("All sources must be of the same type!")
if not os.path.isfile(src) and not str(src).startswith("s3://"):
raise FileNotFoundError(errno.ENOENT,
os.strerror(errno.ENOENT),
src)
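With this one-line change, pylibcudf's SourceInfo stops rejecting s3:// URLs merely because they are not local files. A minimal sketch of the relaxed check in isolation (the bucket and key are hypothetical):

import errno
import os

def _validate_source(src: str) -> None:
    # Local paths must still exist on disk; s3:// URLs skip the isfile()
    # check and are deferred to the remote reader.
    if not os.path.isfile(src) and not str(src).startswith("s3://"):
        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), src)

_validate_source("s3://my-bucket/data.parquet")  # hypothetical key; accepted
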
64 changes: 51 additions & 13 deletions python/cudf/cudf/io/parquet.py
@@ -19,7 +19,9 @@
from pyarrow import dataset as ds

import cudf
import cudf.options
from cudf._lib import parquet as libparquet
from cudf.api.extensions import no_default
from cudf.api.types import is_list_like
from cudf.core.column import as_column, build_categorical_column, column_empty
from cudf.utils import ioutils
@@ -541,10 +543,13 @@ def read_parquet(
dataset_kwargs=None,
nrows=None,
skip_rows=None,
use_kvikio_s3=None,
*args,
**kwargs,
):
"""{docstring}"""
try:
    import s3fs.core
except ImportError:  # s3fs is optional; only needed for the KvikIO S3 path
    s3fs = None

if engine not in {"cudf", "pyarrow"}:
raise ValueError(
f"Only supported engines are {{'cudf', 'pyarrow'}}, got {engine=}"
@@ -580,6 +585,9 @@
if not is_list_like(columns):
raise ValueError("Expected list like for columns")

if use_kvikio_s3 is None:
use_kvikio_s3 = cudf.options.get_option("kvikio_s3")

# Start by trying to construct a filesystem object, so we
# can apply filters on remote file-systems
fs, paths = ioutils._get_filesystem_and_paths(
@@ -622,21 +630,48 @@
have_nativefile = any(
isinstance(source, pa.NativeFile) for source in filepath_or_buffer
)
for source in filepath_or_buffer:
source = ioutils.stringify_pathlike(source)
if use_kvikio_s3 and s3fs is not None and (
    isinstance(fs, s3fs.core.S3FileSystem)
    or isinstance(source, s3fs.core.S3File)
):
tmp_source = source
if isinstance(source, str) and not source.startswith("s3://"):
tmp_source = f"s3://{source}"
elif isinstance(source, s3fs.core.S3File):
tmp_source = source.full_name

# Trigger future warnings as in `ioutils.get_reader_filepath_or_buffer()`
if use_python_file_object not in (no_default, None):
warnings.warn(
"The 'use_python_file_object' keyword is deprecated and "
"will be removed in a future version.",
FutureWarning,
)
if open_file_options is not None:
warnings.warn(
"The 'open_file_options' keyword is deprecated and "
"will be removed in a future version.",
FutureWarning,
)
else:
tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=source,
compression=None,
fs=fs,
use_python_file_object=use_python_file_object,
open_file_options=open_file_options,
storage_options=storage_options,
bytes_per_thread=bytes_per_thread,
)
if compression is not None:
raise ValueError(
"URL content-encoding decompression is not supported"
)
if isinstance(tmp_source, list):
filepath_or_buffer.extend(tmp_source)
else:
@@ -821,6 +856,9 @@ def _parquet_to_frame(
skip_rows=None,
**kwargs,
):
# If this is not a partitioned read, only need
# one call to `_read_parquet`
if not partition_keys:
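Taken together, the reader changes make the fast path opt-in per call. A usage sketch (bucket, key, and credentials are hypothetical; as the test_s3.py change below suggests, KvikIO picks up the standard AWS environment variables):

import os
import cudf

# Hypothetical credentials and region; KvikIO's remote handle reads the
# standard AWS environment variables rather than fsspec's storage_options.
os.environ["AWS_ACCESS_KEY_ID"] = "my-key-id"
os.environ["AWS_SECRET_ACCESS_KEY"] = "my-secret"
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"

# Route the read through KvikIO's S3 backend instead of fsspec/s3fs.
df = cudf.read_parquet("s3://my-bucket/data.parquet", use_kvikio_s3=True)
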
12 changes: 12 additions & 0 deletions python/cudf/cudf/options.py
@@ -351,6 +351,18 @@ def _integer_and_none_validator(val):
_make_contains_validator([False, True]),
)

_register_option(
"kvikio_s3",
_env_get_bool("CUDF_KVIKIO_S3", False),
textwrap.dedent(
"""
Whether to use KvikIO's S3 backend when reading s3:// paths.
\tValid values are True or False. Default is False.
"""
),
_make_contains_validator([False, True]),
)


class option_context(ContextDecorator):
"""
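Besides the per-call keyword, the default can be set globally; read_parquet falls back to this option when use_kvikio_s3 is not passed. A sketch of both ways to flip it (the path is hypothetical):

import cudf

# Enable KvikIO's S3 backend for the rest of the process...
cudf.set_option("kvikio_s3", True)

# ...or scope it to a block with the option context manager.
with cudf.option_context("kvikio_s3", True):
    df = cudf.read_parquet("s3://my-bucket/data.parquet")

Setting CUDF_KVIKIO_S3 in the environment chooses the initial default at import time.
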
6 changes: 6 additions & 0 deletions python/cudf/cudf/tests/test_s3.py
@@ -65,6 +65,9 @@ def s3_base(endpoint_ip, endpoint_port):
os.environ["AWS_SECURITY_TOKEN"] = "foobar_security_token"
os.environ["AWS_SESSION_TOKEN"] = "foobar_session_token"
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
os.environ["AWS_ENDPOINT_URL"] = (
f"http://{endpoint_ip}:{endpoint_port}"
)

# Launching moto in server mode, i.e., as a separate process
# with an S3 endpoint on localhost
@@ -228,6 +231,7 @@ def test_write_csv(s3_base, s3so, pdf, chunksize):
@pytest.mark.parametrize("columns", [None, ["Float", "String"]])
@pytest.mark.parametrize("precache", [None, "parquet"])
@pytest.mark.parametrize("use_python_file_object", [True, False])
@pytest.mark.parametrize("use_kvikio_s3", [True, False])
def test_read_parquet(
s3_base,
s3so,
@@ -236,6 +240,7 @@ def test_read_parquet(
columns,
precache,
use_python_file_object,
use_kvikio_s3,
):
fname = "test_parquet_reader.parquet"
bucket = "parquet"
@@ -257,6 +262,7 @@
bytes_per_thread=bytes_per_thread,
columns=columns,
use_python_file_object=use_python_file_object,
use_kvikio_s3=use_kvikio_s3,
)
expect = pdf[columns] if columns else pdf
assert_eq(expect, got1)
