NAME

Riak::Light - Fast and lightweight Perl client for Riak

VERSION

version 0.084

SYNOPSIS

use Riak::Light;

# create a new instance - using pbc only
my $client = Riak::Light->new(
  host => '127.0.0.1',
  port => 8087
);

$client->is_alive() or die "ops, riak is not alive";

# store hashref into bucket 'foo', key 'bar'
# will serializer as 'application/json'
$client->put( foo => bar => { baz => 1024 });

# store text into bucket 'foo', key 'bar' 
$client->put( foo => baz => "sometext", 'text/plain');
$client->put_raw( foo => baz => "sometext");  # does not encode !

# fetch hashref from bucket 'foo', key 'bar'
my $hash = $client->get( foo => 'bar');
my $text = $client->get_raw( foo => 'baz');   # does not decode !

# delete hashref from bucket 'foo', key 'bar'
$client->del(foo => 'bar');

# check if exists (like get but using less bytes in the response)
$client->exists(foo => 'baz') or warn "ops, foo => bar does not exist";

# list keys in stream (callback only)
$client->get_keys(foo => sub{
   my $key = $_[0];

   # you should use another client inside this callback!
   $another_client->del(foo => $key);
});

# perform 2i queries
my $keys    = $client->query_index( $bucket_name => 'index_test_field_bin', 'plop');

# list all 2i indexes and values
my $indexes = $client->get_all_indexes( $bucket_name => $key );

# perform map / reduce operations
my $response = $client->map_reduce('{
    "inputs":"training",
    "query":[{"map":{"language":"javascript",
    "source":"function(riakObject) {
      var val = riakObject.values[0].data.match(/pizza/g);
      return [[riakObject.key, (val ? val.length : 0 )]];
    }"}}]}');  

DESCRIPTION

Riak::Light is a very light (and fast) Perl client for Riak using PBC interface. Support operations like ping, get, exists, put, del, and secondary indexes (so-called 2i) setting and querying.

It is flexible to change the timeout backend for I/O operations and can suppress 'die' in case of error (autodie) using the configuration. There is no auto-reconnect option. It can be very easily wrapped up by modules like Action::Retry to manage flexible retry/reconnect strategies.

ATTRIBUTES

host

Riak ip or hostname. There is no default.

port

Port of the PBC interface. There is no default.

r

R value setting for this client. Default 2.

w

W value setting for this client. Default 2.

dw

DW value setting for this client. Default 2.

autodie

Boolean, if false each operation will return undef in case of error (stored in $@). Default is true.

timeout

Timeout for connection, write and read operations. Default is 0.5 seconds.

in_timeout

Timeout for read operations. Default is timeout value.

out_timeout

Timeout for write operations. Default is timeout value.

tcp_nodelay

Boolean, enable or disable TCP_NODELAY. If True (default), disables Nagle's Algorithm.

See more in: http://docs.basho.com/riak/latest/dev/references/client-implementation/#Nagle-s-Algorithm.

timeout_provider

Can change the backend for timeout. The default value is IO::Socket::INET and there is only support to connection timeout.

IMPORTANT: in case of any timeout error, the socket between this client and the Riak server will be closed. To support I/O timeout you can choose 5 options (or you can set undef to avoid IO Timeout):

  • Riak::Light::Timeout::Alarm

    uses alarm and Time::HiRes to control the I/O timeout. Does not work on Win32. (Not Safe)

  • Riak::Light::Timeout::Time::Out

    uses Time::Out and Time::HiRes to control the I/O timeout. Does not work on Win32. (Not Safe)

  • Riak::Light::Timeout::Select

    uses IO::Select to control the I/O timeout

  • Riak::Light::Timeout::SelectOnWrite

    uses IO::Select to control only Output Operations. Can block in Write Operations. Be Careful.

  • Riak::Light::Timeout::SetSockOpt

    uses setsockopt to set SO_RCVTIMEO and SO_SNDTIMEO socket properties. Does not Work on NetBSD 6.0.

driver

This is a Riak::Light::Driver instance, to be able to connect and perform requests to Riak over PBC interface.

METHODS

is_alive

$client->is_alive() or warn "ops... something is wrong: $@";

Perform a ping operation. Will return false in case of error (will store in $@).

is_alive

try { $client->ping() } catch { "oops... something is wrong: $_" };

Perform a ping operation. Will die in case of error.

get

my $value_or_reference = $client->get(bucket => 'key');

Perform a fetch operation. Expects bucket and key names. Decode the json into a Perl structure. if the content_type is 'application/json'. If you need the raw data you can use get_raw.

get_raw

my $scalar_value = $client->get_raw(bucket => 'key');

Perform a fetch operation. Expects bucket and key names. Return the raw data. If you need decode the json, you should use get instead.

exists

$client->exists(bucket => 'key') or warn "key not found";

Perform a fetch operation but with head => 0, and the if there is something stored in the bucket/key.

get_all_indexes

$client->get_all_indexes(bucket => 'key');

Perform a fetch operation but instead return the content, return a hashref with a mapping between index name and an arrayref with all possible values (or empty arrayref if none). For example one possible return is:

[
    { key => 'index_test_field_bin', value => 'plop' },
    { key => 'index_test_field2_bin', value => 'plop2' }, 
    { key => 'index_test_field2_bin', value => 'plop3' }, 
]

IMPORT: this arrayref is unsortered.

put

$client->put('bucket', 'key', { some_values => [1,2,3] });
$client->put('bucket', 'key', { some_values => [1,2,3] }, 'application/json);
$client->put('bucket', 'key', 'text', 'plain/text');

# you can set secondary indexes (2i)
$client->put( 'bucket', 'key', 'text', 'plain/text',
              { field1_bin => 'abc', field2_int => 42 }
            );
$client->put( 'bucket', 'key', { some_values => [1,2,3] }, undef,
              { field1_bin => 'abc', field2_int => 42 }
            );
# remember that a key can have more than one value in a given index. In this
# case, use ArrayRef:
$client->put( 'bucket', 'key', 'value', undef,
              { field1_bin => [ 'abc', 'def' ] } );

Perform a store operation. Expects bucket and key names, the value, the content type (optional, default is 'application/json'), and the indexes to set for this value (optional, default is none).

Will encode the structure in json string if necessary. If you need only store the raw data you can use put_raw instead.

IMPORTANT: all the index field names should end by either _int or _bin, depending if the index type is integer or binary.

To query secondary indexes, see query_index.

put_raw

$client->put_raw('bucket', 'key', encode_json({ some_values => [1,2,3] }), 'application/json');
$client->put_raw('bucket', 'key', 'text');
$client->put_raw('bucket', 'key', 'text', undef, {field_bin => 'foo'});

Perform a store operation. Expects bucket and key names, the value, the content type (optional, default is 'plain/text'), and the indexes to set for this value (optional, default is none).

Will encode the raw data. If you need encode the structure you can use put instead.

IMPORTANT: all the index field names should end by either _int or _bin, depending if the index type is integer or binary.

To query secondary indexes, see query_index.

del

$client->del(bucket => key);

Perform a delete operation. Expects bucket and key names.

get_keys

$client->get_keys(foo => sub{
   my $key = $_[0];

   # you should use another client inside this callback!
   $another_client->del(foo => $key);
});

Perform a list keys operation. Receive a callback and will call it for each key. You can't use this callback to perform other operations!

The callback is optional, in which case an ArrayRef of all the keys are returned. But you should always provide a callback, to avoid your RAM usage to skyrocket...

query_index

Perform a secondary index query. Expects a bucket name, the index field name, and the index value you're searching on. Returns and ArrayRef of matching keys.

The index value you're searching on can be of two types. If it's a scalar, an exact match query will be performed. if the value is an ArrayRef, then a range query will be performed, the first element in the array will be the range_min, the second element the range_max. other elements will be ignored.

Based on the example in put, here is how to query it:

# exact match
my $matching_keys = $client->query_index( 'bucket',  'field2_int', 42 );

# range match
my $matching_keys = $client->query_index( 'bucket',  'field2_int', [ 40, 50] );

# with pagination
my ($matching_keys, $continuation, $done) = $client->query_index( 'bucket',  'field2_int', 42, { max_results => 100 });

to fetch the next 100 keys

my ($matching_keys, $continuation, $done) = $client->query_index( 'bucket',  'field2_int', 42, { 
  max_results => 100,
  continuation => $continuation
 });

to fetch only the first 100 keys you can do this

my $matching_keys = $client->query_index( 'bucket',  'field2_int', [ 40, 50], { max_results => 100 });

query_index_loop

Instead using a normal loop around query_index to query 2i with pagination, like this:

do {
    ($matching_keys, $continuation) = $client->query_index( 'bucket',  'field2_int', 42, { 
    max_results => 100,
    continuation => $continuation
   });
   push @keys, @{$matching_keys};
} while(defined $continuation);

you can simply use query_index_loop helper method

my $matching_keys = $client->query_index_loop( 'bucket',  'field2_int', [ 40, 50], { max_results => 1024 });

if you omit the max_results, the default value is 100

map_reduce

This is an alias for map_reduce_raw with content-type 'application/json'

map_reduce_raw

Performa a map/reduce operation. You can use content-type 'application/json' or 'application/x-erlang-binary' Accept callback.

Example:

my $map_reduce_json = '{
  "inputs":"training",
  "query":[{"map":{"language":"javascript",
  "source":"function(riakObject) {
    var val = riakObject.values[0].data.match(/pizza/g);
    return [[riakObject.key, (val ? val.length : 0 )]];
  }"}}]}';
  
my $response = $client->map_reduce_raw($map_reduce_json, 'application/json');

will return something like

[
  {'response' => [['foo',1]],'phase' => 0},
  {'response' => [['bam',3]],'phase' => 0},
  {'response' => [['bar',4]],'phase' => 0},
  {'response' => [['baz',0]],'phase' => 0}
]    

a hashref with response (decoded if json) and phase value. you can also pass a callback

$client->map_reduce( $map_reduce_json , sub { 
    my ($response, $phase) = @_;
    
    # process the response
  });

this callback will be called 4 times, with this response (decoded from json)

[['foo', 1]]
[['bam', 3]]
[['bar', 4]]
[['baz', 0]]

using map_reduce method, you can also use a hashref as a map reduce query:

my $json_hash = {
    inputs => "training",
    query => [{
      map => {
        language =>"javascript",
        source =>"function(riakObject) {
          var val = riakObject.values[0].data.match(/pizza/g);
          return [[riakObject.key, (val ? val.length : 0 )]];
        }"
      }
    }]
  };

$client->map_reduce($json_hash, sub { ... });

map_reduce encode/decode to json format. If you need control with the format (like to use with erlang), you should use map_reduce_raw.

you can use erlang functions but using the json format (see this example ).

{"inputs":"messages","query":[{"map":{"language":"erlang","module":"mr_example","function":"get_keys"}}]}

More information:

protocol buffers mapreduce api

mapreduce basics

advanced mapreduce

SEE ALSO

Net::Riak

Data::Riak

Data::Riak::Fast

Action::Retry

AUTHORS

  • Tiago Peczenyj <tiago.peczenyj@gmail.com>

  • Damien Krotkine <dams@cpan.org>

COPYRIGHT AND LICENSE

This software is copyright (c) 2013 by Weborama.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.