mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-06 22:01:55 +00:00
Newnius 2020-06-30 16:16:30 +08:00
parent eda386fdf6
commit 51976911e1
3 changed files with 80 additions and 28 deletions

View File

@@ -51,6 +51,7 @@ func (jm *JobManager) start() {
/* sleep random Millisecond to avoid deadlock */
time.Sleep(time.Millisecond * time.Duration(500+rand.Intn(500)))
}
jm.job.StartedAt = time.Now().Unix()
if InstanceOfConfiguration().mock {
jm.scheduler.UpdateProgress(jm.job, Running)
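The added jm.job.StartedAt = time.Now().Unix() stamps the job's actual launch time just before the scheduler is told it is Running; the (currently commented-out) pre-schedule round in the resource pool code below compares elapsed runtime against predicted completion using this field. A minimal sketch of the idea, with a simplified stand-in Job rather than the scheduler's own types:

package main

import (
	"fmt"
	"time"
)

// Job is a simplified stand-in, not the scheduler's type.
type Job struct {
	Name      string
	StartedAt int64 // Unix seconds; zero until the job actually starts
}

func main() {
	job := Job{Name: "demo"}

	// Stamp the start time once, right before the job is reported as Running,
	// so later checks can derive elapsed runtime from the Job value alone.
	job.StartedAt = time.Now().Unix()

	time.Sleep(2 * time.Second)
	elapsed := time.Now().Unix() - job.StartedAt
	fmt.Printf("%s has been running for about %d seconds\n", job.Name, elapsed)
}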

View File

@@ -46,7 +46,7 @@ type ResourcePool struct {
networksFree map[string]bool
networkMu sync.Mutex
bindings map[string]map[string]int
bindings map[string]map[string]Job
bindingsMu sync.Mutex
utils map[string][]UtilGPUTimeSeries
@@ -75,7 +75,7 @@ func (pool *ResourcePool) init(conf Configuration) {
pool.networks = map[string]bool{}
pool.networksFree = map[string]bool{}
pool.bindings = map[string]map[string]int{}
pool.bindings = map[string]map[string]Job{}
pool.utils = map[string][]UtilGPUTimeSeries{}
pool.TotalGPU = 0
@@ -555,7 +555,7 @@ func (pool *ResourcePool) releaseNetwork(network string) {
pool.networkMu.Unlock()
}
func (pool *ResourcePool) attach(GPU string, job string) {
func (pool *ResourcePool) attach(GPU string, job Job) {
pool.subscriptionsMu.Lock()
defer pool.subscriptionsMu.Unlock()
pool.bindingsMu.Lock()
@@ -564,12 +564,12 @@ func (pool *ResourcePool) attach(GPU string, job string) {
if _, ok := pool.subscriptions[GPU]; !ok {
pool.subscriptions[GPU] = map[string]int{}
}
pool.subscriptions[GPU][job] = int(time.Now().Unix())
pool.subscriptions[GPU][job.Name] = int(time.Now().Unix())
if _, ok := pool.bindings[GPU]; !ok {
pool.bindings[GPU] = map[string]int{}
pool.bindings[GPU] = map[string]Job{}
}
pool.bindings[GPU][job] = int(time.Now().Unix())
pool.bindings[GPU][job.Name] = job
if _, ok := pool.utils[GPU]; !ok {
pool.utils[GPU] = []UtilGPUTimeSeries{}
@@ -609,7 +609,7 @@ func (pool *ResourcePool) countGPU() (int, int) {
return pool.TotalGPU - pool.UsingGPU, pool.UsingGPU
}
func (pool *ResourcePool) getBindings() map[string]map[string]int {
func (pool *ResourcePool) getBindings() map[string]map[string]Job {
return pool.bindings
}
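bindings changes from map[string]map[string]int (GPU UUID → job name → attach timestamp) to map[string]map[string]Job, and attach/getBindings change with it, so anything walking a GPU's bindings now gets the full Job value instead of just a timestamp. A rough sketch of the new shape, using simplified stand-in types:

package main

import "fmt"

// Simplified stand-in for the scheduler's Job; only the fields needed here.
type Job struct {
	Name      string
	StartedAt int64
}

func main() {
	// Before this commit: bindings[gpuUUID][jobName] stored an attach timestamp (int).
	// After: it stores the Job itself, so later rounds can feed the bound jobs
	// straight to the optimizer without a separate lookup by name.
	bindings := map[string]map[string]Job{}

	gpu := "GPU-xxxx" // hypothetical UUID
	job := Job{Name: "job-a", StartedAt: 1593500000}

	if _, ok := bindings[gpu]; !ok {
		bindings[gpu] = map[string]Job{}
	}
	bindings[gpu][job.Name] = job

	// Anything iterating a GPU's bindings now sees full Job values.
	for _, j := range bindings[gpu] {
		fmt.Println(j.Name, j.StartedAt)
	}
}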
@@ -743,15 +743,14 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
if pool.TotalGPU == 0 {
return []NodeStatus{}
}
//loadRatio := float64(pool.UsingGPU) / float64(pool.TotalGPU)
loadRatio := float64(pool.UsingGPU) / float64(pool.TotalGPU)
/* first, choose sharable GPUs */
/*
if pool.enableShare && len(job.Tasks) == 1 && task.NumberGPU == 1 && loadRatio >= pool.enableShareRatio {
// check sharable
allocationType = 1
pred := InstanceOfOptimizer().PredictReq(job, "Worker")
availables := map[string][]GPUStatus{}
for cur := start; ; {
if _, ok := locks[cur.ID]; !ok {
cur.Lock.Lock()
@@ -765,14 +764,11 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
if jobs, ok := pool.bindings[status.UUID]; ok {
totalUtil := pred.UtilGPU
for job := range jobs {
if utilT, ok := InstanceOfOptimizer().predictUtilGPU(job); ok {
totalUtil += utilT
} else {
totalUtil += 100
}
for _, job := range jobs {
utilT := InstanceOfOptimizer().PredictReq(job, "Worker").UtilGPU
totalUtil += utilT
}
if totalUtil < 100 {
if totalUtil < 200 {
available = append(available, status)
}
}
@@ -780,6 +776,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
}
if len(available) >= task.NumberGPU {
candidates = append(candidates, *node)
availables[node.ClientHost] = available
if len(candidates) >= len(job.Tasks)*3+5 {
break
}
@@ -793,8 +790,35 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
}
cur = cur.Next
}
if len(candidates) > 0 {
node := candidates[0]
res := NodeStatus{}
res.ClientID = node.ClientID
res.ClientHost = node.ClientHost
res.NumCPU = task.NumberCPU
res.MemTotal = task.Memory
res.Status = availables[node.ClientHost][0:task.NumberGPU]
for i := range res.Status {
for j := range node.Status {
if res.Status[i].UUID == node.Status[j].UUID {
if node.Status[j].MemoryAllocated == 0 {
pool.UsingGPUMu.Lock()
pool.UsingGPU++
pool.UsingGPUMu.Unlock()
}
node.Status[j].MemoryAllocated += task.MemoryGPU
res.Status[i].MemoryTotal = task.MemoryGPU
}
}
}
for _, t := range res.Status {
pool.attach(t.UUID, job)
}
return []NodeStatus{res}
}
}
*/
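In this (now commented-out) first round, each bound job's expected GPU utilization is taken from PredictReq instead of predictUtilGPU, and the sharing threshold is raised from 100 to 200. A hedged sketch of that check, where predictUtil is a hypothetical placeholder for InstanceOfOptimizer().PredictReq(job, "Worker").UtilGPU:

package main

import "fmt"

type Job struct{ Name string }

// predictUtil is a hypothetical stand-in for the optimizer's per-job GPU
// utilization prediction.
func predictUtil(j Job) int { return 60 }

// sharable reports whether a GPU already running the bound jobs can also take
// the incoming job, i.e. the summed predicted utilization stays under threshold.
func sharable(incoming Job, bound map[string]Job, threshold int) bool {
	total := predictUtil(incoming)
	for _, j := range bound {
		total += predictUtil(j)
	}
	return total < threshold
}

func main() {
	bound := map[string]Job{
		"job-a": {Name: "job-a"},
		"job-b": {Name: "job-b"},
	}
	// With the threshold raised from 100 to 200, 60+60+60 = 180 still qualifies.
	fmt.Println(sharable(Job{Name: "job-c"}, bound, 200)) // true
}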
//log.Info(candidates)
/* second round, find vacant gpu */
@@ -832,12 +856,12 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
}
/* third round, find gpu to be released */
/*
if len(candidates) == 0 && len(job.Tasks) == 1 && task.NumberGPU == 1 && pool.enablePreSchedule {
estimate := InstanceOfOptimizer().PredictTime(job)
if loadRatio >= pool.enablePreScheduleRatio {
allocationType = 3
availables := map[string][]GPUStatus{}
for cur := start; ; {
if _, ok := locks[cur.ID]; !ok {
cur.Lock.Lock()
@@ -847,15 +871,14 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
var available []GPUStatus
for _, status := range node.Status {
bindings := pool.getBindings()
if tasks, ok := bindings[status.UUID]; ok {
if len(tasks) > 1 || status.MemoryAllocated == 0 {
if jobs, ok := bindings[status.UUID]; ok {
if len(jobs) > 1 || status.MemoryAllocated == 0 {
continue
}
for taskT, s := range tasks {
est := InstanceOfOptimizer().PredictTime(taskT)
now := (int)(time.Now().Unix())
log.Info(s, now, estimate, est)
if now-s > est.Total-est.Post-estimate.Pre-15 {
for _, jobT := range jobs {
est := InstanceOfOptimizer().PredictTime(jobT)
now := time.Now().Unix()
if int(now-jobT.StartedAt) > est.Total-est.Post-estimate.Pre-15 {
available = append(available, status)
}
}
@@ -863,6 +886,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
}
if len(available) >= task.NumberGPU {
candidates = append(candidates, *node)
availables[node.ClientHost] = available
if len(candidates) >= len(job.Tasks)*3+5 {
break
}
@@ -877,9 +901,35 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
cur = cur.Next
}
//log.Info(candidates)
if len(candidates) > 0 {
node := candidates[0]
res := NodeStatus{}
res.ClientID = node.ClientID
res.ClientHost = node.ClientHost
res.NumCPU = task.NumberCPU
res.MemTotal = task.Memory
res.Status = availables[node.ClientHost][0:task.NumberGPU]
for i := range res.Status {
for j := range node.Status {
if res.Status[i].UUID == node.Status[j].UUID {
if node.Status[j].MemoryAllocated == 0 {
pool.UsingGPUMu.Lock()
pool.UsingGPU++
pool.UsingGPUMu.Unlock()
}
node.Status[j].MemoryAllocated += task.MemoryGPU
res.Status[i].MemoryTotal = task.MemoryGPU
}
}
}
for _, t := range res.Status {
pool.attach(t.UUID, job)
}
return []NodeStatus{res}
}
}
}
*/
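This (also commented-out) third round now measures elapsed runtime from the bound job's own StartedAt instead of the GPU attach timestamp it used before. A simplified sketch of the test, with TimeEstimate standing in for the optimizer's PredictTime result:

package main

import (
	"fmt"
	"time"
)

// Simplified stand-in types, not the scheduler's own.
type Job struct {
	Name      string
	StartedAt int64 // Unix seconds, set when the job started running
}

// TimeEstimate mirrors the shape of the optimizer's time prediction (seconds).
type TimeEstimate struct {
	Pre, Total, Post int
}

// releasableInTime mirrors the rewritten check: a busy GPU stays a candidate
// when the job on it has run long enough that it should finish its useful work
// (Total minus Post) before the incoming job's pre-processing completes,
// with a 15-second margin.
func releasableInTime(running Job, est, incoming TimeEstimate, now int64) bool {
	elapsed := int(now - running.StartedAt)
	return elapsed > est.Total-est.Post-incoming.Pre-15
}

func main() {
	now := time.Now().Unix()
	running := Job{Name: "job-a", StartedAt: now - 500}
	est := TimeEstimate{Pre: 30, Total: 600, Post: 20}       // prediction for the running job
	incoming := TimeEstimate{Pre: 120, Total: 900, Post: 30} // prediction for the new job
	fmt.Println(releasableInTime(running, est, incoming, now)) // 500 > 445 → true
}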
if len(candidates) > 0 {
log.Info("allocationType is ", allocationType)
@@ -967,7 +1017,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
}
}
for _, t := range res.Status {
pool.attach(t.UUID, task.Job)
pool.attach(t.UUID, job)
}
flag := false

View File

@@ -18,6 +18,7 @@ type Job struct {
Priority JobPriority `json:"priority"`
RunBefore int `json:"run_before"`
CreatedAt int `json:"created_at"`
StartedAt int64 `json:"started_at"`
UpdatedAt int `json:"updated_at"`
CreatedBy int `json:"created_by"`
Locality int `json:"locality"`
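With StartedAt added, a job's JSON representation gains a started_at field alongside created_at and updated_at. A small sketch with a trimmed-down Job (the field set here is illustrative, not the full struct):

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed-down Job with only a few of the fields above; the point is the new
// started_at key in the serialized form.
type Job struct {
	Name      string `json:"name"`
	CreatedAt int    `json:"created_at"`
	StartedAt int64  `json:"started_at"`
	UpdatedAt int    `json:"updated_at"`
}

func main() {
	j := Job{Name: "demo", CreatedAt: 1593497700, StartedAt: 1593497790, UpdatedAt: 1593497800}
	b, _ := json.Marshal(j)
	fmt.Println(string(b))
	// {"name":"demo","created_at":1593497700,"started_at":1593497790,"updated_at":1593497800}
}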