NAME

OP::Persistence::Async

DESCRIPTION

Non-blocking statement handle access for OP classes.

POE::Wheel::Run is used to handle database access in a forked background process. Unlike most other modules that use POE for DB access, this module forgoes the marshaling of (potentially huge) data sets and the plethora of delegate methods which usually come with the territory of asynchronous DB access.

Instead, this mix-in exposes each query's statement handle (DBI::st object, aka $sth) directly to the caller, allowing for more memory-efficient handling of data, all while using the usual DBI method calls which Perl developers are familiar with.

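The coquery class method launches a query in the background. The finish function waits for any pending workers to finish up. The transmit function pipes data structures from the child process to the parent's callback sub. The convey function runs an arbitrary sub in a subprocess and hands anything it transmits to a callback in the parent.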
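
The division of labor described above (work in a forked child, results piped up to a callback in the parent) can be sketched with core Perl alone. This is an illustration of the general technique, not this module's implementation, which is built on POE::Wheel::Run. The names run_in_child and $send_up are hypothetical, and the choice of Storable with length-prefixed records is an assumption made for the sketch.

```perl
use strict;
use warnings;
use POSIX ();
use Storable qw(freeze thaw);

# Hypothetical helper, not part of OP: run $from_sub in a forked child and
# pass every value it sends to $to_sub in the parent.
sub run_in_child {
    my ( $from_sub, $to_sub ) = @_;

    pipe( my $reader, my $writer ) or die "pipe failed: $!";

    my $pid = fork();
    die "fork failed: $!" unless defined $pid;

    if ( $pid == 0 ) {
        # Child: write length-prefixed frozen records to the pipe.
        close $reader;
        binmode $writer;

        my $send_up = sub {
            for my $value (@_) {
                my $frozen = freeze( \$value );
                print {$writer} pack( 'N', length $frozen ), $frozen;
            }
        };

        my $ok = eval { $from_sub->($send_up); 1 };
        close $writer;    # flushes any buffered records

        # _exit skips END blocks and buffers inherited from the parent.
        POSIX::_exit( $ok ? 0 : 1 );
    }

    # Parent: read records until the child closes its end of the pipe.
    close $writer;
    binmode $reader;

    while (1) {
        my $got = read( $reader, my $header, 4 );
        die "read failed: $!" unless defined $got;
        last if $got == 0;
        die "truncated header" if $got != 4;

        my $length = unpack 'N', $header;
        $got = read( $reader, my $frozen, $length );
        die "truncated record" unless defined $got && $got == $length;

        $to_sub->( ${ thaw($frozen) } );
    }

    close $reader;
    waitpid $pid, 0;
    return $? >> 8;    # child's exit status
}

# Usage: the first sub runs in the child, the second in the parent.
my @received;
my $status = run_in_child(
    sub {
        my ($send_up) = @_;
        $send_up->( { id => 1, name => 'alpha' } );
        $send_up->( { id => 2, name => 'beta' } );
    },
    sub {
        my ($row) = @_;
        push @received, $row->{name};
    },
);

print "Parent $$ received: @received\n";
```

Unlike finish, this sketch blocks the parent until the child exits. It is meant only to show why transmitted values must be serializable: they cross a process boundary as bytes.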

METHODS

  • $class->coquery(Str $query), finish

    INSERT/UPDATE Handler - Executes the received query in a background process. For INSERT and UPDATE queries, pass the query as the only argument.

    use OP;
    
    # ...
    
    $class->coquery(
      sprintf('update %s set foo = 42', $class->tableName)
    );
    
    # Wait for workers to finish
    finish;

  • $class->coquery(Str $query, Code $sub), finish

    SELECT Handler - Executes the received query in a background process, handing off the statement handle to the received sub.

    $class->coquery(
      sprintf('select name from %s', $class->tableName),
      sub(DBI::st $sth) {
        while( my ( $name ) = $sth->fetchrow_array ) {
          print "Have name: $name\n";
    
          #
          # Do something with $class, maybe some synchronous queries
          # in this subprocess.
          #
        }
      }
    );
    
    finish;

  • $class->coquery(Str $query, Code $sub, Code $callback), transmit($value, [$value, ...]), finish

    SELECT Handler with Callback - Executes the received query in a background process, handing off the statement handle to the received sub. Any transmitted data structures are handed off to the received callback sub, which runs in the parent process.

    The transmit function encodes any received data structures and propagates them up to the parent.

    $class->coquery(
      sprintf('select id, name from %s', $class->tableName),
      sub(DBI::st $sth) {
        while( my $hash = $sth->fetchrow_hashref ) {
          print "$$ - Child is transmitting $hash->{name}\n";
    
          transmit $hash;
        }
      },
      sub(Hash $hash) {
        print "$$ - Parent process received $hash->{name}\n";
      }
    );
    
    finish;

FUNCTIONS

  • finish

    Waits for any pending queries to finish execution. Does not block execution of other POE or Coro threads.

  • transmit($value, [$value, ...])

    Propagates the received values, to be collected within the parent process's optional callback sub. Will not work for GLOB, CODE, or circular references.

  • convey($fromSub, [$toSub])

    Runs the received CODE block ($fromSub) in a subprocess, and reaps any transmitted results using the optional callback ($toSub), which runs in the parent process.

    use OP;
    
    convey(
      # $fromSub:
      sub {
        transmit "Hello from $$...";
        sleep 3;
        transmit "And hello again (from $$ of course)...";
      },
    
      # $toSub:
      sub(Str $str) {
        print "$$ received string: $str\n";
      }
    );
    
    finish;
    
    #
    # 26464 received string: Hello from 26466...
    # 26464 received string: And hello again (from 26466 of course)...
    #
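
Because transmit will not work for GLOB, CODE, or circular references, it can be useful to inspect a data structure before sending it. The following is a minimal sketch of such a pre-flight check using only core modules. The function name transmit_problems is hypothetical and not part of OP, and the check assumes the restriction is exactly as stated above; how the module itself encodes values is not covered here.

```perl
use strict;
use warnings;
use Scalar::Util qw(reftype refaddr);

# Hypothetical helper, not part of OP: returns a list of human-readable
# problems found in $value, or an empty list if none were found.
sub transmit_problems {
    my ( $value, $path, $ancestors ) = @_;
    $path      //= '$value';
    $ancestors //= {};

    my $type = reftype($value);
    return () unless defined $type;    # plain scalars are fine

    return ("$path is a CODE reference") if $type eq 'CODE';
    return ("$path is a GLOB reference")
        if $type eq 'GLOB' or $type eq 'IO';

    # A reference that is already one of our containers forms a cycle.
    my $addr = refaddr($value);
    return ("$path refers back to one of its own containers")
        if $ancestors->{$addr};

    $ancestors->{$addr} = 1;

    my @problems;
    if ( $type eq 'HASH' ) {
        for my $key ( sort keys %$value ) {
            push @problems,
                transmit_problems( $value->{$key},
                $path . '->{' . $key . '}', $ancestors );
        }
    }
    elsif ( $type eq 'ARRAY' ) {
        for my $i ( 0 .. $#$value ) {
            push @problems,
                transmit_problems( $value->[$i],
                $path . '->[' . $i . ']', $ancestors );
        }
    }
    elsif ( $type eq 'REF' or $type eq 'SCALAR' ) {
        push @problems,
            transmit_problems( $$value, '${' . $path . '}', $ancestors );
    }

    # Siblings may share this reference without forming a cycle.
    delete $ancestors->{$addr};

    return @problems;
}

# Usage: a plain row passes, a row holding a sub and a cycle does not.
my %row = ( id => 1, name => 'alpha', tags => [ 'x', 'y' ] );
my @ok = transmit_problems( \%row );

my %bad = ( name => 'beta', cb => sub { 1 } );
$bad{self} = \%bad;
my @bad = transmit_problems( \%bad );

print "row is safe to transmit\n" unless @ok;
print "$_\n" for @bad;

delete $bad{self};    # break the cycle so the hash can be freed
```

A check like this would run in the child, just before calling transmit, so that an unsupported value is reported at the point where it was produced.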

SEE ALSO

This file is part of OP.