1
0
mirror of https://github.com/newnius/YAO-agent.git synced 2025-06-06 21:31:55 +00:00
YAO-agent/executor.py

207 lines
5.8 KiB
Python
Raw Normal View History

2019-04-13 11:39:29 +00:00
#!/usr/bin/python
2019-12-26 08:21:17 +00:00
import cgi
import json
import shlex
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib import parse

import docker
2019-03-12 08:28:04 +00:00
2019-04-13 11:39:29 +00:00
# TCP port the agent's HTTP API is served on (see the HTTPServer at the bottom).
PORT_NUMBER = 8000

# Serializes access to ``pending_tasks``, which both the /create request
# handler and ``launch_tasks`` read and modify.
lock = threading.Lock()

# Tasks started by /create that have not been released yet: maps the id printed
# by the ``docker run --detach`` call to ``{'gpus': [gpu_id_str, ...]}``.
pending_tasks = dict()
def launch_tasks(stats, utilization_threshold=75):
    """Release pending tasks whose first GPU is below a utilization threshold.

    For each entry in ``pending_tasks``, the utilization of the task's first
    GPU is read from ``stats``. When it is below ``utilization_threshold``,
    ``docker exec <task_id> pkill sleep`` is run through the
    ``yao-agent-helper`` container and the entry is removed.
    NOTE(review): presumably the task container blocks on ``sleep`` until it
    is released this way -- confirm against the task images.

    Args:
        stats: indexable by the GPU ids stored in ``task['gpus']`` (strings
            produced by the /create handler's ``split(',')``); each entry must
            provide ``'utilization_gpu'`` comparable with the threshold.
            NOTE(review): the caller is not visible here -- confirm the key
            type matches those string ids.
        utilization_threshold: a task is released only when its GPU
            utilization is strictly below this value (default 75).

    Errors raised by the ``stats`` lookup or by docker-py propagate to the
    caller; the lock is released in every case.
    """
    client = docker.from_env()
    helper = client.containers.get('yao-agent-helper')
    released = []
    # ``with`` guarantees the lock is released even if a lookup or docker call
    # raises; otherwise every later /create request would block on it forever.
    with lock:
        for task_id, task in pending_tasks.items():
            if stats[task['gpus'][0]]['utilization_gpu'] < utilization_threshold:
                released.append(task_id)
                # argv list: no shell is involved, so the id needs no quoting.
                helper.exec_run(['docker', 'exec', task_id, 'pkill', 'sleep'])
        # Removal happens after the loop; the dict must not change size while
        # it is being iterated.
        for task_id in released:
            pending_tasks.pop(task_id, None)
2019-03-12 08:28:04 +00:00
2019-04-13 11:39:29 +00:00
class MyHandler(BaseHTTPRequestHandler):
    """HTTP API for launching and managing Docker containers on this node.

    GET endpoints:
        /ping          plain-text ``pong`` liveness probe.
        /logs?id=X     ``{"code": 0, "logs": ...}`` for container X.
        /status?id=X   ``{"code": 0, "status": {...}}`` for container X.
    POST endpoints (form bodies parsed with ``cgi.FieldStorage``):
        /create        start a container with ``docker run``, executed inside
                       the ``yao-agent-helper`` container; returns its id.
        /stop          stop the container given by form field ``id``.
        /remove        force-remove the container given by form field ``id``.

    Handled endpoints always answer HTTP 200; failures are signalled by a
    non-zero ``code`` and an ``error`` string in the JSON body. Unknown paths
    get a 404.
    """

    # Form fields required by /create, in the order they are validated.
    _CREATE_FIELDS = ('image', 'name', 'cmd', 'workspace', 'gpus',
                      'mem_limit', 'cpu_limit', 'network')

    # Handler for the GET requests
    def do_GET(self):
        """Dispatch GET requests for /ping, /logs and /status."""
        req = parse.urlparse(self.path)
        query = parse.parse_qs(req.query)

        if req.path == "/ping":
            # The body is plain text, so label it as such (not JSON).
            self.send_response(200)
            self.send_header('Content-type', 'text/plain')
            self.end_headers()
            self.wfile.write(bytes("pong", "utf-8"))

        elif req.path == "/logs":
            try:
                container_id = self._first(query, 'id')
                client = docker.from_env()
                container = client.containers.get(container_id)
                msg = {'code': 0, 'logs': container.logs().decode()}
            except Exception as e:
                msg = {'code': 1, 'error': str(e)}
            self._send_json(msg)

        elif req.path == "/status":
            try:
                container_id = self._first(query, 'id')
                client = docker.from_env()
                # all=True: also consider containers that are not running.
                containers = client.containers.list(
                    all=True, filters={'id': container_id})
                if containers:
                    msg = {'code': 0, 'status': self._describe(containers[0])}
                else:
                    msg = {'code': 1, 'error': "container not exist"}
            except Exception as e:
                msg = {'code': 2, 'error': str(e)}
            self._send_json(msg)

        else:
            self.send_error(404, 'File Not Found: %s' % self.path)

    # Handler for the POST requests
    def do_POST(self):
        """Dispatch POST requests for /create, /stop and /remove.

        The form is parsed outside the per-endpoint ``try`` blocks, so a
        malformed body aborts the request instead of producing a JSON error.
        """
        if self.path == "/create":
            self._send_json(self._create(self._parse_form()))
        elif self.path == "/stop":
            self._container_action(lambda container: container.stop())
        elif self.path == "/remove":
            self._container_action(
                lambda container: container.remove(force=True))
        else:
            self.send_error(404, 'File Not Found: %s' % self.path)

    def _send_json(self, msg):
        """Send *msg* as a JSON body with HTTP status 200."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(bytes(json.dumps(msg), "utf-8"))

    def _parse_form(self):
        """Parse the POST request body into a ``cgi.FieldStorage``.

        NOTE(review): ``cgi`` is deprecated since Python 3.11 and removed in
        3.13; this needs a replacement before moving to a newer interpreter.
        """
        return cgi.FieldStorage(
            fp=self.rfile,
            headers=self.headers,
            environ={
                'REQUEST_METHOD': 'POST',
                'CONTENT_TYPE': self.headers['Content-Type'],
            })

    @staticmethod
    def _first(query, key):
        """Return the first value of *key* in a ``parse.parse_qs`` result.

        Raises:
            ValueError: the parameter is absent (or empty), with a readable
                message instead of ``'NoneType' object is not subscriptable``.
        """
        values = query.get(key)
        if not values:
            raise ValueError("missing query parameter '%s'" % key)
        return values[0]

    @staticmethod
    def _describe(container):
        """Build the /status payload from a docker-py container object."""
        attrs = container.attrs
        cmd = attrs['Config']['Cmd']
        return {
            'id': container.short_id,
            'image': attrs['Config']['Image'],
            'image_digest': attrs['Image'],
            # Cmd is a list of argv words (or None); join it for display.
            'command': ' '.join(cmd) if cmd is not None else None,
            'created_at': attrs['Created'],
            'finished_at': attrs['State']['FinishedAt'],
            'status': container.status,
            'hostname': attrs['Config']['Hostname'],
            'state': attrs['State'],
        }

    @staticmethod
    def _build_run_script(values):
        """Return the ``docker run`` shell command line for /create *values*.

        Every value except ``cmd`` is passed through ``shlex.quote`` so that
        it reaches ``docker`` as exactly one argument. ``cmd`` is appended
        unquoted on purpose: the shell splits it into the container's argv.
        SECURITY NOTE(review): ``cmd`` is therefore interpreted by a shell
        that can drive the Docker CLI, and this API has no authentication;
        only expose it to trusted callers.
        """
        quote = shlex.quote
        return " ".join([
            "docker run",
            # Docker's GPU docs wrap device lists in double quotes, e.g.
            # '"device=0,1"', so a multi-GPU list stays one device= field.
            "--gpus " + quote('"device=' + values['gpus'] + '"'),
            "--detach=True",
            "--hostname " + quote(values['name']),
            "--network " + quote(values['network']),
            "--network-alias " + quote(values['name']),
            "--memory-reservation " + quote(values['mem_limit']),
            "--cpus " + quote(values['cpu_limit']),
            "--env repo=" + quote(values['workspace']),
            quote(values['image']),
            values['cmd'],
        ])

    def _create(self, form):
        """Launch the container described by *form* and return the JSON reply.

        Returns ``{"code": 0, "id": <id>}`` on success. When ``docker run``
        exits non-zero, returns ``{"code": 1, "id": <output>, "error":
        <output>}``; ``id`` is kept so existing clients still find the output.
        Validation or docker-py errors yield ``{"code": 1, "error": ...}``.
        Only successful launches are registered in ``pending_tasks``.
        """
        try:
            values = {field: form.getvalue(field)
                      for field in self._CREATE_FIELDS}
            missing = [field for field in self._CREATE_FIELDS
                       if values[field] is None]
            if missing:
                raise ValueError(
                    "missing form field(s): " + ", ".join(missing))
            script = self._build_run_script(values)

            client = docker.from_env()
            helper = client.containers.get('yao-agent-helper')
            # Pass argv as a list: docker-py shlex-splits string commands,
            # which would strip the quoting before ``sh`` ever saw it.
            exit_code, output = helper.exec_run(['sh', '-c', script])
            output = output.decode('utf-8').rstrip('\n')
            if exit_code != 0:
                return {"code": 1, "id": output, "error": output}
            with lock:
                pending_tasks[output] = {
                    'gpus': str(values['gpus']).split(',')}
            return {"code": 0, "id": output}
        except Exception as e:
            return {"code": 1, "error": str(e)}

    def _container_action(self, action):
        """Apply *action* to the container named by form field ``id``.

        Replies ``{"code": 0, "error": "Success"}`` on success (the key name
        is kept for client compatibility) or ``{"code": 1, "error": ...}``.
        """
        container_id = self._parse_form().getvalue('id')
        try:
            if container_id is None:
                raise ValueError("missing form field 'id'")
            client = docker.from_env()
            action(client.containers.get(container_id))
            msg = {"code": 0, "error": "Success"}
        except Exception as e:
            msg = {"code": 1, "error": str(e)}
        self._send_json(msg)
# Serve the HTTP API on PORT_NUMBER on all interfaces until interrupted.
# The server is created outside the try block, so the cleanup below can
# never refer to an unbound name.
server = HTTPServer(('', PORT_NUMBER), MyHandler)
print('Started http server on port ', PORT_NUMBER)
try:
    # Wait forever for incoming http requests; HTTPServer handles them one
    # at a time on this thread.
    server.serve_forever()
except KeyboardInterrupt:
    print('^C received, shutting down the web server')
finally:
    # server_close() is the documented way to release the listening socket,
    # and it runs on every exit path, not only on Ctrl-C.
    server.server_close()