2020-06-14 13:12:22 +00:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"sync"
|
|
|
|
log "github.com/sirupsen/logrus"
|
2020-07-02 08:58:44 +00:00
|
|
|
"os"
|
|
|
|
"strings"
|
2020-07-03 13:39:40 +00:00
|
|
|
"strconv"
|
2020-06-14 13:12:22 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// Configuration holds the runtime settings of the process. The exported
// fields carry JSON tags equal to their names; InstanceOfConfiguration fills
// them with defaults and then applies overrides from environment variables
// of the same names.
type Configuration struct {
	// KafkaBrokers is the list of broker addresses. The environment override
	// is a single comma-separated string.
	KafkaBrokers []string `json:"KafkaBrokers"`
	// KafkaTopic is the topic name (default "yao").
	KafkaTopic string `json:"KafkaTopic"`
	// SchedulerPolicy names the scheduling policy (default "fair").
	SchedulerPolicy string `json:"SchedulerPolicy"`
	// ListenAddr is the listen address (default "0.0.0.0:8080").
	ListenAddr string `json:"ListenAddr"`
	// HDFSAddress defaults to the empty string.
	HDFSAddress string `json:"HDFSAddress"`
	// HDFSBaseDir defaults to "/user/root/".
	HDFSBaseDir string `json:"HDFSBaseDir"`
	// DFSBaseDir defaults to the empty string.
	DFSBaseDir string `json:"DFSBaseDir"`
	// EnableShareRatio defaults to 1.5 and can be changed at runtime through
	// SetShareRatio.
	EnableShareRatio float64 `json:"EnableShareRatio"`
	// EnablePreScheduleRatio defaults to 1.5 and can be changed at runtime
	// through SetPreScheduleRatio.
	EnablePreScheduleRatio float64 `json:"EnablePreScheduleRatio"`

	// mock is toggled by EnableMock and DisableMock and reported by Dump
	// under the "Mock" key. It is unexported, so it is not part of the JSON
	// form of the struct.
	mock bool
	// mu is held by EnableMock and DisableMock while they update mock.
	mu sync.Mutex
}
|
|
|
|
|
2020-07-02 08:58:44 +00:00
|
|
|
var configurationInstance *Configuration
|
2020-06-14 13:12:22 +00:00
|
|
|
var ConfigurationInstanceLock sync.Mutex
|
|
|
|
|
|
|
|
func InstanceOfConfiguration() *Configuration {
|
|
|
|
ConfigurationInstanceLock.Lock()
|
|
|
|
defer ConfigurationInstanceLock.Unlock()
|
|
|
|
|
2020-07-02 08:58:44 +00:00
|
|
|
if configurationInstance == nil {
|
|
|
|
/* set default values */
|
|
|
|
configurationInstance = &Configuration{
|
|
|
|
mock: false,
|
|
|
|
KafkaBrokers: []string{
|
|
|
|
"kafka-node1:9092",
|
|
|
|
"kafka-node2:9092",
|
|
|
|
"kafka-node3:9092",
|
|
|
|
},
|
2020-07-03 13:39:40 +00:00
|
|
|
KafkaTopic: "yao",
|
|
|
|
SchedulerPolicy: "fair",
|
|
|
|
ListenAddr: "0.0.0.0:8080",
|
|
|
|
HDFSAddress: "",
|
|
|
|
HDFSBaseDir: "/user/root/",
|
|
|
|
DFSBaseDir: "",
|
|
|
|
EnableShareRatio: 1.5,
|
|
|
|
EnablePreScheduleRatio: 1.5,
|
2020-07-02 08:58:44 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/* override conf value from env */
|
|
|
|
value := os.Getenv("KafkaBrokers")
|
|
|
|
if len(value) != 0 {
|
|
|
|
configurationInstance.KafkaBrokers = strings.Split(value, ",")
|
|
|
|
}
|
|
|
|
value = os.Getenv("KafkaTopic")
|
|
|
|
if len(value) != 0 {
|
|
|
|
configurationInstance.KafkaTopic = value
|
|
|
|
}
|
|
|
|
value = os.Getenv("SchedulerPolicy")
|
|
|
|
if len(value) != 0 {
|
|
|
|
configurationInstance.SchedulerPolicy = value
|
|
|
|
}
|
|
|
|
value = os.Getenv("ListenAddr")
|
|
|
|
if len(value) != 0 {
|
|
|
|
configurationInstance.ListenAddr = value
|
|
|
|
}
|
|
|
|
value = os.Getenv("HDFSAddress")
|
|
|
|
if len(value) != 0 {
|
|
|
|
configurationInstance.HDFSAddress = value
|
|
|
|
}
|
|
|
|
value = os.Getenv("HDFSBaseDir")
|
|
|
|
if len(value) != 0 {
|
|
|
|
configurationInstance.HDFSBaseDir = value
|
|
|
|
}
|
2020-07-06 02:51:53 +00:00
|
|
|
value = os.Getenv("DFSBaseDir")
|
|
|
|
if len(value) != 0 {
|
|
|
|
configurationInstance.DFSBaseDir = value
|
|
|
|
}
|
2020-07-03 13:39:40 +00:00
|
|
|
value = os.Getenv("EnableShareRatio")
|
|
|
|
if len(value) != 0 {
|
|
|
|
if val, err := strconv.ParseFloat(value, 32); err == nil {
|
|
|
|
configurationInstance.EnableShareRatio = val
|
|
|
|
}
|
|
|
|
}
|
|
|
|
value = os.Getenv("EnablePreScheduleRatio")
|
2020-07-02 08:58:44 +00:00
|
|
|
if len(value) != 0 {
|
2020-07-03 13:39:40 +00:00
|
|
|
if val, err := strconv.ParseFloat(value, 32); err == nil {
|
|
|
|
configurationInstance.EnablePreScheduleRatio = val
|
|
|
|
}
|
2020-07-02 08:58:44 +00:00
|
|
|
}
|
2020-06-14 13:12:22 +00:00
|
|
|
}
|
2020-07-02 08:58:44 +00:00
|
|
|
return configurationInstance
|
2020-06-14 13:12:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (config *Configuration) EnableMock() bool {
|
|
|
|
config.mu.Lock()
|
|
|
|
defer config.mu.Unlock()
|
|
|
|
config.mock = true
|
|
|
|
log.Info("configuration.mock = true")
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
func (config *Configuration) DisableMock() bool {
|
|
|
|
config.mu.Lock()
|
|
|
|
defer config.mu.Unlock()
|
|
|
|
config.mock = false
|
|
|
|
log.Info("configuration.mock = false")
|
|
|
|
return true
|
|
|
|
}
|
2020-07-02 08:58:44 +00:00
|
|
|
|
2020-07-03 13:39:40 +00:00
|
|
|
func (config *Configuration) SetShareRatio(ratio float64) bool {
|
|
|
|
config.EnableShareRatio = ratio
|
|
|
|
log.Info("enableShareRatio is updated to ", ratio)
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
func (config *Configuration) SetPreScheduleRatio(ratio float64) bool {
|
|
|
|
config.EnablePreScheduleRatio = ratio
|
|
|
|
log.Info("enablePreScheduleRatio is updated to ", ratio)
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2020-07-02 08:58:44 +00:00
|
|
|
func (config *Configuration) Dump() map[string]interface{} {
|
|
|
|
res := map[string]interface{}{}
|
|
|
|
res["KafkaBrokers"] = config.KafkaBrokers
|
|
|
|
res["KafkaTopic"] = config.KafkaTopic
|
|
|
|
res["SchedulerPolicy"] = config.SchedulerPolicy
|
|
|
|
res["ListenAddr"] = config.ListenAddr
|
|
|
|
res["Mock"] = config.mock
|
|
|
|
res["HDFSAddress"] = config.HDFSAddress
|
|
|
|
res["HDFSBaseDir"] = config.HDFSBaseDir
|
|
|
|
res["DFSBaseDir"] = config.DFSBaseDir
|
2020-07-03 13:39:40 +00:00
|
|
|
res["EnableShareRatio"] = config.EnableShareRatio
|
|
|
|
res["EnablePreScheduleRatio"] = config.EnablePreScheduleRatio
|
2020-07-02 08:58:44 +00:00
|
|
|
return res
|
|
|
|
}
|