From f77465bb8b0ab3df831e629fad39d7c2615afb30 Mon Sep 17 00:00:00 2001
From: Newnius
Date: Wed, 24 Jun 2020 17:10:26 +0800
Subject: [PATCH] update, add gpu stats for container

---
 agent.py | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 test.py  | 49 +++++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 101 insertions(+), 3 deletions(-)

diff --git a/agent.py b/agent.py
index 5e90b34..4c92e53 100644
--- a/agent.py
+++ b/agent.py
@@ -45,6 +45,12 @@ client = docker.from_env()
 taskStats = {}
 taskStatsLock = Lock()
 
+active_stats = {0: {
+    'util': 0,
+    'mem_util': 0,
+    'mem': 0
+}}
+
 
 def generate_token(stringLength=8):
     letters = string.ascii_lowercase
@@ -54,12 +60,22 @@
 def monitor_task(container_id):
     print(container_id)
     container = client.containers.get(container_id)
+
+    pid = 0
+
     maxCPU = 0
     maxMem = 0
     last_bw_rx = 0
     last_bw_tx = 0
     last_time = time.time() - 1
     for statR in container.stats():
+        if pid == 0:
+            res = container.top()['Processes']
+            for x in res:
+                if "/workspace" in x[7] and int(x[1]) in active_stats:
+                    pid = int(x[1])
+                    break
+
         stat = json.loads(statR)
         # print(stat)
         if stat['read'] == '0001-01-01T00:00:00Z':
@@ -90,7 +106,16 @@
             bw_rx /= dur
             bw_tx /= dur
 
-            taskStats[container_id] = {'cpu': utilCPU, 'mem': mem, 'bw_rx': bw_rx, 'bw_tx': bw_tx}
+            taskStats[container_id] = {
+                'cpu': utilCPU,
+                'mem': mem,
+                'bw_rx': bw_rx,
+                'bw_tx': bw_tx,
+                'gpu_util': active_stats[pid]['util'],
+                'gpu_mem_util': active_stats[pid]['mem_util'],
+                'gpu_mem': active_stats[pid]['mem'],
+            }
+            # print(taskStats[container_id])
             # print(utilCPU, mem, maxCPU, maxMem, bw_rx, bw_tx)
             taskStatsLock.release()
             if stat['preread'] == '0001-01-01T00:00:00Z':
@@ -206,6 +231,10 @@ class MyHandler(BaseHTTPRequestHandler):
                 status['mem'] = taskStats[container_id]['mem']
                 status['bw_rx'] = taskStats[container_id]['bw_rx']
                 status['bw_tx'] = taskStats[container_id]['bw_tx']
+                status['bw_tx'] = taskStats[container_id]['bw_tx']
+                status['gpu_util'] = taskStats[container_id]['gpu_util']
+                status['gpu_mem_util'] = taskStats[container_id]['gpu_mem_util']
+                status['gpu_mem'] = taskStats[container_id]['gpu_mem']
                 taskStatsLock.release()
                 if container_id in id2token:
                     token = id2token[container_id]
@@ -390,6 +419,29 @@ def reporter():
         time.sleep(HeartbeatInterval)
 
 
+def pmon():
+    while True:
+        try:
+            status, msg_gpu = execute(['nvidia-smi', 'pmon', '-c', '1', '-s', 'um'])
+            if not status:
+                print("[WARN] execute failed, ", msg_gpu, status)
+            lists = msg_gpu.split('\n')
+            for p in lists:
+                if "#" not in p and "-" not in p:
+                    tmp = p.split()
+                    data = {
+                        'idx': int(tmp[0]),
+                        'pid': int(tmp[1]),
+                        'util': int(tmp[3]),
+                        'mem_util': int(tmp[4]),
+                        'mem': int(tmp[7])
+                    }
+                    active_stats[int(tmp[1])] = data
+        except Exception as e:
+            print(e)
+        time.sleep(HeartbeatInterval)
+
+
 def execute(cmd):
     try:
         result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -482,6 +534,7 @@ if __name__ == '__main__':
     Thread(target=reporter).start()
     Thread(target=listener).start()
+    Thread(target=pmon).start()
     if EnableEventTrigger == 'true':
         print('start event trigger')
         Thread(target=event_trigger).start()
 
diff --git a/test.py b/test.py
index ab6aef0..9b46caa 100644
--- a/test.py
+++ b/test.py
@@ -1,3 +1,4 @@
+import subprocess
 import docker
 
 
@@ -100,11 +101,53 @@ def create_container():
 
 def exec_run():
     client = docker.from_env()
     container = client.containers.get('yao-agent-helper')
-    exit_code, output = container.exec_run(cmd="sh -c 'docker run --gpus all --detach=True tensorflow/tensorflow:1.14.0-gpu nvidia-smi'")
+    exit_code, output = container.exec_run(
+        cmd="sh -c 'docker run --gpus all --detach=True tensorflow/tensorflow:1.14.0-gpu nvidia-smi'")
     if exit_code == 0:
         print(output.decode('utf-8').rstrip('\n'))
 
 
+def report():
+    try:
+        status, msg_gpu = execute(['nvidia-smi', 'pmon', '-c', '1', '-s', 'um'])
+        if not status:
+            print("execute failed, ", msg_gpu, status)
+        lists = msg_gpu.split('\n')
+        for p in lists:
+            if "#" not in p and "-" not in p:
+                tmp = p.split()
+                data = {
+                    'idx': int(tmp[0]),
+                    'pid': int(tmp[1]),
+                    'util': int(tmp[3]),
+                    'mem_util': int(tmp[4]),
+                    'mem': int(tmp[7])
+                }
+                print(data)
+    except Exception as e:
+        print(e)
+
+
+def execute(cmd):
+    try:
+        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        if result.returncode == 0:
+            return True, result.stdout.decode('utf-8').rstrip('\n')
+        return False, result.stderr.decode('utf-8').rstrip('\n')
+    except Exception as e:
+        return False, e
+
+
+def getPID(container_id):
+    client = docker.from_env()
+    container = client.containers.get(container_id)
+    res = container.top()['Processes']
+    for x in res:
+        if "/workspace" in x[7]:
+            print(res[1])
+            break
+
+
 # create_network()
 # list_networks()
@@ -112,4 +155,6 @@ def exec_run():
 # get_status('af121babda9b')
 # exec_run()
 # run()
-create_container()
+# create_container()
+# report()
+getPID('a6543cef3c85')