Skip to content

Commit

Permalink
Merge pull request #1937 from kuzudb/list-local-storage
Browse files Browse the repository at this point in the history
List local storage
  • Loading branch information
ray6080 committed Aug 16, 2023
2 parents 62a4ac6 + 22e0f3a commit 5fc4852
Show file tree
Hide file tree
Showing 16 changed files with 219 additions and 76 deletions.
42 changes: 33 additions & 9 deletions src/include/storage/copier/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,28 @@ class NullColumnChunk;
struct ColumnChunkMetadata {
common::page_idx_t pageIdx = common::INVALID_PAGE_IDX;
common::page_idx_t numPages = 0;

ColumnChunkMetadata() = default;
ColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages)
: pageIdx(pageIdx), numPages(numPages) {}
};

struct MainColumnChunkMetadata : public ColumnChunkMetadata {
uint64_t numValues;

MainColumnChunkMetadata() = default;
MainColumnChunkMetadata(
common::page_idx_t pageIdx, common::page_idx_t numPages, uint64_t numNodesInChunk)
: ColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk) {}
};

struct OverflowColumnChunkMetadata : public ColumnChunkMetadata {
common::offset_t lastOffsetInPage;

OverflowColumnChunkMetadata() = default;
OverflowColumnChunkMetadata(
common::page_idx_t pageIdx, common::page_idx_t numPages, common::offset_t lastOffsetInPage)
: ColumnChunkMetadata{pageIdx, numPages}, lastOffsetInPage(lastOffsetInPage) {}
};

// Base data segment covers all fixed-sized data types.
Expand All @@ -31,6 +49,7 @@ struct OverflowColumnChunkMetadata : public ColumnChunkMetadata {
class ColumnChunk {
public:
friend class ColumnChunkFactory;
friend class VarListDataColumnChunk;

// ColumnChunks must be initialized after construction, so this constructor should only be used
// through the ColumnChunkFactory
Expand Down Expand Up @@ -85,7 +104,7 @@ class ColumnChunk {
}

inline uint64_t getNumBytesPerValue() const { return numBytesPerValue; }
inline uint64_t getNumBytes() const { return numBytes; }
inline uint64_t getNumBytes() const { return bufferSize; }
inline uint8_t* getData() { return buffer.get(); }

virtual void write(const common::Value& val, uint64_t posToWrite);
Expand All @@ -101,9 +120,13 @@ class ColumnChunk {

void populateWithDefaultVal(common::ValueVector* defaultValueVector);

inline uint64_t getNumValues() const { return numValues; }

inline void setNumValues(uint64_t numValues_) { this->numValues = numValues_; }

protected:
// Initializes the data buffer. Is (and should be) only called in constructor.
virtual void initialize(common::offset_t numValues);
virtual void initialize(common::offset_t capacity);

template<typename T>
void templateCopyArrowArray(
Expand All @@ -118,7 +141,7 @@ class ColumnChunk {
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

virtual inline common::page_idx_t getNumPagesForBuffer() const {
return getNumPagesForBytes(numBytes);
return getNumPagesForBytes(bufferSize);
}

common::offset_t getOffsetInBuffer(common::offset_t pos) const;
Expand All @@ -128,11 +151,12 @@ class ColumnChunk {
protected:
common::LogicalType dataType;
uint32_t numBytesPerValue;
uint64_t numBytes;
uint64_t bufferSize;
std::unique_ptr<uint8_t[]> buffer;
std::unique_ptr<NullColumnChunk> nullChunk;
std::vector<std::unique_ptr<ColumnChunk>> childrenChunks;
const common::CopyDescription* copyDescription;
uint64_t numValues;
};

template<>
Expand Down Expand Up @@ -171,7 +195,7 @@ class NullColumnChunk : public BoolColumnChunk {

void resize(uint64_t numValues) final;

inline void resetNullBuffer() { memset(buffer.get(), 0 /* non null */, numBytes); }
inline void resetNullBuffer() { memset(buffer.get(), 0 /* non null */, bufferSize); }

protected:
inline uint64_t numBytesForValues(common::offset_t numValues) const {
Expand All @@ -180,9 +204,9 @@ class NullColumnChunk : public BoolColumnChunk {
}
inline void initialize(common::offset_t numValues) final {
numBytesPerValue = 0;
numBytes = numBytesForValues(numValues);
bufferSize = numBytesForValues(numValues);
// Each byte defaults to 0, indicating everything is non-null
buffer = std::make_unique<uint8_t[]>(numBytes);
buffer = std::make_unique<uint8_t[]>(bufferSize);
}
};

Expand All @@ -205,9 +229,9 @@ class SerialColumnChunk : public ColumnChunk {

inline void initialize(common::offset_t numValues) final {
numBytesPerValue = 0;
numBytes = 0;
bufferSize = 0;
// Each byte defaults to 0, indicating everything is non-null
buffer = std::make_unique<uint8_t[]>(numBytes);
buffer = std::make_unique<uint8_t[]>(bufferSize);
}
};

Expand Down
41 changes: 28 additions & 13 deletions src/include/storage/copier/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,33 @@ namespace kuzu {
namespace storage {

struct VarListDataColumnChunk {
std::unique_ptr<ColumnChunk> dataChunk;
uint64_t numValuesInDataChunk;
uint64_t capacityInDataChunk;
std::unique_ptr<ColumnChunk> dataColumnChunk;
uint64_t capacity;

explicit VarListDataColumnChunk(std::unique_ptr<ColumnChunk> dataChunk)
: dataChunk{std::move(dataChunk)}, numValuesInDataChunk{0},
capacityInDataChunk{StorageConstants::NODE_GROUP_SIZE} {}
: dataColumnChunk{std::move(dataChunk)}, capacity{StorageConstants::NODE_GROUP_SIZE} {}

void reset();

void resize(uint64_t numValues);
void resizeBuffer(uint64_t numValues);

inline void append(ValueVector* dataVector) const {
dataColumnChunk->append(dataVector, dataColumnChunk->getNumValues());
}

inline uint64_t getNumValues() const { return dataColumnChunk->getNumValues(); }

inline void increaseNumValues(uint64_t numValues) const {
dataColumnChunk->numValues += numValues;
}
};

class VarListColumnChunk : public ColumnChunk {
public:
VarListColumnChunk(LogicalType dataType, CopyDescription* copyDescription);

inline ColumnChunk* getDataColumnChunk() const {
return varListDataColumnChunk.dataChunk.get();
return varListDataColumnChunk.dataColumnChunk.get();
}

void setValueFromString(const char* value, uint64_t length, uint64_t pos);
Expand All @@ -36,9 +44,14 @@ class VarListColumnChunk : public ColumnChunk {

void append(common::ValueVector* vector, common::offset_t startPosInChunk) final;

inline void resizeDataColumnChunk(uint64_t numBytesForBuffer) {
varListDataColumnChunk.resizeBuffer(
numBytesForBuffer / varListDataColumnChunk.dataColumnChunk->getNumBytesPerValue());
}

private:
inline common::page_idx_t getNumPages() const final {
return varListDataColumnChunk.dataChunk->getNumPages() + ColumnChunk::getNumPages();
return varListDataColumnChunk.dataColumnChunk->getNumPages() + ColumnChunk::getNumPages();
}

void append(arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) override;
Expand All @@ -53,18 +66,20 @@ class VarListColumnChunk : public ColumnChunk {
void copyVarListFromArrowList(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) {
auto listArray = (T*)array;
auto dataChunkOffsetToAppend = varListDataColumnChunk.numValuesInDataChunk;
auto dataChunkOffsetToAppend = varListDataColumnChunk.getNumValues();
auto curListOffset = varListDataColumnChunk.getNumValues();
for (auto i = 0u; i < numValuesToAppend; i++) {
nullChunk->setNull(i + startPosInChunk, listArray->IsNull(i));
auto length = listArray->value_length(i);
varListDataColumnChunk.numValuesInDataChunk += length;
setValue(varListDataColumnChunk.numValuesInDataChunk, i + startPosInChunk);
curListOffset += length;
setValue(curListOffset, i + startPosInChunk);
}
auto startOffset = listArray->value_offset(startPosInChunk);
auto endOffset = listArray->value_offset(startPosInChunk + numValuesToAppend);
varListDataColumnChunk.resize(varListDataColumnChunk.numValuesInDataChunk);
varListDataColumnChunk.dataChunk->append(
varListDataColumnChunk.resizeBuffer(curListOffset);
varListDataColumnChunk.dataColumnChunk->append(
listArray->values().get(), dataChunkOffsetToAppend, endOffset - startOffset);
numValues += numValuesToAppend;
}

void write(const common::Value& listVal, uint64_t posToWrite) override;
Expand Down
8 changes: 8 additions & 0 deletions src/include/storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ class StringLocalColumn : public LocalColumn {
common::node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk);
};

class VarListLocalColumn : public LocalColumn {
public:
explicit VarListLocalColumn(storage::NodeColumn* column) : LocalColumn{column} {};

private:
void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx) final;
};

struct LocalColumnFactory {
static std::unique_ptr<LocalColumn> createLocalColumn(storage::NodeColumn* column);
};
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/storage_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class StorageUtils {
common::node_group_idx_t nodeGroupIdx) {
return nodeGroupIdx << common::StorageConstants::NODE_GROUP_SIZE_LOG2;
}
static inline common::offset_t getStartOffsetOfVector(common::vector_idx_t vectorIdx) {
static inline common::offset_t getStartOffsetOfVectorInChunk(common::vector_idx_t vectorIdx) {
return vectorIdx << common::DEFAULT_VECTOR_CAPACITY_LOG_2;
}
static inline common::node_group_idx_t getNodeGroupIdx(common::offset_t nodeOffset) {
Expand All @@ -111,7 +111,7 @@ class StorageUtils {
auto startOffsetOfNodeGroup = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
auto offsetInChunk = nodeOffset - startOffsetOfNodeGroup;
auto vectorIdx = getVectorIdx(offsetInChunk);
return std::make_pair(vectorIdx, offsetInChunk - getStartOffsetOfVector(vectorIdx));
return std::make_pair(vectorIdx, offsetInChunk - getStartOffsetOfVectorInChunk(vectorIdx));
}

static std::string getNodeIndexFName(const std::string& directory,
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/store/node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class NullNodeColumn;
class NodeColumn {
friend class LocalColumn;
friend class StringLocalColumn;
friend class VarListLocalColumn;
friend class transaction::TransactionTests;

public:
Expand Down Expand Up @@ -128,7 +129,7 @@ class NodeColumn {
BMFileHandle* metadataFH;
BufferManager* bufferManager;
WAL* wal;
std::unique_ptr<InMemDiskArray<ColumnChunkMetadata>> metadataDA;
std::unique_ptr<InMemDiskArray<MainColumnChunkMetadata>> metadataDA;
std::unique_ptr<NodeColumn> nullColumn;
std::vector<std::unique_ptr<NodeColumn>> childrenColumns;
read_node_column_func_t readNodeColumnFunc;
Expand Down
4 changes: 4 additions & 0 deletions src/include/storage/store/var_list_node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ struct ListOffsetInfoInStorage {
};

class VarListNodeColumn : public NodeColumn {
friend class VarListLocalColumn;

public:
VarListNodeColumn(common::LogicalType dataType,
const catalog::MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH,
Expand All @@ -59,6 +61,8 @@ class VarListNodeColumn : public NodeColumn {
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0) final;

void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) final;

protected:
void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
Expand Down
32 changes: 20 additions & 12 deletions src/storage/copier/column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,25 @@ namespace storage {

ColumnChunk::ColumnChunk(LogicalType dataType, CopyDescription* copyDescription, bool hasNullChunk)
: dataType{std::move(dataType)}, numBytesPerValue{getDataTypeSizeInChunk(this->dataType)},
copyDescription{copyDescription} {
copyDescription{copyDescription}, numValues{0} {
if (hasNullChunk) {
nullChunk = std::make_unique<NullColumnChunk>();
}
}

void ColumnChunk::initialize(offset_t numValues) {
numBytes = numBytesPerValue * numValues;
buffer = std::make_unique<uint8_t[]>(numBytes);
void ColumnChunk::initialize(offset_t capacity) {
bufferSize = numBytesPerValue * capacity;
buffer = std::make_unique<uint8_t[]>(bufferSize);
if (nullChunk) {
static_cast<ColumnChunk*>(nullChunk.get())->initialize(numValues);
static_cast<ColumnChunk*>(nullChunk.get())->initialize(capacity);
}
}

void ColumnChunk::resetToEmpty() {
if (nullChunk) {
nullChunk->resetNullBuffer();
}
numValues = 0;
}

void ColumnChunk::append(common::ValueVector* vector, common::offset_t startPosInChunk) {
Expand All @@ -50,6 +51,7 @@ void ColumnChunk::append(common::ValueVector* vector, common::offset_t startPosI
throw NotImplementedException{"ColumnChunk::append"};
}
}
numValues += vector->state->selVector->selectedSize;
}

void ColumnChunk::append(
Expand Down Expand Up @@ -77,6 +79,7 @@ void ColumnChunk::append(ColumnChunk* other, offset_t startPosInOtherChunk,
memcpy(buffer.get() + startPosInChunk * numBytesPerValue,
other->buffer.get() + startPosInOtherChunk * numBytesPerValue,
numValuesToAppend * numBytesPerValue);
numValues += numValuesToAppend;
}

void ColumnChunk::append(
Expand Down Expand Up @@ -117,6 +120,7 @@ void ColumnChunk::append(
throw NotImplementedException("ColumnChunk::append");
}
}
numValues += numValuesToAppend;
}

void ColumnChunk::write(const Value& val, uint64_t posToWrite) {
Expand Down Expand Up @@ -158,8 +162,8 @@ void ColumnChunk::write(const Value& val, uint64_t posToWrite) {
void ColumnChunk::resize(uint64_t numValues) {
auto numBytesAfterResize = numValues * numBytesPerValue;
auto resizedBuffer = std::make_unique<uint8_t[]>(numBytesAfterResize);
memcpy(resizedBuffer.get(), buffer.get(), numBytes);
numBytes = numBytesAfterResize;
memcpy(resizedBuffer.get(), buffer.get(), bufferSize);
bufferSize = numBytesAfterResize;
buffer = std::move(resizedBuffer);
if (nullChunk) {
nullChunk->resize(numValues);
Expand Down Expand Up @@ -321,7 +325,7 @@ page_idx_t ColumnChunk::getNumPages() const {
}

page_idx_t ColumnChunk::flushBuffer(BMFileHandle* dataFH, page_idx_t startPageIdx) {
FileUtils::writeToFile(dataFH->getFileInfo(), buffer.get(), numBytes,
FileUtils::writeToFile(dataFH->getFileInfo(), buffer.get(), bufferSize,
startPageIdx * BufferPoolConstants::PAGE_4KB_SIZE);
return getNumPagesForBuffer();
}
Expand Down Expand Up @@ -362,12 +366,14 @@ void BoolColumnChunk::append(common::ValueVector* vector, common::offset_t start
common::NullMask::setNull(
(uint64_t*)buffer.get(), startPosInChunk + i, vector->getValue<bool>(pos));
}
numValues += vector->state->selVector->selectedSize;
}

void BoolColumnChunk::append(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend) {
assert(array->type_id() == arrow::Type::BOOL);
templateCopyArrowArray<bool>(array, startPosInChunk, numValuesToAppend);
numValues += numValuesToAppend;
}

void BoolColumnChunk::append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
Expand All @@ -379,6 +385,7 @@ void BoolColumnChunk::append(ColumnChunk* other, common::offset_t startPosInOthe
nullChunk->append(
other->getNullChunk(), startPosInOtherChunk, startPosInChunk, numValuesToAppend);
}
numValues += numValuesToAppend;
}

void FixedListColumnChunk::append(ColumnChunk* other, offset_t startPosInOtherChunk,
Expand All @@ -394,6 +401,7 @@ void FixedListColumnChunk::append(ColumnChunk* other, offset_t startPosInOtherCh
otherChunk->buffer.get() + getOffsetInBuffer(startPosInOtherChunk + i),
numBytesPerValue);
}
numValues += numValuesToAppend;
}

void FixedListColumnChunk::write(const Value& fixedListVal, uint64_t posToWrite) {
Expand Down Expand Up @@ -518,7 +526,7 @@ offset_t ColumnChunk::getOffsetInBuffer(offset_t pos) const {
auto posCursor = PageUtils::getPageByteCursorForPos(pos, numElementsInAPage, numBytesPerValue);
auto offsetInBuffer =
posCursor.pageIdx * BufferPoolConstants::PAGE_4KB_SIZE + posCursor.offsetInPage;
assert(offsetInBuffer + numBytesPerValue <= numBytes);
assert(offsetInBuffer + numBytesPerValue <= bufferSize);
return offsetInBuffer;
}

Expand All @@ -536,12 +544,12 @@ void ColumnChunk::copyVectorToBuffer(

void NullColumnChunk::resize(uint64_t numValues) {
auto numBytesAfterResize = numBytesForValues(numValues);
assert(numBytesAfterResize > numBytes);
assert(numBytesAfterResize > bufferSize);
auto reservedBuffer = std::make_unique<uint8_t[]>(numBytesAfterResize);
memset(reservedBuffer.get(), 0 /* non null */, numBytesAfterResize);
memcpy(reservedBuffer.get(), buffer.get(), numBytes);
memcpy(reservedBuffer.get(), buffer.get(), bufferSize);
buffer = std::move(reservedBuffer);
numBytes = numBytesAfterResize;
bufferSize = numBytesAfterResize;
}

} // namespace storage
Expand Down
Loading

0 comments on commit 5fc4852

Please sign in to comment.