From 1504c71fb48a499291c936b8d02cb48d708ec8c8 Mon Sep 17 00:00:00 2001 From: Newnius Date: Tue, 16 Apr 2019 16:59:19 +0800 Subject: [PATCH] update --- Dockerfile => Dockerfile.dev | 6 ++++-- src/AllocatorFIFO.go | 27 ++++++++++++++------------- src/collector.go | 10 +++++----- src/job_manager.go | 12 ++++++------ src/main.go | 2 +- src/resource_pool.go | 18 +++++++++--------- src/util.go | 11 ++++++----- 7 files changed, 45 insertions(+), 41 deletions(-) rename Dockerfile => Dockerfile.dev (62%) diff --git a/Dockerfile b/Dockerfile.dev similarity index 62% rename from Dockerfile rename to Dockerfile.dev index f0531f6..58884e9 100644 --- a/Dockerfile +++ b/Dockerfile.dev @@ -1,10 +1,12 @@ FROM ubuntu:16.04 RUN apt update && \ - apt install -y wget + apt install -y wget vim git gcc httpie dnsutils RUN wget https://dl.google.com/go/go1.12.4.linux-amd64.tar.gz && \ tar -C /usr/local -xzf go1.12.4.linux-amd64.tar.gz && \ rm go1.12.4.linux-amd64.tar.gz -ENV PATH $PATH:/usr/local/go/bin \ No newline at end of file +ENV PATH $PATH:/usr/local/go/bin + +RUN go get github.com/Shopify/sarama \ No newline at end of file diff --git a/src/AllocatorFIFO.go b/src/AllocatorFIFO.go index 287d6e7..42b2fe7 100644 --- a/src/AllocatorFIFO.go +++ b/src/AllocatorFIFO.go @@ -77,26 +77,27 @@ func (allocator *AllocatorFIFO) schedule(job Job) { allocator.history = append(allocator.history, &job) } -func (allocator *AllocatorFIFO) requestResource(task Task) MsgAgent { +func (allocator *AllocatorFIFO) requestResource(task Task) NodeStatus { pool.mu.Lock() defer pool.mu.Unlock() - res := MsgAgent{} + res := NodeStatus{} for id, node := range pool.nodes { - var available []NodeStatus - for _, status := range node { + var available []GPUStatus + for _, status := range node.Status { if status.MemoryAllocated == 0 { available = append(available, status) } } if len(available) >= task.NumberGPU { res.ClientID = id + res.ClientHost = node.ClientHost res.Status = available[0:task.NumberGPU] for i := range res.Status { - for j := range node { - if res.Status[i].UUID == node[j].UUID { - node[j].MemoryAllocated = task.MemoryGPU + for j := range node.Status { + if res.Status[i].UUID == node.Status[j].UUID { + node.Status[j].MemoryAllocated = task.MemoryGPU } } } @@ -105,14 +106,14 @@ func (allocator *AllocatorFIFO) requestResource(task Task) MsgAgent { return res } -func (allocator *AllocatorFIFO) returnResource(agent MsgAgent) { +func (allocator *AllocatorFIFO) returnResource(agent NodeStatus) { pool.mu.Lock() defer pool.mu.Unlock() nodes := pool.nodes[agent.ClientID] for _, gpu := range agent.Status { - for j := range nodes { - if gpu.UUID == nodes[j].UUID { - nodes[j].MemoryAllocated = 0 + for j := range nodes.Status { + if gpu.UUID == nodes.Status[j].UUID { + nodes.Status[j].MemoryAllocated = 0 } } } @@ -170,8 +171,8 @@ func (allocator *AllocatorFIFO) summary() MsgSummary { UsingGPU := 0 for _, node := range pool.nodes { - for j := range node { - if node[j].MemoryAllocated == 0 { + for j := range node.Status { + if node.Status[j].MemoryAllocated == 0 { FreeGPU++ } else { UsingGPU++ diff --git a/src/collector.go b/src/collector.go index 4402cea..1b85e3c 100644 --- a/src/collector.go +++ b/src/collector.go @@ -5,14 +5,14 @@ import ( "github.com/Shopify/sarama" "encoding/json" "log" -) + ) var ( wg sync.WaitGroup ) func start(pool *ResourcePool) { - consumer, err := sarama.NewConsumer([]string{"kafka:9092"}, nil) + consumer, err := sarama.NewConsumer([]string{"kafka-nod21:9092", "kafka-node2:9092", "kafka-node3:9092"}, nil) if err != nil { panic(err) } @@ -34,13 +34,13 @@ func start(pool *ResourcePool) { go func(sarama.PartitionConsumer) { defer wg.Done() for msg := range pc.Messages() { - var msgAgent MsgAgent - err = json.Unmarshal([]byte(string(msg.Value)), &msgAgent) + var nodeStatus NodeStatus + err = json.Unmarshal([]byte(string(msg.Value)), &nodeStatus) if err != nil { log.Println(err) continue } - pool.update(msgAgent) + pool.update(nodeStatus) } }(pc) diff --git a/src/job_manager.go b/src/job_manager.go index c6da671..9edca8f 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -14,7 +14,7 @@ type JobManager struct { allocator *AllocatorFIFO job Job jobStatus JobStatus - resources []MsgAgent + resources []NodeStatus } func (jm *JobManager) start() { @@ -23,7 +23,7 @@ func (jm *JobManager) start() { /* request for resources */ for i := range jm.job.Tasks { - var resource MsgAgent + var resource NodeStatus for { resource = jm.allocator.requestResource(jm.job.Tasks[i]) if len(resource.Status) > 0 { @@ -51,7 +51,7 @@ func (jm *JobManager) start() { fmt.Println(v.Encode()) - resp, err := doRequest("POST", "http://kafka:8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") + resp, err := doRequest("POST", "http://"+jm.resources[i].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") if err != nil { log.Println(err.Error()) return @@ -73,7 +73,7 @@ func (jm *JobManager) start() { return } - jm.jobStatus.tasks[jm.job.Tasks[i].Name] = TaskStatus{Id: res.Id} + jm.jobStatus.tasks[jm.job.Tasks[i].Name] = TaskStatus{Id: res.Id, Node: jm.resources[i].ClientHost} } jm.allocator.running(&jm.job) @@ -108,7 +108,7 @@ func (jm *JobManager) start() { func (jm *JobManager) logs(taskName string) MsgLog { spider := Spider{} spider.Method = "GET" - spider.URL = "http://kafka_node1:8000/logs?id=" + taskName + spider.URL = "http://" + jm.jobStatus.tasks[taskName].Node + ":8000/logs?id=" + jm.jobStatus.tasks[taskName].Id err := spider.do() if err != nil { @@ -137,7 +137,7 @@ func (jm *JobManager) status() MsgJobStatus { for _, taskStatus := range jm.jobStatus.tasks { spider := Spider{} spider.Method = "GET" - spider.URL = "http://kafka_node1:8000/status?id=" + taskStatus.Id + spider.URL = "http://" + taskStatus.Node + ":8000/status?id=" + taskStatus.Id err := spider.do() if err != nil { diff --git a/src/main.go b/src/main.go index b2c27e5..9d77725 100644 --- a/src/main.go +++ b/src/main.go @@ -87,7 +87,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { func main() { pool = &ResourcePool{} - pool.nodes = make(map[int][]NodeStatus) + pool.nodes = make(map[int]NodeStatus) allocator = &AllocatorFIFO{} allocator.start() diff --git a/src/resource_pool.go b/src/resource_pool.go index 1e9e6b7..3fde85b 100644 --- a/src/resource_pool.go +++ b/src/resource_pool.go @@ -2,32 +2,32 @@ package main import ( "sync" - ) +) type ResourcePool struct { mu sync.Mutex - nodes map[int][]NodeStatus + nodes map[int]NodeStatus } -func (pool *ResourcePool) update(node MsgAgent) { +func (pool *ResourcePool) update(node NodeStatus) { pool.mu.Lock() defer pool.mu.Unlock() status, ok := pool.nodes[node.ClientID] if ok { - for i := range status { - if status[i].UUID == node.Status[i].UUID { - node.Status[i].MemoryAllocated = status[i].MemoryAllocated + for i, GPU := range status.Status { + if GPU.UUID == node.Status[i].UUID { + node.Status[i].MemoryAllocated = GPU.MemoryAllocated } } } - pool.nodes[node.ClientID] = node.Status + pool.nodes[node.ClientID] = node //log.Println(pool.nodes) } -func (pool *ResourcePool) getByID(id int) []NodeStatus { +func (pool *ResourcePool) getByID(id int) NodeStatus { pool.mu.Lock() defer pool.mu.Unlock() @@ -35,5 +35,5 @@ func (pool *ResourcePool) getByID(id int) []NodeStatus { if ok { return status } - return []NodeStatus{} + return NodeStatus{} } diff --git a/src/util.go b/src/util.go index f29f9bf..ce785b8 100644 --- a/src/util.go +++ b/src/util.go @@ -64,6 +64,7 @@ type MsgCreate struct { type TaskStatus struct { Id string `json:"id"` Name string `json:"name"` + Node string `json:"node"` Image string `json:"image"` ImageDigest string `json:"image_digest"` Command string `json:"command"` @@ -77,7 +78,7 @@ type JobStatus struct { tasks map[string]TaskStatus } -type NodeStatus struct { +type GPUStatus struct { UUID string `json:"uuid"` ProductName string `json:"product_name"` PerformanceState string `json:"performance_state"` @@ -91,10 +92,10 @@ type NodeStatus struct { PowerDraw int `json:"power_draw"` } -type MsgAgent struct { - ClientID int `json:"code"` - ClientHost string `json:"host"` - Status []NodeStatus `json:"status"` +type NodeStatus struct { + ClientID int `json:"code"` + ClientHost string `json:"host"` + Status []GPUStatus `json:"status"` } type Job struct {