Skip to content

Commit

Permalink
Merge pull request #1425 from kuzudb/low-mem-copy-os-allocate
Browse files Browse the repository at this point in the history
Optimize copy node memory usage
  • Loading branch information
ray6080 committed Mar 29, 2023
2 parents d0da796 + 19176d0 commit e307fbc
Show file tree
Hide file tree
Showing 20 changed files with 493 additions and 166 deletions.
10 changes: 6 additions & 4 deletions src/common/file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ std::unique_ptr<FileInfo> FileUtils::openFile(const std::string& path, int flags
return make_unique<FileInfo>(path, fd);
}

void FileUtils::createFileWithSize(const std::string& path, uint64_t size) {
auto fileInfo = common::FileUtils::openFile(path, O_WRONLY | O_CREAT);
common::FileUtils::truncateFileToSize(fileInfo.get(), size);
fileInfo.reset();
}

void FileUtils::writeToFile(
FileInfo* fileInfo, uint8_t* buffer, uint64_t numBytes, uint64_t offset) {
auto fileSize = getFileSize(fileInfo->fd);
Expand Down Expand Up @@ -109,10 +115,6 @@ void FileUtils::removeFileIfExists(const std::string& path) {
}
}

// Truncates the file referred to by `fileInfo` to a length of zero bytes.
// The return value of ftruncate is not inspected.
void FileUtils::truncateFileToEmpty(FileInfo* fileInfo) {
    const auto descriptor = fileInfo->fd;
    ftruncate(descriptor, 0);
}

std::vector<std::string> FileUtils::globFilePath(const std::string& path) {
std::vector<std::string> result;
glob_t globResult;
Expand Down
8 changes: 7 additions & 1 deletion src/include/common/file_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class FileUtils {
public:
static std::unique_ptr<FileInfo> openFile(const std::string& path, int flags);

static void createFileWithSize(const std::string& path, uint64_t size);
static void readFromFile(
FileInfo* fileInfo, void* buffer, uint64_t numBytes, uint64_t position);
static void writeToFile(
Expand All @@ -44,7 +45,12 @@ class FileUtils {

static void renameFileIfExists(const std::string& oldName, const std::string& newName);
static void removeFileIfExists(const std::string& path);
static void truncateFileToEmpty(FileInfo* fileInfo);
// Convenience wrapper around truncateFileToSize that truncates the file to length zero.
static inline void truncateFileToEmpty(FileInfo* fileInfo) {
    constexpr uint64_t emptySize = 0;
    truncateFileToSize(fileInfo, emptySize);
}
// Sets the length of the file referred to by `fileInfo` to `size` bytes using ftruncate.
// The return value of ftruncate is not inspected.
static inline void truncateFileToSize(FileInfo* fileInfo, uint64_t size) {
    const auto descriptor = fileInfo->fd;
    ftruncate(descriptor, size);
}
// Returns true when `path` refers to an existing file-system entry, as reported by
// std::filesystem::exists.
static inline bool fileOrPathExists(const std::string& path) {
    const std::filesystem::path candidate{path};
    return std::filesystem::exists(candidate);
}
Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ using property_id_t = uint32_t;
constexpr property_id_t INVALID_PROPERTY_ID = UINT32_MAX;
using vector_idx_t = uint32_t;
constexpr vector_idx_t INVALID_VECTOR_IDX = UINT32_MAX;
// Index type for blocks.
// NOTE(review): presumably identifies the file blocks processed by the table copiers — confirm.
using block_idx_t = uint64_t;

// System representation for a variable-sized overflow value.
struct overflow_value_t {
Expand Down
29 changes: 19 additions & 10 deletions src/include/storage/copier/node_copier.h
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
#pragma once

#include "copy_structures_arrow.h"
#include "storage/in_mem_storage_structure/node_in_mem_column.h"
#include "storage/index/hash_index_builder.h"
#include "storage/store/nodes_statistics_and_deleted_ids.h"
#include "table_copier.h"

namespace kuzu {
namespace storage {

class NodeCopier : public CopyStructuresArrow {
class NodeCopier : public TableCopier {

public:
NodeCopier(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog, common::table_id_t tableID,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs)
: CopyStructuresArrow{copyDescription, std::move(outputDirectory), taskScheduler, catalog,
tableID},
: TableCopier{copyDescription, std::move(outputDirectory), taskScheduler, catalog, tableID},
nodesStatisticsAndDeletedIDs{nodesStatisticsAndDeletedIDs} {}

protected:
Expand All @@ -29,10 +29,11 @@ class NodeCopier : public CopyStructuresArrow {
void saveToFile() override;

template<typename T>
static void populatePKIndex(InMemColumn* column, HashIndexBuilder<T>* pkIndex,
common::offset_t startOffset, uint64_t numValues);
static void populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::NullMask* nullMask, HashIndexBuilder<T>* pkIndex, common::offset_t startOffset,
uint64_t numValues);

std::unordered_map<common::property_id_t, std::unique_ptr<InMemColumn>> columns;
std::unordered_map<common::property_id_t, std::unique_ptr<NodeInMemColumn>> columns;

private:
template<typename T>
Expand All @@ -46,10 +47,12 @@ class NodeCopier : public CopyStructuresArrow {

template<typename T>
static void putPropsOfLineIntoColumns(
std::unordered_map<common::property_id_t, std::unique_ptr<InMemColumn>>& columns,
std::unordered_map<uint64_t, std::unique_ptr<InMemColumnChunk>>& chunks,
std::unordered_map<common::property_id_t, std::unique_ptr<NodeInMemColumn>>&
propertyColumns,
std::vector<PageByteCursor>& overflowCursors,
const std::vector<std::shared_ptr<T>>& arrow_columns, uint64_t nodeOffset,
uint64_t bufferOffset, common::CopyDescription& copyDescription);
const std::vector<std::shared_ptr<T>>& arrow_columns, common::offset_t nodeOffset,
uint64_t blockOffset, common::CopyDescription& copyDescription);

// Concurrent tasks.
// Note that primaryKeyPropertyIdx is *NOT* the property ID of the primary key property.
Expand All @@ -68,6 +71,12 @@ class NodeCopier : public CopyStructuresArrow {
common::offset_t startOffset, std::string filePath,
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

// Generic (unspecialized) version of appendPKIndex: its body only asserts, so reaching it in a
// build with assertions enabled aborts; when NDEBUG is defined the body does nothing.
// NOTE(review): presumably explicit specializations for the supported primary-key types are
// defined elsewhere and this fallback should never be called — confirm.
template<typename T>
static void appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::offset_t offset, HashIndexBuilder<T>* pkIndex) {
assert(false);
}

private:
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ using namespace kuzu::common;
namespace kuzu {
namespace storage {

class CopyNodeNpy : public NodeCopier {
class NpyNodeCopier : public NodeCopier {

public:
CopyNodeNpy(CopyDescription& copyDescription, std::string outputDirectory,
NpyNodeCopier(CopyDescription& copyDescription, std::string outputDirectory,
TaskScheduler& taskScheduler, catalog::Catalog& catalog, table_id_t tableID,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs)
: NodeCopier(copyDescription, outputDirectory, taskScheduler, catalog, tableID,
nodesStatisticsAndDeletedIDs){};

~CopyNodeNpy() = default;
~NpyNodeCopier() override = default;

private:
void populateColumnsAndLists() override;
Expand All @@ -42,7 +42,8 @@ class CopyNodeNpy : public NodeCopier {

static void batchPopulateColumnsTask(common::property_id_t primaryKeyPropertyIdx,
uint64_t blockIdx, offset_t startOffset, uint64_t numLinesInCurBlock,
HashIndexBuilder<int64_t>* pkIndex, CopyNodeNpy* copier, common::property_id_t propertyIdx);
HashIndexBuilder<int64_t>* pkIndex, NpyNodeCopier* copier,
common::property_id_t propertyIdx);

private:
std::unordered_map<common::property_id_t, std::unique_ptr<NpyReader>> npyReaderMap;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
#pragma once

#include "copy_structures_arrow.h"
#include "storage/index/hash_index.h"
#include "storage/store/rels_statistics.h"
#include "table_copier.h"

namespace kuzu {
namespace storage {

class CopyRelArrow : public CopyStructuresArrow {
class RelCopier : public TableCopier {
enum class PopulateTaskType : uint8_t {
populateAdjColumnsAndCountRelsInAdjListsTask = 0,
populateListsTask = 1
};

public:
CopyRelArrow(common::CopyDescription& copyDescription, std::string outputDirectory,
RelCopier(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
std::map<common::table_id_t, common::offset_t> maxNodeOffsetsPerNodeTable,
BufferManager* bufferManager, common::table_id_t tableID, RelsStatistics* relsStatistics);
Expand Down Expand Up @@ -75,13 +75,13 @@ class CopyRelArrow : public CopyStructuresArrow {
transaction::Transaction* transaction, int64_t blockOffset, int64_t& colIndex);

template<typename T>
static void putPropsOfLineIntoColumns(CopyRelArrow* copier,
static void putPropsOfLineIntoColumns(RelCopier* copier,
std::vector<PageByteCursor>& inMemOverflowFileCursors,
const std::vector<std::shared_ptr<T>>& batchColumns,
const std::vector<common::nodeID_t>& nodeIDs, int64_t blockOffset, int64_t& colIndex);

template<typename T>
static void putPropsOfLineIntoLists(CopyRelArrow* copier,
static void putPropsOfLineIntoLists(RelCopier* copier,
std::vector<PageByteCursor>& inMemOverflowFileCursors,
const std::vector<std::shared_ptr<T>>& batchColumns,
const std::vector<common::nodeID_t>& nodeIDs, const std::vector<uint64_t>& reversePos,
Expand All @@ -99,11 +99,11 @@ class CopyRelArrow : public CopyStructuresArrow {
// Concurrent tasks.
template<typename T>
static void populateAdjColumnsAndCountRelsInAdjListsTask(uint64_t blockIdx,
uint64_t blockStartRelID, CopyRelArrow* copier,
uint64_t blockStartRelID, RelCopier* copier,
const std::vector<std::shared_ptr<T>>& batchColumns, const std::string& filePath);

template<typename T>
static void populateListsTask(uint64_t blockId, uint64_t blockStartRelID, CopyRelArrow* copier,
static void populateListsTask(uint64_t blockId, uint64_t blockStartRelID, RelCopier* copier,
const std::vector<std::shared_ptr<T>>& batchColumns, const std::string& filePath);

static void sortOverflowValuesOfPropertyColumnTask(const common::DataType& dataType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
namespace kuzu {
namespace storage {

class CopyStructuresArrow {
class TableCopier {
struct FileBlockInfo {
FileBlockInfo(common::offset_t startOffset, uint64_t numBlocks,
std::vector<uint64_t> numLinesPerBlock)
Expand All @@ -34,11 +34,11 @@ class CopyStructuresArrow {
};

public:
CopyStructuresArrow(common::CopyDescription& copyDescription, std::string outputDirectory,
TableCopier(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
common::table_id_t tableID);

virtual ~CopyStructuresArrow() = default;
virtual ~TableCopier() = default;

uint64_t copy();

Expand Down Expand Up @@ -88,6 +88,14 @@ class CopyStructuresArrow {

static void throwCopyExceptionIfNotOK(const arrow::Status& status);

// Returns the sum of `numBlocks` over every entry stored in fileBlockInfos.
uint64_t getNumBlocks() const {
    uint64_t total = 0;
    for (const auto& entry : fileBlockInfos) {
        total += entry.second.numBlocks;
    }
    return total;
}

protected:
std::shared_ptr<spdlog::logger> logger;
common::CopyDescription& copyDescription;
Expand Down
147 changes: 147 additions & 0 deletions src/include/storage/in_mem_storage_structure/node_in_mem_column.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#pragma once

#include "common/types/types.h"
#include "storage/buffer_manager/buffer_manager.h"
#include "storage/storage_structure/in_mem_file.h"

namespace kuzu {
namespace storage {

// In-memory buffer for the pages of a column that cover the node offsets
// [startOffset, endOffset]. Pages are requested by their page index within the column; the chunk
// stores them contiguously, starting with the page that contains startOffset.
class InMemColumnChunk {
public:
    // Allocates zero-filled storage for every page from the one containing `startOffset` up to
    // and including the one containing `endOffset`.
    InMemColumnChunk(common::offset_t startOffset, common::offset_t endOffset,
        uint16_t numBytesForElement, uint64_t numElementsInAPage)
        : numBytesForElement{numBytesForElement}, numElementsInAPage{numElementsInAPage} {
        startPageIdx = CursorUtils::getPageIdx(startOffset, numElementsInAPage);
        endPageIdx = CursorUtils::getPageIdx(endOffset, numElementsInAPage);
        auto numPages = endPageIdx - startPageIdx + 1;
        // std::make_unique<T[]> value-initializes the array, so every byte already starts out as
        // zero; a separate memset would only zero the buffer a second time.
        pages = std::make_unique<uint8_t[]>(numPages * common::BufferPoolConstants::PAGE_4KB_SIZE);
    }

    // Returns a pointer to the first byte of page `pageIdx`. `pageIdx` is an index within the
    // column and must lie in [startPageIdx, endPageIdx] (checked by assert only).
    inline uint8_t* getPage(common::page_idx_t pageIdx) {
        assert(pageIdx <= endPageIdx && pageIdx >= startPageIdx);
        auto pageIdxInSet = pageIdx - startPageIdx;
        return pages.get() + (pageIdxInSet * common::BufferPoolConstants::PAGE_4KB_SIZE);
    }

    // Copies `numBytesForElement` bytes from `val` into the slot that belongs to `nodeOffset`.
    void copyValue(common::offset_t nodeOffset, const uint8_t* val) {
        auto cursor = CursorUtils::getPageElementCursor(nodeOffset, numElementsInAPage);
        auto page = getPage(cursor.pageIdx);
        auto elemPosInPageInBytes = cursor.elemPosInPage * numBytesForElement;
        memcpy(page + elemPosInPageInBytes, val, numBytesForElement);
    }

    // Returns a pointer to the slot that belongs to `nodeOffset` inside this chunk.
    uint8_t* getValue(common::offset_t nodeOffset) {
        auto cursor = CursorUtils::getPageElementCursor(nodeOffset, numElementsInAPage);
        auto elemPosInPageInBytes = cursor.elemPosInPage * numBytesForElement;
        return getPage(cursor.pageIdx) + elemPosInPageInBytes;
    }

private:
    uint16_t numBytesForElement;
    uint64_t numElementsInAPage;
    // First and last (inclusive) column page index held by this chunk.
    common::page_idx_t startPageIdx;
    common::page_idx_t endPageIdx;
    // Contiguous storage for (endPageIdx - startPageIdx + 1) pages of PAGE_4KB_SIZE bytes each.
    std::unique_ptr<uint8_t[]> pages;
};

// Describes one node property column stored at `filePath`: its data type, element size, number
// of elements per page, a file handle and a null mask. Values are written into InMemColumnChunk
// buffers through setElementInChunk, and chunks are written out with flushChunk.
// NOTE(review): the constructor and the non-inline members are defined elsewhere; the notes
// below only describe what is visible in this header.
class NodeInMemColumn {

public:
// NOTE(review): how `numBlocks` is used is not visible here — confirm against the definition.
NodeInMemColumn(std::string filePath, common::DataType dataType, uint16_t numBytesForElement,
uint64_t numElements, uint64_t numBlocks);
virtual ~NodeInMemColumn() = default;

// Encode and flush null bits.
virtual inline void saveToFile() { flushNullBits(); }

// Returns a copy of the column's data type.
inline common::DataType getDataType() { return dataType; }

// Flush pages that hold nodeOffsets in the range [startOffset, endOffset] (inclusive).
void flushChunk(
InMemColumnChunk* chunk, common::offset_t startOffset, common::offset_t endOffset);

// Stores `val` for the element at `offset` into `chunk`.
// NOTE(review): presumably also updates the null mask — confirm against the definition.
void setElementInChunk(InMemColumnChunk* chunk, common::offset_t offset, const uint8_t* val);

// Reports whether the null mask marks `nodeOffset` as null.
inline bool isNullAtNodeOffset(common::offset_t nodeOffset) {
return nullMask->isNull(nodeOffset);
}

// The base column has no overflow file and returns nullptr; subclasses may override this.
virtual inline InMemOverflowFile* getInMemOverflowFile() { return nullptr; }

// Non-owning access to the null mask and to the layout parameters of the column.
inline common::NullMask* getNullMask() { return nullMask.get(); }
inline uint16_t getNumBytesForElement() const { return numBytesForElement; }
inline uint64_t getNumElementsInAPage() const { return numElementsInAPage; }

private:
// Helpers used by saveToFile(); defined outside this header.
std::unique_ptr<uint64_t[]> encodeNullBits(common::page_idx_t pageIdx);
void flushNullBits();

protected:
std::string filePath;
uint16_t numBytesForElement;
uint64_t numElementsInAPage;
// NOTE(review): presumably the position and count of null entries within a page — confirm.
uint64_t nullEntriesOffset;
uint64_t numNullEntriesPerPage;
std::unique_ptr<FileHandle> fileHandle;
std::unique_ptr<common::NullMask> nullMask;
common::DataType dataType;
uint64_t numElements;
uint64_t numPages;
};

// Node column for STRING and VAR_LIST properties. In addition to the base column state it owns an
// in-memory overflow file whose name is derived from the column's file path.
class NodeInMemColumnWithOverflow : public NodeInMemColumn {

public:
    // Forwards all arguments to NodeInMemColumn and creates the overflow file.
    // The data type must be STRING or VAR_LIST (checked by assert only).
    NodeInMemColumnWithOverflow(std::string filePath, common::DataType dataType,
        uint16_t numBytesForElement, uint64_t numElements, uint64_t numBlocks)
        : NodeInMemColumn{std::move(filePath), std::move(dataType), numBytesForElement, numElements,
              numBlocks} {
        // The parameters were moved into the base class, so read the members from here on.
        assert(
            this->dataType.typeID == common::STRING || this->dataType.typeID == common::VAR_LIST);
        // Qualified std::make_unique: an unqualified call with explicit template arguments only
        // compiles when a `using namespace std` leaks into this header from somewhere else.
        inMemOverflowFile =
            std::make_unique<InMemOverflowFile>(StorageUtils::getOverflowFileName(this->filePath));
    }

    // Returns a non-owning pointer to the overflow file owned by this column.
    inline InMemOverflowFile* getInMemOverflowFile() override { return inMemOverflowFile.get(); }

    // Runs the base-class save (null bits) and then flushes the overflow file.
    void saveToFile() override {
        NodeInMemColumn::saveToFile();
        inMemOverflowFile->flush();
    }

protected:
    std::unique_ptr<InMemOverflowFile> inMemOverflowFile;
};

// Factory that creates the node column object matching a property's data type.
class InMemBMPageCollectionFactory {

public:
    // Returns a NodeInMemColumn for fixed-size types and a NodeInMemColumnWithOverflow for STRING
    // and VAR_LIST. The element size passed on is common::Types::getDataTypeSize(dataType).
    // Throws common::CopyException for any other type ID.
    static std::unique_ptr<NodeInMemColumn> getInMemBMPageCollection(const std::string& filePath,
        const common::DataType& dataType, uint64_t numElements, uint64_t numBlocks) {
        // std::make_unique is qualified below so that this header does not rely on a
        // `using namespace std` leaking in from another header.
        switch (dataType.typeID) {
        case common::INT64:
        case common::INT32:
        case common::INT16:
        case common::DOUBLE:
        case common::FLOAT:
        case common::BOOL:
        case common::DATE:
        case common::TIMESTAMP:
        case common::INTERVAL:
        case common::FIXED_LIST:
            return std::make_unique<NodeInMemColumn>(filePath, dataType,
                common::Types::getDataTypeSize(dataType), numElements, numBlocks);
        case common::STRING:
        case common::VAR_LIST:
            return std::make_unique<NodeInMemColumnWithOverflow>(filePath, dataType,
                common::Types::getDataTypeSize(dataType), numElements, numBlocks);
        default:
            throw common::CopyException("Invalid type for property column creation.");
        }
    }
};

} // namespace storage
} // namespace kuzu
Loading

0 comments on commit e307fbc

Please sign in to comment.