1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-07 14:21:55 +00:00
This commit is contained in:
Newnius 2020-06-10 21:24:00 +08:00
parent 0503752a51
commit 6eca76eed7
3 changed files with 64 additions and 20 deletions

View File

@ -91,3 +91,13 @@ GPU is occupied by which job(s)
```
?action=debug_optimizer_describe_job&job=
```
**EnableBatchAllocation**
```
?action=pool_enable_batch
```
**DisableBatchAllocation**
```
?action=pool_disable_batch
```

View File

@ -295,6 +295,20 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
w.Write(js) w.Write(js)
break break
case "pool_enable_batch":
log.Debug("pool_enable_batch")
js, _ := json.Marshal(InstanceOfResourcePool().EnableBatch())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "pool_disable_batch":
log.Debug("pool_disable_batch")
js, _ := json.Marshal(InstanceOfResourcePool().DisableBatch())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
default: default:
http.Error(w, "Not Found", http.StatusNotFound) http.Error(w, "Not Found", http.StatusNotFound)
break break

View File

@ -124,28 +124,36 @@ func (pool *ResourcePool) init(conf Configuration) {
}() }()
go func() { go func() {
time.Sleep(time.Second * 10) /* batch allocation */
pool.batchMu.Lock() for {
var tasks []Task time.Sleep(time.Second * 15)
for _, job := range pool.batchJobs { if !pool.enableBatch {
for _, task := range job.Tasks { continue
task.Job = job.Name
tasks = append(tasks, task)
} }
} pool.batchMu.Lock()
job := Job{Tasks: tasks} var tasks []Task
for _, job := range pool.batchJobs {
for _, task := range job.Tasks {
task.Job = job.Name
tasks = append(tasks, task)
}
}
if len(tasks) != 0 {
job := Job{Tasks: tasks}
nodes := pool.doAcquireResource(job) nodes := pool.doAcquireResource(job)
for i, task := range job.Tasks { for i, task := range job.Tasks {
if _, ok := pool.batchAllocations[task.Job]; !ok { if _, ok := pool.batchAllocations[task.Job]; !ok {
pool.batchAllocations[task.Job] = []NodeStatus{} pool.batchAllocations[task.Job] = []NodeStatus{}
}
pool.batchAllocations[task.Job] = append(pool.batchAllocations[task.Job], nodes[i])
}
if len(nodes) > 0 {
pool.batchJobs = []Job{}
}
} }
pool.batchAllocations[task.Job] = append(pool.batchAllocations[task.Job], nodes[i]) pool.batchMu.Unlock()
} }
if len(nodes) > 0 {
pool.batchJobs = []Job{}
}
pool.batchMu.Unlock()
}() }()
} }
@ -885,9 +893,9 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
} }
} }
log.Info(tasks, factor) //log.Info(tasks, factor)
allocation := InstanceOfAllocator().allocate(nodesT, tasks) allocation := InstanceOfAllocator().allocate(nodesT, tasks)
log.Info(allocation) //log.Info(allocation)
if allocation.Flags["valid"] { if allocation.Flags["valid"] {
for range job.Tasks { //append would cause uncertain order for range job.Tasks { //append would cause uncertain order
@ -991,6 +999,18 @@ func (pool *ResourcePool) releaseResource(job Job, agent NodeStatus) {
} }
} }
// EnableBatch switches batch allocation on for this pool by setting the
// enableBatch flag and logging the change.
//
// The flag is written directly, without taking any lock in this method.
// The return value is always true.
func (pool *ResourcePool) EnableBatch() bool {
	const on = true
	pool.enableBatch = on
	log.Info("enableBatch is set to true")
	return on
}
// DisableBatch switches batch allocation off for this pool by clearing the
// enableBatch flag and logging the change.
//
// The flag is written directly, without taking any lock in this method.
// The return value is always true; it does not reflect the new flag value.
func (pool *ResourcePool) DisableBatch() bool {
	const off = false
	pool.enableBatch = off
	log.Info("enableBatch is set to false")
	return true
}
func (pool *ResourcePool) SetShareRatio(ratio float64) bool { func (pool *ResourcePool) SetShareRatio(ratio float64) bool {
pool.enableShareRatio = ratio pool.enableShareRatio = ratio
log.Info("enableShareRatio is updated to ", ratio) log.Info("enableShareRatio is updated to ", ratio)