NAME
POE::Component::MessageQueue - A POE message queue that uses STOMP for the communication protocol
SYNOPSIS
use POE;
use POE::Component::Logger;
use POE::Component::MessageQueue;
use POE::Component::MessageQueue::Storage::Complex;
use Socket; # Exports AF_INET, which is a bareword error under strict otherwise.
use strict;
my $DATA_DIR = '/tmp/perl_mq';
# We create a logger, because a production message queue
# really needs one.
POE::Component::Logger->spawn(
    ConfigFile => 'log.conf',
    Alias      => 'mq_logger'
);
POE::Component::MessageQueue->new({
    port         => 61613,       # Optional.
    address      => '127.0.0.1', # Optional.
    hostname     => 'localhost', # Optional.
    domain       => AF_INET,     # Optional.
    logger_alias => 'mq_logger', # Optional.
    # Required!!
    storage => POE::Component::MessageQueue::Storage::Complex->new({
        data_dir => $DATA_DIR,
        timeout  => 2
    })
});
POE::Kernel->run();
exit;
DESCRIPTION
This module implements a message queue [1] on top of POE that communicates via the STOMP protocol [2].
There are a few good open source message queues, most notably ActiveMQ [3], which is written in Java. It provides more features and flexibility than this one (while still implementing the STOMP protocol); however, at the time I last used it, it was very unstable. With every version there was a different mix of memory leaks, persistence problems, STOMP bugs, and file descriptor leaks. Due to its complexity, I was unable to be very helpful in fixing any of these problems, so I wrote this module!
This component distinguishes itself in a number of ways:
No OS threads; it's asynchronous. (Thanks to POE!)
Persistence was a high priority.
A strong effort is made to keep memory usage low and performance high.
Message storage can be provided by a number of different backends.
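For readers new to STOMP, the following is a minimal sketch of the wire format that clients and this queue exchange: a command line, "name:value" header lines, a blank line, the body, and a terminating NUL byte. The helper name build_stomp_frame, the destination /queue/example, and the use of a persistent header are assumptions made for illustration; none of them is part of this distribution.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of this distribution): serialize one STOMP frame.
sub build_stomp_frame {
    my ($command, $headers, $body) = @_;
    $body = '' unless defined $body;
    my $frame = "$command\n";
    # Sort header names so the output is deterministic.
    for my $name (sort keys %$headers) {
        $frame .= "$name:$headers->{$name}\n";
    }
    # A blank line separates the headers from the body; a NUL byte ends the frame.
    return $frame . "\n" . $body . "\0";
}

# Illustrative values only.
my $frame = build_stomp_frame(
    'SEND',
    { destination => '/queue/example', persistent => 'true' },
    'hello world',
);
```

In practice a client library such as Net::Stomp builds and parses these frames for you; the sketch only shows what travels over the socket.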
STORAGE
When creating an instance of this component you must pass in a storage object so that the message queue knows how to store its messages. There are four storage backends provided with this distribution. See their individual documentation for usage information. Here is a quick breakdown:
POE::Component::MessageQueue::Storage::Memory -- The simplest storage backend. It keeps messages in memory and provides absolutely no persistence.
POE::Component::MessageQueue::Storage::DBI -- Uses Perl DBI to store messages. Using it directly is not recommended, because message bodies don't belong in the database. All messages are stored persistently.
POE::Component::MessageQueue::Storage::FileSystem -- Builds on top of the DBI backend above but stores the message body on the filesystem. All messages are stored persistently regardless of whether a message has the persistent flag set or not.
POE::Component::MessageQueue::Storage::Complex -- A combination of the Memory and FileSystem modules above. It keeps messages in Memory and moves them into FileSystem after a given number of seconds. The FileSystem backend is configured to use SQLite2. It correctly handles a message's persistent flag. This is the recommended storage backend and should provide the best performance when both providers and consumers are connected to the queue at the same time.
CONSTRUCTOR PARAMETERS
- storage => SCALAR
The only required parameter. Sets the object that the message queue should use for message storage. This must be an object that follows the interface of POE::Component::MessageQueue::Storage but doesn't necessarily need to be a child of that class.
- port => SCALAR
The optional port to listen on. If none is given, we use 61613 by default.
- address => SCALAR
The optional interface address to bind to. It defaults to INADDR_ANY or INADDR6_ANY when using IPv4 or IPv6, respectively.
- hostname => SCALAR
The optional name of the interface to bind to. This will be converted to the IP and used as if you set address instead. If you set both hostname and address, address will override this value.
- domain => SCALAR
Optionally specifies the domain within which communication will take place. Defaults to AF_INET.
- logger_alias => SCALAR
Optionally sets the alias of the POE::Component::Logger object that you want the message queue to log to. If no value is given, log information is simply printed to STDERR.
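The precedence between address and hostname described above can be sketched roughly as follows. This is an illustration of the documented behaviour only, not the component's actual code: the helper name pick_bind_address is hypothetical, and the sketch handles IPv4 only, using the core Socket module.

```perl
use strict;
use warnings;
use Socket qw(inet_aton inet_ntoa);

# Hypothetical helper (not the component's real code): choose the bind address.
sub pick_bind_address {
    my (%args) = @_;
    # An explicit address always wins over hostname.
    return $args{address} if defined $args{address};
    if (defined $args{hostname}) {
        # Resolve the name to a packed IPv4 address, then to dotted-quad text.
        my $packed = inet_aton($args{hostname});
        return defined $packed ? inet_ntoa($packed) : undef;
    }
    # Neither was given: the caller falls back to the wildcard address.
    return;
}

# address overrides hostname, so no name lookup happens here.
my $bind = pick_bind_address(address => '127.0.0.1', hostname => 'example.invalid');
```

When only hostname is supplied, the resolved IP is used exactly as if it had been passed as address.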
REFERENCES
- [1]
http://en.wikipedia.org/Message_Queue -- General information about message queues
- [2]
http://stomp.codehaus.org/Protocol -- The informal "spec" for the STOMP protocol
- [3]
http://www.activemq.org/ -- ActiveMQ is a popular Java-based message queue
FUTURE
The goal of this module is not to support every possible feature but rather to be small, simple, efficient and robust. So, for the most part, expect only incremental changes that address those areas. Beyond that, here are some things I would like to implement someday:
Full support for the STOMP protocol.
Topics a la "topic://" in ActiveMQ.
Some kind of security based on username/password.
An optional add-on module, built on POE::Component::IKC::Server, that allows introspection of the message queue's state.
SEE ALSO
External modules:
POE, POE::Component::Server::Stomp, Net::Stomp, POE::Component::Logger, DBD::SQLite2
Internal modules:
POE::Component::MessageQueue::Storage, POE::Component::MessageQueue::Storage::Memory, POE::Component::MessageQueue::Storage::DBI, POE::Component::MessageQueue::Storage::FileSystem, POE::Component::MessageQueue::Storage::Complex
BUGS
There is a mysterious memory leak I still haven't found. I have narrowed it down to not being in the storage layer, but that's about it. That said, I personally am using this in production for thousands of large messages daily, and it takes quite a few days to get unreasonably bloated. I do hope to find it but, as they say, "Release often -- release early."
AUTHOR
Copyright 2007 David Snopek <dsnopek@gmail.com>