Mirror of https://github.com/newnius/YAO-scheduler.git

fix deadline

2020-05-25 19:29:35 +08:00
parent aa685ce411
commit fd9f29932e
3 changed files with 33 additions and 33 deletions

View File

@@ -7,5 +7,4 @@ type PoolSeg struct {
     Nodes     map[string]*NodeStatus
     Lock      sync.Mutex
     Next      *PoolSeg
-    IsVirtual bool
 }
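With IsVirtual gone, a PoolSeg is treated as unmaterialized exactly when its Nodes map is nil, which is the convention the "if start.Nodes == nil" guards added in the resource pool changes below rely on. A minimal sketch of that idea, with types trimmed to what the sketch needs and pointer traversal instead of the value copies used in the diff:

package main

import (
    "fmt"
    "sync"
)

// Simplified stand-ins for the types in the diff, reduced to the fields this sketch uses.
type NodeStatus struct {
    ClientID string
}

type PoolSeg struct {
    ID    int
    Nodes map[string]*NodeStatus
    Lock  sync.Mutex
    Next  *PoolSeg
}

func main() {
    // Two segments forming a ring; only seg 1 holds nodes, so seg 0 plays the
    // role the removed IsVirtual flag used to mark explicitly.
    a := &PoolSeg{ID: 0}
    b := &PoolSeg{ID: 1, Nodes: map[string]*NodeStatus{"n1": {ClientID: "n1"}}}
    a.Next, b.Next = b, a

    start := a
    if start.Nodes == nil { // the guard this commit adds before traversals
        start = start.Next
    }
    fmt.Println("start at seg", start.ID, "holding", len(start.Nodes), "node(s)")
}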

View File

@@ -87,11 +87,7 @@ func (pool *ResourcePool) init(conf Configuration) {
     /* init pools */
     pool.poolsCount = 300
     for i := 0; i < pool.poolsCount; i++ {
-        pool.pools = append(pool.pools, PoolSeg{Lock: sync.Mutex{}, IsVirtual: true, ID: i})
-    }
-    /* make non-virtual segs */
-    for i := 0; i < pool.poolsCount/3; i++ {
-        pool.pools[rand.Intn(pool.poolsCount)].IsVirtual = false
+        pool.pools = append(pool.pools, PoolSeg{Lock: sync.Mutex{}, ID: i})
     }
     /* generate working segs */
     for i := 0; i < 10; i++ {
@@ -204,7 +200,10 @@ func (pool *ResourcePool) saveStatusHistory() {
     AvailableMemGPU := 0
     nodesCount := 0
-    start := pool.pools[0].Next
+    start := pool.pools[0]
+    if start.Nodes == nil {
+        start = *start.Next
+    }
     for cur := start; ; {
         cur.Lock.Lock()
         for _, node := range cur.Nodes {
@@ -222,7 +221,7 @@ func (pool *ResourcePool) saveStatusHistory() {
         }
         nodesCount += len(cur.Nodes)
         cur.Lock.Unlock()
-        cur = cur.Next
+        cur = *cur.Next
         if cur.ID == start.ID {
             break
         }
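saveStatusHistory (above) and list (further down) now share one traversal: begin at pools[0], hop to Next when the head segment holds no nodes, then follow Next and stop once the starting ID comes around again. The following is a minimal sketch of that loop under simplified types; it iterates over pointers so the embedded sync.Mutex is never copied, whereas the hunks here advance over value copies (cur = *cur.Next).

package main

import (
    "fmt"
    "sync"
)

type NodeStatus struct{ ClientID string }

type PoolSeg struct {
    ID    int
    Nodes map[string]*NodeStatus
    Lock  sync.Mutex
    Next  *PoolSeg
}

// countNodes walks the segment ring once: skip an empty head seg, then follow
// Next until the starting ID comes around again.
func countNodes(pools []PoolSeg) int {
    start := &pools[0]
    if start.Nodes == nil {
        start = start.Next
    }
    total := 0
    for cur := start; ; {
        cur.Lock.Lock()
        total += len(cur.Nodes)
        cur.Lock.Unlock()
        cur = cur.Next
        if cur.ID == start.ID {
            break
        }
    }
    return total
}

func main() {
    pools := make([]PoolSeg, 3)
    for i := range pools {
        pools[i].ID = i
    }
    // The ring is 0 -> 1 -> 2 -> 0; only segs 1 and 2 are materialized.
    pools[0].Next = &pools[1]
    pools[1].Next = &pools[2]
    pools[2].Next = &pools[0]
    pools[1].Nodes = map[string]*NodeStatus{"a": {}}
    pools[2].Nodes = map[string]*NodeStatus{"b": {}, "c": {}}

    fmt.Println("nodes in ring:", countNodes(pools))
}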
@@ -295,8 +294,8 @@ func (pool *ResourcePool) update(node NodeStatus) {
     pool.counterTotal++
     pool.versionsMu.Lock()
     if version, ok := pool.versions[node.ClientID]; ok && version == node.Version {
-        pool.versionsMu.Unlock()
-        return
+        //pool.versionsMu.Unlock()
+        //return
     }
     pool.versionsMu.Unlock()
     pool.counter++
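For context, update() tracks the last reported Version for each ClientID under versionsMu; before this commit a matching version returned early, and with the return commented out every report is now processed in full. Below is a stripped-down sketch of that per-client version check. It folds the recording step into the helper and assumes a float64 Version field, both simplifications for illustration rather than a copy of the real handler.

package main

import (
    "fmt"
    "sync"
)

// A minimal version tracker in the spirit of pool.versions / pool.versionsMu.
type NodeStatus struct {
    ClientID string
    Version  float64 // the field's real type is not shown in the hunk; float64 is an assumption
}

type tracker struct {
    mu       sync.Mutex
    versions map[string]float64
}

// seen reports whether this exact version was already recorded for the client
// and records it otherwise. Before the commit, a true result short-circuited
// update(); after the commit the caller keeps going regardless.
func (t *tracker) seen(node NodeStatus) bool {
    t.mu.Lock()
    defer t.mu.Unlock()
    if v, ok := t.versions[node.ClientID]; ok && v == node.Version {
        return true
    }
    t.versions[node.ClientID] = node.Version
    return false
}

func main() {
    t := &tracker{versions: map[string]float64{}}
    fmt.Println(t.seen(NodeStatus{ClientID: "n1", Version: 1})) // false: first report
    fmt.Println(t.seen(NodeStatus{ClientID: "n1", Version: 1})) // true: duplicate report
    fmt.Println(t.seen(NodeStatus{ClientID: "n1", Version: 2})) // false: new version
}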
@@ -349,7 +348,6 @@ func (pool *ResourcePool) scaleSeg(seg *PoolSeg) {
     /* find seg in the nearest middle */
     minDistance := step
     for i := 1; i < step; i++ {
-        if !pool.pools[(i+pre.ID)%pool.poolsCount].IsVirtual {
             distance := i - step/2
             if distance < 0 {
                 distance = -distance
@@ -358,7 +356,7 @@ func (pool *ResourcePool) scaleSeg(seg *PoolSeg) {
                candidate = &pool.pools[i]
                minDistance = distance
            }
-        }
     }
     /* update Next */
@@ -417,15 +415,18 @@ func (pool *ResourcePool) getByID(id string) NodeStatus {
 func (pool *ResourcePool) list() MsgResource {
     nodes := map[string]NodeStatus{}
-    start := pool.pools[0].Next
+    start := pool.pools[0]
+    if start.Nodes == nil {
+        start = *start.Next
+    }
     for cur := start; ; {
         cur.Lock.Lock()
         for k, node := range cur.Nodes {
             nodes[k] = *node
         }
         cur.Lock.Unlock()
-        cur = cur.Next
-        if cur == start {
+        cur = *cur.Next
+        if cur.ID == start.ID {
             break
         }
     }
@@ -668,7 +669,7 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
         allocationType = 1
         if util, valid := InstanceOfOptimizer().predictUtilGPU(job.Name); valid {
-            for cur := start; cur.ID < cur.Next.ID; {
+            for cur := start; ; {
                 if _, ok := locks[cur.ID]; !ok {
                     cur.Lock.Lock()
                     locks[cur.ID] = &cur.Lock
@@ -705,10 +706,10 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
                 if len(candidates) >= len(job.Tasks)*3+5 {
                     break
                 }
-                cur = cur.Next
-                if cur.ID == start.ID {
+                if cur.ID > cur.Next.ID {
                     break
                 }
+                cur = cur.Next
             }
         }
         //log.Info(candidates)
@@ -717,7 +718,7 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
     /* second round, find vacant gpu */
     if len(candidates) == 0 {
         allocationType = 2
-        for cur := start; cur.ID < cur.Next.ID; {
+        for cur := start; ; {
             if _, ok := locks[cur.ID]; !ok {
                 cur.Lock.Lock()
                 locks[cur.ID] = &cur.Lock
@@ -740,10 +741,10 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
             if len(candidates) >= len(job.Tasks)*3+5 {
                 break
             }
-            cur = cur.Next
-            if cur.ID == start.ID {
+            if cur.ID > cur.Next.ID {
                 break
             }
+            cur = cur.Next
         }
         //log.Info(candidates)
     }
@@ -758,7 +759,7 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
     if pool.TotalGPU != 0 && float64(pool.UsingGPU)/float64(pool.TotalGPU) >= pool.enablePreScheduleRatio && valid {
         allocationType = 3
-        for cur := start; cur.ID < cur.Next.ID; {
+        for cur := start; ; {
             if _, ok := locks[cur.ID]; !ok {
                 cur.Lock.Lock()
                 locks[cur.ID] = &cur.Lock
@@ -794,6 +795,10 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
             if len(candidates) >= len(job.Tasks)*3+5 {
                 break
             }
+            if cur.ID > cur.Next.ID {
+                break
+            }
+            cur = cur.Next
         }
         //log.Info(candidates)
     }
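All three candidate-search rounds in acquireResource now share the same exit logic: the old loop condition cur.ID < cur.Next.ID was tested before any work and could skip the wrap-around segment, while the new shape processes the current segment first and breaks only once Next would wrap to a smaller ID. A small sketch of that loop shape, with the candidate collection reduced to a callback; it assumes a ring whose IDs ascend until a single wrap point (the real loops also break once enough candidates have been gathered).

package main

import "fmt"

type PoolSeg struct {
    ID   int
    Next *PoolSeg
}

// walkUntilWrap visits segments starting at start and stops after the segment
// whose Next has a smaller ID, i.e. where the ring wraps around: do the work
// for cur first, then test cur.ID > cur.Next.ID, then advance.
func walkUntilWrap(start *PoolSeg, visit func(*PoolSeg)) {
    for cur := start; ; {
        visit(cur)
        if cur.ID > cur.Next.ID { // Next would wrap to the ring's head: stop
            break
        }
        cur = cur.Next
    }
}

func main() {
    // A ring 0 -> 2 -> 5 -> 0 (IDs ascending until the wrap back to 0).
    segs := []*PoolSeg{{ID: 0}, {ID: 2}, {ID: 5}}
    segs[0].Next, segs[1].Next, segs[2].Next = segs[1], segs[2], segs[0]

    visited := []int{}
    walkUntilWrap(segs[1], func(s *PoolSeg) { visited = append(visited, s.ID) })
    fmt.Println("visited:", visited) // visited: [2 5]
}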

View File

@@ -31,8 +31,6 @@ type SchedulerCapacity struct {
     queuesSchedulingCnt map[string]int
     queuesUsingGPUMu    sync.Mutex
-
-    mu sync.Mutex
 }
 
 type FairJobSorter []Job
@@ -133,8 +131,6 @@ func (scheduler *SchedulerCapacity) Start() {
             }()
         } else {
             log.Debug("No more jobs to scheduling ", time.Now())
-            scheduler.schedulingMu.Lock()
-            scheduler.schedulingMu.Unlock()
         }
         scheduler.queueMu.Unlock()
     }