作者:老余捞鱼
原创不易,转载请标明出处及原作者。

写在前面的话:最近遇到了强大的 Time Mixer 模型,该模型以在复杂数据集上提供令人印象深刻的结果而闻名。出于好奇,我决定将其应用于我在 Kaggle 上找到的数据集,其中包含 Microsoft 的历史股票价格。在本文中,我们将探讨如何利用 Time Mixer 来预测 Microsoft 股票未来某个时段的实际收盘价,从而展示其在财务时间序列预测方面的潜力。
一、什么是Time Mixer?
时间混合器代表了时间序列分析向前迈出的重要一步,尤其是在处理目标变量随时间变化的复杂变化时。像 ARIMA 这样的传统模型可能难以捕捉这些复杂的模式,从而导致预测结果不佳。这就是 Time Mixer 的亮点。
Time Mixer 属于一类称为 Mixer 模型的新模型。这些模型旨在识别序列中的依赖关系和隐藏模式,就像转换器处理文本或图像的方式一样。与逐步处理序列的递归神经网络 (RNN) 和长短期记忆 (LSTM) 网络不同,时间混合器并行运行,使其能够更有效地处理序列。此外,与 transformer 相比,它更简单、更精简,使其成为传统方法不足的时间序列分析的令人兴奋的选择
时间混合器设计:
- Time-mixer 专门针对时间序列数据,其中涉及按时间顺序索引的数据点序列。它旨在有效地对时间依赖关系进行建模,同时捕获短期和长期模式。
- 该模型通常包括混合层,这些层处理 Importing 序列的方式允许模型学习复杂的时间模式,而无需明确的注意机制。

二、探索性数据分析 (EDA)
data = pd.read_csv(“/content/Microsoft_Stock.csv”)让我们从安装库开始,使用最常用的PIP安装模式。
pip install pandas
pip install numpy
pip install tensorflow
pip install sklearn
pip install matplotlib让我们深入研究数据,看看数据长什么样。
microsoft_data = pd.read_csv("/content/Microsoft_Stock.csv")
microsoft_data.head()
所以,这个数据框看起来有6列,分别是日期、开盘价、最高价、最低价、收盘价和成交量。让我们检查一下所有列的数据类型以及数据帧的形状。这个数据也是我们平时能接触到的最常见的股票数据。
microsoft_data.info()
我们可以看到 Date 列的数据类型为 object,数据集中的行数为 1511,让我们先将 Date 列的数据类型转换为 datetime。
microsoft_data['Date']=microsoft_data.Date.astype('datetime64[ns]')让我们绘制一个条形图来查看一年中的变化。
stock_volume = microsoft_data.groupby(microsoft_data.Date.dt.year).Volume.max().reset_index()
stock_volume
plt.figure(figsize=[16,5])
plt.bar(stock_volume.Date,stock_volume.Volume)
plt.xlabel('Date')
plt.ylabel('Volume')
plt.title("Volume with Year")
从图表上看,我们可以看到 2019 年的交易量下降,这是他们的交易量最低的地方,我们在此图表中取了每个最大交易量。让我们创建更多列,以便更好地理解数据集。
microsoft_data = microsoft_data.set_index('Date')
microsoft_data['year'] = microsoft_data.index.year
microsoft_data['month'] = microsoft_data.index.month
microsoft_data['day'] = microsoft_data.index.day
microsoft_data['hour'] = microsoft_data.index.hour
microsoft_data['dayofweek'] = microsoft_data.index.dayofweek
microsoft_data['dayofyear'] = microsoft_data.index.dayofyear
microsoft_data['weekofyear'] = microsoft_data.index.isocalendar().week
microsot_data['quarter'] = microsot_data.index.quarter这就是数据集中新添加的列的值。

现在,让我们看看多年来成交量的子图。
import seaborn as sns
color_pal = sns.color_palette('tab10')
plt.subplots(figsize=(15, 5))
sns.lineplot(microsoft_data=microsoft_data, x='month', y='Volume', hue='year',palette=color_pal, ci=False)
plt.show()
让我们使用图表可视化收盘价多年来的趋势。
data['Close'].plot(figsize=[15,7])
plt.xlabel("Date")
plt.ylabel("Close")
plt.plot()
好吧,多年来价格显著增长,但在 2020 年有所下降。
三、TS-Mixer 模型
现在,让我们构建我们的模型。我们将从导入所需的库开始。
#importing necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
import numpy as np我已经为EDA(探索性数据分析)和TS-mixer分别创建了一个单独的文件。我建议为它们各自创建一个单独的文件,这样可以帮助你更容易地找到它们,并且以这种方式组织事物会更加有序。让我们从上传CSV文件到数据框开始。
data = pd.read_csv("/content/Microsoft_Stock.csv")
# Convert the 'Date' column to datetime
data['Date'] = pd.to_datetime(data['Date'])现在我们将使用最小-最大缩放器(Min Max Scaler)来归一化我们的列。
#Normalizing the data
scaler = MinMaxScaler()
data[['Open', 'High', 'Low', 'Close', 'Volume']] = scaler.fit_transform(data[['Open', 'High', 'Low', 'Close', 'Volume']])DataFrame 转化到numpy array格式。
values = data[['Open', 'High', 'Low', 'Close', 'Volume']].values我们将提供一个序列长度并创建我们的数据集。序列长度指的是在时间序列模型中用于进行预测的时间段数量。在我们的例子中,它指的是在预测下一天的价格时考虑的前多少天(或观察值)。值得注意的是,文章中选取的target是收盘价,非常不建议使用收盘价为预测目标,实际使用时,可以尝试使用第二天的涨跌,或者是涨跌幅为预测目标。
def create_sequences(data, seq_length):
xs, ys = [], []
for i in range(len(data) - seq_length):
x = data[i:i+seq_length]
y = data[i+seq_length, 3] # Close price as target
xs.append(x)
ys.append(y)
return torch.tensor(xs, dtype=torch.float32), torch.tensor(ys, dtype=torch.float32)
# Set the sequence length to 180
seq_length = 180
X, y = create_sequences(values, seq_length)将数据拆分为训练数据集和测试数据集。
train_size = int(X.shape[0] * 0.8)
X_train, y_train = X[:train_size], y[:train_size]
X_test, y_test = X[train_size:], y[train_size:]创建 DataLoader。
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=32, shuffle=True)
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=32)设置我们的模型。
class TSMixer(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, output_size):
super(TSMixer, self).__init__()
self.mixer = nn.Sequential(
nn.Conv1d(in_channels=input_size, out_channels=hidden_size, kernel_size=1),
nn.ReLU(),
*[nn.Sequential(
nn.Conv1d(in_channels=hidden_size, out_channels=hidden_size, kernel_size=1),
nn.ReLU()) for _ in range(num_layers)],
nn.Conv1d(in_channels=hidden_size, out_channels=output_size, kernel_size=1)
)
def forward(self, x):
x = x.transpose(1, 2)
x = self.mixer(x)
x = x.transpose(1, 2)
return x[:, -1, :]初始化模型、损失函数和优化器。
input_size = X_train.shape[2] # 5 features: Open, High, Low, Close, Volume
hidden_size = 64
num_layers = 2
output_size = 1
model = TSMixer(input_size, hidden_size, num_layers, output_size)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)训练我们的模型。
epochs = 50
for epoch in range(epochs):
model.train()
for batch_X, batch_y in train_loader:
optimizer.zero_grad()
outputs = model(batch_X)
loss = criterion(outputs, batch_y.unsqueeze(1))
loss.backward()
optimizer.step()
print(f'Epoch {epoch+1}/{epochs}, Loss: {loss.item()}')
# 4. Prediction
model.eval()
predictions = []
with torch.no_grad():
for batch_X, _ in test_loader:
pred = model(batch_X)
predictions.append(pred)
# Convert predictions to a numpy array and invert normalization
# Create a placeholder for inverse scaling
predictions = torch.cat(predictions).numpy()
# Initialize an array of zeros with the same number of features as the original data
predicted_prices_full = np.zeros((predictions.shape[0], values.shape[1]))
# Place the predictions in the 'Close' column (3rd index)
predicted_prices_full[:, 3] = predictions[:, 0]
# Inverse transform
predicted_prices_full = scaler.inverse_transform(predicted_prices_full)
# Extract the 'Close' prices from the inverse-transformed data
predicted_prices = predicted_prices_full[:, 3]
# Convert predictions to a DataFrame and save to a CSV file
predicted_prices_df = pd.DataFrame(predicted_prices, columns=['Predicted_Close'])
predicted_prices_df.to_csv('predicted_prices.csv', index=False)
print("Predictions saved to predicted_prices.csv")这里我取了 epoch 值 为50。

输出会是这样的,您还可以根据数据需要更改学习率的值。是时候检查模型的准确性了。
y_test_np = y_test.numpy().reshape(-1, 1)
# Create an array with the same shape as the original features, filled with zeros
y_test_full = np.zeros((y_test_np.shape[0], values.shape[1]))
# Place y_test_np in the Close column (assuming it's the 4th column as before)
y_test_full[:, 3] = y_test_np[:, 0]
# Apply inverse transform only on the relevant column
actual_prices_full = scaler.inverse_transform(y_test_full)
# Extract the actual Close prices
actual_prices = actual_prices_full[:, 3]
# Calculate the MSE between actual and predicted Close prices
mse = mean_squared_error(actual_prices, predicted_prices)
print(f"Mean Squared Error: {mse}")
# Matching predictions with dates
predicted_dates = data['Date'].iloc[train_size + seq_length:].reset_index(drop=True)
# Combine dates, actual prices, and predicted prices into a DataFrame
predicted_prices_df = pd.DataFrame({
'Date': predicted_dates,
'Actual_Close': actual_prices,
'Predicted_Close': predicted_prices
})
# Save the predictions with dates
predicted_prices_df.to_csv('predicted_prices_with_dates.csv', index=False)
print("Predictions with dates saved to predicted_prices_with_dates.csv")
# Visualization
plt.figure(figsize=(12, 6))
plt.plot(predicted_prices_df['Date'], predicted_prices_df['Actual_Close'], label='Actual Close Price')
plt.plot(predicted_prices_df['Date'], predicted_prices_df['Predicted_Close'], label='Predicted Close Price')
plt.xlabel('Date')
plt.ylabel('Close Price')
plt.title('Actual vs Predicted Close Prices')
plt.legend()
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()四、结果展示
实际收盘价和预测收盘价的图表如下所示:

当 MSE(均方误差)分数为 21.56 时,结果非常惊人,可以通过更改输入层的值来进一步减小该值,但在增加层时应小心,因为它可能会导致过度拟合。所以,这就是我们的模型的工作原理,您可以通过将其结果与 ARIMA、Exponential Smoothing 等其他模型进行比较来进一步分析其效率。
本文内容仅仅是技术探讨和学习,并不构成任何投资建议。
转发请注明原作者和出处。
Be First to Comment