Mirror of https://github.com/newnius/YAO-scheduler.git
2020-05-04 18:05:11 +08:00
parent 4815ab7321
commit 45685fef79
6 changed files with 16 additions and 17 deletions

View File

@@ -58,7 +58,7 @@ func (jm *JobManager) start() {
log.Info("return resource ", tt.ClientID) log.Info("return resource ", tt.ClientID)
jm.resources[i].ClientID = "null" jm.resources[i].ClientID = "null"
for _, t := range tt.Status { for _, t := range tt.Status {
jm.scheduler.Detach(t.UUID, jm.job.Name) jm.scheduler.Detach(t.UUID, jm.job)
} }
} }
} }
@@ -170,7 +170,7 @@ func (jm *JobManager) start() {
InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, res.Status[i])
} else {
log.Info(jm.job.Name, "-", i, " ", res.Status[i].Status)
-if exitCode, ok := res.Status[i].State["ExitCode"].(float64); ok {
+if exitCode, ok := res.Status[i].State["ExitCode"].(float64); ok && !jm.job.Tasks[i].IsPS {
if exitCode != 0 && !jm.killedFlag {
log.Warn(jm.job.Name+"-"+jm.job.Tasks[i].Name+" exited unexpected, exitCode=", exitCode)
jm.killedFlag = true
@@ -195,7 +195,7 @@ func (jm *JobManager) start() {
jm.resources[i].ClientID = "null"
for _, t := range jm.resources[i].Status {
-jm.scheduler.Detach(t.UUID, jm.job.Name)
+jm.scheduler.Detach(t.UUID, jm.job)
}
InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, res.Status[i])
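
The ExitCode check in the second hunk above is the behavioural part of this commit: a non-zero exit code now only flags the job as failed when the exiting task is not a parameter server. Below is a minimal, self-contained sketch of that condition; the Job/Task stand-ins and the shouldMarkFailed helper are illustrative only, not names from the repository.

    package main

    import "fmt"

    // Illustrative stand-ins for the scheduler's Job/Task types; only the
    // fields referenced by the diff (Name, Tasks, IsPS) are modelled here.
    type Task struct {
        Name string
        IsPS bool
    }

    type Job struct {
        Name  string
        Tasks []Task
    }

    // shouldMarkFailed mirrors the updated condition: a non-zero ExitCode only
    // fails the job when the exiting task is not a parameter server.
    func shouldMarkFailed(job Job, i int, state map[string]interface{}) bool {
        exitCode, ok := state["ExitCode"].(float64)
        return ok && !job.Tasks[i].IsPS && exitCode != 0
    }

    func main() {
        job := Job{Name: "demo", Tasks: []Task{{Name: "ps0", IsPS: true}, {Name: "worker0"}}}
        state := map[string]interface{}{"ExitCode": float64(137)}
        fmt.Println(shouldMarkFailed(job, 0, state)) // false: a PS exit no longer kills the job
        fmt.Println(shouldMarkFailed(job, 1, state)) // true: a crashed worker still does
    }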

View File

@@ -102,12 +102,12 @@ func (pool *ResourcePool) checkDeadNodes() {
}
pool.TotalGPUMu.Lock()
+seg.Lock.Lock()
if _, ok := seg.Nodes[k]; ok {
pool.TotalGPU -= len(seg.Nodes[k].Status)
}
pool.TotalGPUMu.Unlock()
-seg.Lock.Lock()
delete(seg.Nodes, k)
seg.Lock.Unlock()
pool.versionsMu.Lock()
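
The checkDeadNodes hunk only moves the seg.Lock acquisition: as read from the diff, the Nodes[k] lookup, the TotalGPU adjustment, and the delete now all run while the segment lock is held. Below is a simplified sketch of that ordering, with stand-in types (the real ResourcePool segment carries more fields than shown here).

    package main

    import "sync"

    // Simplified stand-ins: one entry in NodeStatus.Status per GPU on the node.
    type NodeStatus struct{ Status []string }

    type segment struct {
        Lock  sync.Mutex
        Nodes map[string]*NodeStatus
    }

    type resourcePool struct {
        TotalGPUMu sync.Mutex
        TotalGPU   int
    }

    // removeDeadNode mirrors the updated ordering: seg.Lock is taken before the
    // Nodes[k] lookup, so the GPU-count adjustment and the delete observe the
    // same map state.
    func (pool *resourcePool) removeDeadNode(seg *segment, k string) {
        pool.TotalGPUMu.Lock()
        seg.Lock.Lock()
        if node, ok := seg.Nodes[k]; ok {
            pool.TotalGPU -= len(node.Status)
        }
        pool.TotalGPUMu.Unlock()
        delete(seg.Nodes, k)
        seg.Lock.Unlock()
    }

    func main() {
        pool := &resourcePool{TotalGPU: 8}
        seg := &segment{Nodes: map[string]*NodeStatus{"node-1": {Status: []string{"GPU-0", "GPU-1"}}}}
        pool.removeDeadNode(seg, "node-1")
    }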
@@ -361,7 +361,6 @@ func (pool *ResourcePool) list() MsgResource {
start := pool.pools[0].Next
for cur := start; ; {
-log.Info(cur.ID)
cur.Lock.Lock()
for k, node := range cur.Nodes {
nodes[k] = *node
@@ -441,20 +440,20 @@ func (pool *ResourcePool) attach(GPU string, job string) {
}
}
-func (pool *ResourcePool) detach(GPU string, jobName string) {
+func (pool *ResourcePool) detach(GPU string, job Job) {
pool.bindingsMu.Lock()
defer pool.bindingsMu.Unlock()
if _, ok := pool.bindings[GPU]; ok {
if _, ok2 := pool.utils[GPU]; ok2 {
-if len(pool.bindings[GPU]) == 1 {
+if len(pool.bindings[GPU]) == 1 && job.Status == Finished {
-InstanceOfOptimizer().feed(jobName, pool.utils[GPU])
+InstanceOfOptimizer().feed(job.Name, pool.utils[GPU])
}
delete(pool.utils, GPU)
}
}
if list, ok := pool.bindings[GPU]; ok {
-delete(list, jobName)
+delete(list, job.Name)
}
}
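
This detach rewrite is what the signature churn in the remaining files serves: with the whole Job in hand, the GPU-utilization samples are fed to the optimizer only when the job actually finished, not whenever the last binding for a GPU happens to be released. A runnable sketch of the new gating logic follows; the Finished constant, the map layouts, and the feed stand-in are assumptions for illustration, not the repository's actual definitions.

    package main

    import "fmt"

    // Assumed stand-ins for the scheduler's Job status and the pool's bookkeeping maps.
    type Job struct {
        Name   string
        Status int
    }

    const Finished = 1 // placeholder for the real status constant

    type resourcePool struct {
        bindings map[string]map[string]int // GPU UUID -> job name -> binding info
        utils    map[string][]int          // GPU UUID -> sampled utilization
    }

    // detach mirrors the updated logic: the utilization trace is fed to the
    // optimizer only when this is the GPU's last binding AND the job finished.
    func (pool *resourcePool) detach(GPU string, job Job) {
        if _, ok := pool.bindings[GPU]; ok {
            if utils, ok2 := pool.utils[GPU]; ok2 {
                if len(pool.bindings[GPU]) == 1 && job.Status == Finished {
                    fmt.Println("feed optimizer:", job.Name, utils) // stands in for InstanceOfOptimizer().feed
                }
                delete(pool.utils, GPU)
            }
        }
        if list, ok := pool.bindings[GPU]; ok {
            delete(list, job.Name)
        }
    }

    func main() {
        pool := &resourcePool{
            bindings: map[string]map[string]int{"GPU-0": {"job-a": 1}},
            utils:    map[string][]int{"GPU-0": {55, 60, 58}},
        }
        pool.detach("GPU-0", Job{Name: "job-a", Status: Finished})
    }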

View File

@@ -27,7 +27,7 @@ type Scheduler interface {
Attach(GPU string, job string)
-Detach(GPU string, job string)
+Detach(GPU string, job Job)
Enable() bool
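
The Detach changes in the remaining scheduler files are mechanical fallout of this one-line interface change: every Scheduler implementation now accepts a Job in Detach, and callers such as JobManager pass jm.job rather than jm.job.Name. A minimal sketch of the updated contract, with a stand-in Job type and a hypothetical stub implementation:

    package main

    import "fmt"

    // Stand-in Job carrying only the fields the new Detach path needs.
    type Job struct {
        Name   string
        Status int
    }

    // Scheduler mirrors the updated interface: Detach now receives the full Job.
    type Scheduler interface {
        Attach(GPU string, job string)
        Detach(GPU string, job Job)
        Enable() bool
    }

    // stubScheduler is hypothetical; it only exists to show the call sites.
    type stubScheduler struct{}

    func (s *stubScheduler) Attach(GPU string, job string) { fmt.Println("attach", GPU, job) }
    func (s *stubScheduler) Detach(GPU string, job Job)    { fmt.Println("detach", GPU, job.Name) }
    func (s *stubScheduler) Enable() bool                  { return true }

    func main() {
        var sched Scheduler = &stubScheduler{}
        job := Job{Name: "demo"}
        sched.Attach("GPU-0", job.Name) // Attach is unchanged and still takes the name
        sched.Detach("GPU-0", job)      // Detach now takes the Job itself
    }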

View File

@@ -253,7 +253,7 @@ func (scheduler *SchedulerFCFS) Attach(GPU string, job string) {
pool.attach(GPU, job)
}
-func (scheduler *SchedulerFCFS) Detach(GPU string, job string) {
+func (scheduler *SchedulerFCFS) Detach(GPU string, job Job) {
pool.detach(GPU, job)
}

View File

@@ -181,8 +181,8 @@ func (scheduler *SchedulerFair) Start() {
if len(t) == 0 || !InstanceOfGroupManager().groups[t[0].Group].Reserved {
continue
}
-log.Info(scheduler.queueUsingGPU)
+//log.Info(scheduler.queueUsingGPU)
-log.Info(scheduler.queuesSchedulingCnt)
+//log.Info(scheduler.queuesSchedulingCnt)
scheduler.queuesUsingGPUMu.Lock()
if cnt, ok := scheduler.queuesSchedulingCnt[t[0].Group]; ok && cnt > 0 {
scheduler.queuesUsingGPUMu.Unlock()
@@ -495,7 +495,7 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task, nodes []Node
if len(candidates) > 0 {
log.Info("allocationType is ", allocationType)
-log.Info(candidates)
+//log.Info(candidates)
}
/* assign */
@@ -586,7 +586,7 @@ func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
scheduler.UsingGPUMu.Unlock()
log.Info(node.Status[j].UUID, " is released")
}
-log.Info(node.Status[j].MemoryAllocated)
+//log.Info(node.Status[j].MemoryAllocated)
}
}
}
@@ -801,7 +801,7 @@ func (scheduler *SchedulerFair) Attach(GPU string, job string) {
pool.attach(GPU, job)
}
-func (scheduler *SchedulerFair) Detach(GPU string, job string) {
+func (scheduler *SchedulerFair) Detach(GPU string, job Job) {
pool.detach(GPU, job)
}

View File

@@ -277,7 +277,7 @@ func (scheduler *SchedulerPriority) Attach(GPU string, job string) {
pool.attach(GPU, job)
}
-func (scheduler *SchedulerPriority) Detach(GPU string, job string) {
+func (scheduler *SchedulerPriority) Detach(GPU string, job Job) {
pool.detach(GPU, job)
}