#!/usr/bin/env python3
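"""HTTP service (YAO-optimizer/serve.py) that fits a per-job LSTM on a
univariate time series and serves one-step-ahead forecasts.

Endpoints:
    GET  /ping     -- liveness check
    GET  /feed     -- append a (seq, value) pair to ./data/<job>.csv
    GET  /train    -- fit a model for <job> in a background thread
    GET  /predict  -- forecast the value at <seq> for <job>
    POST /train2   -- form-based variant of /train
"""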
from threading import Thread
from threading import Lock
from http.server import BaseHTTPRequestHandler, HTTPServer
import cgi
import json
from urllib import parse
import csv
from pandas import DataFrame
from pandas import Series
from pandas import concat
from pandas import read_csv
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from math import sqrt
import numpy
PORT_NUMBER = 8080
lock = Lock()
models = {}
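# models maps a job name to its per-job state:
# {'lock': Lock, 'model': fitted LSTM, 'scaler': MinMaxScaler, 'batch_size': int}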
# frame a sequence as a supervised learning problem
def timeseries_to_supervised(data, lag=1):
    df = DataFrame(data)
    columns = [df.shift(i) for i in range(1, lag + 1)]
    columns.append(df)
    df = concat(columns, axis=1)
    # the first `lag` rows contain NaNs from the shifts; drop them all so
    # MinMaxScaler never sees NaNs when lag > 1
    df = df.dropna()
    return df
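# e.g. with lag=1, the series [1, 2, 3, 4] becomes the supervised rows
# [1, 2], [2, 3], [3, 4]: each lagged value paired with its successor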
# create a differenced series
def difference(dataset, interval=1):
    diff = list()
    for i in range(interval, len(dataset)):
        value = dataset[i] - dataset[i - interval]
        diff.append(value)
    return Series(diff)
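# e.g. difference([3, 5, 9]) -> Series([2, 4])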
# invert differenced value
def inverse_difference(history, yhat, interval=1):
    return yhat + history[-interval]
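# e.g. inverse_difference([3, 5], 4, interval=1) -> 9, undoing difference()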
# inverse scaling for a forecasted value
def invert_scale(scaler, X, yhat):
    new_row = [x for x in X] + [yhat]
    array = numpy.array(new_row)
    array = array.reshape(1, len(array))
    inverted = scaler.inverse_transform(array)
    return inverted[0, -1]
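# the scaler was fit on (lag + 1)-column rows, so the forecast is appended
# to its input features to form a full row before inverse_transform()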
# fit an LSTM network to training data
def fit_lstm(train, batch_size, nb_epoch, neurons):
    X, y = train[:, 0:-1], train[:, -1]
    X = X.reshape(X.shape[0], 1, X.shape[1])
    model = Sequential()
    model.add(LSTM(neurons, batch_input_shape=(batch_size, X.shape[1], X.shape[2]), stateful=True))
    model.add(Dense(1))
    model.compile(loss='mean_squared_error', optimizer='adam')
    # train one epoch at a time so the internal state can be reset in between
    for i in range(nb_epoch):
        model.fit(X, y, epochs=1, batch_size=batch_size, verbose=0, shuffle=False)
        # loss = model.evaluate(X, y)
        # print("Epoch {}/{}, loss = {}".format(i, nb_epoch, loss))
        print("Epoch {}/{}".format(i + 1, nb_epoch))
        model.reset_states()
    return model
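# note: the returned model is stateful with a fixed batch_input_shape, so
# predict() must later be called with this same batch size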
def train_models(job):
    # register the job under the global lock, then train under the per-job lock
    lock.acquire()
    if job not in models:
        models[job] = {
            'lock': Lock()
        }
    lock.release()
    models[job]['lock'].acquire()
    try:
        # load dataset (`squeeze=True` was removed in pandas 2.0, so squeeze explicitly)
        series = read_csv('./data/' + job + '.csv', header=0, index_col=0).squeeze("columns")
        # transform data to be stationary
        raw_values = series.values
        diff_values = difference(raw_values, 1)
        # transform data to be supervised learning
        lag = 4
        supervised = timeseries_to_supervised(diff_values, lag)
        supervised_values = supervised.values
        # shrink the batch size when the history is short
        batch_size = 32
        if supervised_values.shape[0] < 100:
            batch_size = 16
        if supervised_values.shape[0] < 60:
            batch_size = 8
        # use the whole history as the training set
        train = supervised_values
        # transform the scale of the data to [-1, 1]
        scaler = MinMaxScaler(feature_range=(-1, 1))
        scaler = scaler.fit(train)
        train_scaled = scaler.transform(train)
        # a stateful LSTM needs the sample count to be a multiple of the
        # batch size, so drop the oldest remainder rows
        t1 = train.shape[0] % batch_size
        train_trimmed = train_scaled[t1:, :]
        # fit the model
        model = fit_lstm(train_trimmed, batch_size, 30, 4)
        models[job]['model'] = model
        models[job]['scaler'] = scaler
        models[job]['batch_size'] = batch_size
    finally:
        # always release, so a failed run cannot deadlock later /train requests
        models[job]['lock'].release()
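# training is normally kicked off via the /train endpoint below, e.g.
# (hypothetical job name):
#   curl 'http://localhost:8080/train?job=job1'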
def predict(job, seq):
    if job not in models or 'model' not in models[job]:
        return -1, False
    batch_size = int(models[job]['batch_size'])
    lag = 4
    # load the most recent observations and append a placeholder row for `seq`
    series = read_csv('./data/' + job + '.csv', header=0, index_col=0).squeeze("columns")
    series = series.tail(batch_size * 2 - 1)
    series.loc[seq] = 0
    # transform data to be stationary
    raw_values = series.values
    diff_values = difference(raw_values, 1)
    # transform data to be supervised learning
    supervised = timeseries_to_supervised(diff_values, lag)
    # keep exactly one batch of rows: the stateful LSTM was built with a
    # fixed batch_input_shape, so it predicts on full batches only
    test = supervised.tail(batch_size).values
    test_scaled = models[job]['scaler'].transform(test)
    # forecast the whole batch to build up state, then invert the transforms
    test_reshaped = test_scaled[:, 0:-1]
    test_reshaped = test_reshaped.reshape(len(test_reshaped), 1, lag)
    output = models[job]['model'].predict(test_reshaped, batch_size=batch_size)
    predictions = list()
    for i in range(len(output)):
        yhat = output[i, 0]
        X = test_scaled[i, 0:-1]
        # invert scaling
        yhat = invert_scale(models[job]['scaler'], X, yhat)
        # invert differencing
        yhat = inverse_difference(raw_values, yhat, len(test_scaled) + 1 - i)
        # store forecast
        predictions.append(yhat)
    # report performance on the recent history (the last row is the placeholder)
    rmse = sqrt(mean_squared_error(raw_values[-batch_size:], predictions))
    print('RMSE on recent history: %.3f' % rmse)
    # the last prediction corresponds to the requested `seq`
    return predictions[-1], True
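# example exchange for the /predict endpoint below (hypothetical job name
# and values):
#   GET /predict?job=job1&seq=42 -> {"code": 0, "error": "", "prediction": 3.1}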
class MyHandler(BaseHTTPRequestHandler):
    # handler for GET requests
    def do_GET(self):
        req = parse.urlparse(self.path)
        query = parse.parse_qs(req.query)
        if req.path == "/ping":
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(bytes("pong", "utf-8"))
        elif req.path == "/predict":
            try:
                # parse_qs() returns a list per key, hence the [0]
                job = query.get('job')[0]
                seq = query.get('seq')[0]
                pred, success = predict(job, int(seq))
                if success:
                    msg = {'code': 0, 'error': "", 'prediction': float(pred)}
                else:
                    msg = {'code': 2, 'error': "Job " + job + " does not exist"}
            except Exception as e:
                msg = {'code': 1, 'error': str(e)}
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(bytes(json.dumps(msg), "utf-8"))
        elif req.path == "/feed":
            try:
                job = query.get('job')[0]
                seq = query.get('seq')[0]
                value = query.get('value')[0]
                # a new trace (seq == 1) truncates the file and rewrites the header
                if int(seq) == 1:
                    with open('./data/' + job + '.csv', 'w', newline='') as csvfile:
                        writer = csv.writer(
                            csvfile, delimiter=',',
                            quotechar='|', quoting=csv.QUOTE_MINIMAL
                        )
                        writer.writerow(["seq", "value"])
                with open('./data/' + job + '.csv', 'a', newline='') as csvfile:
                    writer = csv.writer(
                        csvfile, delimiter=',',
                        quotechar='|', quoting=csv.QUOTE_MINIMAL
                    )
                    writer.writerow([seq, value])
                msg = {'code': 0, 'error': ""}
            except Exception as e:
                msg = {'code': 1, 'error': str(e)}
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(bytes(json.dumps(msg), "utf-8"))
        elif req.path == "/train":
            try:
                job = query.get('job')[0]
                # train in the background so the request returns immediately
                t = Thread(target=train_models, name='train_models', args=(job,))
                t.start()
                msg = {'code': 0, 'error': ""}
            except Exception as e:
                msg = {'code': 1, 'error': str(e)}
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(bytes(json.dumps(msg), "utf-8"))
        else:
            self.send_error(404, 'File Not Found: %s' % self.path)
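    # the per-job CSV written by /feed above looks like (hypothetical values):
    #   seq,value
    #   1,3.2
    #   2,2.9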
    # handler for POST requests
    def do_POST(self):
        if self.path == "/train2":
            # note: the cgi module is deprecated since Python 3.11 (PEP 594)
            form = cgi.FieldStorage(
                fp=self.rfile,
                headers=self.headers,
                environ={
                    'REQUEST_METHOD': 'POST',
                    'CONTENT_TYPE': self.headers['Content-Type'],
                })
            try:
                # getvalue() returns the field as a string, not a list
                job = form.getvalue('job')
                # pass the callable itself so training runs in the background;
                # calling train_models() here would block the request handler
                t = Thread(target=train_models, name='train_models', args=(job,))
                t.start()
                msg = {"code": 0, "error": ""}
            except Exception as e:
                msg = {"code": 1, "error": str(e)}
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(bytes(json.dumps(msg), "utf-8"))
        else:
            self.send_error(404, 'File Not Found: %s' % self.path)
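# /train2 accepts multipart form data, e.g. (hypothetical job name):
#   curl -F 'job=job1' http://localhost:8080/train2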
if __name__ == '__main__':
    try:
        # create a web server and hand incoming requests to MyHandler
        server = HTTPServer(('', PORT_NUMBER), MyHandler)
        print('Started http server on port', PORT_NUMBER)
        # wait forever for incoming http requests
        server.serve_forever()
    except KeyboardInterrupt:
        print('^C received, shutting down the web server')
        server.socket.close()