Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix update bugs #2098

Merged
merged 1 commit into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ class ColumnChunk {
}

// Returns the stored `numBytesPerValue` field.
inline uint64_t getNumBytesPerValue() const { return numBytesPerValue; }
// Returns the stored `bufferSize` field (presumably the byte size of `buffer` -- confirm at the
// allocation site).
inline uint64_t getNumBytes() const { return bufferSize; }
// Exposes the raw byte pointer obtained from `buffer.get()`.
inline uint8_t* getData() { return buffer.get(); }

virtual void write(const common::Value& val, uint64_t posToWrite);
Expand All @@ -127,6 +126,7 @@ class ColumnChunk {

void populateWithDefaultVal(common::ValueVector* defaultValueVector);

// Returns the stored `capacity` field.
inline uint64_t getCapacity() const { return capacity; }
// Returns the stored `numValues` field.
inline uint64_t getNumValues() const { return numValues; }

// Overwrites the stored `numValues` field with `numValues_`.
inline void setNumValues(uint64_t numValues_) { this->numValues = numValues_; }
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/store/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ class CompressedFunctor {

protected:
explicit CompressedFunctor(const common::LogicalType& logicalType)
: copy{logicalType}, physicalType{logicalType.getPhysicalType()} {}
const Uncompressed copy;
: uncompressed{logicalType}, physicalType{logicalType.getPhysicalType()} {}
const Uncompressed uncompressed;
const BooleanBitpacking booleanBitpacking;
const common::PhysicalTypeID physicalType;
};
Expand Down
19 changes: 11 additions & 8 deletions src/include/storage/store/node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ namespace storage {

struct CompressionMetadata;

using read_node_column_func_t = std::function<void(uint8_t* frame, PageElementCursor& pageCursor,
common::ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead,
const CompressionMetadata& metadata)>;
using write_node_column_func_t = std::function<void(uint8_t* frame, uint16_t posInFrame,
using read_values_to_vector_func_t = std::function<void(uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numValuesToRead, const CompressionMetadata& metadata)>;
using write_values_from_vector_func_t = std::function<void(uint8_t* frame, uint16_t posInFrame,
common::ValueVector* vector, uint32_t posInVector, const CompressionMetadata& metadata)>;

using lookup_node_column_func_t =
using read_values_to_page_func_t =
std::function<void(uint8_t* frame, PageElementCursor& pageCursor, uint8_t* result,
uint32_t posInResult, uint64_t numValues, const CompressionMetadata& metadata)>;
// This is a special usage for the `batchLookup` interface.
using batch_lookup_func_t = read_values_to_page_func_t;

class NullNodeColumn;
class StructNodeColumn;
Expand Down Expand Up @@ -123,9 +125,10 @@ class NodeColumn {
std::unique_ptr<InMemDiskArray<ColumnChunkMetadata>> metadataDA;
std::unique_ptr<NodeColumn> nullColumn;
std::vector<std::unique_ptr<NodeColumn>> childrenColumns;
read_node_column_func_t readNodeColumnFunc;
write_node_column_func_t writeNodeColumnFunc;
lookup_node_column_func_t lookupNodeColumnFunc;
read_values_to_vector_func_t readToVectorFunc;
write_values_from_vector_func_t writeFromVectorFunc;
read_values_to_page_func_t readToPageFunc;
batch_lookup_func_t batchLookupFunc;
RWPropertyStats propertyStatistics;
};

Expand Down
29 changes: 13 additions & 16 deletions src/storage/local_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,7 @@ void LocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
// Figure out if the chunk needs to be re-compressed
auto metadata = column->getCompressionMetadata(nodeGroupIdx, TransactionType::WRITE);
if (!metadata.canAlwaysUpdateInPlace()) {
auto nodeGroupStartOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
for (auto& [vectorIdx, vector] : chunk->vectors) {
auto vectorStartOffset =
nodeGroupStartOffset + StorageUtils::getStartOffsetOfVectorInChunk(vectorIdx);
for (auto i = 0u; i < vector->vector->state->selVector->selectedSize; i++) {
auto pos = vector->vector->state->selVector->selectedPositions[i];
assert(vector->validityMask[pos]);
Expand Down Expand Up @@ -210,6 +207,19 @@ void LocalColumn::commitLocalChunkInPlace(
}
}

// Commits the local updates of one node group by rewriting its whole column chunk.
// - nodeGroupIdx: node group whose chunk is scanned and afterwards re-appended.
// - localChunk: local chunk whose `vectors` entries (vectorIdx -> vector) are applied as updates.
void LocalColumn::commitLocalChunkOutOfPlace(
    node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk) {
    // Load the current contents of the node group into a freshly created chunk.
    auto rewrittenChunk = ColumnChunkFactory::createColumnChunk(column->getDataType());
    column->scan(nodeGroupIdx, rewrittenChunk.get());
    // Layer every locally buffered vector on top of the scanned contents.
    for (auto& [idxOfVector, localVector] : localChunk->vectors) {
        rewrittenChunk->update(localVector->vector.get(), idxOfVector);
    }
    // Hand the updated chunk back to the column for this node group.
    column->append(rewrittenChunk.get(), nodeGroupIdx);
}

void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
assert(chunks.contains(nodeGroupIdx));
auto localChunk = chunks.at(nodeGroupIdx).get();
Expand All @@ -230,19 +240,6 @@ void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
}
}

// Commits the local updates of one node group by rewriting its whole column chunk.
// - nodeGroupIdx: node group whose chunk is scanned and afterwards re-appended.
// - localChunk: local chunk whose `vectors` entries (vectorIdx -> vector) are applied as updates.
void LocalColumn::commitLocalChunkOutOfPlace(
    node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk) {
    // Load the current contents of the node group into a freshly created chunk.
    auto rewrittenChunk = ColumnChunkFactory::createColumnChunk(column->getDataType());
    column->scan(nodeGroupIdx, rewrittenChunk.get());
    // Layer every locally buffered vector on top of the scanned contents.
    for (auto& [idxOfVector, localVector] : localChunk->vectors) {
        rewrittenChunk->update(localVector->vector.get(), idxOfVector);
    }
    // Hand the updated chunk back to the column for this node group.
    column->append(rewrittenChunk.get(), nodeGroupIdx);
}

void VarListLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
assert(chunks.contains(nodeGroupIdx));
auto chunk = chunks.at(nodeGroupIdx).get();
Expand Down
8 changes: 4 additions & 4 deletions src/storage/store/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,8 @@ void ReadCompressedValuesFromPageToVector::operator()(uint8_t* frame, PageElemen
const CompressionMetadata& metadata) {
switch (metadata.compression) {
case CompressionType::UNCOMPRESSED:
return copy.decompressFromPage(frame, pageCursor.elemPosInPage, resultVector->getData(),
posInVector, numValuesToRead, metadata);
return uncompressed.decompressFromPage(frame, pageCursor.elemPosInPage,
resultVector->getData(), posInVector, numValuesToRead, metadata);
case CompressionType::INTEGER_BITPACKING: {
switch (physicalType) {
case PhysicalTypeID::INT64: {
Expand Down Expand Up @@ -483,7 +483,7 @@ void ReadCompressedValuesFromPage::operator()(uint8_t* frame, PageElementCursor&
const CompressionMetadata& metadata) {
switch (metadata.compression) {
case CompressionType::UNCOMPRESSED:
return copy.decompressFromPage(
return uncompressed.decompressFromPage(
frame, pageCursor.elemPosInPage, result, startPosInResult, numValuesToRead, metadata);
case CompressionType::INTEGER_BITPACKING: {
switch (physicalType) {
Expand Down Expand Up @@ -536,7 +536,7 @@ void WriteCompressedValueToPage::operator()(uint8_t* frame, uint16_t posInFrame,
common::ValueVector* vector, uint32_t posInVector, const CompressionMetadata& metadata) {
switch (metadata.compression) {
case CompressionType::UNCOMPRESSED:
return copy.setValueFromUncompressed(
return uncompressed.setValueFromUncompressed(
vector->getData(), posInVector, frame, posInFrame, metadata);
case CompressionType::INTEGER_BITPACKING: {
switch (physicalType) {
Expand Down
81 changes: 49 additions & 32 deletions src/storage/store/node_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
namespace storage {

struct InternalIDNodeColumnFunc {
static void readValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor,
static void readValuesFromPageToVector(uint8_t* frame, PageElementCursor& pageCursor,

Check warning on line 22 in src/storage/store/node_column.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/store/node_column.cpp#L22

Added line #L22 was not covered by tests
ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead,
const CompressionMetadata& metadata) {
auto resultData = (internalID_t*)resultVector->getData();
Expand All @@ -38,7 +38,7 @@
};

struct NullNodeColumnFunc {
static void readValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor,
static void readValuesFromPageToVector(uint8_t* frame, PageElementCursor& pageCursor,
ValueVector* resultVector, uint32_t posInVector, uint32_t numValuesToRead,
const CompressionMetadata& metadata) {
// Read bit-packed null flags from the frame into the result vector
Expand Down Expand Up @@ -81,7 +81,13 @@
posInVector, (uint64_t*)frame, posInFrame, 1);
}

static void readValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor, uint8_t* result,
static void copyValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor, uint8_t* result,
uint32_t startPosInResult, uint64_t numValuesToRead, const CompressionMetadata& metadata) {
NullMask::copyNullMask((uint64_t*)frame, pageCursor.elemPosInPage, (uint64_t*)result,
startPosInResult, numValuesToRead);
ray6080 marked this conversation as resolved.
Show resolved Hide resolved
}

static void batchLookupFromPage(uint8_t* frame, PageElementCursor& pageCursor, uint8_t* result,

Check warning on line 90 in src/storage/store/node_column.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/store/node_column.cpp#L90

Added line #L90 was not covered by tests
uint32_t startPosInResult, uint64_t numValuesToRead, const CompressionMetadata& metadata) {
for (auto i = 0; i < numValuesToRead; i++) {
result[startPosInResult + i] =
Expand All @@ -90,27 +96,28 @@
}
};

static read_node_column_func_t getReadNodeColumnFunc(const LogicalType& logicalType) {
static read_values_to_vector_func_t getReadValuesToVectorFunc(const LogicalType& logicalType) {
switch (logicalType.getLogicalTypeID()) {
case LogicalTypeID::INTERNAL_ID:
return InternalIDNodeColumnFunc::readValuesFromPage;
return InternalIDNodeColumnFunc::readValuesFromPageToVector;
case LogicalTypeID::BOOL:
return BoolNodeColumnFunc::readValuesFromPageToVector;
default:
return ReadCompressedValuesFromPageToVector(logicalType);
}
}

static lookup_node_column_func_t getLookupNodeColumnFunc(const LogicalType& logicalType) {
static read_values_to_page_func_t getWriteValuesToPageFunc(const LogicalType& logicalType) {
switch (logicalType.getLogicalTypeID()) {
case LogicalTypeID::BOOL:
return BoolNodeColumnFunc::readValuesFromPage;
return BoolNodeColumnFunc::copyValuesFromPage;
default:
return ReadCompressedValuesFromPage(logicalType);
}
}

static write_node_column_func_t getWriteNodeColumnFunc(const LogicalType& logicalType) {
static write_values_from_vector_func_t getWriteValuesFromVectorFunc(
const LogicalType& logicalType) {
switch (logicalType.getLogicalTypeID()) {
case LogicalTypeID::INTERNAL_ID:
return InternalIDNodeColumnFunc::writeValueToPage;
Expand All @@ -121,6 +128,16 @@
}
}

// Selects the batch-lookup routine for a column of the given logical type.
// BOOL columns get BoolNodeColumnFunc::batchLookupFromPage; every other type gets a
// ReadCompressedValuesFromPage functor constructed from `logicalType`.
static batch_lookup_func_t getBatchLookupFromPageFunc(const LogicalType& logicalType) {
    const bool isBoolColumn = logicalType.getLogicalTypeID() == LogicalTypeID::BOOL;
    if (isBoolColumn) {
        return BoolNodeColumnFunc::batchLookupFromPage;
    }
    return ReadCompressedValuesFromPage(logicalType);
}

NodeColumn::NodeColumn(LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats propertyStatistics,
Expand All @@ -132,9 +149,10 @@
StorageStructureID::newMetadataID(), metaDAHeaderInfo.dataDAHPageIdx, bufferManager, wal,
transaction);
numBytesPerFixedSizedValue = getDataTypeSizeInChunk(this->dataType);
readNodeColumnFunc = getReadNodeColumnFunc(this->dataType);
lookupNodeColumnFunc = getLookupNodeColumnFunc(this->dataType);
writeNodeColumnFunc = getWriteNodeColumnFunc(this->dataType);
readToVectorFunc = getReadValuesToVectorFunc(this->dataType);
readToPageFunc = getWriteValuesToPageFunc(this->dataType);
batchLookupFunc = getBatchLookupFromPageFunc(this->dataType);
writeFromVectorFunc = getWriteValuesFromVectorFunc(this->dataType);
assert(numBytesPerFixedSizedValue <= BufferPoolConstants::PAGE_4KB_SIZE);
if (requireNullColumn) {
nullColumn = std::make_unique<NullNodeColumn>(metaDAHeaderInfo.nullDAHPageIdx, dataFH,
Expand All @@ -150,7 +168,7 @@
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType());
readFromPage(transaction, cursor.pageIdx, [&](uint8_t* frame) -> void {
lookupNodeColumnFunc(frame, cursor, result, i, 1, chunkMeta.compMeta);
batchLookupFunc(frame, cursor, result, i, 1, chunkMeta.compMeta);
});
}
}
Expand Down Expand Up @@ -186,18 +204,17 @@
} else {
auto chunkMetadata = metadataDA->get(nodeGroupIdx, TransactionType::WRITE);
auto cursor = PageElementCursor(chunkMetadata.pageIdx, 0);
auto numValuesToScan = chunkMetadata.numValues;
uint64_t numValuesScanned = 0;
while (numValuesScanned < numValuesToScan) {
uint64_t numValuesToScanInPage =
std::min((uint64_t)chunkMetadata.compMeta.numValues(
BufferPoolConstants::PAGE_4KB_SIZE, dataType),
numValuesToScan - numValuesScanned);
uint64_t numValuesPerPage =
chunkMetadata.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
uint64_t numValuesScanned = 0u;
while (numValuesScanned < columnChunk->getCapacity()) {
auto numValuesToReadInPage =
std::min(numValuesPerPage, columnChunk->getCapacity() - numValuesScanned);
readFromPage(&DUMMY_READ_TRANSACTION, cursor.pageIdx, [&](uint8_t* frame) -> void {
lookupNodeColumnFunc(frame, cursor, columnChunk->getData(), numValuesScanned,
numValuesToScanInPage, chunkMetadata.compMeta);
readToPageFunc(frame, cursor, columnChunk->getData(), numValuesScanned,
numValuesToReadInPage, chunkMetadata.compMeta);
});
numValuesScanned += numValuesToScanInPage;
numValuesScanned += numValuesToReadInPage;
cursor.nextPage();
}
columnChunk->setNumValues(chunkMetadata.numValues);
Expand All @@ -223,13 +240,13 @@
uint64_t numValuesToScan, ValueVector* resultVector, const CompressionMetadata& compMeta,
uint64_t startPosInVector) {
uint64_t numValuesScanned = 0;
auto numValuesPerPage = compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
while (numValuesScanned < numValuesToScan) {
uint64_t numValuesToScanInPage =
std::min((uint64_t)compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType) -
pageCursor.elemPosInPage,
std::min((uint64_t)numValuesPerPage - pageCursor.elemPosInPage,
numValuesToScan - numValuesScanned);
readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void {
readNodeColumnFunc(frame, pageCursor, resultVector, numValuesScanned + startPosInVector,
readToVectorFunc(frame, pageCursor, resultVector, numValuesScanned + startPosInVector,
numValuesToScanInPage, compMeta);
});
numValuesScanned += numValuesToScanInPage;
Expand All @@ -242,16 +259,16 @@
auto numValuesToScan = nodeIDVector->state->getOriginalSize();
auto numValuesScanned = 0u;
auto posInSelVector = 0u;
auto numValuesPerPage = compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
while (numValuesScanned < numValuesToScan) {
uint64_t numValuesToScanInPage =
std::min((uint64_t)compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType) -
pageCursor.elemPosInPage,
std::min((uint64_t)numValuesPerPage - pageCursor.elemPosInPage,
numValuesToScan - numValuesScanned);
if (StorageStructure::isInRange(
nodeIDVector->state->selVector->selectedPositions[posInSelVector], numValuesScanned,
numValuesScanned + numValuesToScanInPage)) {
readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void {
readNodeColumnFunc(frame, pageCursor, resultVector, numValuesScanned,
readToVectorFunc(frame, pageCursor, resultVector, numValuesScanned,
numValuesToScanInPage, compMeta);
});
}
Expand Down Expand Up @@ -289,7 +306,7 @@
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType());
readFromPage(transaction, cursor.pageIdx, [&](uint8_t* frame) -> void {
readNodeColumnFunc(
readToVectorFunc(
frame, cursor, resultVector, posInVector, 1 /* numValuesToRead */, chunkMeta.compMeta);
});
}
Expand Down Expand Up @@ -361,7 +378,7 @@
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
auto chunkMeta = metadataDA->get(nodeGroupIdx, TransactionType::WRITE);
try {
writeNodeColumnFunc(walPageInfo.frame, walPageInfo.posInPage, vectorToWriteFrom,
writeFromVectorFunc(walPageInfo.frame, walPageInfo.posInPage, vectorToWriteFrom,
posInVectorToWriteFrom, chunkMeta.compMeta);
} catch (Exception& e) {
bufferManager->unpin(*wal->fileHandle, walPageInfo.pageIdxInWAL);
Expand Down Expand Up @@ -449,8 +466,8 @@
: NodeColumn{LogicalType(LogicalTypeID::BOOL), MetadataDAHInfo{metaDAHPageIdx}, dataFH,
metadataFH, bufferManager, wal, transaction, propertyStatistics,
false /*requireNullColumn*/} {
readNodeColumnFunc = NullNodeColumnFunc::readValuesFromPage;
writeNodeColumnFunc = NullNodeColumnFunc::writeValueToPage;
readToVectorFunc = NullNodeColumnFunc::readValuesFromPageToVector;
writeFromVectorFunc = NullNodeColumnFunc::writeValueToPage;
}

void NullNodeColumn::scan(
Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/string_node_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ StringNodeColumn::StringNodeColumn(LogicalType dataType, const MetadataDAHInfo&
: NodeColumn{std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal,
transaction, stats, true} {
if (this->dataType.getLogicalTypeID() == LogicalTypeID::STRING) {
writeNodeColumnFunc = StringNodeColumnFunc::writeStringValuesToPage;
writeFromVectorFunc = StringNodeColumnFunc::writeStringValuesToPage;
}
overflowMetadataDA = std::make_unique<InMemDiskArray<OverflowColumnChunkMetadata>>(*metadataFH,
StorageStructureID::newMetadataID(), metaDAHeaderInfo.childrenInfos[0]->dataDAHPageIdx,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/var_list_node_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ offset_t VarListNodeColumn::readOffset(
chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType));
pageCursor.pageIdx += chunkMeta.pageIdx;
readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void {
readNodeColumnFunc(frame, pageCursor, offsetVector.get(), 0 /* posInVector */,
readToVectorFunc(frame, pageCursor, offsetVector.get(), 0 /* posInVector */,
1 /* numValuesToRead */, chunkMeta.compMeta);
});
return offsetVector->getValue<offset_t>(0);
Expand Down
Loading
Loading