diff --git a/src/AllocatorFIFO.go b/src/AllocatorFIFO.go index 6647576..287d6e7 100644 --- a/src/AllocatorFIFO.go +++ b/src/AllocatorFIFO.go @@ -51,6 +51,9 @@ func (allocator *AllocatorFIFO) start() { func (allocator *AllocatorFIFO) ack(job *Job) { allocator.scheduling.Unlock() +} + +func (allocator *AllocatorFIFO) running(job *Job) { for i := range allocator.history { if allocator.history[i].Name == job.Name { allocator.history[i].Status = Running @@ -134,3 +137,49 @@ func (allocator *AllocatorFIFO) logs(jobName string, taskName string) MsgLog { func (allocator *AllocatorFIFO) listJobs() MsgJobList { return MsgJobList{Code: 0, Jobs: allocator.history} } + +func (allocator *AllocatorFIFO) summary() MsgSummary { + summary := MsgSummary{} + summary.Code = 0 + + finishedJobsCounter := 0 + runningJobsCounter := 0 + pendingJobsCounter := 0 + + for _, job := range allocator.history { + switch job.Status { + case Created: + pendingJobsCounter++ + case Starting: + pendingJobsCounter++ + break + case Running: + runningJobsCounter++ + break; + case Finished: + finishedJobsCounter++ + case Stopped: + finishedJobsCounter++ + } + } + summary.JobsFinished = finishedJobsCounter + summary.JobsPending = pendingJobsCounter + summary.JobsRunning = runningJobsCounter + + FreeGPU := 0 + UsingGPU := 0 + + for _, node := range pool.nodes { + for j := range node { + if node[j].MemoryAllocated == 0 { + FreeGPU++ + } else { + UsingGPU++ + } + } + } + summary.FreeGPU = FreeGPU + summary.UsingGPU = UsingGPU + + return summary +} diff --git a/src/job_manager.go b/src/job_manager.go index 65083d9..839388b 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -7,6 +7,7 @@ import ( "strings" "io/ioutil" "encoding/json" + "fmt" ) type JobManager struct { @@ -32,12 +33,24 @@ func (jm *JobManager) start() { log.Println("Receive resource", resource) jm.resources = append(jm.resources, resource) } + jm.allocator.ack(&jm.job) /* bring up containers */ for i := range jm.job.Tasks { + var GPUs []string + for _, GPU := range jm.resources[i].Status { + GPUs = append(GPUs, GPU.UUID) + } + v := url.Values{} - v.Set("image", jm.job.Image) + v.Set("image", jm.job.Tasks[i].Image) v.Set("cmd", jm.job.Tasks[i].Cmd) + v.Set("name", jm.job.Tasks[i].Name) + v.Set("workspace", jm.job.Workspace) + v.Set("gpus", strings.Join(GPUs, ",")) + + fmt.Print(v.Encode()) + resp, err := doRequest("POST", "http://kafka_node1:8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") if err != nil { log.Println(err) @@ -63,7 +76,7 @@ func (jm *JobManager) start() { jm.jobStatus.tasks[jm.job.Tasks[i].Name] = TaskStatus{Id: res.Id} } - jm.allocator.ack(&jm.job) + jm.allocator.running(&jm.job) /* monitor job execution */ for { diff --git a/src/main.go b/src/main.go index cec8ae6..2eb7118 100644 --- a/src/main.go +++ b/src/main.go @@ -70,6 +70,14 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.Write(js) break + + case "summary": + fmt.Println("summary") + js, _ := json.Marshal(allocator.summary()) + w.Header().Set("Content-Type", "application/json") + w.Write(js) + break + default: http.Error(w, "Not Found", http.StatusNotFound) break diff --git a/src/spider.go b/src/spider.go index 52b8306..68590d3 100644 --- a/src/spider.go +++ b/src/spider.go @@ -15,7 +15,6 @@ type Spider struct { ContentType string Referer string Data url.Values - Response *http.Response } diff --git a/src/util.go b/src/util.go index bb82868..4677448 100644 --- a/src/util.go +++ b/src/util.go @@ -16,6 +16,21 @@ const ( Finished = 4 ) +type MsgSubmit struct { + Code int `json:"code"` + Error string `json:"error"` +} + +type MsgSummary struct { + Code int `json:"code"` + Error string `json:"error"` + JobsFinished int `json:"jobs_finished"` + JobsRunning int `json:"jobs_running"` + JobsPending int `json:"jobs_pending"` + FreeGPU int `json:"gpu_free"` + UsingGPU int `json:"gpu_using"` +} + type MsgJobList struct { Code int `json:"code"` Error string `json:"error"` @@ -48,6 +63,7 @@ type MsgCreate struct { type TaskStatus struct { Id string `json:"id"` + Name string `json:"name"` Image string `json:"image"` ImageDigest string `json:"image_digest"` Command string `json:"command"` @@ -83,9 +99,8 @@ type MsgAgent struct { type Job struct { ID int `json:"id"` Name string `json:"name"` - Image string `json:"image"` Tasks []Task `json:"tasks"` - Workspace int `json:"workspace"` + Workspace string `json:"workspace"` Cluster int `json:"virtual_cluster"` Priority int `json:"priority"` RunBefore int `json:"run_before"` @@ -97,6 +112,7 @@ type Job struct { type Task struct { Name string `json:"name"` + Image string `json:"image"` Cmd string `json:"cmd"` NumberCPU int `json:"cpu_number"` Memory int `json:"memory"`