1
0
mirror of https://github.com/newnius/YAO-agent.git synced 2025-06-06 05:21:55 +00:00

add event driven heart beats

This commit is contained in:
Newnius 2020-05-23 18:22:12 +08:00
parent afc5b55a93
commit 055d554a43
2 changed files with 107 additions and 107 deletions

175
agent.py
View File

@ -14,61 +14,48 @@ from http.server import BaseHTTPRequestHandler, HTTPServer
import cgi
import docker
from urllib import parse
import random
import string
ClientID = os.getenv('ClientID', 1)
ClientHost = os.getenv('ClientHost', "localhost")
ClientExtHost = os.getenv('ClientExtHost', "localhost")
KafkaBrokers = os.getenv('KafkaBrokers', 'localhost:9092').split(',')
PORT_NUMBER = 8000
PORT = os.getenv('Port', 8000)
HeartbeatInterval = os.getenv('HeartbeatInterval', 5)
lock = Lock()
pending_tasks = {}
ver = 0
last_version = {}
# pending_tasks = {}
id2token = {}
counter = {}
def launch_task_in_background(container, task_id):
script = " ".join([
"docker exec",
task_id,
"pkill",
"sleep"
])
while True:
code = container.exec_run('sh -c \'' + script + '\'').exit_code
if code == 0:
break
time.sleep(0.1)
def generate_token(stringLength=8):
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(stringLength))
def launch_tasks(stats):
utils = {}
mems = {}
mem_frees = {}
for stat in stats:
utils[stat['uuid']] = stat['utilization_gpu']
if int(stat['utilization_gpu']) < 60:
if int(stat['utilization_gpu']) < 10:
if stat['uuid'] not in counter:
counter[stat['uuid']] = 0
counter[stat['uuid']] += 1
else:
counter[stat['uuid']] = 0
mems[stat['uuid']] = stat['memory_free']
mem_frees[stat['uuid']] = stat['memory_free']
client = docker.from_env()
container = client.containers.get('yao-agent-helper')
entries_to_remove = []
lock.acquire()
for task_id, task in pending_tasks.items():
if int(utils[task['gpus'][0]]) < 60 and counter[task['gpus'][0]] >= 2 \
and mems[task['gpus'][0]] > task['gpu_mem']:
entries_to_remove.append(task_id)
t = Thread(target=launch_task_in_background, name='launch_task', args=(container, task_id,))
t.start()
for token, task in pending_tasks.items():
if int(utils[task['gpus'][0]]) < 10 and counter[task['gpus'][0]] >= 2 \
and mem_frees[task['gpus'][0]] > task['gpu_mem']:
entries_to_remove.append(token)
for k in entries_to_remove:
pending_tasks.pop(k, None)
@ -87,6 +74,33 @@ class MyHandler(BaseHTTPRequestHandler):
self.end_headers()
self.wfile.write(bytes("pong", "utf-8"))
if req.path == "/debug":
msg = {
'pending_tasks': pending_tasks,
'id2token': id2token
}
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
if req.path == "/can_run":
res = "1"
try:
token = query.get('token')[0]
for i in range(0, 50):
if token in pending_tasks:
res = "0"
else:
break
time.sleep(0.1)
except Exception as e:
print(e)
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(bytes(res, "utf-8"))
elif req.path == "/logs":
try:
container_id = query.get('id')[0]
@ -118,8 +132,10 @@ class MyHandler(BaseHTTPRequestHandler):
'hostname': container.attrs['Config']['Hostname'],
'state': container.attrs['State']
}
# if container_id in pending_tasks:
# status['status'] = 'ready'
if container_id in id2token:
token = id2token[container_id]
if token in pending_tasks:
status['status'] = 'ready'
if status['command'] is not None:
status['command'] = ' '.join(container.attrs['Config']['Cmd'])
msg = {'code': 0, 'status': status}
@ -158,6 +174,7 @@ class MyHandler(BaseHTTPRequestHandler):
docker_hdfs_address = form.getvalue('hdfs_address')
docker_hdfs_dir = form.getvalue('hdfs_dir')
docker_gpu_mem = form.getvalue('gpu_mem')
token = generate_token(16)
try:
script = " ".join([
@ -171,6 +188,7 @@ class MyHandler(BaseHTTPRequestHandler):
"--cpus " + docker_cpu_limit,
"--env repo=" + docker_workspace,
"--env should_wait=" + docker_wait,
"--env should_cb=" + 'http://' + ClientExtHost + ':' + PORT + '/can_run?token=' + token,
"--env output_dir=" + docker_output,
"--env hdfs_address=" + docker_hdfs_address,
"--env hdfs_dir=" + docker_hdfs_dir,
@ -185,7 +203,8 @@ class MyHandler(BaseHTTPRequestHandler):
msg = {"code": 0, "id": output.decode('utf-8').rstrip('\n')}
lock.acquire()
pending_tasks[msg['id']] = {'gpus': str(docker_gpus).split(','), 'gpu_mem': int(docker_gpu_mem)}
pending_tasks[token] = {'gpus': str(docker_gpus).split(','), 'gpu_mem': int(docker_gpu_mem)}
id2token[msg['id']] = token
lock.release()
if exit_code != 0:
msg["code"] = 1
@ -249,21 +268,30 @@ class MyHandler(BaseHTTPRequestHandler):
self.send_error(404, 'File Not Found: %s' % self.path)
def event_trigger():
client = docker.from_env()
for event in client.events(decode=True, filters={'event': 'die'}):
Thread(target=report).start()
print(event)
def report():
interval = 3
try:
status, msg_gpu = execute(['nvidia-smi', '-q', '-x', '-f', 'status.xml'])
if not status:
print("execute failed, ", msg_gpu)
stats = get_gpu_status()
report_msg(stats)
t = Thread(target=launch_tasks, name='launch_tasks', args=(stats,))
t.start()
except Exception as e:
print(e)
def reporter():
while True:
try:
status, msg_gpu = execute(['nvidia-smi', '-q', '-x', '-f', 'status.xml'])
if not status:
print("execute failed, ", msg_gpu)
stats = get_gpu_status()
report_msg(stats)
t = Thread(target=launch_tasks, name='launch_tasks', args=(stats,))
t.start()
time.sleep(interval)
except Exception as e:
print(e)
time.sleep(interval)
report()
time.sleep(HeartbeatInterval)
def execute(cmd):
@ -315,10 +343,7 @@ def get_gpu_status():
def report_msg(stats):
global last_version
global ver
mem = psutil.virtual_memory()
post_fields = {
'id': ClientID,
'host': ClientHost,
@ -326,59 +351,29 @@ def report_msg(stats):
'cpu_num': multiprocessing.cpu_count(),
'cpu_load': os.getloadavg()[0],
'mem_total': math.floor(mem.total / (1024. ** 3)),
'mem_available': math.floor(mem.available / (1024. ** 3))
'mem_available': math.floor(mem.available / (1024. ** 3)),
'version': time.time()
}
flag = False
if 'cpu_num' in last_version: # not null
if abs(last_version['cpu_num'] - post_fields['cpu_num']) > 0.0:
flag = True
if abs(last_version['cpu_load'] - post_fields['cpu_load']) / post_fields['cpu_num'] > 0.1:
flag = True
if abs(last_version['mem_total'] - post_fields['mem_total']) > 0.0:
flag = True
if abs(last_version['mem_available'] - post_fields['mem_available']) / post_fields['mem_available'] > 0.05:
flag = True
for i in range(len(stats)):
if abs(last_version['status'][i]['memory_total'] - post_fields['status'][i]['memory_total']) > 0.0:
flag = True
if abs(last_version['status'][i]['memory_free'] - post_fields['status'][i]['memory_free']) / \
post_fields['status'][i]['memory_total'] > 0.05:
flag = True
if abs(last_version['status'][i]['utilization_gpu'] - post_fields['status'][i]['utilization_gpu']) > 25.0:
flag = True
else:
flag = True
if flag:
ver = time.time()
last_version = post_fields
post_fields['version'] = ver
data = json.dumps(post_fields)
if flag:
print(ver)
print(post_fields)
producer = KafkaProducer(bootstrap_servers=KafkaBrokers)
future = producer.send('yao', value=data.encode(), partition=0)
result = future.get(timeout=10)
# print(result)
result = future.get(timeout=5)
def listener():
global server
try:
# Create a web server and define the handler to manage the
# incoming request
server = HTTPServer(('', PORT_NUMBER), MyHandler)
print('Started http server on port ', PORT_NUMBER)
server = HTTPServer(('', PORT), MyHandler)
print('Started http server on port ', PORT)
# Wait forever for incoming http requests
server.serve_forever()
except KeyboardInterrupt:
print('^C received, shutting down the web server')
server.socket.close()
@ -386,10 +381,12 @@ if __name__ == '__main__':
os.environ["TZ"] = 'Asia/Shanghai'
if hasattr(time, 'tzset'):
time.tzset()
t1 = Thread(target=report)
t1 = Thread(target=reporter)
t2 = Thread(target=listener)
t3 = Thread(target=event_trigger)
t1.start()
t2.start()
while True:
time.sleep(5)
pass
t3.start()
t1.join()
t2.join()
t3.join()

39
mock.py
View File

@ -10,8 +10,8 @@ NUMS = os.getenv('NUMS', 1)
ClientHost = os.getenv('ClientHost', "localhost")
KafkaBrokers = os.getenv('KafkaBrokers', 'localhost:9092').split(',')
PORT_NUMBER = 8000
PORT = os.getenv('Port', 8000)
HeartbeatInterval = os.getenv('HeartbeatInterval', 5)
class MyHandler(BaseHTTPRequestHandler):
@ -85,7 +85,6 @@ class MyHandler(BaseHTTPRequestHandler):
def report(ClientID):
interval = 3
while True:
try:
stats = []
@ -119,21 +118,21 @@ def report(ClientID):
producer = KafkaProducer(bootstrap_servers=KafkaBrokers)
future = producer.send('yao', value=data.encode(), partition=0)
result = future.get(timeout=10)
# print(result)
result = future.get(timeout=5)
time.sleep(interval)
time.sleep(HeartbeatInterval)
except Exception as e:
print(e)
time.sleep(interval)
time.sleep(HeartbeatInterval)
def listener():
global server
try:
# Create a web server and define the handler to manage the
# incoming request
server = HTTPServer(('', PORT_NUMBER), MyHandler)
print('Started http server on port ', PORT_NUMBER)
server = HTTPServer(('', PORT), MyHandler)
print('Started http server on port ', PORT)
# Wait forever for incoming http requests
server.serve_forever()
@ -147,14 +146,18 @@ if __name__ == '__main__':
os.environ["TZ"] = 'Asia/Shanghai'
if hasattr(time, 'tzset'):
time.tzset()
t1 = Thread(target=report)
for i in range(0, int(NUMS)):
t = Thread(target=report, name=ClientHost + '_' + str(i), args=(ClientHost + '_' + str(i),))
t.start()
threads = []
for clientID in range(0, int(NUMS)):
t = Thread(target=report, name=ClientHost + '_' + str(clientID), args=(ClientHost + '_' + str(clientID),))
threads.append(t)
t2 = Thread(target=listener)
t2.start()
while True:
time.sleep(5)
pass
threads.append(t2)
# Start all threads
for t in threads:
t.start()
# Wait for all of them to finish
for t in threads:
t.join()