
Testing and Diagnostics

Diagnostic Operators

DESCRIBE

Returns the schema of a relation.

Syntax

DESCRIBE alias;       

Terms

alias

The name of a relation.

Usage

Use the DESCRIBE operator to view the schema of a relation. You can view outer relations as well as relations defined in a nested FOREACH statement.

Example

In this example a schema is specified using the AS clause. If all data conforms to the schema, Pig will use the assigned types.

A = LOAD 'student' AS (name:chararray, age:int, gpa:float);

B = FILTER A BY name matches 'J.+';

C = GROUP B BY name;

D = FOREACH C GENERATE COUNT(B.age);

DESCRIBE A;
A: {name: chararray,age: int,gpa: float}

DESCRIBE B;
B: {name: chararray,age: int,gpa: float}

DESCRIBE C;
C: {group: chararray,B: {(name: chararray,age: int,gpa: float)}}

DESCRIBE D;
D: {long}

In this example no schema is specified. All fields default to type bytearray or long (see Data Types).

a = LOAD 'student';

b = FILTER a BY $0 matches 'J.+';

c = GROUP b BY $0;

d = FOREACH c GENERATE COUNT(b.$1);

DESCRIBE a;
Schema for a unknown.

DESCRIBE b;
2008-12-05 01:17:15,316 [main] WARN  org.apache.pig.PigServer - bytearray is implicitly cast to chararray under LORegexp Operator
Schema for b unknown.

DESCRIBE c;
2008-12-05 01:17:23,343 [main] WARN  org.apache.pig.PigServer - bytearray is implicitly cast to chararray under LORegexp Operator
c: {group: bytearray,b: {null}}

DESCRIBE d;
2008-12-05 03:04:30,076 [main] WARN  org.apache.pig.PigServer - bytearray is implicitly cast to chararray under LORegexp Operator
d: {long}

This example shows how to view the schema of a nested relation using the :: operator.

A = LOAD 'studentab10k' AS (name, age, gpa); 
B = GROUP A BY name; 
C = FOREACH B { 
     D = DISTINCT A.age; 
     GENERATE COUNT(D), group;} 

DESCRIBE C::D; 
D: {age: bytearray} 

DUMP

Dumps or displays results to screen.

Syntax

DUMP alias;       

Terms

alias

The name of a relation.

Usage

Use the DUMP operator to run (execute) Pig Latin statements and display the results to your screen. DUMP is meant for interactive mode; statements are executed immediately and the results are not saved (persisted). You can use DUMP as a debugging device to make sure that the results you are expecting are actually generated.

Note that production scripts SHOULD NOT use DUMP as it will disable multi-query optimizations and is likely to slow down execution (see Store vs. Dump).

Example

In this example a dump is performed after each statement.

A = LOAD 'student' AS (name:chararray, age:int, gpa:float);

DUMP A;
(John,18,4.0F)
(Mary,19,3.7F)
(Bill,20,3.9F)
(Joe,22,3.8F)
(Jill,20,4.0F)

B = FILTER A BY name matches 'J.+';

DUMP B;
(John,18,4.0F)
(Joe,22,3.8F)
(Jill,20,4.0F)

EXPLAIN

Displays execution plans.

Syntax

EXPLAIN [-script pigscript] [-out path] [-brief] [-dot] [-param param_name = param_value] [-param_file file_name] alias; 

Terms

-script

Use to specify a Pig script.

-out

Use to specify the output path (directory).

Generates a logical_plan[.txt|.dot], physical_plan[.text|.dot], exec_plan[.text|.dot] file in the specified path.

Default (no path specified): Stdout

-brief

Does not expand nested plans (presenting a smaller graph for overview).

-dot

Text mode (default): multiple output (split) will be broken out in sections.

Dot mode: outputs a format that can be passed to the dot utility for graphical display; it generates a directed-acyclic-graph (DAG) of the plans in any supported format (.gif, .jpg ...).

-param param_name = param_value

See Parameter Substitution.

-param_file file_name

See Parameter Substitution.

alias

The name of a relation.

Usage

Use the EXPLAIN operator to review the logical, physical, and map reduce execution plans that are used to compute the specified relation.

If no script is given:

  • The logical plan shows a pipeline of operators to be executed to build the relation. Type checking and backend-independent optimizations (such as applying filters early on) also apply.

  • The physical plan shows how the logical operators are translated to backend-specific physical operators. Some backend optimizations also apply.

  • The mapreduce plan shows how the physical operators are grouped into map reduce jobs.

If a script without an alias is specified, it will output the entire execution graph (logical, physical, or map reduce).

If a script with an alias is specified, it will output the plan for the given alias.

Example

In this example the EXPLAIN operator produces all three plans. (Note that only a portion of the output is shown in this example.)

A = LOAD 'student' AS (name:chararray, age:int, gpa:float);

B = GROUP A BY name;

C = FOREACH B GENERATE COUNT(A.age);

EXPLAIN C;
-----------------------------------------------
Logical Plan:
-----------------------------------------------
Store xxx-Fri Dec 05 19:42:29 UTC 2008-23 Schema: {long} Type: Unknown
|
|---ForEach xxx-Fri Dec 05 19:42:29 UTC 2008-15 Schema: {long} Type: bag
 etc ...  

-----------------------------------------------
Physical Plan:
-----------------------------------------------
Store(fakefile:org.apache.pig.builtin.PigStorage) - xxx-Fri Dec 05 19:42:29 UTC 2008-40
|
|---New For Each(false)[bag] - xxx-Fri Dec 05 19:42:29 UTC 2008-39
    |   |
    |   POUserFunc(org.apache.pig.builtin.COUNT)[long] - xxx-Fri Dec 05 
 etc ...  

--------------------------------------------------
| Map Reduce Plan                               
-------------------------------------------------
MapReduce node xxx-Fri Dec 05 19:42:29 UTC 2008-41
Map Plan
Local Rearrange[tuple]{chararray}(false) - xxx-Fri Dec 05 19:42:29 UTC 2008-34
|   |
|   Project[chararray][0] - xxx-Fri Dec 05 19:42:29 UTC 2008-35
 etc ...  


ILLUSTRATE

Displays a step-by-step execution of a sequence of statements.

Syntax

ILLUSTRATE {alias | -script scriptfile}; 

Terms

alias

The name of a relation.

-script scriptfile

The script keyword followed by the name of a Pig script (for example, myscript.pig).

The script file should not contain an ILLUSTRATE statement.

Usage

Use the ILLUSTRATE operator to review how data is transformed through a sequence of Pig Latin statements. ILLUSTRATE allows you to test your programs on small datasets and get faster turnaround times.

ILLUSTRATE is based on an example generator (see Generating Example Data for Dataflow Programs). The algorithm works by retrieving a small sample of the input data and then propagating this data through the pipeline. However, some operators, such as JOIN and FILTER, can eliminate tuples from the data, and this could result in no data flowing through the pipeline. To address this issue, the algorithm will automatically generate example data, in near real-time. Thus, you might see data propagating through the pipeline that was not found in the original input data, but this data changes nothing and ensures that you will be able to examine the semantics of your Pig Latin statements.
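The idea described above can be pictured with a toy model in plain Java: take a small sample, push it through one operator (a filter), and add a synthesized record if nothing survives. This is a sketch for intuition only, not Pig's example generator; the class name, the method, and the way the synthetic record is supplied are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Toy model (hypothetical, not Pig code): sample, filter, and synthesize when nothing survives. */
class ExampleDataSketch {

    static <T> List<T> illustrateFilter(List<T> input, int sampleSize,
                                        Predicate<T> filter, Supplier<T> synthesizer) {
        // Step 1: take a small sample of the input.
        List<T> sample = input.subList(0, Math.min(sampleSize, input.size()));
        // Step 2: propagate the sample through the operator (here, a filter).
        List<T> passed = new ArrayList<>();
        for (T record : sample) {
            if (filter.test(record)) {
                passed.add(record);
            }
        }
        // Step 3: if the operator eliminated everything, add a synthesized record.
        if (passed.isEmpty()) {
            passed.add(synthesizer.get());
        }
        return passed;
    }

    public static void main(String[] args) {
        List<String> timestamps = Arrays.asList("19990421", "19991104", "20070218");
        // Neither sampled timestamp is >= 20071201, so a made-up record is generated.
        System.out.println(illustrateFilter(
            timestamps, 2, t -> t.compareTo("20071201") >= 0, () -> "20071201"));
    }
}
```

In this model the synthesized record is simply handed in by the caller; the real generator derives suitable values itself.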

As shown in the examples below, you can use ILLUSTRATE to review a relation or an entire Pig script.

Example - Relation

This example demonstrates how to use ILLUSTRATE with a relation. Note that the LOAD statement must include a schema (the AS clause).

grunt> visits = LOAD 'visits.txt' AS (user:chararray, url:chararray, timestamp:chararray);
grunt> DUMP visits;

(Amy,yahoo.com,19990421)
(Fred,harvard.edu,19991104)
(Amy,cnn.com,20070218)
(Frank,nba.com,20070305)
(Fred,berkeley.edu,20071204)
(Fred,stanford.edu,20071206)

grunt> recent_visits = FILTER visits BY timestamp >= '20071201';
grunt> user_visits = GROUP recent_visits BY user;
grunt> num_user_visits = FOREACH user_visits GENERATE group, COUNT(recent_visits);
grunt> DUMP num_user_visits;

(Fred,2)

grunt> ILLUSTRATE num_user_visits;
------------------------------------------------------------------------
| visits     | user: chararray | url: chararray | timestamp: chararray |
------------------------------------------------------------------------
|            | Fred            | berkeley.edu   | 20071204             |
|            | Fred            | stanford.edu   | 20071206             |
|            | Frank           | nba.com        | 20070305             |
------------------------------------------------------------------------
-------------------------------------------------------------------------------
| recent_visits     | user: chararray | url: chararray | timestamp: chararray |
-------------------------------------------------------------------------------
|                   | Fred            | berkeley.edu   | 20071204             |
|                   | Fred            | stanford.edu   | 20071206             |
-------------------------------------------------------------------------------
------------------------------------------------------------------------------------------------------------------
| user_visits     | group: chararray | recent_visits: bag({user: chararray,url: chararray,timestamp: chararray}) |
------------------------------------------------------------------------------------------------------------------
|                 | Fred             | {(Fred, berkeley.edu, 20071204), (Fred, stanford.edu, 20071206)}          |
------------------------------------------------------------------------------------------------------------------
--------------------------------------------------
| num_user_visits     | group: chararray | long  |
--------------------------------------------------
|                     | Fred             | 2     |
--------------------------------------------------

Example - Script

This example demonstrates how to use ILLUSTRATE with a Pig script. Note that the script itself should not contain an ILLUSTRATE statement.

grunt> cat visits.txt
Amy     yahoo.com       19990421
Fred    harvard.edu     19991104
Amy     cnn.com 20070218
Frank   nba.com 20070305
Fred    berkeley.edu    20071204
Fred    stanford.edu    20071206

grunt> cat visits.pig
visits = LOAD 'visits.txt' AS (user, url, timestamp);
recent_visits = FILTER visits BY timestamp >= '20071201';
historical_visits = FILTER visits BY timestamp <= '20000101';
DUMP recent_visits;
DUMP historical_visits;
STORE recent_visits INTO 'recent';
STORE historical_visits INTO 'historical';

grunt> exec visits.pig

(Fred,berkeley.edu,20071204)
(Fred,stanford.edu,20071206)

(Amy,yahoo.com,19990421)
(Fred,harvard.edu,19991104)


grunt> illustrate -script visits.pig

------------------------------------------------------------------------
| visits     | user: bytearray | url: bytearray | timestamp: bytearray |
------------------------------------------------------------------------
|            | Amy             | yahoo.com      | 19990421             |
|            | Fred            | stanford.edu   | 20071206             |
------------------------------------------------------------------------
-------------------------------------------------------------------------------
| recent_visits     | user: bytearray | url: bytearray | timestamp: bytearray |
-------------------------------------------------------------------------------
|                   | Fred            | stanford.edu   | 20071206             |
-------------------------------------------------------------------------------
---------------------------------------------------------------------------------------
| Store : recent_visits     | user: bytearray | url: bytearray | timestamp: bytearray |
---------------------------------------------------------------------------------------
|                           | Fred            | stanford.edu   | 20071206             |
---------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------
| historical_visits     | user: bytearray | url: bytearray | timestamp: bytearray |
-----------------------------------------------------------------------------------
|                       | Amy             | yahoo.com      | 19990421             |
-----------------------------------------------------------------------------------
-------------------------------------------------------------------------------------------
| Store : historical_visits     | user: bytearray | url: bytearray | timestamp: bytearray |
-------------------------------------------------------------------------------------------
|                               | Amy             | yahoo.com      | 19990421             |
-------------------------------------------------------------------------------------------

Pig Scripts and MapReduce Job IDs

Complex Pig scripts often generate many MapReduce jobs. To help you debug a script, Pig prints a summary of the execution that shows which relations (aliases) are mapped to each MapReduce job.

JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MaxReduceTime 
    MinReduceTime AvgReduceTime Alias Feature Outputs
job_201004271216_12712 1 1 3 3 3 12 12 12 B,C GROUP_BY,COMBINER
job_201004271216_12713 1 1 3 3 3 12 12 12 D SAMPLER
job_201004271216_12714 1 1 3 3 3 12 12 12 D ORDER_BY,COMBINER 
    hdfs://mymachine.com:9020/tmp/temp743703298/tmp-2019944040,
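A summary like this can be post-processed to see which aliases each job computes. The following is a minimal Java sketch, assuming whitespace-separated columns in the order given by the header (JobId first, Alias tenth) and that wrapped continuation lines can be skipped. The class and method names are illustrative and not part of Pig.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative helper (not part of Pig): maps job IDs to the aliases they compute. */
class JobSummaryParser {

    // Column positions assumed from the header: JobId is column 0, Alias is column 9.
    private static final int JOB_ID_COLUMN = 0;
    private static final int ALIAS_COLUMN = 9;

    /** Parses summary rows (header excluded) into jobId -> list of aliases. */
    static Map<String, List<String>> aliasesByJob(List<String> rows) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String row : rows) {
            String[] columns = row.trim().split("\\s+");
            if (columns.length <= ALIAS_COLUMN) {
                continue; // skip blank lines and wrapped continuation lines
            }
            result.put(columns[JOB_ID_COLUMN],
                       Arrays.asList(columns[ALIAS_COLUMN].split(",")));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList(
            "job_201004271216_12712 1 1 3 3 3 12 12 12 B,C GROUP_BY,COMBINER",
            "job_201004271216_12713 1 1 3 3 3 12 12 12 D SAMPLER");
        System.out.println(aliasesByJob(rows));
    }
}
```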

Pig Statistics

Pig Statistics is a framework for collecting and storing script-level statistics for Pig Latin. Characteristics of Pig Latin scripts and the resulting MapReduce jobs are collected while the script is executed. These statistics are then available for Pig users and tools using Pig (such as Oozie) to retrieve after the job is done.

The new Pig statistics and the existing Hadoop statistics can also be accessed via the Hadoop job history file (and job xml file). Piggybank has a HadoopJobHistoryLoader which acts as an example of using Pig itself to query these statistics (the loader can be used as a reference implementation but is NOT supported for production use).

Java API

Several new public classes make it easier for external tools such as Oozie to integrate with Pig statistics.

The Pig statistics are available here: http://pig.apache.org/docs/r0.9.0/api/

The stats classes are in the package: org.apache.pig.tools.pigstats

  • PigStats
  • JobStats
  • OutputStats
  • InputStats

The PigRunner class mimics the behavior of the Main class but gives users a statistics object back. Optionally, you can call the API with an implementation of progress listener which will be invoked by Pig runtime during the execution.

package org.apache.pig;

public abstract class PigRunner {
    public static PigStats run(String[] args, PigProgressNotificationListener listener)
}

public interface PigProgressNotificationListener extends java.util.EventListener {
    // just before the launch of MR jobs for the script
    public void launchStartedNotification(int numJobsToLaunch);
    // number of jobs submitted in a batch
    public void jobsSubmittedNotification(int numJobsSubmitted);
    // a job is started
    public void jobStartedNotification(String assignedJobId);
    // a job is completed successfully
    public void jobFinishedNotification(JobStats jobStats);
    // a job is failed
    public void jobFailedNotification(JobStats jobStats);
    // a user output is completed successfully
    public void outputCompletedNotification(OutputStats outputStats);
    // updates the progress as percentage
    public void progressUpdatedNotification(int progress);
    // the script execution is done
    public void launchCompletedNotification(int numJobsSucceeded);
}

Job XML

The following entries are included in job conf:

Pig Statistic             Description

pig.script.id             The UUID for the script. All jobs spawned by the script have the same script ID.
pig.script                The base64 encoded script text.
pig.command.line          The command line used to invoke the script.
pig.hadoop.version        The Hadoop version installed.
pig.version               The Pig version used.
pig.input.dirs            A comma-separated list of input directories for the job.
pig.map.output.dirs       A comma-separated list of output directories in the map phase of the job.
pig.reduce.output.dirs    A comma-separated list of output directories in the reduce phase of the job.
pig.parent.jobid          A comma-separated list of parent job ids.
pig.script.features       A list of Pig features used in the script.
pig.job.feature           A list of Pig features used in the job.
pig.alias                 The alias associated with the job.
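Because pig.script holds base64 encoded text, a tool that reads the job conf has to decode the value before it can show the script. The following is a minimal sketch using the standard java.util.Base64 decoder, assuming the script text is UTF-8. The class name and the sample value are illustrative, and reading the property from the job conf is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Illustrative helper (not part of Pig): decodes a pig.script value. */
class PigScriptDecoder {

    /** Decodes base64 text; the MIME decoder also tolerates line breaks in the value. */
    static String decode(String encodedScript) {
        byte[] raw = Base64.getMimeDecoder().decode(encodedScript);
        return new String(raw, StandardCharsets.UTF_8); // UTF-8 is an assumption
    }

    public static void main(String[] args) {
        // Sample value produced here for the demo; a real one comes from the job conf.
        String encoded = Base64.getEncoder()
            .encodeToString("A = LOAD 'student';".getBytes(StandardCharsets.UTF_8));
        System.out.println(decode(encoded));
    }
}
```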

Hadoop Job History Loader

The HadoopJobHistoryLoader in Piggybank loads Hadoop job history files and job xml files from the file system. For each MapReduce job, the loader produces a tuple with schema (j:map[], m:map[], r:map[]). The first map in the schema contains job-related entries. Here are some of the important key names in the map:

PIG_SCRIPT_ID

CLUSTER

QUEUE_NAME

JOBID

JOBNAME

STATUS

USER

HADOOP_VERSION

PIG_VERSION

PIG_JOB_FEATURE

PIG_JOB_ALIAS

PIG_JOB_PARENTS

SUBMIT_TIME

LAUNCH_TIME

FINISH_TIME

TOTAL_MAPS

TOTAL_REDUCES

Examples that use the loader to query Pig statistics are shown below.

Examples

Find scripts that generate more than three MapReduce jobs:

a = load '/mapred/history/done' using HadoopJobHistoryLoader() as (j:map[], m:map[], r:map[]);
b = group a by (j#'PIG_SCRIPT_ID', j#'USER', j#'JOBNAME');
c = foreach b generate group.$1, group.$2, COUNT(a);
d = filter c by $2 > 3;
dump d;

Find the running time of each script (in seconds):

a = load '/mapred/history/done' using HadoopJobHistoryLoader() as (j:map[], m:map[], r:map[]);
b = foreach a generate j#'PIG_SCRIPT_ID' as id, j#'USER' as user, j#'JOBNAME' as script_name, 
         (Long) j#'SUBMIT_TIME' as start, (Long) j#'FINISH_TIME' as end;
c = group b by (id, user, script_name);
d = foreach c generate group.user, group.script_name, (MAX(b.end) - MIN(b.start))/1000;
dump d;
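The arithmetic in the running-time query can be checked outside Pig. The sketch below assumes SUBMIT_TIME and FINISH_TIME are epoch milliseconds, which is consistent with the division by 1000 to get seconds. It computes the same value for the jobs of one script; the class name and the sample timestamps are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: mirrors (MAX(end) - MIN(start)) / 1000 for the jobs of one script. */
class ScriptRunningTime {

    /** Each element is {submitTimeMillis, finishTimeMillis} for one job; the list must not be empty. */
    static long runningTimeSeconds(List<long[]> jobs) {
        long earliestStart = Long.MAX_VALUE;
        long latestEnd = Long.MIN_VALUE;
        for (long[] job : jobs) {
            earliestStart = Math.min(earliestStart, job[0]);
            latestEnd = Math.max(latestEnd, job[1]);
        }
        return (latestEnd - earliestStart) / 1000; // subtract first, then convert ms to s
    }

    public static void main(String[] args) {
        List<long[]> jobs = Arrays.asList(
            new long[] {1_000_000L, 1_030_000L},   // first job runs for 30 s
            new long[] {1_030_000L, 1_095_000L});  // second job ends 95 s after the first started
        System.out.println(runningTimeSeconds(jobs)); // prints 95
    }
}
```

Note that the subtraction happens before the division; dividing only the minimum start time by 1000 would mix milliseconds and seconds.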

Find the number of scripts run by user and queue on a cluster:

a = load '/mapred/history/done' using HadoopJobHistoryLoader() as (j:map[], m:map[], r:map[]);
b = foreach a generate j#'PIG_SCRIPT_ID' as id, j#'USER' as user, j#'QUEUE_NAME' as queue;
c = group b by (id, user, queue) parallel 10;
d = foreach c generate group.user, group.queue, COUNT(b);
dump d;

Find scripts that have failed jobs:

a = load '/mapred/history/done' using HadoopJobHistoryLoader() as (j:map[], m:map[], r:map[]);
b = foreach a generate (Chararray) j#'STATUS' as status, j#'PIG_SCRIPT_ID' as id, j#'USER' as user, j#'JOBNAME' as script_name, j#'JOBID' as job;
c = filter b by status != 'SUCCESS';
dump c;

Find scripts that use only the default parallelism:

a = load '/mapred/history/done' using HadoopJobHistoryLoader() as (j:map[], m:map[], r:map[]);
b = foreach a generate j#'PIG_SCRIPT_ID' as id, j#'USER' as user, j#'JOBNAME' as script_name, (Long) r#'NUMBER_REDUCES' as reduces;
c = group b by (id, user, script_name) parallel 10;
d = foreach c generate group.user, group.script_name, MAX(b.reduces) as max_reduces;
e = filter d by max_reduces == 1;
dump e;

PigUnit

PigUnit is a simple xUnit framework that enables you to easily test your Pig scripts. With PigUnit you can perform unit testing, regression testing, and rapid prototyping. No cluster setup is required if you run Pig in local mode.

Build PigUnit

To compile PigUnit run the command shown below from the Pig trunk. The compile will create the pigunit.jar file.

$pig_trunk ant pigunit-jar   

Run PigUnit

You can run PigUnit using Pig's local mode or mapreduce mode.

Local Mode

PigUnit runs in Pig's local mode by default. Local mode is fast and enables you to use your local file system as the HDFS cluster. Local mode does not require a real cluster but a new local one is created each time.

Mapreduce Mode

PigUnit also runs in Pig's mapreduce mode. Mapreduce mode requires you to use a Hadoop cluster and HDFS installation. It is enabled when the Java system property pigunit.exectype.cluster is set to any value: e.g. -Dpigunit.exectype.cluster=true or System.getProperties().setProperty("pigunit.exectype.cluster", "true"). The cluster you select must be specified in the CLASSPATH (similar to the HADOOP_CONF_DIR variable).
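Since the property only has to be "set to any value", the value itself does not seem to matter. The following sketch shows what a presence-only check looks like, assuming that is how the property is evaluated. The helper class is hypothetical and does not call PigUnit.

```java
/** Hypothetical helper: shows that a presence check ignores the property value. */
class ExecTypeCheck {

    static final String CLUSTER_PROPERTY = "pigunit.exectype.cluster";

    /** True when the property is set at all, regardless of its value. */
    static boolean clusterModeRequested() {
        return System.getProperty(CLUSTER_PROPERTY) != null;
    }

    public static void main(String[] args) {
        System.out.println(clusterModeRequested());
        System.setProperty(CLUSTER_PROPERTY, "false");
        // Under a presence-only check, even "false" requests cluster mode.
        System.out.println(clusterModeRequested());
        System.clearProperty(CLUSTER_PROPERTY);
    }
}
```

Under this assumption, switching back to local mode means clearing the property rather than setting it to false.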

PigUnit Example

Many PigUnit examples are available in the PigUnit tests.

The example included here computes the top N of the most common queries. The Pig script, top_queries.pig, is similar to the Query Phrase Popularity in the Pig tutorial. It expects as input a file of queries and a parameter n (n is 2 in our case, to compute the top 2).

Setting up a test for this script is easy because the arguments and the input data are each specified as a string array. The expected output is specified the same way and is compared to the actual result of running the Pig script.

Java Test

  @Test
  public void testTop2Queries() {
    String[] args = {
        "n=2",
        };
 
    PigTest test = new PigTest("top_queries.pig", args);
 
    String[] input = {
        "yahoo",
        "yahoo",
        "yahoo",
        "twitter",
        "facebook",
        "facebook",
        "linkedin",
    };
 
    String[] output = {
        "(yahoo,3)",
        "(facebook,2)",
    };
 
    test.assertOutput("data", input, "queries_limit", output);
  }

top_queries.pig

data =
    LOAD 'input'
    AS (query:CHARARRAY);
     
queries_group =
    GROUP data
    BY query; 
    
queries_count = 
    FOREACH queries_group 
    GENERATE 
        group AS query, 
        COUNT(data) AS total;
        
queries_ordered =
    ORDER queries_count
    BY total DESC, query;
            
queries_limit =
    LIMIT queries_ordered $n;

STORE queries_limit INTO 'output';
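When working out the expected output for an assertion, it can help to mirror the script's steps (group, count, order by total descending and then by query, limit n) in plain Java. The following is only a sketch for reasoning about expected results; the class name is illustrative and it uses neither Pig nor PigUnit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: reproduces what top_queries.pig computes for a small input. */
class TopQueriesSketch {

    /** Returns the top n queries formatted like Pig tuples, e.g. "(yahoo,3)". */
    static List<String> topQueries(List<String> queries, int n) {
        // GROUP data BY query, then COUNT(data)
        Map<String, Long> counts = new TreeMap<>();
        for (String query : queries) {
            counts.merge(query, 1L, Long::sum);
        }
        // ORDER queries_count BY total DESC, query
        List<Map.Entry<String, Long>> ordered = new ArrayList<>(counts.entrySet());
        ordered.sort((a, b) -> {
            int byTotal = Long.compare(b.getValue(), a.getValue());
            return byTotal != 0 ? byTotal : a.getKey().compareTo(b.getKey());
        });
        // LIMIT queries_ordered $n
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : ordered.subList(0, Math.min(n, ordered.size()))) {
            result.add("(" + entry.getKey() + "," + entry.getValue() + ")");
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
            "yahoo", "yahoo", "yahoo", "twitter", "facebook", "facebook", "linkedin");
        System.out.println(topQueries(input, 2)); // prints [(yahoo,3), (facebook,2)]
    }
}
```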

Run

The test can be executed by JUnit (or any other Java testing framework). It requires:

  1. pig.jar
  2. pigunit.jar

The test takes about 25s to run and should pass. If the test fails (for example, after changing the parameter n to n=3), a diff of the output is displayed:

junit.framework.ComparisonFailure: null expected:<...ahoo,3)
(facebook,2)[]> but was:<...ahoo,3)
(facebook,2)[
(linkedin,1)]>
        at junit.framework.Assert.assertEquals(Assert.java:81)
        at junit.framework.Assert.assertEquals(Assert.java:87)
        at org.apache.pig.pigunit.PigTest.assertEquals(PigTest.java:272)

Troubleshooting Tips

Common problems you may encounter are discussed below.

Classpath in Mapreduce Mode

When using PigUnit in mapreduce mode, be sure to include the $HADOOP_CONF_DIR of the cluster in your CLASSPATH.

The default value is ~/pigtest/conf.

org.apache.pig.backend.executionengine.ExecException: 
ERROR 4010: Cannot find hadoop configurations in classpath 
(neither hadoop-site.xml nor core-site.xml was found in the classpath).
If you plan to use local mode, please put -x local option in command line

UDF jars Not Found

This error means that you are missing some jars in your test environment.

WARN util.JarManager: Couldn't find the jar for 
org.apache.pig.piggybank.evaluation.string.LOWER, skip it

Storing Data

PigUnit currently drops all STORE and DUMP commands. You can tell PigUnit to keep the commands and execute the script:

test = new PigTest(PIG_SCRIPT, args);   
test.unoverride("STORE");
test.runScript();

Cache Archive

For cache archive to work, your test environment needs to have the cache archive options specified by Java properties or in an additional XML configuration in its CLASSPATH.

If you use a local cluster, you need to set the required environment variables before starting it:

export LD_LIBRARY_PATH=/home/path/to/lib

Future Enhancements

Improvements and other components based on PigUnit that could be built later.

For example, we could build a PigTestCase and PigTestSuite on top of PigTest to:

  1. Add the notion of workspaces for each test.
  2. Remove the boilerplate code that appears when there is more than one test method.
  3. Add a standalone utility that reads test configurations and generates a test report.

Penny

Note: Penny is an experimental feature.

Penny is a framework for creating Pig monitoring and debugging tools. Penny comes with a library of tools (see Penny Tool Library). However, the real power of Penny is in creating your own custom monitoring and debugging tools using Penny's simple API.

How it Works

Before you can create a tool, you need to understand how Penny instruments Pig scripts (called "dataflow programs" in the following diagram).

Penny Architecture

As shown in the diagram, Penny inserts one or more monitor agents (called "Penny agent" in the diagram) between steps of the Pig script, which observe data flowing between the Pig script steps. Monitor agents run arbitrary Java code as needed for your tool, which has access to some primitives for tagging records and communicating with other agents and with a central coordinator process (called "Penny coordinator" in the diagram). The coordinator also runs arbitrary code defined by your tool.

The whole thing is kicked off by the tool's Main program (called "application" in the diagram), which receives instructions from the user (e.g. "please figure out why this Pig script keeps crashing"), launches one or more runs of the Pig script instrumented with monitor agents, and reports the outcome back to the user (e.g. "the crash appears to be caused by one of these records: ...").

API

You need to write three Java classes: a Main class, a Coordinator class, and a MonitorAgent class (for certain, fancy tools, you may need multiple MonitorAgent classes). You can find many examples of Main/Coordinator/MonitorAgent classes that define Penny tools in the Penny source code (/pig/trunk/contrib/penny/java/src/main/java/) under org.apache.pig.penny.apps. All of the tools described in Penny Tool Library are written using this API, so you've got plenty of examples to work with. We'll paste a few code fragments below to get you going -- in fact the entire code for the "data samples" tool (all 97 lines of Java) is included below.

Main Class

Your Main class is the "shell" of your application. It receives instructions from the user, and configures and launches one or more Penny-instrumented runs of the user's Pig script.

You talk to Penny via the PennyServer class. You can do two things: (1) parse a user's Pig script and (2) launch a Penny-instrumented run of the Pig script. Here is the Main class for the data samples tool, described in Penny Tool Library:

import java.util.HashMap;
import java.util.Map;
import org.apache.pig.penny.ClassWithArgs;
import org.apache.pig.penny.ParsedPigScript;
import org.apache.pig.penny.PennyServer;

/**
 * Data samples app.
 */
public class Main {
    public static void main(String[] args) throws Exception {
        PennyServer pennyServer = new PennyServer();
        String pigScriptFilename = args[0];
        ParsedPigScript parsedPigScript = pennyServer.parse(pigScriptFilename);
        Map<String, ClassWithArgs> monitorClasses = new HashMap<String, ClassWithArgs>();
        for (String alias : parsedPigScript.aliases()) {
            monitorClasses.put(alias, new ClassWithArgs(DSMonitorAgent.class));
        }
        parsedPigScript.trace(DSCoordinator.class, monitorClasses);
    }
}      

The "monitorClasses" map dictates which monitor agent (if any) to place after each dataflow step (steps are identified by Pig script aliases). You can also pass arguments to each monitor agent, and/or to the coordinator, as shown in this example for the "data histograms" tool:

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import org.apache.pig.penny.ClassWithArgs;
import org.apache.pig.penny.ParsedPigScript;
import org.apache.pig.penny.PennyServer;

/**
 * Data summaries app. that computes a histogram of one of the fields of one of the intermediate data sets.
 */

public class Main {
    public static void main(String[] args) throws Exception {
        PennyServer pennyServer = new PennyServer();
        String pigScriptFilename = args[0];
        ParsedPigScript parsedPigScript = pennyServer.parse(pigScriptFilename);
        String alias = args[1]; // which alias to create histogram for
        int fieldNo = Integer.parseInt(args[2]); // which field to create histogram for
        int min = Integer.parseInt(args[3]); // min field value
        int max = Integer.parseInt(args[4]); // max field value
        int bucketSize = Integer.parseInt(args[5]); // histogram bucket size
        if (!parsedPigScript.aliases().contains(alias)) throw new IllegalArgumentException("No such alias.");
        Map<String, ClassWithArgs> monitorClasses = new HashMap<String, ClassWithArgs>();
        monitorClasses.put(alias, new ClassWithArgs(DHMonitorAgent.class, fieldNo, min, max, bucketSize));
        TreeMap<Integer, Integer> histogram = (TreeMap<Integer, Integer>) parsedPigScript.trace(DHCoordinator.class, monitorClasses);
        System.out.println("Histogram: " + histogram);
    }
}    
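The min, max, and bucketSize arguments suggest fixed-width buckets. As a rough illustration of what such a histogram computation could look like, the following self-contained sketch counts integer values into a TreeMap keyed by bucket lower bound. It is not the DHMonitorAgent or DHCoordinator code; the class name, the bucketing rule, and the choice to ignore out-of-range values are all assumptions.

```java
import java.util.TreeMap;

/** Hypothetical sketch (not Penny code): fixed-width histogram over integer values. */
class HistogramSketch {

    /** Counts values in [min, max] into buckets of width bucketSize, keyed by bucket lower bound. */
    static TreeMap<Integer, Integer> histogram(int[] values, int min, int max, int bucketSize) {
        if (bucketSize <= 0) {
            throw new IllegalArgumentException("bucketSize must be positive");
        }
        TreeMap<Integer, Integer> buckets = new TreeMap<>();
        for (int value : values) {
            if (value < min || value > max) {
                continue; // assumption: out-of-range values are ignored
            }
            int lowerBound = min + ((value - min) / bucketSize) * bucketSize;
            buckets.merge(lowerBound, 1, Integer::sum);
        }
        return buckets;
    }

    public static void main(String[] args) {
        int[] ages = {18, 19, 20, 22, 20};
        System.out.println("Histogram: " + histogram(ages, 18, 25, 2));
        // prints Histogram: {18=2, 20=2, 22=1}
    }
}
```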

MonitorAgent Class

Monitor agents implement the following API:

 /**
  * Furnish set of fields to monitor. (Null means monitor all fields ('*').)
  */
 public abstract Set<Integer> furnishFieldsToMonitor();
 /**
  * Initialize, using any arguments passed from higher layer.
  */
 public abstract void init(Serializable[] args);
 /**
  * Process a tuple that passes through the monitoring point.
  *
  * @param t    the tuple
  * @param tags t's tags
  * @return FILTER_OUT to remove the tuple from the data stream;
  *    NO_TAGS to let it pass through and not give it any tags;
  *    a set of tags to let it pass through and assign those tags
  */
 public abstract Set<String> observeTuple(Tuple t, Set<String> tags) throws ExecException;
 /**
  * Process an incoming (synchronous or asynchronous) message.
  */
 public abstract void receiveMessage(Location source, Tuple message);
 /**
  * No more tuples are going to pass through the monitoring point. Finish any ongoing processing.
  */
 public abstract void finish();
      

Here's an example from the "data samples" tool:

import java.io.Serializable;
import java.util.Set;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.penny.Location;
import org.apache.pig.penny.MonitorAgent;

public class DSMonitorAgent extends MonitorAgent {

    private final static int NUM_SAMPLES = 5;
    private int tupleCount = 0;
    public void finish() { }
    public Set<Integer> furnishFieldsToMonitor() {
        return null;
    }
    public void init(Serializable[] args) { }
    public Set<String> observeTuple(Tuple t, Set<String> tags) throws ExecException {
        if (tupleCount++ < NUM_SAMPLES) {
            communicator().sendToCoordinator(t);
        }
        return tags;
    }
    public void receiveMessage(Location source, Tuple message) { }
}      
      

Monitor agents have access to a "communicator" object, which is the gateway for sending messages to other agents or to the coordinator. The communicator API is:

 /**
  * Find out my (physical) location.
  */
 public abstract Location myLocation();
 /**
  * Send a message to the coordinator, asynchronously.
  */
 public abstract void sendToCoordinator(Tuple message);
 /**
  * Send a message to immediate downstream neighbor(s), synchronously.
  * If downstream neighbor(s) span a task boundary, all instances will receive it; otherwise only same-task instances will receive it.
  * If there is no downstream neighbor, an exception will be thrown.
  */
 public abstract void sendDownstream(Tuple message) throws NoSuchLocationException;
 /**
  * Send a message to immediate upstream neighbor(s), synchronously.
  * If upstream neighbor(s) are non-existent or span a task boundary, an exception will be thrown.
  */
 public abstract void sendUpstream(Tuple message) throws NoSuchLocationException;
 /**
  * Send a message to current/future instances of a given logical location.
  * Instances that have already terminated will not receive the message (obviously).
  * Instances that are currently executing will receive it asynchronously (or perhaps not at all, if they terminate before the message arrives).
  * Instances that have not yet started will receive the message prior to beginning processing of tuples.
  */
 public abstract void sendToAgents(LogicalLocation destination, Tuple message) throws NoSuchLocationException;
 // The following methods mirror the ones above, but take care of packaging a list of objects into a tuple (you're welcome!) ...
 public void sendToCoordinator(Object ... message) {
  sendToCoordinator(makeTuple(message));
 }
 public void sendDownstream(Object ... message) throws NoSuchLocationException {
  sendDownstream(makeTuple(message));
 }
 public void sendUpstream(Object ... message) throws NoSuchLocationException {
  sendUpstream(makeTuple(message));
 }
 public void sendToAgents(LogicalLocation destination, Object ... message) throws NoSuchLocationException {
  sendToAgents(destination, makeTuple(message));
 }

Coordinator Class

Your tool's coordinator implements the following API:

 /**
  * Initialize, using any arguments passed from higher layer.
  */
 public abstract void init(Serializable[] args);
 /**
  * Process an incoming (synchronous or asynchronous) message.
  */
 public abstract void receiveMessage(Location source, Tuple message);
 /**
  * The data flow has completed and all messages have been delivered. Finish processing.
  *
  * @return final output to pass back to application
  */
 public abstract Object finish();
      

The coordinator for the "data samples" tool is:

import java.io.Serializable;
import org.apache.pig.data.Tuple;
import org.apache.pig.penny.Coordinator;
import org.apache.pig.penny.Location;
public class DSCoordinator extends Coordinator {
    public void init(Serializable[] args) { }
    public Object finish() {
        return null;
    }
    public void receiveMessage(Location source, Tuple message) {
        System.out.println("*** SAMPLE RECORD AT ALIAS " + source.logId() + ": " + truncate(message));
    }
    private String truncate(Tuple t) {
        String s = t.toString();
        return s.substring(0, Math.min(s.length(), 100));
    }
}