diff --git a/include/asioexec/completion_token.hpp b/include/asioexec/completion_token.hpp index e0817012f..fb295e2fc 100644 --- a/include/asioexec/completion_token.hpp +++ b/include/asioexec/completion_token.hpp @@ -136,10 +136,7 @@ namespace asioexec { ::STDEXEC::set_stopped_t() >; - template - class completion_handler; - - template + template struct operation_state_base { class frame_; @@ -153,57 +150,71 @@ namespace asioexec { Receiver r_; asio_impl::cancellation_signal signal_; - std::recursive_mutex m_; + [[no_unique_address]] + Mutex m_; frame_* frames_{nullptr}; std::exception_ptr ex_; bool abandoned_{false}; class frame_ { - operation_state_base& self_; - std::unique_lock l_; + operation_state_base* self_; frame_* prev_; public: explicit frame_(operation_state_base& self) noexcept - : self_(self) - , l_(self.m_) + : self_([&]() noexcept { + self.m_.lock(); + return &self; + }()) , prev_(self.frames_) { self.frames_ = this; } + constexpr frame_(frame_&& other) noexcept + : self_(std::exchange(other.self_, nullptr)) + , prev_(std::exchange(other.prev_, nullptr)) { + } + frame_(const frame_&) = delete; ~frame_() noexcept { - if (l_) { - STDEXEC_ASSERT(self_.frames_ == this); - self_.frames_ = prev_; - if (!self_.frames_ && self_.abandoned_) { + if (self_) { + std::unique_lock l(self_->m_, std::adopt_lock); + STDEXEC_ASSERT(self_->frames_ == this); + self_->frames_ = prev_; + if (!self_->frames_ && self_->abandoned_) { // We are the last frame and the handler is gone so it's up to us to // finalize the operation - l_.unlock(); - self_.callback_.reset(); - if (self_.ex_) { - ::STDEXEC::set_error(static_cast(self_.r_), std::move(self_.ex_)); + l.unlock(); + self_->callback_.reset(); + if (self_->ex_) { + ::STDEXEC::set_error(static_cast(self_->r_), std::move(self_->ex_)); } else { - ::STDEXEC::set_stopped(static_cast(self_.r_)); + ::STDEXEC::set_stopped(static_cast(self_->r_)); } } } } explicit operator bool() const noexcept { - return bool(l_); + return bool(self_); } void release() noexcept { - auto ptr = this; - do { - STDEXEC_ASSERT(ptr->l_); - STDEXEC_ASSERT(self_.frames_ == ptr); - ptr = ptr->prev_; - self_.frames_->l_.unlock(); - self_.frames_->prev_ = nullptr; - self_.frames_ = ptr; - } while (ptr); + auto&& self = *self_; + STDEXEC_ASSERT(this == self.frames_); + for (;;) { + STDEXEC_ASSERT(self.frames_); + STDEXEC_ASSERT(self.frames_->self_ == &self); + self.frames_->self_ = nullptr; + const auto prev = self.frames_->prev_; + self.frames_->prev_ = nullptr; + self.frames_ = prev; + self.m_.unlock(); + if (!prev) { + break; + } + } + STDEXEC_ASSERT(!self_); } }; @@ -238,11 +249,12 @@ namespace asioexec { callback_; }; - template + template class completion_handler { - operation_state_base* self_; + using operation_state_type_ = operation_state_base; + operation_state_type_* self_; public: - explicit completion_handler(operation_state_base& self) noexcept + explicit completion_handler(operation_state_type_& self) noexcept : self_(&self) { } @@ -257,7 +269,7 @@ namespace asioexec { // When this goes out of scope it might send set stopped or set error, or // it might defer that to the executor frames above us on the call stack // (if any) - const typename operation_state_base::frame_ frame(*self_); + const typename operation_state_type_::frame_ frame(*self_); self_->abandoned_ = true; } } @@ -296,15 +308,20 @@ namespace asioexec { return self_->signal_.slot(); } - operation_state_base& state() const noexcept { + operation_state_type_& state() const noexcept { STDEXEC_ASSERT(self_); return *self_; } }; - template - class operation_state : operation_state_base { - using base_ = operation_state_base; + template < + typename Mutex, + typename Signatures, + typename Receiver, + typename Initiation, + typename Args> + class operation_state : operation_state_base { + using base_ = operation_state_base; Initiation init_; Args args_; public: @@ -326,7 +343,7 @@ namespace asioexec { [&](auto&&... args) { std::invoke( static_cast(init_), - completion_handler(*this), + completion_handler(*this), static_cast(args)...); }, std::move(args_)); @@ -349,9 +366,12 @@ namespace asioexec { } }; - template + template class sender { using args_type_ = std::tuple...>; + template + using operation_state_type_ = + operation_state, Initiation, args_type_>; public: using sender_concept = ::STDEXEC::sender_t; @@ -375,32 +395,25 @@ namespace asioexec { template requires ::STDEXEC::receiver_of< std::remove_cvref_t, - ::STDEXEC::completion_signatures_of_t> - > - constexpr auto connect(Receiver&& receiver) const & noexcept( - std::is_nothrow_constructible_v< - operation_state, Initiation, args_type_>, - Receiver, - const Initiation&, - const args_type_& - >) { - return operation_state, Initiation, args_type_>( - static_cast(receiver), init_, args_); + ::STDEXEC::completion_signatures_of_t>> + constexpr auto connect(Receiver&& receiver) const & noexcept(std::is_nothrow_constructible_v< + operation_state_type_, + Receiver, + const Initiation&, + const args_type_&>) { + return operation_state_type_(static_cast(receiver), init_, args_); } template requires ::STDEXEC::receiver_of< std::remove_cvref_t, - ::STDEXEC::completion_signatures_of_t> - > - constexpr auto connect(Receiver&& receiver) && noexcept( - std::is_nothrow_constructible_v< - operation_state, Initiation, args_type_>, - Receiver, - Initiation, - args_type_ - >) { - return operation_state, Initiation, args_type_>( + ::STDEXEC::completion_signatures_of_t>> + constexpr auto connect(Receiver&& receiver) && noexcept(std::is_nothrow_constructible_v< + operation_state_type_, + Receiver, + Initiation, + args_type_>) { + return operation_state_type_( static_cast(receiver), static_cast(init_), static_cast(args_)); @@ -410,9 +423,10 @@ namespace asioexec { args_type_ args_; }; - template + template class executor { - operation_state_base& self_; + using operation_state_type_ = operation_state_base; + operation_state_type_& self_; Executor ex_; template @@ -422,9 +436,7 @@ namespace asioexec { }; } public: - constexpr explicit executor( - operation_state_base& self, - const Executor& ex) noexcept + constexpr explicit executor(operation_state_type_& self, const Executor& ex) noexcept : self_(self) , ex_(ex) { } @@ -443,7 +455,7 @@ namespace asioexec { } constexpr decltype(auto) prefer(Args&&... args) const noexcept { const auto ex = asio_impl::prefer(ex_, static_cast(args)...); - return executor>(self_, ex); + return executor>(self_, ex); } template @@ -452,7 +464,7 @@ namespace asioexec { } constexpr decltype(auto) require(Args&&... args) const noexcept { const auto ex = asio_impl::require(ex_, static_cast(args)...); - return executor>(self_, ex); + return executor>(self_, ex); } template @@ -503,59 +515,78 @@ namespace asioexec { bool operator!=(const executor& rhs) const = default; }; + template + struct t { + static constexpr auto as_default_on = ::asioexec::as_default_on; + template + using as_default_on_t = ::asioexec::as_default_on_t; + }; + + struct null_basic_lockable { + constexpr void lock() noexcept { + } + constexpr void unlock() noexcept { + } + }; + } // namespace detail::completion_token - struct completion_token_t { - static constexpr auto as_default_on = asioexec::as_default_on; - template - using as_default_on_t = asioexec::as_default_on_t; - }; + using completion_token_t = detail::completion_token::t; inline const completion_token_t completion_token{}; + using thread_unsafe_completion_token_t = + detail::completion_token::t; + + inline const thread_unsafe_completion_token_t thread_unsafe_completion_token{}; + } // namespace asioexec namespace ASIOEXEC_ASIO_NAMESPACE { - template - struct async_result<::asioexec::completion_token_t, Signatures...> { + template + struct async_result<::asioexec::detail::completion_token::t, Signatures...> { template requires(std::is_constructible_v, Args> && ...) - static constexpr auto - initiate(Initiation&& i, const ::asioexec::completion_token_t&, Args&&... args) { + static constexpr auto initiate( + Initiation&& i, + const ::asioexec::detail::completion_token::t&, + Args&&... args) { return ::asioexec::detail::completion_token::sender< + Mutex, ::asioexec::detail::completion_token::completion_signatures, std::remove_cvref_t, - Args... - >(static_cast(i), static_cast(args)...); + Args...>(static_cast(i), static_cast(args)...); } }; - template + template struct associated_executor< - ::asioexec::detail::completion_token::completion_handler, - Executor - > { - using type = ::asioexec::detail::completion_token::executor; + ::asioexec::detail::completion_token::completion_handler, + Executor> { + using type = + ::asioexec::detail::completion_token::executor; static type get( - const ::asioexec::detail::completion_token::completion_handler& h, + const ::asioexec::detail::completion_token::completion_handler& + h, const Executor& ex) noexcept { return type(h.state(), ex); } }; - template + template requires requires(const Receiver& r) { ::STDEXEC::get_allocator(::STDEXEC::get_env(r)); } struct associated_allocator< - ::asioexec::detail::completion_token::completion_handler, + ::asioexec::detail::completion_token::completion_handler, Allocator > { using type = std::remove_cvref_t())))>; static type get( - const ::asioexec::detail::completion_token::completion_handler& h, + const ::asioexec::detail::completion_token::completion_handler& + h, const Allocator&) noexcept { return ::STDEXEC::get_allocator(::STDEXEC::get_env(h.state().r_)); } diff --git a/include/asioexec/use_sender.hpp b/include/asioexec/use_sender.hpp index 291ddf181..3c60cafcd 100644 --- a/include/asioexec/use_sender.hpp +++ b/include/asioexec/use_sender.hpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -190,30 +191,38 @@ namespace asioexec { template explicit sender(Sender) -> sender; + template + struct t { + static constexpr auto as_default_on = ::asioexec::as_default_on; + template + using as_default_on_t = ::asioexec::as_default_on_t; + }; + } // namespace detail::use_sender - struct use_sender_t { - static constexpr auto as_default_on = asioexec::as_default_on; - template - using as_default_on_t = asioexec::as_default_on_t; - }; + using use_sender_t = detail::use_sender::t; inline const use_sender_t use_sender{}; + using thread_unsafe_use_sender_t = + detail::use_sender::t; + + inline const thread_unsafe_use_sender_t thread_unsafe_use_sender{}; + } // namespace asioexec namespace ASIOEXEC_ASIO_NAMESPACE { - template - struct async_result<::asioexec::use_sender_t, Signatures...> { + template + struct async_result<::asioexec::detail::use_sender::t, Signatures...> { template requires(std::is_constructible_v, Args> && ...) static constexpr auto - initiate(Initiation&& i, const ::asioexec::use_sender_t&, Args&&... args) { + initiate(Initiation&& i, const ::asioexec::detail::use_sender::t&, Args&&... args) { return ::asioexec::detail::use_sender::sender( - async_result<::asioexec::completion_token_t, Signatures...>::initiate( + async_result<::asioexec::detail::completion_token::t, Signatures...>::initiate( static_cast(i), - ::asioexec::completion_token, + ::asioexec::detail::completion_token::t{}, static_cast(args)...)); } }; diff --git a/test/asioexec/test_completion_token.cpp b/test/asioexec/test_completion_token.cpp index 362e92ce3..25fcf11a6 100644 --- a/test/asioexec/test_completion_token.cpp +++ b/test/asioexec/test_completion_token.cpp @@ -162,7 +162,7 @@ namespace { asio_impl::io_context ctx; asio_impl::system_timer t(ctx); t.expires_after(std::chrono::years(1)); - auto sender = t.async_wait(completion_token); + auto sender = t.async_wait(thread_unsafe_completion_token); static_assert(set_equivalent< completion_signatures_of_t>, completion_signatures< diff --git a/test/asioexec/test_use_sender.cpp b/test/asioexec/test_use_sender.cpp index 1d1655d60..b5dedcf0b 100644 --- a/test/asioexec/test_use_sender.cpp +++ b/test/asioexec/test_use_sender.cpp @@ -82,7 +82,7 @@ namespace { asio_impl::io_context ctx; asio_impl::system_timer t(ctx); t.expires_after(std::chrono::years(1)); - auto sender = t.async_wait(use_sender); + auto sender = t.async_wait(thread_unsafe_use_sender); static_assert(::STDEXEC::sender_in); static_assert( ::STDEXEC::sender_of<