Donchian Channel Breakout Strategy

Hi. I'm brand new to this community and to Python programming, and excited to learn. Is anyone using a breakout strategy? I'm wondering whether code for the Donchian Channel Breakout trading strategy is available somewhere, or whether someone could help with this. The trading algo is:

1) ADX should be rising.
2) Buy if the price breaks the 55-day channel, with a stop loss at the 20-day price channel.
3) Update the stop using the 20-day channel as the price moves up.
4) Liquidate everything if ADX is over 40 and starts to go down.

Any help would be appreciated. Thanks!
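
For reference, the four rules above can be sketched in plain pandas, outside any trading platform. This is only an illustration under assumptions the post does not state: `donchian_events` is a hypothetical helper, ADX is taken as an already computed daily Series, a "break" means a close above the highest high of the previous 55 bars, and a stop is assumed to fill exactly at the stop level.

```python
import pandas as pd


def donchian_events(high, low, close, adx, entry_n=55, stop_n=20, adx_exit=40.0):
    """Apply the four rules to daily Series and return (day, action, price) tuples.

    Hypothetical helper for illustration; `adx` is assumed to be precomputed.
    """
    frame = pd.DataFrame({
        "low": low,
        "close": close,
        "adx": adx,
        "adx_change": adx.diff(),                       # today minus yesterday
        # shift(1) keeps today's bar out of its own channel
        "upper": high.rolling(entry_n).max().shift(1),  # entry channel
        "lower": low.rolling(stop_n).min().shift(1),    # stop channel
    })

    events, in_position, stop = [], False, None
    for day, bar in frame.dropna().iterrows():          # dropna skips the warmup
        if not in_position:
            # Rules 1 and 2: rising ADX and a close above the entry channel
            if bar["adx_change"] > 0 and bar["close"] > bar["upper"]:
                in_position, stop = True, bar["lower"]
                events.append((day, "buy", bar["close"]))
            continue
        # Rule 3: the stop follows the stop channel up, never down
        stop = max(stop, bar["lower"])
        if bar["low"] <= stop:
            in_position = False
            events.append((day, "stop", stop))
        # Rule 4: ADX above the threshold and turning down
        elif bar["adx"] > adx_exit and bar["adx_change"] < 0:
            in_position = False
            events.append((day, "adx_exit", bar["close"]))
    return events
```

Passing smaller `entry_n` and `stop_n` values makes it easy to check the logic by hand on a few rows before trying the 55/20 settings on real data.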

9 responses

Well, it is easy enough to implement, but the performance is not great. It trades in little bursts and stays in cash too long - sometimes for years.

# Donchian Channel Breakout Strategy
# Michelle Cho
# Hi. I'm brand new to this community and the python programming, and excited to learn this. Is anyone using breakout strategy? I'm wondering if Donchian Channel Breakout trading strategy code available somewhere or someone could help with this. The trading algo is:
#
# 1) ADX should be rising.
# 2) Buy if the price breaks 55-day channel with stop loss at 20-day price channel.
# 3) Stop should be updated using 20-day channel as price moves up.
# 4) Liquidate everything if ADX is over 40 and ADX starts to go down.

# Any help would be appreciated. Thanks!

import pandas as pd
import math
adx = ta.ADX(timeperiod=14)
max_buy = ta.MAX(timeperiod=55)
min_stop = ta.MIN(timeperiod=20)
# Try to make this so it will run in minute or daily modes
def initialize(context):
    context.trade_me = sid(21519)
    context.last_adx = None
    context.stop = None
    
    set_benchmark(context.trade_me)

def handle_data(context, data):
    
    adxs = adx(data)
    maxs = max_buy(data)
    mins = min_stop(data)
    if pd.isnull(adxs):
        # skip frame until warmup ends
        return
    elif context.last_adx is None or context.last_min is None:
        # fill last_adx and last_min if empty
        context.last_adx = adxs
        context.last_min = mins
        return
    
    #record(adx=adxs[context.trade_me]/10.0)
    record(max=math.log(maxs[context.trade_me]))
    record(min=math.log(mins[context.trade_me]))
    p = data[context.trade_me].price
    
    record(p=math.log(p))
    
    n = context.portfolio.positions[context.trade_me].amount
    
    adx_change = adxs[context.trade_me] - context.last_adx[context.trade_me]
    
    if n < 1 and context.stop is not None:
        # must have hit the stop
        this_order = get_order(context.stop)
        log.info('Must have hit stop. {} shares @ {}'.format(this_order.amount, this_order.limit))
        context.stop = None
    
    if adx_change > 0.0 and p >= maxs[context.trade_me] and n < 1:
        # go all-in
        order_target_percent(context.trade_me, 1.0)
        # set the stop-loss
        context.stop = order_target_percent(context.trade_me, 0.0, style=LimitOrder(mins[context.trade_me]))
        log.info('ADX: {:0.2f}, changes: {:0.2f}, max: {}, Bought @ {}, set stop @ {}'.\
            format(adxs[context.trade_me],adx_change,maxs[context.trade_me],p,mins[context.trade_me]))
    elif adxs[context.trade_me] > 40.0 and adx_change < 0.0 and n > 0:
        # exit positions
        order_target_percent(context.trade_me, 0.0)
        cancel_order(context.stop)
        context.stop = None
        log.info('ADX: {:0.2f}, changes: {:0.2f}, max: {}, min: {}, exit @ {}'.\
            format(adxs[context.trade_me],adx_change,maxs[context.trade_me],mins[context.trade_me],p))
    elif mins[context.trade_me] > context.last_min[context.trade_me] and n > 0:
        # cancel last stop-loss
        cancel_order(context.stop)
        # update the stop_loss
        context.stop = order_target_percent(context.trade_me, 0.0, style=LimitOrder(mins[context.trade_me]))
        log.info('ADX: {:0.2f}, changes: {:0.2f}, max: {}, p: {}, set new stop @ {}'.\
            format(adxs[context.trade_me],adx_change,maxs[context.trade_me],p,mins[context.trade_me]))

Wow. Thanks, Ray. This will help me learn the code a lot faster. BTW, when I said ADX should be rising, I meant ADX today > ADX previous day when channel breakout occurs. Just wondering if the code reflects what I intended because I think it should give more signals. Thanks for all your help!

I think I have accounted for the ADX rising. One condition for buying is that this value be positive:

    adx_change = adxs[context.trade_me] - context.last_adx[context.trade_me]  
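
For the difference to mean "today versus yesterday", the stored previous value has to be the prior day's reading. A minimal standalone sketch of that comparison, assuming a plain pandas Series of daily ADX values (the numbers and dates are invented for illustration):

```python
import pandas as pd

# Invented daily ADX readings, for illustration only.
adx = pd.Series(
    [18.0, 19.5, 21.0, 20.2, 22.4],
    index=pd.date_range("2015-01-05", periods=5, freq="B"),
)

# diff() subtracts the previous row, so each value is today minus yesterday.
adx_change = adx.diff()

# The first day has no previous value (NaN), and NaN > 0 evaluates to False.
adx_rising = adx_change > 0
```

Here `adx_rising` is False on the first day and on the one day where the reading dips, and True elsewhere.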

Yes, that will do it. Looking at the log, the ADX and price values seem to differ from those on other charting services. Do you know how I can see the actual prices (high, low, open, close) that Quantopian provides, and the ADX values for each day? Thanks!

If you look at my code for guidance, you can use the "record" function to plot anything you would like and the "log" function to spit out any numbers you would like. The ADX can be set to use any time period. I used 14 days because you did not specify. You can access a bunch of things by inspecting the "data" dictionary inside of the handle_data function. It contains the following data on every security in your universe: datetime, price, open_price, close_price, high, low, volume

So back to my example, to spit out a chart with high and low for each day:

record(high=data[context.trade_me].high)  
record(low=data[context.trade_me].low)  

The data variable passed to the handle_data function will contain the info for the *previous bar in a backtest. The attached backtest demos the data that is available. You might also want to check out the help docs.

*UPDATE: The data variable holds the data from the previous bar (not the current bar). For example, if handle_data is called at 3:01PM and you read data[stock].close_price, it can't know the close price until the end of that minute bar (i.e., 3:01:59). Instead, it returns the close price of the previous minute bar, which ended at 3:00:59.
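
A small pandas sketch of that one-bar lag, assuming minute bars labeled by the minute they cover (the timestamps and prices are made up): shifting by one row gives what a handler called during each minute would be able to see.

```python
import pandas as pd

# Made-up minute bars; each row is labeled with the minute it covers.
bars = pd.DataFrame(
    {"close_price": [100.0, 100.5, 101.0]},
    index=pd.to_datetime(
        ["2015-01-05 15:00", "2015-01-05 15:01", "2015-01-05 15:02"]
    ),
)

# During any given minute, only the last completed bar is known,
# so the visible close is the previous row's close.
visible = bars.shift(1)

seen_at_1501 = visible.loc[pd.Timestamp("2015-01-05 15:01"), "close_price"]
```

In this sketch `seen_at_1501` is the 15:00 close, and the first row of `visible` is NaN because no bar has completed yet.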

# Put any initialization logic here.  The context object will be passed to
# the other methods in your algorithm.
def initialize(context):
    pass

# Will be called on every trade event for the securities you specify. 
def handle_data(context, data):
    spy = symbol('SPY')
    
    # Accessing data can be done two ways
    
    # Dot notation
    log.info('current price = %s' % data[spy].price)
    
    # Like a dictionary
    log.info('volume = %s' % data[spy]['volume'])
    
    # Other data available 
    log.info('Open price = %s' % data[spy].open_price)
    log.info('Close price = %s' % data[spy].close_price) # Same as current price
    log.info('High price = %s' % data[spy].high)
    log.info('Low price = %s' % data[spy].low)
    log.info('datetime of last trade = %s' % data[spy].datetime)
    
    # Simple transformations available 
    log.info('5 day moving avg = %s' % data[spy].mavg(5))
    log.info('5 day standard deviation = %s' % data[spy].stddev(5))
    log.info('5 day volume weighted average price = %s' % data[spy].vwap(5))
    log.info('Returns since close yesterday = %s' % data[spy].returns())

This is a pretty old posting, so hopefully I will still get a response. I am new to Python, so maybe that is what is causing my lack of results.
I cloned the first code above, by Ray Cathert. It produced an error, which had to do with an empty array.

Here's the fix I added:

if adxs.empty:  
    ## if pd.isnull(adxs):

At that point the code compiled. However, when I run it, it never triggers any buys. What am I doing wrong?
I'm new to this forum, so I don't know whether I should post the algo code here, even though the only change is the one shown above.

Thanks

They have updated Pandas and that broke this algo. The fix is trivial:

Line 30 goes from:

    if pd.isnull(adxs):

to:

    if pd.isnull(adxs).all():
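
As a standalone illustration of why the original check broke and why `.empty` is not a substitute, here is a minimal pandas sketch. The two-security Series and the tickers `AAA` and `BBB` are invented stand-ins for the indicator output during warmup.

```python
import numpy as np
import pandas as pd

# Stand-in for an indicator Series during warmup: one NaN per security.
warmup = pd.Series([np.nan, np.nan], index=["AAA", "BBB"])

# pd.isnull works element-wise: the result is a boolean Series, not one bool.
mask = pd.isnull(warmup)

try:
    bool(mask)            # what `if pd.isnull(adxs):` does implicitly
    ambiguous = False
except ValueError:
    ambiguous = True      # pandas refuses to pick a single truth value

all_missing = mask.all()  # one bool: True only if every value is NaN
is_empty = warmup.empty   # False: the Series has rows, they are just NaN

# NaN propagates through arithmetic, and any comparison with NaN is False,
# so a NaN stored as the "previous" value never yields a positive change.
change_positive = (25.0 - warmup["AAA"]) > 0.0
```

If the indicator output in the algo behaves like this Series, the `.empty` version would skip nothing during warmup and store NaN as the previous ADX, which may explain why it never bought.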

Attached is the latest backtest.

# Donchian Channel Breakout Strategy
# Michelle Cho
# Hi. I'm brand new to this community and the python programming, and excited to learn this. Is anyone using breakout strategy? I'm wondering if Donchian Channel Breakout trading strategy code available somewhere or someone could help with this. The trading algo is:
#
# 1) ADX should be rising.
# 2) Buy if the price breaks 55-day channel with stop loss at 20-day price channel.
# 3) Stop should be updated using 20-day channel as price moves up.
# 4) Liquidate everything if ADX is over 40 and ADX starts to go down.

# Any help would be appreciated. Thanks!

import pandas as pd
import math
adx = ta.ADX(timeperiod=14)
max_buy = ta.MAX(timeperiod=55)
min_stop = ta.MIN(timeperiod=20)
# Try to make this so it will run in minute or daily modes
def initialize(context):
    context.trade_me = sid(21519)
    context.last_adx = None
    context.stop = None
    
    set_benchmark(context.trade_me)

def handle_data(context, data):
    
    adxs = adx(data)
    maxs = max_buy(data)
    mins = min_stop(data)
    if pd.isnull(adxs).all():
        # skip frame until warmup ends
        return
    elif context.last_adx is None or context.last_min is None:
        # fill last_adx and last_min if empty
        context.last_adx = adxs
        context.last_min = mins
        return
    
    #record(adx=adxs[context.trade_me]/10.0)
    record(max=math.log(maxs[context.trade_me]))
    record(min=math.log(mins[context.trade_me]))
    p = data[context.trade_me].price
    
    record(p=math.log(p))
    n = context.portfolio.positions[context.trade_me].amount
    
    adx_change = adxs[context.trade_me] - context.last_adx[context.trade_me]
    
    if n < 1 and context.stop is not None:
        # must have hit the stop
        this_order = get_order(context.stop)
        log.info('Must have hit stop. {} shares @ {}'.format(this_order.amount, this_order.limit))
        context.stop = None
    
    if adx_change > 0.0 and p >= maxs[context.trade_me] and n < 1:
        # go all-in
        order_target_percent(context.trade_me, 1.0)
        # set the stop-loss
        context.stop = order_target_percent(context.trade_me, 0.0, style=LimitOrder(mins[context.trade_me]))
        log.info('ADX: {:0.2f}, changes: {:0.2f}, max: {}, Bought @ {}, set stop @ {}'.\
            format(adxs[context.trade_me],adx_change,maxs[context.trade_me],p,mins[context.trade_me]))
    elif adxs[context.trade_me] > 40.0 and adx_change < 0.0 and n > 0:
        # exit positions
        order_target_percent(context.trade_me, 0.0)
        cancel_order(context.stop)
        context.stop = None
        log.info('ADX: {:0.2f}, changes: {:0.2f}, max: {}, min: {}, exit @ {}'.\
            format(adxs[context.trade_me],adx_change,maxs[context.trade_me],mins[context.trade_me],p))
    elif mins[context.trade_me] > context.last_min[context.trade_me] and n > 0:
        # cancel last stop-loss
        cancel_order(context.stop)
        # update the stop_loss
        context.stop = order_target_percent(context.trade_me, 0.0, style=LimitOrder(mins[context.trade_me]))
        log.info('ADX: {:0.2f}, changes: {:0.2f}, max: {}, p: {}, set new stop @ {}'.\
            format(adxs[context.trade_me],adx_change,maxs[context.trade_me],p,mins[context.trade_me]))

And here's one of my typical modular (overdone) versions... I don't much care for the entry mechanism, though. Maybe it'll work flipped over?

# 1) ADX should be rising.
# 2) Buy if the price breaks 55-day channel with stop loss at 20-day price channel.
# 3) Stop should be updated using 20-day channel as price moves up.
# 4) Liquidate everything if ADX is over 40 and ADX starts to go down.

import math 
import talib
import numpy
import pandas 
import zipline
import datetime
import collections

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def initialize(context):
    set_symbol_lookup_date('2015-01-01')
    context.REF = symbol('SPY')
    set_benchmark(context.REF)
    context.Expired = []    
    context.S = {}
    
    statusRule = date_rules.every_day()
    entryRule = date_rules.week_start()
    exitRule = date_rules.week_end()
    
    # Establish state
    schedule_function(EstablishState, statusRule)
    
    # Calculate indicators
    schedule_function(CalculateADX, statusRule)
    schedule_function(CalculateHighestHigh, entryRule)
    schedule_function(CalculateLowestLow, exitRule)
    
    # Handle exits
    schedule_function(ExpiredStop, statusRule)    
    schedule_function(HandleExit, exitRule)
    schedule_function(FallingADXExit, statusRule)
    
    # Handle entries
    schedule_function(HandleEntry, entryRule)
    schedule_function(RecordStatus, statusRule)
    
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def handle_data(context, data):
    pass

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def RecordStatus(context, data):
    positions    = context.portfolio.positions
    record(Leverage      = context.account.leverage)
    record(DataCount     = len(data))
    record(PositionCount = sum([1 for stock in positions if positions[stock].amount != 0]))
    if (context.REF in context.S and 'ADXPrior' in context.S[context.REF]):
        record(ADX       = context.S[context.REF].ADXPrior)
        
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def HandleEntry(context, data):
    eligibleLong = []
    positions = context.portfolio.positions
    openPositions = [stock for stock in positions 
                     if positions[stock].amount != 0 
                     and stock != context.REF 
                     and not context.S[stock].HasOpenOrders]
    
    for stock in context.S:
        if (stock in context.Expired):
            continue
        if (stock in openPositions):
            continue
        if (context.S[stock].HasOpenOrders):
            continue
        if (stock == context.REF):
            continue        
        if (not context.S[stock].ADXIsRising):
            continue
        if (not context.S[stock].EntryChannelTriggered):
            continue
        eligibleLong.append(stock)
    
    eligibleLong += openPositions
    eligibleLongCount = float(len(eligibleLong))
    
    for stock in eligibleLong:
        order_target_percent(stock, .9 / eligibleLongCount)
        PrintEntry(context.S[stock], "Entry Long", "Retain Long", stock.symbol)
        
    # Quantopian spoof for a "hedged" strategy
    order_target_percent(context.REF, -.01)
    
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def HandleExit(context, data):
    positions = context.portfolio.positions
    openPositions = [stock for stock in positions if positions[stock].amount != 0]
    for stock in context.S:
        if (stock not in openPositions):
            continue
        if (stock == context.REF):
            continue
        if (context.S[stock].ExitChannelTriggered):
            order_target_percent(stock, 0.0)
            context.S[stock].HasOpenOrders = True
            PrintExit(context.S[stock], "Lower channel exit", stock.symbol)

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def FallingADXExit(context, data):
    # If we don't have an ADX metric, or ADXPrior is below the threshold, or ADX is rising, return
    if ('ADXPrior' not in context.S[context.REF] or 
        context.S[context.REF].ADXPrior < FallingADXThreshold or 
        context.S[context.REF].ADXIsRising):
        return
    positions = context.portfolio.positions
    openPositions = [stock for stock in positions if positions[stock].amount != 0]
    for stock in openPositions:
        if (stock not in data):
            continue
        if (stock == context.REF):
            continue
        order_target_percent(stock, 0.0)
        context.S[stock].HasOpenOrders = True
        PrintExit(context.S[stock], "ADX falling exit", stock.symbol)

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def ExpiredStop(context, data):
    for stock in data:  
        if (stock.end_date > (get_datetime() + datetime.timedelta(days=5))):
            continue
        if (stock in context.Expired):
            continue
        order_target_percent(stock, 0)
        context.S[stock].HasOpenOrders = True
        print("Expired {0}".format(stock.symbol))
        context.Expired.append(stock)

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def CalculateADX(context, data):
    highDeck  = history(ADXPeriods * 2, '1d', 'high').dropna(axis=1)
    lowDeck   = history(ADXPeriods * 2, '1d', 'low').dropna(axis=1)    
    closeDeck = history(ADXPeriods * 2, '1d', 'close_price').dropna(axis=1)    
    valid     = [sid for sid in highDeck if sid in data]
    highDeck  = highDeck[valid]
    lowDeck   = lowDeck[valid]
    closeDeck = closeDeck[valid]
    for stock in context.S:
        try:
            ADX = talib.ADX(highDeck[stock], lowDeck[stock], closeDeck[stock], timeperiod=ADXPeriods)
        except Exception:
            # skip securities whose history is missing or too short for talib
            continue
        context.S[stock].ADXIsRising = ADX[-1] > context.S[stock].ADXPrior
        context.S[stock].ADXPrior = ADX[-1]
       
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~            
def CalculateHighestHigh(context, data):
    highDeck = history(EntryChannelPeriods, "1d", "high").dropna(axis=1)
    highDeck = highDeck[[sid for sid in highDeck if sid in data]]
    for stock in highDeck:
        context.S[stock].EntryChannelTriggered = bool(context.S[stock].High == max(highDeck[stock]))
        
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def CalculateLowestLow(context, data):
    lowDeck = history(ExitChannelPeriods, "1d", "low").dropna(axis=1)
    lowDeck = lowDeck[[sid for sid in lowDeck if sid in data]]
    for stock in lowDeck:
        context.S[stock].ExitChannelTriggered = bool(context.S[stock].Low == min(lowDeck[stock]))
        
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def EstablishState(context, data):
    # Reconcile available stocks into context.S
    for stock in data:
        if (stock not in context.S):
            context.S[stock] = DataStock(stock, data[stock])
            context.S[stock].ADXPrior = numpy.nan
            context.S[stock].ADXIsRising = False
            context.S[stock].EntryChannelTriggered = False
            context.S[stock].ExitChannelTriggered  = False
        else:
            context.S[stock].Update(data[stock])
            
    # Reconcile backwards for securities we can no longer trade
    removeThese = []
    for stock in context.S:
        if (stock not in data):
            removeThese.append(stock)
    for stock in removeThese:
        del context.S[stock]                    
        
    # Now set up state on the SIDData object inside context.S
    for stock in context.S:
        context.S[stock].Weight        = 0
        context.S[stock].NetQuantity   = context.portfolio.positions[stock].amount
        context.S[stock].CostBasis     = context.portfolio.positions[stock].cost_basis
        context.S[stock].HasOpenOrders = False
        context.S[stock].OpenLimit     = None
        context.S[stock].OpenStop      = None
        if (not get_open_orders(stock)):
            continue
        context.S[stock].HasOpenOrders = True
        for order in get_open_orders(stock):
            if order.limit:
                context.S[stock].OpenLimit   = order
                context.S[stock].LimitLeaves = order.amount - order.filled
            elif order.stop:
                context.S[stock].OpenStop    = order
                context.S[stock].StopLeaves  = order.amount - order.filled 
                
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def PrintEntry(dataStock, entryName, retainName, symbol):
    if (not EnableLogging):
        return
    if (dataStock.NetQuantity == 0):
        print(">> {0:<20}{1:<5} @ {2:>7.2f}".format(
            entryName, symbol, dataStock.Close))
    else:
        print("** {0:<20}{1:<5} @ {2:>7.2f} # {3:>5}".format(
            retainName, symbol, dataStock.Close, dataStock.NetQuantity))
   
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def PrintExit(dataStock, exitName, symbol):
    if (not EnableLogging):
        return    
    pnl = 0.0
    if (dataStock.NetQuantity > 0):
        pnl = dataStock.Close - dataStock.CostBasis
    else:
        pnl = dataStock.CostBasis - dataStock.Close
        
    print("<< {0:<20}{1:<5} @ {2:>7.2f} delta {3:>6.2f} qty {4:>5}".format(
            exitName, symbol, dataStock.Close, pnl, dataStock.NetQuantity))
        
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def before_trading_start(context): 
    f = fundamentals
            
    fundyDeck = get_fundamentals(
        query(
            f.valuation.market_cap,
            f.valuation_ratios.ps_ratio
        )
        .filter(f.valuation.market_cap >= 1e9)
        .order_by(f.valuation_ratios.ps_ratio.asc())
        .limit(MaxSecuritiesToTrade)
    )
    update_universe(fundyDeck.columns.values)    
    
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class DataStock(zipline.protocol.SIDData):
    def __init__(this, sid, dataSid):
        this.Update(dataSid)

    def Update(this, dataSid):
        this.Open   = dataSid.open_price
        this.High   = dataSid.high
        this.Low    = dataSid.low
        this.Close  = dataSid.close_price
        this.Volume = dataSid.volume
        
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
class MAType():
    SMA   = 0; EMA   = 1; WMA   = 2; DEMA  = 3; TEMA  = 4;    
    TRIMA = 5; KAMA  = 6; MAMA  = 7; T3    = 8
        
### Settings Area ###
EnableLogging        = True
ADXPeriods           = 14
EntryChannelPeriods  = 55
ExitChannelPeriods   = 20
FallingADXThreshold  = 40
MaxSecuritiesToTrade = 100

        