Back to Community
How to speed up pipeline calculations

Hello,

In the interest of getting up to speed with the new pipeline API I've attempted to implement my own version of the Piotroski score which I've attached. I've wrapped the CustomFactor around some code that rebalances yearly, and I believe the code to get the pipeline output 'output = pipeline_output('piotroski')' is only being called once. However, based on the logging statements in the compute method of the custom factor I can see that compute is being called many times, which I believe is causing a significant slowdown when executing the algorithm. Could someone please assist with this. Is there a way to speed this algorithm up?

Regards,
Mark

Clone Algorithm
20
Loading...
Backtest from to with initial capital
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import SimpleMovingAverage
from quantopian.pipeline.data import morningstar
from quantopian.pipeline import CustomFactor
import numpy as np
import pandas as pd


class Piotroski(CustomFactor):
    
    inputs = [morningstar.operation_ratios.roa,
              morningstar.cash_flow_statement.operating_cash_flow,
              morningstar.operation_ratios.long_term_debt_equity_ratio,
              morningstar.operation_ratios.gross_margin,
              morningstar.operation_ratios.current_ratio,
              morningstar.valuation.shares_outstanding,
              morningstar.operation_ratios.assets_turnover,
              morningstar.income_statement.net_income]
    
    window_length = 200 #context.days_in_year    
    
    def compute(self, today, assets, out, roa, ocf, lt_debt_equity, gross_margin, current_ratio, shares_outstanding, assets_turnover,net_income):
        log.info("in compute")
        p_score = [0] * len(out)
        if self.window_length > 1:
            #****************************************
            #Profitability
            #****************************************
            #1 net income
            temp = np.where(net_income[0] > 0.0,1,0)
            p_score = p_score + temp
            
            #2 operating cash flow
            temp = np.where(ocf[0] > 0.0,1,0)
            p_score = p_score + temp
            
            #3 return on assets
            temp = roa[0] - roa[-1]
            temp = np.where(temp>0,1,0)
            p_score = p_score + temp
            
            #4 quality of earnings
            temp = ocf[0] - net_income[0] 
            temp = np.where(temp>0,1,0)
            p_score = p_score + temp
            
            #****************************************
            #Leverage Liquidity Source of Funds
            #****************************************
            
            #5 decrease in leverage
            temp = lt_debt_equity[-1] - lt_debt_equity[0]
            temp = np.where(temp>0,1,0)
            p_score = p_score + temp
            
            #6 increase in liquidity
            temp = current_ratio[0] - current_ratio[-1]
            temp = np.where(temp>0,1,0)
            p_score = p_score + temp
            
            #7 absence of dilution
            temp = lt_debt_equity[-1] - lt_debt_equity[0]
            temp = np.where(temp>0,1,0)
            p_score = p_score + temp
            
            #****************************************
            #Operating Efficiency
            #****************************************
            
            #8 gross margin
            temp = gross_margin[0] - gross_margin[-1]
            temp = np.where(temp>0,1,0)
            p_score = p_score + temp
            
            #9 asset turnover
            temp = assets_turnover[0] - assets_turnover[-1]
            temp = np.where(temp>0,1,0)
            p_score = p_score + temp

        out[:] = p_score

        
def initialize(context):
    context.days_in_year = 200
    context.lower_mktcap = 500e6
    context.limit = 500
    context.shy = symbol('IEF')
    context.num_positions = 30
    context.base_weight = 1.0/context.num_positions
    context.rebalance_month = 6 #June
    context.is_rebalance_month = False
    context.rebalance_day = 0
    
    # Create and attach an empty Pipeline.
    pipe = Pipeline()
    pipe = attach_pipeline(pipe, name='piotroski')

    # Construct Factors.
    p_score = Piotroski()
    pipe.add(p_score,'p_score')
    pipe.set_screen(p_score > 5)
    
    #schedule rebalance day and sell and buy dates
    #separate sell and buy dates by 1 week to allow sell orders to clear before executing buys
    schedule_function(set_rebalance_day,date_rules.month_start(days_offset=10),
                      time_rules.market_close(minutes=5)) 
    schedule_function(rebalance_sell,date_rules.month_start(days_offset=11),
                      time_rules.market_open(minutes=120)) 
    schedule_function(rebalance_buy,date_rules.month_start(days_offset=16),
                      time_rules.market_open(minutes=120))    

    
def rebalance_sell(context,data):
    if context.is_rebalance_month :
        for s in context.portfolio.positions:
            order_target_percent(s,0)

        
def rebalance_buy(context,data):
    if context.is_rebalance_month :
        for s in context.longs:
            if s in data:
                order_target_percent(s,context.base_weight)

        context.is_rebalance_month = False

                      
def set_rebalance_day(context,data):
    today = get_datetime()
    
    if today.month == context.rebalance_month:
        context.is_rebalance_month = True
        context.rebalance_day = today.day
    else:
        context.is_rebalance_month = False

                      
def before_trading_start(context, data):
    today = get_datetime()
    if context.is_rebalance_month and today.day-1 == context.rebalance_day:
        
        #get stocks using traditional sql query
        getstocks(context)

        # Pipeline_output returns the constructed dataframe.
        output = pipeline_output('piotroski')

        context.longs=[]
        context.p_score_stocks = output.sort('p_score', ascending=False)
        
        #check if selected stocks in p_score list
        for s in  context.stocks:
            if s in context.p_score_stocks.index:
                context.longs.append(s)
        #context.longs = context.my_universe
        context.longs = context.longs[:context.num_positions]
        #check all stocks in list
        log.info("number of longs: " + str(len(context.longs)))
        update_universe(context.longs)

        
def getstocks(context):     
    sector_code = fundamentals.asset_classification.morningstar_sector_code
    fundamental_df = get_fundamentals(
        query(
            sector_code,
            fundamentals.valuation.market_cap,
            fundamentals.balance_sheet.total_assets,  
        )
        .filter(fundamentals.share_class_reference.is_primary_share == True)
        .filter(fundamentals.company_reference.primary_exchange_id != "OTCPK")
        .filter(fundamentals.share_class_reference.is_depositary_receipt == False)
        .filter(fundamentals.valuation.market_cap > context.lower_mktcap ) 
        .filter(fundamentals.valuation_ratios.ev_to_ebitda > 0)
        .order_by(fundamentals.valuation.market_cap.asc())
        .limit(context.limit)
    )
    
    context.stocks = fundamental_df.columns[:context.num_positions]
    

def handle_data(context, data):
    record(leverage = context.account.leverage)
There was a runtime error.
6 responses

@Mark

Your compute function is always run for every day of your backtest. The idea here is that each day you receive a trailing window of data ending on that day, and the responsibility of your compute is to produce one value per asset each time it's called.

Looking at your compute, I doubt that's a significant bottleneck for performance. The more likely issue is that you're loading 200 trailing rows of fundamentals data for every field you're using, but you only need a single row for many of them. I'd suggest trying to pull out the expressions that need long trailing windows into separate factors, computing that score separately, and then adding them together.

In the longer term, your example makes me think that we need a way to take a Filter and convert it back into a Factor by setting True to 1 and False to 0. This would let you write something like:

piotroski_net_income = (morningstar.income_statement.net_income.latest > 0).as_factor()  
piotroski_cash_flow = (morningstar.cash_flow_statement.operating_cash_flow.latest > 0).as_factor()  
...

piotroski_score = (piotroski_net_income + piotroski_cash_flow + ...)  
Disclaimer

The material on this website is provided for informational purposes only and does not constitute an offer to sell, a solicitation to buy, or a recommendation or endorsement for any security or strategy, nor does it constitute an offer to provide investment advisory services by Quantopian. In addition, the material offers no opinion with respect to the suitability of any security or specific investment. No information contained herein should be regarded as a suggestion to engage in or refrain from any investment-related course of action as none of Quantopian nor any of its affiliates is undertaking to provide investment advice, act as an adviser to any plan or entity subject to the Employee Retirement Income Security Act of 1974, as amended, individual retirement account or individual retirement annuity, or give advice in a fiduciary capacity with respect to the materials presented herein. If you are an individual retirement or other investor, contact your financial advisor or other fiduciary unrelated to Quantopian about whether any given investment idea, strategy, product or service described herein may be appropriate for your circumstances. All investments involve risk, including loss of principal. Quantopian makes no guarantees as to the accuracy or completeness of the views expressed in the website. The views are subject to change, and may have become unreliable for various reasons, including changes in market conditions or economic circumstances.

Separate note: I think you have the date convention for the trailing windows backwards. The first row is always the oldest row, and the last row is always the newest. This means, for example, that "roa this year - roa last year" should be roa[-1] - roa[0], which is the inverse of what you have listed I think.

I put together a short notebook that shows how I'd go about computing Piotrosky Score. It's still a fair bit slower than I'd like it to be, but the bottleneck at this point is almost entirely network IO with our database.

The bad news here is that there isn't much that you can do as a user to speed this up.
The good news is that this should get faster for free over time as we figure out how to optimize the fundamentals database for pipeline usage patterns.

Hope this is helpful,
-Scott

Loading notebook preview...
Notebook previews are currently unavailable.

Hi Scott,

thanks a lot for taking a look at this. Your comments are very helpful. I've updated my implementation with your code if anyone is interested.

Regards,
Mark

Clone Algorithm
20
Loading...
Backtest from to with initial capital
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import SimpleMovingAverage
from quantopian.pipeline.data import morningstar as m
from sqlalchemy import or_
from quantopian.pipeline import CustomFactor
import numpy as np
import pandas as pd

TRADING_DAYS_PER_YEAR = 252
OLDEST = -1
NEWEST = 0


class OperatingCashFlowPositive(CustomFactor):
    """
    Factor returning 1.0 if operating cash flow is currently positive.
    Otherwise returns 0.
    
    'Operating Cash Flow: A better earnings gauge. 
     Score 1 if last year cash flow is positive.'
    """
    inputs = [m.cash_flow_statement.operating_cash_flow]
    window_length = 1
    
    def compute(self, today, assets, out, ocf):
        # See note in NetPositiveIncome.compute for an explanation of why 
        # this assignment is allowed.
        out[:] = (ocf[NEWEST] > 0)
        
        
class NetIncomePositive(CustomFactor):
    """
    Factor returning 1.0 if net_income is currently positive.
    Otherwise returns 0.
    
    'Net Income: Bottom line. 
     Score 1 if last year net income is positive.'
    """
    inputs = [m.income_statement.net_income]
    window_length = 1
    
    def compute(self, today, assets, out, net_income):
        # NOTE:
        # This is actually more complicated than it looks.
        # (net_income > 0) will return a numpy array of dtype `bool`,
        # but `out` is an array of dtype `float64`.
        # 
        # Numpy allows casts from bool to float though, with the result 
        # being that True values are cast to 1.0, and False values are cast to 0.0
        out[:] = (net_income[NEWEST] > 0)

        
class ReturnOnAssetsIncreased(CustomFactor):
    """
    Factor returning 1.0 if this year's ROA exceeds last year's ROA.
    Otherwise returns 0.
    
    'Return On Assets: Measures Profitability.
     Score 1 if last year ROA exceeds prior-year ROA.'
    """
    inputs = [m.operation_ratios.roa]
    window_length = TRADING_DAYS_PER_YEAR
    
    def compute(self, today, assets, out, roa):
        # See note in NetPositiveIncome.compute for an explanation of why 
        # this assignment is allowed.
        out[:] = (roa[NEWEST] > roa[OLDEST])

        
class EarningsQuality(CustomFactor):
    """
    Factor returning 1.0 if operating cash flow exceeds net income.
    Otherwise returns 0.
    
    'Quality of Earnings: Warns of Accounting Tricks. 
     Score 1 if last year operating cash flow exceeds net income.'
    """
    inputs = [m.cash_flow_statement.operating_cash_flow,
              m.income_statement.net_income,
             ]
    window_length = 1
    
    def compute(self, today, assets, out, ocf, net_income):
        # See note in NetPositiveIncome.compute for an explanation of why 
        # this assignment is allowed.
        out[:] = (ocf[NEWEST] > net_income[NEWEST])
        

class LongTermDebtDecreased(CustomFactor):
    """
    Factor returning 1.0 if Long Term Debt to Equity Ratio decreased year over year.
    Otherwise returns 0.
    
    'Long-Term Debt vs. Assets: Is Debt Decreasing? 
     Score 1 if the ratio of long-term debt to assets is down from the year-ago value. 
     (If LTD is zero but assets are increasing, score 1 anyway.)'
    """
    inputs = [m.operation_ratios.long_term_debt_equity_ratio]
    window_length = TRADING_DAYS_PER_YEAR
    
    def compute(self, today, assets, out, debt_to_equity):
        # See note in NetPositiveIncome.compute for an explanation of why 
        # this assignment is allowed.
        out[:] = (debt_to_equity[NEWEST] < debt_to_equity[OLDEST])


class CurrentRatioIncreased(CustomFactor):
    """
    Factor returning 1.0 if Current Ratio increased year over year.
    
    'Current Ratio:  Measures increasing working capital. 
     Score 1 if CR has increased from the prior year.'
    """
    inputs = [m.operation_ratios.current_ratio]
    window_length = TRADING_DAYS_PER_YEAR
    
    def compute(self, today, assets, out, current_ratio):
        # See note in NetPositiveIncome.compute for an explanation of why 
        # this assignment is allowed.
        out[:] = (current_ratio[NEWEST] > current_ratio[OLDEST])


        
class SharesOutstandingNotIncreased(CustomFactor):
    """
    Factor returning 1.0 if shares outstanding decreased or stayed constant.
    
    'Shares Outstanding: A Measure of potential dilution. 
     Score 1 if the number of shares outstanding is no greater than the year-ago figure.'
    """
    inputs = [m.valuation.shares_outstanding]
    window_length = TRADING_DAYS_PER_YEAR
    

    def compute(self, today, assets, out, shares_outstanding):
        # See note in NetPositiveIncome.compute for an explanation of why 
        # this assignment is allowed.
        out[:] = (shares_outstanding[NEWEST] <= shares_outstanding[OLDEST])    

        
class GrossMarginIncreased(CustomFactor):
    """
    Factor returning 1.0 if gross margin increased year over year.
    
    'Gross Margin: A measure of improving competitive position. 
     Score 1 if full-year GM exceeds the prior-year GM.'
    """
    inputs = [m.operation_ratios.gross_margin]
    window_length = TRADING_DAYS_PER_YEAR
    
    def compute(self, today, assets, out, gross_margin):
        out[:] = (gross_margin[NEWEST] > gross_margin[OLDEST])

        
class AssetTurnover(CustomFactor):
    """
    Factor returning 1.0 asset turnover ratio increased year over year.
    
    'Asset Turnover: Measures productivity. 
     Score 1 if the percentage increase in sales exceeds the percentage increase in total assets.'
    """
    inputs = [m.operation_ratios.assets_turnover]
    window_length = TRADING_DAYS_PER_YEAR
    
    def compute(self, today, assets, out, turnover):
        out[:] = (turnover[NEWEST] > turnover[OLDEST])   
 
        
def initialize(context):
    context.p_threshold = 5
    context.lower_mktcap = 500e6
    context.limit = 500
    context.shy = symbol('IEF')
    context.num_positions = 100
    context.base_weight = 1.0/context.num_positions
    context.rebalance_month = 6 #June
    context.is_rebalance_month = False
    context.rebalance_day = 0
    
   # Construct Factors.
    p_score = (
    NetIncomePositive() + 
    OperatingCashFlowPositive() + 
    ReturnOnAssetsIncreased() + 
    EarningsQuality() +
    LongTermDebtDecreased() +
    CurrentRatioIncreased() +
    SharesOutstandingNotIncreased() +
    GrossMarginIncreased() +
    AssetTurnover()
        )

    # Create and attach a Pipeline.
    pipe = Pipeline(columns={'p_score': p_score, 
                         'close': USEquityPricing.close.latest}, 
                screen=p_score > context.p_threshold)
    
    pipe = attach_pipeline(pipe, name='piotroski')
    
    #schedule rebalance day and sell and buy dates
    #separate sell and buy dates by 1 week to allow sell orders to clear before executing buys
    schedule_function(set_rebalance_day,date_rules.month_start(days_offset=10),
                      time_rules.market_close(minutes=5)) 
    schedule_function(rebalance_sell,date_rules.month_start(days_offset=11),
                      time_rules.market_open(minutes=120)) 
    schedule_function(rebalance_buy,date_rules.month_start(days_offset=16),
                      time_rules.market_open(minutes=120))    

    
def rebalance_sell(context,data):
    if context.is_rebalance_month :
        for s in context.portfolio.positions:
            order_target_percent(s,0)

        
def rebalance_buy(context,data):
    if context.is_rebalance_month :
        for s in context.longs:
            if s in data:
                order_target_percent(s,context.base_weight)

        context.is_rebalance_month = False

                      
def set_rebalance_day(context,data):
    today = get_datetime()
    
    if today.month == context.rebalance_month:
        context.is_rebalance_month = True
        context.rebalance_day = today.day
    else:
        context.is_rebalance_month = False

                      
def before_trading_start(context, data):
    today = get_datetime()
    if context.is_rebalance_month and today.day-1 == context.rebalance_day:
        
        #get stocks using traditional sql query
        getstocks(context)

        # Pipeline_output returns the constructed dataframe.
        context.p_score_stocks = pipeline_output('piotroski')

        context.longs=[]
        
        #check if selected stocks in p_score list
        for s in  context.stocks:
            if s in context.p_score_stocks.index:
                context.longs.append(s)
        context.longs = context.longs[:context.num_positions]
        #check all stocks in list
        log.info("number of longs: " + str(len(context.longs)))
        update_universe(context.longs)

        
def getstocks(context):     
    sector_code = fundamentals.asset_classification.morningstar_sector_code
    fundamental_df = get_fundamentals(
        query(
            sector_code,
            fundamentals.valuation.market_cap,
            fundamentals.balance_sheet.total_assets,  
        )
        .filter(fundamentals.share_class_reference.is_primary_share == True)
        .filter(fundamentals.company_reference.primary_exchange_id != "OTCPK")
        .filter(fundamentals.share_class_reference.is_depositary_receipt == False)
        .filter(fundamentals.valuation.market_cap > context.lower_mktcap ) 
        .filter(fundamentals.valuation_ratios.ev_to_ebitda > 0)
        .order_by(fundamentals.valuation.market_cap.asc())
        .limit(context.limit)
    )
    
    context.stocks = fundamental_df.columns[:context.num_positions]
    

def handle_data(context, data):
    record(leverage = context.account.leverage)
There was a runtime error.

Hi Mark,

It could be my confusion however I'm wondering if the comparisons to the previous years' values, e.g. ROA has a bug?

The code defines ...

OLDEST = -1  
NEWEST = 0  

and then awards a point to the Piotroski score if the Return on Assets (ROA) now is greater than ROA a year ago.
However for the code awards a point if ROA a year ago (index 0) is greater than current ROA (-1)

        out[:] = (roa[NEWEST] > roa[OLDEST])

Or am I confused about the indexing of the 'roa' array?

Thanks in advance
Phil

Hi Phil.

thanks for pointing this out. the code above is taken from Scott's notebook. I think you are correct there is a discrepancy, Scott notes that the convention is the newest row is the last row and the oldest is the first so I believe the code should be,

OLDEST = 0  
NEWEST = -1  

Certainly making this change improves the backtest.