Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate null bits from data in columns #1545

Merged
merged 1 commit into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.3.3 LANGUAGES CXX)
project(Kuzu VERSION 0.0.3.4 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
2 changes: 1 addition & 1 deletion src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ list_entry_t ListAuxiliaryBuffer::addList(uint64_t listSize) {
auto numBytesPerElement = dataVector->getNumBytesPerValue();
if (needResizeDataVector) {
auto buffer = std::make_unique<uint8_t[]>(capacity * numBytesPerElement);
memcpy(dataVector->valueBuffer.get(), buffer.get(), size * numBytesPerElement);
memcpy(buffer.get(), dataVector->valueBuffer.get(), size * numBytesPerElement);
dataVector->valueBuffer = std::move(buffer);
dataVector->nullMask->resize(capacity);
}
Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ using frame_group_idx_t = page_group_idx_t;
using property_id_t = uint32_t;
constexpr property_id_t INVALID_PROPERTY_ID = UINT32_MAX;
using column_id_t = property_id_t;
constexpr column_id_t INVALID_COLUMN_ID = INVALID_PROPERTY_ID;
using vector_idx_t = uint32_t;
constexpr vector_idx_t INVALID_VECTOR_IDX = UINT32_MAX;
using block_idx_t = uint64_t;
Expand Down
6 changes: 3 additions & 3 deletions src/include/processor/operator/copy/copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ namespace processor {
class Copy : public PhysicalOperator {
public:
Copy(PhysicalOperatorType operatorType, catalog::Catalog* catalog,
common::CopyDescription copyDescription, common::table_id_t tableID, storage::WAL* wal,
uint32_t id, const std::string& paramsString)
const common::CopyDescription& copyDescription, common::table_id_t tableID,
storage::WAL* wal, uint32_t id, const std::string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, catalog{catalog},
copyDescription{std::move(copyDescription)}, tableID{tableID}, wal{wal} {}
copyDescription{copyDescription}, tableID{tableID}, wal{wal} {}

inline bool isSource() const override { return true; }

Expand Down
11 changes: 6 additions & 5 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ namespace processor {

class CopyNode : public Copy {
public:
CopyNode(catalog::Catalog* catalog, common::CopyDescription copyDescription,
common::table_id_t tableID, storage::WAL* wal,
CopyNode(catalog::Catalog* catalog, const common::CopyDescription& copyDescription,
storage::NodeTable* table, storage::WAL* wal,
storage::NodesStatisticsAndDeletedIDs* nodesStatistics, storage::RelsStore& relsStore,
uint32_t id, const std::string& paramsString)
: Copy{PhysicalOperatorType::COPY_NODE, catalog, std::move(copyDescription), tableID, wal,
: Copy{PhysicalOperatorType::COPY_NODE, catalog, copyDescription, table->getTableID(), wal,
id, paramsString},
nodesStatistics{nodesStatistics}, relsStore{relsStore} {}
table{table}, nodesStatistics{nodesStatistics}, relsStore{relsStore} {}

std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyNode>(
catalog, copyDescription, tableID, wal, nodesStatistics, relsStore, id, paramsString);
catalog, copyDescription, table, wal, nodesStatistics, relsStore, id, paramsString);
}

protected:
Expand All @@ -31,6 +31,7 @@ class CopyNode : public Copy {
}

private:
storage::NodeTable* table;
storage::NodesStatisticsAndDeletedIDs* nodesStatistics;
storage::RelsStore& relsStore;
};
Expand Down
13 changes: 7 additions & 6 deletions src/include/processor/operator/copy/copy_rel.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ namespace processor {

class CopyRel : public Copy {
public:
CopyRel(catalog::Catalog* catalog, common::CopyDescription copyDescription,
common::table_id_t tableID, storage::WAL* wal, storage::RelsStatistics* relsStatistics,
CopyRel(catalog::Catalog* catalog, const common::CopyDescription& copyDescription,
storage::RelTable* table, storage::WAL* wal, storage::RelsStatistics* relsStatistics,
storage::NodesStore& nodesStore, uint32_t id, const std::string& paramsString)
: Copy{PhysicalOperatorType::COPY_REL, catalog, std::move(copyDescription), tableID, wal,
id, paramsString},
relsStatistics{relsStatistics}, nodesStore{nodesStore} {}
: Copy{PhysicalOperatorType::COPY_REL, catalog, copyDescription, table->getRelTableID(),
wal, id, paramsString},
table{table}, relsStatistics{relsStatistics}, nodesStore{nodesStore} {}

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<CopyRel>(
catalog, copyDescription, tableID, wal, relsStatistics, nodesStore, id, paramsString);
catalog, copyDescription, table, wal, relsStatistics, nodesStore, id, paramsString);
}

protected:
Expand All @@ -31,6 +31,7 @@ class CopyRel : public Copy {
}

private:
storage::RelTable* table;
storage::RelsStatistics* relsStatistics;
storage::NodesStore& nodesStore;
};
Expand Down
97 changes: 72 additions & 25 deletions src/include/storage/copier/node_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

#include "storage/copier/npy_reader.h"
#include "storage/copier/table_copy_executor.h"
#include "storage/in_mem_storage_structure/in_mem_node_column.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/index/hash_index_builder.h"
#include "storage/store/node_table.h"
#include <arrow/api.h>
#include <arrow/csv/api.h>
#include <arrow/io/api.h>
Expand Down Expand Up @@ -88,13 +89,20 @@ class CSVNodeCopySharedState : public NodeCopySharedState {

class NodeCopier {
public:
NodeCopier(const std::string& directory, std::shared_ptr<NodeCopySharedState> sharedState,
PrimaryKeyIndexBuilder* pkIndex, const common::CopyDescription& copyDesc,
catalog::TableSchema* schema, NodeTable* table, common::column_id_t pkColumnID);

// For clone.
NodeCopier(std::shared_ptr<NodeCopySharedState> sharedState, PrimaryKeyIndexBuilder* pkIndex,
const common::CopyDescription& copyDesc, std::vector<InMemNodeColumn*> columns,
common::column_id_t pkColumnID)
: sharedState{std::move(sharedState)}, pkIndex{pkIndex}, copyDesc{copyDesc},
columns{std::move(columns)}, pkColumnID{pkColumnID} {
ray6080 marked this conversation as resolved.
Show resolved Hide resolved
overflowCursors.resize(this->columns.size());
const common::CopyDescription& copyDesc, NodeTable* table, common::property_id_t pkColumnID,
std::vector<std::shared_ptr<InMemColumn>> columns,
std::vector<catalog::Property> properties)
: sharedState{std::move(sharedState)}, pkIndex{pkIndex}, copyDesc{copyDesc}, table{table},
pkColumnID{pkColumnID}, columns{std::move(columns)}, properties{std::move(properties)} {
overflowCursors.resize(this->properties.size());
}

virtual ~NodeCopier() = default;

void execute(processor::ExecutionContext* executionContext);
Expand All @@ -103,10 +111,14 @@ class NodeCopier {
if (pkIndex) {
pkIndex->flush();
}
for (auto& column : columns) {
column->saveToFile();
}
}

virtual std::unique_ptr<NodeCopier> clone() const {
return std::make_unique<NodeCopier>(sharedState, pkIndex, copyDesc, columns, pkColumnID);
return std::make_unique<NodeCopier>(
sharedState, pkIndex, copyDesc, table, pkColumnID, columns, properties);
}

protected:
Expand All @@ -115,14 +127,14 @@ class NodeCopier {
}

void copyArrayIntoColumnChunk(InMemColumnChunk* columnChunk, common::column_id_t columnID,
arrow::Array& arrowArray, common::offset_t startNodeOffset,
common::CopyDescription& copyDescription);
arrow::Array& arrowArray, common::CopyDescription& copyDescription);
void populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::NullMask* nullMask, common::offset_t startOffset, uint64_t numValues);
common::offset_t startOffset, uint64_t numValues);

void flushChunksAndPopulatePKIndex(
const std::vector<std::unique_ptr<InMemColumnChunk>>& columnChunks,
common::offset_t startNodeOffset, common::offset_t endNodeOffset);
common::offset_t startNodeOffset, common::offset_t endNodeOffset,
common::column_id_t columnToFlush = common::INVALID_COLUMN_ID);

template<typename T, typename... Args>
void appendToPKIndex(
Expand All @@ -134,9 +146,12 @@ class NodeCopier {
std::shared_ptr<NodeCopySharedState> sharedState;
PrimaryKeyIndexBuilder* pkIndex;
common::CopyDescription copyDesc;
std::vector<InMemNodeColumn*> columns;
common::column_id_t pkColumnID;
NodeTable* table;
// The properties to be copied into. Each property corresponds to a column.
std::vector<catalog::Property> properties;
std::vector<std::shared_ptr<InMemColumn>> columns;
std::vector<PageByteCursor> overflowCursors;
common::column_id_t pkColumnID;
};

template<>
Expand All @@ -149,14 +164,23 @@ void NodeCopier::appendToPKIndex<common::ku_string_t, storage::InMemOverflowFile

class CSVNodeCopier : public NodeCopier {
public:
CSVNodeCopier(const std::string& directory, std::shared_ptr<NodeCopySharedState> sharedState,
PrimaryKeyIndexBuilder* pkIndex, const common::CopyDescription& copyDesc,
catalog::TableSchema* schema, NodeTable* table, common::property_id_t pkColumnID)
: NodeCopier{
directory, std::move(sharedState), pkIndex, copyDesc, schema, table, pkColumnID} {}

// For clone.
CSVNodeCopier(std::shared_ptr<NodeCopySharedState> sharedState, PrimaryKeyIndexBuilder* pkIndex,
const common::CopyDescription& copyDesc, std::vector<InMemNodeColumn*> columns,
common::column_id_t pkColumnID)
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, std::move(columns), pkColumnID} {}
const common::CopyDescription& copyDesc, NodeTable* table, common::column_id_t pkColumnID,
std::vector<std::shared_ptr<InMemColumn>> columns,
std::vector<catalog::Property> properties)
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, table, pkColumnID,
std::move(columns), std::move(properties)} {}

inline std::unique_ptr<NodeCopier> clone() const override {
return std::make_unique<CSVNodeCopier>(
this->sharedState, this->pkIndex, this->copyDesc, this->columns, this->pkColumnID);
sharedState, pkIndex, copyDesc, table, pkColumnID, columns, properties);
}

protected:
Expand All @@ -165,14 +189,24 @@ class CSVNodeCopier : public NodeCopier {

class ParquetNodeCopier : public NodeCopier {
public:
ParquetNodeCopier(const std::string& directory,
std::shared_ptr<NodeCopySharedState> sharedState, PrimaryKeyIndexBuilder* pkIndex,
const common::CopyDescription& copyDesc, catalog::TableSchema* schema, NodeTable* table,
common::column_id_t pkColumnID)
: NodeCopier{
directory, std::move(sharedState), pkIndex, copyDesc, schema, table, pkColumnID} {}

// Clone.
ParquetNodeCopier(std::shared_ptr<NodeCopySharedState> sharedState,
PrimaryKeyIndexBuilder* pkIndex, const common::CopyDescription& copyDesc,
std::vector<InMemNodeColumn*> columns, common::column_id_t pkColumnID)
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, std::move(columns), pkColumnID} {}
PrimaryKeyIndexBuilder* pkIndex, const common::CopyDescription& copyDesc, NodeTable* table,
common::column_id_t pkColumnID, std::vector<std::shared_ptr<InMemColumn>> columns,
std::vector<catalog::Property> properties)
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, table, pkColumnID,
std::move(columns), std::move(properties)} {}

inline std::unique_ptr<NodeCopier> clone() const override {
return std::make_unique<ParquetNodeCopier>(
this->sharedState, this->pkIndex, this->copyDesc, this->columns, this->pkColumnID);
sharedState, pkIndex, copyDesc, table, pkColumnID, columns, properties);
}

protected:
Expand All @@ -185,20 +219,33 @@ class ParquetNodeCopier : public NodeCopier {

class NPYNodeCopier : public NodeCopier {
public:
NPYNodeCopier(const std::string& directory, std::shared_ptr<NodeCopySharedState> sharedState,
PrimaryKeyIndexBuilder* pkIndex, const common::CopyDescription& copyDesc,
catalog::TableSchema* schema, NodeTable* table, common::column_id_t pkColumnID,
common::column_id_t columnToCopy)
: NodeCopier{directory, std::move(sharedState), pkIndex, copyDesc, schema, table,
pkColumnID},
columnToCopy{columnToCopy} {}

// Clone.
NPYNodeCopier(std::shared_ptr<NodeCopySharedState> sharedState, PrimaryKeyIndexBuilder* pkIndex,
const common::CopyDescription& copyDesc, std::vector<InMemNodeColumn*> columns,
common::column_id_t pkColumnID)
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, std::move(columns), pkColumnID} {}
const common::CopyDescription& copyDesc, NodeTable* table, common::column_id_t pkColumnID,
common::column_id_t columnToCopy, std::vector<std::shared_ptr<InMemColumn>> columns,
std::vector<catalog::Property> properties)
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, table, pkColumnID,
std::move(columns), std::move(properties)},
columnToCopy{columnToCopy} {}

inline std::unique_ptr<NodeCopier> clone() const override {
return std::make_unique<NPYNodeCopier>(
this->sharedState, this->pkIndex, this->copyDesc, this->columns, this->pkColumnID);
sharedState, pkIndex, copyDesc, table, pkColumnID, columnToCopy, columns, properties);
}

protected:
void executeInternal(std::unique_ptr<NodeCopyMorsel> morsel) override;

private:
common::column_id_t columnToCopy;
std::unique_ptr<NpyReader> reader;
};

Expand Down
23 changes: 14 additions & 9 deletions src/include/storage/copier/node_copy_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

#include "common/string_utils.h"
#include "storage/copier/node_copier.h"
#include "storage/in_mem_storage_structure/in_mem_node_column.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/store/node_table.h"
#include "storage/store/nodes_statistics_and_deleted_ids.h"

namespace kuzu {
Expand All @@ -12,24 +13,28 @@ class NodeCopyExecutor : public TableCopyExecutor {

public:
NodeCopyExecutor(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog, common::table_id_t tableID,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog, storage::NodeTable* table,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs)
: TableCopyExecutor{copyDescription, std::move(outputDirectory), taskScheduler, catalog,
tableID, nodesStatisticsAndDeletedIDs} {}
table->getTableID(), nodesStatisticsAndDeletedIDs},
table{table} {}

protected:
void initializeColumnsAndLists() override;
void initializeColumnsAndLists() override {
// DO NOTHING
}

void populateColumnsAndLists(processor::ExecutionContext* executionContext) override;

// TODO(Guodong): do we need this? should go to finalize.
void saveToFile() override;

std::unordered_map<common::property_id_t, common::column_id_t> propertyIDToColumnIDMap;
std::vector<std::unique_ptr<InMemNodeColumn>> columns;
void saveToFile() override {
// DO NOTHING
}

private:
void populateColumns(processor::ExecutionContext* executionContext);

private:
storage::NodeTable* table;
};

} // namespace storage
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/copier/npy_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class NpyReader {

size_t getNumElementsPerRow() const;

void* getPointerToRow(size_t row) const;
uint8_t* getPointerToRow(size_t row) const;

inline std::string getFilePath() const { return filePath; }

Expand Down
17 changes: 11 additions & 6 deletions src/include/storage/copier/rel_copy_executor.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#pragma once

#include "storage/copier/table_copy_executor.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/in_mem_storage_structure/in_mem_lists.h"
#include "storage/index/hash_index.h"
#include "storage/store/nodes_store.h"
#include "storage/store/rels_statistics.h"
#include "table_copy_executor.h"

namespace kuzu {
namespace storage {
Expand All @@ -17,8 +19,7 @@ class RelCopyExecutor : public TableCopyExecutor {
public:
RelCopyExecutor(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
storage::NodesStore& nodesStore, common::table_id_t tableID,
RelsStatistics* relsStatistics);
storage::NodesStore& nodesStore, storage::RelTable* table, RelsStatistics* relsStatistics);

private:
static std::string getTaskTypeName(PopulateTaskType populateTaskType);
Expand Down Expand Up @@ -107,7 +108,7 @@ class RelCopyExecutor : public TableCopyExecutor {
const std::string& filePath);

static void sortOverflowValuesOfPropertyColumnTask(const common::DataType& dataType,
common::offset_t offsetStart, common::offset_t offsetEnd, InMemColumn* propertyColumn,
common::offset_t offsetStart, common::offset_t offsetEnd, InMemColumnChunk* propertyColumn,
InMemOverflowFile* unorderedInMemOverflowFile, InMemOverflowFile* orderedInMemOverflowFile);

static void sortOverflowValuesOfPropertyListsTask(const common::DataType& dataType,
Expand All @@ -116,7 +117,7 @@ class RelCopyExecutor : public TableCopyExecutor {
InMemOverflowFile* orderedInMemOverflowFile);

static void putValueIntoColumns(uint64_t propertyIdx,
std::vector<std::unordered_map<common::property_id_t, std::unique_ptr<InMemColumn>>>&
std::vector<std::unordered_map<common::property_id_t, std::unique_ptr<InMemColumnChunk>>>&
directionTablePropertyColumns,
const std::vector<common::nodeID_t>& nodeIDs, uint8_t* val);

Expand Down Expand Up @@ -144,12 +145,16 @@ class RelCopyExecutor : public TableCopyExecutor {

private:
storage::NodesStore& nodesStore;
storage::RelTable* table;
const std::map<common::table_id_t, common::offset_t> maxNodeOffsetsPerTable;
std::unique_ptr<transaction::Transaction> dummyReadOnlyTrx;
std::map<common::table_id_t, PrimaryKeyIndex*> pkIndexes;
std::atomic<uint64_t> numRels = 0;
std::vector<std::unique_ptr<atomic_uint64_vec_t>> listSizesPerDirection{2};
std::vector<std::unique_ptr<InMemAdjColumn>> adjColumnsPerDirection{2};
std::vector<std::unique_ptr<InMemColumnChunk>> adjColumnChunksPerDirection{2};
std::vector<std::unique_ptr<InMemColumn>> adjColumnsPerDirection{2};
std::vector<std::unordered_map<common::property_id_t, std::unique_ptr<InMemColumnChunk>>>
propertyColumnChunksPerDirection{2};
std::vector<std::unordered_map<common::property_id_t, std::unique_ptr<InMemColumn>>>
propertyColumnsPerDirection{2};
std::vector<std::unique_ptr<InMemAdjLists>> adjListsPerDirection{2};
Expand Down
Loading