NAME
IO::Concurrent - Concurrent I/O framework
SYNOPSIS
use IO::Concurrent;
use IO::Concurrent::Scenario;
my @handlers = map {
$_->blocking(0); # should be non-blocking mode
$_;
} ($fh1, $fh2, ...);
my %items_map;
IO::Concurrent->new(
handlers => \@handlers,
)->run(
IO::Concurrent::Scenario->new->wait_for_writable(sub {
my $context = shift;
$context->handler->syswrite("stats items\r\n");
$context->next();
})->wait_for_readable(sub {
my $context = shift;
my $items_for_target = ($items_map{$context->handler->fileno} ||= {});
while (my $line = $context->handler->getline()) {
$line =~ s/\r\n$//; # chomp
if ($line eq 'END') {
$context->next();
return;
} elsif ($line =~ /^STAT items:(\d*):number (\d*)/) {
$items_for_target->{$1} = $2;
next;
}
# ignore others
}
$context->abort('Assertion failuer: terminator payload "END" has not come, but it EOF received');
})->wait_for_writable(sub {
my $context = shift;
my $items_for_target = $items_map{$context->handler->fileno};
my ($target_bucket) = keys %$items_for_target; # fetch one bucket id
unless ($target_bucket) {
$context->complete();
return;
}
my $bucket_items = delete $items_for_target->{$target_bucket};
$context->handler->syswrite("stats cachedump $target_bucket $bucket_items\r\n");
$context->next();
})->wait_for_readable(sub {
my $context = shift;
my $items_for_target = $items_map{$context->handler->fileno};
while (my $line = $context->handler->getline()) {
$line =~ s/\r\n$//; # chomp
if ($line eq 'END') {
my $has_more = keys %$items_for_target;
if (defined $has_more) {
$context->back();
} else {
$context->next();
}
return;
} elsif ($line =~ /^ITEM (\S+) \[.* (\d+) s\]/) {
my ($key, $expires) = ($1, $2);
print "key:$key\texpires:$expires\n";
next;
}
$context->abort("Invalid payload: $line");
}
$context->abort('Assertion failuer: terminator payload "END" has not come, but it EOF received');
})->wait_for_writable(sub {
my $context = shift;
$context->handler->blocking(1);
$context->handler->write("quit\r\n");
$context->handler->close();
})
);
my %items_map;
IO::Concurrent->new(
handlers => \@handlers,
)->run(
IO::Concurrent::Scenario->new->wait_for_writable(sub {
my $context = shift;
$context->handler->write("stats items\r\n");
$context->next();
})->wait_for_readable(sub {
my $context = shift;
my $items_for_target = ($items_map{$context->handler->fileno} ||= {});
while (my $line = $context->handler->getline()) {
$line =~ s/\r\n$//; # chomp
if ($line eq 'END') {
$context->next();
return;
} elsif ($line =~ /^STAT items:(\d*):number (\d*)/) {
$items_for_target->{$1} = $2;
}
$context->abort("Invalid payload: $line");
}
$context->abort('Assertion failuer: terminator payload "END" has not come, but it EOF received');
})->wait_for_writable(sub {
my $context = shift;
my $items_for_target = $items_map{$context->handler->fileno};
my ($target_bucket) = keys %$items_for_target; # fetch one bucket id
my $bucket_items = delete $items_for_target->{$target_bucket};
$context->handler->write("stats cachedump $target_bucket $bucket_items\r\n");
$context->next();
})->wait_for_readable(sub {
my $context = shift;
my $items_for_target = $items_map{$context->handler->fileno};
while (my $line = $context->handler->getline()) {
$line =~ s/\r\n$//; # chomp
if ($line eq 'END') {
my $has_more = keys %$items_for_target;
if ($has_more) {
$context->back();
} else {
$context->next();
}
return;
} elsif ($line =~ /^ITEM (\S+) \[.* (\d+) s\]/) {
my ($key, $expires) = @_;
print "key:$key\texpires:$expires\n";
}
$context->abort("Invalid payload: $line");
}
$context->abort('Assertion failuer: terminator payload "END" has not come, but it EOF received');
})->wait_for_writable(sub {
my $context = shift;
$context->handler->blocking(1);
$context->handler->write("quit\r\n");
$context->handler->close();
})
);
DESCRIPTION
IO::Concurrent is a concurrent I/O framework.
When implements concurrent non-blocking I/O using select(2)
or others (without AnyEvent/IO::AIO), it makes many complex procedural codes. This framework makes easy to write it by scenarios.
LICENSE
Copyright (C) karupanerura.
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
AUTHOR
karupanerura <karupa@cpan.org>