diff --git a/src/job_manager.go b/src/job_manager.go index bef25bb..bfde255 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -70,6 +70,7 @@ func (jm *JobManager) start() { v.Set("mem_limit", strconv.Itoa(jm.job.Tasks[i].Memory)+"m") v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[i].NumberCPU)) v.Set("network", network) + v.Set("should_wait", "1") resp, err := doRequest("POST", "http://"+jm.resources[i].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") if err != nil { @@ -106,7 +107,6 @@ func (jm *JobManager) start() { } else if res.Status[i].Status == "running" { log.Debug(jm.job.Name, "-", i, " is running") flag = true - log.Info(jm.job.Tasks[i].IsPS) if !jm.job.Tasks[i].IsPS { onlyPS = false } @@ -138,6 +138,7 @@ func (jm *JobManager) start() { } if onlyPS { jm.stop() + log.Info("Only PS is running, stop", jm.job.Name) break } if !flag { @@ -232,6 +233,6 @@ func (jm *JobManager) stop() MsgStop { }() jm.scheduler.UpdateProgress(jm.job.Name, Stopped) - log.Info("kill job", jm.job.Name) + log.Info("kill job, ", jm.job.Name) return MsgStop{Code: 0} } diff --git a/src/optimizer.go b/src/optimizer.go index d3e57f6..3886acf 100644 --- a/src/optimizer.go +++ b/src/optimizer.go @@ -11,6 +11,9 @@ type Optimizer struct { killedFlag bool predicts map[string]OptimizerJobExecutionTime + + jobUtilsGPU map[string]int + versions map[string]int } var optimizerInstance *Optimizer @@ -23,6 +26,8 @@ func InstanceOfOptimizer() *Optimizer { if optimizerInstance == nil { optimizerInstance = &Optimizer{} optimizerInstance.predicts = map[string]OptimizerJobExecutionTime{} + optimizerInstance.jobUtilsGPU = map[string]int{} + optimizerInstance.versions = map[string]int{} } return optimizerInstance } @@ -30,11 +35,25 @@ func InstanceOfOptimizer() *Optimizer { func (optimizer *Optimizer) feed(job string, utils []int) { log.Info("optimizer feed") log.Info(job, utils) + log.Info(optimizer.jobUtilsGPU) + log.Info(optimizer.predicts) go func() { str := strings.Split(job, "-") if len(str) == 2 { preCnt := 0 + + sum := 0 + for i := 0; i < len(utils); i++ { + sum += utils[i] + } + last := 0 + if t, err := optimizer.jobUtilsGPU[job]; !err { + last = t + } + optimizer.jobUtilsGPU[job] = (optimizer.versions[job]*last + sum/len(utils)) / (optimizer.versions[job] + 1) + optimizer.versions[job]++ + for i := 0; i < len(utils); i++ { if utils[i] > 15 { break @@ -43,7 +62,7 @@ func (optimizer *Optimizer) feed(job string, utils []int) { } postCnt := 0 - for i := len(utils)-1; i >= 0; i-- { + for i := len(utils) - 1; i >= 0; i-- { if utils[i] > 15 { break } @@ -57,7 +76,25 @@ func (optimizer *Optimizer) feed(job string, utils []int) { predict.Pre = ((predict.Pre * predict.Version) + preCnt) / (predict.Version + 1) predict.Post = ((predict.Post * predict.Version) + postCnt) / (predict.Version + 1) predict.Total = ((predict.Total * predict.Version) + len(utils)) / (predict.Version + 1) + predict.Main = predict.Total - predict.Pre - predict.Post predict.Version++ } }() } + +func (optimizer *Optimizer) predictTime(job string, utils []int) (int, bool) { + if _, err := optimizer.jobUtilsGPU[job]; err { + return 100, false + } + if optimizer.versions[job] > 5 { + return optimizer.jobUtilsGPU[job], true + } + return optimizer.jobUtilsGPU[job], false +} + +func (optimizer *Optimizer) predictUtilGPU(job string) (OptimizerJobExecutionTime, bool) { + if _, err := optimizer.predicts[job]; err { + return OptimizerJobExecutionTime{}, false + } + return optimizer.predicts[job], optimizer.predicts[job].Version > 5 +} diff --git a/src/util.go b/src/util.go index 9faf2fa..61a30ec 100644 --- a/src/util.go +++ b/src/util.go @@ -170,6 +170,7 @@ type OptimizerJobExecutionTime struct { Pre int `json:"pre"` Post int `json:"post"` Total int `json:"total"` + Main int `json:"main"` Version int `json:"version"` }