From e67a81198ee7abd9a704c4b3ce75c1301f432f78 Mon Sep 17 00:00:00 2001
From: Newnius
Date: Fri, 10 Jul 2020 20:43:51 +0800
Subject: [PATCH] bugfix, ignore jobs shared & pre-scheduled

---
 README.md            |  8 -----
 src/job_manager.go   | 12 ++++++--
 src/main.go          |  9 +-----
 src/resource_pool.go | 69 +++++++++++++++++++++-----------------
 4 files changed, 43 insertions(+), 55 deletions(-)

diff --git a/README.md b/README.md
index 7676a27..ac83060 100644
--- a/README.md
+++ b/README.md
@@ -16,14 +16,6 @@
 ?action=jhl_job_status&job=
 ```
 
-**GetBindings**
-
-GPU is occupied by which job(s)
-
-```
-?action=get_bindings
-```
-
 #### Scheduler
 **EnableSchedule**
 ```
diff --git a/src/job_manager.go b/src/job_manager.go
index f8909a7..c2ba127 100644
--- a/src/job_manager.go
+++ b/src/job_manager.go
@@ -162,6 +162,8 @@ func (jm *JobManager) start() {
 	jm.returnResource(jm.status().Status)
 
 	/* feed data to optimizer */
+	isExclusive := InstanceOfResourcePool().isExclusive(jm.job.Name)
+
 	var stats [][]TaskStatus
 	for _, vals := range jm.stats {
 		var stat []TaskStatus
@@ -174,7 +176,9 @@ func (jm *JobManager) start() {
 			stats = append(stats, stat)
 		}
 	}
-	InstanceOfOptimizer().FeedStats(jm.job, "PS", stats)
+	if isExclusive {
+		InstanceOfOptimizer().FeedStats(jm.job, "PS", stats)
+	}
 	stats = [][]TaskStatus{}
 	for _, vals := range jm.stats {
 		var stat []TaskStatus
@@ -187,9 +191,11 @@ func (jm *JobManager) start() {
 			stats = append(stats, stat)
 		}
 	}
-	InstanceOfOptimizer().FeedStats(jm.job, "Worker", stats)
+	if isExclusive {
+		InstanceOfOptimizer().FeedStats(jm.job, "Worker", stats)
+	}
 
-	if len(jm.job.Tasks) == 1 && !isShare && !isScheduleAhead && jm.job.Status == Finished {
+	if len(jm.job.Tasks) == 1 && !isShare && !isScheduleAhead && jm.job.Status == Finished && isExclusive {
 		InstanceOfOptimizer().FeedTime(jm.job, stats)
 	}
 	log.Info("JobMaster exited ", jm.job.Name)
diff --git a/src/main.go b/src/main.go
index 2fee39b..7fef262 100644
--- a/src/main.go
+++ b/src/main.go
@@ -175,13 +175,6 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
 		w.Write(js)
 		break
 
-	case "get_bindings":
-		log.Debug("get_bindings")
-		js, _ := json.Marshal(InstanceOfResourcePool().getBindings())
-		w.Header().Set("Content-Type", "application/json")
-		w.Write(js)
-		break
-
 	case "group_list":
 		log.Debug("group_list")
 		js, _ := json.Marshal(InstanceOfGroupManager().List())
@@ -332,7 +325,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
 
 	case "debug_pool_dump":
 		log.Debug("debug_pool_dump")
-		js, _ := json.Marshal(InstanceOfResourcePool().DebugDump())
+		js, _ := json.Marshal(InstanceOfResourcePool().Dump())
 		w.Header().Set("Content-Type", "application/json")
 		w.Write(js)
 		break
diff --git a/src/resource_pool.go b/src/resource_pool.go
index 7253f67..035b083 100644
--- a/src/resource_pool.go
+++ b/src/resource_pool.go
@@ -46,9 +46,9 @@ type ResourcePool struct {
 	networksFree map[string]bool
 	networkMu    sync.Mutex
 
-	bindings   map[string]map[string]Job
-	bindingsMu sync.Mutex
-	utils      map[string][]UtilGPUTimeSeries
+	bindings      map[string]map[string]Job
+	bindingsMu    sync.Mutex
+	exclusiveJobs map[string]bool
 
 	TotalGPU   int
 	TotalGPUMu sync.Mutex
@@ -71,7 +71,7 @@ func (pool *ResourcePool) init(conf Configuration) {
 	pool.networksFree = map[string]bool{}
 
 	pool.bindings = map[string]map[string]Job{}
-	pool.utils = map[string][]UtilGPUTimeSeries{}
+	pool.exclusiveJobs = map[string]bool{}
 
 	pool.TotalGPU = 0
 	pool.UsingGPU = 0
@@ -323,13 +323,6 @@ func (pool *ResourcePool) update(node NodeStatus) {
 	pool.bindingsMu.Lock()
 	defer pool.bindingsMu.Unlock()
 	for _, gpu := range node.Status {
-		if _, ok := pool.bindings[gpu.UUID]; ok {
-			if _, ok2 := pool.utils[gpu.UUID]; ok2 {
-				pool.utils[gpu.UUID] = append(pool.utils[gpu.UUID],
-					UtilGPUTimeSeries{Time: time.Now().Unix(), Util: gpu.UtilizationGPU})
-			}
-		}
-
 		if _, ok := pool.subscriptions[gpu.UUID]; ok {
 			for jobName := range pool.subscriptions[gpu.UUID] {
 				go func(name string) {
@@ -560,14 +553,6 @@ func (pool *ResourcePool) attach(GPU string, job Job) {
 		pool.bindings[GPU] = map[string]Job{}
 	}
 	pool.bindings[GPU][job.Name] = job
-
-	if _, ok := pool.utils[GPU]; !ok {
-		pool.utils[GPU] = []UtilGPUTimeSeries{}
-	}
-
-	if len(pool.bindings[GPU]) > 1 {
-		delete(pool.utils, GPU)
-	}
 }
 
 func (pool *ResourcePool) detach(GPU string, job Job) {
@@ -580,15 +565,6 @@
 		delete(pool.subscriptions[GPU], job.Name)
 	}
 
-	if _, ok := pool.bindings[GPU]; ok {
-		if _, ok2 := pool.utils[GPU]; ok2 {
-			if len(pool.bindings[GPU]) == 1 && job.Status == Finished {
-				//InstanceOfOptimizer().feed(job.Name, pool.utils[GPU])
-			}
-			delete(pool.utils, GPU)
-		}
-	}
-
 	if list, ok := pool.bindings[GPU]; ok {
 		delete(list, job.Name)
 	}
@@ -599,10 +575,6 @@
 func (pool *ResourcePool) countGPU() (int, int) {
 	return pool.TotalGPU - pool.UsingGPU, pool.UsingGPU
 }
 
-func (pool *ResourcePool) getBindings() map[string]map[string]Job {
-	return pool.bindings
-}
-
 func (pool *ResourcePool) pickNode(candidates []*NodeStatus, availableGPUs map[string][]GPUStatus, task Task, job Job, nodes []NodeStatus) *NodeStatus {
 	/* shuffle */
@@ -727,6 +699,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
 
 	locks := map[int]*sync.Mutex{}
 
+	/* 1-Share, 2-Vacant, 3-PreSchedule */
 	allocationType := 0
 
 	var candidates []NodeStatus
@@ -755,6 +728,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
 
 			for _, status := range node.Status {
 				if status.MemoryAllocated > 0 && status.MemoryTotal > task.MemoryGPU+status.MemoryAllocated {
+					pool.bindingsMu.Lock()
 					if jobs, ok := pool.bindings[status.UUID]; ok {
 						totalUtil := pred.UtilGPU
 						for _, job := range jobs {
@@ -765,6 +739,7 @@
 							available = append(available, status)
 						}
 					}
+					pool.bindingsMu.Unlock()
 				}
 			}
 			if len(available) >= task.NumberGPU {
@@ -794,6 +769,14 @@
 
 			res.Status = availables[node.ClientHost][0:task.NumberGPU]
 			for i := range res.Status {
+				pool.bindingsMu.Lock()
+				if jobsT, okT := pool.bindings[res.Status[i].UUID]; okT {
+					for jobT := range jobsT {
+						delete(pool.exclusiveJobs, jobT)
+					}
+				}
+				pool.bindingsMu.Unlock()
+
 				for j := range node.Status {
 					if res.Status[i].UUID == node.Status[j].UUID {
 						if node.Status[j].MemoryAllocated == 0 {
@@ -864,8 +847,7 @@
 		for _, node := range cur.Nodes {
 			var available []GPUStatus
 			for _, status := range node.Status {
-				bindings := pool.getBindings()
-				if jobs, ok := bindings[status.UUID]; ok {
+				if jobs, ok := pool.bindings[status.UUID]; ok {
 					if len(jobs) > 1 || status.MemoryAllocated == 0 {
 						continue
 					}
@@ -1040,6 +1022,12 @@
 		}
 	}
 
+	pool.bindingsMu.Lock()
+	if allocationType == 2 {
+		pool.exclusiveJobs[job.Name] = true
+	}
+	pool.bindingsMu.Unlock()
+
 	for segID, lock := range locks {
 		log.Debug("Unlock ", segID)
 		lock.Unlock()
@@ -1106,9 +1094,18 @@ func (pool *ResourcePool) SetBatchInterval(interval int) bool {
 	return true
 }
 
-func (pool *ResourcePool) DebugDump() map[string]interface{} {
+func (pool *ResourcePool) isExclusive(jobName string) bool {
+	pool.bindingsMu.Lock()
+	defer pool.bindingsMu.Unlock()
+	_, ok := pool.exclusiveJobs[jobName]
+	/* clear after called */
+	delete(pool.exclusiveJobs, jobName)
+	return ok
+}
+
+func (pool *ResourcePool) Dump() map[string]interface{} {
 	res := map[string]interface{}{}
 	res["batchJobs"] = pool.batchJobs
-	//res["pools"] = pool.pools
+	res["bindings"] = pool.bindings
 	return res
}
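Not part of the patch, only an illustration of the pattern it introduces: ResourcePool now keeps a consume-once "exclusive" flag per job in place of the old per-GPU utils samples. The flag is set when a job receives vacant GPUs (allocationType == 2), cleared for every job already bound to a GPU that a later job is packed onto, and read-and-deleted by isExclusive() when the JobManager decides whether to feed stats and timing to the optimizer. The sketch below is a minimal, self-contained version of that bookkeeping; exclusiveTracker, MarkExclusive, Invalidate and TakeExclusive are hypothetical names, not identifiers from this repository.

package main

import (
	"fmt"
	"sync"
)

// exclusiveTracker is a stand-in for the exclusiveJobs map that the patch
// guards with bindingsMu inside ResourcePool.
type exclusiveTracker struct {
	mu   sync.Mutex
	jobs map[string]bool
}

func newExclusiveTracker() *exclusiveTracker {
	return &exclusiveTracker{jobs: map[string]bool{}}
}

// MarkExclusive corresponds to the vacant-allocation branch: the job was
// placed on GPUs that were previously idle.
func (t *exclusiveTracker) MarkExclusive(job string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.jobs[job] = true
}

// Invalidate corresponds to the share path: once another job lands on the
// same GPU, none of the jobs already bound to it count as exclusive anymore.
func (t *exclusiveTracker) Invalidate(jobsOnGPU []string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, job := range jobsOnGPU {
		delete(t.jobs, job)
	}
}

// TakeExclusive corresponds to isExclusive: report the flag once and clear
// it, so entries do not accumulate after the job master exits.
func (t *exclusiveTracker) TakeExclusive(job string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	_, ok := t.jobs[job]
	delete(t.jobs, job)
	return ok
}

func main() {
	t := newExclusiveTracker()
	t.MarkExclusive("job-a")              // job-a was scheduled onto vacant GPUs
	t.Invalidate([]string{"job-a"})       // a later job shares one of job-a's GPUs
	fmt.Println(t.TakeExclusive("job-a")) // false: job-a's stats should not feed the optimizer
}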