NAME
IO::Lambda - non-blocking I/O as lambda calculus
SYNOPSIS
The code below demonstrates execution of parallel HTTP requests
use strict;
use IO::Lambda qw(:lambda :func);
use IO::Socket::INET;
# this function creates a new lambda object
# associated with one socket, and fetches a single URL
sub http
{
my $host = shift;
# A simple HTTP client works by first sending a request to the remote,
# and then waiting for the response. This sets up a new lambda object
# with a chain of closures attached that are processed sequentially.
return lambda {
# create a socket, and issue a tcp connect
my $socket = IO::Socket::INET-> new(
PeerAddr => $host,
PeerPort => 80
);
# Wait until the socket becomes writable. Parameters to writable()
# are passed using context(). This association is remembered
# within the engine.
context $socket;
# writable sets up an event to monitor: when
# $socket is writable, execute the closure.
writable {
# The engine discovered we can write, so send the request
print $socket "GET /index.html HTTP/1.0\r\n\r\n";
# This variable needs to stay shared across
# multiple invocations of our readable closure, so
# it needs to be outside that closure. Here, it collects
# whatever the remote returns
my $buf = '';
# readable registers another event to monitor -
# that $socket is readable. Note that we do not
# need to set the context again because when we get
# here, the engine knows what context this command
# took place in, and assumes the same context.
# Also note that the socket is no longer watched for writable events,
# so this code won't be executed again for this $socket.
readable {
# This closure is executed when we can read.
# Read from the socket. sysread() returns number of
# bytes read. Zero means EOF, and undef means error, so
# we stop on these conditions.
# If we return without registering a follow-up
# handler, this return will be processed as the
# end of this sequence of events for whoever is
# waiting on us.
return $buf unless
sysread( $socket, $buf, 1024, length($buf));
# We're not done so we need to do this again.
# Note that the engine knows that it just
# called this closure because $socket was
# readable, so it can infer that it is supposed
# to set up a callback that will call this
# closure when $socket is next readable.
again;
}}}
}
# Fire up a single lambda and wait until it completes.
print http('www.perl.com')-> wait;
# Fire up a lambda that waits for two http requests in parallel.
# tails() can wait for more than one lambda
my @hosts = ('www.perl.com', 'www.google.com');
lambda {
context map { http($_) } @hosts;
# tails() asynchronously waits until all lambdas in the context
# are finished.
tails { print @_ }
}-> wait;
# crawl for all urls in parallel, but keep 10 parallel connections max
print par(10)-> wait(map { http($_) } @hosts);
# crawl for all urls sequentially
print mapcar( curry { http(shift) })-> wait(@hosts);
Note: io and lambda are synonyms - I personally prefer lambda but some find the word slightly inappropriate, hence io. See however "Higher-order functions" to see why it is more lambda than io.
DESCRIPTION
This module is another attempt to fight the horrors of non-blocking I/O. It tries to bring back the simplicity of the declarative programming style that is otherwise only available when one employs threads, coroutines, or co-processes. Usually, coding non-blocking I/O for single-process, single-threaded programs requires constructing state machines, often fairly complex ones, which does not help code clarity and is the reason why asynchronous I/O programming is often considered 'messy'. Similar to the concept of monads in functional languages, which enforce a certain order of execution over generally orderless functions, IO::Lambda allows writing I/O callbacks in a style that resembles the good old sequential, declarative programming.
The manual begins with code examples, then proceeds to explain the basic assumptions, and finally gets to the complex concepts, where the real fun begins. You can skip directly there ("Stream IO", "Higher-order functions"), where the functional style mixes with I/O. If, on the contrary, you are intimidated by the module's ambitions, you can skip to "Simple use" for a gentler introduction. Those who are interested in how the module differs from other I/O frameworks, please continue reading.
Simple use
This section is for those who don't need all of the module's powerful machinery. Simple callback-driven programming examples show how to use the module for unsophisticated tasks, using concepts similar to those of other I/O frameworks. It is possible to use the module at this level only; however, one must be aware that by doing so, the real power of the higher-order abstraction goes unused.
IO::Lambda, like all I/O multiplexing libraries, provides functions for registering callbacks, which in turn are called when a timeout occurs or when a file handle is ready for reading and/or writing. The code examples below demonstrate how to program at this level of abstraction.
- Combination of two timeouts and an IO_READ event
-
use IO::Lambda qw(:constants);
my $obj = IO::Lambda-> new;
# Either 3 or time + 3 will do. See "Time" section for more info
$obj-> watch_timer( 3, sub { print "I've slept 3 seconds!\n" });
# I/O flags is a combination of IO_READ, IO_WRITE, and IO_EXCEPTION.
# Timeout is either 5 or time + 5, too.
$obj-> watch_io( IO_READ, \*STDIN, 5, sub {
    my ( $self, $ok) = @_;
    print $ok ?
        "stdin is readable!\n" :
        "stdin is not readable within 5 seconds\n";
});
# main event loop is stopped when there are no lambdas and no
# pending events
IO::Lambda::run;
- Waiting for another lambda to complete
-
use IO::Lambda;
my $a = IO::Lambda-> new;
$a-> watch_timer( 3, sub { print "I've slept 3 seconds!\n" });
my $b = IO::Lambda-> new;
# A lambda can wait for more than one event or lambda.
# A lambda can be awaited by more than one lambda.
$b-> watch_lambda( $a, sub { print "lambda #1 is finished!\n"});
IO::Lambda::run;
Example: reading lines from a filehandle
Given that $filehandle is non-blocking, the following code creates a lambda object (later, simply a lambda) that reads from the handle until EOF or an error occurs. Here, getline (see "Stream IO" below) constructs a lambda that reads a single line from a filehandle.
use IO::Lambda qw(:all);
sub my_reader
{
my $filehandle = shift;
lambda {
context getline, $filehandle, \(my $buf = '');
tail {
my ( $string, $error) = @_;
if ( $error) {
warn "error: $error\n";
} else {
print $string;
return again;
}
}}
}
Assume we have two socket connections, and sockets are non-blocking - read from both of them in parallel. The following code creates a lambda that reads from two readers:
sub my_reader_all
{
my @filehandles = @_;
lambda {
context map { my_reader($_) } @filehandles;
tails { print "all is finished\n" };
}
}
my_reader_all( $socket1, $socket2)-> wait;
Non-blocking HTTP client
Given a socket, create a lambda that implements the HTTP protocol
use IO::Lambda qw(:all);
use IO::Socket;
use HTTP::Request;
use HTTP::Response;
sub talk
{
my $req = shift;
my $socket = IO::Socket::INET-> new( PeerAddr => 'www.perl.com', PeerPort => 80);
lambda {
context $socket;
writable {
# connected
print $socket "GET ", $req-> uri, "\r\n\r\n";
my $buf = '';
readable {
sysread $socket, $buf, 1024, length($buf) or return $buf;
again; # wait for reading and re-do the block
}
}
}
}
Connect and talk to the remote
$request = HTTP::Request-> new( GET => 'http://www.perl.com');
my $q = talk( $request );
print $q-> wait; # will print content of $buf
Connect two parallel connections: by explicitly waiting for each
$q = lambda {
context talk($request);
tail { print shift };
context talk($request2);
tail { print shift };
};
$q-> wait;
Connect two parallel connections: by waiting for all
$q = lambda {
context talk($request1), talk($request2);
tails { print for @_ };
};
$q-> wait;
Teach our simple http request to redirect by wrapping talk(). talk_redirect() will have exactly the same properties as talk() does
sub talk_redirect
{
my $req = shift;
lambda {
context talk( $req);
tail {
my $res = HTTP::Response-> parse( shift );
return $res unless $res-> code == 302;
$req-> uri( $res-> header('Location'));
context talk( $req);
again;
}
}
}
Full example code
use strict;
use IO::Lambda qw(:lambda);
use IO::Socket::INET;
sub get
{
my ( $socket, $url) = @_;
lambda {
context $socket;
writable {
print $socket "GET $url HTTP/1.0\r\n\r\n";
my $buf = '';
readable {
my $n = sysread( $socket, $buf, 1024, length($buf));
return "read error:$!" unless defined $n;
return $buf unless $n;
again;
}}}
}
sub get_parallel
{
my @hosts = @_;
lambda {
context map { get(
IO::Socket::INET-> new(
PeerAddr => $_,
PeerPort => 80
), '/index.html') } @hosts;
tails {
join("\n\n\n", @_ )
}
}
}
print get_parallel('www.perl.com', 'www.google.com')-> wait;
See tests and additional examples in directory eg/ for more information.
API
Events and states
A lambda is an IO::Lambda object that waits for I/O and timeout events, and for events generated when other lambdas are completed. On each such event a callback is executed. The result of the execution is saved and passed on to the next callback when the next event arrives.
The life cycle of a lambda goes through three modes: passive, waiting, and stopped. A lambda that has just been created, or was later reset with a reset call, is in the passive state. When the lambda gets started, the only executed code will be the callback associated with the lambda:
$q = lambda { print "hello world!\n" };
# not printed anything yet
$q-> wait; # <- here it will
Lambdas are usually not started explicitly. Usually, the function that can wait for a lambda starts it too. wait, the synchronous waiter, and tail/tails, the asynchronous ones, start passive lambdas when called. A lambda is finished when there are no more events to listen to. The lambda in the example above will finish right after the print statement.
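The three states can be observed from the outside with the is_passive, is_waiting, and is_stopped methods listed under "Object API". The following is a minimal sketch, not part of the module's own examples: state_of is a hypothetical helper, and the 0.1 second timer is an arbitrary choice made only so that the lambda has something to wait for.

```perl
use strict;
use warnings;
use IO::Lambda qw(:lambda);

# Hypothetical helper: name the life-cycle state a lambda is in.
sub state_of
{
    my $q = shift;
    return $q-> is_stopped ? 'stopped'
         : $q-> is_waiting ? 'waiting'
         :                   'passive';
}

my $q = lambda {
    context 0.1;                # relative timeout, in seconds
    timeout { 'done' }
};

my @states = ( state_of($q) );  # just created
$q-> start;                     # runs the lambda callback, registers the timer
push @states, state_of($q);     # a timer is pending
my $result = $q-> wait;         # dispatches events until $q finishes
push @states, state_of($q);     # no events left
$q-> reset;
push @states, state_of($q);     # reset returns it to the initial state

print join(' -> ', @states), "\n";
```

Under these assumptions the sequence should read passive, waiting, stopped, passive. Starting the lambda explicitly is done here only to make the waiting state visible; normally wait alone is enough.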
A lambda can listen to events by calling conditions, which internally subscribe the lambda object to the corresponding file handles, timers, and other lambdas. Most of the expressive power of IO::Lambda lies in the conditions, such as readable, writable, and timeout. Conditions differ from normal perl subroutines in the way they receive their parameters. The only parameter they receive in the normal way is the associated callback, while all other parameters are passed through an alternate stack, by an explicit context call.
In the example below, a lambda watches for file handle readability:
$q = lambda {
context \*SOCKET;
readable { print "I'm readable!\n"; }
# here is nothing printed yet
};
# and here is nothing printed yet
Such a lambda, when started, will switch to the waiting state, which means that it will be waiting for the socket. The lambda will finish only after the callback associated with the readable condition is called. Of course, new event listeners can be created inside all callbacks, in each state. This constitutes another large benefit of IO::Lambda, as it allows programming FSMs dynamically.
New event listeners can be created either by explicitly calling a condition, or by restarting the last condition with the again call. For example, the code
readable {
print 1;
again if int rand(2)
}
prints an indeterminate number of ones.
Contexts
All callbacks associated with a lambda object (further on, merely lambda) execute in one, private context, also associated to the lambda. The context here means that all conditions register callbacks on an implicitly given lambda object, and keep the passed parameters on the context stack. The fact that the context is preserved between states, helps building terser code with series of IO calls:
context \*SOCKET;
writable {
readable {
}}
is actually the shorter form for
context \*SOCKET;
writable {
context \*SOCKET; # <-- context here is retained from one frame up
readable {
}}
And just as the context is bound to the current closure, so is the current lambda object, which is available through the this property. The code above is actually
my $self = this;
context \*SOCKET;
writable {
this $self; # <-- object reference is retained here
context \*SOCKET;
readable {
}}
this can be used if more than one lambda needs to be accessed. In that case,
this $object;
context @context;
is the same as
this $object, @context;
which means that explicitly setting this will always clear the context.
Data and execution flow
A lambda is initially called with some arguments passed from the outside. These arguments can be stored using the call method; wait and tail also issue call internally, thus replacing any previous data stored by call. Inside the lambda these arguments are available as @_.
Whatever is returned by a condition callback (including the lambda condition itself) will be passed further on as @_ to the next callback, or to the outside, if the lambda is finished. The result of the finished lambda is available through the peek method, which returns either the whole array of data in list context, or the first item of the array otherwise. wait returns the same data as peek does.
When more than one lambda watches for another lambda, the latter will get its last callback results passed to all the watchers. However, when a lambda creates more than one state that derives from the current state, a forking behaviour of sorts, the latest stored results get overwritten by the first executed callback, so constructions such as
readable { 1 + shift };
writable { 2 + shift };
...
wait(0)
will eventually return 3, but whether it will be 1+2 or 2+1, is undefined.
wait is not the only function that synchronises input and output data. The wait_for_all method waits for all lambdas, including the caller, to finish. It returns the collected results of all the objects in a single list. The wait_for_any method waits for at least one lambda from the list of passed lambdas (again, including the caller) to finish. It returns the list of finished objects as soon as possible.
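The difference between the two waiters can be sketched as follows. This is an illustration under assumptions rather than code from the distribution: sleeper is a hypothetical helper, and the delays are arbitrary values chosen so that one lambda clearly finishes before the other.

```perl
use strict;
use warnings;
use IO::Lambda qw(:lambda);

# Hypothetical helper: a lambda that finishes after $delay seconds
# and returns $label.
sub sleeper
{
    my ( $delay, $label) = @_;
    lambda {
        context $delay;
        timeout { $label }
    }
}

# wait_for_all: block until the caller and all listed lambdas finish;
# the results come back in a single, unordered list.
my @all = sleeper( 0.2, 'slow')-> wait_for_all( sleeper( 0.1, 'fast'));
print join(',', sort @all), "\n";

# wait_for_any: block until at least one of them finishes;
# returns the finished lambda objects, not their results.
my @done = sleeper( 0.3, 'slow')-> wait_for_any( sleeper( 0.1, 'fast'));
my $first = $done[0]-> peek;
print "$first\n";
```

Because the results of wait_for_all are unordered, the sketch sorts them before printing. wait_for_any hands back objects, so peek is needed to get at the stored result.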
Time
Timers and I/O timeouts can be given not only as timeout values, as is usual in event libraries, but also as deadlines in (fractional) seconds since epoch. This decision, strange at first sight, actually helps a lot when a total execution time is to be tracked. For example, the following code reads as many bytes as possible from a socket within 5 seconds:
lambda {
my $buf = '';
context $socket, time + 5;
readable {
if ( shift ) {
return again if sysread $socket, $buf, 1024, length($buf);
} else {
print "oops! a timeout\n";
}
$buf;
}
};
Rewriting the same code with a readable whose semantics accepted only a relative timeout would not be as elegant:
lambda {
my $buf = '';
my $time_left = 5;
my $now = time;
context $socket, $time_left;
readable {
if ( shift ) {
if (sysread $socket, $buf, 1024, length($buf)) {
$time_left -= (time - $now);
$now = time;
context $socket, $time_left;
return again;
}
} else {
print "oops! a timeout\n";
}
$buf;
}
};
However, the exact opposite is true for timeout. The following two lines both sleep 5 seconds:
lambda { context 5; timeout {} }
lambda { context time + 5; timeout {} }
Internally, timers use Time::HiRes::time, which gives the fractional number of seconds. This, however, is not required of the caller: when high-resolution timers are not used, timeouts will simply be less precise, and will jitter by plus or minus half a second.
Conditions
All conditions receive their parameters from the context stack, or simply the context. The only parameter passed to them by a regular perl call is the callback itself. Conditions can also be called without a callback, in which case they pass on the data that would otherwise be passed as @_ to the callback. Thus, a condition can be called either as
readable { .. code ... }
or
&readable(); # no callback
&readable; # DANGEROUS!! same as &readable(@_)
Conditions can either be used after explicit exporting
use IO::Lambda qw(:lambda);
lambda { ... }
or by using the package syntax,
use IO::Lambda;
IO::Lambda::lambda { ... };
Note: If you know the concept of continuation-passing style, this is exactly how conditions work, except that closures are used instead of continuations (thanks to Brock Wilcox!).
- lambda()
-
Creates a new IO::Lambda object.
- io()
-
Same as lambda.
- readable($filehandle, $deadline = undef)
-
Executes either when $filehandle becomes readable, or after $deadline. Passes one argument, which is either TRUE if the handle is readable, or FALSE if the time has expired. If $deadline is undef, no timeout is registered, which means that the callback will never be called with FALSE.
- writable($filehandle, $deadline = undef)
-
Exactly the same as readable, but executes when $filehandle becomes writable.
- rwx($flags, $filehandle, $deadline = undef)
-
Executes either when $filehandle satisfies any of the conditions in $flags, or after $deadline. $flags is a combination of three integer constants, IO_READ, IO_WRITE, and IO_EXCEPTION, that are imported by
use IO::Lambda qw(:constants);
Passes one argument, which is either a combination of the same IO_XXX flags that report which conditions the handle satisfied, or 0 if the time has expired. If $deadline is undef, no timeout is registered, i.e. the callback will never be called with 0.
- timeout($deadline)
-
Executes after $deadline. $deadline cannot be undef.
- tail($lambda, @parameters)
-
Issues $lambda-> call(@parameters), then waits for $lambda to complete. Since call can only be done on inactive lambdas, it will fail if @parameters is not empty and $lambda is already running.
By default, tail resets the lambda if it was already finished. This behavior can be changed by manipulating the autorestart property.
- tails(@lambdas)
-
Executes when all objects in @lambdas are finished, returns the collected, unordered results of the objects.
- tailo(@lambdas)
-
Same as tails, but the results are ordered.
- any_tail($deadline,@lambdas)
-
Executes either when all objects in @lambdas are finished, or when $deadline expires. Returns the lambdas that were successfully executed during the allotted time.
- context @ctx
-
If called with no parameters, returns the current context, otherwise replaces the current context with @ctx. It is thus not possible (not that it is practical anyway) to clear the context with this call. If really needed, use the this(this) syntax.
- this $this, @ctx
-
If called with no parameters, returns the current lambda. Otherwise, replaces both the current lambda and the current context. Can be useful either when juggling several lambdas, or as a convenience hack over my variables, for example,
this lambda { ... };
this-> wait;
instead of
my $q = lambda { ... };
$q-> wait;
- condition $lambda, $callback, $method, $name
-
Helper function for creating conditions, either from lambdas or from lambda constructors.
Example: convert the existing getline constructor into a condition:
sub gl(&) { getline-> call(context)-> condition( shift, \&gl, 'gl') }
...
context $fh, $buf, $deadline;
gl { ... }
Frames
These functions jump back to previous callback frames, restoring a previously saved context.
- again([$frame, [@context]])
-
Restarts the frame with the context. If $frame is given, jumps to the frame previously returned by a restartable call, resetting the context to its previous state too. If @context is given, it is used instead.
All the conditions above, excluding lambda, are restartable with the again call (see start for restarting a lambda). The code
context $obj1;
tail {
    return if $null++;
    context $obj2;
    again;
};
is thus equivalent to
context $obj1;
tail {
    context $obj2;
    &tail();
};
again passes the current context to the condition.
If $frame is provided, then it is treated as the result of a previous restartable call. It contains data sufficient for restarting another call, instead of the current one. See Frames for details.
- restartable([$name])
-
Saves a frame. restartable can generate a unique $name itself if none is given. All frames are deleted when a lambda is stopped.
Example:
my $counter = 0;
context lambda { $counter += 1 };
tail {
    return if 12 == shift;
    my $frame = restartable;
    context lambda { $counter += 10 };
    tail {
        again($frame);
    }
}
restartable records the current content on the lambda, and again switches it back so that the again call goes to the first tail instead of the second.
- delete_frame($frame)
-
Deletes an existing saved frame. Can be used to clean up eventual circular references (see below).
- get_frame, set_frame(@frame), swap_frame(@frame)
-
Lower-level frame accessors that save and restore all contexts. Do not use them directly, because they can easily be used to inadvertently create circular references, where @frame points to a callback while the callback, via the closure mechanism, holds a reference to the @frame variable (thanks to Ben Tilly for bringing up the issue).
Stream IO
The whole point of this module is to help build protocols of arbitrary complexity in a clear, consistent programming style. Consider how perl's low-level sysread and syswrite relate to its higher-level readline, where the latter not only does the buffering, but also recognizes $/ as the input record separator. The section above described the lower-level lambda I/O conditions, which are only useful for sysread and syswrite. This section describes higher-level lambdas that relate to these low-level ones as the aforementioned readline relates to sysread.
All functions in this section return a lambda that does the actual work. Just as a class constructor returns a newly created class instance, these functions return newly created lambdas. Such functions will further be referred to as lambda constructors, or simply constructors. Constructors are therefore documented here with two sets of inputs and one set of outputs: for example, sysreader is a function that takes 0 parameters and always returns a new lambda, and this lambda, in turn, takes four parameters and returns two. This constructor will be described as
# sysreader() :: ($fh,$$buf,$length,$deadline) -> ($result,$error)
Since all stream I/O lambdas return the same set of scalars, the return type will further be referred to as ioresult:
# ioresult :: ($result, $error)
# sysreader() :: ($fh,$$buf,$length,$deadline) -> ioresult
ioresult's first scalar is defined on success, and is not otherwise. In the latter case, the second scalar contains the error, usually either $! or 'timeout' (if $deadline was set).
Before describing the actual functions, consider the code that may benefit from using them. Let's take a lambda that needs to implement a very simple HTTP/0.9 request:
lambda {
my $handle = shift;
my $buf = '';
context getline, $handle, \$buf;
tail {
my $req = shift;
die "bad request" unless $req =~ m[GET (.*)$]i;
do_request($handle, $1);
}}
getline reads from $handle to $buf, and wakes up when a new line is there. However, what if we need, for example, HTTPS instead of HTTP, where reading from a socket may involve some writing, and of course some waiting? Then the first default parameter to getline has to be replaced. By default,
context getline, $handle, \$buf;
is the same as
my $reader = sysreader;
context getline($reader), $handle, \$buf;
where sysreader creates a lambda $reader that, given $handle, waits until it becomes readable, and reads from it. getline, in turn, repeatedly calls $reader until the whole line is read.
Thus, we call
context getline(https_reader), $handle, \$buf;
instead, where https_reader should conform to the sysreader signature:
sub https_reader
{
lambda {
my ( $fh, $buf, $length, $deadline) = @_;
# read from SSL socket
return $error ? (undef, $error) : $data;
}
}
I'm not showing the actual implementation of an HTTPS reader (if you're curious, look at IO::Lambda::HTTP::HTTPS), but the idea is that inside that reader, it is perfectly fine to do any number of read and write operations, and wait for their completion too, as long as the upper-level lambda sooner or later gets the data. getline (or, rather, readbuf that getline is based on) won't care about the internal states of the reader.
Check out t/06_stream.t that emulates reading and writing implemented in this fashion.
These functions are imported with
use IO::Lambda qw(:stream);
- sysreader() :: ($fh, $$buf, $length, $deadline) -> ioresult
-
Creates a lambda that accepts all the parameters used by sysread (except $offset though), plus $deadline. The lambda tries to read $length bytes from $fh into $buf, when $fh becomes available for reading. If $deadline expires, fails with 'timeout' error. On successful read, returns number of bytes read, or $! otherwise.
- syswriter() :: ($fh, $$buf, $length, $offset, $deadline) -> ioresult
-
Creates a lambda that accepts all the parameters used by syswrite plus $deadline. The lambda tries to write $length bytes to $fh from $buf from $offset, when $fh becomes available for writing. If $deadline expires, fails with 'timeout' error. On successful write, returns number of bytes written, or $! otherwise.
- readbuf($reader = sysreader()) :: ($fh, $$buf, $cond, $deadline) -> ioresult
-
Creates a lambda that is able to perform buffered reads from $fh, either using a custom lambda reader, or using one newly generated by sysreader. The lambda, when called, reads continually from $fh into $buf, and either fails on timeout, I/O error, or end of file, or succeeds if the $cond condition matches.
The condition $cond is a "smart match" of sorts, and can be one of:
    - integer
    -
    The lambda will succeed when $buf is exactly $cond bytes long.
    - regexp
    -
    The lambda will succeed when $cond matches the content of $buf. Note that readbuf saves and restores the value of pos($$buf), so use of \G is encouraged here.
    - coderef :: ($buf -> BOOL)
    -
    The lambda succeeds if the coderef called with $buf returns a true value.
    - undef
    -
    The lambda will succeed on end of file. Note that for all other conditions end of file is reported as an error, with the literal "eof" string.
- writebuf($writer) :: ($fh, $$buf, $length, $offset, $deadline) -> ioresult
-
Creates a lambda that is able to perform buffered writes to $fh, either using a custom lambda writer, or using one generated by syswriter. That writer lambda, in turn, writes continually $buf (from $offset, $length bytes) and either fails on timeout or I/O error, or succeeds when $length bytes are written successfully.
If $length is undefined, the buffer is continuously checked for new data. This feature can be used to implement concurrent writes.
- getline($reader) :: ($fh, $$buf, $deadline) -> ioresult
-
Same as readbuf, but succeeds when a string of bytes ended by a newline is read.
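To see a stream constructor in use, the sketch below reads lines with getline until the handle reports end of file. It follows the same tail/again pattern as the my_reader example earlier; the pipe is an assumption made only so that the example runs without a network connection, and it presumes a platform where pipes can be made non-blocking and selected on.

```perl
use strict;
use warnings;
use IO::Handle;
use IO::Lambda qw(:lambda :stream);

# A pipe stands in for a socket here; the write end is filled
# and closed up front so the reader sees two lines and then EOF.
pipe( my $rd, my $wr) or die "pipe: $!";
syswrite $wr, "first line\nsecond line\n";
close $wr;
$rd-> blocking(0);

my $buf = '';
my @lines;
lambda {
    context getline, $rd, \$buf;
    tail {
        my ( $line, $error) = @_;
        return if defined $error;   # 'eof' or an I/O error ends the loop
        push @lines, $line;
        again;                      # re-run getline for the next line
    }
}-> wait;

print @lines;
```

The second scalar of ioresult is what ends the loop here: for getline, end of file arrives as an error rather than as a successful result.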
Higher-order functions
Functions described in this section justify the lambda in IO::Lambda. Named deliberately after the classic function names, they provide a similar interface.
These functions are imported with
use IO::Lambda qw(:func);
- mapcar($lambda) :: @p -> @r
-
Given a $lambda, creates another lambda that accepts array @p, and sequentially executes $lambda with each parameter from the array. The lambda returns results collected from the executed lambdas.
print mapcar( lambda { 1 + shift })-> wait(1..5);
23456
mapcar can be used for organizing simple loops:
mapcar(curry { sendmail(shift) })-> wait(@email_addresses);
- filter($lambda) :: @p -> @r
-
Given a $lambda, creates another lambda that accepts array @p, and sequentially executes $lambda with each parameter from the array. Depending on the result of the execution, parameters are either returned, or not returned back to the caller.
print filter(lambda { shift() % 2 })-> wait(1..5);
135
- fold($lambda) :: @b -> @c; $lambda :: ($a,@b) -> @c
-
Given a $lambda, returns another lambda that accepts array @b, and runs its members pairwise through $lambda. The result of the repeated execution of $lambda is returned.
print fold( lambda { $_[0] + $_[1] } )-> wait( 1..4 );
10
- curry(@a -> $l) :: @a -> @b
-
curry accepts a function that returns a lambda, and possible parameters to it. Returns a new lambda that will execute the inner lambda and return its result as is. For example,
context $lambda, $a, $b, $c;
tail { ... }
where $lambda accepts three parameters, can be rewritten as
$m = curry { $lambda, $a, $b };
context $m, $c;
tail { ... }
Another example, tie readbuf with a filehandle and buffer:
my $readbuf = curry { readbuf, $fh, \(my $buf = '') };
- seq() :: @a -> @b
-
Creates a new lambda that executes all lambdas passed to it in @a sequentially, one after another. The lambda returns results collected from the executed lambdas.
sub seq { mapcar curry { shift }}
print seq-> wait( map { my $k = $_; lambda { $k } } 1..5);
12345
- par($max = 0) :: @a -> @b
-
Given a limit $max, returns a new lambda that accepts lambdas in @a to be executed in parallel, but so that the number of lambdas that run simultaneously never goes higher than the limit. The lambda returns results collected from the executed lambdas.
If $max is undefined or 0, behaves similarly to a lambda version of tails, i.e., all of the lambdas are run in parallel.
The code below prints 123, then sleeps, then 456, then sleeps, then 789.
par(3)-> wait( map {
    my $k = $_;
    lambda {
        context 0.5;
        timeout { print $k, "\n" }
    }
} 1..9);
Object API
This section lists the methods of the IO::Lambda class. Note that by design all lambda-style functionality is also available for object-style programming. Together with the fact that lambda syntax is not exported by default, this leaves room for possible implementations of user-defined syntax, either with or without lambdas, on top of the object API, without accessing the internals.
The object API is mostly targeted at developers who need to connect third-party asynchronous event libraries with the lambda interface.
- new($class, $start)
-
Creates new
IO::Lambda
object in the passive state.$start
will be called once, after the lambda gets active. - watch_io($flags, $handle, $deadline, $callback, $cancel)
-
Registers an IO event listener that calls
$callback
either after$handle
satisfies condition of$flags
( a combination of IO_READ, IO_WRITE, and IO_EXCEPTION bits), or after$deadline
time is passed. If$deadline
is undef, watches for the file handle indefinitely.The callback is called with first parameter as integer set of IO_XXX flags, or 0 if the callback was timed out. Other parameters, as it is the case with the other callbacks, are passed the result of the last called callback attached to the same lambda. The result of this callback will then be stored and passed on to the next callback in the same fashion.
If the event is cancelled with
cancel_event
, then$cancel
callback is executed. The result of this callback will be stored and passed on, in the same manner as results and parameters to$callback
. - watch_timer($deadline, $callback, $cancel)
-
Registers a timer listener that calls
$callback
after$deadline
time. - watch_lambda($lambda, $callback, $cancel)
-
Registers a listener that calls
$callback
after$lambda
, aIO::Lambda
object is finished. If$lambda
is in passive state, it is started first. - is_stopped
-
Reports whether lambda is stopped or not.
- is_waiting
-
Reports whether lambda has any registered callbacks left or not.
- is_passive
-
Reports if lambda wasn't run yet. Is true when the lambda is in a state after either
new
orreset
are called. - is_active
-
Reports if lambda was run.
- reset
-
Cancels all watchers and switches the lambda to the passive state. If there are any lambdas that watch for this object, these will be called first.
- autorestart
-
If set, gives permission to watchers to reset the lambda if it becomes stopped.
tail
does that when needed, other watchers are allowed to do that too. Is set by default. - peek
-
At any given time, returns stored data that are either passed in by
call
if the lambda is in the passive state, or stored result of execution of the latest callback. - start
-
Starts a passive lambda. Can be used for effective restart of the whole lambda; the only requirement is that the lambda should have no pending events.
- call @args
-
Stores
@args
internally, to be passed on to the first callback. Only works in passive state, croaks otherwise. If called multiple times, arguments from the previous calls are overwritten. - terminate @args
-
Cancels all watchers and resets the lambda to the stopped state. If there are any lambdas that watch for this object, these will be notified first. @args will be stored and available for later calls by peek.
- destroy
-
Cancels all watchers and resets the lambda to the stopped state. Does the same to all lambdas the caller lambda watches, recursively. Useful where explicit, long-lived lambdas shouldn't be subject to global destruction, which kills objects in random order; destroy kills them in some order, at least.
- wait @args
-
Waits for the caller lambda to finish, and returns the result of peek. If the object was in the passive state, calls call(@args); otherwise @args are not used.
- wait_for_all @lambdas
-
Waits for the caller lambda and @lambdas to finish. Returns the collection of peek results for all objects. The results are unordered.
- wait_for_any @lambdas
-
Waits for at least one lambda from the list of the caller lambda and @lambdas to finish. Returns the list of finished objects.
- yield $nonblocking = 0
-
Runs one round of dispatching events. Returns 1 if there are more events in internal queues, 0 otherwise. If $nonblocking is set, exits as soon as possible, otherwise waits for events; this feature can be used for organizing event loops without wait/run calls.
- run
-
Enters the event loop and doesn't exit until there are no registered events. Can also be called as a package method.
- bind $cancel, @args
-
Creates an event record that contains the lambda and @args, and returns it. The lambda won't finish until this event is returned with resolve. $cancel is an optional callback that will be called when the event is cancelled; the callback is passed two parameters, the lambda and the cancelled event record.
bind can be called several times on a single lambda; each event requires an individual resolve.
- resolve $event
-
Removes $event from the internal waiting list. If the lambda has no more events to wait for, notifies any lambdas that wait for the object, and then stops.
Note that resolve doesn't provide any means to call associated callbacks, which is intentional.
- intercept $condition [ $state = '*' ] $coderef
-
Installs $coderef as an overriding hook for a condition callback, where the condition is tail, readable, writable, etc. Whenever a condition callback is about to be called, the $coderef hook is called instead; it should analyze the call, and allow or deny its further processing.
$state, if omitted, is equivalent to '*', which means that checks on the lambda state are omitted too. Setting $state to undef is allowed though, and will match when the lambda state is also undefined (which it is by default).
There can be more than one intercept handler, stacked on top of each other. If $coderef is undef, the last registered hook is removed.
Example:
my $q = lambda { ... tail { ... }};
$q-> intercept( tail => sub {
    if ( stars are aligned right) {
        # pass
        return this-> super(@_);
    } else {
        return 'not right';
    }
});
See also state, super, and override.
- override $condition [ $state = '*' ] $coderef
-
Installs $coderef as an overriding hook for a condition (tail, readable, writable, etc.), possibly with a named state. Whenever a lambda calls one of these conditions, the $coderef hook is called instead; it should analyze the call, and allow or deny its further processing.
$state, if omitted, is equivalent to '*', which means that checks on the lambda state are omitted too. Setting $state to undef is allowed though, and will match when the lambda state is also undefined (which it is by default).
There can be more than one override handler, stacked on top of each other. If $coderef is undef, the last registered hook is removed.
Example:
my $q = lambda { ... tail { ... }};
$q-> override( tail => sub {
    if ( stars are aligned right) {
        # pass
        this-> super;
    } else {
        # deny and rewrite result
        return tail { 'not right' }
    }
});
See also state, super, and intercept.
- super
-
Analogous to Perl's SUPER, but on the condition level; this method is designed to be called from overridden conditions to call the original condition or callback.
There is a slight difference in the call syntax, depending on whether it is called from inside an override or an intercept callback. The intercept'ed callback calls the previous callback right away, and may call it with parameters directly. The override callback only calls the condition registration routine itself, not the callback, and is therefore called without parameters. See intercept and override for examples of use.
- state $state
-
A helper function for explicit naming of condition calls. The function stores the $state string on the current lambda; this string can be used in calls to intercept and override to identify a particular condition or callback.
The recommended use of the method is when a lambda contains more than one condition of a certain type; for example, the code
tail { tail { ... }}
is better written as
state A => tail { state B => tail { ... }}
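The life cycle described by is_passive, call, peek, wait, is_stopped, and reset above can be sketched as follows. This is a minimal illustration, not code from the distribution: the variable name $sum is hypothetical, and the lambda deliberately registers no I/O conditions, so it finishes as soon as it is run. It assumes, as the module's own helpers do, that the lambda block receives the arguments stored by call as @_.

```perl
use strict;
use warnings;
use IO::Lambda qw(:lambda);

# A lambda without any conditions: it stops right after its block returns,
# and the returned value becomes the stored result.
my $sum = lambda {
    my @numbers = @_;            # arguments stored by call() or wait()
    my $total = 0;
    $total += $_ for @numbers;
    return $total;
};

print "passive before the first run\n" if $sum->is_passive;

# wait() on a passive lambda stores the arguments, starts the lambda,
# and returns whatever peek() reports once the lambda has finished.
my ($first) = $sum->wait(1, 2, 3);
print "first result: $first\n";
print "stopped after the run\n" if $sum->is_stopped;

# reset() switches the lambda back to the passive state for reuse.
$sum->reset;
$sum->call(10, 20);
my @stored = $sum->peek;         # in the passive state: the call() arguments
print "stored arguments: @stored\n";

# wait(@args) calls call(@args) again, so these arguments replace (10, 20).
my ($second) = $sum->wait(4, 5);
print "second result: $second\n";
```

Results are read in list context here because peek returns the whole stored list; a lambda that registers conditions such as tail or readable would stay active until its last pending event is resolved.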
Exceptions and backtrace
In addition to the normal call stack as reported by the caller builtin, it can also be useful to access execution information about the thread of events, when a lambda waits for another, which in turn waits for another, etc. The following functions deal with backtrace information and with exceptions that propagate through the thread of events.
- catch $coderef, $event
-
Registers $coderef on $event; it is called when $event is aborted via either cancel_event, cancel_all_events, or terminate:
my $resource = acquire;
context lambda { .. $resource .. };
catch {
    $resource-> free;
} tail {
    $resource-> free;
}
catch must be invoked after a condition, but in the syntax above that means that catch should lexically come before it. If that is undesirable, use an explicit event reference:
my $event = tail { ... };
catch { ... }, $event;
- autocatch $event
-
Prefixes a condition, so that it is called even if cancelled. However, immediately after the call the exception is rethrown. Can be used in the following fashion:
context lambda ...;
autocatch tail {
    print "aborted\n" if this-> is_cancelling;
    .. finalize ...
};
- is_cancelling
-
Returns true if running within a catch block.
- call_again(@param)
-
To be called only from within a catch block. Calls the normal callback that would have been called if the event had not been cancelled. @param is passed to the callback.
- throw(@error)
-
Terminates the current lambda, then propagates @error to the immediate caller lambdas. They will have a chance to catch the exception with catch later, and to re-throw it by calling throw again. The default action is to propagate the exception further.
When there are no caller lambdas, the sigthrow callback is called (analogous to die outside eval calling $SIG{__DIE__}).
- sigthrow($callback :: ($lambda, @error))
-
Retrieves and sets a callback that is invoked when throw is called on a lambda that no lambdas wait for. By default, it is empty. When invoked, it is passed the lambda and the parameters passed to throw.
- callers
-
Returns event records that watch for the lambda.
- callees
-
Returns event records that correspond to the lambdas this lambda watches.
- backtrace
-
Returns an IO::Lambda::Backtrace object that represents the thread of events which leads to the current lambda. See IO::Lambda::Backtrace for more.
MISCELLANEOUS
Included modules
IO::Lambda::Backtrace - debug chains of events
IO::Lambda::Signal - POSIX signals.
IO::Lambda::Socket - lambda versions of connect, accept etc.
IO::Lambda::HTTP - implementation of HTTP and HTTPS protocols. HTTPS requires IO::Socket::SSL, NTLM/Negotiate authentication requires Authen::NTLM modules (not marked as dependencies).
IO::Lambda::DNS - asynchronous domain name resolver.
IO::Lambda::SNMP - SNMP requests lambda style. Requires SNMP.
IO::Lambda::Thread - run blocking code executed in another thread. Requires perl version greater than 5.8.0, preferably 5.10.0, built with threads.
IO::Lambda::Fork - run blocking code executed in another process context. Doesn't work on win32 for obvious reasons.
IO::Lambda::Message - base class for message queues over existing file handles.
IO::Lambda::DBI - asynchronous DBI
IO::Lambda::Poll - generic polling wrapper
IO::Lambda::Flock - flock(2) wrapper
IO::Lambda::Mutex - wait for a shared resource
Debugging
Various sub-modules can be controlled with a single environment variable, IO_LAMBDA_DEBUG, which is treated as a comma-separated list of modules. For example,
env IO_LAMBDA_DEBUG=io=2,http perl script.pl
displays I/O debug messages from IO::Lambda (with extra verbosity) and from IO::Lambda::HTTP. IO::Lambda responds to the following keys:
- io
-
Prints debugging information about file and timeout asynchronous events.
- lambda
-
Prints debugging information about the event flow of lambda objects, where one object waits for another, a lambda being cancelled, finished, etc.
- caller
-
Increases verbosity of lambda by storing information about which line invoked object creation and subscription. See IO::Lambda::Backtrace for more.
- die
-
If set, fatal errors dump the stack trace.
- loop=MODULE
-
Sets loop module, one of: Select, AnyEvent, Prima, POE.
Keys recognized for the other modules: select,dbi,http,https,signal,message,thread,fork,poll,flock.
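To make the format of the variable concrete, the sketch below splits a specification such as io=2,http into keys and values. It is not the parser used by IO::Lambda; the helper name parse_debug_spec is hypothetical, and the default value of 1 for a key written without =value is an assumption of this sketch.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of IO::Lambda: turn a comma-separated
# debug specification into a hash of key => value pairs.
sub parse_debug_spec
{
    my $spec = shift;
    my %keys;
    for my $item ( split /,/, defined $spec ? $spec : '' ) {
        next unless length $item;                 # skip empty entries
        my ( $key, $value ) = split /=/, $item, 2;
        # Assumption of this sketch: a bare key means level 1.
        $keys{$key} = defined $value ? $value : 1;
    }
    return %keys;
}

my %debug = parse_debug_spec('io=2,http,loop=AnyEvent');
printf "%s => %s\n", $_, $debug{$_} for sort keys %debug;
```

The split limit of 2 keeps everything after the first = as the value, so a key like loop=MODULE keeps its module name intact.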
Online information
Project homepage: http://iolambda.karasik.eu.org/
Mailing list: io-lambda-general at lists.sourceforge.net, thanks to sourceforge. Subscribe by visiting https://lists.sourceforge.net/lists/listinfo/io-lambda-general.
Benchmarks
A single-process TCP client and server; the server echoes back everything that is sent by the client. 500 connections are created sequentially, each instructed to send a single line to the server, and then destroyed.
                            2.4GHz x86-64 linux   1.2GHz win32
Lambda/select                      0.697              7.468
Lambda/select, optimized           0.257              5.273
Lambda/AnyEvent                    0.648              8.175
Lambda/AnyEvent, optimized                            7.087
Raw sockets using select           0.149              4.859
POE/select, components             1.185             12.306
POE/select, raw sockets            0.382              6.233
POE/select, optimized              0.770              7.510
See benchmarking code in eg/bench.
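For orientation only, the round trip being measured can be approximated with core modules and plain select. The sketch below is not the code from eg/bench and makes no timing claims; the helper names wait_readable and echo_roundtrips, the ephemeral loopback port, and the 5-second timeout are all assumptions of this illustration.

```perl
use strict;
use warnings;
use IO::Select;
use IO::Socket::INET;

# Hypothetical helper: block until $fh is readable, or die after 5 seconds.
sub wait_readable
{
    my ( $fh, $what ) = @_;
    my @ready = IO::Select->new($fh)->can_read(5);
    die "timed out waiting for $what" unless @ready;
}

# Hypothetical helper: run $count sequential echo round trips inside one
# process and return how many of them came back intact.
sub echo_roundtrips
{
    my $count = shift;

    # Port 0 asks the OS for any free port on the loopback interface.
    my $server = IO::Socket::INET->new(
        Listen    => 5,
        LocalAddr => '127.0.0.1',
        LocalPort => 0,
        Proto     => 'tcp',
    ) or die "listen: $!";
    my $port = $server->sockport;

    my $ok = 0;
    for my $i ( 1 .. $count ) {
        # The connect completes against the listen backlog, so it does
        # not block even though accept() is called afterwards.
        my $client = IO::Socket::INET->new(
            PeerAddr => '127.0.0.1',
            PeerPort => $port,
            Proto    => 'tcp',
        ) or die "connect: $!";

        wait_readable( $server, 'a connection' );
        my $peer = $server->accept or die "accept: $!";

        # The client sends one line; the server reads it and echoes it back.
        my $line = "hello $i\n";
        defined syswrite( $client, $line ) or die "client write: $!";
        wait_readable( $peer, 'the request' );
        sysread( $peer, my $request, 1024 ) or die "server read failed";
        defined syswrite( $peer, $request ) or die "server write: $!";

        # The client reads the echo and compares it with what it sent.
        wait_readable( $client, 'the echo' );
        my $bytes = sysread( $client, my $echo, 1024 );
        $ok++ if $bytes && $echo eq $line;

        close $client;
        close $peer;
    }
    close $server;
    return $ok;
}

print echo_roundtrips(5), " round trips completed\n";
```

The sketch relies on a short line arriving in a single read over loopback; a real client would loop on sysread until the full line is received.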
Apologetics
There are many async libraries readily available from CPAN. IO::Lambda is yet another one. How is it different from the existing tools? Why use it? To answer these questions, I need to show the evolution of async libraries, to explain how they grew from simple tools to complex frameworks.
First, all async libraries are based on OS-level syscalls, like select, poll, epoll, kqueue, and Win32::WaitForMultipleObjects. The first layer provides access to exactly these facilities: there are IO::Select, IO::Epoll, IO::Kqueue etc. I won't go deeper into describing the pros and cons of programming on this level; they should be obvious.
Perl modules of the next abstraction layer are often characterised by portability and event loops. While the modules of the first layer are seldom portable, and have no event loops, the second layer modules strive to be OS-independent, and use callbacks to ease the otherwise convoluted ways async I/O would be programmed. These modules mostly populate the "asynchronous input-output programming frameworks" niche in the perl world. The examples are many: IO::Events, EV, AnyEvent, IO::NonBlocking, IO::Multiplex, to name a few.
Finally, there's the third layer of complexity, which, before IO::Lambda, had a single representative: POE (now, to the best of my knowledge, IO::Async also partially falls in this category). Modules of the third layer are based on concepts from the second, but introduce a powerful tool to help the programming of complex protocols, something that isn't available in the second layer modules: finite state machines (FSMs). The FSMs reduce programming complexity, for example, of intricate network protocols, that are best modelled as a set of states in a logical circuit. Also, the third layer modules are agnostic of the event loop module: the programmer is (almost) free to choose the event loop backend, such as native select, Gtk, EV, Prima, or AnyEvent, depending on the nature of the task.
IO::Lambda allows the programmer to build protocols of arbitrary complexity, and is also based on event loops, callbacks, and is portable. It differs from POE in the way the FSMs are declared. Where POE requires an explicit switch from one state to another, using, for example, the post or yield commands, IO::Lambda incorporates the switching directly into the program syntax. Consider this POE code:
POE::Session-> create(
inline_states => {
state1 => sub {
print "state1\n";
$_[ KERNEL]-> yield("state2");
},
state2 => sub {
print "state2\n";
},
});
and the corresponding IO::Lambda code (state1 and state2 are conditions, they need to be declared separately):
lambda {
state1 {
print "state1\n";
state2 {
print "state2\n";
}}
}
In IO::Lambda, the programming style is (deliberately) not much different from the declarative
print "state1\n";
print "state2\n";
as much as the nature of asynchronous programming allows that.
To sum up, the intended use of IO::Lambda is for areas where simple callback-based libraries require lots of additional work, and where state machines are beneficial. Complex protocols like HTTP, parallel execution of several tasks, strict control of task and protocol hierarchy - this is the domain where IO::Lambda works best.
LICENSE AND COPYRIGHT
This work is partially sponsored by capmon ApS.
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
AUTHOR
Dmitry Karasik, <dmitry@karasik.eu.org>.
I wish to thank those who helped me:
Ben Tilly for providing thorough comments to the code in the synopsis, bringing up various important issues, valuable discussions, for his patience and dedicated collaboration.
David A. Golden for discussions about names, and his propositions to rename some terms into more appropriate, such as "read" to "readable", and "predicate" to "condition". Rocco Caputo for optimizing the POE benchmark script. Randal L. Schwartz, Brock Wilcox, and zby@perlmonks helped me to understand how the documentation for the module could be made better.
All the good people on perlmonks.org and perl conferences, who invested their time into understanding the module.