NAME

AnyEvent::PgRecvlogical - perl port of pg_recvlogical

SYNOPSIS

use AnyEvent::PgRecvlogical;

my $recv = AnyEvent::PgRecvlogical->new(
    dbname     => 'mydb',
    slot       => 'myreplslot',
    on_message => sub {
        my ($record, $guard) = @_;

        process($record);

        undef $guard; # declare done with $record
    }
);

$recv->start;

DESCRIPTION

AnyEvent::PgRecvlogical provides Perl bindings with functionality similar to that of pg_recvlogical. The motivation is that pg_recvlogical does not afford the consuming process the opportunity to emit feedback to PostgreSQL. As a result, the consumer can be sent more data than it can handle in a timely fashion.

ATTRIBUTES

dbname
Str, Required

Database name to connect to.

slot
Str, Required

Name of the replication slot to use (and/or create, see "do_create_slot" and "slot_exists_ok")

host
Str
port
Int
username
Str
password
Str

Standard PostgreSQL connection parameters, see "connect" in DBD::Pg.

do_create_slot
Bool, Default: 0

If true, the "slot" will be created upon connection. Otherwise, it is assumed to already exist. If it does not, PostgreSQL will raise an exception.

slot_exists_ok
Bool, Default: 0

If true, and if "do_create_slot" is also true, then no exception will be raised if the "slot" already exists. Otherwise, one will be raised.
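
As a minimal sketch, a constructor call that creates the slot on first run and reuses it on later runs might look like the following. The database name, slot name and callback body are placeholders, not values the module prescribes.

```perl
use strict;
use warnings;
use AnyEvent::PgRecvlogical;

my $recv = AnyEvent::PgRecvlogical->new(
    dbname         => 'mydb',          # placeholder database name
    slot           => 'myreplslot',    # placeholder slot name
    do_create_slot => 1,               # create the slot when connecting
    slot_exists_ok => 1,               # but tolerate a slot that already exists
    on_message     => sub {
        my ($payload, $guard) = @_;
        print "$payload\n";            # illustrative handling only
        undef $guard;                  # done with this payload
    },
);
```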

reconnect
Bool, Default: 1

If true, will attempt to reconnect to the server and resume logical replication in the event the connection fails. Otherwise, the connection will gracefully be allowed to close.

reconnect_delay
Int, Default: 5

Time, in seconds, to wait before reconnecting.

reconnect_limit
Int, Default: 1

Number of times to attempt reconnecting. If this limit is exceeded, an exception will be thrown.

heartbeat
Int, Default: 10

Interval, in seconds, to report our progress to the PostgreSQL server.

plugin
Str, Default: test_decoding

The server-side plugin used to decode the WAL before it is sent to this connection. Only required when "do_create_slot" is true.

options
HashRef, Default: {}

Key-value pairs sent to the server-side "plugin". Keys with a value of undef are sent as the keyword only.
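
To illustrate the undef rule, the sketch below renders an options hash into the kind of option list that PostgreSQL's START_REPLICATION command accepts for logical slots. The render_options helper is hypothetical and is not the module's own code. The include-xids and skip-empty-xacts keys are options of the test_decoding plugin. No escaping of embedded quotes is attempted, so treat this as an illustration only.

```perl
use strict;
use warnings;

# Hypothetical helper: turn an options hash into a protocol-style option list.
# A defined value becomes "name" 'value'; an undef value leaves the bare name.
sub render_options {
    my (%opts) = @_;
    return join ', ', map {
        my $value = $opts{$_};
        defined $value
            ? sprintf(q{"%s" '%s'}, $_, $value)
            : sprintf(q{"%s"}, $_);
    } sort keys %opts;    # sorted only to make the output stable
}

my $clause = render_options(
    'include-xids'     => 1,
    'skip-empty-xacts' => undef,    # sent as the keyword only
);
print "($clause)\n";
```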

startpos
LSN, Default: 0/0

Start replication from the given LSN. Also accepts the integer form, but that is considered advanced usage.
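
For reference, PostgreSQL writes an LSN as two hexadecimal numbers separated by a slash: the high and low 32 bits of a 64-bit position. The sketch below converts between the two forms. The helper names lsn_to_int and int_to_lsn are hypothetical, a Perl built with 64-bit integers is assumed, and whether the module's integer form matches this layout exactly is also an assumption.

```perl
use strict;
use warnings;

# Hypothetical helper: textual LSN ("16/B374D848") to a 64-bit integer.
sub lsn_to_int {
    my ($lsn) = @_;
    my ($hi, $lo) = $lsn =~ m{\A([0-9A-Fa-f]{1,8})/([0-9A-Fa-f]{1,8})\z}
        or die "malformed LSN: $lsn";
    return (hex($hi) << 32) | hex($lo);
}

# Hypothetical helper: 64-bit integer back to the textual form.
sub int_to_lsn {
    my ($int) = @_;
    return sprintf '%X/%X', $int >> 32, $int & 0xFFFFFFFF;
}

my $int = lsn_to_int('16/B374D848');
print int_to_lsn($int), "\n";    # round-trips to the original text
```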

received_lsn
LSN, Default: 0/0, Read Only

Holds the last LSN position received from the server.

flushed_lsn
LSN, Default: 0/0, Read Only

Holds the last LSN signaled as handled by the client (see "on_message").

on_error
CodeRef, Default: croak

Callback in the event of an error.

on_message
CodeRef, Required

Callback to receive the replication payload from the server. This is the raw output from the "plugin".

The callback is passed the $payload received and a $guard object. Hang onto the $guard until you have handled the payload. Once it is released, the server will be informed that the WAL position has been "flushed."
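
Because the acknowledgement is tied to the lifetime of the guard, a payload can be acknowledged after the callback has returned, for example when work is handed to a queue. The sketch below imitates this with a hypothetical StubGuard class whose destructor stands in for the module's flush notification. The real guard is supplied by the module, and nothing here describes its actual class.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the module's guard: runs a callback when destroyed.
{
    package StubGuard;
    sub new     { my ($class, $cb) = @_; return bless { cb => $cb }, $class }
    sub DESTROY { my $cb = $_[0]{cb}; $cb->() if $cb }
}

my @pending;    # payloads accepted but not yet handled
my @flushed;    # payloads whose guard has been released

# Shaped like an on_message callback: keep the guard alive with the payload.
sub on_message {
    my ($payload, $guard) = @_;
    push @pending, { payload => $payload, guard => $guard };
    return;
}

# Later, once the work is really done, drop the entry and with it the guard.
sub finish_next {
    my $job = shift @pending or return;
    my $payload = $job->{payload};
    undef $job;    # last reference to the guard goes away here
    return $payload;
}

# Simulate two deliveries; nothing is flushed until finish_next is called.
for my $p ('BEGIN 1', 'COMMIT 1') {
    my $label = $p;
    on_message($label, StubGuard->new(sub { push @flushed, $label }));
}
```

Calling finish_next once releases the first guard only, so the second payload stays unacknowledged until it is finished too.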

CONSTRUCTOR

All the "ATTRIBUTES" above are accepted by the constructor, with a few exceptions:

"received_lsn" and "flushed_lsn" are read-only and not accepted by the constructor.

"dbname", "slot" and "on_message" are required.

Note that logical replication will not automatically commence upon construction. One must call "start" first.

METHODS

All "ATTRIBUTES" are also accessible via methods. They are all read-only.

start

Initialize the logical replication process asynchronously and return immediately. This performs the following steps:

1. "identify_system"
2. "create_slot" (if requested)
3. "start_replication"
4. heartbeat timer

This method wraps the above steps for convenience. Should you desire to modify the replication startup protocol (which you shouldn't), the methods are described in detail below.

Returns: Promises::Promise
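
Since start returns a promise, startup success and failure can be observed with the promise's then method. The following is a sketch only: it assumes a reachable PostgreSQL server, uses placeholder database and slot names, and makes no claim about what values the promise is resolved or rejected with.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::PgRecvlogical;

my $done = AnyEvent->condvar;

my $recv = AnyEvent::PgRecvlogical->new(
    dbname     => 'mydb',          # placeholder
    slot       => 'myreplslot',    # placeholder
    on_message => sub {
        my ($payload, $guard) = @_;
        print "$payload\n";
        undef $guard;              # done with this payload
    },
);

$recv->start->then(
    sub { warn "replication started\n" },
    sub { warn "startup failed: @_\n"; $done->send },
);

# Run the event loop until startup fails or the process is interrupted.
$done->recv;
```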

identify_system

Issues the IDENTIFY_SYSTEM command to the server to put the connection in replication mode.

Returns: Promises::Promise

create_slot

Issues the appropriate CREATE_REPLICATION_SLOT command to the server, if requested.

Returns: Promises::Promise

start_replication

Issues the START_REPLICATION SLOT command and immediately returns. The connection will then start receiving logical replication payloads.

pause

Pauses reading from the database. Useful for throttling the inbound flow of data so as to not overwhelm your application. It is safe, albeit redundant, to call this method multiple times in a row without unpausing.

unpause

Resumes reading from the database. After a successful "pause", this will pick right back up receiving data and sending it to the provided "on_message" callback. It is safe, albeit redundant, to call this method multiple times in a row without pausing.

is_paused

Returns the current pause state.

Returns: boolean
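
One way to use pause, unpause and is_paused together is a high/low watermark on a work queue. The sketch below shows the idea against a hypothetical StubReceiver that implements only those three methods, so it runs without a server. The watermark values and the enqueue and drain_one helpers are assumptions for illustration.

```perl
use strict;
use warnings;

# Hypothetical stand-in exposing only the three documented methods.
{
    package StubReceiver;
    sub new       { my $class = shift; return bless { paused => 0 }, $class }
    sub pause     { $_[0]{paused} = 1; return }
    sub unpause   { $_[0]{paused} = 0; return }
    sub is_paused { return $_[0]{paused} }
}

my ($HIGH, $LOW) = (3, 1);    # assumed watermarks, tune for your workload
my @queue;

# Call from on_message: queue the work, pause once the backlog is too deep.
sub enqueue {
    my ($recv, $payload, $guard) = @_;
    push @queue, [$payload, $guard];
    $recv->pause if @queue >= $HIGH && !$recv->is_paused;
    return;
}

# Call from a worker: handle one item, resume once the backlog has drained.
sub drain_one {
    my ($recv) = @_;
    my $item = shift @queue or return;
    # ... process $item->[0] here ...
    undef $item->[1];    # release the guard: the payload is handled
    $recv->unpause if @queue <= $LOW && $recv->is_paused;
    return 1;
}
```

Using two thresholds rather than one avoids toggling the pause state on every message when the queue hovers around a single limit.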

stop

Stop receiving replication payloads and disconnect from the PostgreSQL server.

AUTHOR

William Cox (cpan:MYDMNSN) <mydimension@gmail.com>

COPYRIGHT

Copyright (c) 2017-2018 William Cox

LICENSE

This library is free software and may be distributed under the same terms as perl itself. See http://dev.perl.org/licenses/.