Skip to content

Commit

Permalink
fix(common): gRPC alarms require more locking (#12406)
Browse files Browse the repository at this point in the history
Turns out `grpc::Alarm` is not thread-safe. The documentation is about
to say so, and a future implementation will trigger more faults in our
code.
  • Loading branch information
coryan authored Aug 18, 2023
1 parent 62fd7f8 commit 88c38d3
Showing 1 changed file with 23 additions and 8 deletions.
31 changes: 23 additions & 8 deletions google/cloud/internal/default_completion_queue_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,25 @@ class AsyncTimerFuture : public AsyncGrpcOperation {

void Set(grpc::CompletionQueue& cq,
std::chrono::system_clock::time_point deadline, void* tag) {
std::unique_lock<std::mutex> lk(mu_);
if (state_ != kIdle) return;
deadline_ = deadline;
state_ = kSet;
alarm_.Set(&cq, deadline, tag);
}

void Cancel() override {
ScopedCallContext scope(call_context_);
std::unique_lock<std::mutex> lk(mu_);
if (state_ == kCancelled) return;
if (state_ == kIdle) {
state_ = kCancelled;
lk.unlock();
// Release the lock before (potentially) calling application code.
promise_.set_value(Cancelled());
return;
}
state_ = kCancelled;
alarm_.Cancel();
}

Expand All @@ -78,18 +91,22 @@ class AsyncTimerFuture : public AsyncGrpcOperation {

bool Notify(bool ok) override {
ScopedCallContext scope(call_context_);
promise_.set_value(ok ? ValueType(deadline_) : Canceled());
promise_.set_value(ok ? ValueType(deadline_) : Cancelled());
return true;
}

static ValueType Canceled() {
static ValueType Cancelled() {
return Status{StatusCode::kCancelled, "timer canceled"};
}

enum State { kIdle, kSet, kCancelled };

promise<ValueType> promise_;
std::chrono::system_clock::time_point deadline_;
grpc::Alarm alarm_;
CallContext call_context_;
std::mutex mu_;
State state_ = kIdle;
grpc::Alarm alarm_;
};

} // namespace
Expand Down Expand Up @@ -190,11 +207,9 @@ void DefaultCompletionQueueImpl::Run() {
}

void DefaultCompletionQueueImpl::Shutdown() {
{
std::lock_guard<std::mutex> lk(mu_);
shutdown_ = true;
shutdown_guard_.reset();
}
std::lock_guard<std::mutex> lk(mu_);
shutdown_ = true;
shutdown_guard_.reset();
}

void DefaultCompletionQueueImpl::CancelAll() {
Expand Down

0 comments on commit 88c38d3

Please sign in to comment.