diff --git a/src/ga.go b/src/ga.go
index 515a938..a83c179 100644
--- a/src/ga.go
+++ b/src/ga.go
@@ -121,7 +121,7 @@ type Node struct {
     //Status []GPUStatus `json:"status"`
 }
 
-type Task struct {
+type Task3 struct {
     Name  string `json:"name"`
     Image string `json:"image"`
     Cmd   string `json:"cmd"`
@@ -189,13 +189,13 @@ func fastBestFit(nodes []Node, tasks []Task) Allocation {
                 minCost = cost
                 nodeID = node.ClientID
             }
-            fmt.Println(cost)
+            //fmt.Println(cost)
         }
         if nodeID == "" {
             allocation.Flags["valid"] = false
             break
         } else {
-            fmt.Println(task, nodeID, allocation.TasksOnNode, minCost)
+            //fmt.Println(task, nodeID, allocation.TasksOnNode, minCost)
             allocation.TasksOnNode[nodeID] = append(allocation.TasksOnNode[nodeID], task)
             eva.add(nodesMap[nodeID], task)
         }
@@ -387,7 +387,7 @@ func (X Allocation) Mutate(rng *rand.Rand) {
     }
     //fmt.Println("After", X)
 
-    /* exchange tasks */
+    /* move tasks */
     if !X.Flags["valid"] {
         //fmt.Println("Invalid allocation")
         return
@@ -397,10 +397,20 @@ func (X Allocation) Mutate(rng *rand.Rand) {
         nodeIDs = append(nodeIDs, nodeID)
     }
     randIndex1 := rng.Intn(len(nodeIDs))
-    randIndex2 := rng.Intn(len(nodeIDs))
     nodeID1 := nodeIDs[randIndex1]
-    nodeID2 := nodeIDs[randIndex2]
-    X.TasksOnNode[nodeID1], X.TasksOnNode[nodeID2] = X.TasksOnNode[nodeID2], X.TasksOnNode[nodeID1]
+    if tasks, ok := X.TasksOnNode[nodeID1]; ok && len(tasks) > 0 {
+        idx := rng.Intn(len(tasks))
+        task := tasks[idx]
+        copy(X.TasksOnNode[nodeID1][idx:], X.TasksOnNode[nodeID1][idx+1:])
+        X.TasksOnNode[nodeID1] = X.TasksOnNode[nodeID1][:len(X.TasksOnNode[nodeID1])-1]
+
+        if nodeID, ok := firstFit(X, task); ok {
+            X.TasksOnNode[nodeID] = append(X.TasksOnNode[nodeID], task)
+        } else {
+            X.Flags["valid"] = false
+        }
+    }
+
 }
 
 // Crossover a Vector with another Vector by applying uniform crossover.
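// Note on the Mutate change above: instead of swapping the task lists of two random
// nodes, the mutation now pops one random task off a random node and re-places it with
// firstFit. firstFit itself is not part of this diff; the sketch below is a hypothetical,
// simplified version (GPU-count check only, using the package-level nodesMap) of what
// such a helper could look like, not the project's actual implementation.
func firstFitSketch(X Allocation, task Task) (string, bool) {
    // map iteration order is randomized in Go, which is acceptable for a mutation operator
    for nodeID, node := range nodesMap {
        // assume one GPU per task; a real check would also weigh CPU, memory,
        // bandwidth and PS/worker placement before accepting the node
        if len(X.TasksOnNode[nodeID]) < node.numberGPU {
            return nodeID, true
        }
    }
    // no node can host the task; the caller marks the allocation invalid
    return "", false
}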
@@ -531,7 +541,7 @@ func VectorFactory(rng *rand.Rand) eaopt.Genome {
     }
 
     t := rng.Int() % 10
-    if t == 0 {
+    if t == -1 {
         /* best-fit */
         ts := time.Now()
@@ -572,7 +582,7 @@ func VectorFactory(rng *rand.Rand) eaopt.Genome {
     return allocation
 }
 
-func main() {
+func main3() {
     numTask := 5
 
     nodesMap = map[string]Node{}
@@ -583,12 +593,12 @@ func main() {
         node.NumCPU = 24
         node.MemTotal = 188
         node.TotalBW = 100
-        node.numberGPU = rand.Intn(8) + 1
+        node.numberGPU = rand.Intn(3) + 1
         nodesMap[strconv.Itoa(i)] = node
     }
     for i := 0; i < numTask; i++ {
         isPS := false
-        if i%5 == 0 {
+        if i >= 3 {
             isPS = true
         }
         task := Task{Name: strconv.Itoa(i), IsPS: isPS}
@@ -608,7 +618,7 @@ func main() {
         tasks = append(tasks, task)
     }
     s := time.Now()
-    fmt.Println(fastBestFit(nodes, tasks))
+    allocation := fastBestFit(nodes, tasks)
     fmt.Println(time.Since(s))
 
     // Instantiate a GA with a GAConfig
@@ -635,8 +645,8 @@ func main() {
 
     ga.EarlyStop = func(ga *eaopt.GA) bool {
         gap := math.Abs(ga.HallOfFame[0].Fitness - bestFitness)
-        if gap <= 0.000001 {
-            if count >= 30 || time.Since(ts) > time.Second*30 {
+        if gap <= 0.000001 || ga.HallOfFame[0].Fitness >= bestFitness {
+            if count >= 50 || time.Since(ts) > time.Second*30 {
                 fmt.Println("Early Stop")
                 return true
             } else {
@@ -652,10 +662,12 @@ func main() {
     // Find the minimum
     err = ga.Minimize(VectorFactory)
     fmt.Println(time.Since(ts))
-    //fmt.Println(ga.HallOfFame[0].Genome.(Allocation).TasksOnNode)
+    fmt.Println(ga.HallOfFame[0].Genome.(Allocation).TasksOnNode)
     //fmt.Println(ga.HallOfFame[0].Genome.(Allocation).Nodes)
     if err != nil {
         fmt.Println(err)
         return
     }
+
+    fmt.Println(allocation)
 }
diff --git a/src/job_manager.go b/src/job_manager.go
index d864523..efc554f 100644
--- a/src/job_manager.go
+++ b/src/job_manager.go
@@ -154,62 +154,7 @@ func (jm *JobManager) start() {
 
     /* monitor job execution */
     for {
-        res := jm.status()
-        flag := false
-        onlyPS := true
-        for i := range res.Status {
-            if res.Status[i].Status == "ready" {
-                log.Debug(jm.job.Name, "-", i, " is ready to run")
-                flag = true
-                if !jm.job.Tasks[i].IsPS {
-                    onlyPS = false
-                }
-            } else if res.Status[i].Status == "running" {
-                log.Debug(jm.job.Name, "-", i, " is running")
-                flag = true
-                if !jm.job.Tasks[i].IsPS {
-                    onlyPS = false
-                }
-                InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, res.Status[i])
-            } else {
-                log.Info(jm.job.Name, "-", i, " ", res.Status[i].Status)
-                if exitCode, ok := res.Status[i].State["ExitCode"].(float64); ok && !jm.job.Tasks[i].IsPS {
-                    if exitCode != 0 && !jm.killedFlag {
-                        log.Warn(jm.job.Name+"-"+jm.job.Tasks[i].Name+" exited unexpected, exitCode=", exitCode)
-                        jm.killedFlag = true
-                        jm.scheduler.UpdateProgress(jm.job, Failed)
-                    }
-                }
-
-                /* remove exited containers */
-                //v := url.Values{}
-                //v.Set("id", res.Status[i].Id)
-                //
-                //_, err := doRequest("POST", "http://"+res.Status[i].Node+":8000/remove", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
-                //if err != nil {
-                //    log.Warn(err.Error())
-                //    continue
-                //}
-
-                /* return resource */
-                if jm.resources[i].ClientID != "null" {
-                    jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
-                    log.Info("return resource ", jm.resources[i].ClientID)
-                    jm.resources[i].ClientID = "null"
-
-                    for _, t := range jm.resources[i].Status {
-                        jm.scheduler.Detach(t.UUID, jm.job)
-                    }
-
-                    InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, res.Status[i])
-                }
-            }
-        }
-        if flag && onlyPS {
-            jm.stop()
-            log.Info("Only PS is running, stop ", jm.job.Name)
-            jm.killedFlag = false
-        }
+        flag := jm.checkStatus()
         if !flag {
             break
         }
@@ -225,6 +170,66 @@ func (jm *JobManager) start() {
     log.Info("JobMaster exited ", jm.job.Name)
 }
 
+func (jm *JobManager) checkStatus() bool {
+    res := jm.status()
+    flag := false
+    onlyPS := true
+    for i := range res.Status {
+        if res.Status[i].Status == "ready" {
+            log.Debug(jm.job.Name, "-", i, " is ready to run")
+            flag = true
+            if !jm.job.Tasks[i].IsPS {
+                onlyPS = false
+            }
+        } else if res.Status[i].Status == "running" {
+            log.Debug(jm.job.Name, "-", i, " is running")
+            flag = true
+            if !jm.job.Tasks[i].IsPS {
+                onlyPS = false
+            }
+            InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, res.Status[i])
+        } else {
+            log.Info(jm.job.Name, "-", i, " ", res.Status[i].Status)
+            if exitCode, ok := res.Status[i].State["ExitCode"].(float64); ok && !jm.job.Tasks[i].IsPS {
+                if exitCode != 0 && !jm.killedFlag {
+                    log.Warn(jm.job.Name+"-"+jm.job.Tasks[i].Name+" exited unexpected, exitCode=", exitCode)
+                    jm.killedFlag = true
+                    jm.scheduler.UpdateProgress(jm.job, Failed)
+                }
+            }
+
+            /* remove exited containers */
+            //v := url.Values{}
+            //v.Set("id", res.Status[i].Id)
+            //
+            //_, err := doRequest("POST", "http://"+res.Status[i].Node+":8000/remove", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
+            //if err != nil {
+            //    log.Warn(err.Error())
+            //    continue
+            //}
+
+            /* return resource */
+            if jm.resources[i].ClientID != "null" {
+                jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
+                log.Info("return resource ", jm.resources[i].ClientID)
+                jm.resources[i].ClientID = "null"
+
+                for _, t := range jm.resources[i].Status {
+                    jm.scheduler.Detach(t.UUID, jm.job)
+                }
+
+                InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, res.Status[i])
+            }
+        }
+    }
+    if flag && onlyPS {
+        jm.stop()
+        log.Info("Only PS is running, stop ", jm.job.Name)
+        jm.killedFlag = false
+    }
+    return flag
+}
+
 func (jm *JobManager) logs(taskName string) MsgLog {
     spider := Spider{}
     spider.Method = "GET"
diff --git a/src/resource_pool.go b/src/resource_pool.go
index 8ea3448..cd74d6a 100644
--- a/src/resource_pool.go
+++ b/src/resource_pool.go
@@ -38,6 +38,9 @@ type ResourcePool struct {
 
     TotalGPU   int
     TotalGPUMu sync.Mutex
+
+    subscriptions   map[string]map[string]int
+    subscriptionsMu sync.Mutex
 }
 
 func (pool *ResourcePool) start() {
@@ -50,6 +53,8 @@ func (pool *ResourcePool) start() {
     pool.bindings = map[string]map[string]int{}
     pool.utils = map[string][]UtilGPUTimeSeries{}
 
+    pool.subscriptions = map[string]map[string]int{}
+
     pool.TotalGPU = 0
 
     /* init pools */
@@ -233,6 +238,8 @@ func (pool *ResourcePool) update(node NodeStatus) {
 
     /* init bindings */
     go func(node NodeStatus) {
+        pool.subscriptionsMu.Lock()
+        defer pool.subscriptionsMu.Unlock()
         pool.bindingsMu.Lock()
         defer pool.bindingsMu.Unlock()
         for _, gpu := range node.Status {
@@ -242,6 +249,12 @@ func (pool *ResourcePool) update(node NodeStatus) {
                         UtilGPUTimeSeries{Time: (int)(time.Now().Unix()), Util: gpu.UtilizationGPU})
                 }
             }
+
+            if _, ok := pool.subscriptions[gpu.UUID]; ok {
+                for jobName := range pool.subscriptions[gpu.UUID] {
+                    scheduler.QueryState(jobName)
+                }
+            }
         }
         pool.heartBeatMu.Lock()
         pool.heartBeat[node.ClientID] = time.Now()
@@ -438,8 +451,16 @@ func (pool *ResourcePool) releaseNetwork(network string) {
 }
 
 func (pool *ResourcePool) attach(GPU string, job string) {
+    pool.subscriptionsMu.Lock()
+    defer pool.subscriptionsMu.Unlock()
     pool.bindingsMu.Lock()
     defer pool.bindingsMu.Unlock()
+
+    if _, ok := pool.subscriptions[GPU]; !ok {
+        pool.subscriptions[GPU] = map[string]int{}
+    }
+    pool.subscriptions[GPU][job] = int(time.Now().Unix())
+
     if _, ok := pool.bindings[GPU]; !ok {
         pool.bindings[GPU] = map[string]int{}
     }
@@ -455,8 +476,15 @@ func (pool *ResourcePool) attach(GPU string, job string) {
 }
 
 func (pool *ResourcePool) detach(GPU string, job Job) {
+    pool.subscriptionsMu.Lock()
+    defer pool.subscriptionsMu.Unlock()
     pool.bindingsMu.Lock()
     defer pool.bindingsMu.Unlock()
+
+    if _, ok := pool.subscriptions[GPU]; ok {
+        delete(pool.subscriptions[GPU], job.Name)
+    }
+
     if _, ok := pool.bindings[GPU]; ok {
         if _, ok2 := pool.utils[GPU]; ok2 {
             if len(pool.bindings[GPU]) == 1 && job.Status != Failed && job.Status != Stopped {
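// Note on the resource_pool.go changes: attach/detach now also maintain a subscriptions
// map (GPU UUID -> job name -> timestamp), and update() walks it on every heartbeat so
// each subscribed job receives a scheduler.QueryState call when fresh GPU status arrives.
// Both paths take subscriptionsMu before bindingsMu, keeping a consistent lock order.
// The snippet below is only an illustrative sketch of that lifecycle; exampleSubscriptionFlow
// is a hypothetical helper, not part of the code base.
func exampleSubscriptionFlow(pool *ResourcePool, job Job, gpuUUID string) {
    // registering the binding also subscribes the job to the GPU's heartbeats
    pool.attach(gpuUUID, job.Name)

    // ... while the job runs, every pool.update(node) that reports gpuUUID
    // triggers scheduler.QueryState(job.Name) for this subscription ...

    // detaching removes the subscription, so the notifications stop
    pool.detach(gpuUUID, job)
}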