mirror of https://github.com/newnius/YAO-scheduler.git, synced 2025-12-15 08:16:43 +00:00
update
@@ -33,7 +33,7 @@ func (jm *JobManager) start() {
 			if jm.killedFlag {
 				break
 			}
-			resource = jm.scheduler.AcquireResource(jm.job, jm.job.Tasks[i])
+			resource = jm.scheduler.AcquireResource(jm.job, jm.job.Tasks[i], jm.resources)
 			if len(resource.Status) > 0 {
 				break
 			}
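For orientation, the context lines suggest a loop that polls the scheduler until GPUs are granted or the job is killed, now threading the job manager's cached node list into the call. A minimal sketch of that loop shape, with simplified stand-in types (the repo's Job, Task, and NodeStatus definitions differ):

package sketch

// NodeStatus is a simplified stand-in for the repo's type.
type NodeStatus struct{ Status []string }

type scheduler interface {
	AcquireResource(job, task string, cached []NodeStatus) NodeStatus
}

// acquire polls until the scheduler grants GPUs or the job is killed,
// mirroring the retry loop implied by the context lines above.
func acquire(s scheduler, killed func() bool, cached []NodeStatus) (NodeStatus, bool) {
	for !killed() {
		resource := s.AcquireResource("job", "task", cached)
		if len(resource.Status) > 0 {
			return resource, true // granted
		}
	}
	return NodeStatus{}, false // job was killed while waiting
}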
@@ -40,7 +40,11 @@ type ResourcePool struct {
 }
 
 func (pool *ResourcePool) GPUModelToPower(model string) int {
-	mapper := map[string]int{"k40": 1, "K80": 2, "P100": 3}
+	mapper := map[string]int{
+		"K40": 1, "Tesla K40": 1,
+		"K80": 2, "Tesla K80": 2,
+		"P100": 3, "Tesla P100": 3,
+	}
 	if power, err := mapper[model]; !err {
 		return power
 	}
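A note on the lookup idiom in the unchanged lines: indexing a Go map with two results yields the value plus a boolean that reports whether the key was present (conventionally named ok, not err), and for a missing key the value is the zero value. So a guard written as !err takes the branch when the model is absent. A self-contained sketch:

package main

import "fmt"

func main() {
	mapper := map[string]int{"K40": 1, "K80": 2, "P100": 3}

	// Missing key: ok is false and power is the zero value (0 for int).
	if power, ok := mapper["Tesla V100"]; !ok {
		fmt.Println("unknown model, zero value:", power) // prints 0
	}
	// Present key: ok is true and power is the stored value.
	if power, ok := mapper["P100"]; ok {
		fmt.Println("known model:", power) // prints 3
	}
}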
@@ -297,3 +301,17 @@ func (pool *ResourcePool) detach(GPU string, jobName string) {
 func (pool *ResourcePool) getBindings() map[string]map[string]int {
 	return pool.bindings
 }
+
+func (pool *ResourcePool) pickNode(nodes []*NodeStatus) *NodeStatus {
+
+	/* shuffle */
+	r := rand.New(rand.NewSource(time.Now().Unix()))
+	for n := len(nodes); n > 0; n-- {
+		randIndex := r.Intn(n)
+		nodes[n-1], nodes[randIndex] = nodes[randIndex], nodes[n-1]
+	}
+
+	/* sort */
+
+	return nodes[0]
+}
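The added loop is an in-place Fisher–Yates shuffle, and the empty /* sort */ block reads as a placeholder for a future ranking pass. Since Go 1.10 the standard library offers the same swap-based shuffle as rand.Shuffle; a minimal equivalent sketch (node names illustrative):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	nodes := []string{"node-1", "node-2", "node-3", "node-4"}

	// Equivalent to the hand-rolled loop in the hunk above:
	// walk from the end, swapping each slot with a random earlier one.
	r := rand.New(rand.NewSource(time.Now().Unix()))
	r.Shuffle(len(nodes), func(i, j int) {
		nodes[i], nodes[j] = nodes[j], nodes[i]
	})

	fmt.Println(nodes[0]) // pick the first element after shuffling
}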
@@ -7,7 +7,7 @@ type Scheduler interface {
 
 	UpdateProgress(jobName string, state State)
 
-	AcquireResource(Job, Task) NodeStatus
+	AcquireResource(Job, Task, []NodeStatus) NodeStatus
 
 	ReleaseResource(Job, NodeStatus)
 
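Every scheduler implementation must adopt the widened signature before the program compiles again; a compile-time conformance check is a common way to catch a missed call site early. A sketch with simplified stand-in types (Job, Task, NodeStatus here are not the repo's definitions):

package sketch

// Simplified stand-ins for the repo's types.
type Job struct{}
type Task struct{}
type NodeStatus struct{}

type Scheduler interface {
	AcquireResource(Job, Task, []NodeStatus) NodeStatus
}

type SchedulerFCFS struct{}

func (s *SchedulerFCFS) AcquireResource(job Job, task Task, nodes []NodeStatus) NodeStatus {
	return NodeStatus{}
}

// Fails to compile if SchedulerFCFS falls behind the interface.
var _ Scheduler = (*SchedulerFCFS)(nil)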
@@ -87,7 +87,7 @@ func (scheduler *SchedulerFCFS) Schedule(job Job) {
 	job.Status = Created
 }
 
-func (scheduler *SchedulerFCFS) AcquireResource(job Job, task Task) NodeStatus {
+func (scheduler *SchedulerFCFS) AcquireResource(job Job, task Task, nodes []NodeStatus) NodeStatus {
 	poolID := rand.Intn(pool.poolsCount)
 	pool.poolsMu[poolID].Lock()
 	defer pool.poolsMu[poolID].Unlock()
@@ -272,4 +272,4 @@ func (scheduler *SchedulerFCFS) SetPreScheduleRatio(ratio float64) bool {
 	//scheduler.enablePreScheduleRatio = ratio
-	log.Info("enablePreScheduleRatio is updated to", ratio)
+	log.Info("enablePreScheduleRatio is updated to ", ratio)
 	return true
 }
@@ -35,6 +35,7 @@ type SchedulerFair struct {
 	enablePreScheduleRatio float64
 
 	UsingGPU int
+	UsingGPUMu sync.Mutex
 }
 
 type FairJobSorter []Job
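The new mutex serializes updates to the UsingGPU counter; a sync.Mutex's zero value is ready to use, so no constructor is needed. For a plain integer counter, sync/atomic would be a lighter alternative to a mutex (not what this commit uses); a sketch:

package sketch

import "sync/atomic"

type counters struct {
	usingGPU int64 // accessed only via atomic operations
}

func (c *counters) acquire(n int64) { atomic.AddInt64(&c.usingGPU, n) }
func (c *counters) release(n int64) { atomic.AddInt64(&c.usingGPU, -n) }
func (c *counters) load() int64     { return atomic.LoadInt64(&c.usingGPU) }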
@@ -102,7 +103,7 @@ func (scheduler *SchedulerFair) Start() {
 				jm.start()
 			}()
 		} else {
-			log.Debug("No more jobs to scheduling", time.Now())
+			log.Debug("No more jobs to scheduling ", time.Now())
 			scheduler.schedulingMu.Lock()
 			scheduler.schedulingJobsCnt--
 			scheduler.schedulingMu.Unlock()
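The trailing space added to the message (here and in the log calls below) is deliberate: assuming the log package follows logrus conventions, Info and Debug join their arguments with fmt.Sprint semantics, which insert a space only between two operands when neither is a string, so a string followed by a time.Time is concatenated with no separator. A sketch of the underlying behavior:

package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)

	// No separator: one operand is a string, so Sprint adds no space.
	fmt.Println(fmt.Sprint("scheduler is enabled", now))
	// With the trailing space the output stays readable.
	fmt.Println(fmt.Sprint("scheduler is enabled ", now))
}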
@@ -193,7 +194,7 @@ func (scheduler *SchedulerFair) Schedule(job Job) {
 	job.Status = Created
 }
 
-func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
+func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []NodeStatus) NodeStatus {
 	poolID := rand.Intn(pool.poolsCount)
 	res := NodeStatus{}
 
@@ -202,7 +203,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 	allocationType := 0
 	availableGPUs := map[string][]GPUStatus{}
 
-	var candidates []NodeStatus
+	var candidates []*NodeStatus
 
 	/* first, choose sharable GPUs */
 	if scheduler.enableShare && (pool.TotalGPU != 0 && float64(scheduler.UsingGPU)/float64(pool.TotalGPU) >= scheduler.enableShareRatio) {
@@ -236,7 +237,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 				}
 			}
 			if len(available) >= task.NumberGPU {
-				candidates = append(candidates, node)
+				candidates = append(candidates, &node)
 				if len(candidates) >= 8 {
 					break
 				}
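One thing to watch with this change (it recurs in the two hunks below): if node is the variable of a for ... range loop, whose header sits outside the hunk, then in Go versions before 1.22 every iteration reuses the same variable, so appending &node stores the same pointer repeatedly; appending &nodes[i] or rebinding the variable inside the loop avoids that. A minimal reproduction of the pitfall:

package main

import "fmt"

type NodeStatus struct{ ClientID string }

func main() {
	nodes := []NodeStatus{{"n1"}, {"n2"}, {"n3"}}

	var candidates []*NodeStatus
	for _, node := range nodes {
		candidates = append(candidates, &node) // aliases the loop variable
	}
	for _, c := range candidates {
		fmt.Print(c.ClientID, " ") // before Go 1.22: "n3 n3 n3"
	}
}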
@@ -266,7 +267,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 				}
 			}
 			if len(available) >= task.NumberGPU {
-				candidates = append(candidates, node)
+				candidates = append(candidates, &node)
 				availableGPUs[node.ClientID] = available
 				if len(candidates) >= 8 {
 					break
@@ -316,7 +317,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 				}
 			}
 			if len(available) >= task.NumberGPU {
-				candidates = append(candidates, node)
+				candidates = append(candidates, &node)
 				availableGPUs[node.ClientID] = available
 				if len(candidates) >= 8 {
 					break
@@ -338,7 +339,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 
	/* assign */
 	if len(candidates) > 0 {
-		node := candidates[0]
+		node := pool.pickNode(candidates)
 		res.ClientID = node.ClientID
 		res.ClientHost = node.ClientHost
 		res.Status = availableGPUs[node.ClientID][0:task.NumberGPU]
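In the assignment on the last context line, the slice expression takes the first NumberGPU entries without copying: a Go slice expression shares the backing array with its source, which matters if the source slice is mutated later. A sketch of that aliasing (types illustrative):

package main

import "fmt"

type GPUStatus struct{ UUID string }

func main() {
	available := []GPUStatus{{"gpu-0"}, {"gpu-1"}, {"gpu-2"}}

	granted := available[0:2] // no copy: shares the backing array
	available[0].UUID = "changed"
	fmt.Println(granted[0].UUID) // "changed" — the two views alias

	// An explicit copy decouples them:
	independent := make([]GPUStatus, 2)
	copy(independent, available[0:2])
	fmt.Println(independent[0].UUID)
}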
@@ -354,10 +355,16 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus {
 			}
 		}
 		if allocationType == 2 {
+			scheduler.UsingGPUMu.Lock()
 			scheduler.UsingGPU += task.NumberGPU
+			scheduler.UsingGPUMu.Unlock()
+			log.Info(res.Status, " is using")
 		}
 	}
+
+
+
 
 	for i := range locks {
 		pool.poolsMu[i].Unlock()
 	}
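The increment now happens under UsingGPUMu, matching the decrement added in ReleaseResource below; without the lock, concurrent AcquireResource and ReleaseResource calls would race on UsingGPU. A self-contained sketch of the pattern (type and method names illustrative):

package main

import (
	"fmt"
	"sync"
)

type fairScheduler struct {
	UsingGPUMu sync.Mutex
	UsingGPU   int
}

func (s *fairScheduler) acquire(n int) {
	s.UsingGPUMu.Lock()
	s.UsingGPU += n
	s.UsingGPUMu.Unlock()
}

func (s *fairScheduler) release() {
	s.UsingGPUMu.Lock()
	s.UsingGPU--
	s.UsingGPUMu.Unlock()
}

func main() {
	s := &fairScheduler{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); s.acquire(1) }()
	}
	wg.Wait()
	fmt.Println(s.UsingGPU) // 100, deterministically
}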
@@ -399,7 +406,10 @@ func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
 				node.Status[j].MemoryAllocated = 0
 			}
 			if node.Status[j].MemoryAllocated == 0 {
+				scheduler.UsingGPUMu.Lock()
 				scheduler.UsingGPU--
+				scheduler.UsingGPUMu.Unlock()
+				log.Info(node.Status[j].UUID, " is released")
 			}
 			log.Info(node.Status[j].MemoryAllocated)
 		}
@@ -592,30 +602,30 @@ func (scheduler *SchedulerFair) Detach(GPU string, job string) {
 
 func (scheduler *SchedulerFair) Enable() bool {
 	scheduler.enabled = true
-	log.Info("scheduler is enabled", time.Now())
+	log.Info("scheduler is enabled ", time.Now())
 	return true
 }
 
 func (scheduler *SchedulerFair) Disable() bool {
 	scheduler.enabled = false
-	log.Info("scheduler is disabled", time.Now())
+	log.Info("scheduler is disabled ", time.Now())
 	return true
 }
 
 func (scheduler *SchedulerFair) UpdateParallelism(parallelism int) bool {
 	scheduler.parallelism = parallelism
-	log.Info("parallelism is updated to", parallelism)
+	log.Info("parallelism is updated to ", parallelism)
 	return true
 }
 
 func (scheduler *SchedulerFair) SetShareRatio(ratio float64) bool {
 	scheduler.enableShareRatio = ratio
-	log.Info("enableShareRatio is updated to", ratio)
+	log.Info("enableShareRatio is updated to ", ratio)
 	return true
 }
 
 func (scheduler *SchedulerFair) SetPreScheduleRatio(ratio float64) bool {
 	scheduler.enablePreScheduleRatio = ratio
-	log.Info("enablePreScheduleRatio is updated to", ratio)
+	log.Info("enablePreScheduleRatio is updated to ", ratio)
 	return true
 }
@@ -111,7 +111,7 @@ func (scheduler *SchedulerPriority) Schedule(job Job) {
 	job.Status = Created
 }
 
-func (scheduler *SchedulerPriority) AcquireResource(job Job, task Task) NodeStatus {
+func (scheduler *SchedulerPriority) AcquireResource(job Job, task Task, nodes []NodeStatus) NodeStatus {
 	poolID := rand.Intn(pool.poolsCount)
 	pool.poolsMu[poolID].Lock()
 	defer pool.poolsMu[poolID].Unlock()

@@ -296,4 +296,4 @@ func (scheduler *SchedulerPriority) SetPreScheduleRatio(ratio float64) bool {
 	//scheduler.enablePreScheduleRatio = ratio
-	log.Info("enablePreScheduleRatio is updated to", ratio)
+	log.Info("enablePreScheduleRatio is updated to ", ratio)
 	return true
 }