Pipeline pair cointegration notebook (3,700 cointegrated pairs)

Combined Delaney's Researching a Pairs Trading Strategy notebook with a simple pipeline I found somewhere (I can't work out where I originally got it, so a shoutout to an unknown stranger for a good first-pass pipeline).

The pipeline returns about 1,000 securities that, for a variety of reasons, are good candidates for trading. For every pair of these that belongs to the same Morningstar sector, I check for pairwise cointegration over the last year. The whole run takes about 15 to 20 minutes and yields roughly 3,700 pairs with p-values below 5%.
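Since the notebook preview is gone, here is a minimal sketch of that screen (my reconstruction, not the notebook itself), assuming prices is a DataFrame of a year of daily closes with one column per security and sector is a Series mapping each column to its Morningstar sector code:

import itertools
from statsmodels.tsa.stattools import coint

def find_cointegrated_pairs(prices, sector, alpha=0.05):
    # test every same-sector pair; keep those with p-value below alpha
    hits = []
    for a, b in itertools.combinations(prices.columns, 2):
        if sector[a] != sector[b]:
            continue
        _score, pvalue, _crit = coint(prices[a], prices[b])
        if pvalue < alpha:
            hits.append((a, b, pvalue))
    return hits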

I haven't gotten around to building a trading strategy on top of this system yet, but I thought the heat map looked really cool, so it's worth posting here even if only for that reason.

6 responses

The difficulty with this is that p-values of 5% are not really significant anymore when you've drawn from so many trials.
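To put a rough, illustrative number on that (the figures are mine, not from the post): a test at the 5% level flags about 5% of comparisons by chance alone, so under the global null a screen over every pair drawn from ~1,000 securities would still be expected to produce tens of thousands of hits:

n_securities = 1000                       # hypothetical universe size
n_pairs = n_securities * (n_securities - 1) // 2
alpha = 0.05
print(n_pairs)          # 499500 unordered pairs
print(alpha * n_pairs)  # ~24975 expected false positives under the null
# the same-sector restriction shrinks n_pairs, but not the basic problem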

Well, I'm not really as clued up on significance and critical values as I would like to be, but looking at the coint() docs and the MacKinnon paper they reference for their critical-value determination, it would appear that the critical value for each test is determined solely by the number of variables being tested for cointegration (i.e. 2 in each case) and the number of samples used for the test (i.e. the number of trading days over the year, which is around 250).

All of which is to say, I think, to quote the docs, that "The Null hypothesis is that there is no cointegration, the alternative hypothesis is that there is cointegrating relationship," and that this hypothesis applies only to the pair I am testing rather than to the overall 1,000-stock / 1,000,000-pair system.

As I said, though, this really isn't my area of expertise, so my interpretation of the results I have obtained could very well be way off.

My understanding is that if I wished to compare these results across the entire system, I would need to record the scores from each pair's test and then calculate critical values from the mean and standard deviation of the scores for the entire universe? But even that is probably not useful, as it would be entirely in sample; realistically I would have to replicate MacKinnon's simulation-based approach to determining critical values for a system of this size before I could actually interpret the scores across the entire system.
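A lighter-weight option than re-running MacKinnon-style simulations (my suggestion, not something proposed in the thread) is to keep the per-pair p-values and apply an off-the-shelf multiple-testing correction, e.g. a Benjamini-Hochberg FDR adjustment from statsmodels:

import numpy as np
from statsmodels.stats.multitest import multipletests

# stand-in p-values for the demo; in practice, collect one per coint() test
rng = np.random.default_rng(0)
pvalues = rng.uniform(size=3700)

reject, pvals_corrected, _, _ = multipletests(pvalues, alpha=0.05, method='fdr_bh')
print(reject.sum())  # number of pairs surviving the FDR correction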

Despite all that, my plan was to select random pairs from the 3,700, since my trading strategy needs additional pairs, and then only trade a pair if I also find a shorter-term (say 3- or 6-month) cointegration using minute data.
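A minimal sketch of that confirmation step (a hypothetical helper; all_pairs and minute_prices are placeholder names for the 3,700 (y, x) tuples and a DataFrame of minute closes over the shorter window):

import random
from statsmodels.tsa.stattools import coint

def confirm_pairs(all_pairs, minute_prices, n_sample=50, alpha=0.05, seed=0):
    # randomly sample candidate pairs and keep only those that are also
    # cointegrated on the shorter minute-data window
    random.seed(seed)
    sample = random.sample(all_pairs, min(n_sample, len(all_pairs)))
    kept = []
    for y, x in sample:
        _score, pvalue, _crit = coint(minute_prices[y], minute_prices[x])
        if pvalue < alpha:
            kept.append((y, x))
    return kept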

Please keep posting about this strategy; I tried something similar a few years ago and found that most of the pairs were either garbage (spurious and/or in-sample only) or perfect (because they were ETFs on the same underlying) and therefore too low in variance to profit from on Quantopian. But I am curious! I know some people do it successfully; I won't name names, but they are quiet about how they pick their pairs.

Out of curiosity I built an algorithm that trades those cointegrated pairs. While the current pair-selection criterion is trivial and the results disappointing, as expected, it would be fairly easy to swap in smarter selection logic while reusing the algorithm code (one idea is sketched after the listing). In the hope it can be useful to someone else, here is the algorithm.

from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.data import morningstar
from quantopian.pipeline.factors import CustomFactor, SimpleMovingAverage, Latest, AverageDollarVolume
from quantopian.pipeline.filters.morningstar import IsPrimaryShare
from quantopian.pipeline import factors, filters, classifiers

import pytz
import pandas as pd
import numpy as np
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint

class Pairs(object):
    
    def __init__(self, pairs):
        self.map = { id:pair for id, pair in enumerate(pairs) }
        
    def add_pair(self, pair):
        next_id = max(self.map) + 1 if self.map else 0
        self.map[next_id] = pair
        
    def get_num_pairs(self):
        return len(self.map)
        
    def get_security_list(self):
        security_list = []
        for pair in self.map.itervalues():
            security_list.append(pair.y)
            security_list.append(pair.x)
        return security_list
        
class Pair(object):
    
    def __init__(self, y, x):
        self.y = y
        self.x = x
        self.inLong  = False
        self.inShort = False
        self.spread = np.empty(0)
        #self.hedgeRatioTS = np.empty(0)

def high_volume_universe(top_liquid, min_price = None, min_volume = None):  
    """
    Computes a security universe of liquid stocks, filtering out
    hard-to-trade ones.
    Returns
    -------
    high_volume_tradable - zipline.pipeline.filter
    """
    
    full_filter = filters.make_us_equity_universe(
        target_size=top_liquid,
        rankby=factors.AverageDollarVolume(window_length=200),
        mask=filters.default_us_equity_universe_mask(),
        groupby=classifiers.morningstar.Sector(),
        max_group_weight=0.3,
        smoothing_func=lambda f: f.downsample('month_start'),
    )
    
    if min_price > 0:
        price = SimpleMovingAverage(inputs=[USEquityPricing.close],
                                    window_length=21, mask=full_filter)
        full_filter &= (price >= min_price)
        
    if min_volume > 0:
        volume = SimpleMovingAverage(inputs=[USEquityPricing.volume],
                                     window_length=21, mask=full_filter)
        full_filter &= (volume >= min_volume)
        
    return full_filter

def make_pipeline(context):
    full_filter = high_volume_universe(top_liquid=context.universe_size)
    pipe = Pipeline()
    pipe.set_screen(full_filter)
    sector = morningstar.asset_classification.morningstar_sector_code.latest
    pipe.add(sector, 'sector')
    return pipe        

def initialize(context):
    
    set_slippage(slippage.FixedSlippage(spread=0))
    set_commission(commission.PerShare(cost=0.00, min_trade_cost=0.00))
    
    context.dont_buys = security_lists.restrict_leveraged_etfs
    set_asset_restrictions(context.dont_buys)
    
    # strategy specific variables
    context.hedge_ratio_lookback   = 20 # used for regression
    context.z_window               = 20 # used for zscore calculation, must be <= lookback
    context.universe_size          = 500 # number of stocks to select every day for the cointegration test
    context.cointegration_lookback = 252 # how many days of history to use in the cointegration test
    
    context.stock_pairs = Pairs( [] ) # dynamic pair universe

    attach_pipeline(make_pipeline(context), 'universe')   
    
    schedule_function(func=clear_expired_pairs, date_rule=date_rules.every_day(), time_rule=time_rules.market_open())
    schedule_function(func=check_pair_status, date_rule=date_rules.every_day(), time_rule=time_rules.market_close(hours=3))

def before_trading_start(context, data):
    results = pipeline_output('universe')     
    results = results.drop(context.dont_buys, axis=0, errors='ignore')
    
    new_universe = list(results.index)
       
    #
    # Remove pairs that are no longer cointegrated
    #
    current_pair_universe = context.stock_pairs.get_security_list()
    prices = data.history(current_pair_universe, 'price', context.cointegration_lookback, '1d')
    pairs_to_clear = []
    for i, pair in context.stock_pairs.map.items():
        out = coint(prices[pair.y], prices[pair.x])
        score = out[0]
        pvalue = out[1]
        if pvalue >= 0.05: # this exit threshold (0.05) is looser than the entry threshold (0.01), to avoid excessive turnover
            # pair is no longer cointegrated
            pairs_to_clear.append(pair)
            del context.stock_pairs.map[i]

    #
    # Find new cointegrated pairs on stocks that are not in current pairs
    #
    search_universe = list(set(new_universe) - set(context.stock_pairs.get_security_list()))
    prices = data.history(search_universe, 'price', context.cointegration_lookback, '1d')
    num_stocks = len(prices.columns)
    pairs = {}
    for y in range(num_stocks):
        for x in range(y+1, num_stocks):
            stocky = prices.columns[y]
            stockx = prices.columns[x]        
            # force pair to be in the same sector
            if results['sector'][stocky] != results['sector'][stockx]:
                continue
            py = prices[stocky]
            px = prices[stockx]
            out = coint(py, px)
            score = out[0]
            pvalue = out[1]
            if pvalue < 0.01:
                spread = (py-px).std()
                key = 1.0/spread # inverse spread: the ascending sort below gives high-spread pairs priority
                _list = pairs.get(key, list())
                _list.append( (stocky, stockx) )
                pairs[key] = _list

    #
    # Add new pairs to the previous ones: if a stock is present
    # in multiple pairs, the one with lower key in 'pairs' dictionary
    # will be chosen
    #
    prev_secs = set( context.stock_pairs.get_security_list() )
    new_secs = set()
    for key in sorted(pairs.keys()):
        for x,y in pairs[key]:
            xy_set = set([x,y])
            if xy_set&new_secs or xy_set&prev_secs:
                continue
            new_secs |= xy_set
            context.stock_pairs.add_pair( Pair(y, x) )

    #
    # log/record some info
    #
    new_pairs     = len(new_secs)/2
    expired_pairs = len(pairs_to_clear)
    turnover      = new_pairs+expired_pairs
    num_pairs     = context.stock_pairs.get_num_pairs()
    num_pos       = len(context.portfolio.positions)
        
    print 'Basket of stocks %d num_pos %d num_pairs %d new_pairs %d expired_pairs %d turnover %d' % (len(new_universe), num_pos, num_pairs, new_pairs, expired_pairs, turnover)
    
    record(leverage=context.account.leverage,
           exposure=context.account.net_leverage,
           #new_pairs=new_pairs,
           #expired_pairs=expired_pairs,
           turnover=turnover,
           num_pairs=num_pairs,
           num_pos=num_pos)

def clear_expired_pairs(context, data):   
    pair_security_list = context.stock_pairs.get_security_list()
    for s in context.portfolio.positions:
        if s not in pair_security_list and data.can_trade(s):
            order_target_percent(s, 0)
    
def check_pair_status(context, data):
    
    if get_open_orders():
        return
    
    pair_security_list = context.stock_pairs.get_security_list()
    prices = data.history(pair_security_list, 'price', context.hedge_ratio_lookback, '1d')
    
    orders = {}
    for i, pair in context.stock_pairs.map.iteritems():

        Y = prices[pair.y]
        X = prices[pair.x]

        try:
            hedge = hedge_ratio(Y, X, add_const=True)           
        except (IndexError, ValueError) as e:
            log.error(e)
            continue
      
        new_spread = Y[-1] - hedge * X[-1]
        
        pair.spread = np.append(pair.spread, new_spread)[-context.z_window:]
        #pair.hedgeRatioTS = np.append(pair.hedgeRatioTS, hedge)[-context.z_window:]

        if pair.spread.shape[0] >= context.z_window:
            
            # Keep only the z-score lookback period
            spreads = pair.spread[-context.z_window:]

            zscore = (spreads[-1] - spreads.mean()) / spreads.std()

            if pair.inShort and zscore < 0.0 and all(data.can_trade([pair.y,pair.x])):
                pair.inShort = False
                pair.inLong = False
                orders[i] = (0, 0)

            elif pair.inLong and zscore > 0.0 and all(data.can_trade([pair.y,pair.x])):
                pair.inShort = False
                pair.inLong = False
                orders[i] = (0, 0)

            elif zscore < -1.0 and (not pair.inLong) and all(data.can_trade([pair.y,pair.x])):
                # Only trade if NOT already in a trade
                y_target_shares = 1
                X_target_shares = -hedge
                pair.inLong = True
                pair.inShort = False

                (y_target_pct, x_target_pct) = computeHoldingsPct( y_target_shares,X_target_shares, Y[-1], X[-1] )
                orders[i] = (y_target_pct, x_target_pct)
                
            elif zscore > 1.0 and (not pair.inShort) and all(data.can_trade([pair.y,pair.x])):
                # Only trade if NOT already in a trade
                y_target_shares = -1
                X_target_shares = hedge
                pair.inShort = True
                pair.inLong = False

                (y_target_pct, x_target_pct) = computeHoldingsPct( y_target_shares, X_target_shares, Y[-1], X[-1] )
                orders[i] = (y_target_pct, x_target_pct)
     
    # 
    # rebalance
    #
    active_pairs = sum( [(p.inLong or p.inShort) for p in context.stock_pairs.map.itervalues()] )
    #active_pairs = context.stock_pairs.get_num_pairs() # this results in under-leverage because not all pairs are traded at any given time
    for i, weights in orders.iteritems():
        pair = context.stock_pairs.map[i]
        if not data.can_trade(pair.y) or not data.can_trade(pair.x):
            continue
        order_target_percent( pair.y, weights[0] / float(active_pairs) )
        order_target_percent( pair.x, weights[1] / float(active_pairs) )

def hedge_ratio(Y, X, add_const=True):
    # OLS regression of Y on X; the slope is used as the hedge ratio
    if add_const:
        X = sm.add_constant(X)
        model = sm.OLS(Y, X).fit()
        return model.params[1] # slope; params[0] is the intercept
    model = sm.OLS(Y, X).fit()
    return model.params.values
    
def computeHoldingsPct(yShares, xShares, yPrice, xPrice):
    # convert share counts into long/short weights normalized by gross notional
    yDol = yShares * yPrice
    xDol = xShares * xPrice
    notionalDol = abs(yDol) + abs(xDol)
    y_target_pct = yDol / notionalDol
    x_target_pct = xDol / notionalDol
    return (y_target_pct, x_target_pct)
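
As promised above, one example of a smarter selection rule than keying on raw spread (my sketch, not part of the algorithm): estimate each spread's mean-reversion half-life from an OLS regression of the spread's daily changes on its lagged level, and prefer pairs whose half-life is short relative to the holding period. Assuming spread is a pandas Series of the pair's price spread:

import numpy as np
import statsmodels.api as sm

def mean_reversion_half_life(spread):
    # regress daily spread changes on the lagged spread level (AR(1)-style);
    # a negative slope beta implies mean reversion with half-life -ln(2)/beta
    lagged = spread.shift(1).dropna()
    delta = spread.diff().dropna()
    beta = sm.OLS(delta, sm.add_constant(lagged)).fit().params.iloc[1]
    if beta >= 0:
        return np.inf  # no mean reversion detected
    return -np.log(2) / beta

Pairs could then be keyed by half-life instead of 1.0/spread in before_trading_start, so the fastest-reverting pairs get priority.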

I made a YouTube video about cointegration: https://www.youtube.com/watch?v=fNk6qwKHY1c&t=7s