NAME

DR::TarantoolQueue - client for tarantool's queue

SYNOPSIS

my $queue = DR::TarantoolQueue->new(
    host    => 'tarantool.host',
    port    => 33014,
    tube    => 'request_queue',
    space   => 11,

    connect_opts => {   # see perldoc DR::Tarantool
        reconnect_period    => 1,
        reconnect_always    => 1
    }
);


# put empty task into queue with name 'request_queue'
my $task = $queue->put;

my $task = $queue->put(data => [ 1, 2, 3 ]);

printf "task.id = %s\n", $task->id;

DESCRIPTION

The module contains sync and async (coro) driver for tarantool queue.

ATTRIBUTES

host (ro) & port (ro)

Tarantool's parameters.

connect_opts (ro)

Additional options for DR::Tarantool. HashRef.

fake_in_test (ro, default=true)

Start fake tarantool (only for msgpack) if ($0 =~ /\.t$/).

For the case the driver uses the following lua code:

log.info('Fake Queue starting')

box.cfg{ listen  = os.getenv('PRIMARY_PORT') }

box.schema.user.create('test', { password = 'test' })
box.schema.user.grant('test', 'read,write,execute', 'universe')

_G.queue = require('megaqueue')
queue:init()

log.info('Fake Queue started')

msgpack (ro)

If true, the driver will use DR::Tnt driver (1.6). Also it will use tarantool-megaqueue lua module with namespace queue.

coro (ro)

If true (default) the driver will use Coro tarantool's driver, otherwise the driver will use sync driver.

ttl (rw)

Default ttl for tasks.

ttr (rw)

Default ttr for tasks.

pri (rw)

Default pri for tasks.

delay (rw)

Default delay for tasks.

space (rw)

Default space for tasks.

tube (rw)

Default tube for tasks.

defaults

Defaults for queues. HashRef. Key is tube name. Value is a hash with the following fields:

ttl
ttr
delay
pri

Methods "put" ("urgent") use these parameters if they are absent (otherwise it uses the same global attributes).

METHODS

new

my $q = DR::TarantoolQueue->new(host => 'abc.com', port => 123);

Creates new queue(s) accessor.

dig

$q->dig(task => $task);
$task->dig; # the same

$q->dig(id => $task->id);
$q->dig(id => $task->id, space => $task->space);

'Dig up' a buried task. Checks, that the task is buried. The task status is changed to ready.

unbury

Is a synonym of "dig".

delete

$q->delete(task => $task);
$task->delete; # the same

$q->delete(id => $task->id);
$q->delete(id => $task->id, space => $task->space);

Delete a task from the queue (regardless of task state or status).

peek

$q->peek(task => $task);
$task->peek; # the same

$q->peek(id => $task->id);
$q->peek(id => $task->id, space => $task->space);

Return a task by task id.

statistics

my $s = $q->statistics;
my $s = $q->statistics(space => 123);
my $s = $q->statistics(space => 123, tube => 'abc');
my $s = DR::TarantoolQueue->statistics(space => 123);
my $s = DR::TarantoolQueue->statistics(space => 123, tube => 'abc');

Return queue module statistics, since server start. The statistics is broken down by queue id. Only queues on which there was some activity are included in the output.

get_meta

Task was processed (and will be deleted after the call).

my $m = $q->get_meta(task => $task);
my $m = $q->get_meta(id => $task->id);

Returns a hashref with fields:

id

task id

tube

queue id

status

task status

event

time of the next important event in task life time, for example, when ttl or ttr expires, in microseconds since start of the UNIX epoch.

ipri

internal value of the task priority

pri

task priority as set when the task was added to the queue

cid

consumer id, of the consumer which took the task (only if the task is taken)

created

time when the task was created (microseconds since start of the UNIX epoch)

ttl

task time to live (microseconds)

ttr

task time to run (microseconds)

cbury

how many times the task was buried

ctaken

how many times the task was taken

now

time recorded when the meta was called

Producer methods

put

$q->put;
$q->put(data => { 1 => 2 });
$q->put(space => 1, tube => 'abc',
        delay => 10, ttl => 3600,
        ttr => 60, pri => 10, data => [ 3, 4, 5 ]);
$q->put(data => 'string');

Enqueue a task. Returns new task object. The list of fields with task data (data => ...) is optional.

If 'space' and (or) 'tube' aren't defined the method will try to use them from queue object.

put_unique

$q->put_unique(data => { 1 => 2 });
$q->put_unique(space => 1, tube => 'abc',
        delay => 10, ttl => 3600,
        ttr => 60, pri => 10, data => [ 3, 4, 5 ]);
$q->put_unique(data => 'string');

Enqueue an unique task. Returns new task object, if it was not enqueued previously. Otherwise it will return existing task. The list of fields with task data (data => ...) is optional.

If 'space' and (or) 'tube' aren't defined the method will try to use them from queue object.

urgent

Enqueue a task. The task will get the highest priority. If delay is not zero, the function is equivalent to put.

Consumer methods

take

my $task = $q->take;
my $task = $q->take(timeout => 0.5);
my $task = $q->take(space => 1, tube => 'requests, timeout => 20);

If there are tasks in the queue ready for execution, take the highest-priority task. Otherwise, wait for a ready task to appear in the queue, and, as soon as it appears, mark it as taken and return to the consumer. If there is a timeout, and the task doesn't appear until the timeout expires, returns undef. If timeout is not given, waits indefinitely.

All the time while the consumer is working on a task, it must keep the connection to the server open. If a connection disappears while the consumer is still working on a task, the task is put back on the ready list.

ack

$q->ack(task => $task);
$task->ack; # the same

$q->ack(id => $task->id);
$q->ack(space => $task->space, id => $task->id);

Confirm completion of a task. Before marking a task as complete, this function verifies that:

  • the task is taken

  • the consumer that is confirming the task is the one which took it

Consumer identity is established using a session identifier. In other words, the task must be confirmed by the same connection which took it. If verification fails, the function returns an error.

On success, deletes the task from the queue. Throws an exception otherwise.

requeue

$q->requeue(task => $task);
$task->requeue; # the same

$q->requeue(id => $task->id);
$q->requeue(id => $task->id, space => $task->space);

Return a task to the queue, the task is not executed. Puts the task at the end of the queue, so that it's executed only after all existing tasks in the queue are executed.

bury

$q->bury(task => $task);
$task->bury; # the same

$q->bury(id => $task->id);
$q->bury(id => $task->id, space => $task->space);

Mark a task as buried. This special status excludes the task from the active list, until it's dug up. This function is useful when several attempts to execute a task lead to a failure. Buried tasks can be monitored by the queue owner, and treated specially.

release

$q->release(task => $task);
$task->release; # the same

$q->release(id => $task->id, space => $task->space);
$q->release(task => $task, delay => 10); # delay the task
$q->release(task => $task, ttl => 3600); # append task's ttl

Return a task back to the queue: the task is not executed. Additionally, a new time to live and re-execution delay can be provided.

done

$q->done(task => $task, data => { result => '123' });
$task->done(data => { result => '123' }); # the same
$q->done(id => $task->id, space => $task->space);

Mark a task as complete (done), but don't delete it. Replaces task data with the supplied data.

COPYRIGHT AND LICENCE

Copyright (C) 2012 by Dmitry E. Oboukhov <unera@debian.org>
Copyright (C) 2012 by Roman V. Nikolaev <rshadow@rambler.ru>

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.8.8 or, at your option, any later version of Perl 5 you may have available.