Working with Hadoop: My first MapReduce App

Most Hadoop tutorials use the wordcount application as a demo. And while it makes a fine demo, it is not particularly useful by itself. So I wanted to come up with an idea for a more useful application to run on a cluster.

My first thought was to implement the famous Sieve of Eratosthenes. But it turned out that this might not be a good first application, because I think it would need shared state between map tasks. So it's better left for another time.

So instead, I opted for the methods mentioned in Primality Test and went with the first naive approach as an example: I test each number by checking whether any number preceding it divides it (again, no optimization).

This might seem like a simple application to write, since you can basically change a couple of things in the wordcount demo. But the first thing I noticed while reading up on Hadoop is that most apps focus on the same file-based approaches, which makes sense when you consider the project’s roots in indexing. And that is a limitation when you consider the possible projects that can and do make use of distributed computing, such as medical research, cryptography work, and so on…

While Hadoop does have built-in classes for dealing easily with file-based input, you can just as easily make it handle any other kind of input you want. Most write-ups don’t actually go deep enough to tell you how to do this, so I will. Although I am still scratching the surface, I hope this will be useful to somebody out there.

So, if you just finished reading about the wordcount application, you’d think you could run the prime test by writing all the numbers you want to check into a file and then using that file as input to your MapReduce app. While this would work, why take the extra step of writing all the numbers to a file and then reading them back again? We already have to generate the numbers, so we might as well hand them straight to the map tasks.

From this point on, I assume that you’ve already read the tutorial that comes with Hadoop, so I won’t go into details on anything basic.

In a nutshell, to make the map tasks accept generated input, you have to define a new type of input format. You do that by developing a class that implements InputFormat. Along with it, you also have to implement a RecordReader and an InputSplit.

In the end, you will have the following files:

  • PrimeTest.java: Contains the MapReduce procedures.
  • PrimeTestInputFormat.java: Defines the input format, provides the input splits for the map tasks, and supplies the record reader.
  • PrimeInputSplit.java: Defines a split in the input data for each map task.
  • PrimeRecordReader.java: Responsible for returning <Key, Value> records from byte-based input splits.

This application uses a record <K, V> of type <long, long>, with both the key and the value being the number to test. The intermediate records <K’, V’> from the map tasks are of type <boolean, Long…>, with the key being the true/false primality result and the values being the numbers that produced that result; for example, 7 yields the intermediate record <true, 7> while 8 yields <false, 8>. The output records <K”, V”> from the reduce tasks are of type <boolean, long>, again the number keyed by its primality result. Not really the best use of records, but for our specific example that’s all we need.

Starting with the main file, PrimeTest.java, we define the map and reduce procedures.

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


public class PrimeTest
{
    private static final Log LOG = LogFactory.getLog("PrimeTest");

    public static class Map extends MapReduceBase implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable>
    {
        private final static BooleanWritable truePrime = new BooleanWritable(true);
        private final static BooleanWritable falsePrime = new BooleanWritable(false);
    
        public void map(LongWritable key, LongWritable value, OutputCollector<BooleanWritable, LongWritable> output, Reporter reporter) throws IOException
        {
            long suspectPrimeValue = value.get();
            boolean isComposite = false;

            // naive trial division: try every candidate divisor below the number (no optimizations)
            for(long i = 2; i < suspectPrimeValue - 1 ; i++)
            {
                if(suspectPrimeValue % i == 0)
                {
                    output.collect(falsePrime, value);
                    isComposite = true;
                    break;
                }
            }
            
            if(!isComposite)
            {
                output.collect(truePrime, value);
            }
            System.out.println(suspectPrimeValue);
        }
    }
    
    public static class Reduce extends MapReduceBase implements Reducer<BooleanWritable, LongWritable, BooleanWritable, LongWritable>
    {
        public void reduce(BooleanWritable key, Iterator<LongWritable> values, OutputCollector<BooleanWritable, LongWritable> output, Reporter reporter) throws IOException
        {
            while(values.hasNext())
            {
                LongWritable vCheck = values.next();
                output.collect(key, vCheck);
            }
        
        }

    }
    
    public static void main(String[] args) throws Exception
    {
        JobConf conf = new JobConf(PrimeTest.class);
        conf.setJobName("primetest");
    
        conf.setOutputKeyClass(BooleanWritable.class);
        conf.setOutputValueClass(LongWritable.class);
    
        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);
    
        // our custom input format generates the input itself, so no input path is set
        conf.setInputFormat(PrimeTestInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
    
        conf.setOutputPath(new Path(args[0]));
    
        JobClient.runJob(conf);
    }
}

The map procedure is a direct implementation of the prime test algorithm, so it is quite straightforward. So is our reduce procedure, which just passes the values through to the output collector.

You’ll notice that we define our own input format and that we do not need to define an input path since our input is generated.

Now we need to implement the input format class, which looks like this:

import java.util.ArrayList;

import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RecordReader;


public class PrimeTestInputFormat implements InputFormat<LongWritable, LongWritable>
{
    public RecordReader<LongWritable, LongWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
    {
        return new PrimeRecordReader((PrimeInputSplit)split);
    }

    public InputSplit[] getSplits(JobConf job, int numSplits)
    {
        long startingNumber = 2; //from 
        long endingNumber = 265; // to, exclusive

        // divide the [startingNumber, endingNumber) range evenly across numSplits splits;
        // any remainder is tacked onto the last split
        long numbersInSplit = (long)Math.floor((endingNumber - startingNumber)/numSplits);
        long startingNumberInSplit = startingNumber;
        long endingNumberInSplit = startingNumberInSplit + numbersInSplit;
        long remainderInLastSplit = (endingNumber - startingNumber) - numSplits*numbersInSplit;

        ArrayList<PrimeInputSplit> splits = new ArrayList<PrimeInputSplit>(numSplits);

        for(int i = 0; i < numSplits - 1; i++)
        {
            splits.add(new PrimeInputSplit(startingNumberInSplit, endingNumberInSplit));
            startingNumberInSplit = endingNumberInSplit;
            endingNumberInSplit = startingNumberInSplit + numbersInSplit;
        }

        //add last split, with remainder if any
        splits.add(new PrimeInputSplit(startingNumberInSplit, endingNumberInSplit + remainderInLastSplit));

        return splits.toArray(new PrimeInputSplit[splits.size()]);
    }

    public void validateInput(JobConf conf)
    {
        //valid input since we are generating it
    }

}

The only interesting part of this class is the getSplits() method, which splits the input data to feed the map tasks. One thing to note here is that the “from” and “to” numbers are hardcoded; it would have been better to expose them as job properties and retrieve them via the JobConf. Also, the input generation is naive and makes many assumptions.
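
For example, it could look something like this (a sketch only, not part of the posted source; the property names are invented for this illustration):

// In PrimeTest.main(), before submitting the job:
conf.setLong("primetest.range.start", 2);
conf.setLong("primetest.range.end", 265);

// In PrimeTestInputFormat.getSplits(), replacing the hardcoded values:
long startingNumber = job.getLong("primetest.range.start", 2);   // from
long endingNumber = job.getLong("primetest.range.end", 265);     // to, exclusive

Both setLong() and getLong() come from the Configuration class that JobConf extends, so no extra plumbing is needed.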

Looking at the PrimeTestInputFormat class, you’ll also notice that we’re referencing a PrimeInputSplit class and a PrimeRecordReader, so we need to implement those as well.

As mentioned earlier, the PrimeInputSplit class represents a byte view of a particular data split. In our case, this class holds the starting and ending numbers for each split. It could look like this:

import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;

import org.apache.hadoop.mapred.InputSplit;

public class PrimeInputSplit implements InputSplit
{
    private long m_StartNum;
    private long m_EndNum;

    PrimeInputSplit(){}

    public PrimeInputSplit(long p_Start, long p_End)
    {
        this.m_StartNum = p_Start;
        this.m_EndNum = p_End;
    }

    public long getLength()
    {
        // approximate size of the split in bytes: one 8-byte long per number in the range
        return (this.m_EndNum - this.m_StartNum) * 8;
    }

    public String[] getLocations() throws IOException
    {
        return new String[]{};
    }

    public void readFields(DataInput in) throws IOException
    {
        this.m_StartNum = in.readLong();
        this.m_EndNum = in.readLong();
    }

    public void write(DataOutput out) throws IOException
    {
        out.writeLong(this.m_StartNum);
        out.writeLong(this.m_EndNum);
    }

    public long getStartNum()
    {
        return this.m_StartNum;
    }

    public long getEndNum()
    {
        return this.m_EndNum;
    }


}

Next, we implement the record reader, which translates the byte-view input split into records for the map task.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.RecordReader;

public class PrimeRecordReader implements RecordReader<LongWritable, LongWritable>
{
    private long m_End;
    private long m_Index;
    private long m_Start;

    public PrimeRecordReader(PrimeInputSplit split)
    {
        this.m_End = split.getEndNum();
        this.m_Index = split.getStartNum(); //index at starting number of split
        this.m_Start = split.getStartNum();
    }

    public LongWritable createKey()
    {
        return new LongWritable();
    }

    public LongWritable createValue()
    {
        return new LongWritable();
    }

    public void close(){}

    public float getProgress()
    {
        // guard against a zero-length split (avoids dividing by zero below)
        if(this.m_Start == this.m_End)
        {
            return 0.0f;
        }
        else
        {
            return Math.min(1.0f, (this.m_Index - this.m_Start) / (float)(this.m_End - this.m_Start));
        }
    }

    public long getPos()
    {
        return this.m_End - this.m_Index;
    }

    public boolean next(LongWritable key, LongWritable value)
    {
        if(this.m_Index < this.m_End)
        {
            key.set(this.m_Index);
            value.set(this.m_Index);
            this.m_Index++;
    
            return true;
        }
        else
        {
            return false;
        }
    }
}

And that’s about it. Just jar up the application and deploy it on Hadoop.

There are a couple of interesting things to try from here. First up would be to implement some optimizations in the prime testing function. Another would be a better way to generate input splits, for example to accommodate sparse data. Another thing I’d like to try sometime is a multi-threaded application with the same functionality, and then compare its performance against a small non-uniform cluster. Of course, the overhead of Hadoop on a single-node cluster means the local app would be faster, so I am interested in observing how much this overhead affects performance; spreading the work over a couple of machines should balance things out. At least in theory… so we’ll see.
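
For the first of those ideas, here is one possible optimization of the primality check; it is not part of the posted source, just a sketch of the standard trick of testing divisors only up to the square root of the candidate and skipping even numbers:

// Sketch of an optimized primality check (not the code used above):
// handle 2 separately, then test only odd divisors up to sqrt(n).
private static boolean isPrime(long n)
{
    if(n < 2) return false;          // 0 and 1 are not prime
    if(n == 2) return true;          // the only even prime
    if(n % 2 == 0) return false;     // all other even numbers are composite

    for(long i = 3; i * i <= n; i += 2)
    {
        if(n % i == 0)
        {
            return false;            // found a divisor, so n is composite
        }
    }
    return true;
}

The map method could then simply collect either truePrime or falsePrime based on the result of this check instead of running the full loop itself.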

The source files are available here. (You can also directly deploy this jar.)

Comments and feedback are welcome.

  • Abhishek Jain

    Thanks a ton. I am a newbe to Hadoop MapReduce and was looking for an example for custom InputFormat.

    -abhishek

  • http://www.ualberta.ca/~hwsamuel Hamman Samuel

    You said “Although i am still scratching the surface, i hope this would be useful to somebody out there”. Yes, it helped me a lot, thanks :)

  • http://xmariachi.blogspot.com xmariachi

    Hi,
    it did help. It was not easy to find an example for a custom format.
    I actually was looking for a simple LongWritable,LongWritable instead of the preset LongWritable, Text that the KeyValueTextInputFormat forces to use, so I got into hadoop code and copied/adapted the KeyValueTextInputFormat and KeyValueLineRecordReader.
    It was easy, but that allowed to make simpler the specification of formats. Othewise I had to convert from Text to LongWritable all the time in the hadoop code which is ugly and confusing for the reader (as it happened to me first! Didnt understand why Longs were specified as Text, so I though it was another thing!)
    Thanks

  • aeolist

    thanks dude, huge help :)

  • astronomino

    THANK YOU !!!!
    I need to use Hadoop MapReduce for a school project and I’m glad I’ve finally found a normal example. The official documentation blabbers on and on for hundreds of pages and countless video hours about how MapReduce works and how endless is the choice of programming languages and scripts that can be used. Yet there isn’t a single piece of useful info other than that word counter which lacks too many features.

  • Dhyan

    Great article, thanks a lot.

  • lovecpp

    a useful article!
    But how to use Hadoop0.20ApI to code PrimeRecordReader?

  • Rajgopal

    Hi.. Nice article.. I have a few doubts, in PrimeTestInputFormat.java, there is a int numSplits argument. what factor decides the value of the argument ?

  • Maha

    Thanks a lot :) Although, I’m a little suspicious about:

    //add last split, with remainder if any // Is there an error?? start+remainder
    splits.add(new PrimeInputSplit(startingNumberInSplit,
    endingNumberInSplit + remainderInLastSplit));

    because endingNumerInSplit is already updated in the loop above it to be starting+NumInSplit … So that line should be:

    splits.add(new PrimeInputSplit(startingNumberInSplit,
    startingNumberInSplit + remainderInLastSplit));

    Right?

    Anyways, thanks again, I’ve been looking for this for days !!

  • Maha Alabduljalil

    another question …

    What happens if you write: return splits.toArray() ;
    instead of : return splits.toArray(new PrimeInputSplit[splits.size()]);

    in the getSplits of the PrimeTextInputSplit class ?

  • ranjini

    hi ,
    I am using hadoop 0.20 version and java 1.6.
    I need to process xml via mapreduce program. But i am having issues.
    I have put JDOM 1.0 jar but still ClassNotFound Exception of JDOM Exception and SAX Builder coming pl help to fix the issue.