Skip to content

Commit

Permalink
Fix npy loader
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed May 29, 2023
1 parent dba7110 commit ca14982
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 41 deletions.
4 changes: 4 additions & 0 deletions src/include/storage/in_mem_storage_structure/in_mem_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ class InMemColumn {
return std::make_unique<InMemColumnChunkWithOverflow>(
dataType, startNodeOffset, endNodeOffset, copyDescription, inMemOverflowFile.get());
}
case common::LogicalTypeID::FIXED_LIST: {
return std::make_unique<InMemFixedListColumnChunk>(
dataType, startNodeOffset, endNodeOffset, copyDescription);
}
case common::LogicalTypeID::STRUCT: {
auto inMemStructColumnChunk = std::make_unique<InMemStructColumnChunk>(
dataType, startNodeOffset, endNodeOffset, copyDescription);
Expand Down
22 changes: 20 additions & 2 deletions src/include/storage/in_mem_storage_structure/in_mem_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class InMemColumnChunk {
return ((T*)buffer.get())[pos];
}

void setValue(uint8_t* val, common::offset_t pos);
void setValueAtPos(uint8_t* val, common::offset_t pos);

inline bool isNull(common::offset_t pos) const {
assert(nullChunk);
Expand All @@ -45,7 +45,7 @@ class InMemColumnChunk {
inline uint64_t getNumBytes() const { return numBytes; }
inline InMemColumnChunk* getNullChunk() { return nullChunk.get(); }
virtual void copyArrowArray(arrow::Array& arrowArray);
void flush(common::FileInfo* walFileInfo);
virtual void flush(common::FileInfo* walFileInfo);

template<typename T>
void templateCopyValuesToPage(arrow::Array& array);
Expand All @@ -65,6 +65,10 @@ class InMemColumnChunk {
}

private:
// Maps a value position within this chunk to a byte offset: values are laid out back to back,
// each occupying numBytesPerValue bytes. Declared virtual so a chunk with a different layout
// can supply its own mapping.
inline virtual common::offset_t getOffsetInBuffer(common::offset_t pos) {
    const auto byteOffset = pos * numBytesPerValue;
    return byteOffset;
}

static uint32_t getDataTypeSizeInColumn(common::LogicalType& dataType);

protected:
Expand Down Expand Up @@ -132,6 +136,20 @@ class InMemStructColumnChunk : public InMemColumnChunk {
std::vector<std::unique_ptr<InMemColumnChunk>> fieldChunks;
};

// Column chunk specialization that replaces the base class's flush logic and its
// position-to-buffer-offset mapping.
class InMemFixedListColumnChunk : public InMemColumnChunk {
public:
    // Creates a chunk covering the node offsets [startNodeOffset, endNodeOffset].
    InMemFixedListColumnChunk(common::LogicalType dataType, common::offset_t startNodeOffset,
        common::offset_t endNodeOffset, const common::CopyDescription* copyDescription);

    // Writes the chunk's bytes through the given file handle.
    void flush(common::FileInfo* walFileInfo) override;

private:
    // Byte offset inside the chunk's buffer of the value stored at position `pos`.
    common::offset_t getOffsetInBuffer(common::offset_t pos) override;

    // Number of values that fit in one page; presumably fixed at construction time
    // (NOTE(review): confirm against the constructor definition).
    uint64_t numElementsInAPage;
};

template<>
void InMemColumnChunk::templateCopyValuesToPage<bool>(arrow::Array& array);
template<>
Expand Down
9 changes: 8 additions & 1 deletion src/include/storage/storage_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,18 @@ struct PageUtils {
// This function returns the page pageIdx of the page where element will be found and the pos of
// the element in the page as the offset.
static inline PageElementCursor getPageElementCursorForPos(
const uint64_t& elementPos, const uint32_t numElementsPerPage) {
uint64_t elementPos, uint32_t numElementsPerPage) {
assert((elementPos / numElementsPerPage) < UINT32_MAX);
return PageElementCursor{(common::page_idx_t)(elementPos / numElementsPerPage),
(uint16_t)(elementPos % numElementsPerPage)};
}

// Returns the index of the page that holds element `elementPos`, together with the byte offset
// of that element inside the page (its index within the page multiplied by `elementSize`).
// `numElementsPerPage` is used as a divisor, so callers are expected to pass a non-zero value.
static inline PageByteCursor getPageByteCursorForPos(
    uint64_t elementPos, uint32_t numElementsPerPage, uint64_t elementSize) {
    assert((elementPos / numElementsPerPage) < UINT32_MAX);
    const auto pageIdx = static_cast<common::page_idx_t>(elementPos / numElementsPerPage);
    const auto byteOffsetInPage =
        static_cast<uint16_t>((elementPos % numElementsPerPage) * elementSize);
    return PageByteCursor{pageIdx, byteOffsetInPage};
}
};

class StorageUtils {
Expand Down
6 changes: 3 additions & 3 deletions src/storage/copier/node_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,10 @@ void NPYNodeCopier::executeInternal(std::unique_ptr<NodeCopyMorsel> morsel) {
auto endNodeOffset = morsel->nodeOffset + morsel->numNodes - 1;
// For NPY files, we can only read one column at a time.
std::vector<std::unique_ptr<InMemColumnChunk>> columnChunks(1);
columnChunks[0] = std::make_unique<InMemColumnChunk>(
properties[columnToCopy].dataType, morsel->nodeOffset, endNodeOffset, &copyDesc);
columnChunks[0] =
columns[columnToCopy]->getInMemColumnChunk(morsel->nodeOffset, endNodeOffset, &copyDesc);
for (auto i = 0u; i < morsel->numNodes; i++) {
columnChunks[0]->setValue(reader->getPointerToRow(morsel->nodeOffset + i), i);
columnChunks[0]->setValueAtPos(reader->getPointerToRow(morsel->nodeOffset + i), i);
}
flushChunksAndPopulatePKIndex(columnChunks, morsel->nodeOffset, endNodeOffset, columnToCopy);
}
Expand Down
4 changes: 2 additions & 2 deletions src/storage/copier/rel_copy_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ void RelCopyExecutor::populateAdjColumnsAndCountRelsInAdjListsTask(uint64_t bloc
copier->catalog.getReadOnlyVersion()->getTableName(tableID),
RelDataDirectionUtils::relDataDirectionToString(relDirection)));
}
copier->adjColumnChunksPerDirection[relDirection]->setValue(
copier->adjColumnChunksPerDirection[relDirection]->setValueAtPos(
(uint8_t*)&nodeIDs[!relDirection].offset, nodeOffset);
} else {
InMemListsUtils::incrementListSize(
Expand Down Expand Up @@ -769,7 +769,7 @@ void RelCopyExecutor::putValueIntoColumns(uint64_t propertyID,
auto propertyColumnChunk =
directionTablePropertyColumnChunks[relDirection][propertyID].get();
auto nodeOffset = nodeIDs[relDirection].offset;
propertyColumnChunk->setValue(val, nodeOffset);
propertyColumnChunk->setValueAtPos(val, nodeOffset);
}
}

Expand Down
42 changes: 40 additions & 2 deletions src/storage/in_mem_storage_structure/in_mem_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ InMemColumnChunk::InMemColumnChunk(LogicalType dataType, offset_t startNodeOffse
}
}

void InMemColumnChunk::setValue(uint8_t* val, offset_t pos) {
memcpy(buffer.get() + (pos * numBytesPerValue), val, numBytesPerValue);
// Copies one value (numBytesPerValue bytes read from `val`) into the buffer slot that
// getOffsetInBuffer() assigns to `pos`, then records `false` for `pos` in the null chunk.
void InMemColumnChunk::setValueAtPos(uint8_t* val, common::offset_t pos) {
    uint8_t* const destination = buffer.get() + getOffsetInBuffer(pos);
    memcpy(destination, val, numBytesPerValue);
    nullChunk->setValue(false, pos);
}

Expand Down Expand Up @@ -392,6 +392,44 @@ std::string InMemStructColumnChunk::parseStructFieldValue(
}
}

// Builds a chunk for the node offsets [startNodeOffset, endNodeOffset] and re-allocates the
// buffer. The size is computed from page cursors: whole 4KB pages between the first and last
// value, plus the difference of their in-page byte offsets, plus one value for the last slot.
InMemFixedListColumnChunk::InMemFixedListColumnChunk(common::LogicalType dataType,
    common::offset_t startNodeOffset, common::offset_t endNodeOffset,
    const common::CopyDescription* copyDescription)
    : InMemColumnChunk{std::move(dataType), startNodeOffset, endNodeOffset, copyDescription} {
    numElementsInAPage = PageUtils::getNumElementsInAPage(numBytesPerValue, false /* hasNull */);
    // Page/byte cursor of a given node offset under this chunk's per-page value count.
    const auto cursorFor = [&](common::offset_t nodeOffset) {
        return PageUtils::getPageByteCursorForPos(
            nodeOffset, numElementsInAPage, numBytesPerValue);
    };
    const auto firstCursor = cursorFor(startNodeOffset);
    const auto lastCursor = cursorFor(endNodeOffset);
    const auto bytesOfPagesBetween =
        (lastCursor.pageIdx - firstCursor.pageIdx) * common::BufferPoolConstants::PAGE_4KB_SIZE;
    numBytes = bytesOfPagesBetween + lastCursor.offsetInPage - firstCursor.offsetInPage +
               numBytesPerValue;
    buffer = std::make_unique<uint8_t[]>(numBytes);
}

// Writes the whole buffer to `walFileInfo`, starting at the file offset that corresponds to
// the page and in-page byte position of startNodeOffset. Does nothing for an empty chunk.
void InMemFixedListColumnChunk::flush(common::FileInfo* walFileInfo) {
    if (numBytes == 0) {
        return;
    }
    const auto firstValueCursor =
        PageUtils::getPageByteCursorForPos(startNodeOffset, numElementsInAPage, numBytesPerValue);
    const auto fileOffsetOfFirstValue =
        firstValueCursor.pageIdx * common::BufferPoolConstants::PAGE_4KB_SIZE +
        firstValueCursor.offsetInPage;
    common::FileUtils::writeToFile(walFileInfo, buffer.get(), numBytes, fileOffsetOfFirstValue);
}

// Byte offset inside the buffer of the value at chunk-relative position `pos`: the distance,
// in page-cursor terms, between the value's absolute position (pos + startNodeOffset) and the
// position of the chunk's first value.
common::offset_t InMemFixedListColumnChunk::getOffsetInBuffer(common::offset_t pos) {
    const auto chunkStartCursor =
        PageUtils::getPageByteCursorForPos(startNodeOffset, numElementsInAPage, numBytesPerValue);
    const auto valueCursor = PageUtils::getPageByteCursorForPos(
        pos + startNodeOffset, numElementsInAPage, numBytesPerValue);
    const auto pagesApart = valueCursor.pageIdx - chunkStartCursor.pageIdx;
    auto offsetInBuffer = pagesApart * common::BufferPoolConstants::PAGE_4KB_SIZE +
                          valueCursor.offsetInPage - chunkStartCursor.offsetInPage;
    // The value must lie completely inside the allocated buffer.
    assert(offsetInBuffer + numBytesPerValue <= numBytes);
    return offsetInBuffer;
}

// Bool
template<>
void InMemColumnChunk::setValueFromString<bool>(const char* value, uint64_t length, uint64_t pos) {
Expand Down
61 changes: 30 additions & 31 deletions test/copy/copy_npy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,37 +222,36 @@ TEST_F(CopyThreeDimensionalNpyTest, CopyThreeDimensionalNpyIntoTwoDimensionaTest
ASSERT_EQ(listVal[11]->val.int16Val, 24);
}

// TEST_F(CopyLargeNpyTest, CopyLargeNpyTest) {
// auto storageManager = getStorageManager(*database);
// auto catalog = getCatalog(*database);
// auto tableID = catalog->getReadOnlyVersion()->getTableID("npytable");
// auto property = catalog->getReadOnlyVersion()->getNodeProperty(tableID, "id");
// auto col = storageManager->getNodesStore().getNodePropertyColumn(tableID,
// property.propertyID); for (size_t i = 0; i < 20000; ++i) {
// ASSERT_EQ(col->readValueForTestingOnly(i).val.int64Val, i);
// }
// property = catalog->getReadOnlyVersion()->getNodeProperty(tableID, "f32");
// col = storageManager->getNodesStore().getNodePropertyColumn(tableID, property.propertyID);
// for (auto rowIdx = 0u; rowIdx < 20000; rowIdx++) {
// auto row = col->readValueForTestingOnly(rowIdx);
// for (auto colIdx = 0u; colIdx < 10; colIdx++) {
// if (row.nestedTypeVal[colIdx]->val.floatVal != (float)(rowIdx * 10 + colIdx)) {
// std::cout << "rowIdx: " << rowIdx << " colIdx: " << colIdx << std::endl;
// }
// ASSERT_EQ(row.nestedTypeVal[colIdx]->val.floatVal, (float)(rowIdx * 10 + colIdx));
// }
// }
// for (size_t i = 0; i < 200000; ++i) {
// size_t rowIdx = i / 10;
// size_t colIdx = i % 10;
// if (col->readValueForTestingOnly(rowIdx).nestedTypeVal[colIdx]->val.floatVal != (float)i)
// {
// std::cout << "rowIdx: " << rowIdx << " colIdx: " << colIdx << std::endl;
// }
// ASSERT_EQ(
// col->readValueForTestingOnly(rowIdx).nestedTypeVal[colIdx]->val.floatVal, (float)i);
// }
//}
// Checks the contents of table "npytable" after the fixture has loaded it:
//  - column "id" holds the row index for each of the 20000 rows;
//  - column "f32" holds 10 floats per row, where element colIdx of row rowIdx equals
//    rowIdx * 10 + colIdx.
// Failure context is attached through the assertion's message stream, so it is printed only
// when a check fails and appears next to the failing assertion.
TEST_F(CopyLargeNpyTest, CopyLargeNpyTest) {
    auto storageManager = getStorageManager(*database);
    auto catalog = getCatalog(*database);
    auto tableID = catalog->getReadOnlyVersion()->getTableID("npytable");

    auto property = catalog->getReadOnlyVersion()->getNodeProperty(tableID, "id");
    auto col = storageManager->getNodesStore().getNodePropertyColumn(tableID, property.propertyID);
    for (size_t i = 0; i < 20000; ++i) {
        // Cast the expected value so both operands of the comparison are signed.
        ASSERT_EQ(col->readValueForTestingOnly(i).val.int64Val, static_cast<int64_t>(i))
            << "rowIdx: " << i;
    }

    property = catalog->getReadOnlyVersion()->getNodeProperty(tableID, "f32");
    col = storageManager->getNodesStore().getNodePropertyColumn(tableID, property.propertyID);
    // Row-major walk: read each row once and verify all of its elements.
    for (auto rowIdx = 0u; rowIdx < 20000; rowIdx++) {
        auto row = col->readValueForTestingOnly(rowIdx);
        for (auto colIdx = 0u; colIdx < 10; colIdx++) {
            ASSERT_EQ(row.nestedTypeVal[colIdx]->val.floatVal,
                static_cast<float>(rowIdx * 10 + colIdx))
                << "rowIdx: " << rowIdx << " colIdx: " << colIdx;
        }
    }
    // Flat-index walk over the same data: element i lives in row i / 10, slot i % 10.
    for (size_t i = 0; i < 200000; ++i) {
        size_t rowIdx = i / 10;
        size_t colIdx = i % 10;
        // Read the element once and reuse it for the check.
        const auto actual =
            col->readValueForTestingOnly(rowIdx).nestedTypeVal[colIdx]->val.floatVal;
        ASSERT_EQ(actual, static_cast<float>(i))
            << "rowIdx: " << rowIdx << " colIdx: " << colIdx;
    }
}

TEST_F(CopyNpyFaultTest, CopyNpyInsufficientNumberOfProperties) {
conn->query("create node table npytable (id INT64,i64 INT64[12],PRIMARY KEY(id));");
Expand Down

0 comments on commit ca14982

Please sign in to comment.