2019-12-26 08:34:31 +00:00
|
|
|
import os
|
2019-12-26 08:47:04 +00:00
|
|
|
from threading import Thread
|
|
|
|
from threading import Lock
|
2019-12-26 08:34:31 +00:00
|
|
|
import time
|
|
|
|
import subprocess
|
|
|
|
import json
|
|
|
|
from xml.dom.minidom import parse
|
|
|
|
import xml.dom.minidom
|
|
|
|
from kafka import KafkaProducer
|
|
|
|
import multiprocessing
|
|
|
|
import psutil
|
|
|
|
import math
|
|
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
|
|
import cgi
|
|
|
|
import docker
|
|
|
|
from urllib import parse
|
2020-05-23 10:22:12 +00:00
|
|
|
import random
|
|
|
|
import string
|
2019-12-26 08:34:31 +00:00
|
|
|
|
|
|
|
# --- Agent identity / placement (sent in every heartbeat) -------------------
# Note: ClientID is the int 1 when unset, but a str when taken from the env.
ClientID = os.environ.get('ClientID', 1)
ClientHost = os.environ.get('ClientHost', "localhost")
# Externally reachable host, embedded in the /can_run callback URL given to
# containers.
ClientExtHost = os.environ.get('ClientExtHost', "localhost")
RackID = os.environ.get('RackID', "default")
DomainID = os.environ.get('DomainID', "default")

# --- Messaging / HTTP / timing ----------------------------------------------
# Comma-separated list of Kafka bootstrap servers.
KafkaBrokers = os.environ.get('KafkaBrokers', 'localhost:9092').split(',')
PORT = int(os.environ.get('Port', 8000))
# Seconds between heartbeats sent by reporter().
HeartbeatInterval = int(os.environ.get('HeartbeatInterval', 5))
# Only the exact string 'true' starts the docker event listener.
EnableEventTrigger = os.environ.get('EnableEventTrigger', 'true')

# --- State shared between the HTTP handler and background threads ----------
lock = Lock()        # taken when pending_tasks / id2token are modified
pending_tasks = {}   # token -> {'gpus': [gpu ids], 'gpu_mem': int}
id2token = {}        # container id (docker run output) -> token
counter = {}         # GPU uuid -> consecutive reports with utilization < 10 %
event_counter = 0    # number of docker 'die' events seen by event_trigger()

client = docker.from_env()
|
|
|
|
|
2019-12-26 08:34:31 +00:00
|
|
|
|
2020-05-23 10:22:12 +00:00
|
|
|
def generate_token(stringLength=8):
    """Return a random string made of ``stringLength`` lowercase ASCII letters."""
    alphabet = string.ascii_lowercase
    picks = [random.choice(alphabet) for _ in range(stringLength)]
    return "".join(picks)
|
2020-04-11 02:51:51 +00:00
|
|
|
|
|
|
|
|
2019-12-26 08:34:31 +00:00
|
|
|
def launch_tasks(stats):
    """Release pending tasks whose GPU has become idle.

    Args:
        stats: per-GPU dicts as built by get_gpu_status(); this reads the
            'uuid', 'utilization_gpu' and 'memory_free' entries.

    Side effects (all performed while holding ``lock``):
        * ``counter[uuid]`` counts consecutive calls in which that GPU's
          utilization was below 10 %; it is reset to 0 otherwise.
        * A task is removed from ``pending_tasks`` (so /can_run answers "1"
          for its token) once its first GPU is below 10 % utilization, has
          been so for at least 2 consecutive calls, and has more free memory
          than the task's 'gpu_mem'.

    Tasks whose first GPU does not appear in ``stats`` simply stay pending
    instead of raising KeyError while the lock is held.
    """
    utils = {}
    mem_frees = {}
    ready = []
    # `counter` and `pending_tasks` are shared with other launch_tasks threads
    # and with the HTTP handler. `with` guarantees the lock is released even
    # if a stat or task entry turns out to be malformed.
    with lock:
        for stat in stats:
            uuid = stat['uuid']
            utils[uuid] = stat['utilization_gpu']
            mem_frees[uuid] = stat['memory_free']
            if int(stat['utilization_gpu']) < 10:
                counter[uuid] = counter.get(uuid, 0) + 1
            else:
                counter[uuid] = 0

        for token, task in pending_tasks.items():
            gpus = task.get('gpus') or []
            # Only the first requested GPU is checked.
            gpu = gpus[0] if gpus else None
            if gpu not in utils:
                # Not in this report (possibly a device index rather than a
                # UUID, or a GPU nvidia-smi did not list): keep it pending.
                continue
            if (int(utils[gpu]) < 10 and counter[gpu] >= 2
                    and mem_frees[gpu] > task['gpu_mem']):
                ready.append(token)

        for token in ready:
            pending_tasks.pop(token, None)
|
|
|
|
|
|
|
|
|
|
|
|
class MyHandler(BaseHTTPRequestHandler):
    """HTTP API of the agent.

    GET routes:  /ping, /debug, /can_run?token=, /logs?id=, /status?id=
    POST routes: /create, /stop, /remove (form-encoded bodies)

    Every known route answers HTTP 200; failures are reported through the
    ``code`` field of the JSON body. Unknown paths get a 404.
    """

    # Form fields /create reads. All are required: each one is spliced into
    # the `docker run` command line.
    _CREATE_FIELDS = (
        'image', 'name', 'cmd', 'workspace', 'gpus', 'mem_limit',
        'cpu_limit', 'network', 'should_wait', 'output_dir',
        'hdfs_address', 'hdfs_dir', 'gpu_mem',
    )

    # ----- response / request helpers ---------------------------------------

    def _send_text(self, body):
        """Send ``body`` as a 200 response (labelled application/json)."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(bytes(body, "utf-8"))

    def _send_json(self, payload):
        """Serialize ``payload`` with json.dumps and send it as a 200 response."""
        self._send_text(json.dumps(payload))

    def _parse_form(self):
        """Parse the POST body into a cgi.FieldStorage."""
        return cgi.FieldStorage(
            fp=self.rfile,
            headers=self.headers,
            environ={
                'REQUEST_METHOD': 'POST',
                'CONTENT_TYPE': self.headers['Content-Type'],
            })

    @staticmethod
    def _forget_task(token):
        """Drop ``token`` from pending_tasks (no-op if it is not there)."""
        with lock:
            pending_tasks.pop(token, None)

    @staticmethod
    def _build_run_script(fields, callback_url):
        """Return the `docker run` shell line for /create, every value quoted.

        Args:
            fields: maps each name in ``_CREATE_FIELDS`` to its string value.
            callback_url: URL passed to the container as ``should_cb``.

        The values come straight from the HTTP request and the line is run
        with ``sh -c``, so each value goes through ``shlex.quote``. ``cmd``
        is first split with shell quoting rules, so it still becomes
        separate argv entries for the container.
        """
        import shlex  # local: only this helper needs it

        q = shlex.quote
        parts = [
            "docker run",
            # docker wants the literal double quotes around a device list.
            "--gpus " + q('"device=' + fields['gpus'] + '"'),
            "--detach=True",
            "--hostname " + q(fields['name']),
            "--network " + q(fields['network']),
            "--network-alias " + q(fields['name']),
            "--memory-reservation " + q(fields['mem_limit']),
            "--cpus " + q(fields['cpu_limit']),
            "--env " + q("repo=" + fields['workspace']),
            "--env " + q("should_wait=" + fields['should_wait']),
            "--env " + q("should_cb=" + callback_url),
            "--env " + q("output_dir=" + fields['output_dir']),
            "--env " + q("hdfs_address=" + fields['hdfs_address']),
            "--env " + q("hdfs_dir=" + fields['hdfs_dir']),
            "--env " + q("gpu_mem=" + fields['gpu_mem']),
            q(fields['image']),
        ]
        parts.extend(q(arg) for arg in shlex.split(fields['cmd']))
        return " ".join(parts)

    # ----- GET ---------------------------------------------------------------

    def do_GET(self):
        """Dispatch a GET request by path; unknown paths get a 404."""
        req = parse.urlparse(self.path)
        query = parse.parse_qs(req.query)
        routes = {
            "/ping": self._get_ping,
            "/debug": self._get_debug,
            "/can_run": self._get_can_run,
            "/logs": self._get_logs,
            "/status": self._get_status,
        }
        handler = routes.get(req.path)
        if handler is None:
            self.send_error(404, 'File Not Found: %s' % self.path)
        else:
            handler(query)

    def _get_ping(self, query):
        """Liveness probe: answers the bare string "pong"."""
        self._send_text("pong")

    def _get_debug(self, query):
        """Dump pending_tasks, id2token and event_counter as JSON."""
        # Copy under the lock: serializing a dict that another thread is
        # resizing can fail with "dictionary changed size during iteration".
        with lock:
            msg = {
                'pending_tasks': dict(pending_tasks),
                'id2token': dict(id2token),
                'event_counter': event_counter,
            }
        self._send_json(msg)

    def _get_can_run(self, query):
        """Long-poll gate for containers started with should_wait == "1".

        Waits up to ~5 s (50 x 0.1 s) for ``token`` to leave pending_tasks,
        then answers "0" if it is still pending and "1" otherwise. If the
        token parameter is missing, the error is printed and "1" is sent.
        """
        res = "1"
        try:
            token = query.get('token')[0]
            for _ in range(50):
                if token not in pending_tasks:
                    break
                time.sleep(0.1)
            # Decide on the final state, so a task released during the wait
            # gets "1" instead of a stale "0" from an earlier iteration.
            res = "0" if token in pending_tasks else "1"
        except Exception as e:
            print(e)
        self._send_text(res)

    def _get_logs(self, query):
        """Return the log output of container ``id`` as decoded text."""
        try:
            container = client.containers.get(query.get('id')[0])
            msg = {'code': 0, 'logs': str(container.logs().decode())}
        except Exception as e:
            msg = {'code': 1, 'error': str(e)}
        self._send_json(msg)

    def _get_status(self, query):
        """Describe container ``id``.

        Reports status 'ready' while the container's task is still waiting
        in pending_tasks. code 1 means no such container, code 2 any error.
        """
        try:
            container_id = query.get('id')[0]
            matches = client.containers.list(all=True, filters={'id': container_id})
            if matches:
                container = matches[0]
                attrs = container.attrs
                status = {
                    'id': container.short_id,
                    'image': attrs['Config']['Image'],
                    'image_digest': attrs['Image'],
                    'command': attrs['Config']['Cmd'],
                    'created_at': attrs['Created'],
                    'finished_at': attrs['State']['FinishedAt'],
                    'status': container.status,
                    'hostname': attrs['Config']['Hostname'],
                    'state': attrs['State'],
                }
                with lock:
                    token = id2token.get(container_id)
                    if token is not None:
                        if token in pending_tasks:
                            # Still gated by /can_run.
                            status['status'] = 'ready'
                        else:
                            # Task released; the mapping is no longer needed.
                            id2token.pop(container_id, None)
                if status['command'] is not None:
                    status['command'] = ' '.join(attrs['Config']['Cmd'])
                msg = {'code': 0, 'status': status}
            else:
                msg = {'code': 1, 'error': "container not exist"}
        except Exception as e:
            msg = {'code': 2, 'error': str(e)}
        self._send_json(msg)

    # ----- POST --------------------------------------------------------------

    def do_POST(self):
        """Dispatch a POST request by path; unknown paths get a 404."""
        routes = {
            "/create": self._post_create,
            "/stop": self._post_stop,
            "/remove": self._post_remove,
        }
        handler = routes.get(self.path)
        if handler is None:
            self.send_error(404, 'File Not Found: %s' % self.path)
        else:
            handler()

    def _post_create(self):
        """Start a container via `docker run` inside `yao-agent-helper`.

        With should_wait == "1", the task is registered in pending_tasks
        *before* the container starts, so the container's /can_run callback
        cannot race ahead of the registration. If the launch fails, the task
        is unregistered again.

        Responds {"code": 0, "id": <docker run output>} on success, or
        code 1 plus "error" on failure.
        """
        form = self._parse_form()
        fields = {name: form.getvalue(name) for name in self._CREATE_FIELDS}
        token = generate_token(16)
        should_wait = fields['should_wait'] == "1"

        try:
            missing = [name for name, value in fields.items() if value is None]
            if missing:
                raise ValueError('missing form field(s): ' + ', '.join(missing))

            callback = ('http://' + ClientExtHost + ':' + str(PORT)
                        + '/can_run?token=' + token)
            script = self._build_run_script(fields, callback)

            if should_wait:
                # Parse before taking the lock so a bad gpu_mem cannot leave it held.
                task = {'gpus': str(fields['gpus']).split(','),
                        'gpu_mem': int(fields['gpu_mem'])}
                with lock:
                    pending_tasks[token] = task

            try:
                helper = client.containers.get('yao-agent-helper')
                exit_code, output = helper.exec_run(['sh', '-c', script])
                container_id = output.decode('utf-8').rstrip('\n')
            except Exception:
                self._forget_task(token)
                raise

            msg = {"code": 0, "id": container_id}
            if exit_code != 0:
                # `docker run` failed: its output is the error text, and no
                # container will ever poll /can_run for this token.
                self._forget_task(token)
                msg["code"] = 1
                msg["error"] = container_id
                print(msg["error"])
            elif should_wait:
                with lock:
                    id2token[container_id] = token
        except Exception as e:
            msg = {"code": 1, "error": str(e)}
            print(str(e))

        self._send_json(msg)

    def _post_stop(self):
        """Stop container ``id`` with a 1 s stop timeout."""
        container_id = self._parse_form().getvalue('id')
        try:
            client.containers.get(container_id).stop(timeout=1)
            msg = {"code": 0, "error": "Success"}
        except Exception as e:
            msg = {"code": 1, "error": str(e)}
        self._send_json(msg)

    def _post_remove(self):
        """Force-remove container ``id``."""
        container_id = self._parse_form().getvalue('id')
        try:
            client.containers.get(container_id).remove(force=True)
            msg = {"code": 0, "error": "Success"}
        except Exception as e:
            msg = {"code": 1, "error": str(e)}
        self._send_json(msg)
|
|
|
|
|
|
|
|
|
2020-05-23 10:22:12 +00:00
|
|
|
def event_trigger():
    """Send an extra report each time a container dies.

    Iterates the docker event stream filtered to 'die' events. For each
    event it starts a report() thread, increments the global
    ``event_counter`` and prints the decoded event.
    """
    global event_counter
    die_events = client.events(decode=True, filters={'event': 'die'})
    for evt in die_events:
        Thread(target=report).start()
        event_counter += 1
        print(evt)
|
|
|
|
|
|
|
|
|
2019-12-26 08:34:31 +00:00
|
|
|
def report():
    """Collect GPU stats, publish a heartbeat, then try to release tasks.

    Runs nvidia-smi to write status.xml, parses that file, sends the stats
    via report_msg(), and hands them to launch_tasks() on a new thread.
    Any exception is printed and swallowed, so callers keep running.
    """
    try:
        ok, output = execute(['nvidia-smi', '-q', '-x', '-f', 'status.xml'])
        if not ok:
            # NOTE(review): execution continues and parses whatever
            # status.xml is already on disk, which may be stale.
            print("execute failed, ", output)
        gpu_stats = get_gpu_status()
        report_msg(gpu_stats)
        worker = Thread(target=launch_tasks, name='launch_tasks', args=(gpu_stats,))
        worker.start()
    except Exception as err:
        print(err)
|
|
|
|
|
|
|
|
|
|
|
|
def reporter():
    """Heartbeat loop: call report() forever, HeartbeatInterval seconds apart."""
    while True:
        # report() catches and prints its own exceptions, so the loop survives.
        report()
        time.sleep(HeartbeatInterval)
|
2019-12-26 08:34:31 +00:00
|
|
|
|
|
|
|
|
|
|
|
def execute(cmd):
    """Run ``cmd`` (an argv list, no shell) and capture its output.

    Returns:
        ``(True, stdout)`` when the exit status is 0, otherwise
        ``(False, stderr)``; trailing newlines are stripped. If the process
        cannot be started or its output is not valid UTF-8, returns
        ``(False, <exception text>)``. The second element is always a str
        (previously the failure path returned the exception object itself).
    """
    try:
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if result.returncode == 0:
            return True, result.stdout.decode('utf-8').rstrip('\n')
        return False, result.stderr.decode('utf-8').rstrip('\n')
    except Exception as e:
        # Best-effort boundary: callers only print the message.
        return False, str(e)
|
|
|
|
|
|
|
|
|
|
|
|
def _xml_text(node, *tags):
    """Return the text of the first element reached by descending through ``tags``."""
    for tag in tags:
        node = node.getElementsByTagName(tag)[0]
    return node.childNodes[0].data


def _leading_int(text):
    """Convert a reading such as "16160 MiB" or "25.50 W" to an int (truncating)."""
    return int(float(text.split(' ')[0]))


def _power_draw(gpu):
    """Return the <power_draw> reading of a <gpu> element in whole watts, or 0.

    Looks under <power_readings> and then <gpu_power_readings> (reportedly
    the section name used by newer drivers; confirm against your
    nvidia-smi version). Returns 0 when neither section exists or when the
    value is missing or non-numeric (e.g. "N/A").
    """
    for section in ('power_readings', 'gpu_power_readings'):
        nodes = gpu.getElementsByTagName(section)
        if not nodes:
            continue
        try:
            return _leading_int(_xml_text(nodes[0], 'power_draw'))
        except (IndexError, ValueError):
            return 0
    return 0


def get_gpu_status(path="status.xml"):
    """Parse an ``nvidia-smi -q -x`` XML report into one dict per <gpu>.

    Args:
        path: XML file to read. The default is the status.xml that report()
            asks nvidia-smi to write.

    Returns:
        A list of dicts in document order with keys:
          * 'uuid', 'product_name', 'performance_state' (strings);
          * 'memory_total', 'memory_free', 'memory_used',
            'utilization_gpu', 'utilization_mem', 'temperature_gpu' (ints:
            the leading number of readings such as "16160 MiB" or "5 %");
          * 'power_draw' (int watts, 0 when unavailable; see _power_draw).

    Raises:
        IndexError / ValueError if any field other than power_draw is
        missing or non-numeric.
    """
    root = xml.dom.minidom.parse(path).documentElement
    stats = []
    for gpu in root.getElementsByTagName("gpu"):
        stats.append({
            'uuid': _xml_text(gpu, 'uuid'),
            'product_name': _xml_text(gpu, 'product_name'),
            'performance_state': _xml_text(gpu, 'performance_state'),
            'memory_total': _leading_int(_xml_text(gpu, 'fb_memory_usage', 'total')),
            'memory_free': _leading_int(_xml_text(gpu, 'fb_memory_usage', 'free')),
            'memory_used': _leading_int(_xml_text(gpu, 'fb_memory_usage', 'used')),
            'utilization_gpu': _leading_int(_xml_text(gpu, 'utilization', 'gpu_util')),
            'utilization_mem': _leading_int(_xml_text(gpu, 'utilization', 'memory_util')),
            'temperature_gpu': _leading_int(_xml_text(gpu, 'temperature', 'gpu_temp')),
            # Telemetry only: an unavailable reading should not block the heartbeat.
            'power_draw': _power_draw(gpu),
        })
    return stats
|
|
|
|
|
|
|
|
|
|
|
|
def report_msg(stats):
    """Publish one heartbeat for this node to Kafka topic 'yao', partition 0.

    Args:
        stats: per-GPU dicts from get_gpu_status().

    The JSON payload contains:
      * the node identity (id/rack/domain/host);
      * the GPU stats;
      * the CPU count and the 1-minute load average;
      * total and available RAM in whole GiB (floored);
      * ``version`` = the current Unix time.

    Waits up to 5 s for the send to complete. Kafka errors propagate to the
    caller.
    """
    mem = psutil.virtual_memory()
    post_fields = {
        'id': ClientID,
        'rack': RackID,
        'domain': DomainID,
        'host': ClientHost,
        'status': stats,
        'cpu_num': multiprocessing.cpu_count(),
        'cpu_load': os.getloadavg()[0],
        'mem_total': math.floor(mem.total / (1024. ** 3)),
        'mem_available': math.floor(mem.available / (1024. ** 3)),
        'version': time.time()
    }
    data = json.dumps(post_fields)

    # A producer is created per heartbeat. Close it afterwards: a
    # KafkaProducer keeps network connections (and a background sender
    # thread in kafka-python) alive until close(), so each call used to
    # leak them. The bounded timeout keeps an unreachable broker from
    # hanging the caller in close().
    producer = KafkaProducer(bootstrap_servers=KafkaBrokers)
    try:
        future = producer.send('yao', value=data.encode(), partition=0)
        future.get(timeout=5)
    finally:
        producer.close(timeout=5)
|
2019-12-26 08:34:31 +00:00
|
|
|
|
|
|
|
|
2019-12-26 08:49:13 +00:00
|
|
|
def listener():
    """Serve the agent's HTTP API (MyHandler) on PORT; blocks until shutdown.

    The server object is stored in the module global ``server``. Requests
    are handled on one thread each. /can_run long-polls for up to ~5 s, and
    under the single-threaded HTTPServer every other request (/create,
    /status, ...) queued behind it.
    """
    global server
    from socketserver import ThreadingMixIn

    class _ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
        # Per-request threads must not keep the process alive on exit.
        daemon_threads = True

    server = _ThreadingHTTPServer(('', PORT), MyHandler)
    print('Started http server on port ', PORT)
    try:
        # Wait forever for incoming http requests
        server.serve_forever()
    except KeyboardInterrupt:
        print('^C received, shutting down the web server')
    finally:
        # Release the listening socket however serve_forever() ends.
        server.server_close()
|
|
|
|
|
|
|
|
|
2019-12-26 11:39:58 +00:00
|
|
|
if __name__ == '__main__':
    # Run the process in the Asia/Shanghai timezone; time.tzset() only
    # exists on Unix.
    os.environ["TZ"] = 'Asia/Shanghai'
    if hasattr(time, 'tzset'):
        time.tzset()

    # Heartbeat loop and HTTP API always run.
    for target in (reporter, listener):
        Thread(target=target).start()

    # Optional: extra heartbeat whenever a container dies.
    if EnableEventTrigger == 'true':
        print('start event trigger')
        Thread(target=event_trigger).start()

    # All work happens in the threads above; keep the main thread idle.
    while True:
        time.sleep(5)
|