NAME

Thread::Queue::Duplex - thread-safe request/response queue with identifiable elements

SYNOPSIS

use Thread::Queue::Duplex;
my $q = new Thread::Queue::Duplex;
#
#	enqueue elements, returning a unique queue ID
#	(used in the client)
#
my $id = $q->enqueue("foo", "bar");
#
#	dequeue next available element (used in the server),
#	waiting indefinitely for an element to be made available
#	returns shared arrayref, first element is unique ID
#
my $foo = $q->dequeue;
#
#	dequeue next available element (used in the server),
#	returns undef if no element immediately available
#	otherwise, returns shared arrayref, first element is unique ID
#
my $foo = $q->dequeue_nb;
#
#	dequeue next available element (used in the server),
#	returned undef if no element available within $timeout 
#	seconds; otherwise, returns shared arrayref, first 
#	element is unique ID
#
my $foo = $q->dequeue_until($timeout);
#
#	returns number of items still in queue
#
my $left = $q->pending;
#
#	maps a response for the
#	queued element identified by $id;
#
$q->respond($id, @list);
#
#	tests for a response to the queued 
#	element identified by $id; returns undef if
#	not yet available, else returns the queue object
#
my $result = $q->ready($id);
#
#	returns list of available response ID's;
#	if list provided, only returns ID's from the list.
#	Returns undef if none available.
#	In scalar context, returns only first available;
#	Else a list of available IDs
#
my @ids = $q->available();
#
#	wait for and return the response for the 
#	specified unique identifier
#	(dequeue_response is alias)
#
my $result = $q->wait($id);
my $result = $q->dequeue_response($id);
#
#	waits up to $timeout seconds for a response to 
#	the queued element identified by $id; returns undef if
#	not available within $timeout, else returns the queue object
#
my $result = $q->wait_until($id, $timeout);
#
#	wait for a response to the queued 
#	elements listed in @ids, returning a hashref of
#	the first available response(s), keyed by id
#
my $result = $q->wait_any(@ids);
#
#	wait upto $timeout seconds for a response to 
#	the queued elements listed in @ids, returning 
#	a hashref of the first available response(s), keyed by id
#	Returns undef if none available in $timeout seconds
#
my $result = $q->wait_any_until($timeout, @ids);
#
#	wait for responses to all the queued 
#	elements listed in @ids, returning a hashref of
#	the response(s), keyed by id
#
my $result = $q->wait_all(@ids);
#
#	wait upto $timeout seconds for responses to 
#	all the queued elements listed in @ids, returning 
#	a hashref of the response(s), keyed by id
#	Returns undef if all responses not recv'd 
#	in $timeout seconds
#
my $result = $q->wait_all_until($timeout, @ids);

DESCRIPTION

A mapped queue, similar to Thread::Queue, except that as elements are queued, they are assigned unique identifiers, which are used to identify responses returned from the dequeueing thread. This class provides a simple RPC-like mechanism between multiple client and server threads, so that a single server thread can safely multiplex requests from multiple client threads.

Thread::Queue::Duplex objects encapsulate

a shared array, used as the queue (same as Thread::Queue)
a shared scalar, used to provide unique identifier sequence numbers
a shared hash, aka the mapping hash, used to return responses to enqueued elements, using the generated uniqiue identifier as the hash key

A normal processing sequence for Thread::Queue::Duplex might be:

#
#	Thread A (the client):
#
	...marshal parameters for a coroutine...
	my $id = $q->enqueue('function_name', \@paramlist);
	my $results = $q->dequeue_response($id);
	...process $results...
#
#	Thread B (the server):
#
	while (1) {
		my $call = $q->dequeue;
		my ($id, $func, @params) = @$call;
		$q->respond($id, $self->$func(@params));
	}

FUNCTIONS AND METHODS

new

The new function creates a new empty queue, and associated mapping hash.

enqueue LIST

Creates a shared array, pushes a unique identifier onto the shared array, then pushes the LIST onto the array, then pushes the shared arrayref onto the queue.

enqueue_urgent LIST

Same as enqueue, but adds the element to head of queue, rather than tail.

dequeue

Waits indefinitely for an element to become available in the queue, then removes and returns it.

dequeue_nb

The dequeue_nb method is identical to dequeue(), except it will return undef immediately if there are no elements currently in the queue.

dequeue_until

Identical to dequeue(), except it accepts a $timeout parameter specifying a duration (in seconds) to wait for an available element. If no element is available within the $timeout, it returns undef.

pending

Returns the number of items still in the queue.

respond($id [, LIST ])

Creates a new element in the mapping hash, keyed by $id, with a value set to a shared arrayref containing LIST.

ready($id)

Tests for a response to a uniquely identified previously enqueue'd LIST. Returns undef if no response is available, otherwise returns the Thread::Queue::Duplex object.

available([@ids])

Returns list of available response ID's; if @ids provided, only returns ID's from the list. Returns undef if none available; in scalar context, returns only first available; else a returns a list of available IDs.

wait($id) aka dequeue_response($id)

Waits indefinitely for a response to a uniquely identified previously enqueue'd LIST. Returns the returned result.

wait_until($id, $timeout)

Waits up to $timeout seconds for a response to to a uniquely identified previously enqueue'd LIST. Returns undef if no response is available in the specified $timeout duration, otherwise, returns the result.

wait_any(@ids)

Wait indefinitely for a response to any of the previously enqueue'd elements specified in the the supplied @ids. Returns a hashref of available responses keyed by their identifiers

wait_any_until($timeout, @ids)

Wait upto $timeout seconds for a response to any of the previously enqueue'd elements specified in the the supplied @ids. Returns a hashref of available responses keyed by their identifiers, or undef if none available within $timeout seconds.

wait_all(@ids)

Wait indefinitely for a response to all the previously enqueue'd elements specified in the supplied @ids. Returns a hashref of responses keyed by their identifiers.

wait_all_until($timeout, @ids)

Wait upto $timeout seconds for a response to all the previously enqueue'd elements specified in the supplied @ids. Returns a hashref of responses keyed by their identifiers, or undef if all responses are not available within $timeout seconds.

SEE ALSO

threads, threads::shared, Thread::Queue

AUTHOR, COPYRIGHT, & LICENSE

Dean Arnold, Presicient Corp. darnold@presicient.com

Copyright(C) 2005, Presicient Corp., USA

Permission is granted to use this software under the same terms as Perl itself. Refer to the Perl Artistic License for details.