Skip to content

Commit

Permalink
clean
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Sep 1, 2023
1 parent af04df1 commit 9d4ce42
Show file tree
Hide file tree
Showing 17 changed files with 154 additions and 154 deletions.
29 changes: 12 additions & 17 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ std::unique_ptr<BoundStatement> Binder::bindCopyToClause(const Statement& statem
if (fileType != CopyDescription::FileType::CSV) {
throw BinderException("COPY TO currently only supports csv files.");
}
auto copyDescription =
std::make_unique<CopyDescription>(fileType, CopyDescription::ExecutionMode::SERIAL,
std::vector<std::string>{boundFilePath}, columnNames);
auto copyDescription = std::make_unique<CopyDescription>(
fileType, std::vector<std::string>{boundFilePath}, columnNames);
return std::make_unique<BoundCopyTo>(std::move(copyDescription), std::move(query));
}

Expand Down Expand Up @@ -72,26 +71,22 @@ std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& stat
}
// Bind execution mode.
// For CSV file, and table with SERIAL columns, we need to read in serial from files.
auto executionMode = CopyDescription::ExecutionMode::PARALLEL;
bool readInSerialMode = actualFileType == CopyDescription::FileType::CSV;
expression_vector columnExpressions;
if (actualFileType == CopyDescription::FileType::CSV) {
executionMode = CopyDescription::ExecutionMode::SERIAL;
} else {
for (auto& property : tableSchema->getProperties()) {
if (property->getDataType()->getLogicalTypeID() != common::LogicalTypeID::SERIAL) {
columnExpressions.push_back(createVariable(
property->getName(), common::LogicalType{common::LogicalTypeID::ARROW_COLUMN}));
} else {
executionMode = CopyDescription::ExecutionMode::SERIAL;
}
for (auto& property : tableSchema->getProperties()) {
if (property->getDataType()->getLogicalTypeID() != common::LogicalTypeID::SERIAL) {
columnExpressions.push_back(createVariable(
property->getName(), common::LogicalType{common::LogicalTypeID::ARROW_COLUMN}));
} else {
readInSerialMode = true;
}
}
auto copyDescription = std::make_unique<CopyDescription>(
actualFileType, executionMode, boundFilePaths, std::move(csvReaderConfig));
actualFileType, boundFilePaths, std::move(csvReaderConfig));
auto nodeOffsetExpression =
createVariable("nodeOffset", common::LogicalType{common::LogicalTypeID::INT64});
return std::make_unique<BoundCopyFrom>(std::move(copyDescription), tableID, tableName,
std::move(columnExpressions), std::move(nodeOffsetExpression));
return std::make_unique<BoundCopyFrom>(std::move(copyDescription), tableSchema,
std::move(columnExpressions), std::move(nodeOffsetExpression), readInSerialMode);
}

std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& filePaths) {
Expand Down
20 changes: 10 additions & 10 deletions src/include/binder/copy/bound_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,29 @@ namespace binder {
class BoundCopyFrom : public BoundStatement {
public:
BoundCopyFrom(std::unique_ptr<common::CopyDescription> copyDescription,
common::table_id_t tableID, std::string tableName, expression_vector columnExpressions,
std::shared_ptr<Expression> nodeOffsetExpression)
catalog::TableSchema* tableSchema, expression_vector columnExpressions,
std::shared_ptr<Expression> nodeOffsetExpression, bool readInSerialMode_)
: BoundStatement{common::StatementType::COPY_FROM,
BoundStatementResult::createSingleStringColumnResult()},
copyDescription{std::move(copyDescription)}, tableID{tableID}, tableName{std::move(
tableName)},
columnExpressions{std::move(columnExpressions)}, nodeOffsetExpression{
std::move(nodeOffsetExpression)} {}
copyDescription{std::move(copyDescription)}, tableSchema{tableSchema},
columnExpressions{std::move(columnExpressions)},
nodeOffsetExpression{std::move(nodeOffsetExpression)}, readInSerialMode_{
readInSerialMode_} {}

inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); }
inline common::table_id_t getTableID() const { return tableID; }
inline std::string getTableName() const { return tableName; }
inline catalog::TableSchema* getTableSchema() const { return tableSchema; }
inline expression_vector getColumnExpressions() const { return columnExpressions; }
inline std::shared_ptr<Expression> getNodeOffsetExpression() const {
return nodeOffsetExpression;
}
inline bool readInSerialMode() const { return readInSerialMode_; }

private:
std::unique_ptr<common::CopyDescription> copyDescription;
common::table_id_t tableID;
std::string tableName;
catalog::TableSchema* tableSchema;
expression_vector columnExpressions;
std::shared_ptr<Expression> nodeOffsetExpression;
bool readInSerialMode_;
};

} // namespace binder
Expand Down
20 changes: 8 additions & 12 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,25 @@ struct CSVReaderConfig {

struct CopyDescription {
enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3 };
enum class ExecutionMode : uint8_t { PARALLEL = 0, SERIAL = 1 };

FileType fileType;
ExecutionMode executionMode; // TODO: change this
std::vector<std::string> filePaths;
std::vector<std::string> columnNames;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;

// Copy From
CopyDescription(FileType fileType, ExecutionMode executionMode,
const std::vector<std::string>& filePaths, std::unique_ptr<CSVReaderConfig> csvReaderConfig)
: fileType{fileType}, executionMode{executionMode}, filePaths{filePaths},
csvReaderConfig{std::move(csvReaderConfig)} {}
CopyDescription(FileType fileType, const std::vector<std::string>& filePaths,
std::unique_ptr<CSVReaderConfig> csvReaderConfig)
: fileType{fileType}, filePaths{filePaths}, csvReaderConfig{std::move(csvReaderConfig)} {}

// Copy To
CopyDescription(FileType fileType, ExecutionMode executionMode,
const std::vector<std::string>& filePaths, const std::vector<std::string>& columnNames)
: fileType{fileType}, executionMode{executionMode}, filePaths{filePaths},
columnNames{columnNames}, csvReaderConfig{nullptr} {}
CopyDescription(FileType fileType, const std::vector<std::string>& filePaths,
const std::vector<std::string>& columnNames)
: fileType{fileType}, filePaths{filePaths}, columnNames{columnNames}, csvReaderConfig{
nullptr} {}

CopyDescription(const CopyDescription& other)
: fileType{other.fileType}, executionMode{other.executionMode}, filePaths{other.filePaths},
columnNames{other.columnNames} {
: fileType{other.fileType}, filePaths{other.filePaths}, columnNames{other.columnNames} {
if (other.csvReaderConfig != nullptr) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*other.csvReaderConfig);
}
Expand Down
33 changes: 15 additions & 18 deletions src/include/planner/logical_plan/copy/logical_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,20 @@ namespace planner {
class LogicalCopyFrom : public LogicalOperator {

public:
LogicalCopyFrom(std::unique_ptr<common::CopyDescription> copyDescription, common::table_id_t tableID,
std::string tableName, binder::expression_vector columnExpressions,
LogicalCopyFrom(std::unique_ptr<common::CopyDescription> copyDescription,
catalog::TableSchema* tableSchema, binder::expression_vector columnExpressions,
std::shared_ptr<binder::Expression> nodeOffsetExpression,
std::shared_ptr<binder::Expression> outputExpression)
: LogicalOperator{LogicalOperatorType::COPY_FROM}, copyDescription{std::move(copyDescription)},
tableID{tableID}, tableName{std::move(tableName)}, columnExpressions{std::move(
columnExpressions)},
nodeOffsetExpression{std::move(nodeOffsetExpression)}, outputExpression{
std::move(outputExpression)} {}
std::shared_ptr<binder::Expression> outputExpression, bool preservingOrder_)
: LogicalOperator{LogicalOperatorType::COPY_FROM},
copyDescription{std::move(copyDescription)}, tableSchema{tableSchema},
columnExpressions{std::move(columnExpressions)}, nodeOffsetExpression{std::move(
nodeOffsetExpression)},
outputExpression{std::move(outputExpression)}, preservingOrder_{preservingOrder_} {}

inline std::string getExpressionsForPrinting() const override { return tableName; }
inline std::string getExpressionsForPrinting() const override { return tableSchema->tableName; }

inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); }
inline common::table_id_t getTableID() const { return tableID; }
inline catalog::TableSchema* getTableSchema() const { return tableSchema; }
inline binder::expression_vector getColumnExpressions() const { return columnExpressions; }
inline std::shared_ptr<binder::Expression> getNodeOffsetExpression() const {
return nodeOffsetExpression;
Expand All @@ -32,26 +32,23 @@ class LogicalCopyFrom : public LogicalOperator {
return outputExpression;
}

inline bool isPreservingOrder() {
return copyDescription->executionMode == common::CopyDescription::ExecutionMode::SERIAL;
}
inline bool preservingOrder() { return preservingOrder_; }

void computeFactorizedSchema() override;
void computeFlatSchema() override;

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopyFrom>(copyDescription->copy(), tableID, tableName, columnExpressions,
nodeOffsetExpression, outputExpression);
return make_unique<LogicalCopyFrom>(copyDescription->copy(), tableSchema, columnExpressions,
nodeOffsetExpression, outputExpression, preservingOrder_);
}

private:
std::unique_ptr<common::CopyDescription> copyDescription;
common::table_id_t tableID;
std::string tableName;
catalog::TableSchema* tableSchema;
binder::expression_vector columnExpressions;
std::shared_ptr<binder::Expression> nodeOffsetExpression;
std::shared_ptr<binder::Expression> outputExpression;
// bool preservingOrder;
bool preservingOrder_;
};

} // namespace planner
Expand Down
13 changes: 7 additions & 6 deletions src/include/planner/logical_plan/copy/logical_copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,28 @@ namespace planner {

class LogicalCopyTo : public LogicalOperator {
public:
LogicalCopyTo(
std::shared_ptr<LogicalOperator> child, const common::CopyDescription& copyDescription)
: LogicalOperator{LogicalOperatorType::COPY_TO, child}, copyDescription{copyDescription} {}
LogicalCopyTo(std::shared_ptr<LogicalOperator> child,
std::unique_ptr<common::CopyDescription> copyDescription)
: LogicalOperator{LogicalOperatorType::COPY_TO, std::move(child)},
copyDescription{std::move(copyDescription)} {}

f_group_pos_set getGroupsPosToFlatten();

inline std::string getExpressionsForPrinting() const override { return std::string{}; }

inline common::CopyDescription getCopyDescription() const { return copyDescription; }
inline common::CopyDescription* getCopyDescription() const { return copyDescription.get(); }

void computeFactorizedSchema() override;

void computeFlatSchema() override;

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopyTo>(children[0]->copy(), copyDescription);
return make_unique<LogicalCopyTo>(children[0]->copy(), copyDescription->copy());
}

private:
std::shared_ptr<binder::Expression> outputExpression;
common::CopyDescription copyDescription;
std::unique_ptr<common::CopyDescription> copyDescription;
};

} // namespace planner
Expand Down
3 changes: 1 addition & 2 deletions src/include/planner/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ class Planner {
const storage::NodesStatisticsAndDeletedIDs& nodesStatistics,
const storage::RelsStatistics& relsStatistics, const BoundStatement& statement);

static std::unique_ptr<LogicalPlan> planCopyFrom(
const catalog::Catalog& catalog, const BoundStatement& statement);
static std::unique_ptr<LogicalPlan> planCopyFrom(const BoundStatement& statement);
};

} // namespace planner
Expand Down
37 changes: 25 additions & 12 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,29 @@ namespace processor {

struct ReaderInfo {
DataPos nodeOffsetPos;
std::vector<DataPos> dataColumnPoses;
std::vector<DataPos> dataColumnsPos;
bool isOrderPreserving;
storage::read_rows_func_t readFunc;
storage::init_reader_data_func_t initFunc;

ReaderInfo(
const DataPos& nodeOffsetPos, std::vector<DataPos> dataColumnsPos, bool isOrderPreserving)
: nodeOffsetPos{nodeOffsetPos}, dataColumnsPos{std::move(dataColumnsPos)},
isOrderPreserving{isOrderPreserving} {}
ReaderInfo(const ReaderInfo& other)
: nodeOffsetPos{other.nodeOffsetPos}, dataColumnsPos{other.dataColumnsPos},
isOrderPreserving{other.isOrderPreserving} {}

inline uint32_t getNumColumns() const { return dataColumnsPos.size(); }

inline std::unique_ptr<ReaderInfo> copy() const { return std::make_unique<ReaderInfo>(*this); }
};

class Reader : public PhysicalOperator {
public:
Reader(ReaderInfo readerInfo, std::shared_ptr<storage::ReaderSharedState> sharedState,
uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::READER, id, paramsString},
readerInfo{std::move(readerInfo)}, sharedState{std::move(sharedState)}, readFuncData{
nullptr} {}
Reader(std::unique_ptr<ReaderInfo> info,
std::shared_ptr<storage::ReaderSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::READER, id, paramsString}, info{std::move(info)},
sharedState{std::move(sharedState)}, readFuncData{nullptr} {}

void initGlobalStateInternal(ExecutionContext* context) final;

Expand All @@ -29,7 +39,7 @@ class Reader : public PhysicalOperator {
inline bool isSource() const final { return true; }

inline std::unique_ptr<PhysicalOperator> clone() final {
return make_unique<Reader>(readerInfo, sharedState, getOperatorID(), paramsString);
return make_unique<Reader>(info->copy(), sharedState, getOperatorID(), paramsString);
}

protected:
Expand All @@ -40,15 +50,18 @@ class Reader : public PhysicalOperator {
void getNextNodeGroupInParallel();

private:
ReaderInfo readerInfo;
std::unique_ptr<ReaderInfo> info;
std::shared_ptr<storage::ReaderSharedState> sharedState;

storage::LeftArrowArrays leftArrowArrays;

std::unique_ptr<common::DataChunk> dataChunk;
std::unique_ptr<common::DataChunk> dataChunk = nullptr;
common::ValueVector* nodeOffsetVector = nullptr;

storage::read_rows_func_t readFunc = nullptr;
storage::init_reader_data_func_t initFunc = nullptr;
// For parallel reading.
std::unique_ptr<storage::ReaderFunctionData> readFuncData;
std::unique_ptr<storage::ReaderFunctionData> readFuncData = nullptr;
};

} // namespace processor
Expand Down
17 changes: 8 additions & 9 deletions src/include/storage/copier/reader_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,14 @@ class ReaderSharedState {
friend class processor::Reader;

public:
ReaderSharedState(common::CopyDescription::FileType fileType,
std::vector<std::string> filePaths, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema)
: fileType{fileType}, filePaths{std::move(filePaths)}, csvReaderConfig{csvReaderConfig},
tableSchema{tableSchema}, numRows{0}, currFileIdx{0}, currBlockIdx{0}, currRowIdx{0} {
validateFunc = ReaderFunctions::getValidateFunc(fileType);
initFunc = ReaderFunctions::getInitDataFunc(fileType);
countBlocksFunc = ReaderFunctions::getCountBlocksFunc(fileType);
readFunc = ReaderFunctions::getReadRowsFunc(fileType);
ReaderSharedState(
std::unique_ptr<common::CopyDescription> copyDescription, catalog::TableSchema* tableSchema)
: copyDescription{std::move(copyDescription)}, tableSchema{tableSchema}, numRows{0},
currFileIdx{0}, currBlockIdx{0}, currRowIdx{0} {
validateFunc = ReaderFunctions::getValidateFunc(this->copyDescription->fileType);
initFunc = ReaderFunctions::getInitDataFunc(this->copyDescription->fileType);
countBlocksFunc = ReaderFunctions::getCountBlocksFunc(this->copyDescription->fileType);
readFunc = ReaderFunctions::getReadRowsFunc(this->copyDescription->fileType);
}

void validate();
Expand Down
4 changes: 2 additions & 2 deletions src/planner/operator/copy/logical_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace planner {
void LogicalCopyFrom::computeFactorizedSchema() {
createEmptySchema();
auto groupPos = schema->createGroup();
schema->insertToGroupAndScope(dataColumnExpressions, groupPos);
schema->insertToGroupAndScope(columnExpressions, groupPos);
schema->insertToGroupAndScope(nodeOffsetExpression, groupPos);
schema->insertToGroupAndScope(outputExpression, groupPos);
schema->setGroupAsSingleState(groupPos);
Expand All @@ -15,7 +15,7 @@ void LogicalCopyFrom::computeFactorizedSchema() {
void LogicalCopyFrom::computeFlatSchema() {
createEmptySchema();
schema->createGroup();
schema->insertToGroupAndScope(dataColumnExpressions, 0);
schema->insertToGroupAndScope(columnExpressions, 0);
schema->insertToGroupAndScope(nodeOffsetExpression, 0);
schema->insertToGroupAndScope(outputExpression, 0);
}
Expand Down
16 changes: 9 additions & 7 deletions src/planner/plan/plan_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@
#include "planner/logical_plan/copy/logical_copy_to.h"
#include "planner/planner.h"

using namespace kuzu::storage;
using namespace kuzu::catalog;
using namespace kuzu::common;

namespace kuzu {
namespace planner {

std::unique_ptr<LogicalPlan> Planner::planCopyFrom(
const catalog::Catalog& catalog, const BoundStatement& statement) {
std::unique_ptr<LogicalPlan> Planner::planCopyFrom(const BoundStatement& statement) {
auto& copyClause = reinterpret_cast<const BoundCopyFrom&>(statement);
auto plan = std::make_unique<LogicalPlan>();
auto copy = make_shared<LogicalCopyFrom>(copyClause.getCopyDescription()->copy(),
copyClause.getTableID(), copyClause.getTableName(), copyClause.getColumnExpressions(),
copyClause.getTableSchema(), copyClause.getColumnExpressions(),
copyClause.getNodeOffsetExpression(),
copyClause.getStatementResult()->getSingleExpressionToCollect());
copyClause.getStatementResult()->getSingleExpressionToCollect(),
copyClause.readInSerialMode());
plan->setLastOperator(std::move(copy));
return plan;
}
Expand All @@ -28,11 +30,11 @@ std::unique_ptr<LogicalPlan> Planner::planCopyTo(const Catalog& catalog,
auto regularQuery = copyClause.getRegularQuery();
assert(regularQuery->getStatementType() == StatementType::QUERY);
auto plan = QueryPlanner(catalog, nodesStatistics, relsStatistics).getBestPlan(*regularQuery);
auto logicalCopyTo =
make_shared<LogicalCopyTo>(plan->getLastOperator(), copyClause.getCopyDescription());
auto logicalCopyTo = make_shared<LogicalCopyTo>(
plan->getLastOperator(), copyClause.getCopyDescription()->copy());
plan->setLastOperator(std::move(logicalCopyTo));
return plan;
}

} // namespace planner
} // namespace kuzu
} // namespace kuzu
2 changes: 1 addition & 1 deletion src/planner/planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ std::unique_ptr<LogicalPlan> Planner::getBestPlan(const Catalog& catalog,
plan = planCreateRdfGraph(statement);
} break;
case StatementType::COPY_FROM: {
plan = planCopyFrom(catalog, statement);
plan = planCopyFrom(statement);
} break;
case StatementType::COPY_TO: {
plan = planCopyTo(catalog, nodesStatistics, relsStatistics, statement);
Expand Down
Loading

0 comments on commit 9d4ce42

Please sign in to comment.