Skip to content

Commit

Permalink
Rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
rfdavid committed Sep 5, 2023
1 parent f088537 commit 552c3ba
Show file tree
Hide file tree
Showing 14 changed files with 64 additions and 247 deletions.
25 changes: 0 additions & 25 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,6 @@ using namespace kuzu::utf8proc;
namespace kuzu {
namespace common {

// =======
// // Copy From
// CopyDescription::CopyDescription(
// const std::vector<std::string>& filePaths, FileType fileType, CSVReaderConfig csvReaderConfig)
// : filePaths{filePaths}, csvReaderConfig{nullptr}, fileType{fileType} {
// this->csvReaderConfig = std::make_unique<CSVReaderConfig>(csvReaderConfig);
// }
//
// // Copy To
// CopyDescription::CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
// const std::vector<std::string>& columnNames,
// const std::vector<common::LogicalType>& columnTypes)
// : filePaths{filePaths}, fileType{fileType}, columnNames{columnNames}, columnTypes{columnTypes} {
// }
//
// CopyDescription::CopyDescription(const CopyDescription& copyDescription)
// : filePaths{copyDescription.filePaths},
// csvReaderConfig{nullptr}, fileType{copyDescription.fileType},
// columnNames{copyDescription.columnNames}, columnTypes{copyDescription.columnTypes} {
// if (copyDescription.csvReaderConfig != nullptr) {
// this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
// }
// }
//
// >>>>>>> 031529ee (Support Parquet files on COPY TO)
CopyDescription::FileType CopyDescription::getFileTypeFromExtension(const std::string& extension) {
CopyDescription::FileType fileType = CopyDescription::fileTypeMap[extension];
if (fileType == FileType::UNKNOWN) {
Expand Down
24 changes: 13 additions & 11 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,27 @@ struct CSVReaderConfig {
struct CopyDescription {
enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3 };

FileType fileType;
std::vector<std::string> filePaths;
std::vector<std::string> columnNames;
std::vector<common::LogicalType> columnTypes;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;

// Copy From
CopyDescription(FileType fileType, const std::vector<std::string>& filePaths,
std::unique_ptr<CSVReaderConfig> csvReaderConfig)
: fileType{fileType}, filePaths{filePaths}, csvReaderConfig{std::move(csvReaderConfig)} {}

// Copy To
CopyDescription(FileType fileType, const std::vector<std::string>& filePaths,
const std::vector<std::string>& columnNames,
const std::vector<common::LogicalType>& columnTypes)
: fileType{fileType}, filePaths{filePaths}, columnNames{columnNames}, columnTypes{columnTypes}, csvReaderConfig{nullptr} {}
CopyDescription(FileType fileType, const std::vector<std::string>& filePaths,
const std::vector<std::string>& columnNames,
const std::vector<common::LogicalType>& columnTypes)
: fileType{fileType}, filePaths{filePaths}, columnNames{columnNames},
columnTypes{columnTypes}, csvReaderConfig{nullptr} {}

CopyDescription(const CopyDescription& other)
: fileType{other.fileType}, filePaths{other.filePaths}, columnNames{other.columnNames} {
: fileType{other.fileType}, filePaths{other.filePaths}, columnNames{other.columnNames},
columnTypes{other.columnTypes} {
if (other.csvReaderConfig != nullptr) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*other.csvReaderConfig);
}
Expand All @@ -58,12 +66,6 @@ struct CopyDescription {
static FileType getFileTypeFromExtension(const std::string& extension);

static std::string getFileTypeName(FileType fileType);

const std::vector<std::string> filePaths;
const std::vector<std::string> columnNames;
const std::vector<common::LogicalType> columnTypes;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;
FileType fileType;
};

} // namespace common
Expand Down
8 changes: 4 additions & 4 deletions src/include/processor/operator/persistent/copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include "common/task_system/task_scheduler.h"
#include "processor/operator/persistent/csv_file_writer.h"
#include "processor/operator/persistent/csv_parquet_writer.h"
#include "processor/operator/persistent/parquet_writer.h"
#include "processor/operator/persistent/parquet_file_writer.h"
#include "processor/operator/physical_operator.h"
#include "processor/operator/sink.h"
#include "processor/result/result_set.h"
Expand All @@ -16,11 +16,11 @@ class CSVParquetWriterSharedState {
public:
CSVParquetWriterSharedState(common::CopyDescription::FileType fileType) {
if (fileType == common::CopyDescription::FileType::CSV) {
fileWriter = std::make_unique<kuzu::processor::CSVWriter>();
fileWriter = std::make_unique<kuzu::processor::CSVFileWriter>();
} else if (fileType == common::CopyDescription::FileType::PARQUET) {
fileWriter = std::make_unique<kuzu::processor::ParquetWriter>();
fileWriter = std::make_unique<kuzu::processor::ParquetFileWriter>();
} else {
common::NotImplementedException(
throw common::NotImplementedException(
"CSVParquetWriterSharedState::CSVParquetWriterSharedState");

Check warning on line 24 in src/include/processor/operator/persistent/copy_to.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/copy_to.h#L23-L24

Added lines #L23 - L24 were not covered by tests
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/persistent/csv_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
namespace kuzu {
namespace processor {

class CSVWriter : public CSVParquetWriter {
class CSVFileWriter : public CSVParquetWriter {
public:
CSVWriter(){};
CSVFileWriter(){};
void openFile(const std::string& filePath) override;
void init() override;
inline void closeFile() override { flush(); }
Expand Down
38 changes: 0 additions & 38 deletions src/include/processor/operator/persistent/csv_writer.h

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ namespace processor {
// - calculate the max definition levels and number of primitive nodes
// - initialize parquetColumnWriter
// writeValues : take a vector of ValueVector and pass to parquetColumnWriter
class ParquetWriter : public CSVParquetWriter {
class ParquetFileWriter : public CSVParquetWriter {
public:
ParquetWriter(){};
ParquetFileWriter(){};
void openFile(const std::string& filePath) override;
void init() override;
void closeFile() override;
Expand Down
125 changes: 0 additions & 125 deletions src/processor/map/map_copy.cpp

This file was deleted.

20 changes: 16 additions & 4 deletions src/processor/map/map_copy_to.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,22 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyTo(LogicalOperator* logical
for (auto& expression : childSchema->getExpressionsInScope()) {
vectorsToCopyPos.emplace_back(childSchema->getExpressionPos(*expression));
}
auto sharedState = std::make_shared<WriteCSVFileSharedState>();
return std::make_unique<CopyTo>(sharedState, std::move(vectorsToCopyPos),
*copy->getCopyDescription(), getOperatorID(), copy->getExpressionsForPrinting(),
std::move(prevOperator));
auto sharedState =
std::make_shared<CSVParquetWriterSharedState>(copy->getCopyDescription()->fileType);

auto copyTo = std::make_unique<CopyTo>(std::make_unique<ResultSetDescriptor>(childSchema),
sharedState, std::move(vectorsToCopyPos), *copy->getCopyDescription(), getOperatorID(),
copy->getExpressionsForPrinting(), std::move(prevOperator));
std::shared_ptr<FactorizedTable> fTable;
auto ftTableSchema = std::make_unique<FactorizedTableSchema>();
fTable = std::make_shared<FactorizedTable>(memoryManager, std::move(ftTableSchema));
return createFactorizedTableScan(binder::expression_vector{}, std::vector<ft_col_idx_t>{},
childSchema, fTable, 0, std::move(copyTo));

// return std::make_unique<CopyTo>(std::make_unique<ResultSetDescriptor>(childSchema),
// sharedState, std::move(vectorsToCopyPos),
// *copy->getCopyDescription(), getOperatorID(), copy->getExpressionsForPrinting(),
// std::move(prevOperator));
}

} // namespace processor
Expand Down
11 changes: 0 additions & 11 deletions src/processor/operator/copy_to/CMakeLists.txt

This file was deleted.

2 changes: 2 additions & 0 deletions src/processor/operator/persistent/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ add_library(kuzu_processor_operator_persistent
insert.cpp
insert_executor.cpp
merge.cpp
parquet_column_writer.cpp
parquet_file_writer.cpp
reader.cpp
set.cpp
set_executor.cpp)
Expand Down
Loading

0 comments on commit 552c3ba

Please sign in to comment.