import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
import time
[docs]
def set_n_smallest_to_zero(arr, n):
"""Set the n smallest elements in an array to zero.
Parameters
----------
arr : array-like
Input array of numbers
n : int
Number of smallest elements to set to zero
Returns
-------
numpy.ndarray
Modified array with n smallest elements set to zero
"""
if n <= 0:
return arr
if n >= len(arr):
return [0] * len(arr)
# Find the n'th smallest element
nth_smallest = sorted(arr)[n - 1]
# Set elements smaller than or equal to nth_smallest to zero
modified_arr = [0 if x <= nth_smallest else x for x in arr]
modified_arr = np.array(modified_arr)
return modified_arr
[docs]
def set_n_closest_to_zero(arr, n):
"""Set the n elements closest to zero in an array to zero.
Parameters
----------
arr : array-like
Input array of numbers
n : int
Number of elements closest to zero to set to zero
Returns
-------
numpy.ndarray
Modified array with n elements closest to zero set to zero
"""
if n <= 0:
return arr
if n >= len(arr):
return [0] * len(arr)
# Find the absolute values of the elements
abs_arr = np.abs(arr)
# Find the indices of the n elements closest to zero
closest_indices = np.argpartition(abs_arr, n)[:n]
# Set the elements closest to zero to zero
modified_arr = arr.copy()
modified_arr[closest_indices] = 0
return modified_arr
[docs]
def quantile_score(p, z, q):
"""Calculate the Quantile Score (QS) for a given probability and set of observations and quantiles.
Implementation based on Fauer et al. (2021): "Flexible and consistent quantile estimation for
intensity–duration–frequency curves"
Parameters
----------
p : float
The probability level (between 0 and 1)
z : numpy.ndarray
The observed values
q : numpy.ndarray
The predicted quantiles
Returns
-------
float
The Quantile Score (QS)
"""
u = z - q
rho = np.where(u > 0, p * u, (p - 1) * u)
return np.sum(rho)
[docs]
def build_ar1_covariance(n, rho, sigma=1.0):
"""
Build the AR(1) covariance matrix for an n-dimensional process.
Parameters
----------
n : int
Dimension of the covariance matrix.
rho : float
AR(1) correlation parameter (the AR coefficient).
sigma : float, optional
Standard deviation of the noise (innovation), defaults to 1.0.
Returns
-------
numpy.ndarray
The AR(1) covariance matrix of shape (n, n), with elements sigma^2 * rho^(|i-j|).
"""
indices = np.arange(n)
abs_diff = np.abs(np.subtract.outer(indices, indices))
cov_matrix = (sigma**2) * (rho**abs_diff)
return cov_matrix
[docs]
def get_parameter_bounds() -> Dict[str, Tuple[float, float]]:
"""Define bounds for all parameters for SDE simulation.
Used to ensure that the parameters are within a reasonable range."""
return {
'X0': (0.0, 1.0),
'theta': (0.2, 0.8), # Lowered upper bound for mean level
'kappa': (0.05, 0.5), # Reduced mean reversion speed
'sigma_base': (0.01, 2), # Reduced base volatility
'alpha': (0.01, 0.8), # Reduced ARCH effect
'beta': (0.7, 1.2), # Increased persistence
'lambda_jump': (0.005, 0.05), # Fewer jumps
'jump_mu': (-0.2, 0.2), # Allowing negative jumps
'jump_sigma': (0.05, 0.2) # More consistent jump sizes
}
[docs]
def generate_ou_ensembles(
X: np.ndarray,
kappa: float,
sigma: float,
chunk_size: int = 24,
n_ensembles: int = 50
) -> np.ndarray:
"""
Generate continuous Ornstein-Uhlenbeck (OU) ensemble paths that revert
to the given reference series X[t], in chunk_size increments, but also
simulate 'extra' future steps to account for OU lag and shift
them back so the paths better align with X in real-time.
The ensemble is clipped to remain within [0,1].
Parameters
----------
X : np.ndarray
Reference series of length T that serves as the time-varying mean
for each OU path.
kappa : float
Mean-reversion speed for the OU process.
The characteristic lag ~ 1/kappa.
sigma : float
Diffusion (volatility) parameter.
chunk_size : int, optional
Size of each chunk in timesteps. Defaults to 24.
n_ensembles : int, optional
Number of ensemble paths to generate. Defaults to 50.
Returns
-------
Y_corrected : np.ndarray, shape (T, n_ensembles)
The lag-corrected OU ensemble paths, each of length T.
Notes
-----
- We break the timeline [0..T-1] into blocks of `chunk_size` steps.
At chunk boundaries, each ensemble path is continuous
(meaning; the new chunk starts where the old chunk ended).
- We simulate extra steps (about 1/kappa) at the end, then shift the entire
simulation backward by ~1/kappa to reduce the effective lag in real time.
- For fractional lag, a simple linear interpolation is applied.
- This "lag correction" is heuristic but often aligns the OU paths with
X(t) more tightly when the reversion is slow.
"""
T = len(X)
# ------------------------------
# 1. Calculate the lag
# ------------------------------
# For an OU with mean reversion kappa, characteristic timescale is 1/kappa
lag = 1.0 / kappa
lag_int = int(np.floor(lag))
lag_frac = lag - lag_int
# We'll simulate T + lag_int + 1 steps. The "+1" helps with
# fractional shift interpolation.
sim_len = T + lag_int + 1
# ------------------------------
# 2. Allocate array for ensembles
# ------------------------------
# We'll simulate a bigger array, shape (sim_len, n_ensembles).
Y_big = np.zeros((sim_len, n_ensembles))
# For the very first step, set them all to X[0]
Y_big[0, :] = X[0]
# Precompute chunk boundaries for the extended length
chunk_starts = list(range(0, sim_len, chunk_size))
# ------------------------------
# 3. Simulate with chunking
# ------------------------------
for idx_chunk in range(len(chunk_starts)):
start = chunk_starts[idx_chunk]
end = min(start + chunk_size, sim_len)
if start > 0:
# Continue from previous chunk's last step
Y_big[start, :] = Y_big[start - 1, :]
# Step through the chunk
for i in range(start, end - 1):
# We must clamp i to the length of X when indexing X:
# Because beyond T-1, we can assume X stays at X[-1]
iX = min(i, T - 1)
# OU update
drift = kappa * (X[iX] - Y_big[i, :])
diffusion = sigma * np.random.randn(n_ensembles)
Y_big[i + 1, :] = Y_big[i, :] + drift + diffusion
# Enforce [0, 1] clipping
Y_big[i + 1, :] = np.clip(Y_big[i + 1, :], 0.0, 1.0)
# Now we have Y_big of length sim_len = T + lag_int + 1
# ------------------------------
# 4. Shift the result backward
# by "lag = lag_int + lag_frac"
# ------------------------------
# We'll produce a final array Y_corrected of shape (T, n_ensembles).
Y_corrected = np.zeros((T, n_ensembles))
# If there's no fractional part, we can just slice
# If lag_frac == 0, then just do:
# Y_corrected[t] = Y_big[t + lag_int]
# for t = 0..T-1
if np.isclose(lag_frac, 0.0, atol=1e-14):
for t in range(T):
Y_corrected[t, :] = Y_big[t + lag_int, :]
else:
# We have a fractional shift, so do a linear interpolation
# Y_corrected[t] = (1 - alpha)* Y_big[t + lag_int] + alpha * Y_big[t + lag_int + 1]
# where alpha = lag_frac
alpha = lag_frac
for t in range(T):
i1 = t + lag_int
i2 = i1 + 1
# clamp i2 to avoid out-of-bounds (should not occur with sim_len = T + lag_int + 1)
i2 = min(i2, sim_len - 1)
Y_corrected[t, :] = (1 - alpha) * Y_big[i1, :] + alpha * Y_big[i2, :]
# We have a T x n_ensembles array that’s effectively “time-shifted” back by lag steps
return Y_corrected
[docs]
def simulate_wind_power_sde(params: Dict[str, float], T: float = 500, dt: float = 1.0) -> Tuple[np.ndarray, np.ndarray]:
"""
Simulate wind power production using an Ornstein-Uhlenbeck process with GARCH volatility
and jumps of normally distributed sizes driven by a Poisson process. The mean reversion
is state-dependent with a repelling mechanism near 1.0 (upper boundary), and the diffusion
term vanishes at the boundaries to avoid unphysical values outside [0, 1].
A few additional tweaks include:
- GARCH volatility that captures 'vol_shock' from recent values.
- Repellent forces that strengthen near 1.0, reducing both the drift and diffusion.
- Jumps that can persist over multiple steps, and become more negative if values are near 1.0.
Parameters
----------
params : Dict[str, float]
A dictionary containing all model parameters:
- X0 : float
Initial wind power production level in [0, 1].
- theta : float
Long-term mean level; typically in [0, 1].
- kappa : float
Mean reversion speed (absolute value is used).
- sigma_base : float
Base volatility level (absolute value is used).
- alpha : float
ARCH parameter (absolute value is used).
- beta : float
GARCH parameter; must be in [0, 1].
- lambda_jump : float
Intensity of jump arrivals in the Poisson process (absolute value is used).
- jump_mu : float
Mean jump size (can be positive or negative).
- jump_sigma : float
Standard deviation of jump sizes (absolute value is used).
T : float, optional
The end time of the simulation (total number of steps is T/dt). Default is 500.
dt : float, optional
The size of each time step. Default is 1.0.
Returns
-------
t : np.ndarray
Array of time points of length N = int(T/dt).
X : np.ndarray
Simulated wind power production values of length N, clipped to the interval [0, 1].
Notes
-----
- The drift term implements a state-dependent mean reversion that weakens
near 1.0 and introduces a strong downward force very close to 1.0.
- The diffusion term is modified as
(X_t * (1 - X_t)) * (X_t / (X_t + 0.5)) dB_t,
ensuring it decreases to zero when X_t is near 0 or 1.
- GARCH effects are included to model changing volatility based on recent
shocks in the process.
- Jumps arrive according to a Poisson process with random normal magnitudes,
and can persist over multiple time steps with some decay.
Examples
--------
>>> params = {
... 'X0': 0.5, 'theta': 0.7, 'kappa': 1.0, 'sigma_base': 0.1,
... 'alpha': 0.2, 'beta': 0.5, 'lambda_jump': 0.05,
... 'jump_mu': 0.0, 'jump_sigma': 0.02
... }
>>> t, X = simulate_wind_power_sde(params, T=100, dt=1.0)
>>> import matplotlib.pyplot as plt
>>> plt.plot(t, X)
>>> plt.show()
"""
# Unpack parameters and ensure they're valid
X0 = np.clip(params['X0'], 0, 1)
theta = np.clip(params['theta'], 0, 1) # Mean level
kappa = abs(params['kappa']) # Mean reversion speed
sigma_base = abs(params['sigma_base']) # Base volatility
# GARCH parameters
alpha = abs(params['alpha']) # ARCH parameter
beta = np.clip(params['beta'], 0, 1) # GARCH parameter
# Jump parameters
lambda_jump = abs(params['lambda_jump']) # Jump intensity
jump_mu = params['jump_mu'] # Jump mean
jump_sigma = abs(params['jump_sigma']) # Jump size volatility
# Time grid
t = np.linspace(0, T, int(T/dt))
N = len(t)
# Initialize arrays
X = np.zeros(N)
X[0] = X0
# Initialize GARCH volatility
sigma = np.zeros(N)
sigma[0] = sigma_base
# Initialize jump state variables
jump_state = 0
previous_jump = 0
# Generate jumps
jump_times = np.random.poisson(max(1e-10, lambda_jump * dt), N)
jump_sizes = np.random.normal(jump_mu, max(1e-10, jump_sigma), N)
# Simulation
for i in range(1, N):
# Update GARCH volatility
vol_shock = np.abs(X[i-1] - X[max(0, i-2)]) / dt
sigma[i] = np.sqrt(alpha * vol_shock**2 + beta * sigma[i-1]**2)
# Get previous value and recent average
X_prev = X[i-1]
recent_avg = np.mean(X[max(0, i-5):i]) if i > 5 else X_prev
# State-dependent mean reversion with repellent near 1.0
if X_prev > 0.99:
# Strong repellent force very close to 1.0
drift = -kappa * 2.0 * dt
elif X_prev > 0.7:
drift = kappa * (theta - X_prev) * dt * 0.5
else:
drift = kappa * (theta - X_prev) * dt
# Modified diffusion term with memory
diff_term = (X_prev * (1 - X_prev)) * 0.4*(X_prev / (X_prev + 0.1))
if X_prev > 0.75:
# Reduce volatility if we've been stable at high values
if abs(X_prev - recent_avg) < 0.1:
diff_term *= 0.5
# Additional repellent in diffusion term near 1.0
if X_prev > 0.95:
diff_term *= 0.3
diffusion = sigma[i] * diff_term * np.sqrt(dt) * np.random.normal()
# Persistent jumps with stronger downward bias near 1.0
if jump_state > 0:
# Continue the previous jump with decay
jump = previous_jump * 0.9
jump_state -= 1
elif jump_times[i] > 0:
# Start new jump
jump = jump_sizes[i]
if X_prev > 0.98:
jump = -abs(jump) * 1.5
jump_state = np.random.geometric(0.2)
previous_jump = jump
# Make downward jumps more persistent at high values
if X_prev > 0.8 and jump < 0:
jump_state = int(jump_state * 1.5)
else:
jump = 0
# Update process
X[i] = X[i-1] + drift + diffusion + jump
# Ensure bounds
X[i] = np.clip(X[i], 0, 1)
# Apply moving average filter
X = np.convolve(X, np.ones((4,))/4, mode='valid')
# The following line is referencing an undefined 'initial_params'.
# It seems intended to reuse X0. We'll just assume it was meant as X0 in params.
X = np.concatenate(([X0] * 3, X)) # Append initial value to keep the same length
# Final adjustment and bounds
X = np.clip(X*1.05, 0, 1)
# Now we will generate ensembles from X.
# We will use an OU process to generate ensembles
ensembles = generate_ou_ensembles(
X,
kappa = 0.3,
sigma = 0.05,
chunk_size=24, # 24 hours
n_ensembles=50
)
return t, X, ensembles