Skip to content

Commit

Permalink
Change copy morsel size back to DEFAULT_VECTOR_CAPACITY
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Sep 6, 2023
1 parent bf7414f commit 37191a7
Show file tree
Hide file tree
Showing 22 changed files with 155 additions and 123 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.8.3 LANGUAGES CXX)
project(Kuzu VERSION 0.0.8.4 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
12 changes: 6 additions & 6 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ expression_vector Binder::bindColumnExpressions(TableSchema* tableSchema) {
return columnExpressions;
}

bool Binder::bindPreservingOrder(TableSchema* tableSchema, CopyDescription::FileType fileType) {
bool preservingOrder = fileType == CopyDescription::FileType::CSV;
bool Binder::bindContainsSerial(TableSchema* tableSchema, CopyDescription::FileType fileType) {
bool containsSerial = false;
for (auto& property : tableSchema->properties) {
if (property->getDataType()->getLogicalTypeID() == LogicalTypeID::SERIAL) {
preservingOrder = true;
containsSerial = true;
break;
}
}
return preservingOrder;
return containsSerial;
}

std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& statement) {
Expand All @@ -114,7 +114,7 @@ std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& stat
}
// Bind execution mode.
// For CSV files, and for tables with SERIAL columns, the input files must be read serially.
auto preservingOrder = bindPreservingOrder(tableSchema, actualFileType);
auto containsSerial = bindContainsSerial(tableSchema, actualFileType);
// Bind expressions.
auto columnExpressions = bindColumnExpressions(tableSchema);
auto copyDescription = std::make_unique<CopyDescription>(
Expand All @@ -131,7 +131,7 @@ std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& stat
nullptr;
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(std::move(copyDescription),
tableSchema, std::move(columnExpressions), std::move(nodeOffsetExpression),
std::move(boundOffsetExpression), std::move(nbrOffsetExpression), preservingOrder);
std::move(boundOffsetExpression), std::move(nbrOffsetExpression), containsSerial);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

Expand Down
2 changes: 1 addition & 1 deletion src/binder/copy/bound_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ std::unique_ptr<BoundCopyFromInfo> BoundCopyFromInfo::copy() {
std::move(copiedColumnExpressions), offsetExpression->copy(),
tableSchema->tableType == common::TableType::REL ? boundOffsetExpression->copy() : nullptr,
tableSchema->tableType == common::TableType::REL ? nbrOffsetExpression->copy() : nullptr,
preservingOrder);
containsSerial);
}

} // namespace binder
Expand Down
6 changes: 6 additions & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,12 @@ void ArrowColumnVector::setArrowColumn(
arrowColumnBuffer->column = std::move(column);
}

// Replaces the Arrow column held in `vector`'s auxiliary buffer with the result of calling
// Slice(offset) on that same column, then stores it back through setArrowColumn.
// NOTE(review): presumably this discards the first `offset` rows (Arrow's single-argument
// ChunkedArray::Slice) -- confirm against the Arrow version in use.
void ArrowColumnVector::slice(ValueVector* vector, offset_t offset) {
    auto* auxBuffer = reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
    auto slicedColumn = auxBuffer->column->Slice(static_cast<int64_t>(offset));
    setArrowColumn(vector, std::move(slicedColumn));
}

template void ValueVector::setValue<nodeID_t>(uint32_t pos, nodeID_t val);
template void ValueVector::setValue<bool>(uint32_t pos, bool val);
template void ValueVector::setValue<int64_t>(uint32_t pos, int64_t val);
Expand Down
2 changes: 1 addition & 1 deletion src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Binder {
std::unique_ptr<common::LogicalType> bindDataType(const std::string& dataType);

/*** bind copy from/to ***/
static bool bindPreservingOrder(
static bool bindContainsSerial(
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
expression_vector bindColumnExpressions(catalog::TableSchema* tableSchema);
std::unique_ptr<BoundStatement> bindCopyFromClause(const parser::Statement& statement);
Expand Down
6 changes: 3 additions & 3 deletions src/include/binder/copy/bound_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ struct BoundCopyFromInfo {
std::shared_ptr<Expression> boundOffsetExpression;
std::shared_ptr<Expression> nbrOffsetExpression;

bool preservingOrder;
bool containsSerial;

BoundCopyFromInfo(std::unique_ptr<common::CopyDescription> copyDesc,
catalog::TableSchema* tableSchema, expression_vector columnExpressions,
std::shared_ptr<Expression> offsetExpression,
std::shared_ptr<Expression> boundOffsetExpression,
std::shared_ptr<Expression> nbrOffsetExpression, bool preservingOrder)
std::shared_ptr<Expression> nbrOffsetExpression, bool containsSerial)
: copyDesc{std::move(copyDesc)}, tableSchema{tableSchema}, columnExpressions{std::move(
columnExpressions)},
offsetExpression{std::move(offsetExpression)}, boundOffsetExpression{std::move(
boundOffsetExpression)},
nbrOffsetExpression{std::move(nbrOffsetExpression)}, preservingOrder{preservingOrder} {}
nbrOffsetExpression{std::move(nbrOffsetExpression)}, containsSerial{containsSerial} {}

std::unique_ptr<BoundCopyFromInfo> copy();
};
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ class ArrowColumnVector {
}

static void setArrowColumn(ValueVector* vector, std::shared_ptr<arrow::ChunkedArray> column);

static void slice(ValueVector* vector, offset_t offset);
};

class MapVector {
Expand Down
8 changes: 1 addition & 7 deletions src/include/planner/logical_plan/copy/logical_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,7 @@ class LogicalCopyFrom : public LogicalOperator {
LogicalCopyFrom(std::unique_ptr<binder::BoundCopyFromInfo> info,
std::shared_ptr<binder::Expression> outputExpression)
: LogicalOperator{LogicalOperatorType::COPY_FROM}, info{std::move(info)},
outputExpression{std::move(outputExpression)} {
assert((this->info->tableSchema->tableType == common::TableType::REL &&
this->info->boundOffsetExpression && this->info->nbrOffsetExpression) ||
(this->info->tableSchema->tableType == common::TableType::NODE &&
(this->info->boundOffsetExpression == nullptr) &&
(this->info->nbrOffsetExpression == nullptr)));
}
outputExpression{std::move(outputExpression)} {}

inline std::string getExpressionsForPrinting() const override {
return info->tableSchema->tableName;
Expand Down
7 changes: 5 additions & 2 deletions src/include/processor/operator/persistent/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct CopyNodeInfo {
storage::RelsStore* relsStore;
catalog::Catalog* catalog;
storage::WAL* wal;
bool orderPreserving;
bool containsSerial;
};

class CopyNode : public Sink {
Expand All @@ -80,7 +80,10 @@ class CopyNode : public Sink {

void executeInternal(ExecutionContext* context) final;

void finalize(ExecutionContext* context) final;
static void sliceDataChunk(const common::DataChunk& dataChunk,
const std::vector<DataPos>& dataColumnPoses, common::offset_t offset);

void finalize(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<CopyNode>(sharedState, copyNodeInfo, resultSetDescriptor->copy(),
Expand Down
11 changes: 7 additions & 4 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ namespace processor {
struct ReaderInfo {
DataPos nodeOffsetPos;
std::vector<DataPos> dataColumnsPos;
bool orderPreserving;
bool containsSerial;

ReaderInfo(
const DataPos& nodeOffsetPos, std::vector<DataPos> dataColumnsPos, bool orderPreserving)
const DataPos& nodeOffsetPos, std::vector<DataPos> dataColumnsPos, bool containsSerial)
: nodeOffsetPos{nodeOffsetPos}, dataColumnsPos{std::move(dataColumnsPos)},
orderPreserving{orderPreserving} {}
containsSerial{containsSerial} {}
ReaderInfo(const ReaderInfo& other)
: nodeOffsetPos{other.nodeOffsetPos}, dataColumnsPos{other.dataColumnsPos},
orderPreserving{other.orderPreserving} {}
containsSerial{other.containsSerial} {}

inline uint32_t getNumColumns() const { return dataColumnsPos.size(); }

Expand Down Expand Up @@ -45,12 +45,15 @@ class Reader : public PhysicalOperator {
return make_unique<Reader>(info->copy(), sharedState, getOperatorID(), paramsString);
}

// Returns the containsSerial flag stored in this reader's ReaderInfo.
inline bool getContainsSerial() const { return info->containsSerial; }

protected:
bool getNextTuplesInternal(ExecutionContext* context) final;

private:
void getNextNodeGroupInSerial();
void getNextNodeGroupInParallel();
void readNextNodeGroupInParallel();

private:
std::unique_ptr<ReaderInfo> info;
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/operator/persistent/reader_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ class ReaderSharedState {

private:
std::unique_ptr<ReaderMorsel> getMorselOfNextBlock();
void readNextBlock(common::DataChunk* dataChunk);

public:
std::mutex mtx;
Expand Down
9 changes: 5 additions & 4 deletions src/include/storage/storage_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ using storage_version_t = uint64_t;

struct StorageVersionInfo {
static std::unordered_map<std::string, storage_version_t> getStorageVersionInfo() {
return {{"0.0.8.3", 19}, {"0.0.8.2", 19}, {"0.0.8.1", 18}, {"0.0.8", 17}, {"0.0.7.1", 16},
{"0.0.7", 15}, {"0.0.6.5", 14}, {"0.0.6.4", 13}, {"0.0.6.3", 12}, {"0.0.6.2", 11},
{"0.0.6.1", 10}, {"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3.5", 6},
{"0.0.3.4", 5}, {"0.0.3.3", 4}, {"0.0.3.2", 3}, {"0.0.3.1", 2}, {"0.0.3", 1}};
return {{"0.0.8.4", 19}, {"0.0.8.3", 19}, {"0.0.8.2", 19}, {"0.0.8.1", 18}, {"0.0.8", 17},
{"0.0.7.1", 16}, {"0.0.7", 15}, {"0.0.6.5", 14}, {"0.0.6.4", 13}, {"0.0.6.3", 12},
{"0.0.6.2", 11}, {"0.0.6.1", 10}, {"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7},
{"0.0.3.5", 6}, {"0.0.3.4", 5}, {"0.0.3.3", 4}, {"0.0.3.2", 3}, {"0.0.3.1", 2},
{"0.0.3", 1}};
}

static storage_version_t getStorageVersion();
Expand Down
6 changes: 3 additions & 3 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNodeFrom(
// Map reader.
auto reader = createReader(copyFromInfo->copyDesc.get(), copyFromInfo->tableSchema,
copyFrom->getSchema(), copyFromInfo->columnExpressions, copyFromInfo->offsetExpression,
copyFromInfo->preservingOrder);
copyFromInfo->containsSerial);
auto readerOp = reinterpret_cast<Reader*>(reader.get());
auto readerInfo = readerOp->getReaderInfo();
auto nodeTable = storageManager.getNodesStore().getNodeTable(tableSchema->tableID);
Expand All @@ -64,7 +64,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNodeFrom(
tableSchema, nodeTable, *copyFromInfo->copyDesc, memoryManager);
CopyNodeInfo copyNodeDataInfo{readerInfo->dataColumnsPos, readerInfo->nodeOffsetPos,
*copyFromInfo->copyDesc, nodeTable, &storageManager.getRelsStore(), catalog,
storageManager.getWAL(), copyFromInfo->preservingOrder};
storageManager.getWAL(), copyFromInfo->containsSerial};
auto copyNode = std::make_unique<CopyNode>(copyNodeSharedState, copyNodeDataInfo,
std::make_unique<ResultSetDescriptor>(copyFrom->getSchema()), std::move(reader),
getOperatorID(), copyFrom->getExpressionsForPrinting());
Expand Down Expand Up @@ -102,7 +102,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createCopyRelColumnsOrLists(
auto tableSchema = reinterpret_cast<RelTableSchema*>(copyFromInfo->tableSchema);
auto reader = createReader(copyFromInfo->copyDesc.get(), copyFromInfo->tableSchema, outFSchema,
copyFromInfo->columnExpressions, copyFromInfo->offsetExpression,
copyFromInfo->preservingOrder);
copyFromInfo->containsSerial);
auto readerOp = reinterpret_cast<Reader*>(reader.get());
auto readerInfo = readerOp->getReaderInfo();
auto offsetDataPos = DataPos{outFSchema->getExpressionPos(*copyFromInfo->offsetExpression)};
Expand Down
36 changes: 22 additions & 14 deletions src/processor/operator/persistent/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,29 +86,37 @@ void CopyNode::executeInternal(ExecutionContext* context) {
auto numTuplesToAppend = ArrowColumnVector::getArrowColumn(
resultSet->getValueVector(copyNodeInfo.dataColumnPoses[0]).get())
->length();
assert(numTuplesToAppend <= StorageConstants::NODE_GROUP_SIZE);
auto nodeOffset = nodeOffsetVector->getValue<offset_t>(
nodeOffsetVector->state->selVector->selectedPositions[0]);
auto numAppendedTuplesInNodeGroup =
localNodeGroup->append(resultSet, copyNodeInfo.dataColumnPoses, numTuplesToAppend);
assert(numAppendedTuplesInNodeGroup == numTuplesToAppend);
if (localNodeGroup->isFull()) {
node_group_idx_t nodeGroupIdx;
if (copyNodeInfo.orderPreserving) {
nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
sharedState->setNextNodeGroupIdx(nodeGroupIdx + 1);
} else {
auto numAppendedTuples = 0ul;
while (numAppendedTuples < numTuplesToAppend) {
auto numAppendedTuplesInNodeGroup = localNodeGroup->append(
resultSet, copyNodeInfo.dataColumnPoses, numTuplesToAppend - numAppendedTuples);
numAppendedTuples += numAppendedTuplesInNodeGroup;
if (localNodeGroup->isFull()) {
node_group_idx_t nodeGroupIdx;
nodeGroupIdx = sharedState->getNextNodeGroupIdx();
writeAndResetNodeGroup(nodeGroupIdx, sharedState->pkIndex.get(),
sharedState->pkColumnID, sharedState->table, localNodeGroup.get());
}
if (numAppendedTuples < numTuplesToAppend) {
sliceDataChunk(
*resultSet->getDataChunk(copyNodeInfo.dataColumnPoses[0].dataChunkPos),
copyNodeInfo.dataColumnPoses, (offset_t)numAppendedTuplesInNodeGroup);
}
writeAndResetNodeGroup(nodeGroupIdx, sharedState->pkIndex.get(),
sharedState->pkColumnID, sharedState->table, localNodeGroup.get());
}
}
if (localNodeGroup->getNumNodes() > 0) {
sharedState->appendLocalNodeGroup(std::move(localNodeGroup));
}
}

// Calls ArrowColumnVector::slice with the same `offset` on every value vector of `dataChunk`
// that is referenced by `dataColumnPoses`.
// Only the valueVectorPos of each DataPos is consulted here.
// NOTE(review): this presumes all listed columns live in `dataChunk` -- confirm with callers.
void CopyNode::sliceDataChunk(
    const DataChunk& dataChunk, const std::vector<DataPos>& dataColumnPoses, offset_t offset) {
    for (const auto& columnPos : dataColumnPoses) {
        auto* columnVector = dataChunk.valueVectors[columnPos.valueVectorPos].get();
        ArrowColumnVector::slice(columnVector, offset);
    }
}

void CopyNode::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx,
PrimaryKeyIndexBuilder* pkIndex, column_id_t pkColumnID, NodeTable* table,
NodeGroup* nodeGroup) {
Expand Down
26 changes: 16 additions & 10 deletions src/processor/operator/persistent/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ void Reader::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* cont
}

bool Reader::getNextTuplesInternal(ExecutionContext* context) {
info->orderPreserving ? getNextNodeGroupInSerial() : getNextNodeGroupInParallel();
sharedState->copyDescription->fileType == common::CopyDescription::FileType::CSV ?
getNextNodeGroupInSerial() :
getNextNodeGroupInParallel();
return dataChunk->state->selVector->selectedSize != 0;
}

Expand All @@ -40,10 +42,21 @@ void Reader::getNextNodeGroupInSerial() {
}

void Reader::getNextNodeGroupInParallel() {
while (leftArrowArrays.getLeftNumRows() < StorageConstants::NODE_GROUP_SIZE) {
readNextNodeGroupInParallel();
if (leftArrowArrays.getLeftNumRows() == 0) {
dataChunk->state->selVector->selectedSize = 0;
} else {
int64_t numRowsToReturn =
std::min(leftArrowArrays.getLeftNumRows(), DEFAULT_VECTOR_CAPACITY);
leftArrowArrays.appendToDataChunk(dataChunk.get(), numRowsToReturn);
}
}

void Reader::readNextNodeGroupInParallel() {
if (leftArrowArrays.getLeftNumRows() == 0) {
auto morsel = sharedState->getParallelMorsel();
if (morsel->fileIdx == INVALID_VECTOR_IDX) {
break;
return;
}
if (!readFuncData || morsel->fileIdx != readFuncData->fileIdx) {
readFuncData = initFunc(sharedState->copyDescription->filePaths, morsel->fileIdx,
Expand All @@ -52,13 +65,6 @@ void Reader::getNextNodeGroupInParallel() {
readFunc(*readFuncData, morsel->blockIdx, dataChunk.get());
leftArrowArrays.appendFromDataChunk(dataChunk.get());
}
if (leftArrowArrays.getLeftNumRows() == 0) {
dataChunk->state->selVector->selectedSize = 0;
} else {
int64_t numRowsToReturn =
std::min(leftArrowArrays.getLeftNumRows(), StorageConstants::NODE_GROUP_SIZE);
leftArrowArrays.appendToDataChunk(dataChunk.get(), numRowsToReturn);
}
}

} // namespace processor
Expand Down
Loading

0 comments on commit 37191a7

Please sign in to comment.