NAME

Net::Async::Beanstalk - Non-blocking beanstalk client

SYNOPSIS

use IO::Async::Loop;
my $loop = IO::Async::Loop->new();

use Net::Async::Beanstalk;
my $client = Net::Async::Beanstalk->new();
$loop->add($client);
$client->connect(host => 'localhost', service => '11300',
                 on_connected => sub { $client->put("anything") });

$loop->run();

BUGS

  • Some events are invoked with useless data.

  • Receiving on_disconnect after sending quit might not work.

    In fact disconnecting hasn't been tested at all, even ad-hoc.

  • Protocol errors and the corresponding future exception are out of sync.

  • Net::Async::Beanstalk::Receive is highly repetetive.

  • This document is too long.

  • There are no tests

    See if it's appropriate to steal the tests out of Beanstalk::Client.

DESCRIPTION

Implements the client-side of the beanstalk 1.10 protocol described in https://raw.githubusercontent.com/kr/beanstalkd/v1.10/doc/protocol.txt using IO::Async to provide an asynchronous non-blocking API.

Responses from the server can be handled with events registered in the client object or by waiting on a Future which is returned from the function that initiated a command request.

Net::Async::Beanstalk is based on Moo and IO::Async::Protocol::Stream. Refer to those modules' documentation for basic usage. In particular "connect" in IO::Async::Protocol.

Although this module implements a non-blocking beanstalk client, the protocol itself is not asynchronous. Each command sent will receive a response before the next command will be processed although in practice network buffering makes it appear that commands can be sent while waiting for a previous command's response.

Ordinarily this is irrelevant as all the commands except reserve and reserve-with-timeout respond quickly enough that any delay will be negligible and this module's internal command stack smooths over the times where that's not the case. When a reserve or a reserve-with-timeout command has been sent any other commands will be stacked up waiting to be sent to the server but will not be handled until the reserve/reserve-with-timeout command has completed or the timeout has expired.

If there is a need to send a command while a reserve or reserve-with-timeout is pending then another client object can be created to do it. Note that while a job is reserved by a client (and it remains connected) that job is invisible to any other clients. It cannot, for example, be deleted except over the same connected in which it was reserved and if that connection is closed the job will return to the ready queue and may be reserved by another client.

ATTRIBUTES

default_priority (10,000)
defauly_delay (0)
default_ttr (120)

Default values to associate with a job when it is "put" on the beanstalk server. The defaults are arbitrary but chosen to match the default values from AnyEvent::Beanstalk.

decoder (&YAML::Load)
encoder (&YAML::Dump)

A coderef which will be used to deserialise or serialise jobs as they are retreived from or sent to a beanstalk server.

using

The name of the tube most recently used.

_watching

A hashref who's keys are the tubes which are being watched. The values are irrelevant.

Use the accessor watching to get the list of watched tubes instead of using the attribute directly.

_command_stack

An internal FIFO stack of commands which are waiting to be sent or responded to.

Accessors:

count_commands

How many commands are in the stack, including the one which the server is currently processing.

current_command

Returns the command the server is currently processing, or has just sent a response to, without removing it from the stack.

is_busy

A boolean indicating whether the client is busy, ie. has a command currently being processed or has commands waiting to be sent. Actually implemented by the same method as "count_commands".

_pending_commands

Returns the commands which have not yet completed, including the one which the server is currently processing.

_push_command

Push a new command onto the stack.

_shift_command

Remove and return the first command from the stack, which the server is either processing or has returned a response to.

COMMAND METHODS

Methods which initiate a command on the server are implemented in Net::Async::Beanstalk::Send. The server response is processed by the event handlers in Net::Async::Beanstalk::Receive.

Every method returns a Future which will complete with the server's response to that command, whether success or failure. With few exceptions, documented below, each method expects exactly the arguments that the respective command requires. The commands which expect a YAML structure deserialise the response before returning it.

As well as completing (or failing) the Future returned when the command is sent, each response from the server will attempt to invoke an event. It is probably not a good idea to mix code which waits for futures with code which handles events.

However all commands can invoke one of the error events if an exceptional condition arises and as this is done using invoke_error which will call die if the on_error event is not handled.

All commands can potentially invoke these error events, which will raise an exception if an on_error event handler has not been installed.

on_error (Protocol error: Bad format)

Failed future's exception: invalid

on_error (Protocol error: Internal error)

Failed future's exception: server-error

on_error (Protocol error: Out of memory)

Failed future's exception: server-error

on_error (Protocol error: Unknown command)

Failed future's exception: invalid

on_error (Protocol error: Unknown response)

Failed future's exception: unknown

See the protocol documentation for further details on each command.

The methods are named with a _ where the respective protocol command has a -. They are:

put ($job, %options) or put (%options)

Put a job onto the currently used tube. The job data can be passed as the method's first argument or in %options.

The job's priority, delay or ttr can be set by including those values in %options. If they are not then the client's default value is used (see above).

If the job is passed as the method's first argument and it is a reference which does not overload the stringify ("") operator then it is first serialised using "encoder".

Alternatively the job may be passed in options as raw_data, which is sent to beanstalk as-is, or as data which is first serialised using "encoder". It should probably be considered a bug that $job and $options{data} are handled treated differently.

Regardless of whether /encoder is used to transform the job data, it is first changed to a string of bytes using "encode" in utf8.

It is an error to pass the job data in more than one form or to included unknown options.

Possible events:

on_error (Protocol error: Expected cr+lf)

Failed future's exception: invalid

on_error (Protocol error: Job too big)

Failed future's exception: invalid

on_job_insert ($job_id)
on_job_insert_fail (?)
on_draining (?)
reserve (%options)

Reserve the next available job. timeout may be passed in %options in which case the reserve-with-timout command is sent instead. timeout may be 0.

The data returned by the server is transformed into a string of characters with "decode" in utf8 then deserialised using /decoder.

If the asis option is set to a true value then the data returned by the server is transformed into characters but is not deserialised.

If the raw option is set to a true value then the data is left completely untouched.

Possible events:

on_time_out (?)
on_ttr_soon (?)
on_job_reserve ($job_id, $data)
reserve_with_timeout ($time, %options)

Implemented by calling "reserve" with a timeout option.

bury ($job_id)

Possible events:

on_job_bury ($job_id)
on_job_bury_not_found ($job_id)
delete ($job_id)

Possible events:

on_job_delete ($job_id)
on_job_delete_not_found ($job_id)
ignore ($tube_name)

Possible events:

on_tube_ignore ($tube, $count)
on_tube_ignore_fail ($tube)
kick_job ($job_id)

Possible events:

on_job_kick ($job_id)
on_job_kick_not_found ($job_id)
kick ($max)

Possible events:

on_tube_kick ($tube, $count)
list_tubes ()

Possible events:

on_list_tubes (@tubes)
list_tubes_watched ()

Possible events:

on_list_tubes_watched (@tubes)
list_tube_used ()

Possible events:

on_list_use ($tube)
pause_tube ($tube_name, $delay)

Possible events:

on_tube_pause ($tube)
on_tube_pause_not_found ($tube)
peek ($job_id)

Possible events:

on_job_peek ($job_id, $data)
on_job_peek_not_found ($job_id)
peek_buried ()

Possible events:

on_job_peek_bury ($job_id, $data)
on_job_peek_bury_not_found ($job_id)
peek_delayed ()

Possible events:

on_job_peek_delay ($job_id, $data)
on_job_peek_delay_not_found ($job_id)
peek_ready ()

Possible events:

on_job_peek_ready ($job_id, $data)
on_job_peek_ready_not_found ($job_id)
quit ()

Possible events:

on_disconnect
release ($job_id, $priority, $delay)

Possible events:

on_job_release ($job_id)
on_job_release_fail ($job_id)
on_job_release_not_found ($job_id)
stats ()

Possible events:

on_server_stats (%stats)
stats_job ($job_id)

Possible events:

on_job_stats (%stats)
on_job_stats_not_found ($job_id)
stats_tube ($tube_name)

Possible events:

on_tube_stats (%stats)
on_tube_stats_not_found ($tube_name)
touch ($job_id)

Possible events:

on_job_touch ($job_id)
on_job_touch_not_found ($job_id)
use ($tube_name)

Possible events:

on_tube_use ($tube_name)
watch ($tube_name)

Possible events:

on_tube_watch ($tube_name, $count)

OTHER METHODS

reserve_pending () => @commands

Returns a all the entries in the command stack which refer to a reserve or reserve-with-timeout command.

disconnect () => $future

An alias for quit.

sync () => $future

Returns a Future which completes when all pending commands have been responded to.

watch_only (@tubes) => $future

Send a list-tubes-watched command and based on its result send a series of watch and then ignore commands so that the tubes being watched for this client exactly matches @tubes.

INTERNAL METHODS

_assert_state($response_word) => VOID

Raises an exception of the word received from the server is not something expected in response to the command which has most recently been sent.

error_command ($exception, $message, @args) => $future

Calls invoke_error on the client object with $message and @args then removes the current command from the command stack and fails the Future associated with it with $exception and @args.

The Future returned is the same as that returned when initiating a command and can be safely ignored.

This is used by Net::Async::Beanstalk::Receive to indicate serious communication or internal errors and should never happen. It is called in response to any of these:

Invalid response from server
BAD_FORMAT
EXPECTED_CRLF
INTERNAL_ERROR
JOB_TOO_BIG
OUT_OF_MEMORY
UNKNOWN_COMMAND
fail_command($event, $exception, @args) => $future

Attempts to invoke the $event event if there is a handler for it with @args then removes the current command from the command stack and fails the Future associated with it with $exception and @args.

The Future returned is the same as that returned when initiating a command and can be safely ignored.

This is used by Net::Async::Beanstalk::Receive when the client received an expected response which nevertheless indicates an error of some kind, such as DEADLINE_SOON received in response to a reserve command.

finish_command($event, @args) => $future

Attempts to invoke the $event event if there is a handler for it with @args then removes the current command from the command stack and completes the Future associated with it with @args.

The Future returned is the same as that returned when initiating a command and can be safely ignored.

This is used by Net::Async::Beanstalk::Receive when the server sent a response to a command which indicates success.

ALTERNATIVE IMPLEMENTATIONS

AnyEvent::Beanstalk

A good module and asynchronous but it uses AnyEvent which ... the less said the better. The core of the protocol is implemented but it does not handle all error conditions. I have attempted to make Net::Async::Beanstalk's API superficially similar to this one.

Beanstalk::Client

Also written by Graham Barr, this module seems to be slightly more functionally complete than its AnyEvent counterpart and has proven itself stable and fast but unfortunately does not operate asynchronously.

AnyEvent::Beanstalk::Worker

Unfortunately also based on AnyEvent which is a shame because it implements what appears to be an interesting FSA using beanstalk queues.

Queue::Beanstalk

Ancient, presumably unsupported and based on an out-dated version of the beanstalk protocol.

SEE ALSO

IO::Async

Future

http://kr.github.com/beanstalkd/

https://raw.githubusercontent.com/kr/beanstalkd/v1.10/doc/protocol.txt

AUTHOR

Matthew King <chohag@jtan.com>