diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go index 5c68c66..4b5529a 100644 --- a/src/scheduler_fair.go +++ b/src/scheduler_fair.go @@ -17,8 +17,8 @@ type SchedulerFair struct { queues map[string][]Job queueMu sync.Mutex - schedulingJobsCnt int - schedulingMu sync.Mutex + schedulingJobs map[string]bool + schedulingMu sync.Mutex resourceAllocations map[string]*ResourceCount resourceAllocationsMu sync.Mutex @@ -59,7 +59,7 @@ func (scheduler *SchedulerFair) Start() { scheduler.queues["default"] = []Job{} scheduler.resourceAllocations = map[string]*ResourceCount{} scheduler.enabled = true - scheduler.schedulingJobsCnt = 0 + scheduler.schedulingJobs = map[string]bool{} scheduler.queueUsingGPU = map[string]int{} scheduler.allocatingGPU = 0 @@ -81,12 +81,11 @@ func (scheduler *SchedulerFair) Start() { continue } scheduler.schedulingMu.Lock() - if scheduler.schedulingJobsCnt >= scheduler.parallelism { + if len(scheduler.schedulingJobs) >= scheduler.parallelism { scheduler.schedulingMu.Unlock() time.Sleep(time.Millisecond * 100) continue } - scheduler.schedulingJobsCnt++ scheduler.schedulingMu.Unlock() scheduler.queueMu.Lock() @@ -113,10 +112,7 @@ func (scheduler *SchedulerFair) Start() { pool := InstanceOfResourcePool() log.Info(cnt, reserved, pool.TotalGPU, pool.UsingGPU, scheduler.allocatingGPU) - if scheduler.schedulingJobsCnt > 1 && (cnt*10+(scheduler.allocatingGPU)*13 > (pool.TotalGPU-pool.UsingGPU-reserved)*10) { - scheduler.schedulingMu.Lock() - scheduler.schedulingJobsCnt-- - scheduler.schedulingMu.Unlock() + if len(scheduler.schedulingJobs) > 1 && (cnt*10+(scheduler.allocatingGPU)*13 > (pool.TotalGPU-pool.UsingGPU-reserved)*10) { scheduler.queueMu.Unlock() continue } @@ -126,7 +122,7 @@ func (scheduler *SchedulerFair) Start() { scheduler.allocatingGPU += cnt scheduler.allocatingGPUMu.Unlock() log.Info("allocatingGPU is ", scheduler.allocatingGPU) - log.Info("schedulingJobsCnt is ", scheduler.schedulingJobsCnt) + log.Info("schedulingJobs are ", scheduler.schedulingJobs) scheduler.queues[queue] = scheduler.queues[queue][1:] jm.scheduler = scheduler @@ -141,13 +137,15 @@ func (scheduler *SchedulerFair) Start() { scheduler.queuesSchedulingCnt[jm.job.Group]++ scheduler.queuesUsingGPUMu.Unlock() + scheduler.schedulingMu.Lock() + scheduler.schedulingJobs[jm.job.Name] = true + scheduler.schedulingMu.Unlock() go func() { jm.start() }() } else { log.Debug("No more jobs to scheduling ", time.Now()) scheduler.schedulingMu.Lock() - scheduler.schedulingJobsCnt-- scheduler.schedulingMu.Unlock() } scheduler.queueMu.Unlock() @@ -192,10 +190,6 @@ func (scheduler *SchedulerFair) Start() { jm := JobManager{} jm.job = scheduler.queues[q][0] - scheduler.schedulingMu.Lock() - scheduler.schedulingJobsCnt++ - scheduler.schedulingMu.Unlock() - scheduler.queuesUsingGPUMu.Lock() scheduler.queuesSchedulingCnt[jm.job.Group]++ scheduler.queuesUsingGPUMu.Unlock() @@ -204,7 +198,7 @@ func (scheduler *SchedulerFair) Start() { scheduler.allocatingGPU += numberGPU scheduler.allocatingGPUMu.Unlock() log.Info("allocatingGPU is ", scheduler.allocatingGPU) - log.Info("schedulingJobsCnt is ", scheduler.schedulingJobsCnt) + log.Info("schedulingJobs are ", scheduler.schedulingJobs) scheduler.queues[q] = scheduler.queues[q][1:] jm.scheduler = scheduler @@ -233,12 +227,12 @@ func (scheduler *SchedulerFair) UpdateProgress(job Job, state State) { scheduler.historyMu.Lock() defer scheduler.historyMu.Unlock() + scheduler.schedulingMu.Lock() + delete(scheduler.schedulingJobs, job.Name) + scheduler.schedulingMu.Unlock() + switch state { case Running: - scheduler.schedulingMu.Lock() - scheduler.schedulingJobsCnt-- - scheduler.schedulingMu.Unlock() - scheduler.queuesUsingGPUMu.Lock() if _, ok := scheduler.queuesSchedulingCnt[job.Group]; ok { scheduler.queuesSchedulingCnt[job.Group]--