From 11aded3427e72041f5c41e44d9f9538afd310abf Mon Sep 17 00:00:00 2001
From: Newnius
Date: Sat, 11 Apr 2020 11:38:04 +0800
Subject: [PATCH] update

Track which jobs are bound to each GPU.

Add Attach/Detach to the Scheduler interface and implement them in the
FCFS, fair and priority schedulers by delegating to the new
ResourcePool.attach/detach helpers, which are backed by a bindings map.
The job manager attaches every GPU of a resource it receives and
detaches it again when the resource is returned.

---
Reviewer notes (text in this section is not part of the commit message):
- Line breaks, indentation and the position of blank context lines were
  reconstructed from a flattened copy of this patch so that they match
  the hunk line counts; verify them against the tree before applying.
- The post-image blob hashes on the "index" lines are those of the
  original patch and do not reflect the amended content below.
- Changes relative to the original patch: the release path now calls
  Detach instead of Attach, attach creates the per-GPU set on first use,
  doc comments were added, and the new files end with a newline.

 src/job_manager.go        |  9 +++++++++
 src/resource_pool.go      | 31 +++++++++++++++++++++++++++++++
 src/scheduler.go          |  6 ++++++
 src/scheduler_FCFS.go     | 10 ++++++++++
 src/scheduler_fair.go     | 10 ++++++++++
 src/scheduler_priority.go | 10 ++++++++++
 6 files changed, 76 insertions(+)

diff --git a/src/job_manager.go b/src/job_manager.go
index c8e7c5c..f639888 100644
--- a/src/job_manager.go
+++ b/src/job_manager.go
@@ -42,6 +42,11 @@ func (jm *JobManager) start() {
 		}
 		log.Info("Receive resource", resource)
 		jm.resources = append(jm.resources, resource)
+
+		for _, t := range resource.Status {
+			jm.scheduler.Attach(t.UUID, jm.job.Name)
+		}
+
 	}
 	jm.scheduler.UpdateProgress(jm.job.Name, Running)
 
@@ -119,6 +124,10 @@ func (jm *JobManager) start() {
 
 				jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
 				fmt.Println("return resource ", jm.resources[i].ClientID)
+				for _, t := range jm.resources[i].Status {
+					jm.scheduler.Detach(t.UUID, jm.job.Name)
+				}
+
 				InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, res.Status[i])
 			}
 		}
diff --git a/src/resource_pool.go b/src/resource_pool.go
index 453f8d6..7e645be 100644
--- a/src/resource_pool.go
+++ b/src/resource_pool.go
@@ -26,6 +26,8 @@ type ResourcePool struct {
 
 	counter      int
 	counterTotal int
+
+	bindings map[string]map[string]bool
 }
 
 func (pool *ResourcePool) start() {
@@ -34,6 +36,8 @@ func (pool *ResourcePool) start() {
 	pool.networksFree = map[string]bool{}
 	pool.versions = map[string]float64{}
 
+	pool.bindings = map[string]map[string]bool{}
+
 	/* check dead nodes */
 	go func() {
 		pool.heartBeat = map[string]time.Time{}
@@ -112,6 +116,8 @@ func (pool *ResourcePool) update(node NodeStatus) {
 		return
 	}
 
+	log.Info(node.Version, "!=", pool.versions[node.ClientID])
+
 	pool.counter++
 	status, ok := pool.nodes[node.ClientID]
 	if ok {
@@ -189,3 +195,28 @@ func (pool *ResourcePool) releaseNetwork(network string) {
 	pool.networksFree[network] = true
 	pool.networkMu.Unlock()
 }
+
+// attach records that job is running on the GPU identified by the given UUID.
+// The per-GPU job set is created on first use; without that, the very first
+// binding for a GPU would be dropped silently.
+// NOTE(review): bindings is not guarded by a lock here -- confirm callers
+// never attach/detach from multiple goroutines at once.
+func (pool *ResourcePool) attach(GPU string, job string) {
+	if pool.bindings == nil {
+		pool.bindings = map[string]map[string]bool{}
+	}
+	jobs, ok := pool.bindings[GPU]
+	if !ok {
+		jobs = map[string]bool{}
+		pool.bindings[GPU] = jobs
+	}
+	jobs[job] = true
+}
+
+// detach removes job from the set recorded for GPU. It does nothing when no
+// bindings are recorded for that GPU.
+func (pool *ResourcePool) detach(GPU string, job string) {
+	if jobs, ok := pool.bindings[GPU]; ok {
+		delete(jobs, job)
+	}
+}
diff --git a/src/scheduler.go b/src/scheduler.go
index 77248c4..afa149c 100644
--- a/src/scheduler.go
+++ b/src/scheduler.go
@@ -24,4 +24,10 @@ type Scheduler interface {
 	ListJobs() MsgJobList
 
 	Summary() MsgSummary
+
+	// Attach records that job is using the GPU with the given UUID.
+	Attach(GPU string, job string)
+
+	// Detach removes the record that job is using the GPU with the given UUID.
+	Detach(GPU string, job string)
 }
diff --git a/src/scheduler_FCFS.go b/src/scheduler_FCFS.go
index 784be4c..320bc6c 100644
--- a/src/scheduler_FCFS.go
+++ b/src/scheduler_FCFS.go
@@ -219,3 +219,13 @@ func (scheduler *SchedulerFCFS) AcquireNetwork() string {
 func (scheduler *SchedulerFCFS) ReleaseNetwork(network string) {
 	pool.releaseNetwork(network)
 }
+
+// Attach records in the resource pool that job is using the given GPU.
+func (scheduler *SchedulerFCFS) Attach(GPU string, job string) {
+	pool.attach(GPU, job)
+}
+
+// Detach removes the resource pool's record that job is using the given GPU.
+func (scheduler *SchedulerFCFS) Detach(GPU string, job string) {
+	pool.detach(GPU, job)
+}
diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go
index 31fb0d9..cb752ea 100644
--- a/src/scheduler_fair.go
+++ b/src/scheduler_fair.go
@@ -366,3 +366,13 @@ func (scheduler *SchedulerFair) UpdateNextQueue() {
 	scheduler.nextQueue = next
 	log.Info("updateNextQueue ->", next)
 }
+
+// Attach records in the resource pool that job is using the given GPU.
+func (scheduler *SchedulerFair) Attach(GPU string, job string) {
+	pool.attach(GPU, job)
+}
+
+// Detach removes the resource pool's record that job is using the given GPU.
+func (scheduler *SchedulerFair) Detach(GPU string, job string) {
+	pool.detach(GPU, job)
+}
diff --git a/src/scheduler_priority.go b/src/scheduler_priority.go
index fd92c26..13d5b81 100644
--- a/src/scheduler_priority.go
+++ b/src/scheduler_priority.go
@@ -243,3 +243,13 @@ func (scheduler *SchedulerPriority) AcquireNetwork() string {
 func (scheduler *SchedulerPriority) ReleaseNetwork(network string) {
 	pool.releaseNetwork(network)
 }
+
+// Attach records in the resource pool that job is using the given GPU.
+func (scheduler *SchedulerPriority) Attach(GPU string, job string) {
+	pool.attach(GPU, job)
+}
+
+// Detach removes the resource pool's record that job is using the given GPU.
+func (scheduler *SchedulerPriority) Detach(GPU string, job string) {
+	pool.detach(GPU, job)
+}