mirror of https://github.com/newnius/YAO-scheduler.git synced 2025-06-07 14:21:55 +00:00
Newnius committed 2019-07-10 20:40:43 +08:00
parent ad1497bfbd
commit c42154696e
10 changed files with 154 additions and 115 deletions

.gitignore vendored

@ -8,7 +8,7 @@
# MacOS
.DS_Store
test.go
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.


@ -4,8 +4,8 @@ import (
"sync"
"github.com/Shopify/sarama"
"encoding/json"
"log"
"fmt"
log "github.com/sirupsen/logrus"
"time"
)
var (
@ -14,10 +14,13 @@ var (
func start(pool *ResourcePool) {
consumer, err := sarama.NewConsumer([]string{"kafka-nod21:9092", "kafka-node2:9092", "kafka-node3:9092"}, nil)
if err != nil {
fmt.Println(err)
return
//panic(err)
for {
if err == nil {
break
}
log.Warn(err)
time.Sleep(time.Second * 5)
consumer, err = sarama.NewConsumer([]string{"kafka-nod21:9092", "kafka-node2:9092", "kafka-node3:9092"}, nil)
}
partitionList, err := consumer.Partitions("yao")
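The retry loop added above keeps the scheduler alive when the Kafka brokers are not reachable yet, instead of giving up on the first error. A minimal standalone sketch of the same pattern (the newConsumer helper and the broker list are illustrative, not part of this commit):

package main

import (
	"time"

	"github.com/Shopify/sarama"
	log "github.com/sirupsen/logrus"
)

// newConsumer keeps retrying until the brokers accept a connection,
// logging the error and sleeping five seconds between attempts,
// mirroring the loop added to start() above.
func newConsumer(brokers []string) sarama.Consumer {
	consumer, err := sarama.NewConsumer(brokers, nil)
	for err != nil {
		log.Warn(err)
		time.Sleep(time.Second * 5)
		consumer, err = sarama.NewConsumer(brokers, nil)
	}
	return consumer
}

func main() {
	consumer := newConsumer([]string{"kafka-node2:9092", "kafka-node3:9092"})
	defer consumer.Close()
	// the partitions of topic "yao" would be consumed here, as in start()
}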


@ -2,41 +2,41 @@ package main
import (
"time"
"log"
"net/url"
"strings"
"io/ioutil"
"encoding/json"
"fmt"
"strconv"
log "github.com/sirupsen/logrus"
)
type JobManager struct {
allocator *AllocatorFIFO
scheduler Scheduler
job Job
jobStatus JobStatus
resources []NodeStatus
}
func (jm *JobManager) start() {
log.Println("start job ", jm.job.Name)
log.Info("start job ", jm.job.Name)
jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}}
network := allocator.acquireNetwork()
network := jm.scheduler.AcquireNetwork()
/* request for resources */
for i := range jm.job.Tasks {
var resource NodeStatus
for {
resource = jm.allocator.requestResource(jm.job.Tasks[i])
resource = jm.scheduler.AcquireResource(jm.job.Tasks[i])
if len(resource.Status) > 0 {
break
}
}
log.Println("Receive resource", resource)
log.Info("Receive resource", resource)
jm.resources = append(jm.resources, resource)
}
jm.allocator.ack(&jm.job)
jm.scheduler.UpdateProgress(jm.job.Name, Running)
/* bring up containers */
for i := range jm.job.Tasks {
@ -57,36 +57,34 @@ func (jm *JobManager) start() {
resp, err := doRequest("POST", "http://"+jm.resources[i].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
if err != nil {
log.Println(err.Error())
log.Warn(err.Error())
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Println(err)
log.Warn(err)
return
}
var res MsgCreate
err = json.Unmarshal([]byte(string(body)), &res)
if err != nil {
log.Println(err)
log.Warn(err)
return
}
jm.jobStatus.tasks[jm.job.Tasks[i].Name] = TaskStatus{Id: res.Id, Node: jm.resources[i].ClientHost}
}
jm.allocator.running(&jm.job)
/* monitor job execution */
for {
res := jm.status()
flag := false
for i := range res.Status {
if res.Status[i].Status == "running" {
log.Println(jm.job.Name, "-", i, " is running")
log.Info(jm.job.Name, "-", i, " is running")
flag = true
} else {
log.Println(jm.job.Name, "-", i, " ", res.Status[i].Status)
@ -94,7 +92,7 @@ func (jm *JobManager) start() {
/* save logs etc. */
/* return resource */
jm.allocator.returnResource(jm.resources[i])
jm.scheduler.ReleaseResource(jm.resources[i])
fmt.Println("return resource ", jm.resources[i].ClientID)
}
}
@ -104,10 +102,10 @@ func (jm *JobManager) start() {
time.Sleep(time.Second * 10)
}
allocator.releaseNetwork(network)
jm.scheduler.ReleaseNetwork(network)
jm.allocator.finish(&jm.job)
log.Println("finish job", jm.job.Name)
jm.scheduler.UpdateProgress(jm.job.Name, Finished)
log.Info("finish job", jm.job.Name)
}
func (jm *JobManager) logs(taskName string) MsgLog {
@ -177,8 +175,8 @@ func (jm *JobManager) stop() MsgStop {
}
for i := range jm.resources {
jm.allocator.returnResource(jm.resources[i])
jm.scheduler.ReleaseResource(jm.resources[i])
}
jm.allocator.finish(&jm.job)
jm.scheduler.UpdateProgress(jm.job.Name, Stopped)
return MsgStop{Code: 0}
}
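With these changes the JobManager no longer references AllocatorFIFO directly; every interaction goes through the Scheduler interface introduced in src/scheduler.go below. A condensed sketch of that lifecycle, using only the types and methods from this commit (the runJob wrapper itself is hypothetical, and container creation and monitoring are elided):

// runJob drives a job through the same phases as JobManager.start():
// reserve a network and one node per task, mark the job running, and
// release everything once the tasks have finished.
func runJob(s Scheduler, job Job) {
	network := s.AcquireNetwork()

	var nodes []NodeStatus
	for _, task := range job.Tasks {
		var node NodeStatus
		for len(node.Status) == 0 { // busy-wait until a node is granted
			node = s.AcquireResource(task)
		}
		nodes = append(nodes, node)
	}
	s.UpdateProgress(job.Name, Running)

	// ... create containers on the acquired nodes and poll their status ...

	for _, node := range nodes {
		s.ReleaseResource(node)
	}
	s.ReleaseNetwork(network)
	s.UpdateProgress(job.Name, Finished)
}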


@ -3,7 +3,7 @@ package main
import (
"flag"
"net/http"
"log"
log "github.com/sirupsen/logrus"
"encoding/json"
"fmt"
)
@ -12,7 +12,7 @@ var addr = flag.String("addr", ":8080", "http service address")
var pool *ResourcePool
var allocator *AllocatorFIFO
var scheduler Scheduler
func serverAPI(w http.ResponseWriter, r *http.Request) {
switch r.URL.Query().Get("action") {
@ -38,7 +38,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
msgSubmit.Code = 1
msgSubmit.Error = err.Error()
} else {
allocator.schedule(job)
scheduler.Schedule(job)
}
js, _ := json.Marshal(msgSubmit)
w.Header().Set("Content-Type", "application/json")
@ -47,35 +47,35 @@ func serverAPI(w http.ResponseWriter, r *http.Request) {
case "job_status":
fmt.Println("job_status")
js, _ := json.Marshal(allocator.status(r.URL.Query().Get("id")))
js, _ := json.Marshal(scheduler.QueryState(r.URL.Query().Get("id")))
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "job_stop":
fmt.Println("job_stop")
js, _ := json.Marshal(allocator.stop(string(r.PostFormValue("id"))))
js, _ := json.Marshal(scheduler.Stop(string(r.PostFormValue("id"))))
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "task_logs":
fmt.Println("task_logs")
js, _ := json.Marshal(allocator.logs(r.URL.Query().Get("job"), r.URL.Query().Get("task")))
js, _ := json.Marshal(scheduler.QueryLogs(r.URL.Query().Get("job"), r.URL.Query().Get("task")))
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "jobs":
fmt.Println("job_list")
js, _ := json.Marshal(allocator.listJobs())
js, _ := json.Marshal(scheduler.ListJobs())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
case "summary":
fmt.Println("summary")
js, _ := json.Marshal(allocator.summary())
js, _ := json.Marshal(scheduler.Summary())
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
@ -98,8 +98,8 @@ func main() {
pool.nodes = make(map[string]NodeStatus)
pool.start()
allocator = &AllocatorFIFO{}
allocator.start()
scheduler = &SchedulerFCFS{}
scheduler.Start()
go func() {
start(pool)

src/pool_status.go Normal file

@ -0,0 +1,13 @@
package main
type PoolStatus struct {
TimeStamp string `json:"ts"`
UtilCPU float64 `json:"cpu_util"`
TotalCPU int `json:"cpu_total"`
TotalMem int `json:"mem_total"`
AvailableMem int `json:"mem_available"`
TotalGPU int `json:"TotalGPU"`
UtilGPU int `json:"gpu_util"`
TotalMemGPU int `json:"gpu_mem_total"`
AvailableMemGPU int `json:"gpu_mem_available"`
}
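PoolStatus is only moved here out of the message definitions (see the identical struct removed in the last hunk of this diff), so the JSON wire names are unchanged. A hypothetical marshalling example, with made-up values, showing what those tags produce, including the one field, TotalGPU, whose tag is not snake_case:

// Assumes "encoding/json" and "fmt" are imported; values are illustrative,
// only the key names matter here.
func dumpPoolStatus() {
	b, _ := json.Marshal(PoolStatus{TimeStamp: "1562760043", UtilCPU: 12.5, TotalGPU: 8})
	fmt.Println(string(b))
	// {"ts":"1562760043","cpu_util":12.5,"cpu_total":0,"mem_total":0,
	//  "mem_available":0,"TotalGPU":8,"gpu_util":0,"gpu_mem_total":0,
	//  "gpu_mem_available":0}
}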


@ -5,11 +5,10 @@ import (
"time"
"net/url"
"strings"
"log"
log "github.com/sirupsen/logrus"
"math/rand"
"strconv"
"fmt"
)
)
type ResourcePool struct {
mu sync.Mutex
@ -111,8 +110,7 @@ func (pool *ResourcePool) update(node NodeStatus) {
}
pool.nodes[node.ClientID] = node
pool.heartBeat[node.ClientID] = time.Now()
//log.Println(pool.nodes)
log.Debug(pool.nodes)
}
func (pool *ResourcePool) getByID(id string) NodeStatus {
@ -138,7 +136,7 @@ func (pool *ResourcePool) acquireNetwork() string {
pool.networkMu.Lock()
defer pool.networkMu.Unlock()
var network string
fmt.Println(pool.networksFree)
log.Info(pool.networksFree)
if len(pool.networksFree) == 0 {
for {
for {

src/scheduler.go Normal file

@ -0,0 +1,27 @@
package main
type Scheduler interface {
Start()
Schedule(Job)
UpdateProgress(jobName string, state State)
AcquireResource(Task) NodeStatus
ReleaseResource(NodeStatus)
AcquireNetwork() string
ReleaseNetwork(network string)
QueryState(jobName string) MsgJobStatus
QueryLogs(jobName string, taskName string) MsgLog
Stop(jobName string) MsgStop
ListJobs() MsgJobList
Summary() MsgSummary
}
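Since Scheduler is an ordinary Go interface, any scheduling policy that implements these methods can be plugged into the server's main() (where scheduler = &SchedulerFCFS{} is assigned in the hunk above) without touching JobManager. A compile-time assertion, hypothetical and not part of this commit, would make that contract explicit:

// The build breaks if *SchedulerFCFS ever stops implementing Scheduler.
var _ Scheduler = (*SchedulerFCFS)(nil)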


@ -3,9 +3,10 @@ package main
import (
"sync"
"time"
log "github.com/sirupsen/logrus"
)
type AllocatorFIFO struct {
type SchedulerFCFS struct {
history []*Job
queue []Job
mu sync.Mutex
@ -14,27 +15,27 @@ type AllocatorFIFO struct {
jobs map[string]*JobManager
}
func (allocator *AllocatorFIFO) start() {
allocator.jobs = map[string]*JobManager{}
allocator.history = []*Job{}
func (scheduler *SchedulerFCFS) Start() {
scheduler.jobs = map[string]*JobManager{}
scheduler.history = []*Job{}
go func() {
for {
//fmt.Print("Scheduling ")
log.Info("Scheduling")
time.Sleep(time.Second * 5)
allocator.scheduling.Lock()
allocator.mu.Lock()
if len(allocator.queue) > 0 {
scheduler.scheduling.Lock()
scheduler.mu.Lock()
if len(scheduler.queue) > 0 {
jm := JobManager{}
jm.job = allocator.queue[0]
allocator.queue = allocator.queue[1:]
jm.allocator = allocator
allocator.jobs[jm.job.Name] = &jm
jm.job = scheduler.queue[0]
scheduler.queue = scheduler.queue[1:]
jm.scheduler = scheduler
scheduler.jobs[jm.job.Name] = &jm
for i := range allocator.history {
if allocator.history[i].Name == jm.job.Name {
allocator.history[i].Status = Starting
for i := range scheduler.history {
if scheduler.history[i].Name == jm.job.Name {
scheduler.history[i].Status = Starting
}
}
@ -42,42 +43,45 @@ func (allocator *AllocatorFIFO) start() {
jm.start()
}()
} else {
allocator.scheduling.Unlock()
scheduler.scheduling.Unlock()
}
allocator.mu.Unlock()
scheduler.mu.Unlock()
}
}()
}
func (allocator *AllocatorFIFO) ack(job *Job) {
allocator.scheduling.Unlock()
}
func (scheduler *SchedulerFCFS) UpdateProgress(jobName string, state State) {
scheduler.scheduling.Unlock()
switch state {
case Running:
scheduler.scheduling.Unlock()
func (allocator *AllocatorFIFO) running(job *Job) {
for i := range allocator.history {
if allocator.history[i].Name == job.Name {
allocator.history[i].Status = Running
for i := range scheduler.history {
if scheduler.history[i].Name == jobName {
scheduler.history[i].Status = Running
}
}
break
case Finished:
for i := range scheduler.history {
if scheduler.history[i].Name == jobName {
scheduler.history[i].Status = Finished
}
}
break
}
}
func (allocator *AllocatorFIFO) finish(job *Job) {
for i := range allocator.history {
if allocator.history[i].Name == job.Name {
allocator.history[i].Status = Finished
}
}
func (scheduler *SchedulerFCFS) Schedule(job Job) {
scheduler.mu.Lock()
defer scheduler.mu.Unlock()
scheduler.queue = append(scheduler.queue, job)
scheduler.history = append(scheduler.history, &job)
job.Status = Created
}
func (allocator *AllocatorFIFO) schedule(job Job) {
allocator.mu.Lock()
defer allocator.mu.Unlock()
allocator.queue = append(allocator.queue, job)
allocator.history = append(allocator.history, &job)
}
func (allocator *AllocatorFIFO) requestResource(task Task) NodeStatus {
func (scheduler *SchedulerFCFS) AcquireResource(task Task) NodeStatus {
pool.mu.Lock()
defer pool.mu.Unlock()
@ -108,7 +112,7 @@ func (allocator *AllocatorFIFO) requestResource(task Task) NodeStatus {
return res
}
func (allocator *AllocatorFIFO) returnResource(agent NodeStatus) {
func (scheduler *SchedulerFCFS) ReleaseResource(agent NodeStatus) {
pool.mu.Lock()
defer pool.mu.Unlock()
nodes := pool.nodes[agent.ClientID]
@ -121,35 +125,35 @@ func (allocator *AllocatorFIFO) returnResource(agent NodeStatus) {
}
}
func (allocator *AllocatorFIFO) status(jobName string) MsgJobStatus {
jm, ok := allocator.jobs[jobName]
func (scheduler *SchedulerFCFS) QueryState(jobName string) MsgJobStatus {
jm, ok := scheduler.jobs[jobName]
if !ok {
return MsgJobStatus{Code: 1, Error: "Job not exist!"}
}
return jm.status()
}
func (allocator *AllocatorFIFO) stop(jobName string) MsgStop {
jm, ok := allocator.jobs[jobName]
func (scheduler *SchedulerFCFS) Stop(jobName string) MsgStop {
jm, ok := scheduler.jobs[jobName]
if !ok {
return MsgStop{Code: 1, Error: "Job not exist!"}
}
return jm.stop()
}
func (allocator *AllocatorFIFO) logs(jobName string, taskName string) MsgLog {
jm, ok := allocator.jobs[jobName]
func (scheduler *SchedulerFCFS) QueryLogs(jobName string, taskName string) MsgLog {
jm, ok := scheduler.jobs[jobName]
if !ok {
return MsgLog{Code: 1, Error: "Job not exist!"}
}
return jm.logs(taskName)
}
func (allocator *AllocatorFIFO) listJobs() MsgJobList {
return MsgJobList{Code: 0, Jobs: allocator.history}
func (scheduler *SchedulerFCFS) ListJobs() MsgJobList {
return MsgJobList{Code: 0, Jobs: scheduler.history}
}
func (allocator *AllocatorFIFO) summary() MsgSummary {
func (scheduler *SchedulerFCFS) Summary() MsgSummary {
summary := MsgSummary{}
summary.Code = 0
@ -157,7 +161,7 @@ func (allocator *AllocatorFIFO) summary() MsgSummary {
runningJobsCounter := 0
pendingJobsCounter := 0
for _, job := range allocator.history {
for _, job := range scheduler.history {
switch job.Status {
case Created:
pendingJobsCounter++
@ -195,10 +199,10 @@ func (allocator *AllocatorFIFO) summary() MsgSummary {
return summary
}
func (allocator *AllocatorFIFO) acquireNetwork() string {
func (scheduler *SchedulerFCFS) AcquireNetwork() string {
return pool.acquireNetwork()
}
func (allocator *AllocatorFIFO) releaseNetwork(network string) {
func (scheduler *SchedulerFCFS) ReleaseNetwork(network string) {
pool.releaseNetwork(network)
}

src/state.go Normal file

@ -0,0 +1,16 @@
package main
type State int
const (
// submitted
Created State = iota
// scheduling
Starting
// running
Running
// stopped
Stopped
// finished successfully
Finished
)
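Because the constants are declared with iota in the same order as the integer literals they replace (removed in the last hunk of this diff), Created still marshals to 0 and Finished to 4, so the JSON encoding of Job.Status is unchanged. A String() method, hypothetical and not part of this commit, would make the logrus output more readable:

// String maps each State back to its name for logging; the numeric JSON
// encoding is unaffected because encoding/json does not call Stringer.
func (s State) String() string {
	switch s {
	case Created:
		return "Created"
	case Starting:
		return "Starting"
	case Running:
		return "Running"
	case Stopped:
		return "Stopped"
	case Finished:
		return "Finished"
	default:
		return "Unknown"
	}
}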


@ -8,26 +8,6 @@ import (
"net/http"
)
const (
Created = 0
Starting = 1
Running = 2
Stopped = 3
Finished = 4
)
type PoolStatus struct {
TimeStamp string `json:"ts"`
UtilCPU float64 `json:"cpu_util"`
TotalCPU int `json:"cpu_total"`
TotalMem int `json:"mem_total"`
AvailableMem int `json:"mem_available"`
TotalGPU int `json:"TotalGPU"`
UtilGPU int `json:"gpu_util"`
TotalMemGPU int `json:"gpu_mem_total"`
AvailableMemGPU int `json:"gpu_mem_available"`
}
type MsgSubmit struct {
Code int `json:"code"`
Error string `json:"error"`
@ -143,7 +123,7 @@ type Job struct {
CreatedAt int `json:"created_at"`
UpdatedAt int `json:"updated_at"`
CreatedBy int `json:"created_by"`
Status int `json:"status"`
Status State `json:"status"`
}
type Task struct {