作者:老余捞鱼
原创不易,转载请标明出处及原作者。

写在前面的话:Transformer 是一种在自然语言处理等领域广泛应用的深度学习架构,与传统的循环神经网络(RNN)相比,Transformer 可以并行处理输入序列的各个位置,大大提高了计算效率。而且通过多层的深度堆叠,能够学习到更复杂和抽象的特征表示。本文将利用Python代码来实现美股价格的预测模拟。
话不多说,代码如下:
import numpy as np
import pandas as pd
import os, datetime
import tensorflow as tf
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
print('Tensorflow version: {}'.format(tf.__version__))
import matplotlib.pyplot as plt
plt.style.use('seaborn')
import warnings
warnings.filterwarnings('ignore')
physical_devices = tf.config.list_physical_devices()
print('Physical devices: {}'.format(physical_devices))

# Filter out the CPUs and keep only the GPUs
gpus = [device for device in physical_devices if 'GPU' in device.device_type]

# If GPUs are available, restrict TensorFlow to the first one and enable
# memory growth on it.
if gpus:
    tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
    tf.config.experimental.set_memory_growth(gpus[0], True)
    print('GPU memory growth: True')

# Example output:
# Tensorflow version: 2.9.1
# Physical devices: [PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU')]
Hyperparameters
# Hyperparameters shared by the data windowing, the model and the training run.
batch_size = 32  # batch size passed to model.fit
seq_len = 128    # number of consecutive rows in one input window
d_k = 256        # width of the query/key projections of each attention head
d_v = 256        # width of the value projection of each attention head
n_heads = 12     # attention heads per encoder layer
ff_dim = 256     # filters of the first feed-forward Conv1D in each encoder layer

# Load IBM data
IBM_path = 'IBM.csv'
df = pd.read_csv(IBM_path, delimiter=',',
                 usecols=['Date', 'Open', 'High', 'Low', 'Close', 'Volume'])

# Replace 0 to avoid dividing by 0 later on: a zero volume takes the value of
# the previous row. Assigning the result back (instead of a chained
# inplace replace with method='ffill') keeps this working on current pandas.
# A zero in the very first row has nothing to copy from and stays NaN.
df['Volume'] = df['Volume'].replace(0, np.nan).ffill()
df.sort_values('Date', inplace=True)
df.tail()  # notebook display only
df.head()  # notebook display only

# Print the shape of the dataset
print('Shape of the dataframe: {}'.format(df.shape))
# Example output:
# Shape of the dataframe: (14588, 6)

# Plot daily IBM closing prices and volume
fig = plt.figure(figsize=(15, 10))
st = fig.suptitle("IBM Close Price and Volume", fontsize=20)
st.set_y(0.92)

# Upper panel: closing price, with a date label every 1464 rows.
ax1 = fig.add_subplot(211)
ax1.plot(df['Close'], label='IBM Close Price')
ax1.set_xticks(range(0, df.shape[0], 1464))
ax1.set_xticklabels(df['Date'].loc[::1464])
ax1.set_ylabel('Close Price', fontsize=18)
ax1.legend(loc="upper left", fontsize=12)

# Lower panel: traded volume on the same tick grid.
ax2 = fig.add_subplot(212)
ax2.plot(df['Volume'], label='IBM Volume')
ax2.set_xticks(range(0, df.shape[0], 1464))
ax2.set_xticklabels(df['Date'].loc[::1464])
ax2.set_ylabel('Volume', fontsize=18)
ax2.legend(loc="upper left", fontsize=12)
Calculate normalized percentage change of all columns
# Calculate percentage change: replace every price/volume column by its
# arithmetic return relative to the previous row.
for column in ['Open', 'High', 'Low', 'Close', 'Volume']:
    df[column] = df[column].pct_change()

# Drop all rows with NaN values (the first row has no previous row to compare to).
df.dropna(how='any', axis=0, inplace=True)
###############################################################################
# Create indexes to split dataset
times = sorted(df.index.values)  # sorted once and reused for both cut points
last_10pct = times[-int(0.1 * len(times))]  # Index label where the last 10% of the series starts
last_20pct = times[-int(0.2 * len(times))]  # Index label where the last 20% of the series starts
###############################################################################
# Normalize price columns
# The min/max statistics are taken from the rows before last_20pct only, so
# the later rows do not influence the scaling.
price_columns = ['Open', 'High', 'Low', 'Close']
train_rows = df.index < last_20pct
min_return = df.loc[train_rows, price_columns].min(axis=0).min()
max_return = df.loc[train_rows, price_columns].max(axis=0).max()

# Min-max normalize price columns with one shared range for all four columns
# (rows before last_20pct land in 0-1; later rows may fall outside).
for column in price_columns:
    df[column] = (df[column] - min_return) / (max_return - min_return)

###############################################################################
# Normalize volume column
min_volume = df.loc[train_rows, 'Volume'].min(axis=0)
max_volume = df.loc[train_rows, 'Volume'].max(axis=0)

# Min-max normalize volume column (rows before last_20pct land in 0-1)
df['Volume'] = (df['Volume'] - min_volume) / (max_volume - min_volume)
###############################################################################
# Create training, validation and test split
# The date column is removed while slicing; drop() returns a new frame, which
# avoids mutating a slice of df in place.
df_train = df[df.index < last_20pct].drop(columns=['Date'])  # Rows before the last 20%
df_val = df[(df.index >= last_20pct) & (df.index < last_10pct)].drop(columns=['Date'])
df_test = df[df.index >= last_10pct].drop(columns=['Date'])

# Convert pandas columns into arrays
train_data = df_train.values
val_data = df_val.values
test_data = df_test.values
print('Training data shape: {}'.format(train_data.shape))
print('Validation data shape: {}'.format(val_data.shape))
print('Test data shape: {}'.format(test_data.shape))

df_train.head()  # notebook display only
# Example output:
# Training data shape: (11678, 5)
# Validation data shape: (1460, 5)
# Test data shape: (1459, 5)

Plot daily changes of close prices and volume
fig = plt.figure(figsize=(15, 12))
st = fig.suptitle("Data Separation", fontsize=20)
st.set_y(0.95)

# x positions that lay the three splits end to end on one axis.
n_train = train_data.shape[0]
n_val = val_data.shape[0]
n_test = test_data.shape[0]
train_x = np.arange(n_train)
val_x = np.arange(n_train, n_train + n_val)
test_x = np.arange(n_train + n_val, n_train + n_val + n_test)

###############################################################################
ax1 = fig.add_subplot(211)
ax1.plot(train_x, df_train['Close'], label='Training data')
ax1.plot(val_x, df_val['Close'], label='Validation data')
ax1.plot(test_x, df_test['Close'], label='Test data')
ax1.set_xlabel('Date')
ax1.set_ylabel('Normalized Closing Returns')
ax1.set_title("Close Price", fontsize=18)
ax1.legend(loc="best", fontsize=12)

###############################################################################
ax2 = fig.add_subplot(212)
ax2.plot(train_x, df_train['Volume'], label='Training data')
ax2.plot(val_x, df_val['Volume'], label='Validation data')
ax2.plot(test_x, df_test['Volume'], label='Test data')
ax2.set_xlabel('Date')
ax2.set_ylabel('Normalized Volume Changes')
ax2.set_title("Volume", fontsize=18)
ax2.legend(loc="best", fontsize=12)
Create chunks of training, validation and test data
def _make_windows(data, window, target_col=3):
    """Cut a 2-D array into overlapping input windows and next-row targets.

    Args:
        data: array of shape (rows, features).
        window: number of consecutive rows per input chunk.
        target_col: column whose value in the row right after each chunk is
            the target (3 is the 4th column, used here as the close return).

    Returns:
        (X, y) as numpy arrays. X[j] is data[j:j + window] and y[j] is
        data[j + window, target_col]. Both are empty when data has no more
        than `window` rows.
    """
    chunks, targets = [], []
    for end in range(window, len(data)):
        chunks.append(data[end - window:end])
        targets.append(data[end, target_col])
    return np.array(chunks), np.array(targets)


# Training data: chunks of seq_len rows, target is the Close value of the next row
X_train, y_train = _make_windows(train_data, seq_len)

###############################################################################
# Validation data
X_val, y_val = _make_windows(val_data, seq_len)

###############################################################################
# Test data
X_test, y_test = _make_windows(test_data, seq_len)

print('Training set shape', X_train.shape, y_train.shape)
print('Validation set shape', X_val.shape, y_val.shape)
print('Testing set shape' ,X_test.shape, y_test.shape)
# Example output:
# Training set shape (11550, 128, 5) (11550,)
# Validation set shape (1332, 128, 5) (1332,)
# Testing set shape (1331, 128, 5) (1331,)
Time2Vector
class Time2Vector(Layer):
    """Layer that derives one linear and one periodic time feature per step.

    Args:
        seq_len: length of the input windows; every weight vector created in
            build() has this many entries.
        **kwargs: standard Keras layer arguments (name, dtype, ...). They are
            forwarded to the base class so a layer rebuilt from get_config()
            keeps them.
    """

    def __init__(self, seq_len, **kwargs):
        super(Time2Vector, self).__init__(**kwargs)
        self.seq_len = seq_len

    def build(self, input_shape):
        '''Initialize weights and biases, each with shape (seq_len,)'''
        shape = (int(self.seq_len),)
        self.weights_linear = self.add_weight(name='weight_linear',
                                              shape=shape,
                                              initializer='uniform',
                                              trainable=True)
        self.bias_linear = self.add_weight(name='bias_linear',
                                           shape=shape,
                                           initializer='uniform',
                                           trainable=True)
        self.weights_periodic = self.add_weight(name='weight_periodic',
                                                shape=shape,
                                                initializer='uniform',
                                                trainable=True)
        self.bias_periodic = self.add_weight(name='bias_periodic',
                                             shape=shape,
                                             initializer='uniform',
                                             trainable=True)

    def call(self, x):
        '''Calculate linear and periodic time features'''
        # Average the first four features of every step -> (batch, seq_len).
        # NOTE(review): presumably these are the four price columns; confirm
        # against the column order of the input frame.
        x = tf.math.reduce_mean(x[:, :, :4], axis=-1)

        time_linear = self.weights_linear * x + self.bias_linear  # Linear time feature
        time_linear = tf.expand_dims(time_linear, axis=-1)  # Add dimension (batch, seq_len, 1)

        time_periodic = tf.math.sin(tf.multiply(x, self.weights_periodic) + self.bias_periodic)
        time_periodic = tf.expand_dims(time_periodic, axis=-1)  # Add dimension (batch, seq_len, 1)

        return tf.concat([time_linear, time_periodic], axis=-1)  # shape = (batch, seq_len, 2)

    def get_config(self):
        # Needed for saving and loading model with custom layer
        config = super().get_config().copy()
        config.update({'seq_len': self.seq_len})
        return config


# Transformer
class SingleAttention(Layer):
    """One scaled dot-product attention head.

    Args:
        d_k: output width of the query and key projections.
        d_v: output width of the value projection.
        **kwargs: standard Keras layer arguments, forwarded to the base class.
    """

    def __init__(self, d_k, d_v, **kwargs):
        super(SingleAttention, self).__init__(**kwargs)
        self.d_k = d_k
        self.d_v = d_v

    def build(self, input_shape):
        self.query = Dense(self.d_k,
                           input_shape=input_shape,
                           kernel_initializer='glorot_uniform',
                           bias_initializer='glorot_uniform')
        self.key = Dense(self.d_k,
                         input_shape=input_shape,
                         kernel_initializer='glorot_uniform',
                         bias_initializer='glorot_uniform')
        self.value = Dense(self.d_v,
                           input_shape=input_shape,
                           kernel_initializer='glorot_uniform',
                           bias_initializer='glorot_uniform')

    def call(self, inputs):  # inputs = (in_seq, in_seq, in_seq)
        q = self.query(inputs[0])
        k = self.key(inputs[1])

        attn_weights = tf.matmul(q, k, transpose_b=True)
        # Scale by sqrt(d_k) with one broadcast division; the previous
        # tf.map_fn applied the same division one batch element at a time.
        attn_weights = attn_weights / np.sqrt(self.d_k)
        attn_weights = tf.nn.softmax(attn_weights, axis=-1)

        v = self.value(inputs[2])
        attn_out = tf.matmul(attn_weights, v)
        return attn_out

    def get_config(self):
        # Needed for saving and loading model with custom layer
        config = super().get_config().copy()
        config.update({'d_k': self.d_k, 'd_v': self.d_v})
        return config
#############################################################################
class MultiAttention(Layer):
    """Runs n_heads SingleAttention heads and projects their concatenation.

    Args:
        d_k: query/key width of every head.
        d_v: value width of every head.
        n_heads: number of heads.
        **kwargs: standard Keras layer arguments, forwarded to the base class.
    """

    def __init__(self, d_k, d_v, n_heads, **kwargs):
        super(MultiAttention, self).__init__(**kwargs)
        self.d_k = d_k
        self.d_v = d_v
        self.n_heads = n_heads
        self.attn_heads = list()

    def build(self, input_shape):
        # Rebuild the list rather than appending, so the head count always
        # equals n_heads.
        self.attn_heads = [SingleAttention(self.d_k, self.d_v)
                           for _ in range(self.n_heads)]

        # input_shape[0]=(batch, seq_len, 7), input_shape[0][-1]=7
        # The projection maps the concatenated heads back to the input width.
        self.linear = Dense(input_shape[0][-1],
                            input_shape=input_shape,
                            kernel_initializer='glorot_uniform',
                            bias_initializer='glorot_uniform')

    def call(self, inputs):
        attn = [head(inputs) for head in self.attn_heads]
        concat_attn = tf.concat(attn, axis=-1)
        multi_linear = self.linear(concat_attn)
        return multi_linear

    def get_config(self):
        # Needed for saving and loading model with custom layer
        config = super().get_config().copy()
        config.update({'d_k': self.d_k, 'd_v': self.d_v, 'n_heads': self.n_heads})
        return config
#############################################################################
class TransformerEncoder(Layer):
    """Encoder layer: multi-head attention, then a two-layer Conv1D feed-forward.

    Each sub-layer is followed by dropout, a residual addition and layer
    normalization.

    Args:
        d_k: query/key width of every attention head.
        d_v: value width of every attention head.
        n_heads: number of attention heads.
        ff_dim: filters of the first feed-forward Conv1D.
        dropout: rate used by both Dropout layers.
        **kwargs: standard Keras layer arguments, forwarded to the base class.
    """

    def __init__(self, d_k, d_v, n_heads, ff_dim, dropout=0.1, **kwargs):
        # Configs written by the earlier get_config() stored the rate under
        # 'dropout_rate' and also carried 'attn_heads'. Accept both so models
        # saved with that version still load, and so the saved rate is
        # actually restored instead of silently falling back to the default.
        kwargs.pop('attn_heads', None)
        dropout = kwargs.pop('dropout_rate', dropout)
        super(TransformerEncoder, self).__init__(**kwargs)
        self.d_k = d_k
        self.d_v = d_v
        self.n_heads = n_heads
        self.ff_dim = ff_dim
        self.attn_heads = list()
        self.dropout_rate = dropout

    def build(self, input_shape):
        self.attn_multi = MultiAttention(self.d_k, self.d_v, self.n_heads)
        self.attn_dropout = Dropout(self.dropout_rate)
        self.attn_normalize = LayerNormalization(input_shape=input_shape, epsilon=1e-6)

        self.ff_conv1D_1 = Conv1D(filters=self.ff_dim, kernel_size=1, activation='relu')
        # input_shape[0]=(batch, seq_len, 7), input_shape[0][-1] = 7
        self.ff_conv1D_2 = Conv1D(filters=input_shape[0][-1], kernel_size=1)
        self.ff_dropout = Dropout(self.dropout_rate)
        self.ff_normalize = LayerNormalization(input_shape=input_shape, epsilon=1e-6)

    def call(self, inputs):  # inputs = (in_seq, in_seq, in_seq)
        attn_layer = self.attn_multi(inputs)
        attn_layer = self.attn_dropout(attn_layer)
        attn_layer = self.attn_normalize(inputs[0] + attn_layer)

        ff_layer = self.ff_conv1D_1(attn_layer)
        ff_layer = self.ff_conv1D_2(ff_layer)
        ff_layer = self.ff_dropout(ff_layer)
        # NOTE(review): this residual adds the layer input (inputs[0]), not
        # attn_layer, which is what the feed-forward sub-layer consumed.
        # Left unchanged to keep trained weights meaningful; confirm intent.
        ff_layer = self.ff_normalize(inputs[0] + ff_layer)
        return ff_layer

    def get_config(self):
        # Needed for saving and loading model with custom layer.
        # Keys match the __init__ parameter names so from_config() can pass
        # them straight back.
        config = super().get_config().copy()
        config.update({'d_k': self.d_k,
                       'd_v': self.d_v,
                       'n_heads': self.n_heads,
                       'ff_dim': self.ff_dim,
                       'dropout': self.dropout_rate})
        return config


# Model
def create_model():
    """Build and compile the Time2Vector + Transformer regression model.

    Reads the module-level hyperparameters seq_len, d_k, d_v, n_heads and
    ff_dim. The model takes windows of shape (seq_len, 5) and outputs one
    value per window.

    Returns:
        A Keras Model compiled with MSE loss, the Adam optimizer and the
        MAE / MAPE metrics.
    """
    # Initialize time and transformer layers
    time_embedding = Time2Vector(seq_len)
    attn_layer1 = TransformerEncoder(d_k, d_v, n_heads, ff_dim)
    attn_layer2 = TransformerEncoder(d_k, d_v, n_heads, ff_dim)
    attn_layer3 = TransformerEncoder(d_k, d_v, n_heads, ff_dim)

    # Construct model
    in_seq = Input(shape=(seq_len, 5))
    x = time_embedding(in_seq)
    # Append the two time features to the five input features -> 7 per step.
    x = Concatenate(axis=-1)([in_seq, x])
    # Self-attention: the same tensor serves as query, key and value input.
    x = attn_layer1((x, x, x))
    x = attn_layer2((x, x, x))
    x = attn_layer3((x, x, x))
    # NOTE(review): with data_format='channels_first' Keras treats the last
    # axis as the one to average over, so this presumably yields one value
    # per time step rather than per feature; confirm this is intended.
    x = GlobalAveragePooling1D(data_format='channels_first')(x)
    x = Dropout(0.1)(x)
    x = Dense(64, activation='relu')(x)
    x = Dropout(0.1)(x)
    out = Dense(1, activation='linear')(x)

    model = Model(inputs=in_seq, outputs=out)
    model.compile(loss='mse', optimizer='adam', metrics=['mae', 'mape'])
    return model
model = create_model()
model.summary()

# Write the model to disk whenever the validation loss improves.
callback = tf.keras.callbacks.ModelCheckpoint('Transformer+TimeEmbedding.hdf5',
                                              monitor='val_loss',
                                              save_best_only=True,
                                              verbose=1)

history = model.fit(X_train, y_train,
                    batch_size=batch_size,
                    epochs=35,
                    callbacks=[callback],
                    validation_data=(X_val, y_val))

# Continue with the checkpointed model rather than the last-epoch weights.
# The custom layer classes must be named so Keras can rebuild them.
model = tf.keras.models.load_model('Transformer+TimeEmbedding.hdf5',
                                   custom_objects={'Time2Vector': Time2Vector,
                                                   'SingleAttention': SingleAttention,
                                                   'MultiAttention': MultiAttention,
                                                   'TransformerEncoder': TransformerEncoder})
###############################################################################
# Calculate predictions and metrics
# (This section appeared twice verbatim; it is kept once so predictions are
# not recomputed and the figure is not drawn a second time.)

# Calculate prediction for training, validation and test data
train_pred = model.predict(X_train)
val_pred = model.predict(X_val)
test_pred = model.predict(X_test)

# Print evaluation metrics for all datasets
train_eval = model.evaluate(X_train, y_train, verbose=0)
val_eval = model.evaluate(X_val, y_val, verbose=0)
test_eval = model.evaluate(X_test, y_test, verbose=0)
print(' ')
print('Evaluation metrics')
print('Training Data - Loss: {:.4f}, MAE: {:.4f}, MAPE: {:.4f}'.format(train_eval[0], train_eval[1], train_eval[2]))
print('Validation Data - Loss: {:.4f}, MAE: {:.4f}, MAPE: {:.4f}'.format(val_eval[0], val_eval[1], val_eval[2]))
print('Test Data - Loss: {:.4f}, MAE: {:.4f}, MAPE: {:.4f}'.format(test_eval[0], test_eval[1], test_eval[2]))

###############################################################################
# Display results


def _plot_predictions(ax, actual, predicted, title):
    """Draw the actual close returns of one split and the predictions over them.

    Predictions start at x = seq_len because the first seq_len rows of a
    split only serve as input for the first window.
    """
    ax.plot(actual[:, 3], label='IBM Closing Returns')
    ax.plot(np.arange(seq_len, predicted.shape[0] + seq_len), predicted,
            linewidth=3, label='Predicted IBM Closing Returns')
    ax.set_title(title, fontsize=18)
    ax.set_xlabel('Date')
    ax.set_ylabel('IBM Closing Returns')
    ax.legend(loc="best", fontsize=12)


fig = plt.figure(figsize=(15, 20))
st = fig.suptitle("Transformer + TimeEmbedding Model", fontsize=22)
st.set_y(0.92)

# Plot training data results
ax11 = fig.add_subplot(311)
_plot_predictions(ax11, train_data, train_pred, "Training Data")

# Plot validation data results
ax21 = fig.add_subplot(312)
_plot_predictions(ax21, val_data, val_pred, "Validation Data")

# Plot test data results
ax31 = fig.add_subplot(313)
_plot_predictions(ax31, test_data, test_pred, "Test Data")

# Model metrics
# Display model metrics: per-epoch training and validation curves taken from
# the History object returned by model.fit.
fig = plt.figure(figsize=(15, 20))
st = fig.suptitle("Transformer + TimeEmbedding Model Metrics", fontsize=22)
st.set_y(0.92)

# Plot model loss
ax1 = fig.add_subplot(311)
ax1.plot(history.history['loss'], label='Training loss (MSE)')
ax1.plot(history.history['val_loss'], label='Validation loss (MSE)')
ax1.set_title("Model loss", fontsize=18)
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss (MSE)')
ax1.legend(loc="best", fontsize=12)

# Plot MAE (labels corrected: MAE is the mean *absolute* error)
ax2 = fig.add_subplot(312)
ax2.plot(history.history['mae'], label='Training MAE')
ax2.plot(history.history['val_mae'], label='Validation MAE')
ax2.set_title("Model metric - Mean absolute error (MAE)", fontsize=18)
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Mean absolute error (MAE)')
ax2.legend(loc="best", fontsize=12)

# Plot MAPE (labels corrected: MAPE is the mean *absolute* percentage error)
ax3 = fig.add_subplot(313)
ax3.plot(history.history['mape'], label='Training MAPE')
ax3.plot(history.history['val_mape'], label='Validation MAPE')
ax3.set_title("Model metric - Mean absolute percentage error (MAPE)", fontsize=18)
ax3.set_xlabel('Epoch')
ax3.set_ylabel('Mean absolute percentage error (MAPE)')
ax3.legend(loc="best", fontsize=12)
Model architecture overview
# Render the layer graph to a PNG file. The call appeared twice with the same
# arguments and target file; one call is enough.
tf.keras.utils.plot_model(
    model,
    to_file="IBM_Transformer+TimeEmbedding.png",
    show_shapes=True,
    show_layer_names=True,
    expand_nested=True,
    dpi=96,
)

# Moving Average
Moving Average – Load IBM data again, to apply rolling window
IBM_path = 'IBM.csv'
df = pd.read_csv(IBM_path, delimiter=',',
                 usecols=['Date', 'Open', 'High', 'Low', 'Close', 'Volume'])

# Replace 0 to avoid dividing by 0 later on: a zero volume takes the value of
# the previous row. Assigning the result back (instead of a chained
# inplace replace with method='ffill') keeps this working on current pandas.
# A zero in the very first row has nothing to copy from and stays NaN.
df['Volume'] = df['Volume'].replace(0, np.nan).ffill()
df.sort_values('Date', inplace=True)

# Apply moving average with a window of 10 rows to all columns
value_columns = ['Open', 'High', 'Low', 'Close', 'Volume']
df[value_columns] = df[value_columns].rolling(10).mean()

# Drop all rows with NaN values (the first 9 rows have no complete window)
df.dropna(how='any', axis=0, inplace=True)
df.head()  # notebook display only
Moving Average – Plot daily IBM closing prices and volume
fig = plt.figure(figsize=(15, 10))
st = fig.suptitle("IBM Close Price and Volume", fontsize=20)
st.set_y(0.92)

# Upper panel: smoothed closing price, with a date label every 1464 rows.
ax1 = fig.add_subplot(211)
ax1.plot(df['Close'], label='IBM Close Price')
ax1.set_xticks(range(0, df.shape[0], 1464))
ax1.set_xticklabels(df['Date'].loc[::1464])
ax1.set_ylabel('Close Price', fontsize=18)
ax1.legend(loc="upper left", fontsize=12)

# Lower panel: smoothed volume on the same tick grid.
ax2 = fig.add_subplot(212)
ax2.plot(df['Volume'], label='IBM Volume')
ax2.set_xticks(range(0, df.shape[0], 1464))
ax2.set_xticklabels(df['Date'].loc[::1464])
ax2.set_ylabel('Volume', fontsize=18)
ax2.legend(loc="upper left", fontsize=12)
本文内容仅仅是技术探讨和学习,并不构成任何投资建议。
Be First to Comment