mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-07 22:31:55 +00:00

update scheduler_fair

Newnius 2019-08-01 13:42:53 +08:00
parent e178154ca0
commit 74373fb950
6 changed files with 90 additions and 32 deletions

View File

@@ -54,8 +54,7 @@ func (gm *GroupManager) Remove(group Group) MsgGroupCreate {
func (gm *GroupManager) List() MsgGroupList {
defer gm.mu.Unlock()
gm.mu.Lock()
// cannot change to `var`, since it would be json_encoded to null
result := []Group{}
var result []Group
for _, v := range gm.groups {
result = append(result, v)
}
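For context on the in-code comment above: a Go slice declared with `var` is nil, and encoding/json marshals a nil slice to null, whereas an explicitly initialized empty slice marshals to []. A minimal standalone illustration (not part of the commit):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var nilSlice []string    // declared with var: nil
	emptySlice := []string{} // explicitly initialized: empty but non-nil

	a, _ := json.Marshal(nilSlice)
	b, _ := json.Marshal(emptySlice)
	fmt.Println(string(a)) // prints: null
	fmt.Println(string(b)) // prints: []
}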

View File

@@ -28,7 +28,7 @@ func (jm *JobManager) start() {
for i := range jm.job.Tasks {
var resource NodeStatus
for {
resource = jm.scheduler.AcquireResource(jm.job.Tasks[i])
resource = jm.scheduler.AcquireResource(jm.job, jm.job.Tasks[i])
if len(resource.Status) > 0 {
break
}
@@ -92,7 +92,7 @@ func (jm *JobManager) start() {
/* save logs etc. */
/* return resource */
jm.scheduler.ReleaseResource(jm.resources[i])
jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
fmt.Println("return resource ", jm.resources[i].ClientID)
}
}
@@ -175,7 +175,7 @@ func (jm *JobManager) stop() MsgStop {
}
for i := range jm.resources {
jm.scheduler.ReleaseResource(jm.resources[i])
jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
}
jm.scheduler.UpdateProgress(jm.job.Name, Stopped)
return MsgStop{Code: 0}
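All three JobManager call sites change the same way: the owning Job now travels with every AcquireResource/ReleaseResource call so the scheduler can charge usage to the job's group. Condensed from the hunks above, the lifecycle is roughly the sketch below (how the acquired NodeStatus is stored is not shown in this diff; the append is an assumption):

// Sketch of the acquire/run/release flow after this commit.
for i := range jm.job.Tasks {
	var resource NodeStatus
	for {
		// the scheduler needs jm.job to know which group to charge
		resource = jm.scheduler.AcquireResource(jm.job, jm.job.Tasks[i])
		if len(resource.Status) > 0 {
			break // got GPUs for this task
		}
	}
	jm.resources = append(jm.resources, resource)
}
// ... run the tasks ...
for i := range jm.resources {
	jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
}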

View File

@@ -7,9 +7,9 @@ type Scheduler interface {
UpdateProgress(jobName string, state State)
AcquireResource(Task) NodeStatus
AcquireResource(Job, Task) NodeStatus
ReleaseResource(NodeStatus)
ReleaseResource(Job, NodeStatus)
AcquireNetwork() string
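Assembled from the hunk above, the updated interface reads as follows (a reconstruction; methods outside this hunk are elided):

type Scheduler interface {
	UpdateProgress(jobName string, state State)
	// Job now accompanies the Task / NodeStatus so implementations can
	// attribute acquired and released resources to the job's group.
	AcquireResource(Job, Task) NodeStatus
	ReleaseResource(Job, NodeStatus)
	AcquireNetwork() string
	// ... other methods not shown in this hunk
}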

View File

@@ -83,7 +83,7 @@ func (scheduler *SchedulerFCFS) Schedule(job Job) {
job.Status = Created
}
func (scheduler *SchedulerFCFS) AcquireResource(task Task) NodeStatus {
func (scheduler *SchedulerFCFS) AcquireResource(job Job, task Task) NodeStatus {
pool.mu.Lock()
defer pool.mu.Unlock()
@@ -114,7 +114,7 @@ func (scheduler *SchedulerFCFS) AcquireResource(task Task) NodeStatus {
return res
}
func (scheduler *SchedulerFCFS) ReleaseResource(agent NodeStatus) {
func (scheduler *SchedulerFCFS) ReleaseResource(job Job, agent NodeStatus) {
pool.mu.Lock()
defer pool.mu.Unlock()
nodes := pool.nodes[agent.ClientID]

View File

@@ -5,15 +5,23 @@ import (
"time"
log "github.com/sirupsen/logrus"
"sort"
)
type ResourceCount struct {
NumberGPU int
MemoryGPU int
CPU int
Memory int
}
type SchedulerFair struct {
history []*Job
queues map[string][]Job
mu sync.Mutex
scheduling sync.Mutex
jobs map[string]*JobManager
nextQueue string
history []*Job
queues map[string][]Job
mu sync.Mutex
scheduling sync.Mutex
jobs map[string]*JobManager
nextQueue string
resourceAllocations map[string]ResourceCount
}
type FairJobSorter []Job
@@ -37,16 +45,16 @@ func (scheduler *SchedulerFair) Start() {
scheduler.nextQueue = "default"
scheduler.queues = map[string][]Job{}
scheduler.queues["default"] = []Job{}
scheduler.resourceAllocations = map[string]ResourceCount{}
go func() {
for {
log.Info("Scheduling")
log.Debug("Scheduling")
time.Sleep(time.Second * 5)
scheduler.scheduling.Lock()
scheduler.mu.Lock()
queue := scheduler.nextQueue
if len(scheduler.queues[queue]) > 0 {
jm := JobManager{}
jm.job = scheduler.queues[queue][0]
@@ -59,10 +67,12 @@ func (scheduler *SchedulerFair) Start() {
go func() {
jm.start()
scheduler.UpdateNextQueue()
}()
} else {
scheduler.scheduling.Unlock()
go func() {
scheduler.UpdateNextQueue()
}()
}
scheduler.mu.Unlock()
}
@@ -140,7 +150,7 @@ func (scheduler *SchedulerFair) Schedule(job Job) {
job.Status = Created
}
func (scheduler *SchedulerFair) AcquireResource(task Task) NodeStatus {
func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
pool.mu.Lock()
defer pool.mu.Unlock()
@@ -156,6 +166,8 @@ func (scheduler *SchedulerFair) AcquireResource(task Task) NodeStatus {
res.ClientID = id
res.ClientHost = node.ClientHost
res.Status = available[0:task.NumberGPU]
res.NumCPU = task.NumberCPU
res.MemTotal = task.Memory
for i := range res.Status {
for j := range node.Status {
@@ -168,10 +180,23 @@ func (scheduler *SchedulerFair) AcquireResource(task Task) NodeStatus {
break
}
}
go func(res NodeStatus) {
if _, ok := scheduler.resourceAllocations[job.Group]; !ok {
scheduler.resourceAllocations[job.Group] = ResourceCount{}
}
cnt, _ := scheduler.resourceAllocations[job.Group]
cnt.CPU += res.NumCPU
cnt.Memory += res.MemTotal
for _, v := range res.Status {
cnt.NumberGPU++
cnt.MemoryGPU += v.MemoryTotal
}
// write the updated copy back; ResourceCount is a value type
scheduler.resourceAllocations[job.Group] = cnt
scheduler.UpdateNextQueue()
}(res)
return res
}
func (scheduler *SchedulerFair) ReleaseResource(agent NodeStatus) {
func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
pool.mu.Lock()
defer pool.mu.Unlock()
nodes := pool.nodes[agent.ClientID]
@@ -182,6 +207,19 @@ func (scheduler *SchedulerFair) ReleaseResource(agent NodeStatus) {
}
}
}
go func(res NodeStatus) {
if _, ok := scheduler.resourceAllocations[job.Group]; !ok {
scheduler.resourceAllocations[job.Group] = ResourceCount{}
}
cnt, _ := scheduler.resourceAllocations[job.Group]
cnt.CPU -= res.NumCPU
cnt.Memory -= res.MemTotal
for _, v := range res.Status {
cnt.NumberGPU--
cnt.MemoryGPU -= v.MemoryTotal
}
scheduler.resourceAllocations[job.Group] = cnt
scheduler.UpdateNextQueue()
}(agent)
}
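One design note on the two accounting goroutines above: they mutate scheduler.resourceAllocations from separate goroutines while the scheduling loop reads it, and the diff takes no lock around the map. A minimal guarded variant could factor the bookkeeping into a single helper; the charge function and the allocMu field below are assumptions for illustration, not part of the commit:

// Hypothetical helper (not in the commit): one code path for both the
// acquire (sign = +1) and release (sign = -1) accounting, serialized
// by a dedicated mutex.
func (scheduler *SchedulerFair) charge(group string, res NodeStatus, sign int) {
	scheduler.allocMu.Lock() // assumed new struct field: allocMu sync.Mutex
	cnt := scheduler.resourceAllocations[group] // zero value covers new groups
	cnt.CPU += sign * res.NumCPU
	cnt.Memory += sign * res.MemTotal
	for _, v := range res.Status {
		cnt.NumberGPU += sign
		cnt.MemoryGPU += sign * v.MemoryTotal
	}
	scheduler.resourceAllocations[group] = cnt // write the copy back
	scheduler.allocMu.Unlock()
	scheduler.UpdateNextQueue()
}

AcquireResource would then call go scheduler.charge(job.Group, res, 1) and ReleaseResource go scheduler.charge(job.Group, agent, -1).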
func (scheduler *SchedulerFair) QueryState(jobName string) MsgJobStatus {
@@ -285,15 +323,36 @@ func (scheduler *SchedulerFair) ReleaseNetwork(network string) {
}
func (scheduler *SchedulerFair) UpdateNextQueue() {
flag := false
for k := range scheduler.queues {
if flag {
scheduler.nextQueue = k
return
}
if k == scheduler.nextQueue {
flag = true
next := "default"
quota := 9999.0
NumberGPU := 0.00001
MemoryGPU := 0.00001
CPU := 0.00001
Memory := 0.0001
for _, node := range pool.nodes {
CPU += float64(node.NumCPU)
Memory += float64(node.MemTotal)
for _, card := range node.Status {
NumberGPU += 1.0
MemoryGPU += float64(card.MemoryTotal)
}
}
scheduler.nextQueue = "default"
for k, v := range scheduler.resourceAllocations {
tmp := 0.0
tmp += float64(v.CPU) / CPU
tmp += float64(v.Memory) / Memory
tmp += float64(v.NumberGPU) / NumberGPU
tmp += float64(v.MemoryGPU) / MemoryGPU
tmp /= 4
if tmp < quota {
quota = tmp
next = k
}
}
scheduler.nextQueue = next
log.Info("updateNextQueue")
log.Info(scheduler.resourceAllocations)
log.Info("updateNextQueue ->", next)
}
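The new UpdateNextQueue replaces the old round-robin rotation with a least-share policy: each group's allocation is normalized against the cluster totals in four dimensions (CPU, memory, GPU count, GPU memory), the four fractions are averaged, and the group with the lowest average goes next; the tiny non-zero initializers above guard against division by zero on an empty pool. A standalone sketch of the scoring with made-up numbers (not part of the commit):

package main

import "fmt"

type ResourceCount struct {
	NumberGPU int
	MemoryGPU int
	CPU       int
	Memory    int
}

func main() {
	// Hypothetical cluster totals, one per dimension used by UpdateNextQueue.
	totalCPU, totalMem := 64.0, 256.0
	totalGPU, totalGPUMem := 8.0, 128.0

	alloc := map[string]ResourceCount{
		"default":  {NumberGPU: 4, MemoryGPU: 64, CPU: 16, Memory: 64},
		"research": {NumberGPU: 1, MemoryGPU: 16, CPU: 8, Memory: 32},
	}

	next, best := "default", 2.0 // sentinel above the maximum possible score of 1.0
	for group, c := range alloc {
		score := (float64(c.CPU)/totalCPU +
			float64(c.Memory)/totalMem +
			float64(c.NumberGPU)/totalGPU +
			float64(c.MemoryGPU)/totalGPUMem) / 4
		fmt.Printf("%s: %.3f\n", group, score)
		if score < best {
			best, next = score, group
		}
	}
	fmt.Println("next queue:", next) // research (0.125) beats default (0.375)
}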

View File

@@ -107,7 +107,7 @@ func (scheduler *SchedulerPriority) Schedule(job Job) {
job.Status = Created
}
func (scheduler *SchedulerPriority) AcquireResource(task Task) NodeStatus {
func (scheduler *SchedulerPriority) AcquireResource(job Job, task Task) NodeStatus {
pool.mu.Lock()
defer pool.mu.Unlock()
@@ -138,7 +138,7 @@ func (scheduler *SchedulerPriority) AcquireResource(task Task) NodeStatus {
return res
}
func (scheduler *SchedulerPriority) ReleaseResource(agent NodeStatus) {
func (scheduler *SchedulerPriority) ReleaseResource(job Job, agent NodeStatus) {
pool.mu.Lock()
defer pool.mu.Unlock()
nodes := pool.nodes[agent.ClientID]