引子
本文记录完成 Lab01 的过程,跟着 AI 照猫画虎.
MapReduce
首先我们需要简单了解一下 MapReduce 这篇论文,其提出了分布式的思想,具体如下:
- Map:将一个大的问题分解为小问题,应用用户定义的 Map 函数,生成中间键值对——文本分割为单词,输出
{world, 1}这样的键值对 - Reduce: 将中间键值对按 key 分组,应用用户定义的 Reduce 函数,生成最终输出——Reduce 对同一 key 进行计数求和
{word, count} - 系统可以自动做到并行,任务调度,负载均衡,容错,让用户只需要开发 Map 和 Reduce 这两个函数
这样的一个系统由三部分组成
- Master: 主节点,类似于 K3s 中的 server 节点
- Worker: 类似于 K3s 中的 agent 节点
- 分布式文件系统,例如 GFS: 存储输入数据,中间结果和最终输出
工作流程如下:
- 输入数据被分割成固定大小的分片
- Master 将 Map 任务分配给 Worker,Worker 读取分片,执行 Map 函数,生成中间键值对,写入本地磁盘。
- 中间键值对按键分区(partitioned),传输到 Reduce Worker
- Reduce Worker 对键值对排序、分组,执行 Reduce 函数,生成最终输出,写入分布式文件系统
而本 lab 的目标就是实现一个简化的 MapReduce 系统,模拟 Master 和 Worker. 处理任务分配、并行执行和故障容错.
Lab 实现
在本 Lab 中,我们只需要编写 src/main/mr 下的三个代码即可.
Coordinator.go
对于 Master 节点,我们需要让他来进行调度,如何确保 Worker 们按照我们想要的方式来执行任务.
Coordinator
首先,我们先要定义 Coordiantor 本身
type Coordinator struct {
// Your definitions here.
mu sync.Mutex // 互斥锁
files []string // 输入文件列表
nReduce int // Reduce任务数
mapTasks []Task // Map任务列表,以taskID作为下标
reduceTasks []Task // Reduce任务列表
completedMaps int // 已完成的Map任务数量
completedReds int // 已完成的Reduce任务数量
}
我们作为控制者,要获取输入的文件集合,保存 Map 和 Reduce 两种不同任务,并统计二者的初始数量与已完成的数量——用来看某类任务是否已经全部完成.
对于这些变量,为了防止数据冲突,自然要处理并发;在这里我们使用了 mutex 直接上锁,具体操作见后文.
Task
其次,我们需要定义有关 Task 的信息,分别是 Type 结构体和任务类型 TaskType
type TaskType string
type Task struct {
TaskID int // 任务ID
TaskType TaskType // 是Map还是Reduce
InputFile string // 对Map来说输入文件
OutputFile string // 对Reduce来说最终的输出文件
NReduce int // Reduce任务数
NMap int
StartTime time.Time // 用于计时,看是否超时
Completed bool
//
}
const (
Map TaskType = "Map"
Reduce TaskType = "Reduce"
Wait TaskType = "Wait"
Exit TaskType = "Exit"
)
同时,我们还定义了一些常量表示不同任务类型.
MakeCoordinator
这是一个初始化方法,入参已经给定,我们来初始化 Coordinator,Task切片.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{
files: files,
nReduce: nReduce,
mapTasks: make([]Task, len(files)),
reduceTasks: make([]Task, nReduce),
completedMaps: 0,
completedReds: 0,
}
// 初始化Map任务
for i, file := range files {
c.mapTasks[i] = Task{
TaskID: i,
TaskType: Map,
InputFile: file,
NReduce: nReduce,
NMap: len(files),
}
}
// 初始化Reduce任务
for i := 0; i < nReduce; i++ {
c.reduceTasks[i] = Task{
TaskID: i,
TaskType: Reduce,
OutputFile: fmt.Sprintf("mr-out-%d", i),
NReduce: nReduce,
NMap: len(files),
}
}
c.server()
return &c
}
最终启动了 server,server 是已经写好的代码,它用来作为 Coordinator 监听相关 RPC 请求的服务端.
RPC方法
在调度框架中, Worker 会通过发送 RPC 请求向 Master 请求资源,例如请求任务,所以我们在 Master 端需要写相对应的请求任务的逻辑.
RequestTask
优先分配未完成或者超时的 Map 任务,所有 Map 任务完成后分配 Reduce 任务(实现了 Reduce 对 Map 的依赖关系) 由于存在并发,所以我们一开始就用 mutex 保护共享状态.
func (c *Coordinator) RequestTask(args *TaskArgs, reply *TaskReply) error {
c.mu.Lock()
defer c.mu.Unlock()
// 优先分配Map
for i := range c.mapTasks {
// 如果任务未完成
if !c.mapTasks[i].Completed {
// 检查任务是否超时
if !c.mapTasks[i].StartTime.IsZero() && time.Since(c.mapTasks[i].StartTime) > 10*time.Second {
// 若超时则重新分配
c.mapTasks[i].StartTime = time.Now()
reply.Task = c.mapTasks[i]
return nil
} else if c.mapTasks[i].StartTime.IsZero() {
// 任务未分配
c.mapTasks[i].StartTime = time.Now()
reply.Task = c.mapTasks[i]
return nil
}
}
}
// 如果所有Map任务都完成
if c.completedMaps == len(c.mapTasks) {
// 分配Reduce 任务
for i := range c.reduceTasks {
if !c.reduceTasks[i].Completed {
if !c.reduceTasks[i].StartTime.IsZero() && time.Since(c.reduceTasks[i].StartTime) > 10*time.Second {
// 若超时则重新分配
c.reduceTasks[i].StartTime = time.Now()
reply.Task = c.reduceTasks[i]
return nil
} else if c.reduceTasks[i].StartTime.IsZero() {
// 任务未分配
c.reduceTasks[i].StartTime = time.Now()
reply.Task = c.reduceTasks[i]
return nil
}
}
}
// 所有 reduce 完成
if c.completedReds == len(c.reduceTasks) {
reply.Task.TaskType = Exit
reply.Finished = true
return nil
}
}
// 无可用任务, Wo 等待
reply.Task.TaskType = Wait
return nil
}
TaskCompleted
同时, Worker 也要向 Master 发送对应的任务处理情况通知,为了让 Master 去管理全局状态,例如已经完成的任务数的更新.
func (c *Coordinator) TaskCompleted(args *TaskCompletionArgs, reply *TaskCompletionArgs) error {
c.mu.Lock()
defer c.mu.Unlock()
switch args.TaskType {
case Map:
if args.TaskID < len(c.mapTasks) && !c.mapTasks[args.TaskID].Completed {
c.mapTasks[args.TaskID].Completed = true // 在范围内且传回来未完成?
c.completedMaps++
log.Printf("Map任务 %d 已完成", args.TaskID)
}
case Reduce:
if args.TaskID < len(c.reduceTasks) && !c.reduceTasks[args.TaskID].Completed {
c.reduceTasks[args.TaskID].Completed = true
c.completedReds++
log.Printf("Reduce任务 %d 已完成", args.TaskID)
}
}
return nil
}
如果报告的的是 Map 任务且状态为未完成,我们才会去更新任务状态为已完成.
- 为了保证任务状态的幂等性——重复通知不改变任务状态,所以我们只有未完成才去更新(通知了一定是完成了)
- 之后的通知此任务状态为已完成,不会再更新
Done
最后,主函数需要调用 Done 来判断整个系统是否运行结束,我们根据已完成的任务数与预期的任务数对比来判断.
func (c *Coordinator) Done() bool {
//ret := false
// Your code here.
c.mu.Lock()
defer c.mu.Unlock()
return c.completedMaps == len(c.mapTasks) && c.completedReds == len(c.reduceTasks)
}
rpc.go
在 lab 中, Master 和 Worker 是通过套接字 socket 来通信的,在 rpc.go 中我们需要定义 RPC 通信的请求和响应结构,可以携带一些状态或信息.
通常以请求和响应参数成对出现,从 Worker 请求到 Master,从 Master 响应到 Worker
与任务的请求有关的:
type TaskArgs struct{}
type TaskReply struct {
Task Task
Finished bool
}
与任务完成情况有关的:
type TaskCompletionArgs struct {
TaskID int
TaskType TaskType
}
type TaskCompletionReply struct{}
- Worker 需要向 Master 传递 TaskID 等信息供后者使用.
- Master 无需向 Worker 传递什么信息.
值得注意的是,在 RPC 调用中,我们要保证 RPC 的参数类型在调用双方是匹配的,这里我们都用了之前声明的结构体,没有此问题.
后续更新,其实有关 Task 的定义也该放在这里.
worker.go
在 worker 中,我们就要实际进行 Map 和 Reduce 的相关操作了. 为了本文的连续性,我们先写有关于 RPC 的相关代码.
RPC相关
我们需要一个 call 方法来调用 Master 中的方法,具体就是靠 rpc.DialHTTP 打开一个连接,Worker 作为客户端去调用 Call 方法
func call(rpcname string, args interface{}, reply interface{}) bool {
// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
sockname := coordinatorSock()
c, err := rpc.DialHTTP("unix", sockname)
if err != nil {
log.Fatal("dialing:", err)
}
defer c.Close()
err = c.Call(rpcname, args, reply)
if err == nil {
return true
}
fmt.Println(err)
return false
}
同时我们也要写与服务端,也就是 Master 中对应的 RPC 方法,本质上是通过上面的 call 方法调用服务端上的方法.
requestTask,请求任务.
func requestTask() (Task, error) {
// declare an argument structure.
args := &TaskArgs{}
reply := &TaskReply{}
ok := call("Coordinator.RequestTask", args, reply)
if !ok {
return Task{}, fmt.Errorf("RPC调用失败")
}
// declare a reply structure.
//reply := ExampleReply{}
return reply.Task, nil
}
reportTaskCompletion,报告任务完成情况.
func reportTaskCompletion(taskID int, taskType TaskType) {
args := &TaskCompletionArgs{
TaskID: taskID,
TaskType: taskType,
}
var reply struct{}
ok := call("Coordinator.TaskCompleted", args, &reply)
if !ok {
log.Printf("报告任务完成失败: TaskID=%d, Type=%s", taskID, taskType)
}
}
Worker本身
Worker 的主要的逻辑是请求任务,执行任务(如果没有任务则等待),所有任务执行完后退出.
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
// Your worker implementation here.
// 注册RPC客户端,连接Coordinator,持续请求任务直到收到退出信号
for {
// 请求任务
task, err := requestTask()
if err != nil {
log.Printf("获取任务失败: %v", err)
time.Sleep(2 * time.Second)
continue
}
// 执行任务
switch task.TaskType {
case Map:
log.Printf("开始处理Map任务: %d, 文件: %s", task.TaskID, task.InputFile)
executeMapTask(task, mapf)
reportTaskCompletion(task.TaskID, task.TaskType)
case Reduce:
log.Printf("开始处理Reduce任务: %d", task.TaskID)
executeReduceTask(task, reducef)
reportTaskCompletion(task.TaskID, task.TaskType)
case Wait:
log.Printf("没有可用任务,等待中...")
time.Sleep(2 * time.Second)
case Exit:
log.Println("所有任务已完成,Worker退出")
return
}
}
}
- 请求任务失败的话暂停2秒继续请求
- 根据请求到的任务类型来进行指定的操作,具体的操作见下文.
executeMapTask
用于处理 Map 任务
- 读取输入文件,将其内容保存在 content 中
- 调用 mapf 方法处理内容,得到中间结果 kv pairs
- 将这些对根据 reduceID(即相同的 key 组成一个集合)保存在一个二维切片中
- 将这些 pairs 编码为 JSON 格式并存储在临时文件中
- 要注意对文件的命名符合实验要求
- 将上述操作放在一个 goroutine 中执行,并且保证不发生冲突
func executeMapTask(task Task, mapf func(string, string) []KeyValue) {
// 读取输入文件
file, err := os.Open(task.InputFile)
if err != nil {
log.Fatalf("Cannot open %v", task.InputFile)
}
defer file.Close()
content, _ := io.ReadAll(file)
// 调用Map
kvs := mapf(task.InputFile, string(content))
// 按 Reduce 任务分区存储中间结果
nReduce := task.NReduce
intermediate := make([][]KeyValue, nReduce) // reduceID -> kv ?
for _, kv := range kvs {
reduceID := ihash(kv.Key) % nReduce
intermediate[reduceID] = append(intermediate[reduceID], kv)
}
// 写入临时文件并原子重命名
var wg sync.WaitGroup
for i := 0; i < nReduce; i++ {
wg.Add(1)
go func(reduceID int, data []KeyValue) {
defer wg.Done()
tempFile, err := os.CreateTemp("", fmt.Sprintf("mr-%d-%d.tmp", task.TaskID, reduceID))
if err != nil {
log.Fatalf("Can't create tempFile: %v", err)
}
defer tempFile.Close()
encoder := json.NewEncoder(tempFile)
for _, kv := range data {
if err := encoder.Encode(&kv); err != nil {
log.Fatalf("编码 JSON 失败: %v", err)
}
}
// 原子重命名为最终文件名
finalName := fmt.Sprintf("mr-%d-%d", task.TaskID, reduceID)
if err := os.Rename(tempFile.Name(), finalName); err != nil {
log.Fatalf("重命名文件失败: %v", err)
}
}(i, intermediate[i])
}
wg.Wait()
}
executeReduceTask
几乎同理,我们处理刚才的 Map 过程产生的文件,读取其中信息并进行解码,最终保存在一个一维切片中.
相同key作为一组调用 reducef 方法进行处理,处理完后的结果写入到输出文件中.
func executeReduceTask(task Task, reducef func(string, []string) string) {
// 收集所有Map任务的中间文件
var kvs []KeyValue
for mapID := 0; mapID < task.NMap; mapID++ {
fileName := fmt.Sprintf("mr-%d-%d", mapID, task.TaskID)
file, err := os.Open(fileName)
if err != nil {
continue // 忽略未完成的 Map 任务文件
}
decoder := json.NewDecoder(file)
for {
var kv KeyValue
if err := decoder.Decode(&kv); err != nil {
break // 读完所有记录
}
kvs = append(kvs, kv)
}
file.Close()
}
// 按 Key 排序
sort.Slice(kvs, func(i, j int) bool { return kvs[i].Key < kvs[j].Key })
// 分组调用Reduce,找key相同的集合放到values中
var results []string
i := 0
for i < len(kvs) {
j := i
for j < len(kvs) && kvs[j].Key == kvs[i].Key {
j++
}
keys := kvs[i].Key
values := []string{}
for k := i; k < j; k++ {
values = append(values, kvs[k].Value)
}
result := reducef(keys, values)
results = append(results, fmt.Sprintf("%v %v\n", keys, result))
i = j
}
// 写入 Reduce 输出文件
tempFile, err := os.CreateTemp("", fmt.Sprintf("mr-out-%d.tmp", task.TaskID))
if err != nil {
log.Fatalf("创建临时文件失败: %v", err)
}
defer tempFile.Close()
_, err = tempFile.WriteString(strings.Join(results, ""))
if err != nil {
log.Fatalf("写入文件失败: %v", err)
}
finalName := fmt.Sprintf("mr-out-%d", task.TaskID)
if err := os.Rename(tempFile.Name(), finalName); err != nil {
log.Fatalf("文件重命名失败: %v", err)
}
}
至此, Worker.go 结束. 最后执行实验命令可以通过实验.
总结
至此,就是我对于 lab1 的做法流程梳理,实现了一个小型的 MapReduce 分布式框架结构,这对于我自己目前在做的毕业设计项目也有很好的启发.
特别是在 Task 如何定义,在 lab 中我们定义为了 go 中的 struct,在未来要写的分布式框架中我们对于不同的 Task 的定义是通过 json 文件进行定义,来确定其所占据的资源信息.
这里是LTX,感谢您阅读这篇博客,人生海海,和自己对话,像只蝴蝶纵横四海。