Skip to content

Commit

Permalink
Support Copy from Cloud Storage and csv.gz
Browse files Browse the repository at this point in the history
Test Framework: Support CSV to Parquet conversion (kuzudb#1611)

Convert CSV dataset to PARQUET dataset inside .test files by using:
-DATASET PARQUET CSV_TO_PARQUET(dataset)
  • Loading branch information
rfdavid authored and yuchenZhangTG committed Jun 8, 2023
1 parent c6ce3fe commit 846d8e9
Show file tree
Hide file tree
Showing 115 changed files with 3,980 additions and 143 deletions.
7 changes: 6 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,12 @@ else()
set_target_properties(arrow_lib PROPERTIES IMPORTED_LOCATION ${ARROW_PATH})
include_directories(${ARROW_INSTALL}/include)

set(ARROW_LIB arrow_lib arrow_deps)
find_package(CURL)
if (${CURL_FOUND})
set(curllib curl crypto dl)
endif()

set(ARROW_LIB arrow_lib arrow_deps ${curllib})
set(PARQUET_LIB parquet_lib)
endif()

Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ FORCE_COLOR=
NUM_THREADS=
SANITIZER_FLAG=
ROOT_DIR=$(CURDIR)
ENABLE_REMOTE_FS=

ifndef $(NUM_THREADS)
NUM_THREADS=1
Expand Down Expand Up @@ -45,7 +46,7 @@ endif

arrow:
$(call mkdirp,external/build) && cd external/build && \
cmake $(FORCE_COLOR) $(SANITIZER_FLAG) $(GENERATOR) -DCMAKE_BUILD_TYPE=Release .. && \
cmake $(FORCE_COLOR) $(SANITIZER_FLAG) $(GENERATOR) -DCMAKE_BUILD_TYPE=Release -DENABLE_REMOTE_FS=$(ENABLE_REMOTE_FS) .. && \
cmake --build . --config Release -- -j $(NUM_THREADS)

release: arrow
Expand Down
2 changes: 1 addition & 1 deletion dataset/tinysnb/copy.cypher
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ COPY person FROM "dataset/tinysnb/vPerson.csv" (HeaDER=true, deLim=',');
COPY organisation FROM "dataset/tinysnb/vOrganisation.csv";
COPY movies FROM "dataset/tinysnb/vMovies.csv";
COPY knows FROM "dataset/tinysnb/eKnows.csv";
COPY studyAt FROM "dataset/tinysnb/eStudyAt.csv" (HEADeR=true);
COPY studyAt FROM "dataset/tinysnb/eStudyAt.csv" (HeaDER=true);
COPY workAt FROM "dataset/tinysnb/eWorkAt.csv"
COPY meets FROM "dataset/tinysnb/eMeets.csv"
COPY marries FROM "dataset/tinysnb/eMarries.csv"
20 changes: 20 additions & 0 deletions external/arrow/apache_arrow.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,19 @@ else()
set(ARROW_BUILD_TYPE Release)
endif()

# Arrow requires system Curl, build it internally might trip over S3's copy.
find_package(CURL)
if (${ENABLE_REMOTE_FS})
find_package(CURL REQUIRED)
elseif(NOT ${ENABLE_REMOTE_FS})
set(FILE_SYSTEM_ARGS -DARROW_GCS=OFF -DARROW_S3=OFF)
elseif(NOT ${CURL_FOUND})
message(WARNNING ": Could not find curllib. AWS S3 and Google Cloud Storage are turned off.")
set(FILE_SYSTEM_ARGS -DARROW_GCS=OFF -DARROW_S3=OFF)
else()
set(FILE_SYSTEM_ARGS -DARROW_GCS=ON -DARROW_S3=ON -Dgoogle_cloud_cpp_storage_SOURCE=BUNDLED -DAWSSDK_SOURCE=BUNDLED)
endif()

ExternalProject_Add(apache_arrow
GIT_REPOSITORY "https://github.com/apache/arrow"
GIT_TAG f10f5cfd1376fb0e602334588b3f3624d41dee7d
Expand All @@ -25,6 +38,7 @@ ExternalProject_Add(apache_arrow
-DCMAKE_TOOLCHAIN_FILE=${CMAKE_TOOLCHAIN_FILE} -DARROW_ALTIVEC=OFF
-DARROW_USE_CCACHE=ON -DARROW_BOOST_USE_SHARED=OFF -DARROW_BUILD_SHARED=OFF
-DARROW_BUILD_STATIC=ON -DARROW_COMPUTE=OFF -DARROW_CSV=ON -DARROW_IPC=ON -DARROW_JEMALLOC=OFF -DARROW_JSON=OFF
-DARROW_FILESYSTEM=ON ${FILE_SYSTEM_ARGS}
-DARROW_PARQUET=ON -DARROW_SIMD_LEVEL=NONE -DARROW_RUNTIME_SIMD_LEVEL=NONE -DARROW_WITH_BROTLI=OFF
-DARROW_WITH_LZ4=ON -Dlz4_SOURCE=BUNDLED -DARROW_WITH_PROTOBUF=OFF -DARROW_WITH_RAPIDJSON=OFF
-DARROW_WITH_SNAPPY=ON -DSnappy_SOURCE=BUNDLED -DARROW_WITH_ZLIB=ON -DZLIB_SOURCE=BUNDLED
Expand All @@ -33,3 +47,9 @@ ExternalProject_Add(apache_arrow
-DARROW_BUILD_UTILITIES=OFF -DARROW_BUILD_TESTS=OFF -DARROW_ENABLE_TIMING_TESTS=OFF -DARROW_FUZZING=OFF
<SOURCE_DIR>/cpp
UPDATE_COMMAND "")



if(NOT ${CURL_FOUND})
message(WARNNING ": AWS S3 and Google Cloud Storage are turned off. To enable it, install system curllib and rebuild.")
endif()
12 changes: 9 additions & 3 deletions scripts/parquet/csv_to_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@
import pyarrow.parquet as pq

csv_files = ['dummy.csv']
read_options = csv.ReadOptions(autogenerate_column_names=True)
has_header = True
# CSV:
# has header? autogenerate_column_names=False
# no header? autogenerate_column_names=True
read_options = csv.ReadOptions(autogenerate_column_names=not has_header)
parse_options = csv.ParseOptions(delimiter=",")
for csv_file in csv_files:
table = csv.read_csv(csv_file, read_options=read_options)
pq.write_table(table, csv_file.replace('.csv', '.parquet'))
table = csv.read_csv(csv_file, read_options=read_options,
parse_options=parse_options)
pq.write_table(table, csv_file.replace('.csv', '.parquet'))
81 changes: 54 additions & 27 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include "binder/expression/literal_expression.h"
#include "common/string_utils.h"
#include "parser/copy.h"
#include <arrow/api.h>
#include <arrow/filesystem/api.h>

using namespace kuzu::common;
using namespace kuzu::parser;
Expand Down Expand Up @@ -40,23 +42,61 @@ std::unique_ptr<BoundStatement> Binder::bindCopyClause(const Statement& statemen
CopyDescription(boundFilePaths, csvReaderConfig, actualFileType), tableID, tableName);
}

std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& filePaths) {
std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& fileUriOrPaths) {
std::vector<std::string> boundFilePaths;
for (auto& filePath : filePaths) {
auto globbedFilePaths = FileUtils::globFilePath(filePath);
if (globbedFilePaths.empty()) {
throw BinderException{StringUtils::string_format(
"No file found that matches the pattern: {}.", filePath)};
for (auto& fileUriOrPath : fileUriOrPaths) {
std::shared_ptr<arrow::fs::FileSystem> fs;
std::string filePath;

auto status = arrow::fs::FileSystemFromUriOrPath(fileUriOrPath, &filePath).Value(&fs);
// if not recognized, it is considered as a local relative path
if (!status.ok()) {
filePath = fileUriOrPath;
fs = std::make_shared<arrow::fs::LocalFileSystem>();
}
if (dynamic_cast<arrow::fs::LocalFileSystem*>(fs.get()) != nullptr) {
// local file system
auto globbedFilePaths = FileUtils::globFilePath(filePath);
if (globbedFilePaths.empty()) {
throw BinderException{StringUtils::string_format(
"No file found that matches the pattern: {}.", filePath)};
}
boundFilePaths.insert(
boundFilePaths.end(), globbedFilePaths.begin(), globbedFilePaths.end());
} else {
// remote uri
arrow::fs::FileInfo finfo;
throwExceptionIfNotOK(fs->GetFileInfo(filePath).Value(&finfo));
if (finfo.IsFile())
boundFilePaths.push_back(fileUriOrPath);
else if (finfo.IsDirectory()) {
arrow::fs::FileSelector selector;
selector.base_dir = finfo.path();
selector.recursive = true;
arrow::fs::FileInfoVector subfiles;
throwExceptionIfNotOK(fs->GetFileInfo(selector).Value(&subfiles));
std::string scheme = fileUriOrPath.substr(0, fileUriOrPath.find(":"));
for (auto it = subfiles.begin(); it != subfiles.end(); it++) {
if (!it->IsFile())
continue;
boundFilePaths.push_back(scheme + "://" + it->path());
}
}
}
boundFilePaths.insert(
boundFilePaths.end(), globbedFilePaths.begin(), globbedFilePaths.end());
}
if (boundFilePaths.empty()) {
throw BinderException{StringUtils::string_format("Invalid file path: {}.", filePaths[0])};
throw BinderException{
StringUtils::string_format("Invalid file path: {}.", fileUriOrPaths[0])};
}
return boundFilePaths;
}

void Binder::throwExceptionIfNotOK(const arrow::Status& status) {
if (!status.ok()) {
throw BinderException(status.ToString());
}
}

CSVReaderConfig Binder::bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<ParsedExpression>>* parsingOptions) {
CSVReaderConfig csvReaderConfig;
Expand Down Expand Up @@ -119,26 +159,13 @@ CopyDescription::FileType Binder::bindFileType(std::vector<std::string> filePath
// We currently only support loading from files with the same type. Loading files with different
// types is not supported.
auto fileName = filePaths[0];
auto csvSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::CSV);
auto parquetSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::PARQUET);
auto npySuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::NPY);
CopyDescription::FileType fileType;
std::string expectedSuffix;
if (fileName.ends_with(csvSuffix)) {
fileType = CopyDescription::FileType::CSV;
expectedSuffix = csvSuffix;
} else if (fileName.ends_with(parquetSuffix)) {
fileType = CopyDescription::FileType::PARQUET;
expectedSuffix = parquetSuffix;
} else if (fileName.ends_with(npySuffix)) {
fileType = CopyDescription::FileType::NPY;
expectedSuffix = npySuffix;
} else {
throw CopyException("Unsupported file type: " + fileName);
CopyDescription::FileType fileType = CopyDescription::getFileType(fileName);
if (fileType == CopyDescription::FileType::UNKNOWN) {
throw BinderException("Unsupported file type: " + fileName);
}
for (auto& path : filePaths) {
if (!path.ends_with(expectedSuffix)) {
throw CopyException("Loading files with different types is not currently supported.");
if (fileType != CopyDescription::getFileType(path)) {
throw BinderException("Loading files with different types is not currently supported.");
}
}
return fileType;
Expand Down
18 changes: 18 additions & 0 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,23 @@ std::string CopyDescription::getFileTypeName(FileType fileType) {
}
}

CopyDescription::FileType CopyDescription::getFileType(std::string& fileName) {
if (fileName.ends_with("parquet"))
return FileType::PARQUET;
if (fileName.ends_with("npy"))
return FileType::NPY;
if (fileName.ends_with("csv"))
return FileType::CSV;
// for "filename.csv.gz"
auto pos = fileName.rfind('.');
auto pos2 = fileName.rfind('.', pos - 1);
if (pos == std::string::npos || pos2 == std::string::npos)
return FileType::UNKNOWN;
if (fileName.substr(pos2 + 1, 3) == "csv" &&
(fileName.ends_with("gz") || fileName.ends_with("lzo") || fileName.ends_with("brotli") ||
fileName.ends_with("lz4") || fileName.ends_with("zstd") || fileName.ends_with("bz2")))
return FileType::CSV;
return FileType::UNKNOWN;
}
} // namespace common
} // namespace kuzu
20 changes: 16 additions & 4 deletions src/common/file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,23 @@ void FileUtils::writeToFile(
}
}

void FileUtils::overwriteFile(const std::string& from, const std::string& to) {
if (!fileOrPathExists(from) || !fileOrPathExists(to))
void FileUtils::copyFile(
const std::string& from, const std::string& to, std::filesystem::copy_options options) {
if (!fileOrPathExists(from))
return;
std::error_code errorCode;
if (!std::filesystem::copy_file(
from, to, std::filesystem::copy_options::overwrite_existing, errorCode)) {
if (!std::filesystem::copy_file(from, to, options, errorCode)) {
throw Exception(StringUtils::string_format(
"Error copying file {} to {}. ErrorMessage: {}", from, to, errorCode.message()));
}
}

void FileUtils::overwriteFile(const std::string& from, const std::string& to) {
if (!fileOrPathExists(from) || !fileOrPathExists(to))
return;
copyFile(from, to, std::filesystem::copy_options::overwrite_existing);
}

void FileUtils::readFromFile(
FileInfo* fileInfo, void* buffer, uint64_t numBytes, uint64_t position) {
#if defined(_WIN32)
Expand Down Expand Up @@ -179,6 +185,12 @@ void FileUtils::createDir(const std::string& dir) {
}
}

void FileUtils::createDirIfNotExists(const std::string& path) {
if (!fileOrPathExists(path)) {
createDir(path);
}
}

void FileUtils::removeDir(const std::string& dir) {
std::error_code removeErrorCode;
if (!fileOrPathExists(dir))
Expand Down
6 changes: 5 additions & 1 deletion src/include/binder/binder.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#pragma once

#include "binder/query/bound_regular_query.h"
#include "common/copier_config/copier_config.h"
#include "expression_binder.h"
#include "parser/query/regular_query.h"
#include "query_normalizer.h"

namespace arrow {
class Status;
}

namespace kuzu {
namespace binder {

Expand Down Expand Up @@ -227,6 +230,7 @@ class Binder {

std::unique_ptr<VariableScope> enterSubquery();
void exitSubquery(std::unique_ptr<VariableScope> prevVariableScope);
static void throwExceptionIfNotOK(const arrow::Status& status);

private:
const catalog::Catalog& catalog;
Expand Down
4 changes: 1 addition & 3 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ struct CopyDescription {

CopyDescription(const CopyDescription& copyDescription);

inline static std::string getFileTypeSuffix(FileType fileType) {
return "." + getFileTypeName(fileType);
}
static FileType getFileType(std::string& fileName);

static std::string getFileTypeName(FileType fileType);

Expand Down
3 changes: 3 additions & 0 deletions src/include/common/file_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ class FileUtils {
FileInfo* fileInfo, uint8_t* buffer, uint64_t numBytes, uint64_t offset);
// This function is a no-op if either file, from or to, does not exist.
static void overwriteFile(const std::string& from, const std::string& to);
static void copyFile(const std::string& from, const std::string& to,
std::filesystem::copy_options options = std::filesystem::copy_options::none);
static void createDir(const std::string& dir);
static void createDirIfNotExists(const std::string& path);
static void removeDir(const std::string& dir);

static inline std::string joinPath(const std::string& base, const std::string& part) {
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/copier/table_copy_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "storage/store/table_statistics.h"
#include <arrow/api.h>
#include <arrow/csv/api.h>
#include <arrow/filesystem/api.h>
#include <arrow/io/api.h>
#include <arrow/ipc/reader.h>
#include <arrow/pretty_print.h>
Expand Down
42 changes: 37 additions & 5 deletions src/storage/copier/table_copy_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,38 @@ static bool skipCopyForProperty(const Property& property) {
}

std::shared_ptr<arrow::csv::StreamingReader> TableCopyUtils::createCSVReader(
const std::string& filePath, CSVReaderConfig* csvReaderConfig,
const std::string& fileUriOrPath, common::CSVReaderConfig* csvReaderConfig,
catalog::TableSchema* tableSchema) {
std::shared_ptr<arrow::io::InputStream> inputStream;
throwCopyExceptionIfNotOK(arrow::io::ReadableFile::Open(filePath).Value(&inputStream));
std::shared_ptr<arrow::fs::FileSystem> fs;
std::string filePath;
auto status = arrow::fs::FileSystemFromUriOrPath(fileUriOrPath, &filePath).Value(&fs);
// if not recognized, it is considered as a local relative path
if (!status.ok()) {
filePath = fileUriOrPath;
fs = std::make_shared<arrow::fs::LocalFileSystem>();
}
throwCopyExceptionIfNotOK(fs->OpenInputFile(filePath).Value(&inputStream));

// handle compressed CSV
arrow::Compression::type compression;
std::string extension = fileUriOrPath.substr(fileUriOrPath.rfind('.') + 1);
std::unique_ptr<parquet::Codec> codec;
if (extension == "gz") {
throwCopyExceptionIfNotOK(
arrow::util::Codec::Create(arrow::Compression::GZIP).Value(&codec));
throwCopyExceptionIfNotOK(
arrow::io::CompressedInputStream::Make(codec.get(), std::move(inputStream))
.Value(&inputStream));
} else if (extension != "csv") {
throwCopyExceptionIfNotOK(
arrow::util::Codec::GetCompressionType(extension).Value(&compression));
throwCopyExceptionIfNotOK(arrow::util::Codec::Create(compression).Value(&codec));
throwCopyExceptionIfNotOK(
arrow::io::CompressedInputStream::Make(codec.get(), std::move(inputStream))
.Value(&inputStream));
}

auto csvReadOptions = arrow::csv::ReadOptions::Defaults();
csvReadOptions.block_size = CopyConstants::CSV_READING_BLOCK_SIZE;
if (!tableSchema->isNodeTable) {
Expand Down Expand Up @@ -212,9 +240,13 @@ std::shared_ptr<arrow::csv::StreamingReader> TableCopyUtils::createCSVReader(
}

std::unique_ptr<parquet::arrow::FileReader> TableCopyUtils::createParquetReader(
const std::string& filePath) {
std::shared_ptr<arrow::io::ReadableFile> infile;
throwCopyExceptionIfNotOK(arrow::io::ReadableFile::Open(filePath).Value(&infile));
const std::string& fileUriOrPath) {
std::shared_ptr<arrow::io::RandomAccessFile> infile;
std::shared_ptr<arrow::fs::FileSystem> fs;
std::string filePath;
throwCopyExceptionIfNotOK(
arrow::fs::FileSystemFromUriOrPath(fileUriOrPath, &filePath).Value(&fs));
throwCopyExceptionIfNotOK(fs->OpenInputFile(filePath).Value(&infile));
std::unique_ptr<parquet::arrow::FileReader> reader;
throwCopyExceptionIfNotOK(
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
Expand Down
5 changes: 5 additions & 0 deletions test/include/test_helper/test_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,14 @@ class TestHelper {
static constexpr char E2E_TEST_FILES_DIRECTORY[] = "test/test_files";
static constexpr char SCHEMA_FILE_NAME[] = "schema.cypher";
static constexpr char COPY_FILE_NAME[] = "copy.cypher";
static constexpr char PARQUET_TEMP_DATASET_PATH[] = "dataset/parquet_temp/";

static std::string getTmpTestDir() { return appendKuzuRootPath("test/unittest_temp/"); }

static std::string appendParquetDatasetTempDir(const std::string& dataset) {
return TestHelper::appendKuzuRootPath(TestHelper::PARQUET_TEMP_DATASET_PATH + dataset);
}

static std::string appendKuzuRootPath(const std::string& path) {
return KUZU_ROOT_DIRECTORY + std::string("/") + path;
}
Expand Down
Loading

0 comments on commit 846d8e9

Please sign in to comment.