diff --git a/src/common/types/types.cpp b/src/common/types/types.cpp index ef624dbc02..423c83bbbc 100644 --- a/src/common/types/types.cpp +++ b/src/common/types/types.cpp @@ -219,7 +219,6 @@ std::unique_ptr DataType::copy() { } ExtraTypeInfo* DataType::getExtraTypeInfo() const { - assert(typeID == VAR_LIST || typeID == FIXED_LIST || typeID == STRUCT); return extraTypeInfo.get(); } diff --git a/src/include/storage/copier/node_copier.h b/src/include/storage/copier/node_copier.h index 7856794997..f401623ae1 100644 --- a/src/include/storage/copier/node_copier.h +++ b/src/include/storage/copier/node_copier.h @@ -1,6 +1,6 @@ #pragma once -#include "storage/in_mem_storage_structure/node_in_mem_column.h" +#include "storage/in_mem_storage_structure/in_mem_node_column.h" #include "storage/index/hash_index_builder.h" #include "storage/store/nodes_statistics_and_deleted_ids.h" #include "table_copier.h" @@ -8,75 +8,40 @@ namespace kuzu { namespace storage { -using set_element_func_t = std::function; - -template class NodeCopyMorsel { - public: - static constexpr common::block_idx_t INVALID_BLOCK_IDX = -1ull; + static constexpr common::block_idx_t INVALID_BLOCK_IDX = UINT64_MAX; public: - NodeCopyMorsel(common::offset_t startOffset, common::block_idx_t blockIdx) - : startOffset{startOffset}, blockIdx{blockIdx} {}; + NodeCopyMorsel(common::offset_t startOffset, common::block_idx_t blockIdx, + std::shared_ptr recordBatch) + : startOffset{startOffset}, blockIdx{blockIdx}, recordBatch{std::move(recordBatch)} {}; virtual ~NodeCopyMorsel() = default; - virtual const std::vector>& getArrowColumns() = 0; - - bool success() { return blockIdx != INVALID_BLOCK_IDX; } + bool success() const { return blockIdx != INVALID_BLOCK_IDX; } public: common::offset_t startOffset; common::block_idx_t blockIdx; -}; - -class CSVNodeCopyMorsel : public NodeCopyMorsel { - -public: - CSVNodeCopyMorsel(std::shared_ptr recordBatch, common::offset_t startOffset, - common::block_idx_t blockIdx) - : NodeCopyMorsel{startOffset, blockIdx}, recordBatch{std::move(recordBatch)} {}; - - const std::vector>& getArrowColumns() override { - return recordBatch->columns(); - } - -private: std::shared_ptr recordBatch; }; -class ParquetNodeCopyMorsel : public NodeCopyMorsel { - -public: - ParquetNodeCopyMorsel(std::shared_ptr currTable, common::offset_t startOffset, - common::block_idx_t blockIdx) - : NodeCopyMorsel{startOffset, blockIdx}, currTable{std::move(currTable)} {}; - - const std::vector>& getArrowColumns() override { - return currTable->columns(); - } - -private: - std::shared_ptr currTable; -}; - -template +template class NodeCopySharedState { public: NodeCopySharedState( - std::string filePath, HashIndexBuilder* pkIndex, common::offset_t startOffset) + std::string filePath, HashIndexBuilder* pkIndex, common::offset_t startOffset) : filePath{std::move(filePath)}, pkIndex{pkIndex}, startOffset{startOffset}, blockIdx{0} {}; virtual ~NodeCopySharedState() = default; - virtual std::unique_ptr> getMorsel() = 0; + virtual std::unique_ptr getMorsel() = 0; public: std::string filePath; - HashIndexBuilder* pkIndex; + HashIndexBuilder* pkIndex; common::offset_t startOffset; protected: @@ -84,31 +49,31 @@ class NodeCopySharedState { std::mutex mtx; }; -template -class CSVNodeCopySharedState : public NodeCopySharedState { +template +class CSVNodeCopySharedState : public NodeCopySharedState { public: - CSVNodeCopySharedState(std::string filePath, HashIndexBuilder* pkIndex, + CSVNodeCopySharedState(std::string filePath, HashIndexBuilder* pkIndex, common::offset_t startOffset, std::shared_ptr csvStreamingReader) - : NodeCopySharedState{filePath, pkIndex, startOffset}, - csvStreamingReader{std::move(csvStreamingReader)} {}; - std::unique_ptr> getMorsel() override; + : NodeCopySharedState{filePath, pkIndex, startOffset}, csvStreamingReader{std::move( + csvStreamingReader)} {}; + std::unique_ptr getMorsel() override; private: std::shared_ptr csvStreamingReader; }; -template -class ParquetNodeCopySharedState : public NodeCopySharedState { +template +class ParquetNodeCopySharedState : public NodeCopySharedState { public: - ParquetNodeCopySharedState(std::string filePath, HashIndexBuilder* pkIndex, + ParquetNodeCopySharedState(std::string filePath, HashIndexBuilder* pkIndex, common::offset_t startOffset, uint64_t numBlocks, std::unique_ptr parquetReader) - : NodeCopySharedState{filePath, pkIndex, startOffset}, - numBlocks{numBlocks}, parquetReader{std::move(parquetReader)} {}; - std::unique_ptr> getMorsel() override; + : NodeCopySharedState{filePath, pkIndex, startOffset}, numBlocks{numBlocks}, + parquetReader{std::move(parquetReader)} {}; + std::unique_ptr getMorsel() override; public: uint64_t numBlocks; @@ -133,87 +98,38 @@ class NodeCopier : public TableCopier { void saveToFile() override; - template + template static void populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile, - common::NullMask* nullMask, HashIndexBuilder* pkIndex, - common::offset_t startOffset, uint64_t numValues); + common::NullMask* nullMask, HashIndexBuilder* pkIndex, common::offset_t startOffset, + uint64_t numValues); - std::unordered_map> columns; + std::unordered_map> columns; private: - template + template void populateColumns(processor::ExecutionContext* executionContext); - template + template void populateColumnsFromCSV(processor::ExecutionContext* executionContext, - std::unique_ptr>& pkIndex); + std::unique_ptr>& pkIndex); - template + template void populateColumnsFromParquet(processor::ExecutionContext* executionContext, - std::unique_ptr>& pkIndex); + std::unique_ptr>& pkIndex); - template - static void putPropsOfLinesIntoColumns(InMemColumnChunk* columnChunk, NodeInMemColumn* column, - std::shared_ptr arrowArray, common::offset_t startNodeOffset, - uint64_t numLinesInCurBlock, common::CopyDescription& copyDescription, - PageByteCursor& overflowCursor); + static void copyArrayIntoColumnChunk(InMemColumnChunk* columnChunk, InMemNodeColumn* column, + arrow::Array& arrowArray, common::offset_t startNodeOffset, + common::CopyDescription& copyDescription, PageByteCursor& overflowCursor); // Concurrent tasks. - template - static void batchPopulateColumnsTask(NodeCopySharedState* sharedState, - NodeCopier* copier, processor::ExecutionContext* executionContext); - - template - static void appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile, - common::offset_t offset, HashIndexBuilder* pkIndex) { - assert(false); - } - - static set_element_func_t getSetElementFunc(common::DataTypeID typeID, - common::CopyDescription& copyDescription, PageByteCursor& pageByteCursor); - template - inline static void setNumericElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk, - common::offset_t nodeOffset, const std::string& data) { - auto val = common::TypeUtils::convertStringToNumber(data.c_str()); - column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast(&val)); - } - - inline static void setBoolElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk, - common::offset_t nodeOffset, const std::string& data) { - auto val = common::TypeUtils::convertToBoolean(data.c_str()); - column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast(&val)); - } + static void populateColumnChunksTask(NodeCopySharedState* sharedState, NodeCopier* copier, + processor::ExecutionContext* executionContext, spdlog::logger& logger); template - inline static void setTimeElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk, - common::offset_t nodeOffset, const std::string& data) { - auto val = T::FromCString(data.c_str(), data.length()); - column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast(&val)); - } - - inline static void setStringElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk, - common::offset_t nodeOffset, const std::string& data, PageByteCursor& overflowCursor) { - auto val = column->getInMemOverflowFile()->copyString( - data.substr(0, common::BufferPoolConstants::PAGE_4KB_SIZE).c_str(), overflowCursor); - column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast(&val)); - } - - inline static void setVarListElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk, - common::offset_t nodeOffset, const std::string& data, - common::CopyDescription& copyDescription, PageByteCursor& overflowCursor) { - auto varListVal = - getArrowVarList(data, 1, data.length() - 2, column->getDataType(), copyDescription); - auto kuList = column->getInMemOverflowFile()->copyList(*varListVal, overflowCursor); - column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast(&kuList)); - } - - inline static void setFixedListElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk, - common::offset_t nodeOffset, const std::string& data, - common::CopyDescription& copyDescription) { - auto fixedListVal = - getArrowFixedList(data, 1, data.length() - 2, column->getDataType(), copyDescription); - column->setElementInChunk(columnChunk, nodeOffset, fixedListVal.get()); + static void appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile, + common::offset_t offset, HashIndexBuilder* pkIndex) { + assert(false); } }; diff --git a/src/include/storage/copier/table_copier.h b/src/include/storage/copier/table_copier.h index ff199b07aa..4128550bb5 100644 --- a/src/include/storage/copier/table_copier.h +++ b/src/include/storage/copier/table_copier.h @@ -42,6 +42,14 @@ class TableCopier { uint64_t copy(processor::ExecutionContext* executionContext); + static void throwCopyExceptionIfNotOK(const arrow::Status& status); + + static std::unique_ptr getArrowVarList(const std::string& l, int64_t from, + int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription); + + static std::unique_ptr getArrowFixedList(const std::string& l, int64_t from, + int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription); + protected: virtual void initializeColumnsAndLists() = 0; @@ -64,24 +72,9 @@ class TableCopier { static std::vector> getListElementPos( const std::string& l, int64_t from, int64_t to, common::CopyDescription& copyDescription); - static std::unique_ptr getArrowVarList(const std::string& l, int64_t from, - int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription); - - static std::unique_ptr getArrowFixedList(const std::string& l, int64_t from, - int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription); - - static void throwCopyExceptionIfNotOK(const arrow::Status& status); - inline void updateTableStatistics() { tablesStatistics->setNumTuplesForTable(tableSchema->tableID, numRows); } - inline uint64_t getNumBlocks() const { - uint64_t numBlocks = 0; - for (auto& [_, info] : fileBlockInfos) { - numBlocks += info.numBlocks; - } - return numBlocks; - } static std::shared_ptr toArrowDataType(const common::DataType& dataType); diff --git a/src/include/storage/in_mem_storage_structure/in_mem_column_chunk.h b/src/include/storage/in_mem_storage_structure/in_mem_column_chunk.h new file mode 100644 index 0000000000..69a045bfab --- /dev/null +++ b/src/include/storage/in_mem_storage_structure/in_mem_column_chunk.h @@ -0,0 +1,139 @@ +#pragma once + +#include "arrow/array/array_base.h" +#include "arrow/array/array_binary.h" +#include "arrow/array/array_primitive.h" +#include "arrow/scalar.h" +#include "common/types/types.h" +#include "storage/copier/table_copier.h" +#include "storage/storage_structure/in_mem_file.h" + +namespace kuzu { +namespace storage { + +class InMemColumnChunk { +public: + InMemColumnChunk(common::DataType dataType, common::offset_t startOffset, + common::offset_t endOffset, uint16_t numBytesForElement, uint64_t numElementsInAPage); + + inline uint8_t* getPage(common::page_idx_t pageIdx) { + assert(pageIdx <= endPageIdx && pageIdx >= startPageIdx); + auto pageIdxInSet = pageIdx - startPageIdx; + return pages.get() + (pageIdxInSet * common::BufferPoolConstants::PAGE_4KB_SIZE); + } + inline void copyValue( + common::page_idx_t pageIdx, common::offset_t posInPage, const uint8_t* val) { + auto elemPosInPageInBytes = posInPage * numBytesForElement; + memcpy(getPage(pageIdx) + elemPosInPageInBytes, val, numBytesForElement); + } + inline uint8_t* getValue(common::offset_t nodeOffset) { + auto cursor = CursorUtils::getPageElementCursor(nodeOffset, numElementsInAPage); + auto elemPosInPageInBytes = cursor.elemPosInPage * numBytesForElement; + return getPage(cursor.pageIdx) + elemPosInPageInBytes; + } + + template + void templateCopyValuesToPage(const PageElementCursor& pageCursor, arrow::Array& array, + uint64_t posInArray, uint64_t numValues, Args... args) { + const auto& data = array.data(); + auto valuesInArray = data->GetValues(1); + auto valuesInPage = (T*)(getPage(pageCursor.pageIdx)); + if (data->MayHaveNulls()) { + for (auto i = 0u; i < numValues; i++) { + if (data->IsNull(i + posInArray)) { + continue; + } + valuesInPage[i + pageCursor.elemPosInPage] = valuesInArray[i + posInArray]; + } + } else { + for (auto i = 0u; i < numValues; i++) { + valuesInPage[i + pageCursor.elemPosInPage] = valuesInArray[i + posInArray]; + } + } + } + + template + void templateCopyValuesAsStringToPage(const PageElementCursor& pageCursor, arrow::Array& array, + uint64_t posInArray, uint64_t numValues, Args... args) { + auto& stringArray = (arrow::StringArray&)array; + auto data = stringArray.data(); + if (data->MayHaveNulls()) { + for (auto i = 0u; i < numValues; i++) { + if (data->IsNull(i + posInArray)) { + continue; + } + auto value = stringArray.GetView(i + posInArray); + setValueFromString(value.data(), value.length(), pageCursor.pageIdx, + i + pageCursor.elemPosInPage, args...); + } + } else { + for (auto i = 0u; i < numValues; i++) { + auto value = stringArray.GetView(i + posInArray); + setValueFromString(value.data(), value.length(), pageCursor.pageIdx, + i + pageCursor.elemPosInPage, args...); + } + } + } + + template + void setValueFromString(const char* value, uint64_t length, common::page_idx_t pageIdx, + uint64_t posInPage, Args... args) { + throw common::CopyException("Unsupported type to set element for " + + std::string(value, length) + " at pos " + + std::to_string(posInPage)); + } + +private: + common::DataType dataType; + uint16_t numBytesForElement; + uint64_t numElementsInAPage; + common::page_idx_t startPageIdx; + common::page_idx_t endPageIdx; + std::unique_ptr pages; +}; + +template<> +void InMemColumnChunk::templateCopyValuesToPage(const PageElementCursor& pageCursor, + arrow::Array& array, uint64_t posInArray, uint64_t numValues); +template<> +void InMemColumnChunk::templateCopyValuesToPage( + const PageElementCursor& pageCursor, arrow::Array& array, uint64_t posInArray, + uint64_t numValues); +template<> +void InMemColumnChunk::templateCopyValuesToPage( + const PageElementCursor& pageCursor, arrow::Array& array, uint64_t posInArray, + uint64_t numValues); +// Specialized optimization for copy string values from arrow to pages. +// The optimization is to use string_view to avoid creation of std::string. +// Possible switches: date, timestamp, interval, fixed/var list, string +template<> +void InMemColumnChunk::templateCopyValuesToPage(const PageElementCursor& pageCursor, arrow::Array& array, + uint64_t posInArray, uint64_t numValues, InMemOverflowFile* overflowFile, + PageByteCursor& overflowCursor, common::CopyDescription& copyDesc); + +template<> +void InMemColumnChunk::setValueFromString( + const char* value, uint64_t length, common::page_idx_t pageIdx, uint64_t posInPage, + InMemOverflowFile* overflowFile, PageByteCursor& overflowCursor); +template<> +void InMemColumnChunk::setValueFromString(const char* value, + uint64_t length, common::page_idx_t pageIdx, uint64_t posInPage, + common::CopyDescription& copyDescription); +template<> +void InMemColumnChunk::setValueFromString(const char* value, uint64_t length, common::page_idx_t pageIdx, + uint64_t posInPage, InMemOverflowFile* overflowFile, PageByteCursor& overflowCursor, + common::CopyDescription& copyDescription); +template<> +void InMemColumnChunk::setValueFromString( + const char* value, uint64_t length, common::page_idx_t pageIdx, uint64_t posInPage); +template<> +void InMemColumnChunk::setValueFromString( + const char* value, uint64_t length, common::page_idx_t pageIdx, uint64_t posInPage); +template<> +void InMemColumnChunk::setValueFromString( + const char* value, uint64_t length, common::page_idx_t pageIdx, uint64_t posInPage); + +} // namespace storage +} // namespace kuzu diff --git a/src/include/storage/in_mem_storage_structure/in_mem_node_column.h b/src/include/storage/in_mem_storage_structure/in_mem_node_column.h new file mode 100644 index 0000000000..effd1505f0 --- /dev/null +++ b/src/include/storage/in_mem_storage_structure/in_mem_node_column.h @@ -0,0 +1,101 @@ +#pragma once + +#include "storage/in_mem_storage_structure/in_mem_column_chunk.h" + +namespace kuzu { +namespace storage { +// TODO(GUODONG): Currently, we have both InMemNodeColumn and InMemColumn. This is a temporary +// solution for now to allow gradual refactorings. Eventually, we should only have InMemColumn. +class InMemNodeColumn { +public: + InMemNodeColumn(std::string filePath, common::DataType dataType, uint16_t numBytesForElement, + uint64_t numElements); + virtual ~InMemNodeColumn() = default; + + // Encode and flush null bits. + virtual inline void saveToFile() { flushNullBits(); } + + inline common::DataType getDataType() { return dataType; } + + // Flush pages which holds nodeOffsets in the range [startOffset, endOffset] (inclusive). + void flushChunk( + InMemColumnChunk* chunk, common::offset_t startOffset, common::offset_t endOffset); + + void setElementInChunk(InMemColumnChunk* chunk, common::offset_t offset, const uint8_t* val); + + virtual inline InMemOverflowFile* getInMemOverflowFile() { return nullptr; } + + inline common::NullMask* getNullMask() { return nullMask.get(); } + inline uint16_t getNumBytesForElement() const { return numBytesForElement; } + inline uint64_t getNumElementsInAPage() const { return numElementsInAPage; } + +private: + std::unique_ptr encodeNullBits(common::page_idx_t pageIdx); + void flushNullBits(); + +protected: + std::string filePath; + uint16_t numBytesForElement; + uint64_t numElementsInAPage; + uint64_t nullEntriesOffset; + uint64_t numNullEntriesPerPage; + std::unique_ptr fileHandle; + std::unique_ptr nullMask; + common::DataType dataType; + uint64_t numElements; + uint64_t numPages; +}; + +class NodeInMemColumnWithOverflow : public InMemNodeColumn { + +public: + NodeInMemColumnWithOverflow(std::string filePath, common::DataType dataType, + uint16_t numBytesForElement, uint64_t numElements) + : InMemNodeColumn{ + std::move(filePath), std::move(dataType), numBytesForElement, numElements} { + assert( + this->dataType.typeID == common::STRING || this->dataType.typeID == common::VAR_LIST); + inMemOverflowFile = + make_unique(StorageUtils::getOverflowFileName(this->filePath)); + } + + inline InMemOverflowFile* getInMemOverflowFile() override { return inMemOverflowFile.get(); } + + void saveToFile() override { + InMemNodeColumn::saveToFile(); + inMemOverflowFile->flush(); + } + +protected: + std::unique_ptr inMemOverflowFile; +}; + +class NodeInMemColumnFactory { +public: + static std::unique_ptr getNodeInMemColumn( + const std::string& filePath, const common::DataType& dataType, uint64_t numElements) { + switch (dataType.typeID) { + case common::INT64: + case common::INT32: + case common::INT16: + case common::DOUBLE: + case common::FLOAT: + case common::BOOL: + case common::DATE: + case common::TIMESTAMP: + case common::INTERVAL: + case common::FIXED_LIST: + return make_unique( + filePath, dataType, common::Types::getDataTypeSize(dataType), numElements); + case common::STRING: + case common::VAR_LIST: + return make_unique( + filePath, dataType, common::Types::getDataTypeSize(dataType), numElements); + default: + throw common::CopyException("Invalid type for property column creation."); + } + } +}; + +} // namespace storage +} // namespace kuzu diff --git a/src/include/storage/in_mem_storage_structure/node_in_mem_column.h b/src/include/storage/in_mem_storage_structure/node_in_mem_column.h deleted file mode 100644 index 1d1927573d..0000000000 --- a/src/include/storage/in_mem_storage_structure/node_in_mem_column.h +++ /dev/null @@ -1,147 +0,0 @@ -#pragma once - -#include "common/types/types.h" -#include "storage/buffer_manager/buffer_manager.h" -#include "storage/storage_structure/in_mem_file.h" - -namespace kuzu { -namespace storage { - -class InMemColumnChunk { -public: - InMemColumnChunk(common::offset_t startOffset, common::offset_t endOffset, - uint16_t numBytesForElement, uint64_t numElementsInAPage) - : numBytesForElement{numBytesForElement}, numElementsInAPage{numElementsInAPage} { - startPageIdx = CursorUtils::getPageIdx(startOffset, numElementsInAPage); - endPageIdx = CursorUtils::getPageIdx(endOffset, numElementsInAPage); - auto numPages = endPageIdx - startPageIdx + 1; - pages = std::make_unique(numPages * common::BufferPoolConstants::PAGE_4KB_SIZE); - memset(pages.get(), 0, numPages * common::BufferPoolConstants::PAGE_4KB_SIZE); - } - - inline uint8_t* getPage(common::page_idx_t pageIdx) { - assert(pageIdx <= endPageIdx && pageIdx >= startPageIdx); - auto pageIdxInSet = pageIdx - startPageIdx; - return pages.get() + (pageIdxInSet * common::BufferPoolConstants::PAGE_4KB_SIZE); - } - - void copyValue(common::offset_t nodeOffset, const uint8_t* val) { - auto cursor = CursorUtils::getPageElementCursor(nodeOffset, numElementsInAPage); - auto page = getPage(cursor.pageIdx); - auto elemPosInPageInBytes = cursor.elemPosInPage * numBytesForElement; - memcpy(page + elemPosInPageInBytes, val, numBytesForElement); - } - - uint8_t* getValue(common::offset_t nodeOffset) { - auto cursor = CursorUtils::getPageElementCursor(nodeOffset, numElementsInAPage); - auto elemPosInPageInBytes = cursor.elemPosInPage * numBytesForElement; - return getPage(cursor.pageIdx) + elemPosInPageInBytes; - } - -private: - uint16_t numBytesForElement; - uint64_t numElementsInAPage; - common::page_idx_t startPageIdx; - common::page_idx_t endPageIdx; - std::unique_ptr pages; -}; - -class NodeInMemColumn { - -public: - NodeInMemColumn(std::string filePath, common::DataType dataType, uint16_t numBytesForElement, - uint64_t numElements, uint64_t numBlocks); - virtual ~NodeInMemColumn() = default; - - // Encode and flush null bits. - virtual inline void saveToFile() { flushNullBits(); } - - inline common::DataType getDataType() { return dataType; } - - // Flush pages which holds nodeOffsets in the range [startOffset, endOffset] (inclusive). - void flushChunk( - InMemColumnChunk* chunk, common::offset_t startOffset, common::offset_t endOffset); - - void setElementInChunk(InMemColumnChunk* chunk, common::offset_t offset, const uint8_t* val); - - inline bool isNullAtNodeOffset(common::offset_t nodeOffset) { - return nullMask->isNull(nodeOffset); - } - - virtual inline InMemOverflowFile* getInMemOverflowFile() { return nullptr; } - - inline common::NullMask* getNullMask() { return nullMask.get(); } - inline uint16_t getNumBytesForElement() const { return numBytesForElement; } - inline uint64_t getNumElementsInAPage() const { return numElementsInAPage; } - -private: - std::unique_ptr encodeNullBits(common::page_idx_t pageIdx); - void flushNullBits(); - -protected: - std::string filePath; - uint16_t numBytesForElement; - uint64_t numElementsInAPage; - uint64_t nullEntriesOffset; - uint64_t numNullEntriesPerPage; - std::unique_ptr fileHandle; - std::unique_ptr nullMask; - common::DataType dataType; - uint64_t numElements; - uint64_t numPages; -}; - -class NodeInMemColumnWithOverflow : public NodeInMemColumn { - -public: - NodeInMemColumnWithOverflow(std::string filePath, common::DataType dataType, - uint16_t numBytesForElement, uint64_t numElements, uint64_t numBlocks) - : NodeInMemColumn{std::move(filePath), std::move(dataType), numBytesForElement, numElements, - numBlocks} { - assert( - this->dataType.typeID == common::STRING || this->dataType.typeID == common::VAR_LIST); - inMemOverflowFile = - make_unique(StorageUtils::getOverflowFileName(this->filePath)); - } - - inline InMemOverflowFile* getInMemOverflowFile() override { return inMemOverflowFile.get(); } - - void saveToFile() override { - NodeInMemColumn::saveToFile(); - inMemOverflowFile->flush(); - } - -protected: - std::unique_ptr inMemOverflowFile; -}; - -class InMemBMPageCollectionFactory { - -public: - static std::unique_ptr getInMemBMPageCollection(const std::string& filePath, - const common::DataType& dataType, uint64_t numElements, uint64_t numBlocks) { - switch (dataType.typeID) { - case common::INT64: - case common::INT32: - case common::INT16: - case common::DOUBLE: - case common::FLOAT: - case common::BOOL: - case common::DATE: - case common::TIMESTAMP: - case common::INTERVAL: - case common::FIXED_LIST: - return make_unique(filePath, dataType, - common::Types::getDataTypeSize(dataType), numElements, numBlocks); - case common::STRING: - case common::VAR_LIST: - return make_unique(filePath, dataType, - common::Types::getDataTypeSize(dataType), numElements, numBlocks); - default: - throw common::CopyException("Invalid type for property column creation."); - } - } -}; - -} // namespace storage -} // namespace kuzu diff --git a/src/include/storage/storage_structure/in_mem_file.h b/src/include/storage/storage_structure/in_mem_file.h index 932c5049bb..15fef6a671 100644 --- a/src/include/storage/storage_structure/in_mem_file.h +++ b/src/include/storage/storage_structure/in_mem_file.h @@ -63,7 +63,8 @@ class InMemOverflowFile : public InMemFile { // These two functions copies a string/list value to the file according to the cursor. Multiple // threads coordinate by that each thread takes the full control of a single page at a time. // When the page is not exhausted, each thread can write without an exclusive lock. - common::ku_string_t copyString(const char* rawString, PageByteCursor& overflowCursor); + common::ku_string_t copyString( + const char* rawString, common::page_offset_t length, PageByteCursor& overflowCursor); common::ku_list_t copyList(const common::Value& listValue, PageByteCursor& overflowCursor); // Copy overflow data at srcOverflow into dstKUString. diff --git a/src/storage/copier/node_copier.cpp b/src/storage/copier/node_copier.cpp index d31799bb56..0ae2628db1 100644 --- a/src/storage/copier/node_copier.cpp +++ b/src/storage/copier/node_copier.cpp @@ -10,49 +10,47 @@ using namespace kuzu::common; namespace kuzu { namespace storage { -template -std::unique_ptr> CSVNodeCopySharedState::getMorsel() { - lock_t lck{this->mtx}; +template +std::unique_ptr CSVNodeCopySharedState::getMorsel() { + std::unique_lock lck{this->mtx}; std::shared_ptr recordBatch; - auto result = csvStreamingReader->ReadNext(&recordBatch); - if (!result.ok()) { - throw common::CopyException( - "Error reading a batch of rows from CSV using Arrow CSVStreamingReader."); - } + TableCopier::throwCopyExceptionIfNotOK(csvStreamingReader->ReadNext(&recordBatch)); + auto morselStartOffset = this->startOffset; + auto morselBlockIdx = this->blockIdx; if (recordBatch == NULL) { - return make_unique(std::move(recordBatch), INVALID_NODE_OFFSET, - NodeCopyMorsel::INVALID_BLOCK_IDX); + morselStartOffset = INVALID_NODE_OFFSET; + morselBlockIdx = NodeCopyMorsel::INVALID_BLOCK_IDX; + } else { + this->startOffset += recordBatch->num_rows(); + this->blockIdx++; } - auto numRows = recordBatch->num_rows(); - this->startOffset += numRows; - this->blockIdx++; - return make_unique( - std::move(recordBatch), this->startOffset - numRows, this->blockIdx - 1); + return make_unique(morselStartOffset, morselBlockIdx, std::move(recordBatch)); } -template -std::unique_ptr> -ParquetNodeCopySharedState::getMorsel() { - lock_t lck{this->mtx}; +template +std::unique_ptr ParquetNodeCopySharedState::getMorsel() { + std::unique_lock lck{this->mtx}; std::shared_ptr currTable; + std::shared_ptr recordBatch; if (this->blockIdx == numBlocks) { - return make_unique(std::move(currTable), INVALID_NODE_OFFSET, - NodeCopyMorsel::INVALID_BLOCK_IDX); - } - auto result = parquetReader->RowGroup(this->blockIdx)->ReadTable(&currTable); - if (!result.ok()) { - throw common::CopyException( - "Error reading a batch of rows from CSV using Arrow CSVStreamingReader."); + return make_unique( + INVALID_NODE_OFFSET, NodeCopyMorsel::INVALID_BLOCK_IDX, std::move(recordBatch)); } + TableCopier::throwCopyExceptionIfNotOK( + parquetReader->RowGroup(this->blockIdx)->ReadTable(&currTable)); if (currTable == NULL) { - return make_unique(std::move(currTable), INVALID_NODE_OFFSET, - NodeCopyMorsel::INVALID_BLOCK_IDX); + return make_unique( + INVALID_NODE_OFFSET, NodeCopyMorsel::INVALID_BLOCK_IDX, std::move(recordBatch)); } + // TODO(GUODONG): We assume here each time the table only contains one record batch. Needs to + // verify if this always holds true. + arrow::TableBatchReader batchReader(*currTable); + TableCopier::throwCopyExceptionIfNotOK(batchReader.ReadNext(&recordBatch)); auto numRows = currTable->num_rows(); this->startOffset += numRows; this->blockIdx++; - return make_unique( - std::move(currTable), this->startOffset - numRows, this->blockIdx - 1); + return make_unique( + this->startOffset - numRows, this->blockIdx - 1, std::move(recordBatch)); } void NodeCopier::initializeColumnsAndLists() { @@ -60,8 +58,8 @@ void NodeCopier::initializeColumnsAndLists() { for (auto& property : tableSchema->properties) { auto fName = StorageUtils::getNodePropertyColumnFName( outputDirectory, tableSchema->tableID, property.propertyID, DBFileType::WAL_VERSION); - columns[property.propertyID] = InMemBMPageCollectionFactory::getInMemBMPageCollection( - fName, property.dataType, numRows, getNumBlocks()); + columns[property.propertyID] = + NodeInMemColumnFactory::getNodeInMemColumn(fName, property.dataType, numRows); } logger->info("Done initializing in memory columns."); } @@ -87,26 +85,26 @@ void NodeCopier::saveToFile() { assert(!columns.empty()); for (auto& [_, column] : columns) { taskScheduler.scheduleTask(CopyTaskFactory::createCopyTask( - [&](NodeInMemColumn* x) { x->saveToFile(); }, column.get())); + [&](InMemNodeColumn* x) { x->saveToFile(); }, column.get())); } taskScheduler.waitAllTasksToCompleteOrError(); logger->debug("Done writing node columns to disk."); } -template +template void NodeCopier::populateColumns(processor::ExecutionContext* executionContext) { logger->info("Populating properties"); - auto pkIndex = std::make_unique>( - StorageUtils::getNodeIndexFName( - this->outputDirectory, tableSchema->tableID, DBFileType::WAL_VERSION), - reinterpret_cast(tableSchema)->getPrimaryKey().dataType); + auto pkIndex = + std::make_unique>(StorageUtils::getNodeIndexFName(this->outputDirectory, + tableSchema->tableID, DBFileType::WAL_VERSION), + reinterpret_cast(tableSchema)->getPrimaryKey().dataType); pkIndex->bulkReserve(numRows); switch (copyDescription.fileType) { case CopyDescription::FileType::CSV: - populateColumnsFromCSV(executionContext, pkIndex); + populateColumnsFromCSV(executionContext, pkIndex); break; case CopyDescription::FileType::PARQUET: - populateColumnsFromParquet(executionContext, pkIndex); + populateColumnsFromParquet(executionContext, pkIndex); break; default: { throw CopyException(StringUtils::string_format("Unsupported file type {}.", @@ -118,24 +116,25 @@ void NodeCopier::populateColumns(processor::ExecutionContext* executionContext) logger->info("Done populating properties, constructing the pk index."); } -template -void NodeCopier::populateColumnsFromCSV(processor::ExecutionContext* executionContext, - std::unique_ptr>& pkIndex) { +template +void NodeCopier::populateColumnsFromCSV( + processor::ExecutionContext* executionContext, std::unique_ptr>& pkIndex) { for (auto& filePath : copyDescription.filePaths) { std::shared_ptr csvStreamingReader = initCSVReader(filePath); CSVNodeCopySharedState sharedState{ filePath, pkIndex.get(), fileBlockInfos.at(filePath).startOffset, csvStreamingReader}; + logger->info("Create shared state for file {}, startOffset.", filePath, + fileBlockInfos.at(filePath).startOffset); taskScheduler.scheduleTaskAndWaitOrError( CopyTaskFactory::createParallelCopyTask(executionContext->numThreads, - batchPopulateColumnsTask, &sharedState, this, - executionContext), + populateColumnChunksTask, &sharedState, this, executionContext, *logger), executionContext); } } -template -void NodeCopier::populateColumnsFromParquet(processor::ExecutionContext* executionContext, - std::unique_ptr>& pkIndex) { +template +void NodeCopier::populateColumnsFromParquet( + processor::ExecutionContext* executionContext, std::unique_ptr>& pkIndex) { for (auto& filePath : copyDescription.filePaths) { std::unique_ptr parquetReader = initParquetReader(filePath); ParquetNodeCopySharedState sharedState{filePath, pkIndex.get(), @@ -143,15 +142,14 @@ void NodeCopier::populateColumnsFromParquet(processor::ExecutionContext* executi std::move(parquetReader)}; taskScheduler.scheduleTaskAndWaitOrError( CopyTaskFactory::createParallelCopyTask(executionContext->numThreads, - batchPopulateColumnsTask, &sharedState, this, - executionContext), + populateColumnChunksTask, &sharedState, this, executionContext, *logger), executionContext); } } -template +template void NodeCopier::populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile, - common::NullMask* nullMask, HashIndexBuilder* pkIndex, offset_t startOffset, + common::NullMask* nullMask, HashIndexBuilder* pkIndex, offset_t startOffset, uint64_t numValues) { for (auto i = 0u; i < numValues; i++) { auto offset = i + startOffset; @@ -162,36 +160,45 @@ void NodeCopier::populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* ove } } -template -void NodeCopier::batchPopulateColumnsTask(NodeCopySharedState* sharedState, - NodeCopier* copier, processor::ExecutionContext* executionContext) { +template +void NodeCopier::populateColumnChunksTask(NodeCopySharedState* sharedState, NodeCopier* copier, + processor::ExecutionContext* executionContext, spdlog::logger& logger) { while (true) { if (executionContext->clientContext->isInterrupted()) { throw common::InterruptException{}; } auto result = sharedState->getMorsel(); + logger.info("Get a morsel from file {}.", sharedState->filePath); if (!result->success()) { + logger.info("No more morsels from file {}.", sharedState->filePath); break; } - auto numLinesInCurBlock = - copier->fileBlockInfos.at(sharedState->filePath).numLinesPerBlock[result->blockIdx]; + auto numLinesInCurBlock = result->recordBatch->num_rows(); // Create a column chunk for tuples within the [StartOffset, endOffset] range. auto endOffset = result->startOffset + numLinesInCurBlock - 1; + logger.info("Processing a morsel from file {}: {} to {}.", sharedState->filePath, + result->startOffset, endOffset); std::unordered_map> chunks; for (auto& [propertyIdx, column] : copier->columns) { - chunks[propertyIdx] = std::make_unique(result->startOffset, endOffset, - column->getNumBytesForElement(), column->getNumElementsInAPage()); + chunks[propertyIdx] = + std::make_unique(column->getDataType(), result->startOffset, + endOffset, column->getNumBytesForElement(), column->getNumElementsInAPage()); } std::vector overflowCursors(copier->tableSchema->getNumProperties()); for (auto& [propertyIdx, column] : copier->columns) { - putPropsOfLinesIntoColumns(chunks.at(propertyIdx).get(), column.get(), - result->getArrowColumns()[propertyIdx], result->startOffset, numLinesInCurBlock, + logger.info("copy array into column chunk for property {}.", propertyIdx); + copyArrayIntoColumnChunk(chunks.at(propertyIdx).get(), column.get(), + *result->recordBatch->column(propertyIdx), result->startOffset, copier->copyDescription, overflowCursors[propertyIdx]); } + logger.info("Flush a morsel from file {}: {} to {}.", sharedState->filePath, + result->startOffset, endOffset); // Flush each page within the [StartOffset, endOffset] range. for (auto& [propertyIdx, column] : copier->columns) { column->flushChunk(chunks[propertyIdx].get(), result->startOffset, endOffset); } + logger.info("Populate hash index for a morsel from file {}: {} to {}.", + sharedState->filePath, result->startOffset, endOffset); auto primaryKeyPropertyIdx = reinterpret_cast(copier->tableSchema)->primaryKeyPropertyID; auto pkColumn = copier->columns.at(primaryKeyPropertyIdx).get(); @@ -200,19 +207,63 @@ void NodeCopier::batchPopulateColumnsTask(NodeCopySharedState -void NodeCopier::putPropsOfLinesIntoColumns(InMemColumnChunk* columnChunk, NodeInMemColumn* column, - std::shared_ptr arrowArray, common::offset_t startNodeOffset, - uint64_t numLinesInCurBlock, CopyDescription& copyDescription, PageByteCursor& overflowCursor) { - auto setElementFunc = - getSetElementFunc(column->getDataType().typeID, copyDescription, overflowCursor); - for (auto i = 0u; i < numLinesInCurBlock; i++) { - auto nodeOffset = startNodeOffset + i; - auto currentToken = arrowArray->GetScalar(i); - if ((*currentToken)->is_valid) { - auto stringToken = currentToken->get()->ToString(); - setElementFunc(column, columnChunk, nodeOffset, stringToken); +void NodeCopier::copyArrayIntoColumnChunk(InMemColumnChunk* columnChunk, InMemNodeColumn* column, + arrow::Array& arrowArray, common::offset_t startNodeOffset, CopyDescription& copyDescription, + PageByteCursor& overflowCursor) { + uint64_t numValuesLeftToCopy = arrowArray.length(); + while (numValuesLeftToCopy > 0) { + auto posInArray = arrowArray.length() - numValuesLeftToCopy; + auto pageCursor = CursorUtils::getPageElementCursor( + startNodeOffset + posInArray, column->getNumElementsInAPage()); + auto numValuesToCopy = std::min( + numValuesLeftToCopy, column->getNumElementsInAPage() - pageCursor.elemPosInPage); + for (auto i = 0u; i < numValuesToCopy; i++) { + column->getNullMask()->setNull( + startNodeOffset + posInArray + i, arrowArray.IsNull(posInArray + i)); + } + switch (arrowArray.type_id()) { + case arrow::Type::BOOL: { + columnChunk->templateCopyValuesToPage( + pageCursor, arrowArray, posInArray, numValuesToCopy); + } break; + case arrow::Type::INT16: { + columnChunk->templateCopyValuesToPage( + pageCursor, arrowArray, posInArray, numValuesToCopy); + } break; + case arrow::Type::INT32: { + columnChunk->templateCopyValuesToPage( + pageCursor, arrowArray, posInArray, numValuesToCopy); + } break; + case arrow::Type::INT64: { + columnChunk->templateCopyValuesToPage( + pageCursor, arrowArray, posInArray, numValuesToCopy); + } break; + case arrow::Type::DOUBLE: { + columnChunk->templateCopyValuesToPage( + pageCursor, arrowArray, posInArray, numValuesToCopy); + } break; + case arrow::Type::FLOAT: { + columnChunk->templateCopyValuesToPage( + pageCursor, arrowArray, posInArray, numValuesToCopy); + } break; + case arrow::Type::DATE32: { + columnChunk->templateCopyValuesToPage( + pageCursor, arrowArray, posInArray, numValuesToCopy); + } break; + case arrow::Type::TIMESTAMP: { + columnChunk->templateCopyValuesToPage( + pageCursor, arrowArray, posInArray, numValuesToCopy); + } break; + case arrow::Type::STRING: { + columnChunk->templateCopyValuesToPage(pageCursor, arrowArray, posInArray, numValuesToCopy, + column->getInMemOverflowFile(), overflowCursor, copyDescription); + } break; + default: { + throw CopyException("Unsupported data type " + arrowArray.type()->ToString()); + } } + numValuesLeftToCopy -= numValuesToCopy; } } @@ -236,40 +287,5 @@ void NodeCopier::appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overf } } -set_element_func_t NodeCopier::getSetElementFunc(common::DataTypeID typeID, - common::CopyDescription& copyDescription, PageByteCursor& pageByteCursor) { - switch (typeID) { - case common::DataTypeID::INT64: - return setNumericElement; - case common::DataTypeID::INT32: - return setNumericElement; - case common::DataTypeID::INT16: - return setNumericElement; - case common::DataTypeID::DOUBLE: - return setNumericElement; - case common::DataTypeID::FLOAT: - return setNumericElement; - case common::DataTypeID::BOOL: - return setBoolElement; - case common::DataTypeID::DATE: - return setTimeElement; - case common::DataTypeID::TIMESTAMP: - return setTimeElement; - case common::DataTypeID::INTERVAL: - return setTimeElement; - case common::DataTypeID::STRING: - return std::bind(setStringElement, std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3, std::placeholders::_4, pageByteCursor); - case common::DataTypeID::VAR_LIST: - return std::bind(setVarListElement, std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3, std::placeholders::_4, copyDescription, pageByteCursor); - case common::DataTypeID::FIXED_LIST: - return std::bind(setFixedListElement, std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3, std::placeholders::_4, copyDescription); - default: - throw common::RuntimeException("Unsupported data type."); - } -} - } // namespace storage } // namespace kuzu diff --git a/src/storage/copier/npy_node_copier.cpp b/src/storage/copier/npy_node_copier.cpp index 98112d6c02..a51c2ccf07 100644 --- a/src/storage/copier/npy_node_copier.cpp +++ b/src/storage/copier/npy_node_copier.cpp @@ -135,7 +135,7 @@ void NpyNodeCopier::batchPopulateColumnsTask(common::property_id_t primaryKeypro auto endNodeOffset = startNodeOffset + numLinesInCurBlock - 1; auto& column = copier->columns[propertyID]; std::unique_ptr columnChunk = - std::make_unique(startNodeOffset, endNodeOffset, + std::make_unique(column->getDataType(), startNodeOffset, endNodeOffset, column->getNumBytesForElement(), column->getNumElementsInAPage()); for (auto i = startNodeOffset; i < startNodeOffset + numLinesInCurBlock; ++i) { void* data = npyReader->getPointerToRow(i); @@ -145,7 +145,7 @@ void NpyNodeCopier::batchPopulateColumnsTask(common::property_id_t primaryKeypro if (propertyID == primaryKeypropertyID) { auto pkColumn = copier->columns.at(primaryKeypropertyID).get(); - populatePKIndex(columnChunk.get(), pkColumn->getInMemOverflowFile(), + populatePKIndex(columnChunk.get(), pkColumn->getInMemOverflowFile(), pkColumn->getNullMask(), pkIndex, startNodeOffset, numLinesInCurBlock); } diff --git a/src/storage/copier/rel_copier.cpp b/src/storage/copier/rel_copier.cpp index c4d60186a4..3bf99784ac 100644 --- a/src/storage/copier/rel_copier.cpp +++ b/src/storage/copier/rel_copier.cpp @@ -435,7 +435,7 @@ void RelCopier::putPropsOfLineIntoColumns(RelCopier* copier, } break; case STRING: { auto kuStr = inMemOverflowFilePerPropertyID[propertyID]->copyString( - data, inMemOverflowFileCursors[propertyID]); + data, strlen(data), inMemOverflowFileCursors[propertyID]); putValueIntoColumns(propertyID, directionTablePropertyColumns, nodeIDs, reinterpret_cast(&kuStr)); } break; @@ -531,7 +531,7 @@ void RelCopier::putPropsOfLineIntoLists(RelCopier* copier, } break; case STRING: { auto kuStr = inMemOverflowFilesPerProperty[propertyID]->copyString( - data, inMemOverflowFileCursors[propertyID]); + data, strlen(data), inMemOverflowFileCursors[propertyID]); putValueIntoLists(propertyID, directionTablePropertyLists, directionTableAdjLists, nodeIDs, reversePos, reinterpret_cast(&kuStr)); } break; diff --git a/src/storage/copier/table_copier.cpp b/src/storage/copier/table_copier.cpp index faf6f7a957..201fed45da 100644 --- a/src/storage/copier/table_copier.cpp +++ b/src/storage/copier/table_copier.cpp @@ -311,10 +311,10 @@ std::shared_ptr TableCopier::toArrowDataType(const common::Data } case common::TIMESTAMP: case common::DATE: + case common::INTERVAL: case common::FIXED_LIST: case common::VAR_LIST: - case common::STRING: - case common::INTERVAL: { + case common::STRING: { return arrow::utf8(); } default: { diff --git a/src/storage/in_mem_storage_structure/CMakeLists.txt b/src/storage/in_mem_storage_structure/CMakeLists.txt index 887cd453a2..7275d3ffba 100644 --- a/src/storage/in_mem_storage_structure/CMakeLists.txt +++ b/src/storage/in_mem_storage_structure/CMakeLists.txt @@ -1,8 +1,9 @@ add_library(kuzu_storage_in_mem_storage_structure OBJECT in_mem_column.cpp + in_mem_column_chunk.cpp in_mem_lists.cpp - node_in_mem_column.cpp) + in_mem_node_column.cpp) set(ALL_OBJECT_FILES ${ALL_OBJECT_FILES} $ diff --git a/src/storage/in_mem_storage_structure/in_mem_column_chunk.cpp b/src/storage/in_mem_storage_structure/in_mem_column_chunk.cpp new file mode 100644 index 0000000000..351ef80fe5 --- /dev/null +++ b/src/storage/in_mem_storage_structure/in_mem_column_chunk.cpp @@ -0,0 +1,130 @@ +#include "storage/in_mem_storage_structure/in_mem_column_chunk.h" + +#include "common/types/types.h" + +namespace kuzu { +namespace storage { + +InMemColumnChunk::InMemColumnChunk(common::DataType dataType, common::offset_t startOffset, + common::offset_t endOffset, uint16_t numBytesForElement, uint64_t numElementsInAPage) + : dataType{std::move(dataType)}, numBytesForElement{numBytesForElement}, + numElementsInAPage{numElementsInAPage} { + startPageIdx = CursorUtils::getPageIdx(startOffset, numElementsInAPage); + endPageIdx = CursorUtils::getPageIdx(endOffset, numElementsInAPage); + auto numPages = endPageIdx - startPageIdx + 1; + pages = std::make_unique(numPages * common::BufferPoolConstants::PAGE_4KB_SIZE); + memset(pages.get(), 0, numPages * common::BufferPoolConstants::PAGE_4KB_SIZE); +} + +template<> +void InMemColumnChunk::templateCopyValuesToPage(const PageElementCursor& pageCursor, + arrow::Array& array, uint64_t posInArray, uint64_t numValues) { + auto& boolArray = (arrow::BooleanArray&)array; + auto data = boolArray.data(); + auto valuesInPage = (bool*)(getPage(pageCursor.pageIdx)); + if (data->MayHaveNulls()) { + for (auto i = 0u; i < numValues; i++) { + if (data->IsNull(i + posInArray)) { + continue; + } + valuesInPage[i + pageCursor.elemPosInPage] = boolArray.Value(i + posInArray); + } + } else { + for (auto i = 0u; i < numValues; i++) { + valuesInPage[i + pageCursor.elemPosInPage] = boolArray.Value(i + posInArray); + } + } +} + +template<> +void InMemColumnChunk::templateCopyValuesToPage(const PageElementCursor& pageCursor, arrow::Array& array, + uint64_t posInArray, uint64_t numValues, InMemOverflowFile* overflowFile, + PageByteCursor& overflowCursor, common::CopyDescription& copyDesc) { + switch (dataType.typeID) { + case common::DATE: { + templateCopyValuesAsStringToPage(pageCursor, array, posInArray, numValues); + } break; + case common::TIMESTAMP: { + templateCopyValuesAsStringToPage( + pageCursor, array, posInArray, numValues); + } break; + case common::INTERVAL: { + templateCopyValuesAsStringToPage( + pageCursor, array, posInArray, numValues); + } break; + case common::FIXED_LIST: { + // Fixed list is a fixed-sized blob. + templateCopyValuesAsStringToPage( + pageCursor, array, posInArray, numValues, copyDesc); + } break; + case common::VAR_LIST: { + templateCopyValuesAsStringToPage( + pageCursor, array, posInArray, numValues, overflowFile, overflowCursor, copyDesc); + } break; + case common::STRING: { + templateCopyValuesAsStringToPage( + pageCursor, array, posInArray, numValues, overflowFile, overflowCursor); + } break; + default: { + throw common::CopyException("Unsupported data type for string copy."); + } + } +} + +template<> +void InMemColumnChunk::setValueFromString( + const char* value, uint64_t length, common::page_idx_t pageIdx, uint64_t posInPage, + InMemOverflowFile* overflowFile, PageByteCursor& overflowCursor) { + if (length > common::BufferPoolConstants::PAGE_4KB_SIZE) { + length = common::BufferPoolConstants::PAGE_4KB_SIZE; + } + auto val = overflowFile->copyString(value, length, overflowCursor); + copyValue(pageIdx, posInPage, (uint8_t*)&val); +} + +// Fixed list +template<> +void InMemColumnChunk::setValueFromString(const char* value, + uint64_t length, common::page_idx_t pageIdx, uint64_t posInPage, + common::CopyDescription& copyDescription) { + auto fixedListVal = + TableCopier::getArrowFixedList(value, 1, length - 2, dataType, copyDescription); + copyValue(pageIdx, posInPage, fixedListVal.get()); +} + +// Var list +template<> +void InMemColumnChunk::setValueFromString(const char* value, uint64_t length, common::page_idx_t pageIdx, + uint64_t posInPage, InMemOverflowFile* overflowFile, PageByteCursor& overflowCursor, + common::CopyDescription& copyDescription) { + auto varListVal = TableCopier::getArrowVarList(value, 1, length - 2, dataType, copyDescription); + auto val = overflowFile->copyList(*varListVal, overflowCursor); + copyValue(pageIdx, posInPage, (uint8_t*)&val); +} + +template<> +void InMemColumnChunk::setValueFromString( + const char* value, uint64_t length, common::page_idx_t pageIdx, uint64_t posInPage) { + auto val = common::Interval::FromCString(value, length); + copyValue(pageIdx, posInPage, (uint8_t*)&val); +} + +template<> +void InMemColumnChunk::setValueFromString( + const char* value, uint64_t length, common::page_idx_t pageIdx, uint64_t posInPage) { + auto val = common::Date::FromCString(value, length); + copyValue(pageIdx, posInPage, (uint8_t*)&val); +} + +template<> +void InMemColumnChunk::setValueFromString( + const char* value, uint64_t length, common::page_idx_t pageIdx, uint64_t posInPage) { + auto val = common::Timestamp::FromCString(value, length); + copyValue(pageIdx, posInPage, (uint8_t*)&val); +} + +} // namespace storage +} // namespace kuzu diff --git a/src/storage/in_mem_storage_structure/node_in_mem_column.cpp b/src/storage/in_mem_storage_structure/in_mem_node_column.cpp similarity index 90% rename from src/storage/in_mem_storage_structure/node_in_mem_column.cpp rename to src/storage/in_mem_storage_structure/in_mem_node_column.cpp index d27c5ec508..71a0512b4d 100644 --- a/src/storage/in_mem_storage_structure/node_in_mem_column.cpp +++ b/src/storage/in_mem_storage_structure/in_mem_node_column.cpp @@ -1,4 +1,4 @@ -#include "storage/in_mem_storage_structure/node_in_mem_column.h" +#include "storage/in_mem_storage_structure/in_mem_node_column.h" #include "common/constants.h" #include "common/file_utils.h" @@ -7,8 +7,8 @@ namespace kuzu { namespace storage { -NodeInMemColumn::NodeInMemColumn(std::string filePath, common::DataType dataType, - uint16_t numBytesForElement, uint64_t numElements, uint64_t numBlocks) +InMemNodeColumn::InMemNodeColumn(std::string filePath, common::DataType dataType, + uint16_t numBytesForElement, uint64_t numElements) : filePath{std::move(filePath)}, dataType{std::move(dataType)}, numBytesForElement{numBytesForElement}, numElements{numElements} { numElementsInAPage = PageUtils::getNumElementsInAPage(numBytesForElement, true /* hasNull */); @@ -23,7 +23,7 @@ NodeInMemColumn::NodeInMemColumn(std::string filePath, common::DataType dataType fileHandle = std::make_unique(this->filePath, O_WRONLY); } -void NodeInMemColumn::flushChunk( +void InMemNodeColumn::flushChunk( InMemColumnChunk* chunk, common::offset_t startOffset, common::offset_t endOffset) { auto firstPageIdx = CursorUtils::getPageIdx(startOffset, numElementsInAPage); auto firstPageOffset = (startOffset % numElementsInAPage) * numBytesForElement; @@ -51,15 +51,16 @@ void NodeInMemColumn::flushChunk( } } -void NodeInMemColumn::setElementInChunk( +void InMemNodeColumn::setElementInChunk( InMemColumnChunk* chunk, common::offset_t offset, const uint8_t* val) { - chunk->copyValue(offset, val); + auto pageCursor = PageUtils::getPageElementCursorForPos(offset, numElementsInAPage); + chunk->copyValue(pageCursor.pageIdx, pageCursor.elemPosInPage, val); if (nullMask != nullptr) { nullMask->setNull(offset, false); } } -std::unique_ptr NodeInMemColumn::encodeNullBits(common::page_idx_t pageIdx) { +std::unique_ptr InMemNodeColumn::encodeNullBits(common::page_idx_t pageIdx) { auto startElemOffset = pageIdx * numElementsInAPage; auto nullEntries = std::make_unique(numNullEntriesPerPage); std::fill(nullEntries.get(), nullEntries.get() + numNullEntriesPerPage, @@ -74,7 +75,7 @@ std::unique_ptr NodeInMemColumn::encodeNullBits(common::page_idx_t p return nullEntries; } -void NodeInMemColumn::flushNullBits() { +void InMemNodeColumn::flushNullBits() { auto maxPageIdx = numPages - 1; for (auto pageIdx = 0u; pageIdx < maxPageIdx; pageIdx++) { auto startElemOffset = pageIdx * numElementsInAPage; diff --git a/src/storage/storage_structure/in_mem_file.cpp b/src/storage/storage_structure/in_mem_file.cpp index 2fbed23d54..591f41f7c0 100644 --- a/src/storage/storage_structure/in_mem_file.cpp +++ b/src/storage/storage_structure/in_mem_file.cpp @@ -70,9 +70,10 @@ ku_string_t InMemOverflowFile::appendString(const char* rawString) { return result; } -ku_string_t InMemOverflowFile::copyString(const char* rawString, PageByteCursor& overflowCursor) { +ku_string_t InMemOverflowFile::copyString( + const char* rawString, common::page_offset_t length, PageByteCursor& overflowCursor) { ku_string_t kuString; - kuString.len = strlen(rawString); + kuString.len = length; std::memcpy(kuString.prefix, rawString, kuString.len <= ku_string_t::SHORT_STR_LENGTH ? kuString.len : ku_string_t::PREFIX_LENGTH); if (kuString.len > ku_string_t::SHORT_STR_LENGTH) { @@ -102,7 +103,8 @@ void InMemOverflowFile::copyVarSizedValuesInList(ku_list_t& resultKUList, const std::vector kuStrings(listVal.listVal.size()); for (auto i = 0u; i < listVal.listVal.size(); i++) { assert(listVal.listVal[i]->dataType.typeID == STRING); - kuStrings[i] = copyString(listVal.listVal[i]->strVal.c_str(), overflowCursor); + kuStrings[i] = copyString(listVal.listVal[i]->strVal.c_str(), + listVal.listVal[i]->strVal.length(), overflowCursor); } std::shared_lock lck(lock); for (auto i = 0u; i < listVal.listVal.size(); i++) {