measuring money flows?

In the context of Quantopian, is there any way to estimate money flows? For example, during the 2007-2008 financial crisis, I figure that money was flowing quickly out of stocks and into more secure investments. As the market has recovered, money has been flowing back into the market, right? I'd like to write an algorithm that would illustrate the relative flow in and out of the market, over the 2007-2013 time frame. Any guidance?

Thanks,

Grant

15 responses

It looks like WSJ has Dow Jones money flow data by sector. The daily historical tables go back to August 2010.

In the same section the WSJ has a list of individual stocks with the highest outflow of money and the largest inflow of money. These two charts are updated every 15 minutes and are available as a tab-delimited text file. The daily historical record for these tables goes back to May 2007.

ICI has an interesting spreadsheet of monthly mutual fund money flow from 2007 onward. It is broken down by region (world/domestic) and asset type (equity/bond).

This might be useful to validate the Dow Jones data.

The US Treasury Dept has a nice monthly CSV file of Cross-Border Portfolio Financial Flows going back to 1978.

When using this dataset, keep in mind that it is lagged by two months (e.g. April data is not released until June).

Thanks Dennis,

Would there be any way to use the Quantopian dataset to obtain the indicator? Admittedly, I have not done my homework here, but I figure that there ought to be some way to show that, for example, during the 2007-2008 crisis, investors shifted out of stocks and into bonds/cash/etc. The indicator should be pretty dramatic for this time period, and might shed some light on how the market behaves when there is fear, uncertainty and doubt (FUD).

Grant

As it stands, I think the Quantopian dataset only provides unsigned volume (i.e. it doesn't indicate which direction the money is going).

However, money outflow is very likely to depress the trading price, since supply then exceeds demand.

Using this assumption it is possible to roughly estimate the money flow from the OHLCV dataset provided by Quantopian.

You could then calculate the money flow indicator for bellwether securities (SPY, BND, UUP, GLD), which would give you a sense of enthusiasm for stocks, bonds, cash and gold. You could probably do the same for different sector ETFs.

http://www.investopedia.com/terms/m/moneyflow.asp
http://en.wikipedia.org/wiki/Money_flow_index
http://stockcharts.com/school/doku.php?id=chart_school:technical_indicators:chaikin_money_flow
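
As a rough illustration of the estimate described above, here is a minimal NumPy sketch. It assumes that a day whose typical price rose saw net inflow and a day whose typical price fell saw net outflow. The function name `signed_money_flow` and the sample numbers are made up for illustration, and the result is a heuristic rather than a measurement of real fund flows.

```python
import numpy as np

def signed_money_flow(high, low, close, volume):
    """Estimate daily signed money flow from OHLCV bars (heuristic)."""
    high, low, close, volume = (np.asarray(a, dtype=float)
                                for a in (high, low, close, volume))
    typical = (high + low + close) / 3.0
    # +1 when the typical price rose, -1 when it fell, 0 when unchanged
    direction = np.sign(np.diff(typical))
    # The first bar has no prior day to compare against, so it is dropped.
    return direction * typical[1:] * volume[1:]

# Made-up prices and volumes, for illustration only.
flow = signed_money_flow(
    high=[10.0, 11.0, 10.5],
    low=[9.0, 10.0, 9.5],
    close=[9.5, 10.5, 10.0],
    volume=[1000, 2000, 1500],
)
```

Positive entries are estimated inflows and negative entries are estimated outflows, in price times share units.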

Thanks Dennis,

The Wikipedia site provides "steps to calculate the Money flow index over N days" that can be implemented in Quantopian straightforwardly.
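
For reference, those steps might look roughly like this in plain NumPy, outside of any Quantopian API. This is only a sketch: the function name `money_flow_index` and the default `n=14` are assumptions, and it uses the form 100 * positive / (positive + negative), which is algebraically the same as 100 - 100 / (1 + money ratio) but avoids dividing by zero when there is no negative flow.

```python
import numpy as np

def money_flow_index(high, low, close, volume, n=14):
    """Money flow index over the last n day-over-day changes."""
    high, low, close, volume = (np.asarray(a, dtype=float)
                                for a in (high, low, close, volume))
    if len(close) < n + 1:
        raise ValueError("need at least n + 1 bars")
    typical = (high + low + close) / 3.0
    raw_flow = typical * volume
    change = np.diff(typical)[-n:]   # last n changes in typical price
    raw = raw_flow[1:][-n:]          # money flow on those same days
    positive = raw[change > 0].sum()
    negative = raw[change < 0].sum()
    total = positive + negative
    if total == 0:
        return float("nan")          # undefined when nothing moved
    return 100.0 * positive / total

# Made-up bars, for illustration only.
mfi = money_flow_index(
    high=[10.0, 11.0, 10.5],
    low=[9.0, 10.0, 9.5],
    close=[9.5, 10.5, 10.0],
    volume=[1000, 2000, 1500],
    n=2,
)
```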

Grant

Dennis,

Here's my first crack at computing the money flow index, for SPY & BND. I haven't checked the code line-by-line, but it sorta yields the anticipated behavior. If you find bugs or identify improvements, please let me know.

Grant

# Reference:
# 
# Money flow index
# http://en.wikipedia.org/wiki/Money_flow_index

import numpy as np

# globals for batch transform decorator
R_P = 1 # refresh period in days
W_L = 30 # window length in days
        
def initialize(context):
    
    context.stocks = [sid(8554),sid(33652)] # SPY & BND

def handle_data(context, data):
        
    # get MFI
    MFI = get_MFI(data, context.stocks)
    if MFI is None:
        return
    
    print MFI
    
    record(MFI_SPY = MFI[-1,0])
    record(MFI_BND = MFI[-1,1])
    
@batch_transform(refresh_period=R_P, window_length=W_L) # set globals R_P & W_L above
def get_MFI(datapanel,sids):
    H = datapanel['high'].as_matrix(sids)
    L = datapanel['low'].as_matrix(sids)
    C = datapanel['price'].as_matrix(sids)
    V = datapanel['volume'].as_matrix(sids)
    
    typical_price = np.add(H,L)
    typical_price = np.add(typical_price,C)
    typical_price = np.divide(typical_price,3)
    
    sign_MF = np.subtract(typical_price[1:,:],typical_price[0:-1,:])
    sign_MF = np.sign(sign_MF)
    
    MF = np.multiply(sign_MF, typical_price[1:,:])
    MF = np.multiply(MF,V[1:,:])
    
    positive_MF = np.copy(MF)
    positive_MF[positive_MF <= 0] = 0
    positive_MF = np.cumsum(positive_MF, axis = 0)
    
    negative_MF = np.copy(MF)
    negative_MF[negative_MF >= 0] = 0
    negative_MF = np.absolute(negative_MF)
    negative_MF = np.cumsum(negative_MF, axis = 0)
    
    denom = np.add(positive_MF, negative_MF)
    MFI = 100*np.divide(positive_MF,denom)
    
    return MFI

The custom data plot looks very convincing. Nice job.

Here is a strategy that goes "all in" either SPY or BND depending on which MFI indicator is bigger. The code is flexible enough to use other daily rebalancing schemes if desired.
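
To make the weighting easier to follow on its own, here is a standalone sketch of how two MFI readings can be turned into portfolio weights with a `ratio_boost` parameter. The function name `mfi_weights` is hypothetical, and the even split on a tie is an assumed fallback: with `ratio_boost` at 1.0 and two equal readings the denominator is zero, so some choice has to be made there.

```python
def mfi_weights(spy, bnd, ratio_boost=0.0):
    """Turn two positive MFI readings into (spy_weight, bnd_weight).

    ratio_boost = 0.0 gives weights proportional to the readings;
    ratio_boost = 1.0 puts everything into the larger reading.
    """
    spy, bnd = float(spy), float(bnd)
    if spy <= 0 or bnd <= 0:
        return None                      # no usable signal
    base = min(spy, bnd) * ratio_boost   # amount subtracted from both sides
    denom = spy + bnd - 2 * base
    if denom == 0:
        return (0.5, 0.5)                # assumed fallback for an exact tie
    return ((spy - base) / denom, (bnd - base) / denom)

proportional = mfi_weights(60, 40)          # plain 60/40 split
all_in = mfi_weights(60, 40, ratio_boost=1.0)
tie = mfi_weights(50, 50, ratio_boost=1.0)
```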

# Reference:
# 
# Money flow index
# http://en.wikipedia.org/wiki/Money_flow_index

import numpy as np

# globals for batch transform decorator
R_P = 1 # refresh period in days
W_L = 30 # window length in days
        
def initialize(context):
    
    context.stocks = [sid(8554),sid(33652)] # SPY & BND

def handle_data(context, data):
        
    # get MFI
    MFI = get_MFI(data, context.stocks)
    if MFI is None:
        return
    
    print MFI
    
    record(MFI_SPY = MFI[-1,0])
    record(MFI_BND = MFI[-1,1])
    
    rebalance(context, data, MFI[-1,0], MFI[-1,1], 1.0)
    #rebalance(context, data, MFI[-1,0], MFI[-1,1])
    #rebalance(context, data, 60, 40)
    #rebalance(context, data)
    
# rebalance portfolio according to MFI ratio
def rebalance(context, data, spy=1.0, bnd=1.0, ratio_boost=0.0):
    
    spy = float(spy)
    bnd = float(bnd)
    
    if spy <= 0 or bnd <= 0:
        return

    cash = context.portfolio.starting_cash + context.portfolio.pnl
    
    sid_spy = context.stocks[0]
    sid_bnd = context.stocks[1]
    
    base = min(spy, bnd) * ratio_boost  # factor to force ratio to be more extreme [0..1]
    
    spy_ratio = (spy - base)/(spy + bnd - 2*base)
    bnd_ratio = (bnd - base)/(spy + bnd - 2*base)
    
    qty_spy = context.portfolio.positions[sid_spy].amount
    qty_bnd = context.portfolio.positions[sid_bnd].amount
    
    spy_price = data[sid_spy].price
    bnd_price = data[sid_bnd].price
    
    lotsize = 100
    
    shares_spy = int(cash * spy_ratio / spy_price / lotsize) * lotsize
    shares_bnd = int(cash * bnd_ratio / bnd_price / lotsize) * lotsize
    
    order(sid_spy, shares_spy - qty_spy)
    order(sid_bnd, shares_bnd - qty_bnd)

@batch_transform(refresh_period=R_P, window_length=W_L) # set globals R_P & W_L above
def get_MFI(datapanel,sids):
    H = datapanel['high'].as_matrix(sids)
    L = datapanel['low'].as_matrix(sids)
    C = datapanel['price'].as_matrix(sids)
    V = datapanel['volume'].as_matrix(sids)
    
    typical_price = np.add(H,L)
    typical_price = np.add(typical_price,C)
    typical_price = np.divide(typical_price,3)
    
    sign_MF = np.subtract(typical_price[1:,:],typical_price[0:-1,:])
    sign_MF = np.sign(sign_MF)
    
    MF = np.multiply(sign_MF, typical_price[1:,:])
    MF = np.multiply(MF,V[1:,:])
    
    positive_MF = np.copy(MF)
    positive_MF[positive_MF <= 0] = 0
    positive_MF = np.cumsum(positive_MF, axis = 0)
    
    negative_MF = np.copy(MF)
    negative_MF[negative_MF >= 0] = 0
    negative_MF = np.absolute(negative_MF)
    negative_MF = np.cumsum(negative_MF, axis = 0)
    
    denom = np.add(positive_MF, negative_MF)
    MFI = 100*np.divide(positive_MF,denom)
    
    return MFI

Here is a version of the "all in" strategy that is tailored for small accounts. It starts with $10k and enforces a 3 day waiting period after any sell order. To achieve the waiting period it either goes all in SPY or waits in cash.

Another change is the use of a smoothing function on the MFI indicator. In this case I'm using an exponential average with a decay of 10%. In other words, each new MFI value contributes only 10% to the smoothed value; the other 90% carries over from the previously remembered value.
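
In isolation, the smoothing update can be sketched as follows. The helper name `smooth` and the sample readings are made up for illustration. Note that when the running value starts at 0, it takes a number of updates before it gets close to the raw readings.

```python
def smooth(previous, new_value, decay=0.1):
    """Exponential average: the new value gets weight `decay`."""
    return previous * (1.0 - decay) + new_value * decay

smoothed = 0.0                      # running value starts at zero
for raw in [50.0, 50.0, 50.0]:      # made-up raw MFI readings
    smoothed = smooth(smoothed, raw)
# After three updates the value is still well below the raw reading of 50.
```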

# Reference:
# 
# Money flow index
# http://en.wikipedia.org/wiki/Money_flow_index

import numpy as np

# globals for batch transform decorator
R_P = 1 # refresh period in days
W_L = 30 # window length in days
        
def initialize(context):
    
    context.stocks = [sid(8554),sid(33652)] # SPY, BND
    
    context.MFI_SPY = 0   # variable to hold exponential average of MFI_SPY
    context.MFI_BND = 0   # ditto for MFI_BND

    context.lotsize = 25  # block size of shares to purchase
    
    context.settle = 3    # waiting period after a SELL order
    context.wait = 0      # counter of current wait

def handle_data(context, data):
        
    # get MFI
    MFI = get_MFI(data, context.stocks)
    if MFI is None:
        return
    
    #print MFI
    
    decay = .1  # decay rate for exponential average [0..1]
    
    context.MFI_SPY = context.MFI_SPY*(1.0-decay) + MFI[-1,0]*decay
    context.MFI_BND = context.MFI_BND*(1.0-decay) + MFI[-1,1]*decay
    
    record(MFI_SPY = context.MFI_SPY)
    record(MFI_BND = context.MFI_BND)
    
    if not context.wait:   # don't rebalance position if in waiting period
        long_or_cash(context, data, context.MFI_SPY, context.MFI_BND, 1.0)
    else:
        context.wait -= 1  # decrement waiting counter

# long SPY or wait in cash, according to MFI ratio
def long_or_cash(context, data, spy=1.0, bnd=1.0, ratio_boost=0.0):
    
    spy = float(spy)
    bnd = float(bnd)
    
    if spy <= 0 or bnd <= 0:
        return

    cash = context.portfolio.starting_cash + context.portfolio.pnl
    
    sid_spy = context.stocks[0]
    
    base = min(spy, bnd) * ratio_boost  # factor to force ratio to be more extreme [0..1]
    
    spy_ratio = (spy - base)/(spy + bnd - 2*base)
    
    qty_spy = context.portfolio.positions[sid_spy].amount
    
    spy_price = data[sid_spy].price
    
    shares_spy = int(cash * spy_ratio / spy_price / context.lotsize) * context.lotsize
    
    if shares_spy < qty_spy:
        context.wait = context.settle
    
    order(sid_spy, shares_spy - qty_spy)

@batch_transform(refresh_period=R_P, window_length=W_L) # set globals R_P & W_L above
def get_MFI(datapanel,sids):
    H = datapanel['high'].as_matrix(sids)
    L = datapanel['low'].as_matrix(sids)
    C = datapanel['price'].as_matrix(sids)
    V = datapanel['volume'].as_matrix(sids)
    
    typical_price = np.add(H,L)
    typical_price = np.add(typical_price,C)
    typical_price = np.divide(typical_price,3)
    
    sign_MF = np.subtract(typical_price[1:,:],typical_price[0:-1,:])
    sign_MF = np.sign(sign_MF)
    
    MF = np.multiply(sign_MF, typical_price[1:,:])
    MF = np.multiply(MF,V[1:,:])
    
    positive_MF = np.copy(MF)
    positive_MF[positive_MF <= 0] = 0
    positive_MF = np.cumsum(positive_MF, axis = 0)
    
    negative_MF = np.copy(MF)
    negative_MF[negative_MF >= 0] = 0
    negative_MF = np.absolute(negative_MF)
    negative_MF = np.cumsum(negative_MF, axis = 0)
    
    denom = np.add(positive_MF, negative_MF)
    MFI = 100*np.divide(positive_MF,denom)
    
    return MFI

My previous version stayed out of the market about 50% of the time. Here is a version (still tailored for small accounts) that switches between SPY and BND. In addition to the 3 day waiting period (following a sell) this version also enforces a 1 day waiting period after a buy.

# Reference:
# 
# Money flow index
# http://en.wikipedia.org/wiki/Money_flow_index

import numpy as np

# globals for batch transform decorator
R_P = 1 # refresh period in days
W_L = 30 # window length in days
        
def initialize(context):
    
    context.stocks = [sid(8554),sid(33652)] # SPY, BND
    
    context.MFI_SPY = 0   # variable to hold exponential average of MFI_SPY
    context.MFI_BND = 0   # ditto for MFI_BND

    context.lotsize = 50  # block size of shares to purchase
    
    context.settle = 3    # waiting period after a SELL order
    context.wait = 0      # counter of current wait

def handle_data(context, data):
        
    # get MFI
    MFI = get_MFI(data, context.stocks)
    if MFI is None:
        return
    
    #print MFI
    
    decay = .1  # decay rate for exponential average [0..1]
    
    context.MFI_SPY = context.MFI_SPY*(1.0-decay) + MFI[-1,0]*decay
    context.MFI_BND = context.MFI_BND*(1.0-decay) + MFI[-1,1]*decay
    
    record(MFI_SPY = context.MFI_SPY)
    record(MFI_BND = context.MFI_BND)
    
    if not context.wait:   # don't rebalance position if in waiting period
        spy_vs_bnd(context, data, context.MFI_SPY, context.MFI_BND, 1.0)
    else:
        context.wait -= 1  # decrement waiting counter

# long SPY or BND, with waiting period, according to MFI ratio
def spy_vs_bnd(context, data, spy=1.0, bnd=1.0, ratio_boost=0.0):
    
    spy = float(spy)
    bnd = float(bnd)
    
    if spy <= 0 or bnd <= 0:
        return

    cash = context.portfolio.starting_cash + context.portfolio.pnl
    
    sid_spy = context.stocks[0]
    sid_bnd = context.stocks[1]
    
    base = min(spy, bnd) * ratio_boost  # factor to force ratio to be more extreme [0..1]
    
    spy_ratio = (spy - base)/(spy + bnd - 2*base)
    bnd_ratio = (bnd - base)/(spy + bnd - 2*base)
    
    qty_spy = context.portfolio.positions[sid_spy].amount
    qty_bnd = context.portfolio.positions[sid_bnd].amount
    
    spy_price = data[sid_spy].price
    bnd_price = data[sid_bnd].price
    
    shares_spy = int(cash * spy_ratio / spy_price / context.lotsize) * context.lotsize
    shares_bnd = int(cash * bnd_ratio / bnd_price / context.lotsize) * context.lotsize
    
    if shares_spy < qty_spy:
        context.wait = context.settle
        order(sid_spy, shares_spy - qty_spy)

    elif shares_bnd < qty_bnd:
        context.wait = context.settle
        order(sid_bnd, shares_bnd - qty_bnd)

    elif shares_spy > qty_spy:
        context.wait = 1
        order(sid_spy, shares_spy - qty_spy)

    elif shares_bnd > qty_bnd:
        context.wait = 1
        order(sid_bnd, shares_bnd - qty_bnd)

@batch_transform(refresh_period=R_P, window_length=W_L) # set globals R_P & W_L above
def get_MFI(datapanel,sids):
    H = datapanel['high'].as_matrix(sids)
    L = datapanel['low'].as_matrix(sids)
    C = datapanel['price'].as_matrix(sids)
    V = datapanel['volume'].as_matrix(sids)
    
    typical_price = np.add(H,L)
    typical_price = np.add(typical_price,C)
    typical_price = np.divide(typical_price,3)
    
    sign_MF = np.subtract(typical_price[1:,:],typical_price[0:-1,:])
    sign_MF = np.sign(sign_MF)
    
    MF = np.multiply(sign_MF, typical_price[1:,:])
    MF = np.multiply(MF,V[1:,:])
    
    positive_MF = np.copy(MF)
    positive_MF[positive_MF <= 0] = 0
    positive_MF = np.cumsum(positive_MF, axis = 0)
    
    negative_MF = np.copy(MF)
    negative_MF[negative_MF >= 0] = 0
    negative_MF = np.absolute(negative_MF)
    negative_MF = np.cumsum(negative_MF, axis = 0)
    
    denom = np.add(positive_MF, negative_MF)
    MFI = 100*np.divide(positive_MF,denom)
    
    return MFI

Thanks Dennis,

Glad to see that you had some fun with it!

Grant

Hello Dennis,

I tweaked the code a bit so that with a single call to the batch transform, the MFI over a range of N would be returned as a matrix. Each column corresponds, respectively, to a sid:

context.stocks = [sid(8554),sid(33652)] # SPY & BND  

So, columns 0 & 1 correspond to the MFIs for SPY & BND, respectively.

Each row of the returned matrix corresponds to the MFI computed over N days. Row 0 corresponds to 1 day, row 1 to 2 days, and so on, up to a maximum of W_L-1 days (where W_L is the length of the trailing window used by the batch transform).

This way, if one wanted to write a trading algorithm that compared, say, MFI(N=30 days) to MFI(N=60 days), only a single call would be required to the batch transform. In general, it facilitates more complex analyses of the MFI versus N.
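
The idea can be sketched outside of the batch transform like this: reverse the daily positive and negative flows so the most recent day comes first, then take cumulative sums, so that row k holds the totals over the last k+1 days. The function name `mfi_by_lookback` is hypothetical, and returning NaN where no money moved is an assumption. The same code works for a 2-D input with one column per security.

```python
import numpy as np

def mfi_by_lookback(typical, volume):
    """Row k of the result is the MFI over the most recent k + 1 days."""
    typical = np.asarray(typical, dtype=float)
    volume = np.asarray(volume, dtype=float)
    change = np.diff(typical, axis=0)
    flow = typical[1:] * volume[1:]
    pos = np.where(change > 0, flow, 0.0)
    neg = np.where(change < 0, flow, 0.0)
    # Reverse so row 0 is the most recent day, then accumulate backwards.
    pos_cum = np.cumsum(pos[::-1], axis=0)
    neg_cum = np.cumsum(neg[::-1], axis=0)
    total = pos_cum + neg_cum
    with np.errstate(divide="ignore", invalid="ignore"):
        return np.where(total > 0, 100.0 * pos_cum / total, np.nan)

# Made-up typical prices and volumes, for illustration only.
table = mfi_by_lookback([9.5, 10.5, 10.0], [1000, 2000, 1500])
one_day, two_day = table[0], table[1]
```

Comparing two lookbacks is then just a matter of indexing two rows of the same result.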

Grant

# Reference:
# 
# Money flow index
# http://en.wikipedia.org/wiki/Money_flow_index

import numpy as np

# globals for batch transform decorator
R_P = 1 # refresh period in days
W_L = 61 # window length in days
        
def initialize(context):
    
    context.stocks = [sid(8554),sid(33652)] # SPY & BND

def handle_data(context, data):
        
    # get MFI
    MFI = get_MFI(data, context.stocks)
    if MFI is None:
        return
    
    record(MFI_SPY = MFI[W_L-2,0])
    record(MFI_BND = MFI[W_L-2,1])
    record(MFI_diff = MFI[W_L-2,1]-MFI[W_L-2,0])
    
@batch_transform(refresh_period=R_P, window_length=W_L) # set globals R_P & W_L above
def get_MFI(datapanel,sids):
    H = datapanel['high'].as_matrix(sids)
    L = datapanel['low'].as_matrix(sids)
    C = datapanel['price'].as_matrix(sids)
    V = datapanel['volume'].as_matrix(sids)
    
    typical_price = np.add(H,L)
    typical_price = np.add(typical_price,C)
    typical_price = np.divide(typical_price,3)
    
    sign_MF = np.subtract(typical_price[1:,:],typical_price[0:-1,:])
    sign_MF = np.sign(sign_MF)
    
    MF = np.multiply(sign_MF, typical_price[1:,:])
    MF = np.multiply(MF,V[1:,:])
    
    positive_MF = np.copy(MF)
    positive_MF[positive_MF <= 0] = 0
    positive_MF = np.cumsum(np.flipud(positive_MF), axis = 0)
    
    negative_MF = np.copy(MF)
    negative_MF[negative_MF >= 0] = 0
    negative_MF = np.absolute(negative_MF)
    negative_MF = np.cumsum(np.flipud(negative_MF), axis = 0)
    
    denom = np.add(positive_MF, negative_MF)
    MFI = 100*np.divide(positive_MF,denom)
    
    return MFI

Thanks Dennis, Grant

Thanks for the great work on this algorithm. I seem to be getting an error message every time I try to build the algorithm posted by Dennis on 24 June 2013. The error message reads as follows:

KeyError: 'high'
There was a runtime error on line 28.

Any ideas on how I can resolve?

Bevan

A money flow index (MFI) Pipeline custom factor was published here:

https://www.quantopian.com/posts/ta-lib-for-pipeline

Here's the code (I haven't checked and tested it):


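import numpy as np
from quantopian.pipeline import CustomFactor
from quantopian.pipeline.data.builtin import USEquityPricing

class MFI(CustomFactor):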
    """  
    Money Flow Index

    Volume Indicator

    **Default Inputs:**  USEquityPricing.high, USEquityPricing.low, USEquityPricing.close, USEquityPricing.volume

    **Default Window Length:** 15 (14 + 1-day for difference in prices)

    http://www.fmlabs.com/reference/default.htm?url=MoneyFlowIndex.htm  
    """     

    inputs = [USEquityPricing.high, USEquityPricing.low, USEquityPricing.close, USEquityPricing.volume]  
    window_length = 15

    def compute(self, today, assets, out, high, low, close, vol):

        # calculate typical price  
        typical_price = (high + low + close) / 3.

        # calculate money flow of typical price  
        money_flow = typical_price * vol

        # get differences in daily typical prices  
        tprice_diff = (typical_price - np.roll(typical_price, 1, axis=0))[1:]

        # create masked arrays for positive and negative money flow  
        pos_money_flow = np.ma.masked_array(money_flow[1:], tprice_diff < 0, fill_value = 0.)  
        neg_money_flow = np.ma.masked_array(money_flow[1:], tprice_diff > 0, fill_value = 0.)

        # calculate money ratio  
        money_ratio = np.sum(pos_money_flow, axis=0) / np.sum(neg_money_flow, axis=0)

        # MFI  
        out[:] = 100. - (100. / (1. + money_ratio))