NAME
Proc::Parallel::Unreliable - maintain a pool of unreliable slave processes
SYNOPSIS
    use Proc::Parallel::Unreliable;

    run_subprocess(
        input         => $input_filehandle,
        output        => $output_callback_function,
        cb            => $slave_process_start_function,
        count         => $number_of_slaves_to_run,
        timeout       => $maximum_time_to_process_a_line,
        input_id      => $callback_to_identify_input,
        input_hlines  => $number_of_header_lines_in_input_stream,
        output_hlines => $number_of_header_lines_in_output_stream,
        output_id     => $callback_to_identify_output,
        items         => $number_of_buffer_lines_per_slave,
        track_items   => $identify_items_flag,
        debug         => $debug_level,
        pidfile       => $process_id_file_to_update,
    );
DESCRIPTION
This module deals with a particular situation: needing to run code that may crash or hang while processing a stream of data.
It reads input, farms it out to one or more slave processes, collects the output from the slave processes, and writes a single stream of output.
Each line of input must be self-contained because it can be randomly sent to any of the slaves.
It has two modes: tracking items, or not tracking items. When it tracks items, it uses the input_id callback to identify the input items and the output_id callback to identify the results it gets back from the slaves. When tracking items, the slave processes must produce output for each input item: they'll be killed off if they don't.
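As a rough illustration of item tracking, the sketch below shows one ID extractor used for both input and output lines. The name leading_id and the tab-separated sample lines are hypothetical. The only convention taken from this document is that the callback receives a reference to the line, as in the EXAMPLE section.

```perl
use strict;
use warnings;

# Hypothetical ID extractor: assumes every line starts with a numeric ID.
# It takes a reference to the line and returns the ID, or undef if none is found.
sub leading_id {
    my ($lineref) = @_;
    return $$lineref =~ /^(\d+)/ ? $1 : undef;
}

# Sample lines (made up for this sketch).
my $in  = "42\tsome input payload\n";
my $out = "42\ttransformed payload\n";

# An output line can be paired with its input line when both yield the same ID.
print "matched\n" if leading_id(\$in) == leading_id(\$out);
```

Because the same convention is used on both sides, a slave only has to copy the leading ID from each input line to the corresponding output line.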
When not tracking items, input lines are sent to the slave processes until the buffers fill up. This mode is not as elegant.
If the input stream or the output stream has headers, the input stream headers will be reproduced for each slave and the output stream headers will be suppressed for all but one slave. Use the input_hlines and output_hlines parameters to give the number of header lines in each stream.
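The effect of header suppression on the output side can be pictured with the small sketch below. It is an illustration only, not the module's implementation: merge_slave_output is a hypothetical helper, and it simply concatenates per-slave output, whereas the real module collects output as the slaves produce it.

```perl
use strict;
use warnings;

# Illustration only: combine per-slave output, keeping the header lines
# from the first slave and dropping them from every other slave.
sub merge_slave_output {
    my ($hlines, @streams) = @_;    # each stream is an array ref of lines
    my @merged;
    for my $i (0 .. $#streams) {
        my @lines = @{ $streams[$i] };
        # Remove the leading header lines for all slaves except the first.
        splice(@lines, 0, $hlines) if $i > 0;
        push @merged, @lines;
    }
    return @merged;
}

# Two made-up slave outputs, each starting with one header line.
my @merged = merge_slave_output(
    1,
    [ "id,result\n", "1,ok\n" ],
    [ "id,result\n", "2,ok\n" ],
);
print @merged;
```

Here the value 1 plays the role of output_hlines: the header appears once in the merged stream even though each slave emitted it.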
Output is written with a callback: output.
The process ID file, pidfile, will be updated to show the current set of processes.
EXAMPLE
    run_subprocess(
        output => sub {
            # Assumption: the output line is passed as the first argument.
            my ($o) = @_;
            print $output $o;
        },
        input => $input,
        cb    => sub {
            my ($my_input_fd, $my_output_fd) = @_;
            run_transformation($my_input_fd, $my_output_fd);
        },
        count    => 3,
        timeout  => 60,
        input_id => sub {
            # Identify an input item by its leading number.
            my ($iref) = @_;
            $$iref =~ /^(\d+)/;
            return $1;
        },
        input_hlines  => 0,
        output_hlines => 1,
        output_id     => sub {
            # Identify a result by its leading number.
            my ($oref) = @_;
            $$oref =~ /^(\d+)/;
            return $1;
        },
        items       => 4,
        track_items => 1,
        debug       => $debug,
        pidfile     => "/var/run/myprocess.pid",
    );
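The run_transformation function called in the example is not defined in this document. A minimal sketch of what such a worker might look like follows. It assumes that both arguments are ordinary Perl filehandles, that input lines are tab-separated with a leading numeric ID, and that the worker emits one header line to match output_hlines => 1. All of these are assumptions made for illustration.

```perl
use strict;
use warnings;

# Hypothetical worker body: reads lines from $in, writes results to $out.
# Assumes both arguments are ordinary Perl filehandles.
sub run_transformation {
    my ($in, $out) = @_;
    # One header line, to go with output_hlines => 1 in the example.
    print {$out} "id\tresult\n";
    while (my $line = <$in>) {
        chomp $line;
        my ($id, $payload) = split /\t/, $line, 2;
        # Keep the leading ID so that output_id can pair the result with its input.
        print {$out} join("\t", $id, uc($payload // '')), "\n";
    }
    return;
}

# Stand-alone trial run using in-memory filehandles instead of a slave process.
open my $in,  '<', \"1\tfoo\n2\tbar\n" or die "cannot open input: $!";
open my $out, '>', \my $buffer         or die "cannot open output: $!";
run_transformation($in, $out);
close $out;
print $buffer;
```

Producing exactly one output line per input line matters when track_items is set, since slaves that fail to produce output for an item are killed off.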
LICENSE
This package may be used and redistributed under the terms of either the Artistic 2.0 or LGPL 2.1 license.