Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Insert into the hash index builder one chunk at a time #2997

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/include/common/type_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class TypeUtils {
return common::PhysicalTypeID::INT128;
} else if constexpr (std::is_same_v<T, interval_t>) {
return common::PhysicalTypeID::INTERVAL;
} else if constexpr (std::is_same_v<T, ku_string_t>) {
} else if constexpr (std::same_as<T, ku_string_t> || std::same_as<T, std::string>) {
return common::PhysicalTypeID::STRING;
} else {
KU_UNREACHABLE;
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ template<typename T>
concept IndexHashable = ((std::integral<T> && !std::is_same_v<T, bool>) || std::floating_point<T> ||
std::is_same_v<T, common::int128_t> ||
std::is_same_v<T, common::ku_string_t> ||
std::is_same_v<T, std::string_view>);
std::is_same_v<T, std::string_view> || std::same_as<T, std::string>);

enum class KUZU_API LogicalTypeID : uint8_t {
ANY = 0,
Expand Down
9 changes: 3 additions & 6 deletions src/include/processor/operator/persistent/index_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
namespace kuzu {
namespace processor {

constexpr size_t BUFFER_SIZE = 1024;
template<typename T>
using Buffer = common::StaticVector<std::pair<T, common::offset_t>, BUFFER_SIZE>;
const size_t SHOULD_FLUSH_QUEUE_SIZE = 32;

class IndexBuilderGlobalQueues {
Expand All @@ -28,7 +25,7 @@ class IndexBuilderGlobalQueues {
void flushToDisk() const;

template<typename T>
void insert(size_t index, Buffer<T> elem) {
void insert(size_t index, storage::IndexBuffer<T> elem) {
auto& typedQueues = std::get<Queue<T>>(queues).array;
typedQueues[index].push(std::move(elem));
if (typedQueues[index].approxSize() < SHOULD_FLUSH_QUEUE_SIZE) {
Expand All @@ -49,7 +46,7 @@ class IndexBuilderGlobalQueues {

template<typename T>
struct Queue {
std::array<common::MPSCQueue<Buffer<T>>, storage::NUM_HASH_INDEXES> array;
std::array<common::MPSCQueue<storage::IndexBuffer<T>>, storage::NUM_HASH_INDEXES> array;
// Type information to help std::visit. Value is not used
T type;
};
Expand Down Expand Up @@ -92,7 +89,7 @@ class IndexBuilderLocalBuffers {

// These arrays are much too large to be inline.
template<typename T>
using Buffers = std::array<Buffer<T>, storage::NUM_HASH_INDEXES>;
using Buffers = std::array<storage::IndexBuffer<T>, storage::NUM_HASH_INDEXES>;
template<typename T>
using UniqueBuffers = std::unique_ptr<Buffers<T>>;
std::variant<UniqueBuffers<std::string>, UniqueBuffers<int64_t>, UniqueBuffers<int32_t>,
Expand Down
93 changes: 53 additions & 40 deletions src/include/storage/index/hash_index_builder.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#pragma once

#include "common/static_vector.h"
#include "common/type_utils.h"
#include "common/types/internal_id_t.h"
#include "common/types/ku_string.h"
#include "common/types/types.h"
#include "storage/index/hash_index_header.h"
#include "storage/index/hash_index_slot.h"
Expand All @@ -18,6 +21,10 @@
virtual void bulkReserve(uint32_t numEntries) = 0;
};

constexpr size_t BUFFER_SIZE = 1024;
template<typename T>
using IndexBuffer = common::StaticVector<std::pair<T, common::offset_t>, BUFFER_SIZE>;

/**
* Basic index file consists of three disk arrays: indexHeader, primary slots (pSlots), and overflow
* slots (oSlots).
Expand All @@ -44,9 +51,10 @@
*
* */

// T is the key type used to access values
// S is the stored type, which is usually the same as T, with the exception of strings
template<common::IndexHashable T, typename S = T>
// T is the key type stored in the slots.
// For strings this is different than the type used when inserting/searching
// (see BufferKeyType and Key)
template<common::IndexHashable T>
class HashIndexBuilder final : public InMemHashIndex {
static_assert(getSlotCapacity<T>() <= SlotHeader::FINGERPRINT_CAPACITY);
// Size of the validity mask
Expand All @@ -62,13 +70,21 @@
// Reserves space for at least the specified number of elements.
void bulkReserve(uint32_t numEntries) override;

bool append(T key, common::offset_t value);
bool lookup(T key, common::offset_t& result);
using BufferKeyType =
typename std::conditional<std::same_as<T, common::ku_string_t>, std::string, T>::type;
// Appends the buffer to the index. Returns the number of values successfully inserted.
// I.e. if a key fails to insert, its index will be the return value
size_t append(const IndexBuffer<BufferKeyType>& buffer);
using Key =
typename std::conditional<std::same_as<T, common::ku_string_t>, std::string_view, T>::type;
bool lookup(Key key, common::offset_t& result);

void flush() override;

private:
Slot<S>* getSlot(const SlotInfo& slotInfo);
// Assumes that space has already been allocated for the entry
bool appendInternal(Key key, common::offset_t value, common::hash_t hash);
Slot<T>* getSlot(const SlotInfo& slotInfo);

uint32_t allocatePSlots(uint32_t numSlotsToAllocate);
uint32_t allocateAOSlot();
Expand All @@ -80,12 +96,12 @@
*/
void splitSlot(HashIndexHeader& header);

inline bool equals(T keyToLookup, const S& keyInEntry) const {
inline bool equals(Key keyToLookup, const T& keyInEntry) const {

Check warning on line 99 in src/include/storage/index/hash_index_builder.h

View check run for this annotation

Codecov / codecov/patch

src/include/storage/index/hash_index_builder.h#L99

Added line #L99 was not covered by tests
return keyToLookup == keyInEntry;
}

inline void insert(
T key, Slot<S>* slot, uint8_t entryPos, common::offset_t value, uint8_t fingerprint) {
Key key, Slot<T>* slot, uint8_t entryPos, common::offset_t value, uint8_t fingerprint) {
auto entry = slot->entries[entryPos].data;
memcpy(entry, &key, sizeof(T));
memcpy(entry + sizeof(T), &value, sizeof(common::offset_t));
Expand All @@ -94,14 +110,14 @@
}
void copy(const uint8_t* oldEntry, slot_id_t newSlotId, uint8_t fingerprint);
void insertToNewOvfSlot(
T key, Slot<S>* previousSlot, common::offset_t offset, uint8_t fingerprint);
common::hash_t hashStored(const S& key) const;
Key key, Slot<T>* previousSlot, common::offset_t offset, uint8_t fingerprint);
common::hash_t hashStored(const T& key) const;

struct SlotIterator {
explicit SlotIterator(slot_id_t newSlotId, HashIndexBuilder<T, S>* builder)
explicit SlotIterator(slot_id_t newSlotId, HashIndexBuilder<T>* builder)
: slotInfo{newSlotId, SlotType::PRIMARY}, slot(builder->getSlot(slotInfo)) {}
SlotInfo slotInfo;
Slot<S>* slot;
Slot<T>* slot;
};

inline bool nextChainedSlot(SlotIterator& iter) {
Expand All @@ -118,13 +134,13 @@
std::shared_ptr<FileHandle> fileHandle;
OverflowFileHandle* overflowFileHandle;
std::unique_ptr<InMemDiskArrayBuilder<HashIndexHeader>> headerArray;
std::unique_ptr<InMemDiskArrayBuilder<Slot<S>>> pSlots;
std::unique_ptr<InMemDiskArrayBuilder<Slot<S>>> oSlots;
std::unique_ptr<InMemDiskArrayBuilder<Slot<T>>> pSlots;
std::unique_ptr<InMemDiskArrayBuilder<Slot<T>>> oSlots;
std::unique_ptr<HashIndexHeader> indexHeader;
};

template<>
bool HashIndexBuilder<std::string_view, common::ku_string_t>::equals(
bool HashIndexBuilder<common::ku_string_t>::equals(
std::string_view keyToLookup, const common::ku_string_t& keyInEntry) const;

class PrimaryKeyIndexBuilder {
Expand All @@ -134,26 +150,15 @@

void bulkReserve(uint32_t numEntries);

// Note: append assumes that bulkRserve has been called before it and the index has reserved
// enough space already.
template<common::HashablePrimitive T>
bool append(T key, common::offset_t value) {
return appendWithIndexPos(key, value, HashIndexUtils::getHashIndexPosition(key));
}
bool append(std::string_view key, common::offset_t value) {
return appendWithIndexPos(key, value, HashIndexUtils::getHashIndexPosition(key));
}
template<common::HashablePrimitive T>
bool appendWithIndexPos(T key, common::offset_t value, uint64_t indexPos) {
// Appends the buffer to the index. Returns the number of values successfully inserted.
// I.e. if a key fails to insert, its index will be the return value
template<common::IndexHashable T>
size_t appendWithIndexPos(const IndexBuffer<T>& buffer, uint64_t indexPos) {
KU_ASSERT(keyDataTypeID == common::TypeUtils::getPhysicalTypeIDForType<T>());
KU_ASSERT(HashIndexUtils::getHashIndexPosition(key) == indexPos);
return getTypedHashIndex<T>(indexPos)->append(key, value);
}
bool appendWithIndexPos(std::string_view key, common::offset_t value, uint64_t indexPos) {
KU_ASSERT(keyDataTypeID == common::PhysicalTypeID::STRING);
KU_ASSERT(HashIndexUtils::getHashIndexPosition(key) == indexPos);
return getTypedHashIndex<std::string_view, common::ku_string_t>(indexPos)->append(
key, value);
KU_ASSERT(std::all_of(buffer.begin(), buffer.end(), [&](auto& elem) {
return HashIndexUtils::getHashIndexPosition(elem.first) == indexPos;
}));
return getTypedHashIndex<T>(indexPos)->append(buffer);
}
template<common::HashablePrimitive T>
bool lookup(T key, common::offset_t& result) {
Expand All @@ -162,8 +167,7 @@
}
bool lookup(std::string_view key, common::offset_t& result) {
KU_ASSERT(keyDataTypeID == common::PhysicalTypeID::STRING);
return getTypedHashIndex<std::string_view, common::ku_string_t>(
HashIndexUtils::getHashIndexPosition(key))
return getTypedHashIndex<common::ku_string_t>(HashIndexUtils::getHashIndexPosition(key))
->lookup(key, result);
}

Expand All @@ -173,10 +177,19 @@
common::PhysicalTypeID keyTypeID() const { return keyDataTypeID; }

private:
template<typename T, typename S = T>
inline HashIndexBuilder<T, S>* getTypedHashIndex(uint64_t indexPos) {
return common::ku_dynamic_cast<InMemHashIndex*, HashIndexBuilder<T, S>*>(
hashIndexBuilders[indexPos].get());
template<common::IndexHashable T>
using HashIndexType =
typename std::conditional<std::same_as<T, std::string_view> || std::same_as<T, std::string>,
common::ku_string_t, T>::type;
template<typename T>
inline HashIndexBuilder<HashIndexType<T>>* getTypedHashIndex(uint64_t indexPos) {
benjaminwinger marked this conversation as resolved.
Show resolved Hide resolved
if constexpr (std::same_as<HashIndexType<T>, common::ku_string_t>) {
return common::ku_dynamic_cast<InMemHashIndex*, HashIndexBuilder<common::ku_string_t>*>(
hashIndexBuilders[indexPos].get());
} else {
return common::ku_dynamic_cast<InMemHashIndex*, HashIndexBuilder<T>*>(
hashIndexBuilders[indexPos].get());
}
}

private:
Expand Down
48 changes: 20 additions & 28 deletions src/processor/operator/persistent/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,34 +34,26 @@
if (!mutexes[index].try_lock()) {
return;
}
std::unique_lock lck{mutexes[index], std::adopt_lock};

std::visit(overload{[&](Queue<std::string>&& queues) {
using T = std::decay_t<decltype(queues.type)>;
Buffer<T> elem;
while (queues.array[index].pop(elem)) {
for (auto [key, value] : elem) {
if (!pkIndex->appendWithIndexPos(key, value, index)) {
throw CopyException(
ExceptionMessage::duplicatePKException(std::move(key)));
}
}
}
return;
},
[&](auto&& queues) {
using T = std::decay_t<decltype(queues.type)>;
Buffer<T> elem;
while (queues.array[index].pop(elem)) {
for (auto [key, value] : elem) {
if (!pkIndex->appendWithIndexPos(key, value, index)) {
throw CopyException(ExceptionMessage::duplicatePKException(
TypeUtils::toString(key)));
}
}
}
return;
}},

std::visit(
[&](auto&& queues) {
using T = std::decay_t<decltype(queues.type)>;
std::unique_lock lck{mutexes[index], std::adopt_lock};
IndexBuffer<T> buffer;
while (queues.array[index].pop(buffer)) {
auto numValuesInserted = pkIndex->appendWithIndexPos(buffer, index);
if (numValuesInserted < buffer.size()) {
if constexpr (std::same_as<T, std::string>) {
throw CopyException(ExceptionMessage::duplicatePKException(
std::move(buffer[numValuesInserted].first)));
} else {
throw CopyException(ExceptionMessage::duplicatePKException(
TypeUtils::toString(buffer[numValuesInserted].first)));

Check warning on line 51 in src/processor/operator/persistent/index_builder.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/operator/persistent/index_builder.cpp#L51

Added line #L51 was not covered by tests
}
}
}
return;
},
std::move(queues));
}

Expand Down
Loading
Loading