前言
最近的新闻里 C++20 已经确认的内容里已经有了协程组件,之前都是粗略看过这个协程草案。最近抽时间更加系统性的看了下接入和实现细节。
我的测试代码都是在MSVC下开启 /await
选项后测试的,在我本地的Linux clang环境中,可以通过 $LLVM_CLANG_PREFIX/bin/clang++ -std=c++2a -O0 -g -ggdb -stdlib=libc++ -fcoroutines-ts -lc++ -lc++abi -Wl,-rpath=$LLVM_CLANG_PREFIX/lib/ test.cpp
编译和运行。
在gcc 10+中,可以使用 g++ -std=c++20 -O0 -g -ggdb -fcoroutines
并把所有的 std::experimental::
都换成 std::
之后编译运行。
LLVM+Clang+libc++/libc++abi的编译安装脚本可以参见: https://github.com/owent-utils/bash-shell/tree/master/LLVM%26Clang Installer/7.0/
C++20 的协程基本原理
C++20 整个协程体系是 "无栈协程" 的思路,整个功能是需要结合编译器功能和STL来配合实现的。主要就是三个关键字(co_yield
、 co_await
或 co_return
)和围绕这三个关键字的接入。无栈协程对API的设计是有要求的,C++20 Coroutine也不例外, 编译器在检测到内部有使用 这三个关键字时会对函数的流程做patch,然后它的返回值类型必须符合你所使用的关键字的规范。这三个关键字的规范要求不太一样,下面会列举。
我原本以为的会放在协程的 awaiter 或者 handle 对象闭包里,然后由编译器分析和对闭包内的各级对象进行扩充的引用(类似Rust的那种实现)。但是在测试的MSVC和Clang的协程流程的过程中发现,实际上还是另外堆上分配空间来保存协程函数的栈上数据,并用这种方式实现Zero-Copy的。协程函数的执行栈和主函数执行栈并不在一个地址段内,这和之前猜想的不太一样。所以,C++20 的协程也不能完全说是 "无栈" ,只是在协程函数中需要能够评估出来它需要多少栈空间存数据,不像有栈协程那样会浪费比较大的地址空间且不利于内存页复用。
同时受限于这种设计,在C++20 的协程函数里,动态栈分配是不受支持的。在MSVC下,如果你使用了动态栈分配的函数 ( _alloca
) ,直接编译就不通过了。而在gcc/clang 下如果你使用了动态栈分配的函数 ( alloca
) ,分配出来的栈地址是不会受到协程的管理(即:多个协程分配出来的地址可能是重合的),在使用的时候用户得自己保证如果涉及协程且如何切出的话,运行结果不受这部分动态长度栈数据的影响,因为可能会被其他协程改掉(简单地说就是动态分配出来的栈只能在 co_yield
和 co_await
之前使用)。
不过我觉得类似GCC动态栈的那种方案可以让它支持动态栈空间,就是在栈溢出的signal里再mmap一段地址进去,按需增大栈空间。但是这玩意性能被诟病,信号和缺页中断都不稳定且栈空间地址分散不利于CPU Cache,估计最后也不会被采纳吧。
我目前看的提案以 N4736 为准(还有个使用文档是 p0973r0 )。一旦一个函数被断定为协程函数,那么它会被扩充为如下形式:
2019-09-29: 更新文档 https://en.cppreference.com/w/cpp/language/coroutines 这个 N4775 比 N4736 完整得多 : http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/n4775.pdf 后面会整合进 P0912R5 : http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2019/p0912r5.html 目前还没有太大的变化。
Copy COROUTINE_OBJECT func ( args ...) {
try {
P p (promise_constructor_arguments);
// 这个P是自己定义的 using P = COROUTINE_OBJECT::promise_type;
// 文档上说promise_constructor_arguments 是空或者函数的参数的左值传入 args... ,但是目前版本的MSVC还仅支持空参数列表
COROUTINE_OBJECT r = p . get_return_object (); // MSVC 1, Clang 2
co_await p . initial_suspend (); // 初始化接口 MSVC 2, Clang 1
// 上面这两行是MSVC的顺序,在clang里上面两行的顺序相反
try {
// 原函数体 ...
p . return_void () or p . return_value (RET) // 取决于函数体里有没有 co_return RET
} catch (...) {
p . unhandled_exception (); // 未捕获的异常接口
}
final_suspend :
co_await p . final_suspend (); // final suspend point
return r;
} catch (...) {
return COROUTINE_OBJECT :: promise_type :: get_return_object_on_allocation_failure (); // noexcept
}
}
关键字 co_await
先简单理解为判定是否需要切出,后面会有更加详细一点的解释。 而协程函数一切的核心都在于上面的 COROUTINE_OBJECT
类型。里面有一些规范, co_yield
和 co_return
涉及 COROUTINE_OBJECT
里的 COROUTINE_OBJECT::promise_type
类型。必须实现某些公共接口和函数功能接口。而返回到外层的 COROUTINE_OBJECT
对象里也需要保存协程的handle(目前是 std::experimental::coroutine_handle<PROMISE_TYPE>
),以用于后续控制协程的上下文切换使用。 同样, 下面的例子因为必须写一个协程入口对象,所以有一部分比较烦杂的必须接入的代码。
结构如下:
协程闭包类型(promise)
要支持协程函数,首先要准备一个包装的类型,里面包含 promise_type
,然后提供基本的创建、维护handle的函数。比如:
Copy struct coroutine_task {
struct promise_type {
coroutine_task get_return_object () {
return coroutine_task{};
}
bool initial_suspend () const {
return false ;
}
bool final_suspend () const {
return false ;
}
};
};
然后,就可以声明使用协程函数 coroutine_task f();
了。但是要让他转变为协程函数,还需要至少接入一样协程关键字才行。我们先从最基本的 co_await
开始。
关键字 - co_await
关键字 co_await
是一个操作符,所以我们只要实现这个操作符重载就可以实现协程等待任意类型。 当然,返回的类型要求实现 bool await_ready(T&) noexcept
、 void await_suspend(T&, std::experimental::coroutine_handle<T>) noexcept
、 void await_resume(T&) noexcept
这三个接口。比如我们实现一个 co_wait
后结束的结构,可以按下面这种接入方式:
Copy struct wait_some_times {
int left_times;
std :: experimental :: coroutine_handle <> handle;
wait_some_times ( int t) : left_times (t) , handle ( nullptr ) {}
};
struct suspend_some_times {
wait_some_times & d;
suspend_some_times ( wait_some_times & _d) : d (_d) {}
bool await_ready () noexcept {
std :: cout << "call await_ready: " << d . left_times << std :: endl;
return d . left_times <= 0 ;
}
void await_suspend (std :: experimental :: coroutine_handle <> h) noexcept {
// 记下来handle以便后面resume用
d . handle = h;
std :: cout << "call await_suspend: " << ( -- d . left_times) << std :: endl;
}
void await_resume () noexcept {
std :: cout << "call await_resume: " << d . left_times << std :: endl;
d . handle = nullptr ;
}
};
然后把整个连起来,完整的例子如下:
Copy #include <iostream>
#include <iomanip>
#include <vector>
#include <memory>
#include <experimental/coroutine>
struct wait_some_times {
int left_times;
std :: experimental :: coroutine_handle <> handle;
wait_some_times ( int t) : left_times (t) , handle ( nullptr ) {}
};
struct suspend_some_times {
wait_some_times & d;
suspend_some_times ( wait_some_times & _d) : d (_d) {}
bool await_ready () noexcept {
std :: cout << "call await_ready: " << d . left_times << std :: endl;
return d . left_times <= 0 ;
}
void await_suspend (std :: experimental :: coroutine_handle <> h) noexcept {
// 记下来handle以便后面resume用
d . handle = h;
std :: cout << "call await_suspend: " << ( -- d . left_times) << std :: endl;
}
void await_resume () noexcept {
std :: cout << "call await_resume: " << d . left_times << std :: endl;
d . handle = nullptr ;
}
};
struct coroutine_task {
struct promise_type {
coroutine_task get_return_object () {
return coroutine_task{};
}
auto initial_suspend () {
return std :: experimental :: suspend_never{};
}
auto final_suspend () {
return std :: experimental :: suspend_never{};
}
void unhandled_exception () {}
void return_void () {}
};
};
auto operator co_await( wait_some_times & x) noexcept {
return suspend_some_times{ x };
}
coroutine_task f ( wait_some_times & waiter) {
std :: cout << "begin to co_await" << std :: endl;
co_await waiter; // 只有前三次会协程切出
co_await waiter;
co_await waiter;
co_await waiter; // 这之后await_ready返回true了,不会再切出
co_await waiter;
std :: cout << "end of corotine" << std :: endl;
}
int main ( int argc , char* argv[]) {
#ifdef __cpp_coroutines
std :: cout << "__cpp_coroutines: " << __cpp_coroutines << std :: endl;
#endif
wait_some_times waiter{ 3 };
f (waiter);
while ( waiter . handle && ! waiter . handle . done ()) {
std :: cout << "about to resume: " << waiter . left_times << std :: endl;
// 这里用传出的handle来恢复切入协程
waiter . handle . resume ();
}
return 0 ;
}
输出如下:
Copy __cpp_coroutines: 201703
begin to co_await
call await_ready: 3
call await_suspend: 2
about to resume: 2
call await_resume: 2
call await_ready: 2
call await_suspend: 1
about to resume: 1
call await_resume: 1
call await_ready: 1
call await_suspend: 0
about to resume: 0
call await_resume: 0
call await_ready: 0
call await_resume: 0
call await_ready: 0
call await_resume: 0
end of corotine
关键字 - co_yield
关键字 co_yield
要求实现 COROUTINE_OBJECT::promise_type::yield_value(参数)
。比较贴近于单独一次异步调用的实现。
简单的描述 co_yield VALUE
就是相当于 co_await p.yield_value(VALUE)
。
Copy #include <iostream>
#include <iomanip>
#include <vector>
#include <memory>
#include <experimental/coroutine>
struct test_rpc_generator {
test_rpc_generator ( const test_rpc_generator & ) = delete ;
test_rpc_generator ( test_rpc_generator && other) : coro ( other . coro) {
other . coro = nullptr ;
};
~test_rpc_generator () {
if (coro) {
coro . destroy ();
}
}
struct promise_type ;
using handle = std :: experimental :: coroutine_handle < promise_type >;
struct promise_type {
int* current_value;
static auto get_return_object_on_allocation_failure () {
return test_rpc_generator{ nullptr };
}
auto get_return_object () {
return test_rpc_generator{handle :: from_promise ( * this )};
}
auto initial_suspend () {
current_value = nullptr ;
return std :: experimental :: suspend_never{};
}
auto final_suspend () {
return std :: experimental :: suspend_always{};
}
void unhandled_exception () {
std :: terminate ();
}
void return_void () {
}
auto yield_value ( int* value) {
current_value = value;
return std :: experimental :: suspend_always{};
}
};
int* value () const {
if (coro) {
return coro . promise () . current_value;
}
return 0 ;
}
bool move_next ( int rpc_result) {
if (coro && coro . promise () . current_value) {
* coro . promise () . current_value = rpc_result;
}
return coro ? ( coro . resume () , ! coro . done ()) : false ;
}
bool await_ready () const {
return ! coro || coro . done ();
}
private :
test_rpc_generator ( handle h) : coro (h) {}
handle coro;
};
test_rpc_generator f () {
int rpc_res1 , rpc_res2;
co_yield & rpc_res1;
// _alloca(rpc_res1);
std :: cout << "resumed got rpc_res1: " << rpc_res1 << "(@" << & rpc_res1 << ")" << std :: endl;
co_yield & rpc_res2;
// _alloca(rpc_res2);
std::cout<< "resumed got rpc_res1: "<< rpc_res1<< "(@"<< &rpc_res1<< ")" << ", rpc_res2: "<< rpc_res2 <<"(@"<< &rpc_res2<< ")"<< std::endl;
}
int main ( int argc , char * argv[]) {
#ifdef __cpp_coroutines
std :: cout << "__cpp_coroutines: " << __cpp_coroutines << std :: endl;
#endif
int rpc_fake_data = 1 ;
auto g1 = f ();
auto g2 = f ();
void* detect_addr = malloc ( 4000 );
std :: cout << "detect_addr:" << detect_addr << std :: endl;
free (detect_addr);
for ( bool is_continue = true ; is_continue; is_continue = ( ! g1 . await_ready () || ! g2 . await_ready ())) {
if ( ! g1 . await_ready ()) {
g1 . move_next ( ++ rpc_fake_data);
std :: cout << "g1 value:" << g1 . value () << std :: endl;
}
if ( ! g2 . await_ready ()) {
g2 . move_next ( ++ rpc_fake_data);
std :: cout << "g2 value:" << g2 . value () << std :: endl;
}
}
return 0 ;
}
我这里一次示例输出是:
Copy __cpp_coroutines: 201703
detect_addr:00881BD0
resumed got rpc_res1: 2 ( @0087F96C )
g1 value:0087F980
resumed got rpc_res1: 3 ( @00881B1C )
g2 value:00881B30
resumed got rpc_res1: 2 ( @0087F96C ) , rpc_res2: 4 ( @0087F980 )
g1 value:0087F980
resumed got rpc_res1: 3 ( @00881B1C ) , rpc_res2: 5 ( @00881B30 )
g2 value:00881B30
关键字 - co_return
最后一个是 co_return
。 这个关键字主要是用于直接退出协程函数的, 因为协程函数的返回值是我们自己定义的这个 COROUTINE_OBJECT
, 所以函数逻辑附带的返回值就要用这个关键字来实现。 这个关键字要求实现 COROUTINE_OBJECT::promise_type::return_value(参数)
或者 COROUTINE_OBJECT::promise_type::return_void()
。如果协程函数中有 co_return VALUE
。则是最终调用了 COROUTINE_OBJECT::promise_type::return_value(VALUE)
, 否则是调用 COROUTINE_OBJECT::promise_type::return_void()
。
我们可以把协程函数的最终结果放在这里面来实现转储到某个地方。这部分的代码和 yield 的很像。就不另外贴了,下面贴一个全部整合到一起的吧。
全功能整合到一起
我们来一个功能完整,并且贴近单线程工程并且时候异步IO的实践的例子。
Copy #include <iostream>
#include <iomanip>
#include <vector>
#include <memory>
#include <experimental/coroutine>
static std :: vector < std :: pair <int* , std :: experimental :: coroutine_handle <> > > g_test_rpc_manager;
static int g_test_rpc_fake_data = 0 ;
struct test_rpc_generator {
struct test_rpc_data {
int final_value;
int yield_times;
std :: vector < std :: experimental :: coroutine_handle <> > follower;
};
struct promise_type ;
using data_ptr = std :: shared_ptr < test_rpc_data >;
struct promise_type {
data_ptr data;
static auto get_return_object_on_allocation_failure () {
return test_rpc_generator{ nullptr };
}
auto get_return_object () {
data = std :: make_shared < test_rpc_data >();
if (data) {
data -> final_value = 0 ;
data -> yield_times = 0 ;
}
return test_rpc_generator{ data };
}
auto initial_suspend () {
return std :: experimental :: suspend_never{}; // STL提供了一些自带的awaiter实现,我们其实很多情况下也不需要另外写,直接用STL就好了
}
auto final_suspend () {
return std :: experimental :: suspend_always{}; // 和上面一样,也是STL自带的awaiter实现
}
void unhandled_exception () {
std :: terminate ();
}
// 用以支持 co_return
void return_value ( int v) {
// 最终co_return时保存最终数据
if (data) {
data -> final_value = v;
auto followers = std :: move ( data -> follower);
for ( auto& h : followers) {
h . resume ();
}
}
}
// 用以支持 co_yield
auto yield_value ( int* value) {
// 每次调用都会执行,创建handle用以后面恢复数据
g_test_rpc_manager . emplace_back (std :: make_pair (value , std :: experimental :: coroutine_handle<> :: from_address (
std :: experimental :: coroutine_handle< promise_type > :: from_promise ( * this ) . address ()
)));
if (data) {
++ data -> yield_times;
}
return std :: experimental :: suspend_always{};
}
};
// 下面的接入用侵入式的方式支持 co_await test_rpc_generator
// MSVC 目前支持使用非侵入式的方式实现,但是clang不支持
bool await_ready () noexcept {
return value () > 0 ;
}
void await_resume () {
std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": test_rpc_generator resume for " << yield_times() << " time(s)" << std::endl;
}
void await_suspend (std :: experimental :: coroutine_handle <> h) {
std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": test_rpc_generator yield for " << yield_times() << " time(s), wait for " << h.address() << std::endl;
// 记录要恢复父协程
if (h) {
add_follower (h);
}
}
int value () const {
if (data) {
return data -> final_value;
}
return 0 ;
}
int yield_times () const {
if (data) {
return data -> yield_times;
}
return - 1 ;
}
void add_follower (std :: experimental :: coroutine_handle <> h) {
if (data) {
data -> follower . emplace_back (std :: move (h));
}
}
private :
test_rpc_generator ( data_ptr d) : data (d) {}
data_ptr data;
};
// 异步协程函数
test_rpc_generator f () {
int rpc_res1 , rpc_res2;
co_yield & rpc_res1;
// _alloca(rpc_res1);
std :: cout << "resumed got rpc_res1: " << rpc_res1 << "(@" << & rpc_res1 << ")" << std :: endl;
co_yield & rpc_res2;
// _alloca(rpc_res2);
std::cout << "resumed got rpc_res1: " << rpc_res1 << "(@" << &rpc_res1 << ")" << ", rpc_res2: " << rpc_res2 << "(@" << &rpc_res2 << ")" << std::endl;
// 模拟多次RPC然后返回最终结果
co_return rpc_res1 * 100 + rpc_res2;
}
// 这里模拟生成数据
void test_rpc_manager_run () {
std :: vector < std :: pair <int* , std :: experimental :: coroutine_handle <> > > rpc_manager;
g_test_rpc_manager . swap (rpc_manager);
for ( auto& generator : rpc_manager) {
if ( generator . first) {
* generator . first = ++ g_test_rpc_fake_data;
}
if ( generator . second && ! generator . second . done ()) {
generator . second . resume ();
}
}
}
struct test_task {
using ptr_t = std :: shared_ptr < test_task >;
test_task ( int ms) : status ( 0 ) , max_status (ms) {}
bool is_ready () const noexcept {
return status >= max_status;
}
int status;
int max_status;
};
struct test_task_future {
struct promise_type ;
using ptr_t = std :: shared_ptr < test_task >;
using handle = std :: experimental :: coroutine_handle < promise_type >;
struct promise_type {
static auto get_return_object_on_allocation_failure () {
return test_task_future{ nullptr };
}
auto get_return_object () {
return test_task_future{ handle :: from_promise ( * this ) };
}
auto initial_suspend () {
return std :: experimental :: suspend_never{};
}
auto final_suspend () {
return std :: experimental :: suspend_always{};
}
void unhandled_exception () {
std :: terminate ();
}
void return_void () {
for ( auto& h : follower) {
h . resume ();
}
}
// 这里是为了把task::ptr_t数据关联进去,目前版本不支持promise构造函数参数,没想到什么其他好方法
auto yield_value ( ptr_t t) {
task = t;
return std :: experimental :: suspend_never{};
}
ptr_t task;
std :: vector < handle > follower;
};
// 下面的接入用侵入式的方式支持 co_await test_task::ptr_t
struct awaitable {
awaitable ( const test_task_future & pt) {
if ( pt . coro) {
data = pt . coro . promise () . task;
coro = pt . coro;
}
};
bool await_ready () const {
bool ret = ! data || data -> is_ready ();
if (ret) {
std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << data.get() << " ready" << std::endl;
}
return ret;
}
void await_resume () {
if (data) {
++ data -> status;
std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << data.get() << " resume to " << (data ? data->status : -1) << std::endl;
}
}
void await_suspend ( handle h) {
if (data) {
std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << data.get() << " suspend test_task_future::handle from " << (data ? data->status : -1) <<
", wait for " << h . address () << std :: endl;
}
if (coro && h) {
coro . promise () . follower . emplace_back (h);
}
}
ptr_t data;
handle coro;
};
bool done () const {
return ! coro || coro . done ();
}
private :
test_task_future ( handle h) : coro (h) {}
handle coro;
};
// 接入 co_await test_task::ptr_t
auto operator co_await( const test_task_future & pt) noexcept {
return test_task_future :: awaitable{ pt };
}
test_task_future h (test_task :: ptr_t task) {
// 这里是为了把task::ptr_t数据关联进去,目前版本不支持promise构造函数参数,没想到什么其他好方法
co_yield task;
// 模拟任务内部流程并调用外部RPC
std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << task.get() << std::endl;
test_rpc_generator rpc_res = f ();
co_await rpc_res;
std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << task.get() << " call f() ret: " << rpc_res.value() << std::endl;
}
test_task_future g (test_task :: ptr_t task) {
// 这里是为了把task::ptr_t数据关联进去,目前版本不支持promise构造函数参数,没想到什么其他好方法
co_yield task;
// 等待子任务完成
while (task && ! task -> is_ready ()) {
std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << task.get() << std::endl;
co_await h (task);
}
}
int main ( int argc , char* argv[]) {
#ifdef __cpp_coroutines
std :: cout << "__cpp_coroutines: " << __cpp_coroutines << std :: endl;
#endif
// 创建一个任务
test_task :: ptr_t task = std :: make_shared < test_task >( 3 );
// 运行任务
auto fut = g (task);
// 模拟从外部获取数据然会恢复协程
while ( ! fut . done ()) {
test_rpc_manager_run ();
}
return 0 ;
}
这回贴一下linux内clang的输出吧:
Copy __cpp_coroutines: 201703
g : 280: task 0x1d3c028
h : 268: task 0x1d3c028
await_suspend : 89: test_rpc_generator yield for 1 time ( s ) , wait for 0x1d3c320
await_suspend : 235: task 0x1d3c028 suspend test_task_future::handle from 0, wait for 0x1d3c040
resumed got rpc_res1: 1 ( @0x1d3c730 )
resumed got rpc_res1: 1 ( @0x1d3c730 ) , rpc_res2: 2 ( @0x1d3c734 )
await_resume : 85: test_rpc_generator resume for 2 time ( s )
h : 271: task 0x1d3c028 call f () ret: 102
await_resume : 229: task 0x1d3c028 resume to 1
g : 280: task 0x1d3c028
h : 268: task 0x1d3c028
await_suspend : 89: test_rpc_generator yield for 1 time ( s ) , wait for 0x1d3c850
await_suspend : 235: task 0x1d3c028 suspend test_task_future::handle from 1, wait for 0x1d3c040
resumed got rpc_res1: 3 ( @0x1d3cc60 )
resumed got rpc_res1: 3 ( @0x1d3cc60 ) , rpc_res2: 4 ( @0x1d3cc64 )
await_resume : 85: test_rpc_generator resume for 2 time ( s )
h : 271: task 0x1d3c028 call f () ret: 304
await_resume : 229: task 0x1d3c028 resume to 2
g : 280: task 0x1d3c028
h : 268: task 0x1d3c028
await_suspend : 89: test_rpc_generator yield for 1 time ( s ) , wait for 0x1d3cd40
await_suspend : 235: task 0x1d3c028 suspend test_task_future::handle from 2, wait for 0x1d3c040
resumed got rpc_res1: 5 ( @0x1d3d150 )
resumed got rpc_res1: 5 ( @0x1d3d150 ) , rpc_res2: 6 ( @0x1d3d154 )
await_resume : 85: test_rpc_generator resume for 2 time ( s )
h : 271: task 0x1d3c028 call f () ret: 506
await_resume : 229: task 0x1d3c028 resume to 3
promise/future 支持
我本地测试的Clang版本(7.0.1)尚未实现支持。 MSVC的标准库用偏特化的方式对 std::promise 和 std::future 实现了协程的接入。它的接入代码如下(为了简短,精简掉了一个偏特化实现,流程和不特化的类似):
Copy namespace experimental {
template < class _Ty , class ... _ArgTypes >
struct coroutine_traits<future<_Ty>, _ArgTypes...> { // defines resumable traits for functions returning future<_Ty>
struct promise_type {
promise < _Ty > _MyPromise;
future < _Ty > get_return_object () {
return _MyPromise . get_future ();
}
bool initial_suspend () const {
return false ;
}
bool final_suspend () const {
return false ;
}
template < class _Ut >
void return_value ( _Ut && _Value) {
_MyPromise . set_value (_STD forward < _Ut >(_Value));
}
void set_exception ( exception_ptr _Exc) {
_MyPromise . set_exception (_STD move (_Exc));
}
};
};
} // namespace experimental
template < class _Ty >
bool await_ready ( future < _Ty > & _Fut) {
return _Fut . _Is_ready ();
}
template < class _Ty >
void await_suspend ( future < _Ty > & _Fut ,
experimental :: coroutine_handle <> _ResumeCb) { // change to .then when future gets .then
thread _WaitingThread ([ & _Fut , _ResumeCb] {
_Fut . wait ();
_ResumeCb ();
});
_WaitingThread . detach ();
}
template < class _Ty >
auto await_resume ( future < _Ty > & _Fut) {
return _Fut . get ();
}
可以看到,它的 co_await std::future<T>
挂起是开了个新线程来等待,真他喵暴力,建议不要用。但是还是可以比较容易地让自己的管理器接入 co_await
。
我们借助STL的协程接入, 可以实现一个最小化的自定义协程支持:
Copy #include <iostream>
#include <future>
#include <thread>
#include <chrono>
#include <experimental/coroutine>
struct custom_rpc_generator {};
struct suspend_custom_rpc : public std :: experimental :: suspend_always {
void await_suspend (std :: experimental :: coroutine_handle <> h) noexcept {
std :: thread thd{ [h]() {
using namespace std;
std :: this_thread :: sleep_for ( 2 s );
std :: cout << "start to resume coroutine" << std :: endl;
h . resume ();
} };
thd . detach ();
}
};
auto operator co_await( const custom_rpc_generator & ) {
return suspend_custom_rpc{};
}
std :: future < void > outter_fn () {
co_await custom_rpc_generator{};
}
int main () {
auto fut = outter_fn ();
std :: cout << "start to wait future" << std :: endl;
fut . wait ();
std :: cout << "future finished, ready to exit" << std :: endl;
return 0 ;
}
总结
总体感觉上,C++20协程为了兼顾灵活和支持非侵入式接入,设计了好几个互相交织的大模块,函数级有处理协程函数内部的 promise_type
、 协程函数对外暴露的交互对象 (我们这里统称为 future
) 、 用于协程上下文切换的 handle
,单次切换有用于支持await功能的 awaitable
和一些回调函数接口。几个对象之间的数据共享也并不是很方便。而且和传统有栈协程的区别仅仅是约束了返回值类型,并且可以依次在编译期推断出需要多少栈空间,从而减少浪费。
我打算后面有时间尝试对 libcopp 接入C++协程支持,在研究C++协程的时候也想到几个问题。在和 ultramanhu 讨论了一下以后主要的问题也有了一些初步的解决方案的想法,但是目前细节上还是有一些没太想清楚的地方。
首先是如果业务有自己的线程池,其实还是要由管理层来控制resume,不能直接像STL那样开线程,那基本上就和 std::future<T>
say Good-Bye 了。虽然在小心维护的情况下,避免 co_await std::future<T>
也是可以避免STL乱开线程的,但是我觉得一旦使用了,后面就很难控制住。特别是这个C++协程的对象关系互耦合插如此严重的情况下,本身就不容易理解。
第二个问题是调用链。C++协程接口设计是非对称的,我们实际业务中,肯定还是需要对称协程的支持,即子协程结束后能够自动恢复 co_await
它的父协程。这个需要我们自己去实现,上面 std::future<T>
也就是这里是开了个线程去实现的(都开线程了还用协程干啥)。在上面的sample代码中,我是开了一个共享数据区,在 await_suspend 的时候去追加等待链,在 return_void/return_value
的时候去执行父级协程的恢复切入,这里面写成vector是为了支持N个协程await一个协程的情况,实际上用链表会更好一些。这里有个比较“脏”的设计是handle的加入是在 awaitable
执行挂起的时候,而恢复是在 promise_type
的 return_void/return_value
事件里。这就分了两个地方。这里的 awaitable
对 promise_type
和N个协程等一个协程是保持一致的N:1关系。但是 awaitable
仅仅能访问 future
, 要让 promise_type
也能访问 future
的话,一种方法是开一个共享数据块,在 promise_type
创建 future
的时候传进去这样了。这也是上面sample代码里 using data_ptr = std::shared_ptr<test_rpc_data>;
的实现。 MSVC的STL的实现也是这样 promise::get_future()
是使用了promise内部的数据创建的future对象。 另外一种尝试的方法是后面task的,进入协程后先 co_yield
一次,然后 yield_value(VALUE)
函数返回 std::experimental::suspend_never
。这样不会造成协程挂起,并且可以给 promise_type
注入任意数据。不过这样就有个约定式的规范了,也不是很严谨。这方面等后面支持了 promise_type
的带参数构造函数可能可以好一些。
第三个问题是handle提前结束的问题。在看了MSVC的实现,这个handle是可以copy也可以转换的。copy开销也很小,里面只包含一个和 promise_type
地址有关的指针(就是 promise_type
的地址然后加了 align和padding)。比如一个RPC任务,我可能copy一个handle用来在有数据的时候resume,然后我还会copy一个handle在超时的时候强行resume然后走失败流程。现在这种情况,就是需要在 awaitable
的 await_suspend 里添加这两个handle到两个manager里,然后在 await_resume 里要把这两个都移除。不管哪个流程,肯定有一个handle是处于resume回调中的,这还涉及递归调用的问题(删除正在resume过程中的handle)。 我目前的想法是,封装一个handle resumer,让多个事件manager都指向同一个 handle resumer的weak_ptr。这个resumer的生命周期可以和 awaitable
共存,一旦 awaitable
消失,这次的await也就结束了。
libcopp 所有组件都是可拆卸和自定义的,所以剩下还有些细节就是接入哪些东西的哪些接口,如果拆卸掉某些组件之后怎么保证这些接入仍然可用和是否要支持 libcopp 内yield或是await其他C++协程对象。这些在后面结合一些实际应用再取舍吧。
最后,贴一个更详细一点的C++20 Coroutine生成的汇编( https://gist.github.com/owt5008137/aa7b093caddcea5a79f32d0ebf4efa88 )。如果上面有哪些理解不对的地方或者建议,欢迎有兴趣的小伙伴们一起来交流探讨哈。