NAME

Mojo::Redis::PubSub - Publish and subscribe to Redis messages

SYNOPSIS

use Mojo::Redis;

my $redis  = Mojo::Redis->new;
my $pubsub = $redis->pubsub;

$pubsub->listen("user:superwoman:messages" => sub {
  my ($pubsub, $message, $channel) = @_;
  say "superwoman got a message '$message' from channel '$channel'";
});

$pubsub->notify("user:batboy:messages", "How are you doing?");

See https://github.com/jhthorsen/mojo-redis/blob/master/examples/chat.pl for example Mojolicious application.

DESCRIPTION

Mojo::Redis::PubSub is an implementation of the Redis Publish/Subscribe messaging paradigm. This class has the same API as Mojo::Pg::PubSub, so you can easily switch between the backends.

This object holds one connection for receiving messages, and one connection for sending messages. They are created lazily the first time "listen" or "notify" is called. These connections does not affect the connection pool for Mojo::Redis.

See pubsub for more details.

EVENTS

before_connect

$pubsub->on(before_connect => sub { my ($pubsub, $conn) = @_; ... });

Emitted before "connection" is connected to the redis server. This can be useful if you want to gather the CLIENT ID or run other commands before it goes into subscribe mode.

disconnect

$pubsub->on(disconnect => sub { my ($pubsub, $conn) = @_; ... });

Emitted after "connection" is disconnected from the redis server.

psubscribe

$pubsub->on(psubscribe => sub { my ($pubsub, $channel, $success) = @_; ... });

Emitted when the server responds to the "listen" request and/or when "reconnect" resends psubscribe messages.

This event is EXPERIMENTAL.

reconnect

$pubsub->on(reconnect => sub { my ($pubsub, $conn) = @_; ... });

Emitted after switching the "connection" with a new connection. This event will only happen if "reconnect_interval" is 0 or more.

subscribe

$pubsub->on(subscribe => sub { my ($pubsub, $channel, $success) = @_; ... });

Emitted when the server responds to the "listen" request and/or when "reconnect" resends subscribe messages.

This event is EXPERIMENTAL.

ATTRIBUTES

db

$db = $pubsub->db;

Holds a Mojo::Redis::Database object that will be used to publish messages or run other commands that cannot be run by the "connection".

connection

$conn = $pubsub->connection;

Holds a Mojo::Redis::Connection object that will be used to subscribe to channels.

reconnect_interval

$interval = $pubsub->reconnect_interval;
$pubsub   = $pubsub->reconnect_interval(1);
$pubsub   = $pubsub->reconnect_interval(0.1);
$pubsub   = $pubsub->reconnect_interval(-1);

The amount of time in seconds to wait to "reconnect" after disconnecting. Default is 1 (second). "reconnect" can be disabled by setting this to a negative value.

redis

$conn   = $pubsub->connection;
$pubsub = $pubsub->connection(Mojo::Redis->new);

Holds a Mojo::Redis object used to create the connections to talk with Redis.

METHODS

channels_p

$promise = $pubsub->channels_p->then(sub { my $channels = shift });
$promise = $pubsub->channels_p("pat*")->then(sub { my $channels = shift });

Lists the currently active channels. An active channel is a Pub/Sub channel with one or more subscribers (not including clients subscribed to patterns).

json

$pubsub = $pubsub->json("foo");

Activate automatic JSON encoding and decoding with "to_json" in Mojo::JSON and "from_json" in Mojo::JSON for a channel.

# Send and receive data structures
$pubsub->json("foo")->listen(foo => sub {
  my ($pubsub, $payload, $channel) = @_;
  say $payload->{bar};
});
$pubsub->notify(foo => {bar => 'I ♥ Mojolicious!'});

keyspace_listen

$cb = $pubsub->keyspace_listen(\%args,              sub { my ($pubsub, $message) = @_ }) });
$cb = $pubsub->keyspace_listen({key => "cool:key"}, sub { my ($pubsub, $message) = @_ }) });
$cb = $pubsub->keyspace_listen({op  => "del"},      sub { my ($pubsub, $message) = @_ }) });

Used to listen for keyspace notifications. See https://redis.io/topics/notifications for more details. The channel that will be subscribed to will look like one of these:

__keyspace@${db}__:$key $op
__keyevent@${db}__:$op $key

This means that "key" and "op" is mutually exclusive from the list of parameters below:

  • db

    Default database to listen for events is the database set in "url" in Mojo::Redis. "*" is also a valid value, meaning listen for events happening in all databases.

  • key

    Alternative to passing in $key. Default value is "*".

  • op

    Alternative to passing in $op. Default value is "*".

keyspace_unlisten

$pubsub = $pubsub->keyspace_unlisten(@args);
$pubsub = $pubsub->keyspace_unlisten(@args, $cb);

Stop listening for keyspace events. See "keyspace_listen" for details about keyspace events and what @args can be.

listen

$cb = $pubsub->listen($channel => sub { my ($pubsub, $message, $channel) = @_ });

Subscribe to an exact channel name (SUBSCRIBE) or a channel name with a pattern (PSUBSCRIBE). $channel in the callback will be the exact channel name, without any pattern. $message will be the data published to that the channel.

The returning code ref can be passed on to "unlisten".

notify

$pubsub->notify($channel => $message);

Send a plain string message to a channel. This method is the same as:

$pubsub->notify_p($channel => $message)->wait;

notify_p

$p = $pubsub->notify_p($channel => $message);

Send a plain string message to a channel and returns a Mojo::Promise object.

numpat_p

$promise = $pubsub->channels_p->then(sub { my $int = shift });

Returns the number of subscriptions to patterns (that are performed using the PSUBSCRIBE command). Note that this is not just the count of clients subscribed to patterns but the total number of patterns all the clients are subscribed to.

numsub_p

$promise = $pubsub->numsub_p(@channels)->then(sub { my $channels = shift });

Returns the number of subscribers (not counting clients subscribed to patterns) for the specified channels as a hash-ref, where the keys are channel names.

unlisten

$pubsub = $pubsub->unlisten($channel);
$pubsub = $pubsub->unlisten($channel, $cb);

Unsubscribe from a channel.

SEE ALSO

Mojo::Redis.