1
0
mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-12-13 07:46:43 +00:00
This commit is contained in:
2020-04-11 11:38:04 +08:00
parent 7ed56468fd
commit 11aded3427
6 changed files with 55 additions and 0 deletions

View File

@@ -42,6 +42,11 @@ func (jm *JobManager) start() {
}
log.Info("Receive resource", resource)
jm.resources = append(jm.resources, resource)
for _, t := range resource.Status {
jm.scheduler.Attach(t.UUID, jm.job.Name)
}
}
jm.scheduler.UpdateProgress(jm.job.Name, Running)
@@ -119,6 +124,10 @@ func (jm *JobManager) start() {
jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
fmt.Println("return resource ", jm.resources[i].ClientID)
for _, t := range jm.resources[i].Status {
jm.scheduler.Attach(t.UUID, jm.job.Name)
}
InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, res.Status[i])
}
}

View File

@@ -26,6 +26,8 @@ type ResourcePool struct {
counter int
counterTotal int
bindings map[string]map[string]bool
}
func (pool *ResourcePool) start() {
@@ -34,6 +36,8 @@ func (pool *ResourcePool) start() {
pool.networksFree = map[string]bool{}
pool.versions = map[string]float64{}
pool.bindings = map[string]map[string]bool{}
/* check dead nodes */
go func() {
pool.heartBeat = map[string]time.Time{}
@@ -112,6 +116,8 @@ func (pool *ResourcePool) update(node NodeStatus) {
return
}
log.Info(node.Version, "!=", pool.versions[node.ClientID])
pool.counter++
status, ok := pool.nodes[node.ClientID]
if ok {
@@ -189,3 +195,15 @@ func (pool *ResourcePool) releaseNetwork(network string) {
pool.networksFree[network] = true
pool.networkMu.Unlock()
}
func (pool *ResourcePool) attach(GPU string, job string) {
if _, ok := pool.bindings[GPU]; ok {
pool.bindings[GPU][job] = true
}
}
func (pool *ResourcePool) detach(GPU string, job string) {
if list, ok := pool.bindings[GPU]; ok {
delete(list, job)
}
}

View File

@@ -24,4 +24,8 @@ type Scheduler interface {
ListJobs() MsgJobList
Summary() MsgSummary
Attach(GPU string, job string)
Detach(GPU string, job string)
}

View File

@@ -219,3 +219,11 @@ func (scheduler *SchedulerFCFS) AcquireNetwork() string {
func (scheduler *SchedulerFCFS) ReleaseNetwork(network string) {
pool.releaseNetwork(network)
}
func (scheduler *SchedulerFCFS) Attach(GPU string, job string) {
pool.attach(GPU, job)
}
func (scheduler *SchedulerFCFS) Detach(GPU string, job string) {
pool.detach(GPU, job)
}

View File

@@ -366,3 +366,11 @@ func (scheduler *SchedulerFair) UpdateNextQueue() {
scheduler.nextQueue = next
log.Info("updateNextQueue ->", next)
}
func (scheduler *SchedulerFair) Attach(GPU string, job string) {
pool.attach(GPU, job)
}
func (scheduler *SchedulerFair) Detach(GPU string, job string) {
pool.detach(GPU, job)
}

View File

@@ -243,3 +243,11 @@ func (scheduler *SchedulerPriority) AcquireNetwork() string {
func (scheduler *SchedulerPriority) ReleaseNetwork(network string) {
pool.releaseNetwork(network)
}
func (scheduler *SchedulerPriority) Attach(GPU string, job string) {
pool.attach(GPU, job)
}
func (scheduler *SchedulerPriority) Detach(GPU string, job string) {
pool.detach(GPU, job)
}