mirror of https://github.com/newnius/YAO-scheduler.git

import configuration file

This commit is contained in:
Newnius 2019-10-24 20:25:59 +08:00
parent 9df3263548
commit 142a3a7bdf
6 changed files with 61 additions and 14 deletions

conf/config.json Normal file
View File

@@ -0,0 +1,9 @@
+{
+    "kafkaBrokers": [
+        "kafka-node1:9092",
+        "kafka-node2:9092",
+        "kafka-node3:9092"
+    ],
+    "kafkaTopic": "yao",
+    "schedulerPolicy": "fair"
+}
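
For context, a minimal sketch of how a file in this format is decoded in Go. The Configuration struct is copied from the one this commit adds; the loadConfig helper and the standalone program around it are illustrative only, not part of the repo:

    package main

    import (
        "encoding/json"
        "log"
        "os"
    )

    // Configuration mirrors the struct added later in this commit.
    type Configuration struct {
        KafkaBrokers    []string `json:"kafkaBrokers"`
        KafkaTopic      string   `json:"kafkaTopic"`
        SchedulerPolicy string   `json:"schedulerPolicy"`
    }

    // loadConfig opens the JSON file and decodes it into a Configuration.
    func loadConfig(path string) (Configuration, error) {
        var config Configuration
        file, err := os.Open(path)
        if err != nil {
            return config, err
        }
        defer file.Close()
        err = json.NewDecoder(file).Decode(&config)
        return config, err
    }

    func main() {
        config, err := loadConfig("conf/config.json")
        if err != nil {
            log.Fatal(err)
        }
        log.Println(config.KafkaBrokers, config.KafkaTopic, config.SchedulerPolicy)
    }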

View File

@@ -12,24 +12,24 @@ var (
     wg sync.WaitGroup
 )
-func start(pool *ResourcePool) {
-    consumer, err := sarama.NewConsumer([]string{"kafka-nod21:9092", "kafka-node2:9092", "kafka-node3:9092"}, nil)
+func start(pool *ResourcePool, config Configuration) {
+    consumer, err := sarama.NewConsumer(config.KafkaBrokers, nil)
     for {
         if err == nil {
             break
         }
         log.Warn(err)
         time.Sleep(time.Second * 5)
-        consumer, err = sarama.NewConsumer([]string{"kafka-nod21:9092", "kafka-node2:9092", "kafka-node3:9092"}, nil)
+        consumer, err = sarama.NewConsumer(config.KafkaBrokers, nil)
     }
-    partitionList, err := consumer.Partitions("yao")
+    partitionList, err := consumer.Partitions(config.KafkaTopic)
     if err != nil {
         panic(err)
     }
     for partition := range partitionList {
-        pc, err := consumer.ConsumePartition("yao", int32(partition), sarama.OffsetNewest)
+        pc, err := consumer.ConsumePartition(config.KafkaTopic, int32(partition), sarama.OffsetNewest)
         if err != nil {
             panic(err)
         }
@@ -43,7 +43,7 @@ func start(pool *ResourcePool) {
             var nodeStatus NodeStatus
             err = json.Unmarshal([]byte(string(msg.Value)), &nodeStatus)
             if err != nil {
-                log.Println(err)
+                log.Warn(err)
                 continue
             }
             pool.update(nodeStatus)
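
A note on the partition loop above: for partition := range partitionList iterates over the slice indices (0, 1, ...), not the partition IDs sarama returned, so int32(partition) is really the index. This works only because Kafka normally assigns contiguous partition IDs starting at 0. A defensive sketch of the same loop using the returned values instead:

    for _, partition := range partitionList {
        pc, err := consumer.ConsumePartition(config.KafkaTopic, partition, sarama.OffsetNewest)
        if err != nil {
            panic(err)
        }
        _ = pc // per-partition message handling omitted, as in the hunk above
    }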

View File

@@ -169,8 +169,6 @@ func (jm *JobManager) status() MsgJobStatus {
         var res MsgTaskStatus
         err = json.Unmarshal([]byte(string(body)), &res)
-        log.Info(res)
-        log.Info(string(body))
         if err != nil {
             continue
         }
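
Dropping the two log.Info calls is more than log-level hygiene: they ran before the err check, so a failed unmarshal would still log a partially filled (or zero-value) res together with the raw body on every iteration.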

View File

@@ -5,6 +5,7 @@ import (
     "net/http"
     log "github.com/sirupsen/logrus"
     "encoding/json"
+    "os"
 )

 var addr = flag.String("addr", ":8080", "http service address")
@@ -148,17 +149,50 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
 }

 func main() {
+    var confFile = "conf/config.json"
+    for i := 0; i < (len(os.Args)-1)/2; i++ {
+        if os.Args[i*2+1] == "-c" {
+            confFile = os.Args[i*2+2]
+        }
+    }
+
+    /* read configuration */
+    file, err := os.Open(confFile)
+    if err != nil {
+        log.Fatal(err)
+    }
+    defer file.Close()
+
+    /* parse configuration */
+    decoder := json.NewDecoder(file)
+    config := Configuration{}
+    err = decoder.Decode(&config)
+    if err != nil {
+        log.Fatal(err)
+    }
+
     pool = &ResourcePool{}
     pool.nodes = make(map[string]NodeStatus)
     pool.start()

-    //scheduler = &SchedulerFCFS{}
-    //scheduler = &SchedulerPriority{}
+    switch config.SchedulerPolicy {
+    case "FCFS":
+        scheduler = &SchedulerFCFS{}
+        break
+    case "fair":
+        scheduler = &SchedulerFair{}
+        break
+    case "priority":
+        scheduler = &SchedulerPriority{}
+        break
+    default:
+        scheduler = &SchedulerFCFS{}
+    }
+
     scheduler.Start()

     go func() {
-        start(pool)
+        start(pool, config)
     }()

     flag.Parse()
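
Two asides on the new main(). First, the hand-rolled scan of os.Args for -c (rather than a flag.String like addr) is presumably because flag.Parse() only runs after the configuration is loaded; a sketch of what a flag-based equivalent could look like, with parsing moved to the top (stdlib log used here for brevity):

    package main

    import (
        "flag"
        "log"
        "os"
    )

    var confFile = flag.String("c", "conf/config.json", "configuration file")

    func main() {
        flag.Parse() // must run before *confFile is read
        file, err := os.Open(*confFile)
        if err != nil {
            log.Fatal(err)
        }
        defer file.Close()
        // decode into Configuration exactly as in the hunk above
    }

Second, the break statements in the switch are redundant: Go switch cases do not fall through unless fallthrough is written explicitly. They are harmless, just unnecessary.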
@@ -167,7 +201,7 @@ func main() {
         serverAPI(w, r)
     })

-    err := http.ListenAndServe(*addr, nil)
+    err = http.ListenAndServe(*addr, nil)
     if err != nil {
         log.Fatal("ListenAndServe: ", err)
     }
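
The := in the old line had to become = because err is now already declared earlier in main() by the os.Open call; redeclaring it in the same scope would fail to compile with "no new variables on left side of :=". With all of the above in place, the scheduler picks up its settings like so (binary name hypothetical):

    ./yao-scheduler -c conf/config.json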

View File

@@ -8,7 +8,7 @@ import (
     log "github.com/sirupsen/logrus"
     "math/rand"
     "strconv"
-)
+)

 type ResourcePool struct {
     mu sync.Mutex

View File

@@ -8,6 +8,12 @@ import (
     "net/http"
 )

+type Configuration struct {
+    KafkaBrokers    []string `json:"kafkaBrokers"`
+    KafkaTopic      string   `json:"kafkaTopic"`
+    SchedulerPolicy string   `json:"schedulerPolicy"`
+}
+
 type MsgSubmit struct {
     Code  int    `json:"code"`
     Error string `json:"error"`