Why not adopt me?
NAME
Stream::Aggregate - generate aggregate information from a stream of data
SYNOPSIS
use Stream::Aggregate;
my $af = generate_aggregation_func(
$agg_config,
$extra_parameters,
$user_extra_parameters);
while ($log = ???) {
@stats = $af->($log);
}
@stats = $af->(undef);
DESCRIPTION
Stream::Aggregate is a general-purpose aggregation module that will aggregate from a stream of perl objects. While it was written specifically for log processing, it can be used for other things too.
Aggregation has two key elements: how you group things and what you aggregate. This module understands two different ways to group things: nested and cross-product.
Nested groupings come from processing sorted input: if you have three fields you are considering your context, the order in which the data is sorted must match the order in which these fields make up your context. If you want to count things by URL, then you must sort your input by URL.
Cross-product groupings come from processing unsorted input. Each combination of values of the fields that make up your context is another context. This can lead to memory exhaustion so you must specify the maximum number of values for each of the fields.
Nested groupings
Nested groups are most easily illustrated with a simple example: aggregating by year, month, and day. The input data must be sorted by year, month, and day. The current context is defined by the triplet: (year, month, day). That triplet must be returned by the context
code. It is stored in the @current_context
array. When a context is finished, it must be converted into a hash by context2columns
.
Doing it this way, you can, for example, get the average of some data item per day, per month, and per year in one pass though your data.
Cross-Product grouping
Cross Product grouping does not depend on the sort order of the input and can have many contexts active at the same time.
For example, if you're aggregating sales figures for shoes and want statistics for the combinations of size, width, and color there isn't a sort or nesting order that will answer your questions.
Use crossproduct
to limit yourself to a certain number of values for each variable (say 10 sizes, 3 widths, and 5 colors).
API
The configuration for Stream::Aggregate is compiled into a perl function which is then called once for each input object. Each time it is called, it may produce one or more aggregate objects as output. When there is no more input data, call the aggregation function with undef
as its input.
The generate-the-function routine, generate_aggregation_func
takes three parameters. The first is the configuration object (defined below). The configuration object is expected (but not required) to come from a YAML file. All examples are in YAML format. The second and third arguments provide extra information. Currently they are only used to get a description of what this aggregation is trying to do using the name
field. Eg:
generate_aggregation_func($agg_config, $extra, $user_extra);
my $code = qq{#line 1 "FAKE-all-code-for-$extra->{name}"\n};
The configuration object for Stream::Aggregate is expected to be read from a YAML file but it does not have to be created that way.
For some of the code fields (below), marked as Closure/Config, you can provide a closure instead of code. To do that, have a BEGIN
block assign a value (the closure) to the variable $coderef
. If you do this, code outside the BEGIN
block will only be compiled but will never be run. When evalutating the BEGIN block, the variable $agg_config
will be set to the value of key_config (assuming the field was key).
The behavior of generate_aggregation_func
in array context may change in the future to provide additional return values.
CONTEXT-RELATED CONFIGURATION PARAMETERS
As the aggregator runs over the input, it needs to know the boundries of the contexts so that it knows when to generate an aggregation result record.
For example, if you were aggretgating information about URL with nested groupings, you need to sort your input by URL and you need to define a context
that returns the URL (in YAML format, with $log
as your input variable):
context: |
return ($log->{url})
If you want to aggregate over both the URL and the web host, the context
must return an array: host & URL (in YAML format):
context: |
$log->{url} =~ m{(?:https?|ftp)://([^/:]+)}
my $host = $1;
return ($host, $log->{url})
When the context is has multiple levels like that, there will be a resultant aggregation record for each value at each level.
- context
-
Code, Optional. Given a
$log
entry, return an array that describes the aggregation context. For example, for a URL, this array might be: domain name; host name (if different from domain name); each component of the path of the URL except the final filename. As Aggregate runs, it will generate an aggregation record for each element of the array.This code will be invoked on every input record.
This is not required for cross-product aggregations but is required for nested aggregations.
- context2columns
-
Code, Optional. Given a context, in
@current_context
, return additional key/value pairs for the resulting aggregation record. This is how the context gets described in the aggregation results records.This code will be invoked to generate resultant values just before a context is closed.
If this code sets the variable
$suppress_result
, then this aggregation result will be discarded. - stringify_context
-
Code, Optional.
If the new context array returned by the
context
code (soon to become@current_context
) is not an array of strings but rather an array of references, it will be turned into strings using YAML. These strings may not matter because you control how the context manifests in the result records withcontext2columns
.If this isn't what you want, use
stringify_context
to do something different. Unlike most of the other functions,stringify_context
operates on@_
.This will be invoked for every input record.
- crossproduct
-
Hash, Name->Number, Optional.
For crossproduct groupings, this defines the dimensions. The keys are the variables. The values are the maximum number of values for each variable to track.
The keys must be
ephemeral0
,ephemeral
, orephemeral2
column names. - combinations
-
Hash, Name->Code, Optional.
If you have crossproduct groupings, do you also want to synthesize contexts that exclude some or all of the crossproduct dimensions?
For each dimension, provide code that that answers the question: if you remove this dimension from the crossproduct, should this new context be considered? This code can look at
$row
to see what other dimensions are active for this potential context. To keep this context, the code must evaluate to a true value.Combinations are evaluated in alphabetical order. If there is more than one path to a combination, only the first path will be considered. For example, if you have three crossproduct dimensions,
a
,b
andc
then the combinations are (in the order the'll be considered):b
andc
, excludinga
a
andc
, excludingb
a
andb
, excludingc
-
These have no dependency and will be produced if the combinations code returns a true value.
c
, excludinga
andb
-
This possibility will only be explored coming from
b
andc
. If the combination rule fora
rejected the combination, then thec
-only permuation will never be reached. b
, excludinga
andc
-
This possibility will only be explored coming from
b
andc
. If the combination rule fora
rejected the combination, then theb
-only permuation will never be reached. a
, excludingb
andc
-
This possibility will only be explored coming from
a
andc
. If the combination rule forb
rejected the combination, then thea
-only permuation will never be reached. - excluding
a
,b
andc
-
This possibility will only be explored coming from
c
. If the combination rule fora
orc
rejected the combination, then the empty permuation will never be reached.
Using combinations can greatly expand your memory requirements. More than four dimensions of combinations is probably a bad idea.
If you want skip a dimension entirely, leave it out of the combinations key rather than adding key that evaluates to false. A key the evaluates to false will still have to be tested many times whereas a missing key won't have any code generated for it.
- simplify
-
Hash, Name->Code, Optional.
When a cross-product key is exceeding its quota of values, the default replacement value is
*
. This hash allows you to override the code that chooses the new value. - finalize_result
-
Code, Optional, Closure/Config.
This code will be called after the resultant values for a context have been calculated. It is a last-chance to modify them or to suppress the results. The values can be found as a reference to a hash:
$row
. To suppress the results, set$suppress_results
. - new_context
-
Code, Optional, Closure/Config.
This code will be called each time there is a new context. At the time it is called,
$ps
is a reference to the new context, but@current_context
will not yet have been updated to the new value. - merge
-
Code, Optional, Closure/Config.
When using multiple levels of contexts, data is counted for the top-most context layer only. When that top-most layer finishes, the counts are merged into the next more-general layer.
During the merge there is both
$ps
and$oldps
available to for code to reference. The default merge handles all of the pre-defined column types. If you are using$ps->{heap}
storage for context data, you need to merge that data from$oldps
to$ps
yourself.
CONTROL FLOW CONFIGURATION
- filter
-
Code, Optional. Before any of the columns are calculated or any of the values saved, run this filter code. If it returns a true value then proceed as normal. If it returns a false value, then do not count any of the values. The filter code can remember things in
$ps-
{heap}> that might effect how other things are counted.In some situations, you many want to throw away most data and count things in the filter. When doing that, it may be that all of the columns come from
output
.This may be redesigned in a future release in a way that is not backwards compatible.
- filter_early
-
Boolean, Optional, default
false
. Check the filter early before figuring out contexts? If so, and the result is filtered, don't check to see if the context changed. - passthrough
-
Code, Optional. Add results to the output of the aggregation. A value of
$log
(assuming that's your input record variable) adds the input data stream to the output stream. The passthrough code is run before the input line is counted.
PARAMETERS CONFIGURATION
- max_stats_to_keep
-
Number, Optional, default: 4000.
When aggregating large amounts of data, limit memory use by throwing away some of the data. When data is thrown away, keep this number of samples for statistics functions that need bulk data like standard_deviation.
- reduce
-
Code, Optional, Closure/Config.
When
max_stats_to_keep
is exceeded, data will be thrown away. This function will be called when that has happened. - preprocess
-
Code, Optional. Code to preprocess the input
$log
objects. - item_name
-
String, Optional, default:
$log
. The default name for the input values to aggregate over is$log
. If this name is not appropriate, you can useitem_name
to change the variable name of the input values to something else. Include the dollar sign ($
) in the name. - debug
-
Boolean, Optional. Print out some debugging information, including the code that is generated for building the columns.
- strict
-
Boolean, Optional, default: false. Enforce strict and warnings for user code.
AGGREGATE OUTPUT COLUMNS CONFIGURATION
Each of these (except ephemeral
& keep
) defines additional columns of output that will be included in each aggregation record. These are all optional and all are defined as key/value pairs where the keys are column names and the values are perl code. You can refer to other columns using the variable $column_column_name
where column_name is the name of one of the other columns. When refering to other columns, the order in which columns are processed matters: ephemeral
and keep
are processed first and second respecively. Idential code fragments will be evaluated only once. Within a group, columns are evaluated alphabetically.
Some of the columns will have their code evaluated per-item and some are evaluated per-aggregation.
The input data is in $log
unless overriden by item_name
.
Per item callbacks
- ephemeral
-
These columns will not be included in the aggregation data. Refer to them as
$column_column_name
. If you are using ephemeral to declare the column but do not want to assign it a value, set the value for the ephemeral code to be undef. In YAML, thats~
:ephemeral: var1: ~ var2: ~
- ephemeral0
-
Same as
ephemeral
, will be evaluated beforeephemeral
. - ephemeral2
-
Same as
ephemeral
, will be evaluated afterephemeral
. - counter
-
Keep a counter. Add one if the code returns true.
- percentage
-
Keep a counter. Include the percentage of items for which the code returned true as an output column as opposed to the number of items where the code returned
0
. A return value ofundef
does not get counted at all. - sum
-
Keep an accumulator. Add the return values.
- mean
-
Keep an accumulator. Add the return values. Divide by the number of items before inserting into the results. Items whose value is
undef
do not count towards the number of items or the sum. - standard_deviation
-
Remeber the return values. Compute the standard deviation of the accumulated return values and insert that into the results. Items whose value is
undef
are removed before calculating the standard_deviation. - median
-
Remeber the return values. Compute the median of the accumulated return values and insert that into the results. Items whose value is
undef
are removed before calculating the median. - dominant
-
Remeber the return values. Compute the mode (most frequent) of the accumulated return values and insert that into the results. Items whose value is
undef
are removed before calculating the mode. - min
-
Keep a minimum numeric value. Replace it with the return value if the return value is less than the current value. Items whose value is
undef
are removed before calculating the min. - max
-
Keep a maximum numeric value. Replace it with the return value if the return value is greater than the current value. Items whose value is
undef
are removed before calculating the max. - minstr
-
Keep a minimum string value. Replace it with the return value if the return value is less than the current value. Items whose value is
undef
are removed before calculating the minstr. - maxstr
-
Keep a maximum string value. Replace it with the return value if the return value is greater than the current value. Items whose value is
undef
are removed before calculating the maxstr. - keep
-
Remember the return values. The return values are available at aggregation time as
@{$ps->{keep}{column_name}}
. Items whose value isundef
are kept but they're ignored by Stream::Aggregate::Stats functions.
Per aggregation result record callbacks
For code that is per-aggregation, the saved aggregation state can be found in $ps
. One item that is probably needed is $ps->{item_count}
.
- output
-
Extra columns to include in the output. This is where to save
$ps->{item_count}
. - stat
-
Use arbitrary perl code to compute statistics on remembered return values kept with
keep
. Write your own function or use any of the functions in Stream::Aggregate::Stats (the global variable is pre-loaded). No, there isn't any difference between this andoutput
.
VARIALBES AVAILABLE FOR CODE SNIPPETS TO USE
The following variables are available for the code that generates per-item and per-aggregation statistics:
- $log
-
The current input item (unless overridden by
item_name
) - $ps->{keep}{column_name}
-
An array of return values kept by
keep
. - $ps->{numeric}{column_name}
-
If Stream::Aggregate::Stats functions are called, they will grab the numeric values from
$ps->{keep}{column_name}
and store them in$ps->{numeric}{column_name}
. - $ps->{random}
-
For each kept item in
$ps->{keep}{column_name}
, there is a corrosponding item in $ps->{random} that is a random number. These random numbers are used to determine which values to keep and which values to toss if there are too many values to keep them all. - $ps->{$column_type}{column_name}
-
For each type of column (output, counter, percentage, sum, min, standard_deviation, median, stat) the values that will be part of the final aggregation record.
- $ps->{$tempoary_type}{column_name}
-
Some columns need temporary storage for their values: percentage_counter (the counter used by percentage); percentage_total (the number of total items); mean_sum (the sum used to compute the mean); mean_count (the number of items for the mean).
- $ps->{heap}
-
A hash that can be used by the configured perl code for whatever it wants.
- $ps->{item_counter}
-
The count of items.
- $agg_config
-
The configuration object for Stream::Aggregate
- $itemref
-
A reference to
$log
. It's always$itemref
even if$log
is something else. - @current_context
-
The current context as returned by
context
. - @context_strings
-
The string-ified version
@current_context
as returned bystringify_context
or YAML. - @contexts
-
The array of context objects.
$ps
is always$context[-1]
. - @items_seen
-
An array that counts the number of rows of output from this aggregation. When the context is multi-level, the counter is multi-level. For example, if the context is domain, host, and URL; then
$items_seen[0]
is the number of domains (so far), and$items_seen[1]
is the number of hosts for this domain (so far), and$items_seen[2]
is the number of URLs for this host (so far).Passthrough rows do not count.
XXX what about cross-product aggregations?
- $row
-
When gathering results, the variable that holds them is a reference to a hash:
$row
. - $suppress_result
-
After gathering results, the
$suppress_result
variable is examined. If it's set the results (in$row
) are discards.To skip results that aren't crossproduct results, in
finalize_result
, set$suppress_result
if$cross_count
isn't true. - $cross_count
-
The number of currently active crossproduct accumulator contexts.
- $extra, $user_extra
-
The additional paramerts (beyond
$agg_config
) that were passed togenerate_aggregation_func()
. - %persist
-
This hash is not used by Stream::Aggregate. It's available for any supplied code to use however it wants.
- $last_item
-
A refernece to the previous
$log
object. This is valid duringfinalize_result
andcontext2columns
.
There are more. Read the code.
HELPER FUNCTIONS
The following helper functions are available: everything in Stream::Aggregate::Stats and:
- nonblank($value)
-
Returns $value if $value is defined and not the empty string. Returns undef otherwise.
EXAMPLE1
This example will look at a set of records regarding health risks. Each record represents a person:
name<TAB>birthday<TAB>sex<TAB>number of hospital visits in the last year
We will generate the following aggregation records:
Average age of entire sample
Median number of hospital visits in the last year for the entire sample.
Median number of hospital visits in the last year by sex.
Median number of hospital visits in the last year by age, with a minimum sample size of five.
Median number of hospital vists in the last year by age and sex with a minimum sample size of five.
The code:
#!/usr/bin/perl
#
# Our input data is the raw strings from the input file. Most of the
# work is parsing them and reformatting the data.
#
use strict;
use warnings;
use Stream::Aggregate;
use YAML;
my $aconfig = Load(<<'END_ACONFIG');
strict: 1
debug: 0
item_name: $record
max_stats_to_keep: 500
filter_early: 1
filter: |
# ignore black lines and comments
return 0 if $record =~ /^#/;
return 0 if $record =~ /^$/;
return 1;
crossproduct:
sex: 3
age: 150
combinations:
sex: 1
age: 1
ephemeral0:
#
# We are using ephemeral0 to declare the column variables
#
name: ~
birthday: ~
gender: ~
number_of_visits: ~
ephemeral:
#
# We are using a fake column ($column_step1) in ephemeral to initialize
# the raw column variables we declared in ephemeral0
#
step1: |
chomp($record);
($column_name, $column_birthday, $column_gender, $column_number_of_visits) = split(/\t/, $record);
ephemeral2:
#
# We are using ephemeral2 to generate the computed input data
#
age: |
use Time::ParseDate qw(parsedate);
my $t = parsedate($column_birthday, NO_RELATIVE => 1, DATE_REQUIRED => 1, WHOLE => 1, GMT => 1);
return undef unless $t;
return int ((parsedate('2011-05-01', GMT => 1) - $t) / (365.24 * 86400))
sex: |
return 'M' if $column_gender =~ /^m/i;
return 'F' if $column_gender =~ /^f/i;
return undef;
hospital_visits: |
$column_number_of_visits =~ /^(\d+)$/;
$1
output:
sample_size: $ps->{item_counter}
median:
avg_hospital_visits: $column_hospital_visits
mean:
avg_age: $column_age
finalize_result: |
#
# Don't generate result records unless there are at
# least five items being aggregated.
#
$suppress_result = 1 if $ps->{item_counter} < 5;
END_ACONFIG
my $ag = generate_aggregation_func($aconfig, {
name => 'Aggregate Hospital Visits',
});
my @results;
while (<>) {
for my $result ($ag->($_)) {
# do something with the result records
}
}
for my $result ($ag->(undef)) {
# do something with the result records
}
EXAMPLE2
Our example will count the following things: number of unique URLs per domain, average length of the URL.
#!/usr/bin/perl
use strict;
use warnings;
use Stream::Aggregate;
use YAML;
my $aconfig = Load(<<'END_ACONFIG');
strict: 1
debug: 0
item_name: $item
context: $item->{domain}
context2columns: return (domain => $current_context[0])
ephemeral:
#
# The only persistent unstructured place to store data from
# one row to the next is $ps->{heap}. $ps->{heap}
# is per-context, but that's okay for our usage.
#
is_different: |
my $old = $ps->{heap}{last_item};
$ps->{heap}{last_item} = $item;
return 1 unless $old;
return 0 if $old->{url} eq $item->{url};
return 1;
sum:
unique_urls: $column_is_different
mean:
avg_url_length: length($item->{url})
finalize_result: |
# we don't want the roll-up context of all domains
$suppress_result = 1 unless $row->{domain};
END_ACONFIG
my $ag = generate_aggregation_func($aconfig, {
name => 'Aggregate URL data'
});
while(<>) {
# we'll parse the input here
my $item;
chomp;
next if /^$/;
next if /^#/;
die "'$_'" unless m{^\w+:\/\/([^/]+)(?:/.*)?};
$item = {
domain => $1,
url => $_,
};
for my $result ($ag->($item)) {
# do something with the result records
}
}
for my $result ($ag->(undef)) {
# do something with the results records
}
LICENSE
Copyright (C) 2008-2010 David Sharnoff; Copyright (C) 2011 Google, Inc.
This package may be used and redistributed under the terms of either the Artistic 2.0 or LGPL 2.1 license.