diff --git a/src/ga_test.go b/src/ga_test.go
new file mode 100644
index 0000000..a3ae128
--- /dev/null
+++ b/src/ga_test.go
@@ -0,0 +1,104 @@
+package main
+
+import (
+	"github.com/MaxHalford/eaopt"
+	"log"
+	"math"
+	"math/rand"
+	"strconv"
+	"testing"
+	"time"
+)
+
+func TestGA(t *testing.T) {
+	numTask := 20
+
+	nodesMap = map[string]NodeStatus{}
+	tasksMap = map[string]Task{}
+
+	for i := 0; i < numTask*3; i++ {
+		node := NodeStatus{ClientID: strconv.Itoa(i), Rack: strconv.Itoa(i % 40), Domain: strconv.Itoa(i % 4)}
+		node.NumCPU = 24
+		node.MemTotal = 188
+		node.TotalBW = 100
+		cnt := rand.Intn(3) + 1
+		for j := 0; j < cnt; j++ {
+			node.Status = append(node.Status, GPUStatus{MemoryTotal: 11439, MemoryAllocated: 0, UUID: node.ClientID + strconv.Itoa(j)})
+		}
+		nodesMap[strconv.Itoa(i)] = node
+	}
+	for i := 0; i < numTask; i++ {
+		isPS := false
+		if i >= 3 {
+			isPS = true
+		}
+		task := Task{Name: strconv.Itoa(i), IsPS: isPS}
+		task.Memory = 4
+		task.NumberCPU = 2
+		task.NumberGPU = 1
+		tasksMap[strconv.Itoa(i)] = task
+	}
+
+	var nodes []NodeStatus
+	var tasks []Task
+
+	for _, node := range nodesMap {
+		nodes = append(nodes, node)
+	}
+	for _, task := range tasksMap {
+		tasks = append(tasks, task)
+	}
+	s := time.Now()
+	allocation := fastBestFit(nodes, tasks)
+	log.Println(time.Since(s))
+
+	// Instantiate a GA from the default GAConfig
+	ga, err := eaopt.NewDefaultGAConfig().NewGA()
+	if err != nil {
+		log.Println(err)
+		return
+	}
+
+	// Run effectively unbounded; the EarlyStop callback below decides when to stop
+	ga.NGenerations = math.MaxInt32
+	ga.NPops = 1
+	ga.PopSize = 30 + uint(numTask/2)
+
+	// Add a custom print function to track progress
+	ga.Callback = func(ga *eaopt.GA) {
+		log.Printf("Best fitness at generation %d: %f\n", ga.Generations, ga.HallOfFame[0].Fitness)
+	}
+
+	bestFitness := math.MaxFloat64
+	count := 0
+
+	ts := time.Now()
+
+	ga.EarlyStop = func(ga *eaopt.GA) bool {
+		gap := math.Abs(ga.HallOfFame[0].Fitness - bestFitness)
+		if gap <= 0.000001 || ga.HallOfFame[0].Fitness >= bestFitness {
+			if count >= 30 || time.Since(ts) > time.Second*30 {
+				log.Println("Early Stop")
+				return true
+			} else {
+				count++
+			}
+		} else {
+			bestFitness = ga.HallOfFame[0].Fitness
+			count = 1
+		}
+		return false
+	}
+
+	// Find the minimum
+	err = ga.Minimize(VectorFactory)
+	log.Println(time.Since(ts))
+	log.Println(ga.HallOfFame[0].Genome.(Allocation).TasksOnNode)
+	//fmt.Println(ga.HallOfFame[0].Genome.(Allocation).Nodes)
+	if err != nil {
+		log.Println(err)
+		return
+	}
+
+	log.Println(allocation)
+}
diff --git a/src/resource_pool.go b/src/resource_pool.go
index 241882a..4daa98e 100644
--- a/src/resource_pool.go
+++ b/src/resource_pool.go
@@ -250,6 +250,8 @@ func (pool *ResourcePool) saveStatusHistory() {
 
 /* update node info */
 func (pool *ResourcePool) update(node NodeStatus) {
+	pool.poolsMu.Lock()
+	defer pool.poolsMu.Unlock()
 	segID := pool.getNodePool(node.ClientID)
 	seg := &pool.pools[segID]
 	if seg.Nodes == nil {
@@ -305,6 +307,7 @@
 			}
 		}
 	} else {
+		/* TODO: double-check that the node belongs to this seg */
 		pool.TotalGPUMu.Lock()
 		pool.TotalGPU += len(node.Status)
 		pool.TotalGPUMu.Unlock()
@@ -312,7 +315,9 @@
 	}
 	seg.Nodes[node.ClientID] = &node
 	if len(seg.Nodes) > 10 {
-		pool.scaleSeg(seg)
+		go func() {
+			pool.scaleSeg(seg)
+		}()
 	}
 	pool.versions[node.ClientID] = node.Version
 }
@@ -320,74 +325,69 @@
 /* spilt seg */
 func (pool *ResourcePool) scaleSeg(seg *PoolSeg) {
 	log.Info("Scaling seg ", seg.ID)
-	go func() {
-		pool.poolsMu.Lock()
-		defer pool.poolsMu.Unlock()
-		var candidate *PoolSeg
-		seg.Lock.Lock()
-
-		/* find previous seg */
-		var pre *PoolSeg
-		for i := seg.ID + pool.poolsCount - 1; i >= 0; i-- {
-			if pool.pools[i%pool.poolsCount].Next != seg {
-				pre = &pool.pools[i%pool.poolsCount]
-				break
-			}
-		}
-
-		step := seg.ID - pre.ID
-		if step < 0 {
-			step += pool.poolsCount
-		}
-
-		/* find seg in the nearest middle */
-		minDistance := step
-		for i := 1; i < step; i++ {
-			distance := i - step/2
-			if distance < 0 {
-				distance = -distance
-			}
-			if candidate == nil || distance < minDistance {
-				candidate = &pool.pools[i]
-				minDistance = distance
-			}
-
-		}
-
-		/* update Next */
-		if candidate != nil {
-			distance := candidate.ID - seg.ID
-			if distance < 0 {
-				distance = -distance
-			}
-			for i := 0; i < distance; i++ {
-				pool.pools[(i+pre.ID)%pool.poolsCount].Lock.Lock()
-				pool.pools[(i+pre.ID)%pool.poolsCount].Next = candidate
-				pool.pools[(i+pre.ID)%pool.poolsCount].Lock.Unlock()
-			}
-			candidate.Lock.Lock()
-			candidate.Next = seg
-			/* move nodes */
-			nodesToMove := map[string]*NodeStatus{}
-			for _, node := range seg.Nodes {
-				seg2ID := pool.getNodePool(node.ClientID)
-				seg2 := &pool.pools[seg2ID]
-				if seg2.Nodes == nil {
-					seg2 = seg2.Next
-				}
-				if seg2 != seg {
-					nodesToMove[node.ClientID] = node
-				}
-			}
-			for _, node := range nodesToMove {
-				delete(seg.Nodes, node.ClientID)
-			}
-			candidate.Nodes = nodesToMove
-			candidate.Lock.Unlock()
-		}
-		seg.Lock.Unlock()
-	}()
+	pool.poolsMu.Lock()
+	defer pool.poolsMu.Unlock()
+
+	var segIDs []int
+	segIDs = append(segIDs, seg.ID)
+
+	/* find previous seg */
+	var pre *PoolSeg
+	for i := seg.ID + pool.poolsCount - 1; i >= 0; i-- {
+		segIDs = append(segIDs, i%pool.poolsCount)
+		if pool.pools[i%pool.poolsCount].Next != seg {
+			pre = &pool.pools[i%pool.poolsCount]
+			break
+		}
+	}
+
+	distance := seg.ID - pre.ID
+	if distance < 0 {
+		distance += pool.poolsCount
+	}
+	if distance <= 1 {
+		log.Warn("Unable to scale, already full")
+		return
+	}
+
+	candidate := pre
+	/* walk to the nearest middle */
+	for i := 0; i < distance/2; i++ {
+		candidate = candidate.Next
+	}
+
+	/* lock in asc sequence to avoid deadlock */
+	sort.Ints(segIDs)
+	for _, id := range segIDs {
+		pool.pools[id].Lock.Lock()
+	}
+
+	/* update Next */
+	for cur := pre; cur.ID != candidate.ID; {
+		cur, cur.Next = cur.Next, candidate
+	}
+
+	/* move nodes */
+	nodesToMove := map[string]*NodeStatus{}
+	for _, node := range seg.Nodes {
+		seg2ID := pool.getNodePool(node.ClientID)
+		seg2 := &pool.pools[seg2ID]
+		if seg2.Nodes == nil {
+			seg2 = seg2.Next
+		}
+		if seg2 != seg {
+			nodesToMove[node.ClientID] = node
+		}
+	}
+	for _, node := range nodesToMove {
+		delete(seg.Nodes, node.ClientID)
+	}
+	candidate.Nodes = nodesToMove
+
+	for _, id := range segIDs {
+		pool.pools[id].Lock.Unlock()
+	}
 }
 
 /* get node by ClientID */
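Note for reviewers: the test above relies on VectorFactory and the Allocation genome, which are defined elsewhere in the package and not shown in this diff. As a reference for what eaopt expects from them, here is a minimal, self-contained sketch. Only the eaopt.Genome interface and the factory signature accepted by ga.Minimize come from the library; the toy type, its fields, and the placeholder fitness are illustrative assumptions, not the implementation in this repo.

package main

import (
	"math/rand"

	"github.com/MaxHalford/eaopt"
)

// toyAllocation is a hypothetical stand-in for the package's Allocation genome.
// TasksOnNode maps a node ID to the names of the tasks placed on it.
type toyAllocation struct {
	TasksOnNode map[string][]string
	Flags       []float64 // placeholder decision vector
}

// Evaluate returns the fitness that ga.Minimize drives down;
// the real genome would score placement quality here.
func (a toyAllocation) Evaluate() (float64, error) {
	sum := 0.0
	for _, v := range a.Flags {
		sum += v
	}
	return sum, nil
}

// Mutate perturbs one entry of the decision vector in place.
func (a toyAllocation) Mutate(rng *rand.Rand) {
	a.Flags[rng.Intn(len(a.Flags))] = rng.Float64()
}

// Crossover mixes this genome with another by swapping entries.
func (a toyAllocation) Crossover(other eaopt.Genome, rng *rand.Rand) {
	o := other.(toyAllocation)
	for i := range a.Flags {
		if rng.Float64() < 0.5 {
			a.Flags[i], o.Flags[i] = o.Flags[i], a.Flags[i]
		}
	}
}

// Clone deep-copies the genome so the hall of fame keeps a stable snapshot.
func (a toyAllocation) Clone() eaopt.Genome {
	flags := append([]float64(nil), a.Flags...)
	tasks := map[string][]string{}
	for k, v := range a.TasksOnNode {
		tasks[k] = append([]string(nil), v...)
	}
	return toyAllocation{TasksOnNode: tasks, Flags: flags}
}

// toyFactory matches the signature ga.Minimize expects: func(*rand.Rand) eaopt.Genome.
func toyFactory(rng *rand.Rand) eaopt.Genome {
	flags := make([]float64, 10)
	for i := range flags {
		flags[i] = rng.Float64()
	}
	return toyAllocation{TasksOnNode: map[string][]string{}, Flags: flags}
}

With a genome like this plugged into ga.Minimize, the test's EarlyStop hook is what bounds the run: NGenerations stays effectively unlimited, and the search ends once the best fitness has not improved for 30 generations or 30 seconds have elapsed.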