diff --git a/src/job_manager.go b/src/job_manager.go index b1b297b..c810347 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -343,6 +343,7 @@ func (jm *JobManager) stop(force bool) MsgStop { if res.Code != 0 { log.Warn(res.Error) } + log.Info(task.Id, " is killed") }(taskStatus) } diff --git a/src/resource_pool.go b/src/resource_pool.go index 057ab5b..1a50b38 100644 --- a/src/resource_pool.go +++ b/src/resource_pool.go @@ -920,6 +920,7 @@ func (pool *ResourcePool) releaseResource(job Job, agent NodeStatus) { node.Status[j].MemoryAllocated -= gpu.MemoryTotal if node.Status[j].MemoryAllocated < 0 { // in case of error + /* Case 0: a node is offline and then online, the allocation info will be lost */ log.Warn(node.ClientID, " UUID=", gpu.UUID, " More Memory Allocated") node.Status[j].MemoryAllocated = 0 } diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go index 938e7c8..d9d23ed 100644 --- a/src/scheduler_fair.go +++ b/src/scheduler_fair.go @@ -16,6 +16,8 @@ type SchedulerFair struct { queues map[string][]Job queuesMu sync.Mutex + enableBorrow bool + IOUs map[string]map[string]*ResourceCount queuesQuota map[string]*ResourceCount queuesQuotaMu sync.Mutex @@ -39,6 +41,8 @@ func (scheduler *SchedulerFair) Start() { scheduler.history = []*Job{} scheduler.queues = map[string][]Job{} scheduler.queues["default"] = []Job{} + scheduler.enableBorrow = true + scheduler.IOUs = map[string]map[string]*ResourceCount{} scheduler.queuesQuota = map[string]*ResourceCount{} scheduler.resourceAllocations = map[string]*ResourceCount{} scheduler.enabled = true @@ -69,38 +73,102 @@ func (scheduler *SchedulerFair) Start() { scheduler.queuesMu.Lock() scheduler.queuesQuotaMu.Lock() - - go func() { - scheduler.UpdateQuota() - }() - + /* choose queue which has the largest job */ bestQueue := "" - numberGPU := math.MaxInt64 - //numberCPU := math.MaxInt64 - /* phase 1 */ + maxNumberGPU := math.MaxInt64 + maxNumberCPU := math.MaxInt64 + /* phase 1: execute jobs using self quota */ for queue, jobs := range scheduler.queues { - /* find smallest job */ + /* find largest job */ if len(jobs) > 0 { + /* calculate resource request of head job */ numberGPUtmp := 0 numberCPUtmp := 0 for _, task := range jobs[0].Tasks { numberGPUtmp += task.NumberGPU numberCPUtmp += task.NumberCPU } + /* if queue quota cannot satisfy, skip */ if quota, ok := scheduler.queuesQuota[queue]; !ok || quota.NumberGPU/1000 < numberGPUtmp { continue } - if bestQueue == "" || numberGPUtmp < numberGPU || (numberGPUtmp == numberGPU) { + /* the more, the better */ + if bestQueue == "" || numberGPUtmp > maxNumberGPU || (numberGPUtmp == maxNumberGPU && numberCPUtmp > maxNumberCPU) { + /* cannot borrow more if already borrowed */ + if _, ok := scheduler.IOUs[queue]; ok && len(scheduler.IOUs[queue]) > 0 { + continue + } bestQueue = queue - numberGPU = numberGPUtmp - //numberCPU = numberCPUtmp + maxNumberGPU = numberGPUtmp + maxNumberCPU = numberCPUtmp } } } - /* phase 2 */ - if bestQueue == "" { + /* phase 2: borrow */ + if bestQueue == "" && scheduler.enableBorrow { + /* firstly, check if quota sum can run a job */ + totalGPU := 0 + for _, quota := range scheduler.queuesQuota { + totalGPU += quota.NumberGPU + } + /* find job which is short of least resource */ + minRequestGPU := math.MaxInt32 + for queue, jobs := range scheduler.queues { + if len(jobs) == 0 { + continue + } + numberGPUtmp := 0 + for _, task := range jobs[0].Tasks { + numberGPUtmp += task.NumberGPU + } + if _, ok := scheduler.queuesQuota[queue]; !ok { + scheduler.queuesQuota[queue] = &ResourceCount{} + } + needGPU := numberGPUtmp*1000 - scheduler.queuesQuota[queue].NumberGPU + /* the less, the better */ + if bestQueue == "" || needGPU < minRequestGPU { + bestQueue = queue + minRequestGPU = needGPU + } + } + /* if totalGPU can satisfy that job, start borrowing */ + if bestQueue != "" && totalGPU >= minRequestGPU { + for { + /* if all satisfied, break */ + if minRequestGPU == 0 { + break + } + least := math.MaxInt32 + for queue, quota := range scheduler.queuesQuota { + if queue == bestQueue { + continue + } + if quota.NumberGPU < least { + least = quota.NumberGPU + } + } + if minRequestGPU < least*(len(scheduler.queuesQuota)-1) { + least = minRequestGPU / (len(scheduler.queuesQuota) - 1) + } + /* start borrow */ + for queue, quota := range scheduler.queuesQuota { + quota.NumberGPU -= least + if _, ok := scheduler.IOUs[bestQueue]; !ok { + scheduler.IOUs[bestQueue] = map[string]*ResourceCount{} + } + IOU, ok := scheduler.IOUs[bestQueue][queue] + if !ok { + scheduler.IOUs[bestQueue][queue] = &ResourceCount{} + IOU = scheduler.IOUs[bestQueue][queue] + } + IOU.NumberGPU += least + minRequestGPU -= least + log.Info(bestQueue, " borrow ", least, " from ", queue) + } + } + } } /* launch that job */ @@ -164,8 +232,10 @@ func (scheduler *SchedulerFair) Start() { } } else { log.Debug("No more jobs to scheduling ", time.Now()) + go func() { + scheduler.UpdateQuota() + }() } - scheduler.queuesQuotaMu.Unlock() scheduler.queuesMu.Unlock() }