"""Time-Adaptive Quantile Regression (TAQR) Implementation
This module provides the core implementation of the TAQR algorithm based on
Møller's thesis "Modeling of Uncertainty in Wind Energy Forecast" (2006).
It implements an adaptive simplex algorithm for quantile regression problems.
"""
import numpy as np
import scipy.linalg
import time
from scipy import linalg as la
# from sklearn.linear_model import QuantileRegressor # TODO: Removed 13/1-25, check if it works
from .helper_functions import *
import pandas as pd
def opdatering_final(
X, Xny, IX, Iy, Iex, Ih, Ihc, beta, Rny, K, n, xB, P, tau, i, bins, n_in_bin
):
"""Updates the design matrix and related parameters for the adaptive quantile regression.
Parameters
----------
X : numpy.ndarray
Full data matrix
Xny : numpy.ndarray
Current design matrix
IX : numpy.ndarray
Index set for design matrix columns
Iy : int
Index for response variable
Iex : int
Index for grouping variable
Ih : numpy.ndarray
Index set for basic variables
Ihc : numpy.ndarray
Index set for non-basic variables
beta : numpy.ndarray
Current coefficient estimates
Rny : numpy.ndarray
Residuals array
K : int
Number of explanatory variables
n : int
Current number of observations
xB : numpy.ndarray
Basic solution
P : numpy.ndarray
Sign vector
tau : float
Quantile level
i : int
Current iteration
bins : numpy.ndarray
Bin boundaries
n_in_bin : int
Maximum number of observations per bin
Returns
-------
tuple
Updated parameters (Ih, Ihc, xB, Xny, Rny, P, n, i)
"""
Xny[Xny == np.inf] = 1
Xny[Xny == -np.inf] = 1
Xny = np.vstack((Xny, X[n + i - 2, :]))
Index = np.arange(Xny[:-1, Iy].shape[0])
j = 1
while Xny[-1, Iex] > bins[j]:
j += 1
j -= 1
In = Index[(Xny[:-1, Iex] > bins[j]) & (Xny[:-1, Iex] <= bins[j + 1])]
if In.size > 0:
Leav = np.min(In)
else:
Leav = 1
minIL, Inmin = np.min(np.abs(Ih - Leav)), np.argmin(np.abs(Ih - Leav))
if minIL == 0 and len(In) == n_in_bin:
cB = (P < 0) + P * tau
invXh = np.linalg.inv(Xny[Ih, IX])
g = -(P * (Xny[Ihc, IX] @ invXh[:, Inmin])).T @ cB
h = np.vstack(
(invXh[:, Inmin], -P * (Xny[Ihc, IX] @ invXh[:, Inmin]))
) @ np.sign(g)
sigma = np.zeros(n - K)
hm = h[K:]
xBm = xB[K:]
xBm[xBm < 0] = 0
tolerance = 1e-10
sigma[hm > tolerance] = xBm[hm > tolerance] / hm[hm > tolerance]
sigma[hm <= tolerance] = np.inf
alpha, q = np.min(sigma), np.argmin(sigma)
z = xB - alpha * h
Ihm = Ih[Inmin]
Ih[Inmin] = Ihc[q]
Ihc[q] = Ihm
xB = z
xB[q + K] = alpha
P[q] = np.sign(g) + (g == 0)
Ih = np.sort(Ih)
Ihc, IndexIhc = np.sort(Ihc), np.argsort(Ihc)
P = P[IndexIhc]
xBm = xB[K:]
xBm = xBm[IndexIhc]
xB[K:] = xBm
beta = xB[:K]
rny = X[n + i - 2, Iy] - X[n + i - 2, IX] @ beta
Rny = np.append(Rny, rny)
if rny < 0:
P = np.append(P, -1)
xB = np.append(xB, -rny)
else:
P = np.append(P, 1)
xB = np.append(xB, rny)
Ihc = np.append(Ihc, n)
if len(In) == n_in_bin:
i += 1
Stay = np.ones(len(Ihc), dtype=bool)
Stay[Ihc == Leav] = False
P = P[Stay]
Ihc = Ihc[Stay]
Xny = Xny[np.sort(np.hstack((Ih, Ihc))), :]
xBm = xB[K:]
xBm = xBm[Stay]
xB = xB[:-1]
xB[K:] = xBm
Ihc[Ihc > Leav] -= 1
Ih[Ih > Leav] -= 1
else:
n += 1
return Ih, Ihc, xB, Xny, Rny, P, n, i
def rq_initialiser_final(X, r, beta, n):
"""Initialize parameters for the simplex algorithm based on initial solution.
If the number of zero elements in r is equal to rank(X), then the work is essentially done.
Otherwise, the if statement will find the index set Ih s.t. X(Ih)*beta=y(Ih) and X(Ih)^(-1)*y(Ih)=beta,
the important note being that X(Ih) has an inverse.
This is done by using the LU transform of a non-quadratic matrix.
Parameters
----------
X : numpy.ndarray
Design matrix
r : numpy.ndarray
Initial residuals
beta : numpy.ndarray
Initial coefficients
n : int
Number of observations
Returns
-------
tuple
(xB, Ih, Ihc, P) Basic solution, basic indices, non-basic indices, and sign vector
"""
Index = np.arange(n)
if np.sum(r == 0) > len(beta):
Lr0 = np.sum(r == 0)
rs, Irs = np.sort(np.abs(r)), np.argsort(np.abs(r))
Irs = Irs[:Lr0]
Xh = X[Irs, :]
P, L, U = scipy.linalg.lu(Xh, permute_l=False)
In = np.arange(Lr0)
rI = np.zeros(Lr0 - len(beta), dtype=int)
for i in range(len(beta), Lr0):
rI[i - len(beta)] = In[P[:, i] == 1]
r[Irs[rI]] = 1e-15
index_to_index = r.flatten() == 0
if len(index_to_index) < len(Index):
index_to_index = np.append(
index_to_index, np.zeros(len(Index) - len(index_to_index), dtype=bool)
)
Ih = Index[index_to_index]
Ihc = np.setdiff1d(Index[: len(r)], Ih)
P = np.sign(r[Ihc])
r[np.abs(r) < 1e-15] = 0
xB = np.hstack((beta, np.abs(r[Ihc])))
return xB, Ih, Ihc, P
def rq_simplex_alg_final(Ih, Ihc, n, K, xB, Xny, IH, P, tau):
"""Perform one step of the simplex algorithm for quantile regression.
Parameters
----------
Ih : numpy.ndarray
Index set for basic variables
Ihc : numpy.ndarray
Index set for non-basic variables
n : int
Number of observations
K : int
Number of explanatory variables
xB : numpy.ndarray
Basic solution
Xny : numpy.ndarray
Design matrix
IH : numpy.ndarray
History of index sets
P : numpy.ndarray
Sign vector
tau : float
Quantile level
Returns
-------
tuple
Algorithm parameters for the next iteration
"""
try:
invXh = la.inv(Xny[Ih, :])
except np.linalg.LinAlgError:
print("Error in inverse matrix operation. Trying pseudo-inverse instead.")
invXh = np.linalg.pinv(Xny[Ih, :])
except Exception as e:
raise ValueError(
"Error in inverse matrix operation. Most likely cause of error: "
"- Increase length of data, "
"- Ensure no collinearity in the data"
) from e
cB = (P < 0) + P * tau
cC = np.vstack((np.ones(K) * tau, np.ones(K) * (1 - tau))).reshape(-1, 1)
IB2 = -np.dot(P.reshape(-1, 1) * (np.ones((1, K)) * Xny[Ihc, :]), invXh)
g = IB2.T @ cB
d = cC - np.vstack((g, -g)).reshape(-1, 1)
d[np.abs(d) < 1e-15] = 0
d = d.flatten()
md, s = np.sort(d), np.argsort(d)
s = s[md < 0]
md = md[md < 0]
c = np.ones(len(s))
c[s > (K - 1)] = -1
C = np.diag(c)
s[s > (K - 1)] -= K
h = np.vstack([invXh[:, s], IB2[:, s]]) @ C
alpha = np.zeros(len(s))
q = np.zeros(len(s), dtype=int)
xm = xB[K:]
xm[xm < 0] = 0
hm = h[K:, :]
cq = np.zeros(len(s))
tol1 = 1e-12
for k in range(len(s)):
sigma = xm.copy()
sigma[hm[:, k] > tol1] = xm[hm[:, k] > tol1] / hm[hm[:, k] > tol1, k]
sigma[hm[:, k] <= tol1] = np.inf
alpha[k], q[k] = np.min(sigma), np.argmin(sigma)
cq[k] = c[k]
gain = md * alpha
Mgain, IMgain = np.sort(gain), np.argsort(gain)
CON = np.inf
j = 0
if len(gain) == 0:
gain = 1
else:
while CON > 1e6 and j < len(s):
j += 1
IhMid = Ih.copy()
shifter = 0
IhMid[s[IMgain[j - 1 + shifter]]] = Ihc[q[IMgain[j - 1 + shifter]]]
IhMid = np.sort(IhMid)
if IH.shape[1] <= 1:
IH = IH.T.reshape(-1, 1)
IhMid = IhMid.reshape(1, -1)
if (
np.min(np.sum(np.abs(IH - IhMid.T * np.ones((1, IH.shape[1]))), axis=0))
== 0
):
CON = np.inf
else:
CON = np.linalg.cond(Xny[(IhMid.flatten()), :])
s = s[IMgain[j - 1 + shifter]]
q = q[IMgain[j - 1 + shifter]]
cq = cq[IMgain[j - 1 + shifter]]
alpha = alpha[IMgain[j - 1 + shifter]]
IH = np.hstack((IH, IhMid.T))
h = h[:, IMgain[j - 1 + shifter]]
gain = gain[IMgain[j - 1 + shifter]]
md = md[IMgain[j - 1 + shifter]]
return CON, s, q, gain, md, alpha, h, IH, cq
def rq_purify_final(xB, Ih, Ihc, P, K, Xny, yny):
"""Handle infeasible points in a simplex formulation of a quantile regression problem.
The underlying assumption is that there are no restrictions in the problem.
The updating can therefore be done by recalculating all residuals and coefficients.
The assumption is further that we are in a position s.t.
Xny*Xny(Ih)^(-1)*yny(Ih)=yny+residuals
K=rank(Xny)
Parameters
----------
xB : numpy.ndarray
Basic solution
Ih : numpy.ndarray
Index set for basic variables
Ihc : numpy.ndarray
Index set for non-basic variables
P : numpy.ndarray
Sign vector
K : int
Number of explanatory variables
Xny : numpy.ndarray
Design matrix
yny : numpy.ndarray
Response vector
Returns
-------
tuple
(xB, P) Updated basic solution and sign vector
"""
invXh = np.linalg.inv(Xny[Ih, :])
xB = np.hstack((invXh @ yny[Ih], yny[Ihc] - Xny[Ihc, :] @ invXh @ yny[Ih]))
P = np.sign(xB[K:])
P[P == 0] = 1
xB[K:] = np.abs(xB[K:])
return xB, P
[docs]
def rq_simplex_final(X, IX, Iy, Iex, r, beta, n, tau, bins, n_in_bin):
"""Calculate solution to an adaptive simplex algorithm for quantile regression.
The function uses knowledge of the solution at time t to calculate the solution at
time t+1. The basic idea is that the solution to the quantile regression
problem can be written as:
y(t) = X(t)'*beta + r(t)
where beta = X(h)^(-1)*y(h) for some index set h. Simplex algorithm
is used to calculate the optimal h at time t+1 based on the solution
at time t.
Parameters
----------
X : numpy.ndarray
Design matrix for the linear quantile regression problem
IX : numpy.ndarray
Index set referring to columns of X which is the design matrix
Iy : int
Index referring to response column in X
Iex : int
Index referring to grouping variable column in X
r : numpy.ndarray
Residuals from initial solution
beta : numpy.ndarray
Initial solution coefficients
n : int
Number of elements in r
tau : float
Required probability
bins : numpy.ndarray
Vector defining partition intervals
n_in_bin : int
Number of elements per bin
Returns
-------
tuple
(N, BETA, GAIN, Ld, Rny, Mx, Re, CON1, T)
- N: Number of simplex steps
- BETA: Solution matrix
- GAIN: Loss function gain
- Ld: Number of descent directions
- Rny: One-step-ahead prediction residuals
- Mx: Minimum constraint solution
- Re: Training set reliability
- CON1: Condition numbers
- T: Computation times
References
----------
.. [1] J. K. Møller (2006), "Modeling of Uncertainty in Wind Energy
Forecast". Master Thesis, Technical University of Denmark.
.. [2] H. B. Nielsen (1999), "Algorithms for Linear Optimization, an
Introduction". DTU Course Notes.
"""
GAIN = np.array([0])
Rny = np.array([0])
Ld = np.array([0])
mx = 0
T = np.zeros(len(X[:, Iy]))
Mx = np.zeros(len(X[:, Iy]))
N_num_of_simplex_steps = []
N_size_of_the_design_matrix_at_time_k = []
CON1 = np.array([0])
BETA = beta.reshape(1, -1)
K = len(beta)
Xny = X[:(n), :]
H = np.zeros((1, K))
tolmx = 1e-15
j = 0
LX = len(X[:, Iy])
i = 2
k = 0
xB, Ih, Ihc, P = rq_initialiser_final(Xny[:, IX], r, beta, n)
Re = np.zeros(LX - n)
while i + n < LX:
k += 1
t = time.time()
Re[k] = np.sum(P < 0) / n
mx = np.min(xB[K:])
if j > 0 and mx < -tolmx:
xB, P = rq_purify_final(xB, Ih, Ihc, P, K, Xny[:, IX], Xny[:, Iy])
mx = np.min(xB[K:])
Mx[k] = mx
j = 0
beta = xB[:K]
BETA = np.vstack((BETA, beta))
Ih, Ihc, xB, Xny, Rny, P, n, i = opdatering_final(
X, Xny, IX, Iy, Iex, Ih, Ihc, beta, Rny, K, n, xB, P, tau, i, bins, n_in_bin
)
IH = Ih.reshape(-1, 1)
CON, s, q, gain, md, alpha, h, IH, cq = rq_simplex_alg_final(
Ih, Ihc, n, K, xB, Xny[:, IX], IH, P, tau
)
CON1 = np.append(CON1, CON)
while gain <= 0 and md < 0 and j < 24 and CON < 1e6:
GAIN = np.append(GAIN, gain)
j += 1
z = xB - alpha * h
IhM = Ih[s]
IhcM = Ihc[q]
Ih[s] = IhcM
Ihc[q] = IhM
P[q] = cq
xB = z
xB[q + K] = alpha
Ih = np.sort(Ih)
Ihc, IndexIhc = np.sort(Ihc), np.argsort(Ihc)
P = P[IndexIhc]
xBm = xB[K:]
xBm = xBm[IndexIhc]
xB[K:] = xBm
CON, s, q, gain, md, alpha, h, IH, cq = rq_simplex_alg_final(
Ih, Ihc, n, K, xB, Xny[:, IX], IH, P, tau
)
CON1 = np.append(CON1, CON)
N_num_of_simplex_steps.append(j)
N_size_of_the_design_matrix_at_time_k.append(n)
T[k] = time.time() - t
N = np.hstack([N_num_of_simplex_steps, N_size_of_the_design_matrix_at_time_k])
return N, BETA, GAIN[1:], Ld, Rny, Mx, Re, CON1[1:], T
[docs]
def one_step_quantile_prediction(
X_input,
Y_input,
n_init,
n_full,
quantile=0.5,
already_correct_size=False,
n_in_X=5000,
):
"""Perform one-step quantile prediction using TAQR.
Takes the entire training set and, based on the last n_init observations,
calculates residuals and coefficients for the quantile regression.
Parameters
----------
X_input : numpy.ndarray
Input features matrix
Y_input : numpy.ndarray
Target values array
n_init : int
Number of initial observations for training
n_full : int
Total number of observations to use
quantile : float, optional
Quantile level to predict, by default 0.5
already_correct_size : bool, optional
Whether inputs are already correctly sized, by default False
n_in_X : int, optional
Number of observations to use in X, by default 5000
Returns
-------
tuple
(y_pred, y_actual, BETA) Predictions, actual values, and coefficients
"""
assert n_init <= n_full - 2, "n_init must be less than n_full"
if type(X_input) == pd.DataFrame:
X_input = X_input.to_numpy()
if type(Y_input) == pd.Series or type(Y_input) == pd.DataFrame:
Y_input = Y_input.to_numpy()
n, m = X_input.shape
full_length, p = X_input.shape
X = X_input[:n_full, :].copy()
Y = Y_input[:n_full]
X_for_residuals = X[:n_init, :]
Y_for_residuals = Y[:n_init]
np.savetxt("X_for_residuals.csv", X_for_residuals, delimiter=",")
np.savetxt("Y_for_residuals.csv", Y_for_residuals, delimiter=",")
run_r_script("X_for_residuals.csv", "Y_for_residuals.csv", tau=quantile)
def ignore_first_column(s):
return float(s)
residuals = np.genfromtxt(
"rq_fit_residuals.csv",
delimiter=",",
skip_header=1,
usecols=(1,),
converters={0: ignore_first_column},
)
beta_init = np.genfromtxt(
"rq_fit_coefficients.csv",
delimiter=",",
skip_header=1,
usecols=(1,),
converters={0: ignore_first_column},
)
if len(beta_init) < p:
beta_init = np.append(beta_init, np.ones(p - len(beta_init)))
else:
beta_init = beta_init[:p]
r_init = set_n_closest_to_zero(arr=residuals, n=len(beta_init))
X_full = np.column_stack((X, Y, np.random.choice([1, 1], size=n_full)))
IX = np.arange(p)
Iy = p
Iex = p + 1
bins = np.array([-np.inf, np.inf])
tau = quantile
n_in_bin = int(1.0 * full_length)
n_input = n_in_X
N, BETA, GAIN, Ld, Rny, Mx, Re, CON1, T = rq_simplex_final(
X_full, IX, Iy, Iex, r_init, beta_init, n_input, tau, bins, n_in_bin
)
y_pred = np.sum((X_input[(n_input + 2) : (n_full), :] * BETA[1:, :]), axis=1)
y_actual = Y_input[(n_input) : (n_full - 2)]
return y_pred, y_actual, BETA
def run_r_script(X_filename, Y_filename, tau):
"""Run R script for quantile regression.
Parameters
----------
X_filename : str
Path to X data CSV file
Y_filename : str
Path to Y data CSV file
tau : float
Quantile level
"""
import subprocess
process = subprocess.Popen(
["R", "--vanilla"], stdin=subprocess.PIPE, stdout=subprocess.PIPE
)
r_script = f"""
options(warn = -1)
library(onlineforecast)
library(quantreg)
library(readr)
X_full <- read_csv("{X_filename}", col_names = FALSE, show_col_types = FALSE)
y <- read_csv("{Y_filename}", col_names = "y", show_col_types = FALSE)
X_full <- X_full[1:500,]
data <- cbind(X_full, y[1:500,1])
predictor_cols <- colnames(X_full)
formula_string <- paste("y ~ 0+", paste(predictor_cols, collapse = " + "))
formula <- as.formula(formula_string)
rq_fit <- rq(formula, tau = {tau}, data = data )
write.csv(rq_fit$coefficients, "rq_fit_coefficients.csv")
write.csv(rq_fit$residuals, "rq_fit_residuals.csv")
"""
for line in r_script.strip().split("\n"):
process.stdin.write(line.encode("utf-8") + b"\n")
process.stdin.close()
process.stdout.read()
process.terminate()