diff --git a/src/configuration.go b/src/configuration.go index 0fe430e..4c35b5d 100644 --- a/src/configuration.go +++ b/src/configuration.go @@ -21,6 +21,7 @@ type Configuration struct { PreScheduleExtraTime int `json:"PreScheduleExtraTime"` /* seconds of schedule ahead except pre+post */ PreScheduleTimeout int `json:"PreScheduleTimeout"` JobMaxRetries int `json:"scheduler.job_max_retries"` + PreemptEnabled bool `json:"scheduler.preempt_enabled"` mock bool mu sync.Mutex @@ -53,6 +54,7 @@ func InstanceOfConfiguration() *Configuration { EnablePreScheduleRatio: 1.5, PreScheduleExtraTime: 15, JobMaxRetries: 0, + PreemptEnabled: false, } } return configurationInstance @@ -124,6 +126,8 @@ func (config *Configuration) InitFromEnv() { configurationInstance.JobMaxRetries = val } } + value = os.Getenv("scheduler.preempt_enabled") + configurationInstance.PreemptEnabled = value == "true" } func (config *Configuration) SetMockEnabled(enabled bool) bool { @@ -166,6 +170,14 @@ func (config *Configuration) SetJobMaxRetries(value int) bool { return true } +func (config *Configuration) SetPreemptEnabled(enabled bool) bool { + config.mu.Lock() + defer config.mu.Unlock() + config.PreemptEnabled = enabled + log.Info("scheduler.preempt_enabled is set to ", enabled) + return true +} + func (config *Configuration) Dump() map[string]interface{} { config.mu.Lock() defer config.mu.Unlock() @@ -183,6 +195,7 @@ func (config *Configuration) Dump() map[string]interface{} { res["EnablePreScheduleRatio"] = config.EnablePreScheduleRatio res["PreScheduleExtraTime"] = config.PreScheduleExtraTime res["PreScheduleTimeout"] = config.PreScheduleTimeout + res["scheduler.preempt_enabled"] = config.PreemptEnabled res["logger.level"] = log.LoggerLevel res["logger.modules_disabled"] = log.LoggerModuleDisabled return res diff --git a/src/evaluator.go b/src/evaluator.go index 0524335..32aab55 100644 --- a/src/evaluator.go +++ b/src/evaluator.go @@ -55,10 +55,11 @@ func (eva *Evaluator) add(node NodeStatus, task Task) { if _, ok := eva.domains[task.Job][node.Domain]; !ok { eva.domains[task.Job][node.Domain] = map[string]int{"PS": 0, "Worker": 0} } + bwFactor := float64(task.BW) if task.IsPS { - eva.costNetwork += eva.factorNode * float64(eva.racks[task.Job][node.Rack]["Worker"]-eva.nodes[task.Job][node.ClientID]["Worker"]) - eva.costNetwork += eva.factorRack * float64(eva.domains[task.Job][node.Domain]["Worker"]-eva.racks[task.Job][node.Rack]["Worker"]) - eva.costNetwork += eva.factorDomain * float64(eva.totalWorkers[task.Job]-eva.domains[task.Job][node.Domain]["Worker"]) + eva.costNetwork += bwFactor * eva.factorNode * float64(eva.racks[task.Job][node.Rack]["Worker"]-eva.nodes[task.Job][node.ClientID]["Worker"]) + eva.costNetwork += bwFactor * eva.factorRack * float64(eva.domains[task.Job][node.Domain]["Worker"]-eva.racks[task.Job][node.Rack]["Worker"]) + eva.costNetwork += bwFactor * eva.factorDomain * float64(eva.totalWorkers[task.Job]-eva.domains[task.Job][node.Domain]["Worker"]) eva.nodes[task.Job][node.ClientID]["PS"]++ eva.racks[task.Job][node.Rack]["PS"]++ @@ -66,9 +67,9 @@ func (eva *Evaluator) add(node NodeStatus, task Task) { eva.totalPSs[task.Job]++ eva.totalPS++ } else { - eva.costNetwork += eva.factorNode * float64(eva.racks[task.Job][node.Rack]["PS"]-eva.nodes[task.Job][node.ClientID]["PS"]) - eva.costNetwork += eva.factorRack * float64(eva.domains[task.Job][node.Domain]["PS"]-eva.racks[task.Job][node.Rack]["PS"]) - eva.costNetwork += eva.factorDomain * float64(eva.totalPSs[task.Job]-eva.domains[task.Job][node.Domain]["PS"]) + eva.costNetwork += bwFactor * eva.factorNode * float64(eva.racks[task.Job][node.Rack]["PS"]-eva.nodes[task.Job][node.ClientID]["PS"]) + eva.costNetwork += bwFactor * eva.factorRack * float64(eva.domains[task.Job][node.Domain]["PS"]-eva.racks[task.Job][node.Rack]["PS"]) + eva.costNetwork += bwFactor * eva.factorDomain * float64(eva.totalPSs[task.Job]-eva.domains[task.Job][node.Domain]["PS"]) eva.nodes[task.Job][node.ClientID]["Worker"]++ eva.racks[task.Job][node.Rack]["Worker"]++ @@ -89,11 +90,12 @@ func (eva *Evaluator) add(node NodeStatus, task Task) { } func (eva *Evaluator) remove(node NodeStatus, task Task) { + bwFactor := float64(task.BW) /* update network cost */ if task.IsPS { - eva.costNetwork -= eva.factorNode * float64(eva.racks[task.Job][node.Rack]["Worker"]-eva.nodes[task.Job][node.ClientID]["Worker"]) - eva.costNetwork -= eva.factorRack * float64(eva.domains[task.Job][node.Domain]["Worker"]-eva.racks[task.Job][node.Rack]["Worker"]) - eva.costNetwork -= eva.factorDomain * float64(eva.totalWorkers[task.Job]-eva.domains[task.Job][node.Domain]["Worker"]) + eva.costNetwork -= bwFactor * eva.factorNode * float64(eva.racks[task.Job][node.Rack]["Worker"]-eva.nodes[task.Job][node.ClientID]["Worker"]) + eva.costNetwork -= bwFactor * eva.factorRack * float64(eva.domains[task.Job][node.Domain]["Worker"]-eva.racks[task.Job][node.Rack]["Worker"]) + eva.costNetwork -= bwFactor * eva.factorDomain * float64(eva.totalWorkers[task.Job]-eva.domains[task.Job][node.Domain]["Worker"]) eva.nodes[task.Job][node.ClientID]["PS"]-- eva.racks[task.Job][node.Rack]["PS"]-- @@ -101,9 +103,9 @@ func (eva *Evaluator) remove(node NodeStatus, task Task) { eva.totalPSs[task.Job]-- eva.totalPS-- } else { - eva.costNetwork -= eva.factorNode * float64(eva.racks[task.Job][node.Rack]["PS"]-eva.nodes[task.Job][node.ClientID]["PS"]) - eva.costNetwork -= eva.factorRack * float64(eva.domains[task.Job][node.Domain]["PS"]-eva.racks[task.Job][node.Rack]["PS"]) - eva.costNetwork -= eva.factorDomain * float64(eva.totalPSs[task.Job]-eva.domains[task.Job][node.Domain]["PS"]) + eva.costNetwork -= bwFactor * eva.factorNode * float64(eva.racks[task.Job][node.Rack]["PS"]-eva.nodes[task.Job][node.ClientID]["PS"]) + eva.costNetwork -= bwFactor * eva.factorRack * float64(eva.domains[task.Job][node.Domain]["PS"]-eva.racks[task.Job][node.Rack]["PS"]) + eva.costNetwork -= bwFactor * eva.factorDomain * float64(eva.totalPSs[task.Job]-eva.domains[task.Job][node.Domain]["PS"]) eva.nodes[task.Job][node.ClientID]["Worker"]-- eva.racks[task.Job][node.Rack]["Worker"]-- diff --git a/src/job_manager.go b/src/job_manager.go index 6bcbde1..f134849 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -181,7 +181,7 @@ func (jm *JobManager) start() { jm.statusMu.Lock() for task, pre := range jm.lastHeartBeat { if now-pre > 30 { - log.Warn(jm.job.Name, "-", task, " heartbeat longer tha 30s") + log.Warn(jm.job.Name, "-", task, " heartbeat longer than 30s") } } jm.statusMu.Unlock() diff --git a/src/logger.go b/src/logger.go index 8d50f19..9971a9a 100644 --- a/src/logger.go +++ b/src/logger.go @@ -25,7 +25,7 @@ func (logger *Logger) Debug(args ...interface{}) { if ok && details != nil { module = details.Name() } - args = append(args, "<--"+module) + args = append(args, " <-- "+module) _log.Debug(args...) } @@ -36,7 +36,7 @@ func (logger *Logger) Info(args ...interface{}) { if ok && details != nil { module = details.Name() } - args = append(args, "<--"+module) + args = append(args, " <-- "+module) _log.Info(args...) } @@ -47,7 +47,7 @@ func (logger *Logger) Warn(args ...interface{}) { if ok && details != nil { module = details.Name() } - args = append(args, "<--"+module) + args = append(args, " <-- "+module) _log.Warn(args...) } @@ -58,7 +58,7 @@ func (logger *Logger) Fatal(args ...interface{}) { if ok && details != nil { module = details.Name() } - args = append(args, "<--"+module) + args = append(args, " <-- "+module) _log.Fatal(args...) } @@ -69,7 +69,7 @@ func (logger *Logger) Fatalf(format string, args ...interface{}) { if ok && details != nil { module = details.Name() } - args = append(args, "<--"+module) + args = append(args, " <-- "+module) _log.Fatalf(format, args...) } diff --git a/src/main.go b/src/main.go index c07c3b8..fdb35d4 100644 --- a/src/main.go +++ b/src/main.go @@ -96,9 +96,11 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { job.Name = job.Name + "-" job.Name += strconv.FormatInt(time.Now().UnixNano(), 10) job.Name += strconv.Itoa(10000 + rand.Intn(89999)) + bwWorker := InstanceOfOptimizer().PredictReq(job, "Worker").BW for i := range job.Tasks { job.Tasks[i].ID = job.Name + ":" + job.Tasks[i].Name job.Tasks[i].Job = job.Name + job.Tasks[i].BW = bwWorker } job.CreatedAt = int(time.Now().Unix()) msgSubmit.JobName = job.Name @@ -336,6 +338,11 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { } break + /* scheduler.preempt_enabled */ + case "scheduler.preempt_enabled": + ok = InstanceOfConfiguration().SetPreemptEnabled(value == "true") + break + /* allocator.strategy */ case "allocator.strategy": ok = InstanceOfAllocator().updateStrategy(value) diff --git a/src/message.go b/src/message.go index 03ec65d..14033cc 100644 --- a/src/message.go +++ b/src/message.go @@ -96,6 +96,7 @@ type MsgJobReq struct { UtilGPU int `json:"gpu_util"` MemGPU int `json:"gpu_mem"` BW int `json:"bw"` + Version int64 `json:"version"` } type MsgJobReqPredict struct { diff --git a/src/optimizer.go b/src/optimizer.go index 89d308d..3df533f 100644 --- a/src/optimizer.go +++ b/src/optimizer.go @@ -13,6 +13,7 @@ import ( type Optimizer struct { versions map[string]int + reqCache map[string]MsgJobReq cache map[string]OptimizerJobExecutionTime cacheMu sync.Mutex } @@ -28,6 +29,7 @@ func InstanceOfOptimizer() *Optimizer { optimizerInstance = &Optimizer{} optimizerInstance.versions = map[string]int{} optimizerInstance.cache = map[string]OptimizerJobExecutionTime{} + optimizerInstance.reqCache = map[string]MsgJobReq{} go func() { /* remove expired cache */ @@ -43,6 +45,15 @@ func InstanceOfOptimizer() *Optimizer { for _, k := range expired { delete(optimizerInstance.cache, k) } + expired = []string{} + for k, v := range optimizerInstance.reqCache { + if time.Now().Unix()-v.Version > 300 { + expired = append(expired, k) + } + } + for _, k := range expired { + delete(optimizerInstance.reqCache, k) + } optimizerInstance.cacheMu.Unlock() } }() @@ -430,7 +441,13 @@ func (optimizer *Optimizer) trainReq(jobName string) { } func (optimizer *Optimizer) PredictReq(job Job, role string) MsgJobReq { - res := MsgJobReq{CPU: 4, Mem: 4096, UtilGPU: 100, MemGPU: 8192, BW: 0} + optimizer.cacheMu.Lock() + if val, ok := optimizer.reqCache[job.Name]; ok { + optimizer.cacheMu.Unlock() + return val + } + optimizer.cacheMu.Unlock() + res := MsgJobReq{CPU: 4, Mem: 4096, UtilGPU: 100, MemGPU: 8192, BW: 1} var jobName string str := strings.Split(job.Name, "-") @@ -535,10 +552,14 @@ func (optimizer *Optimizer) PredictReq(job Job, role string) MsgJobReq { if v, ok := tmp["gpu_mem"]; ok { res.MemGPU = int(math.Ceil(v/1024)) * 1024 } - if v, ok := tmp["bw"]; ok { - res.BW = int(math.Ceil(v/10)) * 10 + if v, ok := tmp["bw_rx"]; ok { + res.BW = int(math.Ceil(v / 100000)) } } + res.Version = time.Now().Unix() + optimizer.cacheMu.Lock() + optimizer.reqCache[job.Name] = res + optimizer.cacheMu.Unlock() return res } diff --git a/src/scheduler_priority.go b/src/scheduler_priority.go index a1bb5cf..763af34 100644 --- a/src/scheduler_priority.go +++ b/src/scheduler_priority.go @@ -17,7 +17,7 @@ type SchedulerPriority struct { schedulingJobs map[string]bool schedulingMu sync.Mutex - jobMasters map[string]*JobManager + jobMasters map[string]*JobManager enabled bool parallelism int } @@ -68,7 +68,7 @@ func (scheduler *SchedulerPriority) Start() { go func() { jm.start() }() - } else { + } else if InstanceOfConfiguration().PreemptEnabled { /* start preempt */ var jobs []Job preemptee := scheduler.queue[0] diff --git a/src/util.go b/src/util.go index 9792dcd..89c15d5 100644 --- a/src/util.go +++ b/src/util.go @@ -33,6 +33,7 @@ type Task struct { Memory int `json:"memory"` NumberGPU int `json:"gpu_number"` MemoryGPU int `json:"gpu_memory"` + BW int `json:"bw"` IsPS bool `json:"is_ps"` ModelGPU string `json:"gpu_model"` }