Skip to content

Commit

Permalink
Hash index bulk insert
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger committed Mar 22, 2024
1 parent 5267f00 commit e8bb1f9
Show file tree
Hide file tree
Showing 14 changed files with 459 additions and 220 deletions.
9 changes: 4 additions & 5 deletions src/include/processor/operator/persistent/index_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "common/types/internal_id_t.h"
#include "common/types/types.h"
#include "processor/execution_context.h"
#include "storage/index/hash_index_builder.h"
#include "storage/index/hash_index.h"
#include "storage/index/hash_index_utils.h"
#include "storage/store/column_chunk.h"

Expand All @@ -20,7 +20,7 @@ const size_t SHOULD_FLUSH_QUEUE_SIZE = 32;

class IndexBuilderGlobalQueues {
public:
explicit IndexBuilderGlobalQueues(storage::PrimaryKeyIndexBuilder* pkIndex);
explicit IndexBuilderGlobalQueues(storage::PrimaryKeyIndex* pkIndex);

void flushToDisk() const;

Expand All @@ -42,7 +42,7 @@ class IndexBuilderGlobalQueues {
void maybeConsumeIndex(size_t index);

std::array<std::mutex, storage::NUM_HASH_INDEXES> mutexes;
storage::PrimaryKeyIndexBuilder* pkIndex;
storage::PrimaryKeyIndex* pkIndex;

template<typename T>
struct Queue {
Expand Down Expand Up @@ -103,8 +103,7 @@ class IndexBuilderSharedState {
friend class IndexBuilder;

public:
explicit IndexBuilderSharedState(storage::PrimaryKeyIndexBuilder* pkIndex)
: globalQueues{pkIndex} {}
explicit IndexBuilderSharedState(storage::PrimaryKeyIndex* pkIndex) : globalQueues{pkIndex} {}
// Forwards to globalQueues.consume().
inline void consume() { globalQueues.consume(); }
// Forwards to globalQueues.flushToDisk().
inline void flush() { globalQueues.flushToDisk(); }

Expand Down
8 changes: 6 additions & 2 deletions src/include/processor/operator/persistent/node_batch_insert.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/cast.h"
#include "processor/operator/aggregate/hash_aggregate.h"
#include "processor/operator/call/in_query_call.h"
#include "processor/operator/persistent/batch_insert.h"
Expand Down Expand Up @@ -33,7 +34,7 @@ struct NodeBatchInsertInfo final : public BatchInsertInfo {

struct NodeBatchInsertSharedState final : public BatchInsertSharedState {
// Primary key info
std::shared_ptr<storage::PrimaryKeyIndexBuilder> pkIndex;
storage::PrimaryKeyIndex* pkIndex;
common::vector_idx_t pkColumnIdx;
common::LogicalType pkType;
std::optional<IndexBuilder> globalIndexBuilder = std::nullopt;
Expand All @@ -49,7 +50,10 @@ struct NodeBatchInsertSharedState final : public BatchInsertSharedState {
NodeBatchInsertSharedState(
storage::Table* table, std::shared_ptr<FactorizedTable> fTable, storage::WAL* wal)
: BatchInsertSharedState{table, fTable, wal}, readerSharedState{nullptr},
distinctSharedState{nullptr}, currentNodeGroupIdx{0}, sharedNodeGroup{nullptr} {};
distinctSharedState{nullptr}, currentNodeGroupIdx{0}, sharedNodeGroup{nullptr} {
pkIndex =
common::ku_dynamic_cast<storage::Table*, storage::NodeTable*>(table)->getPKIndex();
}

void initPKIndex(ExecutionContext* context);

Expand Down
94 changes: 88 additions & 6 deletions src/include/storage/index/hash_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@

#include <string_view>

#include "common/cast.h"
#include "common/file_system/virtual_file_system.h"
#include "common/type_utils.h"
#include "common/types/internal_id_t.h"
#include "common/types/ku_string.h"
#include "common/types/types.h"
#include "hash_index_header.h"
#include "hash_index_slot.h"
#include "storage/buffer_manager/bm_file_handle.h"
#include "storage/file_handle.h"
#include "storage/index/hash_index_builder.h"
#include "storage/index/hash_index_utils.h"
#include "storage/storage_structure/disk_array.h"
#include "storage/storage_structure/overflow_file.h"
Expand All @@ -25,6 +31,7 @@ class OnDiskHashIndex {
virtual void prepareRollback() = 0;
virtual void checkpointInMemory() = 0;
virtual void rollbackInMemory() = 0;
virtual void bulkReserve(uint64_t numValuesToAppend) = 0;
};

// HashIndex is the entrance to handle all updates and lookups into the index after building from
Expand Down Expand Up @@ -65,6 +72,18 @@ class HashIndex final : public OnDiskHashIndex {
void deleteInternal(Key key) const;
bool insertInternal(Key key, common::offset_t value);

using BufferKeyType =
typename std::conditional<std::same_as<T, common::ku_string_t>, std::string, T>::type;
// Appends every entry of `buffer` to the bulk-insert local storage.
// Returns the number of values successfully inserted; i.e. if a key fails to insert, the
// returned value is that key's index within the buffer.
size_t append(const IndexBuffer<BufferKeyType>& buffer) {
    // Size the builder for everything it will hold once flushed (entries already counted in
    // the write-transaction header + entries already staged + the incoming buffer), so the
    // builder keeps the same number of primary slots that will eventually be needed on disk
    // and each entry's destination slot is known.
    const auto entriesAfterAppend =
        indexHeaderForWriteTrx->numEntries + bulkInsertLocalStorage.size() + buffer.size();
    bulkInsertLocalStorage.bulkReserve(entriesAfterAppend);
    return bulkInsertLocalStorage.append(buffer);
}

void prepareCommit() override;
void prepareRollback() override;
void checkpointInMemory() override;
Expand Down Expand Up @@ -99,13 +118,33 @@ class HashIndex final : public OnDiskHashIndex {
appendPSlot();
rehashSlots(header);
header.incrementNextSplitSlotId();
validateEntries(transaction::TransactionType::WRITE);
}
void rehashSlots(HashIndexHeader& header);

void forEach(transaction::TransactionType trxType,
std::function<void(slot_id_t, uint8_t, SlotEntry<T>)> func);
// TODO: If locking is removed from the disk arrays, then many of these functions could be const
std::string toString(transaction::TransactionType trxType);
void validateEntries(transaction::TransactionType trxType);

// Intended to resize the local storage to support the given number of new entries.
// Currently a deliberate no-op: the body is empty and the argument is ignored (see FIXME).
inline void bulkReserve(uint64_t /*newEntries*/) override {
// FIXME: Don't bulk reserve for now. Because of the way things are split up, we may reserve
// more than we actually need, and then when flushing the number of primary slots won't
// match. Another reason: it may be better to start smaller and split slots when merging to
// persistent storage.
// Disabled call:
//     bulkInsertLocalStorage.bulkReserve(indexHeaderForWriteTrx->numEntries + newEntries);
}
// Resizes the on-disk index to support the given number of new entries
void reserve(uint64_t newEntries);
void mergeBulkInserts();

// Reports whether the key being looked up matches the key stored in an entry, using
// operator==. The transaction type argument is accepted but not consulted.
inline bool equals(
    transaction::TransactionType /*trxType*/, Key keyToLookup, const T& keyInEntry) const {
    const bool keysMatch = (keyToLookup == keyInEntry);
    return keysMatch;
}
// TODO: Replace with separate copy and insert functions
template<typename K, bool isCopyEntry>
void copyAndUpdateSlotHeader(
Slot<T>& slot, entry_pos_t entryPos, K key, common::offset_t value, uint8_t fingerprint) {
Expand Down Expand Up @@ -172,10 +211,6 @@ class HashIndex final : public OnDiskHashIndex {
void copyEntryToSlot(slot_id_t slotId, const T& entry, uint8_t fingerprint);

private:
// Local Storage for strings stores an std::string, but still takes std::string_view as keys to
// avoid unnecessary copying
using LocalStorageType = typename std::conditional<std::is_same<T, common::ku_string_t>::value,
std::string, T>::type;
DBFileIDAndName dbFileIDAndName;
BufferManager& bm;
WAL* wal;
Expand All @@ -184,7 +219,8 @@ class HashIndex final : public OnDiskHashIndex {
std::unique_ptr<BaseDiskArray<Slot<T>>> pSlots;
std::unique_ptr<BaseDiskArray<Slot<T>>> oSlots;
OverflowFileHandle* overflowFileHandle;
std::unique_ptr<HashIndexLocalStorage<LocalStorageType>> localStorage;
std::unique_ptr<HashIndexLocalStorage<BufferKeyType>> localStorage;
HashIndexBuilder<T> bulkInsertLocalStorage;
std::unique_ptr<HashIndexHeader> indexHeaderForReadTrx;
std::unique_ptr<HashIndexHeader> indexHeaderForWriteTrx;
};
Expand All @@ -203,6 +239,8 @@ class PrimaryKeyIndex {
common::PhysicalTypeID keyDataType, BufferManager& bufferManager, WAL* wal,
common::VirtualFileSystem* vfs);

void initHashIndices();

~PrimaryKeyIndex();

template<typename T>
Expand All @@ -214,6 +252,11 @@ class PrimaryKeyIndex {
return common::ku_dynamic_cast<OnDiskHashIndex*, HashIndex<HashIndexType<T>>*>(
hashIndices[HashIndexUtils::getHashIndexPosition(key)].get());
}
// Returns the hash index stored at position `indexPos` of hashIndices, downcast from
// OnDiskHashIndex* to the typed HashIndex for key type T.
// NOTE(review): the declared return type is HashIndex<T>* while the cast target is
// HashIndex<HashIndexType<T>>*; these presumably coincide for the types callers pass
// (appendWithIndexPos passes HashIndexType<T>) - confirm.
template<common::IndexHashable T>
inline HashIndex<T>* getTypedHashIndexByPos(uint64_t indexPos) {
    auto* untypedIndex = hashIndices[indexPos].get();
    return common::ku_dynamic_cast<OnDiskHashIndex*, HashIndex<HashIndexType<T>>*>(
        untypedIndex);
}

inline bool lookup(
transaction::Transaction* trx, common::ku_string_t key, common::offset_t& result) {
Expand All @@ -238,6 +281,24 @@ class PrimaryKeyIndex {
}
bool insert(common::ValueVector* keyVector, uint64_t vectorPos, common::offset_t value);

// Appends `buffer` to the hash index at position `indexPos`.
// Returns the number of values successfully inserted; i.e. if a key fails to insert, the
// returned value is that key's index within the buffer.
// Asserts that T matches this index's key type and that every key in the buffer maps to
// `indexPos` under HashIndexUtils::getHashIndexPosition.
template<common::IndexHashable T>
size_t appendWithIndexPos(const IndexBuffer<T>& buffer, uint64_t indexPos) {
    KU_ASSERT(common::TypeUtils::getPhysicalTypeIDForType<T>() == keyDataTypeID);
    KU_ASSERT(std::none_of(buffer.begin(), buffer.end(), [&](const auto& entry) {
        return HashIndexUtils::getHashIndexPosition(entry.first) != indexPos;
    }));
    auto* typedIndex = getTypedHashIndexByPos<HashIndexType<T>>(indexPos);
    return typedIndex->append(buffer);
}

// Spreads a bulk reservation across all hash indexes: each of the NUM_HASH_INDEXES indexes
// is asked to reserve numValuesToAppend / NUM_HASH_INDEXES + 1 entries (the +1 rounds up so
// the per-index shares always cover the requested total).
void bulkReserve(uint64_t numValuesToAppend) {
    // Keep the share in 64 bits: the input and the per-index bulkReserve parameter are both
    // uint64_t, so storing the quotient in a 32-bit variable would silently truncate it for
    // very large requests.
    const uint64_t eachSize = numValuesToAppend / NUM_HASH_INDEXES + 1;
    for (auto i = 0u; i < NUM_HASH_INDEXES; i++) {
        hashIndices[i]->bulkReserve(eachSize);
    }
}

// ku_string_t overload: forwards to delete_ with the key's string view (getAsStringView()).
inline void delete_(common::ku_string_t key) { return delete_(key.getAsStringView()); }
template<common::IndexHashable T>
inline void delete_(T key) {
Expand All @@ -251,9 +312,30 @@ class PrimaryKeyIndex {
void rollbackInMemory();
void prepareCommit();
void prepareRollback();
BMFileHandle* getFileHandle() { return fileHandle.get(); }
// Returns the raw pointer held by fileHandle, cast from FileHandle* to BMFileHandle* via
// ku_dynamic_cast.
BMFileHandle* getFileHandle() {
    auto* rawHandle = fileHandle.get();
    return common::ku_dynamic_cast<FileHandle*, BMFileHandle*>(rawHandle);
}
// Returns the raw pointer obtained from overflowFile.get().
OverflowFile* getOverflowFile() { return overflowFile.get(); }

// Returns the physical type ID of the keys stored in this index (keyDataTypeID).
inline common::PhysicalTypeID keyTypeID() { return keyDataTypeID; }

// Sets up the on-disk files for an empty primary key index stored at `fName`.
// Opens `fName` with O_PERSISTENT_FILE_CREATE_NOT_EXISTS, adds
// NUM_HEADER_PAGES * NUM_HASH_INDEXES pages to it, then dispatches on `typeID`:
//  - for IndexHashable key types, calls HashIndexBuilder<KeyT>::createEmptyIndexFiles once
//    per hash index position; ku_string_t keys additionally get empty overflow files at the
//    name returned by StorageUtils::getOverflowFileName(fName);
//  - any other type reaches KU_UNREACHABLE.
static void createEmptyHashIndexFiles(
    common::PhysicalTypeID typeID, const std::string& fName, common::VirtualFileSystem* vfs) {
    FileHandle indexFile(fName, FileHandle::O_PERSISTENT_FILE_CREATE_NOT_EXISTS, vfs);
    indexFile.addNewPages(NUM_HEADER_PAGES * NUM_HASH_INDEXES);
    common::TypeUtils::visit(
        typeID,
        [&]<common::IndexHashable KeyT>(KeyT) {
            // Only string keys need an overflow file.
            if constexpr (std::same_as<KeyT, common::ku_string_t>) {
                OverflowFile::createEmptyFiles(StorageUtils::getOverflowFileName(fName), vfs);
            }
            for (auto indexPos = 0u; indexPos < NUM_HASH_INDEXES; ++indexPos) {
                HashIndexBuilder<KeyT>::createEmptyIndexFiles(indexPos, indexFile);
            }
        },
        [&](auto) { KU_UNREACHABLE; });
}

private:
common::PhysicalTypeID keyDataTypeID;
std::shared_ptr<BMFileHandle> fileHandle;
Expand Down
Loading

0 comments on commit e8bb1f9

Please sign in to comment.