Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Aug 9, 2023
1 parent af4ee97 commit bf59483
Show file tree
Hide file tree
Showing 9 changed files with 773 additions and 76 deletions.
14 changes: 4 additions & 10 deletions src/include/storage/copier/string_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ class StringColumnChunk : public ColumnChunk {
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;
common::page_idx_t flushBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx) final;

template<typename T>
void setValueFromString(const char* value, uint64_t length, uint64_t pos) {
Expand All @@ -26,18 +25,13 @@ class StringColumnChunk : public ColumnChunk {
throw common::NotImplementedException("VarSizedColumnChunk::getValue");
}

inline common::page_idx_t getNumPagesForMainChunk() const {
return ColumnChunk::getNumPagesForBuffer();
}

inline common::page_idx_t getNumPagesForOverflow() const { return overflowFile->getNumPages(); }
common::page_idx_t flushOverflowBuffer(BMFileHandle* dataFH, common::page_idx_t startPageIdx);

protected:
inline common::page_idx_t getNumPagesForBuffer() const final {
return ColumnChunk::getNumPagesForBuffer() + overflowFile->getNumPages();
private:
inline common::page_idx_t getNumPages() const final {
return ColumnChunk::getNumPages() + overflowFile->getNumPages();
}

private:
template<typename T>
void templateCopyVarSizedValuesFromString(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);
Expand Down
12 changes: 4 additions & 8 deletions src/include/storage/copier/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ struct VarListDataColumnChunk {

void reset();

void resize() {}
void resize(uint64_t numValues);
};

class VarListColumnChunk : public ColumnChunk {
Expand Down Expand Up @@ -51,7 +51,7 @@ class VarListColumnChunk : public ColumnChunk {
void copyVarListFromArrowList(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) {
auto listArray = (T*)array;
auto originalSize = varListDataColumnChunk.numValuesInDataChunk;
auto dataChunkOffsetToAppend = varListDataColumnChunk.numValuesInDataChunk;
for (auto i = 0u; i < numValuesToAppend; i++) {
nullChunk->setNull(i + startPosInChunk, listArray->IsNull(i));
auto length = listArray->value_length(i);
Expand All @@ -60,17 +60,13 @@ class VarListColumnChunk : public ColumnChunk {
}
auto startOffset = listArray->value_offset(startPosInChunk);
auto endOffset = listArray->value_offset(startPosInChunk + numValuesToAppend);
assert(
varListDataColumnChunk.numValuesInDataChunk - originalSize == endOffset - startOffset);
resizeDataChunk(varListDataColumnChunk.numValuesInDataChunk);
varListDataColumnChunk.resize(varListDataColumnChunk.numValuesInDataChunk);
varListDataColumnChunk.dataChunk->append(
listArray->values().get(), originalSize, endOffset - startOffset);
listArray->values().get(), dataChunkOffsetToAppend, endOffset - startOffset);
}

void write(const common::Value& listVal, uint64_t posToWrite) override;

void resizeDataChunk(uint64_t numValues);

private:
VarListDataColumnChunk varListDataColumnChunk;

Expand Down
3 changes: 0 additions & 3 deletions src/include/storage/store/node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ class NodeColumn {
return nodeOffset >> common::StorageConstants::NODE_GROUP_SIZE_LOG2;
}

// data: length: 5, str: a e d d d, length: 10, str: a e d d d
// main: offset 0, offset 5,

protected:
StorageStructureID storageStructureID;
common::LogicalType dataType;
Expand Down
1 change: 0 additions & 1 deletion src/storage/copier/column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ void ColumnChunk::write(const Value& val, uint64_t posToWrite) {

void ColumnChunk::resize(uint64_t numValues) {
auto numBytesAfterResize = numValues * numBytesPerValue;
assert(numBytesAfterResize > numBytes);
auto resizedBuffer = std::make_unique<uint8_t[]>(numBytesAfterResize);
memcpy(resizedBuffer.get(), buffer.get(), numBytes);
numBytes = numBytesAfterResize;
Expand Down
10 changes: 3 additions & 7 deletions src/storage/copier/string_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void StringColumnChunk::append(
}
}

void StringColumnChunk::append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
void StringColumnChunk::append(ColumnChunk* other, offset_t startPosInOtherChunk,
offset_t startPosInChunk, uint32_t numValuesToAppend) {
auto otherChunk = reinterpret_cast<StringColumnChunk*>(other);
nullChunk->append(
Expand All @@ -91,15 +91,13 @@ void StringColumnChunk::append(ColumnChunk* other, common::offset_t startPosInOt
}
}

page_idx_t StringColumnChunk::flushBuffer(BMFileHandle* dataFH, page_idx_t startPageIdx) {
ColumnChunk::flushBuffer(dataFH, startPageIdx);
startPageIdx += ColumnChunk::getNumPagesForBuffer();
page_idx_t StringColumnChunk::flushOverflowBuffer(BMFileHandle* dataFH, page_idx_t startPageIdx) {
for (auto i = 0u; i < overflowFile->getNumPages(); i++) {
FileUtils::writeToFile(dataFH->getFileInfo(), overflowFile->getPage(i)->data,
BufferPoolConstants::PAGE_4KB_SIZE, startPageIdx * BufferPoolConstants::PAGE_4KB_SIZE);
startPageIdx++;
}
return getNumPagesForBuffer();
return overflowFile->getNumPages();
}

void StringColumnChunk::appendStringColumnChunk(StringColumnChunk* other,
Expand All @@ -121,8 +119,6 @@ void StringColumnChunk::appendStringColumnChunk(StringColumnChunk* other,
other->overflowFile->getPage(cursorToCopyFrom.pageIdx)->data +
cursorToCopyFrom.offsetInPage,
&kuVals[posInChunk]);
TypeUtils::decodeOverflowPtr(kuVals[posInChunk].overflowPtr, cursorToCopyFrom.pageIdx,
cursorToCopyFrom.offsetInPage);
}
}

Expand Down
30 changes: 14 additions & 16 deletions src/storage/copier/var_list_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ void VarListDataColumnChunk::reset() {
numValuesInDataChunk = 0;
}

void VarListDataColumnChunk::resize(uint64_t numValues) {
if (numValues <= capacityInDataChunk) {
return;
}
while (capacityInDataChunk < numValues) {
capacityInDataChunk *= 2;
}
dataChunk->resize(capacityInDataChunk);
}

VarListColumnChunk::VarListColumnChunk(LogicalType dataType, CopyDescription* copyDescription)
: ColumnChunk{std::move(dataType), copyDescription, true /* hasNullChunk */},
varListDataColumnChunk{ColumnChunkFactory::createColumnChunk(
Expand Down Expand Up @@ -43,19 +53,17 @@ void VarListColumnChunk::append(ColumnChunk* other, common::offset_t startPosInO
nullChunk->append(
other->getNullChunk(), startPosInOtherChunk, startPosInChunk, numValuesToAppend);
auto otherListChunk = reinterpret_cast<VarListColumnChunk*>(other);
auto posInDataColumnChunkToAppend = varListDataColumnChunk.numValuesInDataChunk;
auto offsetInDataChunkToAppend = varListDataColumnChunk.numValuesInDataChunk;
for (auto i = 0u; i < numValuesToAppend; i++) {
varListDataColumnChunk.numValuesInDataChunk +=
otherListChunk->getListLen(startPosInOtherChunk + i);
setValue(varListDataColumnChunk.numValuesInDataChunk, startPosInChunk + i);
}
auto startOffset = otherListChunk->getListOffset(startPosInOtherChunk);
auto endOffset = otherListChunk->getListOffset(startPosInOtherChunk + numValuesToAppend);
assert(varListDataColumnChunk.numValuesInDataChunk - posInDataColumnChunkToAppend ==
endOffset - startOffset);
resizeDataChunk(varListDataColumnChunk.numValuesInDataChunk);
varListDataColumnChunk.resize(varListDataColumnChunk.numValuesInDataChunk);
varListDataColumnChunk.dataChunk->append(otherListChunk->varListDataColumnChunk.dataChunk.get(),
startOffset, posInDataColumnChunkToAppend, endOffset - startOffset);
startOffset, offsetInDataChunkToAppend, endOffset - startOffset);
}

void VarListColumnChunk::copyVarListFromArrowString(
Expand Down Expand Up @@ -89,7 +97,7 @@ void VarListColumnChunk::copyVarListFromArrowString(
void VarListColumnChunk::write(const Value& listVal, uint64_t posToWrite) {
assert(listVal.getDataType()->getPhysicalType() == PhysicalTypeID::VAR_LIST);
auto numValuesInList = NestedVal::getChildrenSize(&listVal);
resizeDataChunk(varListDataColumnChunk.numValuesInDataChunk + numValuesInList);
varListDataColumnChunk.resize(varListDataColumnChunk.numValuesInDataChunk + numValuesInList);
for (auto i = 0u; i < numValuesInList; i++) {
varListDataColumnChunk.dataChunk->write(
*NestedVal::getChildVal(&listVal, i), varListDataColumnChunk.numValuesInDataChunk);
Expand All @@ -109,15 +117,5 @@ void VarListColumnChunk::resetToEmpty() {
varListDataColumnChunk.reset();
}

void VarListColumnChunk::resizeDataChunk(uint64_t numValues) {
if (numValues <= varListDataColumnChunk.capacityInDataChunk) {
return;
}
while (varListDataColumnChunk.capacityInDataChunk < numValues) {
varListDataColumnChunk.capacityInDataChunk *= 2;
}
varListDataColumnChunk.dataChunk->resize(varListDataColumnChunk.capacityInDataChunk);
}

} // namespace storage
} // namespace kuzu
2 changes: 1 addition & 1 deletion src/storage/store/node_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void NodeColumn::batchLookup(const offset_t* nodeOffsets, size_t size, uint8_t*
auto nodeGroupIdx = getNodeGroupIdxFromNodeOffset(nodeOffset);
auto cursor = PageUtils::getPageElementCursorForPos(nodeOffset, numValuesPerPage);
cursor.pageIdx +=
metadataDA->get(nodeGroupIdx * 2, dummyReadOnlyTransaction->getType()).pageIdx;
metadataDA->get(nodeGroupIdx, dummyReadOnlyTransaction->getType()).pageIdx;
readFromPage(dummyReadOnlyTransaction.get(), cursor.pageIdx, [&](uint8_t* frame) -> void {
memcpy(result + i * numBytesPerFixedSizedValue,
frame + (cursor.elemPosInPage * numBytesPerFixedSizedValue),
Expand Down
32 changes: 6 additions & 26 deletions src/storage/store/string_node_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,34 +52,14 @@ void StringNodeColumn::scan(transaction::Transaction* transaction, node_group_id

page_idx_t StringNodeColumn::append(kuzu::storage::ColumnChunk* columnChunk,
common::page_idx_t startPageIdx, uint64_t nodeGroupIdx) {
auto numPagesForMainChunk = NodeColumn::append(columnChunk, startPageIdx, nodeGroupIdx);
auto stringColumnChunk = reinterpret_cast<StringColumnChunk*>(columnChunk);
// Main column chunk.
page_idx_t numPagesFlushed = 0;
auto numPagesForChunk = columnChunk->flushBuffer(dataFH, startPageIdx);
auto numPagesForMainChunk = stringColumnChunk->getNumPagesForMainChunk();
auto numPagesForOverflow = stringColumnChunk->getNumPagesForOverflow();
metadataDA->resize(nodeGroupIdx + 1);
metadataDA->update(nodeGroupIdx, ColumnChunkMetadata{startPageIdx, numPagesForMainChunk});
startPageIdx += numPagesForMainChunk;
auto numPagesForOverflow =
stringColumnChunk->flushOverflowBuffer(dataFH, startPageIdx + numPagesForMainChunk);
overflowMetadataDA->resize(nodeGroupIdx + 1);
overflowMetadataDA->update(
nodeGroupIdx, ColumnChunkMetadata{startPageIdx, numPagesForOverflow});
startPageIdx += numPagesForOverflow;
numPagesFlushed += numPagesForChunk;
// Null column chunk.
auto numPagesForNullChunk =
nullColumn->append(columnChunk->getNullChunk(), startPageIdx, nodeGroupIdx);
numPagesFlushed += numPagesForNullChunk;
startPageIdx += numPagesForNullChunk;
// Children column chunks.
assert(childrenColumns.size() == columnChunk->getNumChildren());
for (auto i = 0u; i < childrenColumns.size(); i++) {
auto numPagesForChild =
childrenColumns[i]->append(columnChunk->getChild(i), startPageIdx, nodeGroupIdx);
numPagesFlushed += numPagesForChild;
startPageIdx += numPagesForChild;
}
return numPagesFlushed;
overflowMetadataDA->update(nodeGroupIdx,
ColumnChunkMetadata{startPageIdx + numPagesForMainChunk, numPagesForOverflow});
return numPagesForMainChunk + numPagesForOverflow;
}

void StringNodeColumn::checkpointInMemory() {
Expand Down
Loading

0 comments on commit bf59483

Please sign in to comment.