Mojo::IOLoop - 最小限のイベントループ
使い方
use Mojo::IOLoop; # ポート3000でリッスン Mojo::IOLoop->server({port => 3000} => sub { my ($loop, $stream) = @_; $stream->on(read => sub { my ($stream, $bytes) = @_; # 入力の処理 say $bytes; # データを取得して、書き込む時 $stream->write('HTTP/1.1 200 OK'); }); }); # ポート3000に接続 my $id = Mojo::IOLoop->client({port => 3000} => sub { my ($loop, $err, $stream) = @_; $stream->on(read => sub { my ($stream, $bytes) = @_; # 入力の処理 say "Input: $bytes"; }); # リクエストを書き込み $stream->write("GET / HTTP/1.1\x0d\x0a\x0d\x0a"); }); # タイマーの追加 Mojo::IOLoop->timer(5 => sub { my $loop = shift; $loop->remove($id); }); # 必要であればループを開始 Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
説明
Mojo::IOLoopはMojo::Reactorをベースとした、最小限のイベントループです。 堅固でスケーラブルな非同期TPCクライアントとサーバーを構築するために 必要とされる完全な最小限の機能を備えています。
オペレーティングシステムに依存して、デフォルトのプロセスあたりとシステムワイドのファイルディスクリプタの制限が しばしばとても低く、よいスケーラビリティのために調整することが必要になります。 LIBEV_FLAGS
環境変数は、もっともよいEVバックエンドが選択されるようにすべきです。 通常のデフォルトはあまりスケーラブルではないselect
です。
LIBEV_FLAGS=1 # select LIBEV_FLAGS=2 # poll LIBEV_FLAGS=4 # epoll (Linux) LIBEV_FLAGS=8 # kqueue (*BSD, OS X)
イベントループはTime::HiResを通して、モノトニック時刻が利用可能であれば、 時間ジャンプに対して回復力があるでしょう。
テストサーバーをできるだけ簡単に記述するためにTLS証明書とキーも組み込まれています。 利便性のために、PIPE
シグナルがMojo::IOLoopを読み込んだときにIGNORE
に設定されることに注意してください。
よりよいスケーラビリティ(epoll, kqueue)とTLSサポートと同様にIPv6の提供のためには、 オプショナルなモジュールのEV (4.0+)、Net::DNS::Native (0.15+) 、IO::Socket::Socks (0.64+)とIO::Socket::SSL (1.94+) が、インストールされていれば、自動的に利用されます。
個々の機能はMOJO_NO_NDN
、MOJO_NO_IPV6
、MOJO_NO_TLS
環境変数で無効にすることもできます。
Mojolicious::Guides::CookbookのREAL-TIME WEB
の項目も見てください。
イベント
Mojo::IOLoopはMojo::EventEmitterからすべてのイベント継承し、 次の新しいものを発行します。
finish
$loop->on(finish => sub { my $loop = shift; ... });
イベントループが、緩やかにシャットダウンを要求し、 すべての存在する接続が閉じられるのを待っているときに発行されます。
reset
$loop->on(reset => sub { my $loop = shift; ... });
イベントループがリセットされたときに発行されます。これは、通常は、プロセスが共有できないリソースをクリーンアップするために、フォークされたときです。
属性
Mojo::IOLoopは次の属性を実装しています。
max_accepts
my $max = $loop->max_accepts; $loop = $loop->max_accepts(1000);
存在している接続を中断することなしに、緩やかにシャットダウンされる前に、 このループが受け入れることのできる接続の最大数。デフォルトは<0>です。 この値を0
にするとこのループは新しいコネクションを無限に受け入れます。 複数のサーバープロセスの間での ロードバランシングのロードを改善するために、 この値の半分はランダムに引き算されることに注意してください。
max_connections
my $max = $loop->max_connections; $loop = $loop->max_connections(100);
新しくやってくる接続を受け入れることをやめる前に、 このループが処理することの可能な並列接続の最大数。 デフォルトは1000
です。 この値を0
にするとこのループはすべての新しい接続の受け入れを 停止し、存在する接続を中断することなしに、緩やかにシャットダウンを行うこと ができます。
reactor
my $reactor = $loop->reactor; $loop = $loop->reactor(Mojo::Reactor->new);
低レベルのイベント装置。通常はMojo::Reactor::PollあるいはMojo::Reactor::EVオブジェクト。これらは、デフォルトでMojo::Reactorのerror
イベントを購読します。
# I/Oイベントのためにハンドルを監視 $loop->reactor->io($handle => sub { my ($reactor, $writable) = @_; say $writable ? 'Handle is writable' : 'Handle is readable'; }); # ハンドルが書き込み可能になったときのみ、監視するように変更 $loop->reactor->watch($handle, 0, 1); # ハンドルを再び削除 $loop->reactor->remove($handle);
メソッド
Mojo::IOLoopはMojo::Baseからすべてのメソッドを継承しており、 次の新しいメソッドを実装しています。
acceptor
my $server = Mojo::IOLoop->acceptor($id); my $server = $loop->acceptor($id); my $id = $loop->acceptor(Mojo::IOLoop::Server->new);
IDを指定してMojo::IOLoop::Serverオブジェクトを取得する、 あるいは、オブジェクトをアクセプターに変換する。
client
my $id = Mojo::IOLoop->client(address => '127.0.0.1', port => 3000, sub {...}); my $id = $loop->client(address => '127.0.0.1', port => 3000, sub {...}); my $id = $loop->client({address => '127.0.0.1', port => 3000}, sub {...});
client_class
でTCPコネクションをオープンします。 普通はMojo::IOLoop::Clientになります。 Mojo::IOLoop::Clientのconnect
メソッドと同じ引数を受け取ります。
# ポート3000番で127.0.0.1に接続する Mojo::IOLoop->client({port => 3000} => sub { my ($loop, $err, $stream) = @_; ... });
delay
my $delay = Mojo::IOLoop->delay; my $delay = $loop->delay; my $delay = $loop->delay(sub {...}); my $delay = $loop->delay(sub {...}, sub {...});
イベントの流れを処理し、コントロールするためのMojo::IOLoop::Delayオブジェクトを取得します。 ひとつのコールバックは、終わったイベントへのサブスクライバーとして扱われ、 複数のコールバックは、このステップのチェーンです。
# プロミスによる連続渡しスタイルのAPIでラッピング my $ua = Mojo::UserAgent->new; sub get { my $promise = Mojo::IOLoop->delay; $ua->get(@_ => sub { my ($ua, $tx) = @_; my $err = $tx->error; if (!$err || $err->{code}) { $promise->resolve($tx) } else { $promise->reject($err->{message}) } }); return $promise; } my $mojo = get('https://mojolicious.org'); my $cpan = get('https://metacpan.org'); Mojo::Promise->race($mojo, $cpan)->then(sub { say shift->req->url })->wait; # 複数のノンブロックオペレーションを同期 my $delay = Mojo::IOLoop->delay(sub { say 'BOOM!' }); for my $i (1 .. 10) { my $end = $delay->begin; Mojo::IOLoop->timer($i => sub { say 10 - $i; $end->(); }); } $delay->wait; # 複数のイベントを直列化 Mojo::IOLoop->delay( # 最初のステップ (単純なタイマー) sub { my $delay = shift; Mojo::IOLoop->timer(2 => $delay->begin); say 'Second step in 2 seconds.'; }, # 二回目のステップ (並列タイマー) sub { my $delay = shift; Mojo::IOLoop->timer(1 => $delay->begin); Mojo::IOLoop->timer(3 => $delay->begin); say 'Third step in 3 seconds.'; }, # 三回目のステップ (終わり) sub { say 'And done after 5 seconds total.' } )->wait; # すべてのステップで、例外を処理 Mojo::IOLoop->delay( sub { my $delay = shift; die 'Intentional error!'; }, sub { my ($delay, @args) = @_; say 'Never actually reached.'; } )->catch(sub { my ($delay, $err) = @_; say "Something went wrong: $err"; })->wait;
is_running
my $bool = Mojo::IOLoop->is_running; my $bool = $loop->is_running;
ループが実行されているかどうかをチェックします。
next_tick
my $undef = Mojo::IOLoop->next_tick(sub {...}); my $undef = $loop->next_tick(sub {...});
できるだけすぐにコールバックを呼び出します。 しかし、リターンする前ではありません。 いつもundef
を返します。
# 次のリアクターのチックに処理を実行 Mojo::IOLoop->next_tick(sub { my $loop = shift; ... });
one_tick
Mojo::IOLoop->one_tick; $loop->one_tick;
ひとつのイベントが起こるまで、あるいはもやはイベントが監視されなくなるまで、リアクターを実行します。 このメソッドはリアクターに戻るので、注意して利用する必要があります。
# 0.5秒以上はブロックしない。 my $id = Mojo::IOLoop->timer(0.5 => sub {}); Mojo::IOLoop->one_tick; Mojo::IOLoop->remove($id);
recurring
my $id = Mojo::IOLoop->recurring(3 => sub {...}); my $id = $loop->recurring(0 => sub {...}); my $id = $loop->recurring(0.25 => sub {...});
すべての装置の1目盛ごとに実行されるコールバック。 これはたとえば複数の装置を順番に実行することが可能になります。
# 5秒ごとに処理を実行する Mojo::IOLoop->recurring(5 => sub { my $loop = shift; ... });
remove
Mojo::IOLoop->remove($id); $loop->remove($id);
IDを使って、切断します。 書き込みバッファにすべてのデータの書き込みを終えることによって、 緩やかにコネクションが切断されます。
reset
Mojo::IOLoop->reset; $loop->reset;
すべてを取り除き、イベントループを停止します。
server
my $id = Mojo::IOLoop->server(port => 3000, sub {...}); my $id = $loop->server(port => 3000, sub {...}); my $id = $loop->server({port => 3000}, sub {...});
server_class
でTCPコネクションを受け入れます。 通常はMojo::IOLoop::Serverになります。 Mojo::IOLoop::Serverのlisten
メソッドと同じ引数を受け取ります。
# ランダムポートでリッスン # Listen on random port my $id = Mojo::IOLoop->server({address => '127.0.0.1'} => sub { my ($loop, $stream, $id) = @_; ... }); my $port = Mojo::IOLoop->acceptor($id)->port;
singleton
my $loop = Mojo::IOLoop->singleton;
グローバルなループオブジェクト。処理の内側のあらゆる場所から、 ひとつの共有されるループインスタンスにアクセスするために利用されます。
# 多くのメソッドはショットカットを行うことを許可します。 Mojo::IOLoop->timer(2 => sub { Mojo::IOLoop->stop }); Mojo::IOLoop->start; # アクティブなタイマーの再起動 my $id = Mojo::IOLoop->timer(3 => sub { say 'Timeout!' }); Mojo::IOLoop->singleton->reactor->again($id); # アクティブタイマーをリスタート my $id = Mojo::IOLoop->timer(3 => sub { say 'Timeout!' }); Mojo::IOLoop->singleton->reactor->again($id); # ファイルディスクリプタをハンドルに変換し、読み込み可能になるかを監視する my $handle = IO::Handle->new_from_fd($fd, 'r'); Mojo::IOLoop->singleton->reactor->io($handle => sub { my ($reactor, $writable) = @_; say $writable ? 'Handle is writable' : 'Handle is readable'; })->watch($handle, 1, 0);
start
Mojo::IOLoop->start; $loop->start;
ループをスタートします。これはstop
が呼び出されるまでブロックされます。 いくつかのリアクターは、監視されるイベントがなくなった場合に、 自動的に停止されることに注意してください。
# すでに開始していない場合だけ開始する Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
stop
Mojo::IOLoop->stop; $loop->stop;
ループを即座に停止します。 これはすべての存在する接続を中断することはなく、 ループはstart
が再び実行されれば、再開始されます。
stop_gracefully
Mojo::IOLoop->stop_gracefully; $loop->stop_gracefully;
新しい接続の受け入れを停止し、 イベントループを停止する前に、 すべての存在する接続が閉じられるのを待ちます。
stream
my $stream = Mojo::IOLoop->stream($id); my $stream = $loop->stream($id); my $id = $loop->stream(Mojo::IOLoop::Stream->new);
IDを指定してMojo::IOLoop::Streamオブジェクトを取得するか、 オブジェクトをコネクションに変換します。
# 接続のためのインアクティビティタイムアウトを300秒に増やす Mojo::IOLoop->stream($id)->timeout(300);
subprocess
my $subprocess = Mojo::IOLoop->subprocess(sub {...}, sub {...}); my $subprocess = $loop->subprocess; my $subprocess = $loop->subprocess(sub {...}, sub {...});
計算が高価なオペレーションを実行するために、イベントループのブロックなしで、Mojo::IOLoop::Subprocessオブジェクトを構築します。 コールバックはMojo::IOLoop::Subprocessの"run"メソッドに渡されます。
# 5秒ブロックするイベントループをブロックするオペレーション Mojo::IOLoop->subprocess( sub { my $subprocess = shift; sleep 5; return '♥', 'Mojolicious'; }, sub { my ($subprocess, $err, @results) = @_; say "Subprocess error: $err" and return if $err; say "I $results[0] $results[1]!"; } );
timer
my $id = Mojo::IOLoop->timer(3 => sub {...}); my $id = $loop->timer(0 => sub {...}); my $id = $loop->timer(0.25 => sub {...});
新しいタイマーを生成します。指定した秒数が経過後にコールバックは実行されます。
# 5秒のうちに処理を実行する Mojo::IOLoop->timer(5 => sub { my $loop = shift; ... });
デバッグ
より詳細な情報をSTDERR
に出力するために、MOJO_IOLOOP_DEBUG
環境変数を設定できます。
MOJO_IOLOOP_DEBUG=1
参考
Mojolicious, Mojolicious::Guides, http://mojolicio.us.
(Mojolicious 8.12を反映。2019年5月22日更新)