Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue 1704 #1723

Merged
merged 1 commit into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.5 LANGUAGES CXX)
project(Kuzu VERSION 0.0.5.1 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
2 changes: 2 additions & 0 deletions dataset/primary-key-tests/eFollows.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
100,10020
101,10020
1 change: 0 additions & 1 deletion dataset/primary-key-tests/int-pk-tests/eKnows.csv

This file was deleted.

2 changes: 0 additions & 2 deletions dataset/primary-key-tests/int-pk-tests/vPerson.csv

This file was deleted.

1 change: 0 additions & 1 deletion dataset/primary-key-tests/string-pk-tests/eKnows.csv

This file was deleted.

2 changes: 0 additions & 2 deletions dataset/primary-key-tests/string-pk-tests/vPerson.csv

This file was deleted.

1 change: 1 addition & 0 deletions dataset/primary-key-tests/vOrg.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
10020
2 changes: 2 additions & 0 deletions dataset/primary-key-tests/vPerson.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
100,Foo,10
101,Bar,11
24 changes: 15 additions & 9 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ uint64_t SerDeser::serializeValue<RelTableSchema>(
offset = SerDeser::serializeValue<TableSchema>((const TableSchema&)value, fileInfo, offset);
offset = SerDeser::serializeValue<RelMultiplicity>(value.relMultiplicity, fileInfo, offset);
offset = SerDeser::serializeValue<table_id_t>(value.srcTableID, fileInfo, offset);
return SerDeser::serializeValue<table_id_t>(value.dstTableID, fileInfo, offset);
offset = SerDeser::serializeValue<table_id_t>(value.dstTableID, fileInfo, offset);
offset = SerDeser::serializeValue<LogicalType>(value.srcPKDataType, fileInfo, offset);
return SerDeser::serializeValue<LogicalType>(value.dstPKDataType, fileInfo, offset);
}

template<>
Expand All @@ -156,7 +158,9 @@ uint64_t SerDeser::deserializeValue<RelTableSchema>(
offset = SerDeser::deserializeValue<TableSchema>((TableSchema&)value, fileInfo, offset);
offset = SerDeser::deserializeValue<RelMultiplicity>(value.relMultiplicity, fileInfo, offset);
offset = SerDeser::deserializeValue<table_id_t>(value.srcTableID, fileInfo, offset);
return SerDeser::deserializeValue<table_id_t>(value.dstTableID, fileInfo, offset);
offset = SerDeser::deserializeValue<table_id_t>(value.dstTableID, fileInfo, offset);
offset = SerDeser::deserializeValue<LogicalType>(value.srcPKDataType, fileInfo, offset);
return SerDeser::deserializeValue<LogicalType>(value.dstPKDataType, fileInfo, offset);
}

} // namespace common
Expand Down Expand Up @@ -205,7 +209,8 @@ table_id_t CatalogContent::addNodeTableSchema(
}

table_id_t CatalogContent::addRelTableSchema(std::string tableName, RelMultiplicity relMultiplicity,
std::vector<Property> properties, table_id_t srcTableID, table_id_t dstTableID) {
std::vector<Property> properties, table_id_t srcTableID, table_id_t dstTableID,
LogicalType srcPKDataType, LogicalType dstPKDataType) {
table_id_t tableID = assignNextTableID();
nodeTableSchemas[srcTableID]->addFwdRelTableID(tableID);
nodeTableSchemas[dstTableID]->addBwdRelTableID(tableID);
Expand All @@ -216,8 +221,9 @@ table_id_t CatalogContent::addRelTableSchema(std::string tableName, RelMultiplic
properties[i].propertyID = i;
properties[i].tableID = tableID;
}
auto relTableSchema = std::make_unique<RelTableSchema>(std::move(tableName), tableID,
relMultiplicity, std::move(properties), srcTableID, dstTableID);
auto relTableSchema =
std::make_unique<RelTableSchema>(std::move(tableName), tableID, relMultiplicity,
std::move(properties), srcTableID, dstTableID, srcPKDataType, dstPKDataType);
relTableNameToIDMap[relTableSchema->tableName] = tableID;
relTableSchemas[tableID] = std::move(relTableSchema);
return tableID;
Expand Down Expand Up @@ -401,11 +407,11 @@ table_id_t Catalog::addNodeTableSchema(
}

table_id_t Catalog::addRelTableSchema(std::string tableName, RelMultiplicity relMultiplicity,
const std::vector<Property>& propertyDefinitions, table_id_t srcTableID,
table_id_t dstTableID) {
const std::vector<Property>& propertyDefinitions, table_id_t srcTableID, table_id_t dstTableID,
LogicalType srcPKDataType, LogicalType dstPKDataType) {
initCatalogContentForWriteTrxIfNecessary();
auto tableID = catalogContentForWriteTrx->addRelTableSchema(
std::move(tableName), relMultiplicity, propertyDefinitions, srcTableID, dstTableID);
auto tableID = catalogContentForWriteTrx->addRelTableSchema(std::move(tableName),
relMultiplicity, propertyDefinitions, srcTableID, dstTableID, srcPKDataType, dstPKDataType);
wal->logRelTableRecord(tableID);
return tableID;
}
Expand Down
6 changes: 4 additions & 2 deletions src/include/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class CatalogContent {

common::table_id_t addRelTableSchema(std::string tableName, RelMultiplicity relMultiplicity,
std::vector<Property> properties, common::table_id_t srcTableID,
common::table_id_t dstTableID);
common::table_id_t dstTableID, common::LogicalType srcPKDataType,
common::LogicalType dstPKDataType);

inline bool containNodeTable(common::table_id_t tableID) const {
return nodeTableSchemas.contains(tableID);
Expand Down Expand Up @@ -192,7 +193,8 @@ class Catalog {

common::table_id_t addRelTableSchema(std::string tableName, RelMultiplicity relMultiplicity,
const std::vector<Property>& propertyDefinitions, common::table_id_t srcTableID,
common::table_id_t dstTableID);
common::table_id_t dstTableID, common::LogicalType srcPKDataType,
common::LogicalType dstPKDataType);

void dropTableSchema(common::table_id_t tableID);

Expand Down
26 changes: 9 additions & 17 deletions src/include/catalog/catalog_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,34 +122,24 @@ struct RelTableSchema : TableSchema {
RelTableSchema()
: TableSchema{"", common::INVALID_TABLE_ID, false /* isNodeTable */, {} /* properties */},
relMultiplicity{MANY_MANY}, srcTableID{common::INVALID_TABLE_ID},
dstTableID{common::INVALID_TABLE_ID} {}
dstTableID{common::INVALID_TABLE_ID}, srcPKDataType{common::LogicalType{
common::LogicalTypeID::ANY}},
dstPKDataType{common::LogicalType{common::LogicalTypeID::ANY}} {}
RelTableSchema(std::string tableName, common::table_id_t tableID,
RelMultiplicity relMultiplicity, std::vector<Property> properties,
common::table_id_t srcTableID, common::table_id_t dstTableID)
common::table_id_t srcTableID, common::table_id_t dstTableID,
common::LogicalType srcPKDataType, common::LogicalType dstPKDataType)
: TableSchema{std::move(tableName), tableID, false /* isNodeTable */,
std::move(properties)},
relMultiplicity{relMultiplicity}, srcTableID{srcTableID}, dstTableID{dstTableID} {}

inline Property& getRelIDDefinition() {
for (auto& property : properties) {
if (property.name == common::InternalKeyword::ID) {
return property;
}
}
throw common::InternalException("Cannot find internal rel ID definition.");
}
relMultiplicity{relMultiplicity}, srcTableID{srcTableID}, dstTableID{dstTableID},
srcPKDataType{std::move(srcPKDataType)}, dstPKDataType{std::move(dstPKDataType)} {}

// Returns true when this rel table's multiplicity is ONE_ONE, or when it equals
// the multiplicity selected for `direction`: MANY_ONE for FWD, ONE_MANY otherwise.
inline bool isSingleMultiplicityInDirection(common::RelDataDirection direction) const {
    if (relMultiplicity == ONE_ONE) {
        return true;
    }
    const bool isForward = direction == common::RelDataDirection::FWD;
    return relMultiplicity == (isForward ? MANY_ONE : ONE_MANY);
}

inline uint32_t getNumUserDefinedProperties() const {
// Note: the first column stores the relID property.
return properties.size() - 1;
}

// Returns true when `tableID` matches either the source or the destination
// table ID recorded on this rel table schema.
inline bool isSrcOrDstTable(common::table_id_t tableID) const {
    if (srcTableID == tableID) {
        return true;
    }
    return dstTableID == tableID;
}
Expand All @@ -165,6 +155,8 @@ struct RelTableSchema : TableSchema {
RelMultiplicity relMultiplicity;
common::table_id_t srcTableID;
common::table_id_t dstTableID;
common::LogicalType srcPKDataType;
common::LogicalType dstPKDataType;
};

} // namespace catalog
Expand Down
12 changes: 6 additions & 6 deletions src/include/processor/operator/copy/read_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ class ReadCSVMorsel : public ReadFileMorsel {

class ReadCSVSharedState : public ReadFileSharedState {
public:
ReadCSVSharedState(common::CSVReaderConfig csvReaderConfig,
catalog::NodeTableSchema* tableSchema, std::vector<std::string> filePaths)
: ReadFileSharedState{std::move(filePaths)}, csvReaderConfig{std::move(csvReaderConfig)},
ReadCSVSharedState(common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
std::vector<std::string> filePaths)
: ReadFileSharedState{std::move(filePaths)}, csvReaderConfig{csvReaderConfig},
tableSchema{tableSchema} {}

private:
Expand All @@ -30,16 +30,16 @@ class ReadCSVSharedState : public ReadFileSharedState {

private:
common::CSVReaderConfig csvReaderConfig;
catalog::NodeTableSchema* tableSchema;
catalog::TableSchema* tableSchema;
std::shared_ptr<arrow::csv::StreamingReader> reader;
};

class ReadCSV : public ReadFile {
public:
ReadCSV(std::vector<DataPos> arrowColumnPoses, DataPos offsetVectorPos,
ReadCSV(std::vector<DataPos> arrowColumnPoses, const DataPos& offsetVectorPos,
std::shared_ptr<ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{std::move(arrowColumnPoses), std::move(offsetVectorPos), std::move(sharedState),
: ReadFile{std::move(arrowColumnPoses), offsetVectorPos, std::move(sharedState),
PhysicalOperatorType::READ_CSV, id, paramsString} {}

inline std::shared_ptr<arrow::RecordBatch> readTuples(
Expand Down
44 changes: 20 additions & 24 deletions src/include/storage/copier/rel_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ class RelCopier {
public:
RelCopier(std::shared_ptr<CopySharedState> sharedState, const common::CopyDescription& copyDesc,
catalog::RelTableSchema* schema, DirectedInMemRelData* fwdRelData,
DirectedInMemRelData* bwdRelData, std::vector<common::LogicalType> pkColumnTypes,
std::vector<PrimaryKeyIndex*> pkIndexes)
DirectedInMemRelData* bwdRelData, std::vector<PrimaryKeyIndex*> pkIndexes)
: sharedState{std::move(sharedState)}, copyDesc{copyDesc}, schema{schema},
fwdRelData{fwdRelData}, bwdRelData{bwdRelData},
pkColumnTypes{std::move(pkColumnTypes)}, pkIndexes{std::move(pkIndexes)} {}
fwdRelData{fwdRelData}, bwdRelData{bwdRelData}, pkIndexes{std::move(pkIndexes)} {}
virtual ~RelCopier() = default;

void execute(processor::ExecutionContext* executionContext);
Expand Down Expand Up @@ -61,20 +59,18 @@ class RelCopier {
catalog::RelTableSchema* schema;
DirectedInMemRelData* fwdRelData;
DirectedInMemRelData* bwdRelData;
std::vector<common::LogicalType> pkColumnTypes; // src and dst node pk columns.
std::vector<PrimaryKeyIndex*> pkIndexes;
};

class RelListsCounterAndColumnCopier : public RelCopier {
public:
protected:
RelListsCounterAndColumnCopier(std::shared_ptr<CopySharedState> sharedState,
const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema,
DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData,
std::vector<common::LogicalType> pkColumnTypes, std::vector<PrimaryKeyIndex*> pkIndexes)
std::vector<PrimaryKeyIndex*> pkIndexes)
: RelCopier{std::move(sharedState), copyDesc, schema, fwdRelData, bwdRelData,
std::move(pkColumnTypes), std::move(pkIndexes)} {}
std::move(pkIndexes)} {}

protected:
void finalize() override;

static void buildRelListsHeaders(
Expand All @@ -89,13 +85,13 @@ class ParquetRelListsCounterAndColumnsCopier : public RelListsCounterAndColumnCo
ParquetRelListsCounterAndColumnsCopier(std::shared_ptr<CopySharedState> sharedState,
const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema,
DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData,
std::vector<common::LogicalType> pkColumnTypes, std::vector<PrimaryKeyIndex*> pkIndexes)
std::vector<PrimaryKeyIndex*> pkIndexes)
: RelListsCounterAndColumnCopier{std::move(sharedState), copyDesc, schema, fwdRelData,
bwdRelData, std::move(pkColumnTypes), std::move(pkIndexes)} {}
bwdRelData, std::move(pkIndexes)} {}

std::unique_ptr<RelCopier> clone() const final {
return std::make_unique<ParquetRelListsCounterAndColumnsCopier>(
sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkColumnTypes, pkIndexes);
sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes);
}

private:
Expand All @@ -111,27 +107,27 @@ class CSVRelListsCounterAndColumnsCopier : public RelListsCounterAndColumnCopier
CSVRelListsCounterAndColumnsCopier(std::shared_ptr<CopySharedState> sharedState,
const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema,
DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData,
std::vector<common::LogicalType> pkColumnTypes, std::vector<PrimaryKeyIndex*> pkIndexes)
std::vector<PrimaryKeyIndex*> pkIndexes)
: RelListsCounterAndColumnCopier{std::move(sharedState), copyDesc, schema, fwdRelData,
bwdRelData, std::move(pkColumnTypes), std::move(pkIndexes)} {}
bwdRelData, std::move(pkIndexes)} {}

std::unique_ptr<RelCopier> clone() const final {
return std::make_unique<CSVRelListsCounterAndColumnsCopier>(
sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkColumnTypes, pkIndexes);
sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes);
}

private:
void executeInternal(std::unique_ptr<CopyMorsel> morsel) final;
};

class RelListsCopier : public RelCopier {
public:
protected:
RelListsCopier(std::shared_ptr<CopySharedState> sharedState,
const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema,
DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData,
std::vector<common::LogicalType> pkColumnTypes, std::vector<PrimaryKeyIndex*> pkIndexes)
std::vector<PrimaryKeyIndex*> pkIndexes)
: RelCopier{std::move(sharedState), copyDesc, schema, fwdRelData, bwdRelData,
std::move(pkColumnTypes), std::move(pkIndexes)} {}
std::move(pkIndexes)} {}

private:
void finalize() final;
Expand All @@ -142,13 +138,13 @@ class ParquetRelListsCopier : public RelListsCopier {
ParquetRelListsCopier(std::shared_ptr<CopySharedState> sharedState,
const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema,
DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData,
std::vector<common::LogicalType> pkColumnTypes, std::vector<PrimaryKeyIndex*> pkIndexes)
std::vector<PrimaryKeyIndex*> pkIndexes)
: RelListsCopier{std::move(sharedState), copyDesc, schema, fwdRelData, bwdRelData,
std::move(pkColumnTypes), std::move(pkIndexes)} {}
std::move(pkIndexes)} {}

std::unique_ptr<RelCopier> clone() const final {
return std::make_unique<ParquetRelListsCopier>(
sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkColumnTypes, pkIndexes);
sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes);
}

private:
Expand All @@ -164,13 +160,13 @@ class CSVRelListsCopier : public RelListsCopier {
CSVRelListsCopier(std::shared_ptr<CopySharedState> sharedState,
const common::CopyDescription& copyDesc, catalog::RelTableSchema* schema,
DirectedInMemRelData* fwdRelData, DirectedInMemRelData* bwdRelData,
std::vector<common::LogicalType> pkColumnTypes, std::vector<PrimaryKeyIndex*> pkIndexes)
std::vector<PrimaryKeyIndex*> pkIndexes)
: RelListsCopier{std::move(sharedState), copyDesc, schema, fwdRelData, bwdRelData,
std::move(pkColumnTypes), std::move(pkIndexes)} {}
std::move(pkIndexes)} {}

std::unique_ptr<RelCopier> clone() const final {
return std::make_unique<CSVRelListsCopier>(
sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkColumnTypes, pkIndexes);
sharedState, copyDesc, schema, fwdRelData, bwdRelData, pkIndexes);
}

private:
Expand Down
6 changes: 3 additions & 3 deletions src/include/storage/copier/rel_copy_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ class DirectedInMemRelData {
class RelCopyExecutor {
public:
RelCopyExecutor(common::CopyDescription& copyDescription, WAL* wal,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
storage::NodesStore& nodesStore, storage::RelTable* table, RelsStatistics* relsStatistics);
common::TaskScheduler& taskScheduler, storage::NodesStore& nodesStore,
storage::RelTable* table, catalog::RelTableSchema* tableSchema,
RelsStatistics* relsStatistics);

common::offset_t copy(processor::ExecutionContext* executionContext);

Expand All @@ -62,7 +63,6 @@ class RelCopyExecutor {
std::string outputDirectory;
std::unordered_map<std::string, FileBlockInfo> fileBlockInfos;
common::TaskScheduler& taskScheduler;
catalog::Catalog& catalog;
catalog::RelTableSchema* tableSchema;
uint64_t numTuples;
RelsStatistics* relsStatistics;
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/storage_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ using storage_version_t = uint64_t;

struct StorageVersionInfo {
static std::unordered_map<std::string, storage_version_t> getStorageVersionInfo() {
return {{"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3.5", 6}, {"0.0.3.4", 5}, {"0.0.3.3", 4},
{"0.0.3.2", 3}, {"0.0.3.1", 2}, {"0.0.3", 1}};
return {{"0.0.5.1", 9}, {"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3.5", 6}, {"0.0.3.4", 5},
{"0.0.3.3", 4}, {"0.0.3.2", 3}, {"0.0.3.1", 2}, {"0.0.3", 1}};
}

static storage_version_t getStorageVersion();
Expand Down
3 changes: 2 additions & 1 deletion src/processor/operator/copy/copy_rel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ namespace processor {

uint64_t CopyRel::executeInternal(
kuzu::common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) {
auto tableSchema = catalog->getReadOnlyVersion()->getRelTableSchema(tableID);
auto relCopier = std::make_unique<RelCopyExecutor>(
copyDescription, wal, *taskScheduler, *catalog, nodesStore, table, relsStatistics);
copyDescription, wal, *taskScheduler, nodesStore, table, tableSchema, relsStatistics);
auto numRelsCopied = relCopier->copy(executionContext);
return numRelsCopied;
}
Expand Down
8 changes: 6 additions & 2 deletions src/processor/operator/ddl/create_rel_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ namespace kuzu {
namespace processor {

void CreateRelTable::executeDDLInternal() {
auto newRelTableID =
catalog->addRelTableSchema(tableName, relMultiplicity, properties, srcTableID, dstTableID);
auto srcPKDataType =
catalog->getReadOnlyVersion()->getNodeTableSchema(srcTableID)->getPrimaryKey().dataType;
auto dstPKDataType =
catalog->getReadOnlyVersion()->getNodeTableSchema(dstTableID)->getPrimaryKey().dataType;
auto newRelTableID = catalog->addRelTableSchema(tableName, relMultiplicity, properties,
srcTableID, dstTableID, srcPKDataType, dstPKDataType);
relsStatistics->addTableStatistic(catalog->getWriteVersion()->getRelTableSchema(newRelTableID));
}

Expand Down
Loading
Loading