1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-06 22:01:55 +00:00
This commit is contained in:
Newnius 2020-07-03 21:39:40 +08:00
parent 46962fe0f7
commit aac4eefaef
3 changed files with 52 additions and 44 deletions

View File

@ -5,18 +5,22 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"os" "os"
"strings" "strings"
"strconv"
) )
type Configuration struct { type Configuration struct {
KafkaBrokers []string `json:"KafkaBrokers"` KafkaBrokers []string `json:"KafkaBrokers"`
KafkaTopic string `json:"KafkaTopic"` KafkaTopic string `json:"KafkaTopic"`
SchedulerPolicy string `json:"SchedulerPolicy"` SchedulerPolicy string `json:"SchedulerPolicy"`
ListenAddr string `json:"ListenAddr"` ListenAddr string `json:"ListenAddr"`
HDFSAddress string `json:"HDFSAddress"` HDFSAddress string `json:"HDFSAddress"`
HDFSBaseDir string `json:"HDFSBaseDir"` HDFSBaseDir string `json:"HDFSBaseDir"`
DFSBaseDir string `json:"DFSBaseDir"` DFSBaseDir string `json:"DFSBaseDir"`
mock bool EnableShareRatio float64 `json:"EnableShareRatio"`
mu sync.Mutex EnablePreScheduleRatio float64 `json:"EnablePreScheduleRatio"`
mock bool
mu sync.Mutex
} }
var configurationInstance *Configuration var configurationInstance *Configuration
@ -35,12 +39,14 @@ func InstanceOfConfiguration() *Configuration {
"kafka-node2:9092", "kafka-node2:9092",
"kafka-node3:9092", "kafka-node3:9092",
}, },
KafkaTopic: "yao", KafkaTopic: "yao",
SchedulerPolicy: "fair", SchedulerPolicy: "fair",
ListenAddr: "0.0.0.0:8080", ListenAddr: "0.0.0.0:8080",
HDFSAddress: "", HDFSAddress: "",
HDFSBaseDir: "/user/root/", HDFSBaseDir: "/user/root/",
DFSBaseDir: "", DFSBaseDir: "",
EnableShareRatio: 1.5,
EnablePreScheduleRatio: 1.5,
} }
/* override conf value from env */ /* override conf value from env */
@ -68,9 +74,17 @@ func InstanceOfConfiguration() *Configuration {
if len(value) != 0 { if len(value) != 0 {
configurationInstance.HDFSBaseDir = value configurationInstance.HDFSBaseDir = value
} }
value = os.Getenv("DFSBaseDir") value = os.Getenv("EnableShareRatio")
if len(value) != 0 { if len(value) != 0 {
configurationInstance.DFSBaseDir = value if val, err := strconv.ParseFloat(value, 32); err == nil {
configurationInstance.EnableShareRatio = val
}
}
value = os.Getenv("EnablePreScheduleRatio")
if len(value) != 0 {
if val, err := strconv.ParseFloat(value, 32); err == nil {
configurationInstance.EnablePreScheduleRatio = val
}
} }
} }
return configurationInstance return configurationInstance
@ -92,6 +106,18 @@ func (config *Configuration) DisableMock() bool {
return true return true
} }
func (config *Configuration) SetShareRatio(ratio float64) bool {
config.EnableShareRatio = ratio
log.Info("enableShareRatio is updated to ", ratio)
return true
}
func (config *Configuration) SetPreScheduleRatio(ratio float64) bool {
config.EnablePreScheduleRatio = ratio
log.Info("enablePreScheduleRatio is updated to ", ratio)
return true
}
func (config *Configuration) Dump() map[string]interface{} { func (config *Configuration) Dump() map[string]interface{} {
res := map[string]interface{}{} res := map[string]interface{}{}
res["KafkaBrokers"] = config.KafkaBrokers res["KafkaBrokers"] = config.KafkaBrokers
@ -102,5 +128,7 @@ func (config *Configuration) Dump() map[string]interface{} {
res["HDFSAddress"] = config.HDFSAddress res["HDFSAddress"] = config.HDFSAddress
res["HDFSBaseDir"] = config.HDFSBaseDir res["HDFSBaseDir"] = config.HDFSBaseDir
res["DFSBaseDir"] = config.DFSBaseDir res["DFSBaseDir"] = config.DFSBaseDir
res["EnableShareRatio"] = config.EnableShareRatio
res["EnablePreScheduleRatio"] = config.EnablePreScheduleRatio
return res return res
} }

View File

@ -255,7 +255,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
if t, err := strconv.ParseFloat(r.URL.Query().Get("ratio"), 32); err == nil { if t, err := strconv.ParseFloat(r.URL.Query().Get("ratio"), 32); err == nil {
ratio = t ratio = t
} }
js, _ := json.Marshal(InstanceOfResourcePool().SetShareRatio(ratio)) js, _ := json.Marshal(InstanceOfConfiguration().SetShareRatio(ratio))
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.Write(js) w.Write(js)
break break
@ -266,7 +266,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
if t, err := strconv.ParseFloat(r.URL.Query().Get("ratio"), 32); err == nil { if t, err := strconv.ParseFloat(r.URL.Query().Get("ratio"), 32); err == nil {
ratio = t ratio = t
} }
js, _ := json.Marshal(InstanceOfResourcePool().SetPreScheduleRatio(ratio)) js, _ := json.Marshal(InstanceOfConfiguration().SetPreScheduleRatio(ratio))
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.Write(js) w.Write(js)
break break

View File

@ -57,11 +57,6 @@ type ResourcePool struct {
UsingGPU int UsingGPU int
UsingGPUMu sync.Mutex UsingGPUMu sync.Mutex
enableShare bool
enableShareRatio float64
enablePreSchedule bool
enablePreScheduleRatio float64
enableBatch bool enableBatch bool
batchJobs map[string]Job batchJobs map[string]Job
batchMu sync.Mutex batchMu sync.Mutex
@ -84,11 +79,6 @@ func (pool *ResourcePool) init(conf Configuration) {
pool.TotalCPU = 0 pool.TotalCPU = 0
pool.TotalMemory = 0 pool.TotalMemory = 0
pool.enableShare = true
pool.enableShareRatio = 0.75
pool.enablePreSchedule = true
pool.enablePreScheduleRatio = 0.95
pool.enableBatch = false pool.enableBatch = false
pool.batchAllocations = map[string][]NodeStatus{} pool.batchAllocations = map[string][]NodeStatus{}
pool.batchJobs = map[string]Job{} pool.batchJobs = map[string]Job{}
@ -734,6 +724,8 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
start = start.Next start = start.Next
} }
config := InstanceOfConfiguration()
locks := map[int]*sync.Mutex{} locks := map[int]*sync.Mutex{}
allocationType := 0 allocationType := 0
@ -747,7 +739,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
loadRatio := float64(pool.UsingGPU) / float64(pool.TotalGPU) loadRatio := float64(pool.UsingGPU) / float64(pool.TotalGPU)
/* first, choose sharable GPUs */ /* first, choose sharable GPUs */
if pool.enableShare && len(job.Tasks) == 1 && task.NumberGPU == 1 && loadRatio >= pool.enableShareRatio { if len(job.Tasks) == 1 && task.NumberGPU == 1 && loadRatio >= config.EnableShareRatio {
// check sharable // check sharable
allocationType = 1 allocationType = 1
pred := InstanceOfOptimizer().PredictReq(job, "Worker") pred := InstanceOfOptimizer().PredictReq(job, "Worker")
@ -859,11 +851,11 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
} }
/* third round, find gpu to be released */ /* third round, find gpu to be released */
if len(candidates) == 0 && len(job.Tasks) == 1 && task.NumberGPU == 1 && pool.enablePreSchedule { if len(candidates) == 0 && len(job.Tasks) == 1 && task.NumberGPU == 1 {
estimate := InstanceOfOptimizer().PredictTime(job) estimate := InstanceOfOptimizer().PredictTime(job)
log.Debug(estimate) log.Debug(estimate)
if loadRatio >= pool.enablePreScheduleRatio { if loadRatio >= config.EnablePreScheduleRatio {
allocationType = 3 allocationType = 3
availables := map[string][]GPUStatus{} availables := map[string][]GPUStatus{}
for cur := start; ; { for cur := start; ; {
@ -1116,18 +1108,6 @@ func (pool *ResourcePool) SetBatchInterval(interval int) bool {
return true return true
} }
func (pool *ResourcePool) SetShareRatio(ratio float64) bool {
pool.enableShareRatio = ratio
log.Info("enableShareRatio is updated to ", ratio)
return true
}
func (pool *ResourcePool) SetPreScheduleRatio(ratio float64) bool {
pool.enablePreScheduleRatio = ratio
log.Info("enablePreScheduleRatio is updated to ", ratio)
return true
}
func (pool *ResourcePool) DebugDump() map[string]interface{} { func (pool *ResourcePool) DebugDump() map[string]interface{} {
res := map[string]interface{}{} res := map[string]interface{}{}
res["batchJobs"] = pool.batchJobs res["batchJobs"] = pool.batchJobs