あめだまふぁくとりー

Boost.Graphとかできますん

連続で async_write するまでの道のり (その 3)

単一の socket で連続して async_write するの 3 回目で, 最終回です.

前回の記事では strand を使用方法について見ました.

本記事では作成する queueing socket の wrapping callback に strand を適用する方法を見ていきます.

wrapping callback の実装

queueing socket の async_write_some は以下のように, 送信待ちキューが空の場合に async_write を呼ぶ実装になります.

void async_write_some(Buffers const& buffers, Callback callback)
{
    auto is_empty = queue_.empty();     // ★A

    ... // buffers と callback を持つキューの要素を生成
    queue_.push(生成したキューの要素);

    if (is_empty) {
        async_write(socket_, buffers,
            wrapping_callback(callback, queue_));
    }
}

特に難しいところはないと思います.

wrapping callback の実装の概略も見てみます. こっちは逆に送信待ちキューが空でなければ, async_write を呼びます.

struct wrapping_callback
{
    void operator()(error_code const& ec, std::size_t bytes)
    {
        ... // queue から送信済みの buffer と callback を取り出す処理 // ★B
        wrapped_callback(ec, bytes);

        if (not queue_.empty()) {
            ... // queue から次の buffer と callback を参照する処理
            async_write(socket_, next_buffers, 
                wrapping_callback(next_callback, queue_));
        }
    }

    Callback wrapped_callback; // write 中のデータに対応する callback
    Socket& socket_;           // ラップされたソケット
    Queue& queue_;             // write 待ちのデータと callback のキュー
};

こっちも特に難しい点はないと思います.

問題はマルチスレッドでこれらを使用した場合です.

async_write_somewrapping_callback複数のスレッドから呼び出されるケースを考えてみます.

今, キューの中には送信中のデータしかないとします.

  1. このとき, async_write_some を呼び出し, 上記の ★A の処理が行われると, キューが空でないと判定されます.

  2. ここで wrapping_callback が呼び出され, ★B の処理でキューを空にすると wrapping_callback の中ではデータの送信は行われません.

  3. そして, async_write_some に戻って ★A 以降の処理を実行しても, is_empty は false になるため, こちらでもデータの送信は行われません.

これにより, 送信されないデータがキューに残り続けることになります.

これケース以外でも, read と write が平行して呼び出されるケース等があります. こららの問題に対応するために, queueing socket に strand を適用する方法を考える必要があります.

strand の適用 (微妙な解)

以下の三つが, strand に関連づけが必要な関数です.

  • queueing_socket の async_write_some.
  • queueing_socket の async_read_some.
  • wrapping callback.

最初の二つは問題ありません. 以下のようにユーザが strand::dispatch 等を経由して呼び出してやればいいだけです.

strand.dispatch([&] {
    queueing_sock.async_write_some(...);
});

問題は三つ目です. wrapping callback は queueing socket の内部で使用されているので, queueing socket の外にある strand に直接触れることはできません.

最も簡単な解は queueing socket の内部に strand を持つことです.

class queueing_socket {
public:
    void async_write_some(...) {
        this->strand_.dispatch([=] {
            // wrapping callback を strand に関連づける
            async_write(stream_, buffers
                , this->strand_.wrap(wrapping_callback{callback}));
        });
    }

private:
    Straem stream_;
    io_service::strand strand_; // strand を 内部に持つ.
};

こうすることで, 安全にデータの書き込みが可能になる上, queueing socket のユーザは strand を意識する必要がなくなります.

しかし, この実装は strand と queueing socket との結合度が大きいため, 以下にあげる欠点があります.

  • シングルスレッドの場合も, strand を使用によるオーバヘッドがある.
  • strand 以外のコンテキストに関連づけて使用できない.
  • queueing socket の外で使用している strand を使用してデータを書き込むことができない.

三つ目については, 外部で使用する strand を queueing socket のコンストラクタ経由で渡して, それで内部の strand を初期化するといった方法をとれば解決はできます.

しかし, 一つ目はゼロ・オーバヘッド原則に違反し, 二つ目は拡張性に問題が出ます. このため, 別の解決策が必要になります.

asio::async_write による解決策

asio::async_write は内部で複数書き込み関数を呼び出すため, 同様の問題があるはずです. そこでこの関数はどのようにしてこの問題に対処しているのか覗いてみます.

stream.async_write_some(buffers, inner_callback);

内部用の callback を渡す際には特にラッピング等はしていません.

これでどういうことか不思議に思いましたが, 仕掛けは別のところにあります.

asio_handler_invoke

通常, Boost.Asio の内部では直接 callback を呼び出すことはせず, asio_handler_invoke という関数を経由して callback を呼び出します.

この関数のシグネチャを以下に示します.

void asio_handler_invoke(Function& function, Callback* callback);

callback はユーザが指定した callback へのポインタ, function はその callback に実引数を bind して生成した関数オブジェクトです.

この関数のデフォルトの実装は単に function を呼び出すだけですが, 第二引数の Callback をオーバロードすることで, デフォルト以外の実装を使用することができます.

strand::wrap でラッピングされた関数オブジェクトに対しては以下の様に定義されています.

void asio_handler_invoke(
        Function& function, WrappedCallback* callback)
{
    callback->strand.dispatch(function);
}

callback が strand でラッピングされている場合, その callback を strand を経由して呼び出すわけです.

asio::async_write の内部で使用している callback に対しても, asio_handler_invoke が定義されています.

void asio_handler_invoke(
        Function& function, InnerCallback* innter_callback)
{
    asio_handler_invoke(
          function  // async_write 内部で使用する callback に引数を bind したもの
        , &innter_callback->callback_);  // ユーザが指定した callback
}

callback->callback_ はユーザが指定した callback なので, もしこれが strand でラッピングされていれば, 上述の通り内部で使用する callback は strand::dispatch 経由で呼び出されることになります.

つまり, async_write の内部で使用する callback は, ユーザが指定した callback と同じコンテキストで呼び出されるようになっているのです.

まとめ

結論として async_write と同様に asio_handler_invoke を用意してあげればいいということになります.

void asio_handler_invoke(
        Function& function, wrapping_callback* callback)
{
    asio_handler_invoke(function, callback->wrapped_callback);
}

これによって, strand を使用しない場合のオーバヘッドを回避し, strand 以外のコンテキストでも queueing socket を使用することができるようになりました.

実際には例外安全等他にも考慮しなければいけない点は多々ありますが, とりあえず連続で async_write するまでの道のりはこれにて終了です.

実際に実装したものは GitHub で公開しています.

allium/queueing_write_stream.hpp at of10 · amedama41/allium · GitHub