1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-07 14:21:55 +00:00
This commit is contained in:
Newnius 2020-06-14 21:12:22 +08:00
parent b979373cbd
commit 4a2bf436c7
6 changed files with 181 additions and 7 deletions

View File

@ -110,4 +110,14 @@ GPU is occupied by which job(s)
**PoolDump** **PoolDump**
``` ```
?action=debug_pool_dump ?action=debug_pool_dump
```
**EnableMock**
```
?action=debug_enable_mock
```
**DisableMock**
```
?action=debug_disable_mock
``` ```

43
src/configuration.go Normal file
View File

@ -0,0 +1,43 @@
package main
import (
"sync"
log "github.com/sirupsen/logrus"
)
type Configuration struct {
KafkaBrokers []string `json:"kafkaBrokers"`
KafkaTopic string `json:"kafkaTopic"`
SchedulerPolicy string `json:"schedulerPolicy"`
mock bool
mu sync.Mutex
}
var ConfigurationInstance *Configuration
var ConfigurationInstanceLock sync.Mutex
func InstanceOfConfiguration() *Configuration {
ConfigurationInstanceLock.Lock()
defer ConfigurationInstanceLock.Unlock()
if ConfigurationInstance == nil {
ConfigurationInstance = &Configuration{mock: false}
}
return ConfigurationInstance
}
func (config *Configuration) EnableMock() bool {
config.mu.Lock()
defer config.mu.Unlock()
config.mock = true
log.Info("configuration.mock = true")
return true
}
func (config *Configuration) DisableMock() bool {
config.mu.Lock()
defer config.mu.Unlock()
config.mock = false
log.Info("configuration.mock = false")
return true
}

View File

@ -52,6 +52,16 @@ func (jm *JobManager) start() {
time.Sleep(time.Millisecond * time.Duration(500+rand.Intn(500))) time.Sleep(time.Millisecond * time.Duration(500+rand.Intn(500)))
} }
if InstanceOfConfiguration().mock {
jm.isRunning = false
duration := InstanceOfMocker().GetDuration(jm.job, jm.resources)
log.Info("mock ", jm.job.Name, ", wait ", duration)
time.Sleep(time.Second * time.Duration(duration))
jm.returnResource([]TaskStatus{})
log.Info("JobMaster exited ", jm.job.Name)
return
}
if !jm.killFlag { if !jm.killFlag {
/* switch to Running state */ /* switch to Running state */
jm.scheduler.UpdateProgress(jm.job, Running) jm.scheduler.UpdateProgress(jm.job, Running)
@ -147,7 +157,9 @@ func (jm *JobManager) returnResource(status []TaskStatus) {
InstanceOfResourcePool().detach(t.UUID, jm.job) InstanceOfResourcePool().detach(t.UUID, jm.job)
} }
InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i]) if !InstanceOfConfiguration().mock {
InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i])
}
/* remove exited containers */ /* remove exited containers */
//v := url.Values{} //v := url.Values{}

View File

@ -327,6 +327,20 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
w.Write(js) w.Write(js)
break break
case "debug_enable_mock":
log.Debug("debug_enable_mock")
js, _ := json.Marshal(InstanceOfConfiguration().EnableMock())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "debug_disable_mock":
log.Debug("debug_disable_mock")
js, _ := json.Marshal(InstanceOfConfiguration().DisableMock())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
default: default:
http.Error(w, "Not Found", http.StatusNotFound) http.Error(w, "Not Found", http.StatusNotFound)
break break

101
src/mocker.go Normal file
View File

@ -0,0 +1,101 @@
package main
import (
"sync"
"strings"
)
type Mocker struct {
mu sync.Mutex
}
var MockerInstance *Mocker
var MockerInstanceLock sync.Mutex
func InstanceOfMocker() *Mocker {
MockerInstanceLock.Lock()
defer MockerInstanceLock.Unlock()
if MockerInstance == nil {
MockerInstance = &Mocker{}
}
return MockerInstance
}
func (mocker *Mocker) GetDuration(job Job, nodes []NodeStatus) int {
str := strings.Split(job.Name, "-")
duration := 300
mode := "unknown"
if len(job.Tasks) == 1 {
if job.Tasks[0].NumberGPU == 1 {
mode = "s1"
} else if job.Tasks[0].NumberGPU == 2 {
mode = "s2"
}
} else if len(job.Tasks) == 3 {
var psNodes []string
var workerNodes []string
for i, task := range job.Tasks {
if task.IsPS {
psNodes = append(psNodes, nodes[i].ClientHost)
} else {
workerNodes = append(workerNodes, nodes[i].ClientHost)
}
}
if psNodes[0] == workerNodes[0] {
if psNodes[0] == workerNodes[1] {
mode = "pww"
} else {
mode = "pw:w"
}
} else {
if psNodes[0] == workerNodes[1] {
mode = "pw:w"
} else if workerNodes[0] == workerNodes[1] {
mode = "p:ww"
} else {
mode = "p:w:w"
}
}
}
if len(str) == 2 {
jobName := str[0]
durations := map[string]map[string]int{
"vgg16": {
"s1": 220,
"s2": 227,
"pww": 510,
"pw:w": 767,
"p:ww": 1190,
"p:w:w": 810,
},
"resnet50": {
"s1": 146,
"s2": 164,
"pww": 203,
"pw:w": 204,
"p:ww": 255,
"p:w:w": 210,
},
"inception3": {
"s1": 253,
"s2": 257,
"pww": 289,
"pw:w": 295,
"p:ww": 310,
"p:w:w": 290,
},
}
if vals, ok := durations[jobName]; ok {
if val, ok2 := vals[mode]; ok2 {
return val
}
}
}
return duration
}

View File

@ -8,12 +8,6 @@ import (
"net/http" "net/http"
) )
type Configuration struct {
KafkaBrokers []string `json:"kafkaBrokers"`
KafkaTopic string `json:"kafkaTopic"`
SchedulerPolicy string `json:"schedulerPolicy"`
}
type Job struct { type Job struct {
ID int `json:"id"` ID int `json:"id"`
Name string `json:"name"` Name string `json:"name"`