from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.data import morningstar
from quantopian.pipeline.factors import CustomFactor, SimpleMovingAverage, Latest,AverageDollarVolume
from quantopian.pipeline.filters.morningstar import IsPrimaryShare
from quantopian.pipeline import factors, filters, classifiers
import pytz
import pandas as pd
import numpy as np
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint
class Pairs(object):
def __init__(self, pairs):
self.map = { id:pair for id, pair in enumerate(pairs) }
def add_pair(self, pair):
next_id = max(self.map) + 1 if self.map else 0
self.map[next_id] = pair
def get_num_pairs(self):
return len(self.map)
def get_security_list(self):
security_list = []
for pair in self.map.itervalues():
security_list.append(pair.y)
security_list.append(pair.x)
return security_list
class Pair(object):
def __init__(self, y, x):
self.y = y
self.x = x
self.inLong = False
self.inShort = False
self.spread = np.empty(0)
#self.hedgeRatioTS = np.empty(0)
def high_volume_universe(top_liquid, min_price = None, min_volume = None):
"""
Computes a security universe of liquid stocks and filtering out
hard to trade ones
Returns
-------
high_volume_tradable - zipline.pipeline.filter
"""
full_filter = filters.make_us_equity_universe(
target_size=top_liquid,
rankby=factors.AverageDollarVolume(window_length=200),
mask=filters.default_us_equity_universe_mask(),
groupby=classifiers.morningstar.Sector(),
max_group_weight=0.3,
smoothing_func=lambda f: f.downsample('month_start'),
)
if min_price > 0:
price = SimpleMovingAverage(inputs=[USEquityPricing.close],
window_length=21, mask=full_filter)
full_filter &= (price >= min_price)
if min_volume > 0:
volume = SimpleMovingAverage(inputs=[USEquityPricing.volume],
window_length=21, mask=full_filter)
full_filter &= (volume >= min_volume)
return full_filter
def make_pipeline(context):
full_filter = high_volume_universe(top_liquid=context.universe_size)
pipe = Pipeline()
pipe.set_screen(full_filter)
sector = morningstar.asset_classification.morningstar_sector_code.latest
pipe.add(sector, 'sector')
return pipe
def initialize(context):
set_slippage(slippage.FixedSlippage(spread=0))
set_commission(commission.PerShare(cost=0.00, min_trade_cost=0.00))
context.dont_buys = security_lists.restrict_leveraged_etfs
set_asset_restrictions(context.dont_buys)
# strategy specific variables
context.hedge_ratio_lookback = 20 # used for regression
context.z_window = 20 # used for zscore calculation, must be <= lookback
context.universe_size = 500 # number of stock to select everyday for cointegration test
context.cointegration_loopback = 252 # how many days to use in cointegration test
context.stock_pairs = Pairs( [] ) # dynamic pair universe
attach_pipeline(make_pipeline(context), 'universe')
schedule_function(func=clear_expired_pairs, date_rule=date_rules.every_day(), time_rule=time_rules.market_open())
schedule_function(func=check_pair_status, date_rule=date_rules.every_day(), time_rule=time_rules.market_close(hours=3))
def before_trading_start(context, data):
results = pipeline_output('universe')
results = results.drop(context.dont_buys, axis=0, errors='ignore')
new_universe = list(results.index)
#
# Remove pairs that are no longer cointegrated
#
current_pair_universe = context.stock_pairs.get_security_list()
prices = data.history(current_pair_universe, 'price', context.cointegration_loopback, '1d')
pairs_to_clear = []
for i, pair in context.stock_pairs.map.items():
out = coint(prices[pair.y], prices[pair.x])
score = out[0]
pvalue = out[1]
if pvalue >= 0.05: #this threashold is higher than the one used to enter a pair, to avoid excessive turnover
# pair no more cointegrated
pairs_to_clear.append(pair)
del context.stock_pairs.map[i]
#
# Find new cointegrated pairs on stocks that are not in current pairs
#
search_universe = list(set(new_universe) - set(context.stock_pairs.get_security_list()))
prices = data.history(search_universe, 'price', context.cointegration_loopback, '1d')
num_stocks = len(prices.columns)
pairs = {}
for y in range(num_stocks):
for x in range(y+1, num_stocks):
stocky = prices.columns[y]
stockx = prices.columns[x]
# force pair to be in the same sector
if results['sector'][stocky] != results['sector'][stockx]:
continue
py = prices[stocky]
px = prices[stockx]
out = coint(py, px)
score = out[0]
pvalue = out[1]
if pvalue < 0.01:
spread = (py-px).std()
key = 1.0/spread # prioritize high spread pairs
_list = pairs.get(key, list())
_list.append( (stocky, stockx) )
pairs[key] = _list
#
# Add new pairs to the previous ones: if a stock is present
# in multiple pairs, the one with lower key in 'pairs' dictionary
# will be chosen
#
prev_secs = set( context.stock_pairs.get_security_list() )
new_secs = set()
for key in sorted(pairs.keys()):
for x,y in pairs[key]:
xy_set = set([x,y])
if xy_set&new_secs or xy_set&prev_secs:
continue
new_secs |= xy_set
context.stock_pairs.add_pair( Pair(y, x) )
#
# log/record some info
#
new_pairs = len(new_secs)/2
expired_pairs = len(pairs_to_clear)
turnover = new_pairs+expired_pairs
num_pairs = context.stock_pairs.get_num_pairs()
num_pos = len(context.portfolio.positions)
print 'Basket of stocks %d num_pos %d num_pairs %d new_pairs %d expired_pairs %d turnover %d' % (len(new_universe), num_pos, num_pairs, new_pairs, expired_pairs, turnover)
record(leverage=context.account.leverage,
exposure=context.account.net_leverage,
#new_pairs=new_pairs,
#expired_pairs=expired_pairs,
turnover=turnover,
num_pairs=num_pairs,
num_pos=num_pos)
def clear_expired_pairs(context, data):
pair_security_list = context.stock_pairs.get_security_list()
for s in context.portfolio.positions:
if s not in pair_security_list and data.can_trade(s):
order_target_percent(s, 0)
def check_pair_status(context, data):
if get_open_orders():
return
pair_security_list = context.stock_pairs.get_security_list()
prices = data.history(pair_security_list, 'price', context.hedge_ratio_lookback, '1d')
orders = {}
for i, pair in context.stock_pairs.map.iteritems():
Y = prices[pair.y]
X = prices[pair.x]
try:
hedge = hedge_ratio(Y, X, add_const=True)
except (IndexError, ValueError) as e:
log.error(e)
continue
new_spread = Y[-1] - hedge * X[-1]
pair.spread = np.append(pair.spread, new_spread)[-context.z_window:]
#pair.hedgeRatioTS = np.append(pair.hedgeRatioTS, hedge)[-context.z_window:]
if pair.spread.shape[0] >= context.z_window:
# Keep only the z-score lookback period
spreads = pair.spread[-context.z_window:]
zscore = (spreads[-1] - spreads.mean()) / spreads.std()
if pair.inShort and zscore < 0.0 and all(data.can_trade([pair.y,pair.x])):
pair.inShort = False
pair.inLong = False
orders[i] = (0, 0)
elif pair.inLong and zscore > 0.0 and all(data.can_trade([pair.y,pair.x])):
pair.inShort = False
pair.inLong = False
orders[i] = (0, 0)
elif zscore < -1.0 and (not pair.inLong) and all(data.can_trade([pair.y,pair.x])):
# Only trade if NOT already in a trade
y_target_shares = 1
X_target_shares = -hedge
pair.inLong = True
pair.inShort = False
(y_target_pct, x_target_pct) = computeHoldingsPct( y_target_shares,X_target_shares, Y[-1], X[-1] )
orders[i] = (y_target_pct, x_target_pct)
elif zscore > 1.0 and (not pair.inShort) and all(data.can_trade([pair.y,pair.x])):
# Only trade if NOT already in a trade
y_target_shares = -1
X_target_shares = hedge
pair.inShort = True
pair.inLong = False
(y_target_pct, x_target_pct) = computeHoldingsPct( y_target_shares, X_target_shares, Y[-1], X[-1] )
orders[i] = (y_target_pct, x_target_pct)
#
# rebalance
#
active_pairs = sum( [(p.inLong or p.inShort) for p in context.stock_pairs.map.itervalues()] )
#active_pairs = context.stock_pairs.get_num_pairs() # this results in under-leverage because not all pairs are traded at any given time
for i, weights in orders.iteritems():
pair = context.stock_pairs.map[i]
if not data.can_trade(pair.y) or not data.can_trade(pair.x):
continue
order_target_percent( pair.y, weights[0] / float(active_pairs) )
order_target_percent( pair.x, weights[1] / float(active_pairs) )
def hedge_ratio(Y, X, add_const=True):
if add_const:
X = sm.add_constant(X)
model = sm.OLS(Y, X).fit()
return model.params[1]
model = sm.OLS(Y, X).fit()
return model.params.values
def computeHoldingsPct(yShares, xShares, yPrice, xPrice):
yDol = yShares * yPrice
xDol = xShares * xPrice
notionalDol = abs(yDol) + abs(xDol)
y_target_pct = yDol / notionalDol
x_target_pct = xDol / notionalDol
return (y_target_pct, x_target_pct)