
3.1 Creating and Running a Job


This document presents an overview of the MapReduce Python API. It consists of the following sections:

  • Including the MapReduce library
  • Instantiating a MapReduce pipeline
  • Starting a MapReduce job
  • Showing the MapReduce status monitor
  • Determining when a MapreducePipeline job is complete

Including the MapReduce library

To enable the MapReduce framework in your app, add the following include to your app.yaml configuration file:

includes:
- mapreduce/include.yaml
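
For context, a complete app.yaml might look like the sketch below. The application ID and handler script are hypothetical; only the includes entry is required by the MapReduce library:

application: wordcount-demo   # hypothetical application ID
runtime: python27
api_version: 1
threadsafe: true

handlers:
- url: /.*
  script: main.app            # hypothetical WSGI app object

includes:
- mapreduce/include.yaml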

Instantiating a MapReduce pipeline

In your code, you instantiate a MapreducePipeline object inside the run method of a PipelineBase subclass, as follows:

class WordCountPipeline(base_handler.PipelineBase):

    def run(self, filekey, blobkey):
        logging.debug("filename is %s" % filekey)
        output = yield mapreduce_pipeline.MapreducePipeline(
            "word_count",                     # job name, shown in the UI and logs
            "main.word_count_map",            # mapper function
            "main.word_count_reduce",         # reducer function
            "mapreduce.input_readers.BlobstoreZipInputReader",   # input reader
            "mapreduce.output_writers.FileOutputWriter",         # output writer
            mapper_params={
                "input_reader": {
                    "blob_key": blobkey,      # zip archive to read from Blobstore
                },
            },
            reducer_params={
                "output_writer": {
                    "mime_type": "text/plain",
                    "output_sharding": "input",
                    "filesystem": "blobstore",
                },
            },
            shards=16)                        # number of parallel workers
        yield StoreOutput("WordCount", filekey, output)
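
The final yield hands the MapReduce output (the list of files written by FileOutputWriter) to StoreOutput, a second pipeline that the application must define itself. Its body is application-specific; a minimal sketch might be:

class StoreOutput(base_handler.PipelineBase):
    """Hypothetical pipeline that records where the job's output landed."""

    def run(self, mr_type, encoded_key, output):
        # output is the list of file names written by FileOutputWriter.
        logging.debug("%s output for %s: %s", mr_type, encoded_key, output)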

The following arguments are supplied when constructing the MapreducePipeline object:

  • The name of the MapReduce job, for display in the user interface and in any logs
  • The mapper function to use
  • The reducer function to use
  • The input reader to use to supply the mapper function with data
  • The output writer for the reducer function to use
  • The parameters (if any) to supply to the input reader
  • The parameters (if any) to supply to the output writer
  • The number of shards (workers) to use for the MapReduce job

You must write your own mapper and reducer functions. (The shuffler feature is built in and you don't invoke it explicitly.) You can use the standard data input readers and output writers (BlobstoreZipInputReader and FileOutputWriter in the example).
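
A mapper is called once per input value and yields zero or more key-value pairs; the reducer is then called once per key with the list of every value emitted for that key. The sketch below is illustrative rather than the library's own code: it assumes BlobstoreZipInputReader, which passes the mapper a (zipinfo, text_fn) tuple for each member of the zip archive, and uses a crude regex tokenizer:

import re

def word_count_map(data):
    """Mapper: yields (word, "") for every word in one zip entry."""
    (entry, text_fn) = data  # entry is a zipfile.ZipInfo; text_fn() returns its contents
    text = text_fn()
    for word in re.findall(r"[a-z']+", text.lower()):
        yield (word, "")

def word_count_reduce(key, values):
    """Reducer: called with one word and the list of values emitted for it."""
    yield "%s: %d\n" % (key, len(values))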

Starting a MapReduce job

To start a MapReduce job using the MapreducePipeline object, you invoke the Pipeline base class's start method on it, as shown below:

class UploadHandler(webapp2.RequestHandler):  # hypothetical enclosing handler

    def post(self):
        filekey = self.request.get("filekey")
        blob_key = self.request.get("blobkey")

        if self.request.get("word_count"):
            pipeline = WordCountPipeline(filekey, blob_key)
            pipeline.start()
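
The start method also accepts optional keyword arguments. For example, you can run the job's tasks on a dedicated task queue by passing queue_name (the queue must be defined in your queue.yaml):

# Assumes a queue named "mapreduce-queue" is defined in queue.yaml.
pipeline.start(queue_name="mapreduce-queue")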

Showing the MapReduce status monitor

If you wish, you can display a status monitor for your MapReduce jobs, as follows:

class UploadHandler(webapp2.RequestHandler):  # hypothetical enclosing handler

    def post(self):
        filekey = self.request.get("filekey")
        blob_key = self.request.get("blobkey")

        if self.request.get("word_count"):
            pipeline = WordCountPipeline(filekey, blob_key)
            pipeline.start()

            # Redirect to the status UI that ships with the library.
            redirect_url = "%s/status?root=%s" % (pipeline.base_path,
                                                  pipeline.pipeline_id)
            self.redirect(redirect_url)

Determining when a MapreducePipeline job is complete

To find out whether your MapReduce job is complete, you need to save the pipeline ID when you start the job, as shown in the following handler code:

class StartMapreduce(webapp2.RequestHandler):
    def get(self):
        # Construct the pipeline with the arguments described earlier.
        pipeline = mapreduce_pipeline.MapreducePipeline(arguments)
        pipeline.start()
        self.redirect("/wait?pipeline=" + pipeline.pipeline_id)

Notice the redirect above: it saves the pipeline ID by passing it to the wait handler as a query parameter.

Then, in the handler where you want to do work once the MapReduce job is complete, you retrieve the pipeline from the saved ID and check whether it has finished:

class WaitHandler(webapp2.RequestHandler):
    def get(self):
        pipeline_id = self.request.get("pipeline")
        pipeline = mapreduce_pipeline.MapreducePipeline.from_id(pipeline_id)
        if pipeline.has_finalized:
            # MapreducePipeline has completed
            pass
        else:
            # MapreducePipeline is still running
            pass

As shown above, the MapreducePipeline has_finalized property is used to check for a completed job.
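
Once the pipeline has finalized, its result can be read from the Pipeline API's outputs attribute. The following sketch assumes the MapreducePipeline started above, whose default output slot holds the output writer's result (a list of file names when FileOutputWriter is used):

if pipeline.has_finalized:
    # The default slot holds the value produced by the pipeline's run method.
    output = pipeline.outputs.default.value
    logging.info("MapReduce output files: %s", output)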