mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-12-15 08:16:43 +00:00
2020-05-23 21:06:31 +08:00
parent 655cf79c00
commit ea2718fe4f
3 changed files with 116 additions and 71 deletions

View File

@@ -121,7 +121,7 @@ type Node struct {
 	//Status []GPUStatus `json:"status"`
 }
-type Task struct {
+type Task3 struct {
 	Name  string `json:"name"`
 	Image string `json:"image"`
 	Cmd   string `json:"cmd"`
@@ -189,13 +189,13 @@ func fastBestFit(nodes []Node, tasks []Task) Allocation {
 				minCost = cost
 				nodeID = node.ClientID
 			}
-			fmt.Println(cost)
+			//fmt.Println(cost)
 		}
 		if nodeID == "" {
 			allocation.Flags["valid"] = false
 			break
 		} else {
-			fmt.Println(task, nodeID, allocation.TasksOnNode, minCost)
+			//fmt.Println(task, nodeID, allocation.TasksOnNode, minCost)
 			allocation.TasksOnNode[nodeID] = append(allocation.TasksOnNode[nodeID], task)
 			eva.add(nodesMap[nodeID], task)
 		}
@@ -387,7 +387,7 @@ func (X Allocation) Mutate(rng *rand.Rand) {
 	}
 	//fmt.Println("After", X)
-	/* exchange tasks */
+	/* move tasks */
 	if !X.Flags["valid"] {
 		//fmt.Println("Invalid allocation")
 		return
@@ -397,10 +397,20 @@ func (X Allocation) Mutate(rng *rand.Rand) {
 		nodeIDs = append(nodeIDs, nodeID)
 	}
 	randIndex1 := rng.Intn(len(nodeIDs))
-	randIndex2 := rng.Intn(len(nodeIDs))
 	nodeID1 := nodeIDs[randIndex1]
-	nodeID2 := nodeIDs[randIndex2]
-	X.TasksOnNode[nodeID1], X.TasksOnNode[nodeID2] = X.TasksOnNode[nodeID2], X.TasksOnNode[nodeID1]
+	if tasks, ok := X.TasksOnNode[nodeID1]; ok && len(tasks) > 0 {
+		idx := rng.Intn(len(tasks))
+		task := tasks[idx]
+		copy(X.TasksOnNode[nodeID1][idx:], X.TasksOnNode[nodeID1][idx+1:])
+		X.TasksOnNode[nodeID1] = X.TasksOnNode[nodeID1][:len(X.TasksOnNode[nodeID1])-1]
+		if nodeID, ok := firstFit(X, task); ok {
+			X.TasksOnNode[nodeID] = append(X.TasksOnNode[nodeID], task)
+		} else {
+			X.Flags["valid"] = false
+		}
+	}
 }
 // Crossover a Vector with another Vector by applying uniform crossover.
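Note on the mutation above: it moves a randomly chosen task off one node and re-places it via a firstFit helper that is not part of this commit. A minimal sketch of what such a helper could look like, assuming it only checks remaining GPU capacity per node; the package-level nodesMap and the NumberGPU field on Task are assumptions borrowed from the surrounding code, and the real implementation may also weigh CPU, memory, and bandwidth.

/* Hypothetical sketch only, not the firstFit shipped with YAO-scheduler:
   return the first node in nodesMap whose spare GPUs can hold the task. */
func firstFit(X Allocation, task Task) (string, bool) {
	for nodeID, node := range nodesMap {
		used := 0
		for _, t := range X.TasksOnNode[nodeID] {
			used += t.NumberGPU // assumes Task carries a NumberGPU requirement
		}
		if used+task.NumberGPU <= node.numberGPU {
			return nodeID, true
		}
	}
	return "", false // no node has enough spare GPUs
}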
@@ -531,7 +541,7 @@ func VectorFactory(rng *rand.Rand) eaopt.Genome {
 	}
 	t := rng.Int() % 10
-	if t == 0 {
+	if t == -1 {
 		/* best-fit */
 		ts := time.Now()
@@ -572,7 +582,7 @@ func VectorFactory(rng *rand.Rand) eaopt.Genome {
 	return allocation
 }
-func main() {
+func main3() {
 	numTask := 5
 	nodesMap = map[string]Node{}
@@ -583,12 +593,12 @@ func main() {
 		node.NumCPU = 24
 		node.MemTotal = 188
 		node.TotalBW = 100
-		node.numberGPU = rand.Intn(8) + 1
+		node.numberGPU = rand.Intn(3) + 1
 		nodesMap[strconv.Itoa(i)] = node
 	}
 	for i := 0; i < numTask; i++ {
 		isPS := false
-		if i%5 == 0 {
+		if i >= 3 {
 			isPS = true
 		}
 		task := Task{Name: strconv.Itoa(i), IsPS: isPS}
@@ -608,7 +618,7 @@ func main() {
 		tasks = append(tasks, task)
 	}
 	s := time.Now()
-	fmt.Println(fastBestFit(nodes, tasks))
+	allocation := fastBestFit(nodes, tasks)
 	fmt.Println(time.Since(s))
 	// Instantiate a GA with a GAConfig
@@ -635,8 +645,8 @@ func main() {
 	ga.EarlyStop = func(ga *eaopt.GA) bool {
 		gap := math.Abs(ga.HallOfFame[0].Fitness - bestFitness)
-		if gap <= 0.000001 {
-			if count >= 30 || time.Since(ts) > time.Second*30 {
+		if gap <= 0.000001 || ga.HallOfFame[0].Fitness >= bestFitness {
+			if count >= 50 || time.Since(ts) > time.Second*30 {
 				fmt.Println("Early Stop")
 				return true
 			} else {
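The reworked early-stop hook above keeps the GA running only while it still has a chance of beating the fast best-fit baseline, and gives up after a bounded number of stagnant generations or 30 seconds. A self-contained sketch of that stagnation rule in isolation; earlyStopState, shouldStop, and the reset-on-improvement behavior are illustrative assumptions, not code from this commit.

package main

import (
	"fmt"
	"math"
	"time"
)

type earlyStopState struct {
	baseline float64   // cost of the fast best-fit allocation
	count    int       // consecutive checks without improvement
	start    time.Time // when the GA run began
}

// shouldStop reports whether the search has stagnated: the best fitness has
// not beaten the baseline for 50 consecutive checks or for 30 seconds.
func (s *earlyStopState) shouldStop(best float64) bool {
	gap := math.Abs(best - s.baseline)
	if gap <= 0.000001 || best >= s.baseline {
		s.count++
		return s.count >= 50 || time.Since(s.start) > 30*time.Second
	}
	s.count = 0 // the GA finally beat the baseline, keep evolving
	return false
}

func main() {
	s := earlyStopState{baseline: 12.5, start: time.Now()}
	for i := 0; i < 60; i++ {
		if s.shouldStop(13.0) { // a run that never improves past the baseline
			fmt.Println("Early Stop after", i+1, "stagnant checks")
			return
		}
	}
}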
@@ -652,10 +662,12 @@ func main() {
 	// Find the minimum
 	err = ga.Minimize(VectorFactory)
 	fmt.Println(time.Since(ts))
-	//fmt.Println(ga.HallOfFame[0].Genome.(Allocation).TasksOnNode)
+	fmt.Println(ga.HallOfFame[0].Genome.(Allocation).TasksOnNode)
 	//fmt.Println(ga.HallOfFame[0].Genome.(Allocation).Nodes)
 	if err != nil {
 		fmt.Println(err)
 		return
 	}
+	fmt.Println(allocation)
 }

View File

@@ -154,62 +154,7 @@ func (jm *JobManager) start() {
 	/* monitor job execution */
 	for {
-		res := jm.status()
-		flag := false
-		onlyPS := true
-		for i := range res.Status {
-			if res.Status[i].Status == "ready" {
-				log.Debug(jm.job.Name, "-", i, " is ready to run")
-				flag = true
-				if !jm.job.Tasks[i].IsPS {
-					onlyPS = false
-				}
-			} else if res.Status[i].Status == "running" {
-				log.Debug(jm.job.Name, "-", i, " is running")
-				flag = true
-				if !jm.job.Tasks[i].IsPS {
-					onlyPS = false
-				}
-				InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, res.Status[i])
-			} else {
-				log.Info(jm.job.Name, "-", i, " ", res.Status[i].Status)
-				if exitCode, ok := res.Status[i].State["ExitCode"].(float64); ok && !jm.job.Tasks[i].IsPS {
-					if exitCode != 0 && !jm.killedFlag {
-						log.Warn(jm.job.Name+"-"+jm.job.Tasks[i].Name+" exited unexpected, exitCode=", exitCode)
-						jm.killedFlag = true
-						jm.scheduler.UpdateProgress(jm.job, Failed)
-					}
-				}
-				/* remove exited containers */
-				//v := url.Values{}
-				//v.Set("id", res.Status[i].Id)
-				//
-				//_, err := doRequest("POST", "http://"+res.Status[i].Node+":8000/remove", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
-				//if err != nil {
-				//	log.Warn(err.Error())
-				//	continue
-				//}
-				/* return resource */
-				if jm.resources[i].ClientID != "null" {
-					jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
-					log.Info("return resource ", jm.resources[i].ClientID)
-					jm.resources[i].ClientID = "null"
-					for _, t := range jm.resources[i].Status {
-						jm.scheduler.Detach(t.UUID, jm.job)
-					}
-					InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, res.Status[i])
-				}
-			}
-		}
-		if flag && onlyPS {
-			jm.stop()
-			log.Info("Only PS is running, stop ", jm.job.Name)
-			jm.killedFlag = false
-		}
+		flag := jm.checkStatus()
 		if !flag {
 			break
 		}
@@ -225,6 +170,66 @@ func (jm *JobManager) start() {
 	log.Info("JobMaster exited ", jm.job.Name)
 }
+func (jm *JobManager) checkStatus() bool {
+	res := jm.status()
+	flag := false
+	onlyPS := true
+	for i := range res.Status {
+		if res.Status[i].Status == "ready" {
+			log.Debug(jm.job.Name, "-", i, " is ready to run")
+			flag = true
+			if !jm.job.Tasks[i].IsPS {
+				onlyPS = false
+			}
+		} else if res.Status[i].Status == "running" {
+			log.Debug(jm.job.Name, "-", i, " is running")
+			flag = true
+			if !jm.job.Tasks[i].IsPS {
+				onlyPS = false
+			}
+			InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, res.Status[i])
+		} else {
+			log.Info(jm.job.Name, "-", i, " ", res.Status[i].Status)
+			if exitCode, ok := res.Status[i].State["ExitCode"].(float64); ok && !jm.job.Tasks[i].IsPS {
+				if exitCode != 0 && !jm.killedFlag {
+					log.Warn(jm.job.Name+"-"+jm.job.Tasks[i].Name+" exited unexpected, exitCode=", exitCode)
+					jm.killedFlag = true
+					jm.scheduler.UpdateProgress(jm.job, Failed)
+				}
+			}
+			/* remove exited containers */
+			//v := url.Values{}
+			//v.Set("id", res.Status[i].Id)
+			//
+			//_, err := doRequest("POST", "http://"+res.Status[i].Node+":8000/remove", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
+			//if err != nil {
+			//	log.Warn(err.Error())
+			//	continue
+			//}
+			/* return resource */
+			if jm.resources[i].ClientID != "null" {
+				jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
+				log.Info("return resource ", jm.resources[i].ClientID)
+				jm.resources[i].ClientID = "null"
+				for _, t := range jm.resources[i].Status {
+					jm.scheduler.Detach(t.UUID, jm.job)
+				}
+				InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, res.Status[i])
+			}
+		}
+	}
+	if flag && onlyPS {
+		jm.stop()
+		log.Info("Only PS is running, stop ", jm.job.Name)
+		jm.killedFlag = false
+	}
+	return flag
+}
 func (jm *JobManager) logs(taskName string) MsgLog {
 	spider := Spider{}
 	spider.Method = "GET"

View File

@@ -38,6 +38,9 @@ type ResourcePool struct {
 	TotalGPU   int
 	TotalGPUMu sync.Mutex
+	subscriptions   map[string]map[string]int
+	subscriptionsMu sync.Mutex
 }
 func (pool *ResourcePool) start() {
@@ -50,6 +53,8 @@ func (pool *ResourcePool) start() {
 	pool.bindings = map[string]map[string]int{}
 	pool.utils = map[string][]UtilGPUTimeSeries{}
+	pool.subscriptions = map[string]map[string]int{}
 	pool.TotalGPU = 0
 	/* init pools */
@@ -233,6 +238,8 @@ func (pool *ResourcePool) update(node NodeStatus) {
 	/* init bindings */
 	go func(node NodeStatus) {
+		pool.subscriptionsMu.Lock()
+		defer pool.subscriptionsMu.Unlock()
 		pool.bindingsMu.Lock()
 		defer pool.bindingsMu.Unlock()
 		for _, gpu := range node.Status {
@@ -242,6 +249,12 @@ func (pool *ResourcePool) update(node NodeStatus) {
 					UtilGPUTimeSeries{Time: (int)(time.Now().Unix()), Util: gpu.UtilizationGPU})
 				}
 			}
+			if _, ok := pool.subscriptions[gpu.UUID]; ok {
+				for jobName := range pool.subscriptions[gpu.UUID] {
+					scheduler.QueryState(jobName)
+				}
+			}
 		}
 		pool.heartBeatMu.Lock()
 		pool.heartBeat[node.ClientID] = time.Now()
@@ -438,8 +451,16 @@ func (pool *ResourcePool) releaseNetwork(network string) {
 }
 func (pool *ResourcePool) attach(GPU string, job string) {
+	pool.subscriptionsMu.Lock()
+	defer pool.subscriptionsMu.Unlock()
 	pool.bindingsMu.Lock()
 	defer pool.bindingsMu.Unlock()
+	if _, ok := pool.subscriptions[GPU]; !ok {
+		pool.subscriptions[GPU] = map[string]int{}
+	}
+	pool.subscriptions[GPU][job] = int(time.Now().Unix())
 	if _, ok := pool.bindings[GPU]; !ok {
 		pool.bindings[GPU] = map[string]int{}
 	}
@@ -455,8 +476,15 @@ func (pool *ResourcePool) attach(GPU string, job string) {
 }
 func (pool *ResourcePool) detach(GPU string, job Job) {
+	pool.subscriptionsMu.Lock()
+	defer pool.subscriptionsMu.Unlock()
 	pool.bindingsMu.Lock()
 	defer pool.bindingsMu.Unlock()
+	if _, ok := pool.subscriptions[GPU]; ok {
+		delete(pool.subscriptions[GPU], job.Name)
+	}
 	if _, ok := pool.bindings[GPU]; ok {
 		if _, ok2 := pool.utils[GPU]; ok2 {
 			if len(pool.bindings[GPU]) == 1 && job.Status != Failed && job.Status != Stopped {
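The subscriptions map introduced in this file acts as a small GPU-to-job notification registry: attach subscribes a job to a GPU's heartbeat updates, detach unsubscribes it, and update fans each node report out to every subscribed job via scheduler.QueryState. A self-contained sketch of the same pattern in isolation; the names gpuSubscriptions, Subscribe, Unsubscribe, and Notify are illustrative and not part of the YAO code.

package main

import (
	"fmt"
	"sync"
	"time"
)

// gpuSubscriptions mirrors the registry shape used above:
// GPU UUID -> job name -> subscription timestamp (Unix seconds).
type gpuSubscriptions struct {
	mu   sync.Mutex
	subs map[string]map[string]int
}

func newGPUSubscriptions() *gpuSubscriptions {
	return &gpuSubscriptions{subs: map[string]map[string]int{}}
}

// Subscribe registers a job for updates on a GPU (cf. ResourcePool.attach).
func (s *gpuSubscriptions) Subscribe(gpu, job string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.subs[gpu]; !ok {
		s.subs[gpu] = map[string]int{}
	}
	s.subs[gpu][job] = int(time.Now().Unix())
}

// Unsubscribe removes a job's interest in a GPU (cf. ResourcePool.detach).
func (s *gpuSubscriptions) Unsubscribe(gpu, job string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.subs[gpu]; ok {
		delete(s.subs[gpu], job)
	}
}

// Notify invokes fn for every job subscribed to the GPU (cf. ResourcePool.update).
func (s *gpuSubscriptions) Notify(gpu string, fn func(job string)) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for job := range s.subs[gpu] {
		fn(job)
	}
}

func main() {
	subs := newGPUSubscriptions()
	subs.Subscribe("GPU-uuid-0", "job-a")
	subs.Notify("GPU-uuid-0", func(job string) { fmt.Println("query state of", job) })
	subs.Unsubscribe("GPU-uuid-0", "job-a")
}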