【MIT 6.5840/6.824】 Lab 3A: Raft (领导选举)

本文最后更新于 2025年9月16日 上午

介绍

Lab 3A 主要任务是实现 Raft 中的领导选举部分。网上讲解 Raft 共识算法的文章很多,这里就不多赘述,只说基本的原理以供回忆。

Raft 服务器具有三种状态:跟随者(Follower)、候选人(Candidate)和领导(Leader)。

状态 描述
Follower 被动状态,不主动发起请求,只响应来自 Leader 或 Candidate 的请求。
Candidate 主动发起选举,尝试成为 Leader。
Leader 负责处理客户端请求,复制日志到其他服务器。

他们的状态转移方式如 图1。领导选举的过程如下:

  1. 最开始时,所有 Raft 服务器都处于跟随者的状态。
  2. 在一段时间没收到 leader 的心跳后,即 选举超时(Election timeout) 后,进入候选人的状态,向其他的所有服务器请求投票。
  3. 率先赢得多数票的 candidate 会成为 leader,向其他服务器发送心跳,并且负责处理后续请求和日志复制。
  4. 若 candidate 没有成为 leader,并且没有收到其他 leader 的心跳,则会重新开始进行领导领导选举。

图1 Raft 服务器的状态

实现

3A 的 Hint 仍然给的非常非常多,也是条条有用,建议看完框架并且厘清逻辑后再写代码实现。我们可以从 Raft 服务器保存的状态开始实现。

Raft 服务器状态

这里,可以直接参照论文中 Figure 2 实现,对应 raft1/raft.go 的 Raft 结构体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *tester.Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()

currentTerm int
heartbeat bool // 用于标识上个 Election timeout 的期间内是否收到 heartbeat
leaderId int32 // 保存 leader id
votedFor int
votes int // 记录票数
log []LogEntry

commitIndex int
lastApplied int

nextIndex []int
matchIndex []int
}

这块部分主要比较模糊的地方是日志条目 LogEntry 的定义。我们参考 Figure 中 RequestVote RPC 和 AppendEntires RPC 的定义,LogEntry 需要保存 command 和对应的任期。于是定义如下:

1
2
3
4
type LogEntry struct {
Term int
Command interface{}
}

Raft RPC 定义

这个直接照抄论文就好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type RequestVoteArgs struct {
Term int
CandidiateId int
LastLogIndex int
LastLogTerm int
}

type RequestVoteReply struct {
Term int
VoteGranted bool
}

// 3A 中只用于 heartbeat
type AppendEntriesArgs struct {
Term int
LeaderId int32
PrevLogIndex int
PrevLogTerm int
Entries []LogEntry
LeaderCommit int
}

type AppendEntriesReply struct {
Term int
Success bool
}

开始选举条件

那么,接下来就是实现开始选举的条件。在这里,使用条件来判断是否需要开始选举:rf.heartbeat == false,表示自己既没有成为 leader,也没有在过去一个选举周期内收到任何 leader 的心跳。

要注意避免数据竞态,记得上锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (rf *Raft) noHeartbeat() bool {
rf.mu.Lock()
defer rf.mu.Unlock()
return !rf.heartbeat
}

func (rf *Raft) ticker() {
for !rf.killed() {
if rf.noHeartbeat() {
go rf.startElection()
}

rf.mu.Lock()
rf.heartbeat = false // 不管有没有超时,都直接让这一周期暂无 heartbeat
rf.mu.Unlock()
// pause for a random amount of time between 50 and 350
// milliseconds.
ms := 50 + (rand.Int63() % 300)
time.Sleep(time.Duration(ms) * time.Millisecond)
}
}

选举过程

我们可以先从 RequestVote RPC 这个较为离散的操作来实现。还是参考论文中 Figure 2 的对应部分,接收者的实现如下:

  1. 如果 args 中的 term < currentTerm,不给投票(Raft 一致性)
  2. 如果没有给人投票,并且候选人的最后一条日志至少跟接收者一样新,给投票

实际实现中,我们需要注意如果 term 不匹配需要立刻更新 currentTerm 来保证一致性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
if args.Term < rf.currentTerm {
reply.VoteGranted = false
reply.Term = rf.currentTerm
return
}
// RPC Caller 的 term 更新
if rf.currentTerm < args.Term {
rf.currentTerm = args.Term
rf.votedFor = -1
rf.leaderId = -1
}
// 投过票了
if rf.votedFor != -1 {
reply.VoteGranted = false
reply.Term = rf.currentTerm
return
}
// 还没投过票,或者还在给这个人投票
if rf.votedFor == -1 || rf.votedFor == args.CandidiateId {
lastLogTerm := rf.log[len(rf.log)-1].Term
lastLogIndex := len(rf.log) - 1
if args.LastLogTerm > lastLogTerm || (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex) {
rf.votedFor = args.CandidiateId
reply.VoteGranted = true
reply.Term = rf.currentTerm
}
}
}

那么,开始选举的实现也很简单了。只需要对于所有 peers 发送 RequestVote RPC 就可以了。最后,还需要一个协程用于检查选举是否成功。这里,结束协程的判断条件为:当前任期与选举任期不匹配(超时/有了新的 leader),或者已经产生了 leader。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func (rf *Raft) startElection() {
rf.mu.Lock()
rf.votes = 1
rf.votedFor = rf.me
rf.leaderId = -1
rf.currentTerm += 1
args := &RequestVoteArgs{
Term: rf.currentTerm,
CandidiateId: rf.me,
LastLogIndex: len(rf.log) - 1,
LastLogTerm: rf.log[len(rf.log)-1].Term,
}
rf.mu.Unlock()

for i := range rf.peers {
if i != rf.me {
go rf.callSendRequestVote(i, args)
}
}

go rf.checkElection()
}

func (rf *Raft) checkElection() {
// save the current term
// if the term changes, indicating that either
// a leader has been elected or a new election
// has started, return
// otherwise, if votes > n/2, become leader
// else return
rf.mu.Lock()
electionTerm := rf.currentTerm
rf.mu.Unlock()

for !rf.killed() {
// check if election timeout or a leader has been elected
rf.mu.Lock()
if electionTerm != rf.currentTerm || rf.leaderId != -1 {
rf.mu.Unlock()
return
}
// if won the election
if rf.votes > len(rf.peers)/2 {
rf.mu.Unlock()
go rf.startAsLeader()
break
}
rf.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
}

需要注意的地方

在第一次通过测试后,出现了 “warning: term changed even though there were no failures”. 后面发现,在实现中我通过 rf.heartbeat 来判断过去一个任期内是否有心跳,但是在发送心跳的代码中没有给 leader 自己设置这个位置,导致了 leader 自己也会选举超时。

总之,本地操作记得做。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (rf *Raft) startAsLeader() {
rf.mu.Lock()
rf.leaderId = int32(rf.me)
for i := range rf.peers {
rf.nextIndex[i] = len(rf.log)
rf.matchIndex[i] = 0
}
term := rf.currentTerm
rf.mu.Unlock()

args := &AppendEntriesArgs{
Term: term,
LeaderId: int32(rf.me),
PrevLogIndex: len(rf.log) - 1,
PrevLogTerm: rf.log[len(rf.log)-1].Term,
Entries: []LogEntry{},
LeaderCommit: rf.commitIndex,
}

for !rf.killed() {
rf.mu.Lock()
if rf.currentTerm != term {
rf.mu.Unlock()
return
}
// 这个地方出问题了
rf.heartbeat = true
rf.mu.Unlock()
for i := range rf.peers {
if i != rf.me {
rf.callSendHeartbeat(i, args)
}
}
time.Sleep(10 * time.Millisecond)
}
}

通关记录


【MIT 6.5840/6.824】 Lab 3A: Raft (领导选举)
https://blog.icel.site/2025/09/13/MIT-6-5840-Lab-3A/
作者
IceLocke
发布于
2025年9月13日
许可协议