
Time Series Models with Machine Learning, Generative AI, and Deep Learning (with Code)

By 老余捞鱼

Original content; please credit the author and source when reposting.

Editor's note: This post is adapted from a paper that discusses why time series forecasting matters across industries, and how machine learning, generative AI, and deep learning can improve forecast accuracy. Time series data is a sequence of data points collected or recorded at specific time intervals, such as stock prices, weather data, sales figures, and sensor readings. The ability to predict future values can significantly improve decision-making and operational efficiency.

This post covers a range of time series forecasting models, including ARIMA, SARIMA, Prophet, XGBoost, GANs, WaveNet, LSTM, GRU, Transformer, Seq2Seq, TCN, and DeepAR, describing each model's characteristics along with working code. We begin with the machine learning methods.

1. Machine Learning Methods

1.1 ARIMA (Autoregressive Integrated Moving Average)

ARIMA (Autoregressive Integrated Moving Average) is a classic statistical method that combines autoregressive (AR) terms, differencing (the "integrated" part, which makes the data stationary), and moving average (MA) terms.

import pandas as pd
from statsmodels.tsa.arima.model import ARIMA

# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)

# Fit ARIMA model
model = ARIMA(time_series_data['Value'], order=(5, 1, 0))  # (p, d, q)
model_fit = model.fit()

# Make predictions
predictions = model_fit.forecast(steps=10)
print(predictions)
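How the d in order=(5, 1, 0) works can be seen on a toy series: differencing removes a trend, which is what "integrated" refers to. A minimal numpy illustration (hypothetical numbers):

```python
import numpy as np

# A series with a linear trend is non-stationary...
series = np.array([2.0, 4.0, 6.0, 8.0, 10.0])
# ...but one round of differencing (d=1) makes it constant
diffed = np.diff(series)
print(diffed)  # -> [2. 2. 2. 2.]
```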

1.2 SARIMA (Seasonal ARIMA)

SARIMA (Seasonal ARIMA) extends ARIMA to account for seasonal effects.

import pandas as pd
from statsmodels.tsa.statespace.sarimax import SARIMAX

# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)

# Fit SARIMA model
model = SARIMAX(time_series_data['Value'],
                order=(1, 1, 1),               # (p, d, q)
                seasonal_order=(1, 1, 1, 12))  # (P, D, Q, s)
model_fit = model.fit(disp=False)

# Make predictions
predictions = model_fit.forecast(steps=10)
print(predictions)
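The extra seasonal_order=(P, D, Q, s) handles patterns that repeat every s steps. Seasonal differencing (the D term) subtracts the value from one season earlier; on a toy series with period s=4 (hypothetical numbers), a purely repeating pattern cancels out entirely:

```python
import numpy as np

s = 4  # season length (12 for monthly data with a yearly cycle)
series = np.array([10.0, 20.0, 30.0, 40.0] * 3)  # a purely seasonal pattern
seasonal_diff = series[s:] - series[:-s]
print(seasonal_diff)  # -> all zeros: the seasonality has been removed
```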

1.3 Prophet

Prophet, developed by Facebook, is robust to missing data and outliers and provides reliable uncertainty intervals.

from prophet import Prophet  # the package was renamed from fbprophet in v1.0
import pandas as pd

# Load your time series data; Prophet expects columns named 'ds' and 'y'
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.rename(columns={'Date': 'ds', 'Value': 'y'}, inplace=True)

# Fit Prophet model
model = Prophet()
model.fit(time_series_data)

# Make future dataframe and predictions
future = model.make_future_dataframe(periods=10)
forecast = model.predict(future)
print(forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']])

1.4 XGBoost

XGBoost is a gradient boosting framework that handles time series forecasting by recasting it as a supervised learning problem over lagged features.

import pandas as pd
from xgboost import XGBRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)

# Prepare data for supervised learning
def create_lag_features(data, lag=1):
    df = data.copy()
    for i in range(1, lag + 1):
        df[f'lag_{i}'] = df['Value'].shift(i)
    return df.dropna()

lag = 5
data_with_lags = create_lag_features(time_series_data, lag=lag)
X = data_with_lags.drop('Value', axis=1)
y = data_with_lags['Value']

# Split chronologically (shuffle=False keeps the test set in the future)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# Fit XGBoost model
model = XGBRegressor(objective='reg:squarederror', n_estimators=1000)
model.fit(X_train, y_train)

# Make predictions
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(f'Mean Squared Error: {mse}')
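The supervised reframing that create_lag_features performs is easiest to see on a tiny series (hypothetical numbers): each row pairs the current value with its recent history.

```python
import pandas as pd

df = pd.DataFrame({'Value': [10.0, 12.0, 11.0, 13.0, 14.0]})
for i in range(1, 3):
    df[f'lag_{i}'] = df['Value'].shift(i)
df = df.dropna()  # the first rows lack full history
print(df)
# 'Value' is the regression target; lag_1 and lag_2 are its features
```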

2. Generative AI Methods

2.1 GANs (Generative Adversarial Networks)

GANs (Generative Adversarial Networks) consist of a generator and a discriminator and can generate plausible future sequences.

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, LSTM, LeakyReLU, Reshape, Input
from tensorflow.keras.optimizers import Adam

# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)

# Prepare data for GAN
def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        X.append(dataset[i:(i + time_step), 0])
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)

time_step = 10
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data['Value'].values.reshape(-1, 1))

X_train, y_train = create_dataset(scaled_data, time_step)
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)

# GAN components
def build_generator():
    model = Sequential()
    model.add(Dense(100, input_dim=time_step))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dense(time_step, activation='tanh'))
    model.add(Reshape((time_step, 1)))
    return model

def build_discriminator():
    model = Sequential()
    model.add(LSTM(50, input_shape=(time_step, 1)))
    model.add(Dense(1, activation='sigmoid'))
    return model

# Build and compile the discriminator
discriminator = build_discriminator()
discriminator.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5),
                      metrics=['accuracy'])

# Build the generator
generator = build_generator()

# The generator takes noise as input and generates data
z = Input(shape=(time_step,))
generated_data = generator(z)

# For the combined model, only the generator is trained
discriminator.trainable = False

# The discriminator takes generated data as input and determines validity
validity = discriminator(generated_data)

# The combined model (stacked generator and discriminator)
combined = Model(z, validity)
combined.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5))

# Train the GAN
epochs = 10000
batch_size = 32
valid = np.ones((batch_size, 1))
fake = np.zeros((batch_size, 1))

for epoch in range(epochs):
    # Train the discriminator on a random batch of real data...
    idx = np.random.randint(0, X_train.shape[0], batch_size)
    real_data = X_train[idx]

    # ...and on a batch of fake data
    noise = np.random.normal(0, 1, (batch_size, time_step))
    gen_data = generator.predict(noise)

    d_loss_real = discriminator.train_on_batch(real_data, valid)
    d_loss_fake = discriminator.train_on_batch(gen_data, fake)
    d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

    # Train the generator (to have the discriminator label samples as valid)
    noise = np.random.normal(0, 1, (batch_size, time_step))
    g_loss = combined.train_on_batch(noise, valid)

    # Print the progress
    if epoch % 1000 == 0:
        print(f"{epoch} [D loss: {d_loss[0]} | D accuracy: {100 * d_loss[1]}] [G loss: {g_loss}]")

# Generate a prediction (flatten to 2-D before inverting the scaling)
noise = np.random.normal(0, 1, (1, time_step))
generated_prediction = generator.predict(noise)
generated_prediction = scaler.inverse_transform(generated_prediction.reshape(-1, 1))
print(generated_prediction)
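The create_dataset windowing helper used here (and reused in most sections below) turns a scaled 1-D series into (samples, time_step) input windows with the next value as the target; a quick check on a toy array:

```python
import numpy as np

def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        X.append(dataset[i:(i + time_step), 0])
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)

data = np.arange(6, dtype=float).reshape(-1, 1)  # [0, 1, 2, 3, 4, 5]
X, y = create_dataset(data, time_step=2)
print(X)  # -> [[0. 1.] [1. 2.] [2. 3.]]
print(y)  # -> [2. 3. 4.]
```

Note that the loop bound leaves the final possible window ([3, 4] -> 5) unused; harmless here, but worth knowing when every sample counts.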

2.2 WaveNet

WaveNet, developed by DeepMind, was originally designed for audio generation but has been adapted for time series forecasting.

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv1D, Add, Activation, Multiply, Dense, Flatten
from tensorflow.keras.optimizers import Adam

# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)

# Prepare data for WaveNet
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data['Value'].values.reshape(-1, 1))

def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        X.append(dataset[i:(i + time_step), 0])
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)

time_step = 10
X, y = create_dataset(scaled_data, time_step)
X = X.reshape(X.shape[0], X.shape[1], 1)

# Define WaveNet model: gated residual blocks with doubling dilation rates
def residual_block(x, dilation_rate):
    tanh_out = Conv1D(32, kernel_size=2, dilation_rate=dilation_rate,
                      padding='causal', activation='tanh')(x)
    sigm_out = Conv1D(32, kernel_size=2, dilation_rate=dilation_rate,
                      padding='causal', activation='sigmoid')(x)
    out = Multiply()([tanh_out, sigm_out])  # gated activation unit
    out = Conv1D(32, kernel_size=1, padding='same')(out)
    out = Add()([out, x])  # residual connection
    return out

input_layer = Input(shape=(time_step, 1))
out = Conv1D(32, kernel_size=2, padding='causal', activation='tanh')(input_layer)
skip_connections = []
for i in range(10):
    out = residual_block(out, 2 ** i)
    skip_connections.append(out)

out = Add()(skip_connections)
out = Activation('relu')(out)
out = Conv1D(1, kernel_size=1, activation='relu')(out)
out = Flatten()(out)
out = Dense(1)(out)

model = Model(input_layer, out)
model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')

# Train the model
model.fit(X, y, epochs=10, batch_size=16)

# Make predictions
predictions = model.predict(X)
predictions = scaler.inverse_transform(predictions)
print(predictions)

3. Deep Learning Methods

3.1 LSTM (Long Short-Term Memory)

LSTM (Long Short-Term Memory) networks are recurrent neural networks designed to capture long-term dependencies in sequential data.

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense

# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)

# Prepare data for LSTM
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data['Value'].values.reshape(-1, 1))

train_size = int(len(scaled_data) * 0.8)
train_data = scaled_data[:train_size]
test_data = scaled_data[train_size:]

def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        X.append(dataset[i:(i + time_step), 0])
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)

time_step = 10
X_train, y_train = create_dataset(train_data, time_step)
X_test, y_test = create_dataset(test_data, time_step)

X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

# Build LSTM model
model = Sequential()
model.add(LSTM(50, return_sequences=True, input_shape=(time_step, 1)))
model.add(LSTM(50, return_sequences=False))
model.add(Dense(25))
model.add(Dense(1))

model.compile(optimizer='adam', loss='mean_squared_error')
model.fit(X_train, y_train, batch_size=1, epochs=1)

# Make predictions
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)

train_predict = scaler.inverse_transform(train_predict)
test_predict = scaler.inverse_transform(test_predict)
print(test_predict)

3.2 GRU (Gated Recurrent Unit)

The GRU (Gated Recurrent Unit) is a simpler variant of the LSTM that often performs just as well on time series tasks.

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import GRU, Dense

# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)

# Prepare data for GRU
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data['Value'].values.reshape(-1, 1))

train_size = int(len(scaled_data) * 0.8)
train_data = scaled_data[:train_size]
test_data = scaled_data[train_size:]

def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        X.append(dataset[i:(i + time_step), 0])
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)

time_step = 10
X_train, y_train = create_dataset(train_data, time_step)
X_test, y_test = create_dataset(test_data, time_step)

X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

# Build GRU model
model = Sequential()
model.add(GRU(50, return_sequences=True, input_shape=(time_step, 1)))
model.add(GRU(50, return_sequences=False))
model.add(Dense(25))
model.add(Dense(1))

model.compile(optimizer='adam', loss='mean_squared_error')
model.fit(X_train, y_train, batch_size=1, epochs=1)

# Make predictions
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)

train_predict = scaler.inverse_transform(train_predict)
test_predict = scaler.inverse_transform(test_predict)
print(test_predict)
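"Simpler" is concrete: an LSTM cell has four weight blocks (input, forget, cell, output gates) while a GRU has three. Using the standard Keras parameter-count formulas (GRU with the default reset_after=True, which carries two bias vectors per block), the 50-unit first layers above compare as follows:

```python
def lstm_params(units, input_dim):
    # 4 blocks, each with a kernel, a recurrent kernel, and a bias
    return 4 * (units * input_dim + units * units + units)

def gru_params(units, input_dim):
    # 3 blocks; Keras's reset_after=True keeps two bias vectors per block
    return 3 * (units * input_dim + units * units + 2 * units)

print(lstm_params(50, 1))  # -> 10400
print(gru_params(50, 1))   # -> 7950
```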

3.3 Transformer Models

Transformer models, highly successful in natural language processing, have been adapted for time series forecasting.

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Dropout, Flatten, MultiHeadAttention, LayerNormalization

# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)

# Prepare data
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data['Value'].values.reshape(-1, 1))

train_size = int(len(scaled_data) * 0.8)
train_data = scaled_data[:train_size]
test_data = scaled_data[train_size:]

def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        X.append(dataset[i:(i + time_step), 0])
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)

time_step = 10
X_train, y_train = create_dataset(train_data, time_step)
X_test, y_test = create_dataset(test_data, time_step)

X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

# Build Transformer model (functional API: MultiHeadAttention needs explicit
# query and value inputs, here both the input sequence itself, i.e. self-attention)
inputs = Input(shape=(time_step, 1))
attention = MultiHeadAttention(num_heads=4, key_dim=2)(inputs, inputs)
x = LayerNormalization()(attention)
x = Dense(50, activation='relu')(x)
x = Dropout(0.1)(x)
x = Flatten()(x)  # collapse the time dimension before the scalar output
outputs = Dense(1)(x)
model = Model(inputs, outputs)

model.compile(optimizer='adam', loss='mean_squared_error')
model.fit(X_train, y_train, batch_size=1, epochs=1)

# Make predictions
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)

train_predict = scaler.inverse_transform(train_predict)
test_predict = scaler.inverse_transform(test_predict)
print(test_predict)
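One caveat with the sketch above: attention alone is order-agnostic, so production Transformer forecasters add positional encodings to the inputs. A minimal numpy version of the sinusoidal scheme from the original Transformer paper (the d_model value here is only illustrative):

```python
import numpy as np

def positional_encoding(length, d_model):
    # even columns: sin(pos / 10000^(2i/d_model)); odd columns: cos(...)
    pos = np.arange(length)[:, None]
    i = np.arange(d_model)[None, :]
    angle = pos / np.power(10000.0, (2 * (i // 2)) / d_model)
    return np.where(i % 2 == 0, np.sin(angle), np.cos(angle))

pe = positional_encoding(10, 4)
print(pe.shape)  # -> (10, 4)
print(pe[0])     # -> [0. 1. 0. 1.]  (sin 0 and cos 0 at position 0)
```

The resulting matrix is simply added to the (length, d_model) input embeddings so each time step carries a unique, smoothly varying position signature.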

3.4 Seq2Seq (Sequence-to-Sequence) Models

Seq2Seq (sequence-to-sequence) models forecast sequences of data by learning a mapping from an input sequence to an output sequence.

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense

# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)

# Prepare data for Seq2Seq
def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        X.append(dataset[i:(i + time_step), 0])
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)

time_step = 10
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data['Value'].values.reshape(-1, 1))

X, y = create_dataset(scaled_data, time_step)
X = X.reshape(X.shape[0], X.shape[1], 1)
# The decoder emits one value per time step, so tile the target to match
y_seq = np.repeat(y.reshape(-1, 1, 1), time_step, axis=1)

# Define Seq2Seq model: the encoder's final states initialize the decoder
encoder_inputs = Input(shape=(time_step, 1))
encoder = LSTM(50, return_state=True)
encoder_outputs, state_h, state_c = encoder(encoder_inputs)

decoder_inputs = Input(shape=(time_step, 1))
decoder_lstm = LSTM(50, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=[state_h, state_c])
decoder_dense = Dense(1)
decoder_outputs = decoder_dense(decoder_outputs)

model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer='adam', loss='mean_squared_error')

# Train the model (here the input window doubles as the decoder input)
model.fit([X, X], y_seq, epochs=10, batch_size=16)

# Make predictions (take the decoder's last step as the forecast)
predictions = model.predict([X, X])[:, -1, :]
predictions = scaler.inverse_transform(predictions)
print(predictions)

3.5 TCN (Temporal Convolutional Network)

TCNs (Temporal Convolutional Networks) use dilated convolutions to capture long-term dependencies in time series data.

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, Dense, Flatten

# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)

# Prepare data for TCN
def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        X.append(dataset[i:(i + time_step), 0])
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)

time_step = 10
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data['Value'].values.reshape(-1, 1))

X, y = create_dataset(scaled_data, time_step)
X = X.reshape(X.shape[0], X.shape[1], 1)

# Define TCN model with doubling dilation rates
model = Sequential()
model.add(Conv1D(filters=64, kernel_size=2, dilation_rate=1, activation='relu',
                 input_shape=(time_step, 1)))
model.add(Conv1D(filters=64, kernel_size=2, dilation_rate=2, activation='relu'))
model.add(Conv1D(filters=64, kernel_size=2, dilation_rate=4, activation='relu'))
model.add(Flatten())
model.add(Dense(1))

model.compile(optimizer='adam', loss='mean_squared_error')

# Train the model
model.fit(X, y, epochs=10, batch_size=16)

# Make predictions
predictions = model.predict(X)
predictions = scaler.inverse_transform(predictions)
print(predictions)
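Whether the dilations actually reach "long-term" is easy to check: stacked dilated convolutions have a receptive field of 1 + Σ (kernel_size − 1) · dilation. For the three layers above (kernel size 2, dilations 1, 2, 4):

```python
def receptive_field(kernel_size, dilations):
    # each layer extends the receptive field by (kernel_size - 1) * dilation
    rf = 1
    for d in dilations:
        rf += (kernel_size - 1) * d
    return rf

print(receptive_field(2, [1, 2, 4]))  # -> 8, most of the time_step=10 window
```

Doubling the dilations makes the receptive field grow exponentially with depth, which is why TCNs can cover long histories with few layers.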

3.6 DeepAR

DeepAR, developed by Amazon, is an autoregressive recurrent network designed for time series forecasting.

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense

# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)

# Prepare data for DeepAR-like model
def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        X.append(dataset[i:(i + time_step), 0])
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)

time_step = 10
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data['Value'].values.reshape(-1, 1))

X, y = create_dataset(scaled_data, time_step)
X = X.reshape(X.shape[0], X.shape[1], 1)

# Define DeepAR-like model
model = Sequential()
model.add(LSTM(50, return_sequences=True, input_shape=(time_step, 1)))
model.add(LSTM(50))
model.add(Dense(1))

model.compile(optimizer='adam', loss='mean_squared_error')

# Train the model
model.fit(X, y, epochs=10, batch_size=16)

# Make predictions
predictions = model.predict(X)
predictions = scaler.inverse_transform(predictions)
print(predictions)
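The sketch above is a point forecaster; DeepAR proper is probabilistic: the network outputs distribution parameters (e.g. the mean and scale of a Gaussian) and trains on the negative log-likelihood. The Gaussian NLL itself is a few lines of numpy (illustrative, not Amazon's implementation):

```python
import numpy as np

def gaussian_nll(y, mu, sigma):
    # mean negative log-likelihood of y under N(mu, sigma^2)
    return np.mean(0.5 * np.log(2 * np.pi * sigma ** 2)
                   + (y - mu) ** 2 / (2 * sigma ** 2))

y = np.array([1.0, 2.0, 3.0])
# a perfect mean prediction leaves only the constant term 0.5 * log(2*pi) ~= 0.919
print(gaussian_nll(y, mu=y, sigma=np.ones_like(y)))
```

Minimizing this loss pushes mu toward the data while letting sigma widen on noisy stretches, which is what gives DeepAR its prediction intervals.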

Time series forecasting is a complex but fascinating field. As the technology matures, the tools and methods for time series forecasting will become more refined, opening up new opportunities for innovation and improvement across domains.

Source: https://medium.com/@palashm0002/predicting-time-series-data-with-machine-learning-generative-ai-and-deep-learning-36bf99ad6f5e

Original title: Predicting Time Series Data with Machine Learning, Generative AI, and Deep Learning

Author: Palash Mishra


This article is intended for technical discussion and learning only; it does not constitute investment advice.


Published in the AI&Invest column.
