from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline import factors, filters, classifiers
from quantopian.pipeline.factors import CustomFactor, SimpleMovingAverage, AverageDollarVolume, Returns
from quantopian.pipeline.filters import StaticAssets, Q500US, Q1500US, Q3000US, QTradableStocksUS
from quantopian.pipeline.filters.fundamentals import IsPrimaryShare
from quantopian.pipeline.classifiers.fundamentals import Sector
from quantopian.pipeline.data.builtin import USEquityPricing
import pandas as pd
import numpy as np
import scipy.stats as stats
###################### Custom Factors #####################################
class Volatility(CustomFactor):
#inputs = [Returns(window_length=2)]
def compute(self, today, assets, out, returns):
out[:] = np.nanstd(returns, axis=0)
def MeanReversion(mask, days=5):
dmean_returns = Returns(window_length=2, mask=mask).demean(mask=mask)
ret = SimpleMovingAverage(inputs=[dmean_returns], window_length=days, mask=mask)
vol = Volatility(inputs=[dmean_returns], window_length=days, mask=mask)
return -ret/vol
class JoinFactors(CustomFactor):
#inputs = [factor1, factor2, ...]
window_length = 1
def compute(self, today, assets, out, *inputs):
array = np.concatenate(inputs, axis=0)
out[:] = np.nansum(array, axis=0)
out[ np.all(np.isnan(array), axis=0) ] = np.nan
def make_MeanReversionBySector(mask):
PCAs = []
sector = Sector(mask=mask)
for sector_code in Sector.SECTOR_NAMES.keys():
sector_mask = sector.eq(sector_code)
pca = MeanReversion(mask=sector_mask)
pca.window_safe = True
PCAs.append(pca)
return JoinFactors(mask=mask, inputs=PCAs)
###########################################################################
class ExposureMngr(object):
"""
Keep track of leverage and long/short exposure
One Class to rule them all, One Class to define them,
One Class to monitor them all and in the bytecode bind them
Usage:
Define your targets at initialization: I want leverage 1.3 and 60%/40% Long/Short balance
context.exposure = ExposureMngr(target_leverage = 1.3,
target_long_exposure_perc = 0.60,
target_short_exposure_perc = 0.40)
update internal state (open orders and positions)
context.exposure.update(context, data)
After update is called, you can access the following information:
how much cash available for trading
context.exposure.get_available_cash(consider_open_orders = True)
get long and short available cash as two distinct values
context.exposure.get_available_cash_long_short(consider_open_orders = True)
same as account.leverage but this keeps track of open orders
context.exposure.get_current_leverage(consider_open_orders = True)
sum of long and short positions current value
context.exposure.get_exposure(consider_open_orders = True)
get long and short position values as two distinct values
context.exposure.get_long_short_exposure(consider_open_orders = True)
get long and short exposure as percentage
context.exposure.get_long_short_exposure_pct(consider_open_orders = True, consider_unused_cash = True)
"""
def __init__(self, target_leverage = 1.0, target_long_exposure_perc = 0.50, target_short_exposure_perc = 0.50):
self.target_leverage = target_leverage
self.target_long_exposure_perc = target_long_exposure_perc
self.target_short_exposure_perc = target_short_exposure_perc
self.short_exposure = 0.0
self.long_exposure = 0.0
self.open_order_short_exposure = 0.0
self.open_order_long_exposure = 0.0
def get_current_leverage(self, context, consider_open_orders = True):
curr_cash = context.portfolio.cash - (self.short_exposure * 2)
if consider_open_orders:
curr_cash -= self.open_order_short_exposure
curr_cash -= self.open_order_long_exposure
curr_leverage = (context.portfolio.portfolio_value - curr_cash) / context.portfolio.portfolio_value
return curr_leverage
def get_exposure(self, context, consider_open_orders = True):
long_exposure, short_exposure = self.get_long_short_exposure(context, consider_open_orders)
return long_exposure + short_exposure
def get_long_short_exposure(self, context, consider_open_orders = True):
long_exposure = self.long_exposure
short_exposure = self.short_exposure
if consider_open_orders:
long_exposure += self.open_order_long_exposure
short_exposure += self.open_order_short_exposure
return (long_exposure, short_exposure)
def get_long_short_exposure_pct(self, context, consider_open_orders = True, consider_unused_cash = True):
long_exposure, short_exposure = self.get_long_short_exposure(context, consider_open_orders)
total_cash = long_exposure + short_exposure
if consider_unused_cash:
total_cash += self.get_available_cash(context, consider_open_orders)
long_exposure_pct = long_exposure / total_cash if total_cash > 0 else 0
short_exposure_pct = short_exposure / total_cash if total_cash > 0 else 0
return (long_exposure_pct, short_exposure_pct)
def get_available_cash(self, context, consider_open_orders = True):
curr_cash = context.portfolio.cash - (self.short_exposure * 2)
if consider_open_orders:
curr_cash -= self.open_order_short_exposure
curr_cash -= self.open_order_long_exposure
leverage_cash = context.portfolio.portfolio_value * (self.target_leverage - 1.0)
return curr_cash + leverage_cash
def get_available_cash_long_short(self, context, consider_open_orders = True):
total_available_cash = self.get_available_cash(context, consider_open_orders)
long_exposure = self.long_exposure
short_exposure = self.short_exposure
if consider_open_orders:
long_exposure += self.open_order_long_exposure
short_exposure += self.open_order_short_exposure
current_exposure = long_exposure + short_exposure + total_available_cash
target_long_exposure = current_exposure * self.target_long_exposure_perc
target_short_exposure = current_exposure * self.target_short_exposure_perc
long_available_cash = target_long_exposure - long_exposure
short_available_cash = target_short_exposure - short_exposure
return (long_available_cash, short_available_cash)
def update(self, context, data):
#
# calculate cash needed to complete open orders
#
self.open_order_short_exposure = 0.0
self.open_order_long_exposure = 0.0
for stock, orders in get_open_orders().iteritems():
price = data.current(stock, 'price')
if np.isnan(price):
continue
amount = 0 if stock not in context.portfolio.positions else context.portfolio.positions[stock].amount
for oo in orders:
order_amount = oo.amount - oo.filled
if order_amount < 0 and amount <= 0:
self.open_order_short_exposure += (price * -order_amount)
elif order_amount > 0 and amount >= 0:
self.open_order_long_exposure += (price * order_amount)
#
# calculate long/short positions exposure
#
self.short_exposure = 0.0
self.long_exposure = 0.0
for stock, position in context.portfolio.positions.iteritems():
amount = position.amount
last_sale_price = position.last_sale_price
if amount < 0:
self.short_exposure += (last_sale_price * -amount)
elif amount > 0:
self.long_exposure += (last_sale_price * amount)
class OrderMngr(object):
"""
Buy/sell order manager
"""
def __init__(self, sec_volume_limit_perc = None, min_shares_order = 1):
'''
sec_volume_limit_perc : max percentage of stock volume per minute to fill with our orders
min_shares_order : min number of shares to order per stock (this is because brokers
have a minimum fees other than a cost per share)
'''
self.sec_volume_limit_perc = sec_volume_limit_perc
self.min_shares_order = min_shares_order
self.order_queue = {}
def set_orders(self, orders):
self.order_queue = orders
def process_order_queue(self, data):
'''
Scan order queue and perform orders: the order queue allows to spread orders along the
day to avoid excessive slipapge
'''
if not self.order_queue:
return
price = data.current(self.order_queue.keys(), 'price')
volume_history = data.history(self.order_queue.keys(), fields='volume', bar_count=30, frequency='1m')
volume = volume_history.mean()
for sec, amount in self.order_queue.items():
amount = round(amount)
if amount == 0:
del self.order_queue[sec]
continue
if get_open_orders(sec):
continue
if not data.can_trade(sec):
continue
if self.sec_volume_limit_perc is not None:
max_share_allowed = round(self.sec_volume_limit_perc * volume[sec])
if max_share_allowed < self.min_shares_order:
max_share_allowed = self.min_shares_order
allowed_amount = min(amount, max_share_allowed) if amount > 0 else max(amount, -max_share_allowed)
if abs(amount - allowed_amount) >= self.min_shares_order:
amount = allowed_amount
order(sec, amount)
self.order_queue[sec] -= amount
log.debug( '%s $ %d voume %f %% order %d shares remaining %d' %
(str(sec), (price[sec]*amount), float(amount)/volume[sec], amount, self.order_queue[sec]) )
def has_open_orders(self, sec):
if get_open_orders(sec):
return True
if sec in self.order_queue:
return True
return False
def cancel_open_orders(self, data):
for sec, amount in self.order_queue.items():
amount = round(amount)
if amount == 0:
del self.order_queue[sec]
continue
log.warn('Security %s had queued orders (amount=%d): now removed' % (str(sec),amount))
self.order_queue = {}
for security in get_open_orders():
for order in get_open_orders(security):
cancel_order(order)
log.warn('Security %s had open orders: now cancelled' % (str(security)))
def clear_positions(self, context, data, security = None):
if security is not None: # clear security positions
if get_open_orders(security):
return
if not data.can_trade(security):
return
if security in self.order_queue:
del self.order_queue[security]
price = data.current(security, 'price')
order_target_percent(security, 0, style=LimitOrder(price))
else: # clear all positions
for stock in context.portfolio.positions:
if stock is None:
continue
self.clear_positions(context, data, stock)
def get_weights(pipe_out, rank_cols, max_long_sec, max_short_sec, group_neutral):
if group_neutral:
pipe_out = pipe_out[rank_cols + ['group']]
else:
pipe_out = pipe_out[rank_cols]
pipe_out = pipe_out.replace([np.inf, -np.inf], np.nan)
pipe_out = pipe_out.dropna()
def to_weights(factor, is_long_short):
if is_long_short:
demeaned_vals = factor - factor.mean()
return demeaned_vals / demeaned_vals.abs().sum()
else:
return factor / factor.abs().sum()
#
# rank stocks so that we can select long/short ones
#
weights = pd.Series(0., index=pipe_out.index)
for rank_col in rank_cols:
if not group_neutral: # rank regardless of sector code
weights += to_weights(pipe_out[rank_col], True)
else: # weight each sector equally
weights += pipe_out.groupby(['group'])[rank_col].apply(to_weights, True)
if not group_neutral: # rank regardless of sector/group code
longs = weights[ weights > 0 ]
shorts = weights[ weights < 0 ].abs()
if max_long_sec:
longs = longs.order(ascending=False).head(max_long_sec)
if max_short_sec:
shorts = shorts.order(ascending=False).head(max_short_sec)
else: # weight each group/sector equally
sectors = pipe_out['group'].unique()
num_sectors = len(sectors)
longs = pd.Series()
shorts = pd.Series()
for current_sector in sectors:
_w = weights[ pipe_out['group'] == current_sector ]
_longs = _w[ _w > 0 ]
_shorts = _w[ _w < 0 ].abs()
if max_long_sec:
_longs = _longs.order(ascending=False).head(max_long_sec/num_sectors)
if max_short_sec:
_shorts = _shorts.order(ascending=False).head(max_short_sec/num_sectors)
_longs /= _longs.sum()
_shorts /= _shorts.sum()
longs = longs.append( _longs )
shorts = shorts.append( _shorts )
longs = longs[ longs > 0 ]
shorts = shorts[ shorts > 0 ]
longs /= longs.sum()
shorts /= shorts.sum()
return longs, shorts
def add_positions(d1, d2):
return { k:(d1.get(k,0)+d2.get(k,0)) for k in set(d1) | set(d2) if (d1.get(k,0)+d2.get(k,0)) != 0 }
def diff_positions(d1, d2):
return { k:(d1.get(k,0)-d2.get(k,0)) for k in set(d1) | set(d2) if (d1.get(k,0)-d2.get(k,0)) != 0 }
class Strategy(object):
def __init__(self, rebalance_days, max_long_sec, max_short_sec, group_neutral, factors):
self.rebalance_days = rebalance_days
self.max_long_sec = max_long_sec
self.max_short_sec = max_short_sec
self.group_neutral = group_neutral
self.factors = factors
self.shorts = None
self.longs = None
self.curr_day = -1
self.days = {}
def set_weights(self, pipeline_output):
self.longs, self.shorts = get_weights(pipeline_output, self.factors,
self.max_long_sec,self.max_short_sec,
self.group_neutral)
print 'longs weighted (length %d, sum %f):\n' % (len(self.longs.index), self.longs.sum())
print 'shorts weighted (length %d, sum %f):\n' % (len(self.shorts.index), self.shorts.sum())
def expected_positions(self):
expected_positions = {}
for day, pos in self.days.items():
expected_positions = add_positions(expected_positions, pos)
return expected_positions
def fix_positions(self, missing_positions):
# called before rebalance, so self.curr_day is previous day
prev_day = self.curr_day
missing_positions = dict(missing_positions) # copy
if prev_day in self.days:
# update yesterday positions with actual ones
prev_pos = self.days[prev_day]
for sec, amount in missing_positions.items():
if sec in prev_pos:
if amount > 0 and prev_pos[sec] > 0:
fixed_amount = min(prev_pos[sec], amount)
prev_pos[sec] -= fixed_amount
missing_positions[sec] -= fixed_amount
elif amount < 0 and prev_pos[sec] < 0:
fixed_amount = max(prev_pos[sec], amount)
prev_pos[sec] -= fixed_amount
missing_positions[sec] -= fixed_amount
return missing_positions
def rebalance(self, data, long_cash, short_cash):
#
# Move to next rebalancing day
#
self.curr_day = (self.curr_day+1) % self.rebalance_days
#
# Get the positions we previously entered for this day slot
#
prev_positions = self.days[self.curr_day] if self.curr_day in self.days else {}
#
# we share the available cash between every trading day
#
today_long_cash = long_cash / self.rebalance_days
today_short_cash = short_cash / self.rebalance_days
log.debug( 'Curr_day %d today_long_cash %f today_short_cash %f' % (self.curr_day, today_long_cash, today_short_cash) )
#
# calculate new positions
#
new_positions = {}
universe = (self.longs.index | self.shorts.index)
current_price = data.current(universe, 'price')
if today_short_cash > 0:
for sec in self.shorts.index:
amount = - (self.shorts[sec] * today_short_cash / current_price[sec])
new_positions[sec] = round(amount)
if today_long_cash > 0:
for sec in self.longs.index:
amount = self.longs[sec] * today_long_cash / current_price[sec]
new_positions[sec] = round(amount)
# save daily positions so that next time we know what we have to rebalance
self.days[self.curr_day] = new_positions
self.shorts = None
self.longs = None
return new_positions, prev_positions
def high_volume_universe(top_liquid, min_price = None, min_volume = None):
"""
Computes a security universe of liquid stocks and filtering out
hard to trade ones
Returns
-------
high_volume_tradable - zipline.pipeline.filter
"""
if top_liquid == "QTradableStocksUS":
universe = QTradableStocksUS()
elif top_liquid == 500:
universe = Q500US()
elif top_liquid == 1500:
universe = Q1500US()
elif top_liquid == 3000:
universe = Q3000US()
else:
universe = filters.make_us_equity_universe(
target_size=top_liquid,
rankby=factors.AverageDollarVolume(window_length=200),
mask=filters.default_us_equity_universe_mask(),
groupby=Sector(),
max_group_weight=0.3,
smoothing_func=lambda f: f.downsample('month_start'),
)
if min_price is not None:
price = SimpleMovingAverage(inputs=[USEquityPricing.close],
window_length=21, mask=universe)
universe &= (price >= min_price)
if min_volume is not None:
volume = SimpleMovingAverage(inputs=[USEquityPricing.volume],
window_length=21, mask=universe)
universe &= (volume >= min_volume)
return universe
def make_pipeline(context):
universe = high_volume_universe(top_liquid=context.universe_size)
pipe = Pipeline()
pipe.set_screen(universe)
#
# Add grouping factors
#
if context.group_neutral:
group = Sector(mask=universe) # any group you like
pipe.add(group, "group")
#
# Add any custom factor here
#
mr = make_MeanReversionBySector(mask=universe)
pipe.add(mr, "mr")
return pipe
# Put any initialization logic here. The context object will be passed to
# the other methods in your algorithm.
def initialize(context):
#
# Algo configuration
#
set_commission(commission.PerShare(cost=0., min_trade_cost=0))
set_slippage(slippage.FixedSlippage(spread=0.00))
context.exposure = ExposureMngr(target_leverage = 1.0,
target_long_exposure_perc = 0.50,
target_short_exposure_perc = 0.50)
context.order_mngr = OrderMngr(sec_volume_limit_perc = None, # Order all shares in one chunk, no volume limit
min_shares_order = 1) # This makes sense only with volume limit
context.universe_size = "QTradableStocksUS"
context.group_neutral = True
s1 = Strategy(rebalance_days=3, max_long_sec=300, max_short_sec=300,
group_neutral=context.group_neutral,
factors=["mr"])
context.strategies = [s1]
#
# Algo logic starts
#
attach_pipeline(make_pipeline(context), 'factors')
schedule_function(rebalance, date_rules.every_day(), time_rules.market_open())
cancel_open_orders = lambda context, data: context.order_mngr.cancel_open_orders(data)
schedule_function(cancel_open_orders, date_rules.every_day(), time_rules.market_close())
schedule_function(log_stats, date_rules.every_day(), time_rules.market_close())
# Will be called on every trade event for the securities you specify.
def handle_data(context, data):
context.order_mngr.process_order_queue(data)
def before_trading_start(context, data):
pipeout = pipeline_output('factors')
print 'Basket of stocks %d' % len(pipeout)
for strategy in context.strategies:
strategy.set_weights(pipeout)
def rebalance(context, data):
#
# Fix saved positions with actual positions (unfilled/cancelled orders from previous day or
# some existing portfolio positions not generated from this algorithm)
#
expected_positions = {}
for strategy in context.strategies:
expected_positions = add_positions(expected_positions, strategy.expected_positions())
actual_positions = { sec:position.amount for sec, position in context.portfolio.positions.iteritems() }
missing_positions = diff_positions(expected_positions, actual_positions)
for strategy in context.strategies:
missing_positions = strategy.fix_positions(missing_positions)
#
# Calculate how much money we have for rebalancing today
#
context.exposure.update(context, data)
long_available_cash, short_available_cash = context.exposure.get_available_cash_long_short(context)
long_exposure, short_exposure = context.exposure.get_long_short_exposure(context)
long_cash_per_strategy = (long_available_cash + long_exposure) / len(context.strategies)
short_cash_per_strategy = (short_available_cash + short_exposure) / len(context.strategies)
log.debug( 'long_available_cash %f short_available_cash %f long_exposure %f short_exposure %f' %
(long_available_cash, short_available_cash, long_exposure, short_exposure) )
log.debug( 'long_cash_per_strategy %f short_cash_per_strategy %f' % (long_cash_per_strategy, short_cash_per_strategy) )
#
# calculate new positions
#
new_positions = {}
prev_positions = {}
for strategy in context.strategies:
s_new_positions, s_prev_positions = strategy.rebalance(data, long_cash_per_strategy, short_cash_per_strategy)
new_positions = add_positions(new_positions, s_new_positions)
prev_positions = add_positions(prev_positions, s_prev_positions)
#
# get rid of leftovers (maybe the algo was started with a non empty portfolio)
#
prev_positions = diff_positions(prev_positions, missing_positions)
#
# Clear previous positions for this day
#
all_orders = diff_positions(new_positions, prev_positions)
#
# To avoid excessive slippage enter new positions in the order queue instead of ordering now
#
context.order_mngr.set_orders(all_orders)
def log_stats(context, data):
context.exposure.update(context, data)
long_exposure_pct, short_exposure_pct = context.exposure.get_long_short_exposure_pct(context)
record(lever=context.account.leverage,
exposure=context.account.net_leverage,
num_pos=len(context.portfolio.positions),
long_exposure_pct=long_exposure_pct,
short_exposure_pct=short_exposure_pct)