本实验要求在每一个 Raft 节点上实现一个 K/V 服务器(server),向上接受客户端(client)的请求并返回请求结果,向下生成日志应用到 Raft 节点中。
整个模式大概长这样。
主要流程是这样的:
- client 寻找对应 Raft 节点是 Leader 的 server,并发起一个请求(Put/Append/Get);
- server 收到请求后,调用 Raft 的
Start()
函数生成新日志; - Raft 层间互相进行日志复制之后,将 committed 日志传到到 applyChannel 里;
- server 从 applyChannel 中拿到日志,提取出命令后,正式将命令应用到存储中;
PART A-Key/value service without snapshots
发送请求
由于 Start()
函数会立即返回 isleader 信息,所以如果一个 server 节点调用了 Start() 后发现不是 Leader,则返回一个 ErrWrongLeader
当且仅当 client RPC 发送给正确的 Leader 后,才算成功;否则:
- If the Clerk sends an RPC to the wrong kvserver, or if it cannot reach the kvserver, the Clerk should re-try by sending to a different kvserver.
- If the operation failed to commit (for example, if the Leader was replaced), the server reports an error, and the Clerk retries with a different server.
server 向下调用
首先是根据 Start()
返回的 isleader 来确定是否需要返回 ErrWrongLeader 错误。若成功,则等待流程 4。
注意到,在等待的过程中 server 是上了锁的,那必然被阻塞,此时就需要在 startKVserver 的时候额外开启一个叫 Applier() 的 goroutine 来进行 applyChannel 的读取。
func (kv *KVServer) Applier() {
for !kv.killed() {
msg := <-kv.applyCh
if msg.CommandValid {
// 应用请求
} else if msg.SnapshotValid {
// 应用快照
}
}
}
同时遇到这样一个问题:存在一个网络分区 [1] | [2, 3, 4, 5],Peer1 是分区前的 leader,分区后 1 仍然是 leader。分区 [2, 3, 4, 5] 由于选举导致它们的 Term 大于 Peer1 的 Term,假设该分区的 Leader 为 Peer2。现在有一个 clientA 一直给分区 [1] 发送请求,某一个请求 {requestID: x,clientID: A}
发给 Peer1 后,其调用 Start()
并返回日志所在索引 Index=i
后,网络恢复,Peer1 收到 Term 更高的 Peer2 的 AppendEntries 消息后将原先 Index=i
处的日志覆盖,更新 commitIndex 后告知上层进行 Apply。这就会导致同一 Index 处的命令出现二义性——client 发起了请求 A,但应用到存储却是另一请求 B。
解决方法为:在结构体 Args
中增加一个 channel,当 applier 处理完消息时通过该 channel 返回处理结果,同时判断 msg.commandTerm
和 kv.rf.currentTerm
是否匹配,如果不匹配,说明可能被其他日志覆盖,返回一个 ErrWrongLeader 让上层重发。
注意,只有 Leader 需要对请求做出答复。
if kv.rf.GetRaftState() == raft.Leader && kv.rf.GetCurrentTerm() == msg.CommandTerm {
go func(reply_ *Reply) {
ch <- reply_
}(reply)
}
应用到状态机
Applier() 对 applyChannel 进行一个 for 的等待,收到消息后,根据收到消息的命令类型的不同,执行不同的应用操作。
Put(key, value) replaces the value for a particular key in the database, Append(key, arg) appends arg to key's value, and Get(key) fetches the current value for the key. A Get for a non-existent key should return an empty string. An Append to a non-existent key should act like Put.
执行完后,返回消息到 waitchannel[Index]
,唤醒 server。
Other
刚开始想到和 Raft 里发送请求投票一样的手段,为每一个 server 开一个 goroutine 去发送 RPC,后来发现最后成功发送的只有一个,其它 goroutine 都是在占 cpu,遂放弃,直接用单个 for 循环。另外,有时候 Leader 会在相当一段时间内保持不变,我们可以 cache 上一次发送请求成功时的 serverId,认为这是 Leader,每次发请求时都可以利用这一信息,避免了不必要的 RPC,加快速度。当 RPC 失败,或发生了 ErrWrongLeader,只需要简单的令 Leader 切换到下一个即可(0->1->...->n-1->0->...)
func (ck *Clerk) SendRequest(args *Args) string { ck.mu.Lock() defer ck.mu.Unlock() args.RequestId, args.ClerkId = ck.RequestId, ck.ClerkId for { reply := &Reply{} ok := ck.servers[ck.volatileLeader].Call("KVServer.HandleRequest", args, reply) if ok && reply.Err != ErrWrongLeader { ck.RequestId++ return reply.Value } ck.volatileLeader = (ck.volatileLeader + 1) % len(ck.servers) } }
在分区测试中,有可能发生 Leader 超时未 apply 的情况(即一个 Leader 被分到了 minority 的网络区),此时需要在 server 等待环节加一个 <-time.After() 的信号接受判断,若超时,则直接返回,并且认为超时也是一种"ErrWrongLeader"。
lab3A 还要求我们不能执行同一个 client 的重复请求,那么需要在每个 server 上放一个 clientID 到其最近一次命令序号 requestID 的映射,如果 Worker 收到的这个命令序号已经被执行过了,那么就不再执行,直接返回 ErrDuplicated。Get 是否重复执行无所谓,因为它并不会对数据库产生实质性的影响,主要是防止多次 Put/Append 同一个值。
总体代码量比 Raft 少太多,但因为论文中并没有给出很详细的指示,就走了很多弯路,以至于绝大多数时间都在 debug...
PART B-Key/value service with snapshots
本实验要求在 3A 的基础上加上 snapshot 功能。
虽然标的是hard,但代码量更少了
server 不断检测 raftStateSize,如果过大,即当 persist.RaftStateSize() >= kv.maxRaftState 时,将当前 db 状态保存下来,调用 Raft 层的 snapshot() 并将 db 状态传入。
要实现有两个函数:
MakeSnapshot()
: 当 raftStateSize 过大时保存存储状态;ApplySnapshot()
: raft 层将 snapshotValid 发到 applyCh,被 server 接收到后执行的操作;
persist 里的 raftstate 和 index 没有直接关系,所以不能用在 snapshot 里的 index 参数。