mirror of
https://github.com/newnius/YAO-scheduler.git
synced 2025-06-06 22:01:55 +00:00
update, move conf to configuration for better tuning
This commit is contained in:
parent
b1395ba2ed
commit
04232a34e5
@ -17,7 +17,10 @@ type Configuration struct {
|
|||||||
HDFSBaseDir string `json:"HDFSBaseDir"`
|
HDFSBaseDir string `json:"HDFSBaseDir"`
|
||||||
DFSBaseDir string `json:"DFSBaseDir"`
|
DFSBaseDir string `json:"DFSBaseDir"`
|
||||||
EnableShareRatio float64 `json:"EnableShareRatio"`
|
EnableShareRatio float64 `json:"EnableShareRatio"`
|
||||||
|
ShareMaxUtilization float64 `json:"ShareMaxUtilization"`
|
||||||
EnablePreScheduleRatio float64 `json:"EnablePreScheduleRatio"`
|
EnablePreScheduleRatio float64 `json:"EnablePreScheduleRatio"`
|
||||||
|
PreScheduleExtraTime int `json:"PreScheduleExtraTime"` /* seconds to schedule ahead except pre+post */
|
||||||
|
PreScheduleTimeout int `json:"PreScheduleTimeout"`
|
||||||
|
|
||||||
mock bool
|
mock bool
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
@ -46,7 +49,9 @@ func InstanceOfConfiguration() *Configuration {
|
|||||||
HDFSBaseDir: "/user/root/",
|
HDFSBaseDir: "/user/root/",
|
||||||
DFSBaseDir: "",
|
DFSBaseDir: "",
|
||||||
EnableShareRatio: 1.5,
|
EnableShareRatio: 1.5,
|
||||||
|
ShareMaxUtilization: 1.3, // more than 1.0 to expect more improvement
|
||||||
EnablePreScheduleRatio: 1.5,
|
EnablePreScheduleRatio: 1.5,
|
||||||
|
PreScheduleExtraTime: 15,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* override conf value from env */
|
/* override conf value from env */
|
||||||
@ -84,12 +89,30 @@ func InstanceOfConfiguration() *Configuration {
|
|||||||
configurationInstance.EnableShareRatio = val
|
configurationInstance.EnableShareRatio = val
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
value = os.Getenv("ShareMaxUtilization")
|
||||||
|
if len(value) != 0 {
|
||||||
|
if val, err := strconv.ParseFloat(value, 32); err == nil {
|
||||||
|
configurationInstance.ShareMaxUtilization = val
|
||||||
|
}
|
||||||
|
}
|
||||||
value = os.Getenv("EnablePreScheduleRatio")
|
value = os.Getenv("EnablePreScheduleRatio")
|
||||||
if len(value) != 0 {
|
if len(value) != 0 {
|
||||||
if val, err := strconv.ParseFloat(value, 32); err == nil {
|
if val, err := strconv.ParseFloat(value, 32); err == nil {
|
||||||
configurationInstance.EnablePreScheduleRatio = val
|
configurationInstance.EnablePreScheduleRatio = val
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
value = os.Getenv("PreScheduleExtraTime")
|
||||||
|
if len(value) != 0 {
|
||||||
|
if val, err := strconv.Atoi(value); err == nil {
|
||||||
|
configurationInstance.PreScheduleExtraTime = val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
value = os.Getenv("PreScheduleTimeout")
|
||||||
|
if len(value) != 0 {
|
||||||
|
if val, err := strconv.Atoi(value); err == nil {
|
||||||
|
configurationInstance.PreScheduleTimeout = val
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return configurationInstance
|
return configurationInstance
|
||||||
}
|
}
|
||||||
@ -133,6 +156,9 @@ func (config *Configuration) Dump() map[string]interface{} {
|
|||||||
res["HDFSBaseDir"] = config.HDFSBaseDir
|
res["HDFSBaseDir"] = config.HDFSBaseDir
|
||||||
res["DFSBaseDir"] = config.DFSBaseDir
|
res["DFSBaseDir"] = config.DFSBaseDir
|
||||||
res["EnableShareRatio"] = config.EnableShareRatio
|
res["EnableShareRatio"] = config.EnableShareRatio
|
||||||
|
res["ShareMaxUtilization"] = config.ShareMaxUtilization
|
||||||
res["EnablePreScheduleRatio"] = config.EnablePreScheduleRatio
|
res["EnablePreScheduleRatio"] = config.EnablePreScheduleRatio
|
||||||
|
res["PreScheduleExtraTime"] = config.PreScheduleExtraTime
|
||||||
|
res["PreScheduleTimeout"] = config.PreScheduleTimeout
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
@ -120,6 +120,8 @@ func (jm *JobManager) start() {
|
|||||||
resp, err := doRequest("POST", "http://"+jm.resources[index].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
|
resp, err := doRequest("POST", "http://"+jm.resources[index].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn(err.Error())
|
log.Warn(err.Error())
|
||||||
|
jm.job.Status = Failed
|
||||||
|
jm.stop(false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -127,6 +129,8 @@ func (jm *JobManager) start() {
|
|||||||
resp.Body.Close()
|
resp.Body.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn(err)
|
log.Warn(err)
|
||||||
|
jm.job.Status = Failed
|
||||||
|
jm.stop(false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -134,6 +138,8 @@ func (jm *JobManager) start() {
|
|||||||
err = json.Unmarshal([]byte(string(body)), &res)
|
err = json.Unmarshal([]byte(string(body)), &res)
|
||||||
if err != nil || res.Code != 0 {
|
if err != nil || res.Code != 0 {
|
||||||
log.Warn(res)
|
log.Warn(res)
|
||||||
|
jm.job.Status = Failed
|
||||||
|
jm.stop(false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
jm.jobStatus.tasks[jm.job.Tasks[index].Name] = TaskStatus{Id: res.Id, Node: jm.resources[index].ClientHost, HostName: jm.job.Tasks[i].Name}
|
jm.jobStatus.tasks[jm.job.Tasks[index].Name] = TaskStatus{Id: res.Id, Node: jm.resources[index].ClientHost, HostName: jm.job.Tasks[i].Name}
|
||||||
|
@ -761,7 +761,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
|
|||||||
utilT := InstanceOfOptimizer().PredictReq(job, "Worker").UtilGPU
|
utilT := InstanceOfOptimizer().PredictReq(job, "Worker").UtilGPU
|
||||||
totalUtil += utilT
|
totalUtil += utilT
|
||||||
}
|
}
|
||||||
if totalUtil < 100 {
|
if totalUtil < int(InstanceOfConfiguration().ShareMaxUtilization*100) {
|
||||||
available = append(available, status)
|
available = append(available, status)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -872,7 +872,7 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
|
|||||||
for _, jobT := range jobs {
|
for _, jobT := range jobs {
|
||||||
est := InstanceOfOptimizer().PredictTime(jobT)
|
est := InstanceOfOptimizer().PredictTime(jobT)
|
||||||
now := time.Now().Unix()
|
now := time.Now().Unix()
|
||||||
if int(now-jobT.StartedAt) > est.Total-est.Post-estimate.Pre-15 {
|
if int(now-jobT.StartedAt) > est.Total-est.Post-estimate.Pre-InstanceOfConfiguration().PreScheduleExtraTime {
|
||||||
available = append(available, status)
|
available = append(available, status)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user