Skip to content

Commit

Permalink
Merge pull request #1389 from kuzudb/interrupt
Browse files Browse the repository at this point in the history
Implement query interruption
  • Loading branch information
acquamarin committed Mar 21, 2023
2 parents 93dda97 + f046975 commit b5805d6
Show file tree
Hide file tree
Showing 76 changed files with 182 additions and 124 deletions.
5 changes: 5 additions & 0 deletions src/include/common/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,10 @@ class TransactionManagerException : public Exception {
explicit TransactionManagerException(const std::string& msg) : Exception(msg){};
};

class InterruptException : public Exception {
public:
explicit InterruptException() : Exception("Interrupted by the user."){};
};

} // namespace common
} // namespace kuzu
7 changes: 7 additions & 0 deletions src/include/main/client_context.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <atomic>
#include <cstdint>

#include "common/api.h"
Expand All @@ -24,8 +25,14 @@ class ClientContext {
*/
KUZU_API ~ClientContext() = default;

/**
* @brief Returns whether the current query is interrupted or not.
*/
KUZU_API bool isInterrupted() const { return interrupted; }

private:
uint64_t numThreadsForExecution;
std::atomic<bool> interrupted;
};

} // namespace main
Expand Down
11 changes: 11 additions & 0 deletions src/include/main/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ class Connection {
// TODO(Change): move this to C-header once we have C-APIs.
KUZU_API std::unique_ptr<QueryResult> kuzu_query(const char* queryString);

/**
* @brief interrupts all queries currently executed within this connection.
*/
KUZU_API void interrupt();

protected:
ConnectionTransactionMode getTransactionMode();
void setTransactionModeNoLock(ConnectionTransactionMode newTransactionMode);
Expand Down Expand Up @@ -179,6 +184,12 @@ class Connection {

void beginTransactionIfAutoCommit(PreparedStatement* preparedStatement);

private:
inline std::unique_ptr<QueryResult> getQueryResultWithError(std::string exceptionMessage) {
rollbackIfNecessaryNoLock();
return queryResultWithError(exceptionMessage);
}

protected:
Database* database;
std::unique_ptr<ClientContext> clientContext;
Expand Down
7 changes: 5 additions & 2 deletions src/include/processor/execution_context.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "common/profiler.h"
#include "main/client_context.h"
#include "storage/buffer_manager/buffer_manager.h"
#include "storage/buffer_manager/memory_manager.h"
#include "transaction/transaction.h"
Expand All @@ -11,16 +12,18 @@ namespace processor {
struct ExecutionContext {

ExecutionContext(uint64_t numThreads, common::Profiler* profiler,
storage::MemoryManager* memoryManager, storage::BufferManager* bufferManager)
storage::MemoryManager* memoryManager, storage::BufferManager* bufferManager,
main::ClientContext* clientContext)
: numThreads{numThreads}, profiler{profiler}, memoryManager{memoryManager},
bufferManager{bufferManager}, transaction{nullptr} {}
bufferManager{bufferManager}, transaction{nullptr}, clientContext{clientContext} {}

uint64_t numThreads;
common::Profiler* profiler;
storage::MemoryManager* memoryManager;
storage::BufferManager* bufferManager;

transaction::Transaction* transaction;
main::ClientContext* clientContext;
};

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class BaseAggregateScan : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override = 0;
bool getNextTuplesInternal(ExecutionContext* context) override = 0;

std::unique_ptr<PhysicalOperator> clone() override = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class HashAggregateScan : public BaseAggregateScan {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<HashAggregateScan>(sharedState, groupByKeyVectorsPos,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class SimpleAggregateScan : public BaseAggregateScan {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

// SimpleAggregateScan is the source operator of a pipeline, so it should not clone its child.
std::unique_ptr<PhysicalOperator> clone() override {
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/copy/copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Copy : public PhysicalOperator {

std::string execute(common::TaskScheduler* taskScheduler, ExecutionContext* executionContext);

bool getNextTuplesInternal() override {
bool getNextTuplesInternal(ExecutionContext* context) override {
throw common::InternalException(
"getNextTupleInternal() should not be called on CopyCSV operator.");
}
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/cross_product.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class CrossProduct : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CrossProduct>(
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/ddl/ddl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class DDL : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

protected:
virtual std::string getOutputMsg() = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Filter : public PhysicalOperator, public SelVectorOverWriter {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<Filter>(expressionEvaluator->clone(), dataChunkToSelectPos,
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/flatten.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Flatten : public PhysicalOperator, SelVectorOverWriter {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<Flatten>(dataChunkToFlattenPos, children[0]->clone(), id, paramsString);
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/hash_join/hash_join_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class HashJoinProbe : public PhysicalOperator, SelVectorOverWriter {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<HashJoinProbe>(
Expand All @@ -76,7 +76,7 @@ class HashJoinProbe : public PhysicalOperator, SelVectorOverWriter {

private:
bool hasMoreLeft();
bool getNextBatchOfMatchedTuples();
bool getNextBatchOfMatchedTuples(ExecutionContext* context);
uint64_t getNextInnerJoinResult();
uint64_t getNextLeftJoinResult();
uint64_t getNextMarkJoinResult();
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/index_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class IndexScan : public PhysicalOperator, public SelVectorOverWriter {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<IndexScan>(
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/intersect/intersect.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class Intersect : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
std::vector<std::unique_ptr<PhysicalOperator>> clonedChildren;
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/limit.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Limit : public PhysicalOperator {
dataChunkToSelectPos{dataChunkToSelectPos},
dataChunksPosInScope(std::move(dataChunksPosInScope)) {}

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<Limit>(limitNumber, counter, dataChunkToSelectPos, dataChunksPosInScope,
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/multiplicity_reducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class MultiplicityReducer : public PhysicalOperator {
paramsString},
prevMultiplicity{1}, numRepeat{0} {}

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<MultiplicityReducer>(children[0]->clone(), id, paramsString);
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/order_by/order_by_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class OrderByScan : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<OrderByScan>(outVectorPos, sharedState, id, paramsString);
Expand Down
9 changes: 6 additions & 3 deletions src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,12 @@ class PhysicalOperator {
// Local state is initialized for each thread.
void initLocalState(ResultSet* resultSet, ExecutionContext* context);

inline bool getNextTuple() {
inline bool getNextTuple(ExecutionContext* context) {
if (context->clientContext->isInterrupted()) {
throw common::InterruptException{};
}
metrics->executionTime.start();
auto result = getNextTuplesInternal();
auto result = getNextTuplesInternal(context);
metrics->executionTime.stop();
return result;
}
Expand All @@ -129,7 +132,7 @@ class PhysicalOperator {
virtual void initGlobalStateInternal(ExecutionContext* context) {}
virtual void initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) {}
// Return false if no more tuples to pull, otherwise return true
virtual bool getNextTuplesInternal() = 0;
virtual bool getNextTuplesInternal(ExecutionContext* context) = 0;

inline std::string getTimeMetricKey() const { return "time-" + std::to_string(id); }
inline std::string getNumTupleMetricKey() const { return "numTuple-" + std::to_string(id); }
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/projection.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class Projection : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class GenericScanRelTables : public ScanRelTable {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
std::unordered_map<common::table_id_t, std::unique_ptr<RelTableCollection>>
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/scan/scan_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ScanSingleNodeTable : public ScanColumns {
paramsString},
table{table}, propertyColumnIds{std::move(propertyColumnIds)} {}

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<ScanSingleNodeTable>(inputNodeIDVectorPos, outPropertyVectorsPos, table,
Expand All @@ -39,7 +39,7 @@ class ScanMultiNodeTables : public ScanColumns {
paramsString},
tables{std::move(tables)}, tableIDToScanColumnIds{std::move(tableIDToScanColumnIds)} {}

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<ScanMultiNodeTables>(inputNodeIDVectorPos, outPropertyVectorsPos, tables,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class ScanRelTableColumns : public ScanRelTable, public SelVectorOverWriter {
std::move(propertyIds), storage::RelTableDataType::COLUMNS);
}

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ScanRelTableColumns>(tableData, scanState->propertyIds,
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/scan/scan_rel_table_lists.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class ScanRelTableLists : public ScanRelTable {
std::move(propertyIds), storage::RelTableDataType::LISTS);
}

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<ScanRelTableLists>(tableData, scanState->propertyIds, inNodeIDVectorPos,
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/scan_node_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class ScanNodeID : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ScanNodeID>(outDataPos, sharedState, id, paramsString);
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/semi_masker.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class SingleTableSemiMasker : public BaseSemiMasker {

void initGlobalStateInternal(kuzu::processor::ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
auto result = std::make_unique<SingleTableSemiMasker>(
Expand All @@ -56,7 +56,7 @@ class MultiTableSemiMasker : public BaseSemiMasker {

void initGlobalStateInternal(kuzu::processor::ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
auto result = std::make_unique<MultiTableSemiMasker>(
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class Sink : public PhysicalOperator {
protected:
virtual void executeInternal(ExecutionContext* context) = 0;

bool getNextTuplesInternal() final {
bool getNextTuplesInternal(ExecutionContext* context) final {
throw common::InternalException(
"getNextTupleInternal() should not be called on sink operator.");
}
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/skip.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Skip : public PhysicalOperator, public SelVectorOverWriter {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<Skip>(skipNumber, counter, dataChunkToSelectPos, dataChunksPosInScope,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class BaseTableScan : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

protected:
uint64_t maxMorselSize;
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/unwind.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Unwind : public PhysicalOperator {
outDataType{std::move(outDataType)}, outDataPos{outDataPos},
expressionEvaluator{std::move(expressionEvaluator)}, startIndex{0u} {}

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/update/create.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class CreateNode : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
std::vector<std::unique_ptr<CreateNodeInfo>> clonedCreateNodeInfos;
Expand Down Expand Up @@ -88,7 +88,7 @@ class CreateRel : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
std::vector<std::unique_ptr<CreateRelInfo>> clonedCreateRelInfos;
Expand Down
4 changes: 2 additions & 2 deletions src/include/processor/operator/update/delete.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DeleteNode : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
std::vector<std::unique_ptr<DeleteNodeInfo>> clonedDeleteNodeInfos;
Expand Down Expand Up @@ -77,7 +77,7 @@ class DeleteRel : public PhysicalOperator {

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;
bool getNextTuplesInternal(ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
std::vector<std::unique_ptr<DeleteRelInfo>> clonedDeleteRelInfos;
Expand Down
Loading

0 comments on commit b5805d6

Please sign in to comment.