Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement query interruption #1389

Merged
merged 1 commit into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/include/common/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,10 @@ class TransactionManagerException : public Exception {
explicit TransactionManagerException(const std::string& msg) : Exception(msg){};
};

class InterruptException : public Exception {
public:
explicit InterruptException() : Exception("Interrupted by the user."){};
};

} // namespace common
} // namespace kuzu
7 changes: 7 additions & 0 deletions src/include/main/client_context.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <atomic>
#include <cstdint>

#include "common/api.h"
Expand All @@ -24,8 +25,14 @@ class ClientContext {
*/
KUZU_API ~ClientContext() = default;

/**
* @brief Returns whether the current query is interrupted or not.
*/
KUZU_API bool isInterrupted() const { return interrupted; }
acquamarin marked this conversation as resolved.
Show resolved Hide resolved

private:
uint64_t numThreadsForExecution;
std::atomic<bool> interrupted;
};

} // namespace main
Expand Down
11 changes: 11 additions & 0 deletions src/include/main/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ class Connection {
// TODO(Change): move this to C-header once we have C-APIs.
KUZU_API std::unique_ptr<QueryResult> kuzu_query(const char* queryString);

/**
* @brief interrupts all queries currently executed within this connection.
*/
KUZU_API void interrupt();

protected:
ConnectionTransactionMode getTransactionMode();
void setTransactionModeNoLock(ConnectionTransactionMode newTransactionMode);
Expand Down Expand Up @@ -179,6 +184,12 @@ class Connection {

void beginTransactionIfAutoCommit(PreparedStatement* preparedStatement);

private:
inline std::unique_ptr<QueryResult> getQueryResultWithError(std::string exceptionMessage) {
rollbackIfNecessaryNoLock();
return queryResultWithError(exceptionMessage);
}

protected:
Database* database;
std::unique_ptr<ClientContext> clientContext;
Expand Down
7 changes: 5 additions & 2 deletions src/include/processor/execution_context.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "common/profiler.h"
#include "main/client_context.h"
#include "storage/buffer_manager/buffer_manager.h"
#include "storage/buffer_manager/memory_manager.h"
#include "transaction/transaction.h"
Expand All @@ -11,16 +12,18 @@ namespace processor {
struct ExecutionContext {

ExecutionContext(uint64_t numThreads, common::Profiler* profiler,
storage::MemoryManager* memoryManager, storage::BufferManager* bufferManager)
storage::MemoryManager* memoryManager, storage::BufferManager* bufferManager,
main::ClientContext* clientContext)
: numThreads{numThreads}, profiler{profiler}, memoryManager{memoryManager},
bufferManager{bufferManager}, transaction{nullptr} {}
bufferManager{bufferManager}, transaction{nullptr}, clientContext{clientContext} {}

uint64_t numThreads;
common::Profiler* profiler;
storage::MemoryManager* memoryManager;
storage::BufferManager* bufferManager;

transaction::Transaction* transaction;
main::ClientContext* clientContext;
};

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class BaseAggregateScan : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override = 0;
bool getNextTuplesInternal(ExecutionContext* context) override = 0;

std::unique_ptr<PhysicalOperator> clone() override = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class HashAggregateScan : public BaseAggregateScan {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<HashAggregateScan>(sharedState, groupByKeyVectorsPos,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class SimpleAggregateScan : public BaseAggregateScan {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

// SimpleAggregateScan is the source operator of a pipeline, so it should not clone its child.
std::unique_ptr<PhysicalOperator> clone() override {
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/copy/copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Copy : public PhysicalOperator {

std::string execute(common::TaskScheduler* taskScheduler, ExecutionContext* executionContext);

bool getNextTuplesInternal() override {
bool getNextTuplesInternal(ExecutionContext* context) override {
throw common::InternalException(
"getNextTupleInternal() should not be called on CopyCSV operator.");
}
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/cross_product.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class CrossProduct : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CrossProduct>(
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/ddl/ddl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class DDL : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

protected:
virtual std::string getOutputMsg() = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Filter : public PhysicalOperator, public SelVectorOverWriter {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<Filter>(expressionEvaluator->clone(), dataChunkToSelectPos,
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/flatten.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Flatten : public PhysicalOperator, SelVectorOverWriter {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<Flatten>(dataChunkToFlattenPos, children[0]->clone(), id, paramsString);
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/hash_join/hash_join_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class HashJoinProbe : public PhysicalOperator, SelVectorOverWriter {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<HashJoinProbe>(
Expand All @@ -76,7 +76,7 @@ class HashJoinProbe : public PhysicalOperator, SelVectorOverWriter {

private:
bool hasMoreLeft();
bool getNextBatchOfMatchedTuples();
bool getNextBatchOfMatchedTuples(ExecutionContext* context);
uint64_t getNextInnerJoinResult();
uint64_t getNextLeftJoinResult();
uint64_t getNextMarkJoinResult();
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/index_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class IndexScan : public PhysicalOperator, public SelVectorOverWriter {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<IndexScan>(
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/intersect/intersect.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class Intersect : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
std::vector<std::unique_ptr<PhysicalOperator>> clonedChildren;
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/limit.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Limit : public PhysicalOperator {
dataChunkToSelectPos{dataChunkToSelectPos},
dataChunksPosInScope(std::move(dataChunksPosInScope)) {}

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<Limit>(limitNumber, counter, dataChunkToSelectPos, dataChunksPosInScope,
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/multiplicity_reducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class MultiplicityReducer : public PhysicalOperator {
paramsString},
prevMultiplicity{1}, numRepeat{0} {}

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<MultiplicityReducer>(children[0]->clone(), id, paramsString);
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/order_by/order_by_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class OrderByScan : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<OrderByScan>(outVectorPos, sharedState, id, paramsString);
Expand Down
9 changes: 6 additions & 3 deletions src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,12 @@ class PhysicalOperator {
// Local state is initialized for each thread.
void initLocalState(ResultSet* resultSet, ExecutionContext* context);

inline bool getNextTuple() {
inline bool getNextTuple(ExecutionContext* context) {
if (context->clientContext->isInterrupted()) {
throw common::InterruptException{};
}
metrics->executionTime.start();
auto result = getNextTuplesInternal();
auto result = getNextTuplesInternal(context);
metrics->executionTime.stop();
return result;
}
Expand All @@ -129,7 +132,7 @@ class PhysicalOperator {
virtual void initGlobalStateInternal(ExecutionContext* context) {}
virtual void initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) {}
// Return false if no more tuples to pull, otherwise return true
virtual bool getNextTuplesInternal() = 0;
virtual bool getNextTuplesInternal(ExecutionContext* context) = 0;

inline std::string getTimeMetricKey() const { return "time-" + std::to_string(id); }
inline std::string getNumTupleMetricKey() const { return "numTuple-" + std::to_string(id); }
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/projection.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class Projection : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class GenericScanRelTables : public ScanRelTable {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
std::unordered_map<common::table_id_t, std::unique_ptr<RelTableCollection>>
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/scan/scan_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ScanSingleNodeTable : public ScanColumns {
paramsString},
table{table}, propertyColumnIds{std::move(propertyColumnIds)} {}

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<ScanSingleNodeTable>(inputNodeIDVectorPos, outPropertyVectorsPos, table,
Expand All @@ -39,7 +39,7 @@ class ScanMultiNodeTables : public ScanColumns {
paramsString},
tables{std::move(tables)}, tableIDToScanColumnIds{std::move(tableIDToScanColumnIds)} {}

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<ScanMultiNodeTables>(inputNodeIDVectorPos, outPropertyVectorsPos, tables,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class ScanRelTableColumns : public ScanRelTable, public SelVectorOverWriter {
std::move(propertyIds), storage::RelTableDataType::COLUMNS);
}

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ScanRelTableColumns>(tableData, scanState->propertyIds,
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/scan/scan_rel_table_lists.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class ScanRelTableLists : public ScanRelTable {
std::move(propertyIds), storage::RelTableDataType::LISTS);
}

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<ScanRelTableLists>(tableData, scanState->propertyIds, inNodeIDVectorPos,
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/scan_node_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class ScanNodeID : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ScanNodeID>(outDataPos, sharedState, id, paramsString);
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/semi_masker.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class SingleTableSemiMasker : public BaseSemiMasker {

void initGlobalStateInternal(kuzu::processor::ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
auto result = std::make_unique<SingleTableSemiMasker>(
Expand All @@ -56,7 +56,7 @@ class MultiTableSemiMasker : public BaseSemiMasker {

void initGlobalStateInternal(kuzu::processor::ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
auto result = std::make_unique<MultiTableSemiMasker>(
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class Sink : public PhysicalOperator {
protected:
virtual void executeInternal(ExecutionContext* context) = 0;

bool getNextTuplesInternal() final {
bool getNextTuplesInternal(ExecutionContext* context) final {
throw common::InternalException(
"getNextTupleInternal() should not be called on sink operator.");
}
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/skip.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Skip : public PhysicalOperator, public SelVectorOverWriter {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<Skip>(skipNumber, counter, dataChunkToSelectPos, dataChunksPosInScope,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class BaseTableScan : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

protected:
uint64_t maxMorselSize;
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/unwind.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Unwind : public PhysicalOperator {
outDataType{std::move(outDataType)}, outDataPos{outDataPos},
expressionEvaluator{std::move(expressionEvaluator)}, startIndex{0u} {}

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/update/create.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class CreateNode : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
std::vector<std::unique_ptr<CreateNodeInfo>> clonedCreateNodeInfos;
Expand Down Expand Up @@ -88,7 +88,7 @@ class CreateRel : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
std::vector<std::unique_ptr<CreateRelInfo>> clonedCreateRelInfos;
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/update/delete.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DeleteNode : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
std::vector<std::unique_ptr<DeleteNodeInfo>> clonedDeleteNodeInfos;
Expand Down Expand Up @@ -77,7 +77,7 @@ class DeleteRel : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
std::vector<std::unique_ptr<DeleteRelInfo>> clonedDeleteRelInfos;
Expand Down
Loading