NAME
Data::Consumer - Repeatedly consume a data resource in a robust way
VERSION
Version 0.14
SYNOPSIS
    use Data::Consumer;
    my $consumer = Data::Consumer->new(
        type          => $consumer_name,
        unprocessed   => $unprocessed,
        working       => $working,
        processed     => $processed,
        failed        => $failed,
        max_passes    => $num_or_undef,
        max_processed => $num_or_undef,
        max_elapsed   => $seconds_or_undef,
    );
    $consumer->consume( sub {
        # The consumer object is passed first, then the item identifier.
        my ( $consumer, $id ) = @_;
        print "processed $id\n";
    } );
DESCRIPTION
A common requirement is to process a feed of items of some sort in a robust manner. Such a feed might be records that are inserted into a table, or files dropped in a delivery directory. Writing a script that handles all the edge cases (such as getting "stuck" on a failed item) and manages things like locking, so that the script can be run in parallel, is tricky and certainly repetitive.
The aim of Data::Consumer is to provide a framework that makes writing such consumer-type scripts as easy as writing a callback that processes each item. The framework handles the rest.
The basic idea is that one need only use an existing Data::Consumer subclass or, for a feed type not already supported, define a new one. A subclass implements a few reasonably well-defined primitive methods that handle the required tasks, and the Data::Consumer methods use those primitives to provide a consistent, DWIM-style interface to the end consumer.
Currently Data::Consumer is distributed with two subclasses: Data::Consumer::MySQL for handling records in a MySQL db (using the MySQL GET_LOCK() function), and Data::Consumer::Dir for handling a drop directory scenario (such as an FTP or mail directory).
Once a resource type has been defined as a Data::Consumer subclass the use pattern is to construct the subclass with the appropriate arguments, and then call consume with a callback.
The Consumer Pattern
The consumer pattern applies where code wants to consume an 'atomic' resource piece by piece. The consuming code should not have to worry about how it obtained each piece; that task should be handled by the framework. The consumer subclasses assume that the resource can be modeled as a queue (that is, there is some ordering principle by which the items can be processed in a predictable sequence). The consume pattern in its full glory is very close to the following pseudo code. The items marked with asterisks are where user callbacks may be invoked:
    DO
        RESET TO THE BEGINNING OF THE QUEUE
        WHILE QUEUE NOT EMPTY AND CAN *PROCEED*
            ACQUIRE NEXT ITEM TO PROCESS FROM QUEUE
            MARK AS 'WORKING'
            *PROCESS* ITEM
            IF PROCESSING FAILED
                MARK AS 'FAILED'
            OTHERWISE
                MARK AS 'PROCESSED'
        SWEEP UP ABANDONED 'WORKING' ITEMS AND MARK THEM AS 'FAILED'
    UNTIL WE CANNOT *PROCEED* OR NOTHING WAS PROCESSED
    RELEASE ANY LOCKS STILL HELD
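The pseudo code can be approximated in plain Perl. The following is only a minimal sketch that does not use Data::Consumer at all: the in-memory %state hash, the callback, and the $max_passes limit are all hypothetical stand-ins, and locking and the sweep step are left out.

```perl
use strict;
use warnings;

# In-memory stand-in for a real resource: item id => current state.
my %state = map { $_ => 'unprocessed' } 1 .. 4;

# Hypothetical user callback: dies on item 3 to simulate a failure.
my $callback = sub {
    my ($id) = @_;
    die "cannot handle item $id\n" if $id == 3;
    return;
};

my $max_passes = 5;    # stands in for the *PROCEED* check
my $passes     = 0;
my $processed_this_pass;

do {
    $processed_this_pass = 0;

    # RESET: start again from the front of the (sorted) queue.
    for my $id ( sort { $a <=> $b } keys %state ) {
        next unless $state{$id} eq 'unprocessed';    # ACQUIRE next item
        $state{$id} = 'working';
        my $ok = eval { $callback->($id); 1 };       # *PROCESS* item
        $state{$id} = $ok ? 'processed' : 'failed';
        $processed_this_pass++;
    }
    $passes++;
} until $passes >= $max_passes || !$processed_this_pass;

print "$_ => $state{$_}\n" for sort { $a <=> $b } keys %state;
```

The second pass finds nothing left in the 'unprocessed' state, so the outer loop stops on its own before the pass limit is reached.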
This implies that each item potentially has four states: unprocessed, working, processed and failed. In a database these might be values in a field; in a drop directory scenario these would be different directories. In either case they would normally be supplied as values to the Data::Consumer subclass being created.
Subclassing Data::Consumer
Data::Consumer can be used with any resource type that can be modeled as a queue, supports some form of advisory locking mechanism, and provides a way to discriminate between at least the unprocessed and processed states.
The routines that must be defined for a new consumer type are new(), reset(), acquire(), release(), _mark_as(), and _do_callback().
- new
It is almost certain that a subclass will need to override the default constructor. All Data::Consumer objects are blessed hashes, and you should always call the parent class's constructor first with:
    my $self = $class->SUPER::new();
- reset
This routine resets the object's internal state so that the next call to acquire() returns the first available item in the queue.
- acquire
This routine finds and in some way locks the next item in the queue. It should ensure that it calls is_ignored() on each item to verify that the item has not been marked to be ignored.
- release
This routine releases any locks held by the object.
- _mark_as
This routine is called to "mark" an item as being in a particular state. It should be able to handle user-supplied values. For instance, Data::Consumer::MySQL implements this as an update statement that maps the consumer state names to the user-supplied values.
Possible states are: unprocessed, working, processed, failed.
- _do_callback
This routine is used to call the user-supplied callback with the correct arguments. Which arguments are appropriate for the callback depends on the type of class. For instance, Data::Consumer::MySQL calls the callback with the arguments ($consumer, $id, $dbh), whereas Data::Consumer::Dir calls the callback with the arguments ($consumer, $filespec, $filehandle, $filename). The point is that the end user should be passed the arguments that make sense, not necessarily the same thing for each consumer type.
Every well-behaved Data::Consumer subclass should include the functional equivalent of the following code in its .pm file:
    use base 'Data::Consumer';
    __PACKAGE__->register();
This will ensure that it can be properly loaded by Data::Consumer->new(type=>$shortname).
It is also normal for a Data::Consumer subclass to provide special methods as needed, for instance Data::Consumer::Dir->fh() and Data::Consumer::MySQL->dbh().
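To give a feel for how the six primitives divide the work, here is a toy in-memory class that implements them by name. It is only a sketch: Toy::ArrayQueue is hypothetical, it deliberately does not inherit from Data::Consumer (so it runs standalone), and the argument lists shown for _mark_as() and _do_callback() are assumptions rather than the module's actual internal signatures.

```perl
use strict;
use warnings;

package Toy::ArrayQueue;    # hypothetical; a real subclass would use base 'Data::Consumer'

# new: a real subclass would begin with $class->SUPER::new(); here we bless directly.
sub new {
    my ( $class, %opts ) = @_;
    return bless { items => $opts{items}, state => {}, ignored => {}, pos => 0 }, $class;
}

# reset: make the next acquire() start from the front of the queue.
sub reset { my $self = shift; $self->{pos} = 0; return $self }

# acquire: find the next unprocessed, non-ignored item and remember it.
sub acquire {
    my $self = shift;
    while ( $self->{pos} < @{ $self->{items} } ) {
        my $id = $self->{items}[ $self->{pos}++ ];
        next if $self->{ignored}{$id};
        next if ( $self->{state}{$id} || 'unprocessed' ) ne 'unprocessed';
        return $self->{last_id} = $id;
    }
    return $self->{last_id} = undef;
}

# release: nothing is really locked in memory, so this is a no-op.
sub release { my $self = shift; return $self }

# _mark_as: record the state of an item (assumed argument order: state, id).
sub _mark_as { my ( $self, $state, $id ) = @_; $self->{state}{$id} = $state; return $self }

# _do_callback: pass the arguments that make sense for this resource type.
sub _do_callback { my ( $self, $callback ) = @_; return $callback->( $self, $self->{last_id} ) }

package main;

my $q = Toy::ArrayQueue->new( items => [qw(a b c)] );
my @seen;
while ( defined( my $id = $q->acquire ) ) {
    $q->_mark_as( working => $id );
    $q->_do_callback( sub { push @seen, $_[1] } );
    $q->_mark_as( processed => $id );
    $q->release;
}
print "saw: @seen\n";
```

In a real subclass the driving loop at the bottom is not written by hand; it is what consume() and process() provide on top of the primitives.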
METHODS
CLASS->new(%opts)
Constructor. Normally Data::Consumer's constructor is not called directly; instead the constructor of a subclass is used. However, to make it easier to have a data-driven load process, Data::Consumer accepts the type argument, which should specify either the short name of the subclass (the part after Data::Consumer::) or the full name of the subclass.
Thus
    Data::Consumer->new(type=>'MySQL',%args);
is exactly equivalent to calling
    Data::Consumer::MySQL->new(%args);
except that the former will automatically require or use the appropriate module, whereas the latter requires that you do so yourself.
Every Data::Consumer subclass constructor supports the following arguments in addition to any that are subclass-specific. Some arguments are used universally but have different meanings depending on the subclass.
- unprocessed
How to tell if the item is unprocessed.
How this argument is interpreted depends on the Data::Consumer subclass involved.
- working
How to tell if the item is currently being worked on.
How this argument is interpreted depends on the Data::Consumer subclass involved.
- processed
How to tell if the item has already been worked on.
How this argument is interpreted depends on the Data::Consumer subclass involved.
- failed
How to tell if processing failed while handling the item.
How this argument is interpreted depends on the Data::Consumer subclass involved.
- max_passes => $num_or_undef
Normally consume() will loop through the data set until it is exhausted. Setting this parameter controls the maximum number of passes; for instance, setting it to 1 results in a single pass through the data per invocation. 0 (or any other false value) is treated as meaning "loop until exhausted".
- max_processed => $num_or_undef
Maximum number of items to process per invocation.
If set to a false value there is no limit.
- max_failed => $num_or_undef
Maximum number of failed process attempts that may occur before consume() stops. If set to a false value there is no limit. Setting this to 1 will cause processing to stop after the first failure.
- max_elapsed => $seconds_or_undef
Maximum amount of time that may have elapsed when starting to process a new item. If more than this value has elapsed then no further processing occurs. If 0 (or any false value) then there is no time limit.
- proceed => $code_ref
This is a callback that may be used to control the looping process in consume() via the proceed() method. See the documentation of consume() and proceed().
- sweep => $bool
*** NOTE CURRENTLY THIS OPTION IS DISABLED ***
If this parameter is true, and all four modes are defined (unprocessed, working, processed, failed), then consume() will perform a "sweep up" after every pass, which is responsible for moving "abandoned" files out of the working directory (such as those left by a previous process that segfaulted during processing). Generally this should not be necessary.
CLASS->register(@alias)
Used by subclasses to register themselves as a Data::Consumer subclass and to register any additional aliases by which the class may be identified.
Will throw an exception if any of the aliases are already associated with a different class.
When called on a subclass in list context, returns a list of the subclass's registered aliases.
If called on Data::Consumer in list context, returns a list of all alias-to-class mappings.
$class_or_object->debug_warn_hook()
Specify a callback to use to capture diagnostic data produced by a Data::Consumer object.
If called as a class method, sets the default hook for all Data::Consumer objects that have not explicitly set a hook.
If called as an object method, sets the hook to use for that object alone.
Returns the current effective hook. Defaults to using the default_debug_warn() method of the object, so it can be overridden by a subclass if necessary.
The hook will be called with the arguments ($consumer,$level,@lines) and is not expected to return anything.
$class_or_object->default_debug_warn($level,$debug);
Use warn to output diagnostics. Message includes the process id and the class name.
$class_or_object->debug_level($level)
Set the minimum debug level.
When called as an object method sets the value of that object alone. undef is distinct from 0 in that undef results in the global debug level being used for that object.
When called as a class method sets the value for all objects which do not have a defined debug level.
Returns the current effective debug level for the object or class.
$class_or_object->debug_warn($level,@debug_lines)
If the current debugging level is above $level then call the current debug_warn_hook() to output a set of diagnostic messages.
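A hook receives ($consumer, $level, @lines), so a hook that collects diagnostics instead of warning could look roughly like the sketch below. Everything here is a hypothetical stand-in written without Data::Consumer: $debug_level plays the role of the effective debug level, and debug_warn_sketch() only mimics the "above $level" gate described above.

```perl
use strict;
use warnings;

my $debug_level = 5;    # stands in for the effective debug_level()
my @captured;           # where the hypothetical hook stores its output

# A hook with the documented argument list: ($consumer, $level, @lines).
my $hook = sub {
    my ( $consumer, $level, @lines ) = @_;
    push @captured, map { "[level $level] $_" } @lines;
    return;
};

# Sketch of the gate: emit only when the current level is above $level.
sub debug_warn_sketch {
    my ( $consumer, $level, @lines ) = @_;
    return unless $debug_level > $level;
    $hook->( $consumer, $level, @lines );
    return;
}

debug_warn_sketch( undef, 1, 'acquired item 42' );      # emitted: 5 is above 1
debug_warn_sketch( undef, 9, 'very chatty detail' );    # suppressed: 5 is not above 9
print "$_\n" for @captured;
```

With the real module, a code reference like $hook would be handed to debug_warn_hook() rather than called through a wrapper.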
$object->last_id()
Returns the identifier for the last item acquired.
Returns undef if acquire has never been called or if the last attempt to acquire data failed because none was available.
$object->mark_as($type)
Mark an item as a particular type if the object defines that type.
Allowed types are unprocessed, working, processed, failed.
$object->process($callback)
Marks the current item as working and processes it using the $callback. If the $callback dies then the item is marked as failed; otherwise the item is marked as processed once the $callback returns. The return value of the $callback is ignored.
$callback will be called with at least two arguments, the first being the $consumer object itself, and the second being an identifier for the current record. Normally additional arguments that are likely to be useful are provided as well, on a per-subclass basis. For example, Data::Consumer::MySQL will pass in the consumer object, the id of the record to be processed, and a copy of the consumer's database handle for convenience. Data::Consumer::Dir, on the other hand, will pass in the consumer object, followed by a file specification for the file to be processed, an open filehandle to the file, and the filename itself (with no path).
The callback may call the methods leave(), ignore(), fail(), and halt() on the consumer object before returning, typically by doing something like
    return $consumer->ignore;
This allows the callback to send specific signals to consume(), specifically:
    leave  : return the item to the unprocessed state after the callback returns.
    ignore : return the item to the unprocessed state after the callback returns
             and never attempt to process it again with this consumer object.
    fail   : same result as dying in a callback, except without throwing an exception,
             for situations where there might be $SIG{__DIE__} hooks to worry about.
    halt   : stop the consume() process once the callback returns.
For further details always consult the relevant subclass's documentation for process().
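The following sketch shows how a callback might use these signals. To stay runnable without the module it uses a hypothetical Mock::Consumer that merely records which signalling methods were called; the item names and the conditions in the callback are likewise made up for illustration.

```perl
use strict;
use warnings;

{
    # Mock exposing only the four signalling methods; NOT the real class.
    package Mock::Consumer;
    sub new    { return bless { signals => [] }, shift }
    sub leave  { my $self = shift; push @{ $self->{signals} }, 'leave';  return $self }
    sub ignore { my $self = shift; push @{ $self->{signals} }, 'ignore'; return $self }
    sub fail   { my $self = shift; push @{ $self->{signals} }, 'fail';   return $self }
    sub halt   { my $self = shift; push @{ $self->{signals} }, 'halt';   return $self }
}

# A callback in the documented shape: consumer first, then the item identifier.
my $callback = sub {
    my ( $consumer, $id ) = @_;
    return $consumer->leave      if $id =~ /\.tmp$/;    # not ready yet, retry later
    return $consumer->ignore     if $id =~ /^\./;       # never handle with this object
    return $consumer->fail->halt if $id eq 'poison';    # mark failed and stop consuming
    return;                                             # normal completion
};

my $mock = Mock::Consumer->new;
$callback->( $mock, $_ ) for qw(upload.tmp .hidden poison report.csv);
my @signals = @{ $mock->{signals} };
print "signals sent: @signals\n";
```

Because each signalling method returns the consumer object, the calls can be chained, as in the fail->halt case.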
$consumer->leave()
Sometimes it's useful to defer processing. When called from within a consume/process callback, this method results in the item being marked as 'unprocessed' after the callback returns (so long as it does not die).
Typically this is invoked as
return $consumer->leave;
from within a consume/process callback.
Returns $consumer. Will die if no 'unprocessed' state is defined.
$consumer->ignore(@list)
This can be used to cause acquire() to ignore each item in @list.
If @list is empty then it is assumed that the method is being called from within consume/process; it marks the currently acquired item as ignored and calls $consumer->leave().
Returns $consumer. Will die if no 'unprocessed' state is defined.
$consumer->fail($message)
Same as doing die($message) from within a consume/process callback, except that no exception is thrown (no $SIG{__DIE__} callbacks are invoked) and the error is deferred until the callback actually returns.
Typically used as
return $consumer->fail;
from within a consume() callback.
Returns the $consumer object.
$consumer->halt()
Causes consume() to halt processing and exit once the callback returns. Typically invoked like
return $consumer->halt;
or
return $consumer->fail->halt;
Returns the consumer object.
$object->is_ignored($id)
Returns true if an item has been set to be ignored. If $id is omitted it defaults to last_id().
$object->reset()
Reset the state of the object.
$object->acquire()
Acquire an item to be processed.
Returns an identifier to be used to identify the item acquired.
$object->release()
Release any locks on the currently held item.
Normally there is no need to call this directly.
$object->error()
Calls the error callback if the user has provided one, otherwise calls confess(). Probably not all that useful for an end user.
$object->consume($callback)
Consumes a data resource until it is exhausted, using acquire(), process(), and release() as appropriate. Normally this is the main method used by external processes.
Before each attempt to acquire a new resource, and once at the end of each pass, consume() will call proceed() to determine if it can do so. The user may hook into this by specifying a callback in the constructor. This callback will be executed with no args when it is in the inner loop (per item), and with the number of passes at the end of each pass (starting with 1).
$object->proceed($passes)
Returns true if the conditions specified at construction time are satisfied and processing may proceed. Returns false otherwise.
If the user has specified a proceed callback in the constructor then this will be executed before any other rules are applied, with a reference to the current $object, a reference to the runstats, and, if being called at the end of a pass, the number of passes.
If this callback returns true then the other rules will be applied, and only if all other conditions from the constructor are satisfied will proceed() itself return true.
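The order of checks described above can be sketched as a standalone function. This is not the module's implementation: may_proceed() is a hypothetical name, $opts mirrors the constructor arguments, $stats mirrors the runstats hash, and the exact comparison operators are assumptions.

```perl
use strict;
use warnings;

# Hypothetical sketch of the documented rule order, not the real proceed().
sub may_proceed {
    my ( $consumer, $opts, $stats, $passes ) = @_;

    # The user callback, if any, runs first and can veto further processing.
    if ( my $cb = $opts->{proceed} ) {
        return 0 unless $cb->( $consumer, $stats, defined $passes ? $passes : () );
    }

    # A false limit (undef or 0) means "no limit", as documented for each option.
    return 0 if $opts->{max_passes} && defined $passes && $passes >= $opts->{max_passes};
    return 0 if $opts->{max_processed} && $stats->{processed} >= $opts->{max_processed};
    return 0 if $opts->{max_failed}    && $stats->{failed} >= $opts->{max_failed};
    return 0 if $opts->{max_elapsed}   && time() - $stats->{start_time} > $opts->{max_elapsed};
    return 1;
}

my %stats = ( processed => 3, failed => 0, start_time => time() );

my $ok_unlimited = may_proceed( undef, {}, \%stats );                            # no limits set
my $ok_capped    = may_proceed( undef, { max_processed => 3 }, \%stats );        # cap reached
my $ok_vetoed    = may_proceed( undef, { proceed => sub { 0 } }, \%stats );      # callback veto
my $ok_last_pass = may_proceed( undef, { max_passes => 1 }, \%stats, 1 );        # pass limit hit

print "unlimited=$ok_unlimited capped=$ok_capped vetoed=$ok_vetoed last_pass=$ok_last_pass\n";
```

A true return from the user callback does not by itself allow processing to continue; it only lets the remaining limits be evaluated.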
$object->runstats()
Returns a reference to a hash of statistics about the last (or currently running) execution of consume. Example:
    {
        'passes'              => 2,
        'processed_this_pass' => 0,
        'processed'           => 3,
        'start_time'          => 1209750962,
        'failed'              => 0,
        'elapsed'             => 0,
        'end_time'            => 1209750962,
        'failed_this_pass'    => 0
    }
Note that start_time and end_time are unix timestamps.
AUTHOR
Yves Orton, <YVES at cpan.org>
BUGS
Please report any bugs or feature requests to bug-data-consumer at rt.cpan.org, or through the web interface at http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Data-Consumer.
I will be notified, and then you'll automatically be notified of progress on your bug as I make changes.
SUPPORT
You can find documentation for this module with the perldoc command.
perldoc Data::Consumer
You can also look for information at:
RT: CPAN's request tracker
AnnoCPAN: Annotated CPAN documentation
CPAN Ratings
Search CPAN
ACKNOWLEDGEMENTS
Igor Sutton <IZUT@cpan.org> for ideas, testing and support
COPYRIGHT & LICENSE
Copyright 2008, 2010, 2011 Yves Orton, all rights reserved.
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.