1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-07 14:21:55 +00:00
YAO-scheduler/src/scheduler_capacity.go

414 lines
10 KiB
Go
Raw Normal View History

2019-07-29 07:01:59 +00:00
package main
import (
"sync"
"time"
log "github.com/sirupsen/logrus"
2019-08-01 03:11:37 +00:00
"sort"
2020-05-24 13:07:02 +00:00
"math"
2019-08-01 05:42:53 +00:00
)
2020-05-25 09:42:14 +00:00
type SchedulerCapacity struct {
2020-05-03 03:04:17 +00:00
history []*Job
historyMu sync.Mutex
nextQueue string
jobs map[string]*JobManager
queues map[string][]Job
2020-05-28 03:44:15 +00:00
queuesMu sync.Mutex
2020-05-03 03:04:17 +00:00
2020-05-25 05:44:54 +00:00
schedulingJobs map[string]bool
schedulingMu sync.Mutex
2020-05-03 03:04:17 +00:00
2020-04-13 16:06:15 +00:00
resourceAllocations map[string]*ResourceCount
resourceAllocationsMu sync.Mutex
2020-05-03 03:04:17 +00:00
enabled bool
parallelism int
2020-04-30 09:52:52 +00:00
2020-04-30 15:06:12 +00:00
allocatingGPU int
allocatingGPUMu sync.Mutex
2019-08-01 03:11:37 +00:00
}
2020-05-25 09:42:14 +00:00
func (scheduler *SchedulerCapacity) Start() {
2020-05-28 01:40:10 +00:00
log.Info("JS (capacity) started")
2020-05-04 06:04:32 +00:00
2019-07-29 07:01:59 +00:00
scheduler.jobs = map[string]*JobManager{}
scheduler.history = []*Job{}
2019-08-01 03:11:37 +00:00
scheduler.nextQueue = "default"
2019-08-01 03:14:14 +00:00
scheduler.queues = map[string][]Job{}
2019-08-01 03:13:14 +00:00
scheduler.queues["default"] = []Job{}
2019-08-01 06:00:24 +00:00
scheduler.resourceAllocations = map[string]*ResourceCount{}
2020-04-13 10:37:54 +00:00
scheduler.enabled = true
2020-05-25 05:44:54 +00:00
scheduler.schedulingJobs = map[string]bool{}
2020-04-30 15:06:12 +00:00
scheduler.allocatingGPU = 0
2019-07-29 07:01:59 +00:00
2020-04-13 15:53:38 +00:00
scheduler.parallelism = 1
2019-07-29 07:01:59 +00:00
go func() {
2020-05-03 03:04:17 +00:00
flag := true
2019-07-29 07:01:59 +00:00
for {
2019-08-01 05:42:53 +00:00
log.Debug("Scheduling")
2020-05-03 03:04:17 +00:00
if !flag {
2020-05-03 03:33:22 +00:00
time.Sleep(time.Millisecond * 100)
2020-05-03 03:04:17 +00:00
}
flag = false
2020-04-13 10:26:40 +00:00
if !scheduler.enabled {
2020-04-13 15:41:01 +00:00
time.Sleep(time.Millisecond * 100)
2020-04-13 10:26:40 +00:00
continue
}
2020-04-13 14:35:17 +00:00
scheduler.schedulingMu.Lock()
2020-05-25 05:44:54 +00:00
if len(scheduler.schedulingJobs) >= scheduler.parallelism {
2020-04-13 14:35:17 +00:00
scheduler.schedulingMu.Unlock()
2020-04-13 15:41:01 +00:00
time.Sleep(time.Millisecond * 100)
2020-04-13 14:35:17 +00:00
continue
}
scheduler.schedulingMu.Unlock()
2020-05-03 03:04:17 +00:00
2020-05-28 01:40:10 +00:00
scheduler.queuesMu.Lock()
2019-08-01 03:11:37 +00:00
queue := scheduler.nextQueue
2020-05-03 03:31:45 +00:00
go func() {
scheduler.UpdateNextQueue()
}()
2019-08-01 02:42:37 +00:00
if len(scheduler.queues[queue]) > 0 {
2019-07-29 07:01:59 +00:00
jm := JobManager{}
2019-08-01 02:42:37 +00:00
jm.job = scheduler.queues[queue][0]
2019-07-29 07:01:59 +00:00
2020-04-30 15:06:12 +00:00
cnt := 0
for _, task := range jm.job.Tasks {
cnt += task.NumberGPU
}
2020-05-03 02:30:12 +00:00
2020-05-24 13:07:02 +00:00
pool := InstanceOfResourcePool()
2020-05-25 09:25:02 +00:00
log.Info(cnt, pool.TotalGPU, pool.UsingGPU, scheduler.allocatingGPU)
if len(scheduler.schedulingJobs) > 1 && (cnt*10+(scheduler.allocatingGPU)*13 > (pool.TotalGPU-pool.UsingGPU)*10) {
2020-05-28 01:40:10 +00:00
scheduler.queuesMu.Unlock()
2020-05-03 03:04:17 +00:00
continue
2020-04-30 15:06:12 +00:00
}
2020-05-03 03:04:17 +00:00
flag = true
2020-04-30 15:06:12 +00:00
scheduler.allocatingGPUMu.Lock()
scheduler.allocatingGPU += cnt
scheduler.allocatingGPUMu.Unlock()
log.Info("allocatingGPU is ", scheduler.allocatingGPU)
2020-05-25 05:44:54 +00:00
log.Info("schedulingJobs are ", scheduler.schedulingJobs)
2020-04-30 15:06:12 +00:00
2019-08-01 02:42:37 +00:00
scheduler.queues[queue] = scheduler.queues[queue][1:]
2019-07-29 07:01:59 +00:00
jm.scheduler = scheduler
scheduler.jobs[jm.job.Name] = &jm
jm.job.Status = Starting
2020-04-13 16:06:15 +00:00
scheduler.historyMu.Lock()
2019-07-29 07:01:59 +00:00
scheduler.history = append(scheduler.history, &jm.job)
2020-04-13 16:06:15 +00:00
scheduler.historyMu.Unlock()
2019-07-29 07:01:59 +00:00
2020-05-25 05:44:54 +00:00
scheduler.schedulingMu.Lock()
scheduler.schedulingJobs[jm.job.Name] = true
scheduler.schedulingMu.Unlock()
2019-07-29 07:01:59 +00:00
go func() {
jm.start()
}()
} else {
2020-04-30 13:22:21 +00:00
log.Debug("No more jobs to scheduling ", time.Now())
2019-07-29 07:01:59 +00:00
}
2020-05-28 01:40:10 +00:00
scheduler.queuesMu.Unlock()
2019-07-29 07:01:59 +00:00
}
}()
}
2020-05-25 09:42:14 +00:00
func (scheduler *SchedulerCapacity) UpdateProgress(job Job, state State) {
2020-04-13 16:06:15 +00:00
scheduler.historyMu.Lock()
defer scheduler.historyMu.Unlock()
2020-05-25 05:44:54 +00:00
scheduler.schedulingMu.Lock()
delete(scheduler.schedulingJobs, job.Name)
scheduler.schedulingMu.Unlock()
2019-07-29 07:01:59 +00:00
switch state {
case Running:
for i := range scheduler.history {
2020-05-02 16:14:08 +00:00
if scheduler.history[i].Name == job.Name {
2019-07-29 07:01:59 +00:00
scheduler.history[i].Status = Running
2020-04-12 10:42:55 +00:00
scheduler.history[i].UpdatedAt = int(time.Now().Unix())
2019-07-29 07:01:59 +00:00
}
}
break
case Finished:
for i := range scheduler.history {
2020-05-02 16:14:08 +00:00
if scheduler.history[i].Name == job.Name {
2019-07-29 07:01:59 +00:00
scheduler.history[i].Status = Finished
2020-04-12 10:42:55 +00:00
scheduler.history[i].UpdatedAt = int(time.Now().Unix())
2019-07-29 07:01:59 +00:00
}
}
break
case Stopped:
for i := range scheduler.history {
2020-05-02 16:14:08 +00:00
if scheduler.history[i].Name == job.Name {
2019-07-29 07:01:59 +00:00
scheduler.history[i].Status = Stopped
2020-04-12 10:42:55 +00:00
scheduler.history[i].UpdatedAt = int(time.Now().Unix())
2019-07-29 07:01:59 +00:00
}
}
break
2020-05-04 05:59:01 +00:00
case Failed:
for i := range scheduler.history {
if scheduler.history[i].Name == job.Name {
scheduler.history[i].Status = Failed
scheduler.history[i].UpdatedAt = int(time.Now().Unix())
}
}
break
2019-07-29 07:01:59 +00:00
}
}
2020-05-25 09:42:14 +00:00
func (scheduler *SchedulerCapacity) Schedule(job Job) {
2020-05-28 01:40:10 +00:00
scheduler.queuesMu.Lock()
defer scheduler.queuesMu.Unlock()
2019-07-29 07:01:59 +00:00
2019-08-01 03:11:37 +00:00
queue := job.Group
2019-08-01 02:42:37 +00:00
_, ok := scheduler.queues[queue]
if !ok {
if InstanceOfGroupManager().get(queue) != nil {
scheduler.queues[queue] = []Job{}
} else {
queue = "default"
}
}
2019-07-29 07:01:59 +00:00
2019-08-01 02:42:37 +00:00
index := 0
2019-07-29 07:01:59 +00:00
left := 0
2019-08-01 03:17:57 +00:00
right := len(scheduler.queues[queue]) - 1
2019-07-29 07:01:59 +00:00
for ; left <= right; {
mid := (left + right) / 2
2019-08-01 02:42:37 +00:00
if scheduler.queues[queue][left].Priority < job.Priority {
2019-07-29 07:01:59 +00:00
index = left
break
}
2019-08-01 02:42:37 +00:00
if scheduler.queues[queue][right].Priority >= job.Priority {
2019-07-29 07:01:59 +00:00
index = right + 1
break
}
2019-08-01 02:42:37 +00:00
if scheduler.queues[queue][mid].Priority >= job.Priority {
2019-07-29 07:01:59 +00:00
left = mid + 1
} else {
right = mid - 1
}
}
2019-08-01 02:42:37 +00:00
scheduler.queues[queue] = append(scheduler.queues[queue], Job{})
2019-07-29 07:01:59 +00:00
2019-08-01 02:42:37 +00:00
copy(scheduler.queues[queue][index+1:], scheduler.queues[queue][index:])
scheduler.queues[queue][index] = job
2019-07-29 07:01:59 +00:00
job.Status = Created
}
2020-05-25 09:42:14 +00:00
func (scheduler *SchedulerCapacity) AcquireResource(job Job) []NodeStatus {
2020-05-24 13:07:02 +00:00
res := InstanceOfResourcePool().acquireResource(job)
2020-04-30 06:44:02 +00:00
2020-05-24 13:07:02 +00:00
if len(res) != 0 {
for _, task := range job.Tasks {
2020-04-30 06:04:40 +00:00
2020-05-24 13:07:02 +00:00
scheduler.allocatingGPUMu.Lock()
scheduler.allocatingGPU -= task.NumberGPU
scheduler.allocatingGPUMu.Unlock()
2020-04-13 15:03:34 +00:00
}
2020-05-24 13:07:02 +00:00
log.Info("allocatingGPU is ", scheduler.allocatingGPU)
2020-04-30 10:50:50 +00:00
2020-05-24 13:07:02 +00:00
go func(nodes []NodeStatus) {
for _, node := range nodes {
scheduler.resourceAllocationsMu.Lock()
if _, ok := scheduler.resourceAllocations[job.Group]; !ok {
scheduler.resourceAllocations[job.Group] = &ResourceCount{}
2020-04-30 09:52:52 +00:00
}
2020-05-24 13:07:02 +00:00
cnt, _ := scheduler.resourceAllocations[job.Group]
cnt.CPU += node.MemTotal
cnt.Memory += node.NumCPU
for _, v := range node.Status {
cnt.NumberGPU ++
cnt.MemoryGPU += v.MemoryTotal
2020-04-30 09:52:52 +00:00
}
2020-05-24 13:07:02 +00:00
scheduler.resourceAllocationsMu.Unlock()
scheduler.UpdateNextQueue()
2020-04-30 09:52:52 +00:00
}
2020-05-24 13:07:02 +00:00
}(res)
2020-04-30 11:31:26 +00:00
}
2020-04-30 09:52:52 +00:00
2019-07-29 07:01:59 +00:00
return res
}
2020-05-25 09:42:14 +00:00
func (scheduler *SchedulerCapacity) ReleaseResource(job Job, agent NodeStatus) {
2020-05-24 13:07:02 +00:00
InstanceOfResourcePool().releaseResource(job, agent)
2020-05-28 11:38:46 +00:00
scheduler.resourceAllocationsMu.Lock()
if _, ok := scheduler.resourceAllocations[job.Group]; !ok {
scheduler.resourceAllocations[job.Group] = &ResourceCount{}
}
cnt, _ := scheduler.resourceAllocations[job.Group]
cnt.CPU -= agent.MemTotal
cnt.Memory -= agent.NumCPU
for _, v := range agent.Status {
cnt.NumberGPU --
cnt.MemoryGPU -= v.MemoryTotal
}
scheduler.resourceAllocationsMu.Unlock()
2019-08-01 05:42:53 +00:00
go func(res NodeStatus) {
scheduler.UpdateNextQueue()
}(agent)
2019-07-29 07:01:59 +00:00
}
2020-05-25 09:42:14 +00:00
func (scheduler *SchedulerCapacity) QueryState(jobName string) MsgJobStatus {
2020-05-28 01:40:10 +00:00
scheduler.queuesMu.Lock()
2019-07-29 07:01:59 +00:00
jm, ok := scheduler.jobs[jobName]
2020-05-28 01:40:10 +00:00
scheduler.queuesMu.Unlock()
2019-07-29 07:01:59 +00:00
if !ok {
return MsgJobStatus{Code: 1, Error: "Job not exist!"}
}
return jm.status()
}
2020-05-25 09:42:14 +00:00
func (scheduler *SchedulerCapacity) Stop(jobName string) MsgStop {
2020-05-28 01:40:10 +00:00
scheduler.queuesMu.Lock()
2019-07-29 07:01:59 +00:00
jm, ok := scheduler.jobs[jobName]
2020-05-28 01:40:10 +00:00
scheduler.queuesMu.Unlock()
2019-07-29 07:01:59 +00:00
if !ok {
return MsgStop{Code: 1, Error: "Job not exist!"}
}
2020-05-25 03:35:44 +00:00
return jm.stop(true)
2019-07-29 07:01:59 +00:00
}
2020-05-25 09:42:14 +00:00
func (scheduler *SchedulerCapacity) QueryLogs(jobName string, taskName string) MsgLog {
2020-05-28 01:40:10 +00:00
scheduler.queuesMu.Lock()
2019-07-29 07:01:59 +00:00
jm, ok := scheduler.jobs[jobName]
2020-05-28 01:40:10 +00:00
scheduler.queuesMu.Unlock()
2019-07-29 07:01:59 +00:00
if !ok {
return MsgLog{Code: 1, Error: "Job not exist!"}
}
return jm.logs(taskName)
}
2020-05-25 09:42:14 +00:00
func (scheduler *SchedulerCapacity) ListJobs() MsgJobList {
2019-08-01 03:11:37 +00:00
var jobs []Job
2020-04-13 16:06:15 +00:00
scheduler.historyMu.Lock()
2019-07-29 07:01:59 +00:00
for _, job := range scheduler.history {
2019-08-01 03:11:37 +00:00
jobs = append(jobs, *job)
2019-07-29 07:01:59 +00:00
}
2020-04-13 16:06:15 +00:00
scheduler.historyMu.Unlock()
2019-08-01 03:11:37 +00:00
var tmp []Job
for _, v := range scheduler.queues {
tmp = append(tmp, v...)
}
2020-06-03 08:50:56 +00:00
sort.Sort(JobSorter(tmp))
2019-08-01 03:11:37 +00:00
jobs = append(jobs, tmp...)
return MsgJobList{Code: 0, Jobs: jobs}
2019-07-29 07:01:59 +00:00
}
2020-05-25 09:42:14 +00:00
func (scheduler *SchedulerCapacity) Summary() MsgSummary {
2019-07-29 07:01:59 +00:00
summary := MsgSummary{}
summary.Code = 0
finishedJobsCounter := 0
runningJobsCounter := 0
pendingJobsCounter := 0
var tmp []Job
2020-04-13 16:06:15 +00:00
scheduler.historyMu.Lock()
2019-07-29 07:01:59 +00:00
for _, job := range scheduler.history {
tmp = append(tmp, *job)
}
2020-04-13 16:06:15 +00:00
scheduler.historyMu.Unlock()
2020-05-03 03:04:17 +00:00
2020-05-28 01:40:10 +00:00
scheduler.queuesMu.Lock()
2019-08-01 03:11:37 +00:00
for _, v := range scheduler.queues {
tmp = append(tmp, v...)
}
2020-05-28 01:40:10 +00:00
scheduler.queuesMu.Unlock()
2019-07-29 07:01:59 +00:00
for _, job := range tmp {
switch job.Status {
case Created:
pendingJobsCounter++
case Starting:
pendingJobsCounter++
break
case Running:
runningJobsCounter++
2020-05-24 13:07:02 +00:00
break
2019-07-29 07:01:59 +00:00
case Finished:
finishedJobsCounter++
case Stopped:
finishedJobsCounter++
}
}
summary.JobsFinished = finishedJobsCounter
summary.JobsPending = pendingJobsCounter
summary.JobsRunning = runningJobsCounter
2020-05-24 13:07:02 +00:00
summary.FreeGPU, summary.UsingGPU = InstanceOfResourcePool().countGPU()
2019-07-29 07:01:59 +00:00
return summary
}
2020-05-25 09:42:14 +00:00
func (scheduler *SchedulerCapacity) UpdateNextQueue() {
2019-08-01 05:42:53 +00:00
next := "default"
2020-05-24 13:07:02 +00:00
quota := math.MaxFloat64
NumberGPU := float64(InstanceOfResourcePool().TotalGPU) + 0.00001
2019-08-01 05:42:53 +00:00
2020-05-28 01:40:10 +00:00
scheduler.queuesMu.Lock()
2019-08-01 06:07:51 +00:00
for k, t := range scheduler.queues {
if len(t) == 0 {
2019-08-01 06:03:17 +00:00
continue
}
2020-04-13 16:06:15 +00:00
scheduler.resourceAllocationsMu.Lock()
2019-08-01 06:07:51 +00:00
if _, ok := scheduler.resourceAllocations[k]; !ok {
scheduler.resourceAllocations[k] = &ResourceCount{}
}
v := scheduler.resourceAllocations[k]
2020-05-24 13:07:02 +00:00
tmp := float64(v.NumberGPU) / NumberGPU
2020-04-13 16:06:15 +00:00
scheduler.resourceAllocationsMu.Unlock()
2019-10-24 05:31:03 +00:00
weight := 10
if g, ok2 := InstanceOfGroupManager().groups[k]; !ok2 {
weight = g.Weight
}
tmp /= float64(weight)
2019-08-01 05:42:53 +00:00
if tmp < quota {
quota = tmp
next = k
2019-08-01 03:11:37 +00:00
}
}
2019-08-01 05:42:53 +00:00
scheduler.nextQueue = next
2020-05-28 01:40:10 +00:00
scheduler.queuesMu.Unlock()
2020-05-03 16:48:16 +00:00
log.Debug("updateNextQueue ->", next)
2019-08-01 03:11:37 +00:00
}
2020-04-11 03:38:04 +00:00
2020-05-25 09:42:14 +00:00
func (scheduler *SchedulerCapacity) Enable() bool {
2020-04-13 10:26:40 +00:00
scheduler.enabled = true
2020-04-30 13:22:21 +00:00
log.Info("scheduler is enabled ", time.Now())
2020-04-13 10:37:54 +00:00
return true
2020-04-13 10:26:40 +00:00
}
2020-05-25 09:42:14 +00:00
func (scheduler *SchedulerCapacity) Disable() bool {
2020-04-13 10:26:40 +00:00
scheduler.enabled = false
2020-04-30 13:22:21 +00:00
log.Info("scheduler is disabled ", time.Now())
2020-04-13 10:37:54 +00:00
return true
2020-04-13 10:26:40 +00:00
}
2020-04-13 15:53:38 +00:00
2020-05-25 09:42:14 +00:00
func (scheduler *SchedulerCapacity) UpdateParallelism(parallelism int) bool {
2020-04-13 15:53:38 +00:00
scheduler.parallelism = parallelism
2020-04-30 13:22:21 +00:00
log.Info("parallelism is updated to ", parallelism)
2020-04-13 15:53:38 +00:00
return true
}
2020-04-30 09:52:52 +00:00
2020-05-25 09:42:14 +00:00
func (scheduler *SchedulerCapacity) updateGroup(group Group) bool {
2020-05-03 02:30:12 +00:00
return true
}
2020-05-28 03:44:15 +00:00
// DebugDump returns a snapshot of the scheduler's internal state under the
// keys "nextQueue", "schedulingJobs", "resourceAllocations" and
// "allocatingGPU".
//
// Each value is read under the mutex used for it elsewhere in this type, and
// the maps are copied so that the caller can inspect or serialise them while
// the scheduler keeps running.
func (scheduler *SchedulerCapacity) DebugDump() map[string]interface{} {
	res := map[string]interface{}{}

	scheduler.queuesMu.Lock()
	res["nextQueue"] = scheduler.nextQueue
	scheduler.queuesMu.Unlock()

	scheduler.schedulingMu.Lock()
	scheduling := make(map[string]bool, len(scheduler.schedulingJobs))
	for name, active := range scheduler.schedulingJobs {
		scheduling[name] = active
	}
	scheduler.schedulingMu.Unlock()
	res["schedulingJobs"] = scheduling

	scheduler.resourceAllocationsMu.Lock()
	allocations := make(map[string]*ResourceCount, len(scheduler.resourceAllocations))
	for group, cnt := range scheduler.resourceAllocations {
		if cnt == nil {
			allocations[group] = nil
			continue
		}
		// Copy the counters so later updates do not show through.
		snapshot := *cnt
		allocations[group] = &snapshot
	}
	scheduler.resourceAllocationsMu.Unlock()
	res["resourceAllocations"] = allocations

	scheduler.allocatingGPUMu.Lock()
	res["allocatingGPU"] = scheduler.allocatingGPU
	scheduler.allocatingGPUMu.Unlock()

	return res
}