from collections import OrderedDict from time import time import pandas as pd import numpy as np from sklearn import ensemble, preprocessing, metrics, linear_model from sklearn.ensemble import RandomForestClassifier from quantopian.algorithm import ( attach_pipeline, date_rules, order_optimal_portfolio, pipeline_output, record, schedule_function, set_commission, set_slippage, time_rules, ) import quantopian.optimize as opt from quantopian.pipeline import Pipeline from quantopian.pipeline.classifiers.fundamentals import Sector as _Sector from quantopian.pipeline.data import Fundamentals from quantopian.pipeline.data.builtin import USEquityPricing from quantopian.pipeline.factors import ( CustomFactor, Returns, MACDSignal, ) from quantopian.pipeline.filters.fundamentals import Q1500US from zipline.utils.numpy_utils import ( repeat_first_axis, repeat_last_axis, ) # If you have eventvestor, it's a good idea to screen out aquisition targets # Comment out & ~IsAnnouncedAcqTarget() as well. You can also run this over # the free period. # from quantopian.pipeline.filters.eventvestor import IsAnnouncedAcqTarget # Will be split 50% long and 50% short N_STOCKS_TO_TRADE = 200 # Number of days to train the classifier on, easy to run out of memory here ML_TRAINING_WINDOW = 504 # train on returns over N days into the future PRED_N_FORWARD_DAYS = 5 # How often to trade, for daily, set to date_rules.every_day() TRADE_FREQ = date_rules.month_start(days_offset=1) #date_rules.every_day() class Sector(_Sector): window_safe = True WIN_LIMIT = 1 def preprocess(a): a = a.astype(np.float64) a[np.isinf(a)] = np.nan a=np.clip(a,np.nanpercentile(a,WIN_LIMIT),np.nanpercentile(a,100-WIN_LIMIT)) return a quarter_lenght = 65 latest = -1 one_year_ago = -4*quarter_lenght ttm = [ -1, -quarter_lenght, -2*quarter_lenght, -3*quarter_lenght] ttm_py = [-4*quarter_lenght, -5*quarter_lenght, -6*quarter_lenght, -7*quarter_lenght] class DSRI(CustomFactor): """ 1. DSRI = Days Sales in Receivables Index = (Receivables_t / Revenue_t) / (Receivables_t-1 / Revenue_t-1) receivable_turnover = revenue / receivables """ inputs = [Fundamentals.receivable_turnover] window_length = 4*quarter_lenght + 1 def compute(self, today, assets, out, receivable_turnover): dsri = receivable_turnover[one_year_ago] / receivable_turnover[latest] out[:] = preprocess(dsri) class GMI(CustomFactor): """ 2. GMI = Gross Margin Index = GrossMargin_t-1 / GrossMargin_t """ inputs = [Fundamentals.gross_margin] window_length = 4*quarter_lenght + 1 def compute(self, today, assets, out, gross_margin): gmi = gross_margin[one_year_ago] / gross_margin[latest] out[:] = preprocess(gmi) class AQI(CustomFactor): """ 3. AQI = Asset Quality Index AQI = (1 - (CurrentAssets_t + PPE_t) / TotalAssets_t) / (1 - (CurrentAssets_t-1 + PPE_t-1) / TotalAssets_t-1) """ inputs = [Fundamentals.current_assets, Fundamentals.net_ppe, Fundamentals.total_assets] window_length = 4*quarter_lenght + 1 def compute(self, today, assets, out, current_assets, net_ppe, total_assets): aqi = (1 - (current_assets[latest] + net_ppe[latest]) / total_assets[latest]) aqi = aqi / (1 - (current_assets[one_year_ago] + net_ppe[one_year_ago]) / total_assets[one_year_ago]) out[:] = preprocess(aqi) class SGI(CustomFactor): """ 4. SGI = Sales Growth Index = Sales_t / Sales_t-1 """ inputs = [Fundamentals.revenue_growth] window_length = 1 def compute(self, today, assets, out, revenue_growth): sgi = revenue_growth[latest] + 1.0 out[:] = preprocess(sgi) class DEPI(CustomFactor): """ 5. DEPI = Depreciation Index DEPI = (Depreciation_t-1 / (Depreciaton_t-1 + PPE_t-1)) / (Depreciation_t / (Depreciaton_t + PPE_t)) """ inputs = [Fundamentals.net_ppe, Fundamentals.depreciation_amortization_depletion_income_statement] window_length = 7*quarter_lenght + 1 def compute(self, today, assets, out, net_ppe, depreciation): dep_ttm = np.sum(depreciation[ttm], axis=0) dep_ttm_py = np.sum(depreciation[ttm_py], axis=0) depi = (dep_ttm_py / (dep_ttm_py + net_ppe[one_year_ago])) / (dep_ttm / (dep_ttm + net_ppe[latest])) out[:] = preprocess(depi) class SGAI(CustomFactor): """ 6. SGAI = (SG&A_t/Sales_t)/(SG&A_t-1/Sales_t-1) """ inputs = [Fundamentals.selling_general_and_administration, Fundamentals.total_revenue] window_length = 7*quarter_lenght + 1 def compute(self, today, assets, out, selling_general_and_administration,total_revenue): sga_ttm = np.sum(selling_general_and_administration[ttm], axis=0) rev_ttm = np.sum(total_revenue[ttm], axis=0) sga_ttm_py = np.sum(selling_general_and_administration[ttm_py], axis=0) rev_ttm_py = np.sum(total_revenue[ttm_py], axis=0) sgai =(sga_ttm/rev_ttm)/ (sga_ttm_py/rev_ttm_py) out[:] = preprocess(sgai) class ACC(CustomFactor): """ 7. ACC=(Income before extraordinary item-cash from operations)/total asset """ inputs = [Fundamentals.net_income_income_statement, Fundamentals.net_income_extraordinary, Fundamentals.operating_cash_flow, Fundamentals.total_assets] window_length = 7*quarter_lenght + 1 def compute(self, today, assets, out, net_income_income_statement,net_income_extraordinary,operating_cash_flow,total_assets): netinc_ttm = np.sum(net_income_income_statement[ttm], axis=0) extra_ttm=np.sum(np.nan_to_num(net_income_extraordinary[ttm]), axis=0) cfo_ttm = np.sum(operating_cash_flow[ttm], axis=0) acc = (netinc_ttm-extra_ttm-cfo_ttm)/total_assets[latest] out[:] = preprocess(acc) class LEVI(CustomFactor): """ 8. LEVI = (leverage_t) / (leverage_t-1) leverage = debt / assets """ inputs = [Fundamentals.debtto_assets] window_length = 4*quarter_lenght + 1 def compute(self, today, assets, out, debtto_assets): levi = debtto_assets[latest]/debtto_assets[one_year_ago] out[:] = preprocess(levi) dsri = DSRI() gmi = GMI() aqi = AQI() sgi = SGI() depi = DEPI() sgai=SGAI() acc=ACC() levi=LEVI() features = { 'dsri': dsri, 'gmi': gmi, 'aqi': aqi, 'sgi': sgi, 'depi': depi, 'sgai': sgai, 'acc': acc, 'levi': levi, } def shift_mask_data(features, labels, n_forward_days, lower_percentile, upper_percentile): """Align features to the labels ``n_forward_days`` into the future and return the discrete, flattened features and masked labels. Parameters ---------- features : np.ndarray A 3d array of (days, assets, feature). labels : np.ndarray The labels to predict. n_forward_days : int How many days into the future are we predicting? lower_percentile : float The lower percentile in the range [0, 100]. upper_percentile : float The upper percentile in the range [0, 100]. Returns ------- selected_features : np.ndarray The flattened features that are not masked out. selected_labels : np.ndarray The labels that are not masked out. """ # Slice off rolled elements shift_by = n_forward_days + 1 aligned_features = features[:-shift_by] aligned_labels = labels[shift_by:] cutoffs = np.nanpercentile( aligned_labels, [lower_percentile, upper_percentile], axis=1, ) discrete_labels = np.select( [ aligned_labels <= cutoffs[0, :, np.newaxis], aligned_labels >= cutoffs[1, :, np.newaxis], ], [-1, 1], ) # flatten the features per day flattened_features = aligned_features.reshape( -1, aligned_features.shape[-1], ) # Drop stocks that did not move much, meaning they are in between # ``lower_percentile`` and ``upper_percentile``. mask = discrete_labels != 0 selected_features = flattened_features[mask.ravel()] selected_labels = discrete_labels[mask] return selected_features, selected_labels class ML(CustomFactor): """ """ train_on_weekday = 1 def __init__(self, *args, **kwargs): CustomFactor.__init__(self, *args, **kwargs) self._imputer = preprocessing.Imputer() self._scaler = preprocessing.MinMaxScaler() self._classifier = RandomForestClassifier(max_depth=10) self._trained = False #ensemble.AdaBoostClassifier( # random_state=1337, # n_estimators=50, #) def _compute(self, *args, **kwargs): ret = CustomFactor._compute(self, *args, **kwargs) return ret def _train_model(self, today, returns, inputs): log.info('training model for window starting on: {}'.format(today)) imputer = self._imputer scaler = self._scaler classifier = self._classifier features, labels = shift_mask_data( np.dstack(inputs), returns, n_forward_days=PRED_N_FORWARD_DAYS, lower_percentile=30, upper_percentile=70, ) features = scaler.fit_transform(imputer.fit_transform(features)) start = time() classifier.fit(features, labels) log.info('training took {} secs'.format(time() - start)) self._trained = True def _maybe_train_model(self, today, returns, inputs): if (today.weekday() == self.train_on_weekday) or not self._trained: self._train_model(today, returns, inputs) def compute(self, today, assets, out, returns, *inputs): # inputs is a list of factors, for example, assume we have 2 alpha # signals, 3 stocks, and a lookback of 2 days. Each element in the # inputs list will be data of one signal, so len(inputs) == 2. Then # each element will contain a 2-D array of shape [time x stocks]. For # example: # inputs[0]: # [[1, 3, 2], # factor 1 rankings of day t-1 for 3 stocks # [3, 2, 1]] # factor 1 rankings of day t for 3 stocks # inputs[1]: # [[2, 3, 1], # factor 2 rankings of day t-1 for 3 stocks # [1, 2, 3]] # factor 2 rankings of day t for 3 stocks self._maybe_train_model(today, returns, inputs) # Predict # Get most recent factor values (inputs always has the full history) last_factor_values = np.vstack([input_[-1] for input_ in inputs]).T last_factor_values = self._imputer.transform(last_factor_values) last_factor_values = self._scaler.transform(last_factor_values) # Predict the probability for each stock going up # (column 2 of the output of .predict_proba()) and # return it via assignment to out. #out[:] = self._classifier.predict_proba(last_factor_values)[:, 1] out[:] = self._classifier.predict(last_factor_values) def make_ml_pipeline(universe, window_length=21, n_forward_days=5): pipeline_columns = OrderedDict() # ensure that returns is the first input pipeline_columns['Returns'] = Returns( inputs=(USEquityPricing.open,), mask=universe, window_length=n_forward_days + 1, ) # rank all the factors and put them after returns pipeline_columns.update({ k: v.rank(mask=universe) for k, v in list(features.items()) }) # Create our ML pipeline factor. The window_length will control how much # lookback the passed in data will have. pipeline_columns['ML'] = ML( inputs=list(pipeline_columns.values()), window_length=window_length + 1, mask=universe, ) pipeline_columns['Sector'] = Sector() return Pipeline(screen=universe, columns=pipeline_columns) def initialize(context): """ Called once at the start of the algorithm. """ set_slippage(slippage.FixedSlippage(spread=0.00)) set_commission(commission.PerShare(cost=0, min_trade_cost=0)) schedule_function( rebalance, TRADE_FREQ, time_rules.market_open(minutes=1), ) # Record tracking variables at the end of each day. schedule_function( record_vars, date_rules.every_day(), time_rules.market_close(), ) # Set up universe, alphas and ML pipline context.universe = Q1500US() # if you are using IsAnnouncedAcqTarget, uncomment the next line # context.universe &= IsAnnouncedAcqTarget() ml_pipeline = make_ml_pipeline( context.universe, n_forward_days=PRED_N_FORWARD_DAYS, window_length=ML_TRAINING_WINDOW, ) # Create our dynamic stock selector. attach_pipeline(ml_pipeline, 'alpha_model') context.past_predictions = {} context.hold_out_accuracy = 0 context.hold_out_log_loss = 0 context.hold_out_returns_spread_bps = 0 def evaluate_and_shift_hold_out(output, context): # Look at past predictions to evaluate classifier accuracy on hold-out data # A day has passed, shift days and drop old ones context.past_predictions = { k - 1: v for k, v in list(context.past_predictions.items()) if k > 0 } if 0 in context.past_predictions: # Past predictions for the current day exist, so we can use todays' # n-back returns to evaluate them raw_returns = output['Returns'] raw_predictions = context.past_predictions[0] # Join to match up equities returns, predictions = raw_returns.align(raw_predictions, join='inner') # Binarize returns returns_binary = returns > returns.median() predictions_binary = predictions > 0.5 # Compute performance metrics context.hold_out_accuracy = metrics.accuracy_score( returns_binary.values, predictions_binary.values, ) context.hold_out_log_loss = metrics.log_loss( returns_binary.values, predictions.values, ) long_rets = returns[predictions_binary == 1].mean() short_rets = returns[predictions_binary == 0].mean() context.hold_out_returns_spread_bps = (long_rets - short_rets) * 10000 # Store current predictions context.past_predictions[PRED_N_FORWARD_DAYS] = context.predicted_probs def before_trading_start(context, data): """ Called every day before market open. """ output = pipeline_output('alpha_model') context.predicted_probs = output['ML'] context.predicted_probs.index.rename(['date', 'equity'], inplace=True) context.risk_factors = pipeline_output('alpha_model')[['Sector']] context.risk_factors.index.rename(['date', 'equity'], inplace=True) context.risk_factors.Sector = context.risk_factors.Sector.map( Sector.SECTOR_NAMES, ) evaluate_and_shift_hold_out(output, context) # These are the securities that we are interested in trading each day. context.security_list = context.predicted_probs.index def rebalance(context, data): """ Execute orders according to our schedule_function() timing. """ predictions = context.predicted_probs # Filter out stocks that can not be traded predictions = predictions.loc[data.can_trade(predictions.index)] # Select top and bottom N stocks n_long_short = min(N_STOCKS_TO_TRADE // 2, len(predictions) // 2) predictions_top_bottom = pd.concat([ predictions.nlargest(n_long_short), predictions.nsmallest(n_long_short), ]) # If classifier predicts many identical values, the top might contain # duplicate stocks predictions_top_bottom = predictions_top_bottom.iloc[ ~predictions_top_bottom.index.duplicated() ] # predictions are probabilities ranging from 0 to 1 predictions_top_bottom = (predictions_top_bottom - 0.5) * 2 # Setup Optimization Objective objective = opt.MaximizeAlpha(predictions_top_bottom) # Setup Optimization Constraints constrain_gross_leverage = opt.MaxGrossExposure(1.0) constrain_pos_size = opt.PositionConcentration.with_equal_bounds( -0.02, +0.02, ) market_neutral = opt.DollarNeutral() if predictions_top_bottom.index.duplicated().any(): log.debug(predictions_top_bottom.head()) sector_neutral = opt.NetGroupExposure.with_equal_bounds( labels=context.risk_factors.Sector.dropna(), min=-0.0001, max=0.0001, ) # Run the optimization. This will calculate new portfolio weights and # manage moving our portfolio toward the target. order_optimal_portfolio( objective=objective, constraints=[ constrain_gross_leverage, constrain_pos_size, market_neutral, sector_neutral, ], ) def record_vars(context, data): """ Plot variables at the end of each day. """ record( leverage=context.account.leverage, hold_out_accuracy=context.hold_out_accuracy, hold_out_log_loss=context.hold_out_log_loss, hold_out_returns_spread_bps=context.hold_out_returns_spread_bps, ) def handle_data(context, data): pass