NAME

Parallel::Queue - OO and imperative interface for forking queues.

SYNOPSIS

############################################################
# simple queue is an array of subrefs that get dispatched
# N-way parallel (or single-file if forking is turned off).
#
# subs returning non-zero will abort the queue, returning
# the unused portion for your money back.

use Parallel::Queue;

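my @queue
= map
{
    # copy the path: a closure over $_ would see whatever
    # $_ holds when the job finally runs.
    my $path = $_;

    -s $path > 8192
    ? sub{ squish $path }
    :
    ()
}
glob $glob;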

my @remaining = runqueue 4, @queue;

die "Incomplete jobs" if @remaining;

# for testing forking can be explicitly turned off.
# 'configure' processes the same arguments as import.
#
# these have the same results:

use Parallel::Queue qw( nofork );

Parallel::Queue->configure(  qw( nofork ) );

# debugging turns verbose on and forking off.
# these are identical.

use Parallel::Queue qw( debug );
use Parallel::Queue qw( nofork debug );


############################################################
# if an object is found on the queue that can 'next_job' 
# then the result of its next_job() should be a subref.
# the object should contain its own queue and return false
# when the queue is completed.
#
# nice thing about this approach is saving time and memory
# not having to create a thousand closures and being able
# to easily read the unfinished queue.

my @queue   = map { ... } glob $glob;

my $squash_me
= do
{
    package Squash;

    my $squish  = Some::Package->can( 'squish' );

    sub next_job
    {
        my $que     = shift;
        my $path    = shift @$que
        or return;

        sub{ $path->$squish }
    }

    bless [ @queue ], __PACKAGE__
};


runqueue 4 => $squash_me;

log_error "Unfinished business\n", @$squash_me
if @$squash_me;

############################################################
# subrefs and objects can be mixed on the stack.
# 
# any objects that can( 'next_job' ) get called until they
# return false, at which point the stack is advanced and the
# next item can be a subref or object.
#
# when all of the objects have finished returning new
# jobs then any subrefs left on the stack are executed
#
# 'finish' reports non-zero exits from forked jobs but 
# does not stop the queue. without it the first non-zero
# exit aborts the queue.

use Parallel::Queue qw( finish );

my @daily_cleanups = 
(
    Cleanup->new( $path_to_logs => 'rm', '14'   ) 
  , Cleanup->new( $path_to_logs => 'gzip', 1    ) 
  , Cleanup->can( 'sync_data_access'            )
  , Cleanup->new( $path_to_data => 'gzip', 2    )
  , Cleanup->can( 'unlock_data_access'          )
);

runqueue 8, @daily_cleanups;

############################################################
# verbose turns on lightweight progress reports. 
# nofork runs the queue within the current process.
# export=<name> allows using a different name for 'runqueue'
# or not exporting it at all.
# debug turns on nofork + verbose.  
#
# all of the terms can be prefixed with 'no' to invert them.


use Parallel::Queue qw( nofork );           # mainly for testing
use Parallel::Queue qw( verbose);           # progress messages

use Parallel::Queue qw( debug );            # verbose + nofork

use Parallel::Queue qw( finish  );          # don't stop

use Parallel::Queue qw( export=run_this );  # install "run_this"
use Parallel::Queue qw( noexport );

# options other than 'export' can be set at runtime 
# via configure.

Parallel::Queue->configure( qw( debug finish ) )
if $^P;

DESCRIPTION

This module is mostly boilerplate around fork, reporting the outcome of forked jobs. Jobs arrive either as coderefs or objects that can "next_job" (blessed coderefs which cannot "next_job" are treated as ordinary coderefs). Jobs can be dispatched with or without forking.
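The distinction between the two kinds of job can be sketched with Scalar::Util::blessed and can. This is only an illustration of the dispatch rule described above, not the module's own code; next_sub and the Counter class are hypothetical names made up for the example.

```perl
use strict;
use warnings;

use Scalar::Util qw( blessed );

# Hypothetical helper: return the next subref to run, or nothing
# once the queue is empty.
sub next_sub
{
    my $queue = shift;

    while( @$queue )
    {
        my $item = $queue->[0];

        # anything that is not an object with next_job, including a
        # blessed coderef, is taken off the queue as a plain job.
        blessed( $item ) && $item->can( 'next_job' )
        or return shift @$queue;

        # objects stay on the queue until next_job returns false.
        my $sub = $item->next_job;

        $sub and return $sub;

        shift @$queue;
    }

    return
}

# Hypothetical job object holding its own list of values.
{
    package Counter;

    sub next_job
    {
        my $list = shift;

        @$list or return;

        my $value = shift @$list;

        sub { print "job $value\n"; 0 }
    }
}

my @queue = ( bless( [ 1, 2 ], 'Counter' ), sub { print "plain job\n"; 0 } );

while( my $sub = next_sub( \@queue ) )
{
    $sub->();
}
```

Keeping the object at the head of the queue until it is exhausted means the unfinished work can be read straight out of the object afterwards, which is the advantage the synopsis describes.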

Queues are executed by passing them to runqueue with a non-negative integer job count. If the count is zero then jobs are not forked; otherwise up to that many jobs (limited by the queue size) are forked initially, and a wait-loop forks one additional job as each running job exits until the queue is consumed.

If any jobs return a non-zero exit code then the default behavior is to abort the queue, reap any existing forks, and return the remaining jobs. If the "finish" option is selected then jobs will continue to be forked until the queue is finished, regardless of errors.
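The behavior described above can be sketched with nothing but the core fork and wait calls. This is an approximation written for illustration, not the module's implementation: sketch_runqueue, its argument order, and the explicit $finish flag are all assumptions, and any true return value from a job is reported as exit status 1.

```perl
use strict;
use warnings;

# Hypothetical stand-in for runqueue: returns the jobs never started.
sub sketch_runqueue
{
    my ( $count, $finish, @queue ) = @_;

    # zero count: run in-process, stop on the first non-zero
    # return unless $finish is set.
    if( ! $count )
    {
        while( @queue )
        {
            my $job = shift @queue;

            $job->() && ! $finish
            and last;
        }

        return @queue;
    }

    my %running = ();
    my $failed  = 0;

    while( @queue || %running )
    {
        # keep up to $count children running unless aborting.
        while( ! $failed && @queue && keys( %running ) < $count )
        {
            my $job = shift @queue;
            my $pid = fork;

            defined $pid or die "fork: $!";

            # child: the job's result becomes the exit status.
            $pid or exit( $job->() ? 1 : 0 );

            $running{ $pid } = 1;
        }

        # nothing left to reap: the queue was aborted.
        %running or last;

        my $pid = wait;

        $pid > 0 or last;

        delete $running{ $pid };

        if( $? )
        {
            warn "pid $pid: non-zero exit status $?\n";

            $finish or $failed = 1;
        }
    }

    @queue
}

my @jobs      = map { my $n = $_; sub { $n == 2 } } 1 .. 4;
my @remaining = sketch_runqueue( 1, 0, @jobs );

print scalar( @remaining ), " jobs were not started\n";
```

With $finish true the same call would log the failure and carry on, leaving nothing in @remaining.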

Arguments to use (or configure).

With the exceptions of "export" and "debug", arguments are passed as named flags, with a "no" prefix turning off the feature.

export=<name>

This is only dealt with as an argument to "use" (i.e., in import). This allows renaming the exported sub from "runqueue" to any other valid Perl subname or turning off the export with "noexport".

Examples are:

qw( export=run_this )   # alternate subname.
qw( export          )   # default, gets "runqueue".
qw( noexport        )   # no sub is exported.

debug

This has no corresponding "nodebug"; it is equivalent to using qw( verbose nofork ).

finish (default nofinish)

This causes the queue to finish even if there are non-zero exits on the way. Exits will be logged but will not stop the queue.

verbose (default noverbose)

This adds some progress messages (e.g., PIDs forked and reaped, class of an object used for dispatch).

KNOWN ISSUES

Non-numeric count arguments.

The runqueue sub uses Scalar::Util::looks_like_number to validate the count. This may cause problems for objects which don't look like numbers.
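To see what such a check accepts, the snippet below calls Scalar::Util::looks_like_number directly. The count_ok helper and the Count class are hypothetical and exist only for this example; the module's actual validation may differ in detail.

```perl
use strict;
use warnings;

use Scalar::Util qw( looks_like_number );

# Hypothetical helper mirroring the kind of check described above;
# this is not the module's own code.
sub count_ok
{
    my $count = shift;

    looks_like_number( $count ) && $count >= 0
}

# Hypothetical class with no numeric overloading.
my $object = bless { jobs => 4 }, 'Count';

for my $count ( 4, '8', 'four', $object )
{
    my $status = count_ok( $count ) ? 'accepted' : 'rejected';

    print "$count: $status\n";
}
```

A caller holding an object like this one can reduce it to a plain number (here, $object->{jobs}) before handing it to runqueue.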

SEE ALSO

Parallel::Queue::Manager

OO Interface to runqueue with re-usable queue manager.

Debugging forks.

<http://perlmonks.org/index.pl?node_id=128283>

COPYRIGHT

This code is released under the same terms as Perl-5.24 or any later version of Perl.

AUTHOR

Steven Lembark <lembark@wrkhors.com>