Skip to content

Commit

Permalink
Fixed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger committed Mar 22, 2024
1 parent 388d259 commit 6b4d510
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 32 deletions.
1 change: 1 addition & 0 deletions src/include/storage/index/hash_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class HashIndex final : public OnDiskHashIndex {
// TODO: If locking is removed from the disk arrays, then many of these functions could be const
std::string toString(transaction::TransactionType trxType);
void validateEntries(transaction::TransactionType trxType);
// Debug helper: asserts that the entries held in bulkInsertLocalStorage can be looked up in the
// persistent index with the same value.
// Only meaningful if called after mergeBulkInserts and before checkpointing.
void validateBulkInserts();

// Resizes the local storage to support the given number of new entries
inline void bulkReserve(uint64_t /*newEntries*/) override {
Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/index/hash_index_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ class HashIndexBuilder final {

// Returns the entry count recorded in the index header (indexHeader->numEntries).
uint64_t size() { return this->indexHeader->numEntries; }

// Calls func once for every valid entry in the builder, following each primary slot's chain.
// The arguments are the primary slot id the chain starts from, the fingerprint stored for the
// entry, and a copy of the entry itself.
void forEach(std::function<void(slot_id_t, uint8_t, SlotEntry<T>)> func);
// Returns a string listing every valid entry as "(key, value) ", grouped under a
// "Slot <id>: " heading per primary slot.
std::string toString();

// Assumes that space has already been allocated for the entry
bool appendInternal(Key key, common::offset_t value, common::hash_t hash);
Slot<T>* getSlot(const SlotInfo& slotInfo);
Expand Down
6 changes: 3 additions & 3 deletions src/include/storage/storage_structure/disk_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ class InMemDiskArrayBuilderInternal : public BaseInMemDiskArray {

void saveToDisk();

inline uint64_t getNumElements() { return header.numElements; }
inline uint64_t getNumElements() const { return header.numElements; }

private:
inline uint64_t getNumArrayPagesNeededForElements(uint64_t numElements) {
Expand All @@ -536,8 +536,8 @@ class InMemDiskArrayBuilder {
diskArray.resize(newNumElements, setToZero);
}

inline uint64_t getNumElements() { return diskArray.getNumElements(); }
inline uint64_t size() { return diskArray.getNumElements(); }
inline uint64_t getNumElements() const { return diskArray.getNumElements(); }
inline uint64_t size() const { return diskArray.getNumElements(); }

inline void saveToDisk() { diskArray.saveToDisk(); }

Expand Down
80 changes: 51 additions & 29 deletions src/storage/index/hash_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,37 @@ void HashIndex<T>::prepareCommit() {
}
}

// Checks that everything in the bulkInsertLocalStorage can be found in the index, and that the
// value stored in the index matches the value held locally.
// Only meaningful if called after mergeBulkInserts and before checkpointing.
//
// Every slot in each chain is checked: the primary slot first, then each chained slot reached
// through nextChainedSlot. Failures are reported through KU_ASSERT.
template<typename T>
void HashIndex<T>::validateBulkInserts() {
    for (uint64_t slotId = 0; slotId < bulkInsertLocalStorage.pSlots.size(); slotId++) {
        auto localSlot =
            typename HashIndexBuilder<T>::SlotIterator(slotId, &bulkInsertLocalStorage);
        // do/while so that the primary slot is validated before the iterator is advanced,
        // and each chained slot is validated as soon as it becomes the current slot.
        do {
            for (auto entryPos = 0u; entryPos < getSlotCapacity<T>(); entryPos++) {
                if (!localSlot.slot->header.isEntryValid(entryPos)) {
                    continue;
                }
                // Initialised so the comparison below never reads an indeterminate value.
                offset_t result = 0;
                if constexpr (std::same_as<T, ku_string_t>) {
                    // String keys are stored indirectly; read the full string back before
                    // looking it up.
                    auto string = overflowFileHandle->readString(
                        TransactionType::WRITE, localSlot.slot->entries[entryPos].key);
                    if (!lookupInPersistentIndex(TransactionType::WRITE, string, result)) {
                        KU_ASSERT(false);
                    }
                } else {
                    if (!lookupInPersistentIndex(TransactionType::WRITE,
                            localSlot.slot->entries[entryPos].key, result)) {
                        KU_ASSERT(false);
                    }
                }
                KU_ASSERT(result == localSlot.slot->entries[entryPos].value);
            }
        } while (bulkInsertLocalStorage.nextChainedSlot(localSlot));
    }
}

template<typename T>
void HashIndex<T>::prepareRollback() {
if (localStorage->hasUpdates()) {
Expand Down Expand Up @@ -352,9 +383,7 @@ void HashIndex<T>::rehashSlots(HashIndexHeader& header) {
hash_t hash = this->hashStored(TransactionType::WRITE, key);
auto fingerprint = HashIndexUtils::getFingerprintForHash(hash);
auto newSlotId = hash & header.higherLevelHashMask;
if (newSlotId >= pSlots->getNumElements(TransactionType::WRITE)) {
KU_ASSERT(false);
}
KU_ASSERT(newSlotId < pSlots->getNumElements(TransactionType::WRITE));
copyEntryToSlot(newSlotId, key, fingerprint);
}
}
Expand Down Expand Up @@ -455,10 +484,6 @@ void HashIndex<T>::mergeBulkInserts() {
// those one at a time into the disk slots. That will keep the low memory requirements and still
// let us update each on-disk slot one at a time.

// TODO: Also use an iterator for the overflow disk slot in case they are adjacent. It should be
// possible to support backwards seeking That will also handle updates, but it may be difficult
// to also handle appends (ideally we can append to the current cached frame if possible, and
// update the header just once at the end in the destructor instead of after each pushback
// TODO: This will write to the WAL file for every slot visited
// If the primary slot is already full, it will not need to be written,
// but we would only want to skip the write if no primary slots on the page are modified
Expand All @@ -479,47 +504,44 @@ void HashIndex<T>::mergeBulkInserts() {
// builder
continue;
}
// Skip slots which were empty in the local storage
diskSlotIterator.seek(slotId);
slot_id_t diskEntryPos = 0u;
auto& diskSlot = *diskSlotIterator;
auto* diskSlot = &*diskSlotIterator;
// Merge slot from local storage to existing slot
// Values which need to be rehashed to a different slot get moved into local storage
do {
for (auto entryPos = 0u; entryPos < getSlotCapacity<T>(); entryPos++) {
if (localSlot.slot->header.isEntryValid(entryPos)) {
// Find the next empty entry, or add a new slot if there are no more entries
while (diskSlot.header.isEntryValid(diskEntryPos) ||
while (diskSlot->header.isEntryValid(diskEntryPos) ||
diskEntryPos >= getSlotCapacity<T>()) {
diskEntryPos++;
if (diskEntryPos >= getSlotCapacity<T>()) {
// If there are no more disk slots in this chain, we need to add one
// To avoid updating the slot twice, use the current number of overflow
// slots as the next index to be added.
if (diskSlot.header.nextOvfSlotId == 0) {
// If it's a new disk slot, then it will be written to the end of
// the overflow slots and we want the following slot id as the next
// slot id
diskSlot.header.nextOvfSlotId = diskOverflowSlotIterator.size();
if (diskSlot->header.nextOvfSlotId == 0) {
// If there are no more disk slots in this chain, we need to add one
diskSlot->header.nextOvfSlotId = diskOverflowSlotIterator.size();
// This may invalidate diskSlot
diskOverflowSlotIterator.pushBack();
} else {
diskOverflowSlotIterator.seek(diskSlot.header.nextOvfSlotId);
diskOverflowSlotIterator.seek(diskSlot->header.nextOvfSlotId);
}
diskSlot = *diskOverflowSlotIterator;
diskSlot = &*diskOverflowSlotIterator;
diskEntryPos = 0;
}
}
KU_ASSERT(diskEntryPos < getSlotCapacity<T>());
copyAndUpdateSlotHeader<const T&, true>(diskSlot, diskEntryPos,
copyAndUpdateSlotHeader<const T&, true>(*diskSlot, diskEntryPos,
localSlot.slot->entries[entryPos].key, UINT32_MAX,
localSlot.slot->header.fingerprints[entryPos]);
/*auto hash =
hashStored(TransactionType::WRITE, localSlot.slot->entries[entryPos].key);
auto primarySlot =
HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, hash);
if (primarySlot != slotId) {
KU_ASSERT(false);
}*/
KU_ASSERT([&]() {
auto hash = hashStored(
TransactionType::WRITE, localSlot.slot->entries[entryPos].key);
auto primarySlot =
HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, hash);
KU_ASSERT(localSlot.slot->header.fingerprints[entryPos] ==
HashIndexUtils::getFingerprintForHash(hash));
KU_ASSERT(primarySlot == slotId);
return true;
}());
indexHeaderForWriteTrx->numEntries++;
diskEntryPos++;
}
Expand Down
39 changes: 39 additions & 0 deletions src/storage/index/hash_index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,45 @@ bool HashIndexBuilder<ku_string_t>::equals(
}
}

// Visits every valid entry stored in the builder.
//
// For each primary slot, the slot and every slot chained after it are scanned in order. func is
// invoked once per valid entry with:
//   - the id of the primary slot the chain starts from (not the id of the chained slot),
//   - the fingerprint stored in the slot header for that entry,
//   - a copy of the entry.
template<typename T>
void HashIndexBuilder<T>::forEach(std::function<void(slot_id_t, uint8_t, SlotEntry<T>)> func) {
    // slot_id_t is the type handed to func, so the counter uses it too instead of a narrower
    // unsigned int being compared against pSlots.size().
    for (slot_id_t slotId = 0; slotId < pSlots.size(); slotId++) {
        // this is already a non-const pointer here; no cast is needed.
        auto iter = SlotIterator{slotId, this};
        do {
            // A zero validity mask means the slot holds no entries; skip straight to the next
            // slot in the chain.
            if (iter.slot->header.validityMask) {
                for (auto entryPos = 0u; entryPos < getSlotCapacity<T>(); entryPos++) {
                    if (iter.slot->header.isEntryValid(entryPos)) {
                        func(slotId, iter.slot->header.fingerprints[entryPos],
                            iter.slot->entries[entryPos]);
                    }
                }
            }
        } while (nextChainedSlot(iter));
    }
}

// Produces a human-readable dump of the builder's contents.
//
// Entries are emitted in forEach order as "(key, value) ". Whenever the primary slot id changes,
// a "\nSlot <id>: " heading is written first. String keys are read back through
// overflowFileHandle; all other keys are formatted with TypeUtils::toString.
template<typename T>
std::string HashIndexBuilder<T>::toString() {
    std::string output;
    std::optional<slot_id_t> lastSlotSeen;
    auto appendEntry = [&](slot_id_t slotId, uint8_t /*fingerprint*/, SlotEntry<T> entry) {
        const bool startsNewSlot = !lastSlotSeen.has_value() || *lastSlotSeen != slotId;
        if (startsNewSlot) {
            output += "\nSlot ";
            output += std::to_string(slotId);
            output += ": ";
            lastSlotSeen = slotId;
        }
        std::string keyText;
        if constexpr (std::same_as<T, ku_string_t>) {
            keyText =
                overflowFileHandle->readString(transaction::TransactionType::WRITE, entry.key);
        } else {
            keyText = TypeUtils::toString(entry.key);
        }
        output += "(";
        output += keyText;
        output += ", ";
        output += std::to_string(entry.value);
        output += ") ";
    };
    forEach(appendEntry);
    return output;
}

template class HashIndexBuilder<int64_t>;
template class HashIndexBuilder<int32_t>;
template class HashIndexBuilder<int16_t>;
Expand Down

0 comments on commit 6b4d510

Please sign in to comment.