From 00f69969ceba1439e412aab6c8c8752a3096b051 Mon Sep 17 00:00:00 2001 From: Newnius Date: Wed, 22 Jul 2020 15:27:44 +0800 Subject: [PATCH] bugfix, concurrent map write in job master --- src/configuration.go | 18 +++++++++++++++++- src/job_manager.go | 17 +++++++++++++++-- src/main.go | 6 ++++++ src/scheduler_FCFS.go | 12 ++++++------ src/scheduler_capacity.go | 12 ++++++------ src/scheduler_fair.go | 16 ++++++++-------- src/scheduler_priority.go | 14 +++++++------- 7 files changed, 65 insertions(+), 30 deletions(-) diff --git a/src/configuration.go b/src/configuration.go index 27f41eb..0fe430e 100644 --- a/src/configuration.go +++ b/src/configuration.go @@ -18,8 +18,9 @@ type Configuration struct { EnableShareRatio float64 `json:"EnableShareRatio"` ShareMaxUtilization float64 `json:"ShareMaxUtilization"` EnablePreScheduleRatio float64 `json:"EnablePreScheduleRatio"` - PreScheduleExtraTime int `json:"PreScheduleExtraTime"` /* seconds to schedule ahead except pre+post */ + PreScheduleExtraTime int `json:"PreScheduleExtraTime"` /* seconds of schedule ahead except pre+post */ PreScheduleTimeout int `json:"PreScheduleTimeout"` + JobMaxRetries int `json:"scheduler.job_max_retries"` mock bool mu sync.Mutex @@ -51,6 +52,7 @@ func InstanceOfConfiguration() *Configuration { ShareMaxUtilization: 1.3, // more than 1.0 to expect more improvement EnablePreScheduleRatio: 1.5, PreScheduleExtraTime: 15, + JobMaxRetries: 0, } } return configurationInstance @@ -116,6 +118,12 @@ func (config *Configuration) InitFromEnv() { configurationInstance.PreScheduleTimeout = val } } + value = os.Getenv("scheduler.job_max_retries") + if len(value) != 0 { + if val, err := strconv.Atoi(value); err == nil && val >= 0 { + configurationInstance.JobMaxRetries = val + } + } } func (config *Configuration) SetMockEnabled(enabled bool) bool { @@ -150,6 +158,14 @@ func (config *Configuration) SetShareMaxUtilization(value float64) bool { return true } +func (config *Configuration) SetJobMaxRetries(value int) bool { + config.mu.Lock() + defer config.mu.Unlock() + config.JobMaxRetries = value + log.Info("scheduler.job_max_retries is set to ", value) + return true +} + func (config *Configuration) Dump() map[string]interface{} { config.mu.Lock() defer config.mu.Unlock() diff --git a/src/job_manager.go b/src/job_manager.go index 88bc179..6bcbde1 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -25,6 +25,7 @@ type JobManager struct { jobStatus JobStatus isRunning bool lastHeartBeat map[string]int64 + statusMu sync.Mutex /* history info */ stats [][]TaskStatus @@ -153,8 +154,10 @@ func (jm *JobManager) start() { return } taskStatus := TaskStatus{Id: res.Id, Node: node.ClientHost, HostName: jm.job.Tasks[i].Name} + jm.statusMu.Lock() jm.jobStatus.tasks[task.Name] = taskStatus jm.lastHeartBeat[task.Name] = time.Now().Unix() + jm.statusMu.Unlock() }(task, jm.resources[task.Name]) } @@ -175,11 +178,13 @@ func (jm *JobManager) start() { break } now := time.Now().Unix() + jm.statusMu.Lock() for task, pre := range jm.lastHeartBeat { if now-pre > 30 { log.Warn(jm.job.Name, "-", task, " heartbeat longer tha 30s") } } + jm.statusMu.Unlock() time.Sleep(time.Second * 1) } @@ -313,7 +318,9 @@ func (jm *JobManager) checkStatus(status []TaskStatus) { } jm.resourcesMu.Unlock() } + jm.statusMu.Lock() jm.lastHeartBeat[jm.job.Tasks[i].Name] = time.Now().Unix() + jm.statusMu.Unlock() } if flagRunning && onlyPS && jm.isRunning { log.Info(jm.job.Name, " Only PS is running, stop ") @@ -333,9 +340,11 @@ func (jm *JobManager) checkStatus(status []TaskStatus) { func (jm *JobManager) logs(taskName string) MsgLog { spider := Spider{} spider.Method = "GET" + jm.statusMu.Lock() spider.URL = "http://" + jm.jobStatus.tasks[taskName].Node + ":8000/logs?id=" + jm.jobStatus.tasks[taskName].Id - - if _, ok := jm.jobStatus.tasks[taskName]; !ok { + _, ok := jm.jobStatus.tasks[taskName] + jm.statusMu.Unlock() + if !ok { return MsgLog{Code: -1, Error: "Task not exist"} } @@ -370,7 +379,9 @@ func (jm *JobManager) status() MsgJobStatus { } for i, task := range jm.job.Tasks { + jm.statusMu.Lock() taskStatus := jm.jobStatus.tasks[task.Name] + jm.statusMu.Unlock() /* still in launching phase */ if len(taskStatus.Node) == 0 { @@ -437,6 +448,7 @@ func (jm *JobManager) stop() MsgStop { log.Info("kill job, ", jm.job.Name) } + jm.statusMu.Lock() for _, taskStatus := range jm.jobStatus.tasks { /* stop at background */ go func(task TaskStatus) { @@ -473,5 +485,6 @@ func (jm *JobManager) stop() MsgStop { } }(taskStatus) } + jm.statusMu.Unlock() return MsgStop{Code: 0} } diff --git a/src/main.go b/src/main.go index 2c35c52..c07c3b8 100644 --- a/src/main.go +++ b/src/main.go @@ -354,6 +354,12 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { ok = log.LoggerDisableModule(value) break + case "scheduler.job_max_retries": + if maxRetries, err := strconv.Atoi(value); err == nil { + ok = InstanceOfConfiguration().SetJobMaxRetries(maxRetries) + } + break + } var msg MsgConfUpdate msg.Code = 0 diff --git a/src/scheduler_FCFS.go b/src/scheduler_FCFS.go index 3bf7deb..07cbd09 100644 --- a/src/scheduler_FCFS.go +++ b/src/scheduler_FCFS.go @@ -11,13 +11,13 @@ type SchedulerFCFS struct { mu sync.Mutex scheduling sync.Mutex - jobs map[string]*JobManager + jobMasters map[string]*JobManager enabled bool parallelism int } func (scheduler *SchedulerFCFS) Start() { - scheduler.jobs = map[string]*JobManager{} + scheduler.jobMasters = map[string]*JobManager{} scheduler.history = []*Job{} scheduler.enabled = true @@ -33,7 +33,7 @@ func (scheduler *SchedulerFCFS) Start() { jm.job = scheduler.queue[0] scheduler.queue = scheduler.queue[1:] jm.scheduler = scheduler - scheduler.jobs[jm.job.Name] = &jm + scheduler.jobMasters[jm.job.Name] = &jm jm.job.Status = Starting scheduler.history = append(scheduler.history, &jm.job) @@ -95,7 +95,7 @@ func (scheduler *SchedulerFCFS) ReleaseResource(job Job, agent NodeStatus) { } func (scheduler *SchedulerFCFS) QueryState(jobName string) MsgJobStatus { - jm, ok := scheduler.jobs[jobName] + jm, ok := scheduler.jobMasters[jobName] if !ok { return MsgJobStatus{Code: 1, Error: "Job not exist!"} } @@ -103,7 +103,7 @@ func (scheduler *SchedulerFCFS) QueryState(jobName string) MsgJobStatus { } func (scheduler *SchedulerFCFS) Stop(jobName string) MsgStop { - jm, ok := scheduler.jobs[jobName] + jm, ok := scheduler.jobMasters[jobName] if !ok { return MsgStop{Code: 1, Error: "Job not exist!"} } @@ -111,7 +111,7 @@ func (scheduler *SchedulerFCFS) Stop(jobName string) MsgStop { } func (scheduler *SchedulerFCFS) QueryLogs(jobName string, taskName string) MsgLog { - jm, ok := scheduler.jobs[jobName] + jm, ok := scheduler.jobMasters[jobName] if !ok { return MsgLog{Code: 1, Error: "Job not exist!"} } diff --git a/src/scheduler_capacity.go b/src/scheduler_capacity.go index d0e3147..c58151e 100644 --- a/src/scheduler_capacity.go +++ b/src/scheduler_capacity.go @@ -12,7 +12,7 @@ type SchedulerCapacity struct { historyMu sync.Mutex nextQueue string - jobs map[string]*JobManager + jobMasters map[string]*JobManager queues map[string][]Job queuesMu sync.Mutex @@ -32,7 +32,7 @@ type SchedulerCapacity struct { func (scheduler *SchedulerCapacity) Start() { log.Info("JS (capacity) started") - scheduler.jobs = map[string]*JobManager{} + scheduler.jobMasters = map[string]*JobManager{} scheduler.history = []*Job{} scheduler.nextQueue = "default" scheduler.queues = map[string][]Job{} @@ -94,7 +94,7 @@ func (scheduler *SchedulerCapacity) Start() { scheduler.queues[queue] = scheduler.queues[queue][1:] jm.scheduler = scheduler - scheduler.jobs[jm.job.Name] = &jm + scheduler.jobMasters[jm.job.Name] = &jm jm.job.Status = Starting scheduler.historyMu.Lock() @@ -257,7 +257,7 @@ func (scheduler *SchedulerCapacity) ReleaseResource(job Job, agent NodeStatus) { func (scheduler *SchedulerCapacity) QueryState(jobName string) MsgJobStatus { scheduler.queuesMu.Lock() - jm, ok := scheduler.jobs[jobName] + jm, ok := scheduler.jobMasters[jobName] scheduler.queuesMu.Unlock() if !ok { return MsgJobStatus{Code: 1, Error: "Job not exist!"} @@ -267,7 +267,7 @@ func (scheduler *SchedulerCapacity) QueryState(jobName string) MsgJobStatus { func (scheduler *SchedulerCapacity) Stop(jobName string) MsgStop { scheduler.queuesMu.Lock() - jm, ok := scheduler.jobs[jobName] + jm, ok := scheduler.jobMasters[jobName] scheduler.queuesMu.Unlock() if !ok { return MsgStop{Code: 1, Error: "Job not exist!"} @@ -277,7 +277,7 @@ func (scheduler *SchedulerCapacity) Stop(jobName string) MsgStop { func (scheduler *SchedulerCapacity) QueryLogs(jobName string, taskName string) MsgLog { scheduler.queuesMu.Lock() - jm, ok := scheduler.jobs[jobName] + jm, ok := scheduler.jobMasters[jobName] scheduler.queuesMu.Unlock() if !ok { return MsgLog{Code: 1, Error: "Job not exist!"} diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go index 07d6938..9e3e40d 100644 --- a/src/scheduler_fair.go +++ b/src/scheduler_fair.go @@ -11,9 +11,9 @@ type SchedulerFair struct { history []*Job historyMu sync.Mutex - jobs map[string]*JobManager - queues map[string]JobList - queuesMu sync.Mutex + jobMasters map[string]*JobManager + queues map[string]JobList + queuesMu sync.Mutex drfyarn bool enableBorrow bool @@ -58,7 +58,7 @@ func (jobs JobList) Swap(i, j int) { func (scheduler *SchedulerFair) Start() { log.Info("JS (fairness) started") - scheduler.jobs = map[string]*JobManager{} + scheduler.jobMasters = map[string]*JobManager{} scheduler.history = []*Job{} scheduler.queues = map[string]JobList{} scheduler.queues["default"] = []Job{} @@ -335,7 +335,7 @@ func (scheduler *SchedulerFair) Start() { jm.scheduler = scheduler jm.job.Status = Starting - scheduler.jobs[jm.job.Name] = &jm + scheduler.jobMasters[jm.job.Name] = &jm scheduler.queues[bestQueue] = scheduler.queues[bestQueue][1:] scheduler.historyMu.Lock() @@ -681,7 +681,7 @@ func (scheduler *SchedulerFair) UpdateQuota() { func (scheduler *SchedulerFair) QueryState(jobName string) MsgJobStatus { scheduler.queuesMu.Lock() - jm, ok := scheduler.jobs[jobName] + jm, ok := scheduler.jobMasters[jobName] scheduler.queuesMu.Unlock() if !ok { return MsgJobStatus{Code: 1, Error: "Job not exist!"} @@ -692,7 +692,7 @@ func (scheduler *SchedulerFair) QueryState(jobName string) MsgJobStatus { func (scheduler *SchedulerFair) Stop(jobName string) MsgStop { log.Info("Stop job ", jobName) scheduler.queuesMu.Lock() - jm, ok := scheduler.jobs[jobName] + jm, ok := scheduler.jobMasters[jobName] scheduler.queuesMu.Unlock() if ok { return jm.stop() @@ -727,7 +727,7 @@ func (scheduler *SchedulerFair) Stop(jobName string) MsgStop { func (scheduler *SchedulerFair) QueryLogs(jobName string, taskName string) MsgLog { scheduler.queuesMu.Lock() - jm, ok := scheduler.jobs[jobName] + jm, ok := scheduler.jobMasters[jobName] scheduler.queuesMu.Unlock() if !ok { return MsgLog{Code: 1, Error: "Job not exist!"} diff --git a/src/scheduler_priority.go b/src/scheduler_priority.go index 381bea9..a1bb5cf 100644 --- a/src/scheduler_priority.go +++ b/src/scheduler_priority.go @@ -17,13 +17,13 @@ type SchedulerPriority struct { schedulingJobs map[string]bool schedulingMu sync.Mutex - jobs map[string]*JobManager + jobMasters map[string]*JobManager enabled bool parallelism int } func (scheduler *SchedulerPriority) Start() { - scheduler.jobs = map[string]*JobManager{} + scheduler.jobMasters = map[string]*JobManager{} scheduler.history = []*Job{} scheduler.enabled = true scheduler.parallelism = 1 @@ -58,7 +58,7 @@ func (scheduler *SchedulerPriority) Start() { jm.job = scheduler.queue[0] scheduler.queue = scheduler.queue[1:] jm.scheduler = scheduler - scheduler.jobs[jm.job.Name] = &jm + scheduler.jobMasters[jm.job.Name] = &jm jm.job.Status = Starting scheduler.historyMu.Lock() @@ -136,7 +136,7 @@ func (scheduler *SchedulerPriority) Start() { copy(scheduler.queue[idx+1:], scheduler.queue[idx:]) scheduler.queue[idx] = preempted - delete(scheduler.jobs, preempted.Name) + delete(scheduler.jobMasters, preempted.Name) for { after := InstanceOfResourcePool().UsingGPU @@ -240,7 +240,7 @@ func (scheduler *SchedulerPriority) ReleaseResource(job Job, agent NodeStatus) { } func (scheduler *SchedulerPriority) QueryState(jobName string) MsgJobStatus { - jm, ok := scheduler.jobs[jobName] + jm, ok := scheduler.jobMasters[jobName] if !ok { return MsgJobStatus{Code: 1, Error: "Job not exist!"} } @@ -248,7 +248,7 @@ func (scheduler *SchedulerPriority) QueryState(jobName string) MsgJobStatus { } func (scheduler *SchedulerPriority) Stop(jobName string) MsgStop { - jm, ok := scheduler.jobs[jobName] + jm, ok := scheduler.jobMasters[jobName] if !ok { return MsgStop{Code: 1, Error: "Job not exist!"} } @@ -256,7 +256,7 @@ func (scheduler *SchedulerPriority) Stop(jobName string) MsgStop { } func (scheduler *SchedulerPriority) QueryLogs(jobName string, taskName string) MsgLog { - jm, ok := scheduler.jobs[jobName] + jm, ok := scheduler.jobMasters[jobName] if !ok { return MsgLog{Code: 1, Error: "Job not exist!"} }