mirror of
https://github.com/newnius/YAO-scheduler.git
synced 2025-12-13 07:46:43 +00:00
add scheduler_priority
This commit is contained in:
10
src/job_priority.go
Normal file
10
src/job_priority.go
Normal file
@@ -0,0 +1,10 @@
|
||||
package main

// JobPriority ranks jobs for the priority scheduler; higher values are
// more urgent and should be dispatched first.
type JobPriority int

// Priority levels, in ascending order of urgency.
const (
	Low    JobPriority = iota // 0: background / best-effort work
	Medium                    // 1
	Hight                     // 2: NOTE - misspelling of "High"; kept for backward compatibility
	Urgent                    // 3: dispatched ahead of everything else
)

// High is the correctly spelled alias for Hight. New code should use High;
// Hight remains so existing callers keep compiling.
const High = Hight
|
||||
@@ -51,7 +51,6 @@ func (scheduler *SchedulerFCFS) Start() {
|
||||
}
|
||||
|
||||
func (scheduler *SchedulerFCFS) UpdateProgress(jobName string, state State) {
|
||||
scheduler.scheduling.Unlock()
|
||||
switch state {
|
||||
case Running:
|
||||
scheduler.scheduling.Unlock()
|
||||
@@ -69,6 +68,13 @@ func (scheduler *SchedulerFCFS) UpdateProgress(jobName string, state State) {
|
||||
}
|
||||
}
|
||||
break
|
||||
case Stopped:
|
||||
for i := range scheduler.history {
|
||||
if scheduler.history[i].Name == jobName {
|
||||
scheduler.history[i].Status = Stopped
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
240
src/scheduler_priority.go
Normal file
240
src/scheduler_priority.go
Normal file
@@ -0,0 +1,240 @@
|
||||
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"

	log "github.com/sirupsen/logrus"
)
|
||||
|
||||
// SchedulerPriority is a Scheduler implementation that dispatches pending
// jobs from a priority-ordered queue, one at a time, on a fixed polling
// interval (see Start).
type SchedulerPriority struct {
	history []*Job // every job ever submitted, with its latest known status
	queue   []Job  // pending jobs awaiting dispatch; Start always pops queue[0]

	mu sync.Mutex // guards queue and history
	// scheduling is held from the moment a job is dispatched in Start until
	// UpdateProgress observes it Running (or until Start finds the queue
	// empty), serializing job start-up — NOTE(review): confirm every started
	// job eventually reports Running, otherwise this stays locked.
	scheduling sync.Mutex

	jobs map[string]*JobManager // managers of dispatched jobs, keyed by job name
}
|
||||
|
||||
// Start initializes the scheduler's state and launches the background
// dispatch loop. Every 5 seconds the loop takes the scheduling lock, pops
// the head of the queue (if any), marks the job Starting in history, and
// runs it via a JobManager in a new goroutine. The scheduling lock is NOT
// released here when a job is dispatched — it is released by
// UpdateProgress when the job reports Running — so at most one job is in
// its start-up phase at a time. The loop has no stop mechanism; it runs
// for the lifetime of the process.
func (scheduler *SchedulerPriority) Start() {
	scheduler.jobs = map[string]*JobManager{}
	scheduler.history = []*Job{}

	go func() {
		for {
			log.Info("Scheduling")
			time.Sleep(time.Second * 5)
			scheduler.scheduling.Lock()
			scheduler.mu.Lock()
			if len(scheduler.queue) > 0 {

				// Pop the highest-priority pending job (head of queue).
				jm := JobManager{}
				jm.job = scheduler.queue[0]
				scheduler.queue = scheduler.queue[1:]
				jm.scheduler = scheduler
				scheduler.jobs[jm.job.Name] = &jm

				// Reflect the transition in the submission history.
				for i := range scheduler.history {
					if scheduler.history[i].Name == jm.job.Name {
						scheduler.history[i].Status = Starting
					}
				}

				// Run the job asynchronously; scheduling stays locked until
				// the job reports Running via UpdateProgress.
				go func() {
					jm.start()
				}()
			} else {
				// Nothing to dispatch: release the scheduling lock ourselves.
				scheduler.scheduling.Unlock()
			}
			scheduler.mu.Unlock()
		}
	}()
}
|
||||
|
||||
func (scheduler *SchedulerPriority) UpdateProgress(jobName string, state State) {
|
||||
switch state {
|
||||
case Running:
|
||||
scheduler.scheduling.Unlock()
|
||||
|
||||
for i := range scheduler.history {
|
||||
if scheduler.history[i].Name == jobName {
|
||||
scheduler.history[i].Status = Running
|
||||
}
|
||||
}
|
||||
break
|
||||
case Finished:
|
||||
for i := range scheduler.history {
|
||||
if scheduler.history[i].Name == jobName {
|
||||
scheduler.history[i].Status = Finished
|
||||
}
|
||||
}
|
||||
break
|
||||
case Stopped:
|
||||
for i := range scheduler.history {
|
||||
if scheduler.history[i].Name == jobName {
|
||||
scheduler.history[i].Status = Stopped
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
func (scheduler *SchedulerPriority) Schedule(job Job) {
|
||||
scheduler.mu.Lock()
|
||||
defer scheduler.mu.Unlock()
|
||||
|
||||
index := 0
|
||||
|
||||
left := 0
|
||||
right := len(scheduler.queue) - 1
|
||||
for ; left <= right; {
|
||||
mid := (left + right) / 2
|
||||
if scheduler.queue[left].Priority > job.Priority {
|
||||
index = left
|
||||
break
|
||||
}
|
||||
if scheduler.queue[right].Priority <= job.Priority {
|
||||
index = right + 1
|
||||
break
|
||||
}
|
||||
if scheduler.queue[mid].Priority <= job.Priority {
|
||||
left = mid + 1
|
||||
} else {
|
||||
right = mid - 1
|
||||
}
|
||||
}
|
||||
scheduler.queue = append(scheduler.queue, Job{})
|
||||
|
||||
copy(scheduler.queue[index+1:], scheduler.queue[index:])
|
||||
scheduler.queue[index] = job
|
||||
fmt.Println(scheduler.queue)
|
||||
|
||||
scheduler.history = append(scheduler.history, &job)
|
||||
job.Status = Created
|
||||
}
|
||||
|
||||
// AcquireResource reserves task.NumberGPU GPUs with at least task.MemoryGPU
// free memory each, using first-fit over pool.nodes. On success the returned
// NodeStatus carries the chosen node's ID/host and the reserved GPUs; if no
// node can satisfy the request, a zero NodeStatus is returned.
func (scheduler *SchedulerPriority) AcquireResource(task Task) NodeStatus {
	pool.mu.Lock()
	defer pool.mu.Unlock()

	res := NodeStatus{}
	for id, node := range pool.nodes {
		// Collect GPUs on this node with enough free memory for the task.
		var available []GPUStatus
		for _, status := range node.Status {
			if status.MemoryTotal-status.MemoryAllocated >= task.MemoryGPU {
				available = append(available, status)
			}
		}
		if len(available) >= task.NumberGPU {
			res.ClientID = id
			res.ClientHost = node.ClientHost
			res.Status = available[0:task.NumberGPU]

			for i := range res.Status {
				for j := range node.Status {
					if res.Status[i].UUID == node.Status[j].UUID {
						// node is a range copy, but node.Status presumably
						// shares its backing array with the pool's entry, so
						// this allocation is visible pool-wide — NOTE(review):
						// confirm pool.nodes stores slice-backed statuses.
						node.Status[j].MemoryAllocated += task.MemoryGPU
						// MemoryTotal in the RESULT is overwritten with the
						// amount reserved; ReleaseResource later subtracts
						// gpu.MemoryTotal, so this looks like a deliberate
						// "amount reserved" channel — verify before changing.
						res.Status[i].MemoryTotal = task.MemoryGPU
					}
				}
			}
			// First fit: stop at the first node that can host the task.
			break
		}
	}
	return res
}
|
||||
|
||||
func (scheduler *SchedulerPriority) ReleaseResource(agent NodeStatus) {
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
nodes := pool.nodes[agent.ClientID]
|
||||
for _, gpu := range agent.Status {
|
||||
for j := range nodes.Status {
|
||||
if gpu.UUID == nodes.Status[j].UUID {
|
||||
nodes.Status[j].MemoryAllocated -= gpu.MemoryTotal
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (scheduler *SchedulerPriority) QueryState(jobName string) MsgJobStatus {
|
||||
jm, ok := scheduler.jobs[jobName]
|
||||
if !ok {
|
||||
return MsgJobStatus{Code: 1, Error: "Job not exist!"}
|
||||
}
|
||||
return jm.status()
|
||||
}
|
||||
|
||||
func (scheduler *SchedulerPriority) Stop(jobName string) MsgStop {
|
||||
jm, ok := scheduler.jobs[jobName]
|
||||
if !ok {
|
||||
return MsgStop{Code: 1, Error: "Job not exist!"}
|
||||
}
|
||||
return jm.stop()
|
||||
}
|
||||
|
||||
func (scheduler *SchedulerPriority) QueryLogs(jobName string, taskName string) MsgLog {
|
||||
jm, ok := scheduler.jobs[jobName]
|
||||
if !ok {
|
||||
return MsgLog{Code: 1, Error: "Job not exist!"}
|
||||
}
|
||||
return jm.logs(taskName)
|
||||
}
|
||||
|
||||
func (scheduler *SchedulerPriority) ListJobs() MsgJobList {
|
||||
return MsgJobList{Code: 0, Jobs: scheduler.history}
|
||||
}
|
||||
|
||||
func (scheduler *SchedulerPriority) Summary() MsgSummary {
|
||||
summary := MsgSummary{}
|
||||
summary.Code = 0
|
||||
|
||||
finishedJobsCounter := 0
|
||||
runningJobsCounter := 0
|
||||
pendingJobsCounter := 0
|
||||
|
||||
for _, job := range scheduler.history {
|
||||
switch job.Status {
|
||||
case Created:
|
||||
pendingJobsCounter++
|
||||
case Starting:
|
||||
pendingJobsCounter++
|
||||
break
|
||||
case Running:
|
||||
runningJobsCounter++
|
||||
break;
|
||||
case Finished:
|
||||
finishedJobsCounter++
|
||||
case Stopped:
|
||||
finishedJobsCounter++
|
||||
}
|
||||
}
|
||||
summary.JobsFinished = finishedJobsCounter
|
||||
summary.JobsPending = pendingJobsCounter
|
||||
summary.JobsRunning = runningJobsCounter
|
||||
|
||||
FreeGPU := 0
|
||||
UsingGPU := 0
|
||||
|
||||
for _, node := range pool.nodes {
|
||||
for j := range node.Status {
|
||||
if node.Status[j].MemoryAllocated == 0 {
|
||||
FreeGPU++
|
||||
} else {
|
||||
UsingGPU++
|
||||
}
|
||||
}
|
||||
}
|
||||
summary.FreeGPU = FreeGPU
|
||||
summary.UsingGPU = UsingGPU
|
||||
|
||||
return summary
|
||||
}
|
||||
|
||||
func (scheduler *SchedulerPriority) AcquireNetwork() string {
|
||||
return pool.acquireNetwork()
|
||||
}
|
||||
|
||||
// ReleaseNetwork returns a previously acquired network to the shared pool.
func (scheduler *SchedulerPriority) ReleaseNetwork(network string) {
	pool.releaseNetwork(network)
}
|
||||
Reference in New Issue
Block a user