From 1f35c4484c395090c780b4b0e15ee41b5e035723 Mon Sep 17 00:00:00 2001 From: Newnius Date: Thu, 1 Aug 2019 11:11:37 +0800 Subject: [PATCH] update shceduler_fair --- src/main.go | 3 ++- src/scheduler_fair.go | 56 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 51 insertions(+), 8 deletions(-) diff --git a/src/main.go b/src/main.go index ae91869..d5921f1 100644 --- a/src/main.go +++ b/src/main.go @@ -153,7 +153,8 @@ func main() { pool.start() //scheduler = &SchedulerFCFS{} - scheduler = &SchedulerPriority{} + //scheduler = &SchedulerPriority{} + scheduler = &SchedulerFair{} scheduler.Start() go func() { diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go index f5bbba3..b631be9 100644 --- a/src/scheduler_fair.go +++ b/src/scheduler_fair.go @@ -4,6 +4,7 @@ import ( "sync" "time" log "github.com/sirupsen/logrus" + "sort" ) type SchedulerFair struct { @@ -12,19 +13,36 @@ type SchedulerFair struct { mu sync.Mutex scheduling sync.Mutex jobs map[string]*JobManager + nextQueue string +} + +type FairJobSorter []Job + +func (s FairJobSorter) Len() int { + return len(s) +} +func (s FairJobSorter) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} +func (s FairJobSorter) Less(i, j int) bool { + if s[i].Priority > s[j].Priority { + return true + } + return s[i].CreatedAt > s[j].CreatedAt } func (scheduler *SchedulerFair) Start() { scheduler.jobs = map[string]*JobManager{} scheduler.history = []*Job{} + scheduler.nextQueue = "default" go func() { - queue := "default" for { log.Info("Scheduling") time.Sleep(time.Second * 5) scheduler.scheduling.Lock() scheduler.mu.Lock() + queue := scheduler.nextQueue if len(scheduler.queues[queue]) > 0 { jm := JobManager{} @@ -39,6 +57,7 @@ func (scheduler *SchedulerFair) Start() { go func() { jm.start() + scheduler.UpdateNextQueue() }() } else { scheduler.scheduling.Unlock() @@ -65,6 +84,7 @@ func (scheduler *SchedulerFair) UpdateProgress(jobName string, state State) { scheduler.history[i].Status = Finished } } + scheduler.UpdateNextQueue() break case Stopped: for i := range scheduler.history { @@ -72,6 +92,7 @@ func (scheduler *SchedulerFair) UpdateProgress(jobName string, state State) { scheduler.history[i].Status = Stopped } } + scheduler.UpdateNextQueue() break } } @@ -80,7 +101,7 @@ func (scheduler *SchedulerFair) Schedule(job Job) { scheduler.mu.Lock() defer scheduler.mu.Unlock() - queue := "default" + queue := job.Group _, ok := scheduler.queues[queue] if !ok { if InstanceOfGroupManager().get(queue) != nil { @@ -186,12 +207,17 @@ func (scheduler *SchedulerFair) QueryLogs(jobName string, taskName string) MsgLo } func (scheduler *SchedulerFair) ListJobs() MsgJobList { - var tmp []Job + var jobs []Job for _, job := range scheduler.history { - tmp = append(tmp, *job) + jobs = append(jobs, *job) } - tmp = append(tmp, scheduler.queues["default"]...) - return MsgJobList{Code: 0, Jobs: tmp} + var tmp []Job + for _, v := range scheduler.queues { + tmp = append(tmp, v...) + } + sort.Sort(FairJobSorter(tmp)) + jobs = append(jobs, tmp...) + return MsgJobList{Code: 0, Jobs: jobs} } func (scheduler *SchedulerFair) Summary() MsgSummary { @@ -206,7 +232,9 @@ func (scheduler *SchedulerFair) Summary() MsgSummary { for _, job := range scheduler.history { tmp = append(tmp, *job) } - tmp = append(tmp, scheduler.queues["default"]...) + for _, v := range scheduler.queues { + tmp = append(tmp, v...) + } for _, job := range tmp { switch job.Status { @@ -253,3 +281,17 @@ func (scheduler *SchedulerFair) AcquireNetwork() string { func (scheduler *SchedulerFair) ReleaseNetwork(network string) { pool.releaseNetwork(network) } + +func (scheduler *SchedulerFair) UpdateNextQueue() { + flag := false + for k := range scheduler.queues { + if flag { + scheduler.nextQueue = k + return + } + if k == scheduler.nextQueue { + flag = true + } + } + scheduler.nextQueue = "default" +}