Package org.apache.pig.backend.hadoop.executionengine.physicalLayer

Implementation of physical operators that use hadoop as the execution engine and data storage.


Interface Summary
PigLogger An interface to allow aggregation of messages
PigProgressable Pig's progress indicator.

Class Summary
PhysicalOperator This is the base class for all operators.

Exception Summary

Package org.apache.pig.backend.hadoop.executionengine.physicalLayer Description

Implementation of physical operators that use hadoop as the execution engine and data storage.


Physical operators use the operator, plan, visitor, and optimizer framework provided by the org.apache.pig.impl.plan package.

As with org.apache.pig.impl.logicalLayer, physical operators consist of org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators and org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators. In many data processing systems relational operators and expression operators are modeled as different entities because they behave differently. Pig blurs, though does not entirely remove, this distinction because of its support for nested operations.

Conceptually, relational operators work on an entire relation (in Pig's case, a bag). In terms of implementation, they operate on one record (tuple) at a time. This avoids needing to load the entire relation into memory before operating on it.

Expression operators, on the other hand, operate on the assumption that they are provided their entire input at invocation time and provide their entire output when they are finished.

Pig's hadoop implementation implements a pull based model, where each operator calls getNext() on the operator before it in the plan. getNext() is implemented for each of the different data types, so that operators can request the data type they expect. Relational operators will always expect a tuple. Expression operators can request any data type.

As with the logical plan, physical relational operators often have embedded physical plans. When a relational operator calls getNext() on its predecessor and receives a tuple, it will attach that tuple to its embedded physical plan(s) and then call getNext() on the root node(s) of those plan(s) in order to get the output. For example, the Pig Latin filter A by $0 != 5 will produce a POFilter object, with an embedded physical plan that consists of POProject(0), POConst(5), both attached to PONotEqual. Each time POFilter.getNext() is called, it will call its predecessors getNext() method, and then attach the input to POProject and POConst. It will then call PONotEqual.getNext(). PONotEqual will in turn call POProject.getNext() and POConst.getNext(), and then evaluate and return the results. If the result is true, POFilter will return its input tuple. If the answer is false, it will call it's predecessor's getNext() method and try again.

Given Pig's nested data and execution models, there are places it is necessary to move between relational and expression operators. Consider the following Pig Latin script: A = load 'myfile'; B = group A by $0; C = foreach B { C1 = filter $1 by $0 > 0; C2 = distinct C1; generate group, COUNT(C2), SUM(C1.$0); } In particular, the foreach section presents some interesting challenges.

First, foreach has three separate outputs, all of which require separate but parallel executions. To address this, each element of the foreach is described by a separate embedded plan. This can cause duplication of operations, as in this plan. In this case splitting the plans for COUNT and SUM cause a double execution of the C1 = filter section of the script. But it avoids needing to place a split operator between filter -> distinct and filter -> SUM.

The second issue presented by the nested logic is that the foreach operator is going to receive a tuple with the format ($0, bag), where bag is a collection of all the tuples with a given value for $0. It will then attach that to the filter. But filter does not expect a bag. It expects to get tuples. On the other end, distinct will be outputing tuples. But COUNT() expects C2 to be a bag that can be processed by COUNT as a whole.

To address this issue, some operators have been modified to provide "bookend" functionality. That is, the ability to translate between relational and expression operators. The embedded plan for calculating the COUNT in the foreach will look like: POProject(1) -> PODistinct -> POProject(*) -> COUNT(). The first POProject(1) will have a bag attached as its input by POForeach. But POFilter will call getNext(Tuple). In this case, POProject will know to open the bag and provide the tuples one at a time, until the bag is empty, at which point it will return STATUS_EOP. The PODistinct will be expecting to return tuples, but POProject(*) will call getNext(bag). In this case all relational operators will be able to accumulate all of the tuples by calling getNext(tuple) on themselves until they see STATUS_EOP, packaging those tuples into a bag, and then returning that bag.

And third, project is being subtly overloaded here. In cases where the script says C = foreach B generate $1, this type of projection means take the second element from the tuple and project it. But in cases like C = foreach B generate SUM($1.$0) and $1 is a bag, this type of projection expects to receive a bag ($1) and output a modified bag ($1 with only the first field, $0, remaining in all the tuples in the bag). To handle this issue, POProject will, when it sees that its predecessor is a POProject and its successors is an expression operator it will perform a projection on the bag (that is, perform the specified project on each tuple in the bag) rather than on a tuple.

Copyright © ${year} The Apache Software Foundation