「实验记录」MIT 6.824 Lab1 MapReduce

发布时间 2023-04-24 13:45:43作者: 士全

I. Source

  1. MIT-6.824 2020 课程官网
  2. Lab1: MapReduce 实验主页
  3. simviso 精品付费翻译 MIT 6.824 课程

II. My Code

  1. source code 的 Gitee 地址
  2. Lab1: MapReduce 的 Gitee 地址

课程官网提供的 Lab 代码下载地址,我没有访问成功,于是我从 Github 其他用户那里 clone 到干净的源码,有需要可以访问我的 Gitee 获取

III. Motivation

MapReduce 是经典的分布式计算模型,想要搞清楚何如 MapReduce?首先需要明白何为分布式计算模型以及为什么需要分布式计算模式

不论做什么事,最重要的是有 Motivation!那么问题来了,我们为什么需要分布式计算模型呢?难道单线程的处理程序不行嘛?

答案是肯定的,仅有单线程程序还是不够的,特别是在数据量很大的情况下。举个例子,现在需要对一个 1 MB 的 .txt 文件进行词频统计,在单线程情况下,一眨眼的功夫就能完成,表现还算优异;但如果需要统计 1 GB 的 .txt 文件,那么可能会等上一段时间才能完成,可能是 10 s,也可能更久

图1. 单线程词频统计 1 MB .txt

现在需求来了,我们希望加快解析速度,使统计 1 GB 的 .txt 也像 1 MB 那样快?能不能做到呢?同样,答案也是肯定的

我们现在手头上有很多闲置的机器(在现代硬件资源过剩的情况下很常见),可不可以把这些闲置的机器也派上用场呢,让它们也参与到解析 1 GB 大文件的流程中

这就引出了分布式计算模式的概念,将 1 GB 的 .txt 切割成 1000 份 1 MB 的 .txt,然后分发给集群中的闲置机器进行并行处理,最后再合并每台机器的子文件处理结果,得到 1 GB 的 .txt 文件最终的词频统计结果

图2. 多线程词频统计 1 GB .txt

这是个不错的主意,是分布式计算模式的雏形。虽然这种方法可行,但是还是不够快,没有将分布式的特点发挥到淋漓尽致

其实,我们完全可以针对上述词频统计问题进行再一次的划分,把词频统计任务分为单词输出任务( MapTask )和单词计数任务( ReduceTask ),先扫描 .txt 执行输出任务,在明确有哪些单词之后,再执行计数任务,最后合并结果

图3. MapReduce 概念图

如此设计,简化了业务逻辑,明确了分工,同时也大大提高了产出(解析)效率

IV. Solution

我们知道了 MapReduce 是什么以及它可以解决什么样的问题后,就可以开始动手设计与实现这套流程了

因为 6.824-golabs-2020 采用的是 Go 语言,所以我们还需要有点 Go 的基础,特别是 goroutine 的知识,如果学习过 C 语言,那么掌握 Go 还是比较容易的

当然,如果想深入理解 Go ,特别是它的内存机制( slice、map、channel 以及深浅拷贝...),那么还需要花大精力的

Lab1: MapReduce 实验主页 给出的提示还是比较清晰明了的,我来大致提炼一下。MapReduce 模型由两个角色组成,分别是 Master 和 Worker ,如果想玩转 MapReduce ,那么模型中至少有一个 Master 和一个 Worker

在现实世界中,MapReduce 分布式计算模式都是运行在不同机器上的,即都是远程的。但 6.824-golabs-2020 为了方便起见,用多进程来模拟分布式的效果,同样也是通过 RPC(远距离过程调用,Remote Process call)来实现各对象之间的通信

Your job is to implement a distributed MapReduce, consisting of two programs, the master and the worker. There will be just one master process, and one or more worker processes executing in parallel. In a real system the workers would run on a bunch of different machines, but for this lab you'll run them all on a single machine. The workers will talk to the master via RPC.

下面是重点,为了方便起见,实验采用了 Worker 主动的通信策略,即是 Worker 不停地向 Master 要任务,Worker 拿到任务后开始工作,最后再将结果交给 Master 汇总

Each worker process will ask the master for a task, read the task's input from one or more files, execute the task, and write the task's output to one or more files. The master should notice if a worker hasn't completed its task in a reasonable amount of time (for this lab, use ten seconds), and give the same task to a different worker.

这样的通信模式确实很简洁,因为 Master 无需知道 Worker 的存在,也无需知道每一次与它通信的 Worker 的相关信息。总之,Master 什么都不需要知道,更不需要维护 Worker 列表

Master 只要做一件事,那就是开机,响应 Worker 的任务请求。需要实现的业务逻辑就是分配任务( assign task )和汇总结果( gather result

另外,Lab1: MapReduce 还考虑到了容错机制,即是集群中有 Worker 忽然掉线的情况,在掉线的情况下,还能保证任务的顺利完成,这也是考察的重点

Lab1: MapReduce 实验主页 给出的提示,如果 Worker1 掉线了,那么就将已经分配给它的任务再分配给其他在线的 Worker,比如 Worker2

就这么简单,唯一的难点,应该就是 Master 如何得知 Worker1 已经掉线了。这有好多种做法,有正规的、非常合理的做法,也有简单粗暴的做法

前者类似于 HTTP 的心跳包机制,即是每隔一个时间段,Master 就向 Workers 发送应答报文,如果 Worker1 没有在合理的时间内恢复,则认为它已经掉线了。这是最正确的做法,但是也是最烦人的做法。如果要实现这种机制,那么 Master 就需要维护 Worker 列表,需要记下每个 Worker 的通信地址...,维护的工作又是一大笔开销,有舍有得!

我采用的是后者,即 Master 仍然不需要维护 Worker 列表。它自己有个计时器,如果已经分配出去的任务,在计时器结束时还没有向其交付,那么 Master 就认为该 Worker 已经掉线了

这种方法的关键点在于设置计时器的值,太大了不行,太小了更不行。说实话,该方法不是通用的方法,其时间的取值完全取决于 MapTask 和 ReduceTask 的用时。可以说,其值的设定完全靠经验

但是,从理论上来说,简单粗暴的方法也能实现容错机制。正常情况下,Master 和 Worker 之间的通信流程是这样的,

图4. Worker 与 Master 的交互过程

一般是 Worker 开口向 Master 要任务,接着 Master 从任务队列中挑一个任务回应;当任务都已经完成时,对应图中的最后两根虚线,Master 会向 Worker 发送信号 TaskDone ,表示已无任务需要完成,可以下班了。Worker 在收到结束回复后,关机!结束 MapReduce 流程

S1 - defs.go约定俗成

首先,在 src/mr 目录里新建 defs.go,将一些约定俗成的变量定义好,比如说信号的种类和任务的类别,分为 MapTask 和 ReduceTask,

package mr

import (
	"fmt"
)

const (
	TaskMap      = 1
	TaskReduce   = 2
	TaskWait     = 3
	TaskEnd      = 4
	FixedTimeOut = 15
)

/* debug 模式值为 1,release 模式值为 0 */
var Debug int

type MetaTask struct {
	FileName string
	FileIdx  int /* File 编号 */
}

type MapTask struct {
	MetaTask
	NReduce int /* 要划分成多少份 ReduceTask */
}

type ReduceTask struct {
	MetaTask
	PartIdx int /* ReduceTask 的合并编号 */
}

func DPrintf(format string, data ...interface{}) {
	if Debug > 0 {
		fmt.Printf(format, data...)
	}
}

信号 TaskWait 表示 Master 手头上暂时没有可以分配的任务,如果此时有 Worker 前来申请任务,那么 Master 就向其回复信号 TaskWait,让该 Worker 等等再来,等待一会说不定 Master 就会任务可以分配了。具体如何等待,我将会在 S3 - worker.go实现ask和doing Task机制 中详细展开

信号 TaskEnd 表示所有任务都已经完成了,如果此时有 Worker 前来申请任务,那么 Master 就向其回复信号 TaskEnd,Worker 收到该信号之后就会明白:已经到了下班的时刻

定义 FixedTimeOut 主要是为了容错机制,即是过了 15 秒之后,如果运行队列还有任务的话,则 Master 判定当前有 Worker 掉线了,并将处理到一半的任务重新分配给其他的 Worker,具体我会在 S4 - master.go实现assign和gather Task机制 中详细展开

DebugDPrinf() 是配套的,我用其替代 fmt.Prinf() ,这样以来,调试和测试都比较方便,在执行 sh test-mr.sh 测试时,只需将变量 Debug 设为 0 即可,这样程序就不会在控制台输出一些我们不 care 的调试信息了

紧接着就是 Task 的定义,因为 MapTask 和 ReduceTask 有一些共性的地方,所以我将其提炼出来,形成了 MetaTask

在我看来,共性的部分也要适当地提炼,不能太过头。太过头了,虽然从代码角度上来说,少打了几个字符,但是易读性大打折扣。我的意思是,很多事情都是要博弈的,是要有取舍的,不要为了优雅而优雅,由心而发的那才是真本事

两类任务的共同部分是它们都需要记录下各自的文件名( FileName )和文件编号( FileIdx ),MapTask 独有的部分应该就是:它需要知道这一个 .txt 文件应该被划分成多少份 ReduceTask,对应变量 NReduce ,也对应图 3 中的 mr-x-1 的 \(x\)

而 ReduceTask 需要清楚自己是第几号分片,对应变量 PartIdx ,也对应图 3 中的 mr-1-y 的 \(y\)

S2 - rpc.go定义通信接口

Lab1: MapReducesrc/mr/rpc.go 中提供了一对 RPC demo,一问一答的模式,如下,

type ExampleArgs struct {
	X int
}

type ExampleReply struct {
	Y int
}

注意,其中的 XY 都是大写,这是 Go 规定的,结构体中首字母小写的变量相当于 C++ Class 中的 private 成员变量,其他对象是无法访问的,只有该结构体才行;反过来,大写就是 public 成员变量,是全局的

RPC 是全局下的 RPC,Worker 发出的 RPC,只有 Master 能够访问,那这个 RPC 才有存在的意义。如果改成小写 x ,那么 Master 将无权访问此 RPC ,同时它也不再通用,属于废了

我们照葫芦画瓢,AskTaskArgs 对应图 4 中的 step1. Ask Task,我这里写成了空,这是因为 Worker 只是申请任务而已,相当于举手,无需携带相关的条件信息;而 AskTaskReply 对应图 4 中的 step2. Assign Task,它是定义些变量的,这些变量主要是用来存放 Master 分配的任务信息,比如任务是何种类型( MapTask 或 ReduceTask ),如果是 MapTask,那么将通过变量 MapTask 返回任务信息;反之,利用切片 RedTasks 返回结果

package mr

import "os"
import "strconv"

// Add your RPC definitions here.
/* I. worker 向 master 要任务 */
type AskTaskArgs struct {}

/* II. master 回复 worker 的请求 */
type AskTaskReply struct {
	TaskType 		int
	MapTask 		MapTask
	RedTasks		[]ReduceTask
}

/* III. worker 完成任务,将结果返回给 master */
type TaskDoneReply struct {
	TaskType 		int
	MapTask 		MapTask
	RedTasks		[]ReduceTask
}
...

为什么 RedTasks 是切片呢?因为 ReduceTask 一定是由很多个小片段组成的,如果仅仅分配给 Worker 一个 mr-y-1 文件,它是不能工作,还缺少业务逻辑的其他信息,比如 mr-x-1、mr-z-1...

只有将所有的 mr-?-1 文件都分配给 Worker,它才能够开始解析,这是 MapReduce 的关键。还有一个关键的地方,即是 Reduce 流程开始之前,必须处理完所有的 MapTask

同理,TaskDoneReply 的定义也是如此,这里我就没有提炼,虽然它与 AskTaskReply 的定义相同!当然,你也可以拔高一层。TaskDoneReply 对应图 4 中的 step3. Task Done,Master 响应 Worker 的任务请求

S3 - worker.go实现ask和doing Task机制

首先,我们需要了解 Worker 工作的具体业务逻辑,即是向 Master 要任务,然后做任务,最后交任务。以此往复,一直循环地在做这件事,

图5. Master 与 Worker 的交互过程

看一下代码,

// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	// Your worker implementation here.
	for {
		reply := CallAskTask()

		switch reply.TaskType {
		case TaskMap:
			doMapTask(mapf, reply.MapTask)
			break
		case TaskReduce:
			doReduceTask(reducef, reply.RedTasks)
			break
		case TaskWait:
			time.Sleep(1 * time.Second)
			break
		case TaskEnd:
			DPrintf("mission complete! I'm closing...\n")
			return
		default:
			DPrintf("Unknown fault\n")
			return
		}
	}
}

Worker 会调用 CallAskTask() RPC 联络 Master,向其申请任务,变量 Reply 存放任务的具体信息。接着根据任务的类别,做具体不同的解析工作,体现在代码中分别是 doMapTask()doReduceTask()

如果 Worker 收到的回复是信号 TaskWait,这就说明此时 Master 手头上没有任务可供分配,需要等待一会,等待正在解析的 MapTask 完成。我在代码中用 Sleep() 的方式实现等待的效果

同理,Worker 收到信号 TaskEnd,意味着所有任务已完成,可以下班了,此时 Worker 跳出循环,结束业务逻辑

目光集中在 CallAskTask() 上,这是一个请求任务的 RPC 方法,

/* 对应 I. worker 向 master 要任务 */
func CallAskTask() AskTaskReply {
	args := AskTaskArgs{}
	reply := AskTaskReply{}

	/* 对应 II. master 回复 worker 的请求 */
	call("Master.AskTask", &args, &reply)

	return reply
}

很简单,就是去调用 Master 的 AskTask() ,具体会在 S4 - master.go实现assign和gather Task机制 中讲到。需要注意的是,call() 的参数应该是 pass-by-pointer 的,因为只有这样,Master 对其的修改,Worker 才能看到,这点跟 C 语言一样

接着,我们来看一看具体的 doMapTask 业务,

/* 对应 II. 处理 master 回复的 mapTask */
func doMapTask(mapf func(string, string) []KeyValue, mapTask MapTask) {
	DPrintf("doing MapTask, fileName..%v, fileIdx..%v, nReduce..%v\n",
		mapTask.FileName, mapTask.FileIdx, mapTask.NReduce)

	fileName := mapTask.FileName
	file, err := os.Open(fileName) /* 试图打开 pg-x.txt */
	if err != nil {
		log.Fatalf("cannot open %v", fileName)
	}
	content, err := ioutil.ReadAll(file) /* 读取文件的内容 */
	if err != nil {
		log.Fatalf("cannot read %v", fileName)
	}
	file.Close()

	/* 首先通过 map 方法输出每个单词,即遇到一个单词就将其压入 kvs 键值对集合中 */
	kvs := mapf(fileName, string(content))
	sort.Sort(ByKey(kvs))

	/* 以下 map 的切片逻辑也是借鉴 src/main/mrsequential.go 的 */
	reduces := make([][]KeyValue, mapTask.NReduce)
	for _, kv := range kvs { /* 将集合打散,按照最大上限数,生成 NReduce 个哈希桶 */
		idx := ihash(kv.Key) % mapTask.NReduce
		reduces[idx] = append(reduces[idx], kv)
	}

	/* 只是为了模拟工作的效果,time.Sleep 尽量不与 io 操作存在与同一循环中,尤其是 io 写操作 */
	//for idx, _ := range reduces {
	//	DPrintf("split MapTask to ReduceTask, fileName..%v, fileIdx..%v, partIdx..%v\n",
	//		mapTask.FileName, mapTask.FileIdx, idx)
	//	time.Sleep(1 * time.Second) /* 休眠久一点,为的是方便 debug */
	//}

	redTasks := []ReduceTask{}
	for idx, reduce := range reduces { /* 将每个哈希桶中的 '单词-次数' 键值对内容保存至 mr-fileIdx-partIdx.csv 中 */
		redTask := ReduceTask{
			MetaTask: mapTask.MetaTask,
			PartIdx:  idx,
		}

		redTasks = append(redTasks, redTask) /* 并将桶中的内容作为 mapTask 的计算结果返回给 master */

		output := "mr-" + strconv.Itoa(redTask.FileIdx) + "-" + strconv.Itoa(redTask.PartIdx) + ".csv"
		file, err = os.Create(output)
		if err != nil {
			log.Fatalf("cannot create %v", output)
		}

		w := csv.NewWriter(file)
		for _, v := range reduce {
			w.Write([]string{v.Key, v.Value})
		}
		w.Flush()

		file.Close()
	}

	DPrintf("MapTask to ReduceTask done, fileName..%v, fileIdx..%v\n", mapTask.FileName, mapTask.FileIdx)
	/* 在 doMapTask 中 redTasks 才是返回结果,而 mapTask 只充作默认参数 */
	CallTaskDone(TaskMap, mapTask, redTasks)
}

作为函数参数的 mapf 就是具体负责解析的程序,这是由 Lab1: MapReduce 的框架指定的,我们无需操心。我们只需要做好,除了 mapf 之外的工作,比如如何分片

我先通过一些 IO 操作,读取了指定文件的内容(第 6~15 行),接着调用 mapf 方法输出每个单词,并将其存放至 kvs 键值对集合中以及从小到大排序。其中的 ByKey() 是借鉴 src/main/mrsequential.go 的,Lab1: MapReduce 实验主页 的原话,

You can steal some code from mrsequential.go for reading Map input files, for sorting intermedate key/value pairs between the Map and Reduce, and for storing Reduce output in files.

/* 以下的功能函数都是从 src/main/mrsequential.go 中抄过来的 */
// for sorting by key.
type ByKey []KeyValue

// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

/*******************************************************************/

然后就是分片环节,其实也是借鉴 src/main/mrsequential.go 的,主要就是要将集合中的元素分成 NReduce 块,具体怎么分,我们可以调用 Lab1: MapReduce 提供的 ihash() 函数,求出哈希值(第 21~26 行)

之后,将每个子集合中的 kv 键值对写到对应的 mr-fileIdx-partIdx.csv 中,相当于保存工作结果,对应代码中的第 35~57 行

最后待完成解析 MapTask 后,调用 CallTaskDone() RPC 方法将结果交给 Master 汇总

其中的 28~33 行通过让进程休眠来模拟 MapTask 工作的场景,这在调试中非常有用,但如果要执行测试脚本,请注释掉这段话

另外,在 Go 语言里最好不要将 IO 操作与 time.Sleep() 放在同一循环中,尤其是写操作,会带来意想不到的 bug,比如写失败...

下面,再来讲讲 doReduceTask 的业务逻辑,其实和 doMapTask 大同小异,无非也是读文件内容,然后一通解析,保存结果,最后交由 Master 汇总,

func doReduceTask(reducef func(string, []string) string, redTasks []ReduceTask) {
	oname := "mr-out-" + strconv.Itoa(redTasks[0].PartIdx)
	ofile, _ := os.Create(oname)
	defer ofile.Close()

	kvs := []KeyValue{}

	/* 以下 reduce 的合并逻辑也是借鉴 src/main/mrsequential.go 的 */
	for _, v := range redTasks {
		DPrintf("doing ReduceTask, fileName..%v, fileIdx..%v, partIdx..%v\n", v.FileName, v.FileIdx, v.PartIdx)

		fileName := "mr-" + strconv.Itoa(v.FileIdx) + "-" + strconv.Itoa(v.PartIdx) + ".csv"
		file, err := os.Open(fileName)
		if err != nil {
			log.Fatalf("cannot open %v", fileName)
		}
		defer file.Close()

		r := csv.NewReader(file)
		context, _ := r.ReadAll()   /* 读取所有编号为 partIdx 的 '单词-次数' 键值对内容 */
		for _, u := range context { /* 并将其存放在 kvs 集合中 */
			kv := KeyValue{Key: u[0], Value: u[1]}
			kvs = append(kvs, kv)
		}

		//time.Sleep(2 * time.Second) /* 休眠久一点,为的是方便 debug */
	}
	sort.Sort(ByKey(kvs))

	i := 0
	for i < len(kvs) {
		j := i + 1
		for j < len(kvs) && kvs[j].Key == kvs[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, kvs[k].Value)
		}
		output := reducef(kvs[i].Key, values)

		fmt.Fprintf(ofile, "%v %v\n", kvs[i].Key, output) /* 将合并结果写到 mr-out-x 中 */

		i = j
	}

	DPrintf("ReduceTask done, fileName..%v, fileIdx..%v\n", redTasks[0].FileName, redTasks[0].FileIdx)
	/* 在 doReduceTask 中只需返回将传进来的 redTasks 参数原封不动返回即可,而 MapTask{} 只充作默认参数 */
	CallTaskDone(TaskReduce, MapTask{}, redTasks)
}

其中的合并过程也是借鉴 src/main/mrsequential.go 的(第 30~45 行)。值得注意的是,休眠模拟的工作场景这次我是放在循环里的,因为这与读操作不大冲突

至此,理顺了 Worker 的相关工作

S4 - master.go实现assign和gather Task机制

src/mr/master.go 中,MapReduce 服务开启之前,首先需要创建 Master 对象,

// create a Master.
// main/mrmaster.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeMaster(files []string, nReduce int) *Master {
	m := Master{}

	// Your code here.
	m.mapTaskingQ = make([]MapTask, 0)
	m.nReduce = nReduce
	for i, file := range files {
		task := MapTask{
			MetaTask: MetaTask{
				FileName: file,
				FileIdx:  i,
			},
			NReduce: nReduce,
		}
		m.mapTaskQ = append(m.mapTaskQ, task)
	}

	m.isDone = false /* 请勿关闭 Master 服务 */
	DPrintf("Master working...\n")

	go m.tasking2task()

	m.server()
	return &m
}

第 9~20 行,初始化 Master 的 MapTask 队列,根据文件名向量 files 为每个 MapTask 取名并赋上编号。其中 Master 结构如下,

type Master struct {
	// Your definitions here.
	mtx         sync.Mutex
	mapTaskQ    []MapTask
	redTaskQ    []ReduceTask
	mapTaskingQ []MapTask
	redTaskingQ []ReduceTask
	nReduce     int
	isDone      bool
}

主要有四个队列,分别是 MapTask 和 ReduceTask 队列以及 MapTasking 和 ReduceTasking 队列,后两者表示正在解析的任务

为了更精确和方便地汇总 Worker 返回的结果,Master 有必要记录下已经分配出去的 Task 信息,我选用了运行队列来解决这个问题

以及最关心的切片数量,对应代码的变量 nReduce 和完成标志 isDone ,其主要用于判断 Master 是否可以关闭服务,对应 master.go:Done()

// main/mrmaster.go calls Done() periodically to find out
// if the entire job has finished.
func (m *Master) Done() bool {
 // Your code here.
 return m.isDone
}

src/main/mrmaster.go 中,循环不停地询问是否能够结束 MapReduce,其中的第 8 行,

func main() {
	if len(os.Args) < 2 {
		fmt.Fprintf(os.Stderr, "Usage: mrmaster inputfiles...\n")
		os.Exit(1)
	}

	m := mr.MakeMaster(os.Args[1:], 10)
	for m.Done() == false {
		time.Sleep(time.Second)
	}

	time.Sleep(time.Second)
}

言归正传,讲一讲 Worker 与 Master 交互的两个重要函数,AskTask()TaskDone() ,先看前者的定义,

/* 对应 II. master 接收到 worker 的请求后 */
func (m *Master) AskTask(args *AskTaskArgs, reply *AskTaskReply) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	if len(m.mapTaskQ) > 0 {
		reply.TaskType = TaskMap
		reply.MapTask = m.assignMapTask()

		return nil
	}

	if len(m.mapTaskingQ) > 0 {
		reply.TaskType = TaskWait
		DPrintf("some MapTasks are not completed, please wait...\n")

		return nil
	}

	/* 只有当所有的 mapTasks 完成后,才可以开始 reduceTask */
	if len(m.redTaskQ) > 0 {
		reply.TaskType = TaskReduce
		redTasks := m.assignRedTask()
		reply.RedTasks = append(reply.RedTasks, redTasks...)

		return nil
	}

	if len(m.redTaskingQ) > 0 {
		reply.TaskType = TaskWait
		DPrintf("some ReduceTasks are not completed, please wait...\n")

		return nil
	}

	reply.TaskType = TaskEnd
	m.isDone = true
	DPrintf("All tasks done, msg: closing -> worker...\n")
	return nil
}

Master 在接收到申请任务的请求之后,会先在 MapTask 队列中寻找是否有合适的任务(第 6 ~11 行),有则调用 assignMapTask() 分配任务;反之,则检查一下是否有 MapTask 正在运行

如果确有其事,需要停止分配,回复 Worker 过会再来(第 13~18 行)。还记得嘛?MapReduce 的一大要点,所有 MapTask 必须在 ReduceTask 执行之前完成。这就意味着我们只能先把 MapTask 都做完,才能开始 ReduceTask

如果 MapTask 都完成了,那么开始 ReduceTask 的解析工作(第 21~27 行),同样调用 assignRedTask() 分配任务;如果还有 RedTask 正在运行,则回复 Worker 让其等待。这与之前分配 MapTask 是同样的道理

最后,如果任务都完成了,就修改变量 isDone ,告诉 src/main/mrmaster.go :可以结束 MapReduce 服务啦

其中的 assignMapTask() 的定义如下,

/* 从 mapTaskQ 任务队列里挑一个 task 分配给 worker */
func (m *Master) assignMapTask() MapTask {
	task := m.mapTaskQ[0]
	m.mapTaskQ = append(m.mapTaskQ[:0], m.mapTaskQ[1:]...) /* Go slice 出队操作 */
	m.mapTaskingQ = append(m.mapTaskingQ, task)            /* 将 task 的状态标记为正在计算中... */

	DPrintf("assign MapTask, fileName..%v, fileIdx..%v, nReduce..%v\n", task.FileName, task.FileIdx, task.NReduce)
	return task
}

不难理解,从 MapTask 队列中获取合适的任务(默认第一个),然后将其加入运行队列中;同理 assignRedTask() 也一样,

/* 从 redTaskQ 任务队列里挑选出编号为 partIdx 的 redTasks 分配给 worker */
func (m *Master) assignRedTask() []ReduceTask {
	redTasks := make([]ReduceTask, 0) /* 把选中的 redTask 放入其中 */
	partIdx := m.redTaskQ[0].PartIdx

	for i := 0; i < len(m.redTaskQ); {
		if m.redTaskQ[i].PartIdx != partIdx {
			i++
			continue
		}

		/* 已定位到编号为 partIdx 的 redTask */
		task := m.redTaskQ[i]
		m.redTaskingQ = append(m.redTaskingQ, task) /* 将其状态标记为正在计算中... */
		redTasks = append(redTasks, task)
		m.redTaskQ = append(m.redTaskQ[:i], m.redTaskQ[i+1:]...)
		DPrintf("assign ReduceTask, fileName..%v, fileIdx..%v, partIdx..%v\n",
			task.FileName, task.FileIdx, task.PartIdx)
	}

	return redTasks
}

只不过它是挑选编号为 partIdx 的 RedTask 而已。目光转移到 TaskDone()

/* 对应 III. worker 完成任务,将结果返回给 master */
func (m *Master) TaskDone(args *TaskDoneReply, reply *ExampleReply) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	switch args.TaskType {
	case TaskMap:
		m.mapTaskingQDeleter(args.MapTask, args.RedTasks)
		break
	case TaskReduce:
		m.redTaskingQDeleter(args.RedTasks)
		break
	default:
		break
	}

	return nil
}

这个 RPC 方法主要负责的即是汇总任务的工作,Worker 处理完成交给 Master,Master 收到后根据任务的类型( TaskMap 或 TaskReduce )来做相应的汇总工作。具体分为汇总 TaskMap 的 mapTaskingQDeleter() 和 TaskReduce 的 redTaskingQDeleter()

故名思义,两者都是从运行队列中移除已经完成的任务,前者的定义如下,

/* 汇总 worker 返回的 mapTask 切片结果 */
func (m *Master) mapTaskingQDeleter(mapTask MapTask, redTasks []ReduceTask) {
	DPrintf("MapTask done, fileName..%v, fileIdx..%v, nReduce..%v\n",
		mapTask.FileName, mapTask.FileIdx, mapTask.NReduce)

	for i := 0; i < len(m.mapTaskingQ); i++ {
		if m.mapTaskingQ[i].FileIdx == mapTask.FileIdx { /* 已彻底完成之前分配的 mapTask  */
			m.mapTaskingQ = append(m.mapTaskingQ[:i], m.mapTaskingQ[i+1:]...)
			break
		}
	}

	for _, v := range redTasks { /* 并将其转化为二阶段 reduceTask */
		m.redTaskQ = append(m.redTaskQ, v)
		DPrintf("add ReduceTask, fileName..%v, fileIdx..%v, partIdx..%v\n", v.FileName, v.FileIdx, v.PartIdx)
	}
}

Worker RPC 方法回复的是解析后的结果,自然保存在切片 redTasks 中,以及需要根据原来的 MapTask 的文件编号,相应的也要从运行队列中移除

后者的定义,同样如此,

/* 处理 worker 返回的 redTask 合并结果 */
func (m *Master) redTaskingQDeleter(redTasks []ReduceTask) {
	for i := 0; i < len(redTasks); i++ {
		task := redTasks[i]
		DPrintf("ReduceTask done, fileName..%v, fileIdx..%v, partIdx..%v\n", task.FileName, task.FileIdx, task.PartIdx)

		for j := 0; j < len(m.redTaskingQ); { /* 将编号为 partIdx 的 reduceTask 状态标记为已完成 */
			if m.redTaskingQ[j].PartIdx == task.PartIdx {
				m.redTaskingQ = append(m.redTaskingQ[:j], m.redTaskingQ[j+1:]...)
				continue
			}
			j++
		}
	}
}

Worker RPC 方法并没有回复解析后的结果,而是原来的 RedTasks,所以只需进行移除操作即可。解析的结果存放在临时文件 mr-out-x 中,这在 Lab1: MapReduce 实验主页 有提示,

The worker implementation should put the output of the X'th reduce task in the file mr-out-X.

S5 - master应对crash

现在,开始讲一下 Master 最重要的 crash 应对措施,它也是测试脚本中的最后一个测试点

上面实现的方案,在 Master 和 Worker 稳定在线的情况下, 能正常工作输出结果。但在现实世界中,我们是无法保证每个实体都稳定在线的,尤其是 Worker

试想,如果有一个正在解析 MapTask 的 Worker 忽然因为电源的原因自动关机了,那么 Master 将无法如期收到处理结果。待 Master 收回其他 Worker 的解析结果后,也无法开始 RedTask 的流程,因为 MapTask 还没有全部完成

这个时候就很苦恼了,Master 仅仅因为没有集齐一小部分的 MapTask 而无法进行下面的 RedTask 流程。这就是 crash 的主要病症,该如何解决?

我采取了较为粗暴的 TimeOut 重置法,思路是这样的,Master 每隔一段时间就会检查一下 MapTask 和 RedTask 两个运行队列,如果队列中还有任务,那么就将该任务移至分配队列中

这个方法的想法挺简单的,也比较有效,但是说实话,该方法玩的是一个绝对时间,能够凑效的话,很大程度上取决 TimeOut 的时间设定和 MapTask and RedTask 的业务处理时间

它不像 HTTP 或 Raft,实时通过心跳包来检测对方是否在线,而是通过轮询的法子,而且轮询的周期不宜控制。在这我将 TimeOut 设置成了 15 sec( src/mr/defs.go ),代码如下,

/* 重新分配超时的 task */
func (m *Master) tasking2task() {
	for {
		time.Sleep(FixedTimeOut * time.Second)
		m.mtx.Lock()
		if len(m.mapTaskingQ) != 0 {
			for i, _ := range m.mapTaskingQ {
				m.mapTaskQ = append(m.mapTaskQ, m.mapTaskingQ[i])
			}
			m.mapTaskingQ = []MapTask{}
		}

		if len(m.redTaskingQ) != 0 {
			DPrintf("redTaskingQ..%v\n", m.redTaskingQ)
			for i, _ := range m.redTaskingQ {
				m.redTaskQ = append(m.redTaskQ, m.redTaskingQ[i])
			}
			m.redTaskingQ = []ReduceTask{}
		}
		m.mtx.Unlock()
	}
}

关于为何不选择心跳包测试对方是否还在线的法子,我在 IV. Solution 开始的地方叙述过我的观点

至此,我已经讲完了整套的 MapReduce 流程

V. Result

在测试之前,我还想补充一下,在 src/mr/master.go 中,我添加了 init() 方法,这个方法是 Go 的特色,相当于进入 main() 之前的预处理函数,是无论如何都会率先执行的,可以去了解一下,网上应该资料很多,我就不展开了

且看定义,

func init() {
	/* test-mr.sh 时要将值设为 0,同时要注释掉下面的 rm 命令 */
	Debug = 0

	///* bash -c 表示脚本命令为接着其后的字符串内容,即 "rm -rf mr-*" 且不容忽略通配符* */
	//cmd := exec.Command("/bin/bash", "-c", "rm -rf mr-*")
	//err := cmd.Run() /* 删掉上次运行的 mr-out 缓存文件 */
	//if err != nil {
	//	log.Fatal("cmd.Run() failed with %v", err)
	//}
}

我做的就是控制是否打印调试信息和关于删除临时文件的相关操作,都是用于调试的,大家视自己的情况而定

过程中的临时文件我是保存在 src/main 路径下的,在每一次 Master 启动时都会将上次运行所产生的临时文件清理掉

自己调试,我是通过 time.Sleep() 来控制流程的速度的,要不然处理的速度太快,看不清交互的细节

另外,我想说的是,多线程的程序,不比单线程,单线程在 Visual Studio 下可以通过单步跟踪的方式进入调试,可以肯定的说,这是最快的方式,比 prinf() 输出相关信息要快的多

但在多线程下,单步调试就不再适用,要让自己习惯于打桩测试,也就是 printf() ,通过这些输出信息来分析

调试一般是,先转到 src/main 路径下,

cd src/main

开一个 Master,

go run mrmaster.go pg-*.txt

编译一下词频统计插件,

go build -buildmode=plugin ../mrapps/wc.go

每一次的调试都需要重新编译插件,再开启两个 Worker,

go run mrworker.go wc.so

确定没有问题之后,可以执行测试脚本来看看是否真的没问题了,

sh test-mr.sh

VI. Evaluation

写在最后,一些我自己的感慨

其实 Go 语言对我来说,远远比不上 C/C++,我是因为 MIT 6.824 这个 Lab 才接触了这个比较坑的语言

说句难听的,不加指针的 Go 语言,程序执行的效率可能还比不上 JavaScript 等脚本语言;加上指针的 Go,代码没法看,太丑了,比 C/C++ 要丑的多

Go 我唯一觉得好用的地方就是它的 goroutine,其他的地方还是坑比较多的,不管是 IO 操作,还是 Timer 和切片哈希表,多得是,我没有一一踩完,自己可以试试

而且 Go 语言当前最好的 IDE 就是 GoLand,特别是 6.824-golabs-2020 ,这个实验框架比较怪,放在 VSCode 或 Vim 中可能识别不了相关的代码,而且不允许我们引用第三方的框架

所以,别跟自己较劲,我们的主要目标是学习分布式,而不是配置环境,选择最舒服的 GoLand 即可