1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-07 14:21:55 +00:00

update fair

This commit is contained in:
Newnius 2020-05-28 21:28:18 +08:00
parent 159dc88d15
commit 93dbb21de7
3 changed files with 87 additions and 15 deletions

View File

@ -343,6 +343,7 @@ func (jm *JobManager) stop(force bool) MsgStop {
if res.Code != 0 {
log.Warn(res.Error)
}
log.Info(task.Id, " is killed")
}(taskStatus)
}

View File

@ -920,6 +920,7 @@ func (pool *ResourcePool) releaseResource(job Job, agent NodeStatus) {
node.Status[j].MemoryAllocated -= gpu.MemoryTotal
if node.Status[j].MemoryAllocated < 0 {
// in case of error
/* Case 0: a node is offline and then online, the allocation info will be lost */
log.Warn(node.ClientID, " UUID=", gpu.UUID, " More Memory Allocated")
node.Status[j].MemoryAllocated = 0
}

View File

@ -16,6 +16,8 @@ type SchedulerFair struct {
queues map[string][]Job
queuesMu sync.Mutex
enableBorrow bool
IOUs map[string]map[string]*ResourceCount
queuesQuota map[string]*ResourceCount
queuesQuotaMu sync.Mutex
@ -39,6 +41,8 @@ func (scheduler *SchedulerFair) Start() {
scheduler.history = []*Job{}
scheduler.queues = map[string][]Job{}
scheduler.queues["default"] = []Job{}
scheduler.enableBorrow = true
scheduler.IOUs = map[string]map[string]*ResourceCount{}
scheduler.queuesQuota = map[string]*ResourceCount{}
scheduler.resourceAllocations = map[string]*ResourceCount{}
scheduler.enabled = true
@ -69,38 +73,102 @@ func (scheduler *SchedulerFair) Start() {
scheduler.queuesMu.Lock()
scheduler.queuesQuotaMu.Lock()
go func() {
scheduler.UpdateQuota()
}()
/* choose queue which has the largest job */
bestQueue := ""
numberGPU := math.MaxInt64
//numberCPU := math.MaxInt64
/* phase 1 */
maxNumberGPU := math.MaxInt64
maxNumberCPU := math.MaxInt64
/* phase 1: execute jobs using self quota */
for queue, jobs := range scheduler.queues {
/* find smallest job */
/* find largest job */
if len(jobs) > 0 {
/* calculate resource request of head job */
numberGPUtmp := 0
numberCPUtmp := 0
for _, task := range jobs[0].Tasks {
numberGPUtmp += task.NumberGPU
numberCPUtmp += task.NumberCPU
}
/* if queue quota cannot satisfy, skip */
if quota, ok := scheduler.queuesQuota[queue]; !ok || quota.NumberGPU/1000 < numberGPUtmp {
continue
}
if bestQueue == "" || numberGPUtmp < numberGPU || (numberGPUtmp == numberGPU) {
/* the more, the better */
if bestQueue == "" || numberGPUtmp > maxNumberGPU || (numberGPUtmp == maxNumberGPU && numberCPUtmp > maxNumberCPU) {
/* cannot borrow more if already borrowed */
if _, ok := scheduler.IOUs[queue]; ok && len(scheduler.IOUs[queue]) > 0 {
continue
}
bestQueue = queue
numberGPU = numberGPUtmp
//numberCPU = numberCPUtmp
maxNumberGPU = numberGPUtmp
maxNumberCPU = numberCPUtmp
}
}
}
/* phase 2 */
if bestQueue == "" {
/* phase 2: borrow */
if bestQueue == "" && scheduler.enableBorrow {
/* firstly, check if quota sum can run a job */
totalGPU := 0
for _, quota := range scheduler.queuesQuota {
totalGPU += quota.NumberGPU
}
/* find job which is short of least resource */
minRequestGPU := math.MaxInt32
for queue, jobs := range scheduler.queues {
if len(jobs) == 0 {
continue
}
numberGPUtmp := 0
for _, task := range jobs[0].Tasks {
numberGPUtmp += task.NumberGPU
}
if _, ok := scheduler.queuesQuota[queue]; !ok {
scheduler.queuesQuota[queue] = &ResourceCount{}
}
needGPU := numberGPUtmp*1000 - scheduler.queuesQuota[queue].NumberGPU
/* the less, the better */
if bestQueue == "" || needGPU < minRequestGPU {
bestQueue = queue
minRequestGPU = needGPU
}
}
/* if totalGPU can satisfy that job, start borrowing */
if bestQueue != "" && totalGPU >= minRequestGPU {
for {
/* if all satisfied, break */
if minRequestGPU == 0 {
break
}
least := math.MaxInt32
for queue, quota := range scheduler.queuesQuota {
if queue == bestQueue {
continue
}
if quota.NumberGPU < least {
least = quota.NumberGPU
}
}
if minRequestGPU < least*(len(scheduler.queuesQuota)-1) {
least = minRequestGPU / (len(scheduler.queuesQuota) - 1)
}
/* start borrow */
for queue, quota := range scheduler.queuesQuota {
quota.NumberGPU -= least
if _, ok := scheduler.IOUs[bestQueue]; !ok {
scheduler.IOUs[bestQueue] = map[string]*ResourceCount{}
}
IOU, ok := scheduler.IOUs[bestQueue][queue]
if !ok {
scheduler.IOUs[bestQueue][queue] = &ResourceCount{}
IOU = scheduler.IOUs[bestQueue][queue]
}
IOU.NumberGPU += least
minRequestGPU -= least
log.Info(bestQueue, " borrow ", least, " from ", queue)
}
}
}
}
/* launch that job */
@ -164,8 +232,10 @@ func (scheduler *SchedulerFair) Start() {
}
} else {
log.Debug("No more jobs to scheduling ", time.Now())
go func() {
scheduler.UpdateQuota()
}()
}
scheduler.queuesQuotaMu.Unlock()
scheduler.queuesMu.Unlock()
}