NAME

AnyEvent::InfluxDB - An asynchronous library for InfluxDB time-series database

VERSION

version 1.0.2.0

SYNOPSIS

use EV;
use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;
use AnyEvent::InfluxDB;
use Monitoring::Plugin::Performance;

my $db = AnyEvent::InfluxDB->new(
    server => 'http://localhost:8086',
    username => 'admin',
    password => 'password',
);

my $hdl;
tcp_server undef, 8888, sub {
    my ($fh, $host, $port) = @_;

    $hdl = AnyEvent::Handle->new(
        fh => $fh,
    );

    $hdl->push_read(
        line => sub {
            my (undef, $line) = @_;

            # Disk\t/=382MB;15264;15269;; /var=218MB;9443;9448
            my ($measurement, $perfstring) = split(/\t/, $line);

            my @perfdata
                = Monitoring::Plugin::Performance->parse_perfstring($perfstring);

            $db->write(
                database => 'mydb',
                data => [
                    map {
                        +{
                            measurement => $measurement,
                            tags => {
                                label => $_->label,
                            },
                            fields => {
                                value => $_->value,
                                uom => '"'. $_->uom .'"',
                            },
                        }
                    } @perfdata
                ],
                on_success => sub { print "$line written\n"; },
                on_error => sub { print "$line error: @_\n"; },
            );

            $hdl->on_drain(
                sub {
                    $hdl->fh->close;
                    undef $hdl;
                }
            );
        },
    );
};

EV::run;

DESCRIPTION

Asynchronous client library for InfluxDB time-series database https://influxdb.com.

This version is meant to be used with InfluxDB v1.0.0 or newer.

METHODS

new

my $db = AnyEvent::InfluxDB->new(
    server => 'http://localhost:8086',

    # authenticate using Basic credentials
    username => 'admin',
    password => 'password',

    # or use JWT token
    jwt => 'JWT_TOKEN_BLOB'
);

Returns object representing given InfluDB server connected using optionally provided username username and password password.

Default value of server is http://localhost:8086.

If the server protocol is https then by default no validation of remote host certificate is performed. This can be changed by setting ssl_options parameter with any options accepted by AnyEvent::TLS.

my $db = AnyEvent::InfluxDB->new(
    ...
    ssl_options => {
        verify => 1,
        verify_peername => 'https',
        ca_file => '/path/to/cacert.pem',
    }
);

As an debugging aid the on_request code reference may also be provided. It will be executed before each request with the method name, url and POST data if set.

my $db = AnyEvent::InfluxDB->new(
    ...
    on_request => sub {
        my ($method, $url, $post_data) = @_;
        print "$method $url\n";
        print "$post_data\n" if $post_data;
    }
);

ping

$cv = AE::cv;
$db->ping(
    wait_for_leader => 2,

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to ping cluster leader: @_");
    }
);
my $version = $cv->recv;

Checks the leader of the cluster to ensure that the leader is available and ready. The optional parameter wait_for_leader specifies the number of seconds to wait before returning a response.

The required on_success code reference is executed if request was successful with the value of X-Influxdb-Version response header as argument, otherwise executes the required on_error code reference with the value of Reason response header as argument.

Managing Data

write

$cv = AE::cv;
$db->write(
    database => 'mydb',
    precision => 's',
    rp => 'last_day',
    consistency => 'quorum',

    data => [
        # line protocol formatted
        'cpu_load,host=server02,region=eu-east sensor="top",value=0.64 1456097956',

        # or as a hash
        {
            measurement => 'cpu_load',
            tags => {
                host => 'server02',
                region => 'eu-east',
            },
            fields => {
                value => '0.64',
                sensor => q{"top"},
            },
            time => time()
        }
    ],

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to write data: @_");
    }
);
$cv->recv;

Writes time-series data data to database database with optional parameters: retention policy rp, time precision precision and consistency consistency.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

The data can be specified as single scalar value or hash reference with required keys measurement and fields and optional tags and time. Both can be also mixed and matched within an array reference.

Scalar values are expected to be formatted using InfluxDB line protocol.

All special characters need to be escaped. In that case you might want to use InfluxDB::LineProtocol:

use InfluxDB::LineProtocol qw(dataline);

...
$db->write(
    database => 'mydb',
    precision => 'n',

    data => [
        dataline('CPU Load', 0.64, { "Region of the World" => "Eastern Europe", codename => "eu-east" }, 1437868012260500137)

        # which translates to
        'CPU\ Load,Region\ of\ the\ World=Eastern\ Europe,codename=eu-east value=0.64 1437868012260500137',
    ],
    ...
);

Querying Data

select

$cv = AE::cv;
$db->select(
    database => 'mydb',

    # return time in Unix epoch format
    epoch => "s",

    # raw query
    q => "SELECT count(value) FROM cpu_load"
        ." WHERE region = 'eu-east' AND time > now() - 14d"
        ." GROUP BY time(1d) fill(none)"
        ." ORDER BY time DESC"
        ." LIMIT 10 OFFSET 3",

    # or query created from arguments
    fields => 'count(value)',
    measurement => 'cpu_load',
    where => "region = 'eu-east' AND time > now() - 14d",

    group_by => 'time(1d)',
    fill => 'none',

    order_by => 'time DESC',

    limit => 10,
    offset => 3,

    # downsample result to another database, retention policy and measurement
    into => 'otherdb."default".cpu_load_per5m',

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to select data: @_");
    }
);
my $results = $cv->recv;
for my $row ( @{ $results } ) {
    print "Measurement: $row->{name}\n";
    print "Values:\n";
    for my $value ( @{ $row->{values} || [] } ) {
        print " * $_ = $value->{$_}\n" for keys %{ $value || {} };
    }
}

Executes an select query on database database created from provided arguments measurement measurement, fields to select fields, optional where clause, grouped by group_by and empty values filled with fill, ordered by order_by with number of results limited to limit with offset offset. To limit number of returned series use slimit with offset soffset. If into parameter is provided the result of the query will be copied to specified measurement. If epoch is provided the returned time value will in Unix epoch format. Optional chunk_size can be provided to override the default value of 10,000 datapoints.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

Database Management

create_database

$cv = AE::cv;
$db->create_database(
    # raw query
    q => "CREATE DATABASE mydb WITH DURATION 7d REPLICATION 3 SHARD DURATION 30m NAME oneweek",

    # or query created from arguments
    database => "mydb",

    # retention policy parameters
    duration => '7d',
    shard_duration => '30m',
    replication => 3,
    name => 'oneweek',

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to create database: @_");
    }
);
$cv->recv;

Creates database specified by database argument.

If one of retention policy parameters is specified then the database will be created with that retention policy as default - see "Retention Policy Management" for more details.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

drop_database

$cv = AE::cv;
$db->drop_database(
    # raw query
    q => "DROP DATABASE mydb",

    # or query created from arguments
    database => "mydb",

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to drop database: @_");
    }
);
$cv->recv;

Drops database specified by database argument.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

drop_series

$cv = AE::cv;
$db->drop_series(
    database => 'mydb',

    # raw query
    q => "DROP SERIES FROM cpu_load WHERE host = 'server02'",

    # or query created from arguments
    measurement => 'cpu_load',
    where => q{host = 'server02'},

    # multiple measurements can also be specified
    measurements => [qw( cpu_load cpu_temp )],

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to drop measurement: @_");
    }
);
$cv->recv;

Drops series from single measurement measurement (or many using measurements) and/or filtered by where clause from database database.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

delete_series

$cv = AE::cv;
$db->delete_series(
    database => 'mydb',

    # raw query
    q => "DELETE FROM cpu_load WHERE host = 'server02' AND time < '2016-01-01'",

    # or query created from arguments
    measurement => 'cpu_load',
    where => q{host = 'server02' AND time < '2016-01-01'},

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to drop measurement: @_");
    }
);
$cv->recv;

Deletes all points from a measurement measurement and/or filtered by where clause from database database.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

drop_measurement

$cv = AE::cv;
$db->drop_measurement(
    database => 'mydb',

    # raw query
    q => "DROP MEASUREMENT cpu_load",

    # or query created from arguments
    measurement => 'cpu_load',

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to drop measurement: @_");
    }
);
$cv->recv;

Drops measurement measurement.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

show_shards

$cv = AE::cv;
$db->show_shards(
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list shards: @_");
    }
);
my $shards = $cv->recv;
for my $database ( sort keys %{ $shards } ) {
    print "Database: $database\n";
    for my $s ( @{ $shards->{$database} } ) {
        print " * $_: $s->{$_}\n" for sort keys %{ $s };
    }
}

Returns a hash reference with database name as keys and their shards as values.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

show_shard_groups

$cv = AE::cv;
$db->show_shard_groups(
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list shard groups: @_");
    }
);
my @shard_groups = $cv->recv;
for my $sg ( @shard_groups ) {
    print "ID: $sg->{id}\n";
    print "Database: $sg->{database}\n";
    print "Retention Policy: $sg->{retention_policy}\n";
    print "Start Time: $sg->{start_time}\n";
    print "End Time: $sg->{end_time}\n";
    print "Expiry Time: $sg->{expiry_time}\n";
}

Returns a list of hash references with keys id, database, retention_policy, start_time, end_time and expiry_time for each shard groups.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

drop_shard

$cv = AE::cv;
$db->drop_shard(
    database => 'mydb',

    # raw query
    q => "DROP SHARD 1",

    # or query created from arguments
    id => 1,

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to drop measurement: @_");
    }
);
$cv->recv;

Drops shard identified by id number id.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

show_queries

$cv = AE::cv;
$db->show_queries(
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list shard groups: @_");
    }
);
my @queries = $cv->recv;
for my $q ( @queries ) {
    print "ID: $q->{qid}\n";
    print "Query: $q->{query}\n";
    print "Database: $q->{database}\n";
    print "Duration: $q->{duration}\n";
}

Returns a list of hash references with keys qid, query, database, duration for all currently running queries.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

kill_query

$cv = AE::cv;
$db->kill_query(
    id => 36,

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to kill query: @_");
    }
);
$cv->recv;

Stops a running query identified by id number id.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

Retention Policy Management

create_retention_policy

$cv = AE::cv;
$db->create_retention_policy(
    # raw query
    q => "CREATE RETENTION POLICY last_day ON mydb DURATION 1d REPLICATION 1",

    # or query created from arguments
    name => 'last_day',
    database => 'mydb',
    duration => '1d',
    shard_duration => '168h',
    replication => 1,
    default => 0,

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to create retention policy: @_");
    }
);
$cv->recv;

Creates new retention policy named by name on database database with duration duration, shard group duration shard_duration and replication factor replication. If default is provided and true the created retention policy becomes the default one.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

alter_retention_policy

$cv = AE::cv;
$db->alter_retention_policy(
    # raw query
    q => "ALTER RETENTION POLICY last_day ON mydb DURATION 1d REPLICATION 1 DEFAULT",

    # or query created from arguments
    name => 'last_day',
    database => 'mydb',

    duration => '1d',
    shard_duration => '12h',
    replication => 1,
    default => 1,

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to alter retention policy: @_");
    }
);
$cv->recv;

Modifies retention policy named by name on database database. At least one of duration duration, replication factor replication or flag default must be set.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

drop_retention_policy

$cv = AE::cv;
$db->drop_retention_policy(
    # raw query
    q => "DROP RETENTION POLICY last_day ON mydb",

    # or query created from arguments
    name => "last_day",
    database => "mydb",

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to drop retention policy: @_");
    }
);
$cv->recv;

Drops specified by name retention policy on database database.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

Schema Exploration

show_databases

$cv = AE::cv;
$db->show_databases(
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list databases: @_");
    }
);
my @db_names = $cv->recv;
print "$_\n" for @db_names;

Returns list of known database names.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

show_retention_policies

$cv = AE::cv;
$db->show_retention_policies(
    # raw query
    q => "SHOW RETENTION POLICIES ON mydb",

    # or query created from arguments
    database => 'mydb',

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list retention policies: @_");
    }
);
my @retention_policies = $cv->recv;
for my $rp ( @retention_policies ) {
    print "Name: $rp->{name}\n";
    print "Duration: $rp->{duration}\n";
    print "Shard group duration: $rp->{shardGroupDuration}\n";
    print "Replication factor: $rp->{replicaN}\n";
    print "Default?: $rp->{default}\n";
}

Returns a list of hash references with keys name, duration, shardGroupDuration, replicaN and default for each replication policy defined on database database.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

show_series

$cv = AE::cv;
$db->show_series(
    database => 'mydb',

    # raw query
    q => "SHOW SERIES FROM cpu_load"
        ." WHERE host = 'server02'"
        ." ORDER BY region"
        ." LIMIT 10 OFFSET 3",

    # or query created from arguments
    measurement => 'cpu_load',
    where => q{host = 'server02'},

    order_by => 'region',

    limit => 10,
    offset => 3,

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list series: @_");
    }
);
my @series = $cv->recv;
print "$_\n" for @series;

Returns names of series from database database using optional measurement measurement and optional where clause.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

show_measurements

$cv = AE::cv;
$db->show_measurements(
    database => 'mydb',

    # raw query
    q => "SHOW MEASUREMENTS WITH MEASUREMENT =~ /cpu_load.*/"
        ." WHERE host = 'server02'"
        ." ORDER BY region"
        ." LIMIT 10 OFFSET 3",

    # or query created from arguments
    measurement => '/cpu_load.*/',
    where => q{host = 'server02'},

    order_by => 'region',

    limit => 10,
    offset => 3,

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list measurements: @_");
    }
);
my @measurements = $cv->recv;
print "$_\n" for @measurements;

Returns names of measurements from database database, optionally filtered with regular expression measurement and optional where clause. If the measurement is not enclosed in // then it will be treated as name of the measurement.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

show_tag_keys

$cv = AE::cv;
$db->show_tag_keys(
    database => 'mydb',

    # raw query
    q => "SHOW TAG KEYS FROM cpu_load WHERE host = 'server02' LIMIT 10 OFFSET 3",

    # or query created from arguments
    measurement => 'cpu_load',
    where => q{host = 'server02'},

    limit => 10,
    offset => 3,

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list tag keys: @_");
    }
);
my $tag_keys = $cv->recv;
for my $measurement ( sort keys %{ $tag_keys } ) {
    print "Measurement: $measurement\n";
    print " * $_\n" for @{ $tag_keys->{$measurement} };
}

Returns a hash reference with measurements as keys and their unique tag keys as values from database database and optional measurement measurement, optionally filtered by the where clause, grouped by group_by with number of results limited to limit with offset offset.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

show_tag_values

$cv = AE::cv;
$db->show_tag_values(
    database => 'mydb',

    # raw query
    q => q{SHOW TAG VALUES FROM cpu_load WITH KEY = "host"},

    # or query created from arguments
    measurement => 'cpu_load',

    # single key
    key => q{"host"},
    # or a list of keys
    keys => [qw( "host" "region" )],

    limit => 10,
    offset => 3,

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list tag values: @_");
    }
);
my $tag_values = $cv->recv;
for my $measurement ( sort keys %{ $tag_values } ) {
    print "Measurement: $measurement\n";
    for my $tag_key ( sort keys %{ $tag_values->{$measurement} } ) {
        print "  Tag key: $tag_key\n";
        print "   * $_\n" for @{ $tag_values->{$measurement}->{$tag_key} };
    }
}

Returns a hash reference with measurements as keys and their unique tag values as values from database database and optional measurement measurement from a single tag key key or a list of tag keys keys with number of results limited to limit with offset offset.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

show_field_keys

$cv = AE::cv;
$db->show_field_keys(
    database => 'mydb',

    # raw query
    q => "SHOW FIELD KEYS FROM cpu_load",

    # or query created from arguments
    measurement => 'cpu_load',

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list field keys: @_");
    }
);
my $field_keys = $cv->recv;
for my $measurement ( sort keys %{ $field_keys } ) {
    print "Measurement: $measurement\n";
    for my $field ( @{ $field_keys->{$measurement} } ) {
        print "  Key:  $field->{key}\n";
        print "  Type: $field->{type}\n";
    }
}

Returns a hash reference with measurements as keys and their field keys names and type as values from database database and optional measurement measurement.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

User Management

create_user

$cv = AE::cv;
$db->create_user(
    # raw query
    q => "CREATE USER jdoe WITH PASSWORD 'mypassword' WITH ALL PRIVILEGES",

    # or query created from arguments
    username => 'jdoe',
    password => 'mypassword',
    all_privileges => 1,

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to create user: @_");
    }
);
$cv->recv;

Creates user with username and password. If flag all_privileges is set to true created user will be granted cluster administration privileges.

Note: password will be automatically enclosed in single quotes.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

set_user_password

$cv = AE::cv;
$db->set_user_password(
    # raw query
    q => "SET PASSWORD FOR jdoe = 'otherpassword'",

    # or query created from arguments
    username => 'jdoe',
    password => 'otherpassword',

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to set password: @_");
    }
);
$cv->recv;

Sets password to password for the user identified by username.

Note: password will be automatically enclosed in single quotes.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

show_users

$cv = AE::cv;
$db->show_users(
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list users: @_");
    }
);
my @users = $cv->recv;
for my $u ( @users ) {
    print "Name: $u->{user}\n";
    print "Admin?: $u->{admin}\n";
}

Returns a list of hash references with keys user and admin for each defined user.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

grant_privileges

$cv = AE::cv;
$db->grant_privileges(
    # raw query
    q => "GRANT ALL ON mydb TO jdoe",

    # or query created from arguments
    username => 'jdoe',

    # privileges at single database
    database => 'mydb',
    access => 'ALL',

    # or to grant cluster administration privileges
    all_privileges => 1,

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to grant privileges: @_");
    }
);
$cv->recv;

Grants to user username access access on database database. If flag all_privileges is set it grants cluster administration privileges instead.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

show_grants

$cv = AE::cv;
$db->show_grants(
    # raw query
    q => "SHOW GRANTS FOR jdoe",

    # or query created from arguments
    username => 'jdoe',

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list users: @_");
    }
);
my @grants = $cv->recv;
for my $g ( @grants ) {
    print "Database: $g->{database}\n";
    print "Privilege: $g->{privilege}\n";
}

Returns a list of hash references with keys database and privilege describing the privileges granted for database to given user.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

revoke_privileges

$cv = AE::cv;
$db->revoke_privileges(
    # raw query
    q => "REVOKE WRITE ON mydb FROM jdoe",

    # or query created from arguments
    username => 'jdoe',

    # privileges at single database
    database => 'mydb',
    access => 'WRITE',

    # or to revoke cluster administration privileges
    all_privileges => 1,

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to revoke privileges: @_");
    }
);
$cv->recv;

Revokes from user username access access on database database. If flag all_privileges is set it revokes cluster administration privileges instead.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

drop_user

$cv = AE::cv;
$db->drop_user(
    # raw query
    q => "DROP USER jdoe",

    # or query created from arguments
    username => 'jdoe',

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to drop user: @_");
    }
);
$cv->recv;

Drops user username.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

Continuous Queries

create_continuous_query

$cv = AE::cv;
$db->create_continuous_query(
    # raw query
    q => 'CREATE CONTINUOUS QUERY per5minutes ON mydb'
        .' RESAMPLE EVERY 10s FOR 10m'
        .' BEGIN'
        .' SELECT MEAN(value) INTO "cpu_load_per5m" FROM cpu_load GROUP BY time(5m)'
        .' END',

    # or query created from arguments
    database => 'mydb',
    name => 'per5minutes',
    every => '10s',
    for => '2m',
    query => 'SELECT MEAN(value) INTO "cpu_load_per5m" FROM cpu_load GROUP BY time(5m)',

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to create continuous query: @_");
    }
);
$cv->recv;

Creates new continuous query named by name on database database using query query. Optional every and for define the resampling times.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

drop_continuous_query

$cv = AE::cv;
$db->drop_continuous_query(
    # raw query
    q => 'DROP CONTINUOUS QUERY per5minutes ON mydb',

    # or query created from arguments
    database => 'mydb',
    name => 'per5minutes',

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to drop continuous query: @_");
    }
);
$cv->recv;

Drops continuous query named by name on database database.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

show_continuous_queries

$cv = AE::cv;
$db->show_continuous_queries(
    database => 'mydb',

    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list continuous queries: @_");
    }
);
my $continuous_queries = $cv->recv;
for my $database ( sort keys %{ $continuous_queries } ) {
    print "Database: $database\n";
    for my $s ( @{ $continuous_queries->{$database} } ) {
        print " Name: $s->{name}\n";
        print " Query: $s->{query}\n";
    }
}

Returns a list of hash references with keys name and query for each continuous query defined on database database.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

Kapacitor integration

Subscriptions tell InfluxDB to send all the data it receives to Kapacitor.

create_subscription

$cv = AE::cv;
$db->create_subscription(
    # raw query
    q => 'CREATE SUBSCRIPTION alldata ON "mydb"."default"'
        ." DESTINATIONS ANY 'udp://h1.example.com:9090', 'udp://h2.example.com:9090'",

    # or query created from arguments
    name => q{alldata},
    database => q{"mydb"},
    rp => q{"default"},
    mode => "ANY",
    destinations => [
        q{'udp://h1.example.com:9090'},
        q{'udp://h2.example.com:9090'}
    ],
    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to create subscription: @_");
    }
);
$cv->recv;

Creates a new subscription name on database database with retention policy rp with mode mode to destinations provided as destinations. The destinations could be either a single scalar value or array reference to a list of host.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

show_subscriptions

$cv = AE::cv;
$db->show_subscriptions(
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to list shards: @_");
    }
);
my $subscriptions = $cv->recv;
for my $database ( sort keys %{ $subscriptions } ) {
    print "Database: $database\n";
    for my $s ( @{ $subscriptions->{$database} } ) {
        print " Name: $s->{name}\n";
        print " Retention Policy: $s->{retention_policy}\n";
        print " Mode: $s->{mode}\n";
        print " Destinations:\n";
        print "  * $_\n" for @{ $s->{destinations} || [] };
    }
}

Returns a hash reference with database name as keys and their shards as values.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

drop_subscription

$cv = AE::cv;
$db->drop_subscription(
    # raw query
    q => 'DROP SUBSCRIPTION "alldata" ON "mydb"."default"',

    # or query created from arguments
    name => q{"alldata"},
    database => q{"mydb"},
    rp => q{"default"},

    # callbacks
    on_success => $cv,
    on_error => sub {
        $cv->croak("Failed to drop subscription: @_");
    }
);
$cv->recv;

Drops subscription name on database database with retention policy rp.

The required on_success code reference is executed if request was successful, otherwise executes the required on_error code reference.

Other

query

$cv = AE::cv;
$db->query(
    method => 'GET',
    query => {
        db => 'mydb',
        q => 'SELECT * FROM cpu_load',
    },
    on_response => $cv,
);
my ($response_data, $response_headers) = $cv->recv;

Executes an arbitrary query using provided in query arguments.

The required on_response code reference is executed with the raw response data and headers as parameters.

CAVEATS

Following the optimistic nature of InfluxDB this modules does not validate any arguments. Also quoting and escaping special characters is to be done by the user of this library.

AUTHOR

Alex J. G. Burzyński <ajgb@cpan.org>

COPYRIGHT AND LICENSE

This software is copyright (c) 2016 by Alex J. G. Burzyński <ajgb@cpan.org>.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.