NAME
Kafka::Connection - Object interface to connect to a Kafka cluster.
VERSION
This documentation refers to Kafka::Connection version 1.04.
SYNOPSIS
use 5.010;
use strict;
use warnings;

use Scalar::Util qw( blessed );
use Try::Tiny;

# A simple example of Kafka::Connection usage:
use Kafka::Connection;

# connect to local cluster with the defaults
my $connection;
try {
    $connection = Kafka::Connection->new( host => 'localhost' );
} catch {
    my $error = $_;
    if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
        warn $error->message, "\n", $error->trace->as_string, "\n";
        exit;
    } else {
        die $error;
    }
};

# Closes the connection and cleans up
$connection->close;
undef $connection;
DESCRIPTION
The main features of the Kafka::Connection class are:
Provides an API for communication with a Kafka 0.9+ cluster.
Encodes requests and decodes responses, and automatically selects or promotes a leader server from the Kafka cluster.
Provides information about the Kafka cluster.
EXPORT
The following constants are available for export:
%RETRY_ON_ERRORS
These are non-fatal errors. When one of them occurs, the metadata is refreshed from Kafka and another attempt is made to fetch data.
CONSTRUCTOR
new
Creates a Kafka::Connection object for interaction with a Kafka cluster. Returns the created Kafka::Connection object.
new() takes arguments in key-value pairs. The following arguments are currently recognized:
host => $host
$host is any Apache Kafka cluster host to connect to. It can be a hostname or an IP address in the "xx.xx.xx.xx" form.
Optional. Either host or broker_list must be supplied.
WARNING: Make sure that you always connect to brokers using EXACTLY the same address or host name as specified in the broker configuration (host.name in server.properties). Avoid relying on the default value (when host.name is commented out) in server.properties; always set an explicit value instead.
port => $port
Optional, default = $KAFKA_SERVER_PORT.
$port is the port number of the service we want to access (the Apache Kafka service). $port should be an integer.
$KAFKA_SERVER_PORT is the default Apache Kafka server port constant (9092) that can be imported from the Kafka module.
broker_list => $broker_list
Optional. $broker_list is a reference to an array of host:port or [IPv6_host]:port strings defining the list of Kafka servers. This list is used to locate the new leader if the server specified via the host => $host and port => $port arguments becomes unavailable. Either host or broker_list must be supplied.
ip_version => $ip_version
Specifies the IP version used to interpret a passed IP address and to resolve a host name.
Optional, undefined by default. When it is undefined, the IP version of an address is detected automatically and a host name is resolved into an IPv4 address.
timeout => $timeout
Optional, default = $Kafka::REQUEST_TIMEOUT.
$timeout specifies how long we wait for the remote server to respond. $timeout is in seconds and can be a positive integer or a floating-point number no greater than the maximum positive int32 value.
Special behavior when timeout is set to undef:
Alarms are not used internally (namely when performing gethostbyname).
The default $REQUEST_TIMEOUT is used for the rest of the IO operations.
SEND_MAX_ATTEMPTS => $attempts
Optional, int32 signed integer, default = $Kafka::SEND_MAX_ATTEMPTS.
In some circumstances (leader is temporarily unavailable, outdated metadata, etc.) we may fail to send a message. This property specifies the maximum number of attempts to send a message. $attempts should be an integer.
RETRY_BACKOFF => $backoff
Optional, default = $Kafka::RETRY_BACKOFF.
Since leader election takes a bit of time, this property specifies the amount of time, in milliseconds, that the producer waits before refreshing the metadata. $backoff should be an integer.
AutoCreateTopicsEnable => $mode
Optional, default value is 0 (false).
Kafka BUG "[KAFKA-1124]" (Fixed in Kafka 0.8.2): AutoCreateTopicsEnable controls how this module handles the first access to a non-existent topic when auto.create.topics.enable in the server configuration is true. If AutoCreateTopicsEnable is false (default), the first access to a non-existent topic produces an exception; however, the topic is created and subsequent attempts to access it will succeed.
If AutoCreateTopicsEnable is true, this module waits (according to the SEND_MAX_ATTEMPTS and RETRY_BACKOFF properties) until the topic is created, to avoid errors on the first access to a non-existent topic.
If auto.create.topics.enable in the server configuration is false, this setting has no effect.
MaxLoggedErrors => $number
Optional, default value is 100.
Defines the maximum number of the most recent non-fatal errors that are kept in the log. Use the "nonfatal_errors" method to access those errors.
dont_load_supported_api_versions => $boolean
Optional, default value is 0 (false).
If set to false, when communicating with a broker, the client will automatically try to find out the best version numbers to use for each of the API endpoints.
If set to true, the client will always use $Kafka::Protocol::DEFAULT_APIVERSION as the API version.
WARNING: API versions are supported starting from Kafka 0.10. Set this parameter to true if you're connecting to 0.9.
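As a rough illustration of how several of the arguments above fit together, the following sketch builds an argument list and passes it to new(). The broker host names and the chosen values are hypothetical, and the connection is attempted only if the Kafka distribution can be loaded; treat it as a starting point rather than a recommended configuration.

```perl
use strict;
use warnings;

# Hypothetical broker addresses; use the exact host names that are
# configured on your brokers (see the WARNING under host).
my %args = (
    broker_list       => [ 'kafka1.example.com:9092', 'kafka2.example.com:9092' ],
    timeout           => 1.5,    # seconds, may be fractional
    SEND_MAX_ATTEMPTS => 5,      # illustrative value
    RETRY_BACKOFF     => 200,    # milliseconds, illustrative value
    MaxLoggedErrors   => 10,     # illustrative value
);

# Either host or broker_list must be supplied.
die "host or broker_list is required\n"
    unless defined $args{host} || defined $args{broker_list};

# Try to connect only when Kafka::Connection is installed.
if ( eval { require Kafka::Connection; 1 } ) {
    my $connection = eval { Kafka::Connection->new( %args ) };
    if ( $connection ) {
        print join( ', ', $connection->get_known_servers ), "\n";
        $connection->close;
    } else {
        warn "Cannot create connection: $@\n";
    }
}
```

Keeping the arguments in a hash makes it easy to check the "host or broker_list" requirement before calling the constructor.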
METHODS
The following methods are defined for the Kafka::Connection class:
get_known_servers
Returns the list of known Kafka servers (in host:port or [IPv6_host]:port format).
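Server identifiers in the host:port and [IPv6_host]:port forms can be taken apart with a small helper. The function below is a hypothetical sketch (split_server is not part of Kafka::Connection) that only illustrates the two documented string forms.

```perl
use strict;
use warnings;

# Hypothetical helper: split a "host:port" or "[IPv6_host]:port"
# string into ( host, port ). Returns an empty list when the string
# matches neither form.
sub split_server {
    my ( $server ) = @_;
    return ( $1, $2 ) if $server =~ /\A\[([^\]]+)\]:(\d+)\z/;    # [IPv6_host]:port
    return ( $1, $2 ) if $server =~ /\A([^:\[\]]+):(\d+)\z/;     # host:port
    return;
}

my ( $host, $port ) = split_server( '[2001:db8::1]:9092' );
print "$host $port\n";
```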
get_metadata( $topic )
If $topic is present, it must be a non-false string of non-zero length.
If $topic is absent, this method returns metadata for all topics.
Updates the Kafka cluster's metadata description and returns a hash reference to the metadata, which can be schematically described as:
{
    TopicName => {
        Partition => {
            'Leader'   => ...,
            'Replicas' => [
                ...,
            ],
            'Isr'      => [
                ...,
            ],
        },
        ...,
    },
    ...,
}
Consult Kafka "Wire protocol" documentation for more details about metadata structure.
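A hash of the shape sketched above can be walked with ordinary nested loops. The example below uses hand-written sample data (the topic name and broker ids are hypothetical) instead of a live get_metadata() call, and assumes that Leader is a single broker id while Replicas and Isr are array references of broker ids.

```perl
use strict;
use warnings;

# Hypothetical sample data in the documented shape; in real code this
# would come from $connection->get_metadata.
my $metadata = {
    mytopic => {
        0 => { Leader => 1, Replicas => [ 1, 2 ], Isr => [ 1, 2 ] },
        1 => { Leader => 2, Replicas => [ 2, 1 ], Isr => [ 2 ] },
    },
};

# Build one summary line per topic/partition pair, in sorted order.
sub summarize_metadata {
    my ( $md ) = @_;
    my @lines;
    for my $topic ( sort keys %$md ) {
        for my $partition ( sort { $a <=> $b } keys %{ $md->{$topic} } ) {
            my $info = $md->{$topic}{$partition};
            push @lines, sprintf '%s/%d: leader %s, %d of %d replicas in sync',
                $topic, $partition, $info->{Leader},
                scalar @{ $info->{Isr} }, scalar @{ $info->{Replicas} };
        }
    }
    return @lines;
}

print "$_\n" for summarize_metadata( $metadata );
```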
is_server_known( $server )
Returns true if $server (host:port or [IPv6_host]:port) is known in the cluster.
receive_response_to_request( $request, $compression_codec )
$request
$request is a reference to the hash representing the structure of the request.
This method encodes $request, passes it to the leader of the cluster, receives the reply, decodes it, and returns it in the form of a hash reference.
WARNING: This method should be considered private and should not be called by an end user.
In order to achieve better performance, this method does not perform argument validation.
$compression_codec
Optional. $compression_codec sets the required type of $messages compression, if compression is desired.
Supported codecs: $COMPRESSION_NONE, $COMPRESSION_GZIP, $COMPRESSION_SNAPPY.
exists_topic_partition( $topic, $partition )
Returns true if the metadata contains information about specified combination of topic and partition. Otherwise returns false.
exists_topic_partition() takes the arguments $topic and $partition.
close_connection( $server )
Closes the connection with $server (defined as host:port or [IPv6_host]:port).
close
Closes connection with all known Kafka servers.
cluster_errors
Returns a reference to a hash.
Each hash key is the identifier of the server (host:port or [IPv6_host]:port), and the value is the last communication error with that server.
An empty hash is returned if there were no communication errors.
nonfatal_errors
Returns a reference to an array of the last non-fatal errors.
The maximum number of entries is set using the MaxLoggedErrors parameter of the constructor.
A reference to an empty array is returned if there were no non-fatal errors or if the MaxLoggedErrors parameter is set to 0.
clear_nonfatals
Clears the array of the last non-fatal errors.
A reference to the empty array is returned because there are no non-fatal errors now.
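The two error accessors can be combined into a simple report. The sketch below works with any object that provides cluster_errors and nonfatal_errors as described above; My::StubConnection is a hypothetical stand-in so that the example runs without a broker, and it assumes that the stored errors can be printed as strings.

```perl
use strict;
use warnings;

# Collect printable lines from an object offering the cluster_errors
# and nonfatal_errors methods. Assumes the errors stringify sensibly.
sub error_report {
    my ( $connection ) = @_;
    my @lines;
    my $cluster = $connection->cluster_errors;    # hash ref: server => last error
    push @lines, "$_: $cluster->{$_}" for sort keys %$cluster;
    push @lines, "non-fatal: $_" for @{ $connection->nonfatal_errors };
    return @lines;
}

# Hypothetical stub that mimics the two documented return shapes.
{
    package My::StubConnection;
    sub new             { return bless {}, shift }
    sub cluster_errors  { return { 'localhost:9092' => 'Cannot bind' } }
    sub nonfatal_errors { return [ 'Leader not found' ] }
}

print "$_\n" for error_report( My::StubConnection->new );
```

With a real Kafka::Connection object, the same function could be called after a failed request to see which servers reported problems.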
DIAGNOSTICS
When an error is detected, an exception represented by an object of the Kafka::Exception::Connection class is thrown (see Kafka::Exceptions).
code and a more descriptive message provide information about the exception. Consult the documentation of Kafka::Exceptions for the list of all available methods.
Here is the list of possible error messages that Kafka::Connection may produce:
Invalid argument - An invalid argument was provided to the new constructor or to another method.
Cannot send - Request cannot be sent to Kafka.
Cannot receive - Response cannot be received from Kafka.
Cannot bind - A successful TCP connection cannot be established on the given host and port.
Cannot get metadata - Error detected during parsing of the response from Kafka.
Leader not found - Failed to locate the leader of the Kafka cluster.
Mismatch CorrelationId - Mismatch of the CorrelationId of request and response.
There are no known brokers - Failed to locate a cluster broker.
Cannot get metadata - Received metadata is incorrect or missing.
Debug mode
Debug output can be enabled by passing the desired level via an environment variable in one of the following ways:
PERL_KAFKA_DEBUG=1 - debug is enabled for the whole Kafka package.
PERL_KAFKA_DEBUG=Connection:1 - debug is enabled for Kafka::Connection only.
Kafka::Connection prints information about non-fatal errors, re-connection attempts and such to STDERR when the debug level is set to 1 or higher.
SEE ALSO
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules.
Kafka::Connection - interface to connect to a Kafka cluster.
Kafka::Producer - interface for producing client.
Kafka::Consumer - interface for consuming client.
Kafka::Message - interface to access Kafka message properties.
Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.
Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol.
Kafka::IO - low-level interface for communication with Kafka server.
Kafka::Exceptions - module designated to handle Kafka exceptions.
Kafka::Internals - internal constants and functions used by several package modules.
A wealth of detail about the Apache Kafka and the Kafka Protocol:
Main page at http://kafka.apache.org/
Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
SOURCE CODE
Kafka package is hosted on GitHub: https://github.com/TrackingSoft/Kafka
AUTHOR
Sergey Gladkov
Please use GitHub project link above to report problems or contact authors.
CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Sergiy Zuban
Vlad Marchenko
Damien Krotkine
Greg Franklin
COPYRIGHT AND LICENSE
Copyright (C) 2012-2017 by TrackingSoft LLC.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.