1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-08 06:41:56 +00:00
YAO-scheduler/src/scheduler_FCFS.go

276 lines
6.1 KiB
Go
Raw Normal View History

2019-03-20 03:14:07 +00:00
package main
import (
"sync"
"time"
2019-07-10 12:40:43 +00:00
log "github.com/sirupsen/logrus"
2020-04-13 14:35:17 +00:00
"math/rand"
2019-05-13 08:31:26 +00:00
)
2019-03-20 03:14:07 +00:00
2019-07-10 12:40:43 +00:00
type SchedulerFCFS struct {
2019-03-25 07:36:30 +00:00
history []*Job
queue []Job
mu sync.Mutex
2019-03-20 03:14:07 +00:00
scheduling sync.Mutex
2019-03-25 07:36:30 +00:00
2020-04-13 15:53:38 +00:00
jobs map[string]*JobManager
enabled bool
parallelism int
2019-03-20 03:14:07 +00:00
}
2019-07-10 12:40:43 +00:00
func (scheduler *SchedulerFCFS) Start() {
scheduler.jobs = map[string]*JobManager{}
scheduler.history = []*Job{}
2020-04-13 10:37:54 +00:00
scheduler.enabled = true
2019-03-25 07:36:30 +00:00
2019-03-20 03:14:07 +00:00
go func() {
for {
2019-07-10 12:40:43 +00:00
log.Info("Scheduling")
2019-03-20 03:14:07 +00:00
time.Sleep(time.Second * 5)
2019-07-10 12:40:43 +00:00
scheduler.scheduling.Lock()
scheduler.mu.Lock()
if len(scheduler.queue) > 0 {
2019-03-20 03:14:07 +00:00
jm := JobManager{}
2019-07-10 12:40:43 +00:00
jm.job = scheduler.queue[0]
scheduler.queue = scheduler.queue[1:]
jm.scheduler = scheduler
scheduler.jobs[jm.job.Name] = &jm
2019-07-12 07:35:05 +00:00
jm.job.Status = Starting
scheduler.history = append(scheduler.history, &jm.job)
2019-03-25 07:36:30 +00:00
2019-03-20 03:14:07 +00:00
go func() {
jm.start()
}()
} else {
2019-07-10 12:40:43 +00:00
scheduler.scheduling.Unlock()
2019-03-20 03:14:07 +00:00
}
2019-07-10 12:40:43 +00:00
scheduler.mu.Unlock()
2019-03-20 03:14:07 +00:00
}
}()
}
2020-05-02 16:16:28 +00:00
func (scheduler *SchedulerFCFS) UpdateProgress(job Job, state State) {
2019-07-10 12:40:43 +00:00
switch state {
case Running:
scheduler.scheduling.Unlock()
2019-04-12 09:21:09 +00:00
2019-07-10 12:40:43 +00:00
for i := range scheduler.history {
2020-05-02 16:16:28 +00:00
if scheduler.history[i].Name == job.Name {
2019-07-10 12:40:43 +00:00
scheduler.history[i].Status = Running
}
2019-03-25 07:36:30 +00:00
}
2019-07-10 12:40:43 +00:00
break
case Finished:
for i := range scheduler.history {
2020-05-02 16:16:28 +00:00
if scheduler.history[i].Name == job.Name {
2019-07-10 12:40:43 +00:00
scheduler.history[i].Status = Finished
}
2019-03-25 07:36:30 +00:00
}
2019-07-10 12:40:43 +00:00
break
2019-07-12 06:58:16 +00:00
case Stopped:
for i := range scheduler.history {
2020-05-02 16:16:28 +00:00
if scheduler.history[i].Name == job.Name {
2019-07-12 06:58:16 +00:00
scheduler.history[i].Status = Stopped
}
}
break
2019-03-25 07:36:30 +00:00
}
2019-03-20 03:14:07 +00:00
}
2019-07-10 12:40:43 +00:00
func (scheduler *SchedulerFCFS) Schedule(job Job) {
scheduler.mu.Lock()
defer scheduler.mu.Unlock()
2019-03-20 03:14:07 +00:00
2019-07-10 12:40:43 +00:00
scheduler.queue = append(scheduler.queue, job)
job.Status = Created
2019-03-20 03:14:07 +00:00
}
2020-04-30 13:22:21 +00:00
func (scheduler *SchedulerFCFS) AcquireResource(job Job, task Task, nodes []NodeStatus) NodeStatus {
2020-04-13 14:35:17 +00:00
poolID := rand.Intn(pool.poolsCount)
pool.poolsMu[poolID].Lock()
defer pool.poolsMu[poolID].Unlock()
2019-03-20 03:14:07 +00:00
2019-04-16 08:59:19 +00:00
res := NodeStatus{}
2020-04-13 14:35:17 +00:00
for id, node := range pool.pools[poolID] {
2019-04-16 08:59:19 +00:00
var available []GPUStatus
for _, status := range node.Status {
2019-05-13 08:31:26 +00:00
if status.MemoryTotal-status.MemoryAllocated >= task.MemoryGPU {
2019-03-20 03:14:07 +00:00
available = append(available, status)
}
}
if len(available) >= task.NumberGPU {
res.ClientID = id
2019-04-16 08:59:19 +00:00
res.ClientHost = node.ClientHost
2019-03-20 03:14:07 +00:00
res.Status = available[0:task.NumberGPU]
2020-04-13 14:35:17 +00:00
res.NumCPU = task.NumberCPU
res.MemTotal = task.Memory
2019-03-20 03:14:07 +00:00
for i := range res.Status {
2019-04-16 08:59:19 +00:00
for j := range node.Status {
if res.Status[i].UUID == node.Status[j].UUID {
2019-05-13 08:31:26 +00:00
node.Status[j].MemoryAllocated += task.MemoryGPU
res.Status[i].MemoryTotal = task.MemoryGPU
2019-03-20 03:14:07 +00:00
}
}
}
2019-04-29 12:35:05 +00:00
break
2019-03-20 03:14:07 +00:00
}
}
return res
}
2019-08-01 05:42:53 +00:00
func (scheduler *SchedulerFCFS) ReleaseResource(job Job, agent NodeStatus) {
2020-04-13 14:35:17 +00:00
poolID := rand.Intn(pool.poolsCount)
pool.poolsMu[poolID].Lock()
defer pool.poolsMu[poolID].Unlock()
node := pool.pools[poolID][agent.ClientID]
2019-03-20 03:14:07 +00:00
for _, gpu := range agent.Status {
2020-04-13 14:35:17 +00:00
for j := range node.Status {
if gpu.UUID == node.Status[j].UUID {
node.Status[j].MemoryAllocated -= gpu.MemoryTotal
if node.Status[j].MemoryAllocated < 0 {
// in case of error
log.Warn(node.ClientID, "More Memory Allocated")
node.Status[j].MemoryAllocated = 0
}
2019-03-20 03:14:07 +00:00
}
}
}
}
2019-07-10 12:40:43 +00:00
func (scheduler *SchedulerFCFS) QueryState(jobName string) MsgJobStatus {
jm, ok := scheduler.jobs[jobName]
2019-03-25 07:36:30 +00:00
if !ok {
return MsgJobStatus{Code: 1, Error: "Job not exist!"}
2019-03-20 03:14:07 +00:00
}
2019-03-25 07:36:30 +00:00
return jm.status()
2019-03-20 03:14:07 +00:00
}
2019-07-10 12:40:43 +00:00
func (scheduler *SchedulerFCFS) Stop(jobName string) MsgStop {
jm, ok := scheduler.jobs[jobName]
2019-04-18 09:25:37 +00:00
if !ok {
return MsgStop{Code: 1, Error: "Job not exist!"}
}
return jm.stop()
}
2019-07-10 12:40:43 +00:00
func (scheduler *SchedulerFCFS) QueryLogs(jobName string, taskName string) MsgLog {
jm, ok := scheduler.jobs[jobName]
2019-03-25 07:36:30 +00:00
if !ok {
return MsgLog{Code: 1, Error: "Job not exist!"}
2019-03-20 03:14:07 +00:00
}
2019-03-25 07:36:30 +00:00
return jm.logs(taskName)
}
2019-03-20 03:14:07 +00:00
2019-07-10 12:40:43 +00:00
func (scheduler *SchedulerFCFS) ListJobs() MsgJobList {
2019-07-12 07:35:05 +00:00
var tmp []Job
for _, job := range scheduler.history {
tmp = append(tmp, *job)
}
tmp = append(tmp, scheduler.queue...)
return MsgJobList{Code: 0, Jobs: tmp}
2019-03-20 03:14:07 +00:00
}
2019-04-12 09:21:09 +00:00
2019-07-10 12:40:43 +00:00
func (scheduler *SchedulerFCFS) Summary() MsgSummary {
2019-04-12 09:21:09 +00:00
summary := MsgSummary{}
summary.Code = 0
finishedJobsCounter := 0
runningJobsCounter := 0
pendingJobsCounter := 0
2019-07-12 07:35:05 +00:00
var tmp []Job
2019-07-10 12:40:43 +00:00
for _, job := range scheduler.history {
2019-07-12 07:35:05 +00:00
tmp = append(tmp, *job)
}
tmp = append(tmp, scheduler.queue...)
for _, job := range tmp {
2019-04-12 09:21:09 +00:00
switch job.Status {
case Created:
pendingJobsCounter++
case Starting:
pendingJobsCounter++
break
case Running:
runningJobsCounter++
break;
case Finished:
finishedJobsCounter++
case Stopped:
finishedJobsCounter++
}
}
summary.JobsFinished = finishedJobsCounter
summary.JobsPending = pendingJobsCounter
summary.JobsRunning = runningJobsCounter
FreeGPU := 0
UsingGPU := 0
2020-04-13 14:35:17 +00:00
for i := 0; i < pool.poolsCount; i++ {
pool.poolsMu[i].Lock()
for _, node := range pool.pools[i] {
for j := range node.Status {
if node.Status[j].MemoryAllocated == 0 {
FreeGPU++
} else {
UsingGPU++
}
2019-04-12 09:21:09 +00:00
}
}
2020-04-13 14:35:17 +00:00
pool.poolsMu[i].Unlock()
2019-04-12 09:21:09 +00:00
}
summary.FreeGPU = FreeGPU
summary.UsingGPU = UsingGPU
return summary
}
2019-06-05 09:09:22 +00:00
2019-07-10 12:40:43 +00:00
func (scheduler *SchedulerFCFS) AcquireNetwork() string {
2019-06-05 09:09:22 +00:00
return pool.acquireNetwork()
}
2019-07-10 12:40:43 +00:00
func (scheduler *SchedulerFCFS) ReleaseNetwork(network string) {
2019-06-05 09:09:22 +00:00
pool.releaseNetwork(network)
}
2020-04-11 03:38:04 +00:00
// Attach forwards the (GPU, job) pair to the resource pool's attach
// bookkeeping; the scheduler itself keeps no state for it.
func (scheduler *SchedulerFCFS) Attach(GPU string, job string) {
	gpuID, jobName := GPU, job
	pool.attach(gpuID, jobName)
}
func (scheduler *SchedulerFCFS) Detach(GPU string, job string) {
pool.detach(GPU, job)
2020-04-13 10:27:57 +00:00
}
2020-04-13 10:37:54 +00:00
func (scheduler *SchedulerFCFS) Enable() bool {
2020-04-13 10:27:57 +00:00
scheduler.enabled = true
2020-04-13 10:37:54 +00:00
return true
2020-04-13 10:27:57 +00:00
}
2020-04-13 10:37:54 +00:00
func (scheduler *SchedulerFCFS) Disable() bool {
2020-04-13 10:27:57 +00:00
scheduler.enabled = false
2020-04-13 10:37:54 +00:00
return true
2020-04-13 10:27:57 +00:00
}
2020-04-13 15:53:38 +00:00
// UpdateParallelism stores parallelism on the scheduler, logs the change and
// always reports success.
// NOTE(review): nothing in this file reads scheduler.parallelism.
func (scheduler *SchedulerFCFS) UpdateParallelism(parallelism int) bool {
	scheduler.parallelism = parallelism
	// logrus Info uses fmt.Sprint semantics, which add no space after a string
	// operand, so the separator must be explicit.
	log.Info("parallelism is updated to ", parallelism)
	return true
}
2020-04-30 09:52:52 +00:00
// SetShareRatio only logs the requested ratio and reports success; the FCFS
// scheduler does not store or use a share ratio.
func (scheduler *SchedulerFCFS) SetShareRatio(ratio float64) bool {
	//scheduler.enableShareRatio = ratio
	// Explicit trailing space: fmt.Sprint adds none after a string operand.
	log.Info("enableShareRatio is updated to ", ratio)
	return true
}
func (scheduler *SchedulerFCFS) SetPreScheduleRatio(ratio float64) bool {
//scheduler.enablePreScheduleRatio = ratio
log.Info("enablePreScheduleRatio is updated to", ratio)
return true
2020-04-30 13:22:21 +00:00
}