From d726c0b85007e04d418b955b94f64895335651f1 Mon Sep 17 00:00:00 2001 From: Newnius Date: Fri, 1 May 2020 14:54:29 +0800 Subject: [PATCH] update --- src/job_manager.go | 42 ++++++++++++++++++++++++++++++++++-------- src/optimizer.go | 4 ++-- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/src/job_manager.go b/src/job_manager.go index e7fecde..e82cc62 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -27,27 +27,53 @@ func (jm *JobManager) start() { InstanceJobHistoryLogger().submitJob(jm.job) /* request for resources */ - for range jm.job.Tasks { - jm.resources = append(jm.resources, NodeStatus{}) + for range jm.job.Tasks { //append would cause uncertain order + jm.resources = append(jm.resources, NodeStatus{ClientID: "null"}) } - for i := range jm.job.Tasks { + start := time.Now().Unix() + for i := 0; i < len(jm.job.Tasks); i++ { var resource NodeStatus for { if jm.killedFlag { break } - resource = jm.scheduler.AcquireResource(jm.job, jm.job.Tasks[i], jm.resources) + + var tmp []NodeStatus + for _, t := range jm.resources { + if t.ClientID != "null" { + tmp = append(tmp, t) + } + } + resource = jm.scheduler.AcquireResource(jm.job, jm.job.Tasks[i], tmp) if len(resource.Status) > 0 { break } + + if time.Now().Unix()-start > 30 { + log.Info("Wait too long, return all resource and retry") + for _, tt := range jm.resources { + if tt.ClientID != "null" { + jm.scheduler.ReleaseResource(jm.job, tt) + log.Info("return resource ", tt.ClientID) + jm.resources[i].ClientID = "null" + for _, t := range tt.Status { + jm.scheduler.Detach(t.UUID, jm.job.Name) + } + } + } + i = -1 + start = time.Now().Unix() + } time.Sleep(time.Second * 1) } - log.Info("Receive resource", resource) - jm.resources[i] = resource + if len(resource.Status) > 0 { + log.Info("Receive resource", resource) + jm.resources[i] = resource - for _, t := range resource.Status { - jm.scheduler.Attach(t.UUID, jm.job.Name) + for _, t := range resource.Status { + jm.scheduler.Attach(t.UUID, jm.job.Name) + } } } diff --git a/src/optimizer.go b/src/optimizer.go index 659dbbc..40b511e 100644 --- a/src/optimizer.go +++ b/src/optimizer.go @@ -14,7 +14,7 @@ type Optimizer struct { jobUtilsGPU map[string]*OptimizerUtilGPU - heartbeatInterval int + cache map[string]*OptimizerJobExecutionTime } var optimizerInstance *Optimizer @@ -28,7 +28,7 @@ func InstanceOfOptimizer() *Optimizer { optimizerInstance = &Optimizer{} optimizerInstance.predicts = map[string]*OptimizerJobExecutionTime{} optimizerInstance.jobUtilsGPU = map[string]*OptimizerUtilGPU{} - optimizerInstance.heartbeatInterval = 3 + optimizerInstance.cache = map[string]*OptimizerJobExecutionTime{} } return optimizerInstance }