From 25fd77a63b66c56b241aa41b748b9e5b4083a160 Mon Sep 17 00:00:00 2001 From: Newnius Date: Tue, 10 Apr 2018 20:50:18 +0800 Subject: [PATCH] add raft --- .gitignore | 2 + src/raft/README.md | 7 + src/raft/config.go | 426 +++++++++++++++++++ src/raft/persister.go | 61 +++ src/raft/raft.go | 564 +++++++++++++++++++++++++ src/raft/test_test.go | 929 ++++++++++++++++++++++++++++++++++++++++++ src/raft/util.go | 13 + 7 files changed, 2002 insertions(+) create mode 100644 src/raft/README.md create mode 100644 src/raft/config.go create mode 100644 src/raft/persister.go create mode 100644 src/raft/raft.go create mode 100644 src/raft/test_test.go create mode 100644 src/raft/util.go diff --git a/.gitignore b/.gitignore index a1338d6..f46854f 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ # Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736 .glide/ + +.idea/ diff --git a/src/raft/README.md b/src/raft/README.md new file mode 100644 index 0000000..ace5684 --- /dev/null +++ b/src/raft/README.md @@ -0,0 +1,7 @@ +# NJU-DisSys-2017 + +An implementation of Raft. Not guaranteed to be bug free. + +This is the resource repository for the course Distributed System, Fall 2017, CS@NJU. + +In Assignment 2 and Assignment 3, you should primarily focus on /src/raft/... diff --git a/src/raft/config.go b/src/raft/config.go new file mode 100644 index 0000000..fdecba0 --- /dev/null +++ b/src/raft/config.go @@ -0,0 +1,426 @@ +package raft + +// +// support for Raft tester. +// +// we will use the original config.go to test your code for grading. +// so, while you can modify this code to help you debug, please +// test with the original before submitting. +// + +import "labrpc" +import "log" +import "sync" +import "testing" +import "runtime" +import crand "crypto/rand" +import "encoding/base64" +import "sync/atomic" +import "time" +import "fmt" + +func randstring(n int) string { + b := make([]byte, 2*n) + crand.Read(b) + s := base64.URLEncoding.EncodeToString(b) + return s[0:n] +} + +type config struct { + mu sync.Mutex + t *testing.T + net *labrpc.Network + n int + done int32 // tell internal threads to die + rafts []*Raft + applyErr []string // from apply channel readers + connected []bool // whether each server is on the net + saved []*Persister + endnames [][]string // the port file names each sends to + logs []map[int]int // copy of each server's committed entries +} + +func make_config(t *testing.T, n int, unreliable bool) *config { + runtime.GOMAXPROCS(4) + cfg := &config{} + cfg.t = t + cfg.net = labrpc.MakeNetwork() + cfg.n = n + cfg.applyErr = make([]string, cfg.n) + cfg.rafts = make([]*Raft, cfg.n) + cfg.connected = make([]bool, cfg.n) + cfg.saved = make([]*Persister, cfg.n) + cfg.endnames = make([][]string, cfg.n) + cfg.logs = make([]map[int]int, cfg.n) + + cfg.setunreliable(unreliable) + + cfg.net.LongDelays(true) + + // create a full set of Rafts. + for i := 0; i < cfg.n; i++ { + cfg.logs[i] = map[int]int{} + cfg.start1(i) + } + + // connect everyone + for i := 0; i < cfg.n; i++ { + cfg.connect(i) + } + + return cfg +} + +// shut down a Raft server but save its persistent state. +func (cfg *config) crash1(i int) { + cfg.disconnect(i) + cfg.net.DeleteServer(i) // disable client connections to the server. + + cfg.mu.Lock() + defer cfg.mu.Unlock() + + // a fresh persister, in case old instance + // continues to update the Persister. + // but copy old persister's content so that we always + // pass Make() the last persisted state. 
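+	// Added commentary (not part of the original tester): the steps below
+	// (1) take a Copy() of the persister while holding cfg.mu, (2) drop the
+	// lock around rf.Kill() so the tester lock is not held across that call,
+	// and (3) re-save the captured raft state into a brand-new Persister, so
+	// any late SaveRaftState() from the killed instance lands in the old,
+	// now-discarded object instead of the state handed to the next Make().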
+ if cfg.saved[i] != nil { + cfg.saved[i] = cfg.saved[i].Copy() + } + + rf := cfg.rafts[i] + if rf != nil { + cfg.mu.Unlock() + rf.Kill() + cfg.mu.Lock() + cfg.rafts[i] = nil + } + + if cfg.saved[i] != nil { + raftlog := cfg.saved[i].ReadRaftState() + cfg.saved[i] = &Persister{} + cfg.saved[i].SaveRaftState(raftlog) + } +} + +// +// start or re-start a Raft. +// if one already exists, "kill" it first. +// allocate new outgoing port file names, and a new +// state persister, to isolate previous instance of +// this server. since we cannot really kill it. +// +func (cfg *config) start1(i int) { + cfg.crash1(i) + + // a fresh set of outgoing ClientEnd names. + // so that old crashed instance's ClientEnds can't send. + cfg.endnames[i] = make([]string, cfg.n) + for j := 0; j < cfg.n; j++ { + cfg.endnames[i][j] = randstring(20) + } + + // a fresh set of ClientEnds. + ends := make([]*labrpc.ClientEnd, cfg.n) + for j := 0; j < cfg.n; j++ { + ends[j] = cfg.net.MakeEnd(cfg.endnames[i][j]) + cfg.net.Connect(cfg.endnames[i][j], j) + } + + cfg.mu.Lock() + + // a fresh persister, so old instance doesn't overwrite + // new instance's persisted state. + // but copy old persister's content so that we always + // pass Make() the last persisted state. + if cfg.saved[i] != nil { + cfg.saved[i] = cfg.saved[i].Copy() + } else { + cfg.saved[i] = MakePersister() + } + + cfg.mu.Unlock() + + // listen to messages from Raft indicating newly committed messages. + applyCh := make(chan ApplyMsg) + go func() { + for m := range applyCh { + err_msg := "" + if m.UseSnapshot { + // ignore the snapshot + } else if v, ok := (m.Command).(int); ok { + cfg.mu.Lock() + for j := 0; j < len(cfg.logs); j++ { + if old, oldok := cfg.logs[j][m.Index]; oldok && old != v { + // some server has already committed a different value for this entry! + err_msg = fmt.Sprintf("commit index=%v server=%v %v != server=%v %v", + m.Index, i, m.Command, j, old) + } + } + _, prevok := cfg.logs[i][m.Index-1] + cfg.logs[i][m.Index] = v + cfg.mu.Unlock() + + if m.Index > 1 && prevok == false { + err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.Index) + } + } else { + err_msg = fmt.Sprintf("committed command %v is not an int", m.Command) + } + + if err_msg != "" { + log.Fatalf("apply error: %v\n", err_msg) + cfg.applyErr[i] = err_msg + // keep reading after error so that Raft doesn't block + // holding locks... + } + } + }() + + rf := Make(ends, i, cfg.saved[i], applyCh) + + cfg.mu.Lock() + cfg.rafts[i] = rf + cfg.mu.Unlock() + + svc := labrpc.MakeService(rf) + srv := labrpc.MakeServer() + srv.AddService(svc) + cfg.net.AddServer(i, srv) +} + +func (cfg *config) cleanup() { + for i := 0; i < len(cfg.rafts); i++ { + if cfg.rafts[i] != nil { + cfg.rafts[i].Kill() + } + } + atomic.StoreInt32(&cfg.done, 1) +} + +// attach server i to the net. +func (cfg *config) connect(i int) { + // fmt.Printf("connect(%d)\n", i) + + cfg.connected[i] = true + + // outgoing ClientEnds + for j := 0; j < cfg.n; j++ { + if cfg.connected[j] { + endname := cfg.endnames[i][j] + cfg.net.Enable(endname, true) + } + } + + // incoming ClientEnds + for j := 0; j < cfg.n; j++ { + if cfg.connected[j] { + endname := cfg.endnames[j][i] + cfg.net.Enable(endname, true) + } + } +} + +// detach server i from the net. 
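+// Added commentary (not in the original tester): disconnect() disables both
+// directions, server i's outgoing ClientEnds (endnames[i][j]) and every
+// peer's ClientEnd toward i (endnames[j][i]), so RPCs to and from i simply
+// fail. This is how the tests simulate crashes and partitions without
+// stopping the Raft goroutines themselves.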
+func (cfg *config) disconnect(i int) { + // fmt.Printf("disconnect(%d)\n", i) + + cfg.connected[i] = false + + // outgoing ClientEnds + for j := 0; j < cfg.n; j++ { + if cfg.endnames[i] != nil { + endname := cfg.endnames[i][j] + cfg.net.Enable(endname, false) + } + } + + // incoming ClientEnds + for j := 0; j < cfg.n; j++ { + if cfg.endnames[j] != nil { + endname := cfg.endnames[j][i] + cfg.net.Enable(endname, false) + } + } +} + +func (cfg *config) rpcCount(server int) int { + return cfg.net.GetCount(server) +} + +func (cfg *config) setunreliable(unrel bool) { + cfg.net.Reliable(!unrel) +} + +func (cfg *config) setlongreordering(longrel bool) { + cfg.net.LongReordering(longrel) +} + +// check that there's exactly one leader. +// try a few times in case re-elections are needed. +func (cfg *config) checkOneLeader() int { + for iters := 0; iters < 10; iters++ { + time.Sleep(500 * time.Millisecond) + leaders := make(map[int][]int) + for i := 0; i < cfg.n; i++ { + if cfg.connected[i] { + if t, leader := cfg.rafts[i].GetState(); leader { + leaders[t] = append(leaders[t], i) + } + } + } + + lastTermWithLeader := -1 + for t, leaders := range leaders { + if len(leaders) > 1 { + cfg.t.Fatalf("term %d has %d (>1) leaders", t, len(leaders)) + } + if t > lastTermWithLeader { + lastTermWithLeader = t + } + } + + if len(leaders) != 0 { + return leaders[lastTermWithLeader][0] + } + } + cfg.t.Fatalf("expected one leader, got none") + return -1 +} + +// check that everyone agrees on the term. +func (cfg *config) checkTerms() int { + term := -1 + for i := 0; i < cfg.n; i++ { + if cfg.connected[i] { + xterm, _ := cfg.rafts[i].GetState() + if term == -1 { + term = xterm + } else if term != xterm { + cfg.t.Fatalf("servers disagree on term") + } + } + } + return term +} + +// check that there's no leader +func (cfg *config) checkNoLeader() { + for i := 0; i < cfg.n; i++ { + if cfg.connected[i] { + _, is_leader := cfg.rafts[i].GetState() + if is_leader { + cfg.t.Fatalf("expected no leader, but %v claims to be leader", i) + } + } + } +} + +// how many servers think a log entry is committed? +func (cfg *config) nCommitted(index int) (int, interface{}) { + count := 0 + cmd := -1 + for i := 0; i < len(cfg.rafts); i++ { + if cfg.applyErr[i] != "" { + cfg.t.Fatal(cfg.applyErr[i]) + } + + cfg.mu.Lock() + cmd1, ok := cfg.logs[i][index] + cfg.mu.Unlock() + + if ok { + if count > 0 && cmd != cmd1 { + cfg.t.Fatalf("committed values do not match: index %v, %v, %v\n", + index, cmd, cmd1) + } + count += 1 + cmd = cmd1 + } + } + return count, cmd +} + +// wait for at least n servers to commit. +// but don't wait forever. +func (cfg *config) wait(index int, n int, startTerm int) interface{} { + to := 10 * time.Millisecond + for iters := 0; iters < 30; iters++ { + nd, _ := cfg.nCommitted(index) + if nd >= n { + break + } + time.Sleep(to) + if to < time.Second { + to *= 2 + } + if startTerm > -1 { + for _, r := range cfg.rafts { + if t, _ := r.GetState(); t > startTerm { + // someone has moved on + // can no longer guarantee that we'll "win" + return -1 + } + } + } + } + nd, cmd := cfg.nCommitted(index) + if nd < n { + cfg.t.Fatalf("only %d decided for index %d; wanted %d\n", + nd, index, n) + } + return cmd +} + +// do a complete agreement. +// it might choose the wrong leader initially, +// and have to re-submit after giving up. +// entirely gives up after about 10 seconds. +// indirectly checks that the servers agree on the +// same value, since nCommitted() checks this, +// as do the threads that read from applyCh. 
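+// Added commentary (not in the original tester): one() retries in two nested
+// loops. The outer loop (about 10 seconds) probes the connected servers
+// round-robin until some peer accepts Start(); the inner loop (about 2
+// seconds) polls nCommitted() until at least expectedServers peers have
+// committed this exact command at the returned index.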
+// returns index. +func (cfg *config) one(cmd int, expectedServers int) int { + t0 := time.Now() + starts := 0 + for time.Since(t0).Seconds() < 10 { + // try all the servers, maybe one is the leader. + index := -1 + for si := 0; si < cfg.n; si++ { + starts = (starts + 1) % cfg.n + var rf *Raft + cfg.mu.Lock() + if cfg.connected[starts] { + rf = cfg.rafts[starts] + } + cfg.mu.Unlock() + if rf != nil { + index1, _, ok := rf.Start(cmd) + if ok { + index = index1 + break + } + } + } + + if index != -1 { + // somebody claimed to be the leader and to have + // submitted our command; wait a while for agreement. + t1 := time.Now() + for time.Since(t1).Seconds() < 2 { + nd, cmd1 := cfg.nCommitted(index) + if nd > 0 && nd >= expectedServers { + // committed + if cmd2, ok := cmd1.(int); ok && cmd2 == cmd { + // and it was the command we submitted. + return index + } + } + time.Sleep(20 * time.Millisecond) + } + } else { + time.Sleep(50 * time.Millisecond) + } + } + cfg.t.Fatalf("one(%v) failed to reach agreement", cmd) + return -1 +} diff --git a/src/raft/persister.go b/src/raft/persister.go new file mode 100644 index 0000000..379d8b9 --- /dev/null +++ b/src/raft/persister.go @@ -0,0 +1,61 @@ +package raft + +// +// support for Raft and kvraft to save persistent +// Raft state (log &c) and k/v server snapshots. +// +// we will use the original persister.go to test your code for grading. +// so, while you can modify this code to help you debug, please +// test with the original before submitting. +// + +import "sync" + +type Persister struct { + mu sync.Mutex + raftstate []byte + snapshot []byte +} + +func MakePersister() *Persister { + return &Persister{} +} + +func (ps *Persister) Copy() *Persister { + ps.mu.Lock() + defer ps.mu.Unlock() + np := MakePersister() + np.raftstate = ps.raftstate + np.snapshot = ps.snapshot + return np +} + +func (ps *Persister) SaveRaftState(data []byte) { + ps.mu.Lock() + defer ps.mu.Unlock() + ps.raftstate = data +} + +func (ps *Persister) ReadRaftState() []byte { + ps.mu.Lock() + defer ps.mu.Unlock() + return ps.raftstate +} + +func (ps *Persister) RaftStateSize() int { + ps.mu.Lock() + defer ps.mu.Unlock() + return len(ps.raftstate) +} + +func (ps *Persister) SaveSnapshot(snapshot []byte) { + ps.mu.Lock() + defer ps.mu.Unlock() + ps.snapshot = snapshot +} + +func (ps *Persister) ReadSnapshot() []byte { + ps.mu.Lock() + defer ps.mu.Unlock() + return ps.snapshot +} diff --git a/src/raft/raft.go b/src/raft/raft.go new file mode 100644 index 0000000..b578df2 --- /dev/null +++ b/src/raft/raft.go @@ -0,0 +1,564 @@ +package raft + +// +// this is an outline of the API that raft must expose to +// the service (or tester). see comments below for +// each of these functions for more details. +// +// rf = Make(...) +// create a new Raft server. +// rf.Start(command interface{}) (index, term, isleader) +// start agreement on a new log entry +// rf.GetState() (term, isLeader) +// ask a Raft for its current term, and whether it thinks it is leader +// ApplyMsg +// each time a new entry is committed to the log, each Raft peer +// should send an ApplyMsg to the service (or tester) +// in the same server. +// + +import "sync" +import ( + "labrpc" + "fmt" + "time" + "bytes" + "encoding/gob" + "math/rand" + "sync/atomic" +) + +// import "bytes" +// import "encoding/gob" + + + +// +// as each Raft peer becomes aware that successive log entries are +// committed, the peer should send an ApplyMsg to the service (or +// tester) on the same server, via the applyCh passed to Make(). 
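+// Added sketch (not part of the original patch): a minimal service-side
+// consumer of applyCh; apply() stands in for whatever the service does with
+// a committed command and is hypothetical.
+//
+//	for msg := range applyCh {
+//		if msg.UseSnapshot {
+//			continue // snapshots are unused in this lab
+//		}
+//		apply(msg.Index, msg.Command)
+//	}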
+// +type ApplyMsg struct { + Index int + Command interface{} + UseSnapshot bool // ignore for lab2; only used in lab3 + Snapshot []byte // ignore for lab2; only used in lab3 +} + +// +// A Go object implementing a single Raft peer. +// +type Raft struct { + mu sync.Mutex + peers []*labrpc.ClientEnd + persister *Persister + me int // index into peers[] + + applyCh chan ApplyMsg + + // Your data here. + // Look at the paper's Figure 2 for a description of what + // state a Raft server must maintain. + currentTerm int + votedFor int + logs []map[string]interface{} + + /**/ + commitIndex int + lastApplied int + + /**/ + nextIndex []int + matchIndex []int + + role int //0-follower, 1-candidate, 2-leader + + electionTimeout time.Duration + heartBeatTimeout time.Duration + + heartBeatTicker *time.Ticker + + lastHeart time.Time + + running bool + + counter int64 + + showLog bool +} + +// return currentTerm and whether this server +// believes it is the leader. +func (rf *Raft) GetState() (int, bool) { + if rf.showLog{ + fmt.Println(rf.me, "says: current state, {", rf.currentTerm, ",", rf.votedFor, ",", rf.logs ,"}") + } + return rf.currentTerm, rf.role==2 +} + +func (rf *Raft) GetState2() (bool, []map[string]interface{}, int) { + if rf.showLog{ + fmt.Println(rf.me, "says: current state, {", rf.currentTerm, ",", rf.votedFor, ",", rf.logs ,"}") + } + return rf.role==2, rf.logs, rf.commitIndex +} + +// +// save Raft's persistent state to stable storage, +// where it can later be retrieved after a crash and restart. +// see paper's Figure 2 for a description of what should be persistent. +// +func (rf *Raft) persist() { + w := new(bytes.Buffer) + e := gob.NewEncoder(w) + e.Encode(rf.currentTerm) + e.Encode(rf.votedFor) + e.Encode(rf.logs) + data := w.Bytes() + rf.persister.SaveRaftState(data) +} + +// +// restore previously persisted state. +// +func (rf *Raft) readPersist(data []byte) { + // Your code here. + r := bytes.NewBuffer(data) + d := gob.NewDecoder(r) + d.Decode(&rf.currentTerm) + d.Decode(&rf.votedFor) + d.Decode(&rf.logs) +} + + + + +// +// example RequestVote RPC arguments structure. +// +type RequestVoteArgs struct { + // Your data here. + Term int + CandidateId int + LastLogIndex int + LastLogTerm int +} + +// +// example RequestVote RPC reply structure. +// +type RequestVoteReply struct { + // Your data here. + Term int + VoteGranted bool +} + +type AppendEntriesArgs struct { + Term int + LeaderId int + PrevLogIndex int + PrevLogTerm int + Entries []map[string]interface{} + LeaderCommit int +} + +type AppendEntriesReply struct { + Term int + Success bool + Len int +} + +// +// example RequestVote RPC handler. 
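+// Added commentary (not in the original code): the handler below follows
+// Figure 2 of the Raft paper. It rejects when args.Term < currentTerm, steps
+// down and clears votedFor when a higher term is seen, then grants the vote
+// only if votedFor is unset (or already this candidate) and the candidate's
+// log is at least as up-to-date: a higher last log term wins, and on equal
+// terms a log at least as long wins. Granting a vote also resets the
+// election timer (rf.lastHeart), and the handler persists before replying.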
+// +func (rf *Raft) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) { + rf.mu.Lock() + defer rf.mu.Unlock() + if args.Term < rf.currentTerm { + reply.VoteGranted = false + if rf.showLog { + fmt.Println(rf.me, "tells", args.CandidateId, ": you are to late, highest term is", rf.currentTerm) + } + } else { + /* whatever, if higher term found, switch to follower */ + if args.Term > rf.currentTerm { + rf.role = 0 + rf.votedFor = -1 + rf.currentTerm = args.Term + if rf.showLog { + fmt.Println(rf.me, "says: higher term detected, term=", args.Term) + } + } + + /* check log first */ + lastLogTerm := 0 + if len(rf.logs) > 0 { + lastLogTerm = rf.logs[len(rf.logs) - 1]["term"].(int); + } + if ((rf.votedFor == -1 || rf.votedFor == args.CandidateId) && (lastLogTerm < args.LastLogTerm || (lastLogTerm == args.LastLogTerm && len(rf.logs) <= args.LastLogIndex) )) { + reply.VoteGranted = true + rf.role = 0 + rf.votedFor = args.CandidateId + rf.lastHeart = time.Now() + if rf.showLog { + fmt.Println(rf.me, "tells", args.CandidateId, ": vote granted") + } + } else { + reply.VoteGranted = false + if rf.showLog { + fmt.Println(rf.me, "tells", args.CandidateId, ": vote rejected") + } + } + } + rf.persist() + reply.Term = rf.currentTerm +} + +// +// example code to send a RequestVote RPC to a server. +// server is the index of the target server in rf.peers[]. +// expects RPC arguments in args. +// fills in *reply with RPC reply, so caller should +// pass &reply. +// the types of the args and reply passed to Call() must be +// the same as the types of the arguments declared in the +// handler function (including whether they are pointers). +// +// returns true if labrpc says the RPC was delivered. +// +// if you're having trouble getting RPC to work, check that you've +// capitalized all field names in structs passed over RPC, and +// that the caller passes the address of the reply struct with &, not +// the struct itself. +// +func (rf *Raft) sendRequestVote(server int, args RequestVoteArgs, reply *RequestVoteReply) bool { + if rf.showLog { + fmt.Println(rf.me, "tells", server, ": vote me, ", args) + } + ok := rf.peers[server].Call("Raft.RequestVote", args, reply) + if ok{ + if reply.Term > rf.currentTerm { + rf.mu.Lock() + rf.currentTerm = reply.Term + rf.role = 0 + rf.votedFor = -1 + rf.persist() + rf.mu.Unlock() + } + } + return ok && reply.VoteGranted +} + +func (rf *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) { + rf.mu.Lock() + defer rf.mu.Unlock() + + if args.Term < rf.currentTerm { + reply.Success = false + if rf.showLog { + fmt.Println(rf.me, "tells", args.LeaderId, ": you are too late") + } + } else { + reply.Success = true + rf.currentTerm = args.Term + rf.role = 0 + rf.votedFor = args.LeaderId + + if args.PrevLogIndex > 0 { + if len(rf.logs) >= args.PrevLogIndex && rf.logs[args.PrevLogIndex-1]["term"] == args.PrevLogTerm { + reply.Success = true + } else { + reply.Success = false + reply.Len = len(rf.logs) + rf.lastHeart = time.Now() + //fmt.Println(rf.me, "tells", args.LeaderId, ": I missed something. Here is my logs,", rf.logs) + } + } + } + reply.Term = rf.currentTerm + if reply.Success { + rf.logs = rf.logs[0:args.PrevLogIndex] + //sync logs && apply + rf.logs = append(rf.logs, args.Entries...) 
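+		// Added commentary (not in the original code): the truncate-then-append
+		// above is a simplified form of the Figure 2 conflict rule; the loop
+		// below then delivers entries up to args.LeaderCommit on applyCh, which
+		// assumes LeaderCommit never exceeds the length of the rewritten log.
+		// As the author's TODO in the heartbeat loop notes, a delayed,
+		// reordered AppendEntries can truncate entries that a newer RPC has
+		// already appended.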
+ for iter:=rf.commitIndex; iter < args.LeaderCommit; iter++ { + command := rf.logs[iter]["command"] + if rf.showLog { + fmt.Println(rf.me, "says: commit", command, "index=", iter+1) + } + rf.applyCh <- ApplyMsg{Index: iter + 1, Command:command, UseSnapshot:false, } + } + rf.commitIndex = args.LeaderCommit + //rf.lastApplied = len(rf.logs) + rf.lastHeart = time.Now() + } + rf.persist() + if rf.showLog { + fmt.Println(rf.me, "tells", args.LeaderId, ": pong,", reply) + } +} + +func (rf *Raft) sendAppendEntries(server int, args AppendEntriesArgs, reply *AppendEntriesReply) bool { + if rf.showLog { + fmt.Println(rf.me, "tells", server, ": ping,", args) + } + ok := rf.peers[server].Call("Raft.AppendEntries", args, reply) + if ok { + if reply.Term > rf.currentTerm { + rf.mu.Lock() + rf.currentTerm = reply.Term + rf.role = 0 + rf.votedFor = -1 + rf.persist() + rf.mu.Unlock() + } + if ok && rf.role == 2 && !reply.Success { + //if args.PrevLogIndex > 0 { + rf.nextIndex[server] = args.PrevLogIndex + if reply.Len < args.PrevLogIndex { + rf.nextIndex[server] = reply.Len + 1 + } + //use snapshot + rf.nextIndex[server] = 1 + //} + if rf.showLog { + fmt.Println(rf.me, "says: decrease nextindex of", server, "to", rf.nextIndex[server]) + } + } + } + return ok && reply.Success +} + + +// +// the service using Raft (e.g. a k/v server) wants to start +// agreement on the next command to be appended to Raft's log. if this +// server isn't the leader, returns false. otherwise start the +// agreement and return immediately. there is no guarantee that this +// command will ever be committed to the Raft log, since the leader +// may fail or lose an election. +// +// the first return value is the index that the command will appear at +// if it's ever committed. the second return value is the current +// term. the third return value is true if this server believes it is +// the leader. +// +func (rf *Raft) Start(command interface{}) (int, int, bool) { + rf.mu.Lock() + defer rf.mu.Unlock() + index := len(rf.logs) + 1 + if rf.role == 2 { + log := map[string]interface{}{"command": command, "term": rf.currentTerm} + rf.logs = append(rf.logs, log) + rf.persist() + if rf.showLog { + fmt.Println(rf.me, "says:", "new command", command, "in term", rf.currentTerm) + } + } else { + //fmt.Println(rf.me, "says: I am not a leader") + } + return index, rf.currentTerm, rf.role==2 +} + +// +// the tester calls Kill() when a Raft instance won't +// be needed again. you are not required to do anything +// in Kill(), but it might be convenient to (for example) +// turn off debug output from this instance. +// +func (rf *Raft) Kill() { + /* stop the world */ + rf.running = false +} + +// +// the service or tester wants to create a Raft server. the ports +// of all the Raft servers (including this one) are in peers[]. this +// server's port is peers[me]. all the servers' peers[] arrays +// have the same order. persister is a place for this server to +// save its persistent state, and also initially holds the most +// recent saved state, if any. applyCh is a channel on which the +// tester or service expects Raft to send ApplyMsg messages. +// Make() must return quickly, so it should start goroutines +// for any long-running work. 
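+// Added usage sketch (not part of the original patch): typical wiring by the
+// tester or a k/v service. Only Make, Start and GetState are the real API
+// here; the remaining names are illustrative.
+//
+//	applyCh := make(chan ApplyMsg)
+//	rf := Make(peers, me, persister, applyCh)
+//	index, term, isLeader := rf.Start(command) // returns immediately
+//	currentTerm, _ := rf.GetState()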
+// +func Make(peers []*labrpc.ClientEnd, me int, + persister *Persister, applyCh chan ApplyMsg) *Raft { + + fmt.Println(me, "says:", "hello world!") + + rf := &Raft{} + rf.peers = peers + rf.persister = persister + rf.me = me + + rf.applyCh = applyCh + rf.running = true + + rf.showLog = false + + // initialize from state persisted before a crash + rf.readPersist(persister.ReadRaftState()) + + // Your initialization code here. + rf.electionTimeout = time.Duration(rand.Intn(100) + 100) * time.Millisecond + rf.heartBeatTimeout = time.Duration(rand.Intn(50) + 50) * time.Millisecond + rf.counter = 0 + rf.lastHeart = time.Now() + rf.heartBeatTicker = time.NewTicker(rf.heartBeatTimeout) + + rf.role = 0 //start in follower state + + rf.commitIndex = 0 + rf.lastApplied = 0 + + + go func() { + for ;;{ + if !rf.running { + break + } + /* wait timeout */ + time.Sleep(rf.electionTimeout - time.Since(rf.lastHeart)) + + if rf.role != 2 && time.Since(rf.lastHeart) >= rf.electionTimeout { + rf.mu.Lock() + // re-generate time + rf.electionTimeout = time.Duration(rand.Intn(100) + 100) * time.Millisecond + rf.currentTerm ++ + rf.votedFor = rf.me + rf.role = 1 + rf.persist() + rf.mu.Unlock() + rf.doVote() + } + /* sleep at most electionTimeout duration */ + if time.Since(rf.lastHeart) >= rf.electionTimeout { + rf.lastHeart = time.Now() + } + } + fmt.Println(rf.me, "says: bye~") + }() + return rf +} + +func (rf *Raft) doVote() { + rf.mu.Lock() + defer rf.mu.Unlock() + + var agreed int64 = 1 + index := len(rf.logs) + term := 0 + if index != 0 { + term = rf.logs[index - 1]["term"].(int) + } + for i:=0;i< len(rf.peers);i++{ + if i != rf.me { + go func(peer int, currTerm int, index int, term int) { + args := RequestVoteArgs{Term: currTerm, CandidateId:rf.me, LastLogIndex:index, LastLogTerm:term} + reply := RequestVoteReply{} + ok := rf.sendRequestVote(peer, args, &reply) + rf.mu.Lock() + if ok && args.Term == rf.currentTerm && rf.role == 1{ + atomic.AddInt64(&agreed, 1) + if (int(agreed) * 2 > len(rf.peers)) { + rf.role = 2 + + rf.nextIndex = rf.nextIndex[0:0] + rf.matchIndex = rf.matchIndex[0:0] + for i:=0;i 1 { + atomic.AddInt64(&rf.counter, -1) + return + } + for range rf.heartBeatTicker.C { + if !rf.running { + break + } + if rf.role != 2 { + break + } + for i:=0;i< len(rf.peers);i++{ + if i != rf.me { + go func(peer int) { + rf.mu.Lock() + index := rf.nextIndex[peer] + term := 0 + /* TODO: limit entries max size */ + entries := make([]map[string]interface{}, 0) + if len(rf.logs) >= index { + entries = append(entries, rf.logs[index-1:]...) 
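+					// Added commentary (not in the original code): nextIndex is 1-based,
+					// so rf.logs[index-1:] is the suffix the peer is not yet known to
+					// hold, and PrevLogIndex/PrevLogTerm below describe the entry just
+					// before that suffix. Because of the TODO above, every heartbeat
+					// currently ships this whole suffix rather than a bounded batch.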
+ } + if index > 1 { + //fmt.Println(index, rf.logs) + term = rf.logs[index - 2]["term"].(int) + } + args := AppendEntriesArgs{Term: rf.currentTerm, LeaderId:rf.me, PrevLogIndex:index - 1, PrevLogTerm:term, Entries:entries, LeaderCommit:rf.commitIndex} + reply := AppendEntriesReply{} + rf.nextIndex[peer] = args.PrevLogIndex + len(entries) + 1 + rf.mu.Unlock() + + ok := rf.sendAppendEntries(peer, args, &reply) + + rf.mu.Lock() + if ok && args.Term == rf.currentTerm && rf.role == 2 { + /* TODO, think of package re-ordering, resulting previous log entries lose actually */ + rf.matchIndex[peer] = args.PrevLogIndex + len(entries) + /* update commitIndex */ + for iter:=rf.commitIndex; iter iter { + count++ + } + //fmt.Println(j, rf.matchIndex[j], count) + } + } + + if count * 2 > len(rf.peers){ + for i:=rf.commitIndex; i<=iter; i++ { + rf.commitIndex = i + 1 + command := rf.logs[i]["command"] + if rf.showLog { + fmt.Println(rf.me, "says: ", command, "is committed, index=", i + 1) + } + rf.applyCh <- ApplyMsg{Index: i + 1, Command: command, UseSnapshot:false, } + } + } else { // commit in order + break + } + } + } + rf.mu.Unlock() + }(i) + } + } + } + if rf.showLog { + fmt.Println(rf.me, "says: stop heart beat") + } + atomic.AddInt64(&rf.counter, -1) +} diff --git a/src/raft/test_test.go b/src/raft/test_test.go new file mode 100644 index 0000000..75d3807 --- /dev/null +++ b/src/raft/test_test.go @@ -0,0 +1,929 @@ +package raft + +// +// Raft tests. +// +// we will use the original test_test.go to test your code for grading. +// so, while you can modify this code to help you debug, please +// test with the original before submitting. +// + +import "testing" +import "fmt" +import "time" +import "math/rand" +import "sync/atomic" +import "sync" + +// The tester generously allows solutions to complete elections in one second +// (much more than the paper's range of timeouts). +const RaftElectionTimeout = 1000 * time.Millisecond + +func TestInitialElection(t *testing.T) { + servers := 3 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + fmt.Printf("Test: initial election ...\n") + + // is a leader elected? + cfg.checkOneLeader() + + // does the leader+term stay the same there is no failure? + term1 := cfg.checkTerms() + time.Sleep(2 * RaftElectionTimeout) + term2 := cfg.checkTerms() + if term1 != term2 { + fmt.Printf("warning: term changed even though there were no failures") + } + + fmt.Printf(" ... Passed\n") +} + +func TestReElection(t *testing.T) { + servers := 3 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + fmt.Printf("Test: election after network failure ...\n") + + leader1 := cfg.checkOneLeader() + + // if the leader disconnects, a new one should be elected. + cfg.disconnect(leader1) + cfg.checkOneLeader() + + // if the old leader rejoins, that shouldn't + // disturb the old leader. + cfg.connect(leader1) + leader2 := cfg.checkOneLeader() + + // if there's no quorum, no leader should + // be elected. + cfg.disconnect(leader2) + cfg.disconnect((leader2 + 1) % servers) + time.Sleep(2 * RaftElectionTimeout) + cfg.checkNoLeader() + + // if a quorum arises, it should elect a leader. + cfg.connect((leader2 + 1) % servers) + cfg.checkOneLeader() + + // re-join of last node shouldn't prevent leader from existing. + cfg.connect(leader2) + cfg.checkOneLeader() + + fmt.Printf(" ... 
Passed\n") +} + +func TestBasicAgree(t *testing.T) { + servers := 5 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + fmt.Printf("Test: basic agreement ...\n") + + iters := 3 + for index := 1; index < iters+1; index++ { + nd, _ := cfg.nCommitted(index) + if nd > 0 { + t.Fatalf("some have committed before Start()") + } + + xindex := cfg.one(index*100, servers) + if xindex != index { + t.Fatalf("got index %v but expected %v", xindex, index) + } + } + + fmt.Printf(" ... Passed\n") +} + +func TestFailAgree(t *testing.T) { + servers := 3 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + fmt.Printf("Test: agreement despite follower failure ...\n") + + cfg.one(101, servers) + + // follower network failure + leader := cfg.checkOneLeader() + cfg.disconnect((leader + 1) % servers) + + // agree despite one failed server? + cfg.one(102, servers-1) + cfg.one(103, servers-1) + time.Sleep(RaftElectionTimeout) + cfg.one(104, servers-1) + cfg.one(105, servers-1) + + // failed server re-connected + cfg.connect((leader + 1) % servers) + + // agree with full set of servers? + cfg.one(106, servers) + time.Sleep(RaftElectionTimeout) + cfg.one(107, servers) + + fmt.Printf(" ... Passed\n") +} + +func TestFailNoAgree(t *testing.T) { + servers := 5 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + fmt.Printf("Test: no agreement if too many followers fail ...\n") + + cfg.one(10, servers) + + // 3 of 5 followers disconnect + leader := cfg.checkOneLeader() + cfg.disconnect((leader + 1) % servers) + cfg.disconnect((leader + 2) % servers) + cfg.disconnect((leader + 3) % servers) + + index, _, ok := cfg.rafts[leader].Start(20) + if ok != true { + t.Fatalf("leader rejected Start()") + } + if index != 2 { + t.Fatalf("expected index 2, got %v", index) + } + + time.Sleep(2 * RaftElectionTimeout) + + n, _ := cfg.nCommitted(index) + if n > 0 { + t.Fatalf("%v committed but no majority", n) + } + + // repair failures + cfg.connect((leader + 1) % servers) + cfg.connect((leader + 2) % servers) + cfg.connect((leader + 3) % servers) + + // the disconnected majority may have chosen a leader from + // among their own ranks, forgetting index 2. + // or perhaps + leader2 := cfg.checkOneLeader() + index2, _, ok2 := cfg.rafts[leader2].Start(30) + if ok2 == false { + t.Fatalf("leader2 rejected Start()") + } + if index2 < 2 || index2 > 3 { + t.Fatalf("unexpected index %v", index2) + } + + cfg.one(1000, servers) + + fmt.Printf(" ... 
Passed\n") +} + +func TestConcurrentStarts(t *testing.T) { + servers := 3 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + fmt.Printf("Test: concurrent Start()s ...\n") + + var success bool +loop: + for try := 0; try < 5; try++ { + if try > 0 { + // give solution some time to settle + time.Sleep(3 * time.Second) + } + + leader := cfg.checkOneLeader() + _, term, ok := cfg.rafts[leader].Start(1) + if !ok { + // leader moved on really quickly + continue + } + + iters := 5 + var wg sync.WaitGroup + is := make(chan int, iters) + for ii := 0; ii < iters; ii++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + i, term1, ok := cfg.rafts[leader].Start(100 + i) + if term1 != term { + return + } + if ok != true { + return + } + is <- i + }(ii) + } + + wg.Wait() + close(is) + + for j := 0; j < servers; j++ { + if t, _ := cfg.rafts[j].GetState(); t != term { + // term changed -- can't expect low RPC counts + continue loop + } + } + + failed := false + cmds := []int{} + for index := range is { + cmd := cfg.wait(index, servers, term) + if ix, ok := cmd.(int); ok { + if ix == -1 { + // peers have moved on to later terms + // so we can't expect all Start()s to + // have succeeded + failed = true + break + } + cmds = append(cmds, ix) + } else { + t.Fatalf("value %v is not an int", cmd) + } + } + + if failed { + // avoid leaking goroutines + go func() { + for range is { + } + }() + continue + } + + for ii := 0; ii < iters; ii++ { + x := 100 + ii + ok := false + for j := 0; j < len(cmds); j++ { + if cmds[j] == x { + ok = true + } + } + if ok == false { + t.Fatalf("cmd %v missing in %v", x, cmds) + } + } + + success = true + break + } + + if !success { + t.Fatalf("term changed too often") + } + + fmt.Printf(" ... Passed\n") +} + +func TestRejoin(t *testing.T) { + servers := 3 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + fmt.Printf("Test: rejoin of partitioned leader ...\n") + + cfg.one(101, servers) + + // leader network failure + leader1 := cfg.checkOneLeader() + cfg.disconnect(leader1) + + // make old leader try to agree on some entries + cfg.rafts[leader1].Start(102) + cfg.rafts[leader1].Start(103) + cfg.rafts[leader1].Start(104) + + // new leader commits, also for index=2 + cfg.one(103, 2) + + // new leader network failure + leader2 := cfg.checkOneLeader() + cfg.disconnect(leader2) + + // old leader connected again + cfg.connect(leader1) + + cfg.one(104, 2) + + // all together now + cfg.connect(leader2) + + cfg.one(105, servers) + + fmt.Printf(" ... Passed\n") +} + +func TestBackup(t *testing.T) { + servers := 5 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + fmt.Printf("Test: leader backs up quickly over incorrect follower logs ...\n") + + cfg.one(rand.Int(), servers) + + // put leader and one follower in a partition + leader1 := cfg.checkOneLeader() + cfg.disconnect((leader1 + 2) % servers) + cfg.disconnect((leader1 + 3) % servers) + cfg.disconnect((leader1 + 4) % servers) + + // submit lots of commands that won't commit + for i := 0; i < 50; i++ { + cfg.rafts[leader1].Start(rand.Int()) + } + + time.Sleep(RaftElectionTimeout / 2) + + cfg.disconnect((leader1 + 0) % servers) + cfg.disconnect((leader1 + 1) % servers) + + // allow other partition to recover + cfg.connect((leader1 + 2) % servers) + cfg.connect((leader1 + 3) % servers) + cfg.connect((leader1 + 4) % servers) + + // lots of successful commands to new group. 
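+	// Added commentary (not in the original tester): the three reconnected
+	// servers hold only the already-committed prefix, so they can elect a
+	// leader and commit these 50 commands on their own. The 50 uncommitted
+	// entries stranded on the old partition are reconciled in the later
+	// phases, which is where fast log backup is actually exercised (this
+	// implementation simply resets nextIndex to 1 on a rejected
+	// AppendEntries and re-sends the whole log).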
+ for i := 0; i < 50; i++ { + cfg.one(rand.Int(), 3) + } + + // now another partitioned leader and one follower + leader2 := cfg.checkOneLeader() + other := (leader1 + 2) % servers + if leader2 == other { + other = (leader2 + 1) % servers + } + cfg.disconnect(other) + + // lots more commands that won't commit + for i := 0; i < 50; i++ { + cfg.rafts[leader2].Start(rand.Int()) + } + + time.Sleep(RaftElectionTimeout / 2) + + // bring original leader back to life, + for i := 0; i < servers; i++ { + cfg.disconnect(i) + } + cfg.connect((leader1 + 0) % servers) + cfg.connect((leader1 + 1) % servers) + cfg.connect(other) + + // lots of successful commands to new group. + for i := 0; i < 50; i++ { + cfg.one(rand.Int(), 3) + } + + // now everyone + for i := 0; i < servers; i++ { + cfg.connect(i) + } + cfg.one(rand.Int(), servers) + + fmt.Printf(" ... Passed\n") +} + +func TestCount(t *testing.T) { + servers := 3 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + fmt.Printf("Test: RPC counts aren't too high ...\n") + + rpcs := func() (n int) { + for j := 0; j < servers; j++ { + n += cfg.rpcCount(j) + } + return + } + + leader := cfg.checkOneLeader() + + total1 := rpcs() + + if total1 > 30 || total1 < 1 { + t.Fatalf("too many or few RPCs (%v) to elect initial leader\n", total1) + } + + var total2 int + var success bool +loop: + for try := 0; try < 5; try++ { + if try > 0 { + // give solution some time to settle + time.Sleep(3 * time.Second) + } + + leader = cfg.checkOneLeader() + total1 = rpcs() + + iters := 10 + starti, term, ok := cfg.rafts[leader].Start(1) + if !ok { + // leader moved on really quickly + continue + } + cmds := []int{} + for i := 1; i < iters+2; i++ { + x := int(rand.Int31()) + cmds = append(cmds, x) + index1, term1, ok := cfg.rafts[leader].Start(x) + if term1 != term { + // Term changed while starting + continue loop + } + if !ok { + // No longer the leader, so term has changed + continue loop + } + if starti+i != index1 { + t.Fatalf("Start() failed") + } + } + + for i := 1; i < iters+1; i++ { + cmd := cfg.wait(starti+i, servers, term) + if ix, ok := cmd.(int); ok == false || ix != cmds[i-1] { + if ix == -1 { + // term changed -- try again + continue loop + } + t.Fatalf("wrong value %v committed for index %v; expected %v\n", cmd, starti+i, cmds) + } + } + + failed := false + total2 = 0 + for j := 0; j < servers; j++ { + if t, _ := cfg.rafts[j].GetState(); t != term { + // term changed -- can't expect low RPC counts + // need to keep going to update total2 + failed = true + } + total2 += cfg.rpcCount(j) + } + + if failed { + continue loop + } + + if total2-total1 > (iters+1+3)*3 { + t.Fatalf("too many RPCs (%v) for %v entries\n", total2-total1, iters) + } + + success = true + break + } + + if !success { + t.Fatalf("term changed too often") + } + + time.Sleep(RaftElectionTimeout) + + total3 := 0 + for j := 0; j < servers; j++ { + total3 += cfg.rpcCount(j) + } + + if total3-total2 > 3*20 { + t.Fatalf("too many RPCs (%v) for 1 second of idleness\n", total3-total2) + } + + fmt.Printf(" ... 
Passed\n") +} + +func TestPersist1(t *testing.T) { + servers := 3 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + fmt.Printf("Test: basic persistence ...\n") + + cfg.one(11, servers) + + // crash and re-start all + for i := 0; i < servers; i++ { + cfg.start1(i) + } + for i := 0; i < servers; i++ { + cfg.disconnect(i) + cfg.connect(i) + } + + cfg.one(12, servers) + + leader1 := cfg.checkOneLeader() + cfg.disconnect(leader1) + cfg.start1(leader1) + cfg.connect(leader1) + + cfg.one(13, servers) + + leader2 := cfg.checkOneLeader() + cfg.disconnect(leader2) + cfg.one(14, servers-1) + cfg.start1(leader2) + cfg.connect(leader2) + + cfg.wait(4, servers, -1) // wait for leader2 to join before killing i3 + + i3 := (cfg.checkOneLeader() + 1) % servers + cfg.disconnect(i3) + cfg.one(15, servers-1) + cfg.start1(i3) + cfg.connect(i3) + + cfg.one(16, servers) + + fmt.Printf(" ... Passed\n") +} + +func TestPersist2(t *testing.T) { + servers := 5 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + fmt.Printf("Test: more persistence ...\n") + + index := 1 + for iters := 0; iters < 5; iters++ { + cfg.one(10+index, servers) + index++ + + leader1 := cfg.checkOneLeader() + + cfg.disconnect((leader1 + 1) % servers) + cfg.disconnect((leader1 + 2) % servers) + + cfg.one(10+index, servers-2) + index++ + + cfg.disconnect((leader1 + 0) % servers) + cfg.disconnect((leader1 + 3) % servers) + cfg.disconnect((leader1 + 4) % servers) + + cfg.start1((leader1 + 1) % servers) + cfg.start1((leader1 + 2) % servers) + cfg.connect((leader1 + 1) % servers) + cfg.connect((leader1 + 2) % servers) + + time.Sleep(RaftElectionTimeout) + + cfg.start1((leader1 + 3) % servers) + cfg.connect((leader1 + 3) % servers) + + cfg.one(10+index, servers-2) + index++ + + cfg.connect((leader1 + 4) % servers) + cfg.connect((leader1 + 0) % servers) + } + + cfg.one(1000, servers) + + fmt.Printf(" ... Passed\n") +} + +func TestPersist3(t *testing.T) { + servers := 3 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + fmt.Printf("Test: partitioned leader and one follower crash, leader restarts ...\n") + + cfg.one(101, 3) + + leader := cfg.checkOneLeader() + cfg.disconnect((leader + 2) % servers) + + cfg.one(102, 2) + + cfg.crash1((leader + 0) % servers) + cfg.crash1((leader + 1) % servers) + cfg.connect((leader + 2) % servers) + cfg.start1((leader + 0) % servers) + cfg.connect((leader + 0) % servers) + + cfg.one(103, 2) + + cfg.start1((leader + 1) % servers) + cfg.connect((leader + 1) % servers) + + cfg.one(104, servers) + + fmt.Printf(" ... Passed\n") +} + +// +// Test the scenarios described in Figure 8 of the extended Raft paper. Each +// iteration asks a leader, if there is one, to insert a command in the Raft +// log. If there is a leader, that leader will fail quickly with a high +// probability (perhaps without committing the command), or crash after a while +// with low probability (most likey committing the command). If the number of +// alive servers isn't enough to form a majority, perhaps start a new server. +// The leader in a new term may try to finish replicating log entries that +// haven't been committed yet. 
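+// Added commentary (not in the original tester): the property at stake is
+// the Figure 8 rule from the paper. A leader may only advance commitIndex by
+// counting replicas of entries from its own current term; entries from
+// earlier terms become committed only indirectly, once a current-term entry
+// above them is replicated to a majority.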
+// +func TestFigure8(t *testing.T) { + servers := 5 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + fmt.Printf("Test: Figure 8 ...\n") + + cfg.one(rand.Int(), 1) + + nup := servers + for iters := 0; iters < 1000; iters++ { + leader := -1 + for i := 0; i < servers; i++ { + if cfg.rafts[i] != nil { + _, _, ok := cfg.rafts[i].Start(rand.Int()) + if ok { + leader = i + } + } + } + + if (rand.Int() % 1000) < 100 { + ms := rand.Int63() % (int64(RaftElectionTimeout/time.Millisecond) / 2) + time.Sleep(time.Duration(ms) * time.Millisecond) + } else { + ms := (rand.Int63() % 13) + time.Sleep(time.Duration(ms) * time.Millisecond) + } + + if leader != -1 { + cfg.crash1(leader) + nup -= 1 + } + + if nup < 3 { + s := rand.Int() % servers + if cfg.rafts[s] == nil { + cfg.start1(s) + cfg.connect(s) + nup += 1 + } + } + } + + for i := 0; i < servers; i++ { + if cfg.rafts[i] == nil { + cfg.start1(i) + cfg.connect(i) + } + } + + cfg.one(rand.Int(), servers) + + fmt.Printf(" ... Passed\n") +} + +func TestUnreliableAgree(t *testing.T) { + servers := 5 + cfg := make_config(t, servers, true) + defer cfg.cleanup() + + fmt.Printf("Test: unreliable agreement ...\n") + + var wg sync.WaitGroup + + for iters := 1; iters < 50; iters++ { + for j := 0; j < 4; j++ { + wg.Add(1) + go func(iters, j int) { + defer wg.Done() + cfg.one((100*iters)+j, 1) + }(iters, j) + } + cfg.one(iters, 1) + } + + cfg.setunreliable(false) + + wg.Wait() + + cfg.one(100, servers) + + fmt.Printf(" ... Passed\n") +} + +func TestFigure8Unreliable(t *testing.T) { + servers := 5 + cfg := make_config(t, servers, true) + defer cfg.cleanup() + + fmt.Printf("Test: Figure 8 (unreliable) ...\n") + + cfg.one(rand.Int()%10000, 1) + + nup := servers + for iters := 0; iters < 1000; iters++ { + if iters == 200 { + cfg.setlongreordering(true) + } + leader := -1 + for i := 0; i < servers; i++ { + _, _, ok := cfg.rafts[i].Start(rand.Int() % 10000) + if ok && cfg.connected[i] { + leader = i + } + } + + if (rand.Int() % 1000) < 100 { + ms := rand.Int63() % (int64(RaftElectionTimeout/time.Millisecond) / 2) + time.Sleep(time.Duration(ms) * time.Millisecond) + } else { + ms := (rand.Int63() % 13) + time.Sleep(time.Duration(ms) * time.Millisecond) + } + + if leader != -1 && (rand.Int()%1000) < int(RaftElectionTimeout/time.Millisecond)/2 { + cfg.disconnect(leader) + nup -= 1 + } + + if nup < 3 { + s := rand.Int() % servers + if cfg.connected[s] == false { + cfg.connect(s) + nup += 1 + } + } + } + + for i := 0; i < servers; i++ { + if cfg.connected[i] == false { + cfg.connect(i) + } + } + + cfg.one(rand.Int()%10000, servers) + + fmt.Printf(" ... Passed\n") +} + +func internalChurn(t *testing.T, unreliable bool) { + + if unreliable { + fmt.Printf("Test: unreliable churn ...\n") + } else { + fmt.Printf("Test: churn ...\n") + } + + servers := 5 + cfg := make_config(t, servers, unreliable) + defer cfg.cleanup() + + stop := int32(0) + + // create concurrent clients + cfn := func(me int, ch chan []int) { + var ret []int + ret = nil + defer func() { ch <- ret }() + values := []int{} + for atomic.LoadInt32(&stop) == 0 { + x := rand.Int() + index := -1 + ok := false + for i := 0; i < servers; i++ { + // try them all, maybe one of them is a leader + cfg.mu.Lock() + rf := cfg.rafts[i] + cfg.mu.Unlock() + if rf != nil { + index1, _, ok1 := rf.Start(x) + if ok1 { + ok = ok1 + index = index1 + } + } + } + if ok { + // maybe leader will commit our value, maybe not. + // but don't wait forever. 
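+				// Added commentary (not in the original tester): poll nCommitted with
+				// increasing sleeps (10..200 ms, roughly 380 ms in total) and record x
+				// in values only if that exact command ended up committed at index.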
+ for _, to := range []int{10, 20, 50, 100, 200} { + nd, cmd := cfg.nCommitted(index) + if nd > 0 { + if xx, ok := cmd.(int); ok { + if xx == x { + values = append(values, x) + } + } else { + cfg.t.Fatalf("wrong command type") + } + break + } + time.Sleep(time.Duration(to) * time.Millisecond) + } + } else { + time.Sleep(time.Duration(79+me*17) * time.Millisecond) + } + } + ret = values + } + + ncli := 3 + cha := []chan []int{} + for i := 0; i < ncli; i++ { + cha = append(cha, make(chan []int)) + go cfn(i, cha[i]) + } + + for iters := 0; iters < 20; iters++ { + if (rand.Int() % 1000) < 200 { + i := rand.Int() % servers + cfg.disconnect(i) + } + + if (rand.Int() % 1000) < 500 { + i := rand.Int() % servers + if cfg.rafts[i] == nil { + cfg.start1(i) + } + cfg.connect(i) + } + + if (rand.Int() % 1000) < 200 { + i := rand.Int() % servers + if cfg.rafts[i] != nil { + cfg.crash1(i) + } + } + + // Make crash/restart infrequent enough that the peers can often + // keep up, but not so infrequent that everything has settled + // down from one change to the next. Pick a value smaller than + // the election timeout, but not hugely smaller. + time.Sleep((RaftElectionTimeout * 7) / 10) + } + + time.Sleep(RaftElectionTimeout) + cfg.setunreliable(false) + for i := 0; i < servers; i++ { + if cfg.rafts[i] == nil { + cfg.start1(i) + } + cfg.connect(i) + } + + atomic.StoreInt32(&stop, 1) + + values := []int{} + for i := 0; i < ncli; i++ { + vv := <-cha[i] + if vv == nil { + t.Fatal("client failed") + } + values = append(values, vv...) + } + + time.Sleep(RaftElectionTimeout) + + lastIndex := cfg.one(rand.Int(), servers) + + really := make([]int, lastIndex+1) + for index := 1; index <= lastIndex; index++ { + v := cfg.wait(index, servers, -1) + if vi, ok := v.(int); ok { + really = append(really, vi) + } else { + t.Fatalf("not an int") + } + } + + for _, v1 := range values { + ok := false + for _, v2 := range really { + if v1 == v2 { + ok = true + } + } + if ok == false { + cfg.t.Fatalf("didn't find a value") + } + } + + fmt.Printf(" ... Passed\n") +} + +func TestReliableChurn(t *testing.T) { + internalChurn(t, false) +} + +func TestUnreliableChurn(t *testing.T) { + internalChurn(t, true) +} diff --git a/src/raft/util.go b/src/raft/util.go new file mode 100644 index 0000000..d77e71f --- /dev/null +++ b/src/raft/util.go @@ -0,0 +1,13 @@ +package raft + +import "log" + +// Debugging +const Debug = 0 + +func DPrintf(format string, a ...interface{}) (n int, err error) { + if Debug > 0 { + log.Printf(format, a...) + } + return +}
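+// Added usage sketch (not part of the original patch): with Debug set to 1
+// above, raft.go could route its diagnostics through DPrintf instead of the
+// ad-hoc fmt.Println / rf.showLog pattern, e.g.
+//
+//	DPrintf("[%d] term=%d role=%d votedFor=%d", rf.me, rf.currentTerm, rf.role, rf.votedFor)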