NAME
MangoX::Queue - A MongoDB queue implementation using Mango
SYNOPSIS
use Mango;
use MangoX::Queue;
my $mango = Mango->new("mongodb://localhost:27017");
my $collection = $mango->db('my_db')->collection('my_queue');
my $queue = MangoX::Queue->new(collection => $collection);
# To add a basic job
enqueue $queue 'some job name';
$queue->enqueue('some job name');
# To add a complex job
enqueue $queue +{
foo => 'bar'
};
$queue->enqueue({
foo => 'bar'
});
# To set priority
enqueue $queue priority => 2, 'job_name';
$queue->enqueue(priority => 2, 'job_name');
# To set created
enqueue $queue created => DateTime->now, 'job_name';
$queue->enqueue(created => DateTime->now, 'job_name');
# To set status
enqueue $queue status => 'Pending', 'job_name';
$queue->enqueue(status => 'Pending', 'job_name');
# To set multiple options
enqueue $queue priority => 1, created => DateTime->now, 'job_name';
$queue->enqueue(priority => 1, created => DateTime->now, 'job_name');
# To wait for a job status change (non-blocking)
my $id = enqueue $queue 'test';
monitor $queue $id, 'Complete' => sub {
# Job status is 'Complete'
};
# To wait on mutliple statuses (non-blocking)
my $id = enqueue $queue 'test';
monitor $queue $id, ['Complete', 'Failed'] => sub {
# Job status is 'Complete' or 'Failed'
};
# To wait for a job status change (blocking)
my $id = enqueue $queue 'test';
monitor $queue $id, 'Complete';
# To fetch a job (blocking)
my $job = fetch $queue;
my $job = $queue->fetch;
# To fetch a job (non-blocking)
fetch $queue sub {
my ($job) = @_;
# ...
};
$queue->fetch(sub {
my ($job) = @_;
# ...
});
# To get a job by id (currently blocking)
my $id = enqueue $queue 'test';
my $job = get $queue $id;
# To requeue a job (currently blocking)
my $id = enqueue $queue 'test';
my $job = get $queue $id;
requeue $queue $job;
# To dequeue a job (currently blocking)
my $id = enqueue $queue 'test';
dequeue $queue $id;
# To watch a queue (blocking)
while (my $job = watch $queue) {
# ...
}
while (my $job = $queue->watch) {
# ...
}
# To watch a queue (non-blocking)
watch $queue sub {
my ($job) = @_;
# ...
};
$queue->watch(sub{
my ($job) = @_;
# ...
});
DESCRIPTION
MangoX::Queue is a MongoDB backed queue implementation using Mango to support blocking and non-blocking queues.
MangoX::Queue makes no attempt to handle the Mango connection, database or collection - pass in a collection to the constructor and MangoX::Queue will use it. The collection can be plain, capped or sharded.
ATTRIBUTES
MangoX::Queue implements the following attributes.
delay
my $delay = $queue->delay;
$queue->delay(MangoX::Queue::Delay->new);
The MangoX::Queue::Delay responsible for dynamically controlling the delay between queue queries.
collection
my $collection = $queue->collection;
$queue->collection($mango->db('foo')->collection('bar'));
my $queue = MangoX::Queue->new(collection => $collection);
The Mango::Collection representing the MongoDB queue collection.
retries
my $retries = $queue->retries;
$queue->retries(5);
The number of times a job will be picked up from the queue before it is marked as failed.
timeout
my $timeout = $queue->timeout;
$queue->timeout(10);
The time (in seconds) a job is allowed to stay in Retrieved state before it is released back into Pending state. Defaults to 60 seconds.
METHODS
MangoX::Queue implements the following methods.
dequeue
my $job = fetch $queue;
dequeue $queue $job;
Dequeues a job. Currently removes it from the collection.
enqueue
enqueue $queue 'job name';
enqueue $queue [ 'some', 'data' ];
enqueue $queue +{ foo => 'bar' };
$queue->enqueue('job name');
$queue->enqueue([ 'some', 'data' ]);
$queue->enqueue({ foo => 'bar' });
Add an item to the queue.
Currently uses priority 1 with a job status of 'Pending'.
fetch
# In blocking mode
my $job = fetch $queue;
my $job = $queue->fetch;
# In non-blocking mode
fetch $queue sub {
my ($job) = @_;
# ...
};
$queue->fetch(sub {
my ($job) = @_;
# ...
});
Fetch a single job from the queue, returning undef if no jobs are available.
Currently sets job status to 'Retrieved'.
get
my $job = get $queue $id;
Gets a job from the queue by ID. Doesn't change the job status.
get_options
my $options = $queue->get_options;
Returns the Mango::Collection options hash used by find_and_modify to identify and update available queue items.
monitor
# In blocking mode
my $id = enqueue $queue 'test';
monitor $queue $id, 'Complete'; # blocks until job is complete
# In non-blocking mode
my $id = enqueue $queue 'test';
monitor $queue $id, 'Complete' => sub {
# ...
};
Wait for a job to enter a certain status.
requeue
my $job = fetch $queue;
requeue $queue $job;
Requeues a job. Sets the job status to 'Pending'.
watch
# In blocking mode
while(my $job = watch $queue) {
# ...
}
while(my $job = $queue->watch) {
# ...
}
# In non-blocking mode
watch $queue sub {
my ($job) = @_;
# ...
};
$queue->watch(sub {
my ($job) = @_;
# ...
});
Watches the queue for jobs, sleeping between queue checks using MangoX::Queue::Delay.
Currently sets job status to 'Retrieved'.