Skip to content

Commit

Permalink
Merge pull request #2164 from kuzudb/storage-header-cleanup
Browse files Browse the repository at this point in the history
Clean up node_column.h and column_chunk.h
  • Loading branch information
ray6080 committed Oct 10, 2023
2 parents 9944b70 + a030ef0 commit abca5e6
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 310 deletions.
33 changes: 0 additions & 33 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,39 +227,6 @@ class NullColumnChunk : public BoolColumnChunk {
bool mayHaveNullValue;
};

class FixedListColumnChunk : public ColumnChunk {
public:
FixedListColumnChunk(common::LogicalType dataType,
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool enableCompression)
: ColumnChunk(std::move(dataType), std::move(csvReaderConfig), enableCompression,
true /* hasNullChunk */) {}

void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;

void write(const common::Value& fixedListVal, uint64_t posToWrite) final;

void copyVectorToBuffer(common::ValueVector* vector, common::offset_t startPosInChunk) final;

private:
uint64_t getBufferSize() const final;
};

class SerialColumnChunk : public ColumnChunk {
public:
SerialColumnChunk()
: ColumnChunk{common::LogicalType(common::LogicalTypeID::SERIAL),
nullptr /* copyDescription */, false /*enableCompression*/,
false /* hasNullChunk */} {}

inline void initialize(common::offset_t numValues) final {
numBytesPerValue = 0;
bufferSize = 0;
// Each byte defaults to 0, indicating everything is non-null
buffer = std::make_unique<uint8_t[]>(bufferSize);
}
};

struct ColumnChunkFactory {
static std::unique_ptr<ColumnChunk> createColumnChunk(const common::LogicalType& dataType,
bool enableCompression, common::CSVReaderConfig* csvReaderConfig = nullptr);
Expand Down
47 changes: 0 additions & 47 deletions src/include/storage/store/node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,53 +133,6 @@ class NodeColumn {
bool enableCompression;
};

class BoolNodeColumn : public NodeColumn {
public:
BoolNodeColumn(const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH,
BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats propertyStatistics,
bool enableCompression, bool requireNullColumn = true);
};

class NullNodeColumn : public NodeColumn {
friend StructNodeColumn;

public:
NullNodeColumn(common::page_idx_t metaDAHPageIdx, BMFileHandle* dataFH,
BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats propertyStatistics,
bool enableCompression);

void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0) final;
void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) final;

void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) final;
void setNull(common::offset_t nodeOffset) final;

protected:
void writeInternal(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom,
uint32_t posInVectorToWriteFrom) final;
};

class SerialNodeColumn : public NodeColumn {
public:
SerialNodeColumn(const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH,
BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction);

void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) final;
void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) final;
};

struct NodeColumnFactory {
static std::unique_ptr<NodeColumn> createNodeColumn(const common::LogicalType& dataType,
const MetadataDAHInfo& metaDAHeaderInfo, BMFileHandle* dataFH, BMFileHandle* metadataFH,
Expand Down
223 changes: 123 additions & 100 deletions src/storage/store/column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,129 @@ using namespace kuzu::transaction;
namespace kuzu {
namespace storage {

void BoolColumnChunk::initialize(common::offset_t capacity) {
numBytesPerValue = 0;
bufferSize = numBytesForValues(capacity);
this->capacity = capacity;
buffer = std::make_unique<uint8_t[]>(bufferSize);
if (nullChunk) {
static_cast<BoolColumnChunk*>(nullChunk.get())->initialize(capacity);
}
}

void BoolColumnChunk::resize(uint64_t capacity) {
auto numBytesAfterResize = numBytesForValues(capacity);
assert(numBytesAfterResize > bufferSize);
auto reservedBuffer = std::make_unique<uint8_t[]>(numBytesAfterResize);
memset(reservedBuffer.get(), 0 /* non null */, numBytesAfterResize);
memcpy(reservedBuffer.get(), buffer.get(), bufferSize);
buffer = std::move(reservedBuffer);
bufferSize = numBytesAfterResize;
this->capacity = numValues;
if (nullChunk) {
nullChunk->resize(capacity);
}
}

void NullColumnChunk::append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) {
copyFromBuffer((uint64_t*)static_cast<NullColumnChunk*>(other)->buffer.get(),
startPosInOtherChunk, startPosInChunk, numValuesToAppend);
}

class FixedListColumnChunk : public ColumnChunk {
public:
FixedListColumnChunk(common::LogicalType dataType,
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool enableCompression)
: ColumnChunk(std::move(dataType), std::move(csvReaderConfig), enableCompression,
true /* hasNullChunk */) {}

void append(ColumnChunk* other, offset_t startPosInOtherChunk, common::offset_t startPosInChunk,
uint32_t numValuesToAppend) final {
auto otherChunk = (FixedListColumnChunk*)other;
if (nullChunk) {
nullChunk->append(otherChunk->nullChunk.get(), startPosInOtherChunk, startPosInChunk,
numValuesToAppend);
}
// TODO(Guodong): This can be optimized to not copy one by one.
for (auto i = 0u; i < numValuesToAppend; i++) {
memcpy(buffer.get() + getOffsetInBuffer(startPosInChunk + i),
otherChunk->buffer.get() + getOffsetInBuffer(startPosInOtherChunk + i),
numBytesPerValue);
}
numValues += numValuesToAppend;
}

void write(const Value& fixedListVal, uint64_t posToWrite) final {
assert(fixedListVal.getDataType()->getPhysicalType() == PhysicalTypeID::FIXED_LIST);
nullChunk->setNull(posToWrite, fixedListVal.isNull());
if (fixedListVal.isNull()) {
return;
}
auto numValues = NestedVal::getChildrenSize(&fixedListVal);
auto childType = FixedListType::getChildType(fixedListVal.getDataType());
auto numBytesPerValueInList = getDataTypeSizeInChunk(*childType);
auto bufferToWrite = buffer.get() + posToWrite * numBytesPerValue;
for (auto i = 0u; i < numValues; i++) {
auto val = NestedVal::getChildVal(&fixedListVal, i);
switch (childType->getPhysicalType()) {
case PhysicalTypeID::INT64: {
memcpy(bufferToWrite, &val->getValueReference<int64_t>(), numBytesPerValueInList);
} break;
case PhysicalTypeID::INT32: {
memcpy(bufferToWrite, &val->getValueReference<int32_t>(), numBytesPerValueInList);
} break;
case PhysicalTypeID::INT16: {
memcpy(bufferToWrite, &val->getValueReference<int16_t>(), numBytesPerValueInList);
} break;
case PhysicalTypeID::DOUBLE: {
memcpy(bufferToWrite, &val->getValueReference<double_t>(), numBytesPerValueInList);
} break;
case PhysicalTypeID::FLOAT: {
memcpy(bufferToWrite, &val->getValueReference<float_t>(), numBytesPerValueInList);
} break;
default: {
throw NotImplementedException{"FixedListColumnChunk::write"};
}
}
bufferToWrite += numBytesPerValueInList;
}
}

void copyVectorToBuffer(common::ValueVector* vector, common::offset_t startPosInChunk) final {
auto vectorDataToWriteFrom = vector->getData();
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
auto pos = vector->state->selVector->selectedPositions[i];
nullChunk->setNull(startPosInChunk + i, vector->isNull(pos));
memcpy(buffer.get() + getOffsetInBuffer(startPosInChunk + i),
vectorDataToWriteFrom + pos * numBytesPerValue, numBytesPerValue);
}
}

private:
uint64_t getBufferSize() const final {
auto numElementsInAPage =
PageUtils::getNumElementsInAPage(numBytesPerValue, false /* hasNull */);
auto numPages = capacity / numElementsInAPage + (capacity % numElementsInAPage ? 1 : 0);
return common::BufferPoolConstants::PAGE_4KB_SIZE * numPages;
}
};

class SerialColumnChunk : public ColumnChunk {
public:
SerialColumnChunk()
: ColumnChunk{common::LogicalType(common::LogicalTypeID::SERIAL),
nullptr /* copyDescription */, false /*enableCompression*/,
false /* hasNullChunk */} {}

inline void initialize(common::offset_t numValues) final {
numBytesPerValue = 0;
bufferSize = 0;
// Each byte defaults to 0, indicating everything is non-null
buffer = std::make_unique<uint8_t[]>(bufferSize);
}
};

ColumnChunkMetadata fixedSizedFlushBuffer(const uint8_t* buffer, uint64_t bufferSize,
BMFileHandle* dataFH, page_idx_t startPageIdx, const ColumnChunkMetadata& metadata) {
FileUtils::writeToFile(dataFH->getFileInfo(), buffer, bufferSize,
Expand Down Expand Up @@ -365,82 +488,6 @@ void BoolColumnChunk::append(ColumnChunk* other, common::offset_t startPosInOthe
numValues += numValuesToAppend;
}

void NullColumnChunk::append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) {
copyFromBuffer((uint64_t*)static_cast<NullColumnChunk*>(other)->buffer.get(),
startPosInOtherChunk, startPosInChunk, numValuesToAppend);
}

void FixedListColumnChunk::append(ColumnChunk* other, offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) {
auto otherChunk = (FixedListColumnChunk*)other;
if (nullChunk) {
nullChunk->append(
otherChunk->nullChunk.get(), startPosInOtherChunk, startPosInChunk, numValuesToAppend);
}
// TODO(Guodong): This can be optimized to not copy one by one.
for (auto i = 0u; i < numValuesToAppend; i++) {
memcpy(buffer.get() + getOffsetInBuffer(startPosInChunk + i),
otherChunk->buffer.get() + getOffsetInBuffer(startPosInOtherChunk + i),
numBytesPerValue);
}
numValues += numValuesToAppend;
}

void FixedListColumnChunk::write(const Value& fixedListVal, uint64_t posToWrite) {
assert(fixedListVal.getDataType()->getPhysicalType() == PhysicalTypeID::FIXED_LIST);
nullChunk->setNull(posToWrite, fixedListVal.isNull());
if (fixedListVal.isNull()) {
return;
}
auto numValues = NestedVal::getChildrenSize(&fixedListVal);
auto childType = FixedListType::getChildType(fixedListVal.getDataType());
auto numBytesPerValueInList = getDataTypeSizeInChunk(*childType);
auto bufferToWrite = buffer.get() + posToWrite * numBytesPerValue;
for (auto i = 0u; i < numValues; i++) {
auto val = NestedVal::getChildVal(&fixedListVal, i);
switch (childType->getPhysicalType()) {
case PhysicalTypeID::INT64: {
memcpy(bufferToWrite, &val->getValueReference<int64_t>(), numBytesPerValueInList);
} break;
case PhysicalTypeID::INT32: {
memcpy(bufferToWrite, &val->getValueReference<int32_t>(), numBytesPerValueInList);
} break;
case PhysicalTypeID::INT16: {
memcpy(bufferToWrite, &val->getValueReference<int16_t>(), numBytesPerValueInList);
} break;
case PhysicalTypeID::DOUBLE: {
memcpy(bufferToWrite, &val->getValueReference<double_t>(), numBytesPerValueInList);
} break;
case PhysicalTypeID::FLOAT: {
memcpy(bufferToWrite, &val->getValueReference<float_t>(), numBytesPerValueInList);
} break;
default: {
throw NotImplementedException{"FixedListColumnChunk::write"};
}
}
bufferToWrite += numBytesPerValueInList;
}
}

void FixedListColumnChunk::copyVectorToBuffer(
common::ValueVector* vector, common::offset_t startPosInChunk) {
auto vectorDataToWriteFrom = vector->getData();
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
auto pos = vector->state->selVector->selectedPositions[i];
nullChunk->setNull(startPosInChunk + i, vector->isNull(pos));
memcpy(buffer.get() + getOffsetInBuffer(startPosInChunk + i),
vectorDataToWriteFrom + pos * numBytesPerValue, numBytesPerValue);
}
}

uint64_t FixedListColumnChunk::getBufferSize() const {
auto numElementsInAPage =
PageUtils::getNumElementsInAPage(numBytesPerValue, false /* hasNull */);
auto numPages = capacity / numElementsInAPage + (capacity % numElementsInAPage ? 1 : 0);
return common::BufferPoolConstants::PAGE_4KB_SIZE * numPages;
}

std::unique_ptr<ColumnChunk> ColumnChunkFactory::createColumnChunk(
const LogicalType& dataType, bool enableCompression, CSVReaderConfig* csvReaderConfig) {
auto csvReaderConfigCopy = csvReaderConfig ? csvReaderConfig->copy() : nullptr;
Expand Down Expand Up @@ -522,30 +569,6 @@ void ColumnChunk::copyVectorToBuffer(
}
}

inline void BoolColumnChunk::initialize(common::offset_t capacity) {
numBytesPerValue = 0;
bufferSize = numBytesForValues(capacity);
this->capacity = capacity;
buffer = std::make_unique<uint8_t[]>(bufferSize);
if (nullChunk) {
static_cast<BoolColumnChunk*>(nullChunk.get())->initialize(capacity);
}
}

void BoolColumnChunk::resize(uint64_t capacity) {
auto numBytesAfterResize = numBytesForValues(capacity);
assert(numBytesAfterResize > bufferSize);
auto reservedBuffer = std::make_unique<uint8_t[]>(numBytesAfterResize);
memset(reservedBuffer.get(), 0 /* non null */, numBytesAfterResize);
memcpy(reservedBuffer.get(), buffer.get(), bufferSize);
buffer = std::move(reservedBuffer);
bufferSize = numBytesAfterResize;
this->capacity = numValues;
if (nullChunk) {
nullChunk->resize(capacity);
}
}

void ColumnChunk::update(ValueVector* vector, vector_idx_t vectorIdx) {
auto startOffsetInChunk = vectorIdx << DEFAULT_VECTOR_CAPACITY_LOG_2;
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
Expand Down
Loading

0 comments on commit abca5e6

Please sign in to comment.