NAME

Thread::Queue::Any::Monitored - monitor a queue for any specific content

SYNOPSIS

use Thread::Queue::Any::Monitored;
my ( $q, $t ) = Thread::Queue::Any::Monitored->new( {
  monitor => sub { print "monitoring value $_[0]\n" }, # is a must
  pre     => sub { print "prepare monitoring\n" },     # optional
  post    => sub { print "stop monitoring\n" },        # optional
  queue   => $queue, # use existing queue, create new if not specified
  exit    => 'exit', # default to undef
} );

$q->enqueue( "foo", ['listref'], { key => 'hashref' } );
$q->enqueue('exit'); # the exit value specified above (undef by default)

@post= $t->join; # optional, wait for monitor thread to end

$queue= Thread::Queue::Any::Monitored->self; # "pre", "monitor", "post"

# specify class with "freeze" and "thaw" methods
use Thread::Queue::Any::Monitored serializer => 'Storable';

# specify custom freeze and thaw subroutines
use Thread::Queue::Any::Monitored freeze => \&solid, thaw => \&liquid;

VERSION

This documentation describes version 1.00.

DESCRIPTION

                   *** A note of CAUTION ***

This module only functions if threading has been enabled when building
Perl, or if the "forks" module has been installed on an unthreaded Perl.

                   *************************

A queue, as implemented by Thread::Queue::Any::Monitored, is a thread-safe data structure that inherits from Thread::Queue::Any. Unlike the standard Thread::Queue::Any, it starts a single thread that monitors the contents of the queue by taking new sets of values off the queue as they become available.

It can be used for simply logging actions that are placed on the queue, for outputting a warning only when a certain value is encountered, or for anything else that needs to react to queued values.

The action performed in the thread is determined by a name of or a reference to a subroutine. This subroutine is called for every set of values obtained from the queue.

Any number of threads can safely add sets of values to the end of the queue.
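As a rough sketch of the "warn only on certain values" use described above, the following starts a monitor and lets two producer threads enqueue to it. The 'info' and 'error' tags and the message texts are made up for this illustration; only new, enqueue and join are taken from this documentation.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue::Any::Monitored;

# Monitor: stay silent unless a set starts with the (made-up) 'error' tag.
my ( $q, $t ) = Thread::Queue::Any::Monitored->new( {
    monitor => sub {
        my ( $level, $message ) = @_;
        warn "worker reported: $message\n" if $level eq 'error';
    },
} );

# Two producer threads add sets of values to the same queue.
my @producers = map {
    my $id = $_;
    threads->create( sub {
        $q->enqueue( 'info',  "worker $id started" );
        $q->enqueue( 'error', "worker $id failed" ) if $id == 2;
    } );
} 1 .. 2;
$_->join for @producers;

$q->enqueue(undef);    # default exit value: stop the monitoring thread
$t->join;              # wait until the monitoring thread has ended
```

The exit value is enqueued only after all producers have been joined, so that no set is added behind it.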

CLASS METHODS

new

( $queue, $thread )= Thread::Queue::Any::Monitored->new( {
  pre     => \&pre,
  monitor => 'monitor',
  post    => \&module::post,
  queue   => $queue, # use existing queue, create new if not specified
  exit    => 'exit', # default to undef
} );

The new function starts monitoring on an existing queue or on a new (empty) queue. In scalar context, it returns the instantiated Thread::Queue::Any::Monitored object; the monitoring thread is then detached and keeps running until the exit value is passed on to the queue. In list context, the thread object is returned as well, so that the join() method can be used to wait until the thread has really finished.

The first input parameter is a reference to a hash that should at least contain the "monitor" key with a subroutine name or reference.

The other input parameters are optional. If specified, they are passed to the "pre" routine, which is executed once when the monitoring is started.
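A minimal sketch of passing an extra parameter to the "pre" routine follows. The 'audit' value and the $prefix variable are hypothetical. The sketch assumes that "pre" and "monitor" both run in the monitoring thread and can therefore share a lexical variable.

```perl
use strict;
use warnings;
use Thread::Queue::Any::Monitored;

my $prefix;    # set by "pre", read by "monitor"

my ( $q, $t ) = Thread::Queue::Any::Monitored->new(
    {
        pre     => sub { ($prefix) = @_; return },    # receives 'audit'
        monitor => sub { print "[$prefix] @_\n" },
    },
    'audit',    # extra parameter, handed to the "pre" routine
);

$q->enqueue( 'user', 'login' );
$q->enqueue(undef);    # default exit value
$t->join;
```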

The following field must be specified in the hash reference:

monitor
monitor => 'monitor_the_queue',           # assume caller's namespace

or:

monitor => 'Package::monitor_the_queue',

or:

monitor => \&SomeOther::monitor_the_queue,

or:

monitor => sub { print "anonymous sub monitoring the queue\n" },

The "monitor" field specifies the subroutine to be executed for each set of values that is removed from the queue. It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.

The specified subroutine should expect the following parameters to be passed:

1..N  set of values obtained from the queue

What the subroutine does with the values is entirely up to the developer.

The following fields are optional in the hash reference:

pre
pre => 'prepare_monitoring',              # assume caller's namespace

or:

pre => 'Package::prepare_monitoring',

or:

pre => \&SomeOther::prepare_monitoring,

or:

pre => sub { print "anonymous sub preparing the monitoring\n" },

The "pre" field specifies the subroutine to be executed once when the monitoring of the queue is started. It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.

The specified subroutine should expect the following parameters to be passed:

1..N  any extra parameters that were passed with the call to "new".

post
post => 'stop_monitoring',                # assume caller's namespace

or:

post => 'Package::stop_monitoring',

or:

post => \&SomeOther::stop_monitoring,

or:

post => sub { print "anonymous sub when stopping the monitoring\n" },

The "post" field specifies the subroutine to be executed once when the monitoring of the queue is stopped. It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.

The specified subroutine should expect the following parameters to be passed:

1..N  any extra parameters that were passed with the call to "new".

Any values returned by the "post" routine can be obtained with the join method on the thread object.
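The following sketch shows one way to collect a result from the "post" routine through join. The $seen counter is made up for this example. It assumes that "monitor" and "post" run in the same monitoring thread and so see the same copy of the variable.

```perl
use strict;
use warnings;
use Thread::Queue::Any::Monitored;

my $seen = 0;    # counted inside the monitoring thread only

my ( $q, $t ) = Thread::Queue::Any::Monitored->new( {
    monitor => sub { $seen++ },
    post    => sub { return $seen },    # handed back through join
} );

$q->enqueue($_) for qw(a b c);
$q->enqueue(undef);    # default exit value

my ($total) = $t->join;
print "monitor handled $total sets\n";
```

Note that $seen in the parent thread is not changed: each thread works on its own copy of unshared variables, which is why the count travels back as a return value.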

queue
queue => $queue,  # create new one if not specified

The "queue" field specifies the Thread::Queue::Any object that should be monitored. A new Thread::Queue::Any object will be created if it is not specified.

exit
exit => 'exit',   # default to undef

The "exit" field specifies the value that will cause the monitoring thread to cease monitoring. The "undef" value will be assumed if it is not specified. This value should be enqueued to have the monitoring thread stop.

self

$queue= Thread::Queue::Any::Monitored->self; # only within "pre" and "monitor"

The class method "self" returns the object for which this thread is monitoring. It is available within the "pre" and "monitor" subroutines only.
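One possible use of "self" is letting the monitor routine stop its own queue, as in the sketch below. The 'fatal' value is a made-up trigger, and the sketch assumes that enqueuing from inside the monitoring thread is permitted.

```perl
use strict;
use warnings;
use Thread::Queue::Any::Monitored;

my ( $q, $t ) = Thread::Queue::Any::Monitored->new( {
    monitor => sub {
        my ($value) = @_;
        print "saw $value\n";

        # On the (hypothetical) 'fatal' value, enqueue the default exit
        # value on the queue this thread is monitoring.
        Thread::Queue::Any::Monitored->self->enqueue(undef)
          if $value eq 'fatal';
    },
} );

$q->enqueue('ok');
$q->enqueue('fatal');
$t->join;    # returns once the monitor has seen the exit value
```

The routine cannot simply refer to $q here, because that variable is not yet declared at the point where the anonymous subroutine is compiled.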

OBJECT METHODS

enqueue

$queue->enqueue( $scalar, [], {} );
$queue->enqueue('exit'); # stop monitoring

The enqueue method adds all specified parameters as a set on to the end of the queue. The queue will grow as needed to accommodate the list. If the "exit" value is passed, then the monitoring thread will shut itself down.

USING ANOTHER SERIALIZER

Please see the section USING ANOTHER SERIALIZER in Thread::Queue::Any for a description of the options for using specific data serializers.

REQUIRED MODULES

Test::More (0.88)
Thread::Queue::Any (1.11)
Thread::Queue::Monitored (1.02)

INSTALLATION

This distribution contains two versions of the code: one maintenance version for versions of perl < 5.014 (known as 'maint'), and the version currently in development (known as 'blead'). The standard build for your perl version is:

perl Makefile.PL
make
make test
make install

This will try to test and install the "blead" version of the code. If the Perl version does not support the "blead" version, then the running of the Makefile.PL will *fail*. In such a case, one can force the installing of the "maint" version of the code by doing:

perl Makefile.PL maint

Alternatively, if you want automatic selection behavior, you can set the AUTO_SELECT_MAINT_OR_BLEAD environment variable to a true value. On Unix-like systems, for example:

AUTO_SELECT_MAINT_OR_BLEAD=1 perl Makefile.PL

If your perl does not support the "blead" version of the code, then it will automatically install the "maint" version of the code.

CAVEATS

You cannot remove any values from the queue, as that is done by the monitoring thread. Therefore, the methods "dequeue", "dequeue_dontwait" and "dequeue_keep" are disabled on this object.

Passing unshared values between threads is accomplished by serializing the specified values using Storable when enqueuing and de-serializing the queued values when dequeuing. This allows for great flexibility at the expense of more CPU usage. It also limits what can be passed: code references, for example, cannot be serialized and therefore cannot be passed.
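The effect of this serialization can be tried out with Storable alone. The sketch below does not use the queue at all. It assumes that a set of values is frozen as a single list reference, which may differ from what the module does internally.

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

# A set of values as it might be passed to enqueue().
my @set = ( 'foo', ['listref'], { key => 'value' } );

# Serialize to a flat string, then rebuild a copy from that string.
my $frozen = freeze( \@set );
my @copy   = @{ thaw($frozen) };

print "round trip ok\n"
  if $copy[0] eq 'foo'
  && $copy[1][0] eq 'listref'
  && $copy[2]{key} eq 'value';

# The copy is a new structure, not the original reference.
print "copy is independent\n" if $copy[1] != $set[1];

# With Storable's default settings, freezing a code reference dies.
my $ok = eval { freeze( [ sub { 1 } ] ); 1 };
print "code reference rejected\n" unless $ok;
```

Because the receiving side gets a copy, changes made to a dequeued structure are not visible to the thread that enqueued it.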

AUTHOR

Elizabeth Mattijsen, <liz@dijkmat.nl>.

Please report bugs to <perlbugs@dijkmat.nl>.

COPYRIGHT

Copyright (c) 2002, 2003, 2007, 2012 Elizabeth Mattijsen <liz@dijkmat.nl>. All rights reserved. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.

SEE ALSO

threads, threads::shared, Thread::Queue::Any, Storable.