1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-12-13 07:46:43 +00:00
This commit is contained in:
2019-04-16 16:59:19 +08:00
parent e34d75dc7f
commit 1504c71fb4
7 changed files with 45 additions and 41 deletions

View File

@@ -1,10 +1,12 @@
FROM ubuntu:16.04 FROM ubuntu:16.04
RUN apt update && \ RUN apt update && \
apt install -y wget apt install -y wget vim git gcc httpie dnsutils
RUN wget https://dl.google.com/go/go1.12.4.linux-amd64.tar.gz && \ RUN wget https://dl.google.com/go/go1.12.4.linux-amd64.tar.gz && \
tar -C /usr/local -xzf go1.12.4.linux-amd64.tar.gz && \ tar -C /usr/local -xzf go1.12.4.linux-amd64.tar.gz && \
rm go1.12.4.linux-amd64.tar.gz rm go1.12.4.linux-amd64.tar.gz
ENV PATH $PATH:/usr/local/go/bin ENV PATH $PATH:/usr/local/go/bin
RUN go get github.com/Shopify/sarama

View File

@@ -77,26 +77,27 @@ func (allocator *AllocatorFIFO) schedule(job Job) {
allocator.history = append(allocator.history, &job) allocator.history = append(allocator.history, &job)
} }
func (allocator *AllocatorFIFO) requestResource(task Task) MsgAgent { func (allocator *AllocatorFIFO) requestResource(task Task) NodeStatus {
pool.mu.Lock() pool.mu.Lock()
defer pool.mu.Unlock() defer pool.mu.Unlock()
res := MsgAgent{} res := NodeStatus{}
for id, node := range pool.nodes { for id, node := range pool.nodes {
var available []NodeStatus var available []GPUStatus
for _, status := range node { for _, status := range node.Status {
if status.MemoryAllocated == 0 { if status.MemoryAllocated == 0 {
available = append(available, status) available = append(available, status)
} }
} }
if len(available) >= task.NumberGPU { if len(available) >= task.NumberGPU {
res.ClientID = id res.ClientID = id
res.ClientHost = node.ClientHost
res.Status = available[0:task.NumberGPU] res.Status = available[0:task.NumberGPU]
for i := range res.Status { for i := range res.Status {
for j := range node { for j := range node.Status {
if res.Status[i].UUID == node[j].UUID { if res.Status[i].UUID == node.Status[j].UUID {
node[j].MemoryAllocated = task.MemoryGPU node.Status[j].MemoryAllocated = task.MemoryGPU
} }
} }
} }
@@ -105,14 +106,14 @@ func (allocator *AllocatorFIFO) requestResource(task Task) MsgAgent {
return res return res
} }
func (allocator *AllocatorFIFO) returnResource(agent MsgAgent) { func (allocator *AllocatorFIFO) returnResource(agent NodeStatus) {
pool.mu.Lock() pool.mu.Lock()
defer pool.mu.Unlock() defer pool.mu.Unlock()
nodes := pool.nodes[agent.ClientID] nodes := pool.nodes[agent.ClientID]
for _, gpu := range agent.Status { for _, gpu := range agent.Status {
for j := range nodes { for j := range nodes.Status {
if gpu.UUID == nodes[j].UUID { if gpu.UUID == nodes.Status[j].UUID {
nodes[j].MemoryAllocated = 0 nodes.Status[j].MemoryAllocated = 0
} }
} }
} }
@@ -170,8 +171,8 @@ func (allocator *AllocatorFIFO) summary() MsgSummary {
UsingGPU := 0 UsingGPU := 0
for _, node := range pool.nodes { for _, node := range pool.nodes {
for j := range node { for j := range node.Status {
if node[j].MemoryAllocated == 0 { if node.Status[j].MemoryAllocated == 0 {
FreeGPU++ FreeGPU++
} else { } else {
UsingGPU++ UsingGPU++

View File

@@ -5,14 +5,14 @@ import (
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"encoding/json" "encoding/json"
"log" "log"
) )
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
) )
func start(pool *ResourcePool) { func start(pool *ResourcePool) {
consumer, err := sarama.NewConsumer([]string{"kafka:9092"}, nil) consumer, err := sarama.NewConsumer([]string{"kafka-nod21:9092", "kafka-node2:9092", "kafka-node3:9092"}, nil)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -34,13 +34,13 @@ func start(pool *ResourcePool) {
go func(sarama.PartitionConsumer) { go func(sarama.PartitionConsumer) {
defer wg.Done() defer wg.Done()
for msg := range pc.Messages() { for msg := range pc.Messages() {
var msgAgent MsgAgent var nodeStatus NodeStatus
err = json.Unmarshal([]byte(string(msg.Value)), &msgAgent) err = json.Unmarshal([]byte(string(msg.Value)), &nodeStatus)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
continue continue
} }
pool.update(msgAgent) pool.update(nodeStatus)
} }
}(pc) }(pc)

View File

@@ -14,7 +14,7 @@ type JobManager struct {
allocator *AllocatorFIFO allocator *AllocatorFIFO
job Job job Job
jobStatus JobStatus jobStatus JobStatus
resources []MsgAgent resources []NodeStatus
} }
func (jm *JobManager) start() { func (jm *JobManager) start() {
@@ -23,7 +23,7 @@ func (jm *JobManager) start() {
/* request for resources */ /* request for resources */
for i := range jm.job.Tasks { for i := range jm.job.Tasks {
var resource MsgAgent var resource NodeStatus
for { for {
resource = jm.allocator.requestResource(jm.job.Tasks[i]) resource = jm.allocator.requestResource(jm.job.Tasks[i])
if len(resource.Status) > 0 { if len(resource.Status) > 0 {
@@ -51,7 +51,7 @@ func (jm *JobManager) start() {
fmt.Println(v.Encode()) fmt.Println(v.Encode())
resp, err := doRequest("POST", "http://kafka:8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") resp, err := doRequest("POST", "http://"+jm.resources[i].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
if err != nil { if err != nil {
log.Println(err.Error()) log.Println(err.Error())
return return
@@ -73,7 +73,7 @@ func (jm *JobManager) start() {
return return
} }
jm.jobStatus.tasks[jm.job.Tasks[i].Name] = TaskStatus{Id: res.Id} jm.jobStatus.tasks[jm.job.Tasks[i].Name] = TaskStatus{Id: res.Id, Node: jm.resources[i].ClientHost}
} }
jm.allocator.running(&jm.job) jm.allocator.running(&jm.job)
@@ -108,7 +108,7 @@ func (jm *JobManager) start() {
func (jm *JobManager) logs(taskName string) MsgLog { func (jm *JobManager) logs(taskName string) MsgLog {
spider := Spider{} spider := Spider{}
spider.Method = "GET" spider.Method = "GET"
spider.URL = "http://kafka_node1:8000/logs?id=" + taskName spider.URL = "http://" + jm.jobStatus.tasks[taskName].Node + ":8000/logs?id=" + jm.jobStatus.tasks[taskName].Id
err := spider.do() err := spider.do()
if err != nil { if err != nil {
@@ -137,7 +137,7 @@ func (jm *JobManager) status() MsgJobStatus {
for _, taskStatus := range jm.jobStatus.tasks { for _, taskStatus := range jm.jobStatus.tasks {
spider := Spider{} spider := Spider{}
spider.Method = "GET" spider.Method = "GET"
spider.URL = "http://kafka_node1:8000/status?id=" + taskStatus.Id spider.URL = "http://" + taskStatus.Node + ":8000/status?id=" + taskStatus.Id
err := spider.do() err := spider.do()
if err != nil { if err != nil {

View File

@@ -87,7 +87,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
func main() { func main() {
pool = &ResourcePool{} pool = &ResourcePool{}
pool.nodes = make(map[int][]NodeStatus) pool.nodes = make(map[int]NodeStatus)
allocator = &AllocatorFIFO{} allocator = &AllocatorFIFO{}
allocator.start() allocator.start()

View File

@@ -2,32 +2,32 @@ package main
import ( import (
"sync" "sync"
) )
type ResourcePool struct { type ResourcePool struct {
mu sync.Mutex mu sync.Mutex
nodes map[int][]NodeStatus nodes map[int]NodeStatus
} }
func (pool *ResourcePool) update(node MsgAgent) { func (pool *ResourcePool) update(node NodeStatus) {
pool.mu.Lock() pool.mu.Lock()
defer pool.mu.Unlock() defer pool.mu.Unlock()
status, ok := pool.nodes[node.ClientID] status, ok := pool.nodes[node.ClientID]
if ok { if ok {
for i := range status { for i, GPU := range status.Status {
if status[i].UUID == node.Status[i].UUID { if GPU.UUID == node.Status[i].UUID {
node.Status[i].MemoryAllocated = status[i].MemoryAllocated node.Status[i].MemoryAllocated = GPU.MemoryAllocated
} }
} }
} }
pool.nodes[node.ClientID] = node.Status pool.nodes[node.ClientID] = node
//log.Println(pool.nodes) //log.Println(pool.nodes)
} }
func (pool *ResourcePool) getByID(id int) []NodeStatus { func (pool *ResourcePool) getByID(id int) NodeStatus {
pool.mu.Lock() pool.mu.Lock()
defer pool.mu.Unlock() defer pool.mu.Unlock()
@@ -35,5 +35,5 @@ func (pool *ResourcePool) getByID(id int) []NodeStatus {
if ok { if ok {
return status return status
} }
return []NodeStatus{} return NodeStatus{}
} }

View File

@@ -64,6 +64,7 @@ type MsgCreate struct {
type TaskStatus struct { type TaskStatus struct {
Id string `json:"id"` Id string `json:"id"`
Name string `json:"name"` Name string `json:"name"`
Node string `json:"node"`
Image string `json:"image"` Image string `json:"image"`
ImageDigest string `json:"image_digest"` ImageDigest string `json:"image_digest"`
Command string `json:"command"` Command string `json:"command"`
@@ -77,7 +78,7 @@ type JobStatus struct {
tasks map[string]TaskStatus tasks map[string]TaskStatus
} }
type NodeStatus struct { type GPUStatus struct {
UUID string `json:"uuid"` UUID string `json:"uuid"`
ProductName string `json:"product_name"` ProductName string `json:"product_name"`
PerformanceState string `json:"performance_state"` PerformanceState string `json:"performance_state"`
@@ -91,10 +92,10 @@ type NodeStatus struct {
PowerDraw int `json:"power_draw"` PowerDraw int `json:"power_draw"`
} }
type MsgAgent struct { type NodeStatus struct {
ClientID int `json:"code"` ClientID int `json:"code"`
ClientHost string `json:"host"` ClientHost string `json:"host"`
Status []NodeStatus `json:"status"` Status []GPUStatus `json:"status"`
} }
type Job struct { type Job struct {