本 Project 需要我们实现基于 MVCC 的事务模块。
#思路
#Part A Mvcc Txn
需要实现的代码在文件夹 kv/transaction/mvcc 下。
这一 part 需要我们实现支持 MVCC(多版本并发控制, Multiversion Concurrency Control) 的事务 api。
一个事务在时间点 t 处新建时,会被赋予当前存储的 Reader 以及时间戳 StartTS = t。所有的 api 相关操作都是基于时间线来进行处理。
另外还有一个隐性的规则是:两个不同事务的
StartTS一定不相等。
TinyKV 存储使用 3 个 CF 来存放不同类型数据,分别为:
| CF | Key | Value |
|---|---|---|
| default | UserKey_StartTs | Value |
| lock | UserKey | Lock Data Structure To Byte |
| Write | UserKey_CommitTs | Write Data Structure To Byte |
存储按 key 字母序递增排序。同个 key 按存储时间戳降序排序,即越新的在越前面。
#1 Put/Delete
所有写操作都只需要为事务的 writes 切片添加新的 Modify 即可。注意使用实现好的 EncodeKey() 函数将 Key 重新编码为正确格式,以及使用 engine_util 包下的三个 CF 常量: CfDefault, CfLock, CfWrite。
#2 Get
对于 Lock 的读取是简单的,因为任意一个 UserKey 只会对于一个 Lock 变量,调用 txn.Reader.GetCF() 后进行 ParseLock() 即可。
对于 Value 的读取返回在当前时间戳有效的值,则需要考虑在当前事务之前最后一个提交的 Write:
-
如果不存在符合要求的
Write,说明该Value尚未被任何一个事务 commit,这种数据是禁止读取的,否则可能破坏一致性; -
如果最后一个
Write类型是Delete,说明该 Key 在最近的一次事务提交时被删除,那么当前时间点的读取是得不到任何数据的; -
如果最后一个
Write类型是Put,说明在时间戳Write.StartTs处修改的值是有效的,且一定有Write.StartTs <= txn.StartTs,即键EncodeKey(UserKey, Write.StartTs)对应的Value就是我们想要的;
即数据在
LastCommitWrite有效且类型为Put时可读。
对于 Write 则有 2 种不同的 Get
-
CurrentWrite:读取当前事务对给定 Key 施加的Write及其CommitTs,即满足StartTs == txn.StartTs的Write; -
MostRecentWrite:读取给定 Key 的最后一次Write及其CommitTs,
用
IterCF()获取迭代器,因为排序方法的限制,不能用iter.Seek(),这只会得到commit <= txn.StartTs的Write(即前文提到的LastCommitWrite)。需要遍历CfWrite下的所有 K/V。
#Part B TinyKV Server 1
需要实现的代码在文件夹 kv/server 下。
这一 Part 需要我们实现 TinyKV Server 的三大基本 RPC api。
#1 KvGet
KvGet 实现单键读取。对于一个给定的 Key 以及当前时间戳 Version,首先需要检查当前是否被上锁,如果是,则说明必然有某个事务正在对其进行写操作,需要向客户端报告错误。
一把锁的有效时间范围在
[lock.Ts, lock.Ts + lock.Ttl]中(如果lock.Ttl = 0则表明永久有效,直到被删除)。
kv/server/server.golock := getLock(txn, key) if lock.IsLockedAt(Version) { // err: the key has been locked response.Error = &kvrpcpb.KeyError{ Locked: lock.Info(Key), } return response, nil }
如果未上锁,则正式读取,调用 txn.GetValue() 即可。如果返回值为 nil 则修改 response.NotFound = true。
#2 KvPreWrite
kvPreWrite 对应 2PC 的上升阶段。
在执行过程中首先需要对每个 Key 判断是否被上锁,如果上锁了就跟 KvGet 一样报告错误。
其次检查该 Key 的最后一次修改是否与当前事务冲突,即获取 MostRecentWrite 并检查区间 [StartTs, CommitTs] 是否与当前请求时间戳重合,若是则说明存在其他客户端已经发起请求并对数据进行了修改,报告错误。(要保证对同一个 Key 所有修改的时间区间 [StartTs, CommitTs] 不发生重叠)
kv/server/server.gomostRecentWrite, commitTs, err := txn.MostRecentWrite(key) if mostRecentWrite != nil && mostRecentWrite.StartTS < StartVersion && commitTs >= StartVersion { // err: conflict with another transaction response.Errors = append(response.Errors, &kvrpcpb.KeyError{ Conflict: &kvrpcpb.WriteConflict{...}, }) return response, err }
如果安全,则上锁,并根据修改类型对 CfDefault 中的数据进行修改。最后调用 server.storage.Write() 将修改落实到数据库中。
如果中途报错,会直接返回,也就不会走到 server.storage.Write 这一步,更不会对数据库进行修改。
#3 KvCommit
KvCommit 对应 2PC 的下降阶段。提交操作并不修改 CfDefault 中的数据,而是通过修改 CfLock 与 CfWrite 来标志该数据已提交。
对于每个 Key,首先检查是否重复 commit,即是否存在 CurrentWrite,若有则说明后续所有 Key 都确定被提交,直接返回即可。当然如果这个 Write 是 Rollback,那么还需要修改 Retryable 并报错。
再检查是否由当前事务上锁,即
kv/server/server.golock := getLock(txn, key) if !lock.IsLockedAt(req.GetStartVersion()) { response.Error = &kvrpcpb.KeyError{ Retryable: "Unlocked", } return response, nil } if lock.Ts != req.GetStartVersion() { response.Error = &kvrpcpb.KeyError{ Retryable: "Locked By Another Txn", } return response, nil }
如果能够安全提交,则删除之前上的锁,并在 CfWrite 下添加相应条目。最后调用 server.storage.Write() 将修改落实到数据库中。
#Part C TinyKV Server 2
#1 KvScan
KvScan 获取当前时间戳下所有有效的值。任务书建议我们用 Scanner 来包装操作,相关文件为 kv/transaction/mvcc/scan.go。
Scanner 应内置一个迭代器,并在创建时初始化至给定的StartKey 位置处。执行时不断遍历 CfDefault 并检查当前值是否有效。由于排列方式,我们对同一个 Key 最早遍历到的肯定是最新的版本,并且同一个 Key 一定会连续出现,如果
-
该 Key 之前已经被返回过(故
Scanner还需内置一个LastKey变量来记录最后一个成功返回的 Key); -
该 Key 版本晚于当前事务;
-
对应的
LastCommitWrite为 nil,说明尚未提交,或类型非Put;
则跳到下一个,否则返回当前 K/V。
#2 KvCheckTxnStatus
KvCheckTxnStatus 检查给定 PrimaryKey 的当前状态,并执行可能的改动。
-
如果存在
CurrentWrite,则无需进行改动。如果该类型非Rollback则还需告知CommitTs; -
检查是否存在
Lock,若无则回滚; -
检查
Lock是否超时,若超时则删除原有Value与Lock并回滚;
kv/server/server.golock := getLock(txn, PrimaryKey) if !lock.ExistAt(CurrentTs) { response.Action = Action_LockNotExistRollback txn.PutWrite(PrimaryKey, LockTs, &mvcc.Write{...}) } else if lock.IsExpiredAt(CurrentTs) { response.Action = Action_TTLExpireRollback txn.DeleteValue(PrimaryKey) txn.DeleteLock(PrimaryKey) txn.PutWrite(PrimaryKey, LockTs, &mvcc.Write{...}) }
#3 KvBatchRollback
KvBatchRollback 实现多键批量回滚操作。
-
如果该 Key 已被回滚,则跳过后续操作,检查下一个 Key;
-
如果该 Key 已被提交,则报错并返回;
-
如果该 Key 之前被本事务上锁,则删除原有
Value与Lock; -
添加一条
Rollback条目; -
最后调用
server.storage.Write()将修改落实到数据库中。
#4 KvResolveLock
KvResolveLock 需收集由本事务上锁的所有 Key,并根据请求中的 CommitVersion 字段执行不同操作。
-
CommitVersion = 0: 执行BatchRollback; -
CommitVersion != 0: 执行Commit;
kv/server/server.gofunc (server *Server) KvResolveLock(_ context.Context, req *kvrpcpb.ResolveLockRequest) (*kvrpcpb.ResolveLockResponse, error) { ... if CommitVersion > 0 { commitResponse, err := server.KvCommit(context.TODO(), &kvrpcpb.CommitRequest{...}) response.RegionError, response.Error = commitResponse.GetRegionError(), commitResponse.GetError() return response, err } // else rollbackResponse, err := server.KvBatchRollback(context.TODO(), &kvrpcpb.BatchRollbackRequest{...}) response.RegionError, response.Error = rollbackResponse.GetRegionError(), rollbackResponse.GetError() return response, err }