Kafka::Protocol - Functions to process messages in the Apache Kafka protocol.


This documentation refers to Kafka::Protocol version 1.06 .


use 5.010;
use strict;
use warnings;

use Data::Compare;
use Kafka qw(
use Kafka::Internals qw(
use Kafka::Protocol qw(

# a encoded produce request hex stream
my $encoded = pack( q{H*}, '00000049000000000000000400000001000005dc0000000100076d79746f7069630000000100000000000000200000000000000000000000148dc795a20000ffffffff0000000648656c6c6f21' );

# a decoded produce request
my $decoded = {
    CorrelationId                       => 4,
    ClientId                            => q{},
    RequiredAcks                        => $WAIT_WRITTEN_TO_LOCAL_LOG,
    Timeout                             => $REQUEST_TIMEOUT * 100,  # ms
    topics                              => [
            TopicName                   => 'mytopic',
            partitions                  => [
                    Partition           => 0,
                    MessageSet              => [
                            Offset          => $PRODUCER_ANY_OFFSET,
                            MagicByte       => 0,
                            Attributes      => $COMPRESSION_NONE,
                            Key             => q{},
                            Value           => 'Hello!',

my $encoded_request = encode_produce_request( $decoded );
say 'encoded correctly' if $encoded_request eq $encoded;

# a encoded produce response hex stream
$encoded = pack( q{H*}, '00000023000000040000000100076d79746f706963000000010000000000000000000000000000' );

# a decoded produce response
$decoded = {
    CorrelationId                           => 4,
    topics                                  => [
            TopicName                       => 'mytopic',
            partitions                      => [
                    Partition               => 0,
                    ErrorCode               => $ERROR_NO_ERROR,
                    Offset                  => 0,

my $decoded_response = decode_produce_response( \$encoded );
say 'decoded correctly' if Compare( $decoded_response, $decoded );

# more examples, see t/*_decode_encode.t


This module is not a user module.

In order to achieve better performance, functions of this module do not perform arguments validation.

The main features of the Kafka::Protocol module are:

  • Supports parsing the Apache Kafka protocol.

  • Supports Apache Kafka Requests and Responses (PRODUCE and FETCH). Within this package we currently support access to PRODUCE, FETCH, OFFSET, METADATA Requests and Responses.

  • Support for working with 64 bit elements of the Kafka protocol on 32 bit systems.


The following constants are available for export


The default API version that will be used as fallback, if it's not possible to detect what the Kafka server supports. Only Kafka servers > can be queried to get which API version they implements. On Kafka servers 0.8.x and 0.9.x, the protocol will default to use $DEFAULT_APIVERSION. Currently its value is '0'


According to Apache Kafka documentation: 'ReplicaId - Normal client consumers should always specify this as -1 as they have no node id.'


According to Apache Kafka documentation: 'Protocol Primitive Types: ... bytes, string - A length of -1 indicates null.'


According to Apache Kafka documentation: 'Offset - When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes.'


The following functions are available for Kafka::MockProtocol module.

encode_api_versions_request( $ApiVersions_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:


$ApiVersions_Request is a reference to the hash representing the structure of the APIVERSIONS Request. it contains CorrelationId, ClientId (can be empty string), and ApiVersion (must be 0)

decode_api_versions_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the APIVERSIONS Response.

This function takes the following arguments:


$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_find_coordinator_request( $FindCoordinator_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:


$FindCoordinator_Request is a reference to the hash representing the structure of the FINDCOORDINATOR Request. it contains CorrelationId, ClientId (can be empty string), CoordinatorKey and CoordinatorType (for version 1 of protocol)

decode_find_coordinator_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the FINDCOORDINATOR Response.

This function takes the following arguments:


$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_produce_request( $Produce_Request, $compression_codec )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:


$Produce_Request is a reference to the hash representing the structure of the PRODUCE Request (examples see t/*_decode_encode.t).



$compression_codec sets the required type of $messages compression, if the compression is desirable.


decode_produce_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the PRODUCE Response (examples see t/*_decode_encode.t).

This function takes the following arguments:


$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_fetch_request( $Fetch_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:


$Fetch_Request is a reference to the hash representing the structure of the FETCH Request (examples see t/*_decode_encode.t).

decode_fetch_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the FETCH Response (examples see t/*_decode_encode.t).

This function takes the following arguments:


$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_offset_request( $Offset_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:


$Offset_Request is a reference to the hash representing the structure of the OFFSET Request (examples see t/*_decode_encode.t).

decode_offset_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the OFFSET Response (examples see t/*_decode_encode.t).

This function takes the following arguments:


$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_metadata_request( $Metadata_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:


$Metadata_Request is a reference to the hash representing the structure of the METADATA Request (examples see t/*_decode_encode.t).

decode_metadata_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the METADATA Response (examples see t/*_decode_encode.t).

This function takes the following arguments:


$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_offsetcommit_request( $OffsetCommit_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:


$OffsetCommit_Request is a reference to the hash representing the structure of the OffsetCommit Request (examples see t/*_decode_encode.t).

decode_offsetcommit_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the OFFSETCOMMIT Response (examples see t/*_decode_encode.t).

This function takes the following arguments:


$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.

encode_offsetfetch_request( $OffsetFetch_Request )

Encodes the argument and returns a reference to the encoded binary string representing a Request buffer.

This function takes the following arguments:


$OffsetFetch_Request is a reference to the hash representing the structure of the OffsetFetch Request (examples see t/*_decode_encode.t).

decode_offsetfetch_response( $bin_stream_ref )

Decodes the argument and returns a reference to the hash representing the structure of the OFFSETFETCH Response (examples see t/*_decode_encode.t).

This function takes the following arguments:


$bin_stream_ref is a reference to the encoded Response buffer. The buffer must be a non-empty binary string.


In order to achieve better performance, functions of this module do not perform arguments validation.


The basic operation of the Kafka package modules:

Kafka - constants and messages used by the Kafka package modules.

Kafka::Connection - interface to connect to a Kafka cluster.

Kafka::Producer - interface for producing client.

Kafka::Consumer - interface for consuming client.

Kafka::Message - interface to access Kafka message properties.

Kafka::Int64 - functions to work with 64 bit elements of the protocol on 32 bit systems.

Kafka::Protocol - functions to process messages in the Apache Kafka's Protocol.

Kafka::IO - low-level interface for communication with Kafka server.

Kafka::Exceptions - module designated to handle Kafka exceptions.

Kafka::Internals - internal constants and functions used by several package modules.

A wealth of detail about the Apache Kafka and the Kafka Protocol:

Main page at

Kafka Protocol at


Kafka package is hosted on GitHub:


Sergey Gladkov

Please use GitHub project link above to report problems or contact authors.


Alexander Solovey

Jeremy Jordan

Sergiy Zuban

Vlad Marchenko

Damien Krotkine

Greg Franklin


Copyright (C) 2012-2017 by TrackingSoft LLC.

This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.