mirror of https://github.com/newnius/YAO-scheduler.git
synced 2025-06-06 22:01:55 +00:00
update fair
This commit is contained in:
parent d1a9bad295
commit f6e0081d53
@@ -309,12 +309,15 @@ func main() {
 	case "FCFS":
 		scheduler = &SchedulerFCFS{}
 		break
-	case "fair":
-		scheduler = &SchedulerCapacity{}
-		break
 	case "priority":
 		scheduler = &SchedulerPriority{}
 		break
+	case "capacity":
+		scheduler = &SchedulerCapacity{}
+		break
+	case "fair":
+		scheduler = &SchedulerFair{}
+		break
 	default:
 		scheduler = &SchedulerFCFS{}
 	}
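For reference, each case above assigns one concrete scheduler to a shared interface value. The interface definition is not part of this diff; a minimal sketch, assuming it simply mirrors the SchedulerCapacity method signatures touched below, might look like this:

// Sketch only: method set inferred from the signatures visible in this commit,
// not copied from the repository's actual interface definition.
type Scheduler interface {
	Start()
	UpdateProgress(job Job, state State)
	Schedule(job Job)
	ReleaseResource(job Job, agent NodeStatus)
	QueryState(jobName string) MsgJobStatus
	Stop(jobName string) MsgStop
	QueryLogs(jobName string, taskName string) MsgLog
	Summary() MsgSummary
	UpdateParallelism(parallelism int) bool
}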
@@ -15,7 +15,7 @@ type SchedulerCapacity struct {
 	nextQueue string
 	jobs      map[string]*JobManager
 	queues    map[string][]Job
-	queueMu   sync.Mutex
+	queuesMu  sync.Mutex
 
 	schedulingJobs map[string]bool
 	schedulingMu   sync.Mutex
@@ -28,9 +28,6 @@ type SchedulerCapacity struct {
 
 	allocatingGPU   int
 	allocatingGPUMu sync.Mutex
-
-	queuesSchedulingCnt map[string]int
-	queuesUsingGPUMu    sync.Mutex
 }
 
 type FairJobSorter []Job
@@ -46,7 +43,7 @@ func (s FairJobSorter) Less(i, j int) bool {
 }
 
 func (scheduler *SchedulerCapacity) Start() {
-	log.Info("JS started")
+	log.Info("JS (capacity) started")
 
 	scheduler.jobs = map[string]*JobManager{}
 	scheduler.history = []*Job{}
@@ -57,12 +54,10 @@ func (scheduler *SchedulerCapacity) Start() {
 	scheduler.enabled = true
 	scheduler.schedulingJobs = map[string]bool{}
 	scheduler.allocatingGPU = 0
-	scheduler.queuesSchedulingCnt = map[string]int{}
-
 	scheduler.parallelism = 1
 
 	go func() {
 		/* capacity scheduler */
 		flag := true
 		for {
 			log.Debug("Scheduling")
@@ -82,7 +77,7 @@ func (scheduler *SchedulerCapacity) Start() {
 			}
 			scheduler.schedulingMu.Unlock()
 
-			scheduler.queueMu.Lock()
+			scheduler.queuesMu.Lock()
 			queue := scheduler.nextQueue
 			go func() {
 				scheduler.UpdateNextQueue()
@@ -99,7 +94,7 @@ func (scheduler *SchedulerCapacity) Start() {
 			pool := InstanceOfResourcePool()
 			log.Info(cnt, pool.TotalGPU, pool.UsingGPU, scheduler.allocatingGPU)
 			if len(scheduler.schedulingJobs) > 1 && (cnt*10+(scheduler.allocatingGPU)*13 > (pool.TotalGPU-pool.UsingGPU)*10) {
-				scheduler.queueMu.Unlock()
+				scheduler.queuesMu.Unlock()
 				continue
 			}
 
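Reading the gate above: cnt (computed just before this hunk and not shown here, presumably the GPUs requested by the candidate job) is weighed against the pool's idle GPUs, and the round is skipped when more than one job is already mid-scheduling and cnt plus roughly 1.3x the GPUs still being allocated would exceed what is free. As an illustrative example with made-up numbers: cnt = 2, allocatingGPU = 1, TotalGPU = 8, UsingGPU = 6 gives 2*10 + 1*13 = 33 on the left and (8-6)*10 = 20 on the right, so the loop unlocks queuesMu and retries later.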
@@ -119,10 +114,6 @@ func (scheduler *SchedulerCapacity) Start() {
 				scheduler.history = append(scheduler.history, &jm.job)
 				scheduler.historyMu.Unlock()
 
-				scheduler.queuesUsingGPUMu.Lock()
-				scheduler.queuesSchedulingCnt[jm.job.Group]++
-				scheduler.queuesUsingGPUMu.Unlock()
-
 				scheduler.schedulingMu.Lock()
 				scheduler.schedulingJobs[jm.job.Name] = true
 				scheduler.schedulingMu.Unlock()
@@ -132,7 +123,7 @@ func (scheduler *SchedulerCapacity) Start() {
 			} else {
 				log.Debug("No more jobs to scheduling ", time.Now())
 			}
-			scheduler.queueMu.Unlock()
+			scheduler.queuesMu.Unlock()
 		}
 	}()
 }
@@ -147,16 +138,6 @@ func (scheduler *SchedulerCapacity) UpdateProgress(job Job, state State) {
 
 	switch state {
 	case Running:
-		scheduler.queuesUsingGPUMu.Lock()
-		if _, ok := scheduler.queuesSchedulingCnt[job.Group]; ok {
-			scheduler.queuesSchedulingCnt[job.Group]--
-			if scheduler.queuesSchedulingCnt[job.Group] < 0 {
-				scheduler.queuesSchedulingCnt[job.Group] = 0
-				log.Warn("scheduler.queuesSchedulingCnt less than 0", job.Group)
-			}
-		}
-		scheduler.queuesUsingGPUMu.Unlock()
-
 		for i := range scheduler.history {
 			if scheduler.history[i].Name == job.Name {
 				scheduler.history[i].Status = Running
@@ -192,8 +173,8 @@ func (scheduler *SchedulerCapacity) UpdateProgress(job Job, state State) {
 }
 
 func (scheduler *SchedulerCapacity) Schedule(job Job) {
-	scheduler.queueMu.Lock()
-	defer scheduler.queueMu.Unlock()
+	scheduler.queuesMu.Lock()
+	defer scheduler.queuesMu.Unlock()
 
 	queue := job.Group
 	_, ok := scheduler.queues[queue]
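The remainder of Schedule is outside this hunk; presumably it creates the group's queue on first use and then appends the job. A sketch under that assumption (not the committed code):

	queue := job.Group
	_, ok := scheduler.queues[queue]
	if !ok {
		// first job for this group: lazily create its queue
		scheduler.queues[queue] = []Job{}
	}
	scheduler.queues[queue] = append(scheduler.queues[queue], job)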
@@ -287,9 +268,9 @@ func (scheduler *SchedulerCapacity) ReleaseResource(job Job, agent NodeStatus) {
 }
 
 func (scheduler *SchedulerCapacity) QueryState(jobName string) MsgJobStatus {
-	scheduler.queueMu.Lock()
+	scheduler.queuesMu.Lock()
 	jm, ok := scheduler.jobs[jobName]
-	scheduler.queueMu.Unlock()
+	scheduler.queuesMu.Unlock()
 	if !ok {
 		return MsgJobStatus{Code: 1, Error: "Job not exist!"}
 	}
@@ -297,9 +278,9 @@ func (scheduler *SchedulerCapacity) QueryState(jobName string) MsgJobStatus {
 }
 
 func (scheduler *SchedulerCapacity) Stop(jobName string) MsgStop {
-	scheduler.queueMu.Lock()
+	scheduler.queuesMu.Lock()
 	jm, ok := scheduler.jobs[jobName]
-	scheduler.queueMu.Unlock()
+	scheduler.queuesMu.Unlock()
 	if !ok {
 		return MsgStop{Code: 1, Error: "Job not exist!"}
 	}
@@ -307,9 +288,9 @@ func (scheduler *SchedulerCapacity) Stop(jobName string) MsgStop {
 }
 
 func (scheduler *SchedulerCapacity) QueryLogs(jobName string, taskName string) MsgLog {
-	scheduler.queueMu.Lock()
+	scheduler.queuesMu.Lock()
 	jm, ok := scheduler.jobs[jobName]
-	scheduler.queueMu.Unlock()
+	scheduler.queuesMu.Unlock()
 	if !ok {
 		return MsgLog{Code: 1, Error: "Job not exist!"}
 	}
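QueryState, Stop, and QueryLogs each repeat the same guarded lookup of scheduler.jobs. A hypothetical helper (getJobManager is not in the repository; its name and shape are assumptions) could factor that pattern out:

// getJobManager is a hypothetical refactoring sketch, not part of this commit:
// it performs the locked lookup that QueryState, Stop and QueryLogs inline.
func (scheduler *SchedulerCapacity) getJobManager(jobName string) (*JobManager, bool) {
	scheduler.queuesMu.Lock()
	defer scheduler.queuesMu.Unlock()
	jm, ok := scheduler.jobs[jobName]
	return jm, ok
}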
@@ -347,11 +328,11 @@ func (scheduler *SchedulerCapacity) Summary() MsgSummary {
 	}
 	scheduler.historyMu.Unlock()
 
-	scheduler.queueMu.Lock()
+	scheduler.queuesMu.Lock()
 	for _, v := range scheduler.queues {
 		tmp = append(tmp, v...)
 	}
-	scheduler.queueMu.Unlock()
+	scheduler.queuesMu.Unlock()
 
 	for _, job := range tmp {
 		switch job.Status {
@@ -383,7 +364,7 @@ func (scheduler *SchedulerCapacity) UpdateNextQueue() {
 
 	NumberGPU := float64(InstanceOfResourcePool().TotalGPU) + 0.00001
 
-	scheduler.queueMu.Lock()
+	scheduler.queuesMu.Lock()
 	for k, t := range scheduler.queues {
 		if len(t) == 0 {
 			continue
@@ -407,7 +388,7 @@ func (scheduler *SchedulerCapacity) UpdateNextQueue() {
 		}
 	}
 	scheduler.nextQueue = next
-	scheduler.queueMu.Unlock()
+	scheduler.queuesMu.Unlock()
 	log.Debug("updateNextQueue ->", next)
 }
 
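The weighting logic elided between the two UpdateNextQueue hunks is not visible in this diff. As a rough illustration of the shape such a selection can take (in-package sketch only; the usingGPU argument and the whole function are assumptions, not code from this commit, and it needs "math" imported):

// pickNextQueue sketches one way to choose the next non-empty queue: take the
// queue holding the smallest share of the cluster's GPUs. Illustrative only.
func pickNextQueue(queues map[string][]Job, usingGPU map[string]int, totalGPU int) string {
	numberGPU := float64(totalGPU) + 0.00001 // epsilon keeps the division safe
	next := "default"
	best := math.MaxFloat64
	for k, t := range queues {
		if len(t) == 0 {
			continue
		}
		share := float64(usingGPU[k]) / numberGPU
		if share < best {
			best = share
			next = k
		}
	}
	return next
}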
||||
@ -430,11 +411,5 @@ func (scheduler *SchedulerCapacity) UpdateParallelism(parallelism int) bool {
|
||||
}
|
||||
|
||||
func (scheduler *SchedulerCapacity) updateGroup(group Group) bool {
|
||||
num := 0
|
||||
for _, g := range InstanceOfGroupManager().List().Groups {
|
||||
if g.Reserved {
|
||||
num += g.NumGPU
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|