From 8ce969a163331e5588bdd41ce4a80985bf692203 Mon Sep 17 00:00:00 2001 From: Newnius Date: Thu, 13 Aug 2020 19:17:27 +0800 Subject: [PATCH] support switch scheduler.strategy --- src/configuration.go | 41 ++++++++++++++++++++++++++++++++++++++++- src/main.go | 19 +++---------------- 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/src/configuration.go b/src/configuration.go index 4c35b5d..aef26fa 100644 --- a/src/configuration.go +++ b/src/configuration.go @@ -10,7 +10,7 @@ import ( type Configuration struct { KafkaBrokers []string `json:"KafkaBrokers"` KafkaTopic string `json:"KafkaTopic"` - SchedulerPolicy string `json:"SchedulerPolicy"` + SchedulerPolicy string `json:"scheduler.strategy"` ListenAddr string `json:"ListenAddr"` HDFSAddress string `json:"HDFSAddress"` HDFSBaseDir string `json:"HDFSBaseDir"` @@ -178,6 +178,45 @@ func (config *Configuration) SetPreemptEnabled(enabled bool) bool { return true } +func (config *Configuration) SetSchedulePolicy(strategy string) bool { + config.mu.Lock() + defer config.mu.Unlock() + available := map[string]bool{ + "FCFS": true, + "priority": true, + "capacity": true, + "fair": true, + } + if _, ok := available[strategy]; ok { + config.SchedulerPolicy = strategy + } + log.Info("scheduler.strategy is set to ", config.SchedulerPolicy) + return true +} + +func (config *Configuration) GetScheduler() Scheduler { + config.mu.Lock() + defer config.mu.Unlock() + var scheduler Scheduler + switch config.SchedulerPolicy { + case "FCFS": + scheduler = &SchedulerFCFS{} + break + case "priority": + scheduler = &SchedulerPriority{} + break + case "capacity": + scheduler = &SchedulerCapacity{} + break + case "fair": + scheduler = &SchedulerFair{} + break + default: + scheduler = &SchedulerFCFS{} + } + return scheduler +} + func (config *Configuration) Dump() map[string]interface{} { config.mu.Lock() defer config.mu.Unlock() diff --git a/src/main.go b/src/main.go index fdb35d4..f801170 100644 --- a/src/main.go +++ b/src/main.go @@ -346,6 +346,8 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { /* allocator.strategy */ case "allocator.strategy": ok = InstanceOfAllocator().updateStrategy(value) + scheduler = InstanceOfConfiguration().GetScheduler() + scheduler.Start() break /* logger */ @@ -412,22 +414,7 @@ func main() { InstanceOfOptimizer().Start() InstanceOfGroupManager().Start() - switch config.SchedulerPolicy { - case "FCFS": - scheduler = &SchedulerFCFS{} - break - case "priority": - scheduler = &SchedulerPriority{} - break - case "capacity": - scheduler = &SchedulerCapacity{} - break - case "fair": - scheduler = &SchedulerFair{} - break - default: - scheduler = &SchedulerFCFS{} - } + scheduler = config.GetScheduler() scheduler.Start() http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {