Skip to content

Commit

Permalink
storage: use parallel hash index
Browse files Browse the repository at this point in the history
The design is quite simple: every hash index is now represented
internally as 256 hash indexes. This way, when copying, we can easily
operator on multiple indexes at once without locking.
  • Loading branch information
Riolku committed Dec 29, 2023
1 parent 2476aff commit 81b9466
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 203 deletions.
116 changes: 44 additions & 72 deletions src/include/storage/index/hash_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ template<typename T>
class HashIndex : public BaseHashIndex {

public:
HashIndex(const DBFileIDAndName& dbFileIDAndName, bool readOnly,
const common::LogicalType& keyDataType, BufferManager& bufferManager, WAL* wal,
common::VirtualFileSystem* vfs);
HashIndex(const DBFileIDAndName& dbFileIDAndName,
const std::shared_ptr<BMFileHandle>& fileHandle,
const std::shared_ptr<DiskOverflowFile>& overflowFile, uint64_t indexPos,
const common::LogicalType& keyDataType, BufferManager& bufferManager, WAL* wal);

public:
bool lookupInternal(
Expand All @@ -93,7 +94,7 @@ class HashIndex : public BaseHashIndex {
void prepareCommit();
void prepareRollback();
void checkpointInMemory();
void rollbackInMemory() const;
void rollbackInMemory();
inline BMFileHandle* getFileHandle() const { return fileHandle.get(); }

private:
Expand Down Expand Up @@ -133,102 +134,73 @@ class HashIndex : public BaseHashIndex {
DBFileIDAndName dbFileIDAndName;
BufferManager& bm;
WAL* wal;
std::unique_ptr<BMFileHandle> fileHandle;
std::shared_ptr<BMFileHandle> fileHandle;
std::unique_ptr<BaseDiskArray<HashIndexHeader>> headerArray;
std::unique_ptr<BaseDiskArray<Slot<T>>> pSlots;
std::unique_ptr<BaseDiskArray<Slot<T>>> oSlots;
insert_function_t keyInsertFunc;
equals_function_t keyEqualsFunc;
std::unique_ptr<DiskOverflowFile> diskOverflowFile;
std::shared_ptr<DiskOverflowFile> diskOverflowFile;
std::unique_ptr<HashIndexLocalStorage> localStorage;
uint8_t slotCapacity;
};

class PrimaryKeyIndex {

public:
PrimaryKeyIndex(const DBFileIDAndName& dbFileIDAndName, bool readOnly,
const common::LogicalType& keyDataType, BufferManager& bufferManager, WAL* wal,
common::VirtualFileSystem* vfs)
: keyDataTypeID{keyDataType.getLogicalTypeID()} {
if (keyDataTypeID == common::LogicalTypeID::INT64) {
hashIndexForInt64 = std::make_unique<HashIndex<int64_t>>(
dbFileIDAndName, readOnly, keyDataType, bufferManager, wal, vfs);
} else {
hashIndexForString = std::make_unique<HashIndex<common::ku_string_t>>(
dbFileIDAndName, readOnly, keyDataType, bufferManager, wal, vfs);
}
}
common::VirtualFileSystem* vfs);

bool lookup(transaction::Transaction* trx, const char* key, common::offset_t& result) {
KU_ASSERT(keyDataTypeID == common::LogicalTypeID::STRING);
return hashIndexForString[getHashIndexPosition(key)]->lookupInternal(
trx, reinterpret_cast<const uint8_t*>(key), result);
}
bool lookup(transaction::Transaction* trx, int64_t key, common::offset_t& result) {
KU_ASSERT(keyDataTypeID == common::LogicalTypeID::INT64);
return hashIndexForInt64[getHashIndexPosition(key)]->lookupInternal(
trx, reinterpret_cast<const uint8_t*>(&key), result);
}
bool lookup(transaction::Transaction* trx, common::ValueVector* keyVector, uint64_t vectorPos,
common::offset_t& result);

void delete_(common::ValueVector* keyVector);

bool insert(common::ValueVector* keyVector, uint64_t vectorPos, common::offset_t value);

// These two lookups are used by InMemRelCSVCopier.
inline bool lookup(
transaction::Transaction* transaction, int64_t key, common::offset_t& result) {
KU_ASSERT(keyDataTypeID == common::LogicalTypeID::INT64);
return hashIndexForInt64->lookupInternal(
transaction, reinterpret_cast<const uint8_t*>(&key), result);
}
inline bool lookup(
transaction::Transaction* transaction, const char* key, common::offset_t& result) {
bool insert(const char* key, common::offset_t value) {
KU_ASSERT(keyDataTypeID == common::LogicalTypeID::STRING);
return hashIndexForString->lookupInternal(
transaction, reinterpret_cast<const uint8_t*>(key), result);
}

inline void checkpointInMemory() {
keyDataTypeID == common::LogicalTypeID::INT64 ? hashIndexForInt64->checkpointInMemory() :
hashIndexForString->checkpointInMemory();
return hashIndexForString[getHashIndexPosition(key)]->insertInternal(
reinterpret_cast<const uint8_t*>(key), value);
}
inline void rollbackInMemory() {
keyDataTypeID == common::LogicalTypeID::INT64 ? hashIndexForInt64->rollbackInMemory() :
hashIndexForString->rollbackInMemory();
}
inline void prepareCommit() {
keyDataTypeID == common::LogicalTypeID::INT64 ? hashIndexForInt64->prepareCommit() :
hashIndexForString->prepareCommit();
}
inline void prepareRollback() {
keyDataTypeID == common::LogicalTypeID::INT64 ? hashIndexForInt64->prepareRollback() :
hashIndexForString->prepareRollback();
}
inline BMFileHandle* getFileHandle() {
return keyDataTypeID == common::LogicalTypeID::INT64 ? hashIndexForInt64->getFileHandle() :
hashIndexForString->getFileHandle();
}
inline DiskOverflowFile* getDiskOverflowFile() {
return keyDataTypeID == common::LogicalTypeID::STRING ?
hashIndexForString->diskOverflowFile.get() :
nullptr;
}

private:
inline void deleteKey(int64_t key) {
bool insert(int64_t key, common::offset_t value) {
KU_ASSERT(keyDataTypeID == common::LogicalTypeID::INT64);
hashIndexForInt64->deleteInternal(reinterpret_cast<const uint8_t*>(&key));
return hashIndexForInt64[getHashIndexPosition(key)]->insertInternal(
reinterpret_cast<const uint8_t*>(&key), value);
}
inline void deleteKey(const char* key) {
bool insert(common::ValueVector* keyVector, uint64_t vectorPos, common::offset_t value);

void delete_(const char* key) {
KU_ASSERT(keyDataTypeID == common::LogicalTypeID::STRING);
hashIndexForString->deleteInternal(reinterpret_cast<const uint8_t*>(key));
return hashIndexForString[getHashIndexPosition(key)]->deleteInternal(
reinterpret_cast<const uint8_t*>(key));
}
inline bool insert(int64_t key, common::offset_t value) {
void delete_(int64_t key) {
KU_ASSERT(keyDataTypeID == common::LogicalTypeID::INT64);
return hashIndexForInt64->insertInternal(reinterpret_cast<const uint8_t*>(&key), value);
}
inline bool insert(const char* key, common::offset_t value) {
KU_ASSERT(keyDataTypeID == common::LogicalTypeID::STRING);
return hashIndexForString->insertInternal(reinterpret_cast<const uint8_t*>(key), value);
return hashIndexForInt64[getHashIndexPosition(key)]->deleteInternal(
reinterpret_cast<const uint8_t*>(&key));
}
void delete_(common::ValueVector* keyVector);

void checkpointInMemory();
void rollbackInMemory();
void prepareCommit();
void prepareRollback();
BMFileHandle* getFileHandle() { return fileHandle.get(); }
DiskOverflowFile* getDiskOverflowFile() { return overflowFile.get(); }

private:
common::LogicalTypeID keyDataTypeID;
std::unique_ptr<HashIndex<int64_t>> hashIndexForInt64;
std::unique_ptr<HashIndex<common::ku_string_t>> hashIndexForString;
std::shared_ptr<BMFileHandle> fileHandle;
std::shared_ptr<DiskOverflowFile> overflowFile;
std::vector<std::unique_ptr<HashIndex<int64_t>>> hashIndexForInt64;
std::vector<std::unique_ptr<HashIndex<common::ku_string_t>>> hashIndexForString;
};

} // namespace storage
Expand Down
89 changes: 41 additions & 48 deletions src/include/storage/index/hash_index_builder.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/mutex.h"
#include "hash_index_header.h"
#include "hash_index_slot.h"
#include "storage/index/hash_index_utils.h"
Expand All @@ -12,6 +13,7 @@ namespace storage {
static constexpr common::page_idx_t INDEX_HEADER_ARRAY_HEADER_PAGE_IDX = 0;
static constexpr common::page_idx_t P_SLOTS_HEADER_PAGE_IDX = 1;
static constexpr common::page_idx_t O_SLOTS_HEADER_PAGE_IDX = 2;
static constexpr common::page_idx_t HEADER_PAGES = 3;
static constexpr uint64_t INDEX_HEADER_IDX_IN_ARRAY = 0;

/**
Expand Down Expand Up @@ -73,35 +75,23 @@ class BaseHashIndex {
template<typename T>
class HashIndexBuilder : public BaseHashIndex {
public:
HashIndexBuilder(const std::string& fName, const common::LogicalType& keyDataType,
common::VirtualFileSystem* vfs);
HashIndexBuilder(const std::shared_ptr<FileHandle>& handle,
const std::shared_ptr<common::Mutex<InMemFile>>& overflowFile, uint64_t indexPos,
const common::LogicalType& keyDataType);

public:
// Reserves space for at least the specified number of elements.
void bulkReserve(uint32_t numEntries);

// Note: append assumes that bulkReserve has been called before it and the index has reserved
// enough space already.
inline bool append(int64_t key, common::offset_t value) {
return appendInternal(reinterpret_cast<const uint8_t*>(&key), value);
}
inline bool append(const char* key, common::offset_t value) {
return appendInternal(reinterpret_cast<const uint8_t*>(key), value);
}
inline bool lookup(int64_t key, common::offset_t& result) {
return lookupInternalWithoutLock(reinterpret_cast<const uint8_t*>(&key), result);
}
inline bool lookup(const char* key, common::offset_t& result) {
return lookupInternalWithoutLock(reinterpret_cast<const uint8_t*>(key), result);
}
bool append(const uint8_t* key, common::offset_t value);
bool lookup(const uint8_t* key, common::offset_t& result);

// Non-thread safe. This should only be called in the copyCSV and never be called in parallel.
void flush();

private:
bool appendInternal(const uint8_t* key, common::offset_t value);
bool lookupInternalWithoutLock(const uint8_t* key, common::offset_t& result);

template<bool IS_LOOKUP>
bool lookupOrExistsInSlotWithoutLock(
Slot<T>* slot, const uint8_t* key, common::offset_t* result = nullptr);
Expand All @@ -111,15 +101,15 @@ class HashIndexBuilder : public BaseHashIndex {
uint32_t allocateAOSlot();

private:
std::unique_ptr<FileHandle> fileHandle;
std::shared_ptr<FileHandle> fileHandle;
std::shared_ptr<common::Mutex<InMemFile>> inMemOverflowFile;
std::unique_ptr<InMemDiskArrayBuilder<HashIndexHeader>> headerArray;
std::shared_mutex oSlotsSharedMutex;
std::unique_ptr<InMemDiskArrayBuilder<Slot<T>>> pSlots;
std::unique_ptr<InMemDiskArrayBuilder<Slot<T>>> oSlots;
std::vector<std::unique_ptr<std::mutex>> pSlotsMutexes;
in_mem_insert_function_t keyInsertFunc;
in_mem_equals_function_t keyEqualsFunc;
std::unique_ptr<InMemFile> inMemOverflowFile;
uint8_t slotCapacity;
std::atomic<uint64_t> numEntries;
};
Expand All @@ -129,47 +119,50 @@ class PrimaryKeyIndexBuilder {
PrimaryKeyIndexBuilder(const std::string& fName, const common::LogicalType& keyDataType,
common::VirtualFileSystem* vfs);

inline void lock() { mtx.lock(); }

inline void unlock() { mtx.unlock(); }
void bulkReserve(uint32_t numEntries);

inline void bulkReserve(uint32_t numEntries) {
keyDataTypeID == common::LogicalTypeID::INT64 ?
hashIndexBuilderForInt64->bulkReserve(numEntries) :
hashIndexBuilderForString->bulkReserve(numEntries);
}
// Note: append assumes that bulkRserve has been called before it and the index has reserved
// enough space already.
inline bool append(int64_t key, common::offset_t value) {
return keyDataTypeID == common::LogicalTypeID::INT64 ?
hashIndexBuilderForInt64->append(key, value) :
hashIndexBuilderForString->append(key, value);
bool append(int64_t key, common::offset_t value) {
return appendWithIndexPos(key, value, getHashIndexPosition(key));
}
inline bool append(const char* key, common::offset_t value) {
return keyDataTypeID == common::LogicalTypeID::INT64 ?
hashIndexBuilderForInt64->append(key, value) :
hashIndexBuilderForString->append(key, value);
bool append(const char* key, common::offset_t value) {
return appendWithIndexPos(key, value, getHashIndexPosition(key));
}
inline bool lookup(int64_t key, common::offset_t& result) {
return keyDataTypeID == common::LogicalTypeID::INT64 ?
hashIndexBuilderForInt64->lookup(key, result) :
hashIndexBuilderForString->lookup(key, result);
bool appendWithIndexPos(int64_t key, common::offset_t value, uint64_t indexPos) {
KU_ASSERT(keyDataTypeID == common::LogicalTypeID::INT64);
KU_ASSERT(getHashIndexPosition(key) == indexPos);
return hashIndexBuilderForInt64[indexPos]->append(
reinterpret_cast<const uint8_t*>(&key), value);
}
inline bool lookup(const char* key, common::offset_t& result) {
return hashIndexBuilderForString->lookup(key, result);
bool appendWithIndexPos(const char* key, common::offset_t value, uint64_t indexPos) {
KU_ASSERT(keyDataTypeID == common::LogicalTypeID::STRING);
KU_ASSERT(getHashIndexPosition(key) == indexPos);
return hashIndexBuilderForString[indexPos]->append(
reinterpret_cast<const uint8_t*>(key), value);
}

// Non-thread safe. This should only be called in the copyCSV and never be called in parallel.
inline void flush() {
keyDataTypeID == common::LogicalTypeID::INT64 ? hashIndexBuilderForInt64->flush() :
hashIndexBuilderForString->flush();
bool lookup(int64_t key, common::offset_t& result) {
KU_ASSERT(keyDataTypeID == common::LogicalTypeID::INT64);
return hashIndexBuilderForInt64[getHashIndexPosition(key)]->lookup(
reinterpret_cast<const uint8_t*>(&key), result);
}
bool lookup(const char* key, common::offset_t& result) {
KU_ASSERT(keyDataTypeID == common::LogicalTypeID::STRING);
return hashIndexBuilderForString[getHashIndexPosition(key)]->lookup(
reinterpret_cast<const uint8_t*>(key), result);
}

// Not thread safe.
void flush();

common::LogicalTypeID keyTypeID() const { return keyDataTypeID; }

private:
std::mutex mtx;
common::LogicalTypeID keyDataTypeID;
std::unique_ptr<HashIndexBuilder<int64_t>> hashIndexBuilderForInt64;
std::unique_ptr<HashIndexBuilder<common::ku_string_t>> hashIndexBuilderForString;
std::shared_ptr<common::Mutex<InMemFile>> overflowFile;
std::vector<std::unique_ptr<HashIndexBuilder<int64_t>>> hashIndexBuilderForInt64;
std::vector<std::unique_ptr<HashIndexBuilder<common::ku_string_t>>> hashIndexBuilderForString;
};

} // namespace storage
Expand Down
14 changes: 14 additions & 0 deletions src/include/storage/index/hash_index_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ using in_mem_insert_function_t =
using in_mem_equals_function_t =
std::function<bool(const uint8_t*, const uint8_t*, const InMemFile*)>;

const uint64_t NUM_HASH_INDEXES_LOG2 = 8;
const uint64_t NUM_HASH_INDEXES = 1 << NUM_HASH_INDEXES_LOG2;

inline uint64_t getHashIndexPosition(int64_t key) {
common::hash_t hash;
function::Hash::operation(key, hash);
return (hash >> (64 - NUM_HASH_INDEXES_LOG2)) & (NUM_HASH_INDEXES - 1);
}
inline uint64_t getHashIndexPosition(const char* key) {
auto view = std::string_view(key);
return (std::hash<std::string_view>()(view) >> (64 - NUM_HASH_INDEXES_LOG2)) &
(NUM_HASH_INDEXES - 1);
}

class InMemHashIndexUtils {
public:
static in_mem_equals_function_t initializeEqualsFunc(common::LogicalTypeID dataTypeID);
Expand Down
Loading

0 comments on commit 81b9466

Please sign in to comment.