Skip to content

Commit

Permalink
rework var list
Browse files Browse the repository at this point in the history
  • Loading branch information
hououou committed Mar 19, 2024
1 parent c3decc2 commit e4571a8
Show file tree
Hide file tree
Showing 7 changed files with 332 additions and 202 deletions.
1 change: 1 addition & 0 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class ColumnChunk {

template<typename T>
void setValue(T val, common::offset_t pos) {
KU_ASSERT(pos < capacity);
((T*)buffer.get())[pos] = val;
if (pos >= numValues) {
numValues = pos + 1;
Expand Down
46 changes: 23 additions & 23 deletions src/include/storage/store/var_list_column.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "column.h"
#include "var_list_column_chunk.h"

// List is a nested data type which is stored as two chunks:
// 1. Offset column (type: INT64). Using offset to partition the data column into multiple lists.
Expand All @@ -24,20 +25,21 @@
namespace kuzu {
namespace storage {

struct ListOffsetInfoInStorage {
common::offset_t prevNodeListOffset;
std::vector<std::unique_ptr<common::ValueVector>> offsetVectors;
struct ListOffsetSizeInfo {
common::offset_t numTotal;
std::unique_ptr<ColumnChunk> offsetColumnChunk;
std::unique_ptr<ColumnChunk> sizeColumnChunk;

ListOffsetInfoInStorage(common::offset_t prevNodeListOffset,
std::vector<std::unique_ptr<common::ValueVector>> offsetVectors)
: prevNodeListOffset{prevNodeListOffset}, offsetVectors{std::move(offsetVectors)} {}
ListOffsetSizeInfo(common::offset_t numTotal, std::unique_ptr<ColumnChunk> offsetColumnChunk,
std::unique_ptr<ColumnChunk> sizeColumnChunk)
: numTotal{numTotal}, offsetColumnChunk{std::move(offsetColumnChunk)},
sizeColumnChunk{std::move(sizeColumnChunk)} {}

common::offset_t getListOffset(uint64_t nodePos) const;
uint32_t getListLength(uint64_t pos) const;
common::offset_t getListEndOffset(uint64_t pos) const;
common::offset_t getListStartOffset(uint64_t pos) const;

inline uint64_t getListLength(uint64_t nodePos) const {
KU_ASSERT(getListOffset(nodePos + 1) >= getListOffset(nodePos));
return getListOffset(nodePos + 1) - getListOffset(nodePos);
}
bool isOffsetSortedAscending(uint64_t startPos, uint64_t endPos) const;
};

class VarListColumn : public Column {
Expand Down Expand Up @@ -69,18 +71,11 @@ class VarListColumn : public Column {
void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) override;

private:
inline common::offset_t readListOffsetInStorage(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup) {
return offsetInNodeGroup == 0 ?
0 :
readOffset(transaction, nodeGroupIdx, offsetInNodeGroup - 1);
}

void scanUnfiltered(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::ValueVector* resultVector,
const ListOffsetInfoInStorage& listOffsetInfoInStorage);
const ListOffsetSizeInfo& listOffsetInfoInStorage);
void scanFiltered(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::ValueVector* offsetVector, const ListOffsetInfoInStorage& listOffsetInfoInStorage);
common::ValueVector* offsetVector, const ListOffsetSizeInfo& listOffsetInfoInStorage);

inline bool canCommitInPlace(transaction::Transaction*, common::node_group_idx_t,
const ChunkCollection&, const offset_to_row_idx_t&, const ChunkCollection&,
Expand All @@ -101,13 +96,18 @@ class VarListColumn : public Column {

common::offset_t readOffset(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup);
ListOffsetInfoInStorage getListOffsetInfoInStorage(transaction::Transaction* transaction,

uint32_t readSize(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t offsetInNodeGroup);

ListOffsetSizeInfo getListOffsetSizeInfo(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInNodeGroup,
common::offset_t endOffsetInNodeGroup,
const std::shared_ptr<common::DataChunkState>& state);
common::offset_t endOffsetInNodeGroup);

private:
std::unique_ptr<Column> sizeColumn;
std::unique_ptr<Column> dataColumn;
std::unique_ptr<VarListDataColumnChunk> tmpDataColumnChunk;
};

} // namespace storage
Expand Down
58 changes: 33 additions & 25 deletions src/include/storage/store/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,15 @@ class VarListColumnChunk final : public ColumnChunk {
return varListDataColumnChunk->dataColumnChunk.get();
}

inline ColumnChunk* getSizeColumnChunk() const { return sizeColumnChunk.get(); }

void resetToEmpty() override;

inline void setNumValues(uint64_t numValues_) override {
ColumnChunk::setNumValues(numValues_);
sizeColumnChunk->setNumValues(numValues_);
}

void append(common::ValueVector* vector, common::SelectionVector& selVector) final;

void lookup(common::offset_t offsetInChunk, common::ValueVector& output,
Expand All @@ -56,45 +63,46 @@ class VarListColumnChunk final : public ColumnChunk {
varListDataColumnChunk->resizeBuffer(numValues);
}

void finalize() override;
inline void resize(uint64_t newCapacity) override {
ColumnChunk::resize(newCapacity);
sizeColumnChunk->resize(newCapacity);
}

inline common::offset_t getListStartOffset(common::offset_t offset) const {
if (numValues == 0)
return 0;
return offset == numValues ? getListEndOffset(offset - 1) :
getListEndOffset(offset) - getListLen(offset);
}

inline common::offset_t getListEndOffset(common::offset_t offset) const {
if (numValues == 0)
return 0;
KU_ASSERT(offset < numValues);
return getValue<uint64_t>(offset);
}

inline common::offset_t getListOffset(common::offset_t offset) const {
return offset == 0 ? 0 : getValue<uint64_t>(offset - 1);
inline uint64_t getListLen(common::offset_t offset) const {
if (numValues == 0)
return 0;
KU_ASSERT(offset < sizeColumnChunk->getNumValues());
return sizeColumnChunk->getValue<uint32_t>(offset);
}

void resetOffset();

protected:
void copyListValues(const common::list_entry_t& entry, common::ValueVector* dataVector);

private:
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) override;

inline void initializeIndices() {
indicesColumnChunk = ColumnChunkFactory::createColumnChunk(
*common::LogicalType::INT64(), false /*enableCompression*/, capacity);
indicesColumnChunk->getNullChunk()->resetToAllNull();
for (auto i = 0u; i < numValues; i++) {
indicesColumnChunk->setValue<common::offset_t>(i, i);
indicesColumnChunk->getNullChunk()->setNull(i, nullChunk->isNull(i));
}
indicesColumnChunk->setNumValues(numValues);
}
inline uint64_t getListLen(common::offset_t offset) const {
return getListOffset(offset + 1) - getListOffset(offset);
}

void resetFromOtherChunk(VarListColumnChunk* other);
void appendNullList();

protected:
std::unique_ptr<ColumnChunk> sizeColumnChunk;
std::unique_ptr<VarListDataColumnChunk> varListDataColumnChunk;
// The following is needed to write var list to random positions in the column chunk.
// We first append var list to the end of the column chunk. Then use indicesColumnChunk to track
// where each var list data is inside the column chunk.
// `needFinalize` is set to true whenever `write` is called.
// During `finalize`, the whole column chunk will be re-written according to indices.
bool needFinalize;
std::unique_ptr<ColumnChunk> indicesColumnChunk;
};

} // namespace storage
Expand Down
2 changes: 2 additions & 0 deletions src/storage/stats/table_statistics_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ std::unique_ptr<MetadataDAHInfo> TablesStatistics::createMetadataDAHInfo(
}
} break;
case PhysicalTypeID::VAR_LIST: {
metadataDAHInfo->childrenInfos.push_back(
createMetadataDAHInfo(*LogicalType::UINT32(), metadataFH, bm, wal));
metadataDAHInfo->childrenInfos.push_back(
createMetadataDAHInfo(*VarListType::getChildType(&dataType), metadataFH, bm, wal));
} break;
Expand Down
1 change: 1 addition & 0 deletions src/storage/store/column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ void ColumnChunk::append(
KU_ASSERT(nullChunk->getNumValues() == getNumValues());
nullChunk->append(other->nullChunk.get(), startPosInOtherChunk, numValuesToAppend);
}
KU_ASSERT(numValues + numValuesToAppend <= capacity);
memcpy(buffer.get() + numValues * numBytesPerValue,
other->buffer.get() + startPosInOtherChunk * numBytesPerValue,
numValuesToAppend * numBytesPerValue);
Expand Down
Loading

0 comments on commit e4571a8

Please sign in to comment.