275 lines
6.3 KiB
Go
275 lines
6.3 KiB
Go
package diskv
|
|
|
|
import "net"
|
|
import "fmt"
|
|
import "net/rpc"
|
|
import "log"
|
|
import "time"
|
|
import "paxos"
|
|
import "sync"
|
|
import "sync/atomic"
|
|
import "os"
|
|
import "syscall"
|
|
import "encoding/gob"
|
|
import "encoding/base32"
|
|
import "math/rand"
|
|
import "shardmaster"
|
|
import "io/ioutil"
|
|
import "strconv"
|
|
|
|
|
|
// Debug gates DPrintf: messages are logged only when it is greater than zero.
const Debug = 0
|
|
|
|
func DPrintf(format string, a ...interface{}) (n int, err error) {
|
|
if Debug > 0 {
|
|
log.Printf(format, a...)
|
|
}
|
|
return
|
|
}
|
|
|
|
|
|
// Op is the type registered with gob in StartServer (gob.Register(Op{})).
// It has no fields yet; they are left for the implementer to define.
type Op struct {
|
|
// Your definitions here.
|
|
}
|
|
|
|
|
|
// DisKV is one replica of a disk-backed key/value server belonging to a
// replica group. Instances are created by StartServer.
type DisKV struct {
|
|
mu sync.Mutex // not referenced by any code visible here; presumably guards state added below - confirm
|
|
l net.Listener // unix-socket listener opened in StartServer; closed by kill()
|
|
me int // index of this server in the servers[] passed to StartServer
|
|
dead int32 // for testing; set to 1 by kill(), read by isdead(), always accessed atomically
|
|
unreliable int32 // for testing; set by Setunreliable(), read by isunreliable(), always accessed atomically
|
|
sm *shardmaster.Clerk // shardmaster clerk created in StartServer
|
|
px *paxos.Paxos // Paxos peer created in StartServer; killed by kill()
|
|
dir string // each replica has its own data directory
|
|
|
|
gid int64 // my replica group ID
|
|
|
|
// Your definitions here.
|
|
}
|
|
|
|
//
|
|
// these are handy functions that might be useful
|
|
// for reading and writing key/value files, and
|
|
// for reading and writing entire shards.
|
|
// puts the key files for each shard in a separate
|
|
// directory.
|
|
//
|
|
|
|
func (kv *DisKV) shardDir(shard int) string {
|
|
d := kv.dir + "/shard-" + strconv.Itoa(shard) + "/"
|
|
// create directory if needed.
|
|
_, err := os.Stat(d)
|
|
if err != nil {
|
|
if err := os.Mkdir(d, 0777); err != nil {
|
|
log.Fatalf("Mkdir(%v): %v", d, err)
|
|
}
|
|
}
|
|
return d
|
|
}
|
|
|
|
// cannot use keys in file names directly, since
|
|
// they might contain troublesome characters like /.
|
|
// base32-encode the key to get a file name.
|
|
// base32 rather than base64 b/c Mac has case-insensitive
|
|
// file names.
|
|
func (kv *DisKV) encodeKey(key string) string {
|
|
return base32.StdEncoding.EncodeToString([]byte(key))
|
|
}
|
|
|
|
func (kv *DisKV) decodeKey(filename string) (string, error) {
|
|
key, err := base32.StdEncoding.DecodeString(filename)
|
|
return string(key), err
|
|
}
|
|
|
|
// read the content of a key's file.
|
|
func (kv *DisKV) fileGet(shard int, key string) (string, error) {
|
|
fullname := kv.shardDir(shard) + "/key-" + kv.encodeKey(key)
|
|
content, err := ioutil.ReadFile(fullname)
|
|
return string(content), err
|
|
}
|
|
|
|
// replace the content of a key's file.
|
|
// uses rename() to make the replacement atomic with
|
|
// respect to crashes.
|
|
func (kv *DisKV) filePut(shard int, key string, content string) error {
|
|
fullname := kv.shardDir(shard) + "/key-" + kv.encodeKey(key)
|
|
tempname := kv.shardDir(shard) + "/temp-" + kv.encodeKey(key)
|
|
if err := ioutil.WriteFile(tempname, []byte(content), 0666); err != nil {
|
|
return err
|
|
}
|
|
if err := os.Rename(tempname, fullname); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// return content of every key file in a given shard.
|
|
func (kv *DisKV) fileReadShard(shard int) map[string]string {
|
|
m := map[string]string{}
|
|
d := kv.shardDir(shard)
|
|
files, err := ioutil.ReadDir(d)
|
|
if err != nil {
|
|
log.Fatalf("fileReadShard could not read %v: %v", d, err)
|
|
}
|
|
for _, fi := range files {
|
|
n1 := fi.Name()
|
|
if n1[0:4] == "key-" {
|
|
key, err := kv.decodeKey(n1[4:])
|
|
if err != nil {
|
|
log.Fatalf("fileReadShard bad file name %v: %v", n1, err)
|
|
}
|
|
content, err := kv.fileGet(shard, key)
|
|
if err != nil {
|
|
log.Fatalf("fileReadShard fileGet failed for %v: %v", key, err)
|
|
}
|
|
m[key] = content
|
|
}
|
|
}
|
|
return m
|
|
}
|
|
|
|
// replace an entire shard directory.
|
|
func (kv *DisKV) fileReplaceShard(shard int, m map[string]string) {
|
|
d := kv.shardDir(shard)
|
|
os.RemoveAll(d) // remove all existing files from shard.
|
|
for k, v := range m {
|
|
kv.filePut(shard, k, v)
|
|
}
|
|
}
|
|
|
|
|
|
// Get is the RPC handler for client Get requests.
// It is currently an unimplemented stub: it does not touch args or reply
// and always returns nil.
func (kv *DisKV) Get(args *GetArgs, reply *GetReply) error {
|
|
// Your code here.
|
|
return nil
|
|
}
|
|
|
|
// PutAppend is the RPC handler for client Put and Append requests.
// It is currently an unimplemented stub: it does not touch args or reply
// and always returns nil.
|
|
func (kv *DisKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) error {
|
|
// Your code here.
|
|
return nil
|
|
}
|
|
|
|
//
|
|
// tick is meant to ask the shardmaster if there's a new configuration;
|
|
// if so, re-configure. It is called every 250ms by a goroutine started in
// StartServer until the server is dead. The body is currently empty.
|
|
//
|
|
func (kv *DisKV) tick() {
|
|
// Your code here.
|
|
}
|
|
|
|
// kill tells the server to shut itself down: it sets the dead flag,
// closes the listener and kills the Paxos peer.
|
|
// please don't change these two functions.
|
|
func (kv *DisKV) kill() {
|
|
atomic.StoreInt32(&kv.dead, 1) // set first so loops polling isdead() stop
|
|
kv.l.Close() // returned error is ignored
|
|
kv.px.Kill()
|
|
}
|
|
|
|
// isdead reports whether the server is dead, i.e. whether kill() has set
// the dead flag. The flag is read atomically.
|
|
func (kv *DisKV) isdead() bool {
|
|
return atomic.LoadInt32(&kv.dead) != 0
|
|
}
|
|
|
|
// Setunreliable turns the unreliable flag on (what == true) or off
// (what == false). The flag is stored atomically as 1 or 0 and is read by
// isunreliable().
// please do not change these two functions.
|
|
func (kv *DisKV) Setunreliable(what bool) {
|
|
if what {
|
|
atomic.StoreInt32(&kv.unreliable, 1)
|
|
} else {
|
|
atomic.StoreInt32(&kv.unreliable, 0)
|
|
}
|
|
}
|
|
|
|
// isunreliable reports whether the unreliable flag is currently set.
// The flag is read atomically.
func (kv *DisKV) isunreliable() bool {
|
|
return atomic.LoadInt32(&kv.unreliable) != 0
|
|
}
|
|
|
|
//
|
|
// Start a shardkv server.
|
|
// gid is the ID of the server's replica group.
|
|
// shardmasters[] contains the ports of the
|
|
// servers that implement the shardmaster.
|
|
// servers[] contains the ports of the servers
|
|
// in this replica group.
|
|
// me is the index of this server in servers[].
|
|
// dir is the directory name under which this
|
|
// replica should store all its files.
|
|
// each replica is passed a different directory.
|
|
// restart is false the very first time this server
|
|
// is started, and true to indicate a re-start
|
|
// after a crash or after a crash with disk loss.
|
|
//
|
|
func StartServer(gid int64, shardmasters []string,
|
|
servers []string, me int, dir string, restart bool) *DisKV {
|
|
|
|
// NOTE(review): restart is not used by any code visible in this function.
kv := new(DisKV)
|
|
kv.me = me
|
|
kv.gid = gid
|
|
kv.sm = shardmaster.MakeClerk(shardmasters)
|
|
kv.dir = dir
|
|
|
|
// Your initialization code here.
|
|
// Don't call Join().
|
|
|
|
// log.SetOutput(ioutil.Discard)
|
|
|
|
// register Op with gob; presumably so Op values can be carried inside
// interface-typed RPC/Paxos values - confirm against the paxos package.
gob.Register(Op{})
|
|
|
|
// one RPC server is shared by kv's own handlers and the Paxos peer.
rpcs := rpc.NewServer()
|
|
rpcs.Register(kv) // returned error is ignored
|
|
|
|
kv.px = paxos.Make(servers, me, rpcs)
|
|
|
|
// log.SetOutput(os.Stdout)
|
|
|
|
|
|
|
|
// remove whatever is at the socket path before listening on it
// (presumably a stale socket file from a previous run); error ignored.
os.Remove(servers[me])
|
|
l, e := net.Listen("unix", servers[me])
|
|
if e != nil {
|
|
log.Fatal("listen error: ", e)
|
|
}
|
|
kv.l = l
|
|
|
|
// please do not change any of the following code,
|
|
// or do anything to subvert it.
|
|
|
|
// accept loop: serves incoming connections until the server is dead.
go func() {
|
|
for kv.isdead() == false {
|
|
conn, err := kv.l.Accept()
|
|
if err == nil && kv.isdead() == false {
|
|
// when unreliable, drop about 100 in 1000 connections outright.
if kv.isunreliable() && (rand.Int63()%1000) < 100 {
|
|
// discard the request.
|
|
conn.Close()
|
|
// second, independent draw: about 200 in 1000 of the remaining connections.
} else if kv.isunreliable() && (rand.Int63()%1000) < 200 {
|
|
// process the request but force discard of reply.
|
|
c1 := conn.(*net.UnixConn)
|
|
f, _ := c1.File() // error ignored; f is never closed in this block
|
|
// shut down the write direction of the socket before serving the request.
err := syscall.Shutdown(int(f.Fd()), syscall.SHUT_WR)
|
|
if err != nil {
|
|
fmt.Printf("shutdown: %v\n", err)
|
|
}
|
|
go rpcs.ServeConn(conn)
|
|
} else {
|
|
go rpcs.ServeConn(conn)
|
|
}
|
|
// a connection was accepted but the server is already dead: close it.
} else if err == nil {
|
|
conn.Close()
|
|
}
|
|
// Accept failed while the server is still alive: report it and shut down.
if err != nil && kv.isdead() == false {
|
|
fmt.Printf("DisKV(%v) accept: %v\n", me, err.Error())
|
|
kv.kill()
|
|
}
|
|
}
|
|
}()
|
|
|
|
// ticker loop: calls kv.tick() every 250ms until the server is dead.
go func() {
|
|
for kv.isdead() == false {
|
|
kv.tick()
|
|
time.Sleep(250 * time.Millisecond)
|
|
}
|
|
}()
|
|
|
|
return kv
|
|
}
|