1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-07 14:21:55 +00:00

set parallelism

This commit is contained in:
Newnius 2020-04-13 23:53:38 +08:00
parent 490a6b3928
commit ca3ac7aea1
6 changed files with 44 additions and 5 deletions

View File

@ -32,3 +32,8 @@ GPU is occupied by which job(s)
``` ```
?action=debug_disable ?action=debug_disable
``` ```
**UpdateMaxParallelism**
```
?action=debug_update_parallelism&parallelism=5
```

View File

@ -7,6 +7,7 @@ import (
"encoding/json" "encoding/json"
"os" "os"
"time" "time"
"strconv"
) )
var addr = flag.String("addr", "0.0.0.0:8080", "http service address") var addr = flag.String("addr", "0.0.0.0:8080", "http service address")
@ -180,6 +181,14 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
w.Write(js) w.Write(js)
break break
case "debug_update_parallelism":
log.Debug("update_parallelism")
parallelism, _ := strconv.Atoi(r.URL.Query().Get("parallelism"))
js, _ := json.Marshal(scheduler.UpdateParallelism(parallelism))
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
default: default:
http.Error(w, "Not Found", http.StatusNotFound) http.Error(w, "Not Found", http.StatusNotFound)
break break

View File

@ -32,4 +32,6 @@ type Scheduler interface {
Enable() bool Enable() bool
Disable() bool Disable() bool
UpdateParallelism(parallelism int) bool
} }

View File

@ -15,6 +15,7 @@ type SchedulerFCFS struct {
jobs map[string]*JobManager jobs map[string]*JobManager
enabled bool enabled bool
parallelism int
} }
func (scheduler *SchedulerFCFS) Start() { func (scheduler *SchedulerFCFS) Start() {
@ -254,3 +255,9 @@ func (scheduler *SchedulerFCFS) Disable() bool {
scheduler.enabled = false scheduler.enabled = false
return true return true
} }
func (scheduler *SchedulerFCFS) UpdateParallelism(parallelism int) bool {
scheduler.parallelism = parallelism
log.Info("parallelism is updated to", parallelism)
return true
}

View File

@ -26,6 +26,7 @@ type SchedulerFair struct {
resourceAllocations map[string]*ResourceCount resourceAllocations map[string]*ResourceCount
enabled bool enabled bool
latestPoolIndex int latestPoolIndex int
parallelism int
} }
type FairJobSorter []Job type FairJobSorter []Job
@ -50,6 +51,8 @@ func (scheduler *SchedulerFair) Start() {
scheduler.enabled = true scheduler.enabled = true
scheduler.schedulingJobsCnt = 0 scheduler.schedulingJobsCnt = 0
scheduler.parallelism = 1
go func() { go func() {
for { for {
log.Debug("Scheduling") log.Debug("Scheduling")
@ -58,7 +61,7 @@ func (scheduler *SchedulerFair) Start() {
continue continue
} }
scheduler.schedulingMu.Lock() scheduler.schedulingMu.Lock()
if scheduler.schedulingJobsCnt >= pool.poolsCount/10 { if scheduler.schedulingJobsCnt >= scheduler.parallelism {
scheduler.schedulingMu.Unlock() scheduler.schedulingMu.Unlock()
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
continue continue
@ -434,3 +437,9 @@ func (scheduler *SchedulerFair) Disable() bool {
log.Info("scheduler is disabled", time.Now()) log.Info("scheduler is disabled", time.Now())
return true return true
} }
func (scheduler *SchedulerFair) UpdateParallelism(parallelism int) bool {
scheduler.parallelism = parallelism
log.Info("parallelism is updated to", parallelism)
return true
}

View File

@ -15,6 +15,7 @@ type SchedulerPriority struct {
jobs map[string]*JobManager jobs map[string]*JobManager
enabled bool enabled bool
parallelism int
} }
func (scheduler *SchedulerPriority) Start() { func (scheduler *SchedulerPriority) Start() {
@ -278,3 +279,9 @@ func (scheduler *SchedulerPriority) Disable() bool {
scheduler.enabled = false scheduler.enabled = false
return true return true
} }
func (scheduler *SchedulerPriority) UpdateParallelism(parallelism int) bool {
scheduler.parallelism = parallelism
log.Info("parallelism is updated to", parallelism)
return true
}