Back to Community
Hurst Exponent

Following is a generic implementation of the Hurst Exponent. Hope this would be helpful. Also please share application of it on live trading.

Loading notebook preview...
Notebook previews are currently unavailable.
1 response

This is an initial attempt to use it.

Clone Algorithm
11
Loading...
Backtest from to with initial capital
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
import numpy as np
import pandas as pd
import numbers
import statsmodels.api as sm
from  scipy.spatial.distance import euclidean
from scipy.special import erf
    
# adopted from https://www.mathworks.com/matlabcentral/fileexchange/36487-weighted-generalized-hurst-exponent
def genhurstw(S, q = 1.0, delta = np.inf, maxT = 20, minT = 5, ddim = 0):
    import numpy as np
    import pandas as pd
    import numbers

    q        = float(q) if isinstance(q, numbers.Integral) else q
    delta    = float(delta) if isinstance(delta, numbers.Integral) else delta

    S        = np.array(S) if isinstance(S, list) else (S.values if isinstance(S, (pd.DataFrame, pd.Series, pd.Panel, pd.Panel4D)) else S)
    q        = np.array(q) if isinstance(q, (list, numbers.Number)) else (q.values.copy() if isinstance(q, (pd.DataFrame, pd.Series, pd.Panel, pd.Panel4D)) else q)
    delta    = np.array(delta) if isinstance(delta, (list, numbers.Number)) else (delta.values.copy() if isinstance(delta, (pd.DataFrame, pd.Series, pd.Panel, pd.Panel4D)) else delta)

    lpidxd   = S.shape[:ddim]
    rpidxd   = S.shape[(ddim + 1):]
    lpidxi   = (slice(None, None, None),) * ddim
    rpidxi   = (slice(None, None, None),) * (S.ndim - ddim - 1)
    rs1d     = ((1,) * ddim) + (-1,) + ((1,) * (S.ndim - ddim - 1))

    q        = q.reshape(-1).reshape(rs1d)
    delta    = delta.reshape(-1).reshape(rs1d)

    L        = S.shape[ddim]
    lq       = q.shape[ddim]
    deltas   = delta.shape[ddim]
    H        = np.empty(lpidxd + (maxT - minT + 1, lq, deltas) + rpidxd, dtype = float)

    cc       = np.empty(lpidxd + (2,) + rpidxd, dtype = float)

    k        = 0
    for Tmax in range(minT, maxT + 1):
        x = np.arange(1, Tmax + 1, dtype = float).reshape(rs1d)
        mcord = np.empty(lpidxd + (Tmax, lq, deltas) + rpidxd, dtype = float)

        for tt in range(1, Tmax + 1):
            dV    = S[lpidxi + (slice(tt, L, tt),) + rpidxi] - S[lpidxi + (slice(0, L - tt, tt),) + rpidxi]
            VV    = S[lpidxi + (slice(0, L, tt),) + rpidxi]

            N     = float(dV.shape[ddim]) + 1.0
            Np1   = N + 1.0

            X     = np.arange(1.0, Np1, dtype = float).reshape(rs1d)
            Y     = VV


            mx    = np.sum(X, axis = ddim) / N
            SSxx  = np.sum(X ** 2, axis = ddim) - N * (mx ** 2)

            my    = np.sum(Y, axis = ddim) / N
            SSxy  = np.sum(X * Y, axis = ddim) - N * mx * my


            cc[lpidxi + (0,) + rpidxi] = SSxy/SSxx
            cc[lpidxi + (1,) + rpidxi] = my - cc[lpidxi + (0,) + rpidxi] * mx

            ddVd  = dV - cc[lpidxi + (0,) + rpidxi]
            VVVd  = VV - cc[lpidxi + (0,) + rpidxi] * X - cc[lpidxi + (1,) + rpidxi] # X = 1:N

            for d in range(0, deltas):
                cdelta  = delta[lpidxi + (d,) + rpidxi]

                if np.isfinite(cdelta):
                    # extra coputation but this gives the same results as below for Inf values

                    aph = 1.0 / cdelta

                    r   = np.arange(0, Np1, dtype = float).reshape(rs1d)
                    w   = np.exp(-aph * r[lpidxi + (slice(0, -2),) + rpidxi]) * (1.0 - np.exp(-aph)) / (1.0 - np.exp(-aph * (N - 1)))
                    w1  = np.exp(-aph * r[lpidxi + (slice(0, -1),) + rpidxi]) * (1.0 - np.exp(-aph)) / (1.0 - np.exp(-aph * N))

                    for qq in range(0, lq):
                        mcord[lpidxi + (tt - 1, qq, d) + rpidxi] = np.sum(w * (np.abs(ddVd) ** q[lpidxi + (qq,) + rpidxi]), axis = ddim) / np.sum(w1 * (np.abs(VVVd) ** q[lpidxi + (qq,) + rpidxi]), axis = ddim)
                else:
                    for qq in range(0, lq):
                        mcord[lpidxi + (tt - 1, qq, d) + rpidxi] = np.mean(np.abs(ddVd) ** q[lpidxi + (qq,) + rpidxi], axis = ddim) / np.mean(np.abs(VVVd) ** q[lpidxi + (qq,) + rpidxi], axis = ddim)

        mx   = np.mean(np.log10(x), axis = ddim)
        SSxx = np.sum(np.log10(x) ** 2, axis = ddim) - Tmax * (mx ** 2)
        for d in range(0, deltas):
            for qq in range(0, lq):
                my   = np.mean(np.log10(mcord[lpidxi + (slice(None), qq, d) + rpidxi]), axis = ddim)
                SSxy = np.sum(np.log10(x) * np.log10(mcord[lpidxi + (slice(None), qq, d) + rpidxi]), axis = ddim)  - Tmax * mx * my
                H[lpidxi + (k, qq, d) + rpidxi] = SSxy/SSxx

        k = k + 1

    mH = np.empty(lpidxd + (lq,) + rpidxd + (deltas,), dtype = float)
    sH = np.empty(lpidxd + (lq,) + rpidxd + (deltas,), dtype = float)
    
    for d in range(0, deltas):
        for qq in range(0, lq):
            mH[lpidxi + (qq,) + rpidxi + (d,)] = np.mean(H[lpidxi + (slice(None), qq, d) + rpidxi], axis = ddim) / q[lpidxi + (qq,) + rpidxi]
            sH[lpidxi + (qq,) + rpidxi + (d,)] = np.std(H[lpidxi + (slice(None), qq, d) + rpidxi], axis = ddim) / q[lpidxi + (qq,) + rpidxi]

    return mH, sH
    
def initialize(context):
    context.assets = symbols('SPY', 'XLP', 'XLE', 'XLK', 'XLY', 'XLU', 'XLB', 'XLF', 'XLV','XLI', 'MDY', 'EWQ', 'EWA', 'EWC', 'EWG', 'EWH', 'EWJ', 'EWW', 'EWS', 'EWU', 'EZU', 'EFA')
    context.numstks = float(len(context.assets))
    
    context.w       = pd.Series(data = 0.0, index = context.assets)
    
    context.z       = pd.Series(data = 0.0, index = context.assets)
    
    context.sig     = pd.Series(data = 0.0, index = context.assets)
    
    context.lngth   = 252.0
    
    schedule_function(func = trade, date_rule = date_rules.every_day(), time_rule = time_rules.market_open(hours = 0, minutes = 1), half_days = True)
    
def before_trading_start(context, data):
    p = data.history(context.assets, "price", int(context.lngth) + 1, "1d")
    p.fillna(method = 'ffill', inplace = True)
    p.fillna(method = 'bfill', inplace = True)
    
    lnp = np.log(p)
    r   = (lnp - lnp.shift(1)).iloc[1:, :]
    p   = p.iloc[1:, :]
    
    mH, sH = genhurstw(p, delta = 22.0)
    
    nm = erf((mH - 0.5) / sH).squeeze()
    
    z = erf((p - p.mean()) / p.std(axis = 0))
    zmid = z.quantile(q = 0.5, axis = 0)
    zl = np.fmin(z.quantile(q = 0.25, axis = 0).abs(), z.quantile(q = 0.75, axis = 0).abs())
    
    z = z.iloc[-1, :]
    
    sig = ((context.z < -zl) & (z >= -zl)).astype(float) - ((context.z > zl) & (z <= zl)).astype(float)
    
    context.z.iloc[:] = z
    
    context.sig = (sig == 0.0).astype(float) * context.sig + sig
    
    w = context.sig * nm / context.numstks
    
    w = (w == 0.0).astype(float) * np.sign(z - zmid) * nm / context.numstks + w
    
    context.w.iloc[:] = w
    
def trade(context, data):
    cantrade = data.can_trade(context.assets)
    for i in context.assets:
        oo = get_open_orders(i)

        if len(oo) == 0 and cantrade.loc[i]:
            w = context.portfolio.positions[i].amount * context.portfolio.positions[i].last_sale_price / context.portfolio.portfolio_value
            if np.abs(w - context.w.loc[i]) > 0.02:
                order_target_percent(i, context.w.loc[i])
    
def handle_data(context,data):
    pass
There was a runtime error.