0 引用
如果没有特别说明,本文引用均出自课程实验描述或raft论文。
- 6.5840 Lab 3C - Raft 持久化实验官方文档
- Raft 论文 - In Search of an Understandable Consensus Algorithm (Extended Version)
1 核心状态定义
type StateType int
const (
StateFollower StateType = iota
StateCandidate
StateLeader
)
type Raft struct {
// 3A 关键字段
currentTerm int // 当前任期,单调递增
votedFor int // 当前任期投票给了谁(-1 表示未投票)
state StateType // 当前状态
lastHeartbeat time.Time // 上次收到心跳时间
electionTimeout time.Duration // 选举超时时间
}
2 选举超时设置
The paper’s Section 5.2 mentions election timeouts in the range of 150 to 300 milliseconds. Such a range only makes sense if the leader sends heartbeats considerably more often than once per 150 milliseconds (e.g., once per 10 milliseconds). Because the tester limits you tens of heartbeats per second, you will have to use an election timeout larger than the paper’s 150 to 300 milliseconds, but not too large, because then you may fail to elect a leader within five seconds.
论文第 5.2 节提到的选举超时时间范围为 150 到 300 毫秒。只有当领导者发送心跳的频率远高于每 150 毫秒一次(例如每 10 毫秒一次)时,该范围才合理。由于测试程序将你的心跳频率限制为每秒几十次,因此你必须使用比论文中 150 到 300 毫秒更大的选举超时时间;但也不宜过大,否则可能导致无法在五秒内成功选出领导者。
The tester requires that the leader send heartbeat RPCs no more than ten times per second.
测试程序要求领导者每秒最多发送 10 次心跳 RPC。
综上实验提示,我选择发送心跳包的间隔为 100ms,而选举超时时间为 450ms~750ms。
核心代码如下:
func (rf *Raft) getElectionTimeout() time.Duration {
return time.Duration(450+rand.Int63()%300) * time.Millisecond
}
func (rf *Raft) resetElectionTimer() {
rf.lastHeartbeat = time.Now()
rf.electionTimeout = rf.getElectionTimeout()
}
关键设计:
- 超时时间必须 随机化(450-750ms),避免多个节点同时发起选举导致活锁
- 心跳间隔固定 50ms,远小于选举超时
- 每次收到心跳或投票后都要重置定时器
3 选举触发逻辑
func (rf *Raft) ticker() {
for rf.killed() == false {
rf.mu.Lock()
state := rf.state
timeout := rf.electionTimeout
elapsed := time.Since(rf.lastHeartbeat)
rf.mu.Unlock()
if state == StateLeader {
rf.replicateLog() // leader 发送心跳
time.Sleep(heartbeatInterval)
continue
}
if elapsed >= timeout {
rf.startElection() // 超时,发起选举
}
// ...
}
}
重点:
- 只有非 Leader 节点在超时时才发起选举
- Leader 定期广播心跳(通过 replicateLog)
- 每次循环睡眠 50-350ms 随机时间,避免 CPU 空转
4 发起选举
func (rf *Raft) startElection() {
rf.mu.Lock()
if rf.state == StateLeader {
rf.mu.Unlock()
return
}
rf.becomeCandidate() // 转为候选者,term+1
currentTerm := rf.currentTerm
lastLogIndex := rf.lastLogIndex()
lastLogTerm := rf.lastLogTerm()
majority := len(rf.peers)/2 + 1
votes := 1 // 自己投一票
rf.mu.Unlock()
// 并行向其他节点请求投票
for i := range rf.peers {
if i == rf.me { continue }
go func(server int) {
args := &RequestVoteArgs{
Term: currentTerm,
CandidateId: rf.me,
LastLogIndex: lastLogIndex, // 3B 使用
LastLogTerm: lastLogTerm, // 3B 使用
}
reply := &RequestVoteReply{}
if !rf.sendRequestVote(server, args, reply) {
return // RPC 失败
}
rf.mu.Lock()
defer rf.mu.Unlock()
// 检查状态是否过期
if rf.state != StateCandidate || rf.currentTerm != currentTerm {
return
}
if reply.Term > rf.currentTerm {
rf.becomeFollower(reply.Term) // 发现更高任期,退位
return
}
if reply.VoteGranted {
votes++
if votes >= majority && rf.state == StateCandidate {
rf.becomeLeader()
go rf.replicateLog() // 立即发送心跳宣告领导权
}
}
}(i)
}
}
核心细节:
- 状态检查:RPC 返回后必须检查 state 和 currentTerm 是否还是选举时的值
- 双检查成为 Leader:不仅票数要过半,还要确保自己仍是 Candidate(可能已经被其他Leader的心跳转为Follower)
5 处理投票请求
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
reply.VoteGranted = false
if args.Term < rf.currentTerm {
return // 拒绝旧任期的候选者
}
if args.Term > rf.currentTerm {
rf.becomeFollower(args.Term) // 更新任期,转为Follower
}
// 投票条件:未投票或已投给该候选者,且候选者日志至少一样新
if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) &&
rf.isLogUpToDate(args.LastLogIndex, args.LastLogTerm) {
if rf.votedFor != args.CandidateId {
rf.votedFor = args.CandidateId
rf.persist() // 持久化投票记录
}
rf.resetElectionTimer() // 重置选举定时器
reply.VoteGranted = true
}
reply.Term = rf.currentTerm
}
投票规则(Raft 论文 Figure 2):

6 状态转换函数
func (rf *Raft) becomeFollower(term int) {
if term > rf.currentTerm {
rf.currentTerm = term
rf.votedFor = -1 // 新任期,清空投票记录
rf.persist()
}
rf.state = StateFollower
rf.resetElectionTimer()
}
func (rf *Raft) becomeCandidate() {
rf.state = StateCandidate
rf.currentTerm++ // 增加任期
rf.votedFor = rf.me // 投自己一票
rf.resetElectionTimer()
rf.persist()
}
func (rf *Raft) becomeLeader() {
if rf.state != StateCandidate {
return
}
rf.state = StateLeader
// 初始化 Leader 状态(nextIndex, matchIndex)
// ...
}
7 关键并发注意事项
- 锁的粒度:所有状态变更都要持有 rf.mu,但 RPC 调用不能持锁(避免阻塞)
- stale RPC 处理:RPC 返回后必须检查状态和任期是否变化
if rf.state != StateCandidate || rf.currentTerm != currentTerm {
return
}
- Leader 不会自降身份:只有发现更高任期时才会从 Leader 转为 Follower(在 becomeFollower 和 RPC handler 中处理)