mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-06 05:51:54 +00:00

bugfix, set status to launching when task is launching

Newnius 2020-07-22 11:13:39 +08:00
parent 731f173503
commit 72d8083d04
2 changed files with 25 additions and 17 deletions
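In short: the job manager previously kept a single lastHeartBeat timestamp for the whole job, and this commit turns it into a per-task map so an individual stalled task can be spotted; it also keeps not-yet-placed tasks visible as "launching" (see the second file and the notes below). The following is a minimal, self-contained sketch of the per-task heartbeat idea only; the heartbeats type and its methods are illustrative names, not the project's actual API.

package main

import (
	"fmt"
	"time"
)

// heartbeats maps task name -> unix time of the last heartbeat.
type heartbeats map[string]int64

// touch records a heartbeat for one task.
func (hb heartbeats) touch(task string) {
	hb[task] = time.Now().Unix()
}

// stale lists tasks whose last heartbeat is older than timeout seconds,
// mirroring the per-task "heartbeat longer than 30s" warning in this commit.
func (hb heartbeats) stale(timeout int64) []string {
	var out []string
	now := time.Now().Unix()
	for task, pre := range hb {
		if now-pre > timeout {
			out = append(out, task)
		}
	}
	return out
}

func main() {
	hb := heartbeats{}
	hb.touch("ps-0")
	hb.touch("worker-0")
	time.Sleep(2 * time.Second)
	hb.touch("worker-0")
	fmt.Println(hb.stale(1)) // only ps-0 is stale; a job-wide timestamp could not tell them apart
}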


@@ -24,7 +24,7 @@ type JobManager struct {
 /* status */
 jobStatus JobStatus
 isRunning bool
-lastHeartBeat int64
+lastHeartBeat map[string]int64
 /* history info */
 stats [][]TaskStatus
@@ -33,7 +33,7 @@ type JobManager struct {
 func (jm *JobManager) start() {
 log.Info("start job ", jm.job.Name, " at ", time.Now())
 jm.isRunning = true
-jm.lastHeartBeat = time.Now().Unix()
+jm.lastHeartBeat = map[string]int64{}
 jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}}
 jm.resources = map[string]NodeStatus{}
@@ -52,7 +52,7 @@ func (jm *JobManager) start() {
 for i, node := range resources {
 jm.resources[jm.job.Tasks[i].Name] = node
 }
-log.Info(jm.job.Name, " receive resource", jm.resources)
+log.Info(jm.job.Name, " receive resource ", jm.resources)
 break
 }
 /* sleep random Millisecond to avoid deadlock */
@@ -70,7 +70,7 @@ func (jm *JobManager) start() {
 jm.scheduler.UpdateProgress(jm.job, Finished)
 }
 jm.returnResource()
-log.Info("JobMaster exited ", jm.job.Name)
+log.Info(jm.job.Name, "JobMaster exited ")
 return
 }
@@ -154,6 +154,7 @@ func (jm *JobManager) start() {
 }
 taskStatus := TaskStatus{Id: res.Id, Node: node.ClientHost, HostName: jm.job.Tasks[i].Name}
 jm.jobStatus.tasks[task.Name] = taskStatus
+jm.lastHeartBeat[task.Name] = time.Now().Unix()
 }(task, jm.resources[task.Name])
 }
@@ -163,6 +164,8 @@ func (jm *JobManager) start() {
 jm.isRunning = false
 jm.scheduler.UpdateProgress(jm.job, Failed)
 jm.stop()
+} else {
+log.Info(jm.job.Name, " all tasks launched success")
 }
 }
@@ -171,8 +174,11 @@ func (jm *JobManager) start() {
 if !jm.isRunning {
 break
 }
-if time.Now().Unix()-jm.lastHeartBeat > 30 {
-log.Warn(jm.job.Name, " heartbeat longer tha 30s")
+now := time.Now().Unix()
+for task, pre := range jm.lastHeartBeat {
+if now-pre > 30 {
+log.Warn(jm.job.Name, "-", task, " heartbeat longer tha 30s")
+}
 }
 time.Sleep(time.Second * 1)
 }
@@ -239,7 +245,7 @@ func (jm *JobManager) start() {
 // }(task)
 //}
-log.Info("JobMaster exited ", jm.job.Name)
+log.Info(jm.job.Name, " JobMaster exited ")
 }
 /* release all resource */
@@ -268,7 +274,7 @@ func (jm *JobManager) checkStatus(status []TaskStatus) {
 flagRunning := false
 onlyPS := true
 for i := range status {
-if status[i].Status == "ready" || status[i].Status == "running" {
+if status[i].Status == "ready" || status[i].Status == "running" || status[i].Status == "launching" {
 flagRunning = true
 if !jm.job.Tasks[i].IsPS {
 onlyPS = false
@@ -287,7 +293,7 @@ func (jm *JobManager) checkStatus(status []TaskStatus) {
 jm.scheduler.UpdateProgress(jm.job, Failed)
 jm.stop()
 } else if jm.isRunning {
-log.Info("Some instance exited, close others")
+log.Info(jm.job.Name, " Some instance exited, close others")
 jm.isRunning = false
 jm.scheduler.UpdateProgress(jm.job, Finished)
 jm.stop()
@@ -297,7 +303,7 @@ func (jm *JobManager) checkStatus(status []TaskStatus) {
 nodeID := jm.job.Tasks[i].Name
 if _, ok := jm.resources[nodeID]; ok {
 jm.scheduler.ReleaseResource(jm.job, jm.resources[nodeID])
-log.Info("return resource ", jm.resources[nodeID].ClientID)
+log.Info(jm.job.Name, " return resource ", jm.resources[nodeID].ClientID)
 for _, t := range jm.resources[nodeID].Status {
 InstanceOfResourcePool().detach(t.UUID, jm.job)
@@ -307,16 +313,17 @@ func (jm *JobManager) checkStatus(status []TaskStatus) {
 }
 jm.resourcesMu.Unlock()
 }
+jm.lastHeartBeat[jm.job.Tasks[i].Name] = time.Now().Unix()
 }
 if flagRunning && onlyPS && jm.isRunning {
-log.Info("Only PS is running, stop ", jm.job.Name)
+log.Info(jm.job.Name, " Only PS is running, stop ")
 jm.isRunning = false
 jm.scheduler.UpdateProgress(jm.job, Finished)
 jm.stop()
 }
 if !flagRunning && jm.isRunning {
-log.Info("finish job ", jm.job.Name)
+log.Info(jm.job.Name, " finish job ")
 jm.isRunning = false
 jm.scheduler.UpdateProgress(jm.job, Finished)
 }
@@ -367,6 +374,7 @@ func (jm *JobManager) status() MsgJobStatus {
 /* still in launching phase */
 if len(taskStatus.Node) == 0 {
+tasksStatus[i] = TaskStatus{Status: "launching", State: map[string]interface{}{"ExitCode": float64(0)}}
 continue
 }
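The other half of the fix pairs status() with checkStatus(): a task that has not yet been bound to a node is reported with status "launching", and "launching" now counts as alive alongside "ready" and "running", so the job is not marked Finished (and its resources are not released) while containers are still being started. A rough standalone illustration of that rule, where TaskStatus is a simplified stand-in and anyTaskAlive is a hypothetical helper rather than code from this repository:

package main

import "fmt"

// TaskStatus is a trimmed-down stand-in for the scheduler's task status record.
type TaskStatus struct {
	Node   string
	Status string
}

// anyTaskAlive mirrors the widened check: "launching" keeps the job alive.
func anyTaskAlive(statuses []TaskStatus) bool {
	for _, s := range statuses {
		switch s.Status {
		case "ready", "running", "launching":
			return true
		}
	}
	return false
}

func main() {
	// A task with no node assigned yet is reported as "launching", not as exited.
	pending := TaskStatus{Status: "launching"}
	fmt.Println(anyTaskAlive([]TaskStatus{pending})) // true: the job stays running
}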


@@ -25,7 +25,7 @@ func (logger *Logger) Debug(args ...interface{}) {
 if ok && details != nil {
 module = details.Name()
 }
-args = append(args, module)
+args = append(args, "<--"+module)
 _log.Debug(args...)
 }
@@ -36,7 +36,7 @@ func (logger *Logger) Info(args ...interface{}) {
 if ok && details != nil {
 module = details.Name()
 }
-args = append(args, module)
+args = append(args, "<--"+module)
 _log.Info(args...)
 }
@@ -47,7 +47,7 @@ func (logger *Logger) Warn(args ...interface{}) {
 if ok && details != nil {
 module = details.Name()
 }
-args = append(args, module)
+args = append(args, "<--"+module)
 _log.Warn(args...)
 }
@@ -58,7 +58,7 @@ func (logger *Logger) Fatal(args ...interface{}) {
 if ok && details != nil {
 module = details.Name()
 }
-args = append(args, module)
+args = append(args, "<--"+module)
 _log.Fatal(args...)
 }
@@ -69,7 +69,7 @@ func (logger *Logger) Fatalf(format string, args ...interface{}) {
 if ok && details != nil {
 module = details.Name()
 }
-args = append(args, module)
+args = append(args, "<--"+module)
 _log.Fatalf(format, args...)
 }
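The logger change is cosmetic: the calling function's name is appended to every log call with a "<--" prefix so each line records where it came from. A minimal sketch of that pattern, assuming (as the surrounding unchanged lines suggest) that the caller is resolved with the runtime package; the standalone Info function below is illustrative and not the project's Logger type.

package main

import (
	"log"
	"runtime"
)

// Info logs its arguments and appends "<--<caller>" so the output
// shows which function produced the message.
func Info(args ...interface{}) {
	module := "unknown"
	if pc, _, _, ok := runtime.Caller(1); ok { // skip 1 frame: the caller of Info
		if details := runtime.FuncForPC(pc); details != nil {
			module = details.Name()
		}
	}
	args = append(args, "<--"+module)
	log.Println(args...)
}

func main() {
	Info("job started") // prints something like: job started <--main.main
}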