引子

本文记录完成 Lab01 的过程,跟着 AI 照猫画虎.

MapReduce

首先我们需要简单了解一下 MapReduce 这篇论文,其提出了分布式的思想,具体如下:

  • Map:将一个大的问题分解为小问题,应用用户定义的 Map 函数,生成中间键值对——文本分割为单词,输出 {world, 1} 这样的键值对
  • Reduce: 将中间键值对按 key 分组,应用用户定义的 Reduce 函数,生成最终输出——Reduce 对同一 key 进行计数求和 {word, count}
  • 系统可以自动做到并行,任务调度,负载均衡,容错,让用户只需要开发 Map 和 Reduce 这两个函数

这样的一个系统由三部分组成

  • Master: 主节点,类似于 K3s 中的 server 节点
  • Worker: 类似于 K3s 中的 agent 节点
  • 分布式文件系统,例如 GFS: 存储输入数据,中间结果和最终输出

工作流程如下:

  1. 输入数据被分割成固定大小的分片
  2. Master 将 Map 任务分配给 Worker,Worker 读取分片,执行 Map 函数,生成中间键值对,写入本地磁盘。
  3. 中间键值对按键分区(partitioned),传输到 Reduce Worker
  4. Reduce Worker 对键值对排序、分组,执行 Reduce 函数,生成最终输出,写入分布式文件系统

而本 lab 的目标就是实现一个简化的 MapReduce 系统,模拟 Master 和 Worker. 处理任务分配、并行执行和故障容错.

Lab 实现

在本 Lab 中,我们只需要编写 src/main/mr 下的三个代码即可.

Coordinator.go

对于 Master 节点,我们需要让他来进行调度,如何确保 Worker 们按照我们想要的方式来执行任务.

Coordinator

首先,我们先要定义 Coordiantor 本身

type Coordinator struct {
	// Your definitions here.
	mu            sync.Mutex // 互斥锁
	files         []string   // 输入文件列表
	nReduce       int        // Reduce任务数
	mapTasks      []Task     // Map任务列表,以taskID作为下标
	reduceTasks   []Task     // Reduce任务列表
	completedMaps int        // 已完成的Map任务数量
	completedReds int        // 已完成的Reduce任务数量
}

我们作为控制者,要获取输入的文件集合,保存 Map 和 Reduce 两种不同任务,并统计二者的初始数量与已完成的数量——用来看某类任务是否已经全部完成.

对于这些变量,为了防止数据冲突,自然要处理并发;在这里我们使用了 mutex 直接上锁,具体操作见后文.

Task

其次,我们需要定义有关 Task 的信息,分别是 Type 结构体和任务类型 TaskType

type TaskType string

type Task struct {
	TaskID     int      // 任务ID
	TaskType   TaskType // 是Map还是Reduce
	InputFile  string // 对Map来说输入文件
	OutputFile string // 对Reduce来说最终的输出文件
	NReduce    int // Reduce任务数
	NMap       int
	StartTime  time.Time // 用于计时,看是否超时
	Completed  bool
	//
}

const (
	Map    TaskType = "Map"
	Reduce TaskType = "Reduce"
	Wait   TaskType = "Wait"
	Exit   TaskType = "Exit"
) 

同时,我们还定义了一些常量表示不同任务类型.

MakeCoordinator

这是一个初始化方法,入参已经给定,我们来初始化 Coordinator,Task切片.

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{
		files:         files,
		nReduce:       nReduce,
		mapTasks:      make([]Task, len(files)),
		reduceTasks:   make([]Task, nReduce),
		completedMaps: 0,
		completedReds: 0,
	}
	// 初始化Map任务
	for i, file := range files {
		c.mapTasks[i] = Task{
			TaskID:    i,
			TaskType:  Map,
			InputFile: file,
			NReduce:   nReduce,
			NMap:      len(files),
		}
	}
	// 初始化Reduce任务
	for i := 0; i < nReduce; i++ {
		c.reduceTasks[i] = Task{
			TaskID:     i,
			TaskType:   Reduce,
			OutputFile: fmt.Sprintf("mr-out-%d", i),
			NReduce:    nReduce,
			NMap:       len(files),
		}
	}
	c.server()
	return &c
}

最终启动了 server,server 是已经写好的代码,它用来作为 Coordinator 监听相关 RPC 请求的服务端.

RPC方法

在调度框架中, Worker 会通过发送 RPC 请求向 Master 请求资源,例如请求任务,所以我们在 Master 端需要写相对应的请求任务的逻辑.

RequestTask

优先分配未完成或者超时的 Map 任务,所有 Map 任务完成后分配 Reduce 任务(实现了 Reduce 对 Map 的依赖关系) 由于存在并发,所以我们一开始就用 mutex 保护共享状态.

func (c *Coordinator) RequestTask(args *TaskArgs, reply *TaskReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// 优先分配Map
	for i := range c.mapTasks {
		// 如果任务未完成
		if !c.mapTasks[i].Completed {
			// 检查任务是否超时
			if !c.mapTasks[i].StartTime.IsZero() && time.Since(c.mapTasks[i].StartTime) > 10*time.Second {
				// 若超时则重新分配
				c.mapTasks[i].StartTime = time.Now()
				reply.Task = c.mapTasks[i]
				return nil
			} else if c.mapTasks[i].StartTime.IsZero() {
				// 任务未分配
				c.mapTasks[i].StartTime = time.Now()
				reply.Task = c.mapTasks[i]
				return nil
			}
		}
	}

	// 如果所有Map任务都完成
	if c.completedMaps == len(c.mapTasks) {
		// 分配Reduce 任务
		for i := range c.reduceTasks {
			if !c.reduceTasks[i].Completed {
				if !c.reduceTasks[i].StartTime.IsZero() && time.Since(c.reduceTasks[i].StartTime) > 10*time.Second {
					// 若超时则重新分配
					c.reduceTasks[i].StartTime = time.Now()
					reply.Task = c.reduceTasks[i]
					return nil
				} else if c.reduceTasks[i].StartTime.IsZero() {
					// 任务未分配
					c.reduceTasks[i].StartTime = time.Now()
					reply.Task = c.reduceTasks[i]
					return nil
				}
			}
		}
		// 所有 reduce 完成
		if c.completedReds == len(c.reduceTasks) {
			reply.Task.TaskType = Exit
			reply.Finished = true
			return nil
		}
	}

	// 无可用任务, Wo 等待
	reply.Task.TaskType = Wait
	return nil
}
TaskCompleted

同时, Worker 也要向 Master 发送对应的任务处理情况通知,为了让 Master 去管理全局状态,例如已经完成的任务数的更新.

func (c *Coordinator) TaskCompleted(args *TaskCompletionArgs, reply *TaskCompletionArgs) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	switch args.TaskType {
	case Map:
		if args.TaskID < len(c.mapTasks) && !c.mapTasks[args.TaskID].Completed {
			c.mapTasks[args.TaskID].Completed = true // 在范围内且传回来未完成?
			c.completedMaps++
			log.Printf("Map任务 %d 已完成", args.TaskID)
		}
	case Reduce:
		if args.TaskID < len(c.reduceTasks) && !c.reduceTasks[args.TaskID].Completed {
			c.reduceTasks[args.TaskID].Completed = true
			c.completedReds++
			log.Printf("Reduce任务 %d 已完成", args.TaskID)
		}
	}

	return nil
}

如果报告的的是 Map 任务且状态为未完成,我们才会去更新任务状态为已完成.

  • 为了保证任务状态的幂等性——重复通知不改变任务状态,所以我们只有未完成才去更新(通知了一定是完成了)
  • 之后的通知此任务状态为已完成,不会再更新

Done

最后,主函数需要调用 Done 来判断整个系统是否运行结束,我们根据已完成的任务数与预期的任务数对比来判断.

func (c *Coordinator) Done() bool {
	//ret := false
	// Your code here.
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.completedMaps == len(c.mapTasks) && c.completedReds == len(c.reduceTasks)
}

rpc.go

在 lab 中, Master 和 Worker 是通过套接字 socket 来通信的,在 rpc.go 中我们需要定义 RPC 通信的请求和响应结构,可以携带一些状态或信息.

通常以请求和响应参数成对出现,从 Worker 请求到 Master,从 Master 响应到 Worker

与任务的请求有关的:

type TaskArgs struct{}

type TaskReply struct {
	Task     Task
	Finished bool
}

与任务完成情况有关的:

type TaskCompletionArgs struct {
	TaskID   int
	TaskType TaskType
}

type TaskCompletionReply struct{}
  • Worker 需要向 Master 传递 TaskID 等信息供后者使用.
  • Master 无需向 Worker 传递什么信息.

值得注意的是,在 RPC 调用中,我们要保证 RPC 的参数类型在调用双方是匹配的,这里我们都用了之前声明的结构体,没有此问题.

后续更新,其实有关 Task 的定义也该放在这里.

worker.go

在 worker 中,我们就要实际进行 Map 和 Reduce 的相关操作了. 为了本文的连续性,我们先写有关于 RPC 的相关代码.

RPC相关

我们需要一个 call 方法来调用 Master 中的方法,具体就是靠 rpc.DialHTTP 打开一个连接,Worker 作为客户端去调用 Call 方法

func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	sockname := coordinatorSock()
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}

	fmt.Println(err)
	return false
}

同时我们也要写与服务端,也就是 Master 中对应的 RPC 方法,本质上是通过上面的 call 方法调用服务端上的方法.

requestTask,请求任务.

func requestTask() (Task, error) {

	// declare an argument structure.
	args := &TaskArgs{}
	reply := &TaskReply{}
	ok := call("Coordinator.RequestTask", args, reply)
	if !ok {
		return Task{}, fmt.Errorf("RPC调用失败")
	}
	// declare a reply structure.
	//reply := ExampleReply{}
	return reply.Task, nil
}

reportTaskCompletion,报告任务完成情况.

func reportTaskCompletion(taskID int, taskType TaskType) {
	args := &TaskCompletionArgs{
		TaskID:   taskID,
		TaskType: taskType,
	}
	var reply struct{}
	ok := call("Coordinator.TaskCompleted", args, &reply)
	if !ok {
		log.Printf("报告任务完成失败: TaskID=%d, Type=%s", taskID, taskType)
	}
}

Worker本身

Worker 的主要的逻辑是请求任务,执行任务(如果没有任务则等待),所有任务执行完后退出.

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {

	// Your worker implementation here.
	// 注册RPC客户端,连接Coordinator,持续请求任务直到收到退出信号
	for {
		// 请求任务
		task, err := requestTask()
		if err != nil {
			log.Printf("获取任务失败: %v", err)
			time.Sleep(2 * time.Second)
			continue
		}

		// 执行任务
		switch task.TaskType {
		case Map:
			log.Printf("开始处理Map任务: %d, 文件: %s", task.TaskID, task.InputFile)
			executeMapTask(task, mapf)
			reportTaskCompletion(task.TaskID, task.TaskType)
		case Reduce:
			log.Printf("开始处理Reduce任务: %d", task.TaskID)
			executeReduceTask(task, reducef)
			reportTaskCompletion(task.TaskID, task.TaskType)
		case Wait:
			log.Printf("没有可用任务,等待中...")
			time.Sleep(2 * time.Second)
		case Exit:
			log.Println("所有任务已完成,Worker退出")
			return
		}
	}
}
  • 请求任务失败的话暂停2秒继续请求
  • 根据请求到的任务类型来进行指定的操作,具体的操作见下文.

executeMapTask

用于处理 Map 任务

  • 读取输入文件,将其内容保存在 content 中
  • 调用 mapf 方法处理内容,得到中间结果 kv pairs
  • 将这些对根据 reduceID(即相同的 key 组成一个集合)保存在一个二维切片中
  • 将这些 pairs 编码为 JSON 格式并存储在临时文件中
  • 要注意对文件的命名符合实验要求
  • 将上述操作放在一个 goroutine 中执行,并且保证不发生冲突
func executeMapTask(task Task, mapf func(string, string) []KeyValue) {
	// 读取输入文件
	file, err := os.Open(task.InputFile)
	if err != nil {
		log.Fatalf("Cannot open %v", task.InputFile)
	}
	defer file.Close()
	content, _ := io.ReadAll(file)

	// 调用Map
	kvs := mapf(task.InputFile, string(content))
	// 按 Reduce 任务分区存储中间结果
	nReduce := task.NReduce
	intermediate := make([][]KeyValue, nReduce) // reduceID -> kv ?
	for _, kv := range kvs {
		reduceID := ihash(kv.Key) % nReduce
		intermediate[reduceID] = append(intermediate[reduceID], kv)
	}

	// 写入临时文件并原子重命名
	var wg sync.WaitGroup
	for i := 0; i < nReduce; i++ {
		wg.Add(1)
		go func(reduceID int, data []KeyValue) {
			defer wg.Done()
			tempFile, err := os.CreateTemp("", fmt.Sprintf("mr-%d-%d.tmp", task.TaskID, reduceID))
			if err != nil {
				log.Fatalf("Can't create tempFile: %v", err)
			}
			defer tempFile.Close()

			encoder := json.NewEncoder(tempFile)
			for _, kv := range data {
				if err := encoder.Encode(&kv); err != nil {
					log.Fatalf("编码 JSON 失败: %v", err)
				}
			}
			// 原子重命名为最终文件名
			finalName := fmt.Sprintf("mr-%d-%d", task.TaskID, reduceID)
			if err := os.Rename(tempFile.Name(), finalName); err != nil {
				log.Fatalf("重命名文件失败: %v", err)
			}
		}(i, intermediate[i])
	}
	wg.Wait()
}

executeReduceTask

几乎同理,我们处理刚才的 Map 过程产生的文件,读取其中信息并进行解码,最终保存在一个一维切片中.

相同key作为一组调用 reducef 方法进行处理,处理完后的结果写入到输出文件中.

func executeReduceTask(task Task, reducef func(string, []string) string) {
	// 收集所有Map任务的中间文件
	var kvs []KeyValue
	for mapID := 0; mapID < task.NMap; mapID++ {
		fileName := fmt.Sprintf("mr-%d-%d", mapID, task.TaskID)
		file, err := os.Open(fileName)
		if err != nil {
			continue // 忽略未完成的 Map 任务文件
		}

		decoder := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := decoder.Decode(&kv); err != nil {
				break // 读完所有记录
			}
			kvs = append(kvs, kv)
		}
		file.Close()
	}
	// 按 Key 排序
	sort.Slice(kvs, func(i, j int) bool { return kvs[i].Key < kvs[j].Key })

	// 分组调用Reduce,找key相同的集合放到values中
	var results []string
	i := 0
	for i < len(kvs) {
		j := i
		for j < len(kvs) && kvs[j].Key == kvs[i].Key {
			j++
		}
		keys := kvs[i].Key
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, kvs[k].Value)
		}
		result := reducef(keys, values)
		results = append(results, fmt.Sprintf("%v %v\n", keys, result))
		i = j
	}
	// 写入 Reduce 输出文件
	tempFile, err := os.CreateTemp("", fmt.Sprintf("mr-out-%d.tmp", task.TaskID))
	if err != nil {
		log.Fatalf("创建临时文件失败: %v", err)
	}
	defer tempFile.Close()

	_, err = tempFile.WriteString(strings.Join(results, ""))
	if err != nil {
		log.Fatalf("写入文件失败: %v", err)
	}

	finalName := fmt.Sprintf("mr-out-%d", task.TaskID)
	if err := os.Rename(tempFile.Name(), finalName); err != nil {
		log.Fatalf("文件重命名失败: %v", err)
	}
}

至此, Worker.go 结束. 最后执行实验命令可以通过实验.

总结

至此,就是我对于 lab1 的做法流程梳理,实现了一个小型的 MapReduce 分布式框架结构,这对于我自己目前在做的毕业设计项目也有很好的启发.

特别是在 Task 如何定义,在 lab 中我们定义为了 go 中的 struct,在未来要写的分布式框架中我们对于不同的 Task 的定义是通过 json 文件进行定义,来确定其所占据的资源信息.

这里是LTX,感谢您阅读这篇博客,人生海海,和自己对话,像只蝴蝶纵横四海。

引用