Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Sep 5, 2023
1 parent cf7134f commit 6ed048c
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 21 deletions.
1 change: 1 addition & 0 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Reader : public PhysicalOperator {
private:
void getNextNodeGroupInSerial();
void getNextNodeGroupInParallel();
void readNextNodeGroupInParallel();

private:
std::unique_ptr<ReaderInfo> info;
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/copier/reader_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ class ReaderSharedState {

private:
std::unique_ptr<ReaderMorsel> getMorselOfNextBlock();
void readNextBlock(common::DataChunk* dataChunk);

public:
std::mutex mtx;
Expand Down
22 changes: 13 additions & 9 deletions src/processor/operator/persistent/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,21 @@ void Reader::getNextNodeGroupInSerial() {
}

void Reader::getNextNodeGroupInParallel() {
while (leftArrowArrays.getLeftNumRows() < DEFAULT_VECTOR_CAPACITY) {
readNextNodeGroupInParallel();
if (leftArrowArrays.getLeftNumRows() == 0) {
dataChunk->state->selVector->selectedSize = 0;
} else {
int64_t numRowsToReturn =
std::min(leftArrowArrays.getLeftNumRows(), DEFAULT_VECTOR_CAPACITY);
leftArrowArrays.appendToDataChunk(dataChunk.get(), numRowsToReturn);
}
}

void Reader::readNextNodeGroupInParallel() {
if (leftArrowArrays.getLeftNumRows() == 0) {
auto morsel = sharedState->getParallelMorsel();
if (morsel->fileIdx == INVALID_VECTOR_IDX) {
break;
return;
}
if (!readFuncData || morsel->fileIdx != readFuncData->fileIdx) {
readFuncData = initFunc(sharedState->copyDescription->filePaths, morsel->fileIdx,
Expand All @@ -54,13 +65,6 @@ void Reader::getNextNodeGroupInParallel() {
readFunc(*readFuncData, morsel->blockIdx, dataChunk.get());
leftArrowArrays.appendFromDataChunk(dataChunk.get());
}
if (leftArrowArrays.getLeftNumRows() == 0) {
dataChunk->state->selVector->selectedSize = 0;
} else {
int64_t numRowsToReturn =
std::min(leftArrowArrays.getLeftNumRows(), DEFAULT_VECTOR_CAPACITY);
leftArrowArrays.appendToDataChunk(dataChunk.get(), numRowsToReturn);
}
}

} // namespace processor
Expand Down
28 changes: 16 additions & 12 deletions src/storage/copier/reader_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,25 @@ void ReaderSharedState::countBlocks() {

std::unique_ptr<ReaderMorsel> ReaderSharedState::getSerialMorsel(DataChunk* dataChunk) {
std::unique_lock xLck{mtx};
while (leftArrowArrays.getLeftNumRows() < DEFAULT_VECTOR_CAPACITY) {
readNextBlock(dataChunk);
if (leftArrowArrays.getLeftNumRows() == 0) {
dataChunk->state->selVector->selectedSize = 0;
return std::make_unique<ReaderMorsel>();
} else {
auto numRowsToReturn = std::min(leftArrowArrays.getLeftNumRows(), DEFAULT_VECTOR_CAPACITY);
leftArrowArrays.appendToDataChunk(dataChunk, numRowsToReturn);
auto result = std::make_unique<ReaderMorsel>(currFileIdx, currBlockIdx, currRowIdx);
currRowIdx += numRowsToReturn;
return result;
}
}

void ReaderSharedState::readNextBlock(common::DataChunk* dataChunk) {
if (leftArrowArrays.getLeftNumRows() == 0) {
auto morsel = getMorselOfNextBlock();
if (morsel->fileIdx >= copyDescription->filePaths.size()) {
// No more blocks.
break;
return;
}
if (morsel->fileIdx != readFuncData->fileIdx) {
readFuncData = initFunc(copyDescription->filePaths, morsel->fileIdx,
Expand All @@ -264,16 +278,6 @@ std::unique_ptr<ReaderMorsel> ReaderSharedState::getSerialMorsel(DataChunk* data
readFunc(*readFuncData, morsel->blockIdx, dataChunk);
leftArrowArrays.appendFromDataChunk(dataChunk);
}
if (leftArrowArrays.getLeftNumRows() == 0) {
dataChunk->state->selVector->selectedSize = 0;
return std::make_unique<ReaderMorsel>();
} else {
auto numRowsToReturn = std::min(leftArrowArrays.getLeftNumRows(), DEFAULT_VECTOR_CAPACITY);
leftArrowArrays.appendToDataChunk(dataChunk, numRowsToReturn);
auto result = std::make_unique<ReaderMorsel>(currFileIdx, currBlockIdx, currRowIdx);
currRowIdx += numRowsToReturn;
return result;
}
}

std::unique_ptr<ReaderMorsel> ReaderSharedState::getParallelMorsel() {
Expand Down

0 comments on commit 6ed048c

Please sign in to comment.