From 731f173503cfa25b7b0f2ea30e60174077bb195f Mon Sep 17 00:00:00 2001 From: Newnius Date: Tue, 21 Jul 2020 21:41:11 +0800 Subject: [PATCH] refactor --- src/configuration.go | 122 ++++++++--------- src/ga_test.go | 2 +- src/group.go | 2 +- src/history_logger.go | 2 +- src/job_manager.go | 268 ++++++++++++++++++++------------------ src/logger.go | 57 ++++++-- src/main.go | 152 +++++++++++---------- src/mocker.go | 8 +- src/optimizer.go | 2 +- src/pool_test.go | 4 +- src/resource_pool.go | 136 +++++-------------- src/scheduler.go | 1 + src/scheduler_FCFS.go | 2 +- src/scheduler_capacity.go | 2 +- src/scheduler_fair.go | 6 +- src/scheduler_priority.go | 2 +- src/spider.go | 16 ++- src/util.go | 42 +----- 18 files changed, 388 insertions(+), 438 deletions(-) diff --git a/src/configuration.go b/src/configuration.go index 8dc49b0..27f41eb 100644 --- a/src/configuration.go +++ b/src/configuration.go @@ -52,70 +52,72 @@ func InstanceOfConfiguration() *Configuration { EnablePreScheduleRatio: 1.5, PreScheduleExtraTime: 15, } - - /* override conf value from env */ - value := os.Getenv("KafkaBrokers") - if len(value) != 0 { - configurationInstance.KafkaBrokers = strings.Split(value, ",") - } - value = os.Getenv("KafkaTopic") - if len(value) != 0 { - configurationInstance.KafkaTopic = value - } - value = os.Getenv("SchedulerPolicy") - if len(value) != 0 { - configurationInstance.SchedulerPolicy = value - } - value = os.Getenv("ListenAddr") - if len(value) != 0 { - configurationInstance.ListenAddr = value - } - value = os.Getenv("HDFSAddress") - if len(value) != 0 { - configurationInstance.HDFSAddress = value - } - value = os.Getenv("HDFSBaseDir") - if len(value) != 0 { - configurationInstance.HDFSBaseDir = value - } - value = os.Getenv("DFSBaseDir") - if len(value) != 0 { - configurationInstance.DFSBaseDir = value - } - value = os.Getenv("EnableShareRatio") - if len(value) != 0 { - if val, err := strconv.ParseFloat(value, 32); err == nil { - configurationInstance.EnableShareRatio = val - } - } - value = os.Getenv("ShareMaxUtilization") - if len(value) != 0 { - if val, err := strconv.ParseFloat(value, 32); err == nil { - configurationInstance.ShareMaxUtilization = val - } - } - value = os.Getenv("EnablePreScheduleRatio") - if len(value) != 0 { - if val, err := strconv.ParseFloat(value, 32); err == nil { - configurationInstance.EnablePreScheduleRatio = val - } - } - value = os.Getenv("PreScheduleExtraTime") - if len(value) != 0 { - if val, err := strconv.Atoi(value); err == nil { - configurationInstance.PreScheduleExtraTime = val - } - } - value = os.Getenv("PreScheduleTimeout") - if len(value) != 0 { - if val, err := strconv.Atoi(value); err == nil { - configurationInstance.PreScheduleTimeout = val - } - } } return configurationInstance } +/* read conf value from env */ +func (config *Configuration) InitFromEnv() { + value := os.Getenv("KafkaBrokers") + if len(value) != 0 { + configurationInstance.KafkaBrokers = strings.Split(value, ",") + } + value = os.Getenv("KafkaTopic") + if len(value) != 0 { + configurationInstance.KafkaTopic = value + } + value = os.Getenv("SchedulerPolicy") + if len(value) != 0 { + configurationInstance.SchedulerPolicy = value + } + value = os.Getenv("ListenAddr") + if len(value) != 0 { + configurationInstance.ListenAddr = value + } + value = os.Getenv("HDFSAddress") + if len(value) != 0 { + configurationInstance.HDFSAddress = value + } + value = os.Getenv("HDFSBaseDir") + if len(value) != 0 { + configurationInstance.HDFSBaseDir = value + } + value = os.Getenv("DFSBaseDir") + if 
len(value) != 0 { + configurationInstance.DFSBaseDir = value + } + value = os.Getenv("EnableShareRatio") + if len(value) != 0 { + if val, err := strconv.ParseFloat(value, 32); err == nil { + configurationInstance.EnableShareRatio = val + } + } + value = os.Getenv("ShareMaxUtilization") + if len(value) != 0 { + if val, err := strconv.ParseFloat(value, 32); err == nil { + configurationInstance.ShareMaxUtilization = val + } + } + value = os.Getenv("EnablePreScheduleRatio") + if len(value) != 0 { + if val, err := strconv.ParseFloat(value, 32); err == nil { + configurationInstance.EnablePreScheduleRatio = val + } + } + value = os.Getenv("PreScheduleExtraTime") + if len(value) != 0 { + if val, err := strconv.Atoi(value); err == nil { + configurationInstance.PreScheduleExtraTime = val + } + } + value = os.Getenv("PreScheduleTimeout") + if len(value) != 0 { + if val, err := strconv.Atoi(value); err == nil { + configurationInstance.PreScheduleTimeout = val + } + } +} + func (config *Configuration) SetMockEnabled(enabled bool) bool { config.mu.Lock() defer config.mu.Unlock() diff --git a/src/ga_test.go b/src/ga_test.go index 6a53197..828132d 100644 --- a/src/ga_test.go +++ b/src/ga_test.go @@ -64,7 +64,7 @@ func TestGA(t *testing.T) { allocation = InstanceOfAllocator().fastBestFit(nodes, tasks) - InstanceOfResourcePool().init(Configuration{}) + InstanceOfResourcePool().Start() allocatedNodes := InstanceOfResourcePool().acquireResource(Job{Tasks: tasks}) log.Info(allocatedNodes) } diff --git a/src/group.go b/src/group.go index 201356c..d48ef64 100644 --- a/src/group.go +++ b/src/group.go @@ -21,7 +21,7 @@ func InstanceOfGroupManager() *GroupManager { return groupManagerInstance } -func (gm *GroupManager) init(conf Configuration) { +func (gm *GroupManager) Start() { } diff --git a/src/history_logger.go b/src/history_logger.go index a91e6ba..106896f 100644 --- a/src/history_logger.go +++ b/src/history_logger.go @@ -27,7 +27,7 @@ func InstanceJobHistoryLogger() *JobHistoryLogger { return jobHistoryLoggerInstance } -func (jhl *JobHistoryLogger) init(conf Configuration) { +func (jhl *JobHistoryLogger) Start() { log.Info("jhl init") jhl.jobs = map[string]Job{} jhl.tasks = map[string][]TaskStatus{} diff --git a/src/job_manager.go b/src/job_manager.go index 933989f..a907d1c 100644 --- a/src/job_manager.go +++ b/src/job_manager.go @@ -12,77 +12,85 @@ import ( ) type JobManager struct { - scheduler Scheduler - job Job - jobStatus JobStatus - resources []NodeStatus + /* meta */ + scheduler Scheduler + job Job + + /* resource */ + network string + resources map[string]NodeStatus resourcesMu sync.Mutex - isRunning bool - killFlag bool - network string + /* status */ + jobStatus JobStatus + isRunning bool + lastHeartBeat int64 + /* history info */ stats [][]TaskStatus } func (jm *JobManager) start() { log.Info("start job ", jm.job.Name, " at ", time.Now()) - jm.isRunning = false - jm.killFlag = false + jm.isRunning = true + jm.lastHeartBeat = time.Now().Unix() jm.jobStatus = JobStatus{Name: jm.job.Name, tasks: map[string]TaskStatus{}} + jm.resources = map[string]NodeStatus{} /* register in JHL */ InstanceJobHistoryLogger().submitJob(jm.job) - /* request for private network */ - jm.network = InstanceOfResourcePool().acquireNetwork() - /* request for resources */ + jm.resourcesMu.Lock() + jm.network = InstanceOfResourcePool().acquireNetwork() for { - if jm.killFlag { + if !jm.isRunning { break } - jm.resources = jm.scheduler.AcquireResource(jm.job) - if len(jm.resources) > 0 { + resources := 
jm.scheduler.AcquireResource(jm.job) + if len(resources) > 0 { + for i, node := range resources { + jm.resources[jm.job.Tasks[i].Name] = node + } log.Info(jm.job.Name, " receive resource", jm.resources) break } /* sleep random Millisecond to avoid deadlock */ time.Sleep(time.Millisecond * time.Duration(500+rand.Intn(500))) } - jm.job.StartedAt = time.Now().Unix() + jm.resourcesMu.Unlock() if InstanceOfConfiguration().mock { - jm.scheduler.UpdateProgress(jm.job, Running) - jm.job.Status = Running - jm.isRunning = false - duration := InstanceOfMocker().GetDuration(jm.job, jm.resources) - log.Info("mock ", jm.job.Name, ", wait ", duration) - time.Sleep(time.Second * time.Duration(duration)) - jm.returnResource([]TaskStatus{}) - jm.scheduler.UpdateProgress(jm.job, Finished) - jm.job.Status = Finished + if jm.isRunning { + jm.scheduler.UpdateProgress(jm.job, Running) + duration := InstanceOfMocker().GetDuration(jm.job, jm.resources) + log.Info("mock ", jm.job.Name, ", wait ", duration) + time.Sleep(time.Second * time.Duration(duration)) + jm.isRunning = false + jm.scheduler.UpdateProgress(jm.job, Finished) + } + jm.returnResource() log.Info("JobMaster exited ", jm.job.Name) return } isShare := false isScheduleAhead := false - if !jm.killFlag { + if jm.isRunning { /* switch to Running state */ jm.scheduler.UpdateProgress(jm.job, Running) - jm.job.Status = Running /* bring up containers */ wg := sync.WaitGroup{} - for i := range jm.job.Tasks { + success := true + for i, task := range jm.job.Tasks { wg.Add(1) - go func(index int) { + go func(task Task, node NodeStatus) { defer wg.Done() var UUIDs []string shouldWait := "0" - for _, GPU := range jm.resources[index].Status { + for _, GPU := range node.Status { UUIDs = append(UUIDs, GPU.UUID) if GPU.MemoryUsed == GPU.MemoryTotal { shouldWait = "1" @@ -96,40 +104,44 @@ func (jm *JobManager) start() { GPUs := strings.Join(UUIDs, ",") v := url.Values{} - v.Set("image", jm.job.Tasks[index].Image) - v.Set("cmd", jm.job.Tasks[index].Cmd) - v.Set("name", jm.job.Tasks[index].Name) + v.Set("image", task.Image) + v.Set("cmd", task.Cmd) + v.Set("name", task.Name) v.Set("workspace", jm.job.Workspace) v.Set("gpus", GPUs) - v.Set("mem_limit", strconv.Itoa(jm.job.Tasks[index].Memory)+"m") - v.Set("cpu_limit", strconv.Itoa(jm.job.Tasks[index].NumberCPU)) + v.Set("mem_limit", strconv.Itoa(task.Memory)+"m") + v.Set("cpu_limit", strconv.Itoa(task.NumberCPU)) v.Set("network", jm.network) v.Set("should_wait", shouldWait) v.Set("output_dir", "/tmp/") v.Set("hdfs_address", InstanceOfConfiguration().HDFSAddress) v.Set("hdfs_dir", InstanceOfConfiguration().HDFSBaseDir+jm.job.Name) - v.Set("gpu_mem", strconv.Itoa(jm.job.Tasks[index].MemoryGPU)) + v.Set("gpu_mem", strconv.Itoa(task.MemoryGPU)) if InstanceOfConfiguration().DFSBaseDir != "" { - v.Set("dfs_src", InstanceOfConfiguration().DFSBaseDir+jm.job.Name+"/task-"+strconv.Itoa(index)) + v.Set("dfs_src", InstanceOfConfiguration().DFSBaseDir+jm.job.Name+"/task-"+task.Name) } else { v.Set("dfs_src", "") } v.Set("dfs_dst", "/tmp") - resp, err := doRequest("POST", "http://"+jm.resources[index].ClientHost+":8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") + spider := Spider{} + spider.Method = "POST" + spider.URL = "http://" + node.ClientHost + ":8000/create" + spider.Data = v + spider.ContentType = "application/x-www-form-urlencoded" + err := spider.do() if err != nil { log.Warn(err.Error()) - jm.job.Status = Failed - jm.stop(false) + success = false return } + resp := spider.getResponse() 
body, err := ioutil.ReadAll(resp.Body) resp.Body.Close() if err != nil { log.Warn(err) - jm.job.Status = Failed - jm.stop(false) + success = false return } @@ -137,28 +149,36 @@ func (jm *JobManager) start() { err = json.Unmarshal([]byte(string(body)), &res) if err != nil || res.Code != 0 { log.Warn(res) - jm.job.Status = Failed - jm.stop(false) + success = false return } - jm.jobStatus.tasks[jm.job.Tasks[index].Name] = TaskStatus{Id: res.Id, Node: jm.resources[index].ClientHost, HostName: jm.job.Tasks[i].Name} - }(i) + taskStatus := TaskStatus{Id: res.Id, Node: node.ClientHost, HostName: jm.job.Tasks[i].Name} + jm.jobStatus.tasks[task.Name] = taskStatus + + }(task, jm.resources[task.Name]) } wg.Wait() - jm.isRunning = true + /* start failed */ + if !success { + jm.isRunning = false + jm.scheduler.UpdateProgress(jm.job, Failed) + jm.stop() + } } /* monitor job execution */ for { - //jm.status() - if !jm.isRunning || jm.killFlag { + if !jm.isRunning { break } + if time.Now().Unix()-jm.lastHeartBeat > 30 { + log.Warn(jm.job.Name, " heartbeat longer tha 30s") + } time.Sleep(time.Second * 1) } - /* make sure resources are released */ - jm.returnResource(jm.status().Status) + /* release again to make sure resources are released */ + jm.returnResource() /* feed data to optimizer */ isExclusive := InstanceOfResourcePool().isExclusive(jm.job.Name) @@ -197,47 +217,50 @@ func (jm *JobManager) start() { if len(jm.job.Tasks) == 1 && !isShare && !isScheduleAhead && jm.job.Status == Finished && isExclusive { InstanceOfOptimizer().FeedTime(jm.job, stats) } + + /* clear, to reduce memory usage */ + jm.stats = [][]TaskStatus{} + + /* remove exited containers */ + //for _, task := range jm.jobStatus.tasks { + // go func(container TaskStatus) { + // v := url.Values{} + // v.Set("id", container.Id) + // + // spider := Spider{} + // spider.Method = "POST" + // spider.URL = "http://" + container.Node + ":8000/remove" + // spider.Data = v + // spider.ContentType = "application/x-www-form-urlencoded" + // err := spider.do() + // if err != nil { + // log.Warn(err.Error()) + // } + // }(task) + //} + log.Info("JobMaster exited ", jm.job.Name) } /* release all resource */ -func (jm *JobManager) returnResource(status []TaskStatus) { +func (jm *JobManager) returnResource() { jm.resourcesMu.Lock() defer jm.resourcesMu.Unlock() /* return resource */ for i := range jm.resources { - if jm.resources[i].ClientID == "_released_" { - continue - } jm.scheduler.ReleaseResource(jm.job, jm.resources[i]) - log.Info("return resource again ", jm.resources[i].ClientID) - jm.resources[i].ClientID = "_released_" - for _, t := range jm.resources[i].Status { InstanceOfResourcePool().detach(t.UUID, jm.job) } - - if !InstanceOfConfiguration().mock { - InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i]) - } - - /* remove exited containers */ - //v := url.Values{} - //v.Set("id", res.Status[i].Id) - // - //_, err := doRequest("POST", "http://"+res.Status[i].Node+":8000/remove", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") - //if err != nil { - // log.Warn(err.Error()) - // continue - //} } + jm.resources = map[string]NodeStatus{} if jm.network != "" { InstanceOfResourcePool().releaseNetwork(jm.network) jm.network = "" } } -/* monitor all tasks */ +/* monitor all tasks, update job status */ func (jm *JobManager) checkStatus(status []TaskStatus) { if !jm.isRunning { return @@ -245,77 +268,57 @@ func (jm *JobManager) checkStatus(status []TaskStatus) { flagRunning := false onlyPS := true for i := range 
status { - if status[i].Status == "ready" { - log.Debug(jm.job.Name, "-", i, " is ready to run") - flagRunning = true - if !jm.job.Tasks[i].IsPS { - onlyPS = false - } - } else if status[i].Status == "running" { - log.Debug(jm.job.Name, "-", i, " is running") + if status[i].Status == "ready" || status[i].Status == "running" { flagRunning = true if !jm.job.Tasks[i].IsPS { onlyPS = false } InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i]) } else if status[i].Status == "unknown" { - log.Warn(jm.job.Name, "-", i, " is unknown") flagRunning = true if !jm.job.Tasks[i].IsPS { onlyPS = false } - //InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i]) } else { - jm.resourcesMu.Lock() - if jm.resources[i].ClientID == "_released_" { - jm.resourcesMu.Unlock() - continue - } log.Info(jm.job.Name, "-", i, " ", status[i].Status) - if exitCode, ok := status[i].State["ExitCode"].(float64); ok && exitCode != 0 && !jm.killFlag { + if exitCode, ok := status[i].State["ExitCode"].(float64); ok && exitCode != 0 && jm.isRunning { log.Warn(jm.job.Name+"-"+jm.job.Tasks[i].Name+" exited unexpected, exitCode=", exitCode) - jm.stop(false) - jm.killFlag = true + jm.isRunning = false jm.scheduler.UpdateProgress(jm.job, Failed) - jm.job.Status = Failed - } else if !jm.killFlag { + jm.stop() + } else if jm.isRunning { log.Info("Some instance exited, close others") - jm.stop(false) - jm.killFlag = true + jm.isRunning = false jm.scheduler.UpdateProgress(jm.job, Finished) - jm.job.Status = Finished + jm.stop() } - if jm.resources[i].ClientID != "_released_" { - jm.scheduler.ReleaseResource(jm.job, jm.resources[i]) - log.Info("return resource ", jm.resources[i].ClientID) - jm.resources[i].ClientID = "_released_" + jm.resourcesMu.Lock() + nodeID := jm.job.Tasks[i].Name + if _, ok := jm.resources[nodeID]; ok { + jm.scheduler.ReleaseResource(jm.job, jm.resources[nodeID]) + log.Info("return resource ", jm.resources[nodeID].ClientID) - for _, t := range jm.resources[i].Status { + for _, t := range jm.resources[nodeID].Status { InstanceOfResourcePool().detach(t.UUID, jm.job) } InstanceJobHistoryLogger().submitTaskStatus(jm.job.Name, status[i]) + delete(jm.resources, nodeID) } jm.resourcesMu.Unlock() } } - if flagRunning && onlyPS && !jm.killFlag { + if flagRunning && onlyPS && jm.isRunning { log.Info("Only PS is running, stop ", jm.job.Name) - jm.stop(false) - jm.killFlag = true - jm.scheduler.UpdateProgress(jm.job, Finished) - jm.job.Status = Finished - } - - if !flagRunning && !jm.killFlag { - jm.scheduler.UpdateProgress(jm.job, Finished) - jm.job.Status = Finished - log.Info("finish job ", jm.job.Name) - } - - if !flagRunning { jm.isRunning = false - jm.returnResource(status) + jm.scheduler.UpdateProgress(jm.job, Finished) + jm.stop() + } + + if !flagRunning && jm.isRunning { + log.Info("finish job ", jm.job.Name) + jm.isRunning = false + jm.scheduler.UpdateProgress(jm.job, Finished) } } @@ -354,7 +357,8 @@ func (jm *JobManager) logs(taskName string) MsgLog { /* fetch job tasks status */ func (jm *JobManager) status() MsgJobStatus { var tasksStatus []TaskStatus - for range jm.job.Tasks { //append would cause uncertain order + /* create slice ahead, since append would cause uncertain order */ + for range jm.job.Tasks { tasksStatus = append(tasksStatus, TaskStatus{}) } @@ -418,18 +422,32 @@ func (jm *JobManager) status() MsgJobStatus { return MsgJobStatus{Status: tasksStatus} } -/* force stop all containers */ -func (jm *JobManager) stop(force bool) MsgStop { +func (jm *JobManager) stop() MsgStop { 
+ if jm.isRunning { + jm.isRunning = false + jm.scheduler.UpdateProgress(jm.job, Stopped) + log.Info("kill job, ", jm.job.Name) + } + for _, taskStatus := range jm.jobStatus.tasks { /* stop at background */ go func(task TaskStatus) { + log.Info("kill ", jm.job.Name, "-", task.Id, " :", task.HostName) v := url.Values{} v.Set("id", task.Id) - resp, err := doRequest("POST", "http://"+task.Node+":8000/stop", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") + spider := Spider{} + spider.Method = "POST" + spider.URL = "http://" + task.Node + ":8000/stop" + spider.Data = v + spider.ContentType = "application/x-www-form-urlencoded" + + err := spider.do() if err != nil { log.Warn(err.Error()) + return } + resp := spider.getResponse() body, err := ioutil.ReadAll(resp.Body) resp.Body.Close() if err != nil { @@ -445,17 +463,7 @@ func (jm *JobManager) stop(force bool) MsgStop { if res.Code != 0 { log.Warn(res.Error) } - log.Info(jm.job.Name, ":", task.HostName, " is killed:", task.Id) }(taskStatus) } - - go func() { - if force { - jm.killFlag = true - jm.scheduler.UpdateProgress(jm.job, Stopped) - jm.job.Status = Stopped - log.Info("kill job, ", jm.job.Name) - } - }() return MsgStop{Code: 0} } diff --git a/src/logger.go b/src/logger.go index dc033e1..05b26b2 100644 --- a/src/logger.go +++ b/src/logger.go @@ -5,7 +5,6 @@ import ( "io" "sync" "runtime" - "fmt" ) type Logger struct { @@ -19,29 +18,59 @@ func (logger *Logger) Init() { logger.LoggerModuleDisabled = map[string]bool{} } -func (logger *Logger) Debug(args ... interface{}) { - _log.Debug(args) -} - -func (logger *Logger) Info(args ... interface{}) { +func (logger *Logger) Debug(args ...interface{}) { pc, _, _, ok := runtime.Caller(1) details := runtime.FuncForPC(pc) + module := "unknown" if ok && details != nil { - fmt.Printf("called from %s\n", details.Name()) + module = details.Name() } - _log.Info(args) + args = append(args, module) + _log.Debug(args...) } -func (logger *Logger) Warn(args ... interface{}) { - _log.Warn(args) +func (logger *Logger) Info(args ...interface{}) { + pc, _, _, ok := runtime.Caller(1) + details := runtime.FuncForPC(pc) + module := "unknown" + if ok && details != nil { + module = details.Name() + } + args = append(args, module) + _log.Info(args...) } -func (logger *Logger) Fatal(args ... interface{}) { - _log.Fatal(args) +func (logger *Logger) Warn(args ...interface{}) { + pc, _, _, ok := runtime.Caller(1) + details := runtime.FuncForPC(pc) + module := "unknown" + if ok && details != nil { + module = details.Name() + } + args = append(args, module) + _log.Warn(args...) } -func (logger *Logger) Fatalf(format string, args ... interface{}) { - _log.Fatalf(format, args) +func (logger *Logger) Fatal(args ...interface{}) { + pc, _, _, ok := runtime.Caller(1) + details := runtime.FuncForPC(pc) + module := "unknown" + if ok && details != nil { + module = details.Name() + } + args = append(args, module) + _log.Fatal(args...) +} + +func (logger *Logger) Fatalf(format string, args ...interface{}) { + pc, _, _, ok := runtime.Caller(1) + details := runtime.FuncForPC(pc) + module := "unknown" + if ok && details != nil { + module = details.Name() + } + args = append(args, module) + _log.Fatalf(format, args...) 
} func (logger *Logger) SetOutput(f io.Writer) { diff --git a/src/main.go b/src/main.go index 79a0755..2c35c52 100644 --- a/src/main.go +++ b/src/main.go @@ -7,6 +7,7 @@ import ( "strconv" "math/rand" "os" + "fmt" ) var log Logger @@ -15,6 +16,7 @@ var scheduler Scheduler func serverAPI(w http.ResponseWriter, r *http.Request) { switch r.URL.Query().Get("action") { + /* resource pool */ case "agent_report": log.Debug("agent_report") msgAgentReport := MsgAgentReport{Code: 0} @@ -50,6 +52,28 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { w.Write(js) break + case "pool_status_history": + log.Debug("pool_status_history") + js, _ := json.Marshal(InstanceOfResourcePool().statusHistory()) + w.Header().Set("Content-Type", "application/json") + w.Write(js) + break + + case "get_counter": + log.Debug("get_counters") + js, _ := json.Marshal(InstanceOfResourcePool().getCounter()) + w.Header().Set("Content-Type", "application/json") + w.Write(js) + break + + case "debug_pool_dump": + log.Debug("debug_pool_dump") + js, _ := json.Marshal(InstanceOfResourcePool().Dump()) + w.Header().Set("Content-Type", "application/json") + w.Write(js) + break + + /* scheduler */ case "job_submit": var job Job log.Debug("job_submit") @@ -95,6 +119,42 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { w.Write(js) break + case "job_stop": + log.Debug("job_stop") + js, _ := json.Marshal(scheduler.Stop(string(r.PostFormValue("id")))) + w.Header().Set("Content-Type", "application/json") + w.Write(js) + break + + case "task_logs": + log.Debug("task_logs") + js, _ := json.Marshal(scheduler.QueryLogs(r.URL.Query().Get("job"), r.URL.Query().Get("task"))) + w.Header().Set("Content-Type", "application/json") + w.Write(js) + break + + case "jobs": + log.Debug("job_list") + js, _ := json.Marshal(scheduler.ListJobs()) + w.Header().Set("Content-Type", "application/json") + w.Write(js) + break + + case "debug_scheduler_dump": + log.Debug("debug_scheduler_dump") + js, _ := json.Marshal(scheduler.DebugDump()) + w.Header().Set("Content-Type", "application/json") + w.Write(js) + break + + case "summary": + log.Debug("summary") + js, _ := json.Marshal(scheduler.Summary()) + w.Header().Set("Content-Type", "application/json") + w.Write(js) + break + + /* optimizer */ case "job_predict_req": log.Debug("job_predict_req") var job Job @@ -143,48 +203,15 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { w.Write(js) break - case "job_stop": - log.Debug("job_stop") - js, _ := json.Marshal(scheduler.Stop(string(r.PostFormValue("id")))) - w.Header().Set("Content-Type", "application/json") - w.Write(js) - break - - case "task_logs": - log.Debug("task_logs") - js, _ := json.Marshal(scheduler.QueryLogs(r.URL.Query().Get("job"), r.URL.Query().Get("task"))) - w.Header().Set("Content-Type", "application/json") - w.Write(js) - break - - case "jobs": - log.Debug("job_list") - js, _ := json.Marshal(scheduler.ListJobs()) - w.Header().Set("Content-Type", "application/json") - w.Write(js) - break - - case "summary": - log.Debug("summary") - js, _ := json.Marshal(scheduler.Summary()) - w.Header().Set("Content-Type", "application/json") - w.Write(js) - break - - case "pool_status_history": - log.Debug("pool_status_history") - js, _ := json.Marshal(InstanceOfResourcePool().statusHistory()) - w.Header().Set("Content-Type", "application/json") - w.Write(js) - break - - case "get_counter": - log.Debug("get_counters") - js, _ := json.Marshal(InstanceOfResourcePool().getCounter()) + /* job history logger */ + case "jhl_job_status": + 
log.Debug("jhl_job_status") + js, _ := json.Marshal(InstanceJobHistoryLogger().getTaskStatus(r.URL.Query().Get("job"))) w.Header().Set("Content-Type", "application/json") w.Write(js) break + /* group */ case "group_list": log.Debug("group_list") js, _ := json.Marshal(InstanceOfGroupManager().List()) @@ -227,7 +254,6 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { break case "group_remove": - /* TODO: rearrange jobs to other queues */ log.Debug("group_remove") var group Group msg := MsgGroupCreate{Code: 0} @@ -244,27 +270,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { w.Write(js) break - case "jhl_job_status": - log.Debug("jhl_job_status") - js, _ := json.Marshal(InstanceJobHistoryLogger().getTaskStatus(r.URL.Query().Get("job"))) - w.Header().Set("Content-Type", "application/json") - w.Write(js) - break - - case "debug_scheduler_dump": - log.Debug("debug_scheduler_dump") - js, _ := json.Marshal(scheduler.DebugDump()) - w.Header().Set("Content-Type", "application/json") - w.Write(js) - break - - case "debug_pool_dump": - log.Debug("debug_pool_dump") - js, _ := json.Marshal(InstanceOfResourcePool().Dump()) - w.Header().Set("Content-Type", "application/json") - w.Write(js) - break - + /* configuration */ case "conf_list": log.Debug("conf_list") var msg MsgConfList @@ -308,8 +314,9 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { break case "pool.batch.interval": - interval := str2int(value, 1) - ok = InstanceOfResourcePool().SetBatchInterval(interval) + if interval, err := strconv.Atoi(value); err == nil { + ok = InstanceOfResourcePool().SetBatchInterval(interval) + } break /* scheduler.mock */ @@ -324,8 +331,9 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { /* scheduler.parallelism */ case "scheduler.parallelism": - parallelism, _ := strconv.Atoi(value) - ok = scheduler.UpdateParallelism(parallelism) + if parallelism, err := strconv.Atoi(value); err == nil { + ok = scheduler.UpdateParallelism(parallelism) + } break /* allocator.strategy */ @@ -333,6 +341,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { ok = InstanceOfAllocator().updateStrategy(value) break + /* logger */ case "logger.level": ok = log.SetLoggerLevel(value) break @@ -350,7 +359,7 @@ func serverAPI(w http.ResponseWriter, r *http.Request) { msg.Code = 0 if !ok { msg.Code = 1 - msg.Error = "Option not exist or invalid value" + msg.Error = fmt.Sprintf("Option (%s) not exist or invalid value (%s)", option, value) } js, _ := json.Marshal(msg) w.Header().Set("Content-Type", "application/json") @@ -381,15 +390,14 @@ func main() { log.SetOutput(f) } - /* parse configuration */ - config := *InstanceOfConfiguration() + config := InstanceOfConfiguration() + config.InitFromEnv() /* init components */ - InstanceOfResourcePool().init(config) - //InstanceOfCollector().init(config) - InstanceJobHistoryLogger().init(config) - InstanceOfOptimizer().Init(config) - InstanceOfGroupManager().init(config) + InstanceOfResourcePool().Start() + InstanceJobHistoryLogger().Start() + InstanceOfOptimizer().Start() + InstanceOfGroupManager().Start() switch config.SchedulerPolicy { case "FCFS": diff --git a/src/mocker.go b/src/mocker.go index d77220f..8674250 100644 --- a/src/mocker.go +++ b/src/mocker.go @@ -23,7 +23,7 @@ func InstanceOfMocker() *Mocker { return MockerInstance } -func (mocker *Mocker) GetDuration(job Job, nodes []NodeStatus) int { +func (mocker *Mocker) GetDuration(job Job, nodes map[string]NodeStatus) int { str := strings.Split(job.Name, "-") duration := 300 @@ -37,11 +37,11 @@ 
func (mocker *Mocker) GetDuration(job Job, nodes []NodeStatus) int { } else if len(job.Tasks) == 3 { var psNodes []string var workerNodes []string - for i, task := range job.Tasks { + for _, task := range job.Tasks { if task.IsPS { - psNodes = append(psNodes, nodes[i].ClientHost) + psNodes = append(psNodes, nodes[task.Name].ClientHost) } else { - workerNodes = append(workerNodes, nodes[i].ClientHost) + workerNodes = append(workerNodes, nodes[task.Name].ClientHost) } } if psNodes[0] == workerNodes[0] { diff --git a/src/optimizer.go b/src/optimizer.go index e5600c1..89d308d 100644 --- a/src/optimizer.go +++ b/src/optimizer.go @@ -50,7 +50,7 @@ func InstanceOfOptimizer() *Optimizer { return optimizerInstance } -func (optimizer *Optimizer) Init(conf Configuration) { +func (optimizer *Optimizer) Start() { log.Info("optimizer started") } diff --git a/src/pool_test.go b/src/pool_test.go index fc87d37..224c0f1 100644 --- a/src/pool_test.go +++ b/src/pool_test.go @@ -8,7 +8,7 @@ import ( func TestPool(t *testing.T) { return - InstanceOfResourcePool().init(Configuration{}) + InstanceOfResourcePool().Start() for j := 0; j < 100; j++ { for i := 0; i < 1000; i++ { @@ -36,7 +36,7 @@ func TestPool(t *testing.T) { } func TestAllocate(t *testing.T) { - InstanceOfResourcePool().init(Configuration{}) + InstanceOfResourcePool().Start() job := Job{Name: strconv.Itoa(int(time.Now().Unix() % 1000000000))} job.Group = "default" diff --git a/src/resource_pool.go b/src/resource_pool.go index a34080b..e88b25c 100644 --- a/src/resource_pool.go +++ b/src/resource_pool.go @@ -4,7 +4,6 @@ import ( "sync" "time" "net/url" - "strings" "math/rand" "strconv" "sort" @@ -51,11 +50,11 @@ type ResourcePool struct { exclusiveJobs map[string]bool TotalGPU int - TotalGPUMu sync.Mutex TotalCPU int TotalMemory int + TotalMu sync.Mutex UsingGPU int - UsingGPUMu sync.Mutex + UsingMu sync.Mutex enableBatch bool batchJobs map[string]Job @@ -64,7 +63,7 @@ type ResourcePool struct { batchInterval int } -func (pool *ResourcePool) init(conf Configuration) { +func (pool *ResourcePool) Start() { log.Info("RM started ") pool.networks = map[string]bool{} @@ -181,13 +180,13 @@ func (pool *ResourcePool) checkDeadNodes() { } seg.Lock.Lock() - pool.TotalGPUMu.Lock() + pool.TotalMu.Lock() if _, ok := seg.Nodes[k]; ok { pool.TotalGPU -= len(seg.Nodes[k].Status) pool.TotalCPU -= seg.Nodes[k].NumCPU pool.TotalMemory -= seg.Nodes[k].MemTotal } - pool.TotalGPUMu.Unlock() + pool.TotalMu.Unlock() delete(seg.Nodes, k) seg.Lock.Unlock() pool.versionsMu.Lock() @@ -297,11 +296,11 @@ func (pool *ResourcePool) saveStatusHistory() { } pool.historyMu.Unlock() - pool.TotalGPUMu.Lock() + pool.TotalMu.Lock() pool.TotalGPU = TotalGPU pool.TotalCPU = TotalCPU pool.TotalMemory = TotalMemGPU - pool.TotalGPUMu.Unlock() + pool.TotalMu.Unlock() time.Sleep(time.Second * 60) } } @@ -359,11 +358,11 @@ func (pool *ResourcePool) update(node NodeStatus) { } } else { /* TODO: double check node do belong to this seg */ - pool.TotalGPUMu.Lock() + pool.TotalMu.Lock() pool.TotalGPU += len(node.Status) pool.TotalCPU += node.NumCPU pool.TotalMemory += node.MemTotal - pool.TotalGPUMu.Unlock() + pool.TotalMu.Unlock() log.Info("node ", node.ClientID, " is online") } seg.Nodes[node.ClientID] = &node @@ -517,11 +516,17 @@ func (pool *ResourcePool) acquireNetwork() string { } v := url.Values{} v.Set("name", network) - resp, err := doRequest("POST", "http://yao-agent-master:8000/create", strings.NewReader(v.Encode()), "application/x-www-form-urlencoded", "") + spider := Spider{} + 
spider.Method = "POST" + spider.URL = "http://yao-agent-master:8000/create" + spider.Data = v + spider.ContentType = "application/x-www-form-urlencoded" + err := spider.do() if err != nil { log.Warn(err.Error()) continue } + resp := spider.getResponse() resp.Body.Close() pool.networksFree[network] = true pool.networks[network] = true @@ -580,86 +585,6 @@ func (pool *ResourcePool) countGPU() (int, int) { return pool.TotalGPU - pool.UsingGPU, pool.UsingGPU } -func (pool *ResourcePool) pickNode(candidates []*NodeStatus, availableGPUs map[string][]GPUStatus, task Task, job Job, nodes []NodeStatus) *NodeStatus { - - /* shuffle */ - r := rand.New(rand.NewSource(time.Now().Unix())) - for n := len(candidates); n > 0; n-- { - randIndex := r.Intn(n) - candidates[n-1], candidates[randIndex] = candidates[randIndex], candidates[n-1] - } - - /* sort */ - // single node, single GPU - sort.Slice(candidates, func(a, b int) bool { - diffA := pool.GPUModelToPower(candidates[a].Status[0].ProductName) - pool.GPUModelToPower(task.ModelGPU) - diffB := pool.GPUModelToPower(candidates[b].Status[0].ProductName) - pool.GPUModelToPower(task.ModelGPU) - - if diffA > 0 && diffB >= 0 && diffA > diffB { - return false //b - } - if diffA < 0 && diffB < 0 && diffA > diffB { - return false - } - if diffA < 0 && diffB >= 0 { - return false - } - if diffA == diffB { - if len(availableGPUs[candidates[a].ClientID]) == len(availableGPUs[candidates[b].ClientID]) { - return candidates[a].UtilCPU > candidates[b].UtilCPU - } - return len(availableGPUs[candidates[a].ClientID]) < len(availableGPUs[candidates[b].ClientID]) - } - return true //a - }) - - var t []*NodeStatus - bestGPU := candidates[0].Status[0].ProductName - for _, node := range candidates { - if node.Status[0].ProductName != bestGPU { - break - } - t = append(t, node) - } - candidates = t - - if (len(job.Tasks) == 1) && task.NumberGPU > 1 { //single node, multi GPUs - sort.Slice(candidates, func(a, b int) bool { - if len(availableGPUs[candidates[a].ClientID]) == len(availableGPUs[candidates[b].ClientID]) { - return candidates[a].UtilCPU > candidates[b].UtilCPU - } - return len(availableGPUs[candidates[a].ClientID]) < len(availableGPUs[candidates[b].ClientID]) - }) - } - - if len(job.Tasks) > 1 { //multi nodes, multi GPUs - sort.Slice(candidates, func(a, b int) bool { - distanceA := 0 - distanceB := 0 - for _, node := range nodes { - if node.Rack != candidates[a].Rack { - distanceA += 10 - } - if node.ClientID != candidates[a].ClientID { - distanceA += 1 - } - if node.Rack != candidates[b].Rack { - distanceB += 10 - } - if node.ClientID != candidates[b].ClientID { - distanceB += 1 - } - } - if distanceA == distanceB { - return len(availableGPUs[candidates[a].ClientID]) > len(availableGPUs[candidates[b].ClientID]) - } - return distanceA*job.Locality < distanceB*job.Locality - }) - } - - return candidates[0] -} - func (pool *ResourcePool) acquireResource(job Job) []NodeStatus { for i := range job.Tasks { job.Tasks[i].Job = job.Name @@ -671,6 +596,7 @@ func (pool *ResourcePool) acquireResource(job Job) []NodeStatus { pool.batchJobs[job.Name] = job pool.batchMu.Unlock() for { + /* wait until request is satisfied */ pool.batchMu.Lock() if _, ok := pool.batchAllocations[job.Name]; ok { pool.batchMu.Unlock() @@ -785,9 +711,9 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus { for j := range node.Status { if res.Status[i].UUID == node.Status[j].UUID { if node.Status[j].MemoryAllocated == 0 { - pool.UsingGPUMu.Lock() + pool.UsingMu.Lock() pool.UsingGPU ++ - 
pool.UsingGPUMu.Unlock() + pool.UsingMu.Unlock() } node.Status[j].MemoryAllocated += task.MemoryGPU res.Status[i].MemoryTotal = task.MemoryGPU @@ -895,9 +821,9 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus { for j := range node.Status { if res.Status[i].UUID == node.Status[j].UUID { if node.Status[j].MemoryAllocated == 0 { - pool.UsingGPUMu.Lock() + pool.UsingMu.Lock() pool.UsingGPU ++ - pool.UsingGPUMu.Unlock() + pool.UsingMu.Unlock() } node.Status[j].MemoryAllocated += task.MemoryGPU res.Status[i].MemoryTotal = task.MemoryGPU @@ -989,9 +915,9 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus { for j := range node.Status { if res.Status[i].UUID == node.Status[j].UUID { if node.Status[j].MemoryAllocated == 0 { - pool.UsingGPUMu.Lock() + pool.UsingMu.Lock() pool.UsingGPU ++ - pool.UsingGPUMu.Unlock() + pool.UsingMu.Unlock() } node.Status[j].MemoryAllocated += task.MemoryGPU res.Status[i].MemoryTotal = task.MemoryGPU @@ -1040,6 +966,11 @@ func (pool *ResourcePool) doAcquireResource(job Job) []NodeStatus { return ress } +/* +TODO: +bug-1: node is offline, unable to retrieve allocation info +bug-2: when node offline & back, allocation info is lost +*/ func (pool *ResourcePool) releaseResource(job Job, agent NodeStatus) { segID := pool.getNodePool(agent.ClientID) seg := pool.pools[segID] @@ -1052,7 +983,7 @@ func (pool *ResourcePool) releaseResource(job Job, agent NodeStatus) { node, ok := seg.Nodes[agent.ClientID] /* in case node is offline */ if !ok { - /* TODO, update usingTotalGPU correctly */ + /* bug-1 */ log.Warn("node ", agent.ClientID, " not present") return } @@ -1060,19 +991,18 @@ func (pool *ResourcePool) releaseResource(job Job, agent NodeStatus) { for j := range node.Status { if gpu.UUID == node.Status[j].UUID { node.Status[j].MemoryAllocated -= gpu.MemoryTotal + log.Debug(node.Status[j].MemoryAllocated) if node.Status[j].MemoryAllocated < 0 { - // in case of error - /* Case 0: a node is offline and then online, the allocation info will be lost */ + /* bug-2: a node is offline and then online, the allocation info will be lost */ log.Warn(node.ClientID, " UUID=", gpu.UUID, " More Memory Allocated") node.Status[j].MemoryAllocated = 0 } if node.Status[j].MemoryAllocated == 0 { - pool.UsingGPUMu.Lock() + pool.UsingMu.Lock() pool.UsingGPU-- - pool.UsingGPUMu.Unlock() + pool.UsingMu.Unlock() log.Info(node.Status[j].UUID, " is released") } - //log.Info(node.Status[j].MemoryAllocated) } } } diff --git a/src/scheduler.go b/src/scheduler.go index f64ea50..ee07834 100644 --- a/src/scheduler.go +++ b/src/scheduler.go @@ -25,6 +25,7 @@ type Scheduler interface { UpdateParallelism(parallelism int) bool + /* TODO: rearrange jobs to other queues */ updateGroup(group Group) bool DebugDump() map[string]interface{} diff --git a/src/scheduler_FCFS.go b/src/scheduler_FCFS.go index e0e3c14..3bf7deb 100644 --- a/src/scheduler_FCFS.go +++ b/src/scheduler_FCFS.go @@ -107,7 +107,7 @@ func (scheduler *SchedulerFCFS) Stop(jobName string) MsgStop { if !ok { return MsgStop{Code: 1, Error: "Job not exist!"} } - return jm.stop(true) + return jm.stop() } func (scheduler *SchedulerFCFS) QueryLogs(jobName string, taskName string) MsgLog { diff --git a/src/scheduler_capacity.go b/src/scheduler_capacity.go index 7f09bec..d0e3147 100644 --- a/src/scheduler_capacity.go +++ b/src/scheduler_capacity.go @@ -272,7 +272,7 @@ func (scheduler *SchedulerCapacity) Stop(jobName string) MsgStop { if !ok { return MsgStop{Code: 1, Error: "Job not exist!"} } - return jm.stop(true) + return 
jm.stop() } func (scheduler *SchedulerCapacity) QueryLogs(jobName string, taskName string) MsgLog { diff --git a/src/scheduler_fair.go b/src/scheduler_fair.go index 19a3fdb..07d6938 100644 --- a/src/scheduler_fair.go +++ b/src/scheduler_fair.go @@ -379,6 +379,7 @@ func (scheduler *SchedulerFair) UpdateProgress(job Job, state State) { if scheduler.history[i].Name == job.Name { scheduler.history[i].Status = Running scheduler.history[i].UpdatedAt = int(time.Now().Unix()) + scheduler.history[i].StartedAt = time.Now().Unix() } } break @@ -694,7 +695,7 @@ func (scheduler *SchedulerFair) Stop(jobName string) MsgStop { jm, ok := scheduler.jobs[jobName] scheduler.queuesMu.Unlock() if ok { - return jm.stop(true) + return jm.stop() } else { found := false for queue := range scheduler.queues { @@ -802,6 +803,9 @@ func (scheduler *SchedulerFair) SetEnabled(enabled bool) bool { } func (scheduler *SchedulerFair) UpdateParallelism(parallelism int) bool { + if parallelism < 1 { + parallelism = 1 + } scheduler.parallelism = parallelism log.Info("parallelism is updated to ", parallelism) return true diff --git a/src/scheduler_priority.go b/src/scheduler_priority.go index 81b1f2b..381bea9 100644 --- a/src/scheduler_priority.go +++ b/src/scheduler_priority.go @@ -252,7 +252,7 @@ func (scheduler *SchedulerPriority) Stop(jobName string) MsgStop { if !ok { return MsgStop{Code: 1, Error: "Job not exist!"} } - return jm.stop(true) + return jm.stop() } func (scheduler *SchedulerPriority) QueryLogs(jobName string, taskName string) MsgLog { diff --git a/src/spider.go b/src/spider.go index 68590d3..f6b0fdb 100644 --- a/src/spider.go +++ b/src/spider.go @@ -15,7 +15,7 @@ type Spider struct { ContentType string Referer string Data url.Values - Response *http.Response + Response *http.Response } func (spider *Spider) do() error { @@ -25,17 +25,21 @@ func (spider *Spider) do() error { return err } - if len(spider.ContentType) > 0 { - req.Header.Set("Content-Type", spider.ContentType) + if len(spider.ContentType) == 0 { + spider.ContentType = "" } + req.Header.Set("Content-Type", spider.ContentType) + /* set user-agent */ if len(spider.UserAgent) == 0 { - req.Header.Set("User-Agent", getUA()) + spider.UserAgent = spider.getUA() } + req.Header.Set("User-Agent", spider.UserAgent) - if len(spider.Referer) > 0 { - req.Header.Set("Referer", spider.Referer) + if len(spider.Referer) == 0 { + spider.Referer = "" } + req.Header.Set("Referer", spider.Referer) spider.Response, err = client.Do(req) if err != nil { diff --git a/src/util.go b/src/util.go index 0f5e612..9792dcd 100644 --- a/src/util.go +++ b/src/util.go @@ -2,10 +2,6 @@ package main import ( "strconv" - "math/rand" - "time" - "io" - "net/http" ) type Job struct { @@ -24,6 +20,7 @@ type Job struct { Locality int `json:"locality"` Status State `json:"status"` NumberGPU int `json:"number_GPU"` + Retries int `json:"retries"` } type Task struct { @@ -65,43 +62,10 @@ type ResourceCount struct { Memory int } -func str2int(str string, defaultValue int) int { - i, err := strconv.Atoi(str) +func str2int(s string, defaultValue int) int { + i, err := strconv.Atoi(s) if err == nil { return i } return defaultValue } - -func getUA() string { - rand.Seed(time.Now().Unix()) - UAs := []string{ - "Mozilla/5.0 (X11; Linux i686; rv:64.0) Gecko/20100101 Firefox/64.0", - "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:64.0) Gecko/20100101 Firefox/64.0", - "Mozilla/5.0 (X11; Linux i586; rv:63.0) Gecko/20100101 Firefox/63.0", - "Mozilla/5.0 (Windows NT 6.2; WOW64; rv:63.0) Gecko/20100101 Firefox/63.0", - 
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:10.0) Gecko/20100101 Firefox/62.0", - "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10.13; ko; rv:1.9.1b2) Gecko/20081201 Firefox/60.0", - "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) Gecko/20100101 Firefox/58.0", - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36", - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML like Gecko) Chrome/51.0.2704.79 Safari/537.36 Edge/14.14931", - "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/47.0.2526.111 Safari/537.36", - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_2) AppleWebKit/601.3.9 (KHTML, like Gecko) Version/9.0.2 Safari/601.3.9", - } - return UAs[rand.Intn(len(UAs))] -} - -func doRequest(method string, url string, r io.Reader, contentType string, referer string) (*http.Response, error) { - client := &http.Client{} - req, err := http.NewRequest(method, url, r) - if err != nil { - return nil, err - } - - req.Header.Set("Content-Type", contentType) - req.Header.Set("User-Agent", getUA()) - req.Header.Set("Referer", referer) - - resp, err := client.Do(req) - return resp, err -}