Skip to content

Commit

Permalink
fix update bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Sep 28, 2023
1 parent f05d348 commit dd413f0
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 67 deletions.
2 changes: 1 addition & 1 deletion src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ class ColumnChunk {
}

// Returns `numBytesPerValue`, the per-value byte width recorded for this chunk.
inline uint64_t getNumBytesPerValue() const { return numBytesPerValue; }
// Returns `bufferSize`; presumably the byte size of the storage behind `buffer` (TODO confirm).
inline uint64_t getNumBytes() const { return bufferSize; }
// Returns the raw pointer held by `buffer`. The pointer is mutable, so callers can write through
// it; ownership presumably stays with the chunk (TODO confirm against the `buffer` declaration).
inline uint8_t* getData() { return buffer.get(); }

virtual void write(const common::Value& val, uint64_t posToWrite);
Expand All @@ -127,6 +126,7 @@ class ColumnChunk {

void populateWithDefaultVal(common::ValueVector* defaultValueVector);

// Returns `capacity`; presumably the maximum number of values this chunk can hold (TODO confirm
// against where `capacity` is initialised).
inline uint64_t getCapacity() const { return capacity; }
// Returns `numValues`, the value count currently recorded for this chunk.
inline uint64_t getNumValues() const { return numValues; }

// Overwrites the recorded value count. Only the counter is changed here; the buffer is not touched.
inline void setNumValues(uint64_t numValues_) { this->numValues = numValues_; }
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/store/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ class CompressedFunctor {

protected:
explicit CompressedFunctor(const common::LogicalType& logicalType)
: copy{logicalType}, physicalType{logicalType.getPhysicalType()} {}
const Uncompressed copy;
: uncompressed{logicalType}, physicalType{logicalType.getPhysicalType()} {}
const Uncompressed uncompressed;
const BooleanBitpacking booleanBitpacking;
const common::PhysicalTypeID physicalType;
};
Expand Down
19 changes: 11 additions & 8 deletions src/include/storage/store/node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ namespace storage {

struct CompressionMetadata;

using read_node_column_func_t = std::function<void(uint8_t* frame, PageElementCursor& pageCursor,
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead,
const CompressionMetadata& metadata)>;
using write_node_column_func_t = std::function<void(uint8_t* frame, uint16_t posInFrame,
using read_values_to_vector_func_t = std::function<void(uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numValuesToRead, const CompressionMetadata& metadata)>;
using write_values_from_vector_func_t = std::function<void(uint8_t* frame, uint16_t posInFrame,
common::ValueVector* vector, uint32_t posInVector, const CompressionMetadata& metadata)>;

using lookup_node_column_func_t =
using read_values_to_page_func_t =
std::function<void(uint8_t* frame, PageElementCursor& pageCursor, uint8_t* result,
uint32_t posInResult, uint64_t numValues, const CompressionMetadata& metadata)>;
// Function type used by the `batchLookup` interface. It has the same signature as
// `read_values_to_page_func_t`, but is a distinct alias so that the function installed for batch
// lookups can be chosen independently of the one used for plain page reads.
using batch_lookup_func_t = read_values_to_page_func_t;

class NullNodeColumn;
class StructNodeColumn;
Expand Down Expand Up @@ -123,9 +125,10 @@ class NodeColumn {
std::unique_ptr<InMemDiskArray<ColumnChunkMetadata>> metadataDA;
std::unique_ptr<NodeColumn> nullColumn;
std::vector<std::unique_ptr<NodeColumn>> childrenColumns;
read_node_column_func_t readNodeColumnFunc;
write_node_column_func_t writeNodeColumnFunc;
lookup_node_column_func_t lookupNodeColumnFunc;
read_values_to_vector_func_t readToVectorFunc;
write_values_from_vector_func_t writeFromVectorFunc;
read_values_to_page_func_t readToPageFunc;
batch_lookup_func_t batchLookupFunc;
RWPropertyStats propertyStatistics;
};

Expand Down
29 changes: 13 additions & 16 deletions src/storage/local_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,7 @@ void LocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
// Figure out if the chunk needs to be re-compressed
auto metadata = column->getCompressionMetadata(nodeGroupIdx, TransactionType::WRITE);
if (!metadata.canAlwaysUpdateInPlace()) {
auto nodeGroupStartOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
for (auto& [vectorIdx, vector] : chunk->vectors) {
auto vectorStartOffset =
nodeGroupStartOffset + StorageUtils::getStartOffsetOfVectorInChunk(vectorIdx);
for (auto i = 0u; i < vector->vector->state->selVector->selectedSize; i++) {
auto pos = vector->vector->state->selVector->selectedPositions[i];
assert(vector->validityMask[pos]);
Expand Down Expand Up @@ -210,6 +207,19 @@ void LocalColumn::commitLocalChunkInPlace(
}
}

void LocalColumn::commitLocalChunkOutOfPlace(
    node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk) {
    // Out-of-place commit: rebuild the node group's chunk in a fresh ColumnChunk and append it,
    // instead of patching the existing chunk where it is.
    auto rewrittenChunk = ColumnChunkFactory::createColumnChunk(column->getDataType());
    auto* target = rewrittenChunk.get();
    // Step 1: scan the whole existing column chunk of this node group into the new chunk.
    column->scan(nodeGroupIdx, target);
    // Step 2: overlay every locally buffered vector on top of the scanned values.
    for (auto& [idx, localVector] : localChunk->vectors) {
        target->update(localVector->vector.get(), idx);
    }
    // Step 3: append the merged chunk back to the column for the same node group.
    column->append(target, nodeGroupIdx);
}

void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
assert(chunks.contains(nodeGroupIdx));
auto localChunk = chunks.at(nodeGroupIdx).get();
Expand All @@ -230,19 +240,6 @@ void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
}
}

void LocalColumn::commitLocalChunkOutOfPlace(
    node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk) {
    // Out-of-place commit: rebuild the node group's chunk in a fresh ColumnChunk and append it,
    // instead of patching the existing chunk where it is.
    auto rewrittenChunk = ColumnChunkFactory::createColumnChunk(column->getDataType());
    auto* target = rewrittenChunk.get();
    // Step 1: scan the whole existing column chunk of this node group into the new chunk.
    column->scan(nodeGroupIdx, target);
    // Step 2: overlay every locally buffered vector on top of the scanned values.
    for (auto& [idx, localVector] : localChunk->vectors) {
        target->update(localVector->vector.get(), idx);
    }
    // Step 3: append the merged chunk back to the column for the same node group.
    column->append(target, nodeGroupIdx);
}

void VarListLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
assert(chunks.contains(nodeGroupIdx));
auto chunk = chunks.at(nodeGroupIdx).get();
Expand Down
8 changes: 4 additions & 4 deletions src/storage/store/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,8 @@ void ReadCompressedValuesFromPageToVector::operator()(uint8_t* frame, PageElemen
const CompressionMetadata& metadata) {
switch (metadata.compression) {
case CompressionType::UNCOMPRESSED:
return copy.decompressFromPage(frame, pageCursor.elemPosInPage, resultVector->getData(),
posInVector, numValuesToRead, metadata);
return uncompressed.decompressFromPage(frame, pageCursor.elemPosInPage,
resultVector->getData(), posInVector, numValuesToRead, metadata);
case CompressionType::INTEGER_BITPACKING: {
switch (physicalType) {
case PhysicalTypeID::INT64: {
Expand Down Expand Up @@ -483,7 +483,7 @@ void ReadCompressedValuesFromPage::operator()(uint8_t* frame, PageElementCursor&
const CompressionMetadata& metadata) {
switch (metadata.compression) {
case CompressionType::UNCOMPRESSED:
return copy.decompressFromPage(
return uncompressed.decompressFromPage(
frame, pageCursor.elemPosInPage, result, startPosInResult, numValuesToRead, metadata);
case CompressionType::INTEGER_BITPACKING: {
switch (physicalType) {
Expand Down Expand Up @@ -536,7 +536,7 @@ void WriteCompressedValueToPage::operator()(uint8_t* frame, uint16_t posInFrame,
common::ValueVector* vector, uint32_t posInVector, const CompressionMetadata& metadata) {
switch (metadata.compression) {
case CompressionType::UNCOMPRESSED:
return copy.setValueFromUncompressed(
return uncompressed.setValueFromUncompressed(
vector->getData(), posInVector, frame, posInFrame, metadata);
case CompressionType::INTEGER_BITPACKING: {
switch (physicalType) {
Expand Down
81 changes: 49 additions & 32 deletions src/storage/store/node_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace kuzu {
namespace storage {

struct InternalIDNodeColumnFunc {
static void readValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor,
static void readValuesFromPageToVector(uint8_t* frame, PageElementCursor& pageCursor,

Check warning on line 22 in src/storage/store/node_column.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/store/node_column.cpp#L22

Added line #L22 was not covered by tests
ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead,
const CompressionMetadata& metadata) {
auto resultData = (internalID_t*)resultVector->getData();
Expand All @@ -38,7 +38,7 @@ struct InternalIDNodeColumnFunc {
};

struct NullNodeColumnFunc {
static void readValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor,
static void readValuesFromPageToVector(uint8_t* frame, PageElementCursor& pageCursor,
ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead,
const CompressionMetadata& metadata) {
// Read bit-packed null flags from the frame into the result vector
Expand Down Expand Up @@ -81,7 +81,13 @@ struct BoolNodeColumnFunc {
posInVector, (uint64_t*)frame, posInFrame, 1);
}

static void readValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor, uint8_t* result,
// Copies `numValuesToRead` bits from the page frame, starting at `pageCursor.elemPosInPage`, into
// `result` starting at bit position `startPosInResult`. Both buffers are treated as arrays of
// 64-bit words by NullMask::copyNullMask. `metadata` is not used by this implementation.
static void copyValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor, uint8_t* result,
    uint32_t startPosInResult, uint64_t numValuesToRead, const CompressionMetadata& metadata) {
    auto* srcWords = reinterpret_cast<uint64_t*>(frame);
    auto* dstWords = reinterpret_cast<uint64_t*>(result);
    NullMask::copyNullMask(
        srcWords, pageCursor.elemPosInPage, dstWords, startPosInResult, numValuesToRead);
}

static void batchLookupFromPage(uint8_t* frame, PageElementCursor& pageCursor, uint8_t* result,

Check warning on line 90 in src/storage/store/node_column.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/store/node_column.cpp#L90

Added line #L90 was not covered by tests
uint32_t startPosInResult, uint64_t numValuesToRead, const CompressionMetadata& metadata) {
for (auto i = 0; i < numValuesToRead; i++) {
result[startPosInResult + i] =
Expand All @@ -90,27 +96,28 @@ struct BoolNodeColumnFunc {
}
};

static read_node_column_func_t getReadNodeColumnFunc(const LogicalType& logicalType) {
static read_values_to_vector_func_t getReadValuesToVectorFunc(const LogicalType& logicalType) {
switch (logicalType.getLogicalTypeID()) {
case LogicalTypeID::INTERNAL_ID:
return InternalIDNodeColumnFunc::readValuesFromPage;
return InternalIDNodeColumnFunc::readValuesFromPageToVector;
case LogicalTypeID::BOOL:
return BoolNodeColumnFunc::readValuesFromPageToVector;
default:
return ReadCompressedValuesFromPageToVector(logicalType);
}
}

static lookup_node_column_func_t getLookupNodeColumnFunc(const LogicalType& logicalType) {
static read_values_to_page_func_t getWriteValuesToPageFunc(const LogicalType& logicalType) {
switch (logicalType.getLogicalTypeID()) {
case LogicalTypeID::BOOL:
return BoolNodeColumnFunc::readValuesFromPage;
return BoolNodeColumnFunc::copyValuesFromPage;
default:
return ReadCompressedValuesFromPage(logicalType);
}
}

static write_node_column_func_t getWriteNodeColumnFunc(const LogicalType& logicalType) {
static write_values_from_vector_func_t getWriteValuesFromVectorFunc(
const LogicalType& logicalType) {
switch (logicalType.getLogicalTypeID()) {
case LogicalTypeID::INTERNAL_ID:
return InternalIDNodeColumnFunc::writeValueToPage;
Expand All @@ -121,6 +128,16 @@ static write_node_column_func_t getWriteNodeColumnFunc(const LogicalType& logica
}
}

// Selects the function that `batchLookup` uses to read values out of a page frame.
// BOOL columns get their dedicated BoolNodeColumnFunc::batchLookupFromPage; every other logical
// type falls back to the generic ReadCompressedValuesFromPage functor built for that type.
static batch_lookup_func_t getBatchLookupFromPageFunc(const LogicalType& logicalType) {
    if (logicalType.getLogicalTypeID() == LogicalTypeID::BOOL) {
        return BoolNodeColumnFunc::batchLookupFromPage;
    }
    return ReadCompressedValuesFromPage(logicalType);
}

NodeColumn::NodeColumn(LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats propertyStatistics,
Expand All @@ -132,9 +149,10 @@ NodeColumn::NodeColumn(LogicalType dataType, const MetadataDAHInfo& metaDAHeader
StorageStructureID::newMetadataID(), metaDAHeaderInfo.dataDAHPageIdx, bufferManager, wal,
transaction);
numBytesPerFixedSizedValue = getDataTypeSizeInChunk(this->dataType);
readNodeColumnFunc = getReadNodeColumnFunc(this->dataType);
lookupNodeColumnFunc = getLookupNodeColumnFunc(this->dataType);
writeNodeColumnFunc = getWriteNodeColumnFunc(this->dataType);
readToVectorFunc = getReadValuesToVectorFunc(this->dataType);
readToPageFunc = getWriteValuesToPageFunc(this->dataType);
batchLookupFunc = getBatchLookupFromPageFunc(this->dataType);
writeFromVectorFunc = getWriteValuesFromVectorFunc(this->dataType);
assert(numBytesPerFixedSizedValue <= BufferPoolConstants::PAGE_4KB_SIZE);
if (requireNullColumn) {
nullColumn = std::make_unique<NullNodeColumn>(metaDAHeaderInfo.nullDAHPageIdx, dataFH,
Expand All @@ -150,7 +168,7 @@ void NodeColumn::batchLookup(
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType());
readFromPage(transaction, cursor.pageIdx, [&](uint8_t* frame) -> void {
lookupNodeColumnFunc(frame, cursor, result, i, 1, chunkMeta.compMeta);
batchLookupFunc(frame, cursor, result, i, 1, chunkMeta.compMeta);
});
}
}
Expand Down Expand Up @@ -186,18 +204,17 @@ void NodeColumn::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) {
} else {
auto chunkMetadata = metadataDA->get(nodeGroupIdx, TransactionType::WRITE);
auto cursor = PageElementCursor(chunkMetadata.pageIdx, 0);
auto numValuesToScan = chunkMetadata.numValues;
uint64_t numValuesScanned = 0;
while (numValuesScanned < numValuesToScan) {
uint64_t numValuesToScanInPage =
std::min((uint64_t)chunkMetadata.compMeta.numValues(
BufferPoolConstants::PAGE_4KB_SIZE, dataType),
numValuesToScan - numValuesScanned);
uint64_t numValuesPerPage =
chunkMetadata.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
uint64_t numValuesScanned = 0u;
while (numValuesScanned < columnChunk->getCapacity()) {
auto numValuesToReadInPage =
std::min(numValuesPerPage, columnChunk->getCapacity() - numValuesScanned);
readFromPage(&DUMMY_READ_TRANSACTION, cursor.pageIdx, [&](uint8_t* frame) -> void {
lookupNodeColumnFunc(frame, cursor, columnChunk->getData(), numValuesScanned,
numValuesToScanInPage, chunkMetadata.compMeta);
readToPageFunc(frame, cursor, columnChunk->getData(), numValuesScanned,
numValuesToReadInPage, chunkMetadata.compMeta);
});
numValuesScanned += numValuesToScanInPage;
numValuesScanned += numValuesToReadInPage;
cursor.nextPage();
}
columnChunk->setNumValues(chunkMetadata.numValues);
Expand All @@ -223,13 +240,13 @@ void NodeColumn::scanUnfiltered(Transaction* transaction, PageElementCursor& pag
uint64_t numValuesToScan, ValueVector* resultVector, const CompressionMetadata& compMeta,
uint64_t startPosInVector) {
uint64_t numValuesScanned = 0;
auto numValuesPerPage = compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
while (numValuesScanned < numValuesToScan) {
uint64_t numValuesToScanInPage =
std::min((uint64_t)compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType) -
pageCursor.elemPosInPage,
std::min((uint64_t)numValuesPerPage - pageCursor.elemPosInPage,
numValuesToScan - numValuesScanned);
readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void {
readNodeColumnFunc(frame, pageCursor, resultVector, numValuesScanned + startPosInVector,
readToVectorFunc(frame, pageCursor, resultVector, numValuesScanned + startPosInVector,
numValuesToScanInPage, compMeta);
});
numValuesScanned += numValuesToScanInPage;
Expand All @@ -242,16 +259,16 @@ void NodeColumn::scanFiltered(Transaction* transaction, PageElementCursor& pageC
auto numValuesToScan = nodeIDVector->state->getOriginalSize();
auto numValuesScanned = 0u;
auto posInSelVector = 0u;
auto numValuesPerPage = compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
while (numValuesScanned < numValuesToScan) {
uint64_t numValuesToScanInPage =
std::min((uint64_t)compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType) -
pageCursor.elemPosInPage,
std::min((uint64_t)numValuesPerPage - pageCursor.elemPosInPage,
numValuesToScan - numValuesScanned);
if (StorageStructure::isInRange(
nodeIDVector->state->selVector->selectedPositions[posInSelVector], numValuesScanned,
numValuesScanned + numValuesToScanInPage)) {
readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void {
readNodeColumnFunc(frame, pageCursor, resultVector, numValuesScanned,
readToVectorFunc(frame, pageCursor, resultVector, numValuesScanned,
numValuesToScanInPage, compMeta);
});
}
Expand Down Expand Up @@ -289,7 +306,7 @@ void NodeColumn::lookupValue(transaction::Transaction* transaction, offset_t nod
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType());
readFromPage(transaction, cursor.pageIdx, [&](uint8_t* frame) -> void {
readNodeColumnFunc(
readToVectorFunc(
frame, cursor, resultVector, posInVector, 1 /* numValuesToRead */, chunkMeta.compMeta);
});
}
Expand Down Expand Up @@ -361,7 +378,7 @@ void NodeColumn::writeValue(
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
auto chunkMeta = metadataDA->get(nodeGroupIdx, TransactionType::WRITE);
try {
writeNodeColumnFunc(walPageInfo.frame, walPageInfo.posInPage, vectorToWriteFrom,
writeFromVectorFunc(walPageInfo.frame, walPageInfo.posInPage, vectorToWriteFrom,
posInVectorToWriteFrom, chunkMeta.compMeta);
} catch (Exception& e) {
bufferManager->unpin(*wal->fileHandle, walPageInfo.pageIdxInWAL);
Expand Down Expand Up @@ -449,8 +466,8 @@ NullNodeColumn::NullNodeColumn(page_idx_t metaDAHPageIdx, BMFileHandle* dataFH,
: NodeColumn{LogicalType(LogicalTypeID::BOOL), MetadataDAHInfo{metaDAHPageIdx}, dataFH,
metadataFH, bufferManager, wal, transaction, propertyStatistics,
false /*requireNullColumn*/} {
readNodeColumnFunc = NullNodeColumnFunc::readValuesFromPage;
writeNodeColumnFunc = NullNodeColumnFunc::writeValueToPage;
readToVectorFunc = NullNodeColumnFunc::readValuesFromPageToVector;
writeFromVectorFunc = NullNodeColumnFunc::writeValueToPage;
}

void NullNodeColumn::scan(
Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/string_node_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ StringNodeColumn::StringNodeColumn(LogicalType dataType, const MetadataDAHInfo&
: NodeColumn{std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal,
transaction, stats, true} {
if (this->dataType.getLogicalTypeID() == LogicalTypeID::STRING) {
writeNodeColumnFunc = StringNodeColumnFunc::writeStringValuesToPage;
writeFromVectorFunc = StringNodeColumnFunc::writeStringValuesToPage;
}
overflowMetadataDA = std::make_unique<InMemDiskArray<OverflowColumnChunkMetadata>>(*metadataFH,
StorageStructureID::newMetadataID(), metaDAHeaderInfo.childrenInfos[0]->dataDAHPageIdx,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/var_list_node_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ offset_t VarListNodeColumn::readOffset(
chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType));
pageCursor.pageIdx += chunkMeta.pageIdx;
readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void {
readNodeColumnFunc(frame, pageCursor, offsetVector.get(), 0 /* posInVector */,
readToVectorFunc(frame, pageCursor, offsetVector.get(), 0 /* posInVector */,
1 /* numValuesToRead */, chunkMeta.compMeta);
});
return offsetVector->getValue<offset_t>(0);
Expand Down
Loading

0 comments on commit dd413f0

Please sign in to comment.