Back to Community
Custom Factor Calculation Over-Iterating! Help!

Hi guys,

I am trying to create an algorithm that takes in multiple fundamental indicators and creates a weighted outlook for each stock based on those indicators. The problem is, when I try to add more than three indicators, I get hit with an error that says I use too much memory.

Doing some debugging in the attached algorithm, you can see that the algorithm is calculating the 'outlook' for every stock in the universe multiple times and hosing Quantopians memory, but it only needs to calculate out the array once. I'm probably missing something here, can someone help me fix this problem?

Thanks for the help!

Clone Algorithm
3
Loading...
Backtest from to with initial capital
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
from quantopian.pipeline import Pipeline, CustomFactor
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.data import morningstar
from quantopian.pipeline.factors import SimpleMovingAverage, AverageDollarVolume
from quantopian.pipeline.filters.morningstar import IsPrimaryShare
from quantopian.pipeline.filters import Q500US



def initialize(context):
    # Schedule our rebalance function to run at the start of each week.
    schedule_function(my_rebalance, date_rules.month_start(), time_rules.market_open(hours=1))

    # Record variables at the end of each day.
    schedule_function(my_record_vars, date_rules.every_day(), time_rules.market_close())
    
    # Create our pipeline and attach it to our algorithm.
    my_pipe = make_pipeline()
    attach_pipeline(my_pipe, 'my_pipeline')

    
class outlook_calc(CustomFactor):
    
    log.info('running outlook custom factor')
    
    inputs = [ morningstar.income_statement.net_income,
              morningstar.income_statement.total_revenue,
              ]
    
    def compute(self, today, assets, out, net_income, total_revenue):
        
        log.info('running outlook compute')
        
        out[:] = (
                 (((net_income[-1] - net_income[-365]) > 0).astype(int)*0.5)
                  + (((total_revenue[-1] - total_revenue[-365]) > 0).astype(int)*0.5)
                 )
        
        log.info(out[:])

def make_pipeline():
    
    log.info('running pipeline')
   
    #standard filters
    primary_share = IsPrimaryShare()
    common_stock = morningstar.share_class_reference.security_type.latest.eq('ST00000001')
    not_depositary = ~morningstar.share_class_reference.is_depositary_receipt.latest
    not_otc = ~morningstar.share_class_reference.exchange_id.latest.startswith('OTC')
    not_wi = ~morningstar.share_class_reference.symbol.latest.endswith('.WI')
    not_lp_name = ~morningstar.company_reference.standard_name.latest.matches('.* L[. ]?P.?$')
    not_lp_balance_sheet = morningstar.balance_sheet.limited_partnership.latest.isnull()
    have_market_cap = morningstar.valuation.market_cap.latest.notnull()
    cap_threshold = morningstar.valuation.market_cap.latest > 5000000000
    
    Q500 = Q500US()
    
    tradeable_stocks = (
        primary_share
        & common_stock
        & not_depositary
        & not_otc
        & not_wi
        & not_lp_name
        & not_lp_balance_sheet
        & have_market_cap
        & cap_threshold
        & Q500
    ) 
    
    outlook = outlook_calc(window_length = 365, mask = tradeable_stocks)

    top_outlook = outlook.percentile_between(99.9,100)
    
    pipeline_stocks = (
        tradeable_stocks
        & top_outlook
    )

    
    return Pipeline(
        columns={

            'Outlook': outlook,
            
        }, screen = pipeline_stocks)



def my_compute_weights(context):
    pass

def before_trading_start(context, data):
    context.output = pipeline_output('my_pipeline')
    log.info(context.output)

def my_rebalance(context, data):
    pass
    
def my_record_vars(context, data):
    """
    Record variables at the end of each day.
    """
    stocks = 0
    for position in context.portfolio.positions.itervalues():
        if position.amount > 0:
            stocks += 1

    # Record our variables.
    record(leverage=context.account.leverage, stock_count=stocks)
There was a runtime error.
7 responses

Your algorithm runs ok for me? No memory errors. Your 'outlook' custom factor is implemented correctly and shouldn't 'run multiple times' and 'hose Quantopians memory' as you say. You may have an issue if any of the fields are ever NaN? You also my want to change the indexing from -365 to 0 ( eg 'total_revenue[-365]' to 'total_revenue[0]'). That will make it work with any window_length you choose.

As a side note... if you want to look back a year then use a window_length of 250 and not 365. The data is in trading days and not calendar days.

Good luck

Thanks for the reply Dan, really appreciate it. With only two fundamental indicators, the algorithm appears to run ok, but when I go above three I start running into issues. If you take a look at the logs in the first backtest above, the outlook function generates an outlook array multiple times in a single minute which is redundant. In the attached backtest it just dies. Any thoughts there?

Also - can you elaborate more on changing the window length from -365 to 0? I thought by selecting 365 I was telling it to go back a year, is there another way to do this?

P.S - thanks for the window length tip, will definitely implement :)

Clone Algorithm
3
Loading...
Backtest from to with initial capital
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
from quantopian.pipeline import Pipeline, CustomFactor
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.data import morningstar
from quantopian.pipeline.factors import SimpleMovingAverage, AverageDollarVolume
from quantopian.pipeline.filters.morningstar import IsPrimaryShare
from quantopian.pipeline.filters import Q500US



def initialize(context):
    # Schedule our rebalance function to run at the start of each week.
    schedule_function(my_rebalance, date_rules.month_start(), time_rules.market_open(hours=1))

    # Record variables at the end of each day.
    schedule_function(my_record_vars, date_rules.every_day(), time_rules.market_close())
    
    # Create our pipeline and attach it to our algorithm.
    my_pipe = make_pipeline()
    attach_pipeline(my_pipe, 'my_pipeline')

    
class outlook_calc(CustomFactor):
    
    log.info('running outlook custom factor')
    
    inputs = [ morningstar.income_statement.net_income,
              morningstar.income_statement.total_revenue,
              morningstar.valuation_ratios.pb_ratio,
              morningstar.valuation_ratios.pe_ratio,
              ]
    
    
    def compute(self, today, assets, out, net_income, total_revenue, pb_ratio, pe_ratio):
            log.info('running outlook compute')
        
            out[:] = (
                         (((net_income[-1] - net_income[-365]) > 0).astype(int)*0.5)
                          + (((total_revenue[-1] - total_revenue[-365]) > 0).astype(int)*0.5)
                +((pb_ratio[-1] < 5).astype(int)*0.25)
                +((pe_ratio[-1] > 10).astype(int)*0.5)
                         )
                        
        
            log.info(out[:])
            

def make_pipeline():
    
    log.info('running pipeline')
   
    #standard filters
    primary_share = IsPrimaryShare()
    common_stock = morningstar.share_class_reference.security_type.latest.eq('ST00000001')
    not_depositary = ~morningstar.share_class_reference.is_depositary_receipt.latest
    not_otc = ~morningstar.share_class_reference.exchange_id.latest.startswith('OTC')
    not_wi = ~morningstar.share_class_reference.symbol.latest.endswith('.WI')
    not_lp_name = ~morningstar.company_reference.standard_name.latest.matches('.* L[. ]?P.?$')
    not_lp_balance_sheet = morningstar.balance_sheet.limited_partnership.latest.isnull()
    have_market_cap = morningstar.valuation.market_cap.latest.notnull()
    cap_threshold = morningstar.valuation.market_cap.latest > 5000000000
    
    Q500 = Q500US()
    
    tradeable_stocks = (
        primary_share
        & common_stock
        & not_depositary
        & not_otc
        & not_wi
        & not_lp_name
        & not_lp_balance_sheet
        & have_market_cap
        & cap_threshold
        & Q500
    ) 
    
    outlook = outlook_calc(window_length = 365, mask = tradeable_stocks)

    top_outlook = outlook.percentile_between(99.9,100)
    
    pipeline_stocks = (
        tradeable_stocks
        & top_outlook
    )

    
    return Pipeline(
        columns={

            'Outlook': outlook,
            
        }, screen = pipeline_stocks)



def my_compute_weights(context):
    pass

def before_trading_start(context, data):
    context.output = pipeline_output('my_pipeline')
    log.info(context.output)

def my_rebalance(context, data):
    pass
    
def my_record_vars(context, data):
    """
    Record variables at the end of each day.
    """
    stocks = 0
    for position in context.portfolio.positions.itervalues():
        if position.amount > 0:
            stocks += 1

    # Record our variables.
    record(leverage=context.account.leverage, stock_count=stocks)
There was a runtime error.

Stefan,

First, about your concern that the pipeline 'generates an outlook array multiple times in a single minute which is redundant'. Actually, what you are seeing is the pre-fetch asynchronous optimization at work. That's normal and it really isn't doing any more work than it needs to. The pipeline is designed to pre-fetch and pre-calculate the data for efficiency.

Specifically,

When you run a Pipeline that uses a 90-day trailing window, we don't
just query the last 90 days every day, since that would be glacially
slow because you'd spend almost all of your backtest time doing
disk/network IO. Instead, what we do is load all the data we think
we'll need to run your pipeline for the next ~252 days (the number of
trading days in a year). We use that cache to pre-compute your entire
pipeline for a year and then feed the results to your backtest on the
dates when it would become available.

Take a look at this post https://www.quantopian.com/posts/introducing-the-pipeline-api. Also, if you really want to get into the details, the pipeline thinks in terms of a 'directed acyclic graph' to optimize the computations ( see https://en.wikipedia.org/wiki/Directed_acyclic_graph).

To see first hand how this works, I've attached your algorithm but added a counter for each time the compute function is called in your custom factor. The counter prints out before trading each day when the pipeline_output is called. The backtest is for a single year (1/4/2011 - 1/4/2012) or 253 trading days . Look at the logs and you will see:

2011-01-04 07:45  PRINT iterations through output_calc: 6  
2011-01-05 07:45  PRINT iterations through output_calc: 6  
2011-01-06 07:45  PRINT iterations through output_calc: 6  
2011-01-07 07:45  PRINT iterations through output_calc: 6  
2011-01-10 07:45  PRINT iterations through output_calc: 6  
2011-01-11 07:45  PRINT iterations through output_calc: 6  
2011-01-12 07:45  PRINT iterations through output_calc: 133  
2011-01-13 07:45  PRINT iterations through output_calc: 133  
...

2012-01-03 07:45  PRINT iterations through output_calc: 253  
2012-01-04 07:45  PRINT iterations through output_calc: 253  

The first 6 days worth of calculations were done on day one. By day 7 the pipeline had calculated 133 days worth of data. It finally ends up going through the compute function 253 times. This is just the number of times needed to get the results for 1/4/2011 - 1/4/2012.

Also... about the memory errors. A couple of things you can do to reduce memory is to reduce the factors. I commented out the following

'''  
#standard filters  
primary_share = IsPrimaryShare()  
common_stock = morningstar.share_class_reference.security_type.latest.eq('ST00000001')  
not_depositary = ~morningstar.share_class_reference.is_depositary_receipt.latest  
not_otc = ~morningstar.share_class_reference.exchange_id.latest.startswith('OTC')  
not_wi = ~morningstar.share_class_reference.symbol.latest.endswith('.WI')  
not_lp_name = ~morningstar.company_reference.standard_name.latest.matches('.* L[. ]?P.?$')  
not_lp_balance_sheet = morningstar.balance_sheet.limited_partnership.latest.isnull()  
have_market_cap = morningstar.valuation.market_cap.latest.notnull()  
'''

and it runs now. Those are redundant because they are included in the Q500US filter anyway. Another thing to reduce memory is to reduce the window_length for factors. In your case you maybe need just 252 days which is a typical trading year. The window_length is in trading days not calendar days. See https://en.wikipedia.org/wiki/Trading_day for more info.

Also, I changed

        out[:] = (  
                 (((net_income[-1] - net_income[-365]) > 0).astype(int)*0.5)  
                  + (((total_revenue[-1] - total_revenue[-365]) > 0).astype(int)*0.5)  
                 )  

to

        out[:] = (  
                 (((net_income[-1] - net_income[0]) > 0).astype(int)*0.5)  
                  + (((total_revenue[-1] - total_revenue[0]) > 0).astype(int)*0.5)  
                 )  

Notice the index of 0 (not -365). This way you can set the window_length when instantiating the factor and the compute logic can remain the same. It will just take the first (0) and last (-1) pieces of data.

oops, forgot to attach the algorithm...

Clone Algorithm
0
Loading...
Backtest from to with initial capital
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
from quantopian.pipeline import Pipeline, CustomFactor
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.data import morningstar
from quantopian.pipeline.factors import SimpleMovingAverage, AverageDollarVolume, Returns
from quantopian.pipeline.filters.morningstar import IsPrimaryShare
from quantopian.pipeline.filters import Q500US



def initialize(context):
    # Schedule our rebalance function to run at the start of each week.
    schedule_function(my_rebalance, date_rules.month_start(), time_rules.market_open(hours=1))

    # Record variables at the end of each day.
    schedule_function(my_record_vars, date_rules.every_day(), time_rules.market_close())
    
    # Create our pipeline and attach it to our algorithm.
    my_pipe = make_pipeline(context)
    attach_pipeline(my_pipe, 'my_pipeline')

    
class outlook_calc(CustomFactor):
    
    # log.info('running outlook custom factor')
    
    inputs = [ morningstar.income_statement.net_income,
              morningstar.income_statement.total_revenue,
              ]

    def compute(self, today, assets, out, net_income, total_revenue):
        self.iterations += 1
        # print(self.iterations)
        
        out[:] = (
                 (((net_income[-1] - net_income[0]) > 0).astype(int)*0.5)
                  + (((total_revenue[-1] - total_revenue[0]) > 0).astype(int)*0.5)
                 )
        
        #log.info(out[:])

def make_pipeline(context):
    
    log.info('running pipeline')
    '''
    #standard filters
    primary_share = IsPrimaryShare()
    common_stock = morningstar.share_class_reference.security_type.latest.eq('ST00000001')
    not_depositary = ~morningstar.share_class_reference.is_depositary_receipt.latest
    not_otc = ~morningstar.share_class_reference.exchange_id.latest.startswith('OTC')
    not_wi = ~morningstar.share_class_reference.symbol.latest.endswith('.WI')
    not_lp_name = ~morningstar.company_reference.standard_name.latest.matches('.* L[. ]?P.?$')
    not_lp_balance_sheet = morningstar.balance_sheet.limited_partnership.latest.isnull()
    have_market_cap = morningstar.valuation.market_cap.latest.notnull()
    '''
    cap_threshold = morningstar.valuation.market_cap.latest > 5000000000
    
    Q500 = Q500US()
    
    tradeable_stocks = (cap_threshold & Q500) 
    
    outlook = outlook_calc(window_length = 365, mask=tradeable_stocks )
    outlook.iterations = 0
    context.outlook = outlook
    
    # top_outlook = outlook.percentile_between(99.9,100)
    
    # pipeline_stocks = (tradeable_stocks & top_outlook)

    
    return Pipeline(
        columns={

            'Outlook': outlook,
            
        }, # screen = pipeline_stocks
    )



def my_compute_weights(context):
    pass

def before_trading_start(context, data):
    context.output = pipeline_output('my_pipeline')
    print ("iterations through output_calc: {}").format(context.outlook.iterations)
    
    # log.info(context.output)

def my_rebalance(context, data):
    pass
    
def my_record_vars(context, data):
    """
    Record variables at the end of each day.
    """
    stocks = 0
    for position in context.portfolio.positions.itervalues():
        if position.amount > 0:
            stocks += 1

    # Record our variables.
    record(leverage=context.account.leverage, stock_count=stocks)
There was a runtime error.

Thanks Dan, it seems to be working now! This is an awesome explanation that admittedly I will need to take some time to thoroughly parse through as I am still a novice coder. One thing that I am not sure I entirely understand is that when instantiating, the factor knows to replace the [0] with the window length and not the [-1], can you explain that a little bit more for me?

Thanks again for all of the help here, i've been stuck on this problem for a while and I am really looking forward to progressing the algorithm further.

Stefan,

The factor doesn't 'know' to replace [0] with the window length. This is just python and numpy indexing syntax.

'net_income' is a numpy array which is passed to the 'compute' function. There is one row for each day of data you request (ie the window_length) and one column for each security in the Quantopian dataset. If one sets a window length of 252, the 'net_income' array will have 252 rows and about 8000 columns (that's about how many securities are in the Quantopian dataset). The dates are sorted ascending meaning smallest to largest. The first row has the oldest data (252 trading days ago in this case) and the last row in the array has the latest (yesterday's) data. Python uses zero base indexing, so 'net_income[0,0] will return a single value which is the net income for the oldest date for the first security. Python can also use negative indexing which simply counts from the end. 'net_income[-1,-1] will return the net income for the latest date for the last security. Maybe look at some of the many online docs for more info on ways to index arrays (I found this to be as good as any http://www.sam.math.ethz.ch/~raoulb/teaching/PythonTutorial/intro_numpy.html)

Not sure that you appreciated it when writing your code but by using the syntax 'net_income[-1]' one is not fetching a single value but a 1D array of values. In this case the whole last row of data. This array is the latest net_income for each security. 'net_income[0]' returns an array with the first or earliest date. 'net_income[0]' is just another way of writing 'net_income[-252]' (since you can use either positive or negative indexing). The benefit of using 0 is that it always gets 'the first' row and you then don't need to know how many rows of data there are. By the way, the explicit way to get the whole first row of data would be 'net_income[0, : ]' (that explicitly gets row 0 and all the columns but is less easy on the eyes).

The statement 'net_income[-1] - net_income[0]' is actually finding the difference between the last and the first value of net_income for EVERY security in one simple statement. It's performing the math on the two 1D arrays and returning a third 1D array (that's the wonderfulness of python).

Hope that clears it up a little?

Dan - that makes perfect sense. Thanks for the super clear explanation, I learned a lot from reading this and will definitely check out the link!