mirror of
https://github.com/newnius/YAO-scheduler.git
synced 2025-12-15 08:16:43 +00:00
update
This commit is contained in:
@@ -302,7 +302,7 @@ func main() {
|
|||||||
scheduler = &SchedulerFCFS{}
|
scheduler = &SchedulerFCFS{}
|
||||||
break
|
break
|
||||||
case "fair":
|
case "fair":
|
||||||
scheduler = &SchedulerFair{}
|
scheduler = &SchedulerCapacity{}
|
||||||
break
|
break
|
||||||
case "priority":
|
case "priority":
|
||||||
scheduler = &SchedulerPriority{}
|
scheduler = &SchedulerPriority{}
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import (
|
|||||||
"math"
|
"math"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SchedulerFair struct {
|
type SchedulerCapacity struct {
|
||||||
history []*Job
|
history []*Job
|
||||||
historyMu sync.Mutex
|
historyMu sync.Mutex
|
||||||
|
|
||||||
@@ -47,7 +47,7 @@ func (s FairJobSorter) Less(i, j int) bool {
|
|||||||
return s[i].CreatedAt < s[j].CreatedAt
|
return s[i].CreatedAt < s[j].CreatedAt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *SchedulerFair) Start() {
|
func (scheduler *SchedulerCapacity) Start() {
|
||||||
log.Info("JS started")
|
log.Info("JS started")
|
||||||
|
|
||||||
scheduler.jobs = map[string]*JobManager{}
|
scheduler.jobs = map[string]*JobManager{}
|
||||||
@@ -64,7 +64,7 @@ func (scheduler *SchedulerFair) Start() {
|
|||||||
scheduler.parallelism = 1
|
scheduler.parallelism = 1
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
/* fair scheduler */
|
/* capacity scheduler */
|
||||||
flag := true
|
flag := true
|
||||||
for {
|
for {
|
||||||
log.Debug("Scheduling")
|
log.Debug("Scheduling")
|
||||||
@@ -141,7 +141,7 @@ func (scheduler *SchedulerFair) Start() {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *SchedulerFair) UpdateProgress(job Job, state State) {
|
func (scheduler *SchedulerCapacity) UpdateProgress(job Job, state State) {
|
||||||
scheduler.historyMu.Lock()
|
scheduler.historyMu.Lock()
|
||||||
defer scheduler.historyMu.Unlock()
|
defer scheduler.historyMu.Unlock()
|
||||||
|
|
||||||
@@ -195,7 +195,7 @@ func (scheduler *SchedulerFair) UpdateProgress(job Job, state State) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *SchedulerFair) Schedule(job Job) {
|
func (scheduler *SchedulerCapacity) Schedule(job Job) {
|
||||||
scheduler.queueMu.Lock()
|
scheduler.queueMu.Lock()
|
||||||
defer scheduler.queueMu.Unlock()
|
defer scheduler.queueMu.Unlock()
|
||||||
|
|
||||||
@@ -236,7 +236,7 @@ func (scheduler *SchedulerFair) Schedule(job Job) {
|
|||||||
job.Status = Created
|
job.Status = Created
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *SchedulerFair) AcquireResource(job Job) []NodeStatus {
|
func (scheduler *SchedulerCapacity) AcquireResource(job Job) []NodeStatus {
|
||||||
res := InstanceOfResourcePool().acquireResource(job)
|
res := InstanceOfResourcePool().acquireResource(job)
|
||||||
|
|
||||||
if len(res) != 0 {
|
if len(res) != 0 {
|
||||||
@@ -271,7 +271,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job) []NodeStatus {
|
|||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
|
func (scheduler *SchedulerCapacity) ReleaseResource(job Job, agent NodeStatus) {
|
||||||
InstanceOfResourcePool().releaseResource(job, agent)
|
InstanceOfResourcePool().releaseResource(job, agent)
|
||||||
go func(res NodeStatus) {
|
go func(res NodeStatus) {
|
||||||
scheduler.resourceAllocationsMu.Lock()
|
scheduler.resourceAllocationsMu.Lock()
|
||||||
@@ -290,7 +290,7 @@ func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
|
|||||||
}(agent)
|
}(agent)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *SchedulerFair) QueryState(jobName string) MsgJobStatus {
|
func (scheduler *SchedulerCapacity) QueryState(jobName string) MsgJobStatus {
|
||||||
scheduler.queueMu.Lock()
|
scheduler.queueMu.Lock()
|
||||||
jm, ok := scheduler.jobs[jobName]
|
jm, ok := scheduler.jobs[jobName]
|
||||||
scheduler.queueMu.Unlock()
|
scheduler.queueMu.Unlock()
|
||||||
@@ -300,7 +300,7 @@ func (scheduler *SchedulerFair) QueryState(jobName string) MsgJobStatus {
|
|||||||
return jm.status()
|
return jm.status()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *SchedulerFair) Stop(jobName string) MsgStop {
|
func (scheduler *SchedulerCapacity) Stop(jobName string) MsgStop {
|
||||||
scheduler.queueMu.Lock()
|
scheduler.queueMu.Lock()
|
||||||
jm, ok := scheduler.jobs[jobName]
|
jm, ok := scheduler.jobs[jobName]
|
||||||
scheduler.queueMu.Unlock()
|
scheduler.queueMu.Unlock()
|
||||||
@@ -310,7 +310,7 @@ func (scheduler *SchedulerFair) Stop(jobName string) MsgStop {
|
|||||||
return jm.stop(true)
|
return jm.stop(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *SchedulerFair) QueryLogs(jobName string, taskName string) MsgLog {
|
func (scheduler *SchedulerCapacity) QueryLogs(jobName string, taskName string) MsgLog {
|
||||||
scheduler.queueMu.Lock()
|
scheduler.queueMu.Lock()
|
||||||
jm, ok := scheduler.jobs[jobName]
|
jm, ok := scheduler.jobs[jobName]
|
||||||
scheduler.queueMu.Unlock()
|
scheduler.queueMu.Unlock()
|
||||||
@@ -320,7 +320,7 @@ func (scheduler *SchedulerFair) QueryLogs(jobName string, taskName string) MsgLo
|
|||||||
return jm.logs(taskName)
|
return jm.logs(taskName)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *SchedulerFair) ListJobs() MsgJobList {
|
func (scheduler *SchedulerCapacity) ListJobs() MsgJobList {
|
||||||
var jobs []Job
|
var jobs []Job
|
||||||
scheduler.historyMu.Lock()
|
scheduler.historyMu.Lock()
|
||||||
for _, job := range scheduler.history {
|
for _, job := range scheduler.history {
|
||||||
@@ -336,7 +336,7 @@ func (scheduler *SchedulerFair) ListJobs() MsgJobList {
|
|||||||
return MsgJobList{Code: 0, Jobs: jobs}
|
return MsgJobList{Code: 0, Jobs: jobs}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *SchedulerFair) Summary() MsgSummary {
|
func (scheduler *SchedulerCapacity) Summary() MsgSummary {
|
||||||
summary := MsgSummary{}
|
summary := MsgSummary{}
|
||||||
summary.Code = 0
|
summary.Code = 0
|
||||||
|
|
||||||
@@ -381,7 +381,7 @@ func (scheduler *SchedulerFair) Summary() MsgSummary {
|
|||||||
return summary
|
return summary
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *SchedulerFair) UpdateNextQueue() {
|
func (scheduler *SchedulerCapacity) UpdateNextQueue() {
|
||||||
next := "default"
|
next := "default"
|
||||||
quota := math.MaxFloat64
|
quota := math.MaxFloat64
|
||||||
|
|
||||||
@@ -415,25 +415,25 @@ func (scheduler *SchedulerFair) UpdateNextQueue() {
|
|||||||
log.Debug("updateNextQueue ->", next)
|
log.Debug("updateNextQueue ->", next)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *SchedulerFair) Enable() bool {
|
func (scheduler *SchedulerCapacity) Enable() bool {
|
||||||
scheduler.enabled = true
|
scheduler.enabled = true
|
||||||
log.Info("scheduler is enabled ", time.Now())
|
log.Info("scheduler is enabled ", time.Now())
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *SchedulerFair) Disable() bool {
|
func (scheduler *SchedulerCapacity) Disable() bool {
|
||||||
scheduler.enabled = false
|
scheduler.enabled = false
|
||||||
log.Info("scheduler is disabled ", time.Now())
|
log.Info("scheduler is disabled ", time.Now())
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *SchedulerFair) UpdateParallelism(parallelism int) bool {
|
func (scheduler *SchedulerCapacity) UpdateParallelism(parallelism int) bool {
|
||||||
scheduler.parallelism = parallelism
|
scheduler.parallelism = parallelism
|
||||||
log.Info("parallelism is updated to ", parallelism)
|
log.Info("parallelism is updated to ", parallelism)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (scheduler *SchedulerFair) updateGroup(group Group) bool {
|
func (scheduler *SchedulerCapacity) updateGroup(group Group) bool {
|
||||||
num := 0
|
num := 0
|
||||||
for _, g := range InstanceOfGroupManager().List().Groups {
|
for _, g := range InstanceOfGroupManager().List().Groups {
|
||||||
if g.Reserved {
|
if g.Reserved {
|
||||||
Reference in New Issue
Block a user