Merging Tables Into Pipeline

Hi,

I'm having difficulty merging a table of industry averages that I calculated into the pipeline dataframe while retaining the structure of the original dataframe, so that it can be picked up by the order function later in the algorithm. The code below is where I'm running into issues. Note that I reset the index of the pipeline dataframe so that I can merge my industry-average table into it. I think the issue I'm having is recreating the multi-index that the order function can act on. This gets slightly more confusing because when I log.info(top_outlook), the output appears to be in the correct format.

Any help here is really appreciated. Please let me know if you need any additional info.

Thanks!
Stefan

    context.output = pipeline_output('my_pipeline')
    #log.info(context.output)
    mean_val = calc_means(context, data)
    #log.info(context.output)
    #context.output.head()
    curr_df = context.output.reset_index()
    df = curr_df.merge(mean_val, on = 'Sector Code')
    #log.info(df)
    df.head()

    df = df[np.isfinite(df['Gross Margin'])]
    #df = df.set_index(['level_0','level_1']) #for some reason this kicks out an error but when I run it in research works fine
    df['outlook'] = ((df['Gross Margin'] > df['Ind MEAN GM']).astype(int)*1)
    top_outlook = df.sort_values(['outlook'], ascending=False).head(20)
    log.info(top_outlook)
    # These are the securities that we are interested in trading each day.
    context.security_list = top_outlook.index
Clone Algorithm
"""
This is a template algorithm on Quantopian for you to adapt and fill in.
"""
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline, CustomFactor
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import AverageDollarVolume
from quantopian.pipeline.filters.morningstar import Q500US
from quantopian.pipeline.data import morningstar
import numpy as np
 
def initialize(context):
    """
    Called once at the start of the algorithm.
    """   
    # Rebalance every day, 1 hour after market open.
    schedule_function(my_rebalance, date_rules.every_day(), time_rules.market_open(hours=1))
     
    # Record tracking variables at the end of each day.
    schedule_function(my_record_vars, date_rules.every_day(), time_rules.market_close())
     
    # Create our dynamic stock selector.
    attach_pipeline(make_pipeline(), 'my_pipeline')

def make_pipeline():
    
    Q500 = Q500US()
    Energy = morningstar.asset_classification.morningstar_sector_code.latest != 309.0  # excludes the Energy sector (code 309)
    Mining = morningstar.asset_classification.morningstar_sector_code.latest != 101.0  # excludes Basic Materials / mining (code 101)
    
    sector = morningstar.asset_classification.morningstar_sector_code.latest
    
    tradeable_stocks = (
         Q500
         & Energy
         & Mining)
    
    gross_margin = morningstar.operation_ratios.gross_margin.latest
    
    return Pipeline(
        columns={
            'Sector Code': sector,
            'Gross Margin': gross_margin
        }, screen = tradeable_stocks)
 
def before_trading_start(context, data):
    """
    Called every day before market open.
    """
    context.output = pipeline_output('my_pipeline')
    
    #log.info(context.output)
    
    mean_val = calc_means(context, data)
    
    #log.info(context.output)
    
    context.output.head()
    
    curr_df = context.output.reset_index()
    
    df = curr_df.merge(mean_val, on = 'Sector Code')
    #log.info(df)
    df.head()

    df = df[np.isfinite(df['Gross Margin'])]
    #df = df.set_index(['level_0','level_1']) #for some reason this kicks out an error but when I run it in research works fine
    
    df['outlook'] = ((df['Gross Margin'] > df['Ind MEAN GM']).astype(int)*1)
    
    top_outlook = df.sort_values(['outlook'], ascending=False).head(20)
    log.info(top_outlook)
  
    # These are the securities that we are interested in trading each day.
    context.security_list = top_outlook.index
    
def calc_means(context, data):
    
    mean_start = pipeline_output('my_pipeline')
    mean_val = mean_start.groupby('Sector Code').mean()
    mean_val.reset_index(inplace=True)
    mean_val = mean_val.rename(index=str, columns = {"Gross Margin":"Ind MEAN GM"})
    
    return mean_val

     
def my_assign_weights(context, data):
    """
    Assign weights to securities that we want to order.
    """
    pass
 
def my_rebalance(context,data):
    """
    Execute orders according to our schedule_function() timing. 
    """
    for stock in context.security_list:
        log.info(stock)
        #order_target_percent(stock,1.0)
    
    pass
 
def my_record_vars(context, data):
    """
    Plot variables at the end of each day.
    """
    pass
 
def handle_data(context,data):
    """
    Called every minute.
    """
    pass


A few things...

First, you typically don't want to reset the index of the pipeline output in an algorithm. The index is the list of securities. The reason you may have done this in a notebook is that the pipeline dataframe returned there has a multi-index (indexed by security AND day), while in an algorithm the index is a simple index (indexed by security only). An easy way to take a multi-index dataframe in a notebook and make it look like an algorithm dataframe is to use the '.xs' method (see http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.xs.html). All in one step, with no need to reset indexes.

# Run the pipeline across several days  
results = run_pipeline(pipe, '2016-07-08', '2016-08-08')

# Use the '.xs' method to get a single days slice of the dataframe (in this case '2016-07-08')  
# This new dataframe looks like the one returned when using pipeline in algorithms  
single_day_results = results.xs('2016-07-08')
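
If you do still want the merge approach inside an algorithm, one option (just a sketch, reusing the 'Sector Code' and 'Gross Margin' column names from your pipeline) is to group on the sector column and use '.join', which matches on that column while keeping the security index intact:

output = pipeline_output('my_pipeline')  # indexed by security only in an algorithm

# Sector means, indexed by sector code (no reset_index needed)
means = (output.groupby('Sector Code')
               .mean()
               .rename(columns={'Gross Margin': 'Ind MEAN GM'}))

# '.join' matches output's 'Sector Code' column against the means index
# and preserves the original security index on the result
output = output.join(means, on='Sector Code')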

Second, I'd use the '.groupby' and '.apply' methods to calculate the means. This does the job in a single line (and a small function). Everyone has their preference, and this is just one approach.

context.output = pipeline_output('my_pipeline')

def _sector_mean(grp):  
    #  Helper function to get the sector means - used below  
    grp['sector_mean'] = grp['gross_margin'].mean()  
    return grp  


# The following adds a column called 'sector_mean' to the pipe output  
context.output = context.output.groupby('sector_code').apply(_sector_mean)  

Finally, while it's often helpful to add columns to an existing pipe output, in this specific case it's probably not needed. Take a look at the '.demean' method (see the docs at https://www.quantopian.com/help#quantopian_pipeline_factors_Factor). Used in conjunction with the 'groupby' parameter, this gets you, I believe, exactly what you want, all within the pipeline definition.

def make_pipeline():  
    Q500 = Q500US()  
    Energy = morningstar.asset_classification.morningstar_sector_code.latest != 309.0  
    Mining = morningstar.asset_classification.morningstar_sector_code.latest != 101.0  
    sector = morningstar.asset_classification.morningstar_sector_code.latest  
    tradeable_stocks = (  
         Q500  
         & Energy  
         & Mining)  
    gross_margin = morningstar.operation_ratios.gross_margin.latest  
    gross_margin_demeaned = gross_margin.demean(groupby = sector)  
    return Pipeline(  
        columns={  
            'sector_code': sector,  
            'gross_margin': gross_margin,  
            'gross_margin_demeaned': gross_margin_demeaned,  
            },  
        screen = tradeable_stocks)  

Then, to get the stocks where the gross margin is greater than the sector mean, all you need to do is check for 'gross_margin_demeaned > 0'.

    # Below is how to get the securities which are greater than the mean for it's sector  
    # using the pipeline method 'demean' and not needing to calculate and add  
    # the 'sector_mean' column  
    context.security_list_demean = (context.output  
                             .query('gross_margin_demeaned > 0')  
                             .sort_values('gross_margin', ascending=False)  
                             .head(20)  
                             .index  
                             )  

Attached is an algorithm which includes both methods and logs the results. Note the two logs are the same.

Clone Algorithm
"""
This is a template algorithm on Quantopian for you to adapt and fill in.
"""
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline, CustomFactor
from quantopian.pipeline.data.builtin import USEquityPricing
from quantopian.pipeline.factors import AverageDollarVolume
from quantopian.pipeline.filters.morningstar import Q500US
from quantopian.pipeline.data import morningstar
import numpy as np
 
def initialize(context):
    """
    Called once at the start of the algorithm.
    """   
    # Rebalance every day, 1 hour after market open.
    schedule_function(my_rebalance, date_rules.every_day(), time_rules.market_open(hours=1))
     
    # Record tracking variables at the end of each day.
    schedule_function(my_record_vars, date_rules.every_day(), time_rules.market_close())
     
    # Create our dynamic stock selector.
    attach_pipeline(make_pipeline(), 'my_pipeline')

def make_pipeline():
    
    Q500 = Q500US()
    Energy = morningstar.asset_classification.morningstar_sector_code.latest != 309.0 
    Mining = morningstar.asset_classification.morningstar_sector_code.latest != 101.0
    
    sector = morningstar.asset_classification.morningstar_sector_code.latest
    
    tradeable_stocks = (
         Q500
         & Energy
         & Mining)
    
    gross_margin = morningstar.operation_ratios.gross_margin.latest
    gross_margin_demeaned = gross_margin.demean(groupby = sector)
    
    return Pipeline(
        columns={
            'sector_code': sector,
            'gross_margin': gross_margin,
            'gross_margin_demeaned': gross_margin_demeaned,
            }, 
        screen = tradeable_stocks)
 
    
def before_trading_start(context, data):
    """
    Called every day before market open.
    """
    
    def _sector_mean(grp):
        #  Helper function to get the sector means - used below
        grp['sector_mean'] = grp['gross_margin'].mean()
        return grp
    
    
    context.output = pipeline_output('my_pipeline')

    # The following adds a column called 'sector_mean' to the pipe output
    context.output = context.output.groupby('sector_code').apply(_sector_mean)
    
    # Get the 20 securities with the highest gross margin 
    # where the gross margin is greater than the mean
    # These are the securities that we are interested in trading each day.

    context.security_list = (context.output
                             .query('gross_margin > sector_mean')
                             .sort_values('gross_margin', ascending=False)
                             .head(20)
                             .index
                             )
    
    # Below is how to get the securities which are greater than the mean for it's sector
    # using the pipeline method 'demean' and not needing to calculate and add
    # the 'sector_mean' column
    context.security_list_demean = (context.output
                             .query('gross_margin_demeaned > 0')
                             .sort_values('gross_margin', ascending=False)
                             .head(20)
                             .index
                             )
    
    # Print both lists out to see that they are the same
    log.info(context.security_list)
    log.info(context.security_list_demean)
     
        
def my_assign_weights(context, data):
    """
    Assign weights to securities that we want to order.
    """
    pass
 
def my_rebalance(context,data):
    """
    Execute orders according to our schedule_function() timing. 
    """
    for stock in context.security_list:
        log.info(stock)
        #order_target_percent(stock,1.0)
    
    pass
 
def my_record_vars(context, data):
    """
    Plot variables at the end of each day.
    """
    pass
 
def handle_data(context,data):
    """
    Called every minute.
    """
    pass


Dan - Thank you so much for the help here. I didn't even realize that those built-in factor calculations existed. This will help me out a ton.

-Stefan

Hi Dan - One more question: say I want to evaluate multiple criteria. The 'outlook' framework I was initially using would be well suited for this because I could just add Booleans and a corresponding weight for each. Is there a way to evaluate multiple Booleans with the .query() method?

@Stefan

The '.query' method is pretty powerful, so yes, you can probably do a lot of what you want within the query statement. Take a look at the documentation here: http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.query.html. Here are a couple of highlights...

You can compare pipeline columns to other columns or to constants. Variables can also be used; put a @ character in front of any variable names. Note that it's easiest if column names do not contain spaces, so use underscores instead.

results = context.output.query('latest_price > 5')  
results = context.output.query('latest_price > week_ago_price')  
results = context.output.query('latest_price > @my_variable')  

Any of the standard Python logical expressions (eg and, or, not) or arithmetic expressions (eg +, -, ^, %, //) can be used. Use parentheses if in doubt about precedence. Here's a good list of operators and their precedence: https://docs.python.org/2/reference/expressions.html#grammar-token-expression_list.

results = context.output.query('latest_price > 5 or week_ago_price < 10')  
results = context.output.query('(buy == True) and (week_ago_price < 10)')

Since Python treats True as the integer 1 and False as the integer 0, one can create weights based upon boolean factors.

strong_buys  = context.output.query('(buy * 2 + buy_now * 5 ) > 5')
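
Or, closer to your original 'outlook' approach (again just a sketch, using the same hypothetical 'buy' and 'buy_now' boolean columns), the weighted score can be built as a regular column first and then queried and sorted on:

# Weighted score from boolean columns (True counts as 1, False as 0)
context.output['outlook'] = (context.output['buy'] * 2 +
                             context.output['buy_now'] * 5)

top_outlook = (context.output
               .query('outlook > 5')
               .sort_values('outlook', ascending=False)
               .head(20))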

One can also query the index using the name 'index':

spy = symbol('SPY')  
high_or_spy = context.output.query('(price > 100) or (index == @spy)')

Also, the query is just a string. For readability, or perhaps to build up rules from small chunks, the rules can be placed into a string variable.

buy_rules = 'price > low_price'  
buys  = context.output.query(buy_rules)

One caution is that the errors '.query' generates are sometimes vague or non-existent. You will usually get pointed to the '.query' line number but will need to figure out yourself which part of the query caused the error.
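
One way to get a bit more detail when that happens (a sketch, using a made-up buy_rules string and the algorithm's log object) is to wrap the call and log the exception yourself:

buy_rules = 'price > low_price'
try:
    buys = context.output.query(buy_rules)
except Exception as e:
    # Log which rule string failed and what pandas complained about
    log.info('query failed for rules %r: %s' % (buy_rules, e))
    buys = context.output.iloc[0:0]  # fall back to an empty frame with the same columns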