NAME

Async::Simple::Pool - Simple manager of asynchronous tasks

SYNOPSIS

    Simplest way:

        use Async::Simple::Pool;
        use Data::Dumper;
        use feature 'say'; # needed for "say" below

        my $task = sub{
            my $data = shift;

            return $data->{i} * 10;
        };

        my $data = [ { i => 1 }, { i => 2 }, { i => 3 } ];

        my $pool = Async::Simple::Pool->new( $task, $data );

        my $result = $pool->process;

        say Dumper $result;

        $VAR1 = [
                  10,
                  20,
                  30
                ];


    Next step ( $pool->new() with various param sets ):

        A pool can be initialized in several ways.

        If you pass $data to $pool->new(), all processes are started immediately.
        You can then call $pool->process after "new" to collect the results.


        # A few ways to create a pool object:

            my $pool = Async::Simple::Pool->new( $task, $data );

            my $pool = Async::Simple::Pool->new( %pool_params );               # Creates a new pool. By default "task" is the only required param.

            my $pool = Async::Simple::Pool->new( $task, %pool_params );        # $task is required, all pool_params are optional

            my $pool = Async::Simple::Pool->new( $data, %pool_params );        # $data is required, all pool_params except "task" are optional

            my $pool = Async::Simple::Pool->new( $task, $data, %pool_params ); # $task and $data are required, all pool_params are optional


        By default $task is required and must be a CodeRef.
        For example: $task = sub { my $task_X_data = shift; some useful code; return $task_X_result };

        $data can be an ArrayRef of your task params. $data = [ $task_data1, $task_data2, ... ];

        $data can also be a HashRef of your task params. $data = { task_id1 => $task_data1, task_id2 => $task_data2, ... };
        In this case you can use any scalars as keys of this hash. They will be mirrored in the result.

        "$pool->new()" creates "$pool->tasks_count" objects of class "$pool->task_class".
        By default task_class is "Async::Simple::Task::Fork".
        In this case "$pool->tasks_count" processes will be preforked (10 by default).
        Each of them then waits for data, which the pool provides later.

        $pool->process is the main dispatcher of the pool.
        Its behavior depends on %pool_params.
        If you pass $data to $pool->process, this data will be added for execution.
        $results = $pool->process( $data );

        The type of $results depends on the pool params that you pass to $pool->new( %pool_params );
        By default the result is an arrayref.

    Yet another step, %pool_params:

        data          - ArrayRef/HashRef. Data for the tasks, as described above.

        tasks_count   - Integer number of workers. 10 by default.

        flush_data    - 1 to remove results from the pool once they have been read by $pool->process(), 0 to keep them.

        result_type   - list (list of ready results) / full_list (list of all results) / hash (hash of ready results)

        break_on      - busy (when all workers are busy) / run (all data is executing) / done (all results are ready)

        task_class    - see explanation below. For example 'Your::Task::Class';

        task_params   - Any params you wish to pass to each task object via $task->new( %$here ).


    Last step: your own task class:

        You can write your own task class. This class MUST contain at least the following code:

        package Your::Task::Class;

        use parent 'Async::Simple::Task';

        # Trying to read result.
        # If result found, call $self->result( $result );
        # If result is not ready, do nothing
        sub get {
            my $self = shift;

            return unless you have result;

            # result can be undef; Don't worry, all will be ok!
            $self->result( $result );
        };

        # Just push data to execution in your way
        sub put {
            my ( $self, $data ) = @_;

            $self->clear_answer; # Optional, if you plan to use your package independently of this pool.

            # Pass your data to your processor here
        }

        1;

DESCRIPTION

    Lets you work with a pool of asynchronous processes.

    There are many similar packages on CPAN: Async::Queue, AnyEvent::FIFO, Task::Queue, Proc::Simple.

    What sets this package apart is its convenience and simplicity of use.

METHODS

    $pool->new

    $pool->process

SUPPORT AND DOCUMENTATION

    After installing, you can find documentation for this module with the
    perldoc command.

    perldoc Async::Simple::Task

    You can also look for information at:

        RT, CPAN's request tracker (report bugs here)
            http://rt.cpan.org/NoAuth/Bugs.html?Dist=Async-Simple-Task

        AnnoCPAN, Annotated CPAN documentation
            http://annocpan.org/dist/Async-Simple-Task

        CPAN Ratings
            http://cpanratings.perl.org/d/Async-Simple-Task

        Search CPAN
            http://search.cpan.org/dist/Async-Simple-Task/

AUTHOR

    ANTONC <antonc@cpan.org>

LICENSE

    This program is free software; you can redistribute it and/or modify it
    under the terms of the Artistic License (2.0). You may obtain a
    copy of the full license at:

    http://www.perlfoundation.org/artistic_license_2_0

data

    You can pass a hashref or an arrayref as data.

    When it is an arrayref, each item is passed to a task as its params.
    Ids for the internal format are generated automatically, counting up from 0.

    When it is a hashref, each value of the hash is passed to a task as its params.
    Ids for the internal format are the same as the keys of your hash.

    In both cases the data is converted to the internal format = { id1 => { source => paramref1, result => if_processed1 }, id2 => { source => paramref2, result => if_processed2 }, ... };
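
    The conversion described above can be sketched in plain Perl. This is only
    an illustration of the documented behaviour, not the module's actual code:
    the helper name conv_data_to_internal_sketch is hypothetical, and the
    "result" key is assumed to be absent until a task has been processed.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Async::Simple::Pool): mimics the
# documented conversion of source data to the internal format.
sub conv_data_to_internal_sketch {
    my ($data) = @_;

    my %internal;

    if ( ref $data eq 'ARRAY' ) {
        # Array items get sequential ids, counting up from 0
        for my $id ( 0 .. $#$data ) {
            $internal{$id} = { source => $data->[$id] };
        }
    }
    elsif ( ref $data eq 'HASH' ) {
        # Hash keys are kept as ids
        for my $id ( keys %$data ) {
            $internal{$id} = { source => $data->{$id} };
        }
    }
    else {
        die 'data must be an ArrayRef or a HashRef';
    }

    return \%internal;
}

my $from_array = conv_data_to_internal_sketch( [ { i => 1 }, { i => 2 } ] );
my $from_hash  = conv_data_to_internal_sketch( { first => { i => 1 } } );
```

    Here $from_array has the ids 0 and 1, while $from_hash keeps the key
    "first" as its only id.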

tasks_count

    tasks_count - the number of tasks that will be created (default is 10).

flush_data

    flush_data - remove used data and results after they have been read in $self->process;

result_type

    result_type = [hash|list|fulllist]

    when 'list'     - returns all results as a list, without placing them in the order of the source data

    when 'fulllist' - returns all results as a list that fully follows the order and positions of the source data

    when 'hash'     - returns a hash, where the key is the position of the corresponding source data item and the value is its result
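
    The difference between the three result types can be illustrated with a
    small self-contained sketch. It does not call the module: %internal is a
    hypothetical stand-in for the pool's internal state, and it assumes that
    an unfinished item shows up as undef in the 'fulllist' form.

```perl
use strict;
use warnings;

# Hypothetical internal state: ids 0..2, where id 1 is not processed yet
my %internal = (
    0 => { source => { i => 1 }, result => 10 },
    1 => { source => { i => 2 } },
    2 => { source => { i => 3 }, result => 30 },
);

my @ids   = sort { $a <=> $b } keys %internal;
my @ready = grep { exists $internal{$_}{result} } @ids;

# 'list': only the ready results, source positions are not preserved
my @list = map { $internal{$_}{result} } @ready;

# 'fulllist': one slot per source item, undef where no result is ready
my @fulllist = map { $internal{$_}{result} } @ids;

# 'hash': source position => result, for the ready results
my %hash = map { $_ => $internal{$_}{result} } @ready;
```

    With this state @list holds two items, @fulllist holds three with an
    undef in the middle, and %hash has the keys 0 and 2.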

break_on

    The condition under which $self->process stops waiting for results and returns, so that you can do something else before the next check.

    'busy' = $self->process returns as soon as all the tasks are loaded with data, without any checks

    'run'  = $self->process returns right after the last task has been started

    'done' = $self->process waits until all the tasks have finished their work

    Default is 'done'

task_class

    Task object class name

    Default is 'Async::Simple::Task::Fork'

task_params

    Task init params.

    You can also pass all these params directly to the pool constructor.

    In this case the pool separates the task params from its own params automatically.

new( some various params )

    my $pool    = Async::Simple::Pool->new( $task );                   # Processing must be started later.

    my $pool    = Async::Simple::Pool->new( $task, \@data );           # Processing is started inside new.

    my $pool    = Async::Simple::Pool->new( \@data, task => $task );   # Processing is started inside new.

    my $results = Async::Simple::Pool->new( $task, \@data )->results;  # Just do everything and give me my results!

    my $pool = Async::Simple::Pool->new( task => $task );              # Minimal init with a hash of params, all defaults; processing must be started manually later


    # Full list of params for the default task type (Async::Simple::Task::Fork) with default values.

    my $pp = Async::Simple::Pool->new(
        tasks_count => 10,
        break_on    => 'done', # [ 'busy', 'run', 'done' ]
        data        => \@data,
        task_class  => 'Async::Simple::Task::Fork',
        task_params => { # Can be placed into pool params directly
            task    => $task,
            timeout => 0.01,
        },
    );

    It is a good idea to call new() before gathering a large amount of data,
    so that the workers are forked first, and to call $pool->process separately:

    my $pool = Async::Simple::Pool->new( $task );

    # ... collect all your data here, after forking ...

    my $results = $pool->process( \@your_data );

BUILD

    Internal.
    Overridden initializer that sorts out the constructor params.

process

    Main dispatcher of child tasks

    - writes data to tasks

    - checks for results


    The pool does not handle internal failures, deaths, or hangs of your tasks.

    If your task can fail in such a way, please write a workaround for that case inside your "sub".
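
    One possible workaround is to wrap the task in a guard that catches
    exceptions and enforces a time limit. The sketch below is an assumption,
    not part of the module: guarded_task is a hypothetical helper, the
    { ok => ... } / { error => ... } return convention is invented for this
    example, and "alarm" works with whole seconds and is only reliable on
    Unix-like systems.

```perl
use strict;
use warnings;

# Hypothetical helper: returns a coderef that never dies and gives up
# after $timeout seconds.
sub guarded_task {
    my ( $task, $timeout ) = @_;

    return sub {
        my $data = shift;

        my $result = eval {
            local $SIG{ALRM} = sub { die "timeout\n" };
            alarm $timeout;
            my $r = $task->($data);
            alarm 0;
            $r;
        };
        my $error = $@;
        alarm 0; # make sure no alarm is left pending after a failure

        return $error ? { error => $error } : { ok => $result };
    };
}

my $task = guarded_task(
    sub {
        my $data = shift;
        die "bad input\n" unless defined $data->{i};
        return $data->{i} * 10;
    },
    5,
);

my $good = $task->( { i => 2 } ); # the task returns normally
my $bad  = $task->( {} );         # the task dies, the guard catches it
```

    The wrapped coderef could then be passed as $task to
    Async::Simple::Pool->new in place of the raw one.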

    Will be called inside new() in case you pass data there.

results

    Internal.
    Returns all the results gathered so far.
    By default returns a hash whose keys are the indexes in the source data list
    and whose values are the results for the data at those indexes.

make_tasks

    Internal.
    All tasks are created here.
    Called from constructor.

read_tasks

    Internal.
    Reads busy tasks.

write_tasks

    Internal.
    Writes to free tasks.

_conv_data_to_internal

    Internal.
    Converts source data ( hashref or arrayref ) to internal representation ( hashref ).