From e614a15474a6835a24cc10e3d607ca96f182d759 Mon Sep 17 00:00:00 2001 From: Newnius Date: Tue, 9 Jun 2020 22:08:51 +0800 Subject: [PATCH] update --- agent.py | 73 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 71 insertions(+), 2 deletions(-) diff --git a/agent.py b/agent.py index 67eae9a..5e90b34 100644 --- a/agent.py +++ b/agent.py @@ -42,12 +42,65 @@ event_counter = 0 client = docker.from_env() +taskStats = {} +taskStatsLock = Lock() + def generate_token(stringLength=8): letters = string.ascii_lowercase return ''.join(random.choice(letters) for i in range(stringLength)) +def monitor_task(container_id): + print(container_id) + container = client.containers.get(container_id) + maxCPU = 0 + maxMem = 0 + last_bw_rx = 0 + last_bw_tx = 0 + last_time = time.time() - 1 + for statR in container.stats(): + stat = json.loads(statR) + # print(stat) + if stat['read'] == '0001-01-01T00:00:00Z': + print('container ', container_id, ' exited') + break + + taskStatsLock.acquire() + # CPU load, x% + cur = stat['cpu_stats']['cpu_usage']['total_usage'] + last = stat['precpu_stats']['cpu_usage']['total_usage'] + utilCPU = (cur - last) / 10000000 + + # Memory, MB + mem = stat['memory_stats']['stats']['active_anon'] + mem = mem / 1024 + mem = mem / 1024 + + # Bandwidth, KB/s + cur = stat['networks']['eth0']['rx_bytes'] / 1024 + bw_rx = cur - last_bw_rx + last_bw_rx = cur + cur = stat['networks']['eth0']['tx_bytes'] / 1024 + bw_tx = cur - last_bw_tx + last_bw_tx = cur + now = time.time() + dur = now - last_time + last_time = now + bw_rx /= dur + bw_tx /= dur + + taskStats[container_id] = {'cpu': utilCPU, 'mem': mem, 'bw_rx': bw_rx, 'bw_tx': bw_tx} + # print(utilCPU, mem, maxCPU, maxMem, bw_rx, bw_tx) + taskStatsLock.release() + if stat['preread'] == '0001-01-01T00:00:00Z': + continue + if utilCPU > maxCPU: + maxCPU = utilCPU + if mem > maxMem: + maxMem = mem + + def launch_tasks(stats): utils = {} mem_frees = {} @@ -89,7 +142,8 @@ class MyHandler(BaseHTTPRequestHandler): msg = { 'pending_tasks': pending_tasks, 'id2token': id2token, - 'event_counter': event_counter + 'event_counter': event_counter, + 'taskStats': taskStats } self.send_response(200) self.send_header('Content-type', 'application/json') @@ -140,8 +194,19 @@ class MyHandler(BaseHTTPRequestHandler): 'finished_at': container.attrs['State']['FinishedAt'], 'status': container.status, 'hostname': container.attrs['Config']['Hostname'], - 'state': container.attrs['State'] + 'state': container.attrs['State'], + 'cpu': 0, + 'mem': 0, + 'bw_rx': 0, + 'bw_tx': 0 } + if container_id in taskStats: + taskStatsLock.acquire() + status['cpu'] = taskStats[container_id]['cpu'] + status['mem'] = taskStats[container_id]['mem'] + status['bw_rx'] = taskStats[container_id]['bw_rx'] + status['bw_tx'] = taskStats[container_id]['bw_tx'] + taskStatsLock.release() if container_id in id2token: token = id2token[container_id] if token in pending_tasks: @@ -200,6 +265,8 @@ class MyHandler(BaseHTTPRequestHandler): print(e) try: + # set PYTHONUNBUFFERED=1 to output immediately + # see https://tarunlalwani.com/post/why-delayed-output-python-docker/ script = " ".join([ "docker run", "--gpus '\"device=" + docker_gpus + "\"'", @@ -226,6 +293,8 @@ class MyHandler(BaseHTTPRequestHandler): exit_code, output = container.exec_run(['sh', '-c', script]) msg = {"code": 0, "id": output.decode('utf-8').rstrip('\n')} + Thread(target=monitor_task, name='monitor_task', args=(msg['id'],)).start() + if docker_wait == "1": lock.acquire() pending_tasks[token] = {'gpus': str(docker_gpus).split(','), 'gpu_mem': int(docker_gpu_mem)}