Skip to content

Commit

Permalink
Separate null bits in columns
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed May 18, 2023
1 parent 996b1e1 commit e7d3584
Show file tree
Hide file tree
Showing 40 changed files with 942 additions and 1,344 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.3.3 LANGUAGES CXX)
project(Kuzu VERSION 0.0.3.4 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
2 changes: 1 addition & 1 deletion src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ list_entry_t ListAuxiliaryBuffer::addList(uint64_t listSize) {
auto numBytesPerElement = dataVector->getNumBytesPerValue();
if (needResizeDataVector) {
auto buffer = std::make_unique<uint8_t[]>(capacity * numBytesPerElement);
memcpy(dataVector->valueBuffer.get(), buffer.get(), size * numBytesPerElement);
memcpy(buffer.get(), dataVector->valueBuffer.get(), size * numBytesPerElement);
dataVector->valueBuffer = std::move(buffer);
dataVector->nullMask->resize(capacity);
}
Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ using frame_group_idx_t = page_group_idx_t;
using property_id_t = uint32_t;
constexpr property_id_t INVALID_PROPERTY_ID = UINT32_MAX;
using column_id_t = property_id_t;
constexpr column_id_t INVALID_COLUMN_ID = INVALID_PROPERTY_ID;
using vector_idx_t = uint32_t;
constexpr vector_idx_t INVALID_VECTOR_IDX = UINT32_MAX;
using block_idx_t = uint64_t;
Expand Down
6 changes: 3 additions & 3 deletions src/include/processor/operator/copy/copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ namespace processor {
class Copy : public PhysicalOperator {
public:
Copy(PhysicalOperatorType operatorType, catalog::Catalog* catalog,
common::CopyDescription copyDescription, common::table_id_t tableID, storage::WAL* wal,
uint32_t id, const std::string& paramsString)
const common::CopyDescription& copyDescription, common::table_id_t tableID,
storage::WAL* wal, uint32_t id, const std::string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, catalog{catalog},
copyDescription{std::move(copyDescription)}, tableID{tableID}, wal{wal} {}
copyDescription{copyDescription}, tableID{tableID}, wal{wal} {}

inline bool isSource() const override { return true; }

Expand Down
11 changes: 6 additions & 5 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ namespace processor {

class CopyNode : public Copy {
public:
CopyNode(catalog::Catalog* catalog, common::CopyDescription copyDescription,
common::table_id_t tableID, storage::WAL* wal,
CopyNode(catalog::Catalog* catalog, const common::CopyDescription& copyDescription,
storage::NodeTable* table, storage::WAL* wal,
storage::NodesStatisticsAndDeletedIDs* nodesStatistics, storage::RelsStore& relsStore,
uint32_t id, const std::string& paramsString)
: Copy{PhysicalOperatorType::COPY_NODE, catalog, std::move(copyDescription), tableID, wal,
: Copy{PhysicalOperatorType::COPY_NODE, catalog, copyDescription, table->getTableID(), wal,
id, paramsString},
nodesStatistics{nodesStatistics}, relsStore{relsStore} {}
table{table}, nodesStatistics{nodesStatistics}, relsStore{relsStore} {}

std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyNode>(
catalog, copyDescription, tableID, wal, nodesStatistics, relsStore, id, paramsString);
catalog, copyDescription, table, wal, nodesStatistics, relsStore, id, paramsString);
}

protected:
Expand All @@ -31,6 +31,7 @@ class CopyNode : public Copy {
}

private:
storage::NodeTable* table;
storage::NodesStatisticsAndDeletedIDs* nodesStatistics;
storage::RelsStore& relsStore;
};
Expand Down
13 changes: 7 additions & 6 deletions src/include/processor/operator/copy/copy_rel.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ namespace processor {

class CopyRel : public Copy {
public:
CopyRel(catalog::Catalog* catalog, common::CopyDescription copyDescription,
common::table_id_t tableID, storage::WAL* wal, storage::RelsStatistics* relsStatistics,
CopyRel(catalog::Catalog* catalog, const common::CopyDescription& copyDescription,
storage::RelTable* table, storage::WAL* wal, storage::RelsStatistics* relsStatistics,
storage::NodesStore& nodesStore, uint32_t id, const std::string& paramsString)
: Copy{PhysicalOperatorType::COPY_REL, catalog, std::move(copyDescription), tableID, wal,
id, paramsString},
relsStatistics{relsStatistics}, nodesStore{nodesStore} {}
: Copy{PhysicalOperatorType::COPY_REL, catalog, copyDescription, table->getRelTableID(),
wal, id, paramsString},
table{table}, relsStatistics{relsStatistics}, nodesStore{nodesStore} {}

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<CopyRel>(
catalog, copyDescription, tableID, wal, relsStatistics, nodesStore, id, paramsString);
catalog, copyDescription, table, wal, relsStatistics, nodesStore, id, paramsString);
}

protected:
Expand All @@ -31,6 +31,7 @@ class CopyRel : public Copy {
}

private:
storage::RelTable* table;
storage::RelsStatistics* relsStatistics;
storage::NodesStore& nodesStore;
};
Expand Down
97 changes: 72 additions & 25 deletions src/include/storage/copier/node_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

#include "storage/copier/npy_reader.h"
#include "storage/copier/table_copy_executor.h"
#include "storage/in_mem_storage_structure/in_mem_node_column.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/index/hash_index_builder.h"
#include "storage/store/node_table.h"
#include <arrow/api.h>
#include <arrow/csv/api.h>
#include <arrow/io/api.h>
Expand Down Expand Up @@ -88,13 +89,20 @@ class CSVNodeCopySharedState : public NodeCopySharedState {

class NodeCopier {
public:
NodeCopier(const std::string& directory, std::shared_ptr<NodeCopySharedState> sharedState,
PrimaryKeyIndexBuilder* pkIndex, const common::CopyDescription& copyDesc,
catalog::TableSchema* schema, NodeTable* table, common::column_id_t pkColumnID);

// For clone.
NodeCopier(std::shared_ptr<NodeCopySharedState> sharedState, PrimaryKeyIndexBuilder* pkIndex,
const common::CopyDescription& copyDesc, std::vector<InMemNodeColumn*> columns,
common::column_id_t pkColumnID)
: sharedState{std::move(sharedState)}, pkIndex{pkIndex}, copyDesc{copyDesc},
columns{std::move(columns)}, pkColumnID{pkColumnID} {
overflowCursors.resize(this->columns.size());
const common::CopyDescription& copyDesc, NodeTable* table, common::property_id_t pkColumnID,
std::vector<std::shared_ptr<InMemColumn>> columns,
std::vector<catalog::Property> properties)
: sharedState{std::move(sharedState)}, pkIndex{pkIndex}, copyDesc{copyDesc}, table{table},
pkColumnID{pkColumnID}, columns{std::move(columns)}, properties{std::move(properties)} {
overflowCursors.resize(this->properties.size());
}

virtual ~NodeCopier() = default;

void execute(processor::ExecutionContext* executionContext);
Expand All @@ -103,10 +111,14 @@ class NodeCopier {
if (pkIndex) {
pkIndex->flush();
}
for (auto& column : columns) {
column->saveToFile();
}
}

virtual std::unique_ptr<NodeCopier> clone() const {
return std::make_unique<NodeCopier>(sharedState, pkIndex, copyDesc, columns, pkColumnID);
return std::make_unique<NodeCopier>(
sharedState, pkIndex, copyDesc, table, pkColumnID, columns, properties);
}

protected:
Expand All @@ -115,14 +127,14 @@ class NodeCopier {
}

void copyArrayIntoColumnChunk(InMemColumnChunk* columnChunk, common::column_id_t columnID,
arrow::Array& arrowArray, common::offset_t startNodeOffset,
common::CopyDescription& copyDescription);
arrow::Array& arrowArray, common::CopyDescription& copyDescription);
void populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::NullMask* nullMask, common::offset_t startOffset, uint64_t numValues);
common::offset_t startOffset, uint64_t numValues);

void flushChunksAndPopulatePKIndex(
const std::vector<std::unique_ptr<InMemColumnChunk>>& columnChunks,
common::offset_t startNodeOffset, common::offset_t endNodeOffset);
common::offset_t startNodeOffset, common::offset_t endNodeOffset,
common::column_id_t columnToFlush = common::INVALID_COLUMN_ID);

template<typename T, typename... Args>
void appendToPKIndex(
Expand All @@ -134,9 +146,12 @@ class NodeCopier {
std::shared_ptr<NodeCopySharedState> sharedState;
PrimaryKeyIndexBuilder* pkIndex;
common::CopyDescription copyDesc;
std::vector<InMemNodeColumn*> columns;
common::column_id_t pkColumnID;
NodeTable* table;
// The properties to be copied into. Each property corresponds to a column.
std::vector<catalog::Property> properties;
std::vector<std::shared_ptr<InMemColumn>> columns;
std::vector<PageByteCursor> overflowCursors;
common::column_id_t pkColumnID;
};

template<>
Expand All @@ -149,14 +164,23 @@ void NodeCopier::appendToPKIndex<common::ku_string_t, storage::InMemOverflowFile

class CSVNodeCopier : public NodeCopier {
public:
CSVNodeCopier(const std::string& directory, std::shared_ptr<NodeCopySharedState> sharedState,
PrimaryKeyIndexBuilder* pkIndex, const common::CopyDescription& copyDesc,
catalog::TableSchema* schema, NodeTable* table, common::property_id_t pkColumnID)
: NodeCopier{
directory, std::move(sharedState), pkIndex, copyDesc, schema, table, pkColumnID} {}

// For clone.
CSVNodeCopier(std::shared_ptr<NodeCopySharedState> sharedState, PrimaryKeyIndexBuilder* pkIndex,
const common::CopyDescription& copyDesc, std::vector<InMemNodeColumn*> columns,
common::column_id_t pkColumnID)
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, std::move(columns), pkColumnID} {}
const common::CopyDescription& copyDesc, NodeTable* table, common::column_id_t pkColumnID,
std::vector<std::shared_ptr<InMemColumn>> columns,
std::vector<catalog::Property> properties)
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, table, pkColumnID,
std::move(columns), std::move(properties)} {}

inline std::unique_ptr<NodeCopier> clone() const override {
return std::make_unique<CSVNodeCopier>(
this->sharedState, this->pkIndex, this->copyDesc, this->columns, this->pkColumnID);
sharedState, pkIndex, copyDesc, table, pkColumnID, columns, properties);
}

protected:
Expand All @@ -165,14 +189,24 @@ class CSVNodeCopier : public NodeCopier {

class ParquetNodeCopier : public NodeCopier {
public:
ParquetNodeCopier(const std::string& directory,
std::shared_ptr<NodeCopySharedState> sharedState, PrimaryKeyIndexBuilder* pkIndex,
const common::CopyDescription& copyDesc, catalog::TableSchema* schema, NodeTable* table,
common::column_id_t pkColumnID)
: NodeCopier{
directory, std::move(sharedState), pkIndex, copyDesc, schema, table, pkColumnID} {}

// Clone.
ParquetNodeCopier(std::shared_ptr<NodeCopySharedState> sharedState,
PrimaryKeyIndexBuilder* pkIndex, const common::CopyDescription& copyDesc,
std::vector<InMemNodeColumn*> columns, common::column_id_t pkColumnID)
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, std::move(columns), pkColumnID} {}
PrimaryKeyIndexBuilder* pkIndex, const common::CopyDescription& copyDesc, NodeTable* table,
common::column_id_t pkColumnID, std::vector<std::shared_ptr<InMemColumn>> columns,
std::vector<catalog::Property> properties)
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, table, pkColumnID,
std::move(columns), std::move(properties)} {}

inline std::unique_ptr<NodeCopier> clone() const override {
return std::make_unique<ParquetNodeCopier>(
this->sharedState, this->pkIndex, this->copyDesc, this->columns, this->pkColumnID);
sharedState, pkIndex, copyDesc, table, pkColumnID, columns, properties);
}

protected:
Expand All @@ -185,20 +219,33 @@ class ParquetNodeCopier : public NodeCopier {

class NPYNodeCopier : public NodeCopier {
public:
NPYNodeCopier(const std::string& directory, std::shared_ptr<NodeCopySharedState> sharedState,
PrimaryKeyIndexBuilder* pkIndex, const common::CopyDescription& copyDesc,
catalog::TableSchema* schema, NodeTable* table, common::column_id_t pkColumnID,
common::column_id_t columnToCopy)
: NodeCopier{directory, std::move(sharedState), pkIndex, copyDesc, schema, table,
pkColumnID},
columnToCopy{columnToCopy} {}

// Clone.
NPYNodeCopier(std::shared_ptr<NodeCopySharedState> sharedState, PrimaryKeyIndexBuilder* pkIndex,
const common::CopyDescription& copyDesc, std::vector<InMemNodeColumn*> columns,
common::column_id_t pkColumnID)
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, std::move(columns), pkColumnID} {}
const common::CopyDescription& copyDesc, NodeTable* table, common::column_id_t pkColumnID,
common::column_id_t columnToCopy, std::vector<std::shared_ptr<InMemColumn>> columns,
std::vector<catalog::Property> properties)
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, table, pkColumnID,
std::move(columns), std::move(properties)},
columnToCopy{columnToCopy} {}

inline std::unique_ptr<NodeCopier> clone() const override {
return std::make_unique<NPYNodeCopier>(
this->sharedState, this->pkIndex, this->copyDesc, this->columns, this->pkColumnID);
sharedState, pkIndex, copyDesc, table, pkColumnID, columnToCopy, columns, properties);
}

protected:
void executeInternal(std::unique_ptr<NodeCopyMorsel> morsel) override;

private:
common::column_id_t columnToCopy;
std::unique_ptr<NpyReader> reader;
};

Expand Down
23 changes: 14 additions & 9 deletions src/include/storage/copier/node_copy_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

#include "common/string_utils.h"
#include "storage/copier/node_copier.h"
#include "storage/in_mem_storage_structure/in_mem_node_column.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/store/node_table.h"
#include "storage/store/nodes_statistics_and_deleted_ids.h"

namespace kuzu {
Expand All @@ -12,24 +13,28 @@ class NodeCopyExecutor : public TableCopyExecutor {

public:
NodeCopyExecutor(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog, common::table_id_t tableID,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog, storage::NodeTable* table,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs)
: TableCopyExecutor{copyDescription, std::move(outputDirectory), taskScheduler, catalog,
tableID, nodesStatisticsAndDeletedIDs} {}
table->getTableID(), nodesStatisticsAndDeletedIDs},
table{table} {}

protected:
void initializeColumnsAndLists() override;
void initializeColumnsAndLists() override {
// DO NOTHING
}

void populateColumnsAndLists(processor::ExecutionContext* executionContext) override;

// TODO(Guodong): do we need this? should go to finalize.
void saveToFile() override;

std::unordered_map<common::property_id_t, common::column_id_t> propertyIDToColumnIDMap;
std::vector<std::unique_ptr<InMemNodeColumn>> columns;
void saveToFile() override {
// DO NOTHING
}

private:
void populateColumns(processor::ExecutionContext* executionContext);

private:
storage::NodeTable* table;
};

} // namespace storage
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/copier/npy_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class NpyReader {

size_t getNumElementsPerRow() const;

void* getPointerToRow(size_t row) const;
uint8_t* getPointerToRow(size_t row) const;

inline std::string getFilePath() const { return filePath; }

Expand Down
17 changes: 11 additions & 6 deletions src/include/storage/copier/rel_copy_executor.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#pragma once

#include "storage/copier/table_copy_executor.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/in_mem_storage_structure/in_mem_lists.h"
#include "storage/index/hash_index.h"
#include "storage/store/nodes_store.h"
#include "storage/store/rels_statistics.h"
#include "table_copy_executor.h"

namespace kuzu {
namespace storage {
Expand All @@ -17,8 +19,7 @@ class RelCopyExecutor : public TableCopyExecutor {
public:
RelCopyExecutor(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
storage::NodesStore& nodesStore, common::table_id_t tableID,
RelsStatistics* relsStatistics);
storage::NodesStore& nodesStore, storage::RelTable* table, RelsStatistics* relsStatistics);

private:
static std::string getTaskTypeName(PopulateTaskType populateTaskType);
Expand Down Expand Up @@ -107,7 +108,7 @@ class RelCopyExecutor : public TableCopyExecutor {
const std::string& filePath);

static void sortOverflowValuesOfPropertyColumnTask(const common::DataType& dataType,
common::offset_t offsetStart, common::offset_t offsetEnd, InMemColumn* propertyColumn,
common::offset_t offsetStart, common::offset_t offsetEnd, InMemColumnChunk* propertyColumn,
InMemOverflowFile* unorderedInMemOverflowFile, InMemOverflowFile* orderedInMemOverflowFile);

static void sortOverflowValuesOfPropertyListsTask(const common::DataType& dataType,
Expand All @@ -116,7 +117,7 @@ class RelCopyExecutor : public TableCopyExecutor {
InMemOverflowFile* orderedInMemOverflowFile);

static void putValueIntoColumns(uint64_t propertyIdx,
std::vector<std::unordered_map<common::property_id_t, std::unique_ptr<InMemColumn>>>&
std::vector<std::unordered_map<common::property_id_t, std::unique_ptr<InMemColumnChunk>>>&
directionTablePropertyColumns,
const std::vector<common::nodeID_t>& nodeIDs, uint8_t* val);

Expand Down Expand Up @@ -144,12 +145,16 @@ class RelCopyExecutor : public TableCopyExecutor {

private:
storage::NodesStore& nodesStore;
storage::RelTable* table;
const std::map<common::table_id_t, common::offset_t> maxNodeOffsetsPerTable;
std::unique_ptr<transaction::Transaction> dummyReadOnlyTrx;
std::map<common::table_id_t, PrimaryKeyIndex*> pkIndexes;
std::atomic<uint64_t> numRels = 0;
std::vector<std::unique_ptr<atomic_uint64_vec_t>> listSizesPerDirection{2};
std::vector<std::unique_ptr<InMemAdjColumn>> adjColumnsPerDirection{2};
std::vector<std::unique_ptr<InMemColumnChunk>> adjColumnChunksPerDirection{2};
std::vector<std::unique_ptr<InMemColumn>> adjColumnsPerDirection{2};
std::vector<std::unordered_map<common::property_id_t, std::unique_ptr<InMemColumnChunk>>>
propertyColumnChunksPerDirection{2};
std::vector<std::unordered_map<common::property_id_t, std::unique_ptr<InMemColumn>>>
propertyColumnsPerDirection{2};
std::vector<std::unique_ptr<InMemAdjLists>> adjListsPerDirection{2};
Expand Down
Loading

0 comments on commit e7d3584

Please sign in to comment.