pipeline pair cointegration notebook (3700 cointegrated pairs)

Combined Delaney's "Researching a Pairs Trading Strategy" notebook with a simple pipeline I found somewhere (I can't figure out where I got it right now, but shoutout to the unknown stranger for a good first-pass pipeline).

The pipeline returns about 1000 securities that, for a variety of reasons, are good for trading. Of these, I check for pairwise cointegration over the last year whenever two securities belong to the same Morningstar sector. It takes about 15 to 20 minutes to run and yields 3700 pairs with p-values less than 5%.

I haven't gotten around to building a trading strategy based on this system yet, but I thought the heat map looked really cool, so it's worth posting here even if only for that reason.

6 responses

The difficulty with this is that p-values of 5% are not really significant once you've drawn from so many trials: with ~1000 securities there are on the order of 500,000 candidate pairs (fewer after the same-sector restriction), so at a 5% threshold you'd expect tens of thousands of "cointegrated" pairs by chance alone.

Well, I'm not really as clued up on significance and critical values as I would like to be, but looking at the coint() docs and the MacKinnon paper they reference for their critical-value determination, it appears the critical value for each test is determined solely by the number of variables being cointegrated (i.e. 2 in each case) and the number of samples used for the test (i.e. the number of trading days in the year, which is around 250).

All of which is to say, I think, quoting the docs, that "The Null hypothesis is that there is no cointegration, the alternative hypothesis is that there is cointegrating relationship", and that this hypothesis applies only to the pair being tested rather than to the overall 1000-stock / ~500,000-pair system.

As I said, though, this really isn't my area of expertise, so my interpretation of the results I've obtained could very well be way off.

My understanding is that if I wished to compare these results across the entire system, I would need to record the scores from each pair's test and then calculate critical values from the mean and standard deviation of the scores for the entire universe. But even that is probably not useful, as it would be entirely in-sample, so in reality I would probably have to replicate MacKinnon's simulation-based approach to determining critical values for a system of this size before I could actually interpret the scores across the entire system.
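That said, a much cheaper standard fix than re-running MacKinnon-style simulations is to adjust the batch of p-values for multiple comparisons, e.g. with statsmodels' multipletests (the p-values below are toy numbers, not actual scan output):

```python
import numpy as np
from statsmodels.stats.multitest import multipletests

# Hypothetical p-values from a batch of pairwise coint() tests.
pvalues = np.array([0.001, 0.004, 0.03, 0.20, 0.45])

# Benjamini-Hochberg ('fdr_bh') controls the false discovery rate across
# the whole batch; method='bonferroni' controls family-wise error and is
# much stricter (it would matter a lot with ~500,000 tests).
reject, p_adjusted, _, _ = multipletests(pvalues, alpha=0.05, method='fdr_bh')
print(reject)
```

With hundreds of thousands of tests, surviving a correction like this is a far more honest bar than a raw 5% cutoff.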

Despite all that, my plan was to select random pairs from the 3700, as my trading strategy needs additional pairs. I would then check for, and only trade a pair if I find, a short-term (say 3- or 6-month) cointegration using minute data.

Please keep posting about this strategy; I tried something similar a few years ago and found that most of the pairs were either garbage (spurious and/or in-sample only) or perfect (because they were ETFs on the same underlyings) and therefore too low in variance to profit from on Quantopian. But I am curious! I know some people do it successfully (I won't name names), but they are quiet about how they pick their pairs.

Out of curiosity I built an algorithm that trades those cointegrated pairs. While the current pair-selection criterion is trivial and the results disappointing, as expected, it would be fairly easy to swap in smarter pair-selection logic while reusing the algorithm code. In the hope it can be useful to someone else, here is the algorithm.

[Backtest attached; performance metrics unavailable in this export.]
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.data import morningstar
from quantopian.pipeline.factors import CustomFactor, SimpleMovingAverage, Latest, AverageDollarVolume
from quantopian.pipeline.filters.morningstar import IsPrimaryShare
from quantopian.pipeline import factors, filters, classifiers

import pytz
import pandas as pd
import numpy as np
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint

class Pairs(object):

    def __init__(self, pairs):
        self.map = { id:pair for id, pair in enumerate(pairs) }

    def add_pair(self, pair):
        next_id = max(self.map) + 1 if self.map else 0
        self.map[next_id] = pair

    def get_num_pairs(self):
        return len(self.map)

    def get_security_list(self):
        security_list = []
        for pair in self.map.itervalues():
            security_list.append(pair.y)
            security_list.append(pair.x)
        return security_list

class Pair(object):

    def __init__(self, y, x):
        self.y = y
        self.x = x
        self.inLong  = False
        self.inShort = False
        #self.hedgeRatioTS = np.empty(0)

def high_volume_universe(top_liquid, min_price = None, min_volume = None):
    """
    Computes a security universe of the top_liquid most liquid stocks,
    optionally filtering out those below a minimum price or volume.

    Returns
    -------
    A pipeline filter describing the universe.
    """

    full_filter = filters.make_us_equity_universe(
        target_size=top_liquid,
        rankby=factors.AverageDollarVolume(window_length=200),
        groupby=classifiers.morningstar.Sector(),
        max_group_weight=0.3,
        smoothing_func=lambda f: f.downsample('month_start'),
    )

    if min_price > 0:
        # 21-day average (the original call was truncated; window reconstructed)
        price = SimpleMovingAverage(inputs=[USEquityPricing.close],
                                    window_length=21, mask=full_filter)
        full_filter &= (price >= min_price)

    if min_volume > 0:
        volume = SimpleMovingAverage(inputs=[USEquityPricing.volume],
                                     window_length=21, mask=full_filter)
        full_filter &= (volume >= min_volume)

    return full_filter

def make_pipeline(context):
    full_filter = high_volume_universe(top_liquid=context.universe_size)
    pipe = Pipeline()
    pipe.set_screen(full_filter)
    sector = morningstar.asset_classification.morningstar_sector_code.latest
    pipe.add(sector, 'sector') # used later to restrict pairs to a single sector
    return pipe

def initialize(context):

    # strategy specific variables
    context.hedge_ratio_lookback   = 20  # used for regression
    context.z_window               = 20  # used for zscore calculation, must be <= lookback
    context.universe_size          = 500 # number of stocks to select every day for the cointegration test
    context.cointegration_loopback = 252 # how many days to use in the cointegration test

    context.stock_pairs = Pairs( [] ) # dynamic pair universe

    attach_pipeline(make_pipeline(context), 'universe')

    schedule_function(func=clear_expired_pairs, date_rule=date_rules.every_day(), time_rule=time_rules.market_open())
    schedule_function(func=check_pair_status, date_rule=date_rules.every_day(), time_rule=time_rules.market_close(hours=3))

def before_trading_start(context, data):

    results = pipeline_output('universe')

    new_universe = list(results.index)

    #
    # Remove pairs that are no longer cointegrated
    #
    current_pair_universe = context.stock_pairs.get_security_list()
    pairs_to_clear = []
    if current_pair_universe:
        prices = data.history(current_pair_universe, 'price', context.cointegration_loopback, '1d')
        for i, pair in context.stock_pairs.map.items():
            out = coint(prices[pair.y], prices[pair.x])
            score = out[0]
            pvalue = out[1]
            if pvalue >= 0.05: # this threshold is higher than the one used to enter a pair, to avoid excessive turnover
                # pair no longer cointegrated
                pairs_to_clear.append(pair)
                del context.stock_pairs.map[i]

    #
    # Find new cointegrated pairs on stocks that are not in current pairs
    #
    search_universe = list(set(new_universe) - set(context.stock_pairs.get_security_list()))
    prices = data.history(search_universe, 'price', context.cointegration_loopback, '1d')
    num_stocks = len(prices.columns)
    pairs = {}
    for y in range(num_stocks):
        for x in range(y+1, num_stocks):
            stocky = prices.columns[y]
            stockx = prices.columns[x]
            # force pair to be in the same sector
            if results['sector'][stocky] != results['sector'][stockx]:
                continue
            py = prices[stocky]
            px = prices[stockx]
            out = coint(py, px)
            score = out[0]
            pvalue = out[1]
            if pvalue < 0.01:
                # key candidate pairs by p-value so that, below, pairs
                # with lower p-values are chosen first
                key = pvalue
                _list = pairs.get(key, list())
                _list.append( (stocky, stockx) )
                pairs[key] = _list

    #
    # Add new pairs to the previous ones: if a stock is present
    # in multiple pairs, the one with lower key in 'pairs' dictionary
    # will be chosen
    #
    prev_secs = set( context.stock_pairs.get_security_list() )
    new_secs = set()
    for key in sorted(pairs.keys()):
        for y, x in pairs[key]:
            xy_set = set([x, y])
            if xy_set & new_secs or xy_set & prev_secs:
                continue
            new_secs |= xy_set
            context.stock_pairs.add_pair( Pair(y, x) )

    #
    # log/record some info
    #
    new_pairs     = len(new_secs)/2
    expired_pairs = len(pairs_to_clear)
    turnover      = new_pairs+expired_pairs
    num_pairs     = context.stock_pairs.get_num_pairs()
    num_pos       = len(context.portfolio.positions)

    print 'Basket of stocks %d num_pos %d num_pairs %d new_pairs %d expired_pairs %d turnover %d' % (len(new_universe), num_pos, num_pairs, new_pairs, expired_pairs, turnover)

    record(leverage=context.account.leverage,
           exposure=context.account.net_leverage,
           #new_pairs=new_pairs,
           #expired_pairs=expired_pairs,
           turnover=turnover,
           num_pairs=num_pairs,
           num_pos=num_pos)

def clear_expired_pairs(context, data):
    pair_security_list = context.stock_pairs.get_security_list()
    for s in context.portfolio.positions:
        if s not in pair_security_list and data.can_trade(s):
            order_target_percent(s, 0)

def check_pair_status(context, data):

    if get_open_orders():
        return

    pair_security_list = context.stock_pairs.get_security_list()
    prices = data.history(pair_security_list, 'price', context.hedge_ratio_lookback, '1d')

    orders = {}
    for i, pair in context.stock_pairs.map.iteritems():

        Y = prices[pair.y]
        X = prices[pair.x]

        try:
            hedge = hedge_ratio(Y, X, add_const=True)
        except (IndexError, ValueError) as e:
            log.error(e)
            continue

        new_spread = Y[-1] - hedge * X[-1]

        #pair.hedgeRatioTS = np.append(pair.hedgeRatioTS, hedge)[-context.z_window:]

        # Keep only the z-score lookback period (reconstructed: z-score of
        # the latest spread against the trailing spread distribution)
        spreads = (Y - hedge * X)[-context.z_window:]
        zscore = (new_spread - spreads.mean()) / spreads.std()

        if pair.inShort and zscore < 0.0 and all(data.can_trade([pair.y,pair.x])):
            pair.inShort = False
            pair.inLong = False
            orders[i] = (0, 0)

        elif pair.inLong and zscore > 0.0 and all(data.can_trade([pair.y,pair.x])):
            pair.inShort = False
            pair.inLong = False
            orders[i] = (0, 0)

        elif zscore < -1.0 and (not pair.inLong) and all(data.can_trade([pair.y,pair.x])):
            y_target_shares = 1
            X_target_shares = -hedge
            pair.inLong = True
            pair.inShort = False

            (y_target_pct, x_target_pct) = computeHoldingsPct( y_target_shares, X_target_shares, Y[-1], X[-1] )
            orders[i] = (y_target_pct, x_target_pct)

        elif zscore > 1.0 and (not pair.inShort) and all(data.can_trade([pair.y,pair.x])):
            y_target_shares = -1
            X_target_shares = hedge
            pair.inShort = True
            pair.inLong = False

            (y_target_pct, x_target_pct) = computeHoldingsPct( y_target_shares, X_target_shares, Y[-1], X[-1] )
            orders[i] = (y_target_pct, x_target_pct)

    #
    # rebalance
    #
    active_pairs = sum( [(p.inLong or p.inShort) for p in context.stock_pairs.map.itervalues()] )
    #active_pairs = context.stock_pairs.get_num_pairs() # this results in under-leverage because not all pairs are traded at any given time
    for i, weights in orders.iteritems():
        pair = context.stock_pairs.map[i]
        if active_pairs == 0:
            # every pair is flat: just close out any remaining positions
            order_target_percent( pair.y, 0 )
            order_target_percent( pair.x, 0 )
            continue
        order_target_percent( pair.y, weights[0] / float(active_pairs) )
        order_target_percent( pair.x, weights[1] / float(active_pairs) )

def hedge_ratio(Y, X, add_const=True):
    if add_const:
        X = sm.add_constant(X)
        model = sm.OLS(Y, X).fit()
        return model.params[1]
    model = sm.OLS(Y, X).fit()
    return model.params.values

def computeHoldingsPct(yShares, xShares, yPrice, xPrice):
    yDol = yShares * yPrice
    xDol = xShares * xPrice
    notionalDol = abs(yDol) + abs(xDol)
    y_target_pct = yDol / notionalDol
    x_target_pct = xDol / notionalDol
    return (y_target_pct, x_target_pct)
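A quick standalone sanity check of the position sizing: computeHoldingsPct just normalizes the two dollar legs by gross notional, so the absolute weights of a pair always sum to 1 (the function is copied out of the algorithm so it runs outside Quantopian; the prices are made up):

```python
def computeHoldingsPct(yShares, xShares, yPrice, xPrice):
    # Normalize each leg's dollar value by the pair's gross notional.
    yDol = yShares * yPrice
    xDol = xShares * xPrice
    notionalDol = abs(yDol) + abs(xDol)
    return (yDol / notionalDol, xDol / notionalDol)

# Long 1 share of Y at $100, short 0.5 shares of X (the hedge ratio) at $50:
y_pct, x_pct = computeHoldingsPct(1, -0.5, 100.0, 50.0)
print(y_pct, x_pct)  # 0.8 -0.2
```

Dividing these weights by `active_pairs`, as check_pair_status does, then spreads that unit of gross exposure evenly across the pairs currently in a trade.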
