NAME
Kafka::Producer::Avro - Avro message producer for Apache Kafka.
SYNOPSIS
use Kafka::Connection;
use Kafka::Producer::Avro;
my $connection = Kafka::Connection->new( host => 'localhost' );
my $producer = Kafka::Producer::Avro->new( Connection => $connection , SchemaRegistry => Confluent::SchemaRegistry->new() );
# Set Avro schema for message key (valid JSON-string)
my $key_schema = <<KEY_SCHEMA;
{
"type": "long",
"name": "_id"
}
KEY_SCHEMA
# Set Avro schema for message value (payload) (valid JSON-string)
my $value_schema = <<VALUE_SCHEMA;
{
"type": "record",
"name": "myrecord",
"fields": [
{
"name": "f1",
"type": "string"
}
]
}
VALUE_SCHEMA
# Sending a single message
my $response = $producer->send(
'mytopic', # topic
0, # partition
'Single message', # message
undef, # key
undef, # compression_codec
undef, # timestamps
$key_schema, # key_schema
$value_schema # value_schema
);
# Sending a series of messages
$response = $producer->send(
'mytopic', # topic
0, # partition
[ # messages
'The first message',
'The second message',
'The third message',
],
undef, # key(s)
undef, # compression_codec
undef, # timestamp(s)
$key_schema, # key_schema
$value_schema # value_schema
);
# ...or use named parameters
$producer->send(
topic => $topic,
partition => $partition,
messages => $messages,
keys => $keys,
compression_codec => $compression_codec,
timestamps => $timestamps,
key_schema => $key_schema,
value_schema => $value_schema
);
# Closes the producer and cleans up
undef $producer;
$connection->close;
undef $connection;
DESCRIPTION
Kafka::Producer::Avro
main feature is to provide object-oriented API to produce messages according to Confluent SchemaRegistry and Avro serialization.
Kafka::Producer::Avro
inerhits from and extends Kafka::Producer.
INSTALL
Installation of Kafka::Producer::Avro
is a canonical:
perl Makefile.PL
make
make test
make install
TEST NOTES
Tests are focused on verifying Avro-formatted messages and theirs interactions with Confluent Schema Registry and are intended to extend Kafka::Producer
test suite.
They expect that in the target are listening Apache Kafka and Schema Registry services, respectively listening on localhost:9092
and http://localhost:8081
.
You can alternatively set a different URLs by exporting the following environment variable:
KAFKA_HOST
KAFKA_PORT
CONFLUENT_SCHEMA_REGISTY_URL
For example:
export KAFKA_HOST=my-kafka-host.my-domain.org
export FALFA_PORT=9092
export CONFLUENT_SCHEMA_REGISTY_URL=http://my-schema-registry-host.my-domain.org
USAGE
CONSTRUCTOR
new
Creates new producer client object.
new()
takes arguments in key-value pairs as described in Kafka::Producer from which it inherits.
In addition, takes in the following arguments:
SchemaRegistry => $schema_registry
(mandatory)-
Is a Confluent::SchemaRegistry instance.
METHODS
The following methods are defined for the Kafka::Avro::Producer
class:
schema_registry
()
Returns the Confluent::SchemaRegistry instance supplied to the construcor.
get_error
()
Returns a string containing last error message.
send( $topic, $partition, $messages, $keys, $compression_codec, $timestamps, $key_schema, $value_schema )
send( %named_params )
Sends Avro-formatted messages on a Kafka::Connection object.
Returns a non-blank value (a reference to a hash with server response description) if the message is successfully sent.
In order to handle Avro format, Kafka::Producer|Kafka::Producer
send()
method is extended with two more positional arguments, $key_schema
and $value_schema
:
$producer->send(
$topic, # scalar
$partition, # scalar
$messages, # scalar | array
$keys, # (optional) undef | scalar | array
$compression_codec, # (optional) undef | scalar
$timestamps, # (optional) undef | scalar | array
$key_schema, # (optional) undef | JSON-string
$value_schema # (optional) undef | JSON-string
);
Both $key_schema
and $value_schema
parameters are optional and must provide JSON strings that represent Avro schemas to use to validate and serialize key(s) and value(s).
These schemas are validated against $schema_registry
and, if compliant, they are added to the registry under the $topic+'key'
or $topic+'value'
Schema Registry's subjects.
If an expected schema isn't provided, latest version from Schema Registry is used accordingly to the (topic + key/value) subject.
Alternatively, for ease of use, the send()
method may be also used by suggesting named parameters:
$producer->send(
topic => $topic, # scalar
partition => $partition, # scalar
messages => $messages, # scalar | array
keys => $keys, # (optional) undef | scalar | array
compression_codec => $compression_codec, # (optional) undef | scalar
timestamps => $timestamps, # (optional) undef | scalar | array
key_schema => $key_schema, # (optional) undef | JSON-string
value_schema => $value_schema # (optional) undef | JSON-string
);
bulk_send( %params )
Similar to send
but uses bulks to avoid memory leaking.
Extra named parameters are expected:
size => $size
-
The size of the bulk
on_before_send_bulk => sub {...}
(optional)-
A code block that will be executed before the sending of each bulk.
The block will receive the following positional parameters:
on_after_send_bulk => sub {...}
(optional)-
A code block that will be executed after the sending of each bulk.
The block will receive the following positional parameters:
on_init => sub {...}
(optional)-
A code block that will be executed only once before at the beginning of the cycle.
The block will receive the following positional parameters:
on_complete => sub {...}
(optional)-
A code block that will be executed only once after the end of the cycle.
The block will receive the following positional parameters:
on_send_error => sub {...}
(optional)-
A code block that will be executed when a bulk registers an error.
AUTHOR
Alvaro Livraghi, <alvarol@cpan.org>
CONTRIBUTE
https://github.com/alivraghi/Kafka-Producer-Avro
BUGS
Please use GitHub project link above to report problems or contact authors.
COPYRIGHT AND LICENSE
Copyright 2018 by Alvaro Livraghi
This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.