From 74373fb950dac2060cbb3a230d234b240272f621 Mon Sep 17 00:00:00 2001
From: Newnius
Date: Thu, 1 Aug 2019 13:42:53 +0800
Subject: [PATCH] update scheduler_fair

---
 src/group.go              |   3 +-
 src/job_manager.go        |   6 +--
 src/scheduler.go          |   4 +-
 src/scheduler_FCFS.go     |   4 +-
 src/scheduler_fair.go     | 103 ++++++++++++++++++++++++++++++++--------
 src/scheduler_priority.go |   4 +-
 6 files changed, 92 insertions(+), 32 deletions(-)

diff --git a/src/group.go b/src/group.go
index a847580..51ff9e3 100644
--- a/src/group.go
+++ b/src/group.go
@@ -54,8 +54,7 @@ func (gm *GroupManager) Remove(group Group) MsgGroupCreate {
 func (gm *GroupManager) List() MsgGroupList {
 	defer gm.mu.Unlock()
 	gm.mu.Lock()
-	// cannot change to `var`, since it would be json_encoded to null
-	result := []Group{}
+	var result []Group
 	for _, v := range gm.groups {
 		result = append(result, v)
 	}
diff --git a/src/job_manager.go b/src/job_manager.go
index 224ac24..b58fe7f 100644
--- a/src/job_manager.go
+++ b/src/job_manager.go
@@ -28,7 +28,7 @@ func (jm *JobManager) start() {
 	for i := range jm.job.Tasks {
 		var resource NodeStatus
 		for {
-			resource = jm.scheduler.AcquireResource(jm.job.Tasks[i])
+			resource = jm.scheduler.AcquireResource(jm.job, jm.job.Tasks[i])
 			if len(resource.Status) > 0 {
 				break
 			}
@@ -92,7 +92,7 @@ func (jm *JobManager) start() {
		/* save logs etc. */
 
		/* return resource */
-		jm.scheduler.ReleaseResource(jm.resources[i])
+		jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
 		fmt.Println("return resource ", jm.resources[i].ClientID)
 	}
 }
@@ -175,7 +175,7 @@ func (jm *JobManager) stop() MsgStop {
 	}
 	for i := range jm.resources {
-		jm.scheduler.ReleaseResource(jm.resources[i])
+		jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
 	}
 	jm.scheduler.UpdateProgress(jm.job.Name, Stopped)
 	return MsgStop{Code: 0}
 }
diff --git a/src/scheduler.go b/src/scheduler.go
index 4832f0d..77248c4 100644
--- a/src/scheduler.go
+++ b/src/scheduler.go
@@ -7,9 +7,9 @@ type Scheduler interface {
 
 	UpdateProgress(jobName string, state State)
 
-	AcquireResource(Task) NodeStatus
+	AcquireResource(Job, Task) NodeStatus
 
-	ReleaseResource(NodeStatus)
+	ReleaseResource(Job, NodeStatus)
 
 	AcquireNetwork() string
 
diff --git a/src/scheduler_FCFS.go b/src/scheduler_FCFS.go
index 304fd80..784be4c 100644
--- a/src/scheduler_FCFS.go
+++ b/src/scheduler_FCFS.go
@@ -83,7 +83,7 @@ func (scheduler *SchedulerFCFS) Schedule(job Job) {
 	job.Status = Created
 }
 
-func (scheduler *SchedulerFCFS) AcquireResource(task Task) NodeStatus {
+func (scheduler *SchedulerFCFS) AcquireResource(job Job, task Task) NodeStatus {
 	pool.mu.Lock()
 	defer pool.mu.Unlock()
 
@@ -114,7 +114,7 @@ func (scheduler *SchedulerFCFS) AcquireResource(task Task) NodeStatus {
 	return res
 }
 
-func (scheduler *SchedulerFCFS) ReleaseResource(agent NodeStatus) {
+func (scheduler *SchedulerFCFS) ReleaseResource(job Job, agent NodeStatus) {
 	pool.mu.Lock()
 	defer pool.mu.Unlock()
 	nodes := pool.nodes[agent.ClientID]
diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go
index 30fa4a5..cf36c7a 100644
--- a/src/scheduler_fair.go
+++ b/src/scheduler_fair.go
@@ -5,15 +5,23 @@ import (
 	"time"
 	log "github.com/sirupsen/logrus"
 	"sort"
-	)
+)
+
+type ResourceCount struct {
+	NumberGPU int
+	MemoryGPU int
+	CPU       int
+	Memory    int
+}
 
 type SchedulerFair struct {
-	history    []*Job
-	queues     map[string][]Job
-	mu         sync.Mutex
-	scheduling sync.Mutex
-	jobs       map[string]*JobManager
-	nextQueue  string
+	history             []*Job
+	queues              map[string][]Job
+	mu                  sync.Mutex
+	scheduling          sync.Mutex
+	jobs                map[string]*JobManager
+	nextQueue           string
+	resourceAllocations map[string]ResourceCount
 }
 
 type FairJobSorter []Job
@@ -37,16 +45,16 @@ func (scheduler *SchedulerFair) Start() {
 	scheduler.nextQueue = "default"
 	scheduler.queues = map[string][]Job{}
 	scheduler.queues["default"] = []Job{}
+	scheduler.resourceAllocations = map[string]ResourceCount{}
 	go func() {
 		for {
-			log.Info("Scheduling")
+			log.Debug("Scheduling")
 			time.Sleep(time.Second * 5)
 			scheduler.scheduling.Lock()
 			scheduler.mu.Lock()
 			queue := scheduler.nextQueue
 			if len(scheduler.queues[queue]) > 0 {
-
 				jm := JobManager{}
 				jm.job = scheduler.queues[queue][0]
@@ -59,10 +67,12 @@ func (scheduler *SchedulerFair) Start() {
 				go func() {
 					jm.start()
-					scheduler.UpdateNextQueue()
 				}()
 			} else {
 				scheduler.scheduling.Unlock()
+				go func() {
+					scheduler.UpdateNextQueue()
+				}()
 			}
 			scheduler.mu.Unlock()
 		}
 	}()
@@ -140,7 +150,7 @@ func (scheduler *SchedulerFair) Schedule(job Job) {
 	job.Status = Created
 }
 
-func (scheduler *SchedulerFair) AcquireResource(task Task) NodeStatus {
+func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 	pool.mu.Lock()
 	defer pool.mu.Unlock()
 
@@ -156,6 +166,8 @@ func (scheduler *SchedulerFair) AcquireResource(task Task) NodeStatus {
 	res.ClientID = id
 	res.ClientHost = node.ClientHost
 	res.Status = available[0:task.NumberGPU]
+	res.NumCPU = task.NumberCPU
+	res.MemTotal = task.Memory
 
 	for i := range res.Status {
 		for j := range node.Status {
@@ -168,10 +180,24 @@ func (scheduler *SchedulerFair) AcquireResource(task Task) NodeStatus {
 				break
 			}
 		}
+	go func(res NodeStatus) {
+		if _, ok := scheduler.resourceAllocations[job.Group]; !ok {
+			scheduler.resourceAllocations[job.Group] = ResourceCount{}
+		}
+		cnt := scheduler.resourceAllocations[job.Group]
+		cnt.CPU += res.NumCPU
+		cnt.Memory += res.MemTotal
+		for _, v := range res.Status {
+			cnt.NumberGPU++
+			cnt.MemoryGPU += v.MemoryTotal
+		}
+		scheduler.resourceAllocations[job.Group] = cnt
+		scheduler.UpdateNextQueue()
+	}(res)
 	return res
 }
 
-func (scheduler *SchedulerFair) ReleaseResource(agent NodeStatus) {
+func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
 	pool.mu.Lock()
 	defer pool.mu.Unlock()
 	nodes := pool.nodes[agent.ClientID]
@@ -182,6 +208,20 @@ func (scheduler *SchedulerFair) ReleaseResource(agent NodeStatus) {
 			}
 		}
 	}
+	go func(res NodeStatus) {
+		if _, ok := scheduler.resourceAllocations[job.Group]; !ok {
+			scheduler.resourceAllocations[job.Group] = ResourceCount{}
+		}
+		cnt := scheduler.resourceAllocations[job.Group]
+		cnt.CPU -= res.NumCPU
+		cnt.Memory -= res.MemTotal
+		for _, v := range res.Status {
+			cnt.NumberGPU--
+			cnt.MemoryGPU -= v.MemoryTotal
+		}
+		scheduler.resourceAllocations[job.Group] = cnt
+		scheduler.UpdateNextQueue()
+	}(agent)
 }
 
 func (scheduler *SchedulerFair) QueryState(jobName string) MsgJobStatus {
@@ -285,15 +325,36 @@ func (scheduler *SchedulerFair) ReleaseNetwork(network string) {
 }
 
 func (scheduler *SchedulerFair) UpdateNextQueue() {
-	flag := false
-	for k := range scheduler.queues {
-		if flag {
-			scheduler.nextQueue = k
-			return
-		}
-		if k == scheduler.nextQueue {
-			flag = true
+	next := "default"
+	quota := 9999.0
+
+	NumberGPU := 0.00001
+	MemoryGPU := 0.00001
+	CPU := 0.00001
+	Memory := 0.0001
+	for _, node := range pool.nodes {
+		CPU += float64(node.NumCPU)
+		Memory += float64(node.MemTotal)
+		for _, card := range node.Status {
+			NumberGPU += 1.0
+			MemoryGPU += float64(card.MemoryTotal)
 		}
 	}
-	scheduler.nextQueue = "default"
+
+	for k, v := range scheduler.resourceAllocations {
+		tmp := 0.0
+		tmp += float64(v.CPU) / CPU
+		tmp += float64(v.Memory) / Memory
+		tmp += float64(v.NumberGPU) / NumberGPU
+		tmp += float64(v.MemoryGPU) / MemoryGPU
+		tmp /= 4
+		if tmp < quota {
+			quota = tmp
+			next = k
+		}
+	}
+	scheduler.nextQueue = next
+	log.Info("updateNextQueue")
+	log.Info(scheduler.resourceAllocations)
+	log.Info("updateNextQueue ->", next)
 }
diff --git a/src/scheduler_priority.go b/src/scheduler_priority.go
index 8e1919c..fd92c26 100644
--- a/src/scheduler_priority.go
+++ b/src/scheduler_priority.go
@@ -107,7 +107,7 @@ func (scheduler *SchedulerPriority) Schedule(job Job) {
 	job.Status = Created
 }
 
-func (scheduler *SchedulerPriority) AcquireResource(task Task) NodeStatus {
+func (scheduler *SchedulerPriority) AcquireResource(job Job, task Task) NodeStatus {
 	pool.mu.Lock()
 	defer pool.mu.Unlock()
 
@@ -138,7 +138,7 @@ func (scheduler *SchedulerPriority) AcquireResource(task Task) NodeStatus {
 	return res
 }
 
-func (scheduler *SchedulerPriority) ReleaseResource(agent NodeStatus) {
+func (scheduler *SchedulerPriority) ReleaseResource(job Job, agent NodeStatus) {
 	pool.mu.Lock()
 	defer pool.mu.Unlock()
 	nodes := pool.nodes[agent.ClientID]
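
For reference, the queue-selection rule that UpdateNextQueue now implements, shown as a
standalone sketch (not part of the patch): each group's share is the average of its
allocated fraction of four pool-wide resources, and the group with the smallest share is
scheduled next. The pool totals and per-group figures below are hypothetical, not taken
from the patch.

package main

import "fmt"

// ResourceCount mirrors the per-group counters introduced in the patch.
type ResourceCount struct {
	NumberGPU int
	MemoryGPU int
	CPU       int
	Memory    int
}

// nextQueue picks the group holding the smallest average share of the cluster,
// the same rule UpdateNextQueue applies. The small epsilons the patch adds to
// each total (0.00001) only guard against division by zero, so they are
// omitted here by assuming non-zero totals.
func nextQueue(alloc map[string]ResourceCount, totalCPU, totalMem, totalGPU, totalGPUMem float64) string {
	next := "default"
	quota := 9999.0 // sentinel above any reachable share
	for k, v := range alloc {
		share := (float64(v.CPU)/totalCPU +
			float64(v.Memory)/totalMem +
			float64(v.NumberGPU)/totalGPU +
			float64(v.MemoryGPU)/totalGPUMem) / 4
		if share < quota {
			quota = share
			next = k
		}
	}
	return next
}

func main() {
	// Hypothetical pool: 64 CPU cores, 256 GB RAM, 8 GPUs, 128 GB GPU memory.
	alloc := map[string]ResourceCount{
		"default":  {NumberGPU: 6, MemoryGPU: 96, CPU: 32, Memory: 128},
		"research": {NumberGPU: 1, MemoryGPU: 16, CPU: 8, Memory: 32},
	}
	// Prints "research": its average share is 0.125 versus 0.625 for "default".
	fmt.Println(nextQueue(alloc, 64, 256, 8, 128))
}

Averaging the four ratios weights every resource equally; a dominant-resource variant
would take the maximum ratio instead, charging each group for its most-used resource.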