diff --git a/src/AllocatorParallel.go b/src/AllocatorParallel.go
deleted file mode 100644
index 1c7a1a3..0000000
--- a/src/AllocatorParallel.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package main
-
-type AllocatorParallel struct {
-}
-
-func (allocator *AllocatorParallel) requestResource(task Task) MsgAgent {
-    res := MsgAgent{}
-    for id, node := range pool.nodes {
-        var available []NodeStatus
-        for _, status := range node {
-            if status.MemoryAllocated == 0 {
-                available = append(available, status)
-            }
-        }
-        if len(available) >= task.NumberGPU {
-            res.ClientID = id
-            res.Status = available[0:task.NumberGPU]
-
-            for i := range res.Status {
-                for j := range node {
-                    if res.Status[i].UUID == node[j].UUID {
-                        node[j].MemoryAllocated = task.MemoryGPU
-                    }
-                }
-            }
-        }
-    }
-    return res
-}
-
-func (allocator *AllocatorParallel) returnResource(agent MsgAgent) {
-    nodes := pool.nodes[agent.ClientID]
-    for i, gpu := range agent.Status {
-        if gpu.UUID == nodes[i].UUID {
-            nodes[i].MemoryAllocated = 0
-        }
-    }
-}
diff --git a/src/collector.go b/src/collector.go
index 1b85e3c..e91e243 100644
--- a/src/collector.go
+++ b/src/collector.go
@@ -5,7 +5,8 @@ import (
     "github.com/Shopify/sarama"
     "encoding/json"
     "log"
-    )
+    "fmt"
+)

 var (
     wg sync.WaitGroup
@@ -14,7 +15,9 @@ var (
 func start(pool *ResourcePool) {
     consumer, err := sarama.NewConsumer([]string{"kafka-nod21:9092", "kafka-node2:9092", "kafka-node3:9092"}, nil)
     if err != nil {
-        panic(err)
+        fmt.Println(err)
+        return
+        //panic(err)
     }

     partitionList, err := consumer.Partitions("yao")
diff --git a/src/history_logger.go b/src/history_logger.go
new file mode 100644
index 0000000..c9ecbf5
--- /dev/null
+++ b/src/history_logger.go
@@ -0,0 +1,2 @@
+package main
+
diff --git a/src/main.go b/src/main.go
index 15cf074..63e2a10 100644
--- a/src/main.go
+++ b/src/main.go
@@ -15,14 +15,9 @@ var pool *ResourcePool
 var allocator *AllocatorFIFO

 func serverAPI(w http.ResponseWriter, r *http.Request) {
-    var nodes []string
-    for id := range pool.nodes {
-        nodes = append(nodes, id)
-    }
-
     switch r.URL.Query().Get("action") {
-    case "node_gets":
-        js, _ := json.Marshal(nodes)
+    case "resource_list":
+        js, _ := json.Marshal(pool.list())
         w.Header().Set("Content-Type", "application/json")
         w.Write(js)
         break
@@ -85,6 +80,13 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
         w.Write(js)
         break

+    case "pool_status_history":
+        fmt.Println("pool_status_history")
+        js, _ := json.Marshal(pool.statusHistory())
+        w.Header().Set("Content-Type", "application/json")
+        w.Write(js)
+        break
+
     default:
         http.Error(w, "Not Found", http.StatusNotFound)
         break
@@ -94,6 +96,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
 func main() {
     pool = &ResourcePool{}
     pool.nodes = make(map[string]NodeStatus)
+    pool.start()

     allocator = &AllocatorFIFO{}
     allocator.start()
diff --git a/src/resource_pool.go b/src/resource_pool.go
index daea469..5d7aa85 100644
--- a/src/resource_pool.go
+++ b/src/resource_pool.go
@@ -2,12 +2,69 @@ package main

 import (
     "sync"
-    )
+    "time"
+    "strconv"
+    "fmt"
+)

 type ResourcePool struct {
-    mu sync.Mutex
-
+    mu sync.Mutex
     nodes map[string]NodeStatus
+
+    history []map[string]string
+}
+
+func (pool *ResourcePool) start() {
+    go func() {
+        for {
+            summary := map[string]string{}
+
+            UtilCPU := 0.0
+            TotalCPU := 0
+            TotalMem := 0
+            AvailableMem := 0
+
+            TotalGPU := 0
+            UtilGPU := 0
+            TotalMemGPU := 0
+            AvailableMemGPU := 0
+            for _, node := range pool.nodes {
+                if i, err := strconv.ParseFloat(node.UtilCPU, 64); err == nil {
+                    UtilCPU += i
+                }
+                TotalCPU += node.NumCPU
+                TotalMem += str2int(node.MemTotal, 0)
+                AvailableMem += str2int(node.MemAvailable, 0)
+
+                for _, GPU := range node.Status {
+                    UtilGPU += GPU.UtilizationGPU
+                    TotalGPU++
+                    TotalMemGPU += GPU.MemoryTotal
+                    AvailableMemGPU += GPU.MemoryFree
+                }
+            }
+            summary["ts"] = time.Now().Format("2006-01-02 15:04:05")
+            summary["cpu_util"] = fmt.Sprintf("%.2f", UtilCPU/(float64(len(pool.nodes))+0.001))
+            summary["cpu_total"] = strconv.Itoa(TotalCPU)
+            summary["mem_total"] = strconv.Itoa(TotalMem)
+            summary["mem_available"] = strconv.Itoa(AvailableMem)
+            summary["gpu_total"] = strconv.Itoa(TotalGPU)
+            if TotalGPU == 0 {
+                summary["gpu_util"] = "0"
+            } else {
+                summary["gpu_util"] = fmt.Sprintf("%2d", UtilGPU/TotalGPU)
+            }
+            summary["gpu_mem_total"] = strconv.Itoa(TotalMemGPU)
+            summary["gpu_mem_available"] = strconv.Itoa(AvailableMemGPU)
+
+            pool.history = append(pool.history, summary)
+
+            if len(pool.history) > 60 {
+                pool.history = pool.history[len(pool.history)-60:]
+            }
+            time.Sleep(time.Second * 60)
+        }
+    }()
 }

 func (pool *ResourcePool) update(node NodeStatus) {
@@ -37,3 +94,11 @@
     }
     return NodeStatus{}
 }
+
+func (pool *ResourcePool) list() MsgResource {
+    return MsgResource{Code: 0, Resource: pool.nodes}
+}
+
+func (pool *ResourcePool) statusHistory() MsgPoolStatusHistory {
+    return MsgPoolStatusHistory{Code: 0, Data: pool.history}
+}
diff --git a/src/util.go b/src/util.go
index e57f1c0..64a6e8d 100644
--- a/src/util.go
+++ b/src/util.go
@@ -21,6 +21,12 @@ type MsgSubmit struct {
     Error string `json:"error"`
 }

+type MsgPoolStatusHistory struct {
+    Code int `json:"code"`
+    Error string `json:"error"`
+    Data []map[string]string `json:"data"`
+}
+
 type MsgStop struct {
     Code int `json:"code"`
     Error string `json:"error"`
@@ -36,6 +42,12 @@ type MsgSummary struct {
     UsingGPU int `json:"gpu_using"`
 }

+type MsgResource struct {
+    Code int `json:"code"`
+    Error string `json:"error"`
+    Resource map[string]NodeStatus `json:"resources"`
+}
+
 type MsgJobList struct {
     Code int `json:"code"`
     Error string `json:"error"`
@@ -98,9 +110,13 @@ type GPUStatus struct {
 }

 type NodeStatus struct {
-    ClientID   string      `json:"id"`
-    ClientHost string      `json:"host"`
-    Status     []GPUStatus `json:"status"`
+    ClientID     string      `json:"id"`
+    ClientHost   string      `json:"host"`
+    NumCPU       int         `json:"cpu_num"`
+    UtilCPU      string      `json:"cpu_load"`
+    MemTotal     string      `json:"mem_total"`
+    MemAvailable string      `json:"mem_available"`
+    Status       []GPUStatus `json:"status"`
 }

 type Job struct {
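As a quick sanity check, the two new serverAPI actions introduced above (resource_list and pool_status_history) can be exercised with a small client. The sketch below is illustrative only, a minimal sketch assuming the server listens on localhost:8080 at the root path; this diff does not show where serverAPI is actually registered, so adjust the base URL accordingly.

    package main

    import (
        "fmt"
        "io/ioutil"
        "net/http"
    )

    func main() {
        // Assumed base URL; the diff does not show how or where serverAPI is mounted.
        base := "http://localhost:8080/?action="
        for _, action := range []string{"resource_list", "pool_status_history"} {
            resp, err := http.Get(base + action)
            if err != nil {
                fmt.Println(action, "request failed:", err)
                continue
            }
            body, _ := ioutil.ReadAll(resp.Body)
            resp.Body.Close()
            // resource_list serializes MsgResource; pool_status_history serializes MsgPoolStatusHistory.
            fmt.Printf("%s -> %s\n", action, body)
        }
    }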