NAME

Net::STOMP::Client - STOMP object oriented client module

SYNOPSIS

#
# simple producer
#

use Net::STOMP::Client;

$stomp = Net::STOMP::Client->new(host => "127.0.0.1", port => 61613);
$stomp->connect(login => "guest", passcode => "guest");
$stomp->send(destination => "/queue/test", body => "hello world!");
$stomp->disconnect();

#
# consumer with client side acknowledgment
#

use Net::STOMP::Client;

$stomp = Net::STOMP::Client->new(host => "127.0.0.1", port => 61613);
$stomp->connect(login => "guest", passcode => "guest");
# declare a callback to be called for each received message frame
$stomp->message_callback(sub {
    my($self, $frame) = @_;
    $self->ack(frame => $frame);
    printf("received: %s\n", $frame->body());
    return($self);
});
# subscribe to the given queue
$stomp->subscribe(
    destination => "/queue/test",
    id          => "testsub",          # required in STOMP 1.1
    ack         => "client",           # client side acknowledgment
);
# wait for a specified message frame
$stomp->wait_for_frames(callback => sub {
    my($self, $frame) = @_;
    if ($frame->command() eq "MESSAGE") {
        # stop waiting for new frames if body is "quit"
        return(1) if $frame->body() eq "quit";
    }
    # continue to wait for more frames
    return(0);
});
$stomp->unsubscribe(id => "testsub");
$stomp->disconnect();

DESCRIPTION

This module provides an object-oriented client interface for interacting with servers that support STOMP (Streaming Text Orientated Messaging Protocol). It supports the major features of modern messaging brokers: SSL, asynchronous I/O, receipts and transactions.

CONSTRUCTOR

The new() method can be used to create a Net::STOMP::Client object that will later be used to interact with a server. The following attributes are supported:

accept_version

the STOMP version to use (string) or versions to use (reference to a list of strings); this defaults to the list of all supported versions

version

this attribute is obsolete and should no longer be used: use accept_version instead; it is kept only for backward compatibility with Net::STOMP::Client 1.x

uri

the Uniform Resource Identifier (URI) specifying where the STOMP service is and how to connect to it; this can be for instance tcp://msg01:6163 or something more complex, see Net::STOMP::Client::Connection for more information

host

the server name or IP address

port

the port number of the STOMP service

sockopts

arbitrary socket options (as a hash reference) that will be passed to IO::Socket::INET->new() or IO::Socket::SSL->new()

client_heart_beat

the desired client-side heart-beat setting, see Net::STOMP::Client::HeartBeat for more information

server_heart_beat

the desired server-side heart-beat setting, see Net::STOMP::Client::HeartBeat for more information

debug

the debugging flags for this object, see the "DEBUGGING" section for more information

timeout

the maximum time (in seconds) for various operations, see the "TIMEOUTS" section for more information

Upon object creation, a TCP connection is made to the server but no data (i.e. STOMP frame) is exchanged.

DEBUGGING

Net::STOMP::Client uses No::Worries::Log's log_debug() to log debugging information. To avoid formatting data that would never be logged, it also uses a debug string that selects what is passed to log_debug().

The debug string should contain a list of words describing what to log. For instance, "io" logs I/O information while "io connection" logs both I/O and connection information.

Here are the supported debug words that can be used:

all

everything

api

high-level API calls

body

frame bodies

command

frame commands

connection

connection establishment

header

frame headers

io

I/O as bytes sent/received

To enable debugging, you must first configure No::Worries::Log so that it actually reports debugging messages. This can be done with something like:

use No::Worries::Log qw(log_filter);

log_filter("debug");

or, to enable logging only from Net::STOMP::Client modules:

log_filter("debug caller=~^Net::STOMP::Client");

See the No::Worries::Log documentation for more information.

Then, you have to tell Net::STOMP::Client what to log. This can be done globally for all connections by setting the global variable $Net::STOMP::Client::Debug:

$Net::STOMP::Client::Debug = "connection api";

or per connection via the new() method:

$stomp = Net::STOMP::Client->new(
    uri   => "stomp://mybroker:6163",
    debug => "connection api",
);

TIMEOUTS

By default, when sending STOMP frames, the module waits until the frame has actually been sent (from the socket point of view). If the server is stuck or unusable, the module can therefore hang.

When creating the Net::STOMP::Client object, you can pass a timeout attribute to better control how certain operations handle timeouts.

This attribute should contain a reference to a hash with the following keys:

connect

TCP-level timeout that will be given to the underlying IO::Socket::INET or IO::Socket::SSL object (default: none)

connected

timeout used while waiting for the initial CONNECTED frame from the broker (default: 10)

disconnect

timeout specifying how long the disconnect() method should wait for a RECEIPT frame back in case the DISCONNECT frame contained a receipt (default: 10)

receive

timeout used while trying to receive any frame (default: none)

send

timeout used while trying to send any frame (default: none)

All values are in seconds. No timeout means wait until the operation succeeds.

As a shortcut, the timeout attribute can also be a scalar. In this case, only the connect and connected operations use this value.
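
As an illustration of the hash form, here is a minimal sketch that builds the options given to new() with one timeout per operation. The helper name client_options(), the broker address and all the numeric values are assumptions made for this example; they are not part of the module.

```perl
use strict;
use warnings;

# Build the options for new(), with one timeout (in seconds) per operation.
# Hypothetical helper: the host, port and values below are only placeholders.
sub client_options {
    my(%override) = @_;
    return(
        host    => "127.0.0.1",
        port    => 61613,
        timeout => {
            connect    => 5,     # TCP connection establishment
            connected  => 10,    # waiting for the initial CONNECTED frame
            disconnect => 10,    # waiting for the RECEIPT of DISCONNECT
            receive    => 30,    # receiving any frame
            send       => 30,    # sending any frame
        },
        %override,               # caller supplied options win
    );
}

# usage (requires a reachable broker):
#   use Net::STOMP::Client;
#   my $stomp = Net::STOMP::Client->new(client_options(host => "msg01"));
```

Setting explicit receive and send timeouts avoids the hang described above, at the price of having to handle operations that do not complete in time.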

STOMP METHODS

With a Net::STOMP::Client object, the following methods can be used to interact with the server. They map one-to-one to the commands that a client frame can hold:

connect()

connect to server

disconnect()

disconnect from server

subscribe()

subscribe to something

unsubscribe()

unsubscribe from something

send()

send a message somewhere

ack()

acknowledge the reception of a message

nack()

negatively acknowledge a message, i.e. tell the server that it was not consumed (STOMP >= 1.1 only)

begin()

begin/start a transaction

commit()

commit a transaction

abort()

abort/rollback a transaction

All these methods can receive options that will be passed directly as frame headers. For instance:

$stomp->subscribe(
    destination => "/queue/test",
    id          => "testsub",
    ack         => "client",
);

Some methods also support additional options:

send()

body or body_reference: holds the body or body reference of the message to be sent

ack()

frame: holds the MESSAGE frame object to ack

nack()

frame: holds the MESSAGE frame object to nack

Finally, all methods support debug and timeout options that will be given to the send_frame() method called internally to send the crafted frame.
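
The frame option of ack() and nack() can be combined in a message callback, as in the following sketch. The helper name ack_or_nack_callback() and the idea of a handler returning true or false are assumptions made for this example; only ack(), nack() and body() come from the documented API.

```perl
use strict;
use warnings;

# Return a message callback that acknowledges a message when the given
# handler accepts its body and rejects it otherwise.
# Hypothetical helper, not part of Net::STOMP::Client.
sub ack_or_nack_callback {
    my($handler) = @_;
    return(sub {
        my($self, $frame) = @_;
        if ($handler->($frame->body())) {
            $self->ack(frame => $frame);
        } else {
            $self->nack(frame => $frame);   # STOMP >= 1.1 only
        }
        return($self);                      # anything but undef means success
    });
}

# usage (requires a subscription with client side acknowledgment):
#   $stomp->message_callback(ack_or_nack_callback(sub {
#       my($body) = @_;
#       return($body ne "");    # accept only non-empty messages
#   }));
```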

OTHER METHODS

In addition to the STOMP methods, the following ones are also available:

new(OPTIONS)

return a new Net::STOMP::Client object (constructor)

peer()

return a Net::STOMP::Client::Peer object containing information about the connected STOMP server

socket()

return the file handle of the socket connecting the client and the server

server()

return the server header seen on the CONNECTED frame (if any)

session()

return the session identifier if connected or false otherwise

uuid()

return a universal pseudo-unique identifier to be used for instance in receipts and transactions

wait_for_frames()

wait for frames coming from the server, see the next section for more information

noop([timeout => TIMEOUT])

send an empty/noop frame i.e. a single newline byte, using send_frame() underneath
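
Here is a short sketch showing how some of these methods could be combined. The helper names describe_session() and keep_alive() are made up for this example, and using noop() as a keep-alive is only one possible use.

```perl
use strict;
use warnings;

# Return a one-line description of the current session.
# Hypothetical helper, not part of Net::STOMP::Client.
sub describe_session {
    my($stomp) = @_;
    my $session = $stomp->session();
    return("not connected") unless $session;
    # the server header is optional on the CONNECTED frame
    my $server = $stomp->server();
    $server = "unknown server" unless defined($server);
    return(sprintf("session %s on %s", $session, $server));
}

# Send an empty frame if (and only if) a session is established.
# Hypothetical helper, not part of Net::STOMP::Client.
sub keep_alive {
    my($stomp, $timeout) = @_;
    return(0) unless $stomp->session();
    $stomp->noop(timeout => $timeout);
    return(1);
}

# usage, with a connected $stomp object:
#   printf("%s\n", describe_session($stomp));
#   keep_alive($stomp, 1);
```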

CALLBACKS

Since STOMP is asynchronous (for instance, MESSAGE frames could be sent by the server at any time), Net::STOMP::Client uses callbacks to handle frames. There are in fact two levels of callbacks.

First, there are per-command callbacks that will be called each time a frame is handled (via the internal dispatch_frame() method). Net::STOMP::Client implements default callbacks that should be sufficient for all frames except MESSAGE frames, which should be handled by the application. These callbacks should return undef on error and something else on success.

Here is an example with a callback counting the messages received:

$stomp->message_callback(sub {
    my($self, $frame) = @_;
    $MessageCount++;
    return($self);
});

Here are the methods that can be used to get or set these per-command callbacks:

connected_callback([SUBREF])
error_callback([SUBREF])
message_callback([SUBREF])
receipt_callback([SUBREF])

These callbacks are in effect global to the object and it is good practice not to change them during a session. If you do not need a global message callback, you can supply a dummy one:

$stomp->message_callback(sub { return(1) });

Then, the wait_for_frames() method takes an optional callback argument holding some code to be called for each received frame, after the per-command callback has been called. This can be seen as a local callback, only valid for the call to wait_for_frames(). This callback must return undef on error, false if more frames are expected or true if wait_for_frames() can now stop waiting for new frames and return.

Here are all the options that can be given to wait_for_frames():

callback

code to be called for each received frame (see above)

timeout

time to wait before giving up, undef means wait forever, this is the default

once

wait only for one frame, within the given timeout

The return value of wait_for_frames() is false if no suitable frame has been received, the received frame if there is no user callback, or the user callback's return value otherwise.
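
The local callback and the timeout option can be combined to collect a bounded number of messages, as in this sketch. The helper name collect_messages() is an assumption made for this example; it only relies on wait_for_frames(), command() and body().

```perl
use strict;
use warnings;

# Wait until the wanted number of MESSAGE frames has been received or the
# timeout expires, and return the bodies collected so far.
# Hypothetical helper, not part of Net::STOMP::Client.
sub collect_messages {
    my($stomp, $wanted, $timeout) = @_;
    my @bodies;
    $stomp->wait_for_frames(
        timeout  => $timeout,
        callback => sub {
            my($self, $frame) = @_;
            push(@bodies, $frame->body())
                if $frame->command() eq "MESSAGE";
            # true: stop waiting, false (but defined): wait for more frames
            return(@bodies >= $wanted ? 1 : 0);
        },
    );
    return(@bodies);
}

# usage, with a connected and subscribed $stomp object:
#   my @bodies = collect_messages($stomp, 10, 5);
```

Note that the per-command message callback is still called before this local callback, so acknowledgments, if any, belong there.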

TRANSACTIONS

Here is an example using transactions:

# create a unique transaction id
$tid = $stomp->uuid();
# begin the transaction
$stomp->begin(transaction => $tid);
# send two messages as part of this transaction
$stomp->send(
    destination => "/queue/test1",
    body        => "message 1",
    transaction => $tid,
);
$stomp->send(
    destination => "/queue/test2",
    body        => "message 2",
    transaction => $tid,
);
# commit the transaction
$stomp->commit(transaction => $tid);
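
A transaction can also be rolled back with abort(). The following sketch wraps the pattern above in a helper that aborts the transaction when something goes wrong. The helper name send_in_transaction() is made up, and the code assumes that failures are reported by dying; if your version of the module reports errors differently, the return values should be tested instead.

```perl
use strict;
use warnings;

# Send all the given messages (hash references holding send() options)
# inside one transaction, aborting it if anything fails.
# Hypothetical helper, not part of Net::STOMP::Client.
sub send_in_transaction {
    my($stomp, @messages) = @_;
    my $tid = $stomp->uuid();
    $stomp->begin(transaction => $tid);
    my $ok = eval {
        foreach my $message (@messages) {
            $stomp->send(%{ $message }, transaction => $tid);
        }
        $stomp->commit(transaction => $tid);
        1;
    };
    return(1) if $ok;
    # remember the original error before the rollback attempt resets $@
    my $error = $@ || "unknown error";
    # best-effort rollback: the abort itself may fail on a broken connection
    eval { $stomp->abort(transaction => $tid) };
    die($error);
}

# usage, with a connected $stomp object:
#   send_in_transaction($stomp,
#       { destination => "/queue/test1", body => "message 1" },
#       { destination => "/queue/test2", body => "message 2" },
#   );
```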

LOW-LEVEL API

It should be enough to use the high-level API and use, for instance, the send() method to create a SEND frame and send it in one go.

If you need lower level interaction, you can manipulate frames with the Net::STOMP::Client::Frame module.

You can also use:

$stomp->dispatch_frame(FRAME, [OPTIONS])

dispatch one received frame by calling the appropriate callback; supported options: debug

$stomp->send_frame(FRAME, [OPTIONS])

try to send the given frame object; supported options: timeout and debug

$stomp->send_message(MESSAGE, [OPTIONS])

identical to send_frame() but taking a Messaging::Message object

$stomp->queue_frame(FRAME, [OPTIONS])

add the given frame to the outgoing buffer queue; supported options: debug

$stomp->queue_message(MESSAGE, [OPTIONS])

identical to queue_frame() but taking a Messaging::Message object

$stomp->send_data([OPTIONS])

send all the queued data; supported options: timeout and debug

$stomp->receive_frame([OPTIONS])

try to receive a frame; supported options: timeout and debug

$stomp->receive_data([OPTIONS])

try to receive data (this data will be appended to the incoming buffer); supported options: timeout and debug

$stomp->outgoing_buffer_length()

return the length (in bytes) of the outgoing buffer

$stomp->incoming_buffer_reference()

return a reference to the incoming buffer

In these methods, the timeout option can either be undef (meaning block until it's done) or 0 (meaning do not block at all) or a positive number (meaning block at most this number of seconds).
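
As an example of the buffer-oriented methods, here is a minimal sketch that flushes the outgoing buffer in bounded steps instead of blocking until everything is sent. The helper name flush_outgoing() and its arguments are assumptions made for this example; it only relies on outgoing_buffer_length() and send_data().

```perl
use strict;
use warnings;

# Try to flush the outgoing buffer, blocking at most $step seconds per
# attempt and giving up after $max_tries attempts; return true if the
# buffer is empty at the end.
# Hypothetical helper, not part of Net::STOMP::Client.
sub flush_outgoing {
    my($stomp, $step, $max_tries) = @_;
    my $tries = 0;
    while ($stomp->outgoing_buffer_length() > 0) {
        last if ++$tries > $max_tries;
        $stomp->send_data(timeout => $step);
    }
    return($stomp->outgoing_buffer_length() == 0);
}

# usage, with a connected $stomp object and some frame objects:
#   $stomp->queue_frame($_) foreach @frames;
#   flush_outgoing($stomp, 0.5, 10)
#       or warn("some data is still waiting to be sent\n");
```

Queuing several frames and sending them together lets the application decide when, and for how long, it is willing to block on the socket.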

COMPATIBILITY

This module has been successfully tested against ActiveMQ, Apollo, HornetQ and RabbitMQ brokers.

See Net::STOMP::Client::Version for the list of supported STOMP protocol versions.

SEE ALSO

Messaging::Message, Net::STOMP::Client::Connection, Net::STOMP::Client::Frame, Net::STOMP::Client::HeartBeat, Net::STOMP::Client::Peer, Net::STOMP::Client::Receipt, Net::STOMP::Client::Tutorial, Net::STOMP::Client::Version, No::Worries::Log.

AUTHOR

Lionel Cons http://cern.ch/lionel.cons

Copyright CERN 2010-2012