Back to Community
Exponentially Weighted Moving Average & Standard Deviation in Pipeline

Scott shared the details of algorithm profiling that he recently completed on an algorithm using the new Pipeline API. This resulted in the addition of two new built in factors that are now available for use.

  • Exponentially weighted moving average (EWMA) - which allows you to calculate a moving average while weighting the importance of the data based on recency
  • Exponentially weighted moving standard deviation (EWMSTD) - which allows you to see the variance in the EWMA

The attached backtest was the backtest shared with us by Simon Thornington which resulted in the addition of these two new built in factors. I thought it relevant to share and have updated it to use the two new factors. Simon asked that I give a nod to Systematic Trading: A unique new method for designing trading and investing systems since most of the naive risk parity sizing etc came from his book.

Clone Algorithm
Total Returns
Max Drawdown
Benchmark Returns
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
import numpy as np
import pandas as pd
from scipy import stats
from pytz import timezone
import datetime
import math
import time
import functools
import random
import itertools
from statsmodels.stats.moment_helpers import cov2corr, corr2cov, se_cov
from quantopian.algorithm import attach_pipeline, pipeline_output
from quantopian.pipeline import Pipeline
from import USEquityPricing
from import morningstar
from quantopian.pipeline.factors import SimpleMovingAverage, Latest, EWMSTD, EWMA
from quantopian.pipeline import CustomFactor

PctDailyVolatilityTarget = 0.008
#DiversificationMultiplier = 1.5 # pre-calculated in Research
VolHalfLife = 5 * 4 # four week half life - mid-term
PositionInertiaLevel = 0.1
LeverageCap = 2.0
PortfolioSize = 200

# replaced with EWMA built in factor
# class EWMA(CustomFactor):
#     inputs = [USEquityPricing.close]
#     def compute(self, today, assets, out, close):
#         span = self.window_length / 2
#         df = pd.DataFrame(close,columns=assets)
#         out[:] = pd.ewma(df,span=span).iloc[-1]

# replaced with EWMSTD built in factor
# class EWMSTD(CustomFactor):
#     inputs = [USEquityPricing.close]
#     def compute(self, today, assets, out, close):
#         span = self.window_length / 2
#         df = pd.DataFrame(close,columns=assets)
#         out[:] = pd.ewmstd(df,span=span).iloc[-1]

def initialize(context):
    pipe = Pipeline()
    pipe = attach_pipeline(pipe, name='pipeline')
    ewma16 = EWMA.from_span([USEquityPricing.close], window_length=32, span=16)
    pipe.add(ewma16, "ewma16")
    ewma32 = EWMA.from_span([USEquityPricing.close], window_length=64, span=32)
    pipe.add(ewma32, "ewma32")
    ewma64 = EWMA.from_span([USEquityPricing.close], window_length=128, span=64)
    pipe.add(ewma64, "ewma64")
    ewma128 = EWMA.from_span([USEquityPricing.close], window_length=256, span=128)
    pipe.add(ewma128, "ewma128")
    ewma256 = EWMA.from_span([USEquityPricing.close], window_length=512, span=256)
    pipe.add(ewma256, "ewma256")

    ewmstd64 = EWMSTD.from_span([USEquityPricing.close], window_length=128, span=64)
    pipe.add(ewmstd64, "ewmstd64")
    ewmstd128 = EWMSTD.from_span([USEquityPricing.close], window_length=256, span=128)
    pipe.add(ewmstd128, "ewmstd128")
    ewmstd256 = EWMSTD.from_span([USEquityPricing.close], window_length=512, span=256)
    pipe.add(ewmstd256, "ewmstd256")

    not_penny = Latest(inputs=[USEquityPricing.close]) > 1.0
    volume = SimpleMovingAverage(inputs=[USEquityPricing.volume], window_length=20)
    volume_rank = volume.rank(ascending=False)
    mask = (not_penny &
    # calculate our allocation every day mid-morning
    schedule_function(allocation, date_rule=date_rules.every_day(), time_rule=time_rules.market_open(hours=1))
    schedule_function(cancel_all, date_rule=date_rules.every_day(), time_rule=time_rules.market_close(minutes=1))

def calc_ewmac(ewma_short, ewma_long, ewmstd, forecast_scalar):
    raw_ewmac = ewma_short - ewma_long
    price_ewmstd = ewmstd * ewma_short
    ewmac = (raw_ewmac / price_ewmstd).dropna(axis=0)
    adj_forecast = ewmac * forecast_scalar
    adj_forecast[adj_forecast > 20.0] = 20.0
    adj_forecast[adj_forecast < -20.0] = -20.0
    return adj_forecast

def before_trading_start(context, data):
    results = pipeline_output('pipeline').dropna()
    a = calc_ewmac(results['ewma16'], results['ewma64'], results['ewmstd64'], 4) # forecast_scalars backtested in research
    b = calc_ewmac(results['ewma32'], results['ewma128'], results['ewmstd128'], 2)
    c = calc_ewmac(results['ewma64'], results['ewma256'], results['ewmstd256'], 1.25)
    combined = (0.4*a + 0.2*b + 0.4*c) * 1.1 # forecast diversification multiplier, from book
    combined[combined > 20.0] = 20.0
    combined[combined < -20.0] = -20.0
    context.mean_forecast = combined.mean()
    half = PortfolioSize / 2
    garbage = combined.index[half:-half]
    remaining = combined.drop(garbage)
    context.forecasts = remaining
def handle_data(context, data):

def allocation(context, data):
    daily_cash_volatility_target = PctDailyVolatilityTarget*context.portfolio.portfolio_value
    closes = history(256, "1d", "price")
    # initialize weights to 0
    weights = pd.Series([0], index=closes.columns)
    # then equal-weight all our desired universe.  This will cause us to sell those things
    # that drop out because their forecasts worsened (relatively)
    weights.update(pd.Series([1.0/float(PortfolioSize)], index=context.forecasts.index))
    positions = calc_target_position(closes, 
    prices = closes.iloc[-1]
    print (positions*prices).sum()
    std = calc_std(np.log(closes).diff().dropna())

    for x in positions.index:
        desired_position = positions[x]
        current_position = context.portfolio.positions[x].amount
        delta = desired_position - current_position
        off_by = abs(delta / current_position)
        if ((off_by > PositionInertiaLevel) | (desired_position == 0)):
            price = closes[x].iloc[-1] 
            if ((delta > 0) & (context.account.leverage < LeverageCap)):
      "BUY %d %s @ MARKET (currently at %03.02f)" % (delta, x.symbol, price))
                order(x, delta, style=MarketOrder())
                # don't mess around when deleveraging
      "SELL %d %s @ MARKET (currently at %03.02f)" % (delta, x.symbol, price))
                order(x, delta, style=MarketOrder())

def cancel_all(context, data):
    sids_cancelled = set()
    logged_cancel = False
    open_orders = get_open_orders()
    for security, orders in open_orders.iteritems():  
        for oo in orders:
            if (not logged_cancel):
                log.warn("Cancelling orders at close")
                logged_cancel = True
    return sids_cancelled 
def floor_corr(corr):
    corr[corr<0] = 0
    return corr

def calc_std(returns):
    downside_only = False
    if (downside_only):
        returns = returns.copy()
        returns[returns > 0.0] = np.nan
    b = pd.ewmstd(returns, halflife=VolHalfLife, adjust=True, ignore_na=True).dropna()
    return b.iloc[-1]

def calc_vol_scalar(prices, daily_cash_vol_target):
    shares_per_block = 1.0
    # ignore FX
    rets = np.log(prices).diff().dropna()
    block_value = (shares_per_block * prices.iloc[-1])
    price_vol = calc_std(rets)
    # instrument_currency_volatility not necessary since we don't have FX
    instrument_value_volatility = block_value * price_vol
    volatility_scalar = daily_cash_vol_target / instrument_value_volatility
    return volatility_scalar

def pipeline_forecast(context):
    return context.forecasts

def calc_instrument_diversification_multiplier(prices, instrument_weights):
    rets = np.log(prices).diff().dropna()
    corr = floor_corr(rets.corr())
    return 1.0 / np.sqrt(,,instrument_weights.T)))

def calc_target_position(prices, forecast, daily_cash_vol_target, instrument_weights):
    volatility_scalar = calc_vol_scalar(prices, daily_cash_vol_target)
    subsystem_position_blocks = (forecast * volatility_scalar) / 10.0
    diversification_multiplier = calc_instrument_diversification_multiplier(prices, instrument_weights)
#    diversification_multiplier = DiversificationMultiplier
    portfolio_position_blocks = subsystem_position_blocks * instrument_weights * diversification_multiplier
    return portfolio_position_blocks.round()
There was a runtime error.

The material on this website is provided for informational purposes only and does not constitute an offer to sell, a solicitation to buy, or a recommendation or endorsement for any security or strategy, nor does it constitute an offer to provide investment advisory services by Quantopian. In addition, the material offers no opinion with respect to the suitability of any security or specific investment. No information contained herein should be regarded as a suggestion to engage in or refrain from any investment-related course of action as none of Quantopian nor any of its affiliates is undertaking to provide investment advice, act as an adviser to any plan or entity subject to the Employee Retirement Income Security Act of 1974, as amended, individual retirement account or individual retirement annuity, or give advice in a fiduciary capacity with respect to the materials presented herein. If you are an individual retirement or other investor, contact your financial advisor or other fiduciary unrelated to Quantopian about whether any given investment idea, strategy, product or service described herein may be appropriate for your circumstances. All investments involve risk, including loss of principal. Quantopian makes no guarantees as to the accuracy or completeness of the views expressed in the website. The views are subject to change, and may have become unreliable for various reasons, including changes in market conditions or economic circumstances.

12 responses

I would just like to point out that I am not trading this algo or anything, but it has a good starting point for the sort of naive risk parity position sizing Rob Carver describes in his book!

Thank you for interesting algo. I am reading Carver’s book as well and like his approach of determining the position size based on desired volatily, underlying volatility and the forecast.
But It looks like the poor performance (negative Sharp) of the algo comes from under-investment. The algo invests only 0.008 of the capital or $8000 out of $1M.
In think the idea in Carver’s book was to volatility adjust the positions to reach the optimal volatility of the total portfolio. So if volatility of S&P is 12.8%, the algo should invest 100% into equities to reach e.g. 0.008 daily or 12.8% annualized. In your algo, the volatility of the portfolio is close to 0.

If you have found an error, please share the fixed algo!

Simon, unfortunately i couldn't make the algo work as desired :(
On the side note, the volatility adjusted position sizing, described in Carver's book works, makes more sense then applied with low correlated instrument. For now, Q doesn't allow futures trading, but one should achieve similar results with futures proxies ETFs which would replicate the diversified asset classes (equity , commodities, bonds, etc).
It would be great if someone applied the logic from Carvers' book to a trend follow algo here

The book is pretty clear, what is the issue implementing his methodology? I am using it for a couple of systems, but I can't post them.

I agree that the book explains very well the methodology, it's just me lacking python skills to implement it on Q.
Do you use it with Quantopian and ETFs as proxies or running on a different platform with futures?

One thing I'm not clear on looking at this algo is how the forecast scalars were calculated.
In the algo it says "from research".
In Robert's book he also just says "these are numbers which I've found from multiple tests" but I would love to know how I could work them out myself.

Here is Carver's explanation how to calculate the forecast scalar

I used bootstrapped simulations for most of those numbers. I shared that technique in a research notebook last year.

Thanks Simon and Maxim.

Thanks guys

I tried cloning this algo and got 2 errors that impede having the algo do a build.:
100 Warning Undefined name 'update_universe'
121 Warning Local variable 'std' is assigned to but never used

Being a neophyte here, could someone point out how to fix them?