NAME
IPC::DirQueue - disk-based many-to-many task queue
SYNOPSIS
# enqueueing a job:
my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
$dq->enqueue_file("filename");

# dequeueing a job:
my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });
my $job = $dq->pickup_queued_job();
if (!$job) { print "no jobs left\n"; exit; }
# ...do something interesting with $job->get_data_path() ...
$job->finish();
DESCRIPTION
This module implements a FIFO queueing infrastructure, using a directory as the communications and storage medium. No daemon process is required to manage the queue; all communication takes place via the filesystem.
A common UNIX system design pattern is to use a tool like lpr as a task queueing system; for example, http://patrick.wagstrom.net/old/weblog/archives/000128.html describes the use of lpr as an MP3 jukebox.

However, lpr isn't as efficient as it could be. When used in this way, you have to restart each task processor for every new task. If you have a lot of startup overhead, this can be very inefficient. With IPC::DirQueue, a processing server can run persistently and cache data needed across multiple tasks efficiently; it will not be restarted unless you restart it.
Multiple enqueueing and dequeueing processes on multiple hosts (NFS-safe locking is used) can run simultaneously, and safely, on the same queue.
Since multiple dequeuers can run simultaneously, this provides a good way to process a variable level of incoming tasks using a pre-defined number of worker processes.
If you need more CPU power working on a queue, you can simply start another dequeuer to help out. If you need less, kill off a few dequeuers.
If you need to take down the server to perform some maintenance or upgrades, just kill the dequeuer processes, perform the work, and start up new ones. Since there's no 'socket' or similar point of failure aside from the directory itself, the queue will just quietly fill with waiting jobs until the new dequeuer is ready.
Arbitrary 'name = value' string-pair metadata can be transferred alongside data files. In fact, in some cases, you may find it easier to send unused and empty data files, and just use the 'metadata' fields to transfer the details of what will be worked on.
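For example, a minimal sketch of that metadata-only pattern (the queue path, metadata keys, and values are hypothetical); it uses the enqueue_string() method described under METHODS below:

use IPC::DirQueue;

my $dq = IPC::DirQueue->new({ dir => "/path/to/queue" });

# The data payload is empty; the job details travel entirely as metadata.
$dq->enqueue_string("", {
    task   => "transcode",          # hypothetical metadata keys and values
    source => "/srv/media/in.wav",
}) or warn "enqueue failed";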
METHODS
- $dq->new ($opts);

Create a new queue object, suitable for either enqueueing jobs or picking up already-queued jobs for processing.

$opts is a reference to a hash, which may contain the following options:

- dir => $path_to_directory (no default)

Name the directory where the queue files are stored. This is required.

- data_file_mode => $mode (default: 0666)

The chmod-style file mode for data files. This should be specified as a string with a leading 0. It will be affected by the current process umask.

- queue_file_mode => $mode (default: 0666)

The chmod-style file mode for queue control files. This should be specified as a string with a leading 0. It will be affected by the current process umask.

- ordered => { 0 | 1 } (default: 1)

Whether the jobs should be processed in order of submission, or in no particular order.

- queue_fanout => { 0 | 1 } (default: 0)

Whether the queue directory should be 'fanned out'. This allows better scalability with NFS-shared queues with large numbers of pending files, but hurts performance otherwise. It also implies ordered = 0. (This is strictly experimental, has overall poor performance, and is not recommended.)

- indexd_uri => $uri (default: undef)

A URI of a dq-indexd daemon, used to maintain the list of waiting jobs. The URI must be of the form dq://hostname[:port]. (This is strictly experimental, and is not recommended.)

- buf_size => $number (default: 65536)

The buffer size to use when copying files, in bytes.

- active_file_lifetime => $number (default: 600)

The lifetime of an untouched active lockfile, in seconds. See 'STALE LOCKS AND SIGNAL HANDLING', below, for more details.
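To illustrate, a minimal sketch of constructing a queue with a few non-default options (the directory path and chosen values are hypothetical):

use IPC::DirQueue;

my $dq = IPC::DirQueue->new({
    dir                  => "/var/spool/myqueue",  # hypothetical path
    data_file_mode       => '0600',
    queue_file_mode      => '0600',
    ordered              => 1,
    buf_size             => 131072,
    active_file_lifetime => 1200,
});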
- $dq->enqueue_file ($filename [, $metadata [, $pri] ] );

Enqueue a new job for processing. Returns 1 if the job was enqueued, or undef on failure.

$metadata is an optional hash reference; every item of metadata will be available to worker processes on the IPC::DirQueue::Job object, in the $job->{metadata} hashref. Note that using this channel for metadata brings with it several restrictions:

- 1. it requires that the metadata be stored as 'name' => 'value' string pairs
- 2. neither 'name' nor 'value' may contain newline (\n or \r) or NUL (\0) characters
- 3. 'name' cannot contain colon characters
- 4. 'name' cannot start with a capital 'Q' and be 4 characters in length

If those restrictions are broken, die() will be called with the following error:

die "IPC::DirQueue: invalid metadatum: '$k'";

This is a change added in release 0.06; prior to that, the metadatum would be silently dropped.

An optional priority can be specified; lower priorities are run first. Priorities range from 0 to 99, and 50 is the default.
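For instance, a brief sketch (the file name, metadata keys, and priority are illustrative only):

my $ok = $dq->enqueue_file("/tmp/report.csv",
    { submitter => "alice", format => "csv" },   # hypothetical metadata
    20);                                         # more urgent than the default 50
warn "enqueue failed" unless $ok;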
- $dq->enqueue_fh ($filehandle [, $metadata [, $pri] ] );

Enqueue a new job for processing. Returns 1 if the job was enqueued, or undef on failure. $pri and $metadata are as described in $dq->enqueue_file().

$filehandle is a perl file handle that must be open for reading. It will be closed on completion, regardless of success or failure.
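For example, a brief sketch (the file path is hypothetical):

open(my $fh, "<", "/tmp/payload.bin") or die "cannot open: $!";
$dq->enqueue_fh($fh);   # $fh is closed for us, whether or not the enqueue succeeds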
- $dq->enqueue_string ($string [, $metadata [, $pri] ] );

Enqueue a new job for processing. The job data is entirely read from $string. Returns 1 if the job was enqueued, or undef on failure. $pri and $metadata are as described in $dq->enqueue_file().
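A one-line sketch (the metadata key is illustrative):

$dq->enqueue_string("hello world\n", { origin => "example" });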
- $dq->enqueue_sub ($subref [, $metadata [, $pri] ] );

Enqueue a new job for processing. Returns 1 if the job was enqueued, or undef on failure. $pri and $metadata are as described in $dq->enqueue_file().

$subref is a perl subroutine, which is expected to return one of the following each time it is called:

- a string of data bytes to be appended to any existing data. (the string may be empty, '', in which case it's a no-op.)
- undef when the enqueued data has ended, ie. EOF.
- die() if an error occurs. The die() message will be converted into a warning, and the enqueue_sub() call will return undef.

(Tip: note that this is a closure, so variables outside the subroutine can be accessed safely.)
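As an illustration, a sketch that streams a short list of lines through the callback (the variable names are illustrative):

my @lines = ("first line\n", "second line\n");
my $ok = $dq->enqueue_sub(sub {
    # Return the next chunk of data, or undef once everything has been sent (EOF).
    return shift @lines;    # shift on an empty array returns undef
});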
- $job = $dq->pickup_queued_job();

Pick up the next job in the queue, so that it can be processed.

If no job is available for processing, either because the queue is empty or because other worker processes are already working on them, undef is returned; otherwise, a new instance of IPC::DirQueue::Job is returned.

Note that the job is marked as active until $job->finish() is called.
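For example, a sketch of a worker that drains the queue once and exits (the processing step is a hypothetical placeholder):

while (my $job = $dq->pickup_queued_job()) {
    process_file($job->get_data_path());   # process_file() is a hypothetical handler
    $job->finish();                        # mark the job as complete
}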
- $job = $dq->wait_for_queued_job ([ $timeout [, $pollinterval] ]);

Wait for a job to be queued within the next $timeout seconds.

If there is already a job ready for processing, this will return immediately. If one is not available, it will sleep, wake up periodically, check for job availability, and either carry on sleeping or return the new job if one is now available.

If a job becomes available, a new instance of IPC::DirQueue::Job is returned. If the timeout is reached, undef is returned.

If $timeout is not specified, or is less than 1, this function will wait indefinitely.

The optional parameter $pollinterval indicates how frequently to wake up and check for new jobs. It is specified in seconds, and floating-point precision is supported. The default is 1.

Note that if $timeout is not a round multiple of $pollinterval, the nearest round multiple of $pollinterval greater than $timeout will be used instead. Also note that $timeout is used as an integer.
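For instance, a sketch of a persistent dequeuer loop built on this call (the handler is a hypothetical placeholder):

while (1) {
    # Block for up to 60 seconds, polling every 0.5 seconds.
    my $job = $dq->wait_for_queued_job(60, 0.5);
    next unless $job;                                      # timed out; keep waiting
    handle_job($job->get_data_path(), $job->{metadata});   # hypothetical handler
    $job->finish();
}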
- $job = $dq->visit_all_jobs($visitor, $visitcontext);

Visit all the jobs in the queue, in a read-only mode. Used to list the entire queue.

The callback function $visitor will be called for each job in the queue, like so:

&$visitor ($visitcontext, $job);

$visitcontext is whatever you pass in that variable above. $job is a new, read-only instance of IPC::DirQueue::Job representing that job.

If a job is active (being processed), the $job object also contains the following additional data:

'active_host': the hostname on which the job is active
'active_pid': the process ID of the process which picked up the job
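For example, a sketch that prints a simple queue listing (the output format and context string are arbitrary):

$dq->visit_all_jobs(sub {
    my ($context, $job) = @_;
    my $state = $job->{active_pid}
        ? "active on $job->{active_host} (pid $job->{active_pid})"
        : "queued";
    print "[$context] ", $job->get_data_path(), ": $state\n";
}, "queue listing");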
STALE LOCKS AND SIGNAL HANDLING
If interrupted or terminated, dequeueing processes should be careful to either call $job->finish() or $job->return_to_queue() on any active tasks before exiting -- otherwise those jobs will remain marked active.
Dequeueing processes can also call $job->touch_active_lock() periodically, while processing large tasks, to ensure that the task is still marked as active.
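A sketch of one way to honour both points in a worker (the signal choice, work loop, and handler names are illustrative, not prescribed by the module):

my $job = $dq->pickup_queued_job() or exit;

# If we are told to shut down mid-task, put the job back rather than
# leaving a stale active lock behind.
$SIG{TERM} = sub { $job->return_to_queue(); exit; };
$SIG{INT}  = sub { $job->return_to_queue(); exit; };

while (my $chunk = next_chunk_of_work()) {   # hypothetical long-running loop
    process_chunk($chunk);                   # hypothetical handler
    $job->touch_active_lock();               # refresh the active lock as we go
}
$job->finish();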
Stale locks are normally dealt with automatically. If a lock is still active after about 10 minutes of inactivity, the other dequeuers on that machine will probe the process ID listed in that lock file using kill(0). If that process ID is no longer running, the lock is presumed likely to be stale. If a given timeout (10 minutes plus a random value between 0 and 256 seconds) has elapsed since the lock file was last modified, the lock file is deleted.
This 10-minute default can be modified using the active_file_lifetime parameter to the IPC::DirQueue constructor.
Note: this means that if the dequeueing processes are spread among multiple machines, and there is no longer a dequeuer running on the machine that initially 'locked' the task, it will never be unlocked, unless you delete the active file for that task.
QUEUE DIRECTORY STRUCTURE
IPC::DirQueue maintains the following structure for a queue directory:
- queue directory
The queue directory is used to store the queue control files. Queue control files determine what jobs are in the queue; if a job has a queue control file in this directory, it is listed in the queue.
The filename format is as follows:
50.20040909232529941258.HASH[.PID.RAND]
The first two digits (50) are the priority of the job. Lower priority numbers are run first. 20040909232529 is the current date and time when the enqueueing process ran, in YYYYMMDDHHMMSS format. 941258 is the time in microseconds, as returned by gettimeofday(). And finally, HASH is a variable-length hash of some semi-random data, used to increase the chance of uniqueness.

If there is a collision, the timestamps are regenerated after a 250 msec sleep, and further randomness will be added at the end of the string (namely, the current process ID and a random integer value). Multiple retries are attempted until the file is atomically moved into the queue directory without collision.
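As an aside, a sketch of pulling those fields back out of a base queue control filename with a regular expression (for illustration only; this is not part of the module's API):

my $name = "50.20040909232529941258.HASH";
if ($name =~ /^(\d{2})\.(\d{14})(\d{6})\.(.+)$/) {
    my ($pri, $date, $usec, $hash) = ($1, $2, $3, $4);
    print "priority=$pri date=$date usec=$usec hash=$hash\n";
}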
If queue_fanout was used in the IPC::DirQueue constructor, then the queue directory does not contain the queue control files directly; instead, there is an interposing set of 16 'fanout' directories, named according to the hex digits from 0 to f.

- active directory
The active directory is used to store active queue control files.
When a job becomes 'active' -- ie. is picked up by pickup_queued_job() -- its control file is moved from the queue directory into the active directory while it is processed.

- data directory
The data directory is used to store enqueued data files.
It contains a two-level "fan-out" hashed directory structure; each data file is stored under a single-letter directory, which in turn is under a single-letter directory. This increases efficiency of directory lookups.
The format of filenames here is similar to that used in the queue directory, except that the last two characters are removed and used instead for the "fan-out" directory names.
- tmp directory

The tmp directory contains temporary work files that are in the process of enqueueing, and not yet ready for processing.
The filename format here is similar to the above, with suffixes indicating the type of file (".ctrl", ".data").
Atomic, NFS-safe renaming is used to avoid collisions, overwriting or other unsafe operations.
SEE ALSO
IPC::DirQueue::Job
AUTHOR
Justin Mason <dq /at/ jmason.org>
MAILING LIST
The IPC::DirQueue mailing list is at <ipc-dirqueue-subscribe@perl.org>.
COPYRIGHT
IPC::DirQueue
is distributed under the same license as perl itself.
AVAILABILITY
The latest version of this library is likely to be available from CPAN.