Skip to content

Commit

Permalink
feat: add promise::unharvest()
Browse files Browse the repository at this point in the history
  • Loading branch information
yulon committed Aug 1, 2023
1 parent 2f5d3a7 commit ebd206b
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 34 deletions.
29 changes: 12 additions & 17 deletions include/rua/sync/chan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class chan {

chan &operator=(const chan &) = delete;

bool send(T val) {
bool send(expected<T> val) {
optional<promise<T> *> recv_wtr;
if ($vals.emplace_front_if(
[this, &recv_wtr]() -> bool {
Expand All @@ -37,26 +37,21 @@ class chan {
}
assert(recv_wtr);
assert(*recv_wtr);
auto refunded = (*recv_wtr)->fulfill(std::move(val));
if (refunded) {
send(std::move(refunded).value());
}
(*recv_wtr)->fulfill(
std::move(val), [this](expected<T> val) { send(std::move(val)); });
return true;
}

optional<T> try_recv() {
future<T> recv() {
auto val_opt =
#ifdef NDEBUG
return $vals.pop_back();
$vals.pop_back();
#else
return $vals.pop_back_if_non_empty_and([this]() -> bool {
assert(!$recv_wtrs);
return !$recv_wtrs;
});
$vals.pop_back_if_non_empty_and([this]() -> bool {
assert(!$recv_wtrs);
return !$recv_wtrs;
});
#endif
}

future<T> recv() {
auto val_opt = try_recv();
if (val_opt) {
return *std::move(val_opt);
}
Expand All @@ -69,13 +64,13 @@ class chan {
return future<T>(*prm);
}

prm->unfulfill_and_harvest();
prm->unuse();

return *std::move(val_opt);
}

private:
lockfree_list<T> $vals;
lockfree_list<expected<T>> $vals;
lockfree_list<promise<T> *> $recv_wtrs;
};

Expand Down
2 changes: 1 addition & 1 deletion include/rua/sync/future.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class future : private enable_await_operators {

void reset() noexcept {
if ($v.template type_is<promise<PromiseValue> *>()) {
$v.template as<promise<PromiseValue> *>()->await_resume();
$v.template as<promise<PromiseValue> *>()->unharvest();
}
$v.reset();
}
Expand Down
2 changes: 1 addition & 1 deletion include/rua/sync/mutex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class mutex {
return future<>(*prm);
}

prm->unfulfill_and_harvest();
prm->unuse();

return future<>();
}
Expand Down
42 changes: 28 additions & 14 deletions include/rua/sync/promise.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,42 +50,38 @@ class promise<T, void> {

//////////////////// fulfill ////////////////////

expected<T> fulfill(
expected<T> value = expected_or<T>(err_promise_unfulfilled)) noexcept {
void fulfill(
expected<T> value = expected_or<T>(err_promise_unfulfilled),
std::function<void(expected<T>)> on_unharvested = nullptr) noexcept {

assert(std::is_void<T>::value || !$val);

$val = std::move(value);
$on_unharvested = std::move(on_unharvested);

auto old_state = $state.exchange(promise_state::fulfilled);

assert(old_state != promise_state::fulfilled);

expected<T> refunded;

switch (old_state) {

case promise_state::harvested:
assert(
$state.exchange(promise_state::destroying) ==
promise_state::fulfilled);
refunded = std::move($val);
if ($on_unharvested && $val) {
$on_unharvested(std::move($val));
}
on_destroy();
break;

case promise_state::has_notify: {
assert($notify);
auto notify = std::move($notify);
notify();
RUA_FALLTHROUGH;
}

default:
refunded = err_promise_fulfilled;
break;
}

return refunded;
}
}

void unfulfill() noexcept {
Expand Down Expand Up @@ -137,11 +133,28 @@ class promise<T, void> {
return r;
}

void unharvest() noexcept {
auto old_state = $state.exchange(promise_state::harvested);

assert(old_state != promise_state::harvested);

if (old_state != promise_state::fulfilled) {
return;
}
assert(
$state.exchange(promise_state::destroying) ==
promise_state::harvested);
if ($on_unharvested && $val) {
$on_unharvested(std::move($val));
}
on_destroy();
}

//////////////////// unused ////////////////////

void unfulfill_and_harvest() noexcept {
void unuse() noexcept {
unfulfill();
await_resume();
unharvest();
}

protected:
Expand All @@ -151,6 +164,7 @@ class promise<T, void> {
std::atomic<promise_state> $state;
expected<T> $val;
std::function<void()> $notify;
std::function<void(expected<T>)> $on_unharvested;
};

template <typename T, typename Extend>
Expand Down
2 changes: 1 addition & 1 deletion include/rua/sync/then.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ inline future<R> then(Awaitable &&awaitable, Callback &&callback) {
return prm->extend().aw->await_resume();
});

prm->unfulfill_and_harvest();
prm->unuse();

return exp;
}
Expand Down

0 comments on commit ebd206b

Please sign in to comment.