From 259409c77b5cc0eff15d242c42a1c8c2c9eb2de2 Mon Sep 17 00:00:00 2001
From: Newnius
Date: Sun, 3 May 2020 23:32:38 +0800
Subject: [PATCH] resource_pool: switch node pools to a segmented PoolSeg ring

Replace the fixed slice of per-pool node maps with a ring of PoolSeg
segments that can be split as nodes are added, guard the versions map
with versionsMu, and update the FCFS, fair and priority schedulers to
traverse the segment ring instead of indexing pools directly.
---
 src/job_manager.go        |   1 +
 src/resource_pool.go      | 367 ++++++++++++++++++++++++++------------
 src/scheduler_FCFS.go     |  35 ++--
 src/scheduler_fair.go     |  89 +++++----
 src/scheduler_priority.go |  35 ++--
 src/util.go               |   9 +
 6 files changed, 363 insertions(+), 173 deletions(-)

diff --git a/src/job_manager.go b/src/job_manager.go
index e767b64..15cd925 100644
--- a/src/job_manager.go
+++ b/src/job_manager.go
@@ -115,6 +115,7 @@ func (jm *JobManager) start() {
 		v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[i].NumberCPU))
 		v.Set("network", network)
 		v.Set("should_wait", "1")
+		v.Set("HDFS_path", "1")
 
 		resp, err := doRequest("POST", "http://"+jm.resources[i].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
 		if err != nil {
diff --git a/src/resource_pool.go b/src/resource_pool.go
index 010535f..5753341 100644
--- a/src/resource_pool.go
+++ b/src/resource_pool.go
@@ -8,16 +8,14 @@ import (
 	log "github.com/sirupsen/logrus"
 	"math/rand"
 	"strconv"
-	"hash/fnv"
 	"sort"
+	"hash/fnv"
 )
 
 type ResourcePool struct {
-	//mu sync.Mutex
-	//nodes map[string]NodeStatus
-	pools   []map[string]NodeStatus
-	poolsMu []sync.Mutex
 	poolsCount int
+	pools      []PoolSeg
+	poolsMu    sync.Mutex
 
 	history []PoolStatus
 
@@ -28,7 +26,8 @@
 	networksFree map[string]bool
 	networkMu    sync.Mutex
 
-	versions map[string]float64
+	versions   map[string]float64
+	versionsMu sync.Mutex
 
 	counter      int
 	counterTotal int
@@ -40,6 +39,77 @@
 	TotalGPU int
 }
 
+func (pool *ResourcePool) start() {
+	pool.networks = map[string]bool{}
+	pool.networksFree = map[string]bool{}
+	pool.versions = map[string]float64{}
+
+	pool.bindings = map[string]map[string]int{}
+	pool.utils = map[string][]UtilGPUTimeSeries{}
+
+	pool.TotalGPU = 0
+
+	/* init pools */
+	pool.poolsCount = 300
+	for i := 0; i < pool.poolsCount; i++ {
+		pool.pools = append(pool.pools, PoolSeg{Lock: sync.Mutex{}, IsVirtual: true, ID: i})
+	}
+	/* make non-virtual seg */
+	for i := 0; i < pool.poolsCount/3; i++ {
+		pool.pools[rand.Intn(pool.poolsCount)].IsVirtual = false
+	}
+	/* make working seg */
+	for i := 0; i < 10; i++ {
+		pool.pools[rand.Intn(pool.poolsCount)].Nodes = map[string]*NodeStatus{}
+	}
+	/* init Next pointer */
+	var pre *PoolSeg
+	for i := pool.poolsCount*2 - 1; ; i-- {
+		if pool.pools[i%pool.poolsCount].Next != nil {
+			break
+		}
+		pool.pools[i%pool.poolsCount].Next = pre
+		if pool.pools[i%pool.poolsCount].Nodes != nil {
+			pre = &pool.pools[i%pool.poolsCount]
+		}
+	}
+
+	pool.heartBeat = map[string]time.Time{}
+	go func() {
+		pool.checkDeadNodes()
+	}()
+
+	pool.history = []PoolStatus{}
+	go func() {
+		pool.saveStatusHistory()
+	}()
+}
+
+/* check dead nodes periodically */
+func (pool *ResourcePool) checkDeadNodes() {
+	for {
+		pool.heartBeatMu.Lock()
+		for k, v := range pool.heartBeat {
+			if v.Add(time.Second * 30).Before(time.Now()) {
+				poolID := pool.getNodePool(k)
+				seg := &pool.pools[poolID]
+				if seg.Nodes == nil {
+					seg = seg.Next
+				}
+				seg.Lock.Lock()
+				delete(seg.Nodes, k)
+				seg.Lock.Unlock()
+				pool.versionsMu.Lock()
+				delete(pool.versions, k)
+				pool.versionsMu.Unlock()
+				log.Info(" node ", k, " is offline")
+			}
+		}
+		pool.heartBeatMu.Unlock()
+		time.Sleep(time.Second * 10)
+	}
+}
+
 func (pool *ResourcePool) GPUModelToPower(model string) int {
 	mapper := map[string]int{
 		"K40": 1, "Tesla K40": 1,
@@ -58,111 +128,83 @@ func (pool *ResourcePool) getNodePool(name string) int {
 	return int(h.Sum32()) % pool.poolsCount
 }
 
-func (pool *ResourcePool) start() {
-	//TODO: retrieve networks from yao-agent-master in blocking io
-	pool.networks = map[string]bool{}
-	pool.networksFree = map[string]bool{}
-	pool.versions = map[string]float64{}
+/* save pool status periodically */
+func (pool *ResourcePool) saveStatusHistory() {
+	/* waiting for data */
+	time.Sleep(time.Second * 30)
+	for {
+		summary := PoolStatus{}
 
-	pool.bindings = map[string]map[string]int{}
-	pool.utils = map[string][]UtilGPUTimeSeries{}
+		UtilCPU := 0.0
+		TotalCPU := 0
+		TotalMem := 0
+		AvailableMem := 0
 
-	pool.TotalGPU = 0
+		TotalGPU := 0
+		UtilGPU := 0
+		TotalMemGPU := 0
+		AvailableMemGPU := 0
+		nodesCount := 0
 
-	pool.poolsCount = 100
-	for i := 0; i < pool.poolsCount; i++ {
-		pool.pools = append(pool.pools, map[string]NodeStatus{})
-		pool.poolsMu = append(pool.poolsMu, sync.Mutex{})
+		start := pool.pools[0].Next
+		for cur := start; ; {
+			cur.Lock.Lock()
+			for _, node := range cur.Nodes {
+				UtilCPU += node.UtilCPU
+				TotalCPU += node.NumCPU
+				TotalMem += node.MemTotal
+				AvailableMem += node.MemAvailable
+
+				for _, GPU := range node.Status {
+					UtilGPU += GPU.UtilizationGPU
+					TotalGPU ++
+					TotalMemGPU += GPU.MemoryTotal
+					AvailableMemGPU += GPU.MemoryFree
+				}
+			}
+			nodesCount += len(cur.Nodes)
+			cur.Lock.Unlock()
+			cur = cur.Next
+			if cur == start {
+				break
+			}
+		}
+		summary.TimeStamp = time.Now().Format("2006-01-02 15:04:05")
+		summary.UtilCPU = UtilCPU / (float64(nodesCount) + 0.001)
+		summary.TotalCPU = TotalCPU
+		summary.TotalMem = TotalMem
+		summary.AvailableMem = AvailableMem
+		summary.TotalGPU = TotalGPU
+		if TotalGPU == 0 {
+			summary.UtilGPU = 0.0
+		} else {
+			summary.UtilGPU = UtilGPU / TotalGPU
+		}
+		summary.TotalMemGPU = TotalMemGPU
+		summary.AvailableMemGPU = AvailableMemGPU
+
+		pool.history = append(pool.history, summary)
+
+		if len(pool.history) > 60 {
+			pool.history = pool.history[len(pool.history)-60:]
+		}
+
+		pool.TotalGPU = TotalGPU
+		time.Sleep(time.Second * 60)
 	}
-
-	/* check dead nodes */
-	go func() {
-		pool.heartBeat = map[string]time.Time{}
-
-		for {
-			pool.heartBeatMu.Lock()
-			for k, v := range pool.heartBeat {
-				if v.Add(time.Second * 30).Before(time.Now()) {
-					poolID := pool.getNodePool(k)
-					pool.poolsMu[poolID].Lock()
-					delete(pool.pools[poolID], k)
-					delete(pool.versions, k)
-					pool.poolsMu[poolID].Unlock()
-				}
-			}
-			pool.heartBeatMu.Unlock()
-			time.Sleep(time.Second * 10)
-		}
-	}()
-
-	/* save pool status periodically */
-	go func() {
-		/* waiting for data */
-		pool.history = []PoolStatus{}
-		time.Sleep(time.Second * 30)
-		for {
-			summary := PoolStatus{}
-
-			UtilCPU := 0.0
-			TotalCPU := 0
-			TotalMem := 0
-			AvailableMem := 0
-
-			TotalGPU := 0
-			UtilGPU := 0
-			TotalMemGPU := 0
-			AvailableMemGPU := 0
-			nodesCount := 0
-			for i := 0; i < pool.poolsCount; i++ {
-				pool.poolsMu[i].Lock()
-				for _, node := range pool.pools[i] {
-					UtilCPU += node.UtilCPU
-					TotalCPU += node.NumCPU
-					TotalMem += node.MemTotal
-					AvailableMem += node.MemAvailable
-
-					for _, GPU := range node.Status {
-						UtilGPU += GPU.UtilizationGPU
-						TotalGPU ++
-						TotalMemGPU += GPU.MemoryTotal
-						AvailableMemGPU += GPU.MemoryFree
-					}
-				}
-				nodesCount += len(pool.pools[i])
-				pool.poolsMu[i].Unlock()
-			}
-			summary.TimeStamp = time.Now().Format("2006-01-02 15:04:05")
-			summary.UtilCPU = UtilCPU / (float64(nodesCount) + 0.001)
-			summary.TotalCPU = TotalCPU
-			summary.TotalMem = TotalMem
-			summary.AvailableMem = AvailableMem
-			summary.TotalGPU = TotalGPU
-			if TotalGPU == 0 {
-				summary.UtilGPU = 0.0
-			} else {
-				summary.UtilGPU = UtilGPU / TotalGPU
-			}
-			summary.TotalMemGPU = TotalMemGPU
-			summary.AvailableMemGPU = AvailableMemGPU
-
-			pool.history = append(pool.history, summary)
-
-			if len(pool.history) > 60 {
-				pool.history = pool.history[len(pool.history)-60:]
-			}
-
-			pool.TotalGPU = TotalGPU
-			time.Sleep(time.Second * 60)
-		}
-	}()
 }
 
+/* update node info */
 func (pool *ResourcePool) update(node NodeStatus) {
-	poolID := pool.getNodePool(node.ClientID)
-
-	pool.poolsMu[poolID].Lock()
-	defer pool.poolsMu[poolID].Unlock()
+	segID := pool.getNodePool(node.ClientID)
+	seg := &pool.pools[segID]
+	if seg.Nodes == nil {
+		seg = seg.Next
+	}
+	seg.Lock.Lock()
+	defer seg.Lock.Unlock()
+	/* init bindings */
 	go func(node NodeStatus) {
 		pool.bindingsMu.Lock()
 		defer pool.bindingsMu.Unlock()
@@ -180,46 +222,137 @@ func (pool *ResourcePool) update(node NodeStatus) {
 	}(node)
 
 	pool.counterTotal++
+	pool.versionsMu.Lock()
 	if version, ok := pool.versions[node.ClientID]; ok && version == node.Version {
+		pool.versionsMu.Unlock()
 		return
 	}
-
+	pool.versionsMu.Unlock()
+	pool.counter++
 	log.Debug(node.Version, "!=", pool.versions[node.ClientID])
-	pool.counter++
-	status, ok := pool.pools[poolID][node.ClientID]
+	status, ok := seg.Nodes[node.ClientID]
 	if ok {
+		/* retain allocation info */
 		for i, GPU := range status.Status {
 			if GPU.UUID == node.Status[i].UUID {
 				node.Status[i].MemoryAllocated = GPU.MemoryAllocated
 			}
 		}
 	}
-	pool.pools[poolID][node.ClientID] = node
+	seg.Nodes[node.ClientID] = &node
+	if len(seg.Nodes) > 10 {
+		pool.scaleSeg(seg)
+	}
 	pool.versions[node.ClientID] = node.Version
 }
 
+/* split seg */
+func (pool *ResourcePool) scaleSeg(seg *PoolSeg) {
+	go func() {
+		pool.poolsMu.Lock()
+		defer pool.poolsMu.Unlock()
+
+		var candidate *PoolSeg
+		seg.Lock.Lock()
+
+		/* find previous seg */
+		var pre *PoolSeg
+		for i := seg.ID + pool.poolsCount - 1; i >= 0; i-- {
+			if pool.pools[i%pool.poolsCount].Next != seg {
+				pre = &pool.pools[i%pool.poolsCount]
+				break
+			}
+		}
+
+		step := seg.ID - pre.ID
+		if step < 0 {
+			step += pool.poolsCount
+		}
+
+		/* find seg in the nearest middle */
+		minDistance := step
+		for i := 1; i < step; i++ {
+			if !pool.pools[(i+pre.ID)%pool.poolsCount].IsVirtual {
+				distance := i - step/2
+				if distance < 0 {
+					distance = -distance
+				}
+				if candidate == nil || distance < minDistance {
+					candidate = &pool.pools[(i+pre.ID)%pool.poolsCount]
+					minDistance = distance
+				}
+			}
+		}
+
+		/* update Next */
+		if candidate != nil {
+			distance := candidate.ID - seg.ID
+			if distance < 0 {
+				distance = -distance
+			}
+			for i := 0; i < distance; i++ {
+				pool.pools[(i+pre.ID)%pool.poolsCount].Lock.Lock()
+				pool.pools[(i+pre.ID)%pool.poolsCount].Next = candidate
+				pool.pools[(i+pre.ID)%pool.poolsCount].Lock.Unlock()
+			}
+			candidate.Lock.Lock()
+			candidate.Next = seg
+			/* move nodes */
+			nodesToMove := map[string]*NodeStatus{}
+			for _, node := range seg.Nodes {
+				seg2ID := pool.getNodePool(node.ClientID)
+				seg2 := &pool.pools[seg2ID]
+				if seg2.Nodes == nil {
+					seg2 = seg2.Next
+				}
+				if seg2 != seg {
+					nodesToMove[node.ClientID] = node
+				}
+			}
+			for _, node := range nodesToMove {
+				delete(seg.Nodes, node.ClientID)
+			}
+			candidate.Nodes = nodesToMove
+			candidate.Lock.Unlock()
+		}
+		seg.Lock.Unlock()
+	}()
+}
+
+/* get node by ClientID */
 func (pool *ResourcePool) getByID(id string) NodeStatus {
 	poolID := pool.getNodePool(id)
+	seg := &pool.pools[poolID]
+	if seg.Nodes == nil {
+		seg = seg.Next
+	}
+	seg.Lock.Lock()
+	defer seg.Lock.Unlock()
 
-	pool.poolsMu[poolID].Lock()
-	defer pool.poolsMu[poolID].Unlock()
-
-	status, ok := pool.pools[poolID][id]
+	status, ok := seg.Nodes[id]
 	if ok {
-		return status
+		return *status
 	}
 	return NodeStatus{}
 }
 
+/* get all nodes */
 func (pool *ResourcePool) list() MsgResource {
 	nodes := map[string]NodeStatus{}
-	for i := 0; i < pool.poolsCount; i++ {
-		pool.poolsMu[i].Lock()
-		for k, node := range pool.pools[i] {
-			nodes[k] = node
+
+	start := pool.pools[0].Next
+	for cur := start; ; {
+		cur.Lock.Lock()
+
+		for k, node := range cur.Nodes {
+			nodes[k] = *node
+		}
+		cur.Lock.Unlock()
+		cur = cur.Next
+		if cur == start {
+			break
 		}
-		pool.poolsMu[i].Unlock()
 	}
 	return MsgResource{Code: 0, Resource: nodes}
 }
diff --git a/src/scheduler_FCFS.go b/src/scheduler_FCFS.go
index ed7fa91..338a6a1 100644
--- a/src/scheduler_FCFS.go
+++ b/src/scheduler_FCFS.go
@@ -88,12 +88,14 @@ func (scheduler *SchedulerFCFS) Schedule(job Job) {
 }
 
 func (scheduler *SchedulerFCFS) AcquireResource(job Job, task Task, nodes []NodeStatus) NodeStatus {
-	poolID := rand.Intn(pool.poolsCount)
-	pool.poolsMu[poolID].Lock()
-	defer pool.poolsMu[poolID].Unlock()
+	segID := rand.Intn(pool.poolsCount)
+	seg := &pool.pools[segID]
+	if seg.Nodes == nil {
+		seg = seg.Next
+	}
 
 	res := NodeStatus{}
-	for id, node := range pool.pools[poolID] {
+	for id, node := range seg.Nodes {
 		var available []GPUStatus
 		for _, status := range node.Status {
 			if status.MemoryTotal-status.MemoryAllocated >= task.MemoryGPU {
@@ -122,11 +124,15 @@
 }
 
 func (scheduler *SchedulerFCFS) ReleaseResource(job Job, agent NodeStatus) {
-	poolID := rand.Intn(pool.poolsCount)
-	pool.poolsMu[poolID].Lock()
-	defer pool.poolsMu[poolID].Unlock()
+	segID := pool.getNodePool(agent.ClientID)
+	seg := &pool.pools[segID]
+	if seg.Nodes == nil {
+		seg = seg.Next
+	}
+	seg.Lock.Lock()
+	defer seg.Lock.Unlock()
 
-	node := pool.pools[poolID][agent.ClientID]
+	node := seg.Nodes[agent.ClientID]
 	for _, gpu := range agent.Status {
 		for j := range node.Status {
 			if gpu.UUID == node.Status[j].UUID {
@@ -211,9 +217,10 @@
 	FreeGPU := 0
 	UsingGPU := 0
 
-	for i := 0; i < pool.poolsCount; i++ {
-		pool.poolsMu[i].Lock()
-		for _, node := range pool.pools[i] {
+	start := pool.pools[0].Next
+	for cur := start; ; {
+		cur.Lock.Lock()
+		for _, node := range cur.Nodes {
 			for j := range node.Status {
 				if node.Status[j].MemoryAllocated == 0 {
 					FreeGPU++
@@ -222,7 +229,11 @@
 				}
 			}
 		}
-		pool.poolsMu[i].Unlock()
+		cur.Lock.Unlock()
+		cur = cur.Next
+		if cur == start {
+			break
+		}
 	}
 	summary.FreeGPU = FreeGPU
 	summary.UsingGPU = UsingGPU
diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go
index f5d8483..534cbd8 100644
--- a/src/scheduler_fair.go
+++ b/src/scheduler_fair.go
@@ -331,7 +331,7 @@ func (scheduler *SchedulerFair) Schedule(job Job) {
 }
 
 func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []NodeStatus) NodeStatus {
-	poolID := rand.Intn(pool.poolsCount)
+	segID := rand.Intn(pool.poolsCount)
 
 	res := NodeStatus{}
 	locks := map[int]sync.Mutex{}
@@ -347,13 +347,14 @@
 	allocationType = 1
 	if util, valid := InstanceOfOptimizer().predictUtilGPU(job.Name); valid {
 
-		for i := 0; i < pool.poolsCount; i++ {
-			if _, ok := locks[(i+poolID)%pool.poolsCount]; !ok {
-				pool.poolsMu[(i+poolID)%pool.poolsCount].Lock()
-				locks[(i+poolID)%pool.poolsCount] = pool.poolsMu[(i+poolID)%pool.poolsCount]
+		start := pool.pools[segID].Next
+		for cur := start; ; {
+			if _, ok := locks[cur.ID]; !ok {
+				cur.Lock.Lock()
+				locks[cur.ID] = cur.Lock
 			}
 
-			for _, node := range pool.pools[(i+poolID)%pool.poolsCount] {
+			for _, node := range cur.Nodes {
 				var available []GPUStatus
 				for _, status := range node.Status {
 					if status.MemoryTotal > task.MemoryGPU+status.MemoryAllocated && status.MemoryFree > task.MemoryGPU {
@@ -375,7 +376,7 @@
 					}
 				}
 				if len(available) >= task.NumberGPU {
-					candidates = append(candidates, &node)
+					candidates = append(candidates, node)
 					if len(candidates) >= 8 {
 						break
 					}
@@ -384,6 +385,10 @@
 			if len(candidates) >= 8 {
 				break
 			}
+			cur = cur.Next
+			if cur == start {
+				break
+			}
 		}
 	}
 	//log.Info(candidates)
@@ -392,12 +397,13 @@
 	/* second round, find vacant gpu */
 	if len(candidates) == 0 {
 		allocationType = 2
-		for i := 0; i < pool.poolsCount; i++ {
-			if _, ok := locks[(i+poolID)%pool.poolsCount]; !ok {
-				pool.poolsMu[(i+poolID)%pool.poolsCount].Lock()
-				locks[(i+poolID)%pool.poolsCount] = pool.poolsMu[(i+poolID)%pool.poolsCount]
+		start := pool.pools[segID].Next
+		for cur := start; ; {
+			if _, ok := locks[cur.ID]; !ok {
+				cur.Lock.Lock()
+				locks[cur.ID] = cur.Lock
 			}
-			for _, node := range pool.pools[(i+poolID)%pool.poolsCount] {
+			for _, node := range cur.Nodes {
 				var available []GPUStatus
 				for _, status := range node.Status {
 					if status.MemoryAllocated == 0 && status.MemoryUsed < 10 {
@@ -405,7 +411,7 @@
 					}
 				}
 				if len(available) >= task.NumberGPU {
-					candidates = append(candidates, &node)
+					candidates = append(candidates, node)
 					availableGPUs[node.ClientID] = available
 					if len(candidates) >= 8 {
 						break
@@ -415,6 +421,10 @@
 			if len(candidates) >= 8 {
 				break
 			}
+			cur = cur.Next
+			if cur == start {
+				break
+			}
 		}
 		//log.Info(candidates)
 	}
@@ -429,12 +439,13 @@
 		if pool.TotalGPU != 0 && float64(scheduler.UsingGPU)/float64(pool.TotalGPU) >= scheduler.enablePreScheduleRatio && valid {
 			allocationType = 3
-			for i := 0; i < pool.poolsCount; i++ {
-				if _, ok := locks[(i+poolID)%pool.poolsCount]; !ok {
-					pool.poolsMu[(i+poolID)%pool.poolsCount].Lock()
-					locks[(i+poolID)%pool.poolsCount] = pool.poolsMu[(i+poolID)%pool.poolsCount]
+			start := pool.pools[segID].Next
+			for cur := start; ; {
+				if _, ok := locks[cur.ID]; !ok {
+					cur.Lock.Lock()
+					locks[cur.ID] = cur.Lock
 				}
 
-				for _, node := range pool.pools[(i+poolID)%pool.poolsCount] {
+				for _, node := range cur.Nodes {
 					var available []GPUStatus
 					for _, status := range node.Status {
 						bindings := pool.getBindings()
@@ -455,7 +466,7 @@
 						}
 					}
 					if len(available) >= task.NumberGPU {
-						candidates = append(candidates, &node)
+						candidates = append(candidates, node)
 						availableGPUs[node.ClientID] = available
 						if len(candidates) >= 8 {
 							break
@@ -512,7 +523,7 @@
 	}
 
 	for i := range locks {
-		pool.poolsMu[i].Unlock()
+		pool.pools[i].Lock.Unlock()
 	}
 
 	go func(res NodeStatus) {
@@ -538,11 +549,15 @@
 }
 
 func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
-	poolID := pool.getNodePool(agent.ClientID)
-	pool.poolsMu[poolID].Lock()
-	defer pool.poolsMu[poolID].Unlock()
+	segID := pool.getNodePool(agent.ClientID)
+	seg := &pool.pools[segID]
+	if seg.Nodes == nil {
+		seg = seg.Next
+	}
+	seg.Lock.Lock()
+	defer seg.Lock.Unlock()
 
-	node := pool.pools[poolID][agent.ClientID]
+	node := seg.Nodes[agent.ClientID]
 	for _, gpu := range agent.Status {
 		for j := range node.Status {
 			if gpu.UUID == node.Status[j].UUID {
@@ -678,9 +693,10 @@
 	FreeGPU := 0
 	UsingGPU := 0
 
-	for i := 0; i < pool.poolsCount; i++ {
-		pool.poolsMu[i].Lock()
-		for _, node := range pool.pools[i] {
+	start := pool.pools[0].Next
+	for cur := start; ; {
+		cur.Lock.Lock()
+		for _, node := range cur.Nodes {
 			for j := range node.Status {
 				if node.Status[j].MemoryAllocated == 0 {
 					FreeGPU++
@@ -689,7 +705,11 @@
 				}
 			}
 		}
-		pool.poolsMu[i].Unlock()
+		cur.Lock.Unlock()
+		cur = cur.Next
+		if cur == start {
+			break
+		}
 	}
 	summary.FreeGPU = FreeGPU
 	summary.UsingGPU = UsingGPU
@@ -713,9 +733,10 @@
 	MemoryGPU := 0.00001
 	CPU := 0.00001
 	Memory := 0.0001
-	for i := 0; i < pool.poolsCount; i++ {
-		pool.poolsMu[i].Lock()
-		for _, node := range pool.pools[i] {
+	start := pool.pools[0].Next
+	for cur := start; ; {
+		cur.Lock.Lock()
+		for _, node := range cur.Nodes {
 			CPU += float64(node.NumCPU)
 			Memory += float64(node.MemTotal)
 			for _, card := range node.Status {
@@ -723,7 +744,11 @@
 				MemoryGPU += float64(card.MemoryTotal)
 			}
 		}
-		pool.poolsMu[i].Unlock()
+		cur.Lock.Unlock()
+		cur = cur.Next
+		if cur == start {
+			break
+		}
 	}
 
 	scheduler.queueMu.Lock()
diff --git a/src/scheduler_priority.go b/src/scheduler_priority.go
index 2858b4a..7367517 100644
--- a/src/scheduler_priority.go
+++ b/src/scheduler_priority.go
@@ -112,12 +112,14 @@ func (scheduler *SchedulerPriority) Schedule(job Job) {
 }
 
 func (scheduler *SchedulerPriority) AcquireResource(job Job, task Task, nodes []NodeStatus) NodeStatus {
-	poolID := rand.Intn(pool.poolsCount)
-	pool.poolsMu[poolID].Lock()
-	defer pool.poolsMu[poolID].Unlock()
+	segID := rand.Intn(pool.poolsCount)
+	seg := &pool.pools[segID]
+	if seg.Nodes == nil {
+		seg = seg.Next
+	}
 
 	res := NodeStatus{}
-	for id, node := range pool.pools[poolID] {
+	for id, node := range seg.Nodes {
 		var available []GPUStatus
 		for _, status := range node.Status {
 			if status.MemoryTotal-status.MemoryAllocated >= task.MemoryGPU {
@@ -146,11 +148,15 @@
 }
 
 func (scheduler *SchedulerPriority) ReleaseResource(job Job, agent NodeStatus) {
-	poolID := rand.Intn(pool.poolsCount)
-	pool.poolsMu[poolID].Lock()
-	defer pool.poolsMu[poolID].Unlock()
+	segID := pool.getNodePool(agent.ClientID)
+	seg := &pool.pools[segID]
+	if seg.Nodes == nil {
+		seg = seg.Next
+	}
+	seg.Lock.Lock()
+	defer seg.Lock.Unlock()
 
-	node := pool.pools[poolID][agent.ClientID]
+	node := seg.Nodes[agent.ClientID]
 	for _, gpu := range agent.Status {
 		for j := range node.Status {
 			if gpu.UUID == node.Status[j].UUID {
@@ -235,9 +241,10 @@
 	FreeGPU := 0
 	UsingGPU := 0
 
-	for i := 0; i < pool.poolsCount; i++ {
-		pool.poolsMu[i].Lock()
-		for _, node := range pool.pools[i] {
+	start := pool.pools[0].Next
+	for cur := start; ; {
+		cur.Lock.Lock()
+		for _, node := range cur.Nodes {
 			for j := range node.Status {
 				if node.Status[j].MemoryAllocated == 0 {
 					FreeGPU++
@@ -246,7 +253,11 @@
 				}
 			}
 		}
-		pool.poolsMu[i].Unlock()
+		cur.Lock.Unlock()
+		cur = cur.Next
+		if cur == start {
+			break
+		}
 	}
 	summary.FreeGPU = FreeGPU
 	summary.UsingGPU = UsingGPU
diff --git a/src/util.go b/src/util.go
index 17230ef..7feea72 100644
--- a/src/util.go
+++ b/src/util.go
@@ -6,6 +6,7 @@ import (
 	"time"
 	"io"
 	"net/http"
+	"sync"
 )
 
 type Configuration struct {
@@ -195,6 +196,14 @@ type MsgOptimizerPredict struct {
 	Post int `json:"post"`
 }
 
+type PoolSeg struct {
+	ID        int
+	Nodes     map[string]*NodeStatus
+	Lock      sync.Mutex
+	Next      *PoolSeg
+	IsVirtual bool
+}
+
 func str2int(str string, defaultValue int) int {
 	i, err := strconv.Atoi(str)
 	if err == nil {