From aac4eefaefd6863f31b63452f1f6c2068b49d417 Mon Sep 17 00:00:00 2001 From: Newnius Date: Fri, 3 Jul 2020 21:39:40 +0800 Subject: [PATCH] update --- src/configuration.go | 62 ++++++++++++++++++++++++++++++++------------ src/main.go | 4 +-- src/resource_pool.go | 30 ++++----------------- 3 files changed, 52 insertions(+), 44 deletions(-) diff --git a/src/configuration.go b/src/configuration.go index 0a4234c..6012403 100644 --- a/src/configuration.go +++ b/src/configuration.go @@ -5,18 +5,22 @@ import ( log "github.com/sirupsen/logrus" "os" "strings" + "strconv" ) type Configuration struct { - KafkaBrokers []string `json:"KafkaBrokers"` - KafkaTopic string `json:"KafkaTopic"` - SchedulerPolicy string `json:"SchedulerPolicy"` - ListenAddr string `json:"ListenAddr"` - HDFSAddress string `json:"HDFSAddress"` - HDFSBaseDir string `json:"HDFSBaseDir"` - DFSBaseDir string `json:"DFSBaseDir"` - mock bool - mu sync.Mutex + KafkaBrokers []string `json:"KafkaBrokers"` + KafkaTopic string `json:"KafkaTopic"` + SchedulerPolicy string `json:"SchedulerPolicy"` + ListenAddr string `json:"ListenAddr"` + HDFSAddress string `json:"HDFSAddress"` + HDFSBaseDir string `json:"HDFSBaseDir"` + DFSBaseDir string `json:"DFSBaseDir"` + EnableShareRatio float64 `json:"EnableShareRatio"` + EnablePreScheduleRatio float64 `json:"EnablePreScheduleRatio"` + + mock bool + mu sync.Mutex } var configurationInstance *Configuration @@ -35,12 +39,14 @@ func InstanceOfConfiguration() *Configuration { "kafka-node2:9092", "kafka-node3:9092", }, - KafkaTopic: "yao", - SchedulerPolicy: "fair", - ListenAddr: "0.0.0.0:8080", - HDFSAddress: "", - HDFSBaseDir: "/user/root/", - DFSBaseDir: "", + KafkaTopic: "yao", + SchedulerPolicy: "fair", + ListenAddr: "0.0.0.0:8080", + HDFSAddress: "", + HDFSBaseDir: "/user/root/", + DFSBaseDir: "", + EnableShareRatio: 1.5, + EnablePreScheduleRatio: 1.5, } /* override conf value from env */ @@ -68,9 +74,17 @@ func InstanceOfConfiguration() *Configuration { if len(value) != 0 { configurationInstance.HDFSBaseDir = value } - value = os.Getenv("DFSBaseDir") + value = os.Getenv("EnableShareRatio") if len(value) != 0 { - configurationInstance.DFSBaseDir = value + if val, err := strconv.ParseFloat(value, 32); err == nil { + configurationInstance.EnableShareRatio = val + } + } + value = os.Getenv("EnablePreScheduleRatio") + if len(value) != 0 { + if val, err := strconv.ParseFloat(value, 32); err == nil { + configurationInstance.EnablePreScheduleRatio = val + } } } return configurationInstance @@ -92,6 +106,18 @@ func (config *Configuration) DisableMock() bool { return true } +func (config *Configuration) SetShareRatio(ratio float64) bool { + config.EnableShareRatio = ratio + log.Info("enableShareRatio is updated to ", ratio) + return true +} + +func (config *Configuration) SetPreScheduleRatio(ratio float64) bool { + config.EnablePreScheduleRatio = ratio + log.Info("enablePreScheduleRatio is updated to ", ratio) + return true +} + func (config *Configuration) Dump() map[string]interface{} { res := map[string]interface{}{} res["KafkaBrokers"] = config.KafkaBrokers @@ -102,5 +128,7 @@ func (config *Configuration) Dump() map[string]interface{} { res["HDFSAddress"] = config.HDFSAddress res["HDFSBaseDir"] = config.HDFSBaseDir res["DFSBaseDir"] = config.DFSBaseDir + res["EnableShareRatio"] = config.EnableShareRatio + res["EnablePreScheduleRatio"] = config.EnablePreScheduleRatio return res } diff --git a/src/main.go b/src/main.go index 18d8fd5..811607f 100644 --- a/src/main.go +++ b/src/main.go @@ -255,7 +255,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { if t, err := strconv.ParseFloat(r.URL.Query().Get("ratio"), 32); err == nil { ratio = t } - js, _ := json.Marshal(InstanceOfResourcePool().SetShareRatio(ratio)) + js, _ := json.Marshal(InstanceOfConfiguration().SetShareRatio(ratio)) w.Header().Set("Content-Type", "application/json") w.Write(js) break @@ -266,7 +266,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { if t, err := strconv.ParseFloat(r.URL.Query().Get("ratio"), 32); err == nil { ratio = t } - js, _ := json.Marshal(InstanceOfResourcePool().SetPreScheduleRatio(ratio)) + js, _ := json.Marshal(InstanceOfConfiguration().SetPreScheduleRatio(ratio)) w.Header().Set("Content-Type", "application/json") w.Write(js) break diff --git a/src/resource_pool.go b/src/resource_pool.go index 1281495..13a3f94 100644 --- a/src/resource_pool.go +++ b/src/resource_pool.go @@ -57,11 +57,6 @@ type ResourcePool struct { UsingGPU int UsingGPUMu sync.Mutex - enableShare bool - enableShareRatio float64 - enablePreSchedule bool - enablePreScheduleRatio float64 - enableBatch bool batchJobs map[string]Job batchMu sync.Mutex @@ -84,11 +79,6 @@ func (pool *ResourcePool) init(conf Configuration) { pool.TotalCPU = 0 pool.TotalMemory = 0 - pool.enableShare = true - pool.enableShareRatio = 0.75 - pool.enablePreSchedule = true - pool.enablePreScheduleRatio = 0.95 - pool.enableBatch = false pool.batchAllocations = map[string][]NodeStatus{} pool.batchJobs = map[string]Job{} @@ -734,6 +724,8 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus { start = start.Next } + config := InstanceOfConfiguration() + locks := map[int]*sync.Mutex{} allocationType := 0 @@ -747,7 +739,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus { loadRatio := float64(pool.UsingGPU) / float64(pool.TotalGPU) /* first, choose sharable GPUs */ - if pool.enableShare && len(job.Tasks) == 1 && task.NumberGPU == 1 && loadRatio >= pool.enableShareRatio { + if len(job.Tasks) == 1 && task.NumberGPU == 1 && loadRatio >= config.EnableShareRatio { // check sharable allocationType = 1 pred := InstanceOfOptimizer().PredictReq(job, "Worker") @@ -859,11 +851,11 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus { } /* third round, find gpu to be released */ - if len(candidates) == 0 && len(job.Tasks) == 1 && task.NumberGPU == 1 && pool.enablePreSchedule { + if len(candidates) == 0 && len(job.Tasks) == 1 && task.NumberGPU == 1 { estimate := InstanceOfOptimizer().PredictTime(job) log.Debug(estimate) - if loadRatio >= pool.enablePreScheduleRatio { + if loadRatio >= config.EnablePreScheduleRatio { allocationType = 3 availables := map[string][]GPUStatus{} for cur := start; ; { @@ -1116,18 +1108,6 @@ func (pool *ResourcePool) SetBatchInterval(interval int) bool { return true } -func (pool *ResourcePool) SetShareRatio(ratio float64) bool { - pool.enableShareRatio = ratio - log.Info("enableShareRatio is updated to ", ratio) - return true -} - -func (pool *ResourcePool) SetPreScheduleRatio(ratio float64) bool { - pool.enablePreScheduleRatio = ratio - log.Info("enablePreScheduleRatio is updated to ", ratio) - return true -} - func (pool *ResourcePool) DebugDump() map[string]interface{} { res := map[string]interface{}{} res["batchJobs"] = pool.batchJobs