Neural Network that tests for mean-reversion or momentum trending

This is a simple neural network, much of which is taken from here

It uses the Hurst exponent and the Sharpe ratio as inputs and trains for a small number of days before actually using stock data. If the output of the neural network is less than 0.2, there's hope that the series is mean-reverting; if it's greater than 0.8, there's hope that it's momentum trending.
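As a quick, standalone sanity check on the Hurst input (a rough sketch assuming plain numpy, separate from the algorithm itself): the same lagged-difference estimate used in the hurst() function below should come out near 0.5 for a random walk and near 0 for white noise, which is strongly mean-reverting.

import numpy

def hurst_check(p):
    # same lagged-difference Hurst estimate as the hurst() function in the listing below
    lags = range(2, 20)
    tau = [numpy.sqrt(numpy.std(numpy.subtract(p[lag:], p[:-lag]))) for lag in lags]
    m = numpy.polyfit(numpy.log10(list(lags)), numpy.log10(tau), 1)
    return m[0] * 2.0

numpy.random.seed(0)
walk = numpy.cumsum(numpy.random.normal(0, 1, 2000))   # random walk, expect H near 0.5
noise = numpy.random.normal(0, 1, 2000)                 # white noise, expect H near 0
print(hurst_check(walk), hurst_check(noise))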

Things to note:

  • From my testing, it doesn't seem to detect momentum-trending behavior when run on actual S&P data
  • There is a bias toward the S&P, since it has been on the rise for the last three years
  • After a while, the output of the neural network seems to converge to -1 and stay there, so limiting the number of training days would be a good idea.

The algorithm definitely needs a lot of work so please feel free to play around with it and post back your suggestions.

Edit: The output seems to converge to -1 regardless of training depth; if anyone has some input on this, please feel free to contribute!

Much credit goes to Tom Starke
-Seong

import numpy
import random

'''
    This part is exclusively for the data simulation/learning part
'''

class FeedForwardNetwork:
      
    def __init__(self, nIn, nHidden, nOut, hw = [], ow = []):
        # learning rate
        self.alpha = 0.1
                                                  
        # number of neurons in each layer
        self.nIn = nIn
        self.nHidden = nHidden
        self.nOut = nOut
 
        if not hw == [] and not ow == []:
            # initialize weights with previous data
            self.hWeights = hw 
            self.oWeights = ow
         
        else: 
            # initialize weights randomly (+1 for bias)
            self.hWeights = numpy.random.random((self.nHidden, self.nIn+1)) 
            self.oWeights = numpy.random.random((self.nOut, self.nHidden+1))
          
        # activations of neurons (sum of inputs)
        self.hActivation = numpy.zeros((self.nHidden, 1), dtype=float)
        self.oActivation = numpy.zeros((self.nOut, 1), dtype=float)
          
        # outputs of neurons (after the tanh activation)
        self.iOutput = numpy.zeros((self.nIn+1, nOut), dtype=float)      # +1 for bias
        self.hOutput = numpy.zeros((self.nHidden+1, nOut), dtype=float)  # +1 for bias
        self.oOutput = numpy.zeros((self.nOut), dtype=float)
          
        # deltas for hidden and output layer
        self.hDelta = numpy.zeros((self.nHidden), dtype=float)
        self.oDelta = numpy.zeros((self.nOut), dtype=float)   
      
    def forward(self, input_node):
        # set input as output of first layer (bias neuron = 1.0)
        self.iOutput[:-1, 0] = input_node
        self.iOutput[-1:, 0] = 1.0
          
        # hidden layer
        self.hActivation = numpy.dot(self.hWeights, self.iOutput)
        self.hOutput[:-1, :] = numpy.tanh(self.hActivation)
          
        # set bias neuron in hidden layer to 1.0
        self.hOutput[-1:, :] = 1.0
          
        # output layer
        self.oActivation = numpy.dot(self.oWeights, self.hOutput)
        self.oOutput = numpy.tanh(self.oActivation)
      
    def backward(self, teach):
        error = self.oOutput - numpy.array(teach, dtype=float) 
          
        # deltas of output neurons
        self.oDelta = (1 - numpy.tanh(self.oActivation)) * numpy.tanh(self.oActivation) * error
                  
        # deltas of hidden neurons
        self.hDelta = (1 - numpy.tanh(self.hActivation)) * numpy.tanh(self.hActivation) * numpy.dot(numpy.transpose(self.oWeights[:,:-1]), self.oDelta)
                  
        # apply weight changes
#        print self.hWeights, self.hDelta, self.iOutput.transpose()
        self.hWeights = self.hWeights - self.alpha * numpy.dot(self.hDelta, numpy.transpose(self.iOutput)) 
        self.oWeights = self.oWeights - self.alpha * numpy.dot(self.oDelta, numpy.transpose(self.hOutput))
    def getOutput(self):
        return self.oOutput
    
def hurst(p):
    tau = []; lagvec = []
    #  Step through the different lags
    for lag in range(2,20):
        #  produce price difference with lag
        pp = numpy.subtract(p[lag:],p[:-lag])
        #  Write the different lags into a vector
        lagvec.append(lag)
        #  Calculate the dispersion (square root of the std) of the difference vector
        tau.append(numpy.sqrt(numpy.std(pp)))
    #  linear fit to double-log graph (gives power)
    m = numpy.polyfit(numpy.log10(lagvec),numpy.log10(tau),1)
    # calculate hurst
    hurst = m[0]*2
    return hurst
 
def sharpe(series):
    ret = numpy.divide(numpy.diff(series),series[:-1])
    return(numpy.mean(ret)/numpy.std(ret))
    
def simulate_coint(d, n, mu, sigma, start_point_X, start_point_Y):
#  This becomes a random walk if d = 0
    X = numpy.zeros(n)
    Y = numpy.zeros(n)
    #  These are the starting points of the random walk in y
    #  Be aware that X and Y are NOT coordinates but different series
    X[0] = start_point_X
    Y[0] = start_point_Y
    for t in range(1,n):
        #  Drunk and his dog cointegration equations
        X[t] = X[t-1] + random.gauss(mu,sigma);
        Y[t] = d*(X[t-1] - Y[t-1]) + Y[t-1] + random.gauss(mu,sigma);
    return X,Y,X - Y
 
def simulate_momentum_data(n,offset,sigma):
#  This becomes a random walk if offset is 0
    # produce the trending time series
    return numpy.cumsum([random.gauss(offset,sigma) for i in range(n)])

 
def teach():
    k = random.randint(0, 2)
    if k == 0:
        dummy, dummy, F = simulate_coint(0.3, 1000, 0, 0.5, 0.0, 0.0)
    elif k == 1:
        F = simulate_momentum_data(1000,0.1,0.9)
    elif k == 2:
        F = simulate_momentum_data(1000,0,0.9)
    return k, sharpe(F[1:]), hurst(F[1:])

''' 
    Data simulation ends here and calculation of hurst and sharpe begin here
'''
# Setting up the constant window for data length
constant_window = 40

def initialize(context):
    context.spy = sid(8554)
    context.days_traded = 0
    context.past_prices = []

    
    context.hw = []
    context.ow = []
    context.ffn = FeedForwardNetwork(2,8,1, context.hw, context.ow)
    
def handle_data(context,data):
    # Over time, the score is decreasing which should be the opposite
    # What I'm thinking is that handle_data acts like the while loop

    gather_prices(context, data, context.spy)

    ''' 
        Network learning phase while context.days_traded < constant_window * 0.5 (20 days here)
    '''
    if (context.days_traded < constant_window*.50):
        context.ffn = FeedForwardNetwork(2,8,1, context.hw, context.ow)
        true_count = 1
        untrue_count = 1
        uncertain_count = 1
        count = 0
        context.days_traded += 1
        while(count < 30):
            
            regime_desc, sharpe, hurst = teach()
            context.ffn.forward([sharpe, hurst])
            context.ffn.backward(regime_desc)
            
            f_output = context.ffn.getOutput()[0]
            
            if f_output >= 0.8 and regime_desc == 1: # Momentum data
                true_count +=1
            elif f_output <=0.2 and regime_desc == 0: # Mean Reverting
                true_count += 1
            elif f_output >=0.8 and regime_desc == 0 or f_output <0.2 and regime_desc==1:
                untrue_count += 1
            else:
                uncertain_count +=1
            total = float(uncertain_count) + float(true_count) + float(untrue_count)
            context.hw = context.ffn.hWeights
            context.ow = context.ffn.oWeights
            count +=1
    else:
        # As soon as the training wheels come off, it should be calibrated enough to launch
        regime_desc, sharpe, hurst = teach()
        context.ffn.forward([sharpe, hurst])
        # Setting the correction at 0 because the training has already been done and we don't have a way to measure the corrections after this
        context.ffn.backward(0)

        f_output = context.ffn.getOutput()[0]
        h_output = hurst_r(context,data)

        momentum = True if (f_output > 0.8 and h_output > 0.5) else False

        reverting = True if (f_output < 0.2 and h_output < 0.5) else False

        avg_price = data[context.spy].mavg(30)
        
        if data[context.spy].price < avg_price*.95:
            if momentum:
                order(context.spy, -10000)
                log.debug("Selling spy -10000 momentum")
            elif reverting:
                order(context.spy, 10000)
                log.debug("Ordering spy +10000 reverting") 
            else:
                return
        elif data[context.spy].price > avg_price*1.05:
            if momentum:
                order(context.spy, 10000)
                log.debug("Ordering spy +10000 momentum")
            elif reverting:
                order(context.spy, -10000)
                log.debug("Selling spy -10000 reverting")
            else:
                return
        
        

        ''' 
            If the Hurst is less than 0.5, the series tends to move in the opposite direction from before (anti-persistent);
            if the Hurst is greater than 0.5, it tends to keep moving in the same direction as before (persistent).
            The open question is how to determine the direction of the previous move.
        '''

def gather_prices(context, data, sid):
    context.past_prices.append(data[sid].price)
    if len(context.past_prices) > constant_window:
        context.past_prices.pop(0)
    return 

'''
    Both hurst_r and sharpe_r only return values once the price history length reaches
    0.80 * constant_window, which is 32 days here
'''

def hurst_r(context, data):
    # Checks whether data exists
    if len(context.past_prices) < constant_window*.80:
        return
    
    tau, lagvec = [], []
    # Step through the different lags
    for lag in range(2,20):  
        # Produce the price difference with lag
        pp = numpy.subtract(context.past_prices[lag:],context.past_prices[:-lag])
        # Write the different lags into a vector
        lagvec.append(lag)
        # Calculate the dispersion (square root of the std) of the difference vector
        tau.append(numpy.sqrt(numpy.std(pp)))
    # Linear fit to a double-log graph to get power
    m = numpy.polyfit(numpy.log10(lagvec),numpy.log10(tau),1)
    # Calculate hurst
    hurst = m[0]*2
    
    return hurst

def sharpe_r(context, data):
    # This sharpe takes the sharpe at the END of the period and not on a rolling basis
    if len(context.past_prices) < int(constant_window*.80):
        return
    returns = numpy.divide(numpy.diff(context.past_prices), context.past_prices[:-1])
    mean = numpy.mean(returns)
    std = numpy.std(returns)
    sharpe = mean/std
    # Sharpe * sqrt(number of periods in a year)
    return sharpe*numpy.sqrt(4)

12 responses

Hi Seong,
I haven't had time to look over your algo yet but the returns seem to be way too high. This post here points out what the problem might be.

Tom

Limited the leverage a bit and the returns have cooled down a lot. Definitely still looking for a better way to check the validity of the output, so I'm open to suggestions!

-Seong

import numpy
import random

'''
    This part is exclusively for the data simulation/learning part
'''

class FeedForwardNetwork:
      
    def __init__(self, nIn, nHidden, nOut, hw = [], ow = []):
        # learning rate
        self.alpha = 0.1
                                                  
        # number of neurons in each layer
        self.nIn = nIn
        self.nHidden = nHidden
        self.nOut = nOut
 
        if not hw == [] and not ow == []:
            # initialize weights with previous data
            self.hWeights = hw 
            self.oWeights = ow
         
        else: 
            # initialize weights randomly (+1 for bias)
            self.hWeights = numpy.random.random((self.nHidden, self.nIn+1)) 
            self.oWeights = numpy.random.random((self.nOut, self.nHidden+1))
          
        # activations of neurons (sum of inputs)
        self.hActivation = numpy.zeros((self.nHidden, 1), dtype=float)
        self.oActivation = numpy.zeros((self.nOut, 1), dtype=float)
          
        # outputs of neurons (after the tanh activation)
        self.iOutput = numpy.zeros((self.nIn+1, nOut), dtype=float)      # +1 for bias
        self.hOutput = numpy.zeros((self.nHidden+1, nOut), dtype=float)  # +1 for bias
        self.oOutput = numpy.zeros((self.nOut), dtype=float)
          
        # deltas for hidden and output layer
        self.hDelta = numpy.zeros((self.nHidden), dtype=float)
        self.oDelta = numpy.zeros((self.nOut), dtype=float)   
      
    def forward(self, input_node):
        # set input as output of first layer (bias neuron = 1.0)
        self.iOutput[:-1, 0] = input_node
        self.iOutput[-1:, 0] = 1.0
          
        # hidden layer
        self.hActivation = numpy.dot(self.hWeights, self.iOutput)
        self.hOutput[:-1, :] = numpy.tanh(self.hActivation)
          
        # set bias neuron in hidden layer to 1.0
        self.hOutput[-1:, :] = 1.0
          
        # output layer
        self.oActivation = numpy.dot(self.oWeights, self.hOutput)
        self.oOutput = numpy.tanh(self.oActivation)
      
    def backward(self, teach):
        error = self.oOutput - numpy.array(teach, dtype=float) 
          
        # deltas of output neurons
        self.oDelta = (1 - numpy.tanh(self.oActivation)) * numpy.tanh(self.oActivation) * error
                  
        # deltas of hidden neurons
        self.hDelta = (1 - numpy.tanh(self.hActivation)) * numpy.tanh(self.hActivation) * numpy.dot(numpy.transpose(self.oWeights[:,:-1]), self.oDelta)
                  
        # apply weight changes
#        print self.hWeights, self.hDelta, self.iOutput.transpose()
        self.hWeights = self.hWeights - self.alpha * numpy.dot(self.hDelta, numpy.transpose(self.iOutput)) 
        self.oWeights = self.oWeights - self.alpha * numpy.dot(self.oDelta, numpy.transpose(self.hOutput))
    def getOutput(self):
        return self.oOutput
    
def hurst(p):
    tau = []; lagvec = []
    #  Step through the different lags
    for lag in range(2,20):
        #  produce price difference with lag
        pp = numpy.subtract(p[lag:],p[:-lag])
        #  Write the different lags into a vector
        lagvec.append(lag)
        #  Calculate the dispersion (square root of the std) of the difference vector
        tau.append(numpy.sqrt(numpy.std(pp)))
    #  linear fit to double-log graph (gives power)
    m = numpy.polyfit(numpy.log10(lagvec),numpy.log10(tau),1)
    # calculate hurst
    hurst = m[0]*2
    return hurst
 
def sharpe(series):
    ret = numpy.divide(numpy.diff(series),series[:-1])
    return(numpy.mean(ret)/numpy.std(ret))
    
def simulate_coint(d, n, mu, sigma, start_point_X, start_point_Y):
#  This becomes a random walk if d = 0
    X = numpy.zeros(n)
    Y = numpy.zeros(n)
    #  These are the starting points of the random walk in y
    #  Be aware that X and Y are NOT coordinates but different series
    X[0] = start_point_X
    Y[0] = start_point_Y
    for t in range(1,n):
        #  Drunk and his dog cointegration equations
        X[t] = X[t-1] + random.gauss(mu,sigma);
        Y[t] = d*(X[t-1] - Y[t-1]) + Y[t-1] + random.gauss(mu,sigma);
    return X,Y,X - Y
 
def simulate_momentum_data(n,offset,sigma):
#  This becomes a random walk if offset is 0
    # produce the trending time series
    return numpy.cumsum([random.gauss(offset,sigma) for i in range(n)])

 
def teach():
    k = random.randint(0, 2)
    if k == 0:
        dummy, dummy, F = simulate_coint(0.3, 1000, 0, 0.5, 0.0, 0.0)
    elif k == 1:
        F = simulate_momentum_data(1000,0.1,0.9)
    elif k == 2:
        F = simulate_momentum_data(1000,0,0.9)
    return k, sharpe(F[1:]), hurst(F[1:])

''' 
    Data simulation ends here and calculation of hurst and sharpe begin here
'''
# Setting up the constant window for data length
constant_window = 40

def initialize(context):
    context.spy = sid(8554)
    context.days_traded = 0
    context.past_prices = []

    
    context.hw = []
    context.ow = []
    context.ffn = FeedForwardNetwork(2,8,1, context.hw, context.ow)

    context.min_notional = 0
    context.max_notional = 1000000
    
def handle_data(context,data):
    notional = context.portfolio.positions_value
    gather_prices(context, data, context.spy)

    ''' 
        Network learning phase while context.days_traded < constant_window * 0.5 (20 days here)
    '''
    if (context.days_traded < constant_window*.50):
        context.ffn = FeedForwardNetwork(2,8,1, context.hw, context.ow)
        true_count = 1
        untrue_count = 1
        uncertain_count = 1
        count = 0
        context.days_traded += 1
        while(count < 30):
            
            regime_desc, sharpe, hurst = teach()
            context.ffn.forward([sharpe, hurst])
            context.ffn.backward(regime_desc)
            
            f_output = context.ffn.getOutput()[0]
            
            if f_output >= 0.8 and regime_desc == 1: # Momentum data
                true_count +=1
            elif f_output <=0.2 and regime_desc == 0: # Mean Reverting
                true_count += 1
            elif f_output >=0.8 and regime_desc == 0 or f_output <0.2 and regime_desc==1:
                untrue_count += 1
            else:
                uncertain_count +=1
            total = float(uncertain_count) + float(true_count) + float(untrue_count)
            context.hw = context.ffn.hWeights
            context.ow = context.ffn.oWeights
            count +=1
    else:
        # As soon as the training wheels come off, it should be calibrated enough to launch
        regime_desc, sharpe, hurst = teach()
        context.ffn.forward([sharpe, hurst])
        # Setting the correction at 0 because the training has already been done and we don't have a way to measure the corrections after this
        context.ffn.backward(0)

        f_output = context.ffn.getOutput()[0]
        h_output = hurst_r(context,data)

        momentum = True if (f_output > 0.8 and h_output > 0.5) else False

        reverting = True if (f_output < 0.2 and h_output < 0.5) else False

        avg_price = data[context.spy].mavg(30)
        
        if data[context.spy].price < avg_price*.95:
            if momentum and notional > context.min_notional:
                order(context.spy, -10000)
                log.debug("Selling spy -10000 momentum")
            elif reverting and notional < context.max_notional:
                order(context.spy, 10000)
                log.debug("Ordering spy +10000 reverting") 
            else:
                return
        elif data[context.spy].price > avg_price*1.05:
            if momentum and notional < context.max_notional:
                order(context.spy, 10000)
                log.debug("Ordering spy +10000 momentum")
            elif reverting and notional > context.min_notional:
                order(context.spy, -10000)
                log.debug("Selling spy -10000 reverting")
            else:
                return
        
        

        ''' 
            If the Hurst is less than 0.5, the series tends to move in the opposite direction from before (anti-persistent);
            if the Hurst is greater than 0.5, it tends to keep moving in the same direction as before (persistent).
            The open question is how to determine the direction of the previous move.
        '''

def gather_prices(context, data, sid):
    context.past_prices.append(data[sid].price)
    if len(context.past_prices) > constant_window:
        context.past_prices.pop(0)
    return 

'''
    Both hurst_r and sharpe_r only return values once the price history length reaches
    0.80 * constant_window, which is 32 days here
'''

def hurst_r(context, data):
    # Checks whether data exists
    if len(context.past_prices) < constant_window*.80:
        return
    
    tau, lagvec = [], []
    # Step through the different lags
    for lag in range(2,20):  
        # Produce the price difference with lag
        pp = numpy.subtract(context.past_prices[lag:],context.past_prices[:-lag])
        # Write the different lags into a vector
        lagvec.append(lag)
        # Calculate the dispersion (square root of the std) of the difference vector
        tau.append(numpy.sqrt(numpy.std(pp)))
    # Linear fit to a double-log graph to get power
    m = numpy.polyfit(numpy.log10(lagvec),numpy.log10(tau),1)
    # Calculate hurst
    hurst = m[0]*2
    
    return hurst

def sharpe_r(context, data):
    # This sharpe takes the sharpe at the END of the period and not on a rolling basis
    if len(context.past_prices) < int(constant_window*.80):
        return
    returns = numpy.divide(numpy.diff(context.past_prices), context.past_prices[:-1])
    mean = numpy.mean(returns)
    std = numpy.std(returns)
    sharpe = mean/std
    # Sharpe * sqrt(number of periods in a year)
    return sharpe*numpy.sqrt(4)

Hello Seong,

Does this algo give a different result each time it is run? That is what I find, although some returns (23.8%, 70.4%) do recur.

Regards,

Peter

It is normal to get different results with neural networks. The network can be slightly different every time you train it, even with the same data.

I guess that itself (consistency of results) might be an indicator of the strength of a model.

Seong, how can you make this consistent so that it will be reliable? https://www.quantopian.com/posts/neural-network-that-tests-for-mean-reversion-or-momentum-trending

John, you could try to set the random seed with numpy.random.seed() or do a static initialization of the weights.


What line do I substitute that numpy.random.seed() on?

In initialize you would put numpy.random.seed(123). http://docs.scipy.org/doc/numpy/reference/generated/numpy.random.seed.html

I already put numpy.random.seed(123) in initialize and it's still random...
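One likely reason, judging from the listing above: the algorithm uses both Python's random module (random.gauss and random.randint in the simulators and teach()) and numpy.random (the initial weights), so seeding numpy alone still leaves the simulated training data random. A rough sketch of seeding both at the top of initialize, reusing sid and FeedForwardNetwork from the listing above:

import random
import numpy

def initialize(context):
    # seed both generators so the simulated training series and initial weights are reproducible
    random.seed(123)         # covers random.gauss / random.randint in the simulators and teach()
    numpy.random.seed(123)   # covers numpy.random.random for the initial weights
    context.spy = sid(8554)
    context.days_traded = 0
    context.past_prices = []
    context.hw = []
    context.ow = []
    context.ffn = FeedForwardNetwork(2, 8, 1, context.hw, context.ow)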

I kinda did that before. The biggest problem here is that it gets quite confusing when the ranges of the output are small, say 0.2-0.21 or 0.6-0.62. How are you going to distinguish between mean reversion and a momentum trend with parameters like these?

You use tanh as your activation function but then use the derivative of the sigmoid function for backpropagation. How is that valid?
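For what it's worth, the derivative of tanh(a) is 1 - tanh(a)**2, while the listing uses the sigmoid-style tanh(a) * (1 - tanh(a)). A rough, untested sketch of what a tanh-consistent backward method could look like, reusing the attribute names from the FeedForwardNetwork class above:

    def backward(self, teach):
        error = self.oOutput - numpy.array(teach, dtype=float)

        # deltas using the actual tanh derivative: d/da tanh(a) = 1 - tanh(a)**2
        self.oDelta = (1 - numpy.tanh(self.oActivation)**2) * error
        self.hDelta = (1 - numpy.tanh(self.hActivation)**2) * numpy.dot(numpy.transpose(self.oWeights[:, :-1]), self.oDelta)

        # gradient-descent weight updates, unchanged from the listing
        self.hWeights = self.hWeights - self.alpha * numpy.dot(self.hDelta, numpy.transpose(self.iOutput))
        self.oWeights = self.oWeights - self.alpha * numpy.dot(self.oDelta, numpy.transpose(self.hOutput))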

Why has the S&P confirmation bias been on the rise? Also, is there a way to integrate Bayes' theorem for false positives or trading constraints?
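On the Bayes' theorem idea, one possible direction (purely illustrative, not part of the algorithm above): estimate a sensitivity and false-positive rate for the momentum signal, for example from tallies like the true_count / untrue_count counters in the training loop, and turn them into a posterior probability before sizing a trade.

def p_momentum_given_signal(sensitivity, false_positive_rate, prior):
    # Bayes' rule: P(momentum | signal) =
    #   P(signal | momentum) * P(momentum) /
    #   (P(signal | momentum) * P(momentum) + P(signal | no momentum) * P(no momentum))
    numerator = sensitivity * prior
    denominator = numerator + false_positive_rate * (1.0 - prior)
    return numerator / denominator

# e.g. a signal that fires on 60% of momentum days and on 20% of other days,
# with a 30% prior on momentum, is only right about 56% of the time
print(p_momentum_given_signal(0.60, 0.20, 0.30))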