From 3cda4e2480c14576fc28520db6628e5d52247f1b Mon Sep 17 00:00:00 2001 From: Newnius Date: Mon, 25 May 2020 13:28:58 +0800 Subject: [PATCH] update --- src/job_manager.go | 38 +++++++++++++++++++------------------- src/main.go | 2 +- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/job_manager.go b/src/job_manager.go index 57c1741..88fec86 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -24,7 +24,7 @@ type JobManager struct { } func (jm *JobManager) start() { - log.Info("start job ", jm.job.Name, time.Now()) + log.Info("start job ", jm.job.Name, " at ", time.Now()) jm.isRunning = false jm.killFlag = false jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}} @@ -120,7 +120,7 @@ func (jm *JobManager) start() { time.Sleep(time.Second * 25) } - /* make sure resource are released */ + /* make sure resources are released */ jm.returnResource(jm.status().Status) log.Info("JobMaster exited ", jm.job.Name) } @@ -182,28 +182,26 @@ func (jm *JobManager) checkStatus(status []TaskStatus) { log.Info(jm.job.Name, "-", i, " ", status[i].Status) if exitCode, ok := status[i].State["ExitCode"].(float64); ok && exitCode != 0 && !jm.killFlag { log.Warn(jm.job.Name+"-"+jm.job.Tasks[i].Name+" exited unexpected, exitCode=", exitCode) - jm.isRunning = false jm.stop(false) + jm.killFlag = true jm.scheduler.UpdateProgress(jm.job, Failed) - jm.returnResource(status) - break } } } - if jm.isRunning && onlyPS { + if flagRunning && onlyPS { log.Info("Only PS is running, stop ", jm.job.Name) - jm.isRunning = false jm.stop(false) - jm.scheduler.UpdateProgress(jm.job, Finished) - jm.returnResource(status) } - if jm.isRunning && !flagRunning && !jm.killFlag { - jm.isRunning = false + if !flagRunning && !jm.killFlag { jm.scheduler.UpdateProgress(jm.job, Finished) - jm.returnResource(status) log.Info("finish job ", jm.job.Name) } + + if !flagRunning { + jm.isRunning = false + jm.returnResource(status) + } } /* fetch logs of task */ @@ -282,14 +280,16 @@ func (jm *JobManager) status() MsgJobStatus { /* force stop all containers */ func (jm *JobManager) stop(force bool) MsgStop { for _, taskStatus := range jm.jobStatus.tasks { - v := url.Values{} - v.Set("id", taskStatus.Id) + /* stop at background */ + go func(task TaskStatus) { + v := url.Values{} + v.Set("id", task.Id) - _, err := doRequest("POST", "http://"+taskStatus.Node+":8000/stop", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") - if err != nil { - log.Warn(err.Error()) - continue - } + _, err := doRequest("POST", "http://"+task.Node+":8000/stop", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") + if err != nil { + log.Warn(err.Error()) + } + }(taskStatus) } if force { diff --git a/src/main.go b/src/main.go index 58e8973..75a9b74 100644 --- a/src/main.go +++ b/src/main.go @@ -35,7 +35,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { log.Debug("job_submit") msgSubmit := MsgSubmit{Code: 0} err := json.Unmarshal([]byte(string(r.PostFormValue("job"))), &job) - log.Info("Submit job", job.Name, time.Now()) + log.Info("Submit job ", job.Name, " at ", time.Now()) if err != nil { msgSubmit.Code = 1 msgSubmit.Error = err.Error()