diff --git a/src/main.go b/src/main.go
index 2035e39..6132ca7 100644
--- a/src/main.go
+++ b/src/main.go
@@ -122,6 +122,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
 			msg.Error = err.Error()
 		} else {
 			msg = InstanceOfGroupManager().Add(group)
+			scheduler.updateGroup(group)
 		}
 		js, _ := json.Marshal(msg)
 		w.Header().Set("Content-Type", "application/json")
@@ -138,6 +139,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
 			msg.Error = err.Error()
 		} else {
 			msg = InstanceOfGroupManager().Update(group)
+			scheduler.updateGroup(group)
 		}
 		js, _ := json.Marshal(msg)
 		w.Header().Set("Content-Type", "application/json")
@@ -154,6 +156,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
 			msg.Error = err.Error()
 		} else {
 			msg = InstanceOfGroupManager().Remove(group)
+			scheduler.updateGroup(group)
 		}
 		js, _ := json.Marshal(msg)
 		w.Header().Set("Content-Type", "application/json")
diff --git a/src/scheduler.go b/src/scheduler.go
index 085a4d2..c5a0269 100644
--- a/src/scheduler.go
+++ b/src/scheduler.go
@@ -38,4 +38,6 @@ type Scheduler interface {
 	SetShareRatio(ratio float64) bool
 	SetPreScheduleRatio(ratio float64) bool
+
+	updateGroup(group Group) bool
 }
diff --git a/src/scheduler_FCFS.go b/src/scheduler_FCFS.go
index 7e5ccc3..ed7fa91 100644
--- a/src/scheduler_FCFS.go
+++ b/src/scheduler_FCFS.go
@@ -273,3 +273,7 @@ func (scheduler *SchedulerFCFS) SetPreScheduleRatio(ratio float64) bool {
 	log.Info("enablePreScheduleRatio is updated to", ratio)
 	return true
 }
+
+func (scheduler *SchedulerFCFS) updateGroup(group Group) bool {
+	return true
+}
diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go
index 7f3c767..6395201 100644
--- a/src/scheduler_fair.go
+++ b/src/scheduler_fair.go
@@ -40,6 +40,7 @@ type SchedulerFair struct {
 	allocatingGPU       int
 	allocatingGPUMu     sync.Mutex
+	queuesUsingGPUMu    sync.Mutex
 	queueUsingGPU       map[string]int
 	reservedGPU         int
 	queuesSchedulingCnt map[string]int
@@ -105,8 +106,17 @@ func (scheduler *SchedulerFair) Start() {
 				for _, task := range jm.job.Tasks {
 					cnt += task.NumberGPU
 				}
-				if scheduler.schedulingJobsCnt > 1 {
-					if (cnt+scheduler.allocatingGPU+1)*13 > (pool.TotalGPU-scheduler.UsingGPU-scheduler.reservedGPU)*10 {
+				reserved := scheduler.reservedGPU
+				scheduler.queuesUsingGPUMu.Lock()
+				for g, v := range scheduler.queueUsingGPU {
+					if InstanceOfGroupManager().groups[g].Reserved {
+						reserved -= v
+					}
+				}
+				scheduler.queuesUsingGPUMu.Unlock()
+
+				if scheduler.schedulingJobsCnt > 0 {
+					if (cnt+scheduler.allocatingGPU+1)*13 > (pool.TotalGPU-scheduler.UsingGPU-reserved)*10 {
 						scheduler.schedulingMu.Lock()
 						scheduler.schedulingJobsCnt--
 						scheduler.schedulingMu.Unlock()
@@ -163,9 +173,11 @@ func (scheduler *SchedulerFair) Start() {
 				}
 
 				available := InstanceOfGroupManager().groups[t[0].Group].NumGPU
+				scheduler.queuesUsingGPUMu.Lock()
 				if cnt, ok := scheduler.queueUsingGPU[t[0].Group]; ok {
 					available -= cnt
 				}
+				scheduler.queuesUsingGPUMu.Unlock()
 
 				if pool.TotalGPU-scheduler.UsingGPU-scheduler.allocatingGPU*13/10 < 0 {
 					continue
@@ -447,8 +459,6 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node
 			if node.Status[j].MemoryAllocated == 0 {
 				scheduler.UsingGPUMu.Lock()
 				scheduler.UsingGPU ++
-				scheduler.queueUsingGPU[job.Group] += task.NumberGPU
-				scheduler.reservedGPU += task.NumberGPU
 				scheduler.UsingGPUMu.Unlock()
 			}
 			node.Status[j].MemoryAllocated += task.MemoryGPU
@@ -459,6 +469,11 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node
 	for _, t := range res.Status {
 		scheduler.Attach(t.UUID, job.Name)
 	}
+
+	scheduler.queuesUsingGPUMu.Lock()
+	scheduler.queueUsingGPU[job.Group] += task.NumberGPU
+	scheduler.queuesUsingGPUMu.Unlock()
+
 	scheduler.allocatingGPUMu.Lock()
 	scheduler.allocatingGPU -= task.NumberGPU
 	scheduler.allocatingGPUMu.Unlock()
@@ -509,16 +524,6 @@ func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
 				if node.Status[j].MemoryAllocated == 0 {
 					scheduler.UsingGPUMu.Lock()
 					scheduler.UsingGPU--
-					if _, ok := scheduler.queueUsingGPU[job.Group]; ok {
-						scheduler.queueUsingGPU[job.Group]--
-						scheduler.reservedGPU--
-						if scheduler.queueUsingGPU[job.Group] < 0 {
-							scheduler.queueUsingGPU[job.Group] = 0
-						}
-						if scheduler.reservedGPU < 0 {
-							scheduler.reservedGPU = 0
-						}
-					}
 					scheduler.UsingGPUMu.Unlock()
 					log.Info(node.Status[j].UUID, " is released")
 				}
@@ -526,6 +531,15 @@ func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
 			}
 		}
 	}
+	scheduler.queuesUsingGPUMu.Lock()
+	if _, ok := scheduler.queueUsingGPU[job.Group]; ok {
+		scheduler.queueUsingGPU[job.Group] -= len(agent.Status)
+		if scheduler.queueUsingGPU[job.Group] < 0 {
+			log.Warn("queueUsingGPU exceeded ", scheduler.queueUsingGPU[job.Group])
+			scheduler.queueUsingGPU[job.Group] = 0
+		}
+	}
+	scheduler.queuesUsingGPUMu.Unlock()
 	go func(res NodeStatus) {
 		scheduler.resourceAllocationsMu.Lock()
 		if _, ok := scheduler.resourceAllocations[job.Group]; !ok {
@@ -740,3 +754,16 @@ func (scheduler *SchedulerFair) SetPreScheduleRatio(ratio float64) bool {
 	log.Info("enablePreScheduleRatio is updated to ", ratio)
 	return true
 }
+
+func (scheduler *SchedulerFair) updateGroup(group Group) bool {
+	num := 0
+	for _, g := range InstanceOfGroupManager().List().Groups {
+		if g.Reserved {
+			num += g.NumGPU
+		}
+	}
+	scheduler.queuesUsingGPUMu.Lock()
+	scheduler.reservedGPU = num
+	scheduler.queuesUsingGPUMu.Unlock()
+	return true
+}
diff --git a/src/scheduler_priority.go b/src/scheduler_priority.go
index faef69f..2858b4a 100644
--- a/src/scheduler_priority.go
+++ b/src/scheduler_priority.go
@@ -297,3 +297,7 @@ func (scheduler *SchedulerPriority) SetPreScheduleRatio(ratio float64) bool {
 	log.Info("enablePreScheduleRatio is updated to", ratio)
 	return true
 }
+
+func (scheduler *SchedulerPriority) updateGroup(group Group) bool {
+	return true
+}
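
Note on the accounting pattern above (not part of the patch): the SchedulerFair hunks establish three invariants: the per-group counters (queueUsingGPU) get their own mutex (queuesUsingGPUMu) instead of piggybacking on UsingGPUMu; usage is adjusted once per allocation or release rather than once per physical GPU, with underflow clamped to zero and logged; and reservedGPU is recomputed from the group list whenever a group is added, updated, or removed (updateGroup). The following self-contained Go sketch illustrates that discipline under hypothetical names (groupAccounting, usingByGroup, acquire, release, updateGroups); it is an illustration of the pattern, not code from this repository.

	package main

	import (
		"fmt"
		"sync"
	)

	type Group struct {
		Name     string
		NumGPU   int
		Reserved bool
	}

	type groupAccounting struct {
		mu           sync.Mutex     // plays the role of queuesUsingGPUMu
		usingByGroup map[string]int // plays the role of queueUsingGPU
		reservedGPU  int            // plays the role of reservedGPU
	}

	// acquire records n GPUs against a group once per allocation, mirroring
	// the patch's move of the increment out of the per-GPU loop in
	// AcquireResource.
	func (a *groupAccounting) acquire(group string, n int) {
		a.mu.Lock()
		defer a.mu.Unlock()
		a.usingByGroup[group] += n
	}

	// release subtracts the released GPU count and clamps at zero, warning on
	// underflow the way the patched ReleaseResource does.
	func (a *groupAccounting) release(group string, n int) {
		a.mu.Lock()
		defer a.mu.Unlock()
		if _, ok := a.usingByGroup[group]; ok {
			a.usingByGroup[group] -= n
			if a.usingByGroup[group] < 0 {
				fmt.Println("warning: per-group usage went negative, clamping to 0")
				a.usingByGroup[group] = 0
			}
		}
	}

	// updateGroups recomputes the reserved total from the full group list, as
	// SchedulerFair.updateGroup now does after every Add/Update/Remove.
	func (a *groupAccounting) updateGroups(groups []Group) {
		num := 0
		for _, g := range groups {
			if g.Reserved {
				num += g.NumGPU
			}
		}
		a.mu.Lock()
		a.reservedGPU = num
		a.mu.Unlock()
	}

	func main() {
		a := &groupAccounting{usingByGroup: map[string]int{}}
		a.updateGroups([]Group{{Name: "team-a", NumGPU: 4, Reserved: true}})
		a.acquire("team-a", 2)
		a.release("team-a", 3) // over-release: clamps to 0 and warns
		fmt.Println(a.reservedGPU, a.usingByGroup["team-a"]) // prints: 4 0
	}

Recomputing reservedGPU from scratch on every group change trades a little extra work for immunity to incremental drift, which the removed per-GPU increments and decrements in AcquireResource and ReleaseResource were prone to accumulate.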