mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-12-12 23:36:44 +00:00

bugfix: ignore shared & pre-scheduled jobs when feeding stats to the optimizer

2020-07-10 20:43:51 +08:00
parent ccdc09bbf7
commit e67a81198e
4 changed files with 43 additions and 55 deletions

View File

@@ -16,14 +16,6 @@
?action=jhl_job_status&job=
```
-**GetBindings**
-GPU is occupied by which job(s)
-```
-?action=get_bindings
-```
#### Scheduler
**EnableSchedule**
```

View File

@@ -162,6 +162,8 @@ func (jm *JobManager) start() {
jm.returnResource(jm.status().Status)
/* feed data to optimizer */
+isExclusive := InstanceOfResourcePool().isExclusive(jm.job.Name)
var stats [][]TaskStatus
for _, vals := range jm.stats {
var stat []TaskStatus
@@ -174,7 +176,9 @@ func (jm *JobManager) start() {
stats = append(stats, stat)
}
}
-InstanceOfOptimizer().FeedStats(jm.job, "PS", stats)
+if isExclusive {
+InstanceOfOptimizer().FeedStats(jm.job, "PS", stats)
+}
stats = [][]TaskStatus{}
for _, vals := range jm.stats {
var stat []TaskStatus
@@ -187,9 +191,11 @@ func (jm *JobManager) start() {
stats = append(stats, stat)
}
}
-InstanceOfOptimizer().FeedStats(jm.job, "Worker", stats)
+if isExclusive {
+InstanceOfOptimizer().FeedStats(jm.job, "Worker", stats)
+}
-if len(jm.job.Tasks) == 1 && !isShare && !isScheduleAhead && jm.job.Status == Finished {
+if len(jm.job.Tasks) == 1 && !isShare && !isScheduleAhead && jm.job.Status == Finished && isExclusive {
InstanceOfOptimizer().FeedTime(jm.job, stats)
}
log.Info("JobMaster exited ", jm.job.Name)
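In the job master, the optimizer now only receives measurements from runs that had their GPUs to themselves. Below is a minimal, self-contained sketch of that gating; the `taskStatus`/`optimizer` types and the `feedIfExclusive` helper are illustrative stand-ins, only the idea of skipping FeedStats/FeedTime for non-exclusive jobs is taken from the diff above.

```
package main

import "fmt"

// taskStatus and optimizer are stand-ins for the scheduler's own types.
type taskStatus struct{ UtilGPU int }

type optimizer struct{}

func (o *optimizer) FeedStats(job, role string, stats [][]taskStatus) {
	fmt.Printf("feed %s stats of %s (%d series)\n", role, job, len(stats))
}

// feedIfExclusive mirrors the new gating in JobManager.start(): utilization
// stats are only forwarded when the job ran exclusively on its GPUs, so
// shared or pre-scheduled runs do not skew the optimizer's predictions.
func feedIfExclusive(opt *optimizer, job string, isExclusive bool, ps, worker [][]taskStatus) {
	if !isExclusive {
		return
	}
	opt.FeedStats(job, "PS", ps)
	opt.FeedStats(job, "Worker", worker)
}

func main() {
	opt := &optimizer{}
	feedIfExclusive(opt, "job-a", true, nil, nil)  // forwarded
	feedIfExclusive(opt, "job-b", false, nil, nil) // dropped
}
```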

View File

@@ -175,13 +175,6 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
w.Write(js)
break
case "get_bindings":
log.Debug("get_bindings")
js, _ := json.Marshal(InstanceOfResourcePool().getBindings())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "group_list":
log.Debug("group_list")
js, _ := json.Marshal(InstanceOfGroupManager().List())
@@ -332,7 +325,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
case "debug_pool_dump":
log.Debug("debug_pool_dump")
-js, _ := json.Marshal(InstanceOfResourcePool().DebugDump())
+js, _ := json.Marshal(InstanceOfResourcePool().Dump())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
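With the `get_bindings` action removed, the GPU-to-job bindings are now only exposed through the debug dump. A hedged sketch of querying it over HTTP; the host, port and URL path are placeholders, only the `?action=debug_pool_dump` query string comes from the handler above.

```
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Placeholder address; the scheduler's real listen address and path
	// are configured elsewhere in the repository.
	resp, err := http.Get("http://127.0.0.1:8080/?action=debug_pool_dump")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The handler writes the JSON produced by ResourcePool.Dump(),
	// which includes the current GPU->job bindings.
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```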

View File

@@ -46,9 +46,9 @@ type ResourcePool struct {
networksFree map[string]bool
networkMu sync.Mutex
-bindings map[string]map[string]Job
-bindingsMu sync.Mutex
-utils map[string][]UtilGPUTimeSeries
+bindings map[string]map[string]Job
+bindingsMu sync.Mutex
+exclusiveJobs map[string]bool
TotalGPU int
TotalGPUMu sync.Mutex
@@ -71,7 +71,7 @@ func (pool *ResourcePool) init(conf Configuration) {
pool.networksFree = map[string]bool{}
pool.bindings = map[string]map[string]Job{}
-pool.utils = map[string][]UtilGPUTimeSeries{}
+pool.exclusiveJobs = map[string]bool{}
pool.TotalGPU = 0
pool.UsingGPU = 0
@@ -323,13 +323,6 @@ func (pool *ResourcePool) update(node NodeStatus) {
pool.bindingsMu.Lock()
defer pool.bindingsMu.Unlock()
for _, gpu := range node.Status {
-if _, ok := pool.bindings[gpu.UUID]; ok {
-if _, ok2 := pool.utils[gpu.UUID]; ok2 {
-pool.utils[gpu.UUID] = append(pool.utils[gpu.UUID],
-UtilGPUTimeSeries{Time: time.Now().Unix(), Util: gpu.UtilizationGPU})
-}
-}
if _, ok := pool.subscriptions[gpu.UUID]; ok {
for jobName := range pool.subscriptions[gpu.UUID] {
go func(name string) {
@@ -560,14 +553,6 @@ func (pool *ResourcePool) attach(GPU string, job Job) {
pool.bindings[GPU] = map[string]Job{}
}
pool.bindings[GPU][job.Name] = job
-if _, ok := pool.utils[GPU]; !ok {
-pool.utils[GPU] = []UtilGPUTimeSeries{}
-}
-if len(pool.bindings[GPU]) > 1 {
-delete(pool.utils, GPU)
-}
}
func (pool *ResourcePool) detach(GPU string, job Job) {
@@ -580,15 +565,6 @@ func (pool *ResourcePool) detach(GPU string, job Job) {
delete(pool.subscriptions[GPU], job.Name)
}
-if _, ok := pool.bindings[GPU]; ok {
-if _, ok2 := pool.utils[GPU]; ok2 {
-if len(pool.bindings[GPU]) == 1 && job.Status == Finished {
-//InstanceOfOptimizer().feed(job.Name, pool.utils[GPU])
-}
-delete(pool.utils, GPU)
-}
-}
if list, ok := pool.bindings[GPU]; ok {
delete(list, job.Name)
}
@@ -599,10 +575,6 @@ func (pool *ResourcePool) countGPU() (int, int) {
return pool.TotalGPU - pool.UsingGPU, pool.UsingGPU
}
-func (pool *ResourcePool) getBindings() map[string]map[string]Job {
-return pool.bindings
-}
func (pool *ResourcePool) pickNode(candidates []*NodeStatus, availableGPUs map[string][]GPUStatus, task Task, job Job, nodes []NodeStatus) *NodeStatus {
/* shuffle */
@@ -727,6 +699,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
locks := map[int]*sync.Mutex{}
+/* 1-Share, 2-Vacant, 3-PreSchedule */
allocationType := 0
var candidates []NodeStatus
@@ -755,6 +728,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
for _, status := range node.Status {
if status.MemoryAllocated > 0 && status.MemoryTotal > task.MemoryGPU+status.MemoryAllocated {
+pool.bindingsMu.Lock()
if jobs, ok := pool.bindings[status.UUID]; ok {
totalUtil := pred.UtilGPU
for _, job := range jobs {
@@ -765,6 +739,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
available = append(available, status)
}
}
+pool.bindingsMu.Unlock()
}
}
if len(available) >= task.NumberGPU {
@@ -794,6 +769,14 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
res.Status = availables[node.ClientHost][0:task.NumberGPU]
for i := range res.Status {
+pool.bindingsMu.Lock()
+if jobsT, okT := pool.bindings[res.Status[i].UUID]; okT {
+for jobT := range jobsT {
+delete(pool.exclusiveJobs, jobT)
+}
+}
+pool.bindingsMu.Unlock()
for j := range node.Status {
if res.Status[i].UUID == node.Status[j].UUID {
if node.Status[j].MemoryAllocated == 0 {
@@ -864,8 +847,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
for _, node := range cur.Nodes {
var available []GPUStatus
for _, status := range node.Status {
-bindings := pool.getBindings()
-if jobs, ok := bindings[status.UUID]; ok {
+if jobs, ok := pool.bindings[status.UUID]; ok {
if len(jobs) > 1 || status.MemoryAllocated == 0 {
continue
}
@@ -1040,6 +1022,12 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
}
}
+pool.bindingsMu.Lock()
+if allocationType == 2 {
+pool.exclusiveJobs[job.Name] = true
+}
+pool.bindingsMu.Unlock()
for segID, lock := range locks {
log.Debug("Unlock ", segID)
lock.Unlock()
@@ -1106,9 +1094,18 @@ func (pool *ResourcePool) SetBatchInterval(interval int) bool {
return true
}
-func (pool *ResourcePool) DebugDump() map[string]interface{} {
+func (pool *ResourcePool) isExclusive(jobName string) bool {
+pool.bindingsMu.Lock()
+defer pool.bindingsMu.Unlock()
+_, ok := pool.exclusiveJobs[jobName]
+/* clear after called */
+delete(pool.exclusiveJobs, jobName)
+return ok
+}
+func (pool *ResourcePool) Dump() map[string]interface{} {
res := map[string]interface{}{}
res["batchJobs"] = pool.batchJobs
//res["pools"] = pool.pools
res["bindings"] = pool.bindings
return res
}
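On the resource-pool side the change amounts to a consume-on-read flag: a job is marked exclusive when it is granted vacant GPUs (allocationType == 2), the mark is dropped as soon as one of its GPUs is handed to another job, and `isExclusive` deletes the entry when the job master reads it. The sketch below models only that bookkeeping; the `exclusiveTracker` type and its method names are illustrative, not the repository's.

```
package main

import (
	"fmt"
	"sync"
)

// exclusiveTracker is a stripped-down model of the bookkeeping added to
// ResourcePool: a set of job names guarded by the same mutex that guards
// the GPU->job bindings.
type exclusiveTracker struct {
	mu            sync.Mutex
	exclusiveJobs map[string]bool
}

func newTracker() *exclusiveTracker {
	return &exclusiveTracker{exclusiveJobs: map[string]bool{}}
}

// markVacantAllocation corresponds to allocationType == 2 in
// doAcquireResource: the job got idle GPUs, so its measurements are trustworthy.
func (t *exclusiveTracker) markVacantAllocation(job string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.exclusiveJobs[job] = true
}

// shareGPUWith corresponds to the new loop over pool.bindings: once a GPU
// already used by other jobs is handed out again, those jobs lose exclusivity.
func (t *exclusiveTracker) shareGPUWith(existingJobs ...string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, j := range existingJobs {
		delete(t.exclusiveJobs, j)
	}
}

// isExclusive mirrors ResourcePool.isExclusive: the flag is cleared after
// it has been read once by the job master.
func (t *exclusiveTracker) isExclusive(job string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	_, ok := t.exclusiveJobs[job]
	delete(t.exclusiveJobs, job)
	return ok
}

func main() {
	t := newTracker()
	t.markVacantAllocation("job-a")
	t.shareGPUWith("job-a")             // job-a's GPU is later shared with another job
	fmt.Println(t.isExclusive("job-a")) // false: its stats will not be fed
}
```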