NAME

MangoX::Queue - A MongoDB queue implementation using Mango

SYNOPSIS

use Mango;
use MangoX::Queue;
use DateTime; # needed for the 'created' examples below

my $mango = Mango->new("mongodb://localhost:27017");
my $collection = $mango->db('my_db')->collection('my_queue');

my $queue = MangoX::Queue->new(collection => $collection);

# To add a basic job
enqueue $queue 'some job name';
$queue->enqueue('some job name');

# To add a complex job
enqueue $queue +{
	foo => 'bar'
};
$queue->enqueue({
	foo => 'bar'
});

# To set priority
enqueue $queue priority => 2, 'job_name';
$queue->enqueue(priority => 2, 'job_name');

# To set created
enqueue $queue created => DateTime->now, 'job_name';
$queue->enqueue(created => DateTime->now, 'job_name');

# To set status
enqueue $queue status => 'Pending', 'job_name';
$queue->enqueue(status => 'Pending', 'job_name');

# To set multiple options
enqueue $queue priority => 1, created => DateTime->now, 'job_name';
$queue->enqueue(priority => 1, created => DateTime->now, 'job_name');

# To wait for a job status change (non-blocking)
my $id = enqueue $queue 'test';
monitor $queue $id, 'Complete' => sub {
	# Job status is 'Complete'
};

# To wait on multiple statuses (non-blocking)
my $id = enqueue $queue 'test';
monitor $queue $id, ['Complete', 'Failed'] => sub {
	# Job status is 'Complete' or 'Failed'
};

# To wait for a job status change (blocking)
my $id = enqueue $queue 'test';
monitor $queue $id, 'Complete';

# To fetch a job (blocking)
my $job = fetch $queue;
my $job = $queue->fetch;

# To fetch a job (non-blocking)
fetch $queue sub {
	my ($job) = @_;
	# ...
};
$queue->fetch(sub {
	my ($job) = @_;
	# ...
});

# To get a job by id (currently blocking)
my $id = enqueue $queue 'test';
my $job = get $queue $id;

# To requeue a job (currently blocking)
my $id = enqueue $queue 'test';
my $job = get $queue $id;
requeue $queue $job;

# To dequeue a job (currently blocking)
my $id = enqueue $queue 'test';
dequeue $queue $id;

# To watch a queue (blocking)
while (my $job = watch $queue) {
	# ...
}
while (my $job = $queue->watch) {
	# ...
}

# To watch a queue (non-blocking)
watch $queue sub {
	my ($job) = @_;
	# ...
};
$queue->watch(sub {
	my ($job) = @_;
	# ...
});

DESCRIPTION

MangoX::Queue is a MongoDB-backed queue implementation that uses Mango to support both blocking and non-blocking queues.

MangoX::Queue makes no attempt to manage the Mango connection, database or collection. Pass a collection to the constructor and MangoX::Queue will use it. The collection can be plain, capped or sharded.

ATTRIBUTES

MangoX::Queue implements the following attributes.

delay

my $delay = $queue->delay;
$queue->delay(MangoX::Queue::Delay->new);

The MangoX::Queue::Delay object responsible for dynamically controlling the delay between queue queries.

collection

my $collection = $queue->collection;
$queue->collection($mango->db('foo')->collection('bar'));

my $queue = MangoX::Queue->new(collection => $collection);

The Mango::Collection representing the MongoDB queue collection.

retries

my $retries = $queue->retries;
$queue->retries(5);

The number of times a job will be picked up from the queue before it is marked as failed.

timeout

my $timeout = $queue->timeout;
$queue->timeout(10);

The time (in seconds) a job is allowed to remain in the 'Retrieved' state before it is released back into the 'Pending' state. Defaults to 60 seconds.
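
The interaction between retries and timeout can be pictured with a small in-memory model. This is only a sketch under stated assumptions: release_stale and the fields status, retrieved_at and attempt are hypothetical names, and the module's real bookkeeping inside MongoDB may differ.

```perl
use strict;
use warnings;

# Hypothetical in-memory model; field names are assumptions, not the
# module's actual document layout.
# Any job that has been 'Retrieved' for longer than $timeout seconds is
# either released to 'Pending' or, once it has been picked up $retries
# times, marked 'Failed'.
sub release_stale {
    my ($jobs, $timeout, $retries, $now) = @_;
    for my $job (@$jobs) {
        next unless $job->{status} eq 'Retrieved';
        next if $now - $job->{retrieved_at} <= $timeout;
        $job->{status} = $job->{attempt} >= $retries ? 'Failed' : 'Pending';
    }
    return $jobs;
}

my $jobs = [
    { id => 1, status => 'Retrieved', retrieved_at => 0,  attempt => 1 },
    { id => 2, status => 'Retrieved', retrieved_at => 0,  attempt => 5 },
    { id => 3, status => 'Retrieved', retrieved_at => 95, attempt => 1 },
];

# timeout of 60 seconds, 5 retries, current time 100
release_stale($jobs, 60, 5, 100);
print join(', ', map { "$_->{id}=$_->{status}" } @$jobs), "\n";
```

In this model, job 1 has timed out and goes back to 'Pending', job 2 has timed out after its fifth pickup and is marked 'Failed', and job 3 is still within its timeout and stays 'Retrieved'.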

METHODS

MangoX::Queue implements the following methods.

dequeue

my $job = fetch $queue;
dequeue $queue $job;

Dequeues a job. Currently removes it from the collection.

enqueue

enqueue $queue 'job name';
enqueue $queue [ 'some', 'data' ];
enqueue $queue +{ foo => 'bar' };

$queue->enqueue('job name');
$queue->enqueue([ 'some', 'data' ]);
$queue->enqueue({ foo => 'bar' });

Add an item to the queue.

By default, jobs are currently enqueued with priority 1 and a status of 'Pending'.

fetch

# In blocking mode
my $job = fetch $queue;
my $job = $queue->fetch;

# In non-blocking mode
fetch $queue sub {
	my ($job) = @_;
	# ...
};
$queue->fetch(sub {
	my ($job) = @_;
	# ...
});

Fetch a single job from the queue, returning undef if no jobs are available.

Currently sets job status to 'Retrieved'.

get

my $job = get $queue $id;

Gets a job from the queue by ID. Doesn't change the job status.

get_options

my $options = $queue->get_options;

Returns the Mango::Collection options hash used by find_and_modify to identify and update available queue items.
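
To give a feel for the kind of structure involved, the following sketch builds a hash in the shape that Mango::Collection's find_and_modify accepts (a query plus an update). The sketch_options function and the status field name are assumptions made for illustration; the hash actually returned by get_options may contain different or additional keys.

```perl
use strict;
use warnings;

# Hypothetical example of a find_and_modify options hash.
# The 'status' field name is an assumption, chosen to match the
# 'Pending' and 'Retrieved' statuses described in this document.
sub sketch_options {
    return {
        # Match only jobs that are waiting to be picked up.
        query  => { status => 'Pending' },
        # Mark the matched job as picked up in the same operation.
        update => { '$set' => { status => 'Retrieved' } },
    };
}

my $options = sketch_options();
print join(', ', sort keys %$options), "\n";
```

Because the match and the update happen in one find_and_modify call, two workers polling the same collection should not both receive the same 'Pending' job.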

monitor

# In blocking mode
my $id = enqueue $queue 'test';
monitor $queue $id, 'Complete'; # blocks until job is complete

# In non-blocking mode
my $id = enqueue $queue 'test';
monitor $queue $id, 'Complete' => sub {
	# ...
};

Wait for a job to enter a certain status.

requeue

my $job = fetch $queue;
requeue $queue $job;

Requeues a job. Sets the job status to 'Pending'.
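
The fetch, dequeue and requeue methods can be combined into a simple "process one job" step. The following is a minimal sketch, not part of MangoX::Queue: process_one and its return values are hypothetical, and it assumes only that the queue object provides the three methods as documented above.

```perl
use strict;
use warnings;

# process_one is a hypothetical helper, not part of MangoX::Queue.
# $queue is any object providing fetch, dequeue and requeue;
# $handler is a code ref that dies when the job cannot be processed.
sub process_one {
    my ($queue, $handler) = @_;

    # Blocking fetch; undef means no job was available.
    my $job = $queue->fetch;
    return 'empty' unless defined $job;

    # Run the handler, trapping any exception it throws.
    my $ok = eval { $handler->($job); 1 };

    if ($ok) {
        $queue->dequeue($job);    # success: remove the job
        return 'done';
    }

    $queue->requeue($job);        # failure: back to 'Pending'
    return 'requeued';
}
```

With a real queue this might be called as process_one($queue, sub { my ($job) = @_; ... }), leaving the retry decision to requeue and the retries attribute.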

watch

# In blocking mode
while (my $job = watch $queue) {
	# ...
}
while (my $job = $queue->watch) {
	# ...
}

# In non-blocking mode
watch $queue sub {
	my ($job) = @_;
	# ...
};
$queue->watch(sub {
	my ($job) = @_;
	# ...
});

Watches the queue for jobs, sleeping between queue checks using MangoX::Queue::Delay.

Currently sets job status to 'Retrieved'.
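
One way to picture a dynamically controlled delay between queue checks is a simple backoff rule. The sketch below is an assumption for illustration only: next_delay and its parameters are hypothetical, and MangoX::Queue::Delay may use a different algorithm.

```perl
use strict;
use warnings;

# Hypothetical backoff rule, not the actual MangoX::Queue::Delay logic:
# reset to the minimum delay after a job is found, otherwise double the
# delay up to a ceiling. All values are in milliseconds.
sub next_delay {
    my ($current, $found_job, $min, $max) = @_;
    return $min if $found_job;
    my $next = $current * 2;
    return $next > $max ? $max : $next;
}

my $delay = 100;
my @seen;

# Simulate five queue checks: three empty, one with a job, one empty.
for my $found (0, 0, 0, 1, 0) {
    $delay = next_delay($delay, $found, 100, 500);
    push @seen, $delay;
}
print join(' ', @seen), "\n";
```

Under this rule an idle queue is polled less and less often, while a busy queue is polled at the minimum delay.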

SEE ALSO

Mojolicious, Mango