NAME

Couchbase::Client::Async - Asynchronous system for couchbase clients.

DESCRIPTION

This is a module intended for use by other higher level components which interact directly with Perl event frameworks.

libcouchbase allows for pluggable event loops, which drive it and tell it about file descriptor events, timeouts, and other such niceties.

The purpose of this module is to provide a unified library for Perl, which will be compatible with POE, AnyEvent, IO::Async and other event loops.

The module is divided into two components:

I/O Events

This part of the module provides an interface for event libraries to tell libcouchbase about events, and conversely, have libcouchbase tell the event library about new file descriptors and events.

Command Results and Events

This part of the module provides a framework in which event libraries can provide interfaces to users, so that they can perform commands on a couchbase cluster, and receive their results asynchronously.

EVENT MANAGEMENT

Event, timer, and I/O management is the lower level of this module, and constitutes the interface to which event loop integrators need to be most attuned.

At the most basic level, you are required to implement four callbacks. These callbacks are indicated by values passed to their given hash keys in the object's constructor.

cb_update_event

cb_update_event => sub {
    my ($evdata,$action,$flags) = @_;

    if(ref($evdata) ne "Couchbase::Client::Async::Event") {
        bless $evdata, "Couchbase::Client::Async::Event";
    }
    ...
};

This callback is invoked to update and/or modify events. It receives three arguments.

The first is $evdata, an array reference which may be blessed into Couchbase::Client::Async::Event.

The $evdata structure contains the state-'glue' for interaction with the internal (C) event routines and their Perl dispatchers.

The following will mention some useful fields for the $evdata structure. Note this is not an exhaustive listing (see that class' documentation for that) but a more practical guide as to what the fields are for, and how they may be used.

fd

This read-only field contains the numeric file descriptor on which the C library will perform I/O functions. If your event loop supports watching file descriptor numbers directly, you may simply watch this number; otherwise, use the dupfh field described next.

dupfh

This mutable field contains an optional dup'd Perl filehandle. Some Perl event loops do not allow for watching a file descriptor directly and demand to be given a 'PerlIO' filehandle (conforming to the IO::Handle interface, or one of the things returned by open).

This filehandle should be a dup'd version of the fd field, because when a filehandle goes out of scope in Perl, its underlying file descriptor is close()d. Since stream lifetimes in the Perl client do not necessarily correspond one-to-one with their lifetimes in libcouchbase, it is recommended that the file descriptor be dup'd. That way, close() on the dup'd file descriptor will not affect the file descriptor on the C side.

The filehandle stored in the dupfh field will remain active until the underlying fd is closed or changed.

The first time an event is created, the dupfh field will be undef. The callback should check for this and, if it is undef, create a new filehandle using the following idiom:

open(my $dupfh, ">&", $evdata->fd) or die "cannot dup fd: $!";
$evdata->dupfh($dupfh);

The dupfh field will persist the next time the cb_update_event callback is invoked.

opaque

This contains opaque data to be passed to our "HaveEvent" package method. You must not modify this object in any way. Failure to comply will likely result in a segfault.
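The reasoning behind the dupfh idiom can be checked in isolation. The following is a minimal sketch using only core Perl, in which a pipe stands in for the socket that libcouchbase would own (an assumption made purely for illustration). It shows that closing the dup'd filehandle leaves the original descriptor usable.

```perl
use strict;
use warnings;

# A pipe stands in for the socket libcouchbase would own; we treat the
# write end's descriptor number the way a callback would treat $evdata->fd.
pipe(my $reader, my $writer) or die "pipe failed: $!";
my $fd = fileno($writer);

my $dup_fd;
{
    # Duplicate the descriptor by number, as in the dupfh idiom.
    open(my $dupfh, ">&", $fd) or die "dup failed: $!";
    $dup_fd = fileno($dupfh);    # a new descriptor number, distinct from $fd
}   # $dupfh goes out of scope here, so only the duplicate is closed

# The original descriptor is still open and usable.
my $written = syswrite($writer, "ping");
defined $written or die "write failed: $!";
sysread($reader, my $buf, 4);
print "original fd still open, read back: $buf\n";
```

Had the filehandle been attached to the original descriptor rather than a duplicate, the end of the inner block would have closed the descriptor out from under the C library.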

The third argument (we will get to the second argument later) is a bitfield of flags, describing which events to watch for. Flags may be a bitwise-OR'd combination of the following:

COUCHBASE_READ_EVENT

Dispatch event when the stream will not block on read()

COUCHBASE_WRITE_EVENT

Dispatch event when the stream will not block on write()

Consequently, it is the responsibility of this callback to ensure that only the I/O events specified in $flags are active, and that all others remain inactive (spuriously delivering write events is a very bad idea).

To make life easier, the $evdata structure has an old_flags field which contains the events that were active before this callback was invoked. Thus, instead of explicitly disabling all non-listed events, one can do the following:

my $events_to_delete = $evdata->old_flags & (~$flags);

and then disable only the events set in $events_to_delete.

The old_flags will be set to the current value of $flags once the callback returns.
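As a rough illustration of this bit arithmetic, the sketch below computes both the events to stop watching and the events to start watching. The numeric values given to the two constants are placeholders chosen for the example, not the values the module exports, and flag_changes is a hypothetical helper name.

```perl
use strict;
use warnings;

# Placeholder bit values for illustration only; real code should use the
# constants provided by the module instead of defining its own.
use constant {
    COUCHBASE_READ_EVENT  => 0x02,
    COUCHBASE_WRITE_EVENT => 0x04,
};

# Hypothetical helper: given the previous and requested flags, return the
# bits to disable and the bits to enable.
sub flag_changes {
    my ($old_flags, $flags) = @_;
    my $to_delete = $old_flags & ~$flags;    # was active, no longer wanted
    my $to_add    = $flags & ~$old_flags;    # wanted, not yet active
    return ($to_delete, $to_add);
}

# Previously watching read and write; now only read is requested.
my ($del, $add) = flag_changes(
    COUCHBASE_READ_EVENT | COUCHBASE_WRITE_EVENT,
    COUCHBASE_READ_EVENT,
);
printf "stop write watcher: %s\n", ($del & COUCHBASE_WRITE_EVENT) ? "yes" : "no";
printf "start read watcher: %s\n", ($add & COUCHBASE_READ_EVENT)  ? "yes" : "no";
```

In this case only the write watcher needs to be removed; the read watcher is already active and can be left alone.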

The second argument ($action) specifies which type of update this is. It can be one of the following:

EVACTION_WATCH

Request that the stream be watched for events

EVACTION_UNWATCH

Remove all watchers on this stream, and possibly do cleanup

EVACTION_SUSPEND

Temporarily disable the watching of events on this stream, but do not forget about which events are active

EVACTION_RESUME

Resume a suspended event

The suspension and resumption of events may be necessary so that libcouchbase only receives events when it is ready for them, without impacting performance by re-selecting all file descriptors.

POE::Kernel has select_resume_read and select_pause_read, for example.

For the EVACTION_WATCH action, the implementation must have the event loop dispatch to a function that will ultimately do something like the following:

Couchbase::Client::Async->HaveEvent($which, $evdata->opaque);

Here the $which argument contains the event which occurred (either COUCHBASE_READ_EVENT or COUCHBASE_WRITE_EVENT), and the second argument is the opaque field of the $evdata passed to this callback.

See "HaveEvent" for more details.
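The four actions described above can be sketched as a simple dispatch. This is only an outline under stated assumptions: the numeric constant values are placeholders, %watchers is a hypothetical in-memory table standing in for a real event loop, and the function takes a bare file descriptor number where a real cb_update_event would receive $evdata.

```perl
use strict;
use warnings;

# Placeholder values; the real constants come from the module.
use constant {
    EVACTION_WATCH   => 1,
    EVACTION_UNWATCH => 2,
    EVACTION_SUSPEND => 3,
    EVACTION_RESUME  => 4,
};

# Hypothetical bookkeeping: fd => { flags => ..., paused => 0|1 }.
my %watchers;

sub update_event {
    my ($fd, $action, $flags) = @_;

    if ($action == EVACTION_WATCH) {
        # Replace the active set so only the events in $flags are watched.
        $watchers{$fd} = { flags => $flags, paused => 0 };
    } elsif ($action == EVACTION_UNWATCH) {
        delete $watchers{$fd};
    } elsif ($action == EVACTION_SUSPEND) {
        # Keep the flags so that a later RESUME can restore them.
        $watchers{$fd}{paused} = 1 if exists $watchers{$fd};
    } elsif ($action == EVACTION_RESUME) {
        $watchers{$fd}{paused} = 0 if exists $watchers{$fd};
    } else {
        die "unknown action: $action";
    }
    return;
}

update_event(7, EVACTION_WATCH, 0x02);
update_event(7, EVACTION_SUSPEND, 0);
my $kept_flags = $watchers{7}{flags};
print "fd 7 paused, flags kept: $kept_flags\n";
update_event(7, EVACTION_RESUME, 0);
update_event(7, EVACTION_UNWATCH, 0);
```

A real integration would replace the hash updates with calls into the event loop, for example pausing and resuming its read watcher.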

cb_update_timer

cb_update_timer => sub {
    my ($evdata,$action,$usecs) = @_;
    my $timer_id = $evdata->pl_data;

    if($action == EVACTION_WATCH) {
        if(defined $timer_id) {
            reschedule_timer($timer_id, $usecs / (1000*1000),
                callback => 'Couchbase::Client::Async::HaveEvent',
                args => ['Couchbase::Client::Async', 0, $evdata->opaque]);

        } else {
            $timer_id = schedule_timer(
                $usecs / (1000*1000) ...
            );
        }
    } else {
        delete_timer($evdata->pl_data);
    }
};

This callback is invoked to schedule an interval timer. It is passed three arguments:

$evdata

The 'event'. See "cb_update_event" for details.

$action

This is one of EVACTION_WATCH and EVACTION_UNWATCH.

For EVACTION_WATCH, a timer should be scheduled to trigger in the future; for EVACTION_UNWATCH, an active timer should be deleted.

$usecs

The number of microseconds from now at which the timer should be triggered.

These timers should end up calling "HaveEvent" when they expire. The first argument to "HaveEvent" (the flags) is ignored, but it must still be passed the opaque object from $evdata (see "cb_update_event" for details).

It is common for timers to require some kind of internal identifier by which the event loop can allow their cancelling and postponing.

In order to maintain the timer, the $evdata offers a Perl-only writeable pl_data field, which can hold anything you want it to.

Timers should not be affected (or ever receive) EVACTION_SUSPEND or EVACTION_RESUME actions.
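To show how pl_data can carry a timer identifier between invocations, here is a self-contained sketch. My::FakeEvent is a stand-in class written for this example, and it assumes that pl_data acts as a combined getter and setter in the same way dupfh does. The %timers table and the constant values are likewise hypothetical placeholders for a real event loop.

```perl
use strict;
use warnings;

use constant { EVACTION_WATCH => 1, EVACTION_UNWATCH => 2 };  # placeholders

# Stand-in for the event object; it models only a pl_data getter/setter.
package My::FakeEvent {
    sub new     { return bless { pl_data => undef }, shift }
    sub pl_data { my $s = shift; $s->{pl_data} = shift if @_; return $s->{pl_data} }
}

# Hypothetical timer table standing in for an event loop: id => seconds.
my %timers;
my $next_id = 0;

sub update_timer {
    my ($evdata, $action, $usecs) = @_;
    my $id = $evdata->pl_data;

    if ($action == EVACTION_WATCH) {
        $id = ++$next_id unless defined $id;    # first use: allocate an id
        $timers{$id} = $usecs / 1_000_000;      # (re)schedule, in seconds
        $evdata->pl_data($id);                  # remember the id for next time
    } elsif (defined $id) {
        delete $timers{$id};                    # cancel the active timer
        $evdata->pl_data(undef);
    }
    return;
}

my $ev = My::FakeEvent->new;
update_timer($ev, EVACTION_WATCH, 2_500_000);
my ($id, $secs) = ($ev->pl_data, $timers{ $ev->pl_data });
printf "timer %d fires in %.1f s\n", $id, $secs;
update_timer($ev, EVACTION_UNWATCH, 0);
```

The same identifier is reused when the timer is rescheduled, so the event loop never accumulates stale timers for one event.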

cb_waitdone

This is called with no arguments whenever libcouchbase has determined it no longer needs to watch any file descriptors. Normally "cb_update_event" will be called with EVACTION_SUSPEND for all active file descriptor watchers.

This can be used to signal other layers that there are no more pending user events to wait for.

cb_error

This is called with two arguments, the first an internal libcouchbase error number, and the second, a string describing the details of the error.

HaveEvent

This is the 'return trip' function which should be called whenever an event is ready. It is a package method and not an object method.

It is called like so:

Couchbase::Client::Async->HaveEvent($flags, $opaque);

The $flags argument is only relevant for I/O events (and not timers). The $opaque argument must be supplied and contains an internal pointer to a private C data structure. RTFSC if you really wish to know what's inside there.

COUCHBASE COMMANDS AND RESULTS

This contains the higher level interface for issuing commands and asynchronously awaiting their results as they trickle in.

The asynchronous interface to this is a bit ugly, and is intended to be wrapped according to the style you prefer.

There is only one function with which to issue commands:

request

Issue couchbase command(s).

$async->request(
    PLCBA_CMD_*, REQTYPE_*,
    sub {
        my ($results,$arg) = @_;
        print "i am a result callback.\n";
        printf("I have results for these keys: %s\n",
            join(",", keys %$results));
        print "My request argument was $arg\n";
    },
    "arg",
    CBTYPE_*,
    [....],
);

Pretty complicated, eh?

It was the only sane way to have a single request function without limiting the featureset or duplicating code an insane amount of times.

The arguments are as follows:

0

The command. This is one of the PLCBA_CMD_* macros, which are defined in Couchbase::Client::IDXConst.

1

Request type. This is one of REQTYPE_SINGLE or REQTYPE_MULTI. If the former is specified, then only one set of parameters for the command will be passed; if the latter, then this will be a single command which will operate on a multitude of keys.

2

Callback.

This is the callback which will be invoked when the command receives results. It is called with two arguments. The first argument is a hash reference with the command key(s) as its keys, and Couchbase::Client::Return objects as its values, which contain information about the response and status of the command.

The second argument is the user-defined argument described next.

3

Argument

This is a dummy argument passed to the callback, and can be whatever you want.

4

Callback type.

This is one of CBTYPE_COMPLETION and CBTYPE_INCREMENTAL.

In the case of the former, the callback will only be invoked once, when all the results for all the keys have been gathered. In the case of the latter, the callback will be invoked for each new result received.

Note that the contents of the callback result hash is not automatically reset in between calls, so it is advisable to clear the hash (or delete relevant keys) if CBTYPE_INCREMENTAL is used.

5

Command arguments.

Either a Couchbase::Client::Async::Request object or an arrayref of such objects, depending on whether REQTYPE_SINGLE or REQTYPE_MULTI was specified, respectively.

A Couchbase::Client::Async::Request is a simple array, and it is not strictly required to bless it into this class. See its documentation for which fields are provided, and which commands they apply to.

It may help to use my Array::Assign module to deal with the fields in the array.
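The note under argument 4 about clearing the result hash when CBTYPE_INCREMENTAL is used can be sketched as follows. This simulation does not talk to a cluster: plain strings stand in for Couchbase::Client::Return objects, and the same hash is passed to the callback twice to mimic two successive invocations.

```perl
use strict;
use warnings;

my @seen;

# A result callback suitable for use with CBTYPE_INCREMENTAL.
my $on_result = sub {
    my ($results, $arg) = @_;
    for my $key (sort keys %$results) {
        push @seen, $key;
        # Remove the entry once handled so it is not processed again
        # on the next invocation.
        delete $results->{$key};
    }
};

# Simulate the same hash being handed to the callback twice, with one
# new result arriving each time.
my %results = (foo => "ret-foo");
$on_result->(\%results, "arg");
$results{bar} = "ret-bar";
$on_result->(\%results, "arg");

print "handled: @seen\n";
```

Without the delete, the second invocation would see both foo and bar, and foo would be handled twice.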
