mrjob.runner - base class for all runners

class mrjob.runner.MRJobRunner(mr_job_script=None, conf_path=None, extra_args=None, file_upload_args=None, hadoop_input_format=None, hadoop_output_format=None, input_paths=None, output_dir=None, partitioner=None, stdin=None, **opts)

Abstract base class for all runners.

Runners are responsible for launching your job on Hadoop Streaming and fetching the results.

Most of the time, you won’t have any reason to construct a runner directly; it’s more like a utility that allows an MRJob to run itself. Normally things work something like this:

  • Get a runner by calling make_runner() on your job
  • Call run() on your runner. This will:
    • Run your job with --steps to find out how many mappers/reducers to run
    • Copy your job and supporting files to Hadoop
    • Instruct Hadoop to run your job with the appropriate --mapper, --combiner, --reducer, and --step-num arguments

Each runner runs a single job once; if you want to run a job multiple times, make multiple runners.
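
The sequence above can be sketched as a small helper. It calls only make_runner(), run(), and stream_output() as they are described on this page. The name collect_raw_output is hypothetical, and mr_job is assumed to be an already-constructed MRJob instance.

```python
def collect_raw_output(mr_job):
    """Run mr_job once and return its raw output lines as a list.

    collect_raw_output is a hypothetical helper, not part of mrjob.
    mr_job is assumed to be an MRJob instance that has already been
    constructed with its arguments.
    """
    # A runner runs a single job once, so make a fresh one per call.
    with mr_job.make_runner() as runner:
        runner.run()  # blocks until the job finishes; raises on problems
        # Materialize the output before the with block exits, because
        # cleanup() may delete the scratch directories that hold it.
        return list(runner.stream_output())
```

Calling the helper a second time makes a second runner, which matches the one-run-per-runner rule.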

Subclasses: EMRJobRunner, HadoopJobRunner, InlineMRJobRunner, LocalMRJobRunner

Runners’ constructors take a bewildering array of keyword arguments; we’ll get to those in Configuration options.

Running your job

MRJobRunner.run()

Run the job, and block until it finishes.

Raise an exception if there are any problems.

MRJobRunner.stream_output()

Stream raw lines from the job’s output. You can parse these using the read() method of the appropriate HadoopStreamingProtocol class.
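
As a rough illustration of what parsing a raw line involves, here is a minimal sketch. It assumes each line holds a JSON-encoded key and a JSON-encoded value separated by a tab. parse_json_line is a hypothetical stand-in for a protocol's read() method, not mrjob's own implementation; in real code, prefer the protocol class that matches your job.

```python
import json


def parse_json_line(line):
    """Split one raw output line into a (key, value) pair.

    Assumption: key and value are JSON-encoded and separated by a
    single tab. This is an illustration only, not mrjob's code.
    """
    if isinstance(line, bytes):
        line = line.decode('utf-8')
    # JSON escapes literal tabs, so the first tab is the separator.
    raw_key, raw_value = line.rstrip('\r\n').split('\t', 1)
    return json.loads(raw_key), json.loads(raw_value)
```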

MRJobRunner.cleanup(mode=None)

Clean up running jobs, scratch dirs, and logs, subject to the cleanup option passed to the constructor.

If you create your runner in a with block, cleanup() will be called automatically:

with mr_job.make_runner() as runner:
    ...

# cleanup() called automatically here

Parameters: mode – overrides the cleanup option passed to the constructor. Should be a list of strings from CLEANUP_CHOICES.

mrjob.runner.CLEANUP_CHOICES = ['ALL', 'LOCAL_SCRATCH', 'LOGS', 'NONE', 'REMOTE_SCRATCH', 'SCRATCH', 'IF_SUCCESSFUL']

cleanup options:

  • 'ALL': delete local scratch, remote scratch, and logs
  • 'LOCAL_SCRATCH': delete local scratch only
  • 'LOGS': delete logs only
  • 'NONE': delete nothing
  • 'REMOTE_SCRATCH': delete remote scratch only
  • 'SCRATCH': delete local and remote scratch, but not logs
  • 'IF_SUCCESSFUL' (deprecated): same as ALL. Not supported for cleanup_on_failure.
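
The bullet list above can be transcribed into a lookup table to see what a given mode list would delete. This is a sketch for illustration: targets_for and _DELETES are hypothetical names, and the code mirrors this page's description rather than mrjob's internals.

```python
# Copied from the CLEANUP_CHOICES listing; 'IF_SUCCESSFUL' is deprecated.
CLEANUP_CHOICES = ['ALL', 'LOCAL_SCRATCH', 'LOGS', 'NONE',
                   'REMOTE_SCRATCH', 'SCRATCH', 'IF_SUCCESSFUL']

# What each choice deletes, transcribed from the bullet list.
_DELETES = {
    'ALL': {'local scratch', 'remote scratch', 'logs'},
    'LOCAL_SCRATCH': {'local scratch'},
    'LOGS': {'logs'},
    'NONE': set(),
    'REMOTE_SCRATCH': {'remote scratch'},
    'SCRATCH': {'local scratch', 'remote scratch'},
    'IF_SUCCESSFUL': {'local scratch', 'remote scratch', 'logs'},
}


def targets_for(mode):
    """Return the set of things a list of cleanup choices would delete.

    targets_for is a hypothetical helper for illustration only.
    """
    unknown = [m for m in mode if m not in CLEANUP_CHOICES]
    if unknown:
        raise ValueError('not in CLEANUP_CHOICES: %r' % (unknown,))
    targets = set()
    for m in mode:
        targets |= _DELETES[m]
    return targets
```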

mrjob.runner.CLEANUP_DEFAULT = 'IF_SUCCESSFUL'

Deprecated since version 0.3.0.

The default cleanup-on-success option: 'IF_SUCCESSFUL'.

Run information

MRJobRunner.counters()

Get counters associated with this run in this form:

[{'group name': {'counter1': 1, 'counter2': 2}},
 {'group name': ...}]

The list contains an entry for every step of the current job, ignoring earlier steps in the same job flow.
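
Because the result is one dict per step, a job-wide total needs a small reduction. The sketch below assumes exactly the shape shown above (group name mapping to a dict of counter name and integer value); total_counters is a hypothetical helper, not an mrjob function.

```python
def total_counters(step_counters):
    """Sum counters across steps into one {group: {counter: total}} dict.

    total_counters is a hypothetical helper. step_counters is assumed
    to be a list with one dict per step, each mapping a group name to
    a dict of counter name -> integer value.
    """
    totals = {}
    for step in step_counters:
        for group, counters in step.items():
            group_totals = totals.setdefault(group, {})
            for name, amount in counters.items():
                group_totals[name] = group_totals.get(name, 0) + amount
    return totals
```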

MRJobRunner.get_hadoop_version()

Return the version number of the Hadoop environment as a string if Hadoop is being used or simulated. Return None if not applicable.

EMRJobRunner infers this from the job flow. HadoopJobRunner gets this from hadoop version. LocalMRJobRunner has an additional hadoop_version option to specify which version it simulates, with a default of 0.20. InlineMRJobRunner does not simulate Hadoop at all.
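
Since the version comes back as a string, comparing two versions directly compares them character by character. A minimal sketch of a safer comparison follows; version_tuple is a hypothetical helper, and it assumes every dot-separated component is a plain integer.

```python
def version_tuple(version):
    """Turn a version string such as '0.20' into a tuple of ints.

    Returns None when version is None (no Hadoop in use or simulated).
    Assumes every dot-separated component is a plain integer.
    """
    if version is None:
        return None
    return tuple(int(part) for part in version.split('.'))
```

For example, version_tuple('0.20') compares greater than version_tuple('0.3'), whereas the plain strings compare the other way around.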

File management

Some simple filesystem operations that work on both the local filesystem and HDFS (when running HadoopJobRunner) or S3 (when running EMRJobRunner).

Use hdfs:// and s3:// URIs to refer to remote files.

We don’t currently support mv() and cp() because S3 doesn’t really have directories, so the semantics get a little weird.

MRJobRunner.get_output_dir()

Find the directory containing the job output. If the job hasn’t run yet, returns None.

MRJobRunner.du(path_glob)

Get the total size of files matching path_glob

Corresponds roughly to: hadoop fs -dus path_glob

MRJobRunner.ls(path_glob)

Recursively list all files in the given path.

Directories are not returned, for compatibility with S3 (which has no concept of them).

Corresponds roughly to: hadoop fs -lsr path_glob

MRJobRunner.cat(path)

cat output from a given path. Automatically decompresses .gz and .bz2 files.

Corresponds roughly to: hadoop fs -cat path

MRJobRunner.mkdir(path)

Create the given dir and its subdirs (if they don’t already exist).

Corresponds roughly to: hadoop fs -mkdir path

MRJobRunner.path_exists(path_glob)

Does the given path exist?

Corresponds roughly to: hadoop fs -test -e path_glob

MRJobRunner.path_join(dirname, filename)

Join a directory name and filename.

MRJobRunner.rm(path_glob)

Recursively delete the given file/directory, if it exists

Corresponds roughly to: hadoop fs -rmr path_glob

MRJobRunner.touchz(path)

Make an empty file in the given location. Raises an error if a non-zero length file already exists in that location.

Corresponds to: hadoop fs -touchz path
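
Several of the methods in this section can be combined to inspect a finished job's output. The following is a sketch under stated assumptions: summarize_output is a hypothetical helper, the 'part-*' pattern assumes Hadoop-style output file names, and ls() is assumed to return an iterable of paths.

```python
def summarize_output(runner):
    """Return (file paths, total size) for a finished job's output.

    summarize_output is a hypothetical helper. Returns None if
    get_output_dir() reports that the job has not run yet.
    """
    output_dir = runner.get_output_dir()
    if output_dir is None:
        return None
    # path_join() builds the path in whatever form the runner uses
    # (local path, hdfs:// URI, or s3:// URI).
    # Assumption: output files are named part-*, as Hadoop does.
    pattern = runner.path_join(output_dir, 'part-*')
    if not runner.path_exists(pattern):
        return [], 0
    return list(runner.ls(pattern)), runner.du(pattern)
```

Because the helper only goes through the runner's own methods, the same code should apply whether the output lives on the local filesystem, HDFS, or S3.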
