mirror of
https://github.com/newnius/YAO-scheduler.git
synced 2025-12-15 08:16:43 +00:00
[WIP] add distributed job support
This commit is contained in:
@@ -194,3 +194,11 @@ func (allocator *AllocatorFIFO) summary() MsgSummary {
|
|||||||
|
|
||||||
return summary
|
return summary
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (allocator *AllocatorFIFO) acquireNetwork() string {
|
||||||
|
return pool.acquireNetwork()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (allocator *AllocatorFIFO) releaseNetwork(network string) {
|
||||||
|
pool.releaseNetwork(network)
|
||||||
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
type JobManager struct {
|
type JobManager struct {
|
||||||
@@ -21,6 +22,8 @@ func (jm *JobManager) start() {
|
|||||||
log.Println("start job ", jm.job.Name)
|
log.Println("start job ", jm.job.Name)
|
||||||
jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}}
|
jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}}
|
||||||
|
|
||||||
|
network := allocator.acquireNetwork()
|
||||||
|
|
||||||
/* request for resources */
|
/* request for resources */
|
||||||
for i := range jm.job.Tasks {
|
for i := range jm.job.Tasks {
|
||||||
var resource NodeStatus
|
var resource NodeStatus
|
||||||
@@ -48,6 +51,9 @@ func (jm *JobManager) start() {
|
|||||||
v.Set("name", jm.job.Tasks[i].Name)
|
v.Set("name", jm.job.Tasks[i].Name)
|
||||||
v.Set("workspace", jm.job.Workspace)
|
v.Set("workspace", jm.job.Workspace)
|
||||||
v.Set("gpus", strings.Join(GPUs, ","))
|
v.Set("gpus", strings.Join(GPUs, ","))
|
||||||
|
v.Set("mem_limit", strconv.Itoa(jm.job.Tasks[i].Memory)+"m")
|
||||||
|
v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[i].NumberCPU))
|
||||||
|
v.Set("network", network)
|
||||||
|
|
||||||
fmt.Println(v.Encode())
|
fmt.Println(v.Encode())
|
||||||
|
|
||||||
@@ -102,6 +108,8 @@ func (jm *JobManager) start() {
|
|||||||
time.Sleep(time.Second * 10)
|
time.Sleep(time.Second * 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
allocator.releaseNetwork(network)
|
||||||
|
|
||||||
jm.allocator.finish(&jm.job)
|
jm.allocator.finish(&jm.job)
|
||||||
log.Println("finish job", jm.job.Name)
|
log.Println("finish job", jm.job.Name)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,11 @@ package main
|
|||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
"log"
|
||||||
|
"math/rand"
|
||||||
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ResourcePool struct {
|
type ResourcePool struct {
|
||||||
@@ -12,9 +17,16 @@ type ResourcePool struct {
|
|||||||
history []PoolStatus
|
history []PoolStatus
|
||||||
|
|
||||||
heartBeat map[string]time.Time
|
heartBeat map[string]time.Time
|
||||||
|
|
||||||
|
networks map[string]bool
|
||||||
|
networksFree map[string]bool
|
||||||
|
networkMu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pool *ResourcePool) start() {
|
func (pool *ResourcePool) start() {
|
||||||
|
pool.networks = map[string]bool{}
|
||||||
|
pool.networksFree = map[string]bool{}
|
||||||
|
|
||||||
/* check dead nodes */
|
/* check dead nodes */
|
||||||
go func() {
|
go func() {
|
||||||
pool.heartBeat = map[string]time.Time{}
|
pool.heartBeat = map[string]time.Time{}
|
||||||
@@ -119,3 +131,38 @@ func (pool *ResourcePool) list() MsgResource {
|
|||||||
func (pool *ResourcePool) statusHistory() MsgPoolStatusHistory {
|
func (pool *ResourcePool) statusHistory() MsgPoolStatusHistory {
|
||||||
return MsgPoolStatusHistory{Code: 0, Data: pool.history}
|
return MsgPoolStatusHistory{Code: 0, Data: pool.history}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pool *ResourcePool) acquireNetwork() string {
|
||||||
|
var network string
|
||||||
|
if len(pool.networksFree) == 0 {
|
||||||
|
for {
|
||||||
|
network = "yao-net-" + strconv.Itoa(rand.Intn(999999))
|
||||||
|
if _, ok := pool.networksFree[network]; !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
v := url.Values{}
|
||||||
|
v.Set("name", network)
|
||||||
|
resp, err := doRequest("POST", "http://yao-agent-master:8000/network_create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
|
||||||
|
if err != nil {
|
||||||
|
log.Println(err.Error())
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
pool.networksFree[network] = true
|
||||||
|
pool.networks[network] = true
|
||||||
|
}
|
||||||
|
pool.networkMu.Lock()
|
||||||
|
for k := range pool.networksFree {
|
||||||
|
network = k
|
||||||
|
delete(pool.networksFree, k)
|
||||||
|
}
|
||||||
|
pool.networkMu.Unlock()
|
||||||
|
return network
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pool *ResourcePool) releaseNetwork(network string) {
|
||||||
|
pool.networkMu.Lock()
|
||||||
|
pool.networksFree[network] = true
|
||||||
|
pool.networkMu.Unlock()
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user