NAME
IO::Lambda::Message - message passing queue
DESCRIPTION
The module implements a generic message passing protocol, and two generic classes that implement the server and the client functionality. The server code is implemented in a simple, blocking fashion, and is expected to be executed remotely. The client API is written in lambda style, where message completion can be awaited asynchronously. The communication between server and client is done through two file handles of any type (stream sockets, pipes, etc.).
SYNOPSIS
use IO::Lambda qw(:lambda);            # provides lambda, context, tail
use IO::Lambda::Message qw(message);

lambda {
    # READER and WRITER are already-open file handles connected to the server
    my $messenger = IO::Lambda::Message-> new( \*READER, \*WRITER);
    context $messenger-> new_message('hello world');
tail {
    print "response1: @_\n";
    context $messenger, 'same thing';
message {
    print "response2: @_\n";
    undef $messenger;
}}}
Message protocol
The message passing protocol featured here is synchronous, which means that any message initiated either by server or client is expected to be replied to. Both server and client can wait for the message reply, but they cannot communicate while waiting.
Each message is preceded by a simple header: the length of the message as an 8-digit hexadecimal number, followed by one byte with the value 0x0A (newline). The message itself is followed by another 0x0A byte.
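The framing described above can be sketched in plain Perl. This is only an illustration of the wire format, not the module's own code: the names encode_frame and decode_frame are hypothetical, lowercase hex digits are an assumption (the text only says "hexadecimal"), and the payload is assumed to be a byte string.

```perl
use strict;
use warnings;

# Build a frame: 8 hex digits of payload length, 0x0A, payload, 0x0A.
# Lowercase hex digits are an assumption made for this sketch.
sub encode_frame {
    my ($payload) = @_;
    return sprintf("%08x", length $payload) . "\x0a" . $payload . "\x0a";
}

# Try to extract one frame from the front of a buffer (passed by reference).
# Returns the payload and removes the frame from the buffer,
# returns undef if the buffer does not yet hold a complete frame,
# and dies on a malformed header or a missing terminator.
sub decode_frame {
    my ($buf_ref) = @_;
    return undef if length($$buf_ref) < 9;               # header incomplete
    $$buf_ref =~ /\A([0-9a-fA-F]{8})\x0a/
        or die "malformed frame header\n";
    my $size = hex $1;
    return undef if length($$buf_ref) < 9 + $size + 1;   # body incomplete
    substr($$buf_ref, 9 + $size, 1) eq "\x0a"
        or die "missing frame terminator\n";
    my $payload = substr($$buf_ref, 9, $size);
    substr($$buf_ref, 0, 9 + $size + 1) = '';            # consume the frame
    return $payload;
}

# "hello world" is 11 bytes long, so the header reads 0000000b.
my $wire = encode_frame("hello world");
print $wire;
```

Because the decoder returns undef on a partial frame instead of failing, it can be called again each time more bytes are appended to the buffer.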
IO::Lambda::Message
The class implements a generic message passing queue that allows adding asynchronous messages to the queue and waiting for their responses.
- new $class, $reader, $writer, %options
-
Constructs a new object of the IO::Lambda::Message class and attaches it to the $reader and $writer file handles (which can be the same object, in which case $writer can be omitted, but only if %options is empty too). Accepted options:
  - reader :: ($fh, $buf, $cond, $deadline) -> ioresult
  -
  Custom reader, sysreader by default.
  - writer :: ($fh, $buf, $length, $offset, $deadline) -> ioresult
  -
  Custom writer, syswriter by default.
  - buf :: string
  -
  If the $reader handle was used (or will need to be used) in buffered I/O, its buffer can be passed along to the object.
  - async :: boolean
  -
  If set, the object will listen for incoming messages from the server; otherwise it will only initiate outgoing messages. By default it is set to 0, and the incoming method, which handles incoming messages, dies. This functionality is designed for derived classes, not for the caller.
- new_message($message, $deadline = undef) :: () -> ($response, $error)
-
Registers a new message in the queue and returns a lambda that will be ready when the message is responded to. The message must be delivered and replied to no later than $deadline. The lambda returns the response or the error.
Upon communication error, all queued messages are discarded. A timeout is regarded as a protocol error too, so use the $deadline option with care: as soon as the deadline error is fired, communication is no longer possible. The remote will wait for its eventual response to be read by your program, which no longer listens. And if it tries to write to the socket again, the whole thing will deadlock. Consider using other means to wait for the message with a timeout.
- message ($message, $deadline = undef) :: () -> ($response, $error)
-
Condition version of new_message.
- cancel_queue(@reason)
-
Cancels all pending messages and stores @reason in the associated lambdas.
- error
-
Returns the last protocol handling error. If set, no new messages are allowed to be registered, and listening will fail too.
- is_listening
-
If set, the object is listening for asynchronous events from the server.
- is_pushing
-
If set, the object is sending messages to the server.
IO::Lambda::Message::Simple
The class implements a simple generic protocol dispatcher that executes methods of its own class and returns the results back to the client. The methods have to be defined in a derived class.
- new $reader [$writer = $reader]
-
Creates a new object that will communicate with clients using the given handles, in a blocking fashion.
- run
-
Starts the message loop.
- quit
-
Signals the loop to stop.
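The idea of a dispatcher that executes methods defined in a derived class can be illustrated with a self-contained sketch. It does not use or reproduce IO::Lambda::Message::Simple: the packages Sketch::Dispatcher and Sketch::Calculator, the dispatch method, and the request format (an array of method name plus arguments) are all assumptions made for the example, and requests come from an in-memory list rather than from file handles.

```perl
use strict;
use warnings;

# Hypothetical base class, NOT IO::Lambda::Message::Simple itself:
# it only illustrates dispatching requests to methods by name.
package Sketch::Dispatcher;

sub new  { my ($class) = @_; return bless { run => 1 }, $class }

# Signal the loop in run() to stop after the current request.
sub quit { $_[0]{run} = 0 }

# Handle one request given as a method name plus arguments.
# Returns (1, @results) on success or (0, $error) on failure.
sub dispatch {
    my ($self, $method, @args) = @_;
    my $code = $self->can($method)
        or return (0, "no such method: $method");
    my @results = eval { $self->$code(@args) };
    return (0, "$@") if $@;
    return (1, @results);
}

# Process requests in order until none are left or quit() was called.
sub run {
    my ($self, @requests) = @_;
    my @replies;
    while ($self->{run} and @requests) {
        my $req = shift @requests;
        push @replies, [ $self->dispatch(@$req) ];
    }
    return @replies;
}

# Derived class: the methods available to clients are defined here.
package Sketch::Calculator;
our @ISA = ('Sketch::Dispatcher');

sub add  { my ($self, @n) = @_; my $sum = 0; $sum += $_ for @n; return $sum }
sub stop { my ($self) = @_; $self->quit; return 'bye' }

package main;

my @replies = Sketch::Calculator->new->run(
    [ add => 1, 2, 3 ],
    [ 'missing' ],
    [ 'stop' ],
    [ add => 4 ],      # never reached: stop() ended the loop
);
printf "%d replies\n", scalar @replies;
```

Looking methods up with can keeps the base class generic, but it also exposes inherited methods such as new and run to callers; a real dispatcher would likely want to restrict which names are callable.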
AUTHOR
Dmitry Karasik, <dmitry@karasik.eu.org>.