From 9dd9c50002a75509305e55898a4beafa6b0c99ca Mon Sep 17 00:00:00 2001
From: Newnius
Date: Wed, 5 Jun 2019 17:09:22 +0800
Subject: [PATCH] [WIP] add distributed job support

---
Review notes: this copy of the patch has been amended. acquireNetwork() now
takes networkMu before touching the maps, hands out a single free network per
call and checks new names against all known networks; releaseNetwork()
ignores the empty name. The blob ids on the "index" lines still refer to the
original revision.

 src/AllocatorFIFO.go | 10 +++++++
 src/job_manager.go   |  8 ++++++
 src/resource_pool.go | 64 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 82 insertions(+)

diff --git a/src/AllocatorFIFO.go b/src/AllocatorFIFO.go
index abdf1b8..453e1bd 100644
--- a/src/AllocatorFIFO.go
+++ b/src/AllocatorFIFO.go
@@ -194,3 +194,13 @@ func (allocator *AllocatorFIFO) summary() MsgSummary {
 
 	return summary
 }
+
+// acquireNetwork obtains a network name from the resource pool.
+func (allocator *AllocatorFIFO) acquireNetwork() string {
+	return pool.acquireNetwork()
+}
+
+// releaseNetwork hands network back to the resource pool.
+func (allocator *AllocatorFIFO) releaseNetwork(network string) {
+	pool.releaseNetwork(network)
+}
diff --git a/src/job_manager.go b/src/job_manager.go
index 6d65ff7..0ceeaf4 100644
--- a/src/job_manager.go
+++ b/src/job_manager.go
@@ -8,6 +8,7 @@ import (
 	"io/ioutil"
 	"encoding/json"
 	"fmt"
+	"strconv"
 )
 
 type JobManager struct {
@@ -21,6 +22,8 @@ func (jm *JobManager) start() {
 	log.Println("start job ", jm.job.Name)
 	jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}}
 
+	network := allocator.acquireNetwork()
+
 	/* request for resources */
 	for i := range jm.job.Tasks {
 		var resource NodeStatus
@@ -48,6 +51,9 @@ func (jm *JobManager) start() {
 		v.Set("name", jm.job.Tasks[i].Name)
 		v.Set("workspace", jm.job.Workspace)
 		v.Set("gpus", strings.Join(GPUs, ","))
+		v.Set("mem_limit", strconv.Itoa(jm.job.Tasks[i].Memory)+"m")
+		v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[i].NumberCPU))
+		v.Set("network", network)
 
 		fmt.Println(v.Encode())
 
@@ -102,6 +108,8 @@ func (jm *JobManager) start() {
 		time.Sleep(time.Second * 10)
 	}
 
+	allocator.releaseNetwork(network)
+
 	jm.allocator.finish(&jm.job)
 	log.Println("finish job", jm.job.Name)
 }
diff --git a/src/resource_pool.go b/src/resource_pool.go
index 718a92b..321a4bf 100644
--- a/src/resource_pool.go
+++ b/src/resource_pool.go
@@ -3,6 +3,11 @@ package main
 import (
 	"sync"
 	"time"
+	"net/url"
+	"strings"
+	"log"
+	"math/rand"
+	"strconv"
 )
 
 type ResourcePool struct {
@@ -12,9 +17,16 @@ type ResourcePool struct {
 	history []PoolStatus
 
 	heartBeat map[string]time.Time
+
+	networks     map[string]bool // every network name reserved or created by the pool
+	networksFree map[string]bool // released networks that can be handed out again
+	networkMu    sync.Mutex      // guards networks and networksFree
 }
 
 func (pool *ResourcePool) start() {
+	pool.networks = map[string]bool{}
+	pool.networksFree = map[string]bool{}
+
 	/* check dead nodes */
 	go func() {
 		pool.heartBeat = map[string]time.Time{}
@@ -119,3 +131,55 @@ func (pool *ResourcePool) list() MsgResource {
 func (pool *ResourcePool) statusHistory() MsgPoolStatusHistory {
 	return MsgPoolStatusHistory{Code: 0, Data: pool.history}
 }
+
+// acquireNetwork returns the name of a network for a job to attach its
+// tasks to. A previously released network is reused when one is free;
+// otherwise a new one is created through the agent master. It returns ""
+// when the creation request fails.
+func (pool *ResourcePool) acquireNetwork() string {
+	pool.networkMu.Lock()
+	/* hand out exactly one free network; the others stay in the free set */
+	for network := range pool.networksFree {
+		delete(pool.networksFree, network)
+		pool.networkMu.Unlock()
+		return network
+	}
+	/* check against every known network, in use or free, and reserve the
+	   name before unlocking so concurrent callers cannot pick it as well */
+	var network string
+	for {
+		network = "yao-net-" + strconv.Itoa(rand.Intn(999999))
+		if _, ok := pool.networks[network]; !ok {
+			break
+		}
+	}
+	pool.networks[network] = true
+	pool.networkMu.Unlock()
+
+	/* the mutex is not held during the HTTP round trip */
+	v := url.Values{}
+	v.Set("name", network)
+	resp, err := doRequest("POST", "http://yao-agent-master:8000/network_create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "")
+	if err != nil {
+		log.Println(err.Error())
+		/* give the reserved name back: the network was never created */
+		pool.networkMu.Lock()
+		delete(pool.networks, network)
+		pool.networkMu.Unlock()
+		return ""
+	}
+	defer resp.Body.Close()
+	return network
+}
+
+// releaseNetwork puts network back into the free set so that a later
+// acquireNetwork call can reuse it. The empty name, which acquireNetwork
+// returns on failure, is ignored.
+func (pool *ResourcePool) releaseNetwork(network string) {
+	if network == "" {
+		return
+	}
+	pool.networkMu.Lock()
+	pool.networksFree[network] = true
+	pool.networkMu.Unlock()
+}