NAME

POE::Component::ElasticSearch::Indexer - POE session to index data to ElasticSearch

VERSION

version 0.015

SYNOPSIS

This POE Session is used to index data to an ElasticSearch cluster.

use POE qw{ Component::ElasticSearch::Indexer };

my $es_session = POE::Component::ElasticSearch::Indexer->spawn(
    Alias            => 'es',                    # Default
    Protocol         => 'http',                  # Default
    Servers          => [qw(localhost)],         # Default
    Timeout          => 5,                       # Default
    FlushInterval    => 30,                      # Default
    FlushSize        => 1_000,                   # Default
    LoggingConfig    => undef,                   # Default
    DefaultIndex     => 'logs-%Y.%m.%d',         # Default
    DefaultType      => '_doc',                  # Default
    BatchDir         => '/tmp/es_index_backlog', # Default
    BatchDiskSpace   => undef,                   # Default
    StatsHandler     => undef,                   # Default
    StatsInterval    => 60,                      # Default
    AuthUsername     => $ENV{USER},              # Default
    AuthPassword     => undef,                   # Default
);

# Index the document using the queue for better performance
$poe_kernel->post( es => queue => $json_data );

DESCRIPTION

This module exists to provide event-based Perl programs with a simple way to index documents into an ElasticSearch cluster.

spawn()

This method spawns the ElasticSearch indexing POE::Session. It accepts the following parameters.

Alias

The alias under which this session is available to other sessions. The default is es.

Protocol

Can be either http or https, defaults to http.

Servers

A list of Elasticsearch hosts for connections. Each entry may be in the form of hostname or hostname:port.

AuthUsername

Username for HTTP Basic Authorization, defaults to $ENV{USER}.

AuthPassword

Password for HTTP Basic Authorization, set to enable HTTP Basic Authorization.

PoolConnections

Boolean, default true. Enable connection pooling with POE::Component::Client::Keepalive. This is desirable in most cases, but can result in timeouts piling up. You may wish to disable this if you notice that indexing takes a while to recover after timeout events.

KeepAliveTimeout

Requires PoolConnections.

Set the keep_alive timeout in seconds for the creation of a POE::Component::Client::Keepalive connection pool.

Defaults to 2.

MaxConnsPerServer

Requires PoolConnections.

Maximum number of simultaneous connections to an Elasticsearch node. Used in the creation of a POE::Component::Client::Keepalive connection pool.

Defaults to 3.

MaxConnsTotal

Requires PoolConnections.

Maximum number of simultaneous connections to all servers. Used in the creation of a POE::Component::Client::Keepalive connection pool.

Defaults to MaxConnsPerServer * number of Servers.
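As a minimal sketch of the default described above, the helper below multiplies the per-server limit by the number of servers. The function name default_max_conns_total is hypothetical and is not part of this module; only the defaults (3 connections, one localhost server) come from this documentation.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of the module): derive the pool-wide
# connection cap from the per-server cap and the server list.
sub default_max_conns_total {
    my (%args) = @_;
    my $per_server = $args{MaxConnsPerServer} // 3;             # documented default
    my @servers    = @{ $args{Servers} // [qw(localhost)] };    # documented default
    return $per_server * scalar @servers;
}

# Three servers with the default of 3 connections each
print default_max_conns_total( Servers => [qw(es01:9200 es02:9200 es03:9200)] ), "\n";
```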

MaxPendingRequests

Requires PoolConnections.

Maximum number of requests backlogged in the connection pool. Defaults to 5.

MaxFailedRatio

A number between 0 and 1 representing the fraction of bulk requests that can fail before the indexer backs off the cluster for one StatsInterval. The ratio is recalculated every StatsInterval. The default is 0.8, or 80%.
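The back-off decision can be pictured roughly as follows. This is only a sketch under stated assumptions: the function name should_back_off, its arguments, and the choice of a strict "greater than" comparison are illustrative guesses, not the module's actual implementation.

```perl
use strict;
use warnings;

# Hypothetical helper: given the bulk request counters collected over one
# StatsInterval, decide whether the failure ratio warrants backing off.
sub should_back_off {
    my ( $failed, $total, $max_failed_ratio ) = @_;
    $max_failed_ratio //= 0.8;    # documented default
    return 0 unless $total;       # no bulk requests in this interval
    # Assumption: back off only when the ratio strictly exceeds the limit
    return ( $failed / $total ) > $max_failed_ratio ? 1 : 0;
}

print should_back_off( 9, 10 ) ? "back off\n" : "keep indexing\n";
```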

LoggingConfig

The Log::Log4perl configuration file for the indexer to use. Defaults to writing logs to the current directory into the file es_indexing.log.

Timeout

Number of seconds for the HTTP transport connect and transport timeouts. Defaults to 5 seconds. The total request timeout, waiting for an open connection slot and then completing the request, will be this multiplied by 2.

FlushInterval

Maximum number of seconds which can pass before a flush of the queue is attempted. Defaults to 30 seconds.

FlushSize

Once this number of documents is reached, flush the queue regardless of time since the last flush. Defaults to 1,000.
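FlushInterval and FlushSize work together: whichever limit is reached first triggers the flush. The sketch below illustrates that rule. The function should_flush and the keys queued, now, and last_flush are hypothetical names chosen for this example, and the real session schedules flushes with POE timers rather than by polling a function like this.

```perl
use strict;
use warnings;

# Hypothetical helper: flush when the queue is full, or when documents are
# waiting and the flush interval has elapsed.
sub should_flush {
    my (%state) = @_;
    my $size     = $state{FlushSize}     // 1_000;    # documented default
    my $interval = $state{FlushInterval} // 30;       # documented default

    return 1 if $state{queued} >= $size;
    return 1 if $state{queued} > 0
             && ( $state{now} - $state{last_flush} ) >= $interval;
    return 0;
}

# 250 documents queued, 31 seconds since the last flush: time limit reached
print should_flush( queued => 250, now => 131, last_flush => 100 ), "\n";
```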

DefaultIndex

A strftime aware index pattern to use if the document is missing an _index element. Defaults to logs-%Y.%m.%d.
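To illustrate how a strftime pattern expands into a concrete index name, here is a small sketch using POSIX::strftime. The helper index_name_for is hypothetical, and the use of gmtime is an assumption made so the output does not depend on the local time zone; the module itself may use local time.

```perl
use strict;
use warnings;
use POSIX qw(strftime);

# Hypothetical helper: expand an index pattern for a given epoch,
# falling back to the current time when no epoch is supplied.
sub index_name_for {
    my ( $pattern, $epoch ) = @_;
    $epoch //= time;
    # Assumption: UTC is used here to keep the result time zone independent
    return strftime( $pattern, gmtime($epoch) );
}

# 1_546_300_800 is 2019-01-01 00:00:00 UTC
print index_name_for( 'logs-%Y.%m.%d', 1_546_300_800 ), "\n";
```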

DefaultType

Use this _type attribute if the document is missing one. Defaults to _doc to be compatible with ES 7.x.

The _type attribute will be stripped from documents if the cluster is running a version greater than 7.0.0.

BatchDir

If the cluster responds with an HTTP failure code, the batch is written to disk in this directory to be indexed when the cluster is available again. Defaults to /tmp/es_index_backlog.

BatchDiskSpace

Defaults to undef, which means disk space isn't checked. If set, and the total size of the batches on disk exceeds this limit, every new batch saved will delete the oldest batch. Checked every ten batches.

You may specify either as absolute bytes or using shortcuts:

BatchDiskSpace => '500kb',
BatchDiskSpace => '100mb',
BatchDiskSpace => '10gb',
BatchDiskSpace => '1tb',
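
For readers curious how such shortcuts might translate to bytes, the following is a rough sketch. The helper disk_space_to_bytes is hypothetical, and the binary multipliers (1kb = 1024 bytes) are an assumption; this documentation does not state whether the module uses multiples of 1000 or 1024.

```perl
use strict;
use warnings;

# Assumption: binary multiples. The module may define these differently.
my %MULTIPLIER = (
    kb => 1024,
    mb => 1024**2,
    gb => 1024**3,
    tb => 1024**4,
);

# Hypothetical helper: convert "500kb", "10gb", or a plain byte count to bytes.
sub disk_space_to_bytes {
    my ($spec) = @_;
    return unless defined $spec;    # undef means "do not check disk space"
    if ( $spec =~ /^(\d+(?:\.\d+)?)\s*(kb|mb|gb|tb)?$/i ) {
        my ( $number, $unit ) = ( $1, lc( $2 // '' ) );
        return $unit ? $number * $MULTIPLIER{$unit} : $number;
    }
    die "Unrecognized BatchDiskSpace value: $spec\n";
}

print disk_space_to_bytes('500kb'), "\n";
```
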

MaxRecoveryBatches

The number of batches to process per backlog event. This will only come into play if there are batches on disk to flush. Defaults to 10.

StatsHandler

A code reference that will be passed a hash reference containing the keys and values of counters tracked by this component. Defaults to undef, meaning no code is run.
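A handler along these lines could be passed as StatsHandler. This is a sketch only: the counter names used in the sample call are invented for illustration, since this documentation does not list the counters the component tracks, and format_stats is a hypothetical helper.

```perl
use strict;
use warnings;

# Hypothetical helper: render counters as "name=value" pairs sorted by name.
sub format_stats {
    my ($stats) = @_;
    return join ' ', map { "$_=$stats->{$_}" } sort keys %{$stats};
}

# The code reference that would be given to spawn() as StatsHandler
my $stats_handler = sub {
    my ($stats) = @_;
    print format_stats($stats), "\n";
};

# Simulated invocation; these counter names are made up for the example
$stats_handler->( { docs => 120, bulk_success => 3 } );
```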

StatsInterval

Run the StatsHandler every StatsInterval seconds. Defaults to 60.

BacklogInterval

Run the backlog processing event every BacklogInterval seconds. Defaults to 60. Will process up to MaxRecoveryBatches batches per BacklogInterval.

This event only fires when there are batches on disk. When it's done processing them, it will then stop firing.

CleanupInterval

Run the cleanup event every CleanupInterval seconds. Defaults to 60. This checks that the BatchDiskSpace limit is honored and deletes the oldest batches if it is exceeded.

This event only fires when there are batches on disk.

EVENTS

The events provided by this component.

version

Runs at start to get the Elasticsearch version.

queue

Takes an array reference of hash references to be transformed into JSON documents and submitted to the cluster's _bulk API.

Each hash reference may pass in the following special keys, which will be used to index the event. These keys will be deleted from the document being indexed as they have special meaning to the bulk API.

_id

Will be submitted as the document id in the bulk operation. If not specified, Elasticsearch will generate a UUID for each document automatically.

_type

Will be submitted as the document type in the bulk operation. If not specified, we'll use the DefaultType specified in the spawn() method.

Deprecated and removed for clusters running Elasticsearch version 7.0 or higher.

_index

Will cause the document to be indexed into that index. If not specified, the DefaultIndex will be used.

_epoch

If the DefaultIndex uses a strftime compatible string, you may specify an _epoch in every document. If not specified, we'll assume the epoch to use for strftime calculations is the current time.

For more information, see the Elasticsearch Bulk API Docs.
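The handling of the special keys can be sketched as follows: the keys are removed from a copy of the document and turned into the action line of a bulk request, followed by the remaining document as the source line. The helper to_bulk_lines is hypothetical. The sketch also assumes an Elasticsearch 7+ cluster (so _type is simply dropped), the "index" bulk action, and UTC for the strftime expansion; the module's own behavior may differ on each of these points.

```perl
use strict;
use warnings;
use JSON::PP;
use POSIX qw(strftime);

my $json = JSON::PP->new->canonical;    # sorted keys for stable output

# Hypothetical helper: turn one document hash into two bulk API lines.
sub to_bulk_lines {
    my ( $doc, %defaults ) = @_;
    my %source = %{$doc};               # shallow copy, caller's hash is untouched

    my $epoch = delete( $source{_epoch} ) // time;
    my $index = delete( $source{_index} )
        // strftime( $defaults{DefaultIndex} // 'logs-%Y.%m.%d', gmtime($epoch) );
    my $id = delete $source{_id};
    delete $source{_type};              # assumption: ES 7+, types are not sent

    my %meta = ( _index => $index );
    $meta{_id} = $id if defined $id;

    my @lines = ( { index => \%meta }, \%source );
    return join '', map { $json->encode($_) . "\n" } @lines;
}

print to_bulk_lines( { _id => 'abc', _epoch => 0, message => 'hi' } );
```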

Alternatively, you can provide an array reference containing blessed objects that provide an as_bulk() method. The result of that method will be added to the bulk queue.

If you've decided to construct the requisite newline delimited JSON yourself, you may pass in an array reference containing scalars. If you do, the module assumes you know what you're doing and will append that text to the existing bulk queue unchanged.
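As an illustration of the object-based and pre-built forms, the sketch below defines a small class with an as_bulk() method that returns newline delimited JSON. The class My::LogEvent and its constructor arguments are hypothetical, and returning a single string containing both the action line and the source line is an assumption about what the queue expects from as_bulk().

```perl
package My::LogEvent;    # hypothetical class, not part of this distribution
use strict;
use warnings;
use JSON::PP;

my $json = JSON::PP->new->canonical;

sub new {
    my ( $class, %args ) = @_;
    return bless { index => $args{index}, body => $args{body} }, $class;
}

# Return the action line and the source line, each newline terminated
sub as_bulk {
    my ($self) = @_;
    return join '',
        $json->encode( { index => { _index => $self->{index} } } ), "\n",
        $json->encode( $self->{body} ), "\n";
}

package main;

my $event = My::LogEvent->new(
    index => 'app-logs',
    body  => { message => 'started' },
);
print $event->as_bulk;
```

The same string could also be passed directly inside an array reference of scalars, in which case the module appends it to the bulk queue unchanged.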

Example use case:

sub syslog_handle_line {
    my ($kernel,$heap,$session,$line) = @_[KERNEL,HEAP,SESSION,ARG0];

    # Create a document from syslog data
    local $Parse::Syslog::Line::PruneRaw = 1;
    local $Parse::Syslog::Line::PruneEmpty = 1;
    my $evt = parse_syslog_line($line);

    # Override the type
    $evt->{_type} = 'syslog';

    # If we want to collect this event into an auth index:
    if( exists $Authentication{$evt->{program}} ) {
        $evt->{_index} = strftime('authentication-%Y.%m.%d',
                localtime($evt->{epoch} || time)
        );
    }
    else {
        # Set an _epoch for the es queue DefaultIndex
        $evt->{_epoch} = $evt->{epoch} ? delete $evt->{epoch} : time;
    }
    # You'll want to batch these in your processor to avoid excess
    # overhead creating so many events in the POE loop
    push @{ $heap->{batch} }, $evt;

    # Once we hit 10 messages, force the flush
    $kernel->call( $session->ID => 'submit_batch') if @{ $heap->{batch} } >= 10;
}

sub submit_batch {
    my ($kernel,$heap) = @_[KERNEL,HEAP];

    # Reset the batch scheduler
    $kernel->delay( 'submit_batch' => 10 );

    $kernel->post( es => queue => delete $heap->{batch} );
    $heap->{batch} = [];
}

flush

Schedule a flush of the existing bulk updates to the cluster. It should never be necessary to call this event unless you'd like to shut down the event loop faster.

backlog

Request the disk-based backlog be processed. You should never need to call this event as the session will run it once it starts and if there's data to process, it will continue rescheduling as needed. When a bulk operation fails resulting in a batch file, this event is scheduled to run again.

shutdown

Inform this session that you'd like to wrap up operations. This prevents recurring events from being scheduled.

AUTHOR

Brad Lhotsky <brad@divisionbyzero.net>

COPYRIGHT AND LICENSE

This software is Copyright (c) 2018 by Brad Lhotsky.

This is free software, licensed under:

The (three-clause) BSD License

CONTRIBUTOR

Mohammad S Anwar <mohammad.anwar@yahoo.com>

SUPPORT

Websites

The following websites have more information about this module, and may be of help to you. As always, in addition to those websites please use your favorite search engine to discover more resources.

Source Code

This module's source code is available by visiting: https://github.com/reyjrar/POE-Component-ElasticSearch-Indexer