NAME

Kafka::Consumer::Avro - Avro message consumer for Apache Kafka.

SYNOPSIS

use Kafka qw/DEFAULT_MAX_BYTES/;
use Kafka::Connection;
use Kafka::Consumer::Avro;
use Confluent::SchemaRegistry;

my $connection = Kafka::Connection->new( host => 'localhost' );

my $consumer = Kafka::Consumer::Avro->new( Connection => $connection , SchemaRegistry => Confluent::SchemaRegistry->new() );

# Consuming messages
my $messages = $consumer->fetch(
	'mytopic',            # topic
	0,                    # partition
	0,                    # offset
	$DEFAULT_MAX_BYTES    # Maximum size of MESSAGE(s) to receive
);

if ($messages) {
	foreach my $message (@$messages) {
		if ( $message->valid ) {
			say 'payload    : ', $message->payload;
			say 'key        : ', $message->key;
			say 'offset     : ', $message->offset;
			say 'next_offset: ', $message->next_offset;
		}
		else {
			say 'error      : ', $message->error;
		}
	}
}

# Closes the consumer and cleans up
undef $consumer;
$connection->close;
undef $connection;

DESCRIPTION

Kafka::Consumer::Avro main feature is to provide object-oriented API to consume messages according to Confluent SchemaRegistry and Avro serialization.

Kafka::Consumer::Avro inerhits from and extends Kafka::Consumer.

INSTALL

Installation of Kafka::Consumer::Avro is a canonical:

perl Makefile.PL
make
make test
make install

TEST NOTES

Tests are focused on verifying Avro-formatted messages and theirs interactions with Confluent Schema Registry and are intended to extend Kafka::Consumer test suite.

They expect that in the target machine are available Kafka and Schema Registry listening on localhost and default ports, otherwise most of the test are skipped.

USAGE

CONSTRUCTOR

new

Creates new consumer client object.

new() takes arguments in key-value pairs as described in Kafka::Consumer from which it inherits.

In addition, takes in the following arguments:

SchemaRegistry => $schema_registry (mandatory)

Is a Confluent::SchemaRegistry instance.

METHODS

The following methods are defined for the Kafka::Avro::Consumer class:

schema_registry()

Returns the Confluent::SchemaRegistry instance supplied to the construcor.

get_error()

Returns a string containing last error message.

fetch( %params )

Gets messages froma a Kafka topic.

Please, see Kafka::Consumer for more details.

AUTHOR

Alvaro Livraghi, <alvarol@cpan.org>

CONTRIBUTE

https://github.com/alivraghi/Kafka-Consumer-Avro

BUGS

Please use GitHub project link above to report problems or contact authors.

COPYRIGHT AND LICENSE

Copyright 2018 by Alvaro Livraghi

This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.