API Tool: Time/Event Management for minute data

Hello all,
I wanted to share some useful code to make switching to, and working with, minute data easier.

The aim is to make it easy to get from one day to the next, so that the only thing you need to worry about is your entry point during the day. It tends to shorten your handle_data function too, which is nice.

This backtest was run on daily data with batch_transform. The next backtest is the same algorithm switched to minute data, as an example of making the switch.

The algorithm is an MPT tangency portfolio. It imports the 3 month t-bill rates from Quandl to be used as the risk free rate.
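For readers who want to try the weight calculation on its own, here is a minimal standalone sketch of the tangency formula from the reference linked in the algorithm's docstring. The synthetic returns, the seed, the asset names and the `rfr` value are made up for illustration, and the sketch assumes `rfr` is a per-period decimal in the same units as the returns.

```python
import numpy as np
import pandas as pd

def tangency_weights(returns, rfr=0.0):
    """Weights proportional to inv(cov) . (mean - rfr), scaled to sum to 1."""
    cov = returns.cov().values
    excess = returns.mean().values - rfr
    raw = np.linalg.solve(cov, excess)  # equivalent to inv(cov).dot(excess)
    return pd.Series(raw / raw.sum(), index=returns.columns)

# Synthetic daily returns for three hypothetical assets.
rng = np.random.RandomState(0)
returns = pd.DataFrame(
    rng.normal(0.0005, 0.01, size=(250, 3)),
    columns=['A', 'B', 'C'],
)
weights = tangency_weights(returns, rfr=0.0001)
```

The weights always sum to 1 before leverage is applied, but individual weights can be negative (short positions) or larger than 1.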

Dave

'''
This algorithm uses the 3-month treasury bill rate as the risk free rate 
to construct a modern portfolio theory tangency portfolio.

Ref:
http://faculty.washington.edu/ezivot/econ424/portfolioTheoryMatrix.pdf
'''

import pandas as pd
import numpy as np 
import numpy.linalg as la
import math
import datetime
from pytz import timezone



# data accumulator for trailing window
@batch_transform(window_length=50)
def accumulate_data(data):
    return data


##############
# Math Utils #
##############

def tangent_portfolio(R, rfr=0.0, leverage=1.0):
    '''
    Solves the efficient frontier problem 
    given a risk free rate (rfr).
    '''
    c_inv = np.linalg.inv(R.cov())
    mu = R.mean()
    ones = np.ones(len(mu))
    rf = rfr * ones
    t = c_inv.dot(mu - rf) / ones.T.dot(c_inv.dot(mu - rf))
    return pd.Series(t, index=R.columns) * leverage


########################
# Quantopian Functions #
########################

def fetcher_pre_func(df):
    ''' This should convert % to decimals but it makes everything blow up '''
    # df['Value'] = df['Value'] / 100.0
    return df


def initialize(context):
    '''
    Called once at the very beginning of a backtest (and live trading). 
    Use this method to set up any bookkeeping variables.
    
    The context object is passed to all the other methods in your algorithm.

    Parameters

    context: An initialized and empty Python dictionary that has been 
             augmented so that properties can be accessed using dot 
             notation as well as the traditional bracket notation.
    
    Returns None
    '''
    # Set Account Leverage
    context.leverage = 1.0
    
    # Initialize treasury yield variable
    context.t_yield = 0.0
    
    # Define Universe
    context.sids = [ 
        sid(19656), sid(19655), sid(19660), sid(19658), 
        sid(19654), sid(19659), sid(19662), sid(19657),
        sid(19661),
    ]
    # Use the fetcher to import the 3 month t-bill rate
    fetch_csv('http://www.quandl.com/api/v1/datasets/FRED/DTB3.csv\
?request_source=python&request_version=2&sort_order=asc',
        date_column='Date',
        symbol='treasury_yield',
        post_func=fetcher_pre_func,
        date_format='%Y-%m-%d'
    )
    context.days_traded = 0
    context.rebal_days = 21
    

def handle_data(context, data):
    '''
    Called when a market event occurs for any of the algorithm's 
    securities. 

    Parameters

    data: A dictionary keyed by security id containing the current 
          state of the securities in the algo's universe.

    context: The same context object from the initialize function.
             Stores the up to date portfolio as well as any state 
             variables defined.

    Returns None
    '''
    # Set the risk free rate to the latest treasury yield. 
    # The first day throws an error, hence the try.
    try:
        context.t_yield = data['treasury_yield']['Value']
        record(treasury_yield=context.t_yield)
    except KeyError as e:
        log.debug('KeyError in t_yield: %s'%e)
        
        
    # Get trailing window of data, return if not enough data is accumulated
    datapanel = accumulate_data(data)
    
    # Wait until batch_transform has accumulated enough bars
    if datapanel is None:
        return 
    
    # Check if it's a rebalance day
    if context.days_traded % context.rebal_days != 0:
        context.days_traded += 1
        return
    context.days_traded += 1
    
    # Get target portfolio weights 
    prices = datapanel['price'][context.sids]
    returns = prices.pct_change().dropna()
    weights = tangent_portfolio(
        returns, 
        rfr=context.t_yield,
        leverage=context.leverage
    )
    # Place orders
    for sec in context.sids:
        order_target_percent(sec, weights[sec])

4 responses

This is the version run on minute data. It also shows how you can use the EventManager class to manage several different tasks.
I'd like to get some feedback on this. There are a few issues, but it's a good start.

'''
This algo is a demo of a time management class to help deal with 
dates, periodic events, and limiting the number of times per day
things can happen.


The idea is to make getting from one day to the next simple. That way
you only need to worry about an intraday entry/exit strategy.


Use Cases:

- Interday strategies that only rebalance periodically.

- Capping the number of times something can happen per day

- Periodic dynamic portfolio selection from a larger universe of stocks.

- Open/close of day functions for accounting or cancelling orders.



The algorithm itself uses the 3-month treasury bill rate as the risk free rate 
to construct a modern portfolio theory tangency portfolio.

Ref:
http://faculty.washington.edu/ezivot/econ424/portfolioTheoryMatrix.pdf
'''

import pandas as pd
import numpy as np 
import numpy.linalg as la
import math
import datetime
from pytz import timezone



class EventManager(object):
    '''
    Manager for periodic events.

    parameters

    period: integer            
        number of days between events
        default: 1

    max_daily_hits: integer
        upper limit on the number of times per day the event is triggered
        default: 1

    rule_func: function (returns a boolean)
        decision function for timing an intraday entry point

    work_func: not implemented
        the plan is to have a rule for when to execute, and a func to execute

    start_date: datetime.date
        initial date the event can take place
        default: datetime.date(1900,1,1)

    open_time: datetime.time
        earliest time in the intraday window 
        default: 9:31
    
    close_time: datetime.time
        latest time in the intraday window 
        default: 16:00

    tz: pytz.timezone
        All datetimes converted to this timezone
        default: US/Eastern
    '''

    def __init__(self, 
                 period=1,
                 rule_func=None,
                 work_func=None,
                 max_daily_hits=1,
                 start_date=datetime.date(1900,1,1),
                 open_time=datetime.time(9,31,0),
                 close_time=datetime.time(16,0,0),
                 tz=timezone('US/Eastern')):

        self.delta_t = datetime.timedelta(days=period)
        self.max_daily_hits = max_daily_hits
        self.remaining_hits = max_daily_hits
        self.start_date = start_date
        self.open_time = open_time
        self.close_time = close_time
        self.next_trade_date = self.start_date
        self.tz = tz
        self._rule_func = rule_func
        self._work_func = work_func

    def __call__(self, *args, **kwargs):
        '''
        Entry point for the rule_func
        All arguments are passed to rule_func

        Note: The next trade date will not update
              until max_daily_hits has been reached.

        '''
        now = get_datetime().astimezone(self.tz)
        if now.date() < self.next_trade_date:
            return False
        if not self.open_for_biz(now):
            return False
        decision = self._rule_func(*args, **kwargs)
        if decision:
            self.remaining_hits -= 1
            if self.remaining_hits <= 0:
                self.set_next_trade_date(now)
        return decision
    
    def set_next_trade_date(self, dt):
        self.remaining_hits = self.max_daily_hits
        today = dt.date()
        self.next_trade_date = today + self.delta_t

    def open_for_biz(self, dt):
        t = dt.astimezone(self.tz).time()
        closed_for_day = (t > self.close_time)
        open_for_day = (t >= self.open_time)
        return open_for_day and not closed_for_day
    
####################################
# Steps for using the EventManager #
####################################

# 1. Define intraday entry rule functions.

def start_of_day_entry(dt):
    '''
    Enter in the first 10 min  
    '''
    dt = dt.astimezone(timezone('US/Eastern'))
    return dt.hour == 9 and dt.minute <= 40
    
def end_of_day_entry(dt):
    '''
    Enter after 3:50pm
    '''
    dt = dt.astimezone(timezone('US/Eastern'))
    return dt.hour >= 15 and dt.minute > 50

# 2. Initialize instances with the rule functions

task_1_manager  = EventManager(
    period = 200, 
    rule_func = start_of_day_entry, 
    max_daily_hits = 2
)

task_2_manager  = EventManager(
    period=100, 
    rule_func=end_of_day_entry, 
    max_daily_hits=5
)
task_3_manager  = EventManager(
    period=2, 
    rule_func=start_of_day_entry, 
    max_daily_hits=1
)
task_N_manager  = EventManager(
    period=30,
    rule_func=end_of_day_entry, 
    max_daily_hits=1
)
# 3. Call from handle_data



def task_printer(x):
    market_time = get_datetime().astimezone(timezone('US/Eastern'))
    log.info('Task %s completed on %s'%(x, market_time))


##############
# Math Utils #
##############

def tangent_portfolio(R, rfr=0.0, leverage=1.0):
    '''
    Solves for the tangency portfolio weights
    given a risk free rate (rfr).
    '''
    c_inv = np.linalg.inv(R.cov())
    mu = R.mean()
    ones = np.ones(len(mu), dtype=float)
    rf = rfr * ones
    t = c_inv.dot(mu - rf) / ones.T.dot(c_inv.dot(mu - rf))
    return pd.Series(t, index=R.columns) * leverage

########################
# Quantopian Functions #
########################

def fetcher_pre_func(df):
    ''' This should convert % to decimals but it makes everything blow up '''
    # df['Value'] = df['Value'] / 100.0
    return df


def initialize(context):
    '''
    Called once at the very beginning of a backtest (and live trading). 
    Use this method to set up any bookkeeping variables.
    
    The context object is passed to all the other methods in your algorithm.

    Parameters

    context: An initialized and empty Python dictionary that has been 
             augmented so that properties can be accessed using dot 
             notation as well as the traditional bracket notation.
    
    Returns None
    '''
    # Set Account Leverage
    context.leverage = 1.0
    
    # Initialize treasury yield variable
    context.t_yield = 0.0
    
    # Define Universe
    context.sids = [ 
        sid(19656), sid(19655), sid(19660), sid(19658), 
        sid(19654), sid(19659), sid(19662), sid(19657),
        sid(19661),
    ]
    # Use the fetcher to import the 3 month t-bill rate
    fetch_csv('http://www.quandl.com/api/v1/datasets/FRED/DTB3.csv\
?request_source=python&request_version=2&sort_order=asc',
        date_column='Date',
        symbol='treasury_yield',
        post_func=fetcher_pre_func,
        date_format='%Y-%m-%d'
    )

def handle_data(context, data):
    '''
    Called when a market event occurs for any of the algorithm's 
    securities. 

    Parameters

    data: A dictionary keyed by security id containing the current 
          state of the securities in the algo's universe.

    context: The same context object from the initialize function.
             Stores the up to date portfolio as well as any state 
             variables defined.

    Returns None
    '''
    try:
        # set the risk free rate to the latest treasury yield
        context.t_yield = data['treasury_yield']['Value']
        record(treasury_yield=context.t_yield)
    except KeyError as e:
        log.debug('KeyError getting t_yield: %s'%e)
        
        
    # Check for entry signals from each manager
    now = get_datetime()
    if task_1_manager(now):
        # do task 1
        task_printer('1')
        
    if task_2_manager(now):
        # do task 2
        task_printer('2')
        
    if task_3_manager(now):
        # do task 3
        task_printer('3')
        
    if task_N_manager(now):
        # You get it
        task_printer('N')
        
        
        # Beginning of actual algorithm
        
        # Use history instead of batch_transform.
        prices = history(50, '1d', 'price')
        returns = prices.pct_change().dropna()
        weights = tangent_portfolio(
            returns, 
            rfr=context.t_yield, 
            leverage=context.leverage
        )
        # Place orders
        for sec in context.sids:
            order_target_percent(sec, weights[sec])
        
        
     

Currently the most glaring problem has to do with the maximum number of hits per day. The manager will not move on to a new trade date until the maximum number of hits has been reached. If max_daily_hits is greater than the number of times rule_func returns True in one day, the leftover hits carry over and the event can fire on the following days, even though they are not scheduled event days.
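One possible way around this, as a sketch only: reset the hit counter whenever a new event day starts, and schedule the following event date on the first call of that day rather than on the last hit. `DailyHitCounter` and its `allow` method are hypothetical names, not part of the posted EventManager. The sketch takes plain `datetime` objects so it runs outside Quantopian, and it counts calendar days like the version above.

```python
import datetime

class DailyHitCounter(object):
    """Hypothetical helper: caps hits per day and never carries
    unused hits over to the next day."""

    def __init__(self, period=1, max_daily_hits=1):
        self.delta_t = datetime.timedelta(days=period)
        self.max_daily_hits = max_daily_hits
        self.remaining_hits = max_daily_hits
        self.event_date = None  # the event day currently in progress
        self.next_event_date = datetime.date(1900, 1, 1)

    def allow(self, now, decision):
        today = now.date()
        if today != self.event_date:
            if today < self.next_event_date:
                return False
            # New event day: reset the counter and schedule the next one.
            self.event_date = today
            self.remaining_hits = self.max_daily_hits
            self.next_event_date = today + self.delta_t
        if not decision or self.remaining_hits <= 0:
            return False
        self.remaining_hits -= 1
        return True

counter = DailyHitCounter(period=2, max_daily_hits=3)
# One hit on the first event day, leaving two unused.
first = counter.allow(datetime.datetime(2014, 1, 2, 9, 35), True)
# The next day is inside the period, so nothing fires.
skipped = counter.allow(datetime.datetime(2014, 1, 3, 9, 35), True)
# On the next event day the counter starts again from three.
later = [counter.allow(datetime.datetime(2014, 1, 6, 9, 35 + i), True)
         for i in range(4)]
```

Here `decision` is whatever the rule function returned for the current bar, so the helper only handles the counting and the date bookkeeping.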

I plan to make another version that uses the trading calendar as well, so keep your eyes peeled.

Hey David,

"Business days" is pretty close to the trading calendar for most purposes.

-Russell

import pandas as pd  
from pandas.tseries.offsets import BDay

def getBDayHeld(context, data, stock):
    # Number of business days the stock has been held (0 if unknown)
    BDayHeld = 0
    exchange_time = pd.Timestamp(get_datetime()).tz_convert('US/Eastern')
    try:
        if stock.symbol in context.purchaseDate:
            # Daily range from the purchase date to today, reduced to business days
            s = pd.date_range(context.purchaseDate[stock.symbol], exchange_time.date(), freq='D')
            df = pd.DataFrame(0, index=s, columns=list('N'))
            BDayHeld = len(df.asfreq(BDay())) - 1
        else:
            log.info(" in getBDayHeld context.purchaseDate MISSING " + stock.symbol)
    except Exception:
        log.info(" EXCEPTION: in getBDayHeld context.purchaseDate MISSING " + stock.symbol)
    return BDayHeld
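
A shorter way to get the same kind of count, as a sketch: `pd.bdate_range` builds the business-day index directly. The function name `business_days_held` and the dates are illustrative only. Like `BDay`, this skips weekends but not exchange holidays, so it can overcount around holidays.

```python
import datetime
import pandas as pd

def business_days_held(purchase_date, current_date):
    """Business days elapsed between two dates (weekends skipped,
    exchange holidays not accounted for)."""
    days = pd.bdate_range(purchase_date, current_date)
    return max(len(days) - 1, 0)

# Thursday 2014-01-02 to Thursday 2014-01-09 spans one weekend.
held = business_days_held(datetime.date(2014, 1, 2), datetime.date(2014, 1, 9))
same_day = business_days_held(datetime.date(2014, 1, 2), datetime.date(2014, 1, 2))
```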

Here is a version that uses the trading calendar in Zipline. It is a little simpler, and max_daily_hits works as intended. It makes sure that the current time is between the market open and close of the event date, then calls rule_func to get an entry decision.
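
The date-stepping idea can be tried outside Quantopian with a plain pandas index. In this sketch `trading_days` is a stand-in built with `pd.bdate_range` (an assumption: the real Zipline calendar also excludes exchange holidays), and `next_event_date` is a hypothetical helper that mirrors the lookup-and-offset approach.

```python
import pandas as pd

# Stand-in for the trading calendar: weekdays only, no holidays removed.
trading_days = pd.bdate_range('2014-01-01', '2014-12-31')

def next_event_date(today, period):
    """Return the session `period` trading days after `today`."""
    idx = trading_days.searchsorted(pd.Timestamp(today))
    return trading_days[idx + period]

# Friday 2014-01-03 plus two sessions skips the weekend.
nxt = next_event_date('2014-01-03', 2)
```

Stepping by index position rather than by `timedelta` is what keeps the period in trading days instead of calendar days. Near the end of the index the lookup raises an IndexError, so a real version would need to guard against that.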

'''
This is a second version that uses the trading calendar in Zipline.

If it is between the open and close time on the event date, 
the passed rule_func is called for the entry decision. The
maximum daily hits works correctly for this version. 
'''

import pandas as pd
import numpy as np 
import numpy.linalg as la
import math
import datetime
from pytz import timezone
from zipline.utils import tradingcalendar as calendar


class EventManager(object):
    '''
    Manager for periodic events.

    parameters

    period: integer            
        number of business days between events
        default: 1

    max_daily_hits: integer
        upper limit on the number of times per day the event is triggered.
        (trading controls could work for this too)
        default: 1

    rule_func: function (returns a boolean)
        decision function for timing an intraday entry point
    '''

    def __init__(self, 
                 period=1,
                 rule_func=None,
                 max_daily_hits=1):
        
        self.period = period
        self.max_daily_hits = max_daily_hits
        self.remaining_hits = max_daily_hits
        self._rule_func = rule_func
        self.next_event_date = None
        self.market_open = None
        self.market_close = None
    
    @property
    def todays_index(self):
        dt = calendar.canonicalize_datetime(get_datetime())
        return calendar.trading_days.searchsorted(dt)
    
    def open_and_close(self, dt):
        return calendar.open_and_closes.T[dt]

    def __call__(self, *args, **kwargs):
        '''
        Entry point for the rule_func
        All arguments are passed to rule_func
        '''
        now = get_datetime()
        dt = calendar.canonicalize_datetime(now)
        if self.next_event_date is None:
            self.next_event_date = dt
            times = self.open_and_close(dt)
            self.market_open = times['market_open'] 
            self.market_close = times['market_close'] 
        if now < self.market_open:
            return False
        if now == self.market_close:
            self.set_next_event_date()
        decision = self._rule_func(*args, **kwargs)
        if decision:
            self.remaining_hits -= 1
            if self.remaining_hits <= 0:
                self.set_next_event_date()
        return decision
    
    def set_next_event_date(self):
        self.remaining_hits = self.max_daily_hits
        tdays = calendar.trading_days
        idx = self.todays_index + self.period
        self.next_event_date = tdays[idx]
        times = self.open_and_close(self.next_event_date)
        self.market_open = times['market_open']
        self.market_close = times['market_close']
        
    
####################################
# Steps for using the EventManager #
####################################

# 1. Define intraday entry rule functions.

def start_of_day_entry(dt):
    '''
    Enter in the first 10 min
    '''
    dt = dt.astimezone(timezone('US/Eastern'))
    return dt.hour == 9 and dt.minute <= 40
    
def end_of_day_entry(dt):
    '''
    Enter after 3:50pm
    '''
    dt = dt.astimezone(timezone('US/Eastern'))
    return dt.hour >= 15 and dt.minute > 50

# 2. Initialize instances with the rule functions

task_1_manager  = EventManager(
    period = 200, 
    rule_func = start_of_day_entry, 
    max_daily_hits = 1
)

task_2_manager  = EventManager(
    period=100, 
    rule_func=end_of_day_entry, 
    max_daily_hits=1
)
task_3_manager  = EventManager(
    period=50, 
    rule_func=start_of_day_entry, 
    max_daily_hits=3
)
task_N_manager  = EventManager(
    period=21,
    rule_func=end_of_day_entry, 
    max_daily_hits=1
)
# 3. Call from handle_data



def task_printer(x):
    market_time = get_datetime().astimezone(timezone('US/Eastern'))
    log.info('Task %s completed on %s'%(x, market_time))


##############
# Math Utils #
##############

def tangent_portfolio(R, rfr=0.0, leverage=1.0):
    '''
    Solves for the tangency portfolio weights
    given a risk free rate (rfr).
    '''
    c_inv = np.linalg.inv(R.cov())
    mu = R.mean()
    ones = np.ones(len(mu), dtype=float)
    rf = rfr * ones
    t = c_inv.dot(mu - rf) / ones.T.dot(c_inv.dot(mu - rf))
    return pd.Series(t, index=R.columns) * leverage

########################
# Quantopian Functions #
########################

def fetcher_pre_func(df):
    ''' This should convert % to decimals but it makes everything blow up '''
    #df['Value'] = df['Value'] / 100.0
    return df


def initialize(context):
    '''
    Called once at the very beginning of a backtest (and live trading). 
    Use this method to set up any bookkeeping variables.
    
    The context object is passed to all the other methods in your algorithm.

    Parameters

    context: An initialized and empty Python dictionary that has been 
             augmented so that properties can be accessed using dot 
             notation as well as the traditional bracket notation.
    
    Returns None
    '''
    # Set Account Leverage
    context.leverage = 1.0
    
    # Initialize treasury yield variable
    context.t_yield = 0.0
    
    # Define Universe
    context.sids = [ 
        sid(19656), sid(19655), sid(19660), sid(19658), 
        sid(19654), sid(19659), sid(19662), sid(19657),
        sid(19661),
    ]
    # Use the fetcher to import the 3 month t-bill rate
    fetch_csv('http://www.quandl.com/api/v1/datasets/FRED/DTB3.csv\
?request_source=python&request_version=2&sort_order=asc',
        date_column='Date',
        symbol='treasury_yield',
        post_func=fetcher_pre_func,
        date_format='%Y-%m-%d'
    )

def handle_data(context, data):
    '''
    Called when a market event occurs for any of the algorithm's 
    securities. 

    Parameters

    data: A dictionary keyed by security id containing the current 
          state of the securities in the algo's universe.

    context: The same context object from the initialize function.
             Stores the up to date portfolio as well as any state 
             variables defined.

    Returns None
    '''
    try:
        # set the risk free rate to the latest treasury yield
        context.t_yield = data['treasury_yield']['Value']
        record(treasury_yield=context.t_yield)
    except KeyError as e:
        log.debug('KeyError getting t_yield: %s'%e)
        
    # Check for entry signals from each manager
    now = get_datetime()
    if task_1_manager(now):
        # do task 1
        task_printer('1')
        
    if task_2_manager(now):
        # do task 2
        task_printer('2')
        
    if task_3_manager(now):
        # do task 3
        task_printer('3')
        
    if task_N_manager(now):
        # You get it
        task_printer('N')
        
        # Beginning of actual algorithm
        
        # Use history instead of batch_transform.
        prices = history(252, '1d', 'price')
        returns = prices.pct_change().dropna()
        weights = tangent_portfolio(
            returns, 
            rfr=context.t_yield, 
            leverage=context.leverage
        )
        # Place orders
        for sec in context.sids:
            order_target_percent(sec, weights[sec])
        
        
     