org.apache.pig.piggybank.storage.avro
Class AvroStorage

java.lang.Object
  extended by org.apache.pig.LoadFunc
      extended by org.apache.pig.FileInputLoadFunc
          extended by org.apache.pig.piggybank.storage.avro.AvroStorage
All Implemented Interfaces:
LoadMetadata, OrderedLoadFunc, StoreFuncInterface

public class AvroStorage
extends FileInputLoadFunc
implements StoreFuncInterface, LoadMetadata

AvroStorage is used to load and store Avro data.
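
A minimal usage sketch showing the loader and storer driven from an embedded Pig script via PigServer; the jar path, input/output paths, and alias names are illustrative assumptions, not part of this API.

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

public class AvroStorageUsageExample {
    public static void main(String[] args) throws Exception {
        // Run Pig in local mode; use ExecType.MAPREDUCE against a cluster.
        PigServer pig = new PigServer(ExecType.LOCAL);
        // Register the piggybank jar so the UDF can be resolved (path is an assumption).
        pig.registerJar("piggybank.jar");
        // Load Avro data; the input schema is read from the data files themselves.
        pig.registerQuery("records = LOAD 'input/part-00000.avro' "
                + "USING org.apache.pig.piggybank.storage.avro.AvroStorage();");
        // Store back as Avro; with the no-argument constructor the output schema
        // is derived from the Pig schema (see the empty constructor below).
        pig.store("records", "output/avro_copy",
                "org.apache.pig.piggybank.storage.avro.AvroStorage()");
    }
}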


Constructor Summary
AvroStorage()
          Empty constructor.
AvroStorage(String[] parts)
          Constructor that takes a list of quoted option strings.
 
Method Summary
 void checkSchema(ResourceSchema s)
          Append the newly specified schema.
 void cleanupOnFailure(String location, org.apache.hadoop.mapreduce.Job job)
          This method will be called by Pig if the job which contains this store fails.
protected  org.apache.avro.Schema getAvroSchema(org.apache.hadoop.fs.Path path, org.apache.hadoop.fs.FileSystem fs)
          Get avro schema of input path.
protected  org.apache.avro.Schema getAvroSchema(String location, org.apache.hadoop.mapreduce.Job job)
           
 org.apache.hadoop.mapreduce.InputFormat getInputFormat()
          This will be called during planning on the front end.
 Tuple getNext()
          Retrieves the next tuple to be processed.
 org.apache.hadoop.mapreduce.OutputFormat getOutputFormat()
          Return the OutputFormat associated with StoreFuncInterface.
 String[] getPartitionKeys(String location, org.apache.hadoop.mapreduce.Job job)
          Find what columns are partition keys for this input.
protected  org.apache.avro.Schema getSchema(org.apache.hadoop.fs.Path path, org.apache.hadoop.fs.FileSystem fs)
          This method is called by getAvroSchema(java.lang.String, org.apache.hadoop.mapreduce.Job).
 ResourceSchema getSchema(String location, org.apache.hadoop.mapreduce.Job job)
          Get avro schema from "location" and return the converted PigSchema.
protected  org.apache.avro.Schema getSchemaFromFile(org.apache.hadoop.fs.Path path, org.apache.hadoop.fs.FileSystem fs)
          This method is called to return the schema of an avro schema file.
 ResourceStatistics getStatistics(String location, org.apache.hadoop.mapreduce.Job job)
          Get statistics about the data to be loaded.
protected  void init(Map<String,Object> inputs)
          Initialize output avro schema using input property map
protected  Map<String,Object> parseJsonString(String jsonString)
          Build a property map from a JSON string.
protected  Map<String,Object> parseStringList(String[] parts)
          Build a property map from a string list.
 void prepareToRead(org.apache.hadoop.mapreduce.RecordReader reader, PigSplit split)
          Initializes LoadFunc for reading data.
 void prepareToWrite(org.apache.hadoop.mapreduce.RecordWriter writer)
          Initialize StoreFuncInterface to write data.
 void putNext(Tuple t)
          Write a tuple to the data store.
 String relToAbsPathForStoreLocation(String location, org.apache.hadoop.fs.Path curDir)
          This method is called by the Pig runtime in the front end to convert the output location to an absolute path if the location is relative.
 void setLocation(String location, org.apache.hadoop.mapreduce.Job job)
          Set input location and obtain input schema.
 void setPartitionFilter(Expression partitionFilter)
          Set the filter for partitioning.
 void setStoreFuncUDFContextSignature(String signature)
          This method will be called by Pig both in the front end and back end to pass a unique signature to the StoreFuncInterface which it can use to store information in the UDFContext which it needs to store between various method invocations in the front end and back end.
 void setStoreLocation(String location, org.apache.hadoop.mapreduce.Job job)
          Communicate to the storer the location where the data needs to be stored.
 
Methods inherited from class org.apache.pig.FileInputLoadFunc
getSplitComparable
 
Methods inherited from class org.apache.pig.LoadFunc
getAbsolutePath, getLoadCaster, getPathStrings, join, relativeToAbsolutePath, setUDFContextSignature, warn
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

AvroStorage

public AvroStorage()
Empty constructor. The output schema is derived from the Pig schema.


AvroStorage

public AvroStorage(String[] parts)
            throws IOException,
                   org.json.simple.parser.ParseException
Constructor that takes a list of quoted option strings.

Parameters:
parts - list of quoted option strings
Throws:
IOException
org.json.simple.parser.ParseException
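
A minimal construction sketch; the option string "no_schema_check" is taken from the Piggybank documentation and should be treated as an assumption rather than something defined on this page.

import java.io.IOException;
import org.apache.pig.piggybank.storage.avro.AvroStorage;
import org.json.simple.parser.ParseException;

public class AvroStorageCtorExample {
    public static AvroStorage create() {
        try {
            // Each array element is one quoted option string from the Pig script,
            // e.g. USING AvroStorage('no_schema_check'); the option name here is
            // an assumption taken from the Piggybank documentation.
            return new AvroStorage(new String[] { "no_schema_check" });
        } catch (IOException e) {
            throw new IllegalArgumentException("Bad AvroStorage options", e);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Bad AvroStorage options", e);
        }
    }
}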
Method Detail

setLocation

public void setLocation(String location,
                        org.apache.hadoop.mapreduce.Job job)
                 throws IOException
Set the input location and obtain the input schema. FIXME: currently we assume all Avro files under the same "location" share the same schema, and an exception is thrown if they do not.

Specified by:
setLocation in class LoadFunc
Parameters:
location - Location as returned by LoadFunc.relativeToAbsolutePath(String, Path)
job - the Job object store or retrieve earlier stored information from the UDFContext
Throws:
IOException - if the location is not valid.

getAvroSchema

protected org.apache.avro.Schema getAvroSchema(String location,
                                               org.apache.hadoop.mapreduce.Job job)
                                        throws IOException
Throws:
IOException

getAvroSchema

protected org.apache.avro.Schema getAvroSchema(org.apache.hadoop.fs.Path path,
                                               org.apache.hadoop.fs.FileSystem fs)
                                        throws IOException
Get the Avro schema of the input path. There are three cases:
1. if path is a file, return its Avro schema;
2. if path is a first-level directory (no sub-directories), return the Avro schema of one underlying file;
3. if path contains sub-directories, recursively check whether all of them share the same schema, and return it if so or throw an exception if not.

Parameters:
path - input path
fs - file system
Returns:
avro schema of data
Throws:
IOException - if underlying sub-directories do not share the same schema; or if input path is empty or does not exist
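
A simplified sketch of that three-case resolution, for illustration only; it checks every file rather than sampling one per leaf directory, and it assumes the standard Avro and Hadoop file APIs rather than reflecting the class's actual implementation.

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SchemaProbe {

    /** Case 1: a single Avro data file -- read the schema embedded in its header. */
    static Schema schemaOfFile(Path file, FileSystem fs) throws IOException {
        DataFileStream<GenericRecord> stream = new DataFileStream<GenericRecord>(
                fs.open(file), new GenericDatumReader<GenericRecord>());
        try {
            return stream.getSchema();
        } finally {
            stream.close();
        }
    }

    /** Cases 2 and 3: a directory -- require all contained files and sub-directories to agree. */
    static Schema schemaOfPath(Path path, FileSystem fs) throws IOException {
        if (fs.isFile(path)) {
            return schemaOfFile(path, fs);
        }
        Schema found = null;
        for (FileStatus child : fs.listStatus(path)) {
            Schema s = schemaOfPath(child.getPath(), fs);  // recurse into sub-directories
            if (found == null) {
                found = s;
            } else if (!found.equals(s)) {
                throw new IOException("Input files do not share the same Avro schema: " + path);
            }
        }
        if (found == null) {
            throw new IOException("Empty input path: " + path);
        }
        return found;
    }
}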

getSchema

protected org.apache.avro.Schema getSchema(org.apache.hadoop.fs.Path path,
                                           org.apache.hadoop.fs.FileSystem fs)
                                    throws IOException
This method is called by getAvroSchema(java.lang.String, org.apache.hadoop.mapreduce.Job). The default implementation returns the schema of an Avro data file, or the schema of the last file in a first-level directory (one that does not contain sub-directories).

Parameters:
path - path of a file or first level directory
fs - file system
Returns:
avro schema
Throws:
IOException

getSchemaFromFile

protected org.apache.avro.Schema getSchemaFromFile(org.apache.hadoop.fs.Path path,
                                                   org.apache.hadoop.fs.FileSystem fs)
                                            throws IOException
This method is called to return the schema from an Avro schema file. It differs from getSchema(org.apache.hadoop.fs.Path, org.apache.hadoop.fs.FileSystem), which returns the schema from a data file.

Parameters:
path - path of an Avro schema file
fs - file system
Returns:
avro schema
Throws:
IOException
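
A minimal sketch of reading such a standalone schema file (as opposed to a data file), assuming Avro's Schema.Parser is available (Avro 1.5+); not necessarily what this method does internally.

import java.io.IOException;
import java.io.InputStream;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SchemaFileProbe {
    /** Parse a JSON .avsc schema file rather than the header of an Avro data file. */
    static Schema readSchemaFile(Path avscFile, FileSystem fs) throws IOException {
        InputStream in = fs.open(avscFile);
        try {
            return new Schema.Parser().parse(in);
        } finally {
            in.close();
        }
    }
}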

getInputFormat

public org.apache.hadoop.mapreduce.InputFormat getInputFormat()
                                                       throws IOException
Description copied from class: LoadFunc
This will be called during planning on the front end. This is the instance of InputFormat (rather than the class name) because the load function may need to instantiate the InputFormat in order to control how it is constructed.

Specified by:
getInputFormat in class LoadFunc
Returns:
the InputFormat associated with this loader.
Throws:
IOException - if there is an exception during InputFormat construction

prepareToRead

public void prepareToRead(org.apache.hadoop.mapreduce.RecordReader reader,
                          PigSplit split)
                   throws IOException
Description copied from class: LoadFunc
Initializes LoadFunc for reading data. This will be called during execution before any calls to getNext. The RecordReader needs to be passed here because it has been instantiated for a particular InputSplit.

Specified by:
prepareToRead in class LoadFunc
Parameters:
reader - RecordReader to be used by this instance of the LoadFunc
split - The input PigSplit to process
Throws:
IOException - if there is an exception during initialization

getNext

public Tuple getNext()
              throws IOException
Description copied from class: LoadFunc
Retrieves the next tuple to be processed. Implementations should NOT reuse tuple objects (or inner member objects) they return across calls and should return a different tuple object in each call.

Specified by:
getNext in class LoadFunc
Returns:
the next tuple to be processed or null if there are no more tuples to be processed.
Throws:
IOException - if there is an exception while retrieving the next tuple

getSchema

public ResourceSchema getSchema(String location,
                                org.apache.hadoop.mapreduce.Job job)
                         throws IOException
Get avro schema from "location" and return the converted PigSchema.

Specified by:
getSchema in interface LoadMetadata
Parameters:
location - Location as returned by LoadFunc.relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)
job - The Job object - this should be used only to obtain cluster properties through JobContext.getConfiguration() and not to set/query any runtime job information.
Returns:
schema for the data to be loaded. This schema should represent all tuples of the returned data. If the schema is unknown or it is not possible to return a schema that represents all returned data, then null should be returned. The schema should not be affected by pushProjection, i.e. getSchema should always return the original schema even after pushProjection.
Throws:
IOException - if an exception occurs while determining the schema

getStatistics

public ResourceStatistics getStatistics(String location,
                                        org.apache.hadoop.mapreduce.Job job)
                                 throws IOException
Description copied from interface: LoadMetadata
Get statistics about the data to be loaded. If no statistics are available, then null should be returned.

Specified by:
getStatistics in interface LoadMetadata
Parameters:
location - Location as returned by LoadFunc.relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)
job - The Job object - this should be used only to obtain cluster properties through JobContext.getConfiguration() and not to set/query any runtime job information.
Returns:
statistics about the data to be loaded. If no statistics are available, then null should be returned.
Throws:
IOException - if an exception occurs while retrieving statistics

getPartitionKeys

public String[] getPartitionKeys(String location,
                                 org.apache.hadoop.mapreduce.Job job)
                          throws IOException
Description copied from interface: LoadMetadata
Find what columns are partition keys for this input.

Specified by:
getPartitionKeys in interface LoadMetadata
Parameters:
location - Location as returned by LoadFunc.relativeToAbsolutePath(String, org.apache.hadoop.fs.Path)
job - The Job object - this should be used only to obtain cluster properties through JobContext.getConfiguration() and not to set/query any runtime job information.
Returns:
array of field names of the partition keys. Implementations should return null to indicate that there are no partition keys
Throws:
IOException - if an exception occurs while retrieving partition keys

setPartitionFilter

public void setPartitionFilter(Expression partitionFilter)
                        throws IOException
Description copied from interface: LoadMetadata
Set the filter for partitioning. It is assumed that this filter will only contain references to fields given as partition keys in getPartitionKeys. So if the implementation returns null in LoadMetadata.getPartitionKeys(String, Job), then this method is not called by Pig runtime. This method is also not called by the Pig runtime if there are no partition filter conditions.

Specified by:
setPartitionFilter in interface LoadMetadata
Parameters:
partitionFilter - that describes filter for partitioning
Throws:
IOException - if the filter is not compatible with the storage mechanism or contains non-partition fields.

parseJsonString

protected Map<String,Object> parseJsonString(String jsonString)
                                      throws org.json.simple.parser.ParseException
Build a property map from a JSON string.

Parameters:
jsonString - JSON object in string format
Returns:
a property map
Throws:
org.json.simple.parser.ParseException
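
To illustrate the kind of map this builds, here is a standalone use of the json-simple parser the class relies on; the key names in the sample string are hypothetical, not AvroStorage's actual parameter names.

import java.util.Map;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

public class JsonParamExample {
    @SuppressWarnings("unchecked")
    static Map<String, Object> parse(String jsonString) throws ParseException {
        // JSONObject is a Map, so the parsed object can be used directly as a
        // property map. The keys in the sample below are hypothetical.
        return (JSONObject) new JSONParser().parse(jsonString);
    }

    public static void main(String[] args) throws ParseException {
        Map<String, Object> props = parse("{\"debug\": 1, \"nullable\": true}");
        System.out.println(props);
    }
}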

parseStringList

protected Map<String,Object> parseStringList(String[] parts)
                                      throws IOException
Build a property map from a string list.

Parameters:
parts - input string list
Returns:
a property map
Throws:
IOException

init

protected void init(Map<String,Object> inputs)
             throws IOException
Initialize the output Avro schema using the input property map.

Throws:
IOException

relToAbsPathForStoreLocation

public String relToAbsPathForStoreLocation(String location,
                                           org.apache.hadoop.fs.Path curDir)
                                    throws IOException
Description copied from interface: StoreFuncInterface
This method is called by the Pig runtime in the front end to convert the output location to an absolute path if the location is relative. The StoreFuncInterface implementation is free to choose how it converts a relative location to an absolute location since this may depend on what the location string represents (hdfs path or some other data source). The static method LoadFunc.getAbsolutePath(java.lang.String, org.apache.hadoop.fs.Path) provides a default implementation for hdfs and hadoop local file system and it can be used to implement this method.

Specified by:
relToAbsPathForStoreLocation in interface StoreFuncInterface
Parameters:
location - location as provided in the "store" statement of the script
curDir - the current working directory based on any "cd" statements in the script before the "store" statement. If there are no "cd" statements in the script, this would be the home directory, /user/<username>
Returns:
the absolute location based on the arguments passed
Throws:
IOException - if the conversion is not possible
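
As the description notes, LoadFunc.getAbsolutePath can serve as the default conversion for HDFS and the local file system; the sketch below simply delegates to it and is offered as an assumption about a typical implementation, not AvroStorage's exact code.

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.pig.LoadFunc;

public class StoreLocationHelper {
    /** Delegates to the default HDFS/local-FS path resolution mentioned above. */
    public static String relToAbs(String location, Path curDir) throws IOException {
        return LoadFunc.getAbsolutePath(location, curDir);
    }
}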

setStoreLocation

public void setStoreLocation(String location,
                             org.apache.hadoop.mapreduce.Job job)
                      throws IOException
Description copied from interface: StoreFuncInterface
Communicate to the storer the location where the data needs to be stored. The location string passed to the StoreFuncInterface here is the return value of StoreFuncInterface.relToAbsPathForStoreLocation(String, Path). This method will be called in the frontend and backend multiple times. Implementations should bear in mind that this method is called multiple times and should ensure there are no inconsistent side effects due to the multiple calls. StoreFuncInterface.checkSchema(ResourceSchema) will be called before any call to StoreFuncInterface.setStoreLocation(String, Job).

Specified by:
setStoreLocation in interface StoreFuncInterface
Parameters:
location - Location returned by StoreFuncInterface.relToAbsPathForStoreLocation(String, Path)
job - The Job object
Throws:
IOException - if the location is not valid.
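
For an HDFS-backed storer, a common pattern is simply to record the output path on the job configuration, which is safe to repeat across the multiple calls described above; a sketch, not necessarily AvroStorage's exact behavior.

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class StoreLocationSketch {
    /** Setting the output path is idempotent across the repeated front-end/back-end calls. */
    static void setStoreLocation(String location, Job job) throws IOException {
        FileOutputFormat.setOutputPath(job, new Path(location));
    }
}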

checkSchema

public void checkSchema(ResourceSchema s)
                 throws IOException
Append the newly specified schema.

Specified by:
checkSchema in interface StoreFuncInterface
Parameters:
s - to be checked
Throws:
IOException - if this schema is not acceptable. It should include a detailed error message indicating what is wrong with the schema.

getOutputFormat

public org.apache.hadoop.mapreduce.OutputFormat getOutputFormat()
                                                         throws IOException
Description copied from interface: StoreFuncInterface
Return the OutputFormat associated with StoreFuncInterface. This will be called on the front end during planning and on the backend during execution.

Specified by:
getOutputFormat in interface StoreFuncInterface
Returns:
the OutputFormat associated with StoreFuncInterface
Throws:
IOException - if an exception occurs while constructing the OutputFormat

prepareToWrite

public void prepareToWrite(org.apache.hadoop.mapreduce.RecordWriter writer)
                    throws IOException
Description copied from interface: StoreFuncInterface
Initialize StoreFuncInterface to write data. This will be called during execution before the call to putNext.

Specified by:
prepareToWrite in interface StoreFuncInterface
Parameters:
writer - RecordWriter to use.
Throws:
IOException - if an exception occurs during initialization

setStoreFuncUDFContextSignature

public void setStoreFuncUDFContextSignature(String signature)
Description copied from interface: StoreFuncInterface
This method will be called by Pig both in the front end and back end to pass a unique signature to the StoreFuncInterface which it can use to store information in the UDFContext which it needs to store between various method invocations in the front end and back end. This is necessary because in a Pig Latin script with multiple stores, the different instances of store functions need to be able to find their (and only their) data in the UDFContext object.

Specified by:
setStoreFuncUDFContextSignature in interface StoreFuncInterface
Parameters:
signature - a unique signature to identify this StoreFuncInterface

cleanupOnFailure

public void cleanupOnFailure(String location,
                             org.apache.hadoop.mapreduce.Job job)
                      throws IOException
Description copied from interface: StoreFuncInterface
This method will be called by Pig if the job which contains this store fails. Implementations can clean up output locations in this method to ensure that no incorrect/incomplete results are left in the output location.

Specified by:
cleanupOnFailure in interface StoreFuncInterface
Parameters:
location - Location returned by StoreFuncInterface.relToAbsPathForStoreLocation(String, Path)
job - The Job object - this should be used only to obtain cluster properties through JobContext.getConfiguration() and not to set/query any runtime job information.
Throws:
IOException

putNext

public void putNext(Tuple t)
             throws IOException
Description copied from interface: StoreFuncInterface
Write a tuple to the data store.

Specified by:
putNext in interface StoreFuncInterface
Parameters:
t - the tuple to store.
Throws:
IOException - if an exception occurs during the write


Copyright © 2007-2012 The Apache Software Foundation