NAME

Mojo::RabbitMQ::Client - Mojo::IOLoop based RabbitMQ client

SYNOPSIS

use Mojo::Base -strict;    # enables strict, warnings and say
use Mojo::RabbitMQ::Client;

# Supply URL according to (https://www.rabbitmq.com/uri-spec.html)
my $client = Mojo::RabbitMQ::Client->new(
  url => 'amqp://guest:guest@127.0.0.1:5672/');

# Catch all client related errors
$client->catch(sub { warn "Some error caught in client"; });

# When connection is in Open state, open new channel
$client->on(
  open => sub {
    my ($client) = @_;

    # Create a new channel with auto-assigned id
    my $channel = Mojo::RabbitMQ::Client::Channel->new();

    $channel->catch(sub { warn "Error on channel received"; });

    $channel->on(
      open => sub {
        my ($channel) = @_;
        $channel->qos(prefetch_count => 1)->deliver;

        # Publish some example message to test_queue
        my $publish = $channel->publish(
          exchange    => 'test',
          routing_key => 'test_queue',
          body        => 'Test message',
          mandatory   => 0,
          immediate   => 0,
          header      => {}
        );
        # Deliver this message to server
        $publish->deliver;

        # Start consuming messages from test_queue
        my $consumer = $channel->consume(queue => 'test_queue');
        $consumer->on(message => sub { say "Got a message" });
        $consumer->deliver;
      }
    );
    $channel->on(close => sub { warn "Channel closed"; });

    $client->open_channel($channel);
  }
);

# Start connection
$client->connect();

# Start Mojo::IOLoop if not running already
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

CONSUMER

use Mojo::Base -strict;    # enables strict, warnings and say
use Mojo::RabbitMQ::Client;
my $consumer = Mojo::RabbitMQ::Client->consumer(
  url      => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo',
  defaults => {
    qos      => {prefetch_count => 1},
    queue    => {durable        => 1},
    consumer => {no_ack         => 0},
  }
);

$consumer->catch(sub { die "Some error caught in Consumer" } );
$consumer->on('success' => sub { say "Consumer ready" });
$consumer->on(
  'message' => sub {
    my ($consumer, $message) = @_;

    $consumer->channel->ack($message)->deliver;
  }
);

Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

PUBLISHER

use Mojo::Base -strict;    # enables strict, warnings and say
use Mojo::RabbitMQ::Client;
my $publisher = Mojo::RabbitMQ::Client->publisher(
  url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo'
);

$publisher->catch(sub { die "Some error caught in Publisher" } );
$publisher->on('success' => sub { say "Publisher ready" });

$publisher->publish('plain text');
$publisher->publish({encode => { to => 'json'}});

Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

DESCRIPTION

Mojo::RabbitMQ::Client is a rewrite of AnyEvent::RabbitMQ to work on top of Mojo::IOLoop.

EVENTS

Mojo::RabbitMQ::Client inherits all events from Mojo::EventEmitter and can emit the following new ones.

connect

$client->on(connect => sub {
  my ($client, $stream) = @_;
  ...
});

Emitted when the TCP/IP connection with the RabbitMQ server is established.

open

$client->on(open => sub {
  my ($client) = @_;
  ...
});

Emitted when the AMQP protocol Connection.Open-Ok method is received.

close

$client->on(close => sub {
  my ($client) = @_;
  ...
});

Emitted when the Connection.Close-Ok method is received.

disconnect

$client->on(disconnect => sub {
  my ($client) = @_;
  ...
});

Emitted when the TCP/IP connection is closed.
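
Taken together, these four events trace a connection from TCP connect to disconnect. As a minimal sketch, one handler per event can log each stage. The URL is the placeholder from the SYNOPSIS, and the %seen hash and the log text are illustrative only, not part of the module.

```perl
use Mojo::Base -strict;
use Mojo::RabbitMQ::Client;

# Placeholder URL, same as in the SYNOPSIS
my $client = Mojo::RabbitMQ::Client->new(
  url => 'amqp://guest:guest@127.0.0.1:5672/');

# Hypothetical bookkeeping: count how often each event fires
my %seen;

# Register one logging handler per lifecycle event
for my $event (qw(connect open close disconnect)) {
  $client->on($event => sub {
    $seen{$event}++;
    warn "client event: $event\n";
  });
}

# Handlers only fire once the connection is started, for example with
# $client->connect and a running Mojo::IOLoop, as in the SYNOPSIS.
```

Registering the handlers before calling connect means no early event is missed.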

ATTRIBUTES

Mojo::RabbitMQ::Client has the following attributes.

url

my $url = $client->url;
$client->url('amqp://...');
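
To make the parts of such a URL concrete, the sketch below splits the example URL from the CONSUMER section with Mojo::URL. It only illustrates the URL components. Whether the client parses its url attribute the same way internally is an assumption, not something stated here.

```perl
use Mojo::Base -strict;
use Mojo::URL;

# Example URL taken from the CONSUMER section
my $url = Mojo::URL->new(
  'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo');

say $url->scheme;                      # amqp
say $url->userinfo;                    # guest:guest
say $url->host;                        # 127.0.0.1
say $url->port;                        # 5672
say $url->path;                        # /
say $url->query->param('exchange');    # mojo
say $url->query->param('queue');       # mojo
```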

heartbeat_timeout

my $timeout = $client->heartbeat_timeout;
$client->heartbeat_timeout(180);

METHODS

Mojo::RabbitMQ::Client inherits all methods from Mojo::EventEmitter and implements the following new ones.

connect

$client->connect();

Tries to connect to the RabbitMQ server and negotiate the AMQP protocol.

close

$client->close();

open_channel

my $channel = Mojo::RabbitMQ::Client::Channel->new();
...
$client->open_channel($channel);

delete_channel

my $removed = $client->delete_channel($channel->id);

SEE ALSO

Mojo::RabbitMQ::Client::Channel, Mojo::RabbitMQ::Client::Consumer, Mojo::RabbitMQ::Client::Publisher

COPYRIGHT AND LICENSE

Copyright (C) 2015-2016, Sebastian Podjasek and others

Based on AnyEvent::RabbitMQ - Copyright (C) 2010 Masahito Ikuta, maintained by bobtfish@bobtfish.net

This program is free software; you can redistribute it and/or modify it under the terms of the Artistic License version 2.0.