1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-12-13 07:46:43 +00:00
This commit is contained in:
2020-04-30 13:08:08 +08:00
parent 8eb492964e
commit 913eb47a0d
3 changed files with 42 additions and 3 deletions

View File

@@ -70,6 +70,7 @@ func (jm *JobManager) start() {
v.Set("mem_limit", strconv.Itoa(jm.job.Tasks[i].Memory)+"m") v.Set("mem_limit", strconv.Itoa(jm.job.Tasks[i].Memory)+"m")
v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[i].NumberCPU)) v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[i].NumberCPU))
v.Set("network", network) v.Set("network", network)
v.Set("should_wait", "1")
resp, err := doRequest("POST", "http://"+jm.resources[i].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") resp, err := doRequest("POST", "http://"+jm.resources[i].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
if err != nil { if err != nil {
@@ -106,7 +107,6 @@ func (jm *JobManager) start() {
} else if res.Status[i].Status == "running" { } else if res.Status[i].Status == "running" {
log.Debug(jm.job.Name, "-", i, " is running") log.Debug(jm.job.Name, "-", i, " is running")
flag = true flag = true
log.Info(jm.job.Tasks[i].IsPS)
if !jm.job.Tasks[i].IsPS { if !jm.job.Tasks[i].IsPS {
onlyPS = false onlyPS = false
} }
@@ -138,6 +138,7 @@ func (jm *JobManager) start() {
} }
if onlyPS { if onlyPS {
jm.stop() jm.stop()
log.Info("Only PS is running, stop", jm.job.Name)
break break
} }
if !flag { if !flag {
@@ -232,6 +233,6 @@ func (jm *JobManager) stop() MsgStop {
}() }()
jm.scheduler.UpdateProgress(jm.job.Name, Stopped) jm.scheduler.UpdateProgress(jm.job.Name, Stopped)
log.Info("kill job", jm.job.Name) log.Info("kill job, ", jm.job.Name)
return MsgStop{Code: 0} return MsgStop{Code: 0}
} }

View File

@@ -11,6 +11,9 @@ type Optimizer struct {
killedFlag bool killedFlag bool
predicts map[string]OptimizerJobExecutionTime predicts map[string]OptimizerJobExecutionTime
jobUtilsGPU map[string]int
versions map[string]int
} }
var optimizerInstance *Optimizer var optimizerInstance *Optimizer
@@ -23,6 +26,8 @@ func InstanceOfOptimizer() *Optimizer {
if optimizerInstance == nil { if optimizerInstance == nil {
optimizerInstance = &Optimizer{} optimizerInstance = &Optimizer{}
optimizerInstance.predicts = map[string]OptimizerJobExecutionTime{} optimizerInstance.predicts = map[string]OptimizerJobExecutionTime{}
optimizerInstance.jobUtilsGPU = map[string]int{}
optimizerInstance.versions = map[string]int{}
} }
return optimizerInstance return optimizerInstance
} }
@@ -30,11 +35,25 @@ func InstanceOfOptimizer() *Optimizer {
func (optimizer *Optimizer) feed(job string, utils []int) { func (optimizer *Optimizer) feed(job string, utils []int) {
log.Info("optimizer feed") log.Info("optimizer feed")
log.Info(job, utils) log.Info(job, utils)
log.Info(optimizer.jobUtilsGPU)
log.Info(optimizer.predicts)
go func() { go func() {
str := strings.Split(job, "-") str := strings.Split(job, "-")
if len(str) == 2 { if len(str) == 2 {
preCnt := 0 preCnt := 0
sum := 0
for i := 0; i < len(utils); i++ {
sum += utils[i]
}
last := 0
if t, err := optimizer.jobUtilsGPU[job]; !err {
last = t
}
optimizer.jobUtilsGPU[job] = (optimizer.versions[job]*last + sum/len(utils)) / (optimizer.versions[job] + 1)
optimizer.versions[job]++
for i := 0; i < len(utils); i++ { for i := 0; i < len(utils); i++ {
if utils[i] > 15 { if utils[i] > 15 {
break break
@@ -43,7 +62,7 @@ func (optimizer *Optimizer) feed(job string, utils []int) {
} }
postCnt := 0 postCnt := 0
for i := len(utils)-1; i >= 0; i-- { for i := len(utils) - 1; i >= 0; i-- {
if utils[i] > 15 { if utils[i] > 15 {
break break
} }
@@ -57,7 +76,25 @@ func (optimizer *Optimizer) feed(job string, utils []int) {
predict.Pre = ((predict.Pre * predict.Version) + preCnt) / (predict.Version + 1) predict.Pre = ((predict.Pre * predict.Version) + preCnt) / (predict.Version + 1)
predict.Post = ((predict.Post * predict.Version) + postCnt) / (predict.Version + 1) predict.Post = ((predict.Post * predict.Version) + postCnt) / (predict.Version + 1)
predict.Total = ((predict.Total * predict.Version) + len(utils)) / (predict.Version + 1) predict.Total = ((predict.Total * predict.Version) + len(utils)) / (predict.Version + 1)
predict.Main = predict.Total - predict.Pre - predict.Post
predict.Version++ predict.Version++
} }
}() }()
} }
func (optimizer *Optimizer) predictTime(job string, utils []int) (int, bool) {
if _, err := optimizer.jobUtilsGPU[job]; err {
return 100, false
}
if optimizer.versions[job] > 5 {
return optimizer.jobUtilsGPU[job], true
}
return optimizer.jobUtilsGPU[job], false
}
func (optimizer *Optimizer) predictUtilGPU(job string) (OptimizerJobExecutionTime, bool) {
if _, err := optimizer.predicts[job]; err {
return OptimizerJobExecutionTime{}, false
}
return optimizer.predicts[job], optimizer.predicts[job].Version > 5
}

View File

@@ -170,6 +170,7 @@ type OptimizerJobExecutionTime struct {
Pre int `json:"pre"` Pre int `json:"pre"`
Post int `json:"post"` Post int `json:"post"`
Total int `json:"total"` Total int `json:"total"`
Main int `json:"main"`
Version int `json:"version"` Version int `json:"version"`
} }