EMD分解代码,这里比较简单,直接用PyEMD库就可以实现。
import os

import pandas as pd
from PyEMD import EMD
import matplotlib.pyplot as plt

# Configure matplotlib so Chinese labels and minus signs render correctly.
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

# Load the raw monthly data; first row is the header, first column the index.
data = pd.read_excel('data.xls', header=0, index_col=0)
# Backfill missing values (DataFrame.fillna(method='bfill') is deprecated).
data = data.bfill()

# Drop the last column, then stack the remaining columns end-to-end into one
# long series: transpose so each original row becomes a column, then collect
# the columns in order (single concat instead of quadratic concat-in-loop).
df = data.iloc[:, :-1]
df1 = df.T
chunks = [pd.DataFrame(df1.iloc[:, i:i + 1].values) for i in range(df1.shape[1])]
data_new = pd.concat(chunks, axis=0)

# Re-index the stacked series as consecutive month starts from 1960-01.
new_index = pd.date_range(start='1960-01-01', periods=len(data_new), freq='MS')
data_new.index = new_index

# Flatten to a 1-D signal for EMD.
signal = data_new.iloc[:, 0:].values.reshape(-1)

# Empirical Mode Decomposition: split the signal into intrinsic mode functions.
emd = EMD()
IMFs = emd(signal)
num_imfs = IMFs.shape[0]
print(f"共有 {num_imfs} 个分解后的子信号:")

# Make sure the output directory exists, otherwise savefig raises.
os.makedirs('figure', exist_ok=True)
for i, imf in enumerate(IMFs):
    plt.figure(dpi=300)
    plt.plot(imf)
    plt.xlabel('累计月数')
    plt.ylabel(f'子序列{i}')
    plt.grid(ls='--', alpha=0.5)
    plt.savefig(f'figure/子序列{i}.png', format='png', bbox_inches='tight', dpi=300)
    plt.show()

# Save the stacked original series alongside the IMFs, one column per
# sub-series, for the downstream LSTM script.
ims = pd.DataFrame(IMFs, columns=None)
imst = ims.T
imst.index = new_index
ddd = pd.concat([data_new, imst], axis=1)
ddd.to_csv('data_split.csv', encoding='gbk')
分解后的子序列图片。
LSTM模型
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import tensorflow as tf
from OA import COA
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import LSTM, Dense
from sklearn.metrics import r2_score,mean_squared_error,mean_absolute_error
import os, random
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
# 设置中文字体
plt.rcParams['font.family'] = 'SimHei'
# 设置正常显示符号
plt.rcParams['axes.unicode_minus'] = False
# Load the EMD-decomposed data produced by the decomposition script.
data = pd.read_csv('data_split.csv', encoding='gbk', index_col=0)
# Features: every column (the original series plus all IMF sub-series).
X = data.iloc[:,0:].values
# Target: the first column only (the original series).
Y = data.iloc[:,0:1].values
X_SS = StandardScaler()
Y_SS = StandardScaler()
# NOTE(review): both scalers are fitted on the FULL dataset before the
# train/test split, so test-set statistics leak into training — consider
# fitting on the training partition only.
x = X_SS.fit_transform(X)
y = Y_SS.fit_transform(Y)
# shuffle=False keeps chronological order for the time series
# (random_state has no effect when shuffle is disabled).
x_train, x_test, y_train, y_test = train_test_split(x, y, random_state=42, test_size=0.2, shuffle=False)
#定义时间滑窗函数
def create_time_windows(x, y, window_size, stride):
    '''
    Build sliding-window samples for a sequence model.

    :param x: feature array, shape (n_samples, n_features)
    :param y: target array aligned with ``x``
    :param window_size: number of past time steps per window
    :param stride: step between consecutive window start positions
    :return: tuple ``(x_windows, y_windows)``; ``x_windows`` has shape
             (n_windows, window_size, n_features) and ``y_windows`` holds
             the target at the step immediately after each window.
    '''
    x_windows = []
    y_windows = []
    for i in range(0, len(x) - window_size, stride):
        # Window covers steps [i, i + window_size); the target is the step
        # right after it. The original sliced x[i:i+window_size+1], which
        # also fed the target time step's own features (which include the
        # target value) to the model — lookahead leakage.
        x_windows.append(x[i:i + window_size])
        y_windows.append(y[i + window_size])
    return np.array(x_windows), np.array(y_windows)
window_size = 7 # number of time steps per sliding window
stride = 1 # step between consecutive window starts
# Window the train and test partitions separately so no window straddles
# the split boundary.
x_train, y_train = create_time_windows(x_train, y_train, window_size, stride)
x_test, y_test = create_time_windows(x_test, y_test, window_size, stride)
def lstm(units, lr, epoch, batch_size):
    '''
    Build, train and evaluate a single-layer LSTM regressor.

    Uses the module-level ``x_train``/``y_train``/``x_test``/``y_test``
    arrays and the fitted ``Y_SS`` target scaler.

    :param units: number of LSTM units
    :param lr: learning rate for the Adam optimizer
    :param epoch: number of training epochs
    :param batch_size: mini-batch size
    :return: tuple (model, history, predictions, ground truth) — the last
             two inverse-transformed back to the original target scale
    '''
    model = Sequential()
    model.add(LSTM(units, input_shape=(x_train.shape[1], x_train.shape[2])))
    model.add(Dense(1))
    # Pass the learning rate by keyword: the parameter name changed across
    # Keras versions ('lr' -> 'learning_rate'), so positional Adam(lr) is
    # fragile.
    model.compile(loss='mse', optimizer=Adam(learning_rate=lr))
    history = model.fit(x_train, y_train, epochs=epoch, batch_size=batch_size,
                        verbose=0, validation_data=(x_test, y_test))
    pre = model.predict(x_test)
    # Undo the target standardization so metrics are in original units.
    pre_invers = Y_SS.inverse_transform(pre)
    y_test_invers = Y_SS.inverse_transform(y_test)
    return model, history, pre_invers, y_test_invers
def fun_fitness(x):
    '''
    Fitness function for the COA hyper-parameter search.

    :param x: candidate vector [units, learning_rate, epochs, batch_size]
    :return: test-set MSE in the original target scale (lower is better)
    '''
    # Fix all seeds so every candidate is evaluated reproducibly.
    tf.random.set_seed(666)
    np.random.seed(666)
    random.seed(666)
    units = int(x[0])       # number of LSTM units
    lr = x[1]               # initial learning rate
    epoch = int(x[2])       # number of training epochs
    batch_size = int(x[3])  # mini-batch size
    model, history, pre_invers, y_test_invers = lstm(units, lr, epoch, batch_size)
    fitness = mean_squared_error(y_test_invers, pre_invers)
    return fitness
预测结果。