Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to copy from multiple parquet files #1377

Merged
merged 1 commit into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dataset/copy-test/node/parquet/copy.cypher
Original file line number Diff line number Diff line change
@@ -1 +1 @@
COPY tableOfTypes FROM "dataset/copy-test/node/parquet/types_50k.parquet" (HEADER=true);
COPY tableOfTypes FROM "dataset/copy-test/node/parquet/types_50k*.parquet" (HEADER=true);
Binary file removed dataset/copy-test/node/parquet/types_50k.parquet
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
3 changes: 0 additions & 3 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,9 @@ CopyDescription::FileType Binder::bindFileType(std::vector<std::string> filePath
// types is not supported.
auto fileName = filePaths[0];
auto csvSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::CSV);
auto arrowSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::ARROW);
auto parquetSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::PARQUET);
if (fileName.ends_with(csvSuffix)) {
return CopyDescription::FileType::CSV;
} else if (fileName.ends_with(arrowSuffix)) {
return CopyDescription::FileType::ARROW;
} else if (fileName.ends_with(parquetSuffix)) {
return CopyDescription::FileType::PARQUET;
} else {
Expand Down
3 changes: 0 additions & 3 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ std::string CopyDescription::getFileTypeName(FileType fileType) {
case FileType::CSV: {
return "csv";
}
case FileType::ARROW: {
return "arrow";
}
case FileType::PARQUET: {
return "parquet";
}
Expand Down
1 change: 1 addition & 0 deletions src/common/file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ std::vector<std::string> FileUtils::globFilePath(const std::string& path) {
for (auto i = 0u; i < globResult.gl_pathc; ++i) {
result.emplace_back(globResult.gl_pathv[i]);
}
globfree(&globResult);
return result;
}

Expand Down
2 changes: 1 addition & 1 deletion src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ struct CSVReaderConfig {
};

struct CopyDescription {
enum class FileType { CSV, ARROW, PARQUET };
enum class FileType : uint8_t { CSV = 0, PARQUET = 1 };

CopyDescription(const std::vector<std::string>& filePaths, CSVReaderConfig csvReaderConfig,
FileType fileType);
Expand Down
12 changes: 7 additions & 5 deletions src/include/storage/copy_arrow/copy_node_arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@ class CopyNodeArrow : public CopyStructuresArrow {
arrow::Status populateColumns();

template<typename T>
arrow::Status populateColumnsFromFiles(std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename T>
arrow::Status populateColumnsFromArrow(std::unique_ptr<HashIndexBuilder<T>>& pkIndex);
arrow::Status populateColumnsFromCSV(std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename T>
arrow::Status populateColumnsFromParquet(std::unique_ptr<HashIndexBuilder<T>>& pkIndex);
Expand All @@ -58,7 +55,12 @@ class CopyNodeArrow : public CopyStructuresArrow {
const std::vector<std::shared_ptr<T2>>& batchColumns, std::string filePath);

template<typename T>
arrow::Status assignCopyTasks(std::shared_ptr<arrow::csv::StreamingReader>& csvStreamingReader,
arrow::Status assignCopyCSVTasks(arrow::csv::StreamingReader* csvStreamingReader,
common::offset_t startOffset, std::string filePath,
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename T>
arrow::Status assignCopyParquetTasks(parquet::arrow::FileReader* parquetReader,
common::offset_t startOffset, std::string filePath,
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

Expand Down
2 changes: 0 additions & 2 deletions src/include/storage/copy_arrow/copy_rel_arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ class CopyRelArrow : public CopyStructuresArrow {

arrow::Status populateFromCSV(PopulateTaskType populateTaskType);

arrow::Status populateFromArrow(PopulateTaskType populateTaskType);

arrow::Status populateFromParquet(PopulateTaskType populateTaskType);

void populateAdjColumnsAndCountRelsInAdjLists();
Expand Down
4 changes: 1 addition & 3 deletions src/include/storage/copy_arrow/copy_structures_arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ class CopyStructuresArrow {

arrow::Status countNumLinesCSV(const std::vector<std::string>& filePaths);

arrow::Status countNumLinesArrow(std::string const& filePath);

arrow::Status countNumLinesParquet(std::string const& filePath);
arrow::Status countNumLinesParquet(const std::vector<std::string>& filePaths);

arrow::Status initCSVReaderAndCheckStatus(
std::shared_ptr<arrow::csv::StreamingReader>& csv_streaming_reader,
Expand Down
110 changes: 42 additions & 68 deletions src/storage/copy_arrow/copy_node_arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ arrow::Status CopyNodeArrow::populateColumns() {
arrow::Status status;
switch (copyDescription.fileType) {
case CopyDescription::FileType::CSV:
status = populateColumnsFromFiles<T>(pkIndex);
break;
case CopyDescription::FileType::ARROW:
status = populateColumnsFromArrow<T>(pkIndex);
status = populateColumnsFromCSV<T>(pkIndex);
break;
case CopyDescription::FileType::PARQUET:
status = populateColumnsFromParquet<T>(pkIndex);
Expand All @@ -77,75 +74,27 @@ arrow::Status CopyNodeArrow::populateColumns() {
}

template<typename T>
arrow::Status CopyNodeArrow::populateColumnsFromFiles(
std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
arrow::Status CopyNodeArrow::populateColumnsFromCSV(std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
for (auto& filePath : copyDescription.filePaths) {
offset_t startOffset = fileBlockInfos.at(filePath).startOffset;
std::shared_ptr<arrow::csv::StreamingReader> csvStreamingReader;
auto status = initCSVReaderAndCheckStatus(csvStreamingReader, filePath);
status = assignCopyTasks<T>(csvStreamingReader, startOffset, filePath, pkIndex);
status = assignCopyCSVTasks<T>(
csvStreamingReader.get(), fileBlockInfos.at(filePath).startOffset, filePath, pkIndex);
throwCopyExceptionIfNotOK(status);
}
return arrow::Status::OK();
}

template<typename T>
arrow::Status CopyNodeArrow::populateColumnsFromArrow(
std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
std::shared_ptr<arrow::ipc::RecordBatchFileReader> ipc_reader;
auto status = initArrowReaderAndCheckStatus(ipc_reader, copyDescription.filePaths[0]);
std::shared_ptr<arrow::RecordBatch> currBatch;
int blockIdx = 0;
offset_t startOffset = 0;
auto numBlocksInFile = fileBlockInfos.at(copyDescription.filePaths[0]).numBlocks;
while (blockIdx < numBlocksInFile) {
for (int i = 0; i < CopyConstants::NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH; ++i) {
if (blockIdx == numBlocksInFile) {
break;
}
ARROW_ASSIGN_OR_RAISE(currBatch, ipc_reader->ReadRecordBatch(blockIdx));
taskScheduler.scheduleTask(
CopyTaskFactory::createCopyTask(batchPopulateColumnsTask<T, arrow::Array>,
reinterpret_cast<NodeTableSchema*>(tableSchema)->primaryKeyPropertyID, blockIdx,
startOffset, pkIndex.get(), this, currBatch->columns(),
copyDescription.filePaths[0]));
startOffset += currBatch->num_rows();
++blockIdx;
}
taskScheduler.waitUntilEnoughTasksFinish(
CopyConstants::MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE);
}
taskScheduler.waitAllTasksToCompleteOrError();
return arrow::Status::OK();
}

template<typename T>
arrow::Status CopyNodeArrow::populateColumnsFromParquet(
std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
std::unique_ptr<parquet::arrow::FileReader> reader;
auto status = initParquetReaderAndCheckStatus(reader, copyDescription.filePaths[0]);
std::shared_ptr<arrow::Table> currTable;
int blockIdx = 0;
offset_t startOffset = 0;
auto numBlocks = fileBlockInfos.at(copyDescription.filePaths[0]).numBlocks;
while (blockIdx < numBlocks) {
for (int i = 0; i < CopyConstants::NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH; ++i) {
if (blockIdx == numBlocks) {
break;
}
ARROW_RETURN_NOT_OK(reader->RowGroup(blockIdx)->ReadTable(&currTable));
taskScheduler.scheduleTask(
CopyTaskFactory::createCopyTask(batchPopulateColumnsTask<T, arrow::ChunkedArray>,
reinterpret_cast<NodeTableSchema*>(tableSchema)->primaryKeyPropertyID, blockIdx,
startOffset, pkIndex.get(), this, currTable->columns(),
copyDescription.filePaths[0]));
startOffset += currTable->num_rows();
++blockIdx;
}
taskScheduler.waitUntilEnoughTasksFinish(
CopyConstants::MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE);
for (auto& filePath : copyDescription.filePaths) {
std::unique_ptr<parquet::arrow::FileReader> reader;
auto status = initParquetReaderAndCheckStatus(reader, filePath);
status = assignCopyParquetTasks<T>(
reader.get(), fileBlockInfos.at(filePath).startOffset, filePath, pkIndex);
throwCopyExceptionIfNotOK(status);
}
taskScheduler.waitAllTasksToCompleteOrError();
return arrow::Status::OK();
}

Expand Down Expand Up @@ -185,8 +134,7 @@ arrow::Status CopyNodeArrow::batchPopulateColumnsTask(uint64_t primaryKeyPropert
}
populatePKIndex(
copier->columns[primaryKeyPropertyIdx].get(), pkIndex, startOffset, numLinesInCurBlock);
copier->logger->trace(
"End: path={0} blkIdx={1}", copier->copyDescription.filePaths[0], blockIdx);
copier->logger->trace("End: path={0} blkIdx={1}", filePath, blockIdx);
return arrow::Status::OK();
}

Expand Down Expand Up @@ -266,11 +214,10 @@ void CopyNodeArrow::putPropsOfLineIntoColumns(
}

template<typename T>
arrow::Status CopyNodeArrow::assignCopyTasks(
std::shared_ptr<arrow::csv::StreamingReader>& csv_streaming_reader, offset_t startOffset,
std::string filePath, std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
auto it = csv_streaming_reader->begin();
auto endIt = csv_streaming_reader->end();
arrow::Status CopyNodeArrow::assignCopyCSVTasks(arrow::csv::StreamingReader* csvStreamingReader,
offset_t startOffset, std::string filePath, std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
auto it = csvStreamingReader->begin();
auto endIt = csvStreamingReader->end();
std::shared_ptr<arrow::RecordBatch> currBatch;
int blockIdx = 0;
while (it != endIt) {
Expand All @@ -294,5 +241,32 @@ arrow::Status CopyNodeArrow::assignCopyTasks(
return arrow::Status::OK();
}

// Schedules one copier task per Parquet row group of `filePath` and blocks until
// all scheduled tasks have completed (or one errors).
// Tasks are submitted in batches: after queueing up to
// NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH tasks, we wait until enough tasks
// finish before scheduling more, bounding the number of in-flight tasks.
// `startOffset` is the node offset at which this file's rows begin (taken from
// fileBlockInfos by the caller); it is advanced by each row group's row count so
// that consecutive tasks write disjoint offset ranges.
// NOTE(review): `currTable` is reassigned before each scheduleTask call; this
// assumes createCopyTask captures the columns by value (shared_ptr chunked
// arrays) rather than referencing `currTable` itself — confirm against
// CopyTaskFactory.
template<typename T>
arrow::Status CopyNodeArrow::assignCopyParquetTasks(parquet::arrow::FileReader* parquetReader,
    common::offset_t startOffset, std::string filePath,
    std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
    // One "block" corresponds to one Parquet row group; the count was computed
    // beforehand and stored per file in fileBlockInfos.
    auto numBlocks = fileBlockInfos.at(filePath).numBlocks;
    auto blockIdx = 0u;
    std::shared_ptr<arrow::Table> currTable;
    while (blockIdx < numBlocks) {
        // Queue at most one batch worth of tasks before throttling.
        for (int i = 0; i < common::CopyConstants::NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH; ++i) {
            if (blockIdx == numBlocks) {
                break; // All row groups of this file have been scheduled.
            }
            // Propagates any Arrow read error to the caller immediately.
            ARROW_RETURN_NOT_OK(parquetReader->RowGroup(blockIdx)->ReadTable(&currTable));
            taskScheduler.scheduleTask(
                CopyTaskFactory::createCopyTask(batchPopulateColumnsTask<T, arrow::ChunkedArray>,
                    reinterpret_cast<NodeTableSchema*>(tableSchema)->primaryKeyPropertyID, blockIdx,
                    startOffset, pkIndex.get(), this, currTable->columns(), filePath));
            // Advance so the next row group's task writes after this one's rows.
            startOffset += currTable->num_rows();
            ++blockIdx;
        }
        // Throttle: don't schedule the next batch until the in-flight task count
        // drops below the configured threshold.
        taskScheduler.waitUntilEnoughTasksFinish(
            common::CopyConstants::MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE);
    }
    // Barrier: surface any task error before reporting success for this file.
    taskScheduler.waitAllTasksToCompleteOrError();
    return arrow::Status::OK();
}

} // namespace storage
} // namespace kuzu
87 changes: 25 additions & 62 deletions src/storage/copy_arrow/copy_rel_arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,6 @@ arrow::Status CopyRelArrow::executePopulateTask(PopulateTaskType populateTaskTyp
case CopyDescription::FileType::CSV: {
status = populateFromCSV(populateTaskType);
} break;
case CopyDescription::FileType::ARROW: {
status = populateFromArrow(populateTaskType);
} break;
case CopyDescription::FileType::PARQUET: {
status = populateFromParquet(populateTaskType);
} break;
Expand Down Expand Up @@ -230,67 +227,37 @@ arrow::Status CopyRelArrow::populateFromCSV(PopulateTaskType populateTaskType) {
return arrow::Status::OK();
}

arrow::Status CopyRelArrow::populateFromArrow(PopulateTaskType populateTaskType) {
auto populateTask = populateAdjColumnsAndCountRelsInAdjListsTask<arrow::Array>;
if (populateTaskType == PopulateTaskType::populateListsTask) {
populateTask = populateListsTask<arrow::Array>;
}
logger->debug("Assigning task {0}", getTaskTypeName(populateTaskType));

std::shared_ptr<arrow::ipc::RecordBatchFileReader> ipc_reader;
auto status = initArrowReaderAndCheckStatus(ipc_reader, copyDescription.filePaths[0]);
std::shared_ptr<arrow::RecordBatch> currBatch;
int blockIdx = 0;
offset_t startOffset = 0;
auto numBlocksInFile = fileBlockInfos.at(copyDescription.filePaths[0]).numBlocks;
while (blockIdx < numBlocksInFile) {
for (int i = 0; i < CopyConstants::NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH; ++i) {
if (blockIdx == numBlocksInFile) {
break;
}
ARROW_ASSIGN_OR_RAISE(currBatch, ipc_reader->ReadRecordBatch(blockIdx));
taskScheduler.scheduleTask(CopyTaskFactory::createCopyTask(populateTask, blockIdx,
startOffset, this, currBatch->columns(), copyDescription.filePaths[0]));
startOffset += currBatch->num_rows();
++blockIdx;
}
taskScheduler.waitUntilEnoughTasksFinish(
CopyConstants::MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE);
}

taskScheduler.waitAllTasksToCompleteOrError();
return arrow::Status::OK();
}

arrow::Status CopyRelArrow::populateFromParquet(PopulateTaskType populateTaskType) {
auto populateTask = populateAdjColumnsAndCountRelsInAdjListsTask<arrow::ChunkedArray>;
if (populateTaskType == PopulateTaskType::populateListsTask) {
populateTask = populateListsTask<arrow::ChunkedArray>;
}
logger->debug("Assigning task {0}", getTaskTypeName(populateTaskType));

std::unique_ptr<parquet::arrow::FileReader> reader;
auto status = initParquetReaderAndCheckStatus(reader, copyDescription.filePaths[0]);
std::shared_ptr<arrow::Table> currTable;
int blockIdx = 0;
offset_t startOffset = 0;
auto numBlocks = fileBlockInfos.at(copyDescription.filePaths[0]).numBlocks;
while (blockIdx < numBlocks) {
for (int i = 0; i < CopyConstants::NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH; ++i) {
if (blockIdx == numBlocks) {
break;
for (auto& filePath : copyDescription.filePaths) {
std::unique_ptr<parquet::arrow::FileReader> reader;
auto status = initParquetReaderAndCheckStatus(reader, filePath);
std::shared_ptr<arrow::Table> currTable;
int blockIdx = 0;
offset_t startOffset = 0;
auto numBlocks = fileBlockInfos.at(filePath).numBlocks;
while (blockIdx < numBlocks) {
for (int i = 0; i < CopyConstants::NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH; ++i) {
if (blockIdx == numBlocks) {
break;
}
ARROW_RETURN_NOT_OK(reader->RowGroup(blockIdx)->ReadTable(&currTable));
taskScheduler.scheduleTask(CopyTaskFactory::createCopyTask(
populateTask, blockIdx, startOffset, this, currTable->columns(), filePath));
startOffset += currTable->num_rows();
++blockIdx;
}
ARROW_RETURN_NOT_OK(reader->RowGroup(blockIdx)->ReadTable(&currTable));
taskScheduler.scheduleTask(CopyTaskFactory::createCopyTask(populateTask, blockIdx,
startOffset, this, currTable->columns(), copyDescription.filePaths[0]));
startOffset += currTable->num_rows();
++blockIdx;
taskScheduler.waitUntilEnoughTasksFinish(
CopyConstants::MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE);
}
taskScheduler.waitUntilEnoughTasksFinish(
CopyConstants::MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE);
}

taskScheduler.waitAllTasksToCompleteOrError();
taskScheduler.waitAllTasksToCompleteOrError();
}
return arrow::Status::OK();
}

Expand Down Expand Up @@ -624,8 +591,7 @@ template<typename T>
void CopyRelArrow::populateAdjColumnsAndCountRelsInAdjListsTask(uint64_t blockIdx,
uint64_t blockStartRelID, CopyRelArrow* copier,
const std::vector<std::shared_ptr<T>>& batchColumns, const std::string& filePath) {
copier->logger->debug(
"Start: path=`{0}` blkIdx={1}", copier->copyDescription.filePaths[0], blockIdx);
copier->logger->debug("Start: path=`{0}` blkIdx={1}", filePath, blockIdx);
std::vector<bool> requireToReadTableLabels{true, true};
std::vector<nodeID_t> nodeIDs{2};
std::vector<DataType> nodePKTypes{2};
Expand Down Expand Up @@ -674,16 +640,14 @@ void CopyRelArrow::populateAdjColumnsAndCountRelsInAdjListsTask(uint64_t blockId
copier->propertyColumnsPerDirection, nodeIDs, (uint8_t*)&relID);
relID++;
}
copier->logger->debug(
"End: path=`{0}` blkIdx={1}", copier->copyDescription.filePaths[0], blockIdx);
copier->logger->debug("End: path=`{0}` blkIdx={1}", filePath, blockIdx);
}

template<typename T>
void CopyRelArrow::populateListsTask(uint64_t blockId, uint64_t blockStartRelID,
CopyRelArrow* copier, const std::vector<std::shared_ptr<T>>& batchColumns,
const std::string& filePath) {
copier->logger->trace(
"Start: path=`{0}` blkIdx={1}", copier->copyDescription.filePaths[0], blockId);
copier->logger->trace("Start: path=`{0}` blkIdx={1}", filePath, blockId);
std::vector<nodeID_t> nodeIDs(2);
std::vector<DataType> nodePKTypes(2);
std::vector<uint64_t> reversePos(2);
Expand Down Expand Up @@ -723,8 +687,7 @@ void CopyRelArrow::populateListsTask(uint64_t blockId, uint64_t blockStartRelID,
(uint8_t*)&relID);
relID++;
}
copier->logger->trace(
"End: path=`{0}` blkIdx={1}", copier->copyDescription.filePaths[0], blockId);
copier->logger->trace("End: path=`{0}` blkIdx={1}", filePath, blockId);
}

void CopyRelArrow::sortOverflowValuesOfPropertyColumnTask(const DataType& dataType,
Expand Down
Loading