Skip to content

Commit

Permalink
Merge pull request #2088 from kuzudb/direct-scan-parquet
Browse files Browse the repository at this point in the history
Add direct scan on parquet
  • Loading branch information
andyfengHKU committed Sep 26, 2023
2 parents 5cbb422 + d5fe4d3 commit 58ba29b
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 32 deletions.
43 changes: 30 additions & 13 deletions src/binder/bind/bind_reading_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "parser/query/reading_clause/load_from.h"
#include "parser/query/reading_clause/unwind_clause.h"
#include "processor/operator/persistent/reader/csv/csv_reader.h"
#include "processor/operator/persistent/reader/parquet/parquet_reader.h"

using namespace kuzu::common;
using namespace kuzu::parser;
Expand Down Expand Up @@ -120,23 +121,39 @@ std::unique_ptr<BoundReadingClause> Binder::bindLoadFrom(
auto fileType = bindFileType(filePaths);
auto readerConfig =
std::make_unique<ReaderConfig>(fileType, std::move(filePaths), std::move(csvReaderConfig));
if (readerConfig->fileType != FileType::CSV) {
throw BinderException("Load from non-csv file is not supported.");
}
if (readerConfig->getNumFiles() > 1) {
throw BinderException("Load from multiple files is not supported.");
}
auto csvReader = BufferedCSVReader(
readerConfig->filePaths[0], *readerConfig->csvReaderConfig, 0 /*expectedNumColumns*/);
csvReader.SniffCSV();
auto numColumns = csvReader.getNumColumnsDetected();
expression_vector columns;
auto stringType = LogicalType(LogicalTypeID::STRING);
for (auto i = 0; i < numColumns; ++i) {
auto columnName = "column" + std::to_string(i);
readerConfig->columnNames.push_back(columnName);
readerConfig->columnTypes.push_back(stringType.copy());
columns.push_back(createVariable(columnName, stringType));
switch (fileType) {
case FileType::CSV: {
auto csvReader = BufferedCSVReader(
readerConfig->filePaths[0], *readerConfig->csvReaderConfig, 0 /*expectedNumColumns*/);
csvReader.SniffCSV();
auto numColumns = csvReader.getNumColumnsDetected();
auto stringType = LogicalType(LogicalTypeID::STRING);
for (auto i = 0; i < numColumns; ++i) {
auto columnName = "column" + std::to_string(i);
readerConfig->columnNames.push_back(columnName);
readerConfig->columnTypes.push_back(stringType.copy());
columns.push_back(createVariable(columnName, stringType));
}
} break;
case FileType::PARQUET: {
auto reader = ParquetReader(readerConfig->filePaths[0], memoryManager);
auto state = std::make_unique<processor::ParquetReaderScanState>();
reader.initializeScan(*state, std::vector<uint64_t>{});
for (auto i = 0u; i < reader.getNumColumns(); ++i) {
auto columnName = reader.getColumnName(i);
auto columnType = reader.getColumnType(i);
readerConfig->columnNames.push_back(columnName);
readerConfig->columnTypes.push_back(columnType->copy());
columns.push_back(createVariable(columnName, *columnType));
}
} break;
default:
throw BinderException(StringUtils::string_format(
"Load from {} file is not supported.", FileTypeUtils::toString(fileType)));
}
auto info = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), nullptr, TableType::UNKNOWN);
Expand Down
20 changes: 20 additions & 0 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,25 @@ FileType FileTypeUtils::getFileTypeFromExtension(const std::string& extension) {
return fileType;
}

// Returns the human-readable name of a FileType, e.g. for binder error
// messages such as "Load from {} file is not supported.".
std::string FileTypeUtils::toString(FileType fileType) {
    switch (fileType) {
    case FileType::UNKNOWN: {
        return "UNKNOWN";
    }
    case FileType::CSV: {
        return "CSV";
    }
    case FileType::PARQUET: {
        return "PARQUET";
    }
    case FileType::NPY: {
        return "NPY";
    }
    case FileType::TURTLE: {
        return "TURTLE";
    }
    }
    // Unreachable for valid enum values. Without this, flowing off the end of
    // a value-returning function is undefined behavior if an out-of-range
    // FileType is ever passed; keeping the switch exhaustive (no `default`)
    // preserves the compiler's -Wswitch warning when a new enum value is added.
    return "UNKNOWN";
}

} // namespace common
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct FileTypeUtils {
{".npy", FileType::NPY}, {".ttl", FileType::TURTLE}};

static FileType getFileTypeFromExtension(const std::string& extension);
static std::string toString(FileType fileType);
};

struct ReaderConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,15 @@ class ParquetReader {
public:
ParquetReader(const std::string& filePath, storage::MemoryManager* memoryManager);
~ParquetReader() = default;

void initializeScan(ParquetReaderScanState& state, std::vector<uint64_t> groups_to_read);
bool scanInternal(ParquetReaderScanState& state, common::DataChunk& result);
void scan(ParquetReaderScanState& state, common::DataChunk& result);

inline uint32_t getNumColumns() const { return columnNames.size(); }
inline std::string getColumnName(uint32_t idx) const { return columnNames[idx]; }
inline common::LogicalType* getColumnType(uint32_t idx) const { return columnTypes[idx].get(); }

inline kuzu_parquet::format::FileMetaData* getMetadata() const { return metadata.get(); }

private:
Expand Down Expand Up @@ -74,6 +80,8 @@ class ParquetReader {
private:
std::unique_ptr<common::FileInfo> fileInfo;
std::string filePath;
std::vector<std::string> columnNames;
std::vector<std::unique_ptr<common::LogicalType>> columnTypes;
std::unique_ptr<kuzu_parquet::format::FileMetaData> metadata;
storage::MemoryManager* memoryManager;
};
Expand Down
10 changes: 5 additions & 5 deletions src/include/processor/operator/persistent/reader_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ struct RelParquetReaderFunctionData : public ReaderFunctionData {
std::unique_ptr<parquet::arrow::FileReader> reader = nullptr;
};

struct NodeParquetReaderFunctionData : public ReaderFunctionData {
struct ParquetReaderFunctionData : public ReaderFunctionData {
std::unique_ptr<ParquetReader> reader = nullptr;
std::unique_ptr<ParquetReaderScanState> state = nullptr;

inline bool hasMoreToRead() const override {
return !reinterpret_cast<const NodeParquetReaderFunctionData*>(this)
return !reinterpret_cast<const ParquetReaderFunctionData*>(this)
->state->groupIdxList.empty();
}
};
Expand Down Expand Up @@ -85,7 +85,7 @@ struct ReaderFunctions {
const common::ReaderConfig& config, storage::MemoryManager* memoryManager);
static std::vector<FileBlocksInfo> countRowsInRelParquetFile(
const common::ReaderConfig& config, storage::MemoryManager* memoryManager);
static std::vector<FileBlocksInfo> countRowsInNodeParquetFile(
static std::vector<FileBlocksInfo> countRowsInParquetFile(
const common::ReaderConfig& config, storage::MemoryManager* memoryManager);
static std::vector<FileBlocksInfo> countRowsInNPYFile(
const common::ReaderConfig& config, storage::MemoryManager* memoryManager);
Expand All @@ -98,7 +98,7 @@ struct ReaderFunctions {
const common::ReaderConfig& config, storage::MemoryManager* memoryManager);
static void initRelParquetReadData(ReaderFunctionData& funcData, common::vector_idx_t fileIdx,
const common::ReaderConfig& config, storage::MemoryManager* memoryManager);
static void initNodeParquetReadData(ReaderFunctionData& funcData, common::vector_idx_t fileIdx,
static void initParquetReadData(ReaderFunctionData& funcData, common::vector_idx_t fileIdx,
const common::ReaderConfig& config, storage::MemoryManager* memoryManager);
static void initNPYReadData(ReaderFunctionData& funcData, common::vector_idx_t fileIdx,
const common::ReaderConfig& config, storage::MemoryManager* memoryManager);
Expand All @@ -111,7 +111,7 @@ struct ReaderFunctions {
common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead);
static void readRowsFromRelParquetFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
static void readRowsFromNodeParquetFile(const ReaderFunctionData& funcData,
static void readRowsFromParquetFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
static void readRowsFromNPYFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,18 @@ std::unique_ptr<ColumnReader> ParquetReader::createReader() {
if (metadata->schema[0].num_children == 0) {
throw common::CopyException{"Parquet reader: root schema element has no children"};
}
auto ret = createReaderRecursive(0, 0, 0, nextSchemaIdx, nextFileIdx);
if (ret->getDataType()->getPhysicalType() != common::PhysicalTypeID::STRUCT) {
auto rootReader = createReaderRecursive(0, 0, 0, nextSchemaIdx, nextFileIdx);
if (rootReader->getDataType()->getPhysicalType() != common::PhysicalTypeID::STRUCT) {
throw common::CopyException{"Root element of Parquet file must be a struct"};
}
for (auto& field : common::StructType::getFields(rootReader->getDataType())) {
columnNames.push_back(field->getName());
columnTypes.push_back(field->getType()->copy());
}

assert(nextSchemaIdx == metadata->schema.size() - 1);
assert(metadata->row_groups.empty() || nextFileIdx == metadata->row_groups[0].columns.size());
return ret;
return rootReader;
}

void ParquetReader::prepareRowGroupBuffer(ParquetReaderScanState& state, uint64_t col_idx) {
Expand Down
24 changes: 14 additions & 10 deletions src/processor/operator/persistent/reader_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ count_blocks_func_t ReaderFunctions::getCountBlocksFunc(FileType fileType, Table
case FileType::PARQUET: {
switch (tableType) {
case TableType::NODE:
return countRowsInNodeParquetFile;
case TableType::UNKNOWN:
return countRowsInParquetFile;
case TableType::REL:
return countRowsInRelParquetFile;
default:
Expand Down Expand Up @@ -73,7 +74,8 @@ init_reader_data_func_t ReaderFunctions::getInitDataFunc(FileType fileType, Tabl
case FileType::PARQUET: {
switch (tableType) {
case TableType::NODE:
return initNodeParquetReadData;
case TableType::UNKNOWN:
return initParquetReadData;
case TableType::REL:
return initRelParquetReadData;
default:
Expand Down Expand Up @@ -108,7 +110,8 @@ read_rows_func_t ReaderFunctions::getReadRowsFunc(FileType fileType, common::Tab
case FileType::PARQUET: {
switch (tableType) {
case TableType::NODE:
return readRowsFromNodeParquetFile;
case TableType::UNKNOWN:
return readRowsFromParquetFile;
case TableType::REL:
return readRowsFromRelParquetFile;
default:
Expand Down Expand Up @@ -144,7 +147,8 @@ std::shared_ptr<ReaderFunctionData> ReaderFunctions::getReadFuncData(
case FileType::PARQUET: {
switch (tableType) {
case TableType::NODE:
return std::make_shared<NodeParquetReaderFunctionData>();
case TableType::UNKNOWN:
return std::make_shared<ParquetReaderFunctionData>();
case TableType::REL:
return std::make_shared<RelParquetReaderFunctionData>();
default:
Expand Down Expand Up @@ -231,7 +235,7 @@ std::vector<FileBlocksInfo> ReaderFunctions::countRowsInRelParquetFile(
return fileInfos;
}

std::vector<FileBlocksInfo> ReaderFunctions::countRowsInNodeParquetFile(
std::vector<FileBlocksInfo> ReaderFunctions::countRowsInParquetFile(
const common::ReaderConfig& config, MemoryManager* memoryManager) {
std::vector<FileBlocksInfo> fileInfos;
fileInfos.reserve(config.filePaths.size());
Expand Down Expand Up @@ -302,13 +306,13 @@ void ReaderFunctions::initRelParquetReadData(ReaderFunctionData& funcData, vecto
TableCopyUtils::createParquetReader(config.filePaths[fileIdx], config);
}

void ReaderFunctions::initNodeParquetReadData(ReaderFunctionData& funcData, vector_idx_t fileIdx,
void ReaderFunctions::initParquetReadData(ReaderFunctionData& funcData, vector_idx_t fileIdx,
const common::ReaderConfig& config, MemoryManager* memoryManager) {
assert(fileIdx < config.getNumFiles());
funcData.fileIdx = fileIdx;
reinterpret_cast<NodeParquetReaderFunctionData&>(funcData).reader =
reinterpret_cast<ParquetReaderFunctionData&>(funcData).reader =
std::make_unique<ParquetReader>(config.filePaths[fileIdx], memoryManager);
reinterpret_cast<NodeParquetReaderFunctionData&>(funcData).state =
reinterpret_cast<ParquetReaderFunctionData&>(funcData).state =
std::make_unique<ParquetReaderScanState>();
}

Expand Down Expand Up @@ -362,9 +366,9 @@ void ReaderFunctions::readRowsFromRelParquetFile(const ReaderFunctionData& funct
dataChunkToRead->state->selVector->selectedSize = table->num_rows();
}

void ReaderFunctions::readRowsFromNodeParquetFile(const ReaderFunctionData& functionData,
void ReaderFunctions::readRowsFromParquetFile(const ReaderFunctionData& functionData,
block_idx_t blockIdx, common::DataChunk* dataChunkToRead) {
auto& readerData = reinterpret_cast<const NodeParquetReaderFunctionData&>(functionData);
auto& readerData = reinterpret_cast<const ParquetReaderFunctionData&>(functionData);
if (blockIdx != UINT64_MAX &&
(readerData.state->groupIdxList.empty() || readerData.state->groupIdxList[0] != blockIdx)) {
readerData.reader->initializeScan(*readerData.state, {blockIdx});
Expand Down
30 changes: 29 additions & 1 deletion test/test_files/tinysnb/load_from/load_from.test
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,36 @@
-DATASET CSV tinysnb

--
-CASE LoadFromParquetTest1
-STATEMENT LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-test/node/parquet/types_50k_0.parquet" RETURN * ORDER BY id LIMIT 5;
---- 5
0|73|3.258507|True|1994-01-12|FrPZkcHFuepVxcAiMwyAsRqDlRtQx|[65,25]|[4deQc5]|[[163,237],[28,60,77,31,137],[286,186,249,206]]|{id: 764, name: CwFRaCoEp}
1|17|45.692842|False|2077-04-16|La|[64,41]|[Yq029g79TUAiq9VA,5h5ozRjtfsxbtCeb,2WLnSHVZojagYe,3HsiFD7b7DRk6n]|[[189,84,16],[143,135],[284,182,219,45],[250,143,195,210,244],[31,85]]|{id: 461, name: PmAvlzC0MVN2kr5}
2|30|13.397253|True|2015-01-06|uQJCBEePLuGkoAp|[47,27,57,46]|[INx9T8cF,fQds,GVbSmwovuURxXiRQ6vI3]|[[89,232],[186,224],[278,106,154]]|{id: 275, name: LeJHI4vdgjFDl}
3|4|3.174669|True|2104-03-14|fjyKxMjhXXgCkZmwBACpRrjNHlhrDtkQPl|[58,77,66,48]|[SUFT8NmyhMQ,DaTDnzkotQ2pjvdCN]|[[44]]|{id: 545, name: 0jhUkRv7R8}
4|99|17.608944|True|2089-10-27||[78,93,50,3]|[7Jyqki,Y0FQsTGx,7LqWTypucemvMYm,t5spe07tWSCJ]|[[267,172,283],[74,37],[148,62,96,47],[277,95]]|{id: 460, name: 1e6nIx}
-STATEMENT LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-test/node/parquet/types_50k_0.parquet" RETURN id, column1, column2 ORDER BY column1, id LIMIT 3;
---- 3
20|0|57.579280
40|0|62.634335
57|0|50.232784
-STATEMENT LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-test/node/parquet/types_50k_0.parquet"
WITH id AS a, column4 AS b ORDER BY a LIMIT 1
MATCH (p:person) WHERE p.ID > a
RETURN p.fName, a, b
---- 7
Bob|0|1994-01-12
Carol|0|1994-01-12
Dan|0|1994-01-12
Elizabeth|0|1994-01-12
Farooq|0|1994-01-12
Greg|0|1994-01-12
Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|0|1994-01-12
-STATEMENT LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-test/rdf/taxonomy.ttl" RETURN COUNT(*)
---- error
Binder exception: Load from TURTLE file is not supported.

-CASE LoadFromTest1
-CASE LoadFromCSVTest
-STATEMENT LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/eStudyAt.csv" (HEADER=True) RETURN column0, column1, column2, column3;
---- 3
0|1|2021|[wwAewsdndweusd,wek]
Expand Down

0 comments on commit 58ba29b

Please sign in to comment.