Skip to content

Commit

Permalink
better error message during copy
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Jul 19, 2023
1 parent 1e9d4fd commit db75e30
Show file tree
Hide file tree
Showing 46 changed files with 435 additions and 489 deletions.
3 changes: 3 additions & 0 deletions dataset/copy-fault-tests/invalid-row/eWatches.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
0,0
5,0
2,0
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
create node table person (ID INT64, gender INT32, PRIMARY KEY (ID));
create node table movie (ID INT64, length INT32, PRIMARY KEY (ID));
create rel table watch (FROM person TO movie);
1 change: 1 addition & 0 deletions dataset/copy-fault-tests/invalid-row/vMovie-valid.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0,3232
2 changes: 2 additions & 0 deletions dataset/copy-fault-tests/invalid-row/vPerson-valid.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
0,4
2,12
4 changes: 4 additions & 0 deletions dataset/copy-fault-tests/null-pk/eLikes.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Ed,0
Charlie,0
,0
Alice,0
2 changes: 2 additions & 0 deletions dataset/copy-fault-tests/null-pk/schema.cypher
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
create node table person (fName STRING, PRIMARY KEY (fName));
create node table movie (ID INT64, length INT32, PRIMARY KEY (ID));
create rel table like (FROM person TO movie);
1 change: 1 addition & 0 deletions dataset/copy-fault-tests/null-pk/vMovie.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0,32
4 changes: 4 additions & 0 deletions dataset/copy-fault-tests/null-pk/vPerson-valid.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Dave
Alice
Charlie
Ed
4 changes: 4 additions & 0 deletions dataset/copy-fault-tests/null-pk/vPerson2.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Charlie
Adam

Dave
1 change: 1 addition & 0 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ using namespace kuzu::utf8proc;

namespace kuzu {
namespace common {

CopyDescription::CopyDescription(
const std::vector<std::string>& filePaths, CSVReaderConfig csvReaderConfig, FileType fileType)
: filePaths{filePaths}, csvReaderConfig{nullptr}, fileType{fileType} {
Expand Down
3 changes: 1 addition & 2 deletions src/include/binder/copy/bound_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ class BoundCopy : public BoundStatement {
common::CopyDescription copyDescription, common::table_id_t tableID, std::string tableName)
: BoundStatement{common::StatementType::COPY,
BoundStatementResult::createSingleStringColumnResult()},
copyDescription{std::move(copyDescription)}, tableID{tableID}, tableName{std::move(
tableName)} {}
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)} {}

inline common::CopyDescription getCopyDescription() const { return copyDescription; }

Expand Down
6 changes: 0 additions & 6 deletions src/include/common/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ class CatalogException : public Exception {
explicit CatalogException(const std::string& msg) : Exception("Catalog exception: " + msg){};
};

// Exception subtype whose message is the caller's text prefixed with "HashIndex exception: ".
class HashIndexException : public Exception {
public:
// Forwards "HashIndex exception: " + msg to the Exception base constructor.
explicit HashIndexException(const std::string& msg)
: Exception("HashIndex exception: " + msg){};
};

class StorageException : public Exception {
public:
explicit StorageException(const std::string& msg) : Exception("Storage exception: " + msg){};
Expand Down
4 changes: 3 additions & 1 deletion src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ constexpr column_id_t INVALID_COLUMN_ID = INVALID_PROPERTY_ID;
// Index aliases over fixed-width unsigned integers. Every INVALID_* constant in this run is set to
// the maximum value of its alias's underlying type (UINT32_MAX or UINT64_MAX).
using vector_idx_t = uint32_t;
constexpr vector_idx_t INVALID_VECTOR_IDX = UINT32_MAX;
using block_idx_t = uint64_t;
constexpr block_idx_t INVALID_BLOCK_IDX = UINT64_MAX;
// NOTE(review): field_idx_t and struct_field_idx_t are both plain uint64_t aliases; this listing
// comes from a diff view, so one of them may be a pre-change spelling — confirm against the header.
using field_idx_t = uint64_t;
using struct_field_idx_t = uint64_t;
using union_field_idx_t = uint64_t;
constexpr struct_field_idx_t INVALID_STRUCT_FIELD_IDX = UINT64_MAX;
using tuple_idx_t = uint64_t;
using row_idx_t = uint64_t;
constexpr row_idx_t INVALID_ROW_IDX = UINT64_MAX;
// Cast-cost constant equal to UINT32_MAX; presumably marks "no cost defined" — verify at use sites.
constexpr uint32_t UNDEFINED_CAST_COST = UINT32_MAX;

// System representation for a variable-sized overflow value.
Expand Down
25 changes: 16 additions & 9 deletions src/include/planner/logical_plan/logical_operator/logical_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ class LogicalCopy : public LogicalOperator {
public:
LogicalCopy(const common::CopyDescription& copyDescription, common::table_id_t tableID,
std::string tableName, binder::expression_vector arrowColumnExpressions,
std::shared_ptr<binder::Expression> offsetExpression,
std::shared_ptr<binder::Expression> rowIdxExpression,
std::shared_ptr<binder::Expression> filePathExpression,
std::shared_ptr<binder::Expression> columnIdxExpression,
std::shared_ptr<binder::Expression> outputExpression)
: LogicalOperator{LogicalOperatorType::COPY},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)},
arrowColumnExpressions{std::move(arrowColumnExpressions)}, offsetExpression{std::move(
offsetExpression)},
: LogicalOperator{LogicalOperatorType::COPY}, copyDescription{copyDescription},
tableID{tableID}, tableName{std::move(tableName)}, arrowColumnExpressions{std::move(
arrowColumnExpressions)},
rowIdxExpression{std::move(rowIdxExpression)}, filePathExpression{std::move(
filePathExpression)},
columnIdxExpression{std::move(columnIdxExpression)}, outputExpression{
std::move(outputExpression)} {}

Expand All @@ -32,8 +34,12 @@ class LogicalCopy : public LogicalOperator {
return arrowColumnExpressions;
}

inline std::shared_ptr<binder::Expression> getOffsetExpression() const {
return offsetExpression;
inline std::shared_ptr<binder::Expression> getRowIdxExpression() const {
return rowIdxExpression;
}

inline std::shared_ptr<binder::Expression> getFilePathExpression() const {
return filePathExpression;
}

inline std::shared_ptr<binder::Expression> getColumnIdxExpression() const {
Expand All @@ -49,7 +55,7 @@ class LogicalCopy : public LogicalOperator {

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopy>(copyDescription, tableID, tableName, arrowColumnExpressions,
offsetExpression, columnIdxExpression, outputExpression);
rowIdxExpression, filePathExpression, columnIdxExpression, outputExpression);
}

private:
Expand All @@ -58,7 +64,8 @@ class LogicalCopy : public LogicalOperator {
// Used for printing only.
std::string tableName;
binder::expression_vector arrowColumnExpressions;
std::shared_ptr<binder::Expression> offsetExpression;
std::shared_ptr<binder::Expression> rowIdxExpression;
std::shared_ptr<binder::Expression> filePathExpression;
std::shared_ptr<binder::Expression> columnIdxExpression;
std::shared_ptr<binder::Expression> outputExpression;
};
Expand Down
27 changes: 17 additions & 10 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,23 @@ class CopyNodeSharedState {
};

struct CopyNodeDataInfo {
DataPos offsetVectorPos;
DataPos rowIdxVectorPos;
DataPos filePathVectorPos;
std::vector<DataPos> arrowColumnPoses;
};

class CopyNode : public Sink {
public:
CopyNode(std::shared_ptr<CopyNodeSharedState> sharedState, CopyNodeDataInfo copyNodeDataInfo,
common::CopyDescription copyDesc, storage::NodeTable* table, storage::RelsStore* relsStore,
catalog::Catalog* catalog, storage::WAL* wal,
const common::CopyDescription& copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child),
id, paramsString},
sharedState{std::move(sharedState)}, copyNodeDataInfo{std::move(copyNodeDataInfo)},
copyDesc{copyDesc}, table{table}, relsStore{relsStore}, catalog{catalog}, wal{wal} {
copyDesc{copyDesc}, table{table}, relsStore{relsStore}, catalog{catalog}, wal{wal},
rowIdxVector{nullptr} {
auto tableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(table->getTableID());
copyStates.resize(tableSchema->getNumProperties());
for (auto i = 0u; i < tableSchema->getNumProperties(); i++) {
Expand All @@ -58,7 +60,8 @@ class CopyNode : public Sink {
}

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
offsetVector = resultSet->getValueVector(copyNodeDataInfo.offsetVectorPos).get();
rowIdxVector = resultSet->getValueVector(copyNodeDataInfo.rowIdxVectorPos).get();
filePathVector = resultSet->getValueVector(copyNodeDataInfo.filePathVectorPos).get();
for (auto& arrowColumnPos : copyNodeDataInfo.arrowColumnPoses) {
arrowColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
Expand All @@ -84,12 +87,14 @@ class CopyNode : public Sink {

protected:
void populatePKIndex(storage::InMemColumnChunk* chunk, storage::InMemOverflowFile* overflowFile,
common::offset_t startOffset, uint64_t numValues);
common::offset_t startOffset, uint64_t numValues, const std::string& filePath,
common::row_idx_t startRowIdxInFile);

void logCopyWALRecord();

std::pair<common::offset_t, common::offset_t> getStartAndEndOffset(
std::pair<common::row_idx_t, common::row_idx_t> getStartAndEndRowIdx(
common::vector_idx_t columnIdx);
std::pair<std::string, common::row_idx_t> getFilePathAndRowIdxInFile();

private:
inline bool isCopyAllowed() {
Expand All @@ -100,10 +105,11 @@ class CopyNode : public Sink {

void flushChunksAndPopulatePKIndex(
const std::vector<std::unique_ptr<storage::InMemColumnChunk>>& columnChunks,
common::offset_t startNodeOffset, common::offset_t endNodeOffset);
common::offset_t startNodeOffset, common::offset_t endNodeOffset,
const std::string& filePath, common::row_idx_t startRowIdxInFile);

template<typename T, typename... Args>
void appendToPKIndex(storage::InMemColumnChunk* chunk, common::offset_t startOffset,
uint64_t appendToPKIndex(storage::InMemColumnChunk* chunk, common::offset_t startOffset,
uint64_t numValues, Args... args) {
throw common::CopyException("appendToPKIndex1 not implemented");
}
Expand All @@ -116,7 +122,8 @@ class CopyNode : public Sink {
storage::RelsStore* relsStore;
catalog::Catalog* catalog;
storage::WAL* wal;
common::ValueVector* offsetVector;
common::ValueVector* rowIdxVector;
common::ValueVector* filePathVector;
std::vector<common::ValueVector*> arrowColumnVectors;
std::vector<std::unique_ptr<storage::PropertyCopyState>> copyStates;
};
Expand Down
14 changes: 6 additions & 8 deletions src/include/processor/operator/copy/copy_npy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ namespace processor {
class CopyNPYNode : public CopyNode {
public:
CopyNPYNode(std::shared_ptr<CopyNodeSharedState> sharedState, CopyNodeDataInfo copyNodeDataInfo,
DataPos columnIdxPos, const common::CopyDescription& copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
const DataPos& columnIdxPos, const common::CopyDescription& copyDesc,
storage::NodeTable* table, storage::RelsStore* relsStore, catalog::Catalog* catalog,
storage::WAL* wal, std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: CopyNode{std::move(sharedState), std::move(copyNodeDataInfo), copyDesc, table, relsStore,
catalog, wal, std::move(resultSetDescriptor), std::move(child), id, paramsString},
Expand All @@ -19,17 +19,15 @@ class CopyNPYNode : public CopyNode {
void executeInternal(ExecutionContext* context) final;

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final {
offsetVector = resultSet->getValueVector(copyNodeDataInfo.offsetVectorPos).get();
CopyNode::initLocalStateInternal(resultSet, context);
columnIdxVector = resultSet->getValueVector(columnIdxPos).get();
for (auto& arrowColumnPos : copyNodeDataInfo.arrowColumnPoses) {
arrowColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
}

void flushChunksAndPopulatePKIndexSingleColumn(
std::vector<std::unique_ptr<storage::InMemColumnChunk>>& columnChunks,
common::offset_t startNodeOffset, common::offset_t endNodeOffset,
common::vector_idx_t columnToCopy);
common::vector_idx_t columnToCopy, const std::string& filePath,
common::row_idx_t startRowIdxInFile);

inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<CopyNPYNode>(sharedState, copyNodeDataInfo, columnIdxPos, copyDesc,
Expand Down
19 changes: 10 additions & 9 deletions src/include/processor/operator/copy/read_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ namespace processor {
// CSV files must be read in streaming mode, so we read one batch at a time.
class ReadCSVMorsel : public ReadFileMorsel {
public:
ReadCSVMorsel(common::offset_t startOffset, std::string filePath,
ReadCSVMorsel(common::row_idx_t rowIdx, std::string filePath, common::row_idx_t rowIdxInFile,
std::shared_ptr<arrow::RecordBatch> recordBatch)
: ReadFileMorsel{startOffset, BLOCK_IDX_INVALID, UINT64_MAX, std::move(filePath)},
: ReadFileMorsel{rowIdx, common::INVALID_BLOCK_IDX, common::INVALID_ROW_IDX,
std::move(filePath), rowIdxInFile},
recordBatch{std::move(recordBatch)} {}

std::shared_ptr<arrow::RecordBatch> recordBatch;
Expand All @@ -24,7 +25,7 @@ class ReadCSVSharedState : public ReadFileSharedState {
}

private:
void countNumLines() override;
void countNumRows() override;

std::unique_ptr<ReadFileMorsel> getMorsel() override;

Expand All @@ -35,11 +36,11 @@ class ReadCSVSharedState : public ReadFileSharedState {

class ReadCSV : public ReadFile {
public:
ReadCSV(std::vector<DataPos> arrowColumnPoses, const DataPos& offsetVectorPos,
std::shared_ptr<ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{std::move(arrowColumnPoses), offsetVectorPos, std::move(sharedState),
PhysicalOperatorType::READ_CSV, id, paramsString} {}
ReadCSV(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses, std::shared_ptr<ReadFileSharedState> sharedState,
uint32_t id, const std::string& paramsString)
: ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(arrowColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_CSV, id, paramsString} {}

inline std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<ReadFileMorsel> morsel) override {
Expand All @@ -49,7 +50,7 @@ class ReadCSV : public ReadFile {

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ReadCSV>(
arrowColumnPoses, offsetVectorPos, sharedState, id, paramsString);
rowIdxVectorPos, filePathVectorPos, arrowColumnPoses, sharedState, id, paramsString);
}
};

Expand Down
48 changes: 26 additions & 22 deletions src/include/processor/operator/copy/read_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,61 +8,63 @@ namespace processor {

class ReadFileMorsel {
public:
static constexpr common::block_idx_t BLOCK_IDX_INVALID = UINT64_MAX;

ReadFileMorsel(common::offset_t nodeOffset, common::block_idx_t blockIdx, uint64_t numNodes,
std::string filePath)
: nodeOffset{nodeOffset}, blockIdx{blockIdx}, numNodes{numNodes}, filePath{std::move(
filePath)} {};
ReadFileMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx,
common::row_idx_t numRows, std::string filePath, common::row_idx_t rowIdxInFile)
: rowIdx{rowIdx}, blockIdx{blockIdx}, numRows{numRows}, filePath{std::move(filePath)},
rowIdxInFile{rowIdxInFile} {};

virtual ~ReadFileMorsel() = default;

public:
common::offset_t nodeOffset;
// When reading from multiple files, rowIdx is accumulated.
common::row_idx_t rowIdx;
common::block_idx_t blockIdx;
uint64_t numNodes;
common::row_idx_t numRows;
std::string filePath;
// Row idx in the current file. Equal to `rowIdx` when reading from only a single file.
common::row_idx_t rowIdxInFile;
};

class ReadFileSharedState {
public:
explicit ReadFileSharedState(
std::vector<std::string> filePaths, catalog::TableSchema* tableSchema)
: nodeOffset{0}, curBlockIdx{0}, filePaths{std::move(filePaths)}, curFileIdx{0},
tableSchema{tableSchema}, numRows{0} {}
: currRowIdx{0}, curBlockIdx{0}, filePaths{std::move(filePaths)}, curFileIdx{0},
tableSchema{tableSchema}, numRows{0}, currRowIdxInCurrFile{1} {}

virtual ~ReadFileSharedState() = default;

virtual void countNumLines() = 0;
virtual void countNumRows() = 0;

virtual std::unique_ptr<ReadFileMorsel> getMorsel() = 0;

public:
uint64_t numRows;
common::row_idx_t numRows;
catalog::TableSchema* tableSchema;

protected:
std::mutex mtx;
common::offset_t nodeOffset;
common::row_idx_t currRowIdx;
std::unordered_map<std::string, storage::FileBlockInfo> fileBlockInfos;
common::block_idx_t curBlockIdx;
std::vector<std::string> filePaths;
common::vector_idx_t curFileIdx;
common::row_idx_t currRowIdxInCurrFile;
};

class ReadFile : public PhysicalOperator {
public:
ReadFile(std::vector<DataPos> arrowColumnPoses, DataPos offsetVectorPos,
std::shared_ptr<ReadFileSharedState> sharedState, PhysicalOperatorType operatorType,
uint32_t id, const std::string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, arrowColumnPoses{std::move(
arrowColumnPoses)},
offsetVectorPos{std::move(offsetVectorPos)}, sharedState{std::move(sharedState)} {}
ReadFile(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses, std::shared_ptr<ReadFileSharedState> sharedState,
PhysicalOperatorType operatorType, uint32_t id, const std::string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, rowIdxVectorPos{rowIdxVectorPos},
filePathVectorPos{filePathVectorPos}, arrowColumnPoses{std::move(arrowColumnPoses)},
sharedState{std::move(sharedState)}, rowIdxVector{nullptr}, filePathVector{nullptr} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

inline void initGlobalStateInternal(kuzu::processor::ExecutionContext* context) override {
sharedState->countNumLines();
sharedState->countNumRows();
}

inline bool isSource() const override { return true; }
Expand All @@ -74,9 +76,11 @@ class ReadFile : public PhysicalOperator {

protected:
std::shared_ptr<ReadFileSharedState> sharedState;
DataPos rowIdxVectorPos;
DataPos filePathVectorPos;
std::vector<DataPos> arrowColumnPoses;
DataPos offsetVectorPos;
common::ValueVector* offsetVector;
common::ValueVector* rowIdxVector;
common::ValueVector* filePathVector;
std::vector<common::ValueVector*> arrowColumnVectors;
};

Expand Down
Loading

0 comments on commit db75e30

Please sign in to comment.