MIT 6.824 Lab 1: MapReduce Lab Notes

  1. Before starting, it is recommended to read the MapReduce paper and the MIT 6.824 Lecture 1 material on MapReduce.

  2. This lab is not recommended on Windows! (Go's plugin package, which the lab depends on, does not support Windows.)

  3. This is not a complete MapReduce implementation: the Map and Reduce functions (WordCount) and the RPC plumbing are already provided. Following the given example, you implement the master/worker communication that completes the word-count job.
  4. It also helps to first understand Go's net/rpc call flow and Go plugin (shared library) compilation.

Analysis of the official example

git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824
cd 6.824
cd src/main
go build -buildmode=plugin ../mrapps/wc.go
go run mrsequential.go wc.so pg*.txt
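
The -buildmode=plugin build drops wc.so into the current directory (src/main), which mrsequential.go then loads at run time via Go's plugin package; the pg*.txt files are the Project Gutenberg e-books used as input.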
  1. First, notice that the course has already provided very plain map and reduce functions, in ../mrapps/wc.go:
//
// The map function is called once for each input file. The first
// argument is the name of the input file; the second is the file's
// complete contents. You should ignore the input file name and look
// only at the contents argument. The return value is a slice of
// key/value pairs.
//
func Map(filename string, contents string) []mr.KeyValue {
	// function to detect word separators.
	ff := func(r rune) bool { return !unicode.IsLetter(r) }

	// split contents into an array of words.
	words := strings.FieldsFunc(contents, ff)

	kva := []mr.KeyValue{}
	for _, w := range words {
		kv := mr.KeyValue{w, "1"}
		kva = append(kva, kv)
	}
	return kva
}

//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
//
func Reduce(key string, values []string) string {
	// return the number of occurrences of this word.
	return strconv.Itoa(len(values))
}
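
To make the contract concrete, here is what these two functions produce for a tiny, made-up input (the file name x.txt is illustrative, and Map ignores it anyway):

kva := Map("x.txt", "a b a")
// kva == []mr.KeyValue{{"a", "1"}, {"b", "1"}, {"a", "1"}}

count := Reduce("a", []string{"1", "1"})
// count == "2": the word "a" occurred twice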
  2. The driver, mrsequential.go, simply runs Map and Reduce in sequence (read each input file, call map, collect and sort the intermediate pairs in memory, call reduce once per distinct key, write the result). It involves neither master/worker communication nor the shuffle between map and reduce; those are the focus of the rest of this lab!
package main

//
// simple sequential MapReduce.
//
// go run mrsequential.go wc.so pg*.txt
//

import "fmt"
import "../mr"
import "plugin"
import "os"
import "log"
import "io/ioutil"
import "sort"

// for sorting by key.
type ByKey []mr.KeyValue

// sorting strategy for a slice of KeyValue.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

func main() {
	if len(os.Args) < 3 {
		fmt.Fprintf(os.Stderr, "Usage: mrsequential xxx.so inputfiles...\n")
		os.Exit(1)
	}
	// dynamically load the map and reduce functions
	mapf, reducef := loadPlugin(os.Args[1])

	//
	// read each input file,
	// pass it to Map,
	// accumulate the intermediate Map output.
	//

	// since this is just a demo, all key/value pairs go into a single
	// slice; there is no shuffle step.
	intermediate := []mr.KeyValue{}
	for _, filename := range os.Args[2:] {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		content, err := ioutil.ReadAll(file)
		if err != nil {
			log.Fatalf("cannot read %v", filename)
		}
		file.Close()
		// call the map function to produce a slice of KeyValue
		kva := mapf(filename, string(content))
		intermediate = append(intermediate, kva...)
	}

	//
	// a big difference from real MapReduce is that all the
	// intermediate data is in one place, intermediate[],
	// rather than being partitioned into NxM buckets.
	//

	// sort the pairs as soon as they are all read
	sort.Sort(ByKey(intermediate))

	// all reduce output goes into this one file
	oname := "mr-out-0"
	ofile, _ := os.Create(oname)

	//
	// call Reduce on each distinct key in intermediate[],
	// and print the result to mr-out-0.
	//
	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		// hand the key and its value list [v...] to the reduce function
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

		i = j
	}

	ofile.Close()
}

//
// load the application Map and Reduce functions
// from a plugin file, e.g. ../mrapps/wc.so
//
func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
	p, err := plugin.Open(filename)
	if err != nil {
		log.Fatalf("cannot load plugin %v", filename)
	}
	xmapf, err := p.Lookup("Map")
	if err != nil {
		log.Fatalf("cannot find Map in %v", filename)
	}
	mapf := xmapf.(func(string, string) []mr.KeyValue)
	xreducef, err := p.Lookup("Reduce")
	if err != nil {
		log.Fatalf("cannot find Reduce in %v", filename)
	}
	reducef := xreducef.(func(string, []string) string)

	return mapf, reducef
}
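
One detail worth noting in loadPlugin: plugin.Lookup returns a plugin.Symbol, which is just an interface{}, so the type assertions must match the exported functions' signatures exactly; a mismatch panics at startup rather than failing gracefully.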

OK. With the official example's flow analyzed, let's design a more complete MapReduce.

Thoughts on the MapReduce design

What the course provides

  1. Instead of finishing the whole job in mrsequential.go, the work is split between a master and workers.

    1. mrmaster.go feeds in the input files and starts the master, e.g. go run mrmaster.go pg-*.txt

      func main() {
      	if len(os.Args) < 2 {
      		fmt.Fprintf(os.Stderr, "Usage: mrmaster inputfiles...\n")
      		os.Exit(1)
      	}
      	// start the master node: listen on the network, hand out tasks
      	m := mr.MakeMaster(os.Args[1:], 10)
      	for m.Done() == false {
      		time.Sleep(time.Second)
      	}

      	time.Sleep(time.Second)
      }
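
      Note the hard-coded second argument to MakeMaster: 10 is nReduce, the number of reduce tasks, and therefore also the number of mr-out-* files the job produces.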
    2. mrworker.go starts a worker node, which asks for map and reduce tasks to execute and reports status back, e.g. go run mrworker.go wc.so

      func main() {
      	if len(os.Args) != 2 {
      		fmt.Fprintf(os.Stderr, "Usage: mrworker xxx.so\n")
      		os.Exit(1)
      	}

      	mapf, reducef := loadPlugin(os.Args[1])

      	// the core is here: the call to mr.Worker
      	mr.Worker(mapf, reducef)
      }

      //
      // load the application Map and Reduce functions
      // from a plugin file, e.g. ../mrapps/wc.so
      //
      func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
      	p, err := plugin.Open(filename)
      	if err != nil {
      		log.Fatalf("cannot load plugin %v", filename)
      	}
      	xmapf, err := p.Lookup("Map")
      	if err != nil {
      		log.Fatalf("cannot find Map in %v", filename)
      	}
      	mapf := xmapf.(func(string, string) []mr.KeyValue)
      	xreducef, err := p.Lookup("Reduce")
      	if err != nil {
      		log.Fatalf("cannot find Reduce in %v", filename)
      	}
      	reducef := xreducef.(func(string, []string) string)

      	return mapf, reducef
      }
    3. The RPC plumbing is already implemented: the master's server() method and the worker's call() helper.

      //
      // start a thread that listens for RPCs from worker.go
      //
      func (m *Master) server() {
      	rpc.Register(m)
      	rpc.HandleHTTP()
      	//l, e := net.Listen("tcp", ":1234")
      	sockname := masterSock()
      	os.Remove(sockname)
      	l, e := net.Listen("unix", sockname)
      	if e != nil {
      		log.Fatal("listen error:", e)
      	}
      	go http.Serve(l, nil)
      }

      //
      // send an RPC request to the master, wait for the response.
      // usually returns true.
      // returns false if something goes wrong.
      //
      func call(rpcname string, args interface{}, reply interface{}) bool {
      	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
      	sockname := masterSock()
      	c, err := rpc.DialHTTP("unix", sockname)
      	if err != nil {
      		log.Fatal("dialing:", err)
      	}
      	defer c.Close()

      	err = c.Call(rpcname, args, reply)
      	if err == nil {
      		return true
      	}

      	fmt.Println(err)
      	return false
      }
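
      As written, a failed dial log.Fatals the worker process, so a worker whose master has already exited simply dies; a failed Call (for example, the master shutting down mid-request) makes call return false, which the Worker loop further below uses as its exit condition.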

What do we need to implement on top of this?

  1. Hand the map and reduce work to the worker nodes; the master only dispatches tasks and records their state.
  2. With N workers we must handle how intermediate files are stored and handed off, i.e. the shuffle. There is no GFS here, only the local file system, so each map task partitions its output by ihash(key) % nReduce into files named by map task id and bucket id; one map task therefore produces nReduce intermediate files. Each reduce task then collects every intermediate file whose bucket id equals its own reduce id (see the naming sketch below).
  3. The master and workers communicate over RPC.
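
A minimal sketch of that naming scheme, matching the mr-tmp-X-Y convention the worker code below actually uses (intermediateName is a hypothetical helper, not part of the lab skeleton):

// map task m writes bucket b to the file mr-tmp-<m>-<b>
func intermediateName(mapTaskId, bucket int) string {
	return "mr-tmp-" + strconv.Itoa(mapTaskId) + "-" + strconv.Itoa(bucket)
}

// each key/value pair lands in bucket ihash(kv.Key) % nReduce;
// reduce task r later reads mr-tmp-*-<r> across all map tasks.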

What we need to implement

  1. master: define the state used for RPC communication

    type Master struct {
    	// Your definitions here.
    }
  2. master: define the handler for a worker requesting a task

    func (m *Master) Work(args *WorkArgs, reply *WorkReply) error {}
  3. master: define the handler for a worker completing a task

    func (m *Master) Commit(args *CommitArgs, reply *CommitReply) error {}
  4. master: define the initial state at startup

    func MakeMaster(files []string, nReduce int) *Master {
    	m := Master{}
    	m.server()
    	return &m
    }
  5. worker: request and execute MapReduce tasks

    func Worker(mapf func(string, string) []KeyValue,
    	reducef func(string, []string) string) {

    }
  6. rpc: define the parameters sent over the wire

    type WorkArgs struct {
    	WorkerId string
    }

    type WorkReply struct {
    	IsFinished bool

    	TaskId       int
    	Filename     string
    	MapReduce    string
    	BucketNumber int
    }

    type CommitArgs struct {
    	WorkerId  string
    	TaskId    int
    	MapReduce string
    }

    type CommitReply struct {
    	IsOK bool
    }
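
    With these types in place, one round trip from a worker looks roughly like this (a sketch of the calls the full worker below makes; "worker-1" is a placeholder id):

    args := WorkArgs{WorkerId: "worker-1"}
    reply := WorkReply{}
    if !call("Master.Work", &args, &reply) { // ask the master for a task
    	return
    }
    // ... run the map or reduce task described in reply ...
    commitArgs := CommitArgs{WorkerId: args.WorkerId, TaskId: reply.TaskId, MapReduce: reply.MapReduce}
    commitReply := CommitReply{}
    call("Master.Commit", &commitArgs, &commitReply) // report completion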

Implementation

  • rpc.go
package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

//
// example to show how to declare the arguments
// and reply for an RPC.
//

type ExampleArgs struct {
	X int
}

type ExampleReply struct {
	Y int
}

// Add your RPC definitions here.

// arguments for a work request
type WorkArgs struct {
	WorkerId string // a random id the worker generates as its unique identifier
}

// reply to a work request
type WorkReply struct {
	IsFinished bool

	TaskId       int    // the task id assigned by the master
	Filename     string // the file this task should process (map tasks only)
	MapReduce    string // whether this is a "map" or a "reduce" task, as a plain string
	BucketNumber int    // for map: how many buckets to partition output into; for reduce: how many map outputs to read
}

// arguments for a task-completion report
type CommitArgs struct {
	WorkerId  string
	TaskId    int
	MapReduce string
}

// reply to a task-completion report
type CommitReply struct {
	IsOK bool
}

// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the master.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func masterSock() string {
	s := "/var/tmp/824-mr-"
	s += strconv.Itoa(os.Getuid())
	return s
}
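
On a typical Linux machine masterSock() evaluates to something like /var/tmp/824-mr-1000 (the suffix is your uid), which also means the master and its workers must run on the same machine, as the same user.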
  • master.go
package mr

import (
	"context"
	"errors"
	"log"
	"sync"
	"time"
)
import "net"
import "os"
import "net/rpc"
import "net/http"

// task states: idle, in progress, committed
const (
	TaskIdle = iota
	TaskWorking
	TaskCommit
)

type Master struct {
	// Your definitions here.
	// input file list
	files []string
	// number of reduce tasks
	nReduce int

	// per-task state of the map and reduce tasks
	mapTasks    []int
	reduceTasks []int
	// number of committed map tasks
	mapCount int

	// per-worker completion state
	workerCommit map[string]int
	allCommited  bool
	// per-task timeout
	timeout time.Duration
	// lock protecting the fields above
	mu sync.RWMutex
}

// Your code here -- RPC handlers for the worker to call.
// a worker requests a task
func (m *Master) Work(args *WorkArgs, reply *WorkReply) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	// hand out a map task
	for k, v := range m.files {
		if m.mapTasks[k] != TaskIdle {
			continue
		}
		// the RPC conveys the task through these reply fields
		reply.TaskId = k
		reply.Filename = v
		reply.MapReduce = "map"
		reply.BucketNumber = m.nReduce
		reply.IsFinished = false
		m.workerCommit[args.WorkerId] = TaskWorking
		m.mapTasks[k] = TaskWorking

		ctx, _ := context.WithTimeout(context.Background(), m.timeout)
		go func() {
			select {
			case <-ctx.Done():
				{
					m.mu.Lock()
					// requeue a task whose worker timed out
					if m.workerCommit[args.WorkerId] != TaskCommit && m.mapTasks[k] != TaskCommit {
						m.mapTasks[k] = TaskIdle
						log.Println("[Error]:", "worker:", args.WorkerId, "map task:", k, "timeout")
					}
					m.mu.Unlock()
				}
			}
		}()
		return nil
	}

	// hand out a reduce task (only once every map task has committed)
	for k, v := range m.reduceTasks {
		if m.mapCount != len(m.files) {
			return nil
		}
		if v != TaskIdle {
			continue
		}

		reply.TaskId = k
		reply.Filename = ""
		reply.MapReduce = "reduce"
		reply.BucketNumber = len(m.files)
		reply.IsFinished = false
		m.workerCommit[args.WorkerId] = TaskWorking
		m.reduceTasks[k] = TaskWorking

		ctx, _ := context.WithTimeout(context.Background(), m.timeout)
		go func() {
			select {
			case <-ctx.Done():
				{
					m.mu.Lock()
					if m.workerCommit[args.WorkerId] != TaskCommit && m.reduceTasks[k] != TaskCommit {
						m.reduceTasks[k] = TaskIdle
						log.Println("[Error]:", "worker:", args.WorkerId, "reduce task:", k, "timeout")
					}
					m.mu.Unlock()
				}
			}
		}()

		log.Println("a worker", args.WorkerId, "apply a reduce task:", *reply)

		return nil
	}

	// check whether every worker has finished
	for _, v := range m.workerCommit {
		if v == TaskWorking {
			reply.IsFinished = false
			return nil
		}
	}
	// nothing left to dispatch: the job is done
	reply.IsFinished = true
	return errors.New("worker apply but no tasks to dispatch")
}

// a worker reports a completed task
func (m *Master) Commit(args *CommitArgs, reply *CommitReply) error {
	log.Println("a worker", args.WorkerId, "commit a "+args.MapReduce+" task:", args.TaskId)
	m.mu.Lock()

	switch args.MapReduce {
	case "map":
		{
			m.mapTasks[args.TaskId] = TaskCommit
			m.workerCommit[args.WorkerId] = TaskCommit
			m.mapCount++
		}
	case "reduce":
		{
			m.reduceTasks[args.TaskId] = TaskCommit
			m.workerCommit[args.WorkerId] = TaskCommit
		}
	}
	m.mu.Unlock()

	log.Println("current", m.mapTasks, m.reduceTasks)

	// check whether all map and reduce tasks have committed
	for _, v := range m.mapTasks {
		if v != TaskCommit {
			return nil
		}
	}

	for _, v := range m.reduceTasks {
		if v != TaskCommit {
			return nil
		}
	}

	m.allCommited = true
	log.Println("all tasks completed")
	return nil
}

//
// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
//
func (m *Master) Example(args *ExampleArgs, reply *ExampleReply) error {
	log.Println("a worker")
	reply.Y = args.X + 1
	return nil
}

//
// start a thread that listens for RPCs from worker.go
//
func (m *Master) server() {
	rpc.Register(m)
	rpc.HandleHTTP()
	//l, e := net.Listen("tcp", ":1234")
	sockname := masterSock()
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}

//
// main/mrmaster.go calls Done() periodically to find out
// if the entire job has finished.
//
func (m *Master) Done() bool {
	// Your code here.
	return m.allCommited
}

//
// create a Master.
// main/mrmaster.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeMaster(files []string, nReduce int) *Master {
	// Your code here.
	m := Master{
		files:        files,
		nReduce:      nReduce,
		mapTasks:     make([]int, len(files)),
		reduceTasks:  make([]int, nReduce),
		workerCommit: make(map[string]int),
		allCommited:  false,
		timeout:      10 * time.Second,
	}

	log.Println("[init] with:", files, nReduce)

	m.server()
	return &m
}
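
One judgment call above: context.WithTimeout plus a goroutine blocking on ctx.Done() works, but it discards the CancelFunc (go vet flags this) and keeps a goroutine alive for the full timeout even after the task commits. A slightly simpler equivalent sketch for the same requeue-on-timeout logic inside Work, under the same locking rules:

time.AfterFunc(m.timeout, func() {
	m.mu.Lock()
	defer m.mu.Unlock()
	// requeue map task k unless it committed in time
	if m.workerCommit[args.WorkerId] != TaskCommit && m.mapTasks[k] != TaskCommit {
		m.mapTasks[k] = TaskIdle
	}
})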
  • worker.go
package mr

import (
	"crypto/rand"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"sort"
	"strconv"
	"time"
)
import "log"
import "net/rpc"
import "hash/fnv"

//
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
	Key   string
	Value string
}

// for sorting by key.
type ByKey []KeyValue

// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// generate a worker id: a 32-bit timestamp plus random bytes
func genWorkerID() (uuid string) {
	// generate 32 bits timestamp
	unix32bits := uint32(time.Now().UTC().Unix())

	buff := make([]byte, 12)

	numRead, err := rand.Read(buff)

	if numRead != len(buff) || err != nil {
		panic(err)
	}
	// note: no trailing "\n" here -- the id is used as a map key and in logs
	return fmt.Sprintf("%x-%x-%x-%x-%x-%x", unix32bits, buff[0:2], buff[2:4], buff[4:6], buff[6:8], buff[8:])
}


//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// Your worker implementation here.

	// uncomment to send the Example RPC to the master.
	// CallExample()
	workId := genWorkerID()
	retry := 3 // after tasks run out, keep asking a few more times for fault tolerance

	for {
		args := WorkArgs{WorkerId: workId}
		reply := WorkReply{}
		// ask for a task
		working := call("Master.Work", &args, &reply)

		if reply.IsFinished || !working {
			log.Println("finished")
			return
		}
		log.Println("task info:", reply)
		// run the task
		switch reply.MapReduce {
		case "map":
			MapWork(reply, mapf)
			retry = 3
		case "reduce":
			ReduceWork(reply, reducef)
			retry = 3
		default:
			// back off before giving up, to ride out transient errors
			log.Println("error reply: would retry times:", retry)
			if retry < 0 {
				return
			}
			retry--
		}

		// report the task as done
		commitArgs := CommitArgs{WorkerId: workId, TaskId: reply.TaskId, MapReduce: reply.MapReduce}
		commitReply := CommitReply{}

		call("Master.Commit", &commitArgs, &commitReply)

		time.Sleep(500 * time.Millisecond)
	}
}

// run a map task
func MapWork(task WorkReply, mapf func(string, string) []KeyValue) {
	// read the input file named in the task
	file, err := os.Open(task.Filename)
	if err != nil {
		log.Fatalf("cannot open %v", task.Filename)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", task.Filename)
	}
	file.Close()
	// the actual call to the map function
	kva := mapf(task.Filename, string(content))

	sort.Sort(ByKey(kva))

	// create the file buckets: nReduce intermediate files for this map task
	tmpName := "mr-tmp-" + strconv.Itoa(task.TaskId)
	var fileBucket = make(map[int]*json.Encoder)
	for i := 0; i < task.BucketNumber; i++ {
		ofile, _ := os.Create(tmpName + "-" + strconv.Itoa(i))
		fileBucket[i] = json.NewEncoder(ofile)
		defer ofile.Close()
	}
	for _, kv := range kva {
		key := kv.Key
		reduce_idx := ihash(key) % task.BucketNumber
		err := fileBucket[reduce_idx].Encode(&kv)
		if err != nil {
			log.Fatal("Unable to write to file")
		}
	}
}

// run a reduce task: gather every intermediate file whose bucket id
// equals task.TaskId, e.g. mr-tmp-*-1 for reduce task 1.
func ReduceWork(task WorkReply, reducef func(string, []string) string) {
	intermediate := []KeyValue{}

	// read this reduce index's mr-tmp file from every map task into intermediate
	for mapTaskNumber := 0; mapTaskNumber < task.BucketNumber; mapTaskNumber++ {
		fileName := "mr-tmp-" + strconv.Itoa(mapTaskNumber) + "-" + strconv.Itoa(task.TaskId)
		f, err := os.Open(fileName)
		if err != nil {
			log.Fatal("Unable to read from: ", fileName)
		}
		defer f.Close()
		decoder := json.NewDecoder(f)
		var kv KeyValue
		for decoder.More() {
			err := decoder.Decode(&kv)
			if err != nil {
				log.Fatal("Json decode failed, ", err)
			}
			intermediate = append(intermediate, kv)
		}
	}
	sort.Sort(ByKey(intermediate))

	// write to "mr-out-Y" where Y is the reduce task id
	ofile, err := os.Create("mr-out-" + strconv.Itoa(task.TaskId+1))
	if err != nil {
		log.Fatal("Unable to create file: ", ofile)
	}
	defer ofile.Close()

	log.Println("reduce task", task.TaskId, "writing to", ofile.Name())

	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

		i = j
	}
}



//
// example function to show how to make an RPC call to the master.
//
// the RPC argument and reply types are defined in rpc.go.
//
func CallExample() {

	// declare an argument structure.
	args := ExampleArgs{}

	// fill in the argument(s).
	args.X = 99

	// declare a reply structure.
	reply := ExampleReply{}

	// send the RPC request, wait for the reply.
	call("Master.Example", &args, &reply)

	// reply.Y should be 100.
	fmt.Printf("reply.Y %v\n", reply.Y)
}

//
// send an RPC request to the master, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	sockname := masterSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}

	fmt.Println(err)
	return false
}
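
A robustness gap worth flagging in MapWork and ReduceWork: output files are created and written in place, so a worker that crashes mid-write leaves a partially written mr-tmp or mr-out file that a later attempt will happily read. The lab hints suggest writing to a temporary file and atomically renaming it once it is complete; a sketch of that pattern (the target name mr-out-1 is illustrative):

tmp, err := ioutil.TempFile(".", "mr-out-tmp-*")
if err != nil {
	log.Fatal("cannot create temp file: ", err)
}
// ... write the complete output to tmp ...
tmp.Close()
os.Rename(tmp.Name(), "mr-out-1") // atomic publish on the same filesystem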

Testing

Edit the three files above in the mr directory.

Then, in the main directory:

  1. Build the plugin (shared library)

    go build -buildmode=plugin ../mrapps/wc.go
  2. Run the master server

    go run mrmaster.go pg-*.txt
  3. Run one or more worker nodes

    go run mrworker.go wc.so
  4. Inspect the output

    cat mr-out-* | sort | more
  5. Run the official test script

    sh ./test-mr.sh
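
    If everything is correct, the script should end with *** PASSED ALL TESTS; along the way it checks the word-count output, the indexer, map and reduce parallelism, and recovery from crashed workers.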

Summary:
Not much code actually needs to be written; the work is mostly modifying the provided skeleton. The difficulty lies in the RPC calls, the master's task assignment, the workers' execution of map and reduce, the reading and writing of intermediate files, and the concurrency control over shared job state.
Once the states passed over RPC and the master's own state are clear, only a handful of functions remain to implement.

References:

Lab handout

MIT 6.824 Lab 1 solution write-up

MIT 6.824 course summary

MapReduce paper