Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace CopyDescription with CSVReaderConfig for storage classes. #2046

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,28 @@ namespace kuzu {
namespace common {

struct CSVReaderConfig {
char escapeChar;
char delimiter;
char quoteChar;
char listBeginChar;
char listEndChar;
bool hasHeader;

CSVReaderConfig()
: escapeChar{CopyConstants::DEFAULT_CSV_ESCAPE_CHAR},
delimiter{CopyConstants::DEFAULT_CSV_DELIMITER},
quoteChar{CopyConstants::DEFAULT_CSV_QUOTE_CHAR},
listBeginChar{CopyConstants::DEFAULT_CSV_LIST_BEGIN_CHAR},
listEndChar{CopyConstants::DEFAULT_CSV_LIST_END_CHAR},
hasHeader{CopyConstants::DEFAULT_CSV_HAS_HEADER} {}
CSVReaderConfig(const CSVReaderConfig& other)
: escapeChar{other.escapeChar}, delimiter{other.delimiter}, quoteChar{other.quoteChar},
listBeginChar{other.listBeginChar},
listEndChar{other.listEndChar}, hasHeader{other.hasHeader} {}

char escapeChar;
char delimiter;
char quoteChar;
char listBeginChar;
char listEndChar;
bool hasHeader;
inline std::unique_ptr<CSVReaderConfig> copy() const {
return std::make_unique<CSVReaderConfig>(*this);
}
};

struct CopyDescription {
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/persistent/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ class CopyNode : public Sink {
for (auto& arrowColumnPos : copyNodeInfo.dataColumnPoses) {
dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
localNodeGroup =
std::make_unique<storage::NodeGroup>(sharedState->tableSchema, &sharedState->copyDesc);
localNodeGroup = std::make_unique<storage::NodeGroup>(
sharedState->tableSchema, sharedState->copyDesc.csvReaderConfig.get());
}

void initGlobalStateInternal(ExecutionContext* context) final;
Expand Down
20 changes: 11 additions & 9 deletions src/include/storage/copier/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ class ColumnChunk {

// ColumnChunks must be initialized after construction, so this constructor should only be used
// through the ColumnChunkFactory
explicit ColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription,
bool hasNullChunk = true);
explicit ColumnChunk(common::LogicalType dataType,
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool hasNullChunk = true);
virtual ~ColumnChunk() = default;

template<typename T>
Expand Down Expand Up @@ -158,7 +158,7 @@ class ColumnChunk {
std::unique_ptr<uint8_t[]> buffer;
std::unique_ptr<NullColumnChunk> nullChunk;
std::vector<std::unique_ptr<ColumnChunk>> childrenChunks;
const common::CopyDescription* copyDescription;
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig;
uint64_t numValues;
};

Expand All @@ -176,9 +176,10 @@ inline bool ColumnChunk::getValue(common::offset_t pos) const {

class BoolColumnChunk : public ColumnChunk {
public:
BoolColumnChunk(common::CopyDescription* copyDescription, bool hasNullChunk = true)
: ColumnChunk(
common::LogicalType(common::LogicalTypeID::BOOL), copyDescription, hasNullChunk) {}
BoolColumnChunk(
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool hasNullChunk = true)
: ColumnChunk(common::LogicalType(common::LogicalTypeID::BOOL), std::move(csvReaderConfig),
hasNullChunk) {}

void append(common::ValueVector* vector, common::offset_t startPosInChunk) final;

Expand Down Expand Up @@ -237,8 +238,9 @@ class NullColumnChunk : public BoolColumnChunk {

class FixedListColumnChunk : public ColumnChunk {
public:
FixedListColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription)
: ColumnChunk(std::move(dataType), copyDescription, true /* hasNullChunk */) {}
FixedListColumnChunk(
common::LogicalType dataType, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig)
: ColumnChunk(std::move(dataType), std::move(csvReaderConfig), true /* hasNullChunk */) {}

void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;
Expand All @@ -262,7 +264,7 @@ class SerialColumnChunk : public ColumnChunk {

struct ColumnChunkFactory {
static std::unique_ptr<ColumnChunk> createColumnChunk(
const common::LogicalType& dataType, common::CopyDescription* copyDescription = nullptr);
const common::LogicalType& dataType, common::CSVReaderConfig* csvReaderConfig = nullptr);
};

template<>
Expand Down
3 changes: 1 addition & 2 deletions src/include/storage/copier/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ class NodeTable;

class NodeGroup {
public:
explicit NodeGroup(
catalog::TableSchema* schema, common::CopyDescription* copyDescription = nullptr);
explicit NodeGroup(catalog::TableSchema* schema, common::CSVReaderConfig* csvReaderConfig);
explicit NodeGroup(NodeTable* table);

inline void setNodeGroupIdx(uint64_t nodeGroupIdx_) { this->nodeGroupIdx = nodeGroupIdx_; }
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/copier/string_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ namespace storage {

class StringColumnChunk : public ColumnChunk {
public:
StringColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription);
StringColumnChunk(
common::LogicalType dataType, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig);

void resetToEmpty() final;
void append(common::ValueVector* vector, common::offset_t startPosInChunk) final;
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/copier/struct_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ namespace storage {

class StructColumnChunk : public ColumnChunk {
public:
StructColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription);
StructColumnChunk(
common::LogicalType dataType, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig);

protected:
void append(
Expand Down
19 changes: 10 additions & 9 deletions src/include/storage/copier/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
#include "arrow/array/array_nested.h"
#include "storage/copier/column_chunk.h"

using namespace kuzu::common;

namespace kuzu {
namespace storage {

Expand All @@ -13,13 +11,14 @@ struct VarListDataColumnChunk {
uint64_t capacity;

explicit VarListDataColumnChunk(std::unique_ptr<ColumnChunk> dataChunk)
: dataColumnChunk{std::move(dataChunk)}, capacity{StorageConstants::NODE_GROUP_SIZE} {}
: dataColumnChunk{std::move(dataChunk)}, capacity{
common::StorageConstants::NODE_GROUP_SIZE} {}

void reset();

void resizeBuffer(uint64_t numValues);

inline void append(ValueVector* dataVector) const {
inline void append(common::ValueVector* dataVector) const {
dataColumnChunk->append(dataVector, dataColumnChunk->getNumValues());
}

Expand All @@ -32,7 +31,8 @@ struct VarListDataColumnChunk {

class VarListColumnChunk : public ColumnChunk {
public:
VarListColumnChunk(LogicalType dataType, CopyDescription* copyDescription);
VarListColumnChunk(
common::LogicalType dataType, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig);

inline ColumnChunk* getDataColumnChunk() const {
return varListDataColumnChunk.dataColumnChunk.get();
Expand All @@ -54,17 +54,18 @@ class VarListColumnChunk : public ColumnChunk {
return varListDataColumnChunk.dataColumnChunk->getNumPages() + ColumnChunk::getNumPages();
}

void append(arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) override;
void append(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend) override;

void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;

void copyVarListFromArrowString(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend);
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

template<typename T>
void copyVarListFromArrowList(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) {
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend) {
auto listArray = (T*)array;
auto dataChunkOffsetToAppend = varListDataColumnChunk.getNumValues();
auto curListOffset = varListDataColumnChunk.getNumValues();
Expand All @@ -91,7 +92,7 @@ class VarListColumnChunk : public ColumnChunk {
return getListOffset(offset + 1) - getListOffset(offset);
}

inline offset_t getListOffset(common::offset_t offset) const {
inline common::offset_t getListOffset(common::offset_t offset) const {
return offset == 0 ? 0 : getValue<uint64_t>(offset - 1);
}
};
Expand Down
9 changes: 5 additions & 4 deletions src/include/storage/in_mem_storage_structure/in_mem_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,21 @@ class InMemColumn {
void flushChunk(InMemColumnChunk* chunk);

std::unique_ptr<InMemColumnChunk> createInMemColumnChunk(common::offset_t startNodeOffset,
common::offset_t endNodeOffset, std::unique_ptr<common::CopyDescription> copyDescription) {
common::offset_t endNodeOffset, common::CSVReaderConfig* csvReaderConfig) {
auto csvReaderConfigCopy = csvReaderConfig ? csvReaderConfig->copy() : nullptr;
switch (dataType.getPhysicalType()) {
case common::PhysicalTypeID::STRING:
case common::PhysicalTypeID::VAR_LIST: {
return std::make_unique<InMemColumnChunkWithOverflow>(dataType, startNodeOffset,
endNodeOffset, std::move(copyDescription), inMemOverflowFile.get());
endNodeOffset, std::move(csvReaderConfigCopy), inMemOverflowFile.get());
}
case common::PhysicalTypeID::FIXED_LIST: {
return std::make_unique<InMemFixedListColumnChunk>(
dataType, startNodeOffset, endNodeOffset, std::move(copyDescription));
dataType, startNodeOffset, endNodeOffset, std::move(csvReaderConfigCopy));
}
default: {
return std::make_unique<InMemColumnChunk>(
dataType, startNodeOffset, endNodeOffset, std::move(copyDescription));
dataType, startNodeOffset, endNodeOffset, std::move(csvReaderConfigCopy));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ struct PropertyCopyState {
class InMemColumnChunk {
public:
InMemColumnChunk(common::LogicalType dataType, common::offset_t startNodeOffset,
common::offset_t endNodeOffset, std::unique_ptr<common::CopyDescription> copyDescription,
common::offset_t endNodeOffset, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig,
bool requireNullBits = true);

virtual ~InMemColumnChunk() = default;
Expand Down Expand Up @@ -81,16 +81,16 @@ class InMemColumnChunk {
std::uint64_t numBytes;
std::unique_ptr<uint8_t[]> buffer;
std::unique_ptr<InMemColumnChunk> nullChunk;
std::unique_ptr<common::CopyDescription> copyDescription;
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig;
};

class InMemColumnChunkWithOverflow : public InMemColumnChunk {
public:
InMemColumnChunkWithOverflow(common::LogicalType dataType, common::offset_t startNodeOffset,
common::offset_t endNodeOffset, std::unique_ptr<common::CopyDescription> copyDescription,
common::offset_t endNodeOffset, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig,
InMemOverflowFile* inMemOverflowFile)
: InMemColumnChunk{std::move(dataType), startNodeOffset, endNodeOffset,
std::move(copyDescription)},
std::move(csvReaderConfig)},
inMemOverflowFile{inMemOverflowFile}, blobBuffer{std::make_unique<uint8_t[]>(
common::BufferPoolConstants::PAGE_4KB_SIZE)} {}

Expand Down Expand Up @@ -123,7 +123,7 @@ class InMemColumnChunkWithOverflow : public InMemColumnChunk {
class InMemFixedListColumnChunk : public InMemColumnChunk {
public:
InMemFixedListColumnChunk(common::LogicalType dataType, common::offset_t startNodeOffset,
common::offset_t endNodeOffset, std::unique_ptr<common::CopyDescription> copyDescription);
common::offset_t endNodeOffset, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig);

void flush(common::FileInfo* walFileInfo) override;

Expand Down
16 changes: 8 additions & 8 deletions src/include/storage/in_mem_storage_structure/in_mem_lists.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ class InMemLists {
public:
InMemLists(std::string fName, common::LogicalType dataType, uint64_t numBytesForElement,
uint64_t numNodes, std::shared_ptr<ListHeadersBuilder> listHeadersBuilder,
std::unique_ptr<common::CopyDescription> copyDescription, bool hasNullBytes)
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool hasNullBytes)
: InMemLists{std::move(fName), numBytesForElement, std::move(dataType), numNodes,
std::move(copyDescription), hasNullBytes} {
std::move(csvReaderConfig), hasNullBytes} {
this->listHeadersBuilder = std::move(listHeadersBuilder);
}
virtual ~InMemLists() = default;
Expand Down Expand Up @@ -75,7 +75,7 @@ class InMemLists {

protected:
InMemLists(std::string fName, uint64_t numBytesForElement, common::LogicalType dataType,
uint64_t numNodes, std::unique_ptr<common::CopyDescription> copyDescription,
uint64_t numNodes, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig,
bool hasNullBytes);

private:
Expand Down Expand Up @@ -111,7 +111,7 @@ class InMemLists {
uint64_t numElementsInAPage;
std::unique_ptr<ListsMetadataBuilder> listsMetadataBuilder;
std::shared_ptr<ListHeadersBuilder> listHeadersBuilder;
std::unique_ptr<common::CopyDescription> copyDescription;
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig;
};

class InMemRelIDLists : public InMemLists {
Expand All @@ -127,7 +127,7 @@ class InMemListsWithOverflow : public InMemLists {
protected:
InMemListsWithOverflow(std::string fName, common::LogicalType dataType, uint64_t numNodes,
std::shared_ptr<ListHeadersBuilder> listHeadersBuilder,
std::unique_ptr<common::CopyDescription> copyDescription);
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig);

void copyArrowArray(arrow::Array* boundNodeOffsets, arrow::Array* posInRelLists,
arrow::Array* array, PropertyCopyState* copyState) final;
Expand Down Expand Up @@ -182,16 +182,16 @@ class InMemListLists : public InMemListsWithOverflow {
public:
InMemListLists(std::string fName, common::LogicalType dataType, uint64_t numNodes,
std::shared_ptr<ListHeadersBuilder> listHeadersBuilder,
std::unique_ptr<common::CopyDescription> copyDescription)
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig)
: InMemListsWithOverflow{std::move(fName), std::move(dataType), numNodes,
std::move(listHeadersBuilder), std::move(copyDescription)} {};
std::move(listHeadersBuilder), std::move(csvReaderConfig)} {};
};

class InMemListsFactory {
public:
static std::unique_ptr<InMemLists> getInMemPropertyLists(const std::string& fName,
const common::LogicalType& dataType, uint64_t numNodes,
std::unique_ptr<common::CopyDescription> copyDescription,
common::CSVReaderConfig* csvReaderConfig,
std::shared_ptr<ListHeadersBuilder> listHeadersBuilder = nullptr);
};

Expand Down
12 changes: 6 additions & 6 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createCopyRelColumnsOrLists(

static std::unique_ptr<DirectedInMemRelData> initializeDirectedInMemRelData(
common::RelDataDirection direction, RelTableSchema* schema, NodesStore& nodesStore,
const std::string& outputDirectory, const CopyDescription* copyDescription) {
const std::string& outputDirectory, CSVReaderConfig* csvReaderConfig) {
auto directedInMemRelData = std::make_unique<DirectedInMemRelData>();
auto boundTableID = schema->getBoundTableID(direction);
auto numNodes =
Expand All @@ -109,7 +109,7 @@ static std::unique_ptr<DirectedInMemRelData> initializeDirectedInMemRelData(
schema->tableID, direction, DBFileType::ORIGINAL),
LogicalType(LogicalTypeID::INTERNAL_ID));
relColumns->adjColumnChunk =
relColumns->adjColumn->createInMemColumnChunk(0, numNodes - 1, copyDescription->copy());
relColumns->adjColumn->createInMemColumnChunk(0, numNodes - 1, csvReaderConfig);
for (auto i = 0u; i < schema->getNumProperties(); ++i) {
auto propertyID = schema->properties[i]->getPropertyID();
auto propertyDataType = schema->properties[i]->getDataType();
Expand All @@ -119,7 +119,7 @@ static std::unique_ptr<DirectedInMemRelData> initializeDirectedInMemRelData(
propertyID, std::make_unique<InMemColumn>(fName, *propertyDataType));
relColumns->propertyColumnChunks.emplace(
propertyID, relColumns->propertyColumns.at(propertyID)
->createInMemColumnChunk(0, numNodes - 1, copyDescription->copy()));
->createInMemColumnChunk(0, numNodes - 1, csvReaderConfig));
}
directedInMemRelData->setColumns(std::move(relColumns));
} else {
Expand All @@ -136,7 +136,7 @@ static std::unique_ptr<DirectedInMemRelData> initializeDirectedInMemRelData(
direction, property->getPropertyID(), DBFileType::ORIGINAL);
relLists->propertyLists.emplace(property->getPropertyID(),
InMemListsFactory::getInMemPropertyLists(fName, *property->getDataType(), numNodes,
copyDescription->copy(), relLists->adjList->getListHeadersBuilder()));
csvReaderConfig, relLists->adjList->getListHeadersBuilder()));
}
directedInMemRelData->setRelLists(std::move(relLists));
}
Expand All @@ -151,10 +151,10 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyRelFrom(
auto tableSchema = reinterpret_cast<RelTableSchema*>(copyFromInfo->tableSchema);
auto fwdRelData = initializeDirectedInMemRelData(RelDataDirection::FWD, tableSchema,
storageManager.getNodesStore(), storageManager.getDirectory(),
copyFromInfo->fileScanInfo->copyDesc.get());
copyFromInfo->fileScanInfo->copyDesc->csvReaderConfig.get());
auto bwdRelData = initializeDirectedInMemRelData(RelDataDirection::BWD, tableSchema,
storageManager.getNodesStore(), storageManager.getDirectory(),
copyFromInfo->fileScanInfo->copyDesc.get());
copyFromInfo->fileScanInfo->copyDesc->csvReaderConfig.get());
auto copyRelSharedState = std::make_shared<CopyRelSharedState>(tableSchema->tableID,
&storageManager.getRelsStore().getRelsStatistics(), std::move(fwdRelData),
std::move(bwdRelData), memoryManager);
Expand Down
Loading