近期的一个协程流程BUG

最近一直没什么时间整理近期碰到的问题,今天思考了一下之前碰到的一个临时处理的BUG,顺便写点东西清理一下思路。

其实严格来说这个BUG更应该是一个流程试用问题,不过这个问题应该是需要能在协程库里检测并抛出错误来。

libcopp的执行流程

这个BUG涉及最底层context的执行流程,这个协程库切入有两个接口(start和resume),截止目前为止,这两个接口其实是对等的,然后有一个切出接口(yield)。切入的时候需要记录调用者的执行上下文,用于在切出时上下文切出到哪里。问题是各种情况下的这个上下文保存的执行状态。

然后再来看上下文切换的代码,第一部分是首次切入的入口(调用start函数)。

void coroutine_context_base::coroutine_context_callback(::copp::fcontext::transfer_t src_ctx) {
    assert(src_ctx.data);
    if (NULL == src_ctx.data) {
        abort();
        return;
    }

    // copy jump_src_data_t in case it's destroyed later
    jump_src_data_t jump_src = *reinterpret_cast<jump_src_data_t *>(src_ctx.data);

    // this must in a coroutine
    coroutine_context_base *ins_ptr = jump_src.to_co;
    assert(ins_ptr);
    if (NULL == ins_ptr) {
        abort();
        return;
    }

    // update caller of to_co
    ins_ptr->caller_ = src_ctx.fctx;

    // save from_co's fcontext and switch status
    if (UTIL_CONFIG_NULLPTR != jump_src.from_co) {
        jump_src.from_co->callee_ = src_ctx.fctx;
        int from_status = status_t::EN_CRS_RUNNING; // from coroutine change status from running to ready
        jump_src.from_co->status_.compare_exchange_strong(from_status, status_t::EN_CRS_READY);
    }

    // this_coroutine
    set_this_coroutine_context(ins_ptr);

    // run logic code
    ins_ptr->run_and_recv_retcode(jump_src.priv_data);

    ins_ptr->status_.store(status_t::EN_CRS_FINISHED);
    // jump back to caller
    ins_ptr->yield();
}

最重要的是save from_co's fcontext and switch status下面的这一段。这里的作用就是切入后要把跳入前的上下文保存到来源的协程中。

然后是恢复的流程(调用resume/yield函数):

void coroutine_context_base::jump_to(fcontext::fcontext_t &to_fctx, stack_context &from_sctx, stack_context &to_sctx,
                                     jump_src_data_t &jump_transfer) UTIL_CONFIG_NOEXCEPT {

    copp::fcontext::transfer_t res;
    jump_src_data_t *jump_src;
    int from_status;
    bool swap_success;
// can not use any more stack now
// can not initialize those vars here

#ifdef COPP_MACRO_USE_SEGMENTED_STACKS
    assert(&from_sctx != &to_sctx);
    // ROOT->A: jump_transfer.from_co == NULL, jump_transfer.to_co == A, from_sctx == A.caller_stack_, skip backup segments
    // A->B.start(): jump_transfer.from_co == A, jump_transfer.to_co == B, from_sctx == B.caller_stack_, backup segments
    // B.yield()->A: jump_transfer.from_co == B, jump_transfer.to_co == NULL, from_sctx == B.callee_stack_, skip backup segments
    if (UTIL_CONFIG_NULLPTR != jump_transfer.from_co) {
        __splitstack_getcontext(jump_transfer.from_co->callee_stack_.segments_ctx);
        if (&from_sctx != &jump_transfer.from_co->callee_stack_) {
            memcpy(&from_sctx.segments_ctx, &jump_transfer.from_co->callee_stack_.segments_ctx, sizeof(from_sctx.segments_ctx));
        }
    } else {
        __splitstack_getcontext(from_sctx.segments_ctx);
    }
    __splitstack_setcontext(to_sctx.segments_ctx);
#endif
    res = copp::fcontext::copp_jump_fcontext(to_fctx, &jump_transfer);
    if (NULL == res.data) {
        abort();
        return;
    }
    jump_src = reinterpret_cast<jump_src_data_t *>(res.data);
    assert(jump_src);

    /**
     * save from_co's fcontext and switch status
     * we should use from_co in transfer_t, because it may not jump from jump_transfer.to_co
     *
     * if we jump sequence is A->B->C->A.resume(), and if this call is A->B, then
     * jump_src->from_co = C, jump_src->to_co = A, jump_transfer.from_co = A, jump_transfer.to_co = B
     * and now we should save the callee of C and set the caller of A = C
     *
     * if we jump sequence is A->B.yield()->A, and if this call is A->B, then
     * jump_src->from_co = B, jump_src->to_co = NULL, jump_transfer.from_co = A, jump_transfer.to_co = B
     * and now we should save the callee of B and should change the caller of A
     *
     */

    // update caller of to_co if not jump from yield mode
    if (UTIL_CONFIG_NULLPTR != jump_src->to_co) {
        jump_src->to_co->caller_ = res.fctx;
    }

    if (UTIL_CONFIG_NULLPTR != jump_src->from_co) {
        jump_src->from_co->callee_ = res.fctx;
        from_status = jump_src->from_co->status_.load();
        if (status_t::EN_CRS_RUNNING == from_status) {
            jump_src->from_co->status_.compare_exchange_strong(from_status, status_t::EN_CRS_READY);
        } else if (status_t::EN_CRS_FINISHED == from_status) {
            // if in finished status, change it to exited
            jump_src->from_co->status_.store(status_t::EN_CRS_EXITED);
        }
    }

    // private data
    jump_transfer.priv_data = jump_src->priv_data;

    // this_coroutine
    set_this_coroutine_context(jump_transfer.from_co);
    // resume running status of from_co
    if (NULL != jump_transfer.from_co) {
        from_status = jump_transfer.from_co->status_.load();
        swap_success = false;
        while (!swap_success && status_t::EN_CRS_READY == from_status) {
            swap_success = jump_transfer.from_co->status_.compare_exchange_strong(from_status, status_t::EN_CRS_RUNNING);
        }
    }
}

同样,需要关注的部分是update caller of to_co if not jump from yield mode后面的保存上下下文的部分。

所以到这里可以看到第一个麻烦的地方,流程上的两种(start/resume和yield)和上下文切换的两种(首次和后续)并不匹配;第二个麻烦的地方在于只有在切入完成以后才能拿到切入时切入方的上下文。所以导致执行流程比较复杂。

执行流程

首先是正常的执行流程。

外部->协程A(yield)[->外部(resume)->协程A(yield)]...->外部

这时候,首次切入的时候,由首次的回调函数保存上下文。yield时,相当于切入的copp_jump_fcontext完成return,而后由后面的代码完成把上下文保存进协程的工作。resume的时候,相当于yield的copp_jump_fcontext完成return,然后用同样的方式保存上下文。

其次是一个协程启动另一个协程,然后yield回前一个,走类似链式的流程,也是类似栈的流程。

外部->协程A(start)->协程B(yield)->协程A(yield)->外部

这种流程在携程B会记录caller为A的切出点,那么切回时流程也是走上面第一种情况一样的流程。

问题流程

这两种流程最常用,并且已经使用很久了并不会有什么问题。然后第三种情况是最近一个特殊流程导致的。

外部->协程A(start)->协程B(resume)->协程A(yield)->...

按最早的设计,协程A通过resume或者start接口第二次切入后,会更新协程A的caller为协程B(注释里写得状态信息更完整一些,但是考虑的调用链更长)。但是现在精简下调用链和这个执行流程,就不容易发现,这也是不正确的。因为这时候再也切不回最外部的调用者了。

然后我的第一反应是,这样的话可以尝试后续的切入流程不再更新caller,只在第一次切入时更新caller。但是这样也不正确,因为假设协程B第二次通过resume切入协程A的时候,如果没有更新调用信息,那么其实协程A已经运行了一段代码了,而协程B里记录的还是老的值,这样协程B如果使用yield那么也是不对的。

其实我还考虑过,协程的caller保存协程指针而不只是上下文。但是这样也是有问题的,一个是数据保存的流程和生命周期很难管理,其次是碰到复杂的调用流程的话,还是没法通知某些协程通知caller。

解决方案

所以最终最安全的解决方案还是加个flag禁止掉这种调用。即一旦一个协程被切入了,只能通过yield切出。而同理,没有被切入的协程,必须通过start/resume切入。

不过其实会触发这个BUG还有另一个原因,libatbus有一处优化是,如果发送的消息目标是自己,那么会通过递归的方式直接回调接收事件(这也能够使发送给自己的消息少一次copy)。然后这个事件能导致协程调用resume。

但是根据实际的使用情况,自己发给自己的消息是非常少见的,并且对外发豆油一次copy,发给自己减少掉这次copy意义不是很大。另外解包的CPU消耗应该本身比copy要高得多,所以我打算果断时间对libatbus给自己发消息的时序也做一次优化,模拟成异步消息。这样也能解决一些隐性的前期以来问题。

当然有限还是先加libcopp的这个patch和单元测试。也是之前的单元测试没有覆盖到这种情况才导致现在才发现这种流程BUG。

Written with StackEdit.

Last updated