D_beta script and batch job submission utilities
From https://doi.org/10.1101/2024.05.29.596499
Script
from joblib import Parallel, delayed
import numpy as np
from scipy.special import comb
from scipy.linalg import toeplitz
def sm_d2(sig, order):
    """
    Distance D2 to the beta-critical fixed point.

    Equivalent to sm_dbeta with beta=2: fits an AR(order) model by
    Yule-Walker (MLE) and returns (1 - sum of coefficients) / sqrt(order).

    Args:
        sig (np.ndarray): The input time series (should be z-scored).
        order (int): The order 'p' of the AR(p) model.

    Returns:
        float: The calculated D2 distance.
    """
    # Function-local import keeps statsmodels optional for callers that
    # only use the numpy-based estimators below.
    import statsmodels.api as sm

    ar_coefs, _ = sm.regression.yule_walker(sig, order=order, method='mle')
    coef_sum = np.sum(ar_coefs)
    return (1 - coef_sum) / np.sqrt(order)
def _create_phi_r(r, p):
"""
Creates the special history kernel phi^(r).
This vector is an element of the A_2r^(r) subspace.
Args:
r (int): The order of the kernel to create.
p (int): The total order of the AR(p) model, for zero-padding.
Returns:
np.ndarray: The phi^(r) vector of length p.
"""
if r > p:
return np.zeros(p)
phi_r = np.zeros(p)
for t in range(1, r + 1):
phi_r[t - 1] = comb(r, t) * ((-1)**(t + 1))
return phi_r
def sm_dbeta(sig, order, beta):
    """
    Calculates d_beta, the distance to the beta-critical fixed point.

    Fits an AR(order) model by Yule-Walker (MLE) and returns the Euclidean
    distance from the fitted coefficient vector to the closest point of the
    affine subspace anchored at phi^(beta/2) and spanned by the higher-order
    phi^(r) kernels.

    Args:
        sig (np.ndarray): The input time series (should be z-scored).
        order (int): The order 'p' of the AR(p) model.
        beta (int): The beta value for the fixed point (must be even, >= 2).

    Returns:
        float: The calculated d_beta distance.

    Raises:
        ValueError: If beta is odd or less than 2.
    """
    # Function-local import, consistent with sm_d2. Fixes a NameError: 'sm'
    # was never imported at module level, so this function previously crashed.
    import statsmodels.api as sm

    if beta % 2 != 0 or beta < 2:
        raise ValueError("beta must be an even integer >= 2.")
    theta, _ = sm.regression.yule_walker(sig, order=order, method='mle')
    subspace_dim = order - beta // 2
    if subspace_dim < 0:
        # Model too small to contain the subspace: distance to the zero vector.
        return np.linalg.norm(theta)
    origin = _create_phi_r(beta // 2, order)
    if subspace_dim == 0:
        # Degenerate case: the subspace is the single point 'origin'. The
        # previous code fed an empty (zero-column) matrix to lstsq here,
        # which raises LinAlgError instead of returning this distance.
        return np.linalg.norm(theta - origin)
    basis_vectors = [
        _create_phi_r(j, order) - origin
        for j in range(beta // 2 + 1, order + 1)
    ]
    X = np.array(basis_vectors).T
    # Least-squares projection of (theta - origin) onto the basis columns.
    v, _, _, _ = np.linalg.lstsq(X, theta - origin, rcond=None)
    phi_closest = X @ v + origin
    d_beta = np.linalg.norm(theta - phi_closest)
    return d_beta
def d2_matlab(sig, order):
    """
    Calculates D2 by solving the Yule-Walker equations built from the ACF.

    Translated from MATLAB of Hengenlab
    (https://github.com/hengenlab/d2_matlab/blob/main/d2_fromsam_matlab_non_figshare/d2_matlab.m)

    Args:
        sig (np.ndarray): The input signal (time series) as a NumPy array.
        order (int): The order of the autoregressive model.

    Returns:
        float: The D2 value, (1 - sum of kernel weights) / sqrt(order).
            (Previous docstring incorrectly said the kernel weights array
            was returned.)
    """
    from statsmodels.tsa.stattools import acf

    # Sample autocorrelation at lags 0..order; acf[0] == 1.
    full_acf = acf(sig, nlags=order, fft=True)
    # Toeplitz system R @ kern = b using lags 0..order-1 on the left.
    R = toeplitz(full_acf[:-1])
    b = full_acf[1:]
    kern = np.linalg.solve(R, b)
    return (1 - np.sum(kern)) / np.sqrt(order)
def acr_fft(x):
    """
    Normalized autocorrelation of x via the Wiener-Khinchin theorem.

    Zero-pads the centered signal to length 2n so the circular FFT
    correlation matches the linear one, then normalizes by the zero-lag
    value. Much faster than direct summation for large arrays.

    Args:
        x (np.ndarray): Input signal.

    Returns:
        np.ndarray: Autocorrelation at lags 0..n-1, with result[0] == 1.
    """
    n = len(x)
    centered = x - np.mean(x)
    # Power spectral density of the zero-padded signal.
    spectrum = np.fft.fft(centered, n=2 * n)
    power = np.abs(spectrum) ** 2
    corr = np.fft.ifft(power).real[:n]
    # Normalize by the zero-lag (total power) value.
    return corr / corr[0]
def mark_d2(sig, order):
    """
    Optimized D2 calculation using scipy's toeplitz function.

    Uses the FFT-based autocorrelation and solves the Yule-Walker system
    toeplitz(rho[0:order]) @ kern = rho[1:order+1].
    """
    # Autocorrelation at lags 0..order (rho[0] == 1 after normalization).
    rho = acr_fft(sig)[:order + 1]
    # The symmetric Toeplitz matrix and right-hand side come straight from
    # the lag-0..order ACF; no need to mirror the lags around zero and
    # slice the column/row back out.
    R = toeplitz(rho[:order])
    kern = np.linalg.solve(R, rho[1:])
    return (1 - np.sum(kern)) / np.sqrt(order)
def opt_sm_d2(sig, order):
    """
    D2 from biased sample autocovariances with no statsmodels dependency.

    Computes autocovariances for lags 0..order by direct dot products
    (each divided by the full series length n, i.e. the biased estimator),
    solves the Yule-Walker Toeplitz system, and returns
    (1 - sum of coefficients) / sqrt(order).

    Args:
        sig (np.ndarray): Input time series.
        order (int): AR model order.

    Returns:
        float: The D2 value.
    """
    x = sig.astype(np.float64)  # astype copies, so the caller's array is untouched
    x -= x.mean()
    n = x.shape[0]
    # Biased autocovariance estimates for lags 0..order.
    autocov = np.zeros(order + 1, np.float64)
    autocov[0] = np.dot(x, x)
    for lag in range(1, order + 1):
        autocov[lag] = np.dot(x[:-lag], x[lag:])
    autocov /= n
    yw_coefs = np.linalg.solve(toeplitz(autocov[:-1]), autocov[1:])
    return (1 - yw_coefs.sum()) / np.sqrt(order)
class D2Calculator:
    """
    Sliding-window driver that evaluates a D2 / d_beta metric over a time
    series, dispatching windows to parallel workers with joblib.
    """

    def __init__(self, dp_per_minute):
        """
        Initialize the D2Calculator with a specified data points per minute.

        Args:
            dp_per_minute (int): Number of data points per minute.
        """
        self.dp_per_minute = int(dp_per_minute)

    def _run_windows(self, window_size, step_size, data, n_worker, shuffle,
                     mean, zscore, seed, func, *func_args):
        """
        Shared sliding-window machinery for get_all_d2 / get_all_dbeta.

        Normalizes the data, seeds numpy's global RNG (np.random.permutation
        is used for shuffling, so the global seed is what makes shuffled runs
        reproducible), slices float32 windows, and maps func(window,
        *func_args) over them in parallel.

        Returns:
            np.ndarray: One metric value per window (empty if data is
            shorter than one window).
        """
        if mean:
            data = data - np.mean(data)
        if zscore:
            data = (data - np.mean(data)) / np.std(data)
        if seed is not None:
            np.random.seed(seed)
        actual_window_size = window_size * self.dp_per_minute
        actual_step_size = step_size * self.dp_per_minute
        starts = range(0, len(data) - actual_window_size + 1, actual_step_size)

        def make_window(start):
            # Windows are materialized in the parent process, so shuffling
            # draws from the (optionally seeded) global RNG deterministically.
            w = data[start:start + actual_window_size]
            if shuffle:
                w = np.random.permutation(w)
            return w.astype(np.float32)

        results = Parallel(n_jobs=n_worker)(
            delayed(func)(make_window(s), *func_args) for s in starts
        )
        return np.array(results)

    def get_all_d2(self, window_size, step_size, data, order, d2_function=sm_d2, n_worker=-1, shuffle=False, mean=False, zscore=True, seed=None):
        """
        Calculate D2 values for a given time series data.

        Args:
            window_size (int): Size of the sliding window in minutes.
            step_size (int): Step size for the sliding window in minutes.
            data (np.ndarray): Input time series data.
            order (int): Order of the AR model.
            d2_function (function): Function to calculate D2, default is sm_d2.
            n_worker (int): Number of parallel workers, -1 for all available.
            shuffle (bool): If True, shuffle the data within each window.
            mean (bool): If True, subtract the mean from the data before processing.
            zscore (bool): If True, z-score the data before processing. (Advised)
            seed (int or None): If given, seeds numpy's global RNG so shuffled
                windows are reproducible.

        Returns:
            np.ndarray: Array of D2 values calculated for each window.
        """
        return self._run_windows(window_size, step_size, data, n_worker,
                                 shuffle, mean, zscore, seed,
                                 d2_function, order)

    def get_all_dbeta(self, window_size, step_size, data, order, beta=2, dbeta_function=sm_dbeta, n_worker=-1, shuffle=False, mean=False, zscore=True, seed=None):
        """
        Calculate D beta values for a given time series data.

        Args:
            window_size (int): Size of the sliding window in minutes.
            step_size (int): Step size for the sliding window in minutes.
            data (np.ndarray): Input time series data.
            order (int): Order of the AR model.
            beta (int): Beta value for the fixed point (must be even, >= 2).
            dbeta_function (function): Function to calculate D beta, default is sm_dbeta.
            n_worker (int): Number of parallel workers, -1 for all available.
            shuffle (bool): If True, shuffle the data within each window.
            mean (bool): If True, subtract the mean from the data before processing.
            zscore (bool): If True, z-score the data before processing. (Advised)
            seed (int or None): If given, seeds numpy's global RNG so shuffled
                windows are reproducible.

        Returns:
            np.ndarray: Array of D beta values calculated for each window.

        Raises:
            ValueError: If beta is odd or less than 2.
        """
        # Validate before doing any normalization work.
        if beta % 2 != 0 or beta < 2:
            raise ValueError("beta must be an even integer >= 2.")
        return self._run_windows(window_size, step_size, data, n_worker,
                                 shuffle, mean, zscore, seed,
                                 dbeta_function, order, beta)
Example Usage
import d2
from test_d2_crt import calc_d2v1_loc

d2_calculator = d2.D2Calculator(dp_per_minute=6000)  # 6000 dp/min, per the Result note
_ = d2_calculator.get_all_dbeta(
window_size=15, step_size=1, data=first_row, order=i, beta=2, zscore=True
)
_ = d2_calculator.get_all_d2(
window_size=15, step_size=1, data=first_row, order=i, d2_function=d2.sm_d2
)
_ = d2_calculator.get_all_d2(
window_size=15, step_size=1, data=first_row, order=i, d2_function=calc_d2v1_loc
)
Result
Note: Each value was calculated from 5 minutes of data with 6000 data points per minute and an order of 10.