Author: 老余捞鱼
Original work; please credit the source and the original author when reposting.

A note before we begin: this article is adapted from a paper that discusses why time series forecasting matters across industries, and how machine learning, generative AI, and deep learning can improve forecast accuracy. Time series data is a sequence of data points collected or recorded at specific time intervals, such as stock prices, weather readings, sales figures, and sensor measurements. Being able to predict future values can significantly improve decision-making and operational efficiency.
This article walks through a range of time series forecasting models, including ARIMA, SARIMA, Prophet, XGBoost, GANs, WaveNet, LSTM, GRU, Transformer, Seq2Seq, TCN, and DeepAR, describing each model's characteristics along with working code. We start with the machine learning methods.
1. Machine Learning Methods
1.1 ARIMA (AutoRegressive Integrated Moving Average)
ARIMA is a classic statistical method that combines autoregression (AR), differencing (the "I", which makes the data stationary), and a moving average (MA) component.

import pandas as pd
from statsmodels.tsa.arima.model import ARIMA
# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)
# Fit ARIMA model
model = ARIMA(time_series_data['Value'], order=(5, 1, 0)) # (p,d,q)
model_fit = model.fit()
# Make predictions
predictions = model_fit.forecast(steps=10)
print(predictions)
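The differencing order d in order=(5, 1, 0) is what makes the series stationary, so before fixing it, it is worth testing stationarity directly. A minimal sketch using the Augmented Dickey-Fuller test from statsmodels, reusing the 'Value' column from above:

from statsmodels.tsa.stattools import adfuller
# Augmented Dickey-Fuller test: a small p-value (< 0.05) suggests the series is already stationary
adf_stat, p_value, *_ = adfuller(time_series_data['Value'].dropna())
print(f'ADF statistic: {adf_stat:.3f}, p-value: {p_value:.3f}')
# If the p-value is large, difference the series once and test again before choosing d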
1.2 SARIMA (Seasonal ARIMA)
SARIMA extends ARIMA by explicitly modeling seasonal effects.

import pandas as pd
import numpy as np
from statsmodels.tsa.statespace.sarimax import SARIMAX
# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)
# Fit SARIMA model
model = SARIMAX(time_series_data['Value'], order=(1, 1, 1), seasonal_order=(1, 1, 1, 12)) # (p,d,q) (P,D,Q,s)
model_fit = model.fit(disp=False)
# Make predictions
predictions = model_fit.forecast(steps=10)
print(predictions)
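The seasonal period s=12 above is an assumption (monthly data with a yearly cycle). A quick way to sanity-check it is a classical decomposition; a minimal sketch with statsmodels:

from statsmodels.tsa.seasonal import seasonal_decompose
# Split the series into trend, seasonal, and residual components; period=12 assumes monthly data
decomposition = seasonal_decompose(time_series_data['Value'], model='additive', period=12)
print(decomposition.seasonal.head(12))  # one full seasonal cycle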
1.3 Prophet
Prophet, developed by Facebook, handles missing data and outliers well and provides reliable uncertainty intervals.

# Note: in newer releases the package is imported as `from prophet import Prophet`
from fbprophet import Prophet
import pandas as pd

# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.rename(columns={'Date': 'ds', 'Value': 'y'}, inplace=True)

# Fit Prophet model
model = Prophet()
model.fit(time_series_data)

# Make future dataframe and predictions
future = model.make_future_dataframe(periods=10)
forecast = model.predict(future)
print(forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']])
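Prophet also ships plotting helpers that visualize the forecast and its fitted components (trend, seasonality), which is usually the quickest way to inspect the uncertainty intervals mentioned above:

# Visualize the forecast with its uncertainty band, then the fitted components
fig1 = model.plot(forecast)
fig2 = model.plot_components(forecast)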
1.4 XGBoost
XGBoost is a gradient boosting framework that handles time series forecasting by recasting it as a supervised learning problem over lagged features.

import pandas as pd
import numpy as np
from xgboost import XGBRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)

# Prepare data for supervised learning
def create_lag_features(data, lag=1):
    df = data.copy()
    for i in range(1, lag + 1):
        df[f'lag_{i}'] = df['Value'].shift(i)
    return df.dropna()

lag = 5
data_with_lags = create_lag_features(time_series_data, lag=lag)
X = data_with_lags.drop('Value', axis=1)
y = data_with_lags['Value']

# Split the data into training and testing sets (no shuffling for time series)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# Fit XGBoost model
model = XGBRegressor(objective='reg:squarederror', n_estimators=1000)
model.fit(X_train, y_train)

# Make predictions
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(f'Mean Squared Error: {mse}')
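The snippet above only scores one-step-ahead predictions on held-out rows. To forecast beyond the end of the series with lag features, the usual approach is recursive: predict one step, append it to the history, rebuild the lags, repeat. A minimal sketch under the same lag=5 setup:

# Recursive multi-step forecast: feed each prediction back in as the newest lag
history = list(y.values)
future_preds = []
for _ in range(10):
    features = np.array(history[-lag:][::-1]).reshape(1, -1)  # lag_1 = most recent value
    next_val = model.predict(features)[0]
    future_preds.append(next_val)
    history.append(next_val)
print(future_preds)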
2. Generative AI Methods
2.1 GANs (Generative Adversarial Networks)
A GAN pairs a generator with a discriminator; once trained, the generator can produce plausible future sequences.

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, LSTM, LeakyReLU, Reshape, Input
from tensorflow.keras.optimizers import Adam
# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)
# Prepare data for GAN
def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        a = dataset[i:(i + time_step), 0]
        X.append(a)
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)
time_step = 10
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data['Value'].values.reshape(-1, 1))
X_train, y_train = create_dataset(scaled_data, time_step)
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
# GAN components
def build_generator():
    model = Sequential()
    model.add(Dense(100, input_dim=time_step))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Dense(time_step, activation='tanh'))
    model.add(Reshape((time_step, 1)))
    return model

def build_discriminator():
    model = Sequential()
    model.add(LSTM(50, input_shape=(time_step, 1)))
    model.add(Dense(1, activation='sigmoid'))
    return model
# Build and compile the discriminator
discriminator = build_discriminator()
discriminator.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5), metrics=['accuracy'])
# Build the generator
generator = build_generator()
# The generator takes noise as input and generates data
z = Input(shape=(time_step,))
generated_data = generator(z)
# For the combined model, we will only train the generator
discriminator.trainable = False
# The discriminator takes generated data as input and determines validity
validity = discriminator(generated_data)
# The combined model (stacked generator and discriminator)
combined = Model(z, validity)
combined.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5))
# Training the GAN
epochs = 10000
batch_size = 32
valid = np.ones((batch_size, 1))
fake = np.zeros((batch_size, 1))
for epoch in range(epochs):
    # ---------------------
    #  Train Discriminator
    # ---------------------
    # Select a random batch of real data
    idx = np.random.randint(0, X_train.shape[0], batch_size)
    real_data = X_train[idx]
    # Generate a batch of fake data
    noise = np.random.normal(0, 1, (batch_size, time_step))
    gen_data = generator.predict(noise)
    # Train the discriminator
    d_loss_real = discriminator.train_on_batch(real_data, valid)
    d_loss_fake = discriminator.train_on_batch(gen_data, fake)
    d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)
    # ---------------------
    #  Train Generator
    # ---------------------
    noise = np.random.normal(0, 1, (batch_size, time_step))
    # Train the generator (to have the discriminator label samples as valid)
    g_loss = combined.train_on_batch(noise, valid)
    # Print the progress
    if epoch % 1000 == 0:
        print(f"{epoch} [D loss: {d_loss[0]} | D accuracy: {100 * d_loss[1]}] [G loss: {g_loss}]")
# Make predictions
noise = np.random.normal(0, 1, (1, time_step))
generated_prediction = generator.predict(noise)
# Generator output has shape (1, time_step, 1); flatten to (time_step, 1) before inverse scaling
generated_prediction = scaler.inverse_transform(generated_prediction.reshape(time_step, 1))
print(generated_prediction)
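Note that this generator samples unconditioned sequences from noise rather than forecasting from observed history, so any single draw can be arbitrary. Averaging many draws at least yields a stabler point estimate; a small illustration:

# Average many generated sequences for a less noisy point estimate
noise = np.random.normal(0, 1, (100, time_step))
samples = generator.predict(noise).reshape(-1, 1)
samples = scaler.inverse_transform(samples).reshape(100, time_step)
print(samples.mean(axis=0))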
2.2 WaveNet
WaveNet, developed by DeepMind, was originally built for audio generation but has been adapted for time series forecasting.

import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv1D, Add, Activation, Multiply, Lambda, Dense, Flatten
from tensorflow.keras.optimizers import Adam
# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)
# Prepare data for WaveNet
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data['Value'].values.reshape(-1, 1))
def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        a = dataset[i:(i + time_step), 0]
        X.append(a)
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)
time_step = 10
X, y = create_dataset(scaled_data, time_step)
X = X.reshape(X.shape[0], X.shape[1], 1)
# Define WaveNet model
def residual_block(x, dilation_rate):
    tanh_out = Conv1D(32, kernel_size=2, dilation_rate=dilation_rate, padding='causal', activation='tanh')(x)
    sigm_out = Conv1D(32, kernel_size=2, dilation_rate=dilation_rate, padding='causal', activation='sigmoid')(x)
    out = Multiply()([tanh_out, sigm_out])
    out = Conv1D(32, kernel_size=1, padding='same')(out)
    out = Add()([out, x])
    return out
input_layer = Input(shape=(time_step, 1))
out = Conv1D(32, kernel_size=2, padding='causal', activation='tanh')(input_layer)
skip_connections = []
for i in range(10):
    out = residual_block(out, 2**i)
    skip_connections.append(out)
out = Add()(skip_connections)
out = Activation('relu')(out)
out = Conv1D(1, kernel_size=1, activation='relu')(out)
out = Flatten()(out)
out = Dense(1)(out)
model = Model(input_layer, out)
model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')
# Train the model
model.fit(X, y, epochs=10, batch_size=16)
# Make predictions
predictions = model.predict(X)
predictions = scaler.inverse_transform(predictions)
print(predictions)
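One detail worth checking when stacking dilated blocks is the receptive field. With kernel_size=2 and dilations 1, 2, ..., 2^9 it is far larger than the time_step=10 window used here, so a shallower stack would capture the same history:

# Receptive field: initial causal conv (2 steps) plus (kernel_size - 1) * dilation per block
receptive_field = 2 + sum((2 - 1) * 2**i for i in range(10))
print(receptive_field)  # 1025 steps, versus an input window of only 10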
3. Deep Learning Methods
3.1 LSTM (Long Short-Term Memory)
LSTM networks are recurrent neural networks designed to capture long-term dependencies in sequential data, which makes them a natural fit for time series forecasting.

import numpy as np
import pandas as pd
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from sklearn.preprocessing import MinMaxScaler
# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)
# Prepare data for LSTM
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data['Value'].values.reshape(-1, 1))
train_size = int(len(scaled_data) * 0.8)
train_data = scaled_data[:train_size]
test_data = scaled_data[train_size:]
def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        a = dataset[i:(i + time_step), 0]
        X.append(a)
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)
time_step = 10
X_train, y_train = create_dataset(train_data, time_step)
X_test, y_test = create_dataset(test_data, time_step)
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
# Build LSTM model
model = Sequential()
model.add(LSTM(50, return_sequences=True, input_shape=(time_step, 1)))
model.add(LSTM(50, return_sequences=False))
model.add(Dense(25))
model.add(Dense(1))
model.compile(optimizer='adam', loss='mean_squared_error')
model.fit(X_train, y_train, batch_size=1, epochs=1)
# Make predictions
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)
train_predict = scaler.inverse_transform(train_predict)
test_predict = scaler.inverse_transform(test_predict)
print(test_predict)
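Printing raw predictions says little about quality; a common check is RMSE on the test split, with the targets inverse-scaled back to the original units. A minimal sketch continuing from the variables above:

import math
from sklearn.metrics import mean_squared_error
# Inverse-scale the targets so the error is in the original units
y_test_unscaled = scaler.inverse_transform(y_test.reshape(-1, 1))
rmse = math.sqrt(mean_squared_error(y_test_unscaled, test_predict))
print(f'Test RMSE: {rmse:.4f}')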
3.2 GRU (Gated Recurrent Unit)
The GRU is a simpler variant of the LSTM that often performs just as well on time series tasks.

import numpy as np
import pandas as pd
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import GRU, Dense
from sklearn.preprocessing import MinMaxScaler
# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)
# Prepare data for GRU
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data['Value'].values.reshape(-1, 1))
train_size = int(len(scaled_data) * 0.8)
train_data = scaled_data[:train_size]
test_data = scaled_data[train_size:]
def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        a = dataset[i:(i + time_step), 0]
        X.append(a)
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)
time_step = 10
X_train, y_train = create_dataset(train_data, time_step)
X_test, y_test = create_dataset(test_data, time_step)
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
# Build GRU model
model = Sequential()
model.add(GRU(50, return_sequences=True, input_shape=(time_step, 1)))
model.add(GRU(50, return_sequences=False))
model.add(Dense(25))
model.add(Dense(1))
model.compile(optimizer='adam', loss='mean_squared_error')
model.fit(X_train, y_train, batch_size=1, epochs=1)
# Make predictions
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)
train_predict = scaler.inverse_transform(train_predict)
test_predict = scaler.inverse_transform(test_predict)
print(test_predict)
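The "simpler" claim is concrete: a GRU has three gates to the LSTM's four, so the same layer width needs roughly a quarter fewer recurrent parameters. A quick comparison (parameter counts assume TF 2.x defaults, including reset_after=True for GRU):

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, GRU
# Same width, same input shape: count trainable parameters in a single layer
print(Sequential([LSTM(50, input_shape=(10, 1))]).count_params())  # 10400
print(Sequential([GRU(50, input_shape=(10, 1))]).count_params())   # 7950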
3.3 Transformer
Transformer models, hugely successful in natural language processing, have been adapted for time series forecasting.

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Flatten, Add, MultiHeadAttention, LayerNormalization, Dropout
# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)
# Prepare data
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data['Value'].values.reshape(-1, 1))
train_size = int(len(scaled_data) * 0.8)
train_data = scaled_data[:train_size]
test_data = scaled_data[train_size:]
def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        a = dataset[i:(i + time_step), 0]
        X.append(a)
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)
time_step = 10
X_train, y_train = create_dataset(train_data, time_step)
X_test, y_test = create_dataset(test_data, time_step)
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
# Build Transformer model (MultiHeadAttention needs query and value inputs, so the functional API is used)
inputs = Input(shape=(time_step, 1))
x = Dense(32)(inputs)  # project the single feature into a wider embedding
attention_out = MultiHeadAttention(num_heads=4, key_dim=2)(x, x)  # self-attention over the window
x = LayerNormalization()(Add()([attention_out, x]))  # residual connection, then normalize
x = Flatten()(x)
x = Dense(50, activation='relu')(x)
x = Dropout(0.1)(x)
outputs = Dense(1)(x)
model = Model(inputs, outputs)
model.compile(optimizer='adam', loss='mean_squared_error')
model.fit(X_train, y_train, batch_size=1, epochs=1)
# Make predictions
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)
train_predict = scaler.inverse_transform(train_predict)
test_predict = scaler.inverse_transform(test_predict)
print(test_predict)
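One thing this minimal model omits is positional encoding: self-attention is order-agnostic, so without position information the model cannot tell a window from its permutation. A sketch of the standard sinusoidal encoding, which could be added to the projected embedding before the attention layer:

def positional_encoding(length, depth):
    # Sinusoidal encoding from "Attention Is All You Need"
    positions = np.arange(length)[:, np.newaxis]          # (length, 1)
    dims = np.arange(depth)[np.newaxis, :]                # (1, depth)
    angle_rates = 1 / np.power(10000.0, (2 * (dims // 2)) / np.float32(depth))
    angles = positions * angle_rates                      # (length, depth)
    angles[:, 0::2] = np.sin(angles[:, 0::2])
    angles[:, 1::2] = np.cos(angles[:, 1::2])
    return angles[np.newaxis, ...]                        # (1, length, depth), broadcasts over the batch

# e.g. in the model above, right after the Dense(32) projection:
# x = x + positional_encoding(time_step, 32)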
3.4 Seq2Seq (Sequence-to-Sequence)
Seq2Seq models forecast sequences by learning a mapping from an input sequence to an output sequence.

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense
# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)
# Prepare data for Seq2Seq
def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        a = dataset[i:(i + time_step), 0]
        X.append(a)
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)
time_step = 10
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data['Value'].values.reshape(-1, 1))
X, y = create_dataset(scaled_data, time_step)
X = X.reshape(X.shape[0], X.shape[1], 1)
# Define Seq2Seq model
encoder_inputs = Input(shape=(time_step, 1))
encoder = LSTM(50, return_state=True)
encoder_outputs, state_h, state_c = encoder(encoder_inputs)
decoder_inputs = Input(shape=(time_step, 1))
decoder_lstm = LSTM(50, return_sequences=False, return_state=True)  # single-step output to match the scalar target y
decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=[state_h, state_c])
decoder_dense = Dense(1)
decoder_outputs = decoder_dense(decoder_outputs)
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer='adam', loss='mean_squared_error')
# Train the model
model.fit([X, X], y, epochs=10, batch_size=16)
# Make predictions
predictions = model.predict([X, X])
predictions = scaler.inverse_transform(predictions)
print(predictions)
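Feeding the same window to both encoder and decoder is a simplification. A more conventional setup uses teacher forcing, where the decoder receives the target sequence shifted one step right; a sketch of that variant under the same shapes:

# Teacher forcing: decoder input is the window shifted right by one step (first step zeroed)
decoder_input_data = np.zeros_like(X)
decoder_input_data[:, 1:, :] = X[:, :-1, :]
model.fit([X, decoder_input_data], y, epochs=10, batch_size=16)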
3.5 TCN (Temporal Convolutional Network)
TCNs use dilated causal convolutions to capture long-range dependencies in time series data.

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, Dense, Flatten

# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)

# Prepare data for TCN
def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        a = dataset[i:(i + time_step), 0]
        X.append(a)
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)

time_step = 10
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data['Value'].values.reshape(-1, 1))
X, y = create_dataset(scaled_data, time_step)
X = X.reshape(X.shape[0], X.shape[1], 1)

# Define TCN model (causal padding keeps the convolutions from looking into the future)
model = Sequential()
model.add(Conv1D(filters=64, kernel_size=2, dilation_rate=1, padding='causal', activation='relu', input_shape=(time_step, 1)))
model.add(Conv1D(filters=64, kernel_size=2, dilation_rate=2, padding='causal', activation='relu'))
model.add(Conv1D(filters=64, kernel_size=2, dilation_rate=4, padding='causal', activation='relu'))
model.add(Flatten())
model.add(Dense(1))
model.compile(optimizer='adam', loss='mean_squared_error')

# Train the model
model.fit(X, y, epochs=10, batch_size=16)

# Make predictions
predictions = model.predict(X)
predictions = scaler.inverse_transform(predictions)
print(predictions)
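For anything beyond a demo, stacking Conv1D layers by hand gets tedious (residual blocks, normalization, dropout). The community keras-tcn package wraps all of this in a single layer; a brief sketch, assuming that package's documented API (details may vary between versions):

# pip install keras-tcn
from tcn import TCN

model = Sequential()
model.add(TCN(nb_filters=64, kernel_size=2, dilations=[1, 2, 4], input_shape=(time_step, 1)))
model.add(Dense(1))
model.compile(optimizer='adam', loss='mean_squared_error')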
3.6 DeepAR
DeepAR, developed by Amazon, is an autoregressive recurrent network designed for time series forecasting.

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense

# Load your time series data
time_series_data = pd.read_csv('time_series_data.csv')
time_series_data['Date'] = pd.to_datetime(time_series_data['Date'])
time_series_data.set_index('Date', inplace=True)

# Prepare data for DeepAR-like model
def create_dataset(dataset, time_step=1):
    X, Y = [], []
    for i in range(len(dataset) - time_step - 1):
        a = dataset[i:(i + time_step), 0]
        X.append(a)
        Y.append(dataset[i + time_step, 0])
    return np.array(X), np.array(Y)

time_step = 10
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(time_series_data['Value'].values.reshape(-1, 1))
X, y = create_dataset(scaled_data, time_step)
X = X.reshape(X.shape[0], X.shape[1], 1)

# Define DeepAR-like model
model = Sequential()
model.add(LSTM(50, return_sequences=True, input_shape=(time_step, 1)))
model.add(LSTM(50))
model.add(Dense(1))
model.compile(optimizer='adam', loss='mean_squared_error')

# Train the model
model.fit(X, y, epochs=10, batch_size=16)

# Make predictions
predictions = model.predict(X)
predictions = scaler.inverse_transform(predictions)
print(predictions)
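The model above is only "DeepAR-like": the real DeepAR is probabilistic, training an autoregressive RNN to output distribution parameters and sampling forecast paths from it. Amazon's reference implementation lives in the GluonTS library; a hedged sketch, where the exact API varies across GluonTS versions and the daily frequency "D" is an assumption about the data:

# pip install gluonts -- probabilistic DeepAR via GluonTS (PyTorch backend)
from gluonts.dataset.common import ListDataset
from gluonts.torch import DeepAREstimator

training_data = ListDataset(
    [{"start": time_series_data.index[0], "target": time_series_data["Value"].values}],
    freq="D",  # assumption: daily observations
)
estimator = DeepAREstimator(freq="D", prediction_length=10, trainer_kwargs={"max_epochs": 5})
predictor = estimator.train(training_data)
forecast = next(iter(predictor.predict(training_data)))
print(forecast.mean)           # point forecast
print(forecast.quantile(0.9))  # upper uncertainty band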
Time series forecasting is a complex but fascinating field. As the technology matures, the tools and methods for time series forecasting will keep getting more refined, opening up new opportunities for innovation and improvement across domains.
Original article: Predicting Time Series Data with Machine Learning, Generative AI, and Deep Learning
Author: Palash Mishra
This article is for technical discussion and learning purposes only and does not constitute investment advice.
Please credit the original author and source when reposting.