D_beta script for batch job submission

From https://doi.org/10.1101/2024.05.29.596499

Script

from joblib import Parallel, delayed
import numpy as np
from scipy.special import comb
from scipy.linalg import toeplitz


def sm_d2(sig, order):
    """
    Compute D2, the distance to the beta-critical fixed point.

    Equivalent to sm_dbeta with beta=2.

    Args:
        sig (np.ndarray): Input time series (should be z-scored).
        order (int): Order 'p' of the AR(p) model.

    Returns:
        float: The D2 distance.
    """
    # Local import so the module loads even when statsmodels is absent.
    import statsmodels.api as sm
    ar_coefs, _ = sm.regression.yule_walker(sig, order=order, method='mle')
    return (1 - ar_coefs.sum()) / np.sqrt(order)

def _create_phi_r(r, p):
    """
    Creates the special history kernel phi^(r).
    This vector is an element of the A_2r^(r) subspace.

    Args:
        r (int): The order of the kernel to create.
        p (int): The total order of the AR(p) model, for zero-padding.

    Returns:
        np.ndarray: The phi^(r) vector of length p.
    """
    if r > p:
        return np.zeros(p)

    phi_r = np.zeros(p)
    for t in range(1, r + 1):
        phi_r[t - 1] = comb(r, t) * ((-1)**(t + 1))

    return phi_r

def sm_dbeta(sig, order, beta):
    """
    Calculates d_beta, the distance to the beta-critical fixed point.

    Args:
        sig (np.ndarray): The input time series (should be z-scored).
        order (int): The order 'p' of the AR(p) model.
        beta (int): The beta value for the fixed point (must be even, >= 2).

    Returns:
        float: The calculated d_beta distance.

    Raises:
        ValueError: If beta is not an even integer >= 2.
    """
    if beta % 2 != 0 or beta < 2:
        raise ValueError("beta must be an even integer >= 2.")
    # Fix: `sm` was never imported at module level (only inside sm_d2),
    # so this function raised NameError. Import locally, matching sm_d2.
    import statsmodels.api as sm
    theta, _ = sm.regression.yule_walker(sig, order=order, method='mle')

    subspace_dim = order - beta // 2
    if subspace_dim < 0:
        # Subspace does not exist at this order: distance from the origin.
        return np.linalg.norm(theta)

    origin = _create_phi_r(beta // 2, order)
    if subspace_dim == 0:
        # Fix: the affine subspace degenerates to the single point `origin`.
        # The original fell through to lstsq with an empty 1-D basis matrix,
        # which raises LinAlgError.
        return np.linalg.norm(theta - origin)

    # Basis of the affine subspace, anchored at `origin`.
    basis_vectors = [
        _create_phi_r(j, order) - origin
        for j in range(beta // 2 + 1, order + 1)
    ]
    X = np.array(basis_vectors).T
    # Least-squares projection of theta onto the affine subspace.
    v, _, _, _ = np.linalg.lstsq(X, theta - origin, rcond=None)
    phi_closest = X @ v + origin
    d_beta = np.linalg.norm(theta - phi_closest)

    return d_beta

def d2_matlab(sig, order):
    """
    D2 from the Yule-Walker equations built with the sample ACF.
    Translated from MATLAB of Hengenlab (https://github.com/hengenlab/d2_matlab/blob/main/d2_fromsam_matlab_non_figshare/d2_matlab.m)

    Args:
        sig (np.ndarray): The input signal (time series) as a NumPy array.
        order (int): The order of the autoregressive model.

    Returns:
        float: The D2 distance derived from the solved kernel weights.
    """
    from statsmodels.tsa.stattools import acf
    # Autocorrelations at lags 0..order.
    rho = acf(sig, nlags=order, fft=True)
    # Yule-Walker system: toeplitz(lags 0..order-1) * kern = lags 1..order.
    yw_matrix = toeplitz(rho[:order])
    weights = np.linalg.solve(yw_matrix, rho[1:])
    return (1 - weights.sum()) / np.sqrt(order)

def acr_fft(x):
    """
    Normalized autocorrelation of x via FFT (fast for long series).

    Args:
        x (np.ndarray): Input time series.

    Returns:
        np.ndarray: Autocorrelation at lags 0..len(x)-1, with lag 0 == 1.
    """
    centered = x - np.mean(x)
    n_samples = len(centered)

    # Pad to 2n so the circular correlation equals the linear one.
    spectrum = np.fft.fft(centered, n=2 * n_samples)
    power = np.abs(spectrum) ** 2
    correlation = np.fft.ifft(power).real[:n_samples]

    # Scale so the zero-lag value is exactly 1.
    correlation /= correlation[0]

    return correlation

def mark_d2(sig, order):
    """
    Optimized D2 calculation from the FFT autocorrelation.

    Solves the Yule-Walker system toeplitz(acf lags 0..order-1) * kern =
    acf lags 1..order; this is exactly the system the original mirrored
    ACF construction produced, written directly.

    Args:
        sig (np.ndarray): Input time series.
        order (int): Order 'p' of the AR(p) model.

    Returns:
        float: The D2 distance.
    """
    # Autocorrelations at lags 0..order; lag 0 is exactly 1 by normalization.
    lags = acr_fft(sig)[:order + 1]

    # Symmetric Toeplitz Yule-Walker matrix from lags 0..order-1.
    yw_matrix = toeplitz(lags[:order])

    # Right-hand side: lags 1..order.
    kernel = np.linalg.solve(yw_matrix, lags[1:])

    return (1 - kernel.sum()) / np.sqrt(order)

def opt_sm_d2(sig, order):
    """
    D2 from biased sample autocovariances, solving Yule-Walker directly
    (no statsmodels dependency).

    Args:
        sig (np.ndarray): Input time series.
        order (int): Order 'p' of the AR(p) model.

    Returns:
        float: The D2 distance.
    """
    # astype copies, so centering in place never mutates the caller's array.
    x = sig.astype(np.float64)
    x -= x.mean()
    n = x.shape[0]

    # Biased autocovariance estimates for lags 0..order.
    autocov = np.array(
        [np.dot(x[:n - k], x[k:]) for k in range(order + 1)]
    ) / n

    coeffs = np.linalg.solve(toeplitz(autocov[:-1]), autocov[1:])
    return (1 - coeffs.sum()) / np.sqrt(order)

class D2Calculator:
    # Computes D2 / d_beta over sliding windows of a time series, farming
    # windows out to joblib workers. Window and step sizes are given in
    # minutes and converted to sample counts via dp_per_minute.
    def __init__(self, dp_per_minute):
        """
        Initialize the D2Calculator with a specified data points per minute.
        Args:
            dp_per_minute (int): Number of data points per minute. Used to
                convert minute-based window/step sizes into sample counts.
        """

        dp_per_minute = int(dp_per_minute)
        self.dp_per_minute = dp_per_minute

    def get_all_d2(self, window_size, step_size, data, order, d2_function=sm_d2, n_worker=-1, shuffle=False, mean=False, zscore=True, seed=None):
        """
        Calculate D2 values for a given time series data.

        Args:
            window_size (int): Size of the sliding window in minutes.
            step_size (int): Step size for the sliding window in minutes.
            data (np.ndarray): Input time series data.
            order (int): Order of the AR model.
            d2_function (function): Function to calculate D2, default is sm_d2.
                Must accept (window, order) and return a float.
            n_worker (int): Number of parallel workers, -1 for all available.
            shuffle (bool): If True, shuffle the data within each window.
            mean (bool): If True, subtract the mean from the data before processing.
            zscore (bool): If True, z-score the data before processing. (Advised)
            seed (int | None): If given, seeds NumPy's global RNG so the
                per-window shuffling is reproducible.

        Returns:
            np.ndarray: Array of D2 values calculated for each window.
        """

        if mean:
            # Center only; note that if zscore is also True the mean is
            # subtracted again below (harmless, but redundant).
            data = data - np.mean(data)
        if zscore:
            data = (data - np.mean(data)) / np.std(data)
        if seed is not None:
            np.random.seed(seed)
        # Convert minute-based sizes to sample counts.
        actual_window_size = window_size * self.dp_per_minute
        actual_step_size = step_size * self.dp_per_minute
        # Number of valid window start positions (upper bound for the range).
        n_jobs = len(data) - actual_window_size + 1
        job_data = range(0, n_jobs, actual_step_size)
        # NOTE(review): window slices (and the optional permutation) are
        # evaluated in the parent process as joblib consumes the generator;
        # confirm if relying on `seed` for exact reproducibility.
        results = Parallel(n_jobs=n_worker)(
            delayed(d2_function)(
                (
                    data[start:start + actual_window_size].astype(np.float32)
                    if not shuffle
                    else np.random.permutation(data[start:start + actual_window_size]).astype(np.float32)
                ),
                order
            ) for start in job_data
        )
        return np.array(results)

    def get_all_dbeta(self, window_size, step_size, data, order, beta=2, dbeta_function=sm_dbeta, n_worker=-1, shuffle=False, mean=False, zscore=True, seed=None):
        """
        Calculate D beta values for a given time series data.

        Args:
            window_size (int): Size of the sliding window in minutes.
            step_size (int): Step size for the sliding window in minutes.
            data (np.ndarray): Input time series data.
            order (int): Order of the AR model.
            beta (int): Beta value for the fixed point (must be even, >= 2).
            dbeta_function (function): Function to calculate D beta, default is sm_dbeta.
                Must accept (window, order, beta) and return a float.
            n_worker (int): Number of parallel workers, -1 for all available.
            shuffle (bool): If True, shuffle the data within each window.
            mean (bool): If True, subtract the mean from the data before processing.
            zscore (bool): If True, z-score the data before processing. (Advised)
            seed (int | None): If given, seeds NumPy's global RNG so the
                per-window shuffling is reproducible.

        Returns:
            np.ndarray: Array of D beta values calculated for each window.

        Raises:
            ValueError: If beta is not an even integer >= 2.
        """
        if mean:
            # Center only; redundant with zscore below when both are True.
            data = data - np.mean(data)
        if zscore:
            data = (data - np.mean(data)) / np.std(data)
        if beta % 2 != 0 or beta < 2:
            raise ValueError("beta must be an even integer >= 2.")
        if seed is not None:
            np.random.seed(seed)
        # Convert minute-based sizes to sample counts.
        actual_window_size = window_size * self.dp_per_minute
        actual_step_size = step_size * self.dp_per_minute
        # Number of valid window start positions (upper bound for the range).
        n_jobs = len(data) - actual_window_size + 1
        job_data = range(0, n_jobs, actual_step_size)
        results = Parallel(n_jobs=n_worker)(
            delayed(dbeta_function)(
                (
                    data[start:start + actual_window_size].astype(np.float32)
                    if not shuffle
                    else np.random.permutation(data[start:start + actual_window_size]).astype(np.float32)
                ),
                order,
                beta
            ) for start in job_data
        )
        return np.array(results)

Example Usage

import d2
from test_d2_crt import calc_d2v1_loc

d2_calculator = d2.D2Calculator(dp_per_minute=6000)
_ = d2_calculator.get_all_dbeta(
    window_size=15, step_size=1, data=first_row, order=i, beta=2, zscore=True
)
_ = d2_calculator.get_all_d2(
    window_size=15, step_size=1, data=first_row, order=i, d2_function=d2.sm_d2
)
_ = d2_calculator.get_all_d2(
    window_size=15, step_size=1, data=first_row, order=i, d2_function=calc_d2v1_loc
)

Result

Note: Each value was calculated from 5 minutes of data, with 6000 data points per minute and an order of 10.

result