Back to Community
PipeLine Calculating Beta

As I understand the PipeLine loops over the entire stocks universe available and allow access to its closes and other columns

What I am trying to do is calculate the Beta Coefficent for each stock, to do so I have to access that last X closes for that stock and the SPY for the same period, here is my implementation so for:

window_length=100

class Beta(CustomFactor):  

    inputs = [USEquityPricing.close]

    def compute(self, today, assets, out, close):  
        # 8554 is SPY  
        spy_column = close[:, assets.searchsorted(8554)]  
        #spy_closes = spy_column[:, None]

        stock_returns = np.diff(close[-1:-100:-1])  
        #print (close[-1:-100:-1])  
        spy_returns = np.diff(spy_column)

        beta = np.cov(spy_returns,stock_returns)/np.var(spy_returns)

        out[:] = beta  

I got an error : ValueError: all the input array dimensions except for the concatenation axis must match exactly
I even tried to make the diemensions similar by:-

    arr2d = np.array([[],spy_column])  
    spy_returns = np.diff(arr2d[1])  

Is there a way to get access to the entire column for that particular stock?

20 responses

Hi there,

I've actually been playing around with a similar algorithm myself as part of a separate project I've been working on. Here is a pipeline algorithm that computes beta for each stock. It's fairly messy right now and by no means a finished product, but should give you the code you need.

Hope this helps.

Thanks,
Delaney

Clone Algorithm
Loading...
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
# Import the libraries we will use here
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline import CustomFactor
from quantopian.pipeline.data.builtin import USEquityPricing
import numpy as np

from statsmodels import regression
import statsmodels.api as sm

# Custom factors are defined as a class object, outside of initialize or handle data
class Beta(CustomFactor):
    
    # Set the default list of inputs as well as the default window_length.
    # Default values are used if the optional parameters are not specified.
    inputs = [USEquityPricing.close] 
    window_length = 100

    # Any calculation can be performed here and is applied to all stocks
    # in the universe.
    def compute(self, today, assets, out, close):
        
        benchmark_index = np.where((assets == 8554) == True)[0][0]
        
        benchmark_p = close[:, benchmark_index]
        log_benchmark_p = np.log(benchmark_p)
        log_benchmark_r = np.diff(log_benchmark_p)[1:]
        benchmark_r = 1 - np.exp(log_benchmark_r)
        X = benchmark_r
        X = sm.add_constant(X)
        
        for i in range(len(assets)):
            p = close[:, i]
            log_p = np.log(p)
            log_r = np.diff(log_p)[1:]
            r = 1 - np.exp(log_r)
            Y = r
            
            model = regression.linear_model.OLS(Y, X).fit()
            
            alpha, beta = model.params
            
            out[i] = beta
        
# DollarVolume will calculate yesterday's dollar volume for each stock in the universe.
class DollarVolume(CustomFactor):
    
    # We need close price and trade volume for this calculation.
    inputs = [USEquityPricing.close, USEquityPricing.volume]
    window_length = 1
    
    # Dollar volume is volume * closing price.
    def compute(self, today, assets, out, close, volume):
        out[:] = (close[0] * volume[0])

def initialize(context):
    # Set execution cost assumptions. For live trading with Interactive Brokers 
    # we will assume a $1.00 minimum per trade fee, with a per share cost of $0.0075. 
    set_commission(commission.PerShare(cost=0.0075, min_trade_cost=1.00))
    
    # Set market impact assumptions. We limit the simulation to 
    # trade up to 2.5% of the traded volume for any one minute,
    # and our price impact constant is 0.1. 
    set_slippage(slippage.VolumeShareSlippage(volume_limit=0.025, price_impact=0.10))
    
    # Define the other variables
    context.long_leverage = 0.5
    context.short_leverage = -0.5
    context.long_number = 250
    context.short_number = 250
    context.returns_lookback = 100
           
    # Rebalance every month
    schedule_function(rebalance, 
                      date_rules.month_start(),
                      time_rules.market_open(minutes = 30)) 
    
    # Create, register and name a pipeline. 
    pipe = Pipeline()
    attach_pipeline(pipe, 'beta_example')
    
    # Create the recent_returns factor with a 5-day returns lookback.
    beta = Beta(window_length=context.returns_lookback)
    
    # Create the dollar_volume factor using default inputs and window_length
    dollar_volume = DollarVolume()
    
    pipe.add(dollar_volume, 'dollar_volume')
    
    # Factors return a scalar value for each security in the entire universe
    # of securities. Here, we add the recent_returns factor to our data object
    # and filter our results with the mask optional parameter to stocks in the 
    # top N% dollar-volume.
    
    beta_rank = beta.rank(mask=dollar_volume.percentile_between(20, 100), ascending=False)
    pipe.add(beta_rank, 'beta_rank')
    
    low_rank = beta_rank.bottom(250)
    high_rank = beta_rank.top(250)
    
    # Filters for rows with high or low beta
    pipe.set_screen(low_rank | high_rank)
    
    # Adds these filters as columns to our pipe
    pipe.add(low_rank, 'low_rank')
    pipe.add(high_rank, 'high_rank')
    

# Called every day before market open. This is a good place to access our factors.
def before_trading_start(context, data):
    
    # Pipeline_output returns the constructed dataframe.
    context.output = pipeline_output('beta_example')
    # Sets the list of securities we want to long as the securities with a 'True'
    # value in the high_rank column.
    context.long_secs = context.output[context.output['high_rank']]
    
    # Sets the list of securities we want to short as the securities with a 'True'
    # value in the low_rank column.
    context.short_secs = context.output[context.output['low_rank']]
    
    # Update our universe to contain the securities that we would like to long and short.
    update_universe(context.long_secs.index.union(context.short_secs.index))
    
# This rebalancing is called according to our schedule_function settings.     
def rebalance(context,data):
    # Get any stocks we ordered last time that still have open orders.
    open_orders = get_open_orders()
        
    # Set the allocations to even weights in each portfolio.
    long_weight = context.long_leverage / (len(context.long_secs) + len(open_orders)/2)
    short_weight = context.short_leverage / (len(context.short_secs) + len(open_orders)/2)
    
    # For each security in our universe, order long or short positions according
    # to our context.long_secs and context.short_secs lists, and sell all previously
    # held positions not in either list.
    for stock in data:
        # Guard against ordering too much of a given security if a previous order
        # is still unfilled.
        if stock not in open_orders:
            if stock in context.long_secs.index:
                order_target_percent(stock, long_weight)
            elif stock in context.short_secs.index:
                order_target_percent(stock, short_weight)
            else:
                order_target_percent(stock, 0)
    
    # Log the long and short orders each week.
    log.info("This week's longs: "+", ".join([long_.symbol for long_ in context.long_secs.index]))
    log.info("This week's shorts: "  +", ".join([short_.symbol for short_ in context.short_secs.index]))
        
        
# The handle_data function is run every bar.    
def handle_data(context,data):    
    # Record and plot the leverage of our portfolio over time. 
    record(leverage = context.account.leverage,
           positions = len(context.portfolio.positions))

    # We also want to monitor the number of long and short positions 
    # in our portfolio over time. This loop will check our positition sizes 
    # and add the count of longs and shorts to our plot.
    longs = shorts = 0
    for position in context.portfolio.positions.itervalues():
        if position.amount > 0:
            longs += 1
        if position.amount < 0:
            shorts += 1
    record(long_count=longs, short_count=shorts)
There was a runtime error.
Disclaimer

The material on this website is provided for informational purposes only and does not constitute an offer to sell, a solicitation to buy, or a recommendation or endorsement for any security or strategy, nor does it constitute an offer to provide investment advisory services by Quantopian. In addition, the material offers no opinion with respect to the suitability of any security or specific investment. No information contained herein should be regarded as a suggestion to engage in or refrain from any investment-related course of action as none of Quantopian nor any of its affiliates is undertaking to provide investment advice, act as an adviser to any plan or entity subject to the Employee Retirement Income Security Act of 1974, as amended, individual retirement account or individual retirement annuity, or give advice in a fiduciary capacity with respect to the materials presented herein. If you are an individual retirement or other investor, contact your financial advisor or other fiduciary unrelated to Quantopian about whether any given investment idea, strategy, product or service described herein may be appropriate for your circumstances. All investments involve risk, including loss of principal. Quantopian makes no guarantees as to the accuracy or completeness of the views expressed in the website. The views are subject to change, and may have become unreliable for various reasons, including changes in market conditions or economic circumstances.

Thank you, but the code runs very slow and I get regular time outs!, I think the problem is in the loop inside the beta class, If we just can get the whole column of the current asset in play as in my try above, it will be faster.

Thanks again and if you optimized it I'd like to see it.

Yeah the issue is that as far as I know, linear regression doesn't have a vectorized implementation in Python. Your approach of using covariance is actually pretty interesting and I'll add it to my queue to modify my algo and see if it works. I'll let you know if I get anywhere.

So this caught my eye a few weeks ago when it was originally posted and I figured I'd give it a go, but just got around to it tonight. I tried computing one giant covariance matrix in compute and then just take the column with the covarinces against SPY. As you can imagine computing a covariance matrix of that size that many times is pretty tough and I immediately got a timeout.

It turns out it is "faster" to compute the covariance between each security and SPY iteratively, I used the pandas apply method. I tried to do as much work outside of the function I feed apply, that's why you see the returns data, and the benchmark variance computed and fed in as args. I think there are some pretty neat BLAS algorithms for the fast computation of covariance matrices, I couldn't speak to whether numpy implements those or not as I've only come across them in passing.

The strategy the algo uses is pretty dumb. It just longs stocks with a low magnitude of beta and shorts those with a high magnitude. For fun I compared it to the ETF BTAL an "anti-beta" ETF from QuantShares I think.

Clone Algorithm
65
Loading...
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import CustomFactor

import pandas as pd
import numpy as np
from scipy import stats


def _beta(ts, benchmark, benchmark_var):
    return np.cov(ts, benchmark)[0, 1] / benchmark_var 


class Beta(CustomFactor):
    
    inputs = [USEquityPricing.close]
    window_length = 60
    
    def compute(self, today, assets, out, close):
        returns = pd.DataFrame(close, columns=assets).pct_change()[1:]
        spy_returns = returns[sid(8554)]
        spy_returns_var = np.var(spy_returns)
        out[:] = returns.apply(_beta, args=(spy_returns,spy_returns_var,))

        
class AvgDailyDollarVolumeTraded(CustomFactor):
    
    inputs = [USEquityPricing.close, USEquityPricing.volume]
    window_length = 20
    
    def compute(self, today, assets, out, close_price, volume):
        out[:] = np.mean(close_price * volume, axis=0)

# Put any initialization logic here.  The context object will be passed to
# the other methods in your algorithm.
def initialize(context):
    
    pipe = Pipeline()
    pipe = attach_pipeline(pipe, name='beta_metrics')
    pipe.add(Beta(), "beta")
    
    dollar_volume = AvgDailyDollarVolumeTraded()
    
    # Screen out penny stocks and low liquidity securities.
    pipe.set_screen(dollar_volume > 10**7)
    context.shorts = None
    context.longs = None
    
    schedule_function(remove_to_be_delisted, date_rules.every_day(), time_rules.market_open())
    schedule_function(rebalance, date_rules.month_start())
    schedule_function(cancel_open_orders, date_rules.every_day(),
                      time_rules.market_close())
    set_commission(commission.PerShare(cost=0, min_trade_cost=0))
    set_slippage(slippage.FixedSlippage(spread=0))
    set_benchmark(sid(41891))
    
def before_trading_start(context, data):
    results = pipeline_output('beta_metrics').dropna()         
    ranks = results["beta"].abs().rank().order()
    
    context.shorts = ranks.tail(200)
    context.longs = ranks.head(200)
    
    print results.loc[context.longs.head().index]
    print results.loc[context.shorts.head().index]
    
    # The pipe character "|" is the pandas union operator
    update_universe(context.longs.index | context.shorts.index)
    

# Will be called on every trade event for the securities you specify. 
def handle_data(context, data):
    record(lever=context.account.leverage,
           exposure=context.account.net_leverage,
           num_pos=len(context.portfolio.positions),
           oo=len(get_open_orders()))

    
def cancel_open_orders(context, data):
    for security in get_open_orders():
        for order in get_open_orders(security):
            cancel_order(order)
            
def remove_to_be_delisted(context, data):
    for security in context.portfolio.positions:        
        if (security.end_date - get_datetime()).days < 5: 
            order_target_percent(security, 0)
        
    
def rebalance(context, data):
    for security in context.shorts.index:
        if get_open_orders(security):
            continue
        if security in data:
            order_target_percent(security, -0.5 / len(context.shorts))
            
    for security in context.longs.index:
        if get_open_orders(security):
            continue
        if security in data:
            order_target_percent(security, 0.5 / len(context.longs))
            
    for security in context.portfolio.positions:
        if get_open_orders(security):
            continue
        if security in data:
            if security not in (context.longs.index | context.shorts.index):
                order_target_percent(security, 0)
There was a runtime error.
Disclaimer

The material on this website is provided for informational purposes only and does not constitute an offer to sell, a solicitation to buy, or a recommendation or endorsement for any security or strategy, nor does it constitute an offer to provide investment advisory services by Quantopian. In addition, the material offers no opinion with respect to the suitability of any security or specific investment. No information contained herein should be regarded as a suggestion to engage in or refrain from any investment-related course of action as none of Quantopian nor any of its affiliates is undertaking to provide investment advice, act as an adviser to any plan or entity subject to the Employee Retirement Income Security Act of 1974, as amended, individual retirement account or individual retirement annuity, or give advice in a fiduciary capacity with respect to the materials presented herein. If you are an individual retirement or other investor, contact your financial advisor or other fiduciary unrelated to Quantopian about whether any given investment idea, strategy, product or service described herein may be appropriate for your circumstances. All investments involve risk, including loss of principal. Quantopian makes no guarantees as to the accuracy or completeness of the views expressed in the website. The views are subject to change, and may have become unreliable for various reasons, including changes in market conditions or economic circumstances.

Pretty good implementation, that's what I was looking for, Thank you!

Hi!
Noob question regarding pipeline and data processing.

These lines:

        log_benchmark_r = np.diff(log_benchmark_p)[1:]  
        benchmark_r = 1 - np.exp(log_benchmark_r)  

And this lines:

# DollarVolume will calculate yesterday's dollar volume for each stock in the universe.  
    # Dollar volume is volume * closing price.  
    def compute(self, today, assets, out, close, volume):  
        out[:] = (close[0] * volume[0])  

somewhat suggest that the data is stored in decresing by peirod order. I.e. the latest (yesterday) observation has index [0], the day before yesterday - [1], etc.

I have just run a code with this simple compute method:

    def compute(self, today, assets, out, close):  
        benchmark_index = np.where((assets == 8554) == True)[0][0]  
        benchmark_p = close[:, benchmark_index]  
        print(benchmark_p)  
        out[:] = 0  

i.e. simply prining benchmark_p. And here is the result (window_length = 3) for 4 days:

PRINT [ 200.79197124  205.66508     206.53      ]  
PRINT [ 205.66508  206.53     207.45   ]  
PRINT [ 206.53  207.45  207.75]  
PRINT [ 207.45  207.75  207.8 ]  

The numbers shift from right to left, so it's kinda means that the observation [0] is the earliest, not the latest. How come?
Thanks!

Hey Oleg,

Good catch. The data are stored in ascending period order, with 0 being the earliest and -1 being the latest observation. The dollar volume factor is slightly confusing, as we are looking at an array of length 1, as specified in the line window_length = 1 in the custom factor. In this case the first and last observations are the same thing, so 0 and -1 can be used equivalently. It would probably lead to more readable code to swap in -1 instead of zero.

Thanks,
Delaney

Thanks, Delaney, I see it now!

But I still don't quite get this block:

        benchmark_p = close[:, benchmark_index]  
        log_benchmark_p = np.log(benchmark_p)  
        log_benchmark_r = np.diff(log_benchmark_p)[1:]  
        benchmark_r = 1 - np.exp(log_benchmark_r)  

It gets us consequently:

[..., P_t, P_{t+1}, ...]
[..., log(P_t), log(P_{t+1}), ...]
[..., log(P_t/P_{t-1}), log(P_{t+1}/P_t), ...] (as numpy.diff computes out[n] = a[n+1] - a[n])
[..., (P_{t-1} - P_t)/P_{t-1}, (P_t - P_{t+1})/P_t, ...]

and the last line is kind of {-r_t} whereas I would expect just {r_t}. Not that it affects beta computation, it's just I have a feeling I'm missing some point here.

Looking at the code again I think you may be right. I'm not sure about the last line in your derivation, but either way I think it might have to be

benchmark_r = np.exp(log_benchmark_r) - 1  

The reasoning being that you get P_{t+1}/P_{t} in the second to last line, and this will be something like 1.05, not 0.05 like we want. Seems like there's a bug in the code. There are new built-in factors coming out soon that should make this significantly easier, though, so rather than mucking with the old code I posted I'd rather show a more efficient beta computation using those factors when they're available.

Ok, I see. Thanks, Delaney, I'll be waiting for the updates!

Delaney,

Any idea when new built in factors are coming out?

Hello Miles, upon reviewing the recently released Returns factor, it still doesn't quite match the functionality here. I'm working on a new set of pipeline projects that aim to replicate industry quant workflows, so it might be best to wait for those to come out and explore that code.

Just updating the post because this is the first result when searching "pipeline beta".

We now have an official solution: new pipeline factors.

Also, here is the code for custom factors that calculate Beta and abnormal returns (returns in excess of Beta), just in case someone needs to do little customization:

Edit 8/7/2016: fixed beta calculation. I inverted the stats.linregress arguments, so instead of getting the Beta the function was returning 1/Beta.

def _beta(stock_prices, bench_prices):  
    # `linregress` returns its results in the following order:  
    # slope, intercept, r-value, p-value, stderr  
    regr_results = stats.linregress(y=stock_prices, x=bench_prices)  
    #alpha = regr_results[1]  
    beta = regr_results[0]  
    #r_value = regr_results[2]  
    p_value = regr_results[3]  
    #stderr = regr_results[4]  
    # Check null hypothesis  
    if p_value > 0.05:  
        beta = 0.  
    return beta

class Beta(CustomFactor):  
    inputs = [USEquityPricing.close]  
    params = ('delta_days', 'market_sid',)  
    def compute(self, today, assets, out, close, delta_days, market_sid):  
        returns = (close[delta_days:] - close[:-delta_days]) # absolute returns  
        returns /= close[:-delta_days]                       # percentage returns  
        market_idx = assets.get_loc(market_sid)  
        market_returns = returns[:,market_idx]  
        betas = np.apply_along_axis(_beta, 0, returns, market_returns)  
        out[:] = betas

class ReturnsBetaExcess(CustomFactor):  
    """  
    Calculates the percent change in close price (beta adjusted) over the given window_length.  
    **Default Inputs**: [USEquityPricing.close]  
    """  
    params = ('delta_days', 'market_sid',)  
    inputs = [USEquityPricing.close]  
    window_length = 60  
    def compute(self, today, assets, out, close, delta_days, market_sid):  
        returns = (close[delta_days:] - close[:-delta_days]) # absolute returns  
        returns /= close[:-delta_days]                       # percentage returns  
        market_idx = assets.get_loc(market_sid)  
        market_returns = returns[:,market_idx]  
        betas = np.apply_along_axis(_beta, 0, returns, market_returns)  
        returns -= (returns[:, [market_idx] ] * betas) # remove returns due to beta  
        out[:] = returns[-1]

For Beta calculated on daily returns and using  SPY as marked, use them like this:

market = symbol('SPY')

Beta(delta_days=1, market_sid=market.sid)  
ReturnsBetaExcess(delta_days=1, market_sid=market.sid)

Thanks very much, Luca. I hadn't gotten around to this yet.

I actually found a bug in the beta calculation (now fixed in my previous post). So I decided to create a NB with a test case, to double check my code and as a reference for the future. In my test I use SPY as a benchmark and S&P500 levertaged ETFs ('SSO', 'UPRO', 'SPXL', 'SPXS', 'SDS', 'SPXU') as test cases.

EDIT: Not sure why I wrote ".head()", it doesn't make any sense and it should be removed.

leverage = (ret / ret[market] ).head()  
Loading notebook preview...

This notebook is awesome, Luca.

Cov matrix can be computed using Beta1 * Beta2 * SDIndex which is less computationally intense. Also the Beta Pipeline factor can perhaps have another name and can return alpha, beta and SEs and p-values of model and coefficients as they are also calculated. Otherwise if you need them you have to make multiple such factors with redundant calculations.

"Talk is cheap. Show me the code." Linus Torvalds

No offence ;)

@Luca I just came across your code above for ReturnsBetaExcess. It's exactly what I want. You are the best.

The new Slice feature is also useful. I am reimplementing the above, as I didn't have SPY in my universe.

To make sure I have the benchmark (e.g. SPY) for ReturnsBetaExcess in my universe I usually do the following:

market = symbol('SPY')  
# my_filter is the screen I use in my pipeline and pipeline factor's masks  
my_filter |= create_sid_screen([market.sid])  

Compared to the Slice feature this has the side effect that you end up having SPY in the universe instead of using it for the computation only (the code for create_sid_screen is in my NB of few replies above).