Writing a MapReduce Program and using MRUnit to test it

Unit testing is essential when writing m/r jobs (in Java or with Streaming). Let's imagine we want to write a m/r job that, for each initial letter, outputs the average length of the words in the text that start with that letter.

Ex: for the line "a test try" the job should output A 1.0 and T 3.5 ("test" and "try" both start with T and their average length is 3.5).

The Algorithm

Mapper

For each word in the line, emit the first letter of the word (upper-cased) as the key and the length of the word as the value.

Reducer

Because of the sort/shuffle phase built into MapReduce, the Reducer receives each key (a letter) together with the list of lengths of all the words starting with that letter,

and the output is the letter and the average of those lengths.

N> the Mapper output value type differs from the Reducer output value type: the Mapper emits an int (the word length, as an IntWritable) while the Reducer emits a double (the average, as a DoubleWritable).

Implementation

Define the Driver… This class should configure and submit your basic job. Among the basic steps here, configure the job with the Mapper class and the Reducer class you will write, and with the data types of the intermediate and final keys and values.

Define the Mapper… two useful String methods:

str.substring(0, 1) // String : first letter of str

str.length() // int : length of str

Define the Reducer… In a single invocation the Reducer receives a string containing one letter along with an iterator of integers. For this call, the reducer should emit a single output of the letter and the average of the integers.

Test your program: compile, jar, and run it.
the Driver

 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvgWordLength {

  public static void main(String[] args) throws Exception {

    Job job = new Job();
    job.setJarByClass(AvgWordLength.class);
    job.setJobName("Average Word Length");

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(LetterMapper.class);
    job.setReducerClass(AverageReducer.class);

    /*
     * The mapper's output keys and values have different data types than
     * the reducer's output keys and values. Therefore, you must call the
     * setMapOutputKeyClass and setMapOutputValueClass methods.
     */
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    /*
     * Specify the job's output key and value classes.
     */
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);

    job.waitForCompletion(true);
  }
}

the Mapper

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LetterMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  @Override
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    String line = value.toString();
    for (String word : line.split("\\W+")) {
      String wordInitial = word.substring(0, 1).toUpperCase();
      Integer wordLength = word.length();
      context.write(new Text(wordInitial), new IntWritable(wordLength));
    }
  }
}

the Reducer

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {

    double averageValues = 0;
    int countValues = 0;
    int sumValues = 0;
    for (IntWritable val : values) {
      sumValues += val.get();
      countValues++;
    }
    if (countValues != 0) {
      averageValues = sumValues / (double) countValues;
    }
    context.write(key, new DoubleWritable(averageValues));
  }
}

N> we can run into an Exception for empty words (substring(0, 1) throws a StringIndexOutOfBoundsException on a zero-length token)…

as we can see by checking the job task history.

We can fix this by making the Mapper check the length of the word first:

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

  String line = value.toString();
  for (String word : line.split("\\W+")) {
    Integer wordLength = word.length();
    if (wordLength > 0) {
      String wordInitial = word.substring(0, 1).toUpperCase();
      context.write(new Text(wordInitial), new IntWritable(wordLength));
    }
  }
}

we can check the results…

@localhost log_file_analysis]$ hadoop fs -cat /user/training/avg0_0/part-r-00000 | grep "A\|E\|I\|O\|U"
A 3.275899648342265
E 5.307238813182565
I 2.1290417039114753
O 2.8046206567868732
U 4.588696504410324

for the vowels only…

Writing Unit Tests With the MRUnit Framework

Rather than this time-consuming trial-and-error approach, we can use MRUnit (http://mrunit.apache.org/) to test the m/r program against a known set of inputs and expected outputs, to be sure the code works as expected.

We need to test the Mapper, the Reducer and the Driver; the approach is unit testing, so we can isolate a small data set, run it through the code and check the results.

Let's see an example for the typical word count, and then one for the average word length job…

The first thing is to import the classes used in the Unit test

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

Given a m/r job, we define a test class as

public class TestWordCount {

First let's define the objects used for the test; their generic type parameters depend on the nature of the m/r job. In this case the Mapper takes LongWritable/Text pairs and emits Text/IntWritable pairs, and the Reducer takes a Text key with a list of IntWritable values and emits Text/IntWritable pairs.

MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

Let's set up the test. This method is called before every test; we need to instantiate the three drivers above and set the Mapper and Reducer on them (the MapReduceDriver handles both).

@Before
public void setUp() {
  /*
   * Set up the mapper test harness.
   */
  WordMapper mapper = new WordMapper();
  mapDriver = new MapDriver<LongWritable, Text, Text, IntWritable>();
  mapDriver.setMapper(mapper);
  /*
   * Set up the reducer test harness.
   */
  SumReducer reducer = new SumReducer();
  reduceDriver = new ReduceDriver<Text, IntWritable, Text, IntWritable>();
  reduceDriver.setReducer(reducer);
  /*
   * Set up the mapper/reducer test harness.
   */
  mapReduceDriver = new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable>();
  mapReduceDriver.setMapper(mapper);
  mapReduceDriver.setReducer(reducer);
}

Now we can test the mapper, the reducer and the full map/reduce flow individually;
for the reducer we define an ArrayList to simulate the list of values associated with a key.

/*
 * Test the mapper.
 */
@Test
public void testMapper() {
  /*
   * For this test, the mapper's input will be "cat cat dog"
   */
  mapDriver.withInput(new LongWritable(1), new Text("cat cat dog"));
  /*
   * The expected output is "cat 1", "cat 1", and "dog 1".
   */
  mapDriver.withOutput(new Text("cat"), new IntWritable(1));
  mapDriver.withOutput(new Text("cat"), new IntWritable(1));
  mapDriver.withOutput(new Text("dog"), new IntWritable(1));
  /*
   * Run the test.
   */
  mapDriver.runTest();
}

/*
 * Test the reducer.
 */
@Test
public void testReducer() {
  /*
   * For this test, the reducer's input will be "cat 1 1".
   */
  List<IntWritable> values = new ArrayList<IntWritable>();
  values.add(new IntWritable(1));
  values.add(new IntWritable(1));
  reduceDriver.withInput(new Text("cat"), values);
  /*
   * The expected output is "cat 2"
   */
  reduceDriver.withOutput(new Text("cat"), new IntWritable(2));
  /*
   * Run the test.
   */
  reduceDriver.runTest();
}

/*
 * Test the mapper and reducer working together.
 */
@Test
public void testMapReduce() {
  /*
   * For this test, the mapper's input will be "1 | cat cat dog"
   */
  mapReduceDriver.withInput(new LongWritable(1), new Text("cat cat dog"));
  /*
   * The expected output (from the reducer) is "cat 2", "dog 1".
   */
  mapReduceDriver.addOutput(new Text("cat"), new IntWritable(2));
  mapReduceDriver.addOutput(new Text("dog"), new IntWritable(1));
  /*
   * Run the test.
   */
  mapReduceDriver.runTest();
}
}

So the pattern is:

Mapper … using

withInput

withOutput

Reducer … using

withInput // passing an ArrayList of values

withOutput

MapReduceDriver (mapper + reducer together) … using

withInput

addOutput
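The same pattern can also be written in MRUnit's fluent style, chaining the with* calls. Here is a minimal sketch reusing the WordMapper above; it drops into the TestWordCount class (which already has the required imports), and the method name testMapperFluent is just illustrative:

@Test
public void testMapperFluent() throws Exception {
  new MapDriver<LongWritable, Text, Text, IntWritable>()
      .withMapper(new WordMapper())
      .withInput(new LongWritable(1), new Text("cat cat dog"))
      .withOutput(new Text("cat"), new IntWritable(1))
      .withOutput(new Text("cat"), new IntWritable(1))
      .withOutput(new Text("dog"), new IntWritable(1))
      .runTest();
}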

Example: testing the job that calculates the average word length…

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

public class TestWordCount {

  MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
  ReduceDriver<Text, IntWritable, Text, DoubleWritable> reduceDriver;
  MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, DoubleWritable> mapReduceDriver;

  @Before
  public void setUp() {
    LetterMapper mapper = new LetterMapper();
    mapDriver = new MapDriver<LongWritable, Text, Text, IntWritable>();
    mapDriver.setMapper(mapper);
    AverageReducer reducer = new AverageReducer();
    reduceDriver = new ReduceDriver<Text, IntWritable, Text, DoubleWritable>();
    reduceDriver.setReducer(reducer);
    mapReduceDriver = new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, DoubleWritable>();
    mapReduceDriver.setMapper(mapper);
    mapReduceDriver.setReducer(reducer);
  }

  /*
   * Test the mapper.
   */
  @Test
  public void testMapper() {
    mapDriver.withInput(new LongWritable(1), new Text("a test try"));
    // the mapper upper-cases the initial letter by design
    mapDriver.withOutput(new Text("A"), new IntWritable(1));
    mapDriver.withOutput(new Text("T"), new IntWritable(4));
    mapDriver.withOutput(new Text("T"), new IntWritable(3));
    mapDriver.runTest();
  }

  /*
   * Test the reducer.
   */
  @Test
  public void testReducer() {
    List<IntWritable> values = new ArrayList<IntWritable>();
    values.add(new IntWritable(3));
    values.add(new IntWritable(4));
    reduceDriver.withInput(new Text("T"), values);
    reduceDriver.withOutput(new Text("T"), new DoubleWritable(3.5));
    reduceDriver.runTest();
  }

  /*
   * Test the mapper and reducer working together.
   */
  @Test
  public void testMapReduce() {
    mapReduceDriver.withInput(new LongWritable(1), new Text("bed ark bronx"));
    // the output keys come out sorted after the shuffle
    mapReduceDriver.addOutput(new Text("A"), new DoubleWritable(3.0));
    mapReduceDriver.addOutput(new Text("B"), new DoubleWritable(4.0));
    /*
     * Run the test.
     */
    mapReduceDriver.runTest();
  }
}

N> note the different types used in the generic definitions; always keep clear which key/value types belong to the Mapper and which to the Reducer.

Using Hackpad as an unstructured daily todo list

I have been using Todoist a lot to keep all my todo tasks for a while; the only thing I don't like about this type of tool is that they have been designed following a strictly structured approach…

The reality is that my tasks, and your tasks, are never that well organised, so adding too many boundaries and too much structure can end up being counterproductive… things change, you need to add notes and ideas, mark what is done and focus on what to do next…

I've been using Hackpad for a few weeks now and I really like the Markdown syntax (especially if you come from a console world)…

So how can you use it to keep things effective with very little effort? I thought the best way is to give you a visual example and let you find your own way…

Here are some screenshots of the steps to build it…

 

View Unstructured TODO List on Hackpad.

Plan Big, an agile pomodoro spreadsheet

I've seen too many colleagues starting their day without spending 10 minutes thinking upfront about what to do during the day, ending up with loads of wasted time and confusion… honestly, it's much better to start your day with a thought about what you've done, what you need to do today and what you plan to achieve tomorrow; in a team this is essential to have a scope and the feeling of working towards something meaningful… I cannot understand why these simple steps are still not a dogma in so many companies… are we supposed to be slaves of incompetent PMs/team leaders forever, making projects fail just for a lack of common sense?

You can use the Google doc below to help yourself; it takes 2 minutes of your time and it will help you to be more focused and to think about what you are working on…

to plan your daily tasks – https://docs.google.com/spreadsheet/ccc?key=0AsSPt8G9vLMjdDlzMk1DdlNkQl90TzhYZm1RV1dlZUE&usp=sharing

1st you clone it into your own Google docs
2nd you update it with the daily tasks and how long you think each is going to take, considering

image

1 = 25 min commitment ( a pomodoro ) 
so for each task you update the column DONE 
 
image
Note the time is updated every minute and the state of Estimated Done changes when you change the DONE column; in this way you get a real-time view of how long it is going to take before you finish your daily tasks.
 
Last but not least, you might save the daily sheet to keep track of your week: just add a new sheet for each week and copy and paste the columns at the end of the day, so you can switch off your brain and enjoy your free time.

The MapReduce paradigm on the shell

 

Hadoop is hdfs + m/r; MapReduce is a programming paradigm that everyone can use, so let's get the idea behind it using a simple approach.

Let's get a text file to process on the shell; for example we can use War & Peace:

http://onlinebooks.library.upenn.edu/webbin/gutbook/lookup?num=2600

One approach to get some sort of counting on the text is using the well-known grep command; here it counts the lines containing a specified word:

grep "Napoleon " 2600.txt | wc -l

image

The concept of map is a function that works on key-value pairs; in our case we can consider the key to be the row number (or the offset from the beginning of the file) and the value the row itself.

This simple script (map1.sh) scans each line and emits "Napoleon,1" for every occurrence of the word:

#!/bin/bash
while read line ; do
  for word in $line ; do
    if [ "$word" = "Napoleon" ] ; then
      echo "Napoleon,1"
    fi
  done
done

N> don't forget to make the script executable: chmod u+rwx map1.sh

Getting something like this:

image

Note that each occurrence is emitted with a 1; this is quite common practice: when we find a specific value and we want to count occurrences (a sum), we emit a 1 per match and add them up later.
Now let's define the reducer (red1.sh) that will sum up what has been emitted by the mapper:

#!/bin/bash
kcount=0
while read line ; do
  if [ "$line" = "Napoleon,1" ] ; then
    let kcount=kcount+1
  fi
done

echo "Napoleon, $kcount"

Piping the mapper output into the reducer (e.g. ./map1.sh < 2600.txt | ./red1.sh) we get the number of occurrences of the word:

image

N> the value is slightly bigger because grep counted lines, while here we are counting occurrences (some lines probably contain more than one occurrence).

Parallelism
The important idea is that the problem can be decomposed and parallelized without affecting the result: finding tokens in a file, row by row, is a computation that can be done independently; we can process each row separately and then sum up all the occurrences found… and the result will still be correct.

On Hadoop, we can slice the input file across several datanodes (assuming the input is very big, as usually happens in big-data problems) and the algorithm will still work; in this way we can scale out just by adding nodes to the cluster.

In our case we used a simple mapper looking for a single word; the natural generalization is to run X mappers over the N different keys we want to compute and allocate N reducers to do the counting on the mapper output. The X mappers split the input into different subsets and process it in roughly size/X time, and each reducer then receives the mapper output for one of the 1..N keys (words) we want to count and produces its result.

image

A closer look at hdfs

 

I am continuing to up-skill on Hadoop & NoSQL in general, so this post might be useful if you're doing the same…

Hdfs is optimized for streaming operations, to achieve high performance when processing huge amounts of data; the trade-off is that it is poor at random seeks, but because of the way the data is meant to be processed this is not a real limitation.
The model is intrinsically write-once: files are written once and read many times, not updated in place; because the same blocks are not re-read (in streaming processing) no local cache is needed. Failure handling is a core feature: for example, even if one rack of machines has communication issues, Hadoop is still able to keep m/r jobs going (with degraded performance, as data probably needs to be re-replicated).

Namenode and datanode

The machines in a cluster can supply different services; consider them as nodes that can take on different responsibilities over time. Hdfs uses a master-slave model: the namenode is a traffic coordinator, providing a single namespace for the whole filesystem (which is of course spread across the cluster), while the datanodes are where the data is actually located; the namenode looks after any datanode that fails (i.e. becomes unreachable or goes down).
Let's see a diagram of a client using hdfs: the client contacts the namenode, the namenode checks the status of the datanodes and then the datanodes start to stream the data requested by the client.

image

Note the secondary namenode is not a failover node: it checkpoints the namenode's filesystem metadata, so if the namenode goes down the checkpoint can be used to restart the process; it usually runs on a separate server.

Filesystem Namespace

Hdfs supports the majority of unix filesystem commands (create, move, cp etc.); these operations go through the namenode rather than accessing the datanodes directly. Some features, such as hard/soft links, are not supported as they are not relevant in the hdfs world.
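As a small sketch of how these namespace operations look from a Java client (assuming a cluster reachable with the default Configuration; the paths and the class name NamespaceOps are just examples), everything goes through the FileSystem API and therefore through the namenode:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class NamespaceOps {

  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    fs.mkdirs(new Path("/user/training/demo"));                                   // create a directory
    fs.rename(new Path("/user/training/demo"), new Path("/user/training/demo2")); // move/rename
    fs.delete(new Path("/user/training/demo2"), true);                            // delete (recursively)
  }
}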

Block replication
Because of the streaming nature of hdfs and the quantity of data involved, each data block is 64 MB (by default).

If we have a file of this size:

image

Hdfs will break it into 5 blocks (assuming the size is 64*4 + X MB); each block is replicated 3 times (by default), so the namenode will spread the 5 blocks over the available datanodes:

image

The namenode maintains an in-memory table of where each block is replicated, so in case of failure it will instruct a free datanode to replicate the data from the other 2 available copies.
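We can ask the namenode for this information through the FileSystem API. A minimal sketch, assuming a reachable cluster and an existing hdfs file path passed as the first argument (the class name BlockInfo is just illustrative):

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockInfo {

  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FileStatus status = fs.getFileStatus(new Path(args[0]));
    System.out.println("block size : " + status.getBlockSize());
    System.out.println("replication: " + status.getReplication());
    // the namenode reports which datanodes hold each block of the file
    for (BlockLocation block : fs.getFileBlockLocations(status, 0, status.getLen())) {
      System.out.println("offset " + block.getOffset() + " -> " + Arrays.toString(block.getHosts()));
    }
  }
}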

Reading hdfs

image

let's see the steps when a client reads from hdfs.
Step 1
The client calls the distributed filesystem, which asks the namenode to figure out on which datanodes in the cluster the blocks we want to read are located. Don't forget that data is replicated, so we expect a list of datanodes holding copies of the data, from which we can read.
Step 2
The list of datanodes is ordered by proximity to the client; this makes sense when trying to minimize wait time. The namenode learns this topology information as the cluster is built (nodes added/removed).
Step 3
An FSDataInputStream is returned to the client; this is an object that abstracts the whole process.
Step 4
The client can now read the data from the datanodes; the data is streamed to the client.
Step 5
Connections to the datanodes are opened and closed during this process as the data is read from the cluster; don't forget it is a distributed system.
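Here is a minimal client-side sketch of these steps, assuming the file exists in hdfs and the cluster is reachable with the default Configuration (the class name HdfsRead is just illustrative): open() goes through the namenode to locate the blocks, and the returned FSDataInputStream then streams the data from the datanodes.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsRead {

  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // steps 1-3: the namenode is asked for the block locations and a stream is returned
    FSDataInputStream in = fs.open(new Path(args[0]));
    // steps 4-5: the data is streamed from the datanodes
    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
    String line;
    while ((line = reader.readLine()) != null) {
      System.out.println(line);
    }
    reader.close();
  }
}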
Writing to hdfs
Writing is basically the mirror image; the main differences are that there is a pipeline of datanodes as the data is replicated across (3) of them, we have an FSDataOutputStream, and there is an acknowledgement phase so the namenode can update its internal references and be sure the data is correctly replicated.
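And the mirror image for a write, as a minimal sketch assuming write permission on the target path (the class name HdfsWrite is just illustrative): create() asks the namenode where to place the blocks and the FSDataOutputStream pushes the bytes down the datanode pipeline.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWrite {

  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataOutputStream out = fs.create(new Path(args[0]));
    out.writeBytes("hello hdfs\n"); // bytes travel down the replication pipeline, acks come back
    out.close();                    // close waits for the acknowledgements
  }
}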

References

Buy this http://www.informit.com/store/hadoop-fundamentals-livelessons-video-training-9780133392821

Exploring the world of Hadoop Q/A

In this post, some Q/A useful for exploring the Hadoop world; they can also be used for a first interview, so make use of them…

??? What does H(adoop) allow you to do, at a high level ??? It manages huge volumes of data, allowing big problems to be broken down into smaller elements so the analysis can be done in parallel, quickly and cost-effectively, scaling out when necessary.

??? Which are the 2 main components of H ??? The Hadoop distributed filesystem, HDFS (the storage cluster), and the MapReduce engine that implements the data processing.

??? Can you add new nodes on the fly ??? Yes, H can manage the scale-out dynamically, and when a node is down it can be replaced transparently to the user; the machines are called commodity hardware as they have no specialised hw.

??? Can you update data in hdfs ??? No, data is written once and read many times; that's one reason hdfs is not POSIX compliant. You can append data, though (this is what the HBase db relies on).

??? what's a block ??? when a file is bigger than the configured block size, it is split into blocks that are spread across the DataNodes (DNs)

??? how do blocks relate to DNs ??? DNs are servers that hold the blocks of a given set of files; they also perform data validation

??? what does the NameNode do (regarding the DataNode) ??? it manages file access, reads, writes, replication of data blocks across DataNodes.

??? what is a filesystem namespace ??? the complete collection of all the files in a cluster

??? who is smarter, the DataNode or the NameNode ??? the NameNode: it communicates with the DataNodes, checking their status, looking for issues and managing access to them. It is usually provided with loads of RAM and redundancy.

??? what does it mean that DataNodes are resilient ??? data blocks are replicated across multiple DataNodes; this replication and the related mechanisms provide high availability of the data

??? what is a Rack ID and what is it used for ??? hdfs is rack-aware: to keep replication efficient the NN uses an id to keep track of where the DNs are physically located (it's faster to copy data within the same rack than between 2 different ones)

??? what are heartbeats ??? they are messages used by the NN to check that the DNs are healthy; a DN that does not send a heartbeat is removed for a while by the NN from the list of working DNs; all of this is transparent to the user (the DN might come back later if communication is re-established)

??? how does hdfs ensure integrity ??? it uses transaction logs (to keep track of operations, necessary for rebuilding data) and checksum validation (to ensure the data is valid and not corrupted)

??? what is the block metadata and where is it kept ??? it's a detailed description of when each file was created and accessed, where its blocks are stored, how many DNs and files are on the cluster, transaction log info; it's stored on the NN and loaded into RAM

??? what is a data pipeline ??? it's a connection between multiple DataNodes that supports data transfer; the data in a block is forwarded from one DN to the next to create the replicas

??? what is the rebalancer ??? it's an hdfs service that balances data across the DNs and avoids traffic congestion

??? can you draw the workflow for a 2-node cluster ???

files from hdfs –>

input to InputFormat –> (Split)* –> (RR)* –> (Map)* –> Partitioner –>

shuffle process, communicating with the 2nd node to collect data (if any) –> Sort –> Reduce –> OutputFormat –>

files are written to hdfs
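To make the pluggable pieces in the workflow above concrete, here is a minimal driver sketch showing where InputFormat, Partitioner and OutputFormat are configured on a Job; the classes used are the stock Hadoop ones (and are in fact the defaults), and the class name WorkflowWiring is just illustrative:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class WorkflowWiring {

  public static void main(String[] args) throws Exception {
    Job job = new Job();
    job.setInputFormatClass(TextInputFormat.class);   // InputFormat -> (Split)* -> (RR)*
    job.setPartitionerClass(HashPartitioner.class);   // decides which reducer receives each key
    job.setOutputFormatClass(TextOutputFormat.class); // its RecordWriter writes the results back to hdfs
  }
}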

??? what is the functionality of the InputFormat and RecordReader ??? they convert the input file of the m/r program we want to run into something that can be processed (key-value pairs)

??? what is an InputSplit ??? the InputFormat processes the input file and can decide to split it into pieces (splits); a RR is then assigned to each split so the data can be turned into key-value pairs and processed by the map function

??? in a m/r job do we have a relation between key-value pairs and map ??? yes, the map function is invoked once for each input key-value pair

??? what does the OutputCollector do ??? it collects the output from the independent map tasks instantiated to process the input

??? what happens after all the map tasks are completed ??? the intermediate results are gathered into partitions; a shuffle phase occurs, sorting the output so the reducers can process it efficiently

??? can we start the Reduce phase before all the mapping is done ??? No

??? what does the RecordWriter do ??? it writes the reducers' output, via the OutputFormat, to hdfs

Simple Job search on GitHub & LiVe

When looking for a new role it's always frustrating and time-wasting to check job boards and recruiter websites. A cool approach is web scraping (http://en.wikipedia.org/wiki/Web_scraping), which means downloading all the job details from a website and collecting them; this technique is usually used in web marketing/web campaigns etc., but you need to know the structure of the page and how to pull the relevant html tags from it.

As I need to keep an eye on the Irish market (and sometimes abroad), I spent a few hours putting together a simple ws project you can download here: http://github.com/mamatucci/JobSites/

It's a dummy ASP.NET page with a list of options (job sites) and an IFRAME where the actual web page is loaded, so you can reload the sites in sequence with a click on the list on the right, rather than going to each one and entering the keyword and (optionally) the region/city to search.

Usage examples

The usage is easy: fill in the two options What and Where, select a job site in the right column, and click Reload.

Then click on another web site on the right and the page will reload it with the same what and where selection … saving you a lot of time…

(note the Region is kept in the address in this example, as the ws supports it in the search query)

Open in a new page  careerjet looking for [ETL,Ireland]

How to add a new site?

The procedure is very easy; let’s do an example for http://www.cwjobs.co.uk/

Put XXX in the keyword field (it will be mapped to _What_ in the 1st screen) and YYY in the region field (mapped to _Where_); you get something like this:

http://www.cwjobs.co.uk/JobSearch/Results.aspx?Keywords=XXX&LTxt=YYY&Radius=5

Now copy and paste it and use the string to update the job panel at the bottom:

Cwjobs    |http://www.cwjobs.co.uk/JobSearch/Results.aspx?Keywords=XXX&amp;LTxt=YYY&amp;Radius=5   |UK|true

  

Note> substitute the & with &amp; as this is an xml doc so you need to escape that …

QueryString Note

Websites that use session state rather than the query string will not work, as you cannot use this approach with them…

Dump pane

A time-saving function is the Dump panel, which opens all the websites in one go…

and scrolling down

N> I updated the ws list in the previous panel to show results only for two…

Update AppHarbor

I've found an interesting platform to deploy ASP.NET pages in the cloud:

https://appharbor.com/

So I deployed the project on it…

image

You can use it on the web without needing to install anything ;)

http://obar1jobs.apphb.com/

I hope it helps!

Mario