NAME
AnyEvent::PgRecvlogical - perl port of pg_recvlogical
SYNOPSIS
    use AnyEvent::PgRecvlogical;

    my $recv = AnyEvent::PgRecvlogical->new(
        dbname     => 'mydb',
        slot       => 'myreplslot',
        on_message => sub {
            my ($record, $guard) = @_;

            process($record);

            undef $guard;    # declare done with $record
        },
    );

    $recv->start;
DESCRIPTION
AnyEvent::PgRecvlogical provides Perl bindings with functionality similar to that of pg_recvlogical. The rationale is that pg_recvlogical does not afford the consuming process the opportunity to emit feedback to PostgreSQL. As a result, the consumer can be sent more data than it can handle in a timely fashion.
ATTRIBUTES
dbname
- Str, Required
Database name to connect to.
slot
- Str, Required
Name of the replication slot to use (and/or create, see "do_create_slot" and "slot_exists_ok")
host
port
username
password
Standard PostgreSQL connection parameters, see "connect" in DBD::Pg.
do_create_slot
- Bool, Default: 0
If true, the "slot" will be created upon connection. Otherwise, it is assumed to already exist. If it does not, PostgreSQL will raise an exception.
slot_exists_ok
- Bool, Default: 0
If true, and if "do_create_slot" is also true, then no exception will be raised if the "slot" already exists. Otherwise, one will be raised.
reconnect
- Bool, Default: 1
If true, will attempt to reconnect to the server and resume logical replication in the event the connection fails. Otherwise, the connection will be allowed to close gracefully.
reconnect_delay
- Int, Default: 5
Time, in seconds, to wait before reconnecting.
reconnect_limit
- Int, Default: 1
Number of times to attempt reconnecting. If this limit is exceeded, an exception will be thrown.
heartbeat
- Int, Default: 10
Interval, in seconds, at which to report our progress to the PostgreSQL server.
plugin
- Str, Default: test_decoding
The server-side plugin used to decode the WAL before it is sent to this connection. Only required when "do_create_slot" is true.
options
- HashRef, Default: {}
Key-value pairs sent to the server-side "plugin". Keys with a value of undef are sent as the keyword only.
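To illustrate the undef rule, here is a minimal sketch of how an options hash could be rendered into the parenthesised option list that follows START_REPLICATION ... LOGICAL. The helper name format_plugin_options is hypothetical and is not part of this module. The quoting style mirrors what pg_recvlogical itself sends, and the string this module actually puts on the wire may differ.

```perl
use strict;
use warnings;

# Hypothetical helper, NOT part of AnyEvent::PgRecvlogical.
# Renders plugin options as: ("key" 'value', "keyword_only")
sub format_plugin_options {
    my (%options) = @_;

    my @parts;
    for my $key (sort keys %options) {    # sorted only for stable output
        my $value = $options{$key};
        if (defined $value) {
            # Double any embedded single quotes in the value
            (my $quoted = $value) =~ s/'/''/g;
            push @parts, qq{"$key" '$quoted'};
        }
        else {
            # undef value: send the keyword by itself
            push @parts, qq{"$key"};
        }
    }

    return @parts ? '(' . join(', ', @parts) . ')' : '';
}

my $rendered = format_plugin_options(
    'include-xids'     => 0,
    'skip-empty-xacts' => undef,
);
print "$rendered\n";    # ("include-xids" '0', "skip-empty-xacts")
```

The two keys used here are options understood by the test_decoding plugin; other plugins define their own.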
startpos
- LSN, Default: 0/0
Start replication from the given LSN. Also accepts the integer form, but that is considered advanced usage.
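For reference, the textual LSN form is two hexadecimal numbers separated by a slash: the high and low 32 bits of a 64-bit WAL position. The following sketch shows the conversion between the two forms. The function names lsn_to_int and int_to_lsn are made up for this example and are not part of this module, and the code assumes a perl built with 64-bit integers.

```perl
use strict;
use warnings;

# Hypothetical helpers, NOT part of AnyEvent::PgRecvlogical.

# 'HI/LO' (hexadecimal) -> 64-bit integer
sub lsn_to_int {
    my ($lsn) = @_;
    my ($hi, $lo) = $lsn =~ m{\A([0-9A-Fa-f]{1,8})/([0-9A-Fa-f]{1,8})\z}
        or die "invalid LSN: $lsn";
    return (hex($hi) << 32) | hex($lo);
}

# 64-bit integer -> 'HI/LO' (uppercase hexadecimal, no zero padding)
sub int_to_lsn {
    my ($int) = @_;
    return sprintf '%X/%X', $int >> 32, $int & 0xFFFFFFFF;
}

my $as_int  = lsn_to_int('1/0');
my $as_text = int_to_lsn($as_int);
print "$as_int\n";     # 4294967296
print "$as_text\n";    # 1/0
```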
received_lsn
- LSN, Default: 0/0, Read Only
Holds the last LSN position received from the server.
flushed_lsn
- LSN, Default: 0/0, Read Only
Holds the last LSN signaled as handled by the client (see "on_message").
on_error
Callback in the event of an error.
on_message
- CodeRef, Required
Callback to receive the replication payload from the server. This is the raw output from the "plugin".
The callback is passed the $payload received and a $guard object. Hang onto the $guard until you have handled the payload. Once it is released, the server will be informed that the WAL position has been "flushed."
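The guard idea can be sketched in plain Perl: an object whose destruction triggers a callback, so that "done with the payload" is expressed simply by letting go of the last reference. The My::Guard class below is a hypothetical stand-in written for this example. It is not the guard object the module hands to your callback, and the "flushed" bookkeeping here is only a list in memory.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the $guard object: runs a callback when freed.
package My::Guard {
    sub new     { my ($class, $cb) = @_; return bless { cb => $cb }, $class }
    sub DESTROY { $_[0]{cb}->() }
}

my @flushed;    # positions that have been signaled as handled
my @pending;    # work still in flight: [ $payload, $guard ]

# An on_message-style callback that defers processing. Storing $guard
# alongside the payload keeps the position from being marked as flushed.
my $on_message = sub {
    my ($payload, $guard) = @_;
    push @pending, [ $payload, $guard ];
};

# Simulate three incoming messages
for my $position (1 .. 3) {
    $on_message->(
        "payload $position",
        My::Guard->new(sub { push @flushed, $position }),
    );
}

printf "flushed before processing: %d\n", scalar @flushed;    # 0

# Finish the first item; dropping it frees its guard
shift @pending;

printf "flushed after one item: %d\n", scalar @flushed;       # 1
```

With the real module, the same pattern applies: keep $guard in scope (for example, captured by the closure that completes the work) and release it only once the payload has been durably handled.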
CONSTRUCTOR
All the "ATTRIBUTES" above are accepted by the constructor, with a few exceptions:
"received_lsn" and "flushed_lsn" are read-only and not accepted by the constructor.
"dbname", "slot" and "on_message" are required.
Note that logical replication does not commence automatically upon construction. You must call "start" first.
METHODS
All "ATTRIBUTES" are also accessible via methods. They are all read-only.
- start
Initialize the logical replication process asynchronously and return immediately. This performs the following steps:
- 1. "identify_system"
- 2. "create_slot" (if requested)
- 3. "start_replication"
- 4. heartbeat timer
This method wraps the above steps for convenience. Should you desire to modify the replication startup protocol (which you shouldn't), the methods are described in detail below.
Returns: Promises::Promise
- identify_system
Issues the IDENTIFY_SYSTEM command to the server to put the connection in replication mode.
Returns: Promises::Promise
- create_slot
Issues the appropriate CREATE_REPLICATION_SLOT command to the server, if requested.
Returns: Promises::Promise
- start_replication
Issues the START_REPLICATION SLOT command and immediately returns. The connection will then start receiving logical replication payloads.
- pause
Pauses reading from the database. Useful for throttling the inbound flow of data so as not to overwhelm your application. It is safe, albeit redundant, to call this method multiple times in a row without unpausing.
- unpause
Resumes reading from the database. After a successful "pause", this will pick right back up receiving data and sending it to the provided "on_message" callback. It is safe, albeit redundant, to call this method multiple times in a row without pausing.
- is_paused
Returns the current pause state.
Returns: boolean
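One way to use the three pause-related methods together is a high/low watermark throttle: pause once a backlog grows past an upper bound and unpause once it drains below a lower one. The sketch below is only an illustration. The throttle helper and the My::StubReceiver class are hypothetical; the stub stands in for a real AnyEvent::PgRecvlogical object so the example runs without a database, and it relies only on the pause, unpause and is_paused methods described above.

```perl
use strict;
use warnings;

# Hypothetical helper: $recv only needs pause, unpause and is_paused.
# Pauses at $high pending items, resumes at $low or fewer.
sub throttle {
    my ($recv, $backlog, $high, $low) = @_;

    if (!$recv->is_paused && $backlog >= $high) {
        $recv->pause;
    }
    elsif ($recv->is_paused && $backlog <= $low) {
        $recv->unpause;
    }

    return $recv->is_paused;
}

# Stub receiver for demonstration only, NOT the real module.
package My::StubReceiver {
    sub new       { return bless { paused => 0 }, shift }
    sub pause     { $_[0]{paused} = 1 }
    sub unpause   { $_[0]{paused} = 0 }
    sub is_paused { $_[0]{paused} }
}

my $recv = My::StubReceiver->new;

for my $backlog (150, 50, 5) {
    my $paused = throttle($recv, $backlog, 100, 10);
    printf "backlog=%d paused=%d\n", $backlog, $paused ? 1 : 0;
}
# backlog=150 paused=1
# backlog=50 paused=1
# backlog=5 paused=0
```

Using two thresholds rather than one avoids flapping between paused and unpaused when the backlog hovers around a single limit.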
- stop
Stop receiving replication payloads and disconnect from the PostgreSQL server.
AUTHOR
William Cox (cpan:MYDMNSN) <mydimension@gmail.com>
COPYRIGHT
Copyright (c) 2017-2018 William Cox
LICENSE
This library is free software and may be distributed under the same terms as perl itself. See http://dev.perl.org/licenses/.