1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-07 22:31:55 +00:00
This commit is contained in:
Newnius 2019-04-29 17:05:15 +08:00
parent 549e559a73
commit 00c04333ad
6 changed files with 104 additions and 53 deletions

View File

@ -1,38 +0,0 @@
package main
type AllocatorParallel struct {
}
func (allocator *AllocatorParallel) requestResource(task Task) MsgAgent {
res := MsgAgent{}
for id, node := range pool.nodes {
var available []NodeStatus
for _, status := range node {
if status.MemoryAllocated == 0 {
available = append(available, status)
}
}
if len(available) >= task.NumberGPU {
res.ClientID = id
res.Status = available[0:task.NumberGPU]
for i := range res.Status {
for j := range node {
if res.Status[i].UUID == node[j].UUID {
node[j].MemoryAllocated = task.MemoryGPU
}
}
}
}
}
return res
}
func (allocator *AllocatorParallel) returnResource(agent MsgAgent) {
nodes := pool.nodes[agent.ClientID]
for i, gpu := range agent.Status {
if gpu.UUID == nodes[i].UUID {
nodes[i].MemoryAllocated = 0
}
}
}

View File

@ -5,7 +5,8 @@ import (
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"encoding/json" "encoding/json"
"log" "log"
) "fmt"
)
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
@ -14,7 +15,9 @@ var (
func start(pool *ResourcePool) { func start(pool *ResourcePool) {
consumer, err := sarama.NewConsumer([]string{"kafka-nod21:9092", "kafka-node2:9092", "kafka-node3:9092"}, nil) consumer, err := sarama.NewConsumer([]string{"kafka-nod21:9092", "kafka-node2:9092", "kafka-node3:9092"}, nil)
if err != nil { if err != nil {
panic(err) fmt.Println(err)
return
//panic(err)
} }
partitionList, err := consumer.Partitions("yao") partitionList, err := consumer.Partitions("yao")

2
src/history_logger.go Normal file
View File

@ -0,0 +1,2 @@
package main

View File

@ -15,14 +15,9 @@ var pool *ResourcePool
var allocator *AllocatorFIFO var allocator *AllocatorFIFO
func serverAPI(w http.ResponseWriter, r *http.Request) { func serverAPI(w http.ResponseWriter, r *http.Request) {
var nodes []string
for id := range pool.nodes {
nodes = append(nodes, id)
}
switch r.URL.Query().Get("action") { switch r.URL.Query().Get("action") {
case "node_gets": case "resource_list":
js, _ := json.Marshal(nodes) js, _ := json.Marshal(pool.list())
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.Write(js) w.Write(js)
break break
@ -85,6 +80,13 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
w.Write(js) w.Write(js)
break break
case "pool_status_history":
fmt.Println("pool_status_history")
js, _ := json.Marshal(pool.statusHistory())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
default: default:
http.Error(w, "Not Found", http.StatusNotFound) http.Error(w, "Not Found", http.StatusNotFound)
break break
@ -94,6 +96,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
func main() { func main() {
pool = &ResourcePool{} pool = &ResourcePool{}
pool.nodes = make(map[string]NodeStatus) pool.nodes = make(map[string]NodeStatus)
pool.start()
allocator = &AllocatorFIFO{} allocator = &AllocatorFIFO{}
allocator.start() allocator.start()

View File

@ -2,12 +2,69 @@ package main
import ( import (
"sync" "sync"
) "time"
"strconv"
"fmt"
)
type ResourcePool struct { type ResourcePool struct {
mu sync.Mutex mu sync.Mutex
nodes map[string]NodeStatus nodes map[string]NodeStatus
history []map[string]string
}
func (pool *ResourcePool) start() {
go func() {
for {
summary := map[string]string{}
UtilCPU := 0.0
TotalCPU := 0
TotalMem := 0
AvailableMem := 0
TotalGPU := 0
UtilGPU := 0
TotalMemGPU := 0
AvailableMemGPU := 0
for _, node := range pool.nodes {
if i, err := strconv.ParseFloat(node.UtilCPU, 64); err != nil {
UtilCPU += i
}
TotalCPU += node.NumCPU
TotalMem += str2int(node.MemTotal, 0)
AvailableMem += str2int(node.MemAvailable, 0)
for _, GPU := range node.Status {
UtilGPU += GPU.UtilizationGPU
TotalGPU ++
TotalMemGPU += GPU.MemoryTotal
AvailableMemGPU += GPU.MemoryFree
}
}
summary["ts"] = time.Now().Format("2006-01-02 15:04:05")
summary["cpu_util"] = fmt.Sprintf("%.2f", UtilCPU/(float64(len(pool.nodes))+0.001))
summary["cpu_total"] = strconv.Itoa(TotalCPU)
summary["mem_total"] = strconv.Itoa(TotalMem)
summary["mem_available"] = strconv.Itoa(AvailableMem)
summary["gpu_total"] = strconv.Itoa(TotalGPU)
if TotalGPU == 0 {
summary["gpu_util"] = "0"
} else {
summary["gpu_util"] = fmt.Sprintf("%2d", UtilGPU/TotalGPU)
}
summary["gpu_mem_total"] = strconv.Itoa(TotalMemGPU)
summary["gpu_mem_available"] = strconv.Itoa(AvailableMemGPU)
pool.history = append(pool.history, summary)
if len(pool.history) > 60 {
pool.history = pool.history[0:60]
}
time.Sleep(time.Second * 60)
}
}()
} }
func (pool *ResourcePool) update(node NodeStatus) { func (pool *ResourcePool) update(node NodeStatus) {
@ -37,3 +94,11 @@ func (pool *ResourcePool) getByID(id string) NodeStatus {
} }
return NodeStatus{} return NodeStatus{}
} }
func (pool *ResourcePool) list() MsgResource {
return MsgResource{Code: 0, Resource: pool.nodes}
}
func (pool *ResourcePool) statusHistory() MsgPoolStatusHistory {
return MsgPoolStatusHistory{Code: 0, Data: pool.history}
}

View File

@ -21,6 +21,12 @@ type MsgSubmit struct {
Error string `json:"error"` Error string `json:"error"`
} }
type MsgPoolStatusHistory struct {
Code int `json:"code"`
Error string `json:"error"`
Data []map[string]string `json:"data"`
}
type MsgStop struct { type MsgStop struct {
Code int `json:"code"` Code int `json:"code"`
Error string `json:"error"` Error string `json:"error"`
@ -36,6 +42,12 @@ type MsgSummary struct {
UsingGPU int `json:"gpu_using"` UsingGPU int `json:"gpu_using"`
} }
type MsgResource struct {
Code int `json:"code"`
Error string `json:"error"`
Resource map[string]NodeStatus `json:"resources"`
}
type MsgJobList struct { type MsgJobList struct {
Code int `json:"code"` Code int `json:"code"`
Error string `json:"error"` Error string `json:"error"`
@ -98,9 +110,13 @@ type GPUStatus struct {
} }
type NodeStatus struct { type NodeStatus struct {
ClientID string `json:"id"` ClientID string `json:"id"`
ClientHost string `json:"host"` ClientHost string `json:"host"`
Status []GPUStatus `json:"status"` NumCPU int `json:"cpu_num"`
UtilCPU string `json:"cpu_load"`
MemTotal string `json:"mem_total"`
MemAvailable string `json:"mem_available"`
Status []GPUStatus `json:"status"`
} }
type Job struct { type Job struct {