diff --git a/.gitignore b/.gitignore index e61be51..1cbfec0 100644 --- a/.gitignore +++ b/.gitignore @@ -8,7 +8,7 @@ # MacOS .DS_Store - +test.go ## Ignore Visual Studio temporary files, build results, and ## files generated by popular Visual Studio add-ons. diff --git a/src/collector.go b/src/collector.go index e91e243..1c92a0b 100644 --- a/src/collector.go +++ b/src/collector.go @@ -4,8 +4,8 @@ import ( "sync" "github.com/Shopify/sarama" "encoding/json" - "log" - "fmt" + log "github.com/sirupsen/logrus" + "time" ) var ( @@ -14,10 +14,13 @@ var ( func start(pool *ResourcePool) { consumer, err := sarama.NewConsumer([]string{"kafka-nod21:9092", "kafka-node2:9092", "kafka-node3:9092"}, nil) - if err != nil { - fmt.Println(err) - return - //panic(err) + for { + if err == nil { + break + } + log.Warn(err) + time.Sleep(time.Second * 5) + consumer, err = sarama.NewConsumer([]string{"kafka-nod21:9092", "kafka-node2:9092", "kafka-node3:9092"}, nil) } partitionList, err := consumer.Partitions("yao") diff --git a/src/job_manager.go b/src/job_manager.go index 073d3ae..224ac24 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -2,41 +2,41 @@ package main import ( "time" - "log" "net/url" "strings" "io/ioutil" "encoding/json" "fmt" "strconv" + log "github.com/sirupsen/logrus" ) type JobManager struct { - allocator *AllocatorFIFO + scheduler Scheduler job Job jobStatus JobStatus resources []NodeStatus } func (jm *JobManager) start() { - log.Println("start job ", jm.job.Name) + log.Info("start job ", jm.job.Name) jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}} - network := allocator.acquireNetwork() + network := jm.scheduler.AcquireNetwork() /* request for resources */ for i := range jm.job.Tasks { var resource NodeStatus for { - resource = jm.allocator.requestResource(jm.job.Tasks[i]) + resource = jm.scheduler.AcquireResource(jm.job.Tasks[i]) if len(resource.Status) > 0 { break } } - log.Println("Receive resource", resource) + log.Info("Receive resource", resource) jm.resources = append(jm.resources, resource) } - jm.allocator.ack(&jm.job) + jm.scheduler.UpdateProgress(jm.job.Name, Running) /* bring up containers */ for i := range jm.job.Tasks { @@ -57,36 +57,34 @@ func (jm *JobManager) start() { resp, err := doRequest("POST", "http://"+jm.resources[i].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") if err != nil { - log.Println(err.Error()) + log.Warn(err.Error()) return } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) if err != nil { - log.Println(err) + log.Warn(err) return } var res MsgCreate err = json.Unmarshal([]byte(string(body)), &res) if err != nil { - log.Println(err) + log.Warn(err) return } jm.jobStatus.tasks[jm.job.Tasks[i].Name] = TaskStatus{Id: res.Id, Node: jm.resources[i].ClientHost} } - jm.allocator.running(&jm.job) - /* monitor job execution */ for { res := jm.status() flag := false for i := range res.Status { if res.Status[i].Status == "running" { - log.Println(jm.job.Name, "-", i, " is running") + log.Info(jm.job.Name, "-", i, " is running") flag = true } else { log.Println(jm.job.Name, "-", i, " ", res.Status[i].Status) @@ -94,7 +92,7 @@ func (jm *JobManager) start() { /* save logs etc. */ /* return resource */ - jm.allocator.returnResource(jm.resources[i]) + jm.scheduler.ReleaseResource(jm.resources[i]) fmt.Println("return resource ", jm.resources[i].ClientID) } } @@ -104,10 +102,10 @@ func (jm *JobManager) start() { time.Sleep(time.Second * 10) } - allocator.releaseNetwork(network) + jm.scheduler.ReleaseNetwork(network) - jm.allocator.finish(&jm.job) - log.Println("finish job", jm.job.Name) + jm.scheduler.UpdateProgress(jm.job.Name, Finished) + log.Info("finish job", jm.job.Name) } func (jm *JobManager) logs(taskName string) MsgLog { @@ -177,8 +175,8 @@ func (jm *JobManager) stop() MsgStop { } for i := range jm.resources { - jm.allocator.returnResource(jm.resources[i]) + jm.scheduler.ReleaseResource(jm.resources[i]) } - jm.allocator.finish(&jm.job) + jm.scheduler.UpdateProgress(jm.job.Name, Stopped) return MsgStop{Code: 0} } diff --git a/src/main.go b/src/main.go index 63e2a10..37614b9 100644 --- a/src/main.go +++ b/src/main.go @@ -3,7 +3,7 @@ package main import ( "flag" "net/http" - "log" + log "github.com/sirupsen/logrus" "encoding/json" "fmt" ) @@ -12,7 +12,7 @@ var addr = flag.String("addr", ":8080", "http service address") var pool *ResourcePool -var allocator *AllocatorFIFO +var scheduler Scheduler func serverAPI(w http.ResponseWriter, r *http.Request) { switch r.URL.Query().Get("action") { @@ -38,7 +38,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { msgSubmit.Code = 1 msgSubmit.Error = err.Error() } else { - allocator.schedule(job) + scheduler.Schedule(job) } js, _ := json.Marshal(msgSubmit) w.Header().Set("Content-Type", "application/json") @@ -47,35 +47,35 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { case "job_status": fmt.Println("job_status") - js, _ := json.Marshal(allocator.status(r.URL.Query().Get("id"))) + js, _ := json.Marshal(scheduler.QueryState(r.URL.Query().Get("id"))) w.Header().Set("Content-Type", "application/json") w.Write(js) break case "job_stop": fmt.Println("job_stop") - js, _ := json.Marshal(allocator.stop(string(r.PostFormValue("id")))) + js, _ := json.Marshal(scheduler.Stop(string(r.PostFormValue("id")))) w.Header().Set("Content-Type", "application/json") w.Write(js) break case "task_logs": fmt.Println("task_logs") - js, _ := json.Marshal(allocator.logs(r.URL.Query().Get("job"), r.URL.Query().Get("task"))) + js, _ := json.Marshal(scheduler.QueryLogs(r.URL.Query().Get("job"), r.URL.Query().Get("task"))) w.Header().Set("Content-Type", "application/json") w.Write(js) break case "jobs": fmt.Println("job_list") - js, _ := json.Marshal(allocator.listJobs()) + js, _ := json.Marshal(scheduler.ListJobs()) w.Header().Set("Content-Type", "application/json") w.Write(js) break case "summary": fmt.Println("summary") - js, _ := json.Marshal(allocator.summary()) + js, _ := json.Marshal(scheduler.Summary()) w.Header().Set("Content-Type", "application/json") w.Write(js) break @@ -98,8 +98,8 @@ func main() { pool.nodes = make(map[string]NodeStatus) pool.start() - allocator = &AllocatorFIFO{} - allocator.start() + scheduler = &SchedulerFCFS{} + scheduler.Start() go func() { start(pool) diff --git a/src/pool_status.go b/src/pool_status.go new file mode 100644 index 0000000..d9a8d3d --- /dev/null +++ b/src/pool_status.go @@ -0,0 +1,13 @@ +package main + +type PoolStatus struct { + TimeStamp string `json:"ts"` + UtilCPU float64 `json:"cpu_util"` + TotalCPU int `json:"cpu_total"` + TotalMem int `json:"mem_total"` + AvailableMem int `json:"mem_available"` + TotalGPU int `json:"TotalGPU"` + UtilGPU int `json:"gpu_util"` + TotalMemGPU int `json:"gpu_mem_total"` + AvailableMemGPU int `json:"gpu_mem_available"` +} diff --git a/src/resource_pool.go b/src/resource_pool.go index fd59788..636b83e 100644 --- a/src/resource_pool.go +++ b/src/resource_pool.go @@ -5,11 +5,10 @@ import ( "time" "net/url" "strings" - "log" + log "github.com/sirupsen/logrus" "math/rand" "strconv" - "fmt" -) + ) type ResourcePool struct { mu sync.Mutex @@ -111,8 +110,7 @@ func (pool *ResourcePool) update(node NodeStatus) { } pool.nodes[node.ClientID] = node pool.heartBeat[node.ClientID] = time.Now() - - //log.Println(pool.nodes) + log.Debug(pool.nodes) } func (pool *ResourcePool) getByID(id string) NodeStatus { @@ -138,7 +136,7 @@ func (pool *ResourcePool) acquireNetwork() string { pool.networkMu.Lock() defer pool.networkMu.Unlock() var network string - fmt.Println(pool.networksFree) + log.Info(pool.networksFree) if len(pool.networksFree) == 0 { for { for { diff --git a/src/scheduler.go b/src/scheduler.go new file mode 100644 index 0000000..4832f0d --- /dev/null +++ b/src/scheduler.go @@ -0,0 +1,27 @@ +package main + +type Scheduler interface { + Start() + + Schedule(Job) + + UpdateProgress(jobName string, state State) + + AcquireResource(Task) NodeStatus + + ReleaseResource(NodeStatus) + + AcquireNetwork() string + + ReleaseNetwork(network string) + + QueryState(jobName string) MsgJobStatus + + QueryLogs(jobName string, taskName string) MsgLog + + Stop(jobName string) MsgStop + + ListJobs() MsgJobList + + Summary() MsgSummary +} diff --git a/src/AllocatorFIFO.go b/src/scheduler_FCFS.go similarity index 53% rename from src/AllocatorFIFO.go rename to src/scheduler_FCFS.go index 453e1bd..af02c03 100644 --- a/src/AllocatorFIFO.go +++ b/src/scheduler_FCFS.go @@ -3,9 +3,10 @@ package main import ( "sync" "time" + log "github.com/sirupsen/logrus" ) -type AllocatorFIFO struct { +type SchedulerFCFS struct { history []*Job queue []Job mu sync.Mutex @@ -14,27 +15,27 @@ type AllocatorFIFO struct { jobs map[string]*JobManager } -func (allocator *AllocatorFIFO) start() { - allocator.jobs = map[string]*JobManager{} - allocator.history = []*Job{} +func (scheduler *SchedulerFCFS) Start() { + scheduler.jobs = map[string]*JobManager{} + scheduler.history = []*Job{} go func() { for { - //fmt.Print("Scheduling ") + log.Info("Scheduling") time.Sleep(time.Second * 5) - allocator.scheduling.Lock() - allocator.mu.Lock() - if len(allocator.queue) > 0 { + scheduler.scheduling.Lock() + scheduler.mu.Lock() + if len(scheduler.queue) > 0 { jm := JobManager{} - jm.job = allocator.queue[0] - allocator.queue = allocator.queue[1:] - jm.allocator = allocator - allocator.jobs[jm.job.Name] = &jm + jm.job = scheduler.queue[0] + scheduler.queue = scheduler.queue[1:] + jm.scheduler = scheduler + scheduler.jobs[jm.job.Name] = &jm - for i := range allocator.history { - if allocator.history[i].Name == jm.job.Name { - allocator.history[i].Status = Starting + for i := range scheduler.history { + if scheduler.history[i].Name == jm.job.Name { + scheduler.history[i].Status = Starting } } @@ -42,42 +43,45 @@ func (allocator *AllocatorFIFO) start() { jm.start() }() } else { - allocator.scheduling.Unlock() + scheduler.scheduling.Unlock() } - allocator.mu.Unlock() + scheduler.mu.Unlock() } }() } -func (allocator *AllocatorFIFO) ack(job *Job) { - allocator.scheduling.Unlock() -} +func (scheduler *SchedulerFCFS) UpdateProgress(jobName string, state State) { + scheduler.scheduling.Unlock() + switch state { + case Running: + scheduler.scheduling.Unlock() -func (allocator *AllocatorFIFO) running(job *Job) { - for i := range allocator.history { - if allocator.history[i].Name == job.Name { - allocator.history[i].Status = Running + for i := range scheduler.history { + if scheduler.history[i].Name == jobName { + scheduler.history[i].Status = Running + } } + break + case Finished: + for i := range scheduler.history { + if scheduler.history[i].Name == jobName { + scheduler.history[i].Status = Finished + } + } + break } } -func (allocator *AllocatorFIFO) finish(job *Job) { - for i := range allocator.history { - if allocator.history[i].Name == job.Name { - allocator.history[i].Status = Finished - } - } +func (scheduler *SchedulerFCFS) Schedule(job Job) { + scheduler.mu.Lock() + defer scheduler.mu.Unlock() + + scheduler.queue = append(scheduler.queue, job) + scheduler.history = append(scheduler.history, &job) + job.Status = Created } -func (allocator *AllocatorFIFO) schedule(job Job) { - allocator.mu.Lock() - defer allocator.mu.Unlock() - - allocator.queue = append(allocator.queue, job) - allocator.history = append(allocator.history, &job) -} - -func (allocator *AllocatorFIFO) requestResource(task Task) NodeStatus { +func (scheduler *SchedulerFCFS) AcquireResource(task Task) NodeStatus { pool.mu.Lock() defer pool.mu.Unlock() @@ -108,7 +112,7 @@ func (allocator *AllocatorFIFO) requestResource(task Task) NodeStatus { return res } -func (allocator *AllocatorFIFO) returnResource(agent NodeStatus) { +func (scheduler *SchedulerFCFS) ReleaseResource(agent NodeStatus) { pool.mu.Lock() defer pool.mu.Unlock() nodes := pool.nodes[agent.ClientID] @@ -121,35 +125,35 @@ func (allocator *AllocatorFIFO) returnResource(agent NodeStatus) { } } -func (allocator *AllocatorFIFO) status(jobName string) MsgJobStatus { - jm, ok := allocator.jobs[jobName] +func (scheduler *SchedulerFCFS) QueryState(jobName string) MsgJobStatus { + jm, ok := scheduler.jobs[jobName] if !ok { return MsgJobStatus{Code: 1, Error: "Job not exist!"} } return jm.status() } -func (allocator *AllocatorFIFO) stop(jobName string) MsgStop { - jm, ok := allocator.jobs[jobName] +func (scheduler *SchedulerFCFS) Stop(jobName string) MsgStop { + jm, ok := scheduler.jobs[jobName] if !ok { return MsgStop{Code: 1, Error: "Job not exist!"} } return jm.stop() } -func (allocator *AllocatorFIFO) logs(jobName string, taskName string) MsgLog { - jm, ok := allocator.jobs[jobName] +func (scheduler *SchedulerFCFS) QueryLogs(jobName string, taskName string) MsgLog { + jm, ok := scheduler.jobs[jobName] if !ok { return MsgLog{Code: 1, Error: "Job not exist!"} } return jm.logs(taskName) } -func (allocator *AllocatorFIFO) listJobs() MsgJobList { - return MsgJobList{Code: 0, Jobs: allocator.history} +func (scheduler *SchedulerFCFS) ListJobs() MsgJobList { + return MsgJobList{Code: 0, Jobs: scheduler.history} } -func (allocator *AllocatorFIFO) summary() MsgSummary { +func (scheduler *SchedulerFCFS) Summary() MsgSummary { summary := MsgSummary{} summary.Code = 0 @@ -157,7 +161,7 @@ func (allocator *AllocatorFIFO) summary() MsgSummary { runningJobsCounter := 0 pendingJobsCounter := 0 - for _, job := range allocator.history { + for _, job := range scheduler.history { switch job.Status { case Created: pendingJobsCounter++ @@ -195,10 +199,10 @@ func (allocator *AllocatorFIFO) summary() MsgSummary { return summary } -func (allocator *AllocatorFIFO) acquireNetwork() string { +func (scheduler *SchedulerFCFS) AcquireNetwork() string { return pool.acquireNetwork() } -func (allocator *AllocatorFIFO) releaseNetwork(network string) { +func (scheduler *SchedulerFCFS) ReleaseNetwork(network string) { pool.releaseNetwork(network) } diff --git a/src/state.go b/src/state.go new file mode 100644 index 0000000..c3f79f0 --- /dev/null +++ b/src/state.go @@ -0,0 +1,16 @@ +package main + +type State int + +const ( + // submitted + Created State = iota + // scheduling + Starting + // running + Running + // stopped + Stopped + // finished successfully + Finished +) diff --git a/src/util.go b/src/util.go index 7110872..a32116e 100644 --- a/src/util.go +++ b/src/util.go @@ -8,26 +8,6 @@ import ( "net/http" ) -const ( - Created = 0 - Starting = 1 - Running = 2 - Stopped = 3 - Finished = 4 -) - -type PoolStatus struct { - TimeStamp string `json:"ts"` - UtilCPU float64 `json:"cpu_util"` - TotalCPU int `json:"cpu_total"` - TotalMem int `json:"mem_total"` - AvailableMem int `json:"mem_available"` - TotalGPU int `json:"TotalGPU"` - UtilGPU int `json:"gpu_util"` - TotalMemGPU int `json:"gpu_mem_total"` - AvailableMemGPU int `json:"gpu_mem_available"` -} - type MsgSubmit struct { Code int `json:"code"` Error string `json:"error"` @@ -143,7 +123,7 @@ type Job struct { CreatedAt int `json:"created_at"` UpdatedAt int `json:"updated_at"` CreatedBy int `json:"created_by"` - Status int `json:"status"` + Status State `json:"status"` } type Task struct {