  • stream handling functionality, including claiming/pending

  • get/set and observables

  • sorted sets

  • hyperloglog existence

  • simple queues via lists

  • pub/sub

This module is responsible for namespacing, connection handling and clustering. It should also cover retry for stateless calls.

wait_time

Time to wait for items, in milliseconds.

batch_count

Number of items to allow per batch (pending / readgroup calls).

apply_prefix

remove_prefix

oldest_processed_id

Checks for the last ID that has been processed by all the consumer groups in the given stream.

compare_id

Given two IDs, compare them as if doing a <=> numeric comparison.
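As an illustration of the comparison described above, here is a minimal sketch that assumes the standard Redis stream ID layout of `<milliseconds>-<sequence>`. The function name `compare_stream_ids` is hypothetical and this is not necessarily how the module implements compare_id; treating a missing sequence part as 0 is also an assumption.

```perl
use strict;
use warnings;

# Hypothetical standalone helper, not the module's own code.
# Compares two Redis stream IDs of the form "<ms>-<seq>" and
# returns -1, 0 or 1, like the <=> operator.
sub compare_stream_ids {
    my ($left, $right) = @_;
    my ($left_ms,  $left_seq)  = split /-/, $left,  2;
    my ($right_ms, $right_seq) = split /-/, $right, 2;

    # Compare the millisecond part first; fall back to the
    # sequence part only when the timestamps are equal.
    # Assumption: a missing sequence part counts as 0.
    return ($left_ms <=> $right_ms)
        || (($left_seq // 0) <=> ($right_seq // 0));
}
```

Comparing the two parts separately matters because a plain string or single numeric comparison of the whole ID would not order `5-10` after `5-9` correctly.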

next_id

Given a stream ID, returns the next ID after it. This is managed by the simple expedient of incrementing the right-hand part of the identifier.
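The increment described above can be sketched as follows, assuming IDs of the form `<milliseconds>-<sequence>`. The name `next_stream_id` is hypothetical, and defaulting a missing sequence part to 0 is an assumption rather than documented behaviour of next_id.

```perl
use strict;
use warnings;

# Hypothetical standalone helper, not the module's own code.
# Returns the ID immediately following the given stream ID by
# incrementing the right-hand (sequence) part.
sub next_stream_id {
    my ($id) = @_;
    my ($ms, $seq) = split /-/, $id, 2;

    # Assumption: an ID without a sequence part starts at 0.
    $seq //= 0;

    return $ms . '-' . ($seq + 1);
}
```

For example, `next_stream_id('1526919030474-55')` yields `1526919030474-56`, which is useful as an exclusive lower bound when resuming a range read.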

iterate

Deal with incoming requests via a stream.

Returns a Ryu::Source which emits Myriad::Redis::Pending items.

cleanup

Clear up old entries from a stream when it grows too large.

pending

Check for any pending items, claiming them for reprocessing as required.

Takes the following named parameters:

  • stream - the stream name

  • group - which consumer group to check

  • client - the name of the client to check

Returns a Ryu::Source for the pending items in this stream.

create_group

Create a Redis consumer group if it does NOT exist.

It'll also send the MKSTREAM option to create the stream if it doesn't exist.

  • stream - The name of the stream we want to attach the group to.

  • group - The group name.

  • start_from - The ID of the message to treat as the start of the stream from this group's point of view. Defaults to $, which means the last message.

pending_messages_info

Return information about the pending messages for a stream and a consumer group.

This currently just executes XPENDING without any filtering.

  • stream - The name of the stream we want to check.

  • group - The name of the consumer group that we want to check.

stream_length

Return the length of a given stream.

borrow_instance_from_pool

Returns a Redis connection, either taken from the connection pool or newly created. If all connections are busy and the pool limit has been reached, this waits until one becomes available.

return_instance_to_pool

Puts a Redis connection back into the pool so that it can be used by other callers. It should be called at the end of every usage, as an on_ready handler.

This should also be possible with a try/finally combination, but that currently fails with the $redis_pool slot not being defined.

Takes the following parameters:

  • $instance - Redis connection to be returned.

redis

Resolves to a new Net::Async::Redis or Net::Async::Redis::Cluster instance, depending on the setting of $use_cluster.

ack

Acknowledge a message from a Redis stream.

  • stream - The stream name.

  • group - The group name.

  • message_id - The id of the message we want to acknowledge.

publish

Publish a message through a Redis channel (pub/sub system).

  • channel - The channel name.

  • message - The message we want to publish (string).

subscribe

Subscribe to a Redis channel.

INHERITED METHODS

IO::Async::Notifier

add_child, adopt_future, adopted_futures, can_event, children, configure_unknown, debug_printf, get_loop, invoke_error, invoke_event, loop, make_event_cb, maybe_invoke_event, maybe_make_event_cb, notifier_name, parent, remove_child, remove_from_parent

Object::Pad::UNIVERSAL

BUILDARGS

AUTHOR

Deriv Group Services Ltd. DERIV@cpan.org

LICENSE

Copyright Deriv Group Services Ltd 2020-2021. Licensed under the same terms as Perl itself.