mirror of https://github.com/newnius/YAO-scheduler.git
synced 2025-12-12 23:36:44 +00:00

update
@@ -6,124 +6,109 @@ import (
 	"strings"
 	"io/ioutil"
 	"encoding/json"
-	"strconv"
 	log "github.com/sirupsen/logrus"
-	"math/rand"
+	"sync"
+	"strconv"
 )
 
 type JobManager struct {
-	scheduler  Scheduler
-	job        Job
-	jobStatus  JobStatus
-	resources  []NodeStatus
-	killedFlag bool
-	isRunning  bool
-	network    string
+	scheduler   Scheduler
+	job         Job
+	jobStatus   JobStatus
+	resources   []NodeStatus
+	resourcesMu sync.Mutex
+	isRunning   bool
+	killFlag    bool
+	network     string
 }
 
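Note: the new resourcesMu guards jm.resources, which is now touched from several paths (the monitor loop, checkStatus, and returnResource below). For reference, the shapes of the status types implied by this diff are roughly the following; the real definitions live elsewhere in the repo, so treat this as an assumption:

type JobStatus struct {
	Name  string
	tasks map[string]TaskStatus
}

type TaskStatus struct {
	Id     string                 // container id returned by the node agent
	Node   string                 // host running the container
	Status string                 // "ready", "unknown", "running", or an exit state
	State  map[string]interface{} // raw container state, e.g. State["ExitCode"]
}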
 func (jm *JobManager) start() {
 	log.Info("start job ", jm.job.Name, time.Now())
 	jm.isRunning = false
+	jm.killFlag = false
 	jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}}
 
+	jm.network = InstanceOfResourcePool().acquireNetwork()
+
 	/* register in JHL */
 	InstanceJobHistoryLogger().submitJob(jm.job)
 
-	/* request for private network */
-	jm.network = InstanceOfResourcePool().acquireNetwork()
-
-	/* request for resources */
-	for range jm.job.Tasks { //append would cause uncertain order
-		jm.resources = append(jm.resources, NodeStatus{ClientID: "null"})
-	}
-
-	var nodes []NodeStatus
 	for {
-		if jm.killedFlag {
+		if jm.killFlag {
 			break
 		}
-		nodes = jm.scheduler.AcquireResource(jm.job)
-		if len(nodes) > 0 {
+		jm.resources = jm.scheduler.AcquireResource(jm.job)
+		if len(jm.resources) > 0 {
+			log.Info("Receive resource", jm.resources)
 			break
 		}
 		time.Sleep(time.Second * 1)
 	}
-	log.Info("Receive resource", nodes)
-	jm.resources = nodes
-
-	for _, node := range nodes {
-		for _, t := range node.Status {
-			InstanceOfResourcePool().attach(t.UUID, jm.job.Name)
-		}
-		/* sleep random Millisecond to avoid deadlock */
-		time.Sleep(time.Millisecond * time.Duration(500+rand.Intn(500)))
-	}
 
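Note: resource acquisition polls the scheduler once per second until something is granted or the job is killed; the removed code then staggered GPU attachment with a random sleep so two JobManagers attaching at the same time would not deadlock the pool. A minimal sketch of the polling shape, with hypothetical acquire/killed callbacks that are not from the repo:

// pollUntil retries acquire once per interval until it succeeds or
// killed reports true.
func pollUntil(acquire func() bool, killed func() bool, interval time.Duration) bool {
	for {
		if killed() {
			return false
		}
		if acquire() {
			return true
		}
		time.Sleep(interval)
	}
}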
-	if !jm.killedFlag {
+	if !jm.killFlag {
 		/* switch to Running state */
 		jm.scheduler.UpdateProgress(jm.job, Running)
-		jm.isRunning = true
 		log.Info("ready to run job ", jm.job.Name, time.Now())
-	}
 
-	/* bring up containers */
-	for i := range jm.job.Tasks {
-		if jm.killedFlag {
-			break
-		}
-		var GPUs []string
-		for _, GPU := range jm.resources[i].Status {
-			GPUs = append(GPUs, GPU.UUID)
-		}
-
-		for attempt := 0; attempt < 3; attempt++ {
-			if attempt == 2 { //failed more than once
-				//for {
-				//	resource := jm.scheduler.AcquireResource(jm.job, jm.job.Tasks[i], jm.resources)
-				//	if len(resource.Status) > 0 {
-				//		break
-				//	}
-				time.Sleep(time.Second * 1)
-				break
-				//}
-			}
-
-			v := url.Values{}
-			v.Set("image", jm.job.Tasks[i].Image)
-			v.Set("cmd", jm.job.Tasks[i].Cmd)
-			v.Set("name", jm.job.Tasks[i].Name)
-			v.Set("workspace", jm.job.Workspace)
-			v.Set("gpus", strings.Join(GPUs, ","))
-			v.Set("mem_limit", strconv.Itoa(jm.job.Tasks[i].Memory)+"m")
-			v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[i].NumberCPU))
-			v.Set("network", jm.network)
-			v.Set("should_wait", "1")
-			v.Set("output_dir", "/tmp/")
-			v.Set("hdfs_address", "http://192.168.100.104:50070/")
-			v.Set("hdfs_dir", "/user/yao/output/"+jm.job.Name)
-			v.Set("gpu_mem", strconv.Itoa(jm.job.Tasks[i].MemoryGPU))
-
-			resp, err := doRequest("POST", "http://"+jm.resources[i].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
-			if err != nil {
-				log.Warn(err.Error())
-				continue
-			}
-
-			body, err := ioutil.ReadAll(resp.Body)
-			resp.Body.Close()
-			if err != nil {
-				log.Warn(err)
-				continue
-			}
-
-			var res MsgCreate
-			err = json.Unmarshal([]byte(string(body)), &res)
-			if err != nil {
-				log.Warn(err)
-				continue
-			}
-			if res.Code != 0 {
-				log.Warn(res)
-			}
-			if res.Code == 0 {
-				jm.jobStatus.tasks[jm.job.Tasks[i].Name] = TaskStatus{Id: res.Id, Node: jm.resources[i].ClientHost}
-				break
-			}
-		}
-	}
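Note: the removed path created containers sequentially and gave each task up to three attempts (the third iteration only slept and bailed out). A sketch of that bounded-retry shape, with a hypothetical op callback:

// withRetries runs op up to n times, sleeping between failed attempts.
func withRetries(n int, op func() error) error {
	var err error
	for attempt := 0; attempt < n; attempt++ {
		if err = op(); err == nil {
			return nil
		}
		time.Sleep(time.Second)
	}
	return err
}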
+		/* bring up containers */
+		wg := sync.WaitGroup{}
+		for i := range jm.job.Tasks {
+			wg.Add(1)
+
+			go func(index int) {
+				defer wg.Done()
+				var UUIDs []string
+				for _, GPU := range jm.resources[index].Status {
+					UUIDs = append(UUIDs, GPU.UUID)
+
+					/* attach to GPUs */
+					InstanceOfResourcePool().attach(GPU.UUID, jm.job.Name)
+				}
+				GPUs := strings.Join(UUIDs, ",")
+
+				v := url.Values{}
+				v.Set("image", jm.job.Tasks[index].Image)
+				v.Set("cmd", jm.job.Tasks[index].Cmd)
+				v.Set("name", jm.job.Tasks[index].Name)
+				v.Set("workspace", jm.job.Workspace)
+				v.Set("gpus", GPUs)
+				v.Set("mem_limit", strconv.Itoa(jm.job.Tasks[index].Memory)+"m")
+				v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[index].NumberCPU))
+				v.Set("network", jm.network)
+				v.Set("should_wait", "1")
+				v.Set("output_dir", "/tmp/")
+				v.Set("hdfs_address", "http://192.168.100.104:50070/")
+				v.Set("hdfs_dir", "/user/yao/output/"+jm.job.Name)
+				v.Set("gpu_mem", strconv.Itoa(jm.job.Tasks[index].MemoryGPU))
+
+				resp, err := doRequest("POST", "http://"+jm.resources[index].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
+				if err != nil {
+					log.Warn(err.Error())
+					return
+				}
+
+				body, err := ioutil.ReadAll(resp.Body)
+				resp.Body.Close()
+				if err != nil {
+					log.Warn(err)
+					return
+				}
+
+				var res MsgCreate
+				err = json.Unmarshal([]byte(string(body)), &res)
+				if err != nil || res.Code != 0 {
+					log.Warn(res)
+					return
+				}
+				jm.jobStatus.tasks[jm.job.Tasks[index].Name] = TaskStatus{Id: res.Id, Node: jm.resources[index].ClientHost}
+			}(i)
+		}
+		wg.Wait()
+		jm.isRunning = true
+	}
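Note: the replacement brings up all containers concurrently under a sync.WaitGroup and only flips isRunning after every create call has returned. One caveat: each goroutine writes to jm.jobStatus.tasks, and Go maps are not safe for concurrent writes, so this map likely needs its own lock. A minimal sketch of the guarded-map pattern the goroutines would need; the names (taskRegistry, put) are illustrative, not from the repo:

type taskRegistry struct {
	mu    sync.Mutex
	tasks map[string]TaskStatus
}

// put serializes writes so concurrent goroutines cannot corrupt the map.
func (r *taskRegistry) put(name string, ts TaskStatus) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.tasks[name] = ts
}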
 
 	/* monitor job execution */
@@ -134,87 +119,94 @@ func (jm *JobManager) start() {
 		}
 		time.Sleep(time.Second * 25)
 	}
 
+	/* make sure resource are released */
+	jm.returnResource(jm.status().Status)
+	log.Info("JobMaster exited ", jm.job.Name)
 }
 
-func (jm *JobManager) checkStatus(status []TaskStatus) bool {
-	if !jm.isRunning {
-		return false
-	}
-	flag := false
+/* release all resource */
+func (jm *JobManager) returnResource(status []TaskStatus) {
+	jm.resourcesMu.Lock()
+	defer jm.resourcesMu.Unlock()
+	if len(jm.resources) == 0 {
+		return
+	}
+	/* return resource */
+	for i := range jm.resources {
+		jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
+		log.Info("return resource ", jm.resources[i].ClientID)
+
+		for _, t := range jm.resources[i].Status {
+			InstanceOfResourcePool().detach(t.UUID, jm.job)
+		}
+
+		InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i])
+
+		/* remove exited containers */
+		//v := url.Values{}
+		//v.Set("id", res.Status[i].Id)
+		//
+		//_, err := doRequest("POST", "http://"+res.Status[i].Node+":8000/remove", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
+		//if err != nil {
+		//	log.Warn(err.Error())
+		//	continue
+		//}
+	}
+	InstanceOfResourcePool().releaseNetwork(jm.network)
+	jm.resources = []NodeStatus{}
+}
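Note: returnResource can now be reached from two places, the failure path in checkStatus and the unconditional sweep at the end of start(), so it takes resourcesMu and treats an empty resource list as already released. A minimal sketch of that idempotent-release pattern, with illustrative names:

type releaser struct {
	mu    sync.Mutex
	items []string
}

// releaseAll frees every held item exactly once; a second call is a no-op.
func (r *releaser) releaseAll(release func(string)) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if len(r.items) == 0 {
		return // already released
	}
	for _, it := range r.items {
		release(it)
	}
	r.items = nil
}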
 
+/* monitor all tasks */
+func (jm *JobManager) checkStatus(status []TaskStatus) {
+	if !jm.isRunning {
+		return
+	}
+	flagRunning := false
 	onlyPS := true
 	for i := range status {
 		if status[i].Status == "ready" {
 			log.Debug(jm.job.Name, "-", i, " is ready to run")
-			flag = true
 			if !jm.job.Tasks[i].IsPS {
 				onlyPS = false
 			}
 		} else if status[i].Status == "unknown" {
 			log.Debug(jm.job.Name, "-", i, " is starting")
-			flag = true
+			flagRunning = true
 			if !jm.job.Tasks[i].IsPS {
 				onlyPS = false
 			}
 		} else if status[i].Status == "running" {
 			log.Debug(jm.job.Name, "-", i, " is running")
-			flag = true
+			flagRunning = true
 			if !jm.job.Tasks[i].IsPS {
 				onlyPS = false
 			}
 			InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i])
 		} else {
 			log.Info(jm.job.Name, "-", i, " ", status[i].Status)
-			if exitCode, ok := status[i].State["ExitCode"].(float64); ok && !jm.job.Tasks[i].IsPS {
-				if exitCode != 0 && !jm.killedFlag {
-					log.Warn(jm.job.Name+"-"+jm.job.Tasks[i].Name+" exited unexpected, exitCode=", exitCode)
-					jm.killedFlag = true
-					jm.scheduler.UpdateProgress(jm.job, Failed)
-				}
-			}
-
-			/* remove exited containers */
-			//v := url.Values{}
-			//v.Set("id", res.Status[i].Id)
-			//
-			//_, err := doRequest("POST", "http://"+res.Status[i].Node+":8000/remove", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
-			//if err != nil {
-			//	log.Warn(err.Error())
-			//	continue
-			//}
-
-			/* return resource */
-			if jm.resources[i].ClientID != "null" {
-				jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
-				log.Info("return resource ", jm.resources[i].ClientID)
-				jm.resources[i].ClientID = "null"
-
-				for _, t := range jm.resources[i].Status {
-					InstanceOfResourcePool().detach(t.UUID, jm.job)
-				}
-
-				InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i])
-			}
+			if exitCode, ok := status[i].State["ExitCode"].(float64); ok && exitCode != 0 && !jm.killFlag {
+				log.Warn(jm.job.Name+"-"+jm.job.Tasks[i].Name+" exited unexpected, exitCode=", exitCode)
+				jm.isRunning = false
+				jm.stop(false)
+				jm.scheduler.UpdateProgress(jm.job, Failed)
+				jm.returnResource(status)
+				break
+			}
 		}
 	}
-	if flag && onlyPS {
-		jm.stop()
+	if jm.isRunning && onlyPS {
 		log.Info("Only PS is running, stop ", jm.job.Name)
-		jm.killedFlag = false
+		jm.stop(false)
+		jm.scheduler.UpdateProgress(jm.job, Finished)
+		jm.returnResource(status)
 	}
 
-	if !flag {
-		jm.isRunning = false
-		InstanceOfResourcePool().releaseNetwork(jm.network)
-
-		if !jm.killedFlag {
-			jm.scheduler.UpdateProgress(jm.job, Finished)
-			log.Info("finish job ", jm.job.Name)
-		}
-		log.Info("JobMaster exited ", jm.job.Name)
-	}
-
-	return flag
+	if jm.isRunning && !flagRunning && !jm.killFlag {
+		jm.isRunning = false
+		jm.scheduler.UpdateProgress(jm.job, Finished)
+		jm.returnResource(status)
+		log.Info("finish job ", jm.job.Name)
+	}
 }
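Note: the rewritten checkStatus encodes two exit rules: stop the job once only parameter servers are still alive, and mark it Finished once nothing is starting or running (unless killFlag is set, which only a forced stop does, and that path already reports Stopped). The decision logic, isolated as a sketch with the flag semantics assumed from the code above:

// shouldFinish mirrors the two conditions above: onlyPS means every live
// task is a parameter server; flagRunning means some task is starting or
// running.
func shouldFinish(isRunning, flagRunning, killFlag, onlyPS bool) (stopPS, finish bool) {
	stopPS = isRunning && onlyPS
	finish = isRunning && !flagRunning && !killFlag
	return
}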
 
 /* fetch logs of task */
 func (jm *JobManager) logs(taskName string) MsgLog {
 	spider := Spider{}
 	spider.Method = "GET"
@@ -234,21 +226,22 @@ func (jm *JobManager) logs(taskName string) MsgLog {
 	body, err := ioutil.ReadAll(resp.Body)
 	if err != nil {
-		return MsgLog{Code: 1, Error: err.Error()}
+		return MsgLog{Code: 2, Error: err.Error()}
 	}
 
 	var res MsgLog
 	err = json.Unmarshal([]byte(string(body)), &res)
 	if err != nil {
 		log.Println(err)
-		return MsgLog{Code: 1, Error: "Unknown"}
+		return MsgLog{Code: 3, Error: "Unknown"}
 	}
 	return res
 }
 
 /* fetch job tasks status */
 func (jm *JobManager) status() MsgJobStatus {
 	var tasksStatus []TaskStatus
-	for range jm.job.Tasks {
+	for range jm.job.Tasks { //append would cause uncertain order
 		tasksStatus = append(tasksStatus, TaskStatus{})
 	}
 
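Note: status() pre-sizes tasksStatus and fills slots by index, because, as the new comment says, appending results as they arrive would make the order depend on response timing. A sketch of the index-stable collection pattern with a hypothetical fetch callback:

// collectByIndex writes each result into its own slot, so the output order
// matches the task order no matter when each fetch completes.
func collectByIndex(n int, fetch func(i int) TaskStatus) []TaskStatus {
	out := make([]TaskStatus, n)
	for i := 0; i < n; i++ {
		out[i] = fetch(i)
	}
	return out
}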
@@ -286,22 +279,23 @@ func (jm *JobManager) status() MsgJobStatus {
 	return MsgJobStatus{Status: tasksStatus}
 }
 
-func (jm *JobManager) stop() MsgStop {
-	jm.killedFlag = true
-	go func() { /* kill at background */
-		for _, taskStatus := range jm.jobStatus.tasks {
-			v := url.Values{}
-			v.Set("id", taskStatus.Id)
-
-			_, err := doRequest("POST", "http://"+taskStatus.Node+":8000/stop", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
-			if err != nil {
-				log.Warn(err.Error())
-				continue
-			}
-		}
-	}()
-
-	jm.scheduler.UpdateProgress(jm.job, Stopped)
-	log.Info("kill job, ", jm.job.Name)
+/* force stop all containers */
+func (jm *JobManager) stop(force bool) MsgStop {
+	for _, taskStatus := range jm.jobStatus.tasks {
+		v := url.Values{}
+		v.Set("id", taskStatus.Id)
+
+		_, err := doRequest("POST", "http://"+taskStatus.Node+":8000/stop", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
+		if err != nil {
+			log.Warn(err.Error())
+			continue
+		}
+	}
+
+	if force {
+		jm.killFlag = true
+		jm.scheduler.UpdateProgress(jm.job, Stopped)
+		log.Info("kill job, ", jm.job.Name)
+	}
 	return MsgStop{Code: 0}
 }
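Note: stop is now synchronous (the old version killed containers in a background goroutine), and only a forced stop sets killFlag and reports Stopped; internal cleanup calls jm.stop(false) and lets the caller decide between Failed and Finished. doRequest itself is defined elsewhere in the repo; judging only from the call sites in this diff its shape is roughly the following, and the meaning of the final string argument is a guess:

// Assumed shape of the repo's doRequest helper; the last parameter looks
// like an optional header value (possibly an auth token), which is a guess.
func doRequest(method string, url string, body io.Reader, contentType string, token string) (*http.Response, error) {
	req, err := http.NewRequest(method, url, body)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", contentType)
	if token != "" {
		req.Header.Set("Authorization", token)
	}
	return http.DefaultClient.Do(req)
}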