AnyEvent::HTTP の同時接続数制限を理解する修行、その1
AnyEvent::HTTP では、同一ホストに対して同時に接続する数を制限している。セマフォを使うのではなく、単なるカウンタを使っていることらしいことが、AnyEvent + Coro での並行ダウンローダの習作 その 2 - 昨日知ったことに引用したコードからわかった。しかし、その意味を理解することはできなかった。このほど、http://mt.endeworks.jp/d-6/2009/11/anyevent-fifo-queue.htmlを発見した。FIFO からタスクを取り出して実行する、というコードが紹介されている。同じく AnyEvent::HTTP の実装を参考にしたとのことだが、エッセンスが抜き出してあって、元々の AnyEvent::HTTP のコードを読むよりも理解しやすい気がした。ので、自分なりに書いてみた。
まずスクリプトの後半。
my $ACTIVE = 0; # 今実行されているタスクの数 my $MAX = 3; # 同時に実行されるタスクの最大数 my @tasks = (); # タスクキュー push @tasks, mk_task $_ for 1..12; use AnyEvent::Util; sub drain_queue { while (@tasks && $ACTIVE < $MAX) { my $task = shift @tasks; $ACTIVE++; $task->(AnyEvent::Util::guard{ $ACTIVE--; drain_queue(); }); } } drain_queue; $cv->recv; __END__
mk_task で作ったタスクをキュー @tasks に格納しておく。drain_queue は「同時実行数を考慮しつつ、キューからタスクを取り出して実行」する関数。肝は「破棄されるとき、現在のアクティブなタスクをデクリメントし、drain_queue を実行する」という guard オブジェクトをタスクの実行時に渡すところ。タスクの側では、この guard オブジェクトを、自分の処理を実行している間は保持し、処理が終わったら破棄するようにする。これによって、同時実行の数が制限され、また、次のタスクがキューから取り出されるようになる。
実際にタスクをキューイングしてみる。スクリプトの前半。
use strict; use warnings; use utf8; use Encode; use AnyEvent; # wait と出力タイミング # 1------7------12-- # 2---5---8--10---- # 3--4--6--9--11-- # || |||| || | ||| #=> 32 4156 89 711|12 # 10 my %wait = do { my $i = 1; map { $i++ => $_ } qw/7 4 3 3 4 3 7 3 3 5 3 3/ }; my $cv = AnyEvent->condvar; sub mk_task { my $m = shift; $cv->begin; sub { my $guard = shift; my $wait = $wait{$m}; my $pid = open my $fh, "(sleep $wait; echo \$RANDOM)|" or die "$!"; warn "start $m (wait = $wait sec.)\n"; my $w; $w = AnyEvent->child( pid => $pid, cb => sub { chomp(my $line = <$fh>); my $msg = "$m : $line\n"; warn encode_utf8($msg); undef $guard; undef $w; close $fh; $cv->end; }); }; }
タスクは指定した秒数間 sleep コマンドで wait して、ランダムな数値を返す、というサブシェルの出力をパイプで拾うものにした。「wait と出力タイミング」のコメントに書いたような出力順になることが期待される。
実行結果。
start 1 (wait = 7 sec.) start 2 (wait = 4 sec.) start 3 (wait = 3 sec.) 3 : 30421 start 4 (wait = 3 sec.) 2 : 31918 start 5 (wait = 4 sec.) 4 : 16822 start 6 (wait = 3 sec.) 1 : 18717 start 7 (wait = 7 sec.) 5 : 29861 start 8 (wait = 3 sec.) 6 : 14935 start 9 (wait = 3 sec.) 8 : 26038 start 10 (wait = 5 sec.) 9 : 10095 start 11 (wait = 3 sec.) 7 : 16154 start 12 (wait = 3 sec.) 11 : 18772 10 : 3984 12 : 14018
期待どおりになった。
dequeue & execute はこの仕組みでできる。enqueue は単に push @tasks, mk_task すればいいだろう (後でやってみよう (一旦キューが空になって実行されているタスクもなくなっちゃうと drain_queue をキックしてやらないといけないな。push するたびに drain_queue をキックすればいいのかな))。AnyEvent::HTTP の _slot_schedule 関数は drain_queue 関数でやっていることのほかに、ホストごとに「アクティブなタスクの数のカウンタとタスクキューの対」のオブジェクト(無名配列で実装されている)を持ち、それが必要なくなったときには delete する、という処理も含まれている。
ところで
元々、スクリプトの前半、mk_task の中身は「cal コマンドを実行して 2010 年のある月が何曜日で始まるか」を調べるものだった。
use strict; use warnings; use utf8; use Encode; use AnyEvent; # wait と出力タイミング # 1------7------12-- # 2---5---8--10---- # 3--4--6--9--11-- # || |||| || | ||| #=> 32 4156 89 711|12 # 10 my %wait = do { my $i = 1; map { $i++ => $_ } qw/7 4 3 3 4 3 7 3 3 5 3 3/ }; my %week = do { my $i = 0; map { 1+3*$i++ => $_ } qw/日 月 火 水 木 金 土/ }; my $cv = AnyEvent->condvar; sub mk_task { my $m = shift; $cv->begin; sub { my $guard = shift; my $wait = $wait{$m}; my $pid = open my $fh, "(sleep $wait; cal $m 2010)|" or die "$!"; warn "start $m (wait = $wait sec.)\n"; my $w; $w = AnyEvent->io( fh => \$fh, pool => 'r', cb => sub { my $line; $line = <$fh>; $line = <$fh>; chomp($line = <$fh>); my $week = $week{index($line, '1')}; my $msg = "$m 月は${week}曜日始まり\n"; warn encode_utf8($msg); undef $guard; undef $w; close $fh; $cv->end; }); }; }
しかし、期待した動作にはならなかった。指定した wait 後にコールバックが実行されるのだが、$line = <$fh> で読み込むまでにタイムラグが発生している模様。よくわからない。それに 2 回に 1 回くらいの割合でアボートする。
start 1 (wait = 7 sec.) start 2 (wait = 4 sec.) start 3 (wait = 3 sec.) 3 月は月曜日始まり start 4 (wait = 3 sec.) 2 月は月曜日始まり start 5 (wait = 4 sec.) 1 月は金曜日始まり start 6 (wait = 3 sec.) 6 月は火曜日始まり start 7 (wait = 7 sec.) 5 月は土曜日始まり start 8 (wait = 3 sec.) 4 月は木曜日始まり start 9 (wait = 3 sec.) 9 月は水曜日始まり start 10 (wait = 5 sec.) 8 月は日曜日始まり start 11 (wait = 3 sec.) 7 月は木曜日始まり start 12 (wait = 3 sec.) 12 月は水曜日始まり 11 月は月曜日始まり 10 月は金曜日始まり