1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-12-12 23:36:44 +00:00

bugfix, update shceduler_fair

This commit is contained in:
2019-08-01 10:42:37 +08:00
parent 8d6383592c
commit 4380d2a7d6
3 changed files with 38 additions and 16 deletions

View File

@@ -61,3 +61,15 @@ func (gm *GroupManager) List() MsgGroupList {
}
return MsgGroupList{Groups: result}
}
func (gm *GroupManager) get(name string) *Group {
defer gm.mu.Unlock()
gm.mu.Lock()
for _, v := range gm.groups {
if v.Name == name {
return &v
}
}
return nil
}

View File

@@ -8,10 +8,10 @@ import (
type SchedulerFair struct {
history []*Job
queue []Job
queues map[string][]Job
mu sync.Mutex
scheduling sync.Mutex
jobs map[string]*JobManager
jobs map[string]*JobManager
}
func (scheduler *SchedulerFair) Start() {
@@ -19,17 +19,18 @@ func (scheduler *SchedulerFair) Start() {
scheduler.history = []*Job{}
go func() {
queue := "default"
for {
log.Info("Scheduling")
time.Sleep(time.Second * 5)
scheduler.scheduling.Lock()
scheduler.mu.Lock()
if len(scheduler.queue) > 0 {
if len(scheduler.queues[queue]) > 0 {
jm := JobManager{}
jm.job = scheduler.queue[0]
jm.job = scheduler.queues[queue][0]
scheduler.queue = scheduler.queue[1:]
scheduler.queues[queue] = scheduler.queues[queue][1:]
jm.scheduler = scheduler
scheduler.jobs[jm.job.Name] = &jm
@@ -79,30 +80,39 @@ func (scheduler *SchedulerFair) Schedule(job Job) {
scheduler.mu.Lock()
defer scheduler.mu.Unlock()
index := 0
queue := "default"
_, ok := scheduler.queues[queue]
if !ok {
if InstanceOfGroupManager().get(queue) != nil {
scheduler.queues[queue] = []Job{}
} else {
queue = "default"
}
}
index := 0
left := 0
right := len(scheduler.queue) - 1
right := len(queue) - 1
for ; left <= right; {
mid := (left + right) / 2
if scheduler.queue[left].Priority < job.Priority {
if scheduler.queues[queue][left].Priority < job.Priority {
index = left
break
}
if scheduler.queue[right].Priority >= job.Priority {
if scheduler.queues[queue][right].Priority >= job.Priority {
index = right + 1
break
}
if scheduler.queue[mid].Priority >= job.Priority {
if scheduler.queues[queue][mid].Priority >= job.Priority {
left = mid + 1
} else {
right = mid - 1
}
}
scheduler.queue = append(scheduler.queue, Job{})
scheduler.queues[queue] = append(scheduler.queues[queue], Job{})
copy(scheduler.queue[index+1:], scheduler.queue[index:])
scheduler.queue[index] = job
copy(scheduler.queues[queue][index+1:], scheduler.queues[queue][index:])
scheduler.queues[queue][index] = job
job.Status = Created
}
@@ -180,7 +190,7 @@ func (scheduler *SchedulerFair) ListJobs() MsgJobList {
for _, job := range scheduler.history {
tmp = append(tmp, *job)
}
tmp = append(tmp, scheduler.queue...)
tmp = append(tmp, scheduler.queues["default"]...)
return MsgJobList{Code: 0, Jobs: tmp}
}
@@ -196,7 +206,7 @@ func (scheduler *SchedulerFair) Summary() MsgSummary {
for _, job := range scheduler.history {
tmp = append(tmp, *job)
}
tmp = append(tmp, scheduler.queue...)
tmp = append(tmp, scheduler.queues["default"]...)
for _, job := range tmp {
switch job.Status {

View File

@@ -117,7 +117,7 @@ type Job struct {
Name string `json:"name"`
Tasks []Task `json:"tasks"`
Workspace string `json:"workspace"`
Cluster int `json:"virtual_cluster"`
Group string `json:"group"`
Priority JobPriority `json:"priority"`
RunBefore int `json:"run_before"`
CreatedAt int `json:"created_at"`