1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-07 06:11:56 +00:00

support switch scheduler.strategy

This commit is contained in:
Newnius 2020-08-13 19:17:27 +08:00
parent 7c6a3ed987
commit 8ce969a163
2 changed files with 43 additions and 17 deletions

View File

@ -10,7 +10,7 @@ import (
type Configuration struct { type Configuration struct {
KafkaBrokers []string `json:"KafkaBrokers"` KafkaBrokers []string `json:"KafkaBrokers"`
KafkaTopic string `json:"KafkaTopic"` KafkaTopic string `json:"KafkaTopic"`
SchedulerPolicy string `json:"SchedulerPolicy"` SchedulerPolicy string `json:"scheduler.strategy"`
ListenAddr string `json:"ListenAddr"` ListenAddr string `json:"ListenAddr"`
HDFSAddress string `json:"HDFSAddress"` HDFSAddress string `json:"HDFSAddress"`
HDFSBaseDir string `json:"HDFSBaseDir"` HDFSBaseDir string `json:"HDFSBaseDir"`
@ -178,6 +178,45 @@ func (config *Configuration) SetPreemptEnabled(enabled bool) bool {
return true return true
} }
func (config *Configuration) SetSchedulePolicy(strategy string) bool {
config.mu.Lock()
defer config.mu.Unlock()
available := map[string]bool{
"FCFS": true,
"priority": true,
"capacity": true,
"fair": true,
}
if _, ok := available[strategy]; ok {
config.SchedulerPolicy = strategy
}
log.Info("scheduler.strategy is set to ", config.SchedulerPolicy)
return true
}
func (config *Configuration) GetScheduler() Scheduler {
config.mu.Lock()
defer config.mu.Unlock()
var scheduler Scheduler
switch config.SchedulerPolicy {
case "FCFS":
scheduler = &SchedulerFCFS{}
break
case "priority":
scheduler = &SchedulerPriority{}
break
case "capacity":
scheduler = &SchedulerCapacity{}
break
case "fair":
scheduler = &SchedulerFair{}
break
default:
scheduler = &SchedulerFCFS{}
}
return scheduler
}
func (config *Configuration) Dump() map[string]interface{} { func (config *Configuration) Dump() map[string]interface{} {
config.mu.Lock() config.mu.Lock()
defer config.mu.Unlock() defer config.mu.Unlock()

View File

@ -346,6 +346,8 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
/* allocator.strategy */ /* allocator.strategy */
case "allocator.strategy": case "allocator.strategy":
ok = InstanceOfAllocator().updateStrategy(value) ok = InstanceOfAllocator().updateStrategy(value)
scheduler = InstanceOfConfiguration().GetScheduler()
scheduler.Start()
break break
/* logger */ /* logger */
@ -412,22 +414,7 @@ func main() {
InstanceOfOptimizer().Start() InstanceOfOptimizer().Start()
InstanceOfGroupManager().Start() InstanceOfGroupManager().Start()
switch config.SchedulerPolicy { scheduler = config.GetScheduler()
case "FCFS":
scheduler = &SchedulerFCFS{}
break
case "priority":
scheduler = &SchedulerPriority{}
break
case "capacity":
scheduler = &SchedulerCapacity{}
break
case "fair":
scheduler = &SchedulerFair{}
break
default:
scheduler = &SchedulerFCFS{}
}
scheduler.Start() scheduler.Start()
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {