Skip to content

Commit

Permalink
fix update bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Sep 28, 2023
1 parent 17723ab commit 5474f75
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 44 deletions.
5 changes: 0 additions & 5 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ class ColumnChunk {
}

inline uint64_t getNumBytesPerValue() const { return numBytesPerValue; }
inline uint64_t getNumBytes() const { return bufferSize; }
inline uint8_t* getData() { return buffer.get(); }

virtual void write(const common::Value& val, uint64_t posToWrite);
Expand Down Expand Up @@ -152,10 +151,6 @@ class ColumnChunk {
void templateCopyValuesAsString(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

virtual inline common::page_idx_t getNumPagesForBuffer() const {
return getNumPagesForBytes(bufferSize);
}

common::offset_t getOffsetInBuffer(common::offset_t pos) const;

virtual void copyVectorToBuffer(common::ValueVector* vector, common::offset_t startPosInChunk);
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/store/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ class CompressedFunctor {

protected:
explicit CompressedFunctor(const common::LogicalType& logicalType)
: copy{logicalType}, physicalType{logicalType.getPhysicalType()} {}
const Uncompressed copy;
: uncompressed{logicalType}, physicalType{logicalType.getPhysicalType()} {}
const Uncompressed uncompressed;
const BooleanBitpacking booleanBitpacking;
const common::PhysicalTypeID physicalType;
};
Expand Down
29 changes: 13 additions & 16 deletions src/storage/local_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,7 @@ void LocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
// Figure out if the chunk needs to be re-compressed
auto metadata = column->getCompressionMetadata(nodeGroupIdx, TransactionType::WRITE);
if (!metadata.canAlwaysUpdateInPlace()) {
auto nodeGroupStartOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
for (auto& [vectorIdx, vector] : chunk->vectors) {
auto vectorStartOffset =
nodeGroupStartOffset + StorageUtils::getStartOffsetOfVectorInChunk(vectorIdx);
for (auto i = 0u; i < vector->vector->state->selVector->selectedSize; i++) {
auto pos = vector->vector->state->selVector->selectedPositions[i];
assert(vector->validityMask[pos]);
Expand Down Expand Up @@ -210,6 +207,19 @@ void LocalColumn::commitLocalChunkInPlace(
}
}

void LocalColumn::commitLocalChunkOutOfPlace(
node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk) {
// Trigger rewriting the column chunk to another new place.
auto columnChunk = ColumnChunkFactory::createColumnChunk(column->getDataType());
// First scan the whole column chunk into ColumnChunk.
column->scan(nodeGroupIdx, columnChunk.get());
for (auto& [vectorIdx, vector] : localChunk->vectors) {
columnChunk->update(vector->vector.get(), vectorIdx);
}
// Append the updated ColumnChunk back to column.
column->append(columnChunk.get(), nodeGroupIdx);
}

void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
assert(chunks.contains(nodeGroupIdx));
auto localChunk = chunks.at(nodeGroupIdx).get();
Expand All @@ -230,19 +240,6 @@ void StringLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
}
}

void LocalColumn::commitLocalChunkOutOfPlace(
node_group_idx_t nodeGroupIdx, LocalColumnChunk* localChunk) {
// Trigger rewriting the column chunk to another new place.
auto columnChunk = ColumnChunkFactory::createColumnChunk(column->getDataType());
// First scan the whole column chunk into ColumnChunk.
column->scan(nodeGroupIdx, columnChunk.get());
for (auto& [vectorIdx, vector] : localChunk->vectors) {
columnChunk->update(vector->vector.get(), vectorIdx);
}
// Append the updated ColumnChunk back to column.
column->append(columnChunk.get(), nodeGroupIdx);
}

void VarListLocalColumn::prepareCommitForChunk(node_group_idx_t nodeGroupIdx) {
assert(chunks.contains(nodeGroupIdx));
auto chunk = chunks.at(nodeGroupIdx).get();
Expand Down
8 changes: 4 additions & 4 deletions src/storage/store/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ void ReadCompressedValuesFromPageToVector::operator()(uint8_t* frame, PageElemen
const CompressionMetadata& metadata) {
switch (metadata.compression) {
case CompressionType::UNCOMPRESSED:
return copy.decompressFromPage(frame, pageCursor.elemPosInPage, resultVector->getData(),
posInVector, numValuesToRead, metadata);
return uncompressed.decompressFromPage(frame, pageCursor.elemPosInPage,
resultVector->getData(), posInVector, numValuesToRead, metadata);
case CompressionType::INTEGER_BITPACKING: {
switch (physicalType) {
case PhysicalTypeID::INT64: {
Expand Down Expand Up @@ -410,7 +410,7 @@ void ReadCompressedValuesFromPage::operator()(uint8_t* frame, PageElementCursor&
const CompressionMetadata& metadata) {
switch (metadata.compression) {
case CompressionType::UNCOMPRESSED:
return copy.decompressFromPage(
return uncompressed.decompressFromPage(
frame, pageCursor.elemPosInPage, result, startPosInResult, numValuesToRead, metadata);
case CompressionType::INTEGER_BITPACKING: {
switch (physicalType) {
Expand Down Expand Up @@ -447,7 +447,7 @@ void WriteCompressedValueToPage::operator()(uint8_t* frame, uint16_t posInFrame,
common::ValueVector* vector, uint32_t posInVector, const CompressionMetadata& metadata) {
switch (metadata.compression) {
case CompressionType::UNCOMPRESSED:
return copy.setValueFromUncompressed(
return uncompressed.setValueFromUncompressed(
vector->getData(), posInVector, frame, posInFrame, metadata);
case CompressionType::INTEGER_BITPACKING: {
switch (physicalType) {
Expand Down
28 changes: 13 additions & 15 deletions src/storage/store/node_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,8 @@ struct BoolNodeColumnFunc {

static void readValuesFromPage(uint8_t* frame, PageElementCursor& pageCursor, uint8_t* result,
uint32_t startPosInResult, uint64_t numValuesToRead, const CompressionMetadata& metadata) {
for (auto i = 0; i < numValuesToRead; i++) {
result[startPosInResult + i] =
NullMask::isNull((uint64_t*)frame, pageCursor.elemPosInPage + i);
}
NullMask::copyNullMask((uint64_t*)frame, pageCursor.elemPosInPage, (uint64_t*)result,
startPosInResult, numValuesToRead);
}
};

Expand Down Expand Up @@ -186,19 +184,19 @@ void NodeColumn::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) {
} else {
auto chunkMetadata = metadataDA->get(nodeGroupIdx, TransactionType::WRITE);
auto cursor = PageElementCursor(chunkMetadata.pageIdx, 0);
auto numValuesToScan = chunkMetadata.numValues;
uint64_t numValuesScanned = 0;
while (numValuesScanned < numValuesToScan) {
uint64_t numValuesToScanInPage =
std::min((uint64_t)chunkMetadata.compMeta.numValues(
BufferPoolConstants::PAGE_4KB_SIZE, dataType),
numValuesToScan - numValuesScanned);
auto numValuesPerPage =
chunkMetadata.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
// numValuesPerPage being UINT64_MAX is a special case that bit_width is 0.
auto numValuesToScanInPage = numValuesPerPage == UINT64_MAX ? 0 : numValuesPerPage;
auto numValuesScanned = 0u, numPagesScanned = 0u;
while (numPagesScanned < chunkMetadata.numPages) {
readFromPage(&DUMMY_READ_TRANSACTION, cursor.pageIdx, [&](uint8_t* frame) -> void {
lookupNodeColumnFunc(frame, cursor, columnChunk->getData(), numValuesScanned,
numValuesToScanInPage, chunkMetadata.compMeta);
});
numValuesScanned += numValuesToScanInPage;
cursor.nextPage();
numPagesScanned++;
}
columnChunk->setNumValues(chunkMetadata.numValues);
}
Expand All @@ -223,10 +221,10 @@ void NodeColumn::scanUnfiltered(Transaction* transaction, PageElementCursor& pag
uint64_t numValuesToScan, ValueVector* resultVector, const CompressionMetadata& compMeta,
uint64_t startPosInVector) {
uint64_t numValuesScanned = 0;
auto numValuesPerPage = compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
while (numValuesScanned < numValuesToScan) {
uint64_t numValuesToScanInPage =
std::min((uint64_t)compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType) -
pageCursor.elemPosInPage,
std::min((uint64_t)numValuesPerPage - pageCursor.elemPosInPage,
numValuesToScan - numValuesScanned);
readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void {
readNodeColumnFunc(frame, pageCursor, resultVector, numValuesScanned + startPosInVector,
Expand All @@ -242,10 +240,10 @@ void NodeColumn::scanFiltered(Transaction* transaction, PageElementCursor& pageC
auto numValuesToScan = nodeIDVector->state->getOriginalSize();
auto numValuesScanned = 0u;
auto posInSelVector = 0u;
auto numValuesPerPage = compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
while (numValuesScanned < numValuesToScan) {
uint64_t numValuesToScanInPage =
std::min((uint64_t)compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType) -
pageCursor.elemPosInPage,
std::min((uint64_t)numValuesPerPage - pageCursor.elemPosInPage,
numValuesToScan - numValuesScanned);
if (StorageStructure::isInRange(
nodeIDVector->state->selVector->selectedPositions[posInSelVector], numValuesScanned,
Expand Down
15 changes: 13 additions & 2 deletions test/storage/node_insertion_deletion_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,23 @@ TEST_F(NodeInsertionDeletionTests, InsertManyNodesTest) {
auto preparedStatement = conn->prepare("CREATE (:person {ID:$id});");
for (int64_t i = 0; i < BufferPoolConstants::PAGE_4KB_SIZE; i++) {
auto result =
conn->execute(preparedStatement.get(), std::make_pair(std::string("id"), 10001 + i));
conn->execute(preparedStatement.get(), std::make_pair(std::string("id"), 10000 + i));
ASSERT_TRUE(result->isSuccess()) << result->toString();
}
auto result = conn->query("MATCH (a:person) WHERE a.ID >= 10001 RETURN COUNT(*);");
auto result = conn->query("MATCH (a:person) WHERE a.ID >= 10000 RETURN COUNT(*);");
ASSERT_TRUE(result->hasNext());
auto tuple = result->getNext();
ASSERT_EQ(tuple->getValue(0)->getValue<int64_t>(), BufferPoolConstants::PAGE_4KB_SIZE);
ASSERT_FALSE(result->hasNext());
result = conn->query("MATCH (a:person) WHERE a.ID=10000 RETURN a.ID;");
ASSERT_TRUE(result->hasNext());
tuple = result->getNext();
ASSERT_EQ(tuple->getValue(0)->getValue<int64_t>(), 10000);
result = conn->query("MATCH (a:person) WHERE a.ID>=10000 RETURN a.ID ORDER BY a.ID;");
int64_t i = 0;
while (result->hasNext()) {
tuple = result->getNext();
EXPECT_EQ(10000 + i++, tuple->getValue(0)->getValue<int64_t>());
}
ASSERT_EQ(i, BufferPoolConstants::PAGE_4KB_SIZE);
}

0 comments on commit 5474f75

Please sign in to comment.