mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-07 14:21:55 +00:00

update fair

Newnius 2020-05-28 14:26:08 +08:00
parent f43f4e24ad
commit 10a46937c9
2 changed files with 70 additions and 14 deletions

View File

@@ -50,10 +50,12 @@ type ResourcePool struct {
     bindingsMu sync.Mutex
     utils map[string][]UtilGPUTimeSeries
     TotalGPU int
     TotalGPUMu sync.Mutex
+    TotalCPU int
+    TotalMemory int
     UsingGPU int
     UsingGPUMu sync.Mutex
     enableShare bool
     enableShareRatio float64
@@ -73,6 +75,9 @@ func (pool *ResourcePool) init(conf Configuration) {
     pool.TotalGPU = 0
     pool.UsingGPU = 0
+    pool.TotalCPU = 0
+    pool.TotalMemory = 0
     pool.enableShare = true
     pool.enableShareRatio = 0.75
     pool.enablePreSchedule = true
@@ -129,6 +134,8 @@ func (pool *ResourcePool) checkDeadNodes() {
         pool.TotalGPUMu.Lock()
         if _, ok := seg.Nodes[k]; ok {
             pool.TotalGPU -= len(seg.Nodes[k].Status)
+            pool.TotalCPU -= seg.Nodes[k].NumCPU
+            pool.TotalMemory -= seg.Nodes[k].MemTotal
         }
         pool.TotalGPUMu.Unlock()
         delete(seg.Nodes, k)
@@ -240,6 +247,8 @@ func (pool *ResourcePool) saveStatusHistory() {
         pool.TotalGPUMu.Lock()
         pool.TotalGPU = TotalGPU
+        pool.TotalCPU = TotalCPU
+        pool.TotalMemory = TotalMemGPU
         pool.TotalGPUMu.Unlock()
         time.Sleep(time.Second * 60)
     }
@@ -307,6 +316,8 @@ func (pool *ResourcePool) update(node NodeStatus) {
         /* TODO: double check node do belong to this seg */
         pool.TotalGPUMu.Lock()
         pool.TotalGPU += len(node.Status)
+        pool.TotalCPU += node.NumCPU
+        pool.TotalMemory += node.MemTotal
         pool.TotalGPUMu.Unlock()
         log.Info("node ", node.ClientID, " is online")
     }
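
The ResourcePool hunks above all follow one pattern: TotalCPU and TotalMemory are adjusted wherever TotalGPU already was, under the same TotalGPUMu lock, so the three totals stay consistent as nodes come and go. A minimal sketch of that bookkeeping, using cut-down stand-ins for NodeStatus and ResourcePool (only the fields the counters touch; the real structs carry much more):

package main

import (
    "fmt"
    "sync"
)

// Simplified stand-ins for the real NodeStatus / ResourcePool types.
type NodeStatus struct {
    ClientID string
    Status   []int // one entry per GPU on the node
    NumCPU   int
    MemTotal int
}

type ResourcePool struct {
    TotalGPUMu  sync.Mutex
    TotalGPU    int
    TotalCPU    int
    TotalMemory int
}

// add mirrors ResourcePool.update: a node coming online grows all three totals.
func (pool *ResourcePool) add(node NodeStatus) {
    pool.TotalGPUMu.Lock()
    defer pool.TotalGPUMu.Unlock()
    pool.TotalGPU += len(node.Status)
    pool.TotalCPU += node.NumCPU
    pool.TotalMemory += node.MemTotal
}

// remove mirrors checkDeadNodes: a dead node shrinks the same three totals.
func (pool *ResourcePool) remove(node NodeStatus) {
    pool.TotalGPUMu.Lock()
    defer pool.TotalGPUMu.Unlock()
    pool.TotalGPU -= len(node.Status)
    pool.TotalCPU -= node.NumCPU
    pool.TotalMemory -= node.MemTotal
}

func main() {
    pool := &ResourcePool{}
    n := NodeStatus{ClientID: "node-1", Status: make([]int, 4), NumCPU: 32, MemTotal: 128}
    pool.add(n)
    fmt.Println(pool.TotalGPU, pool.TotalCPU, pool.TotalMemory) // 4 32 128
    pool.remove(n)
    fmt.Println(pool.TotalGPU, pool.TotalCPU, pool.TotalMemory) // 0 0 0
}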

View File

@@ -107,9 +107,11 @@ func (scheduler *SchedulerFair) Start() {
         if bestQueue != "" {
             numberGPUtmp := 0
             numberCPUtmp := 0
+            Memorytmp := 0
             for _, task := range scheduler.queues[bestQueue][0].Tasks {
                 numberGPUtmp += task.NumberGPU
                 numberCPUtmp += task.NumberCPU
+                Memorytmp += task.Memory
             }
             log.Info("schedulingJobs are ", scheduler.schedulingJobs)
@@ -122,6 +124,7 @@ func (scheduler *SchedulerFair) Start() {
             if quota, ok := scheduler.queuesQuota[bestQueue]; ok {
                 quota.NumberGPU -= numberGPUtmp * 100
                 quota.CPU -= numberCPUtmp * 100
+                quota.Memory -= Memorytmp
             }
             log.Info("After, ", scheduler.queuesQuota[bestQueue])
@@ -300,25 +303,35 @@ func (scheduler *SchedulerFair) UpdateQuota() {
     /* phase 1: DRF */
     usingGPU := 0
+    usingCPU := 0
+    usingMemory := 0
     allocatedGPU := 0
+    allocatedCPU := 0
+    allocatedMemory := 0
     scheduler.resourceAllocationsMu.Lock()
     for _, quota := range scheduler.resourceAllocations {
         usingGPU += quota.NumberGPU
+        usingCPU += quota.CPU
+        usingMemory += quota.Memory
     }
     scheduler.resourceAllocationsMu.Unlock()
     for _, quota := range scheduler.queuesQuota {
         allocatedGPU += quota.NumberGPU
+        allocatedCPU += quota.CPU
+        allocatedMemory += quota.Memory
     }
     pool := InstanceOfResourcePool()
-    available := pool.TotalGPU - usingGPU - allocatedGPU/100
+    availableGPU := pool.TotalGPU - usingGPU - allocatedGPU/100
+    availableCPU := pool.TotalCPU - usingCPU - allocatedCPU/100
+    availableMemory := pool.TotalMemory - usingMemory - allocatedMemory
     /* <0 means some nodes exited */
-    if available <= 0 {
+    if availableGPU <= 0 {
         return
     }
-    log.Info("Can allocate ", available)
+    log.Info("Can allocate ", availableGPU)
     log.Info("Before ")
     for queue, quota := range scheduler.queuesQuota {
         log.Info("Queue<->", queue)
@@ -326,20 +339,52 @@ func (scheduler *SchedulerFair) UpdateQuota() {
         log.Info("CPU:", quota.CPU)
         log.Info("Memory:", quota.Memory)
     }
-    available *= 100
-    per := available / len(scheduler.queues)
-    for queue := range scheduler.queues {
+    availableGPU *= 100
+    availableCPU *= 100
+    var candidates []string
+    requests := map[string]ResourceCount{}
+    weights := 0
+    for queue, jobs := range scheduler.queues {
+        if len(jobs) > 0 {
+            candidates = append(candidates, queue)
+        }
+        weights += InstanceOfGroupManager().groups[queue].Weight
+        request := ResourceCount{}
+        for _, job := range jobs {
+            GPU := 0
+            CPU := 0
+            Memory := 0
+            for _, task := range job.Tasks {
+                GPU += task.NumberGPU
+                CPU += task.NumberCPU
+                Memory += task.Memory
+            }
+            request.NumberGPU += GPU
+            request.CPU += CPU
+            request.Memory += Memory
+        }
+        requests[queue] = request
+    }
+    per := availableGPU / weights
+    for _, queue := range candidates {
         if _, ok := scheduler.queuesQuota[queue]; !ok {
             scheduler.queuesQuota[queue] = &ResourceCount{}
         }
+        weight := InstanceOfGroupManager().groups[queue].Weight
         quota := scheduler.queuesQuota[queue]
-        quota.NumberGPU += per
-        available -= per
+        quota.NumberGPU += per * weight
+        availableGPU -= per * weight
+        quota.CPU += (requests[queue].CPU / requests[queue].NumberGPU) * per * weight
+        quota.Memory += (requests[queue].Memory / requests[queue].NumberGPU) * per * weight
     }
-    if available > 0 {
+    if availableGPU > 0 {
         for queue := range scheduler.queues {
             quota := scheduler.queuesQuota[queue]
-            quota.NumberGPU += available
+            quota.NumberGPU += availableGPU
             break
         }
     }
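
The final hunk replaces the old equal split (per = available / len(queues)) with a weighted one: only queues that have waiting jobs become candidates, the GPU headroom (already in hundredths) is divided by the sum of group weights, and each candidate's CPU and memory quota is topped up in proportion to its own requested CPU-per-GPU and memory-per-GPU ratios. A standalone sketch of that phase, with stand-in types and a plain weight map in place of InstanceOfGroupManager (it keeps the original's integer division and, like the original, does not guard against a request with zero GPUs):

package main

import "fmt"

type ResourceCount struct {
    NumberGPU int // hundredths of a GPU
    CPU       int // hundredths of a CPU core
    Memory    int
}

// distribute mirrors the weighted phase of UpdateQuota. availableGPU is already
// scaled by 100, weights[q] stands in for InstanceOfGroupManager().groups[q].Weight,
// and requests[q] is the summed demand of queue q's waiting jobs. It returns the
// leftover GPU share (the original hands any leftover to the first queue).
func distribute(availableGPU int, candidates []string, weights map[string]int,
    requests map[string]ResourceCount, quotas map[string]*ResourceCount) int {
    totalWeight := 0
    for _, w := range weights {
        totalWeight += w
    }
    per := availableGPU / totalWeight // integer division, as in the original
    for _, queue := range candidates {
        if _, ok := quotas[queue]; !ok {
            quotas[queue] = &ResourceCount{}
        }
        weight := weights[queue]
        quota := quotas[queue]
        quota.NumberGPU += per * weight
        availableGPU -= per * weight
        // CPU and memory follow the queue's own requested per-GPU ratios.
        quota.CPU += (requests[queue].CPU / requests[queue].NumberGPU) * per * weight
        quota.Memory += (requests[queue].Memory / requests[queue].NumberGPU) * per * weight
    }
    return availableGPU
}

func main() {
    quotas := map[string]*ResourceCount{}
    left := distribute(
        800, // 8 whole GPUs, in hundredths
        []string{"a", "b"},
        map[string]int{"a": 1, "b": 3},
        map[string]ResourceCount{
            "a": {NumberGPU: 2, CPU: 8, Memory: 2048},
            "b": {NumberGPU: 4, CPU: 8, Memory: 8192},
        },
        quotas,
    )
    fmt.Println(*quotas["a"], *quotas["b"], left) // {200 800 204800} {600 1200 1228800} 0
}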