Skip to content

Commit

Permalink
X
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Nov 14, 2023
1 parent eb1739c commit 1713ef7
Show file tree
Hide file tree
Showing 15 changed files with 123 additions and 124 deletions.
35 changes: 9 additions & 26 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,6 @@ static void validateCopyNpyNotForRelTables(TableSchema* schema) {
}
}

// static bool bindContainsSerial(TableSchema* tableSchema) {
// bool containsSerial = false;
// for (auto& property : tableSchema->properties) {
// if (property->getDataType()->getLogicalTypeID() == LogicalTypeID::SERIAL) {
// containsSerial = true;
// break;
// }
// }
// return containsSerial;
//}

std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& statement) {
auto& copyStatement = (const CopyFrom&)statement;
auto catalogContent = catalog.getReadOnlyVersion();
Expand Down Expand Up @@ -138,8 +127,8 @@ std::unique_ptr<BoundStatement> Binder::bindCopyNodeFrom(const Statement& statem
LogicalType(LogicalTypeID::INT64), InternalKeyword::ANONYMOUS);
auto boundFileScanInfo =
std::make_unique<BoundFileScanInfo>(func, std::move(bindData), columns, std::move(offset));
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(tableSchema,
std::move(boundFileScanInfo), containsSerial, std::move(columns), nullptr /* extraInfo */);
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
tableSchema, std::move(boundFileScanInfo), containsSerial, nullptr /* extraInfo */);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

Expand All @@ -162,7 +151,7 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(const parser::Statement&
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
}
auto offset = expressionBinder.createVariableExpression(
LogicalType(LogicalTypeID::INT64), common::InternalKeyword::ANONYMOUS);
LogicalType(LogicalTypeID::INT64), std::string(InternalKeyword::ROW_OFFSET));
auto boundFileScanInfo =
std::make_unique<BoundFileScanInfo>(func, std::move(bindData), columns, offset);
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
Expand All @@ -173,19 +162,13 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(const parser::Statement&
auto srcKey = columns[0];
auto dstKey = columns[1];
auto srcNodeID =
createVariable(std::string(Property::REL_BOUND_OFFSET_NAME), LogicalTypeID::INT64);
createVariable(std::string(InternalKeyword::SRC_OFFSET), LogicalTypeID::INT64);
auto dstNodeID =
createVariable(std::string(Property::REL_NBR_OFFSET_NAME), LogicalTypeID::INT64);
createVariable(std::string(InternalKeyword::DST_OFFSET), LogicalTypeID::INT64);
auto extraCopyRelInfo = std::make_unique<ExtraBoundCopyRelInfo>(
srcTableSchema, dstTableSchema, srcNodeID, dstNodeID, srcKey, dstKey);
// Skip the first two columns.
expression_vector columnsToCopy{std::move(srcNodeID), std::move(dstNodeID), std::move(offset)};
for (auto i = NUM_COLUMNS_TO_SKIP_IN_REL_FILE; i < columns.size(); i++) {
columnsToCopy.push_back(columns[i]);
}
auto boundCopyFromInfo =
std::make_unique<BoundCopyFromInfo>(tableSchema, std::move(boundFileScanInfo),
containsSerial, std::move(columnsToCopy), std::move(extraCopyRelInfo));
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
tableSchema, std::move(boundFileScanInfo), containsSerial, std::move(extraCopyRelInfo));
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

Expand Down Expand Up @@ -248,8 +231,8 @@ void Binder::bindExpectedRelColumns(TableSchema* tableSchema,
catalog.getReadOnlyVersion()->getTableSchema(relTableSchema->getSrcTableID()));
auto dstTable = reinterpret_cast<NodeTableSchema*>(
catalog.getReadOnlyVersion()->getTableSchema(relTableSchema->getDstTableID()));
columnNames.push_back(std::string(Property::REL_FROM_PROPERTY_NAME));
columnNames.push_back(std::string(Property::REL_TO_PROPERTY_NAME));
columnNames.push_back("from");
columnNames.push_back("to");
auto srcPKColumnType = srcTable->getPrimaryKey()->getDataType()->copy();
if (srcPKColumnType->getLogicalTypeID() == LogicalTypeID::SERIAL) {
srcPKColumnType = std::make_unique<LogicalType>(LogicalTypeID::INT64);
Expand Down
4 changes: 2 additions & 2 deletions src/binder/bind/bind_reading_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ std::unique_ptr<BoundReadingClause> Binder::bindInQueryCall(const ReadingClause&
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
}
auto offset = expressionBinder.createVariableExpression(
LogicalType(LogicalTypeID::INT64), common::InternalKeyword::ANONYMOUS);
LogicalType(LogicalTypeID::INT64), std::string(InternalKeyword::ROW_OFFSET));
return std::make_unique<BoundInQueryCall>(
std::move(tableFunction), std::move(bindData), std::move(columns), offset);
}
Expand Down Expand Up @@ -166,7 +166,7 @@ std::unique_ptr<BoundReadingClause> Binder::bindLoadFrom(
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
}
auto offset = expressionBinder.createVariableExpression(
LogicalType(LogicalTypeID::INT64), common::InternalKeyword::ANONYMOUS);
LogicalType(LogicalTypeID::INT64), std::string(InternalKeyword::ROW_OFFSET));
auto info = std::make_unique<BoundFileScanInfo>(
scanFunction, std::move(bindData), std::move(columns), std::move(offset));
auto boundLoadFrom = std::make_unique<BoundLoadFrom>(std::move(info));
Expand Down
17 changes: 9 additions & 8 deletions src/binder/bind/copy/bind_copy_rdf_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRdfNodeFrom(const Statement& /*s
auto stringType = LogicalType{LogicalTypeID::STRING};
std::vector<std::string> columnNames;
std::vector<std::unique_ptr<common::LogicalType>> columnTypes;
columnNames.push_back(std::string(InternalKeyword::ANONYMOUS));
columnNames.emplace_back(rdf::IRI);
if (tableSchema->tableName.ends_with(rdf::RESOURCE_TABLE_SUFFIX)) {
containsSerial = false;
columnTypes.push_back(stringType.copy());
Expand All @@ -42,11 +42,11 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRdfNodeFrom(const Statement& /*s
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
}
auto offset = expressionBinder.createVariableExpression(
LogicalType(LogicalTypeID::INT64), common::InternalKeyword::ANONYMOUS);
LogicalType(LogicalTypeID::INT64), InternalKeyword::ROW_OFFSET);
auto boundFileScanInfo =
std::make_unique<BoundFileScanInfo>(func, std::move(bindData), columns, std::move(offset));
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(tableSchema,
std::move(boundFileScanInfo), containsSerial, std::move(columns), nullptr /* extraInfo */);
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
tableSchema, std::move(boundFileScanInfo), containsSerial, nullptr /* extraInfo */);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

Expand All @@ -55,10 +55,11 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRdfRelFrom(const Statement& /*st
auto func = getScanFunction(config->fileType, config->csvReaderConfig->parallel);
auto containsSerial = false;
std::vector<std::string> columnNames;
columnNames.emplace_back(InternalKeyword::SRC_OFFSET);
columnNames.emplace_back(rdf::PID);
columnNames.emplace_back(InternalKeyword::DST_OFFSET);
std::vector<std::unique_ptr<common::LogicalType>> columnTypes;
for (auto i = 0u; i < 3; ++i) {
auto columnName = std::string(InternalKeyword::ANONYMOUS) + std::to_string(i);
columnNames.push_back(columnName);
columnTypes.push_back(std::make_unique<LogicalType>(LogicalTypeID::INT64));
}
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
Expand All @@ -79,14 +80,14 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRdfRelFrom(const Statement& /*st
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
}
auto offset = expressionBinder.createVariableExpression(
LogicalType(LogicalTypeID::INT64), common::InternalKeyword::ANONYMOUS);
LogicalType(LogicalTypeID::INT64), InternalKeyword::ROW_OFFSET);
auto boundFileScanInfo =
std::make_unique<BoundFileScanInfo>(func, std::move(bindData), columns, offset);
auto extraInfo = std::make_unique<ExtraBoundCopyRdfRelInfo>(columns[0], columns[2]);
expression_vector columnsToCopy = {columns[0], columns[2], offset, columns[1]};
auto boundCopyFromInfo =
std::make_unique<BoundCopyFromInfo>(tableSchema, std::move(boundFileScanInfo),
containsSerial, std::move(columnsToCopy), std::move(extraInfo));
containsSerial, std::move(extraInfo));
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

Expand Down
10 changes: 2 additions & 8 deletions src/include/binder/copy/bound_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,16 @@ struct BoundCopyFromInfo {
catalog::TableSchema* tableSchema;
std::unique_ptr<BoundFileScanInfo> fileScanInfo;
bool containsSerial;
expression_vector columns;
std::unique_ptr<ExtraBoundCopyFromInfo> extraInfo;

BoundCopyFromInfo(catalog::TableSchema* tableSchema,
std::unique_ptr<BoundFileScanInfo> fileScanInfo, bool containsSerial,
expression_vector columns, std::unique_ptr<ExtraBoundCopyFromInfo> extraInfo)
std::unique_ptr<ExtraBoundCopyFromInfo> extraInfo)
: tableSchema{tableSchema}, fileScanInfo{std::move(fileScanInfo)},
containsSerial{containsSerial}, columns{std::move(columns)}, extraInfo{
std::move(extraInfo)} {}
containsSerial{containsSerial}, extraInfo{std::move(extraInfo)} {}
BoundCopyFromInfo(const BoundCopyFromInfo& other)
: tableSchema{other.tableSchema}, fileScanInfo{other.fileScanInfo->copy()},
containsSerial{other.containsSerial} {
columns.reserve(other.columns.size());
for (auto& column : other.columns) {
columns.push_back(column->copy());
}
if (other.extraInfo) {
extraInfo = other.extraInfo->copy();
}
Expand Down
9 changes: 4 additions & 5 deletions src/include/binder/copy/bound_file_scan_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,16 @@ struct BoundFileScanInfo {
function::TableFunction* copyFunc;
std::unique_ptr<function::TableFuncBindData> bindData;
binder::expression_vector columns;
// TODO: rename
std::shared_ptr<Expression> internalID;
std::shared_ptr<Expression> offset;

BoundFileScanInfo(function::TableFunction* copyFunc,
std::unique_ptr<function::TableFuncBindData> bindData, binder::expression_vector columns,
std::shared_ptr<Expression> internalID)
std::shared_ptr<Expression> offset)
: copyFunc{copyFunc}, bindData{std::move(bindData)}, columns{std::move(columns)},
internalID{std::move(internalID)} {}
offset{std::move(offset)} {}
BoundFileScanInfo(const BoundFileScanInfo& other)
: copyFunc{other.copyFunc}, bindData{other.bindData->copy()}, columns{other.columns},
internalID{other.internalID} {}
offset{other.offset} {}

inline std::unique_ptr<BoundFileScanInfo> copy() const {
return std::make_unique<BoundFileScanInfo>(*this);
Expand Down
9 changes: 4 additions & 5 deletions src/include/catalog/property.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ namespace catalog {
class Property {
public:
// TODO: these should be guarded as reserved property names.
static constexpr std::string_view REL_FROM_PROPERTY_NAME = "_FROM_";
static constexpr std::string_view REL_TO_PROPERTY_NAME = "_TO_";
static constexpr std::string_view INTERNAL_ID_NAME = "_ID_";
static constexpr std::string_view REL_BOUND_OFFSET_NAME = "_BOUND_OFFSET_";
static constexpr std::string_view REL_NBR_OFFSET_NAME = "_NBR_OFFSET_";
// static constexpr std::string_view REL_FROM_PROPERTY_NAME = "_from_";
// static constexpr std::string_view REL_TO_PROPERTY_NAME = "_to_";
// static constexpr std::string_view REL_SRC_OFFSET_NAME = "_src_offset_";
// static constexpr std::string_view REL_DST_OFFSET_NAME = "_nbr_offset_";

Property(std::string name, std::unique_ptr<common::LogicalType> dataType)
: Property{std::move(name), std::move(dataType), common::INVALID_PROPERTY_ID,
Expand Down
5 changes: 0 additions & 5 deletions src/include/catalog/rel_table_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,10 @@ class RelTableSchema : public TableSchema {
// Returns the table ID on the bound side of the relationship for `relDirection`:
// the source table ID when the direction is FWD, the destination table ID otherwise.
inline common::table_id_t getBoundTableID(common::RelDataDirection relDirection) const {
    if (relDirection == common::RelDataDirection::FWD) {
        return srcTableID;
    }
    return dstTableID;
}

// Returns the table ID on the neighbour side of the relationship for `relDirection`:
// the destination table ID when the direction is FWD, the source table ID otherwise.
inline common::table_id_t getNbrTableID(common::RelDataDirection relDirection) const {
    const bool isForward = relDirection == common::RelDataDirection::FWD;
    return isForward ? dstTableID : srcTableID;
}

// Accessor for the `relMultiplicity` member of this schema.
inline RelMultiplicity getRelMultiplicity() const {
    return relMultiplicity;
}

// Accessor for the `srcTableID` member (the relationship's source table ID).
inline common::table_id_t getSrcTableID() const {
    return srcTableID;
}

// Accessor for the `dstTableID` member (the relationship's destination table ID).
inline common::table_id_t getDstTableID() const {
    return dstTableID;
}

static std::unique_ptr<RelTableSchema> deserialize(common::Deserializer& deserializer);
Expand Down
4 changes: 4 additions & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ struct InternalKeyword {
static constexpr char PLACE_HOLDER[] = "_PLACE_HOLDER";
static constexpr char MAP_KEY[] = "KEY";
static constexpr char MAP_VALUE[] = "VALUE";

static constexpr char ROW_OFFSET[] = "_row_offset";
static constexpr char SRC_OFFSET[] = "_src_offset";
static constexpr char DST_OFFSET[] = "_dst_offset";
};

enum PageSizeClass : uint8_t {
Expand Down
10 changes: 7 additions & 3 deletions src/include/planner/operator/logical_partitioner.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "binder/copy/bound_copy_from.h"
#include "catalog/table_schema.h"
#include "common/column_data_format.h"
#include "planner/operator/logical_operator.h"
Expand All @@ -14,10 +15,13 @@ struct LogicalPartitionerInfo {
catalog::TableSchema* tableSchema;

LogicalPartitionerInfo(std::shared_ptr<binder::Expression> key,
binder::expression_vector payloads, common::ColumnDataFormat dataFormat, catalog::TableSchema* tableSchema)
: key{std::move(key)}, payloads{std::move(payloads)}, dataFormat{dataFormat}, tableSchema{tableSchema} {}
binder::expression_vector payloads, common::ColumnDataFormat dataFormat,
catalog::TableSchema* tableSchema)
: key{std::move(key)}, payloads{std::move(payloads)}, dataFormat{dataFormat},
tableSchema{tableSchema} {}
LogicalPartitionerInfo(const LogicalPartitionerInfo& other)
: key{other.key}, payloads{other.payloads}, dataFormat{other.dataFormat}, tableSchema{other.tableSchema} {}
: key{other.key}, payloads{other.payloads}, dataFormat{other.dataFormat},
tableSchema{other.tableSchema} {}

inline std::unique_ptr<LogicalPartitionerInfo> copy() {
return std::make_unique<LogicalPartitionerInfo>(*this);
Expand Down
7 changes: 4 additions & 3 deletions src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ struct PartitioningInfo {

PartitioningInfo(DataPos keyDataPos, std::vector<DataPos> columnDataPositions,
common::logical_types_t columnTypes, partitioner_func_t partitionerFunc)
: keyDataPos{keyDataPos}, columnDataPositions{std::move(columnDataPositions)}, columnTypes{std::move(columnTypes)},
partitionerFunc{partitionerFunc} {}
: keyDataPos{keyDataPos}, columnDataPositions{std::move(columnDataPositions)},
columnTypes{std::move(columnTypes)}, partitionerFunc{partitionerFunc} {}
inline std::unique_ptr<PartitioningInfo> copy() {
return std::make_unique<PartitioningInfo>(keyDataPos, columnDataPositions, common::LogicalType::copy(columnTypes), partitionerFunc);
return std::make_unique<PartitioningInfo>(keyDataPos, columnDataPositions,
common::LogicalType::copy(columnTypes), partitionerFunc);
}

static std::vector<std::unique_ptr<PartitioningInfo>> copy(
Expand Down
4 changes: 2 additions & 2 deletions src/planner/operator/scan/logical_scan_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ void LogicalScanFile::computeFactorizedSchema() {
createEmptySchema();
auto groupPos = schema->createGroup();
schema->insertToGroupAndScope(info->columns, groupPos);
schema->insertToGroupAndScope(info->internalID, groupPos);
schema->insertToGroupAndScope(info->offset, groupPos);
}

void LogicalScanFile::computeFlatSchema() {
createEmptySchema();
schema->createGroup();
schema->insertToGroupAndScope(info->columns, 0);
schema->insertToGroupAndScope(info->internalID, 0);
schema->insertToGroupAndScope(info->offset, 0);
}

} // namespace planner
Expand Down
18 changes: 12 additions & 6 deletions src/planner/plan/plan_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,38 @@ static void appendPartitioner(BoundCopyFromInfo* copyFromInfo, LogicalPlan& plan
reinterpret_cast<function::ScanBindData*>(copyFromInfo->fileScanInfo->bindData.get())
->config;
auto fileType = readerConfig.fileType;
auto payloads = copyFromInfo->fileScanInfo->columns;
payloads.push_back(copyFromInfo->fileScanInfo->offset);
// TODO(Xiyang): Merge the TURTLE case with the other file types.
switch (fileType) {
case FileType::TURTLE: {
auto extraInfo = reinterpret_cast<ExtraBoundCopyRdfRelInfo*>(copyFromInfo->extraInfo.get());
infos.push_back(std::make_unique<LogicalPartitionerInfo>(
extraInfo->subjectOffset, copyFromInfo->columns, ColumnDataFormat::CSR, nullptr));
extraInfo->subjectOffset, payloads, ColumnDataFormat::CSR, copyFromInfo->tableSchema));
infos.push_back(std::make_unique<LogicalPartitionerInfo>(
extraInfo->objectOffset, copyFromInfo->columns, ColumnDataFormat::CSR, nullptr));
extraInfo->objectOffset, payloads, ColumnDataFormat::CSR, copyFromInfo->tableSchema));
} break;
case FileType::CSV:
case FileType::NPY:
case FileType::PARQUET: {
auto extraInfo = reinterpret_cast<ExtraBoundCopyRelInfo*>(copyFromInfo->extraInfo.get());
auto tableSchema = reinterpret_cast<RelTableSchema*>(copyFromInfo->tableSchema);
payloads.push_back(extraInfo->srcOffset);
payloads.push_back(extraInfo->dstOffset);
// Partitioner for FWD direction rel data.
infos.push_back(
std::make_unique<LogicalPartitionerInfo>(extraInfo->srcOffset, copyFromInfo->columns,
std::make_unique<LogicalPartitionerInfo>(extraInfo->srcOffset, payloads,
tableSchema->isSingleMultiplicityInDirection(RelDataDirection::FWD) ?
ColumnDataFormat::REGULAR :
ColumnDataFormat::CSR, tableSchema));
ColumnDataFormat::CSR,
tableSchema));
// Partitioner for BWD direction rel data.
infos.push_back(
std::make_unique<LogicalPartitionerInfo>(extraInfo->dstOffset, copyFromInfo->columns,
std::make_unique<LogicalPartitionerInfo>(extraInfo->dstOffset, payloads,
tableSchema->isSingleMultiplicityInDirection(RelDataDirection::BWD) ?
ColumnDataFormat::REGULAR :
ColumnDataFormat::CSR, tableSchema));
ColumnDataFormat::CSR,
tableSchema));
} break;
default: {
KU_UNREACHABLE;
Expand Down
Loading

0 comments on commit 1713ef7

Please sign in to comment.