NAME
Net::RabbitMQ::Client - RabbitMQ client (XS for librabbitmq)
SYNOPSIS
Simple API:
use utf8;
use strict;
use Net::RabbitMQ::Client;
produce();
consume();
sub produce {
my $simple = Net::RabbitMQ::Client->sm_new(
host => "best.host.for.rabbitmq.net",
login => "login", password => "password",
exchange => "test_ex", exchange_type => "direct", exchange_declare => 1,
queue => "test_queue", queue_declare => 1
);
die sm_get_error_desc($simple) unless ref $simple;
my $sm_status = $simple->sm_publish('{"say": "hello"}', content_type => "application/json");
die sm_get_error_desc($sm_status) if $sm_status;
$simple->sm_destroy();
}
sub consume {
my $simple = Net::RabbitMQ::Client->sm_new(
host => "best.host.for.rabbitmq.net",
login => "login", password => "password",
queue => "test_queue"
);
die sm_get_error_desc($simple) unless ref $simple;
my $sm_status = $simple->sm_get_messages(sub {
my ($self, $message) = @_;
print $message, "\n";
1; # it is important to return 1 (send ask) or 0
});
die sm_get_error_desc($sm_status) if $sm_status;
$simple->sm_destroy();
}
Base API:
use utf8;
use strict;
use Net::RabbitMQ::Client;
produce();
consume();
sub produce {
my $rmq = Net::RabbitMQ::Client->create();
my $exchange = "test";
my $routingkey = "";
my $messagebody = "lalala";
my $channel = 1;
my $conn = $rmq->new_connection();
my $socket = $rmq->tcp_socket_new($conn);
my $status = $rmq->socket_open($socket, "best.host.for.rabbitmq.net", 5672);
die "Can't create socket" if $status;
$status = $rmq->login($conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "login", "password");
die "Can't login on server" if $status != AMQP_RESPONSE_NORMAL;
$status = $rmq->channel_open($conn, $channel);
die "Can't open chanel" if $status != AMQP_RESPONSE_NORMAL;
$rmq->queue_bind($conn, 1, "test_q", $exchange, $routingkey, 0);
die "Can't bind queue" if $status != AMQP_RESPONSE_NORMAL;
my $props = $rmq->type_create_basic_properties();
$rmq->set_prop__flags($props, AMQP_BASIC_CONTENT_TYPE_FLAG|AMQP_BASIC_DELIVERY_MODE_FLAG);
$rmq->set_prop_content_type($props, "text/plain");
$rmq->set_prop_delivery_mode($props, AMQP_DELIVERY_PERSISTENT);
$status = $rmq->basic_publish($conn, $channel, $exchange, $routingkey, 0, 0, $props, $messagebody);
if($status != AMQP_STATUS_OK) {
print "Can't send message\n";
}
$rmq->type_destroy_basic_properties($props);
$rmq->channel_close($conn, 1, AMQP_REPLY_SUCCESS);
$rmq->connection_close($conn, AMQP_REPLY_SUCCESS);
$rmq->destroy_connection($conn);
}
sub consume {
my $rmq = Net::RabbitMQ::Client->create();
my $exchange = "test";
my $routingkey = "";
my $messagebody = "lalala";
my $channel = 1;
my $conn = $rmq->new_connection();
my $socket = $rmq->tcp_socket_new($conn);
my $status = $rmq->socket_open($socket, "best.host.for.rabbitmq.net", 5672);
die "Can't create socket" if $status;
$status = $rmq->login($conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "login", "password");
die "Can't login on server" if $status != AMQP_RESPONSE_NORMAL;
$status = $rmq->channel_open($conn, $channel);
die "Can't open chanel" if $status != AMQP_RESPONSE_NORMAL;
$status = $rmq->basic_consume($conn, 1, "test_q", undef, 0, 1, 0);
die "Consuming" if $status != AMQP_RESPONSE_NORMAL;
my $envelope = $rmq->type_create_envelope();
while (1)
{
$rmq->maybe_release_buffers($conn);
$status = $rmq->consume_message($conn, $envelope, 0, 0);
last if $status != AMQP_RESPONSE_NORMAL;
print "New message: \n", $rmq->envelope_get_message_body($envelope), "\n";
$rmq->destroy_envelope($envelope);
}
$rmq->type_destroy_envelope($envelope);
$rmq->channel_close($conn, 1, AMQP_REPLY_SUCCESS);
$rmq->connection_close($conn, AMQP_REPLY_SUCCESS);
$rmq->destroy_connection($conn);
}
DESCRIPTION
This is binding for RabbitMQ-C library.
Please, before install this module make RabbitMQ-C library.
See https://github.com/alanxz/rabbitmq-c
https://github.com/lexborisov/perl-net-rabbitmq-client
METHODS
Simple API
sm_new
my $simple = Net::RabbitMQ::Client->sm_new(
host => '', # default:
port => 5672, # default: 5672
channel => 1, # default: 1
login => '', # default:
password => '', # default:
exchange => undef, # default: undef
exchange_type => undef, # optional, if exchange_declare == 1; types: topic, fanout, direct, headers; default: undef
exchange_declare => 0, # declare exchange or not; 1 or 0; default: 0
queue => undef, # default: undef
routingkey => '', # default:
queue_declare => 0, # declare queue or not; 1 or 0; default: 0
);
Return: a Simple object if successful, otherwise an error occurred
sm_publish
my $sm_status = $simple->sm_publish($text,
# default args
content_type => "text/plain",
delivery_mode => AMQP_DELIVERY_PERSISTENT,
_flags => AMQP_BASIC_CONTENT_TYPE_FLAG|AMQP_BASIC_DELIVERY_MODE_FLAG
);
Return: 0 if successful, otherwise an error occurred
sm_get_messages
Loop to get messages
my $callback = {
my ($simple, $message) = @_;
1; # it is important to return 1 (send ask) or 0
}
my $sm_status = $simple->sm_get_messages($callback);
Return: 0 if successful, otherwise an error occurred
sm_get_message
Get one message
my $sm_status = 0;
my $message = $simple->sm_get_message($sm_status);
Return: message if successful
sm_get_rabbitmq
my $rmq = $simple->sm_get_rabbitmq();
Return: RabbitMQ Base object
sm_get_connection
my $conn = $simple->sm_get_connection();
Return: Connection object (from Base API new_connection)
sm_get_socket
my $socket = $simple->sm_get_socket();
Return: Socket object (from Base API tcp_socket_new)
sm_get_config
my $config = $simple->sm_get_config();
Return: Config when creating a Simple object
sm_get_config
my $description = $simple->sm_get_error_desc($sm_error_code);
Return: Error description by Simple error code
sm_destroy
Destroy a Simple object
$simple->sm_destroy();
Base API
Connection and Authorization
create
my $rmq = Net::RabbitMQ::Client->create();
Return: rmq
new_connection
my $amqp_connection_state_t = $rmq->new_connection();
Return: amqp_connection_state_t
tcp_socket_new
my $amqp_socket_t = $rmq->tcp_socket_new($conn);
Return: amqp_socket_t
socket_open
my $status = $rmq->socket_open($socket, $host, $port);
Return: status
socket_open_noblock
my $status = $rmq->socket_open_noblock($socket, $host, $port, $struct_timeout);
Return: status
login
my $status = $rmq->login($conn, $vhost, $channel_max, $frame_max, $heartbeat, $sasl_method);
Return: status
channel_open
my $status = $rmq->channel_open($conn, $channel);
Return: status
socket_get_sockfd
my $res = $rmq->socket_get_sockfd($socket);
Return: variable
get_socket
my $amqp_socket_t = $rmq->get_socket($conn);
Return: amqp_socket_t
channel_close
my $status = $rmq->channel_close($conn, $channel, $code);
Return: status
connection_close
my $status = $rmq->connection_close($conn, $code);
Return: status
destroy_connection
my $status = $rmq->destroy_connection($conn);
Return: status
SSL
ssl_socket_new
my $amqp_socket_t = $rmq->ssl_socket_new($conn);
Return: amqp_socket_t
ssl_socket_set_key
my $status = $rmq->ssl_socket_set_key($socket, $cert, $key);
Return: status
set_initialize_ssl_library
$rmq->set_initialize_ssl_library($do_initialize);
ssl_socket_set_cacert
my $status = $rmq->ssl_socket_set_cacert($socket, $cacert);
Return: status
ssl_socket_set_key_buffer
my $status = $rmq->ssl_socket_set_key_buffer($socket, $cert, $key, $n);
Return: status
ssl_socket_set_verify
$rmq->ssl_socket_set_verify($socket, $verify);
Basic Publish/Consume
basic_publish
my $status = $rmq->basic_publish($conn, $channel, $exchange, $routing_key, $mandatory, $immediate, $properties, $body);
Return: status
basic_consume
my $status = $rmq->basic_consume($conn, $channel, $queue, $consumer_tag, $no_local, $no_ack, $exclusive);
Return: status
basic_get
my $status = $rmq->basic_get($conn, $channel, $queue, $no_ack);
Return: status
basic_ack
my $status = $rmq->basic_ack($conn, $channel, $delivery_tag, $multiple);
Return: status
basic_nack
my $status = $rmq->basic_nack($conn, $channel, $delivery_tag, $multiple, $requeue);
Return: status
basic_reject
my $status = $rmq->basic_reject($conn, $channel, $delivery_tag, $requeue);
Return: status
Consume
consume_message
my $status = $rmq->consume_message($conn, $envelope, $struct_timeout, $flags);
Return: status
Queue
queue_declare
my $status = $rmq->queue_declare($conn, $channel, $queue, $passive, $durable, $exclusive, $auto_delete);
Return: status
queue_bind
my $status = $rmq->queue_bind($conn, $channel, $queue, $exchange, $routing_key);
Return: status
queue_unbind
my $status = $rmq->queue_unbind($conn, $channel, $queue, $exchange, $routing_key);
Return: status
Exchange
exchange_declare
my $status = $rmq->exchange_declare($conn, $channel, $exchange, $type, $passive, $durable, $auto_delete, $internal);
Return: status
Envelope
envelope_get_redelivered
my $res = $rmq->envelope_get_redelivered($envelope);
Return: variable
envelope_get_channel
my $res = $rmq->envelope_get_channel($envelope);
Return: variable
envelope_get_exchange
my $res = $rmq->envelope_get_exchange($envelope);
Return: variable
envelope_get_routing_key
my $res = $rmq->envelope_get_routing_key($envelope);
Return: variable
destroy_envelope
$rmq->destroy_envelope($envelope);
envelope_get_consumer_tag
my $res = $rmq->envelope_get_consumer_tag($envelope);
Return: variable
envelope_get_delivery_tag
my $res = $rmq->envelope_get_delivery_tag($envelope);
Return: variable
envelope_get_message_body
my $res = $rmq->envelope_get_message_body($envelope);
Return: variable
Types
type_create_envelope
my $amqp_envelope_t = $rmq->type_create_envelope();
Return: amqp_envelope_t
type_destroy_envelope
$rmq->type_destroy_envelope($envelope);
type_create_timeout
my $struct_timeval = $rmq->type_create_timeout($timeout_sec);
Return: struct_timeval
type_destroy_timeout
$rmq->type_destroy_timeout($timeout);
type_create_basic_properties
my $amqp_basic_properties_t = $rmq->type_create_basic_properties();
Return: amqp_basic_properties_t
type_destroy_basic_properties
my $status = $rmq->type_destroy_basic_properties($props);
Return: status
For a Basic Properties
set_prop_app_id
$rmq->set_prop_app_id($props, $value);
set_prop_content_type
$rmq->set_prop_content_type($props, $value);
set_prop_reply_to
$rmq->set_prop_reply_to($props, $value);
set_prop_priority
$rmq->set_prop_priority($props, $priority);
set_prop__flags
$rmq->set_prop__flags($props, $flags);
set_prop_user_id
$rmq->set_prop_user_id($props, $value);
set_prop_delivery_mode
$rmq->set_prop_delivery_mode($props, $delivery_mode);
set_prop_message_id
$rmq->set_prop_message_id($props, $value);
set_prop_timestamp
$rmq->set_prop_timestamp($props, $timestamp);
set_prop_cluster_id
$rmq->set_prop_cluster_id($props, $value);
set_prop_correlation_id
$rmq->set_prop_correlation_id($props, $value);
set_prop_expiration
$rmq->set_prop_expiration($props, $value);
set_prop_type
$rmq->set_prop_type($props, $value);
set_prop_content_encoding
$rmq->set_prop_content_encoding($props, $value);
Other
data_in_buffer
my $amqp_boolean_t = $rmq->data_in_buffer($conn);
Return: amqp_boolean_t
maybe_release_buffers
$rmq->maybe_release_buffers($conn);
error_string
my $res = $rmq->error_string($error);
Return: variable
DESTROY
undef $rmq;
Free mem and destroy object.
AUTHOR
Alexander Borisov <lex.borisov@gmail.com>
COPYRIGHT AND LICENSE
This software is copyright (c) 2015 by Alexander Borisov.
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.
See librabbitmq license and COPYRIGHT https://github.com/alanxz/rabbitmq-c