Newnius 2020-06-29 23:24:33 +08:00
parent 769fa702f3
commit 1f32eeea40
7 changed files with 332 additions and 353 deletions

View File

@@ -3,6 +3,7 @@
## API
+#### ResourcePool
**GetHeartCounter**
```
@@ -23,6 +24,7 @@ GPU is occupied by which job(s)
?action=get_bindings
```
+#### Scheduler
**EnableSchedule**
```
?action=debug_enable
@@ -62,21 +64,6 @@ GPU is occupied by which job(s)
?action=debug_update_enable_pre_schedule_ratio&ratio=0.95
```
-**FeedDLData**
-```
-?action=debug_optimizer_feed_dl&job=lstm&seq=1&value=2
-```
-**TrainDL**
-```
-?action=debug_optimizer_train_dl&job=lstm
-```
-**PredictDL**
-```
-?action=debug_get_predict_dl&job=lstm&seq=1
-```
**UpdateAllocateStrategy**
```
?action=allocator_update_strategy&strategy=bestfit
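For illustration, a minimal client-side sketch of driving one of the GET actions listed above. The scheduler host, port, and URL path used here are assumptions, not taken from this document; substitute the real address of the deployment.

```go
// Hypothetical caller for a scheduler debug action such as UpdateAllocateStrategy.
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Assumed endpoint; the actual scheduler address is deployment-specific.
	resp, err := http.Get("http://localhost:8080/?action=allocator_update_strategy&strategy=bestfit")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(string(body)) // raw JSON reply from the scheduler
}
```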

View File

@@ -54,12 +54,14 @@ func (jm *JobManager) start() {
if InstanceOfConfiguration().mock {
jm.scheduler.UpdateProgress(jm.job, Running)
+jm.job.Status = Running
jm.isRunning = false
duration := InstanceOfMocker().GetDuration(jm.job, jm.resources)
log.Info("mock ", jm.job.Name, ", wait ", duration)
time.Sleep(time.Second * time.Duration(duration))
jm.returnResource([]TaskStatus{})
jm.scheduler.UpdateProgress(jm.job, Finished)
+jm.job.Status = Finished
log.Info("JobMaster exited ", jm.job.Name)
return
}
@@ -67,6 +69,7 @@ func (jm *JobManager) start() {
if !jm.killFlag {
/* switch to Running state */
jm.scheduler.UpdateProgress(jm.job, Running)
+jm.job.Status = Running
/* bring up containers */
wg := sync.WaitGroup{}
@@ -149,7 +152,7 @@ func (jm *JobManager) start() {
stats = append(stats, stat)
}
}
-InstanceOfOptimizer().feedStats(jm.job, "PS", stats)
+InstanceOfOptimizer().FeedStats(jm.job, "PS", stats)
stats = [][]TaskStatus{}
for _, vals := range jm.stats {
var stat []TaskStatus
@@ -164,7 +167,7 @@ func (jm *JobManager) start() {
}
//log.Info(jm.stats)
//log.Info(stats)
-InstanceOfOptimizer().feedStats(jm.job, "Worker", stats)
+InstanceOfOptimizer().FeedStats(jm.job, "Worker", stats)
jm.returnResource(jm.status().Status)
log.Info("JobMaster exited ", jm.job.Name)
}
@@ -246,11 +249,13 @@ func (jm *JobManager) checkStatus(status []TaskStatus) {
jm.stop(false)
jm.killFlag = true
jm.scheduler.UpdateProgress(jm.job, Failed)
+jm.job.Status = Failed
} else if !jm.killFlag {
log.Info("Some instance exited, close others")
jm.stop(false)
jm.killFlag = true
jm.scheduler.UpdateProgress(jm.job, Finished)
+jm.job.Status = Finished
}
if jm.resources[i].ClientID != "_released_" {
@@ -271,10 +276,12 @@ func (jm *JobManager) checkStatus(status []TaskStatus) {
jm.stop(false)
jm.killFlag = true
jm.scheduler.UpdateProgress(jm.job, Finished)
+jm.job.Status = Finished
}
if !flagRunning && !jm.killFlag {
jm.scheduler.UpdateProgress(jm.job, Finished)
+jm.job.Status = Finished
log.Info("finish job ", jm.job.Name)
}
@@ -320,7 +327,7 @@ func (jm *JobManager) logs(taskName string) MsgLog {
func (jm *JobManager) status() MsgJobStatus {
var tasksStatus []TaskStatus
for range jm.job.Tasks { //append would cause uncertain order
-tasksStatus = append(tasksStatus, TaskStatus{})
+tasksStatus = append(tasksStatus, TaskStatus{TimeStamp: time.Now().Unix()})
}
for i, task := range jm.job.Tasks {
@@ -415,6 +422,7 @@ func (jm *JobManager) stop(force bool) MsgStop {
if force {
jm.killFlag = true
jm.scheduler.UpdateProgress(jm.job, Stopped)
+jm.job.Status = Stopped
log.Info("kill job, ", jm.job.Name)
}
}()

View File

@@ -23,4 +23,5 @@ type TaskStatus struct {
UtilGPU int `json:"gpu_util"`
UtilMemGPU int `json:"gpu_mem_util"`
MemGPU int `json:"gpu_mem"`
+TimeStamp int64 `json:"timestamp"`
}
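For illustration, a sketch of how the new timestamp field serializes; only the fields visible in this hunk are reproduced, and the real TaskStatus struct has more.

```go
// Minimal sketch of the TaskStatus JSON shape with the added timestamp.
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type TaskStatus struct {
	UtilGPU    int   `json:"gpu_util"`
	UtilMemGPU int   `json:"gpu_mem_util"`
	MemGPU     int   `json:"gpu_mem"`
	TimeStamp  int64 `json:"timestamp"`
}

func main() {
	s := TaskStatus{UtilGPU: 37, TimeStamp: time.Now().Unix()}
	b, _ := json.Marshal(s)
	fmt.Println(string(b)) // {"gpu_util":37,"gpu_mem_util":0,"gpu_mem":0,"timestamp":...}
}
```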

View File

@@ -81,6 +81,28 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
w.Write(js)
break
+case "job_predict_time":
+log.Debug("job_predict_time")
+var job Job
+err := json.Unmarshal([]byte(string(r.PostFormValue("job"))), &job)
+msgJobReq := MsgOptimizerPredict{Code: 0}
+if err != nil {
+msgJobReq.Code = 1
+msgJobReq.Error = err.Error()
+} else {
+msg := InstanceOfOptimizer().PredictTime(job)
+msgJobReq.Pre = msg.Pre
+msgJobReq.Post = msg.Post
+msgJobReq.Total = msg.Total
+}
+js, err := json.Marshal(msgJobReq)
+if err != nil {
+log.Warn(err)
+}
+w.Header().Set("Content-Type", "application/json")
+w.Write(js)
+break
case "job_stop":
log.Debug("job_stop")
js, _ := json.Marshal(scheduler.Stop(string(r.PostFormValue("id"))))
@@ -248,69 +270,6 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
w.Write(js)
break
-case "debug_get_predicts":
-log.Debug("debug_get_predicts")
-js, _ := json.Marshal(InstanceOfOptimizer().getAllPredicts())
-w.Header().Set("Content-Type", "application/json")
-w.Write(js)
-break
-case "debug_get_gpu_utils":
-log.Debug("debug_get_gpu_utils")
-js, _ := json.Marshal(InstanceOfOptimizer().getAllGPUUtils())
-w.Header().Set("Content-Type", "application/json")
-w.Write(js)
-break
-case "debug_optimizer_feed_dl":
-log.Debug("debug_optimizer_feed_dl")
-var job string
-var seq int
-var value int
-job = r.URL.Query().Get("job")
-if t, err := strconv.Atoi(r.URL.Query().Get("seq")); err == nil {
-seq = t
-}
-if t, err := strconv.Atoi(r.URL.Query().Get("value")); err == nil {
-value = t
-}
-InstanceOfOptimizer().feedData(job, seq, 0, 0, 0, value)
-js, _ := json.Marshal(OptimizerJobExecutionTime{})
-w.Header().Set("Content-Type", "application/json")
-w.Write(js)
-break
-case "debug_optimizer_describe_job":
-log.Debug("debug_optimizer_describe_job")
-var job string
-job = r.URL.Query().Get("job")
-js, _ := json.Marshal(InstanceOfOptimizer().describe(job))
-w.Header().Set("Content-Type", "application/json")
-w.Write(js)
-break
-case "debug_optimizer_train_dl":
-log.Debug("debug_optimizer_train_dl")
-InstanceOfOptimizer().train(r.URL.Query().Get("job"))
-js, _ := json.Marshal(OptimizerJobExecutionTime{})
-w.Header().Set("Content-Type", "application/json")
-w.Write(js)
-break
-case "debug_get_predict_dl":
-log.Debug("debug_get_predict_dl")
-if seq, err := strconv.Atoi(r.URL.Query().Get("seq")); err == nil {
-est, _ := InstanceOfOptimizer().predict(r.URL.Query().Get("job"), seq)
-js, _ := json.Marshal(est)
-w.Header().Set("Content-Type", "application/json")
-w.Write(js)
-} else {
-js, _ := json.Marshal(OptimizerJobExecutionTime{})
-w.Header().Set("Content-Type", "application/json")
-w.Write(js)
-}
-break
case "allocator_update_strategy":
log.Debug("allocator_update_strategy")
strategy := r.URL.Query().Get("strategy")
@@ -389,7 +348,7 @@ func main() {
InstanceOfResourcePool().init(config)
InstanceOfCollector().init(config)
InstanceJobHistoryLogger().init(config)
-InstanceOfOptimizer().init(config)
+InstanceOfOptimizer().Init(config)
InstanceOfGroupManager().init(config)
switch config.SchedulerPolicy {
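For illustration, a hypothetical caller for the job_predict_time action added above. The handler only requires a form field named job carrying the job description as JSON; the scheduler address and the exact job payload shown here are assumptions.

```go
// Hypothetical client for ?action=job_predict_time; prints the raw reply
// (code/error plus the predicted pre/post/total times).
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
)

func main() {
	jobJSON := `{"name":"lstm-12345","tasks":[]}` // assumed minimal job payload

	resp, err := http.PostForm(
		"http://localhost:8080/?action=job_predict_time", // assumed scheduler address
		url.Values{"job": {jobJSON}},
	)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(string(body))
}
```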

View File

@@ -7,23 +7,11 @@ import (
"io/ioutil"
"strconv"
"encoding/json"
-"time"
"math"
"hash/fnv"
)
type Optimizer struct {
-scheduler Scheduler
-killedFlag bool
-predicts map[string]*OptimizerJobExecutionTime
-jobUtilsGPU map[string]*OptimizerUtilGPU
-cache map[string]*OptimizerJobExecutionTime
-stats map[string]map[string]float64
versions map[string]int
}
@@ -36,20 +24,228 @@ func InstanceOfOptimizer() *Optimizer {
if optimizerInstance == nil {
optimizerInstance = &Optimizer{}
-optimizerInstance.predicts = map[string]*OptimizerJobExecutionTime{}
-optimizerInstance.jobUtilsGPU = map[string]*OptimizerUtilGPU{}
-optimizerInstance.cache = map[string]*OptimizerJobExecutionTime{}
-optimizerInstance.stats = map[string]map[string]float64{}
optimizerInstance.versions = map[string]int{}
}
return optimizerInstance
}
-func (optimizer *Optimizer) init(conf Configuration) {
+func (optimizer *Optimizer) Init(conf Configuration) {
log.Info("optimizer started")
}
-func (optimizer *Optimizer) feedStats(job Job, role string, stats [][]TaskStatus) {
+func (optimizer *Optimizer) FeedTime(job Job, stats [][]TaskStatus) {
log.Info("optimizer feedTime", job)
if len(stats) == 0 || len(job.Tasks) != 1 {
return
}
go func() {
str := strings.Split(job.Name, "-")
if len(str) == 2 {
jobName := str[0]
var UtilGPUs []UtilGPUTimeSeries
for _, stat := range stats {
for _, task := range stat {
UtilGPUs = append(UtilGPUs, UtilGPUTimeSeries{Time: task.TimeStamp, Util: task.UtilGPU})
}
}
var preTime int64
for i := 0; i < len(UtilGPUs); i++ {
if UtilGPUs[i].Util > 15 {
preTime = UtilGPUs[i].Time - UtilGPUs[0].Time
break
}
}
var postTime int64
for i := len(UtilGPUs) - 1; i >= 0; i-- {
if UtilGPUs[i].Util > 15 {
postTime = UtilGPUs[len(UtilGPUs)-1].Time - UtilGPUs[i].Time
break
}
}
totalTime := UtilGPUs[len(UtilGPUs)-1].Time - UtilGPUs[0].Time
if preTime+postTime >= totalTime { /* in case GPU is not used */
preTime /= 2
postTime /= 2
}
tmp := map[string]float64{
"pre": float64(preTime),
"post": float64(postTime),
"total": float64(totalTime),
}
labels, _ := json.Marshal(tmp)
cmd := job.Tasks[0].Cmd
params := map[string]int{}
exceptions := map[string]bool{}
exceptions["train_dir"] = true
exceptions["variable_update"] = true
exceptions["ps_hosts"] = true
exceptions["worker_hosts"] = true
exceptions["task_index"] = true
exceptions["job_name"] = true
pairs := strings.Split(cmd, " ")
for _, pair := range pairs {
v := strings.Split(pair, "=")
if len(v) == 2 && v[0][:2] == "--" {
var param string
var value int
param = v[0][2:]
if val, err := strconv.Atoi(v[1]); err == nil {
value = val
} else {
h := fnv.New32a()
h.Write([]byte(v[1]))
value = int(h.Sum32())
}
if _, ok := exceptions[param]; !ok {
params[param] = value
}
}
}
params["cpu"] = job.Tasks[0].NumberCPU
params["mem"] = job.Tasks[0].Memory
params["gpu"] = job.Tasks[0].NumberGPU
params["gpu_mem"] = job.Tasks[0].MemoryGPU
//log.Info(job.Name, params)
features, _ := json.Marshal(params)
spider := Spider{}
spider.Method = "GET"
spider.URL = "http://yao-optimizer:8080/feed?job=" + jobName + ":time" + "&features=" + string(features) + "&labels=" + string(labels)
err := spider.do()
if err != nil {
log.Warn(err)
return
}
resp := spider.getResponse()
if _, err := ioutil.ReadAll(resp.Body); err != nil {
log.Warn(err)
}
resp.Body.Close()
if err != nil {
log.Warn(err)
return
}
if optimizer.versions[jobName]%3 == 0 {
optimizer.trainReq(jobName)
}
}
}()
}
func (optimizer *Optimizer) trainTime(jobName string) {
spider := Spider{}
spider.Method = "GET"
params := "job=" + jobName + ":time"
spider.URL = "http://yao-optimizer:8080/train?" + params
err := spider.do()
if err != nil {
return
}
resp := spider.getResponse()
if _, err := ioutil.ReadAll(resp.Body); err != nil {
log.Warn(err)
}
resp.Body.Close()
if err != nil {
return
}
}
func (optimizer *Optimizer) PredictTime(job Job) OptimizerJobExecutionTime {
res := OptimizerJobExecutionTime{Pre: 0, Post: 0, Total: math.MaxInt64}
var jobName string
str := strings.Split(job.Name, "-")
if len(str) == 2 {
jobName = str[0]
}
cmd := job.Tasks[0].Cmd
params := map[string]int{}
exceptions := map[string]bool{}
exceptions["train_dir"] = true
exceptions["variable_update"] = true
exceptions["ps_hosts"] = true
exceptions["worker_hosts"] = true
exceptions["task_index"] = true
exceptions["job_name"] = true
pairs := strings.Split(cmd, " ")
for _, pair := range pairs {
v := strings.Split(pair, "=")
if len(v) == 2 && v[0][:2] == "--" {
var param string
var value int
param = v[0][2:]
if val, err := strconv.Atoi(v[1]); err == nil {
value = val
} else {
h := fnv.New32a()
h.Write([]byte(v[1]))
value = int(h.Sum32())
}
if _, ok := exceptions[param]; !ok {
params[param] = value
}
}
}
params["cpu"] = job.Tasks[0].NumberCPU
params["mem"] = job.Tasks[0].Memory
params["gpu"] = job.Tasks[0].NumberGPU
params["gpu_mem"] = job.Tasks[0].MemoryGPU
//log.Info(job.Name, params)
features, _ := json.Marshal(params)
spider := Spider{}
spider.Method = "GET"
spider.URL = "http://yao-optimizer:8080/predict?job=" + jobName + ":time" + "&features=" + string(features)
err := spider.do()
if err != nil {
return res
}
resp := spider.getResponse()
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
log.Warn(err)
return res
}
var msg MsgJobReqPredict
err = json.Unmarshal([]byte(string(body)), &msg)
if err == nil && msg.Code == 0 {
tmp := msg.Labels
if v, ok := tmp["pre"]; ok {
res.Pre = int(math.Ceil(v / 100))
}
if v, ok := tmp["post"]; ok {
res.Post = int(math.Ceil(v/1024)) * 1024
}
if v, ok := tmp["total"]; ok {
res.Total = int(math.Ceil(v)/10) * 10
}
}
return res
}
+func (optimizer *Optimizer) FeedStats(job Job, role string, stats [][]TaskStatus) {
if len(stats) == 0 {
return
}
@@ -170,191 +366,15 @@ func (optimizer *Optimizer) feedStats(job Job, role string, stats [][]TaskStatus
optimizer.versions[jobName]++
if optimizer.versions[jobName]%3 == 0 {
-optimizer.train(jobName)
+optimizer.trainReq(jobName)
}
}()
}
-func (optimizer *Optimizer) max(values []float64) float64 {
+func (optimizer *Optimizer) trainReq(jobName string) {
-value := 0.0
-for _, v := range values {
-if v > value {
-value = v
-}
-}
-return value
-}
-func (optimizer *Optimizer) mean(values []float64) float64 {
-sum := 0.0
-for _, v := range values {
-sum += v
-}
-return sum / float64(len(values))
-}
-func (optimizer *Optimizer) std(values []float64) float64 {
-mean := optimizer.mean(values)
-std := 0.0
-for j := 0; j < len(values); j++ {
-// The use of Pow math function func Pow(x, y float64) float64
-std += math.Pow(values[j]-mean, 2)
-}
-// The use of Sqrt math function func Sqrt(x float64) float64
-std = math.Sqrt(std / float64(len(values)))
-return std
-}
-func (optimizer *Optimizer) describe(job string) map[string]float64 {
-if stat, ok := optimizer.stats[job]; ok {
-return stat
-}
-return map[string]float64{}
-}
-func (optimizer *Optimizer) feed3(job string, utils []UtilGPUTimeSeries) {
-log.Info("optimizer feed ", job)
-//log.Info(job, utils)
-if len(utils) == 0 {
-return
-}
-go func() {
-str := strings.Split(job, "-")
-if len(str) == 2 {
-jobName := str[0]
-sum := 0
-for i := 0; i < len(utils); i++ {
-sum += utils[i].Util
-}
-sum /= len(utils)
-if _, ok := optimizer.jobUtilsGPU[jobName]; !ok {
-optimizer.jobUtilsGPU[jobName] = &OptimizerUtilGPU{}
-}
-t := optimizer.jobUtilsGPU[jobName]
-t.Util = (t.Version*t.Util + sum) / (t.Version + 1)
-t.Version++
-preTime := 0
-for i := 0; i < len(utils); i++ {
-if utils[i].Util > 15 {
-preTime = utils[i].Time - utils[0].Time
-break
-}
-}
-postTime := 0
-for i := len(utils) - 1; i >= 0; i-- {
-if utils[i].Util > 15 {
-postTime = utils[len(utils)-1].Time - utils[i].Time
-break
-}
-}
-if _, ok := optimizer.predicts[jobName]; !ok {
-optimizer.predicts[jobName] = &OptimizerJobExecutionTime{}
-}
-totalTime := utils[len(utils)-1].Time - utils[0].Time
-predict := optimizer.predicts[jobName]
-if predict.Version == 0 {
-predict.Pre = preTime
-predict.Post = postTime
-predict.Total = totalTime
-predict.Main = predict.Total - predict.Pre - predict.Post
-if predict.Main < 0 {
-predict.Main = 0
-}
-}
-predict.Pre = (predict.Pre*95 + preTime*5) / 100
-predict.Post = (predict.Post*95 + postTime*5) / 100
-predict.Total = (predict.Total*95 + totalTime*5) / 100
-predict.Main = predict.Total - predict.Pre - predict.Post
-if predict.Main < 0 {
-predict.Main = 0
-}
-predict.Version++
-optimizer.feedData(jobName, predict.Version, 0, 0, 0, predict.Total)
-if predict.Version%10 == 0 && predict.Version > 30 {
-optimizer.train(jobName)
-}
-}
-}()
-}
-func (optimizer *Optimizer) predictUtilGPU(job string) (int, bool) {
-str := strings.Split(job, "-")
-if len(str) == 2 {
-jobName := str[0]
-if _, ok := optimizer.jobUtilsGPU[jobName]; ok {
-return optimizer.jobUtilsGPU[jobName].Util, optimizer.jobUtilsGPU[jobName].Version >= 5
-}
-}
-return 100, false
-}
-func (optimizer *Optimizer) predictTime(job string) (*OptimizerJobExecutionTime, bool) {
-str := strings.Split(job, "-")
-if len(str) == 2 {
-jobName := str[0]
-if est, ok := optimizer.cache[jobName]; ok && est.Version > (int)(time.Now().Unix())-300 {
-return est, true
-}
-if est, ok := optimizer.predicts[jobName]; ok {
-if est.Version > 40 {
-if est2, ok := optimizer.predict(jobName, est.Version); ok {
-est2.Pre = est.Pre * est2.Total / est.Total
-est2.Main = est.Main * est2.Total / est.Total
-est2.Post = est.Post * est2.Total / est.Total
-est2.Version = (int)(time.Now().Unix())
-optimizer.cache[jobName] = &est2
-return &est2, true
-}
-}
-return est, est.Version >= 5
-}
-}
-return &OptimizerJobExecutionTime{}, false
-}
-func (optimizer *Optimizer) getAllPredicts() map[string]*OptimizerJobExecutionTime {
-return optimizer.predicts
-}
-func (optimizer *Optimizer) getAllGPUUtils() map[string]*OptimizerUtilGPU {
-return optimizer.jobUtilsGPU
-}
-func (optimizer *Optimizer) feedData(job string, seq int, pre int, main int, post int, total int) {
spider := Spider{}
spider.Method = "GET"
-params := "job=" + job + "&seq=" + strconv.Itoa(seq) + "&value=" + strconv.Itoa(total)
+params := "job=" + jobName
-spider.URL = "http://yao-optimizer:8080/feed?" + params
-err := spider.do()
-if err != nil {
-log.Warn(err)
-return
-}
-resp := spider.getResponse()
-if _, err := ioutil.ReadAll(resp.Body); err != nil {
-log.Warn(err)
-}
-resp.Body.Close()
-if err != nil {
-log.Warn(err)
-return
-}
-}
-func (optimizer *Optimizer) train(job string) {
-spider := Spider{}
-spider.Method = "GET"
-params := "job=" + job
spider.URL = "http://yao-optimizer:8080/train?" + params
err := spider.do()
@@ -372,33 +392,6 @@ func (optimizer *Optimizer) train(job string) {
}
}
-func (optimizer *Optimizer) predict(job string, seq int) (OptimizerJobExecutionTime, bool) {
-spider := Spider{}
-spider.Method = "GET"
-params := "job=" + job + "&seq=" + strconv.Itoa(seq)
-spider.URL = "http://yao-optimizer:8080/predict?" + params
-err := spider.do()
-if err != nil {
-return OptimizerJobExecutionTime{}, false
-}
-resp := spider.getResponse()
-body, err := ioutil.ReadAll(resp.Body)
-resp.Body.Close()
-if err != nil {
-log.Warn(err)
-return OptimizerJobExecutionTime{}, false
-}
-var res MsgOptimizerPredict
-err = json.Unmarshal([]byte(string(body)), &res)
-if err == nil {
-return OptimizerJobExecutionTime{Total: res.Total, Pre: res.Pre, Main: res.Main, Post: res.Post}, true
-}
-return OptimizerJobExecutionTime{}, false
-}
func (optimizer *Optimizer) PredictReq(job Job, role string) MsgJobReq {
res := MsgJobReq{CPU: 4, Mem: 4096, UtilGPU: 100, MemGPU: 8192, BW: 0}
@@ -507,3 +500,33 @@ func (optimizer *Optimizer) PredictReq(job Job, role string) MsgJobReq {
}
return res
}
+func (optimizer *Optimizer) max(values []float64) float64 {
+value := 0.0
+for _, v := range values {
+if v > value {
+value = v
+}
+}
+return value
+}
+func (optimizer *Optimizer) mean(values []float64) float64 {
+sum := 0.0
+for _, v := range values {
+sum += v
+}
+return sum / float64(len(values))
+}
+func (optimizer *Optimizer) std(values []float64) float64 {
+mean := optimizer.mean(values)
+std := 0.0
+for j := 0; j < len(values); j++ {
+// The use of Pow math function func Pow(x, y float64) float64
+std += math.Pow(values[j]-mean, 2)
+}
+// The use of Sqrt math function func Sqrt(x float64) float64
+std = math.Sqrt(std / float64(len(values)))
+return std
+}

View File

@@ -336,7 +336,7 @@ func (pool *ResourcePool) update(node NodeStatus) {
if _, ok := pool.bindings[gpu.UUID]; ok {
if _, ok2 := pool.utils[gpu.UUID]; ok2 {
pool.utils[gpu.UUID] = append(pool.utils[gpu.UUID],
-UtilGPUTimeSeries{Time: (int)(time.Now().Unix()), Util: gpu.UtilizationGPU})
+UtilGPUTimeSeries{Time: time.Now().Unix(), Util: gpu.UtilizationGPU})
}
}
@@ -743,58 +743,59 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
if pool.TotalGPU == 0 {
return []NodeStatus{}
}
-loadRatio := float64(pool.UsingGPU) / float64(pool.TotalGPU)
+//loadRatio := float64(pool.UsingGPU) / float64(pool.TotalGPU)
/* first, choose sharable GPUs */
+/*
if pool.enableShare && len(job.Tasks) == 1 && task.NumberGPU == 1 && loadRatio >= pool.enableShareRatio {
// check sharable
allocationType = 1
-if util, valid := InstanceOfOptimizer().predictUtilGPU(job.Name); valid {
+pred := InstanceOfOptimizer().PredictReq(job, "Worker")
for cur := start; ; {
if _, ok := locks[cur.ID]; !ok {
cur.Lock.Lock()
locks[cur.ID] = &cur.Lock
}
for _, node := range cur.Nodes {
var available []GPUStatus
for _, status := range node.Status {
if status.MemoryAllocated > 0 && status.MemoryTotal > task.MemoryGPU+status.MemoryAllocated {
if jobs, ok := pool.bindings[status.UUID]; ok {
-totalUtil := util
+totalUtil := pred.UtilGPU
for job := range jobs {
if utilT, ok := InstanceOfOptimizer().predictUtilGPU(job); ok {
totalUtil += utilT
} else {
totalUtil += 100
-}
-}
-if totalUtil < 100 {
-available = append(available, status)
-}
}
+if totalUtil < 100 {
+available = append(available, status)
+}
}
}
-if len(available) >= task.NumberGPU {
+}
-candidates = append(candidates, *node)
+if len(available) >= task.NumberGPU {
-if len(candidates) >= len(job.Tasks)*3+5 {
+candidates = append(candidates, *node)
-break
+if len(candidates) >= len(job.Tasks)*3+5 {
-}
+break
}
}
-if len(candidates) >= len(job.Tasks)*3+5 {
-break
-}
-if cur.ID > cur.Next.ID {
-break
-}
-cur = cur.Next
}
+if len(candidates) >= len(job.Tasks)*3+5 {
+break
+}
+if cur.ID > cur.Next.ID {
+break
+}
+cur = cur.Next
}
-//log.Info(candidates)
}
+*/
+//log.Info(candidates)
/* second round, find vacant gpu */
if len(candidates) == 0 {
@@ -831,10 +832,11 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
}
/* third round, find gpu to be released */
+/*
if len(candidates) == 0 && len(job.Tasks) == 1 && task.NumberGPU == 1 && pool.enablePreSchedule {
-estimate, valid := InstanceOfOptimizer().predictTime(job.Name)
+estimate := InstanceOfOptimizer().PredictTime(job)
-if loadRatio >= pool.enablePreScheduleRatio && valid {
+if loadRatio >= pool.enablePreScheduleRatio {
allocationType = 3
for cur := start; ; {
if _, ok := locks[cur.ID]; !ok {
@@ -850,13 +852,11 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
continue
}
for taskT, s := range tasks {
-est, valid2 := InstanceOfOptimizer().predictTime(taskT)
+est := InstanceOfOptimizer().PredictTime(taskT)
-if valid2 {
+now := (int)(time.Now().Unix())
-now := (int)(time.Now().Unix())
+log.Info(s, now, estimate, est)
-log.Info(s, now, estimate, est)
+if now-s > est.Total-est.Post-estimate.Pre-15 {
-if now-s > est.Total-est.Post-estimate.Pre-15 {
+available = append(available, status)
-available = append(available, status)
-}
}
}
}
@@ -879,6 +879,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
//log.Info(candidates)
}
}
+*/
if len(candidates) > 0 {
log.Info("allocationType is ", allocationType)

View File

@@ -40,8 +40,8 @@ type Task struct {
}
type UtilGPUTimeSeries struct {
-Time int `json:"time"`
+Time int64 `json:"time"`
Util int `json:"util"`
}
type OptimizerJobExecutionTime struct {
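For illustration, a standalone sketch of how FeedTime turns a GPU-utilization time series into the pre/post/total phase durations that are fed to the optimizer service: the span before utilization first exceeds 15% counts as the pre phase, the span after it last exceeds 15% counts as the post phase, and both are halved when the GPU never really ramps up. The sample series is invented.

```go
// Sketch of the pre/post/total derivation from a UtilGPUTimeSeries slice.
package main

import "fmt"

type UtilGPUTimeSeries struct {
	Time int64 `json:"time"`
	Util int   `json:"util"`
}

func phases(utils []UtilGPUTimeSeries) (pre, post, total int64) {
	if len(utils) == 0 {
		return
	}
	total = utils[len(utils)-1].Time - utils[0].Time
	for i := 0; i < len(utils); i++ {
		if utils[i].Util > 15 { // first sample where the GPU is clearly busy
			pre = utils[i].Time - utils[0].Time
			break
		}
	}
	for i := len(utils) - 1; i >= 0; i-- {
		if utils[i].Util > 15 { // last sample where the GPU is clearly busy
			post = utils[len(utils)-1].Time - utils[i].Time
			break
		}
	}
	if pre+post >= total { // GPU never really used; split the run evenly
		pre /= 2
		post /= 2
	}
	return
}

func main() {
	series := []UtilGPUTimeSeries{
		{Time: 0, Util: 0}, {Time: 10, Util: 0}, {Time: 20, Util: 80},
		{Time: 30, Util: 85}, {Time: 40, Util: 0},
	}
	fmt.Println(phases(series)) // 20 10 40
}
```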