From ebd206be1d536982ed82131b3b665b3b0dad4865 Mon Sep 17 00:00:00 2001 From: Yulon Date: Wed, 2 Aug 2023 01:52:47 +0800 Subject: [PATCH] feat: add promise::unharvest() --- include/rua/sync/chan.hpp | 29 +++++++++++-------------- include/rua/sync/future.hpp | 2 +- include/rua/sync/mutex.hpp | 2 +- include/rua/sync/promise.hpp | 42 ++++++++++++++++++++++++------------ include/rua/sync/then.hpp | 2 +- 5 files changed, 43 insertions(+), 34 deletions(-) diff --git a/include/rua/sync/chan.hpp b/include/rua/sync/chan.hpp index 092268f..970c804 100644 --- a/include/rua/sync/chan.hpp +++ b/include/rua/sync/chan.hpp @@ -25,7 +25,7 @@ class chan { chan &operator=(const chan &) = delete; - bool send(T val) { + bool send(expected val) { optional *> recv_wtr; if ($vals.emplace_front_if( [this, &recv_wtr]() -> bool { @@ -37,26 +37,21 @@ class chan { } assert(recv_wtr); assert(*recv_wtr); - auto refunded = (*recv_wtr)->fulfill(std::move(val)); - if (refunded) { - send(std::move(refunded).value()); - } + (*recv_wtr)->fulfill( + std::move(val), [this](expected val) { send(std::move(val)); }); return true; } - optional try_recv() { + future recv() { + auto val_opt = #ifdef NDEBUG - return $vals.pop_back(); + $vals.pop_back(); #else - return $vals.pop_back_if_non_empty_and([this]() -> bool { - assert(!$recv_wtrs); - return !$recv_wtrs; - }); + $vals.pop_back_if_non_empty_and([this]() -> bool { + assert(!$recv_wtrs); + return !$recv_wtrs; + }); #endif - } - - future recv() { - auto val_opt = try_recv(); if (val_opt) { return *std::move(val_opt); } @@ -69,13 +64,13 @@ class chan { return future(*prm); } - prm->unfulfill_and_harvest(); + prm->unuse(); return *std::move(val_opt); } private: - lockfree_list $vals; + lockfree_list> $vals; lockfree_list *> $recv_wtrs; }; diff --git a/include/rua/sync/future.hpp b/include/rua/sync/future.hpp index 02d9ecd..bcf536f 100644 --- a/include/rua/sync/future.hpp +++ b/include/rua/sync/future.hpp @@ -145,7 +145,7 @@ class future : private enable_await_operators { void reset() noexcept { if ($v.template type_is *>()) { - $v.template as *>()->await_resume(); + $v.template as *>()->unharvest(); } $v.reset(); } diff --git a/include/rua/sync/mutex.hpp b/include/rua/sync/mutex.hpp index 9bcae8a..a32dfab 100644 --- a/include/rua/sync/mutex.hpp +++ b/include/rua/sync/mutex.hpp @@ -47,7 +47,7 @@ class mutex { return future<>(*prm); } - prm->unfulfill_and_harvest(); + prm->unuse(); return future<>(); } diff --git a/include/rua/sync/promise.hpp b/include/rua/sync/promise.hpp index 904d697..517f76b 100644 --- a/include/rua/sync/promise.hpp +++ b/include/rua/sync/promise.hpp @@ -50,26 +50,28 @@ class promise { //////////////////// fulfill //////////////////// - expected fulfill( - expected value = expected_or(err_promise_unfulfilled)) noexcept { + void fulfill( + expected value = expected_or(err_promise_unfulfilled), + std::function)> on_unharvested = nullptr) noexcept { assert(std::is_void::value || !$val); $val = std::move(value); + $on_unharvested = std::move(on_unharvested); auto old_state = $state.exchange(promise_state::fulfilled); assert(old_state != promise_state::fulfilled); - expected refunded; - switch (old_state) { case promise_state::harvested: assert( $state.exchange(promise_state::destroying) == promise_state::fulfilled); - refunded = std::move($val); + if ($on_unharvested && $val) { + $on_unharvested(std::move($val)); + } on_destroy(); break; @@ -77,15 +79,9 @@ class promise { assert($notify); auto notify = std::move($notify); notify(); - RUA_FALLTHROUGH; - } - - default: - refunded = err_promise_fulfilled; break; } - - return refunded; + } } void unfulfill() noexcept { @@ -137,11 +133,28 @@ class promise { return r; } + void unharvest() noexcept { + auto old_state = $state.exchange(promise_state::harvested); + + assert(old_state != promise_state::harvested); + + if (old_state != promise_state::fulfilled) { + return; + } + assert( + $state.exchange(promise_state::destroying) == + promise_state::harvested); + if ($on_unharvested && $val) { + $on_unharvested(std::move($val)); + } + on_destroy(); + } + //////////////////// unused //////////////////// - void unfulfill_and_harvest() noexcept { + void unuse() noexcept { unfulfill(); - await_resume(); + unharvest(); } protected: @@ -151,6 +164,7 @@ class promise { std::atomic $state; expected $val; std::function $notify; + std::function)> $on_unharvested; }; template diff --git a/include/rua/sync/then.hpp b/include/rua/sync/then.hpp index b4bc7b9..4ce340c 100644 --- a/include/rua/sync/then.hpp +++ b/include/rua/sync/then.hpp @@ -47,7 +47,7 @@ inline future then(Awaitable &&awaitable, Callback &&callback) { return prm->extend().aw->await_resume(); }); - prm->unfulfill_and_harvest(); + prm->unuse(); return exp; }