NAME
Future::Workflow::Pipeline - a pipeline of processing stages
SYNOPSIS
   use v5.36;
   use Future::AsyncAwait;
   use Digest::MD5 qw( md5_hex );
   use Future::Workflow::Pipeline;

   # 1: Make a pipeline
   my $pipeline = Future::Workflow::Pipeline->new;

   # 2: Add some stages to it

   # An async stage; e.g. perform an HTTP fetch
   my $ua = Net::Async::HTTP->new;
   $pipeline->append_stage_async( async sub ($url) {
      return await $ua->GET( $url );
   });
   # A synchronous (in-process) stage; e.g. some HTML parsing
   $pipeline->append_stage_sync( sub ($response) {
      my $dom = Mojo::DOM->new( $response->decoded_content );
      return $dom->at('div[id="main"]')->text;
   });
   # A detached (out-of-process/thread) stage; e.g. some silly CPU-intensive task
   $pipeline->append_stage_detached( sub ($text) {
      my $iter = Algorithm::Permute->new([ split m/\s+/, $text ]);
      my $best; my $bestscore;
      while( my @words = $iter->next ) {
         my $str = join "\0", @words;
         my $score = md5_hex( $str );
         next if defined $bestscore and $score ge $bestscore;
         $best = $str;
         $bestscore = $score;
      }
      return $best;
   });
   # 3: Give it an output
   # These are alternatives:

   # An asynchronous output
   my $dbh = Database::Async->new( ... );
   $pipeline->set_output_async( async sub ($best) {
      await $dbh->do('INSERT INTO Results VALUES (?)', $best);
   });

   # A synchronous output
   $pipeline->set_output_sync( sub ($best) {
      print "MD5 minimized sort order is:\n";
      print " $_\n" for split m/\0/, $best;
   });
   # 4: Now start it running on some input values
   foreach my $url ( slurp_lines("urls.txt") ) {
      await $pipeline->push_input( $url );
   }

   # 5: Wait for it all to finish
   await $pipeline->drain;
DESCRIPTION
Instances of this class implement a "pipeline", a sequence of data-processing stages. Each stage is represented by a function that is passed a single argument and should return a result. The pipeline itself stores a function that will be passed each eventual result.
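As a minimal sketch (the doubling and printing stages here are purely illustrative, and since every stage is synchronous the futures returned by push_input should already be complete when ->get is called):

   use v5.36;
   use Future::Workflow::Pipeline;

   my $pipeline = Future::Workflow::Pipeline->new;

   # A trivial in-process stage that doubles each number it receives
   $pipeline->append_stage_sync( sub ($n) { return 2 * $n } );

   # Each final result is handed to the output function
   $pipeline->set_output_sync( sub ($result) { print "Got $result\n" } );

   # Feed in some inputs; each returned future completes once the item
   # has been accepted by the pipeline
   $pipeline->push_input( $_ )->get for 1 .. 5;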
Queueing
In front of every stage there exists a queue of pending items. If the first stage is currently busy when "push_input" is called, the item is accepted into its queue instead. Items will be taken from the queue in the order they were pushed when the stage's work function finishes with prior items.
If the queue between stages is full, then items will remain pending in prior stages. Ultimately this back-pressure will make its way back to the "push_input" method at the beginning of the pipeline.
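For instance, a stage might be given a deliberately small queue (a sketch; slow_work() is a hypothetical asynchronous function returning a Future):

   use Future::AsyncAwait;

   # Once two items are already queued for this slow stage, earlier
   # stages back up and eventually push_input's future stays pending
   $pipeline->append_stage( async sub ($item) {
      return await slow_work( $item );
   }, max_queue => 2 );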
CONSTRUCTOR
   $pipeline = Future::Workflow::Pipeline->new;
The constructor takes no additional parameters.
METHODS
set_output
   $pipeline->set_output( $code );

   await $code->( $result );

Sets the destination output for the pipeline. Each completed work item will be passed to this function, which is expected to return a Future.
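For example (a sketch; store_result() is a hypothetical asynchronous function that returns a Future):

   use Future::AsyncAwait;

   $pipeline->set_output( async sub ($result) {
      # store_result() is hypothetical; the async sub returns a future
      # that completes once the result has been stored
      await store_result( $result );
   });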
set_output_sync
   $pipeline->set_output_sync( $code );

   $code->( $result );

Similar to "set_output", except that the output function is called synchronously, returning when it has finished.
append_stage
   $pipeline->append_stage( $code, %args );

   $result = await $code->( $item );

Appends a pipeline stage that is implemented by an asynchronous function. Each work item will be passed in by invoking the function, which is expected to return a Future that will eventually yield the result of that stage.
The following optional named arguments are recognised; an illustrative sketch follows the list.
- concurrent => NUM
  Allow this number of outstanding items concurrently.
- max_queue => NUM
  If defined, no more than this number of items can be enqueued for this stage. If undefined, no limit is applied.
  This value can be zero, in which case any attempt to push more items will remain pending until the work function is free to deal with them; i.e. no queueing is permitted at all.
- on_failure => CODE
  $on_failure->( $f )
  Provides a callback event function for handling a failure thrown by the stage code. If not provided, the default behaviour is to print the failure message as a warning.
  Note that this handler cannot turn a failure into a successful result, nor otherwise resume or change the behaviour of the pipeline; any error-correction has to be handled inside the stage function itself. This handler is purely the last stop of error handling, informing the user of an otherwise-unhandled error before it is discarded.
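As an illustrative sketch (process_item() is a hypothetical asynchronous function returning a Future), a stage using all three arguments might look like:

   use Future::AsyncAwait;

   $pipeline->append_stage( async sub ($item) {
      return await process_item( $item );
   },
      concurrent => 4,     # up to 4 items may be in flight in this stage
      max_queue  => 10,    # at most 10 further items may wait in its queue
      on_failure => sub ($f) {
         # $f is the failed future; report it, then the item is discarded
         warn "process_item failed: " . $f->failure;
      },
   );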
append_stage_sync
   $pipeline->append_stage_sync( $code, %args );

   $result = $code->( $item );

Similar to "append_stage", except that the stage function is called synchronously, returning its result directly. Because of this, the concurrent named parameter is not permitted.
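For example (a sketch; the transformation here is arbitrary):

   $pipeline->append_stage_sync( sub ($item) {
      # Runs in-process; the next item is not started until this returns
      return uc $item;
   });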
push_input
   await $pipeline->push_input( $item );
Adds a new work item into the pipeline, which will pass through each of the stages and eventually invoke the output function.
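Awaiting the returned future applies the back-pressure described in "Queueing". A producer loop might look like the following sketch, which assumes $pipeline and the item list are in scope and that Future::AsyncAwait is used (await must appear inside an async sub):

   use Future::AsyncAwait;

   async sub feed_pipeline ( @items ) {
      foreach my $item ( @items ) {
         # Stays pending while the pipeline's queues are full
         await $pipeline->push_input( $item );
      }
   }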
UNSOLVED QUESTIONS
Is each work item represented by some object that gets passed around?
Can we store context, maybe in a hash or somesuch, that each stage can inspect and append more things into? It might be useful to remember at least the initial URLs by the time we generate the outputs.
Tuning parameters. In particular, being able to at least set overall concurrency of async and detached stages, the detachment model of the detached stages (threads vs. forks), inter-stage buffering?
AUTHOR
Paul Evans <leonerd@leonerd.org.uk>