MIT 6.5840 2023 Spring (6.824) Lab 1: MapReduce

Published: 2023-04-10 11:32:08  Author: Alyjay


Preface

This lab builds a simple MapReduce framework that runs on a single machine with a shared local file system and communicates via RPC, and uses it to complete a word-count job. It is implemented in Go, with a single Master and multiple Workers. Fault tolerance for crashed workers is provided by re-executing tasks after a timeout. The main work items are: designing the RPC request and reply arguments, the Master RPC handler, the Master control logic, the Worker task-execution logic, and the job-completion check.

What is MapReduce?

At a high level, the framework is:

  • One core idea: divide and conquer

  • Two key phases: the Map phase and the Reduce phase

    1. In the Map phase, the job is split into many small tasks that are sent to different machines; each task's output records are partitioned into intermediate files according to the hash of their key.
    2. Once all Map tasks have finished, the Reduce phase starts. Each Reduce task runs the reduce operation over its intermediate partition and writes its result to an output file (a word-count example of map and reduce follows this list).
  • For further details on MapReduce, see the original MapReduce paper.
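
As a concrete example, the word-count application used in this lab boils down to a pair of functions like the following. This is a sketch modeled on the lab's mrapps/wc.go (it needs the "strings", "strconv" and "unicode" imports); the KeyValue type comes from the framework's mr package.

// KeyValue is the record type passed between map and reduce.
type KeyValue struct {
	Key   string
	Value string
}

// Map emits ("word", "1") for every word found in the input file's contents.
func Map(filename string, contents string) []KeyValue {
	notLetter := func(r rune) bool { return !unicode.IsLetter(r) }
	words := strings.FieldsFunc(contents, notLetter)
	kva := []KeyValue{}
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// Reduce receives every value emitted for one key and returns the count.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}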

Environment Setup

  • OS: Ubuntu 22.04 LTS
  • Go toolchain: go1.20.3 linux/amd64
  • IDE: GoLand
  • Workflow: the OS runs in a virtual machine, and GoLand connects to it over SSH for coding
  • Version control: an internal company GitLab

My overall system architecture for this lab

High-Level Design

RPC Design

The RPC interface consists of request arguments and reply arguments. A worker's RPC calls fall into two kinds:

  • Initialization call: the worker's first RPC, in which the Master registers it and assigns a WorkerId

  • Task-control call: carries the workerId, the task's status and task ID, and the task's output. The status is one of:

    1. Task finished
    2. Task failed

After issuing an RPC, the worker blocks until the call succeeds and a Reply comes back. There are two kinds of Reply (the call helper used for these RPCs is sketched after this list):

  • Task-assignment Reply

    1. A Map task
    2. A Reduce task
  • Control Reply

    1. A worker-exit message
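
Each of these calls goes through the call helper that the lab skeleton already provides: it dials the coordinator's unix-domain socket and blocks until the reply is filled in. Shown roughly as it appears in the skeleton's mr/worker.go (coordinatorSock comes from mr/rpc.go; imports "net/rpc", "log", "fmt"):

// call sends one RPC to the coordinator and blocks until the reply arrives.
func call(rpcname string, args interface{}, reply interface{}) bool {
	sockname := coordinatorSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}
	fmt.Println(err)
	return false
}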

My type definitions are as follows:

type WorkerArgs struct {
	WorkerId    int
	RequestType int // the kind of request

	// the fields below are only set when reporting a finished task
	Output   []string // intermediate files produced by a map task, or the output file of a reduce task
	Input    []string
	TaskId   int // the corresponding task id
	TaskType int
}

const (
	InitialRequest  = 0
	FinishedRequest = 1
	FailedRequest   = 2
)

type WorkerReply struct {
	WorkerId  int // the worker's id
	TaskType  int // the kind of task
	NReduce   int // the number of reduce output files
	TaskId    int // the task's id
	Input     []string
	ReduceNum int
	ExitMsg   bool
}
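
For example, a worker reporting a finished map task would send something like WorkerArgs{WorkerId: id, RequestType: FinishedRequest, Output: intermediateNames, Input: inputs, TaskId: taskId, TaskType: TaskTypeMap} (the variable names here are only illustrative), and the Master's reply then carries its next assignment.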

Master Design

What are the Master's responsibilities?

  1. Answering the workers' RPC calls
  2. Controlling task state
  3. Crash recovery (via timeout and re-execution)
  4. Deciding when the whole job is complete

Starting with the simple ones:

  • Job-completion check: once all Reduce tasks have finished and no worker is blocked in an RPC call, the job is done and the Master can exit safely (a Done() sketch follows this list).
  • Crash recovery: re-execute a task once it times out.
  • Task state control: handled by a dedicated task-state-control module.
  • Answering worker RPC calls: a dedicated handler.
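
The completion check is exposed through the Done() method that main/mrcoordinator.go polls. A minimal sketch, assuming the getReduceTaskNum and getWaitingNum helpers shown later in the detailed design:

// Done reports whether the whole job has finished: every reduce task is done
// and no worker is still blocked inside the RPC handler.
func (c *Coordinator) Done() bool {
	return c.getReduceTaskNum() == 0 && c.getWaitingNum() == 0
}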

A few issues come up along the way:

  1. Can the same task be reported as finished more than once? Because of the timeout/re-execution mechanism, multiple Finish messages for one task are possible, so the task-control module has to guard against duplicates.
  2. State control based on shared memory was painful to write: locks nested inside locks everywhere (which is how I designed it at first). What to do? Switch to communication-based concurrency control and decouple state control into a module of its own.
  3. TODO

Here are my type definitions:

type Coordinator struct {
	NReduce  int
	InputNum int
	Timeout  time.Duration

	NextWorkerId      int
	NextWorkerIdMutex sync.Mutex

	NextTaskId      int
	NextTaskIdMutex sync.Mutex

	TaskChan    chan Task
	TaskMsgChan chan TaskMsg

	MapResult      []string // once a worker reports a finished map task, the task leaves the task pool and its output files are appended here
	MapResultMutex sync.Mutex

	WorkingTask      map[int]TaskStatus // the tasks that are currently running
	WorkingTaskMutex sync.Mutex

	//ReduceFinished      bool
	//ReduceFinishedMutex sync.Mutex

	ReduceTaskNum      int
	ReduceTaskNumMutex sync.Mutex

	WorkerNum      int
	WorkerNumMutex sync.Mutex

	// the number of workers currently blocked in an RPC call
	WaitingNum      int
	WaitingNumMutex sync.Mutex
}
type Task struct {
	Input           []string
	CreateTask      bool
	TaskType        int
	TaskId          int
	ExcludeWorkerId int
	ReduceNum       int
	ExitTask        bool
}
type TaskMsg struct {
	Msg       int
	TaskId    int
	TaskType  int
	Input     []string
	Output    []string
	TimeStamp time.Time
	WorkerId  int
	ReduceNum int
}

const (
	CreateTaskMsg = 0
	FinishTaskMsg = 1
	UpdateTaskMsg = 2
	FailedTaskMsg = 3
)
const (
	TaskTypeMap    = 0
	TaskTypeReduce = 1
)

type TaskStatus struct {
	TaskId    int       // the task's id
	TaskType  int       // the kind of task
	Input     []string  // the task's input
	StartTime time.Time // when the task started
	WorkerId  int
	ReduceNum int
}

Worker Design

The worker design is fairly simple: it just calls the RPC and then acts on the reply; the full code appears in the detailed design below.
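
Condensed to a skeleton (again, the full code is in the detailed design), the worker loop looks roughly like this:

// Condensed worker loop; file I/O and error handling are omitted here.
args := WorkerArgs{RequestType: InitialRequest}
reply := WorkerReply{}
call("Coordinator.WorkerHandler", &args, &reply) // register and get the first task
for !reply.ExitMsg {
	switch reply.TaskType {
	case TaskTypeMap:
		// run mapf over reply.Input and write NReduce intermediate files
	case TaskTypeReduce:
		// read the intermediate files in reply.Input, run reducef, write mr-out-X
	}
	// rebuild args as a FinishedRequest (or FailedRequest) and block for the next task
	call("Coordinator.WorkerHandler", &args, &reply)
}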

Detailed Design

Master task-state control module

There are four kinds of state-control messages.
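
All four are consumed by a single TaskStatusManager goroutine (the one started in MakeCoordinator), which owns the task bookkeeping. The four case bodies listed below plug into a loop roughly like this:

// TaskStatusManager drains TaskMsgChan in one goroutine and dispatches on the
// message type; the four case bodies are shown below.
func (c *Coordinator) TaskStatusManager() {
	for msg := range c.TaskMsgChan {
		switch msg.Msg {
		case CreateTaskMsg:
			// record a newly assigned task
		case FinishTaskMsg:
			// drop the finished task; enqueue the reduce tasks once all maps are done
		case UpdateTaskMsg:
			// refresh the status of a re-assigned task
		case FailedTaskMsg:
			// put the failed task back onto TaskChan
		}
	}
}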

  • Task-creation message
    case CreateTaskMsg:
    		{
    			taskss := TaskStatus{
    				msg.TaskId,
    				msg.TaskType,
    				msg.Input,
    				msg.TimeStamp,
    				msg.WorkerId,
    				msg.ReduceNum,
    			}
    			c.WorkingTaskMutex.Lock()
    			c.WorkingTask[taskss.TaskId] = taskss
    			c.WorkingTaskMutex.Unlock()
    		}
    
  • Task-finished message
    case FinishTaskMsg:
    		{
    			c.WorkingTaskMutex.Lock()
    			if _, ok := c.WorkingTask[msg.TaskId]; ok {
    				// if it is still in WorkingTask, the task has just finished, so delete it
    				if msg.TaskType == TaskTypeMap {
    					c.MapResultMutex.Lock()
    					c.MapResult = append(c.MapResult, msg.Output...)
    					//fmt.Printf("%v %v", len(c.MapResult), c.InputNum)
    					if len(c.MapResult) == c.InputNum*c.NReduce {
    						// all map tasks have finished; the reduce tasks can now be enqueued
    						reduceTask := make([][]string, c.NReduce)
    
    						for _, mres := range c.MapResult {
    							var idx int
    							var taskId int
    							var workerId int
    							fmt.Sscanf(mres, "mr-int-%d-%d-%d", &taskId, &workerId, &idx)
    							reduceTask[idx] = append(reduceTask[idx], mres)
    						}
    						for i := 0; i < c.NReduce; i++ {
    							task := Task{
    								reduceTask[i],
    								true,
    								TaskTypeReduce,
    								c.getNextTaskId(),
    								0,
    								i,
    								false,
    							}
    							c.TaskChan <- task
    						}
    					}
    					c.MapResultMutex.Unlock()
    				} else {
    					c.ReduceTaskNumMutex.Lock()
    					c.ReduceTaskNum -= 1
    					c.ReduceTaskNumMutex.Unlock()
    				}
    				delete(c.WorkingTask, msg.TaskId)
    			}
    			c.WorkingTaskMutex.Unlock()
    			if c.getReduceTaskNum() == 0 {
    				task := Task{ExitTask: true}
    				for {
    				// wrap-up phase: just keep handing out exit tasks
    					c.TaskChan <- task
    				}
    			}
    		}
    
  • Task-status update message
    case UpdateTaskMsg:
    		{
    			c.WorkingTaskMutex.Lock()
    			if _, ok := c.WorkingTask[msg.TaskId]; ok {
    				c.WorkingTask[msg.TaskId] = TaskStatus{
    					msg.TaskId,
    					msg.TaskType,
    					msg.Input,
    					msg.TimeStamp,
    					msg.WorkerId,
    					msg.ReduceNum,
    				}
    			}
    			c.WorkingTaskMutex.Unlock()
    		}
    
  • Task-failure message
    case FailedTaskMsg:
    		{
    			c.WorkingTaskMutex.Lock()
    			if _, ok := c.WorkingTask[msg.TaskId]; ok {
    				c.TaskChan <- Task{
    					msg.Input,
    					false,
    					msg.TaskType,
    					msg.TaskId,
    					msg.WorkerId,
    					c.WorkingTask[msg.TaskId].ReduceNum,
    					false,
    				}
    			}
    			c.WorkingTaskMutex.Unlock()
    		}
    

Master RPC Handler module

  • Handler module
    func (c *Coordinator) WorkerHandler(args *WorkerArgs, reply *WorkerReply) error {
        // one more worker is now blocked in the RPC, waiting for a task
        c.incWaitingNum()
        defer c.decWaitingNum()
        if args.RequestType == InitialRequest {
            reply.WorkerId = c.getNextWorkerId()
            args.WorkerId = reply.WorkerId
            c.incWorkerNum()
        } else if args.RequestType == FinishedRequest {
            taskmsg := TaskMsg{
                FinishTaskMsg,
                args.TaskId,
                args.TaskType,
                args.Input,
                args.Output,
                time.Now(),
                args.WorkerId,
                0,
            }
            c.TaskMsgChan <- taskmsg
        } else if args.RequestType == FailedRequest {
            taskmsg := TaskMsg{
                FailedTaskMsg,
                args.TaskId,
                args.TaskType,
                args.Input,
                args.Output,
                time.Now(),
                args.WorkerId,
                0,
            }
            c.TaskMsgChan <- taskmsg
        }
        task := <-c.TaskChan
        if task.ExitTask {
            reply.ExitMsg = true
            c.decWorkerNum()
            return nil
        }
        for task.ExcludeWorkerId == args.WorkerId {
            c.TaskChan <- task
            time.Sleep(time.Duration(time.Millisecond))
            task = <-c.TaskChan
        }
        taskmsg := TaskMsg{}
        if task.CreateTask {
            taskmsg.Msg = CreateTaskMsg
        } else {
            taskmsg.Msg = UpdateTaskMsg
        }
        taskmsg.TaskId = task.TaskId
        taskmsg.TimeStamp = time.Now()
        taskmsg.Input = task.Input
        taskmsg.WorkerId = args.WorkerId
        taskmsg.TaskType = task.TaskType
        taskmsg.ReduceNum = task.ReduceNum
        c.TaskMsgChan <- taskmsg
        reply.TaskId = taskmsg.TaskId
        reply.TaskType = taskmsg.TaskType
        reply.Input = taskmsg.Input
        reply.NReduce = c.NReduce
        reply.WorkerId = args.WorkerId
        reply.ReduceNum = task.ReduceNum
        return nil
    }
    
  • Helper functions
    func (c *Coordinator) getNextTaskId() int {
        c.NextTaskIdMutex.Lock()
        defer c.NextTaskIdMutex.Unlock()
        res := c.NextTaskId
        c.NextTaskId += 1
        return res
    }
    func (c *Coordinator) getNextWorkerId() int {
        c.NextWorkerIdMutex.Lock()
        defer c.NextWorkerIdMutex.Unlock()
        res := c.NextWorkerId
        c.NextWorkerId += 1
        return res
    }
    func (c *Coordinator) incWorkerNum() {
        c.WorkerNumMutex.Lock()
        defer c.WorkerNumMutex.Unlock()
        c.WorkerNum += 1
    }
    func (c *Coordinator) decWorkerNum() {
        c.WorkerNumMutex.Lock()
        defer c.WorkerNumMutex.Unlock()
        c.WorkerNum -= 1
    }
    func (c *Coordinator) getReduceTaskNum() int {
        c.ReduceTaskNumMutex.Lock()
        defer c.ReduceTaskNumMutex.Unlock()
        return c.ReduceTaskNum
    }
    func (c *Coordinator) incWaitingNum() {
        c.WaitingNumMutex.Lock()
        defer c.WaitingNumMutex.Unlock()
        c.WaitingNum += 1
    }
    func (c *Coordinator) decWaitingNum() {
        c.WaitingNumMutex.Lock()
        defer c.WaitingNumMutex.Unlock()
        c.WaitingNum -= 1
    }
    func (c *Coordinator) getWaitingNum() int {
        c.WaitingNumMutex.Lock()
        defer c.WaitingNumMutex.Unlock()
        return c.WaitingNum
    }
    
    

Timeout and re-execution module

func (c *Coordinator) CheckTimeout() {
	for {
		c.WorkingTaskMutex.Lock()
		now := time.Now()
		for key, _ := range c.WorkingTask {
			if now.Sub(c.WorkingTask[key].StartTime) >= c.Timeout {
				// this task has timed out
				task := Task{
					c.WorkingTask[key].Input,
					false,
					c.WorkingTask[key].TaskType,
					c.WorkingTask[key].TaskId,
					c.WorkingTask[key].WorkerId,
					c.WorkingTask[key].ReduceNum,
					false,
				}
				c.TaskChan <- task
				// alternatively, this could just send a msg to the task status manager
			}
		}
		c.WorkingTaskMutex.Unlock()
		time.Sleep(c.Timeout)
	}
}
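
Note that a re-enqueued task carries CreateTask == false and ExcludeWorkerId set to the worker that timed out, so the RPC handler refuses to hand it back to that same worker, and when another worker does pick it up the handler sends an UpdateTaskMsg, which refreshes the StartTime recorded in WorkingTask.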

Master Init module

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}
	// Your code here.
	c.NReduce = nReduce
	c.InputNum = len(files)
	c.NextTaskId = 1
	c.NextWorkerId = 1
	c.TaskChan = make(chan Task, len(files)+1)
	c.TaskMsgChan = make(chan TaskMsg, len(files)+1)
	c.Timeout = time.Duration(time.Second * 10)
	for i := 0; i < len(files); i++ {
		input := []string{files[i]}
		task := Task{
			input,
			true,
			TaskTypeMap,
			c.getNextTaskId(),
			0,
			0,
			false,
		}
		c.TaskChan <- task
	}
	c.MapResult = []string{}
	c.WorkingTask = make(map[int]TaskStatus)
	c.WorkerNum = 0
	c.ReduceTaskNum = nReduce
	go c.CheckTimeout()
	go c.TaskStatusManager()
	c.WaitingNum = 0
	c.server()
	return &c
}
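
MakeCoordinator ends by calling c.server(), which is essentially the lab skeleton's RPC setup: register the Coordinator with net/rpc and serve HTTP over a unix-domain socket. Shown roughly as it appears in the skeleton (coordinatorSock again comes from mr/rpc.go; imports "net", "net/http", "net/rpc", "os", "log"):

// server starts a goroutine that listens for RPCs from the workers.
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	sockname := coordinatorSock()
	os.Remove(sockname)
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}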

Worker RPC call module

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	// Your worker implementation here.

	// uncomment to send the Example RPC to the coordinator.
	// CallExample()
	// initial request
	var workerId int
	var nReduce int
	var taskId int
	var taskType int
	var inputs []string
	var failed bool
	var output []string
	args := WorkerArgs{WorkerId: 0, RequestType: InitialRequest}
	reply := WorkerReply{}
	call("Coordinator.WorkerHandler", &args, &reply)
	for {
		if reply.ExitMsg {
			break
		}
		failed = false
		workerId = reply.WorkerId
		nReduce = reply.NReduce
		taskId = reply.TaskId
		taskType = reply.TaskType
		inputs = reply.Input
		if taskType == TaskTypeMap {
			// run the map task
			intermediate := []KeyValue{}
			for _, filename := range inputs {
				file, err := os.Open(filename)
				if err != nil {
					file.Close()
					fail(workerId, FailedRequest, make([]string, 0), inputs, taskId, taskType, &reply)
					failed = true
					break
				}
				data, err := io.ReadAll(file)
				if err != nil {
					file.Close()
					fail(workerId, FailedRequest, make([]string, 0), inputs, taskId, taskType, &reply)
					failed = true
					break
				}
				kva := mapf(filename, string(data))
				intermediate = append(intermediate, kva...)
			}
			sort.Sort(ByKey(intermediate))
			files := []*os.File{}
			output = []string{}
			for i := 0; i < nReduce; i++ {
				oname := "mr-int-" + strconv.Itoa(taskId) + "-" + strconv.Itoa(workerId) + "-" + strconv.Itoa(i)
				file, _ := os.Create(oname)
				file.Seek(0, 0)
				files = append(files, file)
				output = append(output, oname)
			}

			for _, kv := range intermediate {
				fmt.Fprintf(files[ihash(kv.Key)%nReduce], "%v %v\n", kv.Key, kv.Value)
			}
			// close the intermediate files before reporting the result
			for _, f := range files {
				f.Close()
			}
			if failed {
				continue
			}
			args = WorkerArgs{
				workerId,
				FinishedRequest,
				output,
				inputs,
				taskId,
				taskType,
			}
			call("Coordinator.WorkerHandler", &args, &reply)
		} else {
			// run the reduce task
			res := ByKey{}
			kvs := []KeyValue{}
			for _, filename := range inputs {
				file, err := os.Open(filename)
				if err != nil {
					file.Close()
					fail(workerId, FailedRequest, make([]string, 0), inputs, taskId, taskType, &reply)
					failed = true
					break
				}
				var key string
				var value string
				for {
					if _, err = fmt.Fscanf(file, "%v %v\n", &key, &value); err != nil {
						break
					}
					kvs = append(kvs, KeyValue{key, value})
				}
				file.Close()

			}
			sort.Sort(ByKey(kvs))
			if failed {
				continue
			}
			i := 0
			for i < len(kvs) {
				j := i + 1
				for j < len(kvs) && kvs[j].Key == kvs[i].Key {
					j++
				}
				values := []string{}
				for k := i; k < j; k++ {
					values = append(values, kvs[k].Value)
				}
				output := reducef(kvs[i].Key, values)
				res = append(res, KeyValue{kvs[i].Key, output})
				i = j
			}
			oname := "mr-out-" + strconv.Itoa(reply.ReduceNum)
			ofile, _ := os.Create(oname)
			for _, kv := range res {
				fmt.Fprintf(ofile, "%v %v\n", kv.Key, kv.Value)
			}
			ofile.Close()
			args = WorkerArgs{
				workerId,
				FinishedRequest,
				[]string{oname}, // report the reduce output file name
				inputs,
				taskId,
				taskType,
			}
			call("Coordinator.WorkerHandler", &args, &reply)
		}
	}
}
func fail(workerId int, requestType int, output []string, input []string, taskId int, taskType int, reply *WorkerReply) {
	args := WorkerArgs{
		workerId,
		requestType,
		output,
		input,
		taskId,
		taskType,
	}
	call("Coordinator.WorkerHandler", &args, reply)
}

Gripes and rambling

Go not having a real enum type is genuinely painful... I wish it had enums as powerful as Rust's.
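
The closest substitute Go offers is a named integer type with iota constants and a hand-written String method, for example:

// The usual Go workaround for missing enums: a named int type, iota constants,
// and a String method for readable printing.
type RequestKind int

const (
	Initial RequestKind = iota
	Finished
	Failed
)

func (k RequestKind) String() string {
	return [...]string{"initial", "finished", "failed"}[k]
}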