import itertools from sklearn.model_selection import train_test_split import pandas as pd import numpy as np import matplotlib.pyplot as plt from keras import layers, models def get_state_vect_cols(prefix=''): if prefix: prefix += '_' vectors = ['r', 'v'] components = ['x', 'y', 'z'] col_names = [f'{prefix}{v}_{c}' for v, c in itertools.product(vectors, components)] return col_names def build_train_test_sets(df, test_size=0.2): # Features are the physics predicted state vectors and the amount of # time in seconds into the future the prediction was made feature_cols = ['elapsed_seconds'] + get_state_vect_cols('physics_pred') \ + get_state_vect_cols('start') # The target values are the errors between the physical model predictions # and the ground truth observations target_cols = get_state_vect_cols('physics_err') # Create feature and target matrices X = df[feature_cols] y = df[target_cols] # Split feature and target data into training and test sets data_keys = ['X_train', 'X_test', 'y_train', 'y_test'] data_vals = train_test_split(X, y, test_size=test_size) train_test_data = dict(zip(data_keys, data_vals)) return train_test_data def get_data(file_path): print('Loading physical model orbit prediction training data...') physics_pred_df = pd.read_parquet(file_path) print('Building training and test sets...') train_test_data = build_train_test_sets(physics_pred_df) x_train = train_test_data['X_train'] x_test = train_test_data['X_test'] y_train = train_test_data['y_train'] y_test = train_test_data['y_test'] return x_train, y_train, x_test, y_test # 模型定义 def build_model(): network = models.Sequential() network.add(layers.Dense(64, activation='relu', input_shape=(13, ))) network.add(layers.Dense(64, activation='relu')) network.add(layers.Dense(1)) # 最后输出预测值,恒等函数 #损失函数用mes(均方误差), 监控指标为mae(平均绝对误差, 返回误差绝对值) network.compile(optimizer='rmsprop', loss='mse', metrics=['mae']) return network file_path = r"ssa/traindata/physics_preds.parquet" train_data, train_labels, test_data, test_labels = get_data(file_path) # # 数据标准化,减去平均值再除以标准差(测试数据也用训练数据的标准差) # mean = train_data.mean(axis=0) # train_data -= mean # std = train_data.std(axis=0) # train_data /= std # test_data -= mean # test_data /= std x_train = np.array(train_data) print(x_train.shape) x_test = np.array(test_data) print(x_test.shape) features = ['physics_err_r_x', 'physics_err_r_y', 'physics_err_r_z', 'physics_err_v_x', 'physics_err_v_y', 'physics_err_v_z'] ave_r2 = 0. for i in range(6): y_train = np.array(train_labels[features[i]]) print(y_train.shape) y_test = np.array(test_labels[features[i]]) print(y_test.shape) network = build_model() network.summary() History = network.fit(x_train, y_train, epochs=100, batch_size=1) network.save('models/DNN{0}.h5'.format(i+1)) # 用训练好的模型衡量测试数据精确度 mse, mae = network.evaluate(x_test, y_test) rmse = mse**0.5 r2 = 1-mse / np.var(y_test) print(features[i], ": mse:", mse, " rmse:", rmse, " mae:", mae, " r2:", r2) ave_r2 += r2 # #用训练好的网络预测结果 # y_p = network.predict(x_test) # 绘制图像 history_dict = History.history print(history_dict.keys()) metric_list = history_dict['mae'] x = range(1, len(metric_list) + 1) plt.figure(i) plt.plot(x, metric_list) plt.title('Training_mae') plt.xlabel('Epoches') plt.ylabel('mean abs error') plt.legend() plt.show() print('ave_r2: ', ave_r2/6)