stream handling functionality, including claiming/pending
get/set and observables
sorted sets
hyperloglog existence
simple queues via lists
pub/sub
This module is responsible for namespacing, connection handling and clustering. It should also cover retry for stateless calls.
wait_time
Time to wait for items, in milliseconds.
batch_count
Number of items to allow per batch (pending / readgroup calls).
apply_prefix
remove_prefix
oldest_processed_id
Check the last id that has been processed by all the consumer groups in the given stream.
compare_id
Given two IDs, compare them as if doing a <=>
numeric comparison.
next_id
Given a stream ID, returns the next ID after it. This is managed by the simple expedient of incrementing the right-hand part of the identifier.
iterate
Deal with incoming requests via a stream.
Returns a Ryu::Source which emits Myriad::Redis::Pending items.
cleanup
Clear up old entries from a stream when it grows too large.
pending
Check for any pending items, claiming them for reprocessing as required.
Takes the following named parameters:
stream
- the stream namegroup
- which consumer group to checkclient
- the name of the client to check
Returns the pending items in this stream.
create_stream
Creates a Redis stream. Note that there is no straight way to do that in Redis without creating a group or adding an event. To overcome this it will create a group with MKSTREAM option Then destroy that init consumer group.
stream
- name of the stream we want to create.
create_group
Create a Redis consumer group if it does NOT exist.
It'll also send the MKSTREAM option to create the stream if it doesn't exist.
stream
- The name of the stream we want to attach the group to.group
- The group name.start_from
- The id of the message that is going to be considered the start of the stream for this group's point of view by default it's$
which means the last message.
remove_group
Delete a Redis consumer group.
stream
- The name of the stream group belongs to.group
- The consumer group name.
pending_messages_info
Return information about the pending messages for a stream and a consumer group.
This currently just execute XPENDING
without any filtering.
stream
- The name of the stream we want to check.group
- The consumers group name that we want to check.
stream_length
Return the length of a given stream
borrow_instance_from_pool
Returns a Redis connection either from a pool of connection or a new one. With the possibility of waiting to get one, if all connection were busy and we maxed out our limit.
return_instance_to_pool
This puts back a redis connection into Redis pool, so it can be used by other called. It should be called at the end of every usage, as on_ready.
It should also be possible with a try/finally combination.. but that's currently failing with the $redis_pool slot not being defined.
Takes the following parameters:
$instance
- Redis connection to be returned.
redis
Resolves to a new Net::Async::Redis or Net::Async::Redis::Cluster instance, depending on the setting of $use_cluster
.
ack
Acknowledge a message from a Redis stream.
stream
- The stream name.group
- The group name.message_id
- The id of the message we want to acknowledge.
publish
Publish a message through a Redis channel (pub/sub system)
channel
- The channel name.message
- The message we want to publish (string).
subscribe
Subscribe to a redis channel.
INHERITED METHODS
- IO::Async::Notifier
-
add_child, adopt_future, adopted_futures, can_event, children, configure_unknown, debug_printf, get_loop, invoke_error, invoke_event, loop, make_event_cb, maybe_invoke_event, maybe_make_event_cb, notifier_name, parent, remove_child, remove_from_parent
- Object::Pad::UNIVERSAL
AUTHOR
Deriv Group Services Ltd. DERIV@cpan.org
LICENSE
Copyright Deriv Group Services Ltd 2020-2022. Licensed under the same terms as Perl itself.