NAME
Net::AMQP::RabbitMQ::Batch - simple batch processing of messages for RabbitMQ.
SYNOPSIS
my $rb = Net::AMQP::RabbitMQ::Batch->new('localhost', { user => 'guest', password => 'guest' }) or croak;
$rb->process({
from_queue => 'test_in',
routing_key => 'test_out',
handler => \&msg_handler,
batch => {
size => 10, # batch size
timeout => 2, #
ignore_size => 0 # ignore in/out batches size mismatch
},
ignore_errors => 0, # ignore handler errors
publish_options => {
exchange => 'exchange_out', # exchange name, default is 'amq.direct'
},
});
sub msg_handler {
my $messages = shift;
# work with 10 messages
return $messages;
}
DESCRIPTION
Assume you read messages from a queue, process them and publish. But you would like to do it in batches, processing many messages at once.
This module:
gets messages from in queue and publish them by routing key
uses your handler to batch process messages
keeps persistency - if processing fails, nothing lost from input queue, nothing published
USAGE
Define a messages handler:
sub msg_handler {
my $messages = shift;
# works with hashref of messages
return $messages;
}
$messages
is an arrayref of message objects:
{
body => 'Magic Transient Payload', # the reconstructed body
routing_key => 'nr_test_q', # route the message took
delivery_tag => 1, # (used for acks)
....
# Not all of these will be present. Consult the RabbitMQ reference for more details.
props => { ... }
}
Handler should return arrayref of message objects (only body
is required):
[
{ body => 'Processed message' },
...
]
Connect to RabbitMQ:
my $rb = Net::AMQP::RabbitMQ::Batch->new('localhost', { user => 'guest', password => 'guest' }) or croak;
And process a batch:
$rb->process({
from_queue => 'test_in',
routing_key => 'test_out',
handler => \&msg_handler,
batch => { size => 10 }
});
You might like to wrap it with while(1) {...}
loop. See process_in_batches.pl or process_in_forked_batches.pl for example.
METHODS
process()
Known Issues
Can not set infinity timeout (use very long int)
No individual messages processing possible
No tests yet which is very sad :(
AUTHORS
Alex Svetkin
LICENSE
MIT