NAME

Beekeeper::Worker - Base class for creating services

VERSION

Version 0.03

SYNOPSIS

package MyApp::Worker;

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

sub on_startup {
    my $self = shift;
    
    $self->accept_notifications(
        'myapp.msg' => 'got_message',
    );
    
    $self->accept_remote_calls(
        'myapp.sum' => 'do_sum',
    );

    log_info 'Ready';
}

sub authorize_request {
    my ($self, $req) = @_;

    return BKPR_REQUEST_AUTHORIZED;
}

sub got_message {
    my ($self, $params) = @_;
    warn $params->{message};
}

sub do_sum {
    my ($self, $params) = @_;
    return $params->[0] + $params->[1];
}

DESCRIPTION

Base class for creating services.

CONSTRUCTOR

Beekeeper::Worker objects are created automatically by Beekeeper::WorkerPool after spawning new processes.

METHODS

on_startup

This method is executed on a fresh worker process immediately after it is spawned, once it has connected to the broker and initialized the logger.

It is a placeholder intended to be overridden in subclasses, whose implementations perform startup tasks and declare which remote calls and notifications will be handled.

This is the place to initialize, for example, persistent database or cache connections.

After this method returns, the worker waits for incoming events to handle.

on_shutdown

This method is executed just before a worker process is stopped.

It can be overridden as needed; the default implementation does nothing.
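The startup and shutdown hooks described above can be sketched as a pair. This is only an illustration, assuming Beekeeper is installed: the package name MyApp::CacheWorker, the 'myapp.lookup' method and the in-memory hash standing in for a real database or cache connection are all hypothetical.

```perl
package MyApp::CacheWorker;   # hypothetical worker class

use strict;
use warnings;

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

sub on_startup {
    my $self = shift;

    # Stand-in for a persistent connection: a real worker would open
    # its database or cache handle here, once per worker process
    $self->{cache} = { answer => 42 };

    $self->accept_remote_calls(
        'myapp.lookup' => 'lookup',
    );

    log_info 'Cache ready';
}

sub on_shutdown {
    my $self = shift;

    # Release the resource just before the process is stopped
    delete $self->{cache};
}

sub lookup {
    my ($self, $params) = @_;

    # Return the cached value for the requested key (undef if missing)
    return $self->{cache}{ $params->{key} };
}

# authorize_request is omitted for brevity; a real worker must
# override it, otherwise every request is denied

1;
```

Keeping the resource in the worker object means each spawned process owns its own copy, which matches the one-process-per-worker model described here.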

authorize_request( $req )

This method MUST be overridden in your worker classes, as the default behavior is to deny the execution of any request.

When a request is received this method is called before executing the corresponding callback, and it must return the exported constant BKPR_REQUEST_AUTHORIZED in order to authorize it. Returning any other value will result in the request being ignored.

This is the place to handle application authentication and authorization.
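As a rough sketch of a policy stricter than "authorize everything", the example below only authorizes requests for a whitelist of methods. The whitelist itself is hypothetical, and reading the method name from a 'method' key of the request object is an assumption based on the JSON-RPC message layout; check Beekeeper::JSONRPC for the actual interface.

```perl
package MyApp::Worker;

use strict;
use warnings;

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

# Hypothetical whitelist of methods this worker is willing to execute
my %ALLOWED = map { $_ => 1 } qw( myapp.sum myapp.msg );

sub authorize_request {
    my ($self, $req) = @_;

    # Assumption: the request exposes the called method name under
    # a 'method' key, mirroring the JSON-RPC 'method' field
    my $method = $req->{method};

    # Returning anything other than the constant denies the request
    return unless defined $method && $ALLOWED{$method};

    return BKPR_REQUEST_AUTHORIZED;
}

1;
```

A real application would more likely base the decision on the identity of the caller than on the method name alone.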

log_handler

By default, all workers use a Beekeeper::Logger logger, which logs errors and warnings both to files and to a topic on the message bus. The command line tool bkpr-log lets you inspect the logs from the message bus in real time.

To replace this default logging mechanism with another of your choice, override the log_handler method in your class and make it return an object implementing a log method.

For convenience, you can import the ':log' symbols to expose in your class the functions log_fatal, log_alert, log_critical, log_error, log_warn, log_warning, log_notice, log_info, log_debug and log_trace.

These call the underlying log method of your logger class if the severity is equal to or higher than $Beekeeper::Worker::LogLevel, which by default allows warnings. You can raise the log level to include debug information with the --debug option of bkpr, or from the class configuration in the file pool.config.json.

Using these functions makes it very easy to switch logging backends at a later date.

All warnings and errors generated by the execution of the worker code are logged, unless you specifically catch and ignore them (or raise the log level).
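A custom logging backend could look roughly like the sketch below. The class MyApp::StderrLogger is hypothetical, and the argument list of its log method (a severity level followed by the message parts) is an assumption; check Beekeeper::Logger for the arguments actually passed.

```perl
package MyApp::StderrLogger;   # hypothetical logger class

use strict;
use warnings;

sub new {
    my ($class, %args) = @_;
    return bless { %args }, $class;
}

# Assumption: invoked as $logger->log( $level, @message )
sub log {
    my ($self, $level, @message) = @_;
    print STDERR "[$level] ", join('', @message), "\n";
}

package MyApp::Worker;

use strict;
use warnings;

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

# Replace the default Beekeeper::Logger with the logger above
sub log_handler {
    my $self = shift;
    return MyApp::StderrLogger->new( worker_class => ref $self );
}

1;
```

Worker code keeps calling log_info, log_error and the other functions unchanged; only the object returned by log_handler differs.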

RPC call methods

In order to make RPC calls to other services, the methods send_notification, call_remote, call_remote_async, fire_remote and wait_async_calls are automatically imported from Beekeeper::Client.
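The sketch below shows how a worker callback might use two of these methods. It assumes that call_remote and send_notification take named 'method' and 'params' arguments and that the response object has a 'result' accessor; see Beekeeper::Client for the authoritative signatures. The 'myapp.average' method is hypothetical, while 'myapp.sum' and 'myapp.msg' are the ones from the synopsis.

```perl
package MyApp::Worker;

use strict;
use warnings;

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

sub on_startup {
    my $self = shift;

    $self->accept_remote_calls(
        'myapp.average' => 'do_average',   # hypothetical method
    );
}

sub do_average {
    my ($self, $params) = @_;

    # Assumption: named 'method' and 'params' arguments, and a
    # response object exposing the returned value as 'result'
    my $resp = $self->call_remote(
        method => 'myapp.sum',
        params => [ $params->[0], $params->[1] ],
    );

    # Assumption: send_notification takes the same named arguments
    $self->send_notification(
        method => 'myapp.msg',
        params => { message => 'average requested' },
    );

    return $resp->result / 2;
}

1;
```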

accept_notifications ( $method => $callback, ... )

Make this worker start accepting the specified notifications from the message bus.

$method is a string with the format "{service_class}.{method}". A default or fallback handler can be specified using a wildcard as "{service_class}.*".

$callback is a method name or a coderef that will be called when a notification is received. When executed, the callback receives two parameters: $params, which contains the notification data itself, and $req, which is a Beekeeper::JSONRPC::Notification object (usually redundant unless you need to inspect the MQTT properties of the request).

Notifications are not expected to return a value; any value returned from the callback is ignored.

The callback is executed within an eval block: if it dies, the error is logged and the worker continues running.

Example:

package MyWorker;

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

sub on_startup {
    my $self = shift;
    
    $self->accept_notifications(
        'foo.bar' => 'bar',       # call $self->bar       for notifications 'foo.bar'
        'foo.baz' =>  $coderef,   # call $coderef->()     for notifications 'foo.baz'
        'foo.*'   => 'fallback',  # call $self->fallback  for any other 'foo.*'
    );
}

sub bar {
    my ($self, $params, $req) = @_;

    # $self is a MyWorker object
    # $params is a ref to the notification data
    # $req is a Beekeeper::JSONRPC::Notification object

    log_warn "Got a notification foo.bar";
}

accept_remote_calls ( $method => $callback, ... )

Make this worker start accepting the specified RPC requests from the message bus.

$method is a string with the format "{service_class}.{method}". A default or fallback handler can be specified using a wildcard as "{service_class}.*".

$callback is a method name or a coderef that will be called when a request is received. When executed, the callback receives two parameters: $params, which contains the parameters of the request, and $req, which is a Beekeeper::JSONRPC::Request object (usually redundant unless you need to inspect the MQTT properties of the request).

The value or data ref returned by the callback will be sent back to the caller as response.

The callback is executed within an eval block: if it dies, the error is logged, the caller receives a generic error response, and the worker continues running.

Example:

package MyWorker;

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

sub on_startup {
    my $self = shift;
    
    $self->accept_remote_calls(
        'foo.inc' => 'increment',  # call $self->increment  for requests to 'foo.inc'
        'foo.baz' =>  $coderef,    # call $coderef->()      for requests to 'foo.baz'
        'foo.*'   => 'fallback',   # call $self->fallback   for any other 'foo.*'
    );
}

sub increment {
    my ($self, $params, $req) = @_;

    # $self is a MyWorker object
    # $params is a ref to the parameters of the request
    # $req is a Beekeeper::JSONRPC::Request object

    log_warn "Got a call to foo.inc";

    return $params->{number} + 1;
}

stop_accepting_notifications ( $method, ... )

Make this worker stop accepting the specified notifications from the message bus.

$method must be one of the strings used previously in accept_notifications.

stop_accepting_calls ( $method, ... )

Make this worker stop accepting the specified RPC requests from the message bus.

$method must be one of the strings used previously in accept_remote_calls.
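As an illustration of the two stop methods above, the sketch below lets a remote call switch off a notification subscription at runtime. The package name and the 'myapp.pause' method are hypothetical, and it is an assumption that stop_accepting_notifications may be called from within a callback.

```perl
package MyApp::PausableWorker;   # hypothetical worker class

use strict;
use warnings;

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

sub on_startup {
    my $self = shift;

    $self->accept_notifications(
        'myapp.msg' => 'got_message',
    );

    $self->accept_remote_calls(
        'myapp.pause' => 'pause',   # hypothetical control method
    );
}

sub got_message {
    my ($self, $params) = @_;
    warn $params->{message};
}

sub pause {
    my ($self, $params) = @_;

    # Pass exactly the same string that was given to accept_notifications
    $self->stop_accepting_notifications('myapp.msg');

    return 1;
}

1;
```

stop_accepting_calls would be used the same way for a method previously declared with accept_remote_calls.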

stop_working

Make this worker stop accepting new RPC requests, process all requests already received, execute the on_shutdown method, and then exit.

This is the default signal handler for the TERM signal.

Please note that it is not possible to stop worker pools by calling this method, as WorkerPool will immediately respawn another worker after the current one exits.
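Because the pool respawns workers that exit, one possible use of this method is recycling a worker process after it has served a number of requests. The sketch below is an illustration only: the package name, the $MAX_REQUESTS limit and the 'handled' counter are hypothetical, and it is an assumption that stop_working may be called from within a request callback.

```perl
package MyApp::RecyclingWorker;   # hypothetical worker class

use strict;
use warnings;

use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';

# Hypothetical limit of requests served by a single process
our $MAX_REQUESTS = 1000;

sub on_startup {
    my $self = shift;

    $self->accept_remote_calls(
        'myapp.sum' => 'do_sum',
    );
}

sub do_sum {
    my ($self, $params) = @_;

    my $sum = $params->[0] + $params->[1];

    # After enough requests, ask this worker to finish its pending
    # work and exit; the pool is expected to spawn a replacement
    if (++$self->{handled} >= $MAX_REQUESTS) {
        $self->stop_working;
    }

    return $sum;
}

1;
```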

SEE ALSO

Beekeeper::Client, Beekeeper::Logger, Beekeeper::WorkerPool.

AUTHOR

José Micó, jose.mico@gmail.com

COPYRIGHT AND LICENSE

Copyright 2015-2021 José Micó.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language itself.

This software is distributed in the hope that it will be useful, but it is provided “as is” and without any express or implied warranties. For details, see the full text of the license in the file LICENSE.