AnyEvent::HTTP の同時接続数制限を理解する修行、その1

AnyEvent::HTTP では、同一ホストに対して同時に接続する数を制限している。セマフォを使うのではなく、単なるカウンタを使っていることらしいことが、AnyEvent + Coro での並行ダウンローダの習作 その 2 - 昨日知ったことに引用したコードからわかった。しかし、その意味を理解することはできなかった。このほど、http://mt.endeworks.jp/d-6/2009/11/anyevent-fifo-queue.htmlを発見した。FIFO からタスクを取り出して実行する、というコードが紹介されている。同じく AnyEvent::HTTP の実装を参考にしたとのことだが、エッセンスが抜き出してあって、元々の AnyEvent::HTTP のコードを読むよりも理解しやすい気がした。ので、自分なりに書いてみた。

まずスクリプトの後半。

my $ACTIVE = 0;    # 今実行されているタスクの数
my $MAX = 3;       # 同時に実行されるタスクの最大数
my @tasks = ();    # タスクキュー
push @tasks, mk_task $_ for 1..12;

use AnyEvent::Util;
sub drain_queue {
    while (@tasks && $ACTIVE < $MAX) {
        my $task = shift @tasks;
        $ACTIVE++;
        $task->(AnyEvent::Util::guard{
            $ACTIVE--;
            drain_queue();
        });
    }
}

drain_queue;
$cv->recv;
__END__

mk_task で作ったタスクをキュー @tasks に格納しておく。drain_queue は「同時実行数を考慮しつつ、キューからタスクを取り出して実行」する関数。肝は「破棄されるとき、現在のアクティブなタスクをデクリメントし、drain_queue を実行する」という guard オブジェクトをタスクの実行時に渡すところ。タスクの側では、この guard オブジェクトを、自分の処理を実行している間は保持し、処理が終わったら破棄するようにする。これによって、同時実行の数が制限され、また、次のタスクがキューから取り出されるようになる。

実際にタスクをキューイングしてみる。スクリプトの前半。

use strict;
use warnings;
use utf8;
use Encode;
use AnyEvent;

# wait と出力タイミング                                                         
# 1------7------12--                                                            
# 2---5---8--10----                                                             
# 3--4--6--9--11--                                                              
#   || |||| || | |||                                                            
#=> 32 4156 89 711|12                                                           
#                 10                                                            
my %wait = do { my $i = 1; map { $i++ => $_ } qw/7 4 3 3 4 3 7 3 3 5 3 3/ };

my $cv = AnyEvent->condvar;

sub mk_task {
    my $m = shift;
    $cv->begin;
    sub {
        my $guard = shift;
        my $wait = $wait{$m};
        my $pid = open my $fh, "(sleep $wait; echo \$RANDOM)|" or die "$!";
        warn "start $m (wait = $wait sec.)\n";
        my $w; $w = AnyEvent->child( pid => $pid,
                                  cb => sub {
                                      chomp(my $line = <$fh>);
                                      my $msg = "$m : $line\n";
                                      warn encode_utf8($msg);
                                      undef $guard;
                                      undef $w;
                                      close $fh;
                                      $cv->end;
                                  });
        };
}

タスクは指定した秒数間 sleep コマンドで wait して、ランダムな数値を返す、というサブシェルの出力をパイプで拾うものにした。「wait と出力タイミング」のコメントに書いたような出力順になることが期待される。
実行結果。

start 1 (wait = 7 sec.)
start 2 (wait = 4 sec.)
start 3 (wait = 3 sec.)
3 : 30421
start 4 (wait = 3 sec.)
2 : 31918
start 5 (wait = 4 sec.)
4 : 16822
start 6 (wait = 3 sec.)
1 : 18717
start 7 (wait = 7 sec.)
5 : 29861
start 8 (wait = 3 sec.)
6 : 14935
start 9 (wait = 3 sec.)
8 : 26038
start 10 (wait = 5 sec.)
9 : 10095
start 11 (wait = 3 sec.)
7 : 16154
start 12 (wait = 3 sec.)
11 : 18772
10 : 3984
12 : 14018

期待どおりになった。

dequeue & execute はこの仕組みでできる。enqueue は単に push @tasks, mk_task すればいいだろう (後でやってみよう (一旦キューが空になって実行されているタスクもなくなっちゃうと drain_queue をキックしてやらないといけないな。push するたびに drain_queue をキックすればいいのかな))。AnyEvent::HTTP の _slot_schedule 関数は drain_queue 関数でやっていることのほかに、ホストごとに「アクティブなタスクの数のカウンタとタスクキューの対」のオブジェクト(無名配列で実装されている)を持ち、それが必要なくなったときには delete する、という処理も含まれている。

ところで

元々、スクリプトの前半、mk_task の中身は「cal コマンドを実行して 2010 年のある月が何曜日で始まるか」を調べるものだった。

use strict;
use warnings;
use utf8;
use Encode;
use AnyEvent;

# wait と出力タイミング                                                         
# 1------7------12--                                                            
# 2---5---8--10----                                                             
# 3--4--6--9--11--                                                              
#   || |||| || | |||                                                            
#=> 32 4156 89 711|12                                                           
#                 10                                                            
my %wait = do { my $i = 1; map { $i++ => $_ } qw/7 4 3 3 4 3 7 3 3 5 3 3/ };
my %week = do { my $i = 0; map { 1+3*$i++ => $_ } qw/日 月 火 水 木 金 土/ };

my $cv = AnyEvent->condvar;

sub mk_task {
    my $m = shift;
    $cv->begin;
    sub {
        my $guard = shift;
        my $wait = $wait{$m};
        my $pid = open my $fh, "(sleep $wait; cal $m 2010)|" or die "$!";
        warn "start $m (wait = $wait sec.)\n";
        my $w; $w = AnyEvent->io( fh => \$fh, pool => 'r',
                                  cb => sub {
                                      my $line;
                                      $line = <$fh>;
                                      $line = <$fh>;
                                      chomp($line = <$fh>);
                                      my $week = $week{index($line, '1')};
                                      my $msg = "$m 月は${week}曜日始まり\n";
                                      warn encode_utf8($msg);
                                      undef $guard;
                                      undef $w;
                                      close $fh;
                                      $cv->end;
                                  });
        };
}

しかし、期待した動作にはならなかった。指定した wait 後にコールバックが実行されるのだが、$line = <$fh> で読み込むまでにタイムラグが発生している模様。よくわからない。それに 2 回に 1 回くらいの割合でアボートする。

start 1 (wait = 7 sec.)
start 2 (wait = 4 sec.)
start 3 (wait = 3 sec.)
3 月は月曜日始まり
start 4 (wait = 3 sec.)
2 月は月曜日始まり
start 5 (wait = 4 sec.)
1 月は金曜日始まり
start 6 (wait = 3 sec.)
6 月は火曜日始まり
start 7 (wait = 7 sec.)
5 月は土曜日始まり
start 8 (wait = 3 sec.)
4 月は木曜日始まり
start 9 (wait = 3 sec.)
9 月は水曜日始まり
start 10 (wait = 5 sec.)
8 月は日曜日始まり
start 11 (wait = 3 sec.)
7 月は木曜日始まり
start 12 (wait = 3 sec.)
12 月は水曜日始まり
11 月は月曜日始まり
10 月は金曜日始まり