Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node group-based node table storage #1802

Merged
merged 1 commit into the base branch
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.6.3 LANGUAGES CXX)
project(Kuzu VERSION 0.0.6.4 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
7 changes: 3 additions & 4 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ std::unique_ptr<BoundStatement> Binder::bindCopyClause(const Statement& statemen
auto boundFilePaths = bindFilePaths(copyStatement.getFilePaths());
auto actualFileType = bindFileType(boundFilePaths);
auto expectedFileType = copyStatement.getFileType();
if (expectedFileType == common::CopyDescription::FileType::UNKNOWN &&
actualFileType == common::CopyDescription::FileType::NPY) {
if (expectedFileType == CopyDescription::FileType::UNKNOWN &&
actualFileType == CopyDescription::FileType::NPY) {
throw BinderException("Please use COPY FROM BY COLUMN statement for copying npy files.");
}
if (expectedFileType == common::CopyDescription::FileType::NPY &&
actualFileType != expectedFileType) {
if (expectedFileType == CopyDescription::FileType::NPY && actualFileType != expectedFileType) {
throw BinderException("Please use COPY FROM statement for copying csv and parquet files.");
}
if (actualFileType == CopyDescription::FileType::NPY) {
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/catalog_content.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ void CatalogContent::readFromFile(const std::string& directory, DBFileType dbFil
table_id_t tableID;
for (auto i = 0u; i < numTables; i++) {
SerDeser::deserializeValue(tableID, fileInfo.get(), offset);
tableSchemas.emplace(tableID, TableSchema::deserialize(fileInfo.get(), offset));
tableSchemas[tableID] = TableSchema::deserialize(fileInfo.get(), offset);
}
for (auto& [tableID_, tableSchema] : tableSchemas) {
tableNameToIDMap.emplace(tableSchema->tableName, tableID_);
tableNameToIDMap[tableSchema->tableName] = tableID_;
}
SerDeser::deserializeValue(nextTableID, fileInfo.get(), offset);
SerDeser::deserializeUnorderedMap(macros, fileInfo.get(), offset);
Expand Down
20 changes: 19 additions & 1 deletion src/catalog/property.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,27 @@ using namespace kuzu::common;
namespace kuzu {
namespace catalog {

// Writes this MetadataDAHInfo to `fileInfo` starting at `offset`; `offset` is
// advanced in place by each SerDeser call. The field order must mirror
// MetadataDAHInfo::deserialize exactly: dataDAHPageIdx, nullDAHPageIdx, then
// the childrenInfos vector — do not reorder these calls.
void MetadataDAHInfo::serialize(FileInfo* fileInfo, uint64_t& offset) const {
    SerDeser::serializeValue(dataDAHPageIdx, fileInfo, offset);
    SerDeser::serializeValue(nullDAHPageIdx, fileInfo, offset);
    // Presumably serializes each child in order, recursing through the tree of
    // nested column infos — confirm against SerDeser::serializeVectorOfPtrs.
    SerDeser::serializeVectorOfPtrs(childrenInfos, fileInfo, offset);
}

// Reads a MetadataDAHInfo from `fileInfo` at `offset` (advanced in place) and
// returns it as a fresh heap object. The read order must mirror
// MetadataDAHInfo::serialize exactly: dataDAHPageIdx, nullDAHPageIdx,
// childrenInfos — do not reorder these calls.
std::unique_ptr<MetadataDAHInfo> MetadataDAHInfo::deserialize(
    FileInfo* fileInfo, uint64_t& offset) {
    auto metadataDAHInfo = std::make_unique<MetadataDAHInfo>();
    SerDeser::deserializeValue(metadataDAHInfo->dataDAHPageIdx, fileInfo, offset);
    SerDeser::deserializeValue(metadataDAHInfo->nullDAHPageIdx, fileInfo, offset);
    SerDeser::deserializeVectorOfPtrs(metadataDAHInfo->childrenInfos, fileInfo, offset);
    return metadataDAHInfo;
}

// Writes the property to catalog storage at `offset` (advanced in place).
// Field order must match Property::deserialize exactly:
// name, dataType, propertyID, tableID, metadataDAHInfo.
void Property::serialize(FileInfo* fileInfo, uint64_t& offset) const {
    SerDeser::serializeValue(name, fileInfo, offset);
    // LogicalType and MetadataDAHInfo own their serialized layouts.
    dataType->serialize(fileInfo, offset);
    SerDeser::serializeValue(propertyID, fileInfo, offset);
    SerDeser::serializeValue(tableID, fileInfo, offset);
    metadataDAHInfo->serialize(fileInfo, offset);
}

std::unique_ptr<Property> Property::deserialize(FileInfo* fileInfo, uint64_t& offset) {
Expand All @@ -22,7 +38,9 @@ std::unique_ptr<Property> Property::deserialize(FileInfo* fileInfo, uint64_t& of
auto dataType = LogicalType::deserialize(fileInfo, offset);
SerDeser::deserializeValue(propertyID, fileInfo, offset);
SerDeser::deserializeValue(tableID, fileInfo, offset);
return std::make_unique<Property>(name, std::move(dataType), propertyID, tableID);
auto metadataDAHInfo = MetadataDAHInfo::deserialize(fileInfo, offset);
return std::make_unique<Property>(
name, std::move(dataType), propertyID, tableID, std::move(metadataDAHInfo));
}

std::vector<std::unique_ptr<catalog::Property>> Property::copyProperties(
Expand Down
3 changes: 2 additions & 1 deletion src/catalog/table_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "common/string_utils.h"

using namespace kuzu::common;
using namespace kuzu::storage;

namespace kuzu {
namespace catalog {
Expand Down Expand Up @@ -124,7 +125,7 @@ std::unique_ptr<TableSchema> TableSchema::deserialize(FileInfo* fileInfo, uint64
result = RelTableSchema::deserialize(fileInfo, offset);
} break;
default: {
throw common::NotImplementedException{"TableSchema::deserialize"};
throw NotImplementedException{"TableSchema::deserialize"};
}
}
result->tableName = tableName;
Expand Down
6 changes: 0 additions & 6 deletions src/common/file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,6 @@ std::unique_ptr<FileInfo> FileUtils::openFile(const std::string& path, int flags
#endif
}

void FileUtils::createFileWithSize(const std::string& path, uint64_t size) {
auto fileInfo = common::FileUtils::openFile(path, O_WRONLY | O_CREAT);
common::FileUtils::truncateFileToSize(fileInfo.get(), size);
fileInfo.reset();
}

void FileUtils::writeToFile(
FileInfo* fileInfo, uint8_t* buffer, uint64_t numBytes, uint64_t offset) {
auto fileSize = fileInfo->getFileSize();
Expand Down
9 changes: 9 additions & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "common/null_buffer.h"
#include "common/vector/auxiliary_buffer.h"
#include <arrow/array.h>

namespace kuzu {
namespace common {
Expand Down Expand Up @@ -181,6 +182,14 @@ void ArrowColumnVector::setArrowColumn(ValueVector* vector, std::shared_ptr<arro
arrowColumnBuffer->column = std::move(column);
}

// Replaces `vector`'s arrow column with a slice of itself that starts at
// `offset` and runs to the end of the column (arrow's single-argument Slice
// is zero-copy: it shares the underlying buffers).
void ArrowColumnVector::slice(ValueVector* vector, offset_t offset) {
    // The auxiliary buffer is known to be an ArrowColumnAuxiliaryBuffer for
    // vectors managed through ArrowColumnVector (see setArrowColumn).
    auto arrowColumnBuffer =
        reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
    // static_cast instead of a C-style cast; slice directly off the buffer's
    // column to avoid an extra shared_ptr copy (atomic refcount round-trip).
    auto slicedColumn = arrowColumnBuffer->column->Slice(static_cast<int64_t>(offset));
    setArrowColumn(vector, std::move(slicedColumn));
}

template void ValueVector::setValue<nodeID_t>(uint32_t pos, nodeID_t val);
template void ValueVector::setValue<bool>(uint32_t pos, bool val);
template void ValueVector::setValue<int64_t>(uint32_t pos, int64_t val);
Expand Down
46 changes: 43 additions & 3 deletions src/include/catalog/property.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,33 @@
namespace kuzu {
namespace catalog {

// DAH is the abbreviation for Disk Array Header.
// DAH is the abbreviation for Disk Array Header. Holds the page indices of the
// disk array headers storing a column's data and null metadata, plus one child
// entry per nested sub-column in `childrenInfos`.
class MetadataDAHInfo {
public:
    MetadataDAHInfo() : MetadataDAHInfo{common::INVALID_PAGE_IDX, common::INVALID_PAGE_IDX} {}
    // NOTE(review): consider marking this ctor explicit to forbid implicit
    // page_idx_t -> MetadataDAHInfo conversions; left implicit to stay
    // backward-compatible with existing callers.
    MetadataDAHInfo(common::page_idx_t dataDAHPageIdx)
        : MetadataDAHInfo{dataDAHPageIdx, common::INVALID_PAGE_IDX} {}
    MetadataDAHInfo(common::page_idx_t dataDAHPageIdx, common::page_idx_t nullDAHPageIdx)
        : dataDAHPageIdx{dataDAHPageIdx}, nullDAHPageIdx{nullDAHPageIdx} {}

    // Deep copy, including the whole childrenInfos tree. Now const-qualified
    // so const instances can be copied (the original was non-const, which
    // rejected `const MetadataDAHInfo&` callers for no reason).
    inline std::unique_ptr<MetadataDAHInfo> copy() const {
        auto result = std::make_unique<MetadataDAHInfo>(dataDAHPageIdx, nullDAHPageIdx);
        result->childrenInfos.reserve(childrenInfos.size());
        for (auto& childInfo : childrenInfos) {
            result->childrenInfos.push_back(childInfo->copy());
        }
        return result;
    }

    // Serialized layout: dataDAHPageIdx, nullDAHPageIdx, childrenInfos
    // (recursive); see property.cpp for the implementations.
    void serialize(common::FileInfo* fileInfo, uint64_t& offset) const;
    static std::unique_ptr<MetadataDAHInfo> deserialize(
        common::FileInfo* fileInfo, uint64_t& offset);

    common::page_idx_t dataDAHPageIdx = common::INVALID_PAGE_IDX;
    common::page_idx_t nullDAHPageIdx = common::INVALID_PAGE_IDX;
    std::vector<std::unique_ptr<MetadataDAHInfo>> childrenInfos;
};

class Property {
public:
static constexpr std::string_view REL_FROM_PROPERTY_NAME = "_FROM_";
Expand All @@ -16,9 +43,14 @@ class Property {
common::INVALID_TABLE_ID} {}

Property(std::string name, std::unique_ptr<common::LogicalType> dataType,
common::property_id_t propertyID, common::table_id_t tableID)
common::property_id_t propertyID, common::table_id_t tableID,
std::unique_ptr<MetadataDAHInfo> metadataDAHInfo = nullptr)
: name{std::move(name)}, dataType{std::move(dataType)},
propertyID{propertyID}, tableID{tableID} {}
propertyID{propertyID}, tableID{tableID}, metadataDAHInfo{std::move(metadataDAHInfo)} {
if (this->metadataDAHInfo == nullptr) {
this->metadataDAHInfo = std::make_unique<MetadataDAHInfo>();
}
}

inline std::string getName() const { return name; }

Expand All @@ -28,10 +60,16 @@ class Property {

inline common::table_id_t getTableID() const { return tableID; }

inline MetadataDAHInfo* getMetadataDAHInfo() const { return metadataDAHInfo.get(); }

inline void setPropertyID(common::property_id_t propertyID_) { this->propertyID = propertyID_; }

inline void setTableID(common::table_id_t tableID_) { this->tableID = tableID_; }

inline void setMetadataDAHInfo(std::unique_ptr<MetadataDAHInfo> metadataDAHInfo_) {
this->metadataDAHInfo = std::move(metadataDAHInfo_);
}

inline void rename(std::string newName) { this->name = std::move(newName); }

void serialize(common::FileInfo* fileInfo, uint64_t& offset) const;
Expand All @@ -41,14 +79,16 @@ class Property {
const std::vector<std::unique_ptr<catalog::Property>>& propertiesToCopy);

inline std::unique_ptr<Property> copy() const {
return std::make_unique<Property>(name, dataType->copy(), propertyID, tableID);
return std::make_unique<Property>(
name, dataType->copy(), propertyID, tableID, metadataDAHInfo->copy());
}

private:
std::string name;
std::unique_ptr<common::LogicalType> dataType;
common::property_id_t propertyID;
common::table_id_t tableID;
std::unique_ptr<MetadataDAHInfo> metadataDAHInfo;
};

} // namespace catalog
Expand Down
4 changes: 4 additions & 0 deletions src/include/catalog/table_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
#include "property.h"

namespace kuzu {
namespace storage {
class BMFileHandle;
}

namespace catalog {

enum class TableType : uint8_t { NODE = 0, REL = 1, INVALID = 2 };
Expand Down
15 changes: 7 additions & 8 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,18 @@ struct StorageConstants {
"nodes.statistics_and_deleted.ids.wal";
static constexpr char RELS_METADATA_FILE_NAME[] = "rels.statistics";
static constexpr char RELS_METADATA_FILE_NAME_FOR_WAL[] = "rels.statistics.wal";
static constexpr char CATALOG_FILE_NAME[] = "catalog.bin";
static constexpr char CATALOG_FILE_NAME_FOR_WAL[] = "catalog.bin.wal";
static constexpr char CATALOG_FILE_NAME[] = "catalog.kz";
static constexpr char CATALOG_FILE_NAME_FOR_WAL[] = "catalog.kz.wal";
static constexpr char DATA_FILE_NAME[] = "data.kz";
static constexpr char METADATA_FILE_NAME[] = "metadata.kz";

// The number of pages that we add at one time when we need to grow a file.
static constexpr uint64_t PAGE_GROUP_SIZE_LOG2 = 10;
static constexpr uint64_t PAGE_GROUP_SIZE = (uint64_t)1 << PAGE_GROUP_SIZE_LOG2;
static constexpr uint64_t PAGE_IDX_IN_GROUP_MASK = ((uint64_t)1 << PAGE_GROUP_SIZE_LOG2) - 1;

static constexpr uint64_t NODE_GROUP_SIZE_LOG2 = 17; // 64 * 2048 nodes per group
static constexpr uint64_t NODE_GROUP_SIZE = (uint64_t)1 << NODE_GROUP_SIZE_LOG2;
};

struct ListsMetadataConstants {
Expand All @@ -109,12 +114,6 @@ struct CopyConstants {
// Size (in bytes) of the chunks to be read in Node/Rel Copier
static constexpr uint64_t CSV_READING_BLOCK_SIZE = 1 << 23;

// Number of tasks to be assigned in a batch when reading files.
static constexpr uint64_t NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH = 200;

// Lower bound for number of incomplete tasks in copier to trigger scheduling a new batch.
static constexpr uint64_t MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE = 50;

// Number of rows per block for npy files
static constexpr uint64_t NUM_ROWS_PER_BLOCK_FOR_NPY = 2048;

Expand Down
1 change: 1 addition & 0 deletions src/include/common/data_chunk/data_chunk_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class DataChunkState {
selVector->selectedSize = size;
}
inline bool isFlat() const { return currIdx != -1; }
inline void setToUnflat() { currIdx = -1; }
inline uint64_t getNumSelectedValues() const { return isFlat() ? 1 : selVector->selectedSize; }

public:
Expand Down
1 change: 0 additions & 1 deletion src/include/common/file_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class FileUtils {
public:
static std::unique_ptr<FileInfo> openFile(const std::string& path, int flags);

static void createFileWithSize(const std::string& path, uint64_t size);
static void readFromFile(
FileInfo* fileInfo, void* buffer, uint64_t numBytes, uint64_t position);
static void writeToFile(
Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ constexpr struct_field_idx_t INVALID_STRUCT_FIELD_IDX = UINT64_MAX;
using row_idx_t = uint64_t;
constexpr row_idx_t INVALID_ROW_IDX = UINT64_MAX;
constexpr uint32_t UNDEFINED_CAST_COST = UINT32_MAX;
using node_group_idx_t = uint64_t;

// System representation for a variable-sized overflow value.
struct overflow_value_t {
Expand Down
3 changes: 3 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ class ArrowColumnVector {
}

static void setArrowColumn(ValueVector* vector, std::shared_ptr<arrow::Array> column);

// Slice the arrow column vector from the given offset to the end.
static void slice(ValueVector* vector, offset_t offset);
};

class NodeIDVector {
Expand Down
4 changes: 2 additions & 2 deletions src/include/main/storage_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace kuzu {
namespace storage {
class Column;
class NodeColumn;
}

namespace main {
Expand All @@ -26,7 +26,7 @@ class StorageDriver {

private:
void scanColumn(
storage::Column* column, common::offset_t* offsets, size_t size, uint8_t* result);
storage::NodeColumn* column, common::offset_t* offsets, size_t size, uint8_t* result);

private:
catalog::Catalog* catalog;
Expand Down
22 changes: 4 additions & 18 deletions src/include/planner/logical_plan/logical_operator/logical_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@ class LogicalCopy : public LogicalOperator {
public:
LogicalCopy(const common::CopyDescription& copyDescription, common::table_id_t tableID,
std::string tableName, binder::expression_vector dataColumnExpressions,
std::shared_ptr<binder::Expression> rowIdxExpression,
std::shared_ptr<binder::Expression> filePathExpression,
std::shared_ptr<binder::Expression> outputExpression)
: LogicalOperator{LogicalOperatorType::COPY},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)},
dataColumnExpressions{std::move(dataColumnExpressions)}, rowIdxExpression{std::move(
rowIdxExpression)},
filePathExpression{std::move(filePathExpression)}, outputExpression{
std::move(outputExpression)} {}
dataColumnExpressions{std::move(dataColumnExpressions)}, outputExpression{std::move(
outputExpression)} {}

inline std::string getExpressionsForPrinting() const override { return tableName; }

Expand All @@ -32,14 +28,6 @@ class LogicalCopy : public LogicalOperator {
return dataColumnExpressions;
}

inline std::shared_ptr<binder::Expression> getRowIdxExpression() const {
return rowIdxExpression;
}

inline std::shared_ptr<binder::Expression> getFilePathExpression() const {
return filePathExpression;
}

inline std::shared_ptr<binder::Expression> getOutputExpression() const {
return outputExpression;
}
Expand All @@ -48,8 +36,8 @@ class LogicalCopy : public LogicalOperator {
void computeFlatSchema() override;

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopy>(copyDescription, tableID, tableName, dataColumnExpressions,
rowIdxExpression, filePathExpression, outputExpression);
return make_unique<LogicalCopy>(
copyDescription, tableID, tableName, dataColumnExpressions, outputExpression);
}

private:
Expand All @@ -58,8 +46,6 @@ class LogicalCopy : public LogicalOperator {
// Used for printing only.
std::string tableName;
binder::expression_vector dataColumnExpressions;
std::shared_ptr<binder::Expression> rowIdxExpression;
std::shared_ptr<binder::Expression> filePathExpression;
std::shared_ptr<binder::Expression> outputExpression;
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "catalog/table_schema.h"
#include "logical_create_table.h"

namespace kuzu {
Expand Down
Loading
Loading