名前

Mojo::IOLoop - 最小限のイベントループ

使い方

use Mojo::IOLoop;

# ポート3000でリッスン
Mojo::IOLoop->server({port => 3000} => sub {
  my ($loop, $stream) = @_;

  $stream->on(read => sub {
    my ($stream, $bytes) = @_;

    # 入力の処理
    say $bytes;

    # データを取得して、書き込む時
    $stream->write('HTTP/1.1 200 OK');
  });
});

# ポート3000に接続
my $id = Mojo::IOLoop->client({port => 3000} => sub {
  my ($loop, $err, $stream) = @_;

  $stream->on(read => sub {
    my ($stream, $bytes) = @_;

    # 入力の処理
    say "Input: $bytes";
  });

  # リクエストを書き込み
  $stream->write("GET / HTTP/1.1\x0d\x0a\x0d\x0a");
});

# タイマーの追加
Mojo::IOLoop->timer(5 => sub {
  my $loop = shift;
  $loop->remove($id);
});

# 必要であればループを開始
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

説明

Mojo::IOLoopMojo::Reactorをベースとした、最小限のイベントループです。 堅固でスケーラブルな非同期TPCクライアントとサーバーを構築するために 必要とされる完全な最小限の機能を備えています。

オペレーティングシステムに依存して、デフォルトのプロセスあたりとシステムワイドのファイルディスクリプタの制限が しばしばとても低く、よいスケーラビリティのために調整することが必要になります。 LIBEV_FLAGS環境変数は、もっともよいEVバックエンドが選択されるようにすべきです。 通常のデフォルトはあまりスケーラブルではないselectです。

LIBEV_FLAGS=1   # select
LIBEV_FLAGS=2   # poll
LIBEV_FLAGS=4   # epoll (Linux)
LIBEV_FLAGS=8   # kqueue (*BSD, OS X)

イベントループはTime::HiResを通して、モノトニック時刻が利用可能であれば、 時間ジャンプに対して回復力があるでしょう。

テストサーバーをできるだけ簡単に記述するためにTLS証明書とキーも組み込まれています。 利便性のために、PIPEシグナルがMojo::IOLoopを読み込んだときにIGNORE に設定されることに注意してください。

よりよいスケーラビリティ(epoll, kqueue)とTLSサポートと同様にIPv6の提供のためには、 オプショナルなモジュールのEV (4.0+)、Net::DNS::Native (0.15+) 、IO::Socket::Socks (0.64+)とIO::Socket::SSL (1.94+) が、インストールされていれば、自動的に利用されます。

個々の機能はMOJO_NO_NDNMOJO_NO_IPV6MOJO_NO_TLS環境変数で無効にすることもできます。

Mojolicious::Guides::CookbookREAL-TIME WEBの項目も見てください。

イベント

Mojo::IOLoopMojo::EventEmitterからすべてのイベント継承し、 次の新しいものを発行します。

finish

$loop->on(finish => sub {
  my $loop = shift;
  ...
});

イベントループが、緩やかにシャットダウンを要求し、 すべての存在する接続が閉じられるのを待っているときに発行されます。

reset

$loop->on(reset => sub {
  my $loop = shift;
  ...
});

イベントループがリセットされたときに発行されます。これは、通常は、プロセスが共有できないリソースをクリーンアップするために、フォークされたときです。

属性

Mojo::IOLoopは次の属性を実装しています。

max_accepts

my $max = $loop->max_accepts;
$loop   = $loop->max_accepts(1000);

存在している接続を中断することなしに、緩やかにシャットダウンされる前に、 このループが受け入れることのできる接続の最大数。デフォルトは<0>です。 この値を0にするとこのループは新しいコネクションを無限に受け入れます。 複数のサーバープロセスの間での ロードバランシングのロードを改善するために、 この値の半分はランダムに引き算されることに注意してください。

max_connections

my $max = $loop->max_connections;
$loop   = $loop->max_connections(100);

新しくやってくる接続を受け入れることをやめる前に、 このループが処理することの可能な並列接続の最大数。 デフォルトは1000です。 この値を0にするとこのループはすべての新しい接続の受け入れを 停止し、存在する接続を中断することなしに、緩やかにシャットダウンを行うこと ができます。

reactor

my $reactor = $loop->reactor;
$loop       = $loop->reactor(Mojo::Reactor->new);

低レベルのイベント装置。通常はMojo::Reactor::PollあるいはMojo::Reactor::EVオブジェクト。これらは、デフォルトでMojo::Reactorerrorイベントを購読します。

# I/Oイベントのためにハンドルを監視
$loop->reactor->io($handle => sub {
  my ($reactor, $writable) = @_;
  say $writable ? 'Handle is writable' : 'Handle is readable';
});

# ハンドルが書き込み可能になったときのみ、監視するように変更
$loop->reactor->watch($handle, 0, 1);

# ハンドルを再び削除
$loop->reactor->remove($handle);

メソッド

Mojo::IOLoopMojo::Baseからすべてのメソッドを継承しており、 次の新しいメソッドを実装しています。

acceptor

my $server = Mojo::IOLoop->acceptor($id);
my $server = $loop->acceptor($id);
my $id     = $loop->acceptor(Mojo::IOLoop::Server->new);

IDを指定してMojo::IOLoop::Serverオブジェクトを取得する、 あるいは、オブジェクトをアクセプターに変換する。

client

my $id =
  Mojo::IOLoop->client(address => '127.0.0.1', port => 3000, sub {...});
my $id = $loop->client(address => '127.0.0.1', port => 3000, sub {...});
my $id = $loop->client({address => '127.0.0.1', port => 3000}, sub {...});

client_classでTCPコネクションをオープンします。 普通はMojo::IOLoop::Clientになります。 Mojo::IOLoop::Clientconnectメソッドと同じ引数を受け取ります。

# ポート3000番で127.0.0.1に接続する
Mojo::IOLoop->client({port => 3000} => sub {
  my ($loop, $err, $stream) = @_;
  ...
});

delay

my $delay = Mojo::IOLoop->delay;
my $delay = $loop->delay;
my $delay = $loop->delay(sub {...});
my $delay = $loop->delay(sub {...}, sub {...});

イベントの流れを処理し、コントロールするためのMojo::IOLoop::Delayオブジェクトを取得します。 ひとつのコールバックは、終わったイベントへのサブスクライバーとして扱われ、 複数のコールバックは、このステップのチェーンです。

# プロミスによる連続渡しスタイルのAPIでラッピング
my $ua = Mojo::UserAgent->new;
sub get {
  my $promise = Mojo::IOLoop->delay;
  $ua->get(@_ => sub {
    my ($ua, $tx) = @_;
    my $err = $tx->error;
    if   (!$err || $err->{code}) { $promise->resolve($tx) }
    else                         { $promise->reject($err->{message}) }
  });
  return $promise;
}
my $mojo = get('https://mojolicious.org');
my $cpan = get('https://metacpan.org');
Mojo::Promise->race($mojo, $cpan)->then(sub { say shift->req->url })->wait;

# 複数のノンブロックオペレーションを同期
my $delay = Mojo::IOLoop->delay(sub { say 'BOOM!' });
for my $i (1 .. 10) {
  my $end = $delay->begin;
  Mojo::IOLoop->timer($i => sub {
    say 10 - $i;
    $end->();
  });
}
$delay->wait;
 
# 複数のイベントを直列化
Mojo::IOLoop->delay(
 
  # 最初のステップ (単純なタイマー)
  sub {
    my $delay = shift;
    Mojo::IOLoop->timer(2 => $delay->begin);
    say 'Second step in 2 seconds.';
  },
 
  # 二回目のステップ (並列タイマー)
  sub {
    my $delay = shift;
    Mojo::IOLoop->timer(1 => $delay->begin);
    Mojo::IOLoop->timer(3 => $delay->begin);
    say 'Third step in 3 seconds.';
  },
 
  # 三回目のステップ (終わり)
  sub { say 'And done after 5 seconds total.' }
)->wait;
 
# すべてのステップで、例外を処理
Mojo::IOLoop->delay(
  sub {
    my $delay = shift;
    die 'Intentional error!';
  },
  sub {
    my ($delay, @args) = @_;
    say 'Never actually reached.';
  }
)->catch(sub {
  my ($delay, $err) = @_;
  say "Something went wrong: $err";
})->wait;

is_running

my $bool = Mojo::IOLoop->is_running;
my $bool = $loop->is_running;

ループが実行されているかどうかをチェックします。

next_tick

my $undef = Mojo::IOLoop->next_tick(sub {...});
my $undef = $loop->next_tick(sub {...});

できるだけすぐにコールバックを呼び出します。 しかし、リターンする前ではありません。 いつもundefを返します。

# 次のリアクターのチックに処理を実行
Mojo::IOLoop->next_tick(sub {
  my $loop = shift;
  ...
});

one_tick

Mojo::IOLoop->one_tick;
$loop->one_tick;

ひとつのイベントが起こるまで、あるいはもやはイベントが監視されなくなるまで、リアクターを実行します。 このメソッドはリアクターに戻るので、注意して利用する必要があります。

# 0.5秒以上はブロックしない。
my $id = Mojo::IOLoop->timer(0.5 => sub {});
Mojo::IOLoop->one_tick;
Mojo::IOLoop->remove($id);

recurring

my $id = Mojo::IOLoop->recurring(3 => sub {...});
my $id = $loop->recurring(0 => sub {...});
my $id = $loop->recurring(0.25 => sub {...});

すべての装置の1目盛ごとに実行されるコールバック。 これはたとえば複数の装置を順番に実行することが可能になります。

# 5秒ごとに処理を実行する
Mojo::IOLoop->recurring(5 => sub {
  my $loop = shift;
  ...
});

remove

Mojo::IOLoop->remove($id);
$loop->remove($id);

IDを使って、切断します。 書き込みバッファにすべてのデータの書き込みを終えることによって、 緩やかにコネクションが切断されます。

reset

Mojo::IOLoop->reset;
$loop->reset;

すべてを取り除き、イベントループを停止します。

server

my $id = Mojo::IOLoop->server(port => 3000, sub {...});
my $id = $loop->server(port => 3000, sub {...});
my $id = $loop->server({port => 3000}, sub {...});

server_classでTCPコネクションを受け入れます。 通常はMojo::IOLoop::Serverになります。 Mojo::IOLoop::Serverlistenメソッドと同じ引数を受け取ります。

# ランダムポートでリッスン
# Listen on random port
my $id = Mojo::IOLoop->server({address => '127.0.0.1'} => sub {
  my ($loop, $stream, $id) = @_;
  ...
});
my $port = Mojo::IOLoop->acceptor($id)->port;

singleton

my $loop = Mojo::IOLoop->singleton;

グローバルなループオブジェクト。処理の内側のあらゆる場所から、 ひとつの共有されるループインスタンスにアクセスするために利用されます。

# 多くのメソッドはショットカットを行うことを許可します。
Mojo::IOLoop->timer(2 => sub { Mojo::IOLoop->stop });
Mojo::IOLoop->start;

# アクティブなタイマーの再起動
my $id = Mojo::IOLoop->timer(3 => sub { say 'Timeout!' });
Mojo::IOLoop->singleton->reactor->again($id);

# アクティブタイマーをリスタート
my $id = Mojo::IOLoop->timer(3 => sub { say 'Timeout!' });
Mojo::IOLoop->singleton->reactor->again($id);

# ファイルディスクリプタをハンドルに変換し、読み込み可能になるかを監視する
my $handle = IO::Handle->new_from_fd($fd, 'r');
Mojo::IOLoop->singleton->reactor->io($handle => sub {
  my ($reactor, $writable) = @_;
  say $writable ? 'Handle is writable' : 'Handle is readable';
})->watch($handle, 1, 0);

start

Mojo::IOLoop->start;
$loop->start;

ループをスタートします。これはstopが呼び出されるまでブロックされます。 いくつかのリアクターは、監視されるイベントがなくなった場合に、 自動的に停止されることに注意してください。

# すでに開始していない場合だけ開始する
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

stop

Mojo::IOLoop->stop;
$loop->stop;

ループを即座に停止します。 これはすべての存在する接続を中断することはなく、 ループはstartが再び実行されれば、再開始されます。

stop_gracefully

Mojo::IOLoop->stop_gracefully;
$loop->stop_gracefully;

新しい接続の受け入れを停止し、 イベントループを停止する前に、 すべての存在する接続が閉じられるのを待ちます。

stream

my $stream = Mojo::IOLoop->stream($id);
my $stream = $loop->stream($id);
my $id     = $loop->stream(Mojo::IOLoop::Stream->new);

IDを指定してMojo::IOLoop::Streamオブジェクトを取得するか、 オブジェクトをコネクションに変換します。

# 接続のためのインアクティビティタイムアウトを300秒に増やす
Mojo::IOLoop->stream($id)->timeout(300);

subprocess

my $subprocess = Mojo::IOLoop->subprocess(sub {...}, sub {...});
my $subprocess = $loop->subprocess;
my $subprocess = $loop->subprocess(sub {...}, sub {...});

計算が高価なオペレーションを実行するために、イベントループのブロックなしで、Mojo::IOLoop::Subprocessオブジェクトを構築します。 コールバックはMojo::IOLoop::Subprocessの"run"メソッドに渡されます。

# 5秒ブロックするイベントループをブロックするオペレーション
Mojo::IOLoop->subprocess(
  sub {
    my $subprocess = shift;
    sleep 5;
    return '♥', 'Mojolicious';
  },
  sub {
    my ($subprocess, $err, @results) = @_;
    say "Subprocess error: $err" and return if $err;
    say "I $results[0] $results[1]!";
  }
);

timer

my $id = Mojo::IOLoop->timer(3 => sub {...});
my $id = $loop->timer(0 => sub {...});
my $id = $loop->timer(0.25 => sub {...});

新しいタイマーを生成します。指定した秒数が経過後にコールバックは実行されます。

# 5秒のうちに処理を実行する
Mojo::IOLoop->timer(5 => sub {
  my $loop = shift;
  ...
});

デバッグ

より詳細な情報をSTDERRに出力するために、MOJO_IOLOOP_DEBUG環境変数を設定できます。

MOJO_IOLOOP_DEBUG=1

参考

Mojolicious, Mojolicious::Guides, http://mojolicio.us.

(Mojolicious 8.12を反映。2019年5月22日更新)

関連情報