Skip to content
Go back

2025 6.5840 Lab 3A 实验笔记

Edit page

0 引用

如果没有特别说明,本文引用均出自课程实验描述或raft论文。

1 核心状态定义

type StateType int
const (
    StateFollower StateType = iota
    StateCandidate
    StateLeader
)

type Raft struct {
    // 3A 关键字段
    currentTerm  int       // 当前任期,单调递增
    votedFor     int       // 当前任期投票给了谁(-1 表示未投票)
    state        StateType // 当前状态
    lastHeartbeat time.Time // 上次收到心跳时间
    electionTimeout time.Duration // 选举超时时间
}

2 选举超时设置

The paper’s Section 5.2 mentions election timeouts in the range of 150 to 300 milliseconds. Such a range only makes sense if the leader sends heartbeats considerably more often than once per 150 milliseconds (e.g., once per 10 milliseconds). Because the tester limits you tens of heartbeats per second, you will have to use an election timeout larger than the paper’s 150 to 300 milliseconds, but not too large, because then you may fail to elect a leader within five seconds.

论文第 5.2 节提到的选举超时时间范围为 150 到 300 毫秒。只有当领导者发送心跳的频率远高于每 150 毫秒一次(例如每 10 毫秒一次)时,该范围才合理。由于测试程序将你的心跳频率限制为每秒几十次,因此你必须使用比论文中 150 到 300 毫秒更大的选举超时时间;但也不宜过大,否则可能导致无法在五秒内成功选出领导者。

The tester requires that the leader send heartbeat RPCs no more than ten times per second.

测试程序要求领导者每秒最多发送 10 次心跳 RPC。

综上实验提示,我选择发送心跳包的间隔为 100ms,而选举超时时间为 450ms~750ms。

核心代码如下:

func (rf *Raft) getElectionTimeout() time.Duration {
    return time.Duration(450+rand.Int63()%300) * time.Millisecond
}

func (rf *Raft) resetElectionTimer() {
    rf.lastHeartbeat = time.Now()
    rf.electionTimeout = rf.getElectionTimeout()
}

关键设计:

3 选举触发逻辑

func (rf *Raft) ticker() {
    for rf.killed() == false {
        rf.mu.Lock()
        state := rf.state
        timeout := rf.electionTimeout
        elapsed := time.Since(rf.lastHeartbeat)
        rf.mu.Unlock()

        if state == StateLeader {
            rf.replicateLog()  //  leader 发送心跳
            time.Sleep(heartbeatInterval)
            continue
        }

        if elapsed >= timeout {
            rf.startElection()  // 超时,发起选举
        }
        // ...
    }
}

重点:

4 发起选举

func (rf *Raft) startElection() {
    rf.mu.Lock()
    if rf.state == StateLeader {
        rf.mu.Unlock()
        return
    }

    rf.becomeCandidate()  // 转为候选者,term+1
    currentTerm := rf.currentTerm
    lastLogIndex := rf.lastLogIndex()
    lastLogTerm := rf.lastLogTerm()
    majority := len(rf.peers)/2 + 1
    votes := 1  // 自己投一票
    rf.mu.Unlock()

    // 并行向其他节点请求投票
    for i := range rf.peers {
        if i == rf.me { continue }
        go func(server int) {
            args := &RequestVoteArgs{
                Term:         currentTerm,
                CandidateId:  rf.me,
                LastLogIndex: lastLogIndex,  // 3B 使用
                LastLogTerm:  lastLogTerm,    // 3B 使用
            }
            reply := &RequestVoteReply{}

            if !rf.sendRequestVote(server, args, reply) {
                return  // RPC 失败
            }

            rf.mu.Lock()
            defer rf.mu.Unlock()

            // 检查状态是否过期
            if rf.state != StateCandidate || rf.currentTerm != currentTerm {
                return
            }

            if reply.Term > rf.currentTerm {
                rf.becomeFollower(reply.Term)  // 发现更高任期,退位
                return
            }

            if reply.VoteGranted {
                votes++
                if votes >= majority && rf.state == StateCandidate {
                    rf.becomeLeader()
                    go rf.replicateLog()  // 立即发送心跳宣告领导权
                }
            }
        }(i)
    }
}

核心细节:

  1. 状态检查:RPC 返回后必须检查 state 和 currentTerm 是否还是选举时的值
  2. 双检查成为 Leader:不仅票数要过半,还要确保自己仍是 Candidate(可能已经被其他Leader的心跳转为Follower)

5 处理投票请求

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    reply.Term = rf.currentTerm
    reply.VoteGranted = false

    if args.Term < rf.currentTerm {
        return  // 拒绝旧任期的候选者
    }

    if args.Term > rf.currentTerm {
        rf.becomeFollower(args.Term)  // 更新任期,转为Follower
    }

    // 投票条件:未投票或已投给该候选者,且候选者日志至少一样新
    if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) &&
        rf.isLogUpToDate(args.LastLogIndex, args.LastLogTerm) {

        if rf.votedFor != args.CandidateId {
            rf.votedFor = args.CandidateId
            rf.persist()  // 持久化投票记录
        }
        rf.resetElectionTimer()  // 重置选举定时器
        reply.VoteGranted = true
    }

    reply.Term = rf.currentTerm
}

投票规则(Raft 论文 Figure 2):

投票规则(Raft 论文 Figure 2)

6 状态转换函数

func (rf *Raft) becomeFollower(term int) {
    if term > rf.currentTerm {
        rf.currentTerm = term
        rf.votedFor = -1      // 新任期,清空投票记录
        rf.persist()
    }
    rf.state = StateFollower
    rf.resetElectionTimer()
}

func (rf *Raft) becomeCandidate() {
    rf.state = StateCandidate
    rf.currentTerm++        // 增加任期
    rf.votedFor = rf.me     // 投自己一票
    rf.resetElectionTimer()
    rf.persist()
}

func (rf *Raft) becomeLeader() {
    if rf.state != StateCandidate {
        return
    }
    rf.state = StateLeader
    // 初始化 Leader 状态(nextIndex, matchIndex)
    // ...
}

7 关键并发注意事项

  1. 锁的粒度:所有状态变更都要持有 rf.mu,但 RPC 调用不能持锁(避免阻塞)
  2. stale RPC 处理:RPC 返回后必须检查状态和任期是否变化
if rf.state != StateCandidate || rf.currentTerm != currentTerm {
    return
}
  1. Leader 不会自降身份:只有发现更高任期时才会从 Leader 转为 Follower(在 becomeFollower 和 RPC handler 中处理)

Edit page
Share this post on:

Next Post
Go Map 底层实现深度解析:Go 1.23 vs Go 1.24