NAME
Kafka::Producer::Avro - Avro message producer for Apache Kafka.
SYNOPSIS
use Kafka::Connection;
use Kafka::Producer::Avro;
my $connection = Kafka::Connection->new( host => 'localhost' );
my $producer = Kafka::Producer::Avro->new( Connection => $connection );
# Do some interactions with Avro & SchemaRegistry before sending messages
# Sending a single message
my $response = $producer->send(...);
# Sending a series of messages
$response = $producer->send(...);
# Closes the producer and cleans up
undef $producer;
$connection->close;
undef $connection;
DESCRIPTION
Kafka::Producer::Avro
main feature is to provide object-oriented API to produce messages according to Confluent SchemaRegistry and Avro serialization.
Kafka::Producer::Avro
inerhits from and extends Kafka::Producer.
CONSTRUCTOR
new
Creates new producer client object.
new()
takes arguments in key-value pairs as described in Kafka::Producer from which it inherits.
In addition, takes in the following arguments:
SchemaRegistry => $schema_registry
(mandatory)-
Is a Confluent::SchemaRegistry instance.
METHODS
The following methods are defined for the Kafka::Avro::Producer
class:
schema_registry
()
Returns the Confluent::SchemaRegistry instance supplied to the construcor.
get_error
()
Returns a string containing last error message.
send( %params )
Sends a messages on a Kafka::Connection object.
Returns a non-blank value (a reference to a hash with server response description) if the message is successfully sent.
Despite Kafka::Producer method that expects positional arguments, Kafka::Producer::Avro-
send()> method looks for named parameters:
$producer->send(
topic => $topic, # scalar
partition => $partition, # scalar
messages => $messages, # scalar | array
keys => $keys, # scalar | array
compression_codec => $compression_codec, # scalar
key_schema => $key_schema, # optional JSON-string
value_schema => $value_schema # optional JSON-string
);
Extra arguments may be suggested:
key_schema => $key_schema
andvalue_schema => $value_schema
-
Both
$key_schema
and$value_schema
parameters are optional and provide JSON strings that represent Avro schemas to use to validate and serialize key(s) and value(s).These schemas are validated against
schema_registry
and, if compliant, they are added to the registry under the$topic+'key'
or$topic+'value'
subjects.If an expected schema isn't provided, latest version from Schema Registry is used accordingly to the subject (key or value).
bulk_send( %params )
Similar to send
but uses bulks to avoid memory leaking.
Extra named parameters are expected:
size => $size
-
The size of the bulk
on_before_send_bulk => sub {...}
(optional)-
A code block that will be executed before the sending of each bulk.
The block will receive the following positional parameters:
on_after_send_bulk => sub {...}
(optional)-
A code block that will be executed after the sending of each bulk.
The block will receive the following positional parameters:
on_init => sub {...}
(optional)-
A code block that will be executed only once before at the beginning of the cycle.
The block will receive the following positional parameters:
on_complete => sub {...}
(optional)-
A code block that will be executed only once after the end of the cycle.
The block will receive the following positional parameters:
on_send_error => sub {...}
(optional)-
A code block that will be executed when a bulk registers an error.
AUTHOR
Alvaro Livraghi, <alvarol@cpan.org>
CONTRIBUTE
https://github.com/alivraghi/Kafka-Producer-Avro
BUGS
Please use GitHub project link above to report problems or contact authors.
COPYRIGHT AND LICENSE
Copyright 2018 by Alvaro Livraghi
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.