From 66b4468c74fd74f16ea42a4d99f3904d326a14e4 Mon Sep 17 00:00:00 2001 From: Newnius Date: Wed, 20 Mar 2019 11:14:07 +0800 Subject: [PATCH] update --- src/AllocatorFIFO.go | 153 +++++++++++++++++++++++++++++++++++++++ src/AllocatorParallel.go | 38 ++++++++++ src/collector.go | 2 +- src/job_manager.go | 136 +++++++++++++++++++++++++++++++--- src/main.go | 44 +++++++++-- src/resource_pool.go | 18 ++++- src/spider.go | 68 +++++++++++++++++ src/util.go | 108 ++++++++++++++++++++++++++- 8 files changed, 541 insertions(+), 26 deletions(-) create mode 100644 src/AllocatorFIFO.go create mode 100644 src/AllocatorParallel.go create mode 100644 src/spider.go diff --git a/src/AllocatorFIFO.go b/src/AllocatorFIFO.go new file mode 100644 index 0000000..20ac82e --- /dev/null +++ b/src/AllocatorFIFO.go @@ -0,0 +1,153 @@ +package main + +import ( + "sync" + "time" + "log" + "io/ioutil" + "encoding/json" +) + +type AllocatorFIFO struct { + queue []Job + mu sync.Mutex + scheduling sync.Mutex +} + +func (allocator *AllocatorFIFO) start() { + go func() { + for { + //fmt.Print("Scheduling ") + time.Sleep(time.Second * 5) + allocator.scheduling.Lock() + allocator.mu.Lock() + if len(allocator.queue) > 0 { + + jm := JobManager{} + jm.job = allocator.queue[0] + allocator.queue = allocator.queue[1:] + jm.allocator = allocator + go func() { + jm.start() + }() + } else { + allocator.scheduling.Unlock() + } + allocator.mu.Unlock() + } + }() +} + +func (allocator *AllocatorFIFO) ack() { + allocator.scheduling.Unlock() +} + +func (allocator *AllocatorFIFO) schedule(job Job) { + allocator.mu.Lock() + defer allocator.mu.Unlock() + + allocator.queue = append(allocator.queue, job) +} + +func (allocator *AllocatorFIFO) requestResource(task Task) MsgAgent { + pool.mu.Lock() + defer pool.mu.Unlock() + + res := MsgAgent{} + for id, node := range pool.nodes { + var available []NodeStatus + for _, status := range node { + if status.MemoryAllocated == 0 { + available = append(available, status) + } + } + if len(available) >= task.NumberGPU { + res.ClientID = id + res.Status = available[0:task.NumberGPU] + + for i := range res.Status { + for j := range node { + if res.Status[i].UUID == node[j].UUID { + node[j].MemoryAllocated = task.MemoryGPU + } + } + } + } + } + return res +} + +func (allocator *AllocatorFIFO) returnResource(agent MsgAgent) { + pool.mu.Lock() + defer pool.mu.Unlock() + nodes := pool.nodes[agent.ClientID] + for _, gpu := range agent.Status { + for j := range nodes { + if gpu.UUID == nodes[j].UUID { + nodes[j].MemoryAllocated = 0 + } + } + } +} + +func (allocator *AllocatorFIFO) status(jobName string) MsgJobStatus { + + var tasksStatus []TaskStatus + tasksStatus = append(tasksStatus, TaskStatus{Id: "8b9b665fc4f1"}) + tasksStatus = append(tasksStatus, TaskStatus{Id: "4a4aeee2c5f9"}) + + for i, taskStatus := range tasksStatus { + spider := Spider{} + spider.Method = "GET" + spider.URL = "http://kafka_node1:8000/status?id=" + taskStatus.Id + + err := spider.do() + if err != nil { + continue + } + + resp := spider.getResponse() + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + continue + } + + var res MsgTaskStatus + err = json.Unmarshal([]byte(string(body)), &res) + if err != nil { + continue + } + tasksStatus[i] = res.Status + } + + return MsgJobStatus{Status: tasksStatus} +} + +func (allocator *AllocatorFIFO) logs(taskName string) MsgLog { + spider := Spider{} + spider.Method = "GET" + spider.URL = "http://kafka_node1:8000/logs?id=" + taskName + + err := spider.do() + if err != nil { + return MsgLog{Code: 1, Error: err.Error()} + } + + resp := spider.getResponse() + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return MsgLog{Code: 1, Error: err.Error()} + } + + var res MsgLog + err = json.Unmarshal([]byte(string(body)), &res) + if err != nil { + log.Println(err) + return MsgLog{Code: 1, Error: "Unknown"} + } + return res +} diff --git a/src/AllocatorParallel.go b/src/AllocatorParallel.go new file mode 100644 index 0000000..1c7a1a3 --- /dev/null +++ b/src/AllocatorParallel.go @@ -0,0 +1,38 @@ +package main + +type AllocatorParallel struct { +} + +func (allocator *AllocatorParallel) requestResource(task Task) MsgAgent { + res := MsgAgent{} + for id, node := range pool.nodes { + var available []NodeStatus + for _, status := range node { + if status.MemoryAllocated == 0 { + available = append(available, status) + } + } + if len(available) >= task.NumberGPU { + res.ClientID = id + res.Status = available[0:task.NumberGPU] + + for i := range res.Status { + for j := range node { + if res.Status[i].UUID == node[j].UUID { + node[j].MemoryAllocated = task.MemoryGPU + } + } + } + } + } + return res +} + +func (allocator *AllocatorParallel) returnResource(agent MsgAgent) { + nodes := pool.nodes[agent.ClientID] + for i, gpu := range agent.Status { + if gpu.UUID == nodes[i].UUID { + nodes[i].MemoryAllocated = 0 + } + } +} diff --git a/src/collector.go b/src/collector.go index 5646190..eafc833 100644 --- a/src/collector.go +++ b/src/collector.go @@ -12,7 +12,7 @@ var ( ) func start(pool *ResourcePool) { - consumer, err := sarama.NewConsumer([]string{"kafka_node1:9092", "kafka_node2:9093", "kafka_node3:9094"}, nil) + consumer, err := sarama.NewConsumer([]string{"kafka_node1:9091", "kafka_node2:9092", "kafka_node3:9093"}, nil) if err != nil { panic(err) } diff --git a/src/job_manager.go b/src/job_manager.go index 180e34f..d2ab8c3 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -3,25 +3,143 @@ package main import ( "time" "log" + "net/url" + "strings" + "io/ioutil" + "encoding/json" ) type JobManager struct { + allocator *AllocatorFIFO + job Job + jobStatus JobStatus + resources []MsgAgent } -func (jm *JobManager) start(id int) { - log.Println("start job ", id) - /* request for resource */ +func (jm *JobManager) start() { + log.Println("start job ", jm.job.Name) + + /* request for resources */ + for i := range jm.job.Tasks { + var resource MsgAgent + for { + resource = jm.allocator.requestResource(jm.job.Tasks[i]) + if len(resource.Status) > 0 { + break + } + } + log.Println("Receive resource", resource) + jm.resources = append(jm.resources, resource) + } /* bring up containers */ + for i := range jm.job.Tasks { + v := url.Values{} + v.Set("image", jm.job.Image) + v.Set("cmd", jm.job.Tasks[i].Cmd) + resp, err := doRequest("POST", "http://kafka_node1:8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") + if err != nil { + log.Println(err) + return + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Println(err) + return + } + + log.Println(string(body)) + + var res MsgCreate + err = json.Unmarshal([]byte(string(body)), &res) + if err != nil { + log.Println(err) + return + } + + jm.jobStatus.tasks[jm.job.Tasks[i].Name] = TaskStatus{Id: res.Id} + } + + jm.allocator.ack() /* monitor job execution */ for { - log.Println("executing job ", id) - time.Sleep(time.Second * 5) + res := jm.status() + for i := range res.Status { + if res.Status[i].Status == "running" { + log.Println(jm.job.Name, "-", i, " is running") + } else { + log.Println(jm.job.Name, "-", i, " ", res.Status[i].Status) + + /* save logs etc. */ + + /* return resource */ + jm.allocator.returnResource(jm.resources[i]) + } + } + time.Sleep(time.Second * 10) } - /* save logs etc. */ - - /* return resource */ - log.Println("finish job", id) + log.Println("finish job", jm.job.Name) +} + +func (jm *JobManager) logs(taskName string) MsgLog { + spider := Spider{} + spider.Method = "GET" + spider.URL = "http://127.0.0.1:8000/logs?id=" + taskName + + err := spider.do() + if err != nil { + return MsgLog{Code: 1, Error: err.Error()} + } + + resp := spider.getResponse() + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return MsgLog{Code: 1, Error: err.Error()} + } + + var res MsgLog + err = json.Unmarshal([]byte(string(body)), &res) + if err != nil { + log.Println(err) + return MsgLog{Code: 1, Error: "Unknown"} + } + return res +} + +func (jm *JobManager) status() MsgJobStatus { + + var tasksStatus []TaskStatus + for _, taskStatus := range jm.jobStatus.tasks { + spider := Spider{} + spider.Method = "GET" + spider.URL = "http://127.0.0.1:8000/status?id=" + taskStatus.Id + + err := spider.do() + if err != nil { + continue + } + + resp := spider.getResponse() + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + continue + } + + var res MsgTaskStatus + err = json.Unmarshal([]byte(string(body)), &res) + if err != nil { + continue + } + tasksStatus = append(tasksStatus, res.Status) + } + + return MsgJobStatus{Status: tasksStatus} } diff --git a/src/main.go b/src/main.go index 0e42a92..261244c 100644 --- a/src/main.go +++ b/src/main.go @@ -5,40 +5,64 @@ import ( "net/http" "log" "encoding/json" + "fmt" ) var addr = flag.String("addr", ":8080", "http service address") var pool *ResourcePool +var allocator *AllocatorFIFO + func serverAPI(w http.ResponseWriter, r *http.Request) { - nodes := make([]int, 1) + var nodes []int for id := range pool.nodes { nodes = append(nodes, id) } switch r.URL.Query().Get("action") { - case "host_gets": + case "node_gets": js, _ := json.Marshal(nodes) w.Header().Set("Content-Type", "application/json") w.Write(js) break + case "resource_get_by_node": id := str2int(r.URL.Query().Get("id"), -1) js, _ := json.Marshal(pool.getByID(id)) w.Header().Set("Content-Type", "application/json") w.Write(js) break + case "job_submit": - jm := &JobManager{} - id := str2int(r.URL.Query().Get("id"), -1) - go func() { - jm.start(id) - }() + var job Job + fmt.Println("job_submit") + err := json.Unmarshal([]byte(string(r.PostFormValue("job"))), &job) + if err != nil { + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(err.Error())) + return + } + allocator.schedule(job) js, _ := json.Marshal(nodes) w.Header().Set("Content-Type", "application/json") w.Write(js) break + + case "job_status": + fmt.Println("job_status") + js, _ := json.Marshal(allocator.status(r.URL.Query().Get("id"))) + w.Header().Set("Content-Type", "application/json") + w.Write(js) + break + + case "task_logs": + fmt.Println("task_logs") + fmt.Println(r.URL.Query().Get("id")) + js, _ := json.Marshal(allocator.logs(r.URL.Query().Get("id"))) + w.Header().Set("Content-Type", "application/json") + w.Write(js) + break default: http.Error(w, "Not Found", http.StatusNotFound) break @@ -47,7 +71,11 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { func main() { pool = &ResourcePool{} - pool.nodes = make(map[int][]Status) + pool.nodes = make(map[int][]NodeStatus) + + allocator = &AllocatorFIFO{} + allocator.start() + go func() { start(pool) }() diff --git a/src/resource_pool.go b/src/resource_pool.go index c822fc0..1e9e6b7 100644 --- a/src/resource_pool.go +++ b/src/resource_pool.go @@ -2,22 +2,32 @@ package main import ( "sync" -) + ) type ResourcePool struct { mu sync.Mutex - nodes map[int][]Status + nodes map[int][]NodeStatus } func (pool *ResourcePool) update(node MsgAgent) { pool.mu.Lock() defer pool.mu.Unlock() + status, ok := pool.nodes[node.ClientID] + if ok { + for i := range status { + if status[i].UUID == node.Status[i].UUID { + node.Status[i].MemoryAllocated = status[i].MemoryAllocated + } + } + } pool.nodes[node.ClientID] = node.Status + + //log.Println(pool.nodes) } -func (pool *ResourcePool) getByID(id int) []Status { +func (pool *ResourcePool) getByID(id int) []NodeStatus { pool.mu.Lock() defer pool.mu.Unlock() @@ -25,5 +35,5 @@ func (pool *ResourcePool) getByID(id int) []Status { if ok { return status } - return []Status{} + return []NodeStatus{} } diff --git a/src/spider.go b/src/spider.go new file mode 100644 index 0000000..52b8306 --- /dev/null +++ b/src/spider.go @@ -0,0 +1,68 @@ +package main + +import ( + "net/http" + "math/rand" + "time" + "net/url" + "strings" +) + +type Spider struct { + UserAgent string + Method string + URL string + ContentType string + Referer string + Data url.Values + + Response *http.Response +} + +func (spider *Spider) do() error { + client := &http.Client{} + req, err := http.NewRequest(spider.Method, spider.URL, strings.NewReader(spider.Data.Encode())) + if err != nil { + return err + } + + if len(spider.ContentType) > 0 { + req.Header.Set("Content-Type", spider.ContentType) + } + + if len(spider.UserAgent) == 0 { + req.Header.Set("User-Agent", getUA()) + } + + if len(spider.Referer) > 0 { + req.Header.Set("Referer", spider.Referer) + } + + spider.Response, err = client.Do(req) + if err != nil { + return err + } + return nil +} + +func (spider *Spider) getResponse() *http.Response { + return spider.Response +} + +func (spider *Spider) getUA() string { + rand.Seed(time.Now().Unix()) + UAs := []string{ + "Mozilla/5.0 (X11; Linux i686; rv:64.0) Gecko/20100101 Firefox/64.0", + "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:64.0) Gecko/20100101 Firefox/64.0", + "Mozilla/5.0 (X11; Linux i586; rv:63.0) Gecko/20100101 Firefox/63.0", + "Mozilla/5.0 (Windows NT 6.2; WOW64; rv:63.0) Gecko/20100101 Firefox/63.0", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:10.0) Gecko/20100101 Firefox/62.0", + "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10.13; ko; rv:1.9.1b2) Gecko/20081201 Firefox/60.0", + "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) Gecko/20100101 Firefox/58.0", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML like Gecko) Chrome/51.0.2704.79 Safari/537.36 Edge/14.14931", + "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.111 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_2) AppleWebKit/601.3.9 (KHTML, like Gecko) Version/9.0.2 Safari/601.3.9", + } + return UAs[rand.Intn(len(UAs))] +} diff --git a/src/util.go b/src/util.go index af1cfc1..f424e09 100644 --- a/src/util.go +++ b/src/util.go @@ -2,15 +2,58 @@ package main import ( "strconv" + "math/rand" + "time" + "io" + "net/http" ) -type Status struct { +type MsgLog struct { + Code int `json:"code"` + Error string `json:"error"` + Logs string `json:"logs"` +} + +type MsgTaskStatus struct { + Code int `json:"code"` + Error string `json:"error"` + Status TaskStatus `json:"status"` +} + +type MsgJobStatus struct { + Code int `json:"code"` + Error string `json:"error"` + Status []TaskStatus `json:"status"` +} + +type MsgCreate struct { + Code int `json:"code"` + Error string `json:"error"` + Id string `json:"id"` +} + +type TaskStatus struct { + Id string `json:"id"` + Image string `json:"image"` + ImageDigest string `json:"image_digest"` + Command string `json:"command"` + CreatedAt string `json:"created_at"` + FinishedAt string `json:"finished_at"` + Status string `json:"status"` +} + +type JobStatus struct { + Name string + tasks map[string]TaskStatus +} + +type NodeStatus struct { UUID string `json:"uuid"` ProductName string `json:"product_name"` - FanSpeed int `json:"fan_speed"` PerformanceState string `json:"performance_state"` MemoryTotal int `json:"emory_total"` MemoryFree int `json:"memory_free"` + MemoryAllocated int `json:"memory_allocated"` MemoryUsed int `json:"memory_used"` UtilizationGPU int `json:"utilization_gpu"` UtilizationMem int `json:"utilization_mem"` @@ -19,8 +62,32 @@ type Status struct { } type MsgAgent struct { - ClientID int `json:"code"` - Status []Status `json:"status"` + ClientID int `json:"code"` + Status []NodeStatus `json:"status"` +} + +type Job struct { + ID int `json:"id"` + Name string `json:"name"` + Image string `json:"image"` + Tasks []Task `json:"tasks"` + Workspace int `json:"workspace"` + Cluster int `json:"virtual_cluster"` + Priority int `json:"priority"` + RunBefore int `json:"run_before"` + CreatedAt int `json:"created_at"` + UpdatedAt int `json:"updated_at"` + CreatedBy int `json:"created_by"` + Status int `json:"status"` +} + +type Task struct { + Name string `json:"name"` + Cmd string `json:"cmd"` + NumberCPU int `json:"cpu_number"` + Memory int `json:"memory"` + NumberGPU int `json:"gpu_number"` + MemoryGPU int `json:"gpu_memory"` } func str2int(str string, defaultValue int) int { @@ -30,3 +97,36 @@ func str2int(str string, defaultValue int) int { } return defaultValue } + +func getUA() string { + rand.Seed(time.Now().Unix()) + UAs := []string{ + "Mozilla/5.0 (X11; Linux i686; rv:64.0) Gecko/20100101 Firefox/64.0", + "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:64.0) Gecko/20100101 Firefox/64.0", + "Mozilla/5.0 (X11; Linux i586; rv:63.0) Gecko/20100101 Firefox/63.0", + "Mozilla/5.0 (Windows NT 6.2; WOW64; rv:63.0) Gecko/20100101 Firefox/63.0", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:10.0) Gecko/20100101 Firefox/62.0", + "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10.13; ko; rv:1.9.1b2) Gecko/20081201 Firefox/60.0", + "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) Gecko/20100101 Firefox/58.0", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML like Gecko) Chrome/51.0.2704.79 Safari/537.36 Edge/14.14931", + "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.111 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_2) AppleWebKit/601.3.9 (KHTML, like Gecko) Version/9.0.2 Safari/601.3.9", + } + return UAs[rand.Intn(len(UAs))] +} + +func doRequest(method string, url string, r io.Reader, contentType string, referer string) (*http.Response, error) { + client := &http.Client{} + req, err := http.NewRequest(method, url, r) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", contentType) + req.Header.Set("User-Agent", getUA()) + req.Header.Set("Referer", referer) + + resp, err := client.Do(req) + return resp, err +}