前言 Lab3主要是让我们实现一个单体的KV服务器,我是做完Lab4以后再回来对Lab3做的复盘,时间有点久远部分细节记不太清了,总的来说Lab3写起来还是挺舒服的,最后一个测试的网页可视化工具对我们DEBUG也很方便,这里稍微复习和记录一下写Lab3时踩过的大坑。 写Lab4之前强烈建议多测试几遍Lab3的代码,Lab4的分布式KV很多代码和思想都可以复用Lab3,在这里打好基础后后面的Lab4就会舒服许多,可以专心实现Lab4的分布式细节.
一定多看实验页面给的Hint
线性一致性 Lab3最核心的部分就是对线性一致性的理解。具体定义可以看这两篇文章 Testing Distributed Systems for Linearizability nil.csail.mit.edu/6.5840/2023/notes/l-linearizability.txt 简而言之可以看这张图,有四个操作和四个客户端,他们操作耗时分别不一样,在分布式系统中,先开始的操作有可能再后开始操作之后才结束,而本次实验线性一致性的定义就是在这四个操作时间中,能找到 对应的时间点使得结果符合我们的预期 。比如看这幅图片,在这四个操作中无论什么时候开始结束,我们都能找到Put("x", "0"), Get("x") -> "0", Put("x", "1"), Get("x") -> "1",这就符合线性一致性。 再看这幅图,无论我们怎么尝试都无法画出时间点使得结果合理。 那么如何实现线性一致性呢? 就是利用我们的Raft,使得单体KV服务器群就操作(Get ,PutAppend)达成顺序一致共识。
Get操作也要放到Raft中实现一致性!
Lab3的核心就是让我们设计实现符合线性一致性的KV系统。
重复检测(Replicate Detection) 看这篇实验给的文章就够了 nil.csail.mit.edu/6.5840/2023/notes/l-raft-QA.txt 每个Clerk客户端都应该有一个自己的独特ID和请求序列号,请求序列号用来标注这是当前clerk的第几次操作,用于在服务端那边过滤重复请求。
为什么要这么做呢?
clerk将请求发送到server时要通过start函数将操作共识到Raft中,这个过程是有可能会失败(可能发生Leader更替等情况)的,对于clerk而言不知道是否会失败,所以在共识后如果发生超时,clerk会进行重新尝试,这个时候就有可能导致操作被执行两次,所以我们要在server端为每个clerk维护一个LastSeq记录每个clerk最后一次成功应用的操作,clerk的相同的操作的序列号不会变,操作完成后才允许增加自己的操作ID。server端通过对比发来的seq操作序列号判断这个操作有没有执行过。 同样,针对Get请求,我们也需要维护一个DuplicateTable,用来返回查询结果。为什么Get这种不会造成值变化的操作也要维护呢?这是为了满足线性一致性。 只要执行成功,就应该返回对应时间点的值 例子: C1 C2
put(x,10) first send of get(x), reply(10) dropped //回复10丢失 put(x,20) re-sends get(x), server gets 10 from table, not 20` // 重新发送get请求,此时还应该返回10而不是20
启动一个新协程执行来自applyCh共识结束的命令
func (kv *KVServer) ExectuteOp() { for { msg := <-kv.applyCh //如果是旧的日志,直接跳过 if kv.LastApplyIndex > msg.CommandIndex && msg.CommandValid { continue } //只能安装比上次快照大的index if kv.LastSnapshotIndex > msg.SnapshotIndex && msg.SnapshotValid { continue } if msg.CommandValid { op := msg.Command.(Op) kv.mu.Lock() //泛型判断操作类型 if op.Type == GetType { args := op.Cmd.(GetArgs) //如果操作已经被执行过 if args.Seq <= kv.LastSeq[args.ClientID] { kv.mu.Unlock() continue } if _, ok := kv.DupGetTab[args.ClientID]; !ok { kv.DupGetTab[args.ClientID] = make(map[int64]GetReply) } kv.DupGetTab[args.ClientID][args.Seq] = GetReply{Value: kv.Data[args.Key], Err: OK} kv.LastSeq[args.ClientID] = args.Seq } else if op.Type == PutType { args := op.Cmd.(PutAppendArgs) //如果操作已经被执行过 if args.Seq <= kv.LastSeq[args.ClientID] { kv.mu.Unlock() continue } kv.Data[args.Key] = args.Value kv.LastSeq[args.ClientID] = args.Seq } else if op.Type == AppendType { args := op.Cmd.(PutAppendArgs) if args.Seq <= kv.LastSeq[args.ClientID] { kv.mu.Unlock() continue } if _, ok := kv.DupGetTab[args.ClientID]; !ok { kv.DupGetTab[args.ClientID] = make(map[int64]GetReply) } kv.Data[args.Key] += args.Value kv.LastSeq[args.ClientID] = args.Seq } kv.LastApplyIndex = msg.CommandIndex kv.mu.Unlock() } else if msg.SnapshotValid { //Snapshot kv.mu.Lock() kv.readPersist(msg.Snapshot) kv.mu.Unlock() } } } Get PutAppend 客户端代码
...