Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue 1738 #1767

Merged
merged 1 commit into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/include/processor/operator/copy/read_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ class ReadCSVMorsel : public ReadFileMorsel {

class ReadCSVSharedState : public ReadFileSharedState {
public:
ReadCSVSharedState(common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
std::vector<std::string> filePaths)
: ReadFileSharedState{std::move(filePaths)}, csvReaderConfig{csvReaderConfig},
tableSchema{tableSchema} {}
ReadCSVSharedState(common::CSVReaderConfig csvReaderConfig, std::vector<std::string> filePaths,
catalog::TableSchema* tableSchema)
: ReadFileSharedState{std::move(filePaths), tableSchema}, csvReaderConfig{csvReaderConfig} {
}

private:
void countNumLines() override;
Expand All @@ -30,7 +30,6 @@ class ReadCSVSharedState : public ReadFileSharedState {

private:
common::CSVReaderConfig csvReaderConfig;
catalog::TableSchema* tableSchema;
std::shared_ptr<arrow::csv::StreamingReader> reader;
};

Expand Down
15 changes: 6 additions & 9 deletions src/include/processor/operator/copy/read_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ class ReadFileMorsel {

class ReadFileSharedState {
public:
explicit ReadFileSharedState(std::vector<std::string> filePaths)
: nodeOffset{0}, curBlockIdx{0}, filePaths{std::move(filePaths)}, curFileIdx{0}, numRows{
0} {}
explicit ReadFileSharedState(
std::vector<std::string> filePaths, catalog::TableSchema* tableSchema)
: nodeOffset{0}, curBlockIdx{0}, filePaths{std::move(filePaths)}, curFileIdx{0},
tableSchema{tableSchema}, numRows{0} {}

virtual ~ReadFileSharedState() = default;

Expand All @@ -38,6 +39,7 @@ class ReadFileSharedState {

public:
uint64_t numRows;
catalog::TableSchema* tableSchema;

protected:
std::mutex mtx;
Expand All @@ -57,12 +59,7 @@ class ReadFile : public PhysicalOperator {
arrowColumnPoses)},
offsetVectorPos{std::move(offsetVectorPos)}, sharedState{std::move(sharedState)} {}

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
offsetVector = resultSet->getValueVector(offsetVectorPos).get();
for (auto& arrowColumnPos : arrowColumnPoses) {
arrowColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
}
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

inline void initGlobalStateInternal(kuzu::processor::ExecutionContext* context) override {
sharedState->countNumLines();
Expand Down
5 changes: 3 additions & 2 deletions src/include/processor/operator/copy/read_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ namespace processor {

class ReadParquetSharedState : public ReadFileSharedState {
public:
explicit ReadParquetSharedState(std::vector<std::string> filePaths)
: ReadFileSharedState{std::move(filePaths)} {}
explicit ReadParquetSharedState(
std::vector<std::string> filePaths, catalog::TableSchema* tableSchema)
: ReadFileSharedState{std::move(filePaths), tableSchema} {}

private:
void countNumLines() override;
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/copier/node_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class NodeCopier {
std::vector<std::shared_ptr<InMemColumn>> columns;
common::column_id_t pkColumnID;
common::column_id_t columnToCopy;
catalog::TableSchema* schema;
};

template<>
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/copier/table_copy_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class TableCopyUtils {
static std::shared_ptr<arrow::csv::StreamingReader> createCSVReader(const std::string& filePath,
common::CSVReaderConfig* csvReaderConfig, catalog::TableSchema* tableSchema);
static std::unique_ptr<parquet::arrow::FileReader> createParquetReader(
const std::string& filePath);
const std::string& filePath, catalog::TableSchema* tableSchema);

static common::tuple_idx_t countNumLines(common::CopyDescription& copyDescription,
catalog::TableSchema* tableSchema,
Expand All @@ -132,6 +132,7 @@ class TableCopyUtils {
std::unordered_map<std::string, FileBlockInfo>& fileBlockInfos);
static std::unique_ptr<common::Value> convertStringToValue(std::string element,
const common::LogicalType& type, const common::CopyDescription& copyDescription);
static std::vector<std::string> getColumnNamesToRead(catalog::TableSchema* tableSchema);
};

} // namespace storage
Expand Down
9 changes: 5 additions & 4 deletions src/processor/mapper/map_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,17 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyNodeToPhysical(Logic
}
auto offsetExpression = copy->getOffsetExpression();
auto offsetVectorPos = DataPos(outSchema->getExpressionPos(*offsetExpression));
auto nodeTableSchema =
catalog->getReadOnlyVersion()->getNodeTableSchema(copy->getTableID());
if (copy->getCopyDescription().fileType == common::CopyDescription::FileType::CSV) {
readFileSharedState =
std::make_shared<ReadCSVSharedState>(*copy->getCopyDescription().csvReaderConfig,
catalog->getReadOnlyVersion()->getNodeTableSchema(copy->getTableID()),
copy->getCopyDescription().filePaths);
copy->getCopyDescription().filePaths, nodeTableSchema);
readFile = std::make_unique<ReadCSV>(arrowColumnPoses, offsetVectorPos,
readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting());
} else {
readFileSharedState =
std::make_shared<ReadParquetSharedState>(copy->getCopyDescription().filePaths);
readFileSharedState = std::make_shared<ReadParquetSharedState>(
copy->getCopyDescription().filePaths, nodeTableSchema);
acquamarin marked this conversation as resolved.
Show resolved Hide resolved
readFile = std::make_unique<ReadParquet>(arrowColumnPoses, offsetVectorPos,
readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting());
}
Expand Down
7 changes: 7 additions & 0 deletions src/processor/operator/copy/read_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
namespace kuzu {
namespace processor {

// Caches raw pointers to the result-set vectors this operator writes into:
// the node-offset vector plus one value vector per arrow column being scanned.
void ReadFile::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) {
    offsetVector = resultSet->getValueVector(offsetVectorPos).get();
    arrowColumnVectors.reserve(arrowColumnPoses.size());
    for (const auto& columnPos : arrowColumnPoses) {
        arrowColumnVectors.push_back(resultSet->getValueVector(columnPos).get());
    }
}

bool ReadFile::getNextTuplesInternal(kuzu::processor::ExecutionContext* context) {
auto morsel = sharedState->getMorsel();
if (morsel == nullptr) {
Expand Down
5 changes: 3 additions & 2 deletions src/processor/operator/copy/read_parquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace processor {
void ReadParquetSharedState::countNumLines() {
for (auto& filePath : filePaths) {
std::unique_ptr<parquet::arrow::FileReader> reader =
storage::TableCopyUtils::createParquetReader(filePath);
storage::TableCopyUtils::createParquetReader(filePath, tableSchema);
auto metadata = reader->parquet_reader()->metadata();
uint64_t numBlocks = metadata->num_row_groups();
std::vector<uint64_t> numLinesPerBlock(numBlocks);
Expand Down Expand Up @@ -47,7 +47,8 @@ std::shared_ptr<arrow::RecordBatch> ReadParquet::readTuples(
std::unique_ptr<ReadFileMorsel> morsel) {
assert(!morsel->filePath.empty());
if (!reader || filePath != morsel->filePath) {
reader = storage::TableCopyUtils::createParquetReader(morsel->filePath);
reader = storage::TableCopyUtils::createParquetReader(
morsel->filePath, sharedState->tableSchema);
filePath = morsel->filePath;
}
std::shared_ptr<arrow::Table> table;
Expand Down
4 changes: 2 additions & 2 deletions src/storage/copier/node_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ NodeCopier::NodeCopier(const std::string& directory, std::shared_ptr<CopySharedS
const CopyDescription& copyDesc, TableSchema* schema, tuple_idx_t numTuples,
column_id_t columnToCopy)
: sharedState{std::move(sharedState)}, copyDesc{copyDesc}, columnToCopy{columnToCopy},
pkColumnID{INVALID_COLUMN_ID} {
pkColumnID{INVALID_COLUMN_ID}, schema{schema} {
for (auto i = 0u; i < schema->properties.size(); i++) {
auto property = schema->properties[i];
if (property.dataType.getLogicalTypeID() == LogicalTypeID::SERIAL) {
Expand Down Expand Up @@ -160,7 +160,7 @@ void CSVNodeCopier::executeInternal(std::unique_ptr<CopyMorsel> morsel) {
void ParquetNodeCopier::executeInternal(std::unique_ptr<CopyMorsel> morsel) {
assert(!morsel->filePath.empty());
if (!reader || filePath != morsel->filePath) {
reader = TableCopyUtils::createParquetReader(morsel->filePath);
reader = TableCopyUtils::createParquetReader(morsel->filePath, schema);
filePath = morsel->filePath;
}
std::shared_ptr<arrow::Table> table;
Expand Down
4 changes: 2 additions & 2 deletions src/storage/copier/rel_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ void RelListsCounterAndColumnCopier::buildRelListsHeaders(
void ParquetRelListsCounterAndColumnsCopier::executeInternal(std::unique_ptr<CopyMorsel> morsel) {
assert(!morsel->filePath.empty());
if (!reader || filePath != morsel->filePath) {
reader = TableCopyUtils::createParquetReader(morsel->filePath);
reader = TableCopyUtils::createParquetReader(morsel->filePath, schema);
filePath = morsel->filePath;
}
std::shared_ptr<arrow::Table> table;
Expand Down Expand Up @@ -316,7 +316,7 @@ void RelListsCopier::finalize() {
void ParquetRelListsCopier::executeInternal(std::unique_ptr<CopyMorsel> morsel) {
assert(!morsel->filePath.empty());
if (!reader || filePath != morsel->filePath) {
reader = TableCopyUtils::createParquetReader(morsel->filePath);
reader = TableCopyUtils::createParquetReader(morsel->filePath, schema);
filePath = morsel->filePath;
}
std::shared_ptr<arrow::Table> table;
Expand Down
41 changes: 29 additions & 12 deletions src/storage/copier/table_copy_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ tuple_idx_t TableCopyUtils::countNumLinesParquet(CopyDescription& copyDescriptio
std::unordered_map<std::string, FileBlockInfo>& fileBlockInfos) {
tuple_idx_t numRows = 0;
for (auto& filePath : copyDescription.filePaths) {
std::unique_ptr<parquet::arrow::FileReader> reader = createParquetReader(filePath);
std::unique_ptr<parquet::arrow::FileReader> reader =
createParquetReader(filePath, tableSchema);
auto metadata = reader->parquet_reader()->metadata();
uint64_t numBlocks = metadata->num_row_groups();
std::vector<uint64_t> numLinesPerBlock(numBlocks);
Expand Down Expand Up @@ -166,20 +167,12 @@ std::shared_ptr<arrow::csv::StreamingReader> TableCopyUtils::createCSVReader(
throwCopyExceptionIfNotOK(arrow::io::ReadableFile::Open(filePath).Value(&inputStream));
auto csvReadOptions = arrow::csv::ReadOptions::Defaults();
csvReadOptions.block_size = CopyConstants::CSV_READING_BLOCK_SIZE;
if (!tableSchema->isNodeTable) {
csvReadOptions.column_names.emplace_back(Property::REL_FROM_PROPERTY_NAME);
csvReadOptions.column_names.emplace_back(Property::REL_TO_PROPERTY_NAME);
}
for (auto& property : tableSchema->properties) {
if (skipCopyForProperty(property)) {
continue;
}
csvReadOptions.column_names.push_back(property.name);
for (auto& columnName : getColumnNamesToRead(tableSchema)) {
csvReadOptions.column_names.push_back(columnName);
}
if (csvReaderConfig->hasHeader) {
csvReadOptions.skip_rows = 1;
}

auto csvParseOptions = arrow::csv::ParseOptions::Defaults();
csvParseOptions.delimiter = csvReaderConfig->delimiter;
csvParseOptions.escape_char = csvReaderConfig->escapeChar;
Expand Down Expand Up @@ -219,12 +212,21 @@ std::shared_ptr<arrow::csv::StreamingReader> TableCopyUtils::createCSVReader(
}

std::unique_ptr<parquet::arrow::FileReader> TableCopyUtils::createParquetReader(
const std::string& filePath) {
const std::string& filePath, TableSchema* tableSchema) {
std::shared_ptr<arrow::io::ReadableFile> infile;
throwCopyExceptionIfNotOK(arrow::io::ReadableFile::Open(filePath).Value(&infile));
std::unique_ptr<parquet::arrow::FileReader> reader;
throwCopyExceptionIfNotOK(
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
auto expectedNumColumns = getColumnNamesToRead(tableSchema).size();
auto actualNumColumns =
reader->parquet_reader()->metadata()->schema()->group_node()->field_count();
if (expectedNumColumns != actualNumColumns) {
// Note: Some parquet files may contain an index column.
throw common::CopyException(common::StringUtils::string_format(
"Unmatched number of columns in parquet file. Expect: {}, got: {}.", expectedNumColumns,
actualNumColumns));
}
return reader;
}

Expand Down Expand Up @@ -434,5 +436,20 @@ std::unique_ptr<Value> TableCopyUtils::convertStringToValue(
return value;
}

// Returns the column names expected in the source file for the given table:
// rel tables are prefixed with the implicit FROM/TO bound-node columns,
// followed by every schema property that is actually copied (properties for
// which skipCopyForProperty() holds are excluded).
std::vector<std::string> TableCopyUtils::getColumnNamesToRead(catalog::TableSchema* tableSchema) {
    std::vector<std::string> columnNames;
    if (!tableSchema->isNodeTable) {
        // Rel files carry the source/destination node columns first.
        columnNames.emplace_back(Property::REL_FROM_PROPERTY_NAME);
        columnNames.emplace_back(Property::REL_TO_PROPERTY_NAME);
    }
    for (const auto& property : tableSchema->properties) {
        if (!skipCopyForProperty(property)) {
            columnNames.push_back(property.name);
        }
    }
    return columnNames;
}

} // namespace storage
} // namespace kuzu
16 changes: 16 additions & 0 deletions test/test_files/exceptions/copy/wrong_header.test
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,19 @@ Copy exception: COPY commands can only be executed once on a table.
-STATEMENT COPY knows FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-fault-tests/wrong-header/eKnowsMissingColumn.csv" (HEADER=true)
---- error
Copy exception: Invalid: CSV parse error: Expected 4 columns, got 3: 10,24,1

-CASE NodeUnmatchedNumColumns
-STATEMENT create node table person (ID1 SERIAL, ID INT64, fName INT64, age INT64, PRIMARY KEY (ID1))
---- ok
-STATEMENT COPY person FROM "${KUZU_ROOT_DIRECTORY}/dataset/copy-test/node/parquet/types_50k_1.parquet" (HEADER=true)
---- error
Copy exception: Unmatched number of columns in parquet file. Expect: 3, got: 13.

-CASE RelUnmatchedNumColumns
-STATEMENT create node table person (ID1 SERIAL, ID INT64, fName INT64, age INT64, PRIMARY KEY (ID1))
---- ok
-STATEMENT create rel table knows (FROM person TO person, time date, age INT64)
---- ok
-STATEMENT COPY knows FROM "${KUZU_ROOT_DIRECTORY}/dataset/demo-db/parquet/follows.parquet" (HEADER=true)
---- error
Copy exception: Unmatched number of columns in parquet file. Expect: 4, got: 3.
Loading