diff --git a/executor.py b/executor.py
index d261e9a..b3cfe1b 100644
--- a/executor.py
+++ b/executor.py
@@ -1,4 +1,5 @@
 #!/usr/bin/python
+import threading
 from http.server import BaseHTTPRequestHandler, HTTPServer
 import cgi
 import docker
@@ -7,6 +8,37 @@ from urllib import parse
 
 PORT_NUMBER = 8000
 
+lock = threading.Lock()
+pending_tasks = {}
+
+
+def launch_tasks(stats):
+    """Release pending tasks whose first GPU is below 75% utilization.
+
+    stats is the per-GPU list returned by monitor.get_gpu_status(); each
+    entry must provide 'utilization_gpu' (assumed numeric - TODO confirm).
+    `docker exec <task_id> pkill sleep` is run via the 'yao-agent-helper'
+    container and the task is then dropped from pending_tasks.
+    """
+    client = docker.from_env()
+    container = client.containers.get('yao-agent-helper')
+    entries_to_remove = []
+    # `with` releases the lock even when a lookup or exec_run raises.
+    with lock:
+        for task_id, task in pending_tasks.items():
+            # GPU ids are stored as strings, but stats is a list: use int.
+            if stats[int(task['gpus'][0])]['utilization_gpu'] < 75:
+                entries_to_remove.append(task_id)
+                script = " ".join([
+                    "docker exec",
+                    task_id,
+                    "pkill sleep"
+                ])
+                container.exec_run('sh -c \'' + script + '\'')
+
+        for k in entries_to_remove:
+            pending_tasks.pop(k, None)
+
 
 class MyHandler(BaseHTTPRequestHandler):
     # Handler for the GET requests
@@ -104,6 +136,11 @@ class MyHandler(BaseHTTPRequestHandler):
                 container = client.containers.get('yao-agent-helper')
                 exit_code, output = container.exec_run('sh -c \'' + script + '\'')
                 msg = {"code": 0, "id": output.decode('utf-8').rstrip('\n')}
+
+                # Queue only successful launches; on failure output is not an id.
+                if exit_code == 0:
+                    with lock:
+                        pending_tasks[msg['id']] = {'gpus': str(docker_gpus).split(',')}
                 if exit_code != 0:
                     msg["code"] = 1
             except Exception as e:
diff --git a/monitor.py b/monitor.py
index 069e0df..b27c62d 100644
--- a/monitor.py
+++ b/monitor.py
@@ -1,4 +1,5 @@
 import os
+import threading
 import time
 import subprocess
 import json
@@ -8,6 +9,7 @@ from kafka import KafkaProducer
 import multiprocessing
 import psutil
 import math
+from executor import launch_tasks
 
 ClientID = os.getenv('ClientID', 1)
 ClientHost = os.getenv('ClientHost', "localhost")
@@ -15,13 +17,16 @@ KafkaBrokers = os.getenv('KafkaBrokers', 'localhost:9092').split(',')
 
 
 def main():
-    interval = 10
+    interval = 5
     while True:
         try:
             status, msg_gpu = execute(['nvidia-smi', '-q', '-x', '-f', 'status.xml'])
             if not status:
                 print("execute failed, ", msg_gpu)
-            report_msg()
+            stats = get_gpu_status()
+            report_msg(stats)
+            t = threading.Thread(target=launch_tasks, name='launch_tasks', args=(stats,))
+            t.start()
             time.sleep(interval)
         except Exception as e:
             print(e)
@@ -38,7 +43,7 @@ def execute(cmd):
         return False, e
 
 
-def report_msg():
+def get_gpu_status():
     DOMTree = xml.dom.minidom.parse("status.xml")
     collection = DOMTree.documentElement
     gpus = collection.getElementsByTagName("gpu")
@@ -73,7 +78,10 @@ def report_msg():
         stat['power_draw'] = int(float(stat['power_draw'].split(' ')[0]))
         stats.append(stat)
 
+    return stats
+
 
+def report_msg(stats):
     mem = psutil.virtual_memory()
 
     post_fields = {