From 549e559a73663d3e9e521d28161b796a73ba76ad Mon Sep 17 00:00:00 2001 From: Newnius Date: Thu, 18 Apr 2019 17:25:37 +0800 Subject: [PATCH] support stop --- src/AllocatorFIFO.go | 10 +++++++++- src/job_manager.go | 15 +++++++++++++++ src/main.go | 14 ++++++++++---- src/resource_pool.go | 6 +++--- src/util.go | 7 ++++++- 5 files changed, 43 insertions(+), 9 deletions(-) diff --git a/src/AllocatorFIFO.go b/src/AllocatorFIFO.go index 42b2fe7..c9858e9 100644 --- a/src/AllocatorFIFO.go +++ b/src/AllocatorFIFO.go @@ -3,7 +3,7 @@ package main import ( "sync" "time" -) + ) type AllocatorFIFO struct { history []*Job @@ -127,6 +127,14 @@ func (allocator *AllocatorFIFO) status(jobName string) MsgJobStatus { return jm.status() } +func (allocator *AllocatorFIFO) stop(jobName string) MsgStop { + jm, ok := allocator.jobs[jobName] + if !ok { + return MsgStop{Code: 1, Error: "Job not exist!"} + } + return jm.stop() +} + func (allocator *AllocatorFIFO) logs(jobName string, taskName string) MsgLog { jm, ok := allocator.jobs[jobName] if !ok { diff --git a/src/job_manager.go b/src/job_manager.go index 9edca8f..1d06be5 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -162,3 +162,18 @@ func (jm *JobManager) status() MsgJobStatus { return MsgJobStatus{Status: tasksStatus} } + +func (jm *JobManager) stop() MsgStop { + for _, taskStatus := range jm.jobStatus.tasks { + spider := Spider{} + spider.Method = "POST" + spider.URL = "http://" + taskStatus.Node + ":8000/stop?id=" + taskStatus.Id + spider.do() + } + + for i := range jm.resources { + jm.allocator.returnResource(jm.resources[i]) + } + jm.allocator.finish(&jm.job) + return MsgStop{Code: 0} +} diff --git a/src/main.go b/src/main.go index 9d77725..15cf074 100644 --- a/src/main.go +++ b/src/main.go @@ -15,7 +15,7 @@ var pool *ResourcePool var allocator *AllocatorFIFO func serverAPI(w http.ResponseWriter, r *http.Request) { - var nodes []int + var nodes []string for id := range pool.nodes { nodes = append(nodes, id) } @@ -28,7 +28,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { break case "resource_get_by_node": - id := str2int(r.URL.Query().Get("id"), -1) + id := r.URL.Query().Get("id") js, _ := json.Marshal(pool.getByID(id)) w.Header().Set("Content-Type", "application/json") w.Write(js) @@ -57,9 +57,15 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { w.Write(js) break + case "job_stop": + fmt.Println("job_stop") + js, _ := json.Marshal(allocator.stop(string(r.PostFormValue("id")))) + w.Header().Set("Content-Type", "application/json") + w.Write(js) + break + case "task_logs": fmt.Println("task_logs") - fmt.Println(r.URL.Query().Get("id")) js, _ := json.Marshal(allocator.logs(r.URL.Query().Get("job"), r.URL.Query().Get("task"))) w.Header().Set("Content-Type", "application/json") w.Write(js) @@ -87,7 +93,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { func main() { pool = &ResourcePool{} - pool.nodes = make(map[int]NodeStatus) + pool.nodes = make(map[string]NodeStatus) allocator = &AllocatorFIFO{} allocator.start() diff --git a/src/resource_pool.go b/src/resource_pool.go index 3fde85b..daea469 100644 --- a/src/resource_pool.go +++ b/src/resource_pool.go @@ -2,12 +2,12 @@ package main import ( "sync" -) + ) type ResourcePool struct { mu sync.Mutex - nodes map[int]NodeStatus + nodes map[string]NodeStatus } func (pool *ResourcePool) update(node NodeStatus) { @@ -27,7 +27,7 @@ func (pool *ResourcePool) update(node NodeStatus) { //log.Println(pool.nodes) } -func (pool *ResourcePool) getByID(id int) NodeStatus { +func (pool *ResourcePool) getByID(id string) NodeStatus { pool.mu.Lock() defer pool.mu.Unlock() diff --git a/src/util.go b/src/util.go index ce785b8..e57f1c0 100644 --- a/src/util.go +++ b/src/util.go @@ -21,6 +21,11 @@ type MsgSubmit struct { Error string `json:"error"` } +type MsgStop struct { + Code int `json:"code"` + Error string `json:"error"` +} + type MsgSummary struct { Code int `json:"code"` Error string `json:"error"` @@ -93,7 +98,7 @@ type GPUStatus struct { } type NodeStatus struct { - ClientID int `json:"code"` + ClientID string `json:"id"` ClientHost string `json:"host"` Status []GPUStatus `json:"status"` }