diff --git a/google/cloud/internal/default_completion_queue_impl.cc b/google/cloud/internal/default_completion_queue_impl.cc index 84d615ba27025..0cfd08aecb703 100644 --- a/google/cloud/internal/default_completion_queue_impl.cc +++ b/google/cloud/internal/default_completion_queue_impl.cc @@ -64,12 +64,25 @@ class AsyncTimerFuture : public AsyncGrpcOperation { void Set(grpc::CompletionQueue& cq, std::chrono::system_clock::time_point deadline, void* tag) { + std::unique_lock lk(mu_); + if (state_ != kIdle) return; deadline_ = deadline; + state_ = kSet; alarm_.Set(&cq, deadline, tag); } void Cancel() override { ScopedCallContext scope(call_context_); + std::unique_lock lk(mu_); + if (state_ == kCancelled) return; + if (state_ == kIdle) { + state_ = kCancelled; + lk.unlock(); + // Release the lock before (potentially) calling application code. + promise_.set_value(Cancelled()); + return; + } + state_ = kCancelled; alarm_.Cancel(); } @@ -78,18 +91,22 @@ class AsyncTimerFuture : public AsyncGrpcOperation { bool Notify(bool ok) override { ScopedCallContext scope(call_context_); - promise_.set_value(ok ? ValueType(deadline_) : Canceled()); + promise_.set_value(ok ? ValueType(deadline_) : Cancelled()); return true; } - static ValueType Canceled() { + static ValueType Cancelled() { return Status{StatusCode::kCancelled, "timer canceled"}; } + enum State { kIdle, kSet, kCancelled }; + promise promise_; std::chrono::system_clock::time_point deadline_; - grpc::Alarm alarm_; CallContext call_context_; + std::mutex mu_; + State state_ = kIdle; + grpc::Alarm alarm_; }; } // namespace @@ -190,11 +207,9 @@ void DefaultCompletionQueueImpl::Run() { } void DefaultCompletionQueueImpl::Shutdown() { - { - std::lock_guard lk(mu_); - shutdown_ = true; - shutdown_guard_.reset(); - } + std::lock_guard lk(mu_); + shutdown_ = true; + shutdown_guard_.reset(); } void DefaultCompletionQueueImpl::CancelAll() {