NAME
Kafka::Producer - Perl interface for Kafka producer client.
VERSION
This documentation refers to Kafka::Producer version 1.02.
SYNOPSIS
    use 5.010;
    use strict;
    use warnings;

    use Scalar::Util qw(
        blessed
    );
    use Try::Tiny;

    use Kafka::Connection;
    use Kafka::Producer;

    my ( $connection, $producer );
    try {

        #-- Connection
        $connection = Kafka::Connection->new( host => 'localhost' );

        #-- Producer
        $producer = Kafka::Producer->new( Connection => $connection );

        # Sending a single message
        my $response = $producer->send(
            'mytopic',          # topic
            0,                  # partition
            'Single message'    # message
        );

        # Sending a series of messages
        $response = $producer->send(
            'mytopic',          # topic
            0,                  # partition
            [                   # messages
                'The first message',
                'The second message',
                'The third message',
            ]
        );

    } catch {
        my $error = $_;
        if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
            warn 'Error: (', $error->code, ') ', $error->message, "\n";
            exit;
        } else {
            die $error;
        }
    };

    # Closes the producer and cleans up
    undef $producer;
    $connection->close;
    undef $connection;
DESCRIPTION
The Kafka producer API is implemented by the Kafka::Producer class.
The main features of the Kafka::Producer class are:
Provides an object-oriented API for producing messages.
Provides Kafka PRODUCE requests.
CONSTRUCTOR
new
Creates new producer client object.
new() takes arguments in key-value pairs. The following arguments are currently recognized:
Connection => $connection
    $connection is the Kafka::Connection object responsible for communication with the Apache Kafka cluster.
ClientId => $client_id
    A user-supplied identifier (string) for the client application.
    If ClientId is not passed to the constructor, it defaults to the string 'producer'.
RequiredAcks => $acks
    $acks should be an int16 signed integer. It indicates how many acknowledgements the servers should receive before responding to the request.
    If it is $NOT_SEND_ANY_RESPONSE, the server does not send any response.
    If it is $WAIT_WRITTEN_TO_LOCAL_LOG (default), the server waits until the data is written to the local log before sending a response.
    If it is $BLOCK_UNTIL_IS_COMMITTED, the server blocks until the message is committed by all in-sync replicas before sending a response.
    $NOT_SEND_ANY_RESPONSE, $WAIT_WRITTEN_TO_LOCAL_LOG and $BLOCK_UNTIL_IS_COMMITTED can be imported from the Kafka module.
Timeout => $timeout
    The maximum time the server can wait to receive the number of acknowledgements given in RequiredAcks.
    $timeout is in seconds and may be any integer or floating-point value no bigger than the maximum positive int32 integer.
    Optional; the default is $REQUEST_TIMEOUT, which can be imported from the Kafka module.
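As an illustration of the constructor arguments described above, the following sketch builds a producer that waits for all in-sync replicas. It assumes a broker is reachable on localhost; the ClientId value 'my-app-producer' is a hypothetical name chosen for this example.

```perl
use strict;
use warnings;

use Kafka qw(
    $BLOCK_UNTIL_IS_COMMITTED
    $REQUEST_TIMEOUT
);
use Kafka::Connection;
use Kafka::Producer;

# Assumption: a Kafka broker is reachable on localhost.
my $connection = Kafka::Connection->new( host => 'localhost' );

my $producer = Kafka::Producer->new(
    Connection   => $connection,
    ClientId     => 'my-app-producer',          # hypothetical identifier
    RequiredAcks => $BLOCK_UNTIL_IS_COMMITTED,  # wait for all in-sync replicas
    Timeout      => $REQUEST_TIMEOUT,           # spelled out; same as the default
);
```

Only Connection is passed in the SYNOPSIS; the other three arguments are shown here to make the documented defaults and alternatives visible.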
METHODS
The following methods are defined for the Kafka::Producer class:
send( $topic, $partition, $messages, $keys, $compression_codec )
Sends messages on a Kafka::Connection object.
Returns a non-blank value (a reference to a hash with the server response description) if the message is successfully sent.
send() takes the following arguments:
$topic
    $topic must be a normal non-false string of non-zero length.
$partition
    $partition must be a non-negative integer.
$messages
    $messages is an arbitrary amount of data (a simple data string or a reference to an array of data strings).
$keys
    Optional message keys used for partitioning. A key is stored with each message so that the consumer knows the partitioning key. This argument should be either a single string (a common key for all messages) or a reference to an array of strings whose length matches the messages array.
$compression_codec
    Optional. $compression_codec sets the required type of $messages compression, if compression is desired.
    Supported codecs: $COMPRESSION_NONE, $COMPRESSION_GZIP, $COMPRESSION_SNAPPY. These constants can be imported from the Kafka module.
Do not use $Kafka::SEND_MAX_ATTEMPTS in Kafka::Producer->send requests, to prevent duplicates.
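The optional $keys and $compression_codec arguments could be used roughly as follows. This is a sketch only: it assumes a broker on localhost and an existing topic 'mytopic', and the message and key strings are made up for illustration.

```perl
use strict;
use warnings;

use Kafka qw( $COMPRESSION_GZIP );
use Kafka::Connection;
use Kafka::Producer;

# Assumptions: broker on localhost, topic 'mytopic' with partition 0.
my $connection = Kafka::Connection->new( host => 'localhost' );
my $producer   = Kafka::Producer->new( Connection => $connection );

my @messages = ( 'order created', 'order paid', 'order shipped' );
my @keys     = ( 'order-1', 'order-1', 'order-2' );   # one key per message

# Per-message keys (same length as @messages), gzip-compressed.
my $response = $producer->send(
    'mytopic', 0, \@messages, \@keys, $COMPRESSION_GZIP,
);

# A single string acts as the common key for all messages.
$response = $producer->send( 'mytopic', 0, \@messages, 'order-1' );

$connection->close;
```

Exceptions are left unhandled here to keep the example short; in real code the calls would sit inside a try block as in the SYNOPSIS.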
DIAGNOSTICS
When an error is detected, an exception represented by an object of the Kafka::Exception::Producer class is thrown (see Kafka::Exceptions).
The code and a more descriptive message provide information about the thrown exception. Consult the Kafka::Exceptions documentation for the list of all available methods.
The authors suggest using Try::Tiny's try and catch to handle exceptions while working with the Kafka package.
Invalid argument
    Invalid arguments were provided to the new constructor or to another method.
Cannot send
    Request cannot be sent.
Cannot receive
    Response cannot be received.
Cannot bind
    TCP connection cannot be established on a given host and port.
Cannot get metadata
    An IO error occurred, errors were found in the structure of the reply, or the reply contains a non-zero error code.
Description leader not found
    Information about the server-leader is missing in metadata.
Mismatch CorrelationId
    The CorrelationId of the response doesn't match the one in the request.
There are no known brokers
    Information about brokers in the cluster is missing.
Cannot get metadata
    The obtained metadata is incorrect, or the metadata could not be obtained.
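One possible way to apply the try/catch advice above is a small wrapper around send(). The helper name send_or_warn is hypothetical and not part of the Kafka package; it relies only on the code and message methods used in the SYNOPSIS. It deliberately does not retry, since resending after an ambiguous failure could produce duplicates.

```perl
use strict;
use warnings;

use Scalar::Util qw( blessed );
use Try::Tiny;

# Hypothetical helper (not part of the Kafka package).
# Returns the response hash reference on success and undef when a
# Kafka exception is caught; any other error is rethrown unchanged.
sub send_or_warn {
    my ( $producer, $topic, $partition, $messages ) = @_;

    return try {
        $producer->send( $topic, $partition, $messages );
    } catch {
        my $error = $_;
        if ( blessed( $error ) && $error->isa( 'Kafka::Exception' ) ) {
            warn sprintf( "Kafka error (%s): %s\n", $error->code, $error->message );
            return;    # leaves the catch block; the helper then yields undef
        }
        die $error;    # not a Kafka exception: propagate
    };
}
```

With a $producer created as in the SYNOPSIS, send_or_warn( $producer, 'mytopic', 0, 'Single message' ) returns the server response, or undef after printing a warning if a Kafka exception was thrown.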
SEE ALSO
The basic operation of the Kafka package modules:
Kafka - constants and messages used by the Kafka package modules.
Kafka::Connection - interface to connect to a Kafka cluster.
Kafka::Producer - interface for producing client.
Kafka::Consumer - interface for consuming client.
Kafka::Message - interface to access Kafka message properties.
Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.
Kafka::Protocol - functions to process messages in the Apache Kafka Protocol.
Kafka::IO - low-level interface for communication with Kafka server.
Kafka::Exceptions - module designated to handle Kafka exceptions.
Kafka::Internals - internal constants and functions used by several package modules.
A wealth of detail about Apache Kafka and the Kafka Protocol:
Main page at http://kafka.apache.org/
Kafka Protocol at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
SOURCE CODE
The Kafka package is hosted on GitHub: https://github.com/TrackingSoft/Kafka
AUTHOR
Sergey Gladkov, <sgladkov@trackingsoft.com>
Please use GitHub project link above to report problems or contact authors.
CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Sergiy Zuban
Vlad Marchenko
COPYRIGHT AND LICENSE
Copyright (C) 2012-2017 by TrackingSoft LLC.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.