Skip to content

Commit

Permalink
Add support for copying from multiple Parquet files
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Mar 14, 2023
1 parent 3ec4124 commit 033e784
Show file tree
Hide file tree
Showing 14 changed files with 95 additions and 180 deletions.
2 changes: 1 addition & 1 deletion dataset/copy-test/node/parquet/copy.cypher
Original file line number Diff line number Diff line change
@@ -1 +1 @@
COPY tableOfTypes FROM "dataset/copy-test/node/parquet/types_50k.parquet" (HEADER=true);
COPY tableOfTypes FROM "dataset/copy-test/node/parquet/types_50k*.parquet" (HEADER=true);
Binary file removed dataset/copy-test/node/parquet/types_50k.parquet
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
3 changes: 0 additions & 3 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,9 @@ CopyDescription::FileType Binder::bindFileType(std::vector<std::string> filePath
// types is not supported.
auto fileName = filePaths[0];
auto csvSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::CSV);
auto arrowSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::ARROW);
auto parquetSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::PARQUET);
if (fileName.ends_with(csvSuffix)) {
return CopyDescription::FileType::CSV;
} else if (fileName.ends_with(arrowSuffix)) {
return CopyDescription::FileType::ARROW;
} else if (fileName.ends_with(parquetSuffix)) {
return CopyDescription::FileType::PARQUET;
} else {
Expand Down
3 changes: 0 additions & 3 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ std::string CopyDescription::getFileTypeName(FileType fileType) {
case FileType::CSV: {
return "csv";
}
case FileType::ARROW: {
return "arrow";
}
case FileType::PARQUET: {
return "parquet";
}
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ struct CSVReaderConfig {
};

struct CopyDescription {
enum class FileType { CSV, ARROW, PARQUET };
enum class FileType : uint8_t { CSV = 0, PARQUET = 1 };

CopyDescription(const std::vector<std::string>& filePaths, CSVReaderConfig csvReaderConfig,
FileType fileType);
Expand Down
12 changes: 7 additions & 5 deletions src/include/storage/copy_arrow/copy_node_arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@ class CopyNodeArrow : public CopyStructuresArrow {
arrow::Status populateColumns();

template<typename T>
arrow::Status populateColumnsFromFiles(std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename T>
arrow::Status populateColumnsFromArrow(std::unique_ptr<HashIndexBuilder<T>>& pkIndex);
arrow::Status populateColumnsFromCSV(std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename T>
arrow::Status populateColumnsFromParquet(std::unique_ptr<HashIndexBuilder<T>>& pkIndex);
Expand All @@ -58,7 +55,12 @@ class CopyNodeArrow : public CopyStructuresArrow {
const std::vector<std::shared_ptr<T2>>& batchColumns, std::string filePath);

template<typename T>
arrow::Status assignCopyTasks(std::shared_ptr<arrow::csv::StreamingReader>& csvStreamingReader,
arrow::Status assignCopyCSVTasks(arrow::csv::StreamingReader* csvStreamingReader,
common::offset_t startOffset, std::string filePath,
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename T>
arrow::Status assignCopyParquetTasks(parquet::arrow::FileReader* parquetReader,
common::offset_t startOffset, std::string filePath,
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

Expand Down
2 changes: 0 additions & 2 deletions src/include/storage/copy_arrow/copy_rel_arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ class CopyRelArrow : public CopyStructuresArrow {

arrow::Status populateFromCSV(PopulateTaskType populateTaskType);

arrow::Status populateFromArrow(PopulateTaskType populateTaskType);

arrow::Status populateFromParquet(PopulateTaskType populateTaskType);

void populateAdjColumnsAndCountRelsInAdjLists();
Expand Down
4 changes: 1 addition & 3 deletions src/include/storage/copy_arrow/copy_structures_arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ class CopyStructuresArrow {

arrow::Status countNumLinesCSV(const std::vector<std::string>& filePaths);

arrow::Status countNumLinesArrow(std::string const& filePath);

arrow::Status countNumLinesParquet(std::string const& filePath);
arrow::Status countNumLinesParquet(const std::vector<std::string>& filePaths);

arrow::Status initCSVReaderAndCheckStatus(
std::shared_ptr<arrow::csv::StreamingReader>& csv_streaming_reader,
Expand Down
110 changes: 42 additions & 68 deletions src/storage/copy_arrow/copy_node_arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ arrow::Status CopyNodeArrow::populateColumns() {
arrow::Status status;
switch (copyDescription.fileType) {
case CopyDescription::FileType::CSV:
status = populateColumnsFromFiles<T>(pkIndex);
break;
case CopyDescription::FileType::ARROW:
status = populateColumnsFromArrow<T>(pkIndex);
status = populateColumnsFromCSV<T>(pkIndex);
break;
case CopyDescription::FileType::PARQUET:
status = populateColumnsFromParquet<T>(pkIndex);
Expand All @@ -77,75 +74,27 @@ arrow::Status CopyNodeArrow::populateColumns() {
}

template<typename T>
arrow::Status CopyNodeArrow::populateColumnsFromFiles(
std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
arrow::Status CopyNodeArrow::populateColumnsFromCSV(std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
for (auto& filePath : copyDescription.filePaths) {
offset_t startOffset = fileBlockInfos.at(filePath).startOffset;
std::shared_ptr<arrow::csv::StreamingReader> csvStreamingReader;
auto status = initCSVReaderAndCheckStatus(csvStreamingReader, filePath);
status = assignCopyTasks<T>(csvStreamingReader, startOffset, filePath, pkIndex);
status = assignCopyCSVTasks<T>(
csvStreamingReader.get(), fileBlockInfos.at(filePath).startOffset, filePath, pkIndex);
throwCopyExceptionIfNotOK(status);
}
return arrow::Status::OK();
}

template<typename T>
arrow::Status CopyNodeArrow::populateColumnsFromArrow(
std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
std::shared_ptr<arrow::ipc::RecordBatchFileReader> ipc_reader;
auto status = initArrowReaderAndCheckStatus(ipc_reader, copyDescription.filePaths[0]);
std::shared_ptr<arrow::RecordBatch> currBatch;
int blockIdx = 0;
offset_t startOffset = 0;
auto numBlocksInFile = fileBlockInfos.at(copyDescription.filePaths[0]).numBlocks;
while (blockIdx < numBlocksInFile) {
for (int i = 0; i < CopyConstants::NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH; ++i) {
if (blockIdx == numBlocksInFile) {
break;
}
ARROW_ASSIGN_OR_RAISE(currBatch, ipc_reader->ReadRecordBatch(blockIdx));
taskScheduler.scheduleTask(
CopyTaskFactory::createCopyTask(batchPopulateColumnsTask<T, arrow::Array>,
reinterpret_cast<NodeTableSchema*>(tableSchema)->primaryKeyPropertyID, blockIdx,
startOffset, pkIndex.get(), this, currBatch->columns(),
copyDescription.filePaths[0]));
startOffset += currBatch->num_rows();
++blockIdx;
}
taskScheduler.waitUntilEnoughTasksFinish(
CopyConstants::MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE);
}
taskScheduler.waitAllTasksToCompleteOrError();
return arrow::Status::OK();
}

template<typename T>
arrow::Status CopyNodeArrow::populateColumnsFromParquet(
std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
std::unique_ptr<parquet::arrow::FileReader> reader;
auto status = initParquetReaderAndCheckStatus(reader, copyDescription.filePaths[0]);
std::shared_ptr<arrow::Table> currTable;
int blockIdx = 0;
offset_t startOffset = 0;
auto numBlocks = fileBlockInfos.at(copyDescription.filePaths[0]).numBlocks;
while (blockIdx < numBlocks) {
for (int i = 0; i < CopyConstants::NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH; ++i) {
if (blockIdx == numBlocks) {
break;
}
ARROW_RETURN_NOT_OK(reader->RowGroup(blockIdx)->ReadTable(&currTable));
taskScheduler.scheduleTask(
CopyTaskFactory::createCopyTask(batchPopulateColumnsTask<T, arrow::ChunkedArray>,
reinterpret_cast<NodeTableSchema*>(tableSchema)->primaryKeyPropertyID, blockIdx,
startOffset, pkIndex.get(), this, currTable->columns(),
copyDescription.filePaths[0]));
startOffset += currTable->num_rows();
++blockIdx;
}
taskScheduler.waitUntilEnoughTasksFinish(
CopyConstants::MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE);
for (auto& filePath : copyDescription.filePaths) {
std::unique_ptr<parquet::arrow::FileReader> reader;
auto status = initParquetReaderAndCheckStatus(reader, filePath);
status = assignCopyParquetTasks<T>(
reader.get(), fileBlockInfos.at(filePath).startOffset, filePath, pkIndex);
throwCopyExceptionIfNotOK(status);
}
taskScheduler.waitAllTasksToCompleteOrError();
return arrow::Status::OK();
}

Expand Down Expand Up @@ -185,8 +134,7 @@ arrow::Status CopyNodeArrow::batchPopulateColumnsTask(uint64_t primaryKeyPropert
}
populatePKIndex(
copier->columns[primaryKeyPropertyIdx].get(), pkIndex, startOffset, numLinesInCurBlock);
copier->logger->trace(
"End: path={0} blkIdx={1}", copier->copyDescription.filePaths[0], blockIdx);
copier->logger->trace("End: path={0} blkIdx={1}", filePath, blockIdx);
return arrow::Status::OK();
}

Expand Down Expand Up @@ -266,11 +214,10 @@ void CopyNodeArrow::putPropsOfLineIntoColumns(
}

template<typename T>
arrow::Status CopyNodeArrow::assignCopyTasks(
std::shared_ptr<arrow::csv::StreamingReader>& csv_streaming_reader, offset_t startOffset,
std::string filePath, std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
auto it = csv_streaming_reader->begin();
auto endIt = csv_streaming_reader->end();
arrow::Status CopyNodeArrow::assignCopyCSVTasks(arrow::csv::StreamingReader* csvStreamingReader,
offset_t startOffset, std::string filePath, std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
auto it = csvStreamingReader->begin();
auto endIt = csvStreamingReader->end();
std::shared_ptr<arrow::RecordBatch> currBatch;
int blockIdx = 0;
while (it != endIt) {
Expand All @@ -294,5 +241,32 @@ arrow::Status CopyNodeArrow::assignCopyTasks(
return arrow::Status::OK();
}

template<typename T>
arrow::Status CopyNodeArrow::assignCopyParquetTasks(parquet::arrow::FileReader* parquetReader,
common::offset_t startOffset, std::string filePath,
std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
auto numBlocks = fileBlockInfos.at(filePath).numBlocks;
auto blockIdx = 0u;
std::shared_ptr<arrow::Table> currTable;
while (blockIdx < numBlocks) {
for (int i = 0; i < common::CopyConstants::NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH; ++i) {
if (blockIdx == numBlocks) {
break;
}
ARROW_RETURN_NOT_OK(parquetReader->RowGroup(blockIdx)->ReadTable(&currTable));
taskScheduler.scheduleTask(
CopyTaskFactory::createCopyTask(batchPopulateColumnsTask<T, arrow::ChunkedArray>,
reinterpret_cast<NodeTableSchema*>(tableSchema)->primaryKeyPropertyID, blockIdx,
startOffset, pkIndex.get(), this, currTable->columns(), filePath));
startOffset += currTable->num_rows();
++blockIdx;
}
taskScheduler.waitUntilEnoughTasksFinish(
common::CopyConstants::MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE);
}
taskScheduler.waitAllTasksToCompleteOrError();
return arrow::Status::OK();
}

} // namespace storage
} // namespace kuzu
87 changes: 25 additions & 62 deletions src/storage/copy_arrow/copy_rel_arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,6 @@ arrow::Status CopyRelArrow::executePopulateTask(PopulateTaskType populateTaskTyp
case CopyDescription::FileType::CSV: {
status = populateFromCSV(populateTaskType);
} break;
case CopyDescription::FileType::ARROW: {
status = populateFromArrow(populateTaskType);
} break;
case CopyDescription::FileType::PARQUET: {
status = populateFromParquet(populateTaskType);
} break;
Expand Down Expand Up @@ -230,67 +227,37 @@ arrow::Status CopyRelArrow::populateFromCSV(PopulateTaskType populateTaskType) {
return arrow::Status::OK();
}

arrow::Status CopyRelArrow::populateFromArrow(PopulateTaskType populateTaskType) {
auto populateTask = populateAdjColumnsAndCountRelsInAdjListsTask<arrow::Array>;
if (populateTaskType == PopulateTaskType::populateListsTask) {
populateTask = populateListsTask<arrow::Array>;
}
logger->debug("Assigning task {0}", getTaskTypeName(populateTaskType));

std::shared_ptr<arrow::ipc::RecordBatchFileReader> ipc_reader;
auto status = initArrowReaderAndCheckStatus(ipc_reader, copyDescription.filePaths[0]);
std::shared_ptr<arrow::RecordBatch> currBatch;
int blockIdx = 0;
offset_t startOffset = 0;
auto numBlocksInFile = fileBlockInfos.at(copyDescription.filePaths[0]).numBlocks;
while (blockIdx < numBlocksInFile) {
for (int i = 0; i < CopyConstants::NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH; ++i) {
if (blockIdx == numBlocksInFile) {
break;
}
ARROW_ASSIGN_OR_RAISE(currBatch, ipc_reader->ReadRecordBatch(blockIdx));
taskScheduler.scheduleTask(CopyTaskFactory::createCopyTask(populateTask, blockIdx,
startOffset, this, currBatch->columns(), copyDescription.filePaths[0]));
startOffset += currBatch->num_rows();
++blockIdx;
}
taskScheduler.waitUntilEnoughTasksFinish(
CopyConstants::MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE);
}

taskScheduler.waitAllTasksToCompleteOrError();
return arrow::Status::OK();
}

arrow::Status CopyRelArrow::populateFromParquet(PopulateTaskType populateTaskType) {
auto populateTask = populateAdjColumnsAndCountRelsInAdjListsTask<arrow::ChunkedArray>;
if (populateTaskType == PopulateTaskType::populateListsTask) {
populateTask = populateListsTask<arrow::ChunkedArray>;
}
logger->debug("Assigning task {0}", getTaskTypeName(populateTaskType));

std::unique_ptr<parquet::arrow::FileReader> reader;
auto status = initParquetReaderAndCheckStatus(reader, copyDescription.filePaths[0]);
std::shared_ptr<arrow::Table> currTable;
int blockIdx = 0;
offset_t startOffset = 0;
auto numBlocks = fileBlockInfos.at(copyDescription.filePaths[0]).numBlocks;
while (blockIdx < numBlocks) {
for (int i = 0; i < CopyConstants::NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH; ++i) {
if (blockIdx == numBlocks) {
break;
for (auto& filePath : copyDescription.filePaths) {
std::unique_ptr<parquet::arrow::FileReader> reader;
auto status = initParquetReaderAndCheckStatus(reader, filePath);
std::shared_ptr<arrow::Table> currTable;
int blockIdx = 0;
offset_t startOffset = 0;
auto numBlocks = fileBlockInfos.at(filePath).numBlocks;
while (blockIdx < numBlocks) {
for (int i = 0; i < CopyConstants::NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH; ++i) {
if (blockIdx == numBlocks) {
break;
}
ARROW_RETURN_NOT_OK(reader->RowGroup(blockIdx)->ReadTable(&currTable));
taskScheduler.scheduleTask(CopyTaskFactory::createCopyTask(
populateTask, blockIdx, startOffset, this, currTable->columns(), filePath));
startOffset += currTable->num_rows();
++blockIdx;
}
ARROW_RETURN_NOT_OK(reader->RowGroup(blockIdx)->ReadTable(&currTable));
taskScheduler.scheduleTask(CopyTaskFactory::createCopyTask(populateTask, blockIdx,
startOffset, this, currTable->columns(), copyDescription.filePaths[0]));
startOffset += currTable->num_rows();
++blockIdx;
taskScheduler.waitUntilEnoughTasksFinish(
CopyConstants::MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE);
}
taskScheduler.waitUntilEnoughTasksFinish(
CopyConstants::MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE);
}

taskScheduler.waitAllTasksToCompleteOrError();
taskScheduler.waitAllTasksToCompleteOrError();
}
return arrow::Status::OK();
}

Expand Down Expand Up @@ -624,8 +591,7 @@ template<typename T>
void CopyRelArrow::populateAdjColumnsAndCountRelsInAdjListsTask(uint64_t blockIdx,
uint64_t blockStartRelID, CopyRelArrow* copier,
const std::vector<std::shared_ptr<T>>& batchColumns, const std::string& filePath) {
copier->logger->debug(
"Start: path=`{0}` blkIdx={1}", copier->copyDescription.filePaths[0], blockIdx);
copier->logger->debug("Start: path=`{0}` blkIdx={1}", filePath, blockIdx);
std::vector<bool> requireToReadTableLabels{true, true};
std::vector<nodeID_t> nodeIDs{2};
std::vector<DataType> nodePKTypes{2};
Expand Down Expand Up @@ -674,16 +640,14 @@ void CopyRelArrow::populateAdjColumnsAndCountRelsInAdjListsTask(uint64_t blockId
copier->propertyColumnsPerDirection, nodeIDs, (uint8_t*)&relID);
relID++;
}
copier->logger->debug(
"End: path=`{0}` blkIdx={1}", copier->copyDescription.filePaths[0], blockIdx);
copier->logger->debug("End: path=`{0}` blkIdx={1}", filePath, blockIdx);
}

template<typename T>
void CopyRelArrow::populateListsTask(uint64_t blockId, uint64_t blockStartRelID,
CopyRelArrow* copier, const std::vector<std::shared_ptr<T>>& batchColumns,
const std::string& filePath) {
copier->logger->trace(
"Start: path=`{0}` blkIdx={1}", copier->copyDescription.filePaths[0], blockId);
copier->logger->trace("Start: path=`{0}` blkIdx={1}", filePath, blockId);
std::vector<nodeID_t> nodeIDs(2);
std::vector<DataType> nodePKTypes(2);
std::vector<uint64_t> reversePos(2);
Expand Down Expand Up @@ -723,8 +687,7 @@ void CopyRelArrow::populateListsTask(uint64_t blockId, uint64_t blockStartRelID,
(uint8_t*)&relID);
relID++;
}
copier->logger->trace(
"End: path=`{0}` blkIdx={1}", copier->copyDescription.filePaths[0], blockId);
copier->logger->trace("End: path=`{0}` blkIdx={1}", filePath, blockId);
}

void CopyRelArrow::sortOverflowValuesOfPropertyColumnTask(const DataType& dataType,
Expand Down
Loading

0 comments on commit 033e784

Please sign in to comment.