对于单个副本组而言,所有机器存储共享同一存储空间,机器数量的增加并不会扩大存储能力。而当 kv 对数量不断增长时,副本组的存储能力成为瓶颈。
参考 Aurora 的做法,不妨将数据划分为多个片段,每个副本组负责分片(Shard)的一个子集。组内机器使用 Raft 来复制分片。此外还有一个分片控制器(Shardctrler),决定哪个副本组应该为每个分片服务,此信息称为配置(Config)。本实验就是实现这样一个 multi-raft 的 kv 存储系统。
lab4 和 lab3 非常像,主要流程是这样的:
- client 通过 clerk 发送请求;
- clerk 根据 key 所在的分片,将请求转发给对应 group 的 Leader;
- Leader 通过 raft 层复制操作日志。
PART A-The Shard controller
这一部分是比较简单的,去重检测、处理请求等方面几乎可以把 lab3 的代码搬过来。关键在于处理上层调用的 Join
、Leave
、Move
、Query
这 4 个命令。由于 Shardctrler 管理一系列配置,故需要在结构体中额外设置一个 Config 切片。结构体如下:
type ShardCtrler struct {
mu sync.RWMutex
me int
applyCh chan raft.ApplyMsg
rf *raft.Raft
lastRequestInfo map[int64]*RequestInfo // clerkID -> requestID
configs []Config // indexed by config num
}
新建配置
Query
相当于读命令,不会对 Config 有任何更改,是可以不经过去重的,直接根据规则返回相应 Config 即可。
其它 3 个需要经过去重表。其中 Move
较为简单,直接修改 Config 中 Shards[]
对应的值即可。对于 Join
和 Leave
,在应用命令时需要创建一个新的 Config。
在 Go 中,map 是引用类型,不能直接拷贝(浅拷贝),而是需要创建一个新的对象并单独复制键和值(深拷贝)。
func (sc *ShardCtrler) makeNewConfig() *Config {
lastconfig := sc.configs[len(sc.configs)-1]
newconfig := Config{
Num: lastconfig.Num + 1,
Shards: lastconfig.Shards,
Groups: make(map[int][]string),
}
for gid, members := range lastconfig.Groups {
newconfig.Groups[gid] = members
}
return &newconfig
}
func (sc *ShardCtrler) ApplyCommand(msg raft.ApplyMsg) {
args := msg.Command.(Args)
ch := args.Ch
reply := new(Reply)
if args.Op == Query {
reply.Err = OK
if args.Num == -1 || args.Num >= len(sc.configs) {
reply.Config = sc.configs[len(sc.configs)-1]
} else {
reply.Config = sc.configs[args.Num]
}
} else if sc.isDuplicated(args.RequestId, args.ClerkId) {
reply.Err = sc.lastRequestInfo[args.ClerkId].Err
} else {
newconfig := sc.makeNewConfig()
switch args.Op {
case Move:
newconfig.Shards[args.Shard] = args.GIDs[0]
case Join:
for gid, members := range args.Servers {
newconfig.Groups[gid] = members
}
sc.shuffleShard(newconfig)
case Leave:
for _, gid := range args.GIDs {
delete(newconfig.Groups, gid)
}
sc.shuffleShard(newconfig)
}
sc.configs = append(sc.configs, *newconfig)
sc.lastRequestInfo[args.ClerkId] = &RequestInfo{
RequestID: args.RequestId,
Err: reply.Err,
}
}
if sc.rf.GetRaftState() == raft.Leader && sc.rf.GetCurrentTerm() == msg.CommandTerm {
go func(reply_ *Reply) { ch <- reply_ }(reply)
}
}
分片再分配
lab 要求新 Config 应尽可能将 Shard 均匀地分配到 groups 中,并应移动尽可能少的 Shard。这样做也是为了减少不必要的网络资源消耗,毕竟后续 Shard 的迁移是需要 RPC 的。
若分配前存在分片处于无主状态(即 Config.Shards[i] = 0),则优先将该分片分配给拥有最少分片的 group。反之,则每次选取拥有最多分片的 group 给拥有最少分片的 group 迁移 Shard,直至两者拥有数量不超过 1。
在 Go 中,map 迭代顺序是不确定的。
func (sc *ShardCtrler) shuffleShard(config *Config) {
N := len(config.Groups)
if N == 0 { // 所有 group 离开,将所有 Shard 设为无主
for i := 0; i < NShards; i++ {
config.Shards[i] = 0
}
return
}
allocated := 0
NumofShards := make(map[int]int)
for gid := range config.Groups {
NumofShards[gid] = 0
}
for _, gid := range config.Shards {
if _, ok := config.Groups[gid]; ok {
allocated++
NumofShards[gid]++
}
}
for {
WhoHasTheMostShards, WhoHasTheLeastShards := -1, -1
MaxmShards, MinmShards := -1, NShards+1
for gid, num := range NumofShards {
if num > MaxmShards {
MaxmShards = num
WhoHasTheMostShards = gid
}
if num < MinmShards {
MinmShards = num
WhoHasTheLeastShards = gid
}
}
if allocated == NShards && MaxmShards < MinmShards+2 { // allocated != NShards 说明存在无主分片,不能简单凭 MaxmShards < MinmShards+2 退出循环
break
}
for sid, gid := range config.Shards {
if _, ok := config.Groups[gid]; !ok { // gid 已离开,直接分给拥有分片最少的 group
config.Shards[sid] = WhoHasTheLeastShards
allocated++
NumofShards[WhoHasTheLeastShards]++
break
} else if gid == WhoHasTheMostShards && allocated == NShards {
config.Shards[sid] = WhoHasTheLeastShards
NumofShards[WhoHasTheMostShards]--
NumofShards[WhoHasTheLeastShards]++
break
}
}
}
}
PART B-Shared Key/Value Server
这一部分我认为是整个 6.824 lab 中最难的一个 Part。尽管 raft 算法也很难,但只要认真研读论文的图 2,依然能够写出 bugfree 的代码。这部分的难点在于,需要我们从头设计一个基于 Shard 的 KV 服务,而没有任何文献资料/Guide 来作指引,一旦没有灵感迸发,就会寸步难行。
开始 4B 之前需要想明白的一些事
磨蹭大半个月+仔细阅读实验要求后,我得出如下总结:
该部分与 lab3 的不同之处在于,group 会不断加入/离开系统,为了平衡负载,系统需要在 group 之间转移 Shard。同时,随着 groups 的变化,系统的 Config 也会不断发生变化,且需要告知所有 group。简而言之,该 lab 的难点并不在于对 client 操作的实现——它已经在 lab3 中做完了,直接照抄即可——而是:
- 如何 ReConfig,Config 有更新后怎么做;
- 如何迁移 Shard,迁移后怎么做;
能够想明白上面两个问题,4B 的所有难点都迎刃而解,包括两个 Challenge。
Clerk
Clerk 的工作和 lab3 非常相似,样例代码的思路也很清楚——一旦收到 ErrWrongGroup 回复,则说明自己缓存的 Config 有误,需向 Shardctrler 请求最新的 Config。和 lab3 类似,Clerk 需要缓存每个 group 的 Leader。
这里有个很重要的一点:如果对某个 group 所有成员均发送失败,则认为该 group 全挂了,系统发生成员变动,此时 Clerk 缓存的 Config 一定已过时,再继续发下去极有可能死循环,需要尝试获取最新 Config。
func (ck *Clerk) SendRequest(Command *OperationCommand) string {
Command.ClerkId, Command.RequestId = ck.ClerkId, ck.RequestId
for {
shard := key2shard(Command.Key)
gid := ck.config.Shards[shard]
if servers, ok := ck.config.Groups[gid]; ok { // 若 gid 在当前 config 中
if _, ok := ck.volatileLeader[gid]; !ok {
ck.volatileLeader[gid] = 0
}
guard := ck.volatileLeader[gid]
leader := guard
for {
// 寻找 Leader 发送请求
}
}
time.Sleep(100 * time.Millisecond)
ck.config = ck.sm.Query(-1)
}
}
Server
首先思考第一个问题,如何 ReConfig,Config 有更新后怎么做。这个问题还是比较容易的,只需要在启动 server 时异步开启一个 ConfigPuller 协程,定期向 Shardctrler 请求最新的 Config 即可。每个 server 还需维护一个 CurrentConfig
变量,一旦获得较新的 Config,就将其赋给 CurrentConfig。
这个变量主要用于收到操作请求时判断是否返回 ErrWrongGroup,即如果
CurrentConfig.Shards[key2shard(key)] != kv.gid
,说明本 group 不负责该分片,需报错。
这里有一个问题,group 里的每个成员能够单独进行 Config 的拉取和更新吗?回答显然是不能的,这是对 raft 层的不尊重,单独进行 Config 拉取必然会造成不同步,则后续对不同 server 存储的读写也会产生管辖范围不一致的问题。所以这里需要进行是否为 Leader 的判断,如果 Leader 拿到了较新的Config,则向下层发一条更新记录,随着 raft 层的运作,这条记录最终会被所有 server 获得,传到上层进行 Apply 后更新 CurrentConfig。
随着 Config 的更新,Shard 的迁移问题也随之而来,这里我设计为新 owner 向旧 owner 发送 pull 请求(当然 push 也是可以的):旧 owner 收到 pull RPC 后将属于该 Shard 的 kv 对打包进 reply 中发回给新 owner。
最开始我将节点的存储简单的设计为了单个 map,后来发现这样做不够优雅——所有数据都存到同一块区域,每次迁移 Shard 时需要对所有元素进行遍历,尽管对 Config/Shard 的操作只需要对服务器上一把大锁,但 \(O(n)\) 的遍历无端增加了时间消耗,显然是现实不可用的。
在阅读了 Aurora 的论文后我有所启发:我们完全可以定义一个名为 Shard
的结构体,每个 Shard
拥有单独的 map 存储,这样一来进行分片迁移/垃圾回收时便能以 Shard 为粒度进行操作,而不需要遍历所有数据,还能对每个 Shard
进行一些额外的变量设置。
那么能不能在 Apply 更新 Config 日志后立马进行一次 pull Shard 呢?我最开始尝试通过条件变量来进行协程之间的通信,更新完 CurrentConfig
后通知另一个协程发送 pull Shard rpc。但考虑到一旦这个环节 group 挂了,server 重启后并不能够知道当前进行到哪一步,并且 Shard 在 pull 之前是不能提供服务的。所以这个方法立即被我弃用了。
不妨换个思路。任意时刻,server 可能会拥有以下几类 Shard:
- 继续持有的;
- 不再持有,等待被其它 server pull 的;
- 不再持有,但已经被其它 server pull 的;
- 新持有,等待 server 从其他 group 那 pull 的;
- 新持有,已经 pull 到了的;
如果考虑要做 Challenge1,又能够把 3 和 5 进一步细分为:
- 继续持有的;
- 不再持有,等待被其它 server pull 的;
- 不再持有,但
- 被 pull,等待接受 GC 通知;
- 被 pull,已进行 GC;
- 新持有,等待 server 从其他 group 那 pull 的;
- 新持有,且
- pull 到,需发送 GC 通知;
- pull 到,GC 发送完毕。
之所以设计为新 owner pull 到 Shard 后向旧 owner 发送 GC 通知而不是旧 owner 在收到 pull Shard 并 reply OK 后自行 GC,还是因为如果 reply 发过去的途中新 owner 挂了,重启后因为没 pull 到还得重新发 pull Shard 请求,而此时旧 owner GC 完毕后存储清空了,就会产生错误——系统永久性地失去了该 Shard 对应的数据。
收到 GC 通知也意味着新 owner 收到了 reply 并完成了 Shard 的覆盖,可以进行数据回收而不用担心是否挂掉。当然,新 owner 收到 reply 也意味着旧 owner 完成了 GC,不用再发送 GC 通知了。
那当然是选择完成 Challenge 啦!为了对这些不同种类的 Shard 进行区分,比较好的思路是在 Shard 结构体内设置一个 ShardStatus 变量,并且需要将其持久化。这样无论是否挂掉,group 都能在任意时刻根据 Shard 内部的 Status 变量进行相应的操作。并且除了 ConfigPuller 协程,我们还需要再启动两个 ShardPuller
和 GarbageCollector
协程,异步地进行遍历 Shard 并根据 ShardStatus 执行 RPC 等操作。和之前提到的一样,我们依然需要 Leader 来进行一系列交互行为,并利用 Raft 层进行日志同步,所有 server 收到下层传上来的 ApplyMsg 后执行最终的修改操作。
其中 1, 3.b, 5.b 都意味着无需任何后续操作,统一设置为 Ready 状态
type status int
const (
// everyone
Ready status = iota // 一切就绪
// new owner
NeedPull // 表明该分片等待从其他 group 处拉取
ReadyButNeedSendGC // 就绪,但需要通知其他 group 进行 GC
// old owner
Waiting // 表明该分片等待被其他 group 拉取 + 通知 GC
)
type Shard struct {
KVs map[string]string
ShardStatus status
}
一个 Shard 能不能正确处理 client 请求,一是看它是否被当前 server 持有,二是看该 Shard 状态是否为 Ready/ReadyButNeedSendGC,任一条件不满足,都返回 ErrWrongGroup
。在 Apply 客户端操作时,需要将 lab3 的代码修改为:
func (kv *ShardKV) ApplyCommand(msg raft.ApplyMsg) {
// locked
command := msg.Command.(OperationCommand)
ch := command.Ch
reply := new(Reply)
shardId := key2shard(command.Key)
if !kv.ReadyForServer(shardId) {
reply.Err = ErrWrongGroup
} else if command.Op == "Get" {
// Get
} else if kv.isDuplicated(command.RequestId, command.ClerkId) {
reply.Err = kv.lastRequestInfo[command.ClerkId].Err
} else {
// Put or Append
kv.lastRequestInfo[command.ClerkId] = &RequestInfo{
RequestID: command.RequestId,
Err: reply.Err,
}
}
if kv.rf.GetRaftState() == raft.Leader && kv.rf.GetCurrentTerm() == msg.CommandTerm {
go func(reply_ *Reply) {
ch <- reply_
}(reply)
}
}
考虑到 group 会挂掉这个情况,重启后找谁 pull Shard 也是个问题,故除了 CurrentConfig,server 内部还需设置一个 LastConfig 变量。在 lab3 的基础上,server 结构体应该修改为这样:
type ShardKV struct {
mu sync.RWMutex
me int
mck *shardctrler.Clerk
rf *raft.Raft
applyCh chan raft.ApplyMsg
make_end func(string) *labrpc.ClientEnd
gid int
ctrlers []*labrpc.ClientEnd
maxraftstate int // snapshot if log grows this big
lastapplied int
Shards [shardctrler.NShards]Shard
lastRequestInfo map[int64]*RequestInfo
lastConfig shardctrler.Config // (!new)
currentConfig shardctrler.Config
}
🎉这样一来整体的工作流程就很明朗了:
ConfigPuller
获得较新的 Config,通过 Raft 层进行UpdateConfig
日志同步;- Apply 到
UpdateConfig
日志后- 对于仍持有的 Shard,不作变动;
- 对于不再持有的,设置状态为
Waiting
; - 对于新持有的,设置状态为
NeedPull
;
ShardPuller
定期进行检测,一旦发现存在状态为NeedPull
的 Shard,根据 LastConfig 找到旧 owner 并发送 pull Shard RPC;- 旧 owner 无条件将 Shard 写入 reply 并回复,不必考虑是否过期,因为不会改变其状态;
- 新 owner 收到 reply 后,立即
rf.Start()
一条InsertShard
日志,并在 Apply 后设置相应 Shard 的状态为ReadyButNeedSendGC
; GarbageCollector
定期进行检测,一旦发现存在状态为ReadyButNeedSendGC
的 Shard,根据 LastConfig 找到旧 owner 并发送 garbage collector RPC;- 旧 owner 收到 GC 通知,就可以认为上次的 PullReply 已经被新 owner 接收,立即
rf.Start()
一条GarbageCollect
日志,Apply 后清理数据,并设置状态为Ready
; - 新 owner 收到 GC 的 reply 后,就可以认为 GC 通知已经发到旧 owner 手上,可以修改 ShardStatus 防止 GarbageCollector 进行重复 GC 通知,即
rf.Start()
一条GarbageCollect
日志,Apply 后设置状态为Ready
。
7, 8 两步的不同之处在于,新旧 owner Apply GC 日志时,相应的 Shard 状态不同,前者为
ReadyButNeedSendGC
,后者为Waiting
。根据状态不同执行不同操作即可,而不需要为新 owner 再新定义一个UpdateStatus
这样的日志类别。
注意:ConfigPuller 不能无条件进行 Update Config,否则会破坏当前 Shard 的状态,甚至可能存在部分 Shard 仍未 pull 到就需要回复其它 group 的 pull 请求,结果是一部分任务永远无法正确完成,从而系统陷入无尽的错误。由于我们引入了 ShardStatus 这一概念,故令仅当所有 Shard 都变为 Ready 后才能更新 Config。这是一种更安全的行为。
注意:可能某些 Shard 在迁移前仍然由旧 owner 向 client 提供服务,故存在这样一种情况:client 向 group1 发送 shard1 写请求后,group1 实际上执行了这一操作,但给 client 的 reply 丢包了,而其它 server 由于不是 Leader 无法应对请求,故认为 group1 全挂了,此时恰好发现 Config 存在更新,且新 Config 下 shard1 归属于 group2,client 转而向 group2 发送同一写请求。group2 向 group1 pull 了应用过请求的 Shard 后,收到 client 的同一请求,又执行了一遍。为了满足幂等性,pull Shard reply 里还应该包括旧 owner 的去重表来应对这一情况。新 owner 收到后会和自己的去重表进行比对,取同一 clerkId 下的最新 request。
另外,我也遇到了谭佬遇到的同样问题:
最后,在 lab2 的文档中我就提到了 leader 上线后应该立刻 append 一条空日志,这样才可以保证 leader 的状态机最新,然而不幸的是,lab2 的测试在加了空日志后便 Fail 了,因此我便没有再关注。在实现 lab4 时,我最开始并没有关注这件事,最后成了一个大坑,导致我花费了一天的时间才找到问题。该 bug 一般跑 100 次测试能够复现一次,对外的表现是集群出现活锁,无法再服务请求直到超时,而且仅会在几个涉及到重启的测试中出现。经过一番探索,最终发现是在节点频繁的重启过后,出现了 lab2 中描述空日志必要性的例子。这导致某一 raft 组的状态机无法达到最新且不全是默认状态,这使得配置更新协程也无法提交新的配置日志,此时客户端碰巧没有向该 raft 组执行读写请求,因而该 raft 组始终没有当前 term 的日志,从而无法推进 commitIndex,因此整个集群便出现了活锁。该 bug 的解决方法很简单,就是让 raft 层的 leader 在 kv 层周期性的去检测下层是否包含当前 term 的日志,如果没有便 append 一条空日志,这样即可保证新选出的 leader 状态机能够迅速达到最新。其实我认为将空日志检测做到 KV 层并不够优雅,KV 层不需要去了解 raft 层有无空日志会怎么样,更优雅地方式应该是 raft 层的 leader 一上线就提交一个空日志。但总之目前在 6.824 的框架下,也只能在 KV 层做检测了。
我参考了他的做法,加入了空日志检测机制。原文中有提到,不再赘述。
基于以上讨论,整体架构就能进行如下设计了。
StartServer
注意 Shard.KVs 需要先 make() 对象。
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int, gid int, ctrlers []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *ShardKV {
labgob.Register(OperationCommand{})
labgob.Register(ConfigCommand{})
labgob.Register(ShardCommand{})
labgob.Register(EmptyCommand{})
kv := &ShardKV{
me: me,
mck: shardctrler.MakeClerk(ctrlers),
make_end: make_end,
gid: gid,
ctrlers: ctrlers,
maxraftstate: maxraftstate,
lastapplied: 0,
lastRequestInfo: make(map[int64]*RequestInfo),
lastConfig: shardctrler.Config{Num: 0},
currentConfig: shardctrler.Config{Num: 0},
}
kv.applyCh = make(chan raft.ApplyMsg)
kv.rf = raft.Make(servers, me, persister, kv.applyCh)
for i := range kv.Shards {
kv.Shards[i].KVs = make(map[string]string)
}
kv.ApplySnapshot(-1, -1, persister.ReadSnapshot())
go kv.Applier()
go kv.ConfigPuller()
go kv.ShardPuller()
go kv.GarbageCollector()
go kv.EmptyEntryDetector()
return kv
}
MsgStruct
为不同操作命令定义了不同类型,这样就能通过 msg.Command.(type) 进行强转,并进一步根据 Command.Op 执行不同操作。rpc 的参数也统一采用 RPCArgs 的格式——pull Shard 请求和 GC 通知都需要 ShardId 与 ConfigNum。
// cmd
type OperationCommand struct {
Op string
Key string
Value string
ClerkId int64
RequestId int
Ch chan *Reply
}
type Reply struct {
Err Err
Value string
}
type ShardCommand struct {
Op string
Shard *Shard
Sid int
ConfigNum int
LastRequestInfo map[int64]RequestInfo
}
type ConfigCommand struct {
LastConfig shardctrler.Config
NewConfig shardctrler.Config
}
type EmptyCommand struct {
}
// RPC
type RPCArgs struct {
Op string
ShardId int
ConfigNum int
}
type RPCReply struct {
Err Err
Shard Shard
ConfigNum int
LastRequestInfo map[int64]RequestInfo
}
Applier
和 lab3 大同小异,只是多加了一些 Command 类型的判断。
func (kv *ShardKV) Applier() {
// goroutine
for !kv.killed() {
for msg := range kv.applyCh {
if msg.CommandValid {
kv.mu.Lock()
if msg.CommandIndex > kv.lastapplied {
kv.lastapplied = msg.CommandIndex
// 基于 msg.Command.type 执行不同 Apply 操作
if kv.NeedSnapshot() {
kv.MakeSnapshot(msg.CommandIndex)
}
}
kv.mu.Unlock()
} else if msg.SnapshotValid {
kv.mu.Lock()
kv.ApplySnapshot(msg.SnapshotIndex, msg.SnapshotTerm, msg.Snapshot)
kv.mu.Unlock()
}
}
}
}
ConfigPuller
func (kv *ShardKV) ApplyUpdateConfigCommand(msg raft.ApplyMsg) {
// locked
Command := msg.Command.(ConfigCommand)
lastconfig, newconfig := Command.LastConfig, Command.NewConfig
if newconfig.Num == kv.currentConfig.Num+1 { // 逐版本进行更新
kv.lastConfig = lastconfig
kv.currentConfig = newconfig
if newconfig.Num > 1 {
// config 发生变化:
// 1. 分配且仍持有的保持 Ready
// 2. 分配但未持有的变为 NeedPull,等到拉取 shard 完成后变为 Valid
// 3. 持有但未分配的变为 Waiting,等待其他组 pull 后被回收
}
}
}
func (kv *ShardKV) ConfigPuller() {
// goroutine
for !kv.killed() {
if kv.rf.GetRaftState() == raft.Leader {
kv.mu.RLock()
CanPullConfig := true
// 只要有一个 Shard 不是 Ready,就不能 Pull New Config
kv.mu.RUnlock() // 及时放锁
if CanPullConfig {
newconfig := kv.GetNewConfig()
if newconfig.Num > configNum {
if configNum == 0 && newconfig.Num > 1 { // 第一次加入集群,需初始化 lastconfig
lastConfig = kv.mck.Query(newconfig.Num - 1)
}
kv.rf.Start(ConfigCommand{
LastConfig: lastConfig,
NewConfig: newconfig,
})
}
}
}
time.Sleep(NewConfigQueryTimeOut)
}
}
ShardPuller & GarbageCollector
两者其实非常相似。
func (kv *ShardKV) ApplyShardCommand(msg raft.ApplyMsg) {
Command := msg.Command.(ShardCommand)
switch Command.Op {
case "InsertShard":
kv.InsertShard(Command)
case "GarbageCollect":
kv.GarbageCollect(Command)
default:
panic("Undefined Command!")
}
}
func (kv *ShardKV) InsertShard(Command ShardCommand) {
// locked
configNum, newshard, sid := Command.ConfigNum, Command.Shard, Command.Sid
if configNum == kv.currentConfig.Num && kv.Shards[sid].ShardStatus == NeedPull { // 覆盖 Shard 前检查该命令是否过期
// 覆盖 kv 对与去重表,并修改 ShardStatus 为 ReadyButNeedSendGC
}
}
func (kv *ShardKV) ShardPuller() {
// goroutine
for !kv.killed() {
if kv.rf.GetRaftState() == raft.Leader {
kv.mu.RLock()
configNum := kv.currentConfig.Num // 捎带当前 config.num,防止收到过期回复
var wg sync.WaitGroup
for sid, gid := range kv.lastConfig.Shards {
if kv.NeedPull(sid) {
wg.Add(1)
go func(sid, gid int, others []string) {
defer wg.Done()
// Send RPC and Pull Shard
}(sid, gid, kv.lastConfig.Groups[gid])
}
}
kv.mu.RUnlock()
wg.Wait()
}
time.Sleep(ShardPullerTimeOut)
}
}
func (kv *ShardKV) GarbageCollect(Command ShardCommand) {
// locked
configNum, sid := Command.ConfigNum, Command.Sid
if configNum == kv.currentConfig.Num {
if kv.Shards[sid].ShardStatus == ReadyButNeedSendGC {
kv.Shards[sid].ShardStatus = Ready
} else if kv.Shards[sid].ShardStatus == Waiting {
kv.Shards[sid].KVs = make(map[string]string)
kv.Shards[sid].ShardStatus = Ready
}
}
}
func (kv *ShardKV) GarbageCollector() {
// locked
for !kv.killed() {
if kv.rf.GetRaftState() == raft.Leader {
kv.mu.RLock()
configNum := kv.currentConfig.Num // 捎带当前 config.num,防止收到过期回复
var wg sync.WaitGroup
for sid, gid := range kv.lastConfig.Shards {
if kv.ReadyButNeedSendGC(sid) { // 向前任持有者发送 GC 请求
wg.Add(1)
go func(sid, gid int, others []string) {
defer wg.Done()
// Send RPC and make GC
}(sid, gid, kv.lastConfig.Groups[gid])
}
}
kv.mu.RUnlock()
wg.Wait()
}
time.Sleep(GarbageCollectorTimeOut)
}
}
EmptyEntryDetector
func (kv *ShardKV) ApplyEmptyCommand() {
// "EMPTY COMMAND"
}
func (kv *ShardKV) EmptyEntryDetector() {
for !kv.killed() {
if kv.rf.GetRaftState() == raft.Leader {
if !kv.rf.HasLogAtCurrentTerm() {
kv.rf.Start(EmptyCommand{})
}
}
time.Sleep(EmptyEntryDetectorTimeOut)
}
}
组间 RPC
func (kv *ShardKV) HandleBetweenGroupRequest(args *RPCArgs, reply *RPCReply) {
// RPC
if kv.rf.GetRaftState() != raft.Leader {
reply.Err = ErrWrongLeader
return
}
kv.mu.RLock()
defer kv.mu.RUnlock()
sid, configNum := args.ShardId, args.ConfigNum
if configNum > kv.currentConfig.Num {
reply.ConfigNum, reply.Err = kv.currentConfig.Num, ErrNotReady
return
}
// 允许接收过期 RPC,但 reply 会捎带 CurrentConfigNum,这样尽管收到过期 reply 也不会进行任何操作
switch args.Op {
case "PullShard":
// 无条件回复
case "GarbageCollect":
// 无条件 Start 新命令,将状态判断留给 Apply 阶段
}
reply.ConfigNum, reply.Err = configNum, OK
}
总结
这么设计有以下好处:
- pull Shard 后能立即为 client 提供服务,而无需阻塞;
- 不同 group 之间 Shard 状态互为同步关系,网络顺畅时系统才能够持续进行;
- 不同 ShardId 的 Shard 互相独立,当两组互相迁移 Shard 时并不会造成死锁;
- group 能够 step-by-step 地更新 Config,所有任务都能稳定执行,而不会出现步子跨太大扯着蛋的情况;
- 失去 Shard 所有权的 group 能够定期删除数据,而不会使存储无线增长;
- 基于 ShardStatus 的任务执行减少了不必要的阻塞,使得整个系统设计更加优雅;
总之系统的设计还是需要灵感与无数前人的经验总结,至少 Aurora 的论文给了我很大启迪。虽然因为磕盐原因 4B 磨蹭了很久,整个 lab 前前后后磨蹭了近 3 个月,但总归是 solo && all passed 了,对 Distributed System 也有了更进一步的理解。完结撒花~