あめだまふぁくとりー

Boost.Graphとかできますん

initiating function 内で yield_context を呼べるようになった

Boost 1.58.0 以降では, Asynchronous operation の initiating function (async_write_some など) 内で直接 yield_context の handler の呼び出しが可能になりました.

以前まではどうだったかというと,

の順番で処理が実行される必要がありました.

例えば, io_service::dispatch は内部で handler を呼び出す可能性があるので上記の順番と逆の処理になります. そのため, この関数に yield_context を渡すのは今までは未定義動作でしたが, 1.58.0 からは問題なく動作します (そもそもこの関数に yield_context を渡すこと自体意味はないですが).

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

namespace asio = boost::asio;

int main(int argc, char const* argv[])
{
    asio::io_service io_service{};
    asio::spawn(io_service, [&](asio::yield_context yield) {
        io_service.dispatch(yield); // OK from Boost 1.58.0
    });
    io_service.run();
    return 0;
}

この変更によって, 今まで理論上コンテキストスイッチを必要としなかった部分を省略するといった最適化が可能になります (逆にいうと今までは必ずコンテキストスイッチが発生していました).

// ブロックなしで書き込みが可能な場合や,
// コネクションが Abort している場合等は
// コンテキストスイッチは必要無い.
socket.async_write_some(bufs, yield[ec]);

ただし, Asyncronous Operation の要求 では initiating function 内での handler の呼び出しを禁じているので, 今後このような最適化が行われるかはわかりません.

Boost Asio のドキュメントより一部抜粋.

When an asynchronous operation is complete, the handler for the operation will be invoked as if by:

Constructing a bound completion handler bch for the handler, as described below.
Calling ios.post(bch) to schedule the handler for deferred invocation, where ios is the associated io_service.

This implies that the handler must not be called directly from within the initiating function, even if the asynchronous operation completes immediately.

もしかしたら, asio_handler_is_continuation のような helper 関数を使用して実現するといったことも考えられます.

GCC 4.9.1 で List-initialization の評価順が直っていた

List-initialization におけるリストの各要素の評価順は左から右に評価されるように規定されています. しかし, GCC 4.9.0 以前では List-initialization でコンストラクタが呼び出される場合は正しい順序で引数が評価されていませんでした.

#include <iostream>

int f(int i)
{
    std::cout << i << std::endl;
    return i;
}

struct S {
    S(int, int, int) {}
};


int main()
{
    std::cout << "1 2 3 の順に出力されるはず" << std::endl;
    int a[3] = {f(0), f(1), f(2)};
    (void)a;

    std::cout << "出力順は規定されていない" << std::endl;
    S(f(0), f(1), f(2));

    std::cout << "1 2 3 の順に出力されるはず" << std::endl;
    S{f(0), f(1), f(2)};
}

GCC 4.9.0 での出力::

1 2 3 の順に出力されるはず
0
1
2
出力順は規定されていない
2
1
0
1 2 3 の順に出力されるはず
2
1
0

GCC 4.9.1 ではこのバグが修正されたので, 正しい順序で評価されます.

1 2 3 の順に出力されるはず
0
1
2
出力順は規定されていない
2
1
0
1 2 3 の順に出力されるはず
0
1
2

melpon.org

バグ報告されてから修正されるまで 2 年半以上もかかった息の長いバグでした.

51253 – [C++11][DR 1030] Evaluation order (sequenced-before relation) among initializer-clauses in braced-init-list

Boost 1.59 を GitHub リポジトリのソースからビルド

いつもは zip か, Homebrew でインストールしていたのを使用していましたが, 今回は GitHub から持ってきたのでメモを残しておきます.

# boost のトップリポジトリを取得
git clone https://github.com/boostorg/boost.git

# 各ライブラリのリポジトリを取得
cd boost
git submodule init
git submodule update

# Boost 1.59 をチェックアウト
git checkout -b boost-1.59.0 boost-1.59.0
git submodule update

# Clang C++11 を使用してビルドするように user-config.jam を編集
touch ~/user-config.jam
echo 'using clang : 3.6 : /usr/local/opt/llvm/bin/clang++ : <cxxflags>"-std=c++11 -stdlib=libc++" <linkflags>"-stdlib=libc++" ;' >> ~/user-config.jam
# Boost.MPI と Boost.Parallel Graph もビルドするように設定
echo "using mpi ;" >> ~/user-config.jam

# ヘッダファイル群の構成
b2 headers
# ICU_PATH を指定しないと regex のビルドに失敗するので ICU_PATH を指定してビルド
b2 --debug-configuration --layout=versioned -sICU_PATH=/usr/local/opt/icu4c stage 2>&1 | tee stage.log 

Boost.Build, OpenMPI, ICU は Homebrew でインストール済みでした.

Coroutine2 を使用するには C++14 でビルドする必要がありそうです.

Boost.Function を使わない SCOPE_EXIT_ALL

修正(2015-02-22) RVO を無効化した場合, 指定した処理が複数回実行されてしまうのを修正しました.

Boost.Scope Exit の実装を覗いてみたら, C++11 (or later) 用の BOOST_SCOPE_EXIT_ALL には Boost.Function が使用されていました. Boost.Function は動的メモリの確保を必要とすると思うので, 使わないでいい方法を考えてみました.

#include <iostream>
#include <utility>

template <class F>
struct guard
{
    explicit guard(F&& f) : f_(std::forward<F>(f)), callable_(true) {}
    guard(guard&& other)
        : f_(std::forward<F>(other.f_)), callable_(true)
    {
        other.callable_ = false;
    }
    ~guard() {
        if (callable_) f_();
    }
private:
    F f_;
    bool callable_;
};

namespace detail {
    struct guard_gen
    {
        template <class F>
        auto operator=(F&& f) const -> guard<F>
        {
            return guard<F>{std::forward<F>(f)};
        }
    };
} // namespace detail

// 変数名とかキャプチャの処理はここでは省略する
#define SCOPE_EXIT_ALL() \
    auto scope_exit = detail::guard_gen{} = []()

int main(int argc, char const* argv[])
{
    {
        SCOPE_EXIT_ALL() {
            std::cout << "exit" << std::endl;
        };
        std::cout << "start" << std::endl;
    }
    return 0;
}

実行結果

start
exit

Asio の coroutine で明示的に yield する

実行時間がかかる処理 A を coroutine の中で行うと, その coroutine と同じ strand 内の処理は処理 A が完了するまで待たされてしまいます.

io_service::post または strand::post を使用することで, 処理 A の合間で明示的に coroutine を切り替えることができます (dispatch は使用できません. 必ず post を使用してください).

#include <ctime>
#include <chrono>
#include <iostream>
#include <thread>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>

using namespace boost;

void heavy_task()
{
    std::this_thread::sleep_for(std::chrono::seconds{1});
}

auto print_now(char const* str)
    -> std::ostream&
{
    auto const now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    return std::cout << str << " " << std::ctime(&now);
}

int main()
{
    asio::io_service io_service{};

    auto strand = asio::io_service::strand{io_service};

    // 実行時間のかかる処理 A
    asio::spawn(strand, [&](asio::yield_context yield) {
        print_now("start heavy_task");
        for (auto i = 0; i < 5; ++i) {
            io_service.post(yield); // ★ ここで切り替え
            heavy_task();
            io_service.post(yield); // ★ ここでも切り替え
        }
        print_now("finish heavy_task");
    });

    // 処理 A と同じ strand 内の処理
    asio::spawn(strand, [&](asio::yield_context yield) {
        asio::steady_timer timer{io_service};
        for (auto i = 0; i < 4; ++i) {
            timer.expires_from_now(std::chrono::seconds{1});
            timer.async_wait(yield);
            print_now("tick tack...    ");
        }
    });

    io_service.run();
}

明示的に切り替えた場合の実行結果

start heavy_task Tue Dec 23 11:21:13 2014
tick tack...     Tue Dec 23 11:21:14 2014
tick tack...     Tue Dec 23 11:21:15 2014
tick tack...     Tue Dec 23 11:21:16 2014
tick tack...     Tue Dec 23 11:21:17 2014
finish heavy_task Tue Dec 23 11:21:18 2014

切り替えない場合の実行結果

start heavy_task Tue Dec 23 11:32:17 2014
finish heavy_task Tue Dec 23 11:32:22 2014
tick tack...     Tue Dec 23 11:32:23 2014
tick tack...     Tue Dec 23 11:32:24 2014
tick tack...     Tue Dec 23 11:32:25 2014
tick tack...     Tue Dec 23 11:32:26 2014

ただし, スケジューリングのタイミングがシビアなので, 実行時間がかかるものは別スレッドで実行した方がいいでしょう.

boost::asio::async_write / read の限界

asio::async_writeasio::async_read に難癖をつけてみます.

stream のコンテキストとハンドラのコンテキストが分離されていない

socket への書き込み (読み込み) 処理は sock_strand 内で実行し, その書き込み (読み込み) 完了のハンドラは handler_strand 内で実行したい場合, どのようにコードを書けばいいでしょうか?

素直に書いてみると以下の様になります.

sock_strand.dispatch([=, &sock]{
    boost::asio::async_write(sock, buffers
            , sock_strand.wrap(handler_strand.wrap(handler));

残念ながらこれは間違いです. 正解はこうなります.

sock_strand.dispatch([=, &sock]{
    boost::asio::async_write(sock, buffers
            , sock_strand.wrap([=](auto const& ec, auto bytes)
    {
        handler_strand.post(bind(handler, ec, bytes));
    }));

前者がダメな理由は async_write / read の内部で使用されるハンドラが最終的に handler_strand 内で呼ばれるため, sock複数のスレッドから操作される可能性があるからです.

asio::async_write / read は対象とする stream とハンドラのコンテキストを上手く分離できません.

なので上記の正解の例でも, ハンドラが呼ばれる際に sock_strand を一度経由するオーバヘッドが発生してしまいます.

内部で例外が起きた場合ハンドラが呼ばれない

asio::async_write / read は内部で複数回 async_write / read_some メンバ関数を呼ぶ可能性があります.

ここで一つ疑問が浮かび上がります.

二回目以降の async_write / read_some メンバ関数の呼び出しで例外が投げられた場合でも, handler は呼ばれるのでしょうか?

実はこの場合, handler は呼ばれません. なのでいくつデータを読み書きしたのかがわからなくなってしまいます.

実際, 例外が投げられそうなのはメモリの allocate に失敗したときぐらいに見えるのであまり心配する必要はないとは思います.

consuming_buffers が Const / MutableBufferSequence の条件を満たさない

consuming_buffers というのは asio::async_write / read の内部で使用されている送信 / 受信データのラッパーです.

Const / MutableBufferSequence::const_iterator は bidirectional iterator でないといけないと, ドキュメントに記載されているのですが, consuming_buffers::const_iterator は forwarding iterator の条件しか満たしていません.

そもそもなんで bidirectional iterator が要求されているんでしょう?

連続で async_write するまでの道のり (その 3)

単一の socket で連続して async_write するの 3 回目で, 最終回です.

前回の記事では strand を使用方法について見ました.

本記事では作成する queueing socket の wrapping callback に strand を適用する方法を見ていきます.

wrapping callback の実装

queueing socket の async_write_some は以下のように, 送信待ちキューが空の場合に async_write を呼ぶ実装になります.

void async_write_some(Buffers const& buffers, Callback callback)
{
    auto is_empty = queue_.empty();     // ★A

    ... // buffers と callback を持つキューの要素を生成
    queue_.push(生成したキューの要素);

    if (is_empty) {
        async_write(socket_, buffers,
            wrapping_callback(callback, queue_));
    }
}

特に難しいところはないと思います.

wrapping callback の実装の概略も見てみます. こっちは逆に送信待ちキューが空でなければ, async_write を呼びます.

struct wrapping_callback
{
    void operator()(error_code const& ec, std::size_t bytes)
    {
        ... // queue から送信済みの buffer と callback を取り出す処理 // ★B
        wrapped_callback(ec, bytes);

        if (not queue_.empty()) {
            ... // queue から次の buffer と callback を参照する処理
            async_write(socket_, next_buffers, 
                wrapping_callback(next_callback, queue_));
        }
    }

    Callback wrapped_callback; // write 中のデータに対応する callback
    Socket& socket_;           // ラップされたソケット
    Queue& queue_;             // write 待ちのデータと callback のキュー
};

こっちも特に難しい点はないと思います.

問題はマルチスレッドでこれらを使用した場合です.

async_write_somewrapping_callback複数のスレッドから呼び出されるケースを考えてみます.

今, キューの中には送信中のデータしかないとします.

  1. このとき, async_write_some を呼び出し, 上記の ★A の処理が行われると, キューが空でないと判定されます.

  2. ここで wrapping_callback が呼び出され, ★B の処理でキューを空にすると wrapping_callback の中ではデータの送信は行われません.

  3. そして, async_write_some に戻って ★A 以降の処理を実行しても, is_empty は false になるため, こちらでもデータの送信は行われません.

これにより, 送信されないデータがキューに残り続けることになります.

これケース以外でも, read と write が平行して呼び出されるケース等があります. こららの問題に対応するために, queueing socket に strand を適用する方法を考える必要があります.

strand の適用 (微妙な解)

以下の三つが, strand に関連づけが必要な関数です.

  • queueing_socket の async_write_some.
  • queueing_socket の async_read_some.
  • wrapping callback.

最初の二つは問題ありません. 以下のようにユーザが strand::dispatch 等を経由して呼び出してやればいいだけです.

strand.dispatch([&] {
    queueing_sock.async_write_some(...);
});

問題は三つ目です. wrapping callback は queueing socket の内部で使用されているので, queueing socket の外にある strand に直接触れることはできません.

最も簡単な解は queueing socket の内部に strand を持つことです.

class queueing_socket {
public:
    void async_write_some(...) {
        this->strand_.dispatch([=] {
            // wrapping callback を strand に関連づける
            async_write(stream_, buffers
                , this->strand_.wrap(wrapping_callback{callback}));
        });
    }

private:
    Straem stream_;
    io_service::strand strand_; // strand を 内部に持つ.
};

こうすることで, 安全にデータの書き込みが可能になる上, queueing socket のユーザは strand を意識する必要がなくなります.

しかし, この実装は strand と queueing socket との結合度が大きいため, 以下にあげる欠点があります.

  • シングルスレッドの場合も, strand を使用によるオーバヘッドがある.
  • strand 以外のコンテキストに関連づけて使用できない.
  • queueing socket の外で使用している strand を使用してデータを書き込むことができない.

三つ目については, 外部で使用する strand を queueing socket のコンストラクタ経由で渡して, それで内部の strand を初期化するといった方法をとれば解決はできます.

しかし, 一つ目はゼロ・オーバヘッド原則に違反し, 二つ目は拡張性に問題が出ます. このため, 別の解決策が必要になります.

asio::async_write による解決策

asio::async_write は内部で複数書き込み関数を呼び出すため, 同様の問題があるはずです. そこでこの関数はどのようにしてこの問題に対処しているのか覗いてみます.

stream.async_write_some(buffers, inner_callback);

内部用の callback を渡す際には特にラッピング等はしていません.

これでどういうことか不思議に思いましたが, 仕掛けは別のところにあります.

asio_handler_invoke

通常, Boost.Asio の内部では直接 callback を呼び出すことはせず, asio_handler_invoke という関数を経由して callback を呼び出します.

この関数のシグネチャを以下に示します.

void asio_handler_invoke(Function& function, Callback* callback);

callback はユーザが指定した callback へのポインタ, function はその callback に実引数を bind して生成した関数オブジェクトです.

この関数のデフォルトの実装は単に function を呼び出すだけですが, 第二引数の Callback をオーバロードすることで, デフォルト以外の実装を使用することができます.

strand::wrap でラッピングされた関数オブジェクトに対しては以下の様に定義されています.

void asio_handler_invoke(
        Function& function, WrappedCallback* callback)
{
    callback->strand.dispatch(function);
}

callback が strand でラッピングされている場合, その callback を strand を経由して呼び出すわけです.

asio::async_write の内部で使用している callback に対しても, asio_handler_invoke が定義されています.

void asio_handler_invoke(
        Function& function, InnerCallback* innter_callback)
{
    asio_handler_invoke(
          function  // async_write 内部で使用する callback に引数を bind したもの
        , &innter_callback->callback_);  // ユーザが指定した callback
}

callback->callback_ はユーザが指定した callback なので, もしこれが strand でラッピングされていれば, 上述の通り内部で使用する callback は strand::dispatch 経由で呼び出されることになります.

つまり, async_write の内部で使用する callback は, ユーザが指定した callback と同じコンテキストで呼び出されるようになっているのです.

まとめ

結論として async_write と同様に asio_handler_invoke を用意してあげればいいということになります.

void asio_handler_invoke(
        Function& function, wrapping_callback* callback)
{
    asio_handler_invoke(function, callback->wrapped_callback);
}

これによって, strand を使用しない場合のオーバヘッドを回避し, strand 以外のコンテキストでも queueing socket を使用することができるようになりました.

実際には例外安全等他にも考慮しなければいけない点は多々ありますが, とりあえず連続で async_write するまでの道のりはこれにて終了です.

実際に実装したものは GitHub で公開しています.

allium/queueing_write_stream.hpp at of10 · amedama41/allium · GitHub