NAME
Queue::Q::ReliableFIFO::Redis - In-memory Redis implementation of the ReliableFIFO queue
SYNOPSIS
use Queue::Q::ReliableFIFO::Redis;
my $q = Queue::Q::ReliableFIFO::Redis->new(
server => 'myredisserver',
port => 6379,
queue_name => 'my_work_queue',
);
# Producer:
$q->enqueue_item("foo");
# You can pass any JSON-serializable data structure
$q->enqueue_item({ bar => "baz" });
$q->enqueue_item({ id=> 12},{id =>34}); # two items
# get rid of everything in the queue:
$q->flush_queue(); # get a clean state, removes queue
# Consumer:
$q->consumer(\&callback);
$q->consumer(
sub { my $data = shift; print 'Received: ', Dumper($data); });
# Cleanup script
my $action = 'requeue';
while (1) {
my @handled_items = $q->handle_expired_items($timeout, $action);
for my $item (@handled_items) {
printf "%s: item %s, in queue since %s, requeued % times\n",
$action, Dumper($item->data),
scalar localtime $item->time,
$item->requeue_count;
}
sleep(60);
}
# retry items that failed before:
$q->requeue_failed_items();
$q->requeue_failed_items(10); # only the first 10 items
# Nagios?
$q->queue_length();
$q->queue_length('failed');
# Depricated (consumer)
my $item = $q->claim_item;
my @items= $q->claim_item(100);
my $foo = $item->data;
$q->mark_item_as_done($item); # single item
$q->mark_item_as_done(@items); # multiple items
DESCRIPTION
Implements interface defined in Queue::Q::ReliableFIFO: an implementation based on Redis.
The data structures passed to enqueue_item
are serialized using JSON (cf. JSON::XS), so any data structures supported by that can be enqueued. We use JSON because that is supported at the lua side as well (the cjson library).
The implementation is kept very lightweight at the Redis level in order to get a hight throughput. With this implementation it is easy to get a throughput of 10,000 items per second on a single core.
At the Redis side this is basically done at the following events:
- putting an item: lput
- getting an item: (b)rpoplpush
- mark as done: lrem
- mark an item as failed: lrem + lpush
- requeue an item: lrem + lpush (or lrem + rpush)
Note that only exceptions need multiple commands.
To detect hanging items, a cronjob is needed, looking at how long items stay in the busy status.
The queues are implemented as list data structures in Redis. The lists ave as name the queue name plus an extension. The extension is:
_main for the working queue
_busy for the list with items that are claimed but not finished
_failed for the items that failed
There can also be a list with extension "_time" if a cronjob is monitoring how long items are in the busy list (see method handle_expired_items()).
METHODS
Important note: At the Redis level a lost connection will always throw an exception, even if auto-reconnect is switched on. As consequence, the methods that do redis commands, like enqueue_item(), claim_item() and mark_item_as_done(), will throw an exception when the connection to the Redis server is lost. The consume() method handles these exceptions. For other methods you need to catch and handle the exception.
All methods of Queue::Q::ReliableFIFO. Other methods are:
new
Constructor. Takes named parameters. Required parameters are
- the server hostname or address
- the Redis port, and
- the name of the Redis key to use as the queue_name.
Optional parameters are
- a Redis db number to use.
Default value is 0
. - redis_options for connection options
- redis_connection for reusing an existing redis connection
- requeue_limit to specify how often an item is allowed to enter the queu again before ending up in the failed queue.
Default value is 5
. - claim_wait_timeout (in seconds) to specify how long the claim_item() method is allowed to wait before it returns. This applies to the situation with an empty queue. A value of "0" means "wait forever".
Default value is 1
. - busy_expiry_time to specify the threshold (in seconds) after which an item is supposed to get stuck. After this time a follow up strategy should be applied. (Normally done by the handle_expired_items() Method, typically done by a cronjob).
Default value is 30
.
enqueue_item(@items)
Special for the Redis imlementation is that the return value
is the length of the queue after the items are added.
consume(\&callback, $action, %options)
This method is called by the consumer to consume the items of a queue. For each item in the queue, the callback function will be called. The function will receive that data of the queued item as parameter.
The $action parameter is applied when the callback function returns a "die". Allowed values are:
requeue. (
Default
). I.e. do it again, the item will be put at the tail of the queue. The requeue_limit property is the queue indicates the limit to how many times an item can be requeued. The default is 5 times. You can change that by setting by calling the set_queue_limit() method or by passing the property to the constructor. When the requeue limit is reached, the item will go to the failed queue.Note: by setting the queue_limit to "0" you can force the item to go to the "failed" status right away (without being requeued).
drop. Forget about it.
options
Chunk. The Chunk option is used to set a chunk size for number of items to claim and to mark as done in one go. This helps to fight latency.
DieOnError. If this option has a true value, the consumer will stop when the callback function returns a "die" call. Default "false".
MaxItems This can be used to limit the consume method to process only a limited amount of items. This be useful in cases of memory leaks and restarting strategies with cron. Of course this comes with delays in handling the items.
Examples:
$q->consume(\&callback,);
$q->consume(\&callback, 'requeue'); # same, because 'requeue' is default
# takes 50 items a time. Faster because less latency
$q->consume(\&callback, 'requeue', { Chunk => 50 });
@item_obj = $q->handle_expired_items($timeout, $action);
This method can be used by a cleanup job to ensures that items don't stick forever in the busy status. When an item has been in this status for $timeout seconds, the action specified by the $action will be done. The $action parameter is the same as with the consume() method.
The method returns item objects of type Queue::Q::ReliableFIFO::Item which has the item data as well as the time when it was queued at the first time, how often it was requeued.
To set/change the limit of how often an item can be requeued, use the requeue_limit parameter in the new() constructor or use the method set_requeue_limit.
Once an item is moved to the failed queue, the counter is reset. The item can be put back into the main queue by using the requeue_failed_items() method (or via the CLI). Then it will be retried again up to requeue_limit times.
my $count = $q->unclaim(@items)
This puts claimed items back to the queue. It is pushed to the front of the queue so that it can be picked up a.s.a.p. This method is e.g. be used when a chunk of items are claimed but the consumer aborts before all items are processed.
my $count = $q->requeue_busy(@items)
This puts items that are claimed back to the queue so that other consumers can pick this up. In this case the items are put at the back of the queue, so depending the queue length it can take some time before it is available for consumers.
my $count = $q->requeue_failed_items([ $limit ]);
This method will move items from the failed queue to the working queue. The $limit parameter is optional and can be used to move only a subset to the working queue. The number of items actually moved will be the return value.
my $age = $q->age($queue_name [,$type]);
This methods returns the age (in seconds) of the oldest item in the queue. The second parameter ($type) is optional and can be main (default)
my @raw_items = $q->raw_items_busy( [$max_number] );
Returns objects of type Queue::Q::RelaibleFIFO::Item from the busy list. You can limit the number of items by passing the limit to the method.
my @raw_items = $q->raw_items_failed( [$max_number] );
Similar to raw_items_busy() but for failed items.
my @raw_items = $q->raw_items_main( [$max_number] );
Similar to raw_items_busy() but for items in the working queue. Note that the main queue can be large, so a limit is strongly recommended here.
my $memory_usage = $q->memory_usage_perc();
Returns the memory usage percentage of the redis instance where the queue is located.
AUTHOR
Herald van der Breggen, <herald.vanderbreggen@booking.com>
Steffen Mueller, <smueller@cpan.org>
COPYRIGHT AND LICENSE
Copyright (C) 2012 by Steffen Mueller
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.8.1 or, at your option, any later version of Perl 5 you may have available.