NAME

Net::AMQP::RabbitMQ::Batch - simple batch processing of messages for RabbitMQ.

SYNOPSIS

use Carp;
use Net::AMQP::RabbitMQ::Batch;

my $rb = Net::AMQP::RabbitMQ::Batch->new('localhost', { user => 'guest', password => 'guest' }) or croak;
$rb->process({
    from_queue  => 'test_in',
    routing_key => 'test_out',
    handler     => \&msg_handler,
    batch       => {
        size          => 10,        # batch size
        timeout       => 2,         # batch timeout
        ignore_size   => 0          # ignore in/out batches size mismatch
    },
    ignore_errors => 0,             # ignore handler errors
    publish_options => {
        exchange => 'exchange_out', # exchange name, default is 'amq.direct'
    },
});

sub msg_handler {
    my $messages = shift;
    # work with 10 messages
    return $messages;
}

DESCRIPTION

Suppose you read messages from a queue, process them, and publish the results, but you would like to do this in batches, processing many messages at once.

This module:

  • gets messages from the input queue and publishes them by routing key

  • uses your handler to batch process messages

  • keeps persistence: if processing fails, nothing is lost from the input queue and nothing is published

USAGE

Define a message handler:

sub msg_handler {
    my $messages = shift;
    # work with the arrayref of messages
    return $messages;
}

$messages is an arrayref of message objects:

{
    body => 'Magic Transient Payload', # the reconstructed body
    routing_key => 'nr_test_q',        # route the message took
    delivery_tag => 1,                 # (used for acks)
    ....
    # Not all of these will be present. Consult the RabbitMQ reference for more details.
    props => { ... }
}
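
As a minimal sketch of reading these fields inside a handler, the helper below groups message bodies by routing key. The name bodies_by_routing_key is hypothetical and not part of the module; it relies only on the body and routing_key keys shown above.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Net::AMQP::RabbitMQ::Batch):
# groups message bodies by the routing key each message arrived with.
sub bodies_by_routing_key {
    my ($messages) = @_;    # arrayref of message hashrefs
    my %grouped;
    for my $msg (@$messages) {
        push @{ $grouped{ $msg->{routing_key} } }, $msg->{body};
    }
    return \%grouped;       # routing key => arrayref of bodies
}
```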

The handler should return an arrayref of message objects (only body is required):

[
    { body => 'Processed message' },
    ...
]
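
A handler satisfying this contract might look like the sketch below, which returns one output message per input message with an upper-cased body. The name uppercase_handler and the transformation itself are illustrative assumptions; only the input and output shapes come from the description above.

```perl
use strict;
use warnings;

# Illustrative handler (hypothetical name): receives an arrayref of message
# hashrefs and returns an arrayref of the same length, where each element
# carries only the required body key.
sub uppercase_handler {
    my ($messages) = @_;
    return [ map { +{ body => uc($_->{body}) } } @$messages ];
}
```

It would be passed to process() as handler => \&uppercase_handler. Because it returns exactly one message per input, the input and output batches have the same size.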

Connect to RabbitMQ:

my $rb = Net::AMQP::RabbitMQ::Batch->new('localhost', { user => 'guest', password => 'guest' }) or croak;

And process a batch:

$rb->process({
    from_queue  => 'test_in',
    routing_key => 'test_out',
    handler     => \&msg_handler,
    batch       => { size => 10 }
});

You might like to wrap it in a while(1) {...} loop. See process_in_batches.pl or process_in_forked_batches.pl for examples.
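
Such a loop could be sketched as follows. This is not the code from process_in_batches.pl: run_batches and its $max_batches argument are hypothetical, and the sketch assumes that process() dies when a batch fails, which this document does not confirm.

```perl
use strict;
use warnings;

# Hypothetical wrapper: calls process() repeatedly on any object that
# provides that method. Assumes (unconfirmed) that process() dies on failure.
sub run_batches {
    my ($rb, $options, $max_batches) = @_;
    my $attempted = 0;
    # With $max_batches undefined this behaves like while (1) { ... }.
    while (!defined $max_batches || $attempted < $max_batches) {
        my $ok = eval { $rb->process($options); 1 };
        warn "Batch failed: $@" unless $ok;
        $attempted++;
    }
    return $attempted;    # number of batches attempted
}
```

With the $rb object and options from the example above, run_batches($rb, { from_queue => 'test_in', routing_key => 'test_out', handler => \&msg_handler, batch => { size => 10 } }) would keep processing batches until the program is stopped.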

METHODS

process()

KNOWN ISSUES

  • Cannot set an infinite timeout (use a very large integer instead)

  • Processing of individual messages is not possible

  • No tests yet, which is very sad :(

AUTHORS

Alex Svetkin

LICENSE

MIT