Skip to content

Commit

Permalink
Merge pull request #1767 from kuzudb/issue1738
Browse files Browse the repository at this point in the history
Fix issue 1738
  • Loading branch information
acquamarin committed Jul 6, 2023
2 parents 7bbea12 + 1610cd0 commit 9b49377
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 39 deletions.
9 changes: 4 additions & 5 deletions src/include/processor/operator/copy/read_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ class ReadCSVMorsel : public ReadFileMorsel {

class ReadCSVSharedState : public ReadFileSharedState {
public:
ReadCSVSharedState(common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
std::vector<std::string> filePaths)
: ReadFileSharedState{std::move(filePaths)}, csvReaderConfig{csvReaderConfig},
tableSchema{tableSchema} {}
ReadCSVSharedState(common::CSVReaderConfig csvReaderConfig, std::vector<std::string> filePaths,
catalog::TableSchema* tableSchema)
: ReadFileSharedState{std::move(filePaths), tableSchema}, csvReaderConfig{csvReaderConfig} {
}

private:
void countNumLines() override;
Expand All @@ -30,7 +30,6 @@ class ReadCSVSharedState : public ReadFileSharedState {

private:
common::CSVReaderConfig csvReaderConfig;
catalog::TableSchema* tableSchema;
std::shared_ptr<arrow::csv::StreamingReader> reader;
};

Expand Down
15 changes: 6 additions & 9 deletions src/include/processor/operator/copy/read_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ class ReadFileMorsel {

class ReadFileSharedState {
public:
explicit ReadFileSharedState(std::vector<std::string> filePaths)
: nodeOffset{0}, curBlockIdx{0}, filePaths{std::move(filePaths)}, curFileIdx{0}, numRows{
0} {}
explicit ReadFileSharedState(
std::vector<std::string> filePaths, catalog::TableSchema* tableSchema)
: nodeOffset{0}, curBlockIdx{0}, filePaths{std::move(filePaths)}, curFileIdx{0},
tableSchema{tableSchema}, numRows{0} {}

virtual ~ReadFileSharedState() = default;

Expand All @@ -38,6 +39,7 @@ class ReadFileSharedState {

public:
uint64_t numRows;
catalog::TableSchema* tableSchema;

protected:
std::mutex mtx;
Expand All @@ -57,12 +59,7 @@ class ReadFile : public PhysicalOperator {
arrowColumnPoses)},
offsetVectorPos{std::move(offsetVectorPos)}, sharedState{std::move(sharedState)} {}

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
offsetVector = resultSet->getValueVector(offsetVectorPos).get();
for (auto& arrowColumnPos : arrowColumnPoses) {
arrowColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
}
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

inline void initGlobalStateInternal(kuzu::processor::ExecutionContext* context) override {
sharedState->countNumLines();
Expand Down
5 changes: 3 additions & 2 deletions src/include/processor/operator/copy/read_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ namespace processor {

class ReadParquetSharedState : public ReadFileSharedState {
public:
explicit ReadParquetSharedState(std::vector<std::string> filePaths)
: ReadFileSharedState{std::move(filePaths)} {}
explicit ReadParquetSharedState(
std::vector<std::string> filePaths, catalog::TableSchema* tableSchema)
: ReadFileSharedState{std::move(filePaths), tableSchema} {}

private:
void countNumLines() override;
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/copier/node_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class NodeCopier {
std::vector<std::shared_ptr<InMemColumn>> columns;
common::column_id_t pkColumnID;
common::column_id_t columnToCopy;
catalog::TableSchema* schema;
};

template<>
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/copier/table_copy_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class TableCopyUtils {
static std::shared_ptr<arrow::csv::StreamingReader> createCSVReader(const std::string& filePath,
common::CSVReaderConfig* csvReaderConfig, catalog::TableSchema* tableSchema);
static std::unique_ptr<parquet::arrow::FileReader> createParquetReader(
const std::string& filePath);
const std::string& filePath, catalog::TableSchema* tableSchema);

static common::tuple_idx_t countNumLines(common::CopyDescription& copyDescription,
catalog::TableSchema* tableSchema,
Expand All @@ -132,6 +132,7 @@ class TableCopyUtils {
std::unordered_map<std::string, FileBlockInfo>& fileBlockInfos);
static std::unique_ptr<common::Value> convertStringToValue(std::string element,
const common::LogicalType& type, const common::CopyDescription& copyDescription);
static std::vector<std::string> getColumnNamesToRead(catalog::TableSchema* tableSchema);
};

} // namespace storage
Expand Down
9 changes: 5 additions & 4 deletions src/processor/mapper/map_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,17 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyNodeToPhysical(Logic
}
auto offsetExpression = copy->getOffsetExpression();
auto offsetVectorPos = DataPos(outSchema->getExpressionPos(*offsetExpression));
auto nodeTableSchema =
catalog->getReadOnlyVersion()->getNodeTableSchema(copy->getTableID());
if (copy->getCopyDescription().fileType == common::CopyDescription::FileType::CSV) {
readFileSharedState =
std::make_shared<ReadCSVSharedState>(*copy->getCopyDescription().csvReaderConfig,
catalog->getReadOnlyVersion()->getNodeTableSchema(copy->getTableID()),
copy->getCopyDescription().filePaths);
copy->getCopyDescription().filePaths, nodeTableSchema);
readFile = std::make_unique<ReadCSV>(arrowColumnPoses, offsetVectorPos,
readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting());
} else {
readFileSharedState =
std::make_shared<ReadParquetSharedState>(copy->getCopyDescription().filePaths);
readFileSharedState = std::make_shared<ReadParquetSharedState>(
copy->getCopyDescription().filePaths, nodeTableSchema);
readFile = std::make_unique<ReadParquet>(arrowColumnPoses, offsetVectorPos,
readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting());
}
Expand Down
7 changes: 7 additions & 0 deletions src/processor/operator/copy/read_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
namespace kuzu {
namespace processor {

void ReadFile::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) {
offsetVector = resultSet->getValueVector(offsetVectorPos).get();
for (auto& arrowColumnPos : arrowColumnPoses) {
arrowColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
}

bool ReadFile::getNextTuplesInternal(kuzu::processor::ExecutionContext* context) {
auto morsel = sharedState->getMorsel();
if (morsel == nullptr) {
Expand Down
5 changes: 3 additions & 2 deletions src/processor/operator/copy/read_parquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace processor {
void ReadParquetSharedState::countNumLines() {
for (auto& filePath : filePaths) {
std::unique_ptr<parquet::arrow::FileReader> reader =
storage::TableCopyUtils::createParquetReader(filePath);
storage::TableCopyUtils::createParquetReader(filePath, tableSchema);
auto metadata = reader->parquet_reader()->metadata();
uint64_t numBlocks = metadata->num_row_groups();
std::vector<uint64_t> numLinesPerBlock(numBlocks);
Expand Down Expand Up @@ -47,7 +47,8 @@ std::shared_ptr<arrow::RecordBatch> ReadParquet::readTuples(
std::unique_ptr<ReadFileMorsel> morsel) {
assert(!morsel->filePath.empty());
if (!reader || filePath != morsel->filePath) {
reader = storage::TableCopyUtils::createParquetReader(morsel->filePath);
reader = storage::TableCopyUtils::createParquetReader(
morsel->filePath, sharedState->tableSchema);
filePath = morsel->filePath;
}
std::shared_ptr<arrow::Table> table;
Expand Down
4 changes: 2 additions & 2 deletions src/storage/copier/node_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ NodeCopier::NodeCopier(const std::string& directory, std::shared_ptr<CopySharedS
const CopyDescription& copyDesc, TableSchema* schema, tuple_idx_t numTuples,
column_id_t columnToCopy)
: sharedState{std::move(sharedState)}, copyDesc{copyDesc}, columnToCopy{columnToCopy},
pkColumnID{INVALID_COLUMN_ID} {
pkColumnID{INVALID_COLUMN_ID}, schema{schema} {
for (auto i = 0u; i < schema->properties.size(); i++) {
auto property = schema->properties[i];
if (property.dataType.getLogicalTypeID() == LogicalTypeID::SERIAL) {
Expand Down Expand Up @@ -160,7 +160,7 @@ void CSVNodeCopier::executeInternal(std::unique_ptr<CopyMorsel> morsel) {
void ParquetNodeCopier::executeInternal(std::unique_ptr<CopyMorsel> morsel) {
assert(!morsel->filePath.empty());
if (!reader || filePath != morsel->filePath) {
reader = TableCopyUtils::createParquetReader(morsel->filePath);
reader = TableCopyUtils::createParquetReader(morsel->filePath, schema);
filePath = morsel->filePath;
}
std::shared_ptr<arrow::Table> table;
Expand Down
4 changes: 2 additions & 2 deletions src/storage/copier/rel_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ void RelListsCounterAndColumnCopier::buildRelListsHeaders(
void ParquetRelListsCounterAndColumnsCopier::executeInternal(std::unique_ptr<CopyMorsel> morsel) {
assert(!morsel->filePath.empty());
if (!reader || filePath != morsel->filePath) {
reader = TableCopyUtils::createParquetReader(morsel->filePath);
reader = TableCopyUtils::createParquetReader(morsel->filePath, schema);
filePath = morsel->filePath;
}
std::shared_ptr<arrow::Table> table;
Expand Down Expand Up @@ -316,7 +316,7 @@ void RelListsCopier::finalize() {
void ParquetRelListsCopier::executeInternal(std::unique_ptr<CopyMorsel> morsel) {
assert(!morsel->filePath.empty());
if (!reader || filePath != morsel->filePath) {
reader = TableCopyUtils::createParquetReader(morsel->filePath);
reader = TableCopyUtils::createParquetReader(morsel->filePath, schema);
filePath = morsel->filePath;
}
std::shared_ptr<arrow::Table> table;
Expand Down
41 changes: 29 additions & 12 deletions src/storage/copier/table_copy_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ tuple_idx_t TableCopyUtils::countNumLinesParquet(CopyDescription& copyDescriptio
std::unordered_map<std::string, FileBlockInfo>& fileBlockInfos) {
tuple_idx_t numRows = 0;
for (auto& filePath : copyDescription.filePaths) {
std::unique_ptr<parquet::arrow::FileReader> reader = createParquetReader(filePath);
std::unique_ptr<parquet::arrow::FileReader> reader =
createParquetReader(filePath, tableSchema);
auto metadata = reader->parquet_reader()->metadata();
uint64_t numBlocks = metadata->num_row_groups();
std::vector<uint64_t> numLinesPerBlock(numBlocks);
Expand Down Expand Up @@ -166,20 +167,12 @@ std::shared_ptr<arrow::csv::StreamingReader> TableCopyUtils::createCSVReader(
throwCopyExceptionIfNotOK(arrow::io::ReadableFile::Open(filePath).Value(&inputStream));
auto csvReadOptions = arrow::csv::ReadOptions::Defaults();
csvReadOptions.block_size = CopyConstants::CSV_READING_BLOCK_SIZE;
if (!tableSchema->isNodeTable) {
csvReadOptions.column_names.emplace_back(Property::REL_FROM_PROPERTY_NAME);
csvReadOptions.column_names.emplace_back(Property::REL_TO_PROPERTY_NAME);
}
for (auto& property : tableSchema->properties) {
if (skipCopyForProperty(property)) {
continue;
}
csvReadOptions.column_names.push_back(property.name);
for (auto& columnName : getColumnNamesToRead(tableSchema)) {
csvReadOptions.column_names.push_back(columnName);
}
if (csvReaderConfig->hasHeader) {
csvReadOptions.skip_rows = 1;
}

auto csvParseOptions = arrow::csv::ParseOptions::Defaults();
csvParseOptions.delimiter = csvReaderConfig->delimiter;
csvParseOptions.escape_char = csvReaderConfig->escapeChar;
Expand Down Expand Up @@ -219,12 +212,21 @@ std::shared_ptr<arrow::csv::StreamingReader> TableCopyUtils::createCSVReader(
}

std::unique_ptr<parquet::arrow::FileReader> TableCopyUtils::createParquetReader(
const std::string& filePath) {
const std::string& filePath, TableSchema* tableSchema) {
std::shared_ptr<arrow::io::ReadableFile> infile;
throwCopyExceptionIfNotOK(arrow::io::ReadableFile::Open(filePath).Value(&infile));
std::unique_ptr<parquet::arrow::FileReader> reader;
throwCopyExceptionIfNotOK(
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
auto expectedNumColumns = getColumnNamesToRead(tableSchema).size();
auto actualNumColumns =
reader->parquet_reader()->metadata()->schema()->group_node()->field_count();
if (expectedNumColumns != actualNumColumns) {
// Note: Some parquet files may contain an index column.
throw common::CopyException(common::StringUtils::string_format(
"Unmatched number of columns in parquet file. Expect: {}, got: {}.", expectedNumColumns,
actualNumColumns));
}
return reader;
}

Expand Down Expand Up @@ -434,5 +436,20 @@ std::unique_ptr<Value> TableCopyUtils::convertStringToValue(
return value;
}

std::vector<std::string> TableCopyUtils::getColumnNamesToRead(catalog::TableSchema* tableSchema) {
std::vector<std::string> columnNamesToRead;
if (!tableSchema->isNodeTable) {
columnNamesToRead.emplace_back(Property::REL_FROM_PROPERTY_NAME);
columnNamesToRead.emplace_back(Property::REL_TO_PROPERTY_NAME);
}
for (auto& property : tableSchema->properties) {
if (skipCopyForProperty(property)) {
continue;
}
columnNamesToRead.push_back(property.name);
}
return columnNamesToRead;
}

} // namespace storage
} // namespace kuzu
16 changes: 16 additions & 0 deletions test/test_files/exceptions/copy/wrong_header.test
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,19 @@ Copy exception: COPY commands can only be executed once on a table.
-STATEMENT COPY knows FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/wrong-header/eKnowsMissingColumn.csv" (HEADER=true)
---- error
Copy exception: Invalid: CSV parse error: Expected 4 columns, got 3: 10,24,1

-CASE NodeUnmatchedNumColumns
-STATEMENT create node table person (ID1 SERIAL, ID INT64, fName INT64, age INT64, PRIMARY KEY (ID1))
---- ok
-STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-test/node/parquet/types_50k_1.parquet" (HEADER=true)
---- error
Copy exception: Unmatched number of columns in parquet file. Expect: 3, got: 13.

-CASE RelUnmatchedNumColumns
-STATEMENT create node table person (ID1 SERIAL, ID INT64, fName INT64, age INT64, PRIMARY KEY (ID1))
---- ok
-STATEMENT create rel table knows (FROM person TO person, time date, age INT64)
---- ok
-STATEMENT COPY knows FROM "${KUZU_ROOT_DIRECTORY}/dataset/demo-db/parquet/follows.parquet" (HEADER=true)
---- error
Copy exception: Unmatched number of columns in parquet file. Expect: 4, got: 3.

0 comments on commit 9b49377

Please sign in to comment.