From d35e0a57d6cc75a9b2d045d78b4a9415a436d70b Mon Sep 17 00:00:00 2001
From: Newnius
Date: Wed, 27 May 2020 18:04:05 +0800
Subject: [PATCH] update GA

---
 README.md            |   5 +
 src/allocator.go     | 294 +++++++++++++++++++++++++++++++++++++++++++
 src/evaluator.go     | 122 +++---------------
 src/ga.go            | 181 +++++++++-----------------
 src/ga_test.go       | 146 +++------------------
 src/main.go          |   8 ++
 src/pool_status.go   |   4 +-
 src/pool_test.go     |   9 +-
 src/resource_pool.go |   4 +-
 9 files changed, 402 insertions(+), 371 deletions(-)
 create mode 100644 src/allocator.go

diff --git a/README.md b/README.md
index a2a497e..03d631f 100644
--- a/README.md
+++ b/README.md
@@ -75,4 +75,9 @@ GPU is occupied by which job(s)
 **PredictDL**
 ```
 ?action=debug_get_predict_dl&job=lstm&seq=1
+```
+
+**UpdateAllocateStrategy**
+```
+?action=allocator_update_strategy&strategy=bestfit
 ```
\ No newline at end of file
diff --git a/src/allocator.go b/src/allocator.go
new file mode 100644
index 0000000..083659c
--- /dev/null
+++ b/src/allocator.go
@@ -0,0 +1,294 @@
+package main
+
+import (
+    "sync"
+    log "github.com/sirupsen/logrus"
+    "math"
+    "time"
+    "github.com/MaxHalford/eaopt"
+    "math/rand"
+)
+
+var allocatorInstance *Allocator
+var allocatorInstanceLock sync.Mutex
+
+func InstanceOfAllocator() *Allocator {
+    allocatorInstanceLock.Lock()
+    defer allocatorInstanceLock.Unlock()
+
+    if allocatorInstance == nil {
+        allocatorInstance = &Allocator{}
+    }
+    return allocatorInstance
+}
+
+type Allocator struct {
+    allocationStrategy string
+}
+
+func (allocator *Allocator) init(conf Configuration) {
+
+}
+
+func (allocator *Allocator) updateStrategy(strategy string) bool {
+    allocator.allocationStrategy = strategy
+    log.Info("Allocator strategy switched to ", strategy)
+    return true
+}
+
+func (allocator *Allocator) allocate(nodes []NodeStatus, tasks []Task) Allocation {
+    var allocation Allocation
+    switch allocator.allocationStrategy {
+    case "bestfit":
+        allocation = allocator.fastBestFit(nodes, tasks)
+        break
+    case "ga":
+        /* GA only pays off with enough tasks; fall back to best-fit for small jobs */
+        if len(tasks) >= 3 {
+            allocation = allocator.GA(nodes, tasks, false)
+        } else {
+            allocation = allocator.fastBestFit(nodes, tasks)
+        }
+        break
+    case "mixed":
+        if len(tasks) > 3 {
+            allocation = allocator.GA(nodes, tasks, true)
+        } else {
+            allocation = allocator.fastBestFit(nodes, tasks)
+        }
+        break
+    default:
+        allocation = allocator.fastBestFit(nodes, tasks)
+    }
+    return allocation
+}
+
+func (allocator *Allocator) fastBestFit(nodes []NodeStatus, tasks []Task) Allocation {
+    eva := Evaluator{}
+    eva.init(nodes, tasks)
+
+    allocation := Allocation{Flags: map[string]bool{"valid": true}}
+    allocation.TasksOnNode = map[string][]Task{}
+    for _, task := range tasks {
+        minCost := math.MaxFloat64
+        var best *NodeStatus
+        /* a positive spread factor scatters PS tasks, a negative one packs workers */
+        if task.IsPS {
+            eva.factorSpread = 1.0
+        } else {
+            eva.factorSpread = -1.0
+        }
+        for i, node := range nodes {
+            if _, ok := allocation.TasksOnNode[node.ClientID]; !ok {
+                allocation.TasksOnNode[node.ClientID] = []Task{}
+            }
+            /* count idle GPUs on this node */
+            numberGPU := 0
+            for _, gpu := range node.Status {
+                if gpu.MemoryAllocated == 0 {
+                    numberGPU += 1
+                }
+            }
+            if task.NumberGPU > numberGPU {
+                continue
+            }
+            /* tentatively place the task and keep the cheapest node */
+            eva.add(node, task)
+            cost := eva.calculate()
+            eva.remove(node, task)
+            if cost < minCost || best == nil {
+                minCost = cost
+                best = &nodes[i]
+            }
+        }
+        if best == nil {
+            allocation.Flags["valid"] = false
+            break
+        } else {
+            //log.Info(task, " choose ", best.ClientID)
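+            /* commit the chosen node: record the placement, update the
+               evaluator state, and mark the requested number of GPUs as
+               used so that later tasks see them as taken */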
+            allocation.TasksOnNode[best.ClientID] = append(allocation.TasksOnNode[best.ClientID], task)
+            eva.add(*best, task)
+            cnt := task.NumberGPU
+            for i := range best.Status {
+                if best.Status[i].MemoryAllocated == 0 {
+                    best.Status[i].MemoryAllocated += task.MemoryGPU
+                    cnt--
+                    if cnt == 0 {
+                        break
+                    }
+                }
+            }
+        }
+    }
+    log.Info("BestFit Cost:", eva.calculate())
+    return allocation
+}
+
+func (allocator *Allocator) GA(nodes []NodeStatus, tasks []Task, useBestFit bool) Allocation {
+    // Instantiate a GA with a GAConfig
+    var ga, err = eaopt.NewDefaultGAConfig().NewGA()
+    if err != nil {
+        log.Warn(err)
+        return Allocation{Flags: map[string]bool{"valid": false}}
+    }
+
+    // Set the number of generations to run for
+    ga.NGenerations = math.MaxInt32
+    ga.NPops = 1
+    ga.PopSize = 30 + uint(len(tasks)/2)
+
+    // Add a custom print function to track progress
+    ga.Callback = func(ga *eaopt.GA) {
+        log.Info("Best fitness at generation ", ga.Generations, ": ", ga.HallOfFame[0].Fitness)
+    }
+
+    /* remember the best allocation seen so far */
+    best := Allocation{Flags: map[string]bool{"valid": false}}
+    bestFitness := math.MaxFloat64
+    count := 0
+
+    ts := time.Now()
+    ga.EarlyStop = func(ga *eaopt.GA) bool {
+        improvement := -(ga.HallOfFame[0].Fitness - bestFitness)
+        if improvement <= 0.000001 {
+            /* stop after enough stagnant generations or a 30s time budget */
+            if count >= 30+len(tasks) || time.Since(ts) > time.Second*30 {
+                return true
+            } else {
+                count++
+            }
+        } else {
+            bestFitness = ga.HallOfFame[0].Fitness
+            count = 1
+            best = ga.HallOfFame[0].Genome.(Allocation)
+        }
+        return false
+    }
+
+    var Factory = func(rng *rand.Rand) eaopt.Genome {
+        allocation := Allocation{TasksOnNode: map[string][]Task{}, Nodes: map[string]NodeStatus{}, Flags: map[string]bool{"valid": true}}
+
+        var nodesT []NodeStatus
+        for _, node := range nodes {
+            /* copy in order not to modify original data */
+            nodesT = append(nodesT, node.Copy())
+        }
+        for _, node := range nodesT {
+            allocation.Nodes[node.ClientID] = node
+            allocation.NodeIDs = append(allocation.NodeIDs, node.ClientID)
+        }
+        for _, task := range tasks {
+            allocation.Tasks = append(allocation.Tasks, task)
+        }
+
+        /* shuffle the task order */
+        for n := len(tasks); n > 0; n-- {
+            randIndex := rng.Intn(n)
+            allocation.Tasks[n-1], allocation.Tasks[randIndex] = allocation.Tasks[randIndex], allocation.Tasks[n-1]
+        }
+
+        /* seed the population with a mix of best-fit, first-fit and random-fit individuals */
+        t := rng.Int() % 10
+        if t == 0 && useBestFit {
+            /* best-fit */
+            allocation.TasksOnNode = allocator.fastBestFit(nodesT, tasks).TasksOnNode
+        } else if t%2 == 0 {
+            /* first-fit */
+            for _, task := range tasks {
+                if nodeID, ok := firstFit(allocation, task); ok {
+                    allocation.TasksOnNode[nodeID] = append(allocation.TasksOnNode[nodeID], task)
+                    cnt := task.NumberGPU
+                    for i := range allocation.Nodes[nodeID].Status {
+                        if allocation.Nodes[nodeID].Status[i].MemoryAllocated == 0 {
+                            allocation.Nodes[nodeID].Status[i].MemoryAllocated += task.MemoryGPU
+                            cnt--
+                            if cnt == 0 {
+                                break
+                            }
+                        }
+                    }
+                } else {
+                    allocation.Flags["valid"] = false
+                    break
+                }
+            }
+        } else {
+            /* random-fit */
+            for _, task := range tasks {
+                if nodeID, ok := randomFit(allocation, task); ok {
+                    allocation.TasksOnNode[nodeID] = append(allocation.TasksOnNode[nodeID], task)
+                    cnt := task.NumberGPU
+                    for i := range allocation.Nodes[nodeID].Status {
+                        if allocation.Nodes[nodeID].Status[i].MemoryAllocated == 0 {
+                            allocation.Nodes[nodeID].Status[i].MemoryAllocated += task.MemoryGPU
+                            cnt--
+                            if cnt == 0 {
+                                break
+                            }
+                        }
+                    }
+                } else {
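+                    /* no node has enough idle GPUs left; mark the genome
+                       invalid so Evaluate() gives it the worst fitness */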
+                    allocation.Flags["valid"] = false
+                    break
+                }
+            }
+        }
+        return allocation
+    }
+
+    // Find the minimum
+    err = ga.Minimize(Factory)
+    log.Info("GA uses ", time.Since(ts))
+    if err != nil {
+        log.Warn(err)
+        return Allocation{Flags: map[string]bool{"valid": false}}
+    }
+    return best
+}
+
+/* randomFit returns some node with enough idle GPUs for the task,
+   relying on Go's randomized map iteration order */
+func randomFit(allocation Allocation, task Task) (string, bool) {
+    flag := false
+    nodeID := ""
+    for nodeID = range allocation.Nodes {
+        numberGPU := 0
+        for _, gpu := range allocation.Nodes[nodeID].Status {
+            if gpu.MemoryAllocated == 0 {
+                numberGPU += 1
+            }
+        }
+        if task.NumberGPU <= numberGPU {
+            flag = true
+            break
+        }
+    }
+    return nodeID, flag
+}
+
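+/* firstFit scans the nodes in NodeIDs order and returns the first
+   one that still has enough idle GPUs for the task */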
+func firstFit(allocation Allocation, task Task) (string, bool) {
+    flag := false
+    nodeID := ""
+    for _, nodeID = range allocation.NodeIDs {
+        if _, ok := allocation.Nodes[nodeID]; !ok {
+            continue
+        }
+        numberGPU := 0
+        for _, gpu := range allocation.Nodes[nodeID].Status {
+            if gpu.MemoryAllocated == 0 {
+                numberGPU += 1
+            }
+        }
+        if task.NumberGPU <= numberGPU {
+            flag = true
+            break
+        }
+    }
+    return nodeID, flag
+}
diff --git a/src/evaluator.go b/src/evaluator.go
index 95ac17e..ff82eca 100644
--- a/src/evaluator.go
+++ b/src/evaluator.go
@@ -34,8 +34,6 @@ func (eva *Evaluator) init(nodes []NodeStatus, tasks []Task) {
 }
 
 func (eva *Evaluator) add(node NodeStatus, task Task) {
-    /* update node load cost */
-
     /* update network cost */
     if _, ok := eva.nodes[node.ClientID]; !ok {
         eva.nodes[node.ClientID] = map[string]int{"PS": 0, "Worker": 0}
@@ -66,17 +64,19 @@ func (eva *Evaluator) add(node NodeStatus, task Task) {
         eva.totalWorker++
     }
 
+    /* update node load cost */
     numberGPU := 1
     for _, gpu := range node.Status {
         if gpu.MemoryAllocated != 0 {
             numberGPU += 1
         }
     }
-    eva.costLoad += float64(numberGPU) / float64(len(node.Status))
+    eva.costLoad += float64(numberGPU+task.NumberGPU) / float64(len(node.Status))
 }
 
 func (eva *Evaluator) remove(node NodeStatus, task Task) {
+    /* update network cost */
     if task.IsPS {
         eva.costNetwork -= eva.factorNode * float64(eva.racks[node.Rack]["Worker"]-eva.nodes[node.ClientID]["Worker"])
         eva.costNetwork -= eva.factorRack * float64(eva.domains[node.Domain]["Worker"]-eva.racks[node.Rack]["Worker"])
@@ -91,123 +91,31 @@ func (eva *Evaluator) remove(node NodeStatus, task Task) {
         eva.costNetwork -= eva.factorRack * float64(eva.domains[node.Domain]["PS"]-eva.racks[node.Rack]["PS"])
         eva.costNetwork -= eva.factorDomain * float64(eva.totalPS-eva.domains[node.Domain]["PS"])
 
-        //fmt.Println(eva.totalWorker, eva.domains[node.Domain])
-
         eva.nodes[node.ClientID]["Worker"]--
         eva.racks[node.Rack]["Worker"]--
         eva.domains[node.Domain]["Worker"]--
         eva.totalWorker--
     }
 
+    /* update node load cost */
     numberGPU := 1
     for _, gpu := range node.Status {
         if gpu.MemoryAllocated != 0 {
             numberGPU += 1
         }
     }
-    eva.costLoad -= float64(numberGPU) / float64(len(node.Status))
+    eva.costLoad -= float64(numberGPU+task.NumberGPU) / float64(len(node.Status))
 }
 
 func (eva *Evaluator) calculate() float64 {
-    return eva.costNetwork + eva.factorSpread*eva.costLoad/float64(eva.totalPS+eva.totalWorker)
-}
-
-func evaluate(allocation Allocation) float64 {
-    /* Calculate cost for network */
-    costNetwork := 0.0
-    domains := map[string]map[string]int{}
-    racks := map[string]map[string]int{}
-    upstreams := map[string]string{}
-    totalPS := 0
-    totalWorker := 0
-
-    taskToNode := map[string]string{}
-    for nodeID, tasks := range allocation.TasksOnNode {
-        numPS := 0
-        numWorker := 0
-        node := allocation.Nodes[nodeID]
-        for _, task := range tasks {
-            taskToNode[task.Name] = nodeID
-
-            if _, ok := domains[node.Domain]; !ok {
-                domains[node.Domain] = map[string]int{"PS": 0, "Worker": 0}
-            }
-            if _, ok := racks[node.Rack]; !ok {
-                racks[node.Rack] = map[string]int{"PS": 0, "Worker": 0}
-            }
-
-            if task.IsPS {
-                domains[node.Domain]["PS"]++
-                racks[node.Rack]["PS"]++
-                numPS++
-                totalPS++
-            } else {
-                domains[node.Domain]["Worker"]++
-                racks[node.Rack]["Worker"]++
-                numWorker++
-                totalWorker++
-            }
-            upstreams[node.Rack] = node.Domain
-        }
-        costNetwork -= float64(numPS * numWorker)
-    }
-
-    /* in the same domain */
-    for rackID, pair := range racks {
-        // in the same rack
-        costNetwork += float64(pair["PS"]*pair["Worker"]) * 1.0
-        // cross rack, but in the same domain
-        costNetwork += float64(pair["PS"]*(domains[upstreams[rackID]]["Worker"]-pair["Worker"])) * 4.0
-    }
-
-    /* across domain */
-    for _, pair := range domains {
-        costNetwork += float64(pair["PS"]*(totalWorker-pair["Worker"])) * 40.0
-    }
-
-    /* calculate cost for node fitness */
-    //cpu, memory, bw
-    costLB := 0.0
-    for nodeID, tasks := range allocation.TasksOnNode {
-        costCPU := 0.0
-        costMem := 0.0
-        costBW := 0.0
-        costGPU := 0.0
-        requestCPU := 0
-        requestMem := 0
-        requestBW := 0.0
-        requestGPU := 0
-        numberPS := 0
-        numberWorker := 0
-        for _, task := range tasks {
-            requestCPU += task.NumberCPU
-            requestMem += task.Memory
-            requestGPU += task.NumberGPU
-            if task.IsPS {
-                numberPS++
-            } else {
-                numberWorker++
-            }
-        }
-        requestBW = float64(numberPS*(totalWorker-numberWorker) + numberWorker*(totalPS-numberPS))
-        node := allocation.Nodes[nodeID]
-        costCPU += (float64(requestCPU) + node.UtilCPU) / float64(node.NumCPU) * 1.0
-        costMem += (float64(requestMem + (node.MemTotal - node.MemAvailable))) / float64(node.MemTotal) * 1.0
-        costBW += (float64(requestBW) + (node.TotalBW - node.UsingBW)) / node.TotalBW * 2.0
-        numberGPU := 0
-        for _, gpu := range node.Status {
-            if gpu.MemoryAllocated == 0 {
-                numberGPU += 0
-            }
-        }
-        costGPU += (float64(requestGPU + numberGPU)) / float64(len(node.Status)) * 3.0
-        costLB += (costCPU + costMem + costBW + costGPU) / (1.0 + 1.0 + 2.0 + 3.0)
-    }
-    costLB /= float64(len(allocation.TasksOnNode))
-    costLB *= 100
-
-    cost := costNetwork
-    //cost := 0.0*costLB + 1.0*costNetwork
-    return cost
+    usingNodes := 0.0
+    for _, pair := range eva.nodes {
+        if v, ok := pair["PS"]; ok && v > 0 {
+            usingNodes += 1.0
+        } else if v, ok := pair["Worker"]; ok && v > 0 {
+            usingNodes += 1.0
+        }
+    }
+    usingNodes /= float64(eva.totalWorker + eva.totalPS)
+    return eva.costNetwork + eva.factorSpread*eva.costLoad/float64(eva.totalPS+eva.totalWorker) + usingNodes
 }
diff --git a/src/ga.go b/src/ga.go
index 4240843..1d8ff77 100644
--- a/src/ga.go
+++ b/src/ga.go
@@ -17,115 +17,13 @@ type Allocation struct {
     Tasks []Task
 }
 
-func randomFit(allocation Allocation, task Task) (string, bool) {
-    flag := false
-    nodeID := ""
-    for nodeID = range allocation.Nodes {
-        numberGPU := 0
-        for _, gpu := range allocation.Nodes[nodeID].Status {
-            if gpu.MemoryAllocated == 0 {
-                numberGPU += 1
-            }
-        }
-        if task.NumberGPU <= numberGPU {
-            flag = true
-            break
-        }
-    }
-    return nodeID, flag
-}
-
-func firstFit(allocation Allocation, task Task) (string, bool) {
-    flag := false
-    nodeID := ""
-    for _, nodeID = range allocation.NodeIDs {
-        if _, ok := allocation.Nodes[nodeID]; !ok {
-            continue
-        }
-        numberGPU := 0
-        for _, gpu := range allocation.Nodes[nodeID].Status {
-            if gpu.MemoryAllocated == 0 {
-                numberGPU += 1
-            }
-        }
-        if task.NumberGPU <= numberGPU {
-            flag = true
-            break
-        }
-    }
-    return nodeID, flag
-}
-
-func fastBestFit(nodes []NodeStatus, tasks []Task) Allocation {
-    log.Info(nodes)
-    log.Info(tasks)
-    eva := Evaluator{}
-    eva.init(nodes, tasks)
-
-    allocation := Allocation{Flags: map[string]bool{"valid": true}}
-    allocation.TasksOnNode = map[string][]Task{}
-    for _, task := range tasks {
-        minCost := math.MaxFloat64
-        var best *NodeStatus
-        if task.IsPS {
-            eva.factorSpread = 1.0
-        } else {
-            eva.factorSpread = -1.0
-        }
-        for i, node := range nodes {
-            if _, ok := allocation.TasksOnNode[node.ClientID]; !ok {
-                allocation.TasksOnNode[node.ClientID] = []Task{}
-            }
-            numberGPU := 0
-            for _, gpu := range node.Status {
-                if gpu.MemoryAllocated == 0 {
-                    numberGPU += 1
-                }
-            }
-            if task.NumberGPU > numberGPU {
-                continue
-            }
-            eva.add(node, task)
-            cost := eva.calculate()
-            eva.remove(node, task)
-            //log.Info(node, cost)
-            if cost < minCost || best == nil {
-                minCost = cost
-                best = &nodes[i]
-            }
-        }
-        if best == nil {
-            allocation.Flags["valid"] = false
-            break
-        } else {
-            log.Info(task, " choose ", best.ClientID)
-            allocation.TasksOnNode[best.ClientID] = append(allocation.TasksOnNode[best.ClientID], task)
-            eva.add(*best, task)
-            for i := range best.Status {
-                //allocate more than 1
-                if best.Status[i].MemoryAllocated == 0 {
-                    best.Status[i].MemoryAllocated += task.MemoryGPU
-                    break
-                }
-            }
-        }
-    }
-    log.Println("BestFit Cost:", eva.calculate())
-    return allocation
-}
-
 /* Evaluate the allocation */
 func (X Allocation) Evaluate() (float64, error) {
-    //log.Info(X)
     if !X.Flags["valid"] {
         //fmt.Println("Invalid allocation")
         return math.MaxFloat64, nil
     }
 
-    //costNetwork := evaluate(X)
-
     var nodes []NodeStatus
     for _, node := range X.Nodes {
         nodes = append(nodes, node)
@@ -141,19 +39,18 @@ func (X Allocation) Evaluate() (float64, error) {
     cost := eva.calculate()
     //log.Info(cost)
 
-    //fmt.Println(taskToNode, cost, len(X.Nodes))
-    return float64(cost), nil
+    /* penalize allocations that spread the job across more nodes */
+    return float64(cost) + float64(len(X.Nodes))/float64(len(X.Tasks)), nil
 }
 
-// Mutate a Vector by resampling each element from a normal distribution with
-// probability 0.8.
+// Mutate an Allocation by removing a few randomly chosen nodes and
+// rescheduling the tasks that were placed on them via first-fit.
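+// A mutation can leave the genome invalid when a displaced task no
+// longer fits on any node; Evaluate() then rejects such genomes.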
 func (X Allocation) Mutate(rng *rand.Rand) {
     /* remove a node randomly */
-    // make sure rng.Intn != 0 && cnt >0
-    cnt := rng.Intn(1+len(X.Nodes)/100)%50 + 1
-    for i := 0; i < cnt; i++ {
+    // make sure n > 0 && round > 0
+    round := rng.Intn(1+len(X.Nodes)/100)%50 + 1
+    for i := 0; i < round; i++ {
         if !X.Flags["valid"] {
-            //fmt.Println("Invalid allocation")
             return
         }
         //fmt.Println("Mutate")
@@ -174,7 +71,10 @@
             }
             delete(X.TasksOnNode, nodeID)
         }
+        //log.Info("Delete node ", nodeID)
+        //log.Info("Before ", X.Nodes)
         delete(X.Nodes, nodeID)
+        //log.Info("After ", X.Nodes)
 
         //fmt.Println(tasks)
 
@@ -182,18 +82,26 @@
         for _, task := range tasks {
             if nodeID, ok := firstFit(X, task); ok {
                 X.TasksOnNode[nodeID] = append(X.TasksOnNode[nodeID], task)
+                cnt := task.NumberGPU
+                //log.Info("Add task ", task.Name, " in ", nodeID)
+                //log.Info("Before ", X.Nodes[nodeID].Status)
                 for i := range X.Nodes[nodeID].Status {
                     if X.Nodes[nodeID].Status[i].MemoryAllocated == 0 {
                         X.Nodes[nodeID].Status[i].MemoryAllocated += task.MemoryGPU
-                        break
+                        cnt--
+                        if cnt == 0 {
+                            break
+                        }
                     }
                 }
+
+                //log.Info("After ", X.Nodes[nodeID].Status)
             } else {
                 X.Flags["valid"] = false
+                break
             }
         }
     }
-    //fmt.Println("After", X)
     return
 
     /* move tasks */
@@ -224,9 +132,9 @@
 
 // Crossover a Vector with another Vector by applying uniform crossover.
 func (X Allocation) Crossover(Y eaopt.Genome, rng *rand.Rand) {
-    // make sure rng.Intn != 0 && cnt >0
-    cnt := rng.Intn(1+len(X.Nodes)/100)%10 + 1
-    for i := 0; i < cnt; i++ {
+    // make sure n > 0 && round > 0
+    round := rng.Intn(1+len(X.Nodes)/100)%10 + 1
+    for i := 0; i < round; i++ {
         if !Y.(Allocation).Flags["valid"] || !X.Flags["valid"] {
             return
         }
@@ -246,12 +154,13 @@
         randIndex := rng.Intn(len(nodeIDs))
         nodeID := nodeIDs[randIndex]
 
+        /* remove duplicated tasks */
         for _, task := range Y.(Allocation).TasksOnNode[nodeID] {
             //fmt.Println(Y.(Allocation).TasksOnNode[nodeID])
             idx := -1
             nodeID2, ok := taskToNode[task.Name]
             if !ok {
-                log.Println("Error", taskToNode, X.TasksOnNode, task.Name)
+                log.Warn("Error", taskToNode, X.TasksOnNode, task.Name)
             }
             for i, task2 := range X.TasksOnNode[nodeID2] {
                 if task2.Name == task.Name {
                     idx = i
                 }
             }
             if idx == -1 {
-                log.Println("Error 2", taskToNode, X.TasksOnNode, task.Name)
+                log.Warn("Error 2", taskToNode, X.TasksOnNode, task.Name)
             }
             //fmt.Println(X.TasksOnNode)
             copy(X.TasksOnNode[nodeID2][idx:], X.TasksOnNode[nodeID2][idx+1:])
             X.TasksOnNode[nodeID2] = X.TasksOnNode[nodeID2][:len(X.TasksOnNode[nodeID2])-1]
-            for i := range X.Nodes[nodeID].Status {
-                if X.Nodes[nodeID].Status[i].MemoryAllocated == 0 {
-                    X.Nodes[nodeID].Status[i].MemoryAllocated -= task.MemoryGPU
-                    break
+            cnt := task.NumberGPU
+            //log.Info("Remove task ", task.Name, " in ", nodeID2)
+            //log.Info("Before ", X.Nodes[nodeID2].Status)
+            for i := range X.Nodes[nodeID2].Status {
+                /* TODO: determine the correct GPU to release */
+                if X.Nodes[nodeID2].Status[i].MemoryAllocated == task.MemoryGPU {
+                    X.Nodes[nodeID2].Status[i].MemoryAllocated -= task.MemoryGPU
+                    cnt--
+                    if cnt == 0 {
+                        break
+                    }
                 }
             }
+            if cnt != 0 {
+                log.Warn("Cross remove ", cnt)
+            }
+            //log.Info("After ", X.Nodes[nodeID2].Status)
             //fmt.Println(X.TasksOnNode)
         }
+
         /* reschedule tasks on tgt node */
         var tasks []Task
         if _, ok := X.TasksOnNode[nodeID]; ok {
@@ -284,7 +205,7 @@ func (X Allocation) Crossover(Y eaopt.Genome, rng *rand.Rand) {
         if _, ok := X.Nodes[nodeID]; ok {
             delete(X.Nodes, nodeID)
         }
-        X.Nodes[nodeID] = Y.(Allocation).Nodes[nodeID]
+        X.Nodes[nodeID] = Y.(Allocation).Nodes[nodeID].Copy()
 
         var newTasksOnNode []Task
         for _, task := range Y.(Allocation).TasksOnNode[nodeID] {
@@ -296,14 +217,25 @@ func (X Allocation) Crossover(Y eaopt.Genome, rng *rand.Rand) {
         for _, task := range tasks {
             if nodeID, ok := firstFit(X, task); ok {
                 X.TasksOnNode[nodeID] = append(X.TasksOnNode[nodeID], task)
+                cnt := task.NumberGPU
+                //log.Info("Add task ", task.Name, " in ", nodeID)
+                //log.Info("Before ", X.Nodes[nodeID].Status)
                 for i := range X.Nodes[nodeID].Status {
                     if X.Nodes[nodeID].Status[i].MemoryAllocated == 0 {
                         X.Nodes[nodeID].Status[i].MemoryAllocated += task.MemoryGPU
-                        break
+                        cnt--
+                        if cnt == 0 {
+                            break
+                        }
                     }
                 }
+                //log.Info("After ", X.Nodes[nodeID].Status)
+                if cnt != 0 {
+                    log.Warn("Cross add ", cnt)
+                }
             } else {
                 X.Flags["valid"] = false
+                break
             }
         }
     }
@@ -318,7 +250,7 @@
     }
     Y := Allocation{TasksOnNode: map[string][]Task{}, Nodes: map[string]NodeStatus{}, Flags: map[string]bool{"valid": X.Flags["valid"]}}
     for id, node := range X.Nodes {
-        Y.Nodes[id] = node
+        Y.Nodes[id] = node.Copy()
         Y.NodeIDs = append(Y.NodeIDs, node.ClientID)
     }
     for id, tasks := range X.TasksOnNode {
@@ -328,5 +260,6 @@ func (X Allocation) Clone() eaopt.Genome {
         }
         Y.TasksOnNode[id] = t
     }
+    Y.Tasks = X.Tasks
     return Y
 }
diff --git a/src/ga_test.go b/src/ga_test.go
index ff652bc..f71d802 100644
--- a/src/ga_test.go
+++ b/src/ga_test.go
@@ -2,28 +2,25 @@ package main
 
 import (
     "strconv"
-    "math/rand"
     "time"
     log "github.com/sirupsen/logrus"
-    "github.com/MaxHalford/eaopt"
-    "math"
     "testing"
 )
 
 func TgenerateCase() ([]NodeStatus, []Task) {
-    numTask := 2
+    numTask := 4
     var nodes []NodeStatus
     var tasks []Task
-    for i := 0; i < numTask; i++ {
+    for i := 0; i < numTask*3; i++ {
         node := NodeStatus{ClientID: strconv.Itoa(i), Rack: "Rack-" + strconv.Itoa(i%40), Domain: "Domain-" + strconv.Itoa(i%4)}
         node.NumCPU = 24
         node.UtilCPU = 2.0
         node.MemTotal = 188
         node.MemAvailable = 20
         node.TotalBW = 100
-        cnt := 1
+        cnt := 4
         //cnt := rand.Intn(3) + 1
         for i := 0; i < cnt; i++ {
             node.Status = append(node.Status, GPUStatus{MemoryTotal: 11439, MemoryAllocated: 0, UUID: node.ClientID + "-" + strconv.Itoa(i)})
         }
@@ -32,7 +29,7 @@ func TgenerateCase() ([]NodeStatus, []Task) {
     }
     for i := 0; i < numTask; i++ {
         isPS := false
-        if i%4 == 0 {
+        if i < numTask/4 {
             isPS = true
         }
         task := Task{Name: "task-" + strconv.Itoa(i), IsPS: isPS}
@@ -46,144 +43,29 @@ func TgenerateCase() ([]NodeStatus, []Task) {
 }
 
 func TestBestFit(t *testing.T) {
+    t.Skip()
     nodes, tasks := TgenerateCase()
     for _, node := range nodes {
         log.Info(node)
     }
 
     s := time.Now()
-    allocation := fastBestFit(nodes, tasks)
+    allocation := InstanceOfAllocator().fastBestFit(nodes, tasks)
     log.Println(time.Since(s))
     log.Println(allocation)
 }
 
 func TestGA(t *testing.T) {
-    return
+
     nodes, tasks := TgenerateCase()
 
-    // Instantiate a GA with a GAConfig
-    var ga, err = eaopt.NewDefaultGAConfig().NewGA()
-    if err != nil {
-        log.Println(err)
-        return
-    }
+    allocation := InstanceOfAllocator().GA(nodes, tasks, true)
 
-    // Set the number of generations to run for
-    ga.NGenerations = math.MaxInt32
-    ga.NPops = 1
-    ga.PopSize = 30 + uint(len(tasks)/2)
+    log.Info(allocation.TasksOnNode)
+    log.Info(allocation.Nodes)
 
-    // Add a custom print function to track progress
-    ga.Callback = func(ga *eaopt.GA) {
-        log.Printf("Best fitness at generation %d: %f\n", ga.Generations, ga.HallOfFame[0].Fitness)
-    }
-
-    bestFitness := math.MaxFloat64
-    count := 0
-
-    ts := time.Now()
-
-    ga.EarlyStop = func(ga *eaopt.GA) bool {
-        gap := math.Abs(ga.HallOfFame[0].Fitness - bestFitness)
-        if gap <= 0.000001 || ga.HallOfFame[0].Fitness >= bestFitness {
-            if count >= 30 || time.Since(ts) > time.Second*30 {
-                log.Println("Early Stop")
-                return true
-            } else {
-                count++
-            }
-        } else {
-            bestFitness = ga.HallOfFame[0].Fitness
-            count = 1
-        }
-        return false
-    }
-
-    var f = func(rng *rand.Rand) eaopt.Genome {
-        allocation := Allocation{TasksOnNode: map[string][]Task{}, Nodes: map[string]NodeStatus{}, Flags: map[string]bool{"valid": true}}
-
-        //log.Println(nodes)
-        var nodesT []NodeStatus
-        for _, node := range nodes {
-            nodesT = append(nodesT, node.Copy())
-        }
-
-        //nodesT[0].Status[0].MemoryAllocated = 100
-        //log.Println(nodes[0].Status[0].MemoryAllocated)
-
-        //log.Println(&nodesT[0])
-        //log.Println(&nodes[0])
-
-        for _, node := range nodesT {
-            allocation.Nodes[node.ClientID] = node
-        }
-        for _, task := range tasks {
-            allocation.Tasks = append(allocation.Tasks, task)
-        }
-
-        /* shuffle */
-        for n := len(tasks); n > 0; n-- {
-            randIndex := rng.Intn(n)
-            allocation.Tasks[n-1], allocation.Tasks[randIndex] = allocation.Tasks[randIndex], allocation.Tasks[n-1]
-        }
-
-        /* pick nodes */
-        for _, node := range nodesT {
-            allocation.Nodes[node.ClientID] = node
-            allocation.NodeIDs = append(allocation.NodeIDs, node.ClientID)
-        }
-
-        t := rng.Int() % 10
-        if t == 0 {
-            /* best-fit */
-            ts := time.Now()
-            allocation.TasksOnNode = fastBestFit(nodesT, tasks).TasksOnNode
-            log.Println(time.Since(ts))
-            //fmt.Println("Best Fit")
-        } else if t%2 == 0 {
-            /* first-fit */
-            for _, task := range tasks {
-                if nodeID, ok := randomFit(allocation, task); ok {
-                    allocation.TasksOnNode[nodeID] = append(allocation.TasksOnNode[nodeID], task)
-                    for i := range allocation.Nodes[nodeID].Status {
-                        if allocation.Nodes[nodeID].Status[i].MemoryAllocated == 0 {
-                            allocation.Nodes[nodeID].Status[i].MemoryAllocated += task.MemoryGPU
-                            break
-                        }
-                    }
-                } else {
-                    allocation.Flags["valid"] = false
-                }
-            }
-        } else {
-            /* random-fit */
-            for _, task := range tasks {
-                if nodeID, ok := randomFit(allocation, task); ok {
-                    allocation.TasksOnNode[nodeID] = append(allocation.TasksOnNode[nodeID], task)
-                    for i := range allocation.Nodes[nodeID].Status {
-                        if allocation.Nodes[nodeID].Status[i].MemoryAllocated == 0 {
-                            allocation.Nodes[nodeID].Status[i].MemoryAllocated += task.MemoryGPU
-                            break
-                        }
-                    }
-                } else {
-                    allocation.Flags["valid"] = false
-                }
-            }
-        }
-        //fmt.Println(evaluatue(allocation))
-        //fmt.Println(allocation)
-        return allocation
-    }
-
-    // Find the minimum
-    err = ga.Minimize(f)
-    log.Println(time.Since(ts))
-    log.Println(ga.HallOfFame[0].Genome.(Allocation).TasksOnNode)
-    //log.Println(ga.HallOfFame[0].Genome.(Allocation).Flags)
-    //log.Println(ga.HallOfFame[0].Genome.(Allocation).Nodes)
-    if err != nil {
-        log.Println(err)
-        return
-    }
+    allocation = InstanceOfAllocator().fastBestFit(nodes, tasks)
+
+    InstanceOfResourcePool().init(Configuration{})
+    allocatedNodes := InstanceOfResourcePool().acquireResource(Job{Tasks: tasks})
+    log.Info(allocatedNodes)
 }
diff --git a/src/main.go b/src/main.go
index 009c19b..aa6c53b 100644
--- a/src/main.go
+++ b/src/main.go
@@ -267,6 +267,14 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
         }
         break
 
+    case "allocator_update_strategy":
+        log.Debug("allocator_update_strategy")
+        strategy := r.URL.Query().Get("strategy")
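+        /* reply with the boolean result of updateStrategy (currently always true) */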
+        js, _ := json.Marshal(InstanceOfAllocator().updateStrategy(strategy))
+        w.Header().Set("Content-Type", "application/json")
+        w.Write(js)
+        break
+
     default:
         http.Error(w, "Not Found", http.StatusNotFound)
         break
diff --git a/src/pool_status.go b/src/pool_status.go
index 4cd0d2f..f86fd1c 100644
--- a/src/pool_status.go
+++ b/src/pool_status.go
@@ -41,8 +41,8 @@ type NodeStatus struct {
     Status []GPUStatus `json:"status"`
 }
 
-func (X *NodeStatus) Copy() NodeStatus {
-    res := *X
+func (X NodeStatus) Copy() NodeStatus {
+    res := X
     res.Status = make([]GPUStatus, len(X.Status))
     copy(res.Status, X.Status)
     return res
diff --git a/src/pool_test.go b/src/pool_test.go
index c047859..734cf47 100644
--- a/src/pool_test.go
+++ b/src/pool_test.go
@@ -3,7 +3,7 @@ package main
 import (
     "testing"
     "strconv"
-    "github.com/sirupsen/logrus"
+    log "github.com/sirupsen/logrus"
     "time"
 )
 
@@ -20,10 +20,10 @@ func TestPool(t *testing.T) {
 
     count := 0
     for _, seg := range InstanceOfResourcePool().pools {
-        logrus.Info(seg.ID, "<--->", len(seg.Nodes), " ", seg.Nodes == nil, " Next:", seg.Next.ID)
+        log.Info(seg.ID, "<--->", len(seg.Nodes), " ", seg.Nodes == nil, " Next:", seg.Next.ID)
         count += len(seg.Nodes)
     }
-    logrus.Info(count)
+    log.Info(count)
 
     counter := map[int]int{}
     for i := 0; i < 1000; i++ {
@@ -52,5 +52,6 @@ func TestAllocate(t *testing.T) {
     tasks = append(tasks, task)
     job.Tasks = tasks
 
-    InstanceOfResourcePool().acquireResource(job)
+    allocation := InstanceOfResourcePool().acquireResource(job)
+    log.Info(allocation)
 }
diff --git a/src/resource_pool.go b/src/resource_pool.go
index 22d6bb5..cb0d023 100644
--- a/src/resource_pool.go
+++ b/src/resource_pool.go
@@ -821,7 +821,7 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
         }
     }
 
-    allocation := fastBestFit(nodesT, tasks)
+    allocation := InstanceOfAllocator().allocate(nodesT, tasks)
     if allocation.Flags["valid"] {
         for range job.Tasks {
             //append would cause uncertain order
@@ -897,8 +897,8 @@ func (pool *ResourcePool) releaseResource(job Job, agent NodeStatus) {
     defer seg.Lock.Unlock()
 
     node, ok := seg.Nodes[agent.ClientID]
+    /* in case node is offline */
     if !ok {
-        /* in case node is offline */
         /* TODO, update usingTotalGPU correctly */
         return
     }