#pragma once #include "core.h" #include #include #include #include #include #include #include "refcounted.h" #include "intrusive_ptr.h" #include "Try.h" #include "function_traits.h" namespace vlc { template class future; template class promise; namespace detail { template class FutureObject : public Refcounted> { enum class State : int32_t { Start, ResultSet, CallbackSet, Finished }; public: using Function = std::function&&)>; FutureObject() : m_state(State::Start) { } explicit FutureObject(Try&& value) : m_state(State::ResultSet), m_result(std::move(value)) { } inline State state() const { return m_state.load(std::memory_order_acquire); } bool ready() const { switch (state()) { case State::ResultSet: case State::Finished: return true; default: return false; } } Try& result() { if (ready()) { return m_result; } throw std::logic_error("Future is not ready"); } template void setCallback(F&& f) { m_callback = std::forward(f); for (;;) { auto old_state = state(); switch (old_state) { case State::Start: if (m_state.compare_exchange_weak(old_state, State::CallbackSet, std::memory_order_release, std::memory_order_acquire)) return; // State has changed. Reload value and try again break; case State::ResultSet: m_state.store(State::Finished, std::memory_order_relaxed); fireAndRelease(); return; default: throw std::logic_error("Invalid state"); } } } void setResult(Try&& value) { m_result = std::move(value); for (;;) { auto old_state = state(); switch (old_state) { case State::Start: m_lock.lock(); if (m_state.compare_exchange_weak(old_state, State::ResultSet, std::memory_order_release, std::memory_order_acquire)) { m_lock.unlock(); cv.notify_all(); return; } m_lock.unlock(); // State has changed. Reload value and try again break; case State::CallbackSet: m_state.store(State::Finished, std::memory_order_relaxed); fireAndRelease(); return; default: throw std::logic_error("Invalid state"); } } } void wait() const { std::unique_lock lk(m_lock); assert(state() == State::Start || state() == State::ResultSet); cv.wait(lk, [this]() { return state() == State::ResultSet; }); assert(state() == State::ResultSet); } template bool wait_for(const std::chrono::duration& timeout_duration) const { std::unique_lock lk(m_lock); assert(state() == State::Start || state() == State::ResultSet); bool status = cv.wait_for(lk, timeout_duration, [this]() { return state() == State::ResultSet; }); assert(status || !(state() == State::ResultSet)); return status; } template bool wait_until(const std::chrono::time_point& timeout_time) const { std::unique_lock lk(m_lock); assert(state() == State::Start || state() == State::ResultSet); bool status = cv.wait_until(lk, timeout_time, [this]() { return state() == State::ResultSet; }); return status; } private: void fireAndRelease() { assert(m_callback); assert(m_state == State::Finished); m_callback(std::move(m_result)); m_callback = nullptr; } std::atomic m_state; Function m_callback; Try m_result; mutable std::condition_variable cv; mutable std::mutex m_lock; }; template struct IsFuture : std::false_type { using type = T; }; template struct IsFuture> : std::true_type { using type = T; }; template using result_of = decltype(std::declval()(std::declval()...)); template struct ArgResult { using Result = result_of; }; template struct IsCallableWith { template> static constexpr std::true_type check(std::nullptr_t); template static constexpr std::false_type check(...); static constexpr bool value = decltype(check(nullptr))::value; }; template struct CallableResult { using Argument = typename std::conditional< IsCallableWith::value, ArgResult, typename std::conditional< IsCallableWith::value, ArgResult, typename std::conditional< IsCallableWith::value, ArgResult, typename std::conditional< IsCallableWith&&>::value, ArgResult&&>, ArgResult&> >::type >::type >::type >::type; using ReturnsFuture = IsFuture; using Result = future; }; template struct CallableResult { using Argument = typename std::conditional < IsCallableWith::value, ArgResult, typename std::conditional< IsCallableWith&&>::value, ArgResult&&>, ArgResult&> >::type >::type; using ReturnsFuture = IsFuture; using Result = future; }; template struct FunctionReferenceToPointer { using type = T; }; template struct FunctionReferenceToPointer { using type = R(*)(Args...); }; template struct Passables : public Refcounted> { using TupleType = std::tuple::type...>; TupleType t; Passables(std::tuple&& v) : t(std::move(v)) { } template Passables(SourceArgs&&... args) : t(std::forward(args)...) { } Passables(Passables&& src) { std::swap(t, src.t); } Passables& operator = (Passables&& src) { std::swap(t, src.t); return *this; } template auto arg() -> typename std::tuple_element::type & { return std::get(t); } template auto arg() const -> const typename std::tuple_element::type& { return std::get(t); } Passables(const Passables&) = delete; Passables& operator = (const Passables&) = delete; }; } template class future { public: using value_type = T; future() noexcept = default; ~future() = default; future(const future&) = delete; future& operator = (const future&) = delete; future(future&& src) noexcept : m_obj(std::move(src.m_obj)) { } future& operator = (future&& src) noexcept { std::swap(m_obj, src.m_obj); return *this; } template::type>::value>::type > future(ValueType&& value) : m_obj(new detail::FutureObject(Try(std::forward(value)))) { } bool valid() const noexcept { return (bool)m_obj; } bool ready() const { return valid() ? m_obj->ready() : false; } future& wait() { if (ready()) { return *this; } throwIfInvalid(); m_obj->wait(); return *this; } template bool wait_for(const std::chrono::duration& timeout_time) const { if (ready()) { return true; } throwIfInvalid(); return m_obj->wait_for(timeout_time); } template bool wait_until(const std::chrono::time_point& timeout_time) const { if (ready()) { return true; } throwIfInvalid(); return m_obj->wait_until(timeout_time); } T get() { return std::move(wait().value()); } typename std::add_lvalue_reference::type value() { throwIfInvalid(); return m_obj->result().value(); } typename std::add_lvalue_reference::type value() const { throwIfInvalid(); return m_obj->result().value(); } template::type, typename R = detail::CallableResult> typename R::Result then(F&& func) { using Arguments = typename R::Argument; return then_impl(std::forward(func), Arguments()); } template future on_exception(F&& func) { return on_error_impl(std::forward(func)); } template future ensure(F&& func) { auto pt = make_intrusive>(std::forward(func)); return then([pt](Try&& result) { std::move(pt->template arg<0>())(); return make_future(std::move(result)); }); } /// Internal methods using FutureObjectPtr = IntrusivePtr>; template void setCallback(F&& func) { throwIfInvalid(); m_obj->setCallback(std::forward(func)); } private: FutureObjectPtr m_obj; template friend class promise; template friend future make_future(Try&&); explicit future(FutureObjectPtr obj) : m_obj(obj) { } void throwIfInvalid() const { if (!m_obj) { throw std::future_error(std::future_errc::no_state); } } template typename std::enable_if::type // If returns T then_impl(F&& func, detail::ArgResult) { static_assert(sizeof...(Args) <= 1, "then callback must accept one or zero arguments"); throwIfInvalid(); using RetType = typename R::ReturnsFuture::type; promise p; auto f = p.get_future(); auto pt = make_intrusive, F>>(std::move(p), std::forward(func)); setCallback([pt](Try&& result) mutable { auto&& pm = pt->template arg<0>(); auto&& func = pt->template arg<1>(); if (!isTry && result.isFailure()) { pm.set_exception(std::move(result.exception())); } else { pm.set_with([&] { return std::move(func)(result.template get()...); }); } }); return f; } template typename std::enable_if::type // If returns future then_impl(F&& func, detail::ArgResult) { static_assert(sizeof...(Args) <= 1, "then callback must accept one or zero arguments"); throwIfInvalid(); using RetType = typename R::ReturnsFuture::type; promise p; auto f = p.get_future(); auto pt = make_intrusive, F>>(std::move(p), std::forward(func)); setCallback([pt](Try&& result) mutable { auto&& pm = pt->template arg<0>(); auto&& func = pt->template arg<1>(); std::exception_ptr exc; if (result.isFailure()) { exc = std::move(result.exception()); } else { try { auto f2 = std::move(func)(result.template get()...); f2.setCallback([pt](Try&& result2) { pt->template arg<0>().set_try(std::move(result2)); }); } catch (...) { exc = std::current_exception(); } } if (exc) { pm.set_exception(exc); } }); return f; } template::result_type> // If callback returns T typename std::enable_if::value, future>::type on_error_impl(F&& func) { using Exc = typename function_traits::template arg<0>::type; static_assert(std::is_same::value, "Return type of on_error callback must be T or future"); promise p; auto f = p.get_future(); auto pt = make_intrusive, F>>(std::move(p), std::forward(func)); setCallback([pt](Try&& result) mutable { if (!result.template recoverWith([&](Exc& e) { auto&& pm = pt->template arg<0>(); auto&& func = pt->template arg<1>(); pm.set_with([&] { return std::move(func)(e); }); })) { pt->template arg<0>().set_try(std::move(result)); } }); return f; } template::result_type> // If callback returns future typename std::enable_if::value, future>::type on_error_impl(F&& func) { using Exc = typename function_traits::template arg<0>::type; static_assert(std::is_same>::value, "Return type of on_error callback must be T or future"); promise p; auto f = p.get_future(); auto pt = make_intrusive, F>>(std::move(p), std::forward(func)); setCallback([pt](Try&& result) { auto&& pm = pt->template arg<0>(); if (!result.template recoverWith([&](Exc& e) { auto&& f = pt->template arg<1>(); std::exception_ptr exc; try { auto f2 = std::move(f)(e); f2.setCallback([pt](Try&& result2) mutable { auto&& pm = pt->template arg<0>(); pm.set_try(std::move(result2)); }); } catch (...) { exc = std::current_exception(); } if (exc) { pm.set_exception(std::move(exc)); } })) { auto&& pm = pt->template arg<0>(); pm.set_try(std::move(result)); } }); return f; } }; template class promise { public: promise() : m_obj(new detail::FutureObject()) { } ~promise() { if (m_obj) { if (m_future_retrieved && !m_obj->ready()) { m_obj->setResult(Try(std::make_exception_ptr(std::future_error(std::future_errc::broken_promise)))); } } } promise(const promise&) = delete; promise& operator = (const promise&) = delete; promise(promise&& src) noexcept : m_obj(std::move(src.m_obj)), m_future_retrieved(src.m_future_retrieved) { src.m_future_retrieved = false; } promise& operator = (promise&& src) noexcept { std::swap(m_obj, src.m_obj); std::swap(m_future_retrieved, src.m_future_retrieved); return *this; } future get_future() { throwIfRetrieved(); m_future_retrieved = true; return future(m_obj); } void set_try(Try&& t) { throwIfFulfilled(); m_obj->setResult(std::move(t)); } template void set_value(const ValueType& value) { throwIfFulfilled(); m_obj->setResult(Try(value)); } template void set_value(ValueType&& value) { throwIfFulfilled(); m_obj->setResult(Try(std::forward(value))); } void set_exception(std::exception_ptr exc) { throwIfFulfilled(); m_obj->setResult(Try(exc)); } template void set_with(F&& setter) { throwIfFulfilled(); m_obj->setResult(make_try_with(std::forward(setter))); } private: using FutureObjectPtr = IntrusivePtr>; FutureObjectPtr m_obj; bool m_future_retrieved = false; void throwIfRetrieved() { if (m_future_retrieved) { throw std::future_error(std::future_errc::future_already_retrieved); } } void throwIfFulfilled() { if (!m_obj) { throw std::future_error(std::future_errc::no_state); } if (m_obj->ready()) { throw std::future_error(std::future_errc::promise_already_satisfied); } } }; /// Helpers template future make_future(Try&& t) { return future(make_intrusive>(std::move(t))); } template future::type> make_future(T&& value) { using DecayedType = typename std::decay::type; return make_future(Try(std::forward(value))); } template future make_future(const std::exception_ptr& e) { return make_future(Try(e)); } template typename std::enable_if::value, future>::type make_future(const E& e) { return make_future(std::make_exception_ptr(e)); } template::result_type> typename std::enable_if::value, R>::type make_future_with(F&& func) { using T = typename detail::IsFuture::type; try { return func(); } catch (...) { return make_future(std::current_exception()); } } template::result_type> typename std::enable_if::value, future>::type make_future_with(F&& func) { return make_future(make_try_with([&func]() mutable { return func(); })); } inline future make_future() { return future(); } template::value_type, typename ResultType = typename decltype(std::declval().then(std::declval()))::value_type > auto map(Iterator first, Iterator last, F&& func) -> std::vector> { std::vector> result(std::distance(first, last)); for (size_t id = 0; first != last; ++first, ++id) { result[id] = first->then(std::forward(func)); } return result; } template auto map(Container&& c, F&& f) -> decltype(map(c.begin(), c.end(), f)) { return map(c.begin(), c.end(), std::forward(f)); } namespace detail { template struct Collector { using ResultType = std::vector; explicit Collector(size_t n) : result(n), has_exception(false) { } ~Collector() { if (!has_exception) { p.set_value(std::move(result)); } } void setResult(uint32_t i, Try& t) { result[i] = std::move(t.value()); } promise p; ResultType result; std::atomic has_exception; }; template<> struct Collector { using ResultType = void; explicit Collector(size_t) : has_exception(false) { } ~Collector() { if (!has_exception) { p.set_value(Try()); } } void setResult(uint32_t, Try&) { } promise p; std::atomic has_exception; }; template struct CollectorAll { using ResultType = std::vector>; CollectorAll(size_t n) : results(n) { } ~CollectorAll() { p.set_value(std::move(results)); } void setResult(uint32_t i, Try&& t) { results[i] = std::forward>(t); } promise p; ResultType results; }; template<> struct CollectorAll { using ResultType = void; CollectorAll(size_t) { } ~CollectorAll() { p.set_value(Try()); } void setResult(uint32_t, Try&&) { } promise p; }; template using TypeFromIterator = typename std::iterator_traits::value_type; } /// Like collectAll, but will short circuit on the first exception. Thus, the /// type of the returned Future is std::vector instead of /// std::vector> template auto collect(Iterator first, Iterator last) -> future::value_type>::ResultType> { using ValueType = typename detail::TypeFromIterator::value_type; auto ctx = std::make_shared>(std::distance(first, last)); uint32_t id = 0; for (; first != last; ++first, ++id) { first->setCallback([id, ctx](Try&& t) { if (t.isFailure()) { if (!ctx->has_exception.exchange(true)) { ctx->p.set_exception(std::move(t.exception())); } } else if (!ctx->has_exception) { ctx->setResult(id, t); } }); } return ctx->p.get_future(); } template auto collect(Container&& c) -> decltype(collect(c.begin(), c.end())) { return collect(c.begin(), c.end()); } /** When all the input Futures complete, the returned Future will complete. Errors do not cause early termination; this Future will always succeed after all its Futures have finished (whether successfully or with an error). The Futures are moved in, so your copies are invalid. If you need to chain further from these Futures, use the variant with an output iterator. */ template auto collectAll(Iterator first, Iterator last) -> future::value_type>::ResultType> { using ValueType = typename detail::TypeFromIterator::value_type; auto ctx = std::make_shared>(std::distance(first, last)); uint32_t id = 0; for (; first != last; ++first, ++id) { first->setCallback([id, ctx](Try&& t) { ctx->setResult(id, std::forward>(t)); }); } return ctx->p.get_future(); } template auto collectAll(Container&& c) -> decltype(collectAll(c.begin(), c.end())) { return collectAll(c.begin(), c.end()); } namespace detail { template struct CollectorVariadic { promise> p; std::tuple...> result; std::atomic has_exception; using ResultType = future>; CollectorVariadic() : has_exception(false) { } ~CollectorVariadic() { if (!has_exception) { p.set_value(unwrap_tuple(std::move(result))); } } CollectorVariadic(const CollectorVariadic&) = delete; CollectorVariadic& operator = (const CollectorVariadic&) = delete; template void set_result(Try& t) { if (t.isFailure()) { if (!has_exception.exchange(true)) { p.set_exception(t.exception()); } } else if (!has_exception) { std::get(result) = std::move(t); } } }; template struct CollectorAllVariadic { promise...>> p; std::tuple...> result; using ResultType = future...>>; CollectorAllVariadic() = default; ~CollectorAllVariadic() { p.set_value(std::move(result)); } template void set_result(Try& t) { std::get(result) = std::move(t); } }; template< template class Ctx, typename... Ts> void setCallbackCollectorVariadic(const std::shared_ptr>&) { } template< template class Ctx, typename... Ts, typename Head, typename... Rest> void setCallbackCollectorVariadic(const std::shared_ptr>& ctx, Head&& head, Rest&&... rest) { head.setCallback([ctx](Try&& t) { ctx->template set_result(t); }); setCallbackCollectorVariadic(ctx, std::forward(rest)...); } } template auto collect(Futures&&... fs) -> typename detail::CollectorVariadic::type::value_type...>::ResultType { auto ctx = std::make_shared::type::value_type...>>(); detail::setCallbackCollectorVariadic(ctx, std::forward::type>(fs)...); return ctx->p.get_future(); } template auto collectAll(Futures&&... fs) -> typename detail::CollectorAllVariadic::type::value_type...>::ResultType { auto ctx = std::make_shared::type::value_type...>>(); detail::setCallbackCollectorVariadic(ctx, std::forward::type>(fs)...); return ctx->p.get_future(); } }