Add direct scan on parquet #2088

Merged: 1 commit, Sep 26, 2023
43 changes: 30 additions & 13 deletions src/binder/bind/bind_reading_clause.cpp
@@ -9,6 +9,7 @@
#include "parser/query/reading_clause/load_from.h"
#include "parser/query/reading_clause/unwind_clause.h"
#include "processor/operator/persistent/reader/csv/csv_reader.h"
#include "processor/operator/persistent/reader/parquet/parquet_reader.h"

using namespace kuzu::common;
using namespace kuzu::parser;
@@ -120,23 +121,39 @@ std::unique_ptr<BoundReadingClause> Binder::bindLoadFrom(
auto fileType = bindFileType(filePaths);
auto readerConfig =
std::make_unique<ReaderConfig>(fileType, std::move(filePaths), std::move(csvReaderConfig));
if (readerConfig->fileType != FileType::CSV) {
throw BinderException("Load from non-csv file is not supported.");
}
if (readerConfig->getNumFiles() > 1) {
throw BinderException("Load from multiple files is not supported.");
}
auto csvReader = BufferedCSVReader(
readerConfig->filePaths[0], *readerConfig->csvReaderConfig, 0 /*expectedNumColumns*/);
csvReader.SniffCSV();
auto numColumns = csvReader.getNumColumnsDetected();
expression_vector columns;
auto stringType = LogicalType(LogicalTypeID::STRING);
for (auto i = 0; i < numColumns; ++i) {
auto columnName = "column" + std::to_string(i);
readerConfig->columnNames.push_back(columnName);
readerConfig->columnTypes.push_back(stringType.copy());
columns.push_back(createVariable(columnName, stringType));
switch (fileType) {
case FileType::CSV: {
auto csvReader = BufferedCSVReader(
readerConfig->filePaths[0], *readerConfig->csvReaderConfig, 0 /*expectedNumColumns*/);
csvReader.SniffCSV();
auto numColumns = csvReader.getNumColumnsDetected();
auto stringType = LogicalType(LogicalTypeID::STRING);
for (auto i = 0; i < numColumns; ++i) {
auto columnName = "column" + std::to_string(i);
readerConfig->columnNames.push_back(columnName);
readerConfig->columnTypes.push_back(stringType.copy());
columns.push_back(createVariable(columnName, stringType));
}
} break;
case FileType::PARQUET: {
auto reader = ParquetReader(readerConfig->filePaths[0], memoryManager);
auto state = std::make_unique<processor::ParquetReaderScanState>();
reader.initializeScan(*state, std::vector<uint64_t>{});
for (auto i = 0u; i < reader.getNumColumns(); ++i) {
auto columnName = reader.getColumnName(i);
auto columnType = reader.getColumnType(i);
readerConfig->columnNames.push_back(columnName);
readerConfig->columnTypes.push_back(columnType->copy());
columns.push_back(createVariable(columnName, *columnType));
}
} break;
default:
throw BinderException(StringUtils::string_format(
"Load from {} file is not supported.", FileTypeUtils::toString(fileType)));
}
auto info = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), nullptr, TableType::UNKNOWN);
20 changes: 20 additions & 0 deletions src/common/copier_config/copier_config.cpp
@@ -16,5 +16,25 @@
return fileType;
}

std::string FileTypeUtils::toString(FileType fileType) {
    switch (fileType) {
    case FileType::UNKNOWN: {
        return "UNKNOWN";
    }
    case FileType::CSV: {
        return "CSV";
    }
    case FileType::PARQUET: {
        return "PARQUET";
    }
    case FileType::NPY: {
        return "NPY";
    }
    case FileType::TURTLE: {
        return "TURTLE";
    }
    }
}

} // namespace common
} // namespace kuzu
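
The new toString feeds the binder's fallback error in bind_reading_clause.cpp above, and the resulting message is asserted verbatim by the new TURTLE test at the bottom of this diff. A sketch of that round trip, just placing the pieces from this PR side by side:

    throw BinderException(StringUtils::string_format(
        "Load from {} file is not supported.", FileTypeUtils::toString(FileType::TURTLE)));
    // Surfaces to the user as: "Binder exception: Load from TURTLE file is not supported."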
1 change: 1 addition & 0 deletions src/include/common/copier_config/copier_config.h
@@ -45,6 +45,7 @@ struct FileTypeUtils {
{".npy", FileType::NPY}, {".ttl", FileType::TURTLE}};

static FileType getFileTypeFromExtension(const std::string& extension);
static std::string toString(FileType fileType);
};

struct ReaderConfig {
@@ -40,9 +40,15 @@ class ParquetReader {
public:
ParquetReader(const std::string& filePath, storage::MemoryManager* memoryManager);
~ParquetReader() = default;

void initializeScan(ParquetReaderScanState& state, std::vector<uint64_t> groups_to_read);
bool scanInternal(ParquetReaderScanState& state, common::DataChunk& result);
void scan(ParquetReaderScanState& state, common::DataChunk& result);

inline uint32_t getNumColumns() const { return columnNames.size(); }
inline std::string getColumnName(uint32_t idx) const { return columnNames[idx]; }
inline common::LogicalType* getColumnType(uint32_t idx) const { return columnTypes[idx].get(); }

inline kuzu_parquet::format::FileMetaData* getMetadata() const { return metadata.get(); }

private:
Expand Down Expand Up @@ -74,6 +80,8 @@ class ParquetReader {
private:
std::unique_ptr<common::FileInfo> fileInfo;
std::string filePath;
std::vector<std::string> columnNames;
std::vector<std::unique_ptr<common::LogicalType>> columnTypes;
std::unique_ptr<kuzu_parquet::format::FileMetaData> metadata;
storage::MemoryManager* memoryManager;
};
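
Putting the new public surface together, a reviewer's sketch of the intended call sequence, based only on the declarations above; the DataChunk wiring is assumed rather than taken from this PR (registerColumn and makeChunkFor are hypothetical stand-ins for caller-side setup):

    ParquetReader reader(filePath, memoryManager); // filePath, memoryManager assumed in scope
    for (auto i = 0u; i < reader.getNumColumns(); ++i) {
        // Names and types come from the file's own schema (populated in createReader below).
        registerColumn(reader.getColumnName(i), reader.getColumnType(i)); // hypothetical helper
    }
    auto state = std::make_unique<ParquetReaderScanState>();
    reader.initializeScan(*state, {0}); // queue row group 0
    auto chunk = makeChunkFor(reader);  // hypothetical: one value vector per column
    reader.scan(*state, chunk);         // fills one batch per call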
10 changes: 5 additions & 5 deletions src/include/processor/operator/persistent/reader_functions.h
@@ -32,12 +32,12 @@ struct RelParquetReaderFunctionData : public ReaderFunctionData {
std::unique_ptr<parquet::arrow::FileReader> reader = nullptr;
};

struct NodeParquetReaderFunctionData : public ReaderFunctionData {
struct ParquetReaderFunctionData : public ReaderFunctionData {
std::unique_ptr<ParquetReader> reader = nullptr;
std::unique_ptr<ParquetReaderScanState> state = nullptr;

inline bool hasMoreToRead() const override {
return !reinterpret_cast<const NodeParquetReaderFunctionData*>(this)
return !reinterpret_cast<const ParquetReaderFunctionData*>(this)
->state->groupIdxList.empty();
}
};
@@ -85,7 +85,7 @@ struct ReaderFunctions {
const common::ReaderConfig& config, storage::MemoryManager* memoryManager);
static std::vector<FileBlocksInfo> countRowsInRelParquetFile(
const common::ReaderConfig& config, storage::MemoryManager* memoryManager);
static std::vector<FileBlocksInfo> countRowsInNodeParquetFile(
static std::vector<FileBlocksInfo> countRowsInParquetFile(
const common::ReaderConfig& config, storage::MemoryManager* memoryManager);
static std::vector<FileBlocksInfo> countRowsInNPYFile(
const common::ReaderConfig& config, storage::MemoryManager* memoryManager);
@@ -98,7 +98,7 @@
const common::ReaderConfig& config, storage::MemoryManager* memoryManager);
static void initRelParquetReadData(ReaderFunctionData& funcData, common::vector_idx_t fileIdx,
const common::ReaderConfig& config, storage::MemoryManager* memoryManager);
static void initNodeParquetReadData(ReaderFunctionData& funcData, common::vector_idx_t fileIdx,
static void initParquetReadData(ReaderFunctionData& funcData, common::vector_idx_t fileIdx,
const common::ReaderConfig& config, storage::MemoryManager* memoryManager);
static void initNPYReadData(ReaderFunctionData& funcData, common::vector_idx_t fileIdx,
const common::ReaderConfig& config, storage::MemoryManager* memoryManager);
@@ -111,7 +111,7 @@
common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead);
static void readRowsFromRelParquetFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
static void readRowsFromNodeParquetFile(const ReaderFunctionData& funcData,
static void readRowsFromParquetFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
static void readRowsFromNPYFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
@@ -302,13 +302,18 @@ std::unique_ptr<ColumnReader> ParquetReader::createReader() {
if (metadata->schema[0].num_children == 0) {
throw common::CopyException{"Parquet reader: root schema element has no children"};
}
auto ret = createReaderRecursive(0, 0, 0, nextSchemaIdx, nextFileIdx);
if (ret->getDataType()->getPhysicalType() != common::PhysicalTypeID::STRUCT) {
auto rootReader = createReaderRecursive(0, 0, 0, nextSchemaIdx, nextFileIdx);
if (rootReader->getDataType()->getPhysicalType() != common::PhysicalTypeID::STRUCT) {
throw common::CopyException{"Root element of Parquet file must be a struct"};
}
for (auto& field : common::StructType::getFields(rootReader->getDataType())) {
columnNames.push_back(field->getName());
columnTypes.push_back(field->getType()->copy());
}

assert(nextSchemaIdx == metadata->schema.size() - 1);
assert(metadata->row_groups.empty() || nextFileIdx == metadata->row_groups[0].columns.size());
return ret;
return rootReader;
}

void ParquetReader::prepareRowGroupBuffer(ParquetReaderScanState& state, uint64_t col_idx) {
24 changes: 14 additions & 10 deletions src/processor/operator/persistent/reader_functions.cpp
@@ -38,7 +38,8 @@ count_blocks_func_t ReaderFunctions::getCountBlocksFunc(FileType fileType, Table
case FileType::PARQUET: {
switch (tableType) {
case TableType::NODE:
return countRowsInNodeParquetFile;
case TableType::UNKNOWN:
return countRowsInParquetFile;
case TableType::REL:
return countRowsInRelParquetFile;
default:
@@ -73,7 +74,8 @@ init_reader_data_func_t ReaderFunctions::getInitDataFunc(FileType fileType, Tabl
case FileType::PARQUET: {
switch (tableType) {
case TableType::NODE:
return initNodeParquetReadData;
case TableType::UNKNOWN:
return initParquetReadData;
case TableType::REL:
return initRelParquetReadData;
default:
@@ -108,7 +110,8 @@ read_rows_func_t ReaderFunctions::getReadRowsFunc(FileType fileType, common::Tab
case FileType::PARQUET: {
switch (tableType) {
case TableType::NODE:
return readRowsFromNodeParquetFile;
case TableType::UNKNOWN:
return readRowsFromParquetFile;
case TableType::REL:
return readRowsFromRelParquetFile;
default:
@@ -144,7 +147,8 @@ std::shared_ptr<ReaderFunctionData> ReaderFunctions::getReadFuncData(
case FileType::PARQUET: {
switch (tableType) {
case TableType::NODE:
return std::make_shared<NodeParquetReaderFunctionData>();
case TableType::UNKNOWN:
return std::make_shared<ParquetReaderFunctionData>();
case TableType::REL:
return std::make_shared<RelParquetReaderFunctionData>();
default:
@@ -231,7 +235,7 @@ std::vector<FileBlocksInfo> ReaderFunctions::countRowsInRelParquetFile(
return fileInfos;
}

std::vector<FileBlocksInfo> ReaderFunctions::countRowsInNodeParquetFile(
std::vector<FileBlocksInfo> ReaderFunctions::countRowsInParquetFile(
const common::ReaderConfig& config, MemoryManager* memoryManager) {
std::vector<FileBlocksInfo> fileInfos;
fileInfos.reserve(config.filePaths.size());
@@ -302,13 +306,13 @@ void ReaderFunctions::initRelParquetReadData(ReaderFunctionData& funcData, vecto
TableCopyUtils::createParquetReader(config.filePaths[fileIdx], config);
}

void ReaderFunctions::initNodeParquetReadData(ReaderFunctionData& funcData, vector_idx_t fileIdx,
void ReaderFunctions::initParquetReadData(ReaderFunctionData& funcData, vector_idx_t fileIdx,
const common::ReaderConfig& config, MemoryManager* memoryManager) {
assert(fileIdx < config.getNumFiles());
funcData.fileIdx = fileIdx;
reinterpret_cast<NodeParquetReaderFunctionData&>(funcData).reader =
reinterpret_cast<ParquetReaderFunctionData&>(funcData).reader =
std::make_unique<ParquetReader>(config.filePaths[fileIdx], memoryManager);
reinterpret_cast<NodeParquetReaderFunctionData&>(funcData).state =
reinterpret_cast<ParquetReaderFunctionData&>(funcData).state =
std::make_unique<ParquetReaderScanState>();
}

@@ -362,9 +366,9 @@ void ReaderFunctions::readRowsFromRelParquetFile(const ReaderFunctionData& funct
dataChunkToRead->state->selVector->selectedSize = table->num_rows();
}

void ReaderFunctions::readRowsFromNodeParquetFile(const ReaderFunctionData& functionData,
void ReaderFunctions::readRowsFromParquetFile(const ReaderFunctionData& functionData,
block_idx_t blockIdx, common::DataChunk* dataChunkToRead) {
auto& readerData = reinterpret_cast<const NodeParquetReaderFunctionData&>(functionData);
auto& readerData = reinterpret_cast<const ParquetReaderFunctionData&>(functionData);
if (blockIdx != UINT64_MAX &&
(readerData.state->groupIdxList.empty() || readerData.state->groupIdxList[0] != blockIdx)) {
readerData.reader->initializeScan(*readerData.state, {blockIdx});
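
For context, the guard above means consecutive reads of the same row group reuse the prepared scan state. A minimal sketch of the pattern, assuming (per the ParquetReader interface earlier in this diff) that a scan call follows the truncated lines of this hunk:

    void readRowGroup(ParquetReader& reader, ParquetReaderScanState& state,
        common::block_idx_t blockIdx, common::DataChunk& chunk) {
        // Re-initialize only when the queued row group differs from the requested one,
        // so repeated reads of the same group keep the prepared column readers.
        if (state.groupIdxList.empty() || state.groupIdxList[0] != blockIdx) {
            reader.initializeScan(state, {blockIdx});
        }
        reader.scan(state, chunk);
    }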
30 changes: 29 additions & 1 deletion test/test_files/tinysnb/load_from/load_from.test
@@ -2,8 +2,36 @@
-DATASET CSV tinysnb

--
-CASE LoadFromParquetTest1
-STATEMENT LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-test/node/parquet/types_50k_0.parquet" RETURN * ORDER BY id LIMIT 5;
---- 5
0|73|3.258507|True|1994-01-12|FrPZkcHFuepVxcAiMwyAsRqDlRtQx|[65,25]|[4deQc5]|[[163,237],[28,60,77,31,137],[286,186,249,206]]|{id: 764, name: CwFRaCoEp}
1|17|45.692842|False|2077-04-16|La|[64,41]|[Yq029g79TUAiq9VA,5h5ozRjtfsxbtCeb,2WLnSHVZojagYe,3HsiFD7b7DRk6n]|[[189,84,16],[143,135],[284,182,219,45],[250,143,195,210,244],[31,85]]|{id: 461, name: PmAvlzC0MVN2kr5}
2|30|13.397253|True|2015-01-06|uQJCBEePLuGkoAp|[47,27,57,46]|[INx9T8cF,fQds,GVbSmwovuURxXiRQ6vI3]|[[89,232],[186,224],[278,106,154]]|{id: 275, name: LeJHI4vdgjFDl}
3|4|3.174669|True|2104-03-14|fjyKxMjhXXgCkZmwBACpRrjNHlhrDtkQPl|[58,77,66,48]|[SUFT8NmyhMQ,DaTDnzkotQ2pjvdCN]|[[44]]|{id: 545, name: 0jhUkRv7R8}
4|99|17.608944|True|2089-10-27||[78,93,50,3]|[7Jyqki,Y0FQsTGx,7LqWTypucemvMYm,t5spe07tWSCJ]|[[267,172,283],[74,37],[148,62,96,47],[277,95]]|{id: 460, name: 1e6nIx}
-STATEMENT LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-test/node/parquet/types_50k_0.parquet" RETURN id, column1, column2 ORDER BY column1, id LIMIT 3;
---- 3
20|0|57.579280
40|0|62.634335
57|0|50.232784
-STATEMENT LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-test/node/parquet/types_50k_0.parquet"
WITH id AS a, column4 AS b ORDER BY a LIMIT 1
MATCH (p:person) WHERE p.ID > a
RETURN p.fName, a, b
---- 7
Bob|0|1994-01-12
Carol|0|1994-01-12
Dan|0|1994-01-12
Elizabeth|0|1994-01-12
Farooq|0|1994-01-12
Greg|0|1994-01-12
Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|0|1994-01-12
-STATEMENT LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-test/rdf/taxonomy.ttl" RETURN COUNT(*)
---- error
Binder exception: Load from TURTLE file is not supported.

-CASE LoadFromTest1
-CASE LoadFromCSVTest
-STATEMENT LOAD FROM "${KUZU_ROOT_DIRECTORY}/dataset/tinysnb/eStudyAt.csv" (HEADER=True) RETURN column0, column1, column2, column3;
---- 3
0|1|2021|[wwAewsdndweusd,wek]