Getting started

Writing and running a job

To create your own MapReduce job, subclass MRJob, define your mappers and reducers as methods, and override steps() to return them in order. For example, a word counter:

from mrjob.job import MRJob

class MRWordCounter(MRJob):
    def get_words(self, key, line):
        for word in line.split():
            yield word, 1

    def sum_words(self, word, occurrences):
        yield word, sum(occurrences)

    def steps(self):
        return [self.mr(self.get_words, self.sum_words),]

if __name__ == '__main__':
    MRWordCounter.run()

The two lines at the bottom are mandatory: they are what allow your class to be run as a script by Hadoop streaming.

This job takes a file with lines of whitespace-separated words and outputs a file with tab-separated lines like: "stars"\t5.
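To make the data flow concrete, here is a minimal sketch that imitates the map, sort, and reduce phases in plain Python, without mrjob or Hadoop. The helper name run_word_count is hypothetical, and the output format assumes keys and values are JSON-encoded and separated by a tab. It illustrates the idea only and is not how mrjob is implemented.

```python
import json
from itertools import groupby
from operator import itemgetter


def get_words(key, line):
    # Same logic as MRWordCounter.get_words: emit (word, 1) for each word.
    for word in line.split():
        yield word, 1


def sum_words(word, occurrences):
    # Same logic as MRWordCounter.sum_words: total the counts for one word.
    yield word, sum(occurrences)


def run_word_count(lines):
    """Hypothetical helper: simulate one map/sort/reduce pass in memory."""
    # Map phase: the input key is unused (None); the value is the raw line.
    mapped = [pair for line in lines for pair in get_words(None, line)]
    # Shuffle/sort phase: bring identical keys next to each other.
    mapped.sort(key=itemgetter(0))
    output = []
    for word, group in groupby(mapped, key=itemgetter(0)):
        # Reduce phase: the reducer gets a key and an iterator of its values.
        counts = (count for _, count in group)
        for out_key, out_value in sum_words(word, counts):
            # Assumed output format: JSON key, a tab, JSON value.
            output.append('%s\t%s' % (json.dumps(out_key), json.dumps(out_value)))
    return output


result = run_word_count(['stars shine', 'stars fall'])
```

Because the reducer only sees values grouped by key, the sort between the two phases is what makes sum(occurrences) produce one total per word.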

For one-step jobs, you can also just redefine mapper() and reducer():

from mrjob.job import MRJob

class MRWordCounter(MRJob):
    def mapper(self, key, line):
        for word in line.split():
            yield word, 1

    def reducer(self, word, occurrences):
        yield word, sum(occurrences)

if __name__ == '__main__':
    MRWordCounter.run()

Running locally

To test the job locally, just run:

python your_mr_job_sub_class.py < log_file_or_whatever > output

The script will automatically invoke itself to run the various steps, using LocalMRJobRunner.

You can also run individual steps:

# test 1st step mapper:
python your_mr_job_sub_class.py --mapper
# test 2nd step reducer (--step-num=1 because step numbers are 0-indexed):
python your_mr_job_sub_class.py --reducer --step-num=1

By default, the job reads from stdin, but you can also specify one or more input files. Files ending in .gz and .bz2 are decompressed automatically:

python your_mr_job_sub_class.py log_01.gz log_02.bz2 log_03
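The extension-based decompression can be pictured with the sketch below. The helpers open_input and read_lines are hypothetical names used for illustration, and choosing the opener from the file suffix is an assumption about the general approach, not a description of mrjob's internals.

```python
import bz2
import gzip
import os
import tempfile


def open_input(path):
    """Hypothetical helper: open a plain, .gz, or .bz2 file for reading bytes."""
    if path.endswith('.gz'):
        return gzip.open(path, 'rb')
    if path.endswith('.bz2'):
        return bz2.BZ2File(path, 'rb')
    return open(path, 'rb')


def read_lines(paths):
    # Yield every line from every input file, in the order given.
    for path in paths:
        with open_input(path) as f:
            for line in f:
                yield line


# Demonstration: write one tiny file per format, then read them back.
work_dir = tempfile.mkdtemp()
paths = [os.path.join(work_dir, name)
         for name in ('log_01.gz', 'log_02.bz2', 'log_03')]
with gzip.open(paths[0], 'wb') as f:
    f.write(b'first\n')
with bz2.BZ2File(paths[1], 'wb') as f:
    f.write(b'second\n')
with open(paths[2], 'wb') as f:
    f.write(b'third\n')

lines = list(read_lines(paths))
```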

See mrjob.examples for more examples.

Running on EMR

  • Set up your Amazon Account (see SSH Tunneling and Log Fetching)

  • Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY

  • Run your job with -r emr:

    python your_mr_job_sub_class.py -r emr < input > output

Running on your own Hadoop cluster

  • Set up a Hadoop cluster (see http://hadoop.apache.org/common/docs/current/)

  • If running Python 2.5 on your cluster, install the simplejson module on all nodes. (Recommended but not required for Python 2.6+)

  • Make sure HADOOP_HOME is set

  • Run your job with -r hadoop:

    python your_mr_job_sub_class.py -r hadoop < input > output

Running from another script

Use make_runner() to run an MRJob from another Python script:

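from __future__ import with_statement # only needed on Python 2.5

# hypothetical module name; import your job class from wherever it lives
from mr_word_counter import MRWordCounter
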
mr_job = MRWordCounter(args=['-r', 'emr'])
with mr_job.make_runner() as runner:
    runner.run()
    for line in runner.stream_output():
        key, value = mr_job.parse_output_line(line)
        ... # do something with the parsed output
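What "parsing an output line" amounts to can be sketched as follows, assuming the job writes a JSON-encoded key and value separated by a tab, as in the "stars"\t5 example earlier. The function parse_json_output_line is a hypothetical stand-in written for illustration; in a real script you would keep calling mr_job.parse_output_line().

```python
import json


def parse_json_output_line(line):
    """Hypothetical stand-in, assuming JSON key, a tab, then JSON value."""
    key_json, value_json = line.rstrip('\n').split('\t', 1)
    return json.loads(key_json), json.loads(value_json)


# Collect word counts from two sample output lines.
counts = {}
for line in ['"stars"\t5\n', '"moon"\t2\n']:
    word, count = parse_json_output_line(line)
    counts[word] = count
```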

Common configuration tasks

Putting your source tree in the PYTHONPATH

If your job spans multiple files, you can create a tarball of your source tree and use python_archives to have it decompressed and added to the PYTHONPATH:

runners:
  emr:  # this will work for any runner
    python_archives:
    - my-src-tree.tar.gz

It will probably be convenient to have the tarball generated by your build process.
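If your build process is itself driven from Python, a tarball step might look like the sketch below. The function make_source_tarball and the throwaway directory layout are hypothetical; the one design assumption is that packages should sit at the root of the archive so they are importable once the unpacked archive is on the PYTHONPATH.

```python
import os
import tarfile
import tempfile


def make_source_tarball(src_dir, tarball_path):
    """Hypothetical build step: pack the contents of src_dir into a .tar.gz."""
    with tarfile.open(tarball_path, 'w:gz') as tar:
        for name in sorted(os.listdir(src_dir)):
            # arcname keeps paths relative, so packages sit at the archive root.
            tar.add(os.path.join(src_dir, name), arcname=name)
    return tarball_path


# Demonstration with a throwaway source tree containing one package.
work_dir = tempfile.mkdtemp()
src_dir = os.path.join(work_dir, 'src')
os.makedirs(os.path.join(src_dir, 'mypkg'))
with open(os.path.join(src_dir, 'mypkg', '__init__.py'), 'w') as f:
    f.write('# package marker\n')

tarball = make_source_tarball(src_dir, os.path.join(work_dir, 'my-src-tree.tar.gz'))
with tarfile.open(tarball) as tar:
    members = sorted(tar.getnames())
```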

Increasing the task timeout

Warning

Some EMR AMIs appear not to support setting parameters like the timeout with jobconf at run time. Instead, you must use bootstrap-time configuration (see Bootstrap-time configuration).

If your mappers or reducers take a long time to process a single step, you may want to increase the amount of time Hadoop lets them run before failing them as timeouts. You can do this with jobconf and the version-appropriate Hadoop configuration property. For example, this configuration will set the timeout to one hour (3,600,000 milliseconds):

runners:
    hadoop: # this will work for both hadoop and emr
        jobconf:
            # Hadoop 0.18
            mapred.task.timeout: 3600000
            # Hadoop 0.21+
            mapreduce.task.timeout: 3600000

mrjob will convert your jobconf options between Hadoop versions if necessary. In this example, either jobconf line could be removed and the timeout would still be changed when using either version of Hadoop.
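The idea of converting option names between versions can be sketched with a simple lookup table. This is an illustration only: translate_jobconf and the two dictionaries are hypothetical, and the table holds just the one pair of names from the timeout example, not mrjob's actual mapping.

```python
# Pairs of equivalent option names. Only the pair from the timeout example
# is listed; a real table would be much longer.
OLD_TO_NEW = {'mapred.task.timeout': 'mapreduce.task.timeout'}
NEW_TO_OLD = dict((new, old) for old, new in OLD_TO_NEW.items())


def translate_jobconf(jobconf, use_new_names):
    """Hypothetical helper: rename options to match the target Hadoop version."""
    table = OLD_TO_NEW if use_new_names else NEW_TO_OLD
    translated = {}
    for name, value in jobconf.items():
        # Names missing from the table are passed through unchanged.
        translated[table.get(name, name)] = value
    return translated


old_style = translate_jobconf({'mapreduce.task.timeout': 3600000}, use_new_names=False)
new_style = translate_jobconf({'mapred.task.timeout': 3600000}, use_new_names=True)
```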

Writing compressed output

To save space, you can have Hadoop automatically save your job’s output as compressed files. This can be done using the same method as changing the task timeout, with jobconf and the appropriate configuration properties. This example uses the Hadoop 0.21+ names:

runners:
    hadoop: # this will work for both hadoop and emr
        jobconf:
            # "true" must be a string argument, not a boolean! (#323)
            mapreduce.output.compress: "true"
            mapreduce.output.compression.codec: org.apache.hadoop.io.compress.GzipCodec

Common EMR configuration tasks

Custom Python packages

There are a couple of ways to install Python packages that are not in the standard library. If there is a Debian package, you can add a call to apt-get to bootstrap_cmds:

runners:
  emr:
    bootstrap_cmds:
    - sudo apt-get install -y python-simplejson

If there is no Debian package or you prefer to use your own tarballs for some other reason, you can specify tarballs in bootstrap_python_packages, which supports glob syntax:

runners:
  emr:
    bootstrap_python_packages:
    - $MY_SOURCE_TREE/emr_packages/*.tar.gz

Bootstrap-time configuration

Some Hadoop options, such as the maximum number of running map tasks per node, must be set at bootstrap time and will not work with --jobconf. You must use Amazon’s configure-hadoop script for this. For example, this limits the number of mappers and reducers to one per node:

--bootstrap-action="s3://elasticmapreduce/bootstrap-actions/configure-hadoop \
-m mapred.tasktracker.map.tasks.maximum=1 \
-m mapred.tasktracker.reduce.tasks.maximum=1"

Setting up Ganglia

Ganglia (http://www.ganglia.info) is a scalable distributed monitoring system for high-performance computing systems. You can enable it for your EMR cluster with Amazon’s install-ganglia bootstrap action:

--bootstrap-action="s3://elasticmapreduce/bootstrap-actions/install-ganglia"

Enabling Python core dumps

Particularly bad errors may leave no traceback in the logs. To enable core dumps on your EMR instances, put this script in core_dump_bootstrap.sh:

#!/bin/sh

chk_root () {
    if [ ! $( id -u ) -eq 0 ]; then
        exec sudo sh ${0}
        exit ${?}
    fi
}

chk_root

mkdir /tmp/cores
chmod -R 1777 /tmp/cores
printf '\n* soft core unlimited\n' >> /etc/security/limits.conf
echo "ulimit -c unlimited" >> /etc/profile
echo "/tmp/cores/core.%e.%p.%h.%t" > /proc/sys/kernel/core_pattern

Use the script as a bootstrap action in your job:

--bootstrap-action=core_dump_bootstrap.sh

You’ll probably want to use a version of Python with debugging symbols, so install it and use it as python_bin:

--bootstrap-cmd="sudo apt-get install -y python2.6-dbg" \
--python-bin=python2.6-dbg

Run your job in a persistent job flow. When it fails, you can SSH to your nodes to inspect the core dump files:

you@local: emr --ssh j-MYJOBFLOWID

hadoop@ip-10-160-75-214:~$ gdb `which python` /tmp/cores/core.python.blah

If you have multiple nodes, you may have to scp your identity file to the master node and use it to SSH to the slave nodes, where the core dumps are located:

hadoop@ip-10-160-75-214:~$ hadoop dfsadmin -report | grep ^Name
Name: 10.166.50.85:9200
Name: 10.177.63.114:9200

hadoop@ip-10-160-75-214:~$ ssh -i uploaded_key.pem 10.166.50.85

hadoop@ip-10-166-50-85:~$ gdb `which python2.6-dbg` /tmp/cores/core.python.blah
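Before waiting for a crash, it can help to confirm from Python that the bootstrap settings took effect on a node. The sketch below uses the standard resource module (Unix only) to check the core file size limit and reads the kernel's core pattern. The function names are hypothetical, and the /proc path assumes a Linux host.

```python
import resource


def core_dumps_enabled():
    """Return True if this process may write a core file of nonzero size."""
    soft_limit, _hard_limit = resource.getrlimit(resource.RLIMIT_CORE)
    # RLIM_INFINITY means "unlimited"; 0 means core dumps are disabled.
    return soft_limit == resource.RLIM_INFINITY or soft_limit > 0


def read_core_pattern(path='/proc/sys/kernel/core_pattern'):
    """Return the kernel's core file pattern, or None if it cannot be read."""
    try:
        with open(path) as f:
            return f.read().strip()
    except IOError:
        return None


enabled = core_dumps_enabled()
pattern = read_core_pattern()
```

If enabled is False inside a task, the ulimit line in the bootstrap script probably did not reach the process that launched Python.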

Example configuration file

# This is basically the config file we use in production at Yelp, with some
# strategic edits. ;)
#
# If you don't have the yaml module installed, you'll have to use JSON instead,
# which would look something like this:
#
# {"runners": {
#    "emr": {
#      "aws_access_key_id": "HADOOPHADOOPBOBADOOP",
#      "aws_region": "us-west-1",
#      "aws_secret_access_key": "MEMIMOMADOOPBANANAFANAFOFADOOPHADOOP",
#      "base_tmp_dir": "/scratch/$USER",
#      "bootstrap_python_packages": [
#        "$BT/aws/python-packages/*.tar.gz"
#      ],
#      ...
#
runners:
  emr:
    aws_access_key_id: HADOOPHADOOPBOBADOOP
    # We run in the west region because we're located on the west coast,
    # and there are no eventual consistency issues with newly created S3 keys.
    aws_region: us-west-1
    aws_secret_access_key: MEMIMOMADOOPBANANAFANAFOFADOOPHADOOP
    # alternate tmp dir
    base_tmp_dir: /scratch/$USER
    # $BT is the path to our source tree. This lets us add modules to
    # install on EMR by simply dumping them in this dir.
    bootstrap_python_packages:
    - $BT/aws/python-packages/*.tar.gz
    # specifying an ssh key pair allows us to ssh tunnel to the job tracker
    # and fetch logs via ssh
    ec2_key_pair: EMR
    ec2_key_pair_file: $BT/config/EMR.pem
    # use beefier instances in production
    ec2_instance_type: c1.xlarge
    # but only use one unless overridden
    num_ec2_instances: 1
    # use our local time zone (this is important for deciding when
    # days start and end, for instance)
    cmdenv:
      TZ: America/Los_Angeles
    # we create the src-tree.tar.gz tarball with a Makefile. It only contains
    # a subset of our code
    python_archives: &python_archives
    - $BT/aws/src-tree.tar.gz
    # our bucket also lives in the us-west region
    s3_log_uri: s3://walrus/tmp/logs/
    s3_scratch_uri: s3://walrus/tmp/
    setup_cmds: &setup_cmds
    # these files are different between dev and production, so they're
    # uploaded separately. copying them into place isn't safe because
    # src-tree.tar.gz is actually shared between several mappers/reducers.
    # Another safe approach would be to add a rule to Makefile.emr that
    # copies these files if they haven't already been copied (setup_cmds
    # from two mappers/reducers won't run simultaneously on the same machine)
    - ln -sf $(readlink -f config.py) src-tree.tar.gz/config/config.py
    - ln -sf $(readlink -f secret.py) src-tree.tar.gz/config/secret.py
    # run Makefile.emr to compile C code (EMR has a different architecture,
    # so we can't just upload the .so files)
    - cd src-tree.tar.gz; make -f Makefile.emr
    # generally, we run jobs on a Linux server separate from our desktop
    # machine. So the SSH tunnel needs to be open so a browser on our
    # desktop machine can connect to it.
    ssh_tunnel_is_open: true
    ssh_tunnel_to_job_tracker: true
    # upload these particular files on the fly because they're different
    # between development and production
    upload_files: &upload_files
    - $BT/config/config.py
    - $BT/config/secret.py
  hadoop:
    # Note the use of YAML references to re-use parts of the EMR config.
    # We don't currently run our own Hadoop cluster, so this section is
    # pretty boring.
    base_tmp_dir: /scratch/$USER
    python_archives: *python_archives
    setup_cmds: *setup_cmds
    upload_files: *upload_files
  local:
    # We don't have gcc installed in production, so if we have to run an
    # MRJob in local mode in production, don't run the Makefile
    # and whatnot; just fall back on the original copy of the code.
    base_tmp_dir: /scratch/$USER
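For readers using the JSON form of the config file, the sketch below shows how the per-runner sections can be read with the standard json module. The helper load_runner_opts and the trimmed-down config text are hypothetical, and expanding $USER with os.path.expandvars is an assumption made for illustration rather than a statement about how mrjob processes paths. JSON has no equivalent of YAML references, so shared values such as base_tmp_dir have to be repeated in each section.

```python
import json
import os

# A trimmed-down, hypothetical JSON config with two runner sections.
CONFIG_JSON = '''
{"runners": {
   "emr": {
     "aws_region": "us-west-1",
     "base_tmp_dir": "/scratch/$USER",
     "num_ec2_instances": 1
   },
   "hadoop": {
     "base_tmp_dir": "/scratch/$USER"
   }
}}
'''


def load_runner_opts(config_text, runner_alias):
    """Hypothetical helper: return the options for one runner, or {}."""
    config = json.loads(config_text)
    return config.get('runners', {}).get(runner_alias, {})


os.environ['USER'] = 'walrus'  # set only so the demonstration is repeatable
emr_opts = load_runner_opts(CONFIG_JSON, 'emr')
tmp_dir = os.path.expandvars(emr_opts['base_tmp_dir'])
local_opts = load_runner_opts(CONFIG_JSON, 'local')
```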