mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-06 22:01:55 +00:00

consider bw when allocating, enable preempt switcher

This commit is contained in:
Newnius 2020-08-12 21:09:18 +08:00
parent 00f69969ce
commit 7c6a3ed987
9 changed files with 68 additions and 23 deletions

View File

@@ -21,6 +21,7 @@ type Configuration struct {
PreScheduleExtraTime int `json:"PreScheduleExtraTime"` /* seconds of schedule ahead except pre+post */
PreScheduleTimeout int `json:"PreScheduleTimeout"`
JobMaxRetries int `json:"scheduler.job_max_retries"`
+ PreemptEnabled bool `json:"scheduler.preempt_enabled"`
mock bool
mu sync.Mutex
@@ -53,6 +54,7 @@ func InstanceOfConfiguration() *Configuration {
EnablePreScheduleRatio: 1.5,
PreScheduleExtraTime: 15,
JobMaxRetries: 0,
+ PreemptEnabled: false,
}
}
return configurationInstance
@@ -124,6 +126,8 @@ func (config *Configuration) InitFromEnv() {
configurationInstance.JobMaxRetries = val
}
}
+ value = os.Getenv("scheduler.preempt_enabled")
+ configurationInstance.PreemptEnabled = value == "true"
}
func (config *Configuration) SetMockEnabled(enabled bool) bool {
@@ -166,6 +170,14 @@ func (config *Configuration) SetJobMaxRetries(value int) bool {
return true
}
+ func (config *Configuration) SetPreemptEnabled(enabled bool) bool {
+ config.mu.Lock()
+ defer config.mu.Unlock()
+ config.PreemptEnabled = enabled
+ log.Info("scheduler.preempt_enabled is set to ", enabled)
+ return true
+ }
func (config *Configuration) Dump() map[string]interface{} {
config.mu.Lock()
defer config.mu.Unlock()
@@ -183,6 +195,7 @@ func (config *Configuration) Dump() map[string]interface{} {
res["EnablePreScheduleRatio"] = config.EnablePreScheduleRatio
res["PreScheduleExtraTime"] = config.PreScheduleExtraTime
res["PreScheduleTimeout"] = config.PreScheduleTimeout
+ res["scheduler.preempt_enabled"] = config.PreemptEnabled
res["logger.level"] = log.LoggerLevel
res["logger.modules_disabled"] = log.LoggerModuleDisabled
return res
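
Taken together, these hunks expose the switch twice: via environment at startup and via a runtime setter. A standalone reduction of the two paths (the real singleton also logs the change and shares its mutex with Dump):

package main

import (
	"fmt"
	"os"
	"sync"
)

// stand-in for the scheduler's Configuration singleton, reduced to the
// new field and the two paths that set it
type Configuration struct {
	PreemptEnabled bool
	mu             sync.Mutex
}

func (c *Configuration) InitFromEnv() {
	// anything other than the literal string "true" leaves preemption off
	c.PreemptEnabled = os.Getenv("scheduler.preempt_enabled") == "true"
}

func (c *Configuration) SetPreemptEnabled(enabled bool) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.PreemptEnabled = enabled
	return true
}

func main() {
	config := &Configuration{}
	os.Setenv("scheduler.preempt_enabled", "true") // startup path
	config.InitFromEnv()
	config.SetPreemptEnabled(false) // runtime path, as the conf API does
	fmt.Println(config.PreemptEnabled) // false
}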

View File

@@ -55,10 +55,11 @@ func (eva *Evaluator) add(node NodeStatus, task Task) {
if _, ok := eva.domains[task.Job][node.Domain]; !ok {
eva.domains[task.Job][node.Domain] = map[string]int{"PS": 0, "Worker": 0}
}
+ bwFactor := float64(task.BW)
if task.IsPS {
- eva.costNetwork += eva.factorNode * float64(eva.racks[task.Job][node.Rack]["Worker"]-eva.nodes[task.Job][node.ClientID]["Worker"])
- eva.costNetwork += eva.factorRack * float64(eva.domains[task.Job][node.Domain]["Worker"]-eva.racks[task.Job][node.Rack]["Worker"])
- eva.costNetwork += eva.factorDomain * float64(eva.totalWorkers[task.Job]-eva.domains[task.Job][node.Domain]["Worker"])
+ eva.costNetwork += bwFactor * eva.factorNode * float64(eva.racks[task.Job][node.Rack]["Worker"]-eva.nodes[task.Job][node.ClientID]["Worker"])
+ eva.costNetwork += bwFactor * eva.factorRack * float64(eva.domains[task.Job][node.Domain]["Worker"]-eva.racks[task.Job][node.Rack]["Worker"])
+ eva.costNetwork += bwFactor * eva.factorDomain * float64(eva.totalWorkers[task.Job]-eva.domains[task.Job][node.Domain]["Worker"])
eva.nodes[task.Job][node.ClientID]["PS"]++
eva.racks[task.Job][node.Rack]["PS"]++
@@ -66,9 +67,9 @@ func (eva *Evaluator) add(node NodeStatus, task Task) {
eva.totalPSs[task.Job]++
eva.totalPS++
} else {
- eva.costNetwork += eva.factorNode * float64(eva.racks[task.Job][node.Rack]["PS"]-eva.nodes[task.Job][node.ClientID]["PS"])
- eva.costNetwork += eva.factorRack * float64(eva.domains[task.Job][node.Domain]["PS"]-eva.racks[task.Job][node.Rack]["PS"])
- eva.costNetwork += eva.factorDomain * float64(eva.totalPSs[task.Job]-eva.domains[task.Job][node.Domain]["PS"])
+ eva.costNetwork += bwFactor * eva.factorNode * float64(eva.racks[task.Job][node.Rack]["PS"]-eva.nodes[task.Job][node.ClientID]["PS"])
+ eva.costNetwork += bwFactor * eva.factorRack * float64(eva.domains[task.Job][node.Domain]["PS"]-eva.racks[task.Job][node.Rack]["PS"])
+ eva.costNetwork += bwFactor * eva.factorDomain * float64(eva.totalPSs[task.Job]-eva.domains[task.Job][node.Domain]["PS"])
eva.nodes[task.Job][node.ClientID]["Worker"]++
eva.racks[task.Job][node.Rack]["Worker"]++
@@ -89,11 +90,12 @@ func (eva *Evaluator) add(node NodeStatus, task Task) {
}
func (eva *Evaluator) remove(node NodeStatus, task Task) {
+ bwFactor := float64(task.BW)
/* update network cost */
if task.IsPS {
- eva.costNetwork -= eva.factorNode * float64(eva.racks[task.Job][node.Rack]["Worker"]-eva.nodes[task.Job][node.ClientID]["Worker"])
- eva.costNetwork -= eva.factorRack * float64(eva.domains[task.Job][node.Domain]["Worker"]-eva.racks[task.Job][node.Rack]["Worker"])
- eva.costNetwork -= eva.factorDomain * float64(eva.totalWorkers[task.Job]-eva.domains[task.Job][node.Domain]["Worker"])
+ eva.costNetwork -= bwFactor * eva.factorNode * float64(eva.racks[task.Job][node.Rack]["Worker"]-eva.nodes[task.Job][node.ClientID]["Worker"])
+ eva.costNetwork -= bwFactor * eva.factorRack * float64(eva.domains[task.Job][node.Domain]["Worker"]-eva.racks[task.Job][node.Rack]["Worker"])
+ eva.costNetwork -= bwFactor * eva.factorDomain * float64(eva.totalWorkers[task.Job]-eva.domains[task.Job][node.Domain]["Worker"])
eva.nodes[task.Job][node.ClientID]["PS"]--
eva.racks[task.Job][node.Rack]["PS"]--
@@ -101,9 +103,9 @@ func (eva *Evaluator) remove(node NodeStatus, task Task) {
eva.totalPSs[task.Job]--
eva.totalPS--
} else {
- eva.costNetwork -= eva.factorNode * float64(eva.racks[task.Job][node.Rack]["PS"]-eva.nodes[task.Job][node.ClientID]["PS"])
- eva.costNetwork -= eva.factorRack * float64(eva.domains[task.Job][node.Domain]["PS"]-eva.racks[task.Job][node.Rack]["PS"])
- eva.costNetwork -= eva.factorDomain * float64(eva.totalPSs[task.Job]-eva.domains[task.Job][node.Domain]["PS"])
+ eva.costNetwork -= bwFactor * eva.factorNode * float64(eva.racks[task.Job][node.Rack]["PS"]-eva.nodes[task.Job][node.ClientID]["PS"])
+ eva.costNetwork -= bwFactor * eva.factorRack * float64(eva.domains[task.Job][node.Domain]["PS"]-eva.racks[task.Job][node.Rack]["PS"])
+ eva.costNetwork -= bwFactor * eva.factorDomain * float64(eva.totalPSs[task.Job]-eva.domains[task.Job][node.Domain]["PS"])
eva.nodes[task.Job][node.ClientID]["Worker"]--
eva.racks[task.Job][node.Rack]["Worker"]--
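
With every term now multiplied by the task's predicted bandwidth, heavy-traffic jobs pay proportionally more for scattered placement. A standalone sketch of the PS-side delta; the function and parameter names are simplified stand-ins for the evaluator's counters, and the factor values in main are made up:

package main

import "fmt"

// costDelta charges a newly placed PS for traffic to this job's workers:
// workers on the same node are free, each locality level is priced by its
// factor, and the whole sum is scaled by the task's predicted bandwidth.
func costDelta(bw, factorNode, factorRack, factorDomain float64,
	onNode, inRack, inDomain, total int) float64 {
	cost := factorNode * float64(inRack-onNode)    // same rack, different node
	cost += factorRack * float64(inDomain-inRack)  // same domain, different rack
	cost += factorDomain * float64(total-inDomain) // different domain
	return bw * cost
}

func main() {
	// 4 workers total: 1 on this node, 2 in its rack, 3 in its domain,
	// predicted bandwidth 3 units, assumed factors 1/4/40
	fmt.Println(costDelta(3, 1, 4, 40, 1, 2, 3, 4)) // 3*(1*1 + 4*1 + 40*1) = 135
}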

View File

@@ -181,7 +181,7 @@ func (jm *JobManager) start() {
jm.statusMu.Lock()
for task, pre := range jm.lastHeartBeat {
if now-pre > 30 {
- log.Warn(jm.job.Name, "-", task, " heartbeat longer tha 30s")
+ log.Warn(jm.job.Name, "-", task, " heartbeat longer than 30s")
}
}
jm.statusMu.Unlock()

View File

@@ -25,7 +25,7 @@ func (logger *Logger) Debug(args ...interface{}) {
if ok && details != nil {
module = details.Name()
}
- args = append(args, "<--"+module)
+ args = append(args, " <-- "+module)
_log.Debug(args...)
}
@@ -36,7 +36,7 @@ func (logger *Logger) Info(args ...interface{}) {
if ok && details != nil {
module = details.Name()
}
- args = append(args, "<--"+module)
+ args = append(args, " <-- "+module)
_log.Info(args...)
}
@@ -47,7 +47,7 @@ func (logger *Logger) Warn(args ...interface{}) {
if ok && details != nil {
module = details.Name()
}
- args = append(args, "<--"+module)
+ args = append(args, " <-- "+module)
_log.Warn(args...)
}
@@ -58,7 +58,7 @@ func (logger *Logger) Fatal(args ...interface{}) {
if ok && details != nil {
module = details.Name()
}
- args = append(args, "<--"+module)
+ args = append(args, " <-- "+module)
_log.Fatal(args...)
}
@@ -69,7 +69,7 @@ func (logger *Logger) Fatalf(format string, args ...interface{}) {
if ok && details != nil {
module = details.Name()
}
- args = append(args, "<--"+module)
+ args = append(args, " <-- "+module)
_log.Fatalf(format, args...)
}
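
Only the details.Name() check is visible in these hunks; a plausible reconstruction of how the " <-- module" suffix gets its value, assuming the wrappers use runtime.Caller (the wiring is not confirmed by this diff):

package main

import (
	"fmt"
	"runtime"
)

// callerModule resolves the calling function and, if that succeeds, uses
// its fully qualified name as the log suffix -- mirroring the visible
// "if ok && details != nil { module = details.Name() }" pattern.
func callerModule() string {
	module := "unknown"
	pc, _, _, ok := runtime.Caller(1) // the function that called us
	details := runtime.FuncForPC(pc)
	if ok && details != nil {
		module = details.Name()
	}
	return module
}

func main() {
	fmt.Println("message", " <-- "+callerModule()) // message  <-- main.main
}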

View File

@@ -96,9 +96,11 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
job.Name = job.Name + "-"
job.Name += strconv.FormatInt(time.Now().UnixNano(), 10)
job.Name += strconv.Itoa(10000 + rand.Intn(89999))
+ bwWorker := InstanceOfOptimizer().PredictReq(job, "Worker").BW
for i := range job.Tasks {
job.Tasks[i].ID = job.Name + ":" + job.Tasks[i].Name
job.Tasks[i].Job = job.Name
+ job.Tasks[i].BW = bwWorker
}
job.CreatedAt = int(time.Now().Unix())
msgSubmit.JobName = job.Name
@@ -336,6 +338,11 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
}
break
+ /* scheduler.preempt_enabled */
+ case "scheduler.preempt_enabled":
+ ok = InstanceOfConfiguration().SetPreemptEnabled(value == "true")
+ break
/* allocator.strategy */
case "allocator.strategy":
ok = InstanceOfAllocator().updateStrategy(value)
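
For illustration, a client-side sketch of flipping the switch through this conf handler. The endpoint path and parameter names below are assumptions, not taken from this diff, which only shows the server-side case on the "scheduler.preempt_enabled" key:

package main

import (
	"net/http"
	"net/url"
)

func main() {
	// HYPOTHETICAL endpoint and parameter names -- adjust to the
	// scheduler's actual conf route.
	resp, err := http.PostForm("http://yao-scheduler:8080/conf", url.Values{
		"option": {"scheduler.preempt_enabled"},
		"value":  {"true"}, // anything but "true" turns preemption off
	})
	if err == nil {
		resp.Body.Close()
	}
}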

View File

@@ -96,6 +96,7 @@ type MsgJobReq struct {
UtilGPU int `json:"gpu_util"`
MemGPU int `json:"gpu_mem"`
BW int `json:"bw"`
+ Version int64 `json:"version"`
}
type MsgJobReqPredict struct {

View File

@@ -13,6 +13,7 @@ import (
type Optimizer struct {
versions map[string]int
+ reqCache map[string]MsgJobReq
cache map[string]OptimizerJobExecutionTime
cacheMu sync.Mutex
}
@@ -28,6 +29,7 @@ func InstanceOfOptimizer() *Optimizer {
optimizerInstance = &Optimizer{}
optimizerInstance.versions = map[string]int{}
optimizerInstance.cache = map[string]OptimizerJobExecutionTime{}
+ optimizerInstance.reqCache = map[string]MsgJobReq{}
go func() {
/* remove expired cache */
@@ -43,6 +45,15 @@ func InstanceOfOptimizer() *Optimizer {
for _, k := range expired {
delete(optimizerInstance.cache, k)
}
+ expired = []string{}
+ for k, v := range optimizerInstance.reqCache {
+ if time.Now().Unix()-v.Version > 300 {
+ expired = append(expired, k)
+ }
+ }
+ for _, k := range expired {
+ delete(optimizerInstance.reqCache, k)
+ }
optimizerInstance.cacheMu.Unlock()
}
}()
@@ -430,7 +441,13 @@ func (optimizer *Optimizer) trainReq(jobName string) {
}
func (optimizer *Optimizer) PredictReq(job Job, role string) MsgJobReq {
- res := MsgJobReq{CPU: 4, Mem: 4096, UtilGPU: 100, MemGPU: 8192, BW: 0}
+ optimizer.cacheMu.Lock()
+ if val, ok := optimizer.reqCache[job.Name]; ok {
+ optimizer.cacheMu.Unlock()
+ return val
+ }
+ optimizer.cacheMu.Unlock()
+ res := MsgJobReq{CPU: 4, Mem: 4096, UtilGPU: 100, MemGPU: 8192, BW: 1}
var jobName string
str := strings.Split(job.Name, "-")
@@ -535,10 +552,14 @@ func (optimizer *Optimizer) PredictReq(job Job, role string) MsgJobReq {
if v, ok := tmp["gpu_mem"]; ok {
res.MemGPU = int(math.Ceil(v/1024)) * 1024
}
- if v, ok := tmp["bw"]; ok {
- res.BW = int(math.Ceil(v/10)) * 10
+ if v, ok := tmp["bw_rx"]; ok {
+ res.BW = int(math.Ceil(v / 100000))
}
}
+ res.Version = time.Now().Unix()
+ optimizer.cacheMu.Lock()
+ optimizer.reqCache[job.Name] = res
+ optimizer.cacheMu.Unlock()
return res
}
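
The caching scheme in full: PredictReq stamps each result with Version = current unix seconds, lookups serve hits for up to five minutes, and the background loop evicts the rest. A self-contained sketch of the eviction pass, with MsgJobReq trimmed to the relevant fields:

package main

import (
	"fmt"
	"sync"
	"time"
)

// trimmed-down stand-in for the scheduler's MsgJobReq
type MsgJobReq struct {
	BW      int
	Version int64 // unix seconds, set when the prediction was cached
}

// sweep mirrors the loop added in this commit: collect expired keys
// first, then delete, all under the mutex that also guards lookups.
func sweep(mu *sync.Mutex, cache map[string]MsgJobReq) {
	mu.Lock()
	defer mu.Unlock()
	var expired []string
	for k, v := range cache {
		if time.Now().Unix()-v.Version > 300 { // same 5-minute TTL
			expired = append(expired, k)
		}
	}
	for _, k := range expired {
		delete(cache, k)
	}
}

func main() {
	mu := &sync.Mutex{}
	cache := map[string]MsgJobReq{
		"stale-job": {BW: 2, Version: time.Now().Unix() - 400},
		"fresh-job": {BW: 3, Version: time.Now().Unix()},
	}
	sweep(mu, cache)
	fmt.Println(len(cache)) // 1: only "fresh-job" survives
}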

View File

@@ -17,7 +17,7 @@ type SchedulerPriority struct {
schedulingJobs map[string]bool
schedulingMu sync.Mutex
jobMasters map[string]*JobManager
enabled bool
parallelism int
}
@@ -68,7 +68,7 @@ func (scheduler *SchedulerPriority) Start() {
go func() {
jm.start()
}()
- } else {
+ } else if InstanceOfConfiguration().PreemptEnabled {
/* start preempt */
var jobs []Job
preemptee := scheduler.queue[0]

View File

@@ -33,6 +33,7 @@ type Task struct {
Memory int `json:"memory"`
NumberGPU int `json:"gpu_number"`
MemoryGPU int `json:"gpu_memory"`
+ BW int `json:"bw"`
IsPS bool `json:"is_ps"`
ModelGPU string `json:"gpu_model"`
}
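
For reference, the wire format a task now carries; a sketch with Task trimmed to the fields shown in this hunk:

package main

import (
	"encoding/json"
	"fmt"
)

// Task, reduced to the fields visible in the hunk above
type Task struct {
	Memory    int    `json:"memory"`
	NumberGPU int    `json:"gpu_number"`
	MemoryGPU int    `json:"gpu_memory"`
	BW        int    `json:"bw"` // predicted worker bandwidth, new in this commit
	IsPS      bool   `json:"is_ps"`
	ModelGPU  string `json:"gpu_model"`
}

func main() {
	b, _ := json.Marshal(Task{Memory: 4096, NumberGPU: 1, MemoryGPU: 8192, BW: 3})
	fmt.Println(string(b))
	// {"memory":4096,"gpu_number":1,"gpu_memory":8192,"bw":3,"is_ps":false,"gpu_model":""}
}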