1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-12-15 08:16:43 +00:00
This commit is contained in:
2020-05-25 21:41:39 +08:00
parent cfd41dae41
commit 35aa64491e
2 changed files with 166 additions and 62 deletions

View File

@@ -250,6 +250,8 @@ func (pool *ResourcePool) saveStatusHistory() {
/* update node info */
func (pool *ResourcePool) update(node NodeStatus) {
pool.poolsMu.Lock()
pool.poolsMu.Unlock()
segID := pool.getNodePool(node.ClientID)
seg := &pool.pools[segID]
if seg.Nodes == nil {
@@ -305,6 +307,7 @@ func (pool *ResourcePool) update(node NodeStatus) {
}
}
} else {
/* TODO: double-check that the node does belong to this seg */
pool.TotalGPUMu.Lock()
pool.TotalGPU += len(node.Status)
pool.TotalGPUMu.Unlock()
@@ -312,7 +315,9 @@ func (pool *ResourcePool) update(node NodeStatus) {
}
seg.Nodes[node.ClientID] = &node
if len(seg.Nodes) > 10 {
pool.scaleSeg(seg)
go func() {
pool.scaleSeg(seg)
}()
}
pool.versions[node.ClientID] = node.Version
}
@@ -320,74 +325,69 @@ func (pool *ResourcePool) update(node NodeStatus) {
/* split seg */
func (pool *ResourcePool) scaleSeg(seg *PoolSeg) {
log.Info("Scaling seg ", seg.ID)
go func() {
pool.poolsMu.Lock()
defer pool.poolsMu.Unlock()
var candidate *PoolSeg
seg.Lock.Lock()
pool.poolsMu.Lock()
defer pool.poolsMu.Unlock()
/* find previous seg */
var pre *PoolSeg
for i := seg.ID + pool.poolsCount - 1; i >= 0; i-- {
if pool.pools[i%pool.poolsCount].Next != seg {
pre = &pool.pools[i%pool.poolsCount]
break
}
var segIDs []int
segIDs = append(segIDs, seg.ID)
/* find previous seg */
var pre *PoolSeg
for i := seg.ID + pool.poolsCount - 1; i >= 0; i-- {
segIDs = append(segIDs, i%pool.poolsCount)
if pool.pools[i%pool.poolsCount].Next != seg {
pre = &pool.pools[i%pool.poolsCount]
break
}
}
step := seg.ID - pre.ID
if step < 0 {
step += pool.poolsCount
distance := seg.ID - pre.ID
if distance < 0 {
distance += pool.poolsCount
}
if distance <= 1 {
log.Warn("Unable to scale, already full")
return
}
candidate := pre
/* walk to the nearest middle */
for i := 0; i < distance/2; i++ {
candidate = candidate.Next
}
/* lock in asc sequence to avoid deadlock */
sort.Ints(segIDs)
for _, id := range segIDs {
pool.pools[id].Lock.Lock()
}
/* update Next */
for cur := pre; cur.ID != candidate.ID; {
cur, cur.Next = cur.Next, candidate
}
/* move nodes */
nodesToMove := map[string]*NodeStatus{}
for _, node := range seg.Nodes {
seg2ID := pool.getNodePool(node.ClientID)
seg2 := &pool.pools[seg2ID]
if seg2.Nodes == nil {
seg2 = seg2.Next
}
/* find seg in the nearest middle */
minDistance := step
for i := 1; i < step; i++ {
distance := i - step/2
if distance < 0 {
distance = -distance
}
if candidate == nil || distance < minDistance {
candidate = &pool.pools[i]
minDistance = distance
}
if seg2 != seg {
nodesToMove[node.ClientID] = node
}
}
for _, node := range nodesToMove {
delete(seg.Nodes, node.ClientID)
}
candidate.Nodes = nodesToMove
/* update Next */
if candidate != nil {
distance := candidate.ID - seg.ID
if distance < 0 {
distance = -distance
}
for i := 0; i < distance; i++ {
pool.pools[(i+pre.ID)%pool.poolsCount].Lock.Lock()
pool.pools[(i+pre.ID)%pool.poolsCount].Next = candidate
pool.pools[(i+pre.ID)%pool.poolsCount].Lock.Unlock()
}
candidate.Lock.Lock()
candidate.Next = seg
/* move nodes */
nodesToMove := map[string]*NodeStatus{}
for _, node := range seg.Nodes {
seg2ID := pool.getNodePool(node.ClientID)
seg2 := &pool.pools[seg2ID]
if seg2.Nodes == nil {
seg2 = seg2.Next
}
if seg2 != seg {
nodesToMove[node.ClientID] = node
}
}
for _, node := range nodesToMove {
delete(seg.Nodes, node.ClientID)
}
candidate.Nodes = nodesToMove
candidate.Lock.Unlock()
}
seg.Lock.Unlock()
}()
for _, id := range segIDs {
pool.pools[id].Lock.Unlock()
}
}
/* get node by ClientID */