Executing a pipeline on a fetched universe expands the universe

Hi. Total newbie here, so I'm probably doing something obviously wrong.

I am importing a predefined universe via fetch_csv, then attempting to filter that universe down based on some signals. When I apply the pipeline, the size of the universe grows. Is there a way to apply the filter to just the existing universe? I'm having some issues with Fetcher as well, so this is just a working sample with dummy data.

# Imports needed to run this sample (the import path for
# AvgDailyDollarVolumeTraded is assumed from the era's pipeline API).
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import SimpleMovingAverage, AvgDailyDollarVolumeTraded

def initialize(context):
    # Set the universe to the contents of our file. Note the date format:
    # it must be lowercase '%m/%d/%Y', since '%M' means minutes and '%D'
    # is not a valid day-of-month code.
    fetch_csv(
        'https://dl.dropboxusercontent.com/u/169032081/SP500.csv',
        date_column='date', universe_func=my_universe, date_format='%m/%d/%Y')
    # Set up the filter.
    pipe = Pipeline()
    pipe = attach_pipeline(pipe, name='my_pipeline')
    sma_1 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=1)
    sma_8 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=8)
    sma_20 = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=20)
    dollar_volume = AvgDailyDollarVolumeTraded(window_length=7)
    pipe.add(sma_1, 'sma_1')
    pipe.add(sma_8, 'sma_8')
    pipe.add(sma_20, 'sma_20')
    pipe.add(dollar_volume, 'dollar_volume')
    pipe.set_screen((sma_8 > sma_20) & (sma_1 > sma_20) & (dollar_volume > 400000.0))
# my_universe returns the set of securities that defines your universe;
# it is passed to fetch_csv via the universe_func parameter.
def my_universe(context, fetcher_data):
    # Set my_stocks to every security present in fetcher_data.
    my_stocks = set(fetcher_data['sid'])
    return my_stocks  
# The handle_data function is run every bar.  
def handle_data(context,data):  
    pass

def before_trading_start(context, data):  
    if len(data) > 0:  
        # limit universe to securities that meet our filter  
        log.debug('pre-filter universe size {c}'.format(c=len(data)))  
        sids = pipeline_output('my_pipeline')  
        log.debug('post-filter universe size {c}'.format(c=len(sids)))  
        update_universe(sids)  
    record(universe_size = len(data))

Logs show:
2013-10-02 before_trading_start:56 DEBUG pre-filter universe size 494
2013-10-02 before_trading_start:58 DEBUG post-filter universe size 3223

Any ideas? The call to update_universe(sids) is also failing, which I suspect is related.

8 responses

Hi Ethan,
These two lines of code,

sids = pipeline_output('my_pipeline')  
update_universe(sids)  

are updating your universe with everything that is returned by the pipeline.

Everything returned by the pipeline is all securities (8000+ on any given day) that pass your screen:
pipe.set_screen( (sma_8>sma_20) & (sma_1>sma_20) & (dollar_volume>400000.0) )

It also looks like fetch_csv is updating your universe with additional securities (which may or may not be in the pipeline list).

Can I ask why you are pre-defining your universe as opposed to using the pipeline to get the values for all securities in the universe and then filtering down from there? A couple of people have asked for this since we launched the API, and I am trying to understand why you would want to manually set the universe as opposed to using the pipeline functionality to do this.


Hi Karen,

I'm testing an external news-event signal that I've been working on. Being news-based, the signal is per ticker, not a general long/short signal. Basic research has shown that this signal correlates with price moves anywhere from hours to months after the trigger event. Buying the entire universe yields positive results, but not alpha. I want to try to narrow my event-trigger universe down to stocks that are trending up.

From your docs, it looked like I could use fetch_csv to define my universe on a daily basis for backtesting, and the pipeline seemed like a good way to further filter that down. It appears, though, that the data set used by the pipeline also defines the universe; it would be quite valuable to allow those two constructs to be separated.

I think for now I will try to figure out some way to determine the intersection between my universe and the filter results, and then use that for subsequent processing.

Thanks for the explanation! This is great detail.

I think I would import your list using fetch_csv, but not set it as your algo universe.

Then calculate the factors you want using the pipeline. Don't worry about narrowing down the universe; you can do the calculations for all 8000+ securities in the pipeline.

Then, in before_trading_start, get the output of the pipeline and use pandas to reduce the output dataframe to the list of SIDs you imported via fetch_csv, as in the sketch below.
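Something along these lines might work, as a minimal, untested sketch (context.my_sids is a hypothetical variable that you would populate with the sids from your fetched file):

def before_trading_start(context, data):
    # Full pipeline output: one row per security that passed the screen.
    results = pipeline_output('my_pipeline')
    # Keep only the rows whose index (an Equity object) also appears in the
    # fetched list. context.my_sids is a hypothetical container populated
    # elsewhere from your fetch_csv data.
    in_fetched = results.index.isin(context.my_sids)
    context.filtered = results[in_fetched]
    log.debug('filtered universe size {c}'.format(c=len(context.filtered)))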

Hi Karen,

Thanks again for following up. Just a few more points:

  1. Since my signaled tickers change every day (new ones are added, old ones fall off), I need a way to process the specific tickers that were valid on a given day in the backtest. Since fetch_csv is called once during initialize, I don't see how I could get just the subset of tickers valid on a given day without using the universe parameter. If fetch_csv could be called in before_trading_start and returned a dataframe, I could make sense of that, but not with the current implementation.

  2. fetch_csv timeout. Given my scenario (a different set of tickers for each day), my input file is very large and runs into timeout issues. I understand the need to have timeouts and keep this optimized, but there have to be better ways to do this. Maybe support a "valid for x number of days" column for each start date, or an end date. Happy to work with someone on this offline if that would help.

  3. Do you have any examples/samples of matching up the output dataframe returned from pipeline_output against a universe? I have tried many variants of this and nothing seems to work:

    longs = {}
    if len(data) > 0:
        filter_results = pipeline_output('my_pipeline')

        for f_sid in filter_results.index.values:
            if f_sid in data:
                longs[f_sid] = data[f_sid]

    context.long_list = longs
    record(Longs=len(context.long_list))

I realize this is probably more of a dataframe-syntax question, but matching up keys should not be this hard. A pointer to an existing working example would be fine.

Thanks for your help.

Hi Ethan,

To use Fetcher in conjunction with Pipeline, the Fetcher securities need to be handled in handle_data or in a scheduled function. The reason for this is that Fetcher data is not injected into the universe until after before_trading_start. The tricky part here is that all sids in your universe (whether they came from pipeline or fetcher) are added to data. The way to distinguish which securities came from fetcher and which ones did not is to add a column in the fetcher file as a 'tag'. If you add a column like 'tag' and give it some arbitrary value, securities that come from Fetcher will have this column as an added property in the data variable.
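For example, here's a minimal sketch of that check (assuming your fetcher file has an extra column named 'tag' containing some arbitrary value):

def handle_data(context, data):
    for sid in data:
        # Only securities imported via fetch_csv carry the extra CSV columns
        # as properties, so checking for 'tag' identifies them.
        if 'tag' in data[sid]:
            log.info('fetched security: {s}'.format(s=sid))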

I've attached a backtest which is a copy of the pipeline mean reversion strategy with the addition of a fetched .csv file. If you take a look at lines 126-129, this is where I look for an instance of my fetcher data in the pipeline output. I created a column called 'meta' in the .csv file and check each element in data to see if they have the 'meta' property. If so, it was in my fetcher file.

I believe this should help you get what you want!

Regarding the fetch_csv timeout, the fetcher file is read in once at the start of your algorithm, so the timeout is related to how long it takes to read in the file. Unless I'm misunderstanding, I don't think that a "valid for x number of days" column would actually help!

Let me know if you have any questions.

# This is a sample mean-reversion algorithm on Quantopian for you to test and adapt.
# This example uses pipeline to set its universe daily.

# Algorithm investment thesis: 
# Top-performing stocks from last week will do worse this week, and vice-versa.

# Every Monday, we rank high-volume stocks based on their previous 5 day returns. 
# We go long the bottom 10% of stocks with the WORST returns over the past 5 days.
# We go short the top 10% of stocks with the BEST returns over the past 5 days.

# This type of algorithm may be used in live trading and in the Quantopian Open.

# Import the libraries we will use here
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline import CustomFactor
from quantopian.pipeline.data.builtin import USEquityPricing
import math

# Custom factors are defined as a class object, outside of initialize or handle data.
# RecentReturns will calculate the returns for a security over the n-most recent days.
class RecentReturns(CustomFactor):
    
    # Set the default list of inputs as well as the default window_length.
    # Default values are used if the optional parameters are not specified.
    inputs = [USEquityPricing.close] 
    window_length = 5

    # Computes the returns over the last n days where n = window_length.
    # Any calculation can be performed here and is applied to all stocks
    # in the universe.
    def compute(self, today, assets, out, close):
        out[:] = (close[-1] - close[0]) / close[0]
        
# DollarVolume will calculate yesterday's dollar volume for each stock in the universe.
class DollarVolume(CustomFactor):
    
    # We need close price and trade volume for this calculation.
    inputs = [USEquityPricing.close, USEquityPricing.volume]
    window_length = 1
    
    # Dollar volume is volume * closing price.
    def compute(self, today, assets, out, close, volume):
        out[:] = (close[0] * volume[0])

# The initialize function is the place to create your pipeline and set trading
# conditions such as commission and slippage. It is called once at the start of
# the simulation and is also where global context variables can be set.
def initialize(context):
    
    # Define global context variables
    context.long_leverage = 0.5
    context.short_leverage = -0.5
    context.lower_percentile = 10
    context.upper_percentile = 90
    context.returns_lookback = 5
    
    fetch_csv('https://dl.dropboxusercontent.com/u/78860126/Workbook1.csv',
        date_column='date',
        date_format='%Y-%m-%d')
           
    # Rebalance every Monday (or the first trading day if it's a holiday).
    # At 11AM ET, which is 1 hour and 30 minutes after market open.
    schedule_function(rebalance, 
                      date_rules.week_start(days_offset=0),
                      time_rules.market_open(hours = 1, minutes = 30)) 
    
    # Create, register and name a pipeline. 
    pipe = Pipeline()
    attach_pipeline(pipe, 'mean_reversion_example')
    
    # Create a recent_returns factor with a 5-day returns lookback.
    recent_returns = RecentReturns(window_length=context.returns_lookback)
    pipe.add(recent_returns, 'recent_returns')
    
    # Create a dollar_volume factor using default inputs and window_length.
    dollar_volume = DollarVolume()
    pipe.add(dollar_volume, 'dollar_volume')
    
    # Define high dollar-volume filter to be the top 2% of stocks by dollar volume.
    high_dollar_volume = dollar_volume.percentile_between(98, 100)
    
    # Define high and low returns filters to be the bottom 10% and top 10% of
    # securities in the high dollar volume group.
    low_returns = recent_returns.percentile_between(0,10,mask=high_dollar_volume)
    high_returns = recent_returns.percentile_between(90,100,mask=high_dollar_volume)
    
    # Factors return a scalar value for each security in the entire universe
    # of securities. Here, we add the recent_returns rank factor to our pipeline
    # and we provide it with a mask such that securities that do not pass the mask
    # (i.e. do not have high dollar volume), are not considered in the ranking.
    pipe.add(recent_returns.rank(mask=high_dollar_volume), 'recent_returns_rank')
    
    # Add a filter to the pipeline such that only high-return and low-return
    # securities are kept.
    pipe.set_screen(low_returns | high_returns)
    
    # Add the low_returns and high_returns filters as columns to the pipeline so
    # that when we refer to securities remaining in our pipeline later, we know
    # which ones belong to which category.
    pipe.add(low_returns, 'low_returns')
    pipe.add(high_returns, 'high_returns')
    

# Called every day before market open. This is where we access the securities
# that made it through the pipeline.
def before_trading_start(context, data):
    
    # Pipeline_output returns the constructed dataframe.
    context.output = pipeline_output('mean_reversion_example')
    
    # Sets the list of securities we want to long as the securities with a 'True'
    # value in the low_returns column.
    context.long_secs = context.output[context.output['low_returns']]
    
    # Sets the list of securities we want to short as the securities with a 'True'
    # value in the high_returns column.
    context.short_secs = context.output[context.output['high_returns']]
    
    # Update our universe to contain the securities that we would like to long and short.
    update_universe(context.long_secs.index.union(context.short_secs.index))
    
# This rebalancing is called according to our schedule_function settings.     
def rebalance(context,data):
    
    for sid in data:
        if 'meta' in data[sid] and sid in context.long_secs.index:
            print 'Fetcher sid in Pipeline output:' + str(sid)
            print ''
    
    # Get any stocks we ordered last time that still have open orders.
    open_orders = get_open_orders()
        
    # Set the allocations to even weights in each portfolio.
    long_weight = context.long_leverage / (len(context.long_secs) + len(open_orders)/2)
    short_weight = context.short_leverage / (len(context.short_secs) + len(open_orders)/2)
    
    # For each security in our universe, order long or short positions according
    # to our context.long_secs and context.short_secs lists, and sell all previously
    # held positions not in either list.
    for stock in data:
        # Guard against ordering too much of a given security if a previous order
        # is still unfilled.
        if stock not in open_orders:
            if stock in context.long_secs.index:
                order_target_percent(stock, long_weight)
            elif stock in context.short_secs.index:
                order_target_percent(stock, short_weight)
            else:
                order_target_percent(stock, 0)
    
    # Log the long and short orders each week.
    log.info("This week's longs: "+", ".join([long_.symbol for long_ in context.long_secs.index]))
    log.info("This week's shorts: "  +", ".join([short_.symbol for short_ in context.short_secs.index]))
        
        
# The handle_data function is run every bar.    
def handle_data(context,data):
    
    # Record and plot the leverage of our portfolio over time. Even in minute
    # mode, only the end-of-day leverage is plotted.
    record(leverage = context.account.leverage)

    # We also want to monitor the number of long and short positions 
# in our portfolio over time. This loop will check our position sizes 
    # and add the count of longs and shorts to our plot.
    longs = shorts = 0
    for position in context.portfolio.positions.itervalues():
        if position.amount > 0:
            longs += 1
        if position.amount < 0:
            shorts += 1
    record(long_count=longs, short_count=shorts)

Can one use fetcher data in a pipeline? If so, how would one approach it?

My thinking is that I would use fetcher to pull in a dataset, create a multi-index dataframe (the data has multiple dimensions), use that dataframe as an input to a custom factor, and then add the factor to the pipeline. Would that work?

I think I could use the approach in the attached notebook, but I'm running into an issue with data.current in Research, saying 'data' is not defined. How would one reference fetched data from within a CustomFactor's inputs? What would be the proper syntax?


I think I am getting closer, but now I get the following error in Research: TypeError: zipline.pipeline.pipeline.add() expected a value of type zipline.pipeline.term.Term for argument 'term', but got abc.ABCMeta instead.
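Could the issue be that I'm passing the CustomFactor class itself to pipe.add rather than an instance of it? A sketch of what I mean (MyCustomFactor is just a hypothetical stand-in):

# Passing the class itself: pipe.add sees abc.ABCMeta, not a Term instance.
pipe.add(MyCustomFactor, 'my_column')

# Instantiating the factor first produces a proper Term instance.
pipe.add(MyCustomFactor(), 'my_column')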

How do I adjust a local csv data file so that pipeline can add to its index? What elements are needed?
