import quantopian.algorithm as algo
import quantopian.optimize as opt
from quantopian.pipeline import Pipeline
from quantopian.pipeline.factors import SimpleMovingAverage
from quantopian.pipeline.filters import QTradableStocksUS
from quantopian.pipeline.experimental import risk_loading_pipeline
from quantopian.pipeline.data import Fundamentals as msf
from quantopian.pipeline.data.factset import Fundamentals as fsf
# Constraint Parameters
# Maximum gross exposure (sum of |weights|) allowed by the optimizer.
MAX_GROSS_LEVERAGE = 1.0
# Target total number of holdings; split evenly into long and short baskets.
TOTAL_POSITIONS = 2000
# Here we define the maximum position size that can be held for any
# given stock. If you have a different idea of what these maximum
# sizes should be, feel free to change them. Keep in mind that the
# optimizer needs some leeway in order to operate. Namely, if your
# maximum is too small, the optimizer may be overly-constrained.
# (2.0/TOTAL_POSITIONS allows up to twice the equal-weight size per name.)
MAX_SHORT_POSITION_SIZE = 2.0 / TOTAL_POSITIONS
MAX_LONG_POSITION_SIZE = 2.0 / TOTAL_POSITIONS
def initialize(context):
    """
    Core function run automatically once at the start of a backtest.

    Sets up the two pipelines and the daily schedules; use it for any other
    one-time bookkeeping as well.

    Parameters
    ----------
    context : AlgorithmContext
        An object that can be used to store state that you want to maintain in
        your algorithm. context is automatically passed to initialize,
        before_trading_start, handle_data, and any functions run via
        schedule_function. context provides the portfolio attribute, which can
        be used to retrieve information about current positions.
    """
    # Main alpha pipeline; its output is fetched each morning in
    # before_trading_start under this same name.
    algo.attach_pipeline(make_pipeline(), 'long_short_equity_template')

    # Risk-model loadings pipeline, whose factors are neutralized in the
    # optimization step. Retrieved below via the 'risk_factors' key.
    algo.attach_pipeline(risk_loading_pipeline(), 'risk_factors')

    # Rebalance every trading day, 30 minutes after the open.
    algo.schedule_function(
        rebalance,
        date_rule=algo.date_rules.every_day(),
        time_rule=algo.time_rules.market_open(hours=0, minutes=30),
        half_days=True,
    )

    # Record portfolio statistics at each market close.
    algo.schedule_function(
        record_vars,
        date_rule=algo.date_rules.every_day(),
        time_rule=algo.time_rules.market_close(),
        half_days=True,
    )
def make_pipeline():
    """
    Create and return the pipeline of factors used by this algorithm.

    We break this piece of logic out into its own function to make it easier
    to test and modify in isolation. In particular, this function can be
    copy/pasted into research and run by itself.

    Returns
    -------
    pipe : Pipeline
        Represents computation we would like to perform on the assets that
        make it through the pipeline screen.
    """
    universe = QTradableStocksUS()

    # FactSet fundamentals: last-twelve-months operating EBIT and the most
    # recent quarterly enterprise value.
    ebit_oper_ltm = fsf.ebit_oper_ltm.latest
    entrpr_val_qf = fsf.entrpr_val_qf.latest

    # Winsorize within the tradable universe to limit outlier influence.
    # NOTE(review): the `mask=universe` argument is required here — the
    # unmasked variant (a dead debug branch, now removed) was observed to
    # fail when run over the full asset set.
    ebit_oper_ltm_win = ebit_oper_ltm.winsorize(
        min_percentile=0.01, max_percentile=0.99, mask=universe)
    entrpr_val_qf_win = entrpr_val_qf.winsorize(
        min_percentile=0.01, max_percentile=0.99, mask=universe)

    # Value factor: operating-EBIT yield on enterprise value.
    fsf_value = ebit_oper_ltm_win / entrpr_val_qf_win

    # Z-score (masked to the universe) so additional factors could later be
    # combined with equal influence.
    combined_factor = fsf_value.zscore(mask=universe)

    # Filters for the top and bottom baskets by the combined ranking.
    longs = combined_factor.top(TOTAL_POSITIONS // 2, mask=universe)
    shorts = combined_factor.bottom(TOTAL_POSITIONS // 2, mask=universe)

    # Screen keeps only the tails of the value distribution (outside the
    # 10th-90th percentile band within the universe). NOTE(review): this
    # intentionally replaces the simpler `longs | shorts` screen, so the
    # output size is percentile-driven rather than TOTAL_POSITIONS-driven.
    long_short_screen = ~fsf_value.percentile_between(10, 90, mask=universe)

    pipe = Pipeline(
        columns={
            'ebt_ltm'        : ebit_oper_ltm,
            'ebt_win'        : ebit_oper_ltm_win,
            'qf'             : entrpr_val_qf,
            'qf_win'         : entrpr_val_qf_win,
            'fsf'            : fsf_value,
            'longs'          : longs,
            'shorts'         : shorts,
            'combined_factor': combined_factor,
        },
        screen=long_short_screen,
    )
    return pipe
def before_trading_start(context, data):
    """
    Optional core function called automatically before the open of each
    market day. Stores the day's pipeline outputs on ``context`` for use by
    ``rebalance``.

    Parameters
    ----------
    context : AlgorithmContext
        See description above.
    data : BarData
        An object that provides methods to get price and volume data, check
        whether a security exists, and check the last time a security traded.
    """
    # pipeline_output returns a dataframe whose index is the SIDs of all
    # securities that passed the screen and whose columns are the factors
    # added to the pipeline object above.
    context.pipeline_data = algo.pipeline_output('long_short_equity_template')

    # This dataframe will contain all of the risk loadings.
    context.risk_loadings = algo.pipeline_output('risk_factors')
    # NOTE(review): a one-time log_data() debug call previously sat after an
    # unconditional `return` here and could never execute; that dead code has
    # been removed. Re-add it *above* this comment if the logging is wanted.
def record_vars(context, data):
    """
    Scheduled to run every day at market close in order to record strategy
    information.

    Parameters
    ----------
    context : AlgorithmContext
        See description above.
    data : BarData
        See description above.
    """
    # Chart the number of open positions over time.
    position_count = len(context.portfolio.positions)
    algo.record(num_positions=position_count)
# Scheduled (in initialize) to run every trading day, 30 minutes after the
# open, in order to rebalance the longs and shorts lists
def rebalance(context, data):
    """
    Scheduled to run every trading day, 30 minutes after the open, in order
    to rebalance the longs and shorts lists.

    Parameters
    ----------
    context : AlgorithmContext
        See description above.
    data : BarData
        See description above.
    """
    # Retrieve pipeline outputs stored by before_trading_start.
    pipeline_data = context.pipeline_data
    risk_loadings = context.risk_loadings

    # Here we define our objective for the Optimize API. We have selected
    # MaximizeAlpha because we believe our combined factor ranking to be
    # proportional to expected returns. This routine will optimize the
    # expected return of our algorithm, going long on the highest expected
    # return and short on the lowest.
    try:
        objective = opt.MaximizeAlpha(pipeline_data.combined_factor)
        log.info('ok, pipeline_data.combined_factor len {}'.format(len(pipeline_data.combined_factor)))
    except Exception as e:
        # BUG FIX: was a bare `except:` that hid the failure cause. Log the
        # exception and diagnostics (e.g. an empty or all-NaN factor column),
        # then skip this rebalance rather than crash the backtest.
        log.info('MaximizeAlpha failed: {}'.format(e))
        log_data(context, data, context.pipeline_data, 4)
        log.info( algo.pipeline_output('long_short_equity_template') )
        return

    # Define the list of constraints.
    constraints = []

    # Constrain our maximum gross leverage.
    constraints.append(opt.MaxGrossExposure(MAX_GROSS_LEVERAGE))

    # Require our algorithm to remain dollar neutral.
    constraints.append(opt.DollarNeutral())

    # Add the RiskModelExposure constraint to make use of the default risk
    # model constraints.
    neutralize_risk_factors = opt.experimental.RiskModelExposure(
        risk_model_loadings=risk_loadings,
        version=0
    )
    constraints.append(neutralize_risk_factors)

    # With this constraint we enforce that no position can make up greater
    # than MAX_SHORT_POSITION_SIZE on the short side and no greater than
    # MAX_LONG_POSITION_SIZE on the long side. This ensures that we do not
    # overly concentrate our portfolio in one security or a small subset of
    # securities.
    constraints.append(
        opt.PositionConcentration.with_equal_bounds(
            min=-MAX_SHORT_POSITION_SIZE,
            max=MAX_LONG_POSITION_SIZE
        ))

    # Put together all the pieces we defined above by passing them into the
    # algo.order_optimal_portfolio function. This handles all of our ordering
    # logic, assigning appropriate weights to the securities in our universe
    # to maximize our alpha with respect to the given constraints.
    algo.order_optimal_portfolio(
        objective=objective,
        constraints=constraints
    )
def log_data(context, data, z, num, fields=None):
    ''' Log info about pipeline output or, z can be any DataFrame or Series
    https://www.quantopian.com/posts/overview-of-pipeline-content-easy-to-add-to-your-backtest

    Parameters
    ----------
    context : AlgorithmContext
        See description above.
    data : BarData
        Unused here; accepted so call sites can pass it uniformly.
    z : DataFrame or Series
        Object to summarize: min/mean/max, NaN counts, and per-column
        high/low rows.
    num : int
        Number of high and low rows to show per column.
    fields : list of str, optional
        Columns to detail; all of z's columns when None.
    '''
    # off for clarity here
    if 0 and 'log_init_done' not in context:   # {:,} magic for adding commas
        log.info('${:,} {} to {}'.format(int(context.portfolio.starting_cash),
            get_environment('start').date(), get_environment('end').date()))
    # Mark that logging ran once (checked by callers that log only once).
    context.log_data_done = 1

    if not len(z):
        log.info('Empty')
        return

    # Options
    log_nan_only = 0          # Only log if nans are present
    show_sectors = 0          # If sectors, do you want to see them or not
    show_sorted_details = 1   # [num] high & low securities sorted, each column
    padmax = 6                # num characters for each field, starting point

    # Series ......
    if 'Series' in str(type(z)):    # is Series, not DataFrame
        nan_count = len(z[z != z])  # NaN != NaN, so this counts NaNs
        nan_count = 'NaNs {}/{}'.format(nan_count, len(z)) if nan_count else ''
        if (log_nan_only and nan_count) or not log_nan_only:
            pad = max( padmax, len('%.5f' % z.max()) )
            log.info('{}{}{} Series len {}'.format('min'.rjust(pad+5),
                'mean'.rjust(pad+5), 'max'.rjust(pad+5), len(z)))
            log.info('{}{}{} {}'.format(
                ('%.5f' % z.min()) .rjust(pad+5),
                ('%.5f' % z.mean()).rjust(pad+5),
                ('%.5f' % z.max()) .rjust(pad+5),
                nan_count
            ))
            log.info('High\n{}'.format(z.sort_values(ascending=False).head(num)))
            log.info('Low\n{}' .format(z.sort_values(ascending=False).tail(num)))
        return

    # DataFrame ......
    content_min_max = [ ['','min','mean','max',''] ] ; content = ''
    for col in z.columns:
        try: z[col].max()
        except Exception: continue   # skip non-numeric columns
        if col == 'sector' and not show_sectors: continue
        nan_count = len(z[col][z[col] != z[col]])
        nan_count = 'NaNs {}/{}'.format(nan_count, len(z)) if nan_count else ''
        padmax = max( padmax, len(str(z[col].max())) )
        content_min_max.append([col, str(z[col] .min()), str(z[col].mean()), str(z[col] .max()), nan_count])
    if log_nan_only and nan_count or not log_nan_only:
        content = 'Rows: {} Columns: {}'.format(z.shape[0], z.shape[1])
        if len(z.columns) == 1: content = 'Rows: {}'.format(z.shape[0])

        paddings = [6 for i in range(4)]
        for lst in content_min_max:   # set max lengths
            i = 0
            for val in lst[:4]:       # value in each sub-list
                paddings[i] = max(paddings[i], len(str(val)))
                i += 1
        headr = content_min_max[0]
        content += ('\n{}{}{}{}{}'.format(
            headr[0] .rjust(paddings[0]),
            (headr[1]).rjust(paddings[1]+5),
            (headr[2]).rjust(paddings[2]+5),
            (headr[3]).rjust(paddings[3]+5),
            ''
        ))
        for lst in content_min_max[1:]:   # populate content using max lengths
            content += ('\n{}{}{}{} {}'.format(
                lst[0].rjust(paddings[0]),
                lst[1].rjust(paddings[1]+5),
                lst[2].rjust(paddings[2]+5),
                lst[3].rjust(paddings[3]+5),
                lst[4],
            ))
        log.info(content)

    if not show_sorted_details: return
    if len(z.columns) == 1: return   # skip detail if only 1 column
    # BUG FIX: `details` was previously only assigned when fields was None,
    # raising NameError whenever an explicit fields list was passed.
    details = z.columns if fields is None else fields
    for detail in details:
        if detail == 'sector' and not show_sectors: continue
        hi = z[details].sort_values(by=detail, ascending=False).head(num)
        lo = z[details].sort_values(by=detail, ascending=False).tail(num)
        content = ''
        content += ('_ _ _ {} _ _ _' .format(detail))
        content += ('\n\t... {} highs\n{}'.format(detail, str(hi)))
        content += ('\n\t... {} lows \n{}'.format(detail, str(lo)))
        if log_nan_only and not len(lo[lo[detail] != lo[detail]]):
            continue   # skip if no nans
        log.info(content)