From cfd41dae413465765bd17ace17328da6346aa48d Mon Sep 17 00:00:00 2001 From: Newnius Date: Mon, 25 May 2020 20:50:41 +0800 Subject: [PATCH] fix deadline --- src/ga.go | 96 +------------------------------------------- src/resource_pool.go | 55 +++++++------------------ 2 files changed, 15 insertions(+), 136 deletions(-) diff --git a/src/ga.go b/src/ga.go index 39e6f82..472b700 100644 --- a/src/ga.go +++ b/src/ga.go @@ -4,8 +4,7 @@ import ( "math/rand" "github.com/MaxHalford/eaopt" "time" - "strconv" - "math" + "math" log "github.com/sirupsen/logrus" ) @@ -391,96 +390,3 @@ func VectorFactory(rng *rand.Rand) eaopt.Genome { //fmt.Println(allocation) return allocation } - -func testGA() { - numTask := 20 - - nodesMap = map[string]NodeStatus{} - tasksMap = map[string]Task{} - - for i := 0; i < numTask*3; i++ { - node := NodeStatus{ClientID: strconv.Itoa(i), Rack: strconv.Itoa(i % 40), Domain: strconv.Itoa(i % 4)} - node.NumCPU = 24 - node.MemTotal = 188 - node.TotalBW = 100 - cnt := rand.Intn(3) + 1 - for i := 0; i < cnt; i++ { - node.Status = append(node.Status, GPUStatus{MemoryTotal: 11439, MemoryAllocated: 0, UUID: node.ClientID + strconv.Itoa(i)}) - } - nodesMap[strconv.Itoa(i)] = node - } - for i := 0; i < numTask; i++ { - isPS := false - if i >= 3 { - isPS = true - } - task := Task{Name: strconv.Itoa(i), IsPS: isPS} - task.Memory = 4 - task.NumberCPU = 2 - task.NumberGPU = 1 - tasksMap[strconv.Itoa(i)] = task - } - - var nodes []NodeStatus - var tasks []Task - - for _, node := range nodesMap { - nodes = append(nodes, node) - } - for _, task := range tasksMap { - tasks = append(tasks, task) - } - s := time.Now() - allocation := fastBestFit(nodes, tasks) - log.Println(time.Since(s)) - - // Instantiate a GA with a GAConfig - var ga, err = eaopt.NewDefaultGAConfig().NewGA() - if err != nil { - log.Println(err) - return - } - - // Set the number of generations to run for - ga.NGenerations = math.MaxInt32 - ga.NPops = 1 - ga.PopSize = 30 + uint(numTask/2) - - // Add a custom print function to track progress - ga.Callback = func(ga *eaopt.GA) { - log.Printf("Best fitness at generation %d: %f\n", ga.Generations, ga.HallOfFame[0].Fitness) - } - - bestFitness := math.MaxFloat64 - count := 0 - - ts := time.Now() - - ga.EarlyStop = func(ga *eaopt.GA) bool { - gap := math.Abs(ga.HallOfFame[0].Fitness - bestFitness) - if gap <= 0.000001 || ga.HallOfFame[0].Fitness >= bestFitness { - if count >= 30 || time.Since(ts) > time.Second*30 { - log.Println("Early Stop") - return true - } else { - count++ - } - } else { - bestFitness = ga.HallOfFame[0].Fitness - count = 1 - } - return false - } - - // Find the minimum - err = ga.Minimize(VectorFactory) - log.Println(time.Since(ts)) - log.Println(ga.HallOfFame[0].Genome.(Allocation).TasksOnNode) - //fmt.Println(ga.HallOfFame[0].Genome.(Allocation).Nodes) - if err != nil { - log.Println(err) - return - } - - log.Println(allocation) -} diff --git a/src/resource_pool.go b/src/resource_pool.go index 4d16c4a..241882a 100644 --- a/src/resource_pool.go +++ b/src/resource_pool.go @@ -32,32 +32,29 @@ type ResourcePool struct { history []PoolStatus - heartBeat map[string]time.Time - heartBeatMu sync.Mutex + heartBeat map[string]time.Time + heartBeatMu sync.Mutex + versions map[string]float64 + versionsMu sync.Mutex + counter int + counterTotal int + + subscriptions map[string]map[string]int + subscriptionsMu sync.Mutex networks map[string]bool networksFree map[string]bool networkMu sync.Mutex - versions map[string]float64 - versionsMu sync.Mutex - - counter int - counterTotal int - bindings map[string]map[string]int bindingsMu sync.Mutex utils map[string][]UtilGPUTimeSeries TotalGPU int TotalGPUMu sync.Mutex - UsingGPU int UsingGPUMu sync.Mutex - subscriptions map[string]map[string]int - subscriptionsMu sync.Mutex - enableShare bool enableShareRatio float64 enablePreSchedule bool @@ -70,12 +67,9 @@ func (pool *ResourcePool) init(conf Configuration) { pool.networks = map[string]bool{} pool.networksFree = map[string]bool{} - pool.versions = map[string]float64{} pool.bindings = map[string]map[string]int{} pool.utils = map[string][]UtilGPUTimeSeries{} - pool.subscriptions = map[string]map[string]int{} - pool.TotalGPU = 0 pool.UsingGPU = 0 @@ -105,6 +99,8 @@ func (pool *ResourcePool) init(conf Configuration) { } } + pool.versions = map[string]float64{} + pool.subscriptions = map[string]map[string]int{} pool.heartBeat = map[string]time.Time{} go func() { pool.checkDeadNodes() @@ -114,7 +110,6 @@ func (pool *ResourcePool) init(conf Configuration) { go func() { pool.saveStatusHistory() }() - } /* check dead nodes periodically */ @@ -313,6 +308,7 @@ func (pool *ResourcePool) update(node NodeStatus) { pool.TotalGPUMu.Lock() pool.TotalGPU += len(node.Status) pool.TotalGPUMu.Unlock() + log.Info("node ", node.ClientID, " is online") } seg.Nodes[node.ClientID] = &node if len(seg.Nodes) > 10 { @@ -519,7 +515,7 @@ func (pool *ResourcePool) detach(GPU string, job Job) { if _, ok := pool.bindings[GPU]; ok { if _, ok2 := pool.utils[GPU]; ok2 { - if len(pool.bindings[GPU]) == 1 && job.Status != Failed && job.Status != Stopped { + if len(pool.bindings[GPU]) == 1 && job.Status == Finished { InstanceOfOptimizer().feed(job.Name, pool.utils[GPU]) } delete(pool.utils, GPU) @@ -532,30 +528,7 @@ func (pool *ResourcePool) detach(GPU string, job Job) { } func (pool *ResourcePool) countGPU() (int, int) { - FreeGPU := 0 - UsingGPU := 0 - start := &pool.pools[0] - if start.Nodes == nil { - start = start.Next - } - for cur := start; ; { - cur.Lock.Lock() - for _, node := range cur.Nodes { - for j := range node.Status { - if node.Status[j].MemoryAllocated == 0 { - FreeGPU++ - } else { - UsingGPU++ - } - } - } - cur.Lock.Unlock() - cur = cur.Next - if cur.ID == start.ID { - break - } - } - return FreeGPU, UsingGPU + return pool.TotalGPU - pool.UsingGPU, pool.TotalGPU } func (pool *ResourcePool) getBindings() map[string]map[string]int {