From 0705c0630d16ae1269f34675eebafa1bb6b244da Mon Sep 17 00:00:00 2001 From: Newnius Date: Sun, 24 May 2020 21:07:02 +0800 Subject: [PATCH] update --- src/collector.go | 85 ++++---- src/group.go | 4 + src/history_logger.go | 2 +- src/job_manager.go | 69 ++----- src/main.go | 32 ++- src/optimizer.go | 4 + src/pool_status.go | 27 +++ src/resource_pool.go | 310 ++++++++++++++++++++++++++++- src/scheduler.go | 14 +- src/scheduler_FCFS.go | 112 +---------- src/scheduler_fair.go | 404 ++++---------------------------------- src/scheduler_priority.go | 115 +---------- src/util.go | 151 +------------- 13 files changed, 486 insertions(+), 843 deletions(-) diff --git a/src/collector.go b/src/collector.go index 26d9925..e6ecc4a 100644 --- a/src/collector.go +++ b/src/collector.go @@ -8,49 +8,64 @@ import ( "time" ) -var ( +var collectorInstance *Collector +var collectorInstanceLock sync.Mutex + +func InstanceOfColector() *Collector { + defer collectorInstanceLock.Unlock() + collectorInstanceLock.Lock() + + if collectorInstance == nil { + collectorInstance = &Collector{} + } + return collectorInstance +} + +type Collector struct { wg sync.WaitGroup -) +} -func start(pool *ResourcePool, config Configuration) { - consumer, err := sarama.NewConsumer(config.KafkaBrokers, nil) - for { - if err == nil { - break +func (collector *Collector) init(conf Configuration) { + go func() { + consumer, err := sarama.NewConsumer(conf.KafkaBrokers, nil) + for { + if err == nil { + break + } + log.Warn(err) + time.Sleep(time.Second * 5) + consumer, err = sarama.NewConsumer(conf.KafkaBrokers, nil) } - log.Warn(err) - time.Sleep(time.Second * 5) - consumer, err = sarama.NewConsumer(config.KafkaBrokers, nil) - } - partitionList, err := consumer.Partitions(config.KafkaTopic) - if err != nil { - panic(err) - } - - for partition := range partitionList { - pc, err := consumer.ConsumePartition(config.KafkaTopic, int32(partition), sarama.OffsetNewest) + partitionList, err := consumer.Partitions(conf.KafkaTopic) if err != nil { panic(err) } - defer pc.AsyncClose() - wg.Add(1) - - go func(sarama.PartitionConsumer) { - defer wg.Done() - for msg := range pc.Messages() { - var nodeStatus NodeStatus - err = json.Unmarshal([]byte(string(msg.Value)), &nodeStatus) - if err != nil { - log.Warn(err) - continue - } - pool.update(nodeStatus) + for partition := range partitionList { + pc, err := consumer.ConsumePartition(conf.KafkaTopic, int32(partition), sarama.OffsetNewest) + if err != nil { + panic(err) } + defer pc.AsyncClose() - }(pc) - } - wg.Wait() - consumer.Close() + collector.wg.Add(1) + + go func(sarama.PartitionConsumer) { + defer collector.wg.Done() + for msg := range pc.Messages() { + var nodeStatus NodeStatus + err = json.Unmarshal([]byte(string(msg.Value)), &nodeStatus) + if err != nil { + log.Warn(err) + continue + } + InstanceOfResourcePool().update(nodeStatus) + } + + }(pc) + } + collector.wg.Wait() + consumer.Close() + }() } diff --git a/src/group.go b/src/group.go index 51ff9e3..201356c 100644 --- a/src/group.go +++ b/src/group.go @@ -21,6 +21,10 @@ func InstanceOfGroupManager() *GroupManager { return groupManagerInstance } +func (gm *GroupManager) init(conf Configuration) { + +} + func (gm *GroupManager) Add(group Group) MsgGroupCreate { defer gm.mu.Unlock() gm.mu.Lock() diff --git a/src/history_logger.go b/src/history_logger.go index f4e5fc2..c7c482c 100644 --- a/src/history_logger.go +++ b/src/history_logger.go @@ -28,7 +28,7 @@ func InstanceJobHistoryLogger() *JobHistoryLogger { return jobHistoryLoggerInstance } -func (jhl *JobHistoryLogger) init() { +func (jhl *JobHistoryLogger) init(conf Configuration) { log.Info("jhl init") jhl.jobs = map[string]Job{} jhl.tasks = map[string][]TaskStatus{} diff --git a/src/job_manager.go b/src/job_manager.go index b79ab86..909d4ab 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -25,7 +25,7 @@ func (jm *JobManager) start() { jm.isRunning = false jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}} - jm.network = jm.scheduler.AcquireNetwork() + jm.network = InstanceOfResourcePool().acquireNetwork() InstanceJobHistoryLogger().submitJob(jm.job) @@ -34,55 +34,26 @@ func (jm *JobManager) start() { jm.resources = append(jm.resources, NodeStatus{ClientID: "null"}) } - start := time.Now().Unix() - for i := 0; i < len(jm.job.Tasks); i++ { - var resource NodeStatus - for { - if jm.killedFlag { - break - } - - var tmp []NodeStatus - for _, t := range jm.resources { - if t.ClientID != "null" { - tmp = append(tmp, t) - } - } - resource = jm.scheduler.AcquireResource(jm.job, jm.job.Tasks[i], tmp) - if len(resource.Status) > 0 { - break - } - - if time.Now().Unix()-start > 30 { - log.Info("Wait too long, return all resource and retry") - for _, tt := range jm.resources { - if tt.ClientID != "null" { - jm.scheduler.ReleaseResource(jm.job, tt) - log.Info("return resource ", tt.ClientID) - jm.resources[i].ClientID = "null" - for _, t := range tt.Status { - jm.scheduler.Detach(t.UUID, jm.job) - } - } - } - i = -1 - start = time.Now().Unix() - } - if i == -1 { - break - } - time.Sleep(time.Second * 1) + var nodes []NodeStatus + for { + if jm.killedFlag { + break } - if len(resource.Status) > 0 { - log.Info("Receive resource", resource) - jm.resources[i] = resource - - for _, t := range resource.Status { - jm.scheduler.Attach(t.UUID, jm.job.Name) - } + nodes = jm.scheduler.AcquireResource(jm.job) + if len(nodes) > 0 { + break } - + time.Sleep(time.Second * 1) } + log.Info("Receive resource", nodes) + jm.resources = nodes + + for _, node := range nodes { + for _, t := range node.Status { + InstanceOfResourcePool().attach(t.UUID, jm.job.Name) + } + } + if !jm.killedFlag { jm.scheduler.UpdateProgress(jm.job, Running) jm.isRunning = true @@ -218,7 +189,7 @@ func (jm *JobManager) checkStatus(status []TaskStatus) bool { jm.resources[i].ClientID = "null" for _, t := range jm.resources[i].Status { - jm.scheduler.Detach(t.UUID, jm.job) + InstanceOfResourcePool().detach(t.UUID, jm.job) } InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i]) @@ -233,7 +204,7 @@ func (jm *JobManager) checkStatus(status []TaskStatus) bool { if !flag { jm.isRunning = false - jm.scheduler.ReleaseNetwork(jm.network) + InstanceOfResourcePool().releaseNetwork(jm.network) if !jm.killedFlag { jm.scheduler.UpdateProgress(jm.job, Finished) diff --git a/src/main.go b/src/main.go index 6132ca7..b7c62e1 100644 --- a/src/main.go +++ b/src/main.go @@ -13,21 +13,19 @@ import ( var addr = flag.String("addr", "0.0.0.0:8080", "http service address") var confFile = flag.String("conf", "/etc/yao/config.json", "configuration file path") -var pool *ResourcePool - var scheduler Scheduler func serverAPI(w http.ResponseWriter, r *http.Request) { switch r.URL.Query().Get("action") { case "resource_list": - js, _ := json.Marshal(pool.list()) + js, _ := json.Marshal(InstanceOfResourcePool().list()) w.Header().Set("Content-Type", "application/json") w.Write(js) break case "resource_get_by_node": id := r.URL.Query().Get("id") - js, _ := json.Marshal(pool.getByID(id)) + js, _ := json.Marshal(InstanceOfResourcePool().getByID(id)) w.Header().Set("Content-Type", "application/json") w.Write(js) break @@ -86,21 +84,21 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { case "pool_status_history": log.Debug("pool_status_history") - js, _ := json.Marshal(pool.statusHistory()) + js, _ := json.Marshal(InstanceOfResourcePool().statusHistory()) w.Header().Set("Content-Type", "application/json") w.Write(js) break case "get_counter": log.Debug("get_counters") - js, _ := json.Marshal(pool.getCounter()) + js, _ := json.Marshal(InstanceOfResourcePool().getCounter()) w.Header().Set("Content-Type", "application/json") w.Write(js) break case "get_bindings": log.Debug("get_bindings") - js, _ := json.Marshal(pool.getBindings()) + js, _ := json.Marshal(InstanceOfResourcePool().getBindings()) w.Header().Set("Content-Type", "application/json") w.Write(js) break @@ -199,7 +197,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { if t, err := strconv.ParseFloat(r.URL.Query().Get("ratio"), 32); err == nil { ratio = t } - js, _ := json.Marshal(scheduler.SetShareRatio(ratio)) + js, _ := json.Marshal(InstanceOfResourcePool().SetShareRatio(ratio)) w.Header().Set("Content-Type", "application/json") w.Write(js) break @@ -210,7 +208,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { if t, err := strconv.ParseFloat(r.URL.Query().Get("ratio"), 32); err == nil { ratio = t } - js, _ := json.Marshal(scheduler.SetPreScheduleRatio(ratio)) + js, _ := json.Marshal(InstanceOfResourcePool().SetPreScheduleRatio(ratio)) w.Header().Set("Content-Type", "application/json") w.Write(js) break @@ -292,11 +290,12 @@ func main() { log.Fatal(err) } - /* init jhl */ - InstanceJobHistoryLogger().init() - - pool = &ResourcePool{} - pool.start() + /* init components */ + InstanceOfResourcePool().init(config) + InstanceOfColector().init(config) + InstanceJobHistoryLogger().init(config) + InstanceOfOptimizer().init(config) + InstanceOfGroupManager().init(config) switch config.SchedulerPolicy { case "FCFS": @@ -311,13 +310,8 @@ func main() { default: scheduler = &SchedulerFCFS{} } - scheduler.Start() - go func() { - start(pool, config) - }() - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { serverAPI(w, r) }) diff --git a/src/optimizer.go b/src/optimizer.go index 8751376..d04028f 100644 --- a/src/optimizer.go +++ b/src/optimizer.go @@ -37,6 +37,10 @@ func InstanceOfOptimizer() *Optimizer { return optimizerInstance } +func (optimizer *Optimizer) init(conf Configuration) { + log.Info("optimizer started") +} + func (optimizer *Optimizer) feed(job string, utils []UtilGPUTimeSeries) { log.Info("optimizer feed") //log.Info(job, utils) diff --git a/src/pool_status.go b/src/pool_status.go index d9a8d3d..dafcd2f 100644 --- a/src/pool_status.go +++ b/src/pool_status.go @@ -11,3 +11,30 @@ type PoolStatus struct { TotalMemGPU int `json:"gpu_mem_total"` AvailableMemGPU int `json:"gpu_mem_available"` } + +type GPUStatus struct { + UUID string `json:"uuid"` + ProductName string `json:"product_name"` + PerformanceState string `json:"performance_state"` + MemoryTotal int `json:"memory_total"` + MemoryFree int `json:"memory_free"` + MemoryAllocated int `json:"memory_allocated"` + MemoryUsed int `json:"memory_used"` + UtilizationGPU int `json:"utilization_gpu"` + UtilizationMem int `json:"utilization_mem"` + TemperatureGPU int `json:"temperature_gpu"` + PowerDraw int `json:"power_draw"` +} + +type NodeStatus struct { + ClientID string `json:"id"` + ClientHost string `json:"host"` + Domain string `json:"domain"` + Rack int `json:"rack"` + Version float64 `json:"version"` + NumCPU int `json:"cpu_num"` + UtilCPU float64 `json:"cpu_load"` + MemTotal int `json:"mem_total"` + MemAvailable int `json:"mem_available"` + Status []GPUStatus `json:"status"` +} diff --git a/src/resource_pool.go b/src/resource_pool.go index b1631f1..5008bb6 100644 --- a/src/resource_pool.go +++ b/src/resource_pool.go @@ -12,6 +12,19 @@ import ( "hash/fnv" ) +var resourcePoolInstance *ResourcePool +var resourcePoolInstanceLock sync.Mutex + +func InstanceOfResourcePool() *ResourcePool { + defer resourcePoolInstanceLock.Unlock() + resourcePoolInstanceLock.Lock() + + if resourcePoolInstance == nil { + resourcePoolInstance = &ResourcePool{} + } + return resourcePoolInstance +} + type ResourcePool struct { poolsCount int pools []PoolSeg @@ -39,11 +52,19 @@ type ResourcePool struct { TotalGPU int TotalGPUMu sync.Mutex + UsingGPU int + UsingGPUMu sync.Mutex + subscriptions map[string]map[string]int subscriptionsMu sync.Mutex + + enableShare bool + enableShareRatio float64 + enablePreSchedule bool + enablePreScheduleRatio float64 } -func (pool *ResourcePool) start() { +func (pool *ResourcePool) init(conf Configuration) { log.Info("RM started ") pool.networks = map[string]bool{} @@ -56,6 +77,12 @@ func (pool *ResourcePool) start() { pool.subscriptions = map[string]map[string]int{} pool.TotalGPU = 0 + pool.UsingGPU = 0 + + pool.enableShare = true + pool.enableShareRatio = 0.75 + pool.enablePreSchedule = true + pool.enablePreScheduleRatio = 0.95 /* init pools */ pool.poolsCount = 300 @@ -134,6 +161,7 @@ func (pool *ResourcePool) checkDeadNodes() { } delete(seg.Nodes, v) seg.Lock.Unlock() + delete(pool.heartBeat, v) } pool.heartBeatMu.Unlock() time.Sleep(time.Second * 10) @@ -253,6 +281,7 @@ func (pool *ResourcePool) update(node NodeStatus) { if _, ok := pool.subscriptions[gpu.UUID]; ok { for jobName := range pool.subscriptions[gpu.UUID] { go func(name string) { + /* ask to update job status */ scheduler.QueryState(name) }(jobName) } @@ -431,7 +460,7 @@ func (pool *ResourcePool) acquireNetwork() string { log.Println(err.Error()) continue } - defer resp.Body.Close() + resp.Body.Close() pool.networksFree[network] = true pool.networks[network] = true break @@ -501,6 +530,33 @@ func (pool *ResourcePool) detach(GPU string, job Job) { } } +func (pool *ResourcePool) countGPU() (int, int) { + FreeGPU := 0 + UsingGPU := 0 + start := &pool.pools[0] + if start.Nodes == nil { + start = start.Next + } + for cur := start; ; { + cur.Lock.Lock() + for _, node := range cur.Nodes { + for j := range node.Status { + if node.Status[j].MemoryAllocated == 0 { + FreeGPU++ + } else { + UsingGPU++ + } + } + } + cur.Lock.Unlock() + cur = cur.Next + if cur.ID == start.ID { + break + } + } + return FreeGPU, UsingGPU +} + func (pool *ResourcePool) getBindings() map[string]map[string]int { return pool.bindings } @@ -584,3 +640,253 @@ func (pool *ResourcePool) pickNode(candidates []*NodeStatus, availableGPUs map[s return candidates[0] } + +func (pool *ResourcePool) acquireResource(job Job) []NodeStatus { + if len(job.Tasks) == 0 { + return []NodeStatus{} + } + task := job.Tasks[0] + segID := rand.Intn(pool.poolsCount) + if pool.TotalGPU < 100 { + segID = 0 + } + start := &pool.pools[segID] + if start.Nodes == nil { + start = start.Next + } + + locks := map[int]*sync.Mutex{} + + allocationType := 0 + availableGPUs := map[string][]GPUStatus{} + + var candidates []*NodeStatus + + /* first, choose sharable GPUs */ + if pool.enableShare && (pool.TotalGPU != 0 && len(job.Tasks) == 1 && task.NumberGPU == 1 && float64(pool.UsingGPU)/float64(pool.TotalGPU) >= pool.enableShareRatio) { + // check sharable + allocationType = 1 + if util, valid := InstanceOfOptimizer().predictUtilGPU(job.Name); valid { + + for cur := start; cur.ID < cur.Next.ID; { + if _, ok := locks[cur.ID]; !ok { + cur.Lock.Lock() + locks[cur.ID] = &cur.Lock + } + + for _, node := range cur.Nodes { + var available []GPUStatus + for _, status := range node.Status { + if status.MemoryAllocated > 0 && status.MemoryTotal > task.MemoryGPU+status.MemoryAllocated { + + if jobs, ok := pool.bindings[status.UUID]; ok { + totalUtil := util + for job := range jobs { + if utilT, ok := InstanceOfOptimizer().predictUtilGPU(job); ok { + totalUtil += utilT + } else { + totalUtil += 100 + } + } + if totalUtil < 100 { + available = append(available, status) + availableGPUs[node.ClientID] = available + } + } + } + } + if len(available) >= task.NumberGPU { + candidates = append(candidates, node) + if len(candidates) >= len(job.Tasks)*3+5 { + break + } + } + } + if len(candidates) >= len(job.Tasks)*3+5 { + break + } + cur = cur.Next + if cur.ID == start.ID { + break + } + } + } + //log.Info(candidates) + } + + /* second round, find vacant gpu */ + if len(candidates) == 0 { + allocationType = 2 + for cur := start; cur.ID < cur.Next.ID; { + if _, ok := locks[cur.ID]; !ok { + cur.Lock.Lock() + locks[cur.ID] = &cur.Lock + } + for _, node := range cur.Nodes { + var available []GPUStatus + for _, status := range node.Status { + if status.MemoryAllocated == 0 && status.MemoryUsed < 10 { + available = append(available, status) + } + } + if len(available) >= task.NumberGPU { + candidates = append(candidates, node) + availableGPUs[node.ClientID] = available + if len(candidates) >= len(job.Tasks)*3+5 { + break + } + } + } + if len(candidates) >= len(job.Tasks)*3+5 { + break + } + cur = cur.Next + if cur.ID == start.ID { + break + } + } + //log.Info(candidates) + } + + /* third round, find gpu to be released */ + if len(candidates) == 0 && len(job.Tasks) == 1 && task.NumberGPU == 1 && pool.enablePreSchedule { + estimate, valid := InstanceOfOptimizer().predictTime(job.Name) + + //log.Info(pool.TotalGPU) + //log.Info(estimate, valid) + //log.Info(scheduler.UsingGPU) + + if pool.TotalGPU != 0 && float64(pool.UsingGPU)/float64(pool.TotalGPU) >= pool.enablePreScheduleRatio && valid { + allocationType = 3 + for cur := start; cur.ID < cur.Next.ID; { + if _, ok := locks[cur.ID]; !ok { + cur.Lock.Lock() + locks[cur.ID] = &cur.Lock + } + for _, node := range cur.Nodes { + var available []GPUStatus + for _, status := range node.Status { + bindings := pool.getBindings() + if tasks, ok := bindings[status.UUID]; ok { + if len(tasks) > 1 || status.MemoryAllocated == 0 { + continue + } + for taskT, s := range tasks { + est, valid2 := InstanceOfOptimizer().predictTime(taskT) + if valid2 { + now := (int)(time.Now().Unix()) + log.Info(s, now, estimate, est) + if now-s > est.Total-est.Post-estimate.Pre-15 { + available = append(available, status) + } + } + } + } + } + if len(available) >= task.NumberGPU { + candidates = append(candidates, node) + availableGPUs[node.ClientID] = available + if len(candidates) >= len(job.Tasks)*3+5 { + break + } + } + } + if len(candidates) >= len(job.Tasks)*3+5 { + break + } + } + //log.Info(candidates) + } + } + + if len(candidates) > 0 { + log.Info("allocationType is ", allocationType) + //log.Info(candidates) + } + + /* assign */ + var ress []NodeStatus + if len(candidates) > 0 { + var nodes []NodeStatus + if len(job.Tasks) == 1 { + node := pool.pickNode(candidates, availableGPUs, task, job, []NodeStatus{}) + nodes = append(nodes, *node) + } + + for _, node := range nodes { + res := NodeStatus{} + res.ClientID = node.ClientID + res.ClientHost = node.ClientHost + res.Status = availableGPUs[node.ClientID][0:task.NumberGPU] + res.NumCPU = task.NumberCPU + res.MemTotal = task.Memory + + for i := range res.Status { + for j := range node.Status { + if res.Status[i].UUID == node.Status[j].UUID { + if node.Status[j].MemoryAllocated == 0 { + pool.UsingGPUMu.Lock() + pool.UsingGPU ++ + pool.UsingGPUMu.Unlock() + } + node.Status[j].MemoryAllocated += task.MemoryGPU + res.Status[i].MemoryTotal = task.MemoryGPU + } + } + } + for _, t := range res.Status { + pool.attach(t.UUID, job.Name) + } + ress = append(ress, res) + } + } + + for segID, lock := range locks { + log.Debug("Unlock ", segID) + lock.Unlock() + } + return ress +} + +func (pool *ResourcePool) releaseResource(job Job, agent NodeStatus) { + segID := pool.getNodePool(agent.ClientID) + seg := pool.pools[segID] + if seg.Nodes == nil { + seg = *seg.Next + } + seg.Lock.Lock() + defer seg.Lock.Unlock() + + node := seg.Nodes[agent.ClientID] + for _, gpu := range agent.Status { + for j := range node.Status { + if gpu.UUID == node.Status[j].UUID { + node.Status[j].MemoryAllocated -= gpu.MemoryTotal + if node.Status[j].MemoryAllocated < 0 { + // in case of error + log.Warn(node.ClientID, "More Memory Allocated") + node.Status[j].MemoryAllocated = 0 + } + if node.Status[j].MemoryAllocated == 0 { + pool.UsingGPUMu.Lock() + pool.UsingGPU-- + pool.UsingGPUMu.Unlock() + log.Info(node.Status[j].UUID, " is released") + } + //log.Info(node.Status[j].MemoryAllocated) + } + } + } +} + +func (pool *ResourcePool) SetShareRatio(ratio float64) bool { + pool.enableShareRatio = ratio + log.Info("enableShareRatio is updated to ", ratio) + return true +} + +func (pool *ResourcePool) SetPreScheduleRatio(ratio float64) bool { + pool.enablePreScheduleRatio = ratio + log.Info("enablePreScheduleRatio is updated to ", ratio) + return true +} diff --git a/src/scheduler.go b/src/scheduler.go index 0b8e475..93b4080 100644 --- a/src/scheduler.go +++ b/src/scheduler.go @@ -7,14 +7,10 @@ type Scheduler interface { UpdateProgress(job Job, state State) - AcquireResource(Job, Task, []NodeStatus) NodeStatus + AcquireResource(Job) []NodeStatus ReleaseResource(Job, NodeStatus) - AcquireNetwork() string - - ReleaseNetwork(network string) - QueryState(jobName string) MsgJobStatus QueryLogs(jobName string, taskName string) MsgLog @@ -25,19 +21,11 @@ type Scheduler interface { Summary() MsgSummary - Attach(GPU string, job string) - - Detach(GPU string, job Job) - Enable() bool Disable() bool UpdateParallelism(parallelism int) bool - SetShareRatio(ratio float64) bool - - SetPreScheduleRatio(ratio float64) bool - updateGroup(group Group) bool } diff --git a/src/scheduler_FCFS.go b/src/scheduler_FCFS.go index b4e2214..9d02b96 100644 --- a/src/scheduler_FCFS.go +++ b/src/scheduler_FCFS.go @@ -4,7 +4,6 @@ import ( "sync" "time" log "github.com/sirupsen/logrus" - "math/rand" ) type SchedulerFCFS struct { @@ -87,64 +86,13 @@ func (scheduler *SchedulerFCFS) Schedule(job Job) { job.Status = Created } -func (scheduler *SchedulerFCFS) AcquireResource(job Job, task Task, nodes []NodeStatus) NodeStatus { - segID := rand.Intn(pool.poolsCount) - seg := &pool.pools[segID] - if seg.Nodes == nil { - seg = seg.Next - } - - res := NodeStatus{} - for id, node := range seg.Nodes { - var available []GPUStatus - for _, status := range node.Status { - if status.MemoryTotal-status.MemoryAllocated >= task.MemoryGPU { - available = append(available, status) - } - } - if len(available) >= task.NumberGPU { - res.ClientID = id - res.ClientHost = node.ClientHost - res.Status = available[0:task.NumberGPU] - res.NumCPU = task.NumberCPU - res.MemTotal = task.Memory - - for i := range res.Status { - for j := range node.Status { - if res.Status[i].UUID == node.Status[j].UUID { - node.Status[j].MemoryAllocated += task.MemoryGPU - res.Status[i].MemoryTotal = task.MemoryGPU - } - } - } - break - } - } +func (scheduler *SchedulerFCFS) AcquireResource(job Job) []NodeStatus { + res := InstanceOfResourcePool().acquireResource(job) return res } func (scheduler *SchedulerFCFS) ReleaseResource(job Job, agent NodeStatus) { - segID := pool.getNodePool(agent.ClientID) - seg := &pool.pools[segID] - if seg.Nodes == nil { - seg = seg.Next - } - seg.Lock.Lock() - defer seg.Lock.Unlock() - - node := seg.Nodes[agent.ClientID] - for _, gpu := range agent.Status { - for j := range node.Status { - if gpu.UUID == node.Status[j].UUID { - node.Status[j].MemoryAllocated -= gpu.MemoryTotal - if node.Status[j].MemoryAllocated < 0 { - // in case of error - log.Warn(node.ClientID, "More Memory Allocated") - node.Status[j].MemoryAllocated = 0 - } - } - } - } + InstanceOfResourcePool().releaseResource(job, agent) } func (scheduler *SchedulerFCFS) QueryState(jobName string) MsgJobStatus { @@ -203,7 +151,7 @@ func (scheduler *SchedulerFCFS) Summary() MsgSummary { break case Running: runningJobsCounter++ - break; + break case Finished: finishedJobsCounter++ case Stopped: @@ -214,49 +162,11 @@ func (scheduler *SchedulerFCFS) Summary() MsgSummary { summary.JobsPending = pendingJobsCounter summary.JobsRunning = runningJobsCounter - FreeGPU := 0 - UsingGPU := 0 - - start := pool.pools[0].Next - for cur := start; ; { - cur.Lock.Lock() - for _, node := range cur.Nodes { - for j := range node.Status { - if node.Status[j].MemoryAllocated == 0 { - FreeGPU++ - } else { - UsingGPU++ - } - } - } - cur.Lock.Unlock() - cur = cur.Next - if cur == start { - break - } - } - summary.FreeGPU = FreeGPU - summary.UsingGPU = UsingGPU + summary.FreeGPU, summary.UsingGPU = InstanceOfResourcePool().countGPU() return summary } -func (scheduler *SchedulerFCFS) AcquireNetwork() string { - return pool.acquireNetwork() -} - -func (scheduler *SchedulerFCFS) ReleaseNetwork(network string) { - pool.releaseNetwork(network) -} - -func (scheduler *SchedulerFCFS) Attach(GPU string, job string) { - pool.attach(GPU, job) -} - -func (scheduler *SchedulerFCFS) Detach(GPU string, job Job) { - pool.detach(GPU, job) -} - func (scheduler *SchedulerFCFS) Enable() bool { scheduler.enabled = true return true @@ -273,18 +183,6 @@ func (scheduler *SchedulerFCFS) UpdateParallelism(parallelism int) bool { return true } -func (scheduler *SchedulerFCFS) SetShareRatio(ratio float64) bool { - //scheduler.enableShareRatio = ratio - log.Info("enableShareRatio is updated to", ratio) - return true -} - -func (scheduler *SchedulerFCFS) SetPreScheduleRatio(ratio float64) bool { - //scheduler.enablePreScheduleRatio = ratio - log.Info("enablePreScheduleRatio is updated to", ratio) - return true -} - func (scheduler *SchedulerFCFS) updateGroup(group Group) bool { return true } diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go index 36a4bbd..0948ec6 100644 --- a/src/scheduler_fair.go +++ b/src/scheduler_fair.go @@ -5,16 +5,9 @@ import ( "time" log "github.com/sirupsen/logrus" "sort" - "math/rand" + "math" ) -type ResourceCount struct { - NumberGPU int - MemoryGPU int - CPU int - Memory int -} - type SchedulerFair struct { history []*Job historyMu sync.Mutex @@ -33,14 +26,6 @@ type SchedulerFair struct { enabled bool parallelism int - enableShare bool - enableShareRatio float64 - enablePreSchedule bool - enablePreScheduleRatio float64 - - UsingGPU int - UsingGPUMu sync.Mutex - allocatingGPU int allocatingGPUMu sync.Mutex @@ -77,12 +62,6 @@ func (scheduler *SchedulerFair) Start() { scheduler.schedulingJobsCnt = 0 scheduler.queueUsingGPU = map[string]int{} - scheduler.enableShare = true - scheduler.enableShareRatio = 0.75 - scheduler.enablePreSchedule = true - scheduler.enablePreScheduleRatio = 0.95 - - scheduler.UsingGPU = 0 scheduler.allocatingGPU = 0 scheduler.queuesSchedulingCnt = map[string]int{} @@ -132,8 +111,9 @@ func (scheduler *SchedulerFair) Start() { } scheduler.queuesUsingGPUMu.Unlock() - log.Info(cnt, reserved, pool.TotalGPU, scheduler.UsingGPU, scheduler.allocatingGPU) - if scheduler.schedulingJobsCnt > 1 && (cnt*10+(scheduler.allocatingGPU)*13 > (pool.TotalGPU-scheduler.UsingGPU-reserved)*10) { + pool := InstanceOfResourcePool() + log.Info(cnt, reserved, pool.TotalGPU, pool.UsingGPU, scheduler.allocatingGPU) + if scheduler.schedulingJobsCnt > 1 && (cnt*10+(scheduler.allocatingGPU)*13 > (pool.TotalGPU-pool.UsingGPU-reserved)*10) { scheduler.schedulingMu.Lock() scheduler.schedulingJobsCnt-- scheduler.schedulingMu.Unlock() @@ -203,7 +183,8 @@ func (scheduler *SchedulerFair) Start() { } scheduler.queuesUsingGPUMu.Unlock() - if pool.TotalGPU-scheduler.UsingGPU-scheduler.allocatingGPU*13/10 < 0 { + pool := InstanceOfResourcePool() + if pool.TotalGPU-pool.UsingGPU-scheduler.allocatingGPU*13/10 < 0 { continue } @@ -343,273 +324,46 @@ func (scheduler *SchedulerFair) Schedule(job Job) { job.Status = Created } -func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []NodeStatus) NodeStatus { - segID := rand.Intn(pool.poolsCount) - if pool.TotalGPU < 100 { - segID = 0 - } - res := NodeStatus{} - start := &pool.pools[segID] - if start.Nodes == nil { - start = start.Next - } +func (scheduler *SchedulerFair) AcquireResource(job Job) []NodeStatus { + res := InstanceOfResourcePool().acquireResource(job) - locks := map[int]*sync.Mutex{} + if len(res) != 0 { + for _, task := range job.Tasks { + scheduler.queuesUsingGPUMu.Lock() + scheduler.queueUsingGPU[job.Group] += task.NumberGPU + scheduler.queuesUsingGPUMu.Unlock() - allocationType := 0 - availableGPUs := map[string][]GPUStatus{} - - var candidates []*NodeStatus - - /* first, choose sharable GPUs */ - if scheduler.enableShare && (pool.TotalGPU != 0 && float64(scheduler.UsingGPU)/float64(pool.TotalGPU) >= scheduler.enableShareRatio) { - // check sharable - allocationType = 1 - if util, valid := InstanceOfOptimizer().predictUtilGPU(job.Name); valid { - - for cur := start; cur.ID < cur.Next.ID; { - if _, ok := locks[cur.ID]; !ok { - cur.Lock.Lock() - locks[cur.ID] = &cur.Lock - } - - for _, node := range cur.Nodes { - var available []GPUStatus - for _, status := range node.Status { - if status.MemoryAllocated > 0 && status.MemoryTotal > task.MemoryGPU+status.MemoryAllocated { - - if jobs, ok := pool.bindings[status.UUID]; ok { - totalUtil := util - for job := range jobs { - if utilT, ok := InstanceOfOptimizer().predictUtilGPU(job); ok { - totalUtil += utilT - } else { - totalUtil += 100 - } - } - if totalUtil < 100 { - available = append(available, status) - availableGPUs[node.ClientID] = available - } - } - } - } - if len(available) >= task.NumberGPU { - candidates = append(candidates, node) - if len(candidates) >= 8 { - break - } - } - } - if len(candidates) >= 8 { - break - } - cur = cur.Next - if cur.ID == start.ID { - break - } - } + scheduler.allocatingGPUMu.Lock() + scheduler.allocatingGPU -= task.NumberGPU + scheduler.allocatingGPUMu.Unlock() } - //log.Info(candidates) - } - - /* second round, find vacant gpu */ - flag := false - reserved := scheduler.reservedGPU - scheduler.queuesUsingGPUMu.Lock() - for g, v := range scheduler.queueUsingGPU { - if InstanceOfGroupManager().groups[g].Reserved { - reserved -= v - } - } - scheduler.queuesUsingGPUMu.Unlock() - if g, ok := InstanceOfGroupManager().groups[job.Group]; ok && g.Reserved && g.NumGPU > scheduler.queueUsingGPU[job.Group] { - flag = true - } - if task.NumberGPU <= pool.TotalGPU-scheduler.UsingGPU-reserved { - flag = true - } - if len(candidates) == 0 && flag { - allocationType = 2 - for cur := start; cur.ID < cur.Next.ID; { - if _, ok := locks[cur.ID]; !ok { - cur.Lock.Lock() - locks[cur.ID] = &cur.Lock - } - for _, node := range cur.Nodes { - var available []GPUStatus - for _, status := range node.Status { - if status.MemoryAllocated == 0 && status.MemoryUsed < 10 { - available = append(available, status) - } - } - if len(available) >= task.NumberGPU { - candidates = append(candidates, node) - availableGPUs[node.ClientID] = available - if len(candidates) >= 8 { - break - } - } - } - if len(candidates) >= 8 { - break - } - cur = cur.Next - if cur.ID == start.ID { - break - } - } - //log.Info(candidates) - } - - /* third round, find gpu to be released */ - if len(candidates) == 0 && len(job.Tasks) == 1 && task.NumberGPU == 1 && scheduler.enablePreSchedule { - estimate, valid := InstanceOfOptimizer().predictTime(job.Name) - - //log.Info(pool.TotalGPU) - //log.Info(estimate, valid) - //log.Info(scheduler.UsingGPU) - - if pool.TotalGPU != 0 && float64(scheduler.UsingGPU)/float64(pool.TotalGPU) >= scheduler.enablePreScheduleRatio && valid { - allocationType = 3 - for cur := start; cur.ID < cur.Next.ID; { - if _, ok := locks[cur.ID]; !ok { - cur.Lock.Lock() - locks[cur.ID] = &cur.Lock - } - for _, node := range cur.Nodes { - var available []GPUStatus - for _, status := range node.Status { - bindings := pool.getBindings() - if tasks, ok := bindings[status.UUID]; ok { - if len(tasks) > 1 || status.MemoryAllocated == 0 { - continue - } - for task_t, s := range tasks { - est, valid2 := InstanceOfOptimizer().predictTime(task_t) - if valid2 { - now := (int)(time.Now().Unix()) - log.Info(s, now, estimate, est) - if now-s > est.Total-est.Post-estimate.Pre-15 { - available = append(available, status) - } - } - } - } - } - if len(available) >= task.NumberGPU { - candidates = append(candidates, node) - availableGPUs[node.ClientID] = available - if len(candidates) >= 8 { - break - } - } - } - if len(candidates) >= 8 { - break - } - } - //log.Info(candidates) - } - } - - if len(candidates) > 0 { - log.Info("allocationType is ", allocationType) - //log.Info(candidates) - } - - /* assign */ - if len(candidates) > 0 { - node := pool.pickNode(candidates, availableGPUs, task, job, nodes) - res.ClientID = node.ClientID - res.ClientHost = node.ClientHost - res.Status = availableGPUs[node.ClientID][0:task.NumberGPU] - res.NumCPU = task.NumberCPU - res.MemTotal = task.Memory - - for i := range res.Status { - for j := range node.Status { - if res.Status[i].UUID == node.Status[j].UUID { - if node.Status[j].MemoryAllocated == 0 { - scheduler.UsingGPUMu.Lock() - scheduler.UsingGPU ++ - scheduler.UsingGPUMu.Unlock() - } - node.Status[j].MemoryAllocated += task.MemoryGPU - res.Status[i].MemoryTotal = task.MemoryGPU - } - } - } - for _, t := range res.Status { - scheduler.Attach(t.UUID, job.Name) - } - - scheduler.queuesUsingGPUMu.Lock() - scheduler.queueUsingGPU[job.Group] += task.NumberGPU - scheduler.queuesUsingGPUMu.Unlock() - - scheduler.allocatingGPUMu.Lock() - scheduler.allocatingGPU -= task.NumberGPU - scheduler.allocatingGPUMu.Unlock() log.Info("allocatingGPU is ", scheduler.allocatingGPU) + + go func(nodes []NodeStatus) { + for _, node := range nodes { + scheduler.resourceAllocationsMu.Lock() + if _, ok := scheduler.resourceAllocations[job.Group]; !ok { + scheduler.resourceAllocations[job.Group] = &ResourceCount{} + } + cnt, _ := scheduler.resourceAllocations[job.Group] + cnt.CPU += node.MemTotal + cnt.Memory += node.NumCPU + for _, v := range node.Status { + cnt.NumberGPU ++ + cnt.MemoryGPU += v.MemoryTotal + } + scheduler.resourceAllocationsMu.Unlock() + scheduler.UpdateNextQueue() + } + + }(res) } - for segID, lock := range locks { - log.Debug("Unlock ", segID) - lock.Unlock() - } - - go func(res NodeStatus) { - if len(res.Status) == 0 { - return - } - scheduler.resourceAllocationsMu.Lock() - if _, ok := scheduler.resourceAllocations[job.Group]; !ok { - scheduler.resourceAllocations[job.Group] = &ResourceCount{} - } - cnt, _ := scheduler.resourceAllocations[job.Group] - cnt.CPU += res.MemTotal - cnt.Memory += res.NumCPU - for _, v := range res.Status { - cnt.NumberGPU ++ - cnt.MemoryGPU += v.MemoryTotal - } - scheduler.resourceAllocationsMu.Unlock() - scheduler.UpdateNextQueue() - - }(res) return res } func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) { - segID := pool.getNodePool(agent.ClientID) - seg := pool.pools[segID] - if seg.Nodes == nil { - seg = *seg.Next - } - seg.Lock.Lock() - defer seg.Lock.Unlock() - - node := seg.Nodes[agent.ClientID] - for _, gpu := range agent.Status { - for j := range node.Status { - if gpu.UUID == node.Status[j].UUID { - node.Status[j].MemoryAllocated -= gpu.MemoryTotal - if node.Status[j].MemoryAllocated < 0 { - // in case of error - log.Warn(node.ClientID, "More Memory Allocated") - node.Status[j].MemoryAllocated = 0 - } - if node.Status[j].MemoryAllocated == 0 { - scheduler.UsingGPUMu.Lock() - scheduler.UsingGPU-- - scheduler.UsingGPUMu.Unlock() - log.Info(node.Status[j].UUID, " is released") - } - //log.Info(node.Status[j].MemoryAllocated) - } - } - } + InstanceOfResourcePool().releaseResource(job, agent) scheduler.queuesUsingGPUMu.Lock() if _, ok := scheduler.queueUsingGPU[job.Group]; ok { scheduler.queueUsingGPU[job.Group] -= len(agent.Status) @@ -712,7 +466,7 @@ func (scheduler *SchedulerFair) Summary() MsgSummary { break case Running: runningJobsCounter++ - break; + break case Finished: finishedJobsCounter++ case Stopped: @@ -723,66 +477,15 @@ func (scheduler *SchedulerFair) Summary() MsgSummary { summary.JobsPending = pendingJobsCounter summary.JobsRunning = runningJobsCounter - FreeGPU := 0 - UsingGPU := 0 - - start := pool.pools[0].Next - for cur := start; ; { - cur.Lock.Lock() - for _, node := range cur.Nodes { - for j := range node.Status { - if node.Status[j].MemoryAllocated == 0 { - FreeGPU++ - } else { - UsingGPU++ - } - } - } - cur.Lock.Unlock() - cur = cur.Next - if cur.ID == start.ID { - break - } - } - summary.FreeGPU = FreeGPU - summary.UsingGPU = UsingGPU - + summary.FreeGPU, summary.UsingGPU = InstanceOfResourcePool().countGPU() return summary } -func (scheduler *SchedulerFair) AcquireNetwork() string { - return pool.acquireNetwork() -} - -func (scheduler *SchedulerFair) ReleaseNetwork(network string) { - pool.releaseNetwork(network) -} - func (scheduler *SchedulerFair) UpdateNextQueue() { next := "default" - quota := 9999.0 + quota := math.MaxFloat64 - NumberGPU := 0.00001 - MemoryGPU := 0.00001 - CPU := 0.00001 - Memory := 0.0001 - start := pool.pools[0].Next - for cur := start; ; { - cur.Lock.Lock() - for _, node := range cur.Nodes { - CPU += float64(node.NumCPU) - Memory += float64(node.MemTotal) - for _, card := range node.Status { - NumberGPU += 1.0 - MemoryGPU += float64(card.MemoryTotal) - } - } - cur.Lock.Unlock() - cur = cur.Next - if cur == start { - break - } - } + NumberGPU := float64(InstanceOfResourcePool().TotalGPU) + 0.00001 scheduler.queueMu.Lock() for k, t := range scheduler.queues { @@ -795,13 +498,8 @@ func (scheduler *SchedulerFair) UpdateNextQueue() { } v := scheduler.resourceAllocations[k] - tmp := 0.0 - tmp += float64(v.CPU) / CPU - tmp += float64(v.Memory) / Memory - tmp += float64(v.NumberGPU) / NumberGPU - tmp += float64(v.MemoryGPU) / MemoryGPU + tmp := float64(v.NumberGPU) / NumberGPU scheduler.resourceAllocationsMu.Unlock() - tmp /= 4 weight := 10 if g, ok2 := InstanceOfGroupManager().groups[k]; !ok2 { weight = g.Weight @@ -817,14 +515,6 @@ func (scheduler *SchedulerFair) UpdateNextQueue() { log.Debug("updateNextQueue ->", next) } -func (scheduler *SchedulerFair) Attach(GPU string, job string) { - pool.attach(GPU, job) -} - -func (scheduler *SchedulerFair) Detach(GPU string, job Job) { - pool.detach(GPU, job) -} - func (scheduler *SchedulerFair) Enable() bool { scheduler.enabled = true log.Info("scheduler is enabled ", time.Now()) @@ -843,18 +533,6 @@ func (scheduler *SchedulerFair) UpdateParallelism(parallelism int) bool { return true } -func (scheduler *SchedulerFair) SetShareRatio(ratio float64) bool { - scheduler.enableShareRatio = ratio - log.Info("enableShareRatio is updated to ", ratio) - return true -} - -func (scheduler *SchedulerFair) SetPreScheduleRatio(ratio float64) bool { - scheduler.enablePreScheduleRatio = ratio - log.Info("enablePreScheduleRatio is updated to ", ratio) - return true -} - func (scheduler *SchedulerFair) updateGroup(group Group) bool { num := 0 for _, g := range InstanceOfGroupManager().List().Groups { diff --git a/src/scheduler_priority.go b/src/scheduler_priority.go index 4cbb025..47fb36d 100644 --- a/src/scheduler_priority.go +++ b/src/scheduler_priority.go @@ -4,8 +4,7 @@ import ( "sync" "time" log "github.com/sirupsen/logrus" - "math/rand" -) + ) type SchedulerPriority struct { history []*Job @@ -111,64 +110,13 @@ func (scheduler *SchedulerPriority) Schedule(job Job) { job.Status = Created } -func (scheduler *SchedulerPriority) AcquireResource(job Job, task Task, nodes []NodeStatus) NodeStatus { - segID := rand.Intn(pool.poolsCount) - seg := &pool.pools[segID] - if seg.Nodes == nil { - seg = seg.Next - } - - res := NodeStatus{} - for id, node := range seg.Nodes { - var available []GPUStatus - for _, status := range node.Status { - if status.MemoryTotal-status.MemoryAllocated >= task.MemoryGPU { - available = append(available, status) - } - } - if len(available) >= task.NumberGPU { - res.ClientID = id - res.ClientHost = node.ClientHost - res.Status = available[0:task.NumberGPU] - res.NumCPU = task.NumberCPU - res.MemTotal = task.Memory - - for i := range res.Status { - for j := range node.Status { - if res.Status[i].UUID == node.Status[j].UUID { - node.Status[j].MemoryAllocated += task.MemoryGPU - res.Status[i].MemoryTotal = task.MemoryGPU - } - } - } - break - } - } +func (scheduler *SchedulerPriority) AcquireResource(job Job) []NodeStatus { + res := InstanceOfResourcePool().acquireResource(job) return res } func (scheduler *SchedulerPriority) ReleaseResource(job Job, agent NodeStatus) { - segID := pool.getNodePool(agent.ClientID) - seg := &pool.pools[segID] - if seg.Nodes == nil { - seg = seg.Next - } - seg.Lock.Lock() - defer seg.Lock.Unlock() - - node := seg.Nodes[agent.ClientID] - for _, gpu := range agent.Status { - for j := range node.Status { - if gpu.UUID == node.Status[j].UUID { - node.Status[j].MemoryAllocated -= gpu.MemoryTotal - if node.Status[j].MemoryAllocated < 0 { - // in case of error - log.Warn(node.ClientID, "More Memory Allocated") - node.Status[j].MemoryAllocated = 0 - } - } - } - } + InstanceOfResourcePool().releaseResource(job, agent) } func (scheduler *SchedulerPriority) QueryState(jobName string) MsgJobStatus { @@ -227,7 +175,7 @@ func (scheduler *SchedulerPriority) Summary() MsgSummary { break case Running: runningJobsCounter++ - break; + break case Finished: finishedJobsCounter++ case Stopped: @@ -238,49 +186,10 @@ func (scheduler *SchedulerPriority) Summary() MsgSummary { summary.JobsPending = pendingJobsCounter summary.JobsRunning = runningJobsCounter - FreeGPU := 0 - UsingGPU := 0 - - start := pool.pools[0].Next - for cur := start; ; { - cur.Lock.Lock() - for _, node := range cur.Nodes { - for j := range node.Status { - if node.Status[j].MemoryAllocated == 0 { - FreeGPU++ - } else { - UsingGPU++ - } - } - } - cur.Lock.Unlock() - cur = cur.Next - if cur == start { - break - } - } - summary.FreeGPU = FreeGPU - summary.UsingGPU = UsingGPU - + summary.FreeGPU, summary.UsingGPU = InstanceOfResourcePool().countGPU() return summary } -func (scheduler *SchedulerPriority) AcquireNetwork() string { - return pool.acquireNetwork() -} - -func (scheduler *SchedulerPriority) ReleaseNetwork(network string) { - pool.releaseNetwork(network) -} - -func (scheduler *SchedulerPriority) Attach(GPU string, job string) { - pool.attach(GPU, job) -} - -func (scheduler *SchedulerPriority) Detach(GPU string, job Job) { - pool.detach(GPU, job) -} - func (scheduler *SchedulerPriority) Enable() bool { scheduler.enabled = true return true @@ -297,18 +206,6 @@ func (scheduler *SchedulerPriority) UpdateParallelism(parallelism int) bool { return true } -func (scheduler *SchedulerPriority) SetShareRatio(ratio float64) bool { - //scheduler.enableShareRatio = ratio - log.Info("enableShareRatio is updated to", ratio) - return true -} - -func (scheduler *SchedulerPriority) SetPreScheduleRatio(ratio float64) bool { - //scheduler.enablePreScheduleRatio = ratio - log.Info("enablePreScheduleRatio is updated to", ratio) - return true -} - func (scheduler *SchedulerPriority) updateGroup(group Group) bool { return true } diff --git a/src/util.go b/src/util.go index fb9154d..6387878 100644 --- a/src/util.go +++ b/src/util.go @@ -6,8 +6,7 @@ import ( "time" "io" "net/http" - "sync" -) + ) type Configuration struct { KafkaBrokers []string `json:"kafkaBrokers"` @@ -15,113 +14,6 @@ type Configuration struct { SchedulerPolicy string `json:"schedulerPolicy"` } -type MsgSubmit struct { - Code int `json:"code"` - Error string `json:"error"` -} - -type MsgPoolStatusHistory struct { - Code int `json:"code"` - Error string `json:"error"` - Data []PoolStatus `json:"data"` -} - -type MsgStop struct { - Code int `json:"code"` - Error string `json:"error"` -} - -type MsgSummary struct { - Code int `json:"code"` - Error string `json:"error"` - JobsFinished int `json:"jobs_finished"` - JobsRunning int `json:"jobs_running"` - JobsPending int `json:"jobs_pending"` - FreeGPU int `json:"gpu_free"` - UsingGPU int `json:"gpu_using"` -} - -type MsgResource struct { - Code int `json:"code"` - Error string `json:"error"` - Resource map[string]NodeStatus `json:"resources"` -} - -type MsgJobList struct { - Code int `json:"code"` - Error string `json:"error"` - Jobs []Job `json:"jobs"` -} - -type MsgLog struct { - Code int `json:"code"` - Error string `json:"error"` - Logs string `json:"logs"` -} - -type MsgTaskStatus struct { - Code int `json:"code"` - Error string `json:"error"` - Status TaskStatus `json:"status"` -} - -type MsgJobStatus struct { - Code int `json:"code"` - Error string `json:"error"` - Status []TaskStatus `json:"status"` -} - -type MsgCreate struct { - Code int `json:"code"` - Error string `json:"error"` - Id string `json:"id"` -} - -type TaskStatus struct { - Id string `json:"id"` - HostName string `json:"hostname"` - Node string `json:"node"` - Image string `json:"image"` - ImageDigest string `json:"image_digest"` - Command string `json:"command"` - CreatedAt string `json:"created_at"` - FinishedAt string `json:"finished_at"` - Status string `json:"status"` - State map[string]interface{} `json:"state"` -} - -type JobStatus struct { - Name string - tasks map[string]TaskStatus -} - -type GPUStatus struct { - UUID string `json:"uuid"` - ProductName string `json:"product_name"` - PerformanceState string `json:"performance_state"` - MemoryTotal int `json:"memory_total"` - MemoryFree int `json:"memory_free"` - MemoryAllocated int `json:"memory_allocated"` - MemoryUsed int `json:"memory_used"` - UtilizationGPU int `json:"utilization_gpu"` - UtilizationMem int `json:"utilization_mem"` - TemperatureGPU int `json:"temperature_gpu"` - PowerDraw int `json:"power_draw"` -} - -type NodeStatus struct { - ClientID string `json:"id"` - ClientHost string `json:"host"` - Domain string `json:"domain"` - Rack int `json:"rack"` - Version float64 `json:"version"` - NumCPU int `json:"cpu_num"` - UtilCPU float64 `json:"cpu_load"` - MemTotal int `json:"mem_total"` - MemAvailable int `json:"mem_available"` - Status []GPUStatus `json:"status"` -} - type Job struct { ID int `json:"id"` Name string `json:"name"` @@ -149,27 +41,6 @@ type Task struct { ModelGPU string `json:"gpu_model"` } -type Group struct { - Name string `json:"name"` - Weight int `json:"weight"` - Reserved bool `json:"reserved"` - NumGPU int `json:"quota_gpu"` - MemoryGPU int `json:"quota_gpu_mem"` - CPU int `json:"quota_cpu"` - Memory int `json:"quota_mem"` -} - -type MsgGroupCreate struct { - Code int `json:"code"` - Error string `json:"error"` -} - -type MsgGroupList struct { - Code int `json:"code"` - Error string `json:"error"` - Groups []Group `json:"groups"` -} - type UtilGPUTimeSeries struct { Time int `json:"time"` Util int `json:"util"` @@ -188,21 +59,11 @@ type OptimizerUtilGPU struct { Version int `json:"version"` } -type MsgOptimizerPredict struct { - Code int `json:"code"` - Error string `json:"error"` - Total int `json:"total"` - Pre int `json:"pre"` - Main int `json:"main"` - Post int `json:"post"` -} - -type PoolSeg struct { - ID int - Nodes map[string]*NodeStatus - Lock sync.Mutex - Next *PoolSeg - IsVirtual bool +type ResourceCount struct { + NumberGPU int + MemoryGPU int + CPU int + Memory int } func str2int(str string, defaultValue int) int {