1
0
mirror of https://github.com/newnius/YAO-agent.git synced 2025-06-06 21:31:55 +00:00
YAO-agent/agent.py

544 lines
16 KiB
Python
Raw Normal View History

2019-12-26 08:34:31 +00:00
import os
2019-12-26 08:47:04 +00:00
from threading import Thread
from threading import Lock
2019-12-26 08:34:31 +00:00
import time
import subprocess
import json
from xml.dom.minidom import parse
import xml.dom.minidom
from kafka import KafkaProducer
import multiprocessing
import psutil
import math
from http.server import BaseHTTPRequestHandler, HTTPServer
2020-05-27 12:49:18 +00:00
from socketserver import ThreadingMixIn
2019-12-26 08:34:31 +00:00
import cgi
import docker
from urllib import parse
2020-05-23 10:22:12 +00:00
import random
import string
2020-06-03 01:46:54 +00:00
from pathlib import Path
2019-12-26 08:34:31 +00:00
ClientID = os.getenv('ClientID', 1)
ClientHost = os.getenv('ClientHost', "localhost")
2020-05-23 10:22:12 +00:00
ClientExtHost = os.getenv('ClientExtHost', "localhost")
2019-12-26 08:34:31 +00:00
KafkaBrokers = os.getenv('KafkaBrokers', 'localhost:9092').split(',')
2020-05-26 03:27:39 +00:00
RackID = os.getenv('RackID', "default")
DomainID = os.getenv('DomainID', "default")
2020-05-23 12:20:52 +00:00
PORT = int(os.getenv('Port', 8000))
HeartbeatInterval = int(os.getenv('HeartbeatInterval', 5))
2019-12-26 08:34:31 +00:00
2020-05-23 10:34:21 +00:00
EnableEventTrigger = os.getenv('EnableEventTrigger', 'true')
2019-12-26 08:47:04 +00:00
lock = Lock()
2020-05-23 10:45:44 +00:00
pending_tasks = {}
2020-05-23 10:22:12 +00:00
id2token = {}
2020-03-27 08:52:07 +00:00
2020-04-30 04:41:15 +00:00
counter = {}
2020-05-23 18:12:18 +00:00
event_counter = 0
client = docker.from_env()
2020-06-09 14:08:51 +00:00
taskStats = {}
taskStatsLock = Lock()
2020-06-24 09:10:26 +00:00
active_stats = {0: {
'util': 0,
'mem_util': 0,
'mem': 0
}}
2019-12-26 08:34:31 +00:00
2020-05-23 10:22:12 +00:00
def generate_token(stringLength=8):
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(stringLength))
2020-04-11 02:51:51 +00:00
2020-06-09 14:08:51 +00:00
def monitor_task(container_id):
print(container_id)
container = client.containers.get(container_id)
2020-06-24 09:10:26 +00:00
pid = 0
2020-06-09 14:08:51 +00:00
maxCPU = 0
maxMem = 0
last_bw_rx = 0
last_bw_tx = 0
last_time = time.time() - 1
for statR in container.stats():
2020-06-24 09:10:26 +00:00
if pid == 0:
res = container.top()['Processes']
for x in res:
if "/workspace" in x[7] and int(x[1]) in active_stats:
pid = int(x[1])
break
2020-06-09 14:08:51 +00:00
stat = json.loads(statR)
# print(stat)
if stat['read'] == '0001-01-01T00:00:00Z':
print('container ', container_id, ' exited')
break
taskStatsLock.acquire()
# CPU load, x%
cur = stat['cpu_stats']['cpu_usage']['total_usage']
last = stat['precpu_stats']['cpu_usage']['total_usage']
utilCPU = (cur - last) / 10000000
# Memory, MB
mem = stat['memory_stats']['stats']['active_anon']
mem = mem / 1024
mem = mem / 1024
# Bandwidth, KB/s
cur = stat['networks']['eth0']['rx_bytes'] / 1024
bw_rx = cur - last_bw_rx
last_bw_rx = cur
cur = stat['networks']['eth0']['tx_bytes'] / 1024
bw_tx = cur - last_bw_tx
last_bw_tx = cur
now = time.time()
dur = now - last_time
last_time = now
bw_rx /= dur
bw_tx /= dur
2020-06-24 09:10:26 +00:00
taskStats[container_id] = {
'cpu': utilCPU,
'mem': mem,
'bw_rx': bw_rx,
'bw_tx': bw_tx,
'gpu_util': active_stats[pid]['util'],
'gpu_mem_util': active_stats[pid]['mem_util'],
'gpu_mem': active_stats[pid]['mem'],
}
# print(taskStats[container_id])
2020-06-09 14:08:51 +00:00
# print(utilCPU, mem, maxCPU, maxMem, bw_rx, bw_tx)
taskStatsLock.release()
if stat['preread'] == '0001-01-01T00:00:00Z':
continue
if utilCPU > maxCPU:
maxCPU = utilCPU
if mem > maxMem:
maxMem = mem
2019-12-26 08:34:31 +00:00
def launch_tasks(stats):
2019-12-26 08:53:41 +00:00
utils = {}
2020-05-23 10:22:12 +00:00
mem_frees = {}
2019-12-26 08:53:41 +00:00
for stat in stats:
utils[stat['uuid']] = stat['utilization_gpu']
2020-05-23 10:22:12 +00:00
if int(stat['utilization_gpu']) < 10:
2020-04-30 04:41:15 +00:00
if stat['uuid'] not in counter:
counter[stat['uuid']] = 0
counter[stat['uuid']] += 1
else:
counter[stat['uuid']] = 0
2020-05-23 10:22:12 +00:00
mem_frees[stat['uuid']] = stat['memory_free']
2019-12-26 08:53:41 +00:00
2019-12-26 08:34:31 +00:00
entries_to_remove = []
lock.acquire()
2020-05-23 10:22:12 +00:00
for token, task in pending_tasks.items():
if int(utils[task['gpus'][0]]) < 10 and counter[task['gpus'][0]] >= 2 \
and mem_frees[task['gpus'][0]] > task['gpu_mem']:
entries_to_remove.append(token)
2019-12-26 08:34:31 +00:00
for k in entries_to_remove:
pending_tasks.pop(k, None)
lock.release()
class MyHandler(BaseHTTPRequestHandler):
# Handler for the GET requests
def do_GET(self):
req = parse.urlparse(self.path)
query = parse.parse_qs(req.query)
if req.path == "/ping":
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(bytes("pong", "utf-8"))
2020-05-23 12:20:52 +00:00
elif req.path == "/debug":
2020-05-23 10:22:12 +00:00
msg = {
'pending_tasks': pending_tasks,
2020-05-23 18:12:18 +00:00
'id2token': id2token,
2020-06-09 14:08:51 +00:00
'event_counter': event_counter,
'taskStats': taskStats
2020-05-23 10:22:12 +00:00
}
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
2020-05-23 12:20:52 +00:00
elif req.path == "/can_run":
2020-05-23 10:22:12 +00:00
res = "1"
try:
token = query.get('token')[0]
for i in range(0, 50):
if token in pending_tasks:
res = "0"
else:
break
time.sleep(0.1)
except Exception as e:
print(e)
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(bytes(res, "utf-8"))
2019-12-26 08:34:31 +00:00
elif req.path == "/logs":
try:
container_id = query.get('id')[0]
container = client.containers.get(container_id)
msg = {'code': 0, 'logs': str(container.logs().decode())}
except Exception as e:
msg = {'code': 1, 'error': str(e)}
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
elif req.path == "/status":
try:
container_id = query.get('id')[0]
container = client.containers.list(all=True, filters={'id': container_id})
if len(container) > 0:
container = container[0]
status = {
'id': container.short_id,
'image': container.attrs['Config']['Image'],
'image_digest': container.attrs['Image'],
'command': container.attrs['Config']['Cmd'],
'created_at': container.attrs['Created'],
'finished_at': container.attrs['State']['FinishedAt'],
'status': container.status,
'hostname': container.attrs['Config']['Hostname'],
2020-06-09 14:08:51 +00:00
'state': container.attrs['State'],
'cpu': 0,
'mem': 0,
'bw_rx': 0,
'bw_tx': 0
2019-12-26 08:34:31 +00:00
}
2020-06-09 14:08:51 +00:00
if container_id in taskStats:
taskStatsLock.acquire()
status['cpu'] = taskStats[container_id]['cpu']
status['mem'] = taskStats[container_id]['mem']
status['bw_rx'] = taskStats[container_id]['bw_rx']
status['bw_tx'] = taskStats[container_id]['bw_tx']
2020-06-24 09:10:26 +00:00
status['bw_tx'] = taskStats[container_id]['bw_tx']
status['gpu_util'] = taskStats[container_id]['gpu_util']
status['gpu_mem_util'] = taskStats[container_id]['gpu_mem_util']
status['gpu_mem'] = taskStats[container_id]['gpu_mem']
2020-06-09 14:08:51 +00:00
taskStatsLock.release()
2020-05-23 10:22:12 +00:00
if container_id in id2token:
token = id2token[container_id]
if token in pending_tasks:
status['status'] = 'ready'
2020-05-23 12:20:52 +00:00
else:
id2token.pop(container_id, None)
2019-12-26 08:34:31 +00:00
if status['command'] is not None:
status['command'] = ' '.join(container.attrs['Config']['Cmd'])
msg = {'code': 0, 'status': status}
else:
msg = {'code': 1, 'error': "container not exist"}
except Exception as e:
msg = {'code': 2, 'error': str(e)}
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
else:
self.send_error(404, 'File Not Found: %s' % self.path)
# Handler for the POST requests
def do_POST(self):
if self.path == "/create":
form = cgi.FieldStorage(
fp=self.rfile,
headers=self.headers,
environ={
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': self.headers['Content-Type'],
})
docker_image = form.getvalue('image')
docker_name = form.getvalue('name')
docker_cmd = form.getvalue('cmd')
docker_workspace = form.getvalue('workspace')
docker_gpus = form.getvalue('gpus')
docker_mem_limit = form.getvalue('mem_limit')
docker_cpu_limit = form.getvalue('cpu_limit')
docker_network = form.getvalue('network')
2020-04-30 04:41:15 +00:00
docker_wait = form.getvalue('should_wait')
2020-05-04 04:13:42 +00:00
docker_output = form.getvalue('output_dir')
2020-05-04 07:20:19 +00:00
docker_hdfs_address = form.getvalue('hdfs_address')
2020-05-04 04:13:42 +00:00
docker_hdfs_dir = form.getvalue('hdfs_dir')
docker_gpu_mem = form.getvalue('gpu_mem')
2020-06-02 13:54:19 +00:00
docker_dfs_src = form.getvalue('dfs_src')
docker_dfs_dst = form.getvalue('dfs_dst')
2020-05-23 10:22:12 +00:00
token = generate_token(16)
2019-12-26 08:34:31 +00:00
2020-06-02 13:54:19 +00:00
try:
2020-06-03 01:46:54 +00:00
# Docker wouldn't create dir by default on bind mode,
# see https://github.com/moby/moby/issues/13121
path = Path(docker_dfs_src)
path.mkdir(parents=True, exist_ok=True)
2020-06-02 13:54:19 +00:00
except OSError as e:
print("Creation of the directory %s failed" % docker_dfs_src)
print(e)
2019-12-26 08:34:31 +00:00
try:
2020-06-09 14:08:51 +00:00
# set PYTHONUNBUFFERED=1 to output immediately
# see https://tarunlalwani.com/post/why-delayed-output-python-docker/
2019-12-26 08:34:31 +00:00
script = " ".join([
"docker run",
2019-12-26 11:16:05 +00:00
"--gpus '\"device=" + docker_gpus + "\"'",
2019-12-26 08:34:31 +00:00
"--detach=True",
"--hostname " + docker_name,
"--network " + docker_network,
"--network-alias " + docker_name,
"--memory-reservation " + docker_mem_limit,
"--cpus " + docker_cpu_limit,
"--env repo=" + docker_workspace,
2020-04-30 04:41:15 +00:00
"--env should_wait=" + docker_wait,
2020-05-23 12:20:52 +00:00
"--env should_cb=" + 'http://' + ClientExtHost + ':' + str(PORT) + '/can_run?token=' + token,
2020-05-04 04:13:42 +00:00
"--env output_dir=" + docker_output,
2020-05-04 07:20:19 +00:00
"--env hdfs_address=" + docker_hdfs_address,
2020-05-04 04:13:42 +00:00
"--env hdfs_dir=" + docker_hdfs_dir,
"--env gpu_mem=" + docker_gpu_mem,
2020-06-04 07:32:52 +00:00
"--env PYTHONUNBUFFERED=1",
2020-06-02 13:54:19 +00:00
"--mount type=bind,src=" + docker_dfs_src + ",dst=" + docker_dfs_dst,
2019-12-26 08:34:31 +00:00
docker_image,
docker_cmd
])
container = client.containers.get('yao-agent-helper')
2019-12-26 11:14:18 +00:00
exit_code, output = container.exec_run(['sh', '-c', script])
2019-12-26 08:34:31 +00:00
msg = {"code": 0, "id": output.decode('utf-8').rstrip('\n')}
2020-06-09 14:08:51 +00:00
Thread(target=monitor_task, name='monitor_task', args=(msg['id'],)).start()
2020-05-26 15:00:02 +00:00
if docker_wait == "1":
lock.acquire()
pending_tasks[token] = {'gpus': str(docker_gpus).split(','), 'gpu_mem': int(docker_gpu_mem)}
id2token[msg['id']] = token
lock.release()
2019-12-26 08:34:31 +00:00
if exit_code != 0:
msg["code"] = 1
2020-01-15 12:54:43 +00:00
msg["error"] = output.decode('utf-8').rstrip('\n')
2020-05-05 07:10:31 +00:00
print(msg["error"])
2019-12-26 08:34:31 +00:00
except Exception as e:
msg = {"code": 1, "error": str(e)}
2019-12-26 09:02:09 +00:00
print(str(e))
2019-12-26 08:34:31 +00:00
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
elif self.path == "/stop":
form = cgi.FieldStorage(
fp=self.rfile,
headers=self.headers,
environ={
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': self.headers['Content-Type'],
})
container_id = form.getvalue('id')
try:
container = client.containers.get(container_id)
2020-04-12 12:56:19 +00:00
container.stop(timeout=1)
2019-12-26 08:34:31 +00:00
msg = {"code": 0, "error": "Success"}
except Exception as e:
msg = {"code": 1, "error": str(e)}
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
elif self.path == "/remove":
form = cgi.FieldStorage(
fp=self.rfile,
headers=self.headers,
environ={
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': self.headers['Content-Type'],
})
container_id = form.getvalue('id')
try:
container = client.containers.get(container_id)
container.remove(force=True)
msg = {"code": 0, "error": "Success"}
except Exception as e:
msg = {"code": 1, "error": str(e)}
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
else:
self.send_error(404, 'File Not Found: %s' % self.path)
2020-05-27 12:49:18 +00:00
class ThreadingSimpleServer(ThreadingMixIn, HTTPServer):
pass
2020-05-23 10:22:12 +00:00
def event_trigger():
2020-05-23 18:12:18 +00:00
global event_counter
2020-05-23 10:22:12 +00:00
for event in client.events(decode=True, filters={'event': 'die'}):
Thread(target=report).start()
2020-05-23 18:12:18 +00:00
event_counter += 1
2020-05-23 10:22:12 +00:00
print(event)
2019-12-26 08:34:31 +00:00
def report():
2020-05-23 10:22:12 +00:00
try:
status, msg_gpu = execute(['nvidia-smi', '-q', '-x', '-f', 'status.xml'])
if not status:
print("execute failed, ", msg_gpu)
stats = get_gpu_status()
report_msg(stats)
2020-05-23 16:09:02 +00:00
Thread(target=launch_tasks, name='launch_tasks', args=(stats,)).start()
2020-05-23 10:22:12 +00:00
except Exception as e:
print(e)
def reporter():
2019-12-26 08:34:31 +00:00
while True:
2020-05-23 10:22:12 +00:00
report()
time.sleep(HeartbeatInterval)
2019-12-26 08:34:31 +00:00
2020-06-24 09:10:26 +00:00
def pmon():
while True:
try:
status, msg_gpu = execute(['nvidia-smi', 'pmon', '-c', '1', '-s', 'um'])
if not status:
print("[WARN] execute failed, ", msg_gpu, status)
lists = msg_gpu.split('\n')
for p in lists:
if "#" not in p and "-" not in p:
tmp = p.split()
data = {
'idx': int(tmp[0]),
'pid': int(tmp[1]),
'util': int(tmp[3]),
'mem_util': int(tmp[4]),
'mem': int(tmp[7])
}
active_stats[int(tmp[1])] = data
except Exception as e:
print(e)
time.sleep(HeartbeatInterval)
2019-12-26 08:34:31 +00:00
def execute(cmd):
try:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
return True, result.stdout.decode('utf-8').rstrip('\n')
return False, result.stderr.decode('utf-8').rstrip('\n')
except Exception as e:
return False, e
def get_gpu_status():
DOMTree = xml.dom.minidom.parse("status.xml")
collection = DOMTree.documentElement
gpus = collection.getElementsByTagName("gpu")
stats = []
for gpu in gpus:
stat = {
'uuid': gpu.getElementsByTagName('uuid')[0].childNodes[0].data,
'product_name': gpu.getElementsByTagName('product_name')[0].childNodes[0].data,
'performance_state': gpu.getElementsByTagName('performance_state')[0].childNodes[0].data,
'memory_total': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('total')[0].childNodes[
0].data,
'memory_free': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('free')[0].childNodes[
0].data,
'memory_used': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('used')[0].childNodes[
0].data,
'utilization_gpu':
gpu.getElementsByTagName('utilization')[0].getElementsByTagName('gpu_util')[0].childNodes[0].data,
'utilization_mem':
gpu.getElementsByTagName('utilization')[0].getElementsByTagName('memory_util')[0].childNodes[0].data,
'temperature_gpu':
gpu.getElementsByTagName('temperature')[0].getElementsByTagName('gpu_temp')[0].childNodes[0].data,
'power_draw':
gpu.getElementsByTagName('power_readings')[0].getElementsByTagName('power_draw')[0].childNodes[0].data
}
stat['memory_total'] = int(float(stat['memory_total'].split(' ')[0]))
stat['memory_free'] = int(float(stat['memory_free'].split(' ')[0]))
stat['memory_used'] = int(float(stat['memory_used'].split(' ')[0]))
stat['utilization_gpu'] = int(float(stat['utilization_gpu'].split(' ')[0]))
stat['utilization_mem'] = int(float(stat['utilization_mem'].split(' ')[0]))
stat['temperature_gpu'] = int(float(stat['temperature_gpu'].split(' ')[0]))
stat['power_draw'] = int(float(stat['power_draw'].split(' ')[0]))
stats.append(stat)
return stats
def report_msg(stats):
mem = psutil.virtual_memory()
post_fields = {
'id': ClientID,
2020-05-26 03:27:39 +00:00
'rack': RackID,
'domain': DomainID,
2019-12-26 08:34:31 +00:00
'host': ClientHost,
'status': stats,
'cpu_num': multiprocessing.cpu_count(),
'cpu_load': os.getloadavg()[0],
'mem_total': math.floor(mem.total / (1024. ** 3)),
2020-05-23 10:22:12 +00:00
'mem_available': math.floor(mem.available / (1024. ** 3)),
'version': time.time()
2019-12-26 08:34:31 +00:00
}
2020-03-27 08:52:07 +00:00
2019-12-26 08:34:31 +00:00
data = json.dumps(post_fields)
producer = KafkaProducer(bootstrap_servers=KafkaBrokers)
future = producer.send('yao', value=data.encode(), partition=0)
2020-05-23 10:22:12 +00:00
result = future.get(timeout=5)
2019-12-26 08:34:31 +00:00
2019-12-26 08:49:13 +00:00
def listener():
2020-05-23 10:22:12 +00:00
global server
2019-12-26 08:34:31 +00:00
try:
# Create a web server and define the handler to manage the
# incoming request
2020-05-27 12:49:18 +00:00
server = ThreadingSimpleServer(('', PORT), MyHandler)
2020-05-23 10:22:12 +00:00
print('Started http server on port ', PORT)
2019-12-26 08:34:31 +00:00
# Wait forever for incoming http requests
server.serve_forever()
except KeyboardInterrupt:
print('^C received, shutting down the web server')
server.socket.close()
2019-12-26 11:39:58 +00:00
if __name__ == '__main__':
os.environ["TZ"] = 'Asia/Shanghai'
if hasattr(time, 'tzset'):
time.tzset()
2020-05-23 10:34:21 +00:00
2020-05-23 16:09:02 +00:00
Thread(target=reporter).start()
Thread(target=listener).start()
2020-06-24 09:10:26 +00:00
Thread(target=pmon).start()
2020-05-23 16:09:02 +00:00
if EnableEventTrigger == 'true':
print('start event trigger')
Thread(target=event_trigger).start()
2020-05-23 10:34:21 +00:00
2020-05-23 16:09:02 +00:00
while True:
time.sleep(5)