Mirror of https://github.com/newnius/YAO-scheduler.git

add concurrent

2020-04-13 22:35:17 +08:00
parent 2705e12c52
commit 7e1c4349b9
5 changed files with 168 additions and 91 deletions
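Taken together, the hunks below make two changes to SchedulerFair: job scheduling is throttled by a counter (schedulingJobsCnt, guarded by the new schedulingMu) that admits at most pool.poolsCount jobs into the scheduling phase at once, and the single global node pool behind pool.mu is split into pool.poolsCount sub-pools, each protected by its own pool.poolsMu[i]. A slot is taken before a job is dequeued and returned in UpdateProgress when the job reaches Running. Below is a minimal, self-contained sketch of that gating pattern; slotGate and every other name in it are illustrative, not the project's API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// slotGate mirrors the schedulingMu/schedulingJobsCnt pattern in the diff:
// a mutex-guarded counter that admits at most `limit` jobs into the
// scheduling phase at a time. Illustrative sketch only.
type slotGate struct {
	mu    sync.Mutex
	cnt   int
	limit int
}

// tryAcquire takes a slot if one is free, as the scheduler loop does
// before dequeuing a job.
func (g *slotGate) tryAcquire() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.cnt >= g.limit {
		return false // all slots busy; the loop just continues
	}
	g.cnt++
	return true
}

// release returns a slot, as UpdateProgress does once a job is Running.
func (g *slotGate) release() {
	g.mu.Lock()
	g.cnt--
	g.mu.Unlock()
}

func main() {
	gate := &slotGate{limit: 2}
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		for !gate.tryAcquire() {
			time.Sleep(10 * time.Millisecond) // poll, like the scheduler loop
		}
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			defer gate.release()
			fmt.Println("scheduling job", id)
			time.Sleep(50 * time.Millisecond)
		}(i)
	}
	wg.Wait()
}

Gating with tryAcquire plus continue, rather than blocking on a channel semaphore, matches the scheduler's polling loop: when the gate is full, the tick is simply skipped.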


@@ -5,6 +5,7 @@ import (
 	"time"
 	log "github.com/sirupsen/logrus"
 	"sort"
+	"math/rand"
 )

 type ResourceCount struct {
@@ -17,8 +18,9 @@ type ResourceCount struct {
 type SchedulerFair struct {
 	history             []*Job
 	queues              map[string][]Job
-	mu                  sync.Mutex
-	scheduling          sync.Mutex
+	queueMu             sync.Mutex
+	schedulingMu        sync.Mutex
+	schedulingJobsCnt   int
 	jobs                map[string]*JobManager
 	nextQueue           string
 	resourceAllocations map[string]*ResourceCount
@@ -46,6 +48,7 @@ func (scheduler *SchedulerFair) Start() {
 	scheduler.queues["default"] = []Job{}
 	scheduler.resourceAllocations = map[string]*ResourceCount{}
 	scheduler.enabled = true
+	scheduler.schedulingJobsCnt = 0

 	go func() {
 		for {
@@ -54,8 +57,14 @@ func (scheduler *SchedulerFair) Start() {
 			if !scheduler.enabled {
 				continue
 			}
-			scheduler.scheduling.Lock()
-			scheduler.mu.Lock()
+			scheduler.schedulingMu.Lock()
+			if scheduler.schedulingJobsCnt >= pool.poolsCount {
+				scheduler.schedulingMu.Unlock()
+				continue
+			}
+			scheduler.schedulingJobsCnt++
+			scheduler.schedulingMu.Unlock()
+			scheduler.queueMu.Lock()
 			queue := scheduler.nextQueue
 			if len(scheduler.queues[queue]) > 0 {
 				jm := JobManager{}
@@ -72,13 +81,12 @@
 					jm.start()
 				}()
 			} else {
-				log.Info("No more jobs to scheduling")
-				scheduler.scheduling.Unlock()
+				log.Info("No more jobs to scheduling", time.Now())
 				go func() {
 					scheduler.UpdateNextQueue()
 				}()
 			}
-			scheduler.mu.Unlock()
+			scheduler.queueMu.Unlock()
 		}
 	}()
 }
@@ -86,7 +94,9 @@
 func (scheduler *SchedulerFair) UpdateProgress(jobName string, state State) {
 	switch state {
 	case Running:
-		scheduler.scheduling.Unlock()
+		scheduler.schedulingMu.Lock()
+		scheduler.schedulingJobsCnt--
+		scheduler.schedulingMu.Unlock()

 		for i := range scheduler.history {
 			if scheduler.history[i].Name == jobName {
@@ -115,8 +125,8 @@ func (scheduler *SchedulerFair) UpdateProgress(jobName string, state State) {
 }

 func (scheduler *SchedulerFair) Schedule(job Job) {
-	scheduler.mu.Lock()
-	defer scheduler.mu.Unlock()
+	scheduler.queueMu.Lock()
+	defer scheduler.queueMu.Unlock()

 	queue := job.Group
 	_, ok := scheduler.queues[queue]
@@ -156,11 +166,12 @@ func (scheduler *SchedulerFair) Schedule(job Job) {
 }

 func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
-	pool.mu.Lock()
-	defer pool.mu.Unlock()
+	poolID := rand.Intn(pool.poolsCount)
+	pool.poolsMu[poolID].Lock()
+	defer pool.poolsMu[poolID].Unlock()

 	res := NodeStatus{}
-	for id, node := range pool.nodes {
+	for id, node := range pool.pools[poolID] {
 		var available []GPUStatus
 		for _, status := range node.Status {
 			if status.MemoryTotal-status.MemoryAllocated >= task.MemoryGPU {
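AcquireResource above now confines each search to one randomly chosen sub-pool, so concurrent acquisitions usually contend on different locks. A compact sketch of that sharded-pool pattern, with hypothetical node and shardedPool types standing in for the project's NodeStatus and pool structures:

package pool

import (
	"math/rand"
	"sync"
)

type node struct{ freeGPUs int }

// shardedPool keeps one node map and one mutex per shard, echoing
// pool.pools / pool.poolsMu in the diff (types here are illustrative).
type shardedPool struct {
	shards []map[string]*node
	mus    []sync.Mutex
}

// acquire scans a single random shard for a node with enough free GPUs,
// holding only that shard's lock.
func (p *shardedPool) acquire(need int) *node {
	i := rand.Intn(len(p.shards))
	p.mus[i].Lock()
	defer p.mus[i].Unlock()
	for _, n := range p.shards[i] {
		if n.freeGPUs >= need {
			n.freeGPUs -= need
			return n
		}
	}
	return nil // nothing in this shard; a caller could retry another one
}

The trade-off is visible in the sketch: a randomly chosen shard can come up empty even when another shard has capacity, so lower lock contention is paid for with possible retries or misses.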
@@ -206,9 +217,11 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 }

 func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
-	pool.mu.Lock()
-	defer pool.mu.Unlock()
-	node := pool.nodes[agent.ClientID]
+	poolID := rand.Intn(pool.poolsCount)
+	pool.poolsMu[poolID].Lock()
+	defer pool.poolsMu[poolID].Unlock()
+
+	node := pool.pools[poolID][agent.ClientID]
 	for _, gpu := range agent.Status {
 		for j := range node.Status {
 			if gpu.UUID == node.Status[j].UUID {
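ReleaseResource above likewise draws a random poolID, which only resolves agent.ClientID if every sub-pool indexes every node. If nodes were instead partitioned across sub-pools, a deterministic mapping would be needed so that acquire and release agree on the owning shard; a hypothetical hash-based pick, not what this commit does:

package pool

import "hash/fnv"

// shardFor maps a client ID to a stable shard index so acquire and release
// always land on the shard that owns the node. Hypothetical helper.
func shardFor(clientID string, shardCount int) int {
	h := fnv.New32a()
	h.Write([]byte(clientID)) // FNV-1a over the ID bytes
	return int(h.Sum32()) % shardCount
}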
@@ -313,17 +326,19 @@ func (scheduler *SchedulerFair) Summary() MsgSummary {
 	FreeGPU := 0
 	UsingGPU := 0
-	pool.mu.Lock()
-	for _, node := range pool.nodes {
-		for j := range node.Status {
-			if node.Status[j].MemoryAllocated == 0 {
-				FreeGPU++
-			} else {
-				UsingGPU++
-			}
-		}
-	}
-	pool.mu.Unlock()
+	for i := 0; i < pool.poolsCount; i++ {
+		pool.poolsMu[i].Lock()
+		for _, node := range pool.pools[i] {
+			for j := range node.Status {
+				if node.Status[j].MemoryAllocated == 0 {
+					FreeGPU++
+				} else {
+					UsingGPU++
+				}
+			}
+		}
+		pool.poolsMu[i].Unlock()
+	}
 	summary.FreeGPU = FreeGPU
 	summary.UsingGPU = UsingGPU
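Since Summary locks the sub-pools one at a time, the FreeGPU and UsingGPU totals are not an atomic snapshot of the whole cluster; allocations can change between shard visits. For a monitoring summary that is presumably an acceptable trade against holding a single global lock for the full scan.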
@@ -346,16 +361,18 @@ func (scheduler *SchedulerFair) UpdateNextQueue() {
 	MemoryGPU := 0.00001
 	CPU := 0.00001
 	Memory := 0.0001
-	pool.mu.Lock()
-	for _, node := range pool.nodes {
-		CPU += float64(node.NumCPU)
-		Memory += float64(node.MemTotal)
-		for _, card := range node.Status {
-			NumberGPU += 1.0
-			MemoryGPU += float64(card.MemoryTotal)
-		}
-	}
-	pool.mu.Unlock()
+	for i := 0; i < pool.poolsCount; i++ {
+		pool.poolsMu[i].Lock()
+		for _, node := range pool.pools[i] {
+			CPU += float64(node.NumCPU)
+			Memory += float64(node.MemTotal)
+			for _, card := range node.Status {
+				NumberGPU += 1.0
+				MemoryGPU += float64(card.MemoryTotal)
+			}
+		}
+		pool.poolsMu[i].Unlock()
+	}

 	for k, t := range scheduler.queues {
 		if len(t) == 0 {
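The diff ends mid-function here; UpdateNextQueue goes on to weigh the non-empty queues against the cluster totals just computed. As a generic illustration of that kind of fair-share pick (a sketch under the assumption of share-based selection, not necessarily the project's exact formula):

package sched

import "math"

// pickQueue returns the non-empty queue with the smallest share of the
// cluster's resources, given per-queue allocations normalized by a total.
// Generic fair-share selection; all names here are illustrative.
func pickQueue(alloc map[string]float64, total float64, queues map[string][]string) string {
	best, bestShare := "default", math.MaxFloat64
	for q, jobs := range queues {
		if len(jobs) == 0 {
			continue // mirrors the `if len(t) == 0` check above
		}
		if share := alloc[q] / total; share < bestShare {
			best, bestShare = q, share
		}
	}
	return best
}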