Mirror of https://github.com/newnius/YAO-scheduler.git (synced 2025-12-16 00:26:43 +00:00)
update
@@ -8,49 +8,64 @@ import (
 	"time"
 )
 
-var (
-	wg sync.WaitGroup
-)
+var collectorInstance *Collector
+var collectorInstanceLock sync.Mutex
 
-func start(pool *ResourcePool, config Configuration) {
-	consumer, err := sarama.NewConsumer(config.KafkaBrokers, nil)
-	for {
-		if err == nil {
-			break
-		}
-		log.Warn(err)
-		time.Sleep(time.Second * 5)
-		consumer, err = sarama.NewConsumer(config.KafkaBrokers, nil)
-	}
-
-	partitionList, err := consumer.Partitions(config.KafkaTopic)
-	if err != nil {
-		panic(err)
-	}
-
-	for partition := range partitionList {
-		pc, err := consumer.ConsumePartition(config.KafkaTopic, int32(partition), sarama.OffsetNewest)
-		if err != nil {
-			panic(err)
-		}
-		defer pc.AsyncClose()
-
-		wg.Add(1)
-
-		go func(sarama.PartitionConsumer) {
-			defer wg.Done()
-			for msg := range pc.Messages() {
-				var nodeStatus NodeStatus
-				err = json.Unmarshal([]byte(string(msg.Value)), &nodeStatus)
-				if err != nil {
-					log.Warn(err)
-					continue
-				}
-				pool.update(nodeStatus)
-			}
-		}(pc)
-	}
-	wg.Wait()
-	consumer.Close()
-}
+func InstanceOfColector() *Collector {
+	defer collectorInstanceLock.Unlock()
+	collectorInstanceLock.Lock()
+
+	if collectorInstance == nil {
+		collectorInstance = &Collector{}
+	}
+	return collectorInstance
+}
+
+type Collector struct {
+	wg sync.WaitGroup
+}
+
+func (collector *Collector) init(conf Configuration) {
+	go func() {
+		consumer, err := sarama.NewConsumer(conf.KafkaBrokers, nil)
+		for {
+			if err == nil {
+				break
+			}
+			log.Warn(err)
+			time.Sleep(time.Second * 5)
+			consumer, err = sarama.NewConsumer(conf.KafkaBrokers, nil)
+		}
+
+		partitionList, err := consumer.Partitions(conf.KafkaTopic)
+		if err != nil {
+			panic(err)
+		}
+
+		for partition := range partitionList {
+			pc, err := consumer.ConsumePartition(conf.KafkaTopic, int32(partition), sarama.OffsetNewest)
+			if err != nil {
+				panic(err)
+			}
+			defer pc.AsyncClose()
+
+			collector.wg.Add(1)
+
+			go func(sarama.PartitionConsumer) {
+				defer collector.wg.Done()
+				for msg := range pc.Messages() {
+					var nodeStatus NodeStatus
+					err = json.Unmarshal([]byte(string(msg.Value)), &nodeStatus)
+					if err != nil {
+						log.Warn(err)
+						continue
+					}
+					InstanceOfResourcePool().update(nodeStatus)
+				}
+			}(pc)
+		}
+		collector.wg.Wait()
+		consumer.Close()
+	}()
+}
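
The diff replaces the package-level start(pool, config) entry point with a lazily-created Collector singleton: InstanceOfColector() guards construction with a mutex, and init launches the Kafka consumer loop in a background goroutine that feeds each decoded NodeStatus into InstanceOfResourcePool(). The caller side is not part of this commit; the following is only a minimal sketch of how the new API might be wired up, with a hypothetical helper name, broker address, and topic value.

// Hypothetical wiring, not part of this commit: how code in the same package
// might start the refactored collector. Broker and topic values are placeholders.
func startCollectorSketch() {
	conf := Configuration{
		KafkaBrokers: []string{"kafka-node1:9092"}, // assumed broker list (sarama.NewConsumer takes []string)
		KafkaTopic:   "yao-node-status",            // assumed topic carrying NodeStatus JSON
	}

	// InstanceOfColector lazily creates the singleton under collectorInstanceLock;
	// init returns right after spawning the consumer goroutine, which keeps pushing
	// node status updates into InstanceOfResourcePool().update.
	InstanceOfColector().init(conf)
}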