NAME
DR::TarantoolQueue - client for tarantool's queue
SYNOPSIS
my $queue = DR::TarantoolQueue->new(
    host => 'tarantool.host',
    port => 33014,
    tube => 'request_queue',
    space => 11,
    connect_opts => { # see perldoc DR::Tarantool
        reconnect_period => 1,
        reconnect_always => 1
    }
);
# put an empty task into the queue named 'request_queue'
my $task = $queue->put;
# put a task with data into the same queue
$task = $queue->put(data => [ 1, 2, 3 ]);
printf "task.id = %s\n", $task->id;
DESCRIPTION
The module provides synchronous and asynchronous (Coro) drivers for the tarantool queue.
ATTRIBUTES
host (ro) & port (ro)
Tarantool's parameters.
connect_opts (ro)
Additional options for DR::Tarantool. HashRef.
fake_in_test (ro, default=true)
Start a fake tarantool (only for msgpack) if ($0 =~ /\.t$/).
In that case the driver uses the following Lua code:
log.info('Fake Queue starting')
box.cfg{ listen = os.getenv('PRIMARY_PORT') }
box.schema.user.create('test', { password = 'test' })
box.schema.user.grant('test', 'read,write,execute', 'universe')
_G.queue = require('megaqueue')
queue:init()
log.info('Fake Queue started')
msgpack (ro)
If true, the driver will use the DR::Tnt driver (1.6). It will also use the tarantool-megaqueue Lua module with the namespace queue.
coro (ro)
If true (the default), the driver uses the Coro-based tarantool driver; otherwise it uses the sync driver.
ttl (rw)
Default ttl for tasks.
ttr (rw)
Default ttr for tasks.
pri (rw)
Default pri for tasks.
delay (rw)
Default delay for tasks.
space (rw)
Default space for tasks.
tube (rw)
Default tube for tasks.
defaults
Defaults for queues. HashRef. Key is tube name. Value is a hash with the following fields:
- ttl
- ttr
- delay
- pri
The "put" and "urgent" methods take these values for any of the parameters that are not passed in the call. If the tube has no such value here, the global attribute of the same name is used.
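The lookup order described above can be sketched in plain Perl. This is only an illustration under stated assumptions, not the module's own code: resolve_option and the sample values are hypothetical, and the order shown (call argument, then per-tube defaults, then global attribute) is one reading of the description.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of DR::TarantoolQueue: it only illustrates
# the lookup order described above for ttl, ttr, delay and pri.
sub resolve_option {
    my ($name, $tube, $args, $defaults, $globals) = @_;

    # 1. A value passed explicitly in the call wins.
    return $args->{$name} if exists $args->{$name};

    # 2. Otherwise use the per-tube entry from "defaults", if there is one.
    return $defaults->{$tube}{$name}
        if exists $defaults->{$tube} and exists $defaults->{$tube}{$name};

    # 3. Finally fall back to the global attribute of the same name.
    return $globals->{$name};
}

# Sample values, made up for the illustration.
my %defaults = (request_queue => { ttl => 3600, ttr => 60 });
my %globals  = (ttl => 600, ttr => 30, delay => 0, pri => 0);

# ttl comes from the per-tube defaults, pri from the global attributes
my $ttl = resolve_option('ttl', 'request_queue', {}, \%defaults, \%globals);
my $pri = resolve_option('pri', 'request_queue', {}, \%defaults, \%globals);
printf "ttl=%s pri=%s\n", $ttl, $pri;
```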
METHODS
new
my $q = DR::TarantoolQueue->new(host => 'abc.com', port => 123);
Creates a new accessor for the queue(s).
dig
$q->dig(task => $task);
$task->dig; # the same
$q->dig(id => $task->id);
$q->dig(id => $task->id, space => $task->space);
'Dig up' a buried task. Checks that the task is buried, then changes its status to ready.
unbury
Is a synonym of "dig".
delete
$q->delete(task => $task);
$task->delete; # the same
$q->delete(id => $task->id);
$q->delete(id => $task->id, space => $task->space);
Delete a task from the queue (regardless of task state or status).
peek
$q->peek(task => $task);
$task->peek; # the same
$q->peek(id => $task->id);
$q->peek(id => $task->id, space => $task->space);
Return a task by task id.
statistics
my $s = $q->statistics;
my $s = $q->statistics(space => 123);
my $s = $q->statistics(space => 123, tube => 'abc');
my $s = DR::TarantoolQueue->statistics(space => 123);
my $s = DR::TarantoolQueue->statistics(space => 123, tube => 'abc');
Return queue module statistics collected since server start. The statistics are broken down by queue id. Only queues on which there was some activity are included in the output.
get_meta
Task was processed (and will be deleted after the call).
my $m = $q->get_meta(task => $task);
my $m = $q->get_meta(id => $task->id);
Returns a hashref with fields:
- id: task id
- tube: queue id
- status: task status
- event: time of the next important event in task life time, for example, when ttl or ttr expires, in microseconds since start of the UNIX epoch.
- ipri: internal value of the task priority
- pri: task priority as set when the task was added to the queue
- cid: consumer id, of the consumer which took the task (only if the task is taken)
- created: time when the task was created (microseconds since start of the UNIX epoch)
- ttl: task time to live (microseconds)
- ttr: task time to run (microseconds)
- cbury: how many times the task was buried
- ctaken: how many times the task was taken
- now: time recorded when the meta was called
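Since the time fields are in microseconds, a small conversion helper can make the hashref easier to read. The sketch below is hypothetical (meta_summary is not part of the module) and assumes that 'now' uses the same microsecond scale as 'created' and that ttl is counted from 'created'; the sample values are made up.

```perl
use strict;
use warnings;

# Hypothetical helper: summarise a get_meta hashref in seconds.
# Assumes 'now' is in microseconds, like 'created', and that the ttl
# is counted from the moment the task was created.
sub meta_summary {
    my ($m) = @_;
    my $usec = 1_000_000;
    return {
        age_sec => ($m->{now} - $m->{created}) / $usec,
        ttl_sec => $m->{ttl} / $usec,
        ttr_sec => $m->{ttr} / $usec,
        # seconds left before the ttl runs out (negative once expired)
        ttl_left_sec => ($m->{created} + $m->{ttl} - $m->{now}) / $usec,
    };
}

# Sample values, made up for the illustration.
my $s = meta_summary({
    created => 1_700_000_000_000_000,
    now     => 1_700_000_030_000_000,
    ttl     => 3_600_000_000,
    ttr     => 60_000_000,
});
printf "age=%ds ttl_left=%ds\n", $s->{age_sec}, $s->{ttl_left_sec};
```

With a real queue object the argument would be the result of get_meta as shown above.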
Producer methods
put
$q->put;
$q->put(data => { 1 => 2 });
$q->put(space => 1, tube => 'abc',
    delay => 10, ttl => 3600,
    ttr => 60, pri => 10, data => [ 3, 4, 5 ]);
$q->put(data => 'string');
Enqueue a task. Returns the new task object. The list of fields with task data (data => ...) is optional.
If 'space' and/or 'tube' are not given, the method falls back to the values from the queue object.
put_unique
$q->put_unique(data => { 1 => 2 });
$q->put_unique(space => 1, tube => 'abc',
    delay => 10, ttl => 3600,
    ttr => 60, pri => 10, data => [ 3, 4, 5 ]);
$q->put_unique(data => 'string');
Enqueue a unique task. Returns a new task object if the task was not enqueued previously; otherwise returns the existing task. The list of fields with task data (data => ...) is optional.
If 'space' and/or 'tube' are not given, the method falls back to the values from the queue object.
urgent
Enqueue a task. The task will get the highest priority. If delay is not zero, the function is equivalent to put.
Consumer methods
take
my $task = $q->take;
my $task = $q->take(timeout => 0.5);
my $task = $q->take(space => 1, tube => 'requests', timeout => 20);
If there are tasks in the queue ready for execution, take the highest-priority task. Otherwise, wait for a ready task to appear in the queue and, as soon as it appears, mark it as taken and return it to the consumer. If a timeout is given and no task appears before it expires, returns undef. If no timeout is given, waits indefinitely.
While the consumer is working on a task, it must keep the connection to the server open. If the connection disappears while the consumer is still working on a task, the task is put back on the ready list.
ack
$q->ack(task => $task);
$task->ack; # the same
$q->ack(id => $task->id);
$q->ack(space => $task->space, id => $task->id);
Confirm completion of a task. Before marking a task as complete, this function verifies that:
- the task is taken
- the consumer that is confirming the task is the one which took it
Consumer identity is established using a session identifier. In other words, the task must be confirmed by the same connection which took it. If verification fails, the function returns an error.
On success, deletes the task from the queue. Throws an exception otherwise.
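A consumer step built from take, ack and release might look like the sketch below. process_one is a hypothetical helper, not part of the module; it assumes $q is a DR::TarantoolQueue object and uses only the calls documented here. The 10-second release delay is an arbitrary choice.

```perl
use strict;
use warnings;

# Hypothetical helper: take one task, run $handler on it and confirm it.
# $q is expected to be a DR::TarantoolQueue object.
# Returns 1 if a task was acked, -1 if it was released, 0 on timeout.
sub process_one {
    my ($q, $handler, %take_opts) = @_;

    my $task = $q->take(%take_opts);
    return 0 unless defined $task;    # timeout expired, nothing to do

    if (eval { $handler->($task); 1 }) {
        # ack must go through the same connection that took the task
        $q->ack(task => $task);
        return 1;
    }

    warn sprintf "task %s failed: %s", $task->id, $@ || "unknown error\n";
    # give the task back so that it can be retried later
    $q->release(task => $task, delay => 10);
    return -1;
}
```

A worker would typically call it in a loop, for example process_one($queue, \&handle, timeout => 20), where handle is the application's own callback and receives the task object.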
requeue
$q->requeue(task => $task);
$task->requeue; # the same
$q->requeue(id => $task->id);
$q->requeue(id => $task->id, space => $task->space);
Return a task to the queue without executing it. The task is put at the end of the queue, so it is executed only after all tasks already in the queue have been executed.
bury
$q->bury(task => $task);
$task->bury; # the same
$q->bury(id => $task->id);
$q->bury(id => $task->id, space => $task->space);
Mark a task as buried. This special status excludes the task from the active list, until it's dug up. This function is useful when several attempts to execute a task lead to a failure. Buried tasks can be monitored by the queue owner, and treated specially.
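The "several failed attempts" case can be sketched as follows. run_or_bury is a hypothetical helper, not part of the module; it assumes the task has already been taken by the same connection and that retrying inside one process is acceptable for the application.

```perl
use strict;
use warnings;

# Hypothetical helper: run $handler on an already taken task up to
# $max_attempts times; ack on success, bury after the last failure.
sub run_or_bury {
    my ($q, $task, $handler, $max_attempts) = @_;

    for my $attempt (1 .. $max_attempts) {
        if (eval { $handler->($task); 1 }) {
            $q->ack(task => $task);
            return 'ack';
        }
        warn sprintf "task %s: attempt %d failed: %s",
            $task->id, $attempt, $@ || "unknown error\n";
    }

    # every attempt failed: park the task until it is dug up
    $q->bury(task => $task);
    return 'bury';
}
```

A buried task can later be returned to the ready state with dig, as described above.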
release
$q->release(task => $task);
$task->release; # the same
$q->release(id => $task->id, space => $task->space);
$q->release(task => $task, delay => 10); # delay the task
$q->release(task => $task, ttl => 3600); # append task's ttl
Return a task back to the queue: the task is not executed. Additionally, a new time to live and re-execution delay can be provided.
done
$q->done(task => $task, data => { result => '123' });
$task->done(data => { result => '123' }); # the same
$q->done(id => $task->id, space => $task->space);
Mark a task as complete (done), but don't delete it. Replaces task data with the supplied data.
COPYRIGHT AND LICENCE
Copyright (C) 2012 by Dmitry E. Oboukhov <unera@debian.org>
Copyright (C) 2012 by Roman V. Nikolaev <rshadow@rambler.ru>
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.8.8 or, at your option, any later version of Perl 5 you may have available.