mirror of
https://github.com/newnius/YAO-scheduler.git
synced 2025-12-15 08:16:43 +00:00
add version, jhl
This commit is contained in:
@@ -1,2 +1,55 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type JobHistoryLogger struct {
|
||||||
|
scheduler Scheduler
|
||||||
|
jobs map[string]Job
|
||||||
|
tasks map[string][]TaskStatus
|
||||||
|
jobStatus JobStatus
|
||||||
|
resources []NodeStatus
|
||||||
|
killedFlag bool
|
||||||
|
}
|
||||||
|
|
||||||
|
var jobHistoryLoggerInstance *JobHistoryLogger
|
||||||
|
var jobHistoryLoggerInstanceLock sync.Mutex
|
||||||
|
|
||||||
|
func InstanceJobHistoryLogger() *JobHistoryLogger {
|
||||||
|
defer jobHistoryLoggerInstanceLock.Unlock()
|
||||||
|
jobHistoryLoggerInstanceLock.Lock()
|
||||||
|
|
||||||
|
if jobHistoryLoggerInstance == nil {
|
||||||
|
jobHistoryLoggerInstance = &JobHistoryLogger{}
|
||||||
|
}
|
||||||
|
return jobHistoryLoggerInstance
|
||||||
|
}
|
||||||
|
|
||||||
|
func (jhl *JobHistoryLogger) init() {
|
||||||
|
log.Info("jhl init")
|
||||||
|
jhl.jobs = map[string]Job{}
|
||||||
|
jhl.tasks = map[string][]TaskStatus{}
|
||||||
|
/* retrieve list */
|
||||||
|
}
|
||||||
|
|
||||||
|
func (jhl *JobHistoryLogger) submitJob(job Job) {
|
||||||
|
log.Info("submit job", job.Name)
|
||||||
|
jhl.jobs[job.Name] = job
|
||||||
|
jhl.tasks[job.Name] = []TaskStatus{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (jhl *JobHistoryLogger) updateJobStatus(jobName string, state State) {
|
||||||
|
log.Info("update job status", jobName)
|
||||||
|
if job, ok := jhl.jobs[jobName]; ok {
|
||||||
|
job.Status = state
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (jhl *JobHistoryLogger) submitTaskStatus(jobName string, task TaskStatus) {
|
||||||
|
log.Info("submit job task", jobName)
|
||||||
|
if tasks, ok := jhl.tasks[jobName]; ok {
|
||||||
|
jhl.tasks[jobName] = append(tasks, task)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -25,6 +25,8 @@ func (jm *JobManager) start() {
|
|||||||
|
|
||||||
network := jm.scheduler.AcquireNetwork()
|
network := jm.scheduler.AcquireNetwork()
|
||||||
|
|
||||||
|
InstanceJobHistoryLogger().submitJob(jm.job)
|
||||||
|
|
||||||
/* request for resources */
|
/* request for resources */
|
||||||
for i := range jm.job.Tasks {
|
for i := range jm.job.Tasks {
|
||||||
var resource NodeStatus
|
var resource NodeStatus
|
||||||
@@ -112,6 +114,8 @@ func (jm *JobManager) start() {
|
|||||||
/* return resource */
|
/* return resource */
|
||||||
jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
|
jm.scheduler.ReleaseResource(jm.job, jm.resources[i])
|
||||||
fmt.Println("return resource ", jm.resources[i].ClientID)
|
fmt.Println("return resource ", jm.resources[i].ClientID)
|
||||||
|
|
||||||
|
InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, res.Status[i])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !flag {
|
if !flag {
|
||||||
@@ -183,6 +187,7 @@ func (jm *JobManager) status() MsgJobStatus {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
res.Status.Node = taskStatus.Node
|
||||||
tasksStatus = append(tasksStatus, res.Status)
|
tasksStatus = append(tasksStatus, res.Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -88,6 +88,13 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
|
|||||||
w.Write(js)
|
w.Write(js)
|
||||||
break
|
break
|
||||||
|
|
||||||
|
case "get_counter":
|
||||||
|
log.Debug("get_counters")
|
||||||
|
js, _ := json.Marshal(pool.getCounter())
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.Write(js)
|
||||||
|
break
|
||||||
|
|
||||||
case "group_list":
|
case "group_list":
|
||||||
log.Debug("group_list")
|
log.Debug("group_list")
|
||||||
js, _ := json.Marshal(InstanceOfGroupManager().List())
|
js, _ := json.Marshal(InstanceOfGroupManager().List())
|
||||||
|
|||||||
@@ -21,12 +21,18 @@ type ResourcePool struct {
|
|||||||
networks map[string]bool
|
networks map[string]bool
|
||||||
networksFree map[string]bool
|
networksFree map[string]bool
|
||||||
networkMu sync.Mutex
|
networkMu sync.Mutex
|
||||||
|
|
||||||
|
versions map[string]string
|
||||||
|
|
||||||
|
counter int
|
||||||
|
counterTotal int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pool *ResourcePool) start() {
|
func (pool *ResourcePool) start() {
|
||||||
//TODO: retrieve networks from yao-agent-master in blocking io
|
//TODO: retrieve networks from yao-agent-master in blocking io
|
||||||
pool.networks = map[string]bool{}
|
pool.networks = map[string]bool{}
|
||||||
pool.networksFree = map[string]bool{}
|
pool.networksFree = map[string]bool{}
|
||||||
|
pool.versions = map[string]string{}
|
||||||
|
|
||||||
/* check dead nodes */
|
/* check dead nodes */
|
||||||
go func() {
|
go func() {
|
||||||
@@ -36,6 +42,7 @@ func (pool *ResourcePool) start() {
|
|||||||
for k, v := range pool.heartBeat {
|
for k, v := range pool.heartBeat {
|
||||||
if v.Add(time.Second * 30).Before(time.Now()) {
|
if v.Add(time.Second * 30).Before(time.Now()) {
|
||||||
delete(pool.nodes, k)
|
delete(pool.nodes, k)
|
||||||
|
delete(pool.versions, k)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
time.Sleep(time.Second * 10)
|
time.Sleep(time.Second * 10)
|
||||||
@@ -100,6 +107,12 @@ func (pool *ResourcePool) update(node NodeStatus) {
|
|||||||
pool.mu.Lock()
|
pool.mu.Lock()
|
||||||
defer pool.mu.Unlock()
|
defer pool.mu.Unlock()
|
||||||
|
|
||||||
|
pool.counterTotal++
|
||||||
|
if version, ok := pool.versions[node.ClientID]; ok && version == node.Version {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
pool.counter++
|
||||||
status, ok := pool.nodes[node.ClientID]
|
status, ok := pool.nodes[node.ClientID]
|
||||||
if ok {
|
if ok {
|
||||||
for i, GPU := range status.Status {
|
for i, GPU := range status.Status {
|
||||||
@@ -110,6 +123,7 @@ func (pool *ResourcePool) update(node NodeStatus) {
|
|||||||
}
|
}
|
||||||
pool.nodes[node.ClientID] = node
|
pool.nodes[node.ClientID] = node
|
||||||
pool.heartBeat[node.ClientID] = time.Now()
|
pool.heartBeat[node.ClientID] = time.Now()
|
||||||
|
pool.versions[node.ClientID] = node.Version
|
||||||
log.Debug(pool.nodes)
|
log.Debug(pool.nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -132,6 +146,10 @@ func (pool *ResourcePool) statusHistory() MsgPoolStatusHistory {
|
|||||||
return MsgPoolStatusHistory{Code: 0, Data: pool.history}
|
return MsgPoolStatusHistory{Code: 0, Data: pool.history}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pool *ResourcePool) getCounter() map[string]int {
|
||||||
|
return map[string]int{"counter": pool.counter, "counterTotal": pool.counterTotal}
|
||||||
|
}
|
||||||
|
|
||||||
func (pool *ResourcePool) acquireNetwork() string {
|
func (pool *ResourcePool) acquireNetwork() string {
|
||||||
pool.networkMu.Lock()
|
pool.networkMu.Lock()
|
||||||
defer pool.networkMu.Unlock()
|
defer pool.networkMu.Unlock()
|
||||||
|
|||||||
@@ -111,6 +111,7 @@ type GPUStatus struct {
|
|||||||
type NodeStatus struct {
|
type NodeStatus struct {
|
||||||
ClientID string `json:"id"`
|
ClientID string `json:"id"`
|
||||||
ClientHost string `json:"host"`
|
ClientHost string `json:"host"`
|
||||||
|
Version string `json:"version"`
|
||||||
NumCPU int `json:"cpu_num"`
|
NumCPU int `json:"cpu_num"`
|
||||||
UtilCPU float64 `json:"cpu_load"`
|
UtilCPU float64 `json:"cpu_load"`
|
||||||
MemTotal int `json:"mem_total"`
|
MemTotal int `json:"mem_total"`
|
||||||
|
|||||||
Reference in New Issue
Block a user