From d8c6e4c9be802aa9f57c5d053c5017f952df24e3 Mon Sep 17 00:00:00 2001 From: Newnius Date: Sun, 3 May 2020 00:14:08 +0800 Subject: [PATCH] update --- src/scheduler_fair.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go index 0342125..4bc72f6 100644 --- a/src/scheduler_fair.go +++ b/src/scheduler_fair.go @@ -177,6 +177,7 @@ func (scheduler *SchedulerFair) Start() { scheduler.schedulingMu.Lock() scheduler.schedulingJobsCnt++ + scheduler.queuesSchedulingCnt[jm.job.Group]++ scheduler.schedulingMu.Unlock() scheduler.allocatingGPUMu.Lock() @@ -208,7 +209,7 @@ func (scheduler *SchedulerFair) Start() { }() } -func (scheduler *SchedulerFair) UpdateProgress(jobName string, state State) { +func (scheduler *SchedulerFair) UpdateProgress(job Job, state State) { scheduler.historyMu.Lock() defer scheduler.historyMu.Unlock() @@ -216,10 +217,13 @@ func (scheduler *SchedulerFair) UpdateProgress(jobName string, state State) { case Running: scheduler.schedulingMu.Lock() scheduler.schedulingJobsCnt-- + if _, ok := scheduler.queuesSchedulingCnt[job.Group]; ok { + scheduler.queuesSchedulingCnt[job.Group]-- + } scheduler.schedulingMu.Unlock() for i := range scheduler.history { - if scheduler.history[i].Name == jobName { + if scheduler.history[i].Name == job.Name { scheduler.history[i].Status = Running scheduler.history[i].UpdatedAt = int(time.Now().Unix()) } @@ -227,7 +231,7 @@ func (scheduler *SchedulerFair) UpdateProgress(jobName string, state State) { break case Finished: for i := range scheduler.history { - if scheduler.history[i].Name == jobName { + if scheduler.history[i].Name == job.Name { scheduler.history[i].Status = Finished scheduler.history[i].UpdatedAt = int(time.Now().Unix()) } @@ -235,7 +239,7 @@ func (scheduler *SchedulerFair) UpdateProgress(jobName string, state State) { break case Stopped: for i := range scheduler.history { - if scheduler.history[i].Name == jobName { + if scheduler.history[i].Name == job.Name { scheduler.history[i].Status = Stopped scheduler.history[i].UpdatedAt = int(time.Now().Unix()) } @@ -443,6 +447,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node if node.Status[j].MemoryAllocated == 0 { scheduler.UsingGPUMu.Lock() scheduler.UsingGPU ++ + scheduler.queueUsingGPU[job.Group] += task.NumberGPU scheduler.UsingGPUMu.Unlock() } node.Status[j].MemoryAllocated += task.MemoryGPU @@ -462,6 +467,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node for i := range locks { pool.poolsMu[i].Unlock() } + go func(res NodeStatus) { if len(res.Status) == 0 { return @@ -502,6 +508,12 @@ func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) { if node.Status[j].MemoryAllocated == 0 { scheduler.UsingGPUMu.Lock() scheduler.UsingGPU-- + if _, ok := scheduler.queueUsingGPU[job.Group]; ok { + scheduler.queueUsingGPU[job.Group]-- + if scheduler.queueUsingGPU[job.Group] < 0 { + scheduler.queueUsingGPU[job.Group] = 0 + } + } scheduler.UsingGPUMu.Unlock() log.Info(node.Status[j].UUID, " is released") }