MIT6.824-MapReduce实验总结
MIT 6.824 Lab 1: MapReduce 试验总结
需要注意的是,在试验前,推荐先行阅读MapReduce论文,及MIT6.824第一章节关于MapReduce的介绍。
该实验不推荐在Win上实现!
- 本实验并非完整的MapReduce的实现,而是给出Map和Reduce函数(WordCount) 以及RPC等相关操作,要求模仿给定的Example的样子,实现master和worker互相通讯完成单词统计任务。
- 推荐先行了解GRPC调用过程和Go 动态库编译。
对官方案例的分析
1 | git clone git://g.csail.mit.edu/6.824-golabs-2020 6.824 |
- 首先可以看到的是,官方以及给定了很朴素的map和reduce函数,即
../mrapps/wc.go
文件
1 | // |
- 主函数
mrsequential.go
,只是简单的模拟运行了Map和Reduce函数(读取文件,map、写入中间文件、reduce读取中间文件、调用reduce函数、写入结果),其中并不涉及master和worker的通讯和map、reduce之间shuffle的过程,这是我们后面试验的重点!
1 | package main |
ok,分析完官方案例的流程,我们来设更加完整的MapReduce。
对MapReduce设计的思考
官方给定的条件
不使用刚才的
mrsequential.go
直接完成整个过程,而是分成master和worker分开负责。其中
mrmaster.go
负责给输入并启动master, 如:go run mrmaster.go pg-*.txt
1
2
3
4
5
6
7
8
9
10
11
12
13func main() {
if len(os.Args) < 2 {
fmt.Fprintf(os.Stderr, "Usage: mrmaster inputfiles...\n")
os.Exit(1)
}
// 启动master节点,监听网络,分发任务
m := mr.MakeMaster(os.Args[1:], 10)
for m.Done() == false {
time.Sleep(time.Second)
}
time.Sleep(time.Second)
}mrworker.go
负责启动worker节点,申请执行map和reduce任务,并回传状态。如:go run mrworker.go wc.so
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34func main() {
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "Usage: mrworker xxx.so\n")
os.Exit(1)
}
mapf, reducef := loadPlugin(os.Args[1])
// 核心在这里,调用mr.Worker函数
mr.Worker(mapf, reducef)
}
// 加载Map 和 Reduce函数
// load the application Map and Reduce functions
// from a plugin file, e.g. ../mrapps/wc.so
//
func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
p, err := plugin.Open(filename)
if err != nil {
log.Fatalf("cannot load plugin %v", filename)
}
xmapf, err := p.Lookup("Map")
if err != nil {
log.Fatalf("cannot find Map in %v", filename)
}
mapf := xmapf.(func(string, string) []mr.KeyValue)
xreducef, err := p.Lookup("Reduce")
if err != nil {
log.Fatalf("cannot find Reduce in %v", filename)
}
reducef := xreducef.(func(string, []string) string)
return mapf, reducef
}其中RPC网络调用部分,已经被实现如master的
server
和worker的call
方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40//
// start a thread that listens for RPCs from worker.go
//
func (m *Master) server() {
rpc.Register(m)
rpc.HandleHTTP()
//l, e := net.Listen("tcp", ":1234")
sockname := masterSock()
os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}
//
// send an RPC request to the master, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
sockname := masterSock()
c, err := rpc.DialHTTP("unix", sockname)
if err != nil {
log.Fatal("dialing:", err)
}
defer c.Close()
err = c.Call(rpcname, args, reply)
if err == nil {
return true
}
fmt.Println(err)
return false
}
我们需要实现那些功能,在刚才的基础上?
- 将map和reduce交给worker节点完成,master只负责调用和记录。
- 由于有N个worker,所以我们必须考虑中间文件存储和分发的过程,即shuffle的过程。由于没有GFS文件系统,只能在本地文件系统实现,所以我们将每一个map都按map任务id % N + 分桶id进行存储,即一个map会产生N个中间文件。reduce时,每个reduce将所有以分桶id == reduce id的进行汇总。
- master和worker之间通过rpc通讯。
需要我们实现的地方
master 定义RPC通讯 的状态
1
2
3type Master struct {
// Your definitions here.
}master 定义worker申请任务的处理
1
func (m *Master) Work(args *WorkArgs,reply *WorkReply) error{}
master 定义worker完成任务的处理
1
func (m *Master) Commit(args *CommitArgs, reply *CommitReply) error {}
master启动时,初始状态的定义
1
2
3
4
5func MakeMaster(files []string, nReduce int) *Master {
m := Master{}
m.server()
return &m
}woker申请执行MapReduce任务
1
2
3
4func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
}rpc 定义传输的参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22type WorkArgs struct {
WorkerId string
}
type WorkReply struct {
IsFinished bool
TaskId int
Filename string
MapReduce string
BucketNumber int
}
type CommitArgs struct {
WorkerId string
TaskId int
MapReduce string
}
type CommitReply struct {
IsOK bool
}
具体实现
- rpc.go
1 | package mr |
- master.go
1 | package mr |
- worker.go
1 | package mr |
测试
修改mr文件夹中上述三个文件的代码。
在main目录下
编译动态链接
1
go build -buildmode=plugin ../mrapps/wc.go
运行master server
1
go run mrmaster.go pg-*.txt
运行worker节点
1
go run mrworker.go wc.so
查看输出的结果
1
cat mr-out-* | sort | more
运行官方的测试脚本
1
sh ./test-mr.sh
总结:
其实写的代码不多,主要是在原有基础上进行修改。难点在对RPC调用和master分配任务,worker执行MapReduce以及对中间文件的读写操作、作业资源的并发控制。
理清楚传递的状态和master自身的状态后,就只有几个函数需要实现。
参考资料:
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Kid1999' Blog!