From 804dfa969fb5404512fc1b6d1af461c3f4cdc07b Mon Sep 17 00:00:00 2001 From: Newnius Date: Mon, 25 Mar 2019 15:36:30 +0800 Subject: [PATCH] update --- src/AllocatorFIFO.go | 105 ++++++++++++++++++------------------------- src/job_manager.go | 14 ++++-- src/main.go | 9 +++- src/util.go | 14 ++++++ 4 files changed, 76 insertions(+), 66 deletions(-) diff --git a/src/AllocatorFIFO.go b/src/AllocatorFIFO.go index 20ac82e..6647576 100644 --- a/src/AllocatorFIFO.go +++ b/src/AllocatorFIFO.go @@ -3,18 +3,21 @@ package main import ( "sync" "time" - "log" - "io/ioutil" - "encoding/json" ) type AllocatorFIFO struct { - queue []Job - mu sync.Mutex + history []*Job + queue []Job + mu sync.Mutex scheduling sync.Mutex + + jobs map[string]*JobManager } func (allocator *AllocatorFIFO) start() { + allocator.jobs = map[string]*JobManager{} + allocator.history = []*Job{} + go func() { for { //fmt.Print("Scheduling ") @@ -27,6 +30,14 @@ func (allocator *AllocatorFIFO) start() { jm.job = allocator.queue[0] allocator.queue = allocator.queue[1:] jm.allocator = allocator + allocator.jobs[jm.job.Name] = &jm + + for i := range allocator.history { + if allocator.history[i].Name == jm.job.Name { + allocator.history[i].Status = Starting + } + } + go func() { jm.start() }() @@ -38,8 +49,21 @@ func (allocator *AllocatorFIFO) start() { }() } -func (allocator *AllocatorFIFO) ack() { +func (allocator *AllocatorFIFO) ack(job *Job) { allocator.scheduling.Unlock() + for i := range allocator.history { + if allocator.history[i].Name == job.Name { + allocator.history[i].Status = Running + } + } +} + +func (allocator *AllocatorFIFO) finish(job *Job) { + for i := range allocator.history { + if allocator.history[i].Name == job.Name { + allocator.history[i].Status = Finished + } + } } func (allocator *AllocatorFIFO) schedule(job Job) { @@ -47,6 +71,7 @@ func (allocator *AllocatorFIFO) schedule(job Job) { defer allocator.mu.Unlock() allocator.queue = append(allocator.queue, job) + allocator.history = append(allocator.history, &job) } func (allocator *AllocatorFIFO) requestResource(task Task) MsgAgent { @@ -91,63 +116,21 @@ func (allocator *AllocatorFIFO) returnResource(agent MsgAgent) { } func (allocator *AllocatorFIFO) status(jobName string) MsgJobStatus { - - var tasksStatus []TaskStatus - tasksStatus = append(tasksStatus, TaskStatus{Id: "8b9b665fc4f1"}) - tasksStatus = append(tasksStatus, TaskStatus{Id: "4a4aeee2c5f9"}) - - for i, taskStatus := range tasksStatus { - spider := Spider{} - spider.Method = "GET" - spider.URL = "http://kafka_node1:8000/status?id=" + taskStatus.Id - - err := spider.do() - if err != nil { - continue - } - - resp := spider.getResponse() - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - continue - } - - var res MsgTaskStatus - err = json.Unmarshal([]byte(string(body)), &res) - if err != nil { - continue - } - tasksStatus[i] = res.Status + jm, ok := allocator.jobs[jobName] + if !ok { + return MsgJobStatus{Code: 1, Error: "Job not exist!"} } - - return MsgJobStatus{Status: tasksStatus} + return jm.status() } -func (allocator *AllocatorFIFO) logs(taskName string) MsgLog { - spider := Spider{} - spider.Method = "GET" - spider.URL = "http://kafka_node1:8000/logs?id=" + taskName - - err := spider.do() - if err != nil { - return MsgLog{Code: 1, Error: err.Error()} +func (allocator *AllocatorFIFO) logs(jobName string, taskName string) MsgLog { + jm, ok := allocator.jobs[jobName] + if !ok { + return MsgLog{Code: 1, Error: "Job not exist!"} } - - resp := spider.getResponse() - defer resp.Body.Close() - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return MsgLog{Code: 1, Error: err.Error()} - } - - var res MsgLog - err = json.Unmarshal([]byte(string(body)), &res) - if err != nil { - log.Println(err) - return MsgLog{Code: 1, Error: "Unknown"} - } - return res + return jm.logs(taskName) +} + +func (allocator *AllocatorFIFO) listJobs() MsgJobList { + return MsgJobList{Code: 0, Jobs: allocator.history} } diff --git a/src/job_manager.go b/src/job_manager.go index d2ab8c3..65083d9 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -18,6 +18,7 @@ type JobManager struct { func (jm *JobManager) start() { log.Println("start job ", jm.job.Name) + jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}} /* request for resources */ for i := range jm.job.Tasks { @@ -62,14 +63,16 @@ func (jm *JobManager) start() { jm.jobStatus.tasks[jm.job.Tasks[i].Name] = TaskStatus{Id: res.Id} } - jm.allocator.ack() + jm.allocator.ack(&jm.job) /* monitor job execution */ for { res := jm.status() + flag := false for i := range res.Status { if res.Status[i].Status == "running" { log.Println(jm.job.Name, "-", i, " is running") + flag = true } else { log.Println(jm.job.Name, "-", i, " ", res.Status[i].Status) @@ -79,16 +82,20 @@ func (jm *JobManager) start() { jm.allocator.returnResource(jm.resources[i]) } } + if !flag { + break + } time.Sleep(time.Second * 10) } + jm.allocator.finish(&jm.job) log.Println("finish job", jm.job.Name) } func (jm *JobManager) logs(taskName string) MsgLog { spider := Spider{} spider.Method = "GET" - spider.URL = "http://127.0.0.1:8000/logs?id=" + taskName + spider.URL = "http://kafka_node1:8000/logs?id=" + taskName err := spider.do() if err != nil { @@ -113,12 +120,11 @@ func (jm *JobManager) logs(taskName string) MsgLog { } func (jm *JobManager) status() MsgJobStatus { - var tasksStatus []TaskStatus for _, taskStatus := range jm.jobStatus.tasks { spider := Spider{} spider.Method = "GET" - spider.URL = "http://127.0.0.1:8000/status?id=" + taskStatus.Id + spider.URL = "http://kafka_node1:8000/status?id=" + taskStatus.Id err := spider.do() if err != nil { diff --git a/src/main.go b/src/main.go index 261244c..cec8ae6 100644 --- a/src/main.go +++ b/src/main.go @@ -59,7 +59,14 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { case "task_logs": fmt.Println("task_logs") fmt.Println(r.URL.Query().Get("id")) - js, _ := json.Marshal(allocator.logs(r.URL.Query().Get("id"))) + js, _ := json.Marshal(allocator.logs(r.URL.Query().Get("job"), r.URL.Query().Get("task"))) + w.Header().Set("Content-Type", "application/json") + w.Write(js) + break + + case "jobs": + fmt.Println("job_list") + js, _ := json.Marshal(allocator.listJobs()) w.Header().Set("Content-Type", "application/json") w.Write(js) break diff --git a/src/util.go b/src/util.go index f424e09..bb82868 100644 --- a/src/util.go +++ b/src/util.go @@ -8,6 +8,20 @@ import ( "net/http" ) +const ( + Created = 0 + Starting = 1 + Running = 2 + Stopped = 3 + Finished = 4 +) + +type MsgJobList struct { + Code int `json:"code"` + Error string `json:"error"` + Jobs []*Job `json:"jobs"` +} + type MsgLog struct { Code int `json:"code"` Error string `json:"error"`