Skip to content

Commit

Permalink
node group-based node table storage
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Aug 2, 2023
1 parent 416d392 commit d03b06a
Show file tree
Hide file tree
Showing 120 changed files with 3,764 additions and 1,566 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.6.3 LANGUAGES CXX)
project(Kuzu VERSION 0.0.6.4 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
7 changes: 3 additions & 4 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ std::unique_ptr<BoundStatement> Binder::bindCopyClause(const Statement& statemen
auto boundFilePaths = bindFilePaths(copyStatement.getFilePaths());
auto actualFileType = bindFileType(boundFilePaths);
auto expectedFileType = copyStatement.getFileType();
if (expectedFileType == common::CopyDescription::FileType::UNKNOWN &&
actualFileType == common::CopyDescription::FileType::NPY) {
if (expectedFileType == CopyDescription::FileType::UNKNOWN &&
actualFileType == CopyDescription::FileType::NPY) {
throw BinderException("Please use COPY FROM BY COLUMN statement for copying npy files.");
}
if (expectedFileType == common::CopyDescription::FileType::NPY &&
actualFileType != expectedFileType) {
if (expectedFileType == CopyDescription::FileType::NPY && actualFileType != expectedFileType) {
throw BinderException("Please use COPY FROM statement for copying csv and parquet files.");
}
if (actualFileType == CopyDescription::FileType::NPY) {
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/catalog_content.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ void CatalogContent::readFromFile(const std::string& directory, DBFileType dbFil
table_id_t tableID;
for (auto i = 0u; i < numTables; i++) {
SerDeser::deserializeValue(tableID, fileInfo.get(), offset);
tableSchemas.emplace(tableID, TableSchema::deserialize(fileInfo.get(), offset));
tableSchemas[tableID] = TableSchema::deserialize(fileInfo.get(), offset);
}
for (auto& [tableID_, tableSchema] : tableSchemas) {
tableNameToIDMap.emplace(tableSchema->tableName, tableID_);
tableNameToIDMap[tableSchema->tableName] = tableID_;
}
SerDeser::deserializeValue(nextTableID, fileInfo.get(), offset);
SerDeser::deserializeUnorderedMap(macros, fileInfo.get(), offset);
Expand Down
20 changes: 19 additions & 1 deletion src/catalog/property.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,27 @@ using namespace kuzu::common;
namespace kuzu {
namespace catalog {

void MetadataDAHInfo::serialize(FileInfo* fileInfo, uint64_t& offset) const {
SerDeser::serializeValue(dataDAHPageIdx, fileInfo, offset);
SerDeser::serializeValue(nullDAHPageIdx, fileInfo, offset);
SerDeser::serializeVectorOfPtrs(childrenInfos, fileInfo, offset);
}

std::unique_ptr<MetadataDAHInfo> MetadataDAHInfo::deserialize(
FileInfo* fileInfo, uint64_t& offset) {
auto metadataDAHInfo = std::make_unique<MetadataDAHInfo>();
SerDeser::deserializeValue(metadataDAHInfo->dataDAHPageIdx, fileInfo, offset);
SerDeser::deserializeValue(metadataDAHInfo->nullDAHPageIdx, fileInfo, offset);
SerDeser::deserializeVectorOfPtrs(metadataDAHInfo->childrenInfos, fileInfo, offset);
return metadataDAHInfo;
}

void Property::serialize(FileInfo* fileInfo, uint64_t& offset) const {
SerDeser::serializeValue(name, fileInfo, offset);
dataType->serialize(fileInfo, offset);
SerDeser::serializeValue(propertyID, fileInfo, offset);
SerDeser::serializeValue(tableID, fileInfo, offset);
metadataDAHInfo->serialize(fileInfo, offset);
}

std::unique_ptr<Property> Property::deserialize(FileInfo* fileInfo, uint64_t& offset) {
Expand All @@ -22,7 +38,9 @@ std::unique_ptr<Property> Property::deserialize(FileInfo* fileInfo, uint64_t& of
auto dataType = LogicalType::deserialize(fileInfo, offset);
SerDeser::deserializeValue(propertyID, fileInfo, offset);
SerDeser::deserializeValue(tableID, fileInfo, offset);
return std::make_unique<Property>(name, std::move(dataType), propertyID, tableID);
auto metadataDAHInfo = MetadataDAHInfo::deserialize(fileInfo, offset);
return std::make_unique<Property>(
name, std::move(dataType), propertyID, tableID, std::move(metadataDAHInfo));
}

std::vector<std::unique_ptr<catalog::Property>> Property::copyProperties(
Expand Down
3 changes: 2 additions & 1 deletion src/catalog/table_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "common/string_utils.h"

using namespace kuzu::common;
using namespace kuzu::storage;

namespace kuzu {
namespace catalog {
Expand Down Expand Up @@ -124,7 +125,7 @@ std::unique_ptr<TableSchema> TableSchema::deserialize(FileInfo* fileInfo, uint64
result = RelTableSchema::deserialize(fileInfo, offset);
} break;
default: {
throw common::NotImplementedException{"TableSchema::deserialize"};
throw NotImplementedException{"TableSchema::deserialize"};
}
}
result->tableName = tableName;
Expand Down
6 changes: 0 additions & 6 deletions src/common/file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,6 @@ std::unique_ptr<FileInfo> FileUtils::openFile(const std::string& path, int flags
#endif
}

void FileUtils::createFileWithSize(const std::string& path, uint64_t size) {
auto fileInfo = common::FileUtils::openFile(path, O_WRONLY | O_CREAT);
common::FileUtils::truncateFileToSize(fileInfo.get(), size);
fileInfo.reset();
}

void FileUtils::writeToFile(
FileInfo* fileInfo, uint8_t* buffer, uint64_t numBytes, uint64_t offset) {
auto fileSize = fileInfo->getFileSize();
Expand Down
9 changes: 9 additions & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "common/null_buffer.h"
#include "common/vector/auxiliary_buffer.h"
#include <arrow/array.h>

namespace kuzu {
namespace common {
Expand Down Expand Up @@ -181,6 +182,14 @@ void ArrowColumnVector::setArrowColumn(ValueVector* vector, std::shared_ptr<arro
arrowColumnBuffer->column = std::move(column);
}

void ArrowColumnVector::slice(ValueVector* vector, offset_t offset) {
auto arrowColumnBuffer =
reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
auto arrowColumn = arrowColumnBuffer->column;
auto slicedColumn = arrowColumn->Slice((int64_t)offset);
setArrowColumn(vector, slicedColumn);
}

template void ValueVector::setValue<nodeID_t>(uint32_t pos, nodeID_t val);
template void ValueVector::setValue<bool>(uint32_t pos, bool val);
template void ValueVector::setValue<int64_t>(uint32_t pos, int64_t val);
Expand Down
46 changes: 43 additions & 3 deletions src/include/catalog/property.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,33 @@
namespace kuzu {
namespace catalog {

// DAH is the abbreviation for Disk Array Header.
class MetadataDAHInfo {
public:
MetadataDAHInfo() : MetadataDAHInfo{common::INVALID_PAGE_IDX, common::INVALID_PAGE_IDX} {}
MetadataDAHInfo(common::page_idx_t dataDAHPageIdx)
: MetadataDAHInfo{dataDAHPageIdx, common::INVALID_PAGE_IDX} {}
MetadataDAHInfo(common::page_idx_t dataDAHPageIdx, common::page_idx_t nullDAHPageIdx)
: dataDAHPageIdx{dataDAHPageIdx}, nullDAHPageIdx{nullDAHPageIdx} {}

inline std::unique_ptr<MetadataDAHInfo> copy() {
auto result = std::make_unique<MetadataDAHInfo>(dataDAHPageIdx, nullDAHPageIdx);
result->childrenInfos.resize(childrenInfos.size());
for (size_t i = 0; i < childrenInfos.size(); ++i) {
result->childrenInfos[i] = childrenInfos[i]->copy();
}
return result;
}

void serialize(common::FileInfo* fileInfo, uint64_t& offset) const;
static std::unique_ptr<MetadataDAHInfo> deserialize(
common::FileInfo* fileInfo, uint64_t& offset);

common::page_idx_t dataDAHPageIdx = common::INVALID_PAGE_IDX;
common::page_idx_t nullDAHPageIdx = common::INVALID_PAGE_IDX;
std::vector<std::unique_ptr<MetadataDAHInfo>> childrenInfos;
};

class Property {
public:
static constexpr std::string_view REL_FROM_PROPERTY_NAME = "_FROM_";
Expand All @@ -16,9 +43,14 @@ class Property {
common::INVALID_TABLE_ID} {}

Property(std::string name, std::unique_ptr<common::LogicalType> dataType,
common::property_id_t propertyID, common::table_id_t tableID)
common::property_id_t propertyID, common::table_id_t tableID,
std::unique_ptr<MetadataDAHInfo> metadataDAHInfo = nullptr)
: name{std::move(name)}, dataType{std::move(dataType)},
propertyID{propertyID}, tableID{tableID} {}
propertyID{propertyID}, tableID{tableID}, metadataDAHInfo{std::move(metadataDAHInfo)} {
if (this->metadataDAHInfo == nullptr) {
this->metadataDAHInfo = std::make_unique<MetadataDAHInfo>();
}
}

inline std::string getName() const { return name; }

Expand All @@ -28,10 +60,16 @@ class Property {

inline common::table_id_t getTableID() const { return tableID; }

inline MetadataDAHInfo* getMetadataDAHInfo() const { return metadataDAHInfo.get(); }

inline void setPropertyID(common::property_id_t propertyID_) { this->propertyID = propertyID_; }

inline void setTableID(common::table_id_t tableID_) { this->tableID = tableID_; }

inline void setMetadataDAHInfo(std::unique_ptr<MetadataDAHInfo> metadataDAHInfo_) {
this->metadataDAHInfo = std::move(metadataDAHInfo_);
}

inline void rename(std::string newName) { this->name = std::move(newName); }

void serialize(common::FileInfo* fileInfo, uint64_t& offset) const;
Expand All @@ -41,14 +79,16 @@ class Property {
const std::vector<std::unique_ptr<catalog::Property>>& propertiesToCopy);

inline std::unique_ptr<Property> copy() const {
return std::make_unique<Property>(name, dataType->copy(), propertyID, tableID);
return std::make_unique<Property>(
name, dataType->copy(), propertyID, tableID, metadataDAHInfo->copy());
}

private:
std::string name;
std::unique_ptr<common::LogicalType> dataType;
common::property_id_t propertyID;
common::table_id_t tableID;
std::unique_ptr<MetadataDAHInfo> metadataDAHInfo;
};

} // namespace catalog
Expand Down
4 changes: 4 additions & 0 deletions src/include/catalog/table_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
#include "property.h"

namespace kuzu {
namespace storage {
class BMFileHandle;
}

namespace catalog {

enum class TableType : uint8_t { NODE = 0, REL = 1, INVALID = 2 };
Expand Down
15 changes: 7 additions & 8 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,18 @@ struct StorageConstants {
"nodes.statistics_and_deleted.ids.wal";
static constexpr char RELS_METADATA_FILE_NAME[] = "rels.statistics";
static constexpr char RELS_METADATA_FILE_NAME_FOR_WAL[] = "rels.statistics.wal";
static constexpr char CATALOG_FILE_NAME[] = "catalog.bin";
static constexpr char CATALOG_FILE_NAME_FOR_WAL[] = "catalog.bin.wal";
static constexpr char CATALOG_FILE_NAME[] = "catalog.kz";
static constexpr char CATALOG_FILE_NAME_FOR_WAL[] = "catalog.kz.wal";
static constexpr char DATA_FILE_NAME[] = "data.kz";
static constexpr char METADATA_FILE_NAME[] = "metadata.kz";

// The number of pages that we add at one time when we need to grow a file.
static constexpr uint64_t PAGE_GROUP_SIZE_LOG2 = 10;
static constexpr uint64_t PAGE_GROUP_SIZE = (uint64_t)1 << PAGE_GROUP_SIZE_LOG2;
static constexpr uint64_t PAGE_IDX_IN_GROUP_MASK = ((uint64_t)1 << PAGE_GROUP_SIZE_LOG2) - 1;

static constexpr uint64_t NODE_GROUP_SIZE_LOG2 = 17; // 64 * 2048 nodes per group
static constexpr uint64_t NODE_GROUP_SIZE = (uint64_t)1 << NODE_GROUP_SIZE_LOG2;
};

struct ListsMetadataConstants {
Expand All @@ -109,12 +114,6 @@ struct CopyConstants {
// Size (in bytes) of the chunks to be read in Node/Rel Copier
static constexpr uint64_t CSV_READING_BLOCK_SIZE = 1 << 23;

// Number of tasks to be assigned in a batch when reading files.
static constexpr uint64_t NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH = 200;

// Lower bound for number of incomplete tasks in copier to trigger scheduling a new batch.
static constexpr uint64_t MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE = 50;

// Number of rows per block for npy files
static constexpr uint64_t NUM_ROWS_PER_BLOCK_FOR_NPY = 2048;

Expand Down
1 change: 1 addition & 0 deletions src/include/common/data_chunk/data_chunk_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class DataChunkState {
selVector->selectedSize = size;
}
inline bool isFlat() const { return currIdx != -1; }
inline void setToUnflat() { currIdx = -1; }
inline uint64_t getNumSelectedValues() const { return isFlat() ? 1 : selVector->selectedSize; }

public:
Expand Down
1 change: 0 additions & 1 deletion src/include/common/file_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class FileUtils {
public:
static std::unique_ptr<FileInfo> openFile(const std::string& path, int flags);

static void createFileWithSize(const std::string& path, uint64_t size);
static void readFromFile(
FileInfo* fileInfo, void* buffer, uint64_t numBytes, uint64_t position);
static void writeToFile(
Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ constexpr struct_field_idx_t INVALID_STRUCT_FIELD_IDX = UINT64_MAX;
using row_idx_t = uint64_t;
constexpr row_idx_t INVALID_ROW_IDX = UINT64_MAX;
constexpr uint32_t UNDEFINED_CAST_COST = UINT32_MAX;
using node_group_idx_t = uint64_t;

// System representation for a variable-sized overflow value.
struct overflow_value_t {
Expand Down
3 changes: 3 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ class ArrowColumnVector {
}

static void setArrowColumn(ValueVector* vector, std::shared_ptr<arrow::Array> column);

// Slice the arrow column vector from the given offset to the end.
static void slice(ValueVector* vector, offset_t offset);
};

class NodeIDVector {
Expand Down
4 changes: 2 additions & 2 deletions src/include/main/storage_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace kuzu {
namespace storage {
class Column;
class NodeColumn;
}

namespace main {
Expand All @@ -26,7 +26,7 @@ class StorageDriver {

private:
void scanColumn(
storage::Column* column, common::offset_t* offsets, size_t size, uint8_t* result);
storage::NodeColumn* column, common::offset_t* offsets, size_t size, uint8_t* result);

private:
catalog::Catalog* catalog;
Expand Down
22 changes: 4 additions & 18 deletions src/include/planner/logical_plan/logical_operator/logical_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@ class LogicalCopy : public LogicalOperator {
public:
LogicalCopy(const common::CopyDescription& copyDescription, common::table_id_t tableID,
std::string tableName, binder::expression_vector dataColumnExpressions,
std::shared_ptr<binder::Expression> rowIdxExpression,
std::shared_ptr<binder::Expression> filePathExpression,
std::shared_ptr<binder::Expression> outputExpression)
: LogicalOperator{LogicalOperatorType::COPY},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)},
dataColumnExpressions{std::move(dataColumnExpressions)}, rowIdxExpression{std::move(
rowIdxExpression)},
filePathExpression{std::move(filePathExpression)}, outputExpression{
std::move(outputExpression)} {}
dataColumnExpressions{std::move(dataColumnExpressions)}, outputExpression{std::move(
outputExpression)} {}

inline std::string getExpressionsForPrinting() const override { return tableName; }

Expand All @@ -32,14 +28,6 @@ class LogicalCopy : public LogicalOperator {
return dataColumnExpressions;
}

inline std::shared_ptr<binder::Expression> getRowIdxExpression() const {
return rowIdxExpression;
}

inline std::shared_ptr<binder::Expression> getFilePathExpression() const {
return filePathExpression;
}

inline std::shared_ptr<binder::Expression> getOutputExpression() const {
return outputExpression;
}
Expand All @@ -48,8 +36,8 @@ class LogicalCopy : public LogicalOperator {
void computeFlatSchema() override;

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopy>(copyDescription, tableID, tableName, dataColumnExpressions,
rowIdxExpression, filePathExpression, outputExpression);
return make_unique<LogicalCopy>(
copyDescription, tableID, tableName, dataColumnExpressions, outputExpression);
}

private:
Expand All @@ -58,8 +46,6 @@ class LogicalCopy : public LogicalOperator {
// Used for printing only.
std::string tableName;
binder::expression_vector dataColumnExpressions;
std::shared_ptr<binder::Expression> rowIdxExpression;
std::shared_ptr<binder::Expression> filePathExpression;
std::shared_ptr<binder::Expression> outputExpression;
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "catalog/table_schema.h"
#include "logical_create_table.h"

namespace kuzu {
Expand Down
Loading

0 comments on commit d03b06a

Please sign in to comment.