MRJob - Base class for all jobs

Basic

class mrjob.job.MRJob(args=None)

The base class for all MapReduce jobs. See __init__() for details.

Writing one-step jobs

MRJob.mapper(key, value)

Re-define this to define the mapper for a one-step job.

Yields zero or more tuples of (out_key, out_value).

Parameters:
  • key – A value parsed from input.
  • value – A value parsed from input.

If you don’t re-define this, your job will have a mapper that simply yields (key, value) as-is.

By default (if you don’t mess with Protocols):
  • key will be None
  • value will be the raw input line, with newline stripped.
  • out_key and out_value must be JSON-encodable: numeric, unicode, boolean, None, list, or dict whose keys are unicodes.

MRJob.reducer(key, values)

Re-define this to define the reducer for a one-step job.

Yields zero or more tuples of (out_key, out_value).

Parameters:
  • key – A key which was yielded by the mapper.
  • values – A generator which yields all values yielded by the mapper which correspond to key.

By default (if you don’t mess with Protocols):
  • out_key and out_value must be JSON-encodable.
  • key and values will have been decoded from JSON (so tuples will become lists).

MRJob.combiner(key, values)

Re-define this to define the combiner for a one-step job.

Yields zero or more tuples of (out_key, out_value).

Parameters:
  • key – A key which was yielded by the mapper.
  • values – A generator which yields all values yielded by one mapper task/node which correspond to key.

By default (if you don’t mess with Protocols):
  • out_key and out_value must be JSON-encodable.
  • key and values will have been decoded from JSON (so tuples will become lists).

MRJob.mapper_init()

Re-define this to define an action to run before the mapper processes any input.

One use for this function is to initialize mapper-specific helper structures.

Yields zero or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

MRJob.mapper_final()

Re-define this to define an action to run after the mapper reaches the end of input.

One way to use this is to store a total in an instance variable, and output it after reading all input data. See mrjob.examples for an example.

Yields zero or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.
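The init, per-line, and final hooks can be illustrated in plain Python. This is only a sketch: LineCounter and run_mapper_task are hypothetical stand-ins (not mrjob classes or functions) that mimic the order in which one mapper task would call mapper_init(), mapper(), and mapper_final().

```python
class LineCounter(object):
    """Hypothetical stand-in for an MRJob subclass (not the real base class)."""

    def mapper_init(self):
        # Runs once, before any input: set up per-task state.
        self.num_lines = 0
        return ()  # this hook emits nothing here

    def mapper(self, key, value):
        # Runs once per input line: accumulate instead of emitting.
        self.num_lines += 1
        return ()

    def mapper_final(self):
        # Runs once, after the last line: emit the stored total.
        yield 'lines', self.num_lines


def run_mapper_task(job, lines):
    """Hypothetical driver mimicking the call order within one mapper task."""
    for pair in job.mapper_init():
        yield pair
    for line in lines:
        # Default protocol behavior: key is None, value is the stripped line.
        for pair in job.mapper(None, line.rstrip('\n')):
            yield pair
    for pair in job.mapper_final():
        yield pair


output = list(run_mapper_task(LineCounter(), ['a b\n', 'c\n', 'd e f\n']))
```

Keeping the total in an instance variable means the task emits a single pair instead of one pair per input line.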

MRJob.reducer_init()

Re-define this to define an action to run before the reducer processes any input.

One use for this function is to initialize reducer-specific helper structures.

Yields zero or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

MRJob.reducer_final()

Re-define this to define an action to run after the reducer reaches the end of input.

Yields zero or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

MRJob.combiner_init()

Re-define this to define an action to run before the combiner processes any input.

One use for this function is to initialize combiner-specific helper structures.

Yields zero or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

MRJob.combiner_final()

Re-define this to define an action to run after the combiner reaches the end of input.

Yields zero or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.
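To make the mapper, combiner, and reducer signatures concrete, here is a minimal word-count sketch written as plain generator functions so that it runs without mrjob installed. In a real job these would be methods on an MRJob subclass. group_by_key and run_one_step are hypothetical helpers that only approximate what Hadoop does between the phases.

```python
from itertools import groupby
from operator import itemgetter


def mapper(key, value):
    # Under the default protocols, key is None and value is one raw line.
    for word in value.split():
        yield word.lower(), 1


def combiner(key, values):
    # Pre-sum the counts produced by a single mapper task.
    yield key, sum(values)


def reducer(key, values):
    # Sum the (possibly pre-combined) counts for each word.
    yield key, sum(values)


def group_by_key(pairs):
    """Hypothetical stand-in for Hadoop's sort-and-shuffle phase."""
    ordered = sorted(pairs, key=itemgetter(0))
    for key, group in groupby(ordered, key=itemgetter(0)):
        yield key, [value for _, value in group]


def run_one_step(lines):
    """Hypothetical local driver: map, combine, then reduce."""
    mapped = [pair for line in lines for pair in mapper(None, line)]
    combined = [pair for k, vs in group_by_key(mapped)
                for pair in combiner(k, vs)]
    return [pair for k, vs in group_by_key(combined)
            for pair in reducer(k, vs)]


counts = run_one_step(['the cat', 'The dog and the cat'])
```

The combiner and reducer are identical here because summing is associative; a combiner must produce output that the reducer can consume as if it had come straight from the mapper.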

Writing multi-step jobs

MRJob.steps()

Re-define this to make a multi-step job.

If you don’t re-define this, we’ll automatically create a one-step job using any of mapper(), mapper_init(), mapper_final(), combiner(), combiner_init(), combiner_final(), reducer_init(), reducer_final(), and reducer() that you’ve re-defined. To define a multi-step job, return a list of steps. For example:

def steps(self):
    return [self.mr(mapper=self.transform_input,
                    reducer=self.consolidate_1),
            self.mr(mapper_init=self.log_mapper_init,
                    reducer=self.consolidate_2)]

Returns: a list of steps constructed with mr()

classmethod MRJob.mr(mapper=None, reducer=None, _mapper_final=None, **kwargs)

Define a step (mapper, reducer, and/or any combination of mapper_init, reducer_final, etc.) for your job.

Used by steps(). (Don’t re-define this, just call it!)

Accepts the following keyword arguments. For convenience, you may specify mapper and reducer as positional arguments as well.

Parameters:
  • mapper – function with same function signature as mapper(), or None for an identity mapper.
  • reducer – function with same function signature as reducer(), or None for no reducer.
  • combiner – function with same function signature as combiner(), or None for no combiner.
  • mapper_init – function with same function signature as mapper_init(), or None for no initial mapper action.
  • mapper_final – function with same function signature as mapper_final(), or None for no final mapper action.
  • reducer_init – function with same function signature as reducer_init(), or None for no initial reducer action.
  • reducer_final – function with same function signature as reducer_final(), or None for no final reducer action.
  • combiner_init – function with same function signature as combiner_init(), or None for no initial combiner action.
  • combiner_final – function with same function signature as combiner_final(), or None for no final combiner action.

Please consider the way we represent steps to be opaque, and expect it to change in future versions of mrjob.
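The data flow between steps can be sketched locally: each step's output pairs become the next step's input pairs. In the sketch below the list of (mapper, reducer) tuples and the run_steps helper are hypothetical; they are not how mrjob represents or runs steps, and in a real job you would build the list with mr().

```python
from itertools import groupby
from operator import itemgetter


def identity_mapper(key, value):
    # What a step does when mapper is None: pass pairs through unchanged.
    yield key, value


def count_words(key, line):
    for word in line.split():
        yield word, 1


def sum_counts(word, counts):
    yield word, sum(counts)


def key_by_count(word, total):
    # Second-step mapper: regroup words under their total count.
    yield total, word


def collect_words(total, words):
    yield total, sorted(words)


def run_steps(steps, pairs):
    """Hypothetical local runner: feed each step's output into the next."""
    for mapper, reducer in steps:
        mapper = mapper or identity_mapper
        mapped = sorted((out for k, v in pairs for out in mapper(k, v)),
                        key=itemgetter(0))
        pairs = [out
                 for k, group in groupby(mapped, key=itemgetter(0))
                 for out in reducer(k, [v for _, v in group])]
    return pairs


steps = [(count_words, sum_counts), (key_by_count, collect_words)]
result = run_steps(steps, [(None, 'a b a'), (None, 'c b a')])
```

The first step counts words and the second groups the words by their counts, which is why the second mapper swaps key and value.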

Running the job

classmethod MRJob.run()

Entry point for running job from the command-line.

This is also the entry point when a mapper or reducer is run by Hadoop Streaming.

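Does one of:
  • Print step information. See show_steps().
  • Run a mapper. See run_mapper().
  • Run a combiner. See run_combiner().
  • Run a reducer. See run_reducer().
  • Run the entire job. See run_job().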

MRJob.__init__(args=None)

Entry point for running your job from other Python code.

You can pass in command-line arguments, and the job will act the same way it would if it were run from the command line. For example, to run your job on EMR:

mr_job = MRYourJob(args=['-r', 'emr'])
with mr_job.make_runner() as runner:
    ...

Passing in None is the same as passing in [] (if you want to parse args from sys.argv, call MRJob.run()).

For a full list of command-line arguments, run: python -m mrjob.job --help

MRJob.make_runner()

Make a runner based on command-line arguments, so we can launch this job on EMR, on Hadoop, or locally.

Return type: mrjob.runner.MRJobRunner

Parsing the output

MRJob.parse_output_line(line)

Parse a line from the final output of this MRJob into (key, value). Used extensively in tests like this:

runner.run()
for line in runner.stream_output():
    key, value = mr_job.parse_output_line(line)

Counters and status messages

MRJob.increment_counter(group, counter, amount=1)

Increment a counter in Hadoop streaming by printing to stderr.

Parameters:
  • group (str) – counter group
  • counter (str) – description of the counter
  • amount (int) – how much to increment the counter by

Commas in counter or group will be automatically replaced with semicolons (commas confuse Hadoop streaming).

MRJob.set_status(msg)

Set the job status in Hadoop streaming by printing to stderr.

This is also a good way of doing a keepalive for a job that goes a long time between outputs; Hadoop streaming usually times out jobs that give no output for longer than 10 minutes.
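For reference, Hadoop streaming recognizes counter and status updates as specially formatted lines on stderr. The sketch below builds such lines; counter_line and status_line are hypothetical helper names, and mrjob's own implementation may differ in its details.

```python
import sys


def counter_line(group, counter, amount=1):
    # Commas separate the three fields, so replace any inside the names.
    group = str(group).replace(',', ';')
    counter = str(counter).replace(',', ';')
    return 'reporter:counter:%s,%s,%d\n' % (group, counter, amount)


def status_line(msg):
    return 'reporter:status:%s\n' % (msg,)


# A task would write these lines to stderr while it runs.
sys.stderr.write(counter_line('words', 'short, common', 2))
sys.stderr.write(status_line('still working'))
```

Writing a status line periodically during a long computation is the keepalive pattern mentioned above.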

Advanced

Protocols

Protocols convert data between string representations for Hadoop Streaming and Python data structures for input and output by steps. Using the default protocols, the first step will get raw strings as input and should return JSON-encodable Python objects. The last step should accept JSON-decoded Python objects and return JSON-encodable Python objects, which will appear in the final output as JSON.

There are three ways to specify which protocol you want to use for a given step. The simplest way is to assign a class imported from mrjob.protocol to one of the class variables MRJob.INPUT_PROTOCOL, MRJob.INTERNAL_PROTOCOL, or MRJob.OUTPUT_PROTOCOL on your job. The input protocol is the format of your input data, the internal protocol is the format of data passed between steps, and the output protocol is what is produced at the end of the job. These classes are instantiated when your MRJob is instantiated.

For example, this class accepts raw strings, outputs JSON, and uses pickle to communicate between steps:

from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol, PickleProtocol, RawValueProtocol

class MRProtocolJob1(MRJob):
    INPUT_PROTOCOL = RawValueProtocol
    INTERNAL_PROTOCOL = PickleProtocol
    OUTPUT_PROTOCOL = JSONValueProtocol

The second way is to override MRJob.input_protocol(), MRJob.internal_protocol(), or MRJob.output_protocol(). These methods should return instances of protocols. For example, if you want your job to operate on either JSON or raw strings depending on a command line option, you can do this:

from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol, RawValueProtocol

class MRProtocolJob2(MRJob):

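    def configure_options(self):
        super(MRProtocolJob2, self).configure_options()
        # set up the 'data_format' option
        self.add_passthrough_option(
            '--data-format', default='json',
            help="Input format: 'json' or 'raw'")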

    def input_protocol(self):
        if self.options.data_format == 'json':
            return JSONValueProtocol()
        elif self.options.data_format == 'raw':
            return RawValueProtocol()
        else:
            raise ValueError("Data format must be 'json' or 'raw'")

If you need behavior even more complex than that, including using different protocols to communicate between different steps (like JSON between steps 1 and 2, but pickle between steps 2 and 3), you can override MRJob.pick_protocols() to return different protocols based on the step number and step type of the current process.

See mrjob.protocol - input and output for more information about protocols, including how to implement your own.
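As a rough picture of what a protocol does, the sketch below converts between lines and (key, value) pairs. Both classes are hypothetical stand-ins written for illustration, not the classes in mrjob.protocol; the only assumptions are that a raw-value protocol treats the whole line as the value and that a JSON protocol stores a JSON-encoded key and value separated by a tab.

```python
import json


class SketchRawValueProtocol(object):
    """Hypothetical stand-in: the whole line is the value; there is no key."""

    def read(self, line):
        return None, line

    def write(self, key, value):
        return value


class SketchJSONProtocol(object):
    """Hypothetical stand-in: JSON key and JSON value separated by a tab."""

    def read(self, line):
        raw_key, raw_value = line.split('\t', 1)
        return json.loads(raw_key), json.loads(raw_value)

    def write(self, key, value):
        return '%s\t%s' % (json.dumps(key), json.dumps(value))


# Round trip: note that the tuple comes back as a list.
line = SketchJSONProtocol().write('word', (1, 2))
key, value = SketchJSONProtocol().read(line)
```

The round trip shows why values decoded from JSON arrive as lists even when the previous step yielded tuples.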

MRJob.INPUT_PROTOCOL = <class 'mrjob.protocol.RawValueProtocol'>

Protocol for reading input to the first mapper in your job. Default: RawValueProtocol.

For example, if you know your input data is in JSON format, you could set:

INPUT_PROTOCOL = JSONValueProtocol

in your class, and your initial mapper would receive decoded JSONs rather than strings.

See mrjob.protocol for the full list of protocols.

MRJob.INTERNAL_PROTOCOL = <class 'mrjob.protocol.JSONProtocol'>

Protocol for communication between steps and final output. Default: JSONProtocol.

For example, if your step output weren’t JSON-encodable, you could set:

INTERNAL_PROTOCOL = PickleProtocol

and step output would be encoded as string-escaped pickles.

See mrjob.protocol for the full list of protocols.

MRJob.OUTPUT_PROTOCOL = <class 'mrjob.protocol.JSONProtocol'>

Protocol to use for writing output. Default: JSONProtocol.

For example, if you wanted the final output in repr, you could set:

OUTPUT_PROTOCOL = ReprProtocol

See mrjob.protocol for the full list of protocols.

MRJob.DEFAULT_INPUT_PROTOCOL = 'raw_value'

Deprecated since version 0.3.0.

Default protocol for reading input to the first mapper in your job specified by a string.

Overridden by any changes to INPUT_PROTOCOL.

See mrjob.protocol.PROTOCOL_DICT for the full list of protocol strings. Can be overridden by --input-protocol.

MRJob.DEFAULT_PROTOCOL = 'json'

Deprecated since version 0.3.0.

Default protocol for communication between steps and final output specified by a string.

Overridden by any changes to INTERNAL_PROTOCOL.

See mrjob.protocol.PROTOCOL_DICT for the full list of protocol strings. Can be overridden by --protocol.

MRJob.DEFAULT_OUTPUT_PROTOCOL = None

Deprecated since version 0.3.0.

Overridden by any changes to OUTPUT_PROTOCOL. If OUTPUT_PROTOCOL is not set, defaults to DEFAULT_PROTOCOL.

See mrjob.protocol.PROTOCOL_DICT for the full list of protocol strings. Can be overridden by --output-protocol.

MRJob.input_protocol()

Instance of the protocol to use to convert input lines to Python objects. Default behavior is to return an instance of INPUT_PROTOCOL.

MRJob.internal_protocol()

Instance of the protocol to use to communicate between steps. Default behavior is to return an instance of INTERNAL_PROTOCOL.

MRJob.output_protocol()

Instance of the protocol to use to convert Python objects to output lines. Default behavior is to return an instance of OUTPUT_PROTOCOL.

classmethod MRJob.protocols()

Deprecated in favor of INPUT_PROTOCOL, OUTPUT_PROTOCOL, and INTERNAL_PROTOCOL.

Mapping from protocol name to the protocol class to use for parsing job input and writing job output. We give protocols names so that we can easily choose them from the command line.

This returns mrjob.protocol.PROTOCOL_DICT by default.

To add a custom protocol, define a subclass of mrjob.protocol.HadoopStreamingProtocol, and re-define this method:

@classmethod
def protocols(cls):
    protocol_dict = super(MRYourJob, cls).protocols()
    protocol_dict['rot13'] = Rot13Protocol
    return protocol_dict

DEFAULT_PROTOCOL = 'rot13'

MRJob.pick_protocols(step_num, step_type)

Pick the protocol classes to use for reading and writing for the given step.

Parameters:
  • step_num (int) – which step to run (e.g. 0 for the first step)
  • step_type (str) – 'M' for mapper, 'C' for combiner, 'R' for reducer

Returns: (read_function, write_function)

By default, we use one protocol for reading input, one internal protocol for communication between steps, and one protocol for final output (which is usually the same as the internal protocol). Protocols can be controlled by setting INPUT_PROTOCOL, INTERNAL_PROTOCOL, and OUTPUT_PROTOCOL.

Re-define this if you need fine control over which protocols are used by which steps.
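The default choice described above can be approximated as follows. This is a simplified sketch under stated assumptions, not a copy of mrjob's logic: pick_protocol_names is a hypothetical function, steps are described by strings such as 'MR' or 'M' (modeled on the show_steps() output), combiners are ignored, and the function returns protocol names rather than read and write functions.

```python
def pick_protocol_names(step_descs, step_num, step_type):
    """Return (read_name, write_name) for one task.

    step_descs is a hypothetical list such as ['MR', 'M', 'R'], with one
    string per step; combiners are left out to keep the sketch small.
    """
    # Only the very first task of the job reads the job's input.
    first_task = (step_num == 0 and step_type == step_descs[0][0])
    # Only the very last task of the job writes the final output.
    last_task = (step_num == len(step_descs) - 1 and
                 step_type == step_descs[-1][-1])
    read_name = 'input' if first_task else 'internal'
    write_name = 'output' if last_task else 'internal'
    return read_name, write_name


descs = ['MR', 'M', 'R']
choices = [(n, t, pick_protocol_names(descs, n, t))
           for n, desc in enumerate(descs) for t in desc]
```

An override that uses different protocols between different steps would branch on step_num in the same way.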

Custom command-line options

See Quick Reference for a complete list of all configuration options.

MRJob.configure_options()

Define arguments for this script. Called from __init__().

Run python -m mrjob.job --help to see all options.

Re-define to define custom command-line arguments:

def configure_options(self):
    super(MRYourJob, self).configure_options()

    self.add_passthrough_option(...)
    self.add_file_option(...)
    ...

MRJob.add_passthrough_option(*args, **kwargs)

Function to create options which both the job runner and the job itself respect (we use this for protocols, for example).

Use it like you would use optparse.OptionParser.add_option():

def configure_options(self):
    super(MRYourJob, self).configure_options()
    self.add_passthrough_option(
        '--max-ngram-size', type='int', default=4, help='...')

Specify an opt_group keyword argument to add the option to that OptionGroup rather than the top-level OptionParser.

If you want to pass files through to the mapper/reducer, use add_file_option() instead.
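Because the arguments mirror optparse.OptionParser.add_option(), you can try out an option definition with optparse alone before adding it to a job. The sketch below uses the same arguments as the --max-ngram-size example above; the help text is a placeholder.

```python
from optparse import OptionParser

parser = OptionParser()
# Same arguments as the add_passthrough_option() example above.
parser.add_option('--max-ngram-size', type='int', default=4,
                  help='largest n-gram to count (placeholder text)')

# Without the flag, the default applies.
defaults, _ = parser.parse_args([])
# With the flag, the string from the command line is converted to an int.
options, _ = parser.parse_args(['--max-ngram-size', '2'])
```

optparse derives the attribute name from the flag, so inside the job the value would be read as self.options.max_ngram_size.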

MRJob.add_file_option(*args, **kwargs)

Add a command-line option that sends an external file (e.g. a SQLite DB) to Hadoop:

def configure_options(self):
    super(MRYourJob, self).configure_options()
    self.add_file_option('--scoring-db', help=...)

This does the right thing: the file will be uploaded to the working dir of the script on Hadoop, and the script will be passed the same option, but with the local name of the file in the script’s working directory.

We suggest against sending Berkeley DBs to your job, as Berkeley DB is not forwards-compatible (so a Berkeley DB that you construct on your computer may not be readable from within Hadoop). Use SQLite databases instead. If all you need is an on-disk hash table, try out the sqlite3dbm module.

MRJob.load_options(args)

Load command-line options into self.options.

Called from __init__() after configure_options().

Parameters: args (list of str) – a list of command-line arguments. None will be treated the same as [].

Re-define if you want to post-process command-line arguments:

def load_options(self, args):
    super(MRYourJob, self).load_options(args)

    self.stop_words = self.options.stop_words.split(',')
    ...

MRJob.is_mapper_or_reducer()

True if this is a mapper/reducer.

This is mostly useful inside load_options(), to disable loading options when we aren’t running inside Hadoop Streaming.

Custom command-line types and actions

MRJob.OPTION_CLASS = <class 'optparse.Option'>

optparse.Option subclass to use with the optparse.OptionParser instance.

The optparse module allows the addition of new actions and types. See the optparse docs for instructions on defining custom options. The only difference is that instead of passing option_class to the OptionParser instance yourself, you must set the MRJob.OPTION_CLASS attribute.

Passthrough arguments have the additional caveat that mrjob uses some lesser magic to reproduce the argument values for the command lines of subprocesses. In practice you shouldn’t encounter any problems here even with relatively exotic option behavior, but be aware that your options will be processed twice, with the second round using a copy of your default values produced by copy.deepcopy().
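A custom type defined the way the optparse docs describe might look like the sketch below. CSVOption, check_csv, and the --stop-words flag are hypothetical names chosen for illustration. The parser is built directly here so the snippet runs on its own; in a job you would instead set OPTION_CLASS = CSVOption on your MRJob subclass.

```python
from copy import copy
from optparse import Option, OptionParser


def check_csv(option, opt, value):
    # Type checkers receive the raw string and return the converted value.
    return value.split(',')


class CSVOption(Option):
    """Hypothetical Option subclass adding a 'csv' type."""
    TYPES = Option.TYPES + ('csv',)
    TYPE_CHECKER = copy(Option.TYPE_CHECKER)
    TYPE_CHECKER['csv'] = check_csv


# Standalone use; with mrjob, OPTION_CLASS takes the place of option_class.
parser = OptionParser(option_class=CSVOption)
parser.add_option('--stop-words', type='csv')
options, _ = parser.parse_args(['--stop-words', 'a,the'])
```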

Job runner configuration

MRJob.job_runner_kwargs()

Keyword arguments used to create runners when make_runner() is called.

Returns: map from arg name to value

Re-define this if you want finer control of runner initialization.

You might find mrjob.conf.combine_dicts() useful if you want to add or change lots of keyword arguments.
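The idea behind combining keyword-argument dictionaries is that later dictionaries override earlier ones key by key. The sketch below uses combine_dicts_sketch, a hypothetical stand-in written for illustration rather than the real mrjob.conf.combine_dicts(), and the values are made up.

```python
def combine_dicts_sketch(*dicts):
    """Hypothetical stand-in: merge dicts, later ones taking precedence."""
    result = {}
    for d in dicts:
        if d:
            result.update(d)
    return result


# Keys taken from the runner arguments named in this document.
base_kwargs = {'extra_args': ['--max-ngram-size', '4'],
               'file_upload_args': []}
custom_kwargs = {'extra_args': ['--max-ngram-size', '2']}
merged = combine_dicts_sketch(base_kwargs, custom_kwargs)
```

In an override of job_runner_kwargs() you would typically start from the dictionary returned by the superclass and layer your changes on top of it.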

MRJob.local_job_runner_kwargs()

Keyword arguments used to create runners when make_runner() is called, when we run a job locally (-r local).

Returns: map from arg name to value

Re-define this if you want finer control when running jobs locally.

MRJob.emr_job_runner_kwargs()

Keyword arguments used to create runners when make_runner() is called, when we run a job on EMR (-r emr).

Returns: map from arg name to value

Re-define this if you want finer control when running jobs on EMR.

MRJob.hadoop_job_runner_kwargs()

Keyword arguments used to create runners when make_runner() is called, when we run a job on Hadoop (-r hadoop).

Returns: map from arg name to value

Re-define this if you want finer control when running jobs on Hadoop.

MRJob.generate_passthrough_arguments()

Returns a list of arguments to pass to subprocesses, either on Hadoop or executed via subprocess.

These are passed to mrjob.runner.MRJobRunner.__init__() as extra_args.

MRJob.generate_file_upload_args()

Figure out file upload args to pass through to the job runner.

Instead of generating a list of args, we’re generating a list of tuples of ('--argname', path)

These are passed to mrjob.runner.MRJobRunner.__init__() as file_upload_args.

classmethod MRJob.mr_job_script()

Path of this script. This returns the file containing this class.

Running specific parts of jobs

MRJob.run_job()

Run all steps of the job, logging errors (and debugging output if --verbose is specified) to STDERR and streaming the output to STDOUT.

Called from run(). You’d probably only want to call this directly from automated tests.

MRJob.run_mapper(step_num=0)

Run the mapper and final mapper action for the given step.

Parameters: step_num (int) – which step to run (0-indexed)

If we encounter a line that can’t be decoded by our input protocol, or a tuple that can’t be encoded by our output protocol, we’ll increment a counter rather than raising an exception. If --strict-protocols is set, an exception is raised instead.

Called from run(). You’d probably only want to call this directly from automated tests.

MRJob.run_reducer(step_num=0)

Run the reducer for the given step.

Parameters: step_num (int) – which step to run (0-indexed)

If we encounter a line that can’t be decoded by our input protocol, or a tuple that can’t be encoded by our output protocol, we’ll increment a counter rather than raising an exception. If --strict-protocols is set, an exception is raised instead.

Called from run(). You’d probably only want to call this directly from automated tests.

MRJob.run_combiner(step_num=0)

Run the combiner for the given step.

Parameters: step_num (int) – which step to run (0-indexed)

If we encounter a line that can’t be decoded by our input protocol, or a tuple that can’t be encoded by our output protocol, we’ll increment a counter rather than raising an exception. If --strict-protocols is set, an exception is raised instead.

Called from run(). You’d probably only want to call this directly from automated tests.

MRJob.show_steps()

Print information about how many steps there are, and whether they contain a mapper or reducer. Job runners (see Runners - launching your job) use this to determine how Hadoop should call this script.

Called from run(). You’d probably only want to call this directly from automated tests.

We currently output something like MR M R, but expect this to change!

Hadoop Configuration

MRJob.HADOOP_INPUT_FORMAT = None

Optional name of a Hadoop InputFormat class, e.g. 'org.apache.hadoop.mapred.lib.NLineInputFormat'.

Passed to Hadoop with the first step of this job with the -inputformat option.

MRJob.hadoop_input_format()

Optional Hadoop InputFormat class to parse input for the first step of the job.

Normally, setting HADOOP_INPUT_FORMAT is sufficient; redefining this method is only for when you want to get fancy.

MRJob.HADOOP_OUTPUT_FORMAT = None

Optional name of a Hadoop OutputFormat class, e.g. 'org.apache.hadoop.mapred.FileOutputFormat'.

Passed to Hadoop with the last step of this job with the -outputformat option.

MRJob.hadoop_output_format()

Optional Hadoop OutputFormat class to write output for the last step of the job.

Normally, setting HADOOP_OUTPUT_FORMAT is sufficient; redefining this method is only for when you want to get fancy.

MRJob.JOBCONF = {}

Optional jobconf arguments we should always pass to Hadoop. This is a map from property name to value. e.g.:

{'stream.num.map.output.key.fields': '4'}

It’s recommended that you only use this to hard-code things that affect the semantics of your job, and leave performance tweaks to the command line or whatever you use to launch your job.

MRJob.jobconf()

-jobconf args to pass to hadoop streaming. This should be a map from property name to value.

By default, this combines jobconf options from the command lines with JOBCONF, with command line arguments taking precedence.

If you want to re-define this, it’s strongly recommended that you do something like this, so as not to inadvertently disable jobconf:

def jobconf(self):
    orig_jobconf = super(MyMRJobClass, self).jobconf()
    custom_jobconf = ...

    return mrjob.conf.combine_dicts(orig_jobconf, custom_jobconf)

MRJob.PARTITIONER = None

Optional Hadoop partitioner class to use to determine how mapper output should be sorted and distributed to reducers. For example: 'org.apache.hadoop.mapred.lib.HashPartitioner'.

MRJob.partitioner()

Optional Hadoop partitioner class to use to determine how mapper output should be sorted and distributed to reducers.

By default, returns whatever is passed to --partitioner, or if that option isn’t used, PARTITIONER.

You probably don’t need to re-define this; it’s just here for completeness.

Hooks for testing

MRJob.sandbox(stdin=None, stdout=None, stderr=None)

Redirect stdin, stdout, and stderr for automated testing.

You can set stdin, stdout, and stderr to file objects. By default, they’ll be set to empty StringIO objects. You can then access the job’s file handles through self.stdin, self.stdout, and self.stderr. See Testing with mrjob for more information about testing.

You may call sandbox multiple times (this will essentially clear the file handles).

stdin is empty by default. You can set it to anything that yields lines:

mr_job.sandbox(stdin=StringIO('some_data\n'))

or, equivalently:

mr_job.sandbox(stdin=['some_data\n'])
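The two forms are interchangeable because iterating over a file-like object and iterating over a list of strings produce the same sequence of lines. A quick check in plain Python, using io.StringIO (an assumption made so the snippet also runs on Python 3; the examples here import StringIO from the Python 2 StringIO module):

```python
from io import StringIO

# Iterating over a file-like object yields one line at a time.
from_file_object = list(StringIO(u'some_data\nmore_data\n'))
# A list of lines yields exactly the same items.
from_list = list([u'some_data\n', u'more_data\n'])
```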

For convenience, this sandbox() returns self, so you can do:

mr_job = MRJobClassToTest().sandbox()

Simple testing example:

mr_job = MRYourJob().sandbox()
assert_equal(list(mr_job.reducer('foo', ['bar', 'baz'])), [...])

More complex testing example:

from StringIO import StringIO

mr_job = MRYourJob(args=[...])

fake_input = '"foo"\t"bar"\n"foo"\t"baz"\n'
mr_job.sandbox(stdin=StringIO(fake_input))

mr_job.run_reducer(step_num=0)
assert_equal(mr_job.parse_output(), ...)
assert_equal(mr_job.parse_counters(), ...)

MRJob.parse_output(protocol=None)

Convenience method for parsing output from any mapper or reducer, all at once.

This helps you test individual mappers and reducers by calling run_mapper() or run_reducer(). For example:

mr_job.sandbox(stdin=your_input)
mr_job.run_mapper(step_num=0)
output = mr_job.parse_output()

Parameters: protocol (str) – A protocol instance to use (e.g. JSONProtocol()). Also accepts protocol names (e.g. 'json'), but this is deprecated.

This only works in sandbox mode. This does not clear self.stdout.

MRJob.parse_counters(counters=None)

Convenience method for reading counters. This only works in sandbox mode. This does not clear self.stderr.

Returns: a map from counter group to counter name to amount.

To read everything from self.stderr (including status messages) use mrjob.parse.parse_mr_job_stderr().

When writing unit tests, you may find MRJobRunner.counters() more useful.
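The shape of the returned map (counter group to counter name to amount) can be illustrated with a small parser for Hadoop streaming counter lines. parse_counter_lines is a hypothetical stand-in written for this sketch, not mrjob's parser, and it assumes each counter line has the form reporter:counter:group,counter,amount.

```python
def parse_counter_lines(stderr_lines):
    """Hypothetical stand-in: build {group: {counter: amount}} from stderr."""
    counters = {}
    prefix = 'reporter:counter:'
    for line in stderr_lines:
        line = line.rstrip('\n')
        if not line.startswith(prefix):
            continue  # skip status messages and other stderr output
        group, counter, amount = line[len(prefix):].split(',')
        group_counters = counters.setdefault(group, {})
        group_counters[counter] = group_counters.get(counter, 0) + int(amount)
    return counters


stderr_lines = [
    'reporter:counter:words,short,2\n',
    'reporter:status:still working\n',
    'reporter:counter:words,short,1\n',
    'reporter:counter:words,long,5\n',
]
counters = parse_counter_lines(stderr_lines)
```

Repeated increments of the same counter are summed, and the status line is ignored.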