mirror of
https://github.com/newnius/YAO-agent.git
synced 2025-12-12 21:16:44 +00:00
all in one
This commit is contained in:
320
main.py
Normal file
320
main.py
Normal file
@@ -0,0 +1,320 @@
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import subprocess
|
||||
import json
|
||||
from xml.dom.minidom import parse
|
||||
import xml.dom.minidom
|
||||
from kafka import KafkaProducer
|
||||
import multiprocessing
|
||||
import psutil
|
||||
import math
|
||||
from executor import launch_tasks
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
import cgi
|
||||
import docker
|
||||
from urllib import parse
|
||||
|
||||
ClientID = os.getenv('ClientID', 1)
|
||||
ClientHost = os.getenv('ClientHost', "localhost")
|
||||
KafkaBrokers = os.getenv('KafkaBrokers', 'localhost:9092').split(',')
|
||||
|
||||
PORT_NUMBER = 8000
|
||||
|
||||
lock = threading.Lock()
|
||||
pending_tasks = {}
|
||||
|
||||
|
||||
def launch_tasks(stats):
|
||||
client = docker.from_env()
|
||||
container = client.containers.get('yao-agent-helper')
|
||||
entries_to_remove = []
|
||||
lock.acquire()
|
||||
for task_id, task in pending_tasks.items():
|
||||
if stats[task['gpus'][0]]['utilization_gpu'] < 75:
|
||||
entries_to_remove.append(task_id)
|
||||
script = " ".join([
|
||||
"docker exec",
|
||||
id,
|
||||
"pkill sleep"
|
||||
])
|
||||
container.exec_run('sh -c \'' + script + '\'')
|
||||
|
||||
for k in entries_to_remove:
|
||||
pending_tasks.pop(k, None)
|
||||
lock.release()
|
||||
|
||||
|
||||
class MyHandler(BaseHTTPRequestHandler):
|
||||
# Handler for the GET requests
|
||||
def do_GET(self):
|
||||
req = parse.urlparse(self.path)
|
||||
query = parse.parse_qs(req.query)
|
||||
|
||||
if req.path == "/ping":
|
||||
self.send_response(200)
|
||||
self.send_header('Content-type', 'application/json')
|
||||
self.end_headers()
|
||||
self.wfile.write(bytes("pong", "utf-8"))
|
||||
|
||||
elif req.path == "/logs":
|
||||
try:
|
||||
container_id = query.get('id')[0]
|
||||
client = docker.from_env()
|
||||
container = client.containers.get(container_id)
|
||||
msg = {'code': 0, 'logs': str(container.logs().decode())}
|
||||
except Exception as e:
|
||||
msg = {'code': 1, 'error': str(e)}
|
||||
self.send_response(200)
|
||||
self.send_header('Content-type', 'application/json')
|
||||
self.end_headers()
|
||||
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
|
||||
|
||||
elif req.path == "/status":
|
||||
try:
|
||||
container_id = query.get('id')[0]
|
||||
client = docker.from_env()
|
||||
container = client.containers.list(all=True, filters={'id': container_id})
|
||||
if len(container) > 0:
|
||||
container = container[0]
|
||||
status = {
|
||||
'id': container.short_id,
|
||||
'image': container.attrs['Config']['Image'],
|
||||
'image_digest': container.attrs['Image'],
|
||||
'command': container.attrs['Config']['Cmd'],
|
||||
'created_at': container.attrs['Created'],
|
||||
'finished_at': container.attrs['State']['FinishedAt'],
|
||||
'status': container.status,
|
||||
'hostname': container.attrs['Config']['Hostname'],
|
||||
'state': container.attrs['State']
|
||||
}
|
||||
if status['command'] is not None:
|
||||
status['command'] = ' '.join(container.attrs['Config']['Cmd'])
|
||||
msg = {'code': 0, 'status': status}
|
||||
else:
|
||||
msg = {'code': 1, 'error': "container not exist"}
|
||||
except Exception as e:
|
||||
msg = {'code': 2, 'error': str(e)}
|
||||
self.send_response(200)
|
||||
self.send_header('Content-type', 'application/json')
|
||||
self.end_headers()
|
||||
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
|
||||
|
||||
else:
|
||||
self.send_error(404, 'File Not Found: %s' % self.path)
|
||||
|
||||
# Handler for the POST requests
|
||||
def do_POST(self):
|
||||
if self.path == "/create":
|
||||
form = cgi.FieldStorage(
|
||||
fp=self.rfile,
|
||||
headers=self.headers,
|
||||
environ={
|
||||
'REQUEST_METHOD': 'POST',
|
||||
'CONTENT_TYPE': self.headers['Content-Type'],
|
||||
})
|
||||
docker_image = form.getvalue('image')
|
||||
docker_name = form.getvalue('name')
|
||||
docker_cmd = form.getvalue('cmd')
|
||||
docker_workspace = form.getvalue('workspace')
|
||||
docker_gpus = form.getvalue('gpus')
|
||||
docker_mem_limit = form.getvalue('mem_limit')
|
||||
docker_cpu_limit = form.getvalue('cpu_limit')
|
||||
docker_network = form.getvalue('network')
|
||||
|
||||
try:
|
||||
script = " ".join([
|
||||
"docker run",
|
||||
"--gpus '\"device=" + docker_gpus + "\"'",
|
||||
"--detach=True",
|
||||
"--hostname " + docker_name,
|
||||
"--network " + docker_network,
|
||||
"--network-alias " + docker_name,
|
||||
"--memory-reservation " + docker_mem_limit,
|
||||
"--cpus " + docker_cpu_limit,
|
||||
"--env repo=" + docker_workspace,
|
||||
docker_image,
|
||||
docker_cmd
|
||||
])
|
||||
|
||||
client = docker.from_env()
|
||||
container = client.containers.get('yao-agent-helper')
|
||||
exit_code, output = container.exec_run('sh -c \'' + script + '\'')
|
||||
msg = {"code": 0, "id": output.decode('utf-8').rstrip('\n')}
|
||||
|
||||
lock.acquire()
|
||||
pending_tasks[msg['id']] = {'gpus': str(docker_gpus).split(',')}
|
||||
lock.release()
|
||||
if exit_code != 0:
|
||||
msg["code"] = 1
|
||||
except Exception as e:
|
||||
msg = {"code": 1, "error": str(e)}
|
||||
|
||||
self.send_response(200)
|
||||
self.send_header('Content-type', 'application/json')
|
||||
self.end_headers()
|
||||
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
|
||||
|
||||
elif self.path == "/stop":
|
||||
form = cgi.FieldStorage(
|
||||
fp=self.rfile,
|
||||
headers=self.headers,
|
||||
environ={
|
||||
'REQUEST_METHOD': 'POST',
|
||||
'CONTENT_TYPE': self.headers['Content-Type'],
|
||||
})
|
||||
container_id = form.getvalue('id')
|
||||
|
||||
try:
|
||||
client = docker.from_env()
|
||||
container = client.containers.get(container_id)
|
||||
container.stop()
|
||||
msg = {"code": 0, "error": "Success"}
|
||||
except Exception as e:
|
||||
msg = {"code": 1, "error": str(e)}
|
||||
|
||||
self.send_response(200)
|
||||
self.send_header('Content-type', 'application/json')
|
||||
self.end_headers()
|
||||
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
|
||||
|
||||
elif self.path == "/remove":
|
||||
form = cgi.FieldStorage(
|
||||
fp=self.rfile,
|
||||
headers=self.headers,
|
||||
environ={
|
||||
'REQUEST_METHOD': 'POST',
|
||||
'CONTENT_TYPE': self.headers['Content-Type'],
|
||||
})
|
||||
container_id = form.getvalue('id')
|
||||
|
||||
try:
|
||||
client = docker.from_env()
|
||||
container = client.containers.get(container_id)
|
||||
container.remove(force=True)
|
||||
msg = {"code": 0, "error": "Success"}
|
||||
except Exception as e:
|
||||
msg = {"code": 1, "error": str(e)}
|
||||
|
||||
self.send_response(200)
|
||||
self.send_header('Content-type', 'application/json')
|
||||
self.end_headers()
|
||||
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
|
||||
else:
|
||||
self.send_error(404, 'File Not Found: %s' % self.path)
|
||||
|
||||
|
||||
def report():
|
||||
interval = 5
|
||||
while True:
|
||||
try:
|
||||
status, msg_gpu = execute(['nvidia-smi', '-q', '-x', '-f', 'status.xml'])
|
||||
if not status:
|
||||
print("execute failed, ", msg_gpu)
|
||||
stats = get_gpu_status()
|
||||
report_msg(stats)
|
||||
t = threading.Thread(target=launch_tasks, name='launch_tasks', args=(stats,))
|
||||
t.start()
|
||||
time.sleep(interval)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
time.sleep(interval)
|
||||
|
||||
|
||||
def execute(cmd):
|
||||
try:
|
||||
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
if result.returncode == 0:
|
||||
return True, result.stdout.decode('utf-8').rstrip('\n')
|
||||
return False, result.stderr.decode('utf-8').rstrip('\n')
|
||||
except Exception as e:
|
||||
return False, e
|
||||
|
||||
|
||||
def get_gpu_status():
|
||||
DOMTree = xml.dom.minidom.parse("status.xml")
|
||||
collection = DOMTree.documentElement
|
||||
gpus = collection.getElementsByTagName("gpu")
|
||||
stats = []
|
||||
for gpu in gpus:
|
||||
stat = {
|
||||
'uuid': gpu.getElementsByTagName('uuid')[0].childNodes[0].data,
|
||||
'product_name': gpu.getElementsByTagName('product_name')[0].childNodes[0].data,
|
||||
'performance_state': gpu.getElementsByTagName('performance_state')[0].childNodes[0].data,
|
||||
'memory_total': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('total')[0].childNodes[
|
||||
0].data,
|
||||
'memory_free': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('free')[0].childNodes[
|
||||
0].data,
|
||||
'memory_used': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('used')[0].childNodes[
|
||||
0].data,
|
||||
'utilization_gpu':
|
||||
gpu.getElementsByTagName('utilization')[0].getElementsByTagName('gpu_util')[0].childNodes[0].data,
|
||||
'utilization_mem':
|
||||
gpu.getElementsByTagName('utilization')[0].getElementsByTagName('memory_util')[0].childNodes[0].data,
|
||||
'temperature_gpu':
|
||||
gpu.getElementsByTagName('temperature')[0].getElementsByTagName('gpu_temp')[0].childNodes[0].data,
|
||||
'power_draw':
|
||||
gpu.getElementsByTagName('power_readings')[0].getElementsByTagName('power_draw')[0].childNodes[0].data
|
||||
}
|
||||
|
||||
stat['memory_total'] = int(float(stat['memory_total'].split(' ')[0]))
|
||||
stat['memory_free'] = int(float(stat['memory_free'].split(' ')[0]))
|
||||
stat['memory_used'] = int(float(stat['memory_used'].split(' ')[0]))
|
||||
stat['utilization_gpu'] = int(float(stat['utilization_gpu'].split(' ')[0]))
|
||||
stat['utilization_mem'] = int(float(stat['utilization_mem'].split(' ')[0]))
|
||||
stat['temperature_gpu'] = int(float(stat['temperature_gpu'].split(' ')[0]))
|
||||
stat['power_draw'] = int(float(stat['power_draw'].split(' ')[0]))
|
||||
|
||||
stats.append(stat)
|
||||
return stats
|
||||
|
||||
|
||||
def report_msg(stats):
|
||||
mem = psutil.virtual_memory()
|
||||
|
||||
post_fields = {
|
||||
'id': ClientID,
|
||||
'host': ClientHost,
|
||||
'status': stats,
|
||||
'cpu_num': multiprocessing.cpu_count(),
|
||||
'cpu_load': os.getloadavg()[0],
|
||||
'mem_total': math.floor(mem.total / (1024. ** 3)),
|
||||
'mem_available': math.floor(mem.available / (1024. ** 3))
|
||||
}
|
||||
data = json.dumps(post_fields)
|
||||
|
||||
producer = KafkaProducer(bootstrap_servers=KafkaBrokers)
|
||||
future = producer.send('yao', value=data.encode(), partition=0)
|
||||
result = future.get(timeout=10)
|
||||
print(result)
|
||||
|
||||
|
||||
def listen():
|
||||
try:
|
||||
# Create a web server and define the handler to manage the
|
||||
# incoming request
|
||||
server = HTTPServer(('', PORT_NUMBER), MyHandler)
|
||||
print('Started http server on port ', PORT_NUMBER)
|
||||
|
||||
# Wait forever for incoming http requests
|
||||
server.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
print('^C received, shutting down the web server')
|
||||
|
||||
server.socket.close()
|
||||
|
||||
|
||||
def main():
|
||||
t1 = threading.Thread(target=report())
|
||||
t2 = threading.Thread(target=listen())
|
||||
t1.start()
|
||||
t2.start()
|
||||
t1.join()
|
||||
t2.join()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
os.environ["TZ"] = 'Asia/Shanghai'
|
||||
if hasattr(time, 'tzset'):
|
||||
time.tzset()
|
||||
main()
|
||||
Reference in New Issue
Block a user