1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-07 14:21:55 +00:00

update preempt

This commit is contained in:
Newnius 2020-06-05 15:33:23 +08:00
parent 0d67d4558e
commit 6f0a9617e4
5 changed files with 85 additions and 0 deletions

View File

@ -86,3 +86,8 @@ GPU is occupied by which job(s)
```
?action=debug_scheduler_dump
```
**DescribeJob**
```
?action=debug_optimizer_describe_job&job=
```

View File

@ -22,6 +22,8 @@ type JobManager struct {
killFlag bool
network string
stats [][]TaskStatus
}
func (jm *JobManager) start() {
@ -320,6 +322,8 @@ func (jm *JobManager) status() MsgJobStatus {
go func() {
jm.checkStatus(tasksStatus)
}()
jm.stats = append(jm.stats, tasksStatus)
}
return MsgJobStatus{Status: tasksStatus}
}

View File

@ -16,4 +16,8 @@ type TaskStatus struct {
FinishedAt string `json:"finished_at"`
Status string `json:"status"`
State map[string]interface{} `json:"state"`
UtilCPU float64 `json:"cpu"`
Mem float64 `json:"mem"`
BwRX float64 `json:"bw_rx"`
BWTx float64 `json:"bw_tx"`
}

View File

@ -256,6 +256,15 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
w.Write(js)
break
case "debug_optimizer_describe_job":
log.Debug("debug_optimizer_describe_job")
var job string
job = r.URL.Query().Get("job")
js, _ := json.Marshal(InstanceOfOptimizer().describe(job))
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "debug_optimizer_train_dl":
log.Debug("debug_optimizer_train_dl")
InstanceOfOptimizer().train(r.URL.Query().Get("job"))

View File

@ -8,6 +8,7 @@ import (
"strconv"
"encoding/json"
"time"
"math"
)
type Optimizer struct {
@ -19,6 +20,8 @@ type Optimizer struct {
jobUtilsGPU map[string]*OptimizerUtilGPU
cache map[string]*OptimizerJobExecutionTime
stats map[string]map[string]float64
}
var optimizerInstance *Optimizer
@ -33,6 +36,7 @@ func InstanceOfOptimizer() *Optimizer {
optimizerInstance.predicts = map[string]*OptimizerJobExecutionTime{}
optimizerInstance.jobUtilsGPU = map[string]*OptimizerUtilGPU{}
optimizerInstance.cache = map[string]*OptimizerJobExecutionTime{}
optimizerInstance.stats = map[string]map[string]float64{}
}
return optimizerInstance
}
@ -41,6 +45,65 @@ func (optimizer *Optimizer) init(conf Configuration) {
log.Info("optimizer started")
}
func (optimizer *Optimizer) feedStats(job string, stats [][]TaskStatus) {
var UtilsCPU []float64
var Mems []float64
var BwRxs []float64
var BwTxs []float64
for _, stat := range stats {
for _, task := range stat {
UtilsCPU = append(UtilsCPU, task.UtilCPU)
Mems = append(Mems, task.Mem)
BwRxs = append(BwRxs, task.BwRX)
BwTxs = append(BwTxs, task.BWTx)
}
}
optimizer.stats[job] = map[string]float64{
"cpu": optimizer.mean(UtilsCPU),
"cpu_std": optimizer.std(UtilsCPU),
"mem": optimizer.max(Mems),
"bw_rx": optimizer.mean(BwRxs),
"bw_tx": optimizer.mean(BwTxs),
}
}
func (optimizer *Optimizer) max(values []float64) float64 {
value := 0.0
for _, v := range values {
if v < value {
value = v
}
}
return value
}
func (optimizer *Optimizer) mean(values []float64) float64 {
sum := 0.0
for _, v := range values {
sum += v
}
return sum / float64(len(values))
}
func (optimizer *Optimizer) std(values []float64) float64 {
mean := optimizer.mean(values)
std := 0.0
for j := 0; j < len(values); j++ {
// The use of Pow math function func Pow(x, y float64) float64
std += math.Pow(values[j]-mean, 2)
}
// The use of Sqrt math function func Sqrt(x float64) float64
std = math.Sqrt(std / float64(len(values)))
return std
}
func (optimizer *Optimizer) describe(job string) map[string]float64 {
if stat, ok := optimizer.stats[job]; ok {
return stat
}
return map[string]float64{}
}
func (optimizer *Optimizer) feed(job string, utils []UtilGPUTimeSeries) {
log.Info("optimizer feed")
//log.Info(job, utils)