diff --git a/src/collector.go b/src/collector.go index e6ecc4a..49950aa 100644 --- a/src/collector.go +++ b/src/collector.go @@ -11,7 +11,7 @@ import ( var collectorInstance *Collector var collectorInstanceLock sync.Mutex -func InstanceOfColector() *Collector { +func InstanceOfCollector() *Collector { defer collectorInstanceLock.Unlock() collectorInstanceLock.Lock() diff --git a/src/ga.go b/src/ga.go index 7ba1a6e..39e6f82 100644 --- a/src/ga.go +++ b/src/ga.go @@ -1,142 +1,21 @@ package main import ( - "fmt" "math/rand" "github.com/MaxHalford/eaopt" "time" "strconv" "math" + log "github.com/sirupsen/logrus" ) -type Evaluator struct { - domains map[string]map[string]int - racks map[string]map[string]int - nodes map[string]map[string]int - upstreams map[string]string - cost float64 - totalPS int - totalWorker int - - costNetwork float64 - - factorNode float64 - factorRack float64 - factorDomain float64 -} - -func (eva *Evaluator) init(nodes []Node, tasks []Task) { - eva.domains = map[string]map[string]int{} - eva.racks = map[string]map[string]int{} - eva.nodes = map[string]map[string]int{} - eva.upstreams = map[string]string{} - eva.totalPS = 0 - eva.totalWorker = 0 - eva.factorNode = 1.0 - eva.factorRack = 4.0 - eva.factorDomain = 40.0 - eva.cost = 0.0 - eva.costNetwork = 0.0 -} - -func (eva *Evaluator) add(node Node, task Task) { - /* update node load cost */ - - /* update network cost */ - if _, ok := eva.nodes[node.ClientID]; !ok { - eva.nodes[node.ClientID] = map[string]int{"PS": 0, "Worker": 0} - } - if _, ok := eva.racks[node.Rack]; !ok { - eva.racks[node.Rack] = map[string]int{"PS": 0, "Worker": 0} - } - if _, ok := eva.domains[node.Domain]; !ok { - eva.domains[node.Domain] = map[string]int{"PS": 0, "Worker": 0} - } - if task.IsPS { - eva.costNetwork += eva.factorNode * float64(eva.racks[node.Rack]["Worker"]-eva.nodes[node.ClientID]["Worker"]) - eva.costNetwork += eva.factorRack * float64(eva.domains[node.Domain]["Worker"]-eva.racks[node.Rack]["Worker"]) - eva.costNetwork += eva.factorDomain * float64(eva.totalWorker-eva.domains[node.Domain]["Worker"]) - - eva.nodes[node.ClientID]["PS"]++ - eva.racks[node.Rack]["PS"]++ - eva.domains[node.Domain]["PS"]++ - eva.totalPS++ - } else { - eva.costNetwork += eva.factorNode * float64(eva.racks[node.Rack]["PS"]-eva.nodes[node.ClientID]["PS"]) - eva.costNetwork += eva.factorRack * float64(eva.domains[node.Domain]["PS"]-eva.racks[node.Rack]["PS"]) - eva.costNetwork += eva.factorDomain * float64(eva.totalPS-eva.domains[node.Domain]["PS"]) - - eva.nodes[node.ClientID]["Worker"]++ - eva.racks[node.Rack]["Worker"]++ - eva.domains[node.Domain]["Worker"]++ - eva.totalWorker++ - } - eva.cost = eva.costNetwork -} - -func (eva *Evaluator) remove(node Node, task Task) { - if task.IsPS { - eva.costNetwork -= eva.factorNode * float64(eva.racks[node.Rack]["Worker"]-eva.nodes[node.ClientID]["Worker"]) - eva.costNetwork -= eva.factorRack * float64(eva.domains[node.Domain]["Worker"]-eva.racks[node.Rack]["Worker"]) - eva.costNetwork -= eva.factorDomain * float64(eva.totalWorker-eva.domains[node.Domain]["Worker"]) - - eva.nodes[node.ClientID]["PS"]-- - eva.racks[node.Rack]["PS"]-- - eva.domains[node.Domain]["PS"]-- - eva.totalPS-- - } else { - eva.costNetwork -= eva.factorNode * float64(eva.racks[node.Rack]["PS"]-eva.nodes[node.ClientID]["PS"]) - eva.costNetwork -= eva.factorRack * float64(eva.domains[node.Domain]["PS"]-eva.racks[node.Rack]["PS"]) - eva.costNetwork -= eva.factorDomain * float64(eva.totalPS-eva.domains[node.Domain]["PS"]) - - 
//fmt.Println(eva.totalWorker, eva.domains[node.Domain]) - - eva.nodes[node.ClientID]["Worker"]-- - eva.racks[node.Rack]["Worker"]-- - eva.domains[node.Domain]["Worker"]-- - eva.totalWorker-- - } - eva.cost = eva.costNetwork -} - -func (eva *Evaluator) calculate() float64 { - return eva.cost -} - -var nodesMap map[string]Node +var nodesMap map[string]NodeStatus var tasksMap map[string]Task -type Node struct { - ClientID string `json:"id"` - Domain string `json:"domain"` - Rack string `json:"rack"` - Version float64 `json:"version"` - NumCPU int `json:"cpu_num"` - UtilCPU float64 `json:"cpu_load"` - MemTotal int `json:"mem_total"` - MemAvailable int `json:"mem_available"` - UsingBW float64 `json:"bw_using"` - TotalBW float64 `json:"bw_total"` - numberGPU int - //Status []GPUStatus `json:"status"` -} - -type Task3 struct { - Name string `json:"name"` - Image string `json:"image"` - Cmd string `json:"cmd"` - NumberCPU int `json:"cpu_number"` - Memory int `json:"memory"` - NumberGPU int `json:"gpu_number"` - MemoryGPU int `json:"gpu_memory"` - IsPS bool `json:"is_ps"` - ModelGPU string `json:"gpu_model"` -} - -// An valid allocation +// A resource allocation type Allocation struct { TasksOnNode map[string][]Task // tasks on nodes[id] - Nodes map[string]Node + Nodes map[string]NodeStatus NodeIDs []string Flags map[string]bool Evaluator Evaluator @@ -146,7 +25,13 @@ func randomFit(allocation Allocation, task Task) (string, bool) { flag := false nodeID := "" for nodeID = range allocation.Nodes { - if node, ok := allocation.Nodes[nodeID]; ok && len(allocation.TasksOnNode[nodeID]) < node.numberGPU { + numberGPU := 0 + for _, gpu := range allocation.Nodes[nodeID].Status { + if gpu.MemoryAllocated == 0 { + numberGPU += 0 + } + } + if _, ok := allocation.Nodes[nodeID]; ok && len(allocation.TasksOnNode[nodeID]) < numberGPU { flag = true break } @@ -158,7 +43,13 @@ func firstFit(allocation Allocation, task Task) (string, bool) { flag := false nodeID := "" for _, nodeID = range allocation.NodeIDs { - if node, ok := allocation.Nodes[nodeID]; ok && len(allocation.TasksOnNode[nodeID]) < node.numberGPU { + numberGPU := 0 + for _, gpu := range allocation.Nodes[nodeID].Status { + if gpu.MemoryAllocated == 0 { + numberGPU += 0 + } + } + if _, ok := allocation.Nodes[nodeID]; ok && len(allocation.TasksOnNode[nodeID]) < numberGPU { flag = true break } @@ -166,7 +57,7 @@ func firstFit(allocation Allocation, task Task) (string, bool) { return nodeID, flag } -func fastBestFit(nodes []Node, tasks []Task) Allocation { +func fastBestFit(nodes []NodeStatus, tasks []Task) Allocation { eva := Evaluator{} eva.init(nodes, tasks) @@ -179,7 +70,13 @@ func fastBestFit(nodes []Node, tasks []Task) Allocation { if _, ok := allocation.TasksOnNode[node.ClientID]; !ok { allocation.TasksOnNode[node.ClientID] = []Task{} } - if len(allocation.TasksOnNode[node.ClientID]) >= node.numberGPU { + numberGPU := 0 + for _, gpu := range allocation.Nodes[nodeID].Status { + if gpu.MemoryAllocated == 0 { + numberGPU += 0 + } + } + if len(allocation.TasksOnNode[node.ClientID]) >= numberGPU { continue } eva.add(node, task) @@ -200,7 +97,7 @@ func fastBestFit(nodes []Node, tasks []Task) Allocation { eva.add(nodesMap[nodeID], task) } } - fmt.Println(eva.calculate()) + log.Println(eva.calculate()) return allocation } @@ -209,12 +106,18 @@ func bestFit(allocation Allocation, task Task) (string, bool) { nodeID := "" minCost := math.MaxFloat64 for _, id := range allocation.NodeIDs { - if node, ok := allocation.Nodes[id]; ok && 
len(allocation.TasksOnNode[id]) < node.numberGPU { + numberGPU := 0 + for _, gpu := range allocation.Nodes[id].Status { + if gpu.MemoryAllocated == 0 { + numberGPU += 0 + } + } + if _, ok := allocation.Nodes[id]; ok && len(allocation.TasksOnNode[id]) < numberGPU { /* add */ allocation.TasksOnNode[id] = append(allocation.TasksOnNode[id], task) /* evaluate */ - cost := evaluatue(allocation) + cost := evaluate(allocation) /* revert */ idx := -1 @@ -236,99 +139,6 @@ func bestFit(allocation Allocation, task Task) (string, bool) { return nodeID, flag } -func evaluatue(allocation Allocation) float64 { - /* Calculate cost for network */ - costNetwork := 0.0 - domains := map[string]map[string]int{} - racks := map[string]map[string]int{} - upstreams := map[string]string{} - totalPS := 0 - totalWorker := 0 - - taskToNode := map[string]string{} - for nodeID, tasks := range allocation.TasksOnNode { - numPS := 0 - numWorker := 0 - node := allocation.Nodes[nodeID] - for _, task := range tasks { - taskToNode[task.Name] = nodeID - - if _, ok := domains[node.Domain]; !ok { - domains[node.Domain] = map[string]int{"PS": 0, "Worker": 0} - } - if _, ok := racks[node.Rack]; !ok { - racks[node.Rack] = map[string]int{"PS": 0, "Worker": 0} - } - - if task.IsPS { - domains[node.Domain]["PS"]++ - racks[node.Rack]["PS"]++ - numPS++ - totalPS++ - } else { - domains[node.Domain]["Worker"]++ - racks[node.Rack]["Worker"]++ - numWorker++ - totalWorker++ - } - upstreams[node.Rack] = node.Domain - } - costNetwork -= float64(numPS * numWorker) - } - - /* in the same domain */ - for rackID, pair := range racks { - // in the same rack - costNetwork += float64(pair["PS"]*pair["Worker"]) * 1.0 - // cross rack, but in the same domain - costNetwork += float64(pair["PS"]*(domains[upstreams[rackID]]["Worker"]-pair["Worker"])) * 4.0 - } - - /* across domain */ - for _, pair := range domains { - costNetwork += float64(pair["PS"]*(totalWorker-pair["Worker"])) * 40.0 - } - - /* calculate cost for node fitness */ - //cpu, memory, bw - costLB := 0.0 - for nodeID, tasks := range allocation.TasksOnNode { - costCPU := 0.0 - costMem := 0.0 - costBW := 0.0 - costGPU := 0.0 - requestCPU := 0 - requestMem := 0 - requestBW := 0.0 - requestGPU := 0 - numberPS := 0 - numberWorker := 0 - for _, task := range tasks { - requestCPU += task.NumberCPU - requestMem += task.Memory - requestGPU += task.NumberGPU - if task.IsPS { - numberPS++ - } else { - numberWorker++ - } - } - requestBW = float64(numberPS*(totalWorker-numberWorker) + numberWorker*(totalPS-numberPS)) - node := allocation.Nodes[nodeID] - costCPU += (float64(requestCPU) + node.UtilCPU) / float64(node.NumCPU) * 1.0 - costMem += (float64(requestMem + (node.MemTotal - node.MemAvailable))) / float64(node.MemTotal) * 1.0 - costBW += (float64(requestBW) + (node.TotalBW - node.UsingBW)) / node.TotalBW * 2.0 - costGPU += (float64(requestGPU + node.numberGPU)) / float64(node.numberGPU) * 3.0 - costLB += (costCPU + costMem + costBW + costGPU) / (1.0 + 1.0 + 2.0 + 3.0) - } - costLB /= float64(len(allocation.TasksOnNode)) - costLB *= 100 - //fmt.Println(costLB) - - cost := 0.0*costLB + 1.0*costNetwork - return cost -} - /* Evaluate the allocation */ func (X Allocation) Evaluate() (float64, error) { if !X.Flags["valid"] { @@ -336,7 +146,7 @@ func (X Allocation) Evaluate() (float64, error) { return math.MaxFloat64, nil } - costNetwork := evaluatue(X) + costNetwork := evaluate(X) cost := costNetwork //fmt.Println(taskToNode, cost, len(X.Nodes)) @@ -443,7 +253,7 @@ func (X Allocation) Crossover(Y 
eaopt.Genome, rng *rand.Rand) { idx := -1 nodeID2, ok := taskToNode[task.Name] if !ok { - fmt.Println("Error", taskToNode, X.TasksOnNode, task.Name) + log.Println("Error", taskToNode, X.TasksOnNode, task.Name) } for i, task2 := range X.TasksOnNode[nodeID2] { if task2.Name == task.Name { @@ -451,7 +261,7 @@ func (X Allocation) Crossover(Y eaopt.Genome, rng *rand.Rand) { } } if idx == -1 { - fmt.Println("Error 2", taskToNode, X.TasksOnNode, task.Name) + log.Println("Error 2", taskToNode, X.TasksOnNode, task.Name) } //fmt.Println(X.TasksOnNode) copy(X.TasksOnNode[nodeID2][idx:], X.TasksOnNode[nodeID2][idx+1:]) @@ -496,7 +306,7 @@ func (X Allocation) Clone() eaopt.Genome { if !X.Flags["valid"] { //fmt.Println(X.Valid) } - Y := Allocation{TasksOnNode: map[string][]Task{}, Nodes: map[string]Node{}, Flags: map[string]bool{"valid": X.Flags["valid"]}} + Y := Allocation{TasksOnNode: map[string][]Task{}, Nodes: map[string]NodeStatus{}, Flags: map[string]bool{"valid": X.Flags["valid"]}} for id, node := range X.Nodes { Y.Nodes[id] = node Y.NodeIDs = append(Y.NodeIDs, node.ClientID) @@ -512,9 +322,9 @@ func (X Allocation) Clone() eaopt.Genome { } func VectorFactory(rng *rand.Rand) eaopt.Genome { - allocation := Allocation{TasksOnNode: map[string][]Task{}, Nodes: map[string]Node{}, Flags: map[string]bool{"valid": true}} + allocation := Allocation{TasksOnNode: map[string][]Task{}, Nodes: map[string]NodeStatus{}, Flags: map[string]bool{"valid": true}} - var nodes []Node + var nodes []NodeStatus var tasks []Task for _, node := range nodesMap { @@ -556,7 +366,7 @@ func VectorFactory(rng *rand.Rand) eaopt.Genome { */ allocation.TasksOnNode = fastBestFit(nodes, tasks).TasksOnNode - fmt.Println(time.Since(ts)) + log.Println(time.Since(ts)) //fmt.Println("Best Fit") } else if t%2 == 0 { /* first-fit */ @@ -582,23 +392,26 @@ func VectorFactory(rng *rand.Rand) eaopt.Genome { return allocation } -func main3() { +func testGA() { numTask := 20 - nodesMap = map[string]Node{} + nodesMap = map[string]NodeStatus{} tasksMap = map[string]Task{} for i := 0; i < numTask*3; i++ { - node := Node{ClientID: strconv.Itoa(i), Rack: strconv.Itoa(i % 40), Domain: strconv.Itoa(i % 4)} + node := NodeStatus{ClientID: strconv.Itoa(i), Rack: strconv.Itoa(i % 40), Domain: strconv.Itoa(i % 4)} node.NumCPU = 24 node.MemTotal = 188 node.TotalBW = 100 - node.numberGPU = rand.Intn(3) + 1 + cnt := rand.Intn(3) + 1 + for i := 0; i < cnt; i++ { + node.Status = append(node.Status, GPUStatus{MemoryTotal: 11439, MemoryAllocated: 0, UUID: node.ClientID + strconv.Itoa(i)}) + } nodesMap[strconv.Itoa(i)] = node } for i := 0; i < numTask; i++ { isPS := false - if i>= 3 { + if i >= 3 { isPS = true } task := Task{Name: strconv.Itoa(i), IsPS: isPS} @@ -608,7 +421,7 @@ func main3() { tasksMap[strconv.Itoa(i)] = task } - var nodes []Node + var nodes []NodeStatus var tasks []Task for _, node := range nodesMap { @@ -619,12 +432,12 @@ func main3() { } s := time.Now() allocation := fastBestFit(nodes, tasks) - fmt.Println(time.Since(s)) + log.Println(time.Since(s)) // Instantiate a GA with a GAConfig var ga, err = eaopt.NewDefaultGAConfig().NewGA() if err != nil { - fmt.Println(err) + log.Println(err) return } @@ -635,7 +448,7 @@ func main3() { // Add a custom print function to track progress ga.Callback = func(ga *eaopt.GA) { - fmt.Printf("Best fitness at generation %d: %f\n", ga.Generations, ga.HallOfFame[0].Fitness) + log.Printf("Best fitness at generation %d: %f\n", ga.Generations, ga.HallOfFame[0].Fitness) } bestFitness := math.MaxFloat64 @@ -647,7 +460,7 @@ 
func main3() { gap := math.Abs(ga.HallOfFame[0].Fitness - bestFitness) if gap <= 0.000001 || ga.HallOfFame[0].Fitness >= bestFitness { if count >= 30 || time.Since(ts) > time.Second*30 { - fmt.Println("Early Stop") + log.Println("Early Stop") return true } else { count++ @@ -661,13 +474,13 @@ func main3() { // Find the minimum err = ga.Minimize(VectorFactory) - fmt.Println(time.Since(ts)) - fmt.Println(ga.HallOfFame[0].Genome.(Allocation).TasksOnNode) + log.Println(time.Since(ts)) + log.Println(ga.HallOfFame[0].Genome.(Allocation).TasksOnNode) //fmt.Println(ga.HallOfFame[0].Genome.(Allocation).Nodes) if err != nil { - fmt.Println(err) + log.Println(err) return } - fmt.Println(allocation) + log.Println(allocation) } diff --git a/src/group_models.go b/src/group_models.go index ce8b9e9..24efa71 100644 --- a/src/group_models.go +++ b/src/group_models.go @@ -9,5 +9,3 @@ type Group struct { CPU int `json:"quota_cpu"` Memory int `json:"quota_mem"` } - - diff --git a/src/job_manager.go b/src/job_manager.go index 909d4ab..57c1741 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -6,124 +6,109 @@ import ( "strings" "io/ioutil" "encoding/json" - "strconv" log "github.com/sirupsen/logrus" + "sync" + "strconv" + "math/rand" ) type JobManager struct { - scheduler Scheduler - job Job - jobStatus JobStatus - resources []NodeStatus - killedFlag bool - isRunning bool - network string + scheduler Scheduler + job Job + jobStatus JobStatus + resources []NodeStatus + resourcesMu sync.Mutex + isRunning bool + killFlag bool + network string } func (jm *JobManager) start() { log.Info("start job ", jm.job.Name, time.Now()) jm.isRunning = false + jm.killFlag = false jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}} - jm.network = InstanceOfResourcePool().acquireNetwork() - + /* register in JHL */ InstanceJobHistoryLogger().submitJob(jm.job) + /* request for private network */ + jm.network = InstanceOfResourcePool().acquireNetwork() + /* request for resources */ - for range jm.job.Tasks { //append would cause uncertain order - jm.resources = append(jm.resources, NodeStatus{ClientID: "null"}) - } - - var nodes []NodeStatus for { - if jm.killedFlag { + if jm.killFlag { break } - nodes = jm.scheduler.AcquireResource(jm.job) - if len(nodes) > 0 { + jm.resources = jm.scheduler.AcquireResource(jm.job) + if len(jm.resources) > 0 { + log.Info("Receive resource", jm.resources) break } - time.Sleep(time.Second * 1) - } - log.Info("Receive resource", nodes) - jm.resources = nodes - - for _, node := range nodes { - for _, t := range node.Status { - InstanceOfResourcePool().attach(t.UUID, jm.job.Name) - } + /* sleep random Millisecond to avoid deadlock */ + time.Sleep(time.Millisecond * time.Duration(500+rand.Intn(500))) } - if !jm.killedFlag { + if !jm.killFlag { + /* switch to Running state */ jm.scheduler.UpdateProgress(jm.job, Running) - jm.isRunning = true log.Info("ready to run job ", jm.job.Name, time.Now()) - } - /* bring up containers */ - for i := range jm.job.Tasks { - if jm.killedFlag { - break - } - var GPUs []string - for _, GPU := range jm.resources[i].Status { - GPUs = append(GPUs, GPU.UUID) - } - - for attempt := 0; attempt < 3; attempt++ { - if attempt == 2 { //failed more than once - //for { - // resource := jm.scheduler.AcquireResource(jm.job, jm.job.Tasks[i], jm.resources) - // if len(resource.Status) > 0 { - // break - // } - time.Sleep(time.Second * 1) - break - //} - } - - v := url.Values{} - v.Set("image", jm.job.Tasks[i].Image) - v.Set("cmd", jm.job.Tasks[i].Cmd) - 
v.Set("name", jm.job.Tasks[i].Name) - v.Set("workspace", jm.job.Workspace) - v.Set("gpus", strings.Join(GPUs, ",")) - v.Set("mem_limit", strconv.Itoa(jm.job.Tasks[i].Memory)+"m") - v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[i].NumberCPU)) - v.Set("network", jm.network) - v.Set("should_wait", "1") - v.Set("output_dir", "/tmp/") - v.Set("hdfs_address", "http://192.168.100.104:50070/") - v.Set("hdfs_dir", "/user/yao/output/"+jm.job.Name) - v.Set("gpu_mem", strconv.Itoa(jm.job.Tasks[i].MemoryGPU)) - - resp, err := doRequest("POST", "http://"+jm.resources[i].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") - if err != nil { - log.Warn(err.Error()) - continue - } - - body, err := ioutil.ReadAll(resp.Body) - resp.Body.Close() - if err != nil { - log.Warn(err) - continue - } - - var res MsgCreate - err = json.Unmarshal([]byte(string(body)), &res) - if err != nil { - log.Warn(err) - continue - } - if res.Code != 0 { - log.Warn(res) - } - if res.Code == 0 { - jm.jobStatus.tasks[jm.job.Tasks[i].Name] = TaskStatus{Id: res.Id, Node: jm.resources[i].ClientHost} - break - } + /* bring up containers */ + wg := sync.WaitGroup{} + for i := range jm.job.Tasks { + wg.Add(1) + + go func(index int) { + defer wg.Done() + var UUIDs []string + for _, GPU := range jm.resources[index].Status { + UUIDs = append(UUIDs, GPU.UUID) + + /* attach to GPUs */ + InstanceOfResourcePool().attach(GPU.UUID, jm.job.Name) + } + GPUs := strings.Join(UUIDs, ",") + + v := url.Values{} + v.Set("image", jm.job.Tasks[index].Image) + v.Set("cmd", jm.job.Tasks[index].Cmd) + v.Set("name", jm.job.Tasks[index].Name) + v.Set("workspace", jm.job.Workspace) + v.Set("gpus", GPUs) + v.Set("mem_limit", strconv.Itoa(jm.job.Tasks[index].Memory)+"m") + v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[index].NumberCPU)) + v.Set("network", jm.network) + v.Set("should_wait", "1") + v.Set("output_dir", "/tmp/") + v.Set("hdfs_address", "http://192.168.100.104:50070/") + v.Set("hdfs_dir", "/user/yao/output/"+jm.job.Name) + v.Set("gpu_mem", strconv.Itoa(jm.job.Tasks[index].MemoryGPU)) + + resp, err := doRequest("POST", "http://"+jm.resources[index].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") + if err != nil { + log.Warn(err.Error()) + return + } + + body, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + log.Warn(err) + return + } + + var res MsgCreate + err = json.Unmarshal([]byte(string(body)), &res) + if err != nil || res.Code != 0 { + log.Warn(res) + return + } + jm.jobStatus.tasks[jm.job.Tasks[index].Name] = TaskStatus{Id: res.Id, Node: jm.resources[index].ClientHost} + }(i) } + wg.Wait() + jm.isRunning = true } /* monitor job execution */ @@ -134,87 +119,94 @@ func (jm *JobManager) start() { } time.Sleep(time.Second * 25) } + + /* make sure resource are released */ + jm.returnResource(jm.status().Status) + log.Info("JobMaster exited ", jm.job.Name) } -func (jm *JobManager) checkStatus(status []TaskStatus) bool { - if !jm.isRunning { - return false +/* release all resource */ +func (jm *JobManager) returnResource(status []TaskStatus) { + jm.resourcesMu.Lock() + defer jm.resourcesMu.Unlock() + if len(jm.resources) == 0 { + return } - flag := false + /* return resource */ + for i := range jm.resources { + jm.scheduler.ReleaseResource(jm.job, jm.resources[i]) + log.Info("return resource ", jm.resources[i].ClientID) + + for _, t := range jm.resources[i].Status { + InstanceOfResourcePool().detach(t.UUID, jm.job) + } + + 
InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i]) + + /* remove exited containers */ + //v := url.Values{} + //v.Set("id", res.Status[i].Id) + // + //_, err := doRequest("POST", "http://"+res.Status[i].Node+":8000/remove", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") + //if err != nil { + // log.Warn(err.Error()) + // continue + //} + } + InstanceOfResourcePool().releaseNetwork(jm.network) + jm.resources = []NodeStatus{} +} + +/* monitor all tasks */ +func (jm *JobManager) checkStatus(status []TaskStatus) { + if !jm.isRunning { + return + } + flagRunning := false onlyPS := true for i := range status { if status[i].Status == "ready" { log.Debug(jm.job.Name, "-", i, " is ready to run") - flag = true - if !jm.job.Tasks[i].IsPS { - onlyPS = false - } - } else if status[i].Status == "unknown" { - log.Debug(jm.job.Name, "-", i, " is starting") - flag = true + flagRunning = true if !jm.job.Tasks[i].IsPS { onlyPS = false } } else if status[i].Status == "running" { log.Debug(jm.job.Name, "-", i, " is running") - flag = true + flagRunning = true if !jm.job.Tasks[i].IsPS { onlyPS = false } InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i]) } else { log.Info(jm.job.Name, "-", i, " ", status[i].Status) - if exitCode, ok := status[i].State["ExitCode"].(float64); ok && !jm.job.Tasks[i].IsPS { - if exitCode != 0 && !jm.killedFlag { - log.Warn(jm.job.Name+"-"+jm.job.Tasks[i].Name+" exited unexpected, exitCode=", exitCode) - jm.killedFlag = true - jm.scheduler.UpdateProgress(jm.job, Failed) - } - } - - /* remove exited containers */ - //v := url.Values{} - //v.Set("id", res.Status[i].Id) - // - //_, err := doRequest("POST", "http://"+res.Status[i].Node+":8000/remove", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") - //if err != nil { - // log.Warn(err.Error()) - // continue - //} - - /* return resource */ - if jm.resources[i].ClientID != "null" { - jm.scheduler.ReleaseResource(jm.job, jm.resources[i]) - log.Info("return resource ", jm.resources[i].ClientID) - jm.resources[i].ClientID = "null" - - for _, t := range jm.resources[i].Status { - InstanceOfResourcePool().detach(t.UUID, jm.job) - } - - InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i]) + if exitCode, ok := status[i].State["ExitCode"].(float64); ok && exitCode != 0 && !jm.killFlag { + log.Warn(jm.job.Name+"-"+jm.job.Tasks[i].Name+" exited unexpected, exitCode=", exitCode) + jm.isRunning = false + jm.stop(false) + jm.scheduler.UpdateProgress(jm.job, Failed) + jm.returnResource(status) + break } } } - if flag && onlyPS { - jm.stop() + if jm.isRunning && onlyPS { log.Info("Only PS is running, stop ", jm.job.Name) - jm.killedFlag = false - } - - if !flag { jm.isRunning = false - InstanceOfResourcePool().releaseNetwork(jm.network) - - if !jm.killedFlag { - jm.scheduler.UpdateProgress(jm.job, Finished) - log.Info("finish job ", jm.job.Name) - } - log.Info("JobMaster exited ", jm.job.Name) + jm.stop(false) + jm.scheduler.UpdateProgress(jm.job, Finished) + jm.returnResource(status) + } + + if jm.isRunning && !flagRunning && !jm.killFlag { + jm.isRunning = false + jm.scheduler.UpdateProgress(jm.job, Finished) + jm.returnResource(status) + log.Info("finish job ", jm.job.Name) } - return flag } +/* fetch logs of task */ func (jm *JobManager) logs(taskName string) MsgLog { spider := Spider{} spider.Method = "GET" @@ -234,21 +226,22 @@ func (jm *JobManager) logs(taskName string) MsgLog { body, err := ioutil.ReadAll(resp.Body) if err != nil { - return 
MsgLog{Code: 1, Error: err.Error()} + return MsgLog{Code: 2, Error: err.Error()} } var res MsgLog err = json.Unmarshal([]byte(string(body)), &res) if err != nil { log.Println(err) - return MsgLog{Code: 1, Error: "Unknown"} + return MsgLog{Code: 3, Error: "Unknown"} } return res } +/* fetch job tasks status */ func (jm *JobManager) status() MsgJobStatus { var tasksStatus []TaskStatus - for range jm.job.Tasks { + for range jm.job.Tasks { //append would cause uncertain order tasksStatus = append(tasksStatus, TaskStatus{}) } @@ -286,22 +279,23 @@ func (jm *JobManager) status() MsgJobStatus { return MsgJobStatus{Status: tasksStatus} } -func (jm *JobManager) stop() MsgStop { - jm.killedFlag = true - go func() { /* kill at background */ - for _, taskStatus := range jm.jobStatus.tasks { - v := url.Values{} - v.Set("id", taskStatus.Id) +/* force stop all containers */ +func (jm *JobManager) stop(force bool) MsgStop { + for _, taskStatus := range jm.jobStatus.tasks { + v := url.Values{} + v.Set("id", taskStatus.Id) - _, err := doRequest("POST", "http://"+taskStatus.Node+":8000/stop", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") - if err != nil { - log.Warn(err.Error()) - continue - } + _, err := doRequest("POST", "http://"+taskStatus.Node+":8000/stop", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") + if err != nil { + log.Warn(err.Error()) + continue } - }() + } - jm.scheduler.UpdateProgress(jm.job, Stopped) - log.Info("kill job, ", jm.job.Name) + if force { + jm.killFlag = true + jm.scheduler.UpdateProgress(jm.job, Stopped) + log.Info("kill job, ", jm.job.Name) + } return MsgStop{Code: 0} } diff --git a/src/main.go b/src/main.go index b7c62e1..58e8973 100644 --- a/src/main.go +++ b/src/main.go @@ -292,7 +292,7 @@ func main() { /* init components */ InstanceOfResourcePool().init(config) - InstanceOfColector().init(config) + InstanceOfCollector().init(config) InstanceJobHistoryLogger().init(config) InstanceOfOptimizer().init(config) InstanceOfGroupManager().init(config) diff --git a/src/pool_status.go b/src/pool_status.go index dafcd2f..bd1bac3 100644 --- a/src/pool_status.go +++ b/src/pool_status.go @@ -30,11 +30,13 @@ type NodeStatus struct { ClientID string `json:"id"` ClientHost string `json:"host"` Domain string `json:"domain"` - Rack int `json:"rack"` + Rack string `json:"rack"` Version float64 `json:"version"` NumCPU int `json:"cpu_num"` UtilCPU float64 `json:"cpu_load"` MemTotal int `json:"mem_total"` MemAvailable int `json:"mem_available"` + UsingBW float64 `json:"bw_using"` + TotalBW float64 `json:"bw_total"` Status []GPUStatus `json:"status"` } diff --git a/src/resource_pool.go b/src/resource_pool.go index 5008bb6..5f01eca 100644 --- a/src/resource_pool.go +++ b/src/resource_pool.go @@ -807,6 +807,12 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus { /* assign */ var ress []NodeStatus if len(candidates) > 0 { + /* + for range job.Tasks { //append would cause uncertain order + resources = append(resources, NodeStatus{ClientID: "null"}) + } + */ + var nodes []NodeStatus if len(job.Tasks) == 1 { node := pool.pickNode(candidates, availableGPUs, task, job, []NodeStatus{}) diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go index 0948ec6..5c68c66 100644 --- a/src/scheduler_fair.go +++ b/src/scheduler_fair.go @@ -407,7 +407,7 @@ func (scheduler *SchedulerFair) Stop(jobName string) MsgStop { if !ok { return MsgStop{Code: 1, Error: "Job not exist!"} } - return jm.stop() + return jm.stop(true) } func (scheduler 
*SchedulerFair) QueryLogs(jobName string, taskName string) MsgLog { diff --git a/src/scheduler_priority.go b/src/scheduler_priority.go index 47fb36d..724cfa9 100644 --- a/src/scheduler_priority.go +++ b/src/scheduler_priority.go @@ -4,7 +4,7 @@ import ( "sync" "time" log "github.com/sirupsen/logrus" - ) +) type SchedulerPriority struct { history []*Job diff --git a/src/util.go b/src/util.go index 6387878..87ceadc 100644 --- a/src/util.go +++ b/src/util.go @@ -6,7 +6,7 @@ import ( "time" "io" "net/http" - ) +) type Configuration struct { KafkaBrokers []string `json:"kafkaBrokers"`
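
The fit functions in ga.go (randomFit, firstFit, bestFit, and the capacity check inside fastBestFit) now derive a node's free capacity from NodeStatus.Status instead of the removed Node.numberGPU field: a node can take another task only while the tasks already placed on it number fewer than its GPUs with no memory allocated. A minimal sketch of that count, assuming the NodeStatus/GPUStatus fields shown in pool_status.go (the helper name is illustrative); note that the inlined loops in the patch accumulate with "numberGPU += 0", which leaves the count at zero and makes the len(...) < numberGPU check unsatisfiable, so an increment is presumably what is intended:

    // countFreeGPUs returns the number of GPUs on a node that have no
    // memory allocated yet; the fit functions compare the number of tasks
    // already placed on the node against this value before adding one more.
    func countFreeGPUs(node NodeStatus) int {
        numberGPU := 0
        for _, gpu := range node.Status {
            if gpu.MemoryAllocated == 0 {
                numberGPU++
            }
        }
        return numberGPU
    }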
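
collector.go keeps the mutex-guarded lazy singleton behind the renamed InstanceOfCollector (the deferred Unlock registered just before Lock still pairs correctly, since deferred calls only run when the function returns). The same guarantee is often written with sync.Once; a sketch under the assumption that the body simply constructs a zero-value Collector, which the hunk above does not show:

    var collectorOnce sync.Once

    // InstanceOfCollector written with sync.Once; both forms give a
    // lazily created, race-free singleton.
    func InstanceOfCollector() *Collector {
        collectorOnce.Do(func() {
            // construction assumed here; the real initializer may do more
            collectorInstance = &Collector{}
        })
        return collectorInstance
    }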
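
The Evaluator and evaluatue() removed from ga.go score a placement by where parameter servers sit relative to workers: a PS–worker pair in the same rack costs 1, a pair split across racks inside one domain costs 4, and a pair split across domains costs 40, with an extra discount for pairs sharing a node. A toy recomputation with those weights, using made-up counts and ignoring the same-node discount:

    // 2 PS and 4 workers in rack r1, 2 more workers in another rack of the
    // same domain, and 2 workers in a second domain.
    func exampleNetworkCost() float64 {
        const sameRack, crossRack, crossDomain = 1.0, 4.0, 40.0
        ps := 2.0
        cost := ps*4*sameRack + // pairs formed inside r1
            ps*2*crossRack + // pairs reaching the other rack of the domain
            ps*2*crossDomain // pairs reaching the second domain
        return cost // 2*4*1 + 2*2*4 + 2*2*40 = 184
    }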
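
job_manager.go now brings up the containers of a job concurrently: one goroutine per task, each handed its own copy of the loop index, joined with a sync.WaitGroup before the job is marked running. The shape of that pattern in isolation (launchOne is a placeholder for building the url.Values and POSTing to the agent's /create endpoint in the real code):

    package main

    import (
        "log"
        "sync"
    )

    // launchOne stands in for the per-task create request.
    func launchOne(index int, name string) error {
        log.Printf("launching task %d (%s)", index, name)
        return nil
    }

    func main() {
        tasks := []string{"ps-0", "worker-0", "worker-1"}
        wg := sync.WaitGroup{}
        for i := range tasks {
            wg.Add(1)
            go func(index int) { // the index is passed in, so each goroutine
                defer wg.Done() // sees its own value rather than the shared i
                if err := launchOne(index, tasks[index]); err != nil {
                    log.Println(err)
                }
            }(i)
        }
        wg.Wait() // every task is up (or has failed) before execution continues
    }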