Skip to content

Commit

Permalink
Insert into the hash index builder one chunk at a time
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger committed Mar 12, 2024
1 parent b7e3bc7 commit 654065f
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 127 deletions.
2 changes: 1 addition & 1 deletion src/include/common/type_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class TypeUtils {
return common::PhysicalTypeID::INT128;
} else if constexpr (std::is_same_v<T, interval_t>) {
return common::PhysicalTypeID::INTERVAL;
} else if constexpr (std::is_same_v<T, ku_string_t>) {
} else if constexpr (std::same_as<T, ku_string_t> || std::same_as<T, std::string>) {
return common::PhysicalTypeID::STRING;
} else {
KU_UNREACHABLE;
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ template<typename T>
concept IndexHashable = ((std::integral<T> && !std::is_same_v<T, bool>) || std::floating_point<T> ||
std::is_same_v<T, common::int128_t> ||
std::is_same_v<T, common::ku_string_t> ||
std::is_same_v<T, std::string_view>);
std::is_same_v<T, std::string_view> || std::same_as<T, std::string>);

enum class KUZU_API LogicalTypeID : uint8_t {
ANY = 0,
Expand Down
9 changes: 3 additions & 6 deletions src/include/processor/operator/persistent/index_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
namespace kuzu {
namespace processor {

constexpr size_t BUFFER_SIZE = 1024;
template<typename T>
using Buffer = common::StaticVector<std::pair<T, common::offset_t>, BUFFER_SIZE>;
const size_t SHOULD_FLUSH_QUEUE_SIZE = 32;

class IndexBuilderGlobalQueues {
Expand All @@ -28,7 +25,7 @@ class IndexBuilderGlobalQueues {
void flushToDisk() const;

template<typename T>
void insert(size_t index, Buffer<T> elem) {
void insert(size_t index, storage::IndexBuffer<T> elem) {
auto& typedQueues = std::get<Queue<T>>(queues).array;
typedQueues[index].push(std::move(elem));
if (typedQueues[index].approxSize() < SHOULD_FLUSH_QUEUE_SIZE) {
Expand All @@ -49,7 +46,7 @@ class IndexBuilderGlobalQueues {

template<typename T>
struct Queue {
std::array<common::MPSCQueue<Buffer<T>>, storage::NUM_HASH_INDEXES> array;
std::array<common::MPSCQueue<storage::IndexBuffer<T>>, storage::NUM_HASH_INDEXES> array;
// Type information to help std::visit. Value is not used
T type;
};
Expand Down Expand Up @@ -92,7 +89,7 @@ class IndexBuilderLocalBuffers {

// These arrays are much too large to be inline.
template<typename T>
using Buffers = std::array<Buffer<T>, storage::NUM_HASH_INDEXES>;
using Buffers = std::array<storage::IndexBuffer<T>, storage::NUM_HASH_INDEXES>;
template<typename T>
using UniqueBuffers = std::unique_ptr<Buffers<T>>;
std::variant<UniqueBuffers<std::string>, UniqueBuffers<int64_t>, UniqueBuffers<int32_t>,
Expand Down
93 changes: 53 additions & 40 deletions src/include/storage/index/hash_index_builder.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#pragma once

#include "common/static_vector.h"
#include "common/type_utils.h"
#include "common/types/internal_id_t.h"
#include "common/types/ku_string.h"
#include "common/types/types.h"
#include "storage/index/hash_index_header.h"
#include "storage/index/hash_index_slot.h"
Expand All @@ -18,6 +21,10 @@ class InMemHashIndex {
virtual void bulkReserve(uint32_t numEntries) = 0;
};

// Size argument of the StaticVector behind IndexBuffer (presumably its capacity in entries —
// confirm against common/static_vector.h).
constexpr size_t BUFFER_SIZE = 1024;
// A batch of (key, offset) pairs: each element is a std::pair<T, common::offset_t>.
template<typename T>
using IndexBuffer = common::StaticVector<std::pair<T, common::offset_t>, BUFFER_SIZE>;

/**
* Basic index file consists of three disk arrays: indexHeader, primary slots (pSlots), and overflow
* slots (oSlots).
Expand All @@ -44,9 +51,10 @@ class InMemHashIndex {
*
* */

// T is the key type used to access values
// S is the stored type, which is usually the same as T, with the exception of strings
template<common::IndexHashable T, typename S = T>
// T is the key type stored in the slots.
// For strings this differs from the types used when inserting and searching
// (see BufferKeyType and Key).
template<common::IndexHashable T>
class HashIndexBuilder final : public InMemHashIndex {
static_assert(getSlotCapacity<T>() <= SlotHeader::FINGERPRINT_CAPACITY);
// Size of the validity mask
Expand All @@ -62,13 +70,21 @@ class HashIndexBuilder final : public InMemHashIndex {
// Reserves space for at least the specified number of elements.
void bulkReserve(uint32_t numEntries) override;

bool append(T key, common::offset_t value);
bool lookup(T key, common::offset_t& result);
using BufferKeyType =
typename std::conditional<std::same_as<T, common::ku_string_t>, std::string, T>::type;
// Appends the buffer to the index and returns the number of values successfully inserted.
// If a key fails to insert, the return value is that key's position in the buffer.
size_t append(const IndexBuffer<BufferKeyType>& buffer);
using Key =
typename std::conditional<std::same_as<T, common::ku_string_t>, std::string_view, T>::type;
bool lookup(Key key, common::offset_t& result);

void flush() override;

private:
Slot<S>* getSlot(const SlotInfo& slotInfo);
// Assumes that space has already been allocated for the entry
bool appendInternal(Key key, common::offset_t value, common::hash_t hash);
Slot<T>* getSlot(const SlotInfo& slotInfo);

uint32_t allocatePSlots(uint32_t numSlotsToAllocate);
uint32_t allocateAOSlot();
Expand All @@ -80,12 +96,12 @@ class HashIndexBuilder final : public InMemHashIndex {
*/
void splitSlot(HashIndexHeader& header);

inline bool equals(T keyToLookup, const S& keyInEntry) const {
inline bool equals(Key keyToLookup, const T& keyInEntry) const {

Check warning on line 99 in src/include/storage/index/hash_index_builder.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/index/hash_index_builder.h#L99

Added line #L99 was not covered by tests
return keyToLookup == keyInEntry;
}

inline void insert(
T key, Slot<S>* slot, uint8_t entryPos, common::offset_t value, uint8_t fingerprint) {
Key key, Slot<T>* slot, uint8_t entryPos, common::offset_t value, uint8_t fingerprint) {
auto entry = slot->entries[entryPos].data;
memcpy(entry, &key, sizeof(T));
memcpy(entry + sizeof(T), &value, sizeof(common::offset_t));
Expand All @@ -94,14 +110,14 @@ class HashIndexBuilder final : public InMemHashIndex {
}
void copy(const uint8_t* oldEntry, slot_id_t newSlotId, uint8_t fingerprint);
void insertToNewOvfSlot(
T key, Slot<S>* previousSlot, common::offset_t offset, uint8_t fingerprint);
common::hash_t hashStored(const S& key) const;
Key key, Slot<T>* previousSlot, common::offset_t offset, uint8_t fingerprint);
common::hash_t hashStored(const T& key) const;

struct SlotIterator {
explicit SlotIterator(slot_id_t newSlotId, HashIndexBuilder<T, S>* builder)
explicit SlotIterator(slot_id_t newSlotId, HashIndexBuilder<T>* builder)
: slotInfo{newSlotId, SlotType::PRIMARY}, slot(builder->getSlot(slotInfo)) {}
SlotInfo slotInfo;
Slot<S>* slot;
Slot<T>* slot;
};

inline bool nextChainedSlot(SlotIterator& iter) {
Expand All @@ -118,13 +134,13 @@ class HashIndexBuilder final : public InMemHashIndex {
std::shared_ptr<FileHandle> fileHandle;
std::unique_ptr<InMemFile> inMemOverflowFile;
std::unique_ptr<InMemDiskArrayBuilder<HashIndexHeader>> headerArray;
std::unique_ptr<InMemDiskArrayBuilder<Slot<S>>> pSlots;
std::unique_ptr<InMemDiskArrayBuilder<Slot<S>>> oSlots;
std::unique_ptr<InMemDiskArrayBuilder<Slot<T>>> pSlots;
std::unique_ptr<InMemDiskArrayBuilder<Slot<T>>> oSlots;
std::unique_ptr<HashIndexHeader> indexHeader;
};

template<>
bool HashIndexBuilder<std::string_view, common::ku_string_t>::equals(
bool HashIndexBuilder<common::ku_string_t>::equals(
std::string_view keyToLookup, const common::ku_string_t& keyInEntry) const;

class PrimaryKeyIndexBuilder {
Expand All @@ -134,26 +150,15 @@ class PrimaryKeyIndexBuilder {

void bulkReserve(uint32_t numEntries);

// Note: append assumes that bulkReserve has been called before it and that the index has
// already reserved enough space.
template<common::HashablePrimitive T>
bool append(T key, common::offset_t value) {
return appendWithIndexPos(key, value, HashIndexUtils::getHashIndexPosition(key));
}
bool append(std::string_view key, common::offset_t value) {
return appendWithIndexPos(key, value, HashIndexUtils::getHashIndexPosition(key));
}
template<common::HashablePrimitive T>
bool appendWithIndexPos(T key, common::offset_t value, uint64_t indexPos) {
// Appends the buffer to the index and returns the number of values successfully inserted.
// If a key fails to insert, the return value is that key's position in the buffer.
template<common::IndexHashable T>
size_t appendWithIndexPos(const IndexBuffer<T>& buffer, uint64_t indexPos) {
KU_ASSERT(keyDataTypeID == common::TypeUtils::getPhysicalTypeIDForType<T>());
KU_ASSERT(HashIndexUtils::getHashIndexPosition(key) == indexPos);
return getTypedHashIndex<T>(indexPos)->append(key, value);
}
bool appendWithIndexPos(std::string_view key, common::offset_t value, uint64_t indexPos) {
KU_ASSERT(keyDataTypeID == common::PhysicalTypeID::STRING);
KU_ASSERT(HashIndexUtils::getHashIndexPosition(key) == indexPos);
return getTypedHashIndex<std::string_view, common::ku_string_t>(indexPos)->append(
key, value);
KU_ASSERT(std::all_of(buffer.begin(), buffer.end(), [&](auto& elem) {
return HashIndexUtils::getHashIndexPosition(elem.first) == indexPos;
}));
return getTypedHashIndex<T>(indexPos)->append(buffer);
}
template<common::HashablePrimitive T>
bool lookup(T key, common::offset_t& result) {
Expand All @@ -162,8 +167,7 @@ class PrimaryKeyIndexBuilder {
}
bool lookup(std::string_view key, common::offset_t& result) {
KU_ASSERT(keyDataTypeID == common::PhysicalTypeID::STRING);
return getTypedHashIndex<std::string_view, common::ku_string_t>(
HashIndexUtils::getHashIndexPosition(key))
return getTypedHashIndex<common::ku_string_t>(HashIndexUtils::getHashIndexPosition(key))
->lookup(key, result);
}

Expand All @@ -173,10 +177,19 @@ class PrimaryKeyIndexBuilder {
common::PhysicalTypeID keyTypeID() const { return keyDataTypeID; }

private:
template<typename T, typename S = T>
inline HashIndexBuilder<T, S>* getTypedHashIndex(uint64_t indexPos) {
return common::ku_dynamic_cast<InMemHashIndex*, HashIndexBuilder<T, S>*>(
hashIndexBuilders[indexPos].get());
// Maps a caller-facing key type to the key type the hash index builder is instantiated
// with: std::string_view and std::string both map to common::ku_string_t, and every
// other type maps to itself.
template<common::IndexHashable T>
using HashIndexType = std::conditional_t<
    std::same_as<T, std::string_view> || std::same_as<T, std::string>, common::ku_string_t, T>;
// Returns hashIndexBuilders[indexPos] downcast to the builder instantiated for
// HashIndexType<T>.
template<typename T>
inline HashIndexBuilder<HashIndexType<T>>* getTypedHashIndex(uint64_t indexPos) {
    // For non-string keys HashIndexType<T> is T itself, so a single cast target covers
    // both the string and the non-string case.
    using TypedBuilder = HashIndexBuilder<HashIndexType<T>>;
    return common::ku_dynamic_cast<InMemHashIndex*, TypedBuilder*>(
        hashIndexBuilders[indexPos].get());
}

private:
Expand Down
48 changes: 20 additions & 28 deletions src/processor/operator/persistent/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,34 +34,26 @@ void IndexBuilderGlobalQueues::maybeConsumeIndex(size_t index) {
if (!mutexes[index].try_lock()) {
return;
}
std::unique_lock lck{mutexes[index], std::adopt_lock};

std::visit(overload{[&](Queue<std::string>&& queues) {
using T = std::decay_t<decltype(queues.type)>;
Buffer<T> elem;
while (queues.array[index].pop(elem)) {
for (auto [key, value] : elem) {
if (!pkIndex->appendWithIndexPos(key, value, index)) {
throw CopyException(
ExceptionMessage::duplicatePKException(std::move(key)));
}
}
}
return;
},
[&](auto&& queues) {
using T = std::decay_t<decltype(queues.type)>;
Buffer<T> elem;
while (queues.array[index].pop(elem)) {
for (auto [key, value] : elem) {
if (!pkIndex->appendWithIndexPos(key, value, index)) {
throw CopyException(ExceptionMessage::duplicatePKException(
TypeUtils::toString(key)));
}
}
}
return;
}},

std::visit(
[&](auto&& queues) {
using T = std::decay_t<decltype(queues.type)>;
std::unique_lock lck{mutexes[index], std::adopt_lock};
IndexBuffer<T> buffer;
while (queues.array[index].pop(buffer)) {
auto numValuesInserted = pkIndex->appendWithIndexPos(buffer, index);
if (numValuesInserted < buffer.size()) {
if constexpr (std::same_as<T, std::string>) {
throw CopyException(ExceptionMessage::duplicatePKException(
std::move(buffer[numValuesInserted].first)));
} else {
throw CopyException(ExceptionMessage::duplicatePKException(
TypeUtils::toString(buffer[numValuesInserted].first)));

Check warning on line 51 in src/processor/operator/persistent/index_builder.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/operator/persistent/index_builder.cpp#L51

Added line #L51 was not covered by tests
}
}
}
return;
},
std::move(queues));
}

Expand Down
Loading

0 comments on commit 654065f

Please sign in to comment.