OWenT's blog
  • Introduction
  • About Me
  • 2020
    • 近期对libatapp的一些优化调整(增加服务发现和连接管理,支持yaml等)
    • xresloader转表工具链增加了一些新功能(map,oneof支持,输出矩阵,基于模板引擎的加载代码生成等)
    • 在游戏服务器中使用分布式事务
    • libcopp接入C++20 Coroutine和一些过渡期的设计
    • libatbus 的大幅优化
    • nftables初体验
    • 容器配置开发环境小计
  • 2019
    • PALM Tree - 适合多核并发架构的B+树 - 论文阅读小记
    • 跨平台协程库 - libcopp 简介
    • C++20 Coroutine 性能测试 (附带和libcopp/libco/libgo/goroutine/linux ucontext对比)
    • 尝鲜Github Action
    • 一些xresloader(转表工具)的改进
    • protobuf、flatbuffer、msgpack 针对小数据包的简单对比
    • 协程框架(libcopp) 小幅优化
    • Excel转表工具(xresloader) 增加protobuf插件功能和集成 UnrealEngine 支持
    • Anna(支持任意扩展和超高性能的KV数据库系统)阅读笔记
    • C++20 Coroutine
    • libcopp merge boost.context 1.69.0
    • Google去中心化分布式系统论文三件套(Percolator、Spanner、F1)读后感
    • Rust玩具-企业微信机器人通用服务
  • 2018
    • 使用ELK辅助监控开发测试环境服务质量和问题定位
    • Webpack+vue+boostrap+ejs构建Web版GM工具
    • 2018年的新通用伪随机数算法(xoshiro / xoroshiro)的C++(head only)实现
    • Rust的第二次接触-写个小服务器程序
    • 理解和适配AEAD加密套件
    • atsf4g-co的进化:协程框架v2、对象路由系统和一些其他细节优化
    • 协程框架(libcopp)v2优化、自适应栈池和同类库的Benchmark对比
    • 可执行文件压缩
    • 初识Rust
    • 使用restructedtext编写xresloader文档
    • atframework的etcd模块化重构
    • C++的backtrace
  • 2017
    • ECDH椭圆双曲线(比DH快10倍的密钥交换)算法简介和封装
    • protobuf-net的动态Message实现
    • pbc的proto3接入
    • atgateway内置协议流程优化-加密、算法协商和ECDH
    • 整理一波软件源镜像同步工具+DevOps工具
    • Blog切换到Hugo
    • libcopp v2的第一波优化完成
    • libcopp(v2) vs goroutine性能测试
    • libcopp的线程安全、栈池和merge boost.context 1.64.0
    • GCC 7和LLVM+Clang+libc++abi 4.0的构建脚本
    • libatbus的几个藏得很深的bug
    • 用cmake交叉编译到iOS和Android
    • 开源项目得一些小维护
    • atapp的c binding和c#适配
    • 对象路由系统设计
    • 2016年总结
    • 近期的一个协程流程BUG
  • 2016
    • 重写了llvm+clang+libc++和libc++abi的构建脚本
    • atsf4g完整游戏工程示例
    • atframework基本框架已经完成
    • 游戏服务器的不停服更新
    • 对atbus的小数据包的优化
    • Android和IOS的TLS问题
    • pbc的一个陈年老BUG
    • boost.context-1.61版本的设计模型变化
    • 接入letsencrypt+全面启用HTTP/2
    • 理解Raft算法
    • libatbus基本功能及单元测试终于写完啦
    • 博客文章和文档迁移到gitbook
  • 2015
    • 博客文章和文档迁移到gitbook
    • 给客户端写得LRU缓存
    • 近期活动比较零散
    • 关于BUS通信系统的一些思考(三)
    • 针对Java JIT的优化(转表工具:xresloader)
    • libcopp更新 (merge boost 1.59 context)
    • 小记最近踩得两个C++坑
    • Redis全异步(HA)Driver设计稿
    • Vim常用命令
    • 关于firewalld和systemd的一些命令速记
    • Jenkins(hudson)插件记录
    • 我们的Lua类绑定机制
    • LLVM+Clang+Libcxx+Libcxxabi(3.6)工具链编译(完成自举编译)
    • 回顾2014
    • Android NDK undefined reference to ___tls_get_addr 错误
    • gitlab腾讯企业邮箱配置
  • 2014
    • 回顾2013
    • C++11动态模板参数和type_traits
    • C++又一坑:动态链接库中的全局变量
    • tolua++内存释放坑
    • [转]类似github的框架
    • Lua性能分析
    • 集成Qt Webkit 到cocos2d-x
    • Gitlab环境搭建小计
    • 近期研究VPN的一些记录(OpenVPN,pptp,l2tp)
    • LLVM + Clang + Libcxx + Libcxxabi 工具链编译
    • 关于BUS通信系统的一些思考(二)
    • 关于BUS通信系统的一些思考(一)
    • [libiniloader] Project
    • 记录一些在线编辑器
    • [WP Code Highlight.js] Project
    • 再议 C++ 11 Lambda表达式
    • 基于Chrome插件的开发工具链
    • [ACM] HDU 1006 解题报告
    • Linux 编译安装 GCC 4.9
    • 又碰到了这个解谜游戏,顺带记下地址
    • 简单C++单元测试框架(支持一键切到GTest或Boost.Test)
    • 捣鼓一个协程库
  • 2013
    • std和boost的function与bind实现剖析
    • 不知道是哪一年的腾讯马拉松题目 照片评级 解题报告
    • Lua 挺好用的样子
    • VC和GCC成员函数指针实现的研究(三)
    • VC和GCC成员函数指针实现的研究(二)
    • VC和GCC内成员函数指针实现的研究(一)
    • 一个C++关于成员变量偏移地址的小Trick
    • ptmalloc,tcmalloc和jemalloc内存分配策略研究
    • POJ 2192 Zipper HDU 2059 龟兔赛跑
    • 从Javascript到Typescript到Node.js
    • 网络编程小结
    • 试试Boost.Asio
    • Lnmp yum 安装脚本 (for CentOS)
    • ARM 交叉编译环境搭建
    • Linux 编译安装 GCC 4.8
    • [记录]虚拟硬盘的压缩|磁盘写零
  • 2012
    • Boost.Spirit 初体验
    • “C++的90个坑”-阅读笔记
    • AC自动机
    • C++ 标准过渡期
    • 程序员修炼之道 -- 阅读笔记
    • [转载]狼与哈士奇
    • C++ 新特性学习(八) — 原子操作和多线程库[多工内存模型]
    • C++ 新特性学习(七) — 右值引用
    • 理解Protobuf的数据编码规则
    • 忆往昔ECUST的ACM时代
    • Linux编译安装GCC 4.7
    • JSON显示库 -- showJson (Javascript)
    • C++ 新特性学习(六) — 新的字符串编码和伪随机数
    • C++ 新特性学习(五) — 引用包装、元编程的类型属性和计算函数对象返回类型
    • C++ 新特性学习(四) — Bind和Function
  • 2011
    • C++ 新特性学习(三) — Regex库
    • C++ 新特性学习(二) -- Array、Tuple和Hash库
    • C++ 新特性学习(一) -- 概述+智能指针(smart_ptr)
    • Linux 和 Windows PowerShell 常用工具/命令 记录
    • 非常帅气的Linq to sql
    • 2011 Google Code Jam 小记
    • C++总是很神奇
    • 大学生创新项目[国家级]经费使用记录
    • 常用官方文档整理
    • 我们学校的IPV6很不错嘛
  • 2010
    • 线段树相关问题 (引用 PKU POJ题目) 整理
    • 2010 ACM 赛前笔记
    • POJ PKU 2596 Dice Stacking 解题报告
    • POJ PKU 3631 Cuckoo Hashing 解题报告
    • POJ PKU 1065 Wooden Sticks 3636 Nested Dolls 解题报告
    • HDU 3336 Count the string 解题报告
    • Hash模板 个人模板
    • ZOJ 3309 Search New Posts 解题报告
    • POJ PKU Let's Go to the Movies 解题报告
    • 注册表常用键值意义
    • PKU POJ 1724 ROADS 解题报告
    • 《神奇古今秘方集锦》&《民间秘术大全》
    • PKU POJ 1720 SQUARES 解题报告
    • POJ PKU 2155 Matrix 解题报告
    • PKU POJ 1141 Brackets Sequence 解题报告
    • PKU POJ 2728 Desert King 解题报告
    • PKU POJ 2976 Dropping tests 解题报告
    • PKU POJ 3757 Simple Distributed storage system 解题报告
    • GCD Determinant 解题报告
    • Southeastern European 2008 Sky Code 解题报告
    • HDU HDOJ 3400 Line belt 解题报告
    • 线性筛法求质数(素数)表 及其原理
    • HDU HDOJ 3398 String 解题报告
    • 树状数组模块(个人模板)
    • 浙江理工 省赛总结 team62 By OWenT of Coeus
    • POJ PKU 3659 Cell Phone Network 解题报告
    • USACO 2008 March Gold Cow Jogging 解题报告
    • C#格式化输出(记录)
    • 参加有道难题笔记
    • POJ PKU 2446 Chessboard 解题报告
    • POJ PKU 1986 Distance Queries 解题报告
    • 计算几何算法概览[转载]
    • 关于差分约束(转载)
    • POJ PKU 2826 An Easy Problem?! 解题报告
    • 数论模板(个人模板)
    • 简易四则运算(ACM个人模板)
    • Catalan 数
    • The 35th ACM/ICPC Asia Regional Tianjin Site —— Online Contest 1009 Convex 解题报告
    • JQuery扩展插件--提示信息
    • ACM 计算几何 个人模板
    • 解析网站字符串型参数 Javascript QueryString 操作 TQueryString类
    • POJ PKU 1474 Video Surveillance 解题报告
  • 2009
    • 模式匹配(kmp)个人模板
    • 并查集 模板
    • POJ 3267 The Cow Lexicon 解题报告
    • C/C++语言常用排序算法
    • POJ 2606 Rabbit hunt 2780 Linearity 1118 Lining Up 解题报告
    • 打造最快的Hash表(转) [以暴雪的游戏的Hash为例]
    • ECUST 09年 校赛个人赛第六,七场总结
    • ECUST 09年 校赛个人赛第三场部分解题报告(A,D,F,I)
    • 牛顿迭代解方程 ax^3+bX^2+cx+d=0
    • 09年8月9日 ECUST ACM 练习赛总结
    • 连接最多点直线 (OWenT 个人模板)
    • 点到直线距离 和 线段间最短距离 (OWenT 模板)
    • ECUST 09年 校赛个人训练赛第五场总结
    • ECUST 09年 校赛个人赛第八场(最后一场)总结
    • 09年8月14日 ECUST ACM 练习赛总结
    • 矩阵相关 (增强中)
    • Prime最小生成树(个人模板)
    • 最长单调子序列 复杂度nlog(n)
    • POJ PKU 2549 Sumsets 解题报告
    • POJ PKU 3277 City Horizon 解题报告
    • 我的ACM生涯
    • POJ PKU 2528 Mayor's posters 解题报告
    • POJ PKU 2378 Tree Cutting 解题报告
    • POJ PKU 1990 MooFest 解题报告
Powered by GitBook
On this page
  • Boost.Asio 依赖项:
  • 先来个简单的,系统信号量 Signal控制:
  • 第二个尝试,网络IO:
  • 小试牛的第三刀,定时器Timer:
  • 额外功能:
  • 设备文件支持
  • Unix系统功能
  • SSL支持
  • 简要性能测试

Was this helpful?

  1. 2013

试试Boost.Asio

慢慢一点一点看看Boost,这段时间就Asio库吧。 据说这货和libevent的效率差不多,但是Boost的平台兼容性,你懂得。还有它帮忙干掉了很多线程安全和线程分发的事情。

Boost.Asio 依赖项:

  1. Boost.System (所以它必须链接boost_system)

  2. [可选] 如果使用read_until() or async_read_until() 函数,则依赖Boost.Regex(boost_regex)

  3. [可选] SSL功能依赖OpenSSL

先来个简单的,系统信号量 Signal控制:

使用ASIO操作信号量有一个注意事项,不允许再使用其他库或工具管理信号量(如signal() 或 sigaction()函数)

#include <cstdio>
#include <iostream>
#include <cstdlib>
#include <stdint.h>
#include <cstring>
#include <string>
#include <vector>

#include <boost/asio.hpp>
#include <boost/bind.hpp>

void signal_handler(
    boost::asio::signal_set& stSigs,
    const boost::system::error_code& error,
    int signal_number)
{
    if (error)
    {
        std::cout<< error.message()<< std::endl;
    }

    std::cout<< "Catch a signal: "<< signal_number<< std::endl;

    stSigs.async_wait(boost::bind(signal_handler, boost::ref(stSigs), _1, _2));
}

void signal_handler2(
    boost::asio::signal_set& stSigs,
    const boost::system::error_code& error,
    int signal_number)
{
    if (error)
    {
        std::cout<< error.message()<< std::endl;
    }

    std::cout<< "Catch a signal - handle 2: "<< signal_number<< std::endl;

    stSigs.async_wait(boost::bind(signal_handler2, boost::ref(stSigs), _1, _2));
}

int main(int argc, char* argv[]) {

    boost::asio::io_service stMainService;
    boost::asio::signal_set stSigs(stMainService);

    stSigs.add(SIGINT);
    stSigs.add(SIGTERM);

    stSigs.async_wait(boost::bind(signal_handler, boost::ref(stSigs), _1, _2));
    stSigs.async_wait(boost::bind(signal_handler2, boost::ref(stSigs), _1, _2));

    stMainService.run();

    return 0;
}

So easy吧,可以一次注册多个handler。 需要注意的是每次触发signal的handler后,handler就被取消了,需要重新注册一次。否则下一次就不会跳到这个handler了

第二个尝试,网络IO:

按照文档描述,除非使用宏来禁止功能,网络IO在不同的环境下采用了不同的实现方式:

  1. Windows: IOCP

  2. Linux: epoll

  3. Mac OS X, FreeBSD, NetBSD, OpenBSD: kqueue

  4. Solaris: /dev/poll

#include <cstdio>
#include <iostream>
#include <cstdlib>
#include <stdint.h>
#include <cstring>
#include <string>
#include <sstream>
#include <map>

#include <boost/asio.hpp>
#include <boost/random.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/smart_ptr.hpp>
#include <boost/atomic.hpp>


void signal_handler(
    boost::asio::io_service& stMainService,
    const boost::system::error_code& error,
    int signal_number)
{
    if (error)
    {
        std::cout<< error.message()<< std::endl;
    }

    std::cout<< "Catch a signal: "<< signal_number<< std::endl;

    stMainService.stop();
}


/** 服务器 socket连接表 **/
std::map<boost::asio::ip::tcp::socket::native_handle_type, 
    std::pair<
        boost::shared_ptr<boost::asio::ip::tcp::socket>, 
        boost::shared_ptr<boost::asio::streambuf>
    >
> g_stServerSockMap;

boost::atomic_int32_t g_iMaxClientNumber, g_iCurClientNumber;

/**
 * 服务器异步发送数据回调函数
 * @param [in] ptrBuffStr 发送的数据buff(传过来仅是为了给智能指针计数+1,防止释放数据的)
 * @param [in] error 错误信息
 * @param [in] bytes_transferred 发送的数据大小
 */
void server_thread_send_handler(
    boost::shared_ptr<std::string> ptrBuffStr,
    const boost::system::error_code& error, // Result of operation.
    std::size_t bytes_transferred
    )
{
    if (error)
    {
        std::cout<< "==== Server Thread "<< boost::this_thread::get_id()<< " Send Failed: "<< error.message()<< std::endl;
    }
}

/**
 * 服务器异步接收数据回调函数
 * @param [in] ptrCurSock 收取数据的Socket
 * @param [in] ptrSockStreamBuff 收取数据的Buff对象
 * @param [in] error 错误信息
 */
void server_thread_recv_handler(
    boost::shared_ptr<boost::asio::ip::tcp::socket> ptrCurSock,
    boost::shared_ptr<boost::asio::streambuf> ptrSockStreamBuff,
    const boost::system::error_code& error // Result of operation.
    )
{
    // 5.2.1 先接收buff长度
    if (ptrSockStreamBuff->size() == sizeof(size_t))
    {
        const size_t* pLen = boost::asio::buffer_cast<const size_t*>(ptrSockStreamBuff->data());
        boost::asio::async_read(*ptrCurSock, *ptrSockStreamBuff, boost::asio::transfer_exactly(*pLen), boost::bind(
            server_thread_recv_handler, ptrCurSock, ptrSockStreamBuff, _1
        ));

        // 移出socket buff
        ptrSockStreamBuff->consume(sizeof(size_t));
        return;
    }

    // 5.2.2 再接收buff数据
    if (ptrSockStreamBuff->size() > 0)
    {
        // 5.2.3 处理数据
        boost::asio::streambuf::const_buffers_type bufs = ptrSockStreamBuff->data();
        std::string strOut(boost::asio::buffers_begin(bufs), boost::asio::buffers_begin(bufs) + ptrSockStreamBuff->size());
        strOut += (char)(0);
        std::cout<< "==== Server Thread "<< boost::this_thread::get_id()<< " Sock "<< ptrCurSock->native_handle()<< " Recv Data: "<< strOut<< std::endl;

        // // 5.2.4 移出socket buff
        ptrSockStreamBuff->consume(ptrSockStreamBuff->size());

        // Sleep 500 毫秒。模拟正忙
        boost::this_thread::sleep_for(boost::chrono::milliseconds(500));

        // 5.2.5 回包
        // 组装数据
        std::stringstream stServerSendBuf;
        stServerSendBuf << "Server Thread "<< boost::this_thread::get_id()<< " Say Got{ "<< strOut<< " }";
        boost::shared_ptr<std::string> ptrBuffStr = boost::shared_ptr<std::string>(new std::string(stServerSendBuf.str()));

        // 先发Buff长度
        size_t uBufLen = ptrBuffStr->size();
        boost::asio::async_write(*ptrCurSock, boost::asio::buffer(&uBufLen, sizeof(uBufLen)), boost::asio::transfer_exactly(sizeof(uBufLen)),
            boost::bind(server_thread_send_handler, ptrBuffStr, _1, _2)
        );

        // 再发数据
        boost::asio::async_write(*ptrCurSock, boost::asio::buffer(*ptrBuffStr), boost::asio::transfer_exactly(ptrBuffStr->size()),
            boost::bind(server_thread_send_handler, ptrBuffStr, _1, _2)
        );

        // 5.2.6 重新监听
        boost::asio::async_read(*ptrCurSock, *ptrSockStreamBuff, boost::asio::transfer_exactly(sizeof(size_t)), boost::bind(
            server_thread_recv_handler, ptrCurSock, ptrSockStreamBuff, _1
        ));
    }

    // 5.2.7 关闭退出
    if (error == boost::asio::error::eof)
    {
        std::cout<< "xxxx Server Thread "<< boost::this_thread::get_id()<< " Sock Release: "<< ptrCurSock->native_handle()<< std::endl;
        g_stServerSockMap.erase(ptrCurSock->native_handle());

        g_iCurClientNumber.fetch_sub(1);
        return;
    }

    if (error)
    {
        std::cout<< "==== Server Thread "<< boost::this_thread::get_id()<< " Recv Failed: "<< error.message()<< std::endl;
        return;
    }
}

/**
 * 服务器异步接受Socket回调函数
 * @param [in] stMainService 服务对象
 * @param [in] stAccepter 接受Socket
 * @param [in] ptrCurSock 建立连接的Socket
 * @param [in] error 错误信息
 */
void server_thread_accept_handler(
    boost::asio::io_service& stMainService,
    boost::asio::ip::tcp::acceptor& stAccepter,
    boost::shared_ptr<boost::asio::ip::tcp::socket>& ptrCurSock,
    const boost::system::error_code& error // Result of operation.
    )
{
    if (error)
    {
       std::cout<< "++++ Server Accept Failed: "<< error.message()<< std::endl;
       return;
    }

    std::cout<< "++++ Server Thread "<< boost::this_thread::get_id()<< " Accept a socket: "<< ptrCurSock->native_handle()<< std::endl;
    std::cout<< "\tServer Thread Local End Point:"<< std::endl<<
         "\t\tServer Thread Address: "<< ptrCurSock->local_endpoint().address().to_string()<< std::endl<< 
         "\t\tServer Thread Port: "<< ptrCurSock->local_endpoint().port()<< std::endl<< 
         "\t\tServer Thread Protocol: "<< ptrCurSock->local_endpoint().protocol().protocol()<< std::endl;
    std::cout<< "\tServer Thread Remote End Point:"<< std::endl<<
        "\t\tServer Thread Address: "<< ptrCurSock->remote_endpoint().address().to_string()<< std::endl<< 
         "\t\tServer Thread Port: "<< ptrCurSock->remote_endpoint().port()<< std::endl<< 
         "\t\tServer Thread Protocol: "<< ptrCurSock->remote_endpoint().protocol().protocol()<< std::endl;

    g_iCurClientNumber.fetch_add(1);
    g_iMaxClientNumber = std::max(g_iMaxClientNumber.load(), g_iCurClientNumber.load());

    // Step 5.1 接受链入的Socket
    g_stServerSockMap[ptrCurSock->native_handle()] = std::make_pair(
        ptrCurSock, 
        boost::shared_ptr<boost::asio::streambuf>(
            new boost::asio::streambuf()
        )
    );

    // Step 5.2 设置Socket接收数据回调
    boost::shared_ptr<boost::asio::streambuf> ptrSockStreamBuff = g_stServerSockMap[ptrCurSock->native_handle()].second;
    boost::asio::async_read(*ptrCurSock, *ptrSockStreamBuff, boost::asio::transfer_exactly(sizeof(size_t)), boost::bind(
        server_thread_recv_handler, ptrCurSock, ptrSockStreamBuff, _1
    ));

    // Step 5.3 创建Socket, 接受新Socket
    ptrCurSock = boost::shared_ptr<boost::asio::ip::tcp::socket>(
        new boost::asio::ip::tcp::socket(stMainService)
    );

    stAccepter.async_accept(*ptrCurSock, boost::bind(
        server_thread_accept_handler, boost::ref(stMainService), boost::ref(stAccepter), boost::ref(ptrCurSock), _1
    ));
}

/**
 * 服务器工作线程
 * @param [in] stMainService 服务对象
 */
void server_thread(boost::asio::io_service& stMainService)
{
    std::cout<< "==== Server Thread "<< boost::this_thread::get_id()<< " Started"<< std::endl;
    stMainService.run(); // 这样,就加到线程池中去了
}

/**
 * 服务器主线程
 * @param [in] server_num 服务线程数
 */
void server_main_thread(int server_num)
{
    // Step 1. 创建服务对象
    static boost::asio::io_service stMainService;

    // Step 2. 创建地址生成器及生成地址
    boost::asio::ip::tcp::resolver stResolver(stMainService);
    // 其实第二个参数8731也可以写成http、ftp什么的,所以他这里用了字符串
    boost::asio::ip::tcp::endpoint stEndpoint = *stResolver.resolve(boost::asio::ip::tcp::resolver::query("127.0.0.1", "8731"));

    // Step 3. 创建接收器
    boost::asio::ip::tcp::acceptor stAccepter(stMainService, stEndpoint);

    // Step 4. 创建Socket
    boost::shared_ptr<boost::asio::ip::tcp::socket> ptrCurSock = boost::shared_ptr<boost::asio::ip::tcp::socket>(
        new boost::asio::ip::tcp::socket(stMainService)
    );

    stAccepter.async_accept(*ptrCurSock, boost::bind(
        server_thread_accept_handler, boost::ref(stMainService), boost::ref(stAccepter), boost::ref(ptrCurSock), _1
    ));

    // Step 5. 创建线程并加入到服务线程池
    std::vector<boost::thread*> stls;
    for (int i = 0; i < server_num; ++ i)
    {
        // 打印输出错开
        boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
        stls.push_back(
            new boost::thread(server_thread, boost::ref(stMainService))
        );
    }

    // Ctrl + C退出服务
    boost::asio::signal_set stSigs(stMainService);
    stSigs.add(SIGINT);
    stSigs.add(SIGTERM);
    stSigs.async_wait(boost::bind(signal_handler, boost::ref(stMainService), _1, _2));

    // Step 6. 阻塞主线程,等待服务线程退出
    for (size_t i = 0; i < stls.size(); ++ i)
    {
        stls[i]->join();
        delete stls[i];
    }
}

/**
 * 客户端网络线程函数
 */
void client_thread()
{
    std::cout<< "---- Client Thread "<< boost::this_thread::get_id()<< " Started"<< std::endl;

    // Step 1. 创建服务对象
    static boost::asio::io_service stMainService;

    // Step 2. 创建地址生成器及生成地址
    boost::asio::ip::tcp::resolver stResolver(stMainService);
    // 其实第二个参数8731也可以写成http、ftp什么的,所以他这里用了字符串
    boost::asio::ip::tcp::endpoint stEndpoint = *stResolver.resolve(boost::asio::ip::tcp::resolver::query("127.0.0.1", "8731"));

    // Step 3. 创建Socket
    boost::shared_ptr<boost::asio::ip::tcp::socket> ptrCurSock = boost::shared_ptr<boost::asio::ip::tcp::socket>(
        new boost::asio::ip::tcp::socket(stMainService)
    );

    // Step 4. 连接服务器
    ptrCurSock->connect(stEndpoint);
    std::cout<< "---- Client Thread "<< boost::this_thread::get_id()<< " Connect Success => "<< ptrCurSock->local_endpoint().port()<< std::endl;

    //** 睡眠一段时间再发数据
    static boost::random::mt19937 rng;
    static boost::random::uniform_int_distribution<> sleep_rdm(0,5000);
    int iSleepFor = 1000 + sleep_rdm(rng);
    boost::this_thread::sleep_for(boost::chrono::milliseconds(iSleepFor));

    // ** 构造数据
    boost::asio::streambuf stSendedBuff;
    std::ostream stSendedBuffOS(&stSendedBuff);
    stSendedBuffOS<< "---- Client Thread "<< boost::this_thread::get_id()<< " Sock "<< ptrCurSock->local_endpoint().port()<< " Say Hello";

    // Step 5 客户端同步发送数据
    size_t ullBufSize = stSendedBuff.size();
    boost::asio::write(*ptrCurSock, boost::asio::buffer(&ullBufSize, sizeof(ullBufSize)), boost::asio::transfer_exactly(sizeof(ullBufSize)));
    boost::asio::write(*ptrCurSock, stSendedBuff, boost::asio::transfer_at_least(stSendedBuff.size() - sizeof(ullBufSize)));
    // sent data is removed from input sequence
    stSendedBuff.consume(stSendedBuff.size()); 

    std::cout<< "**** Client Thread "<< boost::this_thread::get_id()<< " Sended"<< std::endl;

    // Step 6 客户端同步接收数据
    char strRecvStr[2048];
    ptrCurSock->read_some(boost::asio::buffer(strRecvStr, sizeof(size_t)));
    ptrCurSock->read_some(boost::asio::buffer(strRecvStr + sizeof(size_t), *((size_t*)(strRecvStr))));
    std::cout<< "---- Client Thread "<< boost::this_thread::get_id()<< " Recv Data: "<< strRecvStr + sizeof(size_t)<< std::endl;

    boost::this_thread::sleep_for(boost::chrono::milliseconds(10000 - iSleepFor));

    // Step 7. 等待服务关闭
    stMainService.run();

    // Step 8. Socket 关闭退出
    ptrCurSock->close();
}

void print_asio_info();

int main(int argc, char* argv[]) {
    print_asio_info();

    g_iMaxClientNumber.store(0);
    g_iCurClientNumber.store(0);

    boost::thread st(server_main_thread, 3);
    std::vector<boost::thread*> ctl;

    const int max_client_num = 128;
    for (int i = 0; i < max_client_num; ++ i)
    {
        // 打印输出错开
        boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
        ctl.push_back(new boost::thread(client_thread));
    }

    st.join();
    for (size_t i = 0; i < ctl.size(); ++ i)
    {
        ctl[i]->join();
        delete ctl[i];
    }

    std::cout<< "All done, max clients: "<< g_iMaxClientNumber.load()<< std::endl;

    print_asio_info();

    return 0;
}

void print_asio_info()
{
    #if defined(BOOST_ASIO_HAS_IOCP)
        std::cout<< "Boost.Asio using iocp"<< std::endl;

        #if defined(BOOST_ASIO_HAS_SERIAL_PORT)
            std::cout<< "\tSerial Port: enabled"<< std::endl;
        #else
            std::cout<< "\tSerial Port: disabled"<< std::endl;
        #endif

        #if defined(BOOST_ASIO_HAS_WINDOWS_STREAM_HANDLE)
            std::cout<< "\tStream handle: enabled"<< std::endl;
        #else
            std::cout<< "\tStream handle: disabled"<< std::endl;
        #endif

        #if defined(BOOST_ASIO_HAS_WINDOWS_RANDOM_ACCESS_HANDLE)
            std::cout<< "\tWindows Random Access Handle: enabled"<< std::endl;
        #else
            std::cout<< "\tWindows Random Access Handle: disabled"<< std::endl;
        #endif

        #if defined(BOOST_ASIO_HAS_WINDOWS_OBJECT_HANDLE)
            std::cout<< "\tObject Handle: enabled"<< std::endl;
        #else
            std::cout<< "\tObject Handle: disabled"<< std::endl;
        #endif

        #if defined(BOOST_ASIO_HAS_WINDOWS_OVERLAPPED_PTR)
            std::cout<< "\tWindows Overlapped Ptr: enabled"<< std::endl;
        #else
            std::cout<< "\tWindows Overlapped Ptr: disabled"<< std::endl;
        #endif
    #elif defined(BOOST_ASIO_HAS_EPOLL)
        std::cout<< "Boost.Asio using epoll."<< std::endl;
        #if defined(BOOST_ASIO_HAS_EVENTFD)
            std::cout<< "\teventfd: enabled"<< std::endl;
        #else
            std::cout<< "\teventfd: disabled"<< std::endl;
        #endif
        #if defined(BOOST_ASIO_HAS_TIMERFD)
            std::cout<< "\timerfd: enabled"<< std::endl;
        #else
            std::cout<< "\timerfd: disabled"<< std::endl;
        #endif
    #elif defined(BOOST_ASIO_HAS_KQUEUE)
        std::cout<< "Boost.Asio using kqueue"<< std::endl;
    #elif defined(BOOST_ASIO_HAS_DEV_POLL)
        std::cout<< "Boost.Asio using solaris: /dev/poll"<< std::endl;
    #endif

    #if defined(BOOST_ASIO_HAS_POSIX_STREAM_DESCRIPTOR)
        std::cout<< "Posix Stream Descriptor: enabled"<< std::endl;
    #else
        std::cout<< "Posix Stream Descriptor: disabled"<< std::endl;
    #endif

    #if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
        std::cout<< "Unix domain socket: enabled"<< std::endl;
    #else
        std::cout<< "Unix domain socket: disabled"<< std::endl;
    #endif

    #if defined(BOOST_ASIO_HAS_SIGACTION)
        std::cout<< "sigaction: enabled"<< std::endl;
    #else
        std::cout<< "sigaction: disabled"<< std::endl;
    #endif
}

这个搞起来相当费劲,经常需要跳转到Boost的源码中,查看一些回调函数的定义式。 write和write_some函数在completion_condition返回0时才发送,否则将数据加入到发送窗口,并且没有发生数据拷贝,也就是说,如果是异步操作,开发者必须保证发送时数据有效。(这类函数默认的completion_condition是仿函数transfer_all,返回default_max_transfer_size: 65536) 同样,read和read_some也是这样。 Send和receive函数才是立即执行的(不推荐使用)。

另外,streambuf流用于管理发送或接收缓冲,但是在发送或接收完后,要执行consume函数移出或commit移入缓冲区,否则数据不会被销毁。

UDP和TCP的类似,我就不再多写一个demo了。 以上sample的client和server的读数据采用了两种不同的方式

有一点比较爽,在多线程条件下 io_service的run函数是线程安全的,也就是说,多个线程调用同一个run的时候,就自动被加入工作线程池,在消息到来的时候io_service会找到一个可用的线程进行处理。

注:以上代码Visual Studio中需要包含Boost的include目录和lib目录;GCC或Clang中需要加编译选项-I[BOOSTPREFIX目录]/include –L[BOOST_PREFIX目录]/lib -lboost_random -lboost_thread -lboost_system -lstdc++ -lrt -pthread -D_POSIX_MT, 如果BOOST不在系统动态库查找目录中且同时存在Boost的动态和静态库则加上 -static 选项

另外,这个demo代码包含一些功能检测的代码。

小试牛的第三刀,定时器Timer:

还是喜欢上代码,so…

#include <cstdio>
#include <iostream>
#include <cstdlib>
#include <stdint.h>
#include <cstring>
#include <string>
#include <sstream>
#include <map>

#include <boost/chrono.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>


void signal_handler(
    boost::asio::io_service& stMainService,
    const boost::system::error_code& error,
    int signal_number)
{
    if (error)
    {
        std::cout<< error.message()<< std::endl;
    }

    std::cout<< "Catch a signal: "<< signal_number<< std::endl;

    stMainService.stop();
}


void deadline_timer_handler(
  const boost::system::error_code& error // Result of operation.
)
{
    std::cout<< "On Deadline Timer Actived"<< std::endl;
}

void waitable_timer_handler(
  const boost::system::error_code& error // Result of operation.
)
{
    std::cout<< "On Waitable Timer Actived"<< std::endl;
}

void steady_timer_handler(
  boost::asio::basic_waitable_timer<boost::chrono::steady_clock>& stSteadyTimer,
  const boost::system::error_code& error // Result of operation.
)
{
    std::cout<< "On Steady Timer Actived"<< std::endl;
    if (error == boost::asio::error::operation_aborted)
    {
        std::cout<< "On Steady Timer Abouted"<< std::endl;
    }
    else
    {
        stSteadyTimer.expires_at(stSteadyTimer.expires_at() + boost::chrono::seconds(1));
        stSteadyTimer.async_wait(boost::bind(steady_timer_handler, boost::ref(stSteadyTimer), _1));
    }
}

int main(int argc, char* argv[]) {

    // Step 1. 创建服务对象
    boost::asio::io_service stMainService;

    // =========== deadline timer ===========
    // Step 2(1) 创建Timer对象
    boost::asio::deadline_timer stDeadlineTimer(stMainService);
    stDeadlineTimer.expires_from_now(boost::posix_time::seconds(2));

    // Step 3(1) 设置Timer回调
    stDeadlineTimer.async_wait(deadline_timer_handler);

    // =========== high_resolution_clock timer ===========
    // Step 2(2) 创建Timer对象
    boost::asio::basic_waitable_timer<boost::chrono::high_resolution_clock> stWaitableTimer(stMainService);
    stWaitableTimer.expires_from_now(boost::chrono::seconds(1));

    // Step 3(2) 设置Timer回调
    stWaitableTimer.async_wait(waitable_timer_handler);

    // =========== steady timer ===========
    // Step 2(3) 创建Timer对象
    boost::asio::basic_waitable_timer<boost::chrono::steady_clock> stSteadyTimer(stMainService);
    stSteadyTimer.expires_from_now(boost::chrono::seconds(1));

    // Step 3(3) 设置Timer回调
    stSteadyTimer.async_wait(boost::bind(steady_timer_handler, boost::ref(stSteadyTimer), _1));

    // =========== system_timer ===========
    // =========== 对应 chrono::system_clock 懒得演示了,都差不多,只是时间单位和载体不同而已 ===========

    // 绑定 Ctrl + C退出服务
    boost::asio::signal_set stSigs(stMainService);
    stSigs.add(SIGINT);
    stSigs.add(SIGTERM);
    stSigs.async_wait(boost::bind(signal_handler, boost::ref(stMainService), _1, _2));

    stMainService.run();

    return 0;
}

话说Boost.Asio每次异步wait回调之后还要重新wait一下挺麻烦的

额外功能:

设备文件支持

boost::asio::serial_port 可以打开一个Unix设备文件,并作为输入输出流,然后可以用自由函数boost::asio::read(),boost::asio::async_read(),boost::asio::write(),boost::asio::async_write(),boost::asio::read_until() 和 boost::asio::async_read_until()操作这些文件

在Windows上,需要系统支持I/O completion port时才能使用,可以通过BOOST_ASIO_HAS_SERIAL_PORTS 这个宏来检测是否可用这个功能(如果定义了则可用)。

Unix系统功能

第一项是Unix Domain Sockets

这种socket可以通过

local::datagram_protocol::socket socket1(my_io_service);
local::datagram_protocol::socket socket2(my_io_service);
local::connect_pair(socket1, socket2);
//或者
//服务端
local::stream_protocol::endpoint ep("/tmp/foobar");
local::stream_protocol::acceptor acceptor(my_io_service, ep);
local::stream_protocol::socket socket(my_io_service);
acceptor.accept(socket);
//客户端
local::stream_protocol::endpoint ep("/tmp/foobar");
local::stream_protocol::socket socket(my_io_service);
socket.connect(ep);

来使用,其他的部分和普通的Socket一样

第二项是指向流的文件描述符

posix::stream_descriptor in(my_io_service, ::dup(STDIN_FILENO)); posix::stream_descriptor out(my_io_service, ::dup(STDOUT_FILENO));

创建出的descriptor可以用asio的自由函数的读写函数操作

第三项是fork支持通过notify_fork函数来重建内部描述符

SSL支持

这部分依赖OpenSSL,简单的说,就是在socket外面包了一层,然后操作带ssl的socket。

大概就是

typedef ssl::stream<tcp::socket> ssl_socket;

然后用ssl_socket代替tcp::socket

简要性能测试

  • 测试机器: CPU Intel(R) Xeon(R) CPU X3440 @ 2.53GHz × 8 , 内存 8096MB

  • 测试环境: Linux 2.6.32.43, GCC 4.8.0, 编译优化配置 -O2 -g -ggdb

  • 程序配置: 16 工作进程,1主进程,主线程参与逻辑

  • 测试结果: 5000-8000连接,每秒收到约320K个报文,7MB流量,每秒发送约320K个报文,12MB流量, CPU 负载: 180%(5000连接) – 195% (8000连接)

结论: 不知道为什么,压力再也上不去了, 我是把输出重定向到文件的,所以开始以为是磁盘IO爆了,写出日志次数太多,但是gperftools显示磁盘IO占用并不高,性能分布还是比较平均的。但是基本上就在16万个报文了(每个包有一次发送长度的包[4字节]和一次数据的send[不定长])

主要就是这个样子,一些更底层的东西比如srand可以以后再看( ^ _ ^ )

Previous网络编程小结NextLnmp yum 安装脚本 (for CentOS)

Last updated 6 years ago

Was this helpful?

Boost.Asio的接口是仿IOCP的异步IO的形式的(参见:

测试代码地址: profile性能分析报告:

http://www.cnblogs.com/hello-leo/archive/2011/04/12/leo.html),所以,其他环境下都是模拟的。
https://gist.github.com/owt5008137/5660983
http://api.owent.net/resource/doc/link/test.asio.pdf