1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-06 05:51:54 +00:00
This commit is contained in:
Newnius 2020-07-02 16:58:44 +08:00
parent a66e882e08
commit cc6f358699
6 changed files with 130 additions and 44 deletions

View File

@ -1,9 +0,0 @@
{
"kafkaBrokers": [
"kafka-node1:9092",
"kafka-node2:9092",
"kafka-node3:9092"
],
"kafkaTopic": "yao",
"schedulerPolicy": "fair"
}

View File

@ -3,27 +3,77 @@ package main
import (
"sync"
log "github.com/sirupsen/logrus"
"os"
"strings"
)
// Configuration holds the scheduler's runtime settings. Each exported field's
// JSON tag matches the environment variable that can override it (see
// InstanceOfConfiguration), so a dumped config round-trips with the env names.
type Configuration struct {
	KafkaBrokers    []string `json:"KafkaBrokers"`    // Kafka bootstrap brokers, host:port
	KafkaTopic      string   `json:"KafkaTopic"`      // topic the scheduler consumes/produces
	SchedulerPolicy string   `json:"SchedulerPolicy"` // e.g. "fair"
	ListenAddr      string   `json:"ListenAddr"`      // HTTP listen address, host:port
	HDFSAddress     string   `json:"HDFSAddress"`     // WebHDFS endpoint; empty disables HDFS output
	HDFSBaseDir     string   `json:"HDFSBaseDir"`     // base directory for job output on HDFS
	DFSBaseDir      string   `json:"DFSBaseDir"`      // base directory for job sources on the DFS
	mock            bool     // when true, components return mocked results
	mu              sync.Mutex
}
var ConfigurationInstance *Configuration
var configurationInstance *Configuration
var ConfigurationInstanceLock sync.Mutex
func InstanceOfConfiguration() *Configuration {
ConfigurationInstanceLock.Lock()
defer ConfigurationInstanceLock.Unlock()
if ConfigurationInstance == nil {
ConfigurationInstance = &Configuration{mock: false}
if configurationInstance == nil {
/* set default values */
configurationInstance = &Configuration{
mock: false,
KafkaBrokers: []string{
"kafka-node1:9092",
"kafka-node2:9092",
"kafka-node3:9092",
},
KafkaTopic: "yao",
SchedulerPolicy: "fair",
ListenAddr: "0.0.0.0:8080",
HDFSAddress: "",
HDFSBaseDir: "/user/root/",
DFSBaseDir: "",
}
/* override conf value from env */
value := os.Getenv("KafkaBrokers")
if len(value) != 0 {
configurationInstance.KafkaBrokers = strings.Split(value, ",")
}
value = os.Getenv("KafkaTopic")
if len(value) != 0 {
configurationInstance.KafkaTopic = value
}
value = os.Getenv("SchedulerPolicy")
if len(value) != 0 {
configurationInstance.SchedulerPolicy = value
}
value = os.Getenv("ListenAddr")
if len(value) != 0 {
configurationInstance.ListenAddr = value
}
value = os.Getenv("HDFSAddress")
if len(value) != 0 {
configurationInstance.HDFSAddress = value
}
value = os.Getenv("HDFSBaseDir")
if len(value) != 0 {
configurationInstance.HDFSBaseDir = value
}
value = os.Getenv("DFSBaseDir")
if len(value) != 0 {
configurationInstance.DFSBaseDir = value
}
}
return ConfigurationInstance
return configurationInstance
}
func (config *Configuration) EnableMock() bool {
@ -41,3 +91,16 @@ func (config *Configuration) DisableMock() bool {
log.Info("configuration.mock = false")
return true
}
// Dump returns a snapshot of the configuration as a generic map, suitable
// for JSON serialization by the debug_conf_dump endpoint. Exported fields
// keep their struct names; the unexported mock flag is exposed as "Mock".
func (config *Configuration) Dump() map[string]interface{} {
	return map[string]interface{}{
		"KafkaBrokers":    config.KafkaBrokers,
		"KafkaTopic":      config.KafkaTopic,
		"SchedulerPolicy": config.SchedulerPolicy,
		"ListenAddr":      config.ListenAddr,
		"Mock":            config.mock,
		"HDFSAddress":     config.HDFSAddress,
		"HDFSBaseDir":     config.HDFSBaseDir,
		"DFSBaseDir":      config.DFSBaseDir,
	}
}

View File

@ -107,10 +107,10 @@ func (jm *JobManager) start() {
v.Set("network", jm.network)
v.Set("should_wait", shouldWait)
v.Set("output_dir", "/tmp/")
v.Set("hdfs_address", "http://192.168.100.104:50070/")
v.Set("hdfs_dir", "/user/yao/output/"+jm.job.Name)
v.Set("hdfs_address", InstanceOfConfiguration().HDFSAddress)
v.Set("hdfs_dir", InstanceOfConfiguration().HDFSBaseDir+jm.job.Name)
v.Set("gpu_mem", strconv.Itoa(jm.job.Tasks[index].MemoryGPU))
v.Set("dfs_src", "/dfs/yao-jobs/"+jm.job.Name+"/task-"+strconv.Itoa(index))
v.Set("dfs_src", InstanceOfConfiguration().DFSBaseDir+jm.job.Name+"/task-"+strconv.Itoa(index))
v.Set("dfs_dst", "/tmp")
resp, err := doRequest("POST", "http://"+jm.resources[index].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")

View File

@ -1,19 +1,15 @@
package main
import (
"flag"
"net/http"
log "github.com/sirupsen/logrus"
"encoding/json"
"os"
"time"
"strconv"
"math/rand"
"os"
)
var addr = flag.String("addr", "0.0.0.0:8080", "http service address")
var confFile = flag.String("conf", "/etc/yao/config.json", "configuration file path")
var scheduler Scheduler
func serverAPI(w http.ResponseWriter, r *http.Request) {
@ -326,6 +322,13 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
w.Write(js)
break
case "debug_conf_dump":
log.Debug("debug_conf_dump")
js, _ := json.Marshal(InstanceOfConfiguration().Dump())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
default:
http.Error(w, "Not Found", http.StatusNotFound)
break
@ -333,21 +336,19 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
}
func main() {
flag.Parse()
/* read configuration */
file, err := os.Open(*confFile)
if err != nil {
log.Fatal(err)
value := os.Getenv("LoggerOutputDir")
if len(value) != 0 {
f, err := os.OpenFile(value, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
defer f.Close()
if err != nil {
log.Fatalf("error opening file: %v", err)
}
log.SetOutput(f)
}
defer file.Close()
//log.SetLevel(log.InfoLevel)
/* parse configuration */
decoder := json.NewDecoder(file)
config := Configuration{}
err = decoder.Decode(&config)
if err != nil {
log.Fatal(err)
}
config := *InstanceOfConfiguration()
/* init components */
InstanceOfResourcePool().init(config)
@ -378,9 +379,8 @@ func main() {
serverAPI(w, r)
})
err = http.ListenAndServe(*addr, nil)
err := http.ListenAndServe(config.ListenAddr, nil)
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}

View File

@ -9,10 +9,13 @@ import (
"encoding/json"
"math"
"hash/fnv"
"time"
)
type Optimizer struct {
versions map[string]int
cache map[string]OptimizerJobExecutionTime
cacheMu sync.Mutex
}
var optimizerInstance *Optimizer
@ -25,6 +28,25 @@ func InstanceOfOptimizer() *Optimizer {
if optimizerInstance == nil {
optimizerInstance = &Optimizer{}
optimizerInstance.versions = map[string]int{}
optimizerInstance.cache = map[string]OptimizerJobExecutionTime{}
go func() {
/* remove expired cache */
for {
time.Sleep(time.Second * 30)
optimizerInstance.cacheMu.Lock()
var expired []string
for k, v := range optimizerInstance.cache {
if time.Now().Unix()-v.Version > 300 {
expired = append(expired, k)
}
}
for _, k := range expired {
delete(optimizerInstance.cache, k)
}
optimizerInstance.cacheMu.Unlock()
}
}()
}
return optimizerInstance
}
@ -165,8 +187,14 @@ func (optimizer *Optimizer) trainTime(jobName string) {
}
func (optimizer *Optimizer) PredictTime(job Job) OptimizerJobExecutionTime {
res := OptimizerJobExecutionTime{Pre: 0, Post: 0, Total: math.MaxInt64}
optimizer.cacheMu.Lock()
if val, ok := optimizer.cache[job.Name]; ok {
optimizer.cacheMu.Unlock()
return val
}
optimizer.cacheMu.Unlock()
res := OptimizerJobExecutionTime{Pre: 0, Post: 0, Total: math.MaxInt64}
var jobName string
str := strings.Split(job.Name, "-")
if len(str) == 2 {
@ -246,6 +274,10 @@ func (optimizer *Optimizer) PredictTime(job Job) OptimizerJobExecutionTime {
res.Total = int(math.Ceil(v))
}
}
res.Version = time.Now().Unix()
optimizer.cacheMu.Lock()
optimizer.cache[job.Name] = res
optimizer.cacheMu.Unlock()
return res
}

View File

@ -46,11 +46,11 @@ type UtilGPUTimeSeries struct {
}
// OptimizerJobExecutionTime is the optimizer's predicted timing breakdown
// for a job, in seconds. Version is a Unix timestamp (seconds) recording
// when the prediction was produced, used to expire cached entries.
type OptimizerJobExecutionTime struct {
	Pre     int   `json:"pre"`     // predicted pre-processing time
	Post    int   `json:"post"`    // predicted post-processing time
	Total   int   `json:"total"`   // predicted total execution time
	Main    int   `json:"main"`    // predicted main-phase time
	Version int64 `json:"version"` // creation time, time.Now().Unix()
}
type OptimizerUtilGPU struct {