2.6 The Java MapReduce Library

johndmulhausen edited this page Mar 31, 2015 · 3 revisions

This page describes the major classes in the MapReduce library.

Javadocs

The Javadocs for the MapReduce Library are available at the Maven.org Central Repository's entry for MapReduce.

The Input class

When you define a job, you specify an Input class that reads a particular type of input and returns records to the map stage. The MapReduce library contains several Input classes that handle data from different types of sources.

Each type of Input class is paired with a corresponding subclass of InputReader. The MapReduce library calls the Input class method createReaders(), which determines how many map shards are needed (the shard count) and creates an instance of InputReader for each shard. Each reader is initialized to process the non-overlapping subset of data from its shard. An Input class may explicitly set the shard count, or it can compute the count by analyzing the input data.

The available Input classes can be found in the MapReduce library's inputs package. Some of the most often-used classes are:

  • BlobstoreInput - Reads an object from your app’s blobstore and splits it into records (byte arrays) delimited by a single-byte separator. You declare the shard count, and the input is divided into equal-sized shards. A byte array is passed to the Mapper. (A similar class, GoogleCloudStorageLineInput, shards files in Google Cloud Storage on separator boundaries.)
  • DatastoreInput - Uses a datastore query to select entities from your app's datastore for processing. The shard count is given by the user. An Entity object is passed to the Mapper.
  • ConsecutiveLongInput - Generates a specified number of consecutive long numbers with each shard using a different starting offset to prevent duplicates. A Long object is passed to the Mapper. This is useful in testing applications.
  • RandomLongInput - Generates a specified number of random long values across the specified number of shards. A Long object is passed to the Mapper. This is useful in testing applications; specifying the random seed makes a job reproducible, which helps with debugging.

Writing your own Input class

If the existing Input classes don't serve your purpose, you can write your own, along with an associated InputReader class, to read and parse other types of input.

Subclassing Input

The Input class must define the createReaders() method:

List<? extends InputReader<I>> createReaders() throws IOException;

This method determines how many map shards are needed and creates and initializes an instance of InputReader for each shard. The number of readers may be explicitly set by the user, or the method could analyze the input and determine how many readers are needed to partition the data efficiently.
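As a rough illustration of the partitioning step, the sketch below splits a range of long values into contiguous, non-overlapping shards of nearly equal size, which is the kind of arithmetic a createReaders() implementation might perform before wrapping each sub-range in a reader. ShardPlanner and Range are hypothetical names introduced for this example; they are not part of the MapReduce library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the MapReduce library: it only shows the
// partitioning arithmetic a createReaders() implementation might perform.
final class ShardPlanner {

  /** Half-open range [start, end) assigned to one shard. */
  static final class Range {
    final long start;
    final long end;

    Range(long start, long end) {
      this.start = start;
      this.end = end;
    }
  }

  /** Splits [start, end) into shardCount contiguous, non-overlapping ranges. */
  static List<Range> split(long start, long end, int shardCount) {
    if (shardCount <= 0 || end < start) {
      throw new IllegalArgumentException("invalid range or shard count");
    }
    long total = end - start;
    long base = total / shardCount;      // minimum number of records per shard
    long remainder = total % shardCount; // the first `remainder` shards get one extra
    List<Range> ranges = new ArrayList<>();
    long cursor = start;
    for (int i = 0; i < shardCount; i++) {
      long size = base + (i < remainder ? 1 : 0);
      ranges.add(new Range(cursor, cursor + size));
      cursor += size;
    }
    return ranges;
  }
}
```

Splitting [0, 10) into three shards this way yields [0, 4), [4, 7), and [7, 10). A real Input would create one InputReader per range instead of returning the ranges themselves.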

Subclassing InputReader

The InputReader class implements the Serializable interface, so make sure that every field of your subclass is either serializable or transient. The class must implement the next() method:

public I next() throws IOException, NoSuchElementException;

This method behaves like Iterator.next(): the library calls it repeatedly, and it returns a new record each time until the data source is exhausted, at which point it throws NoSuchElementException.
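The contract can be sketched with a small stand-in reader. RangeReader below is hypothetical and does not extend the library's InputReader; it only mirrors the next() behavior described above, along with a loop that drains it the way a caller would.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for an InputReader subclass; it does not extend the
// library class, it only mirrors the next() contract described above.
final class RangeReader implements Serializable {
  private static final long serialVersionUID = 1L;

  private final long end; // exclusive upper bound of this shard
  private long current;   // next value to return; kept when serialized

  RangeReader(long start, long end) {
    this.current = start;
    this.end = end;
  }

  /** Returns the next record, or throws once the shard is exhausted. */
  Long next() {
    if (current >= end) {
      throw new NoSuchElementException();
    }
    return current++;
  }

  /** Drains a reader by calling next() until it signals exhaustion. */
  static List<Long> readAll(RangeReader reader) {
    List<Long> records = new ArrayList<>();
    try {
      while (true) {
        records.add(reader.next());
      }
    } catch (NoSuchElementException exhausted) {
      // Normal termination: the data source has no more records.
    }
    return records;
  }
}
```

Because the read position is an ordinary serializable field, a reader like this can resume where it left off after being serialized and restored.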

A subclass of InputReader may also override these methods; see the class reference for more details:

public Double getProgress();
public void beginSlice() throws IOException;
public void endSlice() throws IOException;
public void beginShard() throws IOException;
public void endShard() throws IOException;
public long estimateMemoryRequirement();

To see an example of an Input implementation, take a look at the source code for ConsecutiveLongInput. This file defines the ConsecutiveLongInput class and its InputReader class, Reader, which is nested.

The Mapping classes

The library provides two mapper classes, one for each type of job: Mapper for MapReduce jobs and MapOnlyMapper for Map jobs.

The Mapper class

The map stage of a MapReduce job must implement the Mapper class, which defines the map() method:

public abstract void map(I value);

The library calls map() once for each record produced by an InputReader. The method can call Mapper.emit() to emit zero or more key-value pairs. The size of each key-value pair must be less than 1 MB:

protected void emit(K key, V value);
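To make the map()/emit() relationship concrete, here is a minimal word-count sketch. SketchMapper is a hypothetical stand-in for the library's Mapper class, reduced to map() and emit() so that the example compiles without the MapReduce library; a real mapper would extend Mapper instead.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for the library's Mapper class, reduced to map() and
// emit() so the example compiles without the MapReduce library.
abstract class SketchMapper<I, K, V> {
  private final List<Map.Entry<K, V>> emitted = new ArrayList<>();

  abstract void map(I value);

  /** Collects the pair in memory; the real library routes it to the shuffle. */
  protected void emit(K key, V value) {
    emitted.add(new SimpleEntry<>(key, value));
  }

  List<Map.Entry<K, V>> emitted() {
    return emitted;
  }
}

// Emits one (word, 1) pair per word; a blank line emits nothing.
final class WordCountMapper extends SketchMapper<String, String, Long> {
  @Override
  void map(String line) {
    for (String word : line.trim().split("\\s+")) {
      if (!word.isEmpty()) {
        emit(word.toLowerCase(Locale.ROOT), 1L);
      }
    }
  }
}
```

The mapper does no counting itself. It emits small pairs and leaves the grouping by key to the later stages, which keeps map() simple and small.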

The MapOnlyMapper class

The map stage of a Map job must implement the MapOnlyMapper class, which defines the map() method:

public abstract void map(I value);

The library calls map() once for each record produced by an InputReader. The method can call MapOnlyMapper.emit() to emit zero or more values. You must be sure that the size of the emitted value is valid for the job's output type:

protected void emit(O output);

Note: you may use an instance of the Mapper class in a Map job instead, provided its key type is Void. You can then pass the Mapper instance to the MapSpecification.Builder.setMapper() method.

A map() method should be simple and small. If you need to do a lot of work related to one input item, try re-thinking what the input should be. It is usually possible to formulate a job so the library does the hard work.

Both types of mappers handle input data in zero or more shards; each shard is subdivided into zero or more slices. A mapper may be serialized and deserialized before processing its first shard (or first slice in a shard), after processing its last shard (or last slice in a shard), or between any consecutive shards (or slices). To make it easy to initialize, save, and restore state, a mapper has these methods, which can be overridden:

public void beginShard();
public void endShard();
public void beginSlice();
public void endSlice();
public long estimateMemoryRequirement();

Since a Mapper may be serialized many times, avoid holding references to large objects or to any data that you can easily reconstruct in the beginSlice() method. If you use an IoC framework to inject members into your mapper, make sure the framework works with MapReduce serialization. Otherwise, injected members may be lost when the class is serialized between slices.
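One way to follow the serialization advice above is to mark rebuildable state as transient and recreate it in beginSlice(). The sketch below uses plain Java serialization to imitate what may happen between slices. StopWordFilter is a hypothetical class, not a real Mapper subclass, and roundTrip() exists only to demonstrate the effect.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

// Hypothetical worker, not a real Mapper subclass: it shows a transient
// field that Java serialization skips and beginSlice() rebuilds.
final class StopWordFilter implements Serializable {
  private static final long serialVersionUID = 1L;

  private long kept;                       // small state worth serializing
  private transient Set<String> stopWords; // cheap to rebuild, so not saved

  void beginSlice() {
    stopWords = new HashSet<>();
    stopWords.add("the");
    stopWords.add("a");
  }

  void map(String word) {
    if (!stopWords.contains(word)) {
      kept++;
    }
  }

  long kept() {
    return kept;
  }

  boolean hasStopWords() {
    return stopWords != null;
  }

  /** Serializes and deserializes the worker, as may happen between slices. */
  static StopWordFilter roundTrip(StopWordFilter worker) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(worker);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (StopWordFilter) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

After a round trip the counter survives but the stop-word set is null until beginSlice() runs again, so the serialized form stays small.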

The Reducer class

MapReduce jobs must provide an implementation of the Reducer class, which defines the reduce() method:

public abstract void reduce(K key, ReducerInput<V> values);

The library calls reduce() once for each unique key. The values argument contains all the values produced by every Mapper instance for the same key. The method evaluates all the values and generates the final output for the key. Note that values is a ReducerInput, which is an Iterator, so the values can be traversed only once. The reduce() method calls Reducer.emit() to emit the final output for the key:

protected void emit(O value);
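A reduce() method typically makes a single pass over the values and emits one result for the key. The sketch below shows that shape with a hypothetical SketchReducer stand-in. The values arrive as a plain java.util.Iterator rather than a ReducerInput, and an emit() that takes one output value is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the library's Reducer class. The values arrive
// as a plain Iterator here, and emit() taking one output value is an
// assumption of this sketch.
abstract class SketchReducer<K, V, O> {
  private final List<O> emitted = new ArrayList<>();

  abstract void reduce(K key, Iterator<V> values);

  /** Collects the output in memory; the real library hands it to the Output. */
  protected void emit(O value) {
    emitted.add(value);
  }

  List<O> emitted() {
    return emitted;
  }
}

// Sums the counts for one key and emits a single "key=total" line.
final class SumReducer extends SketchReducer<String, Long, String> {
  @Override
  void reduce(String key, Iterator<Long> values) {
    long total = 0;
    while (values.hasNext()) { // single pass; an Iterator cannot be rewound
      total += values.next();
    }
    emit(key + "=" + total);
  }
}
```

Only the running total is held in memory, so the reducer stays small even when a key has many values.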

Like the Mapper class, Reducer handles its input in zero or more shards; each shard is subdivided into zero or more slices. A Reducer may be serialized and deserialized before processing its first shard (or first slice in a shard), after processing its last shard (or last slice in a shard), or between any consecutive shards (or slices). To make it easy to initialize, save, and restore state, Reducer has these methods, which can be overridden:

public void beginShard();
public void endShard();
public void beginSlice();
public void endSlice();
public long estimateMemoryRequirement();

As with map(), the reduce() method should also be simple and small. If you need to do a lot of work related to one output item, try re-thinking what the output should be. Let the library do the hard work.

Since a Reducer may be serialized many times, avoid holding references to large objects or to any data that you can easily reconstruct in the beginSlice() method. If you use an IoC framework to inject members into your reducer, make sure the framework works with MapReduce serialization. Otherwise, injected members may be lost when the class is serialized between slices.

The MapReduce library contains a few Reducer classes; they can be found in the MapReduce library's reducers package.

The Output class

When you define a job, you specify an Output class that takes the data emitted by the final stage of the job (the map stage of a Map job, or the reduce stage of a MapReduce job) and writes it to a specific type of destination.

The MapReduce library contains several Output classes that write to different types of output destinations. Each type of Output class is paired with a corresponding subclass of OutputWriter. The MapReduce library calls the Output class method createWriters(numShards) with the number of shards requested in the MapReduceSpecification.

The available Output classes can be found in the MapReduce library's outputs package. Some of the most often-used classes are:

  • GoogleCloudStorageFileOutput - Writes the data emitted by reduce to a GCS file. There is no delimiter inserted between records. This allows the reducer to define its own format.
  • BlobFileOutput - Produces a Blobstore file for each reducer shard. Each file name is generated using a given format string that includes a zero-based integer parameter that is replaced with the shard number. The user specifies the number of shards.
  • GoogleCloudStorageLevelDbOutput - Writes the data emitted by reduce to a GCS file. Each item is written as a single record in LevelDb format. This is useful for chaining jobs.
  • MarshallingOutput - This can be used in conjunction with either of the above. It allows the reducer to write any type of record for which a Marshaller can be provided, which greatly simplifies coding.
  • InMemoryOutput - Used to return an arbitrary Java object in memory. This is normally used for testing.
  • NoOutput - Useful for Map jobs that do not need to produce any output.
  • DatastoreOutput - Writes Entity objects to the datastore.

Writing your own Output class

If the existing Output classes don't serve your purpose, you can write your own, along with its associated OutputWriter class.

Subclassing Output

The Output class must define these methods:

List<? extends OutputWriter<O>> createWriters(int numShards);
R finish(Collection<? extends OutputWriter<O>> writers) throws IOException;

createWriters(numShards) creates and initializes an instance of OutputWriter for each shard.

The finish() method returns the job's result (of type R), or null. In many cases it returns a pointer to the data written by the Output class.

Subclassing OutputWriter

The OutputWriter class must define the method write(), which writes a value to the output.
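The relationship between createWriters(), write(), and finish() can be sketched with an in-memory pair of classes. ListOutput and ListWriter are hypothetical stand-ins that mirror the method shapes above without extending the library types. Here finish() simply returns each shard's records as the job result.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-ins for an Output/OutputWriter pair that keep records
// in memory; they mirror the method shapes above but not the library types.
final class ListWriter<O> {
  private final List<O> records = new ArrayList<>();

  /** Appends one value to this shard's output. */
  void write(O value) {
    records.add(value);
  }

  List<O> records() {
    return records;
  }
}

final class ListOutput<O> {
  /** Creates one writer per shard. */
  List<ListWriter<O>> createWriters(int numShards) {
    List<ListWriter<O>> writers = new ArrayList<>();
    for (int i = 0; i < numShards; i++) {
      writers.add(new ListWriter<O>());
    }
    return writers;
  }

  /** Builds the job result: each shard's records, in writer order. */
  List<List<O>> finish(Collection<ListWriter<O>> writers) {
    List<List<O>> result = new ArrayList<>();
    for (ListWriter<O> writer : writers) {
      result.add(writer.records());
    }
    return result;
  }
}
```

An Output that writes to external storage would more likely return a pointer to the written data, such as a list of file names, from finish().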

A subclass of OutputWriter may also override these methods; see the class reference for more details:

public void beginSlice() throws IOException;
public void endSlice() throws IOException;
public void beginShard() throws IOException;
public void endShard() throws IOException;
public long estimateMemoryRequirement();
public boolean allowSliceRetry();

To see an example of an Output implementation, take a look at the source code for DatastoreOutput. This file defines the DatastoreOutput class and the DatastoreOutputWriter class, which is nested.

The Marshaller class

After mapping, each key-value pair goes into the shuffle stage and then into a Reducer. The data is stored temporarily between stages, so keys and values must be serialized and deserialized by marshallers. You must provide the MapReduceSpecification with one marshaller for keys and one for values; you may use the same Marshaller for both.

The MapReduce library provides several marshallers for you to use. They are private classes, so you obtain them by calling a getXXXMarshaller() method in the Marshallers class:

  • StringMarshaller - Encodes Java strings using UTF-8.
  • SerializationMarshaller - Handles any Java object that implements the Serializable interface.
  • LongMarshaller - An optimized marshaller for long values. Unlike SerializationMarshaller, which uses Java’s built-in serialization, LongMarshaller produces a more compact representation (since it knows every entry is a long). This marshaller does not accept null values.
  • IntegerMarshaller - Like LongMarshaller, but for integers.
  • ByteBufferMarshaller - A marshaller for ByteBuffers.

If SerializationMarshaller is not efficient enough for your types, you can write your own marshaller. It must implement the Marshaller interface, which requires two methods:

public ByteBuffer toBytes(T object);
public T fromBytes(ByteBuffer b) throws IOException;
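As an illustration, the sketch below hand-encodes a small value type as a fixed-width long followed by UTF-8 bytes. SketchMarshaller is a local copy of the two-method shape shown above, declared only so that the example compiles on its own, and its fromBytes() omits the checked IOException for brevity. WordCount is a hypothetical value type.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Local copy of the two-method shape shown above, declared here only so the
// sketch compiles on its own; a real marshaller would use the library's
// Marshaller type. fromBytes() omits the checked IOException for brevity.
interface SketchMarshaller<T> {
  ByteBuffer toBytes(T object);

  T fromBytes(ByteBuffer b);
}

// Hypothetical value type used only for this example.
final class WordCount {
  final String word;
  final long count;

  WordCount(String word, long count) {
    this.word = word;
    this.count = count;
  }
}

// Encodes a WordCount as an 8-byte count followed by the UTF-8 word bytes.
final class WordCountMarshaller implements SketchMarshaller<WordCount> {
  @Override
  public ByteBuffer toBytes(WordCount wc) {
    byte[] word = wc.word.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + word.length);
    buf.putLong(wc.count);
    buf.put(word);
    buf.flip(); // switch the buffer from writing to reading
    return buf;
  }

  @Override
  public WordCount fromBytes(ByteBuffer b) {
    long count = b.getLong();
    byte[] word = new byte[b.remaining()]; // everything after the count
    b.get(word);
    return new WordCount(new String(word, StandardCharsets.UTF_8), count);
  }
}
```

Putting the fixed-width field first means the variable-length word needs no length prefix, because it occupies the rest of the buffer.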

The Counter class

You may find it convenient to use the Counter class. A counter is an integer variable that is aggregated across multiple shards. Counters can be used to do statistical calculations. You acquire a counter by calling getCounter() on the context from the map() or reduce() function. For example:

Counter c = getContext().getCounter("name");

This retrieves an existing counter or creates one if it doesn't already exist.
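The get-or-create lookup and the aggregation across shards can be pictured with a small model. SketchCounter and SketchCounters are hypothetical stand-ins, not the library's Counter API, and the way the library actually merges shard counters may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins, not the library's Counter API: they illustrate
// get-or-create lookup by name and summing per-shard values.
final class SketchCounter {
  private long value;

  void increment(long delta) {
    value += delta;
  }

  long getValue() {
    return value;
  }
}

final class SketchCounters {
  private final Map<String, SketchCounter> byName = new LinkedHashMap<>();

  /** Returns the counter with this name, creating it on first use. */
  SketchCounter getCounter(String name) {
    return byName.computeIfAbsent(name, n -> new SketchCounter());
  }

  /** Folds another shard's counters into this one by adding their values. */
  void addAll(SketchCounters other) {
    for (Map.Entry<String, SketchCounter> e : other.byName.entrySet()) {
      getCounter(e.getKey()).increment(e.getValue().getValue());
    }
  }
}
```

If one shard counts 5 lines and another counts 4, the aggregated "lines" counter reads 9, while a counter that was never incremented reads 0.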

Size limits

The total size of all the instances of Mapper, InputReader, OutputWriter, and Counter must be less than 1 MB between slices, because these instances are serialized and saved to the datastore between slices.
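A unit test can give an early warning about this limit by measuring how many bytes standard Java serialization needs for your objects. SerializedSize below is a hypothetical helper. The library's own accounting may differ from plain Java serialization, so treat the result as an estimate rather than an exact check.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical test helper: measures how many bytes standard Java
// serialization needs for an object. The library's exact accounting may
// differ, so treat the result as an estimate.
final class SerializedSize {
  static final int ONE_MB = 1024 * 1024;

  /** Returns the number of bytes produced by serializing the object. */
  static int of(Serializable object) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(object);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.size();
  }

  /** True when the combined serialized size stays under 1 MB. */
  static boolean fitsInOneMb(Serializable... objects) {
    long total = 0;
    for (Serializable object : objects) {
      total += of(object);
    }
    return total < ONE_MB;
  }
}
```

Passing your mapper, reader, and writer instances to fitsInOneMb() after a representative slice shows whether their state is growing toward the limit.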