NAME
TheSchwartz::JobScheduler - Lightweight TheSchwartz job dispatcher
VERSION
version 0.001
SYNOPSIS
use TheSchwartz::JobScheduler;
my @databases = (
{ id => 'db_1', prefix => 'theschwartz_schema.', },
{ id => 'db_2', prefix => 'theschwartz_schema.', },
);
use Database::ManagedHandle;
sub get_dbh {
my ($db_id) = @_;
my $mh1 = Database::ManagedHandle->instance;
return $mh1->dbh( $db_id );
}
my $client = TheSchwartz::JobScheduler->new(
databases => \@databases,
);
my $job_id = $client->insert('funcname', 'arg');
my $job1 = TheSchwartz::JobScheduler::Job->new;
$job1->funcname("WorkerName");
$job1->arg({ foo => "bar" });
$job1->uniqkey("uniqkey");
$job1->run_after( time + 60 );
$client->insert($job1);
my $job2 = TheSchwartz::JobScheduler::Job->new(
funcname => 'WorkerName',
arg => { foo => 'baz' },
);
$client->insert($job2);
my @jobs = $client->list_jobs({ funcname => 'funcname' });
for my $job (@jobs) {
print $job->jobid;
}
DESCRIPTION
TheSchwartz::JobScheduler is an interface for inserting new jobs into TheSchwartz job queue (which is maintained in a database).
The rationale behind this module is its use in a long-running web service, for instance, in Dancer2. Because database connections cannot be relied on to stay open indefinitely, we get a new database handle for each operation.
This module was created solely for injecting new jobs from web servers without loading the additional TheSchwartz and Data::ObjectDriver modules onto your system. Your TheSchwartz job worker processes still need to be implemented using the full-featured TheSchwartz::Worker module.
Configuration: Databases and Their Handles
TheSchwartz can use several different databases simultaneously, for instance, to share load and to distribute jobs safely to only those workers that may, in turn, require restricted access. This makes TheSchwartz very decentralized.
If your setup is reasonably simple, for instance, a webapp (e.g. Dancer2) with TheSchwartz as a worker system executing long-running tasks that would otherwise disrupt the webapp, then perhaps you use only one database. In that case, you can consider using the same database handle in both the webapp and TheSchwartz. If you use database transactions to ensure an atomic commit, you can involve TheSchwartz::JobScheduler in the same transaction. If your transaction fails after the worker task is inserted, the worker task is also cancelled (rolled back).
If, however, your TheSchwartz system is complex or otherwise separate from the systems which create the tasks, or you simply use more than one database in TheSchwartz, you cannot share your other database handles with TheSchwartz::JobScheduler. The scheduler might need to access all databases in sequence to place the task in the right one. Besides this, TheSchwartz::JobScheduler is prepared for the possibility of one or more databases being off-line. It loops through all the databases until it gets a working database handle.
Database handles are provided by the calling program. This allows the caller to use any available system to provide the handles. If TheSchwartz::JobScheduler receives undef instead of a database handle, it tries the next database. If there are no working database handles, it croaks.
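The fallback behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the module's actual code: the helper name first_working_dbh is hypothetical, and visiting the databases in sorted id order and treating a dying callback like an undef result are both assumptions.

```perl
use strict;
use warnings;
use Carp qw(croak);

# Hypothetical helper: return the id and handle of the first database whose
# callback yields a defined handle. $databases maps a database id to a hash
# holding a dbh_callback code reference.
sub first_working_dbh {
    my ($databases) = @_;
    for my $id ( sort keys %{$databases} ) {    # iteration order is an assumption
        # Assumption: a callback that dies is treated like one returning undef.
        my $dbh = eval { $databases->{$id}{dbh_callback}->($id) };
        return ( $id, $dbh ) if defined $dbh;
    }
    croak 'No working database handle available';
}
```

A caller would receive the id together with the handle, so that it can look up the matching prefix for that database.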
Database configuration does not need database addresses, DSNs, or usernames and passwords. Because TheSchwartz::JobScheduler gets the database handle from outside, it only needs to know a database id to distinguish between databases, and possibly a prefix for each database. The prefix is prepended to every database table and sequence name. If your database uses a different schema than the default one for TheSchwartz tables, use prefix to solve this.
use DBI;
my %dbs = (
db_1 => [ 'dbi:SQLite:...', undef, undef, {} ],
db_2 => [ 'dbi:SQLite:...', undef, undef, {} ],
);
sub get_dbh {
my ($id) = @_;
my @connection_info = @{ $dbs{ $id } };
return DBI->connect( @connection_info );
}
my %databases = (
db_1 => { prefix => 'theschwartz_schema.', dbh_callback => \&get_dbh, },
db_2 => { prefix => 'another_schema.', dbh_callback => \&get_dbh, },
);
use TheSchwartz::JobScheduler;
my $scheduler = TheSchwartz::JobScheduler->new(
databases => \%databases,
);
In the following example the calling program is using Database::ManagedHandle, a module which makes certain that a database handle is always usable.
# First create a Database::ManagedHandle config class
# See Database::ManagedHandle for instructions
# Then just use it:
my %databases = (
db_1 => {
prefix => 'theschwartz_schema.',
dbh_callback => 'Database::ManagedHandle->instance',
},
db_2 => {
prefix => 'another_schema.',
dbh_callback => 'Database::ManagedHandle->instance',
},
);
use TheSchwartz::JobScheduler;
my $scheduler = TheSchwartz::JobScheduler->new(
databases => \%databases,
);
DBH Callback
The item dbh_callback can be either a CODE reference, i.e. a subroutine, or a string which, when executed with eval, produces an object. This object must have at least one method: dbh(). This method, when called, must return either a DBI::db object (such as one created by DBI->connect) or undef.
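The two accepted forms of dbh_callback could be handled along these lines. This is only a sketch under stated assumptions, not the module's own implementation: the helper name dbh_from_callback is hypothetical, the CODE reference is assumed to be called with the database id and to return the handle directly (as get_dbh does in the configuration example), and dbh() is assumed to receive the database id (as in the SYNOPSIS).

```perl
use strict;
use warnings;
use Carp qw(croak);
use Scalar::Util qw(blessed);

# Hypothetical helper illustrating the two forms of dbh_callback.
sub dbh_from_callback {
    my ( $callback, $db_id ) = @_;

    # Form 1: a CODE reference, called with the database id (assumption).
    return $callback->($db_id) if ref $callback eq 'CODE';

    # Form 2: a string which, when evaluated, produces an object with dbh().
    my $provider = eval $callback;    ## no critic (ProhibitStringyEval)
    croak "dbh_callback '$callback' did not produce an object with dbh()"
        unless blessed($provider) && $provider->can('dbh');

    # May return a DBI::db object or undef.
    return $provider->dbh($db_id);
}
```

Because the string form goes through a string eval, it should only ever come from trusted configuration, never from user input.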
Uniqkey
The uniqkey field is an arbitrary string identifier used to prevent applications from posting duplicate jobs. At most one job with the same uniqkey value can be posted to a single TheSchwartz database.
There are, however, valid situations in which inserting the same job and uniqkey again would make sense. For instance, several independent actions, one after another, could each require the same job to run.
Note that the job arguments do not enter into the uniqueness consideration, only the job name and the unique key (funcid and uniqkey fields).
Depending on the database and whether uniqueness is protected with database constraints, such as primary keys, trying to insert another job with the same uniqkey can cause an error, overwrite the previous row with new content and new arguments, or create another row.
The user can choose how to deal with this situation. When instantiating TheSchwartz::JobScheduler, the user can define the additional option handle_uniqkey with any of the following values:
- no_check
This option does not do any checking on the condition. If the database is configured to not allow an insert operation, it will throw an exception. User must be prepared for this, for instance, by enclosing the operation in eval. This is the default setting.
- overwrite
Update the fields arg, insert_time, run_after, grabbed_until, priority and coalesce, and return the existing entry's jobid. This setting will create a slight overhead. Not yet implemented.
- acknowledge
If there is already a matching entry (funcid and uniqkey fields), no change will be made. The jobid of the existing entry will be returned. This setting will create a slight overhead.
N.B. This option is used only when TheSchwartz::JobScheduler::Job has set the field uniqkey. If you don't use uniqkey, this problem will never arise.
N.B.2. Using either overwrite or acknowledge is recommended. Only in situations which require extreme throughput should you consider other alternatives for this problem.
# Depending on the database table settings,
# this will either throw an exception or
# it will pass and result with invalid table data.
my $scheduler = TheSchwartz::JobScheduler->new(
databases => \@databases,
opts => {
handle_uniqkey => 'no_check',
},
);
my $job = TheSchwartz::JobScheduler::Job->new(
funcname => 'Test::uniqkey',
arg => { an_item => 'value A' },
uniqkey => 'UNIQUE_STR_A',
);
$scheduler->insert( $job );
$job = TheSchwartz::JobScheduler::Job->new(
funcname => 'Test::uniqkey',
arg => { an_item => 'value B' },
uniqkey => 'UNIQUE_STR_A',
);
$scheduler->insert( $job );
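To make the difference between no_check and acknowledge concrete, here is a small in-memory model of a job table with a unique constraint. It is purely illustrative and does not use the module: the function insert_job and the table structure are hypothetical, the model keys on funcname rather than funcid, and it assumes a database that rejects duplicates with an error.

```perl
use strict;
use warnings;

# In-memory stand-in for a job table with a unique constraint on
# (funcname, uniqkey). Any policy other than 'acknowledge' is treated
# like 'no_check' in this sketch.
sub insert_job {
    my ( $table, $policy, $job ) = @_;
    my $key = join "\0", $job->{funcname}, $job->{uniqkey};

    if ( exists $table->{rows}{$key} ) {
        # acknowledge: leave the row untouched and return its jobid.
        return $table->{rows}{$key}{jobid} if $policy eq 'acknowledge';
        # no_check: the constraint violation surfaces as an exception.
        die "duplicate key: $job->{funcname}/$job->{uniqkey}\n";
    }
    my $jobid = ++$table->{last_jobid};
    $table->{rows}{$key} = { jobid => $jobid, arg => $job->{arg} };
    return $jobid;
}

my %table = ( rows => {}, last_jobid => 0 );
my %job_a = ( funcname => 'Test::uniqkey', uniqkey => 'UNIQUE_STR_A', arg => { an_item => 'value A' } );
my %job_b = ( %job_a, arg => { an_item => 'value B' } );

# acknowledge: the second insert returns the first job's id, arg is unchanged.
my $first  = insert_job( \%table, 'acknowledge', \%job_a );
my $second = insert_job( \%table, 'acknowledge', \%job_b );

# no_check: the caller guards the insert with eval and inspects the error.
my $third = eval { insert_job( \%table, 'no_check', \%job_b ) };
my $error = defined $third ? undef : $@;
```

With the real module the guard would look the same: wrap the insert call in eval and check the result when handle_uniqkey is no_check.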
Logging
TheSchwartz::JobScheduler uses the excellent Log::Any to produce logging messages.
The easiest way to get the logging messages printed is to add the following line in the preamble of your program:
use Log::Any::Adapter ('Stdout', log_level => 'debug' );
Alternatively, you can do this on the command line:
perl '-MLog::Any::Adapter(Stdout, log_level=>trace)'
databases
The databases used by TheSchwartz.
Please see above "Configuration: Databases and Their Handles".
opts
Additional options for controlling other features, including uniqkey.
Please see above "Uniqkey".
Example:
my $scheduler = TheSchwartz::JobScheduler->new(
databases => \@databases,
opts => {
handle_uniqkey => 'no_check',
},
);
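insert
Insert a new job into the queue. As shown in the SYNOPSIS, it accepts either a function name and an argument, or a TheSchwartz::JobScheduler::Job object, and returns the id of the job.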
funcname_to_id
Fetch the function id from the database. If the function does not exist yet, insert it first.
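The fetch-or-insert logic could look roughly like the sketch below. It is not the module's actual code: the helper name get_or_create_funcid is hypothetical, and it assumes the standard TheSchwartz funcmap table with the columns funcid and funcname, with the configured prefix prepended to the table name. Only the DBI methods selectrow_array and do are used.

```perl
use strict;
use warnings;

# Hypothetical helper: look up the id of a function name in the funcmap
# table and create the row if it is missing. $dbh is any DBI database
# handle; $prefix is the per-database table prefix (may be an empty string).
sub get_or_create_funcid {
    my ( $dbh, $prefix, $funcname ) = @_;
    my $table  = $prefix . 'funcmap';
    my $select = "SELECT funcid FROM $table WHERE funcname = ?";

    my ($funcid) = $dbh->selectrow_array( $select, undef, $funcname );
    return $funcid if defined $funcid;

    # Not found: insert the name, then read back the generated id.
    $dbh->do( "INSERT INTO $table (funcname) VALUES (?)", undef, $funcname );
    ($funcid) = $dbh->selectrow_array( $select, undef, $funcname );
    return $funcid;
}
```

Reading the id back with a second SELECT avoids relying on driver-specific behaviour for generated keys. Two processes inserting the same name at the same moment can still race, so a caller may want to wrap the call in eval and retry.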
list_jobs
Return a list of active jobs collected from all accessible databases.
Parameters: A hash containing named parameters.
my @jobs = $client->list_jobs({ funcname => 'fetch_webpage'});
THANKS
This module is very much inspired by TheSchwartz::Simple.
SEE ALSO
AUTHOR
Mikko Koivunalho <mikkoi@cpan.org>
COPYRIGHT AND LICENSE
This software is copyright (c) 2023 by Mikko Koivunalho.
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.