Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi Copy for Node Tables #3298

Merged
merged 1 commit into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dataset/tinysnb/copy.cypher
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CALL ENABLE_MULTI_COPY=true
COPY person FROM "dataset/tinysnb/vPerson.csv" (HeaDER=true, deLim=',');
COPY person FROM "dataset/tinysnb/vPerson2.csv" (deLim=',');
COPY organisation FROM "dataset/tinysnb/vOrganisation.csv";
COPY movies FROM "dataset/tinysnb/vMovies.csv";
COPY knows FROM "dataset/tinysnb/eKnows.csv";
Expand Down
3 changes: 0 additions & 3 deletions dataset/tinysnb/vPerson.csv
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,3 @@ id,fname,Gender,ISStudent,isWorker,age,eyeSight,birthdate,registerTime,lastJobDu
3,Carol,1,false,true,45,5.0,1940-06-22,1911-08-20 02:32:21,48 hours 24 minutes 11 seconds,"[4,5]","[Carmen,Fred]","[[8,10]]","[91,75,21,95]",1.00,a0eebc999c0b4ef8bb6d6bb9bd380a13
5,Dan,2,false,true,20,4.8,1950-7-23,2031-11-30 12:25:30Z,10 years 5 months 13 hours 24 us,"[1,9]","[Wolfeschlegelstein,Daniel]","[[7,4],[8,8],[9]]","[76,88,99,89]",1.30,a0ee-bc99-9c0b-4ef8-bb6d-6bb9-bd38-0a14
7,Elizabeth,1,false,true,20,4.7,1980-10-26,1976-12-23 11:21:42,48 hours 24 minutes 11 seconds,"[2]","[Ein]","[[6],[7],[8]]","[96,59,65,88]",1.463,{a0eebc99-9c0b4ef8-bb6d6bb9-bd380a15}
8,Farooq,2,true,false,25,4.5,1980-10-26,1972-07-31 13:22:30.678559,18 minutes 24 milliseconds,"[3,4,5,6,7]","[Fesdwe]","[[8]]","[80,78,34,83]",1.51,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A16
9,Greg,2,false,false,40,4.9,1980-10-26,1976-12-23 11:21:42Z+06:40,10 years 5 months 13 hours 24 us,"[1]","[Grad]","[[10]]","[43,83,67,43]",1.6,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A17
10,Hubert Blaine Wolfeschlegelsteinhausenbergerdorff,2,false,true,83,4.9,1990-11-27,2023-02-21 13:25:30,3 years 2 days 13 hours 2 minutes,"[10,11,12,3,4,5,6,7]","[Ad,De,Hi,Kye,Orlan]","[[7],[10],[6,7]]","[77,64,100,54]",1.323,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A18
3 changes: 3 additions & 0 deletions dataset/tinysnb/vPerson2.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
8,Farooq,2,true,false,25,4.5,1980-10-26,1972-07-31 13:22:30.678559,18 minutes 24 milliseconds,"[3,4,5,6,7]","[Fesdwe]","[[8]]","[80,78,34,83]",1.51,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A16
9,Greg,2,false,false,40,4.9,1980-10-26,1976-12-23 11:21:42Z+06:40,10 years 5 months 13 hours 24 us,"[1]","[Grad]","[[10]]","[43,83,67,43]",1.6,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A17
10,Hubert Blaine Wolfeschlegelsteinhausenbergerdorff,2,false,true,83,4.9,1990-11-27,2023-02-21 13:25:30,3 years 2 days 13 hours 2 minutes,"[10,11,12,3,4,5,6,7]","[Ad,De,Hi,Kye,Orlan]","[[7],[10],[6,7]]","[77,64,100,54]",1.323,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A18
4 changes: 0 additions & 4 deletions src/common/exception/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ std::string ExceptionMessage::nullPKException() {
return "Found NULL, which violates the non-null constraint of the primary key column.";
}

std::string ExceptionMessage::notAllowCopyOnNonEmptyTableException() {
return "COPY commands can only be executed once on a table.";
}

std::string ExceptionMessage::overLargeStringPKValueException(uint64_t length) {
return stringFormat("The maximum length of primary key strings is 262144 bytes. The input "
"string's length was {}.",
Expand Down
2 changes: 0 additions & 2 deletions src/include/common/exception/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ struct ExceptionMessage {
static std::string nonExistentPKException(const std::string& pkString);
static std::string invalidPKType(const std::string& type);
static std::string nullPKException();
// Bulk insertion.
static std::string notAllowCopyOnNonEmptyTableException();
// Long string.
static std::string overLargeStringPKValueException(uint64_t length);
static std::string overLargeStringValueException(uint64_t length);
Expand Down
3 changes: 0 additions & 3 deletions src/include/main/client_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ struct ClientConfig {
bool enableProgressBar;
// time before displaying progress bar
uint64_t showProgressAfter;
// If multi copy is enabled.
bool enableMultiCopy;
// Semantic for recursive pattern, can be either WALK, TRAIL, ACYCLIC
common::PathSemantic recursivePatternSemantic;
// Scale factor for recursive pattern cardinality estimation.
Expand All @@ -40,7 +38,6 @@ struct ClientConfigDefault {
static constexpr bool ENABLE_SEMI_MASK = true;
static constexpr bool ENABLE_PROGRESS_BAR = true;
static constexpr uint64_t SHOW_PROGRESS_AFTER = 1000;
static constexpr bool ENABLE_MULTI_COPY = false;
static constexpr common::PathSemantic RECURSIVE_PATTERN_SEMANTIC = common::PathSemantic::WALK;
static constexpr uint32_t RECURSIVE_PATTERN_FACTOR = 1;
};
Expand Down
12 changes: 0 additions & 12 deletions src/include/main/settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,6 @@ struct FileSearchPathSetting {
}
};

struct EnableMultiCopySetting {
static constexpr const char* name = "enable_multi_copy";
static constexpr const common::LogicalTypeID inputType = common::LogicalTypeID::BOOL;
static void setContext(ClientContext* context, const common::Value& parameter) {
parameter.validateType(inputType);
context->getClientConfigUnsafe()->enableMultiCopy = parameter.getValue<bool>();
}
static common::Value getSetting(ClientContext* context) {
return common::Value(context->getClientConfig()->enableMultiCopy);
}
};

struct RecursivePatternSemanticSetting {
static constexpr const char* name = "recursive_pattern_semantic";
static constexpr const common::LogicalTypeID inputType = common::LogicalTypeID::STRING;
Expand Down
3 changes: 0 additions & 3 deletions src/include/processor/operator/persistent/batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ class BatchInsert : public Sink {

inline std::shared_ptr<BatchInsertSharedState> getSharedState() const { return sharedState; }

protected:
void checkIfTableIsEmpty();

protected:
std::unique_ptr<BatchInsertInfo> info;
std::shared_ptr<BatchInsertSharedState> sharedState;
Expand Down
30 changes: 25 additions & 5 deletions src/include/processor/operator/persistent/node_batch_insert.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include "common/cast.h"
#include "common/types/internal_id_t.h"
#include "common/types/types.h"
#include "processor/operator/aggregate/hash_aggregate.h"
#include "processor/operator/call/in_query_call.h"
#include "processor/operator/persistent/batch_insert.h"
Expand All @@ -9,7 +11,11 @@
#include "storage/store/node_table.h"

namespace kuzu {
namespace transaction {
class Transaction;
};
namespace processor {
struct ExecutionContext;

struct NodeBatchInsertInfo final : public BatchInsertInfo {
std::vector<DataPos> columnPositions;
Expand Down Expand Up @@ -64,9 +70,6 @@ struct NodeBatchInsertSharedState final : public BatchInsertSharedState {

inline uint64_t getCurNodeGroupIdx() const { return currentNodeGroupIdx; }

void appendIncompleteNodeGroup(std::unique_ptr<storage::ChunkedNodeGroup> localNodeGroup,
std::optional<IndexBuilder>& indexBuilder);

inline common::offset_t getNextNodeGroupIdxWithoutLock() { return currentNodeGroupIdx++; }

void calculateNumTuples();
Expand Down Expand Up @@ -109,12 +112,29 @@ class NodeBatchInsert final : public BatchInsert {
resultSetDescriptor->copy(), children[0]->clone(), id, paramsString);
}

static void writeAndResetNodeGroup(common::node_group_idx_t nodeGroupIdx,
static void writeAndResetNewNodeGroup(common::node_group_idx_t nodeGroupIdx,
std::optional<IndexBuilder>& indexBuilder, common::column_id_t pkColumnID,
storage::NodeTable* table, storage::ChunkedNodeGroup* nodeGroup);

// The node group will be reset so that the only values remaining are the ones which were not
// written
void writeAndResetNodeGroup(common::node_group_idx_t nodeGroupIdx, ExecutionContext* context,
std::unique_ptr<storage::ChunkedNodeGroup>& nodeGroup,
std::optional<IndexBuilder>& indexBuilder);

private:
void copyToNodeGroup();
// Returns the number of nodes written from the group
uint64_t writeToExistingNodeGroup(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, std::optional<IndexBuilder>& indexBuilder,
common::column_id_t pkColumnID, storage::NodeTable* table,
storage::ChunkedNodeGroup* nodeGroup);

void appendIncompleteNodeGroup(std::unique_ptr<storage::ChunkedNodeGroup> localNodeGroup,
std::optional<IndexBuilder>& indexBuilder, ExecutionContext* context);
void clearToIndex(std::unique_ptr<storage::ChunkedNodeGroup>& nodeGroup,
common::offset_t startIndexInGroup);

void copyToNodeGroup(ExecutionContext* context);
};

} // namespace processor
Expand Down
12 changes: 3 additions & 9 deletions src/include/storage/index/hash_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,9 @@ class HashIndex final : public OnDiskHashIndex {

using BufferKeyType =
typename std::conditional<std::same_as<T, common::ku_string_t>, std::string, T>::type;
// Appends the buffer to the index. Returns the number of values successfully inserted.
// I.e. if a key fails to insert, its index will be the return value
size_t append(const IndexBuffer<BufferKeyType>& buffer) {
// Keep the same number of primary slots in the builder as we will eventually need when
// flushing to disk, so that we know each slot to write to
bulkInsertLocalStorage.reserve(
indexHeaderForWriteTrx->numEntries + bulkInsertLocalStorage.size() + buffer.size());
return bulkInsertLocalStorage.append(buffer);
}
// Appends the buffer to the index. Returns the number of values successfully inserted,
// or the index of the first value which cannot be inserted.
size_t append(const IndexBuffer<BufferKeyType>& buffer);

void prepareCommit() override;
void prepareRollback() override;
Expand Down
4 changes: 4 additions & 0 deletions src/include/storage/local_storage/local_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
namespace kuzu {
namespace storage {

class ChunkedNodeGroup;

class LocalNodeNG final : public LocalNodeGroup {
public:
LocalNodeNG(common::offset_t nodeGroupStartOffset,
Expand All @@ -24,6 +26,8 @@ class LocalNodeNG final : public LocalNodeGroup {

bool insert(std::vector<common::ValueVector*> nodeIDVectors,
std::vector<common::ValueVector*> propertyVectors) override;
bool insert(common::offset_t startNodeOffset, ChunkedNodeGroup* nodeGroup,
common::offset_t numValues);
bool update(std::vector<common::ValueVector*> nodeIDVectors, common::column_id_t columnID,
common::ValueVector* propertyVector) override;
bool delete_(common::ValueVector* nodeIDVector,
Expand Down
6 changes: 5 additions & 1 deletion src/include/storage/local_storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

#include <unordered_map>

#include "common/types/internal_id_t.h"
#include "common/vector/value_vector.h"
#include "storage/store/chunked_node_group.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double check if the include of chunked_node_group.h is needed. I think it should be indirectly included by chunked_node_group_collection.h already.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is, but I don't like relying on indirect includes, they just make the changes more confusing when things are removed and you have to add unrelated headers (though admittedly in this case it is something that's closely related to the existing include).

#include "storage/store/chunked_node_group_collection.h"

namespace kuzu {
Expand Down Expand Up @@ -63,9 +65,11 @@ class LocalChunkedGroupCollection {
bool read(common::offset_t offset, common::column_id_t columnID,
common::ValueVector* outputVector, common::sel_t posInOutputVector);

ChunkedNodeGroup* getLastChunkedGroupAndAddNewGroupIfNecessary();
inline void append(common::offset_t offset, std::vector<common::ValueVector*> vectors) {
offsetToRowIdx[offset] = append(vectors);
}
void append(common::offset_t offset, ChunkedNodeGroup* nodeGroup, common::offset_t numValues);
// Only used for rel tables. Should be moved out later.
inline void append(common::offset_t nodeOffset, common::offset_t relOffset,
std::vector<common::ValueVector*> vectors) {
Expand Down Expand Up @@ -162,7 +166,7 @@ class LocalNodeGroup {
KU_ASSERT(columnID < updateChunks.size());
return updateChunks[columnID];
}
LocalChunkedGroupCollection& getInsesrtChunks() { return insertChunks; }
LocalChunkedGroupCollection& getInsertChunks() { return insertChunks; }

bool hasUpdatesOrDeletions() const;

Expand Down
6 changes: 5 additions & 1 deletion src/include/storage/store/chunked_node_group.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "common/column_data_format.h"
#include "common/constants.h"
#include "common/copy_constructors.h"
#include "storage/store/column_chunk.h"

Expand Down Expand Up @@ -40,7 +41,10 @@ class ChunkedNodeGroup {

uint64_t append(const std::vector<common::ValueVector*>& columnVectors,
common::SelectionVector& selVector, uint64_t numValuesToAppend);
common::offset_t append(ChunkedNodeGroup* other, common::offset_t offsetInOtherNodeGroup);
// Appends up to numValuesToAppend from the other chunked node group, returning the actual
// number of values appended
common::offset_t append(ChunkedNodeGroup* other, common::offset_t offsetInOtherNodeGroup,
common::offset_t numValuesToAppend = common::StorageConstants::NODE_GROUP_SIZE);
void write(const std::vector<std::unique_ptr<ColumnChunk>>& data,
common::column_id_t offsetColumnID);
void write(const ChunkedNodeGroup& data, common::column_id_t offsetColumnID);
Expand Down
16 changes: 16 additions & 0 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@

#include "common/assert.h"
#include "common/cast.h"
#include "common/types/types.h"
#include "storage/index/hash_index.h"
#include "storage/stats/nodes_store_statistics.h"
#include "storage/store/chunked_node_group.h"
#include "storage/store/node_table_data.h"
#include "storage/store/table.h"

namespace kuzu {
namespace transaction {
class Transaction;
};
namespace storage {
class LocalNodeTable;

struct NodeTableReadState : public TableReadState {
NodeTableReadState(const common::ValueVector& nodeIDVector,
Expand Down Expand Up @@ -105,12 +110,23 @@ class NodeTable final : public Table {

void append(ChunkedNodeGroup* nodeGroup) { tableData->append(nodeGroup); }

void prepareCommitNodeGroup(common::node_group_idx_t nodeGroupIdx,
transaction::Transaction* transaction, storage::LocalNodeNG* localNodeGroup);
void prepareCommit(transaction::Transaction* transaction, LocalTable* localTable) override;
void prepareCommit() override;
void prepareRollback(LocalTable* localTable) override;
void checkpointInMemory() override;
void rollbackInMemory() override;

inline common::node_group_idx_t getNumNodeGroups(transaction::Transaction* transaction) {
return tableData->getNumNodeGroups(transaction);
}

inline common::offset_t getNumTuplesInNodeGroup(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx) {
return tableData->getNumTuplesInNodeGroup(transaction, nodeGroupIdx);
}

private:
void updatePK(transaction::Transaction* transaction, common::column_id_t columnID,
const common::ValueVector& nodeIDVector, const common::ValueVector& pkVector);
Expand Down
10 changes: 10 additions & 0 deletions src/include/storage/store/node_table_data.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "common/types/internal_id_t.h"
#include "common/types/types.h"
#include "storage/store/table_data.h"

namespace kuzu {
Expand Down Expand Up @@ -37,6 +39,8 @@ class NodeTableData final : public TableData {
// Flush the nodeGroup to disk and update metadataDAs.
void append(ChunkedNodeGroup* nodeGroup) override;

void prepareLocalNodeGroupToCommit(common::node_group_idx_t nodeGroupIdx,
transaction::Transaction* transaction, LocalNodeNG* localNodeGroup);
void prepareLocalTableToCommit(transaction::Transaction* transaction,
LocalTableData* localTable) override;

Expand All @@ -45,6 +49,12 @@ class NodeTableData final : public TableData {
return columns[0]->getNumNodeGroups(transaction);
}

common::offset_t getNumTuplesInNodeGroup(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx) const {
KU_ASSERT(nodeGroupIdx < getNumNodeGroups(transaction));
return columns[0]->getMetadata(nodeGroupIdx, transaction->getType()).numValues;
}

private:
void initializeColumnReadStates(transaction::Transaction* transaction,
common::offset_t startNodeOffset, kuzu::storage::NodeDataReadState& readState,
Expand Down
1 change: 0 additions & 1 deletion src/main/client_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ ClientContext::ClientContext(Database* database) : database{database} {
config.varLengthMaxDepth = ClientConfigDefault::VAR_LENGTH_MAX_DEPTH;
config.enableProgressBar = ClientConfigDefault::ENABLE_PROGRESS_BAR;
config.showProgressAfter = ClientConfigDefault::SHOW_PROGRESS_AFTER;
config.enableMultiCopy = ClientConfigDefault::ENABLE_MULTI_COPY;
config.recursivePatternSemantic = ClientConfigDefault::RECURSIVE_PATTERN_SEMANTIC;
config.recursivePatternCardinalityScaleFactor = ClientConfigDefault::RECURSIVE_PATTERN_FACTOR;
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/db_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ static ConfigurationOption options[] = { // NOLINT(cert-err58-cpp):
GET_CONFIGURATION(VarLengthExtendMaxDepthSetting), GET_CONFIGURATION(EnableSemiMaskSetting),
GET_CONFIGURATION(HomeDirectorySetting), GET_CONFIGURATION(FileSearchPathSetting),
GET_CONFIGURATION(ProgressBarSetting), GET_CONFIGURATION(ProgressBarTimerSetting),
GET_CONFIGURATION(EnableMultiCopySetting), GET_CONFIGURATION(RecursivePatternSemanticSetting),
GET_CONFIGURATION(RecursivePatternSemanticSetting),
GET_CONFIGURATION(RecursivePatternFactorSetting)};

ConfigurationOption* DBConfig::getOptionByName(const std::string& optionName) {
Expand Down
1 change: 0 additions & 1 deletion src/processor/operator/persistent/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ add_subdirectory(writer/parquet)

add_library(kuzu_processor_operator_persistent
OBJECT
batch_insert.cpp
node_batch_insert.cpp
copy_rdf.cpp
rel_batch_insert.cpp
Expand Down
18 changes: 0 additions & 18 deletions src/processor/operator/persistent/batch_insert.cpp

This file was deleted.

Loading
Loading