Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NPY loader fix #1571

Merged
merged 1 commit into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/include/storage/in_mem_storage_structure/in_mem_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ class InMemColumn {
return std::make_unique<InMemColumnChunkWithOverflow>(
dataType, startNodeOffset, endNodeOffset, copyDescription, inMemOverflowFile.get());
}
case common::LogicalTypeID::FIXED_LIST: {
return std::make_unique<InMemFixedListColumnChunk>(
dataType, startNodeOffset, endNodeOffset, copyDescription);
}
case common::LogicalTypeID::STRUCT: {
auto inMemStructColumnChunk = std::make_unique<InMemStructColumnChunk>(
dataType, startNodeOffset, endNodeOffset, copyDescription);
Expand Down
22 changes: 20 additions & 2 deletions src/include/storage/in_mem_storage_structure/in_mem_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class InMemColumnChunk {
return ((T*)buffer.get())[pos];
}

void setValue(uint8_t* val, common::offset_t pos);
void setValueAtPos(uint8_t* val, common::offset_t pos);

inline bool isNull(common::offset_t pos) const {
assert(nullChunk);
Expand All @@ -45,7 +45,7 @@ class InMemColumnChunk {
inline uint64_t getNumBytes() const { return numBytes; }
inline InMemColumnChunk* getNullChunk() { return nullChunk.get(); }
virtual void copyArrowArray(arrow::Array& arrowArray);
void flush(common::FileInfo* walFileInfo);
virtual void flush(common::FileInfo* walFileInfo);

template<typename T>
void templateCopyValuesToPage(arrow::Array& array);
Expand All @@ -65,6 +65,10 @@ class InMemColumnChunk {
}

private:
// Translates a value position inside this chunk into a byte offset into `buffer`.
// The default layout is dense: value `pos` starts `pos * numBytesPerValue` bytes in.
// Declared virtual so a derived chunk can supply a different layout.
inline virtual common::offset_t getOffsetInBuffer(common::offset_t pos) {
    const auto byteOffset = pos * numBytesPerValue;
    return byteOffset;
}

static uint32_t getDataTypeSizeInColumn(common::LogicalType& dataType);

protected:
Expand Down Expand Up @@ -132,6 +136,20 @@ class InMemStructColumnChunk : public InMemColumnChunk {
std::vector<std::unique_ptr<InMemColumnChunk>> fieldChunks;
};

class InMemFixedListColumnChunk : public InMemColumnChunk {
public:
InMemFixedListColumnChunk(common::LogicalType dataType, common::offset_t startNodeOffset,
common::offset_t endNodeOffset, const common::CopyDescription* copyDescription);

void flush(common::FileInfo* walFileInfo) override;

private:
common::offset_t getOffsetInBuffer(common::offset_t pos) override;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this function need to be public?


private:
uint64_t numElementsInAPage;
};

template<>
void InMemColumnChunk::templateCopyValuesToPage<bool>(arrow::Array& array);
template<>
Expand Down
9 changes: 8 additions & 1 deletion src/include/storage/storage_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,18 @@ struct PageUtils {
// This function returns the page pageIdx of the page where element will be found and the pos of
// the element in the page as the offset.
static inline PageElementCursor getPageElementCursorForPos(
const uint64_t& elementPos, const uint32_t numElementsPerPage) {
uint64_t elementPos, uint32_t numElementsPerPage) {
assert((elementPos / numElementsPerPage) < UINT32_MAX);
return PageElementCursor{(common::page_idx_t)(elementPos / numElementsPerPage),
(uint16_t)(elementPos % numElementsPerPage)};
}

// Returns a byte-granular cursor for the element at `elementPos`: the index of the page that
// holds the element, and the byte offset of the element's first byte within that page.
// Each page is treated as `numElementsPerPage` consecutive slots of `elementSize` bytes.
//
// Preconditions (checked by assert):
//  - `numElementsPerPage` is non-zero;
//  - the resulting page index is below UINT32_MAX.
// NOTE(review): the in-page byte offset is narrowed to uint16_t; this assumes
// `numElementsPerPage * elementSize` never exceeds 65535 — confirm against the page size.
static inline PageByteCursor getPageByteCursorForPos(
    uint64_t elementPos, uint32_t numElementsPerPage, uint64_t elementSize) {
    // Guard first: every expression below divides (or takes a modulo) by numElementsPerPage,
    // which is undefined behaviour when it is zero.
    assert(numElementsPerPage > 0);
    assert((elementPos / numElementsPerPage) < UINT32_MAX);
    return PageByteCursor{(common::page_idx_t)(elementPos / numElementsPerPage),
        (uint16_t)(elementPos % numElementsPerPage * elementSize)};
}
};

class StorageUtils {
Expand Down
6 changes: 3 additions & 3 deletions src/storage/copier/node_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,10 @@ void NPYNodeCopier::executeInternal(std::unique_ptr<NodeCopyMorsel> morsel) {
auto endNodeOffset = morsel->nodeOffset + morsel->numNodes - 1;
// For NPY files, we can only read one column at a time.
std::vector<std::unique_ptr<InMemColumnChunk>> columnChunks(1);
columnChunks[0] = std::make_unique<InMemColumnChunk>(
properties[columnToCopy].dataType, morsel->nodeOffset, endNodeOffset, &copyDesc);
columnChunks[0] =
columns[columnToCopy]->getInMemColumnChunk(morsel->nodeOffset, endNodeOffset, &copyDesc);
for (auto i = 0u; i < morsel->numNodes; i++) {
columnChunks[0]->setValue(reader->getPointerToRow(morsel->nodeOffset + i), i);
columnChunks[0]->setValueAtPos(reader->getPointerToRow(morsel->nodeOffset + i), i);
}
flushChunksAndPopulatePKIndex(columnChunks, morsel->nodeOffset, endNodeOffset, columnToCopy);
}
Expand Down
4 changes: 2 additions & 2 deletions src/storage/copier/rel_copy_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ void RelCopyExecutor::populateAdjColumnsAndCountRelsInAdjListsTask(uint64_t bloc
copier->catalog.getReadOnlyVersion()->getTableName(tableID),
RelDataDirectionUtils::relDataDirectionToString(relDirection)));
}
copier->adjColumnChunksPerDirection[relDirection]->setValue(
copier->adjColumnChunksPerDirection[relDirection]->setValueAtPos(
(uint8_t*)&nodeIDs[!relDirection].offset, nodeOffset);
} else {
InMemListsUtils::incrementListSize(
Expand Down Expand Up @@ -769,7 +769,7 @@ void RelCopyExecutor::putValueIntoColumns(uint64_t propertyID,
auto propertyColumnChunk =
directionTablePropertyColumnChunks[relDirection][propertyID].get();
auto nodeOffset = nodeIDs[relDirection].offset;
propertyColumnChunk->setValue(val, nodeOffset);
propertyColumnChunk->setValueAtPos(val, nodeOffset);
}
}

Expand Down
42 changes: 40 additions & 2 deletions src/storage/in_mem_storage_structure/in_mem_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ InMemColumnChunk::InMemColumnChunk(LogicalType dataType, offset_t startNodeOffse
}
}

void InMemColumnChunk::setValue(uint8_t* val, offset_t pos) {
memcpy(buffer.get() + (pos * numBytesPerValue), val, numBytesPerValue);
void InMemColumnChunk::setValueAtPos(uint8_t* val, common::offset_t pos) {
memcpy(buffer.get() + getOffsetInBuffer(pos), val, numBytesPerValue);
nullChunk->setValue(false, pos);
}

Expand Down Expand Up @@ -392,6 +392,44 @@ std::string InMemStructColumnChunk::parseStructFieldValue(
}
}

// Builds a chunk for a fixed-list column spanning startNodeOffset..endNodeOffset.
// The buffer size is taken from page byte cursors, not from a dense `count * size` product:
// it is the byte distance from the first value's position to the last value's position
// (a whole page per page boundary crossed), plus room for the last value itself.
InMemFixedListColumnChunk::InMemFixedListColumnChunk(common::LogicalType dataType,
    common::offset_t startNodeOffset, common::offset_t endNodeOffset,
    const common::CopyDescription* copyDescription)
    : InMemColumnChunk{std::move(dataType), startNodeOffset, endNodeOffset, copyDescription} {
    numElementsInAPage = PageUtils::getNumElementsInAPage(numBytesPerValue, false /* hasNull */);

    const auto firstValueCursor =
        PageUtils::getPageByteCursorForPos(startNodeOffset, numElementsInAPage, numBytesPerValue);
    const auto lastValueCursor =
        PageUtils::getPageByteCursorForPos(endNodeOffset, numElementsInAPage, numBytesPerValue);

    // Bytes contributed by the page boundaries between the first and the last value.
    const auto pageSpanBytes = (lastValueCursor.pageIdx - firstValueCursor.pageIdx) *
                               common::BufferPoolConstants::PAGE_4KB_SIZE;
    numBytes = pageSpanBytes + lastValueCursor.offsetInPage - firstValueCursor.offsetInPage +
               numBytesPerValue;

    buffer = std::make_unique<uint8_t[]>(numBytes);
}

// Writes the whole in-memory buffer to `walFileInfo` in a single call.
// The destination file offset is the byte position of the chunk's first value, derived from
// its page byte cursor (page index * page size + offset inside that page).
// Nothing is written when the chunk holds no bytes.
void InMemFixedListColumnChunk::flush(common::FileInfo* walFileInfo) {
    if (!(numBytes > 0)) {
        return;
    }
    const auto chunkStartCursor =
        PageUtils::getPageByteCursorForPos(startNodeOffset, numElementsInAPage, numBytesPerValue);
    const auto fileOffset =
        chunkStartCursor.pageIdx * common::BufferPoolConstants::PAGE_4KB_SIZE +
        chunkStartCursor.offsetInPage;
    common::FileUtils::writeToFile(walFileInfo, buffer.get(), numBytes, fileOffset);
}

// Maps a chunk-relative position `pos` to a byte offset into `buffer`.
// Both the target value (pos + startNodeOffset) and the chunk's first value are converted to
// page byte cursors; the result is the byte distance between them, counting a full page for
// every page boundary in between.
common::offset_t InMemFixedListColumnChunk::getOffsetInBuffer(common::offset_t pos) {
    const auto targetCursor = PageUtils::getPageByteCursorForPos(
        pos + startNodeOffset, numElementsInAPage, numBytesPerValue);
    const auto chunkStartCursor =
        PageUtils::getPageByteCursorForPos(startNodeOffset, numElementsInAPage, numBytesPerValue);

    const auto pageSpanBytes = (targetCursor.pageIdx - chunkStartCursor.pageIdx) *
                               common::BufferPoolConstants::PAGE_4KB_SIZE;
    const auto offsetInBuffer =
        pageSpanBytes + targetCursor.offsetInPage - chunkStartCursor.offsetInPage;

    // The value written at this offset must lie entirely inside the allocated buffer.
    assert(offsetInBuffer + numBytesPerValue <= numBytes);
    return offsetInBuffer;
}

// Bool
template<>
void InMemColumnChunk::setValueFromString<bool>(const char* value, uint64_t length, uint64_t pos) {
Expand Down
61 changes: 30 additions & 31 deletions test/copy/copy_npy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,37 +222,36 @@ TEST_F(CopyThreeDimensionalNpyTest, CopyThreeDimensionalNpyIntoTwoDimensionaTest
ASSERT_EQ(listVal[11]->val.int16Val, 24);
}

// TEST_F(CopyLargeNpyTest, CopyLargeNpyTest) {
// auto storageManager = getStorageManager(*database);
// auto catalog = getCatalog(*database);
// auto tableID = catalog->getReadOnlyVersion()->getTableID("npytable");
// auto property = catalog->getReadOnlyVersion()->getNodeProperty(tableID, "id");
// auto col = storageManager->getNodesStore().getNodePropertyColumn(tableID,
// property.propertyID); for (size_t i = 0; i < 20000; ++i) {
// ASSERT_EQ(col->readValueForTestingOnly(i).val.int64Val, i);
// }
// property = catalog->getReadOnlyVersion()->getNodeProperty(tableID, "f32");
// col = storageManager->getNodesStore().getNodePropertyColumn(tableID, property.propertyID);
// for (auto rowIdx = 0u; rowIdx < 20000; rowIdx++) {
// auto row = col->readValueForTestingOnly(rowIdx);
// for (auto colIdx = 0u; colIdx < 10; colIdx++) {
// if (row.nestedTypeVal[colIdx]->val.floatVal != (float)(rowIdx * 10 + colIdx)) {
// std::cout << "rowIdx: " << rowIdx << " colIdx: " << colIdx << std::endl;
// }
// ASSERT_EQ(row.nestedTypeVal[colIdx]->val.floatVal, (float)(rowIdx * 10 + colIdx));
// }
// }
// for (auto i = 0u; i < 200000; ++i) {
// size_t rowIdx = i / 10;
// size_t colIdx = i % 10;
// if (col->readValueForTestingOnly(rowIdx).nestedTypeVal[colIdx]->val.floatVal != (float)i)
// {
// std::cout << "rowIdx: " << rowIdx << " colIdx: " << colIdx << std::endl;
// }
// ASSERT_EQ(
// col->readValueForTestingOnly(rowIdx).nestedTypeVal[colIdx]->val.floatVal, (float)i);
// }
//}
// Checks the contents of the "npytable" node table after the large NPY copy:
//  - "id"  : row r stores the int64 value r;
//  - "f32" : row r stores 10 floats, element c being r * 10 + c.
// The f32 column is verified twice: row by row, then through a flattened index.
TEST_F(CopyLargeNpyTest, CopyLargeNpyTest) {
    constexpr size_t numRows = 20000;
    constexpr unsigned numFloatsPerRow = 10;

    auto storageManager = getStorageManager(*database);
    auto catalog = getCatalog(*database);
    auto tableID = catalog->getReadOnlyVersion()->getTableID("npytable");

    // Scalar primary-key column.
    auto idProperty = catalog->getReadOnlyVersion()->getNodeProperty(tableID, "id");
    auto idColumn =
        storageManager->getNodesStore().getNodePropertyColumn(tableID, idProperty.propertyID);
    for (size_t row = 0; row < numRows; ++row) {
        ASSERT_EQ(idColumn->readValueForTestingOnly(row).val.int64Val, row);
    }

    // Fixed-list column, checked one row at a time.
    auto f32Property = catalog->getReadOnlyVersion()->getNodeProperty(tableID, "f32");
    auto f32Column =
        storageManager->getNodesStore().getNodePropertyColumn(tableID, f32Property.propertyID);
    for (auto rowIdx = 0u; rowIdx < numRows; rowIdx++) {
        auto listValue = f32Column->readValueForTestingOnly(rowIdx);
        for (auto colIdx = 0u; colIdx < numFloatsPerRow; colIdx++) {
            const auto expected = static_cast<float>(rowIdx * numFloatsPerRow + colIdx);
            // Print the failing coordinates before the assertion aborts the test.
            if (listValue.nestedTypeVal[colIdx]->val.floatVal != expected) {
                std::cout << "rowIdx: " << rowIdx << " colIdx: " << colIdx << std::endl;
            }
            ASSERT_EQ(listValue.nestedTypeVal[colIdx]->val.floatVal, expected);
        }
    }

    // Same column again, addressed through a flattened element index; each access re-reads
    // the row from the column.
    auto readFloatAt = [&](size_t rowIdx, size_t colIdx) {
        return f32Column->readValueForTestingOnly(rowIdx).nestedTypeVal[colIdx]->val.floatVal;
    };
    for (size_t flatIdx = 0; flatIdx < numRows * numFloatsPerRow; ++flatIdx) {
        size_t rowIdx = flatIdx / numFloatsPerRow;
        size_t colIdx = flatIdx % numFloatsPerRow;
        if (readFloatAt(rowIdx, colIdx) != static_cast<float>(flatIdx)) {
            std::cout << "rowIdx: " << rowIdx << " colIdx: " << colIdx << std::endl;
        }
        ASSERT_EQ(readFloatAt(rowIdx, colIdx), static_cast<float>(flatIdx));
    }
}

TEST_F(CopyNpyFaultTest, CopyNpyInsufficientNumberOfProperties) {
conn->query("create node table npytable (id INT64,i64 INT64[12],PRIMARY KEY(id));");
Expand Down
Loading