Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite the Hash Index overflow file to support multiple copies #3012

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions src/include/storage/index/hash_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "hash_index_slot.h"
#include "storage/index/hash_index_utils.h"
#include "storage/storage_structure/disk_array.h"
#include "storage/storage_structure/disk_overflow_file.h"
#include "storage/storage_structure/overflow_file.h"
#include "transaction/transaction.h"

namespace kuzu {
Expand Down Expand Up @@ -51,9 +51,8 @@ class HashIndex final : public OnDiskHashIndex {

public:
HashIndex(const DBFileIDAndName& dbFileIDAndName,
const std::shared_ptr<BMFileHandle>& fileHandle,
const std::shared_ptr<DiskOverflowFile>& overflowFile, uint64_t indexPos,
BufferManager& bufferManager, WAL* wal);
const std::shared_ptr<BMFileHandle>& fileHandle, OverflowFileHandle* overflowFileHandle,
uint64_t indexPos, BufferManager& bufferManager, WAL* wal);

~HashIndex() override;

Expand Down Expand Up @@ -181,7 +180,7 @@ class HashIndex final : public OnDiskHashIndex {
std::unique_ptr<BaseDiskArray<HashIndexHeader>> headerArray;
std::unique_ptr<BaseDiskArray<Slot<S>>> pSlots;
std::unique_ptr<BaseDiskArray<Slot<S>>> oSlots;
std::shared_ptr<DiskOverflowFile> diskOverflowFile;
OverflowFileHandle* overflowFileHandle;
std::unique_ptr<HashIndexLocalStorage<T, LocalStorageType>> localStorage;
std::unique_ptr<HashIndexHeader> indexHeaderForReadTrx;
std::unique_ptr<HashIndexHeader> indexHeaderForWriteTrx;
Expand Down Expand Up @@ -262,12 +261,12 @@ class PrimaryKeyIndex {
void prepareCommit();
void prepareRollback();
BMFileHandle* getFileHandle() { return fileHandle.get(); }
DiskOverflowFile* getDiskOverflowFile() { return overflowFile.get(); }
OverflowFile* getOverflowFile() { return overflowFile.get(); }

private:
common::PhysicalTypeID keyDataTypeID;
std::shared_ptr<BMFileHandle> fileHandle;
std::shared_ptr<DiskOverflowFile> overflowFile;
std::unique_ptr<OverflowFile> overflowFile;
std::vector<std::unique_ptr<OnDiskHashIndex>> hashIndices;
};

Expand Down
8 changes: 4 additions & 4 deletions src/include/storage/index/hash_index_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "storage/index/hash_index_slot.h"
#include "storage/index/hash_index_utils.h"
#include "storage/storage_structure/disk_array.h"
#include "storage/storage_structure/in_mem_file.h"
#include "storage/storage_structure/overflow_file.h"

namespace kuzu {
namespace storage {
Expand Down Expand Up @@ -55,7 +55,7 @@ class HashIndexBuilder final : public InMemHashIndex {

public:
HashIndexBuilder(const std::shared_ptr<FileHandle>& handle,
std::unique_ptr<InMemFile> overflowFile, uint64_t indexPos,
OverflowFileHandle* overflowFileHandle, uint64_t indexPos,
common::PhysicalTypeID keyDataType);

public:
Expand Down Expand Up @@ -116,7 +116,7 @@ class HashIndexBuilder final : public InMemHashIndex {

private:
std::shared_ptr<FileHandle> fileHandle;
std::unique_ptr<InMemFile> inMemOverflowFile;
OverflowFileHandle* overflowFileHandle;
std::unique_ptr<InMemDiskArrayBuilder<HashIndexHeader>> headerArray;
std::unique_ptr<InMemDiskArrayBuilder<Slot<S>>> pSlots;
std::unique_ptr<InMemDiskArrayBuilder<Slot<S>>> oSlots;
Expand Down Expand Up @@ -181,7 +181,7 @@ class PrimaryKeyIndexBuilder {

private:
common::PhysicalTypeID keyDataTypeID;
std::atomic<common::page_idx_t> overflowPageCounter;
std::unique_ptr<OverflowFile> overflowFile;
std::vector<std::unique_ptr<InMemHashIndex>> hashIndexBuilders;
};

Expand Down
83 changes: 0 additions & 83 deletions src/include/storage/storage_structure/disk_overflow_file.h

This file was deleted.

55 changes: 0 additions & 55 deletions src/include/storage/storage_structure/in_mem_file.h

This file was deleted.

149 changes: 149 additions & 0 deletions src/include/storage/storage_structure/overflow_file.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#pragma once

#include <fcntl.h>

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>

#include "common/cast.h"
#include "common/constants.h"
#include "common/types/types.h"
#include "storage/buffer_manager/bm_file_handle.h"
#include "storage/buffer_manager/buffer_manager.h"
#include "storage/file_handle.h"
#include "storage/index/hash_index_utils.h"
#include "storage/storage_structure/in_mem_page.h"
#include "storage/storage_utils.h"
#include "storage/wal/wal.h"
#include "storage/wal/wal_record.h"
#include "transaction/transaction.h"

namespace kuzu {
namespace storage {

class OverflowFile;

// A handle onto a shared OverflowFile. Each handle keeps its own write cursor (a reference to
// storage owned elsewhere; OverflowFile::addHandle passes one of the cursors stored in the file
// header) and its own cache of pages written during the current transaction.
class OverflowFileHandle {
public:
    // overflowFile: the file this handle reads from and writes to.
    // nextPosToWriteTo: cursor storage owned by the caller; it must outlive this handle because
    // only a reference to it is kept.
    OverflowFileHandle(OverflowFile& overflowFile, PageCursor& nextPosToWriteTo)
        : nextPosToWriteTo(nextPosToWriteTo), overflowFile{overflowFile} {}

    // The OverflowFile stores the handles and returns pointers to them.
    // Moving the handle would invalidate those pointers, and the reference members make
    // assignment impossible anyway, so every copy/move operation is spelled out as deleted.
    OverflowFileHandle(OverflowFileHandle&& other) = delete;
    OverflowFileHandle(const OverflowFileHandle& other) = delete;
    OverflowFileHandle& operator=(OverflowFileHandle&& other) = delete;
    OverflowFileHandle& operator=(const OverflowFileHandle& other) = delete;

    // Returns the full contents of str as a std::string.
    // NOTE(review): presumably follows str's overflow pointer for long strings; the definition
    // lives outside this header, confirm there.
    std::string readString(transaction::TransactionType trxType, const common::ku_string_t& str);

    // Compares keyToLookup against the stored key keyInEntry under the given transaction type.
    bool equals(transaction::TransactionType trxType, std::string_view keyToLookup,
        const common::ku_string_t& keyInEntry) const;

    // Stores rawString and returns the ku_string_t that refers to it.
    common::ku_string_t writeString(std::string_view rawString);
    // Convenience overload for NUL-terminated C strings; forwards to the string_view overload.
    common::ku_string_t writeString(const char* rawString) {
        return writeString(std::string_view(rawString));
    }

    void prepareCommit();
    // Drops the pages cached for the current transaction.
    void checkpointInMemory() { pageWriteCache.clear(); }
    // Drops the cached pages and resets the write cursor to nextPosToWriteTo_.
    void rollbackInMemory(PageCursor nextPosToWriteTo_) {
        pageWriteCache.clear();
        this->nextPosToWriteTo = nextPosToWriteTo_;
    }

private:
    uint8_t* addANewPage();
    void setStringOverflow(
        const char* inMemSrcStr, uint64_t len, common::ku_string_t& diskDstString);

    // Not declared inline: the definition is not in this header, and an inline function must be
    // defined in every translation unit that uses it.
    void read(transaction::TransactionType trxType, common::page_idx_t pageIdx,
        const std::function<void(uint8_t*)>& func) const;

private:
    // Byte offset of the last page_idx_t-sized slot in a 4KB page.
    // NOTE(review): presumably that slot stores the index of the next page of a string that
    // spans pages; confirm against the implementation file.
    // constexpr (implicitly inline since C++17) so that odr-using the constant, e.g. binding it
    // to a const reference, does not require a separate out-of-class definition.
    static constexpr common::page_idx_t END_OF_PAGE =
        common::BufferPoolConstants::PAGE_4KB_SIZE - sizeof(common::page_idx_t);
    // Position at which the next write will start. This is a reference to storage owned outside
    // the handle, so updates made through it are visible to that owner.
    PageCursor& nextPosToWriteTo;
    OverflowFile& overflowFile;
    // Cached pages which have been written in the current transaction
    std::unordered_map<common::page_idx_t, std::unique_ptr<InMemPage>> pageWriteCache;
};

// Stores the current state of the overflow file
// The number of pages in use are stored here so that we can write new pages directly
// to the overflow file, and in the case of an interruption and rollback the header will
// still record the correct place in the file to allocate new pages
struct StringOverflowFileHeader {
common::page_idx_t pages;
PageCursor cursors[NUM_HASH_INDEXES];

// pages starts at one to reserve space for this header
StringOverflowFileHeader() : pages{1} {
std::fill(cursors, cursors + NUM_HASH_INDEXES, PageCursor());
}
};

class OverflowFile {
friend class OverflowFileHandle;

public:
// For reading an existing overflow file
OverflowFile(const DBFileIDAndName& dbFileIdAndName, BufferManager* bufferManager, WAL* wal,
bool readOnly, common::VirtualFileSystem* vfs);

// For creating an overflow file from scratch
OverflowFile(const std::string& fName, common::VirtualFileSystem* vfs)
: numPagesOnDisk{0}, fileHandle{std::make_unique<FileHandle>(
fName, FileHandle::O_PERSISTENT_FILE_CREATE_NOT_EXISTS, vfs)},
bufferManager{nullptr}, wal{nullptr}, pageCounter{0}, headerChanged{true} {
// Reserve a page for the header
getNewPageIdx();
}

// Handles contain a reference to the overflow file
OverflowFile(OverflowFile&& other) = delete;

void rollbackInMemory();
void prepareCommit();
void checkpointInMemory();

inline OverflowFileHandle* addHandle() {
KU_ASSERT(handles.size() < NUM_HASH_INDEXES);
handles.emplace_back(
std::make_unique<OverflowFileHandle>(*this, header.cursors[handles.size()]));
return handles.back().get();
}

inline BMFileHandle* getBMFileHandle() const {
return common::ku_dynamic_cast<FileHandle*, BMFileHandle*>(fileHandle.get());
}

private:
void readFromDisk(transaction::TransactionType trxType, common::page_idx_t pageIdx,
const std::function<void(uint8_t*)>& func) const;

// Writes new pages directly to the file and existing pages to the WAL
void writePageToDisk(common::page_idx_t pageIdx, uint8_t* data) const;

inline common::page_idx_t getNewPageIdx() {
// If this isn't the first call reserving the page header, then the header flag must be set
// prior to this
KU_ASSERT(pageCounter == HEADER_PAGE_IDX || headerChanged);
return pageCounter.fetch_add(1);
}

private:
static const uint64_t HEADER_PAGE_IDX = 0;

std::vector<std::unique_ptr<OverflowFileHandle>> handles;
StringOverflowFileHeader header;
common::page_idx_t numPagesOnDisk;
DBFileID dbFileID;
std::unique_ptr<FileHandle> fileHandle;
BufferManager* bufferManager;
WAL* wal;
std::atomic<common::page_idx_t> pageCounter;
std::atomic<bool> headerChanged;
};
} // namespace storage
} // namespace kuzu
Loading
Loading