Introduction to MapReduce

Introduction

Today's world produces enormous amounts of data, and processing it on a single computer becomes impractical as the scale soars. A classic problem is word count: given a very large body of text, find the number of occurrences of each word. Using multiple computers can speed up the process, but it also increases programming complexity. MapReduce helps us handle large data on a cluster of computers quickly and easily. In essence, MapReduce is both an idea and a model of data parallelism.

In this article, we are going to implement a serial program and a MapReduce parallel program for the word count problem, and compare their performance.

Algorithm Specification

Brief Introduction to MapReduce

The MapReduce model consists of two main stages: the map stage and the reduce stage.

The map stage accepts input records and transforms them into intermediate records. Let’s take the word count problem as an example. Assume that we are given the following text:

apple is red
orange is orange
banana is yellow

Before the map stage, the input text is divided into several input splits, each of which is assigned to a mapper.

("apple is red")		->	Mapper 1
("orange is orange") -> Mapper 2
("banana is yellow") -> Mapper 3

In the map stage, we split the data into (key, value) pairs. In this example, the key represents a word and the value represents one occurrence of that word.

Mapper 1	->	("apple", 1), ("is", 1), ("red", 1)
Mapper 2 -> ("orange", 1), ("is", 1), ("orange", 1)
Mapper 3 -> ("banana", 1), ("is", 1), ("yellow", 1)

Between the map stage and the reduce stage, there is a shuffle (or partition) process. Pairs with the same key are merged together, forming pairs of the form (key, &lt;set of values&gt;), and are sent to the same reducer.

("apple", <1>), ("orange", <1, 1>)	->	Reducer 1
("is", <1, 1, 1>), ("banana", <1>) -> Reducer 2
("red", <1>), ("yellow", <1>) -> Reducer 3

In the reduce stage, we reduce the set of intermediate values sharing each key to a smaller set of values. In this example, we simply sum the values in the set for each key.

Reducer 1	->	("apple", 1), ("orange", 2)
Reducer 2 -> ("is", 3), ("banana", 1)
Reducer 3 -> ("red", 1), ("yellow", 1)

After the reduce stage, we merge the results generated by the reducers.

("apple", 1)
("orange", 2)
("is", 3)
("banana", 1)
("red", 1)
("yellow", 1)

And with that the task is finished: we have found the number of occurrences of each word.
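The whole walkthrough above can be condensed into a few lines of code. Here is a minimal, single-threaded Go sketch that runs the map, shuffle, and reduce steps in sequence on the example text (the function and type names are ours, for illustration only):

```go
package main

import (
	"fmt"
	"strings"
)

type Pair struct {
	Key   string
	Value int
}

// mapPhase emits a (word, 1) pair for every word in every line.
func mapPhase(lines []string) []Pair {
	var pairs []Pair
	for _, line := range lines {
		for _, word := range strings.Fields(line) {
			pairs = append(pairs, Pair{word, 1})
		}
	}
	return pairs
}

// shuffle groups the values of identical keys together.
func shuffle(pairs []Pair) map[string][]int {
	groups := make(map[string][]int)
	for _, p := range pairs {
		groups[p.Key] = append(groups[p.Key], p.Value)
	}
	return groups
}

// reducePhase sums the grouped values for each key.
func reducePhase(groups map[string][]int) map[string]int {
	counts := make(map[string]int)
	for key, values := range groups {
		sum := 0
		for _, v := range values {
			sum += v
		}
		counts[key] = sum
	}
	return counts
}

func main() {
	lines := []string{"apple is red", "orange is orange", "banana is yellow"}
	counts := reducePhase(shuffle(mapPhase(lines)))
	fmt.Println(counts["is"], counts["orange"]) // 3 2
}
```

Of course, this sketch runs everything in one thread; the point of real MapReduce is that each phase can be spread across many workers.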

Brief Introduction to Hadoop

Despite its simple principle, MapReduce isn’t easy to implement. There are many details to consider, such as how to transmit data between computers, how to split the input, how to manage memory, and when to write data to disk. Fortunately, the Apache Software Foundation provides an excellent implementation called Hadoop. We can focus only on the map function and the reduce function, deploy Hadoop on a cluster of computers, and let the Hadoop framework handle the rest. In particular, Hadoop implements a file system called HDFS to store files and to read and write data during the MapReduce process.

Let’s see how to use Hadoop to solve the word count problem. First, we need to build a cluster of computers with Hadoop installed. We use Docker because it is convenient to deploy several containers from the same image. Then, we designate one container as the name node (which manages HDFS) and the other containers as data nodes (which store data). After that, we put the test data into HDFS and write our own map and reduce functions. Finally, we compile the program against the Hadoop library and run it with Hadoop to get the result.

To be more concrete, the map function should be defined in a subclass of Mapper. We define a class TokenizerMapper that extends Mapper&lt;Object, Text, Text, IntWritable&gt;. Here, the first two type parameters give the input pair type of the map function (&lt;Object, Text&gt;), and the last two give its output pair type (&lt;Text, IntWritable&gt;). We define a word as a string that contains only letters and apostrophes and neither starts nor ends with an apostrophe. So we can write our map function like this:

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    /* map function */
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String str = "";
        for (int i = 0; i < line.length(); i++) {
            if (Character.isLetter(line.charAt(i)) || str.length() > 0 && line.charAt(i) == '\'') {
                str += Character.toLowerCase(line.charAt(i)); // extend the current word
            } else {
                while (str.length() > 0 && str.charAt(str.length() - 1) == '\'') {
                    str = str.substring(0, str.length() - 1); // remove trailing apostrophes
                }
                if (str.length() > 0) {
                    word.set(str);
                    context.write(word, one); // output pair
                    str = "";
                }
            }
        }
        while (str.length() > 0 && str.charAt(str.length() - 1) == '\'') {
            str = str.substring(0, str.length() - 1); // remove trailing apostrophes
        }
        if (str.length() > 0) {
            word.set(str);
            context.write(word, one); // output pair
        }
    }
}

Similarly, the reduce function should be defined in a subclass of Reducer. We define a class IntSumReducer that extends Reducer&lt;Text, IntWritable, Text, IntWritable&gt;. Here, the first two type parameters give the input pair type of the reduce function (&lt;Text, IntWritable&gt;), which must match the output pair type of the map function, and the last two give its output pair type (&lt;Text, IntWritable&gt;). Note that by default, the output pairs are sorted by key, i.e., by word in lexicographical order. If we want a different order, we need extra work after reducing: instead of emitting pairs in the reduce function, we store them and then sort and emit them in the cleanup function, which runs once after a reducer has processed all of its keys. We can write our reduce function like this:

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private HashMap<String, Integer> record = new HashMap<String, Integer>(); // store counts for the final sort
    private Text word = new Text();
    private IntWritable result = new IntWritable();

    /* reduce function */
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        record.put(key.toString(), record.getOrDefault(key.toString(), 0) + sum); // update the map; don't output here
    }

    /* cleanup function executed after reducing */
    public void cleanup(Context context) throws IOException, InterruptedException {
        ArrayList<HashMap.Entry<String, Integer>> list = new ArrayList<HashMap.Entry<String, Integer>>(record.entrySet()); // collect entries from the map
        Collections.sort(list, new Comparator<HashMap.Entry<String, Integer>>() {
            public int compare(HashMap.Entry<String, Integer> o1, HashMap.Entry<String, Integer> o2) { // sort by count descending, then by word
                if (o1.getValue().equals(o2.getValue())) {
                    return o1.getKey().compareTo(o2.getKey());
                }
                return o1.getValue() < o2.getValue() ? 1 : -1;
            }
        });
        for (HashMap.Entry<String, Integer> itr : list) {
            word.set(itr.getKey());
            result.set(itr.getValue());
            context.write(word, result); // output here
        }
    }
}

A Simple Implementation of MapReduce

If we just want to process relatively small data on a multicore computer in parallel, we can implement our own MapReduce framework. We choose Golang as our programming language because it makes high concurrency easy to achieve. More importantly, communication between threads is very convenient thanks to a special Golang type called the channel.

In Golang, we can start a new thread simply with go funcName(args). We can declare a channel of strings with chanName chan string. To send something over a channel, use chanName <- something; to receive something from a channel, use something := <-chanName. Thus, after starting a worker with go funcName(inChan, outChan), we can communicate with it by sending data via inChan and receiving data from outChan.
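As a self-contained illustration of this pattern (the names worker, in, and out are just placeholders of our choosing):

```go
package main

import (
	"fmt"
	"strings"
)

// worker receives lines from in, uppercases them, and sends the results to out.
func worker(in chan string, out chan string) {
	for line := range in {
		out <- strings.ToUpper(line)
	}
	close(out) // signal that no more results are coming
}

func main() {
	in := make(chan string, 2)
	out := make(chan string, 2)
	go worker(in, out) // start a new thread (goroutine)
	in <- "apple"
	in <- "banana"
	close(in) // no more input; ends the worker's range loop
	for result := range out {
		fmt.Println(result)
	}
}
```

Closing a channel is how the sender tells the receiver that the stream is finished; the frame below instead uses a sentinel pair with an empty key for the same purpose.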

To implement map stage, for simplicity, we just split the input data by file (that is, each file will be assigned to a mapper to be processed). We can write mapper like this:

/* Frame-defined mapper */
func Mapper(config Config, filename chan string, shuffle chan Pair) {
	output := make(chan Pair, chanSize)
	merge := make(map[string][]int)
	for name := range filename {
		text, _ := os.ReadFile(name) // read file content
		go config.mapFunc(text, output)
		for pair := range output {
			if pair.key == "" { // map finished
				break
			}
			merge[pair.key] = append(merge[pair.key], pair.value) // merge in map stage
		}
	}
	for key, list := range merge {
		shuffle <- config.combineFunc(key, list) // combine local results, send to shuffle stage
	}
	shuffle <- Pair{"", 0} // mapper finished
}

Here, we merge and combine the local results in the map stage. This way we can send (key, 3) once instead of sending (key, 1) three times, which reduces data transmission and lightens the reducers' work.
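The combine step is itself just a local reduce. A sketch of what config.combineFunc could look like for word count, assuming a Pair type holding a string key and an int value as in the frame above (the exact types are our assumption):

```go
package main

import "fmt"

type Pair struct {
	key   string
	value int
}

// combineFunc collapses a key's local list of counts into a single pair,
// so the mapper sends ("is", 3) once instead of ("is", 1) three times.
func combineFunc(key string, list []int) Pair {
	sum := 0
	for _, v := range list {
		sum += v
	}
	return Pair{key, sum}
}

func main() {
	fmt.Println(combineFunc("is", []int{1, 1, 1})) // {is 3}
}
```

For word count the combine function and the reduce function happen to do the same thing (summing), which is why combining locally loses no information.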

To implement the shuffle process, we simply use a map to group pairs so that pairs with the same key end up at the same reducer. We can write the shuffle process like this:

merge := make(map[string][]int)
for pair := range shuffle { // get pairs from the map stage
	if pair.key == "" { // a mapper finished
		workerCnt--
		if workerCnt == 0 {
			break
		}
	} else {
		merge[pair.key] = append(merge[pair.key], pair.value) // merge in shuffle stage
	}
}

To implement the reduce stage, we just call the reduce function. We can write the reduce stage like this:

/* Frame-defined reducer */
func Reducer(config Config, keylist chan AggPair, collect chan Pair) {
	for aggPair := range keylist {
		collect <- config.reduceFunc(aggPair.key, aggPair.list) // send to final stage
	}
	collect <- Pair{"", 0} // reducer finished
}

Serial Algorithm

We are given a set of documents. We read them one by one and scan the content to find words, using a map (not the map in MapReduce) to store the number of occurrences of each word. After processing all documents we have the counts, and we just need to sort them in the specified order.
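A minimal serial version in Go might look like this, using the same word definition as the Hadoop mapper (letters and apostrophes, lowercased, no leading or trailing apostrophe) and the same ordering as the reducer's cleanup (count descending, then word ascending); all names here are our own:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"unicode"
)

// countWords scans text and counts each word, where a word is a run of
// letters and apostrophes that neither starts nor ends with an apostrophe.
func countWords(text string) map[string]int {
	counts := make(map[string]int)
	var sb strings.Builder
	flush := func() {
		word := strings.TrimRight(sb.String(), "'") // remove trailing apostrophes
		if word != "" {
			counts[word]++
		}
		sb.Reset()
	}
	for _, c := range text {
		if unicode.IsLetter(c) || (sb.Len() > 0 && c == '\'') {
			sb.WriteRune(unicode.ToLower(c)) // extend the current word
		} else {
			flush()
		}
	}
	flush()
	return counts
}

func main() {
	counts := countWords("Apple is red.\nOrange is orange.")
	type entry struct {
		word  string
		count int
	}
	var list []entry
	for w, c := range counts {
		list = append(list, entry{w, c})
	}
	// sort by count descending, then by word ascending
	sort.Slice(list, func(i, j int) bool {
		if list[i].count != list[j].count {
			return list[i].count > list[j].count
		}
		return list[i].word < list[j].word
	})
	for _, e := range list {
		fmt.Println(e.word, e.count)
	}
}
```

This is the baseline we compare the parallel program against: one thread, one pass over all the text, one in-memory map.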

Analysis and Comments

The main limitation is hardware. For a single computer, more hardware threads mean more mappers or reducers can run simultaneously, which increases speed. For a cluster, increasing the number of computers means more nodes, which also increases speed.

When we increase the number of documents, the serial program naturally needs more time. For the parallel program, if we do not increase the number of mappers, it will run slower because each mapper takes longer to finish. Otherwise, the running time will stay roughly the same, because the time for the map stage does not change (we have more mappers), and neither does the time for the reduce stage (the number of distinct words changes only slightly).

In short, the larger the data we want to process, the more efficient the parallel program is compared to the serial one.


Introduction to MapReduce
https://regmsif.cf/2021/07/03/coding/introduction-to-mapreduce/
Author: RegMs If
Published: July 3, 2021