Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve error messages related to primary keys during copy #1830

Merged
merged 1 commit into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dataset/copy-fault-tests/invalid-row/eWatches.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
0,0
5,0
2,0
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
create node table person (ID INT64, gender INT32, PRIMARY KEY (ID));
create node table movie (ID INT64, length INT32, PRIMARY KEY (ID));
create rel table watch (FROM person TO movie);
1 change: 1 addition & 0 deletions dataset/copy-fault-tests/invalid-row/vMovie-valid.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0,3232
2 changes: 2 additions & 0 deletions dataset/copy-fault-tests/invalid-row/vPerson-valid.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
0,4
2,12
4 changes: 4 additions & 0 deletions dataset/copy-fault-tests/null-pk/eLikes.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Ed,0
Charlie,0
,0
Alice,0
2 changes: 2 additions & 0 deletions dataset/copy-fault-tests/null-pk/schema.cypher
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
create node table person (fName STRING, PRIMARY KEY (fName));
create node table movie (ID INT64, length INT32, PRIMARY KEY (ID));
create rel table like (FROM person TO movie);
1 change: 1 addition & 0 deletions dataset/copy-fault-tests/null-pk/vMovie.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0,32
4 changes: 4 additions & 0 deletions dataset/copy-fault-tests/null-pk/vPerson-valid.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Dave
Alice
Charlie
Ed
4 changes: 4 additions & 0 deletions dataset/copy-fault-tests/null-pk/vPerson2.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Charlie
Adam

Dave
1 change: 1 addition & 0 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ using namespace kuzu::utf8proc;

namespace kuzu {
namespace common {

CopyDescription::CopyDescription(
const std::vector<std::string>& filePaths, CSVReaderConfig csvReaderConfig, FileType fileType)
: filePaths{filePaths}, csvReaderConfig{nullptr}, fileType{fileType} {
Expand Down
3 changes: 1 addition & 2 deletions src/include/binder/copy/bound_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ class BoundCopy : public BoundStatement {
common::CopyDescription copyDescription, common::table_id_t tableID, std::string tableName)
: BoundStatement{common::StatementType::COPY,
BoundStatementResult::createSingleStringColumnResult()},
copyDescription{std::move(copyDescription)}, tableID{tableID}, tableName{std::move(
tableName)} {}
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)} {}

inline common::CopyDescription getCopyDescription() const { return copyDescription; }

Expand Down
6 changes: 0 additions & 6 deletions src/include/common/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ class CatalogException : public Exception {
explicit CatalogException(const std::string& msg) : Exception("Catalog exception: " + msg){};
};

class HashIndexException : public Exception {
public:
explicit HashIndexException(const std::string& msg)
: Exception("HashIndex exception: " + msg){};
};

class StorageException : public Exception {
public:
explicit StorageException(const std::string& msg) : Exception("Storage exception: " + msg){};
Expand Down
4 changes: 3 additions & 1 deletion src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ constexpr column_id_t INVALID_COLUMN_ID = INVALID_PROPERTY_ID;
using vector_idx_t = uint32_t;
constexpr vector_idx_t INVALID_VECTOR_IDX = UINT32_MAX;
using block_idx_t = uint64_t;
constexpr block_idx_t INVALID_BLOCK_IDX = UINT64_MAX;
using field_idx_t = uint64_t;
using struct_field_idx_t = uint64_t;
using union_field_idx_t = uint64_t;
constexpr struct_field_idx_t INVALID_STRUCT_FIELD_IDX = UINT64_MAX;
using tuple_idx_t = uint64_t;
using row_idx_t = uint64_t;
constexpr row_idx_t INVALID_ROW_IDX = UINT64_MAX;
constexpr uint32_t UNDEFINED_CAST_COST = UINT32_MAX;

// System representation for a variable-sized overflow value.
Expand Down
25 changes: 16 additions & 9 deletions src/include/planner/logical_plan/logical_operator/logical_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ class LogicalCopy : public LogicalOperator {
public:
LogicalCopy(const common::CopyDescription& copyDescription, common::table_id_t tableID,
std::string tableName, binder::expression_vector arrowColumnExpressions,
std::shared_ptr<binder::Expression> offsetExpression,
std::shared_ptr<binder::Expression> rowIdxExpression,
std::shared_ptr<binder::Expression> filePathExpression,
std::shared_ptr<binder::Expression> columnIdxExpression,
std::shared_ptr<binder::Expression> outputExpression)
: LogicalOperator{LogicalOperatorType::COPY},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)},
arrowColumnExpressions{std::move(arrowColumnExpressions)}, offsetExpression{std::move(
offsetExpression)},
: LogicalOperator{LogicalOperatorType::COPY}, copyDescription{copyDescription},
tableID{tableID}, tableName{std::move(tableName)}, arrowColumnExpressions{std::move(
arrowColumnExpressions)},
rowIdxExpression{std::move(rowIdxExpression)}, filePathExpression{std::move(
filePathExpression)},
columnIdxExpression{std::move(columnIdxExpression)}, outputExpression{
std::move(outputExpression)} {}

Expand All @@ -32,8 +34,12 @@ class LogicalCopy : public LogicalOperator {
return arrowColumnExpressions;
}

inline std::shared_ptr<binder::Expression> getOffsetExpression() const {
return offsetExpression;
inline std::shared_ptr<binder::Expression> getRowIdxExpression() const {
return rowIdxExpression;
}

inline std::shared_ptr<binder::Expression> getFilePathExpression() const {
return filePathExpression;
}

inline std::shared_ptr<binder::Expression> getColumnIdxExpression() const {
Expand All @@ -49,7 +55,7 @@ class LogicalCopy : public LogicalOperator {

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopy>(copyDescription, tableID, tableName, arrowColumnExpressions,
offsetExpression, columnIdxExpression, outputExpression);
rowIdxExpression, filePathExpression, columnIdxExpression, outputExpression);
}

private:
Expand All @@ -58,7 +64,8 @@ class LogicalCopy : public LogicalOperator {
// Used for printing only.
std::string tableName;
binder::expression_vector arrowColumnExpressions;
std::shared_ptr<binder::Expression> offsetExpression;
std::shared_ptr<binder::Expression> rowIdxExpression;
std::shared_ptr<binder::Expression> filePathExpression;
std::shared_ptr<binder::Expression> columnIdxExpression;
std::shared_ptr<binder::Expression> outputExpression;
};
Expand Down
27 changes: 17 additions & 10 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,23 @@ class CopyNodeSharedState {
};

struct CopyNodeDataInfo {
DataPos offsetVectorPos;
DataPos rowIdxVectorPos;
DataPos filePathVectorPos;
std::vector<DataPos> arrowColumnPoses;
};

class CopyNode : public Sink {
public:
CopyNode(std::shared_ptr<CopyNodeSharedState> sharedState, CopyNodeDataInfo copyNodeDataInfo,
common::CopyDescription copyDesc, storage::NodeTable* table, storage::RelsStore* relsStore,
catalog::Catalog* catalog, storage::WAL* wal,
const common::CopyDescription& copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child),
id, paramsString},
sharedState{std::move(sharedState)}, copyNodeDataInfo{std::move(copyNodeDataInfo)},
copyDesc{copyDesc}, table{table}, relsStore{relsStore}, catalog{catalog}, wal{wal} {
copyDesc{copyDesc}, table{table}, relsStore{relsStore}, catalog{catalog}, wal{wal},
rowIdxVector{nullptr} {
auto tableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(table->getTableID());
copyStates.resize(tableSchema->getNumProperties());
for (auto i = 0u; i < tableSchema->getNumProperties(); i++) {
Expand All @@ -58,7 +60,8 @@ class CopyNode : public Sink {
}

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
offsetVector = resultSet->getValueVector(copyNodeDataInfo.offsetVectorPos).get();
rowIdxVector = resultSet->getValueVector(copyNodeDataInfo.rowIdxVectorPos).get();
filePathVector = resultSet->getValueVector(copyNodeDataInfo.filePathVectorPos).get();
for (auto& arrowColumnPos : copyNodeDataInfo.arrowColumnPoses) {
arrowColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
Expand All @@ -84,12 +87,14 @@ class CopyNode : public Sink {

protected:
void populatePKIndex(storage::InMemColumnChunk* chunk, storage::InMemOverflowFile* overflowFile,
common::offset_t startOffset, uint64_t numValues);
common::offset_t startOffset, uint64_t numValues, const std::string& filePath,
common::row_idx_t startRowIdxInFile);

void logCopyWALRecord();

std::pair<common::offset_t, common::offset_t> getStartAndEndOffset(
std::pair<common::row_idx_t, common::row_idx_t> getStartAndEndRowIdx(
common::vector_idx_t columnIdx);
std::pair<std::string, common::row_idx_t> getFilePathAndRowIdxInFile();

private:
inline bool isCopyAllowed() {
Expand All @@ -100,10 +105,11 @@ class CopyNode : public Sink {

void flushChunksAndPopulatePKIndex(
const std::vector<std::unique_ptr<storage::InMemColumnChunk>>& columnChunks,
common::offset_t startNodeOffset, common::offset_t endNodeOffset);
common::offset_t startNodeOffset, common::offset_t endNodeOffset,
const std::string& filePath, common::row_idx_t startRowIdxInFile);

template<typename T, typename... Args>
void appendToPKIndex(storage::InMemColumnChunk* chunk, common::offset_t startOffset,
uint64_t appendToPKIndex(storage::InMemColumnChunk* chunk, common::offset_t startOffset,
uint64_t numValues, Args... args) {
throw common::CopyException("appendToPKIndex1 not implemented");
}
Expand All @@ -116,7 +122,8 @@ class CopyNode : public Sink {
storage::RelsStore* relsStore;
catalog::Catalog* catalog;
storage::WAL* wal;
common::ValueVector* offsetVector;
common::ValueVector* rowIdxVector;
common::ValueVector* filePathVector;
std::vector<common::ValueVector*> arrowColumnVectors;
std::vector<std::unique_ptr<storage::PropertyCopyState>> copyStates;
};
Expand Down
14 changes: 6 additions & 8 deletions src/include/processor/operator/copy/copy_npy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ namespace processor {
class CopyNPYNode : public CopyNode {
public:
CopyNPYNode(std::shared_ptr<CopyNodeSharedState> sharedState, CopyNodeDataInfo copyNodeDataInfo,
DataPos columnIdxPos, const common::CopyDescription& copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
const DataPos& columnIdxPos, const common::CopyDescription& copyDesc,
storage::NodeTable* table, storage::RelsStore* relsStore, catalog::Catalog* catalog,
storage::WAL* wal, std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: CopyNode{std::move(sharedState), std::move(copyNodeDataInfo), copyDesc, table, relsStore,
catalog, wal, std::move(resultSetDescriptor), std::move(child), id, paramsString},
Expand All @@ -19,17 +19,15 @@ class CopyNPYNode : public CopyNode {
void executeInternal(ExecutionContext* context) final;

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final {
offsetVector = resultSet->getValueVector(copyNodeDataInfo.offsetVectorPos).get();
CopyNode::initLocalStateInternal(resultSet, context);
columnIdxVector = resultSet->getValueVector(columnIdxPos).get();
for (auto& arrowColumnPos : copyNodeDataInfo.arrowColumnPoses) {
arrowColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
}

void flushChunksAndPopulatePKIndexSingleColumn(
std::vector<std::unique_ptr<storage::InMemColumnChunk>>& columnChunks,
common::offset_t startNodeOffset, common::offset_t endNodeOffset,
common::vector_idx_t columnToCopy);
common::vector_idx_t columnToCopy, const std::string& filePath,
common::row_idx_t startRowIdxInFile);

inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<CopyNPYNode>(sharedState, copyNodeDataInfo, columnIdxPos, copyDesc,
Expand Down
19 changes: 10 additions & 9 deletions src/include/processor/operator/copy/read_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ namespace processor {
// For CSV file, we need to read in streaming mode, so we need to read one batch at a time.
class ReadCSVMorsel : public ReadFileMorsel {
public:
ReadCSVMorsel(common::offset_t startOffset, std::string filePath,
ReadCSVMorsel(common::row_idx_t rowIdx, std::string filePath, common::row_idx_t rowIdxInFile,
std::shared_ptr<arrow::RecordBatch> recordBatch)
: ReadFileMorsel{startOffset, BLOCK_IDX_INVALID, UINT64_MAX, std::move(filePath)},
: ReadFileMorsel{rowIdx, common::INVALID_BLOCK_IDX, common::INVALID_ROW_IDX,
std::move(filePath), rowIdxInFile},
recordBatch{std::move(recordBatch)} {}

std::shared_ptr<arrow::RecordBatch> recordBatch;
Expand All @@ -24,7 +25,7 @@ class ReadCSVSharedState : public ReadFileSharedState {
}

private:
void countNumLines() override;
void countNumRows() override;

std::unique_ptr<ReadFileMorsel> getMorsel() override;

Expand All @@ -35,11 +36,11 @@ class ReadCSVSharedState : public ReadFileSharedState {

class ReadCSV : public ReadFile {
public:
ReadCSV(std::vector<DataPos> arrowColumnPoses, const DataPos& offsetVectorPos,
std::shared_ptr<ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{std::move(arrowColumnPoses), offsetVectorPos, std::move(sharedState),
PhysicalOperatorType::READ_CSV, id, paramsString} {}
ReadCSV(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses, std::shared_ptr<ReadFileSharedState> sharedState,
uint32_t id, const std::string& paramsString)
: ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(arrowColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_CSV, id, paramsString} {}

inline std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<ReadFileMorsel> morsel) override {
Expand All @@ -49,7 +50,7 @@ class ReadCSV : public ReadFile {

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ReadCSV>(
arrowColumnPoses, offsetVectorPos, sharedState, id, paramsString);
rowIdxVectorPos, filePathVectorPos, arrowColumnPoses, sharedState, id, paramsString);
}
};

Expand Down
48 changes: 26 additions & 22 deletions src/include/processor/operator/copy/read_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,61 +8,63 @@ namespace processor {

class ReadFileMorsel {
public:
static constexpr common::block_idx_t BLOCK_IDX_INVALID = UINT64_MAX;

ReadFileMorsel(common::offset_t nodeOffset, common::block_idx_t blockIdx, uint64_t numNodes,
std::string filePath)
: nodeOffset{nodeOffset}, blockIdx{blockIdx}, numNodes{numNodes}, filePath{std::move(
filePath)} {};
ReadFileMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx,
common::row_idx_t numRows, std::string filePath, common::row_idx_t rowIdxInFile)
: rowIdx{rowIdx}, blockIdx{blockIdx}, numRows{numRows}, filePath{std::move(filePath)},
rowIdxInFile{rowIdxInFile} {};

virtual ~ReadFileMorsel() = default;

public:
common::offset_t nodeOffset;
// When reading from multiple files, rowIdx is accumulated.
common::row_idx_t rowIdx;
common::block_idx_t blockIdx;
uint64_t numNodes;
common::row_idx_t numRows;
std::string filePath;
// Row idx in the current file. Equal to `rowIdx` when reading from only a single file.
common::row_idx_t rowIdxInFile;
};

class ReadFileSharedState {
public:
explicit ReadFileSharedState(
std::vector<std::string> filePaths, catalog::TableSchema* tableSchema)
: nodeOffset{0}, curBlockIdx{0}, filePaths{std::move(filePaths)}, curFileIdx{0},
tableSchema{tableSchema}, numRows{0} {}
: currRowIdx{0}, curBlockIdx{0}, filePaths{std::move(filePaths)}, curFileIdx{0},
tableSchema{tableSchema}, numRows{0}, currRowIdxInCurrFile{1} {}

virtual ~ReadFileSharedState() = default;

virtual void countNumLines() = 0;
virtual void countNumRows() = 0;

virtual std::unique_ptr<ReadFileMorsel> getMorsel() = 0;

public:
uint64_t numRows;
common::row_idx_t numRows;
catalog::TableSchema* tableSchema;

protected:
std::mutex mtx;
common::offset_t nodeOffset;
common::row_idx_t currRowIdx;
std::unordered_map<std::string, storage::FileBlockInfo> fileBlockInfos;
common::block_idx_t curBlockIdx;
std::vector<std::string> filePaths;
common::vector_idx_t curFileIdx;
common::row_idx_t currRowIdxInCurrFile;
};

class ReadFile : public PhysicalOperator {
public:
ReadFile(std::vector<DataPos> arrowColumnPoses, DataPos offsetVectorPos,
std::shared_ptr<ReadFileSharedState> sharedState, PhysicalOperatorType operatorType,
uint32_t id, const std::string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, arrowColumnPoses{std::move(
arrowColumnPoses)},
offsetVectorPos{std::move(offsetVectorPos)}, sharedState{std::move(sharedState)} {}
ReadFile(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses, std::shared_ptr<ReadFileSharedState> sharedState,
PhysicalOperatorType operatorType, uint32_t id, const std::string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, rowIdxVectorPos{rowIdxVectorPos},
filePathVectorPos{filePathVectorPos}, arrowColumnPoses{std::move(arrowColumnPoses)},
sharedState{std::move(sharedState)}, rowIdxVector{nullptr}, filePathVector{nullptr} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

inline void initGlobalStateInternal(kuzu::processor::ExecutionContext* context) override {
sharedState->countNumLines();
sharedState->countNumRows();
}

inline bool isSource() const override { return true; }
Expand All @@ -74,9 +76,11 @@ class ReadFile : public PhysicalOperator {

protected:
std::shared_ptr<ReadFileSharedState> sharedState;
DataPos rowIdxVectorPos;
DataPos filePathVectorPos;
std::vector<DataPos> arrowColumnPoses;
DataPos offsetVectorPos;
common::ValueVector* offsetVector;
common::ValueVector* rowIdxVector;
common::ValueVector* filePathVector;
std::vector<common::ValueVector*> arrowColumnVectors;
};

Expand Down
Loading
Loading