OWenT's blog
  • Introduction
  • About Me
  • 2020
    • 近期对libatapp的一些优化调整(增加服务发现和连接管理,支持yaml等)
    • xresloader转表工具链增加了一些新功能(map,oneof支持,输出矩阵,基于模板引擎的加载代码生成等)
    • 在游戏服务器中使用分布式事务
    • libcopp接入C++20 Coroutine和一些过渡期的设计
    • libatbus 的大幅优化
    • nftables初体验
    • 容器配置开发环境小计
  • 2019
    • PALM Tree - 适合多核并发架构的B+树 - 论文阅读小记
    • 跨平台协程库 - libcopp 简介
    • C++20 Coroutine 性能测试 (附带和libcopp/libco/libgo/goroutine/linux ucontext对比)
    • 尝鲜Github Action
    • 一些xresloader(转表工具)的改进
    • protobuf、flatbuffer、msgpack 针对小数据包的简单对比
    • 协程框架(libcopp) 小幅优化
    • Excel转表工具(xresloader) 增加protobuf插件功能和集成 UnrealEngine 支持
    • Anna(支持任意扩展和超高性能的KV数据库系统)阅读笔记
    • C++20 Coroutine
    • libcopp merge boost.context 1.69.0
    • Google去中心化分布式系统论文三件套(Percolator、Spanner、F1)读后感
    • Rust玩具-企业微信机器人通用服务
  • 2018
    • 使用ELK辅助监控开发测试环境服务质量和问题定位
    • Webpack+vue+boostrap+ejs构建Web版GM工具
    • 2018年的新通用伪随机数算法(xoshiro / xoroshiro)的C++(head only)实现
    • Rust的第二次接触-写个小服务器程序
    • 理解和适配AEAD加密套件
    • atsf4g-co的进化:协程框架v2、对象路由系统和一些其他细节优化
    • 协程框架(libcopp)v2优化、自适应栈池和同类库的Benchmark对比
    • 可执行文件压缩
    • 初识Rust
    • 使用restructedtext编写xresloader文档
    • atframework的etcd模块化重构
    • C++的backtrace
  • 2017
    • ECDH椭圆双曲线(比DH快10倍的密钥交换)算法简介和封装
    • protobuf-net的动态Message实现
    • pbc的proto3接入
    • atgateway内置协议流程优化-加密、算法协商和ECDH
    • 整理一波软件源镜像同步工具+DevOps工具
    • Blog切换到Hugo
    • libcopp v2的第一波优化完成
    • libcopp(v2) vs goroutine性能测试
    • libcopp的线程安全、栈池和merge boost.context 1.64.0
    • GCC 7和LLVM+Clang+libc++abi 4.0的构建脚本
    • libatbus的几个藏得很深的bug
    • 用cmake交叉编译到iOS和Android
    • 开源项目得一些小维护
    • atapp的c binding和c#适配
    • 对象路由系统设计
    • 2016年总结
    • 近期的一个协程流程BUG
  • 2016
    • 重写了llvm+clang+libc++和libc++abi的构建脚本
    • atsf4g完整游戏工程示例
    • atframework基本框架已经完成
    • 游戏服务器的不停服更新
    • 对atbus的小数据包的优化
    • Android和IOS的TLS问题
    • pbc的一个陈年老BUG
    • boost.context-1.61版本的设计模型变化
    • 接入letsencrypt+全面启用HTTP/2
    • 理解Raft算法
    • libatbus基本功能及单元测试终于写完啦
    • 博客文章和文档迁移到gitbook
  • 2015
    • 博客文章和文档迁移到gitbook
    • 给客户端写得LRU缓存
    • 近期活动比较零散
    • 关于BUS通信系统的一些思考(三)
    • 针对Java JIT的优化(转表工具:xresloader)
    • libcopp更新 (merge boost 1.59 context)
    • 小记最近踩得两个C++坑
    • Redis全异步(HA)Driver设计稿
    • Vim常用命令
    • 关于firewalld和systemd的一些命令速记
    • Jenkins(hudson)插件记录
    • 我们的Lua类绑定机制
    • LLVM+Clang+Libcxx+Libcxxabi(3.6)工具链编译(完成自举编译)
    • 回顾2014
    • Android NDK undefined reference to ___tls_get_addr 错误
    • gitlab腾讯企业邮箱配置
  • 2014
    • 回顾2013
    • C++11动态模板参数和type_traits
    • C++又一坑:动态链接库中的全局变量
    • tolua++内存释放坑
    • [转]类似github的框架
    • Lua性能分析
    • 集成Qt Webkit 到cocos2d-x
    • Gitlab环境搭建小计
    • 近期研究VPN的一些记录(OpenVPN,pptp,l2tp)
    • LLVM + Clang + Libcxx + Libcxxabi 工具链编译
    • 关于BUS通信系统的一些思考(二)
    • 关于BUS通信系统的一些思考(一)
    • [libiniloader] Project
    • 记录一些在线编辑器
    • [WP Code Highlight.js] Project
    • 再议 C++ 11 Lambda表达式
    • 基于Chrome插件的开发工具链
    • [ACM] HDU 1006 解题报告
    • Linux 编译安装 GCC 4.9
    • 又碰到了这个解谜游戏,顺带记下地址
    • 简单C++单元测试框架(支持一键切到GTest或Boost.Test)
    • 捣鼓一个协程库
  • 2013
    • std和boost的function与bind实现剖析
    • 不知道是哪一年的腾讯马拉松题目 照片评级 解题报告
    • Lua 挺好用的样子
    • VC和GCC成员函数指针实现的研究(三)
    • VC和GCC成员函数指针实现的研究(二)
    • VC和GCC内成员函数指针实现的研究(一)
    • 一个C++关于成员变量偏移地址的小Trick
    • ptmalloc,tcmalloc和jemalloc内存分配策略研究
    • POJ 2192 Zipper HDU 2059 龟兔赛跑
    • 从Javascript到Typescript到Node.js
    • 网络编程小结
    • 试试Boost.Asio
    • Lnmp yum 安装脚本 (for CentOS)
    • ARM 交叉编译环境搭建
    • Linux 编译安装 GCC 4.8
    • [记录]虚拟硬盘的压缩|磁盘写零
  • 2012
    • Boost.Spirit 初体验
    • “C++的90个坑”-阅读笔记
    • AC自动机
    • C++ 标准过渡期
    • 程序员修炼之道 -- 阅读笔记
    • [转载]狼与哈士奇
    • C++ 新特性学习(八) — 原子操作和多线程库[多工内存模型]
    • C++ 新特性学习(七) — 右值引用
    • 理解Protobuf的数据编码规则
    • 忆往昔ECUST的ACM时代
    • Linux编译安装GCC 4.7
    • JSON显示库 -- showJson (Javascript)
    • C++ 新特性学习(六) — 新的字符串编码和伪随机数
    • C++ 新特性学习(五) — 引用包装、元编程的类型属性和计算函数对象返回类型
    • C++ 新特性学习(四) — Bind和Function
  • 2011
    • C++ 新特性学习(三) — Regex库
    • C++ 新特性学习(二) -- Array、Tuple和Hash库
    • C++ 新特性学习(一) -- 概述+智能指针(smart_ptr)
    • Linux 和 Windows PowerShell 常用工具/命令 记录
    • 非常帅气的Linq to sql
    • 2011 Google Code Jam 小记
    • C++总是很神奇
    • 大学生创新项目[国家级]经费使用记录
    • 常用官方文档整理
    • 我们学校的IPV6很不错嘛
  • 2010
    • 线段树相关问题 (引用 PKU POJ题目) 整理
    • 2010 ACM 赛前笔记
    • POJ PKU 2596 Dice Stacking 解题报告
    • POJ PKU 3631 Cuckoo Hashing 解题报告
    • POJ PKU 1065 Wooden Sticks 3636 Nested Dolls 解题报告
    • HDU 3336 Count the string 解题报告
    • Hash模板 个人模板
    • ZOJ 3309 Search New Posts 解题报告
    • POJ PKU Let's Go to the Movies 解题报告
    • 注册表常用键值意义
    • PKU POJ 1724 ROADS 解题报告
    • 《神奇古今秘方集锦》&《民间秘术大全》
    • PKU POJ 1720 SQUARES 解题报告
    • POJ PKU 2155 Matrix 解题报告
    • PKU POJ 1141 Brackets Sequence 解题报告
    • PKU POJ 2728 Desert King 解题报告
    • PKU POJ 2976 Dropping tests 解题报告
    • PKU POJ 3757 Simple Distributed storage system 解题报告
    • GCD Determinant 解题报告
    • Southeastern European 2008 Sky Code 解题报告
    • HDU HDOJ 3400 Line belt 解题报告
    • 线性筛法求质数(素数)表 及其原理
    • HDU HDOJ 3398 String 解题报告
    • 树状数组模块(个人模板)
    • 浙江理工 省赛总结 team62 By OWenT of Coeus
    • POJ PKU 3659 Cell Phone Network 解题报告
    • USACO 2008 March Gold Cow Jogging 解题报告
    • C#格式化输出(记录)
    • 参加有道难题笔记
    • POJ PKU 2446 Chessboard 解题报告
    • POJ PKU 1986 Distance Queries 解题报告
    • 计算几何算法概览[转载]
    • 关于差分约束(转载)
    • POJ PKU 2826 An Easy Problem?! 解题报告
    • 数论模板(个人模板)
    • 简易四则运算(ACM个人模板)
    • Catalan 数
    • The 35th ACM/ICPC Asia Regional Tianjin Site —— Online Contest 1009 Convex 解题报告
    • JQuery扩展插件--提示信息
    • ACM 计算几何 个人模板
    • 解析网站字符串型参数 Javascript QueryString 操作 TQueryString类
    • POJ PKU 1474 Video Surveillance 解题报告
  • 2009
    • 模式匹配(kmp)个人模板
    • 并查集 模板
    • POJ 3267 The Cow Lexicon 解题报告
    • C/C++语言常用排序算法
    • POJ 2606 Rabbit hunt 2780 Linearity 1118 Lining Up 解题报告
    • 打造最快的Hash表(转) [以暴雪的游戏的Hash为例]
    • ECUST 09年 校赛个人赛第六,七场总结
    • ECUST 09年 校赛个人赛第三场部分解题报告(A,D,F,I)
    • 牛顿迭代解方程 ax^3+bX^2+cx+d=0
    • 09年8月9日 ECUST ACM 练习赛总结
    • 连接最多点直线 (OWenT 个人模板)
    • 点到直线距离 和 线段间最短距离 (OWenT 模板)
    • ECUST 09年 校赛个人训练赛第五场总结
    • ECUST 09年 校赛个人赛第八场(最后一场)总结
    • 09年8月14日 ECUST ACM 练习赛总结
    • 矩阵相关 (增强中)
    • Prime最小生成树(个人模板)
    • 最长单调子序列 复杂度nlog(n)
    • POJ PKU 2549 Sumsets 解题报告
    • POJ PKU 3277 City Horizon 解题报告
    • 我的ACM生涯
    • POJ PKU 2528 Mayor's posters 解题报告
    • POJ PKU 2378 Tree Cutting 解题报告
    • POJ PKU 1990 MooFest 解题报告
Powered by GitBook
On this page
  • 前言
  • C++20 的协程基本原理
  • 协程闭包类型(promise)
  • 关键字 - co_await
  • 关键字 - co_yield
  • 关键字 - co_return
  • 全功能整合到一起
  • promise/future 支持
  • 总结

Was this helpful?

  1. 2019

C++20 Coroutine

PreviousAnna(支持任意扩展和超高性能的KV数据库系统)阅读笔记Nextlibcopp merge boost.context 1.69.0

Last updated 5 years ago

Was this helpful?

前言

最近的新闻里 C++20 已经确认的内容里已经有了协程组件,之前都是粗略看过这个协程草案。最近抽时间更加系统性的看了下接入和实现细节。

我的测试代码都是在MSVC下开启 /await 选项后测试的,在我本地的Linux clang环境中,可以通过 $LLVM_CLANG_PREFIX/bin/clang++ -std=c++2a -O0 -g -ggdb -stdlib=libc++ -fcoroutines-ts -lc++ -lc++abi -Wl,-rpath=$LLVM_CLANG_PREFIX/lib/ test.cpp 编译和运行。

在gcc 10+中,可以使用 g++ -std=c++20 -O0 -g -ggdb -fcoroutines 并把所有的 std::experimental:: 都换成 std:: 之后编译运行。

LLVM+Clang+libc++/libc++abi的编译安装脚本可以参见:

C++20 的协程基本原理

C++20 整个协程体系是 "无栈协程" 的思路,整个功能是需要结合编译器功能和STL来配合实现的。主要就是三个关键字(co_yield 、 co_await 或 co_return)和围绕这三个关键字的接入。无栈协程对API的设计是有要求的,C++20 Coroutine也不例外, 编译器在检测到内部有使用 这三个关键字时会对函数的流程做patch,然后它的返回值类型必须符合你所使用的关键字的规范。这三个关键字的规范要求不太一样,下面会列举。

我原本以为的会放在协程的 awaiter 或者 handle 对象闭包里,然后由编译器分析和对闭包内的各级对象进行扩充的引用(类似Rust的那种实现)。但是在测试的MSVC和Clang的协程流程的过程中发现,实际上还是另外堆上分配空间来保存协程函数的栈上数据,并用这种方式实现Zero-Copy的。协程函数的执行栈和主函数执行栈并不在一个地址段内,这和之前猜想的不太一样。所以,C++20 的协程也不能完全说是 "无栈" ,只是在协程函数中需要能够评估出来它需要多少栈空间存数据,不像有栈协程那样会浪费比较大的地址空间且不利于内存页复用。

同时受限于这种设计,在C++20 的协程函数里,动态栈分配是不受支持的。在MSVC下,如果你使用了动态栈分配的函数 ( _alloca ) ,直接编译就不通过了。而在gcc/clang 下如果你使用了动态栈分配的函数 ( alloca ) ,分配出来的栈地址是不会受到协程的管理(即:多个协程分配出来的地址可能是重合的),在使用的时候用户得自己保证如果涉及协程且如何切出的话,运行结果不受这部分动态长度栈数据的影响,因为可能会被其他协程改掉(简单地说就是动态分配出来的栈只能在 co_yield 和 co_await 之前使用)。

不过我觉得类似GCC动态栈的那种方案可以让它支持动态栈空间,就是在栈溢出的signal里再mmap一段地址进去,按需增大栈空间。但是这玩意性能被诟病,信号和缺页中断都不稳定且栈空间地址分散不利于CPU Cache,估计最后也不会被采纳吧。

我目前看的提案以 为准(还有个使用文档是 )。一旦一个函数被断定为协程函数,那么它会被扩充为如下形式:

2019-09-29: 更新文档 这个 比 完整得多 : 后面会整合进 : 目前还没有太大的变化。

COROUTINE_OBJECT func(args...) {
    try {
        P p(promise_constructor_arguments);
        // 这个P是自己定义的 using P = COROUTINE_OBJECT::promise_type;
        // 文档上说promise_constructor_arguments 是空或者函数的参数的左值传入 args... ,但是目前版本的MSVC还仅支持空参数列表

        COROUTINE_OBJECT r = p.get_return_object();  //             MSVC 1, Clang 2
        co_await p.initial_suspend();                // 初始化接口  MSVC 2, Clang 1
        // 上面这两行是MSVC的顺序,在clang里上面两行的顺序相反

        try {
            // 原函数体 ...
            p.return_void() or p.return_value(RET) // 取决于函数体里有没有 co_return RET
        } catch(...) {
            p.unhandled_exception();    // 未捕获的异常接口
        }

    final_suspend:
        co_await p.final_suspend();     // final suspend point

        return r;
    } catch(...) {
        return COROUTINE_OBJECT::promise_type::get_return_object_on_allocation_failure(); // noexcept
    }
}

关键字 co_await 先简单理解为判定是否需要切出,后面会有更加详细一点的解释。 而协程函数一切的核心都在于上面的 COROUTINE_OBJECT 类型。里面有一些规范, co_yield 和 co_return 涉及 COROUTINE_OBJECT 里的 COROUTINE_OBJECT::promise_type 类型。必须实现某些公共接口和函数功能接口。而返回到外层的 COROUTINE_OBJECT 对象里也需要保存协程的handle(目前是 std::experimental::coroutine_handle<PROMISE_TYPE> ),以用于后续控制协程的上下文切换使用。 同样, 下面的例子因为必须写一个协程入口对象,所以有一部分比较烦杂的必须接入的代码。

结构如下:

协程闭包类型(promise)

要支持协程函数,首先要准备一个包装的类型,里面包含 promise_type ,然后提供基本的创建、维护handle的函数。比如:

struct coroutine_task {
    struct promise_type {
        coroutine_task get_return_object() {
            return coroutine_task{};
        }
        bool initial_suspend() const {
            return false;
        }
        bool final_suspend() const {
            return false;
        }
    };
};

然后,就可以声明使用协程函数 coroutine_task f(); 了。但是要让他转变为协程函数,还需要至少接入一样协程关键字才行。我们先从最基本的 co_await 开始。

关键字 - co_await

关键字 co_await 是一个操作符,所以我们只要实现这个操作符重载就可以实现协程等待任意类型。 当然,返回的类型要求实现 bool await_ready(T&) noexcept 、 void await_suspend(T&, std::experimental::coroutine_handle<T>) noexcept 、 void await_resume(T&) noexcept 这三个接口。比如我们实现一个 co_wait 后结束的结构,可以按下面这种接入方式:

struct wait_some_times {
    int left_times;
    std::experimental::coroutine_handle<> handle;
    wait_some_times(int t) : left_times(t), handle(nullptr) {}
};

struct suspend_some_times {
    wait_some_times& d;
    suspend_some_times(wait_some_times& _d): d(_d) {}
    bool await_ready() noexcept {
        std::cout << "call await_ready: " << d.left_times << std::endl;
        return d.left_times <= 0;
    }

    void await_suspend(std::experimental::coroutine_handle <> h) noexcept {
        // 记下来handle以便后面resume用
        d.handle = h;

        std::cout << "call await_suspend: " << (--d.left_times) << std::endl;
    }

    void await_resume() noexcept {
        std::cout << "call await_resume: " << d.left_times << std::endl;
        d.handle = nullptr;
    }
};

然后把整个连起来,完整的例子如下:

#include <iostream>
#include <iomanip>
#include <vector>

#include <memory>

#include <experimental/coroutine>

struct wait_some_times {
    int left_times;
    std::experimental::coroutine_handle<> handle;
    wait_some_times(int t) : left_times(t), handle(nullptr) {}
};

struct suspend_some_times {
    wait_some_times& d;
    suspend_some_times(wait_some_times& _d) : d(_d) {}
    bool await_ready() noexcept {
        std::cout << "call await_ready: " << d.left_times << std::endl;
        return d.left_times <= 0;
    }

    void await_suspend(std::experimental::coroutine_handle <> h) noexcept {
        // 记下来handle以便后面resume用
        d.handle = h;

        std::cout << "call await_suspend: " << (--d.left_times) << std::endl;
    }

    void await_resume() noexcept {
        std::cout << "call await_resume: " << d.left_times << std::endl;
        d.handle = nullptr;
    }
};

struct coroutine_task {
    struct promise_type {
        coroutine_task get_return_object() {
            return coroutine_task{};
        }
        auto initial_suspend() {
            return std::experimental::suspend_never{};
        }
        auto final_suspend() {
            return std::experimental::suspend_never{};
        }

        void unhandled_exception() {}
        void return_void() {}
    };
};

auto operator co_await(wait_some_times& x) noexcept {
    return suspend_some_times{ x };
}

coroutine_task f(wait_some_times& waiter) {
    std::cout << "begin to co_await" << std::endl;
    co_await waiter; // 只有前三次会协程切出
    co_await waiter;
    co_await waiter;
    co_await waiter; // 这之后await_ready返回true了,不会再切出
    co_await waiter;
    std::cout << "end of corotine" << std::endl;
}

int main(int argc, char* argv[]) {
#ifdef __cpp_coroutines
    std::cout << "__cpp_coroutines: " << __cpp_coroutines << std::endl;
#endif
    wait_some_times waiter{ 3 };
    f(waiter);

    while (waiter.handle && !waiter.handle.done()) {
        std::cout << "about to resume: " << waiter.left_times << std::endl;
        // 这里用传出的handle来恢复切入协程
        waiter.handle.resume();
    }

    return 0;
}

输出如下:

__cpp_coroutines: 201703
begin to co_await
call await_ready: 3
call await_suspend: 2
about to resume: 2
call await_resume: 2
call await_ready: 2
call await_suspend: 1
about to resume: 1
call await_resume: 1
call await_ready: 1
call await_suspend: 0
about to resume: 0
call await_resume: 0
call await_ready: 0
call await_resume: 0
call await_ready: 0
call await_resume: 0
end of corotine

关键字 - co_yield

关键字 co_yield 要求实现 COROUTINE_OBJECT::promise_type::yield_value(参数) 。比较贴近于单独一次异步调用的实现。

简单的描述 co_yield VALUE 就是相当于 co_await p.yield_value(VALUE) 。

#include <iostream>
#include <iomanip>
#include <vector>

#include <memory>

#include <experimental/coroutine>

struct test_rpc_generator {
    test_rpc_generator(const test_rpc_generator&) = delete;
    test_rpc_generator(test_rpc_generator&& other): coro(other.coro) {
        other.coro = nullptr;
    };
    ~test_rpc_generator() {
        if (coro) {
            coro.destroy();
        }
    }

    struct promise_type;
    using handle = std::experimental::coroutine_handle<promise_type>;

    struct promise_type {
        int* current_value;
        static auto get_return_object_on_allocation_failure() {
            return test_rpc_generator{nullptr};
        }

        auto get_return_object() {
            return test_rpc_generator{handle::from_promise(*this)};
        }

        auto initial_suspend() {
            current_value = nullptr;
            return std::experimental::suspend_never{};
        }

        auto final_suspend() {
            return std::experimental::suspend_always{};
        }

        void unhandled_exception() {
            std::terminate();
        }

        void return_void() {
        }

        auto yield_value(int* value) {
            current_value = value;
            return std::experimental::suspend_always{};
        }
    };


    int* value() const {
        if (coro) {
            return coro.promise().current_value;
        }

        return 0;
    }

    bool move_next(int rpc_result) {
        if (coro && coro.promise().current_value) {
            *coro.promise().current_value = rpc_result;
        }

        return coro ? (coro.resume(), !coro.done()) : false; 
    }

    bool await_ready() const {
        return !coro || coro.done();
    }

private:
    test_rpc_generator(handle h) : coro(h) {}
    handle coro;
};

test_rpc_generator f() {
    int rpc_res1, rpc_res2;
    co_yield &rpc_res1;
    // _alloca(rpc_res1);
    std::cout<< "resumed got rpc_res1: "<< rpc_res1<< "(@"<< &rpc_res1<< ")" << std::endl;

    co_yield &rpc_res2;
    // _alloca(rpc_res2);
    std::cout<< "resumed got rpc_res1: "<< rpc_res1<< "(@"<< &rpc_res1<< ")" << ", rpc_res2: "<< rpc_res2 <<"(@"<< &rpc_res2<< ")"<< std::endl;
}

int main(int argc, char * argv[]) {
#ifdef __cpp_coroutines
    std::cout<< "__cpp_coroutines: "<< __cpp_coroutines<< std::endl;
#endif

    int rpc_fake_data = 1;
    auto g1 = f();
    auto g2 = f();
    void* detect_addr = malloc(4000);
    std::cout << "detect_addr:" << detect_addr << std::endl;
    free(detect_addr);

    for (bool is_continue = true; is_continue; is_continue = (!g1.await_ready() || !g2.await_ready())) {
        if (!g1.await_ready()) {
            g1.move_next(++ rpc_fake_data);
            std::cout << "g1 value:" << g1.value() << std::endl;
        }

        if (!g2.await_ready()) {
            g2.move_next(++ rpc_fake_data);
            std::cout << "g2 value:" << g2.value() << std::endl;
        }
    }
    return 0;
}

我这里一次示例输出是:

__cpp_coroutines: 201703
detect_addr:00881BD0
resumed got rpc_res1: 2(@0087F96C)
g1 value:0087F980
resumed got rpc_res1: 3(@00881B1C)
g2 value:00881B30
resumed got rpc_res1: 2(@0087F96C), rpc_res2: 4(@0087F980)
g1 value:0087F980
resumed got rpc_res1: 3(@00881B1C), rpc_res2: 5(@00881B30)
g2 value:00881B30

关键字 - co_return

最后一个是 co_return 。 这个关键字主要是用于直接退出协程函数的, 因为协程函数的返回值是我们自己定义的这个 COROUTINE_OBJECT , 所以函数逻辑附带的返回值就要用这个关键字来实现。 这个关键字要求实现 COROUTINE_OBJECT::promise_type::return_value(参数) 或者 COROUTINE_OBJECT::promise_type::return_void() 。如果协程函数中有 co_return VALUE 。则是最终调用了 COROUTINE_OBJECT::promise_type::return_value(VALUE) , 否则是调用 COROUTINE_OBJECT::promise_type::return_void() 。

我们可以把协程函数的最终结果放在这里面来实现转储到某个地方。这部分的代码和 yield 的很像。就不另外贴了,下面贴一个全部整合到一起的吧。

全功能整合到一起

我们来一个功能完整,并且贴近单线程工程并且时候异步IO的实践的例子。

#include <iostream>
#include <iomanip>
#include <vector>

#include <memory>

#include <experimental/coroutine>

static std::vector<std::pair<int*, std::experimental::coroutine_handle<> > > g_test_rpc_manager;
static int g_test_rpc_fake_data = 0;

struct test_rpc_generator {
    struct test_rpc_data {
        int final_value;
        int yield_times;

        std::vector<std::experimental::coroutine_handle<> > follower;
    };

    struct promise_type;
    using data_ptr = std::shared_ptr<test_rpc_data>;

    struct promise_type {
        data_ptr data;
        static auto get_return_object_on_allocation_failure() {
            return test_rpc_generator{ nullptr };
        }

        auto get_return_object() {
            data = std::make_shared<test_rpc_data>();
            if (data) {
                data->final_value = 0;
                data->yield_times = 0;
            }
            return test_rpc_generator{ data };
        }

        auto initial_suspend() {
            return std::experimental::suspend_never{}; // STL提供了一些自带的awaiter实现,我们其实很多情况下也不需要另外写,直接用STL就好了
        }

        auto final_suspend() {
            return std::experimental::suspend_always{}; // 和上面一样,也是STL自带的awaiter实现
        }

        void unhandled_exception() {
            std::terminate();
        }

        // 用以支持 co_return
        void return_value(int v) {
            // 最终co_return时保存最终数据
            if (data) {
                data->final_value = v;

                auto followers = std::move(data->follower);
                for (auto& h : followers) {
                    h.resume();
                }
            }
        }

        // 用以支持 co_yield
        auto yield_value(int* value) {
            // 每次调用都会执行,创建handle用以后面恢复数据
            g_test_rpc_manager.emplace_back(std::make_pair(value, std::experimental::coroutine_handle<>::from_address(
                std::experimental::coroutine_handle<promise_type>::from_promise(*this).address()
            )));

            if (data) {
                ++data->yield_times;
            }

            return std::experimental::suspend_always{};
        }
    };

    // 下面的接入用侵入式的方式支持 co_await test_rpc_generator
    // MSVC 目前支持使用非侵入式的方式实现,但是clang不支持
    bool await_ready() noexcept {
        return value() > 0;
    }

    void await_resume() {
        std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": test_rpc_generator resume for " << yield_times() << " time(s)" << std::endl;
    }

    void await_suspend(std::experimental::coroutine_handle<> h) {
        std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": test_rpc_generator yield for " << yield_times() << " time(s), wait for " << h.address() << std::endl;

        // 记录要恢复父协程
        if (h) {
            add_follower(h);
        }
    }

    int value() const {
        if (data) {
            return data->final_value;
        }
        return 0;
    }

    int yield_times() const {
        if (data) {
            return data->yield_times;
        }

        return -1;
    }

    void add_follower(std::experimental::coroutine_handle<> h) {
        if (data) {
            data->follower.emplace_back(std::move(h));
        }
    }
private:
    test_rpc_generator(data_ptr d) : data(d) {}
    data_ptr data;
};

// 异步协程函数
test_rpc_generator f() {
    int rpc_res1, rpc_res2;
    co_yield &rpc_res1;
    // _alloca(rpc_res1);
    std::cout << "resumed got rpc_res1: " << rpc_res1 << "(@" << &rpc_res1 << ")" << std::endl;

    co_yield &rpc_res2;
    // _alloca(rpc_res2);
    std::cout << "resumed got rpc_res1: " << rpc_res1 << "(@" << &rpc_res1 << ")" << ", rpc_res2: " << rpc_res2 << "(@" << &rpc_res2 << ")" << std::endl;

    // 模拟多次RPC然后返回最终结果
    co_return rpc_res1 * 100 + rpc_res2;
}

// 这里模拟生成数据
void test_rpc_manager_run() {
    std::vector<std::pair<int*, std::experimental::coroutine_handle<> > > rpc_manager;
    g_test_rpc_manager.swap(rpc_manager);

    for (auto& generator : rpc_manager) {
        if (generator.first) {
            *generator.first = ++g_test_rpc_fake_data;
        }

        if (generator.second && !generator.second.done()) {
            generator.second.resume();
        }
    }
}

struct test_task {
    using ptr_t = std::shared_ptr<test_task>;

    test_task(int ms) : status(0), max_status(ms) {}

    bool is_ready() const noexcept {
        return status >= max_status;
    }

    int status;
    int max_status;
};

struct test_task_future {
    struct promise_type;
    using ptr_t = std::shared_ptr<test_task>;
    using handle = std::experimental::coroutine_handle<promise_type>;

    struct promise_type {
        static auto get_return_object_on_allocation_failure() {
            return test_task_future{ nullptr };
        }

        auto get_return_object() {
            return test_task_future{ handle::from_promise(*this) };
        }

        auto initial_suspend() {
            return std::experimental::suspend_never{};
        }

        auto final_suspend() {
            return std::experimental::suspend_always{};
        }

        void unhandled_exception() {
            std::terminate();
        }

        void return_void() {
            for (auto& h : follower) {
                h.resume();
            }
        }

        // 这里是为了把task::ptr_t数据关联进去,目前版本不支持promise构造函数参数,没想到什么其他好方法
        auto yield_value(ptr_t t) {
            task = t;
            return std::experimental::suspend_never{};
        }

        ptr_t task;
        std::vector<handle> follower;
    };

    // 下面的接入用侵入式的方式支持 co_await test_task::ptr_t
    struct awaitable {
        awaitable(const test_task_future& pt) {
            if (pt.coro) {
                data = pt.coro.promise().task;
                coro = pt.coro;
            }
        };

        bool await_ready() const {
            bool ret = !data || data->is_ready();
            if (ret) {
                std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << data.get() << " ready" << std::endl;
            }

            return ret;
        }

        void await_resume() {
            if (data) {
                ++data->status;
                std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << data.get() << " resume to " << (data ? data->status : -1) << std::endl;
            }
        }

        void await_suspend(handle h) {
            if (data) {
                std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << data.get() << " suspend test_task_future::handle from " << (data ? data->status : -1) <<
                    ", wait for " << h.address() << std::endl;

            }

            if (coro && h) {
                coro.promise().follower.emplace_back(h);
            }
        }

        ptr_t data;
        handle coro;
    };

    bool done() const {
        return !coro || coro.done();
    }

private:
    test_task_future(handle h) : coro(h) {}
    handle coro;
};

// 接入 co_await test_task::ptr_t
auto operator co_await(const test_task_future & pt) noexcept {
    return test_task_future::awaitable{ pt };
}

test_task_future h(test_task::ptr_t task) {
    // 这里是为了把task::ptr_t数据关联进去,目前版本不支持promise构造函数参数,没想到什么其他好方法
    co_yield task;

    // 模拟任务内部流程并调用外部RPC
    std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << task.get() << std::endl;
    test_rpc_generator rpc_res = f();
    co_await rpc_res;
    std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << task.get() << " call f() ret: " << rpc_res.value() << std::endl;
}

test_task_future g(test_task::ptr_t task) {
    // 这里是为了把task::ptr_t数据关联进去,目前版本不支持promise构造函数参数,没想到什么其他好方法
    co_yield task;

    // 等待子任务完成
    while (task && !task->is_ready()) {
        std::cout << std::setw(32) << __FUNCTION__ << std::setw(3) << ": " << __LINE__ << ": task " << task.get() << std::endl;
        co_await h(task);
    }
}

int main(int argc, char* argv[]) {
#ifdef __cpp_coroutines
    std::cout << "__cpp_coroutines: " << __cpp_coroutines << std::endl;
#endif

    // 创建一个任务
    test_task::ptr_t task = std::make_shared<test_task>(3);
    // 运行任务
    auto fut = g(task);
    // 模拟从外部获取数据然会恢复协程
    while (!fut.done()) {
        test_rpc_manager_run();
    }

    return 0;
}

这回贴一下linux内clang的输出吧:

__cpp_coroutines: 201703
                               g : 280: task 0x1d3c028
                               h : 268: task 0x1d3c028
                   await_suspend : 89: test_rpc_generator yield for 1 time(s), wait for 0x1d3c320
                   await_suspend : 235: task 0x1d3c028 suspend test_task_future::handle from 0, wait for 0x1d3c040
resumed got rpc_res1: 1(@0x1d3c730)
resumed got rpc_res1: 1(@0x1d3c730), rpc_res2: 2(@0x1d3c734)
                    await_resume : 85: test_rpc_generator resume for 2 time(s)
                               h : 271: task 0x1d3c028 call f() ret: 102
                    await_resume : 229: task 0x1d3c028 resume to 1
                               g : 280: task 0x1d3c028
                               h : 268: task 0x1d3c028
                   await_suspend : 89: test_rpc_generator yield for 1 time(s), wait for 0x1d3c850
                   await_suspend : 235: task 0x1d3c028 suspend test_task_future::handle from 1, wait for 0x1d3c040
resumed got rpc_res1: 3(@0x1d3cc60)
resumed got rpc_res1: 3(@0x1d3cc60), rpc_res2: 4(@0x1d3cc64)
                    await_resume : 85: test_rpc_generator resume for 2 time(s)
                               h : 271: task 0x1d3c028 call f() ret: 304
                    await_resume : 229: task 0x1d3c028 resume to 2
                               g : 280: task 0x1d3c028
                               h : 268: task 0x1d3c028
                   await_suspend : 89: test_rpc_generator yield for 1 time(s), wait for 0x1d3cd40
                   await_suspend : 235: task 0x1d3c028 suspend test_task_future::handle from 2, wait for 0x1d3c040
resumed got rpc_res1: 5(@0x1d3d150)
resumed got rpc_res1: 5(@0x1d3d150), rpc_res2: 6(@0x1d3d154)
                    await_resume : 85: test_rpc_generator resume for 2 time(s)
                               h : 271: task 0x1d3c028 call f() ret: 506
                    await_resume : 229: task 0x1d3c028 resume to 3

promise/future 支持

我本地测试的Clang版本(7.0.1)尚未实现支持。 MSVC的标准库用偏特化的方式对 std::promise 和 std::future 实现了协程的接入。它的接入代码如下(为了简短,精简掉了一个偏特化实现,流程和不特化的类似):

namespace experimental {
    template <class _Ty, class... _ArgTypes>
    struct coroutine_traits<future<_Ty>, _ArgTypes...> { // defines resumable traits for functions returning future<_Ty>
        struct promise_type {
            promise<_Ty> _MyPromise;

            future<_Ty> get_return_object() {
                return _MyPromise.get_future();
            }

            bool initial_suspend() const {
                return false;
            }

            bool final_suspend() const {
                return false;
            }

            template <class _Ut>
            void return_value(_Ut&& _Value) {
                _MyPromise.set_value(_STD forward<_Ut>(_Value));
            }

            void set_exception(exception_ptr _Exc) {
                _MyPromise.set_exception(_STD move(_Exc));
            }
        };
    };
} // namespace experimental

template <class _Ty>
bool await_ready(future<_Ty>& _Fut) {
    return _Fut._Is_ready();
}

template <class _Ty>
void await_suspend(future<_Ty>& _Fut,
    experimental::coroutine_handle<> _ResumeCb) { // change to .then when future gets .then
    thread _WaitingThread([&_Fut, _ResumeCb] {
        _Fut.wait();
        _ResumeCb();
    });
    _WaitingThread.detach();
}

template <class _Ty>
auto await_resume(future<_Ty>& _Fut) {
    return _Fut.get();
}

可以看到,它的 co_await std::future<T> 挂起是开了个新线程来等待,真他喵暴力,建议不要用。但是还是可以比较容易地让自己的管理器接入 co_await 。

我们借助STL的协程接入, 可以实现一个最小化的自定义协程支持:

#include <iostream>
#include <future>
#include <thread>
#include <chrono>

#include <experimental/coroutine>

struct custom_rpc_generator {};

struct suspend_custom_rpc : public std::experimental::suspend_always {
    void await_suspend(std::experimental::coroutine_handle<> h) noexcept {
        std::thread thd{ [h]() {
            using namespace std;
            std::this_thread::sleep_for(2s);
            std::cout << "start to resume coroutine" << std::endl;
            h.resume();
        } };

        thd.detach();
    }
};

auto operator co_await(const custom_rpc_generator&) {
    return suspend_custom_rpc{};
}

std::future<void> outter_fn() {
    co_await custom_rpc_generator{};
}

int main() {
    auto fut = outter_fn();
    std::cout << "start to wait future" << std::endl;
    fut.wait();
    std::cout << "future finished, ready to exit" << std::endl;
    return 0;
}

总结

总体感觉上,C++20协程为了兼顾灵活和支持非侵入式接入,设计了好几个互相交织的大模块,函数级有处理协程函数内部的 promise_type 、 协程函数对外暴露的交互对象 (我们这里统称为 future) 、 用于协程上下文切换的 handle ,单次切换有用于支持await功能的 awaitable 和一些回调函数接口。几个对象之间的数据共享也并不是很方便。而且和传统有栈协程的区别仅仅是约束了返回值类型,并且可以依次在编译期推断出需要多少栈空间,从而减少浪费。

首先是如果业务有自己的线程池,其实还是要由管理层来控制resume,不能直接像STL那样开线程,那基本上就和 std::future<T> say Good-Bye 了。虽然在小心维护的情况下,避免 co_await std::future<T> 也是可以避免STL乱开线程的,但是我觉得一旦使用了,后面就很难控制住。特别是这个C++协程的对象关系互耦合插如此严重的情况下,本身就不容易理解。

第二个问题是调用链。C++协程接口设计是非对称的,我们实际业务中,肯定还是需要对称协程的支持,即子协程结束后能够自动恢复 co_await 它的父协程。这个需要我们自己去实现,上面 std::future<T> 也就是这里是开了个线程去实现的(都开线程了还用协程干啥)。在上面的sample代码中,我是开了一个共享数据区,在 await_suspend 的时候去追加等待链,在 return_void/return_value 的时候去执行父级协程的恢复切入,这里面写成vector是为了支持N个协程await一个协程的情况,实际上用链表会更好一些。这里有个比较“脏”的设计是handle的加入是在 awaitable 执行挂起的时候,而恢复是在 promise_type 的 return_void/return_value 事件里。这就分了两个地方。这里的 awaitable 对 promise_type 和N个协程等一个协程是保持一致的N:1关系。但是 awaitable 仅仅能访问 future , 要让 promise_type 也能访问 future 的话,一种方法是开一个共享数据块,在 promise_type 创建 future 的时候传进去这样了。这也是上面sample代码里 using data_ptr = std::shared_ptr<test_rpc_data>; 的实现。 MSVC的STL的实现也是这样 promise::get_future() 是使用了promise内部的数据创建的future对象。 另外一种尝试的方法是后面task的,进入协程后先 co_yield 一次,然后 yield_value(VALUE) 函数返回 std::experimental::suspend_never。这样不会造成协程挂起,并且可以给 promise_type 注入任意数据。不过这样就有个约定式的规范了,也不是很严谨。这方面等后面支持了 promise_type 的带参数构造函数可能可以好一些。

第三个问题是handle提前结束的问题。在看了MSVC的实现,这个handle是可以copy也可以转换的。copy开销也很小,里面只包含一个和 promise_type 地址有关的指针(就是 promise_type 的地址然后加了 align和padding)。比如一个RPC任务,我可能copy一个handle用来在有数据的时候resume,然后我还会copy一个handle在超时的时候强行resume然后走失败流程。现在这种情况,就是需要在 awaitable 的 await_suspend 里添加这两个handle到两个manager里,然后在 await_resume 里要把这两个都移除。不管哪个流程,肯定有一个handle是处于resume回调中的,这还涉及递归调用的问题(删除正在resume过程中的handle)。 我目前的想法是,封装一个handle resumer,让多个事件manager都指向同一个 handle resumer的weak_ptr。这个resumer的生命周期可以和 awaitable 共存,一旦 awaitable 消失,这次的await也就结束了。

我打算后面有时间尝试对 接入C++协程支持,在研究C++协程的时候也想到几个问题。在和 讨论了一下以后主要的问题也有了一些初步的解决方案的想法,但是目前细节上还是有一些没太想清楚的地方。

所有组件都是可拆卸和自定义的,所以剩下还有些细节就是接入哪些东西的哪些接口,如果拆卸掉某些组件之后怎么保证这些接入仍然可用和是否要支持 内yield或是await其他C++协程对象。这些在后面结合一些实际应用再取舍吧。

最后,贴一个更详细一点的C++20 Coroutine生成的汇编( )。如果上面有哪些理解不对的地方或者建议,欢迎有兴趣的小伙伴们一起来交流探讨哈。

https://github.com/owent-utils/bash-shell/tree/master/LLVM%26Clang Installer/7.0/
N4736
p0973r0
https://en.cppreference.com/w/cpp/language/coroutines
N4775
N4736
http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/n4775.pdf
P0912R5
http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2019/p0912r5.html
libcopp
ultramanhu
libcopp
libcopp
https://gist.github.com/owt5008137/aa7b093caddcea5a79f32d0ebf4efa88
1904-01.png