Add direct scan on parquet
andyfengHKU committed Sep 26, 2023
1 parent 4628776 commit bf13170
Showing 7 changed files with 94 additions and 17 deletions.
40 changes: 27 additions & 13 deletions src/binder/bind/bind_reading_clause.cpp
@@ -9,6 +9,7 @@
#include "parser/query/reading_clause/load_from.h"
#include "parser/query/reading_clause/unwind_clause.h"
#include "processor/operator/persistent/reader/csv/csv_reader.h"
#include "processor/operator/persistent/reader/parquet/parquet_reader.h"

using namespace kuzu::common;
using namespace kuzu::parser;
@@ -120,23 +121,36 @@ std::unique_ptr<BoundReadingClause> Binder::bindLoadFrom(
auto fileType = bindFileType(filePaths);
auto readerConfig =
std::make_unique<ReaderConfig>(fileType, std::move(filePaths), std::move(csvReaderConfig));
if (readerConfig->fileType != FileType::CSV) {
throw BinderException("Load from non-csv file is not supported.");
}
if (readerConfig->getNumFiles() > 1) {
throw BinderException("Load from multiple files is not supported.");
}
auto csvReader = BufferedCSVReader(
readerConfig->filePaths[0], *readerConfig->csvReaderConfig, 0 /*expectedNumColumns*/);
csvReader.SniffCSV();
auto numColumns = csvReader.getNumColumnsDetected();
expression_vector columns;
auto stringType = LogicalType(LogicalTypeID::STRING);
for (auto i = 0; i < numColumns; ++i) {
auto columnName = "column" + std::to_string(i);
readerConfig->columnNames.push_back(columnName);
readerConfig->columnTypes.push_back(stringType.copy());
columns.push_back(createVariable(columnName, stringType));
if (fileType == FileType::CSV) {
auto csvReader = BufferedCSVReader(
readerConfig->filePaths[0], *readerConfig->csvReaderConfig, 0 /*expectedNumColumns*/);
csvReader.SniffCSV();
auto numColumns = csvReader.getNumColumnsDetected();
auto stringType = LogicalType(LogicalTypeID::STRING);
for (auto i = 0; i < numColumns; ++i) {
auto columnName = "column" + std::to_string(i);
readerConfig->columnNames.push_back(columnName);
readerConfig->columnTypes.push_back(stringType.copy());
columns.push_back(createVariable(columnName, stringType));
}
} else if (fileType == FileType::PARQUET) {
auto reader = ParquetReader(readerConfig->filePaths[0], memoryManager);
auto state = std::make_unique<processor::ParquetReaderScanState>();
reader.initializeScan(*state, std::vector<uint64_t>{});
for (auto i = 0u; i < reader.getNumColumns(); ++i) {
auto columnName = reader.getColumnName(i);
auto columnType = reader.getColumnType(i);
readerConfig->columnNames.push_back(columnName);
readerConfig->columnTypes.push_back(columnType->copy());
columns.push_back(createVariable(columnName, *columnType));
}
} else {
throw BinderException(StringUtils::string_format(
"Load from {} file is not supported.", FileTypeUtils::toString(fileType)));

}
auto info = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), nullptr, TableType::UNKNOWN);
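The Parquet branch above is the core of this change: unlike the CSV path, which binds every column as STRING under generated names (column0, column1, ...), the Parquet path takes column names and logical types from the file's own schema. A condensed, comment-annotated sketch of that branch follows; it restates the hunk above for readability and assumes the surrounding Binder context (readerConfig, memoryManager, columns, createVariable) rather than adding any code beyond the commit.

// Parquet path of Binder::bindLoadFrom, annotated (illustration only).
auto reader = ParquetReader(readerConfig->filePaths[0], memoryManager);
auto state = std::make_unique<processor::ParquetReaderScanState>();
// Mirrors the binder: an empty row-group list is passed because only the schema is needed here.
reader.initializeScan(*state, std::vector<uint64_t>{});
for (auto i = 0u; i < reader.getNumColumns(); ++i) {
    auto columnName = reader.getColumnName(i);    // column name from the Parquet schema
    auto columnType = reader.getColumnType(i);    // kuzu LogicalType mapped from the Parquet type
    readerConfig->columnNames.push_back(columnName);
    readerConfig->columnTypes.push_back(columnType->copy());
    columns.push_back(createVariable(columnName, *columnType));
}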
20 changes: 20 additions & 0 deletions src/common/copier_config/copier_config.cpp
@@ -16,5 +16,25 @@ FileType FileTypeUtils::getFileTypeFromExtension(const std::string& extension) {
return fileType;
}

std::string FileTypeUtils::toString(FileType fileType) {
switch (fileType) {

case FileType::UNKNOWN: {
return "UNKNOWN";

}
case FileType::CSV: {
return "CSV";

}
case FileType::PARQUET: {
return "PARQUET";

}
case FileType::NPY: {
return "NPY";

}
case FileType::TURTLE: {
return "TURTLE";

}
}
}

} // namespace common
} // namespace kuzu
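As a usage note (an illustration, not part of the diff), the new helper is what the binder's error path above relies on to name the unsupported format:

// Hypothetical example of FileTypeUtils::toString in the binder's error path.
auto fileType = FileType::NPY;
throw BinderException(StringUtils::string_format(
    "Load from {} file is not supported.", FileTypeUtils::toString(fileType)));
// message: "Load from NPY file is not supported."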
1 change: 1 addition & 0 deletions src/include/common/copier_config/copier_config.h
@@ -45,6 +45,7 @@ struct FileTypeUtils {
{".npy", FileType::NPY}, {".ttl", FileType::TURTLE}};

static FileType getFileTypeFromExtension(const std::string& extension);
static std::string toString(FileType fileType);
};

struct ReaderConfig {
src/include/processor/operator/persistent/reader/parquet/parquet_reader.h
@@ -40,9 +40,15 @@ class ParquetReader {
public:
ParquetReader(const std::string& filePath, storage::MemoryManager* memoryManager);
~ParquetReader() = default;

void initializeScan(ParquetReaderScanState& state, std::vector<uint64_t> groups_to_read);
bool scanInternal(ParquetReaderScanState& state, common::DataChunk& result);
void scan(ParquetReaderScanState& state, common::DataChunk& result);

inline uint32_t getNumColumns() const { return columnNames.size(); }
inline std::string getColumnName(uint32_t idx) const { return columnNames[idx]; }
inline common::LogicalType* getColumnType(uint32_t idx) const { return columnTypes[idx].get(); }

inline kuzu_parquet::format::FileMetaData* getMetadata() const { return metadata.get(); }

private:
@@ -74,6 +80,8 @@
private:
std::unique_ptr<common::FileInfo> fileInfo;
std::string filePath;
std::vector<std::string> columnNames;
std::vector<std::unique_ptr<common::LogicalType>> columnTypes;
std::unique_ptr<kuzu_parquet::format::FileMetaData> metadata;
storage::MemoryManager* memoryManager;
};
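A minimal sketch of how a caller might combine the newly exposed schema accessors with the existing getMetadata(); the metadata fields used below (schema, row_groups) are the ones referenced in createReader() in the next file, and filePath/memoryManager are assumed to be in scope:

// Sketch: schema and metadata introspection via the new public API (illustration only).
auto reader = ParquetReader(filePath, memoryManager);
auto state = std::make_unique<ParquetReaderScanState>();
reader.initializeScan(*state, std::vector<uint64_t>{}); // as in the binder change above
auto numColumns = reader.getNumColumns();               // backed by columnNames, filled in createReader()
auto* meta = reader.getMetadata();
auto numRowGroups = meta->row_groups.size();
auto numSchemaElements = meta->schema.size();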
src/processor/operator/persistent/reader/parquet/parquet_reader.cpp
@@ -302,13 +302,18 @@ std::unique_ptr<ColumnReader> ParquetReader::createReader() {
if (metadata->schema[0].num_children == 0) {
throw common::CopyException{"Parquet reader: root schema element has no children"};
}
auto ret = createReaderRecursive(0, 0, 0, nextSchemaIdx, nextFileIdx);
if (ret->getDataType()->getPhysicalType() != common::PhysicalTypeID::STRUCT) {
auto rootReader = createReaderRecursive(0, 0, 0, nextSchemaIdx, nextFileIdx);
if (rootReader->getDataType()->getPhysicalType() != common::PhysicalTypeID::STRUCT) {
throw common::CopyException{"Root element of Parquet file must be a struct"};
}
for (auto& field : common::StructType::getFields(rootReader->getDataType())) {
columnNames.push_back(field->getName());
columnTypes.push_back(field->getType()->copy());
}

assert(nextSchemaIdx == metadata->schema.size() - 1);
assert(metadata->row_groups.empty() || nextFileIdx == metadata->row_groups[0].columns.size());
return ret;
return rootReader;
}

void ParquetReader::prepareRowGroupBuffer(ParquetReaderScanState& state, uint64_t col_idx) {
4 changes: 4 additions & 0 deletions src/processor/operator/persistent/reader_functions.cpp
@@ -38,6 +38,7 @@ count_blocks_func_t ReaderFunctions::getCountBlocksFunc(FileType fileType, Table
case FileType::PARQUET: {
switch (tableType) {
case TableType::NODE:
case TableType::UNKNOWN:
return countRowsInNodeParquetFile;
case TableType::REL:
return countRowsInRelParquetFile;
@@ -73,6 +74,7 @@ init_reader_data_func_t ReaderFunctions::getInitDataFunc(FileType fileType, Tabl
case FileType::PARQUET: {
switch (tableType) {
case TableType::NODE:
case TableType::UNKNOWN:
return initNodeParquetReadData;
case TableType::REL:
return initRelParquetReadData;
@@ -108,6 +110,7 @@ read_rows_func_t ReaderFunctions::getReadRowsFunc(FileType fileType, common::Tab
case FileType::PARQUET: {
switch (tableType) {
case TableType::NODE:
case TableType::UNKNOWN:
return readRowsFromNodeParquetFile;
case TableType::REL:
return readRowsFromRelParquetFile;
@@ -144,6 +147,7 @@ std::shared_ptr<ReaderFunctionData> ReaderFunctions::getReadFuncData(
case FileType::PARQUET: {
switch (tableType) {
case TableType::NODE:
case TableType::UNKNOWN:
return std::make_shared<NodeParquetReaderFunctionData>();
case TableType::REL:
return std::make_shared<RelParquetReaderFunctionData>();
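Tying this back to the binder: LOAD FROM binds its scan with TableType::UNKNOWN, so the lookups above must accept UNKNOWN for Parquet. A hypothetical call site, assuming these are the static lookup helpers their definitions suggest:

// With this change, a LOAD FROM Parquet scan (TableType::UNKNOWN) resolves to the same
// read path as a node-table Parquet copy instead of falling through as unsupported.
auto countFunc = ReaderFunctions::getCountBlocksFunc(FileType::PARQUET, TableType::UNKNOWN);
auto initFunc = ReaderFunctions::getInitDataFunc(FileType::PARQUET, TableType::UNKNOWN);
auto readFunc = ReaderFunctions::getReadRowsFunc(FileType::PARQUET, TableType::UNKNOWN);
// These now return countRowsInNodeParquetFile, initNodeParquetReadData, and
// readRowsFromNodeParquetFile respectively.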
27 changes: 26 additions & 1 deletion test/test_files/tinysnb/load_from/load_from.test
@@ -2,8 +2,33 @@
-DATASET CSV tinysnb

--
-CASE LoadFromParquetTest1
-STATEMENT LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-test/node/parquet/types_50k_0.parquet" RETURN * ORDER BY id LIMIT 5;
---- 5
0|73|3.258507|True|1994-01-12|FrPZkcHFuepVxcAiMwyAsRqDlRtQx|[65,25]|[4deQc5]|[[163,237],[28,60,77,31,137],[286,186,249,206]]|{id: 764, name: CwFRaCoEp}
1|17|45.692842|False|2077-04-16|La|[64,41]|[Yq029g79TUAiq9VA,5h5ozRjtfsxbtCeb,2WLnSHVZojagYe,3HsiFD7b7DRk6n]|[[189,84,16],[143,135],[284,182,219,45],[250,143,195,210,244],[31,85]]|{id: 461, name: PmAvlzC0MVN2kr5}
2|30|13.397253|True|2015-01-06|uQJCBEePLuGkoAp|[47,27,57,46]|[INx9T8cF,fQds,GVbSmwovuURxXiRQ6vI3]|[[89,232],[186,224],[278,106,154]]|{id: 275, name: LeJHI4vdgjFDl}
3|4|3.174669|True|2104-03-14|fjyKxMjhXXgCkZmwBACpRrjNHlhrDtkQPl|[58,77,66,48]|[SUFT8NmyhMQ,DaTDnzkotQ2pjvdCN]|[[44]]|{id: 545, name: 0jhUkRv7R8}
4|99|17.608944|True|2089-10-27||[78,93,50,3]|[7Jyqki,Y0FQsTGx,7LqWTypucemvMYm,t5spe07tWSCJ]|[[267,172,283],[74,37],[148,62,96,47],[277,95]]|{id: 460, name: 1e6nIx}
-STATEMENT LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-test/node/parquet/types_50k_0.parquet" RETURN id, column1, column2 ORDER BY column1, id LIMIT 3;
---- 3
20|0|57.579280
40|0|62.634335
57|0|50.232784
-STATEMENT LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-test/node/parquet/types_50k_0.parquet"
WITH id AS a, column4 AS b ORDER BY a LIMIT 1
MATCH (p:person) WHERE p.ID > a
RETURN p.fName, a, b
---- 7
Bob|0|1994-01-12
Carol|0|1994-01-12
Dan|0|1994-01-12
Elizabeth|0|1994-01-12
Farooq|0|1994-01-12
Greg|0|1994-01-12
Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|0|1994-01-12

-CASE LoadFromTest1
-CASE LoadFromCSVTest
-STATEMENT LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/eStudyAt.csv" (HEADER=True) RETURN column0, column1, column2, column3;
---- 3
0|1|2021|[wwAewsdndweusd,wek]
