Mirror of https://github.com/newnius/YAO-scheduler.git
commit 6730dfc58e
parent ca3ac7aea1
Date: 2020-04-14 00:06:15 +08:00


@@ -16,17 +16,18 @@ type ResourceCount struct {
 }
 
 type SchedulerFair struct {
 	history []*Job
+	historyMu sync.Mutex
 	queues map[string][]Job
 	queueMu sync.Mutex
 	schedulingMu sync.Mutex
 	schedulingJobsCnt int
 	jobs map[string]*JobManager
 	nextQueue string
 	resourceAllocations map[string]*ResourceCount
+	resourceAllocationsMu sync.Mutex
 	enabled bool
-	latestPoolIndex int
 	parallelism int
 }
 
 type FairJobSorter []Job
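
Both new fields follow the usual Go pattern of giving each piece of shared state its own sync.Mutex: historyMu guards the history slice and resourceAllocationsMu guards the resourceAllocations map. Below is a minimal, self-contained sketch of that pattern; the toyScheduler type and its fields are invented for illustration and are not the project's code.

package main

import (
	"fmt"
	"sync"
)

// toyScheduler is a hypothetical stand-in for SchedulerFair: each shared
// field gets its own mutex, so unrelated operations do not contend.
type toyScheduler struct {
	history   []string
	historyMu sync.Mutex

	allocations   map[string]int
	allocationsMu sync.Mutex
}

// addHistory appends under historyMu, mirroring how Start() guards
// scheduler.history in this commit.
func (s *toyScheduler) addHistory(job string) {
	s.historyMu.Lock()
	s.history = append(s.history, job)
	s.historyMu.Unlock()
}

// addAllocation mutates the map under allocationsMu, mirroring the
// bookkeeping in AcquireResource/ReleaseResource.
func (s *toyScheduler) addAllocation(group string, n int) {
	s.allocationsMu.Lock()
	defer s.allocationsMu.Unlock()
	s.allocations[group] += n
}

func main() {
	s := &toyScheduler{allocations: map[string]int{}}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.addHistory(fmt.Sprintf("job-%d", i))
			s.addAllocation("default", 1)
		}(i)
	}
	wg.Wait()
	fmt.Println(len(s.history), s.allocations["default"]) // 10 10
}
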
@@ -79,7 +80,9 @@ func (scheduler *SchedulerFair) Start() {
 			scheduler.jobs[jm.job.Name] = &jm
 			jm.job.Status = Starting
+			scheduler.historyMu.Lock()
 			scheduler.history = append(scheduler.history, &jm.job)
+			scheduler.historyMu.Unlock()
 
 			go func() {
 				jm.start()
@@ -100,6 +103,9 @@ func (scheduler *SchedulerFair) Start() {
 }
 
 func (scheduler *SchedulerFair) UpdateProgress(jobName string, state State) {
+	scheduler.historyMu.Lock()
+	defer scheduler.historyMu.Unlock()
+
 	switch state {
 	case Running:
 		scheduler.schedulingMu.Lock()
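
UpdateProgress now takes historyMu at the top and defers the unlock, so every branch of the switch below releases the lock on return. A rough sketch of that lock-then-defer shape, using a made-up tracker type rather than the real State and Job types:

package main

import (
	"fmt"
	"sync"
)

type tracker struct {
	mu      sync.Mutex
	running int
}

// update locks once and defers the unlock, so each case can return early
// without leaking the lock -- the same shape UpdateProgress takes here.
func (t *tracker) update(state string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	switch state {
	case "Running":
		t.running++
	case "Finished":
		t.running--
	default:
		return // lock is still released by the defer
	}
}

func main() {
	t := &tracker{}
	t.update("Running")
	t.update("unknown")
	fmt.Println(t.running) // 1
}
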
@@ -215,6 +221,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 			if len(res.Status) == 0 {
 				return
 			}
+			scheduler.resourceAllocationsMu.Lock()
 			if _, ok := scheduler.resourceAllocations[job.Group]; !ok {
 				scheduler.resourceAllocations[job.Group] = &ResourceCount{}
 			}
@@ -225,6 +232,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 				cnt.NumberGPU ++
 				cnt.MemoryGPU += v.MemoryTotal
 			}
+			scheduler.resourceAllocationsMu.Unlock()
 			scheduler.UpdateNextQueue()
 		}(res)
@@ -250,6 +258,7 @@ func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
 		}
 	}
 	go func(res NodeStatus) {
+		scheduler.resourceAllocationsMu.Lock()
 		if _, ok := scheduler.resourceAllocations[job.Group]; !ok {
 			scheduler.resourceAllocations[job.Group] = &ResourceCount{}
 		}
@@ -260,6 +269,7 @@ func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
 			cnt.NumberGPU --
 			cnt.MemoryGPU -= v.MemoryTotal
 		}
+		scheduler.resourceAllocationsMu.Unlock()
 		scheduler.UpdateNextQueue()
 	}(agent)
 }
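
In both AcquireResource and ReleaseResource the per-group accounting runs inside a goroutine, so the new mutex is what keeps a concurrent increment and decrement on the same ResourceCount from racing with each other and with the map insert. A hedged sketch of that shape, with invented toy/count types standing in for the scheduler and ResourceCount:

package main

import (
	"fmt"
	"sync"
)

type count struct{ NumberGPU int }

type toy struct {
	mu     sync.Mutex
	counts map[string]*count
}

// acquire mimics the goroutine in AcquireResource: create the group entry
// if needed, then bump the counter, all under the mutex.
func (t *toy) acquire(group string, wg *sync.WaitGroup) {
	go func() {
		defer wg.Done()
		t.mu.Lock()
		if _, ok := t.counts[group]; !ok {
			t.counts[group] = &count{}
		}
		t.counts[group].NumberGPU++
		t.mu.Unlock()
	}()
}

// release mirrors ReleaseResource with the opposite sign.
func (t *toy) release(group string, wg *sync.WaitGroup) {
	go func() {
		defer wg.Done()
		t.mu.Lock()
		if _, ok := t.counts[group]; !ok {
			t.counts[group] = &count{}
		}
		t.counts[group].NumberGPU--
		t.mu.Unlock()
	}()
}

func main() {
	t := &toy{counts: map[string]*count{}}
	var wg sync.WaitGroup
	wg.Add(200)
	for i := 0; i < 100; i++ {
		t.acquire("g", &wg)
		t.release("g", &wg)
	}
	wg.Wait()
	fmt.Println(t.counts["g"].NumberGPU) // 0
}
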
@@ -290,9 +300,11 @@ func (scheduler *SchedulerFair) QueryLogs(jobName string, taskName string) MsgLo
 
 func (scheduler *SchedulerFair) ListJobs() MsgJobList {
 	var jobs []Job
+	scheduler.historyMu.Lock()
 	for _, job := range scheduler.history {
 		jobs = append(jobs, *job)
 	}
+	scheduler.historyMu.Unlock()
 	var tmp []Job
 	for _, v := range scheduler.queues {
 		tmp = append(tmp, v...)
@@ -311,9 +323,11 @@ func (scheduler *SchedulerFair) Summary() MsgSummary {
 	pendingJobsCounter := 0
 	var tmp []Job
+	scheduler.historyMu.Lock()
 	for _, job := range scheduler.history {
 		tmp = append(tmp, *job)
 	}
+	scheduler.historyMu.Unlock()
 	for _, v := range scheduler.queues {
 		tmp = append(tmp, v...)
 	}
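
ListJobs and Summary only read history, but an iteration can still race with a concurrent append, so both now copy the slice while holding historyMu and do the rest of the work on the snapshot. A small sketch of that read-side pattern, using an invented job type in place of the real Job:

package main

import (
	"fmt"
	"sync"
)

type job struct{ Name string }

type sched struct {
	historyMu sync.Mutex
	history   []*job
}

// snapshot copies the shared slice while holding the lock; callers can then
// iterate the copy without blocking writers, which is the shape ListJobs and
// Summary take in this commit.
func (s *sched) snapshot() []job {
	s.historyMu.Lock()
	defer s.historyMu.Unlock()
	out := make([]job, 0, len(s.history))
	for _, j := range s.history {
		out = append(out, *j)
	}
	return out
}

func main() {
	s := &sched{}
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.historyMu.Lock()
			s.history = append(s.history, &job{Name: fmt.Sprintf("job-%d", i)})
			s.historyMu.Unlock()
		}(i)
	}
	wg.Wait()
	fmt.Println(len(s.snapshot())) // 5
}
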
@@ -393,6 +407,7 @@ func (scheduler *SchedulerFair) UpdateNextQueue() {
 		if len(t) == 0 {
 			continue
 		}
+		scheduler.resourceAllocationsMu.Lock()
 		if _, ok := scheduler.resourceAllocations[k]; !ok {
 			scheduler.resourceAllocations[k] = &ResourceCount{}
 		}
@@ -403,6 +418,7 @@ func (scheduler *SchedulerFair) UpdateNextQueue() {
 		tmp += float64(v.Memory) / Memory
 		tmp += float64(v.NumberGPU) / NumberGPU
 		tmp += float64(v.MemoryGPU) / MemoryGPU
+		scheduler.resourceAllocationsMu.Unlock()
 		tmp /= 4
 		weight := 10
 		if g, ok2 := InstanceOfGroupManager().groups[k]; !ok2 {