NAME
Forks::Queue - queue that can be shared across processes
VERSION
0.01
SYNOPSIS
use Forks::Queue;
$q = Forks::Queue->new( impl => ..., style => 'lifo' );
# put items on queue
$q->put("a scalar item");
$q->put(["an","arrayref","item"]);
$q->put({"a"=>"hash","reference"=>"item"});
$q->put("list","of","multiple",["items"]);
$q->end; # no more jobs will be added to queue
# retrieve items from queue, possibly after a fork
$item = $q->get;
$item = $q->peek; # get item without removing it
@up_to_10_items = $q->get(10);
$remaining_items = $q->pending;
DESCRIPTION
Interface for a queue object that can be shared across processes. Available implementations are Forks::Queue::File, Forks::Queue::Shmem, Forks::Queue::SQLite.
METHODS
Many of these methods pass or return "tasks". For this distribution, a "task" is any scalar or reference that can be serialized and shared across processes.
This will include scalars and most unblessed references
"42"
[1,2,3,"forty-two"]
{ name=>"a job", timestamp=>time, input=>{foo=>[19],bar=>\%bardata} }
but will generally prohibit data with blessed references and code references
{ name => "bad job", callback => \&my_callback_routine }
[ 46, $url13, File::Temp->new ]
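One way to screen a task before queuing it is to try serializing it first. The sketch below assumes JSON-style serialization (the DEPENDENCIES section says the module requires JSON) and uses the core JSON::PP module. The helper name is_plain_task is hypothetical and not part of Forks::Queue, and the module's own checks may differ.

```perl
use strict;
use warnings;
use JSON::PP;

# Hypothetical helper, not part of Forks::Queue: returns 1 if $task
# survives JSON encoding and 0 otherwise. With default settings JSON::PP
# refuses code references and blessed references.
sub is_plain_task {
    my ($task) = @_;
    my $json = JSON::PP->new->allow_nonref;
    return eval { $json->encode($task); 1 } ? 1 : 0;
}

my %bardata = (size => 3);
my @candidates = (
    "42",
    [1, 2, 3, "forty-two"],
    { name => "a job", input => { foo => [19], bar => \%bardata } },
    { name => "bad job", callback => sub { 1 } },   # code reference
    [ 46, bless({}, 'Some::Class') ],               # blessed reference
);
print is_plain_task($_) ? "accepted\n" : "rejected\n" for @candidates;
```

The first three candidates are accepted and the last two are rejected, matching the two groups of examples above.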
new
$queue = Forks::Queue->new( %opts )
Instantiates a new queue object with the given configuration.
If one of the options is impl, the constructor from that Forks::Queue subclass will be invoked.
If the impl option is not provided, this call will fail.
Options that should be supported on all implementations include
style => 'fifo' | 'lifo'
Indicates whether the "get" method will return items in first-in-first-out order or last-in-first-out order (and which end of the queue the "peek" method will examine)
limit => int
A maximum size for the queue. Set to a non-positive value to specify an unlimited size queue.
on_limit => 'block' | 'fail'
Dictates what the queue should do when an attempt is made to add tasks beyond the queue's limit. If block, the queue will block and wait until items are removed from the queue. If fail, the queue will warn and return immediately without changing the queue.
join => bool
If true, expects that the queue referred to by this constructor has already been created in another process, and that the current process should access the existing queue. This allows a queue to be shared across unrelated processes (i.e., processes that do not have a parent-child relationship).
# my_daemon.pl - may run "all the time" in the background
$q = Forks::Queue::File->new(file=>'/var/spool/foo/q17');
# creates new queue object
...

# worker.pl - may run periodically for a short time, launched from
# cron or from command line, but not from the daemon
$q = Forks::Queue->new( impl => 'File', join => 1,
                        file => '/var/spool/foo/q17' );
# the new queue attaches to existing file at /var/spool/foo/q17
...
join is not necessary for child processes forked from a process with an existing queue:
$q = Forks::Queue->new(...)
...
if (fork() == 0) {
    # $q already exists and the child process can begin using it,
    # no need for a Forks::Queue constructor with join
    ...
}
persist => bool
Active Forks::Queue objects affect your system, writing to disk or writing to memory, and in general they clean themselves up when they detect that no more processes are using the queue. The persist option, if set to true, instructs the queue object to leave its state intact after destruction.
An obvious use case for this option is debugging, to examine the state of the queue after abnormal termination of your program.
A second use case is to create persistent queues -- queues that are shared not only among different processes, but among different processes that are running at different times. The persistent queue can be used by supplying both the persist and the join options to the Forks::Queue constructor.
$queue_file = "/tmp/persistent.job.queue";
$join = -f $queue_file;
$q = Forks::Queue->new( impl => 'File', file => $queue_file,
                        join => $join, persist => 1 );
... work with the queue ...
# the queue remains intact if this program exits or aborts
put
$count = $queue->put(@tasks)
Places one or more "tasks" on the queue and returns the number of tasks successfully added to the queue.
Adding tasks to the queue will fail if the "end" method of the queue had previously been called from any process.
push
$count = $queue->push(@tasks)
Equivalent to "put", adding tasks to the end of the queue and returning the number of tasks successfully added. The most recent tasks appended to the queue by push or put will be the first tasks taken from the queue by "pop" or by "get" with LIFO style queues, and the last tasks removed by "shift" or "get" with FIFO style queues.
For now there is no unshift method to add tasks to the front of the queue (and to be retrieved first in FIFO queues), but maybe someday there will be.
end
$queue->end;
Indicates that no more tasks are to be put on the queue, so that when a process tries to retrieve a task from an empty queue, it will not block and wait until a new task is added. This method may be called from any process that has access to the queue.
get
$task = $queue->get
@tasks = $queue->get($count)
Attempts to retrieve one or more "tasks" from the queue. If the queue is empty, and if "end" has not been called on the queue, this call blocks until a task is available or until the "end" method has been called from some other process. If the queue is empty and "end" has been called, this method returns an empty list in list context or undef in scalar context.
If a $count argument is supplied, returns up to $count tasks or however many tasks are currently available on the queue, whichever is fewer.
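The blocking and end-of-queue rules above suggest a simple worker loop. The following is a minimal sketch, assuming the distribution is installed and that the File implementation accepts the impl and file options as in the constructor examples. The helper name run_workers and the temporary file name are hypothetical.

```perl
use strict;
use warnings;
use File::Spec;
use Forks::Queue;    # assumption: the distribution is installed

# Hypothetical helper: hands @jobs to $nworkers forked children, waits for
# them to finish, and returns the number of tasks left on the queue.
sub run_workers {
    my ($nworkers, @jobs) = @_;
    my $file = File::Spec->catfile(File::Spec->tmpdir, "fq-demo-$$");
    my $q = Forks::Queue->new( impl => 'File', file => $file,
                               style => 'fifo' );
    my @pids;
    for (1 .. $nworkers) {
        my $pid = fork();
        die "fork failed: $!" unless defined $pid;
        if ($pid == 0) {
            # child: get blocks while the queue is empty and not ended,
            # and returns undef once the queue is empty and ended
            while (defined(my $job = $q->get)) {
                # ... process $job here ...
            }
            exit 0;
        }
        push @pids, $pid;
    }
    $q->put(@jobs);    # children inherited $q, so no join is needed
    $q->end;           # lets idle workers fall out of their loops
    waitpid $_, 0 for @pids;
    return $q->pending;
}

my $left = run_workers(2, map { "job $_" } 1 .. 5);
print "tasks left on queue: $left\n";
```

Testing the result of get with defined, rather than for truth, keeps a task such as "0" from ending the loop early.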
pop
$task = $queue->pop
@tasks = $queue->pop($count)
Retrieves one or more tasks from the "back" of the queue. For LIFO style queues, the "get" method is equivalent to this method. Like "get", this method blocks while the queue is empty and the "end" method has not been called on the queue.
If a $count argument is supplied, returns up to $count tasks or however many tasks are currently available on the queue, whichever is fewer.
shift
$task = $queue->shift
@tasks = $queue->shift($count)
Retrieves one or more tasks from the "front" of the queue. For FIFO style queues, the "get" method is equivalent to this method. Like "get", this method blocks while the queue is empty and the "end" method has not been called on the queue.
If a $count argument is supplied, returns up to $count tasks or however many tasks are currently available on the queue, whichever is fewer.
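The relationship between put, get, pop and shift can be illustrated with a toy in-memory model. ToyQueue is a hypothetical class written for this illustration only. It is not the module's implementation, it never blocks, and the order of tasks within a multi-item pop is this model's own assumption.

```perl
use strict;
use warnings;

{
    package ToyQueue;    # hypothetical model, not Forks::Queue itself

    sub new {
        my ($class, %opts) = @_;
        return bless { style => $opts{style} || 'fifo', items => [] }, $class;
    }

    # put appends to the back and reports how many tasks were added
    sub put {
        my ($self, @tasks) = @_;
        CORE::push @{ $self->{items} }, @tasks;
        return scalar @tasks;
    }

    # take up to $count tasks (default 1) from the front or the back
    sub _take {
        my ($self, $from_back, $count) = @_;
        my $items = $self->{items};
        $count = 1 unless defined $count;
        $count = @$items if $count > @$items;
        my @taken = $from_back
            ? reverse splice(@$items, @$items - $count)
            : splice(@$items, 0, $count);
        return wantarray ? @taken : $taken[0];
    }

    sub shift { my ($self, $count) = @_; return $self->_take(0, $count) }
    sub pop   { my ($self, $count) = @_; return $self->_take(1, $count) }

    # get behaves like shift for fifo queues and like pop for lifo queues
    sub get {
        my ($self, $count) = @_;
        return $self->_take($self->{style} eq 'lifo', $count);
    }
}

my $fifo = ToyQueue->new(style => 'fifo');
$fifo->put(qw(a b c));
my $first = $fifo->get;     # "a": oldest task, same as shift
my $last  = $fifo->pop;     # "c": newest task

my $lifo = ToyQueue->new(style => 'lifo');
$lifo->put(qw(a b c));
my $newest = $lifo->get;    # "c": newest task, same as pop
print "$first $last $newest\n";    # prints "a c c"
```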
peek
$task = $queue->peek
$task = $queue->peek_front
$task = $queue->peek_back
Returns a task from the queue without removing it. The peek_front and peek_back methods inspect the task at the front and the back of the queue, respectively. The generic peek method is equivalent to peek_front for FIFO style queues and peek_back for LIFO style queues.
If the queue is empty, these methods will return undef.
Note that unlike the "peek" method in Thread::Queue, Forks::Queue::peek returns a copy of the item on the queue, so manipulating a reference returned from peek will not affect the item on the queue.
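The copy behavior follows naturally if queued items are kept in serialized form, which is an assumption made for this sketch rather than something stated above. The example uses the core JSON::PP module; $stored and toy_peek are hypothetical stand-ins, not part of Forks::Queue.

```perl
use strict;
use warnings;
use JSON::PP;

my $json = JSON::PP->new->allow_nonref;

# Hypothetical stand-in for a queue slot: the task is kept as a JSON string.
my $stored = $json->encode({ name => "a job", tries => 0 });

# Stand-in for peek: decoding builds a fresh data structure on every call.
sub toy_peek { return $json->decode($stored) }

my $seen = toy_peek();
$seen->{tries} = 99;                 # changes only the caller's copy

print toy_peek()->{tries}, "\n";     # prints 0
```

To change a queued item under this model, a process would have to remove it with get and put a modified version back.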
pending
$num_tasks_avail = $queue->pending
Returns the total number of tasks available on the queue. There is no guarantee that the number of available tasks will not change between a call to pending and a subsequent call to "get".
status
$status = $queue->status
Returns a hash reference with meta information about the queue. The information should at least include the number of tasks remaining in the queue. Individual implementations may provide additional information in this return value.
DEPENDENCIES
The Forks::Queue module and all its current implementations require the JSON module.
SEE ALSO
Thread::Queue, File::Queue, Queue::Q, MCE::Queue, Queue::DBI, Directory::Queue.
SUPPORT
You can find documentation for this module with the perldoc command.
perldoc Forks::Queue
You can also look for information at:
RT: CPAN's request tracker (report bugs here)
AnnoCPAN: Annotated CPAN documentation
CPAN Ratings
Search CPAN
LICENSE AND COPYRIGHT
Copyright (c) 2017, Marty O'Brien.
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.10.1 or, at your option, any later version of Perl 5 you may have available.
See http://dev.perl.org/licenses/ for more information.