NAME

MCE::Shared::Condvar - Condvar helper class

VERSION

This document describes MCE::Shared::Condvar version 1.837

DESCRIPTION

This helper class for MCE::Shared provides a Scalar, Mutex, and primitives for conditional locking.

SYNOPSIS

use MCE::Shared;

my $cv = MCE::Shared->condvar( 0 );

# OO interface

$val = $cv->set( $val );
$val = $cv->get();
$len = $cv->len();

# conditional locking primitives

$cv->lock();
$cv->unlock();
$cv->broadcast(0.05);     # delay before broadcasting
$cv->broadcast();
$cv->signal(0.05);        # delay before signaling
$cv->signal();
$cv->timedwait(2.5);
$cv->wait();

# sugar methods, so set/get need not be called explicitly

$val = $cv->append( $string );     #   $val .= $string
$val = $cv->decr();                # --$val
$val = $cv->decrby( $number );     #   $val -= $number
$val = $cv->getdecr();             #   $val--
$val = $cv->getincr();             #   $val++
$val = $cv->incr();                # ++$val
$val = $cv->incrby( $number );     #   $val += $number
$old = $cv->getset( $new );        #   $o = $v, $v = $n, $o

EXAMPLE

The following example demonstrates barrier synchronization.

use MCE;
use MCE::Shared;
use Time::HiRes qw(usleep);

my $num_workers = 8;
my $count = MCE::Shared->condvar(0);
my $state = MCE::Shared->scalar('ready');

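# Match case-insensitively; "lc $^O =~ /.../" would lowercase the match
# result rather than $^O, so "MSWin32" would never match.
my $microsecs = ( $^O =~ /mswin|mingw|msys|cygwin/i ) ? 0 : 200;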

# The lock is released upon entering ->broadcast, ->signal, ->timedwait,
# and ->wait. For performance reasons, the condition variable is *not*
# re-locked prior to exiting the call. Therefore, obtain the lock when
# synchronization is desired subsequently.

sub barrier_sync {
   usleep($microsecs) while $state->get eq 'down';

   $count->lock;
   $state->set('up'), $count->incr;

   if ($count->get == $num_workers) {
      $count->decr, $state->set('down');
      $count->broadcast;
   }
   else {
      $count->wait while $state->get eq 'up';
      $count->lock;
      $state->set('ready') if $count->decr == 0;
      $count->unlock;
   }
}

sub user_func {
   my $id = MCE->wid;
   for (1 .. 400) {
      MCE->print("$_: $id\n");
      barrier_sync();  # made possible by MCE::Shared::Condvar
    # MCE->sync();     # same thing via the MCE-Core API
   }
}

my $mce = MCE->new(
   max_workers => $num_workers,
   user_func   => \&user_func
)->run;

# Time taken from a 2.6 GHz machine running Mac OS X.
# threads::shared:   0.207s  Perl threads
#   forks::shared:  36.426s  child processes
#     MCE::Shared:   0.353s  child processes
#        MCE Sync:   0.062s  child processes

API DOCUMENTATION

new ( [ value ] )

Constructs a new condition variable. Its value defaults to 0 when value is not specified.

use MCE::Shared;

$cv = MCE::Shared->condvar( 100 );
$cv = MCE::Shared->condvar;

set ( value )

Sets the value associated with the cv object. The new value is returned in scalar context.

$val = $cv->set( 10 );
$cv->set( 10 );

get

Returns the value associated with the cv object.

$val = $cv->get;

len

Returns the length of the value. It returns the undef value if the value is not defined.

$len = $cv->len;

lock

Attempts to grab the lock and waits if it is not available. Multiple calls to $cv->lock by the same process or thread are safe. The mutex remains locked until $cv->unlock is called.

$cv->lock;

unlock

Releases the lock. A lock held by an exiting process or thread is released automatically.

$cv->unlock;

signal ( [ floating_seconds ] )

Releases a held lock on the variable. Then, unblocks one process or thread that's waiting on that variable. The variable is *not* locked upon return.

Optionally, delay floating_seconds before signaling.

$count->signal;
$count->signal( 0.5 );

broadcast ( [ floating_seconds ] )

The broadcast method works similarly to signal. It releases a held lock on the variable. Then, unblocks all the processes or threads that are blocked in a condition wait on the variable, rather than only one. The variable is *not* locked upon return.

Optionally, delay floating_seconds before broadcasting.

$count->broadcast;
$count->broadcast( 0.5 );

wait

Releases a held lock on the variable. Then, waits until another process or thread does a signal or broadcast for the same variable. The variable is *not* locked upon return.

$count->wait() while $state->get() eq "bar";

timedwait ( floating_seconds )

Releases a held lock on the variable. Then, waits until another process or thread does a signal or broadcast for the same variable, or until floating_seconds have elapsed.

A false value is returned if the timeout is reached, and a true value otherwise. In either case, the variable is *not* locked upon return.

$count->timedwait( 10 ) while $state->get() eq "foo";
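
Because none of these calls re-acquire the lock, a waiter that re-checks a condition has to lock again after each wakeup. The following is a minimal sketch of that pattern, not taken from the module's own examples. The worker body, the one-second sleep, and the five-second timeout are illustrative assumptions.

```perl
use strict;
use warnings;

use MCE::Hobo;
use MCE::Shared;

# The condvar's own value doubles as a "ready" flag (0 = not ready).
my $cv = MCE::Shared->condvar(0);

# Hypothetical worker: pretends to work, then flags completion.
my $worker = MCE::Hobo->create(sub {
    sleep 1;          # stand-in for real work
    $cv->lock;
    $cv->set(1);      # change the flag while holding the lock
    $cv->signal;      # releases the lock, then wakes one waiter
});

$cv->lock;
until ($cv->get) {
    # timedwait releases the lock and returns without re-locking,
    # so lock again before the flag is re-checked.
    my $signaled = $cv->timedwait(5);
    $cv->lock;
    last unless $signaled;    # false return value: the timeout was reached
}
my $ready = $cv->get;
$cv->unlock;

$worker->join;
print $ready ? "ready\n" : "timed out\n";
```

The same loop works with wait in place of timedwait when no timeout is wanted; only the return-value check goes away.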

SUGAR METHODS

This module is equipped with sugar methods so that set and get need not be called explicitly. In shared context, the benefit is atomicity and a reduction in inter-process communication.

The API resembles a subset of the Redis primitives http://redis.io/commands#strings without the key argument.
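
As an illustration of the atomicity point, the sketch below has several MCE::Hobo workers increment one shared condvar value through incr. The worker count and iteration count are arbitrary assumptions chosen for the demonstration.

```perl
use strict;
use warnings;

use MCE::Hobo;
use MCE::Shared;

my $counter = MCE::Shared->condvar(0);

my $num_workers = 4;       # assumed for illustration
my $iterations  = 1000;    # assumed for illustration

my @workers;
for my $n (1 .. $num_workers) {
    push @workers, MCE::Hobo->create(sub {
        # Each incr is a single request to the shared-manager process,
        # so no explicit lock is needed around it.
        $counter->incr for 1 .. $iterations;
    });
}

$_->join for @workers;

my $total = $counter->get;
print "total: $total\n";
```

Replacing incr with a separate get followed by set would need an explicit lock and unlock around the two calls to avoid lost updates.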

append ( value )

Appends a value at the end of the current value and returns its new length.

$len = $cv->append( "foo" );

decr

Decrements the value by one and returns its new value.

$num = $cv->decr;

decrby ( number )

Decrements the value by the given number and returns its new value.

$num = $cv->decrby( 2 );

getdecr

Decrements the value by one and returns its old value.

$old = $cv->getdecr;

getincr

Increments the value by one and returns its old value.

$old = $cv->getincr;

getset ( value )

Sets the value and returns its old value.

$old = $cv->getset( "baz" );

incr

Increments the value by one and returns its new value.

$num = $cv->incr;

incrby ( number )

Increments the value by the given number and returns its new value.

$num = $cv->incrby( 2 );

CHAMENEOS DEMONSTRATION

The MCE example is derived from the chameneos example by Jonathan DePeri and Andrew Rodland.

use 5.010;
use strict;
use warnings;

use MCE::Hobo;
use MCE::Shared;
use Time::HiRes 'time';

die 'No argument given' if not @ARGV;

my $start = time;
my %color = ( blue => 1, red => 2, yellow => 4 );

my ( @colors, @complement );

@colors[values %color] = keys %color;

for my $triple (
  [qw(blue blue blue)],
  [qw(red red red)],
  [qw(yellow yellow yellow)],
  [qw(blue red yellow)],
  [qw(blue yellow red)],
  [qw(red blue yellow)],
  [qw(red yellow blue)],
  [qw(yellow red blue)],
  [qw(yellow blue red)],
) {
  $complement[ $color{$triple->[0]} | $color{$triple->[1]} ] =
    $color{$triple->[2]};
}

my @numbers = qw(zero one two three four five six seven eight nine);

sub display_complements
{
  for my $i (1, 2, 4) {
    for my $j (1, 2, 4) {
      print "$colors[$i] + $colors[$j] -> $colors[ $complement[$i | $j] ]\n";
    }
  }
  print "\n";
}

sub num2words
{
  join ' ', '', map $numbers[$_], split //, shift;
}

# Construct condvars and queues first before other shared objects or in
# any order when IO::FDPass is installed, used by MCE::Shared::Server.

my $meetings = MCE::Shared->condvar();

tie my @creatures, 'MCE::Shared';
tie my $first, 'MCE::Shared', undef;
tie my @met, 'MCE::Shared';
tie my @met_self, 'MCE::Shared';

sub chameneos
{
  my $id = shift;

  while (1) {
    $meetings->lock();

    unless ($meetings->get()) {
      $meetings->unlock();
      last;
    }

    if (defined $first) {
      $creatures[$first] = $creatures[$id] =
        $complement[$creatures[$first] | $creatures[$id]];

      $met_self[$first]++ if ($first == $id);
      $met[$first]++;  $met[$id]++;
      $meetings->decr();
      $first = undef;

      # Unlike threads::shared (condvar) which retains the lock
      # while in the scope, MCE::Shared signal and wait methods
      # must be called prior to leaving the block, due to lock
      # being released upon return.

      $meetings->signal();
    }
    else {
      $first = $id;
      $meetings->wait();  # ditto ^^
    }
  }
}

sub pall_mall
{
  my $N = shift;
  @creatures = map $color{$_}, @_;
  my @threads;

  print " ", join(" ", @_);
  $meetings->set($N);

  for (0 .. $#creatures) {
    $met[$_] = $met_self[$_] = 0;
    push @threads, MCE::Hobo->create(\&chameneos, $_);
  }
  for (@threads) {
    $_->join();
  }

  $meetings->set(0);

  for (0 .. $#creatures) {
    print "\n$met[$_]", num2words($met_self[$_]);
    $meetings->incrby($met[$_]);
  }

  print "\n", num2words($meetings->get()), "\n\n";
}

display_complements();

pall_mall($ARGV[0], qw(blue red yellow));
pall_mall($ARGV[0], qw(blue red yellow red yellow blue red yellow red blue));

printf "duration: %0.03f\n", time - $start;

CREDITS

The conditional locking feature is inspired by threads::shared.

LIMITATIONS

Perl must have IO::FDPass for constructing a shared condvar or queue while the shared-manager process is running. For platforms where IO::FDPass isn't possible, construct condvar and queue before other classes. On systems without IO::FDPass, the manager process is delayed until sharing other classes or started explicitly.

use MCE::Shared;

my $has_IO_FDPass = $INC{'IO/FDPass.pm'} ? 1 : 0;

my $cv  = MCE::Shared->condvar();
my $que = MCE::Shared->queue();

MCE::Shared->start() unless $has_IO_FDPass;

Regarding mce_open, IO::FDPass is needed for constructing a shared-handle from a non-shared handle not yet available inside the shared-manager process. The workaround is to have the non-shared handle made before the shared-manager is started. Passing a file by reference is fine for the three STD* handles.

# The shared-manager knows of \*STDIN, \*STDOUT, \*STDERR.

mce_open my $shared_in,  "<",  \*STDIN;   # ok
mce_open my $shared_out, ">>", \*STDOUT;  # ok
mce_open my $shared_err, ">>", \*STDERR;  # ok
mce_open my $shared_fh1, "<",  "/path/to/sequence.fasta";  # ok
mce_open my $shared_fh2, ">>", "/path/to/results.log";     # ok

mce_open my $shared_fh, ">>", \*NON_SHARED_FH;  # requires IO::FDPass

The IO::FDPass module is known to work reliably on most platforms. Install version 1.1 or later to be rid of the limitations described above.

perl -MIO::FDPass -le "print 'Cheers! Perl has IO::FDPass.'"

INDEX

MCE, MCE::Hobo, MCE::Shared

AUTHOR

Mario E. Roy, <marioeroy AT gmail DOT com>