From 4380d2a7d6ef0b41a73beb86c018149f5f2be4c3 Mon Sep 17 00:00:00 2001 From: Newnius Date: Thu, 1 Aug 2019 10:42:37 +0800 Subject: [PATCH] bugfix, update shceduler_fair --- src/group.go | 12 ++++++++++++ src/scheduler_fair.go | 40 +++++++++++++++++++++++++--------------- src/util.go | 2 +- 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/src/group.go b/src/group.go index 9b7d7ed..a847580 100644 --- a/src/group.go +++ b/src/group.go @@ -61,3 +61,15 @@ func (gm *GroupManager) List() MsgGroupList { } return MsgGroupList{Groups: result} } + +func (gm *GroupManager) get(name string) *Group { + defer gm.mu.Unlock() + gm.mu.Lock() + + for _, v := range gm.groups { + if v.Name == name { + return &v + } + } + return nil +} diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go index 550b002..f5bbba3 100644 --- a/src/scheduler_fair.go +++ b/src/scheduler_fair.go @@ -8,10 +8,10 @@ import ( type SchedulerFair struct { history []*Job - queue []Job + queues map[string][]Job mu sync.Mutex scheduling sync.Mutex - jobs map[string]*JobManager + jobs map[string]*JobManager } func (scheduler *SchedulerFair) Start() { @@ -19,17 +19,18 @@ func (scheduler *SchedulerFair) Start() { scheduler.history = []*Job{} go func() { + queue := "default" for { log.Info("Scheduling") time.Sleep(time.Second * 5) scheduler.scheduling.Lock() scheduler.mu.Lock() - if len(scheduler.queue) > 0 { + if len(scheduler.queues[queue]) > 0 { jm := JobManager{} - jm.job = scheduler.queue[0] + jm.job = scheduler.queues[queue][0] - scheduler.queue = scheduler.queue[1:] + scheduler.queues[queue] = scheduler.queues[queue][1:] jm.scheduler = scheduler scheduler.jobs[jm.job.Name] = &jm @@ -79,30 +80,39 @@ func (scheduler *SchedulerFair) Schedule(job Job) { scheduler.mu.Lock() defer scheduler.mu.Unlock() - index := 0 + queue := "default" + _, ok := scheduler.queues[queue] + if !ok { + if InstanceOfGroupManager().get(queue) != nil { + scheduler.queues[queue] = []Job{} + } else { + queue = "default" + } + } + index := 0 left := 0 - right := len(scheduler.queue) - 1 + right := len(queue) - 1 for ; left <= right; { mid := (left + right) / 2 - if scheduler.queue[left].Priority < job.Priority { + if scheduler.queues[queue][left].Priority < job.Priority { index = left break } - if scheduler.queue[right].Priority >= job.Priority { + if scheduler.queues[queue][right].Priority >= job.Priority { index = right + 1 break } - if scheduler.queue[mid].Priority >= job.Priority { + if scheduler.queues[queue][mid].Priority >= job.Priority { left = mid + 1 } else { right = mid - 1 } } - scheduler.queue = append(scheduler.queue, Job{}) + scheduler.queues[queue] = append(scheduler.queues[queue], Job{}) - copy(scheduler.queue[index+1:], scheduler.queue[index:]) - scheduler.queue[index] = job + copy(scheduler.queues[queue][index+1:], scheduler.queues[queue][index:]) + scheduler.queues[queue][index] = job job.Status = Created } @@ -180,7 +190,7 @@ func (scheduler *SchedulerFair) ListJobs() MsgJobList { for _, job := range scheduler.history { tmp = append(tmp, *job) } - tmp = append(tmp, scheduler.queue...) + tmp = append(tmp, scheduler.queues["default"]...) return MsgJobList{Code: 0, Jobs: tmp} } @@ -196,7 +206,7 @@ func (scheduler *SchedulerFair) Summary() MsgSummary { for _, job := range scheduler.history { tmp = append(tmp, *job) } - tmp = append(tmp, scheduler.queue...) + tmp = append(tmp, scheduler.queues["default"]...) for _, job := range tmp { switch job.Status { diff --git a/src/util.go b/src/util.go index fee9def..23086a8 100644 --- a/src/util.go +++ b/src/util.go @@ -117,7 +117,7 @@ type Job struct { Name string `json:"name"` Tasks []Task `json:"tasks"` Workspace string `json:"workspace"` - Cluster int `json:"virtual_cluster"` + Group string `json:"group"` Priority JobPriority `json:"priority"` RunBefore int `json:"run_before"` CreatedAt int `json:"created_at"`