diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go
index 4b5529a..b5566a3 100644
--- a/src/scheduler_fair.go
+++ b/src/scheduler_fair.go
@@ -29,9 +29,7 @@ type SchedulerFair struct {
     allocatingGPU       int
     allocatingGPUMu     sync.Mutex
-    reservedGPU         int
     queuesSchedulingCnt map[string]int
-    queueUsingGPU       map[string]int
     queuesUsingGPUMu    sync.Mutex
     mu sync.Mutex
@@ -60,8 +58,6 @@ func (scheduler *SchedulerFair) Start() {
     scheduler.resourceAllocations = map[string]*ResourceCount{}
     scheduler.enabled = true
     scheduler.schedulingJobs = map[string]bool{}
-    scheduler.queueUsingGPU = map[string]int{}
-    scheduler.allocatingGPU = 0
     scheduler.queuesSchedulingCnt = map[string]int{}
@@ -101,18 +97,10 @@ func (scheduler *SchedulerFair) Start() {
             for _, task := range jm.job.Tasks {
                 cnt += task.NumberGPU
             }
-            reserved := scheduler.reservedGPU
-            scheduler.queuesUsingGPUMu.Lock()
-            for g, v := range scheduler.queueUsingGPU {
-                if InstanceOfGroupManager().groups[g].Reserved {
-                    reserved -= v
-                }
-            }
-            scheduler.queuesUsingGPUMu.Unlock()
             pool := InstanceOfResourcePool()
-            log.Info(cnt, reserved, pool.TotalGPU, pool.UsingGPU, scheduler.allocatingGPU)
-            if len(scheduler.schedulingJobs) > 1 && (cnt*10+(scheduler.allocatingGPU)*13 > (pool.TotalGPU-pool.UsingGPU-reserved)*10) {
+            log.Info(cnt, pool.TotalGPU, pool.UsingGPU, scheduler.allocatingGPU)
+            if len(scheduler.schedulingJobs) > 1 && (cnt*10+(scheduler.allocatingGPU)*13 > (pool.TotalGPU-pool.UsingGPU)*10) {
                 scheduler.queueMu.Unlock()
                 continue
             }
@@ -151,76 +139,6 @@ func (scheduler *SchedulerFair) Start() {
             scheduler.queueMu.Unlock()
         }
     }()
-
-    /* schedule capacity queues */
-    go func() {
-        for {
-            flag := false
-            scheduler.queueMu.Lock()
-            for q, t := range scheduler.queues {
-                if len(t) == 0 || !InstanceOfGroupManager().groups[t[0].Group].Reserved {
-                    continue
-                }
-                //log.Info(scheduler.queueUsingGPU)
-                //log.Info(scheduler.queuesSchedulingCnt)
-                scheduler.queuesUsingGPUMu.Lock()
-                if cnt, ok := scheduler.queuesSchedulingCnt[t[0].Group]; ok && cnt > 0 {
-                    scheduler.queuesUsingGPUMu.Unlock()
-                    continue
-                }
-                scheduler.queuesUsingGPUMu.Unlock()
-                numberGPU := 0
-                for _, v := range t[0].Tasks {
-                    numberGPU += v.NumberGPU
-                }
-
-                available := InstanceOfGroupManager().groups[t[0].Group].NumGPU
-                scheduler.queuesUsingGPUMu.Lock()
-                if cnt, ok := scheduler.queueUsingGPU[t[0].Group]; ok {
-                    available -= cnt
-                }
-                scheduler.queuesUsingGPUMu.Unlock()
-
-                pool := InstanceOfResourcePool()
-                if pool.TotalGPU-pool.UsingGPU-scheduler.allocatingGPU*13/10 < 0 {
-                    continue
-                }
-
-                if numberGPU <= available {
-                    jm := JobManager{}
-                    jm.job = scheduler.queues[q][0]
-
-                    scheduler.queuesUsingGPUMu.Lock()
-                    scheduler.queuesSchedulingCnt[jm.job.Group]++
-                    scheduler.queuesUsingGPUMu.Unlock()
-
-                    scheduler.allocatingGPUMu.Lock()
-                    scheduler.allocatingGPU += numberGPU
-                    scheduler.allocatingGPUMu.Unlock()
-                    log.Info("allocatingGPU is ", scheduler.allocatingGPU)
-                    log.Info("schedulingJobs are ", scheduler.schedulingJobs)
-
-                    scheduler.queues[q] = scheduler.queues[q][1:]
-                    jm.scheduler = scheduler
-                    scheduler.jobs[jm.job.Name] = &jm
-
-                    jm.job.Status = Starting
-                    scheduler.historyMu.Lock()
-                    scheduler.history = append(scheduler.history, &jm.job)
-                    scheduler.historyMu.Unlock()
-
-                    go func() {
-                        jm.start()
-                    }()
-                    flag = true
-                }
-            }
-            scheduler.queueMu.Unlock()
-            if !flag {
-                time.Sleep(time.Millisecond * 100)
-            }
-        }
-    }()
 }
 
 func (scheduler *SchedulerFair) UpdateProgress(job Job, state State) {
@@ -323,9 +241,6 @@ func (scheduler *SchedulerFair) AcquireResource(job Job) []NodeStatus {
     if len(res) != 0 {
         for _, task := range job.Tasks {
-            scheduler.queuesUsingGPUMu.Lock()
-            scheduler.queueUsingGPU[job.Group] += task.NumberGPU
-            scheduler.queuesUsingGPUMu.Unlock()
             scheduler.allocatingGPUMu.Lock()
             scheduler.allocatingGPU -= task.NumberGPU
@@ -358,15 +273,6 @@ func (scheduler *SchedulerFair) AcquireResource(job Job) []NodeStatus {
 func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
     InstanceOfResourcePool().releaseResource(job, agent)
-    scheduler.queuesUsingGPUMu.Lock()
-    if _, ok := scheduler.queueUsingGPU[job.Group]; ok {
-        scheduler.queueUsingGPU[job.Group] -= len(agent.Status)
-        if scheduler.queueUsingGPU[job.Group] < 0 {
-            log.Warn("queueUsingGPU exceeded ", scheduler.queueUsingGPU[job.Group])
-            scheduler.queueUsingGPU[job.Group] = 0
-        }
-    }
-    scheduler.queuesUsingGPUMu.Unlock()
     go func(res NodeStatus) {
         scheduler.resourceAllocationsMu.Lock()
         if _, ok := scheduler.resourceAllocations[job.Group]; !ok {
@@ -534,8 +440,5 @@ func (scheduler *SchedulerFair) updateGroup(group Group) bool {
             num += g.NumGPU
         }
     }
-    scheduler.queuesUsingGPUMu.Lock()
-    scheduler.reservedGPU = num
-    scheduler.queuesUsingGPUMu.Unlock()
     return true
 }
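
Note on the check retained above: with the per-group reserved/queueUsingGPU bookkeeping removed, the only admission guard left in Start() is the fixed-point comparison cnt*10 + allocatingGPU*13 > (TotalGPU - UsingGPU)*10, i.e. a job is deferred unless its GPU demand plus a 1.3x margin on still-allocating GPUs fits into the free pool. A minimal standalone sketch of that arithmetic follows; it omits the len(schedulingJobs) > 1 condition, and the names (canSchedule, jobGPU, allocating, total, using) are hypothetical, not part of the patch.

package main

import "fmt"

// canSchedule mirrors the arithmetic of the guard kept in SchedulerFair.Start():
// jobGPU is the GPU demand of the candidate job, allocating counts GPUs that
// have been claimed but not yet bound, and total/using describe the pool.
// The integer *10 / *13 form approximates jobGPU + 1.3*allocating <= total - using.
func canSchedule(jobGPU, allocating, total, using int) bool {
	return jobGPU*10+allocating*13 <= (total-using)*10
}

func main() {
	fmt.Println(canSchedule(4, 2, 16, 8)) // 4 + 2.6 <= 8  -> true
	fmt.Println(canSchedule(6, 2, 16, 8)) // 6 + 2.6 <= 8  -> false
}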