NAME
Net::RabbitMQ::Java - interface to the RabbitMQ Java AMQP client library
SYNOPSIS
Net::RabbitMQ::Java->init;
# connect
my $factory = Net::RabbitMQ::Java::Client::ConnectionFactory->new;
$factory->setUsername('guest');
$factory->setPassword('guest');
$factory->setHost("localhost");
my $conn = $factory->newConnection;
my $channel = $conn->createChannel;
# declare exchange and queues
$channel->exchangeDeclare('my-exchange', "direct", 1);
my $queue_name = $channel->queueDeclare->getQueue;
$channel->queueBind($queue_name, 'my-exchange', 'my.routing.key');
# publish
$channel->basicPublish('my-exchange', 'my.routing.key', {}, 'Message contents');
# manage transactions
$channel->txSelect;
$channel->txRollback;
$channel->txCommit;
# consume
my $consumer = Net::RabbitMQ::Java::Client::QueueingConsumer->new($channel);
$channel->basicConsume($queue_name, 0, $consumer);
while (1) {
my $delivery = $consumer->nextDelivery;
print $delivery->getBody, "\n";
$channel->basicAck($delivery->getEnvelope->getDeliveryTag, 0);
}
# set and poll callbacks
$conn->addShutdownListener(sub {
my $e = shift;
print $e->getReason->getMethod->getReplyText, "\n";
});
$channel->setReturnListener(sub {
my ($replyCode, $replyText, $exchange, $routingKey, $properties, $body) = @_;
print "Unroutable message: $body\n";
});
...
Net::RabbitMQ::Java->processCallbacks;
# disconnect
$channel->close;
$conn->close;
ABSTRACT
This module provides full bindings for the AMQP RabbitMQ Java library. It is based on Inline::Java and it exposes all of the classes and interfaces of the original library. You should refer to the original documentation in order to understand how to do the various AMQP tasks and to check the exact method signatures:
- http://www.rabbitmq.com/api-guide.html
- http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.3.1/rabbitmq-java-client-javadoc-2.3.1/
You can also have a look at the test suite to get started.
This distribution ships the RabbitMQ client library, so you don't need to download it yourself. The module version number represents the library version. If a newer library is available from the RabbitMQ team and this distribution wasn't updated, you can use it (see the CLASSPATH
option below).
Don't be scared by the "Java" thing. Using this module is quite easy: if you have Inline::Java installed, it just works. To install Inline::Java you only need to have Java SDK installed in your system (no more difficult than a quick apt-get install openjdk-6-jdk
, probably).
Yet another RabbitMQ module?
Yes. At the time of writing, CPAN offers incomplete or unmaintained modules. Some do not support recent AMQP specs such as 0-9-1, others do not support features like returned messages. This is not criticism, though. Writing and maintaining an AMQP module is probably not easy, given the complexity of the protocol, the variety of broker implementations and different spec versions, so I understand that it's difficult to develop and maintain a robust Perl implementation. I believe that an optimal solution would be a module with XS bindings to an AMQP C/C++ library. However, there seem to be no stable or widely-adopted C/C++ libraries, so I decided to build an interface to the Java client library developed by the RabbitMQ team, which appears to be the most actively maintained library.
INITIALIZATION
Before using AMQP classes you have to initialize the library:
use Net::RabbitMQ::Java;
Net::RabbitMQ::Java->init;
This will load the Java code, start the background JVM and populate the Net::RabbitMQ::Java::
namespace with the loaded classes. If you want fine-grained configuration over Inline::Java behaviour, you can pass arguments to init
:
Net::RabbitMQ::Java->init(JNI => 1);
So, if you want to use a custom client library JAR (instead of the one shipped with this module), just populate the CLASSPATH
option:
my $path = '/path/to/your/libraries';
Net::RabbitMQ::Java->init(
CLASSPATH => "$path/rabbitmq-client.jar:$path/commons-io-1.2.jar",
);
AVAILABLE CLASSES
There are few classes you need to instantiate directly:
CALLING METHODS
See the client library original documentation to learn about method signatures. This module will take care of casting data types. You only need to take care of the number of arguments which must match what the library is expecting, even if you want to pass null values:
$channel->queueDeclare($name, 1, 0, 0, undef);
In Perl you could omit the last argument, but since we're talking to Java you must provide the exact number of arguments described in docs as it's needed to identify which signature are you calling the method with (for the non-Java-savvy people out there: this is why many methods are listed multiple times with different argument lists).
AUGMENTED METHODS
In order to provide you with a better interface to the underlying library, some methods are overloaded and augmented. Thus, for these methods you should combine the RabbitMQ client library docs with the following instructions:
- $ConnectionFactory->setClientProperties( HASHREF )
-
You can pass a Perl hashref to this method.
- HASHREF = $ConnectionFactory->getClientProperties()
- HASHREF = $Connection->getClientProperties()
- HASHREF = $Connection->getServerProperties()
- HASHREF = $BasicProperties->getHeaders()
-
These methods return a Perl hashref.
- $Channel->exchangeBind( ..., HASHREF )
- $Channel->exchangeDeclare( ..., HASHREF )
- $Channel->exchangeUnbind( ..., HASHREF )
- $Channel->queueBind( ..., HASHREF )
- $Channel->queueDeclare( ..., HASHREF )
- $Channel->queueUnbind( ..., HASHREF )
-
The
arguments
argument (which is the last one when you call these methods with a signature that requires it) can be a Perl hashref. - $Channel->basicConsume( ..., HASHREF, $consumer )
-
When called with the 7-arguments signature, the next-to-last one can be passed as a Perl hashref.
- $Channel->basicPublish( ..., HASHREF, $body )
-
Next-to-last argument (which the docs require to be an AMQP.BasicProperties object) can be passed as a hashref containing the following keys:
{ contentType => '', contentEncoding => '', headers => {}, # hashref deliveryMode => 1, # 1 = non-persistent, 2 = persistent priority => 0, correlationId => '', replyTo => '', expiration => '', messageId => '', timestamp => 1271857990, # this can also be a DateTime object type => '', userId => '', appId => '', clusterId => '', }
Note that
getProperties()
methods don't return hashrefs. They returnBasicProperties
objects (see Java library docs), so you can call accessor methods on them:my $reply_key = $delivery->getProperties->getReplyTo;
CALLBACKS
This module provides some glue to use Perl code as callbacks for reacting to events thrown by the RabbitMQ client library. The library itself is multi-threading; however there's currently no way to share Java objects between multiple Perl threads, so your application will need to have one connection per Perl thread. Thus, your callbacks will be executed in a single-threaded environment as soon as you want. The Java library will catch the events in the background and will put them in a queue so that you can poll from Perl using the following command:
Net::RabbitMQ::Java->processCallbacks();
Note that this is a non-blocking call! It will execute any callbacks available in the internal queue. If there are no callbacks to execute, it will return immediately.
Each callback setter method returns a reference to the callback too, so you can process callbacks for individual listeners too:
my $cb = $channel->setReturnListener(sub { ... });
...
$cb->process;
Handling returned messages
If a message is published with the "mandatory" or "immediate" flags set, but cannot be delivered, the broker will return it to the sending client (via a AMQP.Basic.Return
command). To be notified of such returns, clients can set up a callback using the following syntax:
my $cb = $channel->setReturnListener(sub {
my ($replyCode, $replyText, $exchange, $routingKey, $properties, $body) = @_;
...
});
Warning: if you call $cb->process()
right after publishing a message with $channel->basicPublish()
, you likely won't catch an eventual return as the server may take some time to send it (milliseconds or even seconds). So it's up to you to poll frequently for callbacks. You could use an event-driven environment such as POE or Reflex to schedule regular calls to $cb->process()
or processCallbacks()
.
Another solution is to wrap your basicPublish()
in a transaction:
$channel->txSelect;
$channel->basicPublish(...);
$channel->txCommit;
$cb->process;
If you don't need a transaction, note that calling txSelect
and txCommit
add a significant server overhead due to disk processing and so on, but this lets you ensure that by calling $cb->process()
immediately you will catch an eventual return. The reason for this relies in the wire traffic order enforced by the transaction commit: the server sends the tx.commit-ok
response after having sent any basic.return
frame, so when the txCommit()
methods returns, the AMQP library has already processed the return frame and enqueued the callback. (I actually haven't checked whether this ordering is enforced by the AMQP specs or is just RabbitMQ's implementation.)
Requesting publisher confirms
RabbitMQ extended the AMTP protocol with a feature that lets clients request explicit confirmation for published messages, without the need to initiate a transaction (see http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/).
You do this by assigning a callback with the following syntax:
my $confirmed = 0;
my $cb = $channel->setConfirmListener(sub {
my ($type, $deliveryTag, $multiple) = @_;
$confirmed = 1;
warn 'Message lost!' if $type eq 'nack';
});
$channel->confirmSelect;
$channel->basicPublish(...);
$cb->process while !$confirmed;
The first argument passed to the callback is ack
or nack
depending on the kind of event notified by the server (consult RabbitMQ docs for the semantics of these). Note that you will get a confirm for every single message published, so you should poll (i.e. call $cb->process()
or processCallbacks
) until you've got enough confirms:
my $toConfirm = 0;
my $cb = $channel->setConfirmListener(sub { $toConfirm-- });
$channel->confirmSelect;
... # publish your messages and increase $toConfirm for each one
$cb->process while $toConfirm > 0;
Registering a shutdown handler
The client library will fire a shutdown event whenever a connection or a channel is closed by the server or due to a communication failure (see the Core API Guide on RabbitMQ website linked above). To handle such events you can register a callback using the following syntax:
my $cb = $conn->addShutdownListener(sub {
my $cause = shift;
...
});
my $cb2 = $channel->addShutdownListener(sub {
my $cause = shift;
...
});
You can add as many callbacks as you need. The first argument is a ShutdownSignalException
as documented in the client library Java docs, that you can query to get error messages and so on.
Remember to call $cb->process()
or processCallbacks()
often.
To remove a shutdown listener you can use the following method:
$conn->removeShutdownListener($cb->getListener);
$channel->removeShutdownListener($cb2->getListener);
EXCEPTIONS
Net::RabbitMQ::Java will throw exceptions just as documented in the Java client docs. You can catch them as normal Perl exceptions:
my $conn = eval { $factory->newConnection };
if ($@) {
if ($@->isa('Net::RabbitMQ::Java::Client::PossibleAuthenticationFailureException')) {
die "Authentication failed";
} else {
...
}
}
(Hint: use a module like Try::Tiny or TryCatch to catch your exceptions without surprises.)
Note that you should the isa()
method instead of doing ref $@
, because the resulting package name might have a different namespace than Net::RabbitMQ::Java::Client::
. Otherwise you could use a regexp and omit the namespace:
if ($@ =~ /PossibleAuthenticationFailureException$/) {
...
}
TODO
Some things will need future work:
- Implementation of the RpcClient class
-
It should be enabled explicitely by the user, so that we don't load an extra Java class that we don't use.
- Full async support
-
An async mode would be useful to allow implementation with event-based frameworks such as POE and others. This requires a non-blocking behaviour of AMQP sync commands. It could be achieved by extending the current callback system: a method call like
txCommit()
should accept a coderef as last argument and return immediately; the Java client library should receive thetx.commit-ok
response in a separate Java thread and enqueue the callback call. - Compile java Helper code at install time
-
This would speed up start-up.
- Provide named arguments to all methods
-
This should be done at least on Channel methods. This module should then decide which Java signature to call.
SEE ALSO
BUGS
Please report any bugs to bug-net-rabbitmq-java@rt.cpan.org
, or through the web interface at https://rt.cpan.org/Public/Bug/Report.html?Queue=Net-RabbitMQ-Java. The author will be happy to read your feedback.
AUTHOR
Alessandro Ranellucci <aar@cpan.org>
COPYRIGHT AND LICENSE
This software is copyright (c) 2011 by Alessandro Ranellucci.
This is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
This distribution includes the RabbitMQ Java client library which is dual-licensed under the MPL and the GPL v2. It also includes the commons-io library which is licensed under the Apache Licence v2. If you have any questions or concerns regarding licensing, contact the distribution maintainer.