NAME
MangoX::Queue - A MongoDB queue implementation using Mango
DESCRIPTION
MangoX::Queue is a MongoDB-backed queue implementation that uses Mango to support both blocking and non-blocking queues.
MangoX::Queue makes no attempt to handle the Mango connection, database or collection - pass in a collection to the constructor and MangoX::Queue will use it. The collection can be plain, capped or sharded.
For an introduction to MangoX::Queue, see MangoX::Queue::Tutorial.
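Because the queue does not manage the connection itself, the collection has to be created with Mango first. A minimal sketch, assuming a MongoDB server reachable at localhost:27017; the database and collection names (`jobs_db` and `queue`) are placeholders chosen for this example, not defaults of the module:

```perl
use strict;
use warnings;
use Mango;
use MangoX::Queue;

# Assumption: MongoDB is listening on localhost:27017.
my $mango = Mango->new('mongodb://localhost:27017');

# 'jobs_db' and 'queue' are placeholder names chosen for this example.
my $collection = $mango->db('jobs_db')->collection('queue');

# Hand the collection to the queue, which uses it as given.
my $queue = MangoX::Queue->new(collection => $collection);
```

The same `$queue` object can then be used with either the blocking or the non-blocking calls shown in the SYNOPSIS.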
SYNOPSIS
Non-blocking
Non-blocking mode requires a running Mojo::IOLoop.
my $queue = MangoX::Queue->new(collection => $mango_collection);
# To add a job
enqueue $queue 'test' => sub { my $id = shift; };
# To set options
enqueue $queue priority => 1, created => DateTime::Tiny->now, 'test' => sub { my $id = shift; };
# To watch for a specific job status
watch $queue $id, 'Complete' => sub {
    # Job status is 'Complete'
};
# To fetch a job
fetch $queue sub {
    my ($job) = @_;
    # ...
};
# To get a job by id
get $queue $id => sub { my $job = shift; };
# To requeue a job
requeue $queue $job => sub { my $id = shift; };
# To dequeue a job
dequeue $queue $id => sub { };
# To consume a queue
my $consumer = consume $queue sub {
    my ($job) = @_;
    # ...
};
# To stop consuming a queue
release $queue $consumer;
# To listen for errors
on $queue error => sub { my ($queue, $error) = @_; };
Blocking
my $queue = MangoX::Queue->new(collection => $mango_collection);
# To add a job
my $id = enqueue $queue 'test';
# To set options
my $id = enqueue $queue priority => 1, created => DateTime::Tiny->now, 'test';
# To watch for a specific job status
watch $queue $id;
# To fetch a job
my $job = fetch $queue;
# To get a job by id
my $job = get $queue $id;
# To requeue a job
my $id = requeue $queue $job;
# To dequeue a job
dequeue $queue $id;
# To consume a queue
while(my $job = consume $queue) {
    # ...
}
Other
my $queue = MangoX::Queue->new(collection => $mango_collection);
# To listen for events
on $queue enqueued => sub { my ($queue, $job) = @_; };
on $queue dequeued => sub { my ($queue, $job) = @_; };
on $queue consumed => sub { my ($queue, $job) = @_; };
# To register a plugin
plugin $queue 'MangoX::Queue::Plugin::Statsd';
ATTRIBUTES
MangoX::Queue implements the following attributes.
collection
my $collection = $queue->collection;
$queue->collection($mango->db('foo')->collection('bar'));
my $queue = MangoX::Queue->new(collection => $collection);
The Mango::Collection representing the MongoDB queue collection.
delay
my $delay = $queue->delay;
$queue->delay(MangoX::Queue::Delay->new);
The MangoX::Queue::Delay responsible for dynamically controlling the delay between queue queries.
plugins
my $plugins = $queue->plugins;
Returns a hash containing the plugins registered with this queue.
retries
my $retries = $queue->retries;
$queue->retries(5);
The number of times a job will be picked up from the queue before it is marked as failed.
timeout
my $timeout = $queue->timeout;
$queue->timeout(10);
The time (in seconds) a job is allowed to stay in Retrieved state before it is released back into Pending state. Defaults to 60 seconds.
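How retries and timeout interact can be pictured with a small model. The sketch below is plain Perl and does not use MangoX::Queue at all; the `attempts` and `retrieved_at` fields and the `next_status` helper are hypothetical names invented for illustration, and the module itself may track this differently.

```perl
use strict;
use warnings;

# Toy model only: the field names (attempts, retrieved_at) and this helper
# are hypothetical and are not part of MangoX::Queue.
my %limits = (retries => 5, timeout => 60);

# Decide which status a job should have at time $now (epoch seconds).
sub next_status {
    my ($job, $now) = @_;

    # Only jobs that a consumer has picked up can time out.
    return $job->{status} unless $job->{status} eq 'Retrieved';

    # Still within the allowed window: leave the job with its consumer.
    return 'Retrieved' if $now - $job->{retrieved_at} < $limits{timeout};

    # Timed out: give up once the job has been picked up too many times,
    # otherwise make it available again.
    return $job->{attempts} >= $limits{retries} ? 'Failed' : 'Pending';
}

my $job = { status => 'Retrieved', attempts => 1, retrieved_at => 1000 };

print next_status($job, 1030), "\n";   # 30s elapsed, below the timeout
print next_status($job, 1061), "\n";   # 61s elapsed, first attempt

$job->{attempts} = 5;
print next_status($job, 1061), "\n";   # 61s elapsed, retries exhausted
```

With the values shown, the three calls print Retrieved, Pending and Failed in that order.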
EVENTS
MangoX::Queue inherits from Mojo::EventEmitter and emits the following events.
Events are emitted only for actions on the current queue object, not the entire queue.
consumed
on $queue consumed => sub {
    my ($queue, $job) = @_;
    # ...
};
Emitted when an item is consumed (either via consume or fetch).
dequeued
on $queue dequeued => sub {
    my ($queue, $job) = @_;
    # ...
};
Emitted when an item is dequeued.
enqueued
on $queue enqueued => sub {
    my ($queue, $job) = @_;
    # ...
};
Emitted when an item is enqueued.
METHODS
MangoX::Queue implements the following methods.
consume
# In blocking mode
while(my $job = consume $queue) {
    # ...
}
# In non-blocking mode
consume $queue sub {
    my ($job) = @_;
    # ...
};
Waits for jobs to arrive on the queue, sleeping between queue checks using MangoX::Queue::Delay or Mojo::IOLoop.
Currently sets the status to 'Retrieved' before returning the job.
dequeue
my $job = fetch $queue;
dequeue $queue $job;
Dequeues a job. Currently removes it from the collection.
enqueue
my $id = enqueue $queue 'job name';
my $id = enqueue $queue [ 'some', 'data' ];
my $id = enqueue $queue +{ foo => 'bar' };
Add an item to the queue in blocking mode. The default priority is 1 and status is 'Pending'.
You can set queue options including priority, created and status.
my $id = enqueue $queue
    priority => 1,
    created => time,
    status => 'Pending',
    +{
        foo => 'bar'
    };
For non-blocking mode, pass in a coderef as the final argument.
my $id = enqueue $queue 'job_name' => sub {
    # ...
};
my $id = enqueue $queue priority => 1, +{
    foo => 'bar',
} => sub {
    # ...
};
Sets the status to 'Pending' by default.
fetch
# In blocking mode
my $job = fetch $queue;
# In non-blocking mode
fetch $queue sub {
    my ($job) = @_;
    # ...
};
Fetch a single job from the queue, returning undef if no jobs are available.
Currently sets job status to 'Retrieved'.
get
# In non-blocking mode
get $queue $id => sub {
    my ($job) = @_;
    # ...
};
# In blocking mode
my $job = get $queue $id;
Gets a job from the queue by ID. Doesn't change the job status.
You can also pass in a job instead of an ID.
$job = get $queue $job;
get_options
my $options = $queue->get_options;
Returns the Mango::Collection options hash used by find_and_modify to identify and update available queue items.
release
my $consumer = consume $queue sub {
    # ...
};
release $queue $consumer;
Releases a non-blocking consumer from watching a queue.
requeue
my $job = fetch $queue;
requeue $queue $job;
Requeues a job. Sets the job status to 'Pending'.
update
my $job = fetch $queue;
$job->{status} = 'Failed';
update $queue $job;
Updates a job in the queue.
watch
Wait for a job to enter a certain status.
# In blocking mode
my $id = enqueue $queue 'test';
watch $queue $id, 'Complete'; # blocks until job is complete
# In non-blocking mode
my $id = enqueue $queue 'test';
watch $queue $id, 'Complete' => sub {
    # ...
};
ERRORS
MangoX::Queue reports errors through callbacks and Mojo::EventEmitter events.
To listen for all errors on a queue, subscribe to the 'error' event:
$queue->on(error => sub {
    my ($queue, $job, $error) = @_;
    # ...
});
To check for errors from an individual update, enqueue or dequeue call, check the error argument passed to the callback:
enqueue $queue +$job => sub {
    my ($job, $error) = @_;
    if($error) {
        # ...
    }
};