From cc6f35869971a4d7f4c266c07e32e6bba3981ce0 Mon Sep 17 00:00:00 2001 From: Newnius Date: Thu, 2 Jul 2020 16:58:44 +0800 Subject: [PATCH] update --- conf/config.json | 9 ------ src/configuration.go | 77 ++++++++++++++++++++++++++++++++++++++++---- src/job_manager.go | 6 ++-- src/main.go | 38 +++++++++++----------- src/optimizer.go | 34 ++++++++++++++++++- src/util.go | 10 +++--- 6 files changed, 130 insertions(+), 44 deletions(-) delete mode 100644 conf/config.json diff --git a/conf/config.json b/conf/config.json deleted file mode 100644 index e918126..0000000 --- a/conf/config.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "kafkaBrokers": [ - "kafka-node1:9092", - "kafka-node2:9092", - "kafka-node3:9092" - ], - "kafkaTopic": "yao", - "schedulerPolicy": "fair" -} \ No newline at end of file diff --git a/src/configuration.go b/src/configuration.go index 4fe0c91..0a4234c 100644 --- a/src/configuration.go +++ b/src/configuration.go @@ -3,27 +3,77 @@ package main import ( "sync" log "github.com/sirupsen/logrus" + "os" + "strings" ) type Configuration struct { - KafkaBrokers []string `json:"kafkaBrokers"` - KafkaTopic string `json:"kafkaTopic"` - SchedulerPolicy string `json:"schedulerPolicy"` + KafkaBrokers []string `json:"KafkaBrokers"` + KafkaTopic string `json:"KafkaTopic"` + SchedulerPolicy string `json:"SchedulerPolicy"` + ListenAddr string `json:"ListenAddr"` + HDFSAddress string `json:"HDFSAddress"` + HDFSBaseDir string `json:"HDFSBaseDir"` + DFSBaseDir string `json:"DFSBaseDir"` mock bool mu sync.Mutex } -var ConfigurationInstance *Configuration +var configurationInstance *Configuration var ConfigurationInstanceLock sync.Mutex func InstanceOfConfiguration() *Configuration { ConfigurationInstanceLock.Lock() defer ConfigurationInstanceLock.Unlock() - if ConfigurationInstance == nil { - ConfigurationInstance = &Configuration{mock: false} + if configurationInstance == nil { + /* set default values */ + configurationInstance = &Configuration{ + mock: false, + KafkaBrokers: []string{ + "kafka-node1:9092", + "kafka-node2:9092", + "kafka-node3:9092", + }, + KafkaTopic: "yao", + SchedulerPolicy: "fair", + ListenAddr: "0.0.0.0:8080", + HDFSAddress: "", + HDFSBaseDir: "/user/root/", + DFSBaseDir: "", + } + + /* override conf value from env */ + value := os.Getenv("KafkaBrokers") + if len(value) != 0 { + configurationInstance.KafkaBrokers = strings.Split(value, ",") + } + value = os.Getenv("KafkaTopic") + if len(value) != 0 { + configurationInstance.KafkaTopic = value + } + value = os.Getenv("SchedulerPolicy") + if len(value) != 0 { + configurationInstance.SchedulerPolicy = value + } + value = os.Getenv("ListenAddr") + if len(value) != 0 { + configurationInstance.ListenAddr = value + } + value = os.Getenv("HDFSAddress") + if len(value) != 0 { + configurationInstance.HDFSAddress = value + } + value = os.Getenv("HDFSBaseDir") + if len(value) != 0 { + configurationInstance.HDFSBaseDir = value + } + value = os.Getenv("DFSBaseDir") + if len(value) != 0 { + configurationInstance.DFSBaseDir = value + } } - return ConfigurationInstance + return configurationInstance } func (config *Configuration) EnableMock() bool { @@ -41,3 +91,16 @@ func (config *Configuration) DisableMock() bool { log.Info("configuration.mock = false") return true } + +func (config *Configuration) Dump() map[string]interface{} { + res := map[string]interface{}{} + res["KafkaBrokers"] = config.KafkaBrokers + res["KafkaTopic"] = config.KafkaTopic + res["SchedulerPolicy"] = config.SchedulerPolicy + res["ListenAddr"] = config.ListenAddr + res["Mock"] = config.mock + res["HDFSAddress"] = config.HDFSAddress + res["HDFSBaseDir"] = config.HDFSBaseDir + res["DFSBaseDir"] = config.DFSBaseDir + return res +} diff --git a/src/job_manager.go b/src/job_manager.go index fb03f8a..6996ea8 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -107,10 +107,10 @@ func (jm *JobManager) start() { v.Set("network", jm.network) v.Set("should_wait", shouldWait) v.Set("output_dir", "/tmp/") - v.Set("hdfs_address", "http://192.168.100.104:50070/") - v.Set("hdfs_dir", "/user/yao/output/"+jm.job.Name) + v.Set("hdfs_address", InstanceOfConfiguration().HDFSAddress) + v.Set("hdfs_dir", InstanceOfConfiguration().HDFSBaseDir+jm.job.Name) v.Set("gpu_mem", strconv.Itoa(jm.job.Tasks[index].MemoryGPU)) - v.Set("dfs_src", "/dfs/yao-jobs/"+jm.job.Name+"/task-"+strconv.Itoa(index)) + v.Set("dfs_src", InstanceOfConfiguration().DFSBaseDir+jm.job.Name+"/task-"+strconv.Itoa(index)) v.Set("dfs_dst", "/tmp") resp, err := doRequest("POST", "http://"+jm.resources[index].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") diff --git a/src/main.go b/src/main.go index 867db1f..56cadf9 100644 --- a/src/main.go +++ b/src/main.go @@ -1,19 +1,15 @@ package main import ( - "flag" "net/http" log "github.com/sirupsen/logrus" "encoding/json" - "os" "time" "strconv" "math/rand" + "os" ) -var addr = flag.String("addr", "0.0.0.0:8080", "http service address") -var confFile = flag.String("conf", "/etc/yao/config.json", "configuration file path") - var scheduler Scheduler func serverAPI(w http.ResponseWriter, r *http.Request) { @@ -326,6 +322,13 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { w.Write(js) break + case "debug_conf_dump": + log.Debug("debug_conf_dump") + js, _ := json.Marshal(InstanceOfConfiguration().Dump()) + w.Header().Set("Content-Type", "application/json") + w.Write(js) + break + default: http.Error(w, "Not Found", http.StatusNotFound) break @@ -333,21 +336,19 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { } func main() { - flag.Parse() - /* read configuration */ - file, err := os.Open(*confFile) - if err != nil { - log.Fatal(err) + value := os.Getenv("LoggerOutputDir") + if len(value) != 0 { + f, err := os.OpenFile(value, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + defer f.Close() + if err != nil { + log.Fatalf("error opening file: %v", err) + } + log.SetOutput(f) } - defer file.Close() + //log.SetLevel(log.InfoLevel) /* parse configuration */ - decoder := json.NewDecoder(file) - config := Configuration{} - err = decoder.Decode(&config) - if err != nil { - log.Fatal(err) - } + config := *InstanceOfConfiguration() /* init components */ InstanceOfResourcePool().init(config) @@ -378,9 +379,8 @@ func main() { serverAPI(w, r) }) - err = http.ListenAndServe(*addr, nil) + err := http.ListenAndServe(config.ListenAddr, nil) if err != nil { log.Fatal("ListenAndServe: ", err) } - } diff --git a/src/optimizer.go b/src/optimizer.go index 3fd17ce..0002958 100644 --- a/src/optimizer.go +++ b/src/optimizer.go @@ -9,10 +9,13 @@ import ( "encoding/json" "math" "hash/fnv" + "time" ) type Optimizer struct { versions map[string]int + cache map[string]OptimizerJobExecutionTime + cacheMu sync.Mutex } var optimizerInstance *Optimizer @@ -25,6 +28,25 @@ func InstanceOfOptimizer() *Optimizer { if optimizerInstance == nil { optimizerInstance = &Optimizer{} optimizerInstance.versions = map[string]int{} + optimizerInstance.cache = map[string]OptimizerJobExecutionTime{} + + go func() { + /* remove expired cache */ + for { + time.Sleep(time.Second * 30) + optimizerInstance.cacheMu.Lock() + var expired []string + for k, v := range optimizerInstance.cache { + if time.Now().Unix()-v.Version > 300 { + expired = append(expired, k) + } + } + for _, k := range expired { + delete(optimizerInstance.cache, k) + } + optimizerInstance.cacheMu.Unlock() + } + }() } return optimizerInstance } @@ -165,8 +187,14 @@ func (optimizer *Optimizer) trainTime(jobName string) { } func (optimizer *Optimizer) PredictTime(job Job) OptimizerJobExecutionTime { - res := OptimizerJobExecutionTime{Pre: 0, Post: 0, Total: math.MaxInt64} + optimizer.cacheMu.Lock() + if val, ok := optimizer.cache[job.Name]; ok { + optimizer.cacheMu.Unlock() + return val + } + optimizer.cacheMu.Unlock() + res := OptimizerJobExecutionTime{Pre: 0, Post: 0, Total: math.MaxInt64} var jobName string str := strings.Split(job.Name, "-") if len(str) == 2 { @@ -246,6 +274,10 @@ func (optimizer *Optimizer) PredictTime(job Job) OptimizerJobExecutionTime { res.Total = int(math.Ceil(v)) } } + res.Version = time.Now().Unix() + optimizer.cacheMu.Lock() + optimizer.cache[job.Name] = res + optimizer.cacheMu.Unlock() return res } diff --git a/src/util.go b/src/util.go index eedbc09..0f5e612 100644 --- a/src/util.go +++ b/src/util.go @@ -46,11 +46,11 @@ type UtilGPUTimeSeries struct { } type OptimizerJobExecutionTime struct { - Pre int `json:"pre"` - Post int `json:"post"` - Total int `json:"total"` - Main int `json:"main"` - Version int `json:"version"` + Pre int `json:"pre"` + Post int `json:"post"` + Total int `json:"total"` + Main int `json:"main"` + Version int64 `json:"version"` } type OptimizerUtilGPU struct {