diff --git a/src/pool_seg.go b/src/pool_seg.go
index b746cf1..ee8a905 100644
--- a/src/pool_seg.go
+++ b/src/pool_seg.go
@@ -7,5 +7,4 @@ type PoolSeg struct {
 	Nodes     map[string]*NodeStatus
 	Lock      sync.Mutex
 	Next      *PoolSeg
-	IsVirtual bool
 }
diff --git a/src/resource_pool.go b/src/resource_pool.go
index 5f01eca..4d16c4a 100644
--- a/src/resource_pool.go
+++ b/src/resource_pool.go
@@ -87,11 +87,7 @@ func (pool *ResourcePool) init(conf Configuration) {
 	/* init pools */
 	pool.poolsCount = 300
 	for i := 0; i < pool.poolsCount; i++ {
-		pool.pools = append(pool.pools, PoolSeg{Lock: sync.Mutex{}, IsVirtual: true, ID: i})
-	}
-	/* make non-virtual segs */
-	for i := 0; i < pool.poolsCount/3; i++ {
-		pool.pools[rand.Intn(pool.poolsCount)].IsVirtual = false
+		pool.pools = append(pool.pools, PoolSeg{Lock: sync.Mutex{}, ID: i})
 	}
 	/* generate working segs */
 	for i := 0; i < 10; i++ {
@@ -204,7 +200,10 @@ func (pool *ResourcePool) saveStatusHistory() {
 		AvailableMemGPU := 0
 		nodesCount := 0

-		start := pool.pools[0].Next
+		start := pool.pools[0]
+		if start.Nodes == nil {
+			start = *start.Next
+		}
 		for cur := start; ; {
 			cur.Lock.Lock()
 			for _, node := range cur.Nodes {
@@ -222,7 +221,7 @@
 			}
 			nodesCount += len(cur.Nodes)
 			cur.Lock.Unlock()
-			cur = cur.Next
+			cur = *cur.Next
 			if cur.ID == start.ID {
 				break
 			}
@@ -295,8 +294,8 @@ func (pool *ResourcePool) update(node NodeStatus) {
 	pool.counterTotal++
 	pool.versionsMu.Lock()
 	if version, ok := pool.versions[node.ClientID]; ok && version == node.Version {
-		pool.versionsMu.Unlock()
-		return
+		//pool.versionsMu.Unlock()
+		//return
 	}
 	pool.versionsMu.Unlock()
 	pool.counter++
@@ -349,16 +348,15 @@ func (pool *ResourcePool) scaleSeg(seg *PoolSeg) {
 	/* find seg in the nearest middle */
 	minDistance := step
 	for i := 1; i < step; i++ {
-		if !pool.pools[(i+pre.ID)%pool.poolsCount].IsVirtual {
-			distance := i - step/2
-			if distance < 0 {
-				distance = -distance
-			}
-			if candidate == nil || distance < minDistance {
-				candidate = &pool.pools[i]
-				minDistance = distance
-			}
+		distance := i - step/2
+		if distance < 0 {
+			distance = -distance
 		}
+		if candidate == nil || distance < minDistance {
+			candidate = &pool.pools[i]
+			minDistance = distance
+		}
+
 	}

 	/* update Next */
@@ -417,15 +415,18 @@ func (pool *ResourcePool) getByID(id string) NodeStatus {

 func (pool *ResourcePool) list() MsgResource {
 	nodes := map[string]NodeStatus{}
-	start := pool.pools[0].Next
+	start := pool.pools[0]
+	if start.Nodes == nil {
+		start = *start.Next
+	}
 	for cur := start; ; {
 		cur.Lock.Lock()
 		for k, node := range cur.Nodes {
 			nodes[k] = *node
 		}
 		cur.Lock.Unlock()
-		cur = cur.Next
-		if cur == start {
+		cur = *cur.Next
+		if cur.ID == start.ID {
 			break
 		}
 	}
@@ -668,7 +669,7 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
 	allocationType = 1
 	if util, valid := InstanceOfOptimizer().predictUtilGPU(job.Name); valid {

-		for cur := start; cur.ID < cur.Next.ID; {
+		for cur := start; ; {
 			if _, ok := locks[cur.ID]; !ok {
 				cur.Lock.Lock()
 				locks[cur.ID] = &cur.Lock
@@ -705,10 +706,10 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
 			if len(candidates) >= len(job.Tasks)*3+5 {
 				break
 			}
-			cur = cur.Next
-			if cur.ID == start.ID {
+			if cur.ID > cur.Next.ID {
 				break
 			}
+			cur = cur.Next
 		}
 	}
 	//log.Info(candidates)
@@ -717,7 +718,7 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
 	/* second round, find vacant gpu */
 	if len(candidates) == 0 {
 		allocationType = 2
-		for cur := start; cur.ID < cur.Next.ID; {
+		for cur := start; ; {
 			if _, ok := locks[cur.ID]; !ok {
 				cur.Lock.Lock()
 				locks[cur.ID] = &cur.Lock
@@ -740,10 +741,10 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
 			if len(candidates) >= len(job.Tasks)*3+5 {
 				break
 			}
-			cur = cur.Next
-			if cur.ID == start.ID {
+			if cur.ID > cur.Next.ID {
 				break
 			}
+			cur = cur.Next
 		}
 		//log.Info(candidates)
 	}
@@ -758,7 +759,7 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
 	if pool.TotalGPU != 0 &&
 		float64(pool.UsingGPU)/float64(pool.TotalGPU) >= pool.enablePreScheduleRatio && valid {
 		allocationType = 3
-		for cur := start; cur.ID < cur.Next.ID; {
+		for cur := start; ; {
 			if _, ok := locks[cur.ID]; !ok {
 				cur.Lock.Lock()
 				locks[cur.ID] = &cur.Lock
@@ -794,6 +795,10 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
 			if len(candidates) >= len(job.Tasks)*3+5 {
 				break
 			}
+			if cur.ID > cur.Next.ID {
+				break
+			}
+			cur = cur.Next
 		}
 		//log.Info(candidates)
 	}
diff --git a/src/scheduler_capacity.go b/src/scheduler_capacity.go
index 9242a17..07b251c 100644
--- a/src/scheduler_capacity.go
+++ b/src/scheduler_capacity.go
@@ -31,8 +31,6 @@ type SchedulerCapacity struct {
 	queuesSchedulingCnt map[string]int

 	queuesUsingGPUMu sync.Mutex
-
-	mu sync.Mutex
 }

 type FairJobSorter []Job
@@ -133,8 +131,6 @@ func (scheduler *SchedulerCapacity) Start() {
 				}()
 			} else {
 				log.Debug("No more jobs to scheduling ", time.Now())
-				scheduler.schedulingMu.Lock()
-				scheduler.schedulingMu.Unlock()
 			}
 			scheduler.queueMu.Unlock()
 		}
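Note on the traversal pattern these hunks converge on. The sketch below is a minimal reduction, not code from this repo: countNodes and visitOnce are hypothetical helpers, and the structs keep only the fields the loops touch. It assumes what the patch itself relies on, namely that working segs are linked in ascending-ID order into a ring, so cur.ID > cur.Next.ID holds exactly at the highest-ID seg, whose Next wraps back to the lowest. Testing the wrap after the loop body, as the rewritten acquireResource loops do, guarantees the body runs at least once even when the walk starts at the wrap seg; the old `cur.ID < cur.Next.ID` loop condition skipped the body entirely in that case. The sketch also walks by pointer: `start := pool.pools[0]` and `cur = *cur.Next` in saveStatusHistory and list copy PoolSeg by value, including its embedded sync.Mutex (the copy that `go vet`'s copylocks check reports), so the subsequent cur.Lock.Lock() appears to guard only the local copy rather than the shared segment.

package main

import (
	"fmt"
	"sync"
)

// Reduced stand-ins; the real structs carry more state.
type NodeStatus struct{ ClientID string }

type PoolSeg struct {
	ID    int
	Nodes map[string]*NodeStatus
	Lock  sync.Mutex
	Next  *PoolSeg
}

// countNodes mirrors the saveStatusHistory/list walk: start at seg 0,
// hop to its successor if it holds no nodes, then follow Next until the
// walk is back at the starting ID. Traversing via *PoolSeg keeps Lock
// shared instead of locking a per-iteration copy.
func countNodes(pools []PoolSeg) int {
	start := &pools[0]
	if start.Nodes == nil {
		start = start.Next
	}
	total := 0
	for cur := start; ; {
		cur.Lock.Lock()
		total += len(cur.Nodes)
		cur.Lock.Unlock()
		cur = cur.Next
		if cur.ID == start.ID { // full circle
			break
		}
	}
	return total
}

// visitOnce mirrors the rewritten acquireResource loops: the wrap test
// runs after the body, so every seg from start through the highest ID is
// visited at least once. A single self-linked seg never satisfies
// cur.ID > cur.Next.ID, so this variant assumes two or more working segs.
func visitOnce(start *PoolSeg, visit func(*PoolSeg)) {
	for cur := start; ; {
		visit(cur)
		if cur.ID > cur.Next.ID { // highest-ID seg: Next wraps to the lowest
			break
		}
		cur = cur.Next
	}
}

func main() {
	// Ring of three working segs: 0 -> 4 -> 7 -> (back to 0).
	pools := make([]PoolSeg, 3)
	for i, id := range []int{0, 4, 7} {
		pools[i].ID = id
		pools[i].Nodes = map[string]*NodeStatus{fmt.Sprint("n", id): {}}
	}
	for i := range pools {
		pools[i].Next = &pools[(i+1)%len(pools)]
	}
	fmt.Println(countNodes(pools)) // 3
	visitOnce(&pools[0], func(s *PoolSeg) { fmt.Println("seg", s.ID) })
}

The ID-equality exit in countNodes does handle a single-seg ring (a self-link returns to the starting ID after one hop), which is one reason the two exit styles used in the patch are not quite interchangeable.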