diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go index 242e2d7..0342125 100644 --- a/src/scheduler_fair.go +++ b/src/scheduler_fair.go @@ -39,6 +39,10 @@ type SchedulerFair struct { allocatingGPU int allocatingGPUMu sync.Mutex + + queueUsingGPU map[string]int + reservedGPU int + queuesSchedulingCnt map[string]int } type FairJobSorter []Job @@ -62,6 +66,7 @@ func (scheduler *SchedulerFair) Start() { scheduler.resourceAllocations = map[string]*ResourceCount{} scheduler.enabled = true scheduler.schedulingJobsCnt = 0 + scheduler.queueUsingGPU = map[string]int{} scheduler.enableShare = true scheduler.enableShareRatio = 0.75 @@ -70,6 +75,7 @@ func (scheduler *SchedulerFair) Start() { scheduler.UsingGPU = 0 scheduler.allocatingGPU = 0 + scheduler.queuesSchedulingCnt = map[string]int{} scheduler.parallelism = 1 @@ -138,6 +144,68 @@ func (scheduler *SchedulerFair) Start() { scheduler.queueMu.Unlock() } }() + + /* schedule capacity queues */ + go func() { + for { + flag := false + scheduler.queueMu.Lock() + for q, t := range scheduler.queues { + if len(t) == 0 || !InstanceOfGroupManager().groups[t[0].Group].Reserved { + continue + } + if cnt, ok := scheduler.queuesSchedulingCnt[t[0].Group]; ok && cnt > 0 { + continue + } + numberGPU := 0 + for _, v := range t[0].Tasks { + numberGPU += v.NumberGPU + } + + available := InstanceOfGroupManager().groups[t[0].Group].NumGPU + if cnt, ok := scheduler.queueUsingGPU[t[0].Group]; ok { + available -= cnt + } + + if pool.TotalGPU-scheduler.UsingGPU-scheduler.allocatingGPU*13/10 < 0 { + continue + } + + if numberGPU < available { + jm := JobManager{} + jm.job = scheduler.queues[q][0] + + scheduler.schedulingMu.Lock() + scheduler.schedulingJobsCnt++ + scheduler.schedulingMu.Unlock() + + scheduler.allocatingGPUMu.Lock() + scheduler.allocatingGPU += numberGPU + scheduler.allocatingGPUMu.Unlock() + log.Info("allocatingGPU is ", scheduler.allocatingGPU) + log.Info("schedulingJobsCnt is ", scheduler.schedulingJobsCnt) + + scheduler.queues[q] = scheduler.queues[q][1:] + jm.scheduler = scheduler + scheduler.jobs[jm.job.Name] = &jm + + jm.job.Status = Starting + scheduler.historyMu.Lock() + scheduler.history = append(scheduler.history, &jm.job) + scheduler.historyMu.Unlock() + + go func() { + jm.start() + }() + flag = true + } + } + scheduler.queueMu.Unlock() + if !flag { + time.Sleep(time.Millisecond * 100) + } + } + }() } func (scheduler *SchedulerFair) UpdateProgress(jobName string, state State) {