Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize copy node memory usage #1425

Merged
merged 1 commit into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/common/file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ std::unique_ptr<FileInfo> FileUtils::openFile(const std::string& path, int flags
return make_unique<FileInfo>(path, fd);
}

void FileUtils::createFileWithSize(const std::string& path, uint64_t size) {
auto fileInfo = common::FileUtils::openFile(path, O_WRONLY | O_CREAT);
common::FileUtils::truncateFileToSize(fileInfo.get(), size);
fileInfo.reset();
}

void FileUtils::writeToFile(
FileInfo* fileInfo, uint8_t* buffer, uint64_t numBytes, uint64_t offset) {
auto fileSize = getFileSize(fileInfo->fd);
Expand Down Expand Up @@ -109,10 +115,6 @@ void FileUtils::removeFileIfExists(const std::string& path) {
}
}

void FileUtils::truncateFileToEmpty(FileInfo* fileInfo) {
ftruncate(fileInfo->fd, 0);
}

std::vector<std::string> FileUtils::globFilePath(const std::string& path) {
std::vector<std::string> result;
glob_t globResult;
Expand Down
8 changes: 7 additions & 1 deletion src/include/common/file_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class FileUtils {
public:
static std::unique_ptr<FileInfo> openFile(const std::string& path, int flags);

static void createFileWithSize(const std::string& path, uint64_t size);
static void readFromFile(
FileInfo* fileInfo, void* buffer, uint64_t numBytes, uint64_t position);
static void writeToFile(
Expand All @@ -44,7 +45,12 @@ class FileUtils {

static void renameFileIfExists(const std::string& oldName, const std::string& newName);
static void removeFileIfExists(const std::string& path);
static void truncateFileToEmpty(FileInfo* fileInfo);
static inline void truncateFileToEmpty(FileInfo* fileInfo) {
truncateFileToSize(fileInfo, 0 /* size */);
}
static inline void truncateFileToSize(FileInfo* fileInfo, uint64_t size) {
ftruncate(fileInfo->fd, size);
}
static inline bool fileOrPathExists(const std::string& path) {
return std::filesystem::exists(path);
}
Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ using property_id_t = uint32_t;
constexpr property_id_t INVALID_PROPERTY_ID = UINT32_MAX;
using vector_idx_t = uint32_t;
constexpr vector_idx_t INVALID_VECTOR_IDX = UINT32_MAX;
using block_idx_t = uint64_t;

// System representation for a variable-sized overflow value.
struct overflow_value_t {
Expand Down
29 changes: 19 additions & 10 deletions src/include/storage/copier/node_copier.h
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
#pragma once

#include "copy_structures_arrow.h"
#include "storage/in_mem_storage_structure/node_in_mem_column.h"
#include "storage/index/hash_index_builder.h"
#include "storage/store/nodes_statistics_and_deleted_ids.h"
#include "table_copier.h"

namespace kuzu {
namespace storage {

class NodeCopier : public CopyStructuresArrow {
class NodeCopier : public TableCopier {

public:
NodeCopier(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog, common::table_id_t tableID,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs)
: CopyStructuresArrow{copyDescription, std::move(outputDirectory), taskScheduler, catalog,
tableID},
: TableCopier{copyDescription, std::move(outputDirectory), taskScheduler, catalog, tableID},
nodesStatisticsAndDeletedIDs{nodesStatisticsAndDeletedIDs} {}

protected:
Expand All @@ -29,10 +29,11 @@ class NodeCopier : public CopyStructuresArrow {
void saveToFile() override;

template<typename T>
static void populatePKIndex(InMemColumn* column, HashIndexBuilder<T>* pkIndex,
common::offset_t startOffset, uint64_t numValues);
static void populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::NullMask* nullMask, HashIndexBuilder<T>* pkIndex, common::offset_t startOffset,
uint64_t numValues);

std::unordered_map<common::property_id_t, std::unique_ptr<InMemColumn>> columns;
std::unordered_map<common::property_id_t, std::unique_ptr<NodeInMemColumn>> columns;

private:
template<typename T>
Expand All @@ -46,10 +47,12 @@ class NodeCopier : public CopyStructuresArrow {

template<typename T>
static void putPropsOfLineIntoColumns(
std::unordered_map<common::property_id_t, std::unique_ptr<InMemColumn>>& columns,
std::unordered_map<uint64_t, std::unique_ptr<InMemColumnChunk>>& chunks,
std::unordered_map<common::property_id_t, std::unique_ptr<NodeInMemColumn>>&
propertyColumns,
std::vector<PageByteCursor>& overflowCursors,
const std::vector<std::shared_ptr<T>>& arrow_columns, uint64_t nodeOffset,
uint64_t bufferOffset, common::CopyDescription& copyDescription);
const std::vector<std::shared_ptr<T>>& arrow_columns, common::offset_t nodeOffset,
uint64_t blockOffset, common::CopyDescription& copyDescription);

// Concurrent tasks.
// Note that primaryKeyPropertyIdx is *NOT* the property ID of the primary key property.
Expand All @@ -68,6 +71,12 @@ class NodeCopier : public CopyStructuresArrow {
common::offset_t startOffset, std::string filePath,
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename T>
static void appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::offset_t offset, HashIndexBuilder<T>* pkIndex) {
assert(false);
}

private:
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ using namespace kuzu::common;
namespace kuzu {
namespace storage {

class CopyNodeNpy : public NodeCopier {
class NpyNodeCopier : public NodeCopier {

public:
CopyNodeNpy(CopyDescription& copyDescription, std::string outputDirectory,
NpyNodeCopier(CopyDescription& copyDescription, std::string outputDirectory,
TaskScheduler& taskScheduler, catalog::Catalog& catalog, table_id_t tableID,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs)
: NodeCopier(copyDescription, outputDirectory, taskScheduler, catalog, tableID,
nodesStatisticsAndDeletedIDs){};

~CopyNodeNpy() = default;
~NpyNodeCopier() override = default;

private:
void populateColumnsAndLists() override;
Expand All @@ -42,7 +42,8 @@ class CopyNodeNpy : public NodeCopier {

static void batchPopulateColumnsTask(common::property_id_t primaryKeyPropertyIdx,
uint64_t blockIdx, offset_t startOffset, uint64_t numLinesInCurBlock,
HashIndexBuilder<int64_t>* pkIndex, CopyNodeNpy* copier, common::property_id_t propertyIdx);
HashIndexBuilder<int64_t>* pkIndex, NpyNodeCopier* copier,
common::property_id_t propertyIdx);

private:
std::unordered_map<common::property_id_t, std::unique_ptr<NpyReader>> npyReaderMap;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
#pragma once

#include "copy_structures_arrow.h"
#include "storage/index/hash_index.h"
#include "storage/store/rels_statistics.h"
#include "table_copier.h"

namespace kuzu {
namespace storage {

class CopyRelArrow : public CopyStructuresArrow {
class RelCopier : public TableCopier {
enum class PopulateTaskType : uint8_t {
populateAdjColumnsAndCountRelsInAdjListsTask = 0,
populateListsTask = 1
};

public:
CopyRelArrow(common::CopyDescription& copyDescription, std::string outputDirectory,
RelCopier(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
std::map<common::table_id_t, common::offset_t> maxNodeOffsetsPerNodeTable,
BufferManager* bufferManager, common::table_id_t tableID, RelsStatistics* relsStatistics);
Expand Down Expand Up @@ -75,13 +75,13 @@ class CopyRelArrow : public CopyStructuresArrow {
transaction::Transaction* transaction, int64_t blockOffset, int64_t& colIndex);

template<typename T>
static void putPropsOfLineIntoColumns(CopyRelArrow* copier,
static void putPropsOfLineIntoColumns(RelCopier* copier,
std::vector<PageByteCursor>& inMemOverflowFileCursors,
const std::vector<std::shared_ptr<T>>& batchColumns,
const std::vector<common::nodeID_t>& nodeIDs, int64_t blockOffset, int64_t& colIndex);

template<typename T>
static void putPropsOfLineIntoLists(CopyRelArrow* copier,
static void putPropsOfLineIntoLists(RelCopier* copier,
std::vector<PageByteCursor>& inMemOverflowFileCursors,
const std::vector<std::shared_ptr<T>>& batchColumns,
const std::vector<common::nodeID_t>& nodeIDs, const std::vector<uint64_t>& reversePos,
Expand All @@ -99,11 +99,11 @@ class CopyRelArrow : public CopyStructuresArrow {
// Concurrent tasks.
template<typename T>
static void populateAdjColumnsAndCountRelsInAdjListsTask(uint64_t blockIdx,
uint64_t blockStartRelID, CopyRelArrow* copier,
uint64_t blockStartRelID, RelCopier* copier,
const std::vector<std::shared_ptr<T>>& batchColumns, const std::string& filePath);

template<typename T>
static void populateListsTask(uint64_t blockId, uint64_t blockStartRelID, CopyRelArrow* copier,
static void populateListsTask(uint64_t blockId, uint64_t blockStartRelID, RelCopier* copier,
const std::vector<std::shared_ptr<T>>& batchColumns, const std::string& filePath);

static void sortOverflowValuesOfPropertyColumnTask(const common::DataType& dataType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
namespace kuzu {
namespace storage {

class CopyStructuresArrow {
class TableCopier {
struct FileBlockInfo {
FileBlockInfo(common::offset_t startOffset, uint64_t numBlocks,
std::vector<uint64_t> numLinesPerBlock)
Expand All @@ -34,11 +34,11 @@ class CopyStructuresArrow {
};

public:
CopyStructuresArrow(common::CopyDescription& copyDescription, std::string outputDirectory,
TableCopier(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
common::table_id_t tableID);

virtual ~CopyStructuresArrow() = default;
virtual ~TableCopier() = default;

uint64_t copy();

Expand Down Expand Up @@ -88,6 +88,14 @@ class CopyStructuresArrow {

static void throwCopyExceptionIfNotOK(const arrow::Status& status);

uint64_t getNumBlocks() const {
uint64_t numBlocks = 0;
for (auto& [_, info] : fileBlockInfos) {
numBlocks += info.numBlocks;
}
return numBlocks;
}

protected:
std::shared_ptr<spdlog::logger> logger;
common::CopyDescription& copyDescription;
Expand Down
147 changes: 147 additions & 0 deletions src/include/storage/in_mem_storage_structure/node_in_mem_column.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#pragma once

#include "common/types/types.h"
#include "storage/buffer_manager/buffer_manager.h"
#include "storage/storage_structure/in_mem_file.h"

namespace kuzu {
namespace storage {

class InMemColumnChunk {
public:
InMemColumnChunk(common::offset_t startOffset, common::offset_t endOffset,
uint16_t numBytesForElement, uint64_t numElementsInAPage)
: numBytesForElement{numBytesForElement}, numElementsInAPage{numElementsInAPage} {
startPageIdx = CursorUtils::getPageIdx(startOffset, numElementsInAPage);
endPageIdx = CursorUtils::getPageIdx(endOffset, numElementsInAPage);
auto numPages = endPageIdx - startPageIdx + 1;
pages = std::make_unique<uint8_t[]>(numPages * common::BufferPoolConstants::PAGE_4KB_SIZE);
memset(pages.get(), 0, numPages * common::BufferPoolConstants::PAGE_4KB_SIZE);
}

inline uint8_t* getPage(common::page_idx_t pageIdx) {
assert(pageIdx <= endPageIdx && pageIdx >= startPageIdx);
auto pageIdxInSet = pageIdx - startPageIdx;
return pages.get() + (pageIdxInSet * common::BufferPoolConstants::PAGE_4KB_SIZE);
}

void copyValue(common::offset_t nodeOffset, const uint8_t* val) {
auto cursor = CursorUtils::getPageElementCursor(nodeOffset, numElementsInAPage);
auto page = getPage(cursor.pageIdx);
auto elemPosInPageInBytes = cursor.elemPosInPage * numBytesForElement;
memcpy(page + elemPosInPageInBytes, val, numBytesForElement);
}

uint8_t* getValue(common::offset_t nodeOffset) {
auto cursor = CursorUtils::getPageElementCursor(nodeOffset, numElementsInAPage);
auto elemPosInPageInBytes = cursor.elemPosInPage * numBytesForElement;
return getPage(cursor.pageIdx) + elemPosInPageInBytes;
}

private:
uint16_t numBytesForElement;
uint64_t numElementsInAPage;
common::page_idx_t startPageIdx;
common::page_idx_t endPageIdx;
std::unique_ptr<uint8_t[]> pages;
};

class NodeInMemColumn {

public:
NodeInMemColumn(std::string filePath, common::DataType dataType, uint16_t numBytesForElement,
uint64_t numElements, uint64_t numBlocks);
virtual ~NodeInMemColumn() = default;

// Encode and flush null bits.
virtual inline void saveToFile() { flushNullBits(); }

inline common::DataType getDataType() { return dataType; }

// Flush pages which holds nodeOffsets in the range [startOffset, endOffset] (inclusive).
void flushChunk(
InMemColumnChunk* chunk, common::offset_t startOffset, common::offset_t endOffset);

void setElementInChunk(InMemColumnChunk* chunk, common::offset_t offset, const uint8_t* val);

inline bool isNullAtNodeOffset(common::offset_t nodeOffset) {
return nullMask->isNull(nodeOffset);
}

virtual inline InMemOverflowFile* getInMemOverflowFile() { return nullptr; }

inline common::NullMask* getNullMask() { return nullMask.get(); }
inline uint16_t getNumBytesForElement() const { return numBytesForElement; }
inline uint64_t getNumElementsInAPage() const { return numElementsInAPage; }

private:
std::unique_ptr<uint64_t[]> encodeNullBits(common::page_idx_t pageIdx);
void flushNullBits();

protected:
std::string filePath;
uint16_t numBytesForElement;
uint64_t numElementsInAPage;
uint64_t nullEntriesOffset;
uint64_t numNullEntriesPerPage;
std::unique_ptr<FileHandle> fileHandle;
std::unique_ptr<common::NullMask> nullMask;
common::DataType dataType;
uint64_t numElements;
uint64_t numPages;
};

class NodeInMemColumnWithOverflow : public NodeInMemColumn {

public:
NodeInMemColumnWithOverflow(std::string filePath, common::DataType dataType,
uint16_t numBytesForElement, uint64_t numElements, uint64_t numBlocks)
: NodeInMemColumn{std::move(filePath), std::move(dataType), numBytesForElement, numElements,
numBlocks} {
assert(
this->dataType.typeID == common::STRING || this->dataType.typeID == common::VAR_LIST);
inMemOverflowFile =
make_unique<InMemOverflowFile>(StorageUtils::getOverflowFileName(this->filePath));
}

inline InMemOverflowFile* getInMemOverflowFile() override { return inMemOverflowFile.get(); }

void saveToFile() override {
NodeInMemColumn::saveToFile();
inMemOverflowFile->flush();
}

protected:
std::unique_ptr<InMemOverflowFile> inMemOverflowFile;
};

class InMemBMPageCollectionFactory {

public:
static std::unique_ptr<NodeInMemColumn> getInMemBMPageCollection(const std::string& filePath,
const common::DataType& dataType, uint64_t numElements, uint64_t numBlocks) {
switch (dataType.typeID) {
case common::INT64:
case common::INT32:
case common::INT16:
case common::DOUBLE:
case common::FLOAT:
case common::BOOL:
case common::DATE:
case common::TIMESTAMP:
case common::INTERVAL:
case common::FIXED_LIST:
return make_unique<NodeInMemColumn>(filePath, dataType,
common::Types::getDataTypeSize(dataType), numElements, numBlocks);
case common::STRING:
case common::VAR_LIST:
return make_unique<NodeInMemColumnWithOverflow>(filePath, dataType,
common::Types::getDataTypeSize(dataType), numElements, numBlocks);
default:
throw common::CopyException("Invalid type for property column creation.");
}
}
};

} // namespace storage
} // namespace kuzu
Loading