1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-06 22:01:55 +00:00

add files

This commit is contained in:
Newnius 2019-03-04 17:19:55 +08:00
parent 5c500fca4e
commit aa2a233485
6 changed files with 216 additions and 0 deletions

12
.gitignore vendored
View File

@ -1,3 +1,15 @@
# IntelliJ IDEA
.idea/
*.iml
# Temp files
*~
*.swp
# MacOS
.DS_Store
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.
##

50
src/collector.go Normal file
View File

@ -0,0 +1,50 @@
package main
import (
"sync"
"github.com/Shopify/sarama"
"encoding/json"
"log"
)
var (
wg sync.WaitGroup
)
func start(pool *ResourcePool) {
consumer, err := sarama.NewConsumer([]string{"kafka_node1:9092", "kafka_node2:9093", "kafka_node3:9094"}, nil)
if err != nil {
panic(err)
}
partitionList, err := consumer.Partitions("yao")
if err != nil {
panic(err)
}
for partition := range partitionList {
pc, err := consumer.ConsumePartition("yao", int32(partition), sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer pc.AsyncClose()
wg.Add(1)
go func(sarama.PartitionConsumer) {
defer wg.Done()
for msg := range pc.Messages() {
var msgAgent MsgAgent
err = json.Unmarshal([]byte(string(msg.Value)), &msgAgent)
if err != nil {
log.Println(err)
continue
}
pool.update(msgAgent)
}
}(pc)
}
wg.Wait()
consumer.Close()
}

27
src/job_manager.go Normal file
View File

@ -0,0 +1,27 @@
package main
import (
"time"
"log"
)
type JobManager struct {
}
func (jm *JobManager) start(id int) {
log.Println("start job ", id)
/* request for resource */
/* bring up containers */
/* monitor job execution */
for {
log.Println("executing job ", id)
time.Sleep(time.Second * 5)
}
/* save logs etc. */
/* return resource */
log.Println("finish job", id)
}

66
src/main.go Normal file
View File

@ -0,0 +1,66 @@
package main
import (
"flag"
"net/http"
"log"
"encoding/json"
)
var addr = flag.String("addr", ":8080", "http service address")
var pool *ResourcePool
func serverAPI(w http.ResponseWriter, r *http.Request) {
nodes := make([]int, 1)
for id := range pool.nodes {
nodes = append(nodes, id)
}
switch r.URL.Query().Get("action") {
case "host_gets":
js, _ := json.Marshal(nodes)
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "resource_get_by_node":
id := str2int(r.URL.Query().Get("id"), -1)
js, _ := json.Marshal(pool.getByID(id))
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "job_submit":
jm := &JobManager{}
id := str2int(r.URL.Query().Get("id"), -1)
go func() {
jm.start(id)
}()
js, _ := json.Marshal(nodes)
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
default:
http.Error(w, "Not Found", http.StatusNotFound)
break
}
}
func main() {
pool = &ResourcePool{}
pool.nodes = make(map[int][]Status)
go func() {
start(pool)
}()
flag.Parse()
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
serverAPI(w, r)
})
err := http.ListenAndServe(*addr, nil)
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}

29
src/resource_pool.go Normal file
View File

@ -0,0 +1,29 @@
package main
import (
"sync"
)
type ResourcePool struct {
mu sync.Mutex
nodes map[int][]Status
}
func (pool *ResourcePool) update(node MsgAgent) {
pool.mu.Lock()
defer pool.mu.Unlock()
pool.nodes[node.ClientID] = node.Status
}
func (pool *ResourcePool) getByID(id int) []Status {
pool.mu.Lock()
defer pool.mu.Unlock()
status, ok := pool.nodes[id]
if ok {
return status
}
return []Status{}
}

32
src/util.go Normal file
View File

@ -0,0 +1,32 @@
package main
import (
"strconv"
)
type Status struct {
UUID string `json:"uuid"`
ProductName string `json:"product_name"`
FanSpeed int `json:"fan_speed"`
PerformanceState string `json:"performance_state"`
MemoryTotal int `json:"emory_total"`
MemoryFree int `json:"memory_free"`
MemoryUsed int `json:"memory_used"`
UtilizationGPU int `json:"utilization_gpu"`
UtilizationMem int `json:"utilization_mem"`
TemperatureGPU int `json:"temperature_gpu"`
PowerDraw int `json:"power_draw"`
}
type MsgAgent struct {
ClientID int `json:"code"`
Status []Status `json:"status"`
}
func str2int(str string, defaultValue int) int {
i, err := strconv.Atoi(str)
if err == nil {
return i
}
return defaultValue
}