From 08097b80bdf9045f1488c846bb30525e1a11623b Mon Sep 17 00:00:00 2001
From: Newnius
Date: Sun, 3 May 2020 11:04:17 +0800
Subject: [PATCH] update

---
 src/scheduler_fair.go | 83 +++++++++++++++++++++++++++++++------------
 1 file changed, 60 insertions(+), 23 deletions(-)

diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go
index 6395201..4417c74 100644
--- a/src/scheduler_fair.go
+++ b/src/scheduler_fair.go
@@ -16,18 +16,22 @@ type ResourceCount struct {
 }
 
 type SchedulerFair struct {
-	history               []*Job
-	historyMu             sync.Mutex
-	queues                map[string][]Job
-	queueMu               sync.Mutex
-	schedulingMu          sync.Mutex
-	schedulingJobsCnt     int
-	jobs                  map[string]*JobManager
-	nextQueue             string
+	history   []*Job
+	historyMu sync.Mutex
+
+	nextQueue string
+	jobs      map[string]*JobManager
+	queues    map[string][]Job
+	queueMu   sync.Mutex
+
+	schedulingJobsCnt int
+	schedulingMu      sync.Mutex
+
 	resourceAllocations   map[string]*ResourceCount
 	resourceAllocationsMu sync.Mutex
-	enabled               bool
-	parallelism           int
+
+	enabled     bool
+	parallelism int
 
 	enableShare      bool
 	enableShareRatio float64
@@ -40,10 +44,10 @@ type SchedulerFair struct {
 	allocatingGPU   int
 	allocatingGPUMu sync.Mutex
 
-	queuesUsingGPUMu    sync.Mutex
-	queueUsingGPU       map[string]int
 	reservedGPU         int
 	queuesSchedulingCnt map[string]int
+	queueUsingGPU       map[string]int
+	queuesUsingGPUMu    sync.Mutex
 }
 
 type FairJobSorter []Job
@@ -81,8 +85,13 @@ func (scheduler *SchedulerFair) Start() {
 	scheduler.parallelism = 1
 
 	go func() {
+		flag := true
 		for {
 			log.Debug("Scheduling")
+			if !flag {
+				time.Sleep(time.Second * 100)
+			}
+			flag = false
 			if !scheduler.enabled {
 				time.Sleep(time.Millisecond * 100)
 				continue
@@ -93,9 +102,9 @@
 				time.Sleep(time.Millisecond * 100)
 				continue
 			}
-			scheduler.schedulingJobsCnt++
 			scheduler.schedulingMu.Unlock()
+
 			scheduler.queueMu.Lock()
 			queue := scheduler.nextQueue
 			if len(scheduler.queues[queue]) > 0 {
@@ -115,14 +124,20 @@
 				}
 				scheduler.queuesUsingGPUMu.Unlock()
 
-				if scheduler.schedulingJobsCnt > 0 {
-					if (cnt+scheduler.allocatingGPU+1)*13 > (pool.TotalGPU-scheduler.UsingGPU-reserved)*10 {
-						scheduler.schedulingMu.Lock()
-						scheduler.schedulingJobsCnt--
-						scheduler.schedulingMu.Unlock()
-						continue
-					}
-				}
+				if cnt == 1 {
+					cnt *= 10
+				} else {
+					cnt *= 13
+				}
+				if cnt+(scheduler.allocatingGPU)*13 > (pool.TotalGPU-scheduler.UsingGPU-reserved)*10 {
+					scheduler.schedulingMu.Lock()
+					scheduler.schedulingJobsCnt--
+					scheduler.schedulingMu.Unlock()
+					scheduler.queueMu.Unlock()
+					continue
+				}
+
+				flag = true
 				scheduler.allocatingGPUMu.Lock()
 				scheduler.allocatingGPU += cnt
 				scheduler.allocatingGPUMu.Unlock()
@@ -146,7 +161,6 @@
 				scheduler.schedulingMu.Lock()
 				scheduler.schedulingJobsCnt--
 				scheduler.schedulingMu.Unlock()
-				time.Sleep(time.Millisecond * 100)
 				go func() {
 					scheduler.UpdateNextQueue()
 				}()
@@ -162,11 +176,16 @@
 			scheduler.queueMu.Lock()
 			for q, t := range scheduler.queues {
 				if len(t) == 0 || !InstanceOfGroupManager().groups[t[0].Group].Reserved {
+					scheduler.queueMu.Unlock()
 					continue
 				}
+				scheduler.queuesUsingGPUMu.Lock()
 				if cnt, ok := scheduler.queuesSchedulingCnt[t[0].Group]; ok && cnt > 0 {
+					scheduler.queueMu.Unlock()
+					scheduler.queuesUsingGPUMu.Unlock()
 					continue
 				}
+				scheduler.queuesUsingGPUMu.Unlock()
 				numberGPU := 0
 				for _, v := range t[0].Tasks {
 					numberGPU += v.NumberGPU
@@ -180,6 +199,7 @@
 				scheduler.queuesUsingGPUMu.Unlock()
 
 				if pool.TotalGPU-scheduler.UsingGPU-scheduler.allocatingGPU*13/10 < 0 {
+					scheduler.queueMu.Unlock()
 					continue
 				}
 
@@ -189,9 +209,12 @@
 
 				scheduler.schedulingMu.Lock()
 				scheduler.schedulingJobsCnt++
-				scheduler.queuesSchedulingCnt[jm.job.Group]++
 				scheduler.schedulingMu.Unlock()
 
+				scheduler.queuesUsingGPUMu.Lock()
+				scheduler.queuesSchedulingCnt[jm.job.Group]++
+				scheduler.queuesUsingGPUMu.Unlock()
+
 				scheduler.allocatingGPUMu.Lock()
 				scheduler.allocatingGPU += numberGPU
 				scheduler.allocatingGPUMu.Unlock()
@@ -229,10 +252,13 @@ func (scheduler *SchedulerFair) UpdateProgress(job Job, state State) {
 	case Running:
 		scheduler.schedulingMu.Lock()
 		scheduler.schedulingJobsCnt--
+		scheduler.schedulingMu.Unlock()
+
+		scheduler.queuesUsingGPUMu.Lock()
 		if _, ok := scheduler.queuesSchedulingCnt[job.Group]; ok {
 			scheduler.queuesSchedulingCnt[job.Group]--
 		}
-		scheduler.schedulingMu.Unlock()
+		scheduler.queuesUsingGPUMu.Unlock()
 
 		for i := range scheduler.history {
 			if scheduler.history[i].Name == job.Name {
@@ -558,7 +584,9 @@ func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
 }
 
 func (scheduler *SchedulerFair) QueryState(jobName string) MsgJobStatus {
+	scheduler.queueMu.Lock()
 	jm, ok := scheduler.jobs[jobName]
+	scheduler.queueMu.Unlock()
 	if !ok {
 		return MsgJobStatus{Code: 1, Error: "Job not exist!"}
 	}
@@ -566,7 +594,9 @@
 }
 
 func (scheduler *SchedulerFair) Stop(jobName string) MsgStop {
+	scheduler.queueMu.Lock()
 	jm, ok := scheduler.jobs[jobName]
+	scheduler.queueMu.Unlock()
 	if !ok {
 		return MsgStop{Code: 1, Error: "Job not exist!"}
 	}
@@ -574,7 +604,9 @@
 }
 
 func (scheduler *SchedulerFair) QueryLogs(jobName string, taskName string) MsgLog {
+	scheduler.queueMu.Lock()
 	jm, ok := scheduler.jobs[jobName]
+	scheduler.queueMu.Unlock()
 	if !ok {
 		return MsgLog{Code: 1, Error: "Job not exist!"}
 	}
@@ -611,9 +643,12 @@ func (scheduler *SchedulerFair) Summary() MsgSummary {
 		tmp = append(tmp, *job)
 	}
 	scheduler.historyMu.Unlock()
+
+	scheduler.queueMu.Lock()
 	for _, v := range scheduler.queues {
 		tmp = append(tmp, v...)
 	}
+	scheduler.queueMu.Unlock()
 
 	for _, job := range tmp {
 		switch job.Status {
@@ -686,6 +721,7 @@ func (scheduler *SchedulerFair) UpdateNextQueue() {
 		pool.poolsMu[i].Unlock()
 	}
+	scheduler.queueMu.Lock()
 	for k, t := range scheduler.queues {
 		if len(t) == 0 {
 			continue
 		}
@@ -714,6 +750,7 @@
 		}
 	}
 	scheduler.nextQueue = next
+	scheduler.queueMu.Unlock()
 	log.Debug("updateNextQueue ->", next)
 }
 