NAME
Async::Queue - control concurrency of asynchronous tasks
VERSION
Version 0.021
SYNOPSIS
use Async::Queue;
## create a queue object with concurrency 2
my $q = Async::Queue->new(
    concurrency => 2, worker => sub {
        my ($task, $callback) = @_;
        print "hello $task->{name}\n";
        $callback->();
    }
);
## assign a callback
$q->drain(sub {
    print "all items have been processed\n";
});
## add some items to the queue
$q->push({name => 'foo'}, sub {
    print "finished processing foo\n";
});
$q->push({name => 'bar'}, sub {
    print "finished processing bar\n";
});
DESCRIPTION
Async::Queue is used to process tasks with the specified concurrency. The tasks given to Async::Queue are processed in parallel with its worker routine up to the concurrency level. If more tasks arrive at the Async::Queue object, those tasks will wait for currently running tasks to finish. Whenever a running task finishes, the next waiting task starts; waiting tasks are started in first-in-first-out (FIFO) order.
In short, Async::Queue is a Perl port of the queue object of async.js (https://github.com/caolan/async#queue).
The basic usage of Async::Queue is as follows:
- Create an Async::Queue object with the worker attribute and the optional concurrency attribute. worker is a subroutine reference that processes tasks. concurrency is the concurrency level.
- Push tasks to the Async::Queue object via the push() method, with optional callback functions.
- The tasks will be processed in FIFO order by the worker subroutine. When a task is finished, the callback function, if any, is called with the results.
CLASS METHODS
$queue = Async::Queue->new(%attributes);
Creates an Async::Queue object.
It takes named arguments to initialize attributes of the Async::Queue object. See "ATTRIBUTES" for the list of the attributes.
The worker attribute is mandatory.
ATTRIBUTES
An Async::Queue object has the following attributes.
You can initialize the attributes in the new() method. You can get and set the attributes of an Async::Queue object via their accessor methods (See "OBJECT METHODS").
Note that you cannot set any attribute listed here while there is a task running in the Async::Queue object. This is because changing the attributes during task execution is very confusing and leads to unpredictable behavior. So if you want to set an attribute, make sure there is no task running (the running() method can be useful).
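As a minimal sketch of that precaution, the snippet below checks running() before changing concurrency. It assumes a synchronous worker (one that calls $callback before returning), so the pushed task has already finished by the time push() returns. The task string is only a placeholder.

```perl
use strict;
use warnings;
use Async::Queue;

# A synchronous worker: it calls $callback before returning.
my $q = Async::Queue->new(worker => sub {
    my ($task, $callback) = @_;
    $callback->();
});

$q->push('first task');

# Only touch attributes while the queue is idle.
if ($q->running == 0) {
    $q->concurrency(3);
}
print "concurrency is now ", $q->concurrency, "\n";
```

With an asynchronous worker the same check would have to be made at a point where all tasks are known to have finished.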
worker (CODE($task, $callback, $queue), mandatory)
The worker attribute is a subroutine reference that processes a task. It must not be undef.
The worker subroutine reference takes three arguments: $task, $callback and $queue.
$task is the task object the worker is supposed to process.
$callback is a callback subroutine reference that the worker must call when the task is finished. $callback can take any list of arguments, which will be passed to the $finish_callback given to the push() method (See "OBJECT METHODS").
$queue is the Async::Queue object that holds the worker.
So the worker attribute is something like:
my $q = Async::Queue->new(worker => sub {
    my ($task, $callback, $queue) = @_;
    my @results = some_processing($task);
    $callback->(@results);
});
You can do asynchronous processing by deferring the call to $callback:
my $q = Async::Queue->new(worker => sub {
    my ($task, $callback, $queue) = @_;
    some_async_processing($task, on_finish => sub {
        my @results = @_;
        $callback->(@results);
    });
});
concurrency (INT, optional, default = 1)
The concurrency attribute is the maximum number of tasks that can be processed at the same time. It must be an integer.
If concurrency is set to 0 or any negative number, the concurrency level becomes infinite, i.e. pushed tasks are immediately processed no matter how many are already running.
If concurrency is set to undef (or omitted in the new() method), it will be 1.
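The effect of these settings can be observed with a small sketch. It assumes a worker that merely stores each $callback in a hypothetical @pending array instead of calling it, so started tasks stay "running" until the script finishes them by hand. One queue uses the default concurrency, the other uses 0 (unlimited).

```perl
use strict;
use warnings;
use Async::Queue;

# Callbacks of tasks that have been started but not yet finished.
my @pending;
my $worker = sub {
    my ($task, $callback) = @_;
    push @pending, $callback;    # defer: the task stays "running"
};

my $serial    = Async::Queue->new(worker => $worker);                    # default concurrency
my $unlimited = Async::Queue->new(worker => $worker, concurrency => 0);  # no limit

for my $q ($serial, $unlimited) {
    $q->push($_) for qw(a b c);
}

# Snapshot (running, waiting) for each queue while the tasks are pending.
my @serial_state    = ($serial->running,    $serial->waiting);
my @unlimited_state = ($unlimited->running, $unlimited->waiting);
printf "serial:    %d running, %d waiting\n", @serial_state;
printf "unlimited: %d running, %d waiting\n", @unlimited_state;

# Finish every started task. Finishing one may start a waiting task,
# which appends its own callback to @pending.
while (my $callback = shift @pending) {
    $callback->();
}
```

The unlimited queue should report no waiting tasks, while the default queue keeps all but one task waiting.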
saturated (CODE($queue), optional, default = undef)
The saturated attribute is a subroutine reference that is called when the number of running tasks hits concurrency. This means further tasks will wait in the queue.
The saturated subroutine reference takes one argument ($queue), which is the Async::Queue object holding it.
empty (CODE($queue), optional, default = undef)
The empty attribute is a subroutine reference that is called when the last task from the queue is given to the worker. This means there is no task waiting in the Async::Queue object.
If the Async::Queue object is not saturated, the empty subroutine is called every time a task is pushed. This is because every pushed task goes into the queue first even if the Async::Queue object can process the task immediately.
The empty subroutine reference takes one argument ($queue), which is the Async::Queue object holding it.
drain (CODE($queue), optional, default = undef)
The drain attribute is a subroutine reference that is called when the last task in the Async::Queue object has finished. This means there is no task running or waiting in the Async::Queue object.
The drain subroutine reference takes one argument ($queue), which is the Async::Queue object holding it.
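To see when the saturated, empty and drain subroutines fire, the following sketch records their names in the order they are called. It is only an illustration: the @events and @pending arrays are hypothetical helpers, and the worker defers every $callback so that the script controls when each task finishes.

```perl
use strict;
use warnings;
use Async::Queue;

my @events;     # names of the hooks, in the order they fired
my @pending;    # deferred $callback references of started tasks

my $q = Async::Queue->new(
    concurrency => 2,
    worker      => sub {
        my ($task, $callback) = @_;
        push @pending, $callback;
    },
    saturated => sub { push @events, 'saturated' },
    empty     => sub { push @events, 'empty' },
    drain     => sub { push @events, 'drain' },
);

# With concurrency 2, task 'c' has to wait until another task finishes.
$q->push($_) for qw(a b c);

# Finish the started tasks one by one, including ones started later.
while (my $callback = shift @pending) {
    $callback->();
}

print join(', ', @events), "\n";
```

The drain entry should come last, because it is called only once no task is running or waiting.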
OBJECT METHODS
$queue->push($task, [$finish_callback->(@results)] );
Pushes a task into the Async::Queue object. The argument $task is mandatory, while $finish_callback is optional.
$task is a task that the worker will process. It will be given as the $task argument to the worker subroutine.
$finish_callback is a subroutine reference that will be called when the worker finishes processing the task. The arguments for $finish_callback (@results) are the arguments for the $callback subroutine reference in the worker subroutine.
The push() method returns the Async::Queue object.
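A short sketch of how results travel from the worker to $finish_callback, and of chaining push() calls through its return value. The squaring worker and the %square_of hash are made up for illustration.

```perl
use strict;
use warnings;
use Async::Queue;

my $q = Async::Queue->new(worker => sub {
    my ($n, $callback) = @_;
    $callback->($n, $n * $n);    # these become @results of the finish callback
});

my %square_of;
my $record = sub {
    my ($n, $square) = @_;
    $square_of{$n} = $square;
};

# push() returns the queue, so calls can be chained.
# The last task is pushed without a finish callback.
$q->push(2, $record)->push(3, $record)->push(4);

print "$_ squared is $square_of{$_}\n" for sort keys %square_of;
```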
$running_num = $queue->running();
Returns the number of currently running tasks in the Async::Queue object.
$waiting_num = $queue->waiting();
Returns the number of waiting tasks in the Async::Queue object.
$waiting_num = $queue->length();
Alias for the waiting() method. It returns the number of waiting tasks in the Async::Queue object.
$worker = $queue->worker([$new_worker]);
Accessor for the worker attribute.
$concurrency = $queue->concurrency([$new_concurrency]);
Accessor for the concurrency attribute.
$saturated = $queue->saturated([$new_saturated]);
Accessor for the saturated attribute.
$empty = $queue->empty([$new_empty]);
Accessor for the empty attribute.
$drain = $queue->drain([$new_drain]);
Accessor for the drain attribute.
EXAMPLE
Concurrent HTTP downloader
use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;
use Async::Queue;
my $q = Async::Queue->new(concurrency => 3, worker => sub {
    my ($url, $callback) = @_;
    print STDERR "Start $url\n";
    http_get $url, sub {
        my ($data, $headers) = @_;
        print STDERR "End $url\n";
        $callback->($data);
    };
});
my @urls = (
    'http://www.debian.org/',
    'http://www.ubuntu.com/',
    'http://fedoraproject.org/',
    'http://www.opensuse.org/',
    'http://www.centos.org/',
    'http://www.slackware.com/',
    'http://www.gentoo.org/',
    'http://www.archlinux.org/',
    'http://trisquel.info/',
);
my %results = ();
my $cv = AnyEvent->condvar;
foreach my $url (@urls) {
    $cv->begin();
    $q->push($url, sub {
        my ($data) = @_;
        $results{$url} = $data;
        $cv->end();
    });
}
$cv->recv;
foreach my $key (keys %results) {
    print STDERR "$key: " . length($results{$key}) . "bytes\n";
}
This example uses AnyEvent::HTTP to send HTTP GET requests for multiple URLs simultaneously. While simultaneous requests dramatically improve efficiency, they may overload the client host and/or the network.
This is where Async::Queue comes in handy. With Async::Queue you can control the concurrency level of the HTTP sessions (in this case, up to three).
SEE ALSO
- AnyEvent::FIFO
The goal of AnyEvent::FIFO is the same as that of Async::Queue: to control the concurrency level of asynchronous tasks. The big difference is that AnyEvent::FIFO is a queue of subroutines while Async::Queue is a queue of tasks (data). In Async::Queue, the worker subroutine is registered with the object in advance. In AnyEvent::FIFO, it is workers that are pushed to the queue.
You can emulate AnyEvent::FIFO with Async::Queue by pushing subroutine references to it as tasks.
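One way to sketch that emulation, assuming each pushed subroutine accepts a single "done" callback and calls it when its work is complete (the $job and $done names and the @log array are illustrative only):

```perl
use strict;
use warnings;
use Async::Queue;

# The worker simply runs each task, handing it the callback to call when done.
my $q = Async::Queue->new(concurrency => 1, worker => sub {
    my ($job, $done) = @_;
    $job->($done);
});

my @log;
$q->push(sub { my ($done) = @_; push @log, 'first';  $done->() });
$q->push(sub { my ($done) = @_; push @log, 'second'; $done->() });

print join(' ', @log), "\n";
```

Each job here finishes synchronously. A job could equally keep $done and call it later from an event-loop callback.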
AUTHOR
Toshio Ito, <debug.ito at gmail.com>
REPOSITORY
https://github.com/debug-ito/Async-Queue
BUGS
Please report any bugs or feature requests to bug-async-queue at rt.cpan.org, or through the web interface at http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Async-Queue. I will be notified, and then you'll automatically be notified of progress on your bug as I make changes.
SUPPORT
You can find documentation for this module with the perldoc command.
perldoc Async::Queue
You can also look for information at:
RT: CPAN's request tracker (report bugs here)
AnnoCPAN: Annotated CPAN documentation
CPAN Ratings
Search CPAN
LICENSE AND COPYRIGHT
Copyright 2012 Toshio Ito.
This program is free software; you can redistribute it and/or modify it under the terms of either: the GNU General Public License as published by the Free Software Foundation; or the Artistic License.
See http://dev.perl.org/licenses/ for more information.