NAME

TaskPipe::Manual::Overview - An overview of the TaskPipe framework

WHAT IS TASKPIPE?

TaskPipe is a framework for managing scraping projects. TaskPipe aims to simplify the transfer of a web database, which may be spread across many interlinked web pages, directly into a local, cross-referenced SQL database.

WHY TASKPIPE?

Suppose your friend Bob wants a copy of the simple online database at example.com, which has just a "list" page, where each item in the list links to a "detail" page.

So to get that database you:

  • create a module called Example::Com::List that grabs the list page

  • create a module called Example::Com::Detail that grabs... guess what? the detail page!

Great. Then your friend Bill wants a copy of a different online database at example.net, which is similarly structured. So you write:

  • Example::Net::List

  • Example::Net::Detail

and sigh in exasperation as you feel a strange sense of deja vu.

The problem is, although the jobs are structurally similar, each is coded quite specifically. Example::Com::List looks specifically for the module Example::Com::Detail to send links to.

Wouldn't it be better to have a module which scrapes a general list page and feeds that to a general detail page? Then the specifics could be coded on top in each case.

So now we are on the way to building TaskPipe.

Indeed, this is basically how the project started. I had built many scrapers, and found myself wondering whether there wasn't a more flexible and modular approach that avoided reinventing the wheel each time. Does TaskPipe achieve this? It's early days, but I think it looks promising.

WHAT IS TASKPIPE SUITABLE FOR?

Right now it's probably best for feeding a (scraped) web database into SQL. In fact the core of TaskPipe is really a task execution system (hence the name) and could potentially be used in applications other than web data gathering - however it is probably going to work best with:

  • Multi-threaded processes where the threads do not rejoin, i.e. the parent kicks off children which perform their task(s) and then terminate without ever returning to the caller.

  • Workflows where the execution time of individual tasks is relatively long (on the order of milliseconds up to seconds, or longer). TaskPipe is built with Moose and DBIx::Class, both of which involve a performance penalty.

Additionally, a number of web-related tools are built around the core functionality (e.g. the TOR and PhantomJS integration) on the presumption that this is how it will be used. If you wanted to use it for something else, you'd need to write your own tasks.

Finally, TaskPipe may also be good for building crawlers. I have not yet tried to do this, and I imagine some code adjustments may be needed before it is really feasible. So perhaps not with this version, but coming soon (perhaps!).

Features

The core of TaskPipe has been expanded with web data gathering in mind, so that it offers the following features:

  • Caching to minimise the need to repeat requests, and allow the process to resume quickly in the case of premature termination

  • Thread management designed to ensure a consistent number of threads remain active regardless of the workflow. E.g. if a 10 thread limit is specified and 4 threads are required on a given task, then the remaining 6 threads automatically cascade to the next or adjacent tasks.

    Quite a bit of work has been done on improving the resume and threading functionality in version 0.06, together with combating (or even eliminating?) memory leaks in long-running TaskPipe processes. Hopefully version 0.06 does these things considerably better than earlier versions.

  • Choice of non-rendering or rendering useragent: Currently these are Perl's native LWP (non-rendering) or PhantomJS (rendering). Switching between useragents requires only a one-line change in the project config (see the illustrative sketch at the end of this section).

  • Choice of proxy net system: Currently these are TOR and Open Proxy. The TOR UserAgent Manager automatically launches TOR instances on separate ports for each designated thread. The Open Proxy system runs as a daemon, gathering IPs from open proxy lists and testing them. Switching between proxy systems, or between proxying and non-proxying, requires only a one-line change in the project config.

    (As of version 0.06 the TOR functionality has been worked on considerably. The open proxy system remains much more of a work in progress.)

  • A command line tool (TaskPipe Tool) is intended to enable quick deployment of files and database tables, simplifying the creation of fresh projects.

So TaskPipe is a framework, and as such is intended to be extensible. Custom tasks, IP lists and even TaskPipe Tool commands can (potentially) be added. (But maybe I need to create more documentation first...)
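
To give a concrete feel for the kind of "one line change" mentioned above, here is a rough sketch of a project config fragment. The key names and values are purely hypothetical illustrations, not TaskPipe's actual settings; consult the config files generated for your project for the real names:

# project config (illustrative sketch only - key names are hypothetical)
ua_mgr: PhantomJS       # rendering useragent; or e.g. LWP for non-rendering
proxy_mgr: TOR          # or an open proxy manager, or no proxying at all
max_threads: 10         # upper limit on concurrent threads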

HOW READY IS TASKPIPE?

Version 0.06 is looking considerably more robust than version 0.05. I think it can now be used to build practical applications. But it is rough around the edges, and still needs a lot of work.

Some notes specifically related to improvements made in version 0.06:

Caching and Resume

At first I built a caching system which, in its original form, was definitely too complex. I found in practical situations - particularly when using a fully developed project in production - that caching was more of a nuisance than anything else. In this situation I really only wanted a speedy resume, which got all threads going quickly and where possible did not repeat requests.

In fact this turned out to be quite an engineering challenge in itself due to having threads distributed over a tree of tasks. Large jobs where the precise number of items to be extracted is not known also present a difficulty (e.g. grabbing links from an e-commerce site search box).

A 3-pronged approach seems to achieve a resume system with a tolerance of 1 'lost' operation per thread, i.e. when resuming, each thread starts either exactly where it left off, or at worst it repeats the last request. (I think this is as good as it can be, because there is always going to be a gap between making a request and recording the fact that you did so successfully.)

The best way I could find to achieve this is via a holographic approach involving 3 separate tracking mechanisms:

  1. Keep a snapshot of which branches each thread is assigned to

  2. Persist loop counters (or last pointer positions) for each task

  3. Cache executed tasks in semi-executed branches

This method now seems quite robust, and boasts an in-built safety advantage: in a really long running process, involving hundreds of threads mopping up millions of records, the last thing you need is to lose track of where you were. With the resume system in v0.06, losing any one tracking mechanism (e.g. if you deleted the threads snapshot) will generally set you back a few records, but nothing that a few minutes of execution won't recapture. (In fact, I believe you could lose 2 of the 3 and still recover fairly quickly, as long as one of those was item 3.)

However, there are still some kinks that need to be ironed out. One is deciding when the process has finished. With every thread pottering off unsupervised and doing its own thing, it becomes surprisingly difficult to tell the difference between e.g. a pause in throughput and complete termination. Currently the system decides the process has terminated when all threads go into an 'available' state. This ought to be correct, because if all threads are 'available', none can dish out further jobs, and the process has to terminate anyway. In practice there are occasions when all threads go into the 'available' state, but the process is still a long way from completion. Why? This needs to be solved. Deleting the thread snapshot (item 1) and starting the process again gets past this roadblock - but a more permanent solution (and indeed explanation) is definitely needed.

The original general caching is now turned off by default. It may be desirable to turn it on for specific tasks during the development phase - but exactly how this should work to provide the best benefit has not been worked out.

TaskPipe Tool Commands

  • The number of commands TaskPipe Tool offers at the current stage is minimal. There are definitely a number of fairly obvious commands which would make project management easier. Again, these are being worked on.

  • More and better of pretty much everything is needed: more job manager code, better logging options and messages, more IP lists, better proxy management, more individual TaskPipe components (tasks, sample plans, iterators etc.)...

  • As mentioned, exhaustive testing has definitely not taken place. A test framework is on the drawing board, and no doubt will uncover some hidden gems. Testing on different platforms also needs to take place. As usual, use at your own risk.

  • There's a lot in TaskPipe and this early documentation is probably going to leave you scratching your head. Sorry about that. A tutorial is planned but won't happen overnight.

SHOW ME THE BASICS?

In TaskPipe you specify a plan, which is a YAML file:

# plan.yml:

---
-   _name: Scrape_Companies
    url: https://en.wikipedia.org/wiki/List_of_S%26P_500_companies
    headers:
        Referer: https://www.google.com

-   _name: Scrape_Quote
    url: $this
    headers:
        Referer: https://en.wikipedia.org/wiki/List_of_S%26P_500_companies

-   _name: Record
    table: Company
    values:
        quote: $this
        ticker: $this[1]
        url: $this[1]
        name: $this[1]
        sector: $this[1]
        industry: $this[1]
        address: $this[1]
        date_added: $this[1]
        cik: $this[1]

In this example our plan is to scrape the list of S&P 500 companies from Wikipedia, gather a stock quote for each company, then insert the completed company information (including the quote) into the database. Our plan has 3 tasks:

  1. Scrape the list from Wikipedia

  2. Scrape the quote from the URL grabbed from each Wikipedia list item

  3. Record the full record in the database

Task Definition

Here's our plan again:

# plan.yml:

---
-   _name: Scrape_Companies
    url: https://en.wikipedia.org/wiki/List_of_S%26P_500_companies
    headers:
        Referer: https://www.google.com

-   _name: Scrape_Quote
    url: $this
    headers:
        Referer: https://en.wikipedia.org/wiki/List_of_S%26P_500_companies

-   _name: Record
    table: Company
    values:
        quote: $this
        ticker: $this[1]
        url: $this[1]
        name: $this[1]
        sector: $this[1]
        industry: $this[1]
        address: $this[1]
        date_added: $this[1]
        cik: $this[1]
        

How do we define Scrape_Companies? Scrape_Companies is the name of a TaskPipe task. This means a module needs to be created with the name TaskPipe::Task_Scrape_Companies (i.e. the module name will be in the format TaskPipe::Task_(task name)). Here's our module:

package TaskPipe::Task_Scrape_Companies;

use Moose;
use Web::Scraper;
extends 'TaskPipe::Task_Scrape';

has ws => (is => 'ro', isa => 'Web::Scraper', default => sub {
    scraper {
        process_first 'table.wikitable', 'table' => scraper {
            process 'tr + tr', 'tr[]' => scraper {
                process_first 'td:nth-child(1) a', 'ticker' => 'TEXT';
                process_first 'td:nth-child(1) a', 'url' => '@href';
                process_first 'td:nth-child(2) a', 'name' => 'TEXT';
                process_first 'td:nth-child(4)', 'sector' => 'TEXT';
                process_first 'td:nth-child(5)', 'industry' => 'TEXT';
                process_first 'td:nth-child(6)', 'address' => 'TEXT';
                process_first 'td:nth-child(7)', 'date_added' => 'TEXT';
                process_first 'td:nth-child(8)', 'cik' => 'TEXT';
            };
            result 'tr';
        };
        result 'table';
    };
});

1;

This just has a single ws attribute which is a Web::Scraper - and as such this is pretty much the simplest form a scraping task can take. The ws itself looks a bit more complex, but I will leave the explanation of how Web::Scraper works to the Web::Scraper documentation.
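
For comparison, the second task in the plan would follow the same pattern. The sketch below is purely illustrative: TaskPipe::Task_Scrape_Quote is assumed to extend the same TaskPipe::Task_Scrape base class, and the CSS selector and quote-page markup are hypothetical, so you would substitute the selectors of whichever quote page you actually scrape:

package TaskPipe::Task_Scrape_Quote;

use Moose;
use Web::Scraper;
extends 'TaskPipe::Task_Scrape';

# Hypothetical selector - adjust to match the markup of the real quote page
has ws => (is => 'ro', isa => 'Web::Scraper', default => sub {
    scraper {
        process_first 'span.quote-value', 'quote' => 'TEXT';
    };
});

1;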

Task Output

A dump of the output from the task above might look something like:

[   {
        ticker => 'MMM',
        url => 'https://www.nyse.com/quote/XNYS:MMM',
        name => '3M Company',
        sector => 'Industrials',
        industry => 'Industrial Conglomerates',
        address => 'St. Paul, Minnesota',
        date_added => '',
        cik => '0000066740'
    },

    {
        ticker => 'ABT',
        url => 'https://www.nyse.com/quote/XNYS:ABT',
        name => 'Abbott Laboratories',
        sector => 'Health Care',
        industry => 'Health Care Equipment',
        address => 'North Chicago, Illinois',
        date_added => '1964-03-31',
        cik => '0000001800'
    }

    # ...

]

i.e. the output from the task is an arrayref of result hashrefs.

Task Input/Output Format

In TaskPipe each task accepts a hashref input of variables, performs an operation (the task itself) and produces a list of results. Thus it is, in general, a one-to-many operation. A typical example is where our task scrapes a list of data - such as the Wikipedia list of S&P 500 companies. We have 1 set of inputs (the Wikipedia URL and the Referer header), and we produce a list of companies.

In some cases our task expects a single set of inputs and delivers a single set of outputs. This is true when we scrape a detail page. So in the second task, where we are scraping the stock quote, we have one input (the URL of the quote for a given company) and one output (the quote for the company). However, we still expect to produce a list (i.e. an arrayref) as the output - it is just a list containing only one element in this case.
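
To make the shape of the data concrete, the input and output of Scrape_Quote for a single company might look something like the following (the values, and the quote key, are examples only):

# Input to Scrape_Quote: a single hashref of variables
my $input = {
    url     => 'https://www.nyse.com/quote/XNYS:MMM',
    headers => { Referer => 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies' },
};

# Output: still an arrayref of hashrefs - here a list with just one element
my $output = [
    { quote => '178.45' },
];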

tasks vs xtasks, branches vs xbranches, trees vs xtrees...

In our example we had a single branch, i.e. a completely linear order of operations. However, you'll note that the first task is executed only once (against the single Wikipedia page which lists the S&P 500 companies), but the second task Scrape_Quote is executed many times (once for each S&P 500 company). If we draw the structure of executed tasks (which we will denote as xtasks) then this looks more like a tree (an xtree!) than a single branch:

            (Scrape_Companies)
        wikipedia S&P500 companies list page
                    |
                    |
       ------------------------------------------
       |                   |                |           
(Scrape_Quote)      (Scrape_Quote)      (Scrape_Quote)  ...
Company: MMM        Company: ABT        Company: ABBV

So in "task space" we are looking at a single branch. But in "xtask space" we are looking at an "xtree". An appreciation of tasks vs. xtasks and branches vs. xbranches is important for understanding and getting the most from caching. However, I will skip the details of this for now.

Remember, TaskPipe is handling threads. So if you have e.g. 10 threads specified as the maximum number of threads in your config file, TaskPipe will go off and grab up to 10 company quotes at the same time. In general you shouldn't need to worry about thread allocation: TaskPipe's ThreadManager module should ensure the threads you allocate are kept busy.

TaskPipe Tool

TaskPipe comes with a command line interface script, taskpipe, which can be used to deploy files and database tables and to manage projects. Some example commands:

  • prepare taskpipe for use on your system

    taskpipe setup

  • create file stubs for a new project

    taskpipe deploy files --project=myprojectname

  • deploy basic database tables for your new project

    taskpipe deploy tables --project=myprojectname

  • run your project's main plan

    taskpipe run plan --project=myprojectname

NOTE The above is a sample selection of commands, not an exhaustive list, nor does it represent all the steps needed for setup. Run

taskpipe help

for a list of available commands. See TaskPipe::Manual::Installation for information on getting TaskPipe up and running.