Skip to content

Commit

Permalink
rework local storage: separate the storage of insertions and updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Mar 10, 2024
1 parent 45c5aa9 commit b12ab78
Show file tree
Hide file tree
Showing 49 changed files with 1,233 additions and 1,297 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.15)

project(Kuzu VERSION 0.3.1.1 LANGUAGES CXX C)
project(Kuzu VERSION 0.3.1.2 LANGUAGES CXX C)

find_package(Threads REQUIRED)

Expand Down
41 changes: 24 additions & 17 deletions src/catalog/catalog_entry/rel_table_catalog_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

#include "catalog/catalog.h"

using namespace kuzu::common;

namespace kuzu {
namespace catalog {

RelTableCatalogEntry::RelTableCatalogEntry(std::string name, common::table_id_t tableID,
RelTableCatalogEntry::RelTableCatalogEntry(std::string name, table_id_t tableID,
common::RelMultiplicity srcMultiplicity, common::RelMultiplicity dstMultiplicity,
common::table_id_t srcTableID, common::table_id_t dstTableID)
table_id_t srcTableID, table_id_t dstTableID)
: TableCatalogEntry{CatalogEntryType::REL_TABLE_ENTRY, std::move(name), tableID},
srcMultiplicity{srcMultiplicity}, dstMultiplicity{dstMultiplicity}, srcTableID{srcTableID},
dstTableID{dstTableID} {}
Expand All @@ -20,27 +22,32 @@ RelTableCatalogEntry::RelTableCatalogEntry(const RelTableCatalogEntry& other)
dstTableID = other.dstTableID;
}

bool RelTableCatalogEntry::isParent(common::table_id_t tableID) {
bool RelTableCatalogEntry::isParent(table_id_t tableID) {
return srcTableID == tableID || dstTableID == tableID;
}

bool RelTableCatalogEntry::isSingleMultiplicity(common::RelDataDirection direction) const {
column_id_t RelTableCatalogEntry::getColumnID(property_id_t propertyID) const {
auto it = std::find_if(properties.begin(), properties.end(),
[&propertyID](const auto& property) { return property.getPropertyID() == propertyID; });
// Skip the first column in the rel table, which is reserved for nbrID.
return it == properties.end() ? common::INVALID_COLUMN_ID :
std::distance(properties.begin(), it) + 1;
}

bool RelTableCatalogEntry::isSingleMultiplicity(RelDataDirection direction) const {
return getMultiplicity(direction) == common::RelMultiplicity::ONE;
}
common::RelMultiplicity RelTableCatalogEntry::getMultiplicity(
common::RelDataDirection direction) const {
return direction == common::RelDataDirection::FWD ? dstMultiplicity : srcMultiplicity;
common::RelMultiplicity RelTableCatalogEntry::getMultiplicity(RelDataDirection direction) const {
return direction == RelDataDirection::FWD ? dstMultiplicity : srcMultiplicity;
}
common::table_id_t RelTableCatalogEntry::getBoundTableID(
common::RelDataDirection relDirection) const {
return relDirection == common::RelDataDirection::FWD ? srcTableID : dstTableID;
table_id_t RelTableCatalogEntry::getBoundTableID(RelDataDirection relDirection) const {
return relDirection == RelDataDirection::FWD ? srcTableID : dstTableID;
}
common::table_id_t RelTableCatalogEntry::getNbrTableID(
common::RelDataDirection relDirection) const {
return relDirection == common::RelDataDirection::FWD ? dstTableID : srcTableID;
table_id_t RelTableCatalogEntry::getNbrTableID(RelDataDirection relDirection) const {
return relDirection == RelDataDirection::FWD ? dstTableID : srcTableID;
}

void RelTableCatalogEntry::serialize(common::Serializer& serializer) const {
void RelTableCatalogEntry::serialize(Serializer& serializer) const {
TableCatalogEntry::serialize(serializer);
serializer.write(srcMultiplicity);
serializer.write(dstMultiplicity);
Expand All @@ -49,11 +56,11 @@ void RelTableCatalogEntry::serialize(common::Serializer& serializer) const {
}

std::unique_ptr<RelTableCatalogEntry> RelTableCatalogEntry::deserialize(
common::Deserializer& deserializer) {
Deserializer& deserializer) {
common::RelMultiplicity srcMultiplicity;
common::RelMultiplicity dstMultiplicity;
common::table_id_t srcTableID;
common::table_id_t dstTableID;
table_id_t srcTableID;
table_id_t dstTableID;
deserializer.deserializeValue(srcMultiplicity);
deserializer.deserializeValue(dstMultiplicity);
deserializer.deserializeValue(srcTableID);
Expand Down
43 changes: 17 additions & 26 deletions src/common/data_chunk/data_chunk_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,35 @@ DataChunkCollection::DataChunkCollection(storage::MemoryManager* mm) : mm{mm} {}

void DataChunkCollection::append(DataChunk& chunk) {
auto numTuplesToAppend = chunk.state->selVector->selectedSize;
auto chunkToAppendInfo = chunks.empty() ? allocateChunk(chunk) : chunks.back().get();
auto numTuplesAppended = 0u;
while (numTuplesAppended < numTuplesToAppend) {
if (chunkToAppendInfo->state->selVector->selectedSize == DEFAULT_VECTOR_CAPACITY) {
chunkToAppendInfo = allocateChunk(chunk);
if (chunks.empty() ||
chunks.back().state->selVector->selectedSize == DEFAULT_VECTOR_CAPACITY) {

Check warning on line 13 in src/common/data_chunk/data_chunk_collection.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/data_chunk/data_chunk_collection.cpp#L13

Added line #L13 was not covered by tests
allocateChunk(chunk);
}
auto& chunkToAppend = chunks.back();
auto numTuplesToCopy = std::min(numTuplesToAppend - numTuplesAppended,
DEFAULT_VECTOR_CAPACITY - chunkToAppendInfo->state->selVector->selectedSize);
DEFAULT_VECTOR_CAPACITY - chunkToAppend.state->selVector->selectedSize);
for (auto vectorIdx = 0u; vectorIdx < chunk.getNumValueVectors(); vectorIdx++) {
for (auto i = 0u; i < numTuplesToCopy; i++) {
auto srcPos = chunk.state->selVector->selectedPositions[numTuplesAppended + i];
auto dstPos = chunkToAppendInfo->state->selVector->selectedSize + i;
chunkToAppendInfo->getValueVector(vectorIdx)->copyFromVectorData(
auto dstPos = chunkToAppend.state->selVector->selectedSize + i;
chunkToAppend.getValueVector(vectorIdx)->copyFromVectorData(
dstPos, chunk.getValueVector(vectorIdx).get(), srcPos);
}
}
chunkToAppendInfo->state->selVector->selectedSize += numTuplesToCopy;
chunkToAppend.state->selVector->selectedSize += numTuplesToCopy;
numTuplesAppended += numTuplesToCopy;
}
}

void DataChunkCollection::append(std::unique_ptr<DataChunk> chunk) {
KU_ASSERT(chunk);
void DataChunkCollection::merge(DataChunk chunk) {
if (chunks.empty()) {
initTypes(*chunk);
initTypes(chunk);
}
KU_ASSERT(chunk->getNumValueVectors() == types.size());
for (auto vectorIdx = 0u; vectorIdx < chunk->getNumValueVectors(); vectorIdx++) {
KU_ASSERT(chunk->getValueVector(vectorIdx)->dataType == types[vectorIdx]);
KU_ASSERT(chunk.getNumValueVectors() == types.size());
for (auto vectorIdx = 0u; vectorIdx < chunk.getNumValueVectors(); vectorIdx++) {
KU_ASSERT(chunk.getValueVector(vectorIdx)->dataType == types[vectorIdx]);
}
chunks.push_back(std::move(chunk));
}
Expand All @@ -47,28 +47,19 @@ void DataChunkCollection::initTypes(DataChunk& chunk) {
}
}

std::vector<common::DataChunk*> DataChunkCollection::getChunks() const {
std::vector<common::DataChunk*> ret;
ret.reserve(chunks.size());
for (auto& chunk : chunks) {
ret.push_back(chunk.get());
}
return ret;
}

DataChunk* DataChunkCollection::allocateChunk(DataChunk& chunk) {
DataChunk& DataChunkCollection::allocateChunk(DataChunk& chunk) {
if (chunks.empty()) {
types.reserve(chunk.getNumValueVectors());
for (auto vectorIdx = 0u; vectorIdx < chunk.getNumValueVectors(); vectorIdx++) {
types.push_back(chunk.getValueVector(vectorIdx)->dataType);
}
}
auto newChunk = std::make_unique<DataChunk>(types.size(), std::make_shared<DataChunkState>());
DataChunk newChunk(types.size(), std::make_shared<DataChunkState>());
for (auto i = 0u; i < types.size(); i++) {
newChunk->insert(i, std::make_shared<ValueVector>(types[i], mm));
newChunk.insert(i, std::make_shared<ValueVector>(types[i], mm));
}
chunks.push_back(std::move(newChunk));
return chunks.back().get();
return chunks.back();
}

} // namespace common
Expand Down
9 changes: 9 additions & 0 deletions src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,15 @@ std::vector<std::unique_ptr<LogicalType>> LogicalType::copy(
return typesCopy;
}

std::vector<LogicalType> LogicalType::copy(const std::vector<LogicalType*>& types) {
std::vector<LogicalType> typesCopy;
typesCopy.reserve(types.size());
for (auto& type : types) {
typesCopy.push_back(*type->copy());
}
return typesCopy;
}

Check warning on line 453 in src/common/types/types.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/types/types.cpp#L453

Added line #L453 was not covered by tests

PhysicalTypeID LogicalType::getPhysicalType(LogicalTypeID typeID) {
switch (typeID) {
case LogicalTypeID::ANY: {
Expand Down
8 changes: 3 additions & 5 deletions src/function/table/call/storage_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ struct StorageInfoSharedState final : public CallFuncSharedState {
columns.push_back(relTable->getCSRLengthColumn(RelDataDirection::FWD));
columns.push_back(relTable->getCSROffsetColumn(RelDataDirection::BWD));
columns.push_back(relTable->getCSRLengthColumn(RelDataDirection::BWD));
columns.push_back(relTable->getAdjColumn(RelDataDirection::FWD));
columns.push_back(relTable->getAdjColumn(RelDataDirection::BWD));
for (auto columnID = 0u; columnID < relTable->getNumColumns(); columnID++) {
auto column = relTable->getColumn(columnID, RelDataDirection::FWD);
auto collectedColumns = collectColumns(column);
Expand Down Expand Up @@ -167,10 +165,10 @@ static common::offset_t tableFunc(TableFuncInput& input, TableFuncOutput& output
while (true) {
if (localState->currChunkIdx < localState->dataChunkCollection->getNumChunks()) {
// Copy from local state chunk.
auto chunk = localState->dataChunkCollection->getChunk(localState->currChunkIdx);
auto numValuesToOutput = chunk->state->selVector->selectedSize;
auto& chunk = localState->dataChunkCollection->getChunkUnSafe(localState->currChunkIdx);
auto numValuesToOutput = chunk.state->selVector->selectedSize;
for (auto columnIdx = 0u; columnIdx < dataChunk.getNumValueVectors(); columnIdx++) {
auto localVector = chunk->getValueVector(columnIdx);
auto localVector = chunk.getValueVector(columnIdx);
auto outputVector = dataChunk.getValueVector(columnIdx);
for (auto i = 0u; i < numValuesToOutput; i++) {
outputVector->copyFromVectorData(i, localVector.get(), i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class RelTableCatalogEntry final : public TableCatalogEntry {
//===--------------------------------------------------------------------===//
bool isParent(common::table_id_t tableID) override;
common::TableType getTableType() const override { return common::TableType::REL; }
common::column_id_t getColumnID(common::property_id_t propertyID) const override;
common::table_id_t getSrcTableID() const { return srcTableID; }
common::table_id_t getDstTableID() const { return dstTableID; }
bool isSingleMultiplicity(common::RelDataDirection direction) const;
Expand Down
4 changes: 2 additions & 2 deletions src/include/catalog/catalog_entry/table_catalog_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TableCatalogEntry : public CatalogEntry {
bool containProperty(const std::string& propertyName) const;
common::property_id_t getPropertyID(const std::string& propertyName) const;
const Property* getProperty(common::property_id_t propertyID) const;
common::column_id_t getColumnID(common::property_id_t propertyID) const;
virtual common::column_id_t getColumnID(common::property_id_t propertyID) const;
bool containPropertyType(const common::LogicalType& logicalType) const;
void addProperty(std::string propertyName, std::unique_ptr<common::LogicalType> dataType);
void dropProperty(common::property_id_t propertyID);
Expand All @@ -52,7 +52,7 @@ class TableCatalogEntry : public CatalogEntry {
static std::unique_ptr<TableCatalogEntry> deserialize(
common::Deserializer& deserializer, CatalogEntryType type);

private:
protected:
common::table_id_t tableID;
std::string comment;
common::property_id_t nextPID;
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/column_data_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ namespace common {

enum class ColumnDataFormat : uint8_t { REGULAR = 0, CSR = 1 };

}
} // namespace common
} // namespace kuzu
23 changes: 13 additions & 10 deletions src/include/common/data_chunk/data_chunk_collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,39 @@
namespace kuzu {
namespace common {

// TODO(Guodong/Ziyi): We should extend this to ColumnDataCollection, which takes ResultSet into
// consideration for storage and scan.
// TODO(Guodong): Should rework this to use ColumnChunk.
class DataChunkCollection {
public:
explicit DataChunkCollection(storage::MemoryManager* mm);

void append(DataChunk& chunk);
void append(std::unique_ptr<DataChunk> chunk);
std::vector<common::DataChunk*> getChunks() const;

inline const std::vector<common::DataChunk>& getChunks() const { return chunks; }
inline std::vector<common::DataChunk>& getChunksUnSafe() { return chunks; }
inline uint64_t getNumChunks() const { return chunks.size(); }
inline DataChunk* getChunk(uint64_t idx) const {
inline const DataChunk& getChunk(uint64_t idx) const {
KU_ASSERT(idx < chunks.size());
return chunks[idx];
}
inline DataChunk& getChunkUnSafe(uint64_t idx) {
KU_ASSERT(idx < chunks.size());
return chunks[idx].get();
return chunks[idx];
}
inline void merge(DataChunkCollection* other) {
for (auto& chunk : other->chunks) {
append(std::move(chunk));
merge(std::move(chunk));
}
}
void merge(DataChunk chunk);

private:
DataChunk* allocateChunk(DataChunk& chunk);
DataChunk& allocateChunk(DataChunk& chunk);

void initTypes(DataChunk& chunk);

private:
storage::MemoryManager* mm;
std::vector<LogicalType> types;
std::vector<std::unique_ptr<DataChunk>> chunks;
std::vector<DataChunk> chunks;
};

} // namespace common
Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ class LogicalType {

static std::vector<std::unique_ptr<LogicalType>> copy(
const std::vector<std::unique_ptr<LogicalType>>& types);
static std::vector<LogicalType> copy(const std::vector<LogicalType*>& types);

static std::unique_ptr<LogicalType> ANY() {
return std::make_unique<LogicalType>(LogicalTypeID::ANY);
Expand Down
5 changes: 4 additions & 1 deletion src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,13 @@ class Partitioner : public Sink {
std::vector<common::partition_idx_t> numPartitions);

private:
common::DataChunk constructDataChunk(const std::vector<DataPos>& columnPositions,
const common::logical_types_t& columnTypes, const ResultSet& resultSet,
const std::shared_ptr<common::DataChunkState>& state);
// TODO: For now, RelBatchInsert will guarantee all data are inside one data chunk. Should be
// generalized to resultSet later if needed.
void copyDataToPartitions(
common::partition_idx_t partitioningIdx, common::DataChunk* chunkToCopyFrom);
common::partition_idx_t partitioningIdx, common::DataChunk chunkToCopyFrom);

private:
// Same size as a value vector. Each thread will allocate a chunk for each node group,
Expand Down
34 changes: 10 additions & 24 deletions src/include/storage/local_storage/local_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,26 @@ class LocalNodeNG final : public LocalNodeGroup {
public:
LocalNodeNG(common::offset_t nodeGroupStartOffset,
const std::vector<common::LogicalType*>& dataTypes, MemoryManager* mm)
: LocalNodeGroup{nodeGroupStartOffset, dataTypes, mm} {
insertInfo.resize(dataTypes.size());
updateInfo.resize(dataTypes.size());
}
: LocalNodeGroup{nodeGroupStartOffset, dataTypes, mm} {}

void scan(common::ValueVector* nodeIDVector, const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void lookup(common::offset_t nodeOffset, common::column_id_t columnID,
common::ValueVector* outputVector, common::sel_t posInOutputVector);
void insert(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::ValueVector* nodeIDVector, common::column_id_t columnID,
common::ValueVector* propertyVector);
void delete_(common::ValueVector* nodeIDVector);

common::row_idx_t getRowIdx(common::column_id_t columnID, common::offset_t nodeOffset);
bool insert(std::vector<common::ValueVector*> nodeIDVectors,
std::vector<common::ValueVector*> propertyVectors) override;
bool update(std::vector<common::ValueVector*> nodeIDVectors, common::column_id_t columnID,
common::ValueVector* propertyVector) override;
bool delete_(
common::ValueVector* nodeIDVector, common::ValueVector* /*extraVector*/ = nullptr) override;

inline const offset_to_row_idx_t& getInsertInfoRef(common::column_id_t columnID) {
KU_ASSERT(columnID < insertInfo.size());
return insertInfo[columnID];
inline const offset_to_row_idx_t& getInsertInfoRef() {
return insertChunks.getOffsetToRowIdx();
}
inline const offset_to_row_idx_t& getUpdateInfoRef(common::column_id_t columnID) {
KU_ASSERT(columnID < updateInfo.size());
return updateInfo[columnID];
return getUpdateChunks(columnID).getOffsetToRowIdx();
}

private:
std::vector<offset_to_row_idx_t> insertInfo;
std::vector<offset_to_row_idx_t> updateInfo;
};

class LocalNodeTableData final : public LocalTableData {
Expand All @@ -52,11 +43,6 @@ class LocalNodeTableData final : public LocalTableData {
void lookup(common::ValueVector* nodeIDVector,
const std::vector<common::column_id_t>& columnIDs,
const std::vector<common::ValueVector*>& outputVectors);
void insert(common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::ValueVector* nodeIDVector, common::column_id_t columnID,
common::ValueVector* propertyVector);
void delete_(common::ValueVector* nodeIDVector);

private:
LocalNodeGroup* getOrCreateLocalNodeGroup(common::ValueVector* nodeIDVector) override;
Expand Down
Loading

0 comments on commit b12ab78

Please sign in to comment.