Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change copy morsel size back to DEFAULT_VECTOR_CAPACITY #1992

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.8.3 LANGUAGES CXX)
project(Kuzu VERSION 0.0.8.4 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
12 changes: 6 additions & 6 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ expression_vector Binder::bindColumnExpressions(TableSchema* tableSchema) {
return columnExpressions;
}

bool Binder::bindPreservingOrder(TableSchema* tableSchema, CopyDescription::FileType fileType) {
bool preservingOrder = fileType == CopyDescription::FileType::CSV;
bool Binder::bindContainsSerial(TableSchema* tableSchema, CopyDescription::FileType fileType) {
bool containsSerial = false;
for (auto& property : tableSchema->properties) {
if (property->getDataType()->getLogicalTypeID() == LogicalTypeID::SERIAL) {
preservingOrder = true;
containsSerial = true;
break;
}
}
return preservingOrder;
return containsSerial;
}

std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& statement) {
Expand All @@ -114,7 +114,7 @@ std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& stat
}
// Bind execution mode.
// For CSV file, and table with SERIAL columns, we need to read in serial from files.
auto preservingOrder = bindPreservingOrder(tableSchema, actualFileType);
auto containsSerial = bindContainsSerial(tableSchema, actualFileType);
// Bind expressions.
auto columnExpressions = bindColumnExpressions(tableSchema);
auto copyDescription = std::make_unique<CopyDescription>(
Expand All @@ -131,7 +131,7 @@ std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& stat
nullptr;
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(std::move(copyDescription),
tableSchema, std::move(columnExpressions), std::move(nodeOffsetExpression),
std::move(boundOffsetExpression), std::move(nbrOffsetExpression), preservingOrder);
std::move(boundOffsetExpression), std::move(nbrOffsetExpression), containsSerial);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

Expand Down
2 changes: 1 addition & 1 deletion src/binder/copy/bound_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ std::unique_ptr<BoundCopyFromInfo> BoundCopyFromInfo::copy() {
std::move(copiedColumnExpressions), offsetExpression->copy(),
tableSchema->tableType == common::TableType::REL ? boundOffsetExpression->copy() : nullptr,
tableSchema->tableType == common::TableType::REL ? nbrOffsetExpression->copy() : nullptr,
preservingOrder);
containsSerial);
}

} // namespace binder
Expand Down
6 changes: 6 additions & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,12 @@ void ArrowColumnVector::setArrowColumn(
arrowColumnBuffer->column = std::move(column);
}

void ArrowColumnVector::slice(ValueVector* vector, offset_t offset) {
auto arrowColumnBuffer =
reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
setArrowColumn(vector, arrowColumnBuffer->column->Slice((int64_t)offset));
}

template void ValueVector::setValue<nodeID_t>(uint32_t pos, nodeID_t val);
template void ValueVector::setValue<bool>(uint32_t pos, bool val);
template void ValueVector::setValue<int64_t>(uint32_t pos, int64_t val);
Expand Down
2 changes: 1 addition & 1 deletion src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Binder {
std::unique_ptr<common::LogicalType> bindDataType(const std::string& dataType);

/*** bind copy from/to ***/
static bool bindPreservingOrder(
static bool bindContainsSerial(
catalog::TableSchema* tableSchema, common::CopyDescription::FileType fileType);
expression_vector bindColumnExpressions(catalog::TableSchema* tableSchema);
std::unique_ptr<BoundStatement> bindCopyFromClause(const parser::Statement& statement);
Expand Down
6 changes: 3 additions & 3 deletions src/include/binder/copy/bound_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ struct BoundCopyFromInfo {
std::shared_ptr<Expression> boundOffsetExpression;
std::shared_ptr<Expression> nbrOffsetExpression;

bool preservingOrder;
bool containsSerial;

BoundCopyFromInfo(std::unique_ptr<common::CopyDescription> copyDesc,
catalog::TableSchema* tableSchema, expression_vector columnExpressions,
std::shared_ptr<Expression> offsetExpression,
std::shared_ptr<Expression> boundOffsetExpression,
std::shared_ptr<Expression> nbrOffsetExpression, bool preservingOrder)
std::shared_ptr<Expression> nbrOffsetExpression, bool containsSerial)
: copyDesc{std::move(copyDesc)}, tableSchema{tableSchema}, columnExpressions{std::move(
columnExpressions)},
offsetExpression{std::move(offsetExpression)}, boundOffsetExpression{std::move(
boundOffsetExpression)},
nbrOffsetExpression{std::move(nbrOffsetExpression)}, preservingOrder{preservingOrder} {}
nbrOffsetExpression{std::move(nbrOffsetExpression)}, containsSerial{containsSerial} {}

std::unique_ptr<BoundCopyFromInfo> copy();
};
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ class ArrowColumnVector {
}

static void setArrowColumn(ValueVector* vector, std::shared_ptr<arrow::ChunkedArray> column);

static void slice(ValueVector* vector, offset_t offset);
};

class MapVector {
Expand Down
8 changes: 1 addition & 7 deletions src/include/planner/logical_plan/copy/logical_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,7 @@ class LogicalCopyFrom : public LogicalOperator {
LogicalCopyFrom(std::unique_ptr<binder::BoundCopyFromInfo> info,
std::shared_ptr<binder::Expression> outputExpression)
: LogicalOperator{LogicalOperatorType::COPY_FROM}, info{std::move(info)},
outputExpression{std::move(outputExpression)} {
assert((this->info->tableSchema->tableType == common::TableType::REL &&
this->info->boundOffsetExpression && this->info->nbrOffsetExpression) ||
(this->info->tableSchema->tableType == common::TableType::NODE &&
(this->info->boundOffsetExpression == nullptr) &&
(this->info->nbrOffsetExpression == nullptr)));
}
outputExpression{std::move(outputExpression)} {}

inline std::string getExpressionsForPrinting() const override {
return info->tableSchema->tableName;
Expand Down
7 changes: 5 additions & 2 deletions src/include/processor/operator/persistent/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct CopyNodeInfo {
storage::RelsStore* relsStore;
catalog::Catalog* catalog;
storage::WAL* wal;
bool orderPreserving;
bool containsSerial;
};

class CopyNode : public Sink {
Expand All @@ -80,7 +80,10 @@ class CopyNode : public Sink {

void executeInternal(ExecutionContext* context) final;

void finalize(ExecutionContext* context) final;
static void sliceDataChunk(const common::DataChunk& dataChunk,
const std::vector<DataPos>& dataColumnPoses, common::offset_t offset);

void finalize(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<CopyNode>(sharedState, copyNodeInfo, resultSetDescriptor->copy(),
Expand Down
11 changes: 7 additions & 4 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ namespace processor {
struct ReaderInfo {
DataPos nodeOffsetPos;
std::vector<DataPos> dataColumnsPos;
bool orderPreserving;
bool containsSerial;

ReaderInfo(
const DataPos& nodeOffsetPos, std::vector<DataPos> dataColumnsPos, bool orderPreserving)
const DataPos& nodeOffsetPos, std::vector<DataPos> dataColumnsPos, bool containsSerial)
: nodeOffsetPos{nodeOffsetPos}, dataColumnsPos{std::move(dataColumnsPos)},
orderPreserving{orderPreserving} {}
containsSerial{containsSerial} {}
ReaderInfo(const ReaderInfo& other)
: nodeOffsetPos{other.nodeOffsetPos}, dataColumnsPos{other.dataColumnsPos},
orderPreserving{other.orderPreserving} {}
containsSerial{other.containsSerial} {}

inline uint32_t getNumColumns() const { return dataColumnsPos.size(); }

Expand Down Expand Up @@ -45,12 +45,15 @@ class Reader : public PhysicalOperator {
return make_unique<Reader>(info->copy(), sharedState, getOperatorID(), paramsString);
}

inline bool getContainsSerial() const { return info->containsSerial; }

protected:
bool getNextTuplesInternal(ExecutionContext* context) final;

private:
void getNextNodeGroupInSerial();
void getNextNodeGroupInParallel();
void readNextNodeGroupInParallel();

private:
std::unique_ptr<ReaderInfo> info;
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/operator/persistent/reader_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ class ReaderSharedState {

private:
std::unique_ptr<ReaderMorsel> getMorselOfNextBlock();
void readNextBlock(common::DataChunk* dataChunk);

public:
std::mutex mtx;
Expand Down
9 changes: 5 additions & 4 deletions src/include/storage/storage_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ using storage_version_t = uint64_t;

struct StorageVersionInfo {
static std::unordered_map<std::string, storage_version_t> getStorageVersionInfo() {
return {{"0.0.8.3", 19}, {"0.0.8.2", 19}, {"0.0.8.1", 18}, {"0.0.8", 17}, {"0.0.7.1", 16},
{"0.0.7", 15}, {"0.0.6.5", 14}, {"0.0.6.4", 13}, {"0.0.6.3", 12}, {"0.0.6.2", 11},
{"0.0.6.1", 10}, {"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3.5", 6},
{"0.0.3.4", 5}, {"0.0.3.3", 4}, {"0.0.3.2", 3}, {"0.0.3.1", 2}, {"0.0.3", 1}};
return {{"0.0.8.4", 19}, {"0.0.8.3", 19}, {"0.0.8.2", 19}, {"0.0.8.1", 18}, {"0.0.8", 17},
{"0.0.7.1", 16}, {"0.0.7", 15}, {"0.0.6.5", 14}, {"0.0.6.4", 13}, {"0.0.6.3", 12},
{"0.0.6.2", 11}, {"0.0.6.1", 10}, {"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7},
{"0.0.3.5", 6}, {"0.0.3.4", 5}, {"0.0.3.3", 4}, {"0.0.3.2", 3}, {"0.0.3.1", 2},
{"0.0.3", 1}};
}

static storage_version_t getStorageVersion();
Expand Down
6 changes: 3 additions & 3 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNodeFrom(
// Map reader.
auto reader = createReader(copyFromInfo->copyDesc.get(), copyFromInfo->tableSchema,
copyFrom->getSchema(), copyFromInfo->columnExpressions, copyFromInfo->offsetExpression,
copyFromInfo->preservingOrder);
copyFromInfo->containsSerial);
auto readerOp = reinterpret_cast<Reader*>(reader.get());
auto readerInfo = readerOp->getReaderInfo();
auto nodeTable = storageManager.getNodesStore().getNodeTable(tableSchema->tableID);
Expand All @@ -64,7 +64,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNodeFrom(
tableSchema, nodeTable, *copyFromInfo->copyDesc, memoryManager);
CopyNodeInfo copyNodeDataInfo{readerInfo->dataColumnsPos, readerInfo->nodeOffsetPos,
*copyFromInfo->copyDesc, nodeTable, &storageManager.getRelsStore(), catalog,
storageManager.getWAL(), copyFromInfo->preservingOrder};
storageManager.getWAL(), copyFromInfo->containsSerial};
auto copyNode = std::make_unique<CopyNode>(copyNodeSharedState, copyNodeDataInfo,
std::make_unique<ResultSetDescriptor>(copyFrom->getSchema()), std::move(reader),
getOperatorID(), copyFrom->getExpressionsForPrinting());
Expand Down Expand Up @@ -102,7 +102,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createCopyRelColumnsOrLists(
auto tableSchema = reinterpret_cast<RelTableSchema*>(copyFromInfo->tableSchema);
auto reader = createReader(copyFromInfo->copyDesc.get(), copyFromInfo->tableSchema, outFSchema,
copyFromInfo->columnExpressions, copyFromInfo->offsetExpression,
copyFromInfo->preservingOrder);
copyFromInfo->containsSerial);
auto readerOp = reinterpret_cast<Reader*>(reader.get());
auto readerInfo = readerOp->getReaderInfo();
auto offsetDataPos = DataPos{outFSchema->getExpressionPos(*copyFromInfo->offsetExpression)};
Expand Down
36 changes: 22 additions & 14 deletions src/processor/operator/persistent/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,29 +86,37 @@ void CopyNode::executeInternal(ExecutionContext* context) {
auto numTuplesToAppend = ArrowColumnVector::getArrowColumn(
resultSet->getValueVector(copyNodeInfo.dataColumnPoses[0]).get())
->length();
assert(numTuplesToAppend <= StorageConstants::NODE_GROUP_SIZE);
auto nodeOffset = nodeOffsetVector->getValue<offset_t>(
nodeOffsetVector->state->selVector->selectedPositions[0]);
auto numAppendedTuplesInNodeGroup =
localNodeGroup->append(resultSet, copyNodeInfo.dataColumnPoses, numTuplesToAppend);
assert(numAppendedTuplesInNodeGroup == numTuplesToAppend);
if (localNodeGroup->isFull()) {
node_group_idx_t nodeGroupIdx;
if (copyNodeInfo.orderPreserving) {
nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
sharedState->setNextNodeGroupIdx(nodeGroupIdx + 1);
} else {
auto numAppendedTuples = 0ul;
while (numAppendedTuples < numTuplesToAppend) {
auto numAppendedTuplesInNodeGroup = localNodeGroup->append(
resultSet, copyNodeInfo.dataColumnPoses, numTuplesToAppend - numAppendedTuples);
numAppendedTuples += numAppendedTuplesInNodeGroup;
if (localNodeGroup->isFull()) {
node_group_idx_t nodeGroupIdx;
nodeGroupIdx = sharedState->getNextNodeGroupIdx();
writeAndResetNodeGroup(nodeGroupIdx, sharedState->pkIndex.get(),
sharedState->pkColumnID, sharedState->table, localNodeGroup.get());
}
if (numAppendedTuples < numTuplesToAppend) {
sliceDataChunk(
*resultSet->getDataChunk(copyNodeInfo.dataColumnPoses[0].dataChunkPos),
copyNodeInfo.dataColumnPoses, (offset_t)numAppendedTuplesInNodeGroup);
}
writeAndResetNodeGroup(nodeGroupIdx, sharedState->pkIndex.get(),
sharedState->pkColumnID, sharedState->table, localNodeGroup.get());
}
}
if (localNodeGroup->getNumNodes() > 0) {
sharedState->appendLocalNodeGroup(std::move(localNodeGroup));
}
}

void CopyNode::sliceDataChunk(
const DataChunk& dataChunk, const std::vector<DataPos>& dataColumnPoses, offset_t offset) {
for (auto& dataColumnPos : dataColumnPoses) {
ArrowColumnVector::slice(
dataChunk.valueVectors[dataColumnPos.valueVectorPos].get(), offset);
}
}

void CopyNode::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx,
PrimaryKeyIndexBuilder* pkIndex, column_id_t pkColumnID, NodeTable* table,
NodeGroup* nodeGroup) {
Expand Down
26 changes: 16 additions & 10 deletions src/processor/operator/persistent/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ void Reader::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* cont
}

bool Reader::getNextTuplesInternal(ExecutionContext* context) {
info->orderPreserving ? getNextNodeGroupInSerial() : getNextNodeGroupInParallel();
sharedState->copyDescription->fileType == common::CopyDescription::FileType::CSV ?
getNextNodeGroupInSerial() :
getNextNodeGroupInParallel();
return dataChunk->state->selVector->selectedSize != 0;
}

Expand All @@ -40,10 +42,21 @@ void Reader::getNextNodeGroupInSerial() {
}

void Reader::getNextNodeGroupInParallel() {
while (leftArrowArrays.getLeftNumRows() < StorageConstants::NODE_GROUP_SIZE) {
readNextNodeGroupInParallel();
if (leftArrowArrays.getLeftNumRows() == 0) {
dataChunk->state->selVector->selectedSize = 0;
} else {
int64_t numRowsToReturn =
std::min(leftArrowArrays.getLeftNumRows(), DEFAULT_VECTOR_CAPACITY);
leftArrowArrays.appendToDataChunk(dataChunk.get(), numRowsToReturn);
}
}

void Reader::readNextNodeGroupInParallel() {
if (leftArrowArrays.getLeftNumRows() == 0) {
auto morsel = sharedState->getParallelMorsel();
if (morsel->fileIdx == INVALID_VECTOR_IDX) {
break;
return;
}
if (!readFuncData || morsel->fileIdx != readFuncData->fileIdx) {
readFuncData = initFunc(sharedState->copyDescription->filePaths, morsel->fileIdx,
Expand All @@ -52,13 +65,6 @@ void Reader::getNextNodeGroupInParallel() {
readFunc(*readFuncData, morsel->blockIdx, dataChunk.get());
leftArrowArrays.appendFromDataChunk(dataChunk.get());
}
if (leftArrowArrays.getLeftNumRows() == 0) {
dataChunk->state->selVector->selectedSize = 0;
} else {
int64_t numRowsToReturn =
std::min(leftArrowArrays.getLeftNumRows(), StorageConstants::NODE_GROUP_SIZE);
leftArrowArrays.appendToDataChunk(dataChunk.get(), numRowsToReturn);
}
}

} // namespace processor
Expand Down
Loading