diff --git a/src/job_manager.go b/src/job_manager.go index f5c53f7..767e435 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -115,7 +115,7 @@ func (jm *JobManager) start() { v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[i].NumberCPU)) v.Set("network", network) v.Set("should_wait", "1") - v.Set("output_dir", "/output/") + v.Set("output_dir", "/tmp/") v.Set("hdfs_dir", "http://hdfs-master:50070/user/yao/output/"+jm.job.Name) v.Set("gpu_mem", strconv.Itoa(jm.job.Tasks[i].MemoryGPU)) @@ -171,6 +171,14 @@ func (jm *JobManager) start() { log.Info(jm.job.Name, "-", i, " ", res.Status[i].Status) /* save logs etc. */ + if exitCode, ok := res.Status[i].State["ExitCode"].(int); ok { + if exitCode != 0 { + log.Warn(jm.job.Name+"-"+jm.job.Tasks[i].Name+" exited unexpected, exitCode=", exitCode) + jm.killedFlag = true + jm.scheduler.UpdateProgress(jm.job, Failed) + } + } + /* remove exited containers */ //v := url.Values{} //v.Set("id", res.Status[i].Id) diff --git a/src/resource_pool.go b/src/resource_pool.go index b55176a..a4253f4 100644 --- a/src/resource_pool.go +++ b/src/resource_pool.go @@ -41,10 +41,12 @@ type ResourcePool struct { } func (pool *ResourcePool) start() { + log.Info("RM started ") + pool.networks = map[string]bool{} pool.networksFree = map[string]bool{} - pool.versions = map[string]float64{} + pool.versions = map[string]float64{} pool.bindings = map[string]map[string]int{} pool.utils = map[string][]UtilGPUTimeSeries{} @@ -221,7 +223,7 @@ func (pool *ResourcePool) update(node NodeStatus) { defer pool.bindingsMu.Unlock() for _, gpu := range node.Status { if _, ok := pool.bindings[gpu.UUID]; ok { - if len(pool.bindings[gpu.UUID]) == 1 { + if _, ok2 := pool.utils[gpu.UUID]; ok2 { pool.utils[gpu.UUID] = append(pool.utils[gpu.UUID], UtilGPUTimeSeries{Time: (int)(time.Now().Unix()), Util: gpu.UtilizationGPU}) } @@ -244,7 +246,7 @@ func (pool *ResourcePool) update(node NodeStatus) { status, ok := seg.Nodes[node.ClientID] if ok { - /* remain allocation info */ + /* keep allocation info */ for i, GPU := range status.Status { if GPU.UUID == node.Status[i].UUID { node.Status[i].MemoryAllocated = GPU.MemoryAllocated @@ -433,15 +435,21 @@ func (pool *ResourcePool) attach(GPU string, job string) { if _, ok := pool.utils[GPU]; !ok { pool.utils[GPU] = []UtilGPUTimeSeries{} } + + if len(pool.bindings[GPU]) > 1 { + delete(pool.utils, GPU) + } } func (pool *ResourcePool) detach(GPU string, jobName string) { pool.bindingsMu.Lock() defer pool.bindingsMu.Unlock() if _, ok := pool.bindings[GPU]; ok { - if len(pool.bindings[GPU]) == 1 { - InstanceOfOptimizer().feed(jobName, pool.utils[GPU]) - pool.utils[GPU] = []UtilGPUTimeSeries{} + if _, ok2 := pool.utils[GPU]; ok2 { + if len(pool.bindings[GPU]) == 1 { + InstanceOfOptimizer().feed(jobName, pool.utils[GPU]) + } + delete(pool.utils, GPU) } } diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go index e5a3d6e..162a12e 100644 --- a/src/scheduler_fair.go +++ b/src/scheduler_fair.go @@ -85,6 +85,7 @@ func (scheduler *SchedulerFair) Start() { scheduler.parallelism = 1 go func() { + /* fair scheduler */ flag := true for { log.Debug("Scheduling") @@ -286,6 +287,14 @@ func (scheduler *SchedulerFair) UpdateProgress(job Job, state State) { } } break + case Failed: + for i := range scheduler.history { + if scheduler.history[i].Name == job.Name { + scheduler.history[i].Status = Failed + scheduler.history[i].UpdatedAt = int(time.Now().Unix()) + } + } + break } } diff --git a/src/state.go b/src/state.go index c3f79f0..066ad28 100644 --- a/src/state.go +++ b/src/state.go @@ -13,4 +13,6 @@ const ( Stopped // finished successfully Finished + + Failed )