NAME

Parallel::MapReduce - MapReduce Infrastructure, multithreaded

SYNOPSIS

  ## THIS IS ALL STILL EXPERIMENTAL!!
  ## DO NOT USE FOR PRODUCTION!!
  ## LOOK AT THE ROADMAP AND FEEDBACK WHAT YOU FIND IMPORTANT!!

  use Parallel::MapReduce;
  my $mri = new Parallel::MapReduce (MemCacheds => [ '127.0.0.1:11211', ... ],
                                     Workers    => [ '10.0.10.1', '10.0.10.2', ...]);

  my $A = {1 => 'this is something ',
           2 => 'this is something else',
           3 => 'something else completely'};

  # apply MR algorithm (word count) on $A
  my $B = $mri->mapreduce (
      sub {   # map: emit (word => 1) for every word in the value
          my ($k, $v) = (shift, shift);
          return map { $_ => 1 } split /\s+/, $v;
      },
      sub {   # reduce: sum up the counts collected for one word
          my ($k, $v) = (shift, shift);
          my $sum = 0;
          $sum += $_ for @$v;
          return $sum;
      },
      $A
  );

  # prefabricate mapreducer
  my $string2lines = $mri->mapreducer (sub {...}, sub {...});
  # apply it
  my $B = &$string2lines ($A);

  # pipeline it with some other mapreducer
  my $pipeline = $mri->pipeline ($string2lines,
                                 $lines2wordcounts);

  # apply that
  my $B = &$pipeline ($A);

ABSTRACT

The MapReduce framework allows parallel, and possibly distributed, execution of CPU-intensive computations on several, if not many, hosts.

For this purpose you will have to reformulate your problem as one which deals only with list traversal (map) and list aggregation (reduce), something which is not unnatural for Perl programmers. In effect you end up with a hash-to-hash transform, and this is exactly what this package implements.

This package implements MapReduce for local invocation, for parallelized (but still local) invocation, and for fully distributed invocation. For the latter it does not use a file system to propagate data, but a pool of memcached servers instead.

DESCRIPTION

In a nutshell, the MapReduce algorithm is this (in sequential form):

  sub mapreduce {
      my $mri    = shift;
      my $map    = shift;
      my $reduce = shift;
      my $h1     = shift;

      my %h3;                                        # map phase: apply $map to every pair
      while (my ($k, $v) = each %$h1) {
          my %h2 = &$map ($k => $v);
          push @{ $h3{$_} }, $h2{$_} for keys %h2;   # regroup values under their new keys
      }
      my %h4;                                        # reduce phase: aggregate every value list
      while (my ($k, $v) = each %h3) {
          $h4{$k} = &$reduce ($k => $v);
      }
      return \%h4;
  }

It is the task of the application programmer to provide the functions $map and $reduce which, when applied to the hash $h1, produce the wanted result. The infrastructure $mri is not used above, but it becomes relevant when the individual invocations of $map and $reduce are (a) parallelized or (b) distributed. And that is what this package does.
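
For illustration, here is what the intermediate hashes of the sequential version look like for the word count example from the SYNOPSIS (all values derived by hand from the code above):

  # after the map phase, %h3 holds one list of partial counts per word:
  #   this       => [1, 1]
  #   is         => [1, 1]
  #   something  => [1, 1, 1]
  #   else       => [1, 1]
  #   completely => [1]
  #
  # after the reduce phase, %h4 holds the summed counts:
  #   this => 2, is => 2, something => 3, else => 2, completely => 1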

Master

This is the host where you initiate the computation and this is where the central algorithm will be executed.

Workers

Each worker can execute either the $map or the $reduce function over a subslice of the overall data. Workers can run locally, simply as subroutines (see Parallel::MapReduce::Worker), or can be threads talking to remote instances of a worker (see Parallel::MapReduce::Worker::SSH).

When you create your MR infrastructure you can specify which kind of workers you want to use (via a WorkerClass in the constructor).
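
To pick the SSH workers, for example, you would name that class in the constructor (a sketch; the addresses are placeholders):

  my $mri = new Parallel::MapReduce (MemCacheds  => [ '127.0.0.1:11211' ],
                                     Workers     => [ '10.0.10.1', '10.0.10.2' ],
                                     WorkerClass => 'Parallel::MapReduce::Worker::SSH');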

NOTE: Feel free to propose more workers.

Servers

To exchange hash data between master and workers (and also among workers), this package makes use of an existing memcached server pool (see http://www.danga.com/memcached/). Obviously, the more servers there are running, the merrier.

NOTE: The (Debian-packaged) Perl client is somewhat flaky in multi-threaded environments. I made some work-arounds, but other options should be investigated.

INTERFACE

Constructor

$mri = new Parallel::MapReduce (...)

The constructor accepts the following options:

MemCacheds (default: none)

A list reference to IP:port strings detailing how the memcached servers can be reached. You must specify at least one. If you have no memcached running, your only option is to use Parallel::MapReduce::Testing instead; that runs the whole thing locally.
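
A minimal sketch of that fallback, assuming Parallel::MapReduce::Testing offers the same mapreduce interface and a parameterless constructor:

  use Parallel::MapReduce::Testing;
  my $mri = new Parallel::MapReduce::Testing;     # no servers, no workers needed (assumed)
  my $B   = $mri->mapreduce ($map, $reduce, $A);  # same calling convention as above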

Workers (default: none)

A list reference of IP addresses naming the hosts on which the workers should run. You can list one and the same IP address multiple times to shift the load balance, as sketched below.

For worker implementations which are not farmed out to remote hosts, the actual IP addresses are irrelevant; only their number matters.
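
For instance, to put twice as much worker load on one host (addresses are again placeholders):

  # two worker instances on 10.0.10.1, one on 10.0.10.2
  Workers => [ '10.0.10.1', '10.0.10.1', '10.0.10.2' ]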

WorkerClass (default: Parallel::MapReduce::Worker)

The worker implementation to use.

Methods

shutdown

$mri->shutdown

Especially when you use the SSH workers you should make sure that you terminate them properly. Run this method if you do not want plenty of stale SSH sessions left over.

mapreduce

$A = $mri->mapreduce ($map_coderef, $reduce_coderef, $B)

This method applies the MR algorithm to the hash (reference) $B. You have to pass in CODE refs to the map and the reduce function. The result is a reference to a hash.

mapreducer

$op = $mri->mapreducer ($map_coderef, $reduce_coderef)

This method returns a prefabricated mapreducer (see SYNOPSIS). You also have to pass in CODE refs to the map and the reduce function.

pipeline

$op = $mri->pipeline ($op1, $op2, ...)

This method takes a number of prefabricated mapreducers and pipelines them into a single one, which is returned.

NOTE: When a pipeline is executed, the processor could be clever enough not to retrieve intermediate hashes from the server pool. At the moment, though, intermediate hashes are still fetched between stages.
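
As a sketch of how the prefabricated stages from the SYNOPSIS could be filled in and chained (the line splitting and word counting functions here are illustrative, not part of the package):

  # stage 1: split every string value into individually keyed lines
  my $string2lines = $mri->mapreducer (
      sub { my ($k, $v) = @_;
            my $n = 0;
            return map { $k . ':' . $n++ => $_ } split /\n/, $v; },
      sub { my ($k, $v) = @_; return join '', @$v; }   # keys are unique, pass values through
  );
  # stage 2: word count over the lines, as in the SYNOPSIS
  my $lines2wordcounts = $mri->mapreducer (
      sub { my ($k, $v) = @_; return map { $_ => 1 } split /\s+/, $v; },
      sub { my ($k, $v) = @_; my $sum = 0; $sum += $_ for @$v; return $sum; }
  );

  # chain both stages and apply the result to a hash of multi-line strings
  my $wordcount = $mri->pipeline ($string2lines, $lines2wordcounts);
  my $B = &$wordcount ($A);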

SEE ALSO

Parallel::MapReduce::Sequential, Parallel::MapReduce::Testing, Parallel::MapReduce::Worker, Log::Log4perl

COPYRIGHT AND LICENSE

Copyright 200[8] by Robert Barta, <drrho@cpan.org>

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.