1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-12-13 07:46:43 +00:00

update shceduler_fair

This commit is contained in:
2019-08-01 11:11:37 +08:00
parent 4380d2a7d6
commit 1f35c4484c
2 changed files with 51 additions and 8 deletions

View File

@@ -153,7 +153,8 @@ func main() {
pool.start() pool.start()
//scheduler = &SchedulerFCFS{} //scheduler = &SchedulerFCFS{}
scheduler = &SchedulerPriority{} //scheduler = &SchedulerPriority{}
scheduler = &SchedulerFair{}
scheduler.Start() scheduler.Start()
go func() { go func() {

View File

@@ -4,6 +4,7 @@ import (
"sync" "sync"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"sort"
) )
type SchedulerFair struct { type SchedulerFair struct {
@@ -12,19 +13,36 @@ type SchedulerFair struct {
mu sync.Mutex mu sync.Mutex
scheduling sync.Mutex scheduling sync.Mutex
jobs map[string]*JobManager jobs map[string]*JobManager
nextQueue string
}
type FairJobSorter []Job
func (s FairJobSorter) Len() int {
return len(s)
}
func (s FairJobSorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s FairJobSorter) Less(i, j int) bool {
if s[i].Priority > s[j].Priority {
return true
}
return s[i].CreatedAt > s[j].CreatedAt
} }
func (scheduler *SchedulerFair) Start() { func (scheduler *SchedulerFair) Start() {
scheduler.jobs = map[string]*JobManager{} scheduler.jobs = map[string]*JobManager{}
scheduler.history = []*Job{} scheduler.history = []*Job{}
scheduler.nextQueue = "default"
go func() { go func() {
queue := "default"
for { for {
log.Info("Scheduling") log.Info("Scheduling")
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
scheduler.scheduling.Lock() scheduler.scheduling.Lock()
scheduler.mu.Lock() scheduler.mu.Lock()
queue := scheduler.nextQueue
if len(scheduler.queues[queue]) > 0 { if len(scheduler.queues[queue]) > 0 {
jm := JobManager{} jm := JobManager{}
@@ -39,6 +57,7 @@ func (scheduler *SchedulerFair) Start() {
go func() { go func() {
jm.start() jm.start()
scheduler.UpdateNextQueue()
}() }()
} else { } else {
scheduler.scheduling.Unlock() scheduler.scheduling.Unlock()
@@ -65,6 +84,7 @@ func (scheduler *SchedulerFair) UpdateProgress(jobName string, state State) {
scheduler.history[i].Status = Finished scheduler.history[i].Status = Finished
} }
} }
scheduler.UpdateNextQueue()
break break
case Stopped: case Stopped:
for i := range scheduler.history { for i := range scheduler.history {
@@ -72,6 +92,7 @@ func (scheduler *SchedulerFair) UpdateProgress(jobName string, state State) {
scheduler.history[i].Status = Stopped scheduler.history[i].Status = Stopped
} }
} }
scheduler.UpdateNextQueue()
break break
} }
} }
@@ -80,7 +101,7 @@ func (scheduler *SchedulerFair) Schedule(job Job) {
scheduler.mu.Lock() scheduler.mu.Lock()
defer scheduler.mu.Unlock() defer scheduler.mu.Unlock()
queue := "default" queue := job.Group
_, ok := scheduler.queues[queue] _, ok := scheduler.queues[queue]
if !ok { if !ok {
if InstanceOfGroupManager().get(queue) != nil { if InstanceOfGroupManager().get(queue) != nil {
@@ -186,12 +207,17 @@ func (scheduler *SchedulerFair) QueryLogs(jobName string, taskName string) MsgLo
} }
func (scheduler *SchedulerFair) ListJobs() MsgJobList { func (scheduler *SchedulerFair) ListJobs() MsgJobList {
var tmp []Job var jobs []Job
for _, job := range scheduler.history { for _, job := range scheduler.history {
tmp = append(tmp, *job) jobs = append(jobs, *job)
} }
tmp = append(tmp, scheduler.queues["default"]...) var tmp []Job
return MsgJobList{Code: 0, Jobs: tmp} for _, v := range scheduler.queues {
tmp = append(tmp, v...)
}
sort.Sort(FairJobSorter(tmp))
jobs = append(jobs, tmp...)
return MsgJobList{Code: 0, Jobs: jobs}
} }
func (scheduler *SchedulerFair) Summary() MsgSummary { func (scheduler *SchedulerFair) Summary() MsgSummary {
@@ -206,7 +232,9 @@ func (scheduler *SchedulerFair) Summary() MsgSummary {
for _, job := range scheduler.history { for _, job := range scheduler.history {
tmp = append(tmp, *job) tmp = append(tmp, *job)
} }
tmp = append(tmp, scheduler.queues["default"]...) for _, v := range scheduler.queues {
tmp = append(tmp, v...)
}
for _, job := range tmp { for _, job := range tmp {
switch job.Status { switch job.Status {
@@ -253,3 +281,17 @@ func (scheduler *SchedulerFair) AcquireNetwork() string {
func (scheduler *SchedulerFair) ReleaseNetwork(network string) { func (scheduler *SchedulerFair) ReleaseNetwork(network string) {
pool.releaseNetwork(network) pool.releaseNetwork(network)
} }
func (scheduler *SchedulerFair) UpdateNextQueue() {
flag := false
for k := range scheduler.queues {
if flag {
scheduler.nextQueue = k
return
}
if k == scheduler.nextQueue {
flag = true
}
}
scheduler.nextQueue = "default"
}