AnyEvent::MySQL - Pure Perl AnyEvent socket implementation of MySQL client


Version 1.2.1


This package has been used in production at my company from 2012 to today (2017), so I consider it stable (although fetching some data types through prepared statements is not implemented yet).

Please read the file as a usage example. >w<


use strict;
use warnings;

    eval {
        require AE;
        require Data::Dumper;
        require Devel::StackTrace;
        require EV;
    if( $@ ) {
        warn "require module fail: $@";

$EV::DIED = sub {
    print "EV::DIED: $@\n";
    print Devel::StackTrace->new->as_string;

use lib 'lib';
use AnyEvent::MySQL;

my $end = AE::cv;

my $dbh = AnyEvent::MySQL->connect("DBI:mysql:database=test;host=;port=3306", "ptest", "pass", { PrintError => 1 }, sub {
    my($dbh) = @_;
    if( $dbh ) {
        warn "Connect success!";
        $dbh->pre_do("set names latin1");
        $dbh->pre_do("set names utf8");
    else {
        warn "Connect fail: $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";

$dbh->do("select * from t1 where a<=?", {}, 15, sub {
    my $rv = shift;
    if( defined($rv) ) {
        warn "Do success: $rv";
    else {
        warn "Do fail: $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";

my $end2 = AE::cv;

#$dbh->prepare("update t1 set a=1 where b=1", sub {
#$dbh->prepare("select * from t1", sub {
my $sth = $dbh->prepare("select b, a aaa from t1 where a>?", sub {
#$dbh->prepare("select * from type_all", sub {
    warn "prepared!";


my $end3 = AE::cv;

$sth->execute(1, sub {
    warn "executed! $_[0]";

my $fth = $end3->recv;

my $end4 = AE::cv;

$fth->bind_col(2, \my $a, sub {
    warn $_[0];
my $fetch; $fetch = sub {
    $fth->fetch(sub {
        if( $_[0] ) {
            warn "Get! $a";
        else {
            warn "Get End!";
            undef $fetch;
}; $fetch->();

#$fth->bind_columns(\my($a, $b), sub {
#    warn $_[0];
#    warn $AnyEvent::MySQL::errstr;
#my $fetch; $fetch = sub {
#    $fth->fetch(sub {
#        if( $_[0] ) {
#            warn "Get! ($a, $b)";
#            $fetch->();
#        }
#        else {
#            undef $fetch;
#            $end4->send;
#        }
#    });
#}; $fetch->();

#my $fetch; $fetch = sub {
#    $fth->fetchrow_array(sub {
#        if( @_ ) {
#            warn "Get! (@_)";
#            $fetch->();
#        }
#        else {
#            undef $fetch;
#            $end4->send;
#        }
#    });
#}; $fetch->();

#my $fetch; $fetch = sub {
#    $fth->fetchrow_arrayref(sub {
#        if( $_[0] ) {
#            warn "Get! (@{$_[0]})";
#            $fetch->();
#        }
#        else {
#            undef $fetch;
#            $end4->send;
#        }
#    });
#}; $fetch->();

#my $fetch; $fetch = sub {
#    $fth->fetchrow_hashref(sub {
#        if( $_[0] ) {
#            warn "Get! (@{[%{$_[0]}]})";
#            $fetch->();
#        }
#        else {
#            undef $fetch;
#            $end4->send;
#        }
#    });
#}; $fetch->();


#tcp_connect 0, 3306, sub {
#    my $fh = shift;
#    my $hd = AnyEvent::Handle->new( fh => $fh );
#    AnyEvent::MySQL::Imp::do_auth($hd, 'tiwi', '', sub {
#        undef $hd;
#        warn $_[0];
#        $end->send;
#    });

my $end5 = AE::cv;

$dbh->selectall_arrayref("select a*2, b from t1 where a<=?", {}, 15, sub {
    warn "selectall_arrayref";
    warn Dumper($_[0]);

$dbh->selectall_hashref("select a*2, b from t1", 'b', sub {
    warn "selectall_hashref";
    warn Dumper($_[0]);

$dbh->selectall_hashref("select a*2, b from t1", ['b', 'a*2'], sub {
    warn "selectall_hashref";
    warn Dumper($_[0]);

$dbh->selectall_hashref("select a*2, b from t1", sub {
    warn "selectall_hashref";
    warn Dumper($_[0]);

$dbh->selectcol_arrayref("select a*2, b from t1", { Columns => [1,2,1] }, sub {
    warn "selectcol_arrayref";
    warn Dumper($_[0]);

$dbh->selectall_arrayref("select * from t3", sub {
    warn "selectall_arrayref t3";
    warn Dumper($_[0]);

$dbh->selectrow_array("select * from t1 where a>? order by a", {}, 2, sub {
    warn "selectrow_array";
    warn Dumper(\@_);

$dbh->selectrow_arrayref("select * from t1 where a>? order by a", {}, 2, sub {
    warn "selectrow_arrayref";
    warn Dumper($_[0]);

$dbh->selectrow_hashref("select * from t1 where a>? order by a", {}, 2, sub {
    warn "selectrow_hashref";
    warn Dumper($_[0]);

my $st = $dbh->prepare("select * from t1 where a>? order by a");

$st->execute(2, sub {
    warn "fetchall_arrayref";
    warn Dumper($_[0]->fetchall_arrayref());

$st->execute(2, sub {
    warn "fetchall_hashref(a)";
    warn Dumper($_[0]->fetchall_hashref('a'));

$st->execute(2, sub {
    warn "fetchall_hashref";
    warn Dumper($_[0]->fetchall_hashref());

$st->execute(2, sub {
    warn "fetchcol_arrayref";
    warn Dumper($_[0]->fetchcol_arrayref());

$dbh->begin_work( sub {
    warn "txn begin.. @_ | $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
} );

$dbh->do("update t1 set a=? b=?", {}, 3, 4, sub {
    warn "error update @_ | $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
} );

$dbh->do("update t1 set b=b+1", {}, sub {
    warn "after error update @_ | $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
} );

$dbh->commit( sub {
    warn "aborted commit @_ | $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
} );

$dbh->do("update t1 set b=b+1", {}, sub {
    warn "after aborted commit @_ | $AnyEvent::MySQL::errstr ($AnyEvent::MySQL::err)";
} );

#my $txh = $dbh->begin_work(sub {
#    warn "txn begin.. @_";
#$dbh->do("insert into t1 values (50,50)", { Tx => $txh }, sub {
#    warn "insert in txn @_ insertid=".$dbh->last_insert_id;
#$txh->rollback(sub {
#    warn "rollback txn @_";
#$dbh->selectall_arrayref("select * from t1", sub {
#    warn "check rollback txn: ".Dumper($_[0]);
#my $txh2 = $dbh->begin_work(sub {
#    warn "txn2 begin.. @_";
#$dbh->do("insert into t1 values (50,50)", { Tx => $txh2 }, sub {
#    warn "insert in txn2 @_ insertid=".$dbh->last_insert_id;
#$txh2->commit(sub {
#    warn "commit txn2 @_";
#$dbh->selectall_arrayref("select * from t1", sub {
#    warn "check commit txn: ".Dumper($_[0]);
#$dbh->do("delete from t1 where a=50", sub {
#    warn "remove the effect @_";
#my $update_st;
#my $txh3; $txh3 = $dbh->begin_work(sub {
#    warn "txn3 begin.. @_";
#    $update_st = $dbh->prepare("insert into t1 values (?,?)", sub {
#        warn "prepare insert @_";
#    });
#    $update_st->execute(60, 60, { Tx => $txh3 }, sub {
#        warn "insert 60 @_";
#    });
#    $dbh->selectall_arrayref("select * from t1", { Tx => $txh3 }, sub {
#        warn "select in txn3: ".Dumper($_[0]);
#    });
#    $txh3->rollback(sub {
#        warn "txh3 rollback @_";
#    });
#    $dbh->selectall_arrayref("select * from t1", sub {
#        warn "select out txn3: ".Dumper($_[0]);
#    });

#$st_all = $dbh->prepare("select `date`, `time`, `datetime`, `timestamp` from all_type", sub {
#    warn "prepare st_all @_";


my $readonly_dbh = AnyEvent::MySQL->connect("DBI:mysql:database=test;host=;port=3306", "ptest", "pass", { ReadOnly => 1 }, sub {
  # ... we can only use "select" and "show" and "set names" command on this handle


$dbh = AnyEvent::MySQL->connect($data_source, $username, [$auth, [\%attr,]] $cb->($dbh, 1))

$dbh = AnyEvent::MySQL::db->new($dsn, $username, [$auth, [\%attr,]] [$cb->($dbh, $next_guard)])

$cb will be called each time the db connection is established, re-established,
or attempted but failed.

If the connection attempt failed, the $dbh in $cb's args will be undef.

You can do some connection initialization here, such as
 set names utf8;

But you should NOT rely on this for work flow control,
because a reconnection can occur at any time.

$error_num = $dbh->err

$error_str = $dbh->errstr

$rv = $dbh->last_insert_id

Non-blocking: the value is returned immediately.

$dbh->do($statement, [\%attr, [@bind_values,]] [$cb->($rv)])

$dbh->pre_do($statement, [\%attr, [@bind_values,]] [$cb->($rv)])

This method is like $dbh->do, except that $dbh->pre_do unshifts the
job onto the front of the queue instead of pushing it onto the end.

This method is intended for initialization actions in the AnyEvent::MySQL->connect callback.

$dbh->selectall_arrayref($statement, [\%attr, [@bind_values,]] $cb->($ary_ref))

$dbh->selectall_hashref($statement, [$key_field|\@key_field], [\%attr, [@bind_values,]] $cb->($hash_ref))

$dbh->selectcol_arrayref($statement, [\%attr, [@bind_values,]] $cb->($ary_ref))

$dbh->selectrow_array($statement, [\%attr, [@bind_values,]], $cb->(@row_ary))

$dbh->selectrow_arrayref($statement, [\%attr, [@bind_values,]], $cb->($ary_ref))

$dbh->selectrow_hashref($statement, [\%attr, [@bind_values,]], $cb->($hash_ref))

$sth = $dbh->prepare($statement, [$cb->($sth)])

$cb will be called each time this statement is prepared
(or re-prepared after the db connection is reconnected).

If the preparation does not succeed,
the $sth in $cb's args will be undef.

So you should NOT rely on this callback for work flow control.




$dbh->ping(sub {my $alive = shift;});

$sth = AnyEvent::MySQL::st->new($dbh, $statement, [$cb->($sth)])

$sth->execute(@bind_values, [\%attr,] [$cb->($fth/$rv)])

$fth = AnyEvent::MySQL::ft->new(\@data_set)

$rc = $fth->bind_columns(@list_of_refs_to_vars_to_bind, [$cb->($rc)])

$rc = $fth->bind_col($col_num, \$col_variable, [$cb->($rc)])

$rv = $fth->fetch([$cb->($rv)])

@row_ary = $fth->fetchrow_array([$cb->(@row_ary)])

$ary_ref = $fth->fetchrow_arrayref([$cb->($ary_ref)])

$hash_ref = $fth->fetchrow_hashref([$cb->($hash_ref)])

$ary_ref = $fth->fetchall_arrayref([$cb->($ary_ref)])

$hash_ref = $fth->fetchall_hashref([($key_field|\@key_field),] [$cb->($hash_ref)])

$ary_ref = $fth->fetchcol_arrayref([\%attr], [$cb->($ary_ref)])


Cindy Wang (CindyLinz)


Dmitriy Shamatrin justnoxx@github

clking clking@github


Copyright 2011-2015 Cindy Wang (CindyLinz).

This program is free software; you can redistribute it and/or modify it under the terms of either: the GNU General Public License as published by the Free Software Foundation; or the Artistic License.

See http://dev.perl.org/licenses/ for more information.