Mirror of https://github.com/newnius/YAO-agent.git, synced 2025-06-07 13:51:56 +00:00
update
This commit is contained in:
parent 800fab7bd4
commit e614a15474

agent.py (73 changed lines)
@@ -42,12 +42,65 @@ event_counter = 0
 
 client = docker.from_env()
 
+taskStats = {}
+taskStatsLock = Lock()
+
 
 def generate_token(stringLength=8):
     letters = string.ascii_lowercase
     return ''.join(random.choice(letters) for i in range(stringLength))
 
 
+def monitor_task(container_id):
+    print(container_id)
+    container = client.containers.get(container_id)
+    maxCPU = 0
+    maxMem = 0
+    last_bw_rx = 0
+    last_bw_tx = 0
+    last_time = time.time() - 1
+    for statR in container.stats():
+        stat = json.loads(statR)
+        # print(stat)
+        if stat['read'] == '0001-01-01T00:00:00Z':
+            print('container ', container_id, ' exited')
+            break
+
+        taskStatsLock.acquire()
+        # CPU load, x%
+        cur = stat['cpu_stats']['cpu_usage']['total_usage']
+        last = stat['precpu_stats']['cpu_usage']['total_usage']
+        utilCPU = (cur - last) / 10000000
+
+        # Memory, MB
+        mem = stat['memory_stats']['stats']['active_anon']
+        mem = mem / 1024
+        mem = mem / 1024
+
+        # Bandwidth, KB/s
+        cur = stat['networks']['eth0']['rx_bytes'] / 1024
+        bw_rx = cur - last_bw_rx
+        last_bw_rx = cur
+        cur = stat['networks']['eth0']['tx_bytes'] / 1024
+        bw_tx = cur - last_bw_tx
+        last_bw_tx = cur
+        now = time.time()
+        dur = now - last_time
+        last_time = now
+        bw_rx /= dur
+        bw_tx /= dur
+
+        taskStats[container_id] = {'cpu': utilCPU, 'mem': mem, 'bw_rx': bw_rx, 'bw_tx': bw_tx}
+        # print(utilCPU, mem, maxCPU, maxMem, bw_rx, bw_tx)
+        taskStatsLock.release()
+        if stat['preread'] == '0001-01-01T00:00:00Z':
+            continue
+        if utilCPU > maxCPU:
+            maxCPU = utilCPU
+        if mem > maxMem:
+            maxMem = mem
+
+
 def launch_tasks(stats):
     utils = {}
     mem_frees = {}
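Note on the CPU figure above: monitor_task divides the nanosecond delta between cpu_stats and precpu_stats by 1e7, which reads as a percentage only if roughly one stats sample arrives per second. For comparison, below is a minimal sketch of the interval-independent derivation that the documented Docker stats fields support; it is illustrative only, not code from this commit, and cpu_percent is a hypothetical helper name.

# Hedged sketch, not part of this commit: interval-independent CPU percent
# computed from documented Docker stats fields.
def cpu_percent(stat):
    cpu_delta = (stat['cpu_stats']['cpu_usage']['total_usage']
                 - stat['precpu_stats']['cpu_usage']['total_usage'])   # ns used by the container
    sys_delta = (stat['cpu_stats']['system_cpu_usage']
                 - stat['precpu_stats']['system_cpu_usage'])           # ns of host CPU time in the window
    ncpus = stat['cpu_stats'].get('online_cpus') or len(
        stat['cpu_stats']['cpu_usage'].get('percpu_usage', [None]))
    return cpu_delta / sys_delta * ncpus * 100.0 if sys_delta > 0 else 0.0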
@@ -89,7 +142,8 @@ class MyHandler(BaseHTTPRequestHandler):
             msg = {
                 'pending_tasks': pending_tasks,
                 'id2token': id2token,
-                'event_counter': event_counter
+                'event_counter': event_counter,
+                'taskStats': taskStats
             }
             self.send_response(200)
             self.send_header('Content-type', 'application/json')
@@ -140,8 +194,19 @@ class MyHandler(BaseHTTPRequestHandler):
                 'finished_at': container.attrs['State']['FinishedAt'],
                 'status': container.status,
                 'hostname': container.attrs['Config']['Hostname'],
-                'state': container.attrs['State']
+                'state': container.attrs['State'],
+                'cpu': 0,
+                'mem': 0,
+                'bw_rx': 0,
+                'bw_tx': 0
             }
+            if container_id in taskStats:
+                taskStatsLock.acquire()
+                status['cpu'] = taskStats[container_id]['cpu']
+                status['mem'] = taskStats[container_id]['mem']
+                status['bw_rx'] = taskStats[container_id]['bw_rx']
+                status['bw_tx'] = taskStats[container_id]['bw_tx']
+                taskStatsLock.release()
             if container_id in id2token:
                 token = id2token[container_id]
                 if token in pending_tasks:
@@ -200,6 +265,8 @@ class MyHandler(BaseHTTPRequestHandler):
                 print(e)
 
             try:
+                # set PYTHONUNBUFFERED=1 to output immediately
+                # see https://tarunlalwani.com/post/why-delayed-output-python-docker/
                 script = " ".join([
                     "docker run",
                     "--gpus '\"device=" + docker_gpus + "\"'",
@@ -226,6 +293,8 @@ class MyHandler(BaseHTTPRequestHandler):
                 exit_code, output = container.exec_run(['sh', '-c', script])
                 msg = {"code": 0, "id": output.decode('utf-8').rstrip('\n')}
 
+                Thread(target=monitor_task, name='monitor_task', args=(msg['id'],)).start()
+
                 if docker_wait == "1":
                     lock.acquire()
                     pending_tasks[token] = {'gpus': str(docker_gpus).split(','), 'gpu_mem': int(docker_gpu_mem)}
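Taken together, the run path now works roughly as in the sketch below: once docker run returns a container id, a monitoring thread is started for it, and any reader takes taskStatsLock before touching taskStats. The names come from this diff; the surrounding glue and the default sample are illustrative, not the exact handler code.

# Illustrative sketch of how the new pieces fit together (names from the diff, glue code assumed).
from threading import Thread

container_id = msg['id']  # id returned by the `docker run -d ...` invocation above
Thread(target=monitor_task, name='monitor_task', args=(container_id,)).start()

# Later, e.g. while building a task-status response, the latest sample is read under the lock:
taskStatsLock.acquire()
sample = taskStats.get(container_id, {'cpu': 0, 'mem': 0, 'bw_rx': 0, 'bw_tx': 0})
taskStatsLock.release()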