From 5dd02053701dc46611990711bc2697e1284dcd6e Mon Sep 17 00:00:00 2001 From: Newnius Date: Wed, 11 Apr 2018 10:50:11 +0800 Subject: [PATCH] add docs --- src/kvraft/kvraft.md | 283 +++++++++++++++++++++++++ src/raft/raft-1.md | 337 ++++++++++++++++++++++++++++++ src/raft/raft-2.md | 477 +++++++++++++++++++++++++++++++++++++++++++ src/rpc/README.md | 35 ++++ src/rpc/ntp.md | 411 +++++++++++++++++++++++++++++++++++++ src/rpc/ntp.py | 52 +++++ 6 files changed, 1595 insertions(+) create mode 100644 src/kvraft/kvraft.md create mode 100644 src/raft/raft-1.md create mode 100644 src/raft/raft-2.md create mode 100644 src/rpc/README.md create mode 100644 src/rpc/ntp.md create mode 100644 src/rpc/ntp.py diff --git a/src/kvraft/kvraft.md b/src/kvraft/kvraft.md new file mode 100644 index 0000000..7f0e8dd --- /dev/null +++ b/src/kvraft/kvraft.md @@ -0,0 +1,283 @@ +

KVRaft: Key-Value Storage Service based on Raft

+
Shuai Liu, MG1733039
+
+ + +# Abstract +RDBMS has long been an oracle in the field of database, but actually there are weakness for RDBMS to fit some situations where simple key value service is more appropriate. + +In this experiment, we build up a key-value storage service named KVRaft based on distributed consensus framework Raft and fully validate the robust and perfomance of KVRaft including in extreme situations. + +## keywords +Key-Value database, Distributed, NoSQL + +# Introduction + +RDBMS has been the only choice for a long time in the field of database, however, RDMBS can be too redundant in many cases like cache for example. Thus, a new kind of storage service called key-value storage occured. By removing unecessary parts and re-design the structure, a key-value storage service can achieve much higher performance than RDBMS. + +In this experiment, we try to implement a key-value storage service named KVRaft based on our previous work Raft. The requirements we want to get from KVRaft are: + + - Each server coordinates with each other only by Raft logs + - Even if a minority servers fail, the system can still work fine + - The result has to be completely correct + +The basic idea of KVRaft is, logging the changes into the Raft logs and rebuilding the state machine from the logs when a client makes a query. + +# Model + +The main architecture of KVRaft looks like this + +KVRaft Architecture + +Each KVRaft server contains a Raft server in its process, and the KVRaft servers don't coordinate with each other directly, they communicate with others only through the Raft logs. When a client wants to make a request, it tries the KVRaft servers one by one until one of the servers accepts the request. When receiving a request from a client, the KVRaft server firstly check whether the related Raft server is the leader in the Raft cluster currently, and accepts the request if true. That server then asks the Raft server to log the change and waits until the change is successfully distributed to the Raft servers. + +There are three kinds of requests in the KVRaft storage service, _Get_, _Put_ and _Append_, and the requests are made by RPC. A _Put k v_ request sets the value of key _k_ to _v_, an _Append k v_ request sets the value of _k_ to be _Vold + v_, and a _Get k_ request queries the value of key _k_. If an _Append_ request tries to update a key which is not exist, it acts like a _Put_ request. + +# Fault Tolerance + +In a distributed system, failure is considered usual, so the KVRaft system should handle many kinds of failures. + +## Retry +There are many cases where requests cannot be processed such as network disconnection, network partition, non-leader. In these situations, if the client doesn't receive a posotive response from the KVRaft server, it then goes on tring the next server until the request is accepted. A KVRaft server will wait for some time before it receives a success message from the Raft server. If timeouts, the KVRaft server will return error to the client so that the client knows the request fails and will try next server. + +By retrying, a request can be accpted without infinite waiting even when a minority of serves fail, and the _timeout_ can be set to a bit longer than than _revote timeout_. + +## Identity the real leader +Due to the weakness of Raft as I mentioned in the previous works, a Raft server cannot know it has already lost the leadership, thus it will still act like the leader and accept requests. In KVRaft, if a KVRaft server receives a query request, it doesn't know actually it is not the real leader currently, neither the Raft server. In this situation, the KVRaft server will accept the request and may generate a wrong value as a result. + +Consider that _Put_ and _Append_ won't encounter this error because they will be loged into the Raft logs. If a Raft server is not the real leader, it cannot commit the log successfully. So the easist way to identity whether the Raft server is the real server, we can log the _Get_ request into the Raft logs as well, so that we can say the Raft server must be the leader if that server successfully commits the _Get_. + +## Handle duplicate +When a failure encountered, the client will retry next server until the request is accepted. Let's think about a situation where a leader accepts the request but loses the leadership before commiting it, so the request will be sent to the next leader and that leader will log that request and commit it, if then the previous leader is re-voted as the leader it will commit the same request. This would cause duplicate log entries in the Raft logs and it cannot be avoided by the Raft servers. + +One way to handle that is by modifing the request and append the last request the client makes, so that in the progress of rebuild, the KVRaft server can ignore duplicate requests by the orders. However, there is a problem. If the first request of a client fails and it raises duplicate log entries, it is unable to find them as there is no requests ahead. Also, two clients may make same requests and thus would cause potential errors because it is hard to determine the origin sequence. + +To prevent the first situation, we can make a _Get_ request at first for each client. To fully solve the duplicate problem clearlly, we use the UUID to identity each request, when rebuilding the logs, just ignore the requests which has already been processed. + +The UUID consists of ClientID and IncrementialID of each client. The IncrementialID starts with 1 and increments by one for each request. To generate a universal ClientID, we need another system which would arouse other problems. But fortunatelly, in this experiment, we can get the index of the log where the request would appear. The index is universal unique and we can use the index as the ClientID. Every time a new client is created, it first check if it is the first time to make a request and if so it would make a _Get_ request to receive the ClientID. The only addon for this change is another _Get_ request for each client and the cost is acceptable. + +# Implementation + +## Modify Raft +In our previous experiments, we assume the _command_ is an integer, so we have to update the related parts to make the Raft servers support non-integer commands by modifing the interface. + +```golang +logs []map[string]interface{} +``` + +The other modification is to expose the logs to the KVRaft server by adding a function _getState2_ which would return the logs and whether the server is the leader. + +```golang +func (rf *Raft) GetState2() (bool, []map[string]interface{}, int) { + return rf.role==2, rf.logs, rf.commitIndex +} +``` + +## Define RPC +There are three kinds of requests in KVRaft, _Get_, _Put_ and _Append_. we can combine _Put_ and _Append_ so there would be two RPCs. + +#### _PutAppend_ +``` +type PutAppendArgs struct { + Key string + Value string + Opt string + UUID string +} +``` + +```golang +type PutAppendReply struct { + WrongLeader bool +} +``` +When the server successfully submits the request, the _WrongLeader_ is set to false meaning the request is loged, otherwise false. + +#### _Get_ +```golang +type GetArgs struct { + Key string + UUID string +} +``` + +```golang +type GetReply struct { + WrongLeader bool + Value string + ID int +} +``` + +The _ID_ actually means where the request is in the Raft logs and will be used as _ClientID_ by the UUID part. + +## Client +The client firstly checks if a _Get_ request is made previously and if not make one. Then it retries until a server responds positively. + +``` +func (ck *Clerk) PutAppend(key string, value string, opt string) { + if ck.id == 0 { + ck.Get("nobody") + } + success := false + for !success{ + for i:=0;i 30 { + break + } + if kv.currentIndex == index { + _, logs, commitIndex := kv.rf.GetState2() + var db map[string]string + db = make(map[string]string) + var UUIDs map[string]int + UUIDs = make(map[string]int) + for i:=0;i 0 { + continue + } + UUIDs[op.UUID] += 1 + switch op.Opt { + case "Put": + db[op.Key] = op.Value + break + case "Append": + db[op.Key] = db[op.Key] + op.Value + break + } + } + reply.WrongLeader = false + reply.Value = db[args.Key] + reply.ID = index + break + } + time.Sleep(time.Millisecond * 10) + } + } +} +``` + +# Optimization + +## Remember the leader +In reality, the Raft cluster is stable in most of the time which means the leader changes rarely. In our design, the client tries from the first server every time, but actually this is a waste of time. The leader should be remembered and tried first as the next request, only when the remembered leader loses its leadership, the client has to try another server. + +``` +try remembered leader x +if success: + return +else: + for i in range(1, n+1): + try server (leader + i) % n + if success: + return +``` + +The servers are logically organized as a ring by the clients, this design can make sure every server would be tried at most once per request when the Raft cluster is still in service. + +## Minimize log size +As time passes, the size Raft logs grows and would make it more time consuming to rebuild the state machine. If we look into the Raft logs, we can see many redundent log entries since we just want to rebuild the state machine, not the history of changing. We can rewrite the log entries periodically to minimize the size and speed up the rebuild process. + +For example, if we have a slice as follows: + +``` +put k m +append k n +put k o +append k p +``` + +we can rewrite them to: +``` +put k op +``` + +One way to do this in hot can be starting another thread to copy the logs and rewiting them asynchronously and then replacing the log entries using snapshot in Raft. Another thing to take care is that the way we generate ClientID, which is the index of log entry. By changing list to map can solve this problem. + +## Ignore other keys +Every time the KVRaft server receives a query, it has to rebuild the state machine and the state machine cannot be cached or reused. Actually, the query request only needs one key and the others are unnecessary, so we can focus only on the required key and ignore other keys, this method can minimize the size of UUID set and speed up the rebuilding. + +# Validation + +## Robust + +There are a total of 12 test cases for KVRaft, covering many kinds of extreme situations _TestBasic_, _TestConcurrent_, _TestUnreliable_, _TestUnreliableOneKey_, _TestOnePartition_, _TestManyPartitionsOneClient_, _TestManyPartitionsManyClients_, _TestPersistOneClient_, _TestPersistConcurrent_, _TestPersistConcurrentUnreliable_, _TestPersistPartition_ and _TestPersistPartitionUnreliable_. + +Run the tests many times and the result shows that our system passes all the test cases successfully. + +validation + +One of the debug logs shows that KVRaft can successfully handle duplicate requests. + +``` +logs... +0 => {Get nobody 0_0} +1 => {Get nobody 0_0} +2 => {Get nobody 0_0} +3 => {Get nobody 0_0} +4 => {Get nobody 0_0} +5 => {Put 0 1_1} +6 => {Put 1 2_1} +7 => {Put 2 3_1} +8 => {Put 4 5_1} +9 => {Put 3 4_1} +10 => {Put 0 1_1} +skip {Put 0 1_1} +11 => {Append 1 x 1 0 y 2_2} +12 => {Append 2 x 2 0 y 3_2} +13 => {Get 4 5_2} +logs end... +``` + +## Delay + +We can see from the test cases that each request normally takes several milliseconds to be processed, and only in some extreme situations such partition it will take longer. Overall, KVRaft work fine as a distributed key-value storage service. + +# Conclusion +This experiment builds a key-value storage service KVRaft based on the distributed consensus gramework Raft and the result shows that KVRaft is realiable and robust. The average delay of requests can be limited within several milliseconds and most of the requests in extreme states can be processed within seconds. + +_* The full and up-to-date code is hosted at https://github.com/newnius/NJU-DisSys-2017_ + +# Reference + +[1] Ongaro D, Ousterhout J K. In search of an understandable consensus algorithm[C]//USENIX Annual Technical Conference. 2014: 305-319. + +[2] [Raft](http://thesecretlivesofdata.com/raft/) + +[3] [Go maps in action](https://blog.golang.org/go-maps-in-action) + +[4] [Go by Example: Non-Blocking Channel Operations](https://gobyexample.com/non-blocking-channel-operations) + +[5] [Go Channel 详解](http://colobu.com/2016/04/14/Golang-Channels/) + +[6] [Universally unique identifier(UUID)](https://en.wikipedia.org/wiki/Universally_unique_identifier) + + + + diff --git a/src/raft/raft-1.md b/src/raft/raft-1.md new file mode 100644 index 0000000..6c6d987 --- /dev/null +++ b/src/raft/raft-1.md @@ -0,0 +1,337 @@ +

Raft Implementation

+
Shuai Liu, MG1733039
+
+ +# Abstract +In this big data time, high performance distributed systems are required to process the large volumn of data. However, it is not easy to organize plenty of nodes. One of the significant problems is distributed consensus, which means every node in the cluster will eventually reach a consensus without any conflict. + +Raft is a distributed consensus algorithm which has been proved workable. This expriment mainly focus on designing and implementing leader election described in rart algorithm. + +## keywords +Distributed Consensus, Leader Election, Log Replication + + +# Introduction + +Before Raft, (multi-)Paxos has been treated as an industry standard for a long time. However, even with the help of Paxos, we still find it hard to build up a reliable distributed system. + +Just as the comment from Chubby implementers: + +> There are significant gaps between the description of the Paxos algorithm and the needs of a real-world system.... the final system will be based on an unproven protocol. + +Paxos is rather difficult to implement mainly because it is not easy to understand for those who are not mathematicians. + +Then Raft came out, which has a good understandability. Compared with Paxos, there are smaller state space achieved by reducing states. Also, Raft decomposes the problem into leader election, log replication, safety and membership changes, instead of treating them as a total of mess. + +To further understand distributed consensus, this expriment tries to implement the first section _leader election_ in Raft and leave the rest parts in the following expriments. + +# Design & Realization +The language we use in this expriment is Go 1.9. + +## Structure +There are some states such as current term, logs, role of node that have to be stored and shared across the threads, so we design a structure called _Raft_. Among the variables, _currentTerm_, _votedFor_ and _logs_ are required to be persisted while the rest are volatile. + +```go +type Raft struct { + mu sync.Mutex + peers []*labrpc.ClientEnd + persister *Persister + me int // index into peers[] + + currentTerm int + votedFor int + logs []map[string]int + + commitIndex int + lastApplied int + + nextIndex []int + matchIndex []int + + role int //0-follower, 1-candidate, 2-leader + electionTimeout time.Duration +} +``` + +## Initialization & main loop + +When a node starts up, it firstly initializes the Raft object, generates election timeout and heart beat timeout randomlly. Then, it reads persisted data which are stored before last crash. After initailazation, it comes to the infinite main loop. In the main loop, the node sleeps for a certain time, when it wakes up, it checks whether he is the leader or the leader has connected with him not long ago (within _heartBeatTimeout_). If not, it increases the _currentTerm_ and starts a new election in another new thread. + +There are two things to mention. The first is random _electionTimeout_. Same _electionTimeout_ may cause infinite elections if each node starts up at nearly the same time. Different timeout can help reduce the conflicts. To further reduce the conflicts, the node will re-generate _electionTimeout_ randomlly before an election for some nodes may have same _electionTimeout_ and may cause infinite elections especially in small clusters. The other is fake sleep. There is no easy approaches to extend the sleep time when the thread is in sleep. So the node will wake up earlier than expected. To emulate the delay, when the node wakes up, we will compare the time last heart beat arrived with current time, if the duration is smaller than _electionTimeout_, we make the node resume sleeping for the duration. + +```go + /* initialization */ + rf := &Raft{} + rf.peers = peers + rf.persister = persister + rf.me = me + rf.electionTimeout = time.Duration(rand.Intn(100) + 100) * time.Millisecond + rf.heartBeatTimeout = time.Duration(rand.Intn(50) + 50) * time.Millisecond + rf.counter = 0 + rf.lastHeart = time.Now() + rf.heartBeatTicker = time.NewTicker(rf.heartBeatTimeout) + rf.role = 0 //start in follower state + rf.commitIndex = 0 + rf.lastApplied = 0 + // initialize from state persisted before a crash + rf.readPersist(persister.ReadRaftState()) + + go func() { //main loop + for ;;{ + /* wait timeout */ + time.Sleep(rf.electionTimeout - time.Since(rf.lastHeart)) + + if rf.role != 2 && time.Since(rf.lastHeart) >= rf.electionTimeout { + rf.mu.Lock() + rf.electionTimeout = time.Duration(rand.Intn(100) + 100) * time.Millisecond + rf.currentTerm ++ + rf.votedFor = rf.me + rf.role = 1 + rf.persist() + rf.mu.Unlock() + rf.doVote() + } + /* sleep at most electionTimeout duration */ + if time.Since(rf.lastHeart) >= rf.electionTimeout { + rf.lastHeart = time.Now() + } + } + }() + return rf +} +``` + +main loop + +## Election +In the election, a node will exchange messages with other nodes. Two typical are request and response message of vote request. The term will keep until the leader crashed and another election are be made. + +```go +type RequestVoteArgs struct { + Term int + CandidateId int + LastLogIndex int + LastLogTerm int +} +``` + +```go +type RequestVoteReply struct { + Term int + VoteGranted bool +} +``` + +The election performs as follows. The node who wants to be the leader firstly switch to candidate state and send a _RequestVote_ call to other nodes, attaching the term and logd infomation. The called node will compare the _Term_ with its _currentTerm_, it _Term_ is smaller, it simply refuse the request. Otherwise, the node will check whether if the logs of candidate is at least up to date with its _logs_. If all pass, it grants the request and switch to follower state and set _votedFor_ to _candidateId_. Whatever the result is, if higher _Term_ detected, the node will update _currentTerm_ to _Term_ and switch to follower state. In this expriment, there will be no logs appended, so the log checks is skipped. + +When a majority of nodes agree the request, it switches to leader state and starts to send heart beat message to each node periodically until it is no longer a leader. If the heart beat reply or vote request reply reports a higher term, it will switch to follower state whatever the state is. + +```go +func (rf *Raft) doVote() { + rf.mu.Lock() + defer rf.mu.Unlock() + + var agreed int64 = 1 + index := rf.commitIndex + term := 0 + if index != 0 { + term = rf.logs[index - 1]["term"] + } + for i:=0;i< len(rf.peers);i++{ + if i != rf.me { + go func(peer int, currTerm int, index int, term int) { + args := RequestVoteArgs{Term: currTerm, CandidateId:rf.me, LastLogIndex:index, LastLogTerm:term} + reply := RequestVoteReply{} + ok := rf.sendRequestVote(peer, args, &reply) + rf.mu.Lock() + if ok && args.Term == rf.currentTerm && rf.role == 1{ + atomic.AddInt64(&agreed, 1) + if (int(agreed) * 2 > len(rf.peers)) { + rf.role = 2 + /* persist state */ + rf.persist() + for i:=0;i rf.currentTerm { + rf.mu.Lock() + rf.currentTerm = reply.Term + rf.role = 0 + rf.votedFor = -1 + rf.persist() + rf.mu.Unlock() + } + return ok && reply.VoteGranted +} +``` + +```go +func (rf *Raft) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) { + rf.mu.Lock() + defer rf.mu.Unlock() + if args.Term < rf.currentTerm { + reply.VoteGranted = false + } else { + /* whatever, if higher term found, switch to follower */ + if args.Term > rf.currentTerm { + rf.role = 0 + rf.votedFor = -1 + rf.currentTerm = args.Term + } + + /* check log first, there is no requests in this expriment, so skip the details */ + if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) { + reply.VoteGranted = true + rf.role = 0 + rf.votedFor = args.CandidateId + rf.lastHeart = time.Now() + } else { + reply.VoteGranted = false + } + } + rf.persist() + reply.Term = rf.currentTerm +} +``` + +## Heart beat +The heart beat message can be empty, but for conpatible with following expriments, we set several variables in the request and response messages. As mentioned above, the the reply message reports a higher _Term_, the leader will immediatelly stop the heart beat and switch to follower state. When a node receives a heart beat message, it will update _lastHeart_ to _time.Now()_ to delay wake up. + +```go +type AppendEntriesArgs struct { + Term int + LeaderId int + PrevLogIndex int + PrevLogTerm int + Entries []map[string]int + LeaderCommit int +} +``` + +```go +type AppendEntriesReply struct { + Term int + Success bool +} +``` + +```go +func (rf *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) { + rf.mu.Lock() + defer rf.mu.Unlock() + if args.Term < rf.currentTerm { + reply.Success = false + } else if args.Term > rf.currentTerm { + reply.Success = true + rf.currentTerm = args.Term + rf.role = 0 + rf.votedFor = args.LeaderId + } else { + reply.Success = true + } + if reply.Success { + rf.lastHeart = time.Now() + } + reply.Term = rf.currentTerm +} +``` + +```go +func (rf *Raft) sendAppendEntries(server int, args AppendEntriesArgs, reply *AppendEntriesReply) bool { + ok := rf.peers[server].Call("Raft.AppendEntries", args, reply) + if reply.Term > rf.currentTerm { + rf.mu.Lock() + rf.currentTerm = reply.Term + rf.role = 0 + rf.votedFor = -1 + rf.persist() + rf.mu.Unlock() + } + return ok && reply.Success +} +``` + +```go +func (rf *Raft) doSubmit() { + /* ensure only one thread is running */ + if atomic.AddInt64(&rf.counter, 1) > 1 { + atomic.AddInt64(&rf.counter, -1) + return + } + for range rf.heartBeatTicker.C { + if rf.role != 2 { + break + } + for i:=0;i< len(rf.peers);i++{ + if i != rf.me { + go func(peer int) { + rf.mu.Lock() + index := rf.nextIndex[peer] + term := 0 + entries := make([]map[string]int, 0) + args := AppendEntriesArgs{Term: rf.currentTerm, LeaderId:rf.me, PrevLogIndex:index - 1, PrevLogTerm:term, Entries:entries, LeaderCommit:rf.commitIndex} + reply := AppendEntriesReply{} + rf.mu.Unlock() + + ok := rf.sendAppendEntries(peer, args, &reply) + }(i) + } + } + } + atomic.AddInt64(&rf.counter, -1) +} +``` + +During the implemention, we found some inadequates in Raft, one of them is the strategy of processing vote request. It says if a node meets a higher term, it will switch to follower state. Assume a cluster with three nodes A, B and C. A is the leader in term 1, and shortly B encounters a network failure whose duration is longer than _electionTimeout_, thus B will start a new term and request for votes. When the network resumes, A and C will detect a higher term from B's request and according the Raft algorithm they will stop this term and make a new election. However, the cluster works well and a new election is unnecessary. This means in a large cluster, even if only a single node restarts, the whole cluster has to be re-built. This will reduce the performance of the cluster to a large extent especially when the cluster is large. + +Raft uses this strategy to make sure that leader can switch to follower state. But we can add another logic to realize that by counting followers. If a leader finds not a majority of nodes replies the heart beat message and it lasts longer than _electionTimeout_, it then switches to follower state and starts a new election. + +# Validation +There two test cases _TestInitialElection_ and _TestReElection_ are designed to test the correctness of the system. Run the tests many times and the result shows our system passes all the test cases successfully. + +tests + +# Conclusion +This expriment mainly focus on implementing the leader election part of Raft algorithm. The result shows that the cluster quickly generates a leader and remains the normal state until a failure, and after the failure the cluster can re-generate a new leader in a short time. This expriment proves the reliablity of Raft algorithm in another way. + +_* The full and up-to-date code is hosted at https://github.com/newnius/NJU-DisSys-2017_ + +# References + +[1] Ongaro D, Ousterhout J K. In search of an understandable consensus algorithm[C]//USENIX Annual Technical Conference. 2014: 305-319. + +[2] [Raft](http://thesecretlivesofdata.com/raft/) + +[3] [Go by Example: Atomic Counters](https://gobyexample.com/atomic-counters) + +[4] [Go by Example: Non-Blocking Channel Operations](https://gobyexample.com/non-blocking-channel-operations) + +[5] [Go by Example: Timers and Tickers](https://mmcgrana.github.io/2012/09/go-by-example-timers-and-tickers.html) + +[6] [time: Timer.Reset is not possible to use correctly](https://github.com/golang/go/issues/14038) + +[7] [论golang Timer Reset方法使用的正确姿势](http://tonybai.com/2016/12/21/how-to-use-timer-reset-in-golang-correctly/) + +[8] [Go Channel 详解](http://colobu.com/2016/04/14/Golang-Channels/) + +[9] [Go 语言切片(Slice)](http://www.runoob.com/go/go-slice.html) + +[10] [Go 语言函数方法](https://wizardforcel.gitbooks.io/w3school-go/content/20-5.html) + +[11] [Golang初学者易犯的三种错误](http://www.jianshu.com/p/e737cb26c141) + +[12] [Not able to install go lang plugin on intellij 2017.2 community edition](https://github.com/go-lang-plugin-org/go-lang-idea-plugin/issues/2897) diff --git a/src/raft/raft-2.md b/src/raft/raft-2.md new file mode 100644 index 0000000..80d5265 --- /dev/null +++ b/src/raft/raft-2.md @@ -0,0 +1,477 @@ +

Raft Implementation

+
Shuai Liu, MG1733039
+
+ +# Abstract +In this big data time, high performance distributed systems are required to process the large volumn of data. However, it is not easy to organize plenty of nodes. One of the significant problems is distributed consensus, which means every node in the cluster will eventually reach a consensus without any conflicts. + +Raft is a distributed consensus algorithm which has been proved workable. This expriment contitues the previous expriment and implements the log replication and finally tests the whole system in many abnormal situations. + +## keywords +Distributed Consensus, Leader Election, Log Replication + + +# Introduction + +Before Raft, (multi-)Paxos has been treated as an industry standard for a long time. However, even with the help of Paxos, we still find it hard to build up a reliable distributed system. + +Just as the comment from Chubby implementers: + +> There are significant gaps between the description of the Paxos algorithm and the needs of a real-world system.... the final system will be based on an unproven protocol. + +Paxos is rather difficult to implement mainly because it is not easy to understand for those who are not mathematicians. + +Then Raft came out, which has a good understandability. Compared with Paxos, there are smaller state space achieved by reducing states. Also, Raft decomposes the problem into leader election, log replication, safety and membership changes, instead of treating them as a total of mess. + +To further understand distributed consensus, this expriment tries to implement the Raft algorithm in go language. + +# Design & Realization +The language we use in this expriment is Go 1.9. + +## Structure +There are some states such as current term, logs, role of node that have to be stored and shared across the threads, so we design a structure called _Raft_. Among the variables, _currentTerm_, _votedFor_ and _logs_ are required to be persisted while the rest are volatile. To save space, some variables for loop control are not mentioned here. + +```go +type Raft struct { + mu sync.Mutex + peers []*labrpc.ClientEnd + persister *Persister + me int // index into peers[] + + currentTerm int + votedFor int + logs []map[string]int + + commitIndex int + lastApplied int + + nextIndex []int + matchIndex []int + + role int //0-follower, 1-candidate, 2-leader + electionTimeout time.Duration +} +``` + +## Initialization & main loop + +The main loop is almost the same as we mentioned in leader election in the previous expriment, so here we only discuss the differences. + +## Election + +In this expriment, we add following lines to the vote function, meaning inititialze _nextIndex_ and _matchIndex_ after being elected as the leader. + +```go +rf.nextIndex = rf.nextIndex[0:0] +rf.matchIndex = rf.matchIndex[0:0] +for i:=0; i rf.currentTerm { + rf.role = 0 + rf.votedFor = -1 + rf.currentTerm = args.Term + fmt.Println(rf.me, "says: higher term detected, term=", args.Term) + } + + /* check log first */ + lastLogTerm := 0 + if len(rf.logs) > 0 { + lastLogTerm = rf.logs[len(rf.logs) - 1]["term"]; + } + if ((rf.votedFor == -1 || rf.votedFor == args.CandidateId) && (lastLogTerm < args.LastLogTerm || (lastLogTerm == args.LastLogTerm && len(rf.logs) <= args.LastLogIndex) )) { + reply.VoteGranted = true + rf.role = 0 + rf.votedFor = args.CandidateId + rf.lastHeart = time.Now() + } else { + reply.VoteGranted = false + } + } + rf.persist() + reply.Term = rf.currentTerm +} +``` + + +## AppendEneries + +The main process of log replication works as follows. + + - First of all, when a new command reaches the leader, the leader appendes it to _log_ and reply an index where command will exists if successfully replicated. Followers won't accept requests from clients, they simply redirect clients to the leader. + - The leader sends new log entries to each server, attching index of previous log entry and the term of that entry.The log entries are determined by _nextIndex_. + - Followers check whether they have previous log entries and then append new log entries to certain location. + - If a majority of followers accept a command, the leader increases his _commitIndex_ and replies to the client. + - The _commitIndex_ will be sent in next _appendEntry_ request, followers commit the commands known to have been replicated in a majority servers. + +The format of request and reply. + +```go +type AppendEntriesArgs struct { + Term int + LeaderId int + PrevLogIndex int + PrevLogTerm int + Entries []map[string]int + LeaderCommit int +} +``` + +```go +type AppendEntriesReply struct { + Term int + Success bool + Len int +} +``` + +The leader sends _appendEntries_ request to each follower periodically to keep the role state. In a request, if a follower has log entries not replicated, the next log entries will be attached in the request. Each reuqest is executed in a new thread in case the network is slow or unreachable, which will block the loop. + +Every _appendEntries_ contains more than one log entries. When a follower receives this request, it first confirms the role of the server claimed to be the server. Then it checks if it has already recorded the log entries before newer ones. If all pass, the server overwrite the log and replace log from certain index with given log entries in request. + +The leader receives reply from followers and increase _matchIndex_, which means logs entries known to replicated in a follower. If a majority followers have replicated a log entry, the leader increase the _commitIndex_ by one and replies to the client. To prevent potential problems of unreliable network, the log entries are commited one by one in incremental order. + +tests + +There is a potential problem in the log replication. If a leader receives a command but fails to replicate it due to network failure, it then becomes the leader in later term, now it can replicate the command to other followers. Unfortunatelly, it fails again shortyly after commiting the entry. In the origin algorithm, if a server is elected as the leader but that server does not contain that command, it will override the entry, and results in some followers commiting same log entry twice but with different commands. It conflicts with Raft rules that commands commited will exist in following leaders. + +To prevent this, we extend the algorithm to not commit commands from older term immediatelly after a majority replicates. Instead, they are commited just after a command in current term to be commited. This ensures only server which contains the newest lon entry can be elected as the leader. + +tests + +tests + +```go +func (rf *Raft) doSubmit() { + /* ensure only one thread is running */ + if atomic.AddInt64(&rf.counter, 1) > 1 { + atomic.AddInt64(&rf.counter, -1) + return + } + for range rf.heartBeatTicker.C { + if !rf.running { + break + } + if rf.role != 2 { + break + } + for i:=0;i< len(rf.peers);i++{ + if i != rf.me { + go func(peer int) { + rf.mu.Lock() + index := rf.nextIndex[peer] + term := 0 + /* TODO: limit entries max size */ + entries := make([]map[string]int, 0) + if len(rf.logs) >= index { + entries = append(entries, rf.logs[index-1:]...) + } + if index > 1 { + //fmt.Println(index, rf.logs) + term = rf.logs[index - 2]["term"] + } + args := AppendEntriesArgs{Term: rf.currentTerm, LeaderId:rf.me, PrevLogIndex:index - 1, PrevLogTerm:term, Entries:entries, LeaderCommit:rf.commitIndex} + reply := AppendEntriesReply{} + rf.nextIndex[peer] = args.PrevLogIndex + len(entries) + 1 + rf.mu.Unlock() + + ok := rf.sendAppendEntries(peer, args, &reply) + + rf.mu.Lock() + if ok && args.Term == rf.currentTerm && rf.role == 2 { + rf.matchIndex[peer] = args.PrevLogIndex + len(entries) + /* update commitIndex */ + for iter:=rf.commitIndex; iter iter { + count++ + } + //fmt.Println(j, rf.matchIndex[j], count) + } + } + + if count * 2 > len(rf.peers){ + for i:=rf.commitIndex; i<=iter; i++ { + rf.commitIndex = i + 1 + command := rf.logs[i]["command"] + fmt.Println(rf.me, "says: ", command, "is committed, index=", i + 1) + rf.applyCh <- ApplyMsg{Index: i + 1, Command: command, UseSnapshot:false, Snapshot:rf.persister.ReadRaftState()} + } + } else { // commit in order + break + } + } + } + rf.mu.Unlock() + }(i) + } + } + } + fmt.Println(rf.me, "says: stop heart beat") + atomic.AddInt64(&rf.counter, -1) +} +``` + +```go +func (rf *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) { + rf.mu.Lock() + defer rf.mu.Unlock() + + if args.Term < rf.currentTerm { + reply.Success = false + } else { + reply.Success = true + rf.currentTerm = args.Term + rf.role = 0 + rf.votedFor = args.LeaderId + + if args.PrevLogIndex > 0 { + if len(rf.logs) >= args.PrevLogIndex && rf.logs[args.PrevLogIndex-1]["term"] == args.PrevLogTerm { + reply.Success = true + } else { + reply.Success = false + reply.Len = len(rf.logs) + rf.lastHeart = time.Now() + } + } + } + reply.Term = rf.currentTerm + if reply.Success { + rf.logs = rf.logs[0:args.PrevLogIndex] + //sync logs && apply + rf.logs = append(rf.logs, args.Entries...) + for iter:=rf.commitIndex; iter < args.LeaderCommit; iter++ { + command := rf.logs[iter]["command"] + rf.applyCh <- ApplyMsg{Index: iter + 1, Command:command, UseSnapshot:false, Snapshot:rf.persister.ReadRaftState()} + } + rf.commitIndex = args.LeaderCommit + rf.lastHeart = time.Now() + } + rf.persist() +} +``` + +```go +func (rf *Raft) sendAppendEntries(server int, args AppendEntriesArgs, reply *AppendEntriesReply) bool { + ok := rf.peers[server].Call("Raft.AppendEntries", args, reply) + if ok { + if reply.Term > rf.currentTerm { + rf.mu.Lock() + rf.currentTerm = reply.Term + rf.role = 0 + rf.votedFor = -1 + rf.persist() + rf.mu.Unlock() + } + if ok && rf.role == 2 && !reply.Success { + rf.nextIndex[server] = args.PrevLogIndex + if reply.Len < args.PrevLogIndex { + rf.nextIndex[server] = reply.Len + 1 + } + } + } + return ok && reply.Success +} +``` + +In some cases, a follower may lose log entries and as a result it can not simply override the log. In these cases, it replies false in the _appendEntries_ request, the leader will then decrease the _nextIndex_ and sends older log entries until the missing logs entries are fixed. + +However, a server may lose too many log entries. If we use the above strategy, it consumes a long time. To speed up the decreament, we add a parameter _len_ to the reply structure which means the log longth of follower. Then the leader can reset _nextIndex_ to _len_ to reduce the time re-trying. Or we can use a snapshot. + +# Validation + +There are a total 17 test case _TestInitialElection_, _TestReElection_, _TestBasicAgree_, _TestFailAgree_, _TestFailNoAgree_, _TestConcurrentStarts_, _TestRejoin_, _TestBackup_, _TestCount_, _TestPersist1_, _TestPersist2_, _TestPersist3_, _TestFigure8_, _TestUnreliableAgree_, _TestFigure8Unreliable_, _TestReliableChurn_ and _internalChurn_, ranging from normal state to unreliable network such as network delay, network partition, package loss, duplicated packages and reordering of packages. + +Run the tests many times and the result shows that our system passes all the test cases successfully. + +tests + +This is the output of _TestBasicAgree_ + +``` +0 says: hello world! +1 says: hello world! +2 says: hello world! +3 says: hello world! +4 says: hello world! +Test: basic agreement ... +1 says: I am not a leader +2 says: I am not a leader +3 says: I am not a leader +4 says: I am not a leader +0 says: I am not a leader +2 says: bye~ +0 says: stop heart beat +1 says: I am not a leader +2 says: I am not a leader +3 says: I am not a leader +4 says: I am not a leader +0 says: I am not a leader +0 says: bye~ +1 says: I am not a leader +2 says: I am not a leader +3 says: I am not a leader +4 says: I am not a leader +0 says: I am not a leader +1 says: bye~ +1 tells 2 : vote me, {28 1 0 0} +1 tells 0 : vote me, {28 1 0 0} +0 says: higher term detected, term= 28 +0 tells 1 : vote granted +2 says: higher term detected, term= 28 +2 tells 1 : vote granted +1 says: I am the leader in term 28 +1 says: stop heart beat +3 tells 0 : vote me, {1 3 0 0} +3 tells 4 : vote me, {1 3 0 0} +3 tells 1 : vote me, {1 3 0 0} +3 tells 2 : vote me, {1 3 0 0} +0 says: higher term detected, term= 1 +0 tells 3 : vote granted +4 says: higher term detected, term= 1 +4 tells 3 : vote granted +1 says: higher term detected, term= 1 +1 tells 3 : vote granted +2 says: higher term detected, term= 1 +2 tells 3 : vote granted +3 says: I am the leader in term 1 +3 tells 4 : ping, {1 3 0 0 [] 0} +3 tells 1 : ping, {1 3 0 0 [] 0} +3 tells 2 : ping, {1 3 0 0 [] 0} +3 tells 0 : ping, {1 3 0 0 [] 0} +4 tells 3 : pong, &{1 true} +2 tells 3 : pong, &{1 true} +1 tells 3 : pong, &{1 true} +0 tells 3 : pong, &{1 true} +1 says: I am not a leader +2 says: I am not a leader +3 says: new command 100 in term 1 +3 tells 2 : ping, {1 3 0 0 [map[command:100 term:1]] 0} +3 tells 0 : ping, {1 3 0 0 [map[command:100 term:1]] 0} +3 tells 1 : ping, {1 3 0 0 [map[command:100 term:1]] 0} +3 tells 4 : ping, {1 3 0 0 [map[command:100 term:1]] 0} +1 tells 3 : pong, &{1 true} +2 tells 3 : pong, &{1 true} +4 tells 3 : pong, &{1 true} +0 tells 3 : pong, &{1 true} +3 says: 100 is committed, index= 1 +3 tells 4 : ping, {1 3 1 1 [] 1} +3 tells 2 : ping, {1 3 1 1 [] 1} +3 tells 1 : ping, {1 3 1 1 [] 1} +3 tells 0 : ping, {1 3 1 1 [] 1} +2 says: commit 100 index= 1 +4 says: commit 100 index= 1 +2 tells 3 : pong, &{1 true} +4 tells 3 : pong, &{1 true} +1 says: commit 100 index= 1 +1 tells 3 : pong, &{1 true} +0 says: commit 100 index= 1 +0 tells 3 : pong, &{1 true} +1 says: I am not a leader +2 says: I am not a leader +3 says: new command 200 in term 1 +3 tells 1 : ping, {1 3 1 1 [map[command:200 term:1]] 1} +3 tells 2 : ping, {1 3 1 1 [map[command:200 term:1]] 1} +3 tells 4 : ping, {1 3 1 1 [map[command:200 term:1]] 1} +3 tells 0 : ping, {1 3 1 1 [map[command:200 term:1]] 1} +1 tells 3 : pong, &{1 true} +4 tells 3 : pong, &{1 true} +2 tells 3 : pong, &{1 true} +0 tells 3 : pong, &{1 true} +3 says: 200 is committed, index= 2 +3 tells 4 : ping, {1 3 2 1 [] 2} +3 tells 0 : ping, {1 3 2 1 [] 2} +3 tells 1 : ping, {1 3 2 1 [] 2} +3 tells 2 : ping, {1 3 2 1 [] 2} +0 says: commit 200 index= 2 +0 tells 3 : pong, &{1 true} +2 says: commit 200 index= 2 +4 says: commit 200 index= 2 +2 tells 3 : pong, &{1 true} +1 says: commit 200 index= 2 +1 tells 3 : pong, &{1 true} +4 tells 3 : pong, &{1 true} +1 says: I am not a leader +2 says: I am not a leader +3 says: new command 300 in term 1 +3 tells 1 : ping, {1 3 2 1 [map[command:300 term:1]] 2} +3 tells 2 : ping, {1 3 2 1 [map[term:1 command:300]] 2} +3 tells 0 : ping, {1 3 2 1 [map[command:300 term:1]] 2} +3 tells 4 : ping, {1 3 2 1 [map[command:300 term:1]] 2} +1 tells 3 : pong, &{1 true} +2 tells 3 : pong, &{1 true} +3 says: 300 is committed, index= 3 +4 tells 3 : pong, &{1 true} +0 tells 3 : pong, &{1 true} +3 tells 4 : ping, {1 3 3 1 [] 3} +3 tells 1 : ping, {1 3 3 1 [] 3} +3 tells 2 : ping, {1 3 3 1 [] 3} +3 tells 0 : ping, {1 3 3 1 [] 3} +4 says: commit 300 index= 3 +4 tells 3 : pong, &{1 true} +1 says: commit 300 index= 3 +2 says: commit 300 index= 3 +0 says: commit 300 index= 3 +1 tells 3 : pong, &{1 true} +2 tells 3 : pong, &{1 true} +0 tells 3 : pong, &{1 true} + ... Passed + +``` + + +# Conclusion +This expriment implements the rest parts of Raft and then makes a fully test on the whole system. The result shows that the cluster quickly generates a leader and remains the normal state until a failure, and after the failure the cluster can re-elect a new leader in a short time. Even in some extremely bad network situations, the system can tolerant the unreliable network and works well. This expriment proves the reliablity of Raft algorithm in another way. + +What's more, this expriment shows that test-driven development has great value, it can expose potential problems which is not easy to find by code review. + +_* The full and up-to-date code is hosted at https://github.com/newnius/NJU-DisSys-2017_ + +# References + +[1] Ongaro D, Ousterhout J K. In search of an understandable consensus algorithm[C]//USENIX Annual Technical Conference. 2014: 305-319. + +[2] [Raft](http://thesecretlivesofdata.com/raft/) + +[3] [Go by Example: Atomic Counters](https://gobyexample.com/atomic-counters) + +[4] [Go by Example: Non-Blocking Channel Operations](https://gobyexample.com/non-blocking-channel-operations) + +[5] [Go by Example: Timers and Tickers](https://mmcgrana.github.io/2012/09/go-by-example-timers-and-tickers.html) + +[6] [time: Timer.Reset is not possible to use correctly](https://github.com/golang/go/issues/14038) + +[7] [论golang Timer Reset方法使用的正确姿势](http://tonybai.com/2016/12/21/how-to-use-timer-reset-in-golang-correctly/) + +[8] [Go Channel 详解](http://colobu.com/2016/04/14/Golang-Channels/) + +[9] [Go 语言切片(Slice)](http://www.runoob.com/go/go-slice.html) + +[10] [Go 语言函数方法](https://wizardforcel.gitbooks.io/w3school-go/content/20-5.html) + +[11] [Golang初学者易犯的三种错误](http://www.jianshu.com/p/e737cb26c141) + +[12] [Not able to install go lang plugin on intellij 2017.2 community edition](https://github.com/go-lang-plugin-org/go-lang-idea-plugin/issues/2897) diff --git a/src/rpc/README.md b/src/rpc/README.md new file mode 100644 index 0000000..621572c --- /dev/null +++ b/src/rpc/README.md @@ -0,0 +1,35 @@ +# How to run +## Server + +```bash +./server +``` + +## Client +go +```bash +./client +``` + +python +```bash +python ntp.py +``` +The client output as follow: + +``` +Current: 10/23 20:19:05 +Current: 10/23 20:19:06 +Current: 10/23 20:19:07 +Current: 10/23 20:19:08 +Current: 10/23 20:19:09 +Current: 10/23 20:19:10 +RPC call cost 1.482382ms +DEBUG: synced +Current: 10/23 20:20:51 +Current: 10/23 20:20:52 +Current: 10/23 20:20:53 +``` + +__Notice__ +The address is set to ntp.newnius.com, you have to add `127.0.0.1ntp.newnius.com` in `/etc/hosts` or change the address to localhost in client. diff --git a/src/rpc/ntp.md b/src/rpc/ntp.md new file mode 100644 index 0000000..8ce85bd --- /dev/null +++ b/src/rpc/ntp.md @@ -0,0 +1,411 @@ +

Network Time Synchronization based on RPC

+
Shuai Liu, MG1733039
+
+ +# Abstract +It is vital in some cases to sync time with server. This expriment uses RPC call to realize network time synchronization. + +## keywords +RPC, Time Synchronization, Go + +# Introduction +There are many cases that time synchonization is more or less significant. For some competitive online games, it would be a mess if various clients hold inconsistant time. In this expriment, we try to design and realize a network time syncronization in the architecture of C/S. + +We have a plenty of communication methods between server and clients to choose from. Considering convenient and scalability, and finally RPC is selected. Mature RPC fragments have the advantages of service discovery, retry on failure, load balance which is rather helpful. As to the develop language, go is choosen since it is designed for such high concurrency distributed appliations. + + +# Design & Realization +There are several requirements from this network time syncronization application. First of all, it should be language independent and cross-platform which means the server is able to support different kinds of clients running in various platforms. Secondly, the server can serve a quantity of clients at the same time. Thirdly, in this application, only authorized clients could get time from server. Except for the requirements above, we have more things to take into consideration such as network delay, network failure, scalability, etc. + +After research, we found some available packages of RPC in go language and after comperison gRPC turned out to be the best choice from our application. + +||rpc|rpc/http|rpcx|grpc|thrift| +|---|---|---|---|---|---| +|Language independent | N | N | N | Y | Y | +|Authorization| N | N | Y | Y | N | + +## Communication delay & failure +It is rather common for an online application that the network latency is too high to be ignored and even package lose. This problem is more obvious for our network time synchonization application. However, it is not easy to solve this problem since the network changes unpredictedly all the time. In our expriment, we don't want to have a extremely precise result and can tolerate a second level of error. + +To do this, we record the local time before and after making RPC call which we call `start` and `stop`, and obviously `stop - start` is the time need for one RPC call. We assume that processing time in both sides can be ignored and the network changes little during the request. So we can calculate the time to reach another side simply by dividing the difference by 2. + +```go +start := time.Now().UnixNano() + +// do RPC call + +stop := time.Now().UnixNano() + +delay := ( stop - start ) / 2; +``` + +Then, we can get the real time by subtracting delay from the time replied. + +```go +realtime = reply.time - delay +``` + +## Setup Environment + +## Install Go +```bash +# Download +wget https://storage.googleapis.com/golang/go1.9.1.linux-amd64.tar.gz + +# Extract +sudo tar -C /usr/local/ -xzf go1.9.1.linux-amd64.tar.gz + +# Configure system environments of go +sudo bash -c 'cat >> /etc/profile <> /etc/profile < + +Fault Tolerance + +## Cross-platform & Language-independent + +Since we don't have Windows system in hand, so all the work were made in Linux. But consider that python is a cross-platform language so we can say that the client can run on most platforms. + +Language-independent + +## Authorization & Security +In our tests, only clients using the right method and certificate files can get response from server. + +Security + +## Concurrency Performance +We deployed 6 VPSs to perform as clients and one as server. The resource of VPSs ranges from 1 Core/0.5G Ram to 1 Core/2G Ram, and the server is deployed in a 1 core/2G Ram VPS. All of them are in system Linux without desktop and most of the resources are vacant. To better simulate the real environment, the VPSs are located around the world. + +First of all, we tried to run as many as possible clients in the 6 nodes, each client would send a query requst every 5 seconds. The total number of client is about 2000. + +Client Numbers + +Client Numbers + +After that, we run a client in our develop machine, and the log showed that the RPC call is made successfully. The figures below shows that under the concurrency of 1000, the server still works perfect and can hold more clients. + +Establised connections +
+Server load + +Server load +
+Server load + +# Future Work +There are still a lot of work to do with this application. + +Although the communication delay is considered, it is not that precise for higher requirements. To achieve this goal, we can apply more approaches to fix this with more than one query. Secondly, in linux system, each process can only have at most 1024 active connections, which means only 1024 clients can be served as the same time. What's more, each connection may consume more resource than needed. These limitations can be solved by changing the settings. + +What's more, gRPC uses `HTTP`, which is based on `TCP` in communication, this improves performance for frequent queries. But in our case, the request may be very slow and failure is acceptable, so we can use UDP to serve more clients. + +# Conclusion +This expriment shows that RPC is a good fragment to make communications between server and client. With the help of a muture fragement, we can easily build a high available online application. + +# References + +[1] [Getting Started - The Go Programming Language](https://golang.org/doc/install) + +[2] [c - Error on trying to run a simple RPC program - Stack Overflow](https://stackoverflow.com/questions/10448696/error-on-trying-to-run-a-simple-rpc-program) + +[3] [RPC: Unknown Protocol - Stack Overflow](https://stackoverflow.com/questions/3818624/rpc-unknown-protocol) + +[4] [Go官方库RPC开发指南](http://colobu.com/2016/09/18/go-net-rpc-guide/) + +[5] [golang与java间的json-rpc跨语言调用](http://www.cnblogs.com/geomantic/p/4751859.html) + +[6] [Go语言内部rpc简单实例,实现python调用go的jsonrpc小实例](http://blog.csdn.net/fyxichen/article/details/46998101) + +[7] [GO语言RPC调用方案调研](https://scguoi.github.io/DivisionByZero/2016/11/15/GO%E8%AF%AD%E8%A8%80RPC%E6%96%B9%E6%A1%88%E8%B0%83%E7%A0%94.html) + +[8] [gRPC 初探 - phper成长之路 - SegmentFault](https://segmentfault.com/a/1190000011483346) + +[9] [Google 开源 RPC 框架 gRPC 初探](http://blog.jrwang.me/2016/grpc-at-first-view/) + +[10] [Authentication in gRPC](http://mycodesmells.com/post/authentication-in-grpc) + +[11] [Golang gRPC实践 连载四 gRPC认证](https://my.oschina.net/ifraincoat/blog/879545) + +[12] [Secure gRPC with TLS/SSL · Libelli](https://bbengfort.github.io/programmer/2017/03/03/secure-grpc.html) + +[13] [开源RPC(gRPC/Thrift)框架性能评测](http://www.eit.name/blog/read.php?566) + +[14] [Go RPC 开发指南](https://smallnest.gitbooks.io/go-rpc/content/) + +[15] [grpc / Authentication](https://grpc.io/docs/guides/auth.html) + +[16] [Unix系统下的连接数限制](http://www.jianshu.com/p/faccda312cfe) + +[17] [Linux下高并发socket最大连接数所受的各种限制](http://blog.sae.sina.com.cn/archives/1988) diff --git a/src/rpc/ntp.py b/src/rpc/ntp.py new file mode 100644 index 0000000..4f794bd --- /dev/null +++ b/src/rpc/ntp.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +import grpc +import ntp_pb2 +import datetime, time, threading + +import ntp +ntp.delay = 0.0 + +running = True + +def sync(): + try: + creds = grpc.ssl_channel_credentials(open('server-cert.pem').read()) + channel = grpc.secure_channel('ntp.newnius.com:8844', creds) + + #channel = grpc.insecure_channel('ntp.newnius.com:8844') + + stub = ntp_pb2.NtpStub(channel) + + request = ntp_pb2.NtpRequest() + while running: + start = round(time.time() * 1000) + #print start / 1000 + #time.sleep(5) + reply = stub.Query(request) + #time.sleep(5) + m = ( round(time.time() * 1000) - start ) / 2 + print "RPC call used: ", m * 2, "ms" + #print start + #print reply.message / float(1000000) + #m = 0 + ntp.delay = ( reply.message / float(1000000) - start - m ) / float(1000) + #print ntp.delay + print "DEBUG: synced" + time.sleep(5) + except Exception as e: + print "error occured: ", e + +def tick(): + while True: + current = datetime.datetime.now() + datetime.timedelta(0, ntp.delay) + print "Current: ", current + time.sleep(1) + +if __name__ == '__main__': + try: + #sync() + t = threading.Thread(target=sync) + t.start() + tick() + except KeyboardInterrupt: + running = False