建议开始前的准备工作:


整个Lab2完成后,感觉踩过的很多坑和一些奇怪的错误都是因为没有好好读实验要求建议和推荐的文章导致的,尤其是 Raft论文 中的 Figure2 ,里面的每一行要求都是 必须 完成的,基本上完成了Figure2和Figure2部分说明的要求大体框架部分就完成了,但部分细节和实验测试点仅仅实现论文里要求的内容是不够的,这些要求都在Lab2页面里提到了。

下面这些文章和资料基本可以解决你遇到的大部分问题:

  • Debugging by Pretty Printing这是助教提供的DEBUG脚本,里面大部分是讲他怎么开发的脚本,强烈建议使用这个测试脚本和Util里面提供的DPrintf进行调试,不然2B 2C大量日志够受的,这里我们重点看一下使用方法

[!NOTE] 参数选项

  • --sequential / -s: 按顺序运行每个测试组中的所有测试。默认情况下,测试是并发运行的。
  • --workers / -p <数量>: 指定并行任务的数量。默认为1。
  • --iter / -n <次数>: 设置要运行的迭代次数。默认为10次。
  • --output / -o <路径>: 指定输出路径。如果没有指定,则不会保存输出结果。
  • --verbose / -v: 设置详细程度。每多一次-v,输出的信息就更详细。默认为0。
  • --archive / -a: 保存所有日志,而不仅仅是失败的测试日志。
  • --race / --no-race / -r / -R: 启用或禁用竞态检测器。默认情况下,不启用竞态检测。
  • --loop / -l: 持续运行测试。每次迭代后,迭代次数将根据增长率调整。
  • --growth / -g <比率>: 设置在使用--loop时迭代次数的增长率。默认为10。
  • --timing / -t: 报告运行时长。只在macOS系统上有效。

运行测试时记得使用 -race检查数据竞争,发生数据竞争可能会导致无法预测的结果。 每个Lab下面的Hint一定认真阅读。

实验记录


Lab2A

第一个实验还算比较简单,测试点也比较松,感觉最主要的问题还是因为刚开始上手,对整个开发流程和架构还不是很熟悉 。 Go并发安全的Channel还是挺好用的,建议可以灵活运用CSP编程思想,可以用Channel通信的地方尽量用Channel,避免过多的拿锁解锁。 可以使用 Context灵活终止函数。 在我的设计里,每个Raft实例都运行着AppendListenerticker,通过lastAppendTime判断是否发生心跳超时。 整个测试脚本对electionTimeout还是比较宽松,不建议时间太短,不然会导致后面选举频繁刷新Term导致出错。 AppendListener():

func (rf *Raft) AppendListener() {

	lastAppendTime := time.Now()
	isElecting := false
	ctx, cancel := context.WithCancel(context.Background())
	for rf.killed() == false {
		select {
		case msg := <-rf.appendReceiveChan:
			if isElecting {
				rf.mu.Lock()
				if msg.Term >= rf.currentTerm {
					//如果当前正在选举,终止选举状态
					cancel()
					//重新初始化ctx和cancel
					ctx, cancel = context.WithCancel(context.Background())
					isElecting = false
				} //否则reject msg
				rf.mu.Unlock()
			} else {
				lastAppendTime = time.Now()
			}

		case electionTimeout := <-rf.tickerChan: // check election timeout
			if isElecting {
				//如果当前正在选举,终止选举状态
				cancel()
				//重新初始化ctx和cancel
				ctx, cancel = context.WithCancel(context.Background())
				isElecting = false
			}
			rf.mu.Lock()

			//如果已经选举为leader
			if rf.currentStatus == 3 {
				rf.mu.Unlock()
				lastAppendTime = time.Now()
				isElecting = false
				continue
			}
			rf.mu.Unlock()

			if time.Since(lastAppendTime) > electionTimeout {
				lastAppendTime = time.Now()
				go rf.startElection(ctx)
				isElecting = true
			}
		}
	}
}

ticker()

func (rf *Raft) ticker() {
	for rf.killed() == false {

		// Your code here (2A)
		// Check if a leader election should be started.

		electionTimeout := time.Duration(800+rand.Int63()%400) * time.Millisecond
		time.Sleep(electionTimeout)
		rf.tickerChan <- electionTimeout
		
	}

}

startElection()

func (rf *Raft) startElection(ctx context.Context) {
	rf.mu.Lock()
	serverNum := len(rf.peers)
	//Start an election
	rf.currentStatus = 2
	rf.currentTerm++
	rf.votedFor = rf.me
	voteChan := make(chan RequestVoteReply, serverNum)
	args := &RequestVoteArgs{
		Term:         rf.currentTerm,
		CandidateId:  rf.me,
		LastLogIndex: len(rf.log) - 1 ,
		LastLogTerm:  rf.log[len(rf.log)-1].Term,
	}
	rf.mu.Unlock()
	for i := 0; i < serverNum; i++ {
		if i != rf.me {
			go rf.sendRequestVote(i, args, &voteChan)
		}
	}

	voteYes := 1
	cnt := 1
	for {
		select {
		case <-ctx.Done():
			return
		case rpl := <-voteChan:
			rf.mu.Lock()
			if rf.currentTerm != args.Term {
				rf.mu.Unlock()
				continue
			}
			if rpl.Term > rf.currentTerm {
				rf.RefreshTerm(rpl.Term)
				rf.mu.Unlock()
				return
			}
			if rpl.VoteGranted {
				voteYes++
				if voteYes*2 > serverNum {
					//初始化Leader状态
					rf.currentStatus = 3
					rf.mu.Unlock()
					go rf.LeaderHeartBeatProducer()
					return
				}
				rf.mu.Unlock()
			} else {
				cnt++
				if cnt*2 > serverNum {
					rf.currentStatus = 1
					rf.mu.Unlock()
					return
				}
				rf.mu.Unlock()
			}
		}

	}

}

其他地方按照论文设计即可,2A还算比较简单。

Lab2B


如果遇到问题不建议死磕,多查查资料。

从Lab2B开始就上强度了,2B完成的好的话后面2C会非常轻松,在运行2B的测试脚本的时候建议多运行几遍,出现的每个FAIL都要解决,2B的测试脚本比较宽松,到2C的时候会暴露很多2B存在的问题,到那个时候会很头疼。 Raft实例初始化时严格按照论文要求。 Lab2B让我们实现Raft的日志记录与提交,我觉得最难的是边界情况和对日志下标的处理,这部分只能自己慢慢调,不断耐心打日志分析出错的原因。 发送日志的部分,这里不要过度设计,心跳HeartBeatProducer根据 NextIndex 动态生成发送给Follower的日志就行了,不要在额外设计函数处理了,我一开始还单独设计了发送日志的函数导致中途重构了一次。

发送日志的函数 LeaderSendLog()

func (rf *Raft) LeaderSendLog() {
	rf.mu.Lock()
	for i := range rf.peers {
		if i == rf.me {
			continue
		}

		appArg := AppendArg{
			Term:         rf.currentTerm,
			LeaderId:     rf.me,
			PrevLogIndex: rf.nextIndex[i] - 1, 
			PrevLogTerm:  rf.log[rf.nextIndex[i]-1].Term,
			Entries:      rf.log[rf.nextIndex[i]:],
			LeaderCommit: rf.commitIndex,
		}

		go rf.sendAppendEntries(i, &appArg, len(rf.log)-1)
	}
	rf.mu.Unlock()

}

测试脚本后面有RPC次数检测,不能发送过多的RPC,所以再重试的时候要谨慎处理,根据助教的博客添加AppendEntries失败后对NextIndex的优化。

每次AppendEntries成功后及时更新 MatchIndex的值,这个数组表示有已知Follower的日志有多长,成功后判断是否大多数都达到MatchIndex,达到后即可提交。 需要注意Leader只能提交当前Term的日志 (Figure8)

func (rf *Raft) checkMatchIndexAndCommit(index int) {

	if index < rf.commitIndex {
		return
	}

	if rf.currentStatus != 3 {
		return
	}

	if rf.log[index].Term != rf.currentTerm {//Figure8
		return
	}
	cnt := 1
	for i := range rf.peers {
		if i == rf.me {
			continue
		}
		if rf.matchIndex[i] >= index {
			cnt++
		}
	}
	if cnt*2 > len(rf.peers){
		rf.apply2StateMachine(index)
		return
	}

}

func (rf *Raft) apply2StateMachine(CommitIdx int) {
	if rf.log[CommitIdx].Term != rf.currentTerm {
		return
	}
	rf.commitIndex = CommitIdx
	for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ {
		alyMsg := ApplyMsg{
			CommandValid: true,
			Command:      rf.log[i-rf.lastIncludedIndex].Cmd,
			CommandIndex: i,
		}
		//applyCH会阻塞,而且最好保证只有一个goroutine提交
		// rf.mu.Unlock()
		rf.applyChBuffer <- alyMsg
		// rf.mu.Lock()
	}
	rf.lastApplied = rf.commitIndex
}

Lab2C


Lab2C让我们完成Raft状态持久化,Raft实例在崩溃后会从持久存储(Persister)读取Raft状态。

理论上Lab2C只用完成存储就行了,然后在每个状态变更的时候都Persist()一下,但更多的是让我们填2B埋下的坑。 我觉得Lab2B和Lab2C应该合并起來一起测,前面Lab2B实现的不够完美的话,Lab2C会非常痛苦,特别是TestFigure8Unreliable2C的情况,该测试模拟了在真实网络环境下可能存在的丢包和延迟乱序等情况,我们必须根据拒绝过时的请求和数据,特别是乱序问题,处理不好的话Raft会变得一团乱。 例如,Leader在发送AppendEntries并收到回复,必须检查从发送开始自己的状态有没有变化,

if rf.currentTerm != args.Term || rf.currentStatus != 3 {
		return false
	}

发送RequestVote收到回复时,

if rf.currentTerm != args.Term {
	rf.mu.Unlock()
	continue
}

针对Figure8的情况,可以看出是因为Leader提交了不是自己Term的Log导致的,所以在Leader提交时务必检查这个Index是否是在自己的Term下的。

//checkMatchIndexAndCommit
if rf.log[index].Term != rf.currentTerm {
		return
	}

同时记得提高心跳频率,以防网络丢包对方没收到心跳数据等,我的设计是50ms发送一次心跳。 Leader在收到RPC请求或回复时,及时检查自己的Term有没有过期,并及时更新自己的状态,终止发送一切RPC,否则也会出错。

Lab2C的ElectionTimeOut一定要设置的足够长,我的设置是800ms-1200ms

Lab2C对reach agreement的要求时间比较短,如果出现这个错误,尝试检查下自己的NextIndex重试优化有没有问题。

在Raft论文中,有日志一致性的说法,即当 如果两个Log Entries的Index和Term都相同,则他们的Command及其之前的所有日志一定相同,所以当你Debug的时候如果出现两个日志Term和Index都一样,但日志却不一样时,可以尝试检查 VotedFor 状态有没有正确存储和更改,一个Follower不可能同时投票给两个相同Term的Leader。

Lab2D


完成了前三个测试,最后一个离成功就不远了,Lab2D让我们完成Raft的快照机制,防止日志过长以及快速帮助太落后的节点和新加入的节点跟上 Leader 状态。

2D我完成的相对顺利,只遇到了一个死锁问题,加一个缓冲区就解决了。具体就是之前向applyCh提交时并没有释放锁,而是等待提交完成后再释放锁,在2D中会与测试脚本发生锁竞争形成死锁,自己make一个带大一点缓冲区的Channel就可以了,再开一个Goroutine专门等待阻塞并提交。

func (rf *Raft) applier() {
	for rf.killed() == false {
		for data := range rf.applyChBuffer {
			rf.applyCh <- data
		}
	}
}

rf.applyChBuffer = make(chan ApplyMsg, 1000)

其他就是一些小细节问题了,记得给每个用到Index的地方都 + - 一下LastIncludedIndex,持久化的时候把LastIncludedIndexLastIncludedTerm也持久化起來,报错的时候对着错误提示慢慢改下标就行了。

在2D中rf.persister.Save(raftState, nil)就不能再存储 nil了,否则会把之前的快照覆盖掉,正确做法是存储``rf.persister.Save(raftState, rf.persister.ReadSnapshot())`.

StateMachine会不定时发送Snapshot给Raft,当Snapshot的index < lastIncludedIndex时记得拒绝。

func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if args.Term < rf.currentTerm {
		return
	}
	rf.RefreshTerm(args.Term)
	reply.Term = rf.currentTerm

	if args.LastIncludedIndex <= rf.lastIncludedIndex {
		return
	}
	rf.lastIncludedIndex = args.LastIncludedIndex
	rf.lastIncludedTerm = args.LastIncludedTerm
	rf.lastApplied = args.LastIncludedIndex
	rf.commitIndex = args.LastIncludedIndex
	applyMsg := ApplyMsg{
		SnapshotValid: true,
		Snapshot:      args.SnapShot,
		SnapshotTerm:  rf.lastIncludedTerm,
		SnapshotIndex: rf.lastIncludedIndex,
	}
	rf.applyCh <- applyMsg
	// If existing log entry has same index and term as snapshot’s last included entry, retain log entries following it and reply
	if args.LastIncludedIndex < len(rf.log)-1+rf.lastIncludedIndex && rf.log[args.LastIncludedIndex-rf.lastIncludedIndex].Term == args.LastIncludedTerm {
		rf.log = rf.log[args.LastIncludedIndex-rf.lastIncludedIndex:]
		rf.persist(args.SnapShot)
		return
	}
	rf.log = make([]LogEntry, 1)
	rf.log[0] = LogEntry{Term: rf.lastIncludedTerm, Cmd: nil}
	rf.persist(args.SnapShot)
}

func (rf *Raft) LeaderSendLog() {
	rf.mu.Lock()
	for i := range rf.peers {
		if i == rf.me {
			continue
		}
	
		if rf.nextIndex[i] <= rf.lastIncludedIndex {
			//发送snapshot
			installArgs := &InstallSnapshotArgs{
				Term:              rf.currentTerm,
				LeaderId:          rf.me,
				LastIncludedIndex: rf.lastIncludedIndex,
				LastIncludedTerm:  rf.lastIncludedTerm,
				SnapShot:          rf.persister.ReadSnapshot(),
			}
			go rf.SendInstallSnapshot(i, installArgs)
			continue
		}

		appArg := AppendArg{
			Term:         rf.currentTerm,
			LeaderId:     rf.me,
			PrevLogIndex: rf.nextIndex[i] - 1, 
			PrevLogTerm:  rf.log[rf.nextIndex[i]-1-rf.lastIncludedIndex].Term,
			Entries:      rf.log[rf.nextIndex[i]-rf.lastIncludedIndex:],
			LeaderCommit: rf.commitIndex,
		}

		go rf.sendAppendEntries(i, &appArg, len(rf.log)-1+rf.lastIncludedIndex)
	}
	rf.mu.Unlock()

}

以上建议仅供参考,不一定全部正确,过程中需要结合自己的架构去实现

总结


Lab2前前后后花了一个半月左右完成,拖延症比较严重。完成Lab2感觉最大的收获是学习方法和心态上的提升,之前学习都是比较浮躁,急于求成,但学习本身就是沉淀积累的过程,太过于急于求成反而会让自己焦虑浮躁,最后啥也没学到。第一次写那么长的技术文章,做完Lab2还是很有成就感的,让我能静下心来好好复习一下整个实验过程。以前一直不太理解 世上无难事,只怕有心人。这句话的意思,现在才有点悟到其中的道理,只要踏踏实实的静心学习,没有什么学不会的东西。作为一个Ambitious的大学生,不想只满足于CRUD,路漫漫其修远兮呀。

Next,Lab3 !

最后附上通关和100次成功截图。

Success1 Success2