It is a universal truth that our own code is perfect and that had we simply written every line of our project, every library, none of this would be happening. Let’s say that you’ve decided you’re ready to take matters into your own hands, but for some reason, your employer just isn’t into paying you to completely rewrite the kernel. What then? How in the world are you going to get your jobs to finish with other people’s wtf-segfaulting libraries in there?

The unexpected death of an otherwise properly configured Hadoop job can come from two basic failure modes: unpredictable crashes and regular uncaught exceptions. Regular uncaught exceptions are just that, regular – they happen in the same place every time but just weren’t caught. The only regular thing about unpredictable crashes is that if you run things long enough, you’ll have some.

Other People’s Code… May Have Issues

In one of our science projects, we have a few Hadoop Streaming jobs that run on Ruby and rely on libxml to parse documents. This creates a perfect storm of badness – the web is full of really bad HTML, and libxml occasionally goes into infinite loops or outright segfaults. On some documents, it always segfaults.

The interesting thing about Hadoop is that handling unpredictable crashes is built right in. Depending on how unstable things are, you might have to do some tuning to keep from retreading too much ground or giving up too soon, but you barely have to think about it. This might seem counterintuitive at first, but when you think about the redundant/commodity model, it makes sense to simply retry your failed tasks with a change of venue and hope for the best. And indeed, it pretty much works as advertised.

On the other hand, I was really surprised by the amount of magic required to just rerun tasks and skip over records that cause consistent failures. I was of course being naive, expecting some kind of “stop_failing_dammit=true” flag. After thinking about this some more, though, I get why it’s hard and not-so-well documented. The very reasonable assumption that Hadoop makes is that you catch your exceptions. As it turns out, particularly when you’re doing Streaming, stuff can just break and there’s not much you can do about it.

Handling Random Failures

To deal with random failures, all you have to do is give Hadoop enough attempts per task to have a very high likelihood of finishing the job. One not-so-scientific option, of course, is to set the number of attempts so high that the job can never fail. The more reasonable thing to do is to set it to a small multiple of FAILURE_RATE * TOTAL_RECORDS / NUMBER_OF_TASKS – that is, a small multiple of the expected number of failing records per task.
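
To make that concrete, here is a quick back-of-the-envelope calculation in Ruby. The failure rate and record counts below are made-up numbers for illustration, not measurements from our job:

# Rough sizing for max attempts – illustrative numbers only.
failure_rate    = 0.001      # say 1 in 1,000 records crashes the task
total_records   = 1_000_000
number_of_tasks = 1_000

# Expected number of failing records handled by each task.
expected_failures_per_task = failure_rate * total_records / number_of_tasks  # => 1.0

# A small multiple of that leaves headroom without retrying forever.
max_attempts = (expected_failures_per_task * 4).ceil                         # => 4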

For the map tasks, set the number of attempts with:

mapred.map.max.attempts

For the reduce tasks, set the number of attempts with:

mapred.reduce.max.attempts

If the same task fails more than the number of times specified in these settings, your job is killed.

Handling Predictable Failures

Earlier, I mentioned that some documents just cause libxml to segfault. The problem with a segfault is that it’s not an exception Ruby can catch, so the task fails every time in the same place.
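
The mapper at the end of this post fakes that kind of hard failure with exit!, which looks the same from the framework’s point of view: the process is gone before any rescue clause gets a chance to run. A minimal sketch of why wrapping the call in a rescue doesn’t save you:

begin
  # A real SIGSEGV inside a C extension like libxml takes the whole process
  # down; exit! stands in for it here because it likewise terminates
  # immediately, bypassing Ruby's exception handling and at_exit hooks.
  exit!
rescue Exception => e
  puts "never reached"
end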

Fortunately, Hadoop has a feature for skipping bad records. However, when we were wrestling with our job’s stability issues, we discovered it was not as easy as setting “fail_horribly=false” or even the real setting, “mapred.skip.mode.enabled=true”.

Particularly with Streaming, Hadoop doesn’t know exactly which record causes a task to fail, so it can’t just automagically retry the task, bypassing the offending input. There are a few things we can do to point it in the right direction and some choices we can make to weigh the value of each record against wasted work. When a task fails and skipping mode is triggered, Hadoop can essentially do a binary search on failed ranges.
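
To get a feel for that narrowing, here is a toy Ruby sketch of the bisection idea. This is purely an illustration of the strategy – the narrow_bad_range helper and the “Naughty” test data are invented for this post and are not Hadoop’s actual implementation:

# Keep bisecting the suspect range until it is no wider than max_skip,
# then give up and skip whatever is left.
def narrow_bad_range(records, suspect, max_skip)
  while suspect.size > max_skip
    mid   = suspect.begin + suspect.size / 2
    left  = (suspect.begin...mid)
    right = (mid...suspect.end)
    # "Run" one half; if it still crashes, that half is the new suspect range.
    suspect = left.any? { |i| records[i] == 'Naughty' } ? left : right
  end
  suspect # the range that will finally be skipped
end

records = ['Nice'] * 5 + ['Naughty'] + ['Nice'] * 4
p narrow_bad_range(records, (0...records.size), 1)  # => 5...6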

Triggering Skip Mode:

Hadoop will go into skip mode once the same task has failed a set number of times. The default is 2.

mapred.skip.attempts.to.start.skipping=2

Trading Records for Attempts:

Once in skip mode, we can choose to hold on to all but the bad record, toss out the entire range, or something in between. This setting caps how many records Hadoop is allowed to skip, which in turn determines how long it will keep retrying (and failing) in order to narrow the range.

mapred.skip.map.max.skip.records=1               # only skip the bad record
mapred.skip.map.max.skip.records=0               # don't go into skip mode
mapred.skip.map.max.skip.records=Long.MAX_VALUE  # don't try to narrow

Giving Hadoop Some Help:

Because there is lots of buffering going on, telling Hadoop every time a record is processed will help it figure out where things went wrong. In Streaming, this can be done by emitting the following counter:

reporter:counter:SkippingTaskCounters,MapProcessedRecords,1
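
In a Ruby mapper, that just means writing the counter line to STDERR – Streaming reserves STDOUT for your actual output records and reads counter and status updates from the error stream – which is exactly what warn does:

# Counters and status updates go to STDERR, not STDOUT.
warn "reporter:counter:SkippingTaskCounters,MapProcessedRecords,1"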

Putting it all Together

To demonstrate how all of these settings fit together, I wrote a simple Ruby script to simulate predictable failures. The input is a file with 99,990 “good” records and 10 “bad” ones. The goal is to run the job to the end.
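
The input file itself isn’t reproduced here, but a sketch like the following would generate an equivalent one. The “Nice”/“Naughty” values match the mapper below and the file name matches the -input path used further down; scattering the bad records randomly is just an assumption for illustration:

#!/usr/bin/env ruby
# Generate 99,990 "Nice" records and 10 "Naughty" ones in random order.
lines = (['Nice'] * 99_990 + ['Naughty'] * 10).shuffle
File.open('skip_records_test.txt', 'w') { |f| f.puts lines }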

Our mapper (naughty_or_nice.rb):

#!/usr/bin/env ruby
STDIN.each do |line|
  line.chomp!
  if line == 'Naughty'
    # simulate a hard failure that Ruby can't rescue
    warn "No Presents!"
    exit!
  else
    # tell Hadoop another record made it through (the counter goes to STDERR)
    warn "reporter:counter:SkippingTaskCounters,MapProcessedRecords,1"
    puts 'You get presents'
  end
end

Running our job:

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.1+169.127-streaming.jar \
  -D mapred.skip.mode.enabled=true \
  -D mapred.skip.map.max.skip.records=1 \
  -D mapred.skip.attempts.to.start.skipping=2 \
  -D mapred.map.tasks=1000 \
  -D mapred.map.max.attempts=10 \
  -D mapred.reduce.tasks=0 \
  -D mapred.job.name="Skip Bad Records Test" \
  -input  "/user/hadoop/samples/skip-bad-records/skip_records_test.txt" \
  -output "/user/hadoop/samples/skip-bad-records/output/" \
  -mapper "$APP_PATH/samples/map_reduce/naughty_or_nice.rb"

And here is our result:

Notice that we kept all of our good records and skipped the 10 “Naughty” ones.

To demonstrate the mapred.skip.map.max.skip.records setting, I tried it again allowing 2 records to be thrown out:

Notice that some “Nice” children missed out on presents because their names were listed next to “Naughty” ones. Santa did save himself some work, though.

Some Final Concerns

The setting that lets you increment the record counter manually, mapred.skip.map.auto.incr.proc.count=false, doesn’t seem to get picked up from the jobconf. It looks like it can only be set for the whole cluster, which doesn’t make it awesome for mixing native Java and Streaming jobs.

Good Luck Out There

Getting work done often depends on tolerating some amount of instability and working around some known issues. If you’re struggling to get some Hadoop jobs through, hopefully this helps.

  • Mark

    Nice article. I am finding that this setting – mapred.skip.map.max.skip.records=Long.MAX_VALUE – doesn’t work via Beeswax. The setting works with any other numerical value, but when you set it to Long.MAX_VALUE it errors out.

  • Harsh J

    That is because Long.MAX_VALUE is a constant that only makes sense inside the Java language. It compiles down to the literal number the constant represents, which is what Beeswax expects (i.e. the actual value).

  • Harsh J

    Note that the record-skipping feature built into Hadoop is limited to the old API at the moment. Support for it in the new API was deprecated, since this need is best handled within a single attempt by user code itself (with try/catch-like logic in your language of choice). The day Streaming moves onto the new API, these settings may no longer work, but that day is pretty far away.

  • Abhijith Gopal

    Nice! Is there a way to access the counter from within the mapper or reducer?