NAME

Piper - Flexible, iterable pipeline engine with automatic batching

SYNOPSIS

use Piper;

my $pipeline = Piper->new(
    first_process => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { ... } @$batch );
    },
    second_processes => Piper->new(...),
    final_process => sub { ... },
)->init;

$pipeline->enqueue(@data);

while ($pipeline->isnt_exhausted) {
    my $item = $pipeline->dequeue;
    ...
}

DESCRIPTION

The software engineering concept known as a pipeline is a chain of processing segments, arranged such that the output of each segment is the input of the next.

Piper is a pipeline builder. It composes arbitrary processing segments into a single pipeline instance with the following features:

  • Pipeline instances are iterators, only processing data as needed.

  • Data is automatically processed in batches for each segment (with configurable batch sizes).

  • Built-in support exists for non-linear and/or recursive pipelines.

  • Processing segments are pluggable and reusable.

CONSTRUCTOR

new(@segments)

Create a container pipeline segment (parent) from the provided child @segments.

Additionally, a single hashref of attributes for the container/parent segment may be included as an argument to the constructor (anywhere in the argument list). See the "SEGMENT ATTRIBUTES" section for a description of attributes available for both parent and child segments.

Accepted segment types are as follows:

Piper object

Creates a sub-container of pipeline segments. There is no (explicit) limit to the number of nested containers a pipeline may contain.

Piper::Process object

See the "PROCESS HANDLER" section for a description of Piper::Process objects.

A coderef (which will be coerced into a Piper::Process object).

A hashref that can be coerced into a Piper::Process object.

In order to be considered a candidate for coercion, the hashref must contain (at a minimum) the 'handler' key.

Piper::Instance object

In this case, the associated Piper or Piper::Process object is extracted from the Piper::Instance object for use in the new pipeline segment.

See "INITIALIZATION" for a description of Piper::Instance objects.

A $label => $segment pair

For such pairs, the $segment can be any of the above segment types, and $label is a simple scalar which will be used as $segment's label.

If the $segment already has a label, $label will override it.

Constructor Example

my $pipe = Piper->new(
    \%main_opts,
    subpipe_label => Piper->new(
        first_handler => Piper::Process->new(sub { ... }),
        second_handler => sub { ... },
        third_handler => {
            handler => sub { ... },
        },
        another_subpipe => Piper->new(...),
        \%subpipe_opts,
    ),
    Piper::Process->new({
        label => 'another_handler',
        handler => sub { ... },
    }),
    sub {
        # An un-labeled handler
        ...
    },
    {
        label => 'final_handler',
        handler => sub { ... },
    },
);

INITIALIZATION

Piper segments were designed to be easily reusable. Prior to initialization, Piper and Piper::Process objects do not process data; they simply contain the blueprint for creating the pipeline. As such, blueprints for commonly-used pipeline segments can be stored in package libraries and imported wherever needed.

To create a functioning pipeline from one such blueprint, simply call the init method on the outermost segment. The init method returns a Piper::Instance object of the outermost segment, which is the realization of the pipeline design, and which contains Piper::Instance objects created from all its contained segments.

Initialization fuses the pipeline segments together, establishes the relationships between the segments, and initializes the dataflow infrastructure.

The init method may be chained from the constructor if the blueprint object is not needed:

my $instance = Piper->new(...)->init;

Any arguments passed to the init method will be cached and made available to each handler in the pipeline (see the "PROCESS HANDLER" section for a full description of handlers). This is a convenient way to share a resource (such as a database handle) among process handlers.

my $pipe = Piper->new(
    query => sub {
        my ($instance, $batch, $dbh) = @_;
        $instance->emit(
            $dbh->do_query(@$batch)
        );
    },
    ...
);
my $instance = $pipe->init($dbh);

Instances are ready to accept data for processing:

$instance->enqueue(@data);
while ($instance->isnt_exhausted) {
    my $result = $instance->dequeue;
}
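
As a runnable variation on the above, the following minimal sketch shares a plain hashref instead of a database handle. The lookup table, the 'lookup' label, and the sample data are illustrative assumptions; only new, init, emit, enqueue, isnt_exhausted, and dequeue come from this documentation.

```perl
use strict;
use warnings;
use Piper;

# Hypothetical shared resource: a lookup table standing in for a database handle.
my %name_for = (1 => 'one', 2 => 'two');

my $instance = Piper->new(
    lookup => sub {
        # $names is the argument that was passed to init below
        my ($instance, $batch, $names) = @_;
        $instance->emit( map { $names->{$_} // 'unknown' } @$batch );
    },
)->init(\%name_for);

$instance->enqueue(1, 2, 3);

my @results;
while ($instance->isnt_exhausted) {
    push @results, $instance->dequeue;
}
print join(', ', @results), "\n";
```

Every handler in the pipeline would receive the same \%name_for reference as its third argument.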

PROCESS HANDLER

Piper::Process objects have the same "SEGMENT ATTRIBUTES" as Piper objects, plus one additional required attribute: the handler.

A process handler is the data-processing subroutine for the segment.

In its simplest form, the process handler takes input from the previous pipeline segment, processes it, and passes it on to the next segment; but handlers also have built-in support for non-linear and recursive dataflow (see "FLOW CONTROL").

The arguments provided to the handler subroutine are:

$instance

The instance (a Piper::Instance object) corresponding to the segment.

$batch

An arrayref of data items to process.

@args

Any arguments provided to the init method during the "INITIALIZATION" of the pipeline.

After processing a batch of data, the handler may pass the results to the next segment using the emit method called from the handler's $instance.

Example:

sub {
    my ($instance, $batch) = @_;
    $instance->emit( map { ... } @$batch );
}

FLOW CONTROL

Since Piper has built-in support for non-linear and/or recursive pipelines, a "PROCESS HANDLER" may send data to any other segment in the pipeline, including itself.

The following methods may be called from the $instance object passed as the first argument to a handler:

emit(@data)

Send @data to the next segment in the pipeline. If the instance is the last in the pipeline, emits to the drain, making the @data ready for dequeue.

recycle(@data)

Re-queue @data to the top of the current segment in an order such that dequeue(1) would subsequently return $data[0] and so forth.
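
A minimal sketch of recycle, assuming a single hypothetical segment labeled 'halve' that keeps re-queueing even numbers to itself until they become odd:

```perl
use strict;
use warnings;
use Piper;

my $instance = Piper->new(
    halve => sub {
        my ($instance, $batch) = @_;
        for my $n (@$batch) {
            if ($n % 2 == 0) {
                # Not finished yet: send the halved value back to this segment
                $instance->recycle($n / 2);
            }
            else {
                # Odd values move on to the next segment (here, the drain)
                $instance->emit($n);
            }
        }
    },
)->init;

$instance->enqueue(8, 12, 5);

my @odd;
while ($instance->isnt_exhausted) {
    push @odd, $instance->dequeue;
}
print join(' ', sort { $a <=> $b } @odd), "\n";
```

The results are sorted before printing because the order in which recycled items finish depends on how they are batched.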

injectAt($location, @data)

injectAfter($location, @data)

Send @data to the segment at or after the specified $location.

For each of the above methods, $location must be the label of a segment in the pipeline or a path-like representation of a hierarchy of labels.

For example, in the following pipeline, a few possible $location values include a, subpipe/b, or main/subpipe/c.

my $pipe = Piper->new(
    { label => 'main' },
    subpipe => Piper->new(
        a => sub { ... },
        b => sub { ... },
        c => sub { ... },
    ),
);

If a label is unique within the pipeline, only the label is required. For non-unique labels, searches are performed in a nearest-neighbor, depth-first manner.

For example, in the following pipeline, searching for processA from the handler of processB would find main/pipeA/processA, not main/processA. So to reach main/processA from processB, the handler would need to search for main/processA.

my $pipe = Piper->new(
    { label => 'main' },
    pipeA => Piper->new(
        processA => sub { ... },
        processB => sub { ... },
    ),
    processA => sub { ... },
);
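
The following is a minimal sketch of non-linear flow with injectAfter. The labels route, double, and tag, and the sample data, are hypothetical; non-numeric items bypass the double segment.

```perl
use strict;
use warnings;
use Piper;

my $instance = Piper->new(
    { label => 'main' },
    route => sub {
        my ($instance, $batch) = @_;
        for my $item (@$batch) {
            if ($item =~ /^\d+$/) {
                # Numbers continue to the next segment, 'double'
                $instance->emit($item);
            }
            else {
                # Everything else skips 'double' and lands in 'tag'
                $instance->injectAfter('double', $item);
            }
        }
    },
    double => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { $_ * 2 } @$batch );
    },
    tag => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { "<$_>" } @$batch );
    },
)->init;

$instance->enqueue(1, 'x', 2);

my @tagged;
while ($instance->isnt_exhausted) {
    push @tagged, $instance->dequeue;
}
print join(' ', sort @tagged), "\n";
```

In this layout, injectAt('tag', $item) should reach the same segment, since tag is the segment that follows double.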

inject(@data)

If the segment has a parent, enqueues @data to its parent. Otherwise, enqueues @data to itself.

eject(@data)

If the segment has a parent, sends @data to the drain of its parent. Otherwise, enqueues @data to the segment's drain.
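
A minimal sketch of eject, assuming two hypothetical segments whose parent is the outermost container, so that ejected items go straight to the pipeline's drain and skip the remaining segment:

```perl
use strict;
use warnings;
use Piper;

my $instance = Piper->new(
    validate => sub {
        my ($instance, $batch) = @_;
        for my $n (@$batch) {
            if ($n < 0) {
                # Bypass 'root': send a note to the parent's drain
                $instance->eject("skipped $n");
            }
            else {
                $instance->emit($n);
            }
        }
    },
    root => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { sqrt($_) } @$batch );
    },
)->init;

$instance->enqueue(4, -1, 9);

my @out;
while ($instance->isnt_exhausted) {
    push @out, $instance->dequeue;
}
print join(', ', sort @out), "\n";
```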

SEGMENT ATTRIBUTES

All of the following attributes are available for both container (Piper) and processor (Piper::Process) segment types.

Each attribute is equipped with an accessor of the same name.

A star (*) indicates that the attribute is writable, and can be modified at runtime by passing a value as an argument to the method of the same name.

All attributes (except label) have an associated predicate method called has_$attribute which returns a boolean indicating whether the attribute has been set for the segment.

All writable attributes (indicated by *) can be cleared by passing an explicit undef to the writer method or by calling the appropriate clearer method called clear_$attribute.

All accessors, writers, predicates, and clearers are available for each segment before and after "INITIALIZATION".

allow

A coderef which can be used to subset the items which are allowed to be processed by the segment.

The coderef executes on each item attempting to queue to the segment. If it returns true, the item is queued. Otherwise, the item skips the segment and proceeds to the next adjacent segment.

Each item is localized to $_, and is also passed in as the first argument.

These example allow subroutines are equivalent:

# This segment only accepts digit inputs
allow => sub { /^\d+$/ }
allow => sub { $_ =~ /^\d+$/ }
allow => sub { $_[0] =~ /^\d+$/ }
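
As a sketch of allow inside a complete pipeline (the labels and data are illustrative assumptions), items rejected by the double segment skip it and proceed to bracket unchanged:

```perl
use strict;
use warnings;
use Piper;

my $instance = Piper->new(
    double => {
        # Only digit-only items are processed by this segment
        allow   => sub { /^\d+$/ },
        handler => sub {
            my ($instance, $batch) = @_;
            $instance->emit( map { $_ * 2 } @$batch );
        },
    },
    bracket => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { "[$_]" } @$batch );
    },
)->init;

$instance->enqueue(1, 'a', 2);

my @items;
while ($instance->isnt_exhausted) {
    push @items, $instance->dequeue;
}
print join(' ', sort @items), "\n";
```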

*batch_size

The number of items to process at a time for the segment.

Once initialized (see "INITIALIZATION"), a segment without its own batch_size inherits that of any existing parent(s). If the segment has no parents, or if none of the parents has a batch_size defined, the default batch_size is used. The default batch_size is 200, but can be configured in the import statement (see the "GLOBAL CONFIGURATION" section).
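
The inheritance described above can be sketched as follows. The 'count' label and the @sizes bookkeeping are hypothetical; the child segment sets no batch_size of its own and so should receive batches of at most 2 items from its parent's setting.

```perl
use strict;
use warnings;
use Piper;

my @sizes;    # records how many items each handler call received

my $instance = Piper->new(
    { batch_size => 2 },          # attribute of the container segment
    count => sub {
        my ($instance, $batch) = @_;
        push @sizes, scalar @$batch;
        $instance->emit(@$batch);
    },
)->init;

$instance->enqueue(1 .. 5);
$instance->flush;                 # process until nothing is pending

print "batch sizes: @sizes\n";
```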

*debug

The debug level for the segment.

Once initialized (see "INITIALIZATION"), a segment inherits the debug level of any existing parent(s) if not specified. The default level is 0, but can be globally overridden by the environment variable PIPER_DEBUG.

See the "LOGGING AND DEBUGGING" section for specifics about debug and verbosity levels.

*enabled

A boolean indicating that the segment is enabled and can accept items for processing.

Once initialized (see "INITIALIZATION"), a segment inherits this attribute from any existing parent(s). The default is true.

If a segment is disabled (enabled = 0), all items attempting to queue to the segment are forwarded to the next adjacent segment.
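
A minimal sketch of a disabled segment, assuming hypothetical labels double and bracket. Because double is constructed with enabled => 0, items should be forwarded to bracket without being doubled.

```perl
use strict;
use warnings;
use Piper;

my $instance = Piper->new(
    double => {
        enabled => 0,             # segment is switched off
        handler => sub {
            my ($instance, $batch) = @_;
            $instance->emit( map { $_ * 2 } @$batch );
        },
    },
    bracket => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { "[$_]" } @$batch );
    },
)->init;

$instance->enqueue(1, 2);

my @seen;
while ($instance->isnt_exhausted) {
    push @seen, $instance->dequeue;
}
print join(' ', sort @seen), "\n";
```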

label

A label for the segment. If no label is provided, a globally unique ID will be used.

Labels are necessary for certain types of "FLOW CONTROL" (for example, injectAt or injectAfter). For pipelines that do not utilize "FLOW CONTROL" features, labels are primarily useful for "LOGGING AND DEBUGGING".

*verbose

The verbosity level for the segment.

Once initialized (see "INITIALIZATION"), a segment inherits the verbosity level of any existing parent(s) if not specified. The default level is 0, but can be globally overridden by the environment variable PIPER_VERBOSE.

See the "LOGGING AND DEBUGGING" section for specifics about debug and verbosity levels.

INSTANCE ATTRIBUTES

The following attributes have read-only accessors (of the same name).

children

For container instances (made from Piper objects, not Piper::Process objects), holds an arrayref of the contained instance objects.

main

For any instance in the pipeline, this attribute holds a reference to the outermost container instance.

parent

For all instances in the pipeline except the outermost container (main), this attribute holds a reference to the instance's immediate container segment.

path

The full path to the instance, built as the concatenation of all the parent(s) labels and the instance's label, joined by /. Instances stringify to this attribute.
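
The attributes above can be explored with a short sketch. The labels main, subpipe, a, and b are illustrative, and the handlers simply pass data through.

```perl
use strict;
use warnings;
use Piper;

my $instance = Piper->new(
    { label => 'main' },
    subpipe => Piper->new(
        a => sub { my ($instance, $batch) = @_; $instance->emit(@$batch) },
        b => sub { my ($instance, $batch) = @_; $instance->emit(@$batch) },
    ),
)->init;

# The outermost container has one child: the 'subpipe' container instance
my ($subpipe) = @{ $instance->children };

# Instances stringify to their path
my @paths       = map { "$_" } @{ $subpipe->children };
my $parent_path = '' . $subpipe->parent->path;
my $main_path   = '' . $subpipe->children->[0]->main->path;

print "$_\n" for "$subpipe", @paths;
```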

INSTANCE METHODS

Methods marked with a (*) should only be called from the outermost instance.

*dequeue([$num])

Remove at most $num (default 1) processed items from the end of the pipeline.

*enqueue(@data)

Queue @data for processing by the pipeline.

find_segment($location)

Find and return the segment instance according to $location, which can be a label or a path-like hierarchy of labels. See injectAfter for a detailed description of $location.

*flush

Process batches until there are no more items pending.

has_children

A boolean indicating whether the instance has any children.

has_parent

A boolean indicating whether the instance has a parent.

has_pending

Returns a boolean indicating whether there are any items that are queued at some level of the segment but have not completed processing.

*is_exhausted

Returns a boolean indicating whether there are any items left to process or dequeue.

*isnt_exhausted

Returns the opposite of is_exhausted.

next_segment

Returns the next adjacent segment from the calling segment. Returns undef for the outermost container.

pending

Returns the number of items that are queued at some level of the pipeline segment but have not completed processing.

*prepare([$num])

Process batches while data is still pending until at least $num (default 1) items are ready for dequeue.

ready

Returns the number of items that have finished processing and are ready for dequeue from the pipeline segment.
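
A minimal sketch of how these methods fit together on the outermost instance, assuming a hypothetical 'square' segment and a batch_size of 2 so that processing happens in several steps:

```perl
use strict;
use warnings;
use Piper;

my $instance = Piper->new(
    { batch_size => 2 },
    square => sub {
        my ($instance, $batch) = @_;
        $instance->emit( map { $_ ** 2 } @$batch );
    },
)->init;

$instance->enqueue(1 .. 5);
my @queued = ($instance->pending, $instance->ready);    # nothing processed yet

$instance->prepare(3);       # process batches until at least 3 items are ready
my $ready_after_prepare = $instance->ready;

$instance->flush;            # process everything still pending
my @flushed = ($instance->pending, $instance->ready);

my @squares = $instance->dequeue(5);
print "squares: @squares\n";
print "exhausted\n" if $instance->is_exhausted;
```

Calling dequeue in a loop guarded by isnt_exhausted is usually enough; prepare and flush are mainly useful when processing should happen ahead of retrieval.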

GLOBAL CONFIGURATION

The following global attributes are configurable from the Piper import statement.

Ex:
# Change the default batch_size to 50
use Piper batch_size => 50;

batch_size

The default batch size used by pipeline segments which do not have a locally defined batch_size and do not have a parent segment with a defined batch_size.

The batch_size attribute must be a positive integer.

The default batch_size is 200.

LOGGING AND DEBUGGING

Logging and debugging facilities are available upon "INITIALIZATION" of a pipeline.

Warnings and errors are issued regardless of debug and verbosity levels via carp and croak from the Carp module, and are therefore configurable with any of Carp's global options or environment variables.

Debugging and/or informational messages are printed to STDERR if debug and/or verbosity levels have been set. There are three levels used by Piper for each of debug/verbose: 0, 1, or 2. The default is 0 (off).

Levels

Levels can be set by any of the following mechanisms: at construction of the Piper/Piper::Process objects, dynamically via the debug and verbose methods of segments, or with the environment variables PIPER_DEBUG and PIPER_VERBOSE.

Levels can be set local to specific segments. The default levels of a sub-segment are inherited from its parent.

Ex:
    # main                 verbose => 0 (default)
    # main/subpipe         verbose => 1
    # main/subpipe/normal  verbose => 1 (inherited)
    # main/subpipe/loud    verbose => 2
    # main/subpipe/quiet   verbose => 0

    my $pipe = Piper->new(
        { label => 'main' },
        subpipe => Piper->new(
            { verbose => 1 },
            normal => sub {...},
            loud => {
                verbose => 2,
                handler => sub {...},
            },
            quiet => {
                verbose => 0,
                handler => sub {...},
            },
        ),
    );

Levels set via the environment variables PIPER_DEBUG and PIPER_VERBOSE are global. If set, these environment variables override any and all settings defined in the source code.

Messages

All messages include information about the segment which called the logger.

Existing informational (verbose or debug > 0) messages describe data processing steps, such as noting when items are queueing or being processed by specific segments. Increasing the level(s) > 1 simply adds more detail to the printed messages.

Existing debug messages describe the decision actions of the pipeline engine itself. Examples include logging its search steps when locating a named segment or explaining how it chooses which batch to process. Increasing the debug level > 1 simply adds more detail to the printed messages.

Custom messaging

User-defined errors, warnings, and debug or informational messages can use the same logging system as Piper itself.

The first argument passed to a "PROCESS HANDLER" is the Piper::Instance object associated with that segment, which has the below-described methods available for logging, debugging, warning, or throwing errors.

In each of the below methods, the @items are optional and only printed if the verbosity level for the segment is > 1. They can be used to pass additional context or detail about the data being processed or which caused the message to print (for conditional messages).

The built-in messaging only uses debug/verbosity levels 1 and 2, but no maximum debug/verbosity level is enforced, so users may require higher levels before printing any custom message.

ERROR($message, [@items])

Throws an error with $message via croak.

WARN($message, [@items])

Issues a warning with $message via carp.

INFO($message, [@items])

Prints an informational $message to STDERR if either the debug or verbosity level for the segment is > 0.

DEBUG($message, [@items])

Prints a debug $message to STDERR if the debug level for the segment is > 0.

Example:

my $pipe = Piper->new(
    messenger => sub {
        my ($instance, $batch) = @_;
        for my $data (@$batch) {
            if ($data->is_bad) {
                $instance->ERROR("Data <$data> is bad!");
            }
        }
        # User-heightened verbosity level
        $instance->INFO('Data all good!', @$batch)
            if $instance->verbose > 2;
        ...
    },
    ...
);

ACKNOWLEDGEMENTS

Much of the concept and API for this project was inspired by the work of Nathaniel Pierce.

Special thanks to Tim Heaney for his encouragement and mentorship.

VERSION

version 0.02

AUTHOR

Mary Ehlers <ehlers@cpan.org>

CONTRIBUTOR

Tim Heaney <oylenshpeegul@gmail.com>