あめだまふぁくとりー

Boost.Graphとかできますん

連続で async_write するまでの道のり (その 1)

単一の socket を使用して, 一つ目の送信処理の完了を待たずに二つ目の送信処理を実行したいといった場合は多々あります. レスポンスの順番がリクエストの順番とは異なる非同期プロトコルを使用または実装する場合には, そういったケースに特に遭遇します.

Boost.Asio を使用してそのような実装をする場合, どのようにすれば良いのかは明確になっていないように思われます. 同じ趣旨の質問が stackoverflow でも見つかりましたが, 明確な回答はありません.

c++ - Multiple writes in boost::asio to a single socket - Stack Overflow

本記事では, この問題に対する解決策について考えます.

一般的かつ誤りのあるコード

まず, stackoverflow でも例に挙がっているコードについて, 何が問題なのかを整理します.

#include <iostream>
#include <vector>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/system/error_code.hpp>

int main()
{
    using namespace boost;
    namespace ip = asio::ip;
    using tcp = ip::tcp;

    asio::io_service io_service{};
    auto const endpoint = tcp::endpoint{ip::address::from_string("127.0.0.1"), 12345};

    // server
    asio::spawn(io_service, [&](asio::yield_context yield) {
        auto acceptor = tcp::acceptor{io_service, endpoint};
        auto sock = tcp::socket{io_service};
        acceptor.async_accept(sock, yield);
        auto ec = system::error_code{};
        while (!ec) {
            auto buffer = std::array<char, 128>{};
            auto size = sock.async_read_some(asio::buffer(buffer), yield[ec]);
            std::cout.write(buffer.data(), size) << std::endl;
        }
    });
    io_service.run_one(); // start accept

    auto sock = tcp::socket{io_service};
    sock.connect(endpoint);

    sock.set_option(tcp::acceptor::send_buffer_size{128});

    auto buffer1 = std::vector<char>(256, 'A');
    auto buffer2 = std::vector<char>(256, 'B');
    auto counter = 0;
    asio::async_write(sock, asio::buffer(buffer1)
            , [&](system::error_code, std::size_t) {
        if (++counter == 2) sock.close();
    });
    asio::async_write(sock, asio::buffer(buffer2)
            , [&](system::error_code, std::size_t) {
        if (++counter == 2) sock.close();
    });

    io_service.run();
}

asio::spawn している部分はデータ受信用のサーバを用意しているだけなので読み飛ばして構いません.

上記の例では, async_write を二度呼び出しています. 一度目の async_write で 256 個の文字 A を送信し, 二度目の async_write で同じ数の文字 B を送信しています. socket のメンバである async_write_some を使用せずに async_write を使用するのは short write を避けるためです. これにより, 256 個の A と 256 個の B すべてが送信されることを保証します.

しかし, 実際に実行してみると, 以下のよう A と B が 128 Byte ずつ交互に受信していることがわかります.

AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB

A が 256 個表示され, 次に B が 256 個表示されるのが期待だったのですが, なぜこのような結果になってしまったのでしょうか.

この原因を理解するため, async_write の実装を少し見てます. その概要を以下に示します.

  1. async_write は受け取った callback を wrap して, それを引数に指定して socket の async_write_some メンバ関数を呼び出します. 以降 wrapping 前の callback を wrapped callback, 後の callback を wrapping callback と呼ぶことにします.

    上記の例では, wrapped callback は lambda 式で生成された関数オブジェクトです.

  2. データが一部でも送信されたら wrapping callback が呼び出されます.

  3. wrapping callback の中で送信したデータのバイト数をチェックして条件を満たしていたら, wrapped callback を呼び出します. 条件を見たさない場合は再び wrapping callback を引数に指定して async_write_some メンバ関数を呼び出します.

    上記の例では, 256 バイト書き込むことが wrapped callback が呼び出される条件になります.

このように async_write では複数回 wrapped callback が呼び出される可能性があります.

さらに, 上記の例では送信バッファサイズを 128 Byte に設定しているので,各 async_write でそれぞれ二回ずつ wrapping callback が呼び出されることになります. 複数の非同期処理を実行する場合, それらの callback が呼び出される順番は基本的には io_service (のサービス) に登録した順番です. よって, 上記の例は以下の流れで呼び出しが行われます.

   write(128 個の A) -> io_service に A の wrapping callback を登録
-> write(128 個の B) -> io_service に B の wrapping callback を登録
-> A の wrapped callback 呼び出し -> write(128 個の A) -> A の wrapping callback を登録
-> B の wrapped callback 呼び出し -> write(128 個の B) -> B の wrapping callback を登録
-> A の wrapped callback 呼び出し -> A のユーザ wrapped callback 呼び出し
-> B の wrapped callback 呼び出し -> B のユーザ wrapped callback 呼び出し

この例では, socket の送信バッファサイズを 128 Byte に絞ることで複数回送信関数が呼ばれる状況を意図的に作りましたが, このような状況は TCP のウィンドウサイズが送信するデータのサイズを下回った場合でも発生します. つまり, これは現実的な問題と考えることができます.

単純に送信関数を続けて呼び出すのは誤りです. 期待通りにデータを送信するには, 一つ目のデータ送信と二つ目のデータ送信は順序関係を持つ必要があります. 次章では, この順序関係を実現する方法を見てみます.

データ送信間の順序付け

本章ではデータ送信を順序付けするための三つの方法について見てみます.

案 1. callback の中で二つ目の送信処理を実行する

asio::async_write(socket, asio::buffer(buffer1), [&](system::error_code, std::size_t) {
    asio::async_write(socket, asio::buffer(buffer2), handler);
    // 一つ目の async_write に対する完了処理
});

この方法は, 一つ目の送信の callback 内で二つ目の送信を行うことで, 順序関係を持たせています. このような callback を用いて非同期処理を連鎖させる方法は一般的な方法です.

しかし, この方法は一つ目の送信を行う時点で, この後にさらに送信を行うことが分かっているときしか適用できません. HTTP のようにリクエストとレスポンスを交互に行うようなケースには適用できますが, 相手がこちらの応答を待たずに不定期にメッセージを送るようなケースには適用できません.

また, callback 内でデータの送信しかしないのであれば, 下記のように同時にデータを送信した方が I/O の回数を減らせて効率的 (のはず) です.

asio::async_write(socket, std::array<asio::const_buffer, 2>{
    {asio::buffer(buffer1), asio::buffer(buffer2)}
}, handler);

案 2. coroutine を使用する

asio::spawn(io_service, [&](asio::yield_context yield) {
   asio::async_write(socket, asio::buffer(buffer1), yield);
   asio::async_write(socket, asio::buffer(buffer2), yield);
});

coroutine を使用することで, 一つ目の送信完了を待つことができるようになります.

しかしながら, この方法は本質的には案 1 と変わりません. そのため同様の問題があります. また, 非同期処理実行中は spawn したコンテキストでは他の処理が一切実行できません. 複数の socket に対し一斉に送信を行うといったことができないので, 適用できるケースはより絞られることになります.

案 3. buffered_write_stream を使用する

buffered_write_stream はその名前の通り, 送信データをバッファリングし, バッファリングしたデータを一度に送信してくれます. 名前からして本命な感じがしますが, 今回の問題に対してこれを使用するのは誤りです.

以下のコードを見ながらその問題点を説明していきます.

#include <iostream>
#include <vector>
#include <boost/asio/buffered_write_stream.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/system/error_code.hpp>

int main()
{
    using namespace boost;
    namespace ip = asio::ip;
    using tcp = ip::tcp;

    asio::io_service io_service{};
    auto const endpoint = tcp::endpoint{ip::address::from_string("127.0.0.1"), 12345};

    // server
    asio::spawn(io_service, [&](asio::yield_context yield) {
        auto acceptor = tcp::acceptor{io_service, endpoint};
        auto sock = tcp::socket{io_service};
        acceptor.async_accept(sock, yield);
        std::cout << "accept" << std::endl;
        auto ec = system::error_code{};
        while (!ec) {
            auto buffer = std::array<char, 128>{};
            auto size = sock.async_read_some(asio::buffer(buffer), yield[ec]);
            std::cout.write(buffer.data(), size) << std::endl;
        }
    });
    io_service.run_one(); // start accept

    auto sock = tcp::socket{io_service};
    sock.connect(endpoint);
    sock.set_option(tcp::acceptor::send_buffer_size{128});

    auto buffer1 = std::vector<char>(256, 'A');
    auto buffer2 = std::vector<char>(256, 'B');
    auto counter = 0;

    // buffered_write_stream の作成 / 使用
    asio::buffered_write_stream<tcp::socket&> bsock{sock};
    bsock.async_write_some(asio::buffer(buffer1)
            , [&](system::error_code ec, std::size_t bytes_transferred) {
        if (++counter == 2) {
            bsock.async_flush([&](system::error_code ec, std::size_t bytes_transferred) {
                sock.close();
            });
        }
    });
    bsock.async_write_some(asio::buffer(buffer2)
            , [&](system::error_code ec, std::size_t bytes_transferred) {
        if (++counter == 2) {
            bsock.async_flush([&](system::error_code ec, std::size_t bytes_transferred) {
                sock.close();
            });
        }
    });

    io_service.run();
}

前半部分は include しているファイルを除けば, 一番最初に示した例と同じです. 実際にデータの送信を行っている部分を見ていきます.

bsock.async_write_some(asio::buffer(buffer1)
        , [&](system::error_code ec, std::size_t bytes_transferred) {
    if (++counter == 2) {
        bsock.async_flush([&](system::error_code ec, std::size_t bytes_transferred) {
            sock.close();
        });
    }
});

buffered_write_stream には async_write_some メンバ関数async_flush メンバ関数が存在します. 後者は名前の通り, バッファリングしたデータを flush, つまりここでは送信します. asio::async_write を使用していないのは, async_flush の実装の中で使用されているためです.

async_write_some メソッドbuffered_write_stream の中で管理しているバッファの空き容量に応じて挙動が少し変化します. バッファに空きがある場合は, 送信データをバッファに書き込みます. 逆にバッファに空きがない場合は, 一度 async_flush を行ってからバッファに送信データを書き込みます.

このことらから送信データは, バッファに空きがなくなるまで, または明示的に async_flush をするまで送信されないことが分かります. そのため, データを即時に送信したい場合は上記の例の様に, async_flush を呼び出します.

ちなみに, 以下のように書くのは誤りです.

bsock.async_write_some(asio::buffer(buffer1), handler1);
bsock.async_write_some(asio::buffer(buffer1), handler2);
bsock.async_flush(handler3);

このコードはバッファにデータを書き込む前に async_flush を行うことになるので, 何も送信されません.

サンプルコードで示した callback では送信を行った回数を数え, async_flush が一度だけ呼ばれる様にしています. これは async_flush は一度に複数回呼ばれてはいけないためです. もし async_flush を一度に複数回呼び出すと同一のデータが複数回送信されてしまいます.

さらに, async_flush の呼び出しから, その callback の呼び出しの間で async_write_some を呼び出すのも安全ではありません (おそらく). 結局, buffered_write_stream を使用してもある送信処理が別の送信処理についての知識がないと効率的かつ安全な送信は困難です.

また, コードから読み取るのは難しいですが, async_write_some の callback はデータが送信されたタイミングではなく, バッファに書き込まれたタイミングで呼ばれるという動きになっています. 実際に送信されたタイミングと, callback が呼ばれるタイミングが異なるため, この挙動は受け入れられない場合があります.

例えば, TCP のハーフクローズを行う際に問題となる可能性があります. async_write_some の callback でハーフクローズを行うと, 送信前に socket の書き込み側を閉じてしまいデータの送信に失敗してしまいます.

最後にオブジェクトの寿命についての問題についても言及しておきます. 上記のサンプルコード中の buffered_write_stream の初期化は以下のようになっています.

asio::buffered_write_stream<tcp::socket&> bsock{sock};

このコードを見ると buffered_write_stream のテンプレート引数には tcp::socket への参照を渡しています. これは buffered_write_streamコンストラクタ中でテンプレート引数の型のオブジェクトのコピーを行うためです. socket はコピーできないため, このコピーを回避するため参照を渡しています. これにより, buffered_write_stream と socket の両方のオブジェクトを管理する必要性が出てきます.

おそらく buffered_write_stream 自体, socket と組み合わせることを想定していないのだと思います.

ここまでのまとめ

ここまでのことを整理すると連続してデータ送信を行うには以下の機能を持った socket が必要だと考えられます.

  • async_write 時に送信キューが空の場合は即時にデータが送信されること.
  • async_write 時に送信キューが空でない場合は, データはキューイングされること.
  • データが実際に送信されたタイミングで callback が呼ばれること.
  • 送信完了時にキューにデータがある場合は, 自動的にそれらのデータが送信されること.

ここまで見えれば, 後はこれらの機能を持った queueing socket を新たに実装すればいいだけと言いたいところですが, もう少し話は続きます.

(続きます)