Skip to content

Commit

Permalink
node group-based node table storage
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Jul 20, 2023
1 parent 0262e82 commit f26db56
Show file tree
Hide file tree
Showing 116 changed files with 3,716 additions and 1,526 deletions.
10 changes: 5 additions & 5 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ namespace kuzu {
namespace binder {

std::unique_ptr<BoundStatement> Binder::bindCopyClause(const Statement& statement) {
auto& copyCSV = (Copy&)statement;
auto& copyStatement = (Copy&)statement;
auto catalogContent = catalog.getReadOnlyVersion();
auto tableName = copyCSV.getTableName();
auto tableName = copyStatement.getTableName();
validateTableExist(catalog, tableName);
auto tableID = catalogContent->getTableID(tableName);
auto csvReaderConfig = bindParsingOptions(copyCSV.getParsingOptions());
auto boundFilePaths = bindFilePaths(copyCSV.getFilePaths());
auto csvReaderConfig = bindParsingOptions(copyStatement.getParsingOptions());
auto boundFilePaths = bindFilePaths(copyStatement.getFilePaths());
auto actualFileType = bindFileType(boundFilePaths);
auto expectedFileType = copyCSV.getFileType();
auto expectedFileType = copyStatement.getFileType();
if (expectedFileType == common::CopyDescription::FileType::UNKNOWN &&
actualFileType == common::CopyDescription::FileType::NPY) {
throw BinderException("Please use COPY FROM BY COLUMN statement for copying npy files.");
Expand Down
72 changes: 61 additions & 11 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "catalog/catalog.h"

#include "common/string_utils.h"
#include "spdlog/spdlog.h"
#include "storage/storage_utils.h"

using namespace kuzu::common;
Expand Down Expand Up @@ -36,12 +35,34 @@ uint64_t SerDeser::deserializeValue<std::string>(
return offset + valueLength;
}

template<>
uint64_t SerDeser::serializeValue<MetaDiskArrayHeaderInfo>(
const MetaDiskArrayHeaderInfo& value, FileInfo* fileInfo, uint64_t offset) {
offset =
SerDeser::serializeValue<common::page_idx_t>(value.mainHeaderPageIdx, fileInfo, offset);
offset =
SerDeser::serializeValue<common::page_idx_t>(value.nullHeaderPageIdx, fileInfo, offset);
return SerDeser::serializeVector(value.childrenMetaDAHeaderInfos, fileInfo, offset);
}

template<>
uint64_t SerDeser::deserializeValue(
MetaDiskArrayHeaderInfo& value, FileInfo* fileInfo, uint64_t offset) {
offset =
SerDeser::deserializeValue<common::page_idx_t>(value.mainHeaderPageIdx, fileInfo, offset);
offset =
SerDeser::deserializeValue<common::page_idx_t>(value.nullHeaderPageIdx, fileInfo, offset);
return SerDeser::deserializeVector(value.childrenMetaDAHeaderInfos, fileInfo, offset);
}

template<>
uint64_t SerDeser::serializeValue<Property>(
const Property& value, FileInfo* fileInfo, uint64_t offset) {
offset = SerDeser::serializeValue<std::string>(value.name, fileInfo, offset);
offset = SerDeser::serializeValue<LogicalType>(value.dataType, fileInfo, offset);
offset = SerDeser::serializeValue<property_id_t>(value.propertyID, fileInfo, offset);
offset = SerDeser::serializeValue<MetaDiskArrayHeaderInfo>(
value.metaDiskArrayHeaderInfo, fileInfo, offset);
return SerDeser::serializeValue<table_id_t>(value.tableID, fileInfo, offset);
}

Expand All @@ -51,6 +72,8 @@ uint64_t SerDeser::deserializeValue<Property>(
offset = SerDeser::deserializeValue<std::string>(value.name, fileInfo, offset);
offset = SerDeser::deserializeValue<LogicalType>(value.dataType, fileInfo, offset);
offset = SerDeser::deserializeValue<property_id_t>(value.propertyID, fileInfo, offset);
offset = SerDeser::deserializeValue<MetaDiskArrayHeaderInfo>(
value.metaDiskArrayHeaderInfo, fileInfo, offset);
return SerDeser::deserializeValue<table_id_t>(value.tableID, fileInfo, offset);
}

Expand Down Expand Up @@ -170,15 +193,12 @@ namespace kuzu {
namespace catalog {

CatalogContent::CatalogContent() : nextTableID{0} {
logger = LoggerUtils::getLogger(LoggerConstants::LoggerEnum::CATALOG);
registerBuiltInFunctions();
}

CatalogContent::CatalogContent(const std::string& directory) {
logger = LoggerUtils::getLogger(LoggerConstants::LoggerEnum::CATALOG);
logger->info("Initializing catalog.");
CatalogContent::CatalogContent(const std::string& directory) : nextTableID{0} {
assert(FileUtils::fileOrPathExists(directory));
readFromFile(directory, DBFileType::ORIGINAL);
logger->info("Initializing catalog done.");
registerBuiltInFunctions();
}

Expand Down Expand Up @@ -232,7 +252,7 @@ table_id_t CatalogContent::addRelTableSchema(std::string tableName, RelMultiplic
return tableID;
}

const Property& CatalogContent::getNodeProperty(
Property& CatalogContent::getNodeProperty(
table_id_t tableID, const std::string& propertyName) const {
for (auto& property : nodeTableSchemas.at(tableID)->properties) {
if (propertyName == property.name) {
Expand All @@ -242,7 +262,7 @@ const Property& CatalogContent::getNodeProperty(
throw CatalogException("Cannot find node property " + propertyName + ".");
}

const Property& CatalogContent::getRelProperty(
Property& CatalogContent::getRelProperty(
table_id_t tableID, const std::string& propertyName) const {
for (auto& property : relTableSchemas.at(tableID)->properties) {
if (propertyName == property.name) {
Expand Down Expand Up @@ -295,7 +315,6 @@ void CatalogContent::saveToFile(const std::string& directory, DBFileType dbFileT

void CatalogContent::readFromFile(const std::string& directory, DBFileType dbFileType) {
auto catalogPath = StorageUtils::getCatalogFilePath(directory, dbFileType);
logger->debug("Reading from {}.", catalogPath);
auto fileInfo = FileUtils::openFile(catalogPath, O_RDONLY);
uint64_t offset = 0;
validateMagicBytes(fileInfo.get(), offset);
Expand Down Expand Up @@ -395,6 +414,10 @@ Catalog::Catalog() : wal{nullptr} {

Catalog::Catalog(WAL* wal) : wal{wal} {
catalogContentForReadOnlyTrx = std::make_unique<CatalogContent>(wal->getDirectory());
nodeGroupsMetaFH = wal->getBufferManager()->getBMFileHandle(
StorageUtils::getNodeGroupsMetaFName(wal->getDirectory()),
FileHandle::O_PERSISTENT_FILE_CREATE_NOT_EXISTS,
BMFileHandle::FileVersionedType::VERSIONED_FILE);
}

void Catalog::prepareCommitOrRollback(TransactionAction action) {
Expand All @@ -420,6 +443,9 @@ ExpressionType Catalog::getFunctionType(const std::string& name) const {
table_id_t Catalog::addNodeTableSchema(
std::string tableName, property_id_t primaryKeyId, std::vector<Property> propertyDefinitions) {
initCatalogContentForWriteTrxIfNecessary();
for (auto& property : propertyDefinitions) {
addMetaDAHeaderPageForProperty(property.dataType, property.metaDiskArrayHeaderInfo);
}
auto tableID = catalogContentForWriteTrx->addNodeTableSchema(
std::move(tableName), primaryKeyId, std::move(propertyDefinitions));
wal->logNodeTableRecord(tableID);
Expand All @@ -442,16 +468,21 @@ void Catalog::dropTableSchema(table_id_t tableID) {
wal->logDropTableRecord(tableID);
}

void Catalog::renameTable(table_id_t tableID, std::string newName) {
void Catalog::renameTable(table_id_t tableID, const std::string& newName) {
initCatalogContentForWriteTrxIfNecessary();
catalogContentForWriteTrx->renameTable(tableID, std::move(newName));
catalogContentForWriteTrx->renameTable(tableID, newName);
}

void Catalog::addProperty(
table_id_t tableID, const std::string& propertyName, LogicalType dataType) {
initCatalogContentForWriteTrxIfNecessary();
catalogContentForWriteTrx->getTableSchema(tableID)->addProperty(
propertyName, std::move(dataType));
if (catalogContentForWriteTrx->containNodeTable(tableID)) {
auto& addedNodeProperty = catalogContentForWriteTrx->getNodeProperty(tableID, propertyName);
addMetaDAHeaderPageForProperty(
addedNodeProperty.dataType, addedNodeProperty.metaDiskArrayHeaderInfo);
}
wal->logAddPropertyRecord(
tableID, catalogContentForWriteTrx->getTableSchema(tableID)->getPropertyID(propertyName));
}
Expand Down Expand Up @@ -491,5 +522,24 @@ void Catalog::addScalarMacroFunction(
catalogContentForReadOnlyTrx->addScalarMacroFunction(std::move(name), std::move(macro));
}

void Catalog::addMetaDAHeaderPageForProperty(
const common::LogicalType& dataType, MetaDiskArrayHeaderInfo& diskArrayHeaderInfo) {
diskArrayHeaderInfo.mainHeaderPageIdx = nodeGroupsMetaFH->addNewPage();
diskArrayHeaderInfo.nullHeaderPageIdx = nodeGroupsMetaFH->addNewPage();
switch (dataType.getLogicalTypeID()) {
case LogicalTypeID::STRUCT: {
auto fields = StructType::getFields(&dataType);
diskArrayHeaderInfo.childrenMetaDAHeaderInfos.resize(fields.size());
for (auto i = 0u; i < fields.size(); i++) {
addMetaDAHeaderPageForProperty(
*fields[i]->getType(), diskArrayHeaderInfo.childrenMetaDAHeaderInfos[i]);
}
} break;
default: {
// DO NOTHING.
}
}
}

} // namespace catalog
} // namespace kuzu
6 changes: 0 additions & 6 deletions src/common/file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,6 @@ std::unique_ptr<FileInfo> FileUtils::openFile(const std::string& path, int flags
#endif
}

void FileUtils::createFileWithSize(const std::string& path, uint64_t size) {
auto fileInfo = common::FileUtils::openFile(path, O_WRONLY | O_CREAT);
common::FileUtils::truncateFileToSize(fileInfo.get(), size);
fileInfo.reset();
}

void FileUtils::writeToFile(
FileInfo* fileInfo, uint8_t* buffer, uint64_t numBytes, uint64_t offset) {
auto fileSize = fileInfo->getFileSize();
Expand Down
10 changes: 10 additions & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "common/null_buffer.h"
#include "common/vector/auxiliary_buffer.h"
#include <arrow/array.h>

namespace kuzu {
namespace common {
Expand Down Expand Up @@ -179,6 +180,15 @@ void ArrowColumnVector::setArrowColumn(ValueVector* vector, std::shared_ptr<arro
arrowColumnBuffer->column = std::move(column);
}

void ArrowColumnVector::slice(
ValueVector* vectorToSlice, ValueVector* slicedVector, int64_t offset, int64_t length) {
auto arrowColumnBuffer =
reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vectorToSlice->auxiliaryBuffer.get());
auto arrowColumn = arrowColumnBuffer->column;
auto slicedColumn = arrowColumn->Slice(offset, length);
setArrowColumn(slicedVector, slicedColumn);
}

template void ValueVector::setValue<nodeID_t>(uint32_t pos, nodeID_t val);
template void ValueVector::setValue<bool>(uint32_t pos, bool val);
template void ValueVector::setValue<int64_t>(uint32_t pos, int64_t val);
Expand Down
23 changes: 8 additions & 15 deletions src/include/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,19 @@
#include "storage/wal/wal.h"
#include "transaction/transaction.h"

namespace spdlog {
class logger;
}

namespace kuzu {
namespace catalog {

class CatalogContent {
friend class Catalog;

public:
// This constructor is only used for mock catalog testing only.
CatalogContent();

explicit CatalogContent(const std::string& directory);

CatalogContent(const CatalogContent& other);

virtual ~CatalogContent() = default;

/**
* Node and Rel table functions.
*/
Expand Down Expand Up @@ -100,10 +93,8 @@ class CatalogContent {
*/
// getNodeProperty and getRelProperty should be called after checking if property exists
// (containNodeProperty and containRelProperty).
const Property& getNodeProperty(
common::table_id_t tableID, const std::string& propertyName) const;
const Property& getRelProperty(
common::table_id_t tableID, const std::string& propertyName) const;
Property& getNodeProperty(common::table_id_t tableID, const std::string& propertyName) const;
Property& getRelProperty(common::table_id_t tableID, const std::string& propertyName) const;

inline const std::vector<Property>& getNodeProperties(common::table_id_t tableID) const {
return nodeTableSchemas.at(tableID)->getProperties();
Expand Down Expand Up @@ -164,7 +155,6 @@ class CatalogContent {
void registerBuiltInFunctions();

private:
std::shared_ptr<spdlog::logger> logger;
std::unordered_map<common::table_id_t, std::unique_ptr<NodeTableSchema>> nodeTableSchemas;
std::unordered_map<common::table_id_t, std::unique_ptr<RelTableSchema>> relTableSchemas;
// These two maps are maintained as caches. They are not serialized to the catalog file, but
Expand All @@ -184,8 +174,6 @@ class Catalog {

explicit Catalog(storage::WAL* wal);

virtual ~Catalog() = default;

// TODO(Guodong): Get rid of these two functions.
inline CatalogContent* getReadOnlyVersion() const { return catalogContentForReadOnlyTrx.get(); }
inline CatalogContent* getWriteVersion() const { return catalogContentForWriteTrx.get(); }
Expand Down Expand Up @@ -227,7 +215,7 @@ class Catalog {

void dropTableSchema(common::table_id_t tableID);

void renameTable(common::table_id_t tableID, std::string newName);
void renameTable(common::table_id_t tableID, const std::string& newName);

void addProperty(
common::table_id_t tableID, const std::string& propertyName, common::LogicalType dataType);
Expand All @@ -248,14 +236,19 @@ class Catalog {
inline function::ScalarMacroFunction* getScalarMacroFunction(std::string name) const {
return catalogContentForReadOnlyTrx->macros.at(name).get();
}
inline storage::BMFileHandle* getNodeGroupsMetaFH() const { return nodeGroupsMetaFH.get(); }

private:
inline bool hasUpdates() { return catalogContentForWriteTrx != nullptr; }

void addMetaDAHeaderPageForProperty(
const common::LogicalType& dataType, MetaDiskArrayHeaderInfo& diskArrayHeaderInfo);

protected:
std::unique_ptr<CatalogContent> catalogContentForReadOnlyTrx;
std::unique_ptr<CatalogContent> catalogContentForWriteTrx;
storage::WAL* wal;
std::unique_ptr<storage::BMFileHandle> nodeGroupsMetaFH;
};

} // namespace catalog
Expand Down
7 changes: 7 additions & 0 deletions src/include/catalog/catalog_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ enum RelMultiplicity : uint8_t { MANY_MANY, MANY_ONE, ONE_MANY, ONE_ONE };
RelMultiplicity getRelMultiplicityFromString(const std::string& relMultiplicityString);
std::string getRelMultiplicityAsString(RelMultiplicity relMultiplicity);

struct MetaDiskArrayHeaderInfo {
common::page_idx_t mainHeaderPageIdx = common::INVALID_PAGE_IDX;
common::page_idx_t nullHeaderPageIdx = common::INVALID_PAGE_IDX;
std::vector<MetaDiskArrayHeaderInfo> childrenMetaDAHeaderInfos;
};

struct Property {
public:
static constexpr std::string_view REL_FROM_PROPERTY_NAME = "_FROM_";
Expand All @@ -35,6 +41,7 @@ struct Property {
common::LogicalType dataType;
common::property_id_t propertyID;
common::table_id_t tableID;
MetaDiskArrayHeaderInfo metaDiskArrayHeaderInfo;
};

struct TableSchema {
Expand Down
9 changes: 7 additions & 2 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,18 @@ struct StorageConstants {
"nodes.statistics_and_deleted.ids.wal";
static constexpr char RELS_METADATA_FILE_NAME[] = "rels.statistics";
static constexpr char RELS_METADATA_FILE_NAME_FOR_WAL[] = "rels.statistics.wal";
static constexpr char CATALOG_FILE_NAME[] = "catalog.bin";
static constexpr char CATALOG_FILE_NAME_FOR_WAL[] = "catalog.bin.wal";
static constexpr char CATALOG_FILE_NAME[] = "catalog.kz";
static constexpr char CATALOG_FILE_NAME_FOR_WAL[] = "catalog.kz.wal";
static constexpr char NODE_GROUPS_DATA_FILE_NAME[] = "data.kz";
static constexpr char NODE_GROUPS_META_FILE_NAME[] = "metadata.kz";

// The number of pages that we add at one time when we need to grow a file.
static constexpr uint64_t PAGE_GROUP_SIZE_LOG2 = 10;
static constexpr uint64_t PAGE_GROUP_SIZE = (uint64_t)1 << PAGE_GROUP_SIZE_LOG2;
static constexpr uint64_t PAGE_IDX_IN_GROUP_MASK = ((uint64_t)1 << PAGE_GROUP_SIZE_LOG2) - 1;

static constexpr uint64_t NODE_GROUP_SIZE_LOG2 = 17; // 64 * 2048 nodes per group
static constexpr uint64_t NODE_GROUP_SIZE = (uint64_t)1 << NODE_GROUP_SIZE_LOG2;
};

struct ListsMetadataConstants {
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ struct CopyDescription {

static std::string getFileTypeName(FileType fileType);

const std::vector<std::string> filePaths;
std::vector<std::string> filePaths;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;
FileType fileType;
};
Expand Down
1 change: 0 additions & 1 deletion src/include/common/file_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class FileUtils {
public:
static std::unique_ptr<FileInfo> openFile(const std::string& path, int flags);

static void createFileWithSize(const std::string& path, uint64_t size);
static void readFromFile(
FileInfo* fileInfo, void* buffer, uint64_t numBytes, uint64_t position);
static void writeToFile(
Expand Down
3 changes: 3 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ class ArrowColumnVector {
}

static void setArrowColumn(ValueVector* vector, std::shared_ptr<arrow::Array> column);

static void slice(
ValueVector* vectorToSlice, ValueVector* slicedVector, int64_t offset, int64_t length);
};

class NodeIDVector {
Expand Down
4 changes: 2 additions & 2 deletions src/include/main/storage_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace kuzu {
namespace storage {
class Column;
class NodeColumn;
}

namespace main {
Expand All @@ -26,7 +26,7 @@ class StorageDriver {

private:
void scanColumn(
storage::Column* column, common::offset_t* offsets, size_t size, uint8_t* result);
storage::NodeColumn* column, common::offset_t* offsets, size_t size, uint8_t* result);

private:
catalog::Catalog* catalog;
Expand Down
Loading

0 comments on commit f26db56

Please sign in to comment.