NAME

Parallel::Fork::BossWorkerAsync - Perl extension for creating asynchronous forking queue processing applications.

SYNOPSIS

use Parallel::Fork::BossWorkerAsync ();
my $bw = Parallel::Fork::BossWorkerAsync->new(
  work_handler    => \&work,
  result_handler  => \&handle_result,
  global_timeout  => 2,
);

# Jobs are hashrefs
$bw->add_work( {a => 3, b => 4} );
while ($bw->pending()) {
  my $ref = $bw->get_result();
  if ($ref->{ERROR}) {
    print STDERR $ref->{ERROR};
  } else {
    print "$ref->{product}\n";
    print "$ref->{string}\n";
  }
}
$bw->shut_down();

sub work {
  my ($job)=@_;

  # Uncomment to test timeout
  # sleep(3);
  
  # Uncomment to test worker error
  # die("rattle");
  
  # do something with hash ref $job
  my $c = $job->{a} * $job->{b};

  
  # Return values are hashrefs
  return { product => $c };
}

sub handle_result {
  my ($result)=@_;
  if (exists($result->{product})) {
    $result->{string} = "the answer is: $result->{product}";
  }
  return $result;
}

__END__
Prints:
12
the answer is: 12

DESCRIPTION

Parallel::Fork::BossWorkerAsync is a multiprocess preforking server. On construction, the current process forks a "Boss" process (the server), which then forks one or more "Worker" processes. The Boss acts as a manager: it accepts jobs from the main process, queues them, and passes each one to the next idle Worker. The Boss also listens for and collects responses from the Workers as they complete jobs, queueing them for the main process.

The main process can collect available responses from the Boss, and/or send it more jobs, at any time. While waiting for jobs to complete, the main process can enter a blocking wait loop, or do something else altogether, opting to check back later.

In general, it's a good idea to construct the object early in a program's life, before any threads are spawned and before much memory is allocated, because the Boss and each Worker inherit the memory footprint of the process that forks them.

The 0.09 release includes Windows compatibility! (see Credits below)

METHODS

new(...)

Creates and returns a new Parallel::Fork::BossWorkerAsync object.

my $bw = Parallel::Fork::BossWorkerAsync->new(
  work_handler    => \&work_sub,
  result_handler  => \&result_sub,
  init_handler    => \&init_sub,
  exit_handler    => \&exit_sub,
  global_timeout  => 0,
  worker_count    => 3,
  msg_delimiter   => "\0\0\0",
  read_size       => 1024 * 1024,
);
  • work_handler => \&work_sub

    work_handler is the only required argument. The sub is called with its first and only argument being one of the values (hashrefs) in the work queue. Each worker calls this sub each time it receives work from the boss process. The handler may trap $SIG{ALRM}, which may be raised if global_timeout is specified.

    The work_handler should clean up after itself, as the workers may call the work_handler more than once.

    The work_handler is expected to return a hashref.

  • result_handler => \&result_sub

    The result_handler argument is optional. The sub is called with its first and only argument being the return value of work_handler, which is expected to be a hashref. If defined, the boss process calls this sub each time the application requests (and receives) a result. This handler is not timed out via $SIG{ALRM}.

    The result_handler is expected to return a hashref.

  • init_handler => \&init_sub

    The init_handler argument is optional. The referenced function receives no arguments and returns nothing. It is called only once by each worker, just after it's forked off from the boss, and before entering the job processing loop. This subroutine is not affected by the value of global_timeout. This could be used to connect to a database, instantiate a non-shared object, etc.

  • exit_handler => \&exit_sub

    The exit_handler argument is optional. The referenced function receives no arguments and returns nothing. It is called only once by each worker, just before exiting. This subroutine is not affected by the value of global_timeout. This could be used to disconnect from a database, etc.

  • global_timeout => $seconds

    By default, a handler can execute forever. If global_timeout is specified, an alarm is set up to terminate the work_handler so processing can continue.

  • worker_count => $count

    By default, 3 workers are started to process the data queue. Set worker_count to start a different number of workers.

  • msg_delimiter => $delimiter

    Messages are sent to and from the child processes using Data::Dumper. When transmitting data, a delimiter must be used to identify the breaks between messages. By default, this delimiter is "\0\0\0". The delimiter must not appear in your data.

  • read_size => $number_of_bytes

    The default read buffer size is 1 megabyte. The application, the boss, and each worker all sysread() from their respective socket connections. Ideally, the read buffer is just large enough to hold all the data that's ready to read. Depending on your application, the default might be ridiculously large, if for example you only pass lookup keys in, and error codes out. If you're running in a memory-constrained environment, you might lower the buffer significantly, perhaps to 64k (1024 * 64), or all the way down to 1k (1024 bytes). If for example you're passing (copying) high resolution audio/video, you will likely benefit from increasing the buffer size.

    An issue has cropped up, reported in more detail under the Bugs section below. Regardless of how large you set the read buffer with this parameter, BSD ignores this, and uses 8192 bytes instead. This can be a big problem if you pass megs of data back and forth, resulting in so many small reads that the application appears to hang. It will eventually complete, but it's not pretty. Bottom line: don't pass huge chunks of data cross-process under BSD.
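To make the roles of msg_delimiter and read_size more concrete, here is a minimal sketch of delimiter-based message framing using only core Perl. It is not the module's internal code: the helper names encode_msg and extract_msgs are hypothetical, and an in-memory filehandle read with read() stands in for the sockets and sysread() calls described above.

```perl
use strict;
use warnings;
use Data::Dumper ();

my $delim     = "\0\0\0";   # same value as the documented default
my $read_size = 16;         # deliberately tiny, to force several reads

# Hypothetical helper: serialize a hashref and append the delimiter.
sub encode_msg {
  my ($href) = @_;
  local $Data::Dumper::Indent = 0;
  local $Data::Dumper::Terse  = 1;
  return Data::Dumper::Dumper($href) . $delim;
}

# Hypothetical helper: remove every complete message from the buffer,
# leaving any trailing partial message in place for the next read.
sub extract_msgs {
  my ($buf_ref) = @_;
  my @msgs;
  while ((my $pos = index($$buf_ref, $delim)) >= 0) {
    my $text = substr($$buf_ref, 0, $pos + length($delim), '');
    $text = substr($text, 0, $pos);   # drop the delimiter itself
    push @msgs, eval $text;           # only safe for data you produced
  }
  return @msgs;
}

# Stand-in for a socket: an in-memory filehandle holding two messages.
my $wire = encode_msg({ a => 3, b => 4 }) . encode_msg({ a => 5, b => 6 });
open(my $fh, '<', \$wire) or die "open: $!";

my $buf = '';
my @jobs;
while (read($fh, my $chunk, $read_size)) {
  $buf .= $chunk;                     # a message may span several reads
  push @jobs, extract_msgs(\$buf);
}
close($fh);

print scalar(@jobs), " jobs decoded\n";
```

A smaller read size means more reads per message, which is the effect described for BSD above. The sketch also shows why the delimiter must not occur inside the data: any occurrence would be taken as a message boundary.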

add_work(\%work)

Adds work to the instance's queue. It accepts a list of hash refs. add_work() can be called at any time before shut_down(). All work can be added at the beginning, and then the results gathered, or these can be interleaved: add a few jobs, grab the results of one of them, add a few more, grab more results, etc.

Note: Jobs are not guaranteed to be processed in the order they're added. This is because they are farmed out to multiple workers running concurrently.

$bw->add_work({data => "my data"}, {data => "more stuff"}, ...);
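The interleaved pattern described above might look like the following sketch. The handler name (square), the job fields and the batch size are made up for illustration; only new(), add_work(), pending(), get_result() and shut_down() are taken from this documentation.

```perl
use strict;
use warnings;
use Parallel::Fork::BossWorkerAsync ();

# Hypothetical work handler: squares the number stored under 'n'.
sub square {
  my ($job) = @_;
  return { n => $job->{n}, square => $job->{n} ** 2 };
}

my $bw = Parallel::Fork::BossWorkerAsync->new(
  work_handler => \&square,
  worker_count => 2,
);

my @todo = map { +{ n => $_ } } 1 .. 6;
my %square_of;

# Submit two jobs, collect one result, and repeat.
while (@todo) {
  $bw->add_work(splice(@todo, 0, 2));
  my $res = $bw->get_result();
  $square_of{ $res->{n} } = $res->{square} unless $res->{ERROR};
}

# Drain whatever is still outstanding before shutting down.
while ($bw->pending()) {
  my $res = $bw->get_result();
  $square_of{ $res->{n} } = $res->{square} unless $res->{ERROR};
}
$bw->shut_down();

# Results arrive in no guaranteed order, so they are keyed by job id.
print "$_ squared is $square_of{$_}\n"
  for sort { $a <=> $b } keys %square_of;
```

Carrying an identifier such as n through the job and into the result is one simple way to match results to jobs when ordering is not guaranteed.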

pending()

This simple function returns a true value if there are jobs that have been submitted for which the results have not yet been retrieved.

Note: This says nothing about the readiness of the results. Just that at some point, now or in the future, the results will be available for collection.

while ($bw->pending()) { }

get_result()

Requests the next single available job result from the Boss' result queue. Returns the return value of the work_handler. If there is a result_handler defined, it's called here, and its return value is returned instead. The return value of either handler is expected to be a hashref. Depending on what your work_handler or result_handler does, you may not need to capture this result.

By default, get_result() is a blocking call. If there are no completed job results available, main application processing will stop here and wait.

my $href = $bw->get_result();

If you want nonblocking behavior:

my $href = $bw->get_result( blocking => 0 );
-OR-
my $href = $bw->get_result_nb();

In this case, if the call would block, because there is no result to retrieve, it returns immediately, returning undef.
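A polling loop built on the nonblocking call could be sketched as follows. The handler (slow_echo), the job fields and the sleep intervals are assumptions chosen for illustration; the calls on $bw are the ones documented here.

```perl
use strict;
use warnings;
use Parallel::Fork::BossWorkerAsync ();

# Hypothetical work handler: pretends to be slow, then echoes the job id.
sub slow_echo {
  my ($job) = @_;
  select(undef, undef, undef, 0.2);   # sleep for 200 ms
  return { id => $job->{id} };
}

my $bw = Parallel::Fork::BossWorkerAsync->new(work_handler => \&slow_echo);
$bw->add_work(map { +{ id => $_ } } 1 .. 3);

my @done;
my $idle_polls = 0;
while ($bw->pending()) {
  if (defined(my $res = $bw->get_result_nb())) {
    push @done, $res->{id};
  } else {
    # Nothing is ready yet: the main process is free to do other work.
    $idle_polls++;
    select(undef, undef, undef, 0.05);
  }
}
$bw->shut_down();

printf "collected %d results after %d idle polls\n",
  scalar(@done), $idle_polls;
```

The short pause in the else branch keeps the loop from spinning at full speed while no result is available; a real application would do useful work there instead.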

shut_down()

Tells the Boss and all Workers to exit. All results should have been retrieved via get_result() prior to calling shut_down(). If shut_down() is called earlier, the queue *will* be processed, but depending on timing the subsequent calls to get_result() may fail if the boss has already written all result data into the socket buffer and exited.

$bw->shut_down();

If you just want the Boss and Workers to go away, and don't care about work in progress, use:

$bw->shut_down( force => 1 );

Error handling

Errors generated by your work_handler do not cause the worker process to die. These are stuffed in the result hash with a key of 'ERROR'. The value is $@.

If global_timeout is set, and a timeout occurs, the worker returns: { ERROR => 'BossWorkerAsync: timed out' }
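The behaviour described above resembles the common Perl idiom of wrapping a call in eval with a local ALRM handler. The sketch below is not the module's source: run_job and its arguments are hypothetical, and only the shape of the returned hashes follows the text above.

```perl
use strict;
use warnings;

# Hypothetical wrapper: call $handler->($job) and turn a die() or a
# timeout into a hashref with an ERROR key, so the caller keeps running.
sub run_job {
  my ($handler, $job, $timeout) = @_;
  my $result = eval {
    local $SIG{ALRM} = sub { die "BossWorkerAsync: timed out\n" };
    alarm($timeout) if $timeout;
    my $r = $handler->($job);
    alarm(0);
    $r;
  };
  if (!defined $result) {
    alarm(0);                          # make sure no alarm is left pending
    my $err = $@ || 'unknown error';   # also covers a handler returning undef
    chomp $err;
    $result = { ERROR => $err };
  }
  return $result;
}

my $job = { a => 3, b => 4 };

# Normal completion, a handler that dies, and a handler that overruns.
my $ok      = run_job(sub { return { product => $_[0]{a} * $_[0]{b} } }, $job, 2);
my $failed  = run_job(sub { die "rattle\n" }, $job, 2);
my $expired = run_job(sub { sleep 3; return {} }, $job, 1);

print "ok:      $ok->{product}\n";
print "failed:  $failed->{ERROR}\n";
print "expired: $expired->{ERROR}\n";
```

Because failures come back as ordinary results, the consuming code only has to check for the ERROR key, as the SYNOPSIS does.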

BUGS

Please report bugs to jvann.cpan@gmail.com.

The Boss and Worker processes are long-lived. There is no restart mechanism for processes that exit prematurely. If it's the Boss, you're dead anyway, but if it's one or more Workers, the app will continue running, but throughput will suck.

The code should in some way overcome the tiny socket buffer limitations of BSD operating systems. Unbuffered reads are limited to 8192 byte chunks. If you pass megabytes of data with each job, the processing will not fail, but it will seem to be hung -- it can get VERY slow! This is not an issue on Linux, and will not be a problem on BSD if you pass less than, say, 64k between processes. If you know how to force an unbuffered socket read to use an arbitrarily large buffer (1 megabyte, for example), please shoot me an email.

CREDITS

I'd like to thank everyone who has reported a bug, asked a question, or offered a suggestion.

Jeff Rodriguez: wrote the module Parallel::Fork::BossWorker, which inspired this module.

Rob Navarro: reported -- and fixed! -- errors in fork() error handling, and in the reaping of dead child processes.

Mario Roy: contributed the Windows socket code.

COPYRIGHT AND LICENSE

Copyright (C) 2009-2013 by joe vannucci, <jvann.cpan@gmail.com>

All rights reserved. This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.