Back to Community
alpha factor combination in Pipeline - how to fancify it?

Anyone have guidance on how to combine a set of Pipeline factors using something fancier than the typical sum-of-z-scores method? For example say I wanted to weight each factor by dividing by the variance in its returns? Or compute the minimum variance portfolio of factors? Or perhaps compute the information coefficient for each factor, and use it to drop/add factors?

28 responses

Just found this in the help:

https://www.quantopian.com/help#quantopian_pipeline_CustomFactor

class MultipleOutputs(CustomFactor):  
    inputs = [USEquityPricing.close]  
    outputs = ['alpha', 'beta']  
    window_length = N

    def compute(self, today, assets, out, close):  
        computed_alpha, computed_beta = some_function(close)  
        out.alpha[:] = computed_alpha  
        out.beta[:] = computed_beta

# Each output is returned as its own Factor upon instantiation.  
alpha, beta = MultipleOutputs()

# Equivalently, we can create a single factor instance and access each  
# output as an attribute of that instance.  
multiple_outputs = MultipleOutputs()  
alpha = multiple_outputs.alpha  
beta = multiple_outputs.beta  

So, perhaps for each factor I can output the factor itself and some statistics that could be used to combine it with other factors?

@Grant,
There are two ways...both of which you've participated in before...so perhaps this is not what you want.

A. Use the expression compiler(Pipeline) to combine factors that run across the data.
The best example I know of that does this is variants of the ML pipeline.
I use this, and until it gets passed over the data in before_trading_starts, it's just a big expression of Pipeline code.
Here is the business end of specifying combining a bunch of factors by applying a function to the array of factors.
https://www.quantopian.com/posts/machine-learning-on-quantopian-part-3-building-an-algorithm

def make_ml_pipeline(universe, window_length=21, n_forward_days=5):  
    pipeline_columns = OrderedDict()

    # ensure that returns is the first input  
    pipeline_columns['Returns'] = Returns(  
        inputs=(USEquityPricing.open,),  
        mask=universe, window_length=n_forward_days + 1,  
    )

    # rank all the factors and put them after returns  
    pipeline_columns.update({  
        k: v.rank(mask=universe) for k, v in features.items()  
    })

    # Create our ML pipeline factor. The window_length will control how much  
    # lookback the passed in data will have.  
    pipeline_columns['ML'] = ML(  
        inputs=pipeline_columns.values(),  
        window_length=window_length + 1,  
        mask=universe,  
    )

    pipeline_columns['Sector'] = Sector()

    return Pipeline(screen=universe, columns=pipeline_columns)

B. Take the output of
context.output = algo.pipeline_output("pipe") as an output dataframe, and combine (alpha factors) columns using all the methods you want(e.g. dataframe methods), to create an alpha factor combination as a new column...usually done in something like a rebalance method.

alan

Thanks Alan -

Your "B" above might be the preferred approach, but my understanding is that Pipeline does not neatly support output of a trailing window of values (if I'm wrong, someone please correct me). One can only output a Pandas DataFrame with row labels corresponding to the current universe, and column labels applied within Pipeline. I suppose one could create a column for each trailing day. For example, for 2 alpha factors and 5 days, I would have columns with labels:

alpha00, alpha01, alpha02, alpha03, alpha04,  
alpha10, alpha11, alpha12, alpha13, alpha14  

In a similar fashion, I would add columns for returns:

ret00, ret01, ret02, ret03, ret04,  
ret10, ret11, ret12, ret13, ret14  

Then, in before_trading_start I would do the alpha combination. Alternatively, the alpha combination step could be postponed until during the trading day, if it could be done within ~50 seconds, and it were advantageous (e.g. generally, the alpha combination computation could include mid-day minutely OHLCV data).

This seems workable (and would seem to be the only data structure supported by Pandas anyway, since Pandas Panel has been deprecated...although I see that the non-Pandas xarray has developed an alternative).

Doing the potentially computationally expensive alpha combination within before_trading_start (with a full 5 minutes allocated per day) would seem to be the way to go, versus the chunked computations within Pipeline, over a 10 minute limit (where, for back testing, one gets nowhere near 5 minutes per trading day for computations).

Another potential approach would be to create a separate Pipeline for each trailing day, but this seems very awkward, and my understanding is that one could bump into overhead issues, since each Pipeline runs independently with respect to fetching chunks of data (I think).

@Grant,
Good idea for B!...I'll have to try that!

For A., it's a bit of magic to me, as it seems like Pipeline is a kind of dataflow compiler, and I could never figure out why inputs=pipeline_columns.values() works in ML above, in that the other columns are computed and ready for the ML method to be run. How does that happen?

alan

@ Alan -

I don't think I'll try to sort out how to do the alpha combination within Pipeline, since it would seem advantageous to do it in before_trading_start. Generally, the more I can do in common Python, I think the better off I'll be. I'm sure that the Q Pipeline API is pure wonderfulness, but if it is not used widely, and I can't Google, etc. for help, I lose patience.

Thanks for your feedback, by the way. Somehow, it made me realize that I could get the data required out of Pipeline.

Perhaps try, Grant a routine I use in Pipeline to append data by columns:

steps = np.arange(start, end, interval)  
for s, step in enumerate(steps):  
    a = myCustomFactor(inputs=[myInputs], window_length=step, mask=myUniverse)  
    myPipeline.add(a, 'alpha' + str(s))  

Thanks Karl -

I'll need something like your code to automatically label the alphas and anything else to be spit out by Pipeline.

An example of a possible data structure mapping is given in the "Deprecate Panel" section of https://pandas.pydata.org/pandas-docs/stable/dsintro.html, in the form of a MultiIndex DataFrame:

                     ItemA     ItemB     ItemC  
major      minor  
2000-01-03 A     -0.390201 -1.624062 -0.605044  
           B      1.562443  0.483103  0.583129  
           C     -1.085663  0.768159 -0.273458  
           D      0.136235 -0.021763 -0.700648  
2000-01-04 A      1.207122 -0.758514  0.878404  
           B      0.763264  0.061495 -0.876690  
           C     -1.114738  0.225441 -0.335117  
           D      0.886313 -0.047152 -1.166607  
2000-01-05 A      0.178690 -0.560859 -0.921485  
           B      0.162027  0.240767 -1.919354  
           C     -0.058216  0.543294 -0.476268  
           D     -1.350722  0.088472 -0.367236  
2000-01-06 A     -1.004168 -0.589005 -0.200312  
           B     -0.902704  0.782413 -0.572707  
           C     -0.486768  0.771931 -1.765602  
           D     -0.886348 -0.857435  1.296674  

Using this example, the columns would be SIDs, the major axis would be some sort of datetime stamp, and the minor axis would be the alpha factors along with any additional data required to combine them (N alpha factors, followed by M data sets).

I've never bothered with it, but presumably Pipeline has datetime stamps? I'll have to dig into that one.

Pandas API Reference for MultiIndex and Advance Indexing methods.

Thanks Karl -

That's quite a reference, with lots of examples! I'll have to search around for examples of how to do this:

DataFrame --> MultiIndex

Then I can back out how to construct the DataFrame in Pipeline so that I can create the MultiIndex in before_trading_start without too much fuss.

One possible set of Pipeline columns would be (datetime stamp followed by two alpha factors and the returns of each SID):

dt0, alpha00, alpha01, r0, dt1, alpha10, alpha11, r1, dt2, alpha20, alpha21, r2  

Then, this would be converted into a MultiIndex, with the major axis the datetime stamps, and the minor axis the alphas and the SID returns, with SIDs as the column labels.

Karl -

So are you exporting only point-in-time alpha factors from Pipeline? Or a trailing window of alpha factors?

Point-in-time alpha columns, Grant from Pipeline upon start date then append subsequent point-in-time data columns into a context.DataFrame for global access throughout the backtest period until end date, and beyond for OOS point-in-time where relevant.

Quite simple to initialise a DataFrame in def initialise():

context.myAlpha = pd.DataFrame(np.nan, index=range(0, 1), columns=['alpha0', 'alpha1', 'alpha2', 'alpha3'])  

Thanks Karl -

The thing is, when you start a backtest or live trading, you have to wait N days to fill whatever trailing window you need. If I wanted a year's worth of data, then I don't think building up the data set would be workable. The trailing window needs to come out of Pipeline from the get-go.

Another degree of freedom to crack this nut would be to have a separate pipeline for each alpha factor, to help with the bookkeeping. Then one could output lagged values of the factor along the columns of its pipeline output (e.g. see https://www.quantopian.com/posts/get-lagged-output-of-pipeline-custom-factor). Then, everything could be pieced together in before_trading_start.

That's creative thinking, Grant :) I may try that too!

The attached algo is an example of how to do simple alpha combination in before_trading_start. Each alpha is a daily vector stored in the pandas dataframe as alpha_N. This is accomplished with:

def factor_pipeline():  
    factors = make_factors()  
    pipeline_columns = {}  
    for k,f in enumerate(factors):  
        pipeline_columns['alpha_'+str(k)] = f(mask=QTradableStocksUS())  
    pipe = Pipeline(columns = pipeline_columns,  
    screen = QTradableStocksUS())  
    return pipe  

If a trailing window of alpha values is needed in before_trading_start then one tidy approach would be to use a separate pipeline for each alpha factor. Then each column could correspond to a set of lagged alpha values. For example, for the Nth alpha factor (e.g. N = 2), the columns could be labelled (for a 5-day window):

alpha_2_0, alpha_2_1, alpha_2_2, alpha_N_3, alpha_2_4

The announcement for multiple pipelines is https://www.quantopian.com/posts/multiple-pipelines-available-in-algorithms. It is worth noting:

Multiple pipelines can easily lead to a slowdown in your algorithm, because the pipeline machinery can optimize your data fetching within a single pipeline, but does not optimize data fetching across separate pipelines. In general, it's better to use a single pipeline.

Note also this announcement: https://www.quantopian.com/posts/before-trading-start-timeout-fix. It basically says that for all pipelines combined, 10 minutes per trading day is allocated (however, large chunks of data are processed, as discussed here). The change also enables a full, predictable 5 minutes per trading day for before_trading_start (and so, my thinking is that alpha combination should be done there, and not in pipeline, even though it is awkward to get the data out).

Clone Algorithm
72
Loading...
Backtest from to with initial capital
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
from quantopian.algorithm import attach_pipeline, pipeline_output, order_optimal_portfolio
from quantopian.pipeline import Pipeline
from quantopian.pipeline.factors import CustomFactor, SimpleBeta, Returns
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.data import Fundamentals
import quantopian.optimize as opt
from sklearn import preprocessing
from quantopian.pipeline.experimental import risk_loading_pipeline
from quantopian.pipeline.filters import QTradableStocksUS
from quantopian.pipeline.data.psychsignal import stocktwits
from scipy.stats.mstats import winsorize
from zipline.utils.numpy_utils import (
    repeat_first_axis,
    repeat_last_axis,
)
from quantopian.pipeline.data import factset
 
import numpy as np

#############################
# algo settings

WIN_LIMIT = 0 # winsorize limit in factor preprocess function

# Optimize API constraints
MAX_POSITION_SIZE = 0.01 # set to 0.01 for ~100 positions
BETA_EXPOSURE = 0
USE_MaxTurnover = True # set to True to use Optimize API MaxTurnover constraint
MIN_TURN = 0.15 # Optimize API MaxTurnover constraint (if optimize fails, incrementally higher constraints will be attempted)

#############################

def preprocess(a):
    
    a = a.astype(np.float64)
    
    a[np.isinf(a)] = np.nan
    
    a = np.nan_to_num(a - np.nanmean(a))
    
    a = winsorize(a, limits=[WIN_LIMIT,WIN_LIMIT])
    
    return preprocessing.scale(a)

def make_factors():
    
    class MessageSum(CustomFactor):
        inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close, stocktwits.bull_scored_messages, stocktwits.bear_scored_messages, stocktwits.total_scanned_messages]
        window_length = 21
        window_safe = True
        def compute(self, today, assets, out, high, low, close, bull, bear, total):
            v = np.nansum((high-low)/close, axis=0)
            out[:] = preprocess(v*np.nansum(total*(bear-bull), axis=0))
                
    class fcf(CustomFactor):
        inputs = [Fundamentals.fcf_yield]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf_yield):
            out[:] = preprocess(np.nan_to_num(fcf_yield[-1,:]))
                
    class Direction(CustomFactor):
        inputs = [USEquityPricing.open, USEquityPricing.close]
        window_length = 21
        window_safe = True
        def compute(self, today, assets, out, open, close):
            p = (close-open)/close
            out[:] = preprocess(np.nansum(-p,axis=0))
                
    class mean_rev(CustomFactor):   
        inputs = [USEquityPricing.high,USEquityPricing.low,USEquityPricing.close]
        window_length = 30
        window_safe = True
        def compute(self, today, assets, out, high, low, close):
            
            p = (high+low+close)/3
 
            m = len(close[0,:])
            n = len(close[:,0])
                
            b = np.zeros(m)
            a = np.zeros(m)
                
            for k in range(10,n+1):
                price_rel = np.nanmean(p[-k:,:],axis=0)/p[-1,:]
                wt = np.nansum(price_rel)
                b += wt*price_rel
                price_rel = 1.0/price_rel
                wt = np.nansum(price_rel)
                a += wt*price_rel
                
            out[:] = preprocess(b-a)
                
    class volatility(CustomFactor):
        inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close, USEquityPricing.volume]
        window_length = 5
        window_safe = True
        def compute(self, today, assets, out, high, low, close, volume):
            vol = np.nansum(volume,axis=0)*np.nansum(np.absolute((high-low)/close),axis=0)
            out[:] = preprocess(-vol)
                
    class growthscore(CustomFactor):
        inputs = [Fundamentals.growth_score]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, growth_score):
            out[:] = preprocess(growth_score[-1,:])
                
    class peg_ratio(CustomFactor):
        inputs = [Fundamentals.peg_ratio]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, peg_ratio):
            out[:] = preprocess(-1.0/peg_ratio[-1,:])
                
    class MoneyflowVolume5d(CustomFactor):
        inputs = (USEquityPricing.close, USEquityPricing.volume)
 
        # we need one more day to get the direction of the price on the first
        # day of our desired window of 5 days
        window_length = 6
        window_safe = True
            
        def compute(self, today, assets, out, close_extra, volume_extra):
            # slice off the extra row used to get the direction of the close
            # on the first day
            close = close_extra[1:]
            volume = volume_extra[1:]
                
            dollar_volume = close * volume
            denominator = dollar_volume.sum(axis=0)
                
            difference = np.diff(close_extra, axis=0)
            direction = np.where(difference > 0, 1, -1)
            numerator = (direction * dollar_volume).sum(axis=0)
                
            out[:] = preprocess(-np.divide(numerator, denominator))
                
    class Trendline(CustomFactor):
        inputs = [USEquityPricing.close]
        window_length = 252
        window_safe = True
            
        _x = np.arange(window_length)
        _x_var = np.var(_x)
 
        def compute(self, today, assets, out, close):
            
            x_matrix = repeat_last_axis(
            (self.window_length - 1) / 2 - self._x,
            len(assets),
            )
 
            y_bar = np.nanmean(close, axis=0)
            y_bars = repeat_first_axis(y_bar, self.window_length)
            y_matrix = close - y_bars
 
            out[:] = preprocess(-np.divide(
            (x_matrix * y_matrix).sum(axis=0) / self._x_var,
            self.window_length
            ))
                
    class SalesGrowth(CustomFactor):
        inputs = [factset.Fundamentals.sales_gr_qf]
        window_length = 2*252
        window_safe = True
        def compute(self, today, assets, out, sales_growth):
            sales_growth = np.nan_to_num(sales_growth)
            sales_growth = preprocessing.scale(sales_growth,axis=0)
            out[:] = preprocess(sales_growth[-1])
 
    class GrossMarginChange(CustomFactor):
        window_length = 2*252
        window_safe = True
        inputs = [factset.Fundamentals.ebit_oper_mgn_qf]
        def compute(self, today, assets, out, ebit_oper_mgn):
            ebit_oper_mgn = np.nan_to_num(ebit_oper_mgn)
            ebit_oper_mgn = preprocessing.scale(ebit_oper_mgn,axis=0)
            out[:] = preprocess(ebit_oper_mgn[-1])
 
    class Gross_Income_Margin(CustomFactor):
        #Gross Income Margin:
        #Gross Profit divided by Net Sales
        #Notes:
        #High value suggests that the company is generating large profits
        inputs = [Fundamentals.cost_of_revenue, Fundamentals.total_revenue]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, cost_of_revenue, sales):
            gross_income_margin = sales[-1]/sales[-1] - cost_of_revenue[-1]/sales[-1]
            out[:] = preprocess(-gross_income_margin)
 
    class MaxGap(CustomFactor): 
        # the biggest absolute overnight gap in the previous 90 sessions
        inputs = [USEquityPricing.close] ; window_length = 90
        window_safe = True
        def compute(self, today, assets, out, close):
            abs_log_rets = np.abs(np.diff(np.log(close),axis=0))
            max_gap = np.max(abs_log_rets, axis=0)
            out[:] = preprocess(max_gap)
        
    class CapEx_Vol(CustomFactor):
        inputs=[
            factset.Fundamentals.capex_assets_qf]
        window_length = 2*252
        window_safe = True
        def compute(self, today, assets, out, capex_assets):
                 
            out[:] = preprocess(-np.ptp(capex_assets,axis=0))
                
    class fcf_ev(CustomFactor):
        inputs=[
            Fundamentals.fcf_per_share,
            Fundamentals.shares_outstanding,
            Fundamentals.enterprise_value,]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf, shares, ev):
            v = fcf*shares/ev
            v[np.isinf(v)] = np.nan
                 
            out[:] = preprocess(v[-1])
                
    class DebtToTotalAssets(CustomFactor):
        inputs = [Fundamentals.long_term_debt,
            Fundamentals.current_debt,
            Fundamentals.cash_and_cash_equivalents,
            Fundamentals.total_assets]
        window_length = 1
        window_safe = True
            
        def compute(self, today, assets, out, ltd, std, cce, ta):
            std_part = np.maximum(std - cce, np.zeros(std.shape))
            v = np.divide(ltd + std_part, ta)
            v[np.isinf(v)] = np.nan
            out[:] = preprocess(np.ravel(v))
                
    class TEM(CustomFactor):
        """
        TEM = standard deviation of past 6 quarters' reports
        """
        inputs=[factset.Fundamentals.capex_qf_asof_date,
            factset.Fundamentals.capex_qf,
            factset.Fundamentals.assets]
        window_length = 390
        window_safe = True
        def compute(self, today, assets, out, asof_date, capex, total_assets):
            values = capex/total_assets
            values[np.isinf(values)] = np.nan
            out_temp = np.zeros_like(values[-1,:])
            for column_ix in range(asof_date.shape[1]):
                _, unique_indices = np.unique(asof_date[:, column_ix], return_index=True)
                quarterly_values = values[unique_indices, column_ix]
                if len(quarterly_values) < 6:
                    quarterly_values = np.hstack([
                    np.repeat([np.nan], 6 - len(quarterly_values)),
                    quarterly_values,
                    ])
            
                out_temp[column_ix] = np.std(quarterly_values[-6:])
                
            out[:] = preprocess(-out_temp)
                
    class Piotroski(CustomFactor):
        inputs = [
                Fundamentals.roa,
                Fundamentals.operating_cash_flow,
                Fundamentals.cash_flow_from_continuing_operating_activities,
                Fundamentals.long_term_debt_equity_ratio,
                Fundamentals.current_ratio,
                Fundamentals.shares_outstanding,
                Fundamentals.gross_margin,
                Fundamentals.assets_turnover,
                ]
 
        window_length = 100
        window_safe = True
    
        def compute(self, today, assets, out,roa, cash_flow, cash_flow_from_ops, long_term_debt_ratio, current_ratio, shares_outstanding, gross_margin, assets_turnover):
            
            profit = (
                        (roa[-1] > 0).astype(int) +
                        (cash_flow[-1] > 0).astype(int) +
                        (roa[-1] > roa[0]).astype(int) +
                        (cash_flow_from_ops[-1] > roa[-1]).astype(int)
                    )
        
            leverage = (
                        (long_term_debt_ratio[-1] < long_term_debt_ratio[0]).astype(int) +
                        (current_ratio[-1] > current_ratio[0]).astype(int) + 
                        (shares_outstanding[-1] <= shares_outstanding[0]).astype(int)
                        )
        
            operating = (
                        (gross_margin[-1] > gross_margin[0]).astype(int) +
                        (assets_turnover[-1] > assets_turnover[0]).astype(int)
                        )
        
            out[:] = preprocess(profit + leverage + operating)
            
    class Altman_Z(CustomFactor):
        inputs=[factset.Fundamentals.zscore_qf]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, zscore_qf):
            out[:] = preprocess(zscore_qf[-1])
                
    class Quick_Ratio(CustomFactor):
        inputs=[factset.Fundamentals.quick_ratio_qf]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, quick_ratio_qf):
            out[:] = preprocess(quick_ratio_qf[-1])
                
    class AdvancedMomentum(CustomFactor):
        inputs = (USEquityPricing.close, Returns(window_length=126))
        window_length = 252
        window_safe = True
 
        def compute(self, today, assets, out, prices, returns):
            am = np.divide(
            (
            (prices[-21] - prices[-252]) / prices[-252] -
            prices[-1] - prices[-21]
            ) / prices[-21],
            np.nanstd(returns, axis=0)
            )
                
            out[:] = preprocess(-am)

    factors = [
            MessageSum,
            fcf,
            Direction,
            mean_rev,
            volatility,
            growthscore,
            peg_ratio,
            MoneyflowVolume5d,
            Trendline,
            SalesGrowth,
            GrossMarginChange,
            Gross_Income_Margin,
            MaxGap,
            CapEx_Vol,
            fcf_ev,
            DebtToTotalAssets,
            TEM,
            Piotroski,
            Altman_Z,
            Quick_Ratio,
            AdvancedMomentum,
        ]
    
    return factors
 
def factor_pipeline():
    
    factors = make_factors()
    
    pipeline_columns = {}
    for k,f in enumerate(factors):
        pipeline_columns['alpha_'+str(k)] = f(mask=QTradableStocksUS())
 
    pipe = Pipeline(columns = pipeline_columns,
    screen = QTradableStocksUS())
    return pipe

def beta_pipeline():
    
    beta = SimpleBeta(target=sid(8554),regression_length=260,
                      allowed_missing_percentage=1.0
                     )
    
    pipe = Pipeline(columns = {'beta': beta},
    screen = QTradableStocksUS())
    return pipe
    
def initialize(context):
 
    attach_pipeline(factor_pipeline(), 'factor_pipeline')
    attach_pipeline(risk_loading_pipeline(), 'risk_loading_pipeline')
    attach_pipeline(beta_pipeline(), 'beta_pipeline')
 
    # Schedule my rebalance function
    schedule_function(func=rebalance,
                      date_rule=date_rules.every_day(),
                      time_rule=time_rules.market_open(minutes=60),
                      half_days=True)
    # record my portfolio variables at the end of day
    schedule_function(func=recording_statements,
                      date_rule=date_rules.every_day(),
                      time_rule=time_rules.market_close(),
                      half_days=True)
    
    context.init = True
    
    # set_commission(commission.PerShare(cost=0, min_trade_cost=0))
    # set_slippage(slippage.FixedSlippage(spread=0))
    
def before_trading_start(context, data):
 
    risk_loadings = pipeline_output('risk_loading_pipeline')
    risk_loadings.fillna(risk_loadings.median(), inplace=True)
    context.risk_loadings = risk_loadings
    context.beta_pipeline = pipeline_output('beta_pipeline')
    
    context.combined_alpha = pipeline_output('factor_pipeline').sum(axis=1)
    
def recording_statements(context, data):
 
    record(num_positions=len(context.portfolio.positions))
    record(leverage=context.account.leverage)
 
def rebalance(context, data):
    
    combined_alpha = context.combined_alpha
 
    # demean and normalize
    combined_alpha = combined_alpha - combined_alpha.mean()
    denom = combined_alpha.abs().sum()
    combined_alpha = combined_alpha/denom
    
    objective = opt.MaximizeAlpha(combined_alpha)
    
    constraints = []
    
    constraints.append(opt.MaxGrossExposure(1.0))
    
    constraints.append(opt.DollarNeutral())
    
    constraints.append(
        opt.PositionConcentration.with_equal_bounds(
            min=-MAX_POSITION_SIZE,
            max=MAX_POSITION_SIZE
        ))
    
    risk_model_exposure = opt.experimental.RiskModelExposure(
        context.risk_loadings,
        version=opt.Newest,
    )
      
    constraints.append(risk_model_exposure)
    
    beta_neutral = opt.FactorExposure(
        loadings=context.beta_pipeline[['beta']],
        min_exposures={'beta':-BETA_EXPOSURE},
        max_exposures={'beta':BETA_EXPOSURE}
        )
    constraints.append(beta_neutral)
    
    if context.init:
        order_optimal_portfolio(
                objective=objective,
                constraints=constraints,
                )
        if USE_MaxTurnover:
            context.init = False
        return
    
    turnover = np.linspace(MIN_TURN,0.65,num=100)
    
    for max_turnover in turnover:
        
        constraints.append(opt.MaxTurnover(max_turnover))
        
        try:
            order_optimal_portfolio(
                objective=objective,
                constraints=constraints,
                )
            constraints = constraints[:-1]
            record(max_turnover = max_turnover)
            return
        except:
            constraints = constraints[:-1]
There was a runtime error.

On your thinking, Grant that:

for before_trading_start ..that alpha combination should be
done there, and not in pipeline

It works fine in before_trading_start when the scheduled_function is daily (as in yours) though I noticed before_trading_start keeps running everyday even if the schedule_function is weekly.

In algorithms where scheduled weekly, perhaps alpha combination (if outside Pipeline) may have move further down in the order of work flow.

Hi Karl -

The function before_trading_start is special in that a full 5 minutes is allocated for it to complete every trading day. During the trading day, in any given minute, everything has to be done in ~ 50 secs.

You can still use before_trading_startfor less frequent trading. For example, say you wanted to rebalance every Thursday. Schedule a function for Wednesday just to set a flag for the next day, context.run_alpha_combination = True. Then in before_trading_start run if context.run_alpha_combination = True and set context.run_alpha_combination = False once the run is complete.

Here's a first-cut at assigning each alpha factor its own pipeline, for the purpose of exporting a trailing window of each alpha factor to before_trading_start. For the example, I've limited the number of factors to 2, and the length of the trailing window to 5 days.

Clone Algorithm
224
Loading...
Backtest from to with initial capital
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.factors import CustomFactor, SimpleBeta, Returns
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.data import Fundamentals
import quantopian.optimize as opt
from sklearn import preprocessing
from quantopian.pipeline.experimental import risk_loading_pipeline
from quantopian.pipeline.filters import QTradableStocksUS
from quantopian.pipeline.data.psychsignal import stocktwits
from scipy.stats.mstats import winsorize
from zipline.utils.numpy_utils import (
    repeat_first_axis,
    repeat_last_axis,
)
from quantopian.pipeline.data import factset

from scipy.stats.mstats import gmean
 
import numpy as np

WIN_LIMIT = 0
N_FACTOR_WINDOW = 5 # trailing window of alpha factors exported to before_trading_start

def preprocess(a):
    
    a = a.astype(np.float64)
    
    a[np.isinf(a)] = np.nan
    
    a = np.nan_to_num(a - np.nanmean(a))
    
    a = winsorize(a, limits=[WIN_LIMIT,WIN_LIMIT])
    
    return preprocessing.scale(a)

def make_factors():
    
    class MessageSum(CustomFactor):
        inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close, stocktwits.bull_scored_messages, stocktwits.bear_scored_messages, stocktwits.total_scanned_messages]
        window_length = 21
        window_safe = True
        def compute(self, today, assets, out, high, low, close, bull, bear, total):
            v = np.nansum((high-low)/close, axis=0)
            out[:] = preprocess(v*np.nansum(total*(bear-bull), axis=0))
                
    class fcf(CustomFactor):
        inputs = [Fundamentals.fcf_yield]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf_yield):
            out[:] = preprocess(np.nan_to_num(fcf_yield[-1,:]))
                
    class Direction(CustomFactor):
        inputs = [USEquityPricing.open, USEquityPricing.close]
        window_length = 21
        window_safe = True
        def compute(self, today, assets, out, open, close):
            p = (close-open)/close
            out[:] = preprocess(np.nansum(-p,axis=0))
                
    class mean_rev(CustomFactor):   
        inputs = [USEquityPricing.high,USEquityPricing.low,USEquityPricing.close]
        window_length = 30
        window_safe = True
        def compute(self, today, assets, out, high, low, close):
            
            p = (high+low+close)/3
 
            m = len(close[0,:])
            n = len(close[:,0])
                
            b = np.zeros(m)
            a = np.zeros(m)
                
            for k in range(10,n+1):
                price_rel = np.nanmean(p[-k:,:],axis=0)/p[-1,:]
                wt = np.nansum(price_rel)
                b += wt*price_rel
                price_rel = 1.0/price_rel
                wt = np.nansum(price_rel)
                a += wt*price_rel
                
            out[:] = preprocess(b-a)
                
    class volatility(CustomFactor):
        inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close, USEquityPricing.volume]
        window_length = 5
        window_safe = True
        def compute(self, today, assets, out, high, low, close, volume):
            vol = np.nansum(volume,axis=0)*np.nansum(np.absolute((high-low)/close),axis=0)
            out[:] = preprocess(-vol)
                
    class growthscore(CustomFactor):
        inputs = [Fundamentals.growth_score]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, growth_score):
            out[:] = preprocess(growth_score[-1,:])
                
    class peg_ratio(CustomFactor):
        inputs = [Fundamentals.peg_ratio]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, peg_ratio):
            out[:] = preprocess(-1.0/peg_ratio[-1,:])
                
    class MoneyflowVolume5d(CustomFactor):
        inputs = (USEquityPricing.close, USEquityPricing.volume)
 
        # we need one more day to get the direction of the price on the first
        # day of our desired window of 5 days
        window_length = 6
        window_safe = True
            
        def compute(self, today, assets, out, close_extra, volume_extra):
            # slice off the extra row used to get the direction of the close
            # on the first day
            close = close_extra[1:]
            volume = volume_extra[1:]
                
            dollar_volume = close * volume
            denominator = dollar_volume.sum(axis=0)
                
            difference = np.diff(close_extra, axis=0)
            direction = np.where(difference > 0, 1, -1)
            numerator = (direction * dollar_volume).sum(axis=0)
                
            out[:] = preprocess(-np.divide(numerator, denominator))
                
    class Trendline(CustomFactor):
        inputs = [USEquityPricing.close]
        window_length = 252
        window_safe = True
            
        _x = np.arange(window_length)
        _x_var = np.var(_x)
 
        def compute(self, today, assets, out, close):
            
            x_matrix = repeat_last_axis(
            (self.window_length - 1) / 2 - self._x,
            len(assets),
            )
 
            y_bar = np.nanmean(close, axis=0)
            y_bars = repeat_first_axis(y_bar, self.window_length)
            y_matrix = close - y_bars
 
            out[:] = preprocess(-np.divide(
            (x_matrix * y_matrix).sum(axis=0) / self._x_var,
            self.window_length
            ))
                
    class SalesGrowth(CustomFactor):
        inputs = [factset.Fundamentals.sales_gr_qf]
        window_length = 2*252
        window_safe = True
        def compute(self, today, assets, out, sales_growth):
            sales_growth = np.nan_to_num(sales_growth)
            sales_growth = preprocessing.scale(sales_growth,axis=0)
            out[:] = preprocess(sales_growth[-1])
 
    class GrossMarginChange(CustomFactor):
        window_length = 2*252
        window_safe = True
        inputs = [factset.Fundamentals.ebit_oper_mgn_qf]
        def compute(self, today, assets, out, ebit_oper_mgn):
            ebit_oper_mgn = np.nan_to_num(ebit_oper_mgn)
            ebit_oper_mgn = preprocessing.scale(ebit_oper_mgn,axis=0)
            out[:] = preprocess(ebit_oper_mgn[-1])
 
    class Gross_Income_Margin(CustomFactor):
        #Gross Income Margin:
        #Gross Profit divided by Net Sales
        #Notes:
        #High value suggests that the company is generating large profits
        inputs = [Fundamentals.cost_of_revenue, Fundamentals.total_revenue]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, cost_of_revenue, sales):
            gross_income_margin = sales[-1]/sales[-1] - cost_of_revenue[-1]/sales[-1]
            out[:] = preprocess(-gross_income_margin)
 
    class MaxGap(CustomFactor): 
        # the biggest absolute overnight gap in the previous 90 sessions
        inputs = [USEquityPricing.close] ; window_length = 90
        window_safe = True
        def compute(self, today, assets, out, close):
            abs_log_rets = np.abs(np.diff(np.log(close),axis=0))
            max_gap = np.max(abs_log_rets, axis=0)
            out[:] = preprocess(max_gap)
        
    class CapEx_Vol(CustomFactor):
        inputs=[
            factset.Fundamentals.capex_assets_qf]
        window_length = 2*252
        window_safe = True
        def compute(self, today, assets, out, capex_assets):
                 
            out[:] = preprocess(-np.ptp(capex_assets,axis=0))
                
    class fcf_ev(CustomFactor):
        inputs=[
            Fundamentals.fcf_per_share,
            Fundamentals.shares_outstanding,
            Fundamentals.enterprise_value,]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf, shares, ev):
            v = fcf*shares/ev
            v[np.isinf(v)] = np.nan
                 
            out[:] = preprocess(v[-1])
                
    class DebtToTotalAssets(CustomFactor):
        inputs = [Fundamentals.long_term_debt,
            Fundamentals.current_debt,
            Fundamentals.cash_and_cash_equivalents,
            Fundamentals.total_assets]
        window_length = 1
        window_safe = True
            
        def compute(self, today, assets, out, ltd, std, cce, ta):
            std_part = np.maximum(std - cce, np.zeros(std.shape))
            v = np.divide(ltd + std_part, ta)
            v[np.isinf(v)] = np.nan
            out[:] = preprocess(np.ravel(v))
                
    class TEM(CustomFactor):
        """
        TEM = standard deviation of past 6 quarters' reports
        """
        inputs=[factset.Fundamentals.capex_qf_asof_date,
            factset.Fundamentals.capex_qf,
            factset.Fundamentals.assets]
        window_length = 390
        window_safe = True
        def compute(self, today, assets, out, asof_date, capex, total_assets):
            values = capex/total_assets
            values[np.isinf(values)] = np.nan
            out_temp = np.zeros_like(values[-1,:])
            for column_ix in range(asof_date.shape[1]):
                _, unique_indices = np.unique(asof_date[:, column_ix], return_index=True)
                quarterly_values = values[unique_indices, column_ix]
                if len(quarterly_values) < 6:
                    quarterly_values = np.hstack([
                    np.repeat([np.nan], 6 - len(quarterly_values)),
                    quarterly_values,
                    ])
            
                out_temp[column_ix] = np.std(quarterly_values[-6:])
                
            out[:] = preprocess(-out_temp)
                
    class Piotroski(CustomFactor):
        inputs = [
                Fundamentals.roa,
                Fundamentals.operating_cash_flow,
                Fundamentals.cash_flow_from_continuing_operating_activities,
                Fundamentals.long_term_debt_equity_ratio,
                Fundamentals.current_ratio,
                Fundamentals.shares_outstanding,
                Fundamentals.gross_margin,
                Fundamentals.assets_turnover,
                ]
 
        window_length = 100
        window_safe = True
    
        def compute(self, today, assets, out,roa, cash_flow, cash_flow_from_ops, long_term_debt_ratio, current_ratio, shares_outstanding, gross_margin, assets_turnover):
            
            profit = (
                        (roa[-1] > 0).astype(int) +
                        (cash_flow[-1] > 0).astype(int) +
                        (roa[-1] > roa[0]).astype(int) +
                        (cash_flow_from_ops[-1] > roa[-1]).astype(int)
                    )
        
            leverage = (
                        (long_term_debt_ratio[-1] < long_term_debt_ratio[0]).astype(int) +
                        (current_ratio[-1] > current_ratio[0]).astype(int) + 
                        (shares_outstanding[-1] <= shares_outstanding[0]).astype(int)
                        )
        
            operating = (
                        (gross_margin[-1] > gross_margin[0]).astype(int) +
                        (assets_turnover[-1] > assets_turnover[0]).astype(int)
                        )
        
            out[:] = preprocess(profit + leverage + operating)
            
    class Altman_Z(CustomFactor):
        inputs=[factset.Fundamentals.zscore_qf]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, zscore_qf):
            out[:] = preprocess(zscore_qf[-1])
                
    class Quick_Ratio(CustomFactor):
        inputs=[factset.Fundamentals.quick_ratio_qf]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, quick_ratio_qf):
            out[:] = preprocess(quick_ratio_qf[-1])
                
    class AdvancedMomentum(CustomFactor):
        inputs = (USEquityPricing.close, Returns(window_length=126))
        window_length = 252
        window_safe = True
 
        def compute(self, today, assets, out, prices, returns):
            am = np.divide(
            (
            (prices[-21] - prices[-252]) / prices[-252] -
            prices[-1] - prices[-21]
            ) / prices[-21],
            np.nanstd(returns, axis=0)
            )
                
            out[:] = preprocess(-am)

    class STA(CustomFactor):  
        inputs = [Fundamentals.operating_cash_flow,  
                  Fundamentals.net_income_continuous_operations,  
                  Fundamentals.total_assets]  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, ocf, ni, ta):  
            ta = np.where(np.isnan(ta), 0, ta)  
            ocf = np.where(np.isnan(ocf), 0, ocf)  
            ni = np.where(np.isnan(ni), 0, ni)  
            out[:] = preprocess(abs(ni[-1] - ocf[-1])/ ta[-1])
            
    class SNOA(CustomFactor):  
        inputs = [Fundamentals.total_assets,  
                 Fundamentals.cash_and_cash_equivalents,  
                 Fundamentals.current_debt, # same as short-term debt?  
                 Fundamentals.minority_interest_balance_sheet,  
                 Fundamentals.long_term_debt, # check same?  
                 Fundamentals.preferred_stock] # check same?  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, ta, cace, cd, mi, ltd, ps):  
            ta = np.where(np.isnan(ta), 0, ta)  
            cace = np.where(np.isnan(cace), 0, cace)  
            cd = np.where(np.isnan(cd), 0, cd)  
            mi = np.where(np.isnan(mi), 0, mi)  
            ltd = np.where(np.isnan(ltd), 0, ltd)  
            ps = np.where(np.isnan(ps), 0, ps)  
            results = ((ta[-1]-cace[-1])-(ta[-1]-cace[-1]-ltd[-1]-cd[-1]-ps[-1]-mi[-1]))/ta[-1]  
            out[:] = preprocess(np.where(np.isnan(results),0,results))
            
    class ROA(CustomFactor):  
        inputs = [Fundamentals.roa]  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, roa):  
            out[:] = preprocess(np.where(roa[-1]>0,1,0))
            
    class FCFTA(CustomFactor):  
        inputs = [Fundamentals.free_cash_flow,  
                 Fundamentals.total_assets]  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf, ta):  
            out[:] = preprocess(np.where(fcf[-1]/ta[-1]>0,1,0))
            
    class ROA_GROWTH(CustomFactor):  
        inputs = [Fundamentals.roa]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, roa):  
            out[:] = np.where(roa[-1]>roa[-252],1,0)
            
    class FCFTA_ROA(CustomFactor):  
        inputs = [Fundamentals.free_cash_flow,  
                  Fundamentals.total_assets,  
                  Fundamentals.roa]  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf, ta, roa):  
            out[:] = preprocess(np.where(fcf[-1]/ta[-1]>roa[-1],1,0))
            
    class FCFTA_GROWTH(CustomFactor):  
        inputs = [Fundamentals.free_cash_flow,  
                  Fundamentals.total_assets]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, fcf, ta):  
            out[:] = preprocess(np.where(fcf[-1]/ta[-1]>fcf[-252]/ta[-252],1,0))
            
    class LTD_GROWTH(CustomFactor):  
        inputs = [Fundamentals.total_assets,  
                  Fundamentals.long_term_debt]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, ta, ltd):  
            out[:] = preprocess(np.where(ltd[-1]/ta[-1]<ltd[-252]/ta[-252],1,0))
            
    class CR_GROWTH(CustomFactor):  
        inputs = [Fundamentals.current_ratio]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, cr):  
            out[:] = preprocess(np.where(cr[-1]>cr[-252],1,0))
            
    class GM_GROWTH(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(np.where(gm[-1]>gm[-252],1,0))
            
    class ATR_GROWTH(CustomFactor):  
        inputs = [Fundamentals.assets_turnover]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, atr):  
            out[:] = preprocess(np.where(atr[-1]>atr[-252],1,0))
            
    class NEQISS(CustomFactor):  
        inputs = [Fundamentals.shares_outstanding]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, so):  
            out[:] = preprocess(np.where(so[-1]-so[-252]<1,1,0))
            
    class GM_GROWTH_2YR(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 504
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(gmean([gm[-1]+1, gm[-252]+1,gm[-504]+1])-1)
            
    class GM_STABILITY_2YR(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 504
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(np.std([gm[-1]-gm[-252],gm[-252]-gm[-504]],axis=0)) 
            
    class ROA_GROWTH_2YR(CustomFactor):  
        inputs = [Fundamentals.roa]  
        window_length = 504
        window_safe = True
        def compute(self, today, assets, out, roa):  
            out[:] = preprocess(gmean([roa[-1]+1, roa[-252]+1,roa[-504]+1])-1)
            
    class ROIC_GROWTH_2YR(CustomFactor):  
        inputs = [Fundamentals.roic]  
        window_length = 504
        window_safe = True
        def compute(self, today, assets, out, roic):  
            out[:] = preprocess(gmean([roic[-1]+1, roic[-252]+1,roic[-504]+1])-1)
            
    class GM_GROWTH_8YR(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 8
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(gmean([gm[-1]+1, gm[-2]+1, gm[-3]+1, gm[-4]+1, gm[-5]+1, gm[-6]+1, gm[-7]+1, gm[-8]+1])-1)        
            
    class GM_STABILITY_8YR(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 9
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(gm[-8]) 
            
    class ROA_GROWTH_8YR(CustomFactor):  
        inputs = [Fundamentals.roa]  
        window_length = 9
        window_safe = True
        def compute(self, today, assets, out, roa):  
            out[:] = preprocess(gmean([roa[-1]/100+1, roa[-2]/100+1,roa[-3]/100+1,roa[-4]/100+1,roa[-5]/100+1,roa[-6]/100+1,roa[-7]/100+1,roa[-8]/100+1])-1) 
            
    class ROIC_GROWTH_8YR(CustomFactor):  
        inputs = [Fundamentals.roic]  
        window_length = 9
        window_safe = True
        def compute(self, today, assets, out, roic):  
            out[:] = preprocess(gmean([roic[-1]/100+1, roic[-2]/100+1,roic[-3]/100+1,roic[-4]/100+1,roic[-5]/100+1,roic[-6]/100+1,roic[-7]/100+1,roic[-8]/100+1])-1)
            
    factors = [
            MessageSum,
            fcf,
            # Direction,
            # mean_rev,
            # volatility,
            # growthscore,
            # peg_ratio,
            # MoneyflowVolume5d,
            # Trendline,
            # SalesGrowth,
            # GrossMarginChange,
            # Gross_Income_Margin,
            # MaxGap,
            # CapEx_Vol,
            # fcf_ev,
            # DebtToTotalAssets,
            # TEM,
            # Piotroski,
            # Altman_Z,
            # Quick_Ratio,
            # AdvancedMomentum,
            # STA,
            # SNOA, 
            # ROA,  
            # FCFTA,  
            # ROA_GROWTH,  
            # FCFTA_ROA,
            # FCFTA_GROWTH, 
            # LTD_GROWTH,  
            # CR_GROWTH,  
            # GM_GROWTH,  
            # ATR_GROWTH, 
            # NEQISS,
            # GM_GROWTH_2YR,  
            # GM_STABILITY_2YR, 
            # ROA_GROWTH_2YR,  
            # ROIC_GROWTH_2YR,  
            # GM_STABILITY_8YR,  
            # ROA_GROWTH_8YR,  
            # ROIC_GROWTH_8YR,  
        ]
    
    return factors

class Factor_N_Days_Ago(CustomFactor):
    
    def compute(self, today, assets, out, input_factor): 
        out[:] = input_factor[0]

def factor_pipeline():
    
    factors = make_factors()
    
    pipe = []
    for k,f in enumerate(factors):
        pipeline_columns = {}
        for days_ago in range(N_FACTOR_WINDOW):
            pipeline_columns['alpha_'+str(days_ago)] = Factor_N_Days_Ago([f()], window_length=days_ago+1, mask=QTradableStocksUS())
        pipe.append(Pipeline(columns = pipeline_columns, screen = QTradableStocksUS()))
        
    return pipe

def beta_pipeline():
    
    beta = SimpleBeta(target=sid(8554),regression_length=260,
                      allowed_missing_percentage=1.0
                     )
    
    pipe = Pipeline(columns = {'beta': beta},
    screen = QTradableStocksUS())
    return pipe
    
def initialize(context):
    
    factors = factor_pipeline()
    
    context.N_factors = len(factors)
    
    for k in range(context.N_factors):        
        attach_pipeline(factors[k], 'factor_pipeline_'+str(k))
      
    attach_pipeline(risk_loading_pipeline(), 'risk_loading_pipeline')
    attach_pipeline(beta_pipeline(), 'beta_pipeline')
    
def before_trading_start(context, data):
 
    risk_loadings = pipeline_output('risk_loading_pipeline')
    risk_loadings.fillna(risk_loadings.median(), inplace=True)
    context.risk_loadings = risk_loadings
    context.beta_pipeline = pipeline_output('beta_pipeline')
    
    alphas = []
    for k in range(context.N_factors):
        alphas.append(pipeline_output('factor_pipeline_'+str(k)))
        
    print alphas[0]
There was a runtime error.

Here's a different approach that uses a single Pipeline for all of the factors (more in line with the Q guidance on scaling with the number of Pipelines). Oddly, if I scale up N_FACTOR_WINDOW I get an timeout for initialize within less than 2 minutes, which is sorta odd. I didn't realize that there is a timeout for initialize.

Clone Algorithm
224
Loading...
Backtest from to with initial capital
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.factors import CustomFactor, SimpleBeta, Returns
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.data import Fundamentals
import quantopian.optimize as opt
from sklearn import preprocessing
from quantopian.pipeline.experimental import risk_loading_pipeline
from quantopian.pipeline.filters import QTradableStocksUS
from quantopian.pipeline.data.psychsignal import stocktwits
from scipy.stats.mstats import winsorize
from zipline.utils.numpy_utils import (
    repeat_first_axis,
    repeat_last_axis,
)
from quantopian.pipeline.data import factset

from scipy.stats.mstats import gmean
 
import numpy as np

WIN_LIMIT = 0
N_FACTOR_WINDOW = 5 # trailing window of alpha factors exported to before_trading_start

def preprocess(a):
    
    a = a.astype(np.float64)
    
    a[np.isinf(a)] = np.nan
    
    a = np.nan_to_num(a - np.nanmean(a))
    
    a = winsorize(a, limits=[WIN_LIMIT,WIN_LIMIT])
    
    return preprocessing.scale(a)

def make_factors():
    
    class MessageSum(CustomFactor):
        inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close, stocktwits.bull_scored_messages, stocktwits.bear_scored_messages, stocktwits.total_scanned_messages]
        window_length = 21
        window_safe = True
        def compute(self, today, assets, out, high, low, close, bull, bear, total):
            v = np.nansum((high-low)/close, axis=0)
            out[:] = preprocess(v*np.nansum(total*(bear-bull), axis=0))
                
    class fcf(CustomFactor):
        inputs = [Fundamentals.fcf_yield]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf_yield):
            out[:] = preprocess(np.nan_to_num(fcf_yield[-1,:]))
                
    class Direction(CustomFactor):
        inputs = [USEquityPricing.open, USEquityPricing.close]
        window_length = 21
        window_safe = True
        def compute(self, today, assets, out, open, close):
            p = (close-open)/close
            out[:] = preprocess(np.nansum(-p,axis=0))
                
    class mean_rev(CustomFactor):   
        inputs = [USEquityPricing.high,USEquityPricing.low,USEquityPricing.close]
        window_length = 30
        window_safe = True
        def compute(self, today, assets, out, high, low, close):
            
            p = (high+low+close)/3
 
            m = len(close[0,:])
            n = len(close[:,0])
                
            b = np.zeros(m)
            a = np.zeros(m)
                
            for k in range(10,n+1):
                price_rel = np.nanmean(p[-k:,:],axis=0)/p[-1,:]
                wt = np.nansum(price_rel)
                b += wt*price_rel
                price_rel = 1.0/price_rel
                wt = np.nansum(price_rel)
                a += wt*price_rel
                
            out[:] = preprocess(b-a)
                
    class volatility(CustomFactor):
        inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close, USEquityPricing.volume]
        window_length = 5
        window_safe = True
        def compute(self, today, assets, out, high, low, close, volume):
            vol = np.nansum(volume,axis=0)*np.nansum(np.absolute((high-low)/close),axis=0)
            out[:] = preprocess(-vol)
                
    class growthscore(CustomFactor):
        inputs = [Fundamentals.growth_score]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, growth_score):
            out[:] = preprocess(growth_score[-1,:])
                
    class peg_ratio(CustomFactor):
        inputs = [Fundamentals.peg_ratio]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, peg_ratio):
            out[:] = preprocess(-1.0/peg_ratio[-1,:])
                
    class MoneyflowVolume5d(CustomFactor):
        inputs = (USEquityPricing.close, USEquityPricing.volume)
 
        # we need one more day to get the direction of the price on the first
        # day of our desired window of 5 days
        window_length = 6
        window_safe = True
            
        def compute(self, today, assets, out, close_extra, volume_extra):
            # slice off the extra row used to get the direction of the close
            # on the first day
            close = close_extra[1:]
            volume = volume_extra[1:]
                
            dollar_volume = close * volume
            denominator = dollar_volume.sum(axis=0)
                
            difference = np.diff(close_extra, axis=0)
            direction = np.where(difference > 0, 1, -1)
            numerator = (direction * dollar_volume).sum(axis=0)
                
            out[:] = preprocess(-np.divide(numerator, denominator))
                
    class Trendline(CustomFactor):
        inputs = [USEquityPricing.close]
        window_length = 252
        window_safe = True
            
        _x = np.arange(window_length)
        _x_var = np.var(_x)
 
        def compute(self, today, assets, out, close):
            
            x_matrix = repeat_last_axis(
            (self.window_length - 1) / 2 - self._x,
            len(assets),
            )
 
            y_bar = np.nanmean(close, axis=0)
            y_bars = repeat_first_axis(y_bar, self.window_length)
            y_matrix = close - y_bars
 
            out[:] = preprocess(-np.divide(
            (x_matrix * y_matrix).sum(axis=0) / self._x_var,
            self.window_length
            ))
                
    class SalesGrowth(CustomFactor):
        inputs = [factset.Fundamentals.sales_gr_qf]
        window_length = 2*252
        window_safe = True
        def compute(self, today, assets, out, sales_growth):
            sales_growth = np.nan_to_num(sales_growth)
            sales_growth = preprocessing.scale(sales_growth,axis=0)
            out[:] = preprocess(sales_growth[-1])
 
    class GrossMarginChange(CustomFactor):
        window_length = 2*252
        window_safe = True
        inputs = [factset.Fundamentals.ebit_oper_mgn_qf]
        def compute(self, today, assets, out, ebit_oper_mgn):
            ebit_oper_mgn = np.nan_to_num(ebit_oper_mgn)
            ebit_oper_mgn = preprocessing.scale(ebit_oper_mgn,axis=0)
            out[:] = preprocess(ebit_oper_mgn[-1])
 
    class Gross_Income_Margin(CustomFactor):
        #Gross Income Margin:
        #Gross Profit divided by Net Sales
        #Notes:
        #High value suggests that the company is generating large profits
        inputs = [Fundamentals.cost_of_revenue, Fundamentals.total_revenue]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, cost_of_revenue, sales):
            gross_income_margin = sales[-1]/sales[-1] - cost_of_revenue[-1]/sales[-1]
            out[:] = preprocess(-gross_income_margin)
 
    class MaxGap(CustomFactor): 
        # the biggest absolute overnight gap in the previous 90 sessions
        inputs = [USEquityPricing.close] ; window_length = 90
        window_safe = True
        def compute(self, today, assets, out, close):
            abs_log_rets = np.abs(np.diff(np.log(close),axis=0))
            max_gap = np.max(abs_log_rets, axis=0)
            out[:] = preprocess(max_gap)
        
    class CapEx_Vol(CustomFactor):
        inputs=[
            factset.Fundamentals.capex_assets_qf]
        window_length = 2*252
        window_safe = True
        def compute(self, today, assets, out, capex_assets):
                 
            out[:] = preprocess(-np.ptp(capex_assets,axis=0))
                
    class fcf_ev(CustomFactor):
        inputs=[
            Fundamentals.fcf_per_share,
            Fundamentals.shares_outstanding,
            Fundamentals.enterprise_value,]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf, shares, ev):
            v = fcf*shares/ev
            v[np.isinf(v)] = np.nan
                 
            out[:] = preprocess(v[-1])
                
    class DebtToTotalAssets(CustomFactor):
        inputs = [Fundamentals.long_term_debt,
            Fundamentals.current_debt,
            Fundamentals.cash_and_cash_equivalents,
            Fundamentals.total_assets]
        window_length = 1
        window_safe = True
            
        def compute(self, today, assets, out, ltd, std, cce, ta):
            std_part = np.maximum(std - cce, np.zeros(std.shape))
            v = np.divide(ltd + std_part, ta)
            v[np.isinf(v)] = np.nan
            out[:] = preprocess(np.ravel(v))
                
    class TEM(CustomFactor):
        """
        TEM = standard deviation of past 6 quarters' reports
        """
        inputs=[factset.Fundamentals.capex_qf_asof_date,
            factset.Fundamentals.capex_qf,
            factset.Fundamentals.assets]
        window_length = 390
        window_safe = True
        def compute(self, today, assets, out, asof_date, capex, total_assets):
            values = capex/total_assets
            values[np.isinf(values)] = np.nan
            out_temp = np.zeros_like(values[-1,:])
            for column_ix in range(asof_date.shape[1]):
                _, unique_indices = np.unique(asof_date[:, column_ix], return_index=True)
                quarterly_values = values[unique_indices, column_ix]
                if len(quarterly_values) < 6:
                    quarterly_values = np.hstack([
                    np.repeat([np.nan], 6 - len(quarterly_values)),
                    quarterly_values,
                    ])
            
                out_temp[column_ix] = np.std(quarterly_values[-6:])
                
            out[:] = preprocess(-out_temp)
                
    class Piotroski(CustomFactor):
        inputs = [
                Fundamentals.roa,
                Fundamentals.operating_cash_flow,
                Fundamentals.cash_flow_from_continuing_operating_activities,
                Fundamentals.long_term_debt_equity_ratio,
                Fundamentals.current_ratio,
                Fundamentals.shares_outstanding,
                Fundamentals.gross_margin,
                Fundamentals.assets_turnover,
                ]
 
        window_length = 100
        window_safe = True
    
        def compute(self, today, assets, out,roa, cash_flow, cash_flow_from_ops, long_term_debt_ratio, current_ratio, shares_outstanding, gross_margin, assets_turnover):
            
            profit = (
                        (roa[-1] > 0).astype(int) +
                        (cash_flow[-1] > 0).astype(int) +
                        (roa[-1] > roa[0]).astype(int) +
                        (cash_flow_from_ops[-1] > roa[-1]).astype(int)
                    )
        
            leverage = (
                        (long_term_debt_ratio[-1] < long_term_debt_ratio[0]).astype(int) +
                        (current_ratio[-1] > current_ratio[0]).astype(int) + 
                        (shares_outstanding[-1] <= shares_outstanding[0]).astype(int)
                        )
        
            operating = (
                        (gross_margin[-1] > gross_margin[0]).astype(int) +
                        (assets_turnover[-1] > assets_turnover[0]).astype(int)
                        )
        
            out[:] = preprocess(profit + leverage + operating)
            
    class Altman_Z(CustomFactor):
        inputs=[factset.Fundamentals.zscore_qf]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, zscore_qf):
            out[:] = preprocess(zscore_qf[-1])
                
    class Quick_Ratio(CustomFactor):
        inputs=[factset.Fundamentals.quick_ratio_qf]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, quick_ratio_qf):
            out[:] = preprocess(quick_ratio_qf[-1])
                
    class AdvancedMomentum(CustomFactor):
        inputs = (USEquityPricing.close, Returns(window_length=126))
        window_length = 252
        window_safe = True
 
        def compute(self, today, assets, out, prices, returns):
            am = np.divide(
            (
            (prices[-21] - prices[-252]) / prices[-252] -
            prices[-1] - prices[-21]
            ) / prices[-21],
            np.nanstd(returns, axis=0)
            )
                
            out[:] = preprocess(-am)

    class STA(CustomFactor):  
        inputs = [Fundamentals.operating_cash_flow,  
                  Fundamentals.net_income_continuous_operations,  
                  Fundamentals.total_assets]  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, ocf, ni, ta):  
            ta = np.where(np.isnan(ta), 0, ta)  
            ocf = np.where(np.isnan(ocf), 0, ocf)  
            ni = np.where(np.isnan(ni), 0, ni)  
            out[:] = preprocess(abs(ni[-1] - ocf[-1])/ ta[-1])
            
    class SNOA(CustomFactor):  
        inputs = [Fundamentals.total_assets,  
                 Fundamentals.cash_and_cash_equivalents,  
                 Fundamentals.current_debt, # same as short-term debt?  
                 Fundamentals.minority_interest_balance_sheet,  
                 Fundamentals.long_term_debt, # check same?  
                 Fundamentals.preferred_stock] # check same?  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, ta, cace, cd, mi, ltd, ps):  
            ta = np.where(np.isnan(ta), 0, ta)  
            cace = np.where(np.isnan(cace), 0, cace)  
            cd = np.where(np.isnan(cd), 0, cd)  
            mi = np.where(np.isnan(mi), 0, mi)  
            ltd = np.where(np.isnan(ltd), 0, ltd)  
            ps = np.where(np.isnan(ps), 0, ps)  
            results = ((ta[-1]-cace[-1])-(ta[-1]-cace[-1]-ltd[-1]-cd[-1]-ps[-1]-mi[-1]))/ta[-1]  
            out[:] = preprocess(np.where(np.isnan(results),0,results))
            
    class ROA(CustomFactor):  
        inputs = [Fundamentals.roa]  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, roa):  
            out[:] = preprocess(np.where(roa[-1]>0,1,0))
            
    class FCFTA(CustomFactor):  
        inputs = [Fundamentals.free_cash_flow,  
                 Fundamentals.total_assets]  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf, ta):  
            out[:] = preprocess(np.where(fcf[-1]/ta[-1]>0,1,0))
            
    class ROA_GROWTH(CustomFactor):  
        inputs = [Fundamentals.roa]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, roa):  
            out[:] = np.where(roa[-1]>roa[-252],1,0)
            
    class FCFTA_ROA(CustomFactor):  
        inputs = [Fundamentals.free_cash_flow,  
                  Fundamentals.total_assets,  
                  Fundamentals.roa]  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf, ta, roa):  
            out[:] = preprocess(np.where(fcf[-1]/ta[-1]>roa[-1],1,0))
            
    class FCFTA_GROWTH(CustomFactor):  
        inputs = [Fundamentals.free_cash_flow,  
                  Fundamentals.total_assets]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, fcf, ta):  
            out[:] = preprocess(np.where(fcf[-1]/ta[-1]>fcf[-252]/ta[-252],1,0))
            
    class LTD_GROWTH(CustomFactor):  
        inputs = [Fundamentals.total_assets,  
                  Fundamentals.long_term_debt]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, ta, ltd):  
            out[:] = preprocess(np.where(ltd[-1]/ta[-1]<ltd[-252]/ta[-252],1,0))
            
    class CR_GROWTH(CustomFactor):  
        inputs = [Fundamentals.current_ratio]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, cr):  
            out[:] = preprocess(np.where(cr[-1]>cr[-252],1,0))
            
    class GM_GROWTH(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(np.where(gm[-1]>gm[-252],1,0))
            
    class ATR_GROWTH(CustomFactor):  
        inputs = [Fundamentals.assets_turnover]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, atr):  
            out[:] = preprocess(np.where(atr[-1]>atr[-252],1,0))
            
    class NEQISS(CustomFactor):  
        inputs = [Fundamentals.shares_outstanding]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, so):  
            out[:] = preprocess(np.where(so[-1]-so[-252]<1,1,0))
            
    class GM_GROWTH_2YR(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 504
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(gmean([gm[-1]+1, gm[-252]+1,gm[-504]+1])-1)
            
    class GM_STABILITY_2YR(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 504
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(np.std([gm[-1]-gm[-252],gm[-252]-gm[-504]],axis=0)) 
            
    class ROA_GROWTH_2YR(CustomFactor):  
        inputs = [Fundamentals.roa]  
        window_length = 504
        window_safe = True
        def compute(self, today, assets, out, roa):  
            out[:] = preprocess(gmean([roa[-1]+1, roa[-252]+1,roa[-504]+1])-1)
            
    class ROIC_GROWTH_2YR(CustomFactor):  
        inputs = [Fundamentals.roic]  
        window_length = 504
        window_safe = True
        def compute(self, today, assets, out, roic):  
            out[:] = preprocess(gmean([roic[-1]+1, roic[-252]+1,roic[-504]+1])-1)
            
    class GM_GROWTH_8YR(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 8
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(gmean([gm[-1]+1, gm[-2]+1, gm[-3]+1, gm[-4]+1, gm[-5]+1, gm[-6]+1, gm[-7]+1, gm[-8]+1])-1)        
            
    class GM_STABILITY_8YR(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 9
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(gm[-8]) 
            
    class ROA_GROWTH_8YR(CustomFactor):  
        inputs = [Fundamentals.roa]  
        window_length = 9
        window_safe = True
        def compute(self, today, assets, out, roa):  
            out[:] = preprocess(gmean([roa[-1]/100+1, roa[-2]/100+1,roa[-3]/100+1,roa[-4]/100+1,roa[-5]/100+1,roa[-6]/100+1,roa[-7]/100+1,roa[-8]/100+1])-1) 
            
    class ROIC_GROWTH_8YR(CustomFactor):  
        inputs = [Fundamentals.roic]  
        window_length = 9
        window_safe = True
        def compute(self, today, assets, out, roic):  
            out[:] = preprocess(gmean([roic[-1]/100+1, roic[-2]/100+1,roic[-3]/100+1,roic[-4]/100+1,roic[-5]/100+1,roic[-6]/100+1,roic[-7]/100+1,roic[-8]/100+1])-1)
            
    factors = [
            MessageSum,
            fcf,
            Direction,
            mean_rev,
            volatility,
            growthscore,
            peg_ratio,
            MoneyflowVolume5d,
            Trendline,
            SalesGrowth,
            GrossMarginChange,
            Gross_Income_Margin,
            MaxGap,
            CapEx_Vol,
            fcf_ev,
            DebtToTotalAssets,
            TEM,
            Piotroski,
            Altman_Z,
            Quick_Ratio,
            AdvancedMomentum,
            STA,
            SNOA, 
            ROA,  
            FCFTA,  
            ROA_GROWTH,  
            FCFTA_ROA,
            FCFTA_GROWTH, 
            LTD_GROWTH,  
            CR_GROWTH,  
            GM_GROWTH,  
            ATR_GROWTH, 
            NEQISS,
            GM_GROWTH_2YR,  
            GM_STABILITY_2YR, 
            ROA_GROWTH_2YR,  
            ROIC_GROWTH_2YR,  
            GM_STABILITY_8YR,  
            ROA_GROWTH_8YR,  
            ROIC_GROWTH_8YR,  
        ]
    
    return factors

class Factor_N_Days_Ago(CustomFactor):
    
    def compute(self, today, assets, out, input_factor): 
        out[:] = input_factor[0]

def factor_pipeline():
    
    factors = make_factors()
    
    pipeline_columns = {}
    for k,f in enumerate(factors):
        for days_ago in range(N_FACTOR_WINDOW):
            pipeline_columns['alpha_'+str(k)+'_'+str(days_ago)] = Factor_N_Days_Ago([f(mask=QTradableStocksUS())], window_length=days_ago+1, mask=QTradableStocksUS())
        
    pipe = Pipeline(columns = pipeline_columns,
    screen = QTradableStocksUS())
    
    return pipe

def beta_pipeline():
    
    beta = SimpleBeta(target=sid(8554),regression_length=260,
                      allowed_missing_percentage=1.0
                     )
    
    pipe = Pipeline(columns = {'beta': beta},
    screen = QTradableStocksUS())
    return pipe
    
def initialize(context):    
    
    attach_pipeline(risk_loading_pipeline(), 'risk_loading_pipeline')
    attach_pipeline(beta_pipeline(), 'beta_pipeline')
    attach_pipeline(factor_pipeline(), 'factor_pipeline')
    
def before_trading_start(context, data):
 
    risk_loadings = pipeline_output('risk_loading_pipeline')
    risk_loadings.fillna(risk_loadings.median(), inplace=True)
    context.risk_loadings = risk_loadings
    context.beta_pipeline = pipeline_output('beta_pipeline')
    
    context.alphas = pipeline_output('factor_pipeline')
    
    print context.alphas.head().iloc[:,0:5]
There was a runtime error.

Grant, interesting idea. Please note that attaching multiple pipelines to an algorithm can introduce inefficiencies if you aren't careful.

The thing you want to avoid is attaching multiple pipelines that use the same data fields. For example, if you have multiple alpha factors that make use of USEquityPricing.close.latest, you should perform all logic that needs that particular data in one pipeline (or in post processing in before_trading_start()).

The problem simplifies to this: If you attach two pipelines that use the same data field, the entire dataset will be loaded twice.

Thanks Cal -

My objective here is to get data from Pipeline into before_trading_start where it would be processed. I think I'll abandon the idea of using a Pipeline per factor, and output all of the factors in one Pipeline (each of M factors having a trailing window length of N). And there will be K stocks in the QTradableStocksUS. So, we are talking about a Pandas dataframe with MxN columns and K rows . So what is a reasonable limit on MxN (given that K is fixed as the number of stocks in the `QTradableStocksUS')?

Also, is there no way to set the chunk size in Pipeline in the backtester? Presumably this would help with memory management (at the expense of more data loads, I suppose, which, if taken to an extreme, would cause a timeout of Pipeline?).

Hi Cal -

Another question - would multiple Pipelines help in memory management? My thinking is that the Pipelines must be processed serially. Data are read into a buffer, processed, and the results stored in an output buffer. Once all of the Pipelines have been processed, then the algo starts outputting the results to before_trading_start. So, if the input buffer memory space is recycled as the algo steps from one Pipeline to the next, then it should be easier to avoid memory limitations, right?

Here's an architecture I've been developing. The basic idea is to output trailing windows of each of N alpha factors computed in Pipeline, and do the alpha combination in before_trading_start. I've managed to get clustering to work, which is kinda cool. My thinking is that the clustering would be used in a factor weighting scheme. For example, say there are five factors, alpha_0, alpha_1, alpha_2, alpha_3, alpha_4, and they are clustered like this:

Cluster 0: alpha_0, alpha_3, alpha_4
Cluster 1: alpha_1, alpha_2

The factors would then be combined something like this:

alpha = wc_0*(w_0*alpha_0 + w_3*alpha_3  + w_4*alpha_4) + wc_1*(w_1*alpha_1 + w_2*alpha_2)  

The individual factor weights (e.g. w_0, w_1, ...) would be static, and based on some global, long-term analysis if the value of each factor in the portfolio (e.g. from Alphalens or some such thing). The cluster weights (e.g. wc_0, wc_1, etc.) would be dynamic and based on the short-term clustering result.

EDIT: See post below for a minor update to the code.

Clone Algorithm
224
Loading...
Backtest from to with initial capital
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.factors import CustomFactor, SimpleBeta, Returns
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.data import Fundamentals
import quantopian.optimize as opt
from sklearn import preprocessing
from quantopian.pipeline.experimental import risk_loading_pipeline
from quantopian.pipeline.filters import QTradableStocksUS
from quantopian.pipeline.data.psychsignal import stocktwits
from scipy.stats.mstats import winsorize
from zipline.utils.numpy_utils import (
    repeat_first_axis,
    repeat_last_axis,
)
from quantopian.pipeline.data import factset

from scipy.stats.mstats import gmean
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import SpectralClustering
 
import numpy as np

WIN_LIMIT = 0
N_FACTORS = None
N_FACTOR_WINDOW = 5 # trailing window of alpha factors exported to before_trading_start

def preprocess(a):
    
    a = a.astype(np.float64)
    a[np.isinf(a)] = np.nan
    a = np.nan_to_num(a - np.nanmean(a))
    a = winsorize(a, limits=[WIN_LIMIT,WIN_LIMIT])
    
    return preprocessing.scale(a)

def make_factors():
    
    class MessageSum(CustomFactor):
        inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close, stocktwits.bull_scored_messages, stocktwits.bear_scored_messages, stocktwits.total_scanned_messages]
        window_length = 21
        window_safe = True
        def compute(self, today, assets, out, high, low, close, bull, bear, total):
            v = np.nansum((high-low)/close, axis=0)
            out[:] = preprocess(v*np.nansum(total*(bear-bull), axis=0))
                
    class fcf(CustomFactor):
        inputs = [Fundamentals.fcf_yield]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf_yield):
            out[:] = preprocess(np.nan_to_num(fcf_yield[-1,:]))
                
    class Direction(CustomFactor):
        inputs = [USEquityPricing.open, USEquityPricing.close]
        window_length = 21
        window_safe = True
        def compute(self, today, assets, out, open, close):
            p = (close-open)/close
            out[:] = preprocess(np.nansum(-p,axis=0))
                
    class mean_rev(CustomFactor):   
        inputs = [USEquityPricing.high,USEquityPricing.low,USEquityPricing.close]
        window_length = 30
        window_safe = True
        def compute(self, today, assets, out, high, low, close):
            
            p = (high+low+close)/3
 
            m = len(close[0,:])
            n = len(close[:,0])
                
            b = np.zeros(m)
            a = np.zeros(m)
                
            for k in range(10,n+1):
                price_rel = np.nanmean(p[-k:,:],axis=0)/p[-1,:]
                wt = np.nansum(price_rel)
                b += wt*price_rel
                price_rel = 1.0/price_rel
                wt = np.nansum(price_rel)
                a += wt*price_rel
                
            out[:] = preprocess(b-a)
                
    class volatility(CustomFactor):
        inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close, USEquityPricing.volume]
        window_length = 5
        window_safe = True
        def compute(self, today, assets, out, high, low, close, volume):
            vol = np.nansum(volume,axis=0)*np.nansum(np.absolute((high-low)/close),axis=0)
            out[:] = preprocess(-vol)
                
    class growthscore(CustomFactor):
        inputs = [Fundamentals.growth_score]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, growth_score):
            out[:] = preprocess(growth_score[-1,:])
                
    class peg_ratio(CustomFactor):
        inputs = [Fundamentals.peg_ratio]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, peg_ratio):
            out[:] = preprocess(-1.0/peg_ratio[-1,:])
                
    class MoneyflowVolume5d(CustomFactor):
        inputs = (USEquityPricing.close, USEquityPricing.volume)
 
        # we need one more day to get the direction of the price on the first
        # day of our desired window of 5 days
        window_length = 6
        window_safe = True
            
        def compute(self, today, assets, out, close_extra, volume_extra):
            # slice off the extra row used to get the direction of the close
            # on the first day
            close = close_extra[1:]
            volume = volume_extra[1:]
                
            dollar_volume = close * volume
            denominator = dollar_volume.sum(axis=0)
                
            difference = np.diff(close_extra, axis=0)
            direction = np.where(difference > 0, 1, -1)
            numerator = (direction * dollar_volume).sum(axis=0)
                
            out[:] = preprocess(-np.divide(numerator, denominator))
                
    class Trendline(CustomFactor):
        inputs = [USEquityPricing.close]
        window_length = 252
        window_safe = True
            
        _x = np.arange(window_length)
        _x_var = np.var(_x)
 
        def compute(self, today, assets, out, close):
            
            x_matrix = repeat_last_axis(
            (self.window_length - 1) / 2 - self._x,
            len(assets),
            )
 
            y_bar = np.nanmean(close, axis=0)
            y_bars = repeat_first_axis(y_bar, self.window_length)
            y_matrix = close - y_bars
 
            out[:] = preprocess(-np.divide(
            (x_matrix * y_matrix).sum(axis=0) / self._x_var,
            self.window_length
            ))
                
    class SalesGrowth(CustomFactor):
        inputs = [factset.Fundamentals.sales_gr_qf]
        window_length = 2*252
        window_safe = True
        def compute(self, today, assets, out, sales_growth):
            sales_growth = np.nan_to_num(sales_growth)
            sales_growth = preprocessing.scale(sales_growth,axis=0)
            out[:] = preprocess(sales_growth[-1])
 
    class GrossMarginChange(CustomFactor):
        window_length = 2*252
        window_safe = True
        inputs = [factset.Fundamentals.ebit_oper_mgn_qf]
        def compute(self, today, assets, out, ebit_oper_mgn):
            ebit_oper_mgn = np.nan_to_num(ebit_oper_mgn)
            ebit_oper_mgn = preprocessing.scale(ebit_oper_mgn,axis=0)
            out[:] = preprocess(ebit_oper_mgn[-1])
 
    class Gross_Income_Margin(CustomFactor):
        #Gross Income Margin:
        #Gross Profit divided by Net Sales
        #Notes:
        #High value suggests that the company is generating large profits
        inputs = [Fundamentals.cost_of_revenue, Fundamentals.total_revenue]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, cost_of_revenue, sales):
            gross_income_margin = sales[-1]/sales[-1] - cost_of_revenue[-1]/sales[-1]
            out[:] = preprocess(-gross_income_margin)
 
    class MaxGap(CustomFactor): 
        # the biggest absolute overnight gap in the previous 90 sessions
        inputs = [USEquityPricing.close] ; window_length = 90
        window_safe = True
        def compute(self, today, assets, out, close):
            abs_log_rets = np.abs(np.diff(np.log(close),axis=0))
            max_gap = np.max(abs_log_rets, axis=0)
            out[:] = preprocess(max_gap)
        
    class CapEx_Vol(CustomFactor):
        inputs=[
            factset.Fundamentals.capex_assets_qf]
        window_length = 2*252
        window_safe = True
        def compute(self, today, assets, out, capex_assets):
                 
            out[:] = preprocess(-np.ptp(capex_assets,axis=0))
                
    class fcf_ev(CustomFactor):
        inputs=[
            Fundamentals.fcf_per_share,
            Fundamentals.shares_outstanding,
            Fundamentals.enterprise_value,]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf, shares, ev):
            v = fcf*shares/ev
            v[np.isinf(v)] = np.nan
                 
            out[:] = preprocess(v[-1])
                
    class DebtToTotalAssets(CustomFactor):
        inputs = [Fundamentals.long_term_debt,
            Fundamentals.current_debt,
            Fundamentals.cash_and_cash_equivalents,
            Fundamentals.total_assets]
        window_length = 1
        window_safe = True
            
        def compute(self, today, assets, out, ltd, std, cce, ta):
            std_part = np.maximum(std - cce, np.zeros(std.shape))
            v = np.divide(ltd + std_part, ta)
            v[np.isinf(v)] = np.nan
            out[:] = preprocess(np.ravel(v))
                
    class TEM(CustomFactor):
        """
        TEM = standard deviation of past 6 quarters' reports
        """
        inputs=[factset.Fundamentals.capex_qf_asof_date,
            factset.Fundamentals.capex_qf,
            factset.Fundamentals.assets]
        window_length = 390
        window_safe = True
        def compute(self, today, assets, out, asof_date, capex, total_assets):
            values = capex/total_assets
            values[np.isinf(values)] = np.nan
            out_temp = np.zeros_like(values[-1,:])
            for column_ix in range(asof_date.shape[1]):
                _, unique_indices = np.unique(asof_date[:, column_ix], return_index=True)
                quarterly_values = values[unique_indices, column_ix]
                if len(quarterly_values) < 6:
                    quarterly_values = np.hstack([
                    np.repeat([np.nan], 6 - len(quarterly_values)),
                    quarterly_values,
                    ])
            
                out_temp[column_ix] = np.std(quarterly_values[-6:])
                
            out[:] = preprocess(-out_temp)
                
    class Piotroski(CustomFactor):
        inputs = [
                Fundamentals.roa,
                Fundamentals.operating_cash_flow,
                Fundamentals.cash_flow_from_continuing_operating_activities,
                Fundamentals.long_term_debt_equity_ratio,
                Fundamentals.current_ratio,
                Fundamentals.shares_outstanding,
                Fundamentals.gross_margin,
                Fundamentals.assets_turnover,
                ]
 
        window_length = 100
        window_safe = True
    
        def compute(self, today, assets, out,roa, cash_flow, cash_flow_from_ops, long_term_debt_ratio, current_ratio, shares_outstanding, gross_margin, assets_turnover):
            
            profit = (
                        (roa[-1] > 0).astype(int) +
                        (cash_flow[-1] > 0).astype(int) +
                        (roa[-1] > roa[0]).astype(int) +
                        (cash_flow_from_ops[-1] > roa[-1]).astype(int)
                    )
        
            leverage = (
                        (long_term_debt_ratio[-1] < long_term_debt_ratio[0]).astype(int) +
                        (current_ratio[-1] > current_ratio[0]).astype(int) + 
                        (shares_outstanding[-1] <= shares_outstanding[0]).astype(int)
                        )
        
            operating = (
                        (gross_margin[-1] > gross_margin[0]).astype(int) +
                        (assets_turnover[-1] > assets_turnover[0]).astype(int)
                        )
        
            out[:] = preprocess(profit + leverage + operating)
            
    class Altman_Z(CustomFactor):
        inputs=[factset.Fundamentals.zscore_qf]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, zscore_qf):
            out[:] = preprocess(zscore_qf[-1])
                
    class Quick_Ratio(CustomFactor):
        inputs=[factset.Fundamentals.quick_ratio_qf]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, quick_ratio_qf):
            out[:] = preprocess(quick_ratio_qf[-1])
                
    class AdvancedMomentum(CustomFactor):
        inputs = (USEquityPricing.close, Returns(window_length=126))
        window_length = 252
        window_safe = True
 
        def compute(self, today, assets, out, prices, returns):
            am = np.divide(
            (
            (prices[-21] - prices[-252]) / prices[-252] -
            prices[-1] - prices[-21]
            ) / prices[-21],
            np.nanstd(returns, axis=0)
            )
                
            out[:] = preprocess(-am)

    class STA(CustomFactor):  
        inputs = [Fundamentals.operating_cash_flow,  
                  Fundamentals.net_income_continuous_operations,  
                  Fundamentals.total_assets]  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, ocf, ni, ta):  
            ta = np.where(np.isnan(ta), 0, ta)  
            ocf = np.where(np.isnan(ocf), 0, ocf)  
            ni = np.where(np.isnan(ni), 0, ni)  
            out[:] = preprocess(abs(ni[-1] - ocf[-1])/ ta[-1])
            
    class SNOA(CustomFactor):  
        inputs = [Fundamentals.total_assets,  
                 Fundamentals.cash_and_cash_equivalents,  
                 Fundamentals.current_debt, # same as short-term debt?  
                 Fundamentals.minority_interest_balance_sheet,  
                 Fundamentals.long_term_debt, # check same?  
                 Fundamentals.preferred_stock] # check same?  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, ta, cace, cd, mi, ltd, ps):  
            ta = np.where(np.isnan(ta), 0, ta)  
            cace = np.where(np.isnan(cace), 0, cace)  
            cd = np.where(np.isnan(cd), 0, cd)  
            mi = np.where(np.isnan(mi), 0, mi)  
            ltd = np.where(np.isnan(ltd), 0, ltd)  
            ps = np.where(np.isnan(ps), 0, ps)  
            results = ((ta[-1]-cace[-1])-(ta[-1]-cace[-1]-ltd[-1]-cd[-1]-ps[-1]-mi[-1]))/ta[-1]  
            out[:] = preprocess(np.where(np.isnan(results),0,results))
            
    class ROA(CustomFactor):  
        inputs = [Fundamentals.roa]  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, roa):  
            out[:] = preprocess(np.where(roa[-1]>0,1,0))
            
    class FCFTA(CustomFactor):  
        inputs = [Fundamentals.free_cash_flow,  
                 Fundamentals.total_assets]  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf, ta):  
            out[:] = preprocess(np.where(fcf[-1]/ta[-1]>0,1,0))
            
    class ROA_GROWTH(CustomFactor):  
        inputs = [Fundamentals.roa]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, roa):  
            out[:] = np.where(roa[-1]>roa[-252],1,0)
            
    class FCFTA_ROA(CustomFactor):  
        inputs = [Fundamentals.free_cash_flow,  
                  Fundamentals.total_assets,  
                  Fundamentals.roa]  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf, ta, roa):  
            out[:] = preprocess(np.where(fcf[-1]/ta[-1]>roa[-1],1,0))
            
    class FCFTA_GROWTH(CustomFactor):  
        inputs = [Fundamentals.free_cash_flow,  
                  Fundamentals.total_assets]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, fcf, ta):  
            out[:] = preprocess(np.where(fcf[-1]/ta[-1]>fcf[-252]/ta[-252],1,0))
            
    class LTD_GROWTH(CustomFactor):  
        inputs = [Fundamentals.total_assets,  
                  Fundamentals.long_term_debt]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, ta, ltd):  
            out[:] = preprocess(np.where(ltd[-1]/ta[-1]<ltd[-252]/ta[-252],1,0))
            
    class CR_GROWTH(CustomFactor):  
        inputs = [Fundamentals.current_ratio]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, cr):  
            out[:] = preprocess(np.where(cr[-1]>cr[-252],1,0))
            
    class GM_GROWTH(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(np.where(gm[-1]>gm[-252],1,0))
            
    class ATR_GROWTH(CustomFactor):  
        inputs = [Fundamentals.assets_turnover]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, atr):  
            out[:] = preprocess(np.where(atr[-1]>atr[-252],1,0))
            
    class NEQISS(CustomFactor):  
        inputs = [Fundamentals.shares_outstanding]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, so):  
            out[:] = preprocess(np.where(so[-1]-so[-252]<1,1,0))
            
    class GM_GROWTH_2YR(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 504
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(gmean([gm[-1]+1, gm[-252]+1,gm[-504]+1])-1)
            
    class GM_STABILITY_2YR(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 504
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(np.std([gm[-1]-gm[-252],gm[-252]-gm[-504]],axis=0)) 
            
    class ROA_GROWTH_2YR(CustomFactor):  
        inputs = [Fundamentals.roa]  
        window_length = 504
        window_safe = True
        def compute(self, today, assets, out, roa):  
            out[:] = preprocess(gmean([roa[-1]+1, roa[-252]+1,roa[-504]+1])-1)
            
    class ROIC_GROWTH_2YR(CustomFactor):  
        inputs = [Fundamentals.roic]  
        window_length = 504
        window_safe = True
        def compute(self, today, assets, out, roic):  
            out[:] = preprocess(gmean([roic[-1]+1, roic[-252]+1,roic[-504]+1])-1)
            
    class GM_GROWTH_8YR(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 8
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(gmean([gm[-1]+1, gm[-2]+1, gm[-3]+1, gm[-4]+1, gm[-5]+1, gm[-6]+1, gm[-7]+1, gm[-8]+1])-1)        
            
    class GM_STABILITY_8YR(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 9
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(gm[-8]) 
            
    class ROA_GROWTH_8YR(CustomFactor):  
        inputs = [Fundamentals.roa]  
        window_length = 9
        window_safe = True
        def compute(self, today, assets, out, roa):  
            out[:] = preprocess(gmean([roa[-1]/100+1, roa[-2]/100+1,roa[-3]/100+1,roa[-4]/100+1,roa[-5]/100+1,roa[-6]/100+1,roa[-7]/100+1,roa[-8]/100+1])-1) 
            
    class ROIC_GROWTH_8YR(CustomFactor):  
        inputs = [Fundamentals.roic]  
        window_length = 9
        window_safe = True
        def compute(self, today, assets, out, roic):  
            out[:] = preprocess(gmean([roic[-1]/100+1, roic[-2]/100+1,roic[-3]/100+1,roic[-4]/100+1,roic[-5]/100+1,roic[-6]/100+1,roic[-7]/100+1,roic[-8]/100+1])-1)
            
    factors = [
            MessageSum,
            fcf,
            Direction,
            mean_rev,
            volatility,
            growthscore,
            peg_ratio,
            MoneyflowVolume5d,
            Trendline,
            SalesGrowth,
            GrossMarginChange,
            Gross_Income_Margin,
            MaxGap,
            CapEx_Vol,
            fcf_ev,
            DebtToTotalAssets,
            TEM,
            Piotroski,
            Altman_Z,
            Quick_Ratio,
            # AdvancedMomentum,
            # STA,
            # SNOA, 
            # ROA,  
            # FCFTA,  
            # ROA_GROWTH,  
            # FCFTA_ROA,
            # FCFTA_GROWTH, 
            # LTD_GROWTH,  
            # CR_GROWTH,  
            # GM_GROWTH,  
            # ATR_GROWTH, 
            # NEQISS,
            # GM_GROWTH_2YR,  
            # GM_STABILITY_2YR, 
            # ROA_GROWTH_2YR,  
            # ROIC_GROWTH_2YR,  
            # GM_STABILITY_8YR,  
            # ROA_GROWTH_8YR,  
            # ROIC_GROWTH_8YR,  
        ]
    
    return factors

class Factor_N_Days_Ago(CustomFactor):
    
    def compute(self, today, assets, out, input_factor):
        out[:] = input_factor[0]

def factor_pipeline():
    
    factors = make_factors()
    
    pipeline_columns = {}
    for k,f in enumerate(factors):
        for days_ago in range(N_FACTOR_WINDOW):
            pipeline_columns['alpha_'+str(k)+'_'+str(days_ago)] = Factor_N_Days_Ago([f(mask=QTradableStocksUS())], window_length=days_ago+1, mask=QTradableStocksUS())
    
    pipe = Pipeline(columns = pipeline_columns,
    screen = QTradableStocksUS())
    
    return pipe

def beta_pipeline():
    
    beta = SimpleBeta(target=sid(8554),regression_length=260,
                      allowed_missing_percentage=1.0
                     )
    
    pipe = Pipeline(columns = {'beta': beta},
    screen = QTradableStocksUS())
    return pipe
    
def initialize(context):    
    
    attach_pipeline(risk_loading_pipeline(), 'risk_loading_pipeline')
    attach_pipeline(beta_pipeline(), 'beta_pipeline')
    attach_pipeline(factor_pipeline(), 'factor_pipeline')
    
def before_trading_start(context, data):
 
    risk_loadings = pipeline_output('risk_loading_pipeline')
    risk_loadings.fillna(risk_loadings.median(), inplace=True)
    context.risk_loadings = risk_loadings
    context.beta_pipeline = pipeline_output('beta_pipeline')
    
    alphas = pipeline_output('factor_pipeline').dropna()
    
    n_factors = len(alphas.columns)/N_FACTOR_WINDOW
    n_stocks = len(alphas.index)
    
    alphas_flattened = np.zeros((n_factors,n_stocks*N_FACTOR_WINDOW))
    
    for f in range(n_factors):
        a = alphas.iloc[:,f*N_FACTOR_WINDOW:(f+1)*N_FACTOR_WINDOW].values
        alphas_flattened[f,:] = np.ravel(a)
    
    X = cosine_similarity(alphas_flattened)
    
    clustering = SpectralClustering(n_clusters=3,assign_labels="discretize",random_state=0).fit(X)
    
    print clustering.labels_
There was a runtime error.

Man that is just super cool! Are you thinking a ‘Value Cluster’, a ‘Momentum Cluster’, etc?

What are you thinking of using to determine the dynamic cluster weights? And what trailing window would you use (or would this be dynamic as well, based on something, say market volatility)?

A few options I can think of: Absolute Returns, Risk-adjusted Returns, Volatility of Returns (inverse weight), or IC (mean or risk-adjusted).

Not really sure how to get any of that in Pipeline or anywhere in the IDE for that matter, so I’m not much help unfortunately. Maybe you or someone else know?

Other things to consider may be ‘momentum weight’ or ‘reversal weight.’ If a cluster has done really well during the trailing window, will it continue to do well, or will it ‘reverse?’ Is the trend increasing or decreasing, from a high or low value? Maybe use some sort of smoothing (eg SMA or EWMA) of whatever you’re using? Maybe use both a ‘momentum weight’ (longer trailing window) and a ‘reversal weight’ (shorter trailing window)?

Pretty obvious stuff maybe, just sharing my thoughts. :)

Thanks Joakim -

One thought is that clustering may be a way of avoiding having too much weight on a given alpha source, should multiple factors be tapping into it. For example, have a look at https://scikit-learn.org/stable/auto_examples/applications/plot_stock_market.html. You can see that companies in similar industries are clustered together. The same should be true of alpha factors, I figure (even if the reason for the clustering may not be so easy to sort out). Basically, I think one could think of all of the factors that land in a given cluster to be from the same hidden sector.

Here's a minor update. I'm still getting up the learning curve, but I think the clustering should be done like this:

clustering = SpectralClustering(n_clusters=3,assign_labels="discretize",random_state=0).fit(alphas_flattened)  

SpectralClustering takes the data and creates an affinity matrix (whatever that is...), not a similarity matrix.

Clone Algorithm
224
Loading...
Backtest from to with initial capital
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.factors import CustomFactor, SimpleBeta, Returns
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.data import Fundamentals
import quantopian.optimize as opt
from sklearn import preprocessing
from quantopian.pipeline.experimental import risk_loading_pipeline
from quantopian.pipeline.filters import QTradableStocksUS
from quantopian.pipeline.data.psychsignal import stocktwits
from scipy.stats.mstats import winsorize
from zipline.utils.numpy_utils import (
    repeat_first_axis,
    repeat_last_axis,
)
from quantopian.pipeline.data import factset

from scipy.stats.mstats import gmean
from sklearn.cluster import SpectralClustering
 
import numpy as np

WIN_LIMIT = 0
N_FACTORS = None
N_FACTOR_WINDOW = 5 # trailing window of alpha factors exported to before_trading_start

def preprocess(a):
    
    a = a.astype(np.float64)
    a[np.isinf(a)] = np.nan
    a = np.nan_to_num(a - np.nanmean(a))
    a = winsorize(a, limits=[WIN_LIMIT,WIN_LIMIT])
    
    return preprocessing.scale(a)

def make_factors():
    
    class MessageSum(CustomFactor):
        inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close, stocktwits.bull_scored_messages, stocktwits.bear_scored_messages, stocktwits.total_scanned_messages]
        window_length = 21
        window_safe = True
        def compute(self, today, assets, out, high, low, close, bull, bear, total):
            v = np.nansum((high-low)/close, axis=0)
            out[:] = preprocess(v*np.nansum(total*(bear-bull), axis=0))
                
    class fcf(CustomFactor):
        inputs = [Fundamentals.fcf_yield]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf_yield):
            out[:] = preprocess(np.nan_to_num(fcf_yield[-1,:]))
                
    class Direction(CustomFactor):
        inputs = [USEquityPricing.open, USEquityPricing.close]
        window_length = 21
        window_safe = True
        def compute(self, today, assets, out, open, close):
            p = (close-open)/close
            out[:] = preprocess(np.nansum(-p,axis=0))
                
    class mean_rev(CustomFactor):   
        inputs = [USEquityPricing.high,USEquityPricing.low,USEquityPricing.close]
        window_length = 30
        window_safe = True
        def compute(self, today, assets, out, high, low, close):
            
            p = (high+low+close)/3
 
            m = len(close[0,:])
            n = len(close[:,0])
                
            b = np.zeros(m)
            a = np.zeros(m)
                
            for k in range(10,n+1):
                price_rel = np.nanmean(p[-k:,:],axis=0)/p[-1,:]
                wt = np.nansum(price_rel)
                b += wt*price_rel
                price_rel = 1.0/price_rel
                wt = np.nansum(price_rel)
                a += wt*price_rel
                
            out[:] = preprocess(b-a)
                
    class volatility(CustomFactor):
        inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close, USEquityPricing.volume]
        window_length = 5
        window_safe = True
        def compute(self, today, assets, out, high, low, close, volume):
            vol = np.nansum(volume,axis=0)*np.nansum(np.absolute((high-low)/close),axis=0)
            out[:] = preprocess(-vol)
                
    class growthscore(CustomFactor):
        inputs = [Fundamentals.growth_score]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, growth_score):
            out[:] = preprocess(growth_score[-1,:])
                
    class peg_ratio(CustomFactor):
        inputs = [Fundamentals.peg_ratio]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, peg_ratio):
            out[:] = preprocess(-1.0/peg_ratio[-1,:])
                
    class MoneyflowVolume5d(CustomFactor):
        inputs = (USEquityPricing.close, USEquityPricing.volume)
 
        # we need one more day to get the direction of the price on the first
        # day of our desired window of 5 days
        window_length = 6
        window_safe = True
            
        def compute(self, today, assets, out, close_extra, volume_extra):
            # slice off the extra row used to get the direction of the close
            # on the first day
            close = close_extra[1:]
            volume = volume_extra[1:]
                
            dollar_volume = close * volume
            denominator = dollar_volume.sum(axis=0)
                
            difference = np.diff(close_extra, axis=0)
            direction = np.where(difference > 0, 1, -1)
            numerator = (direction * dollar_volume).sum(axis=0)
                
            out[:] = preprocess(-np.divide(numerator, denominator))
                
    class Trendline(CustomFactor):
        inputs = [USEquityPricing.close]
        window_length = 252
        window_safe = True
            
        _x = np.arange(window_length)
        _x_var = np.var(_x)
 
        def compute(self, today, assets, out, close):
            
            x_matrix = repeat_last_axis(
            (self.window_length - 1) / 2 - self._x,
            len(assets),
            )
 
            y_bar = np.nanmean(close, axis=0)
            y_bars = repeat_first_axis(y_bar, self.window_length)
            y_matrix = close - y_bars
 
            out[:] = preprocess(-np.divide(
            (x_matrix * y_matrix).sum(axis=0) / self._x_var,
            self.window_length
            ))
                
    class SalesGrowth(CustomFactor):
        inputs = [factset.Fundamentals.sales_gr_qf]
        window_length = 2*252
        window_safe = True
        def compute(self, today, assets, out, sales_growth):
            sales_growth = np.nan_to_num(sales_growth)
            sales_growth = preprocessing.scale(sales_growth,axis=0)
            out[:] = preprocess(sales_growth[-1])
 
    class GrossMarginChange(CustomFactor):
        window_length = 2*252
        window_safe = True
        inputs = [factset.Fundamentals.ebit_oper_mgn_qf]
        def compute(self, today, assets, out, ebit_oper_mgn):
            ebit_oper_mgn = np.nan_to_num(ebit_oper_mgn)
            ebit_oper_mgn = preprocessing.scale(ebit_oper_mgn,axis=0)
            out[:] = preprocess(ebit_oper_mgn[-1])
 
    class Gross_Income_Margin(CustomFactor):
        #Gross Income Margin:
        #Gross Profit divided by Net Sales
        #Notes:
        #High value suggests that the company is generating large profits
        inputs = [Fundamentals.cost_of_revenue, Fundamentals.total_revenue]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, cost_of_revenue, sales):
            gross_income_margin = sales[-1]/sales[-1] - cost_of_revenue[-1]/sales[-1]
            out[:] = preprocess(-gross_income_margin)
 
    class MaxGap(CustomFactor): 
        # the biggest absolute overnight gap in the previous 90 sessions
        inputs = [USEquityPricing.close] ; window_length = 90
        window_safe = True
        def compute(self, today, assets, out, close):
            abs_log_rets = np.abs(np.diff(np.log(close),axis=0))
            max_gap = np.max(abs_log_rets, axis=0)
            out[:] = preprocess(max_gap)
        
    class CapEx_Vol(CustomFactor):
        inputs=[
            factset.Fundamentals.capex_assets_qf]
        window_length = 2*252
        window_safe = True
        def compute(self, today, assets, out, capex_assets):
                 
            out[:] = preprocess(-np.ptp(capex_assets,axis=0))
                
    class fcf_ev(CustomFactor):
        inputs=[
            Fundamentals.fcf_per_share,
            Fundamentals.shares_outstanding,
            Fundamentals.enterprise_value,]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf, shares, ev):
            v = fcf*shares/ev
            v[np.isinf(v)] = np.nan
                 
            out[:] = preprocess(v[-1])
                
    class DebtToTotalAssets(CustomFactor):
        inputs = [Fundamentals.long_term_debt,
            Fundamentals.current_debt,
            Fundamentals.cash_and_cash_equivalents,
            Fundamentals.total_assets]
        window_length = 1
        window_safe = True
            
        def compute(self, today, assets, out, ltd, std, cce, ta):
            std_part = np.maximum(std - cce, np.zeros(std.shape))
            v = np.divide(ltd + std_part, ta)
            v[np.isinf(v)] = np.nan
            out[:] = preprocess(np.ravel(v))
                
    class TEM(CustomFactor):
        """
        TEM = standard deviation of past 6 quarters' reports
        """
        inputs=[factset.Fundamentals.capex_qf_asof_date,
            factset.Fundamentals.capex_qf,
            factset.Fundamentals.assets]
        window_length = 390
        window_safe = True
        def compute(self, today, assets, out, asof_date, capex, total_assets):
            values = capex/total_assets
            values[np.isinf(values)] = np.nan
            out_temp = np.zeros_like(values[-1,:])
            for column_ix in range(asof_date.shape[1]):
                _, unique_indices = np.unique(asof_date[:, column_ix], return_index=True)
                quarterly_values = values[unique_indices, column_ix]
                if len(quarterly_values) < 6:
                    quarterly_values = np.hstack([
                    np.repeat([np.nan], 6 - len(quarterly_values)),
                    quarterly_values,
                    ])
            
                out_temp[column_ix] = np.std(quarterly_values[-6:])
                
            out[:] = preprocess(-out_temp)
                
    class Piotroski(CustomFactor):
        inputs = [
                Fundamentals.roa,
                Fundamentals.operating_cash_flow,
                Fundamentals.cash_flow_from_continuing_operating_activities,
                Fundamentals.long_term_debt_equity_ratio,
                Fundamentals.current_ratio,
                Fundamentals.shares_outstanding,
                Fundamentals.gross_margin,
                Fundamentals.assets_turnover,
                ]
 
        window_length = 100
        window_safe = True
    
        def compute(self, today, assets, out,roa, cash_flow, cash_flow_from_ops, long_term_debt_ratio, current_ratio, shares_outstanding, gross_margin, assets_turnover):
            
            profit = (
                        (roa[-1] > 0).astype(int) +
                        (cash_flow[-1] > 0).astype(int) +
                        (roa[-1] > roa[0]).astype(int) +
                        (cash_flow_from_ops[-1] > roa[-1]).astype(int)
                    )
        
            leverage = (
                        (long_term_debt_ratio[-1] < long_term_debt_ratio[0]).astype(int) +
                        (current_ratio[-1] > current_ratio[0]).astype(int) + 
                        (shares_outstanding[-1] <= shares_outstanding[0]).astype(int)
                        )
        
            operating = (
                        (gross_margin[-1] > gross_margin[0]).astype(int) +
                        (assets_turnover[-1] > assets_turnover[0]).astype(int)
                        )
        
            out[:] = preprocess(profit + leverage + operating)
            
    class Altman_Z(CustomFactor):
        inputs=[factset.Fundamentals.zscore_qf]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, zscore_qf):
            out[:] = preprocess(zscore_qf[-1])
                
    class Quick_Ratio(CustomFactor):
        inputs=[factset.Fundamentals.quick_ratio_qf]
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, quick_ratio_qf):
            out[:] = preprocess(quick_ratio_qf[-1])
                
    class AdvancedMomentum(CustomFactor):
        inputs = (USEquityPricing.close, Returns(window_length=126))
        window_length = 252
        window_safe = True
 
        def compute(self, today, assets, out, prices, returns):
            am = np.divide(
            (
            (prices[-21] - prices[-252]) / prices[-252] -
            prices[-1] - prices[-21]
            ) / prices[-21],
            np.nanstd(returns, axis=0)
            )
                
            out[:] = preprocess(-am)

    class STA(CustomFactor):  
        inputs = [Fundamentals.operating_cash_flow,  
                  Fundamentals.net_income_continuous_operations,  
                  Fundamentals.total_assets]  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, ocf, ni, ta):  
            ta = np.where(np.isnan(ta), 0, ta)  
            ocf = np.where(np.isnan(ocf), 0, ocf)  
            ni = np.where(np.isnan(ni), 0, ni)  
            out[:] = preprocess(abs(ni[-1] - ocf[-1])/ ta[-1])
            
    class SNOA(CustomFactor):  
        inputs = [Fundamentals.total_assets,  
                 Fundamentals.cash_and_cash_equivalents,  
                 Fundamentals.current_debt, # same as short-term debt?  
                 Fundamentals.minority_interest_balance_sheet,  
                 Fundamentals.long_term_debt, # check same?  
                 Fundamentals.preferred_stock] # check same?  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, ta, cace, cd, mi, ltd, ps):  
            ta = np.where(np.isnan(ta), 0, ta)  
            cace = np.where(np.isnan(cace), 0, cace)  
            cd = np.where(np.isnan(cd), 0, cd)  
            mi = np.where(np.isnan(mi), 0, mi)  
            ltd = np.where(np.isnan(ltd), 0, ltd)  
            ps = np.where(np.isnan(ps), 0, ps)  
            results = ((ta[-1]-cace[-1])-(ta[-1]-cace[-1]-ltd[-1]-cd[-1]-ps[-1]-mi[-1]))/ta[-1]  
            out[:] = preprocess(np.where(np.isnan(results),0,results))
            
    class ROA(CustomFactor):  
        inputs = [Fundamentals.roa]  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, roa):  
            out[:] = preprocess(np.where(roa[-1]>0,1,0))
            
    class FCFTA(CustomFactor):  
        inputs = [Fundamentals.free_cash_flow,  
                 Fundamentals.total_assets]  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf, ta):  
            out[:] = preprocess(np.where(fcf[-1]/ta[-1]>0,1,0))
            
    class ROA_GROWTH(CustomFactor):  
        inputs = [Fundamentals.roa]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, roa):  
            out[:] = np.where(roa[-1]>roa[-252],1,0)
            
    class FCFTA_ROA(CustomFactor):  
        inputs = [Fundamentals.free_cash_flow,  
                  Fundamentals.total_assets,  
                  Fundamentals.roa]  
        window_length = 1
        window_safe = True
        def compute(self, today, assets, out, fcf, ta, roa):  
            out[:] = preprocess(np.where(fcf[-1]/ta[-1]>roa[-1],1,0))
            
    class FCFTA_GROWTH(CustomFactor):  
        inputs = [Fundamentals.free_cash_flow,  
                  Fundamentals.total_assets]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, fcf, ta):  
            out[:] = preprocess(np.where(fcf[-1]/ta[-1]>fcf[-252]/ta[-252],1,0))
            
    class LTD_GROWTH(CustomFactor):  
        inputs = [Fundamentals.total_assets,  
                  Fundamentals.long_term_debt]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, ta, ltd):  
            out[:] = preprocess(np.where(ltd[-1]/ta[-1]<ltd[-252]/ta[-252],1,0))
            
    class CR_GROWTH(CustomFactor):  
        inputs = [Fundamentals.current_ratio]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, cr):  
            out[:] = preprocess(np.where(cr[-1]>cr[-252],1,0))
            
    class GM_GROWTH(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(np.where(gm[-1]>gm[-252],1,0))
            
    class ATR_GROWTH(CustomFactor):  
        inputs = [Fundamentals.assets_turnover]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, atr):  
            out[:] = preprocess(np.where(atr[-1]>atr[-252],1,0))
            
    class NEQISS(CustomFactor):  
        inputs = [Fundamentals.shares_outstanding]  
        window_length = 252
        window_safe = True
        def compute(self, today, assets, out, so):  
            out[:] = preprocess(np.where(so[-1]-so[-252]<1,1,0))
            
    class GM_GROWTH_2YR(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 504
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(gmean([gm[-1]+1, gm[-252]+1,gm[-504]+1])-1)
            
    class GM_STABILITY_2YR(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 504
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(np.std([gm[-1]-gm[-252],gm[-252]-gm[-504]],axis=0)) 
            
    class ROA_GROWTH_2YR(CustomFactor):  
        inputs = [Fundamentals.roa]  
        window_length = 504
        window_safe = True
        def compute(self, today, assets, out, roa):  
            out[:] = preprocess(gmean([roa[-1]+1, roa[-252]+1,roa[-504]+1])-1)
            
    class ROIC_GROWTH_2YR(CustomFactor):  
        inputs = [Fundamentals.roic]  
        window_length = 504
        window_safe = True
        def compute(self, today, assets, out, roic):  
            out[:] = preprocess(gmean([roic[-1]+1, roic[-252]+1,roic[-504]+1])-1)
            
    class GM_GROWTH_8YR(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 8
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(gmean([gm[-1]+1, gm[-2]+1, gm[-3]+1, gm[-4]+1, gm[-5]+1, gm[-6]+1, gm[-7]+1, gm[-8]+1])-1)        
            
    class GM_STABILITY_8YR(CustomFactor):  
        inputs = [Fundamentals.gross_margin]  
        window_length = 9
        window_safe = True
        def compute(self, today, assets, out, gm):  
            out[:] = preprocess(gm[-8]) 
            
    class ROA_GROWTH_8YR(CustomFactor):  
        inputs = [Fundamentals.roa]  
        window_length = 9
        window_safe = True
        def compute(self, today, assets, out, roa):  
            out[:] = preprocess(gmean([roa[-1]/100+1, roa[-2]/100+1,roa[-3]/100+1,roa[-4]/100+1,roa[-5]/100+1,roa[-6]/100+1,roa[-7]/100+1,roa[-8]/100+1])-1) 
            
    class ROIC_GROWTH_8YR(CustomFactor):  
        inputs = [Fundamentals.roic]  
        window_length = 9
        window_safe = True
        def compute(self, today, assets, out, roic):  
            out[:] = preprocess(gmean([roic[-1]/100+1, roic[-2]/100+1,roic[-3]/100+1,roic[-4]/100+1,roic[-5]/100+1,roic[-6]/100+1,roic[-7]/100+1,roic[-8]/100+1])-1)
            
    factors = [
            MessageSum,
            fcf,
            Direction,
            mean_rev,
            volatility,
            growthscore,
            peg_ratio,
            MoneyflowVolume5d,
            Trendline,
            SalesGrowth,
            GrossMarginChange,
            Gross_Income_Margin,
            MaxGap,
            CapEx_Vol,
            fcf_ev,
            DebtToTotalAssets,
            TEM,
            Piotroski,
            Altman_Z,
            Quick_Ratio,
            # AdvancedMomentum,
            # STA,
            # SNOA, 
            # ROA,  
            # FCFTA,  
            # ROA_GROWTH,  
            # FCFTA_ROA,
            # FCFTA_GROWTH, 
            # LTD_GROWTH,  
            # CR_GROWTH,  
            # GM_GROWTH,  
            # ATR_GROWTH, 
            # NEQISS,
            # GM_GROWTH_2YR,  
            # GM_STABILITY_2YR, 
            # ROA_GROWTH_2YR,  
            # ROIC_GROWTH_2YR,  
            # GM_STABILITY_8YR,  
            # ROA_GROWTH_8YR,  
            # ROIC_GROWTH_8YR,  
        ]
    
    return factors

class Factor_N_Days_Ago(CustomFactor):
    
    def compute(self, today, assets, out, input_factor):
        out[:] = input_factor[0]

def factor_pipeline():
    
    factors = make_factors()
    
    pipeline_columns = {}
    for k,f in enumerate(factors):
        for days_ago in range(N_FACTOR_WINDOW):
            pipeline_columns['alpha_'+str(k)+'_'+str(days_ago)] = Factor_N_Days_Ago([f(mask=QTradableStocksUS())], window_length=days_ago+1, mask=QTradableStocksUS())
    
    pipe = Pipeline(columns = pipeline_columns,
    screen = QTradableStocksUS())
    
    return pipe

def beta_pipeline():
    
    beta = SimpleBeta(target=sid(8554),regression_length=260,
                      allowed_missing_percentage=1.0
                     )
    
    pipe = Pipeline(columns = {'beta': beta},
    screen = QTradableStocksUS())
    return pipe
    
def initialize(context):    
    
    attach_pipeline(risk_loading_pipeline(), 'risk_loading_pipeline')
    attach_pipeline(beta_pipeline(), 'beta_pipeline')
    attach_pipeline(factor_pipeline(), 'factor_pipeline')
    
def before_trading_start(context, data):
 
    risk_loadings = pipeline_output('risk_loading_pipeline')
    risk_loadings.fillna(risk_loadings.median(), inplace=True)
    context.risk_loadings = risk_loadings
    context.beta_pipeline = pipeline_output('beta_pipeline')
    
    alphas = pipeline_output('factor_pipeline').dropna()
    
    n_factors = len(alphas.columns)/N_FACTOR_WINDOW
    n_stocks = len(alphas.index)
    
    alphas_flattened = np.zeros((n_factors,n_stocks*N_FACTOR_WINDOW))
    
    for f in range(n_factors):
        a = alphas.iloc[:,f*N_FACTOR_WINDOW:(f+1)*N_FACTOR_WINDOW].values
        alphas_flattened[f,:] = np.ravel(a)
    
    clustering = SpectralClustering(n_clusters=3,assign_labels="discretize",random_state=0).fit(alphas_flattened)
    
    print clustering.labels_
There was a runtime error.

I published a complete algo on https://www.quantopian.com/posts/alpha-combination-via-clustering. Thanks all for the helpful input.