From 72d8083d046bb7c254aa60f3f21d7726fbf8841f Mon Sep 17 00:00:00 2001 From: Newnius Date: Wed, 22 Jul 2020 11:13:39 +0800 Subject: [PATCH] bugfix, set status to launching when task is launching --- src/job_manager.go | 32 ++++++++++++++++++++------------ src/logger.go | 10 +++++----- 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/src/job_manager.go b/src/job_manager.go index a907d1c..88bc179 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -24,7 +24,7 @@ type JobManager struct { /* status */ jobStatus JobStatus isRunning bool - lastHeartBeat int64 + lastHeartBeat map[string]int64 /* history info */ stats [][]TaskStatus @@ -33,7 +33,7 @@ type JobManager struct { func (jm *JobManager) start() { log.Info("start job ", jm.job.Name, " at ", time.Now()) jm.isRunning = true - jm.lastHeartBeat = time.Now().Unix() + jm.lastHeartBeat = map[string]int64{} jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}} jm.resources = map[string]NodeStatus{} @@ -52,7 +52,7 @@ func (jm *JobManager) start() { for i, node := range resources { jm.resources[jm.job.Tasks[i].Name] = node } - log.Info(jm.job.Name, " receive resource", jm.resources) + log.Info(jm.job.Name, " receive resource ", jm.resources) break } /* sleep random Millisecond to avoid deadlock */ @@ -70,7 +70,7 @@ func (jm *JobManager) start() { jm.scheduler.UpdateProgress(jm.job, Finished) } jm.returnResource() - log.Info("JobMaster exited ", jm.job.Name) + log.Info(jm.job.Name, "JobMaster exited ") return } @@ -154,6 +154,7 @@ func (jm *JobManager) start() { } taskStatus := TaskStatus{Id: res.Id, Node: node.ClientHost, HostName: jm.job.Tasks[i].Name} jm.jobStatus.tasks[task.Name] = taskStatus + jm.lastHeartBeat[task.Name] = time.Now().Unix() }(task, jm.resources[task.Name]) } @@ -163,6 +164,8 @@ func (jm *JobManager) start() { jm.isRunning = false jm.scheduler.UpdateProgress(jm.job, Failed) jm.stop() + } else { + log.Info(jm.job.Name, " all tasks launched success") } } @@ -171,8 +174,11 @@ func (jm *JobManager) start() { if !jm.isRunning { break } - if time.Now().Unix()-jm.lastHeartBeat > 30 { - log.Warn(jm.job.Name, " heartbeat longer tha 30s") + now := time.Now().Unix() + for task, pre := range jm.lastHeartBeat { + if now-pre > 30 { + log.Warn(jm.job.Name, "-", task, " heartbeat longer tha 30s") + } } time.Sleep(time.Second * 1) } @@ -239,7 +245,7 @@ func (jm *JobManager) start() { // }(task) //} - log.Info("JobMaster exited ", jm.job.Name) + log.Info(jm.job.Name, " JobMaster exited ") } /* release all resource */ @@ -268,7 +274,7 @@ func (jm *JobManager) checkStatus(status []TaskStatus) { flagRunning := false onlyPS := true for i := range status { - if status[i].Status == "ready" || status[i].Status == "running" { + if status[i].Status == "ready" || status[i].Status == "running" || status[i].Status == "launching" { flagRunning = true if !jm.job.Tasks[i].IsPS { onlyPS = false @@ -287,7 +293,7 @@ func (jm *JobManager) checkStatus(status []TaskStatus) { jm.scheduler.UpdateProgress(jm.job, Failed) jm.stop() } else if jm.isRunning { - log.Info("Some instance exited, close others") + log.Info(jm.job.Name, " Some instance exited, close others") jm.isRunning = false jm.scheduler.UpdateProgress(jm.job, Finished) jm.stop() @@ -297,7 +303,7 @@ func (jm *JobManager) checkStatus(status []TaskStatus) { nodeID := jm.job.Tasks[i].Name if _, ok := jm.resources[nodeID]; ok { jm.scheduler.ReleaseResource(jm.job, jm.resources[nodeID]) - log.Info("return resource ", jm.resources[nodeID].ClientID) + log.Info(jm.job.Name, " return resource ", jm.resources[nodeID].ClientID) for _, t := range jm.resources[nodeID].Status { InstanceOfResourcePool().detach(t.UUID, jm.job) @@ -307,16 +313,17 @@ func (jm *JobManager) checkStatus(status []TaskStatus) { } jm.resourcesMu.Unlock() } + jm.lastHeartBeat[jm.job.Tasks[i].Name] = time.Now().Unix() } if flagRunning && onlyPS && jm.isRunning { - log.Info("Only PS is running, stop ", jm.job.Name) + log.Info(jm.job.Name, " Only PS is running, stop ") jm.isRunning = false jm.scheduler.UpdateProgress(jm.job, Finished) jm.stop() } if !flagRunning && jm.isRunning { - log.Info("finish job ", jm.job.Name) + log.Info(jm.job.Name, " finish job ") jm.isRunning = false jm.scheduler.UpdateProgress(jm.job, Finished) } @@ -367,6 +374,7 @@ func (jm *JobManager) status() MsgJobStatus { /* still in launching phase */ if len(taskStatus.Node) == 0 { + tasksStatus[i] = TaskStatus{Status: "launching", State: map[string]interface{}{"ExitCode": float64(0)}} continue } diff --git a/src/logger.go b/src/logger.go index 05b26b2..8d50f19 100644 --- a/src/logger.go +++ b/src/logger.go @@ -25,7 +25,7 @@ func (logger *Logger) Debug(args ...interface{}) { if ok && details != nil { module = details.Name() } - args = append(args, module) + args = append(args, "<--"+module) _log.Debug(args...) } @@ -36,7 +36,7 @@ func (logger *Logger) Info(args ...interface{}) { if ok && details != nil { module = details.Name() } - args = append(args, module) + args = append(args, "<--"+module) _log.Info(args...) } @@ -47,7 +47,7 @@ func (logger *Logger) Warn(args ...interface{}) { if ok && details != nil { module = details.Name() } - args = append(args, module) + args = append(args, "<--"+module) _log.Warn(args...) } @@ -58,7 +58,7 @@ func (logger *Logger) Fatal(args ...interface{}) { if ok && details != nil { module = details.Name() } - args = append(args, module) + args = append(args, "<--"+module) _log.Fatal(args...) } @@ -69,7 +69,7 @@ func (logger *Logger) Fatalf(format string, args ...interface{}) { if ok && details != nil { module = details.Name() } - args = append(args, module) + args = append(args, "<--"+module) _log.Fatalf(format, args...) }