NAME
AnyEvent::Beanstalk::Worker - Event-driven FSA for beanstalk queues
SYNOPSIS
use feature 'say';
use AnyEvent::Beanstalk::Worker;
use Data::Dumper;
use JSON;
my $w = AnyEvent::Beanstalk::Worker->new(
concurrency => 10,
initial_state => 'reserved',
beanstalk_watch => 'jobs',
beanstalk_decoder => sub {
eval { decode_json(shift) };
}
);
$w->on(reserved => sub {
my $self = shift;
my ($qjob, $qresp) = @_;
say "Got a job: " . Dumper($qjob->decode);
$self->emit( my_next_state => $qjob );
});
$w->on(my_next_state => sub {
my $self = shift;
my $job = shift;
## do something with job
...
## maybe not ready yet?
unless ($job_is_ready) {
return $self->finish(release => $job->id, { delay => 60 });
}
## all done!
$self->finish(delete => $job->id);
});
$w->start;
AnyEvent->condvar->recv;
DESCRIPTION
AnyEvent::Beanstalk::Worker implements a simple, abstract finite-state automaton for beanstalk queues. It can handle a configurable number of concurrent jobs, and implements graceful worker shutdown.
You are encouraged to subclass AnyEvent::Beanstalk::Worker and implement your own init function, for example, so your object has access to anything you need in subsequent states.
The "SUPPLEMENTAL" section below contains additional information about the various technologies this module uses.
METHODS
AnyEvent::Beanstalk::Worker implements these methods:
new
Create a new object. The new method accepts the following arguments:
- initial_state
-
Specify an initial state to move to after a job has been reserved. The handler for this state should expect to receive an AnyEvent::Beanstalk::Job object and the beanstalk queue response (a string such as "RESERVED"). Default is undefined--you should supply an initial state if you want your worker to do anything more than accepting and deleting jobs from the queue.
- concurrency
-
How many concurrent jobs this worker will handle. Set this to a higher number to process more jobs simultaneously. Defaults to 1.
- max_jobs
-
How many jobs this worker will handle before it exits. 0 means the worker will never exit. Defaults to 0.
- max_stop_tries
-
How many TERM or INT signals must be received before we quit, regardless of outstanding jobs. Defaults to 3.
- beanstalk_host
-
The hostname of the beanstalk server. Defaults to 'localhost'.
- beanstalk_port
-
The port of the beanstalk server. Defaults to 11300.
- beanstalk_decoder
-
A reference to a subroutine responsible for decoding a beanstalk job. See AnyEvent::Beanstalk.
- beanstalk_watch
-
The beanstalk tube to watch. Set this to the same tube your producers add jobs to. See AnyEvent::Beanstalk.
- log_level
-
The default log level. Defaults to 4 (meaning "error"). See AnyEvent::Log.
- reserve_timeout
-
How long in seconds to wait for a job from beanstalk. Defaults to 1 second. After this time, the loop will run again looking for additional events before trying to reserve another job.
- release_delay
-
How long in seconds a job should wait before another worker can take it. Defaults to 3 seconds.
init
Called at the end of new; by default this is an empty method. If you want your worker object to have access to additional "things" (such as a web user agent object), subclass AnyEvent::Beanstalk::Worker and implement init:
package WebWorker;
use parent 'AnyEvent::Beanstalk::Worker';
use Mojo::UserAgent;
sub init { shift->{ua} = Mojo::UserAgent->new }
1;
Now we can use our WebWorker class:
use WebWorker;
use JSON;
my $w = WebWorker->new(
concurrency => 50,
initial_state => 'reserved',
beanstalk_watch => 'web-jobs',
beanstalk_decoder => sub {
eval { decode_json(shift) };
}
);
$w->on(reserved => sub {
my $self = shift;
my ($job, $resp) = @_;
$self->{ua}->get($job->decode->{url},
sub { $self->emit(page_found => $job) });
});
$w->on(page_found => sub {...});
start
Starts the worker. Before start is invoked, the worker does not receive or emit events.
$w->start;
stop
Tries to stop the worker. If max_stop_tries is reached or there are no outstanding jobs, the worker exits immediately. If max_stop_tries has not yet been reached and the worker has outstanding jobs, control returns to the event loop until the jobs complete or max_stop_tries is reached. Sending a SIGINT or SIGTERM invokes stop.
$w->stop;
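The rule above can be restated as a small predicate. This is only a sketch of the documented behaviour, not the module's own code: the function name exits_immediately and its arguments are hypothetical, and it assumes "reached" means the number of stop attempts is greater than or equal to max_stop_tries.

```perl
use strict;
use warnings;

# Hypothetical restatement of the documented rule; not the module's code.
sub exits_immediately {
    my ($stop_tries, $max_stop_tries, $job_count) = @_;
    return 1 if $job_count == 0;                  # nothing outstanding
    return 1 if $stop_tries >= $max_stop_tries;   # asked often enough
    return 0;                                     # back to the event loop
}

printf "%d\n", exits_immediately(1, 3, 0);   # idle worker, first signal
printf "%d\n", exits_immediately(1, 3, 2);   # busy worker, first signal
printf "%d\n", exits_immediately(3, 3, 2);   # busy worker, third signal
```

With the default max_stop_tries of 3, a busy worker keeps running after the first two signals and quits on the third.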
finish
Should be called when a worker is finished with a job. The first argument is the beanstalk method to call: release, delete, or bury.
The second argument is the beanstalk job id. An optional third argument will be passed to the beanstalk method invoked in the first argument.
$w->finish(delete => $job->id);
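As a sketch of how a handler might choose among the three methods, the helper below maps an application-level outcome to an argument list for finish. The helper name finish_args and the outcome strings 'done' and 'retry' are hypothetical; the { delay => 60 } option hash follows the SYNOPSIS.

```perl
use strict;
use warnings;

# Hypothetical helper: map an outcome to the arguments for finish().
sub finish_args {
    my ($job_id, $outcome) = @_;
    return (delete  => $job_id)                  if $outcome eq 'done';
    return (release => $job_id, { delay => 60 }) if $outcome eq 'retry';
    return (bury    => $job_id);                 # anything else: set the job aside
}

# Inside a state handler this might be used as:
#   $self->finish( finish_args($job->id, $outcome) );
my @args = finish_args(42, 'retry');
print "$args[0] job $args[1] with delay $args[2]{delay}\n";
```

Keeping the decision in one place means each state handler ends with a single finish call, whatever the outcome.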
on
Registers an event listener.
$w->on(some_state => sub {
my $self = shift;
...
$self->emit(next_state => @args);
});
emit
Emits an event with optional arguments.
$self->emit(a_state => ());
EVENTS
AnyEvent::Beanstalk::Worker emits some events internally, but these should not be interesting to anyone using the module in most cases. This module also provides its own handlers for each of these events. You may override these handlers (via on), but you should know what you're doing if you do that.
If you use this module, you should emit your own states and provide your own state handlers for those events, beginning with the handler for the event you indicated in the constructor's initial_state argument, which this module will emit for you once a job has been reserved from the queue.
The following list of internal events is provided for completeness only and you should generally not emit nor handle these events:
start
reserved
ATTRIBUTES
AnyEvent::Beanstalk::Worker implements the following attributes.
beanstalk
This is a handle to the internal AnyEvent::Beanstalk object.
job_count
This returns the number of outstanding jobs this worker is handling.
handled_jobs
This returns the number of jobs this worker has reserved and begun work on.
concurrency
Sets or gets the number of jobs this worker can handle at the same time.
SIGNALS
AnyEvent::Beanstalk::Worker receives the following signals:
INT
An INT signal will cause the worker to invoke its stop method, which will process any outstanding events before shutting down.
TERM
A TERM signal is handled in the same way as INT.
USR2
A USR2 signal will bump the log level of the worker up until it reaches trace; after trace it wraps around and starts again at critical. See AnyEvent::Log for available log levels.
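The wrap-around can be pictured with a few lines of plain Perl. This is a sketch, not the module's own code: next_log_level is a hypothetical helper, and the numeric values assume the AnyEvent::Log scale, where critical is 3 and trace is 9.

```perl
use strict;
use warnings;

# Numeric levels assumed from AnyEvent::Log: critical = 3, trace = 9.
my ($CRITICAL, $TRACE) = (3, 9);

# Hypothetical helper illustrating the wrap-around described above.
sub next_log_level {
    my $level = shift;
    return $level >= $TRACE ? $CRITICAL : $level + 1;
}

my $level = 4;                 # the default log_level, "error"
my @seen;
for (1 .. 7) {                 # as if seven USR2 signals arrived in a row
    $level = next_log_level($level);
    push @seen, $level;
}
print "@seen\n";               # prints "5 6 7 8 9 3 4"
```

From another Perl process, a signal can be sent with the built-in kill, for example kill USR2 => $pid, where $pid is the worker's process id.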
LOGGING
AnyEvent::Beanstalk::Worker implements logging via AnyEvent::Log; it probably doesn't do this as well as it could and more work needs to be done here.
EXAMPLES
The eg directory has several working examples of using this module, including one that shows how to subclass it.
SUPPLEMENTAL
This section contains additional information that is not directly needed to use this module, but which may be useful for those unfamiliar with any of the underlying technologies.
Caveat
This module represents the current results of an ongoing experiment combining queues (beanstalk, AnyEvent::Beanstalk), non-blocking and asynchronous events (AnyEvent), and state machines as a way to make event-driven programming simpler to understand.
Introduction to beanstalk
beanstalkd is a small, fast work queue written in C. When you need to do lots of jobs (work units--call them what you will) such as sending an email, fetching and parsing a web page, or processing an image, a producer (a small worker that creates jobs) adds jobs to the queue. One or more consumer workers come along and ask for jobs from the queue, and then work on them. When a consumer worker is done, it deletes the job from the queue and asks for another job.
Introduction to AnyEvent
AnyEvent is an elegantly designed, generic interface to a variety of event loops.
Introduction to state machines
The idea behind state machines is you have a "machine" (or program modeling a machine) with a set of states and a set of events that when triggered alter the state of the machine. For example, we could model a web crawler as a state machine. Our states will be get url, fetch, parse, and add url, and our events will be got url, fetched, parsed, and added.
            +---------+
            | get url |
            +-/-----^-+
  (got url)  /       \
            /         \ (added)
     +-----v-+     +---\-----+
     | fetch |     | add url |
     +-----\-+     +-^-------+
  (fetched) \       /
             \     / (parsed)
            +-v---/-+
            | parse |
            +-------+
In the get url state, we take a URL from a list of URLs (perhaps we seed it with one URL), then we emit the got url event. This causes our machine to move to the fetch state. In the fetch state, we make an HTTP GET request on that URL and then emit the fetched event, which moves our machine to the parse state where we parse the incoming web page. Then we add any URLs we find into the queue and start over.
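Before involving a queue or HTTP, the cycle can be sketched in plain Perl. Everything here is a stand-in chosen for illustration: the on and emit subroutines are a toy emitter (not this module's methods), the %pages table replaces real web pages, and handlers call each other directly instead of going through an event loop. Each handler does the work of the state its event leads into.

```perl
use strict;
use warnings;

my %handlers;                      # event name => handler sub
my @trace;                         # work done, in order

# Toy emitter, for illustration only.
sub on   { my ($event, $cb)   = @_; $handlers{$event} = $cb }
sub emit { my ($event, @args) = @_; $handlers{$event}->(@args) }

# Stand-in for the web: each "page" lists the URLs it links to.
my %pages = (
    'http://a.example/' => ['http://b.example/'],
    'http://b.example/' => [],
);
my @urls = ('http://a.example/');  # seed list
my %seen;

on(got_url => sub {                # fetch state
    my $url = shift;
    push @trace, "fetch $url";
    emit(fetched => $url, $pages{$url} || []);
});
on(fetched => sub {                # parse state
    my ($url, $links) = @_;
    push @trace, "parse $url";
    emit(parsed => @$links);
});
on(parsed => sub {                 # add url state
    push @urls, grep { !$seen{$_}++ } @_;
    emit('added');
});
on(added => sub {                  # get url state
    my $url = shift @urls or return;   # stop when the list is empty
    $seen{$url}++;
    push @trace, "get $url";
    emit(got_url => $url);
});

emit('added');                     # enter the get url state to begin
print "$_\n" for @trace;
```

The machine visits get, fetch, and parse for the seed URL, then for the one URL it discovered, and stops when the list is empty.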
If we use our WebWorker class above, the result might look like this:
#!/usr/bin/env perl
use strict;
use warnings;
use feature 'say';
use WebWorker;
my $w = WebWorker->new
( concurrency => 1,
max_stop_tries => 1,
initial_state => 'fetch',
beanstalk_watch => "urls" );
## do this before we call start()
$w->beanstalk->use("urls")->recv;
$w->on(fetch => sub {
my ($self, $job, $resp) = @_;
say STDERR "fetching " . $job->data;
$self->{ua}->get($job->data, sub { $self->emit(receive => $job, @_) });
});
$w->on(receive => sub {
my ($self, $job, undef, $tx) = @_;
if ( $tx->error ) {
warn "Moved or some error: " . $tx->error;
return $self->finish(delete => $job->id);
}
unless ($tx->res->headers->content_type =~ /html/i) {
warn "Not HTML; skipping\n";
return $self->finish(delete => $job->id);
}
say STDERR "parsing " . $job->data;
eval {
$tx->res->dom->at("html body")->find('a[href]')
->each(sub { $self->emit(add_url => shift->{href}) });
};
return $self->finish(delete => $job->id);
});
$w->on(add_url => sub {
my ($self, $url) = @_;
return unless $url =~ /^http/;
$self->beanstalk
->put({ priority => 100,
ttr => 15,
delay => 1,
data => $url },
sub { say STDERR "URL $url added" });
});
$w->start;
AnyEvent->condvar->recv;
We've just written a simple (and impolite--should read robots.txt) web crawler.
See eg/web-state.pl and eg/web-state-add.pl for this example.
Introduction to event loops
I couldn't find any gentle introductions to event loops; I was going to write one myself but realized it would probably turn into a book. Additionally, I'm not qualified to write said book. With that disclaimer, here is a brief, "close enough" introduction to event loops which may help some people get an approximate mental model, good enough to begin event programming.
An event loop can be as simple as this:
my @events = ();
my %watchers = ();
while (1) {
my $event = shift @events or next; ## oldest event first (FIFO)
handle($event);
}
sub handle {
my $event = shift;
$_->($event) for @{$watchers{$event->{type}}};
}
The @events list (or queue, since events are read as a FIFO) might be populated asynchronously from system events, such as receiving signals, network data, disk I/O, timers, or other sources. The handle() subroutine checks the %watchers hash to see if there are any watchers or handlers for this event and calls those subroutines as needed. Some of these subroutines may add more events to the event queue. Then the loop starts again.
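To see handlers adding events to the queue, here is a runnable variation under a few assumptions: the watch helper and the tick and done event types are invented for this sketch, events are taken with shift so the oldest is handled first, and the loop exits when the queue is empty (a real event loop would wait for more events instead).

```perl
use strict;
use warnings;

my @events;      # FIFO queue of pending events
my %watchers;    # event type => list of handler subs
my @log;         # what ran, in order

# Register a handler for an event type (hypothetical helper).
sub watch { my ($type, $cb) = @_; push @{ $watchers{$type} }, $cb }

# Call every handler registered for this event's type.
sub handle {
    my $event = shift;
    $_->($event) for @{ $watchers{ $event->{type} } || [] };
}

# A tick handler logs its count and queues a follow-up event.
watch(tick => sub {
    my $event = shift;
    push @log, "tick $event->{n}";
    if ($event->{n} < 3) {
        push @events, { type => 'tick', n => $event->{n} + 1 };
    }
    else {
        push @events, { type => 'done' };
    }
});
watch(done => sub { push @log, 'done' });

push @events, { type => 'tick', n => 1 };   # seed the queue

# Unlike a real event loop, this one stops when the queue is empty.
while (my $event = shift @events) {
    handle($event);
}

print "$_\n" for @log;
```

One seeded event produces three ticks and a final done, each queued by the handler that ran before it.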
Most of the time you never see the event loop--you just start it. For example, most of the time when I'm programming with EV, this is all I ever see of it:
EV::run;
EV receives all kinds of events from the system, but you can tell it about more events. Then you register event handlers to fire off when a particular kind of event is received.
SEE ALSO
beanstalkd, by Keith Rarick: http://kr.github.io/beanstalkd/
AnyEvent::Beanstalk, by Graham Barr: AnyEvent::Beanstalk
AnyEvent, by Marc Lehmann: http://anyevent.schmorp.de
AUTHOR
Scott Wiersdorf, <scott@perlcode.org>
COPYRIGHT AND LICENSE
Copyright (C) 2014 by Scott Wiersdorf
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.16.1 or, at your option, any later version of Perl 5 you may have available.