Source code for nabqr.functions

"""Neural Adaptive Basis Quantile Regression (NABQR) Core Functions

This module provides the core functionality for NABQR.

This module includes:
- Scoring metrics (Variogram, CRPS, QSS)
- Dataset creation and preprocessing
- Model definitions and training
- TAQR (Time-Adaptive Quantile Regression) implementation
"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import properscoring as ps
import tensorflow as tf
import tensorflow_probability as tfp
import datetime as dt
from .helper_functions import set_n_closest_to_zero
from .functions_for_TAQR import *


[docs] def variogram_score_single_observation(x, y, p=0.5): """Calculate the Variogram score for a given observation. Translated from the R code in Energy and AI paper: "An introduction to multivariate probabilistic forecast evaluation" by Mathias B.B. et al. Parameters ---------- x : numpy.ndarray Ensemble forecast (m x k), where m is ensemble size, k is forecast horizon y : numpy.ndarray Actual observations (k,) p : float, optional Power parameter for the variogram score, by default 0.5 Returns ------- float Variogram score for the observation """ m, k = x.shape score = 0 for i in range(k - 1): for j in range(i + 1, k): Ediff = (1 / m) * np.sum(np.abs(x[:, i] - x[:, j]) ** p) score += (1 / np.abs(i - j)) * (np.abs(y[i] - y[j]) ** p - Ediff) ** 2 return score / k
[docs] def variogram_score_R_multivariate(x, y, p=0.5, t1=12, t2=36): """Calculate the Variogram score for all observations for the time horizon t1 to t2. Modified from the R code in Energy and AI paper: "An introduction to multivariate probabilistic forecast evaluation" by Mathias B.B. et al. Here we use t1 -> t2 as our forecast horizon. Parameters ---------- x : numpy.ndarray Ensemble forecast (m x k) y : numpy.ndarray Actual observations (k,) p : float, optional Power parameter, by default 0.5 t1 : int, optional Start hour (inclusive), by default 12 t2 : int, optional End hour (exclusive), by default 36 Returns ------- tuple (score, score_list) Overall score and list of individual scores """ m, k = x.shape score = 0 if m > k: x = x.T m, k = k, m score_list = [] for start in range(0, k, 24): if start + t2 <= k: for i in range(start + t1, start + t2 - 1): for j in range(i + 1, start + t2): Ediff = (1 / m) * np.sum(np.abs(x[:, i] - x[:, j]) ** p) score += (1 / np.abs(i - j)) * ( np.abs(y[i] - y[j]) ** p - Ediff ) ** 2 score_list.append(score) return score / (100_000), score_list
[docs] def variogram_score_R_v2(x, y, p=0.5, t1=12, t2=36): """ Calculate the Variogram score for all observations for the time horizon t1 to t2. Modified from the paper in Energy and AI, >> An introduction to multivariate probabilistic forecast evaluation <<. Assumes that x and y starts from day 0, 00:00. Parameters: x : array Ensemble forecast (m x k), where m is the size of the ensemble, and k is the maximal forecast horizon. y : array Actual observations (k,) p : float Power parameter for the variogram score. t1 : int Start of the hour range for comparison (inclusive). t2 : int End of the hour range for comparison (exclusive). Returns: -------- tuple (score, score_list) Overall score/100_000 and list of individual VarS contributions """ m, k = x.shape # Size of ensemble, Maximal forecast horizon score = 0 if m > k: x = x.T m, k = k, m else: print("m,k: ", m, k) score_list = [] # Iterate through every 24-hour block for start in range(0, k, 24): # Ensure we don't exceed the forecast horizon if start + t2 <= k: for i in range(start + t1, start + t2 - 1): for j in range(i + 1, start + t2): Ediff = (1 / m) * np.sum(np.abs(x[:, i] - x[:, j]) ** p) score += (1 / np.abs(i - j)) * ( np.abs(y[i] - y[j]) ** p - Ediff ) ** 2 score_list.append(score) # Variogram score return score / (100_000), score_list
[docs] def calculate_crps(actuals, corrected_ensembles): """Calculate the Continuous Ranked Probability Score (CRPS) using the properscoring package. If the ensembles do not have the correct dimensions, we transpose them. Parameters ---------- actuals : numpy.ndarray Actual observations corrected_ensembles : numpy.ndarray Ensemble forecasts Returns ------- float Mean CRPS score """ try: crps = ps.crps_ensemble(actuals, corrected_ensembles) return np.mean(crps) except: crps = np.mean(ps.crps_ensemble(actuals, corrected_ensembles.T)) return crps
[docs] def calculate_qss(actuals, taqr_results, quantiles): """Calculate the Quantile Skill Score (QSS). Parameters ---------- actuals : numpy.ndarray Actual observations taqr_results : numpy.ndarray TAQR ensemble forecasts quantiles : array-like Quantile levels to evaluate Returns ------- float Quantile Skill Score """ qss_scores = multi_quantile_skill_score(actuals, taqr_results, quantiles) table = pd.DataFrame({ "Quantiles": quantiles, "QSS NABQR": qss_scores }) print(table) return np.mean(qss_scores)
[docs] def multi_quantile_skill_score(y_true, y_pred, quantiles): """Calculate the Quantile Skill Score (QSS) for multiple quantile forecasts. Parameters ---------- y_true : numpy.ndarray True observed values y_pred : numpy.ndarray Predicted quantile values quantiles : list Quantile levels between 0 and 1 Returns ------- numpy.ndarray QSS for each quantile forecast """ y_pred = np.array(y_pred) if y_pred.shape[0] > y_pred.shape[1]: y_pred = y_pred.T assert all(0 <= q <= 1 for q in quantiles), "All quantiles must be between 0 and 1" assert len(quantiles) == len( y_pred ), "Number of quantiles must match inner dimension of y_pred" N = len(y_true) scores = np.zeros(len(quantiles)) for i, q in enumerate(quantiles): E = y_true - y_pred[i] scores[i] = np.sum(np.where(E > 0, q * E, (1 - q) * -E)) return scores / N
def reliability_func( quantile_forecasts, corrected_ensembles, ensembles, actuals, corrected_taqr_quantiles, data_source, plot_reliability=True, ): n = len(actuals) # Handling hpe hpe = ensembles[:, 0] hpe_quantile = 0.5 ensembles = ensembles[:, 1:] quantiles_ensembles = np.arange(2, 100, 2) / 100 quantiles_corrected_ensembles = np.linspace( 0.05, 0.95, corrected_ensembles.shape[1] ).round(3) # Ensuring that we are working with numpy arrays quantile_forecasts = ( np.array(quantile_forecasts) if type(quantile_forecasts) != np.ndarray else quantile_forecasts ) actuals = np.array(actuals) if type(actuals) != np.ndarray else actuals actuals_ensembles = actuals.copy() corrected_taqr_quantiles = ( np.array(corrected_taqr_quantiles) if type(corrected_taqr_quantiles) != np.ndarray else corrected_taqr_quantiles ) corrected_ensembles = ( np.array(corrected_ensembles) if type(corrected_ensembles) != np.ndarray else corrected_ensembles ) m, n1 = quantile_forecasts.shape if m != len(actuals): quantile_forecasts = quantile_forecasts.T m, n1 = quantile_forecasts.shape # Ensure that the length match up if len(actuals) != len(quantile_forecasts): if len(actuals) < len(quantile_forecasts): quantile_forecasts = quantile_forecasts[: len(actuals)] else: actuals_taqr = actuals[-len(quantile_forecasts) :] if len(actuals) != len(corrected_ensembles): if len(actuals) < len(corrected_ensembles): corrected_ensembles = corrected_ensembles[: len(actuals)] else: actuals_taqr = actuals[-len(corrected_ensembles) :] # Reliability: how often actuals are below the given quantiles compared to the quantile levels reliability_points_taqr = [] for i, q in enumerate(corrected_taqr_quantiles): forecast = quantile_forecasts[:, i] observed_below = np.sum(actuals_taqr <= forecast) / n reliability_points_taqr.append(observed_below) reliability_points_taqr = np.array(reliability_points_taqr) reliability_points_ensembles = [] n_ensembles = len(actuals_ensembles) for i, q in enumerate(quantiles_ensembles): forecast = ensembles[:, i] observed_below = np.sum(actuals_ensembles <= forecast) / n_ensembles reliability_points_ensembles.append(observed_below) reliability_points_corrected_ensembles = [] for i, q in enumerate(quantiles_corrected_ensembles): forecast = corrected_ensembles[:, i] observed_below = np.sum(actuals_ensembles <= forecast) / n_ensembles reliability_points_corrected_ensembles.append(observed_below) # Handle hpe separately observed_below_hpe = np.sum(actuals_ensembles <= hpe) / n_ensembles reliability_points_ensembles = np.array(reliability_points_ensembles) # Index of the 0.5 quantile idx_05 = np.where(corrected_taqr_quantiles == 0.5)[0][0] # Plotting the reliability plot if plot_reliability: import scienceplots with plt.style.context("no-latex"): plt.figure(figsize=(6, 6)) plt.plot( [0, 1], [0, 1], "k--", label="Perfect Reliability" ) # Diagonal line plt.scatter( corrected_taqr_quantiles, reliability_points_taqr, color="blue", label="Reliability CorrectedTAQR", ) plt.scatter( quantiles_ensembles, reliability_points_ensembles, color="grey", label="Reliability Original Ensembles", marker="p", alpha=0.5, ) plt.scatter( quantiles_corrected_ensembles, reliability_points_corrected_ensembles, color="green", label="Reliability Corrected Ensembles", marker="p", alpha=0.5, ) plt.scatter( hpe_quantile, observed_below_hpe, color="grey", label="Reliability HPE", alpha=0.5, marker="D", s=25, ) plt.xlabel("Nominal Quantiles") plt.ylabel("Observed Frequencies") plt.title( f'Reliability Plot for {data_source.replace("_", " ").replace("lstm", "")}' ) plt.legend() plt.grid(True) plt.savefig(f"reliability_plots/nolatex_reliability_plot_{data_source}.pdf") plt.show() return ( reliability_points_taqr, reliability_points_ensembles, reliability_points_corrected_ensembles, )
[docs] def calculate_scores( actuals, taqr_results, raw_ensembles, corrected_ensembles, quantiles_taqr, data_source, plot_reliability=True, visualize = True ): """Calculate Variogram, CRPS, QSS and MAE for the predictions and corrected ensembles. Parameters ---------- actuals : numpy.ndarray The actual values predictions : numpy.ndarray The predicted values raw_ensembles : numpy.ndarray The raw ensembles corrected_ensembles : numpy.ndarray The corrected ensembles quantiles : list The quantiles to calculate the scores for data_source : str The data source """ # Find common index common_index = corrected_ensembles.index.intersection(actuals.index) ensembles_CE_index = raw_ensembles.loc[common_index] actuals_comp = actuals.loc[common_index] variogram_score_raw_v2, _ = variogram_score_R_v2( ensembles_CE_index.loc[actuals_comp.index].values, actuals_comp.values ) variogram_score_raw_corrected_v2, _ = variogram_score_R_v2( corrected_ensembles.loc[actuals_comp.index].values, actuals_comp.values ) variogram_score_corrected_taqr_v2, _ = variogram_score_R_v2( taqr_results.values, actuals_comp.values ) qs_raw = calculate_qss( actuals_comp.values, ensembles_CE_index.loc[actuals_comp.index].T, np.linspace(0.05, 0.95, ensembles_CE_index.shape[1]), ) qs_corr = calculate_qss( actuals_comp.values, corrected_ensembles.loc[actuals_comp.index].T, np.linspace(0.05, 0.95, corrected_ensembles.shape[1]), ) # TODO: Should be done with max and min from the training set. taqr_values_clipped = np.clip(taqr_results, 0, max(actuals_comp.values)) qs_corrected_taqr = calculate_qss( actuals_comp.values, taqr_values_clipped, quantiles_taqr ) crps_orig_ensembles = calculate_crps( actuals_comp.values.flatten(), ensembles_CE_index.loc[actuals_comp.index].T ) crps_corr_ensembles = calculate_crps( actuals_comp.values.flatten(), corrected_ensembles.loc[actuals_comp.index].T ) crps_corrected_taqr = calculate_crps( actuals_comp.values.flatten(), np.array(taqr_results) ) # Instead of calculating mean value of ensembles, we just use the median MAE_raw_ensembles = np.abs( np.median(ensembles_CE_index.loc[actuals_comp.index].values, axis=1) - actuals_comp.values ) MAE_corr_ensembles = np.abs( np.median(corrected_ensembles.loc[actuals_comp.index].values, axis=1) - actuals_comp.values ) MAE_corrected_taqr = np.abs( (np.median(np.array(taqr_results), axis=1) - actuals_comp.values) ) scores_data = { "Metric": ["MAE", "CRPS", "Variogram", "QS"], "Original Ensembles": [ np.mean(MAE_raw_ensembles), crps_orig_ensembles, variogram_score_raw_v2, np.mean(qs_raw), ], "Corrected Ensembles": [ np.mean(MAE_corr_ensembles), crps_corr_ensembles, variogram_score_raw_corrected_v2, np.mean(qs_corr), ], "NABQR": [ np.mean(MAE_corrected_taqr), crps_corrected_taqr, variogram_score_corrected_taqr_v2, np.mean(qs_corrected_taqr), ], } scores_df = pd.DataFrame(scores_data).T # Calculate relative scores scores_data["Corrected Ensembles"] = [ 1 + (x - scores_data["Original Ensembles"][i]) / scores_data["Original Ensembles"][i] for i, x in enumerate(scores_data["Corrected Ensembles"]) ] scores_data["NABQR"] = [ 1 + (x - scores_data["Original Ensembles"][i]) / scores_data["Original Ensembles"][i] for i, x in enumerate(scores_data["NABQR"]) ] # Create DataFrame scores_df = pd.DataFrame(scores_data).T print("Scores: ") print(scores_df) # Print LaTeX table latex_output = scores_df.to_latex( column_format="lcccc", header=True, float_format="%.3f", caption=f"Performance Metrics for Different Ensemble Methods on {data_source}", label="tab:performance_metrics", escape=False, ).replace("& 0 & 1 & 2 & 3 \\\\\n\\midrule\n", "") with open(f"latex_output_{data_source}_scores.tex", "w") as f: f.write(latex_output) # Reliability plot reliability_func( taqr_results, corrected_ensembles, raw_ensembles, actuals, quantiles_taqr, data_source, plot_reliability = visualize, ) return scores_df
[docs] def run_r_script(X_filename, Y_filename, tau): """Run R script for quantile regression. Parameters ---------- X_filename : str Path to X data CSV file Y_filename : str Path to Y data CSV file tau : float Quantile level """ import subprocess process = subprocess.Popen( ["R", "--vanilla"], stdin=subprocess.PIPE, stdout=subprocess.PIPE ) r_script = f""" options(warn = -1) # library(onlineforecast) library(quantreg) library(readr) library(SparseM) X_full <- read_csv("{X_filename}", col_names = FALSE, show_col_types = FALSE) y <- read_csv("{Y_filename}", col_names = "y", show_col_types = FALSE) X_full <- X_full[1:500,] data <- cbind(X_full, y[1:500,1]) predictor_cols <- colnames(X_full) formula_string <- paste("y ~ 0+", paste(predictor_cols, collapse = " + ")) formula <- as.formula(formula_string) rq_fit <- rq(formula, tau = {tau}, data = data ) write.csv(rq_fit$coefficients, "rq_fit_coefficients.csv") write.csv(rq_fit$residuals, "rq_fit_residuals.csv") """ for line in r_script.strip().split("\n"): process.stdin.write(line.encode("utf-8") + b"\n") process.stdin.close() process.stdout.read() process.terminate()
[docs] def remove_zero_columns(df): """Wrapper function to remove columns that contain only zeros from a DataFrame. Parameters ---------- df : pandas.DataFrame Input DataFrame Returns ------- pandas.DataFrame DataFrame with zero columns removed """ return df.loc[:, (df != 0).any(axis=0)]
[docs] def remove_zero_columns_numpy(arr): """Remove columns that contain only zeros or constant values from a numpy array. Parameters ---------- arr : numpy.ndarray Input array Returns ------- numpy.ndarray Array with zero/constant columns removed """ return arr[:, (arr != 0).any(axis=0) & (arr != arr[0]).any(axis=0)]
[docs] def create_dataset_for_lstm(X, Y, time_steps): """Create a dataset suitable for LSTM training with multiple time steps (i.e. lags). Parameters ---------- X : numpy.ndarray Input features Y : numpy.ndarray Target values time_steps : list List of time steps to include Returns ------- tuple (X_lstm, Y_lstm) LSTM-ready datasets """ X = np.array(X) Y = np.array(Y) Xs, Ys = [], [] for i in range(len(X)): X_entry = [] for ts in time_steps: if i - ts >= 0: X_entry.append(X[i - ts, :]) else: X_entry.append(np.zeros_like(X[0, :])) Xs.append(np.array(X_entry)) Ys.append(Y[i]) return np.array(Xs), np.array(Ys)
[docs] class QuantileRegressionLSTM(tf.keras.Model): """LSTM-based model for quantile regression. Input: x -> LSTM -> Dense -> Dense -> output Parameters ---------- n_quantiles : int Number of quantiles to predict units : int Number of LSTM units n_timesteps : int Number of time steps in input """ def __init__(self, n_quantiles, units, n_timesteps, **kwargs): super().__init__(**kwargs) self.lstm = tf.keras.layers.LSTM( units, input_shape=(None, n_quantiles, n_timesteps), return_sequences=False ) self.dense = tf.keras.layers.Dense(n_quantiles, activation="sigmoid") self.dense2 = tf.keras.layers.Dense(n_quantiles, activation="relu") self.n_quantiles = n_quantiles self.n_timesteps = n_timesteps
[docs] def call(self, inputs, training=None): """Forward pass of the model. Parameters ---------- inputs : tensorflow.Tensor Input tensor training : bool, optional Whether in training mode, by default None Returns ------- tensorflow.Tensor Model output """ x = self.lstm(inputs, training=training) x = self.dense(x) x = self.dense2(x) return x
[docs] def get_config(self): """Get model configuration. Returns ------- dict Model configuration """ config = super(QuantileRegressionLSTM, self).get_config() config.update( { "n_quantiles": self.n_quantiles, "units": self.lstm.units, "n_timesteps": self.n_timesteps, } ) return config
[docs] @classmethod def from_config(cls, config): """Create model from configuration. Parameters ---------- config : dict Model configuration Returns ------- QuantileRegressionLSTM Model instance """ return cls(**config)
[docs] def quantile_loss_3(q, y_true, y_pred): """Calculate quantile loss for a single quantile. Parameters ---------- q : float Quantile level y_true : tensorflow.Tensor True values y_pred : tensorflow.Tensor Predicted values Returns ------- tensorflow.Tensor Quantile loss value """ y_true = tf.cast(y_true, tf.float32) y_pred = tf.cast(y_pred, tf.float32) y_true = tfp.stats.percentile(y_true, 100 * q, axis=1) error = y_true - y_pred return tf.maximum(q * error, (q - 1) * error)
[docs] def quantile_loss_func(quantiles): """Create a loss function for multiple quantiles. Parameters ---------- quantiles : list List of quantile levels Returns ------- function Loss function for multiple quantiles """ def loss(y_true, y_pred): """Calculate the loss for given true and predicted values. Parameters ---------- y_true : tensorflow.Tensor True values y_pred : tensorflow.Tensor Predicted values Returns ------- tensorflow.Tensor Combined loss value for all quantiles """ losses = [] for i, q in enumerate(quantiles): loss = quantile_loss_3(q, y_true, y_pred[:, i]) losses.append(loss) return tf.reduce_mean(tf.stack(losses)) return loss
[docs] def map_range(values, input_start, input_end, output_start, output_end): """Map values from one range to another. Parameters ---------- values : list Values to map input_start : float Start of input range input_end : float End of input range output_start : float Start of output range output_end : float End of output range Returns ------- numpy.ndarray Mapped values """ mapped_values = [] for value in values: proportion = (value - input_start) / (input_end - input_start) mapped_value = output_start + (proportion * (output_end - output_start)) mapped_values.append(int(mapped_value)) return np.array(mapped_values)
[docs] def legend_without_duplicate_labels(ax): """Create a legend without duplicate labels. Primarily used for ensemble plots. Parameters ---------- ax : matplotlib.axes.Axes Axes object to create legend for """ handles, labels = ax.get_legend_handles_labels() unique = [ (h, l) for i, (h, l) in enumerate(zip(handles, labels)) if l not in labels[:i] ] ax.legend(*zip(*unique))
[docs] def remove_straight_line_outliers(ensembles): """Remove ensemble members that are perfectly straight lines (constant slope). Explanation: Sometimes the output from the LSTM is a straight line, which is not useful for the ensemble. Parameters ---------- ensembles : numpy.ndarray 2D array where rows are time steps and columns are ensemble members Returns ------- numpy.ndarray Filtered ensemble data without straight-line outliers """ # Calculate differences along the time axis differences = np.diff(ensembles, axis=0) # Identify columns where all differences are the same (perfectly straight lines) straight_line_mask = np.all(differences == differences[0, :], axis=0) # Remove the columns with perfectly straight lines return ensembles[:, ~straight_line_mask]
[docs] def train_model_lstm( quantiles, epochs: int, lr: float, batch_size: int, x, y, x_val, y_val, n_timesteps, data_name, ): """Train LSTM model for quantile regression. The @tf.function decorator is used to speed up the training process. Parameters ---------- quantiles : list List of quantile levels to predict epochs : int Number of training epochs lr : float Learning rate for optimizer batch_size : int Batch size for training x : tensor Training input data y : tensor Training target data x_val : tensor Validation input data y_val : tensor Validation target data n_timesteps : int Number of time steps in input sequence data_name : str Name identifier for saving model artifacts Returns ------- tf.keras.Model Trained LSTM model """ model = QuantileRegressionLSTM( n_quantiles=len(quantiles), units=256, n_timesteps=n_timesteps ) optimizer = tf.keras.optimizers.Adam(learning_rate=lr) @tf.function def train_step(x_batch, y_batch): with tf.GradientTape() as tape: y_pred = model(x_batch, training=True) losses = quantile_loss_func(quantiles)(y_batch, y_pred) total_loss = tf.reduce_mean(losses) grads = tape.gradient(total_loss, model.trainable_variables) optimizer.apply_gradients(zip(grads, model.trainable_variables)) return total_loss @tf.function def val_step(x_batch, y_batch): y_pred = model(x_batch, training=False) losses = quantile_loss_func(quantiles)(y_batch, y_pred) total_loss = tf.reduce_mean(losses) return total_loss train_loss_history = [] val_loss_history = [] y_preds = [] y_true = [] for epoch in range(epochs): epoch_train_loss = 0.0 epoch_val_loss = 0.0 num_batches = 0 # Training loop for i in range(0, len(x), batch_size): x_batch = x[i : i + batch_size] y_batch = y[i : i + batch_size] batch_train_loss = train_step(x_batch, y_batch) epoch_train_loss += batch_train_loss num_batches += 1 y_preds.append(model(x_batch, training=False)) y_true.append(y_batch) epoch_train_loss /= num_batches train_loss_history.append(epoch_train_loss) # Validation loop num_val_batches = 0 for i in range(0, len(x_val), batch_size): x_val_batch = x_val[i : i + batch_size] y_val_batch = y_val[i : i + batch_size] batch_val_loss = val_step(x_val_batch, y_val_batch) epoch_val_loss += batch_val_loss num_val_batches += 1 epoch_val_loss /= num_val_batches val_loss_history.append(epoch_val_loss) print( f"Epoch {epoch+1} Train Loss: {epoch_train_loss:.4f} Validation Loss: {epoch_val_loss:.4f}" ) y_preds_concat = tf.concat(y_preds, axis=0).numpy() y_true_concat = tf.concat(y_true, axis=0).numpy() return model
[docs] def one_step_quantile_prediction( X_input, Y_input, n_init, n_full, quantile=0.5, already_correct_size=False, n_in_X=5000, print_output=True, ): """Perform one-step quantile prediction using TAQR. This function takes the entire training set and, based on the last n_init observations, calculates residuals and coefficients for the quantile regression. An easy wrapper function to run TAQR. Parameters ---------- X_input : numpy.ndarray or pd.DataFrame Input features Y_input : numpy.ndarray or pd.Series Target values n_init : int Number of initial observations for warm start n_full : int Total number of observations to process quantile : float, optional Quantile level for prediction, by default 0.5 already_correct_size : bool, optional Whether input data is already correctly sized, by default False n_in_X : int, optional Number of observations to include in design matrix, by default 5000 Returns ------- tuple (predictions, actual values, coefficients) """ assert n_init <= n_full - 2, "n_init must be less two greater than n_full" if type(X_input) == pd.DataFrame: X_input = X_input.to_numpy() if type(Y_input) == pd.Series or type(Y_input) == pd.DataFrame: Y_input = Y_input.to_numpy() n, m = X_input.shape if print_output: print("X_input shape: ", X_input.shape) full_length, p = X_input.shape X = X_input[:n_full, :].copy() Y = Y_input[:n_full] X_for_residuals = X[:n_init, :] Y_for_residuals = Y[:n_init] np.savetxt("X_for_residuals.csv", X_for_residuals, delimiter=",") np.savetxt("Y_for_residuals.csv", Y_for_residuals, delimiter=",") run_r_script("X_for_residuals.csv", "Y_for_residuals.csv", tau=quantile) def ignore_first_column(s): return float(s) residuals = np.genfromtxt( "rq_fit_residuals.csv", delimiter=",", skip_header=1, usecols=(1,), converters={0: ignore_first_column}, ) beta_init = np.genfromtxt( "rq_fit_coefficients.csv", delimiter=",", skip_header=1, usecols=(1,), converters={0: ignore_first_column}, ) if print_output: print("len of beta_init: ", len(beta_init)) print( "There is: ", sum(residuals == 0), "zeros in residuals", "and", sum(abs(residuals) < 1e-8), "close to zeroes", ) print("p: ", p) if len(beta_init) < p: beta_init = np.append(beta_init, np.ones(p - len(beta_init))) else: beta_init = beta_init[:p] r_init = set_n_closest_to_zero(arr=residuals, n=len(beta_init)) if print_output: print(sum(r_init == 0), "r_init zeros") X_full = np.column_stack((X, Y, np.random.choice([1, 1], size=n_full))) IX = np.arange(p) Iy = p Iex = p + 1 bins = np.array([-np.inf, np.inf]) tau = quantile n_in_bin = int(1.0 * full_length) if print_output: print("n_in_bin", n_in_bin) n_input = n_in_X N, BETA, GAIN, Ld, Rny, Mx, Re, CON1, T = rq_simplex_final( X_full, IX, Iy, Iex, r_init, beta_init, n_input, tau, bins, n_in_bin ) y_pred = np.sum((X_input[(n_input + 2) : (n_full), :] * BETA[1:, :]), axis=1) y_actual = Y_input[(n_input) : (n_full - 2)] if print_output: print("y_pred shape", y_pred.shape) print("y_actual shape", y_actual.shape) y_actual_quantile = np.quantile(y_actual, quantile) return y_pred, y_actual, BETA
[docs] def run_taqr(corrected_ensembles, actuals, quantiles, n_init, n_full, n_in_X): """Wrapper function to run TAQR on corrected ensembles. Parameters ---------- corrected_ensembles : numpy.ndarray Shape (n_timesteps, n_ensembles) actuals : numpy.ndarray Shape (n_timesteps,) quantiles : list Quantiles to predict n_init : int Number of initial timesteps for warm start n_full : int Total number of timesteps n_in_X : int Number of timesteps in design matrix Returns ------- list TAQR results for each quantile """ if type(actuals) == pd.Series or type(actuals) == pd.DataFrame: actuals = actuals.to_numpy() actuals[np.isnan(actuals)] = 0 else: actuals[np.isnan(actuals)] = 0 taqr_results = [] actuals_output = [] BETA_output = [] for q in quantiles: print("Running TAQR for quantile: ", q) y_pred, y_actuals, BETA_q = one_step_quantile_prediction( corrected_ensembles, actuals, n_init=n_init, n_full=n_full, quantile=q, already_correct_size=True, n_in_X=n_in_X, ) taqr_results.append(y_pred) actuals_output.append(y_actuals) BETA_output.append(BETA_q) return taqr_results, actuals_output[1], BETA_output
[docs] def reliability_func( quantile_forecasts, corrected_ensembles, ensembles, actuals, corrected_taqr_quantiles, data_source, plot_reliability=True, ): n = len(actuals) # Ensuring that we are working with numpy arrays quantile_forecasts = ( np.array(quantile_forecasts) if type(quantile_forecasts) != np.ndarray else quantile_forecasts ) actuals = np.array(actuals) if type(actuals) != np.ndarray else actuals actuals_ensembles = actuals.copy() actuals_taqr = actuals.copy() corrected_taqr_quantiles = ( np.array(corrected_taqr_quantiles) if type(corrected_taqr_quantiles) != np.ndarray else corrected_taqr_quantiles ) corrected_ensembles = ( np.array(corrected_ensembles) if type(corrected_ensembles) != np.ndarray else corrected_ensembles ) ensembles = np.array(ensembles) if type(ensembles) != np.ndarray else ensembles # Handling hpe (high probability ensemble) hpe = ensembles[:, 0] hpe_quantile = 0.5 ensembles = ensembles[:, 1:] quantiles_ensembles = np.linspace(0.05, 0.95, ensembles.shape[1]).round(3) quantiles_corrected_ensembles = np.linspace( 0.05, 0.95, corrected_ensembles.shape[1] ).round(3) m, n1 = quantile_forecasts.shape if m != len(actuals): quantile_forecasts = quantile_forecasts.T m, n1 = quantile_forecasts.shape # Ensure that the length match up if len(actuals) != len(quantile_forecasts): if len(actuals) < len(quantile_forecasts): quantile_forecasts = quantile_forecasts[: len(actuals)] else: actuals_taqr = actuals[-len(quantile_forecasts) :] if len(actuals) != len(corrected_ensembles): if len(actuals) < len(corrected_ensembles): corrected_ensembles = corrected_ensembles[: len(actuals)] else: actuals_taqr = actuals[-len(corrected_ensembles) :] if len(actuals) != len(ensembles): if len(actuals) < len(ensembles): ensembles = ensembles[: len(actuals)] hpe = hpe[: len(actuals)] else: actuals_taqr = actuals[-len(ensembles) :] hpe = hpe[-len(ensembles) :] # Reliability: how often actuals are below the given quantiles compared to the quantile levels reliability_points_taqr = [] for i, q in enumerate(corrected_taqr_quantiles): forecast = quantile_forecasts[:, i] observed_below = np.sum(actuals_taqr <= forecast) / n reliability_points_taqr.append(observed_below) reliability_points_taqr = np.array(reliability_points_taqr) reliability_points_ensembles = [] n_ensembles = len(actuals_ensembles) for i, q in enumerate(quantiles_ensembles): forecast = ensembles[:, i] observed_below = np.sum(actuals_ensembles <= forecast) / n_ensembles reliability_points_ensembles.append(observed_below) reliability_points_corrected_ensembles = [] for i, q in enumerate(quantiles_corrected_ensembles): forecast = corrected_ensembles[:, i] observed_below = np.sum(actuals_ensembles <= forecast) / n_ensembles reliability_points_corrected_ensembles.append(observed_below) # Handle hpe separately observed_below_hpe = np.sum(actuals_ensembles <= hpe) / n_ensembles reliability_points_ensembles = np.array(reliability_points_ensembles) # Find the index of the 0.5 quantile idx_05 = np.where(corrected_taqr_quantiles == 0.5)[0][0] if plot_reliability: import scienceplots with plt.style.context("no-latex"): # Plot reliability: nominal quantiles vs calculated quantiles plt.figure(figsize=(6, 6)) plt.plot( [0, 1], [0, 1], "k--", label="Perfect Reliability" ) # Diagonal line plt.scatter( corrected_taqr_quantiles, reliability_points_taqr, color="blue", label="NABQR", ) plt.scatter( quantiles_ensembles, reliability_points_ensembles, color="grey", label="Original Ensembles", marker="p", alpha=0.5, ) plt.scatter( quantiles_corrected_ensembles, reliability_points_corrected_ensembles, color="green", label="Corrected Ensembles", marker="p", alpha=0.5, ) plt.scatter( hpe_quantile, observed_below_hpe, color="grey", label="High Prob. Ensemble", alpha=0.5, marker="D", s=25, ) plt.xlabel("Nominal Quantiles") plt.ylabel("Observed Frequencies") plt.title( f'Reliability Plot for {data_source.replace("_", " ").replace("lstm", "")}' ) plt.legend() plt.grid(True) plt.savefig(f"reliability_plot_{data_source}.pdf") plt.show() return ( reliability_points_taqr, reliability_points_ensembles, reliability_points_corrected_ensembles, )
[docs] def pipeline( X, y, name="TEST", training_size=0.8, epochs=100, timesteps_for_lstm=[0, 1, 2, 6, 12, 24, 48], **kwargs, ): """Main pipeline for NABQR model training and evaluation. The pipeline: 1. Trains an LSTM network to correct the provided ensembles 2. Runs TAQR algorithm on corrected ensembles to predict observations 3. Saves results and model artifacts Parameters ---------- X : pd.DataFrame or numpy.ndarray Shape (n_samples, n_features) - Ensemble data y : pd.Series or numpy.ndarray Shape (n_samples,) - Observations name : str, optional Dataset identifier, by default "TEST" training_size : float, optional Fraction of data to use for training, by default 0.8 epochs : int, optional Number of training epochs, by default 100 timesteps_for_lstm : list, optional Time steps to use for LSTM input, by default [0, 1, 2, 6, 12, 24, 48] **kwargs : dict Additional keyword arguments Returns ------- tuple A tuple containing: - corrected_ensembles: pd.DataFrame The corrected ensemble predictions. - taqr_results: list of numpy.ndarray The TAQR results. - actuals_output: list of numpy.ndarray The actual output values. - BETA_output: list of numpy.ndarray The BETA parameters. """ # Data preparation #. // TODO: check, that this loading works for npy and pd,... seems fine as of now, 5th feb 2025 actuals = y ensembles = X if isinstance(y, pd.Series): idx = y.index elif isinstance(X, pd.DataFrame): idx = X.index else: idx = pd.RangeIndex(start=0, stop=len(y), step=1) if isinstance(y, np.ndarray): X_y = np.concatenate((X, y.reshape(-1, 1)), axis=1) y = pd.Series(y, index=idx) else: X_y = np.concatenate((X, y.values.reshape(-1, 1)), axis=1) y = pd.Series(y.values.flatten(), index=idx) train_size = int(training_size * len(actuals)) ensembles = pd.DataFrame(ensembles, index=idx) # ensembles.index = pd.to_datetime(ensembles.index, utc=False).tz_localize(None) actuals = pd.DataFrame(actuals, index=idx) # actuals.index = pd.to_datetime(actuals.index, utc=False).tz_localize(None) common_index = ensembles.index.intersection(actuals.index) X_y = pd.DataFrame(X_y, index=idx) # X_y.index = pd.to_datetime(X_y.index, utc=False).tz_localize(None) ensembles = ensembles.loc[common_index] actuals = actuals.loc[common_index] X_y = X_y.loc[common_index] timesteps = timesteps_for_lstm Xs, X_Ys = create_dataset_for_lstm(ensembles, X_y, timesteps_for_lstm) # Handle NaN values if np.isnan(Xs).any(): print("Xs has NaNs") Xs[np.isnan(Xs).any(axis=(1, 2))] = 0 if np.isnan(X_Ys).any(): print("X_Ys has NaNs") X_Ys[np.isnan(X_Ys).any(axis=1)] = 0 # Data standardization XY_s_max_train = np.max(X_Ys[:train_size]) XY_s_min_train = np.min(X_Ys[:train_size]) X_Ys_scaled_train = (X_Ys[:train_size] - XY_s_min_train) / ( XY_s_max_train - XY_s_min_train ) Xs_scaled_train = (Xs[:train_size] - XY_s_min_train) / ( XY_s_max_train - XY_s_min_train ) validation_size = 100 X_Ys_scaled_validation = ( X_Ys[train_size : (train_size + validation_size)] - XY_s_min_train ) / (XY_s_max_train - XY_s_min_train) Xs_scaled_validation = ( Xs[train_size : (train_size + validation_size)] - XY_s_min_train ) / (XY_s_max_train - XY_s_min_train) # Train LSTM model quantiles_lstm = np.linspace(0.05, 0.95, 20) model = train_model_lstm( quantiles=quantiles_lstm, epochs=epochs, lr=1e-3, batch_size=50, x=tf.convert_to_tensor(Xs_scaled_train), y=tf.convert_to_tensor(X_Ys_scaled_train), x_val=tf.convert_to_tensor(Xs_scaled_validation), y_val=tf.convert_to_tensor(X_Ys_scaled_validation), n_timesteps=timesteps, data_name=f"{name}_LSTM_epochs_{epochs}", ) save_files = kwargs.get("save_files", True) if save_files: # Save model try: today = dt.datetime.today().strftime("%Y-%m-%d") model.save(f"Model_{name}_{epochs}_{today}.keras") except: model.save(f"Models_{name}_{epochs}.keras") # Generate predictions Xs_scaled_test = (Xs[train_size:] - XY_s_min_train) / ( XY_s_max_train - XY_s_min_train ) corrected_ensembles = model(Xs_scaled_test) corrected_ensembles = ( corrected_ensembles * (XY_s_max_train - XY_s_min_train) + XY_s_min_train ) actuals_out_of_sample = actuals[train_size:] test_idx = idx[train_size:] # Run TAQR quantiles_taqr = kwargs.get("quantiles_taqr", [0.1, 0.3, 0.5, 0.7, 0.9]) n_full = len(actuals_out_of_sample) n_init = int(0.25 * n_full) limit = kwargs.get("limit", 5000) if n_init < limit: # TODO: should actually be 5000 for optimal results... for wind power production. print(f"25% of the data is less than {limit} timesteps, thus, setting n_init to 50% of the data length or {limit}, whichever is smaller") n_init = min(int(0.5*n_full), limit) # print("n_init, n_full: ", n_init, n_full) corrected_ensembles = corrected_ensembles.numpy() corrected_ensembles = remove_zero_columns_numpy(corrected_ensembles) corrected_ensembles = remove_straight_line_outliers(corrected_ensembles) corrected_ensembles = pd.DataFrame(corrected_ensembles, index=test_idx) n_in_X = n_init taqr_results, actuals_output, BETA_output = run_taqr( corrected_ensembles, actuals_out_of_sample, quantiles_taqr, n_init, n_full, n_in_X, ) actuals_out_of_sample = actuals_out_of_sample[(n_init + 1) : (n_full - 1)] actuals_output = pd.Series( actuals_output.flatten(), index=test_idx[(n_init + 1) : (n_full - 1)] ) corrected_ensembles = corrected_ensembles.iloc[(n_init + 1) : (n_full - 1)] idx_to_save = test_idx[(n_init + 1) : (n_full - 1)] # Save results data_source = f"{name}" today = dt.datetime.today().strftime("%Y-%m-%d") if save_files: np.save( f"results_{today}_{data_source}_actuals_out_of_sample.npy", actuals_out_of_sample, ) df_corrected_ensembles = pd.DataFrame(corrected_ensembles, index=idx_to_save) if save_files: df_corrected_ensembles.to_csv( f"results_{today}_{data_source}_corrected_ensembles.csv" ) np.save(f"results_{today}_{data_source}_taqr_results.npy", taqr_results) np.save(f"results_{today}_{data_source}_actuals_output.npy", actuals_output) np.save(f"results_{today}_{data_source}_BETA_output.npy", BETA_output) return ( corrected_ensembles, pd.DataFrame(np.array(taqr_results).T, index=idx_to_save), actuals_output, BETA_output, ensembles, )