mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-12-15 08:16:43 +00:00
commit 259409c77b, parent cfc9d9f8b2, 2020-05-03 23:32:38 +08:00
6 changed files with 363 additions and 173 deletions
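
The hunks below replace the fixed array of node pools, each guarded by an entry of `pool.poolsMu`, with a circular linked list of pool segments: every search now starts at a randomly chosen segment (`segID`), follows `Next` pointers, and locks each segment on first visit until the walk wraps back to its starting point. The segment type itself is not part of this diff; the sketch below only restates the fields the new code relies on (`ID`, `Lock`, `Nodes`, `Next`) under an assumed name with assumed field types, not the repository's actual definition.

package scheduler // hypothetical package name, for the sketches on this page only

import "sync"

// NodeStatus is defined elsewhere in the scheduler; only its role as a
// map value keyed by client ID matters for these sketches.
type NodeStatus struct{ ClientID string }

// PoolSeg is an assumed name for the segment type the new code walks via
// cur.ID, cur.Lock, cur.Nodes and cur.Next.
type PoolSeg struct {
    ID    int                    // segment index, used as the key of the locks map
    Lock  sync.Mutex             // guards Nodes of this segment
    Nodes map[string]*NodeStatus // nodes in this segment, keyed by ClientID
    Next  *PoolSeg               // successor in the circular list
}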


@@ -331,7 +331,7 @@ func (scheduler *SchedulerFair) Schedule(job Job) {
 }
 func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []NodeStatus) NodeStatus {
-    poolID := rand.Intn(pool.poolsCount)
+    segID := rand.Intn(pool.poolsCount)
     res := NodeStatus{}
     locks := map[int]sync.Mutex{}
@@ -347,13 +347,14 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node
     allocationType = 1
     if util, valid := InstanceOfOptimizer().predictUtilGPU(job.Name); valid {
-        for i := 0; i < pool.poolsCount; i++ {
-            if _, ok := locks[(i+poolID)%pool.poolsCount]; !ok {
-                pool.poolsMu[(i+poolID)%pool.poolsCount].Lock()
-                locks[(i+poolID)%pool.poolsCount] = pool.poolsMu[(i+poolID)%pool.poolsCount]
+        start := pool.pools[segID].Next
+        for cur := start; ; {
+            if _, ok := locks[cur.ID]; !ok {
+                cur.Lock.Lock()
+                locks[cur.ID] = cur.Lock
             }
-            for _, node := range pool.pools[(i+poolID)%pool.poolsCount] {
+            for _, node := range cur.Nodes {
                 var available []GPUStatus
                 for _, status := range node.Status {
                     if status.MemoryTotal > task.MemoryGPU+status.MemoryAllocated && status.MemoryFree > task.MemoryGPU {
@@ -375,7 +376,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node
                     }
                 }
                 if len(available) >= task.NumberGPU {
-                    candidates = append(candidates, &node)
+                    candidates = append(candidates, node)
                     if len(candidates) >= 8 {
                         break
                     }
@@ -384,6 +385,10 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node
             if len(candidates) >= 8 {
                 break
             }
+            cur = cur.Next
+            if cur == start {
+                break
+            }
         }
     }
     //log.Info(candidates)
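
The block above is the first of three search rounds that share one traversal: lock a segment on first visit, scan its nodes for enough suitable GPUs, stop early once eight candidate nodes are collected, and otherwise advance with `cur = cur.Next` until the walk returns to `start`. A standalone sketch of that loop shape, reusing the hypothetical `PoolSeg` from above and keeping the lock bookkeeping in a map of mutex pointers (an assumption; the hunk above declares `locks` as `map[int]sync.Mutex`):

// collectCandidates walks the circular segment list at most once, locking
// each segment the first time it is visited and collecting nodes accepted
// by the want callback, capped at 8 as in the hunks above. Sketch only.
func collectCandidates(start *PoolSeg, locks map[int]*sync.Mutex, want func(*NodeStatus) bool) []*NodeStatus {
    var candidates []*NodeStatus
    for cur := start; ; {
        if _, ok := locks[cur.ID]; !ok {
            cur.Lock.Lock()
            locks[cur.ID] = &cur.Lock
        }
        for _, node := range cur.Nodes {
            if want(node) {
                candidates = append(candidates, node)
                if len(candidates) >= 8 {
                    return candidates // segments stay locked; released later by the caller
                }
            }
        }
        cur = cur.Next
        if cur == start { // wrapped around: every segment has been visited
            break
        }
    }
    return candidates
}

The second and third rounds below follow the same shape, differing only in how a node's GPUs are judged available and in recording `availableGPUs[node.ClientID]`.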
@@ -392,12 +397,13 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node
     /* second round, find vacant gpu */
     if len(candidates) == 0 {
         allocationType = 2
-        for i := 0; i < pool.poolsCount; i++ {
-            if _, ok := locks[(i+poolID)%pool.poolsCount]; !ok {
-                pool.poolsMu[(i+poolID)%pool.poolsCount].Lock()
-                locks[(i+poolID)%pool.poolsCount] = pool.poolsMu[(i+poolID)%pool.poolsCount]
+        start := pool.pools[segID].Next
+        for cur := start; ; {
+            if _, ok := locks[cur.ID]; !ok {
+                cur.Lock.Lock()
+                locks[cur.ID] = cur.Lock
             }
-            for _, node := range pool.pools[(i+poolID)%pool.poolsCount] {
+            for _, node := range cur.Nodes {
                 var available []GPUStatus
                 for _, status := range node.Status {
                     if status.MemoryAllocated == 0 && status.MemoryUsed < 10 {
@@ -405,7 +411,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node
                     }
                 }
                 if len(available) >= task.NumberGPU {
-                    candidates = append(candidates, &node)
+                    candidates = append(candidates, node)
                     availableGPUs[node.ClientID] = available
                     if len(candidates) >= 8 {
                         break
@@ -415,6 +421,10 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node
             if len(candidates) >= 8 {
                 break
             }
+            cur = cur.Next
+            if cur == start {
+                break
+            }
         }
         //log.Info(candidates)
     }
@@ -429,12 +439,13 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node
     if pool.TotalGPU != 0 && float64(scheduler.UsingGPU)/float64(pool.TotalGPU) >= scheduler.enablePreScheduleRatio && valid {
         allocationType = 3
-        for i := 0; i < pool.poolsCount; i++ {
-            if _, ok := locks[(i+poolID)%pool.poolsCount]; !ok {
-                pool.poolsMu[(i+poolID)%pool.poolsCount].Lock()
-                locks[(i+poolID)%pool.poolsCount] = pool.poolsMu[(i+poolID)%pool.poolsCount]
+        start := pool.pools[segID].Next
+        for cur := start; ; {
+            if _, ok := locks[cur.ID]; !ok {
+                cur.Lock.Lock()
+                locks[cur.ID] = cur.Lock
             }
-            for _, node := range pool.pools[(i+poolID)%pool.poolsCount] {
+            for _, node := range cur.Nodes {
                 var available []GPUStatus
                 for _, status := range node.Status {
                     bindings := pool.getBindings()
@@ -455,7 +466,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node
                     }
                 }
                 if len(available) >= task.NumberGPU {
-                    candidates = append(candidates, &node)
+                    candidates = append(candidates, node)
                     availableGPUs[node.ClientID] = available
                     if len(candidates) >= 8 {
                         break
@@ -512,7 +523,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node
     }
     for i := range locks {
-        pool.poolsMu[i].Unlock()
+        locks[i].Unlock()
     }
     go func(res NodeStatus) {
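
Every segment lock taken during the three rounds is recorded in `locks` and released here in a single pass after the selection logic has finished. Since `sync.Mutex` has pointer-receiver methods and must not be copied after first use, the sketch below stores `*sync.Mutex` in the map; that is an assumption about the intent, not a copy of the committed code, which declares `locks` as `map[int]sync.Mutex`.

// releaseSegments unlocks every segment mutex recorded during the search
// rounds. Sketch only, paired with the lock-on-first-visit bookkeeping in
// the collectCandidates sketch above.
func releaseSegments(locks map[int]*sync.Mutex) {
    for id := range locks {
        locks[id].Unlock()
    }
}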
@@ -538,11 +549,15 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node
 }
 func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
-    poolID := pool.getNodePool(agent.ClientID)
-    pool.poolsMu[poolID].Lock()
-    defer pool.poolsMu[poolID].Unlock()
+    segID := pool.getNodePool(agent.ClientID)
+    seg := pool.pools[segID]
+    if seg.Nodes == nil {
+        seg = *seg.Next
+    }
+    seg.Lock.Lock()
+    defer seg.Lock.Unlock()
-    node := pool.pools[poolID][agent.ClientID]
+    node := seg.Nodes[agent.ClientID]
     for _, gpu := range agent.Status {
         for j := range node.Status {
             if gpu.UUID == node.Status[j].UUID {
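
ReleaseResource now resolves the owning segment through `pool.getNodePool(agent.ClientID)`, falls back to the next segment when the looked-up one carries no nodes, and then updates the node's GPU status entries under that segment's lock. `getNodePool` itself is outside this diff; the following stand-in only illustrates the contract it is assumed to provide, namely a stable mapping from a client ID to a segment index.

import "hash/fnv"

// getNodePool (hypothetical stand-in): hash a client ID onto one of
// poolsCount segment slots. The real pool.getNodePool may work differently;
// ReleaseResource only needs the mapping to be stable per client.
func getNodePool(clientID string, poolsCount int) int {
    h := fnv.New32a()
    h.Write([]byte(clientID))
    return int(h.Sum32() % uint32(poolsCount))
}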
@@ -678,9 +693,10 @@ func (scheduler *SchedulerFair) Summary() MsgSummary {
     FreeGPU := 0
     UsingGPU := 0
-    for i := 0; i < pool.poolsCount; i++ {
-        pool.poolsMu[i].Lock()
-        for _, node := range pool.pools[i] {
+    start := pool.pools[0].Next
+    for cur := start; ; {
+        cur.Lock.Lock()
+        for _, node := range cur.Nodes {
             for j := range node.Status {
                 if node.Status[j].MemoryAllocated == 0 {
                     FreeGPU++
@@ -689,7 +705,11 @@ func (scheduler *SchedulerFair) Summary() MsgSummary {
                 }
             }
         }
-        pool.poolsMu[i].Unlock()
+        cur.Lock.Unlock()
+        cur = cur.Next
+        if cur == start {
+            break
+        }
     }
     summary.FreeGPU = FreeGPU
     summary.UsingGPU = UsingGPU
@@ -713,9 +733,10 @@ func (scheduler *SchedulerFair) UpdateNextQueue() {
     MemoryGPU := 0.00001
     CPU := 0.00001
     Memory := 0.0001
-    for i := 0; i < pool.poolsCount; i++ {
-        pool.poolsMu[i].Lock()
-        for _, node := range pool.pools[i] {
+    start := pool.pools[0].Next
+    for cur := start; ; {
+        cur.Lock.Lock()
+        for _, node := range cur.Nodes {
             CPU += float64(node.NumCPU)
             Memory += float64(node.MemTotal)
             for _, card := range node.Status {
@@ -723,7 +744,11 @@ func (scheduler *SchedulerFair) UpdateNextQueue() {
                 MemoryGPU += float64(card.MemoryTotal)
             }
         }
-        pool.poolsMu[i].Unlock()
+        cur.Lock.Unlock()
+        cur = cur.Next
+        if cur == start {
+            break
+        }
     }
     scheduler.queueMu.Lock()
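
Summary and UpdateNextQueue both perform a full walk over every segment, always starting from `pool.pools[0].Next` and stopping once the walk returns to its starting segment, holding each segment's lock only while its nodes are being read. A small helper capturing that repeated shape, again on top of the hypothetical `PoolSeg` sketched above:

// forEachNode visits every node in the circular segment list exactly once,
// holding each segment's lock for the duration of its visit. Sketch only;
// the starting point mirrors pool.pools[0].Next in the hunks above.
func forEachNode(start *PoolSeg, visit func(*NodeStatus)) {
    for cur := start; ; {
        cur.Lock.Lock()
        for _, node := range cur.Nodes {
            visit(node)
        }
        cur.Lock.Unlock()
        cur = cur.Next
        if cur == start {
            break
        }
    }
}

With such a helper, Summary would count free and allocated GPUs inside the callback, and UpdateNextQueue would accumulate its CPU, memory, and GPU totals the same way.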