Back to Community
Pipeline - Mean Reversion Example

Earlier this summer, we released an example algorithm that implements a mean reversion strategy. With the release of the Pipeline API, I redesigned the example to take advantage of the new features offered in Pipeline. Every day, this strategy ranks all stocks in the top 2% of Dollar Volume by their returns over the last 5 trading days. Every Monday at 11:00am it trades, closing all previously held positions. Then, the worst-performing stocks are longed and the top-performing stocks are shorted.

This example demonstrates how Pipeline can be used to create custom factors such as RecentReturns and DollarVolume, how to rank securities by these factors, and how to filter them down to a specific subset.

The mean reversion strategy is a good example of a strategy that uses hedging.

Please feel free to clone and tinker with this algorithm. I encourage you to post here with other similar strategies!

Clone Algorithm
733
Loading...
Backtest from to with initial capital
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
# This is a sample mean-reversion algorithm on Quantopian for you to test and adapt.
# This example uses pipeline to set its universe daily.

# Algorithm investment thesis: 
# Top-performing stocks from last week will do worse this week, and vice-versa.

# Every Monday, we rank high-volume stocks based on their previous 5 day returns. 
# We go long the bottom 10% of stocks with the WORST returns over the past 5 days.
# We go short the top 10% of stocks with the BEST returns over the past 5 days.

# This type of algorithm may be used in live trading and in the Quantopian Open.

# Import the libraries we will use here
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline import CustomFactor
from quantopian.pipeline.data.builtin import USEquityPricing
import math

# Custom factors are defined as a class object, outside of initialize or handle data.
# RecentReturns will calculate the returns for a security over the n-most recent days.
class RecentReturns(CustomFactor):
    
    # Set the default list of inputs as well as the default window_length.
    # Default values are used if the optional parameters are not specified.
    inputs = [USEquityPricing.close] 
    window_length = 10

    # Computes the returns over the last n days where n = window_length.
    # Any calculation can be performed here and is applied to all stocks
    # in the universe.
    def compute(self, today, assets, out, close):
        out[:] = (close[-1] - close[0]) / close[0]
        
# DollarVolume will calculate yesterday's dollar volume for each stock in the universe.
class DollarVolume(CustomFactor):
    
    # We need close price and trade volume for this calculation.
    inputs = [USEquityPricing.close, USEquityPricing.volume]
    window_length = 1
    
    # Dollar volume is volume * closing price.
    def compute(self, today, assets, out, close, volume):
        out[:] = (close[0] * volume[0])

# The initialize function is the place to create your pipeline and set trading
# conditions such as commission and slippage.
def initialize(context):
    # Set execution cost assumptions. For live trading with Interactive Brokers 
    # we will assume a $1.00 minimum per trade fee, with a per share cost of $0.0075. 
    set_commission(commission.PerShare(cost=0.0075, min_trade_cost=1.00))
    
    # Set market impact assumptions. We limit the simulation to trade up to 2.5% 
    # of the traded volume for any one minute, and  our price impact constant is 0.1. 
    set_slippage(slippage.VolumeShareSlippage(volume_limit=0.025, price_impact=0.10))
    
    # Define the other variables
    context.long_leverage = 0.5
    context.short_leverage = -0.5
    context.lower_percentile = 10
    context.upper_percentile = 90
    context.returns_lookback = 5
           
    # Rebalance every Monday (or the first trading day if it's a holiday).
    # At 11AM ET, which is 1 hour and 30 minutes after market open.
    schedule_function(rebalance, 
                      date_rules.week_start(days_offset=0),
                      time_rules.market_open(hours = 1, minutes = 30)) 
    
    # Create, register and name a pipeline. 
    pipe = Pipeline()
    attach_pipeline(pipe, 'mean_reversion_example')
    
    # Create a recent_returns factor with a 5-day returns lookback.
    recent_returns = RecentReturns(window_length=context.returns_lookback)
    pipe.add(recent_returns, 'recent_returns')
    
    # Create a dollar_volume factor using default inputs and window_length
    dollar_volume = DollarVolume()
    pipe.add(dollar_volume, 'dollar_volume')
    
    # Define high dollar-volume filter
    high_dollar_volume = dollar_volume.percentile_between(98, 100)
    
    # Define high and low returns filters to be the bottom 10% and top 10% of
    # securities in the high dollar volume group.
    low_returns = recent_returns.percentile_between(0,10,mask=high_dollar_volume)
    high_returns = recent_returns.percentile_between(90,100,mask=high_dollar_volume)
    
    # Factors return a scalar value for each security in the entire universe
    # of securities. Here, we add the recent_returns rank factor to our pipeline
    # and we provide it with a mask such that securities that do not pass the mask
    # (i.e. do not have high dollar volume), are not considered in the ranking.
    pipe.add(recent_returns.rank(mask=high_dollar_volume), 'recent_returns_rank')
    
    # Add a filter to the pipeline such that only high-return and low-return
    # securities are kept.
    pipe.set_screen(low_returns | high_returns)
    
    # Add the low_returns and high_returns filters as columns to the pipeline so
    # that when we refer to securities remaining in our pipeline later, we know
    # which ones belong to which category.
    pipe.add(low_returns, 'low_returns')
    pipe.add(high_returns, 'high_returns')
    

# Called every day before market open. This is where we access the securities
# that made it through the pipeline.
def before_trading_start(context, data):
    
    # Pipeline_output returns the constructed dataframe.
    context.output = pipeline_output('mean_reversion_example')
    
    # Sets the list of securities we want to long as the securities with a 'True'
    # value in the low_returns column.
    context.long_secs = context.output[context.output['low_returns']]
    
    # Sets the list of securities we want to short as the securities with a 'True'
    # value in the high_returns column.
    context.short_secs = context.output[context.output['high_returns']]
    
    # Update our universe to contain the securities that we would like to long and short.
    update_universe(context.long_secs.index.union(context.short_secs.index))
    
# This rebalancing is called according to our schedule_function settings.     
def rebalance(context,data):
    
    # Get any stocks we ordered last time that still have open orders.
    open_orders = get_open_orders()
        
    # Set the allocations to even weights in each portfolio.
    long_weight = context.long_leverage / (len(context.long_secs) + len(open_orders)/2)
    short_weight = context.short_leverage / (len(context.short_secs) + len(open_orders)/2)
    
    # For each security in our universe, order long or short positions according
    # to our context.long_secs and context.short_secs lists, and sell all previously
    # held positions not in either list.
    for stock in data:
        # Guard against ordering too much of a given security if a previous order
        # is still unfilled.
        if stock not in open_orders:
            if stock in context.long_secs.index:
                order_target_percent(stock, long_weight)
            elif stock in context.short_secs.index:
                order_target_percent(stock, short_weight)
            else:
                order_target_percent(stock, 0)
    
    # Log the long and short orders each week.
    log.info("This week's longs: "+", ".join([long_.symbol for long_ in context.long_secs.index]))
    log.info("This week's shorts: "  +", ".join([short_.symbol for short_ in context.short_secs.index]))
        
        
# The handle_data function is run every bar.    
def handle_data(context,data):    
    # Record and plot the leverage of our portfolio over time. 
    record(leverage = context.account.leverage)

    # We also want to monitor the number of long and short positions 
    # in our portfolio over time. This loop will check our positition sizes 
    # and add the count of longs and shorts to our plot.
    longs = shorts = 0
    for position in context.portfolio.positions.itervalues():
        if position.amount > 0:
            longs += 1
        if position.amount < 0:
            shorts += 1
    record(long_count=longs, short_count=shorts)
We have migrated this algorithm to work with a new version of the Quantopian API. The code is different than the original version, but the investment rationale of the algorithm has not changed. We've put everything you need to know here on one page.
There was a runtime error.
Disclaimer

The material on this website is provided for informational purposes only and does not constitute an offer to sell, a solicitation to buy, or a recommendation or endorsement for any security or strategy, nor does it constitute an offer to provide investment advisory services by Quantopian. In addition, the material offers no opinion with respect to the suitability of any security or specific investment. No information contained herein should be regarded as a suggestion to engage in or refrain from any investment-related course of action as none of Quantopian nor any of its affiliates is undertaking to provide investment advice, act as an adviser to any plan or entity subject to the Employee Retirement Income Security Act of 1974, as amended, individual retirement account or individual retirement annuity, or give advice in a fiduciary capacity with respect to the materials presented herein. If you are an individual retirement or other investor, contact your financial advisor or other fiduciary unrelated to Quantopian about whether any given investment idea, strategy, product or service described herein may be appropriate for your circumstances. All investments involve risk, including loss of principal. Quantopian makes no guarantees as to the accuracy or completeness of the views expressed in the website. The views are subject to change, and may have become unreliable for various reasons, including changes in market conditions or economic circumstances.

11 responses

I've been trying to write a similar algo to this one, but that uses the built-in RSI factor rather than returns. I keep getting the same error.

Here's the algo:

from quantopian.algorithm import attach_pipeline, pipeline_output  
from quantopian.pipeline import Pipeline  
from quantopian.pipeline import CustomFactor  
from quantopian.pipeline.data.builtin import USEquityPricing  
from quantopian.pipeline.factors import RSI  
import math

# DollarVolume will calculate yesterday's dollar volume for each stock in the universe.  
class DollarVolume(CustomFactor):  
    # We need close price and trade volume for this calculation.  
    inputs = [USEquityPricing.close, USEquityPricing.volume]  
    window_length = 1  
    # Dollar volume is volume * closing price.  
    def compute(self, today, assets, out, close, volume):  
        out[:] = (close[0] * volume[0]) 

def initialize(context):  
    # Define the other variables  
    context.long_leverage = 0.5  
    context.short_leverage = -0.5  
    context.rsi_lookback = 2  
    # Rebalance every Monday (or the first trading day if it's a holiday).  
    # At 11AM ET, which is 1 hour and 30 minutes after market open.  
    schedule_function(rebalance,  
                      date_rules.every_day(),  
                      time_rules.market_close())  
    # Create, register and name a pipeline in initialize.  
    pipe = Pipeline()  
    attach_pipeline(pipe, 'rsi_example')  
    # Create a dollar_volume factor using default inputs and window_length  
    dollar_volume = DollarVolume()  
    pipe.add(dollar_volume, 'dollar_volume')  
    # Define high dollar-volume filter  
    high_dollar_volume = dollar_volume.percentile_between(98, 100)  
    # Construct RSI factor.  
    rsi = RSI(inputs=[USEquityPricing.close], window_length=context.rsi_lookback)  
    # Define high and low RSI to be the bottom 10% and top 10% of  
    # securities in the high dollar volume group.  
    rsi_high = rsi.percentile_between(90, 100, mask=high_dollar_volume)  
    rsi_low = rsi.percentile_between(0, 10, mask=high_dollar_volume)  
    # Add RSI rank factor to our pipeline, using high dollar volume mask.  
    pipe.add(rsi.rank(mask=high_dollar_volume), 'rsi')  
    # Add a filter to the pipeline such that only high-RSI and low-RSI  
    # securities are kept.  
    pipe.set_screen(rsi_high | rsi_low)  
    # Add the rsi_high and rsi_low filters as columns to the pipeline so  
    # that when we refer to securities remaining in our pipeline later,  
    # we know which ones belong to which category.  
    pipe.add(rsi_low, 'rsi_low')  
    pipe.add(rsi_high, 'rsi_high')

# Called every day before market open. This is where we access the securities  
# that made it through the pipeline.  
def before_trading_start(context, data):  
    # Pipeline_output returns the constructed dataframe.  
    context.output = pipeline_output('rsi_example')  
    # Sets the list of securities we want to long as the securities with a 'True'  
    # value in the rsi_low column.  
    context.long_secs = context.output[context.output['rsi_low']]  
    # Sets the list of securities we want to short as the securities with a 'True'  
    # value in the rsi_high column.  
    context.short_secs = context.output[context.output['rsi_high']]  
    # Update our universe to contain the securities that we would like to long and short.  
    update_universe(context.long_secs.index.union(context.short_secs.index))

# This rebalancing is called according to our schedule_function settings.  
def rebalance(context, data):  
    # Get any stocks we ordered last time that still have open orders.  
    open_orders = get_open_orders()  
    # Set the allocations to even weights in each portfolio.  
    long_weight = context.long_leverage / (len(context.long_secs) + len(open_orders)/2)  
    short_weight = context.short_leverage / (len(context.short_secs) + len(open_orders)/2)  
    # For each security in our universe, order long or short positions according  
    # to our context.long_secs and context.short_secs lists, and sell all previously  
    # held positions not in either list.  
    for stock in data:  
        # Guard against ordering too much of a given security if a previous order  
        # is still unfilled.  
        if stock not in open_orders:  
            if stock in context.long_secs.index:  
                order_target_percent(stock, long_weight)  
            elif stock in context.short_secs.index:  
                order_target_percent(stock, short_weight)  
            else:  
                order_target_percent(stock, 0)  
    # Log the long and short orders each week.  
    log.info("Todays's longs: "+", ".join([long_.symbol for long_ in context.long_secs.index]))  
    log.info("Todays's shorts: "  +", ".join([short_.symbol for short_ in context.short_secs.index]))  

# The handle_data function is run every bar.  
def handle_data(context,data):  
    # Record and plot the leverage of our portfolio over time.  
    record(leverage = context.account.leverage)

    # We also want to monitor the number of long and short positions  
    # in our portfolio over time. This loop will check our positition sizes  
    # and add the count of longs and shorts to our plot.  
    longs = shorts = 0  
    for position in context.portfolio.positions.itervalues():  
        if position.amount > 0:  
            longs += 1  
        if position.amount < 0:  
            shorts += 1  
    record(long_count=longs, short_count=shorts)  

And here's the error:


ValueError: Unknown keyword argument 'locals_dict'  
There was a runtime error on line 68.

Any help would be much appreciated.

Graham

Graham,
I can see the issue you are having. It looks like something is definitely up with the RSI built in factor. I'll have someone take a look at it.

Disclaimer

The material on this website is provided for informational purposes only and does not constitute an offer to sell, a solicitation to buy, or a recommendation or endorsement for any security or strategy, nor does it constitute an offer to provide investment advisory services by Quantopian. In addition, the material offers no opinion with respect to the suitability of any security or specific investment. No information contained herein should be regarded as a suggestion to engage in or refrain from any investment-related course of action as none of Quantopian nor any of its affiliates is undertaking to provide investment advice, act as an adviser to any plan or entity subject to the Employee Retirement Income Security Act of 1974, as amended, individual retirement account or individual retirement annuity, or give advice in a fiduciary capacity with respect to the materials presented herein. If you are an individual retirement or other investor, contact your financial advisor or other fiduciary unrelated to Quantopian about whether any given investment idea, strategy, product or service described herein may be appropriate for your circumstances. All investments involve risk, including loss of principal. Quantopian makes no guarantees as to the accuracy or completeness of the views expressed in the website. The views are subject to change, and may have become unreliable for various reasons, including changes in market conditions or economic circumstances.

Hi Karen,

Glad I wasn't just making a simple mistake. Thanks for the quick response.

Graham

I'm having the same issue :(, with ValueError: Unknown keyword argument 'locals_dict'

Same problem for me.
When using RSI as factor or screen -> "ValueError: Unknown keyword argument 'locals_dict'"

Same problem.

Me too

Hi folks,
The RSI custom factor has been fixed. Please let me know if you have any other issues.

Can the Pipeline be used as a market scanner? For example, in a lot of technical trading platform you can specify a particular pattern and scan the entire universe of stocks that fit that? But my criteria of interest for the scanner might be more complex than most traditional technical trading platform would offer. Hence the interest in Quantopian. I'm hoping the Pipeline be my daily or real-time market scanner.

Please advise.

Thanks!

@jamie - I'm curious why you picked 11am? Backtest fitting or some specific reason?

@Peter: I think I understand what you're asking, and it sounds like Pipeline is a good fit. You can use pipeline to create selection rules that scan all 8000+ securities each day. Check out the Pipeline Tutorial for more.

@Matthew: I picked 11am arbitrarily. One of the goals of this example was to be a good example for people to use to figure out the API. 11AM required me to specify hours and minutes offset in the schedule_function call. On that note, most of the parameters of the algo were selected arbitrarily, it was meant to be a bit of a template for the community to play with more than it was a high quality strategy!