diff --git a/mock.py b/mock.py new file mode 100644 index 0000000..8d3be0e --- /dev/null +++ b/mock.py @@ -0,0 +1,160 @@ +import os +from threading import Thread +import time +import json +from kafka import KafkaProducer +from http.server import BaseHTTPRequestHandler, HTTPServer +from urllib import parse + +NUMS = os.getenv('NUMS', 1) + +ClientHost = os.getenv('ClientHost', "localhost") +KafkaBrokers = os.getenv('KafkaBrokers', 'localhost:9092').split(',') + +PORT_NUMBER = 8000 + + +class MyHandler(BaseHTTPRequestHandler): + # Handler for the GET requests + def do_GET(self): + req = parse.urlparse(self.path) + query = parse.parse_qs(req.query) + + if req.path == "/ping": + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(bytes("pong", "utf-8")) + + elif req.path == "/logs": + try: + msg = {'code': 0, 'logs': 'Output from mock container'} + except Exception as e: + msg = {'code': 1, 'error': str(e)} + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(bytes(json.dumps(msg), "utf-8")) + + elif req.path == "/status": + status = { + 'id': 'mock-container-id', + 'image': 'mock-container-image', + 'image_digest': 'mock-image-digest', + 'command': 'mock-container-command', + 'created_at': 'mock-container-created-at', + 'finished_at': 'mock-container-finished-at', + 'status': 'running', + 'hostname': 'mock-container-hostname', + 'state': {'exitCode': 1} + } + msg = {'code': 0, 'status': status} + + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(bytes(json.dumps(msg), "utf-8")) + + else: + self.send_error(404, 'File Not Found: %s' % self.path) + + # Handler for the POST requests + def do_POST(self): + if self.path == "/create": + msg = {"code": 0, "id": 'mock-container-id'} + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(bytes(json.dumps(msg), "utf-8")) + + elif self.path == "/stop": + msg = {"code": 0, "error": "Success"} + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(bytes(json.dumps(msg), "utf-8")) + + elif self.path == "/remove": + msg = {"code": 0, "error": "Success"} + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(bytes(json.dumps(msg), "utf-8")) + else: + self.send_error(404, 'File Not Found: %s' % self.path) + + +def report(ClientID): + interval = 3 + while True: + try: + stats = [] + for i in range(0, 4): + stat = { + 'uuid': 'UUID-' + str(i), + 'product_name': 'K80', + 'performance_state': 'P0', + 'memory_total': 11260, + 'memory_free': 11260, + 'memory_used': 0, + 'utilization_gpu': 0, + 'utilization_mem': 0, + 'temperature_gpu': 45, + 'power_draw': 25 + } + + stats.append(stat) + + post_fields = { + 'id': ClientID, + 'host': ClientHost, + 'status': stats, + 'cpu_num': 64, + 'cpu_load': 3, + 'mem_total': 188, + 'mem_available': 180, + 'version': 0 + } + data = json.dumps(post_fields) + + producer = KafkaProducer(bootstrap_servers=KafkaBrokers) + future = producer.send('yao', value=data.encode(), partition=0) + result = future.get(timeout=10) + # print(result) + + time.sleep(interval) + except Exception as e: + print(e) + time.sleep(interval) + + +def listener(): + try: + # Create a web server and define the handler to manage the + # incoming request + server = HTTPServer(('', PORT_NUMBER), MyHandler) + print('Started http server on port ', PORT_NUMBER) + + # Wait forever for incoming http requests + server.serve_forever() + except KeyboardInterrupt: + print('^C received, shutting down the web server') + + server.socket.close() + + +if __name__ == '__main__': + os.environ["TZ"] = 'Asia/Shanghai' + if hasattr(time, 'tzset'): + time.tzset() + t1 = Thread(target=report) + + for i in range(0, int(NUMS)): + t = Thread(target=report, name=ClientHost + '_' + str(i), args=(ClientHost + '_' + str(i),)) + t.start() + + t2 = Thread(target=listener) + t2.start() + while True: + time.sleep(5) + pass