Forward filling nans in pipeline custom factors

[Edit 7/2019, there's an issue with this, see messages below]

A way to forward fill NaNs in pipeline, adapted from Stack Overflow.

Example:

class Quality(CustomFactor):  
    inputs = [Fundamentals.total_revenue]  
    window_length = 24  
    def compute(self, today, assets, out, total_revenue):  
        total_revenue = nanfill(total_revenue)  
        out[:] = total_revenue[-1]  # e.g. most recent value after filling

def nanfill(arr):  
    mask = np.isnan(arr)  
    idx  = np.where(~mask,np.arange(mask.shape[1]),0)  
    np.maximum.accumulate(idx,axis=1, out=idx)  
    arr[mask] = arr[np.nonzero(mask)[0], idx[mask]]  
    return arr  
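For a quick sanity check on the mechanics, here is the fill on a tiny array of made-up numbers: each NaN is replaced by the nearest non-NaN value to its left in the same row.

import numpy as np

# made-up data, just to show what the fill does within each row
a = np.array([[1.0, np.nan, 3.0],
              [np.nan, 5.0, np.nan]])
nanfill(a)
# row 0 becomes [ 1.,  1.,  3.]  (the NaN takes the 1.0 to its left)
# row 1 becomes [nan,  5.,  5.]  (a leading NaN stays NaN, nothing to its left)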

Why would Fundamentals.total_revenue require forward filling? Are there companies for which total_revenue is not reported by the company? Or are these errors in the Fundamentals database?

Fundamental reporting is not consistent across all companies.

A version that also counts NaNs and logs the counts when any are present:

def nanfill(arr):  
    mask = np.isnan(arr)  
    nan_num = np.count_nonzero(mask)  
    if nan_num:  
        log.info(nan_num)  
        log.info(str(arr))  
    idx  = np.where(~mask,np.arange(mask.shape[1]),0)  
    np.maximum.accumulate(idx,axis=1, out=idx)  
    arr[mask] = arr[np.nonzero(mask)[0], idx[mask]]  
    if nan_num:  
        log.info(str(arr))  
    return arr  

In my experience, backtests with NaNs forward filled this way have shown some improved performance.
Try it, for example, with the factors in the notebook at https://www.quantopian.com/posts/faster-fundamental-data

Very helpful, thank you

@Blue Seahawk:
Your implementation forward fills values from one stock to the next within the same timestamp, not from one timestamp to the next for the same stock. This has the unfortunate outcome that a stock's NaN gets filled with a value from another stock at the same timestamp, rather than from the same stock at an earlier timestamp.

I am also looking for an implementation but could not find any yet... Does anybody have a correct implementation?

I've been using it and always thought it was right. Can you post an algo to demonstrate? If so, try axis=0 and see what happens then eh? Thanks.

Hi,
I put together a quick pipeline calculating capex to cash flows, once without forward fill and once with your forward-fill implementation. If you scroll through the selections you will see that, most of the time, the NaN value is replaced with the value above it, which comes from another stock... unfortunately.

Or do you see anything wrong in the implementation?

When I try switching to axis=0, all null values suddenly get the same value, so that alone doesn't seem to be the solution either.


Thanks for the content to work on. I see flaws in your usage, but that doesn't mean you aren't onto something worth looking into. I think nanfill() is behaving OK normally, but my hunch is that it could be improved for extreme cases; I'll explain at the end.

Quite a few points to make:
1. In the custom factor, use a generous window_length so there is something to forward fill from, rather than a window_length of 1.
2. Inputs to the custom factors shouldn't use .latest; pass the columns themselves, as below.
3. Do nanfill() first thing in compute, before any calculations.
4. Then, after nanfill(), do the calculations using [-1] for example, or [-3:].mean(), or whatever; if the window length is long enough to reach back to a point where there is a value, all NaNs from there to the end should become that value.

Something along these lines ...

    class Capex_To_Cashflows(CustomFactor):  
        inputs=[cfs.capital_expenditure, cfs.free_cash_flow]  
        window_length=10  
        def compute(self, today, assets, out, capital_expenditure, free_cash_flow):  
            out[:] = (capital_expenditure[-1] * 4.) / (free_cash_flow[-1] * 4.)  
    class Capex_To_Cashflows_forwardfill(CustomFactor):  
        inputs=[cfs.capital_expenditure, cfs.free_cash_flow]  
        window_length=10  
        def compute(self, today, assets, out, capital_expenditure, free_cash_flow):  
            capital_expenditure = nanfill(capital_expenditure)  
            free_cash_flow      = nanfill(free_cash_flow)  
            out[:] = (capital_expenditure[-1] * 4.) / (free_cash_flow[-1] * 4.)  
            #out[:] = nanfill((capital_expenditure * 4.) / (free_cash_flow * 4.))  

With that change made, as you can see, the default window_lengths are set to 10; in this case, though, I'm passing an override when they are called, with window_length set to 120, and the outputs seemed reasonable the first time I ran it, all different. But there's a fly in the ointment. When I had tried window_length=30, there were two stocks with, not the value of the one before them, but instead the same value as each other even though they are separated from each other (BCE and BNS). As if, when it fails for lack of a value to forward fill with, it has some information saved and just uses that instead, like in mask or something, I don't know. Then alongside the fly there's an elephant: on rerunning with 120, it was back to the same problem as with 30. Odd. This is a job for a Dan Whitnable or something.


Thanks for your explanation, it makes things a bit clearer for me. But here's what I find interesting: when I clone your posted notebook now and run all cells with your updated code, I don't get the same forward-filled values you got. I again get the value from the stock in the line above...

Funny that we don't get the same results from the same code.

Right, there's inconsistency going on, holy smokes. And on the main issue ...

Yep, your point is valid, glad you took the time to bring that to light.

I can see from your imports, Stefan, that it must have taken some effort to trim things back for the example provided; thanks again. Even so, I had a bit of trouble wrapping my head around its complexity.

Here it is, simplified further, in case the IDE debugger might help someone looking into this.

It shows that even with a large window length the problem is sometimes there.

                       Capex_To_Cashflows  Capex_To_Cashflows_filled  
    Equity(755 [BC])             0.525970                   0.525970  
    Equity(766 [BCE])                 NaN                   0.525970  
    Equity(794 [BDX])           -0.332198                  -0.332198  
    2019-07-03 05:45 before_trading_start:46 INFO .  
                        Capex_To_Cashflows  Capex_To_Cashflows_filled  
    Equity(980 [BMY])            -0.172007                  -0.172007  
    Equity(1010 [BNS])                 NaN                  -0.172007  
    Equity(1023 [BOH])           -2.092683                  -2.092683  
    2019-07-03 05:45 before_trading_start:46 INFO .  
                         Capex_To_Cashflows  Capex_To_Cashflows_filled  
    Equity(1385 [CDNS])            -0.08978                  -0.089780  
    Equity(1402 [CEF])                  NaN                   3.365413  
    Equity(1419 [CERN])            -1.77185                  -1.771850  
    2019-07-03 05:45 before_trading_start:46 INFO .  
                           Capex_To_Cashflows  Capex_To_Cashflows_filled  
    Equity(1637 [CMCS_A])           -0.574695                  -0.574695  
    Equity(1655 [CMO])                    NaN                 -55.563744  
    Equity(1665 [CMS])              -4.660550                  -4.660550  
    2019-07-03 05:45 before_trading_start:46 INFO .  
                        Capex_To_Cashflows  Capex_To_Cashflows_filled  
    Equity(1789 [COT])           10.440000                  10.440000  
    Equity(1792 [CP])                  NaN                  10.440000  
    Equity(1795 [CPB])           -0.336283                  -0.336283

    [ ............... ]  

It was something I had copied from Stack Overflow.

from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from quantopian.pipeline.data import USEquityPricing
from quantopian.pipeline.filters import QTradableStocksUS
from collections import OrderedDict

import pandas as pd
import numpy as np

from quantopian.pipeline.data import Fundamentals
from quantopian.pipeline.data import morningstar
from quantopian.pipeline import CustomFactor

cfs = morningstar.cash_flow_statement   # or simply use Fundamentals; the cash_flow_statement category prefix is then not needed.

class Capex_To_Cashflows(CustomFactor):
    #inputs=[cfs.capital_expenditure.latest,cfs.free_cash_flow.latest]
    inputs=[cfs.capital_expenditure, cfs.free_cash_flow]
    window_length=10
    def compute(self, today, assets, out, capital_expenditure, free_cash_flow):
        out[:] = (capital_expenditure[-1] * 4.) / (free_cash_flow[-1] * 4.)

class Capex_To_Cashflows_forwardfill(CustomFactor):
    #inputs=[cfs.capital_expenditure.latest,cfs.free_cash_flow.latest]
    inputs=[cfs.capital_expenditure, cfs.free_cash_flow]
    window_length=10
    def compute(self, today, assets, out, capital_expenditure, free_cash_flow):
        
        capital_expenditure = nanfill(capital_expenditure)
        free_cash_flow      = nanfill(free_cash_flow)
        
        out[:] = (capital_expenditure[-1] * 4.) / (free_cash_flow[-1] * 4.)
        #out[:] = nanfill((capital_expenditure * 4.) / (free_cash_flow * 4.))

def initialize(context):
    attach_pipeline(make_ml_pipeline(), 'pipe')

def before_trading_start(context, data):
    out = pipeline_output('pipe')
    
    #print out.sort_values(by='Capex_To_Cashflows_filled', ascending=True)  # cropped
    
    count = 0
    for s in out.index:  # When nan or not equal, log with the lines before and after.
        if out.loc[s].Capex_To_Cashflows != out.loc[s].Capex_To_Cashflows_filled:
            log.info('.\n{}'.format(out[count-1 : count+2]))
        count += 1

    do_log_preview = 1    # a way to toggle this off when it becomes annoying
    if do_log_preview:
        try:    context.log_data_done
        except: log_data(context, out, 9)        # show pipe info once

def make_ml_pipeline():
    
    wndw = 252  # even with a very high window_length, the problem is still there sometimes ...
    '''
                       Capex_To_Cashflows  Capex_To_Cashflows_filled
    Equity(755 [BC])             0.525970                   0.525970
    Equity(766 [BCE])                 NaN                   0.525970
    Equity(794 [BDX])           -0.332198                  -0.332198
    2019-07-03 05:45 before_trading_start:46 INFO .
                        Capex_To_Cashflows  Capex_To_Cashflows_filled
    Equity(980 [BMY])            -0.172007                  -0.172007
    Equity(1010 [BNS])                 NaN                  -0.172007
    Equity(1023 [BOH])           -2.092683                  -2.092683
    2019-07-03 05:45 before_trading_start:46 INFO .
                         Capex_To_Cashflows  Capex_To_Cashflows_filled
    Equity(1385 [CDNS])            -0.08978                  -0.089780
    Equity(1402 [CEF])                  NaN                   3.365413
    Equity(1419 [CERN])            -1.77185                  -1.771850
    2019-07-03 05:45 before_trading_start:46 INFO .
                           Capex_To_Cashflows  Capex_To_Cashflows_filled
    Equity(1637 [CMCS_A])           -0.574695                  -0.574695
    Equity(1655 [CMO])                    NaN                 -55.563744
    Equity(1665 [CMS])              -4.660550                  -4.660550
    2019-07-03 05:45 before_trading_start:46 INFO .
                        Capex_To_Cashflows  Capex_To_Cashflows_filled
    Equity(1789 [COT])           10.440000                  10.440000
    Equity(1792 [CP])                  NaN                  10.440000
    Equity(1795 [CPB])           -0.336283                  -0.336283
    '''
    
    fltr = QTradableStocksUS()
    return Pipeline(
        screen  = fltr, 
        columns = {
           'Capex_To_Cashflows'       : Capex_To_Cashflows            (window_length=wndw, mask=fltr),
           'Capex_To_Cashflows_filled': Capex_To_Cashflows_forwardfill(window_length=wndw, mask=fltr),
        }
    )

def nanfill(arr):
    # https://www.quantopian.com/posts/forward-filling-nans-in-pipeline-custom-factors
    mask = np.isnan(arr)
    idx  = np.where(~mask,np.arange(mask.shape[1]),0)
    np.maximum.accumulate(idx,axis=1, out=idx)
    arr[mask] = arr[np.nonzero(mask)[0], idx[mask]]
    return arr

def log_data(context, z, num, fields=None):
    ''' Log info about pipeline output or, z can be any DataFrame or Series
    https://quantopian.com/posts/overview-of-pipeline-content-easy-to-add-to-your-backtest
    '''
    if not len(z):
        log.info('Empty pipe')
        return

    try: context.log_data_done
    except:
        log.info('starting_cash ${:,}'.format(int(context.portfolio.cash)))
        context.log_data_done = 1

    # Options
    log_nan_only = 0          # Only log if nans are present.
    show_sectors = 0          # If sectors, see them or not.
    show_sorted_details = 1   # [num] high & low securities sorted, each column.
    padmax = 6                # num characters for each field, starting point.

    def out(lines):  # log data lines of output efficiently
        buffer_len = 1024   # each group
        chunk = ':'
        for line in lines:
            if line is None or not len(line):
                continue    # skip if empty string for example
            if len(chunk) + len(line) < buffer_len:
                # Add to chunk if will still be under buffer_len
                chunk += '\n{}'.format(line)
            else:  # Or log chunk and start over with new line.
                log.info(chunk)
                chunk = ':\n{}'.format(line)
        if len(chunk) > 2:       # if anything remaining
            log.info(chunk)

    if 'dict' in str(type(z)):
        log.info('Not set up to handle a dictionary, only dataframe & series, bailing out of log_data()')
        return
    elif 'MultiIndex' in str(type(z.index)):
        log.info('Found MultiIndex, not set up to handle it, bailing out of log_data()')
        return
    # Change index to just symbols for readability, meanwhile, right-aligned
    z = z.rename(index=dict(zip(z.index.tolist(), [i.symbol.rjust(6) for i in z.index.tolist()])))

    # Series ......
    if 'Series' in str(type(z)):    # is Series, not DataFrame
        nan_count = len(z[z != z])
        nan_count = 'NaNs {}/{}'.format(nan_count, len(z)) if nan_count else ''
        if (log_nan_only and nan_count) or not log_nan_only:
            pad = max( padmax, len('%.5f' % z.max()) )
            log.info('{}{}{}   Series  len {}'.format('min'.rjust(pad+5),
                'mean'.rjust(pad+5), 'max'.rjust(pad+5), len(z)))
            log.info('{}{}{} {}'.format(
                ('%.5f' % z.round(6). min()).rjust(pad+5),
                ('%.5f' % z.round(6).mean()).rjust(pad+5),
                ('%.5f' % z.round(6). max()).rjust(pad+5),
                nan_count
            ))
            log.info('High\n{}'.format(z.sort_values(ascending=False).head(num)))
            log.info('Low\n{}' .format(z.sort_values(ascending=False).tail(num)))
        return

    # DataFrame ......
    content_min_max = [ ['','min','mean','max',''] ] ; content = []
    for col in z.columns:
        try: z[col].max()
        except: continue   # skip non-numeric
        if col == 'sector' and not show_sectors: continue
        nan_count = len(z[col][z[col] != z[col]])
        nan_count = 'NaNs {}/{}'.format(nan_count, len(z)) if nan_count else ''
        padmax    = max( padmax, len(str(z[col].max())) ) ; mean_ = ''
        if len(str(z[col].max())) > 8 and 'float' in str(z[col].dtype):
            z[col] = z[col].round(6)   # Reduce number of decimal places for floating point values
        if 'float' in str(z[col].dtype): mean_ = str(round(z[col].mean(), 6))
        elif 'int' in str(z[col].dtype): mean_ = str(round(z[col].mean(), 1))
        content_min_max.append([col, str(z[col] .min()), mean_, str(z[col] .max()), nan_count])
    if log_nan_only and nan_count or not log_nan_only:
        log.info('Rows: {}  Columns: {}'.format(z.shape[0], z.shape[1]))
        if len(z.columns) == 1: content.append('Rows: {}'.format(z.shape[0]))

        paddings = [6 for i in range(4)]
        for lst in content_min_max:    # set max lengths
            i = 0
            for val in lst[:4]:    # value in each sub-list
                paddings[i] = max(paddings[i], len(str(val)))
                i += 1
        headr = content_min_max[0]
        content.append(('{}{}{}{}{}'.format(
             headr[0] .rjust(paddings[0]),
            (headr[1]).rjust(paddings[1]+5),
            (headr[2]).rjust(paddings[2]+5),
            (headr[3]).rjust(paddings[3]+5),
            ''
        )))
        for lst in content_min_max[1:]:    # populate content using max lengths
            content.append(('{}{}{}{}     {}'.format(
                lst[0].rjust(paddings[0]),
                lst[1].rjust(paddings[1]+5),
                lst[2].rjust(paddings[2]+5),
                lst[3].rjust(paddings[3]+5),
                lst[4],
            )))
    out(content)

    if not show_sorted_details: return
    if len(z.columns) == 1:     return     # skip detail if only 1 column
    details = z.columns if fields is None else fields
    content = []
    for detail in details:
        if detail == 'sector' and not show_sectors: continue
        hi = z[details].sort_values(by=detail, ascending=False).head(num)
        lo = z[details].sort_values(by=detail, ascending=False).tail(num)
        content.append(('_ _ _   {}   _ _ _'  .format(detail)))
        content.append(('{} highs ...\n{}'.format(detail, str(hi))))
        content.append(('{} lows  ...\n{}'.format(detail, str(lo))))
        if log_nan_only and not len(lo[lo[detail] != lo[detail]]):
            continue  # skip if no nans
    out(content)
    

[Edit: The challenge for them would be going back in time, for each NaN stock individually, until a latest value is found.]

Can Q provide us with a forward fill option for custom factors?

Yes, that would be great. Just to explain to Q why forward filling is an essential tool to have:

If you are doing machine learning on a time series, you need the NaN values converted to numbers. The mean, mode, or similar would typically suffice for data that is NOT a time series, but time series are heavily autocorrelated. Because of that autocorrelation, the only logical choice for replacing nulls is the last known value, which is why forward filling is essential for machine learning.
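As an aside, outside of pipeline pandas already does this per column; here is a made-up two-asset frame (hypothetical tickers, just to show the per-asset behavior we would want from pipeline too):

    import numpy as np
    import pandas as pd

    # dates x assets frame; ffill runs down each column, i.e. per asset
    df = pd.DataFrame({'AAA': [1.0, np.nan, np.nan],
                       'BBB': [np.nan, 2.0, np.nan]},
                      index=pd.date_range('2019-01-01', periods=3))
    print(df.fillna(method='ffill'))
    #             AAA  BBB
    # 2019-01-01  1.0  NaN
    # 2019-01-02  1.0  2.0
    # 2019-01-03  1.0  2.0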

Hi! You can use this code for backfilling:

    def nanbackfill(arr):
        # Reverse time and transpose so each row is one asset, fill along
        # the (reversed) time axis, then undo both: a per-asset backfill.
        arr = arr[::-1].T
        mask = np.isnan(arr)
        idx = np.where(~mask, np.arange(mask.shape[1]), 0)
        np.maximum.accumulate(idx, axis=1, out=idx)
        arr = arr[np.arange(idx.shape[0])[:, None], idx]
        return arr.T[::-1]

Regards,
Mark
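Following the same pattern as the nanbackfill snippet above, dropping the [::-1] reversal should give a per-asset forward fill along time. A sketch (my own adaptation, assuming the usual days x assets window that pipeline passes to compute, and not tested against the notebooks in this thread):

    import numpy as np

    def nanfill_per_asset(arr):
        # Transpose so each row is one asset, forward fill along time
        # (left to right), then transpose back. NaNs before an asset's
        # first reported value remain NaN.
        arr = arr.T
        mask = np.isnan(arr)
        idx = np.where(~mask, np.arange(mask.shape[1]), 0)
        np.maximum.accumulate(idx, axis=1, out=idx)
        arr = arr[np.arange(idx.shape[0])[:, None], idx]
        return arr.T

    # usage inside compute(), e.g.:
    # capital_expenditure = nanfill_per_asset(capital_expenditure)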

Has anybody from Quantopian had time yet to look into the incorrect forward filling of NaNs described above by me and Blue Seahawk?