NAME

AnyEvent::Stomper::Cluster - The client for the cluster of STOMP servers

SYNOPSIS

use AnyEvent;
use AnyEvent::Stomper::Cluster;

my $cluster = AnyEvent::Stomper::Cluster->new(
  nodes => [
    { host => 'stomp-server-1.com', port => 61613 },
    { host => 'stomp-server-2.com', port => 61613 },
    { host => 'stomp-server-3.com', port => 61613 },
  ],
  login    => 'guest',
  passcode => 'guest',
);

my $cv = AE::cv;

$cluster->subscribe(
  id          => 'foo',
  destination => '/queue/foo',

  { on_receipt => sub {
      my $err = $_[1];

      if ( defined $err ) {
        warn $err->message . "\n";
        $cv->send;

        return;
      }

      $cluster->send(
        destination => '/queue/foo',
        body        => 'Hello, world!',
      );
    },

    on_message => sub {
      my $msg = shift;

      my $body = $msg->body;
      print "Consumed: $body\n";

      $cv->send;
    },
  }
);

$cv->recv;

DESCRIPTION

AnyEvent::Stomper::Cluster is the client for the cluster of STOMP servers.

CONSTRUCTOR

new( %params )

my $cluster = AnyEvent::Stomper::Cluster->new(
  nodes => [
    { host => 'stomp-server-1.com', port => 61613 },
    { host => 'stomp-server-2.com', port => 61613 },
    { host => 'stomp-server-3.com', port => 61613 },
  ],
  login              => 'guest',
  passcode           => 'guest',
  vhost              => '/',
  heartbeat          => [ 5000, 5000 ],
  connection_timeout => 5,
  reconnect_interval => 5,

  on_node_connect => sub {
    # handling...
  },

  on_node_disconnect => sub {
    # handling...
  },

  on_node_error => sub {
    my $err = shift;

    # error handling...
  },
);
nodes => \@nodes

Specifies the list of nodes. Parameter should contain array of hashes. Each hash should contain host and port elements. The client gets one random node from this list, connects to it and sends all frames to this node. If current active node fails, the client gets next node from the list.

login => $login

The user identifier used to authenticate against a secured STOMP server. Must be the same for all nodes.

passcode => $passcode

The password used to authenticate against a secured STOMP server. Must be the same for all nodes.

vhost => $vhost

The name of a virtual host that the client wishes to connect to. Must be the same for all nodes.

heartbeat => \@heartbeat

Heart-beating can optionally be used to test the healthiness of the underlying TCP connection and to make sure that the remote end is alive and kicking. The first number sets interval in milliseconds between outgoing heart-beats to the node. 0 means, that the client will not send heart-beats. The second number sets interval in milliseconds between incoming heart-beats from the node. 0 means, that the client does not want to receive heart-beats.

heartbeat => [ 5000, 5000 ],

Not set by default.

connection_timeout => $connection_timeout

Specifies connection timeout. If the client could not connect to the node after specified timeout, the on_node_error callback is called with the E_CANT_CONN error. The timeout specifies in seconds and can contain a fractional part.

connection_timeout => 10.5,

By default the client use kernel's connection timeout.

reconnect_interval => $reconnect_interval

If the connection to the node was lost, the client will try to restore the connection when you execute next command. By default reconnection is performed immediately, on next command execution. If the reconnect_interval parameter is specified, the client will try to reconnect only after this interval and commands executed between reconnections will be queued. The client will try to reconnect to every available node before raise the error.

reconnect_interval => 5,

Not set by default.

handle_params => \%params

Specifies AnyEvent::Handle parameters.

handle_params => {
  autocork => 1,
  linger   => 60,
}

Enabling of the autocork parameter can improve performance. See documentation on AnyEvent::Handle for more information.

default_headers => \%headers

Specifies default headers for all outgoing frames.

default_headers => {
  'x-foo' => 'foo_value',
  'x-bar' => 'bar_value',
}
command_headers

Specifies default headers for particular commands.

command_headers => {
  SEND => {
    receipt => 'auto',
  },

  SUBSCRIBE => {
    durable => 'true',
    ack     => 'client',
  },
}
body_encoder => $cb->( $body [, $content_type ] )

Specifies the encode function for the body of the frame. The function accepts two arguments: the body and the content type of the body if it specified in the frame. The function must return the encoded body.

body_encoder => sub {
  return encode_json( $_[0] );
}
body_decoder => $cb->( $body [, $content_type ] )

Specifies the decode function for the body of the frame. The function accepts two arguments: the body and the content type of the body if it specified in the frame. The function must return the decoded body.

body_decoder => sub {
  return decode_json( $_[0] );
}
on_node_connect => $cb->( $host, $port )

The on_node_connect callback is called when the connection to particular node is successfully established. To callback are passed two arguments: host and port of the node to which the client was connected.

Not set by default.

on_node_disconnect => $cb->( $host, $port )

The on_node_disconnect callback is called when the connection to particular node is closed by any reason. To callback are passed two arguments: host and port of the node from which the client was disconnected.

Not set by default.

on_node_error => $cb->( $err, $host, $port )

The on_node_error callback is called when occurred an error, which was affected on entire node (e. g. connection error or authentication error). Also the on_node_error callback can be called on command errors if the command callback is not specified. To callback are passed three arguments: error object, and host and port of the node on which an error occurred.

Not set by default.

COMMAND METHODS

To execute the STOMP command you must call appropriate method. STOMP headers can be specified as command parameters. The client automatically adds content-length header to all outgoing frames. Every command method can also accept two additional parameters: the body parameter where you can specify the body of the frame, and the on_receipt parameter that is the alternative way to specify the command callback.

If you want to receive RECEIPT frame, you must specify receipt header. The receipt header can take the special value auto. If it set, the receipt identificator will be generated automatically by the client. The RECEIPT frame is passed to the command callback in first argument as the object of the class AnyEvent::Stomper::Frame. If the receipt header is not specified the first argument of the command callback will be undef.

For commands SUBSCRIBE, UNSUBSCRIBE, DISCONNECT the client automatically adds receipt header for internal usage.

The command callback is called in one of two cases depending on the presence of the receipt header: when the command was successfully sent to the server or when the RECEIPT frame will be received. If any error occurred during the command execution, the error object is passed to the callback in second argument. Error object is the instance of the class AnyEvent::Stomper::Error.

If you want to track errors on particular nodes for particular command, you must specify on_node_error callback in command method.

$cluster->send(
  destination => '/queue/foo',
  body        => 'Hello, world!',

  on_receipt => sub {
    my $receipt = shift;
    my $err     = shift;

    if ( defined $err ) {
      my $err_msg   = $err->message;
      my $err_code  = $err->code;
      my $err_frame = $err->frame;

      # error handling...

      return;
    }

    # receipt handling...
  },

  on_node_error => sub {
    my $err  = shift;
    my $host = shift;
    my $port = shift;

    # error handling...
  }
);

The full list of all available headers for every command you can find in STOMP protocol specification and in documentation on your STOMP server. For various versions of STOMP protocol and various STOMP servers they can be differ.

send( [ %params ] [, $cb->( $receipt, $err ) ] )

Sends a message to a destination in the messaging system.

$cluster->send(
  destination => '/queue/foo',
  body        => 'Hello, world!',
);

$cluster->send(
  destination => '/queue/foo',
  body        => 'Hello, world!',

  sub {
    my $err = $_[1];

    if ( defined $err ) {
      my $err_msg   = $err->message;
      my $err_code  = $err->code;
      my $err_frame = $err->frame;

      # error handling...

      return;
    }
  }
);

$cluster->send(
  destination => '/queue/foo',
  receipt     => 'auto',
  body        => 'Hello, world!',

  on_receipt => sub {
    my $receipt = shift;
    my $err     = shift;

    if ( defined $err ) {
      my $err_msg   = $err->message;
      my $err_code  = $err->code;
      my $err_frame = $err->frame;

      # error handling...

      return;
    }

    # receipt handling...
  }
);

subscribe( [ %params ] [, $cb->( $msg ) ] )

The method is used to register to listen to a given destination. The subscribe method require the on_message callback, which is called on every received MESSAGE frame from the server. The MESSAGE frame is passed to the on_message callback in first argument as the object of the class AnyEvent::Stomper::Frame. If the subscribe method is called with one callback, this callback will be act as on_message callback.

$cluster->subscribe(
  id          => 'foo',
  destination => '/queue/foo',

  sub {
    my $msg = shift;

    my $headers = $msg->headers;
    my $body    = $msg->body;

    # message handling...
  },
);

$cluster->subscribe(
  id          => 'foo',
  destination => '/queue/foo',
  ack         => 'client',

  on_receipt => sub {
    my $receipt = shift;
    my $err     = shift;

    if ( defined $err ) {
      my $err_msg   = $err->message;
      my $err_code  = $err->code;
      my $err_frame = $err->frame;

      return;
    }

    # receipt handling...
  },

  on_message => sub {
    my $msg = shift;

    my $headers = $msg->headers;
    my $body    = $msg->body;

    # message handling...
  }
);

unsubscribe( [ %params ] [, $cb->( $receipt, $err ) ] )

The method is used to remove an existing subscription.

$cluster->unsubscribe(
  id          => 'foo',
  destination => '/queue/foo',

  sub {
    my $receipt = shift;
    my $err     = shift;

    if ( defined $err ) {
      my $err_msg   = $err->message;
      my $err_code  = $err->code;
      my $err_frame = $err->frame;

      return;
    }

    # receipt handling...
  }
);

ack( [ %params ] [, $cb->( $receipt, $err ) ] )

The method is used to acknowledge consumption of a message from a subscription using client or client-individual acknowledgment. Any messages received from such a subscription will not be considered to have been consumed until the message has been acknowledged via an ack() method.

$cluster->ack( id => $ack_id );

$cluster->ack(
  id      => $ack_id,
  receipt => 'auto',

  sub {
    my $receipt = shift;
    my $err     = shift;

    if ( defined $err ) {
      my $err_msg   = $err->message;
      my $err_code  = $err->code;
      my $err_frame = $err->frame;

      # error handling...
    }

    # receipt handling...
  }
);

nack( [ %params ] [, $cb->( $receipt, $err ) ] )

The nack method is the opposite of ack method. It is used to tell the server that the client did not consume the message.

$cluster->nack( id => $ack_id );

$cluster->nack(
  id      => $ack_id,
  receipt => 'auto',

  sub {
    my $receipt = shift;
    my $err     = shift;

    if ( defined $err ) {
      my $err_msg   = $err->message;
      my $err_code  = $err->code;
      my $err_frame = $err->frame;

      # error handling...
    }

    # receipt handling...
  }
);

begin( [ %params ] [, $cb->( $receipt, $err ) ] )

The method begin is used to start a transaction.

commit( [ %params ] [, $cb->( $receipt, $err ) ] )

The method commit is used to commit a transaction.

abort([ %params ] [, $cb->( $receipt, $err ) ] )

The method abort is used to roll back a transaction.

disconnect( [ %params ] [, $cb->( $receipt, $err ) ] )

A client can disconnect from the сurrent active node at anytime by closing the socket but there is no guarantee that the previously sent frames have been received by the node. To do a graceful shutdown, where the client is assured that all previous frames have been received by the node, you must call disconnect method and wait for the RECEIPT frame.

execute( $command, [ %params ] [, $cb->( $receipt, $err ) ] )

An alternative method to execute commands. In some cases it can be more convenient.

$cluster->execute( 'SEND',
  destination => '/queue/foo',
  receipt     => 'auto',
  body        => 'Hello, world!',

  sub {
    my $receipt = shift;
    my $err     = shift;

    if ( defined $err ) {
      my $err_msg   = $err->message;
      my $err_code  = $err->code;
      my $err_frame = $err->frame;

      # error handling...

      return;
    }

    # receipt handling...
  }
);

ERROR CODES

Every error object, passed to callback, contain error code, which can be used for programmatic handling of errors. AnyEvent::Stomper::Cluster provides constants for error codes. They can be imported and used in expressions.

use AnyEvent::Stomper::Cluster qw( :err_codes );

Full list of error codes see in documentation on AnyEvent::Stomper.

OTHER METHODS

get_node( $host, $port )

Gets node by host and port.

nodes()

Gets all available nodes.

on_error( [ $callback ] )

Gets or sets the on_error callback.

force_disconnect()

The method for forced disconnection. All uncompleted operations will be aborted.

SEE ALSO

AnyEvent::Stomper

AUTHOR

Eugene Ponizovsky, <ponizovsky@gmail.com>

Sponsored by SMS Online, <dev.opensource@sms-online.com>

COPYRIGHT AND LICENSE

Copyright (c) 2016, Eugene Ponizovsky, SMS Online. All rights reserved.

This module is free software; you can redistribute it and/or modify it under the same terms as Perl itself.

1 POD Error

The following errors were encountered while parsing the POD:

Around line 847:

Non-ASCII character seen before =encoding in 'сurrent'. Assuming UTF-8