NAME

POE::Component::MessageQueue - A POE message queue that uses STOMP for the communication protocol

SYNOPSIS

use POE;
use POE::Component::Logger;
use POE::Component::MessageQueue;
use POE::Component::MessageQueue::Storage::Complex;
use strict;

my $DATA_DIR = '/tmp/perl_mq';

# we create a logger, because a production message queue would
# really need one.
POE::Component::Logger->spawn(
  ConfigFile => 'log.conf',
  Alias      => 'mq_logger'
);

POE::Component::MessageQueue->new({
  port     => 61613,            # Optional.
  address  => '127.0.0.1',      # Optional.
  hostname => 'localhost',      # Optional.
  domain   => AF_INET,          # Optional.
  
  logger_alias => 'mq_logger',  # Optional.

  # Required!!
  storage => POE::Component::MessageQueue::Storage::Complex->new({
    data_dir     => $DATA_DIR,
    timeout      => 2,
    throttle_max => 2
  })
});

POE::Kernel->run();
exit;

COMMAND LINE

If you are only interested in running with the recommended storage backend and some predetermined defaults, you can use the included command line script.

user$ mq.pl --usage
POE::Component::MessageQueue version 0.1.4
Copyright 2007 David Snopek

mq.pl [--port|-p <num>] [--hostname|-h <host>]
      [--timeout|-i <seconds>]   [--throttle|-T <count>]
      [--data-dir <path_to_dir>] [--log-cont <path_to_file>]
      [--background|-b] [--pidfile|-p <path_to_file>]
      [--version|-v] [--help|-h]

SERVER OPTIONS:
  --port     -p <num>    The port number to listen on (Default: 61613)
  --hostname -h <host>   The hostname of the interface to listen on (Default: localhost)

STORAGE OPTIONS:
  --timeout  -i <secs>   The number of seconds to keep messages in the front-store (Default: 4)
  --throttle -T <count>  The number of messages that can be stored at once before throttling (Default: 2)
  --data-dir <path>      The path to the directory to store data (Default: /var/lib/perl_mq)
  --log-conf <path>      The path to the log configuration file (Default: /etc/perl_mq/log.conf

DAEMON OPTIONS:
  --background -b        If specified the script will daemonize and run in the background
  --pidfile    -p <path> The path to a file to store the PID of the process

OTHER OPTIONS:
  --version    -v        Show the current version.
  --help       -h        Show this usage message

DESCRIPTION

This module implements a message queue [1] on top of POE that communicates via the STOMP protocol [2].

There exist a few good Open Source message queues, most notably ActiveMQ [3] which is written in Java. It provides more features and flexibility than this one (while still implementing the STOMP protocol), however, it was (at the time I last used it) very unstable. With every version there was a different mix of memory leaks, persistence problems, STOMP bugs, and file descriptor leaks. Due to its complexity I was unable to be very helpful in fixing any of these problems, so I wrote this module!

This component distinguishes itself in a number of ways:

  • No OS threads, its asynchronous. (Thanks to POE!)

  • Persistence was a high priority.

  • A strong effort is put to low memory and high performance.

  • Message storage can be provided by a number of different backends.

STORAGE

When creating an instance of this component you must pass in a storage object so that the message queue knows how to store its messages. There are some storage backends provided with this distribution. See their individual documentation for usage information. Here is a quick break down:

CONSTRUCTOR PARAMETERS

storage => SCALAR

The only required parameter. Sets the object that the message queue should use for message storage. This must be an object that follows the interface of POE::Component::MessageQueue::Storage but doesn't necessarily need to be a child of that class.

alias => SCALAR

The session alias to use.

port => SCALAR

The optional port to listen on. If none is given, we use 61613 by default.

address => SCALAR

The option interface address to bind to. It defaults to INADDR_ANY or INADDR6_ANY when using IPv4 or IPv6, respectively.

hostname => SCALAR

The optional name of the interface to bind to. This will be converted to the IP and used as if you set address instead. If you set both hostname and address, address will override this value.

domain => SCALAR

Optionally specifies the domain within which communication will take place. Defaults to AF_INET.

logger_alias => SCALAR

Opitionally set the alias of the POE::Component::Logger object that you want the message queue to log to. If no value is given, log information is simply printed to STDERR.

REFERENCES

[1]

http://en.wikipedia.org/Message_Queue -- General information about message queues

[2]

http://stomp.codehaus.org/Protocol -- The informal "spec" for the STOMP protocol

[3]

http://www.activemq.org/ -- ActiveMQ is a popular Java-based message queue

FUTURE

The goal of this module is not to support every possible feature but rather to be small, simple, efficient and robust. So, for the most part expect only incremental changes to address those areas. Other than that, here are some things I would like to implement someday in the future:

  • Full support for the STOMP protocol.

  • Topics a la "topic://" in ActiveMQ.

  • Some kind of security based on username/password.

  • Optional add on module via POE::Component::IKC::Server that allows to introspect the state of the message queue.

SEE ALSO

External modules:

POE, POE::Component::Server::Stomp, Net::Stomp, POE::Component::Logger, DBD::SQLite, POE::Component::Generic

Internal modules:

POE::Component::MessageQueue::Storage, POE::Component::MessageQueue::Storage::Memory, POE::Component::MessageQueue::Storage::DBI, POE::Component::MessageQueue::Storage::FileSystem, POE::Component::MessageQueue::Storage::Generic, POE::Component::MessageQueue::Storage::Generic::DBI, POE::Component::MessageQueue::Storage::Throttled, POE::Component::MessageQueue::Storage::Complex

BUGS

A ton of debugging work went into the 0.1.3 release.

I recieved a script from a user that would cause a memory leak in 0.1.2. By switching to POE::Component::Generic for the POE::Component::MessageQueue::Storage::DBI module, I was able to eliminate this memory leak.

However! Our message queue in production still appears to steadily increase its memory usage. This could be another memory leak but it is also possible that its just memory fragmentation or the load being so high that the number of throttled messages is getting out of control.

I am unable to recreate this in testing, making it difficult to debug. It only turns up under our production load. If anyone else experiences this problem and can recreate in an reliable way (preferably with something automated like a script), let me know!

That said, we are using this in production in a commercial application for thousands of large messages daily and it takes quite awhile to get unreasonably bloated. Despite its problems, in the true spirit of Open Source and Free Software, I've decided to "release early -- release often."

AUTHOR

Copyright 2007 David Snopek <dsnopek@gmail.com>