From dbd880c97195fe478ab1fa9870dd915193ba4dc3 Mon Sep 17 00:00:00 2001
From: Newnius
Date: Thu, 30 Apr 2020 14:04:40 +0800
Subject: [PATCH] update

---
 src/job_manager.go    |   1 +
 src/optimizer.go      |   4 +-
 src/resource_pool.go  |   8 ++++
 src/scheduler_fair.go | 105 ++++++++++++++++++++++++++++++++++--------
 4 files changed, 98 insertions(+), 20 deletions(-)

diff --git a/src/job_manager.go b/src/job_manager.go
index bfde255..572f15f 100644
--- a/src/job_manager.go
+++ b/src/job_manager.go
@@ -139,6 +139,7 @@ func (jm *JobManager) start() {
 		if onlyPS {
 			jm.stop()
 			log.Info("Only PS is running, stop", jm.job.Name)
+			jm.killedFlag = false
 			break
 		}
 		if !flag {
diff --git a/src/optimizer.go b/src/optimizer.go
index 7a79c6c..d640fd6 100644
--- a/src/optimizer.go
+++ b/src/optimizer.go
@@ -88,7 +88,7 @@ func (optimizer *Optimizer) feed(job string, utils []int) {
 	}()
 }
 
-func (optimizer *Optimizer) predictTime(job string, utils []int) (int, bool) {
+func (optimizer *Optimizer) predictUtilGPU(job string) (int, bool) {
 	if _, err := optimizer.jobUtilsGPU[job]; err {
 		return 100, false
 	}
@@ -98,7 +98,7 @@ func (optimizer *Optimizer) predictTime(job string, utils []int) (int, bool) {
 	return optimizer.jobUtilsGPU[job], false
 }
 
-func (optimizer *Optimizer) predictUtilGPU(job string) (OptimizerJobExecutionTime, bool) {
+func (optimizer *Optimizer) predictTime(job string) (OptimizerJobExecutionTime, bool) {
 	if _, err := optimizer.predicts[job]; err {
 		return OptimizerJobExecutionTime{}, false
 	}
diff --git a/src/resource_pool.go b/src/resource_pool.go
index 3f1d362..9f67b7f 100644
--- a/src/resource_pool.go
+++ b/src/resource_pool.go
@@ -37,6 +37,14 @@ type ResourcePool struct {
 	utils map[string][]int
 }
 
+func (pool *ResourcePool) GPUModelToPower(model string) int {
+	mapper := map[string]int{"k40": 1, "K80": 2, "P100": 3}
+	if power, err := mapper[model]; !err {
+		return power
+	}
+	return 0
+}
+
 func (pool *ResourcePool) getNodePool(name string) int {
 	h := fnv.New32a()
 	h.Write([]byte(name))
diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go
index 4fd6243..d87cab9 100644
--- a/src/scheduler_fair.go
+++ b/src/scheduler_fair.go
@@ -183,40 +183,109 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 	poolID := rand.Intn(pool.poolsCount)
 	res := NodeStatus{}
+	var locks []sync.Mutex
+
+	var candidates []NodeStatus
+	/* first round, find vacant gpu */
 	for i := poolID; i < pool.poolsCount; i++ {
 		pool.poolsMu[i].Lock()
-		flag := false
-		for id, node := range pool.pools[i] {
+		locks = append(locks, pool.poolsMu[i])
+		for _, node := range pool.pools[i] {
 			var available []GPUStatus
 			for _, status := range node.Status {
-				if status.MemoryTotal-status.MemoryAllocated >= task.MemoryGPU {
+				if status.MemoryTotal >= task.MemoryGPU && status.MemoryUsed < 10 {
 					available = append(available, status)
 				}
 			}
 			if len(available) >= task.NumberGPU {
-				res.ClientID = id
-				res.ClientHost = node.ClientHost
-				res.Status = available[0:task.NumberGPU]
-				res.NumCPU = task.NumberCPU
-				res.MemTotal = task.Memory
+				tmp := NodeStatus{}
+				tmp.ClientID = node.ClientID
+				tmp.ClientHost = node.ClientHost
+				tmp.Status = available
+				tmp.NumCPU = node.NumCPU
+				tmp.MemTotal = node.MemAvailable
+				candidates = append(candidates, tmp)
+				if len(candidates) >= 8 {
+					break
+				}
+			}
+		}
+		if len(candidates) >= 8 {
+			break
+		}
+	}
+	log.Info(candidates)
 
-				for i := range res.Status {
-					for j := range node.Status {
-						if res.Status[i].UUID == node.Status[j].UUID {
-							node.Status[j].MemoryAllocated += task.MemoryGPU
-							res.Status[i].MemoryTotal = task.MemoryGPU
+	/* second round, find sharable gpu */
+	if len(candidates) == 0 {
+		// check sharable
+		if util, valid := InstanceOfOptimizer().predictUtilGPU(job.Name); valid {
+
+			for i := poolID; i < pool.poolsCount; i++ {
+				pool.poolsMu[i].Lock()
+				locks = append(locks, pool.poolsMu[i])
+				for _, node := range pool.pools[i] {
+					var available []GPUStatus
+					for _, status := range node.Status {
+						if status.MemoryTotal >= task.MemoryGPU+status.MemoryAllocated && status.MemoryFree > task.MemoryGPU {
+
+							if jobs, err := pool.bindings[status.UUID]; !err {
+								totalUtil := util
+								for job := range jobs {
+									if utilT, err := InstanceOfOptimizer().predictUtilGPU(job); !err {
+										totalUtil += utilT
+									}
+								}
+								if totalUtil < 100 {
+									available = append(available, status)
+								}
+							}
+						}
+					}
+					if len(available) >= task.NumberGPU {
+						tmp := NodeStatus{}
+						tmp.ClientID = node.ClientID
+						tmp.ClientHost = node.ClientHost
+						tmp.Status = available
+						tmp.NumCPU = node.NumCPU
+						tmp.MemTotal = node.MemAvailable
+						candidates = append(candidates, tmp)
+						if len(candidates) >= 8 {
+							break
 						}
 					}
 				}
-				flag = true
-				break
+				if len(candidates) >= 8 {
+					break
+				}
 			}
 		}
-		pool.poolsMu[i].Unlock()
-		if flag {
-			break
+	}
+	log.Info(candidates)
+
+	/*assign*/
+	if len(candidates) > 0 {
+		node := candidates[0]
+		res := NodeStatus{}
+		res.ClientID = node.ClientID
+		res.ClientHost = node.ClientHost
+		res.Status = candidates[0].Status[0:task.NumberGPU]
+		res.NumCPU = task.NumberCPU
+		res.MemTotal = task.Memory
+
+		for i := range res.Status {
+			for j := range node.Status {
+				if res.Status[i].UUID == node.Status[j].UUID {
+					node.Status[j].MemoryAllocated += task.MemoryGPU
+					res.Status[i].MemoryTotal = task.MemoryGPU
+				}
+			}
 		}
 	}
+
+	for _, lock := range locks {
+		lock.Unlock()
+	}
 	go func(res NodeStatus) {
 		if len(res.Status) == 0 {
 			return