From c54e526b42f6c34badf4b0303181dc689c83535b Mon Sep 17 00:00:00 2001 From: Newnius Date: Fri, 14 Aug 2020 14:52:33 +0800 Subject: [PATCH] update FCFS --- src/scheduler_FCFS.go | 46 +++++++++++++++++++++++++++++---------- src/scheduler_priority.go | 2 +- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/src/scheduler_FCFS.go b/src/scheduler_FCFS.go index 1d3cb72..2fe6db0 100644 --- a/src/scheduler_FCFS.go +++ b/src/scheduler_FCFS.go @@ -6,10 +6,14 @@ import ( ) type SchedulerFCFS struct { - history []*Job - queue []Job - mu sync.Mutex - scheduling sync.Mutex + history []*Job + historyMu sync.Mutex + + queue []Job + queueMu sync.Mutex + + schedulingJobs map[string]bool + schedulingMu sync.Mutex jobMasters map[string]*JobManager enabled bool @@ -25,8 +29,8 @@ func (scheduler *SchedulerFCFS) Start() { for { log.Debug("Scheduling") time.Sleep(time.Second * 5) - scheduler.scheduling.Lock() - scheduler.mu.Lock() + scheduler.schedulingMu.Lock() + scheduler.queueMu.Lock() if len(scheduler.queue) > 0 { jm := JobManager{} @@ -36,27 +40,35 @@ func (scheduler *SchedulerFCFS) Start() { scheduler.jobMasters[jm.job.Name] = &jm jm.job.Status = Starting + scheduler.historyMu.Lock() scheduler.history = append(scheduler.history, &jm.job) + scheduler.historyMu.Unlock() go func() { jm.start() }() } else { - scheduler.scheduling.Unlock() + scheduler.schedulingMu.Unlock() } - scheduler.mu.Unlock() + scheduler.queueMu.Unlock() } }() } func (scheduler *SchedulerFCFS) UpdateProgress(job Job, state State) { + scheduler.historyMu.Lock() + defer scheduler.historyMu.Unlock() + + scheduler.schedulingMu.Lock() + delete(scheduler.schedulingJobs, job.Name) + scheduler.schedulingMu.Unlock() + switch state { case Running: - scheduler.scheduling.Unlock() - for i := range scheduler.history { if scheduler.history[i].Name == job.Name { scheduler.history[i].Status = Running + scheduler.history[i].UpdatedAt = int(time.Now().Unix()) } } break @@ -64,6 +76,7 @@ func (scheduler *SchedulerFCFS) UpdateProgress(job Job, state State) { for i := range scheduler.history { if scheduler.history[i].Name == job.Name { scheduler.history[i].Status = Finished + scheduler.history[i].UpdatedAt = int(time.Now().Unix()) } } break @@ -71,6 +84,15 @@ func (scheduler *SchedulerFCFS) UpdateProgress(job Job, state State) { for i := range scheduler.history { if scheduler.history[i].Name == job.Name { scheduler.history[i].Status = Stopped + scheduler.history[i].UpdatedAt = int(time.Now().Unix()) + } + } + break + case Failed: + for i := range scheduler.history { + if scheduler.history[i].Name == job.Name { + scheduler.history[i].Status = Failed + scheduler.history[i].UpdatedAt = int(time.Now().Unix()) } } break @@ -78,8 +100,8 @@ func (scheduler *SchedulerFCFS) UpdateProgress(job Job, state State) { } func (scheduler *SchedulerFCFS) Schedule(job Job) { - scheduler.mu.Lock() - defer scheduler.mu.Unlock() + scheduler.queueMu.Lock() + defer scheduler.queueMu.Unlock() scheduler.queue = append(scheduler.queue, job) job.Status = Created diff --git a/src/scheduler_priority.go b/src/scheduler_priority.go index f68f945..7303cb3 100644 --- a/src/scheduler_priority.go +++ b/src/scheduler_priority.go @@ -32,7 +32,7 @@ func (scheduler *SchedulerPriority) Start() { go func() { flag := true for { - log.Info("Scheduling") + log.Debug("Scheduling") if !flag { /* no more job */ time.Sleep(time.Millisecond * 100) }