commit 9fed1ed817
parent 3cda4e2480
Date: 2020-05-25 13:44:54 +08:00
Mirror of https://github.com/newnius/YAO-scheduler.git (synced 2025-12-15 16:16:44 +00:00)

@@ -17,7 +17,7 @@ type SchedulerFair struct {
 	queues              map[string][]Job
 	queueMu             sync.Mutex
-	schedulingJobsCnt   int
+	schedulingJobs      map[string]bool
 	schedulingMu        sync.Mutex
 	resourceAllocations map[string]*ResourceCount
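
This field swap is the heart of the commit: instead of counting in-flight jobs with a bare int, the scheduler records each one by name in a map[string]bool guarded by schedulingMu. A counter cannot say which job finished, so a missed or doubled decrement silently corrupts the parallelism accounting, whereas deleting a key is idempotent. Below is a minimal, hypothetical sketch of the pattern; the jobSet type and its methods are illustration only, not code from this repository.

package main

import (
	"fmt"
	"sync"
)

// jobSet illustrates the pattern this commit adopts: tracking in-flight
// jobs by name instead of by count.
type jobSet struct {
	mu   sync.Mutex
	jobs map[string]bool
}

func newJobSet() *jobSet {
	return &jobSet{jobs: map[string]bool{}}
}

// add registers a job as "being scheduled".
func (s *jobSet) add(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.jobs[name] = true
}

// remove is safe to call any number of times: delete on a missing key is a
// no-op, unlike cnt--, which drifts negative on a duplicate report.
func (s *jobSet) remove(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.jobs, name)
}

// count replaces reads of the old schedulingJobsCnt field.
func (s *jobSet) count() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.jobs)
}

func main() {
	s := newJobSet()
	s.add("job-a")
	s.remove("job-a")
	s.remove("job-a")
	fmt.Println(s.count()) // 0; a raw counter would now read -1
}
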
@@ -59,7 +59,7 @@ func (scheduler *SchedulerFair) Start() {
 	scheduler.queues["default"] = []Job{}
 	scheduler.resourceAllocations = map[string]*ResourceCount{}
 	scheduler.enabled = true
-	scheduler.schedulingJobsCnt = 0
+	scheduler.schedulingJobs = map[string]bool{}
 	scheduler.queueUsingGPU = map[string]int{}
 	scheduler.allocatingGPU = 0
@@ -81,12 +81,11 @@ func (scheduler *SchedulerFair) Start() {
 			continue
 		}
 		scheduler.schedulingMu.Lock()
-		if scheduler.schedulingJobsCnt >= scheduler.parallelism {
+		if len(scheduler.schedulingJobs) >= scheduler.parallelism {
 			scheduler.schedulingMu.Unlock()
 			time.Sleep(time.Millisecond * 100)
 			continue
 		}
-		scheduler.schedulingJobsCnt++
 		scheduler.schedulingMu.Unlock()
 		scheduler.queueMu.Lock()
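
With the counter gone, the polling loop above no longer pre-increments anything: it only compares len(scheduler.schedulingJobs) against scheduler.parallelism and sleeps 100ms when every slot is busy; registration now happens later, at the dispatch site in a later hunk. Because the capacity check and that registration take the lock separately, the gate stays race-free only while a single goroutine runs this loop, which appears to be the case here. A hypothetical helper in the spirit of this gate, built on the jobSet sketch above plus the "time" import:

// waitForSlot blocks until the number of in-flight jobs drops below the
// parallelism limit, polling every 100ms like the loop above.
func waitForSlot(s *jobSet, parallelism int) {
	for s.count() >= parallelism {
		time.Sleep(time.Millisecond * 100)
	}
}
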
@@ -113,10 +112,7 @@ func (scheduler *SchedulerFair) Start() {
 		pool := InstanceOfResourcePool()
 		log.Info(cnt, reserved, pool.TotalGPU, pool.UsingGPU, scheduler.allocatingGPU)
-		if scheduler.schedulingJobsCnt > 1 && (cnt*10+(scheduler.allocatingGPU)*13 > (pool.TotalGPU-pool.UsingGPU-reserved)*10) {
-			scheduler.schedulingMu.Lock()
-			scheduler.schedulingJobsCnt--
-			scheduler.schedulingMu.Unlock()
+		if len(scheduler.schedulingJobs) > 1 && (cnt*10+(scheduler.allocatingGPU)*13 > (pool.TotalGPU-pool.UsingGPU-reserved)*10) {
 			scheduler.queueMu.Unlock()
 			continue
 		}
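
The guard above encodes a fractional safety margin in integer arithmetic: scaling both sides by 10, cnt*10 + allocatingGPU*13 > (TotalGPU - UsingGPU - reserved)*10 reads as cnt + 1.3*allocatingGPU > free GPUs, so GPUs already mid-allocation are weighted at 1.3x before another job is admitted. The len(...) > 1 term skips the check when this is the only job being scheduled, so a lone job can always make progress. A standalone restatement (hypothetical helper, not in the repository):

// wouldOverCommit mirrors the condition above: the candidate job's demand
// (cnt) plus in-flight allocations weighted 1.3x, compared against free
// GPUs, all scaled by 10 to stay in integer arithmetic.
func wouldOverCommit(cnt, allocating, total, using, reserved int) bool {
	return cnt*10+allocating*13 > (total-using-reserved)*10
}

For instance, with 8 GPUs total, 2 in use, none reserved and 2 mid-allocation, a 4-GPU job yields 4*10 + 2*13 = 66 > 60, so it stays queued.
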
@@ -126,7 +122,7 @@ func (scheduler *SchedulerFair) Start() {
 		scheduler.allocatingGPU += cnt
 		scheduler.allocatingGPUMu.Unlock()
 		log.Info("allocatingGPU is ", scheduler.allocatingGPU)
-		log.Info("schedulingJobsCnt is ", scheduler.schedulingJobsCnt)
+		log.Info("schedulingJobs are ", scheduler.schedulingJobs)
 		scheduler.queues[queue] = scheduler.queues[queue][1:]
 		jm.scheduler = scheduler
@@ -141,13 +137,15 @@ func (scheduler *SchedulerFair) Start() {
 			scheduler.queuesSchedulingCnt[jm.job.Group]++
 			scheduler.queuesUsingGPUMu.Unlock()
+			scheduler.schedulingMu.Lock()
+			scheduler.schedulingJobs[jm.job.Name] = true
+			scheduler.schedulingMu.Unlock()
 			go func() {
 				jm.start()
 			}()
 		} else {
 			log.Debug("No more jobs to scheduling ", time.Now())
 			scheduler.schedulingMu.Lock()
-			scheduler.schedulingJobsCnt--
 			scheduler.schedulingMu.Unlock()
 		}
 		scheduler.queueMu.Unlock()
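
This hunk is the registration half of the new scheme: the job is added to schedulingJobs under schedulingMu immediately before jm.start() is launched in its own goroutine, so the entry exists before any progress report could try to delete it. Keying by jm.job.Name assumes names are unique among concurrently scheduled jobs; two live jobs sharing a name would collapse into one entry and undercount against parallelism. Condensed into the jobSet sketch (hypothetical):

// dispatch registers the job, then starts it asynchronously, mirroring the
// ordering above: the set must be updated before the goroutine can report
// progress and remove the entry.
func dispatch(s *jobSet, name string, start func()) {
	s.add(name)
	go start()
}
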
@@ -192,10 +190,6 @@ func (scheduler *SchedulerFair) Start() {
 			jm := JobManager{}
 			jm.job = scheduler.queues[q][0]
-			scheduler.schedulingMu.Lock()
-			scheduler.schedulingJobsCnt++
-			scheduler.schedulingMu.Unlock()
 			scheduler.queuesUsingGPUMu.Lock()
 			scheduler.queuesSchedulingCnt[jm.job.Group]++
 			scheduler.queuesUsingGPUMu.Unlock()
@@ -204,7 +198,7 @@ func (scheduler *SchedulerFair) Start() {
 			scheduler.allocatingGPU += numberGPU
 			scheduler.allocatingGPUMu.Unlock()
 			log.Info("allocatingGPU is ", scheduler.allocatingGPU)
-			log.Info("schedulingJobsCnt is ", scheduler.schedulingJobsCnt)
+			log.Info("schedulingJobs are ", scheduler.schedulingJobs)
 			scheduler.queues[q] = scheduler.queues[q][1:]
 			jm.scheduler = scheduler
@@ -233,12 +227,12 @@ func (scheduler *SchedulerFair) UpdateProgress(job Job, state State) {
 	scheduler.historyMu.Lock()
 	defer scheduler.historyMu.Unlock()
-	switch state {
-	case Running:
 	scheduler.schedulingMu.Lock()
-	scheduler.schedulingJobsCnt--
+	delete(scheduler.schedulingJobs, job.Name)
 	scheduler.schedulingMu.Unlock()
+	switch state {
+	case Running:
 		scheduler.queuesUsingGPUMu.Lock()
 		if _, ok := scheduler.queuesSchedulingCnt[job.Group]; ok {
 			scheduler.queuesSchedulingCnt[job.Group]--
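
The cleanup half moves out of the switch: UpdateProgress now removes the job from schedulingJobs on every reported transition rather than only on Running, so a job that fails or is stopped before ever running still releases its scheduling slot, and since delete on an absent key is a no-op, duplicate reports are harmless. Extending the jobSet sketch (hypothetical, with states abbreviated as strings):

// updateProgress drops the job from the in-flight set unconditionally, then
// does state-specific bookkeeping, matching the reordering in this hunk.
func (s *jobSet) updateProgress(name string, state string) {
	s.remove(name)
	switch state {
	case "Running":
		// per-queue counters (queuesSchedulingCnt etc.) are updated here
	}
}
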