NAME

IPC::MPS - Message Passing Style of Inter-process communication

SYNOPSIS

use IPC::MPS;

my $vpid = spawn { 
	receive {
		msg ping => sub {
			my ($from, $i) = @_;
			print "Ping ", $i, " from $from\n";
			snd($from, "pong", $i);
		};
	};
};

snd($vpid, "ping", 1);
receive { 
	msg pong => sub {
		my ($from, $i) = @_;
		print "Pong $i from $from\n";
		if ($i < 3) {
			snd($from, "ping", $i + 1);
		} else {
			exit;
		}
	};
};

DESCRIPTION

A messaging system between parent and child processes, and between child processes that share the same parent.

Motto: inter-process communication without blocking.

Concurrency programming

A distinctive feature of the system is that messages between child processes are routed through their parent. For this reason we recommend using parent processes only to coordinate the work and to store data.

Messages are carried over UNIX sockets.
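
To make the routing idea concrete, here is a minimal sketch in core Perl of a parent relaying one message between two children over UNIX socket pairs. It does not use IPC::MPS and is not a description of its internals; fork_with_socket and relay_demo are hypothetical helpers invented for this illustration, and the line-based wire format is an assumption.

```perl
use strict;
use warnings;
use Socket;
use IO::Handle;

STDOUT->autoflush(1);    # avoid duplicated buffered output after fork

# Hypothetical helper (not part of IPC::MPS): fork a child that is connected
# to the parent by a UNIX socket pair. The child runs $code->($socket), exits.
sub fork_with_socket {
    my ($code) = @_;
    socketpair(my $child_end, my $parent_end, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
        or die "socketpair: $!";
    $_->autoflush(1) for $child_end, $parent_end;
    my $pid = fork();
    die "fork: $!" unless defined $pid;
    if ($pid == 0) {
        close $parent_end;
        $code->($child_end);
        exit 0;
    }
    close $child_end;
    return ($pid, $parent_end);
}

# Child A sends one line, the parent forwards it to child B,
# and B answers back to the parent.
sub relay_demo {
    my ($text) = @_;
    my ($pid_a, $sock_a) = fork_with_socket(sub {
        my ($s) = @_;
        print {$s} "$text\n";
    });
    my ($pid_b, $sock_b) = fork_with_socket(sub {
        my ($s) = @_;
        my $line = <$s>;
        print {$s} "got: $line";
    });
    my $line = <$sock_a>;     # parent reads from child A
    print {$sock_b} $line;    # parent forwards to child B
    my $reply = <$sock_b>;
    waitpid($_, 0) for $pid_a, $pid_b;
    chomp $reply;
    return $reply;
}

print relay_demo("ping 1"), "\n";    # prints "got: ping 1"
```

The children never talk to each other directly; every message passes through the parent's end of a socket pair, which is why a busy parent would slow down all of its children.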

$vpid = spawn {
   ...
   receive {
     msg "name" => sub {
       my ($from, @args) = @_;
       ...
     };
     msg "name" => sub { ... };
     msg "name" => sub { ... };
     ...
   };
};

Child processes are not created when spawn is called; they are created later, when receive is called, just before the send-receive cycle starts. This ensures that every vpid is already defined at the time of the fork call. A vpid is the address of the reference to the socket that connects the main process to the child.

spawn may be called inside another spawn. If spawn is called inside a receive handler, receive must also be called again to start the new child processes. The new receive adds its handlers to the existing ones and passes control back to the existing receive messaging cycle.
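
The reason for deferring the fork can be shown with a small sketch in core Perl. It is a simplified model, not IPC::MPS code: register_child and start_children are hypothetical stand-ins for spawn and for the start of receive, and reporting a value through the exit status is only a device to keep the example short.

```perl
use strict;
use warnings;

$| = 1;    # avoid duplicated buffered output after fork

my @pending;        # [id, code ref] pairs registered but not yet forked
my $next_id = 1;

# Hypothetical stand-in for spawn: record the body and hand back an id.
sub register_child {
    my ($code) = @_;
    my $id = $next_id++;
    push @pending, [$id, $code];
    return $id;
}

# Hypothetical stand-in for the start of receive: fork everything that was
# registered. Each child reports its result through its exit status.
sub start_children {
    my %pid_of;
    while (my $entry = shift @pending) {
        my ($id, $code) = @$entry;
        my $pid = fork();
        die "fork: $!" unless defined $pid;
        exit($code->()) if $pid == 0;    # child: run the body and exit
        $pid_of{$id} = $pid;
    }
    my %result;
    for my $id (keys %pid_of) {
        waitpid($pid_of{$id}, 0);
        $result{$id} = $? >> 8;
    }
    return %result;
}

my ($first, $second);
$first  = register_child(sub { return $second });    # $second is still undef here
$second = register_child(sub { return $first });

my %seen = start_children();
print "child $_ saw peer $seen{$_}\n" for sort keys %seen;
# prints:
# child 1 saw peer 2
# child 2 saw peer 1
```

Had the first child been forked immediately at registration, its copy of $second would have stayed undefined; forking later lets every child inherit all the ids.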
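
One way to picture how a nested receive adds to the existing handlers is a single dispatch table keyed by message name. The sketch below is plain Perl, not IPC::MPS code; add_handlers and dispatch are hypothetical names, and letting a later handler replace an earlier one with the same name is an assumption, not documented behavior.

```perl
use strict;
use warnings;

my %handlers;    # message name => code ref

# Hypothetical: each call adds to the table instead of replacing it.
sub add_handlers {
    my (%new) = @_;
    $handlers{$_} = $new{$_} for keys %new;
    return scalar keys %handlers;
}

# Hypothetical: look up the handler by message name and call it.
sub dispatch {
    my ($from, $name, @args) = @_;
    my $handler = $handlers{$name} or return;
    return $handler->($from, @args);
}

add_handlers(hello1 => sub { "Hello 1 from $_[0]" });    # outer receive
add_handlers(hello3 => sub { "Hello 3 from $_[0]" });    # nested receive

print dispatch(0, "hello1"), "\n";    # prints "Hello 1 from 0"
print dispatch(0, "hello3"), "\n";    # prints "Hello 3 from 0"
```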

Sending a message:

snd($vpid, "msg name", @args);

If vpid is 0, the message is sent to the parent process.

If the parent process exits, the child process exits too.

Dataflow programming

Sometimes you need additional information from other processes before you can finish processing a message. In that case, send a message requesting the information and then, at the point where the result is needed, wait for it with the wt subprogram (short for "wait"), without interrupting the processing of the current message.

snd($vpid_1, "msg_1", @args_1);
snd($vpid_2, "msg_2", @args_2);

my $r = wt($vpid_1, "msg_1");
...
my @r = wt($vpid_2, "msg_2");

The wt subprogram starts a new waiting cycle: sending of already queued messages continues and incoming messages are still received, but the new messages are not processed; they accumulate in a buffer. When the response to the awaited message arrives, the waiting cycle ends and wt returns the response, and processing of the initial message continues.
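
The buffering behavior can be modeled in a few lines of plain Perl. This is a simplified sketch, not the module's implementation: wait_for is a hypothetical stand-in for wt, @inbox stands in for messages arriving from the sockets, and @buffer for messages set aside while waiting.

```perl
use strict;
use warnings;

my @inbox;     # [from, name, args...] messages as they arrive
my @buffer;    # messages received while waiting, kept for later processing

# Hypothetical stand-in for wt: take messages in arrival order, set aside
# everything that is not the awaited one, and return the awaited arguments.
sub wait_for {
    my ($from, $name) = @_;
    while (my $message = shift @inbox) {
        my ($m_from, $m_name, @args) = @$message;
        if ($m_from eq $from && $m_name eq $name) {
            return wantarray ? @args : $args[0];
        }
        push @buffer, $message;
    }
    die "no matching message arrived\n";
}

@inbox = (
    [2, "ping",  10],
    [1, "msg_1", "answer"],
    [3, "ping",  11],
);

my $r = wait_for(1, "msg_1");
print "got $r, buffered ", scalar(@buffer), ", still queued ", scalar(@inbox), "\n";
# prints: got answer, buffered 1, still queued 1
```

The message from process 2 arrived first but was only buffered, so it is still available for normal handling once the current handler finishes.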

my $r = snd_wt($vpid, $msg, @args);

is shorthand for:

snd($vpid, $msg, @args);
my $r = wt($vpid, $msg);

The main differences from Erlang

Note that this is not Erlang; this is Perl's IPC::MPS. The main differences, each following from the previous one:

  1. Full operating system processes.

  2. The 'spawn' subprogram does not create processes directly; it only does the preparatory work. The processes are created when 'receive' is called.

  3. 'receive' runs repeatedly; it is not a one-time operation as in Erlang.

  4. A 'receive' inside a 'receive' does not supersede the previous one; it adds new message handlers and starts new processes.

  5. To wait for the response to a specific message inside a handler, use the 'wt' subprogram. In Erlang this is done with the same 'receive'.

Distributed Programming

To turn the current process into a node, call the 'listener' subprogram:

listener($host, $port);

To connect to a remote node, use the 'open_node' subprogram:

my $vpid = open_node($host, $port);

To detect that a connection has closed, define a handler for the NODE_CLOSED message:

msg NODE_CLOSED => sub { 
	my ($vpid) = @_;
	...
};

This applies to both the client and the server.

EXAMPLES

Ping Pong

 use IPC::MPS;
 
 my $ping_pong = 3;
 
 my ($vpid1, $vpid2);
 
 $vpid1 = spawn {
 	snd($vpid2, "ping", 1);
 	receive { 
 		msg pong => sub {
 			my ($from, $i) = @_;
 			print "Pong $i from $from\n";
 			if ($i < $ping_pong) {
 				snd($from, "ping", $i + 1);
 			} else {
 				snd(0, "exit");
 			}
 		};
 	};
 };
 
 $vpid2 = spawn { 
 	receive {
 		msg ping => sub {
 			my ($from, $i) = @_;
 			print "Ping ", $i, " from $from\n";
 			snd($from, "pong", $i);
 		};
 	};
 };
 
 receive {
 	msg exit => sub {
 		print "EXIT\n";
 		exit;
 	};
 };

Triplex circular Ping Pong

 use IPC::MPS;
 
 my $ping_pong = 5;
 
 sub ping_pong($) {
 	my $vpid = shift;
 	sub {
 		msg ping => sub {
 			my ($from, @args) = @_;
 			print "Ping ", $args[0], " from $from\n";
 			snd($from, "pong", $args[0]);
 			if ($args[0] < $ping_pong) {
 				snd($vpid, "ping", $args[0] + 1, $$);
 			}
 		};
 		msg pong => sub {
 			my ($from, @args) = @_;
 			print "Pong ", $args[0], " from $from\n";
 			unless ($args[0] < $ping_pong) {
 				snd(0, "exit");
 			}
 		};
 	};
 }
 
 
 my ($vpid1, $vpid2, $vpid3);
 
 $vpid1 = spawn {
 	snd($vpid2, "ping", 1, $$);
 	receive { ping_pong($vpid2)->() };
 };
 
 $vpid2 = spawn { 
 	receive { ping_pong($vpid3)->() };
 };
 
 $vpid3 = spawn { 
 	receive { ping_pong($vpid1)->() };
 };
 
 
 receive {
 	msg exit => sub {
 		print "EXIT\n";
 		exit;
 	};
 };

Tree

use IPC::MPS;

my $vpid1 = spawn {

	my $vpid2 = spawn {
		receive { 
			msg hello2 => sub {
				print "Hello 2\n";
			};
		};
	};

	receive {
		msg hello1 => sub {
			print "Hello 1\n";
			snd($vpid2, "hello2");

			my $vpid3 = spawn {
				receive { 
					msg hello3 => sub {
						print "Hello 3\n";
					};
				};
			};

			snd($vpid3, "hello3");
			receive {};
		};
	};
};

spawn {
	sleep 1;
	print "SLEEP\n";
	snd(0, "exit");
	receive {};
};

snd($vpid1, "hello1");
receive {
	msg exit => sub {
		print "EXIT\n";
		exit;
	};
};

Waiting

Waiting for a response to a specific message.

use IPC::MPS;

my $vpid = spawn {
	receive { 
		msg foo => sub {
			my ($from, $text) = @_;
			print "foo: $text\n";

			snd(0, "too", 1);
			print "too -> baz\n";

			my $rv = wt(0, "baz");
			print "baz: $rv\n";

			my @rv = snd_wt(0, "sugar", $rv);
			print "sugar: $rv[0]\n";

			exit;
		};
	};
};


snd($vpid, "foo", "Hello, wait");

receive {
	msg too => sub {
		my ($from, $i) = @_;
		print "too: $i\n";
		snd($from, "baz", ++$i);
	};
	msg sugar => sub {
		my ($from, $i) = @_;
		snd($from, "sugar", ++$i);
	};
};

DEMO

See the demo directory.

REALISATIONS

Compatibility with Event, EV, AnyEvent based modules

IPC::MPS::Event and IPC::MPS::EV allow the use of third-party modules based on Event and EV, respectively (directly or through AnyEvent).

Timer

use IPC::MPS::Event;
use Event;

my $vpid = spawn {
	receive {
		msg ping => sub {
			my ($from, $hello) = @_;
			print "$hello; $$\n";
			Event->timer(after => 1, cb => sub {
				snd($from, "pong", "hy");
			});
		};
	};
};

snd($vpid, "ping", "hello");

receive {
	msg pong => sub {
		my ($from, $hello) = @_;
		print "$hello; $$\n";
		print "EXIT\n";
		exit;
	};
};

AnyEvent::HTTP

use IPC::MPS::Event;
use AnyEvent::HTTP;

my $vpid = spawn {
	receive {
		msg req => sub {
			my ($from, $url) = @_;
			http_get $url, sub { 
				print ${$_[1]}{URL}, "\t", ${$_[1]}{Status}, "; $$\n";
				snd($from, "res", $url, ${$_[1]}{Status});
			};
		};
	};
};

snd($vpid, "req", "http://localhost/");

receive {
	msg res => sub {
		my ($from, $url, $status) = @_;
		print "$url\t$status; $$\n";
		print "EXIT\n";
		exit;
	};
};

EV note

When using EV-based modules, you must call EV::loop explicitly:

 use IPC::MPS::EV;
 use AnyEvent::HTTP;
 ...
 			http_get $url, sub { 
 				print ${$_[1]}{URL}, "\t", ${$_[1]}{Status}, "; $$\n";
 				snd($from, "res", $url, ${$_[1]}{Status});
 			};
 			use EV; EV::loop;
 ...

SEE ALSO

Sometimes it is easier to use the BGS module (Background execution of subroutines in child processes).

AUTHOR

Nick Kostirya

COPYRIGHT AND LICENSE

Copyright (C) 2009 by Nick Kostirya

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.8.8 or, at your option, any later version of Perl 5 you may have available.