あめだまふぁくとりー

Boost.Graphとかできますん

Asio の coroutine で明示的に yield する

実行時間がかかる処理 A を coroutine の中で行うと, その coroutine と同じ strand 内の処理は処理 A が完了するまで待たされてしまいます.

io_service::post または strand::post を使用することで, 処理 A の合間で明示的に coroutine を切り替えることができます (dispatch は使用できません. 必ず post を使用してください).

#include <ctime>
#include <chrono>
#include <iostream>
#include <thread>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>

using namespace boost;

void heavy_task()
{
    std::this_thread::sleep_for(std::chrono::seconds{1});
}

auto print_now(char const* str)
    -> std::ostream&
{
    auto const now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    return std::cout << str << " " << std::ctime(&now);
}

int main()
{
    asio::io_service io_service{};

    auto strand = asio::io_service::strand{io_service};

    // 実行時間のかかる処理 A
    asio::spawn(strand, [&](asio::yield_context yield) {
        print_now("start heavy_task");
        for (auto i = 0; i < 5; ++i) {
            io_service.post(yield); // ★ ここで切り替え
            heavy_task();
            io_service.post(yield); // ★ ここでも切り替え
        }
        print_now("finish heavy_task");
    });

    // 処理 A と同じ strand 内の処理
    asio::spawn(strand, [&](asio::yield_context yield) {
        asio::steady_timer timer{io_service};
        for (auto i = 0; i < 4; ++i) {
            timer.expires_from_now(std::chrono::seconds{1});
            timer.async_wait(yield);
            print_now("tick tack...    ");
        }
    });

    io_service.run();
}

明示的に切り替えた場合の実行結果

start heavy_task Tue Dec 23 11:21:13 2014
tick tack...     Tue Dec 23 11:21:14 2014
tick tack...     Tue Dec 23 11:21:15 2014
tick tack...     Tue Dec 23 11:21:16 2014
tick tack...     Tue Dec 23 11:21:17 2014
finish heavy_task Tue Dec 23 11:21:18 2014

切り替えない場合の実行結果

start heavy_task Tue Dec 23 11:32:17 2014
finish heavy_task Tue Dec 23 11:32:22 2014
tick tack...     Tue Dec 23 11:32:23 2014
tick tack...     Tue Dec 23 11:32:24 2014
tick tack...     Tue Dec 23 11:32:25 2014
tick tack...     Tue Dec 23 11:32:26 2014

ただし, スケジューリングのタイミングがシビアなので, 実行時間がかかるものは別スレッドで実行した方がいいでしょう.

boost::asio::async_write / read の限界

asio::async_writeasio::async_read に難癖をつけてみます.

stream のコンテキストとハンドラのコンテキストが分離されていない

socket への書き込み (読み込み) 処理は sock_strand 内で実行し, その書き込み (読み込み) 完了のハンドラは handler_strand 内で実行したい場合, どのようにコードを書けばいいでしょうか?

素直に書いてみると以下の様になります.

sock_strand.dispatch([=, &sock]{
    boost::asio::async_write(sock, buffers
            , sock_strand.wrap(handler_strand.wrap(handler));

残念ながらこれは間違いです. 正解はこうなります.

sock_strand.dispatch([=, &sock]{
    boost::asio::async_write(sock, buffers
            , sock_strand.wrap([=](auto const& ec, auto bytes)
    {
        handler_strand.post(bind(handler, ec, bytes));
    }));

前者がダメな理由は async_write / read の内部で使用されるハンドラが最終的に handler_strand 内で呼ばれるため, sock複数のスレッドから操作される可能性があるからです.

asio::async_write / read は対象とする stream とハンドラのコンテキストを上手く分離できません.

なので上記の正解の例でも, ハンドラが呼ばれる際に sock_strand を一度経由するオーバヘッドが発生してしまいます.

内部で例外が起きた場合ハンドラが呼ばれない

asio::async_write / read は内部で複数回 async_write / read_some メンバ関数を呼ぶ可能性があります.

ここで一つ疑問が浮かび上がります.

二回目以降の async_write / read_some メンバ関数の呼び出しで例外が投げられた場合でも, handler は呼ばれるのでしょうか?

実はこの場合, handler は呼ばれません. なのでいくつデータを読み書きしたのかがわからなくなってしまいます.

実際, 例外が投げられそうなのはメモリの allocate に失敗したときぐらいに見えるのであまり心配する必要はないとは思います.

consuming_buffers が Const / MutableBufferSequence の条件を満たさない

consuming_buffers というのは asio::async_write / read の内部で使用されている送信 / 受信データのラッパーです.

Const / MutableBufferSequence::const_iterator は bidirectional iterator でないといけないと, ドキュメントに記載されているのですが, consuming_buffers::const_iterator は forwarding iterator の条件しか満たしていません.

そもそもなんで bidirectional iterator が要求されているんでしょう?

連続で async_write するまでの道のり (その 3)

単一の socket で連続して async_write するの 3 回目で, 最終回です.

前回の記事では strand を使用方法について見ました.

本記事では作成する queueing socket の wrapping callback に strand を適用する方法を見ていきます.

wrapping callback の実装

queueing socket の async_write_some は以下のように, 送信待ちキューが空の場合に async_write を呼ぶ実装になります.

void async_write_some(Buffers const& buffers, Callback callback)
{
    auto is_empty = queue_.empty();     // ★A

    ... // buffers と callback を持つキューの要素を生成
    queue_.push(生成したキューの要素);

    if (is_empty) {
        async_write(socket_, buffers,
            wrapping_callback(callback, queue_));
    }
}

特に難しいところはないと思います.

wrapping callback の実装の概略も見てみます. こっちは逆に送信待ちキューが空でなければ, async_write を呼びます.

struct wrapping_callback
{
    void operator()(error_code const& ec, std::size_t bytes)
    {
        ... // queue から送信済みの buffer と callback を取り出す処理 // ★B
        wrapped_callback(ec, bytes);

        if (not queue_.empty()) {
            ... // queue から次の buffer と callback を参照する処理
            async_write(socket_, next_buffers, 
                wrapping_callback(next_callback, queue_));
        }
    }

    Callback wrapped_callback; // write 中のデータに対応する callback
    Socket& socket_;           // ラップされたソケット
    Queue& queue_;             // write 待ちのデータと callback のキュー
};

こっちも特に難しい点はないと思います.

問題はマルチスレッドでこれらを使用した場合です.

async_write_somewrapping_callback複数のスレッドから呼び出されるケースを考えてみます.

今, キューの中には送信中のデータしかないとします.

  1. このとき, async_write_some を呼び出し, 上記の ★A の処理が行われると, キューが空でないと判定されます.

  2. ここで wrapping_callback が呼び出され, ★B の処理でキューを空にすると wrapping_callback の中ではデータの送信は行われません.

  3. そして, async_write_some に戻って ★A 以降の処理を実行しても, is_empty は false になるため, こちらでもデータの送信は行われません.

これにより, 送信されないデータがキューに残り続けることになります.

これケース以外でも, read と write が平行して呼び出されるケース等があります. こららの問題に対応するために, queueing socket に strand を適用する方法を考える必要があります.

strand の適用 (微妙な解)

以下の三つが, strand に関連づけが必要な関数です.

  • queueing_socket の async_write_some.
  • queueing_socket の async_read_some.
  • wrapping callback.

最初の二つは問題ありません. 以下のようにユーザが strand::dispatch 等を経由して呼び出してやればいいだけです.

strand.dispatch([&] {
    queueing_sock.async_write_some(...);
});

問題は三つ目です. wrapping callback は queueing socket の内部で使用されているので, queueing socket の外にある strand に直接触れることはできません.

最も簡単な解は queueing socket の内部に strand を持つことです.

class queueing_socket {
public:
    void async_write_some(...) {
        this->strand_.dispatch([=] {
            // wrapping callback を strand に関連づける
            async_write(stream_, buffers
                , this->strand_.wrap(wrapping_callback{callback}));
        });
    }

private:
    Straem stream_;
    io_service::strand strand_; // strand を 内部に持つ.
};

こうすることで, 安全にデータの書き込みが可能になる上, queueing socket のユーザは strand を意識する必要がなくなります.

しかし, この実装は strand と queueing socket との結合度が大きいため, 以下にあげる欠点があります.

  • シングルスレッドの場合も, strand を使用によるオーバヘッドがある.
  • strand 以外のコンテキストに関連づけて使用できない.
  • queueing socket の外で使用している strand を使用してデータを書き込むことができない.

三つ目については, 外部で使用する strand を queueing socket のコンストラクタ経由で渡して, それで内部の strand を初期化するといった方法をとれば解決はできます.

しかし, 一つ目はゼロ・オーバヘッド原則に違反し, 二つ目は拡張性に問題が出ます. このため, 別の解決策が必要になります.

asio::async_write による解決策

asio::async_write は内部で複数書き込み関数を呼び出すため, 同様の問題があるはずです. そこでこの関数はどのようにしてこの問題に対処しているのか覗いてみます.

stream.async_write_some(buffers, inner_callback);

内部用の callback を渡す際には特にラッピング等はしていません.

これでどういうことか不思議に思いましたが, 仕掛けは別のところにあります.

asio_handler_invoke

通常, Boost.Asio の内部では直接 callback を呼び出すことはせず, asio_handler_invoke という関数を経由して callback を呼び出します.

この関数のシグネチャを以下に示します.

void asio_handler_invoke(Function& function, Callback* callback);

callback はユーザが指定した callback へのポインタ, function はその callback に実引数を bind して生成した関数オブジェクトです.

この関数のデフォルトの実装は単に function を呼び出すだけですが, 第二引数の Callback をオーバロードすることで, デフォルト以外の実装を使用することができます.

strand::wrap でラッピングされた関数オブジェクトに対しては以下の様に定義されています.

void asio_handler_invoke(
        Function& function, WrappedCallback* callback)
{
    callback->strand.dispatch(function);
}

callback が strand でラッピングされている場合, その callback を strand を経由して呼び出すわけです.

asio::async_write の内部で使用している callback に対しても, asio_handler_invoke が定義されています.

void asio_handler_invoke(
        Function& function, InnerCallback* innter_callback)
{
    asio_handler_invoke(
          function  // async_write 内部で使用する callback に引数を bind したもの
        , &innter_callback->callback_);  // ユーザが指定した callback
}

callback->callback_ はユーザが指定した callback なので, もしこれが strand でラッピングされていれば, 上述の通り内部で使用する callback は strand::dispatch 経由で呼び出されることになります.

つまり, async_write の内部で使用する callback は, ユーザが指定した callback と同じコンテキストで呼び出されるようになっているのです.

まとめ

結論として async_write と同様に asio_handler_invoke を用意してあげればいいということになります.

void asio_handler_invoke(
        Function& function, wrapping_callback* callback)
{
    asio_handler_invoke(function, callback->wrapped_callback);
}

これによって, strand を使用しない場合のオーバヘッドを回避し, strand 以外のコンテキストでも queueing socket を使用することができるようになりました.

実際には例外安全等他にも考慮しなければいけない点は多々ありますが, とりあえず連続で async_write するまでの道のりはこれにて終了です.

実際に実装したものは GitHub で公開しています.

allium/queueing_write_stream.hpp at of10 · amedama41/allium · GitHub

boost::asio::asio_handler_is_continuation の効果

asio_handler_is_continuation の効果がいまいち良く分からなかったので調べてみました.

ある handler について, asio_handler_is_continuation の結果が true の場合, 処理が以下の様に少し変わります.

  • handler はスレッド固有の private queue に一度登録され, 後で全体の queue に登録される.
  • private queue から全体の queue に handler を移動した際, handler 待ちの他のスレッドを起こさない.

一つ目の結果により, handler を登録した関数が完了するまで, その handler が実行されることはありません (io_service::dispatch で登録した場合等は除きます).

二つ目の結果により, 登録した handler を同じスレッドで実行する確率が上昇します.

これらを以下は例で見てみます.

#include <iostream>
#include <chrono>
#include <thread>
#include <boost/asio.hpp>

struct my_handler
{
    void operator()() const
    {
        std::cout << "my_handler: " << std::this_thread::get_id() << std::endl;
    }

    friend auto asio_handler_is_continuation(my_handler* h)
        -> bool
    {
        return h->is_continuation;
    }

    bool is_continuation;
};

int main(int argc, char* argv[])
{
    boost::asio::io_service io_service{};

    io_service.post([&]
    {
        // handler を登録して 1 秒間 sleep
        io_service.post(my_handler{argc != 1});
        std::this_thread::sleep_for(std::chrono::seconds{1});

        std::cout << "handler1: " << std::this_thread::get_id() << std::endl;
    });

    auto t1 = std::thread{[&]{
        io_service.run();
    }};

    auto t2 = std::thread{[&]{
        io_service.run();
    }};

    t2.join();
    t1.join();
}
  • asio_handler_is_continuation の結果が false の場合:
% ./a.out
my_handler: 0x10dd9c000
handler1: 0x10dd19000

my_handlerio_service::post で登録されると, 登録したスレッドは別のスレッドで即座に実行されています.

  • asio_handler_is_continuation の結果が true の場合:
% ./a.out continuation
handler1: 0x1040bc000
my_handler: 0x1040bc000

my_handler を登録した関数は 1 秒間 sleep しているにも関わらず, その関数が終了するまで my_handler は実行されていません.

また, my_handler は, それを登録したスレッドと同一のスレッドで呼び出されています.

連続で async_write するまでの道のり (その 2)

単一の socket で連続して async_write するの 2 回目です.

前回の記事 では, 普通に socket に対して非同期送信関数を連続で呼び出す際の問題点について整理しました.

前回の内容を踏まえると, 以下のような動作をする async_write_some メソッドを持つ socket のラッパを作成してあげれば良さそうです.

  • 送信中のデータが既にある場合は, 送信データと callback を送信待ちキューに積む.
  • 送信中のデータがない場合は, 引数で指定された callback を wrapping callback でラッピングし, async_write を呼び出す. この wrapping callback が呼び出された場合, 以下の処理を実行します.

    1. 送信したデータに対応する callback を呼び出す.
    2. キューに送信待ちデータが存在する場合, wrapping callback を async_write の引数に指定してそのデータ送信を行う.

このように, wrapping callback の中で次のデータの送信を行うことで, 既に送信中のデータの送信完了を待ってから, 次のデータを送信します.

これを実装するだけなら, それほど対して手間はかかりませんが, 他に考慮すべき点があります. それは, マルチスレッドで socket を使用する場合です. 本記事ではその点について, Boost.Asio ではどのようにマルチスレッドを扱っているのかを見ていきます.

socket のマルチスレッドでの使用

Boost.Asio の socket オブジェクトを含むだいたいの IO Object はスレッドセーフではありません. よって, ここで作成する queueing socket もスレッドアンセーフとして, マルチスレッドでの使用は無視して OK... という訳にはいきません.

通常, Boost.Asio では socket 等の IO Object をマルチスレッドで扱う場合は, strand と呼ばれる仕組みを用いて複数のスレッドからの同時アクセスを防止します.

以下で strand の使用方法を簡単に見ていきます.

strand の概要

strand はオブジェクトであり, ある strand オブジェクトに関連する callback は必ず直列に実行されることを保証します.

strand の仕組みを簡単に説明すると, strand は callback リストを持っていて, それらを実行する strand 用の callback (以降 strand callback と呼ぶことにします) を一つだけ io_service に登録します. この strand callback が io_service から呼び出されると, callback リスト中のすべての callback を呼び出します. 各 strand オブジェクトの strand callback は最大一つしか io_service 中には存在しないので, その strand に関連づけされた callback は複数のスレッドから同時に実行されることはありません.

callback リストへ callback を追加するには strand::post() または strand::dispatch() を使用します. しかし, post または dispatch 経由で追加する callback は引数なしで呼び出されます. そのため, 引数を必要とする callback を strand に関連づけには別の方法を使用します.

引数を必要とする callback を strand オブジェクトに関連づけるには, strand::wrap() でその callback をラッピングします. ラッピング後の callback を引数付きで呼び出すと, ラッピング前の callback にそれらの引数を bind させた関数オブジェクトが, strand::dispatch() を用いて callback リストに追加されます.

strand の使用例

以下は strand を使用したサンプルコードです.

#include <iostream>
#include <functional>
#include <thread>
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>

namespace asio = boost::asio;
using asio::ip::tcp;

std::array<char, 8> send_buffer{};
std::array<char, 8> recv_buffer{};

void handle_write(tcp::socket* socket)
{
    socket->async_write_some(asio::buffer(send_buffer)
            , [](boost::system::error_code, std::size_t)     // callback A
    {
        std::cout << "finish writing" << std::endl;
    });
}

void handle_read(tcp::socket* socket)
{
    socket->async_read_some(asio::buffer(recv_buffer)
            , [](boost::system::error_code, std::size_t)     // callback B
    {
        std::cout << "finish reading" << std::endl;
    });
}

void handle_connect(tcp::socket* socket, asio::io_service::strand strand)
{
    socket->async_write_some(asio::buffer(send_buffer)
            , strand.wrap(std::bind(handle_write, socket)));

    socket->async_read_some(asio::buffer(recv_buffer)
            , strand.wrap(std::bind(handle_read, socket)));
}

void start_connect(tcp::socket* socket, asio::io_service::strand strand)
{
    socket->async_connect(
          tcp::endpoint{asio::ip::address::from_string("127.0.0.1"), 35555}
        , strand.wrap(std::bind(&handle_connect, socket, strand)));
}

int main()
{
    asio::io_service io_service{};

    auto strand = asio::io_service::strand{io_service};
    auto socket = tcp::socket{io_service};

    strand.post(std::bind(start_connect, &socket, strand));

    auto t1 = std::thread([&]{ io_service.run(); });
    auto t2 = std::thread([&]{ io_service.run(); });

    t2.join();
    t1.join();
}

このサンプルコードでは, 二つのスレッド t1, t2 を起動し, データの送受信を 2 度行っています. socket に対する操作の関係をグラフで表現すると下図のようになります.

f:id:amedama41:20140920090432p:plain

このグラフの各ノードがいずれかのスレッドで実行されます (ノード間のリンクは実行順を示しているわけではありません). strand を使用しない場合だと, "送信" と "受信" が同時に実行される可能性があり, socket はスレッドセーフではないのでこのような使用方法はできません. また, "送信/受信" の最中に "送信" が実行される可能性もあります.

そのため, これらの処理を strand::wrap() を用いることで strand への関連づけを行っています.

    socket->async_write_some(asio::buffer(send_buffer)
            , strand.wrap(std::bind(handle_write, socket)));

    socket->async_read_some(asio::buffer(recv_buffer)
            , strand.wrap(std::bind(handle_read, socket)));

    /*....*/

    socket->async_connect(
          tcp::endpoint{asio::ip::address::from_string("127.0.0.1"), 35555}
        , strand.wrap(std::bind(&handle_connect, socket, strand)));

これにより, "送信/受信", "送信", "受信" が並列に実行されることはありません. "送信/受信" で他二つの処理を初期化しているので, "送信/受信" -> "送信" -> "受信", または "送信/受信" -> "受信" -> "送信" のいずれかの順で実行されます.

また, 上記の例では strand::post() を使用することで "接続" も並列には実行させないようにしています.

    strand.post(std::bind(start_connect, &socket, strand));

この strand への関連付けは上記の例では必要ない (はず) ですが, 非同期処理の初期化がどのコンテキストで実行されているか不明な場合は, 初期化処理も strand に関連づける必要があります.

callback A, callback B とコメントを付けた callback に関しては, std::cout はスレッドセーフのため同時に実行されても問題ないので strand への関連づけは行っていません (最終的な出力結果については気にしません).

ここまでのまとめ

  • マルチスレッドで使用する場合は strand を使用する.

次回は, queueing socket の wrapping callback と strand をどう関連づけするのか見ていきます.

(続きます)

連続で async_write するまでの道のり (その 1)

単一の socket を使用して, 一つ目の送信処理の完了を待たずに二つ目の送信処理を実行したいといった場合は多々あります. レスポンスの順番がリクエストの順番とは異なる非同期プロトコルを使用または実装する場合には, そういったケースに特に遭遇します.

Boost.Asio を使用してそのような実装をする場合, どのようにすれば良いのかは明確になっていないように思われます. 同じ趣旨の質問が stackoverflow でも見つかりましたが, 明確な回答はありません.

c++ - Multiple writes in boost::asio to a single socket - Stack Overflow

本記事では, この問題に対する解決策について考えます.

一般的かつ誤りのあるコード

まず, stackoverflow でも例に挙がっているコードについて, 何が問題なのかを整理します.

#include <iostream>
#include <vector>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/system/error_code.hpp>

int main()
{
    using namespace boost;
    namespace ip = asio::ip;
    using tcp = ip::tcp;

    asio::io_service io_service{};
    auto const endpoint = tcp::endpoint{ip::address::from_string("127.0.0.1"), 12345};

    // server
    asio::spawn(io_service, [&](asio::yield_context yield) {
        auto acceptor = tcp::acceptor{io_service, endpoint};
        auto sock = tcp::socket{io_service};
        acceptor.async_accept(sock, yield);
        auto ec = system::error_code{};
        while (!ec) {
            auto buffer = std::array<char, 128>{};
            auto size = sock.async_read_some(asio::buffer(buffer), yield[ec]);
            std::cout.write(buffer.data(), size) << std::endl;
        }
    });
    io_service.run_one(); // start accept

    auto sock = tcp::socket{io_service};
    sock.connect(endpoint);

    sock.set_option(tcp::acceptor::send_buffer_size{128});

    auto buffer1 = std::vector<char>(256, 'A');
    auto buffer2 = std::vector<char>(256, 'B');
    auto counter = 0;
    asio::async_write(sock, asio::buffer(buffer1)
            , [&](system::error_code, std::size_t) {
        if (++counter == 2) sock.close();
    });
    asio::async_write(sock, asio::buffer(buffer2)
            , [&](system::error_code, std::size_t) {
        if (++counter == 2) sock.close();
    });

    io_service.run();
}

asio::spawn している部分はデータ受信用のサーバを用意しているだけなので読み飛ばして構いません.

上記の例では, async_write を二度呼び出しています. 一度目の async_write で 256 個の文字 A を送信し, 二度目の async_write で同じ数の文字 B を送信しています. socket のメンバである async_write_some を使用せずに async_write を使用するのは short write を避けるためです. これにより, 256 個の A と 256 個の B すべてが送信されることを保証します.

しかし, 実際に実行してみると, 以下のよう A と B が 128 Byte ずつ交互に受信していることがわかります.

AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB

A が 256 個表示され, 次に B が 256 個表示されるのが期待だったのですが, なぜこのような結果になってしまったのでしょうか.

この原因を理解するため, async_write の実装を少し見てます. その概要を以下に示します.

  1. async_write は受け取った callback を wrap して, それを引数に指定して socket の async_write_some メンバ関数を呼び出します. 以降 wrapping 前の callback を wrapped callback, 後の callback を wrapping callback と呼ぶことにします.

    上記の例では, wrapped callback は lambda 式で生成された関数オブジェクトです.

  2. データが一部でも送信されたら wrapping callback が呼び出されます.

  3. wrapping callback の中で送信したデータのバイト数をチェックして条件を満たしていたら, wrapped callback を呼び出します. 条件を見たさない場合は再び wrapping callback を引数に指定して async_write_some メンバ関数を呼び出します.

    上記の例では, 256 バイト書き込むことが wrapped callback が呼び出される条件になります.

このように async_write では複数回 wrapped callback が呼び出される可能性があります.

さらに, 上記の例では送信バッファサイズを 128 Byte に設定しているので,各 async_write でそれぞれ二回ずつ wrapping callback が呼び出されることになります. 複数の非同期処理を実行する場合, それらの callback が呼び出される順番は基本的には io_service (のサービス) に登録した順番です. よって, 上記の例は以下の流れで呼び出しが行われます.

   write(128 個の A) -> io_service に A の wrapping callback を登録
-> write(128 個の B) -> io_service に B の wrapping callback を登録
-> A の wrapped callback 呼び出し -> write(128 個の A) -> A の wrapping callback を登録
-> B の wrapped callback 呼び出し -> write(128 個の B) -> B の wrapping callback を登録
-> A の wrapped callback 呼び出し -> A のユーザ wrapped callback 呼び出し
-> B の wrapped callback 呼び出し -> B のユーザ wrapped callback 呼び出し

この例では, socket の送信バッファサイズを 128 Byte に絞ることで複数回送信関数が呼ばれる状況を意図的に作りましたが, このような状況は TCP のウィンドウサイズが送信するデータのサイズを下回った場合でも発生します. つまり, これは現実的な問題と考えることができます.

単純に送信関数を続けて呼び出すのは誤りです. 期待通りにデータを送信するには, 一つ目のデータ送信と二つ目のデータ送信は順序関係を持つ必要があります. 次章では, この順序関係を実現する方法を見てみます.

データ送信間の順序付け

本章ではデータ送信を順序付けするための三つの方法について見てみます.

案 1. callback の中で二つ目の送信処理を実行する

asio::async_write(socket, asio::buffer(buffer1), [&](system::error_code, std::size_t) {
    asio::async_write(socket, asio::buffer(buffer2), handler);
    // 一つ目の async_write に対する完了処理
});

この方法は, 一つ目の送信の callback 内で二つ目の送信を行うことで, 順序関係を持たせています. このような callback を用いて非同期処理を連鎖させる方法は一般的な方法です.

しかし, この方法は一つ目の送信を行う時点で, この後にさらに送信を行うことが分かっているときしか適用できません. HTTP のようにリクエストとレスポンスを交互に行うようなケースには適用できますが, 相手がこちらの応答を待たずに不定期にメッセージを送るようなケースには適用できません.

また, callback 内でデータの送信しかしないのであれば, 下記のように同時にデータを送信した方が I/O の回数を減らせて効率的 (のはず) です.

asio::async_write(socket, std::array<asio::const_buffer, 2>{
    {asio::buffer(buffer1), asio::buffer(buffer2)}
}, handler);

案 2. coroutine を使用する

asio::spawn(io_service, [&](asio::yield_context yield) {
   asio::async_write(socket, asio::buffer(buffer1), yield);
   asio::async_write(socket, asio::buffer(buffer2), yield);
});

coroutine を使用することで, 一つ目の送信完了を待つことができるようになります.

しかしながら, この方法は本質的には案 1 と変わりません. そのため同様の問題があります. また, 非同期処理実行中は spawn したコンテキストでは他の処理が一切実行できません. 複数の socket に対し一斉に送信を行うといったことができないので, 適用できるケースはより絞られることになります.

案 3. buffered_write_stream を使用する

buffered_write_stream はその名前の通り, 送信データをバッファリングし, バッファリングしたデータを一度に送信してくれます. 名前からして本命な感じがしますが, 今回の問題に対してこれを使用するのは誤りです.

以下のコードを見ながらその問題点を説明していきます.

#include <iostream>
#include <vector>
#include <boost/asio/buffered_write_stream.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/system/error_code.hpp>

int main()
{
    using namespace boost;
    namespace ip = asio::ip;
    using tcp = ip::tcp;

    asio::io_service io_service{};
    auto const endpoint = tcp::endpoint{ip::address::from_string("127.0.0.1"), 12345};

    // server
    asio::spawn(io_service, [&](asio::yield_context yield) {
        auto acceptor = tcp::acceptor{io_service, endpoint};
        auto sock = tcp::socket{io_service};
        acceptor.async_accept(sock, yield);
        std::cout << "accept" << std::endl;
        auto ec = system::error_code{};
        while (!ec) {
            auto buffer = std::array<char, 128>{};
            auto size = sock.async_read_some(asio::buffer(buffer), yield[ec]);
            std::cout.write(buffer.data(), size) << std::endl;
        }
    });
    io_service.run_one(); // start accept

    auto sock = tcp::socket{io_service};
    sock.connect(endpoint);
    sock.set_option(tcp::acceptor::send_buffer_size{128});

    auto buffer1 = std::vector<char>(256, 'A');
    auto buffer2 = std::vector<char>(256, 'B');
    auto counter = 0;

    // buffered_write_stream の作成 / 使用
    asio::buffered_write_stream<tcp::socket&> bsock{sock};
    bsock.async_write_some(asio::buffer(buffer1)
            , [&](system::error_code ec, std::size_t bytes_transferred) {
        if (++counter == 2) {
            bsock.async_flush([&](system::error_code ec, std::size_t bytes_transferred) {
                sock.close();
            });
        }
    });
    bsock.async_write_some(asio::buffer(buffer2)
            , [&](system::error_code ec, std::size_t bytes_transferred) {
        if (++counter == 2) {
            bsock.async_flush([&](system::error_code ec, std::size_t bytes_transferred) {
                sock.close();
            });
        }
    });

    io_service.run();
}

前半部分は include しているファイルを除けば, 一番最初に示した例と同じです. 実際にデータの送信を行っている部分を見ていきます.

bsock.async_write_some(asio::buffer(buffer1)
        , [&](system::error_code ec, std::size_t bytes_transferred) {
    if (++counter == 2) {
        bsock.async_flush([&](system::error_code ec, std::size_t bytes_transferred) {
            sock.close();
        });
    }
});

buffered_write_stream には async_write_some メンバ関数async_flush メンバ関数が存在します. 後者は名前の通り, バッファリングしたデータを flush, つまりここでは送信します. asio::async_write を使用していないのは, async_flush の実装の中で使用されているためです.

async_write_some メソッドbuffered_write_stream の中で管理しているバッファの空き容量に応じて挙動が少し変化します. バッファに空きがある場合は, 送信データをバッファに書き込みます. 逆にバッファに空きがない場合は, 一度 async_flush を行ってからバッファに送信データを書き込みます.

このことらから送信データは, バッファに空きがなくなるまで, または明示的に async_flush をするまで送信されないことが分かります. そのため, データを即時に送信したい場合は上記の例の様に, async_flush を呼び出します.

ちなみに, 以下のように書くのは誤りです.

bsock.async_write_some(asio::buffer(buffer1), handler1);
bsock.async_write_some(asio::buffer(buffer1), handler2);
bsock.async_flush(handler3);

このコードはバッファにデータを書き込む前に async_flush を行うことになるので, 何も送信されません.

サンプルコードで示した callback では送信を行った回数を数え, async_flush が一度だけ呼ばれる様にしています. これは async_flush は一度に複数回呼ばれてはいけないためです. もし async_flush を一度に複数回呼び出すと同一のデータが複数回送信されてしまいます.

さらに, async_flush の呼び出しから, その callback の呼び出しの間で async_write_some を呼び出すのも安全ではありません (おそらく). 結局, buffered_write_stream を使用してもある送信処理が別の送信処理についての知識がないと効率的かつ安全な送信は困難です.

また, コードから読み取るのは難しいですが, async_write_some の callback はデータが送信されたタイミングではなく, バッファに書き込まれたタイミングで呼ばれるという動きになっています. 実際に送信されたタイミングと, callback が呼ばれるタイミングが異なるため, この挙動は受け入れられない場合があります.

例えば, TCP のハーフクローズを行う際に問題となる可能性があります. async_write_some の callback でハーフクローズを行うと, 送信前に socket の書き込み側を閉じてしまいデータの送信に失敗してしまいます.

最後にオブジェクトの寿命についての問題についても言及しておきます. 上記のサンプルコード中の buffered_write_stream の初期化は以下のようになっています.

asio::buffered_write_stream<tcp::socket&> bsock{sock};

このコードを見ると buffered_write_stream のテンプレート引数には tcp::socket への参照を渡しています. これは buffered_write_streamコンストラクタ中でテンプレート引数の型のオブジェクトのコピーを行うためです. socket はコピーできないため, このコピーを回避するため参照を渡しています. これにより, buffered_write_stream と socket の両方のオブジェクトを管理する必要性が出てきます.

おそらく buffered_write_stream 自体, socket と組み合わせることを想定していないのだと思います.

ここまでのまとめ

ここまでのことを整理すると連続してデータ送信を行うには以下の機能を持った socket が必要だと考えられます.

  • async_write 時に送信キューが空の場合は即時にデータが送信されること.
  • async_write 時に送信キューが空でない場合は, データはキューイングされること.
  • データが実際に送信されたタイミングで callback が呼ばれること.
  • 送信完了時にキューにデータがある場合は, 自動的にそれらのデータが送信されること.

ここまで見えれば, 後はこれらの機能を持った queueing socket を新たに実装すればいいだけと言いたいところですが, もう少し話は続きます.

(続きます)

標準入出力を Boost.Asio で非同期に行う

標準入出力を Boost.Asio で非同期に行いたい場合は, posix::stream_descriptor を使用するといいみたいです.

使用例

#include <chrono>
#include <iostream>
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/streambuf.hpp>
#include <unistd.h>

int main()
{
    namespace asio = boost::asio;

    asio::io_service io_service{};

    auto input = asio::posix::stream_descriptor{io_service, ::dup(STDIN_FILENO)};

    asio::spawn(io_service, [&](auto yield) {
        asio::steady_timer timer{io_service};
        while (not io_service.stopped()) {
            timer.expires_from_now(std::chrono::seconds{1});
            timer.async_wait(yield);
            std::cout << "ticktack..." << std::endl;
        }
    });

    asio::streambuf input_buffer{};
    asio::async_read_until(input, input_buffer, '\n', [&](auto& error, auto bytes_transferred) {
        std::cout << "input data: " << &input_buffer << std::endl;
        io_service.stop();
    });

    io_service.run();
}

出力結果

ticktack...
ticktack...
ticktack...
ticktack...
hgoe
input data: hgoe

buffered_read_stream とかと組み合わせると, 入出力のバッファリングもできるかも知れません (調べていないのでよく分かりません).