1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-07 14:21:55 +00:00
YAO-scheduler/src/optimizer.go

498 lines
12 KiB
Go
Raw Normal View History

2020-04-12 03:14:53 +00:00
package main
import (
log "github.com/sirupsen/logrus"
"sync"
2020-04-12 17:30:25 +00:00
"strings"
2020-05-02 12:55:46 +00:00
"io/ioutil"
"strconv"
"encoding/json"
2020-05-03 07:19:21 +00:00
"time"
2020-06-05 07:33:23 +00:00
"math"
2020-06-22 02:25:47 +00:00
"hash/fnv"
2020-04-12 03:14:53 +00:00
)
type Optimizer struct {
scheduler Scheduler
killedFlag bool
2020-04-12 17:30:25 +00:00
2020-04-30 07:50:05 +00:00
predicts map[string]*OptimizerJobExecutionTime
2020-04-30 05:08:08 +00:00
2020-04-30 08:45:43 +00:00
jobUtilsGPU map[string]*OptimizerUtilGPU
2020-04-30 09:52:52 +00:00
2020-05-01 06:54:29 +00:00
cache map[string]*OptimizerJobExecutionTime
2020-06-05 07:33:23 +00:00
stats map[string]map[string]float64
2020-06-24 14:47:27 +00:00
versions map[string]int
2020-04-12 03:14:53 +00:00
}
var optimizerInstance *Optimizer
var OptimizerInstanceLock sync.Mutex
func InstanceOfOptimizer() *Optimizer {
defer OptimizerInstanceLock.Unlock()
OptimizerInstanceLock.Lock()
if optimizerInstance == nil {
optimizerInstance = &Optimizer{}
2020-04-30 07:50:05 +00:00
optimizerInstance.predicts = map[string]*OptimizerJobExecutionTime{}
2020-04-30 08:45:43 +00:00
optimizerInstance.jobUtilsGPU = map[string]*OptimizerUtilGPU{}
2020-05-01 06:54:29 +00:00
optimizerInstance.cache = map[string]*OptimizerJobExecutionTime{}
2020-06-05 07:33:23 +00:00
optimizerInstance.stats = map[string]map[string]float64{}
2020-06-24 14:47:27 +00:00
optimizerInstance.versions = map[string]int{}
2020-04-12 03:14:53 +00:00
}
return optimizerInstance
}
2020-05-24 13:07:02 +00:00
// init logs that the optimizer has started. The conf argument is accepted but
// not used by this method.
func (optimizer *Optimizer) init(conf Configuration) {
	log.Info("optimizer started")
}
2020-06-22 02:25:47 +00:00
func (optimizer *Optimizer) feedStats(job Job, role string, stats [][]TaskStatus) {
2020-06-22 07:08:26 +00:00
if len(stats) == 0 {
return
}
str := strings.Split(job.Name, "-")
if len(str) == 1 {
return
}
jobName := str[0]
2020-06-05 07:35:56 +00:00
go func() {
2020-06-22 07:08:26 +00:00
2020-06-05 07:35:56 +00:00
var UtilsCPU []float64
var Mems []float64
var BwRxs []float64
var BwTxs []float64
2020-06-24 08:47:21 +00:00
var UtilGPUs []float64
var MemGPUs []float64
2020-06-22 07:08:26 +00:00
for _, stat := range stats {
for _, task := range stat {
UtilsCPU = append(UtilsCPU, task.UtilCPU)
Mems = append(Mems, task.Mem)
BwRxs = append(BwRxs, task.BwRX)
BwTxs = append(BwTxs, task.BWTx)
2020-06-24 08:47:21 +00:00
UtilGPUs = append(UtilGPUs, float64(task.UtilGPU))
MemGPUs = append(MemGPUs, float64(task.MemGPU))
2020-06-22 02:25:47 +00:00
}
}
2020-06-22 07:08:26 +00:00
tmp := map[string]float64{
2020-06-24 08:47:21 +00:00
"cpu": optimizer.max(UtilsCPU),
"cpu_std": optimizer.std(UtilsCPU),
"cpu_mean": optimizer.mean(UtilsCPU),
"mem": optimizer.max(Mems),
"bw_rx": optimizer.mean(BwRxs),
"bw_tx": optimizer.mean(BwTxs),
"gpu_util": optimizer.mean(UtilGPUs),
"gpu_util_std": optimizer.std(UtilGPUs),
"gpu_mem": optimizer.max(MemGPUs),
2020-06-22 07:08:26 +00:00
}
labels, _ := json.Marshal(tmp)
2020-06-22 02:25:47 +00:00
cmd := job.Tasks[0].Cmd
params := map[string]int{}
psNumber := 0
workerNumber := 0
for _, task := range job.Tasks {
if (role == "PS" && task.IsPS) || (role == "Worker" && !task.IsPS) {
params["num_gpus"] = task.NumberGPU
cmd = task.Cmd
}
if task.IsPS {
psNumber++
} else {
workerNumber++
}
}
params["ps_number"] = psNumber
params["worker_number"] = workerNumber
exceptions := map[string]bool{}
exceptions["train_dir"] = true
exceptions["variable_update"] = true
exceptions["ps_hosts"] = true
exceptions["worker_hosts"] = true
exceptions["task_index"] = true
pairs := strings.Split(cmd, " ")
for _, pair := range pairs {
v := strings.Split(pair, "=")
if len(v) == 2 && v[0][:2] == "--" {
var param string
var value int
param = v[0][2:]
if val, err := strconv.Atoi(v[1]); err == nil {
value = val
} else {
h := fnv.New32a()
h.Write([]byte(v[1]))
value = int(h.Sum32())
}
if _, ok := exceptions[param]; !ok {
params[param] = value
}
2020-06-05 07:35:56 +00:00
}
2020-06-05 07:33:23 +00:00
}
2020-06-22 07:08:26 +00:00
//log.Info(job.Name, params)
features, _ := json.Marshal(params)
spider := Spider{}
spider.Method = "GET"
2020-06-24 09:05:40 +00:00
spider.URL = "http://yao-optimizer:8080/feed?job=" + jobName + "&features=" + string(features) + "&labels=" + string(labels)
2020-06-22 07:08:26 +00:00
err := spider.do()
if err != nil {
log.Warn(err)
return
}
resp := spider.getResponse()
if _, err := ioutil.ReadAll(resp.Body); err != nil {
log.Warn(err)
}
resp.Body.Close()
if err != nil {
log.Warn(err)
return
}
2020-06-24 14:47:27 +00:00
optimizer.versions[jobName]++
if optimizer.versions[jobName]%5 == 0 {
optimizer.train(jobName)
}
2020-06-05 07:35:56 +00:00
}()
2020-06-05 07:33:23 +00:00
}
func (optimizer *Optimizer) max(values []float64) float64 {
value := 0.0
for _, v := range values {
2020-06-07 08:40:57 +00:00
if v > value {
2020-06-05 07:33:23 +00:00
value = v
}
}
return value
}
// mean returns the arithmetic mean of values, or 0 when values is empty.
//
// The empty case is handled explicitly because 0/0 would otherwise produce
// NaN, which encoding/json refuses to marshal.
func (optimizer *Optimizer) mean(values []float64) float64 {
	if len(values) == 0 {
		return 0
	}
	sum := 0.0
	for _, v := range values {
		sum += v
	}
	return sum / float64(len(values))
}
// std returns the population standard deviation of values (the divisor is
// len(values), not len(values)-1), or 0 when values is empty.
//
// The empty case is handled here so the result never depends on how mean
// treats empty input and is never NaN.
func (optimizer *Optimizer) std(values []float64) float64 {
	if len(values) == 0 {
		return 0
	}
	mean := optimizer.mean(values)
	sumSquares := 0.0
	for _, v := range values {
		d := v - mean
		sumSquares += d * d
	}
	return math.Sqrt(sumSquares / float64(len(values)))
}
// describe returns the statistics stored for job in optimizer.stats. When no
// entry exists it returns a new empty map, never nil. A found entry is
// returned as-is, not copied.
func (optimizer *Optimizer) describe(job string) map[string]float64 {
	stat, found := optimizer.stats[job]
	if !found {
		return map[string]float64{}
	}
	return stat
}
2020-06-24 11:24:18 +00:00
func (optimizer *Optimizer) feed3(job string, utils []UtilGPUTimeSeries) {
2020-06-22 02:25:47 +00:00
log.Info("optimizer feed ", job)
2020-05-05 07:44:48 +00:00
//log.Info(job, utils)
2020-04-12 17:30:25 +00:00
2020-04-30 05:13:38 +00:00
if len(utils) == 0 {
return
}
2020-04-12 17:30:25 +00:00
go func() {
str := strings.Split(job, "-")
if len(str) == 2 {
2020-04-30 06:50:21 +00:00
jobName := str[0]
2020-04-30 05:08:08 +00:00
sum := 0
for i := 0; i < len(utils); i++ {
2020-04-30 15:06:12 +00:00
sum += utils[i].Util
2020-04-30 05:08:08 +00:00
}
2020-04-30 09:11:16 +00:00
sum /= len(utils)
if _, ok := optimizer.jobUtilsGPU[jobName]; !ok {
optimizer.jobUtilsGPU[jobName] = &OptimizerUtilGPU{}
2020-04-30 05:08:08 +00:00
}
2020-04-30 09:11:16 +00:00
t := optimizer.jobUtilsGPU[jobName]
t.Util = (t.Version*t.Util + sum) / (t.Version + 1)
t.Version++
2020-04-30 05:08:08 +00:00
2020-04-30 15:06:12 +00:00
preTime := 0
2020-04-12 17:30:25 +00:00
for i := 0; i < len(utils); i++ {
2020-04-30 15:06:12 +00:00
if utils[i].Util > 15 {
preTime = utils[i].Time - utils[0].Time
2020-04-12 17:30:25 +00:00
break
}
}
2020-04-30 15:06:12 +00:00
postTime := 0
2020-04-30 05:08:08 +00:00
for i := len(utils) - 1; i >= 0; i-- {
2020-04-30 15:06:12 +00:00
if utils[i].Util > 15 {
postTime = utils[len(utils)-1].Time - utils[i].Time
2020-04-12 17:30:25 +00:00
break
}
}
2020-04-30 07:50:05 +00:00
if _, ok := optimizer.predicts[jobName]; !ok {
optimizer.predicts[jobName] = &OptimizerJobExecutionTime{}
2020-04-12 17:30:25 +00:00
}
2020-04-30 15:06:12 +00:00
totalTime := utils[len(utils)-1].Time - utils[0].Time
2020-04-30 14:19:24 +00:00
2020-04-30 07:50:05 +00:00
predict := optimizer.predicts[jobName]
2020-05-01 06:12:28 +00:00
if predict.Version == 0 {
predict.Pre = preTime
predict.Post = postTime
predict.Total = totalTime
predict.Main = predict.Total - predict.Pre - predict.Post
if predict.Main < 0 {
predict.Main = 0
}
}
predict.Pre = (predict.Pre*95 + preTime*5) / 100
predict.Post = (predict.Post*95 + postTime*5) / 100
predict.Total = (predict.Total*95 + totalTime*5) / 100
2020-04-30 05:08:08 +00:00
predict.Main = predict.Total - predict.Pre - predict.Post
2020-04-30 08:45:43 +00:00
if predict.Main < 0 {
predict.Main = 0
}
2020-04-12 17:30:25 +00:00
predict.Version++
2020-05-02 12:55:46 +00:00
optimizer.feedData(jobName, predict.Version, 0, 0, 0, predict.Total)
if predict.Version%10 == 0 && predict.Version > 30 {
optimizer.train(jobName)
}
2020-04-12 17:30:25 +00:00
}
}()
2020-04-12 03:14:53 +00:00
}
2020-04-30 05:08:08 +00:00
2020-04-30 06:04:40 +00:00
func (optimizer *Optimizer) predictUtilGPU(job string) (int, bool) {
2020-04-30 07:11:06 +00:00
str := strings.Split(job, "-")
if len(str) == 2 {
jobName := str[0]
2020-04-30 08:45:43 +00:00
if _, ok := optimizer.jobUtilsGPU[jobName]; ok {
return optimizer.jobUtilsGPU[jobName].Util, optimizer.jobUtilsGPU[jobName].Version >= 5
2020-04-30 07:11:06 +00:00
}
2020-04-30 05:08:08 +00:00
}
2020-04-30 07:11:06 +00:00
return 100, false
2020-04-30 05:08:08 +00:00
}
2020-04-30 07:50:05 +00:00
func (optimizer *Optimizer) predictTime(job string) (*OptimizerJobExecutionTime, bool) {
2020-04-30 07:11:06 +00:00
str := strings.Split(job, "-")
if len(str) == 2 {
jobName := str[0]
2020-05-03 07:19:21 +00:00
if est, ok := optimizer.cache[jobName]; ok && est.Version > (int)(time.Now().Unix())-300 {
2020-05-02 12:55:46 +00:00
return est, true
}
if est, ok := optimizer.predicts[jobName]; ok {
if est.Version > 40 {
if est2, ok := optimizer.predict(jobName, est.Version); ok {
est2.Pre = est.Pre * est2.Total / est.Total
est2.Main = est.Main * est2.Total / est.Total
est2.Post = est.Post * est2.Total / est.Total
2020-05-03 07:19:21 +00:00
est2.Version = (int)(time.Now().Unix())
2020-05-02 12:55:46 +00:00
optimizer.cache[jobName] = &est2
return &est2, true
}
}
return est, est.Version >= 5
2020-04-30 07:11:06 +00:00
}
2020-04-30 05:08:08 +00:00
}
2020-04-30 07:50:05 +00:00
return &OptimizerJobExecutionTime{}, false
2020-04-30 05:08:08 +00:00
}
2020-04-30 08:11:34 +00:00
// getAllPredicts returns the optimizer's predicts map itself, not a copy, so
// changes made through the returned map are visible to the optimizer.
func (optimizer *Optimizer) getAllPredicts() map[string]*OptimizerJobExecutionTime {
	return optimizer.predicts
}
2020-04-30 08:45:43 +00:00
func (optimizer *Optimizer) getAllGPUUtils() map[string]*OptimizerUtilGPU {
2020-04-30 08:11:34 +00:00
return optimizer.jobUtilsGPU
}
2020-05-02 12:55:46 +00:00
func (optimizer *Optimizer) feedData(job string, seq int, pre int, main int, post int, total int) {
spider := Spider{}
spider.Method = "GET"
params := "job=" + job + "&seq=" + strconv.Itoa(seq) + "&value=" + strconv.Itoa(total)
spider.URL = "http://yao-optimizer:8080/feed?" + params
err := spider.do()
if err != nil {
2020-05-05 07:44:48 +00:00
log.Warn(err)
2020-05-02 12:55:46 +00:00
return
}
resp := spider.getResponse()
if _, err := ioutil.ReadAll(resp.Body); err != nil {
log.Warn(err)
}
resp.Body.Close()
if err != nil {
2020-05-05 07:44:48 +00:00
log.Warn(err)
2020-05-02 12:55:46 +00:00
return
}
}
// train asks the yao-optimizer /train endpoint to train on the data collected
// for job. The response body is read in full and discarded.
//
// Request and read failures are logged as warnings and not returned, matching
// feedData.
func (optimizer *Optimizer) train(job string) {
	spider := Spider{}
	spider.Method = "GET"
	params := "job=" + job
	spider.URL = "http://yao-optimizer:8080/train?" + params
	if err := spider.do(); err != nil {
		log.Warn(err)
		return
	}

	resp := spider.getResponse()
	defer resp.Body.Close()
	if _, err := ioutil.ReadAll(resp.Body); err != nil {
		log.Warn(err)
	}
}
func (optimizer *Optimizer) predict(job string, seq int) (OptimizerJobExecutionTime, bool) {
spider := Spider{}
spider.Method = "GET"
params := "job=" + job + "&seq=" + strconv.Itoa(seq)
spider.URL = "http://yao-optimizer:8080/predict?" + params
err := spider.do()
if err != nil {
return OptimizerJobExecutionTime{}, false
}
resp := spider.getResponse()
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
log.Warn(err)
return OptimizerJobExecutionTime{}, false
}
var res MsgOptimizerPredict
err = json.Unmarshal([]byte(string(body)), &res)
2020-05-02 13:24:58 +00:00
if err == nil {
2020-05-02 12:55:46 +00:00
return OptimizerJobExecutionTime{Total: res.Total, Pre: res.Pre, Main: res.Main, Post: res.Post}, true
}
return OptimizerJobExecutionTime{}, false
}
2020-06-21 05:14:10 +00:00
2020-06-24 14:47:27 +00:00
func (optimizer *Optimizer) PredictReq(job Job, role string) MsgJobReq {
2020-06-25 03:32:53 +00:00
res := MsgJobReq{CPU: 4, Mem: 4096, UtilGPU: 100, MemGPU: 8192, BW: 0}
2020-06-24 15:27:04 +00:00
var jobName string
str := strings.Split(job.Name, "-")
if len(str) == 2 {
jobName = str[0]
}
cmd := ""
2020-06-24 14:47:27 +00:00
params := map[string]int{}
2020-06-24 15:35:57 +00:00
log.Info(job)
log.Info(role)
2020-06-24 14:47:27 +00:00
psNumber := 0
workerNumber := 0
2020-06-25 03:32:53 +00:00
flag := false
2020-06-24 14:47:27 +00:00
for _, task := range job.Tasks {
if (role == "PS" && task.IsPS) || (role == "Worker" && !task.IsPS) {
params["num_gpus"] = task.NumberGPU
cmd = task.Cmd
2020-06-25 03:32:53 +00:00
flag = true
2020-06-24 14:47:27 +00:00
}
if task.IsPS {
psNumber++
} else {
workerNumber++
}
}
params["ps_number"] = psNumber
params["worker_number"] = workerNumber
2020-06-25 03:32:53 +00:00
if !flag {
return res
}
2020-06-24 14:47:27 +00:00
exceptions := map[string]bool{}
exceptions["train_dir"] = true
exceptions["variable_update"] = true
exceptions["ps_hosts"] = true
exceptions["worker_hosts"] = true
exceptions["task_index"] = true
pairs := strings.Split(cmd, " ")
for _, pair := range pairs {
v := strings.Split(pair, "=")
if len(v) == 2 && v[0][:2] == "--" {
var param string
var value int
param = v[0][2:]
if val, err := strconv.Atoi(v[1]); err == nil {
value = val
} else {
h := fnv.New32a()
h.Write([]byte(v[1]))
value = int(h.Sum32())
}
if _, ok := exceptions[param]; !ok {
params[param] = value
}
}
}
//log.Info(job.Name, params)
features, _ := json.Marshal(params)
spider := Spider{}
spider.Method = "GET"
2020-06-24 15:27:04 +00:00
spider.URL = "http://yao-optimizer:8080/predict?job=" + jobName + "&features=" + string(features)
2020-06-24 14:47:27 +00:00
err := spider.do()
if err != nil {
return MsgJobReq{Code: 2, Error: err.Error()}
}
resp := spider.getResponse()
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
log.Warn(err)
return MsgJobReq{Code: 3, Error: err.Error()}
}
2020-06-24 15:52:23 +00:00
var msg MsgJobReqPredict
err = json.Unmarshal([]byte(string(body)), &msg)
if err == nil && msg.Code == 0 {
2020-06-25 03:28:07 +00:00
tmp := msg.Labels
2020-06-24 14:47:27 +00:00
if v, ok := tmp["cpu"]; ok {
2020-06-25 03:32:53 +00:00
res.CPU = int(math.Ceil(v / 100))
2020-06-24 14:47:27 +00:00
}
if v, ok := tmp["mem"]; ok {
2020-06-25 03:32:53 +00:00
res.Mem = int(math.Ceil(v/1024)) * 1024
2020-06-24 14:47:27 +00:00
}
if v, ok := tmp["gpu_util"]; ok {
2020-06-25 03:32:53 +00:00
res.UtilGPU = int(math.Ceil(v)/10) * 10
2020-06-24 14:47:27 +00:00
}
if v, ok := tmp["gpu_mem"]; ok {
2020-06-25 03:32:53 +00:00
res.MemGPU = int(math.Ceil(v/1024)) * 1024
2020-06-24 14:47:27 +00:00
}
2020-06-24 15:27:04 +00:00
if v, ok := tmp["bw"]; ok {
2020-06-25 03:32:53 +00:00
res.BW = int(math.Ceil(v/10)) * 10
2020-06-24 15:27:04 +00:00
}
2020-06-24 14:47:27 +00:00
}
2020-06-25 03:32:53 +00:00
return res
2020-06-21 05:14:10 +00:00
}