NAME

Thread::Queue::Multiplex (aka TQM) - thread-safe publish/subscribe queue

Thread-Queue-Multiplex-0.91.tar.gz

SYNOPSIS

use Thread::Queue::Multiplex;
#
#	create new queue, limiting the max pending requests
#	to 20
#
my $tqm = Thread::Queue::Multiplex->new(MaxPending => 20);
#
#	register as a subscriber
#
$tqm->subscribe('myID');
#
#	unregister as a subscriber
#
$tqm->unsubscribe();
#
#	wait for $count subscribers to register
#
$tqm->wait_for_subscribers($count, $timeout);
#
#	get the list of current subscribers ID's
#
my @subids = $tqm->get_subscribers();
#
#	change the max pending limit
#
$tqm->set_max_pending($limit);
#
#	enqueue elements, returning a unique queue ID
#	(used in the client)
#
my $id = $tqm->publish("foo", "bar");
#
#	publish elements, and wait for a response
#	(used in the client)
#
my $resp = $tqm->publish_and_wait("foo", "bar");
#
#	publish elements, and wait for a response
#	until $timeout secs (used in the client)
#
my $resp = $tqm->publish_and_wait_until($timeout, "foo", "bar");
#
#	publish elements at head of queue, returning a
#	unique queue ID (used in the client)
#
my $id = $tqm->publish_urgent("foo", "bar");
#
#	publish elements at head of queue and wait for response
#
my $resp = $tqm->publish_urgent_and_wait("foo", "bar");
#
#	publish elements at head of queue and wait for
#	response until $timeout secs
#
my $resp = $tqm->publish_urgent_and_wait_until($timeout, "foo", "bar");
#
#	publish elements for simplex operation (no response)
#	returning the queue object
#
$tqm->publish_simplex("foo", "bar");

$tqm->publish_simplex_urgent("foo", "bar");
#
#########################################################
#
#	subscribers use the existing TQD dequeue() methods
#
#######################################################
#
#	modified versions of the TQD base enqueue methods
#	to support directed messaging to a single subscriber
#	or group of subscribers
#
#######################################################
#
#	enqueue elements to a specific subscriber, returning
#	a unique queue ID (used in the client)
#
my $id = $tqm->enqueue($subID, "foo", "bar");
#
#	enqueue elements to 2 subscribers, and wait for a response
#	(used in the client)
#
my $resp = $tqm->enqueue_and_wait([ $subID1, $subID2 ], "foo", "bar");
#
#	enqueue elements, and wait for a response
#	until $timeout secs (used in the client)
#
my $resp = $tqm->enqueue_and_wait_until($subID, $timeout, "foo", "bar");
#
#	SEE Thread::Queue::Duplex for the various publisher enqueue()
#	and wait() methods,
#	and the subscriber dequeue() methods
#

DESCRIPTION

A subclass of Thread::Queue::Duplex aka TQD which implements a "publish and subscribe" communications model for threads. Subscribers register with the queue, which registers either the provided subscriber ID, or, if no ID is provided, 1 plus the TID of the subscriber's thread, as a subscriber ID. As the publisher publishes messages to the queue, each subscriber receives a copy of the message. If the publication is not simplex, the publisher expects all subscribers to read and respond to the message; otherwise, the publisher simply continues its processing. Thread::Queue::Multiplex provides publish() method counterparts for all the Thread::Queue::Duplex enqueue() methods, e.g., publish_simplex(), publish_urgent(), publish_and_wait(), publish_and_wait_until(), etc.

Subscribers receive and reply to messages using the existing TQD dequeue() and respond() methods. In addition, modified versions of the enqueue() methods are provided to publishers to permit directing a message to a single subscriber, or subset of subscribers, by specifying the scalar subscriber ID (for single subscriber messages), or an arrayref of unique subscriber ID's (for multi-subscriber messages).

Thread::Queue::Multiplex subclass overrides some of the internal behavior of Thread::Queue::Duplex by

  • adding a shared hash to hold the list of unique subscriber ID's (provided either explicitly with subscribe(), or derived from 1 + threads->self()->tid() when the subscriber subscribe()s) mapped to a threads::shared array to hold ID's of messages published to the subscriber. (Note: tid() + 1 is used in order to avoid an ID of zero for the root thread).

  • adding a shared hash to hold the list of message ID's mapped to a threads::shared array to containing [message ID, flags, refcount, @params], where flags indicates the urgent and/or simplex status of the request, and refcount indicates the number of subscribers assigned to the request. A special refcount value of -1 indicates that only the first subscriber to retrieve/process the request should respond (to mimic the behavior of Thread::Queueu::Duplex), which is specified by the publisher using any of the enqueue methods with a subscriber ID of -1.

  • adding a shared hash to hold the list of message ID's mapped to a threads::shared hash containing a reference count of subscribers for the message, and a map of subscriber IDs to their responses. This "pending response" hash is used to accumulate all subscriber responses; when the reference count of a message is zero, the hash of responses is posted to the final response message mapping hash.

  • adding a shared hash to hold the map of thread ID's to subscriber ID's. Note: Each thread can have only a single subscriber.

  • changing the message mapping hash to map a unique message ID to a hash of unique subscriber ID's, mapped to their response (if any), i.e.,

    $msg_map = {
    	$msgid => {
    		$subID1 => $subID1_response,
    		$subID2 => $subID2_response,
    		etc.
    	}
    }
  • when the publisher dequeues the response to a message, it receives a copy of the subscriber mapping hash, and is responsible for iterating over the hash to read each subscriber's results

A normal processing sequence for Thread::Queue::Multiplex might be:

#
#	Thread A (the client):
#
	...marshal parameters for a coroutine...
	my $id = $tqm->publish('function_name', \@paramlist);
	my $results = $tqm->dequeue_response($id);
	while (($subID, $subresult) = each %$results) {
	...process $results...
	}
#
#	Thread B (a subscriber):
#
	while (1) {
		my $call = $tqm->dequeue;
		my ($id, $func, @params) = @$call;
		$tqm->respond($id, $self->$func(@params));
	}

FUNCTIONS AND METHODS

$tqm = Thread::Queue::Multiplex->new([MaxPending => $limit])

Constructor. Creates a new empty queue. If the MaxPending value is a non-zero value, the number of pending requests will be limited to $limit, and any further attempt to queue a request will block until the pending count drops below $limit. This limit may be applied or modified later via the set_max_pending() method (see below).

subscribe( [ $subID ] ) aka listen()

Subscribe to the queue. The listen() alias is provided for compatibility with TQD apps. If $subID is not provided, 1 plus the current thread's TID is used as the subscriber ID. Only a single subscriber per thread is permitted; undef will be returned if the current thread already has a subscriber.

unsubscribe() aka ignore()

Unsubscribe from the queue. The ignore() alias is provided for compatibility with TQD apps. Note the subscriber for the current thread is unsubscribed. Unsubscribing another thread is not currently supported.

@subIDs = $tqm->get_subscribers()

Returns the current list of subscriber IDs.

$msgID = $tqm->publish(@request)

enqueue()s the @request to all subscribers.

$results = $tqm->publish_and_wait(@request)

Same as publish, except that it waits for and returns the response hash, rather than returning immediately with the request ID.

$results = $tqm->publish_and_wait_until($timeout, @request)

Same as publish, except that it waits up to $timeout seconds for all subscribers to respond, and returns the response hash, rather than returning immediately with the request ID. If some, but not all, subscribers respond within the timeout, the responses are discarded.

$msgID = $tqm->publish_urgent(@request)

Same as publish, but adds the element to head of queue, rather than tail.

$results = $tqm->publish_urgent_and_wait(@request)

Same as publish_and_wait, but adds the element to head of queue, rather than tail.

$results = $tqm->publish_urgent_and_wait_until($timeout, @request)

Same as publish_and_wait_until, but adds the element to head of queue, rather than tail.

$msgID = $tqm->publish_simplex(@request)

Same as publish, but does not allocate an identifier, nor expect a response.

$msgID = $tqm->publish_simplex_urgent(@request)

Same as publish_simplex, but adds the element to head of queue, rather than tail.

$count = $tqm->pending()

Returns the number of items still in the queue. Note that, for subscribers, the returned value is the number of requests published to the individual subscriber, which may be less than the total number of pending requests, due to directed enqueue requests. Also, for subscribers, the number may include requests which have been cancelled, but not yet processed/discarded by the subscriber.

The following TQD methods are overloaded by TQM to support directed requests by adding either a single scalar subscriber ID, or an arrayref of multiple subscriber IDs, as the first parameter:

$msgID = $tqm->enqueue( $subID, @request)
$msgID = $tqm->enqueue( [ @subIDs ], @request)
$msgID = $tqm->enqueue_simplex( $subID, @request)
$msgID = $tqm->enqueue_simplex( [ @subIDs ], @request)
$msgID = $tqm->enqueue_urgent( $subID, @request)
$msgID = $tqm->enqueue_urgent( [ @subIDs ], @request)
$msgID = $tqm->enqueue_simplex_urgent( $subID, @request)
$msgID = $tqm->enqueue_simplex_urgent( [ @subIDs ], @request)
$result = $tqm->enqueue_and_wait( $subID, @request)
$result = $tqm->enqueue_and_wait( [ @subIDs ], @request)
$result = $tqm->enqueue_urgent_and_wait( $subID, @request)
$result = $tqm->enqueue_urgent_and_wait( [ @subIDs ], @request)
$result = $tqm->enqueue_and_wait_until( $subID, $timeout, @request)
$result = $tqm->enqueue_and_wait_until( [ @subIDs ], $timeout, @request)
$result = $tqm->enqueue_urgent_and_wait_until( $subID, $timeout, @request)
$result = $tqm->enqueue_urgent_and_wait_until( [ @subIDs ], $timeout, @request)

CAVEATS

If any subscriber thread dies, then the publisher may hang on any of the blocking publish calls, or the wait()/dequeue_response(). A future update may support occasional scans and forced unsubscribe() for dead threads.

SEE ALSO

Thread::Queue::Duplex Thread::Queue::Queueable, threads threads::shared Thread::Queue

AUTHOR, COPYRIGHT, & LICENSE

Dean Arnold, Presicient Corp. darnold@presicient.com

Copyright(C) 2006, Presicient Corp., USA

Licensed under the Academic Free License version 2.1, as specified in the License.txt file included in this software package, or at OpenSource.org http://www.opensource.org/licenses/afl-2.1.php.