Writing a MapReduce Program and Using MRUnit to Test It

Unit testing is essential when writing MapReduce jobs (in Java or with Streaming). Let’s imagine we want to write a m/r job that outputs, for each letter, the average length of the words in a text starting with that letter.


The Algorithm

Mapper

For each word in the line, emit the first letter of the word as the key and the length of the word as the value.

Reducer

Because of the sort/shuffle phase built in to MapReduce, the Reducer receives each key (the initial letter) together with the list of lengths of all the words starting with that letter,

and the output is the letter together with the average of those lengths.

N> the Mapper’s output value type differs from the Reducer’s: INT vs DOUBLE.
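Before touching Hadoop code, the algorithm can be sketched in plain Java to make the map → shuffle → reduce flow concrete. The class and method names below are hypothetical, for illustration only; a TreeMap plays the role of the shuffle’s grouping:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (hypothetical names, no Hadoop) of the algorithm:
// map each word to (initial letter, length), group by key as the
// shuffle phase would, then reduce each group to an average.
class AvgWordLengthSketch {

    // Simulates map + shuffle: groups word lengths by initial letter.
    static Map<String, List<Integer>> mapAndShuffle(String line) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String word : line.split("\\W+")) {
            if (word.isEmpty()) continue;
            String initial = word.substring(0, 1).toUpperCase();
            grouped.computeIfAbsent(initial, k -> new ArrayList<>())
                   .add(word.length());          // emit (letter, length)
        }
        return grouped;
    }

    // Simulates one reduce call: average of the grouped lengths.
    static double reduce(List<Integer> lengths) {
        int sum = 0;
        for (int len : lengths) sum += len;
        return sum / (double) lengths.size();
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> grouped = mapAndShuffle("Now is the time");
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            System.out.println(e.getKey() + " " + reduce(e.getValue()));
        }
        // prints: I 2.0, N 3.0, T 3.5 (keys sorted by the TreeMap)
    }
}
```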

Implementation

Define the Driver… This class should configure and submit your basic job. Among the basic steps here, configure the job with the Mapper class and the Reducer class you will write, and the data types of the intermediate and final keys.

Define the Mapper… For each word, use

str.substring(0, 1) // String : first letter of str

str.length() // int : length of str

Define the Reducer… In a single invocation the Reducer receives a string containing one letter along with an iterator of integers. For this call, the reducer should emit a single output of the letter and the average of the integers.

Test your program… Compile, jar, and run it.
the Driver

 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvgWordLength {

    public static void main(String[] args) throws Exception {

        Job job = new Job();
        job.setJarByClass(AvgWordLength.class);
        job.setJobName("Average Word Length");

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(LetterMapper.class);
        job.setReducerClass(AverageReducer.class);

        /*
         * The mapper's output keys and values have different data types than
         * the reducer's output keys and values. Therefore, you must call the
         * setMapOutputKeyClass and setMapOutputValueClass methods.
         */
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        /*
         * Specify the job's output key and value classes.
         */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        job.waitForCompletion(true);
    }
}

the Mapper

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LetterMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        for (String word : line.split("\\W+")) {
            String wordInitial = word.substring(0, 1).toUpperCase();
            Integer wordLength = word.length();
            context.write(new Text(wordInitial), new IntWritable(wordLength));
        }
    }
}

the Reducer

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        double averageValues = 0;
        int countValues = 0;
        int sumValues = 0;
        for (IntWritable val : values) {
            sumValues += val.get();
            countValues++;
        }
        if (countValues != 0) {
            averageValues = sumValues / (double) countValues;
        }
        context.write(key, new DoubleWritable(averageValues));
    }
}
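The cast to double in the reducer is not cosmetic: with two int operands Java performs integer division and silently truncates the average. A tiny illustration, with hypothetical helper names:

```java
// Why the reducer casts to double before dividing: with two int
// operands Java performs integer division and truncates the result.
class AverageCastDemo {

    static double average(int sum, int count) {
        return sum / (double) count;   // real division: 7 / 2 -> 3.5
    }

    static double truncated(int sum, int count) {
        return sum / count;            // int division first: 7 / 2 -> 3.0
    }

    public static void main(String[] args) {
        System.out.println(average(7, 2));   // prints 3.5
        System.out.println(truncated(7, 2)); // prints 3.0
    }
}
```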

N> we can run into an Exception for empty words…

Checking the job task history confirms the failure; we fix the code by checking the length of the word first:

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    String line = value.toString();
    for (String word : line.split("\\W+")) {
        Integer wordLength = word.length();
        if (wordLength > 0) {
            String wordInitial = word.substring(0, 1).toUpperCase();
            context.write(new Text(wordInitial), new IntWritable(wordLength));
        }
    }
}
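To see where the empty words come from: Java's String.split("\\W+") keeps a leading empty token when the line starts with a non-word character, and substring(0, 1) on an empty string throws StringIndexOutOfBoundsException. A quick check (the class name is illustrative):

```java
// Shows why the length guard in the mapper is needed: a line that
// starts with a non-word character yields a leading empty token.
class EmptyWordDemo {

    public static void main(String[] args) {
        String[] tokens = " hello world".split("\\W+");
        System.out.println(tokens.length);         // prints 3
        System.out.println("[" + tokens[0] + "]"); // prints [] -- empty token
        // tokens[0].substring(0, 1) would throw StringIndexOutOfBoundsException
    }
}
```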

We can then check the results…

@localhost log_file_analysis]$ hadoop fs -cat /user/training/avg0_0/part-r-00000 | grep "A\|E\|I\|O\|U"
A 3.275899648342265
E 5.307238813182565
I 2.1290417039114753
O 2.8046206567868732
U 4.588696504410324

for the vowels only…

Writing Unit Tests With the MRUnit Framework

Rather than this time-consuming trial-and-error approach, we can use MRUnit (http://mrunit.apache.org/) to test the m/r program against a known set of inputs/outputs and make sure the code works as expected.

We need to test the Mapper, the Reducer, and the Driver; the approach is unit testing, so we can isolate the data set to test and check the results.

Let’s see an example for the typical wordcount and then for the example of the average word length…

The first thing is to import the classes used in the Unit test

import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

Given a m/r job, we define a Test class as

public class TestWordCount {

First let’s define the objects used for the test; these depend on the nature of the m/r job. In this case we have

Map input K, V = LongWritable, Text

Reduce input K, [V] = Text, [IntWritable]

MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

Let’s set up the test. This method will be called before every test; what we need to do is to instantiate the 3 driver classes above and set the Mapper, the Reducer, and the MapReduceDriver (which handles the first two).

@Before
public void setUp() {
    /*
     * Set up the mapper test harness.
     */
    WordMapper mapper = new WordMapper();
    mapDriver = new MapDriver<LongWritable, Text, Text, IntWritable>();
    mapDriver.setMapper(mapper);

    /*
     * Set up the reducer test harness.
     */
    SumReducer reducer = new SumReducer();
    reduceDriver = new ReduceDriver<Text, IntWritable, Text, IntWritable>();
    reduceDriver.setReducer(reducer);

    /*
     * Set up the mapper/reducer test harness.
     */
    mapReduceDriver = new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable>();
    mapReduceDriver.setMapper(mapper);
    mapReduceDriver.setReducer(reducer);
}

Now we can test the mapper, the reducer, and the whole pipeline individually; for the reducer we define an ArrayList to simulate the list of values associated with the key.

/*
 * Test the mapper.
 */
@Test
public void testMapper() {
    /*
     * For this test, the mapper's input will be "cat cat dog"
     */
    mapDriver.withInput(new LongWritable(1), new Text("cat cat dog"));

    /*
     * The expected output is "cat 1", "cat 1", and "dog 1".
     */
    mapDriver.withOutput(new Text("cat"), new IntWritable(1));
    mapDriver.withOutput(new Text("cat"), new IntWritable(1));
    mapDriver.withOutput(new Text("dog"), new IntWritable(1));

    /*
     * Run the test.
     */
    mapDriver.runTest();
}

/*
 * Test the reducer.
 */
@Test
public void testReducer() {
    /*
     * For this test, the reducer's input will be "cat 1 1".
     */
    List<IntWritable> values = new ArrayList<IntWritable>();
    values.add(new IntWritable(1));
    values.add(new IntWritable(1));
    reduceDriver.withInput(new Text("cat"), values);

    /*
     * The expected output is "cat 2".
     */
    reduceDriver.withOutput(new Text("cat"), new IntWritable(2));

    /*
     * Run the test.
     */
    reduceDriver.runTest();
}

/*
 * Test the mapper and reducer working together.
 */
@Test
public void testMapReduce() {
    /*
     * For this test, the mapper's input will be "1 | cat cat dog".
     */
    mapReduceDriver.withInput(new LongWritable(1), new Text("cat cat dog"));

    /*
     * The expected output (from the reducer) is "cat 2", "dog 1".
     */
    mapReduceDriver.addOutput(new Text("cat"), new IntWritable(2));
    mapReduceDriver.addOutput(new Text("dog"), new IntWritable(1));

    /*
     * Run the test.
     */
    mapReduceDriver.runTest();
}
}

The pattern is thus:

Mapper … using

withInput

withOutput

Reducer … using

withInput // takes an ArrayList of values

withOutput

MapReduceDriver … using

withInput

addOutput
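To make the pattern concrete outside MRUnit, a stripped-down harness with the same fluent shape might look like this. TinyMapDriver is a hypothetical class sketched here for illustration; it is not part of MRUnit:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical miniature of the withInput/withOutput/runTest pattern:
// record the input and the expected outputs, run the function under
// test, and compare actual to expected.
class TinyMapDriver<I, O> {
    private I input;
    private final List<O> expected = new ArrayList<>();
    private final Function<I, List<O>> mapper;

    TinyMapDriver(Function<I, List<O>> mapper) { this.mapper = mapper; }

    TinyMapDriver<I, O> withInput(I in) { input = in; return this; }
    TinyMapDriver<I, O> withOutput(O out) { expected.add(out); return this; }

    // Returns true when the mapper's actual output equals the expected list.
    boolean runTest() {
        return mapper.apply(input).equals(expected);
    }

    public static void main(String[] args) {
        TinyMapDriver<String, String> d =
                new TinyMapDriver<>(s -> java.util.Arrays.asList(s.split(" ")));
        boolean ok = d.withInput("cat dog").withOutput("cat").withOutput("dog").runTest();
        System.out.println(ok); // prints true
    }
}
```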

Example for the job that calculates the average length of words…

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

public class TestAvgWordLength {

    MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
    ReduceDriver<Text, IntWritable, Text, DoubleWritable> reduceDriver;
    MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, DoubleWritable> mapReduceDriver;

    @Before
    public void setUp() {
        LetterMapper mapper = new LetterMapper();
        mapDriver = new MapDriver<LongWritable, Text, Text, IntWritable>();
        mapDriver.setMapper(mapper);
        AverageReducer reducer = new AverageReducer();
        reduceDriver = new ReduceDriver<Text, IntWritable, Text, DoubleWritable>();
        reduceDriver.setReducer(reducer);
        mapReduceDriver = new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, DoubleWritable>();
        mapReduceDriver.setMapper(mapper);
        mapReduceDriver.setReducer(reducer);
    }

    @Test
    public void testMapper() {
        mapDriver.withInput(new LongWritable(1), new Text("a test try"));
        mapDriver.withOutput(new Text("A"), new IntWritable(1));
        mapDriver.withOutput(new Text("T"), new IntWritable(4));
        mapDriver.withOutput(new Text("T"), new IntWritable(3));
        // !!! keys are upper-cased by design
        mapDriver.runTest();
    }

    /*
     * Test the reducer.
     */
    @Test
    public void testReducer() {
        List<IntWritable> values = new ArrayList<IntWritable>();
        values.add(new IntWritable(3));
        values.add(new IntWritable(4));
        reduceDriver.withInput(new Text("T"), values);
        reduceDriver.withOutput(new Text("T"), new DoubleWritable(3.5));
        reduceDriver.runTest();
    }

    /*
     * Test the mapper and reducer working together.
     */
    @Test
    public void testMapReduce() {
        mapReduceDriver.withInput(new LongWritable(1),
                new Text("bed ark bronx"));
        mapReduceDriver.addOutput(new Text("A"), new DoubleWritable(3.0));
        mapReduceDriver.addOutput(new Text("B"), new DoubleWritable(4.0));
        // !!! output is sorted by key
        /*
         * Run the test.
         */
        mapReduceDriver.runTest();
    }
}

N> note the different types used in the generic definitions; always keep the Mapper and Reducer key/value types clear.
