diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go
index 5ee36af..508d6f0 100644
--- a/src/scheduler_fair.go
+++ b/src/scheduler_fair.go
@@ -16,17 +16,18 @@ type ResourceCount struct {
 }
 
 type SchedulerFair struct {
-	history             []*Job
-	queues              map[string][]Job
-	queueMu             sync.Mutex
-	schedulingMu        sync.Mutex
-	schedulingJobsCnt   int
-	jobs                map[string]*JobManager
-	nextQueue           string
-	resourceAllocations map[string]*ResourceCount
-	enabled             bool
-	latestPoolIndex     int
-	parallelism         int
+	history               []*Job
+	historyMu             sync.Mutex
+	queues                map[string][]Job
+	queueMu               sync.Mutex
+	schedulingMu          sync.Mutex
+	schedulingJobsCnt     int
+	jobs                  map[string]*JobManager
+	nextQueue             string
+	resourceAllocations   map[string]*ResourceCount
+	resourceAllocationsMu sync.Mutex
+	enabled               bool
+	parallelism           int
 }
 
 type FairJobSorter []Job
@@ -79,7 +80,9 @@ func (scheduler *SchedulerFair) Start() {
 				scheduler.jobs[jm.job.Name] = &jm
 				jm.job.Status = Starting
+				scheduler.historyMu.Lock()
 				scheduler.history = append(scheduler.history, &jm.job)
+				scheduler.historyMu.Unlock()
 
 				go func() {
 					jm.start()
 				}()
@@ -100,6 +103,9 @@
 }
 
 func (scheduler *SchedulerFair) UpdateProgress(jobName string, state State) {
+	scheduler.historyMu.Lock()
+	defer scheduler.historyMu.Unlock()
+
 	switch state {
 	case Running:
 		scheduler.schedulingMu.Lock()
@@ -215,6 +221,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 		if len(res.Status) == 0 {
 			return
 		}
+		scheduler.resourceAllocationsMu.Lock()
 		if _, ok := scheduler.resourceAllocations[job.Group]; !ok {
 			scheduler.resourceAllocations[job.Group] = &ResourceCount{}
 		}
@@ -225,6 +232,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 			cnt.NumberGPU ++
 			cnt.MemoryGPU += v.MemoryTotal
 		}
+		scheduler.resourceAllocationsMu.Unlock()
 		scheduler.UpdateNextQueue()
 	}(res)
 
@@ -250,6 +258,7 @@ func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
 		}
 	}
 	go func(res NodeStatus) {
+		scheduler.resourceAllocationsMu.Lock()
 		if _, ok := scheduler.resourceAllocations[job.Group]; !ok {
 			scheduler.resourceAllocations[job.Group] = &ResourceCount{}
 		}
@@ -260,6 +269,7 @@ func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
 			cnt.NumberGPU --
 			cnt.MemoryGPU -= v.MemoryTotal
 		}
+		scheduler.resourceAllocationsMu.Unlock()
 		scheduler.UpdateNextQueue()
 	}(agent)
 }
@@ -290,9 +300,11 @@ func (scheduler *SchedulerFair) QueryLogs(jobName string, taskName string) MsgLog {
 func (scheduler *SchedulerFair) ListJobs() MsgJobList {
 	var jobs []Job
+	scheduler.historyMu.Lock()
 	for _, job := range scheduler.history {
 		jobs = append(jobs, *job)
 	}
+	scheduler.historyMu.Unlock()
 	var tmp []Job
 	for _, v := range scheduler.queues {
 		tmp = append(tmp, v...)
 	}
@@ -311,9 +323,11 @@
 
 	pendingJobsCounter := 0
 	var tmp []Job
+	scheduler.historyMu.Lock()
 	for _, job := range scheduler.history {
 		tmp = append(tmp, *job)
 	}
+	scheduler.historyMu.Unlock()
 	for _, v := range scheduler.queues {
 		tmp = append(tmp, v...)
 	}
@@ -393,6 +407,7 @@ func (scheduler *SchedulerFair) UpdateNextQueue() {
 		if len(t) == 0 {
 			continue
 		}
+		scheduler.resourceAllocationsMu.Lock()
 		if _, ok := scheduler.resourceAllocations[k]; !ok {
 			scheduler.resourceAllocations[k] = &ResourceCount{}
 		}
@@ -403,6 +418,7 @@
 		tmp += float64(v.Memory) / Memory
 		tmp += float64(v.NumberGPU) / NumberGPU
 		tmp += float64(v.MemoryGPU) / MemoryGPU
+		scheduler.resourceAllocationsMu.Unlock()
 		tmp /= 4
 		weight := 10
 		if g, ok2 := InstanceOfGroupManager().groups[k]; !ok2 {
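
The patch applies one pattern throughout: each piece of shared scheduler state gets its own dedicated mutex (historyMu for the history slice, resourceAllocationsMu for the resourceAllocations map), and every read or write of that state, including the ones performed inside spawned goroutines, happens between Lock and Unlock. The following is a minimal, self-contained sketch of that pattern, not the scheduler's real API; demoScheduler, addHistory, and acquire are invented stand-ins for this illustration. With the locks in place `go run -race` stays quiet; deleting either Lock/Unlock pair makes the race detector fire.

    // demo.go - illustrative stand-ins only; not the project's actual types.
    package main

    import (
    	"fmt"
    	"sync"
    )

    // ResourceCount mirrors the per-group counter the patch guards with
    // resourceAllocationsMu.
    type ResourceCount struct {
    	NumberGPU int
    }

    type demoScheduler struct {
    	history   []string // guarded by historyMu
    	historyMu sync.Mutex

    	resourceAllocations   map[string]*ResourceCount // guarded by resourceAllocationsMu
    	resourceAllocationsMu sync.Mutex
    }

    // addHistory appends under historyMu, as Start() does after the patch.
    func (s *demoScheduler) addHistory(job string) {
    	s.historyMu.Lock()
    	defer s.historyMu.Unlock()
    	s.history = append(s.history, job)
    }

    // acquire bumps a group's counter under resourceAllocationsMu. The lazy
    // creation of the map entry must sit inside the critical section too:
    // otherwise two goroutines can both miss the key and race on the insert,
    // which is why the patch takes the lock before the `if _, ok := ...` check.
    func (s *demoScheduler) acquire(group string) {
    	s.resourceAllocationsMu.Lock()
    	defer s.resourceAllocationsMu.Unlock()
    	cnt, ok := s.resourceAllocations[group]
    	if !ok {
    		cnt = &ResourceCount{}
    		s.resourceAllocations[group] = cnt
    	}
    	cnt.NumberGPU++
    }

    func main() {
    	s := &demoScheduler{resourceAllocations: map[string]*ResourceCount{}}

    	var wg sync.WaitGroup
    	for i := 0; i < 100; i++ {
    		wg.Add(1)
    		// Concurrent writers, like the goroutines AcquireResource spawns.
    		go func(i int) {
    			defer wg.Done()
    			s.addHistory(fmt.Sprintf("job-%d", i))
    			s.acquire("groupA")
    		}(i)
    	}
    	wg.Wait()

    	// Both counts are exactly 100; without the mutexes the slice append
    	// and map insert would race and the results would be unpredictable.
    	fmt.Println(len(s.history), s.resourceAllocations["groupA"].NumberGPU)
    }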
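One detail of the diff worth noting: in AcquireResource and ReleaseResource the goroutines release resourceAllocationsMu before calling UpdateNextQueue(), which acquires the same mutex internally. That ordering matters because Go's sync.Mutex is not reentrant: a goroutine that already holds the lock and calls Lock() again deadlocks rather than proceeding. A tiny illustration of the non-reentrancy (assumes Go 1.18+ for TryLock):

    package main

    import (
    	"fmt"
    	"sync"
    )

    func main() {
    	var mu sync.Mutex
    	mu.Lock()
    	// TryLock reports false: the mutex is already held, and a plain
    	// second Lock() here would deadlock instead of succeeding.
    	fmt.Println(mu.TryLock())
    	mu.Unlock()
    }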