From d2889614eeec8ac1ff44d5e3221674e365362eba Mon Sep 17 00:00:00 2001 From: Newnius Date: Sat, 13 Apr 2019 19:39:29 +0800 Subject: [PATCH] update --- Dockerfile | 4 +- bootstrap.sh | 2 +- executor.py | 173 +++++++++++++++++++++++++++---------- yao-agent.py => monitor.py | 24 +++-- server.py | 137 ----------------------------- test.py | 52 +++++++++++ 6 files changed, 200 insertions(+), 192 deletions(-) rename yao-agent.py => monitor.py (74%) delete mode 100644 server.py create mode 100644 test.py diff --git a/Dockerfile b/Dockerfile index 1206461..be21dff 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,8 +6,8 @@ RUN pip3 install docker kafka ADD bootstrap.sh /etc/bootstrap.sh -ADD yao-agent.py /root/yao-agent.py -ADD server.py /root/server.py +ADD monitor.py /root/monitor.py +ADD executor.py /root/executor.py WORKDIR /root diff --git a/bootstrap.sh b/bootstrap.sh index 94f8c2b..8561bff 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -2,7 +2,7 @@ # TODO: monitor the processes -python3 /root/yao-agent.py & +python3 /root/monitor.py & python3 /root/server.py & diff --git a/executor.py b/executor.py index f01073d..3ee7873 100644 --- a/executor.py +++ b/executor.py @@ -1,52 +1,137 @@ +#!/usr/bin/python +from http.server import BaseHTTPRequestHandler, HTTPServer +import cgi import docker +import json +from urllib import parse + +PORT_NUMBER = 8000 -def run(): - client = docker.from_env() - try: - print(client.containers.run(image="alpine", command="nvid", environment={"KEY": "value"})) - # print(client.containers.run(image="nvidia/cuda:9.0-base", command="nvidia-smi", environment={"KEY": "value"}, runtime="nvidia")) - except Exception as e: - print(e.__class__.__name__, e) +# This class will handles any incoming request from +# the browser +class MyHandler(BaseHTTPRequestHandler): + # Handler for the GET requests + def do_GET(self): + req = parse.urlparse(self.path) + query = parse.parse_qs(req.query) + + if req.path == "/ping": + # Open the static file requested and send it + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(bytes("pong", "utf-8")) + + elif req.path == "/logs": + try: + container_id = query['id'][0] + client = docker.from_env() + container = client.containers.get(container_id) + msg = {'code': 0, 'logs': str(container.logs().decode())} + except Exception as e: + msg = {'code': 0, 'error': str(e)} + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(bytes(json.dumps(msg), "utf-8")) + + elif req.path == "/status": + container_id = query['id'][0] + client = docker.from_env() + container = client.containers.list(all=True, filters={'id': container_id}) + if len(container) > 0: + container = container[0] + status = { + 'id': container.short_id, + 'image': container.attrs['Config']['Image'], + 'image_digest': container.attrs['Image'], + 'command': container.attrs['Config']['Cmd'], + 'created_at': container.attrs['Created'], + 'finished_at': container.attrs['State']['FinishedAt'], + 'status': container.status + } + if status['command'] is not None: + status['command'] = ' '.join(container.attrs['Config']['Cmd']) + msg = {'code': 0, 'status': status} + else: + msg = {'code': 1, 'error': "container not exist"} + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(bytes(json.dumps(msg), "utf-8")) + + else: + self.send_error(404, 'File Not Found: %s' % self.path) + + # Handler for the POST requests + def do_POST(self): + if self.path == "/create": + form = cgi.FieldStorage( + fp=self.rfile, + headers=self.headers, + environ={ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': self.headers['Content-Type'], + }) + docker_image = form["image"].value + docker_name = form["name"].value + docker_cmd = form["cmd"].value + docker_workspace = form["workspace"].value + docker_gpus = form["gpus"].value + + try: + client = docker.from_env() + container = client.containers.run( + image=docker_image, + hostname=docker_name, + command=docker_cmd, + environment={"repo": docker_workspace, "NVIDIA_VISIBLE_DEVICES": docker_gpus}, + runtime="nvidia", + detach=True + ) + msg = {"code": 0, "id": container.id} + except Exception as e: + msg = {"code": 1, "error": str(e)} + + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(bytes(json.dumps(msg), "utf-8")) + + elif self.path == "/stop": + form = cgi.FieldStorage( + fp=self.rfile, + headers=self.headers, + environ={ + 'REQUEST_METHOD': 'POST', + 'CONTENT_TYPE': self.headers['Content-Type'], + }) + container_id = form["id"].value + + client = docker.from_env() + container = client.containers.get(container_id) + container.stop() + msg = {"code": 0} + + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(bytes(json.dumps(msg), "utf-8")) + else: + self.send_error(404, 'File Not Found: %s' % self.path) -def run_in_background(): - client = docker.from_env() - container = client.containers.run("alpine", ["echo", "hello", "world"], detach=True) - print(container.id) +try: + # Create a web server and define the handler to manage the + # incoming request + server = HTTPServer(('', PORT_NUMBER), MyHandler) + print('Started http server on port ', PORT_NUMBER) + # Wait forever for incoming http requests + server.serve_forever() -def list_containers(): - client = docker.from_env() - for container in client.containers.list(): - print(container.id) +except KeyboardInterrupt: + print('^C received, shutting down the web server') - -def get_logs(id): - try: - client = docker.from_env() - container = client.containers.get(id) - print(container.logs().decode()) - except Exception as e: - print(e) - - -def get_status(id): - client = docker.from_env() - container = client.containers.list(all=True, filters={'id': id}) - status = {} - if len(container) > 0: - container= container[0] - status['id'] = container.short_id - status['image'] = container.attrs['Config']['Image'] - status['image_digest'] = container.attrs['Image'] - status['command'] = container.attrs['Config']['Cmd'] - status['createdAt'] = container.attrs['Created'] - status['finishedAt'] = container.attrs['State']['FinishedAt'] - status['status'] = container.status - if status['command'] is not None: - status['command'] = ' '.join(container.attrs['Config']['Cmd']) - print(status) - - -get_status('') +server.socket.close() diff --git a/yao-agent.py b/monitor.py similarity index 74% rename from yao-agent.py rename to monitor.py index b568972..0eaca8d 100644 --- a/yao-agent.py +++ b/monitor.py @@ -7,6 +7,7 @@ import xml.dom.minidom from kafka import KafkaProducer ClientID = os.getenv('ClientID', 1) +ClientHost = os.getenv('ClientHost', "localhost") KafkaBrokers = os.getenv('KafkaBrokers', 'localhost:9092').split(',') @@ -44,13 +45,20 @@ def report_msg(): 'uuid': gpu.getElementsByTagName('uuid')[0].childNodes[0].data, 'product_name': gpu.getElementsByTagName('product_name')[0].childNodes[0].data, 'performance_state': gpu.getElementsByTagName('performance_state')[0].childNodes[0].data, - 'memory_total': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('total')[0].childNodes[0].data, - 'memory_free': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('free')[0].childNodes[0].data, - 'memory_used': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('used')[0].childNodes[0].data, - 'utilization_gpu': gpu.getElementsByTagName('utilization')[0].getElementsByTagName('gpu_util')[0].childNodes[0].data, - 'utilization_mem': gpu.getElementsByTagName('utilization')[0].getElementsByTagName('memory_util')[0].childNodes[0].data, - 'temperature_gpu': gpu.getElementsByTagName('temperature')[0].getElementsByTagName('gpu_temp')[0].childNodes[0].data, - 'power_draw': gpu.getElementsByTagName('power_readings')[0].getElementsByTagName('power_draw')[0].childNodes[0].data + 'memory_total': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('total')[0].childNodes[ + 0].data, + 'memory_free': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('free')[0].childNodes[ + 0].data, + 'memory_used': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('used')[0].childNodes[ + 0].data, + 'utilization_gpu': + gpu.getElementsByTagName('utilization')[0].getElementsByTagName('gpu_util')[0].childNodes[0].data, + 'utilization_mem': + gpu.getElementsByTagName('utilization')[0].getElementsByTagName('memory_util')[0].childNodes[0].data, + 'temperature_gpu': + gpu.getElementsByTagName('temperature')[0].getElementsByTagName('gpu_temp')[0].childNodes[0].data, + 'power_draw': + gpu.getElementsByTagName('power_readings')[0].getElementsByTagName('power_draw')[0].childNodes[0].data } stat['memory_total'] = int(float(stat['memory_total'].split(' ')[0])) @@ -63,7 +71,7 @@ def report_msg(): stats.append(stat) - post_fields = {'id': ClientID, 'status': stats} + post_fields = {'id': ClientID, 'host': ClientHost, 'status': stats} data = json.dumps(post_fields) producer = KafkaProducer(bootstrap_servers=KafkaBrokers) diff --git a/server.py b/server.py deleted file mode 100644 index 3ee7873..0000000 --- a/server.py +++ /dev/null @@ -1,137 +0,0 @@ -#!/usr/bin/python -from http.server import BaseHTTPRequestHandler, HTTPServer -import cgi -import docker -import json -from urllib import parse - -PORT_NUMBER = 8000 - - -# This class will handles any incoming request from -# the browser -class MyHandler(BaseHTTPRequestHandler): - # Handler for the GET requests - def do_GET(self): - req = parse.urlparse(self.path) - query = parse.parse_qs(req.query) - - if req.path == "/ping": - # Open the static file requested and send it - self.send_response(200) - self.send_header('Content-type', 'application/json') - self.end_headers() - self.wfile.write(bytes("pong", "utf-8")) - - elif req.path == "/logs": - try: - container_id = query['id'][0] - client = docker.from_env() - container = client.containers.get(container_id) - msg = {'code': 0, 'logs': str(container.logs().decode())} - except Exception as e: - msg = {'code': 0, 'error': str(e)} - self.send_response(200) - self.send_header('Content-type', 'application/json') - self.end_headers() - self.wfile.write(bytes(json.dumps(msg), "utf-8")) - - elif req.path == "/status": - container_id = query['id'][0] - client = docker.from_env() - container = client.containers.list(all=True, filters={'id': container_id}) - if len(container) > 0: - container = container[0] - status = { - 'id': container.short_id, - 'image': container.attrs['Config']['Image'], - 'image_digest': container.attrs['Image'], - 'command': container.attrs['Config']['Cmd'], - 'created_at': container.attrs['Created'], - 'finished_at': container.attrs['State']['FinishedAt'], - 'status': container.status - } - if status['command'] is not None: - status['command'] = ' '.join(container.attrs['Config']['Cmd']) - msg = {'code': 0, 'status': status} - else: - msg = {'code': 1, 'error': "container not exist"} - self.send_response(200) - self.send_header('Content-type', 'application/json') - self.end_headers() - self.wfile.write(bytes(json.dumps(msg), "utf-8")) - - else: - self.send_error(404, 'File Not Found: %s' % self.path) - - # Handler for the POST requests - def do_POST(self): - if self.path == "/create": - form = cgi.FieldStorage( - fp=self.rfile, - headers=self.headers, - environ={ - 'REQUEST_METHOD': 'POST', - 'CONTENT_TYPE': self.headers['Content-Type'], - }) - docker_image = form["image"].value - docker_name = form["name"].value - docker_cmd = form["cmd"].value - docker_workspace = form["workspace"].value - docker_gpus = form["gpus"].value - - try: - client = docker.from_env() - container = client.containers.run( - image=docker_image, - hostname=docker_name, - command=docker_cmd, - environment={"repo": docker_workspace, "NVIDIA_VISIBLE_DEVICES": docker_gpus}, - runtime="nvidia", - detach=True - ) - msg = {"code": 0, "id": container.id} - except Exception as e: - msg = {"code": 1, "error": str(e)} - - self.send_response(200) - self.send_header('Content-type', 'application/json') - self.end_headers() - self.wfile.write(bytes(json.dumps(msg), "utf-8")) - - elif self.path == "/stop": - form = cgi.FieldStorage( - fp=self.rfile, - headers=self.headers, - environ={ - 'REQUEST_METHOD': 'POST', - 'CONTENT_TYPE': self.headers['Content-Type'], - }) - container_id = form["id"].value - - client = docker.from_env() - container = client.containers.get(container_id) - container.stop() - msg = {"code": 0} - - self.send_response(200) - self.send_header('Content-type', 'application/json') - self.end_headers() - self.wfile.write(bytes(json.dumps(msg), "utf-8")) - else: - self.send_error(404, 'File Not Found: %s' % self.path) - - -try: - # Create a web server and define the handler to manage the - # incoming request - server = HTTPServer(('', PORT_NUMBER), MyHandler) - print('Started http server on port ', PORT_NUMBER) - - # Wait forever for incoming http requests - server.serve_forever() - -except KeyboardInterrupt: - print('^C received, shutting down the web server') - -server.socket.close() diff --git a/test.py b/test.py new file mode 100644 index 0000000..f01073d --- /dev/null +++ b/test.py @@ -0,0 +1,52 @@ +import docker + + +def run(): + client = docker.from_env() + try: + print(client.containers.run(image="alpine", command="nvid", environment={"KEY": "value"})) + # print(client.containers.run(image="nvidia/cuda:9.0-base", command="nvidia-smi", environment={"KEY": "value"}, runtime="nvidia")) + except Exception as e: + print(e.__class__.__name__, e) + + +def run_in_background(): + client = docker.from_env() + container = client.containers.run("alpine", ["echo", "hello", "world"], detach=True) + print(container.id) + + +def list_containers(): + client = docker.from_env() + for container in client.containers.list(): + print(container.id) + + +def get_logs(id): + try: + client = docker.from_env() + container = client.containers.get(id) + print(container.logs().decode()) + except Exception as e: + print(e) + + +def get_status(id): + client = docker.from_env() + container = client.containers.list(all=True, filters={'id': id}) + status = {} + if len(container) > 0: + container= container[0] + status['id'] = container.short_id + status['image'] = container.attrs['Config']['Image'] + status['image_digest'] = container.attrs['Image'] + status['command'] = container.attrs['Config']['Cmd'] + status['createdAt'] = container.attrs['Created'] + status['finishedAt'] = container.attrs['State']['FinishedAt'] + status['status'] = container.status + if status['command'] is not None: + status['command'] = ' '.join(container.attrs['Config']['Cmd']) + print(status) + + +get_status('')