Skip to content

Commit

Permalink
Enable multi copy for node tables (#3298)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger committed Apr 19, 2024
1 parent b5b4d6a commit b41b649
Show file tree
Hide file tree
Showing 39 changed files with 408 additions and 182 deletions.
2 changes: 1 addition & 1 deletion dataset/tinysnb/copy.cypher
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CALL ENABLE_MULTI_COPY=true
COPY person FROM "dataset/tinysnb/vPerson.csv" (HeaDER=true, deLim=',');
COPY person FROM "dataset/tinysnb/vPerson2.csv" (deLim=',');
COPY organisation FROM "dataset/tinysnb/vOrganisation.csv";
COPY movies FROM "dataset/tinysnb/vMovies.csv";
COPY knows FROM "dataset/tinysnb/eKnows.csv";
Expand Down
3 changes: 0 additions & 3 deletions dataset/tinysnb/vPerson.csv
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,3 @@ id,fname,Gender,ISStudent,isWorker,age,eyeSight,birthdate,registerTime,lastJobDu
3,Carol,1,false,true,45,5.0,1940-06-22,1911-08-20 02:32:21,48 hours 24 minutes 11 seconds,"[4,5]","[Carmen,Fred]","[[8,10]]","[91,75,21,95]",1.00,a0eebc999c0b4ef8bb6d6bb9bd380a13
5,Dan,2,false,true,20,4.8,1950-7-23,2031-11-30 12:25:30Z,10 years 5 months 13 hours 24 us,"[1,9]","[Wolfeschlegelstein,Daniel]","[[7,4],[8,8],[9]]","[76,88,99,89]",1.30,a0ee-bc99-9c0b-4ef8-bb6d-6bb9-bd38-0a14
7,Elizabeth,1,false,true,20,4.7,1980-10-26,1976-12-23 11:21:42,48 hours 24 minutes 11 seconds,"[2]","[Ein]","[[6],[7],[8]]","[96,59,65,88]",1.463,{a0eebc99-9c0b4ef8-bb6d6bb9-bd380a15}
8,Farooq,2,true,false,25,4.5,1980-10-26,1972-07-31 13:22:30.678559,18 minutes 24 milliseconds,"[3,4,5,6,7]","[Fesdwe]","[[8]]","[80,78,34,83]",1.51,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A16
9,Greg,2,false,false,40,4.9,1980-10-26,1976-12-23 11:21:42Z+06:40,10 years 5 months 13 hours 24 us,"[1]","[Grad]","[[10]]","[43,83,67,43]",1.6,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A17
10,Hubert Blaine Wolfeschlegelsteinhausenbergerdorff,2,false,true,83,4.9,1990-11-27,2023-02-21 13:25:30,3 years 2 days 13 hours 2 minutes,"[10,11,12,3,4,5,6,7]","[Ad,De,Hi,Kye,Orlan]","[[7],[10],[6,7]]","[77,64,100,54]",1.323,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A18
3 changes: 3 additions & 0 deletions dataset/tinysnb/vPerson2.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
8,Farooq,2,true,false,25,4.5,1980-10-26,1972-07-31 13:22:30.678559,18 minutes 24 milliseconds,"[3,4,5,6,7]","[Fesdwe]","[[8]]","[80,78,34,83]",1.51,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A16
9,Greg,2,false,false,40,4.9,1980-10-26,1976-12-23 11:21:42Z+06:40,10 years 5 months 13 hours 24 us,"[1]","[Grad]","[[10]]","[43,83,67,43]",1.6,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A17
10,Hubert Blaine Wolfeschlegelsteinhausenbergerdorff,2,false,true,83,4.9,1990-11-27,2023-02-21 13:25:30,3 years 2 days 13 hours 2 minutes,"[10,11,12,3,4,5,6,7]","[Ad,De,Hi,Kye,Orlan]","[[7],[10],[6,7]]","[77,64,100,54]",1.323,A0EEBC99-9C0B-4EF8-BB6D-6BB9BD380A18
4 changes: 0 additions & 4 deletions src/common/exception/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ std::string ExceptionMessage::nullPKException() {
return "Found NULL, which violates the non-null constraint of the primary key column.";
}

// Builds the error text stating that COPY may be executed only once on a table.
std::string ExceptionMessage::notAllowCopyOnNonEmptyTableException() {
    return std::string("COPY commands can only be executed once on a table.");
}

std::string ExceptionMessage::overLargeStringPKValueException(uint64_t length) {
return stringFormat("The maximum length of primary key strings is 262144 bytes. The input "
"string's length was {}.",
Expand Down
2 changes: 0 additions & 2 deletions src/include/common/exception/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ struct ExceptionMessage {
static std::string nonExistentPKException(const std::string& pkString);
static std::string invalidPKType(const std::string& type);
static std::string nullPKException();
// Bulk insertion.
static std::string notAllowCopyOnNonEmptyTableException();
// Long string.
static std::string overLargeStringPKValueException(uint64_t length);
static std::string overLargeStringValueException(uint64_t length);
Expand Down
3 changes: 0 additions & 3 deletions src/include/main/client_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ struct ClientConfig {
bool enableProgressBar;
// time before displaying progress bar
uint64_t showProgressAfter;
// Whether multi copy is enabled.
bool enableMultiCopy;
// Semantic for recursive pattern, can be either WALK, TRAIL, ACYCLIC
common::PathSemantic recursivePatternSemantic;
// Scale factor for recursive pattern cardinality estimation.
Expand All @@ -40,7 +38,6 @@ struct ClientConfigDefault {
static constexpr bool ENABLE_SEMI_MASK = true;
static constexpr bool ENABLE_PROGRESS_BAR = true;
static constexpr uint64_t SHOW_PROGRESS_AFTER = 1000;
static constexpr bool ENABLE_MULTI_COPY = false;
static constexpr common::PathSemantic RECURSIVE_PATTERN_SEMANTIC = common::PathSemantic::WALK;
static constexpr uint32_t RECURSIVE_PATTERN_FACTOR = 1;
};
Expand Down
12 changes: 0 additions & 12 deletions src/include/main/settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,6 @@ struct FileSearchPathSetting {
}
};

// Configuration option "enable_multi_copy": a BOOL setting stored in the client config's
// enableMultiCopy flag.
struct EnableMultiCopySetting {
    static constexpr const char* name = "enable_multi_copy";
    static constexpr const common::LogicalTypeID inputType = common::LogicalTypeID::BOOL;

    // Validates the parameter against inputType, then writes its bool value into the
    // (mutable) client config.
    static void setContext(ClientContext* context, const common::Value& parameter) {
        parameter.validateType(inputType);
        const bool enabled = parameter.getValue<bool>();
        context->getClientConfigUnsafe()->enableMultiCopy = enabled;
    }

    // Returns the currently configured flag wrapped in a Value.
    static common::Value getSetting(ClientContext* context) {
        const bool enabled = context->getClientConfig()->enableMultiCopy;
        return common::Value(enabled);
    }
};

struct RecursivePatternSemanticSetting {
static constexpr const char* name = "recursive_pattern_semantic";
static constexpr const common::LogicalTypeID inputType = common::LogicalTypeID::STRING;
Expand Down
3 changes: 0 additions & 3 deletions src/include/processor/operator/persistent/batch_insert.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ class BatchInsert : public Sink {

inline std::shared_ptr<BatchInsertSharedState> getSharedState() const { return sharedState; }

protected:
void checkIfTableIsEmpty();

protected:
std::unique_ptr<BatchInsertInfo> info;
std::shared_ptr<BatchInsertSharedState> sharedState;
Expand Down
30 changes: 25 additions & 5 deletions src/include/processor/operator/persistent/node_batch_insert.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include "common/cast.h"
#include "common/types/internal_id_t.h"
#include "common/types/types.h"
#include "processor/operator/aggregate/hash_aggregate.h"
#include "processor/operator/call/in_query_call.h"
#include "processor/operator/persistent/batch_insert.h"
Expand All @@ -9,7 +11,11 @@
#include "storage/store/node_table.h"

namespace kuzu {
// Forward declaration so this header can name transaction::Transaction without pulling in
// its full definition.
namespace transaction {
class Transaction;
} // namespace transaction
namespace processor {
struct ExecutionContext;

struct NodeBatchInsertInfo final : public BatchInsertInfo {
std::vector<DataPos> columnPositions;
Expand Down Expand Up @@ -64,9 +70,6 @@ struct NodeBatchInsertSharedState final : public BatchInsertSharedState {

inline uint64_t getCurNodeGroupIdx() const { return currentNodeGroupIdx; }

void appendIncompleteNodeGroup(std::unique_ptr<storage::ChunkedNodeGroup> localNodeGroup,
std::optional<IndexBuilder>& indexBuilder);

// Returns the current node group index and then advances it (post-increment).
// NOTE(review): per the name, no lock is taken here — presumably the caller must already hold
// the shared state's lock; confirm against call sites.
inline common::offset_t getNextNodeGroupIdxWithoutLock() { return currentNodeGroupIdx++; }

void calculateNumTuples();
Expand Down Expand Up @@ -109,12 +112,29 @@ class NodeBatchInsert final : public BatchInsert {
resultSetDescriptor->copy(), children[0]->clone(), id, paramsString);
}

static void writeAndResetNodeGroup(common::node_group_idx_t nodeGroupIdx,
static void writeAndResetNewNodeGroup(common::node_group_idx_t nodeGroupIdx,
std::optional<IndexBuilder>& indexBuilder, common::column_id_t pkColumnID,
storage::NodeTable* table, storage::ChunkedNodeGroup* nodeGroup);

// The node group will be reset so that the only values remaining are the ones which were not
// written
void writeAndResetNodeGroup(common::node_group_idx_t nodeGroupIdx, ExecutionContext* context,
std::unique_ptr<storage::ChunkedNodeGroup>& nodeGroup,
std::optional<IndexBuilder>& indexBuilder);

private:
void copyToNodeGroup();
// Returns the number of nodes written from the group
uint64_t writeToExistingNodeGroup(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, std::optional<IndexBuilder>& indexBuilder,
common::column_id_t pkColumnID, storage::NodeTable* table,
storage::ChunkedNodeGroup* nodeGroup);

void appendIncompleteNodeGroup(std::unique_ptr<storage::ChunkedNodeGroup> localNodeGroup,
std::optional<IndexBuilder>& indexBuilder, ExecutionContext* context);
void clearToIndex(std::unique_ptr<storage::ChunkedNodeGroup>& nodeGroup,
common::offset_t startIndexInGroup);

void copyToNodeGroup(ExecutionContext* context);
};

} // namespace processor
Expand Down
12 changes: 3 additions & 9 deletions src/include/storage/index/hash_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,9 @@ class HashIndex final : public OnDiskHashIndex {

using BufferKeyType =
typename std::conditional<std::same_as<T, common::ku_string_t>, std::string, T>::type;
// Appends the buffer to the index. Returns the number of values successfully inserted.
// I.e. if a key fails to insert, its index will be the return value
size_t append(const IndexBuffer<BufferKeyType>& buffer) {
    // Size the bulk-insert builder for everything it will hold once this buffer is added
    // (entries already recorded in the write-transaction header + entries already staged +
    // the incoming buffer), so the builder keeps the same number of primary slots that will
    // eventually be needed when flushing to disk and each target slot is known up front.
    const auto numEntriesAfterAppend =
        indexHeaderForWriteTrx->numEntries + bulkInsertLocalStorage.size() + buffer.size();
    bulkInsertLocalStorage.reserve(numEntriesAfterAppend);
    return bulkInsertLocalStorage.append(buffer);
}
// Appends the buffer to the index. Returns the number of values successfully inserted,
// or the index of the first value which cannot be inserted.
size_t append(const IndexBuffer<BufferKeyType>& buffer);

void prepareCommit() override;
void prepareRollback() override;
Expand Down
4 changes: 4 additions & 0 deletions src/include/storage/local_storage/local_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
namespace kuzu {
namespace storage {

class ChunkedNodeGroup;

class LocalNodeNG final : public LocalNodeGroup {
public:
LocalNodeNG(common::offset_t nodeGroupStartOffset,
Expand All @@ -24,6 +26,8 @@ class LocalNodeNG final : public LocalNodeGroup {

bool insert(std::vector<common::ValueVector*> nodeIDVectors,
std::vector<common::ValueVector*> propertyVectors) override;
bool insert(common::offset_t startNodeOffset, ChunkedNodeGroup* nodeGroup,
common::offset_t numValues);
bool update(std::vector<common::ValueVector*> nodeIDVectors, common::column_id_t columnID,
common::ValueVector* propertyVector) override;
bool delete_(common::ValueVector* nodeIDVector,
Expand Down
6 changes: 5 additions & 1 deletion src/include/storage/local_storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

#include <unordered_map>

#include "common/types/internal_id_t.h"
#include "common/vector/value_vector.h"
#include "storage/store/chunked_node_group.h"
#include "storage/store/chunked_node_group_collection.h"

namespace kuzu {
Expand Down Expand Up @@ -63,9 +65,11 @@ class LocalChunkedGroupCollection {
bool read(common::offset_t offset, common::column_id_t columnID,
common::ValueVector* outputVector, common::sel_t posInOutputVector);

ChunkedNodeGroup* getLastChunkedGroupAndAddNewGroupIfNecessary();
inline void append(common::offset_t offset, std::vector<common::ValueVector*> vectors) {
    // Append the vectors first, then record the returned row index under the given offset.
    const auto rowIdx = append(vectors);
    offsetToRowIdx[offset] = rowIdx;
}
void append(common::offset_t offset, ChunkedNodeGroup* nodeGroup, common::offset_t numValues);
// Only used for rel tables. Should be moved out later.
inline void append(common::offset_t nodeOffset, common::offset_t relOffset,
std::vector<common::ValueVector*> vectors) {
Expand Down Expand Up @@ -162,7 +166,7 @@ class LocalNodeGroup {
KU_ASSERT(columnID < updateChunks.size());
return updateChunks[columnID];
}
LocalChunkedGroupCollection& getInsesrtChunks() { return insertChunks; }
LocalChunkedGroupCollection& getInsertChunks() { return insertChunks; }

bool hasUpdatesOrDeletions() const;

Expand Down
6 changes: 5 additions & 1 deletion src/include/storage/store/chunked_node_group.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "common/column_data_format.h"
#include "common/constants.h"
#include "common/copy_constructors.h"
#include "storage/store/column_chunk.h"

Expand Down Expand Up @@ -40,7 +41,10 @@ class ChunkedNodeGroup {

uint64_t append(const std::vector<common::ValueVector*>& columnVectors,
common::SelectionVector& selVector, uint64_t numValuesToAppend);
common::offset_t append(ChunkedNodeGroup* other, common::offset_t offsetInOtherNodeGroup);
// Appends up to numValuesToAppend from the other chunked node group, returning the actual
// number of values appended
common::offset_t append(ChunkedNodeGroup* other, common::offset_t offsetInOtherNodeGroup,
common::offset_t numValuesToAppend = common::StorageConstants::NODE_GROUP_SIZE);
void write(const std::vector<std::unique_ptr<ColumnChunk>>& data,
common::column_id_t offsetColumnID);
void write(const ChunkedNodeGroup& data, common::column_id_t offsetColumnID);
Expand Down
16 changes: 16 additions & 0 deletions src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@

#include "common/assert.h"
#include "common/cast.h"
#include "common/types/types.h"
#include "storage/index/hash_index.h"
#include "storage/stats/nodes_store_statistics.h"
#include "storage/store/chunked_node_group.h"
#include "storage/store/node_table_data.h"
#include "storage/store/table.h"

namespace kuzu {
// Forward declaration so this header can name transaction::Transaction without pulling in
// its full definition.
namespace transaction {
class Transaction;
} // namespace transaction
namespace storage {
class LocalNodeTable;

struct NodeTableReadState : public TableReadState {
NodeTableReadState(const common::ValueVector& nodeIDVector,
Expand Down Expand Up @@ -105,12 +110,23 @@ class NodeTable final : public Table {

void append(ChunkedNodeGroup* nodeGroup) { tableData->append(nodeGroup); }

void prepareCommitNodeGroup(common::node_group_idx_t nodeGroupIdx,
transaction::Transaction* transaction, storage::LocalNodeNG* localNodeGroup);
void prepareCommit(transaction::Transaction* transaction, LocalTable* localTable) override;
void prepareCommit() override;
void prepareRollback(LocalTable* localTable) override;
void checkpointInMemory() override;
void rollbackInMemory() override;

// Forwards to the table data to obtain the number of node groups visible to the transaction.
inline common::node_group_idx_t getNumNodeGroups(transaction::Transaction* transaction) {
    const common::node_group_idx_t numNodeGroups = tableData->getNumNodeGroups(transaction);
    return numNodeGroups;
}

// Forwards to the table data to obtain the tuple count of the given node group.
inline common::offset_t getNumTuplesInNodeGroup(transaction::Transaction* transaction,
    common::node_group_idx_t nodeGroupIdx) {
    const common::offset_t numTuples =
        tableData->getNumTuplesInNodeGroup(transaction, nodeGroupIdx);
    return numTuples;
}

private:
void updatePK(transaction::Transaction* transaction, common::column_id_t columnID,
const common::ValueVector& nodeIDVector, const common::ValueVector& pkVector);
Expand Down
10 changes: 10 additions & 0 deletions src/include/storage/store/node_table_data.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "common/types/internal_id_t.h"
#include "common/types/types.h"
#include "storage/store/table_data.h"

namespace kuzu {
Expand Down Expand Up @@ -37,6 +39,8 @@ class NodeTableData final : public TableData {
// Flush the nodeGroup to disk and update metadataDAs.
void append(ChunkedNodeGroup* nodeGroup) override;

void prepareLocalNodeGroupToCommit(common::node_group_idx_t nodeGroupIdx,
transaction::Transaction* transaction, LocalNodeNG* localNodeGroup);
void prepareLocalTableToCommit(transaction::Transaction* transaction,
LocalTableData* localTable) override;

Expand All @@ -45,6 +49,12 @@ class NodeTableData final : public TableData {
return columns[0]->getNumNodeGroups(transaction);
}

// Returns numValues from the first column's metadata for the given node group, read with the
// transaction's type. Asserts that nodeGroupIdx is within the current number of node groups.
common::offset_t getNumTuplesInNodeGroup(transaction::Transaction* transaction,
    common::node_group_idx_t nodeGroupIdx) const {
    KU_ASSERT(nodeGroupIdx < getNumNodeGroups(transaction));
    const auto trxType = transaction->getType();
    const auto& metadata = columns[0]->getMetadata(nodeGroupIdx, trxType);
    return metadata.numValues;
}

private:
void initializeColumnReadStates(transaction::Transaction* transaction,
common::offset_t startNodeOffset, kuzu::storage::NodeDataReadState& readState,
Expand Down
1 change: 0 additions & 1 deletion src/main/client_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ ClientContext::ClientContext(Database* database) : database{database} {
config.varLengthMaxDepth = ClientConfigDefault::VAR_LENGTH_MAX_DEPTH;
config.enableProgressBar = ClientConfigDefault::ENABLE_PROGRESS_BAR;
config.showProgressAfter = ClientConfigDefault::SHOW_PROGRESS_AFTER;
config.enableMultiCopy = ClientConfigDefault::ENABLE_MULTI_COPY;
config.recursivePatternSemantic = ClientConfigDefault::RECURSIVE_PATTERN_SEMANTIC;
config.recursivePatternCardinalityScaleFactor = ClientConfigDefault::RECURSIVE_PATTERN_FACTOR;
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/db_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ static ConfigurationOption options[] = { // NOLINT(cert-err58-cpp):
GET_CONFIGURATION(VarLengthExtendMaxDepthSetting), GET_CONFIGURATION(EnableSemiMaskSetting),
GET_CONFIGURATION(HomeDirectorySetting), GET_CONFIGURATION(FileSearchPathSetting),
GET_CONFIGURATION(ProgressBarSetting), GET_CONFIGURATION(ProgressBarTimerSetting),
GET_CONFIGURATION(EnableMultiCopySetting), GET_CONFIGURATION(RecursivePatternSemanticSetting),
GET_CONFIGURATION(RecursivePatternSemanticSetting),
GET_CONFIGURATION(RecursivePatternFactorSetting)};

ConfigurationOption* DBConfig::getOptionByName(const std::string& optionName) {
Expand Down
1 change: 0 additions & 1 deletion src/processor/operator/persistent/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ add_subdirectory(writer/parquet)

add_library(kuzu_processor_operator_persistent
OBJECT
batch_insert.cpp
node_batch_insert.cpp
copy_rdf.cpp
rel_batch_insert.cpp
Expand Down
18 changes: 0 additions & 18 deletions src/processor/operator/persistent/batch_insert.cpp

This file was deleted.

Loading

0 comments on commit b41b649

Please sign in to comment.