1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-12-13 07:46:43 +00:00
This commit is contained in:
2020-07-21 21:41:11 +08:00
parent eb4e60a97a
commit 731f173503
18 changed files with 388 additions and 438 deletions

View File

@@ -52,8 +52,12 @@ func InstanceOfConfiguration() *Configuration {
EnablePreScheduleRatio: 1.5,
PreScheduleExtraTime: 15,
}
}
return configurationInstance
}
/* override conf value from env */
/* read conf value from env */
func (config *Configuration) InitFromEnv() {
value := os.Getenv("KafkaBrokers")
if len(value) != 0 {
configurationInstance.KafkaBrokers = strings.Split(value, ",")
@@ -112,8 +116,6 @@ func InstanceOfConfiguration() *Configuration {
configurationInstance.PreScheduleTimeout = val
}
}
}
return configurationInstance
}
func (config *Configuration) SetMockEnabled(enabled bool) bool {

View File

@@ -64,7 +64,7 @@ func TestGA(t *testing.T) {
allocation = InstanceOfAllocator().fastBestFit(nodes, tasks)
InstanceOfResourcePool().init(Configuration{})
InstanceOfResourcePool().Start()
allocatedNodes := InstanceOfResourcePool().acquireResource(Job{Tasks: tasks})
log.Info(allocatedNodes)
}

View File

@@ -21,7 +21,7 @@ func InstanceOfGroupManager() *GroupManager {
return groupManagerInstance
}
func (gm *GroupManager) init(conf Configuration) {
func (gm *GroupManager) Start() {
}

View File

@@ -27,7 +27,7 @@ func InstanceJobHistoryLogger() *JobHistoryLogger {
return jobHistoryLoggerInstance
}
func (jhl *JobHistoryLogger) init(conf Configuration) {
func (jhl *JobHistoryLogger) Start() {
log.Info("jhl init")
jhl.jobs = map[string]Job{}
jhl.tasks = map[string][]TaskStatus{}

View File

@@ -12,77 +12,85 @@ import (
)
type JobManager struct {
/* meta */
scheduler Scheduler
job Job
jobStatus JobStatus
resources []NodeStatus
resourcesMu sync.Mutex
isRunning bool
killFlag bool
/* resource */
network string
resources map[string]NodeStatus
resourcesMu sync.Mutex
/* status */
jobStatus JobStatus
isRunning bool
lastHeartBeat int64
/* history info */
stats [][]TaskStatus
}
func (jm *JobManager) start() {
log.Info("start job ", jm.job.Name, " at ", time.Now())
jm.isRunning = false
jm.killFlag = false
jm.isRunning = true
jm.lastHeartBeat = time.Now().Unix()
jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}}
jm.resources = map[string]NodeStatus{}
/* register in JHL */
InstanceJobHistoryLogger().submitJob(jm.job)
/* request for private network */
jm.network = InstanceOfResourcePool().acquireNetwork()
/* request for resources */
jm.resourcesMu.Lock()
jm.network = InstanceOfResourcePool().acquireNetwork()
for {
if jm.killFlag {
if !jm.isRunning {
break
}
jm.resources = jm.scheduler.AcquireResource(jm.job)
if len(jm.resources) > 0 {
resources := jm.scheduler.AcquireResource(jm.job)
if len(resources) > 0 {
for i, node := range resources {
jm.resources[jm.job.Tasks[i].Name] = node
}
log.Info(jm.job.Name, " receive resource", jm.resources)
break
}
/* sleep random Millisecond to avoid deadlock */
time.Sleep(time.Millisecond * time.Duration(500+rand.Intn(500)))
}
jm.job.StartedAt = time.Now().Unix()
jm.resourcesMu.Unlock()
if InstanceOfConfiguration().mock {
if jm.isRunning {
jm.scheduler.UpdateProgress(jm.job, Running)
jm.job.Status = Running
jm.isRunning = false
duration := InstanceOfMocker().GetDuration(jm.job, jm.resources)
log.Info("mock ", jm.job.Name, ", wait ", duration)
time.Sleep(time.Second * time.Duration(duration))
jm.returnResource([]TaskStatus{})
jm.isRunning = false
jm.scheduler.UpdateProgress(jm.job, Finished)
jm.job.Status = Finished
}
jm.returnResource()
log.Info("JobMaster exited ", jm.job.Name)
return
}
isShare := false
isScheduleAhead := false
if !jm.killFlag {
if jm.isRunning {
/* switch to Running state */
jm.scheduler.UpdateProgress(jm.job, Running)
jm.job.Status = Running
/* bring up containers */
wg := sync.WaitGroup{}
for i := range jm.job.Tasks {
success := true
for i, task := range jm.job.Tasks {
wg.Add(1)
go func(index int) {
go func(task Task, node NodeStatus) {
defer wg.Done()
var UUIDs []string
shouldWait := "0"
for _, GPU := range jm.resources[index].Status {
for _, GPU := range node.Status {
UUIDs = append(UUIDs, GPU.UUID)
if GPU.MemoryUsed == GPU.MemoryTotal {
shouldWait = "1"
@@ -96,40 +104,44 @@ func (jm *JobManager) start() {
GPUs := strings.Join(UUIDs, ",")
v := url.Values{}
v.Set("image", jm.job.Tasks[index].Image)
v.Set("cmd", jm.job.Tasks[index].Cmd)
v.Set("name", jm.job.Tasks[index].Name)
v.Set("image", task.Image)
v.Set("cmd", task.Cmd)
v.Set("name", task.Name)
v.Set("workspace", jm.job.Workspace)
v.Set("gpus", GPUs)
v.Set("mem_limit", strconv.Itoa(jm.job.Tasks[index].Memory)+"m")
v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[index].NumberCPU))
v.Set("mem_limit", strconv.Itoa(task.Memory)+"m")
v.Set("cpu_limit", strconv.Itoa(task.NumberCPU))
v.Set("network", jm.network)
v.Set("should_wait", shouldWait)
v.Set("output_dir", "/tmp/")
v.Set("hdfs_address", InstanceOfConfiguration().HDFSAddress)
v.Set("hdfs_dir", InstanceOfConfiguration().HDFSBaseDir+jm.job.Name)
v.Set("gpu_mem", strconv.Itoa(jm.job.Tasks[index].MemoryGPU))
v.Set("gpu_mem", strconv.Itoa(task.MemoryGPU))
if InstanceOfConfiguration().DFSBaseDir != "" {
v.Set("dfs_src", InstanceOfConfiguration().DFSBaseDir+jm.job.Name+"/task-"+strconv.Itoa(index))
v.Set("dfs_src", InstanceOfConfiguration().DFSBaseDir+jm.job.Name+"/task-"+task.Name)
} else {
v.Set("dfs_src", "")
}
v.Set("dfs_dst", "/tmp")
resp, err := doRequest("POST", "http://"+jm.resources[index].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
spider := Spider{}
spider.Method = "POST"
spider.URL = "http://" + node.ClientHost + ":8000/create"
spider.Data = v
spider.ContentType = "application/x-www-form-urlencoded"
err := spider.do()
if err != nil {
log.Warn(err.Error())
jm.job.Status = Failed
jm.stop(false)
success = false
return
}
resp := spider.getResponse()
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
log.Warn(err)
jm.job.Status = Failed
jm.stop(false)
success = false
return
}
@@ -137,28 +149,36 @@ func (jm *JobManager) start() {
err = json.Unmarshal([]byte(string(body)), &res)
if err != nil || res.Code != 0 {
log.Warn(res)
jm.job.Status = Failed
jm.stop(false)
success = false
return
}
jm.jobStatus.tasks[jm.job.Tasks[index].Name] = TaskStatus{Id: res.Id, Node: jm.resources[index].ClientHost, HostName: jm.job.Tasks[i].Name}
}(i)
taskStatus := TaskStatus{Id: res.Id, Node: node.ClientHost, HostName: jm.job.Tasks[i].Name}
jm.jobStatus.tasks[task.Name] = taskStatus
}(task, jm.resources[task.Name])
}
wg.Wait()
jm.isRunning = true
/* start failed */
if !success {
jm.isRunning = false
jm.scheduler.UpdateProgress(jm.job, Failed)
jm.stop()
}
}
/* monitor job execution */
for {
//jm.status()
if !jm.isRunning || jm.killFlag {
if !jm.isRunning {
break
}
if time.Now().Unix()-jm.lastHeartBeat > 30 {
log.Warn(jm.job.Name, " heartbeat longer tha 30s")
}
time.Sleep(time.Second * 1)
}
/* make sure resources are released */
jm.returnResource(jm.status().Status)
/* release again to make sure resources are released */
jm.returnResource()
/* feed data to optimizer */
isExclusive := InstanceOfResourcePool().isExclusive(jm.job.Name)
@@ -197,47 +217,50 @@ func (jm *JobManager) start() {
if len(jm.job.Tasks) == 1 && !isShare && !isScheduleAhead && jm.job.Status == Finished && isExclusive {
InstanceOfOptimizer().FeedTime(jm.job, stats)
}
/* clear, to reduce memory usage */
jm.stats = [][]TaskStatus{}
/* remove exited containers */
//for _, task := range jm.jobStatus.tasks {
// go func(container TaskStatus) {
// v := url.Values{}
// v.Set("id", container.Id)
//
// spider := Spider{}
// spider.Method = "POST"
// spider.URL = "http://" + container.Node + ":8000/remove"
// spider.Data = v
// spider.ContentType = "application/x-www-form-urlencoded"
// err := spider.do()
// if err != nil {
// log.Warn(err.Error())
// }
// }(task)
//}
log.Info("JobMaster exited ", jm.job.Name)
}
/* release all resource */
func (jm *JobManager) returnResource(status []TaskStatus) {
func (jm *JobManager) returnResource() {
jm.resourcesMu.Lock()
defer jm.resourcesMu.Unlock()
/* return resource */
for i := range jm.resources {
if jm.resources[i].ClientID == "_released_" {
continue
}
jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
log.Info("return resource again ", jm.resources[i].ClientID)
jm.resources[i].ClientID = "_released_"
for _, t := range jm.resources[i].Status {
InstanceOfResourcePool().detach(t.UUID, jm.job)
}
if !InstanceOfConfiguration().mock {
InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i])
}
/* remove exited containers */
//v := url.Values{}
//v.Set("id", res.Status[i].Id)
//
//_, err := doRequest("POST", "http://"+res.Status[i].Node+":8000/remove", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
//if err != nil {
// log.Warn(err.Error())
// continue
//}
}
jm.resources = map[string]NodeStatus{}
if jm.network != "" {
InstanceOfResourcePool().releaseNetwork(jm.network)
jm.network = ""
}
}
/* monitor all tasks */
/* monitor all tasks, update job status */
func (jm *JobManager) checkStatus(status []TaskStatus) {
if !jm.isRunning {
return
@@ -245,77 +268,57 @@ func (jm *JobManager) checkStatus(status []TaskStatus) {
flagRunning := false
onlyPS := true
for i := range status {
if status[i].Status == "ready" {
log.Debug(jm.job.Name, "-", i, " is ready to run")
flagRunning = true
if !jm.job.Tasks[i].IsPS {
onlyPS = false
}
} else if status[i].Status == "running" {
log.Debug(jm.job.Name, "-", i, " is running")
if status[i].Status == "ready" || status[i].Status == "running" {
flagRunning = true
if !jm.job.Tasks[i].IsPS {
onlyPS = false
}
InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i])
} else if status[i].Status == "unknown" {
log.Warn(jm.job.Name, "-", i, " is unknown")
flagRunning = true
if !jm.job.Tasks[i].IsPS {
onlyPS = false
}
//InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i])
} else {
jm.resourcesMu.Lock()
if jm.resources[i].ClientID == "_released_" {
jm.resourcesMu.Unlock()
continue
}
log.Info(jm.job.Name, "-", i, " ", status[i].Status)
if exitCode, ok := status[i].State["ExitCode"].(float64); ok && exitCode != 0 && !jm.killFlag {
if exitCode, ok := status[i].State["ExitCode"].(float64); ok && exitCode != 0 && jm.isRunning {
log.Warn(jm.job.Name+"-"+jm.job.Tasks[i].Name+" exited unexpected, exitCode=", exitCode)
jm.stop(false)
jm.killFlag = true
jm.isRunning = false
jm.scheduler.UpdateProgress(jm.job, Failed)
jm.job.Status = Failed
} else if !jm.killFlag {
jm.stop()
} else if jm.isRunning {
log.Info("Some instance exited, close others")
jm.stop(false)
jm.killFlag = true
jm.isRunning = false
jm.scheduler.UpdateProgress(jm.job, Finished)
jm.job.Status = Finished
jm.stop()
}
if jm.resources[i].ClientID != "_released_" {
jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
log.Info("return resource ", jm.resources[i].ClientID)
jm.resources[i].ClientID = "_released_"
jm.resourcesMu.Lock()
nodeID := jm.job.Tasks[i].Name
if _, ok := jm.resources[nodeID]; ok {
jm.scheduler.ReleaseResource(jm.job, jm.resources[nodeID])
log.Info("return resource ", jm.resources[nodeID].ClientID)
for _, t := range jm.resources[i].Status {
for _, t := range jm.resources[nodeID].Status {
InstanceOfResourcePool().detach(t.UUID, jm.job)
}
InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i])
delete(jm.resources, nodeID)
}
jm.resourcesMu.Unlock()
}
}
if flagRunning && onlyPS && !jm.killFlag {
if flagRunning && onlyPS && jm.isRunning {
log.Info("Only PS is running, stop ", jm.job.Name)
jm.stop(false)
jm.killFlag = true
jm.scheduler.UpdateProgress(jm.job, Finished)
jm.job.Status = Finished
}
if !flagRunning && !jm.killFlag {
jm.scheduler.UpdateProgress(jm.job, Finished)
jm.job.Status = Finished
log.Info("finish job ", jm.job.Name)
}
if !flagRunning {
jm.isRunning = false
jm.returnResource(status)
jm.scheduler.UpdateProgress(jm.job, Finished)
jm.stop()
}
if !flagRunning && jm.isRunning {
log.Info("finish job ", jm.job.Name)
jm.isRunning = false
jm.scheduler.UpdateProgress(jm.job, Finished)
}
}
@@ -354,7 +357,8 @@ func (jm *JobManager) logs(taskName string) MsgLog {
/* fetch job tasks status */
func (jm *JobManager) status() MsgJobStatus {
var tasksStatus []TaskStatus
for range jm.job.Tasks { //append would cause uncertain order
/* create slice ahead, since append would cause uncertain order */
for range jm.job.Tasks {
tasksStatus = append(tasksStatus, TaskStatus{})
}
@@ -418,18 +422,32 @@ func (jm *JobManager) status() MsgJobStatus {
return MsgJobStatus{Status: tasksStatus}
}
/* force stop all containers */
func (jm *JobManager) stop(force bool) MsgStop {
func (jm *JobManager) stop() MsgStop {
if jm.isRunning {
jm.isRunning = false
jm.scheduler.UpdateProgress(jm.job, Stopped)
log.Info("kill job, ", jm.job.Name)
}
for _, taskStatus := range jm.jobStatus.tasks {
/* stop at background */
go func(task TaskStatus) {
log.Info("kill ", jm.job.Name, "-", task.Id, " :", task.HostName)
v := url.Values{}
v.Set("id", task.Id)
resp, err := doRequest("POST", "http://"+task.Node+":8000/stop", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
spider := Spider{}
spider.Method = "POST"
spider.URL = "http://" + task.Node + ":8000/stop"
spider.Data = v
spider.ContentType = "application/x-www-form-urlencoded"
err := spider.do()
if err != nil {
log.Warn(err.Error())
return
}
resp := spider.getResponse()
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
@@ -445,17 +463,7 @@ func (jm *JobManager) stop(force bool) MsgStop {
if res.Code != 0 {
log.Warn(res.Error)
}
log.Info(jm.job.Name, ":", task.HostName, " is killed:", task.Id)
}(taskStatus)
}
go func() {
if force {
jm.killFlag = true
jm.scheduler.UpdateProgress(jm.job, Stopped)
jm.job.Status = Stopped
log.Info("kill job, ", jm.job.Name)
}
}()
return MsgStop{Code: 0}
}

View File

@@ -5,7 +5,6 @@ import (
"io"
"sync"
"runtime"
"fmt"
)
type Logger struct {
@@ -19,29 +18,59 @@ func (logger *Logger) Init() {
logger.LoggerModuleDisabled = map[string]bool{}
}
func (logger *Logger) Debug(args ... interface{}) {
_log.Debug(args)
}
func (logger *Logger) Info(args ... interface{}) {
func (logger *Logger) Debug(args ...interface{}) {
pc, _, _, ok := runtime.Caller(1)
details := runtime.FuncForPC(pc)
module := "unknown"
if ok && details != nil {
fmt.Printf("called from %s\n", details.Name())
module = details.Name()
}
_log.Info(args)
args = append(args, module)
_log.Debug(args...)
}
func (logger *Logger) Warn(args ... interface{}) {
_log.Warn(args)
func (logger *Logger) Info(args ...interface{}) {
pc, _, _, ok := runtime.Caller(1)
details := runtime.FuncForPC(pc)
module := "unknown"
if ok && details != nil {
module = details.Name()
}
args = append(args, module)
_log.Info(args...)
}
func (logger *Logger) Fatal(args ... interface{}) {
_log.Fatal(args)
func (logger *Logger) Warn(args ...interface{}) {
pc, _, _, ok := runtime.Caller(1)
details := runtime.FuncForPC(pc)
module := "unknown"
if ok && details != nil {
module = details.Name()
}
args = append(args, module)
_log.Warn(args...)
}
func (logger *Logger) Fatalf(format string, args ... interface{}) {
_log.Fatalf(format, args)
func (logger *Logger) Fatal(args ...interface{}) {
pc, _, _, ok := runtime.Caller(1)
details := runtime.FuncForPC(pc)
module := "unknown"
if ok && details != nil {
module = details.Name()
}
args = append(args, module)
_log.Fatal(args...)
}
func (logger *Logger) Fatalf(format string, args ...interface{}) {
pc, _, _, ok := runtime.Caller(1)
details := runtime.FuncForPC(pc)
module := "unknown"
if ok && details != nil {
module = details.Name()
}
args = append(args, module)
_log.Fatalf(format, args...)
}
func (logger *Logger) SetOutput(f io.Writer) {

View File

@@ -7,6 +7,7 @@ import (
"strconv"
"math/rand"
"os"
"fmt"
)
var log Logger
@@ -15,6 +16,7 @@ var scheduler Scheduler
func serverAPI(w http.ResponseWriter, r *http.Request) {
switch r.URL.Query().Get("action") {
/* resource pool */
case "agent_report":
log.Debug("agent_report")
msgAgentReport := MsgAgentReport{Code: 0}
@@ -50,6 +52,28 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
w.Write(js)
break
case "pool_status_history":
log.Debug("pool_status_history")
js, _ := json.Marshal(InstanceOfResourcePool().statusHistory())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "get_counter":
log.Debug("get_counters")
js, _ := json.Marshal(InstanceOfResourcePool().getCounter())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "debug_pool_dump":
log.Debug("debug_pool_dump")
js, _ := json.Marshal(InstanceOfResourcePool().Dump())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
/* scheduler */
case "job_submit":
var job Job
log.Debug("job_submit")
@@ -95,6 +119,42 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
w.Write(js)
break
case "job_stop":
log.Debug("job_stop")
js, _ := json.Marshal(scheduler.Stop(string(r.PostFormValue("id"))))
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "task_logs":
log.Debug("task_logs")
js, _ := json.Marshal(scheduler.QueryLogs(r.URL.Query().Get("job"), r.URL.Query().Get("task")))
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "jobs":
log.Debug("job_list")
js, _ := json.Marshal(scheduler.ListJobs())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "debug_scheduler_dump":
log.Debug("debug_scheduler_dump")
js, _ := json.Marshal(scheduler.DebugDump())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "summary":
log.Debug("summary")
js, _ := json.Marshal(scheduler.Summary())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
/* optimizer */
case "job_predict_req":
log.Debug("job_predict_req")
var job Job
@@ -143,48 +203,15 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
w.Write(js)
break
case "job_stop":
log.Debug("job_stop")
js, _ := json.Marshal(scheduler.Stop(string(r.PostFormValue("id"))))
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "task_logs":
log.Debug("task_logs")
js, _ := json.Marshal(scheduler.QueryLogs(r.URL.Query().Get("job"), r.URL.Query().Get("task")))
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "jobs":
log.Debug("job_list")
js, _ := json.Marshal(scheduler.ListJobs())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "summary":
log.Debug("summary")
js, _ := json.Marshal(scheduler.Summary())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "pool_status_history":
log.Debug("pool_status_history")
js, _ := json.Marshal(InstanceOfResourcePool().statusHistory())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "get_counter":
log.Debug("get_counters")
js, _ := json.Marshal(InstanceOfResourcePool().getCounter())
/* job history logger */
case "jhl_job_status":
log.Debug("jhl_job_status")
js, _ := json.Marshal(InstanceJobHistoryLogger().getTaskStatus(r.URL.Query().Get("job")))
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
/* group */
case "group_list":
log.Debug("group_list")
js, _ := json.Marshal(InstanceOfGroupManager().List())
@@ -227,7 +254,6 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
break
case "group_remove":
/* TODO: rearrange jobs to other queues */
log.Debug("group_remove")
var group Group
msg := MsgGroupCreate{Code: 0}
@@ -244,27 +270,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
w.Write(js)
break
case "jhl_job_status":
log.Debug("jhl_job_status")
js, _ := json.Marshal(InstanceJobHistoryLogger().getTaskStatus(r.URL.Query().Get("job")))
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "debug_scheduler_dump":
log.Debug("debug_scheduler_dump")
js, _ := json.Marshal(scheduler.DebugDump())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "debug_pool_dump":
log.Debug("debug_pool_dump")
js, _ := json.Marshal(InstanceOfResourcePool().Dump())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
/* configuration */
case "conf_list":
log.Debug("conf_list")
var msg MsgConfList
@@ -308,8 +314,9 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
break
case "pool.batch.interval":
interval := str2int(value, 1)
if interval, err := strconv.Atoi(value); err == nil {
ok = InstanceOfResourcePool().SetBatchInterval(interval)
}
break
/* scheduler.mock */
@@ -324,8 +331,9 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
/* scheduler.parallelism */
case "scheduler.parallelism":
parallelism, _ := strconv.Atoi(value)
if parallelism, err := strconv.Atoi(value); err == nil {
ok = scheduler.UpdateParallelism(parallelism)
}
break
/* allocator.strategy */
@@ -333,6 +341,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
ok = InstanceOfAllocator().updateStrategy(value)
break
/* logger */
case "logger.level":
ok = log.SetLoggerLevel(value)
break
@@ -350,7 +359,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
msg.Code = 0
if !ok {
msg.Code = 1
msg.Error = "Option not exist or invalid value"
msg.Error = fmt.Sprintf("Option (%s) not exist or invalid value (%s)", option, value)
}
js, _ := json.Marshal(msg)
w.Header().Set("Content-Type", "application/json")
@@ -381,15 +390,14 @@ func main() {
log.SetOutput(f)
}
/* parse configuration */
config := *InstanceOfConfiguration()
config := InstanceOfConfiguration()
config.InitFromEnv()
/* init components */
InstanceOfResourcePool().init(config)
//InstanceOfCollector().init(config)
InstanceJobHistoryLogger().init(config)
InstanceOfOptimizer().Init(config)
InstanceOfGroupManager().init(config)
InstanceOfResourcePool().Start()
InstanceJobHistoryLogger().Start()
InstanceOfOptimizer().Start()
InstanceOfGroupManager().Start()
switch config.SchedulerPolicy {
case "FCFS":

View File

@@ -23,7 +23,7 @@ func InstanceOfMocker() *Mocker {
return MockerInstance
}
func (mocker *Mocker) GetDuration(job Job, nodes []NodeStatus) int {
func (mocker *Mocker) GetDuration(job Job, nodes map[string]NodeStatus) int {
str := strings.Split(job.Name, "-")
duration := 300
@@ -37,11 +37,11 @@ func (mocker *Mocker) GetDuration(job Job, nodes []NodeStatus) int {
} else if len(job.Tasks) == 3 {
var psNodes []string
var workerNodes []string
for i, task := range job.Tasks {
for _, task := range job.Tasks {
if task.IsPS {
psNodes = append(psNodes, nodes[i].ClientHost)
psNodes = append(psNodes, nodes[task.Name].ClientHost)
} else {
workerNodes = append(workerNodes, nodes[i].ClientHost)
workerNodes = append(workerNodes, nodes[task.Name].ClientHost)
}
}
if psNodes[0] == workerNodes[0] {

View File

@@ -50,7 +50,7 @@ func InstanceOfOptimizer() *Optimizer {
return optimizerInstance
}
func (optimizer *Optimizer) Init(conf Configuration) {
func (optimizer *Optimizer) Start() {
log.Info("optimizer started")
}

View File

@@ -8,7 +8,7 @@ import (
func TestPool(t *testing.T) {
return
InstanceOfResourcePool().init(Configuration{})
InstanceOfResourcePool().Start()
for j := 0; j < 100; j++ {
for i := 0; i < 1000; i++ {
@@ -36,7 +36,7 @@ func TestPool(t *testing.T) {
}
func TestAllocate(t *testing.T) {
InstanceOfResourcePool().init(Configuration{})
InstanceOfResourcePool().Start()
job := Job{Name: strconv.Itoa(int(time.Now().Unix() % 1000000000))}
job.Group = "default"

View File

@@ -4,7 +4,6 @@ import (
"sync"
"time"
"net/url"
"strings"
"math/rand"
"strconv"
"sort"
@@ -51,11 +50,11 @@ type ResourcePool struct {
exclusiveJobs map[string]bool
TotalGPU int
TotalGPUMu sync.Mutex
TotalCPU int
TotalMemory int
TotalMu sync.Mutex
UsingGPU int
UsingGPUMu sync.Mutex
UsingMu sync.Mutex
enableBatch bool
batchJobs map[string]Job
@@ -64,7 +63,7 @@ type ResourcePool struct {
batchInterval int
}
func (pool *ResourcePool) init(conf Configuration) {
func (pool *ResourcePool) Start() {
log.Info("RM started ")
pool.networks = map[string]bool{}
@@ -181,13 +180,13 @@ func (pool *ResourcePool) checkDeadNodes() {
}
seg.Lock.Lock()
pool.TotalGPUMu.Lock()
pool.TotalMu.Lock()
if _, ok := seg.Nodes[k]; ok {
pool.TotalGPU -= len(seg.Nodes[k].Status)
pool.TotalCPU -= seg.Nodes[k].NumCPU
pool.TotalMemory -= seg.Nodes[k].MemTotal
}
pool.TotalGPUMu.Unlock()
pool.TotalMu.Unlock()
delete(seg.Nodes, k)
seg.Lock.Unlock()
pool.versionsMu.Lock()
@@ -297,11 +296,11 @@ func (pool *ResourcePool) saveStatusHistory() {
}
pool.historyMu.Unlock()
pool.TotalGPUMu.Lock()
pool.TotalMu.Lock()
pool.TotalGPU = TotalGPU
pool.TotalCPU = TotalCPU
pool.TotalMemory = TotalMemGPU
pool.TotalGPUMu.Unlock()
pool.TotalMu.Unlock()
time.Sleep(time.Second * 60)
}
}
@@ -359,11 +358,11 @@ func (pool *ResourcePool) update(node NodeStatus) {
}
} else {
/* TODO: double check node do belong to this seg */
pool.TotalGPUMu.Lock()
pool.TotalMu.Lock()
pool.TotalGPU += len(node.Status)
pool.TotalCPU += node.NumCPU
pool.TotalMemory += node.MemTotal
pool.TotalGPUMu.Unlock()
pool.TotalMu.Unlock()
log.Info("node ", node.ClientID, " is online")
}
seg.Nodes[node.ClientID] = &node
@@ -517,11 +516,17 @@ func (pool *ResourcePool) acquireNetwork() string {
}
v := url.Values{}
v.Set("name", network)
resp, err := doRequest("POST", "http://yao-agent-master:8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
spider := Spider{}
spider.Method = "POST"
spider.URL = "http://yao-agent-master:8000/create"
spider.Data = v
spider.ContentType = "application/x-www-form-urlencoded"
err := spider.do()
if err != nil {
log.Warn(err.Error())
continue
}
resp := spider.getResponse()
resp.Body.Close()
pool.networksFree[network] = true
pool.networks[network] = true
@@ -580,86 +585,6 @@ func (pool *ResourcePool) countGPU() (int, int) {
return pool.TotalGPU - pool.UsingGPU, pool.UsingGPU
}
func (pool *ResourcePool) pickNode(candidates []*NodeStatus, availableGPUs map[string][]GPUStatus, task Task, job Job, nodes []NodeStatus) *NodeStatus {
/* shuffle */
r := rand.New(rand.NewSource(time.Now().Unix()))
for n := len(candidates); n > 0; n-- {
randIndex := r.Intn(n)
candidates[n-1], candidates[randIndex] = candidates[randIndex], candidates[n-1]
}
/* sort */
// single node, single GPU
sort.Slice(candidates, func(a, b int) bool {
diffA := pool.GPUModelToPower(candidates[a].Status[0].ProductName) - pool.GPUModelToPower(task.ModelGPU)
diffB := pool.GPUModelToPower(candidates[b].Status[0].ProductName) - pool.GPUModelToPower(task.ModelGPU)
if diffA > 0 && diffB >= 0 && diffA > diffB {
return false //b
}
if diffA < 0 && diffB < 0 && diffA > diffB {
return false
}
if diffA < 0 && diffB >= 0 {
return false
}
if diffA == diffB {
if len(availableGPUs[candidates[a].ClientID]) == len(availableGPUs[candidates[b].ClientID]) {
return candidates[a].UtilCPU > candidates[b].UtilCPU
}
return len(availableGPUs[candidates[a].ClientID]) < len(availableGPUs[candidates[b].ClientID])
}
return true //a
})
var t []*NodeStatus
bestGPU := candidates[0].Status[0].ProductName
for _, node := range candidates {
if node.Status[0].ProductName != bestGPU {
break
}
t = append(t, node)
}
candidates = t
if (len(job.Tasks) == 1) && task.NumberGPU > 1 { //single node, multi GPUs
sort.Slice(candidates, func(a, b int) bool {
if len(availableGPUs[candidates[a].ClientID]) == len(availableGPUs[candidates[b].ClientID]) {
return candidates[a].UtilCPU > candidates[b].UtilCPU
}
return len(availableGPUs[candidates[a].ClientID]) < len(availableGPUs[candidates[b].ClientID])
})
}
if len(job.Tasks) > 1 { //multi nodes, multi GPUs
sort.Slice(candidates, func(a, b int) bool {
distanceA := 0
distanceB := 0
for _, node := range nodes {
if node.Rack != candidates[a].Rack {
distanceA += 10
}
if node.ClientID != candidates[a].ClientID {
distanceA += 1
}
if node.Rack != candidates[b].Rack {
distanceB += 10
}
if node.ClientID != candidates[b].ClientID {
distanceB += 1
}
}
if distanceA == distanceB {
return len(availableGPUs[candidates[a].ClientID]) > len(availableGPUs[candidates[b].ClientID])
}
return distanceA*job.Locality < distanceB*job.Locality
})
}
return candidates[0]
}
func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
for i := range job.Tasks {
job.Tasks[i].Job = job.Name
@@ -671,6 +596,7 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
pool.batchJobs[job.Name] = job
pool.batchMu.Unlock()
for {
/* wait until request is satisfied */
pool.batchMu.Lock()
if _, ok := pool.batchAllocations[job.Name]; ok {
pool.batchMu.Unlock()
@@ -785,9 +711,9 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
for j := range node.Status {
if res.Status[i].UUID == node.Status[j].UUID {
if node.Status[j].MemoryAllocated == 0 {
pool.UsingGPUMu.Lock()
pool.UsingMu.Lock()
pool.UsingGPU ++
pool.UsingGPUMu.Unlock()
pool.UsingMu.Unlock()
}
node.Status[j].MemoryAllocated += task.MemoryGPU
res.Status[i].MemoryTotal = task.MemoryGPU
@@ -895,9 +821,9 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
for j := range node.Status {
if res.Status[i].UUID == node.Status[j].UUID {
if node.Status[j].MemoryAllocated == 0 {
pool.UsingGPUMu.Lock()
pool.UsingMu.Lock()
pool.UsingGPU ++
pool.UsingGPUMu.Unlock()
pool.UsingMu.Unlock()
}
node.Status[j].MemoryAllocated += task.MemoryGPU
res.Status[i].MemoryTotal = task.MemoryGPU
@@ -989,9 +915,9 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
for j := range node.Status {
if res.Status[i].UUID == node.Status[j].UUID {
if node.Status[j].MemoryAllocated == 0 {
pool.UsingGPUMu.Lock()
pool.UsingMu.Lock()
pool.UsingGPU ++
pool.UsingGPUMu.Unlock()
pool.UsingMu.Unlock()
}
node.Status[j].MemoryAllocated += task.MemoryGPU
res.Status[i].MemoryTotal = task.MemoryGPU
@@ -1040,6 +966,11 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
return ress
}
/*
TODO:
bug-1: node is offline, unable to retrieve allocation info
bug-2: when node offline & back, allocation info is lost
*/
func (pool *ResourcePool) releaseResource(job Job, agent NodeStatus) {
segID := pool.getNodePool(agent.ClientID)
seg := pool.pools[segID]
@@ -1052,7 +983,7 @@ func (pool *ResourcePool) releaseResource(job Job, agent NodeStatus) {
node, ok := seg.Nodes[agent.ClientID]
/* in case node is offline */
if !ok {
/* TODO, update usingTotalGPU correctly */
/* bug-1 */
log.Warn("node ", agent.ClientID, " not present")
return
}
@@ -1060,19 +991,18 @@ func (pool *ResourcePool) releaseResource(job Job, agent NodeStatus) {
for j := range node.Status {
if gpu.UUID == node.Status[j].UUID {
node.Status[j].MemoryAllocated -= gpu.MemoryTotal
log.Debug(node.Status[j].MemoryAllocated)
if node.Status[j].MemoryAllocated < 0 {
// in case of error
/* Case 0: a node is offline and then online, the allocation info will be lost */
/* bug-2: a node is offline and then online, the allocation info will be lost */
log.Warn(node.ClientID, " UUID=", gpu.UUID, " More Memory Allocated")
node.Status[j].MemoryAllocated = 0
}
if node.Status[j].MemoryAllocated == 0 {
pool.UsingGPUMu.Lock()
pool.UsingMu.Lock()
pool.UsingGPU--
pool.UsingGPUMu.Unlock()
pool.UsingMu.Unlock()
log.Info(node.Status[j].UUID, " is released")
}
//log.Info(node.Status[j].MemoryAllocated)
}
}
}

View File

@@ -25,6 +25,7 @@ type Scheduler interface {
UpdateParallelism(parallelism int) bool
/* TODO: rearrange jobs to other queues */
updateGroup(group Group) bool
DebugDump() map[string]interface{}

View File

@@ -107,7 +107,7 @@ func (scheduler *SchedulerFCFS) Stop(jobName string) MsgStop {
if !ok {
return MsgStop{Code: 1, Error: "Job not exist!"}
}
return jm.stop(true)
return jm.stop()
}
func (scheduler *SchedulerFCFS) QueryLogs(jobName string, taskName string) MsgLog {

View File

@@ -272,7 +272,7 @@ func (scheduler *SchedulerCapacity) Stop(jobName string) MsgStop {
if !ok {
return MsgStop{Code: 1, Error: "Job not exist!"}
}
return jm.stop(true)
return jm.stop()
}
func (scheduler *SchedulerCapacity) QueryLogs(jobName string, taskName string) MsgLog {

View File

@@ -379,6 +379,7 @@ func (scheduler *SchedulerFair) UpdateProgress(job Job, state State) {
if scheduler.history[i].Name == job.Name {
scheduler.history[i].Status = Running
scheduler.history[i].UpdatedAt = int(time.Now().Unix())
scheduler.history[i].StartedAt = time.Now().Unix()
}
}
break
@@ -694,7 +695,7 @@ func (scheduler *SchedulerFair) Stop(jobName string) MsgStop {
jm, ok := scheduler.jobs[jobName]
scheduler.queuesMu.Unlock()
if ok {
return jm.stop(true)
return jm.stop()
} else {
found := false
for queue := range scheduler.queues {
@@ -802,6 +803,9 @@ func (scheduler *SchedulerFair) SetEnabled(enabled bool) bool {
}
func (scheduler *SchedulerFair) UpdateParallelism(parallelism int) bool {
if parallelism < 1 {
parallelism = 1
}
scheduler.parallelism = parallelism
log.Info("parallelism is updated to ", parallelism)
return true

View File

@@ -252,7 +252,7 @@ func (scheduler *SchedulerPriority) Stop(jobName string) MsgStop {
if !ok {
return MsgStop{Code: 1, Error: "Job not exist!"}
}
return jm.stop(true)
return jm.stop()
}
func (scheduler *SchedulerPriority) QueryLogs(jobName string, taskName string) MsgLog {

View File

@@ -25,17 +25,21 @@ func (spider *Spider) do() error {
return err
}
if len(spider.ContentType) > 0 {
if len(spider.ContentType) == 0 {
spider.ContentType = ""
}
req.Header.Set("Content-Type", spider.ContentType)
}
/* set user-agent */
if len(spider.UserAgent) == 0 {
req.Header.Set("User-Agent", getUA())
spider.UserAgent = spider.getUA()
}
req.Header.Set("User-Agent", spider.UserAgent)
if len(spider.Referer) > 0 {
req.Header.Set("Referer", spider.Referer)
if len(spider.Referer) == 0 {
spider.Referer = ""
}
req.Header.Set("Referer", spider.Referer)
spider.Response, err = client.Do(req)
if err != nil {

View File

@@ -2,10 +2,6 @@ package main
import (
"strconv"
"math/rand"
"time"
"io"
"net/http"
)
type Job struct {
@@ -24,6 +20,7 @@ type Job struct {
Locality int `json:"locality"`
Status State `json:"status"`
NumberGPU int `json:"number_GPU"`
Retries int `json:"retries"`
}
type Task struct {
@@ -65,43 +62,10 @@ type ResourceCount struct {
Memory int
}
func str2int(str string, defaultValue int) int {
i, err := strconv.Atoi(str)
func str2int(s string, defaultValue int) int {
i, err := strconv.Atoi(s)
if err == nil {
return i
}
return defaultValue
}
func getUA() string {
rand.Seed(time.Now().Unix())
UAs := []string{
"Mozilla/5.0 (X11; Linux i686; rv:64.0) Gecko/20100101 Firefox/64.0",
"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:64.0) Gecko/20100101 Firefox/64.0",
"Mozilla/5.0 (X11; Linux i586; rv:63.0) Gecko/20100101 Firefox/63.0",
"Mozilla/5.0 (Windows NT 6.2; WOW64; rv:63.0) Gecko/20100101 Firefox/63.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:10.0) Gecko/20100101 Firefox/62.0",
"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10.13; ko; rv:1.9.1b2) Gecko/20081201 Firefox/60.0",
"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) Gecko/20100101 Firefox/58.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML like Gecko) Chrome/51.0.2704.79 Safari/537.36 Edge/14.14931",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.111 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_2) AppleWebKit/601.3.9 (KHTML, like Gecko) Version/9.0.2 Safari/601.3.9",
}
return UAs[rand.Intn(len(UAs))]
}
func doRequest(method string, url string, r io.Reader, contentType string, referer string) (*http.Response, error) {
client := &http.Client{}
req, err := http.NewRequest(method, url, r)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", contentType)
req.Header.Set("User-Agent", getUA())
req.Header.Set("Referer", referer)
resp, err := client.Do(req)
return resp, err
}