Github
调库可比造轮子简单多了
结构体设计
RPC 设计
首先考虑四种 RPC
结构体的设计
因为要容错, 所以根据建议, 为每个客户端分配一个 64 位的大整数
每个客户端还要配备一个 SeqId
从而给命令编号
PutAppendArgs
Op
表示当前操作是 Put
还是 Append
Value
, Key
ClientId
和 SeqId
表示客户端 Id 和命令 Id
PutAppendReply
: Err
GetArgs
: Key
, ClientId
, SeqId
同上
GetReply
:
Server
设计
chans
记录每个日志下标 对应的 channel
client2seq
记录每个客户端对应的, 已经应用的 SeqId
keyValue
存储键值对
lastApplied
: 记录上次快照的日志下标
Client
设计
加入 ClientId
和 SeqId
即可
核心代码
客户端发送 RPC
1 2 3 4 5 6 7 8 9 10 11 12 13 n := len (ck.servers) for { for si := 0 ; si < n; si++ { srv := ck.servers[si] var reply GetReply ok := srv.Call("KVServer.Get" , &args, &reply) if ok && (reply.Err == OK || reply.Err == ErrNoKey) { return reply.Value } } time.Sleep(50 * time.Millisecond) }
客户端处理 RPC
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { oldOp := Op{"GET" , args.ClientId, args.SeqId, args.Key, "" } index, _, isLeader := kv.rf.Start(oldOp) if !isLeader || kv.killed() { return } ch := kv.getCh(index) select { case newOp := <-ch: if newOp.ClientId != oldOp.ClientId || newOp.SeqId != oldOp.SeqId { reply.Err = ErrWrongLeader } else { reply.Err = OK kv.mu.Lock() reply.Value = kv.keyvalue[args.Key] kv.mu.Unlock() } case <-time.After(100 * time.Millisecond): reply.Err = ErrWrongLeader } kv.mu.Lock() delete (kv.chans, index) kv.mu.Unlock() }
同时在 StartKVServer
中 go
一个协程
至于快照就只需要保存 client2seq
和 keyValue
就可以
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 func (kv *KVServer) getLog() { for !kv.killed() { entry := <-kv.applyCh if entry.CommandValid { op := entry.Command.(Op) if kv.exist(op.ClientId, op.SeqId) { kv.mu.Lock() if op.Name == "Put" { kv.keyvalue[op.Key] = op.Value } else if op.Name == "Append" { kv.keyvalue[op.Key] += op.Value } kv.client2seq[op.ClientId] = op.SeqId kv.mu.Unlock() } if kv.maxraftstate != -1 && kv.rf.Persister.RaftStateSize() > kv.maxraftstate { kv.rf.Snapshot(entry.CommandIndex, kv.makeSnapshot()) } kv.getCh(entry.CommandIndex) <- op } else if entry.SnapshotValid { kv.mu.Lock() if entry.SnapshotIndex > kv.lastApplied { kv.decodeSnapshot(entry.Snapshot) kv.lastApplied = entry.SnapshotIndex } kv.mu.Unlock() } } }