1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-07 06:11:56 +00:00

update FCFS

This commit is contained in:
Newnius 2020-08-14 14:52:33 +08:00
parent 0c58264435
commit c54e526b42
2 changed files with 35 additions and 13 deletions

View File

@ -6,10 +6,14 @@ import (
) )
type SchedulerFCFS struct { type SchedulerFCFS struct {
history []*Job history []*Job
queue []Job historyMu sync.Mutex
mu sync.Mutex
scheduling sync.Mutex queue []Job
queueMu sync.Mutex
schedulingJobs map[string]bool
schedulingMu sync.Mutex
jobMasters map[string]*JobManager jobMasters map[string]*JobManager
enabled bool enabled bool
@ -25,8 +29,8 @@ func (scheduler *SchedulerFCFS) Start() {
for { for {
log.Debug("Scheduling") log.Debug("Scheduling")
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
scheduler.scheduling.Lock() scheduler.schedulingMu.Lock()
scheduler.mu.Lock() scheduler.queueMu.Lock()
if len(scheduler.queue) > 0 { if len(scheduler.queue) > 0 {
jm := JobManager{} jm := JobManager{}
@ -36,27 +40,35 @@ func (scheduler *SchedulerFCFS) Start() {
scheduler.jobMasters[jm.job.Name] = &jm scheduler.jobMasters[jm.job.Name] = &jm
jm.job.Status = Starting jm.job.Status = Starting
scheduler.historyMu.Lock()
scheduler.history = append(scheduler.history, &jm.job) scheduler.history = append(scheduler.history, &jm.job)
scheduler.historyMu.Unlock()
go func() { go func() {
jm.start() jm.start()
}() }()
} else { } else {
scheduler.scheduling.Unlock() scheduler.schedulingMu.Unlock()
} }
scheduler.mu.Unlock() scheduler.queueMu.Unlock()
} }
}() }()
} }
func (scheduler *SchedulerFCFS) UpdateProgress(job Job, state State) { func (scheduler *SchedulerFCFS) UpdateProgress(job Job, state State) {
scheduler.historyMu.Lock()
defer scheduler.historyMu.Unlock()
scheduler.schedulingMu.Lock()
delete(scheduler.schedulingJobs, job.Name)
scheduler.schedulingMu.Unlock()
switch state { switch state {
case Running: case Running:
scheduler.scheduling.Unlock()
for i := range scheduler.history { for i := range scheduler.history {
if scheduler.history[i].Name == job.Name { if scheduler.history[i].Name == job.Name {
scheduler.history[i].Status = Running scheduler.history[i].Status = Running
scheduler.history[i].UpdatedAt = int(time.Now().Unix())
} }
} }
break break
@ -64,6 +76,7 @@ func (scheduler *SchedulerFCFS) UpdateProgress(job Job, state State) {
for i := range scheduler.history { for i := range scheduler.history {
if scheduler.history[i].Name == job.Name { if scheduler.history[i].Name == job.Name {
scheduler.history[i].Status = Finished scheduler.history[i].Status = Finished
scheduler.history[i].UpdatedAt = int(time.Now().Unix())
} }
} }
break break
@ -71,6 +84,15 @@ func (scheduler *SchedulerFCFS) UpdateProgress(job Job, state State) {
for i := range scheduler.history { for i := range scheduler.history {
if scheduler.history[i].Name == job.Name { if scheduler.history[i].Name == job.Name {
scheduler.history[i].Status = Stopped scheduler.history[i].Status = Stopped
scheduler.history[i].UpdatedAt = int(time.Now().Unix())
}
}
break
case Failed:
for i := range scheduler.history {
if scheduler.history[i].Name == job.Name {
scheduler.history[i].Status = Failed
scheduler.history[i].UpdatedAt = int(time.Now().Unix())
} }
} }
break break
@ -78,8 +100,8 @@ func (scheduler *SchedulerFCFS) UpdateProgress(job Job, state State) {
} }
func (scheduler *SchedulerFCFS) Schedule(job Job) { func (scheduler *SchedulerFCFS) Schedule(job Job) {
scheduler.mu.Lock() scheduler.queueMu.Lock()
defer scheduler.mu.Unlock() defer scheduler.queueMu.Unlock()
scheduler.queue = append(scheduler.queue, job) scheduler.queue = append(scheduler.queue, job)
job.Status = Created job.Status = Created

View File

@ -32,7 +32,7 @@ func (scheduler *SchedulerPriority) Start() {
go func() { go func() {
flag := true flag := true
for { for {
log.Info("Scheduling") log.Debug("Scheduling")
if !flag { /* no more job */ if !flag { /* no more job */
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
} }