1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-07 14:21:55 +00:00
This commit is contained in:
Newnius 2020-06-22 10:25:47 +08:00
parent 4c657e698e
commit ee4cff5d55
2 changed files with 71 additions and 9 deletions

View File

@ -137,7 +137,20 @@ func (jm *JobManager) start() {
} }
/* make sure resources are released */ /* make sure resources are released */
InstanceOfOptimizer().feedStats(jm.job.Name, jm.stats) var stats [][]TaskStatus
for i, task := range jm.job.Tasks {
if task.IsPS {
stats = append(stats, jm.stats[i])
}
}
InstanceOfOptimizer().feedStats(jm.job, "PS", stats)
stats = [][]TaskStatus{}
for i, task := range jm.job.Tasks {
if !task.IsPS {
stats = append(stats, jm.stats[i])
}
}
InstanceOfOptimizer().feedStats(jm.job, "Worker", stats)
jm.returnResource(jm.status().Status) jm.returnResource(jm.status().Status)
log.Info("JobMaster exited ", jm.job.Name) log.Info("JobMaster exited ", jm.job.Name)
} }

View File

@ -9,6 +9,7 @@ import (
"encoding/json" "encoding/json"
"time" "time"
"math" "math"
"hash/fnv"
) )
type Optimizer struct { type Optimizer struct {
@ -45,13 +46,13 @@ func (optimizer *Optimizer) init(conf Configuration) {
log.Info("optimizer started") log.Info("optimizer started")
} }
func (optimizer *Optimizer) feedStats(job string, stats [][]TaskStatus) { func (optimizer *Optimizer) feedStats(job Job, role string, stats [][]TaskStatus) {
go func() { go func() {
var UtilsCPU []float64 var UtilsCPU []float64
var Mems []float64 var Mems []float64
var BwRxs []float64 var BwRxs []float64
var BwTxs []float64 var BwTxs []float64
str := strings.Split(job, "-") str := strings.Split(job.Name, "-")
if len(str) == 2 { if len(str) == 2 {
jobName := str[0] jobName := str[0]
for _, stat := range stats { for _, stat := range stats {
@ -63,13 +64,61 @@ func (optimizer *Optimizer) feedStats(job string, stats [][]TaskStatus) {
} }
} }
optimizer.stats[jobName] = map[string]float64{ optimizer.stats[jobName] = map[string]float64{
"cpu": optimizer.mean(UtilsCPU), "cpu": optimizer.max(UtilsCPU),
"cpu_std": optimizer.std(UtilsCPU), "cpu_std": optimizer.std(UtilsCPU),
"cpu_mean": optimizer.mean(UtilsCPU),
"mem": optimizer.max(Mems), "mem": optimizer.max(Mems),
"bw_rx": optimizer.mean(BwRxs), "bw_rx": optimizer.mean(BwRxs),
"bw_tx": optimizer.mean(BwTxs), "bw_tx": optimizer.mean(BwTxs),
} }
} }
cmd := job.Tasks[0].Cmd
params := map[string]int{}
psNumber := 0
workerNumber := 0
for _, task := range job.Tasks {
if (role == "PS" && task.IsPS) || (role == "Worker" && !task.IsPS) {
params["num_gpus"] = task.NumberGPU
cmd = task.Cmd
}
if task.IsPS {
psNumber++
} else {
workerNumber++
}
}
params["ps_number"] = psNumber
params["worker_number"] = workerNumber
exceptions := map[string]bool{}
exceptions["train_dir"] = true
exceptions["variable_update"] = true
exceptions["ps_hosts"] = true
exceptions["worker_hosts"] = true
exceptions["task_index"] = true
pairs := strings.Split(cmd, " ")
for _, pair := range pairs {
v := strings.Split(pair, "=")
if len(v) == 2 && v[0][:2] == "--" {
var param string
var value int
param = v[0][2:]
if val, err := strconv.Atoi(v[1]); err == nil {
value = val
} else {
h := fnv.New32a()
h.Write([]byte(v[1]))
value = int(h.Sum32())
}
if _, ok := exceptions[param]; !ok {
params[param] = value
}
}
}
log.Info(job.Name, params)
}() }()
} }
@ -111,7 +160,7 @@ func (optimizer *Optimizer) describe(job string) map[string]float64 {
} }
func (optimizer *Optimizer) feed(job string, utils []UtilGPUTimeSeries) { func (optimizer *Optimizer) feed(job string, utils []UtilGPUTimeSeries) {
log.Info("optimizer feed") log.Info("optimizer feed ", job)
//log.Info(job, utils) //log.Info(job, utils)
if len(utils) == 0 { if len(utils) == 0 {