NAME
AnyEvent::InfluxDB - An asynchronous library for InfluxDB time-series database
VERSION
version 1.0.2.0
SYNOPSIS
use EV;
use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;
use AnyEvent::InfluxDB;
use Monitoring::Plugin::Performance;
my $db = AnyEvent::InfluxDB->new(
server => 'http://localhost:8086',
username => 'admin',
password => 'password',
);
my $hdl;
tcp_server undef, 8888, sub {
my ($fh, $host, $port) = @_;
$hdl = AnyEvent::Handle->new(
fh => $fh,
);
$hdl->push_read(
line => sub {
my (undef, $line) = @_;
# Disk\t/=382MB;15264;15269;; /var=218MB;9443;9448
my ($measurement, $perfstring) = split(/\t/, $line);
my @perfdata
= Monitoring::Plugin::Performance->parse_perfstring($perfstring);
$db->write(
database => 'mydb',
data => [
map {
+{
measurement => $measurement,
tags => {
label => $_->label,
},
fields => {
value => $_->value,
uom => '"'. $_->uom .'"',
},
}
} @perfdata
],
on_success => sub { print "$line written\n"; },
on_error => sub { print "$line error: @_\n"; },
);
$hdl->on_drain(
sub {
$hdl->fh->close;
undef $hdl;
}
);
},
);
};
EV::run;
DESCRIPTION
Asynchronous client library for InfluxDB time-series database https://influxdb.com.
This version is meant to be used with InfluxDB v1.0.0 or newer.
METHODS
new
my $db = AnyEvent::InfluxDB->new(
server => 'http://localhost:8086',
# authenticate using Basic credentials
username => 'admin',
password => 'password',
# or use JWT token
jwt => 'JWT_TOKEN_BLOB'
);
Returns an object representing the given InfluxDB server
connected using optionally provided username username
and password password
.
Default value of server
is http://localhost:8086
.
If the server protocol is https
then by default no validation of remote host certificate is performed. This can be changed by setting ssl_options
parameter with any options accepted by AnyEvent::TLS.
my $db = AnyEvent::InfluxDB->new(
...
ssl_options => {
verify => 1,
verify_peername => 'https',
ca_file => '/path/to/cacert.pem',
}
);
As a debugging aid the on_request
code reference may also be provided. It will be executed before each request with the method name, url and POST data if set.
my $db = AnyEvent::InfluxDB->new(
...
on_request => sub {
my ($method, $url, $post_data) = @_;
print "$method $url\n";
print "$post_data\n" if $post_data;
}
);
ping
$cv = AE::cv;
$db->ping(
wait_for_leader => 2,
on_success => $cv,
on_error => sub {
$cv->croak("Failed to ping cluster leader: @_");
}
);
my $version = $cv->recv;
Checks the leader of the cluster to ensure that the leader is available and ready. The optional parameter wait_for_leader
specifies the number of seconds to wait before returning a response.
The required on_success
code reference is executed if request was successful with the value of X-Influxdb-Version
response header as argument, otherwise executes the required on_error
code reference with the value of Reason
response header as argument.
Managing Data
write
$cv = AE::cv;
$db->write(
database => 'mydb',
precision => 's',
rp => 'last_day',
consistency => 'quorum',
data => [
# line protocol formatted
'cpu_load,host=server02,region=eu-east sensor="top",value=0.64 1456097956',
# or as a hash
{
measurement => 'cpu_load',
tags => {
host => 'server02',
region => 'eu-east',
},
fields => {
value => '0.64',
sensor => q{"top"},
},
time => time()
}
],
on_success => $cv,
on_error => sub {
$cv->croak("Failed to write data: @_");
}
);
$cv->recv;
Writes time-series data data
to database database
with optional parameters: retention policy rp
, time precision precision
and consistency consistency
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
The data
can be specified as single scalar value or hash reference with required keys measurement
and fields
and optional tags
and time
. Both can be also mixed and matched within an array reference.
Scalar values are expected to be formatted using InfluxDB line protocol.
All special characters need to be escaped. In that case you might want to use InfluxDB::LineProtocol:
use InfluxDB::LineProtocol qw(dataline);
...
$db->write(
database => 'mydb',
precision => 'n',
data => [
dataline('CPU Load', 0.64, { "Region of the World" => "Eastern Europe", codename => "eu-east" }, 1437868012260500137),
# which translates to
'CPU\ Load,Region\ of\ the\ World=Eastern\ Europe,codename=eu-east value=0.64 1437868012260500137',
],
...
);
Querying Data
select
$cv = AE::cv;
$db->select(
database => 'mydb',
# return time in Unix epoch format
epoch => "s",
# raw query
q => "SELECT count(value) FROM cpu_load"
." WHERE region = 'eu-east' AND time > now() - 14d"
." GROUP BY time(1d) fill(none)"
." ORDER BY time DESC"
." LIMIT 10 OFFSET 3",
# or query created from arguments
fields => 'count(value)',
measurement => 'cpu_load',
where => "region = 'eu-east' AND time > now() - 14d",
group_by => 'time(1d)',
fill => 'none',
order_by => 'time DESC',
limit => 10,
offset => 3,
# downsample result to another database, retention policy and measurement
into => 'otherdb."default".cpu_load_per5m',
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to select data: @_");
}
);
my $results = $cv->recv;
for my $row ( @{ $results } ) {
print "Measurement: $row->{name}\n";
print "Values:\n";
for my $value ( @{ $row->{values} || [] } ) {
print " * $_ = $value->{$_}\n" for keys %{ $value || {} };
}
}
Executes a select query on database database
created from provided arguments measurement measurement
, fields to select fields
, optional where
clause, grouped by group_by
and empty values filled with fill
, ordered by order_by
with number of results limited to limit
with offset offset
. To limit number of returned series use slimit
with offset soffset
. If into
parameter is provided the result of the query will be copied to specified measurement. If epoch
is provided the returned time
value will be in Unix epoch format. Optional chunk_size
can be provided to override the default value of 10,000 datapoints.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
Database Management
create_database
$cv = AE::cv;
$db->create_database(
# raw query
q => "CREATE DATABASE mydb WITH DURATION 7d REPLICATION 3 SHARD DURATION 30m NAME oneweek",
# or query created from arguments
database => "mydb",
# retention policy parameters
duration => '7d',
shard_duration => '30m',
replication => 3,
name => 'oneweek',
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to create database: @_");
}
);
$cv->recv;
Creates database specified by database
argument.
If one of retention policy parameters is specified then the database will be created with that retention policy as default - see "Retention Policy Management" for more details.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
drop_database
$cv = AE::cv;
$db->drop_database(
# raw query
q => "DROP DATABASE mydb",
# or query created from arguments
database => "mydb",
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to drop database: @_");
}
);
$cv->recv;
Drops database specified by database
argument.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
drop_series
$cv = AE::cv;
$db->drop_series(
database => 'mydb',
# raw query
q => "DROP SERIES FROM cpu_load WHERE host = 'server02'",
# or query created from arguments
measurement => 'cpu_load',
where => q{host = 'server02'},
# multiple measurements can also be specified
measurements => [qw( cpu_load cpu_temp )],
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to drop series: @_");
}
);
$cv->recv;
Drops series from single measurement measurement
(or many using measurements
) and/or filtered by where
clause from database database
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
delete_series
$cv = AE::cv;
$db->delete_series(
database => 'mydb',
# raw query
q => "DELETE FROM cpu_load WHERE host = 'server02' AND time < '2016-01-01'",
# or query created from arguments
measurement => 'cpu_load',
where => q{host = 'server02' AND time < '2016-01-01'},
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to delete series: @_");
}
);
$cv->recv;
Deletes all points from a measurement measurement
and/or filtered by where
clause from database database
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
drop_measurement
$cv = AE::cv;
$db->drop_measurement(
database => 'mydb',
# raw query
q => "DROP MEASUREMENT cpu_load",
# or query created from arguments
measurement => 'cpu_load',
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to drop measurement: @_");
}
);
$cv->recv;
Drops measurement measurement
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_shards
$cv = AE::cv;
$db->show_shards(
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list shards: @_");
}
);
my $shards = $cv->recv;
for my $database ( sort keys %{ $shards } ) {
print "Database: $database\n";
for my $s ( @{ $shards->{$database} } ) {
print " * $_: $s->{$_}\n" for sort keys %{ $s };
}
}
Returns a hash reference with database name as keys and their shards as values.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_shard_groups
$cv = AE::cv;
$db->show_shard_groups(
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list shard groups: @_");
}
);
my @shard_groups = $cv->recv;
for my $sg ( @shard_groups ) {
print "ID: $sg->{id}\n";
print "Database: $sg->{database}\n";
print "Retention Policy: $sg->{retention_policy}\n";
print "Start Time: $sg->{start_time}\n";
print "End Time: $sg->{end_time}\n";
print "Expiry Time: $sg->{expiry_time}\n";
}
Returns a list of hash references with keys id
, database
, retention_policy
, start_time
, end_time
and expiry_time
for each shard group.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
drop_shard
$cv = AE::cv;
$db->drop_shard(
database => 'mydb',
# raw query
q => "DROP SHARD 1",
# or query created from arguments
id => 1,
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to drop shard: @_");
}
);
$cv->recv;
Drops shard identified by id number id
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_queries
$cv = AE::cv;
$db->show_queries(
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list queries: @_");
}
);
my @queries = $cv->recv;
for my $q ( @queries ) {
print "ID: $q->{qid}\n";
print "Query: $q->{query}\n";
print "Database: $q->{database}\n";
print "Duration: $q->{duration}\n";
}
Returns a list of hash references with keys qid
, query
, database
, duration
for all currently running queries.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
kill_query
$cv = AE::cv;
$db->kill_query(
id => 36,
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to kill query: @_");
}
);
$cv->recv;
Stops a running query identified by id number id
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
Retention Policy Management
create_retention_policy
$cv = AE::cv;
$db->create_retention_policy(
# raw query
q => "CREATE RETENTION POLICY last_day ON mydb DURATION 1d REPLICATION 1",
# or query created from arguments
name => 'last_day',
database => 'mydb',
duration => '1d',
shard_duration => '168h',
replication => 1,
default => 0,
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to create retention policy: @_");
}
);
$cv->recv;
Creates new retention policy named by name
on database database
with duration duration
, shard group duration shard_duration
and replication factor replication
. If default
is provided and true the created retention policy becomes the default one.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
alter_retention_policy
$cv = AE::cv;
$db->alter_retention_policy(
# raw query
q => "ALTER RETENTION POLICY last_day ON mydb DURATION 1d REPLICATION 1 DEFAULT",
# or query created from arguments
name => 'last_day',
database => 'mydb',
duration => '1d',
shard_duration => '12h',
replication => 1,
default => 1,
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to alter retention policy: @_");
}
);
$cv->recv;
Modifies retention policy named by name
on database database
. At least one of duration duration
, replication factor replication
or flag default
must be set.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
drop_retention_policy
$cv = AE::cv;
$db->drop_retention_policy(
# raw query
q => "DROP RETENTION POLICY last_day ON mydb",
# or query created from arguments
name => "last_day",
database => "mydb",
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to drop retention policy: @_");
}
);
$cv->recv;
Drops retention policy named by name
on database database
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
Schema Exploration
show_databases
$cv = AE::cv;
$db->show_databases(
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list databases: @_");
}
);
my @db_names = $cv->recv;
print "$_\n" for @db_names;
Returns list of known database names.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_retention_policies
$cv = AE::cv;
$db->show_retention_policies(
# raw query
q => "SHOW RETENTION POLICIES ON mydb",
# or query created from arguments
database => 'mydb',
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list retention policies: @_");
}
);
my @retention_policies = $cv->recv;
for my $rp ( @retention_policies ) {
print "Name: $rp->{name}\n";
print "Duration: $rp->{duration}\n";
print "Shard group duration: $rp->{shardGroupDuration}\n";
print "Replication factor: $rp->{replicaN}\n";
print "Default?: $rp->{default}\n";
}
Returns a list of hash references with keys name
, duration
, shardGroupDuration
, replicaN
and default
for each retention policy defined on database database
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_series
$cv = AE::cv;
$db->show_series(
database => 'mydb',
# raw query
q => "SHOW SERIES FROM cpu_load"
." WHERE host = 'server02'"
." ORDER BY region"
." LIMIT 10 OFFSET 3",
# or query created from arguments
measurement => 'cpu_load',
where => q{host = 'server02'},
order_by => 'region',
limit => 10,
offset => 3,
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list series: @_");
}
);
my @series = $cv->recv;
print "$_\n" for @series;
Returns names of series from database database
using optional measurement measurement
and optional where
clause.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_measurements
$cv = AE::cv;
$db->show_measurements(
database => 'mydb',
# raw query
q => "SHOW MEASUREMENTS WITH MEASUREMENT =~ /cpu_load.*/"
." WHERE host = 'server02'"
." ORDER BY region"
." LIMIT 10 OFFSET 3",
# or query created from arguments
measurement => '/cpu_load.*/',
where => q{host = 'server02'},
order_by => 'region',
limit => 10,
offset => 3,
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list measurements: @_");
}
);
my @measurements = $cv->recv;
print "$_\n" for @measurements;
Returns names of measurements from database database
, optionally filtered with regular expression measurement
and optional where
clause. If the measurement
is not enclosed in //
then it will be treated as name of the measurement.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_tag_keys
$cv = AE::cv;
$db->show_tag_keys(
database => 'mydb',
# raw query
q => "SHOW TAG KEYS FROM cpu_load WHERE host = 'server02' LIMIT 10 OFFSET 3",
# or query created from arguments
measurement => 'cpu_load',
where => q{host = 'server02'},
limit => 10,
offset => 3,
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list tag keys: @_");
}
);
my $tag_keys = $cv->recv;
for my $measurement ( sort keys %{ $tag_keys } ) {
print "Measurement: $measurement\n";
print " * $_\n" for @{ $tag_keys->{$measurement} };
}
Returns a hash reference with measurements as keys and their unique tag keys as values from database database
and optional measurement measurement
, optionally filtered by the where
clause, grouped by group_by
with number of results limited to limit
with offset offset
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_tag_values
$cv = AE::cv;
$db->show_tag_values(
database => 'mydb',
# raw query
q => q{SHOW TAG VALUES FROM cpu_load WITH KEY = "host"},
# or query created from arguments
measurement => 'cpu_load',
# single key
key => q{"host"},
# or a list of keys
keys => [qw( "host" "region" )],
limit => 10,
offset => 3,
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list tag values: @_");
}
);
my $tag_values = $cv->recv;
for my $measurement ( sort keys %{ $tag_values } ) {
print "Measurement: $measurement\n";
for my $tag_key ( sort keys %{ $tag_values->{$measurement} } ) {
print " Tag key: $tag_key\n";
print " * $_\n" for @{ $tag_values->{$measurement}->{$tag_key} };
}
}
Returns a hash reference with measurements as keys and their unique tag values as values from database database
and optional measurement measurement
from a single tag key key
or a list of tag keys keys
with number of results limited to limit
with offset offset
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_field_keys
$cv = AE::cv;
$db->show_field_keys(
database => 'mydb',
# raw query
q => "SHOW FIELD KEYS FROM cpu_load",
# or query created from arguments
measurement => 'cpu_load',
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list field keys: @_");
}
);
my $field_keys = $cv->recv;
for my $measurement ( sort keys %{ $field_keys } ) {
print "Measurement: $measurement\n";
for my $field ( @{ $field_keys->{$measurement} } ) {
print " Key: $field->{key}\n";
print " Type: $field->{type}\n";
}
}
Returns a hash reference with measurements as keys and their field keys names and type as values from database database
and optional measurement measurement
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
User Management
create_user
$cv = AE::cv;
$db->create_user(
# raw query
q => "CREATE USER jdoe WITH PASSWORD 'mypassword' WITH ALL PRIVILEGES",
# or query created from arguments
username => 'jdoe',
password => 'mypassword',
all_privileges => 1,
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to create user: @_");
}
);
$cv->recv;
Creates user with username
and password
. If flag all_privileges
is set to true, the created user will be granted cluster administration privileges.
Note: password
will be automatically enclosed in single quotes.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
set_user_password
$cv = AE::cv;
$db->set_user_password(
# raw query
q => "SET PASSWORD FOR jdoe = 'otherpassword'",
# or query created from arguments
username => 'jdoe',
password => 'otherpassword',
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to set password: @_");
}
);
$cv->recv;
Sets password to password
for the user identified by username
.
Note: password
will be automatically enclosed in single quotes.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_users
$cv = AE::cv;
$db->show_users(
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list users: @_");
}
);
my @users = $cv->recv;
for my $u ( @users ) {
print "Name: $u->{user}\n";
print "Admin?: $u->{admin}\n";
}
Returns a list of hash references with keys user
and admin
for each defined user.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
grant_privileges
$cv = AE::cv;
$db->grant_privileges(
# raw query
q => "GRANT ALL ON mydb TO jdoe",
# or query created from arguments
username => 'jdoe',
# privileges at single database
database => 'mydb',
access => 'ALL',
# or to grant cluster administration privileges
all_privileges => 1,
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to grant privileges: @_");
}
);
$cv->recv;
Grants to user username
access access
on database database
. If flag all_privileges
is set it grants cluster administration privileges instead.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_grants
$cv = AE::cv;
$db->show_grants(
# raw query
q => "SHOW GRANTS FOR jdoe",
# or query created from arguments
username => 'jdoe',
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list grants: @_");
}
);
my @grants = $cv->recv;
for my $g ( @grants ) {
print "Database: $g->{database}\n";
print "Privilege: $g->{privilege}\n";
}
Returns a list of hash references with keys database
and privilege
describing the privileges granted for database to given user.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
revoke_privileges
$cv = AE::cv;
$db->revoke_privileges(
# raw query
q => "REVOKE WRITE ON mydb FROM jdoe",
# or query created from arguments
username => 'jdoe',
# privileges at single database
database => 'mydb',
access => 'WRITE',
# or to revoke cluster administration privileges
all_privileges => 1,
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to revoke privileges: @_");
}
);
$cv->recv;
Revokes from user username
access access
on database database
. If flag all_privileges
is set it revokes cluster administration privileges instead.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
drop_user
$cv = AE::cv;
$db->drop_user(
# raw query
q => "DROP USER jdoe",
# or query created from arguments
username => 'jdoe',
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to drop user: @_");
}
);
$cv->recv;
Drops user username
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
Continuous Queries
create_continuous_query
$cv = AE::cv;
$db->create_continuous_query(
# raw query
q => 'CREATE CONTINUOUS QUERY per5minutes ON mydb'
.' RESAMPLE EVERY 10s FOR 10m'
.' BEGIN'
.' SELECT MEAN(value) INTO "cpu_load_per5m" FROM cpu_load GROUP BY time(5m)'
.' END',
# or query created from arguments
database => 'mydb',
name => 'per5minutes',
every => '10s',
for => '2m',
query => 'SELECT MEAN(value) INTO "cpu_load_per5m" FROM cpu_load GROUP BY time(5m)',
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to create continuous query: @_");
}
);
$cv->recv;
Creates new continuous query named by name
on database database
using query query
. Optional every
and for
define the resampling times.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
drop_continuous_query
$cv = AE::cv;
$db->drop_continuous_query(
# raw query
q => 'DROP CONTINUOUS QUERY per5minutes ON mydb',
# or query created from arguments
database => 'mydb',
name => 'per5minutes',
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to drop continuous query: @_");
}
);
$cv->recv;
Drops continuous query named by name
on database database
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_continuous_queries
$cv = AE::cv;
$db->show_continuous_queries(
database => 'mydb',
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list continuous queries: @_");
}
);
my $continuous_queries = $cv->recv;
for my $database ( sort keys %{ $continuous_queries } ) {
print "Database: $database\n";
for my $s ( @{ $continuous_queries->{$database} } ) {
print " Name: $s->{name}\n";
print " Query: $s->{query}\n";
}
}
Returns a hash reference with database names as keys and, as values, lists of hash references with keys name
and query
for each continuous query defined on database database
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
Kapacitor integration
Subscriptions tell InfluxDB to send all the data it receives to Kapacitor.
create_subscription
$cv = AE::cv;
$db->create_subscription(
# raw query
q => 'CREATE SUBSCRIPTION alldata ON "mydb"."default"'
." DESTINATIONS ANY 'udp://h1.example.com:9090', 'udp://h2.example.com:9090'",
# or query created from arguments
name => q{alldata},
database => q{"mydb"},
rp => q{"default"},
mode => "ANY",
destinations => [
q{'udp://h1.example.com:9090'},
q{'udp://h2.example.com:9090'}
],
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to create subscription: @_");
}
);
$cv->recv;
Creates a new subscription name
on database database
with retention policy rp
with mode mode
to destinations provided as destinations
. The destinations
could be either a single scalar value or an array reference to a list of hosts.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
show_subscriptions
$cv = AE::cv;
$db->show_subscriptions(
on_success => $cv,
on_error => sub {
$cv->croak("Failed to list subscriptions: @_");
}
);
my $subscriptions = $cv->recv;
for my $database ( sort keys %{ $subscriptions } ) {
print "Database: $database\n";
for my $s ( @{ $subscriptions->{$database} } ) {
print " Name: $s->{name}\n";
print " Retention Policy: $s->{retention_policy}\n";
print " Mode: $s->{mode}\n";
print " Destinations:\n";
print " * $_\n" for @{ $s->{destinations} || [] };
}
}
Returns a hash reference with database name as keys and their subscriptions as values.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
drop_subscription
$cv = AE::cv;
$db->drop_subscription(
# raw query
q => 'DROP SUBSCRIPTION "alldata" ON "mydb"."default"',
# or query created from arguments
name => q{"alldata"},
database => q{"mydb"},
rp => q{"default"},
# callbacks
on_success => $cv,
on_error => sub {
$cv->croak("Failed to drop subscription: @_");
}
);
$cv->recv;
Drops subscription name
on database database
with retention policy rp
.
The required on_success
code reference is executed if request was successful, otherwise executes the required on_error
code reference.
Other
query
$cv = AE::cv;
$db->query(
method => 'GET',
query => {
db => 'mydb',
q => 'SELECT * FROM cpu_load',
},
on_response => $cv,
);
my ($response_data, $response_headers) = $cv->recv;
Executes an arbitrary query using the arguments provided in query
.
The required on_response
code reference is executed with the raw response data and headers as parameters.
CAVEATS
Following the optimistic nature of InfluxDB this module does not validate any arguments. Also quoting and escaping special characters is to be done by the user of this library.
AUTHOR
Alex J. G. Burzyński <ajgb@cpan.org>
COPYRIGHT AND LICENSE
This software is copyright (c) 2016 by Alex J. G. Burzyński <ajgb@cpan.org>.
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.