1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-12-13 07:46:43 +00:00
This commit is contained in:
2020-05-24 00:47:40 +08:00
parent ea2718fe4f
commit a787825a91

View File

@@ -16,13 +16,16 @@ type JobManager struct {
jobStatus JobStatus jobStatus JobStatus
resources []NodeStatus resources []NodeStatus
killedFlag bool killedFlag bool
isRunning bool
network string
} }
func (jm *JobManager) start() { func (jm *JobManager) start() {
log.Info("start job ", jm.job.Name, time.Now()) log.Info("start job ", jm.job.Name, time.Now())
jm.isRunning = false
jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}} jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}}
network := jm.scheduler.AcquireNetwork() jm.network = jm.scheduler.AcquireNetwork()
InstanceJobHistoryLogger().submitJob(jm.job) InstanceJobHistoryLogger().submitJob(jm.job)
@@ -82,7 +85,7 @@ func (jm *JobManager) start() {
} }
if !jm.killedFlag { if !jm.killedFlag {
jm.scheduler.UpdateProgress(jm.job, Running) jm.scheduler.UpdateProgress(jm.job, Running)
jm.isRunning = true
log.Info("ready to run job ", jm.job.Name, time.Now()) log.Info("ready to run job ", jm.job.Name, time.Now())
} }
@@ -116,7 +119,7 @@ func (jm *JobManager) start() {
v.Set("gpus", strings.Join(GPUs, ",")) v.Set("gpus", strings.Join(GPUs, ","))
v.Set("mem_limit", strconv.Itoa(jm.job.Tasks[i].Memory)+"m") v.Set("mem_limit", strconv.Itoa(jm.job.Tasks[i].Memory)+"m")
v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[i].NumberCPU)) v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[i].NumberCPU))
v.Set("network", network) v.Set("network", jm.network)
v.Set("should_wait", "1") v.Set("should_wait", "1")
v.Set("output_dir", "/tmp/") v.Set("output_dir", "/tmp/")
v.Set("hdfs_address", "http://192.168.100.104:50070/") v.Set("hdfs_address", "http://192.168.100.104:50070/")
@@ -160,17 +163,12 @@ func (jm *JobManager) start() {
} }
time.Sleep(time.Second * 10) time.Sleep(time.Second * 10)
} }
jm.scheduler.ReleaseNetwork(network)
if !jm.killedFlag {
jm.scheduler.UpdateProgress(jm.job, Finished)
log.Info("finish job ", jm.job.Name)
}
log.Info("JobMaster exited ", jm.job.Name)
} }
func (jm *JobManager) checkStatus() bool { func (jm *JobManager) checkStatus() bool {
if !jm.isRunning {
return false
}
res := jm.status() res := jm.status()
flag := false flag := false
onlyPS := true onlyPS := true
@@ -227,6 +225,17 @@ func (jm *JobManager) checkStatus() bool {
log.Info("Only PS is running, stop ", jm.job.Name) log.Info("Only PS is running, stop ", jm.job.Name)
jm.killedFlag = false jm.killedFlag = false
} }
if !flag {
jm.isRunning = false
jm.scheduler.ReleaseNetwork(jm.network)
if !jm.killedFlag {
jm.scheduler.UpdateProgress(jm.job, Finished)
log.Info("finish job ", jm.job.Name)
}
log.Info("JobMaster exited ", jm.job.Name)
}
return flag return flag
} }