From 6635d603f086d0190ba79e0ff59c4aba483ff8d3 Mon Sep 17 00:00:00 2001 From: Newnius Date: Sun, 29 Mar 2020 21:12:44 +0800 Subject: [PATCH] add version, jhl --- src/history_logger.go | 53 +++++++++++++++++++++++++++++++++++++++++++ src/job_manager.go | 5 ++++ src/main.go | 7 ++++++ src/resource_pool.go | 18 +++++++++++++++ src/util.go | 1 + 5 files changed, 84 insertions(+) diff --git a/src/history_logger.go b/src/history_logger.go index c9ecbf5..e2949a1 100644 --- a/src/history_logger.go +++ b/src/history_logger.go @@ -1,2 +1,55 @@ package main +import ( + log "github.com/sirupsen/logrus" + "sync" +) + +type JobHistoryLogger struct { + scheduler Scheduler + jobs map[string]Job + tasks map[string][]TaskStatus + jobStatus JobStatus + resources []NodeStatus + killedFlag bool +} + +var jobHistoryLoggerInstance *JobHistoryLogger +var jobHistoryLoggerInstanceLock sync.Mutex + +func InstanceJobHistoryLogger() *JobHistoryLogger { + defer jobHistoryLoggerInstanceLock.Unlock() + jobHistoryLoggerInstanceLock.Lock() + + if jobHistoryLoggerInstance == nil { + jobHistoryLoggerInstance = &JobHistoryLogger{} + } + return jobHistoryLoggerInstance +} + +func (jhl *JobHistoryLogger) init() { + log.Info("jhl init") + jhl.jobs = map[string]Job{} + jhl.tasks = map[string][]TaskStatus{} + /* retrieve list */ +} + +func (jhl *JobHistoryLogger) submitJob(job Job) { + log.Info("submit job", job.Name) + jhl.jobs[job.Name] = job + jhl.tasks[job.Name] = []TaskStatus{} +} + +func (jhl *JobHistoryLogger) updateJobStatus(jobName string, state State) { + log.Info("update job status", jobName) + if job, ok := jhl.jobs[jobName]; ok { + job.Status = state + } +} + +func (jhl *JobHistoryLogger) submitTaskStatus(jobName string, task TaskStatus) { + log.Info("submit job task", jobName) + if tasks, ok := jhl.tasks[jobName]; ok { + jhl.tasks[jobName] = append(tasks, task) + } +} diff --git a/src/job_manager.go b/src/job_manager.go index e5ebc24..82d9447 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -25,6 +25,8 @@ func (jm *JobManager) start() { network := jm.scheduler.AcquireNetwork() + InstanceJobHistoryLogger().submitJob(jm.job) + /* request for resources */ for i := range jm.job.Tasks { var resource NodeStatus @@ -112,6 +114,8 @@ func (jm *JobManager) start() { /* return resource */ jm.scheduler.ReleaseResource(jm.job, jm.resources[i]) fmt.Println("return resource ", jm.resources[i].ClientID) + + InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, res.Status[i]) } } if !flag { @@ -183,6 +187,7 @@ func (jm *JobManager) status() MsgJobStatus { if err != nil { continue } + res.Status.Node = taskStatus.Node tasksStatus = append(tasksStatus, res.Status) } diff --git a/src/main.go b/src/main.go index cdd801b..f251dd0 100644 --- a/src/main.go +++ b/src/main.go @@ -88,6 +88,13 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { w.Write(js) break + case "get_counter": + log.Debug("get_counters") + js, _ := json.Marshal(pool.getCounter()) + w.Header().Set("Content-Type", "application/json") + w.Write(js) + break + case "group_list": log.Debug("group_list") js, _ := json.Marshal(InstanceOfGroupManager().List()) diff --git a/src/resource_pool.go b/src/resource_pool.go index 02a2f9f..ed46fae 100644 --- a/src/resource_pool.go +++ b/src/resource_pool.go @@ -21,12 +21,18 @@ type ResourcePool struct { networks map[string]bool networksFree map[string]bool networkMu sync.Mutex + + versions map[string]string + + counter int + counterTotal int } func (pool *ResourcePool) start() { //TODO: retrieve networks from yao-agent-master in blocking io pool.networks = map[string]bool{} pool.networksFree = map[string]bool{} + pool.versions = map[string]string{} /* check dead nodes */ go func() { @@ -36,6 +42,7 @@ func (pool *ResourcePool) start() { for k, v := range pool.heartBeat { if v.Add(time.Second * 30).Before(time.Now()) { delete(pool.nodes, k) + delete(pool.versions, k) } } time.Sleep(time.Second * 10) @@ -100,6 +107,12 @@ func (pool *ResourcePool) update(node NodeStatus) { pool.mu.Lock() defer pool.mu.Unlock() + pool.counterTotal++ + if version, ok := pool.versions[node.ClientID]; ok && version == node.Version { + return + } + + pool.counter++ status, ok := pool.nodes[node.ClientID] if ok { for i, GPU := range status.Status { @@ -110,6 +123,7 @@ func (pool *ResourcePool) update(node NodeStatus) { } pool.nodes[node.ClientID] = node pool.heartBeat[node.ClientID] = time.Now() + pool.versions[node.ClientID] = node.Version log.Debug(pool.nodes) } @@ -132,6 +146,10 @@ func (pool *ResourcePool) statusHistory() MsgPoolStatusHistory { return MsgPoolStatusHistory{Code: 0, Data: pool.history} } +func (pool *ResourcePool) getCounter() map[string]int { + return map[string]int{"counter": pool.counter, "counterTotal": pool.counterTotal} +} + func (pool *ResourcePool) acquireNetwork() string { pool.networkMu.Lock() defer pool.networkMu.Unlock() diff --git a/src/util.go b/src/util.go index 7e640ce..f0f57dd 100644 --- a/src/util.go +++ b/src/util.go @@ -111,6 +111,7 @@ type GPUStatus struct { type NodeStatus struct { ClientID string `json:"id"` ClientHost string `json:"host"` + Version string `json:"version"` NumCPU int `json:"cpu_num"` UtilCPU float64 `json:"cpu_load"` MemTotal int `json:"mem_total"`