"""YAO agent node daemon.

Runs two long-lived loops:

* ``report`` -- every few seconds samples local GPU state via ``nvidia-smi``,
  publishes it to the ``yao`` Kafka topic, and wakes any pending task whose
  reserved GPU has become idle.
* ``listen`` -- a small HTTP control API (``/create``, ``/stop``, ``/remove``,
  ``/logs``, ``/status``, ``/ping``) that drives Docker containers on this
  node through the ``yao-agent-helper`` container.
"""
import os
import threading
import time
import subprocess
import json
from xml.dom.minidom import parse
import xml.dom.minidom
from kafka import KafkaProducer
import multiprocessing
import psutil
import math
# NOTE(review): this import is shadowed by the local `def launch_tasks`
# below; kept for backward compatibility, but one of the two is redundant.
from executor import launch_tasks
from http.server import BaseHTTPRequestHandler, HTTPServer
import cgi
import docker
from urllib import parse

# Node identity and Kafka broker list, overridable via the environment.
ClientID = os.getenv('ClientID', 1)
ClientHost = os.getenv('ClientHost', "localhost")
KafkaBrokers = os.getenv('KafkaBrokers', 'localhost:9092').split(',')

PORT_NUMBER = 8000

# Guards `pending_tasks`, which is shared between the HTTP handler threads
# (writers, in /create) and the reporter-spawned launch_tasks threads.
lock = threading.Lock()
pending_tasks = {}


def launch_tasks(stats):
    """Resume pending tasks whose first reserved GPU is now mostly idle.

    A "pending" container is assumed to be parked on a ``sleep`` command;
    killing that sleep (via ``docker exec`` run inside the yao-agent-helper
    container) lets the task proceed.

    :param stats: list of per-GPU dicts from :func:`get_gpu_status`,
                  indexed by GPU ordinal.
    """
    client = docker.from_env()
    container = client.containers.get('yao-agent-helper')
    entries_to_remove = []
    lock.acquire()
    # BUGFIX: wrap in try/finally -- an exception from the docker calls
    # previously left the lock held forever, deadlocking both threads.
    try:
        for task_id, task in pending_tasks.items():
            # BUGFIX: GPU ids are strings ('0', '1', ...) split from the
            # /create form field; cast before indexing the stats list.
            # Resume once utilization drops below 75%.
            if stats[int(task['gpus'][0])]['utilization_gpu'] < 75:
                entries_to_remove.append(task_id)
                # BUGFIX: original interpolated the *builtin* `id` here
                # instead of the container id of the task being resumed.
                script = " ".join([
                    "docker exec",
                    task_id,
                    "pkill sleep"
                ])
                container.exec_run('sh -c \'' + script + '\'')

        for k in entries_to_remove:
            pending_tasks.pop(k, None)
    finally:
        lock.release()


class MyHandler(BaseHTTPRequestHandler):
    """HTTP control API for this agent node."""

    def _reply_json(self, msg):
        """Serialize *msg* as a 200 application/json response."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(bytes(json.dumps(msg), "utf-8"))

    # Handler for the GET requests
    def do_GET(self):
        req = parse.urlparse(self.path)
        query = parse.parse_qs(req.query)

        if req.path == "/ping":
            # Liveness probe; body is plain "pong" (kept as-is for callers).
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(bytes("pong", "utf-8"))

        elif req.path == "/logs":
            # Return the full log of a container: /logs?id=<container_id>
            try:
                container_id = query.get('id')[0]
                client = docker.from_env()
                container = client.containers.get(container_id)
                msg = {'code': 0, 'logs': str(container.logs().decode())}
            except Exception as e:
                msg = {'code': 1, 'error': str(e)}
            self._reply_json(msg)

        elif req.path == "/status":
            # Return docker inspect-style metadata: /status?id=<container_id>
            try:
                container_id = query.get('id')[0]
                client = docker.from_env()
                container = client.containers.list(all=True, filters={'id': container_id})
                if len(container) > 0:
                    container = container[0]
                    status = {
                        'id': container.short_id,
                        'image': container.attrs['Config']['Image'],
                        'image_digest': container.attrs['Image'],
                        'command': container.attrs['Config']['Cmd'],
                        'created_at': container.attrs['Created'],
                        'finished_at': container.attrs['State']['FinishedAt'],
                        'status': container.status,
                        'hostname': container.attrs['Config']['Hostname'],
                        'state': container.attrs['State']
                    }
                    if status['command'] is not None:
                        status['command'] = ' '.join(container.attrs['Config']['Cmd'])
                    msg = {'code': 0, 'status': status}
                else:
                    msg = {'code': 1, 'error': "container not exist"}
            except Exception as e:
                msg = {'code': 2, 'error': str(e)}
            self._reply_json(msg)

        else:
            self.send_error(404, 'File Not Found: %s' % self.path)

    # Handler for the POST requests
    def do_POST(self):
        if self.path == "/create":
            form = cgi.FieldStorage(
                fp=self.rfile,
                headers=self.headers,
                environ={
                    'REQUEST_METHOD': 'POST',
                    'CONTENT_TYPE': self.headers['Content-Type'],
                })
            docker_image = form.getvalue('image')
            docker_name = form.getvalue('name')
            docker_cmd = form.getvalue('cmd')
            docker_workspace = form.getvalue('workspace')
            docker_gpus = form.getvalue('gpus')
            docker_mem_limit = form.getvalue('mem_limit')
            docker_cpu_limit = form.getvalue('cpu_limit')
            docker_network = form.getvalue('network')

            try:
                # SECURITY(review): form fields are interpolated directly
                # into a shell command executed inside yao-agent-helper --
                # a malicious scheduler request can inject arbitrary shell.
                # Only accept requests from trusted schedulers, or quote/
                # validate these fields.
                script = " ".join([
                    "docker run",
                    "--gpus '\"device=" + docker_gpus + "\"'",
                    "--detach=True",
                    "--hostname " + docker_name,
                    "--network " + docker_network,
                    "--network-alias " + docker_name,
                    "--memory-reservation " + docker_mem_limit,
                    "--cpus " + docker_cpu_limit,
                    "--env repo=" + docker_workspace,
                    docker_image,
                    docker_cmd
                ])

                client = docker.from_env()
                container = client.containers.get('yao-agent-helper')
                exit_code, output = container.exec_run('sh -c \'' + script + '\'')
                if exit_code == 0:
                    # stdout of `docker run -d` is the new container id.
                    msg = {"code": 0, "id": output.decode('utf-8').rstrip('\n')}
                    # BUGFIX: only register the task after a successful run;
                    # the original registered stderr text as a container id
                    # when `docker run` failed.
                    lock.acquire()
                    try:
                        pending_tasks[msg['id']] = {'gpus': str(docker_gpus).split(',')}
                    finally:
                        lock.release()
                else:
                    # BUGFIX: report the failure under 'error' (consistent
                    # with /stop and /remove) instead of 'id'.
                    msg = {"code": 1, "error": output.decode('utf-8').rstrip('\n')}
            except Exception as e:
                msg = {"code": 1, "error": str(e)}

            self._reply_json(msg)

        elif self.path == "/stop":
            form = cgi.FieldStorage(
                fp=self.rfile,
                headers=self.headers,
                environ={
                    'REQUEST_METHOD': 'POST',
                    'CONTENT_TYPE': self.headers['Content-Type'],
                })
            container_id = form.getvalue('id')

            try:
                client = docker.from_env()
                container = client.containers.get(container_id)
                container.stop()
                msg = {"code": 0, "error": "Success"}
            except Exception as e:
                msg = {"code": 1, "error": str(e)}

            self._reply_json(msg)

        elif self.path == "/remove":
            form = cgi.FieldStorage(
                fp=self.rfile,
                headers=self.headers,
                environ={
                    'REQUEST_METHOD': 'POST',
                    'CONTENT_TYPE': self.headers['Content-Type'],
                })
            container_id = form.getvalue('id')

            try:
                client = docker.from_env()
                container = client.containers.get(container_id)
                container.remove(force=True)
                msg = {"code": 0, "error": "Success"}
            except Exception as e:
                msg = {"code": 1, "error": str(e)}

            self._reply_json(msg)
        else:
            self.send_error(404, 'File Not Found: %s' % self.path)


def report():
    """Reporter loop: sample GPUs, publish to Kafka, wake pending tasks.

    Runs forever; each iteration dumps `nvidia-smi -q -x` to status.xml,
    parses it, publishes the stats, and spawns a launch_tasks thread so a
    slow docker exec cannot delay the next sample.
    """
    interval = 5
    while True:
        try:
            status, msg_gpu = execute(['nvidia-smi', '-q', '-x', '-f', 'status.xml'])
            if not status:
                print("execute failed, ", msg_gpu)
            stats = get_gpu_status()
            report_msg(stats)
            t = threading.Thread(target=launch_tasks, name='launch_tasks', args=(stats,))
            t.start()
            time.sleep(interval)
        except Exception as e:
            # Best-effort loop: log and keep reporting.
            print(e)
            time.sleep(interval)


def execute(cmd):
    """Run *cmd* (argv list, no shell).

    :returns: ``(True, stdout)`` on exit code 0, else ``(False, stderr)``.
    """
    try:
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if result.returncode == 0:
            return True, result.stdout.decode('utf-8').rstrip('\n')
        return False, result.stderr.decode('utf-8').rstrip('\n')
    except Exception as e:
        # BUGFIX: return a string like the other failure path, not the
        # raw exception object.
        return False, str(e)


def get_gpu_status():
    """Parse status.xml (nvidia-smi -q -x output) into per-GPU dicts.

    :returns: list of dicts, one per GPU in nvidia-smi order, with memory
              in MiB, utilization in percent, temperature in C and power
              draw in W, all as ints.
    """
    DOMTree = xml.dom.minidom.parse("status.xml")
    collection = DOMTree.documentElement
    gpus = collection.getElementsByTagName("gpu")
    stats = []
    for gpu in gpus:
        stat = {
            'uuid': gpu.getElementsByTagName('uuid')[0].childNodes[0].data,
            'product_name': gpu.getElementsByTagName('product_name')[0].childNodes[0].data,
            'performance_state': gpu.getElementsByTagName('performance_state')[0].childNodes[0].data,
            'memory_total': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('total')[0].childNodes[
                0].data,
            'memory_free': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('free')[0].childNodes[
                0].data,
            'memory_used': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('used')[0].childNodes[
                0].data,
            'utilization_gpu':
                gpu.getElementsByTagName('utilization')[0].getElementsByTagName('gpu_util')[0].childNodes[0].data,
            'utilization_mem':
                gpu.getElementsByTagName('utilization')[0].getElementsByTagName('memory_util')[0].childNodes[0].data,
            'temperature_gpu':
                gpu.getElementsByTagName('temperature')[0].getElementsByTagName('gpu_temp')[0].childNodes[0].data,
            'power_draw':
                gpu.getElementsByTagName('power_readings')[0].getElementsByTagName('power_draw')[0].childNodes[0].data
        }

        # Values arrive as "1234 MiB" / "56 %" style strings; strip the unit.
        stat['memory_total'] = int(float(stat['memory_total'].split(' ')[0]))
        stat['memory_free'] = int(float(stat['memory_free'].split(' ')[0]))
        stat['memory_used'] = int(float(stat['memory_used'].split(' ')[0]))
        stat['utilization_gpu'] = int(float(stat['utilization_gpu'].split(' ')[0]))
        stat['utilization_mem'] = int(float(stat['utilization_mem'].split(' ')[0]))
        stat['temperature_gpu'] = int(float(stat['temperature_gpu'].split(' ')[0]))
        stat['power_draw'] = int(float(stat['power_draw'].split(' ')[0]))

        stats.append(stat)
    return stats


def report_msg(stats):
    """Publish node status (GPU stats + CPU/memory) to the 'yao' topic.

    NOTE(review): a new KafkaProducer is created per call (every ~5s);
    hoisting it to module scope would be cheaper but would connect at
    import time, so the per-call behavior is preserved here.
    """
    mem = psutil.virtual_memory()

    post_fields = {
        'id': ClientID,
        'host': ClientHost,
        'status': stats,
        'cpu_num': multiprocessing.cpu_count(),
        'cpu_load': os.getloadavg()[0],
        'mem_total': math.floor(mem.total / (1024. ** 3)),
        'mem_available': math.floor(mem.available / (1024. ** 3))
    }
    data = json.dumps(post_fields)

    producer = KafkaProducer(bootstrap_servers=KafkaBrokers)
    future = producer.send('yao', value=data.encode(), partition=0)
    result = future.get(timeout=10)
    print(result)


def listen():
    """Serve the HTTP control API forever (until KeyboardInterrupt)."""
    try:
        # Create a web server and define the handler to manage the
        # incoming request
        server = HTTPServer(('', PORT_NUMBER), MyHandler)
        print('Started http server on port ', PORT_NUMBER)

        # Wait forever for incoming http requests
        server.serve_forever()
    except KeyboardInterrupt:
        print('^C received, shutting down the web server')

        server.socket.close()


def main():
    # BUGFIX: the original used target=report() / target=listen(), which
    # CALLED the functions in the main thread -- report() loops forever,
    # so the HTTP server never started.  Pass the callables instead.
    t1 = threading.Thread(target=report)
    t2 = threading.Thread(target=listen)
    t1.start()
    t2.start()
    t1.join()
    t2.join()


if __name__ == '__main__':
    os.environ["TZ"] = 'Asia/Shanghai'
    if hasattr(time, 'tzset'):
        time.tzset()
    main()