diff --git a/README.md b/README.md
index eafaa61..f8d2754 100644
--- a/README.md
+++ b/README.md
@@ -3,6 +3,7 @@
 
 ## API
 
+#### ResourcePool
 **GetHeartCounter**
 ```
@@ -23,6 +24,7 @@ GPU is occupied by which job(s)
 ?action=get_bindings
 ```
 
+#### Scheduler
 **EnableSchedule**
 ```
 ?action=debug_enable
@@ -62,21 +64,6 @@ GPU is occupied by which job(s)
 ?action=debug_update_enable_pre_schedule_ratio&ratio=0.95
 ```
 
-**FeedDLData**
-```
-?action=debug_optimizer_feed_dl&job=lstm&seq=1&value=2
-```
-
-**TrainDL**
-```
-?action=debug_optimizer_train_dl&job=lstm
-```
-
-**PredictDL**
-```
-?action=debug_get_predict_dl&job=lstm&seq=1
-```
-
 **UpdateAllocateStrategy**
 ```
 ?action=allocator_update_strategy&strategy=bestfit
diff --git a/src/job_manager.go b/src/job_manager.go
index 942b9c4..98f215f 100644
--- a/src/job_manager.go
+++ b/src/job_manager.go
@@ -54,12 +54,14 @@ func (jm *JobManager) start() {
 
     if InstanceOfConfiguration().mock {
         jm.scheduler.UpdateProgress(jm.job, Running)
+        jm.job.Status = Running
         jm.isRunning = false
         duration := InstanceOfMocker().GetDuration(jm.job, jm.resources)
         log.Info("mock ", jm.job.Name, ", wait ", duration)
         time.Sleep(time.Second * time.Duration(duration))
         jm.returnResource([]TaskStatus{})
         jm.scheduler.UpdateProgress(jm.job, Finished)
+        jm.job.Status = Finished
        log.Info("JobMaster exited ", jm.job.Name)
         return
     }
@@ -67,6 +69,7 @@ func (jm *JobManager) start() {
     if !jm.killFlag {
         /* switch to Running state */
         jm.scheduler.UpdateProgress(jm.job, Running)
+        jm.job.Status = Running
 
         /* bring up containers */
         wg := sync.WaitGroup{}
@@ -149,7 +152,7 @@ func (jm *JobManager) start() {
             stats = append(stats, stat)
         }
     }
-    InstanceOfOptimizer().feedStats(jm.job, "PS", stats)
+    InstanceOfOptimizer().FeedStats(jm.job, "PS", stats)
     stats = [][]TaskStatus{}
     for _, vals := range jm.stats {
         var stat []TaskStatus
@@ -164,7 +167,7 @@ func (jm *JobManager) start() {
     }
     //log.Info(jm.stats)
     //log.Info(stats)
-    InstanceOfOptimizer().feedStats(jm.job, "Worker", stats)
+    InstanceOfOptimizer().FeedStats(jm.job, "Worker", stats)
     jm.returnResource(jm.status().Status)
     log.Info("JobMaster exited ", jm.job.Name)
 }
@@ -246,11 +249,13 @@ func (jm *JobManager) checkStatus(status []TaskStatus) {
             jm.stop(false)
             jm.killFlag = true
             jm.scheduler.UpdateProgress(jm.job, Failed)
+            jm.job.Status = Failed
         } else if !jm.killFlag {
             log.Info("Some instance exited, close others")
             jm.stop(false)
             jm.killFlag = true
             jm.scheduler.UpdateProgress(jm.job, Finished)
+            jm.job.Status = Finished
         }
 
         if jm.resources[i].ClientID != "_released_" {
@@ -271,10 +276,12 @@ func (jm *JobManager) checkStatus(status []TaskStatus) {
         jm.stop(false)
         jm.killFlag = true
         jm.scheduler.UpdateProgress(jm.job, Finished)
+        jm.job.Status = Finished
     }
 
     if !flagRunning && !jm.killFlag {
         jm.scheduler.UpdateProgress(jm.job, Finished)
+        jm.job.Status = Finished
         log.Info("finish job ", jm.job.Name)
     }
 
@@ -320,7 +327,7 @@ func (jm *JobManager) logs(taskName string) MsgLog {
 func (jm *JobManager) status() MsgJobStatus {
     var tasksStatus []TaskStatus
     for range jm.job.Tasks { //append would cause uncertain order
-        tasksStatus = append(tasksStatus, TaskStatus{})
+        tasksStatus = append(tasksStatus, TaskStatus{TimeStamp: time.Now().Unix()})
     }
 
     for i, task := range jm.job.Tasks {
@@ -415,6 +422,7 @@ func (jm *JobManager) stop(force bool) MsgStop {
         if force {
             jm.killFlag = true
             jm.scheduler.UpdateProgress(jm.job, Stopped)
+            jm.job.Status = Stopped
             log.Info("kill job, ", jm.job.Name)
         }
     }()
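Editor's note on the job_manager.go changes above: besides mirroring progress into `jm.job.Status`, every placeholder `TaskStatus` is now created with `TimeStamp: time.Now().Unix()`; the job_status.go hunk below adds that field. A minimal, self-contained sketch of how timestamped samples can be produced and measured — the trimmed struct and helper names here are hypothetical, only the `TimeStamp` handling mirrors this diff:

```go
package main

import (
	"fmt"
	"time"
)

// Trimmed stand-in for the project's TaskStatus; only the new TimeStamp
// field is taken from this diff.
type TaskStatus struct {
	UtilGPU   int   `json:"gpu_util"`
	TimeStamp int64 `json:"timestamp"` // Unix seconds, set when the sample is taken
}

// sample builds a status snapshot stamped with the current time.
func sample(util int) TaskStatus {
	return TaskStatus{UtilGPU: util, TimeStamp: time.Now().Unix()}
}

// elapsed returns the wall-clock seconds covered by a series of samples,
// which is what the optimizer later derives pre/post/total times from.
func elapsed(samples []TaskStatus) int64 {
	if len(samples) < 2 {
		return 0
	}
	return samples[len(samples)-1].TimeStamp - samples[0].TimeStamp
}

func main() {
	series := []TaskStatus{sample(0), sample(80)}
	fmt.Println("observed window (s):", elapsed(series))
}
```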
diff --git a/src/job_status.go b/src/job_status.go
index 6b03970..4e3563e 100644
--- a/src/job_status.go
+++ b/src/job_status.go
@@ -23,4 +23,5 @@ type TaskStatus struct {
     UtilGPU    int   `json:"gpu_util"`
     UtilMemGPU int   `json:"gpu_mem_util"`
     MemGPU     int   `json:"gpu_mem"`
+    TimeStamp  int64 `json:"timestamp"`
 }
diff --git a/src/main.go b/src/main.go
index 0930b1d..a5311bd 100644
--- a/src/main.go
+++ b/src/main.go
@@ -81,6 +81,28 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
         w.Write(js)
         break
 
+    case "job_predict_time":
+        log.Debug("job_predict_time")
+        var job Job
+        err := json.Unmarshal([]byte(string(r.PostFormValue("job"))), &job)
+        msgJobReq := MsgOptimizerPredict{Code: 0}
+        if err != nil {
+            msgJobReq.Code = 1
+            msgJobReq.Error = err.Error()
+        } else {
+            msg := InstanceOfOptimizer().PredictTime(job)
+            msgJobReq.Pre = msg.Pre
+            msgJobReq.Post = msg.Post
+            msgJobReq.Total = msg.Total
+        }
+        js, err := json.Marshal(msgJobReq)
+        if err != nil {
+            log.Warn(err)
+        }
+        w.Header().Set("Content-Type", "application/json")
+        w.Write(js)
+        break
+
     case "job_stop":
         log.Debug("job_stop")
         js, _ := json.Marshal(scheduler.Stop(string(r.PostFormValue("id"))))
@@ -248,69 +270,6 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
         w.Write(js)
         break
 
-    case "debug_get_predicts":
-        log.Debug("debug_get_predicts")
-        js, _ := json.Marshal(InstanceOfOptimizer().getAllPredicts())
-        w.Header().Set("Content-Type", "application/json")
-        w.Write(js)
-        break
-
-    case "debug_get_gpu_utils":
-        log.Debug("debug_get_gpu_utils")
-        js, _ := json.Marshal(InstanceOfOptimizer().getAllGPUUtils())
-        w.Header().Set("Content-Type", "application/json")
-        w.Write(js)
-        break
-
-    case "debug_optimizer_feed_dl":
-        log.Debug("debug_optimizer_feed_dl")
-        var job string
-        var seq int
-        var value int
-        job = r.URL.Query().Get("job")
-        if t, err := strconv.Atoi(r.URL.Query().Get("seq")); err == nil {
-            seq = t
-        }
-        if t, err := strconv.Atoi(r.URL.Query().Get("value")); err == nil {
-            value = t
-        }
-        InstanceOfOptimizer().feedData(job, seq, 0, 0, 0, value)
-        js, _ := json.Marshal(OptimizerJobExecutionTime{})
-        w.Header().Set("Content-Type", "application/json")
-        w.Write(js)
-        break
-
-    case "debug_optimizer_describe_job":
-        log.Debug("debug_optimizer_describe_job")
-        var job string
-        job = r.URL.Query().Get("job")
-        js, _ := json.Marshal(InstanceOfOptimizer().describe(job))
-        w.Header().Set("Content-Type", "application/json")
-        w.Write(js)
-        break
-
-    case "debug_optimizer_train_dl":
-        log.Debug("debug_optimizer_train_dl")
-        InstanceOfOptimizer().train(r.URL.Query().Get("job"))
-        js, _ := json.Marshal(OptimizerJobExecutionTime{})
-        w.Header().Set("Content-Type", "application/json")
-        w.Write(js)
-        break
-
-    case "debug_get_predict_dl":
-        log.Debug("debug_get_predict_dl")
-        if seq, err := strconv.Atoi(r.URL.Query().Get("seq")); err == nil {
-            est, _ := InstanceOfOptimizer().predict(r.URL.Query().Get("job"), seq)
-            js, _ := json.Marshal(est)
-            w.Header().Set("Content-Type", "application/json")
-            w.Write(js)
-        } else {
-            js, _ := json.Marshal(OptimizerJobExecutionTime{})
-            w.Header().Set("Content-Type", "application/json")
-            w.Write(js)
-        }
-        break
-
     case "allocator_update_strategy":
         log.Debug("allocator_update_strategy")
         strategy := r.URL.Query().Get("strategy")
@@ -389,7 +348,7 @@ func main() {
     InstanceOfResourcePool().init(config)
     InstanceOfCollector().init(config)
     InstanceJobHistoryLogger().init(config)
-    InstanceOfOptimizer().init(config)
+    InstanceOfOptimizer().Init(config)
     InstanceOfGroupManager().init(config)
 
     switch config.SchedulerPolicy {
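Editor's note on the new `job_predict_time` action above: it takes a job description as the `job` form field and answers with the optimizer's pre/post/total estimate. A hedged client sketch — the listen address, URL path, and the JSON field tags of the reply are assumptions, not taken from this diff:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// Assumed response shape; the real MsgOptimizerPredict tags are not visible here.
type predictReply struct {
	Code  int    `json:"code"`
	Error string `json:"error"`
	Pre   int    `json:"pre"`
	Post  int    `json:"post"`
	Total int    `json:"total"`
}

func main() {
	// Placeholder job description; a real request would carry the full Job JSON.
	jobJSON := `{"name":"lstm-0001","tasks":[]}`

	// Assumed endpoint: scheduler API on localhost:8080 with the action passed
	// as a query parameter, matching the README's "?action=..." convention.
	resp, err := http.PostForm(
		"http://localhost:8080/api?action=job_predict_time",
		url.Values{"job": {jobJSON}},
	)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var reply predictReply
	if err := json.NewDecoder(resp.Body).Decode(&reply); err != nil {
		panic(err)
	}
	fmt.Printf("pre=%d post=%d total=%d\n", reply.Pre, reply.Post, reply.Total)
}
```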
diff --git a/src/optimizer.go b/src/optimizer.go
index e6a43ab..ae2deb4 100644
--- a/src/optimizer.go
+++ b/src/optimizer.go
@@ -7,23 +7,11 @@ import (
     "io/ioutil"
     "strconv"
     "encoding/json"
-    "time"
     "math"
     "hash/fnv"
 )
 
 type Optimizer struct {
-    scheduler  Scheduler
-    killedFlag bool
-
-    predicts map[string]*OptimizerJobExecutionTime
-
-    jobUtilsGPU map[string]*OptimizerUtilGPU
-
-    cache map[string]*OptimizerJobExecutionTime
-
-    stats map[string]map[string]float64
-
     versions map[string]int
 }
@@ -36,20 +24,228 @@ func InstanceOfOptimizer() *Optimizer {
 
     if optimizerInstance == nil {
         optimizerInstance = &Optimizer{}
-        optimizerInstance.predicts = map[string]*OptimizerJobExecutionTime{}
-        optimizerInstance.jobUtilsGPU = map[string]*OptimizerUtilGPU{}
-        optimizerInstance.cache = map[string]*OptimizerJobExecutionTime{}
-        optimizerInstance.stats = map[string]map[string]float64{}
         optimizerInstance.versions = map[string]int{}
     }
     return optimizerInstance
 }
 
-func (optimizer *Optimizer) init(conf Configuration) {
+func (optimizer *Optimizer) Init(conf Configuration) {
     log.Info("optimizer started")
 }
 
-func (optimizer *Optimizer) feedStats(job Job, role string, stats [][]TaskStatus) {
+func (optimizer *Optimizer) FeedTime(job Job, stats [][]TaskStatus) {
+    log.Info("optimizer feedTime", job)
+    if len(stats) == 0 || len(job.Tasks) != 1 {
+        return
+    }
+
+    go func() {
+        str := strings.Split(job.Name, "-")
+        if len(str) == 2 {
+            jobName := str[0]
+
+            var UtilGPUs []UtilGPUTimeSeries
+            for _, stat := range stats {
+                for _, task := range stat {
+                    UtilGPUs = append(UtilGPUs, UtilGPUTimeSeries{Time: task.TimeStamp, Util: task.UtilGPU})
+                }
+            }
+            var preTime int64
+            for i := 0; i < len(UtilGPUs); i++ {
+                if UtilGPUs[i].Util > 15 {
+                    preTime = UtilGPUs[i].Time - UtilGPUs[0].Time
+                    break
+                }
+            }
+
+            var postTime int64
+            for i := len(UtilGPUs) - 1; i >= 0; i-- {
+                if UtilGPUs[i].Util > 15 {
+                    postTime = UtilGPUs[len(UtilGPUs)-1].Time - UtilGPUs[i].Time
+                    break
+                }
+            }
+            totalTime := UtilGPUs[len(UtilGPUs)-1].Time - UtilGPUs[0].Time
+            if preTime+postTime >= totalTime { /* in case GPU is not used */
+                preTime /= 2
+                postTime /= 2
+            }
+
+            tmp := map[string]float64{
+                "pre":   float64(preTime),
+                "post":  float64(postTime),
+                "total": float64(totalTime),
+            }
+            labels, _ := json.Marshal(tmp)
+
+            cmd := job.Tasks[0].Cmd
+            params := map[string]int{}
+
+            exceptions := map[string]bool{}
+            exceptions["train_dir"] = true
+            exceptions["variable_update"] = true
+            exceptions["ps_hosts"] = true
+            exceptions["worker_hosts"] = true
+            exceptions["task_index"] = true
+            exceptions["job_name"] = true
+
+            pairs := strings.Split(cmd, " ")
+            for _, pair := range pairs {
+                v := strings.Split(pair, "=")
+                if len(v) == 2 && v[0][:2] == "--" {
+                    var param string
+                    var value int
+                    param = v[0][2:]
+
+                    if val, err := strconv.Atoi(v[1]); err == nil {
+                        value = val
+                    } else {
+                        h := fnv.New32a()
+                        h.Write([]byte(v[1]))
+                        value = int(h.Sum32())
+                    }
+                    if _, ok := exceptions[param]; !ok {
+                        params[param] = value
+                    }
+                }
+            }
+            params["cpu"] = job.Tasks[0].NumberCPU
+            params["mem"] = job.Tasks[0].Memory
+            params["gpu"] = job.Tasks[0].NumberGPU
+            params["gpu_mem"] = job.Tasks[0].MemoryGPU
+            //log.Info(job.Name, params)
+            features, _ := json.Marshal(params)
+
+            spider := Spider{}
+            spider.Method = "GET"
+            spider.URL = "http://yao-optimizer:8080/feed?job=" + jobName + ":time" + "&features=" + string(features) + "&labels=" + string(labels)
+
+            err := spider.do()
+            if err != nil {
+                log.Warn(err)
+                return
+            }
+
+            resp := spider.getResponse()
+            if _, err := ioutil.ReadAll(resp.Body); err != nil {
+                log.Warn(err)
+            }
+            resp.Body.Close()
+            if err != nil {
+                log.Warn(err)
+                return
+            }
+
+            if optimizer.versions[jobName]%3 == 0 {
+                optimizer.trainReq(jobName)
+            }
+        }
+    }()
+}
+
+func (optimizer *Optimizer) trainTime(jobName string) {
+    spider := Spider{}
+    spider.Method = "GET"
+    params := "job=" + jobName + ":time"
+    spider.URL = "http://yao-optimizer:8080/train?" + params
+
+    err := spider.do()
+    if err != nil {
+        return
+    }
+
+    resp := spider.getResponse()
+    if _, err := ioutil.ReadAll(resp.Body); err != nil {
+        log.Warn(err)
+    }
+    resp.Body.Close()
+    if err != nil {
+        return
+    }
+}
+
+func (optimizer *Optimizer) PredictTime(job Job) OptimizerJobExecutionTime {
+    res := OptimizerJobExecutionTime{Pre: 0, Post: 0, Total: math.MaxInt64}
+
+    var jobName string
+    str := strings.Split(job.Name, "-")
+    if len(str) == 2 {
+        jobName = str[0]
+    }
+    cmd := job.Tasks[0].Cmd
+    params := map[string]int{}
+
+    exceptions := map[string]bool{}
+    exceptions["train_dir"] = true
+    exceptions["variable_update"] = true
+    exceptions["ps_hosts"] = true
+    exceptions["worker_hosts"] = true
+    exceptions["task_index"] = true
+    exceptions["job_name"] = true
+
+    pairs := strings.Split(cmd, " ")
+    for _, pair := range pairs {
+        v := strings.Split(pair, "=")
+        if len(v) == 2 && v[0][:2] == "--" {
+            var param string
+            var value int
+            param = v[0][2:]
+
+            if val, err := strconv.Atoi(v[1]); err == nil {
+                value = val
+            } else {
+                h := fnv.New32a()
+                h.Write([]byte(v[1]))
+                value = int(h.Sum32())
+            }
+            if _, ok := exceptions[param]; !ok {
+                params[param] = value
+            }
+        }
+    }
+    params["cpu"] = job.Tasks[0].NumberCPU
+    params["mem"] = job.Tasks[0].Memory
+    params["gpu"] = job.Tasks[0].NumberGPU
+    params["gpu_mem"] = job.Tasks[0].MemoryGPU
+    //log.Info(job.Name, params)
+
+    features, _ := json.Marshal(params)
+
+    spider := Spider{}
+    spider.Method = "GET"
+    spider.URL = "http://yao-optimizer:8080/predict?job=" + jobName + ":time" + "&features=" + string(features)
+
+    err := spider.do()
+    if err != nil {
+        return res
+    }
+
+    resp := spider.getResponse()
+    body, err := ioutil.ReadAll(resp.Body)
+    resp.Body.Close()
+    if err != nil {
+        log.Warn(err)
+        return res
+    }
+
+    var msg MsgJobReqPredict
+    err = json.Unmarshal([]byte(string(body)), &msg)
+    if err == nil && msg.Code == 0 {
+        tmp := msg.Labels
+        if v, ok := tmp["pre"]; ok {
+            res.Pre = int(math.Ceil(v / 100))
+        }
+        if v, ok := tmp["post"]; ok {
+            res.Post = int(math.Ceil(v/1024)) * 1024
+        }
+        if v, ok := tmp["total"]; ok {
+            res.Total = int(math.Ceil(v)/10) * 10
+        }
+    }
+    return res
+}
+
+func (optimizer *Optimizer) FeedStats(job Job, role string, stats [][]TaskStatus) {
     if len(stats) == 0 {
         return
     }
@@ -170,191 +366,15 @@ func (optimizer *Optimizer) feedStats(job Job, role string, stats [][]TaskStatus
 
         optimizer.versions[jobName]++
         if optimizer.versions[jobName]%3 == 0 {
-            optimizer.train(jobName)
+            optimizer.trainReq(jobName)
         }
     }()
 }
 
-func (optimizer *Optimizer) max(values []float64) float64 {
-    value := 0.0
-    for _, v := range values {
-        if v > value {
-            value = v
-        }
-    }
-    return value
-}
-
-func (optimizer *Optimizer) mean(values []float64) float64 {
-    sum := 0.0
-    for _, v := range values {
-        sum += v
-    }
-    return sum / float64(len(values))
-}
-
-func (optimizer *Optimizer) std(values []float64) float64 {
-    mean := optimizer.mean(values)
-    std := 0.0
-    for j := 0; j < len(values); j++ {
-        // The use of Pow math function func Pow(x, y float64) float64
-        std += math.Pow(values[j]-mean, 2)
-    }
-    // The use of Sqrt math function func Sqrt(x float64) float64
-    std = math.Sqrt(std / float64(len(values)))
-    return std
-}
-
-func (optimizer *Optimizer) describe(job string) map[string]float64 {
-    if stat, ok := optimizer.stats[job]; ok {
-        return stat
-    }
-    return map[string]float64{}
-}
-
-func (optimizer *Optimizer) feed3(job string, utils []UtilGPUTimeSeries) {
-    log.Info("optimizer feed ", job)
-    //log.Info(job, utils)
-
-    if len(utils) == 0 {
-        return
-    }
-
-    go func() {
-        str := strings.Split(job, "-")
-        if len(str) == 2 {
-            jobName := str[0]
-
-            sum := 0
-            for i := 0; i < len(utils); i++ {
-                sum += utils[i].Util
-            }
-            sum /= len(utils)
-            if _, ok := optimizer.jobUtilsGPU[jobName]; !ok {
-                optimizer.jobUtilsGPU[jobName] = &OptimizerUtilGPU{}
-            }
-            t := optimizer.jobUtilsGPU[jobName]
-            t.Util = (t.Version*t.Util + sum) / (t.Version + 1)
-            t.Version++
-
-            preTime := 0
-            for i := 0; i < len(utils); i++ {
-                if utils[i].Util > 15 {
-                    preTime = utils[i].Time - utils[0].Time
-                    break
-                }
-            }
-
-            postTime := 0
-            for i := len(utils) - 1; i >= 0; i-- {
-                if utils[i].Util > 15 {
-                    postTime = utils[len(utils)-1].Time - utils[i].Time
-                    break
-                }
-            }
-
-            if _, ok := optimizer.predicts[jobName]; !ok {
-                optimizer.predicts[jobName] = &OptimizerJobExecutionTime{}
-            }
-            totalTime := utils[len(utils)-1].Time - utils[0].Time
-
-            predict := optimizer.predicts[jobName]
-            if predict.Version == 0 {
-                predict.Pre = preTime
-                predict.Post = postTime
-                predict.Total = totalTime
-                predict.Main = predict.Total - predict.Pre - predict.Post
-                if predict.Main < 0 {
-                    predict.Main = 0
-                }
-            }
-            predict.Pre = (predict.Pre*95 + preTime*5) / 100
-            predict.Post = (predict.Post*95 + postTime*5) / 100
-            predict.Total = (predict.Total*95 + totalTime*5) / 100
-            predict.Main = predict.Total - predict.Pre - predict.Post
-            if predict.Main < 0 {
-                predict.Main = 0
-            }
-            predict.Version++
-
-            optimizer.feedData(jobName, predict.Version, 0, 0, 0, predict.Total)
-            if predict.Version%10 == 0 && predict.Version > 30 {
-                optimizer.train(jobName)
-            }
-        }
-    }()
-}
-
-func (optimizer *Optimizer) predictUtilGPU(job string) (int, bool) {
-    str := strings.Split(job, "-")
-    if len(str) == 2 {
-        jobName := str[0]
-        if _, ok := optimizer.jobUtilsGPU[jobName]; ok {
-            return optimizer.jobUtilsGPU[jobName].Util, optimizer.jobUtilsGPU[jobName].Version >= 5
-        }
-    }
-    return 100, false
-}
-
-func (optimizer *Optimizer) predictTime(job string) (*OptimizerJobExecutionTime, bool) {
-    str := strings.Split(job, "-")
-    if len(str) == 2 {
-        jobName := str[0]
-        if est, ok := optimizer.cache[jobName]; ok && est.Version > (int)(time.Now().Unix())-300 {
-            return est, true
-        }
-        if est, ok := optimizer.predicts[jobName]; ok {
-            if est.Version > 40 {
-                if est2, ok := optimizer.predict(jobName, est.Version); ok {
-                    est2.Pre = est.Pre * est2.Total / est.Total
-                    est2.Main = est.Main * est2.Total / est.Total
-                    est2.Post = est.Post * est2.Total / est.Total
-                    est2.Version = (int)(time.Now().Unix())
-                    optimizer.cache[jobName] = &est2
-                    return &est2, true
-                }
-            }
-            return est, est.Version >= 5
-        }
-    }
-    return &OptimizerJobExecutionTime{}, false
-}
-
-func (optimizer *Optimizer) getAllPredicts() map[string]*OptimizerJobExecutionTime {
-    return optimizer.predicts
-}
-
-func (optimizer *Optimizer) getAllGPUUtils() map[string]*OptimizerUtilGPU {
-    return optimizer.jobUtilsGPU
-}
-
-func (optimizer *Optimizer) feedData(job string, seq int, pre int, main int, post int, total int) {
+func (optimizer *Optimizer) trainReq(jobName string) {
     spider := Spider{}
     spider.Method = "GET"
-    params := "job=" + job + "&seq=" + strconv.Itoa(seq) + "&value=" + strconv.Itoa(total)
-    spider.URL = "http://yao-optimizer:8080/feed?" + params
-
-    err := spider.do()
-    if err != nil {
-        log.Warn(err)
-        return
-    }
-
-    resp := spider.getResponse()
-    if _, err := ioutil.ReadAll(resp.Body); err != nil {
-        log.Warn(err)
-    }
-    resp.Body.Close()
-    if err != nil {
-        log.Warn(err)
-        return
-    }
-}
-
-func (optimizer *Optimizer) train(job string) {
-    spider := Spider{}
-    spider.Method = "GET"
-    params := "job=" + job
+    params := "job=" + jobName
     spider.URL = "http://yao-optimizer:8080/train?" + params
 
     err := spider.do()
@@ -372,33 +392,6 @@
     }
 }
 
-func (optimizer *Optimizer) predict(job string, seq int) (OptimizerJobExecutionTime, bool) {
-    spider := Spider{}
-    spider.Method = "GET"
-    params := "job=" + job + "&seq=" + strconv.Itoa(seq)
-    spider.URL = "http://yao-optimizer:8080/predict?" + params
-
-    err := spider.do()
-    if err != nil {
-        return OptimizerJobExecutionTime{}, false
-    }
-
-    resp := spider.getResponse()
-    body, err := ioutil.ReadAll(resp.Body)
-    resp.Body.Close()
-    if err != nil {
-        log.Warn(err)
-        return OptimizerJobExecutionTime{}, false
-    }
-
-    var res MsgOptimizerPredict
-    err = json.Unmarshal([]byte(string(body)), &res)
-    if err == nil {
-        return OptimizerJobExecutionTime{Total: res.Total, Pre: res.Pre, Main: res.Main, Post: res.Post}, true
-    }
-    return OptimizerJobExecutionTime{}, false
-}
-
 func (optimizer *Optimizer) PredictReq(job Job, role string) MsgJobReq {
     res := MsgJobReq{CPU: 4, Mem: 4096, UtilGPU: 100, MemGPU: 8192, BW: 0}
 
@@ -507,3 +500,33 @@
     }
     return res
 }
+
+func (optimizer *Optimizer) max(values []float64) float64 {
+    value := 0.0
+    for _, v := range values {
+        if v > value {
+            value = v
+        }
+    }
+    return value
+}
+
+func (optimizer *Optimizer) mean(values []float64) float64 {
+    sum := 0.0
+    for _, v := range values {
+        sum += v
+    }
+    return sum / float64(len(values))
+}
+
+func (optimizer *Optimizer) std(values []float64) float64 {
+    mean := optimizer.mean(values)
+    std := 0.0
+    for j := 0; j < len(values); j++ {
+        // The use of Pow math function func Pow(x, y float64) float64
+        std += math.Pow(values[j]-mean, 2)
+    }
+    // The use of Sqrt math function func Sqrt(x float64) float64
+    std = math.Sqrt(std / float64(len(values)))
+    return std
+}
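Editor's note on `FeedTime` in the optimizer.go changes above: it derives three labels from a job's GPU-utilization series — `pre` (warm-up before utilization first exceeds 15%), `post` (tail after it last exceeds 15%) and `total` (the whole observed window), halving pre/post when the GPU never looked busy. A standalone sketch of just that split, with simplified types (the function name and sample data are illustrative, not from the project):

```go
package main

import "fmt"

type UtilGPUTimeSeries struct {
	Time int64
	Util int
}

// splitPhases mirrors the pre/post/total arithmetic used by FeedTime.
func splitPhases(utils []UtilGPUTimeSeries) (pre, post, total int64) {
	if len(utils) == 0 {
		return 0, 0, 0
	}
	total = utils[len(utils)-1].Time - utils[0].Time
	for i := 0; i < len(utils); i++ {
		if utils[i].Util > 15 { // first sample where the GPU looks busy
			pre = utils[i].Time - utils[0].Time
			break
		}
	}
	for i := len(utils) - 1; i >= 0; i-- {
		if utils[i].Util > 15 { // last sample where the GPU looks busy
			post = utils[len(utils)-1].Time - utils[i].Time
			break
		}
	}
	if pre+post >= total { // in case the GPU was never really used
		pre /= 2
		post /= 2
	}
	return pre, post, total
}

func main() {
	series := []UtilGPUTimeSeries{{0, 0}, {30, 5}, {60, 80}, {600, 75}, {630, 0}}
	fmt.Println(splitPhases(series)) // 60 30 630
}
```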
diff --git a/src/resource_pool.go b/src/resource_pool.go
index 025ffa9..267479b 100644
--- a/src/resource_pool.go
+++ b/src/resource_pool.go
@@ -336,7 +336,7 @@ func (pool *ResourcePool) update(node NodeStatus) {
             if _, ok := pool.bindings[gpu.UUID]; ok {
                 if _, ok2 := pool.utils[gpu.UUID]; ok2 {
                     pool.utils[gpu.UUID] = append(pool.utils[gpu.UUID],
-                        UtilGPUTimeSeries{Time: (int)(time.Now().Unix()), Util: gpu.UtilizationGPU})
+                        UtilGPUTimeSeries{Time: time.Now().Unix(), Util: gpu.UtilizationGPU})
                 }
             }
 
@@ -743,58 +743,59 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
     if pool.TotalGPU == 0 {
         return []NodeStatus{}
     }
-    loadRatio := float64(pool.UsingGPU) / float64(pool.TotalGPU)
+    //loadRatio := float64(pool.UsingGPU) / float64(pool.TotalGPU)
 
     /* first, choose sharable GPUs */
+    /*
     if pool.enableShare && len(job.Tasks) == 1 && task.NumberGPU == 1 && loadRatio >= pool.enableShareRatio {
         // check sharable
         allocationType = 1
-        if util, valid := InstanceOfOptimizer().predictUtilGPU(job.Name); valid {
+        pred := InstanceOfOptimizer().PredictReq(job, "Worker")
 
-            for cur := start; ; {
-                if _, ok := locks[cur.ID]; !ok {
-                    cur.Lock.Lock()
-                    locks[cur.ID] = &cur.Lock
-                }
+        for cur := start; ; {
+            if _, ok := locks[cur.ID]; !ok {
+                cur.Lock.Lock()
+                locks[cur.ID] = &cur.Lock
+            }
 
-                for _, node := range cur.Nodes {
-                    var available []GPUStatus
-                    for _, status := range node.Status {
-                        if status.MemoryAllocated > 0 && status.MemoryTotal > task.MemoryGPU+status.MemoryAllocated {
+            for _, node := range cur.Nodes {
+                var available []GPUStatus
+                for _, status := range node.Status {
+                    if status.MemoryAllocated > 0 && status.MemoryTotal > task.MemoryGPU+status.MemoryAllocated {
 
-                            if jobs, ok := pool.bindings[status.UUID]; ok {
-                                totalUtil := util
-                                for job := range jobs {
-                                    if utilT, ok := InstanceOfOptimizer().predictUtilGPU(job); ok {
-                                        totalUtil += utilT
-                                    } else {
-                                        totalUtil += 100
-                                    }
-                                }
-                                if totalUtil < 100 {
-                                    available = append(available, status)
+                        if jobs, ok := pool.bindings[status.UUID]; ok {
+                            totalUtil := pred.UtilGPU
+                            for job := range jobs {
+                                if utilT, ok := InstanceOfOptimizer().predictUtilGPU(job); ok {
+                                    totalUtil += utilT
+                                } else {
+                                    totalUtil += 100
                                 }
                             }
+                            if totalUtil < 100 {
+                                available = append(available, status)
+                            }
                         }
                     }
-                    if len(available) >= task.NumberGPU {
-                        candidates = append(candidates, *node)
-                        if len(candidates) >= len(job.Tasks)*3+5 {
-                            break
-                        }
+                }
+                if len(available) >= task.NumberGPU {
+                    candidates = append(candidates, *node)
+                    if len(candidates) >= len(job.Tasks)*3+5 {
+                        break
                     }
                 }
-                if len(candidates) >= len(job.Tasks)*3+5 {
-                    break
-                }
-                if cur.ID > cur.Next.ID {
-                    break
-                }
-                cur = cur.Next
             }
+            if len(candidates) >= len(job.Tasks)*3+5 {
+                break
+            }
+            if cur.ID > cur.Next.ID {
+                break
+            }
+            cur = cur.Next
         }
-        //log.Info(candidates)
     }
+    */
+    //log.Info(candidates)
 
     /* second round, find vacant gpu */
     if len(candidates) == 0 {
@@ -831,10 +832,11 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
     }
 
     /* third round, find gpu to be released */
+    /*
     if len(candidates) == 0 && len(job.Tasks) == 1 && task.NumberGPU == 1 && pool.enablePreSchedule {
-        estimate, valid := InstanceOfOptimizer().predictTime(job.Name)
+        estimate := InstanceOfOptimizer().PredictTime(job)
 
-        if loadRatio >= pool.enablePreScheduleRatio && valid {
+        if loadRatio >= pool.enablePreScheduleRatio {
             allocationType = 3
             for cur := start; ; {
                 if _, ok := locks[cur.ID]; !ok {
@@ -850,13 +852,11 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
                            continue
                         }
                         for taskT, s := range tasks {
-                            est, valid2 := InstanceOfOptimizer().predictTime(taskT)
-                            if valid2 {
-                                now := (int)(time.Now().Unix())
-                                log.Info(s, now, estimate, est)
-                                if now-s > est.Total-est.Post-estimate.Pre-15 {
-                                    available = append(available, status)
-                                }
+                            est := InstanceOfOptimizer().PredictTime(taskT)
+                            now := (int)(time.Now().Unix())
+                            log.Info(s, now, estimate, est)
+                            if now-s > est.Total-est.Post-estimate.Pre-15 {
+                                available = append(available, status)
                             }
                         }
                     }
@@ -879,6 +879,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
             //log.Info(candidates)
         }
     }
+    */
 
     if len(candidates) > 0 {
         log.Info("allocationType is ", allocationType)
diff --git a/src/util.go b/src/util.go
index 7acb1a8..aa40f08 100644
--- a/src/util.go
+++ b/src/util.go
@@ -40,8 +40,8 @@ type Task struct {
 }
 
 type UtilGPUTimeSeries struct {
-    Time int `json:"time"`
-    Util int `json:"util"`
+    Time int64 `json:"time"`
+    Util int   `json:"util"`
 }
 
 type OptimizerJobExecutionTime struct {
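Closing note on the util.go change above: widening `UtilGPUTimeSeries.Time` to `int64` lets resource_pool.go store `time.Now().Unix()` directly, without the old `(int)` cast and without truncation on platforms where `int` is 32 bits. A minimal illustration (standalone sketch, not project code):

```go
package main

import (
	"fmt"
	"time"
)

type UtilGPUTimeSeries struct {
	Time int64 `json:"time"`
	Util int   `json:"util"`
}

func main() {
	// time.Now().Unix() is already int64, so no conversion is needed when
	// recording a utilization sample.
	point := UtilGPUTimeSeries{Time: time.Now().Unix(), Util: 42}
	fmt.Printf("%+v\n", point)
}
```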