raft/src/diskv/server.go
2018-04-10 20:53:13 +08:00

275 lines
6.3 KiB
Go

package diskv
import "net"
import "fmt"
import "net/rpc"
import "log"
import "time"
import "paxos"
import "sync"
import "sync/atomic"
import "os"
import "syscall"
import "encoding/gob"
import "encoding/base32"
import "math/rand"
import "shardmaster"
import "io/ioutil"
import "strconv"
// Debug gates diagnostic logging: DPrintf only produces output when it is
// greater than zero.
const Debug = 0

// DPrintf forwards format and a to log.Printf when Debug is positive and
// does nothing otherwise. The named results are never assigned, so the
// function always returns (0, nil).
func DPrintf(format string, a ...interface{}) (n int, err error) {
	if Debug <= 0 {
		return n, err
	}
	log.Printf(format, a...)
	return n, err
}
// Op is the type that StartServer registers with encoding/gob
// (gob.Register(Op{})). In this skeleton it is an empty placeholder whose
// fields are still to be defined.
// NOTE(review): presumably Op values are what gets agreed on through Paxos —
// confirm once the fields are added.
type Op struct {
// Your definitions here.
}
// DisKV holds the state of one replica server of a replica group; a value is
// created, initialized and returned by StartServer.
type DisKV struct {
// mu is not referenced by any code visible in this file; presumably it is
// meant to guard the server state added under "Your definitions" — TODO confirm.
mu sync.Mutex
// l is the listener opened in StartServer; the accept loop reads from it
// and kill() closes it.
l net.Listener
// me is this server's index in the servers[] slice given to StartServer.
me int
dead int32 // for testing; set to 1 by kill() and read by isdead(), both via sync/atomic
unreliable int32 // for testing; toggled by Setunreliable() and read by isunreliable()
// sm is the shardmaster clerk built from the shardmasters list in StartServer.
sm *shardmaster.Clerk
// px is the Paxos peer created in StartServer with paxos.Make.
px *paxos.Paxos
dir string // each replica has its own data directory
gid int64 // my replica group ID
// Your definitions here.
}
//
// These are handy functions that might be useful
// for reading and writing key/value files, and
// for reading and writing entire shards.
// The key files for each shard are kept in a separate
// directory.
//

// shardDir returns the directory that holds the key files of the given
// shard, kv.dir + "/shard-<shard>/", creating it first if necessary. The
// returned path always ends in "/".
//
// The directory is created with os.MkdirAll, which succeeds when the
// directory already exists. That removes the check-then-create window of a
// separate Stat + Mkdir (a second caller creating the directory in between
// would have made Mkdir fail), and it also creates kv.dir itself when it is
// missing. If the directory cannot be created the process is terminated with
// log.Fatalf.
func (kv *DisKV) shardDir(shard int) string {
	d := kv.dir + "/shard-" + strconv.Itoa(shard) + "/"
	if err := os.MkdirAll(d, 0777); err != nil {
		log.Fatalf("Mkdir(%v): %v", d, err)
	}
	return d
}
// encodeKey converts a key into a string that is safe to use as a file name.
// Keys cannot be used in file names directly, since they might contain
// troublesome characters like /. The key is base32-encoded rather than
// base64-encoded because Mac has case-insensitive file names.
func (kv *DisKV) encodeKey(key string) string {
	raw := []byte(key)
	name := base32.StdEncoding.EncodeToString(raw)
	return name
}
// decodeKey reverses encodeKey: it base32-decodes filename back into the
// original key. When filename is not valid base32 the decoder's error is
// returned together with whatever bytes were decoded before the failure.
func (kv *DisKV) decodeKey(filename string) (string, error) {
	raw, err := base32.StdEncoding.DecodeString(filename)
	if err != nil {
		return string(raw), err
	}
	return string(raw), nil
}
// fileGet returns the content of the file that stores key inside the given
// shard's directory. The file is named "key-" followed by the encoded key.
// Any error from reading the file (for example, the key has no file) is
// returned unchanged alongside whatever content was read.
func (kv *DisKV) fileGet(shard int, key string) (string, error) {
	dir := kv.shardDir(shard)
	encoded := kv.encodeKey(key)
	data, err := ioutil.ReadFile(dir + "/key-" + encoded)
	return string(data), err
}
// filePut replaces the content of key's file in the given shard.
//
// The content is first written to a "temp-<encoded key>" file in the shard
// directory and then moved over "key-<encoded key>" with rename(), so the
// replacement is atomic with respect to crashes.
//
// If writing or renaming fails, the temp file is removed so that failed
// attempts do not accumulate stray files in the shard directory, and the
// original error is returned. It returns nil on success.
func (kv *DisKV) filePut(shard int, key string, content string) error {
	// Resolve the directory and the encoded key once; both names share them.
	dir := kv.shardDir(shard)
	encoded := kv.encodeKey(key)
	fullname := dir + "/key-" + encoded
	tempname := dir + "/temp-" + encoded

	if err := ioutil.WriteFile(tempname, []byte(content), 0666); err != nil {
		// Best-effort cleanup of a possibly partial temp file; the write
		// error is the one worth reporting.
		os.Remove(tempname)
		return err
	}
	if err := os.Rename(tempname, fullname); err != nil {
		// Best-effort cleanup; the rename error is the one worth reporting.
		os.Remove(tempname)
		return err
	}
	return nil
}
// fileReadShard returns the content of every key file in the given shard as
// a map from decoded key to file content.
//
// Only directory entries whose name starts with "key-" are considered;
// everything else (for example the "temp-" files written by filePut, or any
// unrelated file) is skipped. The prefix test checks the name's length first,
// so entries with names shorter than the prefix are skipped instead of
// causing a slice-bounds panic.
//
// Failing to list the directory, to decode a key file name, or to read a key
// file terminates the process with log.Fatalf.
func (kv *DisKV) fileReadShard(shard int) map[string]string {
	const prefix = "key-"

	m := map[string]string{}
	d := kv.shardDir(shard)
	files, err := ioutil.ReadDir(d)
	if err != nil {
		log.Fatalf("fileReadShard could not read %v: %v", d, err)
	}
	for _, fi := range files {
		name := fi.Name()
		if len(name) < len(prefix) || name[:len(prefix)] != prefix {
			continue
		}
		key, err := kv.decodeKey(name[len(prefix):])
		if err != nil {
			log.Fatalf("fileReadShard bad file name %v: %v", name, err)
		}
		content, err := kv.fileGet(shard, key)
		if err != nil {
			log.Fatalf("fileReadShard fileGet failed for %v: %v", key, err)
		}
		m[key] = content
	}
	return m
}
// fileReplaceShard replaces an entire shard directory with the key/value
// pairs in m: all existing files of the shard are removed and one key file
// is written per map entry.
//
// The function has no error result, so a failure to clear the directory or
// to write a key file terminates the process with log.Fatalf, in line with
// shardDir and fileReadShard, rather than silently leaving a partially
// written shard on disk.
//
// NOTE(review): the replacement as a whole is not atomic; a crash between
// the removal and the last filePut leaves the shard incomplete.
func (kv *DisKV) fileReplaceShard(shard int, m map[string]string) {
	d := kv.shardDir(shard)
	// Remove all existing files from the shard; filePut recreates the
	// directory through shardDir.
	if err := os.RemoveAll(d); err != nil {
		log.Fatalf("fileReplaceShard could not clear %v: %v", d, err)
	}
	for k, v := range m {
		if err := kv.filePut(shard, k, v); err != nil {
			log.Fatalf("fileReplaceShard filePut failed for %v: %v", k, err)
		}
	}
}
// Get has the signature of an RPC handler (kv is registered with the RPC
// server in StartServer). In this skeleton it is an unimplemented
// placeholder: it leaves reply untouched and always returns nil.
func (kv *DisKV) Get(args *GetArgs, reply *GetReply) error {
// Your code here.
return nil
}
// PutAppend is the RPC handler for client Put and Append requests.
// In this skeleton it is an unimplemented placeholder: it leaves reply
// untouched and always returns nil.
func (kv *DisKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) error {
// Your code here.
return nil
}
//
// tick is meant to ask the shardmaster if there's a new configuration
// and, if so, re-configure.
// It is called by the background goroutine started in StartServer, which
// sleeps 250 milliseconds between calls until the server is dead.
// In this skeleton the body is empty.
//
func (kv *DisKV) tick() {
// Your code here.
}
// tell the server to shut itself down.
// please don't change these two functions.
//
// kill atomically sets the dead flag, then closes the listener and kills
// the Paxos peer. The flag is set first, so the accept loop in StartServer
// sees isdead() == true when Accept fails because the listener was closed.
func (kv *DisKV) kill() {
atomic.StoreInt32(&kv.dead, 1)
kv.l.Close()
kv.px.Kill()
}
// call this to find out if the server is dead.
// isdead reports whether the dead flag is non-zero, using an atomic load;
// kill() is what sets the flag.
func (kv *DisKV) isdead() bool {
return atomic.LoadInt32(&kv.dead) != 0
}
// please do not change these two functions.
//
// Setunreliable atomically sets the unreliable flag to 1 when what is true
// and to 0 when it is false. While the flag is set, the accept loop in
// StartServer randomly drops some requests and discards some replies.
func (kv *DisKV) Setunreliable(what bool) {
if what {
atomic.StoreInt32(&kv.unreliable, 1)
} else {
atomic.StoreInt32(&kv.unreliable, 0)
}
}
// isunreliable reports whether the unreliable flag is non-zero, using an
// atomic load; Setunreliable is what changes the flag.
func (kv *DisKV) isunreliable() bool {
return atomic.LoadInt32(&kv.unreliable) != 0
}
//
// StartServer starts a shardkv server and returns it.
// gid is the ID of the server's replica group.
// shardmasters[] contains the ports of the
// servers that implement the shardmaster.
// servers[] contains the ports of the servers
// in this replica group; servers[me] is used as the
// address of this server's Unix-domain listener.
// me is the index of this server in servers[].
// dir is the directory name under which this
// replica should store all its files.
// each replica is passed a different directory.
// restart is false the very first time this server
// is started, and true to indicate a re-start
// after a crash or after a crash with disk loss.
// NOTE(review): restart is not read anywhere in this skeleton.
//
// Besides building the DisKV value, StartServer launches two goroutines that
// run until the server is dead: one accepts and serves RPC connections, the
// other calls kv.tick() every 250 milliseconds.
//
func StartServer(gid int64, shardmasters []string,
servers []string, me int, dir string, restart bool) *DisKV {
kv := new(DisKV)
kv.me = me
kv.gid = gid
kv.sm = shardmaster.MakeClerk(shardmasters)
kv.dir = dir
// Your initialization code here.
// Don't call Join().
// log.SetOutput(ioutil.Discard)
// Register Op with gob so that Op values can be encoded when they are
// carried inside interface-typed values.
gob.Register(Op{})
rpcs := rpc.NewServer()
rpcs.Register(kv)
// The Paxos peer is handed the same RPC server.
kv.px = paxos.Make(servers, me, rpcs)
// log.SetOutput(os.Stdout)
// Remove whatever file already exists at the socket path before listening
// on it; the error (e.g. no such file) is ignored.
os.Remove(servers[me])
l, e := net.Listen("unix", servers[me])
if e != nil {
log.Fatal("listen error: ", e)
}
kv.l = l
// please do not change any of the following code,
// or do anything to subvert it.
// Accept loop: serves each incoming connection on its own goroutine until
// the server is dead.
go func() {
for kv.isdead() == false {
conn, err := kv.l.Accept()
if err == nil && kv.isdead() == false {
// In unreliable mode, drop the connection when the random draw
// (0..999) is below 100.
if kv.isunreliable() && (rand.Int63()%1000) < 100 {
// discard the request.
conn.Close()
// Otherwise, with a second independent draw below 200, serve the
// request but shut down the write side of the socket first.
} else if kv.isunreliable() && (rand.Int63()%1000) < 200 {
// process the request but force discard of reply.
c1 := conn.(*net.UnixConn)
// NOTE(review): the error from File() is ignored and f is never closed;
// per the net package docs File returns a copy that the caller is
// responsible for closing — left untouched because of the
// do-not-change request above.
f, _ := c1.File()
err := syscall.Shutdown(int(f.Fd()), syscall.SHUT_WR)
if err != nil {
fmt.Printf("shutdown: %v\n", err)
}
go rpcs.ServeConn(conn)
} else {
go rpcs.ServeConn(conn)
}
} else if err == nil {
// Accepted successfully but the server has been killed meanwhile.
conn.Close()
}
// An accept error while still alive is treated as fatal for this server:
// report it and shut down.
if err != nil && kv.isdead() == false {
fmt.Printf("DisKV(%v) accept: %v\n", me, err.Error())
kv.kill()
}
}
}()
// Periodic driver: call tick() every 250 milliseconds until the server is dead.
go func() {
for kv.isdead() == false {
kv.tick()
time.Sleep(250 * time.Millisecond)
}
}()
return kv
}