-
Notifications
You must be signed in to change notification settings - Fork 94
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Use a lockfree data structure to store page states
- Loading branch information
1 parent
3b0ce25
commit fa40928
Showing
3 changed files
with
128 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
#pragma once | ||
|
||
#include <array> | ||
#include <cstdint> | ||
#include <memory> | ||
#include <vector> | ||
|
||
#include "common/assert.h" | ||
#include "common/constants.h" | ||
|
||
namespace kuzu { | ||
namespace common { | ||
|
||
template<typename T, uint64_t BLOCK_SIZE = DEFAULT_VECTOR_CAPACITY, | ||
uint64_t INDEX_SIZE = BLOCK_SIZE> | ||
// Vector which doesn't move when resizing | ||
// The initial size is fixed, and new elements are added in fixed sized blocks which indexed and the | ||
// indices are chained in a linked list. Currently only one thread can write concurrently, but any | ||
// number of threads can read, even when the vector is being written to. | ||
// | ||
// Accessing elements which existed when the vector was created is as fast as possible, requiring | ||
// just one comparison and one pointer reads, and accessing new elements is still reasonably fast, | ||
// usually requiring reading just two pointers, with a small amount of arithmetic, or maybe more if | ||
// an extremely large number of elements has been added | ||
// (access cost increases every BLOCK_SIZE * INDEX_SIZE elements). | ||
class ConcurrentVector { | ||
public: | ||
explicit ConcurrentVector(uint64_t initialNumElements, uint64_t initialBlockSize) | ||
: numElements{initialNumElements}, initialBlock{std::make_unique<T[]>(initialBlockSize)}, | ||
initialBlockSize{initialBlockSize}, firstIndex{nullptr} {} | ||
// resize never deallocates memory | ||
// Not thread-safe | ||
// It could be made to be thread-safe by storing the size atomically and doing compare and swap | ||
// when adding new indices and blocks | ||
void resize(uint64_t newSize) { | ||
while (newSize > initialBlockSize + blocks.size() * BLOCK_SIZE) { | ||
auto newBlock = std::make_unique<Block>(); | ||
if (indices.empty()) { | ||
auto index = std::make_unique<BlockIndex>(); | ||
index->blocks[0] = newBlock.get(); | ||
index->numBlocks = 1; | ||
firstIndex = index.get(); | ||
indices.push_back(std::move(index)); | ||
} else if (indices.back()->numBlocks < INDEX_SIZE) { | ||
auto& index = indices.back(); | ||
index->blocks[index->numBlocks] = newBlock.get(); | ||
index->numBlocks++; | ||
} else { | ||
KU_ASSERT(indices.back()->numBlocks == INDEX_SIZE); | ||
auto index = std::make_unique<BlockIndex>(); | ||
index->blocks[0] = newBlock.get(); | ||
index->numBlocks = 1; | ||
indices.back()->nextIndex = index.get(); | ||
indices.push_back(std::move(index)); | ||
} | ||
blocks.push_back(std::move(newBlock)); | ||
} | ||
numElements = newSize; | ||
} | ||
|
||
void push_back(T&& value) { | ||
auto index = numElements; | ||
resize(numElements + 1); | ||
(*this)[index] = std::move(value); | ||
} | ||
|
||
T& operator[](uint64_t elemPos) { | ||
if (elemPos < initialBlockSize) { | ||
return initialBlock[elemPos]; | ||
} else { | ||
auto blockNum = (elemPos - initialBlockSize) / BLOCK_SIZE; | ||
auto posInBlock = (elemPos - initialBlockSize) % BLOCK_SIZE; | ||
auto indexNum = blockNum / INDEX_SIZE; | ||
BlockIndex* index = firstIndex; | ||
KU_ASSERT(index != nullptr); | ||
while (indexNum > 0) { | ||
KU_ASSERT(index->nextIndex != nullptr); | ||
index = index->nextIndex; | ||
indexNum--; | ||
} | ||
return index->blocks[blockNum % INDEX_SIZE]->data[posInBlock]; | ||
} | ||
} | ||
|
||
uint64_t size() { return numElements; } | ||
|
||
private: | ||
uint64_t numElements; | ||
std::unique_ptr<T[]> initialBlock; | ||
uint64_t initialBlockSize; | ||
struct Block { | ||
std::array<T, BLOCK_SIZE> data; | ||
}; | ||
struct BlockIndex { | ||
BlockIndex() : nextIndex{nullptr}, blocks{}, numBlocks{0} {} | ||
BlockIndex* nextIndex; | ||
std::array<Block*, INDEX_SIZE> blocks; | ||
uint64_t numBlocks; | ||
}; | ||
BlockIndex* firstIndex; | ||
std::vector<std::unique_ptr<Block>> blocks; | ||
std::vector<std::unique_ptr<BlockIndex>> indices; | ||
}; | ||
|
||
} // namespace common | ||
} // namespace kuzu |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters