commit ec30e79c81
parent f7149310e8
Date: 2020-05-26 20:46:11 +08:00
5 changed files with 298 additions and 218 deletions


@@ -14,6 +14,8 @@ type Evaluator struct {
 	factorNode float64
 	factorRack float64
 	factorDomain float64
+	costLoad float64
 }
 func (eva *Evaluator) init(nodes []NodeStatus, tasks []Task) {
@@ -28,6 +30,7 @@ func (eva *Evaluator) init(nodes []NodeStatus, tasks []Task) {
 	eva.factorDomain = 40.0
 	eva.cost = 0.0
 	eva.costNetwork = 0.0
+	eva.costLoad = 0.0
 }
 func (eva *Evaluator) add(node NodeStatus, task Task) {
@@ -63,6 +66,20 @@ func (eva *Evaluator) add(node NodeStatus, task Task) {
 		eva.totalWorker++
 	}
 	eva.cost = eva.costNetwork
+	if task.IsPS {
+		//eva.costLoad += 1
+	} else {
+		//eva.costLoad += 0.5
+	}
+	numberGPU := 1
+	for _, gpu := range node.Status {
+		if gpu.MemoryAllocated != 0 {
+			numberGPU += 1
+		}
+	}
+	eva.costLoad += float64(numberGPU) / float64(len(node.Status))
 }
 func (eva *Evaluator) remove(node NodeStatus, task Task) {
@@ -88,10 +105,23 @@ func (eva *Evaluator) remove(node NodeStatus, task Task) {
 		eva.totalWorker--
 	}
 	eva.cost = eva.costNetwork
+	if task.IsPS {
+		//eva.costLoad -= 1
+	} else {
+		//eva.costLoad -= 0.5
+	}
+	numberGPU := 1
+	for _, gpu := range node.Status {
+		if gpu.MemoryAllocated != 0 {
+			numberGPU += 1
+		}
+	}
+	eva.costLoad -= float64(numberGPU) / float64(len(node.Status))
 }
 func (eva *Evaluator) calculate() float64 {
-	return eva.cost
+	return eva.cost + eva.costLoad/float64(eva.totalPS+eva.totalWorker)
 }
 func evaluate(allocation Allocation) float64 {
@@ -189,6 +219,7 @@ func evaluate(allocation Allocation) float64 {
 	costLB *= 100
 	//fmt.Println(costLB)
-	cost := 0.0*costLB + 1.0*costNetwork
+	cost := costNetwork
+	//cost := 0.0*costLB + 1.0*costNetwork
 	return cost
 }

src/ga.go

@@ -3,14 +3,10 @@ package main
 import (
 	"math/rand"
 	"github.com/MaxHalford/eaopt"
-	"time"
 	"math"
 	log "github.com/sirupsen/logrus"
 )
-var nodesMap map[string]NodeStatus
-var tasksMap map[string]Task
 // A resource allocation
 type Allocation struct {
 	TasksOnNode map[string][]Task // tasks on nodes[id]
@@ -18,6 +14,7 @@ type Allocation struct {
 	NodeIDs []string
 	Flags map[string]bool
 	Evaluator Evaluator
+	Tasks []Task
 }
 func randomFit(allocation Allocation, task Task) (string, bool) {
@@ -27,10 +24,10 @@ func randomFit(allocation Allocation, task Task) (string, bool) {
 		numberGPU := 0
 		for _, gpu := range allocation.Nodes[nodeID].Status {
 			if gpu.MemoryAllocated == 0 {
-				numberGPU += 0
+				numberGPU += 1
 			}
 		}
-		if _, ok := allocation.Nodes[nodeID]; ok && len(allocation.TasksOnNode[nodeID]) < numberGPU {
+		if task.NumberGPU <= numberGPU {
 			flag = true
 			break
 		}
@@ -42,13 +39,16 @@ func firstFit(allocation Allocation, task Task) (string, bool) {
 	flag := false
 	nodeID := ""
 	for _, nodeID = range allocation.NodeIDs {
+		if _, ok := allocation.Nodes[nodeID]; !ok {
+			continue
+		}
 		numberGPU := 0
 		for _, gpu := range allocation.Nodes[nodeID].Status {
 			if gpu.MemoryAllocated == 0 {
-				numberGPU += 0
+				numberGPU += 1
 			}
 		}
-		if _, ok := allocation.Nodes[nodeID]; ok && len(allocation.TasksOnNode[nodeID]) < numberGPU {
+		if task.NumberGPU <= numberGPU {
 			flag = true
 			break
 		}
@@ -57,6 +57,8 @@ func firstFit(allocation Allocation, task Task) (string, bool) {
 }
 func fastBestFit(nodes []NodeStatus, tasks []Task) Allocation {
+	//log.Info(nodes)
+	//log.Info(tasks)
 	eva := Evaluator{}
 	eva.init(nodes, tasks)
@@ -64,90 +66,76 @@ func fastBestFit(nodes []NodeStatus, tasks []Task) Allocation {
 	allocation.TasksOnNode = map[string][]Task{}
 	for _, task := range tasks {
 		minCost := math.MaxFloat64
-		nodeID := ""
-		for _, node := range nodes {
+		var best *NodeStatus
+		for i, node := range nodes {
 			if _, ok := allocation.TasksOnNode[node.ClientID]; !ok {
 				allocation.TasksOnNode[node.ClientID] = []Task{}
 			}
 			numberGPU := 0
-			for _, gpu := range allocation.Nodes[nodeID].Status {
+			for _, gpu := range node.Status {
 				if gpu.MemoryAllocated == 0 {
-					numberGPU += 0
+					numberGPU += 1
 				}
 			}
-			if len(allocation.TasksOnNode[node.ClientID]) >= numberGPU {
+			if task.NumberGPU > numberGPU {
 				continue
 			}
 			eva.add(node, task)
 			cost := eva.calculate()
 			eva.remove(node, task)
-			if cost < minCost || nodeID == "" {
+			//log.Info(node, cost)
+			if cost < minCost || best == nil {
 				minCost = cost
-				nodeID = node.ClientID
+				best = &nodes[i]
 			}
-			//fmt.Println(cost)
 		}
-		if nodeID == "" {
+		log.Info(task, " choose ", best.ClientID)
+		if best == nil {
 			allocation.Flags["valid"] = false
 			break
 		} else {
 			//fmt.Println(task, nodeID, allocation.TasksOnNode, minCost)
-			allocation.TasksOnNode[nodeID] = append(allocation.TasksOnNode[nodeID], task)
-			eva.add(nodesMap[nodeID], task)
+			allocation.TasksOnNode[best.ClientID] = append(allocation.TasksOnNode[best.ClientID], task)
+			eva.add(*best, task)
+			for i := range best.Status {
+				//allocate more than 1
+				if best.Status[i].MemoryAllocated == 0 {
+					best.Status[i].MemoryAllocated += task.MemoryGPU
+					break
+				}
+			}
 		}
 	}
-	log.Println(eva.calculate())
-	return allocation
-}
-func bestFit(allocation Allocation, task Task) (string, bool) {
-	flag := false
-	nodeID := ""
-	minCost := math.MaxFloat64
-	for _, id := range allocation.NodeIDs {
-		numberGPU := 0
-		for _, gpu := range allocation.Nodes[id].Status {
-			if gpu.MemoryAllocated == 0 {
-				numberGPU += 0
-			}
-		}
-		if _, ok := allocation.Nodes[id]; ok && len(allocation.TasksOnNode[id]) < numberGPU {
-			/* add */
-			allocation.TasksOnNode[id] = append(allocation.TasksOnNode[id], task)
-			/* evaluate */
-			cost := evaluate(allocation)
-			/* revert */
-			idx := -1
-			for i, task2 := range allocation.TasksOnNode[id] {
-				if task2.Name == task.Name {
-					idx = i
-				}
-			}
-			copy(allocation.TasksOnNode[id][idx:], allocation.TasksOnNode[id][idx+1:])
-			allocation.TasksOnNode[id] = allocation.TasksOnNode[id][:len(allocation.TasksOnNode[id])-1]
-			if cost < minCost || !flag {
-				nodeID = id
-				minCost = cost
-			}
-			flag = true
-		}
-	}
-	return nodeID, flag
-}
+	//log.Info(allocation.TasksOnNode)
+	log.Println("BestFit Cost:", eva.calculate())
+	return allocation
+}
 /* Evaluate the allocation */
 func (X Allocation) Evaluate() (float64, error) {
+	//log.Info(X)
 	if !X.Flags["valid"] {
 		//fmt.Println("Invalid allocation")
 		return math.MaxFloat64, nil
 	}
-	costNetwork := evaluate(X)
-	cost := costNetwork
+	//costNetwork := evaluate(X)
+	var nodes []NodeStatus
+	for _, node := range X.Nodes {
+		nodes = append(nodes, node)
+	}
+	eva := Evaluator{}
+	eva.init(nodes, X.Tasks)
+	for node, tasks := range X.TasksOnNode {
+		for _, task := range tasks {
+			eva.add(X.Nodes[node], task)
+		}
+	}
+	cost := eva.calculate()
+	//log.Info(cost)
 	//fmt.Println(taskToNode, cost, len(X.Nodes))
 	return float64(cost), nil
 }
@@ -189,6 +177,12 @@ func (X Allocation) Mutate(rng *rand.Rand) {
 		for _, task := range tasks {
 			if nodeID, ok := firstFit(X, task); ok {
 				X.TasksOnNode[nodeID] = append(X.TasksOnNode[nodeID], task)
+				for i := range X.Nodes[nodeID].Status {
+					if X.Nodes[nodeID].Status[i].MemoryAllocated == 0 {
+						X.Nodes[nodeID].Status[i].MemoryAllocated += task.MemoryGPU
+						break
+					}
+				}
 			} else {
 				X.Flags["valid"] = false
 			}
@@ -196,6 +190,7 @@ func (X Allocation) Mutate(rng *rand.Rand) {
 	}
 	//fmt.Println("After", X)
+	return
 	/* move tasks */
 	if !X.Flags["valid"] {
 		//fmt.Println("Invalid allocation")
@@ -230,7 +225,6 @@ func (X Allocation) Crossover(Y eaopt.Genome, rng *rand.Rand) {
 	if !Y.(Allocation).Flags["valid"] || !X.Flags["valid"] {
 		return
 	}
-	//fmt.Println("Crossover")
 	taskToNode := map[string]string{}
 	for nodeID, tasks := range X.TasksOnNode {
 		for _, task := range tasks {
@@ -265,6 +259,12 @@ func (X Allocation) Crossover(Y eaopt.Genome, rng *rand.Rand) {
 			//fmt.Println(X.TasksOnNode)
 			copy(X.TasksOnNode[nodeID2][idx:], X.TasksOnNode[nodeID2][idx+1:])
 			X.TasksOnNode[nodeID2] = X.TasksOnNode[nodeID2][:len(X.TasksOnNode[nodeID2])-1]
+			for i := range X.Nodes[nodeID].Status {
+				if X.Nodes[nodeID].Status[i].MemoryAllocated == 0 {
+					X.Nodes[nodeID].Status[i].MemoryAllocated -= task.MemoryGPU
+					break
+				}
+			}
 			//fmt.Println(X.TasksOnNode)
 		}
 		/* reschedule tasks on tgt node */
@@ -291,6 +291,12 @@ func (X Allocation) Crossover(Y eaopt.Genome, rng *rand.Rand) {
 		for _, task := range tasks {
 			if nodeID, ok := firstFit(X, task); ok {
 				X.TasksOnNode[nodeID] = append(X.TasksOnNode[nodeID], task)
+				for i := range X.Nodes[nodeID].Status {
+					if X.Nodes[nodeID].Status[i].MemoryAllocated == 0 {
+						X.Nodes[nodeID].Status[i].MemoryAllocated += task.MemoryGPU
+						break
+					}
+				}
 			} else {
 				X.Flags["valid"] = false
 			}
@@ -319,74 +325,3 @@ func (X Allocation) Clone() eaopt.Genome {
 	}
 	return Y
 }
-func VectorFactory(rng *rand.Rand) eaopt.Genome {
-	allocation := Allocation{TasksOnNode: map[string][]Task{}, Nodes: map[string]NodeStatus{}, Flags: map[string]bool{"valid": true}}
-	var nodes []NodeStatus
-	var tasks []Task
-	for _, node := range nodesMap {
-		nodes = append(nodes, node)
-	}
-	for _, task := range tasksMap {
-		tasks = append(tasks, task)
-	}
-	/* shuffle */
-	for n := len(nodes); n > 0; n-- {
-		randIndex := rng.Intn(n)
-		nodes[n-1], nodes[randIndex] = nodes[randIndex], nodes[n-1]
-	}
-	for n := len(tasks); n > 0; n-- {
-		randIndex := rng.Intn(n)
-		tasks[n-1], tasks[randIndex] = tasks[randIndex], tasks[n-1]
-	}
-	/* pick nodes */
-	for _, node := range nodesMap {
-		allocation.Nodes[node.ClientID] = node
-		allocation.NodeIDs = append(allocation.NodeIDs, node.ClientID)
-	}
-	t := rng.Int() % 10
-	if t == -1 {
-		/* best-fit */
-		ts := time.Now()
-		/*
-		for _, task := range tasks {
-			if nodeID, ok := bestFit(allocation, task); ok {
-				allocation.TasksOnNode[nodeID] = append(allocation.TasksOnNode[nodeID], task)
-			} else {
-				allocation.Flags["valid"] = false
-			}
-		}
-		*/
-		allocation.TasksOnNode = fastBestFit(nodes, tasks).TasksOnNode
-		log.Println(time.Since(ts))
-		//fmt.Println("Best Fit")
-	} else if t%2 == 0 {
-		/* first-fit */
-		for _, task := range tasks {
-			if nodeID, ok := randomFit(allocation, task); ok {
-				allocation.TasksOnNode[nodeID] = append(allocation.TasksOnNode[nodeID], task)
-			} else {
-				allocation.Flags["valid"] = false
-			}
-		}
-	} else {
-		/* random-fit */
-		for _, task := range tasks {
-			if nodeID, ok := randomFit(allocation, task); ok {
-				allocation.TasksOnNode[nodeID] = append(allocation.TasksOnNode[nodeID], task)
-			} else {
-				allocation.Flags["valid"] = false
-			}
-		}
-	}
-	//fmt.Println(evaluatue(allocation))
-	//fmt.Println(allocation)
-	return allocation
-}
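Condensed, the restructured fastBestFit is a plain greedy loop: for each task, every node with enough idle GPUs is scored (the real code tentatively calls eva.add, reads eva.calculate, then eva.remove), the cheapest node wins, and its GPUs are marked allocated so later tasks see them as busy. A self-contained sketch of that skeleton, with toy types and a toy score function standing in for the incremental Evaluator:

package main

import (
	"fmt"
	"math"
)

// Minimal stand-ins for the scheduler's types; not the real NodeStatus/Task.
type node struct {
	id       string
	freeGPUs int
}

type task struct {
	name string
	gpus int
}

// score is a toy cost standing in for Evaluator.calculate: prefer the node
// that keeps the most GPUs free after placing the task.
func score(n node, t task) float64 {
	return 1.0 / float64(n.freeGPUs-t.gpus+1)
}

// greedyBestFit mirrors the fastBestFit pattern: for each task, score every
// feasible node, pick the cheapest, and commit its GPUs so later tasks see them as taken.
func greedyBestFit(nodes []node, tasks []task) map[string][]string {
	placement := map[string][]string{}
	for _, t := range tasks {
		bestIdx, bestCost := -1, math.MaxFloat64
		for i := range nodes {
			if nodes[i].freeGPUs < t.gpus {
				continue // not enough idle GPUs on this node
			}
			if c := score(nodes[i], t); c < bestCost {
				bestIdx, bestCost = i, c
			}
		}
		if bestIdx < 0 {
			break // infeasible; the real code marks the allocation invalid here
		}
		nodes[bestIdx].freeGPUs -= t.gpus // commit the winner
		placement[nodes[bestIdx].id] = append(placement[nodes[bestIdx].id], t.name)
	}
	return placement
}

func main() {
	nodes := []node{{"n1", 1}, {"n2", 4}}
	tasks := []task{{"ps0", 1}, {"worker0", 1}, {"worker1", 1}}
	fmt.Println(greedyBestFit(nodes, tasks))
}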


@@ -4,53 +4,61 @@ import (
 	"strconv"
 	"math/rand"
 	"time"
-	"log"
+	log "github.com/sirupsen/logrus"
 	"github.com/MaxHalford/eaopt"
 	"math"
 	"testing"
 )
-func TestGA(t *testing.T) {
-	numTask := 20
-	nodesMap = map[string]NodeStatus{}
-	tasksMap = map[string]Task{}
-	for i := 0; i < numTask*3; i++ {
-		node := NodeStatus{ClientID: strconv.Itoa(i), Rack: strconv.Itoa(i % 40), Domain: strconv.Itoa(i % 4)}
-		node.NumCPU = 24
-		node.MemTotal = 188
-		node.TotalBW = 100
-		cnt := rand.Intn(3) + 1
-		for i := 0; i < cnt; i++ {
-			node.Status = append(node.Status, GPUStatus{MemoryTotal: 11439, MemoryAllocated: 0, UUID: node.ClientID + strconv.Itoa(i)})
-		}
-		nodesMap[strconv.Itoa(i)] = node
-	}
-	for i := 0; i < numTask; i++ {
-		isPS := false
-		if i >= 3 {
-			isPS = true
-		}
-		task := Task{Name: strconv.Itoa(i), IsPS: isPS}
-		task.Memory = 4
-		task.NumberCPU = 2
-		task.NumberGPU = 1
-		tasksMap[strconv.Itoa(i)] = task
-	}
+func TgenerateCase() ([]NodeStatus, []Task) {
+	numTask := 6
 	var nodes []NodeStatus
 	var tasks []Task
-	for _, node := range nodesMap {
+	for i := 0; i < numTask*3; i++ {
+		node := NodeStatus{ClientID: strconv.Itoa(i), Rack: "Rack-" + strconv.Itoa(i%40), Domain: "Domain-" + strconv.Itoa(i%4)}
+		node.NumCPU = 24
+		node.UtilCPU = 2.0
+		node.MemTotal = 188
+		node.MemAvailable = 20
+		node.TotalBW = 100
+		//cnt := 4
+		cnt := rand.Intn(3) + 1
+		for i := 0; i < cnt; i++ {
+			node.Status = append(node.Status, GPUStatus{MemoryTotal: 11439, MemoryAllocated: 0, UUID: node.ClientID + "-" + strconv.Itoa(i)})
+		}
 		nodes = append(nodes, node)
 	}
-	for _, task := range tasksMap {
+	for i := 0; i < numTask; i++ {
+		isPS := false
+		if i%4 == 0 {
+			isPS = true
+		}
+		task := Task{Name: "task-" + strconv.Itoa(i), IsPS: isPS}
+		task.Memory = 4
+		task.NumberCPU = 2
+		task.NumberGPU = 1
+		task.MemoryGPU = 4096
 		tasks = append(tasks, task)
 	}
+	return nodes, tasks
+}
+func TestBestFit(t *testing.T) {
+	nodes, tasks := TgenerateCase()
+	for _, node := range nodes {
+		log.Info(node)
+	}
 	s := time.Now()
 	allocation := fastBestFit(nodes, tasks)
 	log.Println(time.Since(s))
+	log.Println(allocation)
+}
+func TestGA(t *testing.T) {
+	return
+	nodes, tasks := TgenerateCase()
 	// Instantiate a GA with a GAConfig
 	var ga, err = eaopt.NewDefaultGAConfig().NewGA()
@@ -62,7 +70,7 @@ func TestGA(t *testing.T) {
 	// Set the number of generations to run for
 	ga.NGenerations = math.MaxInt32
 	ga.NPops = 1
-	ga.PopSize = 30 + uint(numTask/2)
+	ga.PopSize = 30 + uint(len(tasks)/2)
 	// Add a custom print function to track progress
 	ga.Callback = func(ga *eaopt.GA) {
@@ -90,15 +98,92 @@ func TestGA(t *testing.T) {
 		return false
 	}
+	var f = func(rng *rand.Rand) eaopt.Genome {
+		allocation := Allocation{TasksOnNode: map[string][]Task{}, Nodes: map[string]NodeStatus{}, Flags: map[string]bool{"valid": true}}
+		//log.Println(nodes)
+		var nodesT []NodeStatus
+		for _, node := range nodes {
+			nodesT = append(nodesT, node.Copy())
+		}
+		//nodesT[0].Status[0].MemoryAllocated = 100
+		//log.Println(nodes[0].Status[0].MemoryAllocated)
+		//log.Println(&nodesT[0])
+		//log.Println(&nodes[0])
+		for _, node := range nodesT {
+			allocation.Nodes[node.ClientID] = node
+		}
+		for _, task := range tasks {
+			allocation.Tasks = append(allocation.Tasks, task)
+		}
+		/* shuffle */
+		for n := len(tasks); n > 0; n-- {
+			randIndex := rng.Intn(n)
+			allocation.Tasks[n-1], allocation.Tasks[randIndex] = allocation.Tasks[randIndex], allocation.Tasks[n-1]
+		}
+		/* pick nodes */
+		for _, node := range nodesT {
+			allocation.Nodes[node.ClientID] = node
+			allocation.NodeIDs = append(allocation.NodeIDs, node.ClientID)
+		}
+		t := rng.Int() % 10
+		if t == 0 {
+			/* best-fit */
+			ts := time.Now()
+			allocation.TasksOnNode = fastBestFit(nodesT, tasks).TasksOnNode
+			log.Println(time.Since(ts))
+			//fmt.Println("Best Fit")
+		} else if t%2 == 0 {
+			/* first-fit */
+			for _, task := range tasks {
+				if nodeID, ok := randomFit(allocation, task); ok {
+					allocation.TasksOnNode[nodeID] = append(allocation.TasksOnNode[nodeID], task)
+					for i := range allocation.Nodes[nodeID].Status {
+						if allocation.Nodes[nodeID].Status[i].MemoryAllocated == 0 {
+							allocation.Nodes[nodeID].Status[i].MemoryAllocated += task.MemoryGPU
+							break
+						}
+					}
+				} else {
+					allocation.Flags["valid"] = false
+				}
+			}
+		} else {
+			/* random-fit */
+			for _, task := range tasks {
+				if nodeID, ok := randomFit(allocation, task); ok {
+					allocation.TasksOnNode[nodeID] = append(allocation.TasksOnNode[nodeID], task)
+					for i := range allocation.Nodes[nodeID].Status {
+						if allocation.Nodes[nodeID].Status[i].MemoryAllocated == 0 {
+							allocation.Nodes[nodeID].Status[i].MemoryAllocated += task.MemoryGPU
+							break
+						}
+					}
+				} else {
+					allocation.Flags["valid"] = false
+				}
+			}
+		}
+		//fmt.Println(evaluatue(allocation))
+		//fmt.Println(allocation)
+		return allocation
+	}
 	// Find the minimum
-	err = ga.Minimize(VectorFactory)
+	err = ga.Minimize(f)
 	log.Println(time.Since(ts))
 	log.Println(ga.HallOfFame[0].Genome.(Allocation).TasksOnNode)
-	//fmt.Println(ga.HallOfFame[0].Genome.(Allocation).Nodes)
+	//log.Println(ga.HallOfFame[0].Genome.(Allocation).Flags)
+	//log.Println(ga.HallOfFame[0].Genome.(Allocation).Nodes)
 	if err != nil {
 		log.Println(err)
 		return
 	}
-	log.Println(allocation)
 }


@@ -40,3 +40,10 @@ type NodeStatus struct {
 	TotalBW float64 `json:"bw_total"`
 	Status []GPUStatus `json:"status"`
 }
+func (X *NodeStatus) Copy() NodeStatus {
+	res := *X
+	res.Status = make([]GPUStatus, len(X.Status))
+	copy(res.Status, X.Status)
+	return res
+}
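The new Copy method matters because assigning a NodeStatus struct only copies the Status slice header, so marking GPUs as allocated on the "copy" (as the GA genome factory and acquireResource now do) would silently mutate the shared original. A small sketch of the difference, using cut-down stand-in types rather than the real structs:

package main

import "fmt"

// Simplified stand-ins; the real types carry more fields.
type gpuStatus struct{ MemoryAllocated int }

type nodeStatus struct{ Status []gpuStatus }

// Copy duplicates the GPU slice so callers can mark GPUs allocated on the
// copy without touching the shared original (same idea as NodeStatus.Copy).
func (x *nodeStatus) Copy() nodeStatus {
	res := *x
	res.Status = make([]gpuStatus, len(x.Status))
	copy(res.Status, x.Status)
	return res
}

func main() {
	orig := nodeStatus{Status: make([]gpuStatus, 2)}

	shallow := orig // struct assignment still shares the Status backing array
	shallow.Status[0].MemoryAllocated = 4096
	fmt.Println(orig.Status[0].MemoryAllocated) // 4096: the original was mutated

	orig.Status[0].MemoryAllocated = 0
	deep := orig.Copy() // independent Status slice
	deep.Status[0].MemoryAllocated = 4096
	fmt.Println(orig.Status[0].MemoryAllocated) // 0: original untouched
}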


@@ -158,14 +158,14 @@ func (pool *ResourcePool) checkDeadNodes() {
 func (pool *ResourcePool) GPUModelToPower(model string) int {
 	mapper := map[string]int{
-		"K40": 1, "Tesla K40": 1,
-		"K80": 2, "Tesla K80": 2,
-		"P100": 3, "Tesla P100": 3,
+		"K40": 2, "Tesla K40": 2,
+		"K80": 3, "Tesla K80": 3,
+		"P100": 4, "Tesla P100": 4,
 	}
 	if power, err := mapper[model]; !err {
 		return power
 	}
-	return 0
+	return 1
 }
 func (pool *ResourcePool) getNodePool(name string) int {
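The GPUModelToPower change shifts each known model's rank up by one and makes unknown models default to 1 instead of 0, so an unrecognized GPU is no longer ranked below everything else. A self-contained toy version of that lookup (the real method lives on ResourcePool and uses the map values shown in the diff):

package main

import "fmt"

// gpuModelToPower ranks known GPU models; unknown models fall back to 1.
func gpuModelToPower(model string) int {
	mapper := map[string]int{
		"K40": 2, "Tesla K40": 2,
		"K80": 3, "Tesla K80": 3,
		"P100": 4, "Tesla P100": 4,
	}
	if power, ok := mapper[model]; ok {
		return power
	}
	return 1
}

func main() {
	fmt.Println(gpuModelToPower("Tesla P100")) // 4
	fmt.Println(gpuModelToPower("V100"))       // 1 (unknown model, default rank)
}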
@@ -639,12 +639,16 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
 	locks := map[int]*sync.Mutex{}
 	allocationType := 0
-	availableGPUs := map[string][]GPUStatus{}
-	var candidates []*NodeStatus
+	var candidates []NodeStatus
+	if pool.TotalGPU == 0 {
+		return []NodeStatus{}
+	}
+	loadRatio := float64(pool.UsingGPU) / float64(pool.TotalGPU)
 	/* first, choose sharable GPUs */
-	if pool.enableShare && (pool.TotalGPU != 0 && len(job.Tasks) == 1 && task.NumberGPU == 1 && float64(pool.UsingGPU)/float64(pool.TotalGPU) >= pool.enableShareRatio) {
+	if pool.enableShare && len(job.Tasks) == 1 && task.NumberGPU == 1 && loadRatio >= pool.enableShareRatio {
 		// check sharable
 		allocationType = 1
 		if util, valid := InstanceOfOptimizer().predictUtilGPU(job.Name); valid {
@@ -671,13 +675,12 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
 					}
 					if totalUtil < 100 {
 						available = append(available, status)
-						availableGPUs[node.ClientID] = available
 					}
 				}
 			}
 		}
 		if len(available) >= task.NumberGPU {
-			candidates = append(candidates, node)
+			candidates = append(candidates, *node)
 			if len(candidates) >= len(job.Tasks)*3+5 {
 				break
 			}
@@ -711,8 +714,7 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
} }
} }
if len(available) >= task.NumberGPU { if len(available) >= task.NumberGPU {
candidates = append(candidates, node) candidates = append(candidates, *node)
availableGPUs[node.ClientID] = available
if len(candidates) >= len(job.Tasks)*3+5 { if len(candidates) >= len(job.Tasks)*3+5 {
break break
} }
@@ -733,11 +735,7 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
 	if len(candidates) == 0 && len(job.Tasks) == 1 && task.NumberGPU == 1 && pool.enablePreSchedule {
 		estimate, valid := InstanceOfOptimizer().predictTime(job.Name)
-		//log.Info(pool.TotalGPU)
-		//log.Info(estimate, valid)
-		//log.Info(scheduler.UsingGPU)
-		if pool.TotalGPU != 0 && float64(pool.UsingGPU)/float64(pool.TotalGPU) >= pool.enablePreScheduleRatio && valid {
+		if loadRatio >= pool.enablePreScheduleRatio && valid {
 			allocationType = 3
 			for cur := start; ; {
 				if _, ok := locks[cur.ID]; !ok {
@@ -765,8 +763,7 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
} }
} }
if len(available) >= task.NumberGPU { if len(available) >= task.NumberGPU {
candidates = append(candidates, node) candidates = append(candidates, *node)
availableGPUs[node.ClientID] = available
if len(candidates) >= len(job.Tasks)*3+5 { if len(candidates) >= len(job.Tasks)*3+5 {
break break
} }
@@ -792,44 +789,69 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus {
 	/* assign */
 	var ress []NodeStatus
 	if len(candidates) > 0 {
-		/*
-		for range job.Tasks { //append would cause uncertain order
-			resources = append(resources, NodeStatus{ClientID: "null"})
-		}
-		*/
-		var nodes []NodeStatus
-		if len(job.Tasks) == 1 {
-			node := pool.pickNode(candidates, availableGPUs, task, job, []NodeStatus{})
-			nodes = append(nodes, *node)
-		}
-		for _, node := range nodes {
-			res := NodeStatus{}
-			res.ClientID = node.ClientID
-			res.ClientHost = node.ClientHost
-			res.Status = availableGPUs[node.ClientID][0:task.NumberGPU]
-			res.NumCPU = task.NumberCPU
-			res.MemTotal = task.Memory
-			for i := range res.Status {
-				for j := range node.Status {
-					if res.Status[i].UUID == node.Status[j].UUID {
-						if node.Status[j].MemoryAllocated == 0 {
-							pool.UsingGPUMu.Lock()
-							pool.UsingGPU ++
-							pool.UsingGPUMu.Unlock()
-						}
-						node.Status[j].MemoryAllocated += task.MemoryGPU
-						res.Status[i].MemoryTotal = task.MemoryGPU
-					}
-				}
-			}
-			for _, t := range res.Status {
-				pool.attach(t.UUID, job.Name)
-			}
-			ress = append(ress, res)
-		}
+		for range job.Tasks { //append would cause uncertain order
+			ress = append(ress, NodeStatus{ClientID: "null"})
+		}
+		var nodesT []NodeStatus
+		for _, node := range candidates {
+			nodesT = append(nodesT, node.Copy())
+		}
+		allocation := fastBestFit(nodesT, job.Tasks)
+		if !allocation.Flags["valid"] {
+			return []NodeStatus{}
+		}
+		for nodeID, tasks := range allocation.TasksOnNode {
+			var node *NodeStatus
+			for i := range candidates {
+				if candidates[i].ClientID == nodeID {
+					node = &candidates[i]
+				}
+			}
+			var available []GPUStatus
+			for _, gpu := range node.Status {
+				if gpu.MemoryAllocated == 0 {
+					available = append(available, gpu)
+				}
+			}
+			for _, task := range tasks {
+				res := NodeStatus{}
+				res.ClientID = node.ClientID
+				res.ClientHost = node.ClientHost
+				res.NumCPU = task.NumberCPU
+				res.MemTotal = task.Memory
+				res.Status = available[0:task.NumberGPU]
+				available = available[task.NumberGPU:]
+				for i := range res.Status {
+					for j := range node.Status {
+						if res.Status[i].UUID == node.Status[j].UUID {
+							if node.Status[j].MemoryAllocated == 0 {
+								pool.UsingGPUMu.Lock()
+								pool.UsingGPU ++
+								pool.UsingGPUMu.Unlock()
+							}
+							node.Status[j].MemoryAllocated += task.MemoryGPU
+							res.Status[i].MemoryTotal = task.MemoryGPU
+						}
+					}
+				}
+				for _, t := range res.Status {
+					pool.attach(t.UUID, job.Name)
+				}
+				for i := range job.Tasks {
+					if job.Tasks[i].Name == task.Name {
+						ress[i] = res
+					}
+				}
+			}
+		}
 	}
 	for segID, lock := range locks {