NAME
Piper - Flexible, iterable pipeline engine with automatic batching
SYNOPSIS
    use Piper;
    my $pipeline = Piper->new(
        first_process => sub {
            my ($instance, $batch) = @_;
            $instance->emit( map { ... } @$batch );
        },
        second_processes => Piper->new(...),
        final_process => sub { ... },
    )->init;
    $pipeline->enqueue(@data);
    while ($pipeline->isnt_exhausted) {
        my $item = $pipeline->dequeue;
        ...
    }
DESCRIPTION
The software engineering concept known as a pipeline is a chain of processing segments, arranged such that the output of each segment is the input of the next.
Piper is a pipeline builder. It composes arbitrary processing segments into a single pipeline instance with the following features:
- Pipeline instances are iterators, only processing data as needed.
- Data is automatically processed in batches for each segment (with configurable batch sizes).
- Built-in support exists for non-linear and/or recursive pipelines.
- Processing segments are pluggable and reusable.
CONSTRUCTOR
new(@segments)
Create a container pipeline segment (parent) from the provided child @segments.
Additionally, a single hashref of attributes for the container/parent segment may be included as an argument to the constructor (anywhere in the argument list). See the "SEGMENT ATTRIBUTES" section for a description of attributes available for both parent and child segments.
Accepted segment types are as follows:
- Piper object
  Creates a sub-container of pipeline segments. There is no (explicit) limit to the number of nested containers a pipeline may contain.
- Piper::Process object
  See the "PROCESS HANDLER" section for a description of Piper::Process objects.
- A coderef (which will be coerced into a Piper::Process object).
- A hashref that can be coerced into a Piper::Process object.
  In order to be considered a candidate for coercion, the hashref must contain (at a minimum) the 'handler' key.
- Piper::Instance object
  In this case, the associated Piper or Piper::Process object is extracted from the Piper::Instance object for use in the new pipeline segment.
  See "INITIALIZATION" for a description of Piper::Instance objects.
- A $label => $segment pair
  For such pairs, the $segment can be any of the above segment types, and $label is a simple scalar which will be used as $segment's label. If the $segment already has a label, $label will override it.
Constructor Example
    my $pipe = Piper->new(
        \%main_opts,
        subpipe_label => Piper->new(
            first_handler => Piper::Process->new(sub { ... }),
            second_handler => sub { ... },
            third_handler => {
                handler => sub { ... },
            },
            another_subpipe => Piper->new(...),
            \%subpipe_opts,
        ),
        Piper::Process->new({
            label => 'another_handler',
            handler => sub { ... },
        }),
        sub {
            # An un-labeled handler
            ...
        },
        {
            label => 'final_handler',
            handler => sub { ... },
        },
    );
INITIALIZATION
Piper segments were designed to be easily reusable. Prior to initialization, Piper and Piper::Process objects do not process data; they simply contain the blueprint for creating the pipeline. As such, blueprints for commonly-used pipeline segments can be stored in package libraries and imported wherever needed.
To create a functioning pipeline from one such blueprint, simply call the init method on the outermost segment. The init method returns a Piper::Instance object of the outermost segment, which is the realization of the pipeline design, and which contains Piper::Instance objects created from all its contained segments.
Initialization fuses the pipeline segments together, establishes the relationships between the segments, and initializes the dataflow infrastructure.
The init method may be chained from the constructor if the blueprint object is not needed:
    my $instance = Piper->new(...)->init;
Any arguments passed to the init method will be cached and made available to each handler in the pipeline (see the "PROCESS HANDLER" section for a full description of handlers). This is a great way to share a resource (such as a database handle) among process handlers.
    my $pipe = Piper->new(
        query => sub {
            my ($instance, $batch, $dbh) = @_;
            $instance->emit(
                $dbh->do_query(@$batch)
            );
        },
        ...
    );
    my $instance = $pipe->init($dbh);
Instances are ready to accept data for processing:
    $instance->enqueue(@data);
    while ($instance->isnt_exhausted) {
        my $result = $instance->dequeue;
    }
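
As a minimal sketch of blueprint reuse, the example below initializes one blueprint twice with different init arguments and runs each instance separately. The label scale, the helper run_pipeline, and the variable names are illustrative assumptions, not part of Piper.

```perl
use strict;
use warnings;
use Piper;

# Blueprint only: nothing is processed until init is called.
my $blueprint = Piper->new(
    scale => sub {
        # $factor is whatever was passed to init (an assumption of this sketch)
        my ($instance, $batch, $factor) = @_;
        $instance->emit( map { $_ * $factor } @$batch );
    },
);

# Two instances created from the same blueprint, each with its own argument.
my $doubler = $blueprint->init(2);
my $tripler = $blueprint->init(3);

# Hypothetical helper: enqueue the data, then collect every result.
sub run_pipeline {
    my ($instance, @data) = @_;
    $instance->enqueue(@data);
    my @results;
    while ($instance->isnt_exhausted) {
        my $item = $instance->dequeue;
        push @results, $item;
    }
    return @results;
}

my @doubled = run_pipeline($doubler, 1, 2, 3);
my @tripled = run_pipeline($tripler, 1, 2, 3);
```

Keeping the blueprint in its own variable (or package) is what makes the second init possible; chaining init directly from the constructor discards the blueprint.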
PROCESS HANDLER
Piper::Process objects have the same "SEGMENT ATTRIBUTES" as Piper objects, but have one additional required attribute: the handler.
A process handler is the data-processing subroutine for the segment.
In its simplest form, the process handler takes input from the previous pipeline segment, processes it, and passes it on to the next segment; but handlers also have built-in support for non-linear and recursive dataflow (see "FLOW CONTROL").
The arguments provided to the handler subroutine are:
- $instance
  The instance (a Piper::Instance object) corresponding to the segment.
- $batch
  An arrayref of data items to process.
- @args
  Any arguments provided to the init method during the "INITIALIZATION" of the pipeline.
After processing a batch of data, the handler may pass the results to the next segment using the emit method called from the handler's $instance.
Example:
    sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { ... } @$batch );
    }
FLOW CONTROL
Since Piper has built-in support for non-linear and/or recursive pipelines, a "PROCESS HANDLER" may send data to any other segment in the pipeline, including itself.
The following methods may be called from the $instance object passed as the first argument to a handler:
emit(@data)
Send @data to the next segment in the pipeline. If the instance is the last in the pipeline, emits to the drain, making the @data ready for dequeue.
recycle(@data)
Re-queue @data to the top of the current segment in an order such that dequeue(1) would subsequently return $data[0] and so forth.
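
The following is a small sketch of recursive dataflow with recycle, assuming a handler that keeps halving even numbers until they become odd. The label reduce and the sample data are illustrative. The sketch sorts the results rather than assuming an output order.

```perl
use strict;
use warnings;
use Piper;

my $pipe = Piper->new(
    reduce => sub {
        my ($instance, $batch) = @_;
        for my $n (@$batch) {
            if ($n % 2 == 0) {
                # Not finished yet: send the halved value back through this segment.
                # (A zero would recycle forever, so the sample data avoids it.)
                $instance->recycle($n / 2);
            }
            else {
                # Odd numbers are done and move on to the next segment (here, the drain).
                $instance->emit($n);
            }
        }
    },
)->init;

$pipe->enqueue(8, 12, 7);

my @results;
while ($pipe->isnt_exhausted) {
    my $item = $pipe->dequeue;
    push @results, $item;
}
my @sorted = sort { $a <=> $b } @results;
```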
injectAt($location, @data)
injectAfter($location, @data)
Send @data to the segment at or after the specified $location.
For each of the above methods, $location must be the label of a segment in the pipeline or a path-like representation of a hierarchy of labels.
For example, in the following pipeline, a few possible $location values include a, subpipe/b, or main/subpipe/c.
    my $pipe = Piper->new(
        { label => 'main' },
        subpipe => Piper->new(
            a => sub { ... },
            b => sub { ... },
            c => sub { ... },
        ),
    );
If a label is unique within the pipeline, only the label is required. For non-unique labels, searches are performed in a nearest-neighbor, depth-first manner.
For example, in the following pipeline, searching for processA from the handler of processB would find main/pipeA/processA, not main/processA. So to reach main/processA from processB, the handler would need to search for main/processA.
    my $pipe = Piper->new(
        { label => 'main' },
        pipeA => Piper->new(
            processA => sub { ... },
            processB => sub { ... },
        ),
        processA => sub { ... },
    );
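
A hedged sketch of non-linear flow with injectAfter: words that are already uppercase bypass an upcasing segment and rejoin the pipeline right after it. The labels triage, upcase, and tag, and the @seen_by_upcase bookkeeping array, are hypothetical names chosen for this illustration.

```perl
use strict;
use warnings;
use Piper;

my @seen_by_upcase;    # records what the 'upcase' handler actually receives

my $pipe = Piper->new(
    { label => 'main' },
    triage => sub {
        my ($instance, $batch) = @_;
        for my $word (@$batch) {
            if ($word eq uc $word) {
                # Already uppercase: skip 'upcase' and continue after it.
                $instance->injectAfter('upcase', $word);
            }
            else {
                $instance->emit($word);
            }
        }
    },
    upcase => sub {
        my ($instance, $batch) = @_;
        push @seen_by_upcase, @$batch;
        $instance->emit( map { uc $_ } @$batch );
    },
    tag => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { sprintf '<%s>', $_ } @$batch );
    },
)->init;

$pipe->enqueue(qw(foo BAR baz));

my @results;
while ($pipe->isnt_exhausted) {
    my $item = $pipe->dequeue;
    push @results, $item;
}
my @sorted_results = sort @results;
my @sorted_seen    = sort @seen_by_upcase;
```

Because the label upcase is unique in this pipeline, the bare label is enough; a full path such as main/upcase should resolve to the same segment.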
inject(@data)
If the segment has a parent, enqueues @data to its parent. Otherwise, enqueues @data to itself.
eject(@data)
If the segment has a parent, sends @data to the drain of its parent. Otherwise, enqueues @data to the segment's drain.
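
As an illustrative sketch of eject, the sub-pipeline below inverts numbers, and a guard segment ejects zeros so that they leave the sub-pipeline before the division happens. The labels math, guard, invert, and render are assumptions made for the example. Results are sorted rather than relying on output order.

```perl
use strict;
use warnings;
use Piper;

my $pipe = Piper->new(
    { label => 'main' },
    math => Piper->new(
        guard => sub {
            my ($instance, $batch) = @_;
            for my $n (@$batch) {
                if ($n == 0) {
                    # Send straight to the drain of the parent ('math'),
                    # which skips the 'invert' segment entirely.
                    $instance->eject($n);
                }
                else {
                    $instance->emit($n);
                }
            }
        },
        invert => sub {
            my ($instance, $batch) = @_;
            $instance->emit( map { 1 / $_ } @$batch );
        },
    ),
    render => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { sprintf '%.2f', $_ } @$batch );
    },
)->init;

$pipe->enqueue(4, 0, 2);

my @results;
while ($pipe->isnt_exhausted) {
    my $item = $pipe->dequeue;
    push @results, $item;
}
my @sorted = sort @results;
```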
SEGMENT ATTRIBUTES
All of the following attributes are available for both container (Piper) and processor (Piper::Process) segment types.
Each attribute is equipped with an accessor of the same name.
A star (*) indicates that the attribute is writable, and can be modified at runtime by passing a value as an argument to the method of the same name.
All attributes (except label) have an associated predicate method called has_$attribute which returns a boolean indicating whether the attribute has been set for the segment.
All writable attributes (indicated by *) can be cleared by passing an explicit undef to the writer method or by calling the appropriate clearer method called clear_$attribute.
All accessors, writers, predicates, and clearers are available for each segment before and after "INITIALIZATION".
allow
A coderef which can be used to subset the items which are allowed to be processed by the segment.
The coderef executes on each item attempting to queue to the segment. If it returns true, the item is queued. Otherwise, the item skips the segment and proceeds to the next adjacent segment.
Each item is localized to $_, and is also passed in as the first argument.
These example allow subroutines are equivalent:
    # This segment only accepts digit inputs
    allow => sub { /^\d+$/ }
    allow => sub { $_ =~ /^\d+$/ }
    allow => sub { $_[0] =~ /^\d+$/ }
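
To show allow in a complete pipeline, here is a minimal sketch in which only digit strings are doubled while everything else skips the segment. The label double_numbers and the sample data are illustrative. The results are sorted because skipped and processed items may not come out in their input order.

```perl
use strict;
use warnings;
use Piper;

my $pipe = Piper->new(
    double_numbers => {
        # Only items made entirely of digits are queued to this segment.
        allow   => sub { /^\d+$/ },
        handler => sub {
            my ($instance, $batch) = @_;
            $instance->emit( map { $_ * 2 } @$batch );
        },
    },
)->init;

$pipe->enqueue(1, 'a', 2, 'b');

my @results;
while ($pipe->isnt_exhausted) {
    my $item = $pipe->dequeue;
    push @results, $item;
}
my @sorted = sort @results;    # string sort: digits before letters
```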
*batch_size
The number of items to process at a time for the segment.
Once initialized (see "INITIALIZATION"), a segment inherits the batch_size of any existing parent(s) if not provided. If the segment has no parents, or if none of the parents have a batch_size defined, the default batch_size will be used. The default batch_size is 200, but can be configured in the import statement (see the "GLOBAL CONFIGURATION" section).
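
The inheritance rule can be observed with a short sketch that records the size of every batch each handler receives. The labels inherits and overrides and the two bookkeeping arrays are hypothetical. The sketch only relies on batches never exceeding the effective batch_size.

```perl
use strict;
use warnings;
use Piper;

my (@inherited_sizes, @overridden_sizes);

my $pipe = Piper->new(
    { batch_size => 3 },    # set on the parent container
    inherits => sub {
        # No batch_size of its own, so the parent's value should apply.
        my ($instance, $batch) = @_;
        push @inherited_sizes, scalar @$batch;
        $instance->emit(@$batch);
    },
    overrides => {
        batch_size => 1,    # local setting takes precedence
        handler    => sub {
            my ($instance, $batch) = @_;
            push @overridden_sizes, scalar @$batch;
            $instance->emit(@$batch);
        },
    },
)->init;

$pipe->enqueue(1 .. 6);
$pipe->flush;    # process until nothing is pending

# Largest batch each handler saw
my ($max_inherited)  = sort { $b <=> $a } @inherited_sizes;
my ($max_overridden) = sort { $b <=> $a } @overridden_sizes;
```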
*debug
The debug level for the segment.
Once initialized (see "INITIALIZATION"), a segment inherits the debug level of any existing parent(s) if not specified. The default level is 0, but can be globally overridden by the environment variable PIPER_DEBUG.
See the "LOGGING AND DEBUGGING" section for specifics about debug and verbosity levels.
*enabled
A boolean indicating that the segment is enabled and can accept items for processing.
Once initialized (see "INITIALIZATION"), a segment inherits this attribute from any existing parent(s). The default is true.
If a segment is disabled (enabled = 0), all items attempting to queue to the segment are forwarded to the next adjacent segment.
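
A rough sketch of toggling a segment, assuming the enabled writer can be called on the instance returned by find_segment as described for writable attributes. The label shout and the sample strings are illustrative.

```perl
use strict;
use warnings;
use Piper;

my $pipe = Piper->new(
    { label => 'main' },
    shout => {
        enabled => 0,    # start out disabled
        handler => sub {
            my ($instance, $batch) = @_;
            $instance->emit( map { uc $_ } @$batch );
        },
    },
)->init;

# While disabled, items are forwarded past the segment untouched.
$pipe->enqueue('quiet');
my $first = $pipe->dequeue;

# Re-enable the segment at runtime, then send another item through.
$pipe->find_segment('shout')->enabled(1);
$pipe->enqueue('loud');
my $second = $pipe->dequeue;
```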
label
A label for the segment. If no label is provided, a globally unique ID will be used.
Labels are necessary for certain types of "FLOW CONTROL" (for example, injectAt or injectAfter). For pipelines that do not utilize "FLOW CONTROL" features, labels are primarily useful for "LOGGING AND DEBUGGING".
*verbose
The verbosity level for the segment.
Once initialized (see "INITIALIZATION"), a segment inherits the verbosity level of any existing parent(s) if not specified. The default level is 0, but can be globally overridden by the environment variable PIPER_VERBOSE.
See the "LOGGING AND DEBUGGING" section for specifics about debug and verbosity levels.
INSTANCE ATTRIBUTES
The following attributes have read-only accessors (of the same name).
children
For container instances (made from Piper objects, not Piper::Process objects), holds an arrayref of the contained instance objects.
main
For any instance in the pipeline, this attribute holds a reference to the outermost container instance.
parent
For all instances in the pipeline except the outermost container (main), this attribute holds a reference to the instance's immediate container segment.
path
The full path to the instance, built by joining the labels of all its parents and the instance's own label with /. Instances stringify to this attribute.
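
The instance attributes can be explored with a small sketch like the one below, which looks up a nested segment and stringifies it and its containers. The labels main, subpipe, and a are taken from the earlier $location example; the variable names are illustrative.

```perl
use strict;
use warnings;
use Piper;

my $main = Piper->new(
    { label => 'main' },
    subpipe => Piper->new(
        a => sub {
            my ($instance, $batch) = @_;
            $instance->emit(@$batch);
        },
    ),
)->init;

my $segment = $main->find_segment('subpipe/a');

# Instances stringify to their path.
my $segment_string = "$segment";

# parent is the immediate container; main is the outermost container.
my $parent        = $segment->parent;
my $parent_string = "$parent";
my $top           = $segment->main;
my $top_string    = "$top";

# children is an arrayref of the contained instances.
my $child_count = scalar @{ $main->children };
```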
INSTANCE METHODS
Methods marked with a (*) should only be called from the outermost instance.
*dequeue([$num])
Remove at most $num (default 1) processed items from the end of the pipeline.
*enqueue(@data)
Queue @data for processing by the pipeline.
find_segment($location)
Find and return the segment instance according to $location, which can be a label or a path-like hierarchy of labels. See injectAfter for a detailed description of $location.
*flush
Process batches until there are no more items pending.
has_children
A boolean indicating whether the instance has any children.
has_parent
A boolean indicating whether the instance has a parent.
has_pending
Returns a boolean indicating whether there are any items that are queued at some level of the segment but have not completed processing.
*is_exhausted
Returns a boolean indicating whether there are any items left to process or dequeue.
*isnt_exhausted
Returns the opposite of is_exhausted.
next_segment
Returns the next adjacent segment from the calling segment. Returns undef for the outermost container.
pending
Returns the number of items that are queued at some level of the pipeline segment but have not completed processing.
*prepare([$num])
Process batches while data is still pending until at least $num (default 1) items are ready for dequeue.
ready
Returns the number of items that have finished processing and are ready for dequeue from the pipeline segment.
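
The counters and the prepare and flush methods can be watched together in a sketch such as the following. The label passthrough, the batch_size of 2, and the variable names are assumptions made for illustration.

```perl
use strict;
use warnings;
use Piper;

my $pipe = Piper->new(
    { batch_size => 2 },
    passthrough => sub {
        my ($instance, $batch) = @_;
        $instance->emit(@$batch);
    },
)->init;

$pipe->enqueue(1 .. 5);

# Nothing has been processed yet: everything is pending, nothing is ready.
my $pending_before = $pipe->pending;
my $ready_before   = $pipe->ready;

# Process batches until at least 3 items are ready for dequeue.
$pipe->prepare(3);
my $ready_after_prepare = $pipe->ready;

# Process whatever is still pending.
$pipe->flush;
my $ready_after_flush   = $pipe->ready;
my $pending_after_flush = $pipe->pending;
```

Calling prepare with a target count lets a consumer pull work through the pipeline in chunks instead of flushing everything at once.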
GLOBAL CONFIGURATION
The following global attributes are configurable from the Piper import statement.
Ex:
    # Change the default batch_size to 50
    use Piper batch_size => 50;
batch_size
The default batch size used by pipeline segments which do not have a locally defined batch_size and do not have a parent segment with a defined batch_size.
The batch_size attribute must be a positive integer.
The default batch_size is 200.
LOGGING AND DEBUGGING
Logging and debugging facilities are available upon "INITIALIZATION" of a pipeline.
Warnings and errors are issued regardless of debug and verbosity levels via carp and croak from the Carp module, and are therefore configurable with any of Carp's global options or environment variables.
Debugging and/or informational messages are printed to STDERR if debug and/or verbosity levels have been set. There are three levels used by Piper for each of debug/verbose: 0, 1, or 2. The default is 0 (off).
Levels
Levels can be set by any of the following mechanisms: at construction of the Piper/Piper::Process objects, dynamically via the debug and verbose methods of segments, or with the environment variables PIPER_DEBUG and PIPER_VERBOSE.
Levels can be set local to specific segments. The default levels of a sub-segment are inherited from its parent.
Ex:
    # main                 verbose => 0 (default)
    # main/subpipe         verbose => 1
    # main/subpipe/normal  verbose => 1 (inherited)
    # main/subpipe/loud    verbose => 2
    # main/subpipe/quiet   verbose => 0
    my $pipe = Piper->new(
        { label => 'main' },
        subpipe => Piper->new(
            { verbose => 1 },
            normal => sub {...},
            loud => {
                verbose => 2,
                handler => sub {...},
            },
            quiet => {
                verbose => 0,
                handler => sub {...},
            },
        ),
    );
Levels set via the environment variables PIPER_DEBUG and PIPER_VERBOSE are global. If set, these environment variables override any and all settings defined in the source code.
Messages
All messages include information about the segment which called the logger.
Existing informational (verbose or debug > 0) messages describe data processing steps, such as noting when items are queueing or being processed by specific segments. Increasing the level(s) > 1 simply adds more detail to the printed messages.
Existing debug messages describe the decision actions of the pipeline engine itself. Examples include logging its search steps when locating a named segment or explaining how it chooses which batch to process. Increasing the debug level > 1 simply adds more detail to the printed messages.
Custom messaging
User-defined errors, warnings, and debug or informational messages can use the same logging system as Piper itself.
The first argument passed to a "PROCESS HANDLER" is the Piper::Instance object associated with that segment, which has the below-described methods available for logging, debugging, warning, or throwing errors.
In each of the below methods, the @items are optional and only printed if the verbosity level for the segment is > 1. They can be used to pass additional context or detail about the data being processed or about what caused the message to print (for conditional messages).
The built-in messaging only uses debug/verbosity levels 1 and 2, but no maximum level is enforced, so users may make custom messages conditional on higher levels.
ERROR($message, [@items])
Throws an error with $message via croak.
WARN($message, [@items])
Issues a warning with $message via carp.
INFO($message, [@items])
Prints an informational $message to STDERR if either the debug or verbosity level for the segment is > 0.
DEBUG($message, [@items])
Prints a debug $message to STDERR if the debug level for the segment is > 0.
Example:
    my $pipe = Piper->new(
        messenger => sub {
            my ($instance, $batch) = @_;
            for my $data (@$batch) {
                if ($data->is_bad) {
                    $instance->ERROR("Data <$data> is bad!");
                }
            }
            # User-heightened verbosity level
            $instance->INFO('Data all good!', @$batch)
                if $instance->verbose > 2;
            ...
        },
        ...
    );
ACKNOWLEDGEMENTS
Much of the concept and API for this project was inspired by the work of Nathaniel Pierce.
Special thanks to Tim Heaney for his encouragement and mentorship.
VERSION
version 0.03
AUTHOR
Mary Ehlers <ehlers@cpan.org>
CONTRIBUTOR
Tim Heaney <oylenshpeegul@gmail.com>
COPYRIGHT AND LICENSE
This software is Copyright (c) 2017 by Mary Ehlers.
This is free software, licensed under:
The Apache License, Version 2.0, January 2004