diff --git a/src/job_manager.go b/src/job_manager.go index efc554f..7eee09b 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -16,13 +16,16 @@ type JobManager struct { jobStatus JobStatus resources []NodeStatus killedFlag bool + isRunning bool + network string } func (jm *JobManager) start() { log.Info("start job ", jm.job.Name, time.Now()) + jm.isRunning = false jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}} - network := jm.scheduler.AcquireNetwork() + jm.network = jm.scheduler.AcquireNetwork() InstanceJobHistoryLogger().submitJob(jm.job) @@ -82,7 +85,7 @@ func (jm *JobManager) start() { } if !jm.killedFlag { jm.scheduler.UpdateProgress(jm.job, Running) - + jm.isRunning = true log.Info("ready to run job ", jm.job.Name, time.Now()) } @@ -116,7 +119,7 @@ func (jm *JobManager) start() { v.Set("gpus", strings.Join(GPUs, ",")) v.Set("mem_limit", strconv.Itoa(jm.job.Tasks[i].Memory)+"m") v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[i].NumberCPU)) - v.Set("network", network) + v.Set("network", jm.network) v.Set("should_wait", "1") v.Set("output_dir", "/tmp/") v.Set("hdfs_address", "http://192.168.100.104:50070/") @@ -160,17 +163,12 @@ func (jm *JobManager) start() { } time.Sleep(time.Second * 10) } - - jm.scheduler.ReleaseNetwork(network) - - if !jm.killedFlag { - jm.scheduler.UpdateProgress(jm.job, Finished) - log.Info("finish job ", jm.job.Name) - } - log.Info("JobMaster exited ", jm.job.Name) } func (jm *JobManager) checkStatus() bool { + if !jm.isRunning { + return false + } res := jm.status() flag := false onlyPS := true @@ -227,6 +225,17 @@ func (jm *JobManager) checkStatus() bool { log.Info("Only PS is running, stop ", jm.job.Name) jm.killedFlag = false } + + if !flag { + jm.isRunning = false + jm.scheduler.ReleaseNetwork(jm.network) + + if !jm.killedFlag { + jm.scheduler.UpdateProgress(jm.job, Finished) + log.Info("finish job ", jm.job.Name) + } + log.Info("JobMaster exited ", jm.job.Name) + } return flag }