NAME

IPC::Pipeline - Create a shell-like pipeline of many running commands

SYNOPSIS

use IPC::Pipeline;

my @pids = pipeline(\*FIRST_CLD_IN, \*LAST_CLD_OUT, \*CHILDREN_ERR,
    [qw/command1/],
    [qw/command2/],
    ...
    [qw/commandN/]
);

... do stuff ...

my @statuses = map {
    waitpid($_, 0);
    $? >> 8;
} @pids;

DESCRIPTION

Similar in calling convention to IPC::Open3, pipeline() spawns N children, connecting the first child's standard input to the FIRST_CLD_IN handle, the last child's standard output to LAST_CLD_OUT, and every child's standard error to a shared handle, CHILDREN_ERR. Each command specified causes a new process to be fork()ed. Each process is linked to the previous one by a file descriptor pair created with pipe(), using dup2() to chain each process's standard input to the previous process's standard output. The commands specified in the anonymous arrays are started in the child processes with a simple exec() call.
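
For illustration, here is a simplified sketch of how one link in such a chain is wired. This is not the module's actual source, just the pipe(), fork(), dup2() and exec() pattern described above, with placeholder commands:

use POSIX ();

pipe(my $read_end, my $write_end) or die "pipe(): $!";

my $writer = fork();
die "fork(): $!" unless defined $writer;

if ($writer == 0) {
    # Writer child: standard output becomes the write end of the pipe.
    defined POSIX::dup2(fileno($write_end), fileno(STDOUT))
        or die "dup2(): $!";
    close($read_end);
    close($write_end);
    exec('echo', 'hello') or die "exec(): $!";
}

my $reader = fork();
die "fork(): $!" unless defined $reader;

if ($reader == 0) {
    # Reader child: standard input becomes the read end of the pipe.
    defined POSIX::dup2(fileno($read_end), fileno(STDIN))
        or die "dup2(): $!";
    close($read_end);
    close($write_end);
    exec('cat') or die "exec(): $!";
}

# The parent must close both ends, or the reader never sees EOF.
close($read_end);
close($write_end);
waitpid($_, 0) for ($writer, $reader);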

Like IPC::Open3, pipeline() returns immediately after spawning the process chain, though it differs slightly: called in list context, it returns the ID of each child process in the order the commands were specified; called in scalar context, it returns only the ID of the first child spawned.
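
For instance, the two calling styles look like this (a minimal sketch with arbitrary commands):

# List context: one PID per command, in the order specified.
my @pids = pipeline(my ($in, $out), undef, [qw/sort/], [qw/uniq/]);

# Scalar context: only the first child's PID.
my $first_pid = pipeline(my ($in2, $out2), undef, [qw/sort/], [qw/uniq/]);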

Also like IPC::Open3, one may use select() to multiplex reading and writing to each of the descriptors returned by pipeline(), preferably with unbuffered sysread() and syswrite() calls. This is the ideal way to handle the children's standard output and error, as it sidesteps blocking and buffering concerns.
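
For example, a minimal sketch of that approach using IO::Select (the command names and buffer size are placeholders):

use IPC::Pipeline;
use IO::Select;

my @pids = pipeline(my ($in, $out, $err),
    [qw/command1/],
    [qw/command2/]
);

close($in);

my $select = IO::Select->new($out, $err);

# Read whichever handle is ready; drop each one once it hits EOF.
while ($select->count) {
    foreach my $fh ($select->can_read) {
        my $len = sysread($fh, my $buf, 4096);

        if (!$len) {
            $select->remove($fh);
            next;
        }

        my $target = fileno($fh) == fileno($out) ? \*STDOUT : \*STDERR;
        syswrite($target, $buf, $len);
    }
}

waitpid($_, 0) for @pids;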

If any child process dies prematurely, or any of the piped file descriptors is closed for any reason, the calling process will receive a SIGPIPE from the kernel on its next write to the broken pipe, so a signal handler should be installed for appropriate recovery.
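
For example, a minimal recovery sketch (the appropriate response is application-specific; $in and $data are placeholders for a pipeline handle and a buffer):

my $broken_pipe = 0;

# Catch SIGPIPE instead of dying; syswrite() will then fail with EPIPE.
$SIG{PIPE} = sub { $broken_pipe = 1 };

my $written = syswrite($in, $data);

if ($broken_pipe or !defined $written) {
    # A child exited early; reap it with waitpid() and decide
    # whether to rebuild the pipeline or abort.
}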

EXAMPLE ONE - OUTPUT ONLY

The following example implements a quick and dirty, but relatively sane, tar-and-gzip pipeline. For proper error handling from any of the children, use select() to multiplex the output and error streams.

use IPC::Pipeline;

my @paths = qw(/some /random /locations);

my @pids = pipeline(my ($in, $out), undef,
    [qw/tar pcf -/, @paths],
    [qw/gzip/]
);

open(my $fh, '>', 'file.tar.gz') or die "open(): $!";
binmode($fh);
close($in);

while (my $len = sysread($out, my $buf, 512)) {
    syswrite($fh, $buf, $len);
}

close($fh);
close($out);

#
# Reap the children; a flag of 0 makes waitpid() block until each
# child has exited.
#
foreach my $pid (@pids) {
    waitpid($pid, 0);
}

EXAMPLE TWO - INPUT AND OUTPUT

The following example implements a true I/O stream filter, of the sort any Unix-style shell provides.

use IPC::Pipeline;

my @pids = pipeline(my ($in, $out), undef,
    [qw/tr A-Ma-mN-Zn-z N-Zn-zA-Ma-m/],
    [qw/cut -d/, ':', qw/-f 2/]
);

my @records = qw(
    foo:bar:baz
    eins:zwei:drei
    cats:dogs:rabbits
);

foreach my $record (@records) {
    print {$in} "$record\n";
}

close($in);

while (my $len = sysread($out, my $buf, 512)) {
    syswrite(STDOUT, $buf, $len);
}

close($out);

foreach my $pid (@pids) {
    waitpid($pid, 0);
}

SEE ALSO

  • IPC::Open3

  • IPC::Run, for a Swiss Army knife of Unix I/O gizmos

    It should be mentioned that mst's IO::Pipeline has very little in common with IPC::Pipeline.

COPYRIGHT

Copyright 2010, wrath@cpan.org. Released under the terms of the MIT license.