From c447a85471f9d0f0ddb8e46c0a036c72449a1581 Mon Sep 17 00:00:00 2001
From: Newnius
Date: Thu, 30 Apr 2020 17:52:52 +0800
Subject: [PATCH] update

---
 src/main.go               | 23 +++++++
 src/optimizer.go          |  5 ++
 src/resource_pool.go      |  6 ++
 src/scheduler.go          |  4 ++
 src/scheduler_FCFS.go     | 12 ++++
 src/scheduler_fair.go     | 123 ++++++++++++++++++++++++++++----------
 src/scheduler_priority.go | 12 ++++
 7 files changed, 152 insertions(+), 33 deletions(-)

diff --git a/src/main.go b/src/main.go
index 173a795..7f75939 100644
--- a/src/main.go
+++ b/src/main.go
@@ -189,6 +189,29 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
 		w.Write(js)
 		break
 
+	case "debug_update_enable_share_ratio":
+		log.Debug("debug_update_enable_share_ratio")
+
+		ratio := 0.75
+		if t, err := strconv.ParseFloat(r.URL.Query().Get("ratio"), 32); err == nil {
+			ratio = t
+		}
+		js, _ := json.Marshal(scheduler.SetShareRatio(ratio))
+		w.Header().Set("Content-Type", "application/json")
+		w.Write(js)
+		break
+
+	case "debug_update_enable_pre_schedule_ratio":
+		log.Debug("debug_update_enable_pre_schedule_ratio")
+		ratio := 0.95
+		if t, err := strconv.ParseFloat(r.URL.Query().Get("ratio"), 32); err == nil {
+			ratio = t
+		}
+		js, _ := json.Marshal(scheduler.SetPreScheduleRatio(ratio))
+		w.Header().Set("Content-Type", "application/json")
+		w.Write(js)
+		break
+
 	case "debug_get_predicts":
 		log.Debug("debug_get_predicts")
 		js, _ := json.Marshal(InstanceOfOptimizer().getAllPredicts())
diff --git a/src/optimizer.go b/src/optimizer.go
index 1102f19..274530b 100644
--- a/src/optimizer.go
+++ b/src/optimizer.go
@@ -13,6 +13,8 @@ type Optimizer struct {
 	predicts map[string]*OptimizerJobExecutionTime
 
 	jobUtilsGPU map[string]*OptimizerUtilGPU
+
+	heartbeatInterval int
 }
 
 var optimizerInstance *Optimizer
@@ -26,6 +28,7 @@ func InstanceOfOptimizer() *Optimizer {
 		optimizerInstance = &Optimizer{}
 		optimizerInstance.predicts = map[string]*OptimizerJobExecutionTime{}
 		optimizerInstance.jobUtilsGPU = map[string]*OptimizerUtilGPU{}
+		optimizerInstance.heartbeatInterval = 3
 	}
 	return optimizerInstance
 }
@@ -74,6 +77,8 @@ func (optimizer *Optimizer) feed(job string, utils []int) {
 	if _, ok := optimizer.predicts[jobName]; !ok {
 		optimizer.predicts[jobName] = &OptimizerJobExecutionTime{}
 	}
+	postCnt *= optimizer.heartbeatInterval
+	preCnt *= optimizer.heartbeatInterval
 	predict := optimizer.predicts[jobName]
 	predict.Pre = ((predict.Pre * predict.Version) + preCnt) / (predict.Version + 1)
 	predict.Post = ((predict.Post * predict.Version) + postCnt) / (predict.Version + 1)
diff --git a/src/resource_pool.go b/src/resource_pool.go
index 9f67b7f..bd4b066 100644
--- a/src/resource_pool.go
+++ b/src/resource_pool.go
@@ -35,6 +35,8 @@ type ResourcePool struct {
 	bindings   map[string]map[string]bool
 	bindingsMu sync.Mutex
 	utils      map[string][]int
+
+	TotalGPU int
 }
 
 func (pool *ResourcePool) GPUModelToPower(model string) int {
@@ -60,6 +62,8 @@ func (pool *ResourcePool) start() {
 	pool.bindings = map[string]map[string]bool{}
 	pool.utils = map[string][]int{}
 
+	pool.TotalGPU = 0
+
 	pool.poolsCount = 100
 	for i := 0; i < pool.poolsCount; i++ {
 		pool.pools = append(pool.pools, map[string]NodeStatus{})
@@ -141,6 +145,8 @@ func (pool *ResourcePool) start() {
 			if len(pool.history) > 60 {
 				pool.history = pool.history[len(pool.history)-60:]
 			}
+
+			pool.TotalGPU = TotalGPU
 			time.Sleep(time.Second * 60)
 		}
 	}()
diff --git a/src/scheduler.go b/src/scheduler.go
index 3cb3492..51b7126 100644
--- a/src/scheduler.go
+++ b/src/scheduler.go
@@ -34,4 +34,8 @@ type Scheduler interface {
 	Disable() bool
 
 	UpdateParallelism(parallelism int) bool
+
+	SetShareRatio(ratio float64) bool
+
+	SetPreScheduleRatio(ratio float64) bool
 }
diff --git a/src/scheduler_FCFS.go b/src/scheduler_FCFS.go
index 4a7c449..fc8296e 100644
--- a/src/scheduler_FCFS.go
+++ b/src/scheduler_FCFS.go
@@ -261,3 +261,15 @@ func (scheduler *SchedulerFCFS) UpdateParallelism(parallelism int) bool {
 	log.Info("parallelism is updated to", parallelism)
 	return true
 }
+
+func (scheduler *SchedulerFCFS) SetShareRatio(ratio float64) bool {
+	//scheduler.enableShareRatio = ratio
+	log.Info("enableShareRatio is updated to", ratio)
+	return true
+}
+
+func (scheduler *SchedulerFCFS) SetPreScheduleRatio(ratio float64) bool {
+	//scheduler.enablePreScheduleRatio = ratio
+	log.Info("enablePreScheduleRatio is updated to", ratio)
+	return true
+}
\ No newline at end of file
diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go
index 8ea85e5..4b1e8d3 100644
--- a/src/scheduler_fair.go
+++ b/src/scheduler_fair.go
@@ -28,8 +28,13 @@ type SchedulerFair struct {
 	resourceAllocationsMu sync.Mutex
 	enabled               bool
 	parallelism           int
-	enableShare           bool
-	enablePreSchedule     bool
+
+	enableShare            bool
+	enableShareRatio       float64
+	enablePreSchedule      bool
+	enablePreScheduleRatio float64
+
+	UsingGPU int
 }
 
 type FairJobSorter []Job
@@ -53,8 +58,13 @@ func (scheduler *SchedulerFair) Start() {
 	scheduler.resourceAllocations = map[string]*ResourceCount{}
 	scheduler.enabled = true
 	scheduler.schedulingJobsCnt = 0
+
 	scheduler.enableShare = true
+	scheduler.enableShareRatio = 0.75
 	scheduler.enablePreSchedule = true
+	scheduler.enablePreScheduleRatio = 0.95
+
+	scheduler.UsingGPU = 0
 
 	scheduler.parallelism = 1
 
@@ -189,39 +199,15 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 
 	locks := map[int]sync.Mutex{}
 
-	allocationType := 1
+	allocationType := 0
 	availableGPUs := map[string][]GPUStatus{}
 	var candidates []NodeStatus
 
-	/* first round, find vacant gpu */
-	for i := 0; i < pool.poolsCount; i++ {
-		pool.poolsMu[(i+poolID)%pool.poolsCount].Lock()
-		locks[(i+poolID)%pool.poolsCount] = pool.poolsMu[(i+poolID)%pool.poolsCount]
-		for _, node := range pool.pools[(i+poolID)%pool.poolsCount] {
-			var available []GPUStatus
-			for _, status := range node.Status {
-				if status.MemoryAllocated == 0 && status.MemoryUsed < 10 {
-					available = append(available, status)
-				}
-			}
-			if len(available) >= task.NumberGPU {
-				candidates = append(candidates, node)
-				availableGPUs[node.ClientID] = available
-				if len(candidates) >= 8 {
-					break
-				}
-			}
-		}
-		if len(candidates) >= 8 {
-			break
-		}
-	}
-	log.Info(candidates)
 
-	/* second round, find sharable gpu */
-	if len(candidates) == 0 && scheduler.enableShare {
+	/* first, choose sharable GPUs */
+	if scheduler.enableShare && (pool.TotalGPU != 0 && float64(scheduler.UsingGPU)/float64(pool.TotalGPU) > scheduler.enableShareRatio) {
 		// check sharable
-		allocationType = 2
+		allocationType = 1
 		if util, valid := InstanceOfOptimizer().predictUtilGPU(job.Name); valid {
 
 			for i := 0; i < pool.poolsCount; i++ {
@@ -242,7 +228,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 							totalUtil += utilT
 						}
 					}
-					if totalUtil < 110 {
+					if totalUtil < 100 {
 						available = append(available, status)
 						availableGPUs[node.ClientID] = available
 					}
@@ -264,8 +250,67 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 		log.Info(candidates)
 	}
 
-	log.Info(allocationType)
-	/*assign*/
+	/* second round, find vacant gpu */
+	if len(candidates) == 0 {
+		allocationType = 2
+		for i := 0; i < pool.poolsCount; i++ {
+			pool.poolsMu[(i+poolID)%pool.poolsCount].Lock()
+			locks[(i+poolID)%pool.poolsCount] = pool.poolsMu[(i+poolID)%pool.poolsCount]
+			for _, node := range pool.pools[(i+poolID)%pool.poolsCount] {
+				var available []GPUStatus
+				for _, status := range node.Status {
+					if status.MemoryAllocated == 0 && status.MemoryUsed < 10 {
+						available = append(available, status)
+					}
+				}
+				if len(available) >= task.NumberGPU {
+					candidates = append(candidates, node)
+					availableGPUs[node.ClientID] = available
+					if len(candidates) >= 8 {
+						break
+					}
+				}
+			}
+			if len(candidates) >= 8 {
+				break
+			}
+		}
+		log.Info(candidates)
+	}
+
+	/* third round, find gpu to be released */
+	if len(candidates) == 0 && len(job.Tasks) == 1 && scheduler.enablePreSchedule {
+		if pool.TotalGPU != 0 && float64(scheduler.UsingGPU)/float64(pool.TotalGPU) > scheduler.enablePreScheduleRatio {
+			allocationType = 3
+			for i := 0; i < pool.poolsCount; i++ {
+				pool.poolsMu[(i+poolID)%pool.poolsCount].Lock()
+				locks[(i+poolID)%pool.poolsCount] = pool.poolsMu[(i+poolID)%pool.poolsCount]
+				for _, node := range pool.pools[(i+poolID)%pool.poolsCount] {
+					var available []GPUStatus
+					for _, status := range node.Status {
+						if status.MemoryAllocated == 0 && status.MemoryUsed < 10 {
+							available = append(available, status)
+						}
+					}
+					if len(available) >= task.NumberGPU {
+						candidates = append(candidates, node)
+						availableGPUs[node.ClientID] = available
+						if len(candidates) >= 8 {
+							break
+						}
+					}
+				}
+				if len(candidates) >= 8 {
+					break
+				}
+			}
+			log.Info(candidates)
+		}
+	}
+
+	log.Info("allocationType is ", allocationType)
+
+	/* assign */
 	if len(candidates) > 0 {
 		node := candidates[0]
 		res.ClientID = node.ClientID
@@ -530,3 +575,15 @@ func (scheduler *SchedulerFair) UpdateParallelism(parallelism int) bool {
 	log.Info("parallelism is updated to", parallelism)
 	return true
 }
+
+func (scheduler *SchedulerFair) SetShareRatio(ratio float64) bool {
+	scheduler.enableShareRatio = ratio
+	log.Info("enableShareRatio is updated to", ratio)
+	return true
+}
+
+func (scheduler *SchedulerFair) SetPreScheduleRatio(ratio float64) bool {
+	scheduler.enablePreScheduleRatio = ratio
+	log.Info("enablePreScheduleRatio is updated to", ratio)
+	return true
+}
diff --git a/src/scheduler_priority.go b/src/scheduler_priority.go
index 9e3c58a..0ee0bc0 100644
--- a/src/scheduler_priority.go
+++ b/src/scheduler_priority.go
@@ -285,3 +285,15 @@ func (scheduler *SchedulerPriority) UpdateParallelism(parallelism int) bool {
 	log.Info("parallelism is updated to", parallelism)
 	return true
 }
+
+func (scheduler *SchedulerPriority) SetShareRatio(ratio float64) bool {
+	//scheduler.enableShareRatio = ratio
+	log.Info("enableShareRatio is updated to", ratio)
+	return true
+}
+
+func (scheduler *SchedulerPriority) SetPreScheduleRatio(ratio float64) bool {
+	//scheduler.enablePreScheduleRatio = ratio
+	log.Info("enablePreScheduleRatio is updated to", ratio)
+	return true
+}
\ No newline at end of file
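
The core of the new behaviour in SchedulerFair.AcquireResource is a cluster-usage gate: sharable GPUs are only considered once UsingGPU/TotalGPU exceeds enableShareRatio (default 0.75), and pre-scheduling on GPUs about to be released only once it exceeds enablePreScheduleRatio (default 0.95). Below is a minimal standalone sketch of that check for illustration only; the function name and the sample numbers are invented here and are not part of the repository.

```go
package main

import "fmt"

// shouldConsider mirrors the ratio test added in this patch: a scheduling
// strategy is only attempted once cluster-wide GPU usage exceeds the given
// threshold, and never when the total GPU count is still unknown (zero).
func shouldConsider(usingGPU, totalGPU int, ratio float64) bool {
	if totalGPU == 0 {
		return false
	}
	return float64(usingGPU)/float64(totalGPU) > ratio
}

func main() {
	// Example with the patch's defaults: share at 0.75, pre-schedule at 0.95.
	fmt.Println(shouldConsider(80, 100, 0.75)) // true  -> sharable GPUs are considered
	fmt.Println(shouldConsider(80, 100, 0.95)) // false -> pre-scheduling not yet triggered
}
```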