6.824 Lab1 MapReduce


本实验要求我们构建一个分布式的 MapReduce 系统,并实现 paper 中提到的文章字数统计算法。

参考资料

实现

由于 paper 中已经给了算法流程图,只需要严格遵循该图即可。

Coordinator

其实就是 paper 中的 Master

因为不涉及 MapReduce 具体操作,总体而言比较简单。

结构体

Coordinator 负责任务调度以及维护任务状态,正如 paper 中提到的那样。同时,为了记录一个 Worker 是否超时未回应,需维护每个任务开始的时间戳。

于是结构体可以如下定义:

type TaskState int

const (
    IDLE TaskState = iota
    IN_PROGRESS
    COMPLETED
)

type Coordinator struct {
    files []string

    MapTask      []TaskState
    ReduceTask      []TaskState

    MapStartTimeStamp    []time.Time
    ReduceStartTimeStamp []time.Time

    M     int // total map tasks
    R     int // total reduce tasks
    Mcnt  int // completed map tasks
    Rcnt  int // completed reduce tasks

    State TaskType // current Execution State
    lock  sync.RWMutex
}

任务调度

根据规则 "reduces can't start until the last map has finished",任务调度应当基于当前 MapReduce 执行阶段,即在所有 map 任务被标记为 completed 后才能进行 reduce 任务的调度,否则 reduce Worker 将未能读取部分数据。

各个阶段中任务调度的思想都是一样的,收到一个 Arrange RPC 时,Coordinator 找出一个 idletimeout 的任务并分配;如果没有这样的任务,则需通知 Worker 等待。

func (m *Coordinator) Arrange(message *Args, reply *Reply) error {
    m.lock.Lock()
    defer m.lock.Unlock()

    if m.Rcnt == m.R { // all tasks completed
        reply.Over = true
        return nil
    }
    if m.State == MAP {
        for i := 0; i < m.M; i++ {
            // the task_i is as-yet-unstarted or time-out
            if m.MapTask[i] == IDLE || (m.MapTask[i] == IN_PROGRESS && time.Since(m.MapStartTimeStamp[i]) > 10*time.Second) {
                reply.Task = MAP
                reply.Wait = false
                reply.Filename = m.files[i]
                reply.R = m.R
                reply.MapTaskNumber = i
                reply.TimeStamp = time.Now()

                m.MapStartTimeStamp[i] = reply.TimeStamp
                m.MapTask[i] = IN_PROGRESS
                return nil
            }
       }} else if m.State == REDUCE {
        for i := 0; i < m.R; i++ {
            // the task_i is as-yet-unstarted or time-out
            if m.ReduceTask[i] == IDLE || (m.ReduceTask[i] == IN_PROGRESS && time.Since(m.ReduceStartTimeStamp[i]) > 10*time.Second) {
                reply.Task = REDUCE
                reply.M = m.M
                reply.ReduceTaskNumber = i
                reply.TimeStamp = time.Now()
                m.ReduceStartTimeStamp[i] = reply.TimeStamp
                m.ReduceTask[i] = IN_PROGRESS
                m.lock.Unlock()
                return nil
            }
        }
    }

    // no more as-yet-unstarted tasks
    reply.Wait = true
    return nil
}

完成通知

同样的,需要忽略那些过期回复。考虑这种场景,Coordinator 发给 WorkerA 的任务超时未完成,然后将该任务调度给 WorkerB,但从 WorkerA 的视角来看,它已经完成了该任务,并发送了完成通知,只不过该通知因为网络拥塞或其他原因过了很久才被 Coordinator 收到,Coordinator 如何判断这个通知是不是当前正在执行该任务的 Worker 发送的呢?一个思路是可以维护每个任务当前被哪个 Worker 执行,在这种情况下,任务的当前执行者为 B,那么收到 A 的通知后理所当然会将其忽略。

但还不足够,设想一下,如果 B 也因为同样原因超时,任务再一次调度给了 A 呢?虽然任务当前执行者为 A,但 A 并未完成,Coordinator 收到的是非常古老的那条消息,此时将任务设为 completed,显然会出现问题——A 并没有完成任务,尽管它"完成"过一次。

考虑到 Coordinator 结构体里维护了每个任务的开始时间戳,不妨令任务调度与完成通知中都捎带本次任务的 StartTime,这样就可以进行检查,如果一致则正确接收;反之视为过期消息。这样做减少了额外信息维护,也提高了安全性。

这其实就相当于分配了一个递增的版本号了。

// an RPC handler to tell the Coordinator that the worker finishes the task
func (m *Coordinator) Finished(message *Args, reply *Reply) error {
    m.lock.Lock()
    defer m.lock.Unlock()

    if message.Finished == MAP {
        if m.MapStartTimeStamp[message.MapTaskNumber].Sub(message.TimeStamp) != 0 {
            reply.Wait = true
            return nil
        }
        m.MapTask[message.MapTaskNumber] = COMPLETED
        m.Mcnt++
        if m.Mcnt == m.M {
            m.State = REDUCE
        }
    } else {
        if m.ReduceStartTimeStamp[message.ReduceTaskNumber].Sub(message.TimeStamp) != 0 {
            reply.Wait = true
            return nil
        }
        m.ReduceTask[message.ReduceTaskNumber] = COMPLETED
        m.Rcnt++
    }
    return nil
}

Worker

Worker 就涉及具体的 MapReduce 操作了,好在课程给的代码提供了 map/reduce 函数,我们只需要关注对输入输出的处理即可。在本系统中,任务的调度采用 Worker pull 而非 Coordinator push 的策略。Worker 需不断请求任务,然后根据回复内容执行对应的操作:

  1. 如果所有任务已结束,reply 告知 Over,关闭线程;

  2. 如果没有任务能分配,则调用 time.Sleep() 等待一段时间;

  3. 如果收到一个 map 任务,此时 reply 会捎带所要操作的文件名,然后对文件进行 map 操作(这里需要去 mrsequential.go 里参考一下代码)。由于它这里需要考虑线程崩溃,先把结果写到临时文件,全部写完后(说明没有发生 crash)再输出到 /mr-tmp 中,那么 mrsequential.go 里的代码就不能完全照搬了。

    我的做法是,开一个长度为 nReduce,类型为 []KeyValue 的切片 temp,其中 temp[i] 存放输出到 mr-X-i 中的所有 kv 对,先把所有 kv 对按照 key 的hash 值写到对应的 temp[hash(key) % nReduce] 里,再逐个写到临时文件中。

    示例提供的 json.Encoder() 在第二次及以后打开的时候不会在末尾添加,而是直接覆盖。

  4. reduce 任务大体类似,就是读取 mr-X-Y 文件。因为 Y 是固定的,所以 reply 要捎带 nMap。也是要对 mrsequential.go 里的代码进行一些修改。

    这里遇到一个坑点,和 paper 中所描述的产生了冲突。就是输出文件如果存在就不进行 os.Rename(),否则在 crash test 中会出现 mr-X-? 都有内容而 mr-out-? 没有内容的情况。合理猜测是读取 mr-X-Y 的时候出了点什么问题。

执行

func Worker(mapf func(string, string) []KeyValue,
            reducef func(string, []string) string) {

    // One way to get started is to modify mr/worker.go's Worker()
    // to send an RPC to the coordinator asking for a task.
    for {
        reply := AskForTask()
        if reply.Over {
            return
        }

        if reply.Wait {
            time.Sleep(WorkerWaitTime)
        } else if reply.Task == MAP {
            // map phase
            mapTaskNumber := reply.MapTaskNumber

            // read file
            file, err := os.Open(reply.Filename)
            if err != nil {
                log.Fatalf("cannot open %v", reply.Filename)
            }
            content, err := ioutil.ReadAll(file)
            if err != nil {
                log.Fatalf("cannot read %v", reply.Filename)
            }
            file.Close()
            kvs := mapf(reply.Filename, string(content))

            // write kv into file buckets
            // key -> filename: "ihash(key)"
            temp := make([][]KeyValue, reply.R)
            for i := range temp {
                temp[i] = make([]KeyValue, 0)
            }

            for _, kv := range kvs {
                hash := ihash(kv.Key) % reply.R
                temp[hash] = append(temp[hash], kv)
            }

            for i := 0; i < len(temp); i++ {
                ofile, _ := ioutil.TempFile("./mr/mapfile", fmt.Sprintf("%d", i))
                enc := json.NewEncoder(ofile)
                for _, kv := range temp[i] {
                    enc.Encode(&kv)
                }

                old_path := ofile.Name()
                new_path := fmt.Sprintf("../main/mr-tmp/mr-%d-%d", mapTaskNumber, i)

                os.Rename(old_path, new_path)
                ofile.Close()
      }

            // tell the master that the map job is done
            CallFinish(MAP, reply.TimeStamp, mapTaskNumber, 0)
        } else {
            // reduce phase
            reduceTaskNumber := reply.ReduceTaskNumber

            // read file
            ofile, _ := ioutil.TempFile("./mr/reducefile", fmt.Sprintf("%d", reduceTaskNumber))
            var kva []KeyValue
            for i := 0; i < reply.M; i++ {
                iFilename := fmt.Sprintf("../main/mr-tmp/mr-%d-%d", i, reduceTaskNumber)
                iFile, err := os.Open(iFilename)
                if err == nil {
                    dec := json.NewDecoder(iFile)
                    for {
                        var kv KeyValue
                        if err := dec.Decode(&kv); err != nil {
                            break
                        }
                        kva = append(kva, kv)
                    }
                } else {
                    log.Fatal(err)
                }
            }
            sort.Sort(ByKey(kva))
            i := 0
            for i < len(kva) {
                j := i + 1
                for j < len(kva) && kva[j].Key == kva[i].Key {
                    j++
                }
                values := []string{}
                for k := i; k < j; k++ {
                    values = append(values, kva[k].Value)
                }
                output := reducef(kva[i].Key, values)

                // this is the correct format for each line of Reduce output.
                fmt.Fprintf(ofile, "%v %v\n", kva[i].Key, output)

                i = j
            }

            old_path := ofile.Name()
            new_path := fmt.Sprintf("../main/mr-tmp/mr-out-%d", reduceTaskNumber)

            // no reduce task finished yet before
            if _, err := os.Open(new_path); err != nil {
                os.Rename(old_path, new_path)
            }
            ofile.Close()

            // tell the master that the reduce job is done
            CallFinish(REDUCE, reply.TimeStamp, 0, reduceTaskNumber)
    }
  }
}

RPC

综上所述,RPC 的结构体定义就呼之欲出了。

type Args struct {
  Finished TaskType

  TimeStamp time.Time

  MapTaskNumber    int
  ReduceTaskNumber int
}

type Reply struct {
  Task TaskType
  Wait bool // true for wait
  Over bool // true for done

  Filename string
  M        int
  R        int

  MapTaskNumber    int
  ReduceTaskNumber int

  TimeStamp time.Time
}

 上一篇
GFS 论文阅读 GFS 论文阅读
6.824 的第二篇论文是 Google "三驾马车"之一的 GFS(Google File System)——一个用于大型分布式数据密集型应用程序的可扩展分布式文件系统。
2022-09-25
下一篇 
MapReduce 论文阅读 MapReduce 论文阅读
6.824 的第一篇论文是 Google "三驾马车"之一的 MapReduce——一种基于分治策略,用于处理和生成大型数据集的模型,且许多现实世界的任务都可以在此模型中表达。
2022-09-22
  目录