1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-06 22:01:55 +00:00
This commit is contained in:
Newnius 2020-06-08 20:49:50 +08:00
parent e8c6ea53e2
commit 6f01eef504
3 changed files with 122 additions and 47 deletions

View File

@ -1,12 +1,14 @@
package main
type Evaluator struct {
domains map[string]map[string]int
racks map[string]map[string]int
nodes map[string]map[string]int
upstreams map[string]string
totalPS int
totalWorker int
domains map[string]map[string]map[string]int
racks map[string]map[string]map[string]int
nodes map[string]map[string]map[string]int
//upstreams map[string]map[string]string
totalPSs map[string]int
totalWorkers map[string]int
totalPS int
totalWorker int
costNetwork float64
costLoad float64
@ -19,10 +21,12 @@ type Evaluator struct {
}
func (eva *Evaluator) init(nodes []NodeStatus, tasks []Task) {
eva.domains = map[string]map[string]int{}
eva.racks = map[string]map[string]int{}
eva.nodes = map[string]map[string]int{}
eva.upstreams = map[string]string{}
eva.domains = map[string]map[string]map[string]int{}
eva.racks = map[string]map[string]map[string]int{}
eva.nodes = map[string]map[string]map[string]int{}
//eva.upstreams = map[string]string{}
eva.totalPSs = map[string]int{}
eva.totalWorkers = map[string]int{}
eva.totalPS = 0
eva.totalWorker = 0
eva.factorNode = 1.0
@ -34,33 +38,42 @@ func (eva *Evaluator) init(nodes []NodeStatus, tasks []Task) {
}
func (eva *Evaluator) add(node NodeStatus, task Task) {
if _, ok := eva.nodes[task.Job]; !ok {
eva.nodes[task.Job] = map[string]map[string]int{}
eva.racks[task.Job] = map[string]map[string]int{}
eva.domains[task.Job] = map[string]map[string]int{}
eva.totalPSs = map[string]int{}
eva.totalWorkers = map[string]int{}
}
/* update network cost */
if _, ok := eva.nodes[node.ClientID]; !ok {
eva.nodes[node.ClientID] = map[string]int{"PS": 0, "Worker": 0}
if _, ok := eva.nodes[task.Job][node.ClientID]; !ok {
eva.nodes[task.Job][node.ClientID] = map[string]int{"PS": 0, "Worker": 0}
}
if _, ok := eva.racks[node.Rack]; !ok {
eva.racks[node.Rack] = map[string]int{"PS": 0, "Worker": 0}
if _, ok := eva.racks[task.Job][node.Rack]; !ok {
eva.racks[task.Job][node.Rack] = map[string]int{"PS": 0, "Worker": 0}
}
if _, ok := eva.domains[node.Domain]; !ok {
eva.domains[node.Domain] = map[string]int{"PS": 0, "Worker": 0}
if _, ok := eva.domains[task.Job][node.Domain]; !ok {
eva.domains[task.Job][node.Domain] = map[string]int{"PS": 0, "Worker": 0}
}
if task.IsPS {
eva.costNetwork += eva.factorNode * float64(eva.racks[node.Rack]["Worker"]-eva.nodes[node.ClientID]["Worker"])
eva.costNetwork += eva.factorRack * float64(eva.domains[node.Domain]["Worker"]-eva.racks[node.Rack]["Worker"])
eva.costNetwork += eva.factorDomain * float64(eva.totalWorker-eva.domains[node.Domain]["Worker"])
eva.costNetwork += eva.factorNode * float64(eva.racks[task.Job][node.Rack]["Worker"]-eva.nodes[task.Job][node.ClientID]["Worker"])
eva.costNetwork += eva.factorRack * float64(eva.domains[task.Job][node.Domain]["Worker"]-eva.racks[task.Job][node.Rack]["Worker"])
eva.costNetwork += eva.factorDomain * float64(eva.totalWorkers[task.Job]-eva.domains[task.Job][node.Domain]["Worker"])
eva.nodes[node.ClientID]["PS"]++
eva.racks[node.Rack]["PS"]++
eva.domains[node.Domain]["PS"]++
eva.nodes[task.Job][node.ClientID]["PS"]++
eva.racks[task.Job][node.Rack]["PS"]++
eva.domains[task.Job][node.Domain]["PS"]++
eva.totalPSs[task.Job]++
eva.totalPS++
} else {
eva.costNetwork += eva.factorNode * float64(eva.racks[node.Rack]["PS"]-eva.nodes[node.ClientID]["PS"])
eva.costNetwork += eva.factorRack * float64(eva.domains[node.Domain]["PS"]-eva.racks[node.Rack]["PS"])
eva.costNetwork += eva.factorDomain * float64(eva.totalPS-eva.domains[node.Domain]["PS"])
eva.costNetwork += eva.factorNode * float64(eva.racks[task.Job][node.Rack]["PS"]-eva.nodes[task.Job][node.ClientID]["PS"])
eva.costNetwork += eva.factorRack * float64(eva.domains[task.Job][node.Domain]["PS"]-eva.racks[task.Job][node.Rack]["PS"])
eva.costNetwork += eva.factorDomain * float64(eva.totalPSs[task.Job]-eva.domains[task.Job][node.Domain]["PS"])
eva.nodes[node.ClientID]["Worker"]++
eva.racks[node.Rack]["Worker"]++
eva.domains[node.Domain]["Worker"]++
eva.nodes[task.Job][node.ClientID]["Worker"]++
eva.racks[task.Job][node.Rack]["Worker"]++
eva.domains[task.Job][node.Domain]["Worker"]++
eva.totalWorkers[task.Job]++
eva.totalWorker++
}
@ -78,22 +91,24 @@ func (eva *Evaluator) add(node NodeStatus, task Task) {
func (eva *Evaluator) remove(node NodeStatus, task Task) {
/* update network cost */
if task.IsPS {
eva.costNetwork -= eva.factorNode * float64(eva.racks[node.Rack]["Worker"]-eva.nodes[node.ClientID]["Worker"])
eva.costNetwork -= eva.factorRack * float64(eva.domains[node.Domain]["Worker"]-eva.racks[node.Rack]["Worker"])
eva.costNetwork -= eva.factorDomain * float64(eva.totalWorker-eva.domains[node.Domain]["Worker"])
eva.costNetwork -= eva.factorNode * float64(eva.racks[task.Job][node.Rack]["Worker"]-eva.nodes[task.Job][node.ClientID]["Worker"])
eva.costNetwork -= eva.factorRack * float64(eva.domains[task.Job][node.Domain]["Worker"]-eva.racks[task.Job][node.Rack]["Worker"])
eva.costNetwork -= eva.factorDomain * float64(eva.totalWorkers[task.Job]-eva.domains[task.Job][node.Domain]["Worker"])
eva.nodes[node.ClientID]["PS"]--
eva.racks[node.Rack]["PS"]--
eva.domains[node.Domain]["PS"]--
eva.nodes[task.Job][node.ClientID]["PS"]--
eva.racks[task.Job][node.Rack]["PS"]--
eva.domains[task.Job][node.Domain]["PS"]--
eva.totalPSs[task.Job]--
eva.totalPS--
} else {
eva.costNetwork -= eva.factorNode * float64(eva.racks[node.Rack]["PS"]-eva.nodes[node.ClientID]["PS"])
eva.costNetwork -= eva.factorRack * float64(eva.domains[node.Domain]["PS"]-eva.racks[node.Rack]["PS"])
eva.costNetwork -= eva.factorDomain * float64(eva.totalPS-eva.domains[node.Domain]["PS"])
eva.costNetwork -= eva.factorNode * float64(eva.racks[task.Job][node.Rack]["PS"]-eva.nodes[task.Job][node.ClientID]["PS"])
eva.costNetwork -= eva.factorRack * float64(eva.domains[task.Job][node.Domain]["PS"]-eva.racks[task.Job][node.Rack]["PS"])
eva.costNetwork -= eva.factorDomain * float64(eva.totalPSs[task.Job]-eva.domains[task.Job][node.Domain]["PS"])
eva.nodes[node.ClientID]["Worker"]--
eva.racks[node.Rack]["Worker"]--
eva.domains[node.Domain]["Worker"]--
eva.nodes[task.Job][node.ClientID]["Worker"]--
eva.racks[task.Job][node.Rack]["Worker"]--
eva.domains[task.Job][node.Domain]["Worker"]--
eva.totalWorkers[task.Job]--
eva.totalWorker--
}
@ -109,13 +124,19 @@ func (eva *Evaluator) remove(node NodeStatus, task Task) {
func (eva *Evaluator) calculate() float64 {
usingNodes := 0.0
for _, pair := range eva.nodes {
if v, ok := pair["PS"]; ok && v > 0 {
usingNodes += 1.0
} else if v, ok := pair["Worker"]; ok && v > 0 {
usingNodes += 1.0
for _, job := range eva.nodes {
for _, pair := range job {
if v, ok := pair["PS"]; ok && v > 0 {
usingNodes += 1.0
} else if v, ok := pair["Worker"]; ok && v > 0 {
usingNodes += 1.0
}
}
}
usingNodes /= float64(eva.totalWorker + eva.totalPS)
if eva.totalPS+eva.totalWorker == 0 {
usingNodes = 1.0
} else {
usingNodes /= float64(eva.totalWorker + eva.totalPS)
}
return eva.costNetwork + eva.factorSpread*eva.costLoad/float64(eva.totalPS+eva.totalWorker) + usingNodes
}

View File

@ -61,6 +61,11 @@ type ResourcePool struct {
enableShareRatio float64
enablePreSchedule bool
enablePreScheduleRatio float64
enableBatch bool
batchJobs []Job
batchMu sync.Mutex
batchAllocations map[string][]NodeStatus
}
func (pool *ResourcePool) init(conf Configuration) {
@ -83,6 +88,8 @@ func (pool *ResourcePool) init(conf Configuration) {
pool.enablePreSchedule = true
pool.enablePreScheduleRatio = 0.95
pool.enableBatch = false
/* init pools */
pool.poolsCount = 300
for i := 0; i < pool.poolsCount; i++ {
@ -115,6 +122,31 @@ func (pool *ResourcePool) init(conf Configuration) {
go func() {
pool.saveStatusHistory()
}()
go func() {
time.Sleep(time.Second * 10)
pool.batchMu.Lock()
var tasks []Task
for _, job := range pool.batchJobs {
for _, task := range job.Tasks {
task.Job = job.Name
tasks = append(tasks, task)
}
}
job := Job{Tasks: tasks}
nodes := pool.doAcquireResource(job)
for i, task := range job.Tasks {
if _, ok := pool.batchAllocations[task.Job]; !ok {
pool.batchAllocations[task.Job] = []NodeStatus{}
}
pool.batchAllocations[task.Job] = append(pool.batchAllocations[task.Job], nodes[i])
}
if len(nodes) > 0 {
pool.batchJobs = []Job{}
}
pool.batchMu.Unlock()
}()
}
/* check dead nodes periodically */
@ -634,6 +666,27 @@ func (pool *ResourcePool) pickNode(candidates []*NodeStatus, availableGPUs map[s
}
func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
if !pool.enableBatch {
for i := range job.Tasks {
job.Tasks[i].Job = job.Name
}
return pool.acquireResource(job)
}
for {
if _, ok := pool.batchAllocations[job.Name]; ok {
break
} else {
time.Sleep(time.Millisecond * 100)
}
}
pool.batchMu.Lock()
nodes := pool.batchAllocations[job.Name]
delete(pool.batchAllocations, job.Name)
pool.batchMu.Unlock()
return nodes
}
func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus {
if len(job.Tasks) == 0 {
return []NodeStatus{}
}
@ -876,7 +929,7 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
}
}
for _, t := range res.Status {
pool.attach(t.UUID, job.Name)
pool.attach(t.UUID, task.Job)
}
for i := range job.Tasks {

View File

@ -31,6 +31,7 @@ type Job struct {
type Task struct {
Name string `json:"name"`
Job string `json:"job_name"`
Image string `json:"image"`
Cmd string `json:"cmd"`
NumberCPU int `json:"cpu_number"`