mirror of https://github.com/newnius/YAO-scheduler.git (synced 2025-12-15 16:16:44 +00:00)

Commit: update

src/main.go: 23 lines changed
@@ -189,6 +189,29 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
 		w.Write(js)
 		break
 
+	case "debug_update_enable_share_ratio":
+		log.Debug("debug_update_enable_share_ratio")
+
+		ratio := 0.75
+		if t, err := strconv.ParseFloat(r.URL.Query().Get("ratio"), 32); err == nil {
+			ratio = t
+		}
+		js, _ := json.Marshal(scheduler.SetShareRatio(ratio))
+		w.Header().Set("Content-Type", "application/json")
+		w.Write(js)
+		break
+
+	case "debug_update_enable_pre_schedule_ratio":
+		log.Debug("debug_update_enable_pre_schedule_ratio")
+		ratio := 0.95
+		if t, err := strconv.ParseFloat(r.URL.Query().Get("ratio"), 32); err == nil {
+			ratio = t
+		}
+		js, _ := json.Marshal(scheduler.SetPreScheduleRatio(ratio))
+		w.Header().Set("Content-Type", "application/json")
+		w.Write(js)
+		break
+
 	case "debug_get_predicts":
 		log.Debug("debug_get_predicts")
 		js, _ := json.Marshal(InstanceOfOptimizer().getAllPredicts())
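The two new debug cases share one pattern: read an optional ratio query parameter, fall back to the default (0.75 for sharing, 0.95 for pre-scheduling) when it is missing or malformed, and return the scheduler's boolean reply as JSON. Below is a minimal, self-contained sketch of just that parse-with-default step; the helper name ratioOrDefault is illustrative and not part of the repository.

package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// ratioOrDefault mirrors the parsing in the new debug cases: if the "ratio"
// query parameter parses as a float, use it; otherwise keep the default.
func ratioOrDefault(q url.Values, def float64) float64 {
	ratio := def
	if t, err := strconv.ParseFloat(q.Get("ratio"), 32); err == nil {
		ratio = t
	}
	return ratio
}

func main() {
	q, _ := url.ParseQuery("ratio=0.8")
	fmt.Println(ratioOrDefault(q, 0.75))            // ~0.8 (parsed at 32-bit precision, as in the diff)
	fmt.Println(ratioOrDefault(url.Values{}, 0.75)) // 0.75: a missing parameter keeps the default
}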
@@ -13,6 +13,8 @@ type Optimizer struct {
 	predicts map[string]*OptimizerJobExecutionTime
 
 	jobUtilsGPU map[string]*OptimizerUtilGPU
+
+	heartbeatInterval int
 }
 
 var optimizerInstance *Optimizer
@@ -26,6 +28,7 @@ func InstanceOfOptimizer() *Optimizer {
 		optimizerInstance = &Optimizer{}
 		optimizerInstance.predicts = map[string]*OptimizerJobExecutionTime{}
 		optimizerInstance.jobUtilsGPU = map[string]*OptimizerUtilGPU{}
+		optimizerInstance.heartbeatInterval = 3
 	}
 	return optimizerInstance
 }
@@ -74,6 +77,8 @@ func (optimizer *Optimizer) feed(job string, utils []int) {
 	if _, ok := optimizer.predicts[jobName]; !ok {
 		optimizer.predicts[jobName] = &OptimizerJobExecutionTime{}
 	}
+	postCnt *= optimizer.heartbeatInterval
+	preCnt *= optimizer.heartbeatInterval
 	predict := optimizer.predicts[jobName]
 	predict.Pre = ((predict.Pre * predict.Version) + preCnt) / (predict.Version + 1)
 	predict.Post = ((predict.Post * predict.Version) + postCnt) / (predict.Version + 1)
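With heartbeatInterval stored on the singleton, feed now scales the pre/post counters by that interval before folding them into the per-job estimate, presumably converting heartbeat-sample counts into the heartbeat's time unit, and the estimate itself is a running mean weighted by Version. A compact sketch of that update follows; executionEstimate is a hypothetical stand-in for OptimizerJobExecutionTime, not the repository's type.

package main

import "fmt"

// executionEstimate stands in for OptimizerJobExecutionTime: Pre/Post hold
// averaged phase durations, Version counts samples folded in so far.
type executionEstimate struct {
	Pre, Post, Version int
}

// fold mirrors the update in feed(): scale the raw counters by the heartbeat
// interval, then take a running mean where the existing value is weighted by
// Version and the new observation by 1.
func (e *executionEstimate) fold(preCnt, postCnt, heartbeatInterval int) {
	preCnt *= heartbeatInterval
	postCnt *= heartbeatInterval
	e.Pre = (e.Pre*e.Version + preCnt) / (e.Version + 1)
	e.Post = (e.Post*e.Version + postCnt) / (e.Version + 1)
	// The diff does not show Version being incremented here; that presumably
	// happens elsewhere in the optimizer.
}

func main() {
	est := &executionEstimate{Pre: 120, Post: 30, Version: 4}
	est.fold(50, 12, 3) // 50 and 12 heartbeat ticks, interval of 3 (assumed unit)
	fmt.Println(est.Pre, est.Post) // (120*4+150)/5 = 126, (30*4+36)/5 = 31
}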
@@ -35,6 +35,8 @@ type ResourcePool struct {
 	bindings   map[string]map[string]bool
 	bindingsMu sync.Mutex
 	utils      map[string][]int
+
+	TotalGPU int
 }
 
 func (pool *ResourcePool) GPUModelToPower(model string) int {
@@ -60,6 +62,8 @@ func (pool *ResourcePool) start() {
 	pool.bindings = map[string]map[string]bool{}
 	pool.utils = map[string][]int{}
 
+	pool.TotalGPU = 0
+
 	pool.poolsCount = 100
 	for i := 0; i < pool.poolsCount; i++ {
 		pool.pools = append(pool.pools, map[string]NodeStatus{})
@@ -141,6 +145,8 @@ func (pool *ResourcePool) start() {
 			if len(pool.history) > 60 {
 				pool.history = pool.history[len(pool.history)-60:]
 			}
+
+			pool.TotalGPU = TotalGPU
 			time.Sleep(time.Second * 60)
 		}
 	}()
@@ -34,4 +34,8 @@ type Scheduler interface {
 	Disable() bool
 
 	UpdateParallelism(parallelism int) bool
+
+	SetShareRatio(ratio float64) bool
+
+	SetPreScheduleRatio(ratio float64) bool
 }
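Every Scheduler implementation now has to provide the two setters, which lets the HTTP handler above tune any scheduler through the interface. The sketch below shows that call pattern against a deliberately trimmed copy of the interface; ratioTuner and loggingTuner are illustrative names, and the stub bodies mirror the FCFS/priority implementations further down, which only log the new value.

package main

import "fmt"

// ratioTuner is a trimmed view of the Scheduler interface, keeping only the
// two methods added in this commit.
type ratioTuner interface {
	SetShareRatio(ratio float64) bool
	SetPreScheduleRatio(ratio float64) bool
}

// loggingTuner is a stub implementation in the spirit of SchedulerFCFS and
// SchedulerPriority, which just log the update.
type loggingTuner struct{}

func (loggingTuner) SetShareRatio(ratio float64) bool {
	fmt.Println("enableShareRatio is updated to", ratio)
	return true
}

func (loggingTuner) SetPreScheduleRatio(ratio float64) bool {
	fmt.Println("enablePreScheduleRatio is updated to", ratio)
	return true
}

func main() {
	var s ratioTuner = loggingTuner{}
	s.SetShareRatio(0.75)
	s.SetPreScheduleRatio(0.95)
}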
@@ -261,3 +261,15 @@ func (scheduler *SchedulerFCFS) UpdateParallelism(parallelism int) bool {
 	log.Info("parallelism is updated to", parallelism)
 	return true
 }
+
+func (scheduler *SchedulerFCFS) SetShareRatio(ratio float64) bool {
+	//scheduler.enableShareRatio = ratio
+	log.Info("enableShareRatio is updated to", ratio)
+	return true
+}
+
+func (scheduler *SchedulerFCFS) SetPreScheduleRatio(ratio float64) bool {
+	//scheduler.enablePreScheduleRatio = ratio
+	log.Info("enablePreScheduleRatio is updated to", ratio)
+	return true
+}
@@ -28,8 +28,13 @@ type SchedulerFair struct {
 	resourceAllocationsMu sync.Mutex
 	enabled               bool
 	parallelism           int
-	enableShare           bool
-	enablePreSchedule     bool
+
+	enableShare            bool
+	enableShareRatio       float64
+	enablePreSchedule      bool
+	enablePreScheduleRatio float64
+
+	UsingGPU int
 }
 
 type FairJobSorter []Job
@@ -53,8 +58,13 @@ func (scheduler *SchedulerFair) Start() {
 	scheduler.resourceAllocations = map[string]*ResourceCount{}
 	scheduler.enabled = true
 	scheduler.schedulingJobsCnt = 0
+
 	scheduler.enableShare = true
+	scheduler.enableShareRatio = 0.75
 	scheduler.enablePreSchedule = true
+	scheduler.enablePreScheduleRatio = 0.95
+
+	scheduler.UsingGPU = 0
 
 	scheduler.parallelism = 1
 
@@ -189,39 +199,15 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 
 	locks := map[int]sync.Mutex{}
 
-	allocationType := 1
+	allocationType := 0
 	availableGPUs := map[string][]GPUStatus{}
 
 	var candidates []NodeStatus
-	/* first round, find vacant gpu */
-	for i := 0; i < pool.poolsCount; i++ {
-		pool.poolsMu[(i+poolID)%pool.poolsCount].Lock()
-		locks[(i+poolID)%pool.poolsCount] = pool.poolsMu[(i+poolID)%pool.poolsCount]
-		for _, node := range pool.pools[(i+poolID)%pool.poolsCount] {
-			var available []GPUStatus
-			for _, status := range node.Status {
-				if status.MemoryAllocated == 0 && status.MemoryUsed < 10 {
-					available = append(available, status)
-				}
-			}
-			if len(available) >= task.NumberGPU {
-				candidates = append(candidates, node)
-				availableGPUs[node.ClientID] = available
-				if len(candidates) >= 8 {
-					break
-				}
-			}
-		}
-		if len(candidates) >= 8 {
-			break
-		}
-	}
-	log.Info(candidates)
 
-	/* second round, find sharable gpu */
-	if len(candidates) == 0 && scheduler.enableShare {
+	/* first, choose sharable GPUs */
+	if scheduler.enableShare && (pool.TotalGPU != 0 && float64(scheduler.UsingGPU)/float64(pool.TotalGPU) > scheduler.enableShareRatio) {
 		// check sharable
-		allocationType = 2
+		allocationType = 1
 		if util, valid := InstanceOfOptimizer().predictUtilGPU(job.Name); valid {
 
 			for i := 0; i < pool.poolsCount; i++ {
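The rewritten first round no longer scans for vacant GPUs first: sharing is attempted up front, but only once the cluster is already busy, i.e. UsingGPU/TotalGPU exceeds enableShareRatio and TotalGPU has been reported as non-zero. A small self-contained sketch of that gate (shareGateOpen is an illustrative name, not a function in the repo):

package main

import "fmt"

// shareGateOpen mirrors the condition in the reworked first round: sharing is
// only considered when it is enabled and GPU usage has crossed the configured
// ratio of a non-zero total.
func shareGateOpen(enableShare bool, usingGPU, totalGPU int, shareRatio float64) bool {
	return enableShare && totalGPU != 0 &&
		float64(usingGPU)/float64(totalGPU) > shareRatio
}

func main() {
	fmt.Println(shareGateOpen(true, 70, 100, 0.75)) // false: only 70% of GPUs in use
	fmt.Println(shareGateOpen(true, 80, 100, 0.75)) // true: 80% > 75%
	fmt.Println(shareGateOpen(true, 5, 0, 0.75))    // false: no GPUs reported yet
}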
@@ -242,7 +228,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 						totalUtil += utilT
 					}
 				}
-				if totalUtil < 110 {
+				if totalUtil < 100 {
 					available = append(available, status)
 					availableGPUs[node.ClientID] = available
 				}
@@ -264,8 +250,67 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 		log.Info(candidates)
 	}
 
-	log.Info(allocationType)
-	/*assign*/
+	/* second round, find vacant gpu */
+	if len(candidates) == 0 {
+		allocationType = 2
+		for i := 0; i < pool.poolsCount; i++ {
+			pool.poolsMu[(i+poolID)%pool.poolsCount].Lock()
+			locks[(i+poolID)%pool.poolsCount] = pool.poolsMu[(i+poolID)%pool.poolsCount]
+			for _, node := range pool.pools[(i+poolID)%pool.poolsCount] {
+				var available []GPUStatus
+				for _, status := range node.Status {
+					if status.MemoryAllocated == 0 && status.MemoryUsed < 10 {
+						available = append(available, status)
+					}
+				}
+				if len(available) >= task.NumberGPU {
+					candidates = append(candidates, node)
+					availableGPUs[node.ClientID] = available
+					if len(candidates) >= 8 {
+						break
+					}
+				}
+			}
+			if len(candidates) >= 8 {
+				break
+			}
+		}
+		log.Info(candidates)
+	}
+
+	/* third round, find gpu to be released */
+	if len(candidates) == 0 && len(job.Tasks) == 1 && scheduler.enablePreSchedule {
+		if pool.TotalGPU != 0 && float64(scheduler.UsingGPU)/float64(pool.TotalGPU) > scheduler.enablePreScheduleRatio {
+			allocationType = 3
+			for i := 0; i < pool.poolsCount; i++ {
+				pool.poolsMu[(i+poolID)%pool.poolsCount].Lock()
+				locks[(i+poolID)%pool.poolsCount] = pool.poolsMu[(i+poolID)%pool.poolsCount]
+				for _, node := range pool.pools[(i+poolID)%pool.poolsCount] {
+					var available []GPUStatus
+					for _, status := range node.Status {
+						if status.MemoryAllocated == 0 && status.MemoryUsed < 10 {
+							available = append(available, status)
+						}
+					}
+					if len(available) >= task.NumberGPU {
+						candidates = append(candidates, node)
+						availableGPUs[node.ClientID] = available
+						if len(candidates) >= 8 {
+							break
+						}
+					}
+				}
+				if len(candidates) >= 8 {
+					break
+				}
+			}
+			log.Info(candidates)
+		}
+	}
+
+	log.Info("allocationType is ", allocationType)
+
+	/* assign */
 	if len(candidates) > 0 {
 		node := candidates[0]
 		res.ClientID = node.ClientID
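The second and third rounds reuse the same vacant-GPU scan that previously ran as the first round: a GPU counts as free when nothing is allocated on it and its measured memory use is negligible, and the search stops after 8 candidate nodes; the third round additionally applies only to single-task jobs when usage exceeds enablePreScheduleRatio, and the final allocationType (1 shared, 2 vacant, 3 pre-schedule) records which round was tried last. Below is a flattened, self-contained sketch of that scan, with pool sharding and locking omitted and cut-down types standing in for the repo's GPUStatus/NodeStatus.

package main

import "fmt"

// gpuStatus / nodeStatus are simplified stand-ins for GPUStatus / NodeStatus.
type gpuStatus struct {
	UUID            string
	MemoryAllocated int
	MemoryUsed      int
}

type nodeStatus struct {
	ClientID string
	Status   []gpuStatus
}

// vacantCandidates mirrors the per-node scan in the second and third rounds:
// collect GPUs with no allocation and near-zero memory use, keep nodes that
// have at least numberGPU of them, and stop once 8 candidates are found.
func vacantCandidates(nodes []nodeStatus, numberGPU int) ([]nodeStatus, map[string][]gpuStatus) {
	var candidates []nodeStatus
	available := map[string][]gpuStatus{}
	for _, node := range nodes {
		var free []gpuStatus
		for _, status := range node.Status {
			if status.MemoryAllocated == 0 && status.MemoryUsed < 10 {
				free = append(free, status)
			}
		}
		if len(free) >= numberGPU {
			candidates = append(candidates, node)
			available[node.ClientID] = free
			if len(candidates) >= 8 {
				break
			}
		}
	}
	return candidates, available
}

func main() {
	nodes := []nodeStatus{
		{ClientID: "node-1", Status: []gpuStatus{{UUID: "g0", MemoryAllocated: 0, MemoryUsed: 2}}},
		{ClientID: "node-2", Status: []gpuStatus{{UUID: "g1", MemoryAllocated: 4096, MemoryUsed: 4096}}},
	}
	cands, avail := vacantCandidates(nodes, 1)
	fmt.Println(len(cands), avail["node-1"]) // 1 candidate; node-1's idle GPU list
}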
@@ -530,3 +575,15 @@ func (scheduler *SchedulerFair) UpdateParallelism(parallelism int) bool {
 	log.Info("parallelism is updated to", parallelism)
 	return true
 }
+
+func (scheduler *SchedulerFair) SetShareRatio(ratio float64) bool {
+	scheduler.enableShareRatio = ratio
+	log.Info("enableShareRatio is updated to", ratio)
+	return true
+}
+
+func (scheduler *SchedulerFair) SetPreScheduleRatio(ratio float64) bool {
+	scheduler.enablePreScheduleRatio = ratio
+	log.Info("enablePreScheduleRatio is updated to", ratio)
+	return true
+}
@@ -285,3 +285,15 @@ func (scheduler *SchedulerPriority) UpdateParallelism(parallelism int) bool {
 	log.Info("parallelism is updated to", parallelism)
 	return true
 }
+
+func (scheduler *SchedulerPriority) SetShareRatio(ratio float64) bool {
+	//scheduler.enableShareRatio = ratio
+	log.Info("enableShareRatio is updated to", ratio)
+	return true
+}
+
+func (scheduler *SchedulerPriority) SetPreScheduleRatio(ratio float64) bool {
+	//scheduler.enablePreScheduleRatio = ratio
+	log.Info("enablePreScheduleRatio is updated to", ratio)
+	return true
+}