Skip to content

Commit

Permalink
Change hash index concurrent build
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed May 24, 2023
1 parent ad3741e commit 2e486f2
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 25 deletions.
2 changes: 1 addition & 1 deletion src/binder/bound_statement_result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace binder {

std::unique_ptr<BoundStatementResult> BoundStatementResult::createSingleStringColumnResult() {
auto result = std::make_unique<BoundStatementResult>();
auto columnName = std::string("outputMsg");
auto columnName = std::string("result");
auto value = std::make_unique<common::Value>(columnName);
auto stringColumn = std::make_shared<LiteralExpression>(std::move(value), columnName);
result->addColumn(stringColumn, expression_vector{stringColumn});
Expand Down
16 changes: 5 additions & 11 deletions src/include/storage/index/hash_index_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,6 @@ class HashIndexBuilder : public BaseHashIndex {
uint32_t allocatePSlots(uint32_t numSlotsToAllocate);
uint32_t allocateAOSlot();

inline void lockSlot(SlotInfo& slotInfo) {
assert(slotInfo.slotType == SlotType::PRIMARY);
std::shared_lock sLck{pSlotSharedMutex};
pSlotsMutexes[slotInfo.slotId]->lock();
}
inline void unlockSlot(const SlotInfo& slotInfo) {
assert(slotInfo.slotType == SlotType::PRIMARY);
std::shared_lock sLck{pSlotSharedMutex};
pSlotsMutexes[slotInfo.slotId]->unlock();
}

private:
std::unique_ptr<FileHandle> fileHandle;
std::unique_ptr<InMemDiskArrayBuilder<HashIndexHeader>> headerArray;
Expand Down Expand Up @@ -151,6 +140,10 @@ class PrimaryKeyIndexBuilder {
}
}

inline void lock() { mtx.lock(); }

inline void unlock() { mtx.unlock(); }

inline void bulkReserve(uint32_t numEntries) {
keyDataTypeID == common::LogicalTypeID::INT64 ?
hashIndexBuilderForInt64->bulkReserve(numEntries) :
Expand Down Expand Up @@ -189,6 +182,7 @@ class PrimaryKeyIndexBuilder {
}

private:
std::mutex mtx;
common::LogicalTypeID keyDataTypeID;
std::unique_ptr<HashIndexBuilder<int64_t>> hashIndexBuilderForInt64;
std::unique_ptr<HashIndexBuilder<common::ku_string_t>> hashIndexBuilderForString;
Expand Down
2 changes: 2 additions & 0 deletions src/storage/copier/node_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ void NodeCopier::populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* ove
}
}
// No nulls, so we can populate the index with actual values.
pkIndex->lock();
switch (chunk->getDataType().getLogicalTypeID()) {
case LogicalTypeID::INT64: {
appendToPKIndex<int64_t>(chunk, startOffset, numValues);
Expand All @@ -112,6 +113,7 @@ void NodeCopier::populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* ove
throw CopyException("Primary key must be either INT64, STRING or SERIAL.");
}
}
pkIndex->unlock();
}

template<>
Expand Down
2 changes: 0 additions & 2 deletions src/storage/copier/node_copy_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ static column_id_t getPKColumnID(
}

void NodeCopyExecutor::populateColumns(processor::ExecutionContext* executionContext) {
logger->info("Populating properties");
auto primaryKey = reinterpret_cast<NodeTableSchema*>(tableSchema)->getPrimaryKey();
std::unique_ptr<PrimaryKeyIndexBuilder> pkIndex;
if (primaryKey.dataType.getLogicalTypeID() != common::LogicalTypeID::SERIAL) {
Expand Down Expand Up @@ -80,7 +79,6 @@ void NodeCopyExecutor::populateColumns(processor::ExecutionContext* executionCon
for (auto& task : tasks) {
taskScheduler.scheduleTaskAndWaitOrError(task, executionContext);
}
logger->info("Done populating properties, constructing the pk index.");
}

} // namespace storage
Expand Down
11 changes: 0 additions & 11 deletions src/storage/index/hash_index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,10 @@ bool HashIndexBuilder<T>::appendInternal(const uint8_t* key, offset_t value) {
SlotInfo pSlotInfo{getPrimarySlotIdForKey(*indexHeader, key), SlotType::PRIMARY};
auto currentSlotInfo = pSlotInfo;
Slot<T>* currentSlot = nullptr;
lockSlot(pSlotInfo);
while (currentSlotInfo.slotType == SlotType::PRIMARY || currentSlotInfo.slotId != 0) {
currentSlot = getSlot(currentSlotInfo);
if (lookupOrExistsInSlotWithoutLock<false /* exists */>(currentSlot, key)) {
// Key already exists. No append is allowed.
unlockSlot(pSlotInfo);
return false;
}
if (currentSlot->header.numEntries < HashIndexConstants::SLOT_CAPACITY) {
Expand All @@ -78,7 +76,6 @@ bool HashIndexBuilder<T>::appendInternal(const uint8_t* key, offset_t value) {
}
assert(currentSlot);
insertToSlotWithoutLock(currentSlot, key, value);
unlockSlot(pSlotInfo);
numEntries.fetch_add(1);
return true;
}
Expand All @@ -101,20 +98,14 @@ bool HashIndexBuilder<T>::lookupInternalWithoutLock(const uint8_t* key, offset_t

template<typename T>
uint32_t HashIndexBuilder<T>::allocatePSlots(uint32_t numSlotsToAllocate) {
std::unique_lock xLock{pSlotSharedMutex};
auto oldNumSlots = pSlots->getNumElements();
auto newNumSlots = oldNumSlots + numSlotsToAllocate;
pSlots->resize(newNumSlots, true /* setToZero */);
pSlotsMutexes.resize(newNumSlots);
for (auto i = oldNumSlots; i < newNumSlots; i++) {
pSlotsMutexes[i] = std::make_unique<std::mutex>();
}
return oldNumSlots;
}

template<typename T>
uint32_t HashIndexBuilder<T>::allocateAOSlot() {
std::unique_lock xLock{oSlotsSharedMutex};
auto oldNumSlots = oSlots->getNumElements();
auto newNumSlots = oldNumSlots + 1;
oSlots->resize(newNumSlots, true /* setToZero */);
Expand All @@ -124,10 +115,8 @@ uint32_t HashIndexBuilder<T>::allocateAOSlot() {
template<typename T>
Slot<T>* HashIndexBuilder<T>::getSlot(const SlotInfo& slotInfo) {
if (slotInfo.slotType == SlotType::PRIMARY) {
std::shared_lock sLck{pSlotSharedMutex};
return &pSlots->operator[](slotInfo.slotId);
} else {
std::shared_lock sLck{oSlotsSharedMutex};
return &oSlots->operator[](slotInfo.slotId);
}
}
Expand Down

0 comments on commit 2e486f2

Please sign in to comment.