NAME
Couchbase::Client::Async - Asynchronous system for couchbase clients.
DESCRIPTION
This is a module intended for use by other higher level components which interact directly with Perl event frameworks.
libcouchbase allows for pluggable event loops, which drive it and tell it about file descriptor events, timeouts, and other such niceties.
The purpose of this module is to provide a unified library for Perl, which will be compatible with POE, AnyEvent, IO::Async and other event loops.
The module is divided into two components:
I/O Events
This part of the module provides an interface for event libraries to tell libcouchbase about events, and conversely, have libcouchbase tell the event library about new file descriptors and events.
Command Results and Events
This part of the module provides a framework in which event libraries can provide interfaces to users, so that they can perform commands on a couchbase cluster, and receive their results asynchronously.
EVENT MANAGEMENT
Event, timer, and I/O management is the lower level of this module, and constitutes the interface which event loop integrators need to be most attuned to.
At the most basic level, you are required to implement four callbacks. These callbacks are indicated by values passed to their given hash keys in the object's constructor.
cb_update_event
cb_update_event => sub {
    my ($evdata, $action, $flags) = @_;
    # The event may arrive unblessed; bless it to use its accessors.
    if (ref($evdata) ne "Couchbase::Client::Async::Event") {
        bless $evdata, "Couchbase::Client::Async::Event";
    }
    ...
};
This callback is invoked to update and/or modify events. It receives three arguments.
The first is $evdata, an array reference which may be blessed into Couchbase::Client::Async::Event.
The $evdata structure contains the state 'glue' for interaction with the internal (C) event routines and their Perl dispatchers.
The following lists some useful fields of the $evdata structure. Note this is not an exhaustive listing (see that class's documentation for that) but a more practical guide to what the fields are for and how they may be used.
fd
-
This read-only field contains the numeric file descriptor on which the C library will perform I/O functions. If your event loop supports watching file descriptor numbers directly, you may simply watch this number; otherwise, see the next field.
dupfh
-
This mutable field contains an optional dup'd Perl filehandle. Some Perl event loops do not allow for watching a file descriptor directly and demand to be given a 'PerlIO' filehandle (conforming to the IO::Handle interface, or one of the things returned by open).
This filehandle should be a dup'd version of the fd field, because when the filehandle goes out of scope in Perl, the underlying file descriptor will be close()d. Since there is not necessarily a one-to-one correlation between stream lifetimes as they exist in the Perl client and their lifetimes as they exist in libcouchbase, it is recommended that the file descriptor be dup'd. That way, close() on the dup'd file descriptor will not affect the file descriptor on the C side.
The filehandle stored in the dupfh field will remain active until the underlying fd is closed or changed.
The first time an event is created, the dupfh field will be undef. The callback should check for this and, if so, create a new filehandle using the following idiom:
open my $dupfh, ">&", $evdata->fd;
$evdata->dupfh($dupfh);
The dupfh field will persist the next time the cb_update_event callback is invoked.
opaque
-
This contains opaque data to be passed to our "HaveEvent" package method. You must not modify this object in any way. Failure to comply will likely result in a segfault.
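The dupfh check described above can be sketched in isolation. This is only an illustration: My::FakeEvent and handle_for_event are hypothetical names, and the fake class stands in for the real event object by exposing just the fd and dupfh accessors.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the event object, exposing only the two
# accessors used here; the real object is supplied by the module.
package My::FakeEvent {
    sub new   { my ($class, %args) = @_; return bless {%args}, $class }
    sub fd    { return $_[0]{fd} }
    sub dupfh { my $self = shift; $self->{dupfh} = shift if @_; return $self->{dupfh} }
}

# Return a PerlIO handle for the event, creating the dup only once.
sub handle_for_event {
    my ($evdata) = @_;
    my $dupfh = $evdata->dupfh;
    unless (defined $dupfh) {
        # ">&" with a numeric argument dup()s that file descriptor
        open $dupfh, ">&", $evdata->fd
            or die "Cannot dup fd " . $evdata->fd . ": $!";
        $evdata->dupfh($dupfh);
    }
    return $dupfh;
}

my $ev     = My::FakeEvent->new(fd => fileno(STDOUT));
my $first  = handle_for_event($ev);
my $second = handle_for_event($ev);   # reuses the stored handle
```

Because the handle is a dup, closing it leaves the original descriptor open, which is the property the text above relies on.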
The third argument (we will get to the second argument later) is a bitfield of flags describing which events to watch for. Flags may be a bitwise-OR'd combination of the following:
COUCHBASE_READ_EVENT
-
Dispatch event when the stream will not block on read()
COUCHBASE_WRITE_EVENT
-
Dispatch event when the stream will not block on write()
Consequently, it is the responsibility of this callback to ensure that only the I/O events specified in $flags are active, and that all others remain inactive (spuriously delivering write events is a very bad idea).
To make life easier, the $evdata structure has an old_flags field which contains the events that were active before this callback was invoked. Thus, instead of explicitly disabling all non-listed events, one can do the following:
my $events_to_delete = $evdata->old_flags & (~$flags);
and only handle the events mentioned in $events_to_delete.
The old_flags field will be set to the current value of $flags once the callback returns.
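The same bit arithmetic also yields the events that need to be newly enabled. The sketch below is self-contained; the numeric constant values and the flag_changes helper are stand-ins chosen for illustration, and real code should use the constants exported by the module.

```perl
use strict;
use warnings;

# Stand-in values for illustration only; real code should use the
# constants exported by the module.
use constant {
    COUCHBASE_READ_EVENT  => 0x02,
    COUCHBASE_WRITE_EVENT => 0x04,
};

# Given the previous and requested flags, work out which watchers to
# switch on and which to switch off.
sub flag_changes {
    my ($old_flags, $flags) = @_;
    my $to_delete = $old_flags & ~$flags;   # active before, not wanted now
    my $to_add    = $flags & ~$old_flags;   # wanted now, not active before
    return ($to_add, $to_delete);
}

my ($add, $del) = flag_changes(
    COUCHBASE_READ_EVENT | COUCHBASE_WRITE_EVENT,   # old_flags
    COUCHBASE_READ_EVENT,                           # new $flags
);
printf "add=0x%02x delete=0x%02x\n", $add, $del;    # add=0x00 delete=0x04
```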
The second argument ($action) specifies which type of update this is. It can be one of the following:
EVACTION_WATCH
-
Request that the stream be watched for events
EVACTION_UNWATCH
-
Remove all watchers on this stream, and possibly do cleanup
EVACTION_SUSPEND
-
Temporarily disable the watching of events on this stream, but do not forget about which events are active
EVACTION_RESUME
-
Resume a suspended event
The suspension and resumption of events may be necessary so that libcouchbase only receives events when it is ready for them, without impacting performance by re-selecting all file descriptors. POE::Kernel has select_resume_read and select_pause_read, for example.
For the EVACTION_WATCH action, the implementation must have the event loop dispatch to a function that will ultimately do something like the following:
Couchbase::Client::Async->HaveEvent($which, $evdata->opaque);
Here the $which argument contains the event which occurred (either COUCHBASE_READ_EVENT or COUCHBASE_WRITE_EVENT), and the opaque argument is the opaque field of the $evdata passed to this callback.
See "HaveEvent" for more details.
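The four actions can be sketched against a toy watcher table. Everything here is illustrative: the constant values, %watchers, update_event and fd_ready are hypothetical, the function takes the fd and opaque value directly rather than an $evdata object, and $have_event stands in for a closure around Couchbase::Client::Async->HaveEvent.

```perl
use strict;
use warnings;

# Stand-in values for illustration; real code should use the constants
# exported by the module.
use constant {
    EVACTION_WATCH   => 1,
    EVACTION_UNWATCH => 2,
    EVACTION_SUSPEND => 3,
    EVACTION_RESUME  => 4,
};

my %watchers;   # fd => { flags => ..., paused => 0|1, on_ready => CODE }

# $have_event stands in for a closure around
# Couchbase::Client::Async->HaveEvent($which, $opaque).
sub update_event {
    my ($fd, $opaque, $action, $flags, $have_event) = @_;
    if ($action == EVACTION_WATCH) {
        $watchers{$fd} = {
            flags    => $flags,
            paused   => 0,
            on_ready => sub { my ($which) = @_; $have_event->($which, $opaque) },
        };
    } elsif ($action == EVACTION_UNWATCH) {
        delete $watchers{$fd};
    } elsif ($action == EVACTION_SUSPEND) {
        # Keep the flags so that a later resume needs no extra setup
        $watchers{$fd}{paused} = 1 if exists $watchers{$fd};
    } elsif ($action == EVACTION_RESUME) {
        $watchers{$fd}{paused} = 0 if exists $watchers{$fd};
    }
    return;
}

# What a loop would do when $fd becomes ready for event $which.
sub fd_ready {
    my ($fd, $which) = @_;
    my $w = $watchers{$fd} or return;
    return if $w->{paused} || !($w->{flags} & $which);
    $w->{on_ready}->($which);
    return;
}

my @seen;
my $record = sub { push @seen, [@_] };
update_event(7, "opaque-token", EVACTION_WATCH, 0x02, $record);
fd_ready(7, 0x02);    # delivered
update_event(7, "opaque-token", EVACTION_SUSPEND, 0x02, $record);
fd_ready(7, 0x02);    # ignored while suspended
```

Keeping the paused state separate from the flags means a suspended stream remembers which events were active, as the description of EVACTION_SUSPEND requires.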
cb_update_timer
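cb_update_timer => sub {
    my ($evdata, $action, $usecs) = @_;
    # pl_data holds whatever timer identifier was stored on a previous call
    my $timer_id = $evdata->pl_data;
    if ($action == EVACTION_WATCH) {
        if (defined $timer_id) {
            # Timer already exists: move its expiry
            reschedule_timer($timer_id, $usecs / (1000*1000),
                callback => 'Couchbase::Client::Async::HaveEvent',
                args => ['Couchbase::Client::Async', 0, $evdata->opaque]);
        } else {
            $timer_id = schedule_timer(
                $usecs / (1000*1000) ...
            );
            # Remember the identifier for later calls (setter form assumed,
            # by analogy with dupfh)
            $evdata->pl_data($timer_id);
        }
    } else {
        # EVACTION_UNWATCH: cancel the timer
        delete_timer($evdata->pl_data);
    }
};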
This callback is invoked to schedule an interval timer. It is passed three arguments:
$evdata
-
The 'event'. See "cb_update_event" for details.
$action
-
This is one of EVACTION_WATCH and EVACTION_UNWATCH.
For EVACTION_WATCH, a timer should be scheduled to trigger in the future; for EVACTION_UNWATCH, any active timer should be deleted.
$usecs
-
How many microseconds from now the timer should be triggered.
These timers should end up calling "HaveEvent" when they expire. The first argument to "HaveEvent" (the flags) is ignored, but it must still be passed the opaque object from $evdata (see "cb_update_event" for details).
It is common for timers to require some kind of internal identifier by which the event loop can allow their cancelling and postponing.
In order to maintain the timer, $evdata offers a Perl-only writeable pl_data field, which can hold anything you want it to.
Timers should never receive, or be affected by, EVACTION_SUSPEND or EVACTION_RESUME actions.
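The schedule_timer, reschedule_timer and delete_timer calls in the example above are placeholders for whatever the event loop provides. A minimal in-memory version, purely hypothetical and useful only for seeing how an identifier kept in pl_data would be used, might look like this:

```perl
use strict;
use warnings;

# Hypothetical in-memory timer table standing in for an event loop's
# timer API. The function names mirror the placeholders used above.
my %timers;
my $next_id = 1;

sub schedule_timer {
    my ($seconds, %opts) = @_;
    my $id = $next_id++;
    $timers{$id} = { due => time() + $seconds, %opts };
    return $id;
}

sub reschedule_timer {
    my ($id, $seconds, %opts) = @_;
    return unless exists $timers{$id};
    # Keep earlier options, apply new ones, and move the expiry
    $timers{$id} = { %{ $timers{$id} }, %opts, due => time() + $seconds };
    return $id;
}

sub delete_timer {
    my ($id) = @_;
    return unless defined $id;      # nothing was ever scheduled
    return delete $timers{$id};
}

# Convert the microsecond interval passed to the callback into seconds.
my $usecs = 2_500_000;
my $id    = schedule_timer($usecs / (1000 * 1000), args => ['opaque-token']);
reschedule_timer($id, 0.5);
delete_timer($id);
```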
cb_waitdone
This is called with no arguments whenever libcouchbase has determined it no longer needs to watch any file descriptors. Normally, "cb_update_event" will be called with EVACTION_SUSPEND for all active file descriptor watchers.
This can be used to signal other layers that there are no more pending user events to wait for.
cb_error
This is called with two arguments, the first an internal libcouchbase error number, and the second, a string describing the details of the error.
HaveEvent
This is the 'return trip' function which should be called whenever an event is ready. It is a package method and not an object method.
It is called like so:
Couchbase::Client::Async->HaveEvent($flags, $opaque);
The $flags argument is only relevant for I/O events (and not timers). The $opaque argument must be supplied and contains an internal pointer to a private C data structure. RTFSC if you really wish to know what's inside there.
COUCHBASE COMMANDS AND RESULTS
command($type, $cmdargs, $cbparams)
Issue a command asynchronously.
The type is one of the PLCBA_CMD_* constants:
- PLCBA_CMD_GET
- PLCBA_CMD_TOUCH
- PLCBA_CMD_LOCK
- PLCBA_CMD_REPLACE
- PLCBA_CMD_ADD
- PLCBA_CMD_APPEND
- PLCBA_CMD_PREPEND
- PLCBA_CMD_SET
- PLCBA_CMD_ARITHMETIC
- PLCBA_CMD_INCR
- PLCBA_CMD_DECR
- PLCBA_CMD_REMOVE
- PLCBA_CMD_UNLOCK
The constants should be self-explanatory.
The second parameter, $cmdargs, is an array reference of arguments to pass to the commands. These follow the same semantics as in the *_multi variants (in fact, internally they mostly follow the same code path).
Finally, the $cbparams parameter is a hashref containing the following keys:
callback
-
Mandatory. A CODE reference or name of a function to be invoked when the response arrives.
data
-
Optional. A scalar which is passed to the callback as its second argument
type
-
The type, or 'mode', of callback to use. Options are CBTYPE_INCREMENTAL and CBTYPE_COMPLETION. The INCREMENTAL type is useful when performing a large get operation where you wish to act on each key as it arrives.
Otherwise, the default (CBTYPE_COMPLETION) callback will only be invoked once the entire set of commands has completed.
The callback is always invoked with two arguments: the first is a hashref mapping keys to Couchbase::Client::Return objects; the second is the data parameter (if passed).
A callback may thus look like this:
$async->command(PLCBA_CMD_GET, [ "key" ], {
    data => "hello there",
    callback => sub {
        my ($results, $data) = @_;
        my $single_result = $results->{key};
        $single_result->value;
        $single_result->is_ok; # etc.
        print "$data\n"; # hello there
    }
});
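A callback that walks every key in the results hashref works whether it is invoked once or several times. The sketch below exercises such a callback without a cluster; My::FakeReturn and make_collector are hypothetical names, and the fake class imitates only the value and is_ok methods used in the example above.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a result object, exposing only the two
# methods used in the example above.
package My::FakeReturn {
    sub new   { my ($class, %args) = @_; return bless {%args}, $class }
    sub value { return $_[0]{value} }
    sub is_ok { return $_[0]{ok} }
}

# Build a callback that sorts each delivered result into %$found or
# @$failed. It does not care how many times it is invoked.
sub make_collector {
    my ($found, $failed) = @_;
    return sub {
        my ($results, $data) = @_;
        for my $key (sort keys %$results) {
            my $ret = $results->{$key};
            if ($ret->is_ok) { $found->{$key} = $ret->value }
            else             { push @$failed, $key }
        }
        return;
    };
}

my (%found, @failed);
my $cb = make_collector(\%found, \@failed);

# Simulate two deliveries, as might happen with several keys.
$cb->({ a => My::FakeReturn->new(ok => 1, value => "apple") }, "ctx");
$cb->({ b => My::FakeReturn->new(ok => 0) },                   "ctx");
```

The returned code reference could be passed as the callback key of $cbparams.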
There are several convenience functions available as well: