mirror of
https://github.com/newnius/YAO-agent.git
synced 2025-06-07 13:51:56 +00:00
update
This commit is contained in:
parent
21b1d2edd7
commit
d2889614ee
@ -6,8 +6,8 @@ RUN pip3 install docker kafka
|
|||||||
|
|
||||||
ADD bootstrap.sh /etc/bootstrap.sh
|
ADD bootstrap.sh /etc/bootstrap.sh
|
||||||
|
|
||||||
ADD yao-agent.py /root/yao-agent.py
|
ADD monitor.py /root/monitor.py
|
||||||
ADD server.py /root/server.py
|
ADD executor.py /root/executor.py
|
||||||
|
|
||||||
WORKDIR /root
|
WORKDIR /root
|
||||||
|
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
# TODO: monitor the processes
|
# TODO: monitor the processes
|
||||||
|
|
||||||
python3 /root/yao-agent.py &
|
python3 /root/monitor.py &
|
||||||
|
|
||||||
python3 /root/server.py &
|
python3 /root/server.py &
|
||||||
|
|
||||||
|
173
executor.py
173
executor.py
@ -1,52 +1,137 @@
|
|||||||
|
#!/usr/bin/python
|
||||||
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||||
|
import cgi
|
||||||
import docker
|
import docker
|
||||||
|
import json
|
||||||
|
from urllib import parse
|
||||||
|
|
||||||
|
PORT_NUMBER = 8000
|
||||||
|
|
||||||
|
|
||||||
def run():
|
# This class will handles any incoming request from
|
||||||
client = docker.from_env()
|
# the browser
|
||||||
try:
|
class MyHandler(BaseHTTPRequestHandler):
|
||||||
print(client.containers.run(image="alpine", command="nvid", environment={"KEY": "value"}))
|
# Handler for the GET requests
|
||||||
# print(client.containers.run(image="nvidia/cuda:9.0-base", command="nvidia-smi", environment={"KEY": "value"}, runtime="nvidia"))
|
def do_GET(self):
|
||||||
except Exception as e:
|
req = parse.urlparse(self.path)
|
||||||
print(e.__class__.__name__, e)
|
query = parse.parse_qs(req.query)
|
||||||
|
|
||||||
|
if req.path == "/ping":
|
||||||
|
# Open the static file requested and send it
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header('Content-type', 'application/json')
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(bytes("pong", "utf-8"))
|
||||||
|
|
||||||
|
elif req.path == "/logs":
|
||||||
|
try:
|
||||||
|
container_id = query['id'][0]
|
||||||
|
client = docker.from_env()
|
||||||
|
container = client.containers.get(container_id)
|
||||||
|
msg = {'code': 0, 'logs': str(container.logs().decode())}
|
||||||
|
except Exception as e:
|
||||||
|
msg = {'code': 0, 'error': str(e)}
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header('Content-type', 'application/json')
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
|
||||||
|
|
||||||
|
elif req.path == "/status":
|
||||||
|
container_id = query['id'][0]
|
||||||
|
client = docker.from_env()
|
||||||
|
container = client.containers.list(all=True, filters={'id': container_id})
|
||||||
|
if len(container) > 0:
|
||||||
|
container = container[0]
|
||||||
|
status = {
|
||||||
|
'id': container.short_id,
|
||||||
|
'image': container.attrs['Config']['Image'],
|
||||||
|
'image_digest': container.attrs['Image'],
|
||||||
|
'command': container.attrs['Config']['Cmd'],
|
||||||
|
'created_at': container.attrs['Created'],
|
||||||
|
'finished_at': container.attrs['State']['FinishedAt'],
|
||||||
|
'status': container.status
|
||||||
|
}
|
||||||
|
if status['command'] is not None:
|
||||||
|
status['command'] = ' '.join(container.attrs['Config']['Cmd'])
|
||||||
|
msg = {'code': 0, 'status': status}
|
||||||
|
else:
|
||||||
|
msg = {'code': 1, 'error': "container not exist"}
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header('Content-type', 'application/json')
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
|
||||||
|
|
||||||
|
else:
|
||||||
|
self.send_error(404, 'File Not Found: %s' % self.path)
|
||||||
|
|
||||||
|
# Handler for the POST requests
|
||||||
|
def do_POST(self):
|
||||||
|
if self.path == "/create":
|
||||||
|
form = cgi.FieldStorage(
|
||||||
|
fp=self.rfile,
|
||||||
|
headers=self.headers,
|
||||||
|
environ={
|
||||||
|
'REQUEST_METHOD': 'POST',
|
||||||
|
'CONTENT_TYPE': self.headers['Content-Type'],
|
||||||
|
})
|
||||||
|
docker_image = form["image"].value
|
||||||
|
docker_name = form["name"].value
|
||||||
|
docker_cmd = form["cmd"].value
|
||||||
|
docker_workspace = form["workspace"].value
|
||||||
|
docker_gpus = form["gpus"].value
|
||||||
|
|
||||||
|
try:
|
||||||
|
client = docker.from_env()
|
||||||
|
container = client.containers.run(
|
||||||
|
image=docker_image,
|
||||||
|
hostname=docker_name,
|
||||||
|
command=docker_cmd,
|
||||||
|
environment={"repo": docker_workspace, "NVIDIA_VISIBLE_DEVICES": docker_gpus},
|
||||||
|
runtime="nvidia",
|
||||||
|
detach=True
|
||||||
|
)
|
||||||
|
msg = {"code": 0, "id": container.id}
|
||||||
|
except Exception as e:
|
||||||
|
msg = {"code": 1, "error": str(e)}
|
||||||
|
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header('Content-type', 'application/json')
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
|
||||||
|
|
||||||
|
elif self.path == "/stop":
|
||||||
|
form = cgi.FieldStorage(
|
||||||
|
fp=self.rfile,
|
||||||
|
headers=self.headers,
|
||||||
|
environ={
|
||||||
|
'REQUEST_METHOD': 'POST',
|
||||||
|
'CONTENT_TYPE': self.headers['Content-Type'],
|
||||||
|
})
|
||||||
|
container_id = form["id"].value
|
||||||
|
|
||||||
|
client = docker.from_env()
|
||||||
|
container = client.containers.get(container_id)
|
||||||
|
container.stop()
|
||||||
|
msg = {"code": 0}
|
||||||
|
|
||||||
|
self.send_response(200)
|
||||||
|
self.send_header('Content-type', 'application/json')
|
||||||
|
self.end_headers()
|
||||||
|
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
|
||||||
|
else:
|
||||||
|
self.send_error(404, 'File Not Found: %s' % self.path)
|
||||||
|
|
||||||
|
|
||||||
def run_in_background():
|
try:
|
||||||
client = docker.from_env()
|
# Create a web server and define the handler to manage the
|
||||||
container = client.containers.run("alpine", ["echo", "hello", "world"], detach=True)
|
# incoming request
|
||||||
print(container.id)
|
server = HTTPServer(('', PORT_NUMBER), MyHandler)
|
||||||
|
print('Started http server on port ', PORT_NUMBER)
|
||||||
|
|
||||||
|
# Wait forever for incoming http requests
|
||||||
|
server.serve_forever()
|
||||||
|
|
||||||
def list_containers():
|
except KeyboardInterrupt:
|
||||||
client = docker.from_env()
|
print('^C received, shutting down the web server')
|
||||||
for container in client.containers.list():
|
|
||||||
print(container.id)
|
|
||||||
|
|
||||||
|
server.socket.close()
|
||||||
def get_logs(id):
|
|
||||||
try:
|
|
||||||
client = docker.from_env()
|
|
||||||
container = client.containers.get(id)
|
|
||||||
print(container.logs().decode())
|
|
||||||
except Exception as e:
|
|
||||||
print(e)
|
|
||||||
|
|
||||||
|
|
||||||
def get_status(id):
|
|
||||||
client = docker.from_env()
|
|
||||||
container = client.containers.list(all=True, filters={'id': id})
|
|
||||||
status = {}
|
|
||||||
if len(container) > 0:
|
|
||||||
container= container[0]
|
|
||||||
status['id'] = container.short_id
|
|
||||||
status['image'] = container.attrs['Config']['Image']
|
|
||||||
status['image_digest'] = container.attrs['Image']
|
|
||||||
status['command'] = container.attrs['Config']['Cmd']
|
|
||||||
status['createdAt'] = container.attrs['Created']
|
|
||||||
status['finishedAt'] = container.attrs['State']['FinishedAt']
|
|
||||||
status['status'] = container.status
|
|
||||||
if status['command'] is not None:
|
|
||||||
status['command'] = ' '.join(container.attrs['Config']['Cmd'])
|
|
||||||
print(status)
|
|
||||||
|
|
||||||
|
|
||||||
get_status('')
|
|
||||||
|
@ -7,6 +7,7 @@ import xml.dom.minidom
|
|||||||
from kafka import KafkaProducer
|
from kafka import KafkaProducer
|
||||||
|
|
||||||
ClientID = os.getenv('ClientID', 1)
|
ClientID = os.getenv('ClientID', 1)
|
||||||
|
ClientHost = os.getenv('ClientHost', "localhost")
|
||||||
KafkaBrokers = os.getenv('KafkaBrokers', 'localhost:9092').split(',')
|
KafkaBrokers = os.getenv('KafkaBrokers', 'localhost:9092').split(',')
|
||||||
|
|
||||||
|
|
||||||
@ -44,13 +45,20 @@ def report_msg():
|
|||||||
'uuid': gpu.getElementsByTagName('uuid')[0].childNodes[0].data,
|
'uuid': gpu.getElementsByTagName('uuid')[0].childNodes[0].data,
|
||||||
'product_name': gpu.getElementsByTagName('product_name')[0].childNodes[0].data,
|
'product_name': gpu.getElementsByTagName('product_name')[0].childNodes[0].data,
|
||||||
'performance_state': gpu.getElementsByTagName('performance_state')[0].childNodes[0].data,
|
'performance_state': gpu.getElementsByTagName('performance_state')[0].childNodes[0].data,
|
||||||
'memory_total': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('total')[0].childNodes[0].data,
|
'memory_total': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('total')[0].childNodes[
|
||||||
'memory_free': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('free')[0].childNodes[0].data,
|
0].data,
|
||||||
'memory_used': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('used')[0].childNodes[0].data,
|
'memory_free': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('free')[0].childNodes[
|
||||||
'utilization_gpu': gpu.getElementsByTagName('utilization')[0].getElementsByTagName('gpu_util')[0].childNodes[0].data,
|
0].data,
|
||||||
'utilization_mem': gpu.getElementsByTagName('utilization')[0].getElementsByTagName('memory_util')[0].childNodes[0].data,
|
'memory_used': gpu.getElementsByTagName('fb_memory_usage')[0].getElementsByTagName('used')[0].childNodes[
|
||||||
'temperature_gpu': gpu.getElementsByTagName('temperature')[0].getElementsByTagName('gpu_temp')[0].childNodes[0].data,
|
0].data,
|
||||||
'power_draw': gpu.getElementsByTagName('power_readings')[0].getElementsByTagName('power_draw')[0].childNodes[0].data
|
'utilization_gpu':
|
||||||
|
gpu.getElementsByTagName('utilization')[0].getElementsByTagName('gpu_util')[0].childNodes[0].data,
|
||||||
|
'utilization_mem':
|
||||||
|
gpu.getElementsByTagName('utilization')[0].getElementsByTagName('memory_util')[0].childNodes[0].data,
|
||||||
|
'temperature_gpu':
|
||||||
|
gpu.getElementsByTagName('temperature')[0].getElementsByTagName('gpu_temp')[0].childNodes[0].data,
|
||||||
|
'power_draw':
|
||||||
|
gpu.getElementsByTagName('power_readings')[0].getElementsByTagName('power_draw')[0].childNodes[0].data
|
||||||
}
|
}
|
||||||
|
|
||||||
stat['memory_total'] = int(float(stat['memory_total'].split(' ')[0]))
|
stat['memory_total'] = int(float(stat['memory_total'].split(' ')[0]))
|
||||||
@ -63,7 +71,7 @@ def report_msg():
|
|||||||
|
|
||||||
stats.append(stat)
|
stats.append(stat)
|
||||||
|
|
||||||
post_fields = {'id': ClientID, 'status': stats}
|
post_fields = {'id': ClientID, 'host': ClientHost, 'status': stats}
|
||||||
data = json.dumps(post_fields)
|
data = json.dumps(post_fields)
|
||||||
|
|
||||||
producer = KafkaProducer(bootstrap_servers=KafkaBrokers)
|
producer = KafkaProducer(bootstrap_servers=KafkaBrokers)
|
137
server.py
137
server.py
@ -1,137 +0,0 @@
|
|||||||
#!/usr/bin/python
|
|
||||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
||||||
import cgi
|
|
||||||
import docker
|
|
||||||
import json
|
|
||||||
from urllib import parse
|
|
||||||
|
|
||||||
PORT_NUMBER = 8000
|
|
||||||
|
|
||||||
|
|
||||||
# This class will handles any incoming request from
|
|
||||||
# the browser
|
|
||||||
class MyHandler(BaseHTTPRequestHandler):
|
|
||||||
# Handler for the GET requests
|
|
||||||
def do_GET(self):
|
|
||||||
req = parse.urlparse(self.path)
|
|
||||||
query = parse.parse_qs(req.query)
|
|
||||||
|
|
||||||
if req.path == "/ping":
|
|
||||||
# Open the static file requested and send it
|
|
||||||
self.send_response(200)
|
|
||||||
self.send_header('Content-type', 'application/json')
|
|
||||||
self.end_headers()
|
|
||||||
self.wfile.write(bytes("pong", "utf-8"))
|
|
||||||
|
|
||||||
elif req.path == "/logs":
|
|
||||||
try:
|
|
||||||
container_id = query['id'][0]
|
|
||||||
client = docker.from_env()
|
|
||||||
container = client.containers.get(container_id)
|
|
||||||
msg = {'code': 0, 'logs': str(container.logs().decode())}
|
|
||||||
except Exception as e:
|
|
||||||
msg = {'code': 0, 'error': str(e)}
|
|
||||||
self.send_response(200)
|
|
||||||
self.send_header('Content-type', 'application/json')
|
|
||||||
self.end_headers()
|
|
||||||
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
|
|
||||||
|
|
||||||
elif req.path == "/status":
|
|
||||||
container_id = query['id'][0]
|
|
||||||
client = docker.from_env()
|
|
||||||
container = client.containers.list(all=True, filters={'id': container_id})
|
|
||||||
if len(container) > 0:
|
|
||||||
container = container[0]
|
|
||||||
status = {
|
|
||||||
'id': container.short_id,
|
|
||||||
'image': container.attrs['Config']['Image'],
|
|
||||||
'image_digest': container.attrs['Image'],
|
|
||||||
'command': container.attrs['Config']['Cmd'],
|
|
||||||
'created_at': container.attrs['Created'],
|
|
||||||
'finished_at': container.attrs['State']['FinishedAt'],
|
|
||||||
'status': container.status
|
|
||||||
}
|
|
||||||
if status['command'] is not None:
|
|
||||||
status['command'] = ' '.join(container.attrs['Config']['Cmd'])
|
|
||||||
msg = {'code': 0, 'status': status}
|
|
||||||
else:
|
|
||||||
msg = {'code': 1, 'error': "container not exist"}
|
|
||||||
self.send_response(200)
|
|
||||||
self.send_header('Content-type', 'application/json')
|
|
||||||
self.end_headers()
|
|
||||||
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
|
|
||||||
|
|
||||||
else:
|
|
||||||
self.send_error(404, 'File Not Found: %s' % self.path)
|
|
||||||
|
|
||||||
# Handler for the POST requests
|
|
||||||
def do_POST(self):
|
|
||||||
if self.path == "/create":
|
|
||||||
form = cgi.FieldStorage(
|
|
||||||
fp=self.rfile,
|
|
||||||
headers=self.headers,
|
|
||||||
environ={
|
|
||||||
'REQUEST_METHOD': 'POST',
|
|
||||||
'CONTENT_TYPE': self.headers['Content-Type'],
|
|
||||||
})
|
|
||||||
docker_image = form["image"].value
|
|
||||||
docker_name = form["name"].value
|
|
||||||
docker_cmd = form["cmd"].value
|
|
||||||
docker_workspace = form["workspace"].value
|
|
||||||
docker_gpus = form["gpus"].value
|
|
||||||
|
|
||||||
try:
|
|
||||||
client = docker.from_env()
|
|
||||||
container = client.containers.run(
|
|
||||||
image=docker_image,
|
|
||||||
hostname=docker_name,
|
|
||||||
command=docker_cmd,
|
|
||||||
environment={"repo": docker_workspace, "NVIDIA_VISIBLE_DEVICES": docker_gpus},
|
|
||||||
runtime="nvidia",
|
|
||||||
detach=True
|
|
||||||
)
|
|
||||||
msg = {"code": 0, "id": container.id}
|
|
||||||
except Exception as e:
|
|
||||||
msg = {"code": 1, "error": str(e)}
|
|
||||||
|
|
||||||
self.send_response(200)
|
|
||||||
self.send_header('Content-type', 'application/json')
|
|
||||||
self.end_headers()
|
|
||||||
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
|
|
||||||
|
|
||||||
elif self.path == "/stop":
|
|
||||||
form = cgi.FieldStorage(
|
|
||||||
fp=self.rfile,
|
|
||||||
headers=self.headers,
|
|
||||||
environ={
|
|
||||||
'REQUEST_METHOD': 'POST',
|
|
||||||
'CONTENT_TYPE': self.headers['Content-Type'],
|
|
||||||
})
|
|
||||||
container_id = form["id"].value
|
|
||||||
|
|
||||||
client = docker.from_env()
|
|
||||||
container = client.containers.get(container_id)
|
|
||||||
container.stop()
|
|
||||||
msg = {"code": 0}
|
|
||||||
|
|
||||||
self.send_response(200)
|
|
||||||
self.send_header('Content-type', 'application/json')
|
|
||||||
self.end_headers()
|
|
||||||
self.wfile.write(bytes(json.dumps(msg), "utf-8"))
|
|
||||||
else:
|
|
||||||
self.send_error(404, 'File Not Found: %s' % self.path)
|
|
||||||
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Create a web server and define the handler to manage the
|
|
||||||
# incoming request
|
|
||||||
server = HTTPServer(('', PORT_NUMBER), MyHandler)
|
|
||||||
print('Started http server on port ', PORT_NUMBER)
|
|
||||||
|
|
||||||
# Wait forever for incoming http requests
|
|
||||||
server.serve_forever()
|
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
print('^C received, shutting down the web server')
|
|
||||||
|
|
||||||
server.socket.close()
|
|
52
test.py
Normal file
52
test.py
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
import docker
|
||||||
|
|
||||||
|
|
||||||
|
def run():
|
||||||
|
client = docker.from_env()
|
||||||
|
try:
|
||||||
|
print(client.containers.run(image="alpine", command="nvid", environment={"KEY": "value"}))
|
||||||
|
# print(client.containers.run(image="nvidia/cuda:9.0-base", command="nvidia-smi", environment={"KEY": "value"}, runtime="nvidia"))
|
||||||
|
except Exception as e:
|
||||||
|
print(e.__class__.__name__, e)
|
||||||
|
|
||||||
|
|
||||||
|
def run_in_background():
|
||||||
|
client = docker.from_env()
|
||||||
|
container = client.containers.run("alpine", ["echo", "hello", "world"], detach=True)
|
||||||
|
print(container.id)
|
||||||
|
|
||||||
|
|
||||||
|
def list_containers():
|
||||||
|
client = docker.from_env()
|
||||||
|
for container in client.containers.list():
|
||||||
|
print(container.id)
|
||||||
|
|
||||||
|
|
||||||
|
def get_logs(id):
|
||||||
|
try:
|
||||||
|
client = docker.from_env()
|
||||||
|
container = client.containers.get(id)
|
||||||
|
print(container.logs().decode())
|
||||||
|
except Exception as e:
|
||||||
|
print(e)
|
||||||
|
|
||||||
|
|
||||||
|
def get_status(id):
|
||||||
|
client = docker.from_env()
|
||||||
|
container = client.containers.list(all=True, filters={'id': id})
|
||||||
|
status = {}
|
||||||
|
if len(container) > 0:
|
||||||
|
container= container[0]
|
||||||
|
status['id'] = container.short_id
|
||||||
|
status['image'] = container.attrs['Config']['Image']
|
||||||
|
status['image_digest'] = container.attrs['Image']
|
||||||
|
status['command'] = container.attrs['Config']['Cmd']
|
||||||
|
status['createdAt'] = container.attrs['Created']
|
||||||
|
status['finishedAt'] = container.attrs['State']['FinishedAt']
|
||||||
|
status['status'] = container.status
|
||||||
|
if status['command'] is not None:
|
||||||
|
status['command'] = ' '.join(container.attrs['Config']['Cmd'])
|
||||||
|
print(status)
|
||||||
|
|
||||||
|
|
||||||
|
get_status('')
|
Loading…
Reference in New Issue
Block a user