diff --git a/agent.py b/agent.py index 3c9ac68..5e74621 100644 --- a/agent.py +++ b/agent.py @@ -14,61 +14,48 @@ from http.server import BaseHTTPRequestHandler, HTTPServer import cgi import docker from urllib import parse +import random +import string ClientID = os.getenv('ClientID', 1) ClientHost = os.getenv('ClientHost', "localhost") +ClientExtHost = os.getenv('ClientExtHost', "localhost") KafkaBrokers = os.getenv('KafkaBrokers', 'localhost:9092').split(',') -PORT_NUMBER = 8000 +PORT = os.getenv('Port', 8000) +HeartbeatInterval = os.getenv('HeartbeatInterval', 5) lock = Lock() -pending_tasks = {} - -ver = 0 -last_version = {} +# pending_tasks = {} +id2token = {} counter = {} -def launch_task_in_background(container, task_id): - script = " ".join([ - "docker exec", - task_id, - "pkill", - "sleep" - ]) - - while True: - code = container.exec_run('sh -c \'' + script + '\'').exit_code - if code == 0: - break - time.sleep(0.1) +def generate_token(stringLength=8): + letters = string.ascii_lowercase + return ''.join(random.choice(letters) for i in range(stringLength)) def launch_tasks(stats): utils = {} - mems = {} + mem_frees = {} for stat in stats: utils[stat['uuid']] = stat['utilization_gpu'] - if int(stat['utilization_gpu']) < 60: + if int(stat['utilization_gpu']) < 10: if stat['uuid'] not in counter: counter[stat['uuid']] = 0 counter[stat['uuid']] += 1 else: counter[stat['uuid']] = 0 - mems[stat['uuid']] = stat['memory_free'] + mem_frees[stat['uuid']] = stat['memory_free'] - client = docker.from_env() - container = client.containers.get('yao-agent-helper') entries_to_remove = [] lock.acquire() - for task_id, task in pending_tasks.items(): - if int(utils[task['gpus'][0]]) < 60 and counter[task['gpus'][0]] >= 2 \ - and mems[task['gpus'][0]] > task['gpu_mem']: - entries_to_remove.append(task_id) - - t = Thread(target=launch_task_in_background, name='launch_task', args=(container, task_id,)) - t.start() + for token, task in pending_tasks.items(): + if int(utils[task['gpus'][0]]) < 10 and counter[task['gpus'][0]] >= 2 \ + and mem_frees[task['gpus'][0]] > task['gpu_mem']: + entries_to_remove.append(token) for k in entries_to_remove: pending_tasks.pop(k, None) @@ -87,6 +74,33 @@ class MyHandler(BaseHTTPRequestHandler): self.end_headers() self.wfile.write(bytes("pong", "utf-8")) + if req.path == "/debug": + msg = { + 'pending_tasks': pending_tasks, + 'id2token': id2token + } + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(bytes(json.dumps(msg), "utf-8")) + + if req.path == "/can_run": + res = "1" + try: + token = query.get('token')[0] + for i in range(0, 50): + if token in pending_tasks: + res = "0" + else: + break + time.sleep(0.1) + except Exception as e: + print(e) + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + self.wfile.write(bytes(res, "utf-8")) + elif req.path == "/logs": try: container_id = query.get('id')[0] @@ -118,8 +132,10 @@ class MyHandler(BaseHTTPRequestHandler): 'hostname': container.attrs['Config']['Hostname'], 'state': container.attrs['State'] } - # if container_id in pending_tasks: - # status['status'] = 'ready' + if container_id in id2token: + token = id2token[container_id] + if token in pending_tasks: + status['status'] = 'ready' if status['command'] is not None: status['command'] = ' '.join(container.attrs['Config']['Cmd']) msg = {'code': 0, 'status': status} @@ -158,6 +174,7 @@ class MyHandler(BaseHTTPRequestHandler): docker_hdfs_address = form.getvalue('hdfs_address') docker_hdfs_dir = form.getvalue('hdfs_dir') docker_gpu_mem = form.getvalue('gpu_mem') + token = generate_token(16) try: script = " ".join([ @@ -171,6 +188,7 @@ class MyHandler(BaseHTTPRequestHandler): "--cpus " + docker_cpu_limit, "--env repo=" + docker_workspace, "--env should_wait=" + docker_wait, + "--env should_cb=" + 'http://' + ClientExtHost + ':' + PORT + '/can_run?token=' + token, "--env output_dir=" + docker_output, "--env hdfs_address=" + docker_hdfs_address, "--env hdfs_dir=" + docker_hdfs_dir, @@ -185,7 +203,8 @@ class MyHandler(BaseHTTPRequestHandler): msg = {"code": 0, "id": output.decode('utf-8').rstrip('\n')} lock.acquire() - pending_tasks[msg['id']] = {'gpus': str(docker_gpus).split(','), 'gpu_mem': int(docker_gpu_mem)} + pending_tasks[token] = {'gpus': str(docker_gpus).split(','), 'gpu_mem': int(docker_gpu_mem)} + id2token[msg['id']] = token lock.release() if exit_code != 0: msg["code"] = 1 @@ -249,21 +268,30 @@ class MyHandler(BaseHTTPRequestHandler): self.send_error(404, 'File Not Found: %s' % self.path) +def event_trigger(): + client = docker.from_env() + for event in client.events(decode=True, filters={'event': 'die'}): + Thread(target=report).start() + print(event) + + def report(): - interval = 3 + try: + status, msg_gpu = execute(['nvidia-smi', '-q', '-x', '-f', 'status.xml']) + if not status: + print("execute failed, ", msg_gpu) + stats = get_gpu_status() + report_msg(stats) + t = Thread(target=launch_tasks, name='launch_tasks', args=(stats,)) + t.start() + except Exception as e: + print(e) + + +def reporter(): while True: - try: - status, msg_gpu = execute(['nvidia-smi', '-q', '-x', '-f', 'status.xml']) - if not status: - print("execute failed, ", msg_gpu) - stats = get_gpu_status() - report_msg(stats) - t = Thread(target=launch_tasks, name='launch_tasks', args=(stats,)) - t.start() - time.sleep(interval) - except Exception as e: - print(e) - time.sleep(interval) + report() + time.sleep(HeartbeatInterval) def execute(cmd): @@ -315,10 +343,7 @@ def get_gpu_status(): def report_msg(stats): - global last_version - global ver mem = psutil.virtual_memory() - post_fields = { 'id': ClientID, 'host': ClientHost, @@ -326,59 +351,29 @@ def report_msg(stats): 'cpu_num': multiprocessing.cpu_count(), 'cpu_load': os.getloadavg()[0], 'mem_total': math.floor(mem.total / (1024. ** 3)), - 'mem_available': math.floor(mem.available / (1024. ** 3)) + 'mem_available': math.floor(mem.available / (1024. ** 3)), + 'version': time.time() } - flag = False - if 'cpu_num' in last_version: # not null - if abs(last_version['cpu_num'] - post_fields['cpu_num']) > 0.0: - flag = True - if abs(last_version['cpu_load'] - post_fields['cpu_load']) / post_fields['cpu_num'] > 0.1: - flag = True - if abs(last_version['mem_total'] - post_fields['mem_total']) > 0.0: - flag = True - if abs(last_version['mem_available'] - post_fields['mem_available']) / post_fields['mem_available'] > 0.05: - flag = True - for i in range(len(stats)): - if abs(last_version['status'][i]['memory_total'] - post_fields['status'][i]['memory_total']) > 0.0: - flag = True - if abs(last_version['status'][i]['memory_free'] - post_fields['status'][i]['memory_free']) / \ - post_fields['status'][i]['memory_total'] > 0.05: - flag = True - if abs(last_version['status'][i]['utilization_gpu'] - post_fields['status'][i]['utilization_gpu']) > 25.0: - flag = True - else: - flag = True - - if flag: - ver = time.time() - last_version = post_fields - - post_fields['version'] = ver data = json.dumps(post_fields) - if flag: - print(ver) - print(post_fields) - producer = KafkaProducer(bootstrap_servers=KafkaBrokers) future = producer.send('yao', value=data.encode(), partition=0) - result = future.get(timeout=10) - # print(result) + result = future.get(timeout=5) def listener(): + global server try: # Create a web server and define the handler to manage the # incoming request - server = HTTPServer(('', PORT_NUMBER), MyHandler) - print('Started http server on port ', PORT_NUMBER) + server = HTTPServer(('', PORT), MyHandler) + print('Started http server on port ', PORT) # Wait forever for incoming http requests server.serve_forever() except KeyboardInterrupt: print('^C received, shutting down the web server') - server.socket.close() @@ -386,10 +381,12 @@ if __name__ == '__main__': os.environ["TZ"] = 'Asia/Shanghai' if hasattr(time, 'tzset'): time.tzset() - t1 = Thread(target=report) + t1 = Thread(target=reporter) t2 = Thread(target=listener) + t3 = Thread(target=event_trigger) t1.start() t2.start() - while True: - time.sleep(5) - pass + t3.start() + t1.join() + t2.join() + t3.join() diff --git a/mock.py b/mock.py index 8d3be0e..3c8d760 100644 --- a/mock.py +++ b/mock.py @@ -10,8 +10,8 @@ NUMS = os.getenv('NUMS', 1) ClientHost = os.getenv('ClientHost', "localhost") KafkaBrokers = os.getenv('KafkaBrokers', 'localhost:9092').split(',') - -PORT_NUMBER = 8000 +PORT = os.getenv('Port', 8000) +HeartbeatInterval = os.getenv('HeartbeatInterval', 5) class MyHandler(BaseHTTPRequestHandler): @@ -85,7 +85,6 @@ class MyHandler(BaseHTTPRequestHandler): def report(ClientID): - interval = 3 while True: try: stats = [] @@ -119,21 +118,21 @@ def report(ClientID): producer = KafkaProducer(bootstrap_servers=KafkaBrokers) future = producer.send('yao', value=data.encode(), partition=0) - result = future.get(timeout=10) - # print(result) + result = future.get(timeout=5) - time.sleep(interval) + time.sleep(HeartbeatInterval) except Exception as e: print(e) - time.sleep(interval) + time.sleep(HeartbeatInterval) def listener(): + global server try: # Create a web server and define the handler to manage the # incoming request - server = HTTPServer(('', PORT_NUMBER), MyHandler) - print('Started http server on port ', PORT_NUMBER) + server = HTTPServer(('', PORT), MyHandler) + print('Started http server on port ', PORT) # Wait forever for incoming http requests server.serve_forever() @@ -147,14 +146,18 @@ if __name__ == '__main__': os.environ["TZ"] = 'Asia/Shanghai' if hasattr(time, 'tzset'): time.tzset() - t1 = Thread(target=report) - - for i in range(0, int(NUMS)): - t = Thread(target=report, name=ClientHost + '_' + str(i), args=(ClientHost + '_' + str(i),)) - t.start() + threads = [] + for clientID in range(0, int(NUMS)): + t = Thread(target=report, name=ClientHost + '_' + str(clientID), args=(ClientHost + '_' + str(clientID),)) + threads.append(t) t2 = Thread(target=listener) - t2.start() - while True: - time.sleep(5) - pass + threads.append(t2) + + # Start all threads + for t in threads: + t.start() + + # Wait for all of them to finish + for t in threads: + t.join()