NAME
IPC::MPS - Message Passing Style of Inter-process communication
SYNOPSIS
use IPC::MPS;
my $vpid = spawn {
receive {
msg ping => sub {
my ($from, $i) = @_;
print "Ping ", $i, " from $from\n";
snd($from, "pong", $i);
};
};
};
snd($vpid, "ping", 1);
receive {
msg pong => sub {
my ($from, $i) = @_;
print "Pong $i from $from\n";
if ($i < 3) {
snd($from, "ping", $i + 1);
} else {
exit;
}
};
};
DESCRIPTION
The messaging system between parental and child processes, and between child processes, that have the same parent.
Moto: inter-process communication without blocking.
Concurrency programming
The peculiarity of the system is that the messaging between child processes is handled by the parents. That is why we recommend using the parental processes just to coordinate the working process and to store data.
The messages are handled by the UNIX sockets.
$vpid = spawn {
...
receive {
msg "name" => sub {
my ($from, @args) = @_;
...
};
msg "name" => sub { ... };
msg "name" => sub { ... };
...
};
};
Child processes are created not when spawn is called, they are created later when receive is called, just before send-receive cycle is called. It is necessary so that all vpid are defined by fork call. vpid is an address of the link to the socket from main process to the child one.
Other spawn may be created inside spawn. If spawn is created inside receive, receive also must be called to start child processes. New receive will add its information to the old one and pass the control to the old receive messaging cycle.
The message sending.
snd($vpid, "msg name", @args);
if vpid = 0 , this is a message to the parental process.
If the parental process is over, the child process ends too.
To detect spawn closing message SPAWN_CLOSED handler should be defined:
msg SPAWN_CLOSED => sub {
my ($vpid) = @_;
...
};
Note, for your convenience "close" and "exit" messages are special. If a process is sent a "close" or "exit" message, then any messages from it are ignored, SPAWN_CLOSED and NODE_CLOSED too.
To break receive use "quit" subroutine.
The vpid2pid function accepts the vpid argument and returns OS PID. Importnat! PID will be available only after the receive subroutine is called.
Dataflow programming
Sometimes you may need to get additional information from other processes and only then continue the message processing. For this you may send a message with information request and then wait information getting in a proper place by subprogram wt (abbreviated "wait"), without current message processing break.
snd("vpid_1", "msg_1", @args_1);
snd("vpid_2", "msg_2", @args_2);
my $r = wt("vpid_1", "msg_1");
...
my @r = wt("vpid_2", "msg_2");
Subprogram wt starts new waiting cycle, sending of old messages continues and receiving of new messages starts, but new messages are not processed, they are accumulated in a buffer. When the response to a needed message is received, this waiting cycle ends and wt returns the response --- the processing of the initial message continues.
my $r = snd_wt($vpid, $msg, @args);
is a shortening for:
snd($vpid, $msg, @args);
my $r = wt($vpid, $msg);
The main differences from Erlang
Attention, this is not Erlang, this is Perl IPC::MPS. The main differences, subsequent upon one another:
Full operating system processes.
Subprogram 'spawn' doesn't create processes directly, it just performs the preparative operations. The processes are created when 'receive' is called.
'Receive' is repeated, not a one-time as in Erlang.
'Receive' inside 'receive' doesn't supersede the previous one, but adds a new message handlers and starts new processes.
To wait the response to a specific message inside handler, subprogram 'wt' should be used. In Erlang it is done with the same 'receive'.
Distributed Programming
To transform the current process to a node you need to call 'listener' subprogram:
listener($host, $port);
Connecting to the remote node is done with 'open_node' subprogram:
my $vpid = open_node($host, $port);
You may set youself pack and unpack functions, instead of freeze and thaw functions of Storable module:
listener($host, $port, pack => sub { ... }, unpack => sub { ... });
my $vpid = open_node($host, $port, pack => sub { ... }, unpack => sub { ... });
To detect connection closing message NODE_CLOSED handler should be defined:
msg NODE_CLOSED => sub {
my ($vpid, $connected) = @_;
if ($connected) {
print "Node closed.\n";
} else {
print "Cannot connect to node: $!.\n";
}
...
};
This statement is true for both the client and the server.
EXAMPLES
Ping Pong
use IPC::MPS;
my $ping_pong = 3;
my ($vpid1, $vpid2);
$vpid1 = spawn {
snd($vpid2, "ping", 1);
receive {
msg pong => sub {
my ($from, $i) = @_;
print "Pong $i from $from\n";
if ($i < $ping_pong) {
snd($from, "ping", $i + 1);
} else {
snd(0, "exit");
}
};
};
};
$vpid2 = spawn {
receive {
msg ping => sub {
my ($from, $i) = @_;
print "Ping ", $i, " from $from\n";
snd($from, "pong", $i);
};
};
};
receive {
msg exit => sub {
print "EXIT\n";
exit;
};
};
Triplex circular Ping Pong
use IPC::MPS;
my $ping_pong = 5;
sub ping_pong($) {
my $vpid = shift;
sub {
msg ping => sub {
my ($from, @args) = @_;
print "Ping ", $args[0], " from $from\n";
snd($from, "pong", $args[0]);
if ($args[0] < $ping_pong) {
snd($vpid, "ping", $args[0] + 1, $$);
}
};
msg pong => sub {
my ($from, @args) = @_;
print "Pong ", $args[0], " from $from\n";
unless ($args[0] < $ping_pong) {
snd(0, "exit");
}
};
};
}
my ($vpid1, $vpid2, $vpid3);
$vpid1 = spawn {
snd($vpid2, "ping", 1, $$);
receive { ping_pong($vpid2)->() };
};
$vpid2 = spawn {
receive { ping_pong($vpid3)->() };
};
$vpid3 = spawn {
receive { ping_pong($vpid1)->() };
};
receive {
msg exit => sub {
print "EXIT\n";
exit;
};
};
Tree
use IPC::MPS;
my $vpid1 = spawn {
my $vpid2 = spawn {
receive {
msg hello2 => sub {
print "Hello 2\n";
};
};
};
receive {
msg hello1 => sub {
print "Hello 1\n";
snd($vpid2, "hello2");
my $vpid3 = spawn {
receive {
msg hello3 => sub {
print "Hello 3\n";
};
};
};
snd($vpid3, "hello3");
receive {};
};
};
};
spawn {
sleep 1;
print "SLEEP\n";
snd(0, "exit");
receive {};
};
snd($vpid1, "hello1");
receive {
msg exit => sub {
print "EXIT\n";
exit;
};
};
Waiting
Waiting for a response to a specific message.
use IPC::MPS;
my $vpid = spawn {
receive {
msg foo => sub {
my ($from, $text) = @_;
print "foo: $text\n";
snd(0, "too", 1);
print "too -> baz\n";
my $rv = wt(0, "baz");
print "baz: $rv\n";
my @rv = snd_wt(0, "sugar", $rv);
print "sugar: $rv[0]\n";
exit;
};
};
};
snd($vpid, "foo", "Hello, wait");
receive {
msg too => sub {
my ($from, $i) = @_;
print "too: $i\n";
snd($from, "baz", ++$i);
};
msg sugar => sub {
my ($from, $i) = @_;
snd($from, "sugar", ++$i);
};
};
DEMO
See directory demo.
REALISATIONS
IPC::MPS based on IO::Select.
IPC::MPS::Event based on Event.
IPC::MPS::EV based on EV.
Compatibility with Event, EV, AnyEvent based modules
IPC::MPS::Event and IPC::MPS::EV allows usage of side modules based on Event, EV modules accordingly (directly or thru AnyEvent).
Timer
use IPC::MPS::Event;
use Event;
my $vpid = spawn {
receive {
msg ping => sub {
my ($from, $hello) = @_;
print "$hello; $$\n";
Event->timer(after => 1, cb => sub {
snd($from, "pong", "hy");
});
};
};
};
snd($vpid, "ping", "hello");
receive {
msg pong => sub {
my ($from, $hello) = @_;
print "$hello; $$\n";
print "EXIT\n";
exit;
};
};
AnyEvent::HTTP
use IPC::MPS::Event;
use AnyEvent::HTTP;
my $vpid = spawn {
receive {
msg req => sub {
my ($from, $url) = @_;
http_get $url, sub {
print ${$_[1]}{URL}, "\t", ${$_[1]}{Status}, "; $$\n";
snd($from, "res", $url, ${$_[1]}{Status});
};
};
};
};
snd($vpid, "req", "http://localhost/");
receive {
msg res => sub {
my ($from, $url, $status) = @_;
print "$url\t$status; $$\n";
print "EXIT\n";
exit;
};
};
SEE ALSO
Sometimes it is easier to use a module BGS - Background execution of subroutines in child processes.
AUTHOR
Nick Kostyria
COPYRIGHT AND LICENSE
Copyright (C) 2009 by Nick Kostyria
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.8.8 or, at your option, any later version of Perl 5 you may have available.