Skip to content

Commit

Permalink
rework local storage: separate the storage of insertions and updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Mar 4, 2024
1 parent d1d0958 commit 30ced06
Show file tree
Hide file tree
Showing 45 changed files with 1,185 additions and 1,234 deletions.
41 changes: 24 additions & 17 deletions src/catalog/catalog_entry/rel_table_catalog_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

#include "catalog/catalog.h"

using namespace kuzu::common;

namespace kuzu {
namespace catalog {

RelTableCatalogEntry::RelTableCatalogEntry(std::string name, common::table_id_t tableID,
RelTableCatalogEntry::RelTableCatalogEntry(std::string name, table_id_t tableID,
common::RelMultiplicity srcMultiplicity, common::RelMultiplicity dstMultiplicity,
common::table_id_t srcTableID, common::table_id_t dstTableID)
table_id_t srcTableID, table_id_t dstTableID)
: TableCatalogEntry{CatalogEntryType::REL_TABLE_ENTRY, std::move(name), tableID},
srcMultiplicity{srcMultiplicity}, dstMultiplicity{dstMultiplicity}, srcTableID{srcTableID},
dstTableID{dstTableID} {}
Expand All @@ -20,27 +22,32 @@ RelTableCatalogEntry::RelTableCatalogEntry(const RelTableCatalogEntry& other)
dstTableID = other.dstTableID;
}

bool RelTableCatalogEntry::isParent(common::table_id_t tableID) {
bool RelTableCatalogEntry::isParent(table_id_t tableID) {
return srcTableID == tableID || dstTableID == tableID;
}

bool RelTableCatalogEntry::isSingleMultiplicity(common::RelDataDirection direction) const {
column_id_t RelTableCatalogEntry::getColumnID(property_id_t propertyID) const {
auto it = std::find_if(properties.begin(), properties.end(),
[&propertyID](const auto& property) { return property.getPropertyID() == propertyID; });
// Skip the first column in the rel table, which is reserved for nbrID.
return it == properties.end() ? common::INVALID_COLUMN_ID :
std::distance(properties.begin(), it) + 1;
}

bool RelTableCatalogEntry::isSingleMultiplicity(RelDataDirection direction) const {
return getMultiplicity(direction) == common::RelMultiplicity::ONE;
}
common::RelMultiplicity RelTableCatalogEntry::getMultiplicity(
common::RelDataDirection direction) const {
return direction == common::RelDataDirection::FWD ? dstMultiplicity : srcMultiplicity;
common::RelMultiplicity RelTableCatalogEntry::getMultiplicity(RelDataDirection direction) const {
return direction == RelDataDirection::FWD ? dstMultiplicity : srcMultiplicity;
}
common::table_id_t RelTableCatalogEntry::getBoundTableID(
common::RelDataDirection relDirection) const {
return relDirection == common::RelDataDirection::FWD ? srcTableID : dstTableID;
table_id_t RelTableCatalogEntry::getBoundTableID(RelDataDirection relDirection) const {
return relDirection == RelDataDirection::FWD ? srcTableID : dstTableID;
}
common::table_id_t RelTableCatalogEntry::getNbrTableID(
common::RelDataDirection relDirection) const {
return relDirection == common::RelDataDirection::FWD ? dstTableID : srcTableID;
table_id_t RelTableCatalogEntry::getNbrTableID(RelDataDirection relDirection) const {
return relDirection == RelDataDirection::FWD ? dstTableID : srcTableID;
}

void RelTableCatalogEntry::serialize(common::Serializer& serializer) const {
void RelTableCatalogEntry::serialize(Serializer& serializer) const {
TableCatalogEntry::serialize(serializer);
serializer.write(srcMultiplicity);
serializer.write(dstMultiplicity);
Expand All @@ -49,11 +56,11 @@ void RelTableCatalogEntry::serialize(common::Serializer& serializer) const {
}

std::unique_ptr<RelTableCatalogEntry> RelTableCatalogEntry::deserialize(
common::Deserializer& deserializer) {
Deserializer& deserializer) {
common::RelMultiplicity srcMultiplicity;
common::RelMultiplicity dstMultiplicity;
common::table_id_t srcTableID;
common::table_id_t dstTableID;
table_id_t srcTableID;
table_id_t dstTableID;
deserializer.deserializeValue(srcMultiplicity);
deserializer.deserializeValue(dstMultiplicity);
deserializer.deserializeValue(srcTableID);
Expand Down
10 changes: 10 additions & 0 deletions src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,16 @@ std::vector<std::unique_ptr<LogicalType>> LogicalType::copy(
return typesCopy;
}

std::vector<std::unique_ptr<LogicalType>> LogicalType::copy(
const std::vector<LogicalType*>& types) {
std::vector<std::unique_ptr<LogicalType>> typesCopy;
typesCopy.reserve(types.size());
for (auto& type : types) {
typesCopy.push_back(type->copy());
}
return typesCopy;
}

Check warning on line 454 in src/common/types/types.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/types/types.cpp#L454

Added line #L454 was not covered by tests

PhysicalTypeID LogicalType::getPhysicalType(LogicalTypeID typeID) {
switch (typeID) {
case LogicalTypeID::ANY: {
Expand Down
2 changes: 0 additions & 2 deletions src/function/table/call/storage_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ struct StorageInfoSharedState final : public CallFuncSharedState {
columns.push_back(relTable->getCSRLengthColumn(RelDataDirection::FWD));
columns.push_back(relTable->getCSROffsetColumn(RelDataDirection::BWD));
columns.push_back(relTable->getCSRLengthColumn(RelDataDirection::BWD));
columns.push_back(relTable->getAdjColumn(RelDataDirection::FWD));
columns.push_back(relTable->getAdjColumn(RelDataDirection::BWD));
for (auto columnID = 0u; columnID < relTable->getNumColumns(); columnID++) {
auto column = relTable->getColumn(columnID, RelDataDirection::FWD);
auto collectedColumns = collectColumns(column);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class RelTableCatalogEntry final : public TableCatalogEntry {
//===--------------------------------------------------------------------===//
bool isParent(common::table_id_t tableID) override;
common::TableType getTableType() const override { return common::TableType::REL; }
common::column_id_t getColumnID(common::property_id_t propertyID) const override;
common::table_id_t getSrcTableID() const { return srcTableID; }
common::table_id_t getDstTableID() const { return dstTableID; }
bool isSingleMultiplicity(common::RelDataDirection direction) const;
Expand Down
4 changes: 2 additions & 2 deletions src/include/catalog/catalog_entry/table_catalog_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TableCatalogEntry : public CatalogEntry {
bool containProperty(const std::string& propertyName) const;
common::property_id_t getPropertyID(const std::string& propertyName) const;
const Property* getProperty(common::property_id_t propertyID) const;
common::column_id_t getColumnID(common::property_id_t propertyID) const;
virtual common::column_id_t getColumnID(common::property_id_t propertyID) const;
bool containPropertyType(const common::LogicalType& logicalType) const;
void addProperty(std::string propertyName, std::unique_ptr<common::LogicalType> dataType);
void dropProperty(common::property_id_t propertyID);
Expand All @@ -52,7 +52,7 @@ class TableCatalogEntry : public CatalogEntry {
static std::unique_ptr<TableCatalogEntry> deserialize(
common::Deserializer& deserializer, CatalogEntryType type);

private:
protected:
common::table_id_t tableID;
std::string comment;
common::property_id_t nextPID;
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/column_data_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ namespace common {

enum class ColumnDataFormat : uint8_t { REGULAR = 0, CSR = 1 };

}
} // namespace common
} // namespace kuzu
3 changes: 1 addition & 2 deletions src/include/common/data_chunk/data_chunk_collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
namespace kuzu {
namespace common {

// TODO(Guodong/Ziyi): We should extend this to ColumnDataCollection, which takes ResultSet into
// consideration for storage and scan.
// TODO(Guodong): Should rework this to use ColumnChunk.
class DataChunkCollection {
public:
explicit DataChunkCollection(storage::MemoryManager* mm);
Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ class LogicalType {

static std::vector<std::unique_ptr<LogicalType>> copy(
const std::vector<std::unique_ptr<LogicalType>>& types);
static std::vector<std::unique_ptr<LogicalType>> copy(const std::vector<LogicalType*>& types);

static std::unique_ptr<LogicalType> ANY() {
return std::make_unique<LogicalType>(LogicalTypeID::ANY);
Expand Down
34 changes: 10 additions & 24 deletions src/include/storage/local_storage/local_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,26 @@ class LocalNodeNG final : public LocalNodeGroup {
public:
LocalNodeNG(common::offset_t nodeGroupStartOffset,
const std::vector<common::LogicalType*>& dataTypes, MemoryManager* mm)
: LocalNodeGroup{nodeGroupStartOffset, dataTypes, mm} {
insertInfo.resize(dataTypes.size());
updateInfo.resize(dataTypes.size());
}
: LocalNodeGroup{nodeGroupStartOffset, dataTypes, mm} {}

void scan(common::ValueVector* nodeIDVector, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(common::offset_t nodeOffset, common::column_id_t columnID,
common::ValueVector* outputVector, common::sel_t posInOutputVector);
void insert(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::ValueVector* nodeIDVector, common::column_id_t columnID,
common::ValueVector* propertyVector);
void delete_(common::ValueVector* nodeIDVector);

common::row_idx_t getRowIdx(common::column_id_t columnID, common::offset_t nodeOffset);
bool insert(std::vector<common::ValueVector*> nodeIDVectors,
std::vector<common::ValueVector*> propertyVectors) override;
bool update(std::vector<common::ValueVector*> nodeIDVectors, common::column_id_t columnID,
common::ValueVector* propertyVector) override;
bool delete_(
common::ValueVector* nodeIDVector, common::ValueVector* /*extraVector*/ = nullptr) override;

inline const offset_to_row_idx_t& getInsertInfoRef(common::column_id_t columnID) {
KU_ASSERT(columnID < insertInfo.size());
return insertInfo[columnID];
inline const offset_to_row_idx_t& getInsertInfoRef() {
return insertChunks.getOffsetToRowIdx();
}
inline const offset_to_row_idx_t& getUpdateInfoRef(common::column_id_t columnID) {
KU_ASSERT(columnID < updateInfo.size());
return updateInfo[columnID];
return getUpdateChunks(columnID).getOffsetToRowIdx();
}

private:
std::vector<offset_to_row_idx_t> insertInfo;
std::vector<offset_to_row_idx_t> updateInfo;
};

class LocalNodeTableData final : public LocalTableData {
Expand All @@ -52,11 +43,6 @@ class LocalNodeTableData final : public LocalTableData {
void lookup(common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void insert(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::ValueVector* nodeIDVector, common::column_id_t columnID,
common::ValueVector* propertyVector);
void delete_(common::ValueVector* nodeIDVector);

private:
LocalNodeGroup* getOrCreateLocalNodeGroup(common::ValueVector* nodeIDVector) override;
Expand Down
96 changes: 23 additions & 73 deletions src/include/storage/local_storage/local_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,89 +7,46 @@
namespace kuzu {
namespace storage {

static constexpr common::column_id_t REL_ID_COLUMN_ID = 0;

// Info of node groups with CSR chunks for rel tables.
// Note that srcNodeOffset here are the relative offset within each node group.
struct RelNGInfo {
update_insert_info_t adjInsertInfo;
std::vector<update_insert_info_t> insertInfoPerChunk;
std::vector<update_insert_info_t> updateInfoPerChunk;
delete_info_t deleteInfo;
common::RelMultiplicity multiplicity;

RelNGInfo(common::RelMultiplicity multiplicity, common::column_id_t numChunks)
: multiplicity{multiplicity} {
insertInfoPerChunk.resize(numChunks);
updateInfoPerChunk.resize(numChunks);
}

bool insert(common::offset_t srcOffsetInChunk, common::offset_t relOffset,
common::row_idx_t adjNodeRowIdx, const std::vector<common::row_idx_t>& propertyNodesRowIdx);
void update(common::offset_t srcOffsetInChunk, common::offset_t relOffset,
common::column_id_t columnID, common::row_idx_t rowIdx);
bool delete_(common::offset_t srcOffsetInChunk, common::offset_t relOffset);

bool hasUpdates();

uint64_t getNumInsertedTuples(common::offset_t srcOffsetInChunk);

const update_insert_info_t& getUpdateInfo(common::column_id_t columnID) {
KU_ASSERT(columnID == common::INVALID_COLUMN_ID || columnID < updateInfoPerChunk.size());
return columnID == common::INVALID_COLUMN_ID ? getEmptyInfo() :
updateInfoPerChunk[columnID];
}
const update_insert_info_t& getInsertInfo(common::column_id_t columnID) {
KU_ASSERT(columnID == common::INVALID_COLUMN_ID || columnID < insertInfoPerChunk.size());
return columnID == common::INVALID_COLUMN_ID ? adjInsertInfo : insertInfoPerChunk[columnID];
}
const delete_info_t& getDeleteInfo() const { return deleteInfo; }

const update_insert_info_t& getEmptyInfo();

private:
inline static bool contains(
const std::unordered_set<common::offset_t>& set, common::offset_t value) {
return set.find(value) != set.end();
}
};
static constexpr common::column_id_t LOCAL_NBR_ID_COLUMN_ID = 0;
static constexpr common::column_id_t LOCAL_REL_ID_COLUMN_ID = 1;

class LocalRelNG final : public LocalNodeGroup {
friend class RelTableData;

public:
LocalRelNG(common::offset_t nodeGroupStartOffset, std::vector<common::LogicalType*> dataTypes,
MemoryManager* mm, common::RelMultiplicity multiplicity);

common::row_idx_t scanCSR(common::offset_t srcOffsetInChunk,
common::offset_t posToReadForOffset, const std::vector<common::column_id_t>& columnIDs,
common::row_idx_t scanCSR(common::offset_t srcOffset, common::offset_t posToReadForOffset,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVector);
// For CSR, we need to apply updates and deletions here, while insertions are handled by
// `scanCSR`.
void applyLocalChangesForCSRColumns(common::offset_t srcOffsetInChunk,
void applyLocalChangesToScannedVectors(common::offset_t srcOffset,
const std::vector<common::column_id_t>& columnIDs, common::ValueVector* relIDVector,
const std::vector<common::ValueVector*>& outputVectors);

bool insert(common::ValueVector* srcNodeIDVector, common::ValueVector* dstNodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::ValueVector* srcNodeIDVector, common::ValueVector* relIDVector,
common::column_id_t columnID, common::ValueVector* propertyVector);
bool delete_(common::ValueVector* srcNodeIDVector, common::ValueVector* relIDVector);
bool insert(std::vector<common::ValueVector*> nodeIDVectors,
std::vector<common::ValueVector*> vectors) override;
bool update(std::vector<common::ValueVector*> nodeIDVectors, common::column_id_t columnID,
common::ValueVector* propertyVector) override;
bool delete_(common::ValueVector* srcNodeVector, common::ValueVector* relIDVector) override;

inline LocalVectorCollection* getAdjChunk() { return adjChunk.get(); }
inline LocalVectorCollection* getPropertyChunk(common::column_id_t columnID) {
KU_ASSERT(columnID < chunks.size());
return chunks[columnID].get();
}
inline RelNGInfo* getRelNGInfo() { return relNGInfo.get(); }
common::offset_t getNumInsertedRels(common::offset_t srcOffset) const;
void getChangesPerCSRSegment(
std::vector<int64_t>& sizeChangesPerSegment, std::vector<bool>& hasChangesPerSegment);

private:
void applyCSRUpdates(common::offset_t srcOffsetInChunk, common::column_id_t columnID,
common::ValueVector* relIDVector, common::ValueVector* outputVector);
void applyCSRDeletions(common::offset_t srcOffsetInChunk, const delete_info_t& deleteInfo,
common::ValueVector* relIDVector);
static common::vector_idx_t getSegmentIdx(common::offset_t offset) {
return offset >> common::StorageConstants::CSR_SEGMENT_SIZE_LOG2;
}

void applyCSRUpdates(common::column_id_t columnID, common::ValueVector* relIDVector,
common::ValueVector* outputVector);
void applyCSRDeletions(common::offset_t srcOffsetInChunk, common::ValueVector* relIDVector);

private:
std::unique_ptr<LocalVectorCollection> adjChunk;
std::unique_ptr<RelNGInfo> relNGInfo;
common::RelMultiplicity multiplicity;
};

class LocalRelTableData final : public LocalTableData {
Expand All @@ -100,13 +57,6 @@ class LocalRelTableData final : public LocalTableData {
std::vector<common::LogicalType*> dataTypes, MemoryManager* mm)
: LocalTableData{std::move(dataTypes), mm}, multiplicity{multiplicity} {}

bool insert(common::ValueVector* srcNodeIDVector, common::ValueVector* dstNodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::ValueVector* srcNodeIDVector, common::ValueVector* relIDVector,
common::column_id_t columnID, common::ValueVector* propertyVector);
bool delete_(common::ValueVector* srcNodeIDVector, common::ValueVector* dstNodeIDVector,
common::ValueVector* relIDVector);

private:
LocalNodeGroup* getOrCreateLocalNodeGroup(common::ValueVector* nodeIDVector) override;

Expand Down
Loading

0 comments on commit 30ced06

Please sign in to comment.