add raft

parent 2de1a9f6f7
commit 25fd77a63b
.gitignore (vendored, 2 lines added)
@@ -12,3 +12,5 @@
 # Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
 .glide/
+
+.idea/
src/raft/README.md (new file, 7 lines)
@@ -0,0 +1,7 @@
# NJU-DisSys-2017

An implementation of Raft. Not guaranteed to be bug free.

This is the resource repository for the course Distributed System, Fall 2017, CS@NJU.

In Assignment 2 and Assignment 3, you should primarily focus on /src/raft/...
src/raft/config.go (new file, 426 lines)
@@ -0,0 +1,426 @@
package raft

//
// support for Raft tester.
//
// we will use the original config.go to test your code for grading.
// so, while you can modify this code to help you debug, please
// test with the original before submitting.
//

import "labrpc"
import "log"
import "sync"
import "testing"
import "runtime"
import crand "crypto/rand"
import "encoding/base64"
import "sync/atomic"
import "time"
import "fmt"

func randstring(n int) string {
	b := make([]byte, 2*n)
	crand.Read(b)
	s := base64.URLEncoding.EncodeToString(b)
	return s[0:n]
}

type config struct {
	mu        sync.Mutex
	t         *testing.T
	net       *labrpc.Network
	n         int
	done      int32 // tell internal threads to die
	rafts     []*Raft
	applyErr  []string // from apply channel readers
	connected []bool   // whether each server is on the net
	saved     []*Persister
	endnames  [][]string    // the port file names each sends to
	logs      []map[int]int // copy of each server's committed entries
}

func make_config(t *testing.T, n int, unreliable bool) *config {
	runtime.GOMAXPROCS(4)
	cfg := &config{}
	cfg.t = t
	cfg.net = labrpc.MakeNetwork()
	cfg.n = n
	cfg.applyErr = make([]string, cfg.n)
	cfg.rafts = make([]*Raft, cfg.n)
	cfg.connected = make([]bool, cfg.n)
	cfg.saved = make([]*Persister, cfg.n)
	cfg.endnames = make([][]string, cfg.n)
	cfg.logs = make([]map[int]int, cfg.n)

	cfg.setunreliable(unreliable)

	cfg.net.LongDelays(true)

	// create a full set of Rafts.
	for i := 0; i < cfg.n; i++ {
		cfg.logs[i] = map[int]int{}
		cfg.start1(i)
	}

	// connect everyone
	for i := 0; i < cfg.n; i++ {
		cfg.connect(i)
	}

	return cfg
}

// shut down a Raft server but save its persistent state.
func (cfg *config) crash1(i int) {
	cfg.disconnect(i)
	cfg.net.DeleteServer(i) // disable client connections to the server.

	cfg.mu.Lock()
	defer cfg.mu.Unlock()

	// a fresh persister, in case old instance
	// continues to update the Persister.
	// but copy old persister's content so that we always
	// pass Make() the last persisted state.
	if cfg.saved[i] != nil {
		cfg.saved[i] = cfg.saved[i].Copy()
	}

	rf := cfg.rafts[i]
	if rf != nil {
		cfg.mu.Unlock()
		rf.Kill()
		cfg.mu.Lock()
		cfg.rafts[i] = nil
	}

	if cfg.saved[i] != nil {
		raftlog := cfg.saved[i].ReadRaftState()
		cfg.saved[i] = &Persister{}
		cfg.saved[i].SaveRaftState(raftlog)
	}
}

//
// start or re-start a Raft.
// if one already exists, "kill" it first.
// allocate new outgoing port file names, and a new
// state persister, to isolate previous instance of
// this server. since we cannot really kill it.
//
func (cfg *config) start1(i int) {
	cfg.crash1(i)

	// a fresh set of outgoing ClientEnd names.
	// so that old crashed instance's ClientEnds can't send.
	cfg.endnames[i] = make([]string, cfg.n)
	for j := 0; j < cfg.n; j++ {
		cfg.endnames[i][j] = randstring(20)
	}

	// a fresh set of ClientEnds.
	ends := make([]*labrpc.ClientEnd, cfg.n)
	for j := 0; j < cfg.n; j++ {
		ends[j] = cfg.net.MakeEnd(cfg.endnames[i][j])
		cfg.net.Connect(cfg.endnames[i][j], j)
	}

	cfg.mu.Lock()

	// a fresh persister, so old instance doesn't overwrite
	// new instance's persisted state.
	// but copy old persister's content so that we always
	// pass Make() the last persisted state.
	if cfg.saved[i] != nil {
		cfg.saved[i] = cfg.saved[i].Copy()
	} else {
		cfg.saved[i] = MakePersister()
	}

	cfg.mu.Unlock()

	// listen to messages from Raft indicating newly committed messages.
	applyCh := make(chan ApplyMsg)
	go func() {
		for m := range applyCh {
			err_msg := ""
			if m.UseSnapshot {
				// ignore the snapshot
			} else if v, ok := (m.Command).(int); ok {
				cfg.mu.Lock()
				for j := 0; j < len(cfg.logs); j++ {
					if old, oldok := cfg.logs[j][m.Index]; oldok && old != v {
						// some server has already committed a different value for this entry!
						err_msg = fmt.Sprintf("commit index=%v server=%v %v != server=%v %v",
							m.Index, i, m.Command, j, old)
					}
				}
				_, prevok := cfg.logs[i][m.Index-1]
				cfg.logs[i][m.Index] = v
				cfg.mu.Unlock()

				if m.Index > 1 && prevok == false {
					err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.Index)
				}
			} else {
				err_msg = fmt.Sprintf("committed command %v is not an int", m.Command)
			}

			if err_msg != "" {
				log.Fatalf("apply error: %v\n", err_msg)
				cfg.applyErr[i] = err_msg
				// keep reading after error so that Raft doesn't block
				// holding locks...
			}
		}
	}()

	rf := Make(ends, i, cfg.saved[i], applyCh)

	cfg.mu.Lock()
	cfg.rafts[i] = rf
	cfg.mu.Unlock()

	svc := labrpc.MakeService(rf)
	srv := labrpc.MakeServer()
	srv.AddService(svc)
	cfg.net.AddServer(i, srv)
}

func (cfg *config) cleanup() {
	for i := 0; i < len(cfg.rafts); i++ {
		if cfg.rafts[i] != nil {
			cfg.rafts[i].Kill()
		}
	}
	atomic.StoreInt32(&cfg.done, 1)
}

// attach server i to the net.
func (cfg *config) connect(i int) {
	// fmt.Printf("connect(%d)\n", i)

	cfg.connected[i] = true

	// outgoing ClientEnds
	for j := 0; j < cfg.n; j++ {
		if cfg.connected[j] {
			endname := cfg.endnames[i][j]
			cfg.net.Enable(endname, true)
		}
	}

	// incoming ClientEnds
	for j := 0; j < cfg.n; j++ {
		if cfg.connected[j] {
			endname := cfg.endnames[j][i]
			cfg.net.Enable(endname, true)
		}
	}
}

// detach server i from the net.
func (cfg *config) disconnect(i int) {
	// fmt.Printf("disconnect(%d)\n", i)

	cfg.connected[i] = false

	// outgoing ClientEnds
	for j := 0; j < cfg.n; j++ {
		if cfg.endnames[i] != nil {
			endname := cfg.endnames[i][j]
			cfg.net.Enable(endname, false)
		}
	}

	// incoming ClientEnds
	for j := 0; j < cfg.n; j++ {
		if cfg.endnames[j] != nil {
			endname := cfg.endnames[j][i]
			cfg.net.Enable(endname, false)
		}
	}
}

func (cfg *config) rpcCount(server int) int {
	return cfg.net.GetCount(server)
}

func (cfg *config) setunreliable(unrel bool) {
	cfg.net.Reliable(!unrel)
}

func (cfg *config) setlongreordering(longrel bool) {
	cfg.net.LongReordering(longrel)
}

// check that there's exactly one leader.
// try a few times in case re-elections are needed.
func (cfg *config) checkOneLeader() int {
	for iters := 0; iters < 10; iters++ {
		time.Sleep(500 * time.Millisecond)
		leaders := make(map[int][]int)
		for i := 0; i < cfg.n; i++ {
			if cfg.connected[i] {
				if t, leader := cfg.rafts[i].GetState(); leader {
					leaders[t] = append(leaders[t], i)
				}
			}
		}

		lastTermWithLeader := -1
		for t, leaders := range leaders {
			if len(leaders) > 1 {
				cfg.t.Fatalf("term %d has %d (>1) leaders", t, len(leaders))
			}
			if t > lastTermWithLeader {
				lastTermWithLeader = t
			}
		}

		if len(leaders) != 0 {
			return leaders[lastTermWithLeader][0]
		}
	}
	cfg.t.Fatalf("expected one leader, got none")
	return -1
}

// check that everyone agrees on the term.
func (cfg *config) checkTerms() int {
	term := -1
	for i := 0; i < cfg.n; i++ {
		if cfg.connected[i] {
			xterm, _ := cfg.rafts[i].GetState()
			if term == -1 {
				term = xterm
			} else if term != xterm {
				cfg.t.Fatalf("servers disagree on term")
			}
		}
	}
	return term
}

// check that there's no leader
func (cfg *config) checkNoLeader() {
	for i := 0; i < cfg.n; i++ {
		if cfg.connected[i] {
			_, is_leader := cfg.rafts[i].GetState()
			if is_leader {
				cfg.t.Fatalf("expected no leader, but %v claims to be leader", i)
			}
		}
	}
}

// how many servers think a log entry is committed?
func (cfg *config) nCommitted(index int) (int, interface{}) {
	count := 0
	cmd := -1
	for i := 0; i < len(cfg.rafts); i++ {
		if cfg.applyErr[i] != "" {
			cfg.t.Fatal(cfg.applyErr[i])
		}

		cfg.mu.Lock()
		cmd1, ok := cfg.logs[i][index]
		cfg.mu.Unlock()

		if ok {
			if count > 0 && cmd != cmd1 {
				cfg.t.Fatalf("committed values do not match: index %v, %v, %v\n",
					index, cmd, cmd1)
			}
			count += 1
			cmd = cmd1
		}
	}
	return count, cmd
}

// wait for at least n servers to commit.
// but don't wait forever.
func (cfg *config) wait(index int, n int, startTerm int) interface{} {
	to := 10 * time.Millisecond
	for iters := 0; iters < 30; iters++ {
		nd, _ := cfg.nCommitted(index)
		if nd >= n {
			break
		}
		time.Sleep(to)
		if to < time.Second {
			to *= 2
		}
		if startTerm > -1 {
			for _, r := range cfg.rafts {
				if t, _ := r.GetState(); t > startTerm {
					// someone has moved on
					// can no longer guarantee that we'll "win"
					return -1
				}
			}
		}
	}
	nd, cmd := cfg.nCommitted(index)
	if nd < n {
		cfg.t.Fatalf("only %d decided for index %d; wanted %d\n",
			nd, index, n)
	}
	return cmd
}

// do a complete agreement.
// it might choose the wrong leader initially,
// and have to re-submit after giving up.
// entirely gives up after about 10 seconds.
// indirectly checks that the servers agree on the
// same value, since nCommitted() checks this,
// as do the threads that read from applyCh.
// returns index.
func (cfg *config) one(cmd int, expectedServers int) int {
	t0 := time.Now()
	starts := 0
	for time.Since(t0).Seconds() < 10 {
		// try all the servers, maybe one is the leader.
		index := -1
		for si := 0; si < cfg.n; si++ {
			starts = (starts + 1) % cfg.n
			var rf *Raft
			cfg.mu.Lock()
			if cfg.connected[starts] {
				rf = cfg.rafts[starts]
			}
			cfg.mu.Unlock()
			if rf != nil {
				index1, _, ok := rf.Start(cmd)
				if ok {
					index = index1
					break
				}
			}
		}

		if index != -1 {
			// somebody claimed to be the leader and to have
			// submitted our command; wait a while for agreement.
			t1 := time.Now()
			for time.Since(t1).Seconds() < 2 {
				nd, cmd1 := cfg.nCommitted(index)
				if nd > 0 && nd >= expectedServers {
					// committed
					if cmd2, ok := cmd1.(int); ok && cmd2 == cmd {
						// and it was the command we submitted.
						return index
					}
				}
				time.Sleep(20 * time.Millisecond)
			}
		} else {
			time.Sleep(50 * time.Millisecond)
		}
	}
	cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
	return -1
}
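The harness above is what the grading tests drive: make_config builds a labrpc network of n Raft peers, one() submits a command and waits until the expected number of servers have committed it, and cleanup() kills everything. As a hedged sketch only (the test name and command values below are illustrative and not part of this commit), a custom test against this harness could look like:

package raft

import "testing"

// Hypothetical extra test, just to show how the config harness is driven.
func TestThreeCommandsSketch(t *testing.T) {
	servers := 3
	cfg := make_config(t, servers, false) // reliable network
	defer cfg.cleanup()

	cfg.checkOneLeader() // wait until an initial leader exists

	// submit three commands and require every server to commit each one
	for i := 1; i <= 3; i++ {
		index := cfg.one(1000+i, servers)
		if index != i {
			t.Fatalf("command %d landed at index %d, expected %d", i, index, i)
		}
	}
}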
src/raft/persister.go (new file, 61 lines)
@@ -0,0 +1,61 @@
package raft

//
// support for Raft and kvraft to save persistent
// Raft state (log &c) and k/v server snapshots.
//
// we will use the original persister.go to test your code for grading.
// so, while you can modify this code to help you debug, please
// test with the original before submitting.
//

import "sync"

type Persister struct {
	mu        sync.Mutex
	raftstate []byte
	snapshot  []byte
}

func MakePersister() *Persister {
	return &Persister{}
}

func (ps *Persister) Copy() *Persister {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	np := MakePersister()
	np.raftstate = ps.raftstate
	np.snapshot = ps.snapshot
	return np
}

func (ps *Persister) SaveRaftState(data []byte) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.raftstate = data
}

func (ps *Persister) ReadRaftState() []byte {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	return ps.raftstate
}

func (ps *Persister) RaftStateSize() int {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	return len(ps.raftstate)
}

func (ps *Persister) SaveSnapshot(snapshot []byte) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.snapshot = snapshot
}

func (ps *Persister) ReadSnapshot() []byte {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	return ps.snapshot
}
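Persister is an in-memory stand-in for stable storage: SaveRaftState/ReadRaftState store and return an opaque byte slice, and Copy hands a crashed instance's bytes to its replacement (see crash1/start1 in config.go). A small sketch of the round trip a peer performs with it, mirroring persist()/readPersist() in raft.go; the function name and field values are illustrative only:

package raft

import (
	"bytes"
	"encoding/gob"
)

// Sketch: encode a couple of fields, save them, then read them back.
func examplePersistRoundTrip() (int, int) {
	ps := MakePersister()

	w := new(bytes.Buffer)
	e := gob.NewEncoder(w)
	e.Encode(7) // e.g. currentTerm
	e.Encode(2) // e.g. votedFor
	ps.SaveRaftState(w.Bytes())

	var term, votedFor int
	d := gob.NewDecoder(bytes.NewBuffer(ps.ReadRaftState()))
	d.Decode(&term)
	d.Decode(&votedFor)
	return term, votedFor // 7, 2
}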
src/raft/raft.go (new file, 564 lines)
@@ -0,0 +1,564 @@
package raft

//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
//   create a new Raft server.
// rf.Start(command interface{}) (index, term, isleader)
//   start agreement on a new log entry
// rf.GetState() (term, isLeader)
//   ask a Raft for its current term, and whether it thinks it is leader
// ApplyMsg
//   each time a new entry is committed to the log, each Raft peer
//   should send an ApplyMsg to the service (or tester)
//   in the same server.
//

import "sync"
import (
	"labrpc"
	"fmt"
	"time"
	"bytes"
	"encoding/gob"
	"math/rand"
	"sync/atomic"
)

// import "bytes"
// import "encoding/gob"

//
// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make().
//
type ApplyMsg struct {
	Index       int
	Command     interface{}
	UseSnapshot bool   // ignore for lab2; only used in lab3
	Snapshot    []byte // ignore for lab2; only used in lab3
}

//
// A Go object implementing a single Raft peer.
//
type Raft struct {
	mu        sync.Mutex
	peers     []*labrpc.ClientEnd
	persister *Persister
	me        int // index into peers[]

	applyCh chan ApplyMsg

	// Your data here.
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.
	currentTerm int
	votedFor    int
	logs        []map[string]interface{}

	/**/
	commitIndex int
	lastApplied int

	/**/
	nextIndex  []int
	matchIndex []int

	role int // 0-follower, 1-candidate, 2-leader

	electionTimeout  time.Duration
	heartBeatTimeout time.Duration

	heartBeatTicker *time.Ticker

	lastHeart time.Time

	running bool

	counter int64

	showLog bool
}

// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
	if rf.showLog {
		fmt.Println(rf.me, "says: current state, {", rf.currentTerm, ",", rf.votedFor, ",", rf.logs, "}")
	}
	return rf.currentTerm, rf.role == 2
}

func (rf *Raft) GetState2() (bool, []map[string]interface{}, int) {
	if rf.showLog {
		fmt.Println(rf.me, "says: current state, {", rf.currentTerm, ",", rf.votedFor, ",", rf.logs, "}")
	}
	return rf.role == 2, rf.logs, rf.commitIndex
}

//
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
//
func (rf *Raft) persist() {
	w := new(bytes.Buffer)
	e := gob.NewEncoder(w)
	e.Encode(rf.currentTerm)
	e.Encode(rf.votedFor)
	e.Encode(rf.logs)
	data := w.Bytes()
	rf.persister.SaveRaftState(data)
}

//
// restore previously persisted state.
//
func (rf *Raft) readPersist(data []byte) {
	// Your code here.
	r := bytes.NewBuffer(data)
	d := gob.NewDecoder(r)
	d.Decode(&rf.currentTerm)
	d.Decode(&rf.votedFor)
	d.Decode(&rf.logs)
}

//
// example RequestVote RPC arguments structure.
//
type RequestVoteArgs struct {
	// Your data here.
	Term         int
	CandidateId  int
	LastLogIndex int
	LastLogTerm  int
}

//
// example RequestVote RPC reply structure.
//
type RequestVoteReply struct {
	// Your data here.
	Term        int
	VoteGranted bool
}

type AppendEntriesArgs struct {
	Term         int
	LeaderId     int
	PrevLogIndex int
	PrevLogTerm  int
	Entries      []map[string]interface{}
	LeaderCommit int
}

type AppendEntriesReply struct {
	Term    int
	Success bool
	Len     int
}

//
// example RequestVote RPC handler.
//
func (rf *Raft) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if args.Term < rf.currentTerm {
		reply.VoteGranted = false
		if rf.showLog {
			fmt.Println(rf.me, "tells", args.CandidateId, ": you are too late, highest term is", rf.currentTerm)
		}
	} else {
		/* whatever happens, if a higher term is seen, switch to follower */
		if args.Term > rf.currentTerm {
			rf.role = 0
			rf.votedFor = -1
			rf.currentTerm = args.Term
			if rf.showLog {
				fmt.Println(rf.me, "says: higher term detected, term=", args.Term)
			}
		}

		/* check log first */
		lastLogTerm := 0
		if len(rf.logs) > 0 {
			lastLogTerm = rf.logs[len(rf.logs)-1]["term"].(int)
		}
		if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && (lastLogTerm < args.LastLogTerm || (lastLogTerm == args.LastLogTerm && len(rf.logs) <= args.LastLogIndex)) {
			reply.VoteGranted = true
			rf.role = 0
			rf.votedFor = args.CandidateId
			rf.lastHeart = time.Now()
			if rf.showLog {
				fmt.Println(rf.me, "tells", args.CandidateId, ": vote granted")
			}
		} else {
			reply.VoteGranted = false
			if rf.showLog {
				fmt.Println(rf.me, "tells", args.CandidateId, ": vote rejected")
			}
		}
	}
	rf.persist()
	reply.Term = rf.currentTerm
}

//
// example code to send a RequestVote RPC to a server.
// server is the index of the target server in rf.peers[].
// expects RPC arguments in args.
// fills in *reply with RPC reply, so caller should
// pass &reply.
// the types of the args and reply passed to Call() must be
// the same as the types of the arguments declared in the
// handler function (including whether they are pointers).
//
// returns true if labrpc says the RPC was delivered.
//
// if you're having trouble getting RPC to work, check that you've
// capitalized all field names in structs passed over RPC, and
// that the caller passes the address of the reply struct with &, not
// the struct itself.
//
func (rf *Raft) sendRequestVote(server int, args RequestVoteArgs, reply *RequestVoteReply) bool {
	if rf.showLog {
		fmt.Println(rf.me, "tells", server, ": vote me, ", args)
	}
	ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
	if ok {
		if reply.Term > rf.currentTerm {
			rf.mu.Lock()
			rf.currentTerm = reply.Term
			rf.role = 0
			rf.votedFor = -1
			rf.persist()
			rf.mu.Unlock()
		}
	}
	return ok && reply.VoteGranted
}

func (rf *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	if args.Term < rf.currentTerm {
		reply.Success = false
		if rf.showLog {
			fmt.Println(rf.me, "tells", args.LeaderId, ": you are too late")
		}
	} else {
		reply.Success = true
		rf.currentTerm = args.Term
		rf.role = 0
		rf.votedFor = args.LeaderId

		if args.PrevLogIndex > 0 {
			if len(rf.logs) >= args.PrevLogIndex && rf.logs[args.PrevLogIndex-1]["term"] == args.PrevLogTerm {
				reply.Success = true
			} else {
				reply.Success = false
				reply.Len = len(rf.logs)
				rf.lastHeart = time.Now()
				//fmt.Println(rf.me, "tells", args.LeaderId, ": I missed something. Here is my logs,", rf.logs)
			}
		}
	}
	reply.Term = rf.currentTerm
	if reply.Success {
		rf.logs = rf.logs[0:args.PrevLogIndex]
		// sync logs && apply
		rf.logs = append(rf.logs, args.Entries...)
		for iter := rf.commitIndex; iter < args.LeaderCommit; iter++ {
			command := rf.logs[iter]["command"]
			if rf.showLog {
				fmt.Println(rf.me, "says: commit", command, "index=", iter+1)
			}
			rf.applyCh <- ApplyMsg{Index: iter + 1, Command: command, UseSnapshot: false}
		}
		rf.commitIndex = args.LeaderCommit
		//rf.lastApplied = len(rf.logs)
		rf.lastHeart = time.Now()
	}
	rf.persist()
	if rf.showLog {
		fmt.Println(rf.me, "tells", args.LeaderId, ": pong,", reply)
	}
}

func (rf *Raft) sendAppendEntries(server int, args AppendEntriesArgs, reply *AppendEntriesReply) bool {
	if rf.showLog {
		fmt.Println(rf.me, "tells", server, ": ping,", args)
	}
	ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
	if ok {
		if reply.Term > rf.currentTerm {
			rf.mu.Lock()
			rf.currentTerm = reply.Term
			rf.role = 0
			rf.votedFor = -1
			rf.persist()
			rf.mu.Unlock()
		}
		if ok && rf.role == 2 && !reply.Success {
			//if args.PrevLogIndex > 0 {
			rf.nextIndex[server] = args.PrevLogIndex
			if reply.Len < args.PrevLogIndex {
				rf.nextIndex[server] = reply.Len + 1
			}
			// use snapshot
			rf.nextIndex[server] = 1
			//}
			if rf.showLog {
				fmt.Println(rf.me, "says: decrease nextindex of", server, "to", rf.nextIndex[server])
			}
		}
	}
	return ok && reply.Success
}

//
// the service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
//
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	index := len(rf.logs) + 1
	if rf.role == 2 {
		log := map[string]interface{}{"command": command, "term": rf.currentTerm}
		rf.logs = append(rf.logs, log)
		rf.persist()
		if rf.showLog {
			fmt.Println(rf.me, "says:", "new command", command, "in term", rf.currentTerm)
		}
	} else {
		//fmt.Println(rf.me, "says: I am not a leader")
	}
	return index, rf.currentTerm, rf.role == 2
}

//
// the tester calls Kill() when a Raft instance won't
// be needed again. you are not required to do anything
// in Kill(), but it might be convenient to (for example)
// turn off debug output from this instance.
//
func (rf *Raft) Kill() {
	/* stop the world */
	rf.running = false
}

//
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
//
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {

	fmt.Println(me, "says:", "hello world!")

	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me

	rf.applyCh = applyCh
	rf.running = true

	rf.showLog = false

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	// Your initialization code here.
	rf.electionTimeout = time.Duration(rand.Intn(100)+100) * time.Millisecond
	rf.heartBeatTimeout = time.Duration(rand.Intn(50)+50) * time.Millisecond
	rf.counter = 0
	rf.lastHeart = time.Now()
	rf.heartBeatTicker = time.NewTicker(rf.heartBeatTimeout)

	rf.role = 0 // start in follower state

	rf.commitIndex = 0
	rf.lastApplied = 0

	go func() {
		for {
			if !rf.running {
				break
			}
			/* wait for the election timeout */
			time.Sleep(rf.electionTimeout - time.Since(rf.lastHeart))

			if rf.role != 2 && time.Since(rf.lastHeart) >= rf.electionTimeout {
				rf.mu.Lock()
				// re-generate a random election timeout
				rf.electionTimeout = time.Duration(rand.Intn(100)+100) * time.Millisecond
				rf.currentTerm++
				rf.votedFor = rf.me
				rf.role = 1
				rf.persist()
				rf.mu.Unlock()
				rf.doVote()
			}
			/* sleep at most electionTimeout duration */
			if time.Since(rf.lastHeart) >= rf.electionTimeout {
				rf.lastHeart = time.Now()
			}
		}
		fmt.Println(rf.me, "says: bye~")
	}()
	return rf
}

func (rf *Raft) doVote() {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	var agreed int64 = 1
	index := len(rf.logs)
	term := 0
	if index != 0 {
		term = rf.logs[index-1]["term"].(int)
	}
	for i := 0; i < len(rf.peers); i++ {
		if i != rf.me {
			go func(peer int, currTerm int, index int, term int) {
				args := RequestVoteArgs{Term: currTerm, CandidateId: rf.me, LastLogIndex: index, LastLogTerm: term}
				reply := RequestVoteReply{}
				ok := rf.sendRequestVote(peer, args, &reply)
				rf.mu.Lock()
				if ok && args.Term == rf.currentTerm && rf.role == 1 {
					atomic.AddInt64(&agreed, 1)
					if int(agreed)*2 > len(rf.peers) {
						rf.role = 2

						rf.nextIndex = rf.nextIndex[0:0]
						rf.matchIndex = rf.matchIndex[0:0]
						for i := 0; i < len(rf.peers); i++ {
							rf.nextIndex = append(rf.nextIndex, len(rf.logs)+1)
							rf.matchIndex = append(rf.matchIndex, 0)
						}
						/* persist state */
						rf.persist()
						go rf.doSubmit()
						if rf.showLog {
							fmt.Println(rf.me, "says:", "I am the leader in term", rf.currentTerm)
						}
					}
				}
				rf.mu.Unlock()
			}(i, rf.currentTerm, index, term)
		}
	}
}

func (rf *Raft) doSubmit() {
	/* ensure only one thread is running */
	if atomic.AddInt64(&rf.counter, 1) > 1 {
		atomic.AddInt64(&rf.counter, -1)
		return
	}
	for range rf.heartBeatTicker.C {
		if !rf.running {
			break
		}
		if rf.role != 2 {
			break
		}
		for i := 0; i < len(rf.peers); i++ {
			if i != rf.me {
				go func(peer int) {
					rf.mu.Lock()
					index := rf.nextIndex[peer]
					term := 0
					/* TODO: limit entries max size */
					entries := make([]map[string]interface{}, 0)
					if len(rf.logs) >= index {
						entries = append(entries, rf.logs[index-1:]...)
					}
					if index > 1 {
						//fmt.Println(index, rf.logs)
						term = rf.logs[index-2]["term"].(int)
					}
					args := AppendEntriesArgs{Term: rf.currentTerm, LeaderId: rf.me, PrevLogIndex: index - 1, PrevLogTerm: term, Entries: entries, LeaderCommit: rf.commitIndex}
					reply := AppendEntriesReply{}
					rf.nextIndex[peer] = args.PrevLogIndex + len(entries) + 1
					rf.mu.Unlock()

					ok := rf.sendAppendEntries(peer, args, &reply)

					rf.mu.Lock()
					if ok && args.Term == rf.currentTerm && rf.role == 2 {
						/* TODO: think about packet re-ordering, which can make previously replicated entries appear lost */
						rf.matchIndex[peer] = args.PrevLogIndex + len(entries)
						/* update commitIndex */
						for iter := rf.commitIndex; iter < len(rf.logs); iter++ {
							if rf.logs[iter]["term"].(int) < rf.currentTerm {
								continue
							}
							count := 1
							for j := 0; j < len(rf.peers); j++ {
								if j != rf.me {
									if rf.matchIndex[j] > iter {
										count++
									}
									//fmt.Println(j, rf.matchIndex[j], count)
								}
							}

							if count*2 > len(rf.peers) {
								for i := rf.commitIndex; i <= iter; i++ {
									rf.commitIndex = i + 1
									command := rf.logs[i]["command"]
									if rf.showLog {
										fmt.Println(rf.me, "says: ", command, "is committed, index=", i+1)
									}
									rf.applyCh <- ApplyMsg{Index: i + 1, Command: command, UseSnapshot: false}
								}
							} else { // commit in order
								break
							}
						}
					}
					rf.mu.Unlock()
				}(i)
			}
		}
	}
	if rf.showLog {
		fmt.Println(rf.me, "says: stop heart beat")
	}
	atomic.AddInt64(&rf.counter, -1)
}
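The comment block at the top of raft.go is the whole surface a service (or the tester) touches: Make creates a peer, Start proposes a command, GetState reports term and leadership, and committed entries arrive on applyCh as ApplyMsg values. A hedged sketch of that calling pattern follows; the function name exampleDrive is hypothetical, and in real use the peers slice and persister are wired up by the tester's config.go rather than by hand:

package raft

import "fmt"
import "labrpc"

// Sketch only: shows how a service would drive the Raft API in this commit.
func exampleDrive(peers []*labrpc.ClientEnd, me int) {
	applyCh := make(chan ApplyMsg)
	rf := Make(peers, me, MakePersister(), applyCh)

	// consume committed entries as they arrive
	go func() {
		for m := range applyCh {
			fmt.Println("applied", m.Index, m.Command)
		}
	}()

	// propose a command; only meaningful if this peer currently thinks it is leader
	if index, term, isLeader := rf.Start(42); isLeader {
		fmt.Println("proposed at index", index, "in term", term)
	}

	_, _ = rf.GetState() // current term and leadership, as the tester polls it
}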
src/raft/test_test.go (new file, 929 lines)
@@ -0,0 +1,929 @@
package raft

//
// Raft tests.
//
// we will use the original test_test.go to test your code for grading.
// so, while you can modify this code to help you debug, please
// test with the original before submitting.
//

import "testing"
import "fmt"
import "time"
import "math/rand"
import "sync/atomic"
import "sync"

// The tester generously allows solutions to complete elections in one second
// (much more than the paper's range of timeouts).
const RaftElectionTimeout = 1000 * time.Millisecond

func TestInitialElection(t *testing.T) {
	servers := 3
	cfg := make_config(t, servers, false)
	defer cfg.cleanup()

	fmt.Printf("Test: initial election ...\n")

	// is a leader elected?
	cfg.checkOneLeader()

	// does the leader+term stay the same if there is no failure?
	term1 := cfg.checkTerms()
	time.Sleep(2 * RaftElectionTimeout)
	term2 := cfg.checkTerms()
	if term1 != term2 {
		fmt.Printf("warning: term changed even though there were no failures")
	}

	fmt.Printf(" ... Passed\n")
}

func TestReElection(t *testing.T) {
	servers := 3
	cfg := make_config(t, servers, false)
	defer cfg.cleanup()

	fmt.Printf("Test: election after network failure ...\n")

	leader1 := cfg.checkOneLeader()

	// if the leader disconnects, a new one should be elected.
	cfg.disconnect(leader1)
	cfg.checkOneLeader()

	// if the old leader rejoins, that shouldn't
	// disturb the old leader.
	cfg.connect(leader1)
	leader2 := cfg.checkOneLeader()

	// if there's no quorum, no leader should
	// be elected.
	cfg.disconnect(leader2)
	cfg.disconnect((leader2 + 1) % servers)
	time.Sleep(2 * RaftElectionTimeout)
	cfg.checkNoLeader()

	// if a quorum arises, it should elect a leader.
	cfg.connect((leader2 + 1) % servers)
	cfg.checkOneLeader()

	// re-join of last node shouldn't prevent leader from existing.
	cfg.connect(leader2)
	cfg.checkOneLeader()

	fmt.Printf(" ... Passed\n")
}

func TestBasicAgree(t *testing.T) {
	servers := 5
	cfg := make_config(t, servers, false)
	defer cfg.cleanup()

	fmt.Printf("Test: basic agreement ...\n")

	iters := 3
	for index := 1; index < iters+1; index++ {
		nd, _ := cfg.nCommitted(index)
		if nd > 0 {
			t.Fatalf("some have committed before Start()")
		}

		xindex := cfg.one(index*100, servers)
		if xindex != index {
			t.Fatalf("got index %v but expected %v", xindex, index)
		}
	}

	fmt.Printf(" ... Passed\n")
}

func TestFailAgree(t *testing.T) {
	servers := 3
	cfg := make_config(t, servers, false)
	defer cfg.cleanup()

	fmt.Printf("Test: agreement despite follower failure ...\n")

	cfg.one(101, servers)

	// follower network failure
	leader := cfg.checkOneLeader()
	cfg.disconnect((leader + 1) % servers)

	// agree despite one failed server?
	cfg.one(102, servers-1)
	cfg.one(103, servers-1)
	time.Sleep(RaftElectionTimeout)
	cfg.one(104, servers-1)
	cfg.one(105, servers-1)

	// failed server re-connected
	cfg.connect((leader + 1) % servers)

	// agree with full set of servers?
	cfg.one(106, servers)
	time.Sleep(RaftElectionTimeout)
	cfg.one(107, servers)

	fmt.Printf(" ... Passed\n")
}

func TestFailNoAgree(t *testing.T) {
	servers := 5
	cfg := make_config(t, servers, false)
	defer cfg.cleanup()

	fmt.Printf("Test: no agreement if too many followers fail ...\n")

	cfg.one(10, servers)

	// 3 of 5 followers disconnect
	leader := cfg.checkOneLeader()
	cfg.disconnect((leader + 1) % servers)
	cfg.disconnect((leader + 2) % servers)
	cfg.disconnect((leader + 3) % servers)

	index, _, ok := cfg.rafts[leader].Start(20)
	if ok != true {
		t.Fatalf("leader rejected Start()")
	}
	if index != 2 {
		t.Fatalf("expected index 2, got %v", index)
	}

	time.Sleep(2 * RaftElectionTimeout)

	n, _ := cfg.nCommitted(index)
	if n > 0 {
		t.Fatalf("%v committed but no majority", n)
	}

	// repair failures
	cfg.connect((leader + 1) % servers)
	cfg.connect((leader + 2) % servers)
	cfg.connect((leader + 3) % servers)

	// the disconnected majority may have chosen a leader from
	// among their own ranks, forgetting index 2.
	// or perhaps
	leader2 := cfg.checkOneLeader()
	index2, _, ok2 := cfg.rafts[leader2].Start(30)
	if ok2 == false {
		t.Fatalf("leader2 rejected Start()")
	}
	if index2 < 2 || index2 > 3 {
		t.Fatalf("unexpected index %v", index2)
	}

	cfg.one(1000, servers)

	fmt.Printf(" ... Passed\n")
}

func TestConcurrentStarts(t *testing.T) {
	servers := 3
	cfg := make_config(t, servers, false)
	defer cfg.cleanup()

	fmt.Printf("Test: concurrent Start()s ...\n")

	var success bool
loop:
	for try := 0; try < 5; try++ {
		if try > 0 {
			// give solution some time to settle
			time.Sleep(3 * time.Second)
		}

		leader := cfg.checkOneLeader()
		_, term, ok := cfg.rafts[leader].Start(1)
		if !ok {
			// leader moved on really quickly
			continue
		}

		iters := 5
		var wg sync.WaitGroup
		is := make(chan int, iters)
		for ii := 0; ii < iters; ii++ {
			wg.Add(1)
			go func(i int) {
				defer wg.Done()
				i, term1, ok := cfg.rafts[leader].Start(100 + i)
				if term1 != term {
					return
				}
				if ok != true {
					return
				}
				is <- i
			}(ii)
		}

		wg.Wait()
		close(is)

		for j := 0; j < servers; j++ {
			if t, _ := cfg.rafts[j].GetState(); t != term {
				// term changed -- can't expect low RPC counts
				continue loop
			}
		}

		failed := false
		cmds := []int{}
		for index := range is {
			cmd := cfg.wait(index, servers, term)
			if ix, ok := cmd.(int); ok {
				if ix == -1 {
					// peers have moved on to later terms
					// so we can't expect all Start()s to
					// have succeeded
					failed = true
					break
				}
				cmds = append(cmds, ix)
			} else {
				t.Fatalf("value %v is not an int", cmd)
			}
		}

		if failed {
			// avoid leaking goroutines
			go func() {
				for range is {
				}
			}()
			continue
		}

		for ii := 0; ii < iters; ii++ {
			x := 100 + ii
			ok := false
			for j := 0; j < len(cmds); j++ {
				if cmds[j] == x {
					ok = true
				}
			}
			if ok == false {
				t.Fatalf("cmd %v missing in %v", x, cmds)
			}
		}

		success = true
		break
	}

	if !success {
		t.Fatalf("term changed too often")
	}

	fmt.Printf(" ... Passed\n")
}

func TestRejoin(t *testing.T) {
	servers := 3
	cfg := make_config(t, servers, false)
	defer cfg.cleanup()

	fmt.Printf("Test: rejoin of partitioned leader ...\n")

	cfg.one(101, servers)

	// leader network failure
	leader1 := cfg.checkOneLeader()
	cfg.disconnect(leader1)

	// make old leader try to agree on some entries
	cfg.rafts[leader1].Start(102)
	cfg.rafts[leader1].Start(103)
	cfg.rafts[leader1].Start(104)

	// new leader commits, also for index=2
	cfg.one(103, 2)

	// new leader network failure
	leader2 := cfg.checkOneLeader()
	cfg.disconnect(leader2)

	// old leader connected again
	cfg.connect(leader1)

	cfg.one(104, 2)

	// all together now
	cfg.connect(leader2)

	cfg.one(105, servers)

	fmt.Printf(" ... Passed\n")
}

func TestBackup(t *testing.T) {
	servers := 5
	cfg := make_config(t, servers, false)
	defer cfg.cleanup()

	fmt.Printf("Test: leader backs up quickly over incorrect follower logs ...\n")

	cfg.one(rand.Int(), servers)

	// put leader and one follower in a partition
	leader1 := cfg.checkOneLeader()
	cfg.disconnect((leader1 + 2) % servers)
	cfg.disconnect((leader1 + 3) % servers)
	cfg.disconnect((leader1 + 4) % servers)

	// submit lots of commands that won't commit
	for i := 0; i < 50; i++ {
		cfg.rafts[leader1].Start(rand.Int())
	}

	time.Sleep(RaftElectionTimeout / 2)

	cfg.disconnect((leader1 + 0) % servers)
	cfg.disconnect((leader1 + 1) % servers)

	// allow other partition to recover
	cfg.connect((leader1 + 2) % servers)
	cfg.connect((leader1 + 3) % servers)
	cfg.connect((leader1 + 4) % servers)

	// lots of successful commands to new group.
	for i := 0; i < 50; i++ {
		cfg.one(rand.Int(), 3)
	}

	// now another partitioned leader and one follower
	leader2 := cfg.checkOneLeader()
	other := (leader1 + 2) % servers
	if leader2 == other {
		other = (leader2 + 1) % servers
	}
	cfg.disconnect(other)

	// lots more commands that won't commit
	for i := 0; i < 50; i++ {
		cfg.rafts[leader2].Start(rand.Int())
	}

	time.Sleep(RaftElectionTimeout / 2)

	// bring original leader back to life,
	for i := 0; i < servers; i++ {
		cfg.disconnect(i)
	}
	cfg.connect((leader1 + 0) % servers)
	cfg.connect((leader1 + 1) % servers)
	cfg.connect(other)

	// lots of successful commands to new group.
	for i := 0; i < 50; i++ {
		cfg.one(rand.Int(), 3)
	}

	// now everyone
	for i := 0; i < servers; i++ {
		cfg.connect(i)
	}
	cfg.one(rand.Int(), servers)

	fmt.Printf(" ... Passed\n")
}

func TestCount(t *testing.T) {
	servers := 3
	cfg := make_config(t, servers, false)
	defer cfg.cleanup()

	fmt.Printf("Test: RPC counts aren't too high ...\n")

	rpcs := func() (n int) {
		for j := 0; j < servers; j++ {
			n += cfg.rpcCount(j)
		}
		return
	}

	leader := cfg.checkOneLeader()

	total1 := rpcs()

	if total1 > 30 || total1 < 1 {
		t.Fatalf("too many or few RPCs (%v) to elect initial leader\n", total1)
	}

	var total2 int
	var success bool
loop:
	for try := 0; try < 5; try++ {
		if try > 0 {
			// give solution some time to settle
			time.Sleep(3 * time.Second)
		}

		leader = cfg.checkOneLeader()
		total1 = rpcs()

		iters := 10
		starti, term, ok := cfg.rafts[leader].Start(1)
		if !ok {
			// leader moved on really quickly
			continue
		}
		cmds := []int{}
		for i := 1; i < iters+2; i++ {
			x := int(rand.Int31())
			cmds = append(cmds, x)
			index1, term1, ok := cfg.rafts[leader].Start(x)
			if term1 != term {
				// Term changed while starting
				continue loop
			}
			if !ok {
				// No longer the leader, so term has changed
				continue loop
			}
			if starti+i != index1 {
				t.Fatalf("Start() failed")
			}
		}

		for i := 1; i < iters+1; i++ {
			cmd := cfg.wait(starti+i, servers, term)
			if ix, ok := cmd.(int); ok == false || ix != cmds[i-1] {
				if ix == -1 {
					// term changed -- try again
					continue loop
				}
				t.Fatalf("wrong value %v committed for index %v; expected %v\n", cmd, starti+i, cmds)
			}
		}

		failed := false
		total2 = 0
		for j := 0; j < servers; j++ {
			if t, _ := cfg.rafts[j].GetState(); t != term {
				// term changed -- can't expect low RPC counts
				// need to keep going to update total2
				failed = true
			}
			total2 += cfg.rpcCount(j)
		}

		if failed {
			continue loop
		}

		if total2-total1 > (iters+1+3)*3 {
			t.Fatalf("too many RPCs (%v) for %v entries\n", total2-total1, iters)
		}

		success = true
		break
	}

	if !success {
		t.Fatalf("term changed too often")
	}

	time.Sleep(RaftElectionTimeout)

	total3 := 0
	for j := 0; j < servers; j++ {
		total3 += cfg.rpcCount(j)
	}

	if total3-total2 > 3*20 {
		t.Fatalf("too many RPCs (%v) for 1 second of idleness\n", total3-total2)
	}

	fmt.Printf(" ... Passed\n")
}

func TestPersist1(t *testing.T) {
	servers := 3
	cfg := make_config(t, servers, false)
	defer cfg.cleanup()

	fmt.Printf("Test: basic persistence ...\n")

	cfg.one(11, servers)

	// crash and re-start all
	for i := 0; i < servers; i++ {
		cfg.start1(i)
	}
	for i := 0; i < servers; i++ {
		cfg.disconnect(i)
		cfg.connect(i)
	}

	cfg.one(12, servers)

	leader1 := cfg.checkOneLeader()
	cfg.disconnect(leader1)
	cfg.start1(leader1)
	cfg.connect(leader1)

	cfg.one(13, servers)

	leader2 := cfg.checkOneLeader()
	cfg.disconnect(leader2)
	cfg.one(14, servers-1)
	cfg.start1(leader2)
	cfg.connect(leader2)

	cfg.wait(4, servers, -1) // wait for leader2 to join before killing i3

	i3 := (cfg.checkOneLeader() + 1) % servers
	cfg.disconnect(i3)
	cfg.one(15, servers-1)
	cfg.start1(i3)
	cfg.connect(i3)

	cfg.one(16, servers)

	fmt.Printf(" ... Passed\n")
}

func TestPersist2(t *testing.T) {
	servers := 5
	cfg := make_config(t, servers, false)
	defer cfg.cleanup()

	fmt.Printf("Test: more persistence ...\n")

	index := 1
	for iters := 0; iters < 5; iters++ {
		cfg.one(10+index, servers)
		index++

		leader1 := cfg.checkOneLeader()

		cfg.disconnect((leader1 + 1) % servers)
		cfg.disconnect((leader1 + 2) % servers)

		cfg.one(10+index, servers-2)
		index++

		cfg.disconnect((leader1 + 0) % servers)
		cfg.disconnect((leader1 + 3) % servers)
		cfg.disconnect((leader1 + 4) % servers)

		cfg.start1((leader1 + 1) % servers)
		cfg.start1((leader1 + 2) % servers)
		cfg.connect((leader1 + 1) % servers)
		cfg.connect((leader1 + 2) % servers)

		time.Sleep(RaftElectionTimeout)

		cfg.start1((leader1 + 3) % servers)
		cfg.connect((leader1 + 3) % servers)

		cfg.one(10+index, servers-2)
		index++

		cfg.connect((leader1 + 4) % servers)
		cfg.connect((leader1 + 0) % servers)
	}

	cfg.one(1000, servers)

	fmt.Printf(" ... Passed\n")
}

func TestPersist3(t *testing.T) {
	servers := 3
	cfg := make_config(t, servers, false)
	defer cfg.cleanup()

	fmt.Printf("Test: partitioned leader and one follower crash, leader restarts ...\n")

	cfg.one(101, 3)

	leader := cfg.checkOneLeader()
	cfg.disconnect((leader + 2) % servers)

	cfg.one(102, 2)

	cfg.crash1((leader + 0) % servers)
	cfg.crash1((leader + 1) % servers)
	cfg.connect((leader + 2) % servers)
	cfg.start1((leader + 0) % servers)
	cfg.connect((leader + 0) % servers)

	cfg.one(103, 2)

	cfg.start1((leader + 1) % servers)
	cfg.connect((leader + 1) % servers)

	cfg.one(104, servers)

	fmt.Printf(" ... Passed\n")
}

//
// Test the scenarios described in Figure 8 of the extended Raft paper. Each
// iteration asks a leader, if there is one, to insert a command in the Raft
// log. If there is a leader, that leader will fail quickly with a high
// probability (perhaps without committing the command), or crash after a while
// with low probability (most likely committing the command). If the number of
// alive servers isn't enough to form a majority, perhaps start a new server.
// The leader in a new term may try to finish replicating log entries that
// haven't been committed yet.
//
func TestFigure8(t *testing.T) {
	servers := 5
	cfg := make_config(t, servers, false)
	defer cfg.cleanup()

	fmt.Printf("Test: Figure 8 ...\n")

	cfg.one(rand.Int(), 1)

	nup := servers
	for iters := 0; iters < 1000; iters++ {
		leader := -1
		for i := 0; i < servers; i++ {
			if cfg.rafts[i] != nil {
				_, _, ok := cfg.rafts[i].Start(rand.Int())
				if ok {
					leader = i
				}
			}
		}

		if (rand.Int() % 1000) < 100 {
			ms := rand.Int63() % (int64(RaftElectionTimeout/time.Millisecond) / 2)
			time.Sleep(time.Duration(ms) * time.Millisecond)
		} else {
			ms := (rand.Int63() % 13)
			time.Sleep(time.Duration(ms) * time.Millisecond)
		}

		if leader != -1 {
			cfg.crash1(leader)
			nup -= 1
		}

		if nup < 3 {
			s := rand.Int() % servers
			if cfg.rafts[s] == nil {
				cfg.start1(s)
				cfg.connect(s)
				nup += 1
			}
		}
	}

	for i := 0; i < servers; i++ {
		if cfg.rafts[i] == nil {
			cfg.start1(i)
			cfg.connect(i)
		}
	}

	cfg.one(rand.Int(), servers)

	fmt.Printf(" ... Passed\n")
}

func TestUnreliableAgree(t *testing.T) {
	servers := 5
	cfg := make_config(t, servers, true)
	defer cfg.cleanup()

	fmt.Printf("Test: unreliable agreement ...\n")

	var wg sync.WaitGroup

	for iters := 1; iters < 50; iters++ {
		for j := 0; j < 4; j++ {
			wg.Add(1)
			go func(iters, j int) {
				defer wg.Done()
				cfg.one((100*iters)+j, 1)
			}(iters, j)
		}
		cfg.one(iters, 1)
	}

	cfg.setunreliable(false)

	wg.Wait()

	cfg.one(100, servers)

	fmt.Printf(" ... Passed\n")
}

func TestFigure8Unreliable(t *testing.T) {
	servers := 5
	cfg := make_config(t, servers, true)
	defer cfg.cleanup()

	fmt.Printf("Test: Figure 8 (unreliable) ...\n")

	cfg.one(rand.Int()%10000, 1)

	nup := servers
	for iters := 0; iters < 1000; iters++ {
		if iters == 200 {
			cfg.setlongreordering(true)
		}
		leader := -1
		for i := 0; i < servers; i++ {
			_, _, ok := cfg.rafts[i].Start(rand.Int() % 10000)
			if ok && cfg.connected[i] {
				leader = i
			}
		}

		if (rand.Int() % 1000) < 100 {
			ms := rand.Int63() % (int64(RaftElectionTimeout/time.Millisecond) / 2)
			time.Sleep(time.Duration(ms) * time.Millisecond)
		} else {
			ms := (rand.Int63() % 13)
			time.Sleep(time.Duration(ms) * time.Millisecond)
		}

		if leader != -1 && (rand.Int()%1000) < int(RaftElectionTimeout/time.Millisecond)/2 {
			cfg.disconnect(leader)
			nup -= 1
		}

		if nup < 3 {
			s := rand.Int() % servers
			if cfg.connected[s] == false {
				cfg.connect(s)
				nup += 1
			}
		}
	}

	for i := 0; i < servers; i++ {
		if cfg.connected[i] == false {
			cfg.connect(i)
		}
	}

	cfg.one(rand.Int()%10000, servers)

	fmt.Printf(" ... Passed\n")
}

func internalChurn(t *testing.T, unreliable bool) {

	if unreliable {
		fmt.Printf("Test: unreliable churn ...\n")
	} else {
		fmt.Printf("Test: churn ...\n")
	}

	servers := 5
	cfg := make_config(t, servers, unreliable)
	defer cfg.cleanup()

	stop := int32(0)

	// create concurrent clients
	cfn := func(me int, ch chan []int) {
		var ret []int
		ret = nil
		defer func() { ch <- ret }()
		values := []int{}
		for atomic.LoadInt32(&stop) == 0 {
			x := rand.Int()
			index := -1
			ok := false
			for i := 0; i < servers; i++ {
				// try them all, maybe one of them is a leader
				cfg.mu.Lock()
				rf := cfg.rafts[i]
				cfg.mu.Unlock()
				if rf != nil {
					index1, _, ok1 := rf.Start(x)
					if ok1 {
						ok = ok1
						index = index1
					}
				}
			}
			if ok {
				// maybe leader will commit our value, maybe not.
				// but don't wait forever.
				for _, to := range []int{10, 20, 50, 100, 200} {
					nd, cmd := cfg.nCommitted(index)
					if nd > 0 {
						if xx, ok := cmd.(int); ok {
							if xx == x {
								values = append(values, x)
							}
						} else {
							cfg.t.Fatalf("wrong command type")
						}
						break
					}
					time.Sleep(time.Duration(to) * time.Millisecond)
				}
			} else {
				time.Sleep(time.Duration(79+me*17) * time.Millisecond)
			}
		}
		ret = values
	}

	ncli := 3
	cha := []chan []int{}
	for i := 0; i < ncli; i++ {
		cha = append(cha, make(chan []int))
		go cfn(i, cha[i])
	}

	for iters := 0; iters < 20; iters++ {
		if (rand.Int() % 1000) < 200 {
			i := rand.Int() % servers
			cfg.disconnect(i)
		}

		if (rand.Int() % 1000) < 500 {
			i := rand.Int() % servers
			if cfg.rafts[i] == nil {
				cfg.start1(i)
			}
			cfg.connect(i)
		}

		if (rand.Int() % 1000) < 200 {
			i := rand.Int() % servers
			if cfg.rafts[i] != nil {
				cfg.crash1(i)
			}
		}

		// Make crash/restart infrequent enough that the peers can often
		// keep up, but not so infrequent that everything has settled
		// down from one change to the next. Pick a value smaller than
		// the election timeout, but not hugely smaller.
		time.Sleep((RaftElectionTimeout * 7) / 10)
	}

	time.Sleep(RaftElectionTimeout)
	cfg.setunreliable(false)
	for i := 0; i < servers; i++ {
		if cfg.rafts[i] == nil {
			cfg.start1(i)
		}
		cfg.connect(i)
	}

	atomic.StoreInt32(&stop, 1)

	values := []int{}
	for i := 0; i < ncli; i++ {
		vv := <-cha[i]
		if vv == nil {
			t.Fatal("client failed")
		}
		values = append(values, vv...)
	}

	time.Sleep(RaftElectionTimeout)

	lastIndex := cfg.one(rand.Int(), servers)

	really := make([]int, lastIndex+1)
	for index := 1; index <= lastIndex; index++ {
		v := cfg.wait(index, servers, -1)
		if vi, ok := v.(int); ok {
			really = append(really, vi)
		} else {
			t.Fatalf("not an int")
		}
	}

	for _, v1 := range values {
		ok := false
		for _, v2 := range really {
			if v1 == v2 {
				ok = true
			}
		}
		if ok == false {
			cfg.t.Fatalf("didn't find a value")
		}
	}

	fmt.Printf(" ... Passed\n")
}

func TestReliableChurn(t *testing.T) {
	internalChurn(t, false)
}

func TestUnreliableChurn(t *testing.T) {
	internalChurn(t, true)
}
src/raft/util.go (new file, 13 lines)
@@ -0,0 +1,13 @@
package raft

import "log"

// Debugging
const Debug = 0

func DPrintf(format string, a ...interface{}) (n int, err error) {
	if Debug > 0 {
		log.Printf(format, a...)
	}
	return
}
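DPrintf is a no-op unless Debug is raised above 0; the raft.go in this commit uses its own showLog flag instead, but the helper is available for the same purpose. A tiny hedged sketch of a call site (the function name exampleDebugLine is hypothetical):

package raft

// Sketch: DPrintf call sites can stay in the code permanently; with Debug = 0
// they compile to a cheap branch and print nothing.
func exampleDebugLine(rf *Raft) {
	DPrintf("server %d: term=%d role=%d\n", rf.me, rf.currentTerm, rf.role)
}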