mirror of
https://github.com/newnius/YAO-scheduler.git
synced 2025-06-07 14:21:55 +00:00
update
This commit is contained in:
parent
2828b4f387
commit
81d5f410cb
src/main.go (17 changed lines)
@@ -63,9 +63,20 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
 	case "job_predict_req":
 		log.Debug("job_predict_req")
-		jobName := r.URL.Query().Get("name")
-		cmd := r.URL.Query().Get("cmd")
-		js, _ := json.Marshal(InstanceOfOptimizer().PredictReq(jobName, cmd))
+		var job Job
+		role := r.URL.Query().Get("role")
+		err := json.Unmarshal([]byte(string(r.PostFormValue("job"))), &job)
+		msgJobReq := MsgJobReq{Code: 0}
+		if err != nil {
+			msgJobReq.Code = 1
+			msgJobReq.Error = err.Error()
+		} else {
+			msgJobReq = InstanceOfOptimizer().PredictReq(job, role)
+		}
+		js, err := json.Marshal(msgJobReq)
+		if err != nil {
+			log.Warn(err)
+		}
 		w.Header().Set("Content-Type", "application/json")
 		w.Write(js)
 		break
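Note on the hunk above: the handler now expects the full job description as a form-encoded "job" field in the POST body and the target role as a "role" query parameter, instead of a job name and command in the query string. A minimal client sketch follows, assuming the scheduler listens on :8080 and dispatches on an "action" query parameter (the exact route is not visible in this hunk, and the Job JSON field names are placeholders):

// Hypothetical client for the updated job_predict_req API.
// The host, port, "?action=" dispatch parameter, and the Job JSON field
// names are assumptions; only the "role" query parameter and the "job"
// form field appear in the diff above.
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
)

func main() {
	jobJSON := `{"name":"demo","tasks":[{"is_ps":false,"cmd":"python train.py --batch_size=64","number_gpu":1}]}`

	form := url.Values{}
	form.Set("job", jobJSON)

	resp, err := http.PostForm("http://localhost:8080/?action=job_predict_req&role=Worker", form)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(string(body)) // MsgJobReq JSON: code, error, cpu, mem, gpu_util, gpu_mem, bw
}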
@@ -83,11 +83,11 @@ type MsgOptimizerPredict struct
 }
 
 type MsgJobReq struct {
 	Code int `json:"code"`
 	Error string `json:"error"`
 	CPU int `json:"cpu"`
 	Mem int `json:"mem"`
-	GPU int `json:"gpu"`
-	MemGPU int `json:"cpu"`
+	UtilGPU int `json:"gpu_util"`
+	MemGPU int `json:"gpu_mem"`
 	BW int `json:"bw"`
 }
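The MsgJobReq change above is more than a rename: in the old struct both CPU and MemGPU carried the tag `json:"cpu"`, and encoding/json drops all same-level fields whose JSON names collide, so neither value would have appeared in the response. A small self-contained sketch (struct names are illustrative) of that behavior:

package main

import (
	"encoding/json"
	"fmt"
)

// Old shape: two fields share the "cpu" tag, so encoding/json omits both.
type oldReq struct {
	CPU    int `json:"cpu"`
	MemGPU int `json:"cpu"`
}

// Fixed shape: distinct tags, both fields are emitted.
type newReq struct {
	CPU    int `json:"cpu"`
	MemGPU int `json:"gpu_mem"`
}

func main() {
	a, _ := json.Marshal(oldReq{CPU: 4, MemGPU: 8192})
	b, _ := json.Marshal(newReq{CPU: 4, MemGPU: 8192})
	fmt.Println(string(a)) // {} -- conflicting names are silently dropped
	fmt.Println(string(b)) // {"cpu":4,"gpu_mem":8192}
}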
@@ -23,6 +23,8 @@ type Optimizer struct {
 	cache map[string]*OptimizerJobExecutionTime
 
 	stats map[string]map[string]float64
+
+	versions map[string]int
 }
 
 var optimizerInstance *Optimizer
@@ -38,6 +40,7 @@ func InstanceOfOptimizer() *Optimizer {
 		optimizerInstance.jobUtilsGPU = map[string]*OptimizerUtilGPU{}
 		optimizerInstance.cache = map[string]*OptimizerJobExecutionTime{}
 		optimizerInstance.stats = map[string]map[string]float64{}
+		optimizerInstance.versions = map[string]int{}
 	}
 	return optimizerInstance
 }
@@ -155,6 +158,11 @@ func (optimizer *Optimizer) feedStats(job Job, role string, stats [][]TaskStatus
 			log.Warn(err)
 			return
 		}
+
+		optimizer.versions[jobName]++
+		if optimizer.versions[jobName]%5 == 0 {
+			optimizer.train(jobName)
+		}
 	}()
 }
 
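The versions map added to Optimizer, initialized in InstanceOfOptimizer, and incremented here gives feedStats a simple throttle: a job's model is retrained only on every fifth stats feed. The map must be allocated up front, since `optimizer.versions[jobName]++` on a nil map panics with "assignment to entry in nil map". A standalone sketch of the same counter-based trigger (names are illustrative):

package main

import "fmt"

// versionedTrigger fires an action on every Nth call per key,
// mirroring the versions-map throttle used in feedStats.
type versionedTrigger struct {
	versions map[string]int
	every    int
}

func newVersionedTrigger(every int) *versionedTrigger {
	// Allocate the map up front; incrementing a key in a nil map panics.
	return &versionedTrigger{versions: map[string]int{}, every: every}
}

func (t *versionedTrigger) feed(key string, action func()) {
	t.versions[key]++
	if t.versions[key]%t.every == 0 {
		action()
	}
}

func main() {
	trigger := newVersionedTrigger(5)
	for i := 0; i < 12; i++ {
		n := i + 1
		trigger.feed("jobA", func() { fmt.Println("retrain jobA at feed", n) })
	}
	// Prints at feeds 5 and 10.
}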
@@ -382,6 +390,90 @@ func (optimizer *Optimizer) predict(job string, seq int) (OptimizerJobExecutionT
 	return OptimizerJobExecutionTime{}, false
 }
 
-func (optimizer *Optimizer) PredictReq(jobName string, cmd string) MsgJobReq {
-	return MsgJobReq{CPU: 4, Mem: 4096, GPU: 1, MemGPU: 8192, BW: 150}
+func (optimizer *Optimizer) PredictReq(job Job, role string) MsgJobReq {
+	cmd := job.Tasks[0].Cmd
+	params := map[string]int{}
+
+	psNumber := 0
+	workerNumber := 0
+	for _, task := range job.Tasks {
+		if (role == "PS" && task.IsPS) || (role == "Worker" && !task.IsPS) {
+			params["num_gpus"] = task.NumberGPU
+			cmd = task.Cmd
+		}
+		if task.IsPS {
+			psNumber++
+		} else {
+			workerNumber++
+		}
+	}
+	params["ps_number"] = psNumber
+	params["worker_number"] = workerNumber
+
+	exceptions := map[string]bool{}
+	exceptions["train_dir"] = true
+	exceptions["variable_update"] = true
+	exceptions["ps_hosts"] = true
+	exceptions["worker_hosts"] = true
+	exceptions["task_index"] = true
+
+	pairs := strings.Split(cmd, " ")
+	for _, pair := range pairs {
+		v := strings.Split(pair, "=")
+		if len(v) == 2 && v[0][:2] == "--" {
+			var param string
+			var value int
+			param = v[0][2:]
+
+			if val, err := strconv.Atoi(v[1]); err == nil {
+				value = val
+			} else {
+				h := fnv.New32a()
+				h.Write([]byte(v[1]))
+				value = int(h.Sum32())
+			}
+			if _, ok := exceptions[param]; !ok {
+				params[param] = value
+			}
+		}
+	}
+	//log.Info(job.Name, params)
+
+	features, _ := json.Marshal(params)
+
+	spider := Spider{}
+	spider.Method = "GET"
+	spider.URL = "http://yao-optimizer:8080/predict?job=" + job.Name + "&features=" + string(features)
+
+	err := spider.do()
+	if err != nil {
+		return MsgJobReq{Code: 2, Error: err.Error()}
+	}
+
+	resp := spider.getResponse()
+	body, err := ioutil.ReadAll(resp.Body)
+	resp.Body.Close()
+	if err != nil {
+		log.Warn(err)
+		return MsgJobReq{Code: 3, Error: err.Error()}
+	}
+
+	req := MsgJobReq{CPU: 4, Mem: 4096, UtilGPU: 100, MemGPU: 8192, BW: 0}
+	var tmp map[string]float64
+	err = json.Unmarshal([]byte(string(body)), &tmp)
+	if err == nil {
+		if v, ok := tmp["cpu"]; ok {
+			req.CPU = int(math.Ceil(v / 100))
+		}
+		if v, ok := tmp["mem"]; ok {
+			req.Mem = int(math.Ceil(v/1024)) * 1024
+		}
+		if v, ok := tmp["gpu_util"]; ok {
+			req.UtilGPU = int(math.Ceil(v)/10) * 10
+		}
+		if v, ok := tmp["gpu_mem"]; ok {
+			req.MemGPU = int(math.Ceil(v/1024)) * 1024
+		}
+	}
+	return req
 }
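PredictReq now builds a numeric feature map from the selected task's command line before querying the yao-optimizer service: "--flag=value" pairs with integer values are used directly, non-numeric values are reduced to an FNV-32a hash so they can still act as categorical features, and infrastructure flags (ps_hosts, worker_hosts, task_index, ...) are skipped. One caveat: `v[0][:2]` assumes every "key=value" token is at least two characters long, so a stray short token would panic. A simplified sketch of the flag-to-feature conversion (without the exceptions list or the HTTP call):

package main

import (
	"fmt"
	"hash/fnv"
	"strconv"
	"strings"
)

// cmdToFeatures mirrors the flag parsing in PredictReq: numeric flag values
// are used directly, anything else is hashed with FNV-32a. strings.HasPrefix
// is used here to avoid the short-token panic noted above.
func cmdToFeatures(cmd string) map[string]int {
	params := map[string]int{}
	for _, pair := range strings.Split(cmd, " ") {
		v := strings.Split(pair, "=")
		if len(v) == 2 && strings.HasPrefix(v[0], "--") {
			param := v[0][2:]
			if val, err := strconv.Atoi(v[1]); err == nil {
				params[param] = val
			} else {
				h := fnv.New32a()
				h.Write([]byte(v[1]))
				params[param] = int(h.Sum32())
			}
		}
	}
	return params
}

func main() {
	cmd := "python train.py --batch_size=64 --model=resnet50"
	fmt.Println(cmdToFeatures(cmd))
	// batch_size stays 64; model becomes the FNV-32a hash of "resnet50".
}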