Skip to content

Commit

Permalink
Merge pull request #2046 from kuzudb/replace-copy-description-with-cs…
Browse files Browse the repository at this point in the history
…v-reader-config-in-storage

Replace CopyDescription with CSVReaderConfig for storage classes.
  • Loading branch information
andyfengHKU committed Sep 18, 2023
2 parents 2e460d6 + 79f0c5a commit 1c63a71
Show file tree
Hide file tree
Showing 18 changed files with 123 additions and 103 deletions.
20 changes: 14 additions & 6 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,28 @@ namespace kuzu {
namespace common {

// Options that control how a CSV file is read. Every field defaults to the matching
// CopyConstants::DEFAULT_CSV_* value.
struct CSVReaderConfig {
    // Each member is declared exactly once, ahead of the constructors that initialize it;
    // the initializer lists below follow this declaration order.
    char escapeChar;
    char delimiter;
    char quoteChar;
    char listBeginChar;
    char listEndChar;
    bool hasHeader;

    // Default configuration: every option takes its CopyConstants default.
    CSVReaderConfig()
        : escapeChar{CopyConstants::DEFAULT_CSV_ESCAPE_CHAR},
          delimiter{CopyConstants::DEFAULT_CSV_DELIMITER},
          quoteChar{CopyConstants::DEFAULT_CSV_QUOTE_CHAR},
          listBeginChar{CopyConstants::DEFAULT_CSV_LIST_BEGIN_CHAR},
          listEndChar{CopyConstants::DEFAULT_CSV_LIST_END_CHAR},
          hasHeader{CopyConstants::DEFAULT_CSV_HAS_HEADER} {}

    // Member-wise copy of another configuration.
    CSVReaderConfig(const CSVReaderConfig& other)
        : escapeChar{other.escapeChar}, delimiter{other.delimiter}, quoteChar{other.quoteChar},
          listBeginChar{other.listBeginChar},
          listEndChar{other.listEndChar}, hasHeader{other.hasHeader} {}

    // Returns a newly allocated copy of this configuration, owned by the caller.
    inline std::unique_ptr<CSVReaderConfig> copy() const {
        return std::make_unique<CSVReaderConfig>(*this);
    }
};

struct CopyDescription {
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/persistent/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ class CopyNode : public Sink {
for (auto& arrowColumnPos : copyNodeInfo.dataColumnPoses) {
dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
localNodeGroup =
std::make_unique<storage::NodeGroup>(sharedState->tableSchema, &sharedState->copyDesc);
localNodeGroup = std::make_unique<storage::NodeGroup>(
sharedState->tableSchema, sharedState->copyDesc.csvReaderConfig.get());
}

void initGlobalStateInternal(ExecutionContext* context) final;
Expand Down
20 changes: 11 additions & 9 deletions src/include/storage/copier/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ class ColumnChunk {

// ColumnChunks must be initialized after construction, so this constructor should only be used
// through the ColumnChunkFactory
explicit ColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription,
bool hasNullChunk = true);
explicit ColumnChunk(common::LogicalType dataType,
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool hasNullChunk = true);
virtual ~ColumnChunk() = default;

template<typename T>
Expand Down Expand Up @@ -158,7 +158,7 @@ class ColumnChunk {
std::unique_ptr<uint8_t[]> buffer;
std::unique_ptr<NullColumnChunk> nullChunk;
std::vector<std::unique_ptr<ColumnChunk>> childrenChunks;
const common::CopyDescription* copyDescription;
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig;
uint64_t numValues;
};

Expand All @@ -176,9 +176,10 @@ inline bool ColumnChunk::getValue(common::offset_t pos) const {

class BoolColumnChunk : public ColumnChunk {
public:
// Builds a BOOL-typed column chunk. Ownership of csvReaderConfig is moved into the
// ColumnChunk base; hasNullChunk is forwarded to the base unchanged.
BoolColumnChunk(
    std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool hasNullChunk = true)
    : ColumnChunk(common::LogicalType(common::LogicalTypeID::BOOL), std::move(csvReaderConfig),
          hasNullChunk) {}

void append(common::ValueVector* vector, common::offset_t startPosInChunk) final;

Expand Down Expand Up @@ -237,8 +238,9 @@ class NullColumnChunk : public BoolColumnChunk {

class FixedListColumnChunk : public ColumnChunk {
public:
// Builds a fixed-list column chunk of the given data type. Ownership of csvReaderConfig is
// moved into the ColumnChunk base, which is always asked to create a null chunk.
FixedListColumnChunk(
    common::LogicalType dataType, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig)
    : ColumnChunk(std::move(dataType), std::move(csvReaderConfig), true /* hasNullChunk */) {}

void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;
Expand All @@ -262,7 +264,7 @@ class SerialColumnChunk : public ColumnChunk {

// Factory for ColumnChunk instances; the ColumnChunk constructor comment says chunks should
// only be constructed through this factory.
struct ColumnChunkFactory {
    // Creates a column chunk for dataType.
    // csvReaderConfig is an optional raw pointer (defaults to nullptr).
    // NOTE(review): presumably the implementation copies the config for the chunk it creates,
    // since chunk constructors take a unique_ptr — confirm in the .cpp.
    static std::unique_ptr<ColumnChunk> createColumnChunk(
        const common::LogicalType& dataType, common::CSVReaderConfig* csvReaderConfig = nullptr);
};

template<>
Expand Down
3 changes: 1 addition & 2 deletions src/include/storage/copier/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ class NodeTable;

class NodeGroup {
public:
explicit NodeGroup(
catalog::TableSchema* schema, common::CopyDescription* copyDescription = nullptr);
explicit NodeGroup(catalog::TableSchema* schema, common::CSVReaderConfig* csvReaderConfig);
explicit NodeGroup(NodeTable* table);

inline void setNodeGroupIdx(uint64_t nodeGroupIdx_) { this->nodeGroupIdx = nodeGroupIdx_; }
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/copier/string_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ namespace storage {

class StringColumnChunk : public ColumnChunk {
public:
StringColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription);
StringColumnChunk(
common::LogicalType dataType, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig);

void resetToEmpty() final;
void append(common::ValueVector* vector, common::offset_t startPosInChunk) final;
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/copier/struct_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ namespace storage {

class StructColumnChunk : public ColumnChunk {
public:
StructColumnChunk(common::LogicalType dataType, common::CopyDescription* copyDescription);
StructColumnChunk(
common::LogicalType dataType, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig);

protected:
void append(
Expand Down
19 changes: 10 additions & 9 deletions src/include/storage/copier/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
#include "arrow/array/array_nested.h"
#include "storage/copier/column_chunk.h"

using namespace kuzu::common;

namespace kuzu {
namespace storage {

Expand All @@ -13,13 +11,14 @@ struct VarListDataColumnChunk {
uint64_t capacity;

explicit VarListDataColumnChunk(std::unique_ptr<ColumnChunk> dataChunk)
: dataColumnChunk{std::move(dataChunk)}, capacity{StorageConstants::NODE_GROUP_SIZE} {}
: dataColumnChunk{std::move(dataChunk)}, capacity{
common::StorageConstants::NODE_GROUP_SIZE} {}

void reset();

void resizeBuffer(uint64_t numValues);

inline void append(ValueVector* dataVector) const {
inline void append(common::ValueVector* dataVector) const {
dataColumnChunk->append(dataVector, dataColumnChunk->getNumValues());
}

Expand All @@ -32,7 +31,8 @@ struct VarListDataColumnChunk {

class VarListColumnChunk : public ColumnChunk {
public:
VarListColumnChunk(LogicalType dataType, CopyDescription* copyDescription);
VarListColumnChunk(
common::LogicalType dataType, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig);

inline ColumnChunk* getDataColumnChunk() const {
return varListDataColumnChunk.dataColumnChunk.get();
Expand All @@ -54,17 +54,18 @@ class VarListColumnChunk : public ColumnChunk {
return varListDataColumnChunk.dataColumnChunk->getNumPages() + ColumnChunk::getNumPages();
}

void append(arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) override;
void append(
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend) override;

void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
common::offset_t startPosInChunk, uint32_t numValuesToAppend) final;

void copyVarListFromArrowString(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend);
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend);

template<typename T>
void copyVarListFromArrowList(
arrow::Array* array, offset_t startPosInChunk, uint32_t numValuesToAppend) {
arrow::Array* array, common::offset_t startPosInChunk, uint32_t numValuesToAppend) {
auto listArray = (T*)array;
auto dataChunkOffsetToAppend = varListDataColumnChunk.getNumValues();
auto curListOffset = varListDataColumnChunk.getNumValues();
Expand All @@ -91,7 +92,7 @@ class VarListColumnChunk : public ColumnChunk {
return getListOffset(offset + 1) - getListOffset(offset);
}

inline offset_t getListOffset(common::offset_t offset) const {
inline common::offset_t getListOffset(common::offset_t offset) const {
return offset == 0 ? 0 : getValue<uint64_t>(offset - 1);
}
};
Expand Down
9 changes: 5 additions & 4 deletions src/include/storage/in_mem_storage_structure/in_mem_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,21 @@ class InMemColumn {
void flushChunk(InMemColumnChunk* chunk);

std::unique_ptr<InMemColumnChunk> createInMemColumnChunk(common::offset_t startNodeOffset,
common::offset_t endNodeOffset, std::unique_ptr<common::CopyDescription> copyDescription) {
common::offset_t endNodeOffset, common::CSVReaderConfig* csvReaderConfig) {
auto csvReaderConfigCopy = csvReaderConfig ? csvReaderConfig->copy() : nullptr;
switch (dataType.getPhysicalType()) {
case common::PhysicalTypeID::STRING:
case common::PhysicalTypeID::VAR_LIST: {
return std::make_unique<InMemColumnChunkWithOverflow>(dataType, startNodeOffset,
endNodeOffset, std::move(copyDescription), inMemOverflowFile.get());
endNodeOffset, std::move(csvReaderConfigCopy), inMemOverflowFile.get());
}
case common::PhysicalTypeID::FIXED_LIST: {
return std::make_unique<InMemFixedListColumnChunk>(
dataType, startNodeOffset, endNodeOffset, std::move(copyDescription));
dataType, startNodeOffset, endNodeOffset, std::move(csvReaderConfigCopy));
}
default: {
return std::make_unique<InMemColumnChunk>(
dataType, startNodeOffset, endNodeOffset, std::move(copyDescription));
dataType, startNodeOffset, endNodeOffset, std::move(csvReaderConfigCopy));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ struct PropertyCopyState {
class InMemColumnChunk {
public:
InMemColumnChunk(common::LogicalType dataType, common::offset_t startNodeOffset,
common::offset_t endNodeOffset, std::unique_ptr<common::CopyDescription> copyDescription,
common::offset_t endNodeOffset, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig,
bool requireNullBits = true);

virtual ~InMemColumnChunk() = default;
Expand Down Expand Up @@ -81,16 +81,16 @@ class InMemColumnChunk {
std::uint64_t numBytes;
std::unique_ptr<uint8_t[]> buffer;
std::unique_ptr<InMemColumnChunk> nullChunk;
std::unique_ptr<common::CopyDescription> copyDescription;
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig;
};

class InMemColumnChunkWithOverflow : public InMemColumnChunk {
public:
// Builds a column chunk that additionally writes to an overflow file.
// csvReaderConfig is moved into the InMemColumnChunk base. inMemOverflowFile is stored as a
// raw pointer, and a blob buffer of PAGE_4KB_SIZE bytes is allocated.
InMemColumnChunkWithOverflow(common::LogicalType dataType, common::offset_t startNodeOffset,
    common::offset_t endNodeOffset, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig,
    InMemOverflowFile* inMemOverflowFile)
    : InMemColumnChunk{std::move(dataType), startNodeOffset, endNodeOffset,
          std::move(csvReaderConfig)},
      inMemOverflowFile{inMemOverflowFile},
      blobBuffer{std::make_unique<uint8_t[]>(common::BufferPoolConstants::PAGE_4KB_SIZE)} {}

Expand Down Expand Up @@ -123,7 +123,7 @@ class InMemColumnChunkWithOverflow : public InMemColumnChunk {
class InMemFixedListColumnChunk : public InMemColumnChunk {
public:
InMemFixedListColumnChunk(common::LogicalType dataType, common::offset_t startNodeOffset,
common::offset_t endNodeOffset, std::unique_ptr<common::CopyDescription> copyDescription);
common::offset_t endNodeOffset, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig);

void flush(common::FileInfo* walFileInfo) override;

Expand Down
16 changes: 8 additions & 8 deletions src/include/storage/in_mem_storage_structure/in_mem_lists.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ class InMemLists {
public:
InMemLists(std::string fName, common::LogicalType dataType, uint64_t numBytesForElement,
uint64_t numNodes, std::shared_ptr<ListHeadersBuilder> listHeadersBuilder,
std::unique_ptr<common::CopyDescription> copyDescription, bool hasNullBytes)
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig, bool hasNullBytes)
: InMemLists{std::move(fName), numBytesForElement, std::move(dataType), numNodes,
std::move(copyDescription), hasNullBytes} {
std::move(csvReaderConfig), hasNullBytes} {
this->listHeadersBuilder = std::move(listHeadersBuilder);
}
virtual ~InMemLists() = default;
Expand Down Expand Up @@ -75,7 +75,7 @@ class InMemLists {

protected:
InMemLists(std::string fName, uint64_t numBytesForElement, common::LogicalType dataType,
uint64_t numNodes, std::unique_ptr<common::CopyDescription> copyDescription,
uint64_t numNodes, std::unique_ptr<common::CSVReaderConfig> csvReaderConfig,
bool hasNullBytes);

private:
Expand Down Expand Up @@ -111,7 +111,7 @@ class InMemLists {
uint64_t numElementsInAPage;
std::unique_ptr<ListsMetadataBuilder> listsMetadataBuilder;
std::shared_ptr<ListHeadersBuilder> listHeadersBuilder;
std::unique_ptr<common::CopyDescription> copyDescription;
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig;
};

class InMemRelIDLists : public InMemLists {
Expand All @@ -127,7 +127,7 @@ class InMemListsWithOverflow : public InMemLists {
protected:
InMemListsWithOverflow(std::string fName, common::LogicalType dataType, uint64_t numNodes,
std::shared_ptr<ListHeadersBuilder> listHeadersBuilder,
std::unique_ptr<common::CopyDescription> copyDescription);
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig);

void copyArrowArray(arrow::Array* boundNodeOffsets, arrow::Array* posInRelLists,
arrow::Array* array, PropertyCopyState* copyState) final;
Expand Down Expand Up @@ -182,16 +182,16 @@ class InMemListLists : public InMemListsWithOverflow {
public:
InMemListLists(std::string fName, common::LogicalType dataType, uint64_t numNodes,
std::shared_ptr<ListHeadersBuilder> listHeadersBuilder,
std::unique_ptr<common::CopyDescription> copyDescription)
std::unique_ptr<common::CSVReaderConfig> csvReaderConfig)
: InMemListsWithOverflow{std::move(fName), std::move(dataType), numNodes,
std::move(listHeadersBuilder), std::move(copyDescription)} {};
std::move(listHeadersBuilder), std::move(csvReaderConfig)} {};
};

// Factory for in-memory property lists.
class InMemListsFactory {
public:
    // Creates the in-memory lists for a property of type dataType stored under fName.
    // csvReaderConfig is passed as a raw pointer.
    // NOTE(review): presumably the implementation copies it for the lists it builds, since
    // the InMemLists constructors take a unique_ptr — confirm in the .cpp.
    // listHeadersBuilder is optional and defaults to nullptr.
    static std::unique_ptr<InMemLists> getInMemPropertyLists(const std::string& fName,
        const common::LogicalType& dataType, uint64_t numNodes,
        common::CSVReaderConfig* csvReaderConfig,
        std::shared_ptr<ListHeadersBuilder> listHeadersBuilder = nullptr);
};

Expand Down
12 changes: 6 additions & 6 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createCopyRelColumnsOrLists(

static std::unique_ptr<DirectedInMemRelData> initializeDirectedInMemRelData(
common::RelDataDirection direction, RelTableSchema* schema, NodesStore& nodesStore,
const std::string& outputDirectory, const CopyDescription* copyDescription) {
const std::string& outputDirectory, CSVReaderConfig* csvReaderConfig) {
auto directedInMemRelData = std::make_unique<DirectedInMemRelData>();
auto boundTableID = schema->getBoundTableID(direction);
auto numNodes =
Expand All @@ -109,7 +109,7 @@ static std::unique_ptr<DirectedInMemRelData> initializeDirectedInMemRelData(
schema->tableID, direction, DBFileType::ORIGINAL),
LogicalType(LogicalTypeID::INTERNAL_ID));
relColumns->adjColumnChunk =
relColumns->adjColumn->createInMemColumnChunk(0, numNodes - 1, copyDescription->copy());
relColumns->adjColumn->createInMemColumnChunk(0, numNodes - 1, csvReaderConfig);
for (auto i = 0u; i < schema->getNumProperties(); ++i) {
auto propertyID = schema->properties[i]->getPropertyID();
auto propertyDataType = schema->properties[i]->getDataType();
Expand All @@ -119,7 +119,7 @@ static std::unique_ptr<DirectedInMemRelData> initializeDirectedInMemRelData(
propertyID, std::make_unique<InMemColumn>(fName, *propertyDataType));
relColumns->propertyColumnChunks.emplace(
propertyID, relColumns->propertyColumns.at(propertyID)
->createInMemColumnChunk(0, numNodes - 1, copyDescription->copy()));
->createInMemColumnChunk(0, numNodes - 1, csvReaderConfig));
}
directedInMemRelData->setColumns(std::move(relColumns));
} else {
Expand All @@ -136,7 +136,7 @@ static std::unique_ptr<DirectedInMemRelData> initializeDirectedInMemRelData(
direction, property->getPropertyID(), DBFileType::ORIGINAL);
relLists->propertyLists.emplace(property->getPropertyID(),
InMemListsFactory::getInMemPropertyLists(fName, *property->getDataType(), numNodes,
copyDescription->copy(), relLists->adjList->getListHeadersBuilder()));
csvReaderConfig, relLists->adjList->getListHeadersBuilder()));
}
directedInMemRelData->setRelLists(std::move(relLists));
}
Expand All @@ -151,10 +151,10 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyRelFrom(
auto tableSchema = reinterpret_cast<RelTableSchema*>(copyFromInfo->tableSchema);
auto fwdRelData = initializeDirectedInMemRelData(RelDataDirection::FWD, tableSchema,
storageManager.getNodesStore(), storageManager.getDirectory(),
copyFromInfo->fileScanInfo->copyDesc.get());
copyFromInfo->fileScanInfo->copyDesc->csvReaderConfig.get());
auto bwdRelData = initializeDirectedInMemRelData(RelDataDirection::BWD, tableSchema,
storageManager.getNodesStore(), storageManager.getDirectory(),
copyFromInfo->fileScanInfo->copyDesc.get());
copyFromInfo->fileScanInfo->copyDesc->csvReaderConfig.get());
auto copyRelSharedState = std::make_shared<CopyRelSharedState>(tableSchema->tableID,
&storageManager.getRelsStore().getRelsStatistics(), std::move(fwdRelData),
std::move(bwdRelData), memoryManager);
Expand Down
Loading

0 comments on commit 1c63a71

Please sign in to comment.