Back to Community
RSI Divergence Strategy

I'm working on a strategy that attempts to trade on bullish RSI divergence. It successfully finds divergence patterns; however, it's clearly lacking on the trade-management and trade-frequency end. Any pointers?

thanks!

Clone Algorithm
74
Loading...
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
#This algorithm attempts to enter the market at a bullish divergence in the RSI

import talib
import numpy as np
import math


def create_rsi_price_array(rsi,closelist):
    """Pair every closing price with the RSI reading of the same bar.

    rsi       -- object exposing ``tolist()`` (the RSI values)
    closelist -- object exposing ``.values.flatten()`` (the closing prices)

    Returns a list of two-element lists laid out as ``[close, rsi_value]``.
    Pairing stops at the shorter of the two inputs.
    """
    closes = list(closelist.values.flatten())
    rsi_values = rsi.tolist()

    # NOTE: each entry is [close, rsi_value] -- price first, RSI second
    return [[close, value] for value, close in zip(rsi_values, closes)]
           

 
def bullish_divergence(price_rsi,percent_baseline,low):
    """Detect a bullish RSI divergence whose second trough is the second-to-last bar.

    price_rsi        -- list of ``[price, rsi]`` pairs, oldest first
    percent_baseline -- minimum fractional RSI rebound (relative to the first
                        trough) that must occur between the two troughs
    low              -- an RSI reading must be below this to count as a trough

    Returns ``[[setup_rsi, signal_rsi], bars_between, [setup_price, signal_price]]``
    when the two most recent troughs form a divergence (higher RSI low, lower
    price low, sufficient rebound in between); otherwise returns None.
    """
    prices = [pair[0] for pair in price_rsi]
    rsi_vals = [pair[1] for pair in price_rsi]
    last = len(rsi_vals) - 1

    # A trough is an interior bar below `low` and strictly below both of its
    # neighbours.  Positions are tracked instead of values so repeated RSI
    # readings cannot be mistaken for one another.  NaN readings never compare
    # true and are therefore skipped.
    troughs = [
        i for i in range(1, last)
        if rsi_vals[i] < low
        and rsi_vals[i - 1] > rsi_vals[i]
        and rsi_vals[i + 1] > rsi_vals[i]
    ]
    if len(troughs) < 2:
        return None

    setup_idx, signal_idx = troughs[-2], troughs[-1]
    setup_rsi, signal_rsi = rsi_vals[setup_idx], rsi_vals[signal_idx]

    # The pattern must be current (latest trough on the second-to-last bar)
    # and the RSI must be gaining strength (higher low).
    if signal_idx != last - 1 or signal_rsi <= setup_rsi:
        return None
    # A rebound percentage cannot be computed from a zero base.
    if setup_rsi == 0:
        return None

    price_setup, price_signal = prices[setup_idx], prices[signal_idx]
    # Confirm the divergence: price must make a lower low.
    if not price_signal < price_setup:
        return None

    # Readings from the first trough (inclusive) up to the second (exclusive).
    delta = rsi_vals[setup_idx:signal_idx]
    for value in delta:
        # signal_rsi > setup_rsi, so exceeding signal_rsi exceeds both troughs
        if value > signal_rsi and (value - setup_rsi) / setup_rsi >= percent_baseline:
            return [[setup_rsi, signal_rsi], len(delta), [price_setup, price_signal]]
    return None

def initialize(context):
    """Populate ``context`` with the traded symbol, benchmark, sizing and stop settings."""
    # Instrument traded by the strategy; it is also registered as the benchmark
    context.stock = symbol("SPY")
    set_benchmark(context.stock)

    # Notional amount that handle_data divides by price to derive a share count
    context.max_notional = 100000

    # Trailing stop: fraction of price used for the stop level, and the
    # current stop level (starts at 0)
    context.stop_pct = 0.95
    context.stop_price = 0
    
   
def handle_data(context, data):
    """Per-bar logic: maintain the trailing stop, then buy on a bullish RSI divergence.

    Each bar the function
      1. computes the RSI over the close history of ``context.stock``,
      2. ratchets the trailing stop and exits when price falls below it,
      3. enters a position when ``bullish_divergence`` reports a sufficiently
         strong pattern and no shares are currently held.
    """
    # strategy parameters
    lookback = 200            # number of bars of history
    charttype = '1d'          # day or minute chart
    percent_baseline = .5     # RSI retracement amount
    low = 35                  # RSI low value
    RSI_timeperiod = 3
    percent_gain_amt = .90    # forwarded to set_trailing_stop
    divergence_strength = 1   # minimum RSI gain between the two troughs

    # One single-stock close series feeds both the RSI and the price/RSI
    # pairing, so the two stay aligned bar-for-bar (flattening the whole
    # history frame would interleave columns if it held several securities).
    prices = history(lookback, charttype, 'close_price')[context.stock]
    rsi = talib.RSI(prices, timeperiod=RSI_timeperiod)
    rsi_prices = create_rsi_price_array(rsi, prices)

    num_shares = math.floor(context.max_notional / data[context.stock].close_price)
    if num_shares > 0:
        set_trailing_stop(context, data, num_shares, percent_gain_amt)
    # stop hit: flatten the position and clear the stop
    if data[context.stock].price < context.stop_price:
        order_target(context.stock, 0)
        context.stop_price = 0

    currentshares = context.portfolio.positions[context.stock].amount
    divergence = bullish_divergence(rsi_prices, percent_baseline, low)

    # divergence[1] is the number of bars between the two troughs
    if divergence is not None and divergence[1] > 1 and currentshares == 0:
        troughs = divergence[0]
        trough_diff = troughs[1] - troughs[0]  # strength of the divergence
        if trough_diff > divergence_strength:
            order_target(context.stock, num_shares)
            # parenthesised single argument prints identically on Python 2 and 3
            print('buying ' + str(num_shares))

    record(price=data[context.stock].price, stop=context.stop_price)
        
def set_trailing_stop(context, data,num_shares,percent_gain_amt):
    """Raise ``context.stop_price`` to follow the price while a position is open.

    The stop only ever moves up: it becomes the larger of its current value
    and ``context.stop_pct`` times the latest price.  Nothing happens when the
    position amount is zero.  ``num_shares`` and ``percent_gain_amt`` are
    accepted but not used.
    """
    held = context.portfolio.positions[context.stock].amount
    if not held:
        return

    last_price = data[context.stock].price
    candidate = context.stop_pct * last_price
    context.stop_price = max(context.stop_price, candidate)

            
    
    
    
    
    
    
    
    
    
    
    
    
    
There was a runtime error.
3 responses

I switched it to use the order_target_percent method to make sure it gets fully invested. That should allow you to simplify a lot of the code in your algo. I would also try this on minute data to get a better idea of how it might perform in a live scenario.

Clone Algorithm
21
Loading...
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
#This algorithm attempts to enter the market at a bullish divergence in the RSI

import talib
import numpy as np
import math


def create_rsi_price_array(rsi,closelist):
    """Combine closing prices and RSI readings into per-bar pairs.

    rsi       -- object exposing ``tolist()`` (the RSI values)
    closelist -- object exposing ``.values.flatten()`` (the closing prices)

    Returns a list of ``[close, rsi_value]`` lists; the result is as long as
    the shorter of the two inputs.
    """
    rsi_values = rsi.tolist()
    closes = list(closelist.values.flatten())

    paired = []
    for value, close in zip(rsi_values, closes):
        # price first, RSI second
        paired.append([close, value])
    return paired
           

 
def bullish_divergence(price_rsi,percent_baseline,low):
    """Detect a bullish RSI divergence whose second trough is the second-to-last bar.

    price_rsi        -- list of ``[price, rsi]`` pairs, oldest first
    percent_baseline -- minimum fractional RSI rebound (relative to the first
                        trough) that must occur between the two troughs
    low              -- an RSI reading must be below this to count as a trough

    Returns ``[[setup_rsi, signal_rsi], bars_between, [setup_price, signal_price]]``
    when the two most recent troughs form a divergence (higher RSI low, lower
    price low, sufficient rebound in between); otherwise returns None.
    """
    prices = [pair[0] for pair in price_rsi]
    rsi_vals = [pair[1] for pair in price_rsi]
    last = len(rsi_vals) - 1

    # A trough is an interior bar below `low` and strictly below both of its
    # neighbours.  Positions are tracked instead of values so repeated RSI
    # readings cannot be mistaken for one another.  NaN readings never compare
    # true and are therefore skipped.
    troughs = [
        i for i in range(1, last)
        if rsi_vals[i] < low
        and rsi_vals[i - 1] > rsi_vals[i]
        and rsi_vals[i + 1] > rsi_vals[i]
    ]
    if len(troughs) < 2:
        return None

    setup_idx, signal_idx = troughs[-2], troughs[-1]
    setup_rsi, signal_rsi = rsi_vals[setup_idx], rsi_vals[signal_idx]

    # The pattern must be current (latest trough on the second-to-last bar)
    # and the RSI must be gaining strength (higher low).
    if signal_idx != last - 1 or signal_rsi <= setup_rsi:
        return None
    # A rebound percentage cannot be computed from a zero base.
    if setup_rsi == 0:
        return None

    price_setup, price_signal = prices[setup_idx], prices[signal_idx]
    # Confirm the divergence: price must make a lower low.
    if not price_signal < price_setup:
        return None

    # Readings from the first trough (inclusive) up to the second (exclusive).
    delta = rsi_vals[setup_idx:signal_idx]
    for value in delta:
        # signal_rsi > setup_rsi, so exceeding signal_rsi exceeds both troughs
        if value > signal_rsi and (value - setup_rsi) / setup_rsi >= percent_baseline:
            return [[setup_rsi, signal_rsi], len(delta), [price_setup, price_signal]]
    return None

def initialize(context):
    """Populate ``context`` with the traded symbol, benchmark, sizing and stop settings."""
    # Instrument traded by the strategy; it is also registered as the benchmark
    context.stock = symbol("SPY")
    set_benchmark(context.stock)

    # Notional amount that handle_data divides by price to derive a share count
    context.max_notional = 100000

    # Trailing stop: fraction of price used for the stop level, and the
    # current stop level (starts at 0)
    context.stop_pct = 0.95
    context.stop_price = 0
    
   
def handle_data(context, data):
    """Per-bar logic: maintain the trailing stop, then go fully long on a bullish RSI divergence.

    Each bar the function
      1. computes the RSI over the close history of ``context.stock``,
      2. ratchets the trailing stop and exits when price falls below it,
      3. targets a 100% allocation when ``bullish_divergence`` reports a
         sufficiently strong pattern and no shares are currently held.
    """
    # strategy parameters
    lookback = 200            # number of bars of history
    charttype = '1d'          # day or minute chart
    percent_baseline = .5     # RSI retracement amount
    low = 35                  # RSI low value
    RSI_timeperiod = 3
    percent_gain_amt = .90    # forwarded to set_trailing_stop
    divergence_strength = 1   # minimum RSI gain between the two troughs

    # One single-stock close series feeds both the RSI and the price/RSI
    # pairing, so the two stay aligned bar-for-bar (flattening the whole
    # history frame would interleave columns if it held several securities).
    prices = history(lookback, charttype, 'close_price')[context.stock]
    rsi = talib.RSI(prices, timeperiod=RSI_timeperiod)
    rsi_prices = create_rsi_price_array(rsi, prices)

    # Share count is only used to gate the stop update and for the log line;
    # the order itself is sized by order_target_percent.
    num_shares = math.floor(context.max_notional / data[context.stock].close_price)
    if num_shares > 0:
        set_trailing_stop(context, data, num_shares, percent_gain_amt)
    # stop hit: flatten the position and clear the stop
    if data[context.stock].price < context.stop_price:
        order_target(context.stock, 0)
        context.stop_price = 0

    currentshares = context.portfolio.positions[context.stock].amount
    divergence = bullish_divergence(rsi_prices, percent_baseline, low)

    # divergence[1] is the number of bars between the two troughs
    if divergence is not None and divergence[1] > 1 and currentshares == 0:
        troughs = divergence[0]
        trough_diff = troughs[1] - troughs[0]  # strength of the divergence
        if trough_diff > divergence_strength:
            order_target_percent(context.stock, 1.0)
            # parenthesised single argument prints identically on Python 2 and 3
            print('buying ' + str(num_shares))

    record(price=data[context.stock].price, stop=context.stop_price)
        
def set_trailing_stop(context, data,num_shares,percent_gain_amt):
    """Raise ``context.stop_price`` to follow the price while a position is open.

    The stop only ever moves up: it becomes the larger of its current value
    and ``context.stop_pct`` times the latest price.  Nothing happens when the
    position amount is zero.  ``num_shares`` and ``percent_gain_amt`` are
    accepted but not used.
    """
    held = context.portfolio.positions[context.stock].amount
    if not held:
        return

    last_price = data[context.stock].price
    candidate = context.stop_pct * last_price
    context.stop_price = max(context.stop_price, candidate)

            
    
    
    
    
    
    
    
    
    
    
    
    
    
There was a runtime error.

Thanks David,
Looks like there may be some viability to it. I cloned your code and modified a couple of the variables. I'm going to try it intraday and add bearish divergence to play the downside as well.

Clone Algorithm
213
Loading...
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
#This algorithm attempts to enter the market at a bullish divergence in the RSI

import talib
import numpy as np
import math


def create_rsi_price_array(rsi,closelist):
    """Zip closing prices with RSI readings, one pair per bar.

    rsi       -- object exposing ``tolist()`` (the RSI values)
    closelist -- object exposing ``.values.flatten()`` (the closing prices)

    Returns ``[[close, rsi_value], ...]``; pairing stops at the shorter input.
    """
    closes = list(closelist.values.flatten())
    readings = rsi.tolist()

    # element layout is [close, rsi_value] -- price first, RSI second
    return [[closes[i], readings[i]] for i in range(min(len(readings), len(closes)))]
           

 
def bullish_divergence(price_rsi,percent_baseline,low):
    """Detect a bullish RSI divergence whose second trough is the second-to-last bar.

    price_rsi        -- list of ``[price, rsi]`` pairs, oldest first
    percent_baseline -- minimum fractional RSI rebound (relative to the first
                        trough) that must occur between the two troughs
    low              -- an RSI reading must be below this to count as a trough

    Returns ``[[setup_rsi, signal_rsi], bars_between, [setup_price, signal_price]]``
    when the two most recent troughs form a divergence (higher RSI low, lower
    price low, sufficient rebound in between); otherwise returns None.
    """
    prices = [pair[0] for pair in price_rsi]
    rsi_vals = [pair[1] for pair in price_rsi]
    last = len(rsi_vals) - 1

    # A trough is an interior bar below `low` and strictly below both of its
    # neighbours.  Positions are tracked instead of values so repeated RSI
    # readings cannot be mistaken for one another.  NaN readings never compare
    # true and are therefore skipped.
    troughs = [
        i for i in range(1, last)
        if rsi_vals[i] < low
        and rsi_vals[i - 1] > rsi_vals[i]
        and rsi_vals[i + 1] > rsi_vals[i]
    ]
    if len(troughs) < 2:
        return None

    setup_idx, signal_idx = troughs[-2], troughs[-1]
    setup_rsi, signal_rsi = rsi_vals[setup_idx], rsi_vals[signal_idx]

    # The pattern must be current (latest trough on the second-to-last bar)
    # and the RSI must be gaining strength (higher low).
    if signal_idx != last - 1 or signal_rsi <= setup_rsi:
        return None
    # A rebound percentage cannot be computed from a zero base.
    if setup_rsi == 0:
        return None

    price_setup, price_signal = prices[setup_idx], prices[signal_idx]
    # Confirm the divergence: price must make a lower low.
    if not price_signal < price_setup:
        return None

    # Readings from the first trough (inclusive) up to the second (exclusive).
    delta = rsi_vals[setup_idx:signal_idx]
    for value in delta:
        # signal_rsi > setup_rsi, so exceeding signal_rsi exceeds both troughs
        if value > signal_rsi and (value - setup_rsi) / setup_rsi >= percent_baseline:
            return [[setup_rsi, signal_rsi], len(delta), [price_setup, price_signal]]
    return None

def initialize(context):
    """Populate ``context`` with the traded symbol, benchmark, sizing and stop settings."""
    # Instrument traded by the strategy; it is also registered as the benchmark
    context.stock = symbol("SPY")
    set_benchmark(context.stock)

    # Notional amount that handle_data divides by price to derive a share count
    context.max_notional = 100000

    # Trailing stop: fraction of price used for the stop level, and the
    # current stop level (starts at 0)
    context.stop_pct = 0.85
    context.stop_price = 0
  
   
def handle_data(context, data):
    """Per-bar logic: maintain the trailing stop, then go fully long on a bullish RSI divergence.

    Each bar the function
      1. computes the RSI over the close history of ``context.stock``,
      2. ratchets the trailing stop and exits when price falls below it,
      3. targets a 100% allocation when ``bullish_divergence`` reports a
         pattern whose second trough is higher than the first and no shares
         are currently held.
    """
    # strategy parameters
    lookback = 200            # number of bars of history
    charttype = '1d'          # day or minute chart
    percent_baseline = .5     # RSI retracement amount
    low = 30                  # RSI low value
    RSI_timeperiod = 3
    percent_gain_amt = .80    # forwarded to set_trailing_stop
    divergence_strength = 0   # minimum RSI gain between the two troughs

    # One single-stock close series feeds both the RSI and the price/RSI
    # pairing, so the two stay aligned bar-for-bar (flattening the whole
    # history frame would interleave columns if it held several securities).
    prices = history(lookback, charttype, 'close_price')[context.stock]
    rsi = talib.RSI(prices, timeperiod=RSI_timeperiod)
    rsi_prices = create_rsi_price_array(rsi, prices)

    # Share count is only used to gate the stop update and for the log line;
    # the order itself is sized by order_target_percent.
    num_shares = math.floor(context.max_notional / data[context.stock].close_price)
    if num_shares > 0:
        set_trailing_stop(context, data, num_shares, percent_gain_amt)
    # stop hit: flatten the position and clear the stop
    if data[context.stock].price < context.stop_price:
        order_target(context.stock, 0)
        context.stop_price = 0

    currentshares = context.portfolio.positions[context.stock].amount
    divergence = bullish_divergence(rsi_prices, percent_baseline, low)

    # divergence[1] is the number of bars between the two troughs
    if divergence is not None and divergence[1] > 1 and currentshares == 0:
        troughs = divergence[0]
        trough_diff = troughs[1] - troughs[0]  # strength of the divergence
        if trough_diff > divergence_strength:
            order_target_percent(context.stock, 1.0)
            # parenthesised single argument prints identically on Python 2 and 3
            print('buying ' + str(num_shares))

    record(price=data[context.stock].price, stop=context.stop_price)
        
def set_trailing_stop(context, data,num_shares,percent_gain_amt):
    """Raise ``context.stop_price`` to follow the price while a position is open.

    The stop only ever moves up: it becomes the larger of its current value
    and ``context.stop_pct`` times the latest price.  Nothing happens when the
    position amount is zero.  ``num_shares`` and ``percent_gain_amt`` are
    accepted but not used.
    """
    held = context.portfolio.positions[context.stock].amount
    if not held:
        return

    last_price = data[context.stock].price
    candidate = context.stop_pct * last_price
    context.stop_price = max(context.stop_price, candidate)

            
    
    
    
    
    
    
    
    
    
    
    
    
    
There was a runtime error.

Just migrated the deprecated code.

Clone Algorithm
86
Loading...
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
#This algorithm attempts to enter the market at a bullish divergence in the RSI

import talib
import numpy as np
import math

def create_rsi_price_array(rsi,closelist):
    """Pair every closing price with the RSI reading of the same bar.

    rsi       -- object exposing ``tolist()`` (the RSI values)
    closelist -- object exposing ``.values.flatten()`` (the closing prices)

    Returns a list of two-element lists laid out as ``[close, rsi_value]``.
    Pairing stops at the shorter of the two inputs.
    """
    closes = list(closelist.values.flatten())
    rsi_values = rsi.tolist()

    # NOTE: each entry is [close, rsi_value] -- price first, RSI second
    return [[close, value] for value, close in zip(rsi_values, closes)]
           
def bullish_divergence(price_rsi,percent_baseline,low):
    """Detect a bullish RSI divergence whose second trough is the second-to-last bar.

    price_rsi        -- list of ``[price, rsi]`` pairs, oldest first
    percent_baseline -- minimum fractional RSI rebound (relative to the first
                        trough) that must occur between the two troughs
    low              -- an RSI reading must be below this to count as a trough

    Returns ``[[setup_rsi, signal_rsi], bars_between, [setup_price, signal_price]]``
    when the two most recent troughs form a divergence (higher RSI low, lower
    price low, sufficient rebound in between); otherwise returns None.
    Inputs with fewer than two troughs (including an empty list) yield None
    rather than raising.
    """
    prices = [pair[0] for pair in price_rsi]
    rsi_vals = [pair[1] for pair in price_rsi]
    # Bound the scan by the actual history length rather than a fixed 200 bars.
    last = len(rsi_vals) - 1

    # A trough is an interior bar below `low` and strictly below both of its
    # neighbours.  Positions are tracked instead of values so repeated RSI
    # readings cannot be mistaken for one another.  NaN readings never compare
    # true and are therefore skipped.
    troughs = [
        i for i in range(1, last)
        if rsi_vals[i] < low
        and rsi_vals[i - 1] > rsi_vals[i]
        and rsi_vals[i + 1] > rsi_vals[i]
    ]
    if len(troughs) < 2:
        return None

    setup_idx, signal_idx = troughs[-2], troughs[-1]
    setup_rsi, signal_rsi = rsi_vals[setup_idx], rsi_vals[signal_idx]

    # The pattern must be current (latest trough on the second-to-last bar)
    # and the RSI must be gaining strength (higher low).
    if signal_idx != last - 1 or signal_rsi <= setup_rsi:
        return None
    # A rebound percentage cannot be computed from a zero base.
    if setup_rsi == 0:
        return None

    price_setup, price_signal = prices[setup_idx], prices[signal_idx]
    # Confirm the divergence: price must make a lower low.
    if not price_signal < price_setup:
        return None

    # Readings from the first trough (inclusive) up to the second (exclusive).
    delta = rsi_vals[setup_idx:signal_idx]
    for value in delta:
        # signal_rsi > setup_rsi, so exceeding signal_rsi exceeds both troughs
        if value > signal_rsi and (value - setup_rsi) / setup_rsi >= percent_baseline:
            return [[setup_rsi, signal_rsi], len(delta), [price_setup, price_signal]]
    return None

def initialize(context):
    """Populate ``context`` with the traded symbol, benchmark, sizing and stop settings."""
    # Instrument traded by the strategy; it is also registered as the benchmark
    context.stock = symbol("SPY")
    set_benchmark(context.stock)

    # Notional amount that handle_data divides by price to derive a share count
    context.max_notional = 100000

    # Trailing stop: fraction of price used for the stop level, and the
    # current stop level (starts at 0)
    context.stop_pct = 0.85
    context.stop_price = 0
       
def handle_data(context, data):
    """Per-bar logic: maintain the trailing stop, then go fully long on a bullish RSI divergence.

    Each bar the function
      1. computes a 3-period RSI over up to 200 daily closes of ``context.stock``,
      2. ratchets the trailing stop and exits when price falls below it,
      3. targets a 100% allocation when ``bullish_divergence`` reports a
         pattern whose second trough is higher than the first and no shares
         are currently held.
    """
    percent_baseline = .5     # RSI retracement amount
    percent_gain_amt = .80    # forwarded to set_trailing_stop
    divergence_strength = 0   # minimum RSI gain between the two troughs

    # Rows with missing closes are dropped, so fewer than 200 bars may remain.
    prices_close = data.history(context.stock, 'close', 200, '1d').dropna()
    rsi = talib.RSI(prices_close, timeperiod=3)
    rsi_prices = create_rsi_price_array(rsi, prices_close)

    # Share count is only used to gate the stop update and for the log line;
    # the order itself is sized by order_target_percent.
    num_shares = math.floor(context.max_notional / data.current(context.stock, 'close'))
    if num_shares > 0:
        set_trailing_stop(context, data, num_shares, percent_gain_amt)

    # stop hit: flatten the position and clear the stop
    if data.current(context.stock, 'price') < context.stop_price:
        order_target(context.stock, 0)
        context.stop_price = 0

    currentshares = context.portfolio.positions[context.stock].amount
    divergence = bullish_divergence(rsi_prices, percent_baseline, 30)

    # divergence[1] is the number of bars between the two troughs
    if divergence is not None and divergence[1] > 1 and currentshares == 0:
        troughs = divergence[0]
        trough_diff = troughs[1] - troughs[0]  # strength of the divergence
        if trough_diff > divergence_strength:
            order_target_percent(context.stock, 1.0)
            # parenthesised single argument prints identically on Python 2 and 3
            print('buying ' + str(num_shares))
    record(price=data.current(context.stock, 'price'), stop=context.stop_price)
        
def set_trailing_stop(context, data,num_shares,percent_gain_amt):
    """Raise ``context.stop_price`` to follow the price while a long position is open.

    The stop only ever moves up: it becomes the larger of its current value
    and ``context.stop_pct`` times the current price.  Nothing happens unless
    the position amount is greater than zero.  ``num_shares`` and
    ``percent_gain_amt`` are accepted but not used.
    """
    held = context.portfolio.positions[context.stock].amount
    if not held > 0:
        return

    current_price = data.current(context.stock,'price')
    candidate = context.stop_pct * current_price
    context.stop_price = max(context.stop_price, candidate)
There was a runtime error.