Skip to content

Commit

Permalink
Rewrite the Hash Index overflow file to support multiple copies and f…
Browse files Browse the repository at this point in the history
…ull parallelism
  • Loading branch information
benjaminwinger committed Mar 8, 2024
1 parent 38e4398 commit fa9fb62
Show file tree
Hide file tree
Showing 16 changed files with 441 additions and 514 deletions.
13 changes: 6 additions & 7 deletions src/include/storage/index/hash_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "hash_index_slot.h"
#include "storage/index/hash_index_utils.h"
#include "storage/storage_structure/disk_array.h"
#include "storage/storage_structure/disk_overflow_file.h"
#include "storage/storage_structure/overflow_file.h"
#include "transaction/transaction.h"

namespace kuzu {
Expand Down Expand Up @@ -51,9 +51,8 @@ class HashIndex final : public OnDiskHashIndex {

public:
HashIndex(const DBFileIDAndName& dbFileIDAndName,
const std::shared_ptr<BMFileHandle>& fileHandle,
const std::shared_ptr<DiskOverflowFile>& overflowFile, uint64_t indexPos,
BufferManager& bufferManager, WAL* wal);
const std::shared_ptr<BMFileHandle>& fileHandle, OverflowFileHandle* overflowFileHandle,
uint64_t indexPos, BufferManager& bufferManager, WAL* wal);

~HashIndex() override;

Expand Down Expand Up @@ -181,7 +180,7 @@ class HashIndex final : public OnDiskHashIndex {
std::unique_ptr<BaseDiskArray<HashIndexHeader>> headerArray;
std::unique_ptr<BaseDiskArray<Slot<S>>> pSlots;
std::unique_ptr<BaseDiskArray<Slot<S>>> oSlots;
std::shared_ptr<DiskOverflowFile> diskOverflowFile;
OverflowFileHandle* overflowFileHandle;
std::unique_ptr<HashIndexLocalStorage<T, LocalStorageType>> localStorage;
std::unique_ptr<HashIndexHeader> indexHeaderForReadTrx;
std::unique_ptr<HashIndexHeader> indexHeaderForWriteTrx;
Expand Down Expand Up @@ -262,12 +261,12 @@ class PrimaryKeyIndex {
void prepareCommit();
void prepareRollback();
BMFileHandle* getFileHandle() { return fileHandle.get(); }
DiskOverflowFile* getDiskOverflowFile() { return overflowFile.get(); }
OverflowFile* getOverflowFile() { return overflowFile.get(); }

private:
common::PhysicalTypeID keyDataTypeID;
std::shared_ptr<BMFileHandle> fileHandle;
std::shared_ptr<DiskOverflowFile> overflowFile;
std::unique_ptr<OverflowFile> overflowFile;
std::vector<std::unique_ptr<OnDiskHashIndex>> hashIndices;
};

Expand Down
8 changes: 4 additions & 4 deletions src/include/storage/index/hash_index_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "storage/index/hash_index_slot.h"
#include "storage/index/hash_index_utils.h"
#include "storage/storage_structure/disk_array.h"
#include "storage/storage_structure/in_mem_file.h"
#include "storage/storage_structure/overflow_file.h"

namespace kuzu {
namespace storage {
Expand Down Expand Up @@ -55,7 +55,7 @@ class HashIndexBuilder final : public InMemHashIndex {

public:
HashIndexBuilder(const std::shared_ptr<FileHandle>& handle,
std::unique_ptr<InMemFile> overflowFile, uint64_t indexPos,
OverflowFileHandle* overflowFileHandle, uint64_t indexPos,
common::PhysicalTypeID keyDataType);

public:
Expand Down Expand Up @@ -116,7 +116,7 @@ class HashIndexBuilder final : public InMemHashIndex {

private:
std::shared_ptr<FileHandle> fileHandle;
std::unique_ptr<InMemFile> inMemOverflowFile;
OverflowFileHandle* overflowFileHandle;
std::unique_ptr<InMemDiskArrayBuilder<HashIndexHeader>> headerArray;
std::unique_ptr<InMemDiskArrayBuilder<Slot<S>>> pSlots;
std::unique_ptr<InMemDiskArrayBuilder<Slot<S>>> oSlots;
Expand Down Expand Up @@ -181,7 +181,7 @@ class PrimaryKeyIndexBuilder {

private:
common::PhysicalTypeID keyDataTypeID;
std::atomic<common::page_idx_t> overflowPageCounter;
std::unique_ptr<OverflowFile> overflowFile;
std::vector<std::unique_ptr<InMemHashIndex>> hashIndexBuilders;
};

Expand Down
83 changes: 0 additions & 83 deletions src/include/storage/storage_structure/disk_overflow_file.h

This file was deleted.

55 changes: 0 additions & 55 deletions src/include/storage/storage_structure/in_mem_file.h

This file was deleted.

159 changes: 159 additions & 0 deletions src/include/storage/storage_structure/overflow_file.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
#pragma once

#include <fcntl.h>

#include <cstdint>
#include <memory>
#include <string_view>
#include <vector>

#include "common/cast.h"
#include "common/constants.h"
#include "common/types/types.h"
#include "storage/buffer_manager/bm_file_handle.h"
#include "storage/buffer_manager/buffer_manager.h"
#include "storage/file_handle.h"
#include "storage/index/hash_index_utils.h"
#include "storage/storage_structure/in_mem_page.h"
#include "storage/storage_utils.h"
#include "storage/wal/wal.h"
#include "storage/wal/wal_record.h"
#include "transaction/transaction.h"

namespace kuzu {
namespace storage {

class OverflowFile;

class OverflowFileHandle {

public:
OverflowFileHandle(OverflowFile& overflowFile, PageCursor& nextPosToWriteTo)
: nextPosToWriteTo(nextPosToWriteTo), overflowFile{overflowFile} {}
// The OverflowFile stores the handles and returns pointers to them.
// Moving the handle would invalidate those pointers
OverflowFileHandle(OverflowFileHandle&& other) = delete;

std::string readString(transaction::TransactionType trxType, const common::ku_string_t& str);

bool equals(transaction::TransactionType trxType, std::string_view keyToLookup,
const common::ku_string_t& keyInEntry) const;

common::ku_string_t writeString(std::string_view rawString);
inline common::ku_string_t writeString(const char* rawString) {
return writeString(std::string_view(rawString));
}

void prepareCommit();
inline void checkpointInMemory() { pageCache.clear(); }
inline void rollbackInMemory(PageCursor nextPosToWriteTo) {
pageCache.clear();
this->nextPosToWriteTo = nextPosToWriteTo;
}

private:
uint8_t* addANewPage();
void setStringOverflow(
const char* inMemSrcStr, uint64_t len, common::ku_string_t& diskDstString);

inline void read(transaction::TransactionType trxType, common::page_idx_t pageIdx,
const std::function<void(uint8_t*)>& func) const;

private:
static const common::page_idx_t END_OF_PAGE =
common::BufferPoolConstants::PAGE_4KB_SIZE - sizeof(common::page_idx_t);
// This is the index of the last free byte to which we can write.
PageCursor& nextPosToWriteTo;
OverflowFile& overflowFile;
// Cached pages which have been written in the current transaction
std::unordered_map<common::page_idx_t, std::unique_ptr<InMemPage>> pageCache;
};

// Stores the current state of the overflow file
// The number of pages in use are stored here so that we can write new pages directly
// to the overflow file, and in the case of an interruption and rollback the header will
// still record the correct place in the file to allocate new pages
struct StringOverflowFileHeader {
common::page_idx_t pages;
PageCursor cursors[NUM_HASH_INDEXES];

StringOverflowFileHeader() : pages{1} {
std::fill(cursors, cursors + NUM_HASH_INDEXES, PageCursor());
}
};

class OverflowFile {
public:
// For reading an existing overflow file
OverflowFile(const DBFileIDAndName& dbFileIdAndName, BufferManager* bufferManager, WAL* wal,
bool readOnly, common::VirtualFileSystem* vfs);

// For creating an overflow file from scratch
OverflowFile(const std::string& fName, common::VirtualFileSystem* vfs)
: numPagesOnDisk{0}, fileHandle{std::make_unique<FileHandle>(
fName, FileHandle::O_PERSISTENT_FILE_CREATE_NOT_EXISTS, vfs)},
bufferManager{nullptr}, wal{nullptr}, pageCounter{0}, headerChanged{true} {
// Reserve a page for the header
getNewPageIdx();
}

// Handles contain a reference to the overflow file
OverflowFile(OverflowFile&& other) = delete;

void rollbackInMemory();
void prepareCommit();
void checkpointInMemory();

static inline DBFileIDAndName constructDBFileIDAndName(
const DBFileIDAndName& dbFileIdAndNameForMainDBFile) {
DBFileIDAndName copy = dbFileIdAndNameForMainDBFile;
copy.dbFileID.isOverflow = true;
copy.fName = StorageUtils::getOverflowFileName(dbFileIdAndNameForMainDBFile.fName);
return copy;
}

inline OverflowFileHandle* addHandle() {
KU_ASSERT(handles.size() < NUM_HASH_INDEXES);
handles.emplace_back(
std::make_unique<OverflowFileHandle>(*this, header.cursors[handles.size()]));
return handles.back().get();
}

inline common::page_idx_t getNewPageIdx() {
// If this isn't the first call reserving the page header, then the header flag must be set
// prior to this
KU_ASSERT(pageCounter == 0 || headerChanged);
return pageCounter.fetch_add(1);
}

inline void setHeaderChanged() { headerChanged = true; }

inline BufferManager& getBM() { return *bufferManager; }

inline WAL& getWAL() { return *wal; }

inline BMFileHandle* getBMFileHandle() const {
return common::ku_dynamic_cast<FileHandle*, BMFileHandle*>(fileHandle.get());
}

inline DBFileID& getFileID() { return dbFileID; }

void readFromDisk(transaction::TransactionType trxType, common::page_idx_t pageIdx,
const std::function<void(uint8_t*)>& func) const;

// Writes new pages directly to the file and existing pages to the WAL
void writePageToDisk(common::page_idx_t pageIdx, uint8_t* data) const;

private:
std::vector<std::unique_ptr<OverflowFileHandle>> handles;
StringOverflowFileHeader header;
common::page_idx_t numPagesOnDisk;
DBFileID dbFileID;
std::unique_ptr<FileHandle> fileHandle;
BufferManager* bufferManager;
WAL* wal;
std::atomic<common::page_idx_t> pageCounter;
std::atomic<bool> headerChanged;
};
} // namespace storage
} // namespace kuzu
Loading

0 comments on commit fa9fb62

Please sign in to comment.