Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: separate insertions and updates in rel table local storage #2982

Merged
merged 1 commit into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.15)

project(Kuzu VERSION 0.3.2.1 LANGUAGES CXX C)
project(Kuzu VERSION 0.3.2.2 LANGUAGES CXX C)

find_package(Threads REQUIRED)

Expand Down
41 changes: 24 additions & 17 deletions src/catalog/catalog_entry/rel_table_catalog_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

#include "catalog/catalog.h"

using namespace kuzu::common;

namespace kuzu {
namespace catalog {

RelTableCatalogEntry::RelTableCatalogEntry(std::string name, common::table_id_t tableID,
RelTableCatalogEntry::RelTableCatalogEntry(std::string name, table_id_t tableID,
common::RelMultiplicity srcMultiplicity, common::RelMultiplicity dstMultiplicity,
common::table_id_t srcTableID, common::table_id_t dstTableID)
table_id_t srcTableID, table_id_t dstTableID)
: TableCatalogEntry{CatalogEntryType::REL_TABLE_ENTRY, std::move(name), tableID},
srcMultiplicity{srcMultiplicity}, dstMultiplicity{dstMultiplicity}, srcTableID{srcTableID},
dstTableID{dstTableID} {}
Expand All @@ -20,27 +22,32 @@ RelTableCatalogEntry::RelTableCatalogEntry(const RelTableCatalogEntry& other)
dstTableID = other.dstTableID;
}

bool RelTableCatalogEntry::isParent(common::table_id_t tableID) {
bool RelTableCatalogEntry::isParent(table_id_t tableID) {
return srcTableID == tableID || dstTableID == tableID;
}

bool RelTableCatalogEntry::isSingleMultiplicity(common::RelDataDirection direction) const {
// Maps a property ID to the ID of the column holding that property in this rel table.
// Returns common::INVALID_COLUMN_ID when no property carries the given ID.
column_id_t RelTableCatalogEntry::getColumnID(property_id_t propertyID) const {
    const auto hasRequestedID = [&propertyID](const auto& property) {
        return property.getPropertyID() == propertyID;
    };
    const auto match = std::find_if(properties.begin(), properties.end(), hasRequestedID);
    if (match == properties.end()) {
        return common::INVALID_COLUMN_ID;
    }
    // The first column in the rel table is reserved for nbrID, so a property's column ID is
    // its position in `properties` shifted by one.
    return std::distance(properties.begin(), match) + 1;
}

bool RelTableCatalogEntry::isSingleMultiplicity(RelDataDirection direction) const {
return getMultiplicity(direction) == common::RelMultiplicity::ONE;
}
common::RelMultiplicity RelTableCatalogEntry::getMultiplicity(
common::RelDataDirection direction) const {
return direction == common::RelDataDirection::FWD ? dstMultiplicity : srcMultiplicity;
common::RelMultiplicity RelTableCatalogEntry::getMultiplicity(RelDataDirection direction) const {
return direction == RelDataDirection::FWD ? dstMultiplicity : srcMultiplicity;
}
common::table_id_t RelTableCatalogEntry::getBoundTableID(
common::RelDataDirection relDirection) const {
return relDirection == common::RelDataDirection::FWD ? srcTableID : dstTableID;
table_id_t RelTableCatalogEntry::getBoundTableID(RelDataDirection relDirection) const {
return relDirection == RelDataDirection::FWD ? srcTableID : dstTableID;
}
common::table_id_t RelTableCatalogEntry::getNbrTableID(
common::RelDataDirection relDirection) const {
return relDirection == common::RelDataDirection::FWD ? dstTableID : srcTableID;
table_id_t RelTableCatalogEntry::getNbrTableID(RelDataDirection relDirection) const {
return relDirection == RelDataDirection::FWD ? dstTableID : srcTableID;
}

void RelTableCatalogEntry::serialize(common::Serializer& serializer) const {
void RelTableCatalogEntry::serialize(Serializer& serializer) const {
TableCatalogEntry::serialize(serializer);
serializer.write(srcMultiplicity);
serializer.write(dstMultiplicity);
Expand All @@ -49,11 +56,11 @@ void RelTableCatalogEntry::serialize(common::Serializer& serializer) const {
}

std::unique_ptr<RelTableCatalogEntry> RelTableCatalogEntry::deserialize(
common::Deserializer& deserializer) {
Deserializer& deserializer) {
common::RelMultiplicity srcMultiplicity;
common::RelMultiplicity dstMultiplicity;
common::table_id_t srcTableID;
common::table_id_t dstTableID;
table_id_t srcTableID;
table_id_t dstTableID;
deserializer.deserializeValue(srcMultiplicity);
deserializer.deserializeValue(dstMultiplicity);
deserializer.deserializeValue(srcTableID);
Expand Down
42 changes: 16 additions & 26 deletions src/common/data_chunk/data_chunk_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,35 @@

void DataChunkCollection::append(DataChunk& chunk) {
auto numTuplesToAppend = chunk.state->selVector->selectedSize;
auto chunkToAppendInfo = chunks.empty() ? allocateChunk(chunk) : chunks.back().get();
auto numTuplesAppended = 0u;
while (numTuplesAppended < numTuplesToAppend) {
if (chunkToAppendInfo->state->selVector->selectedSize == DEFAULT_VECTOR_CAPACITY) {
chunkToAppendInfo = allocateChunk(chunk);
if (chunks.empty() ||
chunks.back().state->selVector->selectedSize == DEFAULT_VECTOR_CAPACITY) {

Check warning on line 13 in src/common/data_chunk/data_chunk_collection.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/data_chunk/data_chunk_collection.cpp#L13

Added line #L13 was not covered by tests
allocateChunk(chunk);
}
auto& chunkToAppend = chunks.back();
auto numTuplesToCopy = std::min(numTuplesToAppend - numTuplesAppended,
DEFAULT_VECTOR_CAPACITY - chunkToAppendInfo->state->selVector->selectedSize);
DEFAULT_VECTOR_CAPACITY - chunkToAppend.state->selVector->selectedSize);
for (auto vectorIdx = 0u; vectorIdx < chunk.getNumValueVectors(); vectorIdx++) {
for (auto i = 0u; i < numTuplesToCopy; i++) {
auto srcPos = chunk.state->selVector->selectedPositions[numTuplesAppended + i];
auto dstPos = chunkToAppendInfo->state->selVector->selectedSize + i;
chunkToAppendInfo->getValueVector(vectorIdx)->copyFromVectorData(
auto dstPos = chunkToAppend.state->selVector->selectedSize + i;
chunkToAppend.getValueVector(vectorIdx)->copyFromVectorData(
dstPos, chunk.getValueVector(vectorIdx).get(), srcPos);
}
}
chunkToAppendInfo->state->selVector->selectedSize += numTuplesToCopy;
chunkToAppend.state->selVector->selectedSize += numTuplesToCopy;
numTuplesAppended += numTuplesToCopy;
}
}

void DataChunkCollection::append(std::unique_ptr<DataChunk> chunk) {
KU_ASSERT(chunk);
void DataChunkCollection::merge(DataChunk chunk) {
if (chunks.empty()) {
initTypes(*chunk);
initTypes(chunk);
}
KU_ASSERT(chunk->getNumValueVectors() == types.size());
for (auto vectorIdx = 0u; vectorIdx < chunk->getNumValueVectors(); vectorIdx++) {
KU_ASSERT(chunk->getValueVector(vectorIdx)->dataType == types[vectorIdx]);
KU_ASSERT(chunk.getNumValueVectors() == types.size());
for (auto vectorIdx = 0u; vectorIdx < chunk.getNumValueVectors(); vectorIdx++) {
KU_ASSERT(chunk.getValueVector(vectorIdx)->dataType == types[vectorIdx]);
}
chunks.push_back(std::move(chunk));
}
Expand All @@ -47,28 +47,18 @@
}
}

std::vector<common::DataChunk*> DataChunkCollection::getChunks() const {
std::vector<common::DataChunk*> ret;
ret.reserve(chunks.size());
for (auto& chunk : chunks) {
ret.push_back(chunk.get());
}
return ret;
}

DataChunk* DataChunkCollection::allocateChunk(DataChunk& chunk) {
void DataChunkCollection::allocateChunk(DataChunk& chunk) {
if (chunks.empty()) {
types.reserve(chunk.getNumValueVectors());
for (auto vectorIdx = 0u; vectorIdx < chunk.getNumValueVectors(); vectorIdx++) {
types.push_back(chunk.getValueVector(vectorIdx)->dataType);
}
}
auto newChunk = std::make_unique<DataChunk>(types.size(), std::make_shared<DataChunkState>());
DataChunk newChunk(types.size(), std::make_shared<DataChunkState>());
for (auto i = 0u; i < types.size(); i++) {
newChunk->insert(i, std::make_shared<ValueVector>(types[i], mm));
newChunk.insert(i, std::make_shared<ValueVector>(types[i], mm));
}
chunks.push_back(std::move(newChunk));
return chunks.back().get();
}

} // namespace common
Expand Down
9 changes: 9 additions & 0 deletions src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,15 @@
return typesCopy;
}

// Builds a vector of LogicalType values copied from a vector of non-owning pointers.
// Every pointer in `types` is dereferenced unconditionally, so none may be null.
std::vector<LogicalType> LogicalType::copy(const std::vector<LogicalType*>& types) {
    std::vector<LogicalType> typesCopy;
    typesCopy.reserve(types.size());
    for (const auto* type : types) {
        // Copy-construct the element directly from the pointee. The earlier form,
        // `*type->copy()`, first cloned the type into a heap-allocated temporary and then
        // copy-constructed the element from that clone, paying for two copies and one
        // allocation per element to arrive at the same value.
        typesCopy.push_back(*type);
    }
    return typesCopy;
}

Check warning on line 461 in src/common/types/types.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/types/types.cpp#L461

Added line #L461 was not covered by tests

PhysicalTypeID LogicalType::getPhysicalType(LogicalTypeID typeID) {
switch (typeID) {
case LogicalTypeID::ANY: {
Expand Down
8 changes: 3 additions & 5 deletions src/function/table/call/storage_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ struct StorageInfoSharedState final : public CallFuncSharedState {
columns.push_back(relTable->getCSRLengthColumn(RelDataDirection::FWD));
columns.push_back(relTable->getCSROffsetColumn(RelDataDirection::BWD));
columns.push_back(relTable->getCSRLengthColumn(RelDataDirection::BWD));
columns.push_back(relTable->getAdjColumn(RelDataDirection::FWD));
columns.push_back(relTable->getAdjColumn(RelDataDirection::BWD));
for (auto columnID = 0u; columnID < relTable->getNumColumns(); columnID++) {
auto column = relTable->getColumn(columnID, RelDataDirection::FWD);
auto collectedColumns = collectColumns(column);
Expand Down Expand Up @@ -167,10 +165,10 @@ static common::offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output
while (true) {
if (localState->currChunkIdx < localState->dataChunkCollection->getNumChunks()) {
// Copy from local state chunk.
auto chunk = localState->dataChunkCollection->getChunk(localState->currChunkIdx);
auto numValuesToOutput = chunk->state->selVector->selectedSize;
auto& chunk = localState->dataChunkCollection->getChunkUnsafe(localState->currChunkIdx);
auto numValuesToOutput = chunk.state->selVector->selectedSize;
for (auto columnIdx = 0u; columnIdx < dataChunk.getNumValueVectors(); columnIdx++) {
auto localVector = chunk->getValueVector(columnIdx);
auto localVector = chunk.getValueVector(columnIdx);
auto outputVector = dataChunk.getValueVector(columnIdx);
for (auto i = 0u; i < numValuesToOutput; i++) {
outputVector->copyFromVectorData(i, localVector.get(), i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class RelTableCatalogEntry final : public TableCatalogEntry {
//===--------------------------------------------------------------------===//
bool isParent(common::table_id_t tableID) override;
common::TableType getTableType() const override { return common::TableType::REL; }
common::column_id_t getColumnID(common::property_id_t propertyID) const override;
common::table_id_t getSrcTableID() const { return srcTableID; }
common::table_id_t getDstTableID() const { return dstTableID; }
bool isSingleMultiplicity(common::RelDataDirection direction) const;
Expand Down
4 changes: 2 additions & 2 deletions src/include/catalog/catalog_entry/table_catalog_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TableCatalogEntry : public CatalogEntry {
bool containProperty(const std::string& propertyName) const;
common::property_id_t getPropertyID(const std::string& propertyName) const;
const Property* getProperty(common::property_id_t propertyID) const;
common::column_id_t getColumnID(common::property_id_t propertyID) const;
virtual common::column_id_t getColumnID(common::property_id_t propertyID) const;
bool containPropertyType(const common::LogicalType& logicalType) const;
void addProperty(std::string propertyName, std::unique_ptr<common::LogicalType> dataType);
void dropProperty(common::property_id_t propertyID);
Expand All @@ -52,7 +52,7 @@ class TableCatalogEntry : public CatalogEntry {
static std::unique_ptr<TableCatalogEntry> deserialize(
common::Deserializer& deserializer, CatalogEntryType type);

private:
protected:
common::table_id_t tableID;
std::string comment;
common::property_id_t nextPID;
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/column_data_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ namespace common {

// Format tag for column data; the underlying type is uint8_t, so a value occupies one byte.
enum class ColumnDataFormat : uint8_t {
    REGULAR = 0,
    CSR = 1,
};

}
} // namespace common
} // namespace kuzu
24 changes: 14 additions & 10 deletions src/include/common/data_chunk/data_chunk_collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,40 @@
namespace kuzu {
namespace common {

// TODO(Guodong/Ziyi): We should extend this to ColumnDataCollection, which takes ResultSet into
// consideration for storage and scan.
// TODO(Guodong): Should rework this to use ColumnChunk.
class DataChunkCollection {
public:
explicit DataChunkCollection(storage::MemoryManager* mm);
DELETE_COPY_DEFAULT_MOVE(DataChunkCollection);

void append(DataChunk& chunk);
void append(std::unique_ptr<DataChunk> chunk);
std::vector<common::DataChunk*> getChunks() const;

inline const std::vector<common::DataChunk>& getChunks() const { return chunks; }
inline std::vector<common::DataChunk>& getChunksUnsafe() { return chunks; }
inline uint64_t getNumChunks() const { return chunks.size(); }
inline DataChunk* getChunk(uint64_t idx) const {
inline const DataChunk& getChunk(uint64_t idx) const {
KU_ASSERT(idx < chunks.size());
return chunks[idx];
}
inline DataChunk& getChunkUnsafe(uint64_t idx) {
KU_ASSERT(idx < chunks.size());
return chunks[idx].get();
return chunks[idx];
}
inline void merge(DataChunkCollection* other) {
for (auto& chunk : other->chunks) {
append(std::move(chunk));
merge(std::move(chunk));
}
}
void merge(DataChunk chunk);

private:
DataChunk* allocateChunk(DataChunk& chunk);
void allocateChunk(DataChunk& chunk);

void initTypes(DataChunk& chunk);

private:
storage::MemoryManager* mm;
std::vector<LogicalType> types;
std::vector<std::unique_ptr<DataChunk>> chunks;
std::vector<DataChunk> chunks;
ray6080 marked this conversation as resolved.
Show resolved Hide resolved
};

} // namespace common
Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ class LogicalType {
static std::vector<std::unique_ptr<LogicalType>> copy(
const std::vector<std::unique_ptr<LogicalType>>& types);
static std::vector<LogicalType> copy(const std::vector<LogicalType>& types);
static std::vector<LogicalType> copy(const std::vector<LogicalType*>& types);

static std::unique_ptr<LogicalType> ANY() {
return std::make_unique<LogicalType>(LogicalTypeID::ANY);
Expand Down
5 changes: 4 additions & 1 deletion src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,13 @@ class Partitioner : public Sink {
std::vector<common::partition_idx_t> numPartitions);

private:
common::DataChunk constructDataChunk(const std::vector<DataPos>& columnPositions,
const std::vector<common::LogicalType>& columnTypes, const ResultSet& resultSet,
const std::shared_ptr<common::DataChunkState>& state);
// TODO: For now, RelBatchInsert will guarantee all data are inside one data chunk. Should be
// generalized to resultSet later if needed.
void copyDataToPartitions(
common::partition_idx_t partitioningIdx, common::DataChunk* chunkToCopyFrom);
common::partition_idx_t partitioningIdx, common::DataChunk chunkToCopyFrom);

private:
// Same size as a value vector. Each thread will allocate a chunk for each node group,
Expand Down
34 changes: 10 additions & 24 deletions src/include/storage/local_storage/local_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,26 @@ class LocalNodeNG final : public LocalNodeGroup {
public:
LocalNodeNG(common::offset_t nodeGroupStartOffset,
const std::vector<common::LogicalType*>& dataTypes, MemoryManager* mm)
: LocalNodeGroup{nodeGroupStartOffset, dataTypes, mm} {
insertInfo.resize(dataTypes.size());
updateInfo.resize(dataTypes.size());
}
: LocalNodeGroup{nodeGroupStartOffset, dataTypes, mm} {}

void scan(common::ValueVector* nodeIDVector, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(common::offset_t nodeOffset, common::column_id_t columnID,
common::ValueVector* outputVector, common::sel_t posInOutputVector);
void insert(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::ValueVector* nodeIDVector, common::column_id_t columnID,
common::ValueVector* propertyVector);
void delete_(common::ValueVector* nodeIDVector);

common::row_idx_t getRowIdx(common::column_id_t columnID, common::offset_t nodeOffset);
bool insert(std::vector<common::ValueVector*> nodeIDVectors,
std::vector<common::ValueVector*> propertyVectors) override;
bool update(std::vector<common::ValueVector*> nodeIDVectors, common::column_id_t columnID,
common::ValueVector* propertyVector) override;
bool delete_(
common::ValueVector* nodeIDVector, common::ValueVector* /*extraVector*/ = nullptr) override;

inline const offset_to_row_idx_t& getInsertInfoRef(common::column_id_t columnID) {
KU_ASSERT(columnID < insertInfo.size());
return insertInfo[columnID];
inline const offset_to_row_idx_t& getInsertInfoRef() {
return insertChunks.getOffsetToRowIdx();
}
inline const offset_to_row_idx_t& getUpdateInfoRef(common::column_id_t columnID) {
KU_ASSERT(columnID < updateInfo.size());
return updateInfo[columnID];
return getUpdateChunks(columnID).getOffsetToRowIdx();
}

private:
std::vector<offset_to_row_idx_t> insertInfo;
std::vector<offset_to_row_idx_t> updateInfo;
};

class LocalNodeTableData final : public LocalTableData {
Expand All @@ -52,11 +43,6 @@ class LocalNodeTableData final : public LocalTableData {
void lookup(common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void insert(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::ValueVector* nodeIDVector, common::column_id_t columnID,
common::ValueVector* propertyVector);
void delete_(common::ValueVector* nodeIDVector);

private:
LocalNodeGroup* getOrCreateLocalNodeGroup(common::ValueVector* nodeIDVector) override;
Expand Down
Loading
Loading