1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-12-13 07:46:43 +00:00
This commit is contained in:
2020-05-04 13:59:01 +08:00
parent b6516496bc
commit d9a53d52e5
4 changed files with 34 additions and 7 deletions

View File

@@ -115,7 +115,7 @@ func (jm *JobManager) start() {
v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[i].NumberCPU)) v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[i].NumberCPU))
v.Set("network", network) v.Set("network", network)
v.Set("should_wait", "1") v.Set("should_wait", "1")
v.Set("output_dir", "/output/") v.Set("output_dir", "/tmp/")
v.Set("hdfs_dir", "http://hdfs-master:50070/user/yao/output/"+jm.job.Name) v.Set("hdfs_dir", "http://hdfs-master:50070/user/yao/output/"+jm.job.Name)
v.Set("gpu_mem", strconv.Itoa(jm.job.Tasks[i].MemoryGPU)) v.Set("gpu_mem", strconv.Itoa(jm.job.Tasks[i].MemoryGPU))
@@ -171,6 +171,14 @@ func (jm *JobManager) start() {
log.Info(jm.job.Name, "-", i, " ", res.Status[i].Status) log.Info(jm.job.Name, "-", i, " ", res.Status[i].Status)
/* save logs etc. */ /* save logs etc. */
if exitCode, ok := res.Status[i].State["ExitCode"].(int); ok {
if exitCode != 0 {
log.Warn(jm.job.Name+"-"+jm.job.Tasks[i].Name+" exited unexpected, exitCode=", exitCode)
jm.killedFlag = true
jm.scheduler.UpdateProgress(jm.job, Failed)
}
}
/* remove exited containers */ /* remove exited containers */
//v := url.Values{} //v := url.Values{}
//v.Set("id", res.Status[i].Id) //v.Set("id", res.Status[i].Id)

View File

@@ -41,10 +41,12 @@ type ResourcePool struct {
} }
func (pool *ResourcePool) start() { func (pool *ResourcePool) start() {
log.Info("RM started ")
pool.networks = map[string]bool{} pool.networks = map[string]bool{}
pool.networksFree = map[string]bool{} pool.networksFree = map[string]bool{}
pool.versions = map[string]float64{}
pool.versions = map[string]float64{}
pool.bindings = map[string]map[string]int{} pool.bindings = map[string]map[string]int{}
pool.utils = map[string][]UtilGPUTimeSeries{} pool.utils = map[string][]UtilGPUTimeSeries{}
@@ -221,7 +223,7 @@ func (pool *ResourcePool) update(node NodeStatus) {
defer pool.bindingsMu.Unlock() defer pool.bindingsMu.Unlock()
for _, gpu := range node.Status { for _, gpu := range node.Status {
if _, ok := pool.bindings[gpu.UUID]; ok { if _, ok := pool.bindings[gpu.UUID]; ok {
if len(pool.bindings[gpu.UUID]) == 1 { if _, ok2 := pool.utils[gpu.UUID]; ok2 {
pool.utils[gpu.UUID] = append(pool.utils[gpu.UUID], pool.utils[gpu.UUID] = append(pool.utils[gpu.UUID],
UtilGPUTimeSeries{Time: (int)(time.Now().Unix()), Util: gpu.UtilizationGPU}) UtilGPUTimeSeries{Time: (int)(time.Now().Unix()), Util: gpu.UtilizationGPU})
} }
@@ -244,7 +246,7 @@ func (pool *ResourcePool) update(node NodeStatus) {
status, ok := seg.Nodes[node.ClientID] status, ok := seg.Nodes[node.ClientID]
if ok { if ok {
/* remain allocation info */ /* keep allocation info */
for i, GPU := range status.Status { for i, GPU := range status.Status {
if GPU.UUID == node.Status[i].UUID { if GPU.UUID == node.Status[i].UUID {
node.Status[i].MemoryAllocated = GPU.MemoryAllocated node.Status[i].MemoryAllocated = GPU.MemoryAllocated
@@ -433,15 +435,21 @@ func (pool *ResourcePool) attach(GPU string, job string) {
if _, ok := pool.utils[GPU]; !ok { if _, ok := pool.utils[GPU]; !ok {
pool.utils[GPU] = []UtilGPUTimeSeries{} pool.utils[GPU] = []UtilGPUTimeSeries{}
} }
if len(pool.bindings[GPU]) > 1 {
delete(pool.utils, GPU)
}
} }
func (pool *ResourcePool) detach(GPU string, jobName string) { func (pool *ResourcePool) detach(GPU string, jobName string) {
pool.bindingsMu.Lock() pool.bindingsMu.Lock()
defer pool.bindingsMu.Unlock() defer pool.bindingsMu.Unlock()
if _, ok := pool.bindings[GPU]; ok { if _, ok := pool.bindings[GPU]; ok {
if len(pool.bindings[GPU]) == 1 { if _, ok2 := pool.utils[GPU]; ok2 {
InstanceOfOptimizer().feed(jobName, pool.utils[GPU]) if len(pool.bindings[GPU]) == 1 {
pool.utils[GPU] = []UtilGPUTimeSeries{} InstanceOfOptimizer().feed(jobName, pool.utils[GPU])
}
delete(pool.utils, GPU)
} }
} }

View File

@@ -85,6 +85,7 @@ func (scheduler *SchedulerFair) Start() {
scheduler.parallelism = 1 scheduler.parallelism = 1
go func() { go func() {
/* fair scheduler */
flag := true flag := true
for { for {
log.Debug("Scheduling") log.Debug("Scheduling")
@@ -286,6 +287,14 @@ func (scheduler *SchedulerFair) UpdateProgress(job Job, state State) {
} }
} }
break break
case Failed:
for i := range scheduler.history {
if scheduler.history[i].Name == job.Name {
scheduler.history[i].Status = Failed
scheduler.history[i].UpdatedAt = int(time.Now().Unix())
}
}
break
} }
} }

View File

@@ -13,4 +13,6 @@ const (
Stopped Stopped
// finished successfully // finished successfully
Finished Finished
Failed
) )