diff --git a/src/configuration.go b/src/configuration.go index 6117c82..443643a 100644 --- a/src/configuration.go +++ b/src/configuration.go @@ -17,7 +17,10 @@ type Configuration struct { HDFSBaseDir string `json:"HDFSBaseDir"` DFSBaseDir string `json:"DFSBaseDir"` EnableShareRatio float64 `json:"EnableShareRatio"` + ShareMaxUtilization float64 `json:"ShareMaxUtilization"` EnablePreScheduleRatio float64 `json:"EnablePreScheduleRatio"` + PreScheduleExtraTime int `json:"PreScheduleExtraTime"` /* seconds to schedule ahead except pre+post */ + PreScheduleTimeout int `json:"PreScheduleTimeout"` mock bool mu sync.Mutex @@ -46,7 +49,9 @@ func InstanceOfConfiguration() *Configuration { HDFSBaseDir: "/user/root/", DFSBaseDir: "", EnableShareRatio: 1.5, + ShareMaxUtilization: 1.3, // more than 1.0 to expect more improvement EnablePreScheduleRatio: 1.5, + PreScheduleExtraTime: 15, } /* override conf value from env */ @@ -84,12 +89,30 @@ func InstanceOfConfiguration() *Configuration { configurationInstance.EnableShareRatio = val } } + value = os.Getenv("ShareMaxUtilization") + if len(value) != 0 { + if val, err := strconv.ParseFloat(value, 32); err == nil { + configurationInstance.ShareMaxUtilization = val + } + } value = os.Getenv("EnablePreScheduleRatio") if len(value) != 0 { if val, err := strconv.ParseFloat(value, 32); err == nil { configurationInstance.EnablePreScheduleRatio = val } } + value = os.Getenv("PreScheduleExtraTime") + if len(value) != 0 { + if val, err := strconv.Atoi(value); err == nil { + configurationInstance.PreScheduleExtraTime = val + } + } + value = os.Getenv("PreScheduleTimeout") + if len(value) != 0 { + if val, err := strconv.Atoi(value); err == nil { + configurationInstance.PreScheduleTimeout = val + } + } } return configurationInstance } @@ -133,6 +156,9 @@ func (config *Configuration) Dump() map[string]interface{} { res["HDFSBaseDir"] = config.HDFSBaseDir res["DFSBaseDir"] = config.DFSBaseDir res["EnableShareRatio"] = config.EnableShareRatio + res["ShareMaxUtilization"] = config.ShareMaxUtilization res["EnablePreScheduleRatio"] = config.EnablePreScheduleRatio + res["PreScheduleExtraTime"] = config.PreScheduleExtraTime + res["PreScheduleTimeout"] = config.PreScheduleTimeout return res } diff --git a/src/job_manager.go b/src/job_manager.go index 0a5e869..f8909a7 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -120,6 +120,8 @@ func (jm *JobManager) start() { resp, err := doRequest("POST", "http://"+jm.resources[index].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") if err != nil { log.Warn(err.Error()) + jm.job.Status = Failed + jm.stop(false) return } @@ -127,6 +129,8 @@ func (jm *JobManager) start() { resp.Body.Close() if err != nil { log.Warn(err) + jm.job.Status = Failed + jm.stop(false) return } @@ -134,6 +138,8 @@ func (jm *JobManager) start() { err = json.Unmarshal([]byte(string(body)), &res) if err != nil || res.Code != 0 { log.Warn(res) + jm.job.Status = Failed + jm.stop(false) return } jm.jobStatus.tasks[jm.job.Tasks[index].Name] = TaskStatus{Id: res.Id, Node: jm.resources[index].ClientHost, HostName: jm.job.Tasks[i].Name} diff --git a/src/resource_pool.go b/src/resource_pool.go index 71b72d7..4073f74 100644 --- a/src/resource_pool.go +++ b/src/resource_pool.go @@ -761,7 +761,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus { utilT := InstanceOfOptimizer().PredictReq(job, "Worker").UtilGPU totalUtil += utilT } - if totalUtil < 100 { + if totalUtil < int(InstanceOfConfiguration().ShareMaxUtilization*100) { available = append(available, status) } } @@ -872,7 +872,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus { for _, jobT := range jobs { est := InstanceOfOptimizer().PredictTime(jobT) now := time.Now().Unix() - if int(now-jobT.StartedAt) > est.Total-est.Post-estimate.Pre-15 { + if int(now-jobT.StartedAt) > est.Total-est.Post-estimate.Pre-InstanceOfConfiguration().PreScheduleExtraTime { available = append(available, status) } }