pipeline pair cointegration notebook (3700 cointegrated pairs)

I combined Delaney's Researching a Pairs Trading Strategy notebook with a simple pipeline I found somewhere (I can't figure out where I got it right now, but shoutout to the unknown stranger for a good first-pass pipeline).

The pipeline returns about 1,000 securities that are, for a variety of reasons, good for trading. Of these, I check for pairwise cointegration over the last year whenever two securities belong to the same Morningstar sector. It takes about 15 or 20 minutes to run and yields 3,700 pairs with p-values less than 5%.

I haven't gotten around to building a trading strategy based on this system yet, but I thought the heat map looked really cool, so it's worth posting here even if only for that reason.

6 responses

The difficulty with this is that a 5% p-value is not really significant anymore when you have drawn from so many trials: run enough tests and a 5% threshold will pass thousands of pairs by chance alone.
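
A back-of-the-envelope version of this concern (the figures below assume all unordered pairs of 1,000 securities were tested, which is an upper bound since only same-sector pairs are actually compared):

```python
# Expected false positives if every pair's null hypothesis were true.
n_securities = 1000
n_tests = n_securities * (n_securities - 1) // 2  # all unordered pairs
alpha = 0.05
expected_false_positives = n_tests * alpha
print(n_tests, expected_false_positives)  # 499500 tests -> 24975.0 false positives

# A Bonferroni-style correction shrinks the per-test threshold instead of
# accepting that many spurious pairs.
bonferroni_alpha = alpha / n_tests
print(bonferroni_alpha)
```

Bonferroni is very conservative for correlated tests like these, but it illustrates how far a raw 5% cutoff is from a system-wide significance level.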

Well, I'm not really as clued up on significance and critical values as I would like to be, but looking at the coint() docs and the MacKinnon paper they reference for their critical-value determination, it appears to me that the critical value for each test is determined solely by the number of variables being cointegrated (i.e. 2 in each case) and the number of samples used for the test (i.e. the number of trading days over the year, which is around 250).

All of which is to say, I think that, to quote the docs, "The Null hypothesis is that there is no cointegration, the alternative hypothesis is that there is cointegrating relationship.", and that this hypothesis applies only to the pair I am testing rather than to the overall 1,000-stock / 1,000,000-pair system.

As I said, though, this really isn't my area of expertise, so my interpretation of the results I have obtained could very well be way off.

My understanding is that if I wished to compare these results across the entire system, I would need to record the scores from each pair's test and then calculate critical values from the mean and standard deviation of the scores for the entire universe. But even that is probably not useful, because it would be entirely in sample; in all reality I would probably have to replicate MacKinnon's simulation-based approach to determining critical values for a system of this size before I could actually interpret the scores across the entire system.

Despite all that, my plan was to select random pairs from the 3,700, as my trading strategy needs additional pairs. I would then check for short-term (say 3- or 6-month) cointegration using minute data and only trade a pair if that check passes.

Please keep posting about this strategy; I tried something similar a few years ago and found that most of the pairs were either garbage (spurious and/or in-sample only) or perfect (because they were ETFs on the same underlyings) and therefore too low in variance to profit from on Quantopian. But I am curious! I know some people do it successfully (I won't name names), but they are quiet about how they pick their pairs.

Out of curiosity, I built an algorithm that trades those cointegrated pairs. While the current pair-selection criterion is trivial and the results disappointing, as expected, it would be fairly easy to swap in smarter selection logic while reusing the algorithm code. In the hope it can be useful to someone else, here is the algorithm.

from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from import USEquityPricing
from import morningstar
from quantopian.pipeline.factors import CustomFactor, SimpleMovingAverage, Latest, AverageDollarVolume
from quantopian.pipeline.filters.morningstar import IsPrimaryShare
from quantopian.pipeline import factors, filters, classifiers

import pytz
import pandas as pd
import numpy as np
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint

class Pairs(object):
    def __init__(self, pairs):
        self.pairs = { id: pair for id, pair in enumerate(pairs) }
    def add_pair(self, pair):
        next_id = max(self.pairs) + 1 if self.pairs else 0
        self.pairs[next_id] = pair
    def get_num_pairs(self):
        return len(self.pairs)
    def get_security_list(self):
        security_list = []
        for pair in self.pairs.values():
            security_list.append(pair.y)
            security_list.append(pair.x)
        return security_list
class Pair(object):
    def __init__(self, y, x):
        self.y = y
        self.x = x
        self.inLong  = False
        self.inShort = False
        self.spread = np.empty(0)
        #self.hedgeRatioTS = np.empty(0)

def high_volume_universe(top_liquid, min_price=None, min_volume=None):
    """
    Computes a security universe of liquid stocks, filtering out
    hard-to-trade ones.
    Returns
    -------
    high_volume_tradable - zipline.pipeline.filter
    """
    # Note: the rankby/mask/groupby arguments were lost when this post was
    # archived; the values below are a plausible liquidity-ranked reconstruction.
    full_filter = filters.make_us_equity_universe(
        target_size=top_liquid,
        rankby=factors.AverageDollarVolume(window_length=200),
        mask=filters.default_us_equity_universe_mask(),
        groupby=classifiers.morningstar.Sector(),
        max_group_weight=0.3,
        smoothing_func=lambda f: f.downsample('month_start'),
    )
    if min_price:
        price = SimpleMovingAverage(inputs=[USEquityPricing.close],
                                    window_length=21, mask=full_filter)
        full_filter &= (price >= min_price)
    if min_volume:
        volume = SimpleMovingAverage(inputs=[USEquityPricing.volume],
                                     window_length=21, mask=full_filter)
        full_filter &= (volume >= min_volume)
    return full_filter

def make_pipeline(context):
    full_filter = high_volume_universe(top_liquid=context.universe_size)
    pipe = Pipeline(screen=full_filter) # without the screen the pipeline would return every equity
    sector = morningstar.asset_classification.morningstar_sector_code.latest
    pipe.add(sector, 'sector')
    return pipe

def initialize(context):
    set_commission(commission.PerShare(cost=0.00, min_trade_cost=0.00))
    context.dont_buys = security_lists.restrict_leveraged_etfs
    # strategy specific variables
    context.hedge_ratio_lookback   = 20 # used for regression
    context.z_window               = 20 # used for zscore calculation, must be <= lookback
    context.universe_size          = 500 # number of stock to select everyday for cointegration test
    context.cointegration_loopback = 252 # how many days to use in cointegration test
    context.stock_pairs = Pairs( [] ) # dynamic pair universe

    attach_pipeline(make_pipeline(context), 'universe')   
    schedule_function(func=clear_expired_pairs, date_rule=date_rules.every_day(), time_rule=time_rules.market_open())
    schedule_function(func=check_pair_status, date_rule=date_rules.every_day(), time_rule=time_rules.market_close(hours=3))

def before_trading_start(context, data):
    results = pipeline_output('universe')     
    results = results.drop(context.dont_buys, axis=0, errors='ignore')
    new_universe = list(results.index)
    # Remove pairs that are no longer cointegrated
    current_pair_universe = context.stock_pairs.get_security_list()
    prices = data.history(current_pair_universe, 'price', context.cointegration_loopback, '1d')
    pairs_to_clear = []
    for i, pair in context.stock_pairs.pairs.items():
        out = coint(prices[pair.y], prices[pair.x])
        score = out[0]
        pvalue = out[1]
        if pvalue >= 0.05: # this threshold is higher than the one used to enter a pair, to avoid excessive turnover
            # pair is no longer cointegrated
            pairs_to_clear.append(i)
    for i in pairs_to_clear:
        del context.stock_pairs.pairs[i]

    # Find new cointegrated pairs on stocks that are not in current pairs
    search_universe = list(set(new_universe) - set(context.stock_pairs.get_security_list()))
    prices = data.history(search_universe, 'price', context.cointegration_loopback, '1d')
    num_stocks = len(prices.columns)
    pairs = {}
    for y in range(num_stocks):
        for x in range(y+1, num_stocks):
            stocky = prices.columns[y]
            stockx = prices.columns[x]        
            # force pair to be in the same sector
            if results['sector'][stocky] != results['sector'][stockx]:
                continue
            py = prices[stocky]
            px = prices[stockx]
            out = coint(py, px)
            score = out[0]
            pvalue = out[1]
            if pvalue < 0.01:
                spread = (py-px).std()
                key = 1.0/spread # prioritize high spread pairs
                _list = pairs.get(key, list())
                _list.append( (stocky, stockx) )
                pairs[key] = _list

    # Add new pairs to the previous ones: if a stock is present
    # in multiple pairs, the one with lower key in 'pairs' dictionary
    # will be chosen
    prev_secs = set( context.stock_pairs.get_security_list() )
    new_secs = set()
    for key in sorted(pairs.keys()):
        for x,y in pairs[key]:
            xy_set = set([x,y])
            if xy_set & new_secs or xy_set & prev_secs:
                continue
            new_secs |= xy_set
            context.stock_pairs.add_pair( Pair(y, x) )

    # log/record some info
    new_pairs     = len(new_secs)/2
    expired_pairs = len(pairs_to_clear)
    turnover      = new_pairs+expired_pairs
    num_pairs     = context.stock_pairs.get_num_pairs()
    num_pos       = len(context.portfolio.positions)
    print 'Basket of stocks %d num_pos %d num_pairs %d new_pairs %d expired_pairs %d turnover %d' % (len(new_universe), num_pos, num_pairs, new_pairs, expired_pairs, turnover)

def clear_expired_pairs(context, data):   
    pair_security_list = context.stock_pairs.get_security_list()
    for s in context.portfolio.positions:
        if s not in pair_security_list and data.can_trade(s):
            order_target_percent(s, 0)
def check_pair_status(context, data):
    if get_open_orders():
        return
    pair_security_list = context.stock_pairs.get_security_list()
    prices = data.history(pair_security_list, 'price', context.hedge_ratio_lookback, '1d')
    orders = {}
    for i, pair in context.stock_pairs.pairs.items():

        Y = prices[pair.y]
        X = prices[pair.x]

        try:
            hedge = hedge_ratio(Y, X, add_const=True)
        except (IndexError, ValueError):
            # not enough data or a degenerate series: skip this pair
            continue
        new_spread = Y[-1] - hedge * X[-1]
        pair.spread = np.append(pair.spread, new_spread)[-context.z_window:]
        #pair.hedgeRatioTS = np.append(pair.hedgeRatioTS, hedge)[-context.z_window:]

        if pair.spread.shape[0] >= context.z_window:
            # Keep only the z-score lookback period
            spreads = pair.spread[-context.z_window:]

            zscore = (spreads[-1] - spreads.mean()) / spreads.std()

            if pair.inShort and zscore < 0.0 and all(data.can_trade([pair.y,pair.x])):
                pair.inShort = False
                pair.inLong = False
                orders[i] = (0, 0)

            elif pair.inLong and zscore > 0.0 and all(data.can_trade([pair.y,pair.x])):
                pair.inShort = False
                pair.inLong = False
                orders[i] = (0, 0)

            elif zscore < -1.0 and (not pair.inLong) and all(data.can_trade([pair.y,pair.x])):
                # Only trade if NOT already in a trade
                y_target_shares = 1
                X_target_shares = -hedge
                pair.inLong = True
                pair.inShort = False

                (y_target_pct, x_target_pct) = computeHoldingsPct( y_target_shares,X_target_shares, Y[-1], X[-1] )
                orders[i] = (y_target_pct, x_target_pct)
            elif zscore > 1.0 and (not pair.inShort) and all(data.can_trade([pair.y,pair.x])):
                # Only trade if NOT already in a trade
                y_target_shares = -1
                X_target_shares = hedge
                pair.inShort = True
                pair.inLong = False

                (y_target_pct, x_target_pct) = computeHoldingsPct( y_target_shares, X_target_shares, Y[-1], X[-1] )
                orders[i] = (y_target_pct, x_target_pct)
    # rebalance
    active_pairs = sum( [(p.inLong or p.inShort) for p in context.stock_pairs.pairs.values()] )
    #active_pairs = context.stock_pairs.get_num_pairs() # this results in under-leverage because not all pairs are traded at any given time
    active_pairs = max(active_pairs, 1) # avoid division by zero when only exit orders remain
    for i, weights in orders.iteritems():
        pair = context.stock_pairs.pairs[i]
        if not data.can_trade(pair.y) or not data.can_trade(pair.x):
            continue
        order_target_percent( pair.y, weights[0] / float(active_pairs) )
        order_target_percent( pair.x, weights[1] / float(active_pairs) )

def hedge_ratio(Y, X, add_const=True):
    if add_const:
        X = sm.add_constant(X)
        model = sm.OLS(Y, X).fit()
        return model.params[1]
    model = sm.OLS(Y, X).fit()
    return model.params.values
def computeHoldingsPct(yShares, xShares, yPrice, xPrice):
    yDol = yShares * yPrice
    xDol = xShares * xPrice
    notionalDol =  abs(yDol) + abs(xDol)
    y_target_pct = yDol / notionalDol
    x_target_pct = xDol / notionalDol
    return (y_target_pct, x_target_pct)

I made a YouTube video about cointegration: