From 7e1c4349b9a0a3b4dc0543bd7b04f788251dc001 Mon Sep 17 00:00:00 2001
From: Newnius
Date: Mon, 13 Apr 2020 22:35:17 +0800
Subject: [PATCH] add concurrency

---
 src/main.go               |  1 -
 src/resource_pool.go      | 87 ++++++++++++++++++++++++++-------------
 src/scheduler_FCFS.go     | 45 +++++++++++++-------
 src/scheduler_fair.go     | 84 ++++++++++++++++++++++++-------------
 src/scheduler_priority.go | 45 +++++++++++++-------
 5 files changed, 171 insertions(+), 91 deletions(-)

diff --git a/src/main.go b/src/main.go
index feaae38..ece56b0 100644
--- a/src/main.go
+++ b/src/main.go
@@ -207,7 +207,6 @@ func main() {
 	InstanceJobHistoryLogger().init()
 
 	pool = &ResourcePool{}
-	pool.nodes = make(map[string]NodeStatus)
 	pool.start()
 
 	switch config.SchedulerPolicy {
diff --git a/src/resource_pool.go b/src/resource_pool.go
index 6546a3e..e2a0b2e 100644
--- a/src/resource_pool.go
+++ b/src/resource_pool.go
@@ -8,11 +8,15 @@ import (
 	log "github.com/sirupsen/logrus"
 	"math/rand"
 	"strconv"
+	"hash/fnv"
 )
 
 type ResourcePool struct {
-	mu    sync.Mutex
-	nodes map[string]NodeStatus
+	//mu sync.Mutex
+	//nodes map[string]NodeStatus
+	pools      []map[string]NodeStatus
+	poolsMu    []sync.Mutex
+	poolsCount int
 
 	history []PoolStatus
 
@@ -33,6 +37,12 @@ type ResourcePool struct {
 	utils map[string][]int
 }
 
+func (pool *ResourcePool) getNodePool(name string) int {
+	h := fnv.New32a()
+	h.Write([]byte(name))
+	return int(h.Sum32() % uint32(pool.poolsCount))
+}
+
 func (pool *ResourcePool) start() {
 	//TODO: retrieve networks from yao-agent-master in blocking io
 	pool.networks = map[string]bool{}
@@ -42,6 +52,12 @@ func (pool *ResourcePool) start() {
 	pool.bindings = map[string]map[string]bool{}
 	pool.utils = map[string][]int{}
 
+	pool.poolsCount = 10
+	for i := 0; i < pool.poolsCount; i++ {
+		pool.pools = append(pool.pools, map[string]NodeStatus{})
+		pool.poolsMu = append(pool.poolsMu, sync.Mutex{})
+	}
+
 	/* check dead nodes */
 	go func() {
 		pool.heartBeat = map[string]time.Time{}
@@ -50,10 +66,11 @@ func (pool *ResourcePool) start() {
 			pool.heartBeatMu.Lock()
 			for k, v := range pool.heartBeat {
 				if v.Add(time.Second * 30).Before(time.Now()) {
-					pool.mu.Lock()
-					delete(pool.nodes, k)
+					poolID := pool.getNodePool(k)
+					pool.poolsMu[poolID].Lock()
+					delete(pool.pools[poolID], k)
 					delete(pool.versions, k)
-					pool.mu.Unlock()
+					pool.poolsMu[poolID].Unlock()
 				}
 			}
 			pool.heartBeatMu.Unlock()
@@ -78,24 +95,27 @@ func (pool *ResourcePool) start() {
 			UtilGPU := 0
 			TotalMemGPU := 0
 			AvailableMemGPU := 0
-			pool.mu.Lock()
-			for _, node := range pool.nodes {
-				UtilCPU += node.UtilCPU
-				TotalCPU += node.NumCPU
-				TotalMem += node.MemTotal
-				AvailableMem += node.MemAvailable
+			nodesCount := 0
+			for i := 0; i < pool.poolsCount; i++ {
+				pool.poolsMu[i].Lock()
+				for _, node := range pool.pools[i] {
+					UtilCPU += node.UtilCPU
+					TotalCPU += node.NumCPU
+					TotalMem += node.MemTotal
+					AvailableMem += node.MemAvailable
 
-				for _, GPU := range node.Status {
-					UtilGPU += GPU.UtilizationGPU
-					TotalGPU ++
-					TotalMemGPU += GPU.MemoryTotal
-					AvailableMemGPU += GPU.MemoryFree
+					for _, GPU := range node.Status {
+						UtilGPU += GPU.UtilizationGPU
+						TotalGPU++
+						TotalMemGPU += GPU.MemoryTotal
+						AvailableMemGPU += GPU.MemoryFree
+					}
 				}
+				nodesCount += len(pool.pools[i])
+				pool.poolsMu[i].Unlock()
 			}
-			size := len(pool.nodes)
-			pool.mu.Unlock()
 			summary.TimeStamp = time.Now().Format("2006-01-02 15:04:05")
-			summary.UtilCPU = UtilCPU / (float64(size) + 0.001)
+			summary.UtilCPU = UtilCPU / (float64(nodesCount) + 0.001)
 			summary.TotalCPU = TotalCPU
 			summary.TotalMem = TotalMem
 			summary.AvailableMem = AvailableMem
@@ -119,8 +139,10 @@ func (pool *ResourcePool) start() {
 }
 
 func (pool *ResourcePool) update(node NodeStatus) {
-	pool.mu.Lock()
-	defer pool.mu.Unlock()
+	poolID := pool.getNodePool(node.ClientID)
+
+	pool.poolsMu[poolID].Lock()
+	defer pool.poolsMu[poolID].Unlock()
 
 	go func(node NodeStatus) {
 		pool.bindingsMu.Lock()
@@ -145,7 +167,7 @@ func (pool *ResourcePool) update(node NodeStatus) {
 		log.Debug(node.Version, "!=", pool.versions[node.ClientID])
 	}
 	pool.counter++
-	status, ok := pool.nodes[node.ClientID]
+	status, ok := pool.pools[poolID][node.ClientID]
 	if ok {
 		for i, GPU := range status.Status {
 			if GPU.UUID == node.Status[i].UUID {
@@ -153,16 +175,17 @@ func (pool *ResourcePool) update(node NodeStatus) {
 			}
 		}
 	}
-	pool.nodes[node.ClientID] = node
+	pool.pools[poolID][node.ClientID] = node
 	pool.versions[node.ClientID] = node.Version
-	log.Debug(pool.nodes)
 }
 
 func (pool *ResourcePool) getByID(id string) NodeStatus {
-	pool.mu.Lock()
-	defer pool.mu.Unlock()
+	poolID := pool.getNodePool(id)
 
-	status, ok := pool.nodes[id]
+	pool.poolsMu[poolID].Lock()
+	defer pool.poolsMu[poolID].Unlock()
+
+	status, ok := pool.pools[poolID][id]
 	if ok {
 		return status
 	}
@@ -170,7 +193,15 @@ func (pool *ResourcePool) getByID(id string) NodeStatus {
 }
 
 func (pool *ResourcePool) list() MsgResource {
-	return MsgResource{Code: 0, Resource: pool.nodes}
+	nodes := map[string]NodeStatus{}
+	for i := 0; i < pool.poolsCount; i++ {
+		pool.poolsMu[i].Lock()
+		for k, node := range pool.pools[i] {
+			nodes[k] = node
+		}
+		pool.poolsMu[i].Unlock()
+	}
+	return MsgResource{Code: 0, Resource: nodes}
 }
 
 func (pool *ResourcePool) statusHistory() MsgPoolStatusHistory {
diff --git a/src/scheduler_FCFS.go b/src/scheduler_FCFS.go
index a4e05f8..b90bd68 100644
--- a/src/scheduler_FCFS.go
+++ b/src/scheduler_FCFS.go
@@ -4,6 +4,7 @@ import (
 	"sync"
 	"time"
 	log "github.com/sirupsen/logrus"
+	"math/rand"
 )
 
 type SchedulerFCFS struct {
@@ -86,11 +87,12 @@ func (scheduler *SchedulerFCFS) Schedule(job Job) {
 }
 
 func (scheduler *SchedulerFCFS) AcquireResource(job Job, task Task) NodeStatus {
-	pool.mu.Lock()
-	defer pool.mu.Unlock()
+	poolID := rand.Intn(pool.poolsCount)
+	pool.poolsMu[poolID].Lock()
+	defer pool.poolsMu[poolID].Unlock()
 
 	res := NodeStatus{}
-	for id, node := range pool.nodes {
+	for id, node := range pool.pools[poolID] {
 		var available []GPUStatus
 		for _, status := range node.Status {
 			if status.MemoryTotal-status.MemoryAllocated >= task.MemoryGPU {
@@ -101,6 +103,8 @@ func (scheduler *SchedulerFCFS) AcquireResource(job Job, task Task) NodeStatus {
 			res.ClientID = id
 			res.ClientHost = node.ClientHost
 			res.Status = available[0:task.NumberGPU]
+			res.NumCPU = task.NumberCPU
+			res.MemTotal = task.Memory
 
 			for i := range res.Status {
 				for j := range node.Status {
@@ -117,13 +121,20 @@ func (scheduler *SchedulerFCFS) AcquireResource(job Job, task Task) NodeStatus {
 }
 
 func (scheduler *SchedulerFCFS) ReleaseResource(job Job, agent NodeStatus) {
-	pool.mu.Lock()
-	defer pool.mu.Unlock()
-	nodes := pool.nodes[agent.ClientID]
+	poolID := pool.getNodePool(agent.ClientID)
+	pool.poolsMu[poolID].Lock()
+	defer pool.poolsMu[poolID].Unlock()
+
+	node := pool.pools[poolID][agent.ClientID]
 	for _, gpu := range agent.Status {
-		for j := range nodes.Status {
-			if gpu.UUID == nodes.Status[j].UUID {
-				nodes.Status[j].MemoryAllocated -= gpu.MemoryTotal
+		for j := range node.Status {
+			if gpu.UUID == node.Status[j].UUID {
+				node.Status[j].MemoryAllocated -= gpu.MemoryTotal
+				if node.Status[j].MemoryAllocated < 0 {
+					// in case of error
+					log.Warn(node.ClientID, " reports negative allocated GPU memory, reset to 0")
Allocated") + node.Status[j].MemoryAllocated = 0 + } } } } @@ -199,14 +210,18 @@ func (scheduler *SchedulerFCFS) Summary() MsgSummary { FreeGPU := 0 UsingGPU := 0 - for _, node := range pool.nodes { - for j := range node.Status { - if node.Status[j].MemoryAllocated == 0 { - FreeGPU++ - } else { - UsingGPU++ + for i := 0; i < pool.poolsCount; i++ { + pool.poolsMu[i].Lock() + for _, node := range pool.pools[i] { + for j := range node.Status { + if node.Status[j].MemoryAllocated == 0 { + FreeGPU++ + } else { + UsingGPU++ + } } } + pool.poolsMu[i].Unlock() } summary.FreeGPU = FreeGPU summary.UsingGPU = UsingGPU diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go index 11438e0..8a1f007 100644 --- a/src/scheduler_fair.go +++ b/src/scheduler_fair.go @@ -5,6 +5,7 @@ import ( "time" log "github.com/sirupsen/logrus" "sort" + "math/rand" ) type ResourceCount struct { @@ -17,8 +18,9 @@ type ResourceCount struct { type SchedulerFair struct { history []*Job queues map[string][]Job - mu sync.Mutex - scheduling sync.Mutex + queueMu sync.Mutex + schedulingMu sync.Mutex + schedulingJobsCnt int jobs map[string]*JobManager nextQueue string resourceAllocations map[string]*ResourceCount @@ -46,6 +48,7 @@ func (scheduler *SchedulerFair) Start() { scheduler.queues["default"] = []Job{} scheduler.resourceAllocations = map[string]*ResourceCount{} scheduler.enabled = true + scheduler.schedulingJobsCnt = 0 go func() { for { @@ -54,8 +57,14 @@ func (scheduler *SchedulerFair) Start() { if !scheduler.enabled { continue } - scheduler.scheduling.Lock() - scheduler.mu.Lock() + scheduler.schedulingMu.Lock() + if scheduler.schedulingJobsCnt >= pool.poolsCount { + scheduler.schedulingMu.Unlock() + continue + } + scheduler.schedulingJobsCnt++ + scheduler.schedulingMu.Unlock() + scheduler.queueMu.Lock() queue := scheduler.nextQueue if len(scheduler.queues[queue]) > 0 { jm := JobManager{} @@ -72,13 +81,12 @@ func (scheduler *SchedulerFair) Start() { jm.start() }() } else { - log.Info("No more jobs to scheduling") - scheduler.scheduling.Unlock() + log.Info("No more jobs to scheduling", time.Now()) go func() { scheduler.UpdateNextQueue() }() } - scheduler.mu.Unlock() + scheduler.queueMu.Unlock() } }() } @@ -86,7 +94,9 @@ func (scheduler *SchedulerFair) Start() { func (scheduler *SchedulerFair) UpdateProgress(jobName string, state State) { switch state { case Running: - scheduler.scheduling.Unlock() + scheduler.schedulingMu.Lock() + scheduler.schedulingJobsCnt-- + scheduler.schedulingMu.Unlock() for i := range scheduler.history { if scheduler.history[i].Name == jobName { @@ -115,8 +125,8 @@ func (scheduler *SchedulerFair) UpdateProgress(jobName string, state State) { } func (scheduler *SchedulerFair) Schedule(job Job) { - scheduler.mu.Lock() - defer scheduler.mu.Unlock() + scheduler.queueMu.Lock() + defer scheduler.queueMu.Unlock() queue := job.Group _, ok := scheduler.queues[queue] @@ -156,11 +166,12 @@ func (scheduler *SchedulerFair) Schedule(job Job) { } func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus { - pool.mu.Lock() - defer pool.mu.Unlock() + poolID := rand.Intn(pool.poolsCount) + pool.poolsMu[poolID].Lock() + defer pool.poolsMu[poolID].Unlock() res := NodeStatus{} - for id, node := range pool.nodes { + for id, node := range pool.pools[poolID] { var available []GPUStatus for _, status := range node.Status { if status.MemoryTotal-status.MemoryAllocated >= task.MemoryGPU { @@ -206,9 +217,11 @@ func (scheduler *SchedulerFair) AcquireResource(job Job, task Task) NodeStatus { } func (scheduler 
 func (scheduler *SchedulerFair) ReleaseResource(job Job, agent NodeStatus) {
-	pool.mu.Lock()
-	defer pool.mu.Unlock()
-	node := pool.nodes[agent.ClientID]
+	poolID := pool.getNodePool(agent.ClientID)
+	pool.poolsMu[poolID].Lock()
+	defer pool.poolsMu[poolID].Unlock()
+
+	node := pool.pools[poolID][agent.ClientID]
 	for _, gpu := range agent.Status {
 		for j := range node.Status {
 			if gpu.UUID == node.Status[j].UUID {
@@ -313,17 +329,19 @@ func (scheduler *SchedulerFair) Summary() MsgSummary {
 	FreeGPU := 0
 	UsingGPU := 0
-	pool.mu.Lock()
-	for _, node := range pool.nodes {
-		for j := range node.Status {
-			if node.Status[j].MemoryAllocated == 0 {
-				FreeGPU++
-			} else {
-				UsingGPU++
+	for i := 0; i < pool.poolsCount; i++ {
+		pool.poolsMu[i].Lock()
+		for _, node := range pool.pools[i] {
+			for j := range node.Status {
+				if node.Status[j].MemoryAllocated == 0 {
+					FreeGPU++
+				} else {
+					UsingGPU++
+				}
 			}
 		}
+		pool.poolsMu[i].Unlock()
 	}
-	pool.mu.Unlock()
 
 	summary.FreeGPU = FreeGPU
 	summary.UsingGPU = UsingGPU
@@ -346,16 +364,18 @@ func (scheduler *SchedulerFair) UpdateNextQueue() {
 	MemoryGPU := 0.00001
 	CPU := 0.00001
 	Memory := 0.0001
-	pool.mu.Lock()
-	for _, node := range pool.nodes {
-		CPU += float64(node.NumCPU)
-		Memory += float64(node.MemTotal)
-		for _, card := range node.Status {
-			NumberGPU += 1.0
-			MemoryGPU += float64(card.MemoryTotal)
+	for i := 0; i < pool.poolsCount; i++ {
+		pool.poolsMu[i].Lock()
+		for _, node := range pool.pools[i] {
+			CPU += float64(node.NumCPU)
+			Memory += float64(node.MemTotal)
+			for _, card := range node.Status {
+				NumberGPU += 1.0
+				MemoryGPU += float64(card.MemoryTotal)
+			}
 		}
+		pool.poolsMu[i].Unlock()
 	}
-	pool.mu.Unlock()
 
 	for k, t := range scheduler.queues {
 		if len(t) == 0 {
diff --git a/src/scheduler_priority.go b/src/scheduler_priority.go
index c637809..1cd5d56 100644
--- a/src/scheduler_priority.go
+++ b/src/scheduler_priority.go
@@ -4,6 +4,7 @@ import (
 	"sync"
 	"time"
 	log "github.com/sirupsen/logrus"
+	"math/rand"
 )
 
 type SchedulerPriority struct {
@@ -110,11 +111,12 @@ func (scheduler *SchedulerPriority) Schedule(job Job) {
 }
 
 func (scheduler *SchedulerPriority) AcquireResource(job Job, task Task) NodeStatus {
-	pool.mu.Lock()
-	defer pool.mu.Unlock()
+	poolID := rand.Intn(pool.poolsCount)
+	pool.poolsMu[poolID].Lock()
+	defer pool.poolsMu[poolID].Unlock()
 
 	res := NodeStatus{}
-	for id, node := range pool.nodes {
+	for id, node := range pool.pools[poolID] {
 		var available []GPUStatus
 		for _, status := range node.Status {
 			if status.MemoryTotal-status.MemoryAllocated >= task.MemoryGPU {
@@ -125,6 +127,8 @@ func (scheduler *SchedulerPriority) AcquireResource(job Job, task Task) NodeStat
 			res.ClientID = id
 			res.ClientHost = node.ClientHost
 			res.Status = available[0:task.NumberGPU]
+			res.NumCPU = task.NumberCPU
+			res.MemTotal = task.Memory
 
 			for i := range res.Status {
 				for j := range node.Status {
@@ -141,13 +145,20 @@ func (scheduler *SchedulerPriority) AcquireResource(job Job, task Task) NodeStat
 }
 
 func (scheduler *SchedulerPriority) ReleaseResource(job Job, agent NodeStatus) {
-	pool.mu.Lock()
-	defer pool.mu.Unlock()
-	nodes := pool.nodes[agent.ClientID]
+	poolID := pool.getNodePool(agent.ClientID)
+	pool.poolsMu[poolID].Lock()
+	defer pool.poolsMu[poolID].Unlock()
+
+	node := pool.pools[poolID][agent.ClientID]
 	for _, gpu := range agent.Status {
-		for j := range nodes.Status {
-			if gpu.UUID == nodes.Status[j].UUID {
-				nodes.Status[j].MemoryAllocated -= gpu.MemoryTotal
+		for j := range node.Status {
+			if gpu.UUID == node.Status[j].UUID {
+				node.Status[j].MemoryAllocated -= gpu.MemoryTotal
+				if node.Status[j].MemoryAllocated < 0 {
+					// in case of error
+					log.Warn(node.ClientID, " reports negative allocated GPU memory, reset to 0")
+					node.Status[j].MemoryAllocated = 0
+				}
 			}
 		}
 	}
@@ -223,14 +234,18 @@ func (scheduler *SchedulerPriority) Summary() MsgSummary {
 	FreeGPU := 0
 	UsingGPU := 0
-	for _, node := range pool.nodes {
-		for j := range node.Status {
-			if node.Status[j].MemoryAllocated == 0 {
-				FreeGPU++
-			} else {
-				UsingGPU++
+	for i := 0; i < pool.poolsCount; i++ {
+		pool.poolsMu[i].Lock()
+		for _, node := range pool.pools[i] {
+			for j := range node.Status {
+				if node.Status[j].MemoryAllocated == 0 {
+					FreeGPU++
+				} else {
+					UsingGPU++
+				}
 			}
 		}
+		pool.poolsMu[i].Unlock()
 	}
 	summary.FreeGPU = FreeGPU
 	summary.UsingGPU = UsingGPU
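
For reference, the standalone program below distills the sharding scheme this patch introduces: node IDs are hashed with FNV-32a into one of N shard maps, each guarded by its own sync.Mutex, so operations on different nodes rarely contend on the same lock. This is a minimal sketch, not code from the repository; the shardedPool and nodeStatus names are illustrative stand-ins for the patch's pools/poolsMu/poolsCount fields and NodeStatus type.

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// nodeStatus stands in for the repo's NodeStatus; only the fields
// needed by the example are shown.
type nodeStatus struct {
	ClientID string
	NumCPU   int
}

// shardedPool spreads nodes across several maps, each with its own
// mutex, mirroring pools/poolsMu/poolsCount in resource_pool.go.
type shardedPool struct {
	shards []map[string]nodeStatus
	mu     []sync.Mutex
}

func newShardedPool(shardCount int) *shardedPool {
	p := &shardedPool{
		shards: make([]map[string]nodeStatus, shardCount),
		mu:     make([]sync.Mutex, shardCount),
	}
	for i := range p.shards {
		p.shards[i] = map[string]nodeStatus{}
	}
	return p
}

// shardFor hashes a node name to a shard index with FNV-32a, like
// getNodePool above. Doing the modulo in uint32 before converting
// to int avoids a negative index on platforms where int is 32 bits.
func (p *shardedPool) shardFor(name string) int {
	h := fnv.New32a()
	h.Write([]byte(name))
	return int(h.Sum32() % uint32(len(p.shards)))
}

// update inserts or refreshes a node while holding only the lock of
// the shard that owns it.
func (p *shardedPool) update(n nodeStatus) {
	i := p.shardFor(n.ClientID)
	p.mu[i].Lock()
	defer p.mu[i].Unlock()
	p.shards[i][n.ClientID] = n
}

// get must hash the ID the same way update did; probing an arbitrary
// shard would usually miss the node entirely.
func (p *shardedPool) get(id string) (nodeStatus, bool) {
	i := p.shardFor(id)
	p.mu[i].Lock()
	defer p.mu[i].Unlock()
	n, ok := p.shards[i][id]
	return n, ok
}

func main() {
	pool := newShardedPool(10)
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			pool.update(nodeStatus{ClientID: fmt.Sprintf("node-%d", i), NumCPU: 8})
		}(i)
	}
	wg.Wait()
	n, ok := pool.get("node-42")
	fmt.Println(n.ClientID, n.NumCPU, ok) // node-42 8 true
}

One tradeoff carried by this design: AcquireResource in the schedulers samples a single random shard per attempt, so a request can miss free GPUs that live in other shards and has to rely on the caller retrying; that is the price of dropping the global pool lock. ReleaseResource, by contrast, must hash agent.ClientID to locate the owning shard, since a randomly chosen shard would usually not contain the node and the release would silently do nothing.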