asio note
1 asio 简介
1.1 tutorial
asio 是一个跨平台的 c++ 网络库, 现在的最新版本为1.18.2, 在 Windows 下调用 IOCP, Linux 下调用 epoll, BSD 下调用 kqueue
introduction: Using Asio with C++11
asio 库的官方示例文档: asio tutorial
作者的 github: chriskohlhoff asio
install asio
# for mac brew install asio # for ubuntu apt install libasio-dev
1.2 timer
- asio::deadline_timer
- 使用的计量时间是系统时间 (posix_time), 因此修改系统时间会影响 deadline_timer 的行为
- asio::steady_timer
- 基于 std::chrono::steady_clock 是一个不会受系统时间影响的定时器
1.3 boost::asio 和 asio
asio 可以独立于 boost 库, 大部分新的编译器(vs 2012, gcc 4.7 -std=c11 后) 会自动添加 ASIO_STANDALONE, 表示 asio 不需要用到 boost 库
most of Asio may now be used without a dependency on Boost header files or libraries.
Boost.Asio 1.18.2 is also included in Boost 1.76.
2 异步 asio 使用
2.1 async echo server with callback
下面为一个简单的使用异步 asio 的 echo 服务器
首先创建一个 CRTP 类 Session
- CRTP 奇异递归模板模式, 更一般地被称作 F-bound polymorphism
- 派生类继承自模板类, 派生类将自身作为参数传给模板类
- 基类转换成派生类用的是 static_cast 而不是 dynamic_cast, 降低了继承带来的虚函数表查询开销
- enable_shared_from_this 和 ranges::view_interface 属于 CRTP
// CRTP class Session : public std::enable_shared_from_this<Session> { public: Session(asio::ip::tcp::socket socket) : socket_(std::move(socket)) {} void start() { do_read(); } private: void do_read() { auto self(shared_from_this()); socket_.async_read_some(asio::buffer(data_, maxLen), [this, self] (std::error_code ec, size_t len) { if (!ec) // this->do_write(len) do_write(len); }); } void do_write(size_t len) { auto self(shared_from_this()); asio::async_write(socket_, asio::buffer(data_, len), [this, self](std::error_code ec, size_t) { if (!ec) do_read(); }); } asio::ip::tcp::socket socket_; enum { maxLen = 1024 }; char data_[maxLen]; };
这里使用 callback handler 函数的方式实现异步, 使用了 async_read_some 和 async_write 对 socket 进行读写
async_read_some(asio::error_code, std::size_t)>::return_type async_read_some(const MutableBufferSequence &buffers, const ReadHandler &handler) async_write(asio::error_code, std::size_t)>::return_type async_write(AsyncWriteStream &s, DynamicBuffer_v2 buffers, CompletionCondition completion_condition, const WriteHandler &handler, typename enable_if<is_dynamic_buffer_v2<DynamicBuffer_v2> ::value>::type * = 0)
- socket 的传递需要使用 std::move()
接下来创建 Server 类:
class Server { public: Server(asio::io_context &io_context, short port) : acceptor_(io_context, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) { do_accept(); } private: void do_accept() { acceptor_.async_accept([this](std::error_code ec, asio::ip::tcp::socket socket) { if (!ec) std::make_shared<Session>(std::move(socket))->start(); do_accept(); }); } asio::ip::tcp::acceptor acceptor_; };
- socket 的传递始终使用 std::move()
- 这里使用 make_shared<T>() 创建 Session, 使用 make_shared 和 shared_ptr+new 的优劣 trade-offs 如下:
- std::shared_ptr<T>(new T(args…)) 调用两次 allocations (one for the object T and one for the control block of the shared pointer), 而 std::make_shared<T> 只调用一次 allocation
- 如果有 std::weak_ptr 引用由 std::make_shared 创建的控制块, 当所有 shared 结束生命周期后, 内存要等到所有 weak 销毁后才释放
- std::make_shared 需要构造函数为 public
- std::make_shared 使用的是 ::new, 修改 operator new 会导致行为和 std::shared_ptr<T>(new T(args…)) 不同
使用 async_accept 接收新的连接
async_accept(asio::error_code)>::return_type async_accept(basic_socket<protocol_type, Executor1> &peer, endpoint_type &peer_endpoint, const AcceptHandler &handler)
最后是 asio::io_context 的执行:
#include <stdio.h> #include <memory> #include <utility> #include "asio.hpp" int main(int argc, char *argv[]) { try { asio::io_context io_context; // listen to localhost:9999 Server server(io_context, 9999); io_context.run(); } catch (std::exception &e) { printf("exception %s\n", e.what()); } return 0; }
2.2 multi-threaded
上面的例子是单进程单线程的, 如希望使用多个线程来处理同一个 io_context 需要加上 asio::strand 解决竞态问题
- strand 用于串行化回调函数的执行
- asio::strand 基于 mutex 实现, 保证 callback 的顺序, 使用 strand.post/wrap 包装非线程安全的操作, wrap 最近已经弃用, 使用 asio::bind_executor 代替
下面是根据 strand 改进的 echo server:
#include <stdio.h> #include <memory> #include <vector> #include <thread> #include <utility> #include "asio.hpp" // CRTP class Session : public std::enable_shared_from_this<Session> { public: Session(asio::io_context &io_context) : socket_(io_context), strand_(io_context) {} void start() { do_read(); } asio::ip::tcp::socket &socket() { return socket_; } private: void do_read() { auto self(shared_from_this()); socket_.async_read_some(asio::buffer(data_, maxLen), asio::bind_executor(strand_, [this, self] (std::error_code ec, size_t len) { if (!ec) do_write(len); })); } void do_write(size_t len) { auto self(shared_from_this()); asio::async_write(socket_, asio::buffer(data_, len), asio::bind_executor(strand_, [this, self](std::error_code ec, size_t) { if (!ec) do_read(); })); } asio::io_context::strand strand_; asio::ip::tcp::socket socket_; enum { maxLen = 1024 }; char data_[maxLen]; }; class Server { public: Server(asio::io_context &io_context, short port) : acceptor_(io_context, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)), io_context_(io_context) { do_accept(); } private: void do_accept() { auto session = std::make_shared<Session>(io_context_); acceptor_.async_accept(session->socket(), [this, session](std::error_code ec) { if (!ec) session->start(); do_accept(); }); } asio::io_context &io_context_; asio::ip::tcp::acceptor acceptor_; }; int main(int argc, char *argv[]) { try { asio::io_context io_context; Server server(io_context, 9999); auto thread_num = std::thread::hardware_concurrency(); printf("thread number is: %d\n", thread_num); std::vector<std::thread> thread_pool; for (int i = 0; i < thread_num; ++i) { thread_pool.emplace_back([&io_context](){ io_context.run(); }); } for (auto &t : thread_pool) t.join(); } catch (std::exception &e) { printf("exception %s\n", e.what()); } return 0; }
- 使用 asio::bind_executor(strand, handler) 包装回调函数
- std::thread::hardware_concurrency() 可以获取支持的并发线程数
2.3 使用 future 代替 callback
我们可以使用 c++ 的新特性 std::future 来代替异步调用中的 callback
下面我简单实现了一个 async echo server with std::future:
先看 echo 函数:
std::future<void> echo(asio::ip::tcp::socket socket) { try { char data[1024]; while (1) { std::future<size_t> f_read = socket.async_read_some(asio::buffer(data), asio::use_future); auto len = f_read.get(); asio::async_write(socket, asio::buffer(data, len), asio::use_future); } } catch (std::exception &e) { printf("echo exception: %s\n", e.what()); } }
这里使用 asio::use_future 来代替 handler
接下来是监听函数:
std::future<void> listener(asio::io_context &io_context) { asio::ip::tcp::acceptor acceptor(io_context, {asio::ip::tcp::v4(), 9999}); while (1) { std::future<asio::ip::tcp::socket> f_socket = acceptor.async_accept(asio::use_future); auto socket = f_socket.get(); std::future<void> f_echo = std::async(std::launch::async, [&]{ return echo(std::move(socket)).get(); }); f_echo.get(); } }
最后是 main 函数
#include <stdio.h> #include <future> #include <asio.hpp> int main(int argc, char *argv[]) { try { asio::io_context io_context(1); asio::signal_set signals(io_context, SIGINT, SIGTERM); signals.async_wait([&](auto, auto) { // signal handler io_context.stop(); }); std::future<void> f = std::async(std::launch::async, [&]{ return listener(io_context).get(); }); io_context.run(); f.get(); } catch (std::exception &e) { printf("exception: %s\n", e.what()); } return 0; }
- TODO: 好像有点 bug 待处理
2.4 使用 coroutine 代替 callback
同样, 我们可以使用 asio 中的协程来代替 callback, 下面是一个简单的使用 asio 协程的 echo 服务器:
先看 echo 函数:
asio::awaitable<void> echo(asio::ip::tcp::socket socket) { try { char data[1024]; while (1) { size_t n = co_await socket.async_read_some(asio::buffer(data), asio::use_awaitable); co_await async_write(socket, asio::buffer(data, n), asio::use_awaitable); } } catch (std::exception &e) { printf("echo exception: %s\n", e.what()); } }
上面函数中用到的关键字:
- asio::awaitable
可等待体
template< typename T, typename Executor = executor> class awaitable
- co_await
- 一元运算符 co_await 暂停协程并将控制返回给调用方
- asio::use_awaitable
- 表示当前正在执行的协程的 completion token
然后是监听:
asio::awaitable<void> listener() { auto executor = co_await asio::this_coro::executor; asio::ip::tcp::acceptor acceptor(executor, {asio::ip::tcp::v4(), 9999}); while (1) { asio::ip::tcp::socket socket = co_await acceptor.async_accept(asio::use_awaitable); asio::co_spawn(executor, echo(std::move(socket)), asio::detached); } }
- spawn 函数是用于 stackful 的协程的高级包装器, spawn 函数使程序能够以同步方式实现异步逻辑
- co_spawn 用于 spawn 一个新的 coroutined-based thread
- 第一个参数是 executor 决定了执行协程的上下文
第四个参数 completion token, 用于产生 completion handler 在 coroutine 结束时调用, 我们这里使用了 asio::detached, 表示显式忽略异步操作的结果
template< typename Executor, typename T, typename AwaitableExecutor, typename CompletionToken = DEFAULT> DEDUCED co_spawn
main 函数使用 co_spawn 产生新的协程:
int main(int argc, char *argv[]) { try { asio::io_context io_context(1); asio::signal_set signals(io_context, SIGINT, SIGTERM); signals.async_wait([&](auto, auto) { // signal handler io_context.stop(); }); asio::co_spawn(io_context, listener(), asio::detached); io_context.run(); } catch (std::exception &e) { printf("exception: %s\n", e.what()); } return 0; }
3 misc
3.1 composed asynchronous operation
我们可以 async_write 函数包装一下成一个新的函数:
template <typename CompletionToken> auto async_write_message(tcp::socket& socket, const char* message, CompletionToken&& token) { return asio::async_write(socket, asio::buffer(message, std::strlen(message)), std::forward<CompletionToken>(token)); }
这里的返回值会根据 token 的类型进行自动类型推导:
- 当 completion token 是简单的 callback, 返回值为 void
- 当 completion token 是 asio::use_future, 返回值为 std::future<std::size_t>
- 当 completion token 是 asio::yield_context (用于 stackful coroutines), 返回值为 std::size_t
这里使用完美转发 std::forwar<>() 自动判断左值右值, 进行 copy 或 move
- 使用 callback
async_write_message(socket, "callback\r\n", [](const std::error_code &error, std::size_t n) { if (!error) printf("%d bytes transferred\n", n); else printf("error: %s\n", error.message().c_str()); });
- 使用 future
std::future<std::size_t> f = async_write_message( socket, "future\r\n", asio::use_future); io_context.run(); try { std::size_t n = f.get(); printf("%d bytes transferred\n", n); } catch (const std::exception& e) { printf("error: %s\n", e.what()); }
3.2 asio 更新
boost 1.66 中, io_service 已经改名为 io_context
io_service -> io_context
io_service.post() -> io_context.get_executor().post()
io_service.post() -> post(io_context, handler))
io_service.dispatch() -> io_context.get_executor().dispatch()
io_service::strand -> strand<io_context::executor_type>
…
lambda 代替 bind 将成员函数注册为回调函数
class tcp_connection { void handle_write(const boost::system::error_code &/*error*/, size_t /*bytes_transferred*/) {} // ... } // bind asio::async_write(socket_, asio::buffer(message_), std::bind(&tcp_connection::handle_write, shared_from_this(), asio::placeholders::error, asio::placeholders::bytes_transferred)); // lambda auto self = shared_from_this(); asio::async_write(socket_, asio::buffer(message_), [self = std::move(self)] (auto error, auto bytes_transferred) { self->handle_write(error, bytes_transferred); });