本实验要求我们构建一个分布式的 MapReduce 系统,并实现 paper 中提到的文章字数统计算法。
参考资料
实现
由于 paper 中已经给了算法流程图,只需要严格遵循该图即可。
Coordinator
其实就是 paper 中的 Master
因为不涉及 MapReduce 具体操作,总体而言比较简单。
结构体
Coordinator 负责任务调度以及维护任务状态,正如 paper 中提到的那样。同时,为了记录一个 Worker 是否超时未回应,需维护每个任务开始的时间戳。
于是结构体可以如下定义:
type TaskState int
const (
IDLE TaskState = iota
IN_PROGRESS
COMPLETED
)
type Coordinator struct {
files []string
MapTask []TaskState
ReduceTask []TaskState
MapStartTimeStamp []time.Time
ReduceStartTimeStamp []time.Time
M int // total map tasks
R int // total reduce tasks
Mcnt int // completed map tasks
Rcnt int // completed reduce tasks
State TaskType // current Execution State
lock sync.RWMutex
}
任务调度
根据规则 "reduces can't start until the last map has finished",任务调度应当基于当前 MapReduce 执行阶段,即在所有 map 任务被标记为 completed
后才能进行 reduce 任务的调度,否则 reduce Worker 将未能读取部分数据。
各个阶段中任务调度的思想都是一样的,收到一个 Arrange RPC 时,Coordinator 找出一个 idle
或 timeout
的任务并分配;如果没有这样的任务,则需通知 Worker 等待。
func (m *Coordinator) Arrange(message *Args, reply *Reply) error {
m.lock.Lock()
defer m.lock.Unlock()
if m.Rcnt == m.R { // all tasks completed
reply.Over = true
return nil
}
if m.State == MAP {
for i := 0; i < m.M; i++ {
// the task_i is as-yet-unstarted or time-out
if m.MapTask[i] == IDLE || (m.MapTask[i] == IN_PROGRESS && time.Since(m.MapStartTimeStamp[i]) > 10*time.Second) {
reply.Task = MAP
reply.Wait = false
reply.Filename = m.files[i]
reply.R = m.R
reply.MapTaskNumber = i
reply.TimeStamp = time.Now()
m.MapStartTimeStamp[i] = reply.TimeStamp
m.MapTask[i] = IN_PROGRESS
return nil
}
}} else if m.State == REDUCE {
for i := 0; i < m.R; i++ {
// the task_i is as-yet-unstarted or time-out
if m.ReduceTask[i] == IDLE || (m.ReduceTask[i] == IN_PROGRESS && time.Since(m.ReduceStartTimeStamp[i]) > 10*time.Second) {
reply.Task = REDUCE
reply.M = m.M
reply.ReduceTaskNumber = i
reply.TimeStamp = time.Now()
m.ReduceStartTimeStamp[i] = reply.TimeStamp
m.ReduceTask[i] = IN_PROGRESS
m.lock.Unlock()
return nil
}
}
}
// no more as-yet-unstarted tasks
reply.Wait = true
return nil
}
完成通知
同样的,需要忽略那些过期回复。考虑这种场景,Coordinator 发给 WorkerA 的任务超时未完成,然后将该任务调度给 WorkerB,但从 WorkerA 的视角来看,它已经完成了该任务,并发送了完成通知,只不过该通知因为网络拥塞或其他原因过了很久才被 Coordinator 收到,Coordinator 如何判断这个通知是不是当前正在执行该任务的 Worker 发送的呢?一个思路是可以维护每个任务当前被哪个 Worker 执行,在这种情况下,任务的当前执行者为 B,那么收到 A 的通知后理所当然会将其忽略。
但还不足够,设想一下,如果 B 也因为同样原因超时,任务再一次调度给了 A 呢?虽然任务当前执行者为 A,但 A 并未完成,Coordinator 收到的是非常古老的那条消息,此时将任务设为 completed
,显然会出现问题——A 并没有完成任务,尽管它"完成"过一次。
考虑到 Coordinator 结构体里维护了每个任务的开始时间戳,不妨令任务调度与完成通知中都捎带本次任务的 StartTime,这样就可以进行检查,如果一致则正确接收;反之视为过期消息。这样做减少了额外信息维护,也提高了安全性。
这其实就相当于分配了一个递增的版本号了。
// an RPC handler to tell the Coordinator that the worker finishes the task
func (m *Coordinator) Finished(message *Args, reply *Reply) error {
m.lock.Lock()
defer m.lock.Unlock()
if message.Finished == MAP {
if m.MapStartTimeStamp[message.MapTaskNumber].Sub(message.TimeStamp) != 0 {
reply.Wait = true
return nil
}
m.MapTask[message.MapTaskNumber] = COMPLETED
m.Mcnt++
if m.Mcnt == m.M {
m.State = REDUCE
}
} else {
if m.ReduceStartTimeStamp[message.ReduceTaskNumber].Sub(message.TimeStamp) != 0 {
reply.Wait = true
return nil
}
m.ReduceTask[message.ReduceTaskNumber] = COMPLETED
m.Rcnt++
}
return nil
}
Worker
Worker 就涉及具体的 MapReduce 操作了,好在课程给的代码提供了 map/reduce
函数,我们只需要关注对输入输出的处理即可。在本系统中,任务的调度采用 Worker pull 而非 Coordinator push 的策略。Worker 需不断请求任务,然后根据回复内容执行对应的操作:
如果所有任务已结束,reply 告知 Over,关闭线程;
如果没有任务能分配,则调用
time.Sleep()
等待一段时间;如果收到一个 map 任务,此时 reply 会捎带所要操作的文件名,然后对文件进行 map 操作(这里需要去 mrsequential.go 里参考一下代码)。由于它这里需要考虑线程崩溃,先把结果写到临时文件,全部写完后(说明没有发生 crash)再输出到
/mr-tmp
中,那么 mrsequential.go 里的代码就不能完全照搬了。我的做法是,开一个长度为 nReduce,类型为 []KeyValue 的切片
temp
,其中temp[i]
存放输出到mr-X-i
中的所有 kv 对,先把所有 kv 对按照 key 的hash 值写到对应的temp[hash(key) % nReduce]
里,再逐个写到临时文件中。示例提供的 json.Encoder() 在第二次及以后打开的时候不会在末尾添加,而是直接覆盖。
reduce 任务大体类似,就是读取
mr-X-Y
文件。因为 Y 是固定的,所以 reply 要捎带 nMap。也是要对 mrsequential.go 里的代码进行一些修改。这里遇到一个坑点,和 paper 中所描述的产生了冲突。就是输出文件如果存在就不进行
os.Rename()
,否则在 crash test 中会出现mr-X-?
都有内容而mr-out-?
没有内容的情况。合理猜测是读取 mr-X-Y 的时候出了点什么问题。
执行
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
// One way to get started is to modify mr/worker.go's Worker()
// to send an RPC to the coordinator asking for a task.
for {
reply := AskForTask()
if reply.Over {
return
}
if reply.Wait {
time.Sleep(WorkerWaitTime)
} else if reply.Task == MAP {
// map phase
mapTaskNumber := reply.MapTaskNumber
// read file
file, err := os.Open(reply.Filename)
if err != nil {
log.Fatalf("cannot open %v", reply.Filename)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", reply.Filename)
}
file.Close()
kvs := mapf(reply.Filename, string(content))
// write kv into file buckets
// key -> filename: "ihash(key)"
temp := make([][]KeyValue, reply.R)
for i := range temp {
temp[i] = make([]KeyValue, 0)
}
for _, kv := range kvs {
hash := ihash(kv.Key) % reply.R
temp[hash] = append(temp[hash], kv)
}
for i := 0; i < len(temp); i++ {
ofile, _ := ioutil.TempFile("./mr/mapfile", fmt.Sprintf("%d", i))
enc := json.NewEncoder(ofile)
for _, kv := range temp[i] {
enc.Encode(&kv)
}
old_path := ofile.Name()
new_path := fmt.Sprintf("../main/mr-tmp/mr-%d-%d", mapTaskNumber, i)
os.Rename(old_path, new_path)
ofile.Close()
}
// tell the master that the map job is done
CallFinish(MAP, reply.TimeStamp, mapTaskNumber, 0)
} else {
// reduce phase
reduceTaskNumber := reply.ReduceTaskNumber
// read file
ofile, _ := ioutil.TempFile("./mr/reducefile", fmt.Sprintf("%d", reduceTaskNumber))
var kva []KeyValue
for i := 0; i < reply.M; i++ {
iFilename := fmt.Sprintf("../main/mr-tmp/mr-%d-%d", i, reduceTaskNumber)
iFile, err := os.Open(iFilename)
if err == nil {
dec := json.NewDecoder(iFile)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}
} else {
log.Fatal(err)
}
}
sort.Sort(ByKey(kva))
i := 0
for i < len(kva) {
j := i + 1
for j < len(kva) && kva[j].Key == kva[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, kva[k].Value)
}
output := reducef(kva[i].Key, values)
// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)
i = j
}
old_path := ofile.Name()
new_path := fmt.Sprintf("../main/mr-tmp/mr-out-%d", reduceTaskNumber)
// no reduce task finished yet before
if _, err := os.Open(new_path); err != nil {
os.Rename(old_path, new_path)
}
ofile.Close()
// tell the master that the reduce job is done
CallFinish(REDUCE, reply.TimeStamp, 0, reduceTaskNumber)
}
}
}
RPC
综上所述,RPC 的结构体定义就呼之欲出了。
type Args struct {
Finished TaskType
TimeStamp time.Time
MapTaskNumber int
ReduceTaskNumber int
}
type Reply struct {
Task TaskType
Wait bool // true for wait
Over bool // true for done
Filename string
M int
R int
MapTaskNumber int
ReduceTaskNumber int
TimeStamp time.Time
}