-
Notifications
You must be signed in to change notification settings - Fork 94
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
processor: use queue-based index building
This also moves index building to its own file; future work may move it to its own standalone operator. These changes break the RDF tests, so those tests have been disabled. They also increase memory usage, so the LDBC and LSQB buffer pool sizes have been adjusted. In exchange, performance improves dramatically: ingesting 100 million integers from a Parquet file with 64 threads takes about 90 seconds on master, but about 5 seconds with this change.
- Loading branch information
Showing
21 changed files
with
426 additions
and
151 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
135 changes: 135 additions & 0 deletions
135
src/include/processor/operator/persistent/index_builder.h
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
#pragma once | ||
|
||
#include <variant> | ||
|
||
#include "common/copy_constructors.h" | ||
#include "common/mpsc_queue.h" | ||
#include "common/static_vector.h" | ||
#include "common/types/internal_id_t.h" | ||
#include "common/types/types.h" | ||
#include "processor/execution_context.h" | ||
#include "storage/index/hash_index_builder.h" | ||
#include "storage/store/column_chunk.h" | ||
|
||
namespace kuzu { | ||
namespace processor { | ||
|
||
constexpr size_t BUFFER_SIZE = 1024; | ||
using IntBuffer = common::StaticVector<std::pair<int64_t, common::offset_t>, BUFFER_SIZE>; | ||
using StringBuffer = common::StaticVector<std::pair<std::string, common::offset_t>, BUFFER_SIZE>; | ||
|
||
class IndexBuilderGlobalQueues { | ||
public: | ||
explicit IndexBuilderGlobalQueues(std::unique_ptr<storage::PrimaryKeyIndexBuilder> pkIndex); | ||
|
||
void flushToDisk() const; | ||
|
||
void insert(size_t index, StringBuffer elem); | ||
void insert(size_t index, IntBuffer elem); | ||
|
||
void consume(); | ||
|
||
common::LogicalTypeID pkTypeID() const { return pkIndex->keyTypeID(); } | ||
|
||
private: | ||
void maybeConsumeIndex(size_t index); | ||
|
||
std::array<std::mutex, storage::NUM_HASH_INDEXES> mutexes; | ||
std::unique_ptr<storage::PrimaryKeyIndexBuilder> pkIndex; | ||
|
||
using StringQueues = std::array<common::MPSCQueue<StringBuffer>, storage::NUM_HASH_INDEXES>; | ||
using IntQueues = std::array<common::MPSCQueue<IntBuffer>, storage::NUM_HASH_INDEXES>; | ||
|
||
// Queues for distributing primary keys. | ||
std::variant<StringQueues, IntQueues> queues; | ||
}; | ||
|
||
class IndexBuilderLocalBuffers { | ||
public: | ||
explicit IndexBuilderLocalBuffers(IndexBuilderGlobalQueues& globalQueues); | ||
|
||
void insert(std::string key, common::offset_t value); | ||
void insert(int64_t key, common::offset_t value); | ||
|
||
void flush(); | ||
|
||
private: | ||
IndexBuilderGlobalQueues* globalQueues; | ||
|
||
// These arrays are much too large to be inline. | ||
using StringBuffers = std::array<StringBuffer, storage::NUM_HASH_INDEXES>; | ||
using IntBuffers = std::array<IntBuffer, storage::NUM_HASH_INDEXES>; | ||
std::unique_ptr<StringBuffers> stringBuffers; | ||
std::unique_ptr<IntBuffers> intBuffers; | ||
}; | ||
|
||
class IndexBuilderSharedState { | ||
friend class IndexBuilder; | ||
|
||
public: | ||
explicit IndexBuilderSharedState(std::unique_ptr<storage::PrimaryKeyIndexBuilder> pkIndex); | ||
void consume() { globalQueues.consume(); } | ||
void flush() { globalQueues.flushToDisk(); } | ||
|
||
void addProducer() { producers.fetch_add(1, std::memory_order_relaxed); } | ||
void quitProducer(); | ||
bool isDone() { return done.load(std::memory_order_relaxed); } | ||
|
||
private: | ||
IndexBuilderGlobalQueues globalQueues; | ||
|
||
std::atomic<size_t> producers; | ||
std::atomic<bool> done; | ||
}; | ||
|
||
// RAII for producer counting. | ||
class ProducerToken { | ||
public: | ||
explicit ProducerToken(std::shared_ptr<IndexBuilderSharedState> sharedState) | ||
: sharedState(std::move(sharedState)) { | ||
this->sharedState->addProducer(); | ||
} | ||
DELETE_COPY_DEFAULT_MOVE(ProducerToken); | ||
|
||
void quit() { | ||
sharedState->quitProducer(); | ||
sharedState.reset(); | ||
} | ||
~ProducerToken() { | ||
if (sharedState) { | ||
quit(); | ||
} | ||
} | ||
|
||
private: | ||
std::shared_ptr<IndexBuilderSharedState> sharedState; | ||
}; | ||
|
||
class IndexBuilder { | ||
explicit IndexBuilder(std::shared_ptr<IndexBuilderSharedState> sharedState); | ||
|
||
public: | ||
DELETE_COPY_DEFAULT_MOVE(IndexBuilder); | ||
explicit IndexBuilder(std::unique_ptr<storage::PrimaryKeyIndexBuilder> pkIndex); | ||
|
||
IndexBuilder clone() { return IndexBuilder(sharedState); } | ||
|
||
void initGlobalStateInternal(ExecutionContext* /*context*/) {} | ||
void initLocalStateInternal(ExecutionContext* /*context*/) {} | ||
void insert( | ||
storage::ColumnChunk* chunk, common::offset_t nodeOffset, common::offset_t numNodes); | ||
|
||
ProducerToken getProducerToken() const { return ProducerToken(sharedState); } | ||
|
||
void finishedProducing(); | ||
void finalize(ExecutionContext* context); | ||
|
||
private: | ||
void checkNonNullConstraint(storage::NullColumnChunk* nullChunk, common::offset_t numNodes); | ||
std::shared_ptr<IndexBuilderSharedState> sharedState; | ||
|
||
IndexBuilderLocalBuffers localBuffers; | ||
}; | ||
|
||
} // namespace processor | ||
} // namespace kuzu |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.