diff --git a/agent.py b/agent.py index 1f3c714..5361f6e 100644 --- a/agent.py +++ b/agent.py @@ -286,8 +286,7 @@ def report(): print("execute failed, ", msg_gpu) stats = get_gpu_status() report_msg(stats) - t = Thread(target=launch_tasks, name='launch_tasks', args=(stats,)) - t.start() + Thread(target=launch_tasks, name='launch_tasks', args=(stats,)).start() except Exception as e: print(e) @@ -385,19 +384,12 @@ if __name__ == '__main__': os.environ["TZ"] = 'Asia/Shanghai' if hasattr(time, 'tzset'): time.tzset() - threads = [] - t1 = Thread(target=reporter) - threads.append(t1) - t2 = Thread(target=listener) - threads.append(t2) + + Thread(target=reporter).start() + Thread(target=listener).start() if EnableEventTrigger == 'true': - t3 = Thread(target=event_trigger) - threads.append(t3) + print('start event trigger') + Thread(target=event_trigger).start() - # Start all threads - for t in threads: - t.start() - - # Wait for all of them to finish - for t in threads: - t.join() + while True: + time.sleep(5) diff --git a/monitor.py b/monitor.py index b27c62d..b9a6f31 100644 --- a/monitor.py +++ b/monitor.py @@ -1,32 +1,29 @@ import os -import threading import time import subprocess import json from xml.dom.minidom import parse import xml.dom.minidom -from kafka import KafkaProducer import multiprocessing import psutil import math -from executor import launch_tasks - -ClientID = os.getenv('ClientID', 1) -ClientHost = os.getenv('ClientHost', "localhost") -KafkaBrokers = os.getenv('KafkaBrokers', 'localhost:9092').split(',') +import datetime def main(): - interval = 5 + interval = 1 while True: try: status, msg_gpu = execute(['nvidia-smi', '-q', '-x', '-f', 'status.xml']) if not status: print("execute failed, ", msg_gpu) stats = get_gpu_status() - report_msg(stats) - t = threading.Thread(target=launch_tasks, name='launch_tasks',args=(stats,)) - t.start() + utils = [] + for stat in stats: + utils.append(str(stat['utilization_gpu'])) + # report_msg(stats) + t = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(str(t) + ',' + ','.join(utils)) time.sleep(interval) except Exception as e: print(e) @@ -85,8 +82,6 @@ def report_msg(stats): mem = psutil.virtual_memory() post_fields = { - 'id': ClientID, - 'host': ClientHost, 'status': stats, 'cpu_num': multiprocessing.cpu_count(), 'cpu_load': os.getloadavg()[0], @@ -95,11 +90,6 @@ def report_msg(stats): } data = json.dumps(post_fields) - producer = KafkaProducer(bootstrap_servers=KafkaBrokers) - future = producer.send('yao', value=data.encode(), partition=0) - result = future.get(timeout=10) - print(result) - if __name__ == '__main__': os.environ["TZ"] = 'Asia/Shanghai'