diff --git a/src/resource_pool.go b/src/resource_pool.go
index 6794976..49fb6c7 100644
--- a/src/resource_pool.go
+++ b/src/resource_pool.go
@@ -50,10 +50,12 @@ type ResourcePool struct {
 	bindingsMu sync.Mutex
 	utils      map[string][]UtilGPUTimeSeries
 
-	TotalGPU   int
-	TotalGPUMu sync.Mutex
-	UsingGPU   int
-	UsingGPUMu sync.Mutex
+	TotalGPU    int
+	TotalGPUMu  sync.Mutex
+	TotalCPU    int
+	TotalMemory int
+	UsingGPU    int
+	UsingGPUMu  sync.Mutex
 
 	enableShare      bool
 	enableShareRatio float64
@@ -73,6 +75,9 @@ func (pool *ResourcePool) init(conf Configuration) {
 	pool.TotalGPU = 0
 	pool.UsingGPU = 0
 
+	pool.TotalCPU = 0
+	pool.TotalMemory = 0
+
 	pool.enableShare = true
 	pool.enableShareRatio = 0.75
 	pool.enablePreSchedule = true
@@ -129,6 +134,8 @@ func (pool *ResourcePool) checkDeadNodes() {
 			pool.TotalGPUMu.Lock()
 			if _, ok := seg.Nodes[k]; ok {
 				pool.TotalGPU -= len(seg.Nodes[k].Status)
+				pool.TotalCPU -= seg.Nodes[k].NumCPU
+				pool.TotalMemory -= seg.Nodes[k].MemTotal
 			}
 			pool.TotalGPUMu.Unlock()
 			delete(seg.Nodes, k)
@@ -240,6 +247,8 @@ func (pool *ResourcePool) saveStatusHistory() {
 		pool.TotalGPUMu.Lock()
 		pool.TotalGPU = TotalGPU
+		pool.TotalCPU = TotalCPU
+		pool.TotalMemory = TotalMemGPU
 		pool.TotalGPUMu.Unlock()
 		time.Sleep(time.Second * 60)
 	}
@@ -307,6 +316,8 @@ func (pool *ResourcePool) update(node NodeStatus) {
 		/* TODO: double check node do belong to this seg */
 		pool.TotalGPUMu.Lock()
 		pool.TotalGPU += len(node.Status)
+		pool.TotalCPU += node.NumCPU
+		pool.TotalMemory += node.MemTotal
 		pool.TotalGPUMu.Unlock()
 		log.Info("node ", node.ClientID, " is online")
 	}
diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go
index 651ff73..20debb3 100644
--- a/src/scheduler_fair.go
+++ b/src/scheduler_fair.go
@@ -107,9 +107,11 @@ func (scheduler *SchedulerFair) Start() {
 			if bestQueue != "" {
 				numberGPUtmp := 0
 				numberCPUtmp := 0
+				Memorytmp := 0
 				for _, task := range scheduler.queues[bestQueue][0].Tasks {
 					numberGPUtmp += task.NumberGPU
 					numberCPUtmp += task.NumberCPU
+					Memorytmp += task.Memory
 				}
 				log.Info("schedulingJobs are ", scheduler.schedulingJobs)
@@ -122,6 +124,7 @@
 				if quota, ok := scheduler.queuesQuota[bestQueue]; ok {
 					quota.NumberGPU -= numberGPUtmp * 100
 					quota.CPU -= numberCPUtmp * 100
+					quota.Memory -= Memorytmp
 				}
 				log.Info("After, ", scheduler.queuesQuota[bestQueue])
@@ -300,25 +303,35 @@ func (scheduler *SchedulerFair) UpdateQuota() {
 	/* phase 1: DRF */
 	usingGPU := 0
+	usingCPU := 0
+	usingMemory := 0
 	allocatedGPU := 0
+	allocatedCPU := 0
+	allocatedMemory := 0
 	scheduler.resourceAllocationsMu.Lock()
 	for _, quota := range scheduler.resourceAllocations {
 		usingGPU += quota.NumberGPU
+		usingCPU += quota.CPU
+		usingMemory += quota.Memory
 	}
 	scheduler.resourceAllocationsMu.Unlock()
 	for _, quota := range scheduler.queuesQuota {
 		allocatedGPU += quota.NumberGPU
+		allocatedCPU += quota.CPU
+		allocatedMemory += quota.Memory
 	}
 	pool := InstanceOfResourcePool()
 
-	available := pool.TotalGPU - usingGPU - allocatedGPU/100
+	availableGPU := pool.TotalGPU - usingGPU - allocatedGPU/100
+	availableCPU := pool.TotalCPU - usingCPU - allocatedCPU/100
+	availableMemory := pool.TotalMemory - usingMemory - allocatedMemory
 	/* <0 means some nodes exited */
-	if available <= 0 {
+	if availableGPU <= 0 {
 		return
 	}
-	log.Info("Can allocate ", available)
+	log.Info("Can allocate ", availableGPU)
 	log.Info("Before ")
 	for queue, quota := range scheduler.queuesQuota {
 		log.Info("Queue<->", queue)
@@ -326,20 +339,52 @@
 		log.Info("CPU:", quota.CPU)
 		log.Info("Memory:", quota.Memory)
 	}
-	available *= 100
-	per := available / len(scheduler.queues)
-	for queue := range scheduler.queues {
+	availableGPU *= 100
+	availableCPU *= 100
+
+	var candidates []string
+	requests := map[string]ResourceCount{}
+	weights := 0
+
+	for queue, jobs := range scheduler.queues {
+		if len(jobs) > 0 {
+			candidates = append(candidates, queue)
+		}
+		weights += InstanceOfGroupManager().groups[queue].Weight
+		request := ResourceCount{}
+		for _, job := range jobs {
+			GPU := 0
+			CPU := 0
+			Memory := 0
+			for _, task := range job.Tasks {
+				GPU += task.NumberGPU
+				CPU += task.NumberCPU
+				Memory += task.Memory
+			}
+			request.NumberGPU += GPU
+			request.CPU += CPU
+			request.Memory += Memory
+		}
+		requests[queue] = request
+	}
+
+	per := availableGPU / weights
+	for _, queue := range candidates {
 		if _, ok := scheduler.queuesQuota[queue]; !ok {
 			scheduler.queuesQuota[queue] = &ResourceCount{}
 		}
+		weight := InstanceOfGroupManager().groups[queue].Weight
 		quota := scheduler.queuesQuota[queue]
-		quota.NumberGPU += per
-		available -= per
+		quota.NumberGPU += per * weight
+		availableGPU -= per * weight
+
+		quota.CPU += (requests[queue].CPU / requests[queue].NumberGPU) * per * weight
+		quota.Memory += (requests[queue].Memory / requests[queue].NumberGPU) * per * weight
 	}
-	if available > 0 {
+	if availableGPU > 0 {
 		for queue := range scheduler.queues {
 			quota := scheduler.queuesQuota[queue]
-			quota.NumberGPU += available
+			quota.NumberGPU += availableGPU
 			break
 		}
 	}
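For reference, below is a minimal, self-contained Go sketch of the weighted quota split that the new `UpdateQuota` hunk introduces: free GPU share (scaled by 100, as in the scheduler) is divided across queues in proportion to group weights, and each queue's CPU and Memory quota grows in proportion to that queue's requested CPU/Memory per requested GPU. The `distributeQuota` helper, the map-based group weights, and the guard for queues whose jobs request zero GPUs are illustrative assumptions, not code from the repository.

```go
// Standalone sketch of the weighted quota split shown in the UpdateQuota diff.
// ResourceCount mirrors the per-queue triple used there; everything else is
// assumed for illustration only.
package main

import "fmt"

// ResourceCount mirrors the per-queue quota/request triple used in the diff.
type ResourceCount struct {
	NumberGPU int // GPU share, scaled by 100 as in the scheduler
	CPU       int
	Memory    int
}

// distributeQuota splits availableGPU (in 1/100-GPU units) across queues in
// proportion to their weights, and grows CPU/Memory quota in proportion to
// each queue's requested CPU/Memory per requested GPU.
func distributeQuota(availableGPU int, weights map[string]int, requests map[string]ResourceCount, quotas map[string]*ResourceCount) {
	totalWeight := 0
	for queue := range requests {
		totalWeight += weights[queue]
	}
	if totalWeight == 0 {
		return // nothing to distribute against
	}
	per := availableGPU / totalWeight
	for queue, request := range requests {
		if _, ok := quotas[queue]; !ok {
			quotas[queue] = &ResourceCount{}
		}
		quota := quotas[queue]
		share := per * weights[queue]
		quota.NumberGPU += share
		availableGPU -= share
		if request.NumberGPU > 0 { // avoid dividing by zero for GPU-free queues
			quota.CPU += (request.CPU / request.NumberGPU) * share
			quota.Memory += (request.Memory / request.NumberGPU) * share
		}
	}
	// Integer division can leave a remainder; hand it to an arbitrary queue,
	// as the diff does with its trailing "availableGPU > 0" loop.
	if availableGPU > 0 {
		for _, quota := range quotas {
			quota.NumberGPU += availableGPU
			break
		}
	}
}

func main() {
	requests := map[string]ResourceCount{
		"teamA": {NumberGPU: 4, CPU: 16, Memory: 64},
		"teamB": {NumberGPU: 2, CPU: 4, Memory: 16},
	}
	weights := map[string]int{"teamA": 2, "teamB": 1}
	quotas := map[string]*ResourceCount{}

	distributeQuota(6*100, weights, requests, quotas) // 6 free GPUs, scaled by 100
	for queue, quota := range quotas {
		fmt.Printf("%s -> GPU:%d CPU:%d Memory:%d\n", queue, quota.NumberGPU, quota.CPU, quota.Memory)
	}
}
```

One design detail visible in the diff itself: `weights` accumulates over every queue, while only queues with pending jobs (`candidates`) receive a share, so the portion belonging to idle queues stays in `availableGPU` and is handed to whichever queue the trailing `availableGPU > 0` loop reaches first.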