あめだまふぁくとりー

Boost.Graphとかできますん

連続で async_write するまでの道のり (その 2)

単一の socket で連続して async_write するの 2 回目です.

前回の記事 では, 普通に socket に対して非同期送信関数を連続で呼び出す際の問題点について整理しました.

前回の内容を踏まえると, 以下のような動作をする async_write_some メソッドを持つ socket のラッパを作成してあげれば良さそうです.

  • 送信中のデータが既にある場合は, 送信データと callback を送信待ちキューに積む.
  • 送信中のデータがない場合は, 引数で指定された callback を wrapping callback でラッピングし, async_write を呼び出す. この wrapping callback が呼び出された場合, 以下の処理を実行します.

    1. 送信したデータに対応する callback を呼び出す.
    2. キューに送信待ちデータが存在する場合, wrapping callback を async_write の引数に指定してそのデータ送信を行う.

このように, wrapping callback の中で次のデータの送信を行うことで, 既に送信中のデータの送信完了を待ってから, 次のデータを送信します.

これを実装するだけなら, それほど対して手間はかかりませんが, 他に考慮すべき点があります. それは, マルチスレッドで socket を使用する場合です. 本記事ではその点について, Boost.Asio ではどのようにマルチスレッドを扱っているのかを見ていきます.

socket のマルチスレッドでの使用

Boost.Asio の socket オブジェクトを含むだいたいの IO Object はスレッドセーフではありません. よって, ここで作成する queueing socket もスレッドアンセーフとして, マルチスレッドでの使用は無視して OK... という訳にはいきません.

通常, Boost.Asio では socket 等の IO Object をマルチスレッドで扱う場合は, strand と呼ばれる仕組みを用いて複数のスレッドからの同時アクセスを防止します.

以下で strand の使用方法を簡単に見ていきます.

strand の概要

strand はオブジェクトであり, ある strand オブジェクトに関連する callback は必ず直列に実行されることを保証します.

strand の仕組みを簡単に説明すると, strand は callback リストを持っていて, それらを実行する strand 用の callback (以降 strand callback と呼ぶことにします) を一つだけ io_service に登録します. この strand callback が io_service から呼び出されると, callback リスト中のすべての callback を呼び出します. 各 strand オブジェクトの strand callback は最大一つしか io_service 中には存在しないので, その strand に関連づけされた callback は複数のスレッドから同時に実行されることはありません.

callback リストへ callback を追加するには strand::post() または strand::dispatch() を使用します. しかし, post または dispatch 経由で追加する callback は引数なしで呼び出されます. そのため, 引数を必要とする callback を strand に関連づけには別の方法を使用します.

引数を必要とする callback を strand オブジェクトに関連づけるには, strand::wrap() でその callback をラッピングします. ラッピング後の callback を引数付きで呼び出すと, ラッピング前の callback にそれらの引数を bind させた関数オブジェクトが, strand::dispatch() を用いて callback リストに追加されます.

strand の使用例

以下は strand を使用したサンプルコードです.

#include <iostream>
#include <functional>
#include <thread>
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>

namespace asio = boost::asio;
using asio::ip::tcp;

std::array<char, 8> send_buffer{};
std::array<char, 8> recv_buffer{};

void handle_write(tcp::socket* socket)
{
    socket->async_write_some(asio::buffer(send_buffer)
            , [](boost::system::error_code, std::size_t)     // callback A
    {
        std::cout << "finish writing" << std::endl;
    });
}

void handle_read(tcp::socket* socket)
{
    socket->async_read_some(asio::buffer(recv_buffer)
            , [](boost::system::error_code, std::size_t)     // callback B
    {
        std::cout << "finish reading" << std::endl;
    });
}

void handle_connect(tcp::socket* socket, asio::io_service::strand strand)
{
    socket->async_write_some(asio::buffer(send_buffer)
            , strand.wrap(std::bind(handle_write, socket)));

    socket->async_read_some(asio::buffer(recv_buffer)
            , strand.wrap(std::bind(handle_read, socket)));
}

void start_connect(tcp::socket* socket, asio::io_service::strand strand)
{
    socket->async_connect(
          tcp::endpoint{asio::ip::address::from_string("127.0.0.1"), 35555}
        , strand.wrap(std::bind(&handle_connect, socket, strand)));
}

int main()
{
    asio::io_service io_service{};

    auto strand = asio::io_service::strand{io_service};
    auto socket = tcp::socket{io_service};

    strand.post(std::bind(start_connect, &socket, strand));

    auto t1 = std::thread([&]{ io_service.run(); });
    auto t2 = std::thread([&]{ io_service.run(); });

    t2.join();
    t1.join();
}

このサンプルコードでは, 二つのスレッド t1, t2 を起動し, データの送受信を 2 度行っています. socket に対する操作の関係をグラフで表現すると下図のようになります.

f:id:amedama41:20140920090432p:plain

このグラフの各ノードがいずれかのスレッドで実行されます (ノード間のリンクは実行順を示しているわけではありません). strand を使用しない場合だと, "送信" と "受信" が同時に実行される可能性があり, socket はスレッドセーフではないのでこのような使用方法はできません. また, "送信/受信" の最中に "送信" が実行される可能性もあります.

そのため, これらの処理を strand::wrap() を用いることで strand への関連づけを行っています.

    socket->async_write_some(asio::buffer(send_buffer)
            , strand.wrap(std::bind(handle_write, socket)));

    socket->async_read_some(asio::buffer(recv_buffer)
            , strand.wrap(std::bind(handle_read, socket)));

    /*....*/

    socket->async_connect(
          tcp::endpoint{asio::ip::address::from_string("127.0.0.1"), 35555}
        , strand.wrap(std::bind(&handle_connect, socket, strand)));

これにより, "送信/受信", "送信", "受信" が並列に実行されることはありません. "送信/受信" で他二つの処理を初期化しているので, "送信/受信" -> "送信" -> "受信", または "送信/受信" -> "受信" -> "送信" のいずれかの順で実行されます.

また, 上記の例では strand::post() を使用することで "接続" も並列には実行させないようにしています.

    strand.post(std::bind(start_connect, &socket, strand));

この strand への関連付けは上記の例では必要ない (はず) ですが, 非同期処理の初期化がどのコンテキストで実行されているか不明な場合は, 初期化処理も strand に関連づける必要があります.

callback A, callback B とコメントを付けた callback に関しては, std::cout はスレッドセーフのため同時に実行されても問題ないので strand への関連づけは行っていません (最終的な出力結果については気にしません).

ここまでのまとめ

  • マルチスレッドで使用する場合は strand を使用する.

次回は, queueing socket の wrapping callback と strand をどう関連づけするのか見ていきます.

(続きます)