mirror of
https://github.com/newnius/YAO-scheduler.git
synced 2025-12-15 08:16:43 +00:00
support stop
This commit is contained in:
@@ -3,7 +3,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type AllocatorFIFO struct {
|
type AllocatorFIFO struct {
|
||||||
history []*Job
|
history []*Job
|
||||||
@@ -127,6 +127,14 @@ func (allocator *AllocatorFIFO) status(jobName string) MsgJobStatus {
|
|||||||
return jm.status()
|
return jm.status()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (allocator *AllocatorFIFO) stop(jobName string) MsgStop {
|
||||||
|
jm, ok := allocator.jobs[jobName]
|
||||||
|
if !ok {
|
||||||
|
return MsgStop{Code: 1, Error: "Job not exist!"}
|
||||||
|
}
|
||||||
|
return jm.stop()
|
||||||
|
}
|
||||||
|
|
||||||
func (allocator *AllocatorFIFO) logs(jobName string, taskName string) MsgLog {
|
func (allocator *AllocatorFIFO) logs(jobName string, taskName string) MsgLog {
|
||||||
jm, ok := allocator.jobs[jobName]
|
jm, ok := allocator.jobs[jobName]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|||||||
@@ -162,3 +162,18 @@ func (jm *JobManager) status() MsgJobStatus {
|
|||||||
|
|
||||||
return MsgJobStatus{Status: tasksStatus}
|
return MsgJobStatus{Status: tasksStatus}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (jm *JobManager) stop() MsgStop {
|
||||||
|
for _, taskStatus := range jm.jobStatus.tasks {
|
||||||
|
spider := Spider{}
|
||||||
|
spider.Method = "POST"
|
||||||
|
spider.URL = "http://" + taskStatus.Node + ":8000/stop?id=" + taskStatus.Id
|
||||||
|
spider.do()
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range jm.resources {
|
||||||
|
jm.allocator.returnResource(jm.resources[i])
|
||||||
|
}
|
||||||
|
jm.allocator.finish(&jm.job)
|
||||||
|
return MsgStop{Code: 0}
|
||||||
|
}
|
||||||
|
|||||||
14
src/main.go
14
src/main.go
@@ -15,7 +15,7 @@ var pool *ResourcePool
|
|||||||
var allocator *AllocatorFIFO
|
var allocator *AllocatorFIFO
|
||||||
|
|
||||||
func serverAPI(w http.ResponseWriter, r *http.Request) {
|
func serverAPI(w http.ResponseWriter, r *http.Request) {
|
||||||
var nodes []int
|
var nodes []string
|
||||||
for id := range pool.nodes {
|
for id := range pool.nodes {
|
||||||
nodes = append(nodes, id)
|
nodes = append(nodes, id)
|
||||||
}
|
}
|
||||||
@@ -28,7 +28,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
|
|||||||
break
|
break
|
||||||
|
|
||||||
case "resource_get_by_node":
|
case "resource_get_by_node":
|
||||||
id := str2int(r.URL.Query().Get("id"), -1)
|
id := r.URL.Query().Get("id")
|
||||||
js, _ := json.Marshal(pool.getByID(id))
|
js, _ := json.Marshal(pool.getByID(id))
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
w.Write(js)
|
w.Write(js)
|
||||||
@@ -57,9 +57,15 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
|
|||||||
w.Write(js)
|
w.Write(js)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
case "job_stop":
|
||||||
|
fmt.Println("job_stop")
|
||||||
|
js, _ := json.Marshal(allocator.stop(string(r.PostFormValue("id"))))
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.Write(js)
|
||||||
|
break
|
||||||
|
|
||||||
case "task_logs":
|
case "task_logs":
|
||||||
fmt.Println("task_logs")
|
fmt.Println("task_logs")
|
||||||
fmt.Println(r.URL.Query().Get("id"))
|
|
||||||
js, _ := json.Marshal(allocator.logs(r.URL.Query().Get("job"), r.URL.Query().Get("task")))
|
js, _ := json.Marshal(allocator.logs(r.URL.Query().Get("job"), r.URL.Query().Get("task")))
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
w.Write(js)
|
w.Write(js)
|
||||||
@@ -87,7 +93,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
pool = &ResourcePool{}
|
pool = &ResourcePool{}
|
||||||
pool.nodes = make(map[int]NodeStatus)
|
pool.nodes = make(map[string]NodeStatus)
|
||||||
|
|
||||||
allocator = &AllocatorFIFO{}
|
allocator = &AllocatorFIFO{}
|
||||||
allocator.start()
|
allocator.start()
|
||||||
|
|||||||
@@ -2,12 +2,12 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ResourcePool struct {
|
type ResourcePool struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
|
||||||
nodes map[int]NodeStatus
|
nodes map[string]NodeStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pool *ResourcePool) update(node NodeStatus) {
|
func (pool *ResourcePool) update(node NodeStatus) {
|
||||||
@@ -27,7 +27,7 @@ func (pool *ResourcePool) update(node NodeStatus) {
|
|||||||
//log.Println(pool.nodes)
|
//log.Println(pool.nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pool *ResourcePool) getByID(id int) NodeStatus {
|
func (pool *ResourcePool) getByID(id string) NodeStatus {
|
||||||
pool.mu.Lock()
|
pool.mu.Lock()
|
||||||
defer pool.mu.Unlock()
|
defer pool.mu.Unlock()
|
||||||
|
|
||||||
|
|||||||
@@ -21,6 +21,11 @@ type MsgSubmit struct {
|
|||||||
Error string `json:"error"`
|
Error string `json:"error"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MsgStop struct {
|
||||||
|
Code int `json:"code"`
|
||||||
|
Error string `json:"error"`
|
||||||
|
}
|
||||||
|
|
||||||
type MsgSummary struct {
|
type MsgSummary struct {
|
||||||
Code int `json:"code"`
|
Code int `json:"code"`
|
||||||
Error string `json:"error"`
|
Error string `json:"error"`
|
||||||
@@ -93,7 +98,7 @@ type GPUStatus struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type NodeStatus struct {
|
type NodeStatus struct {
|
||||||
ClientID int `json:"code"`
|
ClientID string `json:"id"`
|
||||||
ClientHost string `json:"host"`
|
ClientHost string `json:"host"`
|
||||||
Status []GPUStatus `json:"status"`
|
Status []GPUStatus `json:"status"`
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user