前言
Lab3主要是让我们实现一个单体的KV服务器,我是做完Lab4以后再回来对Lab3做的复盘,时间有点久远部分细节记不太清了,总的来说Lab3写起来还是挺舒服的,最后一个测试的网页可视化工具对我们DEBUG也很方便,这里稍微复习和记录一下写Lab3时踩过的大坑。 写Lab4之前强烈建议多测试几遍Lab3的代码,Lab4的分布式KV很多代码和思想都可以复用Lab3,在这里打好基础后后面的Lab4就会舒服许多,可以专心实现Lab4的分布式细节.
一定多看实验页面给的Hint
线性一致性
Lab3最核心的部分就是对线性一致性的理解。具体定义可以看这两篇文章
Testing Distributed Systems for Linearizability
nil.csail.mit.edu/6.5840/2023/notes/l-linearizability.txt
简而言之可以看这张图,有四个操作和四个客户端,他们操作耗时分别不一样,在分布式系统中,先开始的操作有可能再后开始操作之后才结束,而本次实验线性一致性的定义就是在这四个操作时间中,能找到 对应的时间点使得结果符合我们的预期 。比如看这幅图片,在这四个操作中无论什么时候开始结束,我们都能找到Put("x", "0")
, Get("x") -> "0"
, Put("x", "1")
, Get("x") -> "1"
,这就符合线性一致性。
再看这幅图,无论我们怎么尝试都无法画出时间点使得结果合理。
那么如何实现线性一致性呢?
就是利用我们的Raft,使得单体KV服务器群就操作(Get
,PutAppend
)达成顺序一致共识。
Get操作也要放到Raft中实现一致性!
Lab3的核心就是让我们设计实现符合线性一致性的KV系统。
重复检测(Replicate Detection)
看这篇实验给的文章就够了
nil.csail.mit.edu/6.5840/2023/notes/l-raft-QA.txt
每个Clerk客户端都应该有一个自己的独特ID和请求序列号,请求序列号用来标注这是当前clerk
的第几次操作,用于在服务端那边过滤重复请求。
为什么要这么做呢?
clerk
将请求发送到server时要通过start函数将操作共识到Raft中,这个过程是有可能会失败(可能发生Leader更替等情况)的,对于clerk
而言不知道是否会失败,所以在共识后如果发生超时,clerk
会进行重新尝试,这个时候就有可能导致操作被执行两次,所以我们要在server端为每个clerk
维护一个LastSeq
记录每个clerk
最后一次成功应用的操作,clerk
的相同的操作的序列号不会变,操作完成后才允许增加自己的操作ID。server端通过对比发来的seq
操作序列号判断这个操作有没有执行过。
同样,针对Get
请求,我们也需要维护一个DuplicateTable
,用来返回查询结果。为什么Get
这种不会造成值变化的操作也要维护呢?这是为了满足线性一致性。
只要执行成功,就应该返回对应时间点的值
例子:
C1 C2
put(x,10) first send of get(x), reply(10) dropped //回复10丢失 put(x,20) re-sends get(x), server gets 10 from table, not 20` // 重新发送get请求,此时还应该返回10而不是20
启动一个新协程执行来自applyCh共识结束的命令
func (kv *KVServer) ExectuteOp() {
for {
msg := <-kv.applyCh
//如果是旧的日志,直接跳过
if kv.LastApplyIndex > msg.CommandIndex && msg.CommandValid {
continue
}
//只能安装比上次快照大的index
if kv.LastSnapshotIndex > msg.SnapshotIndex && msg.SnapshotValid {
continue
}
if msg.CommandValid {
op := msg.Command.(Op)
kv.mu.Lock()
//泛型判断操作类型
if op.Type == GetType {
args := op.Cmd.(GetArgs)
//如果操作已经被执行过
if args.Seq <= kv.LastSeq[args.ClientID] {
kv.mu.Unlock()
continue
}
if _, ok := kv.DupGetTab[args.ClientID]; !ok {
kv.DupGetTab[args.ClientID] = make(map[int64]GetReply)
}
kv.DupGetTab[args.ClientID][args.Seq] = GetReply{Value: kv.Data[args.Key], Err: OK}
kv.LastSeq[args.ClientID] = args.Seq
} else if op.Type == PutType {
args := op.Cmd.(PutAppendArgs)
//如果操作已经被执行过
if args.Seq <= kv.LastSeq[args.ClientID] {
kv.mu.Unlock()
continue
}
kv.Data[args.Key] = args.Value
kv.LastSeq[args.ClientID] = args.Seq
} else if op.Type == AppendType {
args := op.Cmd.(PutAppendArgs)
if args.Seq <= kv.LastSeq[args.ClientID] {
kv.mu.Unlock()
continue
}
if _, ok := kv.DupGetTab[args.ClientID]; !ok {
kv.DupGetTab[args.ClientID] = make(map[int64]GetReply)
}
kv.Data[args.Key] += args.Value
kv.LastSeq[args.ClientID] = args.Seq
}
kv.LastApplyIndex = msg.CommandIndex
kv.mu.Unlock()
} else if msg.SnapshotValid {
//Snapshot
kv.mu.Lock()
kv.readPersist(msg.Snapshot)
kv.mu.Unlock()
}
}
}
Get PutAppend 客户端代码
func (ck *Clerk) Get(key string) string {
ck.mu.Lock()
ok := false
var reply *GetReply
i := ck.LastLeader
args := GetArgs{
Key: key,
ClientID: ck.UUID,
Seq: ck.Seq,
}
for !ok || reply.Err != OK {
reply = &GetReply{}
reply.Err = ""
flag := make(chan bool)
go func() {
ok = ck.servers[i].Call("KVServer.Get", &args, &reply)
flag <- true
}()
select {
case <-flag:
case <-time.After(500 * time.Millisecond):
}
i = nrand() % int64(len(ck.servers))
}
ck.Seq++
ck.LastLeader = reply.Leader
ck.mu.Unlock()
return reply.Value
}
func (ck *Clerk) PutAppend(key string, value string, op string) {
ck.mu.Lock()
ok := false
var reply *PutAppendReply
i := ck.LastLeader
args := PutAppendArgs{
Key: key,
Value: value,
Op: op,
ClientID: ck.UUID,
Seq: ck.Seq,
}
for !ok || reply.Err != OK {
reply = &PutAppendReply{}
reply.Err = ""
flag := make(chan bool)
go func() {
ok = ck.servers[i].Call("KVServer.PutAppend", &args, &reply)
flag <- true
}()
select {
case <-flag:
case <-time.After(500 * time.Millisecond):
}
i = nrand() % int64(len(ck.servers))
}
ck.Seq++
ck.LastLeader = reply.Leader
ck.mu.Unlock()
}
服务端操作
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
reply.Err = OK
cmd := Op{Type: GetType, Cmd: *args}
kv.mu.RLock()
if rpl, ok := kv.DupGetTab[args.ClientID][args.Seq]; ok {
reply.Err = OK
reply.Value = rpl.Value
kv.mu.RUnlock()
return
}
kv.mu.RUnlock()
_, _, isLeader := kv.rf.Start(cmd)
if !isLeader {
reply.Err = ErrWrongLeader
return
}
reply.Leader = int64(kv.me)
for {
select {
case <-time.After(200 * time.Millisecond):
reply.Err = ErrAgreement
return
default:
kv.mu.RLock()
if rpl, ok := kv.DupGetTab[args.ClientID][args.Seq]; ok {
reply.Err = OK
reply.Value = rpl.Value
kv.mu.RUnlock()
return
}
kv.mu.RUnlock()
time.Sleep(10 * time.Millisecond)
}
}
}
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
reply.Err = OK
cmd := Op{}
cmd.Cmd = *args
//说明是很久以前的请求,让client重新发送
kv.mu.RLock()
if kv.LastSeq[args.ClientID] >= args.Seq {
kv.mu.RUnlock()
reply.Err = OK
return
}
kv.mu.RUnlock()
if args.Op == "Put" {
cmd.Type = PutType
} else {
cmd.Type = AppendType
}
_, _, IsLeader := kv.rf.Start(cmd)
if !IsLeader {
reply.Err = ErrWrongLeader
return
}
reply.Leader = int64(kv.me)
for {
select {
case <-time.After(200 * time.Millisecond):
reply.Err = ErrAgreement
return
default:
kv.mu.RLock()
if kv.LastSeq[args.ClientID] >= args.Seq {
reply.Err = OK
kv.mu.RUnlock()
return
}
kv.mu.RUnlock()
time.Sleep(10 * time.Millisecond)
}
}
}
QPS过低
在lab2中,我设置的心跳间隔是50ms
,这导致了就算来了新操作Raft也会把这些操作攒到一起等50ms
再发出去,这样导致了qps过低,这里应该改进为收到后就立马发出。
在我的实现中,lab2又不能发出去的太快,否则虽然能过Lab3的qps测试了,但过不了lab2的测试,所以我设置了10ms发送,Lab2和lab3都能稳定一百次通过。
func (rf *Raft) Start(command interface{}) (int, int, bool) {
index := -1
term := -1
isLeader := true
rf.mu.Lock()
defer rf.mu.Unlock()
if rf.currentStatus != 3 {
return index, term, false
}
term = rf.currentTerm
lg := LogEntry{
Term: term,
Cmd: command,
}
rf.log = append(rf.log, lg)
index = len(rf.log) - 1 + rf.lastIncludedIndex
rf.persist(rf.Persister.ReadSnapshot())
if rf.hbTicker != nil {
rf.hbTicker.Reset(10 * time.Millisecond)
}
//给所有follower发送AppendEntries
// Your code here (2B).
return index, term, isLeader
Snapshot
前面的实现完的话,快照部分就简单了,就是一个新起一个Snapshot
协程定时判断是否大于约定大小,大于的话就调用Raft裁剪就行了。同时在applyCh
那边监听Leader发过来的安装快照就行了,需要注意的是不能什么东西都往快照持久化里面扔,测试脚本有判断快照大小的测试,DuplicateTable
就不用持久化。
func (kv *KVServer) SnapshotRaft() {
if kv.maxraftstate == -1 {
return
}
for {
time.Sleep(200 * time.Millisecond)
RaftSize := kv.rf.Persister.RaftStateSize()
if RaftSize > kv.maxraftstate-500 {
kv.mu.Lock()
if kv.LastApplyIndex <= kv.LastSnapshotIndex {
kv.mu.Unlock()
continue
}
kv.rf.Snapshot(kv.LastApplyIndex, kv.persist())
kv.LastSnapshotIndex = kv.LastApplyIndex
kv.mu.Unlock()
}
}
}
func (kv *KVServer) persist() []byte {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(kv.Data)
e.Encode(kv.LastSeq)
e.Encode(kv.LastApplyIndex)
e.Encode(kv.LastSnapshotIndex)
return w.Bytes()
}
func (kv *KVServer) readPersist(data []byte) {
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
d.Decode(&kv.Data)
d.Decode(&kv.LastSeq)
d.Decode(&kv.LastApplyIndex)
d.Decode(&kv.LastSnapshotIndex)
}
还写了个小GC,不过好像在Lab3里面没啥用,到Lab4的Chanllenge里面居然发挥作用了。
// 简单垃圾回收
func (kv *KVServer) GC() {
for {
//STW !!!
time.Sleep(200 * time.Millisecond)
kv.mu.Lock()
for client, seq := range kv.DupGetTab {
for s, _ := range seq {
if kv.LastSeq[client]-200 > s {
delete(kv.DupGetTab[client], s)
}
}
}
kv.mu.Unlock()
}
}
总结
Lab3是我写的最舒服的一个Lab,没有刚遇到Lab2 Unreliable网络环境下的迷茫,又不用关心Lab4中配置变更和分片转移的问题,感觉自己的进步很大,写微服务时也开始考虑以前不会去测试和思考的问题,我觉得程序员很重要的一个能力就是跳出舒适圈,不能每天只写一些简单重复的东西,而是应该去多挑战一下困难,这样才可以带来技术和心态的成长。
最后附上通关截图