diff --git a/src/include/common/copier_config/copier_config.h b/src/include/common/copier_config/copier_config.h index fd665c948d..8f38e02e4e 100644 --- a/src/include/common/copier_config/copier_config.h +++ b/src/include/common/copier_config/copier_config.h @@ -11,6 +11,13 @@ namespace kuzu { namespace common { struct CSVReaderConfig { + char escapeChar; + char delimiter; + char quoteChar; + char listBeginChar; + char listEndChar; + bool hasHeader; + CSVReaderConfig() : escapeChar{CopyConstants::DEFAULT_CSV_ESCAPE_CHAR}, delimiter{CopyConstants::DEFAULT_CSV_DELIMITER}, @@ -18,13 +25,14 @@ struct CSVReaderConfig { listBeginChar{CopyConstants::DEFAULT_CSV_LIST_BEGIN_CHAR}, listEndChar{CopyConstants::DEFAULT_CSV_LIST_END_CHAR}, hasHeader{CopyConstants::DEFAULT_CSV_HAS_HEADER} {} + CSVReaderConfig(const CSVReaderConfig& other) + : escapeChar{other.escapeChar}, delimiter{other.delimiter}, quoteChar{other.quoteChar}, + listBeginChar{other.listBeginChar}, + listEndChar{other.listEndChar}, hasHeader{other.hasHeader} {} - char escapeChar; - char delimiter; - char quoteChar; - char listBeginChar; - char listEndChar; - bool hasHeader; + inline std::unique_ptr copy() const { + return std::make_unique(*this); + } }; struct CopyDescription { diff --git a/src/include/processor/operator/persistent/copy_node.h b/src/include/processor/operator/persistent/copy_node.h index 8a8d9b5961..03b7c60cfe 100644 --- a/src/include/processor/operator/persistent/copy_node.h +++ b/src/include/processor/operator/persistent/copy_node.h @@ -72,8 +72,8 @@ class CopyNode : public Sink { for (auto& arrowColumnPos : copyNodeInfo.dataColumnPoses) { dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get()); } - localNodeGroup = - std::make_unique(sharedState->tableSchema, &sharedState->copyDesc); + localNodeGroup = std::make_unique( + sharedState->tableSchema, sharedState->copyDesc.csvReaderConfig.get()); } void initGlobalStateInternal(ExecutionContext* context) final; diff --git a/src/include/storage/copier/column_chunk.h b/src/include/storage/copier/column_chunk.h index 4e8b3aba26..b2b6857eb5 100644 --- a/src/include/storage/copier/column_chunk.h +++ b/src/include/storage/copier/column_chunk.h @@ -56,8 +56,8 @@ class ColumnChunk { // ColumnChunks must be initialized after construction, so this constructor should only be used // through the ColumnChunkFactory - explicit ColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription, - bool hasNullChunk = true); + explicit ColumnChunk(common::LogicalType dataType, + std::unique_ptr csvReaderConfig, bool hasNullChunk = true); virtual ~ColumnChunk() = default; template @@ -158,7 +158,7 @@ class ColumnChunk { std::unique_ptr buffer; std::unique_ptr nullChunk; std::vector> childrenChunks; - const common::CopyDescription* copyDescription; + std::unique_ptr csvReaderConfig; uint64_t numValues; }; @@ -176,9 +176,10 @@ inline bool ColumnChunk::getValue(common::offset_t pos) const { class BoolColumnChunk : public ColumnChunk { public: - BoolColumnChunk(common::CopyDescription* copyDescription, bool hasNullChunk = true) - : ColumnChunk( - common::LogicalType(common::LogicalTypeID::BOOL), copyDescription, hasNullChunk) {} + BoolColumnChunk( + std::unique_ptr csvReaderConfig, bool hasNullChunk = true) + : ColumnChunk(common::LogicalType(common::LogicalTypeID::BOOL), std::move(csvReaderConfig), + hasNullChunk) {} void append(common::ValueVector* vector, common::offset_t startPosInChunk) final; @@ -237,8 +238,9 @@ class NullColumnChunk : public BoolColumnChunk { class FixedListColumnChunk : public ColumnChunk { public: - FixedListColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription) - : ColumnChunk(std::move(dataType), copyDescription, true /* hasNullChunk */) {} + FixedListColumnChunk( + common::LogicalType dataType, std::unique_ptr csvReaderConfig) + : ColumnChunk(std::move(dataType), std::move(csvReaderConfig), true /* hasNullChunk */) {} void append(ColumnChunk* other, common::offset_t startPosInOtherChunk, common::offset_t startPosInChunk, uint32_t numValuesToAppend) final; @@ -262,7 +264,7 @@ class SerialColumnChunk : public ColumnChunk { struct ColumnChunkFactory { static std::unique_ptr createColumnChunk( - const common::LogicalType& dataType, common::CopyDescription* copyDescription = nullptr); + const common::LogicalType& dataType, common::CSVReaderConfig* csvReaderConfig = nullptr); }; template<> diff --git a/src/include/storage/copier/node_group.h b/src/include/storage/copier/node_group.h index c5de1e53d1..ee5fa66343 100644 --- a/src/include/storage/copier/node_group.h +++ b/src/include/storage/copier/node_group.h @@ -12,8 +12,7 @@ class NodeTable; class NodeGroup { public: - explicit NodeGroup( - catalog::TableSchema* schema, common::CopyDescription* copyDescription = nullptr); + explicit NodeGroup(catalog::TableSchema* schema, common::CSVReaderConfig* csvReaderConfig); explicit NodeGroup(NodeTable* table); inline void setNodeGroupIdx(uint64_t nodeGroupIdx_) { this->nodeGroupIdx = nodeGroupIdx_; } diff --git a/src/include/storage/copier/string_column_chunk.h b/src/include/storage/copier/string_column_chunk.h index ea35c1e625..6a15eda39f 100644 --- a/src/include/storage/copier/string_column_chunk.h +++ b/src/include/storage/copier/string_column_chunk.h @@ -9,7 +9,8 @@ namespace storage { class StringColumnChunk : public ColumnChunk { public: - StringColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription); + StringColumnChunk( + common::LogicalType dataType, std::unique_ptr csvReaderConfig); void resetToEmpty() final; void append(common::ValueVector* vector, common::offset_t startPosInChunk) final; diff --git a/src/include/storage/copier/struct_column_chunk.h b/src/include/storage/copier/struct_column_chunk.h index 41a121fb6d..9f37f35239 100644 --- a/src/include/storage/copier/struct_column_chunk.h +++ b/src/include/storage/copier/struct_column_chunk.h @@ -7,7 +7,8 @@ namespace storage { class StructColumnChunk : public ColumnChunk { public: - StructColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription); + StructColumnChunk( + common::LogicalType dataType, std::unique_ptr csvReaderConfig); protected: void append( diff --git a/src/include/storage/copier/var_list_column_chunk.h b/src/include/storage/copier/var_list_column_chunk.h index 2d72f1eb64..f0aedc5cc6 100644 --- a/src/include/storage/copier/var_list_column_chunk.h +++ b/src/include/storage/copier/var_list_column_chunk.h @@ -3,8 +3,6 @@ #include "arrow/array/array_nested.h" #include "storage/copier/column_chunk.h" -using namespace kuzu::common; - namespace kuzu { namespace storage { @@ -13,13 +11,14 @@ struct VarListDataColumnChunk { uint64_t capacity; explicit VarListDataColumnChunk(std::unique_ptr dataChunk) - : dataColumnChunk{std::move(dataChunk)}, capacity{StorageConstants::NODE_GROUP_SIZE} {} + : dataColumnChunk{std::move(dataChunk)}, capacity{ + common::StorageConstants::NODE_GROUP_SIZE} {} void reset(); void resizeBuffer(uint64_t numValues); - inline void append(ValueVector* dataVector) const { + inline void append(common::ValueVector* dataVector) const { dataColumnChunk->append(dataVector, dataColumnChunk->getNumValues()); } @@ -32,7 +31,8 @@ struct VarListDataColumnChunk { class VarListColumnChunk : public ColumnChunk { public: - VarListColumnChunk(LogicalType dataType, CopyDescription* copyDescription); + VarListColumnChunk( + common::LogicalType dataType, std::unique_ptr csvReaderConfig); inline ColumnChunk* getDataColumnChunk() const { return varListDataColumnChunk.dataColumnChunk.get(); @@ -54,17 +54,18 @@ class VarListColumnChunk : public ColumnChunk { return varListDataColumnChunk.dataColumnChunk->getNumPages() + ColumnChunk::getNumPages(); } - void append(arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) override; + void append( + arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend) override; void append(ColumnChunk* other, common::offset_t startPosInOtherChunk, common::offset_t startPosInChunk, uint32_t numValuesToAppend) final; void copyVarListFromArrowString( - arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend); + arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend); template void copyVarListFromArrowList( - arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) { + arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend) { auto listArray = (T*)array; auto dataChunkOffsetToAppend = varListDataColumnChunk.getNumValues(); auto curListOffset = varListDataColumnChunk.getNumValues(); @@ -91,7 +92,7 @@ class VarListColumnChunk : public ColumnChunk { return getListOffset(offset + 1) - getListOffset(offset); } - inline offset_t getListOffset(common::offset_t offset) const { + inline common::offset_t getListOffset(common::offset_t offset) const { return offset == 0 ? 0 : getValue(offset - 1); } }; diff --git a/src/include/storage/in_mem_storage_structure/in_mem_column.h b/src/include/storage/in_mem_storage_structure/in_mem_column.h index 749efe1ded..af861fd600 100644 --- a/src/include/storage/in_mem_storage_structure/in_mem_column.h +++ b/src/include/storage/in_mem_storage_structure/in_mem_column.h @@ -15,20 +15,21 @@ class InMemColumn { void flushChunk(InMemColumnChunk* chunk); std::unique_ptr createInMemColumnChunk(common::offset_t startNodeOffset, - common::offset_t endNodeOffset, std::unique_ptr copyDescription) { + common::offset_t endNodeOffset, common::CSVReaderConfig* csvReaderConfig) { + auto csvReaderConfigCopy = csvReaderConfig ? csvReaderConfig->copy() : nullptr; switch (dataType.getPhysicalType()) { case common::PhysicalTypeID::STRING: case common::PhysicalTypeID::VAR_LIST: { return std::make_unique(dataType, startNodeOffset, - endNodeOffset, std::move(copyDescription), inMemOverflowFile.get()); + endNodeOffset, std::move(csvReaderConfigCopy), inMemOverflowFile.get()); } case common::PhysicalTypeID::FIXED_LIST: { return std::make_unique( - dataType, startNodeOffset, endNodeOffset, std::move(copyDescription)); + dataType, startNodeOffset, endNodeOffset, std::move(csvReaderConfigCopy)); } default: { return std::make_unique( - dataType, startNodeOffset, endNodeOffset, std::move(copyDescription)); + dataType, startNodeOffset, endNodeOffset, std::move(csvReaderConfigCopy)); } } } diff --git a/src/include/storage/in_mem_storage_structure/in_mem_column_chunk.h b/src/include/storage/in_mem_storage_structure/in_mem_column_chunk.h index 6d5c541a54..0bb1d6830b 100644 --- a/src/include/storage/in_mem_storage_structure/in_mem_column_chunk.h +++ b/src/include/storage/in_mem_storage_structure/in_mem_column_chunk.h @@ -19,7 +19,7 @@ struct PropertyCopyState { class InMemColumnChunk { public: InMemColumnChunk(common::LogicalType dataType, common::offset_t startNodeOffset, - common::offset_t endNodeOffset, std::unique_ptr copyDescription, + common::offset_t endNodeOffset, std::unique_ptr csvReaderConfig, bool requireNullBits = true); virtual ~InMemColumnChunk() = default; @@ -81,16 +81,16 @@ class InMemColumnChunk { std::uint64_t numBytes; std::unique_ptr buffer; std::unique_ptr nullChunk; - std::unique_ptr copyDescription; + std::unique_ptr csvReaderConfig; }; class InMemColumnChunkWithOverflow : public InMemColumnChunk { public: InMemColumnChunkWithOverflow(common::LogicalType dataType, common::offset_t startNodeOffset, - common::offset_t endNodeOffset, std::unique_ptr copyDescription, + common::offset_t endNodeOffset, std::unique_ptr csvReaderConfig, InMemOverflowFile* inMemOverflowFile) : InMemColumnChunk{std::move(dataType), startNodeOffset, endNodeOffset, - std::move(copyDescription)}, + std::move(csvReaderConfig)}, inMemOverflowFile{inMemOverflowFile}, blobBuffer{std::make_unique( common::BufferPoolConstants::PAGE_4KB_SIZE)} {} @@ -123,7 +123,7 @@ class InMemColumnChunkWithOverflow : public InMemColumnChunk { class InMemFixedListColumnChunk : public InMemColumnChunk { public: InMemFixedListColumnChunk(common::LogicalType dataType, common::offset_t startNodeOffset, - common::offset_t endNodeOffset, std::unique_ptr copyDescription); + common::offset_t endNodeOffset, std::unique_ptr csvReaderConfig); void flush(common::FileInfo* walFileInfo) override; diff --git a/src/include/storage/in_mem_storage_structure/in_mem_lists.h b/src/include/storage/in_mem_storage_structure/in_mem_lists.h index dc66c7bc65..40a90619b8 100644 --- a/src/include/storage/in_mem_storage_structure/in_mem_lists.h +++ b/src/include/storage/in_mem_storage_structure/in_mem_lists.h @@ -37,9 +37,9 @@ class InMemLists { public: InMemLists(std::string fName, common::LogicalType dataType, uint64_t numBytesForElement, uint64_t numNodes, std::shared_ptr listHeadersBuilder, - std::unique_ptr copyDescription, bool hasNullBytes) + std::unique_ptr csvReaderConfig, bool hasNullBytes) : InMemLists{std::move(fName), numBytesForElement, std::move(dataType), numNodes, - std::move(copyDescription), hasNullBytes} { + std::move(csvReaderConfig), hasNullBytes} { this->listHeadersBuilder = std::move(listHeadersBuilder); } virtual ~InMemLists() = default; @@ -75,7 +75,7 @@ class InMemLists { protected: InMemLists(std::string fName, uint64_t numBytesForElement, common::LogicalType dataType, - uint64_t numNodes, std::unique_ptr copyDescription, + uint64_t numNodes, std::unique_ptr csvReaderConfig, bool hasNullBytes); private: @@ -111,7 +111,7 @@ class InMemLists { uint64_t numElementsInAPage; std::unique_ptr listsMetadataBuilder; std::shared_ptr listHeadersBuilder; - std::unique_ptr copyDescription; + std::unique_ptr csvReaderConfig; }; class InMemRelIDLists : public InMemLists { @@ -127,7 +127,7 @@ class InMemListsWithOverflow : public InMemLists { protected: InMemListsWithOverflow(std::string fName, common::LogicalType dataType, uint64_t numNodes, std::shared_ptr listHeadersBuilder, - std::unique_ptr copyDescription); + std::unique_ptr csvReaderConfig); void copyArrowArray(arrow::Array* boundNodeOffsets, arrow::Array* posInRelLists, arrow::Array* array, PropertyCopyState* copyState) final; @@ -182,16 +182,16 @@ class InMemListLists : public InMemListsWithOverflow { public: InMemListLists(std::string fName, common::LogicalType dataType, uint64_t numNodes, std::shared_ptr listHeadersBuilder, - std::unique_ptr copyDescription) + std::unique_ptr csvReaderConfig) : InMemListsWithOverflow{std::move(fName), std::move(dataType), numNodes, - std::move(listHeadersBuilder), std::move(copyDescription)} {}; + std::move(listHeadersBuilder), std::move(csvReaderConfig)} {}; }; class InMemListsFactory { public: static std::unique_ptr getInMemPropertyLists(const std::string& fName, const common::LogicalType& dataType, uint64_t numNodes, - std::unique_ptr copyDescription, + common::CSVReaderConfig* csvReaderConfig, std::shared_ptr listHeadersBuilder = nullptr); }; diff --git a/src/processor/map/map_copy_from.cpp b/src/processor/map/map_copy_from.cpp index 79a172848e..b220b662b1 100644 --- a/src/processor/map/map_copy_from.cpp +++ b/src/processor/map/map_copy_from.cpp @@ -95,7 +95,7 @@ std::unique_ptr PlanMapper::createCopyRelColumnsOrLists( static std::unique_ptr initializeDirectedInMemRelData( common::RelDataDirection direction, RelTableSchema* schema, NodesStore& nodesStore, - const std::string& outputDirectory, const CopyDescription* copyDescription) { + const std::string& outputDirectory, CSVReaderConfig* csvReaderConfig) { auto directedInMemRelData = std::make_unique(); auto boundTableID = schema->getBoundTableID(direction); auto numNodes = @@ -109,7 +109,7 @@ static std::unique_ptr initializeDirectedInMemRelData( schema->tableID, direction, DBFileType::ORIGINAL), LogicalType(LogicalTypeID::INTERNAL_ID)); relColumns->adjColumnChunk = - relColumns->adjColumn->createInMemColumnChunk(0, numNodes - 1, copyDescription->copy()); + relColumns->adjColumn->createInMemColumnChunk(0, numNodes - 1, csvReaderConfig); for (auto i = 0u; i < schema->getNumProperties(); ++i) { auto propertyID = schema->properties[i]->getPropertyID(); auto propertyDataType = schema->properties[i]->getDataType(); @@ -119,7 +119,7 @@ static std::unique_ptr initializeDirectedInMemRelData( propertyID, std::make_unique(fName, *propertyDataType)); relColumns->propertyColumnChunks.emplace( propertyID, relColumns->propertyColumns.at(propertyID) - ->createInMemColumnChunk(0, numNodes - 1, copyDescription->copy())); + ->createInMemColumnChunk(0, numNodes - 1, csvReaderConfig)); } directedInMemRelData->setColumns(std::move(relColumns)); } else { @@ -136,7 +136,7 @@ static std::unique_ptr initializeDirectedInMemRelData( direction, property->getPropertyID(), DBFileType::ORIGINAL); relLists->propertyLists.emplace(property->getPropertyID(), InMemListsFactory::getInMemPropertyLists(fName, *property->getDataType(), numNodes, - copyDescription->copy(), relLists->adjList->getListHeadersBuilder())); + csvReaderConfig, relLists->adjList->getListHeadersBuilder())); } directedInMemRelData->setRelLists(std::move(relLists)); } @@ -151,10 +151,10 @@ std::unique_ptr PlanMapper::mapCopyRelFrom( auto tableSchema = reinterpret_cast(copyFromInfo->tableSchema); auto fwdRelData = initializeDirectedInMemRelData(RelDataDirection::FWD, tableSchema, storageManager.getNodesStore(), storageManager.getDirectory(), - copyFromInfo->fileScanInfo->copyDesc.get()); + copyFromInfo->fileScanInfo->copyDesc->csvReaderConfig.get()); auto bwdRelData = initializeDirectedInMemRelData(RelDataDirection::BWD, tableSchema, storageManager.getNodesStore(), storageManager.getDirectory(), - copyFromInfo->fileScanInfo->copyDesc.get()); + copyFromInfo->fileScanInfo->copyDesc->csvReaderConfig.get()); auto copyRelSharedState = std::make_shared(tableSchema->tableID, &storageManager.getRelsStore().getRelsStatistics(), std::move(fwdRelData), std::move(bwdRelData), memoryManager); diff --git a/src/storage/copier/column_chunk.cpp b/src/storage/copier/column_chunk.cpp index 9abd3b9414..e2d1dd938b 100644 --- a/src/storage/copier/column_chunk.cpp +++ b/src/storage/copier/column_chunk.cpp @@ -14,9 +14,10 @@ using namespace kuzu::transaction; namespace kuzu { namespace storage { -ColumnChunk::ColumnChunk(LogicalType dataType, CopyDescription* copyDescription, bool hasNullChunk) +ColumnChunk::ColumnChunk( + LogicalType dataType, std::unique_ptr csvReaderConfig, bool hasNullChunk) : dataType{std::move(dataType)}, numBytesPerValue{getDataTypeSizeInChunk(this->dataType)}, - copyDescription{copyDescription}, numValues{0} { + csvReaderConfig{std::move(csvReaderConfig)}, numValues{0} { if (hasNullChunk) { nullChunk = std::make_unique(); } @@ -457,11 +458,12 @@ void FixedListColumnChunk::write(const Value& fixedListVal, uint64_t posToWrite) } std::unique_ptr ColumnChunkFactory::createColumnChunk( - const LogicalType& dataType, CopyDescription* copyDescription) { + const LogicalType& dataType, CSVReaderConfig* csvReaderConfig) { + auto csvReaderConfigCopy = csvReaderConfig ? csvReaderConfig->copy() : nullptr; std::unique_ptr chunk; switch (dataType.getPhysicalType()) { case PhysicalTypeID::BOOL: { - chunk = std::make_unique(copyDescription); + chunk = std::make_unique(std::move(csvReaderConfigCopy)); } break; case PhysicalTypeID::INT64: case PhysicalTypeID::INT32: @@ -473,20 +475,20 @@ std::unique_ptr ColumnChunkFactory::createColumnChunk( if (dataType.getLogicalTypeID() == LogicalTypeID::SERIAL) { chunk = std::make_unique(); } else { - chunk = std::make_unique(dataType, copyDescription); + chunk = std::make_unique(dataType, std::move(csvReaderConfigCopy)); } } break; case PhysicalTypeID::FIXED_LIST: { - chunk = std::make_unique(dataType, copyDescription); + chunk = std::make_unique(dataType, std::move(csvReaderConfigCopy)); } break; case PhysicalTypeID::STRING: { - chunk = std::make_unique(dataType, copyDescription); + chunk = std::make_unique(dataType, std::move(csvReaderConfigCopy)); } break; case PhysicalTypeID::VAR_LIST: { - chunk = std::make_unique(dataType, copyDescription); + chunk = std::make_unique(dataType, std::move(csvReaderConfigCopy)); } break; case PhysicalTypeID::STRUCT: { - chunk = std::make_unique(dataType, copyDescription); + chunk = std::make_unique(dataType, std::move(csvReaderConfigCopy)); } break; default: { throw NotImplementedException("ColumnChunkFactory::createColumnChunk for data type " + @@ -510,8 +512,8 @@ void ColumnChunk::setValueFromString(const char* value, uint64_t length, u // Fixed list template<> void ColumnChunk::setValueFromString(const char* value, uint64_t length, uint64_t pos) { - auto fixedListVal = TableCopyUtils::getArrowFixedList( - value, 1, length - 2, dataType, *copyDescription->csvReaderConfig); + auto fixedListVal = + TableCopyUtils::getArrowFixedList(value, 1, length - 2, dataType, *csvReaderConfig); memcpy(buffer.get() + pos * numBytesPerValue, fixedListVal.get(), numBytesPerValue); } diff --git a/src/storage/copier/node_group.cpp b/src/storage/copier/node_group.cpp index 20b8869364..3ca658c073 100644 --- a/src/storage/copier/node_group.cpp +++ b/src/storage/copier/node_group.cpp @@ -11,11 +11,11 @@ using namespace kuzu::transaction; namespace kuzu { namespace storage { -NodeGroup::NodeGroup(TableSchema* schema, CopyDescription* copyDescription) +NodeGroup::NodeGroup(TableSchema* schema, CSVReaderConfig* csvReaderConfig) : nodeGroupIdx{UINT64_MAX}, numNodes{0} { for (auto& property : schema->properties) { chunks[property->getPropertyID()] = - ColumnChunkFactory::createColumnChunk(*property->getDataType(), copyDescription); + ColumnChunkFactory::createColumnChunk(*property->getDataType(), csvReaderConfig); } } diff --git a/src/storage/copier/string_column_chunk.cpp b/src/storage/copier/string_column_chunk.cpp index b4b11082b6..775d4ee818 100644 --- a/src/storage/copier/string_column_chunk.cpp +++ b/src/storage/copier/string_column_chunk.cpp @@ -10,8 +10,9 @@ using namespace kuzu::common; namespace kuzu { namespace storage { -StringColumnChunk::StringColumnChunk(LogicalType dataType, CopyDescription* copyDescription) - : ColumnChunk{std::move(dataType), copyDescription} { +StringColumnChunk::StringColumnChunk( + LogicalType dataType, std::unique_ptr csvReaderConfig) + : ColumnChunk{std::move(dataType), std::move(csvReaderConfig)} { overflowFile = std::make_unique(); overflowCursor.pageIdx = 0; overflowCursor.offsetInPage = 0; diff --git a/src/storage/copier/struct_column_chunk.cpp b/src/storage/copier/struct_column_chunk.cpp index 688c9ca999..64d8d52142 100644 --- a/src/storage/copier/struct_column_chunk.cpp +++ b/src/storage/copier/struct_column_chunk.cpp @@ -14,12 +14,14 @@ using namespace kuzu::common; namespace kuzu { namespace storage { -StructColumnChunk::StructColumnChunk(LogicalType dataType, CopyDescription* copyDescription) - : ColumnChunk{std::move(dataType), copyDescription} { +StructColumnChunk::StructColumnChunk( + LogicalType dataType, std::unique_ptr csvReaderConfig) + : ColumnChunk{std::move(dataType), std::move(csvReaderConfig)} { auto fieldTypes = StructType::getFieldTypes(&this->dataType); childrenChunks.resize(fieldTypes.size()); for (auto i = 0u; i < fieldTypes.size(); i++) { - childrenChunks[i] = ColumnChunkFactory::createColumnChunk(*fieldTypes[i], copyDescription); + childrenChunks[i] = + ColumnChunkFactory::createColumnChunk(*fieldTypes[i], this->csvReaderConfig.get()); } } @@ -74,8 +76,8 @@ void StructColumnChunk::setStructFields(const char* value, uint64_t length, uint switch (dataType.getLogicalTypeID()) { case LogicalTypeID::STRUCT: { auto structString = std::string(value, length).substr(1, length - 2); - auto structFieldIdxAndValuePairs = TableCopyUtils::parseStructFieldNameAndValues( - dataType, structString, *copyDescription->csvReaderConfig); + auto structFieldIdxAndValuePairs = + TableCopyUtils::parseStructFieldNameAndValues(dataType, structString, *csvReaderConfig); for (auto& fieldIdxAndValue : structFieldIdxAndValuePairs) { setValueToStructField(pos, fieldIdxAndValue.fieldValue, fieldIdxAndValue.fieldIdx); } diff --git a/src/storage/copier/var_list_column_chunk.cpp b/src/storage/copier/var_list_column_chunk.cpp index 1d1741bf57..d5ca4cc7a2 100644 --- a/src/storage/copier/var_list_column_chunk.cpp +++ b/src/storage/copier/var_list_column_chunk.cpp @@ -4,6 +4,8 @@ #include "common/types/value/nested.h" #include "storage/copier/table_copy_utils.h" +using namespace kuzu::common; + namespace kuzu { namespace storage { @@ -21,10 +23,11 @@ void VarListDataColumnChunk::resizeBuffer(uint64_t numValues) { dataColumnChunk->resize(capacity); } -VarListColumnChunk::VarListColumnChunk(LogicalType dataType, CopyDescription* copyDescription) - : ColumnChunk{std::move(dataType), copyDescription, true /* hasNullChunk */}, +VarListColumnChunk::VarListColumnChunk( + LogicalType dataType, std::unique_ptr csvReaderConfig) + : ColumnChunk{std::move(dataType), std::move(csvReaderConfig), true /* hasNullChunk */}, varListDataColumnChunk{ColumnChunkFactory::createColumnChunk( - *VarListType::getChildType(&this->dataType), copyDescription)} { + *VarListType::getChildType(&this->dataType), this->csvReaderConfig.get())} { assert(this->dataType.getPhysicalType() == PhysicalTypeID::VAR_LIST); } @@ -82,7 +85,7 @@ void VarListColumnChunk::copyVarListFromArrowString( } auto value = stringArray->GetView(i); auto listVal = TableCopyUtils::getVarListValue( - value.data(), 1, value.size() - 2, dataType, *copyDescription->csvReaderConfig); + value.data(), 1, value.size() - 2, dataType, *csvReaderConfig); write(*listVal, posInChunk); } } else { @@ -90,7 +93,7 @@ void VarListColumnChunk::copyVarListFromArrowString( auto value = stringArray->GetView(i); auto posInChunk = startPosInChunk + i; auto listVal = TableCopyUtils::getVarListValue( - value.data(), 1, value.size() - 2, dataType, *copyDescription->csvReaderConfig); + value.data(), 1, value.size() - 2, dataType, *csvReaderConfig); write(*listVal, posInChunk); } } @@ -109,8 +112,8 @@ void VarListColumnChunk::write(const Value& listVal, uint64_t posToWrite) { } void VarListColumnChunk::setValueFromString(const char* value, uint64_t length, uint64_t pos) { - auto listVal = TableCopyUtils::getVarListValue( - value, 1, length - 2, dataType, *copyDescription->csvReaderConfig); + auto listVal = + TableCopyUtils::getVarListValue(value, 1, length - 2, dataType, *csvReaderConfig); write(*listVal, pos); } diff --git a/src/storage/in_mem_storage_structure/in_mem_column_chunk.cpp b/src/storage/in_mem_storage_structure/in_mem_column_chunk.cpp index dfb63d65b9..02a1c214d2 100644 --- a/src/storage/in_mem_storage_structure/in_mem_column_chunk.cpp +++ b/src/storage/in_mem_storage_structure/in_mem_column_chunk.cpp @@ -13,14 +13,14 @@ namespace kuzu { namespace storage { InMemColumnChunk::InMemColumnChunk(LogicalType dataType, offset_t startNodeOffset, - offset_t endNodeOffset, std::unique_ptr copyDescription, bool requireNullBits) - : dataType{std::move(dataType)}, startNodeOffset{startNodeOffset}, copyDescription{std::move( - copyDescription)} { + offset_t endNodeOffset, std::unique_ptr csvReaderConfig, bool requireNullBits) + : dataType{std::move(dataType)}, startNodeOffset{startNodeOffset}, csvReaderConfig{std::move( + csvReaderConfig)} { numBytesPerValue = getDataTypeSizeInColumn(this->dataType); numBytes = numBytesPerValue * (endNodeOffset - startNodeOffset + 1); buffer = std::make_unique(numBytes); if (requireNullBits) { - auto copyDescCloned = this->copyDescription ? this->copyDescription->copy() : nullptr; + auto copyDescCloned = this->csvReaderConfig ? this->csvReaderConfig->copy() : nullptr; nullChunk = std::make_unique(LogicalType{LogicalTypeID::BOOL}, startNodeOffset, endNodeOffset, std::move(copyDescCloned), false /* hasNull */); memset(nullChunk->getData(), true, nullChunk->getNumBytes()); @@ -369,16 +369,16 @@ void InMemColumnChunkWithOverflow::setValWithOverflow( template<> void InMemColumnChunkWithOverflow::setValWithOverflow( PageByteCursor& overflowCursor, const char* value, uint64_t length, uint64_t pos) { - auto varListVal = TableCopyUtils::getVarListValue( - value, 1, length - 2, dataType, *copyDescription->csvReaderConfig); + auto varListVal = + TableCopyUtils::getVarListValue(value, 1, length - 2, dataType, *csvReaderConfig); auto val = inMemOverflowFile->copyList(*varListVal, overflowCursor); setValue(val, pos); } InMemFixedListColumnChunk::InMemFixedListColumnChunk(LogicalType dataType, offset_t startNodeOffset, - offset_t endNodeOffset, std::unique_ptr copyDescription) + offset_t endNodeOffset, std::unique_ptr csvReaderConfig) : InMemColumnChunk{ - std::move(dataType), startNodeOffset, endNodeOffset, std::move(copyDescription)} { + std::move(dataType), startNodeOffset, endNodeOffset, std::move(csvReaderConfig)} { numElementsInAPage = PageUtils::getNumElementsInAPage(numBytesPerValue, false /* hasNull */); auto startNodeOffsetCursor = PageUtils::getPageByteCursorForPos(startNodeOffset, numElementsInAPage, numBytesPerValue); @@ -426,8 +426,8 @@ void InMemColumnChunk::setValueFromString(const char* value, uint64_t leng template<> void InMemColumnChunk::setValueFromString( const char* value, uint64_t length, uint64_t pos) { - auto fixedListVal = TableCopyUtils::getArrowFixedList( - value, 1, length - 2, dataType, *copyDescription->csvReaderConfig); + auto fixedListVal = + TableCopyUtils::getArrowFixedList(value, 1, length - 2, dataType, *csvReaderConfig); // TODO(Guodong): Keep value size as a class field. memcpy(buffer.get() + pos * storage::StorageUtils::getDataTypeSize(dataType), fixedListVal.get(), storage::StorageUtils::getDataTypeSize(dataType)); diff --git a/src/storage/in_mem_storage_structure/in_mem_lists.cpp b/src/storage/in_mem_storage_structure/in_mem_lists.cpp index 68a818f24d..598102d020 100644 --- a/src/storage/in_mem_storage_structure/in_mem_lists.cpp +++ b/src/storage/in_mem_storage_structure/in_mem_lists.cpp @@ -24,9 +24,9 @@ PageElementCursor InMemLists::calcPageElementCursor( } InMemLists::InMemLists(std::string fName, uint64_t numBytesForElement, LogicalType dataType, - uint64_t numNodes, std::unique_ptr copyDescription, bool hasNullBytes) + uint64_t numNodes, std::unique_ptr csvReaderConfig, bool hasNullBytes) : fName{std::move(fName)}, dataType{std::move(dataType)}, - numBytesForElement{numBytesForElement}, copyDescription{std::move(copyDescription)} { + numBytesForElement{numBytesForElement}, csvReaderConfig{std::move(csvReaderConfig)} { listsMetadataBuilder = make_unique(this->fName); auto numChunks = StorageUtils::getListChunkIdx(numNodes); if (0 != (numNodes & (ListsMetadataConstants::LISTS_CHUNK_SIZE - 1))) { @@ -168,8 +168,8 @@ void InMemLists::setValueFromString( template<> void InMemLists::setValueFromString( offset_t nodeOffset, uint64_t pos, const char* val, uint64_t length) { - auto fixedListVal = TableCopyUtils::getArrowFixedList( - val, 1, length - 2, dataType, *copyDescription->csvReaderConfig); + auto fixedListVal = + TableCopyUtils::getArrowFixedList(val, 1, length - 2, dataType, *csvReaderConfig); setValue(nodeOffset, pos, fixedListVal.get()); } @@ -330,9 +330,9 @@ void InMemAdjLists::saveToFile() { InMemListsWithOverflow::InMemListsWithOverflow(std::string fName, LogicalType dataType, uint64_t numNodes, std::shared_ptr listHeadersBuilder, - std::unique_ptr copyDescription) + std::unique_ptr csvReaderConfig) : InMemLists{std::move(fName), StorageUtils::getDataTypeSize(dataType), std::move(dataType), - numNodes, std::move(copyDescription), true}, + numNodes, std::move(csvReaderConfig), true}, blobBuffer{std::make_unique(BufferPoolConstants::PAGE_4KB_SIZE)} { assert(this->dataType.getLogicalTypeID() == LogicalTypeID::STRING || this->dataType.getLogicalTypeID() == LogicalTypeID::VAR_LIST); @@ -422,8 +422,7 @@ template<> void InMemListsWithOverflow::setValueFromStringWithOverflow( PageByteCursor& overflowCursor, offset_t nodeOffset, uint64_t pos, const char* val, uint64_t length) { - auto varList = TableCopyUtils::getVarListValue( - val, 1, length - 2, dataType, *copyDescription->csvReaderConfig); + auto varList = TableCopyUtils::getVarListValue(val, 1, length - 2, dataType, *csvReaderConfig); auto listVal = overflowInMemFile->copyList(*varList, overflowCursor); setValue(nodeOffset, pos, (uint8_t*)&listVal); } @@ -434,9 +433,9 @@ void InMemListsWithOverflow::saveToFile() { } std::unique_ptr InMemListsFactory::getInMemPropertyLists(const std::string& fName, - const LogicalType& dataType, uint64_t numNodes, - std::unique_ptr copyDescription, + const LogicalType& dataType, uint64_t numNodes, common::CSVReaderConfig* csvReaderConfig, std::shared_ptr listHeadersBuilder) { + auto csvReaderConfigCopy = csvReaderConfig ? csvReaderConfig->copy() : nullptr; switch (dataType.getLogicalTypeID()) { case LogicalTypeID::INT64: case LogicalTypeID::INT32: @@ -451,13 +450,13 @@ std::unique_ptr InMemListsFactory::getInMemPropertyLists(const std:: case LogicalTypeID::FIXED_LIST: return make_unique(fName, dataType, storage::StorageUtils::getDataTypeSize(dataType), numNodes, - std::move(listHeadersBuilder), std::move(copyDescription), true /* hasNULLBytes */); + std::move(listHeadersBuilder), std::move(csvReaderConfigCopy), true /* hasNULLBytes */); case LogicalTypeID::BLOB: case LogicalTypeID::STRING: return make_unique(fName, numNodes, std::move(listHeadersBuilder)); case LogicalTypeID::VAR_LIST: - return make_unique( - fName, dataType, numNodes, std::move(listHeadersBuilder), std::move(copyDescription)); + return make_unique(fName, dataType, numNodes, std::move(listHeadersBuilder), + std::move(csvReaderConfigCopy)); case LogicalTypeID::INTERNAL_ID: return make_unique(fName, numNodes, std::move(listHeadersBuilder)); default: