Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Aug 2, 2023
1 parent 11f3390 commit cbc66db
Show file tree
Hide file tree
Showing 15 changed files with 246 additions and 171 deletions.
2 changes: 1 addition & 1 deletion src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ list_entry_t ListAuxiliaryBuffer::addList(uint64_t listSize) {
return listEntry;
}

void ListAuxiliaryBuffer::reserve(uint64_t numValues) {
void ListAuxiliaryBuffer::resize(uint64_t numValues) {
if (numValues <= capacity) {
size = numValues;
return;
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/vector/auxiliary_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class ListAuxiliaryBuffer : public AuxiliaryBuffer {

inline void resetSize() { size = 0; }

void reserve(uint64_t numValues);
void resize(uint64_t numValues);

private:
void resizeDataVector(ValueVector* dataVector);
Expand Down
4 changes: 2 additions & 2 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ class ListVector {
return reinterpret_cast<ListAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())
->addList(listSize);
}
static inline void reserveDataVector(ValueVector* vector, uint64_t numValues) {
reinterpret_cast<ListAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())->reserve(numValues);
static inline void resizeDataVector(ValueVector* vector, uint64_t numValues) {
reinterpret_cast<ListAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())->resize(numValues);
}

static void copyFromRowData(ValueVector* vector, uint32_t pos, const uint8_t* rowData);
Expand Down
30 changes: 8 additions & 22 deletions src/include/storage/copier/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,9 @@ class ColumnChunk {
common::BufferPoolConstants::PAGE_4KB_SIZE;
}

inline uint64_t getNumElements() const { return numElements; }
virtual void writeVal(const common::Value& val, uint64_t posToWrite);

virtual void appendVal(const common::Value& val);

virtual void reserve(uint64_t numValues);
inline uint64_t getNumBytesPerValue() const { return numBytesPerValue; }

protected:
ColumnChunk(common::LogicalType dataType, common::offset_t numValues,
Expand All @@ -93,21 +91,19 @@ class ColumnChunk {

common::offset_t getOffsetInBuffer(common::offset_t pos) const;

private:
void reserveForBuffer(uint64_t numValues);

public:
std::unique_ptr<uint8_t[]> buffer;

private:
virtual void resize(uint64_t numBytesToResize);

protected:
common::LogicalType dataType;
uint32_t numBytesPerValue;
uint64_t numBytes;
std::unique_ptr<NullColumnChunk> nullChunk;
std::vector<std::unique_ptr<ColumnChunk>> childrenChunks;
const common::CopyDescription* copyDescription;
uint64_t numElements = 0;
uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE;
};

class NullColumnChunk : public ColumnChunk {
Expand All @@ -118,22 +114,12 @@ class NullColumnChunk : public ColumnChunk {
resetNullBuffer();
}

void reserve(uint64_t numValues) override {
while (capacity < numValues) {
capacity *= 2;
}
auto reservedNumBytes = capacity * numBytesPerValue;
auto reservedBuffer = std::make_unique<uint8_t[]>(reservedNumBytes);
memset(reservedBuffer.get(), 0 /* non null */, reservedNumBytes);
memcpy(reservedBuffer.get(), buffer.get(), numBytes);
buffer = std::move(reservedBuffer);
numBytes = reservedNumBytes;
}

inline void resetNullBuffer() { memset(buffer.get(), 0 /* non null */, numBytes); }

inline bool isNull(common::offset_t pos) const { return getValue<bool>(pos); }
inline void setNull(common::offset_t pos, bool isNull) { ((bool*)buffer.get())[pos] = isNull; }

void resize(uint64_t numBytesToResize) final;
};

class FixedListColumnChunk : public ColumnChunk {
Expand All @@ -144,7 +130,7 @@ class FixedListColumnChunk : public ColumnChunk {
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;

void appendVal(const common::Value& listVal) final;
void writeVal(const common::Value& fixedListVal, uint64_t posToWrite) final;
};

struct ColumnChunkFactory {
Expand Down
14 changes: 9 additions & 5 deletions src/include/storage/copier/list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class ListColumnChunk : public ColumnChunk {
public:
ListColumnChunk(LogicalType dataType, CopyDescription* copyDescription);

inline ColumnChunk* getDataColumnChunk() const { return dataColumnChunk.get(); }
inline ColumnChunk* getDataColumnChunk() const { return dataChunk.get(); }

void setValueFromString(const char* value, uint64_t length, uint64_t pos);

Expand All @@ -25,14 +25,18 @@ class ListColumnChunk : public ColumnChunk {
void copyListFromArrowList(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend);

void appendVal(const common::Value& listVal) override;
void writeVal(const common::Value& listVal, uint64_t posToWrite) override;

common::page_idx_t getNumPages() const final {
return ColumnChunk::getNumPages() + dataColumnChunk->getNumPages();
inline common::page_idx_t getNumPages() const final {
return ColumnChunk::getNumPages() + dataChunk->getNumPages();
}

void resizeDataChunk(uint64_t numValues);

private:
std::unique_ptr<ColumnChunk> dataColumnChunk;
std::unique_ptr<ColumnChunk> dataChunk;
uint64_t numValuesInDataChunk;
uint64_t capacityInDataChunk;
};

} // namespace storage
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/copier/struct_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class StructColumnChunk : public ColumnChunk {
common::LogicalType& type, const std::string& structString);
static std::string parseStructFieldName(const std::string& structString, uint64_t& curPos);
std::string parseStructFieldValue(const std::string& structString, uint64_t& curPos);
void appendVal(const common::Value& listVal) final;
void writeVal(const common::Value& val, uint64_t posToWrite) final;
};

} // namespace storage
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/copier/var_sized_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class VarSizedColumnChunk : public ColumnChunk {
void appendVarListColumnChunk(VarSizedColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend);

void appendVal(const common::Value& val) override;
void writeVal(const common::Value& val, uint64_t posToWrite) override;

private:
std::unique_ptr<InMemOverflowFile> overflowFile;
Expand Down
30 changes: 28 additions & 2 deletions src/include/storage/store/list_node_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,42 @@ class ListNodeColumn : public NodeColumn {
ColumnChunk* columnChunk, common::page_idx_t startPageIdx, uint64_t nodeGroupIdx) override;

private:
void scanUnfiltered(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* offsetVector, common::ValueVector* resultVector);

void scanFiltered(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* offsetVector, common::ValueVector* resultVector);

void checkpointInMemory() override;

void rollbackInMemory() override;

void scanWithOffsets(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInGroup,
common::offset_t endOffsetInGroup, common::ValueVector* resultVector,
uint64_t startPosInVector) override;
uint64_t posToWriteListEntry) override;

common::offset_t readOffset(transaction::Transaction* transaction, common::offset_t valuePos);

common::offset_t getOffset(transaction::Transaction* transaction, common::offset_t nodeOffset);
void scanListOffset(transaction::Transaction* transaction, common::offset_t startNodeOffset,
common::node_group_idx_t nodeGroupIdx, common::ValueVector* offsetVector);

inline common::offset_t readListOffsetInStorage(
transaction::Transaction* transaction, common::offset_t nodeOffset) {
return nodeOffset == 0 ? 0 : readOffset(transaction, nodeOffset - 1);
}

inline common::offset_t getListOffsetInStorage(transaction::Transaction* transaction,
common::ValueVector* offsetVector, common::offset_t nodeOffset, uint64_t nodePos) {
return nodePos == 0 ? readListOffsetInStorage(transaction, nodeOffset) :
offsetVector->getValue<common::offset_t>(nodePos - 1);
}

inline uint64_t getListLength(transaction::Transaction* transaction,
common::ValueVector* offsetVector, common::offset_t nodeOffset, uint64_t nodePos) {
return getListOffsetInStorage(transaction, offsetVector, nodeOffset + 1, nodePos + 1) -
getListOffsetInStorage(transaction, offsetVector, nodeOffset, nodePos);
}

private:
std::unique_ptr<NodeColumn> dataNodeColumn;
Expand Down
96 changes: 60 additions & 36 deletions src/storage/copier/column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,6 @@ void ColumnChunk::append(ColumnChunk* other, common::offset_t startPosInOtherChu
numValuesToAppend * numBytesPerValue);
}

void ColumnChunk::reserve(uint64_t numValues) {
if (numValues <= capacity) {
return;
}
while (capacity < numValues) {
capacity *= 2;
}
auto reservedNumBytes = capacity * numBytesPerValue;
auto reservedBuffer = std::make_unique<uint8_t[]>(reservedNumBytes);
memcpy(reservedBuffer.get(), buffer.get(), numBytes);
numBytes = reservedNumBytes;
buffer = std::move(reservedBuffer);
if (nullChunk) {
nullChunk->reserve(numValues);
}
for (auto& child : childrenChunks) {
child->reserve(numValues);
}
}

void ColumnChunk::append(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) {
switch (array->type_id()) {
Expand Down Expand Up @@ -126,35 +106,35 @@ void ColumnChunk::append(
}
}

void ColumnChunk::appendVal(const common::Value& val) {
nullChunk->setNull(numElements, val.isNull());
void ColumnChunk::writeVal(const common::Value& val, uint64_t posToWrite) {
nullChunk->setNull(posToWrite, val.isNull());
switch (dataType.getPhysicalType()) {
case PhysicalTypeID::BOOL: {
setValue(val.getValue<bool>(), numElements++);
setValue(val.getValue<bool>(), posToWrite);
} break;
case PhysicalTypeID::INT64: {
setValue(val.getValue<int64_t>(), numElements++);
setValue(val.getValue<int64_t>(), posToWrite);
} break;
case PhysicalTypeID::INT32: {
setValue(val.getValue<int32_t>(), numElements++);
setValue(val.getValue<int32_t>(), posToWrite);
} break;
case PhysicalTypeID::INT16: {
setValue(val.getValue<int16_t>(), numElements++);
setValue(val.getValue<int16_t>(), posToWrite);
} break;
case PhysicalTypeID::DOUBLE: {
setValue(val.getValue<double_t>(), numElements++);
setValue(val.getValue<double_t>(), posToWrite);
} break;
case PhysicalTypeID::FLOAT: {
setValue(val.getValue<float_t>(), numElements++);
setValue(val.getValue<float_t>(), posToWrite);
} break;
case PhysicalTypeID::INTERVAL: {
setValue(val.getValue<interval_t>(), numElements++);
setValue(val.getValue<interval_t>(), posToWrite);
} break;

Check warning on line 132 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L132

Added line #L132 was not covered by tests
case PhysicalTypeID::INTERNAL_ID: {
setValue(val.getValue<internalID_t>(), numElements++);
setValue(val.getValue<internalID_t>(), posToWrite);
} break;
default: {
throw NotImplementedException{"ColumnChunk::appendVal"};
throw NotImplementedException{"ColumnChunk::writeVal"};

Check warning on line 137 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L135-L137

Added lines #L135 - L137 were not covered by tests
}
}
}
Expand Down Expand Up @@ -307,11 +287,34 @@ void FixedListColumnChunk::append(ColumnChunk* other, common::offset_t startPosI
}
}

void FixedListColumnChunk::appendVal(const common::Value& listVal) {
assert(listVal.getDataType()->getPhysicalType() == PhysicalTypeID::FIXED_LIST);
auto numElements = NestedVal::getChildrenSize(&listVal);
for (auto i = 0u; i < numElements; i++) {
throw NotImplementedException{"here"};
void FixedListColumnChunk::writeVal(const common::Value& fixedListVal, uint64_t posToWrite) {

Check warning on line 290 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L290

Added line #L290 was not covered by tests
assert(fixedListVal.getDataType()->getPhysicalType() == PhysicalTypeID::FIXED_LIST);
auto numValues = NestedVal::getChildrenSize(&fixedListVal);
auto numBytesPerValueInList = getDataTypeSizeInChunk(*fixedListVal.getDataType());
auto bufferToWrite = buffer.get() + posToWrite * numBytesPerValue;
for (auto i = 0u; i < numValues; i++) {
auto val = common::NestedVal::getChildVal(&fixedListVal, i);
switch (fixedListVal.getDataType()->getPhysicalType()) {
case PhysicalTypeID::INT64: {
memcpy(bufferToWrite, &val->getValueReference<int64_t>(), numBytesPerValueInList);

Check warning on line 299 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L292-L299

Added lines #L292 - L299 were not covered by tests
} break;
case PhysicalTypeID::INT32: {
memcpy(bufferToWrite, &val->getValueReference<int32_t>(), numBytesPerValueInList);

Check warning on line 302 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L301-L302

Added lines #L301 - L302 were not covered by tests
} break;
case PhysicalTypeID::INT16: {
memcpy(bufferToWrite, &val->getValueReference<int16_t>(), numBytesPerValueInList);

Check warning on line 305 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L304-L305

Added lines #L304 - L305 were not covered by tests
} break;
case PhysicalTypeID::DOUBLE: {
memcpy(bufferToWrite, &val->getValueReference<double_t>(), numBytesPerValueInList);

Check warning on line 308 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L307-L308

Added lines #L307 - L308 were not covered by tests
} break;
case PhysicalTypeID::FLOAT: {
memcpy(bufferToWrite, &val->getValueReference<float_t>(), numBytesPerValueInList);

Check warning on line 311 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L310-L311

Added lines #L310 - L311 were not covered by tests
} break;
default: {
throw NotImplementedException{"FixedListColumnChunk::writeVal"};

Check warning on line 314 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L313-L314

Added lines #L313 - L314 were not covered by tests
}
}
bufferToWrite += numBytesPerValueInList;

Check warning on line 317 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L317

Added line #L317 was not covered by tests
}
}

Check warning on line 319 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L319

Added line #L319 was not covered by tests

Expand Down Expand Up @@ -394,5 +397,26 @@ common::offset_t ColumnChunk::getOffsetInBuffer(common::offset_t pos) const {
return offsetInBuffer;
}

void ColumnChunk::resize(uint64_t numBytesToResize) {

Check warning on line 400 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L400

Added line #L400 was not covered by tests
auto reservedBuffer = std::make_unique<uint8_t[]>(numBytesToResize);
memcpy(reservedBuffer.get(), buffer.get(), numBytes);
numBytes = numBytesToResize;

Check warning on line 403 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L402-L403

Added lines #L402 - L403 were not covered by tests
buffer = std::move(reservedBuffer);
if (nullChunk) {
nullChunk->resize(numBytesToResize);

Check warning on line 406 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L405-L406

Added lines #L405 - L406 were not covered by tests
}
for (auto& child : childrenChunks) {
child->resize(numBytesToResize);

Check warning on line 409 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L408-L409

Added lines #L408 - L409 were not covered by tests
}
}

Check warning on line 411 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L411

Added line #L411 was not covered by tests

void NullColumnChunk::resize(uint64_t numBytesToResize) {

Check warning on line 413 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L413

Added line #L413 was not covered by tests
auto reservedBuffer = std::make_unique<uint8_t[]>(numBytesToResize);
memset(reservedBuffer.get(), 0 /* non null */, numBytesToResize);
memcpy(reservedBuffer.get(), buffer.get(), numBytes);

Check warning on line 416 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L416

Added line #L416 was not covered by tests
buffer = std::move(reservedBuffer);
numBytes = numBytesToResize;
}

Check warning on line 419 in src/storage/copier/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/copier/column_chunk.cpp#L418-L419

Added lines #L418 - L419 were not covered by tests

} // namespace storage
} // namespace kuzu
Loading

0 comments on commit cbc66db

Please sign in to comment.