1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-12-15 08:16:43 +00:00
This commit is contained in:
2020-05-25 13:28:58 +08:00
parent 3bf05b45a7
commit 3cda4e2480
2 changed files with 20 additions and 20 deletions

View File

@@ -24,7 +24,7 @@ type JobManager struct {
} }
func (jm *JobManager) start() { func (jm *JobManager) start() {
log.Info("start job ", jm.job.Name, time.Now()) log.Info("start job ", jm.job.Name, " at ", time.Now())
jm.isRunning = false jm.isRunning = false
jm.killFlag = false jm.killFlag = false
jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}} jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}}
@@ -120,7 +120,7 @@ func (jm *JobManager) start() {
time.Sleep(time.Second * 25) time.Sleep(time.Second * 25)
} }
/* make sure resource are released */ /* make sure resources are released */
jm.returnResource(jm.status().Status) jm.returnResource(jm.status().Status)
log.Info("JobMaster exited ", jm.job.Name) log.Info("JobMaster exited ", jm.job.Name)
} }
@@ -182,28 +182,26 @@ func (jm *JobManager) checkStatus(status []TaskStatus) {
log.Info(jm.job.Name, "-", i, " ", status[i].Status) log.Info(jm.job.Name, "-", i, " ", status[i].Status)
if exitCode, ok := status[i].State["ExitCode"].(float64); ok && exitCode != 0 && !jm.killFlag { if exitCode, ok := status[i].State["ExitCode"].(float64); ok && exitCode != 0 && !jm.killFlag {
log.Warn(jm.job.Name+"-"+jm.job.Tasks[i].Name+" exited unexpected, exitCode=", exitCode) log.Warn(jm.job.Name+"-"+jm.job.Tasks[i].Name+" exited unexpected, exitCode=", exitCode)
jm.isRunning = false
jm.stop(false) jm.stop(false)
jm.killFlag = true
jm.scheduler.UpdateProgress(jm.job, Failed) jm.scheduler.UpdateProgress(jm.job, Failed)
jm.returnResource(status)
break
} }
} }
} }
if jm.isRunning && onlyPS { if flagRunning && onlyPS {
log.Info("Only PS is running, stop ", jm.job.Name) log.Info("Only PS is running, stop ", jm.job.Name)
jm.isRunning = false
jm.stop(false) jm.stop(false)
jm.scheduler.UpdateProgress(jm.job, Finished)
jm.returnResource(status)
} }
if jm.isRunning && !flagRunning && !jm.killFlag { if !flagRunning && !jm.killFlag {
jm.isRunning = false
jm.scheduler.UpdateProgress(jm.job, Finished) jm.scheduler.UpdateProgress(jm.job, Finished)
jm.returnResource(status)
log.Info("finish job ", jm.job.Name) log.Info("finish job ", jm.job.Name)
} }
if !flagRunning {
jm.isRunning = false
jm.returnResource(status)
}
} }
/* fetch logs of task */ /* fetch logs of task */
@@ -282,14 +280,16 @@ func (jm *JobManager) status() MsgJobStatus {
/* force stop all containers */ /* force stop all containers */
func (jm *JobManager) stop(force bool) MsgStop { func (jm *JobManager) stop(force bool) MsgStop {
for _, taskStatus := range jm.jobStatus.tasks { for _, taskStatus := range jm.jobStatus.tasks {
v := url.Values{} /* stop at background */
v.Set("id", taskStatus.Id) go func(task TaskStatus) {
v := url.Values{}
v.Set("id", task.Id)
_, err := doRequest("POST", "http://"+taskStatus.Node+":8000/stop", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") _, err := doRequest("POST", "http://"+task.Node+":8000/stop", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
if err != nil { if err != nil {
log.Warn(err.Error()) log.Warn(err.Error())
continue }
} }(taskStatus)
} }
if force { if force {

View File

@@ -35,7 +35,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
log.Debug("job_submit") log.Debug("job_submit")
msgSubmit := MsgSubmit{Code: 0} msgSubmit := MsgSubmit{Code: 0}
err := json.Unmarshal([]byte(string(r.PostFormValue("job"))), &job) err := json.Unmarshal([]byte(string(r.PostFormValue("job"))), &job)
log.Info("Submit job", job.Name, time.Now()) log.Info("Submit job ", job.Name, " at ", time.Now())
if err != nil { if err != nil {
msgSubmit.Code = 1 msgSubmit.Code = 1
msgSubmit.Error = err.Error() msgSubmit.Error = err.Error()