diff --git a/src/main.go b/src/main.go index aa6c53b..5be110e 100644 --- a/src/main.go +++ b/src/main.go @@ -309,12 +309,15 @@ func main() { case "FCFS": scheduler = &SchedulerFCFS{} break - case "fair": - scheduler = &SchedulerCapacity{} - break case "priority": scheduler = &SchedulerPriority{} break + case "capacity": + scheduler = &SchedulerCapacity{} + break + case "fair": + scheduler = &SchedulerFair{} + break default: scheduler = &SchedulerFCFS{} } diff --git a/src/scheduler_capacity.go b/src/scheduler_capacity.go index 07b251c..28054c2 100644 --- a/src/scheduler_capacity.go +++ b/src/scheduler_capacity.go @@ -15,7 +15,7 @@ type SchedulerCapacity struct { nextQueue string jobs map[string]*JobManager queues map[string][]Job - queueMu sync.Mutex + queuesMu sync.Mutex schedulingJobs map[string]bool schedulingMu sync.Mutex @@ -28,9 +28,6 @@ type SchedulerCapacity struct { allocatingGPU int allocatingGPUMu sync.Mutex - - queuesSchedulingCnt map[string]int - queuesUsingGPUMu sync.Mutex } type FairJobSorter []Job @@ -46,7 +43,7 @@ func (s FairJobSorter) Less(i, j int) bool { } func (scheduler *SchedulerCapacity) Start() { - log.Info("JS started") + log.Info("JS (capacity) started") scheduler.jobs = map[string]*JobManager{} scheduler.history = []*Job{} @@ -57,12 +54,10 @@ func (scheduler *SchedulerCapacity) Start() { scheduler.enabled = true scheduler.schedulingJobs = map[string]bool{} scheduler.allocatingGPU = 0 - scheduler.queuesSchedulingCnt = map[string]int{} scheduler.parallelism = 1 go func() { - /* capacity scheduler */ flag := true for { log.Debug("Scheduling") @@ -82,7 +77,7 @@ func (scheduler *SchedulerCapacity) Start() { } scheduler.schedulingMu.Unlock() - scheduler.queueMu.Lock() + scheduler.queuesMu.Lock() queue := scheduler.nextQueue go func() { scheduler.UpdateNextQueue() @@ -99,7 +94,7 @@ func (scheduler *SchedulerCapacity) Start() { pool := InstanceOfResourcePool() log.Info(cnt, pool.TotalGPU, pool.UsingGPU, scheduler.allocatingGPU) if len(scheduler.schedulingJobs) > 1 && (cnt*10+(scheduler.allocatingGPU)*13 > (pool.TotalGPU-pool.UsingGPU)*10) { - scheduler.queueMu.Unlock() + scheduler.queuesMu.Unlock() continue } @@ -119,10 +114,6 @@ func (scheduler *SchedulerCapacity) Start() { scheduler.history = append(scheduler.history, &jm.job) scheduler.historyMu.Unlock() - scheduler.queuesUsingGPUMu.Lock() - scheduler.queuesSchedulingCnt[jm.job.Group]++ - scheduler.queuesUsingGPUMu.Unlock() - scheduler.schedulingMu.Lock() scheduler.schedulingJobs[jm.job.Name] = true scheduler.schedulingMu.Unlock() @@ -132,7 +123,7 @@ func (scheduler *SchedulerCapacity) Start() { } else { log.Debug("No more jobs to scheduling ", time.Now()) } - scheduler.queueMu.Unlock() + scheduler.queuesMu.Unlock() } }() } @@ -147,16 +138,6 @@ func (scheduler *SchedulerCapacity) UpdateProgress(job Job, state State) { switch state { case Running: - scheduler.queuesUsingGPUMu.Lock() - if _, ok := scheduler.queuesSchedulingCnt[job.Group]; ok { - scheduler.queuesSchedulingCnt[job.Group]-- - if scheduler.queuesSchedulingCnt[job.Group] < 0 { - scheduler.queuesSchedulingCnt[job.Group] = 0 - log.Warn("scheduler.queuesSchedulingCnt less than 0", job.Group) - } - } - scheduler.queuesUsingGPUMu.Unlock() - for i := range scheduler.history { if scheduler.history[i].Name == job.Name { scheduler.history[i].Status = Running @@ -192,8 +173,8 @@ func (scheduler *SchedulerCapacity) UpdateProgress(job Job, state State) { } func (scheduler *SchedulerCapacity) Schedule(job Job) { - scheduler.queueMu.Lock() - defer scheduler.queueMu.Unlock() + scheduler.queuesMu.Lock() + defer scheduler.queuesMu.Unlock() queue := job.Group _, ok := scheduler.queues[queue] @@ -287,9 +268,9 @@ func (scheduler *SchedulerCapacity) ReleaseResource(job Job, agent NodeStatus) { } func (scheduler *SchedulerCapacity) QueryState(jobName string) MsgJobStatus { - scheduler.queueMu.Lock() + scheduler.queuesMu.Lock() jm, ok := scheduler.jobs[jobName] - scheduler.queueMu.Unlock() + scheduler.queuesMu.Unlock() if !ok { return MsgJobStatus{Code: 1, Error: "Job not exist!"} } @@ -297,9 +278,9 @@ func (scheduler *SchedulerCapacity) QueryState(jobName string) MsgJobStatus { } func (scheduler *SchedulerCapacity) Stop(jobName string) MsgStop { - scheduler.queueMu.Lock() + scheduler.queuesMu.Lock() jm, ok := scheduler.jobs[jobName] - scheduler.queueMu.Unlock() + scheduler.queuesMu.Unlock() if !ok { return MsgStop{Code: 1, Error: "Job not exist!"} } @@ -307,9 +288,9 @@ func (scheduler *SchedulerCapacity) Stop(jobName string) MsgStop { } func (scheduler *SchedulerCapacity) QueryLogs(jobName string, taskName string) MsgLog { - scheduler.queueMu.Lock() + scheduler.queuesMu.Lock() jm, ok := scheduler.jobs[jobName] - scheduler.queueMu.Unlock() + scheduler.queuesMu.Unlock() if !ok { return MsgLog{Code: 1, Error: "Job not exist!"} } @@ -347,11 +328,11 @@ func (scheduler *SchedulerCapacity) Summary() MsgSummary { } scheduler.historyMu.Unlock() - scheduler.queueMu.Lock() + scheduler.queuesMu.Lock() for _, v := range scheduler.queues { tmp = append(tmp, v...) } - scheduler.queueMu.Unlock() + scheduler.queuesMu.Unlock() for _, job := range tmp { switch job.Status { @@ -383,7 +364,7 @@ func (scheduler *SchedulerCapacity) UpdateNextQueue() { NumberGPU := float64(InstanceOfResourcePool().TotalGPU) + 0.00001 - scheduler.queueMu.Lock() + scheduler.queuesMu.Lock() for k, t := range scheduler.queues { if len(t) == 0 { continue @@ -407,7 +388,7 @@ func (scheduler *SchedulerCapacity) UpdateNextQueue() { } } scheduler.nextQueue = next - scheduler.queueMu.Unlock() + scheduler.queuesMu.Unlock() log.Debug("updateNextQueue ->", next) } @@ -430,11 +411,5 @@ func (scheduler *SchedulerCapacity) UpdateParallelism(parallelism int) bool { } func (scheduler *SchedulerCapacity) updateGroup(group Group) bool { - num := 0 - for _, g := range InstanceOfGroupManager().List().Groups { - if g.Reserved { - num += g.NumGPU - } - } return true }