From 014592fa43b1078503f910be278e1784fcf3f79f Mon Sep 17 00:00:00 2001
From: Newnius
Date: Thu, 30 Apr 2020 21:22:21 +0800
Subject: [PATCH] update

Extend Scheduler.AcquireResource to take a list of NodeStatus, have
SchedulerFair pick the target node through the new ResourcePool.pickNode
helper (shuffle the candidates, then take the first), accept both short
and "Tesla"-prefixed GPU model names in GPUModelToPower, protect
SchedulerFair.UsingGPU with a new UsingGPUMu mutex, and adjust log
message spacing.

---
 src/job_manager.go        |  2 +-
 src/resource_pool.go      | 20 +++++++++++++++++++-
 src/scheduler.go          |  2 +-
 src/scheduler_FCFS.go     |  4 ++--
 src/scheduler_fair.go     | 34 ++++++++++++++++++++++------------
 src/scheduler_priority.go |  4 ++--
 6 files changed, 47 insertions(+), 19 deletions(-)

diff --git a/src/job_manager.go b/src/job_manager.go
index 86558ec..c91a418 100644
--- a/src/job_manager.go
+++ b/src/job_manager.go
@@ -33,7 +33,7 @@ func (jm *JobManager) start() {
 		if jm.killedFlag {
 			break
 		}
-		resource = jm.scheduler.AcquireResource(jm.job, jm.job.Tasks[i])
+		resource = jm.scheduler.AcquireResource(jm.job, jm.job.Tasks[i], jm.resources)
 		if len(resource.Status) > 0 {
 			break
 		}
diff --git a/src/resource_pool.go b/src/resource_pool.go
index fd3638f..66c1d0c 100644
--- a/src/resource_pool.go
+++ b/src/resource_pool.go
@@ -40,7 +40,11 @@ type ResourcePool struct {
 }
 
 func (pool *ResourcePool) GPUModelToPower(model string) int {
-	mapper := map[string]int{"k40": 1, "K80": 2, "P100": 3}
+	mapper := map[string]int{
+		"K40": 1, "Tesla K40": 1,
+		"K80": 2, "Tesla K80": 2,
+		"P100": 3, "Tesla P100": 3,
+	}
 	if power, err := mapper[model]; !err {
 		return power
 	}
@@ -297,3 +301,17 @@ func (pool *ResourcePool) detach(GPU string, jobName string) {
 func (pool *ResourcePool) getBindings() map[string]map[string]int {
 	return pool.bindings
 }
+
+func (pool *ResourcePool) pickNode(nodes []*NodeStatus) *NodeStatus {
+
+	/* shuffle */
+	r := rand.New(rand.NewSource(time.Now().Unix()))
+	for n := len(nodes); n > 0; n-- {
+		randIndex := r.Intn(n)
+		nodes[n-1], nodes[randIndex] = nodes[randIndex], nodes[n-1]
+	}
+
+	/* sort */
+
+	return nodes[0]
+}
diff --git a/src/scheduler.go b/src/scheduler.go
index 51b7126..3cd1ffc 100644
--- a/src/scheduler.go
+++ b/src/scheduler.go
@@ -7,7 +7,7 @@ type Scheduler interface {
 
 	UpdateProgress(jobName string, state State)
 
-	AcquireResource(Job, Task) NodeStatus
+	AcquireResource(Job, Task, []NodeStatus) NodeStatus
 
 	ReleaseResource(Job, NodeStatus)
 
diff --git a/src/scheduler_FCFS.go b/src/scheduler_FCFS.go
index fc8296e..bf5c892 100644
--- a/src/scheduler_FCFS.go
+++ b/src/scheduler_FCFS.go
@@ -87,7 +87,7 @@ func (scheduler *SchedulerFCFS) Schedule(job Job) {
 	job.Status = Created
 }
 
-func (scheduler *SchedulerFCFS) AcquireResource(job Job, task Task) NodeStatus {
+func (scheduler *SchedulerFCFS) AcquireResource(job Job, task Task, nodes []NodeStatus) NodeStatus {
 	poolID := rand.Intn(pool.poolsCount)
 	pool.poolsMu[poolID].Lock()
 	defer pool.poolsMu[poolID].Unlock()
@@ -272,4 +272,4 @@ func (scheduler *SchedulerFCFS) SetPreScheduleRatio(ratio float64) bool {
 	//scheduler.enablePreScheduleRatio = ratio
 	log.Info("enablePreScheduleRatio is updated to", ratio)
 	return true
-}
\ No newline at end of file
+}
diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go
index a3e771b..16cb5fb 100644
--- a/src/scheduler_fair.go
+++ b/src/scheduler_fair.go
@@ -35,6 +35,7 @@ type SchedulerFair struct {
 	enablePreScheduleRatio float64
 
 	UsingGPU int
+	UsingGPUMu sync.Mutex
 }
 
 type FairJobSorter []Job
@@ -102,7 +103,7 @@ func (scheduler *SchedulerFair) Start() {
 					jm.start()
 				}()
 			} else {
-				log.Debug("No more jobs to scheduling", time.Now())
+				log.Debug("No more jobs to scheduling ", time.Now())
 				scheduler.schedulingMu.Lock()
 				scheduler.schedulingJobsCnt--
 				scheduler.schedulingMu.Unlock()
 			}
@@ -193,7 +194,7 @@ func (scheduler *SchedulerFair) Schedule(job Job) {
 	job.Status = Created
 }
 
-func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
+func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []NodeStatus) NodeStatus {
 	poolID := rand.Intn(pool.poolsCount)
 
 	res := NodeStatus{}
@@ -202,7 +203,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 	allocationType := 0
 	availableGPUs := map[string][]GPUStatus{}
 
-	var candidates []NodeStatus
+	var candidates []*NodeStatus
 
 	/* first, choose sharable GPUs */
 	if scheduler.enableShare && (pool.TotalGPU != 0 && float64(scheduler.UsingGPU)/float64(pool.TotalGPU) >= scheduler.enableShareRatio) {
@@ -236,7 +237,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 				}
 			}
 			if len(available) >= task.NumberGPU {
-				candidates = append(candidates, node)
+				candidates = append(candidates, &node)
 				if len(candidates) >= 8 {
 					break
 				}
@@ -266,7 +267,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 				}
 			}
 			if len(available) >= task.NumberGPU {
-				candidates = append(candidates, node)
+				candidates = append(candidates, &node)
 				availableGPUs[node.ClientID] = available
 				if len(candidates) >= 8 {
 					break
@@ -316,7 +317,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 				}
 			}
 			if len(available) >= task.NumberGPU {
-				candidates = append(candidates, node)
+				candidates = append(candidates, &node)
 				availableGPUs[node.ClientID] = available
 				if len(candidates) >= 8 {
 					break
@@ -338,7 +339,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 
 	/* assign */
 	if len(candidates) > 0 {
-		node := candidates[0]
+		node := pool.pickNode(candidates)
 		res.ClientID = node.ClientID
 		res.ClientHost = node.ClientHost
 		res.Status = availableGPUs[node.ClientID][0:task.NumberGPU]
@@ -354,10 +355,16 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 			}
 		}
 		if allocationType == 2 {
+			scheduler.UsingGPUMu.Lock()
 			scheduler.UsingGPU += task.NumberGPU
+			scheduler.UsingGPUMu.Unlock()
+			log.Info(res.Status, " is using")
 		}
 	}
+
+
+
 
 	for i := range locks {
 		pool.poolsMu[i].Unlock()
 	}
@@ -399,7 +406,10 @@ func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
 				node.Status[j].MemoryAllocated = 0
 			}
 			if node.Status[j].MemoryAllocated == 0 {
+				scheduler.UsingGPUMu.Lock()
 				scheduler.UsingGPU--
+				scheduler.UsingGPUMu.Unlock()
+				log.Info(node.Status[j].UUID, " is released")
 			}
 			log.Info(node.Status[j].MemoryAllocated)
 		}
@@ -592,30 +602,30 @@ func (scheduler *SchedulerFair) Detach(GPU string, job string) {
 
 func (scheduler *SchedulerFair) Enable() bool {
 	scheduler.enabled = true
-	log.Info("scheduler is enabled", time.Now())
+	log.Info("scheduler is enabled ", time.Now())
 	return true
 }
 
 func (scheduler *SchedulerFair) Disable() bool {
 	scheduler.enabled = false
-	log.Info("scheduler is disabled", time.Now())
+	log.Info("scheduler is disabled ", time.Now())
 	return true
 }
 
 func (scheduler *SchedulerFair) UpdateParallelism(parallelism int) bool {
 	scheduler.parallelism = parallelism
-	log.Info("parallelism is updated to", parallelism)
+	log.Info("parallelism is updated to ", parallelism)
 	return true
 }
 
 func (scheduler *SchedulerFair) SetShareRatio(ratio float64) bool {
 	scheduler.enableShareRatio = ratio
-	log.Info("enableShareRatio is updated to", ratio)
+	log.Info("enableShareRatio is updated to ", ratio)
 	return true
 }
 
 func (scheduler *SchedulerFair) SetPreScheduleRatio(ratio float64) bool {
 	scheduler.enablePreScheduleRatio = ratio
-	log.Info("enablePreScheduleRatio is updated to", ratio)
+	log.Info("enablePreScheduleRatio is updated to ", ratio)
 	return true
 }
diff --git a/src/scheduler_priority.go b/src/scheduler_priority.go
index 0ee0bc0..ff7225c 100644
--- a/src/scheduler_priority.go
+++ b/src/scheduler_priority.go
@@ -111,7 +111,7 @@ func (scheduler *SchedulerPriority) Schedule(job Job) {
 	job.Status = Created
 }
 
-func (scheduler *SchedulerPriority) AcquireResource(job Job, task Task) NodeStatus {
+func (scheduler *SchedulerPriority) AcquireResource(job Job, task Task, nodes []NodeStatus) NodeStatus {
 	poolID := rand.Intn(pool.poolsCount)
 	pool.poolsMu[poolID].Lock()
 	defer pool.poolsMu[poolID].Unlock()
@@ -296,4 +296,4 @@ func (scheduler *SchedulerPriority) SetPreScheduleRatio(ratio float64) bool {
 	//scheduler.enablePreScheduleRatio = ratio
 	log.Info("enablePreScheduleRatio is updated to", ratio)
 	return true
-}
\ No newline at end of file
+}