diff --git a/src/evaluator.go b/src/evaluator.go index 065229c..4996188 100644 --- a/src/evaluator.go +++ b/src/evaluator.go @@ -5,17 +5,15 @@ type Evaluator struct { racks map[string]map[string]int nodes map[string]map[string]int upstreams map[string]string - cost float64 totalPS int totalWorker int costNetwork float64 + costLoad float64 factorNode float64 factorRack float64 factorDomain float64 - - costLoad float64 } func (eva *Evaluator) init(nodes []NodeStatus, tasks []Task) { @@ -28,7 +26,6 @@ func (eva *Evaluator) init(nodes []NodeStatus, tasks []Task) { eva.factorNode = 1.0 eva.factorRack = 4.0 eva.factorDomain = 40.0 - eva.cost = 0.0 eva.costNetwork = 0.0 eva.costLoad = 0.0 } @@ -65,7 +62,6 @@ func (eva *Evaluator) add(node NodeStatus, task Task) { eva.domains[node.Domain]["Worker"]++ eva.totalWorker++ } - eva.cost = eva.costNetwork if task.IsPS { //eva.costLoad += 1 @@ -104,7 +100,6 @@ func (eva *Evaluator) remove(node NodeStatus, task Task) { eva.domains[node.Domain]["Worker"]-- eva.totalWorker-- } - eva.cost = eva.costNetwork if task.IsPS { //eva.costLoad -= 1 @@ -121,7 +116,10 @@ func (eva *Evaluator) remove(node NodeStatus, task Task) { } func (eva *Evaluator) calculate() float64 { - return eva.cost + eva.costLoad/float64(eva.totalPS+eva.totalWorker) + /* factor to determine spread or pack */ + /* 1.0 spread, -1.0 pack */ + factor := -1.0 + return eva.costNetwork + factor*eva.costLoad/float64(eva.totalPS+eva.totalWorker) } func evaluate(allocation Allocation) float64 { diff --git a/src/resource_pool.go b/src/resource_pool.go index 30afec0..9da08fd 100644 --- a/src/resource_pool.go +++ b/src/resource_pool.go @@ -709,7 +709,8 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus { for _, node := range cur.Nodes { var available []GPUStatus for _, status := range node.Status { - if status.MemoryAllocated == 0 && status.MemoryUsed < 10 { + /* make sure GPU is not used by in-system and outer-system */ + if status.MemoryAllocated == 0 && status.MemoryUsed < 100 { available = append(available, status) } } @@ -720,7 +721,6 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus { } } } - log.Info(candidates, cur) if len(candidates) >= len(job.Tasks)*3+5 { break } @@ -795,7 +795,33 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus { nodesT = append(nodesT, node.Copy()) } - allocation := fastBestFit(nodesT, job.Tasks) + tasks := make([]Task, len(job.Tasks)) + var tasksPS []Task + var tasksWorker []Task + for _, taskT := range job.Tasks { + if taskT.IsPS { + tasksPS = append(tasksPS, taskT) + } else { + tasksWorker = append(tasksWorker, taskT) + } + } + idxPS := 0 + idxWorker := 0 + factor := float64(len(tasksWorker)) / (float64(len(tasksPS)) + 0.001) + for i := range tasks { + if float64(idxPS)*factor <= float64(idxWorker) && idxPS < len(tasksPS) { + tasks[i] = tasksPS[idxPS] + idxPS++ + } else if idxWorker < len(tasksWorker) { + tasks[i] = tasksWorker[idxWorker] + idxWorker++ + } else { + tasks[i] = tasksPS[idxPS] + idxPS++ + } + } + + allocation := fastBestFit(nodesT, tasks) if allocation.Flags["valid"] { for range job.Tasks { //append would cause uncertain order