NAME

Future::Workflow::Pipeline - a pipeline of processing stages

SYNOPSIS

  # 1: Make a pipeline
  my $pipeline = Future::Workflow::Pipeline->new;


  # 2: Add some stages to it

  # An async stage; e.g. perform an HTTP fetch
  my $ua = Net::Async::HTTP->new;
  $pipeline->append_stage_async( async sub ($url) {
     return await $ua->GET( $url );
  });

  # A synchronous (in-process) stage; e.g. some HTML parsing
  $pipeline->append_stage_sync( sub ($response) {
     my $dom = Mojo::DOM->new( $response->decoded_content );
     return $dom->at('div[id="main"]')->text;
  });

  # A detached (out-of-process/thread) stage; e.g. some silly CPU-intensive task
  $pipeline->append_stage_detached( sub ($text) {
     my $iter = Algorithm::Permute->new([ split m/\s+/, $text ]);

     my $best; my $bestscore;
     while(my @words = $iter->next) {
        my $str = join "\0", @words;
        my $score = md5sum( $str );
        next if defined $bestscore and $score ge $bestscore;

        $best      = $str;
        $bestscore = $score;
     }

     return $best;
  });


  # 3: Give it an output

  # These are alternatives:

  # An asynchronous output
  my $dbh = Database::Async->new( ... );
  $pipeline->set_output_async( async sub ($best) {
     await $dbh->do('INSERT INTO Results VALUES (?)', $best);
  });

  # A synchronous output
  $pipeline->set_output_sync( sub ($best) {
     print "MD5 minimized sort order is:\n";
     print "  $_\n" for split m/\0/, $best;
  });


  # 4: Now start it running on some input values

  foreach my $url (slurp_lines("urls.txt")) {
     await $pipeline->push_input($url);
  }


  # 5: Wait for it all to finish
  await $pipeline->drain;

DESCRIPTION

Instances of this class implement a "pipeline", a sequence of data-processing stages. Each stage is represented by a function that is passed a single argument and should return a result. The pipeline itself stores a function that will be passed each eventual result.
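As a minimal sketch of that description, the following builds a two-stage pipeline using only the synchronous methods documented below. The arithmetic in the stages and the @results array are purely illustrative, and the example assumes that with synchronous stages each pushed item has reached the output by the time the Future returned by push_input is ready.

```perl
use v5.26;
use warnings;
use feature 'signatures';
no warnings 'experimental::signatures';

use Future::Workflow::Pipeline;

my @results;   # illustrative: collects whatever the pipeline outputs

my $pipeline = Future::Workflow::Pipeline->new;

# Each stage receives a single argument and returns a single result
$pipeline->append_stage_sync( sub ($n) { return $n * 2 } );
$pipeline->append_stage_sync( sub ($n) { return $n + 1 } );

# The output function is passed each eventual result
$pipeline->set_output_sync( sub ($result) { push @results, $result } );

# push_input returns a Future; ->get waits for the item to be accepted
# (assumed here to complete immediately, as nothing is asynchronous)
$pipeline->push_input( $_ )->get for 1 .. 3;

say "Results: @results";
```

The stages run in the order they were appended, so each input is doubled before one is added to it.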

Queueing

In front of every stage there is a queue of pending items. If the first stage is busy when "push_input" is called, the item is accepted into that stage's queue instead. As the stage's work function finishes with earlier items, queued items are taken in the order they were pushed.

If the queue between stages is full, then items will remain pending in prior stages. Ultimately this back-pressure will make its way back to the "push_input" method at the beginning of the pipeline.
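The queueing rules can be illustrated with a toy model written in plain Perl. This is not how the module is implemented; the ToyStageQueue class, its methods and its status strings are hypothetical, and a "blocked" status merely stands in for a push_input call whose Future would remain pending.

```perl
use strict;
use warnings;

# Toy model of the queue in front of one stage. NOT the module's real
# implementation; every name here is hypothetical.
package ToyStageQueue {
    sub new {
        my ($class, %args) = @_;
        return bless {
            max_queue => $args{max_queue},  # undef means "no limit"
            busy      => 0,                 # is the work function running?
            queue     => [],                # accepted items, oldest first
            blocked   => [],                # items held back by back-pressure
            started   => [],                # order in which work began
        }, $class;
    }

    # True if the queue can accept another item
    sub has_room {
        my ($self) = @_;
        return 1 if !defined $self->{max_queue};
        return @{ $self->{queue} } < $self->{max_queue};
    }

    # Offer an item; returns "started", "queued" or "blocked"
    sub push_input {
        my ($self, $item) = @_;
        if (!$self->{busy}) {
            $self->{busy} = 1;
            push @{ $self->{started} }, $item;
            return "started";
        }
        if ($self->has_room) {
            push @{ $self->{queue} }, $item;
            return "queued";
        }
        push @{ $self->{blocked} }, $item;
        return "blocked";
    }

    # The work function has finished with its current item
    sub finish_current {
        my ($self) = @_;
        $self->{busy} = 0;

        # Start the oldest waiting item, if there is one
        my $source = @{ $self->{queue} } ? $self->{queue} : $self->{blocked};
        if (@$source) {
            $self->{busy} = 1;
            push @{ $self->{started} }, shift @$source;
        }

        # Space may have opened up: admit blocked items in arrival order
        while (@{ $self->{blocked} } and $self->has_room) {
            push @{ $self->{queue} }, shift @{ $self->{blocked} };
        }
        return;
    }
}

my $stage  = ToyStageQueue->new( max_queue => 1 );
my @status = map { $stage->push_input($_) } qw( a b c );

$stage->finish_current;   # "b" starts; "c" moves from blocked into the queue
$stage->finish_current;   # "c" starts

print "statuses: @status\n";
print "started:  @{ $stage->{started} }\n";
```

With a max_queue of 1 the third item has nowhere to go until the stage finishes an earlier one, which is the back-pressure described above; items still start in the order they were pushed.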

CONSTRUCTOR

$pipeline = Future::Workflow::Pipeline->new;

The constructor takes no additional parameters.

METHODS

set_output

$pipeline->set_output( $code );

   await $code->( $result );

Sets the destination output for the pipeline. Each completed work item will be passed to this function, which is expected to return a Future.
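A minimal sketch of an output function that returns a Future, assuming the Future module is available. The upper-casing stage and the @seen array are illustrative only; a real asynchronous output would more likely return a Future from some I/O operation.

```perl
use v5.26;
use warnings;
use feature 'signatures';
no warnings 'experimental::signatures';

use Future;
use Future::Workflow::Pipeline;

my @seen;   # illustrative: records what the output function received

my $pipeline = Future::Workflow::Pipeline->new;

$pipeline->append_stage_sync( sub ($str) { return uc $str } );

# The output function must return a Future; an already-completed one is
# enough when there is nothing asynchronous to wait for
$pipeline->set_output( sub ($result) {
    push @seen, $result;
    return Future->done;
} );

$pipeline->push_input( "hello" )->get;

say "Output received: @seen";
```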

set_output_sync

$pipeline->set_output_sync( $code );

   $code->( $result );

Similar to "set_output", except that the output function is called synchronously and is considered finished as soon as it returns.

append_stage

$pipeline->append_stage( $code, %args );

   $result = await $code->( $item );

Appends a pipeline stage that is implemented by an asynchronous function. Each work item will be passed in by invoking the function, and it is expected to return a Future which will eventually yield the result of that stage.

The following optional named args are recognised:

concurrent => NUM

Allow up to this number of items to be outstanding in the stage concurrently.

max_queue => NUM

If defined, no more than this number of items can be enqueued. If undefined, no limit is applied.

This value can be zero, which means that any attempt to push more items will remain pending until the work function is free to deal with them; i.e. no queueing will be permitted.

on_failure => CODE

   $on_failure->( $f )

Provides a callback event function for handling a failure thrown by the stage code. If not provided, the default behaviour is to print the failure message as a warning.

Note that this handler cannot turn a failure into a successful result or otherwise resume or change behaviour of the pipeline. For error-correction you will have to handle that inside the stage function code itself. This handler is purely the last stop of error handling, informing the user of an otherwise-unhandled error before ignoring it.
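The named args might be combined as in the following sketch. It assumes that $f, as passed to the on_failure handler, is the failed Future (so that its failure method yields the message) and that the pipeline carries on with later items after a failure; neither detail is spelled out above. The halving stage and the two arrays are illustrative.

```perl
use v5.26;
use warnings;
use feature 'signatures';
no warnings 'experimental::signatures';

use Future;
use Future::Workflow::Pipeline;

my ( @results, @failures );   # illustrative bookkeeping only

my $pipeline = Future::Workflow::Pipeline->new;

$pipeline->append_stage(
    # The stage function returns a Future, here one that is already complete
    sub ($n) {
        return Future->fail( "odd input: $n" ) if $n % 2;
        return Future->done( $n / 2 );
    },
    concurrent => 2,    # up to two items outstanding at once
    max_queue  => 10,   # at most ten items waiting in front of the stage
    # Assumption: $f is the failed Future for the item
    on_failure => sub ($f) { push @failures, scalar $f->failure },
);

$pipeline->set_output_sync( sub ($result) { push @results, $result } );

$pipeline->push_input( $_ )->get for 2, 3, 4;

say "Results:  @results";
say "Failures: @failures";
```

The handler only records the failure. The failed item produces no result, and any recovery would have to happen inside the stage function itself.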

append_stage_sync

$pipeline->append_stage_sync( $code, %args );

   $result = $code->( $item );

Similar to "append_stage", except that the stage function is called synchronously and returns its result immediately.

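Because a synchronous stage function runs to completion before returning, only one item can be in progress at a time, so the concurrent named parameter is not permitted.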

push_input

await $pipeline->push_input( $item );

Adds a new work item into the pipeline, which will pass through each of the stages and eventually invoke the output function.
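Because push_input returns a Future, inputs are usually fed from inside an async function so that back-pressure can suspend the feeder. The following is a sketch assuming Future::AsyncAwait is available; feed_all is a hypothetical helper, not part of this module, and the word-length stage is illustrative.

```perl
use v5.26;
use warnings;
use feature 'signatures';
no warnings 'experimental::signatures';

use Future::AsyncAwait;
use Future::Workflow::Pipeline;

my @lengths;   # illustrative: collects the pipeline's output

my $pipeline = Future::Workflow::Pipeline->new;

$pipeline->append_stage( async sub ($word) { return length $word } );
$pipeline->set_output_sync( sub ($len) { push @lengths, $len } );

# Hypothetical helper: push each item in turn, waiting whenever the
# pipeline is not yet ready to accept more
async sub feed_all ($pipeline, @items) {
    foreach my $item (@items) {
        await $pipeline->push_input( $item );
    }
    return scalar @items;
}

my $count = feed_all( $pipeline, qw( alpha be gamma ) )->get;

say "Pushed $count items; lengths: @lengths";
```

A lexical loop variable is used because Future::AsyncAwait does not support await inside a foreach loop that iterates with $_.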

UNSOLVED QUESTIONS

  • Is each work item represented by some object that gets passed around?

    Can we store context, maybe in a hash or somesuch, that each stage can inspect and append more things into? It might be useful to remember at least the initial URLs by the time we generate the outputs.

  • Tuning parameters. In particular, being able to at least set overall concurrency of async and detached stages, the detachment model of the detached stages (threads vs. forks), inter-stage buffering?

AUTHOR

Paul Evans <leonerd@leonerd.org.uk>