
NAME

Parallel::DataPipe - parallel data processing conveyor

SYNOPSIS

use Parallel::DataPipe;
Parallel::DataPipe::run {
    input => [1..100],
    process => sub { "$_:$$" },
    number_of_data_processors => 100,
    output => sub { print "$_\n" },
};

DESCRIPTION

If you have a long-running script that processes data item by item (taking some data as input and producing processed data as output, e.g. aggregation, web crawling, etc.), you can speed it up 4-20 times using a parallel data pipe conveyor. Modern computers (even modern smartphones ;) ) have multiple CPU cores: 2, 4, 8, even 24! They also have a huge amount of memory: memory is cheap now. So they are ready for parallel data processing. This module gives you an easy and flexible way to use that power.

So what are the benefits of this module?

1) Because it uses input_iterator, it does not have to know all the input data before starting parallel processing.

2) Because it uses merge_data, processed data is ready for use in the main thread immediately.

Together, 1) and 2) remove the need for memory to store data items before and after the parallel work, and they let you parallelize collecting, processing, and using the processed data.

If you don't want to overload your database with multiple simultaneous queries, make queries only within input_iterator, then process the data in process_data and flush it with merge_data. On the other hand, you usually win if you make queries in process_data and run many data processors, possibly even more than the number of physical cores if the database queries take a long time and the results take little time to process.

It's no surprise that DB servers usually serve N simultaneous queries faster than N queries one by one.

Run tests and you will know.

To (re)write your script so that it uses all the processing power of your server, you have to find out:

1) The method to obtain source/input data. I call it the input iterator. It can be either an array of identifiers/URLs or a reference to a subroutine that returns the next portion of data, or undef if there is no more data to process.

2) How to process the data, i.e. a method that receives an input item and produces an output item. I call it the process_data subroutine. The good news is that the item that is processed and then returned can be any scalar value in Perl, including references to arrays and hashes. It can be anything that Storable can freeze and then thaw.

3) How to use the processed data. I call it merge_data. In the example above it just prints an item, but you could do buffered inserts into a database, send email, etc.

Take into account that 1) and 3) are executed in the main script thread, while all the work in 2) is done in parallel forked processes. So in 1) and 3) it's better not to do things that block execution and leave the hungry dogs of 2) without meat to eat. This approach therefore pays off when you know that the bottleneck in your script is CPU on the processing step. Of course, that is not the case for some web crawling tasks unless you do some heavy calculations.
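The three roles above can be sketched without any parallelism at all. The following is a minimal illustration, not the module's implementation: make_iterator and run_sequentially are hypothetical helpers, and the loop runs everything in a single process just to show how the input iterator, process_data, and merge_data fit together.

```perl
use strict;
use warnings;

# 1) Input iterator: a closure that returns the next item, or undef at EOF.
#    (make_iterator is a hypothetical helper, not part of the module.)
sub make_iterator {
    my @items = @_;
    return sub { return shift @items };    # shift on an empty array yields undef
}

# 2) process_data: receives the item in $_ and returns the processed item.
my $process = sub { return { value => $_, square => $_ * $_ } };

# 3) merge_data: consumes each processed item (also passed in $_).
my @merged;
my $merge = sub { push @merged, $_->{square} };

# Sequential stand-in for the parallel conveyor, for illustration only.
sub run_sequentially {
    my ($input, $process_cb, $merge_cb) = @_;
    while (defined(my $item = $input->())) {
        my $result;
        for ($item)   { $result = $process_cb->() }    # aliases $_ to the item
        for ($result) { $merge_cb->() }                # aliases $_ to the result
    }
}

run_sequentially(make_iterator(1 .. 5), $process, $merge);
print "@merged\n";
```

Because the callbacks only communicate through $_ and their return values, the same three subroutines could be handed to a parallel runner unchanged.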

SUBROUTINES

run

This is the subroutine that covers the magic of parallelizing data processing. It receives parameters with the following keys via a hash ref.

input - reference to an array or to a subroutine that returns the next data item to be processed. A subroutine should return undef to signal EOF. An array is used as a queue, i.e. shift(@$array) until there are no data items left. This behaviour was introduced in 0.06. You can also use these aliases: input_iterator, queue, data

Note: in versions before 0.06 this parameter was called input_iterator, and if it referred to an array, the array remained untouched. The new behaviour is to treat this parameter as a queue. Version 0.06 supports the old behaviour only for input_iterator; in the future it will also behave as a queue, to make life easier.
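The queue semantics matter when later steps add new work to the same array. Here is a minimal single-process sketch of that idea, assuming only the shift-until-empty behaviour described above; the module's internal loop may differ in detail.

```perl
use strict;
use warnings;

my @queue = (3);        # initial work items
my @processed;

while (@queue) {
    my $item = shift @queue;                 # consume from the front of the queue
    push @processed, $item;
    push @queue, $item - 1 if $item > 1;     # newly discovered work goes to the back
}

print "processed: @processed; left in queue: ", scalar(@queue), "\n";
```

Note that the array is empty afterwards, which is the visible difference from the pre-0.06 behaviour where the array stayed untouched.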

process - reference to a subroutine that processes data items. Each item is passed via the $_ variable, and the subroutine should return the processed data. This subroutine is executed in a forked process, so don't use any shared resources inside it. You can update the child's state, but that will not affect the parent's state. You can also use this alias: process_data

The following parameters are optional and have reasonable defaults, so change them only if you know what you are doing.

output - optional. Either a reference to a subroutine or a reference to an array that receives each processed data item. A subroutine can use $_ or $_[0] to access the data item and $_[1] to access the item number. This subroutine is executed in the parent thread, so you can rely on the changes it makes. If you don't specify this parameter, the array of processed data is returned as the result of run. You can use these aliases for this parameter: merge_data, merge

number_of_data_processors - (optional) number of parallel data processors. If you don't specify it, the module tries to find out the number of CPU cores and creates the same number of data processor children. It looks for the NUMBER_OF_PROCESSORS environment variable, which is set under Windows NT. If this environment variable is not found, it looks at /proc/cpuinfo, which is available on Linux and similar Unix systems. It makes sense to set number_of_data_processors explicitly, possibly to more than the number of CPU cores, if you want to use all the slave DB servers in your environment and querying the DB servers takes more time than processing the returned data. Otherwise it's optimal to have number_of_data_processors equal to the number of CPU cores.
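The detection order described above can be sketched as follows. This is an illustration under stated assumptions, not the module's own code: guess_cpu_count is a hypothetical helper, the "processor : N" line format is what Linux writes to /proc/cpuinfo, and the fallback of 1 is my own choice.

```perl
use strict;
use warnings;

# Hypothetical helper; the optional argument exists only to make testing easy.
sub guess_cpu_count {
    my ($cpuinfo_path) = @_;
    $cpuinfo_path = '/proc/cpuinfo' unless defined $cpuinfo_path;

    # Windows NT sets this environment variable.
    my $env = $ENV{NUMBER_OF_PROCESSORS};
    return $env if defined $env && $env =~ /^[1-9]\d*$/;

    # Linux lists one "processor : N" entry per logical CPU.
    if (open my $fh, '<', $cpuinfo_path) {
        my $count = grep { /^processor\s*:/ } <$fh>;
        close $fh;
        return $count if $count > 0;
    }

    return 1;    # assumed fallback when nothing can be detected
}

printf "data processors: %d\n", guess_cpu_count();
```

The result could then be passed as number_of_data_processors, or multiplied when the work is dominated by waiting on a database rather than by CPU.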

freeze, thaw - you can use an alternative serializer. For example, if you know that you are working with arrays of words (0..65535), you can use freeze => sub {pack('S*',@{$_[0]})} and thaw => sub {[unpack('S*',$_[0])]}, which will reduce the number of bytes exchanged between processes. But do this only as a last-resort optimization. In fact, the automatic choice is quite good and efficient: it uses encode_sereal and decode_sereal if the Sereal module is found, and Storable's freeze and thaw otherwise.
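To see what the custom serializer from the paragraph above does, the pair can be tried on its own. This sketch only exercises pack and unpack and compares the size with Storable; it does not call the module, and the sample data is made up.

```perl
use strict;
use warnings;
use Storable qw(freeze);

# The freeze/thaw pair from the text: 'S*' packs unsigned 16-bit integers.
my $freeze = sub { pack('S*', @{ $_[0] }) };
my $thaw   = sub { [ unpack('S*', $_[0]) ] };

my $words  = [ 0, 1, 513, 65535 ];      # sample data, all within 0..65535
my $packed = $freeze->($words);         # 2 bytes per word
my $copy   = $thaw->($packed);

printf "custom: %d bytes, Storable: %d bytes\n",
    length($packed), length(freeze($words));
print "round trip: @$copy\n";
```

Values outside 0..65535 would be silently truncated by 'S', which is one reason to keep the default serializer unless the data is known to fit.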

Note: run also has an undocumented calling prototype, (\@\$), i.e.

my @x2 = Parallel::DataPipe::run([1..100],sub {$_*2});

This feature is experimental and can be removed. Use it at your own risk.

pipeline

pipeline() is a chain of run() calls (parallel data pipes) executed in parallel, where the input for each pipe is implicitly taken from the previous one.

run {input => \@queue, process => \&process, output => \@out}

is the same as

pipeline {input => \@queue, process => \&process, output => \@out}

But with pipeline you can create a chain of connected pipes and run all of them in parallel, as is done in Unix with a pipeline of processes.

pipeline(
  { input => \@queue, process => \&process1},
  { process => \&process2},
  { process => \&process3, output => sub {print "$_\n";} },
);

It works like in Unix: the input of the next pipe is (implicitly) set to the output of the previous pipe. You have to specify the input for the first pipe explicitly (see the example of parallel grep 'hello' below).

The assumption that the input of the next pipe depends on the output of the previous one is also used by the algorithm that prioritizes the execution of pipe processors. As long as the rightmost (last in the list) pipe has input items to process, it executes its data processors. If this pipe has a free processor that is not loaded with data, the processors of the previous pipe are executed to produce input data for the next pipe. This is applied recursively along the whole chain of pipes.

Here is a parallel grep implemented in about 40 lines of Perl code:

use List::MoreUtils qw(part);
my @dirs = '.';
my @files;
pipeline(
  # this pipe looks (recursively) for all files in specified @dirs
  {
      input => \@dirs,
      process => sub {
          # part() returns [files], [dirs]; either may be undef when empty
          my ($files,$dirs) = part { -d $_ ? 1 : 0 } glob("$_/*");
          return [$files || [], $dirs || []];
      },
      output => sub {
          my ($files,$dirs) = @$_;
          push @dirs,@$dirs;# recursion is here
          push @files,@$files;
      },
  },
  # this pipe greps files for the word hello
  {
      input => \@files,
      process => sub {
          my ($file) = $_;
          # skip files that cannot be opened
          open my $fh, '<', $file or return [$file, []];
          my @lines;
          while (<$fh>) {
              # line_number : line
              push @lines,"$.:$_" if m{hello};
          }
          return [$file,\@lines];
      },
      output => sub {
          my ($file,$lines) = @$_;
          # print filename, line_number , line
          print "$file:$_" for @$lines;
      }
  }
);

HOW parallel pipe (run) WORKS

1) The main thread (parent) forks number_of_data_processors children for processing data.

2) As soon as data comes from input_iterator, the parent sends it to the next child using the pipe mechanism.

3) The child processes the data and returns the result back to the parent using a pipe.

4) The parent first fills up all the pipes to the children with data and then starts to expect processed data on the pipes from the children.

5) When it receives a result from a child, it sends the processed data to the merge_data subroutine and starts loop 2) again.

6) Loop 2) continues until the input data ends (end of the input_iterator array, or the input_iterator sub returned undef).

7) In the end the parent expects processed data from all busy children and passes the processed data to merge_data.

8) After all the children have sent their processed data, they are killed and run returns to the caller.

Note: If input_iterator or process_data returns a reference, the data is serialized before it goes into the pipe and deserialized after. That way you have full control over whether data is serialized for IPC.
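The steps above can be sketched with a single child, using only core modules. This is a simplified illustration and not the module's code: send_item, receive_item, and process_in_child are hypothetical helpers, the length-prefixed Storable framing is my assumption, every item is serialized (not only references), and the child exits on EOF instead of being killed.

```perl
use strict;
use warnings;
use IO::Handle;
use POSIX ();
use Storable qw(freeze thaw);

# Write one length-prefixed, serialized record to a filehandle.
sub send_item {
    my ($fh, $item) = @_;
    my $bytes = freeze(\$item);    # wrap in a ref so plain scalars work too
    print {$fh} pack('N', length $bytes), $bytes;
}

# Read one record back; returns an empty list at EOF.
sub receive_item {
    my ($fh) = @_;
    my $got = read($fh, my $header, 4);
    return unless $got && $got == 4;
    my $length = unpack('N', $header);
    $got = read($fh, my $bytes, $length);
    die "truncated record" unless defined $got && $got == $length;
    return ${ thaw($bytes) };
}

# Fork one child, feed it items one by one, and collect the results.
sub process_in_child {
    my ($process, @items) = @_;
    pipe(my $to_child_r,   my $to_child_w)   or die "pipe: $!";
    pipe(my $from_child_r, my $from_child_w) or die "pipe: $!";

    my $pid = fork();
    die "fork: $!" unless defined $pid;

    if ($pid == 0) {    # child: read item, process it, write result
        close $to_child_w;
        close $from_child_r;
        $from_child_w->autoflush(1);
        while (my ($item) = receive_item($to_child_r)) {
            my $result;
            for ($item) { $result = $process->() }    # item is passed via $_
            send_item($from_child_w, $result);
        }
        POSIX::_exit(0);    # leave without running the parent's END blocks
    }

    # parent: send each item and wait for its result
    close $to_child_r;
    close $from_child_w;
    $to_child_w->autoflush(1);
    my @results;
    for my $item (@items) {
        send_item($to_child_w, $item);
        push @results, receive_item($from_child_r);
    }
    close $to_child_w;    # EOF tells the child to finish
    waitpid($pid, 0);
    return @results;
}

my @results = process_in_child(sub { return { in => $_, out => $_ * 2 } }, 1 .. 5);
print join(' ', map { $_->{out} } @results), "\n";
```

With several children, the parent would keep one pipe pair per child, fill all of them first, and then refill whichever child returns a result, as described in steps 4) and 5).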

SEE ALSO

fork

subs::parallel

Parallel::Loops

MCE

IO::Pipely - pipes that work almost everywhere

POE - portable multitasking and networking framework for any event loop

forks

threads

DEPENDENCIES

Only core modules are used.

If found, the Sereal module is used for serialization instead of Storable, as the former is more efficient.

BUGS

For all bugs please send an email to okharch@gmail.com.

SOURCE REPOSITORY

See the git source on github https://github.com/okharch/Parallel-DataPipe

COPYRIGHT

Copyright (c) 2013 Oleksandr Kharchenko <okharch@gmail.com>

All rights reserved. This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.

AUTHOR

Oleksandr Kharchenko <okharch@gmail.com>