mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-12-15 08:16:43 +00:00
2020-05-03 10:30:12 +08:00
parent c20ec0fdd6
commit 56e4b5474d
5 changed files with 54 additions and 14 deletions

View File

@@ -122,6 +122,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
 		msg.Error = err.Error()
 	} else {
 		msg = InstanceOfGroupManager().Add(group)
+		scheduler.updateGroup(group)
 	}
 	js, _ := json.Marshal(msg)
 	w.Header().Set("Content-Type", "application/json")
@@ -138,6 +139,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
 		msg.Error = err.Error()
 	} else {
 		msg = InstanceOfGroupManager().Update(group)
+		scheduler.updateGroup(group)
 	}
 	js, _ := json.Marshal(msg)
 	w.Header().Set("Content-Type", "application/json")
@@ -154,6 +156,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
 		msg.Error = err.Error()
 	} else {
 		msg = InstanceOfGroupManager().Remove(group)
+		scheduler.updateGroup(group)
 	}
 	js, _ := json.Marshal(msg)
 	w.Header().Set("Content-Type", "application/json")
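Review note: the add, update and remove handlers above now share one pattern: apply the change through the group manager, then push the group into the scheduler so it can rebuild its reserved-GPU view. A minimal sketch of that shared shape, assuming the handler file's existing net/http and encoding/json imports and the package-level scheduler; the helper name, the MsgCreate message type and the final Write are assumptions, not repository code:

// Hypothetical helper illustrating the pattern the three hunks above converge on.
func replyGroupChange(w http.ResponseWriter, group Group, mutate func(Group) MsgCreate) {
	msg := mutate(group)         // InstanceOfGroupManager().Add / Update / Remove
	scheduler.updateGroup(group) // new in this commit: refresh scheduler-side group state
	js, _ := json.Marshal(msg)
	w.Header().Set("Content-Type", "application/json")
	w.Write(js)
}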

View File

@@ -38,4 +38,6 @@ type Scheduler interface {
 	SetShareRatio(ratio float64) bool
 	SetPreScheduleRatio(ratio float64) bool
+	updateGroup(group Group) bool
 }
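With updateGroup added to the interface, every Scheduler implementation has to accept group changes; schedulers that keep no per-group reserved state can satisfy it with a no-op, as the FCFS and priority schedulers do below. A sketch of the relevant slice of the interface with a descriptive comment (other methods elided):

type Scheduler interface {
	// ... existing methods elided ...

	// updateGroup lets the API layer push a changed group definition
	// (GPU quota, Reserved flag) into the running scheduler so derived
	// state such as reservedGPU stays current.
	updateGroup(group Group) bool
}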

View File

@@ -273,3 +273,7 @@ func (scheduler *SchedulerFCFS) SetPreScheduleRatio(ratio float64) bool {
log.Info("enablePreScheduleRatio is updated to", ratio) log.Info("enablePreScheduleRatio is updated to", ratio)
return true return true
} }
func (scheduler *SchedulerFCFS) updateGroup(group Group) bool {
return true
}

View File

@@ -40,6 +40,7 @@ type SchedulerFair struct {
 	allocatingGPU       int
 	allocatingGPUMu     sync.Mutex
+	queuesUsingGPUMu    sync.Mutex
 	queueUsingGPU       map[string]int
 	reservedGPU         int
 	queuesSchedulingCnt map[string]int
@@ -105,8 +106,17 @@ func (scheduler *SchedulerFair) Start() {
 		for _, task := range jm.job.Tasks {
 			cnt += task.NumberGPU
 		}
-		if scheduler.schedulingJobsCnt > 1 {
-			if (cnt+scheduler.allocatingGPU+1)*13 > (pool.TotalGPU-scheduler.UsingGPU-scheduler.reservedGPU)*10 {
+		reserved := scheduler.reservedGPU
+		scheduler.queuesUsingGPUMu.Lock()
+		for g, v := range scheduler.queueUsingGPU {
+			if InstanceOfGroupManager().groups[g].Reserved {
+				reserved -= v
+			}
+		}
+		scheduler.queuesUsingGPUMu.Unlock()
+		if scheduler.schedulingJobsCnt > 0 {
+			if (cnt+scheduler.allocatingGPU+1)*13 > (pool.TotalGPU-scheduler.UsingGPU-reserved)*10 {
 				scheduler.schedulingMu.Lock()
 				scheduler.schedulingJobsCnt--
 				scheduler.schedulingMu.Unlock()
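Review note: the admission check now discounts GPUs that reserved groups already hold, so only the still-unused part of the reservation is fenced off. A self-contained sketch of that arithmetic (the function name and the reservedGroup lookup are illustrative; field names follow the diff):

// effectiveReserved returns how many reserved GPUs are still unclaimed:
// the total reservation minus what reserved groups already have in use.
func effectiveReserved(reservedGPU int, queueUsingGPU map[string]int, reservedGroup map[string]bool) int {
	reserved := reservedGPU
	for g, v := range queueUsingGPU {
		if reservedGroup[g] {
			reserved -= v
		}
	}
	return reserved
}

Another job then enters scheduling only while (cnt+allocatingGPU+1)*13 <= (TotalGPU-UsingGPU-reserved)*10, i.e. the prospective demand inflated by 1.3 must fit inside the free, non-reserved capacity; the outer guard also changed from schedulingJobsCnt > 1 to > 0, so the check now applies whenever any job is already being scheduled.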
@@ -163,9 +173,11 @@ func (scheduler *SchedulerFair) Start() {
 		}
 		available := InstanceOfGroupManager().groups[t[0].Group].NumGPU
+		scheduler.queuesUsingGPUMu.Lock()
 		if cnt, ok := scheduler.queueUsingGPU[t[0].Group]; ok {
 			available -= cnt
 		}
+		scheduler.queuesUsingGPUMu.Unlock()
 		if pool.TotalGPU-scheduler.UsingGPU-scheduler.allocatingGPU*13/10 < 0 {
 			continue
@@ -447,8 +459,6 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node
 				if node.Status[j].MemoryAllocated == 0 {
 					scheduler.UsingGPUMu.Lock()
 					scheduler.UsingGPU ++
-					scheduler.queueUsingGPU[job.Group] += task.NumberGPU
-					scheduler.reservedGPU += task.NumberGPU
 					scheduler.UsingGPUMu.Unlock()
 				}
 				node.Status[j].MemoryAllocated += task.MemoryGPU
@@ -459,6 +469,11 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node
 		for _, t := range res.Status {
 			scheduler.Attach(t.UUID, job.Name)
 		}
+		scheduler.queuesUsingGPUMu.Lock()
+		scheduler.queueUsingGPU[job.Group] += task.NumberGPU
+		scheduler.queuesUsingGPUMu.Unlock()
 		scheduler.allocatingGPUMu.Lock()
 		scheduler.allocatingGPU -= task.NumberGPU
 		scheduler.allocatingGPUMu.Unlock()
@@ -509,16 +524,6 @@ func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
 				if node.Status[j].MemoryAllocated == 0 {
 					scheduler.UsingGPUMu.Lock()
 					scheduler.UsingGPU--
-					if _, ok := scheduler.queueUsingGPU[job.Group]; ok {
-						scheduler.queueUsingGPU[job.Group]--
-						scheduler.reservedGPU--
-						if scheduler.queueUsingGPU[job.Group] < 0 {
-							scheduler.queueUsingGPU[job.Group] = 0
-						}
-						if scheduler.reservedGPU < 0 {
-							scheduler.reservedGPU = 0
-						}
-					}
 					scheduler.UsingGPUMu.Unlock()
 					log.Info(node.Status[j].UUID, " is released")
 				}
@@ -526,6 +531,15 @@ func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
 			}
 		}
 	}
+	scheduler.queuesUsingGPUMu.Lock()
+	if _, ok := scheduler.queueUsingGPU[job.Group]; ok {
+		scheduler.queueUsingGPU[job.Group] -= len(agent.Status)
+		if scheduler.queueUsingGPU[job.Group] < 0 {
+			log.Warn("queueUsingGPU exceeded ", scheduler.queueUsingGPU[job.Group])
+			scheduler.queueUsingGPU[job.Group] = 0
+		}
+	}
+	scheduler.queuesUsingGPUMu.Unlock()
 	go func(res NodeStatus) {
 		scheduler.resourceAllocationsMu.Lock()
 		if _, ok := scheduler.resourceAllocations[job.Group]; !ok {
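Review note: releasing resources no longer touches reservedGPU, and the per-group counter is decremented once per released allocation, by len(agent.Status) (presumably the number of GPUs handed back), under its own queuesUsingGPUMu rather than once per GPU inside UsingGPUMu. A self-contained sketch of the clamping logic (illustrative function name):

// releaseGroupGPUs lowers a group's in-use GPU counter by n and clamps it at
// zero, mirroring the guard added to ReleaseResource; in the scheduler this
// runs with queuesUsingGPUMu held and logs a warning before resetting.
func releaseGroupGPUs(queueUsingGPU map[string]int, group string, n int) {
	if _, ok := queueUsingGPU[group]; ok {
		queueUsingGPU[group] -= n
		if queueUsingGPU[group] < 0 {
			// Going negative means the bookkeeping drifted from reality; reset
			// rather than letting the error skew later admission decisions.
			queueUsingGPU[group] = 0
		}
	}
}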
@@ -740,3 +754,16 @@ func (scheduler *SchedulerFair) SetPreScheduleRatio(ratio float64) bool {
log.Info("enablePreScheduleRatio is updated to ", ratio) log.Info("enablePreScheduleRatio is updated to ", ratio)
return true return true
} }
func (scheduler *SchedulerFair) updateGroup(group Group) bool {
num := 0
for _, g := range InstanceOfGroupManager().List().Groups {
if g.Reserved {
num += g.NumGPU
}
}
scheduler.queuesUsingGPUMu.Lock()
scheduler.reservedGPU = num
scheduler.queuesUsingGPUMu.Unlock()
return true
}
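Review note: updateGroup recomputes reservedGPU from scratch as the sum of NumGPU over all groups flagged Reserved, rather than adjusting it incrementally, so the value stays consistent however add/update/remove calls interleave; with the acquire and release paths no longer touching it, this appears to be reservedGPU's only writer after this commit. A usage sketch, assuming the API wiring from the first file; the Name field and the literal values are assumptions:

// Hypothetical call sequence after an operator marks a group as reserved.
group := Group{Name: "research", NumGPU: 8, Reserved: true}
InstanceOfGroupManager().Update(group)
scheduler.updateGroup(group) // reservedGPU becomes the total quota of all reserved groups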

View File

@@ -297,3 +297,7 @@ func (scheduler *SchedulerPriority) SetPreScheduleRatio(ratio float64) bool {
log.Info("enablePreScheduleRatio is updated to", ratio) log.Info("enablePreScheduleRatio is updated to", ratio)
return true return true
} }
func (scheduler *SchedulerPriority) updateGroup(group Group) bool {
return true
}