from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.filters import Q500US, make_us_equity_universe
from quantopian.pipeline.data.morningstar import asset_classification, valuation
from itertools import combinations
import statsmodels.api as statm
from statsmodels.tsa.stattools import adfuller
import pandas as pd
from scipy import odr
class PairsTradingAlgo(object):
    """Same-industry pairs trader driven by a Quantopian ``context``.

    Reads ``context.assets`` (pipeline output with an ``industry`` column),
    ``context.price_hist`` (daily prices, one column per asset) and
    ``context.portfolio``.  Each call to ``trade`` re-checks held pairs,
    searches every industry for cointegrated pairs, and opens new pairs
    sized from available cash.
    """

    # Tunable parameters (class attributes, shared by every instance).
    population_size = 300    # target universe size requested by pipeline_data()
    max_pairs = 20           # cap on simultaneously held pairs
    stationary_window = 63   # daily bars of price history loaded per asset
    signi_level = 0.05       # ADF p-value below which a series counts as stationary
    deviation_cutoff = 1.    # spread deviation (in std devs) required to open a pair
    close_cutoff = 0.25      # a held pair is closed at or below this deviation
    stop_loss = 6.0          # a held pair is closed at or above this deviation
    margin = 0.1725          # per-pair cash allocation is divided by this

    def __init__(self, context):
        self.context = context
        self.pair_pos = []   # asset pairs currently held, in the order they were opened

    def get_pairs(self):
        """Return all same-industry asset pairs that look cointegrated.

        Assets are first filtered with ``check_Int1``.  Within every industry
        group holding at least two survivors, pairs are formed from the
        group's assets sorted by ``sid`` and kept when ``is_cointegrated``
        is truthy.
        """
        universe = self.context.assets
        survivors = [self.check_Int1(asset) for asset in universe.index]
        found = []
        for _, members in universe[survivors].groupby("industry"):
            if members.shape[0] <= 1:
                continue
            ordered = sorted(members.index.tolist(), key=lambda a: a.sid)
            found.extend(
                candidate
                for candidate in combinations(ordered, 2)
                if self.is_cointegrated(candidate)
            )
        return found

    def get_weight(self, pair, dev_cutoff, dev_ceil = 3):
        """Return a signed weight for ``pair`` from today's spread deviation.

        The spread is price(pair[0]) - price(pair[1]) over ``price_hist``; its
        deviation is |current spread - mean| / std.  Returns 0.0 unless the
        spread std is at most 10% of the larger current price and
        ``dev_cutoff < deviation < dev_ceil``.  Otherwise returns minus the ODR
        slope when the current spread is above its mean, else 1.0.  The sign
        sets trade direction; the magnitude only ranks candidates.
        Relies on ``self.data`` having been set by ``trade``.
        """
        leg_x, leg_y = pair[0], pair[1]
        history = self.context.price_hist
        data_x = history[leg_x]
        data_y = history[leg_y]
        spread = data_x - data_y
        mean_diff = spread.mean()
        sd_diff = spread.std()

        price1 = self.data.current(leg_x, "price")
        price2 = self.data.current(leg_y, "price")
        cur_diff = price1 - price2
        cur_deviation = abs(cur_diff - mean_diff) / sd_diff

        in_band = (
            sd_diff / max(price1, price2) <= 0.1
            and dev_cutoff < cur_deviation < dev_ceil
        )
        if not in_band:
            return 0.0
        if cur_diff > mean_diff:
            # Spread is rich: negative weight, scaled by the fitted slope.
            return -1.0 * self.get_betas(data_x, data_y)[0]
        return 1.0

    def check_Int1(self, asset):
        """Return truthy when the asset's daily price differences are complete and stationary.

        Only the differences are tested (no NaNs + ADF); the price level
        itself is not checked for non-stationarity.
        """
        increments = self.context.price_hist[asset].diff().iloc[1:]
        return increments.notnull().all() and self.is_stationary(increments)

    def is_stationary(self, data_x):
        """Return whether the ADF test p-value of ``data_x`` is below ``signi_level``."""
        p_value = adfuller(data_x)[1]
        return p_value < self.signi_level

    def get_betas(self, data_x, data_y):
        """Fit ``y = B[0] * x + B[1]`` by orthogonal distance regression.

        The fit is seeded with the OLS slope and intercept, and each series'
        own standard deviation is used as its measurement error.  Returns the
        ODR ``beta`` array ``[slope, intercept]``.
        """
        design = statm.add_constant(data_x)
        const, beta = statm.OLS(data_y, design).fit().params

        def line(B, x):
            return B[0] * x + B[1]

        observed = odr.RealData(data_x, data_y, sx = data_x.std(), sy = data_y.std())
        return odr.ODR(observed, odr.Model(line), beta0 = [beta, const]).run().beta

    def is_cointegrated(self, pair):
        """Return truthy when the ODR slope lies in [0, 2] and the fitted residual is stationary."""
        history = self.context.price_hist
        data_x = history[pair[0]]
        data_y = history[pair[1]]
        slope, intercept = self.get_betas(data_x, data_y)
        # Short-circuit: the residual ADF test only runs for acceptable slopes.
        return (0 <= slope <= 2) and self.is_stationary(data_y - slope * data_x - intercept)

    def trade(self, data):
        """Run one trading step.

        Held pairs whose weight (with ``close_cutoff``/``stop_loss`` bounds)
        is zero are closed.  Then, if there is room, new candidate pairs are
        ranked by |weight|, and pairs sharing an asset with a held or
        higher-ranked pair are dropped.  The best remaining pairs are opened
        with equal cash value.
        """
        self.data = data

        still_open = []
        for pair in self.pair_pos:
            if self.get_weight(pair, self.close_cutoff, self.stop_loss):
                still_open.append(pair)
            else:
                self.close_pair(pair)
        self.pair_pos = still_open
        if len(self.pair_pos) == self.max_pairs:
            return

        pairs = self.get_pairs()
        if not pairs:
            return
        weights = [self.get_weight(pair, self.deviation_cutoff) for pair in pairs]

        def ranked():
            # Indices of non-zero weights, largest |weight| first (stable for ties).
            order = sorted(range(len(weights)), key=lambda k: abs(weights[k]), reverse=True)
            return [k for k in order if weights[k]]

        # Each asset may belong to at most one pair.
        taken = [asset for held in self.pair_pos for asset in held]
        for k in ranked():
            first, second = pairs[k]
            if first in taken or second in taken:
                weights[k] = 0
            else:
                taken.extend((first, second))

        value = self.context.portfolio.cash / self.max_pairs / self.margin
        free_slots = self.max_pairs - len(self.pair_pos)
        for k in ranked()[:free_slots]:
            signed_value = value if weights[k] > 0 else -value
            self.order_pair(pairs[k], signed_value)
            self.pair_pos.append(pairs[k])

    def order_pair(self, pair, value):
        """Target ``value`` in pair[0] and ``-value`` in pair[1]."""
        leg_x, leg_y = pair[0], pair[1]
        order_target_value(leg_x, value)
        order_target_value(leg_y, -value)

    def close_pair(self, pair):
        """Flatten both legs of ``pair``."""
        for leg in (pair[0], pair[1]):
            order_target_percent(leg, 0)
def initialize(context):
    """Quantopian entry point: attach the pipeline, build the trader and schedule callbacks.

    ``trade`` runs once a day one hour after the open.  ``pvr`` and then
    ``track_orders`` are scheduled for every minute 1..390 after the open;
    all ``pvr`` slots are registered before all ``track_orders`` slots.
    """
    attach_pipeline(pipeline_data(), 'assets')
    context.pairs_trading_algo = PairsTradingAlgo(context)
    context.price_hist = None   # populated by before_trading_start()

    every_day = date_rules.every_day
    schedule_function(trade, every_day(), time_rules.market_open(hours = 1))
    for per_minute in (pvr, track_orders):
        for minute in range(1, 391):
            schedule_function(per_minute, every_day(), time_rules.market_open(minutes = minute))
    # To limit partial fills, cancel outstanding orders shortly after the daily trade:
    #schedule_function(cncl_oos, date_rules.every_day(), time_rules.market_open(minutes=61))
def cncl_oos(context, data):
    """Cancel every currently open order.

    Primarily used to prevent the logging of unfilled orders at end of day;
    it can also be scheduled at any time to limit partial fills.
    """
    open_orders = get_open_orders()
    for pending_list in open_orders.values():
        for pending in pending_list:
            cancel_order(pending.id)
def _order_style(o):
    """Return the order-type suffix for log lines.

    Returns ' stop X limit Y', ' stop X' or ' limit Y', or '' for a plain
    market order.
    """
    if o.stop:
        if o.limit:
            return ' stop {} limit {}'.format(o.stop, o.limit)
        return ' stop {}'.format(o.stop)
    if o.limit:
        return ' limit {}'.format(o.limit)
    return ''


def track_orders(context, data):
    ''' Show orders when made and filled.
           Info: https://www.quantopian.com/posts/track-orders
           for https://www.quantopian.com/posts/hanyang-univ-team-3-results
           ... I added leverage to the output here.

    Known orders are kept in context.trac as {order_id: previously filled
    amount}.  Each call logs fills, cancels and unfilled orders for known
    ids, then registers and logs any new open orders.
    '''
    c = context
    if 'trac' not in c:
        c.t_opts = {        # __________    O P T I O N S    __________
            #'symbols'     : ['LMT', 'WB', 'KMG'],
            'symbols'     : [],  # List of symbols to filter for, like ['TSLA', 'SPY']
            'log_neg_cash': 1,   # Show cash only when negative.
            'log_cash'    : 1,   # Show cash values in logging window or not.
            'log_ids'     : 1,   # Include order id's in logging window or not.
            'log_unfilled': 1,   # When orders are unfilled. (stop & limit excluded).
            'log_cancels' : 0,   # When orders are canceled.
        }    # Move these to initialize() for better efficiency.
        c.trac = {}
        c.t_dates = {  # To not overwhelm the log window, start/stop dates can be entered.
            'active': 0,
            'start' : [],   # Start dates, option like ['2007-05-07', '2010-04-26']
            'stop'  : []    # Stop  dates, option like ['2008-02-13', '2010-11-15']
        }
    from pytz import timezone as _tz  # Python only does once, makes this portable.
                                      #   Move to top of algo for better efficiency.

    # If 'start' or 'stop' lists have something in them, they toggle logging on/off.
    if c.t_dates['start'] or c.t_dates['stop']:
        _date = str(get_datetime().date())
        if _date in c.t_dates['start']:    # See if there's a match to start
            c.t_dates['active'] = 1
        elif _date in c.t_dates['stop']:   #   ... or to stop
            c.t_dates['active'] = 0
    else:
        c.t_dates['active'] = 1            # Set to active b/c no conditions.
    if c.t_dates['active'] == 0:
        return                             # Skip if not active.

    def _minute():   # To preface each line with the minute of the day.
        bar_dt = get_datetime().astimezone(_tz('US/Eastern'))
        return (bar_dt.hour * 60) + bar_dt.minute - 570  # (-570 = 9:31a)

    def _trac(to_log):   # So all logging comes from the same line number,
        log.info(' {} {} {}'.format(str(_minute()).rjust(3), to_log, '%.1f' % c.account.leverage))  # for vertical alignment in the logging window.

    def _wanted(o):
        # True when no symbol filter is configured or this order's symbol is in it.
        return not c.t_opts['symbols'] or o.sid.symbol in c.t_opts['symbols']

    def _oid(o):
        # Short order id suffix, or '' when ids are not logged.
        return o.id[-4:] if c.t_opts['log_ids'] else ''

    def _cash_str():
        if (c.t_opts['log_neg_cash'] and c.portfolio.cash < 0) or c.t_opts['log_cash']:
            return str(int(c.portfolio.cash))
        return ''

    for oid in c.trac.copy():   # Existing known orders (copy: entries are deleted in the loop)
        o = get_order(oid)
        if o.dt == o.created:
            continue            # No chance of fill yet.
        cash = _cash_str()
        prc = data.current(o.sid, 'price') if data.can_trade(o.sid) else c.portfolio.positions[o.sid].last_sale_price
        if o.status == 2:       # Canceled
            if _wanted(o) and c.t_opts['log_cancels']:
                do = 'Buy' if o.amount > 0 else 'Sell'
                _trac(' Canceled {} {} {}{} at {} {} {}'.format(do, o.amount,
                    o.sid.symbol, _order_style(o), prc, cash, _oid(o)))
            del c.trac[o.id]
        elif o.filled:          # Filled at least some.
            filled = '{}'.format(o.amount)
            filled_amt = 0
            if o.status == 1:   # Complete
                # An earlier partial fill was recorded; abs() so sell orders
                # (negative amounts) get the 'all remaining/total' label too.
                if 0 < abs(c.trac[o.id]) < abs(o.amount):
                    filled = 'all {}/{}'.format(o.filled - c.trac[o.id], o.amount)
                filled_amt = o.filled
            else:               # c.trac[o.id] value is previously filled total
                filled_amt = o.filled - c.trac[o.id]   # filled this time, can be 0
                c.trac[o.id] = o.filled                # save fill value for increments math
                filled = '{}/{}'.format(filled_amt, o.amount)
            if filled_amt:
                pos_amt = c.portfolio.positions[o.sid].amount
                now = ' ({})'.format(pos_amt) if pos_amt else ' _'
                pnl = ''        # for the trade only
                if (pos_amt - o.filled) * o.filled < 0:  # Profit-taking scenario including short-buyback
                    cb = c.portfolio.positions[o.sid].cost_basis
                    if cb:
                        pnl = -filled_amt * (prc - cb)
                        sign = '+' if pnl > 0 else '-'
                        pnl = ' ({}{})'.format(sign, '%.0f' % abs(pnl))
                if o.filled == o.amount:
                    del c.trac[o.id]
                if _wanted(o):
                    _trac(' {} {} {}{} at {}{}{}'.format(
                        'Bot' if o.amount > 0 else 'Sold', filled, o.sid.symbol, now,
                        '%.2f' % prc, pnl, _order_style(o)).ljust(52) + ' {} {}'.format(cash, _oid(o)))
        elif c.t_opts['log_unfilled'] and not (o.stop or o.limit):
            if _wanted(o):
                _trac(' {} {}{} unfilled {}'.format(o.sid.symbol, o.amount,
                    ' limit' if o.limit else '', _oid(o)))

    oo = get_open_orders().values()
    if not oo:
        return                  # No open orders, so nothing new to register.
    cash = _cash_str()
    for oo_list in oo:          # Handle new orders
        for o in oo_list:
            if o.id in c.trac:
                continue        # Only new orders beyond this point
            prc = data.current(o.sid, 'price') if data.can_trade(o.sid) else c.portfolio.positions[o.sid].last_sale_price
            c.trac[o.id] = 0
            pos_amt = c.portfolio.positions[o.sid].amount
            now = ' ({})'.format(pos_amt) if pos_amt else ' _'
            if _wanted(o):
                _trac('{} {} {}{} at {}{}'.format('Buy' if o.amount > 0 else 'Sell',
                    o.amount, o.sid.symbol, now, '%.2f' % prc, _order_style(o)).ljust(52) + ' {} {}'.format(cash, _oid(o)))
def trade(context,data):
    """Scheduled daily callback: hand the current bar data to the pairs trader."""
    algo = context.pairs_trading_algo
    algo.trade(data)
def before_trading_start(context, data):
    """Refresh the pipeline output and roll the daily price-history window.

    On the first call, ``context.price_hist`` is loaded with
    ``PairsTradingAlgo.stationary_window`` daily prices for today's universe.
    On later calls, the oldest row is dropped and one fresh daily bar is
    appended for every tracked column.  Assets that newly appear in the
    universe get a full window added as new columns.  Columns are never
    removed.
    """
    context.assets = pipeline_output( 'assets' )
    window = PairsTradingAlgo.stationary_window

    if context.price_hist is None:
        context.price_hist = data.history(context.assets.index, "price", window, '1d')
    else:
        # NOTE(review): assumes a 1-bar '1d' history here is the latest
        # completed session, so the window slides by exactly one day.
        latest = data.history(context.price_hist.columns, "price", 1, '1d')
        context.price_hist = pd.concat([context.price_hist.iloc[1:, :], latest], axis = 0)

    new_assets = [
        asset for asset in context.assets.index
        if asset not in context.price_hist.columns
    ]
    # Skip the history request and column concat when nothing is new
    # (always the case right after the initial full load).
    if new_assets:
        context.price_hist = pd.concat(
            [context.price_hist, data.history(new_assets, "price", window, '1d')],
            axis = 1
        )
def pipeline_data():
    """Build the asset pipeline.

    Output has a single 'industry' column (latest Morningstar industry code).
    The screen is a universe of up to ``PairsTradingAlgo.population_size``
    assets drawn from Q500US and ranked by market cap, grouped by industry
    with ``max_group_weight=0.1`` (presumably capping each industry's share
    of the universe at 10%).
    """
    industry = asset_classification.morningstar_industry_code.latest
    universe = make_us_equity_universe(
        target_size = PairsTradingAlgo.population_size,
        rankby = valuation.market_cap.latest,
        mask = Q500US(),
        max_group_weight = 0.1,
        groupby = industry
    )
    return Pipeline(columns = {'industry': industry}, screen = universe)
def _pvr_risk(start, cash, shorts, portfolio_value, proportional=0):
    """Return the integer amount of capital at risk for the PvR metric.

    Risk is the larger of the cash spent below ``start`` and the size of the
    short exposure (``shorts`` is the non-positive total short value).  When
    ``proportional`` is truthy and cash is negative, the cash dip is first
    scaled by ``start / portfolio_value``.  That way a small overdraft on a
    portfolio that has grown a lot counts for less; e.g. start 10, grown to
    1000, cash -10 should not count as 200% risk.
    """
    cash_dip = int(max(0, start - cash))
    if proportional and cash < 0 and portfolio_value > 0:
        cash_dip = int(max(0, cash_dip * start / portfolio_value))
    return int(max(cash_dip, -shorts))


def pvr(context, data):
    ''' Custom chart and/or logging of profit_vs_risk returns and related information
        http://quantopian.com/posts/pvr

    State lives in context.pvr (created on the first call).  Tracks maximum
    leverage, shorts, risk and lowest cash, records enabled chart series, and
    logs a periodic and an end-of-run summary.
    '''
    import time
    from datetime import datetime as _dt
    from pytz import timezone   # Python will only do once, makes this portable.
                                #   Move to top of algo for better efficiency.
    c = context  # Brevity is the soul of wit -- Shakespeare [for readability]
    if 'pvr' not in c:
        # For real money, you can modify this to total cash input minus any withdrawals
        manual_cash = c.portfolio.starting_cash
        time_zone = 'US/Pacific'  # Optionally change to your own time zone for wall clock time

        c.pvr = {
            'options': {
                # # # # # # # # # #  Options  # # # # # # # # # #
                'logging'         : 0,    # Info to logging window with some new maximums
                'log_summary'     : 126,  # Summary every x days. 252/yr

                'record_pvr'      : 1,    # Profit vs Risk returns (percentage)
                'record_pvrp'     : 0,    # PvR (p)roportional neg cash vs portfolio value
                'record_cash'     : 0,    # Cash available
                'record_max_lvrg' : 1,    # Maximum leverage encountered
                'record_max_risk' : 1,    # Highest risk overall
                'record_shorting' : 0,    # Total value of any shorts
                'record_max_shrt' : 1,    # Max value of shorting total
                'record_cash_low' : 1,    # Any new lowest cash level
                'record_q_return' : 0,    # Quantopian returns (percentage)
                'record_pnl'      : 0,    # Profit-n-Loss
                'record_risk'     : 0,    # Risked, max cash spent or shorts beyond longs+cash
                'record_leverage' : 0,    # End of day leverage (context.account.leverage)
                # All records are end-of-day or the last data sent to chart during any day.
                # The way the chart operates, only the last value of the day will be seen.
                # # # # # # # # #  End options  # # # # # # # # #
            },
            'pvr'      : 0,      # Profit vs Risk returns based on maximum spent
            'cagr'     : 0,
            'max_lvrg' : 0,
            'max_shrt' : 0,
            'max_risk' : 0,
            'days'     : 0.0,
            'date_prv' : '',
            'date_end' : get_environment('end').date(),
            'cash_low' : manual_cash,
            'cash'     : manual_cash,
            'start'    : manual_cash,
            'tz'       : time_zone,
            'begin'    : time.time(),  # For run time
            'run_str'  : '{} to {}  ${}  {} {}'.format(get_environment('start').date(), get_environment('end').date(), int(manual_cash), _dt.now(timezone(time_zone)).strftime("%Y-%m-%d %H:%M"), time_zone)
        }
        if c.pvr['options']['record_pvrp']: c.pvr['options']['record_pvr'] = 0  # if pvrp is active, straight pvr is off
        if get_environment('arena') not in ['backtest', 'live']: c.pvr['options']['log_summary'] = 1  # Every day when real money
        log.info(c.pvr['run_str'])

    p = c.pvr
    o = c.pvr['options']
    pf = c.portfolio
    pnl = pf.portfolio_value - p['start']

    def _pvr(c):
        p['cagr'] = ((pf.portfolio_value / p['start']) ** (1 / (p['days'] / 252.))) - 1
        ptype = 'PvR' if o['record_pvr'] else 'PvRp'
        log.info('{} {} %/day   cagr {}   Portfolio value {}   PnL {}'.format(ptype, '%.4f' % (p['pvr'] / p['days']), '%.3f' % p['cagr'], '%.0f' % pf.portfolio_value, '%.0f' % pnl))
        log.info('  Profited {} on {} activated/transacted for PvR of {}%'.format('%.0f' % pnl, '%.0f' % p['max_risk'], '%.1f' % p['pvr']))
        log.info('  QRet {} PvR {} CshLw {} MxLv {} MxRisk {} MxShrt {}'.format('%.2f' % (100 * pf.returns), '%.2f' % p['pvr'], '%.0f' % p['cash_low'], '%.2f' % p['max_lvrg'], '%.0f' % p['max_risk'], '%.0f' % p['max_shrt']))

    def _minut():
        # Minute of the session (1 = 9:31a) as a 3-wide string.  Market hours
        # are Eastern, so this must not use the wall-clock zone in p['tz'].
        dt = get_datetime().astimezone(timezone('US/Eastern'))
        return str((dt.hour * 60) + dt.minute - 570).rjust(3)  # (-570 = 9:31a)

    date = get_datetime().date()
    if p['date_prv'] != date:
        p['date_prv'] = date
        p['days'] += 1.0

    do_summary = 0
    if o['log_summary'] and p['days'] % o['log_summary'] == 0 and _minut() == '100':
        do_summary = 1  # Log summary every x days
    if do_summary or date == p['date_end']:
        p['cash'] = pf.cash
    elif p['cash'] == pf.cash and not o['logging']:
        return  # for speed

    shorts = sum([z.amount * z.last_sale_price for s, z in pf.positions.items() if z.amount < 0])
    new_key_hi = 0  # To trigger logging if on.
    cash = pf.cash
    # The proportional (pvrp) adjustment is applied before risk is taken, so it takes effect.
    risk = _pvr_risk(p['start'], cash, shorts, pf.portfolio_value, o['record_pvrp'])

    if int(cash) < p['cash_low']:  # New cash low
        new_key_hi = 1
        p['cash_low'] = int(cash)  # Lowest cash level hit
        if o['record_cash_low']: record(CashLow = p['cash_low'])

    if c.account.leverage > p['max_lvrg']:
        new_key_hi = 1
        p['max_lvrg'] = c.account.leverage  # Maximum intraday leverage
        if o['record_max_lvrg']: record(MxLv = p['max_lvrg'])

    if shorts < p['max_shrt']:
        new_key_hi = 1
        p['max_shrt'] = shorts  # Maximum shorts value
        if o['record_max_shrt']: record(MxShrt = p['max_shrt'])

    if risk > p['max_risk']:
        new_key_hi = 1
        p['max_risk'] = risk  # Highest risk overall
        if o['record_max_risk']: record(MxRisk = p['max_risk'])

    # Profit_vs_Risk returns based on max amount actually invested, long or short
    if p['max_risk'] != 0:  # Avoid zero-divide
        p['pvr'] = 100 * pnl / p['max_risk']
        ptype = 'PvRp' if o['record_pvrp'] else 'PvR'
        if o['record_pvr'] or o['record_pvrp']: record(**{ptype: p['pvr']})

    if o['record_shorting']: record(Shorts = shorts)  # Shorts value as a positve
    if o['record_leverage']: record(Lv = c.account.leverage)  # Leverage
    if o['record_cash']    : record(Cash = cash)  # Cash
    if o['record_risk']    : record(Risk = risk)  # Amount in play, maximum of shorts or cash used
    if o['record_q_return']: record(QRet = 100 * pf.returns)
    if o['record_pnl']     : record(PnL = pnl)  # Profit|Loss

    if o['logging'] and new_key_hi:
        log.info('{}{}{}{}{}{}{}{}{}{}{}{}'.format(_minut(),
            ' Lv '     + '%.1f' % c.account.leverage,
            ' MxLv '   + '%.2f' % p['max_lvrg'],
            ' QRet '   + '%.1f' % (100 * pf.returns),
            ' PvR '    + '%.1f' % p['pvr'],
            ' PnL '    + '%.0f' % pnl,
            ' Cash '   + '%.0f' % cash,
            ' CshLw '  + '%.0f' % p['cash_low'],
            ' Shrt '   + '%.0f' % shorts,
            ' MxShrt ' + '%.0f' % p['max_shrt'],
            ' Risk '   + '%.0f' % risk,
            ' MxRisk ' + '%.0f' % p['max_risk']
        ))
    if do_summary: _pvr(c)
    if get_datetime() == get_environment('end'):  # Summary at end of run
        _pvr(c)
        elapsed = (time.time() - p['begin']) / 60  # minutes
        log.info( '{}\nRuntime {} hr {} min'.format(p['run_str'], int(elapsed / 60), '%.1f' % (elapsed % 60)))