NAME

Async::Queue - control concurrency of asynchronous tasks

VERSION

Version 0.021

SYNOPSIS

use Async::Queue;

## create a queue object with concurrency 2
my $q = Async::Queue->new(
    concurrency => 2, worker => sub {
        my ($task, $callback) = @_;
        print "hello $task->{name}\n";
        $callback->();
    }
);

## assign a drain callback, called when all tasks have finished
$q->drain(sub {
    print "all items have been processed\n";
});

## add some items to the queue
$q->push({name => 'foo'}, sub {
    print "finished processing foo\n";
});
$q->push({name => 'bar'}, sub {
    print "finished processing bar\n";
});

DESCRIPTION

Async::Queue processes tasks with a specified concurrency. Tasks given to Async::Queue are processed in parallel by its worker routine, up to the concurrency level. If more tasks arrive at the Async::Queue object, they wait for currently running tasks to finish. When a running task finishes, the waiting task that was pushed earliest starts next, i.e. waiting tasks are started in first-in-first-out (FIFO) order.

In short, Async::Queue is a Perl port of the queue object of async.js (https://github.com/caolan/async#queue).

The basic usage of Async::Queue is as follows:

  1. Create an Async::Queue object with the worker attribute and, optionally, the concurrency attribute. worker is a subroutine reference that processes tasks. concurrency is the concurrency level.

  2. Push tasks to the Async::Queue object via push() method with optional callback functions.

    The tasks will be processed in FIFO order by the worker subroutine. When a task is finished, the callback function, if any, is called with the results.

CLASS METHODS

$queue = Async::Queue->new(%attributes);

Creates an Async::Queue object.

It takes named arguments to initialize attributes of the Async::Queue object. See "ATTRIBUTES" for the list of the attributes.

worker attribute is mandatory.

ATTRIBUTES

An Async::Queue object has the following attributes.

You can initialize the attributes in new() method. You can get and set the attributes of an Async::Queue object via their accessor methods (See "OBJECT METHODS").

Note that you cannot set any attribute listed here while there is a task running in the Async::Queue object. This is because changing the attributes during task execution is very confusing and leads to unpredictable behavior. So if you want to set an attribute, make sure there is no task running (running() method can be useful).
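
The restriction above can be respected with a small guard around the accessor. The following is a minimal sketch, not part of the module: set_concurrency_if_idle is a hypothetical helper, and the worker deliberately defers its callback so that the task stays in the running state for the demonstration.

```perl
use strict;
use warnings;
use Async::Queue;

my @pending;    # callbacks of tasks that have started but not finished yet

my $q = Async::Queue->new(
    concurrency => 1,
    worker      => sub {
        my ($task, $callback) = @_;
        push @pending, $callback;    # finish later, so the task stays running
    },
);

# Hypothetical helper: change the concurrency only when no task is running.
# Returns true if the attribute was changed.
sub set_concurrency_if_idle {
    my ($queue, $level) = @_;
    return 0 if $queue->running > 0;
    $queue->concurrency($level);
    return 1;
}

$q->push('job');
my $changed_while_busy = set_concurrency_if_idle($q, 3);    # task is running

(shift @pending)->();                                       # finish the task
my $changed_when_idle = set_concurrency_if_idle($q, 3);     # queue is idle

print "changed while busy: $changed_while_busy\n";
print "changed when idle:  $changed_when_idle\n";
```

The same pattern applies to the other attributes (worker, saturated, empty, drain).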

worker (CODE($task, $callback, $queue), mandatory)

worker attribute is a subroutine reference that processes a task. It must not be undef.

worker subroutine reference takes three arguments, $task, $callback and $queue.

$task is the task object the worker is supposed to process.

$callback is a callback subroutine reference that worker must call when the task is finished. $callback can take any list of arguments, which will be passed to the $finish_callback given to the push() method (See "OBJECT METHODS").

$queue is the Async::Queue object that holds the worker.

So the worker attribute is something like:

my $q = Async::Queue->new(worker => sub {
    my ($task, $callback, $queue) = @_;
    my @results = some_processing($task);
    $callback->(@results);
});

You can do asynchronous processing by deferring the call to $callback:

my $q = Async::Queue->new(worker => sub {
    my ($task, $callback, $queue) = @_;
    some_async_processing($task, on_finish => sub {
        my @results = @_;
        $callback->(@results);
    });
});

concurrency (INT, optional, default = 1)

concurrency attribute is the maximum number of tasks that can be processed at the same time. It must be an integer.

If concurrency is set to 0 or any negative number, the concurrency level becomes infinite, i.e. pushed tasks are immediately processed no matter how many are already running.

If concurrency is set to undef (or omitted in new() method), it will be 1.
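
As an illustration of these rules, the sketch below pushes five tasks into a queue with concurrency 0 and inspects the counters. The worker holds its callbacks in @pending (a name chosen for this example only) so that the tasks stay running while the counters are read.

```perl
use strict;
use warnings;
use Async::Queue;

my @pending;    # callbacks of tasks that are still running

my $worker = sub {
    my ($task, $callback) = @_;
    push @pending, $callback;    # defer completion
};

# concurrency omitted: the documented default is 1
my $default_q = Async::Queue->new(worker => $worker);
my $default_level = $default_q->concurrency;

# concurrency 0: no limit, every pushed task starts immediately
my $q = Async::Queue->new(concurrency => 0, worker => $worker);
$q->push($_) for 1 .. 5;

my $running_now = $q->running;
my $waiting_now = $q->waiting;
print "default concurrency: $default_level\n";
print "running=$running_now waiting=$waiting_now\n";

# finish all the tasks
$_->() for splice @pending;
```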

saturated (CODE($queue), optional, default = undef)

saturated attribute is a subroutine reference that is called when the number of running tasks reaches the concurrency level. This means further tasks will wait in the queue.

saturated subroutine reference takes one argument ($queue), which is the Async::Queue object holding it.

empty (CODE($queue), optional, default = undef)

empty attribute is a subroutine reference that is called when the last task from the queue is given to the worker. This means there is no task waiting in the Async::Queue object.

If the Async::Queue object is not saturated, empty subroutine is called every time a task is pushed. This is because every pushed task goes into the queue first even if the Async::Queue object can process the task immediately.

empty subroutine reference takes one argument ($queue), which is the Async::Queue object holding it.

drain (CODE($queue), optional, default = undef)

drain attribute is a subroutine reference that is called when the last task in the Async::Queue object has finished. This means there is no task running or waiting in the Async::Queue object.

drain subroutine reference takes one argument ($queue), which is the Async::Queue object holding it.
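
The three hooks (saturated, empty and drain) can be observed together by recording each call. This is a sketch under the assumption that the worker defers its callbacks; @events and @pending are names used only in this example. The exact interleaving of saturated and empty is not spelled out above, so the script prints the recorded order rather than assuming it.

```perl
use strict;
use warnings;
use Async::Queue;

my @events;     # names of the hooks, in the order they were called
my @pending;    # callbacks of tasks that are still running

my $q = Async::Queue->new(
    concurrency => 1,
    worker      => sub {
        my ($task, $callback) = @_;
        push @pending, $callback;    # defer completion
    },
    saturated => sub { push @events, 'saturated' },
    empty     => sub { push @events, 'empty' },
    drain     => sub { push @events, 'drain' },
);

$q->push('a');    # starts immediately
$q->push('b');    # waits until 'a' has finished

# Finish the tasks one by one. Finishing 'a' starts 'b',
# which adds another callback to @pending.
while (my $callback = shift @pending) {
    $callback->();
}

print join(', ', @events), "\n";
```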

OBJECT METHODS

$queue->push($task, [$finish_callback->(@results)] );

Pushes a task into the Async::Queue object. The argument $task is mandatory, while $finish_callback is optional.

$task is a task that the worker will process. It will be given as the $task argument to the worker subroutine.

$finish_callback is a subroutine reference that will be called when the worker finishes processing the task. The arguments for $finish_callback (@results) are the arguments for the $callback subroutine reference in the worker subroutine.

push() method returns the Async::Queue object.
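
Because push() returns the queue object, calls can be chained. The sketch below also shows how the arguments given to $callback in the worker arrive in $finish_callback. The doubling worker and the @log array are illustrative only.

```perl
use strict;
use warnings;
use Async::Queue;

my $q = Async::Queue->new(worker => sub {
    my ($task, $callback) = @_;
    # Everything passed to $callback is forwarded to the finish callback.
    $callback->($task * 2, "doubled $task");
});

my @log;    # one array reference of results per finished task

$q->push(1, sub { push @log, [@_] })
  ->push(2, sub { push @log, [@_] })
  ->push(3);    # the finish callback is optional

print "$_->[0] ($_->[1])\n" for @log;
```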

$running_num = $queue->running();

Returns the number of currently running tasks in the Async::Queue object.

$waiting_num = $queue->waiting();

Returns the number of waiting tasks in the Async::Queue object.

$waiting_num = $queue->length();

Alias for waiting() method. It returns the number of waiting tasks in the Async::Queue object.
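
A short sketch of how these counters move as tasks start and finish. The worker defers its callbacks in @pending (an assumption of this example) so that tasks remain running while the counters are read.

```perl
use strict;
use warnings;
use Async::Queue;

my @pending;    # callbacks of tasks that are still running

my $q = Async::Queue->new(
    concurrency => 2,
    worker      => sub {
        my ($task, $callback) = @_;
        push @pending, $callback;    # defer completion
    },
);

$q->push($_) for 1 .. 5;

# Two tasks are running, the other three are waiting.
my @before = ($q->running, $q->waiting, $q->length);
print "before: running=$before[0] waiting=$before[1] length=$before[2]\n";

# Finish one task: the earliest waiting task takes its slot.
(shift @pending)->();
my @after = ($q->running, $q->waiting, $q->length);
print "after:  running=$after[0] waiting=$after[1] length=$after[2]\n";

# Finish the rest.
while (my $callback = shift @pending) {
    $callback->();
}
```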

$worker = $queue->worker([$new_worker]);

Accessor for the worker attribute.

$concurrency = $queue->concurrency([$new_concurrency]);

Accessor for the concurrency attribute.

$saturated = $queue->saturated([$new_saturated]);

Accessor for the saturated attribute.

$empty = $queue->empty([$new_empty]);

Accessor for the empty attribute.

$drain = $queue->drain([$new_drain]);

Accessor for the drain attribute.

EXAMPLE

Concurrent HTTP downloader

use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;
use Async::Queue;

my $q = Async::Queue->new(concurrency => 3, worker => sub {
    my ($url, $callback) = @_;
    print STDERR "Start $url\n";
    http_get $url, sub {
        my ($data, $headers) = @_;
        print STDERR "End $url\n";
        $callback->($data);
    };
});

my @urls = (
    'http://www.debian.org/',
    'http://www.ubuntu.com/',
    'http://fedoraproject.org/',
    'http://www.opensuse.org/',
    'http://www.centos.org/',
    'http://www.slackware.com/',
    'http://www.gentoo.org/',
    'http://www.archlinux.org/',
    'http://trisquel.info/',
);

my %results = ();
my $cv = AnyEvent->condvar;
foreach my $url (@urls) {
    $cv->begin();
    $q->push($url, sub {
        my ($data) = @_;
        $results{$url} = $data;
        $cv->end();
    });
}
$cv->recv;

foreach my $key (keys %results) {
    print STDERR "$key: " . length($results{$key}) . " bytes\n";
}

This example uses AnyEvent::HTTP to send HTTP GET requests for multiple URLs simultaneously. While simultaneous requests dramatically improve efficiency, they may overload the client host and/or the network.

This is where Async::Queue comes in handy. With Async::Queue you can control the concurrency level of the HTTP sessions (in this case, up to three).

SEE ALSO

AnyEvent::FIFO

The goal of AnyEvent::FIFO is the same as that of Async::Queue: to control the concurrency level of asynchronous tasks. The big difference is that AnyEvent::FIFO is a queue of subroutines while Async::Queue is a queue of tasks (data). In Async::Queue, the worker subroutine is registered with the object in advance. In AnyEvent::FIFO, it is the workers that are pushed to the queue.

You can emulate AnyEvent::FIFO with Async::Queue by pushing subroutine references to it as tasks.
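
A minimal sketch of that idea: each task is itself a subroutine reference, and the worker only invokes it. The convention that a task receives a "done" callback as its only argument is an assumption of this example and does not mirror the actual interface of AnyEvent::FIFO.

```perl
use strict;
use warnings;
use Async::Queue;

my $q = Async::Queue->new(
    concurrency => 1,
    worker      => sub {
        my ($task, $callback) = @_;
        # The task is a coderef. It must call $callback when it is done.
        $task->($callback);
    },
);

my @order;    # records the order in which the pushed subroutines ran

$q->push(sub {
    my ($done) = @_;
    push @order, 'first';
    $done->();
});
$q->push(sub {
    my ($done) = @_;
    push @order, 'second';
    $done->();
});

print join(', ', @order), "\n";
```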

AUTHOR

Toshio Ito, <debug.ito at gmail.com>

REPOSITORY

https://github.com/debug-ito/Async-Queue

BUGS

Please report any bugs or feature requests to bug-async-queue at rt.cpan.org, or through the web interface at http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Async-Queue. I will be notified, and then you'll automatically be notified of progress on your bug as I make changes.

SUPPORT

You can find documentation for this module with the perldoc command.

perldoc Async::Queue

LICENSE AND COPYRIGHT

Copyright 2012 Toshio Ito.

This program is free software; you can redistribute it and/or modify it under the terms of either: the GNU General Public License as published by the Free Software Foundation; or the Artistic License.

See http://dev.perl.org/licenses/ for more information.