あめだまふぁくとりー

Boost.Graphとかできますん

boost::asio::asio_handler_is_continuation の効果

asio_handler_is_continuation の効果がいまいち良く分からなかったので調べてみました.

ある handler について, asio_handler_is_continuation の結果が true の場合, 処理が以下の様に少し変わります.

  • handler はスレッド固有の private queue に一度登録され, 後で全体の queue に登録される.
  • private queue から全体の queue に handler を移動した際, handler 待ちの他のスレッドを起こさない.

一つ目の結果により, handler を登録した関数が完了するまで, その handler が実行されることはありません (io_service::dispatch で登録した場合等は除きます).

二つ目の結果により, 登録した handler を同じスレッドで実行する確率が上昇します.

これらを以下は例で見てみます.

#include <iostream>
#include <chrono>
#include <thread>
#include <boost/asio.hpp>

struct my_handler
{
    void operator()() const
    {
        std::cout << "my_handler: " << std::this_thread::get_id() << std::endl;
    }

    friend auto asio_handler_is_continuation(my_handler* h)
        -> bool
    {
        return h->is_continuation;
    }

    bool is_continuation;
};

int main(int argc, char* argv[])
{
    boost::asio::io_service io_service{};

    io_service.post([&]
    {
        // handler を登録して 1 秒間 sleep
        io_service.post(my_handler{argc != 1});
        std::this_thread::sleep_for(std::chrono::seconds{1});

        std::cout << "handler1: " << std::this_thread::get_id() << std::endl;
    });

    auto t1 = std::thread{[&]{
        io_service.run();
    }};

    auto t2 = std::thread{[&]{
        io_service.run();
    }};

    t2.join();
    t1.join();
}
  • asio_handler_is_continuation の結果が false の場合:
% ./a.out
my_handler: 0x10dd9c000
handler1: 0x10dd19000

my_handlerio_service::post で登録されると, 登録したスレッドは別のスレッドで即座に実行されています.

  • asio_handler_is_continuation の結果が true の場合:
% ./a.out continuation
handler1: 0x1040bc000
my_handler: 0x1040bc000

my_handler を登録した関数は 1 秒間 sleep しているにも関わらず, その関数が終了するまで my_handler は実行されていません.

また, my_handler は, それを登録したスレッドと同一のスレッドで呼び出されています.

連続で async_write するまでの道のり (その 2)

単一の socket で連続して async_write するの 2 回目です.

前回の記事 では, 普通に socket に対して非同期送信関数を連続で呼び出す際の問題点について整理しました.

前回の内容を踏まえると, 以下のような動作をする async_write_some メソッドを持つ socket のラッパを作成してあげれば良さそうです.

  • 送信中のデータが既にある場合は, 送信データと callback を送信待ちキューに積む.
  • 送信中のデータがない場合は, 引数で指定された callback を wrapping callback でラッピングし, async_write を呼び出す. この wrapping callback が呼び出された場合, 以下の処理を実行します.

    1. 送信したデータに対応する callback を呼び出す.
    2. キューに送信待ちデータが存在する場合, wrapping callback を async_write の引数に指定してそのデータ送信を行う.

このように, wrapping callback の中で次のデータの送信を行うことで, 既に送信中のデータの送信完了を待ってから, 次のデータを送信します.

これを実装するだけなら, それほど対して手間はかかりませんが, 他に考慮すべき点があります. それは, マルチスレッドで socket を使用する場合です. 本記事ではその点について, Boost.Asio ではどのようにマルチスレッドを扱っているのかを見ていきます.

socket のマルチスレッドでの使用

Boost.Asio の socket オブジェクトを含むだいたいの IO Object はスレッドセーフではありません. よって, ここで作成する queueing socket もスレッドアンセーフとして, マルチスレッドでの使用は無視して OK... という訳にはいきません.

通常, Boost.Asio では socket 等の IO Object をマルチスレッドで扱う場合は, strand と呼ばれる仕組みを用いて複数のスレッドからの同時アクセスを防止します.

以下で strand の使用方法を簡単に見ていきます.

strand の概要

strand はオブジェクトであり, ある strand オブジェクトに関連する callback は必ず直列に実行されることを保証します.

strand の仕組みを簡単に説明すると, strand は callback リストを持っていて, それらを実行する strand 用の callback (以降 strand callback と呼ぶことにします) を一つだけ io_service に登録します. この strand callback が io_service から呼び出されると, callback リスト中のすべての callback を呼び出します. 各 strand オブジェクトの strand callback は最大一つしか io_service 中には存在しないので, その strand に関連づけされた callback は複数のスレッドから同時に実行されることはありません.

callback リストへ callback を追加するには strand::post() または strand::dispatch() を使用します. しかし, post または dispatch 経由で追加する callback は引数なしで呼び出されます. そのため, 引数を必要とする callback を strand に関連づけには別の方法を使用します.

引数を必要とする callback を strand オブジェクトに関連づけるには, strand::wrap() でその callback をラッピングします. ラッピング後の callback を引数付きで呼び出すと, ラッピング前の callback にそれらの引数を bind させた関数オブジェクトが, strand::dispatch() を用いて callback リストに追加されます.

strand の使用例

以下は strand を使用したサンプルコードです.

#include <iostream>
#include <functional>
#include <thread>
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>

namespace asio = boost::asio;
using asio::ip::tcp;

std::array<char, 8> send_buffer{};
std::array<char, 8> recv_buffer{};

void handle_write(tcp::socket* socket)
{
    socket->async_write_some(asio::buffer(send_buffer)
            , [](boost::system::error_code, std::size_t)     // callback A
    {
        std::cout << "finish writing" << std::endl;
    });
}

void handle_read(tcp::socket* socket)
{
    socket->async_read_some(asio::buffer(recv_buffer)
            , [](boost::system::error_code, std::size_t)     // callback B
    {
        std::cout << "finish reading" << std::endl;
    });
}

void handle_connect(tcp::socket* socket, asio::io_service::strand strand)
{
    socket->async_write_some(asio::buffer(send_buffer)
            , strand.wrap(std::bind(handle_write, socket)));

    socket->async_read_some(asio::buffer(recv_buffer)
            , strand.wrap(std::bind(handle_read, socket)));
}

void start_connect(tcp::socket* socket, asio::io_service::strand strand)
{
    socket->async_connect(
          tcp::endpoint{asio::ip::address::from_string("127.0.0.1"), 35555}
        , strand.wrap(std::bind(&handle_connect, socket, strand)));
}

int main()
{
    asio::io_service io_service{};

    auto strand = asio::io_service::strand{io_service};
    auto socket = tcp::socket{io_service};

    strand.post(std::bind(start_connect, &socket, strand));

    auto t1 = std::thread([&]{ io_service.run(); });
    auto t2 = std::thread([&]{ io_service.run(); });

    t2.join();
    t1.join();
}

このサンプルコードでは, 二つのスレッド t1, t2 を起動し, データの送受信を 2 度行っています. socket に対する操作の関係をグラフで表現すると下図のようになります.

f:id:amedama41:20140920090432p:plain

このグラフの各ノードがいずれかのスレッドで実行されます (ノード間のリンクは実行順を示しているわけではありません). strand を使用しない場合だと, "送信" と "受信" が同時に実行される可能性があり, socket はスレッドセーフではないのでこのような使用方法はできません. また, "送信/受信" の最中に "送信" が実行される可能性もあります.

そのため, これらの処理を strand::wrap() を用いることで strand への関連づけを行っています.

    socket->async_write_some(asio::buffer(send_buffer)
            , strand.wrap(std::bind(handle_write, socket)));

    socket->async_read_some(asio::buffer(recv_buffer)
            , strand.wrap(std::bind(handle_read, socket)));

    /*....*/

    socket->async_connect(
          tcp::endpoint{asio::ip::address::from_string("127.0.0.1"), 35555}
        , strand.wrap(std::bind(&handle_connect, socket, strand)));

これにより, "送信/受信", "送信", "受信" が並列に実行されることはありません. "送信/受信" で他二つの処理を初期化しているので, "送信/受信" -> "送信" -> "受信", または "送信/受信" -> "受信" -> "送信" のいずれかの順で実行されます.

また, 上記の例では strand::post() を使用することで "接続" も並列には実行させないようにしています.

    strand.post(std::bind(start_connect, &socket, strand));

この strand への関連付けは上記の例では必要ない (はず) ですが, 非同期処理の初期化がどのコンテキストで実行されているか不明な場合は, 初期化処理も strand に関連づける必要があります.

callback A, callback B とコメントを付けた callback に関しては, std::cout はスレッドセーフのため同時に実行されても問題ないので strand への関連づけは行っていません (最終的な出力結果については気にしません).

ここまでのまとめ

  • マルチスレッドで使用する場合は strand を使用する.

次回は, queueing socket の wrapping callback と strand をどう関連づけするのか見ていきます.

(続きます)

連続で async_write するまでの道のり (その 1)

単一の socket を使用して, 一つ目の送信処理の完了を待たずに二つ目の送信処理を実行したいといった場合は多々あります. レスポンスの順番がリクエストの順番とは異なる非同期プロトコルを使用または実装する場合には, そういったケースに特に遭遇します.

Boost.Asio を使用してそのような実装をする場合, どのようにすれば良いのかは明確になっていないように思われます. 同じ趣旨の質問が stackoverflow でも見つかりましたが, 明確な回答はありません.

c++ - Multiple writes in boost::asio to a single socket - Stack Overflow

本記事では, この問題に対する解決策について考えます.

一般的かつ誤りのあるコード

まず, stackoverflow でも例に挙がっているコードについて, 何が問題なのかを整理します.

#include <iostream>
#include <vector>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/system/error_code.hpp>

int main()
{
    using namespace boost;
    namespace ip = asio::ip;
    using tcp = ip::tcp;

    asio::io_service io_service{};
    auto const endpoint = tcp::endpoint{ip::address::from_string("127.0.0.1"), 12345};

    // server
    asio::spawn(io_service, [&](asio::yield_context yield) {
        auto acceptor = tcp::acceptor{io_service, endpoint};
        auto sock = tcp::socket{io_service};
        acceptor.async_accept(sock, yield);
        auto ec = system::error_code{};
        while (!ec) {
            auto buffer = std::array<char, 128>{};
            auto size = sock.async_read_some(asio::buffer(buffer), yield[ec]);
            std::cout.write(buffer.data(), size) << std::endl;
        }
    });
    io_service.run_one(); // start accept

    auto sock = tcp::socket{io_service};
    sock.connect(endpoint);

    sock.set_option(tcp::acceptor::send_buffer_size{128});

    auto buffer1 = std::vector<char>(256, 'A');
    auto buffer2 = std::vector<char>(256, 'B');
    auto counter = 0;
    asio::async_write(sock, asio::buffer(buffer1)
            , [&](system::error_code, std::size_t) {
        if (++counter == 2) sock.close();
    });
    asio::async_write(sock, asio::buffer(buffer2)
            , [&](system::error_code, std::size_t) {
        if (++counter == 2) sock.close();
    });

    io_service.run();
}

asio::spawn している部分はデータ受信用のサーバを用意しているだけなので読み飛ばして構いません.

上記の例では, async_write を二度呼び出しています. 一度目の async_write で 256 個の文字 A を送信し, 二度目の async_write で同じ数の文字 B を送信しています. socket のメンバである async_write_some を使用せずに async_write を使用するのは short write を避けるためです. これにより, 256 個の A と 256 個の B すべてが送信されることを保証します.

しかし, 実際に実行してみると, 以下のよう A と B が 128 Byte ずつ交互に受信していることがわかります.

AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB

A が 256 個表示され, 次に B が 256 個表示されるのが期待だったのですが, なぜこのような結果になってしまったのでしょうか.

この原因を理解するため, async_write の実装を少し見てます. その概要を以下に示します.

  1. async_write は受け取った callback を wrap して, それを引数に指定して socket の async_write_some メンバ関数を呼び出します. 以降 wrapping 前の callback を wrapped callback, 後の callback を wrapping callback と呼ぶことにします.

    上記の例では, wrapped callback は lambda 式で生成された関数オブジェクトです.

  2. データが一部でも送信されたら wrapping callback が呼び出されます.

  3. wrapping callback の中で送信したデータのバイト数をチェックして条件を満たしていたら, wrapped callback を呼び出します. 条件を見たさない場合は再び wrapping callback を引数に指定して async_write_some メンバ関数を呼び出します.

    上記の例では, 256 バイト書き込むことが wrapped callback が呼び出される条件になります.

このように async_write では複数回 wrapped callback が呼び出される可能性があります.

さらに, 上記の例では送信バッファサイズを 128 Byte に設定しているので,各 async_write でそれぞれ二回ずつ wrapping callback が呼び出されることになります. 複数の非同期処理を実行する場合, それらの callback が呼び出される順番は基本的には io_service (のサービス) に登録した順番です. よって, 上記の例は以下の流れで呼び出しが行われます.

   write(128 個の A) -> io_service に A の wrapping callback を登録
-> write(128 個の B) -> io_service に B の wrapping callback を登録
-> A の wrapped callback 呼び出し -> write(128 個の A) -> A の wrapping callback を登録
-> B の wrapped callback 呼び出し -> write(128 個の B) -> B の wrapping callback を登録
-> A の wrapped callback 呼び出し -> A のユーザ wrapped callback 呼び出し
-> B の wrapped callback 呼び出し -> B のユーザ wrapped callback 呼び出し

この例では, socket の送信バッファサイズを 128 Byte に絞ることで複数回送信関数が呼ばれる状況を意図的に作りましたが, このような状況は TCP のウィンドウサイズが送信するデータのサイズを下回った場合でも発生します. つまり, これは現実的な問題と考えることができます.

単純に送信関数を続けて呼び出すのは誤りです. 期待通りにデータを送信するには, 一つ目のデータ送信と二つ目のデータ送信は順序関係を持つ必要があります. 次章では, この順序関係を実現する方法を見てみます.

データ送信間の順序付け

本章ではデータ送信を順序付けするための三つの方法について見てみます.

案 1. callback の中で二つ目の送信処理を実行する

asio::async_write(socket, asio::buffer(buffer1), [&](system::error_code, std::size_t) {
    asio::async_write(socket, asio::buffer(buffer2), handler);
    // 一つ目の async_write に対する完了処理
});

この方法は, 一つ目の送信の callback 内で二つ目の送信を行うことで, 順序関係を持たせています. このような callback を用いて非同期処理を連鎖させる方法は一般的な方法です.

しかし, この方法は一つ目の送信を行う時点で, この後にさらに送信を行うことが分かっているときしか適用できません. HTTP のようにリクエストとレスポンスを交互に行うようなケースには適用できますが, 相手がこちらの応答を待たずに不定期にメッセージを送るようなケースには適用できません.

また, callback 内でデータの送信しかしないのであれば, 下記のように同時にデータを送信した方が I/O の回数を減らせて効率的 (のはず) です.

asio::async_write(socket, std::array<asio::const_buffer, 2>{
    {asio::buffer(buffer1), asio::buffer(buffer2)}
}, handler);

案 2. coroutine を使用する

asio::spawn(io_service, [&](asio::yield_context yield) {
   asio::async_write(socket, asio::buffer(buffer1), yield);
   asio::async_write(socket, asio::buffer(buffer2), yield);
});

coroutine を使用することで, 一つ目の送信完了を待つことができるようになります.

しかしながら, この方法は本質的には案 1 と変わりません. そのため同様の問題があります. また, 非同期処理実行中は spawn したコンテキストでは他の処理が一切実行できません. 複数の socket に対し一斉に送信を行うといったことができないので, 適用できるケースはより絞られることになります.

案 3. buffered_write_stream を使用する

buffered_write_stream はその名前の通り, 送信データをバッファリングし, バッファリングしたデータを一度に送信してくれます. 名前からして本命な感じがしますが, 今回の問題に対してこれを使用するのは誤りです.

以下のコードを見ながらその問題点を説明していきます.

#include <iostream>
#include <vector>
#include <boost/asio/buffered_write_stream.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/system/error_code.hpp>

int main()
{
    using namespace boost;
    namespace ip = asio::ip;
    using tcp = ip::tcp;

    asio::io_service io_service{};
    auto const endpoint = tcp::endpoint{ip::address::from_string("127.0.0.1"), 12345};

    // server
    asio::spawn(io_service, [&](asio::yield_context yield) {
        auto acceptor = tcp::acceptor{io_service, endpoint};
        auto sock = tcp::socket{io_service};
        acceptor.async_accept(sock, yield);
        std::cout << "accept" << std::endl;
        auto ec = system::error_code{};
        while (!ec) {
            auto buffer = std::array<char, 128>{};
            auto size = sock.async_read_some(asio::buffer(buffer), yield[ec]);
            std::cout.write(buffer.data(), size) << std::endl;
        }
    });
    io_service.run_one(); // start accept

    auto sock = tcp::socket{io_service};
    sock.connect(endpoint);
    sock.set_option(tcp::acceptor::send_buffer_size{128});

    auto buffer1 = std::vector<char>(256, 'A');
    auto buffer2 = std::vector<char>(256, 'B');
    auto counter = 0;

    // buffered_write_stream の作成 / 使用
    asio::buffered_write_stream<tcp::socket&> bsock{sock};
    bsock.async_write_some(asio::buffer(buffer1)
            , [&](system::error_code ec, std::size_t bytes_transferred) {
        if (++counter == 2) {
            bsock.async_flush([&](system::error_code ec, std::size_t bytes_transferred) {
                sock.close();
            });
        }
    });
    bsock.async_write_some(asio::buffer(buffer2)
            , [&](system::error_code ec, std::size_t bytes_transferred) {
        if (++counter == 2) {
            bsock.async_flush([&](system::error_code ec, std::size_t bytes_transferred) {
                sock.close();
            });
        }
    });

    io_service.run();
}

前半部分は include しているファイルを除けば, 一番最初に示した例と同じです. 実際にデータの送信を行っている部分を見ていきます.

bsock.async_write_some(asio::buffer(buffer1)
        , [&](system::error_code ec, std::size_t bytes_transferred) {
    if (++counter == 2) {
        bsock.async_flush([&](system::error_code ec, std::size_t bytes_transferred) {
            sock.close();
        });
    }
});

buffered_write_stream には async_write_some メンバ関数async_flush メンバ関数が存在します. 後者は名前の通り, バッファリングしたデータを flush, つまりここでは送信します. asio::async_write を使用していないのは, async_flush の実装の中で使用されているためです.

async_write_some メソッドbuffered_write_stream の中で管理しているバッファの空き容量に応じて挙動が少し変化します. バッファに空きがある場合は, 送信データをバッファに書き込みます. 逆にバッファに空きがない場合は, 一度 async_flush を行ってからバッファに送信データを書き込みます.

このことらから送信データは, バッファに空きがなくなるまで, または明示的に async_flush をするまで送信されないことが分かります. そのため, データを即時に送信したい場合は上記の例の様に, async_flush を呼び出します.

ちなみに, 以下のように書くのは誤りです.

bsock.async_write_some(asio::buffer(buffer1), handler1);
bsock.async_write_some(asio::buffer(buffer1), handler2);
bsock.async_flush(handler3);

このコードはバッファにデータを書き込む前に async_flush を行うことになるので, 何も送信されません.

サンプルコードで示した callback では送信を行った回数を数え, async_flush が一度だけ呼ばれる様にしています. これは async_flush は一度に複数回呼ばれてはいけないためです. もし async_flush を一度に複数回呼び出すと同一のデータが複数回送信されてしまいます.

さらに, async_flush の呼び出しから, その callback の呼び出しの間で async_write_some を呼び出すのも安全ではありません (おそらく). 結局, buffered_write_stream を使用してもある送信処理が別の送信処理についての知識がないと効率的かつ安全な送信は困難です.

また, コードから読み取るのは難しいですが, async_write_some の callback はデータが送信されたタイミングではなく, バッファに書き込まれたタイミングで呼ばれるという動きになっています. 実際に送信されたタイミングと, callback が呼ばれるタイミングが異なるため, この挙動は受け入れられない場合があります.

例えば, TCP のハーフクローズを行う際に問題となる可能性があります. async_write_some の callback でハーフクローズを行うと, 送信前に socket の書き込み側を閉じてしまいデータの送信に失敗してしまいます.

最後にオブジェクトの寿命についての問題についても言及しておきます. 上記のサンプルコード中の buffered_write_stream の初期化は以下のようになっています.

asio::buffered_write_stream<tcp::socket&> bsock{sock};

このコードを見ると buffered_write_stream のテンプレート引数には tcp::socket への参照を渡しています. これは buffered_write_streamコンストラクタ中でテンプレート引数の型のオブジェクトのコピーを行うためです. socket はコピーできないため, このコピーを回避するため参照を渡しています. これにより, buffered_write_stream と socket の両方のオブジェクトを管理する必要性が出てきます.

おそらく buffered_write_stream 自体, socket と組み合わせることを想定していないのだと思います.

ここまでのまとめ

ここまでのことを整理すると連続してデータ送信を行うには以下の機能を持った socket が必要だと考えられます.

  • async_write 時に送信キューが空の場合は即時にデータが送信されること.
  • async_write 時に送信キューが空でない場合は, データはキューイングされること.
  • データが実際に送信されたタイミングで callback が呼ばれること.
  • 送信完了時にキューにデータがある場合は, 自動的にそれらのデータが送信されること.

ここまで見えれば, 後はこれらの機能を持った queueing socket を新たに実装すればいいだけと言いたいところですが, もう少し話は続きます.

(続きます)

標準入出力を Boost.Asio で非同期に行う

標準入出力を Boost.Asio で非同期に行いたい場合は, posix::stream_descriptor を使用するといいみたいです.

使用例

#include <chrono>
#include <iostream>
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/streambuf.hpp>
#include <unistd.h>

int main()
{
    namespace asio = boost::asio;

    asio::io_service io_service{};

    auto input = asio::posix::stream_descriptor{io_service, ::dup(STDIN_FILENO)};

    asio::spawn(io_service, [&](auto yield) {
        asio::steady_timer timer{io_service};
        while (not io_service.stopped()) {
            timer.expires_from_now(std::chrono::seconds{1});
            timer.async_wait(yield);
            std::cout << "ticktack..." << std::endl;
        }
    });

    asio::streambuf input_buffer{};
    asio::async_read_until(input, input_buffer, '\n', [&](auto& error, auto bytes_transferred) {
        std::cout << "input data: " << &input_buffer << std::endl;
        io_service.stop();
    });

    io_service.run();
}

出力結果

ticktack...
ticktack...
ticktack...
ticktack...
hgoe
input data: hgoe

buffered_read_stream とかと組み合わせると, 入出力のバッファリングもできるかも知れません (調べていないのでよく分かりません).

Graph Visitor でアルゴリズムの振る舞いを描写してみる

学習用にグラフアルゴリズムの振る舞いを視覚的に見たい場合, Boost.Graph の Visitor を使用すれば, 一からそれ用のアルゴリズムを実装せずにすみそうです.

という訳で, 実装してみました. GUIPython + Tk で実装して, 表示する情報を Visitor から TCP で送信しています.

Graph algorithm visualization sample

以下の画像は実行したときの様子を示しています (4 倍速ぐらいで途中までしか実行していません).

f:id:amedama41:20140628120850g:plain

問題は Visitor を使用しているアルゴリズムが経路探索ぐらいしかないことでしょうか.

Boost.Asio で read / async_read を使用する際の注意点

1. boost::asio::streambuf と組み合わせる場合

うっかり,

boost::asio::async_read(socket, streambuf, handler);

見たいに書くと, 永遠に handler が起動されない可能性があるので注意しましょう. これは, async_read は指定されたサイズ分読みを行い, デフォルトでは 65535 になっているからです. 少量のデータを送受信する場合は, boost::asio::transfer_at_least 等と一緒に使用しましょう.

// 少なくとも 4 byte 読むが, 一度の read でそれ以上読めたら handler が起動される
boost::asio::async_read(socket, streambuf, boost::asio::transfer_at_least(4), handler);

2. coroutine を使用かつ socket が close された場合

ドキュメンにも記載されていますが, read() / async_read() は, 期待したサイズ分を読む前に socket の終端 (EOF) を読むと, boost::asio::error::eof をエラーコードに指定して終了してしまいます. そのため, 以下のコードのように try-catch でエラーハンドリングすると, 何バイト読んだのかが分からず, 読み込んだデータを処理できない場合があります. 最悪, 読み込んだデータが捨てられてしまいます.

try {
    auto const size = boost::asio::async_read(socket, boost::asio::buffer(buf), yield);
}
catch (boost::system::system_error& e) {
    if (e.code() == boost::asio::error::eof) {
        // 何バイト読めたのか分からない
    }
}

以上を考慮すると, error_code を指定してあげるのが良いと思われます.

auto ec = boost::system::error_code{};
auto const size = boost::asio::async_read(socket, boost::asio::buffer(buf), yield[ec]);
if (ec && ec != boost::asio::error::eof) {
   throw boost::system::system_error{ec};
}

// 何か有益な処理
// ...

if (ec && ec == boost::asio::error::eof) {
    socket.close();
    return;
}

Boost.Test で Test Suite と Test Case の Fixture の併用

Boost.Test では,Test Case に対して Fixture を設定すると,Test Suite レベルの Fixture がその Test Case に対して使用されなくなります. よって,以下のようなコードはコンパイルエラーになってしまいます.

#define BOOST_TEST_DYN_LINK
#define BOOST_TEST_MODULE fixture_test
#include <boost/test/unit_test.hpp>
#include <iostream>

struct suite_fixture {
    suite_fixture() : suite_fix{1}
    { std::cout << __func__ << std::endl; }
    ~suite_fixture()
    { std::cout << __func__ << std::endl; }
    int suite_fix;
};

struct case_fixture {
    case_fixture() : case_fix{2}
    { std::cout << __func__ << std::endl; }
    ~case_fixture()
    { std::cout << __func__ << std::endl; }
    int case_fix;
};

// set test suite level fixture
BOOST_FIXTURE_TEST_SUITE(suite1, suite_fixture)

BOOST_AUTO_TEST_CASE(case1)
{
    BOOST_CHECK_EQUAL(1, suite_fix); // ok
}

// set test case fixture
BOOST_FIXTURE_TEST_CASE(case2, case_fixture)
{
    BOOST_CHECK_EQUAL(1, suite_fix); // compile error
    BOOST_CHECK_EQUAL(2, case_fix);
}

BOOST_AUTO_TEST_SUITE_END()

Boost.Test の Fixture の仕組みは,各 Test Case は Fixture のクラスを指定された場合は,そのクラスから派生し,そうでない場合はデフォルトのクラス,つまり,Test Suite で指定した Fixture のクラスから派生するといったものになっています.

なので,Test Case に渡すクラスを,指定したい Fixture クラスとデフォルトのクラスのリストにしてあげれば,多重継承になって両方の Fixture を利用できそうです. 以下がそのコードと出力になります.

ソースコード

#define BOOST_TEST_DYN_LINK
#define BOOST_TEST_MODULE fixture_test
#include <boost/test/unit_test.hpp>
#include <boost/preprocessor/tuple/enum.hpp>
#include <iostream>

// クラス F とデフォルトの Fixture クラスのリストを BOOST_FIXTURE_TEST_CASE に指定
#define CANARD_FIXTURE_TEST_CASE(test_name, F) \
    BOOST_FIXTURE_TEST_CASE(test_name, BOOST_PP_TUPLE_ENUM(2, (BOOST_AUTO_TEST_CASE_FIXTURE, F)))

struct suite_fixture {
    suite_fixture() : suite_fix{1}
    { std::cout << __func__ << std::endl; }
    ~suite_fixture()
    { std::cout << __func__ << std::endl; }
    int suite_fix;
};

struct case_fixture {
    case_fixture() : case_fix{2}
    { std::cout << __func__ << std::endl; }
    ~case_fixture()
    { std::cout << __func__ << std::endl; }
    int case_fix;
};

BOOST_FIXTURE_TEST_SUITE(suite1, suite_fixture)

CANARD_FIXTURE_TEST_CASE(case2, case_fixture)
{
    BOOST_CHECK_EQUAL(1, suite_fix); // ok
    BOOST_CHECK_EQUAL(2, case_fix);
}

BOOST_AUTO_TEST_SUITE_END()

出力

!!!*** No errors detected!!!
Running 1 test case...
suite_fixture
case_fixture
~case_fixture
~suite_fixture