diff --git a/src/optimizer.go b/src/optimizer.go index 8073c61..3a59c99 100644 --- a/src/optimizer.go +++ b/src/optimizer.go @@ -3,11 +3,14 @@ package main import ( log "github.com/sirupsen/logrus" "sync" + "strings" ) type Optimizer struct { scheduler Scheduler killedFlag bool + + predicts map[string]OptimizerJobExecutionTime } var optimizerInstance *Optimizer @@ -19,11 +22,42 @@ func InstanceOfOptimizer() *Optimizer { if optimizerInstance == nil { optimizerInstance = &Optimizer{} + optimizerInstance.predicts = map[string]OptimizerJobExecutionTime{} } return optimizerInstance } -func (jhl *Optimizer) feed(job string, utils []int) { +func (optimizer *Optimizer) feed(job string, utils []int) { log.Info("optimizer feed") log.Info(job, utils) + + go func() { + str := strings.Split(job, "-") + if len(str) == 2 { + preCnt := 0 + for i := 0; i < len(utils); i++ { + if utils[i] > 15 { + break + } + preCnt++ + } + + postCnt := 0 + for i := len(utils); i >= 0; i-- { + if utils[i] > 15 { + break + } + postCnt++ + } + + if _, ok := optimizer.predicts[str[0]]; !ok { + optimizer.predicts[str[0]] = OptimizerJobExecutionTime{} + } + predict := optimizer.predicts[str[0]] + predict.Pre = ((predict.Pre * predict.Version) + preCnt) / (predict.Version + 1) + predict.Post = ((predict.Post * predict.Version) + postCnt) / (predict.Version + 1) + predict.Total = ((predict.Total * predict.Version) + len(utils)) / (predict.Version + 1) + predict.Version++ + } + }() } diff --git a/src/resource_pool.go b/src/resource_pool.go index 338e945..f2b769e 100644 --- a/src/resource_pool.go +++ b/src/resource_pool.go @@ -27,8 +27,9 @@ type ResourcePool struct { counter int counterTotal int - bindings map[string]map[string]bool - utils map[string][]int + bindings map[string]map[string]bool + utils map[string][]int + bindingMu sync.Mutex } func (pool *ResourcePool) start() { @@ -210,6 +211,8 @@ func (pool *ResourcePool) releaseNetwork(network string) { } func (pool *ResourcePool) attach(GPU string, job string) { + pool.bindingMu.Lock() + defer pool.bindingMu.Unlock() if _, ok := pool.bindings[GPU]; !ok { pool.bindings[GPU] = map[string]bool{} } @@ -221,6 +224,8 @@ func (pool *ResourcePool) attach(GPU string, job string) { } func (pool *ResourcePool) detach(GPU string, jobName string) { + pool.bindingMu.Lock() + defer pool.bindingMu.Unlock() if _, ok := pool.bindings[GPU]; ok { if len(pool.bindings[GPU]) == 1 { InstanceOfOptimizer().feed(jobName, pool.utils[GPU]) diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go index cd6f923..e0db6cf 100644 --- a/src/scheduler_fair.go +++ b/src/scheduler_fair.go @@ -206,6 +206,10 @@ func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) { for j := range nodes.Status { if gpu.UUID == nodes.Status[j].UUID { nodes.Status[j].MemoryAllocated -= gpu.MemoryTotal + if nodes.Status[j].MemoryAllocated < 0 { + // in case error + nodes.Status[j].MemoryAllocated = 0 + } } } } @@ -376,4 +380,4 @@ func (scheduler *SchedulerFair) Attach(GPU string, job string) { func (scheduler *SchedulerFair) Detach(GPU string, job string) { pool.detach(GPU, job) -} \ No newline at end of file +} diff --git a/src/util.go b/src/util.go index 04f0323..871d850 100644 --- a/src/util.go +++ b/src/util.go @@ -111,7 +111,7 @@ type GPUStatus struct { type NodeStatus struct { ClientID string `json:"id"` ClientHost string `json:"host"` - Version float64 `json:"version"` + Version float64 `json:"version"` NumCPU int `json:"cpu_num"` UtilCPU float64 `json:"cpu_load"` MemTotal int `json:"mem_total"` @@ -164,6 +164,13 @@ type MsgGroupList struct { Groups []Group `json:"groups"` } +type OptimizerJobExecutionTime struct { + Pre int `json:"pre"` + Post int `json:"post"` + Total int `json:"total"` + Version int `json:"version"` +} + func str2int(str string, defaultValue int) int { i, err := strconv.Atoi(str) if err == nil {