NAME

Kafka::Producer::Avro - Avro message producer for Apache Kafka.

SYNOPSIS

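use Kafka::Connection;
use Kafka::Producer::Avro;
use Confluent::SchemaRegistry;

my $connection = Kafka::Connection->new( host => 'localhost' );

# SchemaRegistry is mandatory; this assumes the registry's default settings
my $schema_registry = Confluent::SchemaRegistry->new();

my $producer = Kafka::Producer::Avro->new(
	Connection     => $connection,
	SchemaRegistry => $schema_registry,
);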

# Do some interactions with Avro & SchemaRegistry before sending messages

# Sending a single message
my $response = $producer->send(...);

# Sending a series of messages
$response = $producer->send(...);

# Closes the producer and cleans up
undef $producer;
$connection->close;
undef $connection;

DESCRIPTION

The main feature of Kafka::Producer::Avro is an object-oriented API for producing messages that are serialized with Avro according to schemas held in a Confluent Schema Registry.

Kafka::Producer::Avro inherits from and extends Kafka::Producer.

CONSTRUCTOR

new

Creates a new producer client object.

new() takes arguments as key-value pairs, as described in Kafka::Producer, from which it inherits.

In addition, it takes the following argument:

SchemaRegistry => $schema_registry (mandatory)

A Confluent::SchemaRegistry instance.

METHODS

The following methods are defined for the Kafka::Producer::Avro class:

schema_registry()

Returns the Confluent::SchemaRegistry instance supplied to the constructor.

get_error()

Returns a string containing the last error message.

send( %params )

Sends one or more messages on a Kafka::Connection object.

Returns a non-blank value (a reference to a hash describing the server response) if the message is successfully sent.

Unlike the Kafka::Producer method, which expects positional arguments, Kafka::Producer::Avro->send() looks for named parameters:

$producer->send(
	topic             => $topic,             # scalar 
	partition         => $partition,         # scalar
	messages          => $messages,          # scalar | array
	keys              => $keys,              # scalar | array
	compression_codec => $compression_codec, # scalar
	key_schema        => $key_schema,        # optional JSON-string
	value_schema      => $value_schema       # optional JSON-string
);    

Extra arguments may be supplied:

key_schema => $key_schema and value_schema => $value_schema

The $key_schema and $value_schema parameters are both optional. Each is a JSON string representing the Avro schema used to validate and serialize the key(s) or the value(s), respectively.

These schemas are validated against the schema registry and, if compliant, are added to the registry under the $topic+'key' or $topic+'value' subjects.

If an expected schema isn't provided, the latest version from the Schema Registry is used, according to the subject (key or value).
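As a minimal sketch of the named-parameter call described above, the following builds a value schema as a JSON string and checks the result of send(). The helper name send_user, the 'User' schema, and the partition number are hypothetical; passing each message as a Perl data structure matching the schema is an assumption, as is the reading that send() returns a false value on failure with the reason available from get_error().

```perl
use strict;
use warnings;
use JSON::PP;

# Hypothetical Avro schema for the message values, encoded as a JSON string.
my $value_schema = JSON::PP->new->canonical->encode({
    type   => 'record',
    name   => 'User',
    fields => [
        { name => 'name', type => 'string' },
        { name => 'age',  type => 'int'    },
    ],
});

# Hypothetical helper: sends one message and dies with the producer's
# last error message if send() does not return a true value.
sub send_user {
    my ($producer, $topic, $user) = @_;

    my $response = $producer->send(
        topic        => $topic,
        partition    => 0,               # assumed partition
        messages     => $user,           # assumed: a structure matching the schema
        value_schema => $value_schema,   # optional JSON string
    );

    die 'send failed: ' . $producer->get_error() . "\n"
        unless $response;

    return $response;
}
```

With a producer constructed as in the SYNOPSIS, a call might look like send_user( $producer, 'users', { name => 'Alice', age => 30 } ). Leaving out value_schema would fall back to the latest schema registered for the subject, as described above.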

bulk_send( %params )

Similar to send, but sends the messages in bulks to avoid memory leaks.

Extra named parameters are expected:

size => $size

The size of each bulk.

on_before_send_bulk => sub {...} (optional)

A code block that will be executed before each bulk is sent.

The block will receive the following positional parameters:

$bulk_num the number of the bulk
$bulk_messages the number of messages in the bulk
$bulk_keys the number of keys in the bulk
$index_from the absolute index of the first message in the bulk
$index_to the absolute index of the last message in the bulk

on_after_send_bulk => sub {...} (optional)

A code block that will be executed after each bulk is sent.

The block will receive the following positional parameters:

$sent the number of sent messages in the bulk
$total_sent the total number of messages sent

on_init => sub {...} (optional)

A code block that will be executed only once, at the beginning of the cycle.

The block will receive the following positional parameters:

$to_send the total number of messages to send
$bulk_size the size of the bulk

on_complete => sub {...} (optional)

A code block that will be executed only once after the end of the cycle.

The block will receive the following positional parameters:

$to_send the total number of messages to send
$total_sent the total number of messages sent
$errors the number of bulks sent with errors

on_send_error => sub {...} (optional)

A code block that will be executed when a bulk registers an error.
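The callbacks above can be combined to track progress, roughly as in the sketch below. The helper name send_in_bulks, the %stats hash, and the partition number are hypothetical; the positional parameters of each callback follow the lists above, and the arguments passed to on_send_error are not documented here, so the sketch ignores them.

```perl
use strict;
use warnings;

# Hypothetical helper: sends @$messages in bulks of $size and
# collects simple progress statistics through the callbacks.
sub send_in_bulks {
    my ($producer, $topic, $messages, $size) = @_;
    my %stats = ( bulks => 0, sent => 0, errors => 0 );

    $producer->bulk_send(
        topic     => $topic,
        partition => 0,            # assumed partition
        messages  => $messages,
        size      => $size,

        on_init => sub {
            my ($to_send, $bulk_size) = @_;
            print "Sending $to_send messages in bulks of $bulk_size\n";
        },
        on_before_send_bulk => sub {
            my ($bulk_num, $bulk_messages, $bulk_keys, $index_from, $index_to) = @_;
            $stats{bulks}++;
            print "Bulk $bulk_num: $bulk_messages messages ($index_from..$index_to)\n";
        },
        on_after_send_bulk => sub {
            my ($sent, $total_sent) = @_;
            $stats{sent} = $total_sent;
        },
        # Arguments are not documented, so they are ignored here.
        on_send_error => sub { $stats{errors}++ },
        on_complete => sub {
            my ($to_send, $total_sent, $errors) = @_;
            print "Done: $total_sent of $to_send sent, $errors bulks with errors\n";
        },
    );

    return \%stats;
}
```

Keeping the counters in a lexical hash captured by the closures avoids global state, so the same helper can be called for several topics in turn.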

AUTHOR

Alvaro Livraghi, <alvarol@cpan.org>

CONTRIBUTE

https://github.com/alivraghi/Kafka-Producer-Avro

BUGS

Please use the GitHub project link above to report problems or to contact the author.

COPYRIGHT AND LICENSE

Copyright 2018 by Alvaro Livraghi

This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.