【MIT 6.5840】Lab 1: MapReduce

本文最后更新于 2025年7月24日 晚上

介绍

Lab 1 主要任务为使用 Go 实现一个基于进程和 PRC 通信的 MapReduce 框架。我们需要完成该框架下的 coordinator(在原文中是 master) 和 worker 两个部分,并且实现 worker 的并行计算以及失效容错。

阅读:MapReduce 论文

实现

仔细阅读 MapReduce 论文后完成 Lab 1 并不难。在较为复杂的细节上,例如 GFS、worker 间通信和用户程序上实验也进行了简化。我们可以先根据 Lab 指引,从 worker 入手思考 worker 如何通过 RPC 与 coordinator 通信,从定义 PRC 开始实现框架

RPC 定义

MapReduce 框架

注意到图中 coordinator 与 worker 只有 assign map 和 assign reduce 的两种通信,而实际上论文 3.1 Execution Overview 中提出的 coordinator 与 worker 的通信有以下的几种:

  1. 分配任务:coordinator 做出任务分配的决策,分配给 worker。论文中没有对具体实现做出规定,仅提到 “The master picks idle workers and assigns each one a map task or a reduce task.” 实验代码中,仅 coordinator 作为 RPC 服务端,我们需要定义由空闲节点主动询问 coordinator 来进行任务的分配。
  2. 完成任务:worker 完成 map 任务后,将中间文件的位置返回给 coordinator。论文中的原文对应为 “The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.”
  3. 告知中间文件:coordinator 将 map 任务产生的中间节点增量推送至进行 reduce 任务的节点。值得注意的是,论文规定了所有 map 任务完成后,reduce 任务才能被分配,在实验中简化了这样的需求,无需为此定义 RPC。
  4. 心跳:coordinator 询问 worker 是否存活。Lab 指引中提到,如果任务在 10 秒内没有完成我们可以认为节点已经死了。所以可以通过节点主动报告完成任务,以及一个 goroutine 设置超时来实现心跳,还避免了 coordinator 维护 worker 节点列表该需求也可以被简化

由于所有的实验依赖于本地文件系统,按照 mr-X-YXY 分别为 map 任务 ID 和 reduce 任务 ID)存储中间文件,我们不需要 worker 间的中间文件传输。所以以上(1、2两点)就是所有需要定义的 RPC 了,在 src/mr/rpc.go 中定义它们。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Worker -> Coordinator, ask for a task
type AssignTaskArgs struct {
WorkerID int
}

// Coordinator -> Worker, reply with a task.
// MapTaskID is given with a KV pair if the worker is assigned a map task,
// ReduceTaskID and MapTaskID are both given if the worker is assigned a reduce task.
type AssignTaskReply struct {
TaskType int // 0 for map task, 1 for reduce task, -1 for exit task, 999 for IDLE
NReduce int // number of reduce tasks
MapTaskID int
ReduceTaskID int
Intermediates []int // prefixes of intermediate files for reduce task, i.e. map task IDs
Filename string // filename for map task
}

// Worker -> Coordinator, report that a task is complete
type CompleteTaskArgs struct {
TaskType int // same as above
MapTaskID int
ReduceTaskID int
Intermediates []int // prefixes of intermediate files for reduce task, i.e. reduce task IDs
}

// Coordinator -> Worker, reply to task completion
type CompleteTaskReply struct {
}

其中 AssignTaskArgs 中的 WorkerID 不是一定需要的,正如前文提到本次实验中可以不用维护 worker 的列表。AssignTaskReply 同时承担了 map 和 reduce 任务的分配。MapTaskID 为 coordinator 分配的 map 任务 ID,与待处理文件列表的下标一一对应;ReduceTaskID 为 map 结果分块的序号,用于与 Intermediates[](包含了产生在该分块内 KV 对的 map 任务 ID),可以组合为 mr-MapTaskID-ReduceTaskID 来读取中间文件。CompleteTaskArgs.Intermediates 则包含了 map 任务产生的块的序号。

Worker 实现

有了 RPC 定义以后,worker 的实现就很简单了。worker.go 中实现 CallAssignTask()CallCompleteTask() 用于请求分配任务和报告完成任务,在 Worker() 主循环中调用分配任务和完成任务两个函数即可。

需要注意的一些细节

  • 主循环中避免忙等,记得 time.Sleep
  • 使用 os.CreateTemp 创建 reduce 任务的临时输出文件。

Coordinator 实现

Coordinator 主要存在共享数据竞争问题,我的解决方案就是直接用一个大锁锁所有的共享数据。

1
2
3
4
5
6
7
8
9
10
type Coordinator struct {
mu sync.Mutex
NReduce int // number of reduce tasks
InputFiles []string
MapTasks []TaskStatus
MapTasksRemaining int
Intermediates [][]int // dim: NReduce x len(MapTasks)
ReduceTasks []TaskStatus
ReduceTasksRemaining int
}

正如论文中 3.2 节 Master Data Structures 中提到的,coordinator 需要存储的数据如下

  1. 对于所有的 map 和 reduce 任务,保存他们的三个状态(idle、in-progress、completed),对应 MapTasksReduceTasks
  2. 对于完成的 map 任务,保存他们产生的中间文件的位置,对应 Intermediates[][]

MapTasksRemainingReduceTasksRemaining 用于辅助判断是否分配退出任务,让 worker 结束进程。

这里需要锁定的数据有 MapTasksMapTasksRemainingReduceTasksReduceTasksRemaining. 虽然 Lab 不需要我们实现原文中的备份机制,避免了一些时候同时进行多个相同任务的情况,但是我们还是好好处理这些共享数据为好。

需要注意的细节

  • 记得检查分配任务后 10 秒任务是否完成;
  • 注意 reduce 任务必须要在所有 map 任务完成后分配,等待的节点记得分配 IDLE 任务(我这里写的是 UnknownTask),一段时间后再次询问;
  • 注意判断分配退出任务的条件是严格的 MapTasksRemaining == 0 && ReduceTasksRemaining == 0

通关记录

单次测试和 100 次测试均通过。

单次测试


【MIT 6.5840】Lab 1: MapReduce
https://blog.icel.site/2025/07/24/MIT-6-5840-Lab-1/
作者
IceLocke
发布于
2025年7月24日
许可协议