作者:老余捞鱼
原创不易,转载请标明出处及原作者。

写在前面的话:最近遇到了强大的 Time Mixer 模型,该模型以在复杂数据集上提供令人印象深刻的结果而闻名。出于好奇,我决定将其应用于我在 Kaggle 上找到的数据集,其中包含 Microsoft 的历史股票价格。在本文中,我们将探讨如何利用 Time Mixer 来预测 Microsoft 股票未来某个时段的实际收盘价,从而展示其在财务时间序列预测方面的潜力。
一、什么是Time Mixer?
时间混合器代表了时间序列分析向前迈出的重要一步,尤其是在处理目标变量随时间变化的复杂变化时。像 ARIMA 这样的传统模型可能难以捕捉这些复杂的模式,从而导致预测结果不佳。这就是 Time Mixer 的亮点。
Time Mixer 属于一类称为 Mixer 模型的新模型。这些模型旨在识别序列中的依赖关系和隐藏模式,就像转换器处理文本或图像的方式一样。与逐步处理序列的递归神经网络 (RNN) 和长短期记忆 (LSTM) 网络不同,时间混合器并行运行,使其能够更有效地处理序列。此外,与 transformer 相比,它更简单、更精简,使其成为传统方法不足的时间序列分析的令人兴奋的选择
时间混合器设计:
- Time-mixer 专门针对时间序列数据,其中涉及按时间顺序索引的数据点序列。它旨在有效地对时间依赖关系进行建模,同时捕获短期和长期模式。
- 该模型通常包括混合层,这些层处理 Importing 序列的方式允许模型学习复杂的时间模式,而无需明确的注意机制。

二、探索性数据分析 (EDA)
data = pd.read_csv(“/content/Microsoft_Stock.csv”)
让我们从安装库开始,使用最常用的PIP安装模式。
pip install pandas
pip install numpy
pip install tensorflow
pip install sklearn
pip install matplotlib
让我们深入研究数据,看看数据长什么样。
microsoft_data = pd.read_csv("/content/Microsoft_Stock.csv")
microsoft_data.head()

所以,这个数据框看起来有6列,分别是日期、开盘价、最高价、最低价、收盘价和成交量。让我们检查一下所有列的数据类型以及数据帧的形状。这个数据也是我们平时能接触到的最常见的股票数据。
microsoft_data.info()

我们可以看到 Date 列的数据类型为 object,数据集中的行数为 1511,让我们先将 Date 列的数据类型转换为 datetime。
microsoft_data['Date']=microsoft_data.Date.astype('datetime64[ns]')
让我们绘制一个条形图来查看一年中的变化。
stock_volume = microsoft_data.groupby(microsoft_data.Date.dt.year).Volume.max().reset_index()
stock_volume
plt.figure(figsize=[16,5])
plt.bar(stock_volume.Date,stock_volume.Volume)
plt.xlabel('Date')
plt.ylabel('Volume')
plt.title("Volume with Year")

从图表上看,我们可以看到 2019 年的交易量下降,这是他们的交易量最低的地方,我们在此图表中取了每个最大交易量。让我们创建更多列,以便更好地理解数据集。
microsoft_data = microsoft_data.set_index('Date')
microsoft_data['year'] = microsoft_data.index.year
microsoft_data['month'] = microsoft_data.index.month
microsoft_data['day'] = microsoft_data.index.day
microsoft_data['hour'] = microsoft_data.index.hour
microsoft_data['dayofweek'] = microsoft_data.index.dayofweek
microsoft_data['dayofyear'] = microsoft_data.index.dayofyear
microsoft_data['weekofyear'] = microsoft_data.index.isocalendar().week
microsot_data['quarter'] = microsot_data.index.quarter
这就是数据集中新添加的列的值。

现在,让我们看看多年来成交量的子图。
import seaborn as sns
color_pal = sns.color_palette('tab10')
plt.subplots(figsize=(15, 5))
sns.lineplot(microsoft_data=microsoft_data, x='month', y='Volume', hue='year',palette=color_pal, ci=False)
plt.show()

让我们使用图表可视化收盘价多年来的趋势。
data['Close'].plot(figsize=[15,7])
plt.xlabel("Date")
plt.ylabel("Close")
plt.plot()

好吧,多年来价格显著增长,但在 2020 年有所下降。
三、TS-Mixer 模型
现在,让我们构建我们的模型。我们将从导入所需的库开始。
#importing necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
import numpy as np
我已经为EDA(探索性数据分析)和TS-mixer分别创建了一个单独的文件。我建议为它们各自创建一个单独的文件,这样可以帮助你更容易地找到它们,并且以这种方式组织事物会更加有序。让我们从上传CSV文件到数据框开始。
data = pd.read_csv("/content/Microsoft_Stock.csv")
# Convert the 'Date' column to datetime
data['Date'] = pd.to_datetime(data['Date'])
现在我们将使用最小-最大缩放器(Min Max Scaler)来归一化我们的列。
#Normalizing the data
scaler = MinMaxScaler()
data[['Open', 'High', 'Low', 'Close', 'Volume']] = scaler.fit_transform(data[['Open', 'High', 'Low', 'Close', 'Volume']])
DataFrame 转化到numpy array格式。
values = data[['Open', 'High', 'Low', 'Close', 'Volume']].values
我们将提供一个序列长度并创建我们的数据集。序列长度指的是在时间序列模型中用于进行预测的时间段数量。在我们的例子中,它指的是在预测下一天的价格时考虑的前多少天(或观察值)。值得注意的是,文章中选取的target是收盘价,非常不建议使用收盘价为预测目标,实际使用时,可以尝试使用第二天的涨跌,或者是涨跌幅为预测目标。
def create_sequences(data, seq_length):
xs, ys = [], []
for i in range(len(data) - seq_length):
x = data[i:i+seq_length]
y = data[i+seq_length, 3] # Close price as target
xs.append(x)
ys.append(y)
return torch.tensor(xs, dtype=torch.float32), torch.tensor(ys, dtype=torch.float32)
# Set the sequence length to 180
seq_length = 180
X, y = create_sequences(values, seq_length)
将数据拆分为训练数据集和测试数据集。
train_size = int(X.shape[0] * 0.8)
X_train, y_train = X[:train_size], y[:train_size]
X_test, y_test = X[train_size:], y[train_size:]
创建 DataLoader。
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=32)
设置我们的模型。
class TSMixer(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, output_size):
super(TSMixer, self).__init__()
self.mixer = nn.Sequential(
nn.Conv1d(in_channels=input_size, out_channels=hidden_size, kernel_size=1),
nn.ReLU(),
*[nn.Sequential(
nn.Conv1d(in_channels=hidden_size, out_channels=hidden_size, kernel_size=1),
nn.ReLU()) for _ in range(num_layers)],
nn.Conv1d(in_channels=hidden_size, out_channels=output_size, kernel_size=1)
)
def forward(self, x):
x = x.transpose(1, 2)
x = self.mixer(x)
x = x.transpose(1, 2)
return x[:, -1, :]
初始化模型、损失函数和优化器。
input_size = X_train.shape[2] # 5 features: Open, High, Low, Close, Volume
hidden_size = 64
num_layers = 2
output_size = 1
model = TSMixer(input_size, hidden_size, num_layers, output_size)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
训练我们的模型。
epochs = 50
for epoch in range(epochs):
model.train()
for batch_X, batch_y in train_loader:
optimizer.zero_grad()
outputs = model(batch_X)
loss = criterion(outputs, batch_y.unsqueeze(1))
loss.backward()
optimizer.step()
print(f'Epoch {epoch+1}/{epochs}, Loss: {loss.item()}')
# 4. Prediction
model.eval()
predictions = []
with torch.no_grad():
for batch_X, _ in test_loader:
pred = model(batch_X)
predictions.append(pred)
# Convert predictions to a numpy array and invert normalization
# Create a placeholder for inverse scaling
predictions = torch.cat(predictions).numpy()
# Initialize an array of zeros with the same number of features as the original data
predicted_prices_full = np.zeros((predictions.shape[0], values.shape[1]))
# Place the predictions in the 'Close' column (3rd index)
predicted_prices_full[:, 3] = predictions[:, 0]
# Inverse transform
predicted_prices_full = scaler.inverse_transform(predicted_prices_full)
# Extract the 'Close' prices from the inverse-transformed data
predicted_prices = predicted_prices_full[:, 3]
# Convert predictions to a DataFrame and save to a CSV file
predicted_prices_df = pd.DataFrame(predicted_prices, columns=['Predicted_Close'])
predicted_prices_df.to_csv('predicted_prices.csv', index=False)
print("Predictions saved to predicted_prices.csv")
这里我取了 epoch 值 为50。

输出会是这样的,您还可以根据数据需要更改学习率的值。是时候检查模型的准确性了。
y_test_np = y_test.numpy().reshape(-1, 1)
# Create an array with the same shape as the original features, filled with zeros
y_test_full = np.zeros((y_test_np.shape[0], values.shape[1]))
# Place y_test_np in the Close column (assuming it's the 4th column as before)
y_test_full[:, 3] = y_test_np[:, 0]
# Apply inverse transform only on the relevant column
actual_prices_full = scaler.inverse_transform(y_test_full)
# Extract the actual Close prices
actual_prices = actual_prices_full[:, 3]
# Calculate the MSE between actual and predicted Close prices
mse = mean_squared_error(actual_prices, predicted_prices)
print(f"Mean Squared Error: {mse}")
# Matching predictions with dates
predicted_dates = data['Date'].iloc[train_size + seq_length:].reset_index(drop=True)
# Combine dates, actual prices, and predicted prices into a DataFrame
predicted_prices_df = pd.DataFrame({
'Date': predicted_dates,
'Actual_Close': actual_prices,
'Predicted_Close': predicted_prices
})
# Save the predictions with dates
predicted_prices_df.to_csv('predicted_prices_with_dates.csv', index=False)
print("Predictions with dates saved to predicted_prices_with_dates.csv")
# Visualization
plt.figure(figsize=(12, 6))
plt.plot(predicted_prices_df['Date'], predicted_prices_df['Actual_Close'], label='Actual Close Price')
plt.plot(predicted_prices_df['Date'], predicted_prices_df['Predicted_Close'], label='Predicted Close Price')
plt.xlabel('Date')
plt.ylabel('Close Price')
plt.title('Actual vs Predicted Close Prices')
plt.legend()
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
四、结果展示
实际收盘价和预测收盘价的图表如下所示:

当 MSE(均方误差)分数为 21.56 时,结果非常惊人,可以通过更改输入层的值来进一步减小该值,但在增加层时应小心,因为它可能会导致过度拟合。所以,这就是我们的模型的工作原理,您可以通过将其结果与 ARIMA、Exponential Smoothing 等其他模型进行比较来进一步分析其效率。
本文内容仅仅是技术探讨和学习,并不构成任何投资建议。
转发请注明原作者和出处。
Be First to Comment