26 5 2021

asio note

1 asio 简介

1.1 tutorial

asio 是一个跨平台的 c++ 网络库, 现在的最新版本为1.18.2, 在 Windows 下调用 IOCP, Linux 下调用 epoll, BSD 下调用 kqueue

introduction: Using Asio with C++11

asio 库的官方示例文档: asio tutorial

作者的 github: chriskohlhoff asio

  • install asio

    # for mac
    brew install asio
    # for ubuntu
    apt install libasio-dev
    

1.2 timer

asio::deadline_timer
使用的计量时间是系统时间 (posix_time), 因此修改系统时间会影响 deadline_timer 的行为
asio::steady_timer
基于 std::chrono::steady_clock 是一个不会受系统时间影响的定时器

1.3 boost::asio 和 asio

asio 可以独立于 boost 库, 大部分新的编译器(vs 2012, gcc 4.7 -std=c11 后) 会自动添加 ASIO_STANDALONE, 表示 asio 不需要用到 boost 库

most of Asio may now be used without a dependency on Boost header files or libraries.

Boost.Asio 1.18.2 is also included in Boost 1.76.

2 异步 asio 使用

2.1 async echo server with callback

下面为一个简单的使用异步 asio 的 echo 服务器

首先创建一个 CRTP 类 Session

  • CRTP 奇异递归模板模式, 更一般地被称作 F-bound polymorphism
  • 派生类继承自模板类, 派生类将自身作为参数传给模板类
  • 基类转换成派生类用的是 static_cast 而不是 dynamic_cast, 降低了继承带来的虚函数表查询开销
  • enable_shared_from_this 和 ranges::view_interface 属于 CRTP
// CRTP
class Session : public std::enable_shared_from_this<Session>
{
public:
  Session(asio::ip::tcp::socket socket)
    : socket_(std::move(socket)) {}

  void start()
  {
    do_read();
  }

private:
  void do_read()
  {
    auto self(shared_from_this());
    socket_.async_read_some(asio::buffer(data_, maxLen),
			    [this, self]
			    (std::error_code ec, size_t len)
			    {
			      if (!ec)
				// this->do_write(len)
				do_write(len);
			    });
  }

  void do_write(size_t len)
  {
    auto self(shared_from_this());
    asio::async_write(socket_, asio::buffer(data_, len),
		      [this, self](std::error_code ec, size_t)
		      {
			if (!ec)
			  do_read();
		      });
  }

  asio::ip::tcp::socket socket_;
  enum { maxLen = 1024 };
  char data_[maxLen];
};
  • 这里使用 callback handler 函数的方式实现异步, 使用了 async_read_someasync_write 对 socket 进行读写

    async_read_some(asio::error_code, std::size_t)>::return_type
      async_read_some(const MutableBufferSequence &buffers,
      const ReadHandler &handler)
    
    
    async_write(asio::error_code, std::size_t)>::return_type
      async_write(AsyncWriteStream &s, DynamicBuffer_v2 buffers,
      CompletionCondition completion_condition,
      const WriteHandler &handler,
      typename enable_if<is_dynamic_buffer_v2<DynamicBuffer_v2>
      ::value>::type * = 0)
    
    
  • socket 的传递需要使用 std::move()

接下来创建 Server 类:

class Server
{
public:
  Server(asio::io_context &io_context, short port)
    : acceptor_(io_context,
		asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port))
  {
    do_accept();
  }

private:
  void do_accept()
  {
    acceptor_.async_accept([this](std::error_code ec,
				  asio::ip::tcp::socket socket)
    {
      if (!ec)
	std::make_shared<Session>(std::move(socket))->start();
      do_accept();
    });
  }

  asio::ip::tcp::acceptor acceptor_;
};
  • socket 的传递始终使用 std::move()
  • 这里使用 make_shared<T>() 创建 Session, 使用 make_shared 和 shared_ptr+new 的优劣 trade-offs 如下:
    • std::shared_ptr<T>(new T(args…)) 调用两次 allocations (one for the object T and one for the control block of the shared pointer), 而 std::make_shared<T> 只调用一次 allocation
    • 如果有 std::weak_ptr 引用由 std::make_shared 创建的控制块, 当所有 shared 结束生命周期后, 内存要等到所有 weak 销毁后才释放
    • std::make_shared 需要构造函数为 public
    • std::make_shared 使用的是 ::new, 修改 operator new 会导致行为和 std::shared_ptr<T>(new T(args…)) 不同
  • 使用 async_accept 接收新的连接

    async_accept(asio::error_code)>::return_type
      async_accept(basic_socket<protocol_type, Executor1> &peer,
      endpoint_type &peer_endpoint, const AcceptHandler &handler)
    
    

最后是 asio::io_context 的执行:

#include <stdio.h>
#include <memory>
#include <utility>
#include "asio.hpp"

int main(int argc, char *argv[])
{
  try
    {
      asio::io_context io_context;
      // listen to localhost:9999
      Server server(io_context, 9999);
      io_context.run();
    }
  catch (std::exception &e)
    {
      printf("exception %s\n", e.what());
    }
  return 0;
}

2.2 multi-threaded

上面的例子是单进程单线程的, 如希望使用多个线程来处理同一个 io_context 需要加上 asio::strand 解决竞态问题

  • strand 用于串行化回调函数的执行
  • asio::strand 基于 mutex 实现, 保证 callback 的顺序, 使用 strand.post/wrap 包装非线程安全的操作, wrap 最近已经弃用, 使用 asio::bind_executor 代替

下面是根据 strand 改进的 echo server:

#include <stdio.h>
#include <memory>
#include <vector>
#include <thread>
#include <utility>
#include "asio.hpp"

// CRTP
class Session : public std::enable_shared_from_this<Session>
{
public:
  Session(asio::io_context &io_context)
    : socket_(io_context), strand_(io_context) {}

  void start()
  {
    do_read();
  }

  asio::ip::tcp::socket &socket() { return socket_; }

private:
  void do_read()
  {
    auto self(shared_from_this());
    socket_.async_read_some(asio::buffer(data_, maxLen),
			    asio::bind_executor(strand_,
						[this, self]
			    (std::error_code ec, size_t len)
			    {
			      if (!ec)
				do_write(len);
			    }));
  }

  void do_write(size_t len)
  {
    auto self(shared_from_this());
    asio::async_write(socket_, asio::buffer(data_, len),
		      asio::bind_executor(strand_,
		       [this, self](std::error_code ec, size_t)
		      {
			if (!ec)
			  do_read();
		      }));
  }

  asio::io_context::strand strand_;
  asio::ip::tcp::socket socket_;
  enum { maxLen = 1024 };
  char data_[maxLen];
};

class Server
{
public:
  Server(asio::io_context &io_context, short port)
    : acceptor_(io_context,
		asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)),
      io_context_(io_context)
  {
    do_accept();
  }

private:
  void do_accept()
  {
    auto session = std::make_shared<Session>(io_context_);
    acceptor_.async_accept(session->socket(),
			   [this, session](std::error_code ec)
			   {
			     if (!ec)
			       session->start();
			     do_accept();
			   });
  }

  asio::io_context &io_context_;
  asio::ip::tcp::acceptor acceptor_;
};

int main(int argc, char *argv[])
{
  try
    {
      asio::io_context io_context;
      Server server(io_context, 9999);
      auto thread_num = std::thread::hardware_concurrency();
      printf("thread number is: %d\n", thread_num);
      std::vector<std::thread> thread_pool;
      for (int i = 0; i < thread_num; ++i)
	{
	  thread_pool.emplace_back([&io_context](){ io_context.run(); });
	}

      for (auto &t : thread_pool)
	t.join();
    }
  catch (std::exception &e)
    {
      printf("exception %s\n", e.what());
    }
  return 0;
}

  • 使用 asio::bind_executor(strand, handler) 包装回调函数
  • std::thread::hardware_concurrency() 可以获取支持的并发线程数

2.3 使用 future 代替 callback

我们可以使用 c++ 的新特性 std::future 来代替异步调用中的 callback

下面我简单实现了一个 async echo server with std::future:

先看 echo 函数:

std::future<void> echo(asio::ip::tcp::socket socket)
{
  try
    {
      char data[1024];
      while (1)
	{
	  std::future<size_t> f_read = 
	    socket.async_read_some(asio::buffer(data),
				   asio::use_future);
	  auto len = f_read.get();
	  asio::async_write(socket, asio::buffer(data, len),
			    asio::use_future);
	}
    }
  catch (std::exception &e)
    {
      printf("echo exception: %s\n", e.what());
    }
}

这里使用 asio::use_future 来代替 handler

接下来是监听函数:

std::future<void> listener(asio::io_context &io_context)
{
  asio::ip::tcp::acceptor acceptor(io_context,
				   {asio::ip::tcp::v4(), 9999});
  while (1)
    {
      std::future<asio::ip::tcp::socket> f_socket =
	acceptor.async_accept(asio::use_future);

      auto socket = f_socket.get();
      std::future<void> f_echo
	= std::async(std::launch::async,
		     [&]{ return echo(std::move(socket)).get(); });
      f_echo.get();
    }
}

最后是 main 函数

#include <stdio.h>
#include <future>
#include <asio.hpp>

int main(int argc, char *argv[])
{
  try
    {
      asio::io_context io_context(1);
      asio::signal_set signals(io_context, SIGINT, SIGTERM);
      signals.async_wait([&](auto, auto)
      {
	// signal handler
	io_context.stop();
      });
      std::future<void> f
	= std::async(std::launch::async,
		     [&]{ return listener(io_context).get(); });
      io_context.run();
      f.get();
    }
  catch (std::exception &e)
    {
      printf("exception: %s\n", e.what());  
    }

  return 0;
}
  • TODO: 好像有点 bug 待处理

2.4 使用 coroutine 代替 callback

同样, 我们可以使用 asio 中的协程来代替 callback, 下面是一个简单的使用 asio 协程的 echo 服务器:

先看 echo 函数:

asio::awaitable<void> echo(asio::ip::tcp::socket socket)
{
  try
    {
      char data[1024];
      while (1)
	{
	  size_t n = co_await
	    socket.async_read_some(asio::buffer(data),
				   asio::use_awaitable);
	  co_await async_write(socket, asio::buffer(data, n),
			       asio::use_awaitable);
	}
    }
  catch (std::exception &e)
    {
      printf("echo exception: %s\n", e.what());  
    }
}

上面函数中用到的关键字:

asio::awaitable

可等待体

template<
  typename T,
  typename Executor = executor>
class awaitable
co_await
一元运算符 co_await 暂停协程并将控制返回给调用方
asio::use_awaitable
表示当前正在执行的协程的 completion token

然后是监听:

asio::awaitable<void> listener()
{
  auto executor = co_await asio::this_coro::executor;
  asio::ip::tcp::acceptor acceptor(executor,
				   {asio::ip::tcp::v4(), 9999});
  while (1)
    {
      asio::ip::tcp::socket socket = co_await
	acceptor.async_accept(asio::use_awaitable);

      asio::co_spawn(executor, echo(std::move(socket)),
		     asio::detached);
    }
}
  • spawn 函数是用于 stackful 的协程的高级包装器, spawn 函数使程序能够以同步方式实现异步逻辑
  • co_spawn 用于 spawn 一个新的 coroutined-based thread
  • 第一个参数是 executor 决定了执行协程的上下文
  • 第四个参数 completion token, 用于产生 completion handler 在 coroutine 结束时调用, 我们这里使用了 asio::detached, 表示显式忽略异步操作的结果

    template<
      typename Executor,
      typename T,
      typename AwaitableExecutor,
      typename CompletionToken = DEFAULT>
    DEDUCED co_spawn
    

main 函数使用 co_spawn 产生新的协程:

int main(int argc, char *argv[])
{
  try
    {
      asio::io_context io_context(1);
      asio::signal_set signals(io_context, SIGINT, SIGTERM);
      signals.async_wait([&](auto, auto)
      {
	// signal handler
	    io_context.stop();
      });
      asio::co_spawn(io_context, listener(), asio::detached);
      io_context.run();
    }
  catch (std::exception &e)
    {
      printf("exception: %s\n", e.what());  
    }

  return 0;
}

3 misc

3.1 composed asynchronous operation

我们可以 async_write 函数包装一下成一个新的函数:

template <typename CompletionToken>
auto async_write_message(tcp::socket& socket,
    const char* message, CompletionToken&& token)
{
  return asio::async_write(socket,
      asio::buffer(message, std::strlen(message)),
      std::forward<CompletionToken>(token));
}

这里的返回值会根据 token 的类型进行自动类型推导:

  • 当 completion token 是简单的 callback, 返回值为 void
  • 当 completion token 是 asio::use_future, 返回值为 std::future<std::size_t>
  • 当 completion token 是 asio::yield_context (用于 stackful coroutines), 返回值为 std::size_t

这里使用完美转发 std::forwar<>() 自动判断左值右值, 进行 copy 或 move

使用 callback
async_write_message(socket, "callback\r\n",
    [](const std::error_code &error, std::size_t n)
    {
      if (!error)
	printf("%d bytes transferred\n", n);
      else
	printf("error: %s\n", error.message().c_str());
    });
使用 future
std::future<std::size_t> f = async_write_message(
    socket, "future\r\n", asio::use_future);

io_context.run();

try
{
  std::size_t n = f.get();
  printf("%d bytes transferred\n", n);
}
catch (const std::exception& e)
{
  printf("error: %s\n", e.what());
}

3.2 asio 更新

  • boost 1.66 中, io_service 已经改名为 io_context

    io_service -> io_context

    io_service.post() -> io_context.get_executor().post()

    io_service.post() -> post(io_context, handler))

    io_service.dispatch() -> io_context.get_executor().dispatch()

    io_service::strand -> strand<io_context::executor_type>

  • lambda 代替 bind 将成员函数注册为回调函数

    class tcp_connection
    {
      void handle_write(const boost::system::error_code &/*error*/,
    		    size_t /*bytes_transferred*/) {}
      // ...
    }
    
    // bind
    asio::async_write(socket_, asio::buffer(message_),
    		  std::bind(&tcp_connection::handle_write,
    			    shared_from_this(),
    			    asio::placeholders::error,
    			    asio::placeholders::bytes_transferred));
    
    // lambda
    auto self = shared_from_this();
    asio::async_write(socket_, asio::buffer(message_),
    		  [self = std::move(self)]
    		  (auto error, auto bytes_transferred)
    		  {
    		    self->handle_write(error, bytes_transferred);
    		  });
    
    
Tags: c++