Skip to content

Commit

Permalink
fix issue 1704
Browse files: browse the repository at this point in the history
  • Loading branch information
ray6080 committed Jun 26, 2023
1 parent 67eb0c2 commit ab919c0
Show file tree
Hide file tree
Showing 23 changed files with 146 additions and 217 deletions.
2 changes: 2 additions & 0 deletions dataset/primary-key-tests/eFollows.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
100,10020
101,10020
1 change: 0 additions & 1 deletion dataset/primary-key-tests/int-pk-tests/eKnows.csv

This file was deleted.

2 changes: 0 additions & 2 deletions dataset/primary-key-tests/int-pk-tests/vPerson.csv

This file was deleted.

1 change: 0 additions & 1 deletion dataset/primary-key-tests/string-pk-tests/eKnows.csv

This file was deleted.

2 changes: 0 additions & 2 deletions dataset/primary-key-tests/string-pk-tests/vPerson.csv

This file was deleted.

1 change: 1 addition & 0 deletions dataset/primary-key-tests/vOrg.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
10020
2 changes: 2 additions & 0 deletions dataset/primary-key-tests/vPerson.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
100,Foo,10
101,Bar,11
24 changes: 15 additions & 9 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ uint64_t SerDeser::serializeValue<RelTableSchema>(
offset = SerDeser::serializeValue<TableSchema>((const TableSchema&)value, fileInfo, offset);
offset = SerDeser::serializeValue<RelMultiplicity>(value.relMultiplicity, fileInfo, offset);
offset = SerDeser::serializeValue<table_id_t>(value.srcTableID, fileInfo, offset);
return SerDeser::serializeValue<table_id_t>(value.dstTableID, fileInfo, offset);
offset = SerDeser::serializeValue<table_id_t>(value.dstTableID, fileInfo, offset);
offset = SerDeser::serializeValue<LogicalType>(value.srcPKDataType, fileInfo, offset);
return SerDeser::serializeValue<LogicalType>(value.dstPKDataType, fileInfo, offset);
}

template<>
Expand All @@ -156,7 +158,9 @@ uint64_t SerDeser::deserializeValue<RelTableSchema>(
offset = SerDeser::deserializeValue<TableSchema>((TableSchema&)value, fileInfo, offset);
offset = SerDeser::deserializeValue<RelMultiplicity>(value.relMultiplicity, fileInfo, offset);
offset = SerDeser::deserializeValue<table_id_t>(value.srcTableID, fileInfo, offset);
return SerDeser::deserializeValue<table_id_t>(value.dstTableID, fileInfo, offset);
offset = SerDeser::deserializeValue<table_id_t>(value.dstTableID, fileInfo, offset);
offset = SerDeser::deserializeValue<LogicalType>(value.srcPKDataType, fileInfo, offset);
return SerDeser::deserializeValue<LogicalType>(value.dstPKDataType, fileInfo, offset);
}

} // namespace common
Expand Down Expand Up @@ -205,7 +209,8 @@ table_id_t CatalogContent::addNodeTableSchema(
}

table_id_t CatalogContent::addRelTableSchema(std::string tableName, RelMultiplicity relMultiplicity,
std::vector<Property> properties, table_id_t srcTableID, table_id_t dstTableID) {
std::vector<Property> properties, table_id_t srcTableID, table_id_t dstTableID,
LogicalType srcPKDataType, LogicalType dstPKDataType) {
table_id_t tableID = assignNextTableID();
nodeTableSchemas[srcTableID]->addFwdRelTableID(tableID);
nodeTableSchemas[dstTableID]->addBwdRelTableID(tableID);
Expand All @@ -216,8 +221,9 @@ table_id_t CatalogContent::addRelTableSchema(std::string tableName, RelMultiplic
properties[i].propertyID = i;
properties[i].tableID = tableID;
}
auto relTableSchema = std::make_unique<RelTableSchema>(std::move(tableName), tableID,
relMultiplicity, std::move(properties), srcTableID, dstTableID);
auto relTableSchema =
std::make_unique<RelTableSchema>(std::move(tableName), tableID, relMultiplicity,
std::move(properties), srcTableID, dstTableID, srcPKDataType, dstPKDataType);
relTableNameToIDMap[relTableSchema->tableName] = tableID;
relTableSchemas[tableID] = std::move(relTableSchema);
return tableID;
Expand Down Expand Up @@ -401,11 +407,11 @@ table_id_t Catalog::addNodeTableSchema(
}

table_id_t Catalog::addRelTableSchema(std::string tableName, RelMultiplicity relMultiplicity,
const std::vector<Property>& propertyDefinitions, table_id_t srcTableID,
table_id_t dstTableID) {
const std::vector<Property>& propertyDefinitions, table_id_t srcTableID, table_id_t dstTableID,
LogicalType srcPKDataType, LogicalType dstPKDataType) {
initCatalogContentForWriteTrxIfNecessary();
auto tableID = catalogContentForWriteTrx->addRelTableSchema(
std::move(tableName), relMultiplicity, propertyDefinitions, srcTableID, dstTableID);
auto tableID = catalogContentForWriteTrx->addRelTableSchema(std::move(tableName),
relMultiplicity, propertyDefinitions, srcTableID, dstTableID, srcPKDataType, dstPKDataType);
wal->logRelTableRecord(tableID);
return tableID;
}
Expand Down
6 changes: 4 additions & 2 deletions src/include/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class CatalogContent {

common::table_id_t addRelTableSchema(std::string tableName, RelMultiplicity relMultiplicity,
std::vector<Property> properties, common::table_id_t srcTableID,
common::table_id_t dstTableID);
common::table_id_t dstTableID, common::LogicalType srcPKDataType,
common::LogicalType dstPKDataType);

inline bool containNodeTable(common::table_id_t tableID) const {
return nodeTableSchemas.contains(tableID);
Expand Down Expand Up @@ -192,7 +193,8 @@ class Catalog {

common::table_id_t addRelTableSchema(std::string tableName, RelMultiplicity relMultiplicity,
const std::vector<Property>& propertyDefinitions, common::table_id_t srcTableID,
common::table_id_t dstTableID);
common::table_id_t dstTableID, common::LogicalType srcPKDataType,
common::LogicalType dstPKDataType);

void dropTableSchema(common::table_id_t tableID);

Expand Down
26 changes: 9 additions & 17 deletions src/include/catalog/catalog_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,34 +122,24 @@ struct RelTableSchema : TableSchema {
RelTableSchema()
: TableSchema{"", common::INVALID_TABLE_ID, false /* isNodeTable */, {} /* properties */},
relMultiplicity{MANY_MANY}, srcTableID{common::INVALID_TABLE_ID},
dstTableID{common::INVALID_TABLE_ID} {}
dstTableID{common::INVALID_TABLE_ID}, srcPKDataType{common::LogicalType{
common::LogicalTypeID::ANY}},
dstPKDataType{common::LogicalType{common::LogicalTypeID::ANY}} {}
RelTableSchema(std::string tableName, common::table_id_t tableID,
RelMultiplicity relMultiplicity, std::vector<Property> properties,
common::table_id_t srcTableID, common::table_id_t dstTableID)
common::table_id_t srcTableID, common::table_id_t dstTableID,
common::LogicalType srcPKDataType, common::LogicalType dstPKDataType)
: TableSchema{std::move(tableName), tableID, false /* isNodeTable */,
std::move(properties)},
relMultiplicity{relMultiplicity}, srcTableID{srcTableID}, dstTableID{dstTableID} {}

// Looks up the property whose name equals common::InternalKeyword::ID and returns a mutable
// reference to it. Throws common::InternalException when no such property exists.
inline Property& getRelIDDefinition() {
    for (auto idx = 0u; idx < properties.size(); ++idx) {
        auto& candidate = properties[idx];
        if (candidate.name == common::InternalKeyword::ID) {
            return candidate;
        }
    }
    throw common::InternalException("Cannot find internal rel ID definition.");
}
relMultiplicity{relMultiplicity}, srcTableID{srcTableID}, dstTableID{dstTableID},
srcPKDataType{std::move(srcPKDataType)}, dstPKDataType{std::move(dstPKDataType)} {}

// Returns true when relMultiplicity is ONE_ONE, or when it equals MANY_ONE for the FWD
// direction, or ONE_MANY for any other direction.
inline bool isSingleMultiplicityInDirection(common::RelDataDirection direction) const {
    if (relMultiplicity == ONE_ONE) {
        return true;
    }
    const auto singleInDirection =
        direction == common::RelDataDirection::FWD ? MANY_ONE : ONE_MANY;
    return relMultiplicity == singleInDirection;
}

inline uint32_t getNumUserDefinedProperties() const {
// Note: the first column stores the relID property.
return properties.size() - 1;
}

// Returns true when tableID matches either srcTableID or dstTableID of this rel table.
inline bool isSrcOrDstTable(common::table_id_t tableID) const {
    if (srcTableID == tableID) {
        return true;
    }
    return dstTableID == tableID;
}
Expand All @@ -165,6 +155,8 @@ struct RelTableSchema : TableSchema {
RelMultiplicity relMultiplicity;
common::table_id_t srcTableID;
common::table_id_t dstTableID;
common::LogicalType srcPKDataType;
common::LogicalType dstPKDataType;
};

} // namespace catalog
Expand Down
12 changes: 6 additions & 6 deletions src/include/processor/operator/copy/read_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ class ReadCSVMorsel : public ReadFileMorsel {

class ReadCSVSharedState : public ReadFileSharedState {
public:
ReadCSVSharedState(common::CSVReaderConfig csvReaderConfig,
catalog::NodeTableSchema* tableSchema, std::vector<std::string> filePaths)
: ReadFileSharedState{std::move(filePaths)}, csvReaderConfig{std::move(csvReaderConfig)},
ReadCSVSharedState(common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
std::vector<std::string> filePaths)
: ReadFileSharedState{std::move(filePaths)}, csvReaderConfig{csvReaderConfig},
tableSchema{tableSchema} {}

private:
Expand All @@ -30,16 +30,16 @@ class ReadCSVSharedState : public ReadFileSharedState {

private:
common::CSVReaderConfig csvReaderConfig;
catalog::NodeTableSchema* tableSchema;
catalog::TableSchema* tableSchema;
std::shared_ptr<arrow::csv::StreamingReader> reader;
};

class ReadCSV : public ReadFile {
public:
ReadCSV(std::vector<DataPos> arrowColumnPoses, DataPos offsetVectorPos,
ReadCSV(std::vector<DataPos> arrowColumnPoses, const DataPos& offsetVectorPos,
std::shared_ptr<ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{std::move(arrowColumnPoses), std::move(offsetVectorPos), std::move(sharedState),
: ReadFile{std::move(arrowColumnPoses), offsetVectorPos, std::move(sharedState),
PhysicalOperatorType::READ_CSV, id, paramsString} {}

inline std::shared_ptr<arrow::RecordBatch> readTuples(
Expand Down
44 changes: 20 additions & 24 deletions src/include/storage/copier/rel_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ class RelCopier {
public:
RelCopier(std::shared_ptr<CopySharedState> sharedState, const common::CopyDescription& copyDesc,
catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData,
DirectedInMemRelData* bwdRelData, std::vector<common::LogicalType> pkColumnTypes,
std::vector<PrimaryKeyIndex*> pkIndexes)
DirectedInMemRelData* bwdRelData, std::vector<PrimaryKeyIndex*> pkIndexes)
: sharedState{std::move(sharedState)}, copyDesc{copyDesc}, schema{schema},
fwdRelData{fwdRelData}, bwdRelData{bwdRelData},
pkColumnTypes{std::move(pkColumnTypes)}, pkIndexes{std::move(pkIndexes)} {}
fwdRelData{fwdRelData}, bwdRelData{bwdRelData}, pkIndexes{std::move(pkIndexes)} {}
virtual ~RelCopier() = default;

void execute(processor::ExecutionContext* executionContext);
Expand Down Expand Up @@ -61,20 +59,18 @@ class RelCopier {
catalog::RelTableSchema* schema;
DirectedInMemRelData* fwdRelData;
DirectedInMemRelData* bwdRelData;
std::vector<common::LogicalType> pkColumnTypes; // src and dst node pk columns.
std::vector<PrimaryKeyIndex*> pkIndexes;
};

class RelListsCounterAndColumnCopier : public RelCopier {
public:
protected:
RelListsCounterAndColumnCopier(std::shared_ptr<CopySharedState> sharedState,
const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema,
DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData,
std::vector<common::LogicalType> pkColumnTypes, std::vector<PrimaryKeyIndex*> pkIndexes)
std::vector<PrimaryKeyIndex*> pkIndexes)
: RelCopier{std::move(sharedState), copyDesc, schema, fwdRelData, bwdRelData,
std::move(pkColumnTypes), std::move(pkIndexes)} {}
std::move(pkIndexes)} {}

protected:
void finalize() override;

static void buildRelListsHeaders(
Expand All @@ -89,13 +85,13 @@ class ParquetRelListsCounterAndColumnsCopier : public RelListsCounterAndColumnCo
ParquetRelListsCounterAndColumnsCopier(std::shared_ptr<CopySharedState> sharedState,
const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema,
DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData,
std::vector<common::LogicalType> pkColumnTypes, std::vector<PrimaryKeyIndex*> pkIndexes)
std::vector<PrimaryKeyIndex*> pkIndexes)
: RelListsCounterAndColumnCopier{std::move(sharedState), copyDesc, schema, fwdRelData,
bwdRelData, std::move(pkColumnTypes), std::move(pkIndexes)} {}
bwdRelData, std::move(pkIndexes)} {}

std::unique_ptr<RelCopier> clone() const final {
return std::make_unique<ParquetRelListsCounterAndColumnsCopier>(
sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkColumnTypes, pkIndexes);
sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes);
}

private:
Expand All @@ -111,27 +107,27 @@ class CSVRelListsCounterAndColumnsCopier : public RelListsCounterAndColumnCopier
CSVRelListsCounterAndColumnsCopier(std::shared_ptr<CopySharedState> sharedState,
const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema,
DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData,
std::vector<common::LogicalType> pkColumnTypes, std::vector<PrimaryKeyIndex*> pkIndexes)
std::vector<PrimaryKeyIndex*> pkIndexes)
: RelListsCounterAndColumnCopier{std::move(sharedState), copyDesc, schema, fwdRelData,
bwdRelData, std::move(pkColumnTypes), std::move(pkIndexes)} {}
bwdRelData, std::move(pkIndexes)} {}

std::unique_ptr<RelCopier> clone() const final {
return std::make_unique<CSVRelListsCounterAndColumnsCopier>(
sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkColumnTypes, pkIndexes);
sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes);
}

private:
void executeInternal(std::unique_ptr<CopyMorsel> morsel) final;
};

class RelListsCopier : public RelCopier {
public:
protected:
RelListsCopier(std::shared_ptr<CopySharedState> sharedState,
const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema,
DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData,
std::vector<common::LogicalType> pkColumnTypes, std::vector<PrimaryKeyIndex*> pkIndexes)
std::vector<PrimaryKeyIndex*> pkIndexes)
: RelCopier{std::move(sharedState), copyDesc, schema, fwdRelData, bwdRelData,
std::move(pkColumnTypes), std::move(pkIndexes)} {}
std::move(pkIndexes)} {}

private:
void finalize() final;
Expand All @@ -142,13 +138,13 @@ class ParquetRelListsCopier : public RelListsCopier {
ParquetRelListsCopier(std::shared_ptr<CopySharedState> sharedState,
const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema,
DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData,
std::vector<common::LogicalType> pkColumnTypes, std::vector<PrimaryKeyIndex*> pkIndexes)
std::vector<PrimaryKeyIndex*> pkIndexes)
: RelListsCopier{std::move(sharedState), copyDesc, schema, fwdRelData, bwdRelData,
std::move(pkColumnTypes), std::move(pkIndexes)} {}
std::move(pkIndexes)} {}

std::unique_ptr<RelCopier> clone() const final {
return std::make_unique<ParquetRelListsCopier>(
sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkColumnTypes, pkIndexes);
sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes);
}

private:
Expand All @@ -164,13 +160,13 @@ class CSVRelListsCopier : public RelListsCopier {
CSVRelListsCopier(std::shared_ptr<CopySharedState> sharedState,
const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema,
DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData,
std::vector<common::LogicalType> pkColumnTypes, std::vector<PrimaryKeyIndex*> pkIndexes)
std::vector<PrimaryKeyIndex*> pkIndexes)
: RelListsCopier{std::move(sharedState), copyDesc, schema, fwdRelData, bwdRelData,
std::move(pkColumnTypes), std::move(pkIndexes)} {}
std::move(pkIndexes)} {}

std::unique_ptr<RelCopier> clone() const final {
return std::make_unique<CSVRelListsCopier>(
sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkColumnTypes, pkIndexes);
sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes);
}

private:
Expand Down
6 changes: 3 additions & 3 deletions src/include/storage/copier/rel_copy_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ class DirectedInMemRelData {
class RelCopyExecutor {
public:
RelCopyExecutor(common::CopyDescription& copyDescription, WAL* wal,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
storage::NodesStore& nodesStore, storage::RelTable* table, RelsStatistics* relsStatistics);
common::TaskScheduler& taskScheduler, storage::NodesStore& nodesStore,
storage::RelTable* table, catalog::RelTableSchema* tableSchema,
RelsStatistics* relsStatistics);

common::offset_t copy(processor::ExecutionContext* executionContext);

Expand All @@ -62,7 +63,6 @@ class RelCopyExecutor {
std::string outputDirectory;
std::unordered_map<std::string, FileBlockInfo> fileBlockInfos;
common::TaskScheduler& taskScheduler;
catalog::Catalog& catalog;
catalog::RelTableSchema* tableSchema;
uint64_t numTuples;
RelsStatistics* relsStatistics;
Expand Down
3 changes: 2 additions & 1 deletion src/processor/operator/copy/copy_rel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ namespace processor {

uint64_t CopyRel::executeInternal(
kuzu::common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) {
auto tableSchema = catalog->getReadOnlyVersion()->getRelTableSchema(tableID);
auto relCopier = std::make_unique<RelCopyExecutor>(
copyDescription, wal, *taskScheduler, *catalog, nodesStore, table, relsStatistics);
copyDescription, wal, *taskScheduler, nodesStore, table, tableSchema, relsStatistics);
auto numRelsCopied = relCopier->copy(executionContext);
return numRelsCopied;
}
Expand Down
8 changes: 6 additions & 2 deletions src/processor/operator/ddl/create_rel_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ namespace kuzu {
namespace processor {

void CreateRelTable::executeDDLInternal() {
auto newRelTableID =
catalog->addRelTableSchema(tableName, relMultiplicity, properties, srcTableID, dstTableID);
auto srcPKDataType =
catalog->getReadOnlyVersion()->getNodeTableSchema(srcTableID)->getPrimaryKey().dataType;
auto dstPKDataType =
catalog->getReadOnlyVersion()->getNodeTableSchema(dstTableID)->getPrimaryKey().dataType;
auto newRelTableID = catalog->addRelTableSchema(tableName, relMultiplicity, properties,
srcTableID, dstTableID, srcPKDataType, dstPKDataType);
relsStatistics->addTableStatistic(catalog->getWriteVersion()->getRelTableSchema(newRelTableID));
}

Expand Down
Loading

0 comments on commit ab919c0

Please sign in to comment.