Mirror of https://github.com/newnius/YAO-scheduler.git (synced 2025-12-13 07:46:43 +00:00)
Commit: 08097b80bd
Parent: 56e4b5474d
Date: 2020-05-03 11:04:17 +08:00


@@ -16,18 +16,22 @@ type ResourceCount struct {
 }
 type SchedulerFair struct {
     history []*Job
     historyMu sync.Mutex
-    queues map[string][]Job
-    queueMu sync.Mutex
-    schedulingMu sync.Mutex
-    schedulingJobsCnt int
-    jobs map[string]*JobManager
-    nextQueue string
+    nextQueue string
+    jobs map[string]*JobManager
+    queues map[string][]Job
+    queueMu sync.Mutex
+    schedulingJobsCnt int
+    schedulingMu sync.Mutex
     resourceAllocations map[string]*ResourceCount
     resourceAllocationsMu sync.Mutex
     enabled bool
     parallelism int
     enableShare bool
     enableShareRatio float64
@@ -40,10 +44,10 @@ type SchedulerFair struct {
     allocatingGPU int
     allocatingGPUMu sync.Mutex
-    queuesUsingGPUMu sync.Mutex
-    queueUsingGPU map[string]int
     reservedGPU int
     queuesSchedulingCnt map[string]int
+    queueUsingGPU map[string]int
+    queuesUsingGPUMu sync.Mutex
 }
 type FairJobSorter []Job
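
Taken together, the two struct hunks above mainly regroup SchedulerFair's fields so that each mutex is declared next to the map or counter it appears to guard (queueMu beside queues, queuesUsingGPUMu beside queueUsingGPU). A minimal sketch of that grouping convention, using toy names rather than the repository's own types:

package main

import (
    "fmt"
    "sync"
)

// Each mutex is declared right next to the data it guards, which is the
// grouping the reordered fields above move toward.
type gpuBook struct {
    usingGPU   map[string]int
    usingGPUMu sync.Mutex // guards usingGPU only
}

func (b *gpuBook) add(queue string, n int) {
    b.usingGPUMu.Lock()
    defer b.usingGPUMu.Unlock()
    b.usingGPU[queue] += n
}

func main() {
    b := &gpuBook{usingGPU: map[string]int{}}
    b.add("default", 2)
    fmt.Println(b.usingGPU["default"]) // 2
}

Keeping the comment on the mutex line documents exactly which fields it covers.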
@@ -81,8 +85,13 @@ func (scheduler *SchedulerFair) Start() {
 scheduler.parallelism = 1
 go func() {
+    flag := true
     for {
         log.Debug("Scheduling")
+        if !flag {
+            time.Sleep(time.Second * 100)
+        }
+        flag = false
         if !scheduler.enabled {
             time.Sleep(time.Millisecond * 100)
             continue
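
The hunk above introduces a progress flag so the scheduling loop only sleeps for a long interval when the previous pass scheduled nothing. A standalone sketch of that back-off shape, with a hypothetical trySchedule() standing in for the real loop body and illustrative intervals:

package main

import (
    "fmt"
    "time"
)

// trySchedule is a stand-in for one scheduling pass; it reports
// whether it managed to launch anything.
func trySchedule() bool {
    // ... pick a queue, check GPU headroom, start the job ...
    return false
}

func main() {
    progressed := true
    for i := 0; i < 3; i++ { // bounded here so the sketch terminates
        if !progressed {
            // Nothing happened last pass: back off before retrying.
            time.Sleep(100 * time.Millisecond)
        }
        progressed = trySchedule()
        fmt.Println("pass", i, "progressed:", progressed)
    }
}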
@@ -93,9 +102,9 @@ func (scheduler *SchedulerFair) Start() {
         time.Sleep(time.Millisecond * 100)
         continue
     }
     scheduler.schedulingJobsCnt++
     scheduler.schedulingMu.Unlock()
     scheduler.queueMu.Lock()
     queue := scheduler.nextQueue
     if len(scheduler.queues[queue]) > 0 {
@@ -115,14 +124,20 @@ func (scheduler *SchedulerFair) Start() {
     }
     scheduler.queuesUsingGPUMu.Unlock()
-    if scheduler.schedulingJobsCnt > 0 {
-        if (cnt+scheduler.allocatingGPU+1)*13 > (pool.TotalGPU-scheduler.UsingGPU-reserved)*10 {
-            scheduler.schedulingMu.Lock()
-            scheduler.schedulingJobsCnt--
-            scheduler.schedulingMu.Unlock()
-            continue
-        }
-    }
+    if cnt == 1 {
+        cnt *= 10
+    } else {
+        cnt *= 13
+    }
+    if cnt+(scheduler.allocatingGPU)*13 > (pool.TotalGPU-scheduler.UsingGPU-reserved)*10 {
+        scheduler.schedulingMu.Lock()
+        scheduler.schedulingJobsCnt--
+        scheduler.schedulingMu.Unlock()
+        scheduler.queueMu.Unlock()
+        continue
+    }
+    flag = true
     scheduler.allocatingGPUMu.Lock()
     scheduler.allocatingGPU += cnt
     scheduler.allocatingGPUMu.Unlock()
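
The replacement check stays in integer arithmetic by working in tenths: a single-GPU request is weighted x10, larger requests and still-allocating GPUs x13 (a 1.3x margin), and the sum is compared against ten times the GPUs left after usage and reservations. A small sketch of that comparison as a pure function; the names and example numbers are illustrative, not the scheduler's API:

package main

import "fmt"

// hasHeadroom mirrors the shape of the check in the hunk above,
// kept in tenths so no floating point is needed.
func hasHeadroom(requested, allocating, total, using, reserved int) bool {
    weighted := requested * 13
    if requested == 1 {
        weighted = requested * 10
    }
    return weighted+allocating*13 <= (total-using-reserved)*10
}

func main() {
    fmt.Println(hasHeadroom(1, 0, 8, 6, 1)) // 10 <= 10 -> true
    fmt.Println(hasHeadroom(2, 1, 8, 4, 0)) // 26+13 <= 40 -> true
    fmt.Println(hasHeadroom(4, 2, 8, 2, 0)) // 52+26 <= 60 -> false
}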
@@ -146,7 +161,6 @@ func (scheduler *SchedulerFair) Start() {
             scheduler.schedulingMu.Lock()
             scheduler.schedulingJobsCnt--
             scheduler.schedulingMu.Unlock()
-            time.Sleep(time.Millisecond * 100)
             go func() {
                 scheduler.UpdateNextQueue()
             }()
@@ -162,11 +176,16 @@ func (scheduler *SchedulerFair) Start() {
     scheduler.queueMu.Lock()
     for q, t := range scheduler.queues {
         if len(t) == 0 || !InstanceOfGroupManager().groups[t[0].Group].Reserved {
+            scheduler.queueMu.Unlock()
             continue
         }
+        scheduler.queuesUsingGPUMu.Lock()
         if cnt, ok := scheduler.queuesSchedulingCnt[t[0].Group]; ok && cnt > 0 {
+            scheduler.queueMu.Unlock()
+            scheduler.queuesUsingGPUMu.Unlock()
             continue
         }
+        scheduler.queuesUsingGPUMu.Unlock()
         numberGPU := 0
         for _, v := range t[0].Tasks {
             numberGPU += v.NumberGPU
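
The hunk above releases queueMu (and, where already taken, queuesUsingGPUMu) on each continue path before skipping a queue. When every early exit has to drop the same lock, a common Go alternative is to push the critical section into a helper so defer performs the unlock; a sketch of that idiom under assumed types, not the way the repository structures it:

package main

import (
    "fmt"
    "sync"
)

type store struct {
    mu     sync.Mutex
    queues map[string][]string
}

// snapshotQueue copies one queue under the lock; defer guarantees the
// unlock on every return path, so early exits cannot leak the mutex.
func (s *store) snapshotQueue(name string) ([]string, bool) {
    s.mu.Lock()
    defer s.mu.Unlock()
    q, ok := s.queues[name]
    if !ok || len(q) == 0 {
        return nil, false
    }
    return append([]string(nil), q...), true
}

func main() {
    s := &store{queues: map[string][]string{"default": {"job-a"}}}
    if jobs, ok := s.snapshotQueue("default"); ok {
        fmt.Println(jobs)
    }
}

defer costs one extra function call but removes the need to repeat the unlock before every continue or return.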
@@ -180,6 +199,7 @@ func (scheduler *SchedulerFair) Start() {
         scheduler.queuesUsingGPUMu.Unlock()
         if pool.TotalGPU-scheduler.UsingGPU-scheduler.allocatingGPU*13/10 < 0 {
+            scheduler.queueMu.Unlock()
             continue
         }
@@ -189,9 +209,12 @@ func (scheduler *SchedulerFair) Start() {
         scheduler.schedulingMu.Lock()
         scheduler.schedulingJobsCnt++
-        scheduler.queuesSchedulingCnt[jm.job.Group]++
         scheduler.schedulingMu.Unlock()
+        scheduler.queuesUsingGPUMu.Lock()
+        scheduler.queuesSchedulingCnt[jm.job.Group]++
+        scheduler.queuesUsingGPUMu.Unlock()
         scheduler.allocatingGPUMu.Lock()
         scheduler.allocatingGPU += numberGPU
         scheduler.allocatingGPUMu.Unlock()
@@ -229,10 +252,13 @@ func (scheduler *SchedulerFair) UpdateProgress(job Job, state State) {
 case Running:
     scheduler.schedulingMu.Lock()
     scheduler.schedulingJobsCnt--
+    scheduler.schedulingMu.Unlock()
+    scheduler.queuesUsingGPUMu.Lock()
     if _, ok := scheduler.queuesSchedulingCnt[job.Group]; ok {
         scheduler.queuesSchedulingCnt[job.Group]--
     }
-    scheduler.schedulingMu.Unlock()
+    scheduler.queuesUsingGPUMu.Unlock()
     for i := range scheduler.history {
         if scheduler.history[i].Name == job.Name {
@@ -558,7 +584,9 @@ func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
 }
 func (scheduler *SchedulerFair) QueryState(jobName string) MsgJobStatus {
+    scheduler.queueMu.Lock()
     jm, ok := scheduler.jobs[jobName]
+    scheduler.queueMu.Unlock()
     if !ok {
         return MsgJobStatus{Code: 1, Error: "Job not exist!"}
     }
@@ -566,7 +594,9 @@ func (scheduler *SchedulerFair) QueryState(jobName string) MsgJobStatus {
 }
 func (scheduler *SchedulerFair) Stop(jobName string) MsgStop {
+    scheduler.queueMu.Lock()
     jm, ok := scheduler.jobs[jobName]
+    scheduler.queueMu.Unlock()
     if !ok {
         return MsgStop{Code: 1, Error: "Job not exist!"}
     }
@@ -574,7 +604,9 @@ func (scheduler *SchedulerFair) Stop(jobName string) MsgStop {
 }
 func (scheduler *SchedulerFair) QueryLogs(jobName string, taskName string) MsgLog {
+    scheduler.queueMu.Lock()
     jm, ok := scheduler.jobs[jobName]
+    scheduler.queueMu.Unlock()
     if !ok {
         return MsgLog{Code: 1, Error: "Job not exist!"}
     }
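
The three hunks above wrap each read of scheduler.jobs in queueMu, presumably so the map read cannot race with concurrent modifications. A condensed sketch of that guarded-lookup pattern with placeholder types (registry and jobManager here are hypothetical, not the repository's JobManager):

package main

import (
    "fmt"
    "sync"
)

type jobManager struct{ name string }

type registry struct {
    mu   sync.Mutex
    jobs map[string]*jobManager
}

// lookup holds the mutex only for the map read itself; callers then
// work with the returned pointer outside the critical section.
func (r *registry) lookup(name string) (*jobManager, bool) {
    r.mu.Lock()
    jm, ok := r.jobs[name]
    r.mu.Unlock()
    return jm, ok
}

func main() {
    r := &registry{jobs: map[string]*jobManager{"job-1": {name: "job-1"}}}
    if jm, ok := r.lookup("job-1"); ok {
        fmt.Println("found", jm.name)
    } else {
        fmt.Println("job not exist")
    }
}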
@@ -611,9 +643,12 @@ func (scheduler *SchedulerFair) Summary() MsgSummary {
         tmp = append(tmp, *job)
     }
     scheduler.historyMu.Unlock()
+    scheduler.queueMu.Lock()
     for _, v := range scheduler.queues {
         tmp = append(tmp, v...)
     }
+    scheduler.queueMu.Unlock()
     for _, job := range tmp {
         switch job.Status {
@@ -686,6 +721,7 @@ func (scheduler *SchedulerFair) UpdateNextQueue() {
         pool.poolsMu[i].Unlock()
     }
+    scheduler.queueMu.Lock()
     for k, t := range scheduler.queues {
         if len(t) == 0 {
             continue
@@ -714,6 +750,7 @@ func (scheduler *SchedulerFair) UpdateNextQueue() {
         }
     }
     scheduler.nextQueue = next
+    scheduler.queueMu.Unlock()
     log.Debug("updateNextQueue ->", next)
 }