API Tool: Time/Event Management for minute data

Hello all,
I wanted to share some useful code to make switching to, and working with, minute data easier.

The aim is to make it easy to get from one day to the next, so only your entry point during the day needs to be worried about. It tends to shorten your handle_data function too, which is nice.

This backtest was run on daily data with batch_transform, the next backtest is the same algorithm switched to minute data to give an example of making the switch.

The algorithm is an MPT tangency portfolio. It imports the 3 month t-bill rates from Quandl to be used as the risk free rate.

Dave

15
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
 Returns 1 Month 3 Month 6 Month 12 Month
 Alpha 1 Month 3 Month 6 Month 12 Month
 Beta 1 Month 3 Month 6 Month 12 Month
 Sharpe 1 Month 3 Month 6 Month 12 Month
 Sortino 1 Month 3 Month 6 Month 12 Month
 Volatility 1 Month 3 Month 6 Month 12 Month
 Max Drawdown 1 Month 3 Month 6 Month 12 Month
'''
This algorithm uses the 3-month treasury bill rate as the risk free rate
to construct a modern portfolio theory tangency portfolio.

Ref:
http://faculty.washington.edu/ezivot/econ424/portfolioTheoryMatrix.pdf
'''

import pandas as pd
import numpy as np
import numpy.linalg as la
import math
import datetime
from pytz import timezone

# data accumulator for trailing window
@batch_transform(window_length=50)
def accumulate_data(data):
return data

##############
# Math Utils #
##############

def tangent_portfolio(R, rfr=0.0, leverage=1.0):
'''
Solves the efficient frontier problem
given a risk free rate (rfr).
'''
c_inv = np.linalg.inv(R.cov())
mu = R.mean()
ones = np.ones(len(mu))
rf = rfr * ones
t = c_inv.dot(mu - rf) / ones.T.dot(c_inv.dot(mu - rf))
return pd.Series(t, index=R.columns) * leverage

########################
# Quantopian Functions #
########################

def fetcher_pre_func(df):
''' This should convert % to decimals but it makes everything blow up '''
# df['Value'] = df['Value'] / 100.0
return df

def initialize(context):
'''
Called once at the very beginning of a backtest (and live trading).
Use this method to set up any bookkeeping variables.

The context object is passed to all the other methods in your algorithm.

Parameters

context: An initialized and empty Python dictionary that has been
augmented so that properties can be accessed using dot
notation as well as the traditional bracket notation.

Returns None
'''
# Set Account Leverage
context.leverage = 1.0

# Initialize treasury yield variable
context.t_yield = 0.0

# Define Universe
context.sids = [
sid(19656), sid(19655), sid(19660), sid(19658),
sid(19654), sid(19659), sid(19662), sid(19657),
sid(19661),
]
# Use the fetcher to import the 3 month t-bill rate
fetch_csv('http://www.quandl.com/api/v1/datasets/FRED/DTB3.csv\
?request_source=python&request_version=2&sort_order=asc',
date_column='Date',
symbol='treasury_yield',
post_func=fetcher_pre_func,
date_format='%Y-%m-%d'
)
context.rebal_days = 21

def handle_data(context, data):
'''
Called when a market event occurs for any of the algorithm's
securities.

Parameters

data: A dictionary keyed by security id containing the current
state of the securities in the algo's universe.

context: The same context object from the initialize function.
Stores the up to date portfolio as well as any state
variables defined.

Returns None
'''
# Set the risk free rate to the latest treasury yield.
# The first day throws an error, hence the try.
try:
context.t_yield = data['treasury_yield']['Value']
record(treasury_yield=context.t_yield)
except KeyError as e:
log.debug('KeyError in t_yield: %s'%e)

# Get trailing window of data, return if not enough data is accumulated
datapanel = accumulate_data(data)

# Wait until batch_transform has accumulated enough bars
if datapanel is None:
return

# Check if it's a rebalance day
if context.days_traded % context.rebal_days != 0:
return

# Get target portfolio weights
prices = datapanel['price'][context.sids]
returns = prices.pct_change().dropna()
weights = tangent_portfolio(
returns,
rfr=context.t_yield,
leverage=context.leverage
)
# Place orders
for sec in context.sids:
order_target_percent(sec, weights[sec])


There was a runtime error.
4 responses

This is the version run on minute data. It also shows how you can use the EventManager class to manage several different tasks.
I'd like to get some feedback on this, there are a few issues, but it's a good start.

16
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
 Returns 1 Month 3 Month 6 Month 12 Month
 Alpha 1 Month 3 Month 6 Month 12 Month
 Beta 1 Month 3 Month 6 Month 12 Month
 Sharpe 1 Month 3 Month 6 Month 12 Month
 Sortino 1 Month 3 Month 6 Month 12 Month
 Volatility 1 Month 3 Month 6 Month 12 Month
 Max Drawdown 1 Month 3 Month 6 Month 12 Month
'''
This algo is a demo of a time management class to help deal with
dates, periodic events, and limiting the number of times per day
things can happen.

The idea is to make getting from one day to the next simple. That way

Use Cases:

- Interday strategies that only rebalance periodically.

- Capping the number of times something can happen per day

- Periodic dynamic portfolio selection from a larger universe of stocks.

- Open/close of day functions for accounting or cancelling orders.

The algorithm itself uses the 3-month treasury bill rate as the risk free rate
to construct a modern portfolio theory tangency portfolio.

Ref:
http://faculty.washington.edu/ezivot/econ424/portfolioTheoryMatrix.pdf
'''

import pandas as pd
import numpy as np
import numpy.linalg as la
import math
import datetime
from pytz import timezone

class EventManager(object):
'''
Manager for periodic events.

parameters

period: integer
number of days between events
default: 1

max_daily_hits: integer
upper limit on the number of times per day the event is triggered
default: 1

rule_func: function (returns a boolean)
decision function for timimng an intraday entry point

work_func: not implemented
the plan is to have a rule for when to execute, and a func to execute

start_date: datetime.date
initial date the event can take place
default: datetime.date(1900,1,1)

open_time: datetime.time
earliest time in the intraday window
default: 9:31

close_time: datetime.time
latest time in the intraday window
default: 15:29

tz: pytz.timezone
All datetimes converted to this timezone
default: US/Eastern
'''

def __init__(self,
period=1,
rule_func=None,
work_func=None,
max_daily_hits=1,
start_date=datetime.date(1900,1,1),
open_time=datetime.time(9,31,0),
close_time=datetime.time(16,0,0),
tz=timezone('US/Eastern')):

self.delta_t = datetime.timedelta(days=period)
self.max_daily_hits = max_daily_hits
self.remaining_hits = max_daily_hits
self.start_date = start_date
self.open_time = open_time
self.close_time = close_time
self.tz = tz
self._rule_func = rule_func
self._work_func = work_func

def __call__(self, *args, **kwargs):
'''
Entry point for the rule_func
All arguments are passed to rule_func

Note: The next trade date will not update
until max_daily_hits has been reached.

'''
now = get_datetime().astimezone(self.tz)
return False
if not self.open_for_biz(now):
return False
decision = self._rule_func(*args, **kwargs)
if decision:
self.remaining_hits -= 1
if self.remaining_hits <= 0:
return decision

self.remaining_hits = self.max_daily_hits
today = dt.date()

def open_for_biz(self, dt):
t = dt.astimezone(self.tz).time()
closed_for_day = (t > self.close_time)
open_for_day = (t >= self.open_time)
return open_for_day and not closed_for_day

####################################
# Steps for using the EventManager #
####################################

# 1. Define intraday entry rule functions.

def start_of_day_entry(dt):
'''
Enter in the first 10 min
'''
dt = dt.astimezone(timezone('US/Eastern'))
return dt.hour == 9 and dt.minute <= 40

def end_of_day_entry(dt):
'''
Enter after 3:25pm
'''
dt = dt.astimezone(timezone('US/Eastern'))
return dt.hour >= 15 and dt.minute > 50

# 2. Initialize instances with the rule functions

period = 200,
rule_func = start_of_day_entry,
max_daily_hits = 2
)

period=100,
rule_func=end_of_day_entry,
max_daily_hits=5
)
period=2,
rule_func=start_of_day_entry,
max_daily_hits=1
)
period=30,
rule_func=end_of_day_entry,
max_daily_hits=1
)
# 3. Call from handle_data

market_time = get_datetime().astimezone(timezone('US/Eastern'))
log.info('Task %s completed on %s'%(x, market_time))

##############
# Math Utils #
##############

def tangent_portfolio(R, rfr=0.0, leverage=1.0):
'''
Solves for the tangency portfolio weights
given a risk free rate (rfr).
'''
c_inv = np.linalg.inv(R.cov())
mu = R.mean()
ones = np.ones(len(mu), dtype=float)
rf = rfr * ones
t = c_inv.dot(mu - rf) / ones.T.dot(c_inv.dot(mu - rf))
return pd.Series(t, index=R.columns) * leverage

########################
# Quantopian Functions #
########################

def fetcher_pre_func(df):
''' This should convert % to decimals but it makes everything blow up '''
# df['Value'] = df['Value'] / 100.0
return df

def initialize(context):
'''
Called once at the very beginning of a backtest (and live trading).
Use this method to set up any bookkeeping variables.

The context object is passed to all the other methods in your algorithm.

Parameters

context: An initialized and empty Python dictionary that has been
augmented so that properties can be accessed using dot
notation as well as the traditional bracket notation.

Returns None
'''
# Set Account Leverage
context.leverage = 1.0

# Initialize treasury yield variable
context.t_yield = 0.0

# Define Universe
context.sids = [
sid(19656), sid(19655), sid(19660), sid(19658),
sid(19654), sid(19659), sid(19662), sid(19657),
sid(19661),
]
# Use the fetcher to import the 3 month t-bill rate
fetch_csv('http://www.quandl.com/api/v1/datasets/FRED/DTB3.csv\
?request_source=python&request_version=2&sort_order=asc',
date_column='Date',
symbol='treasury_yield',
post_func=fetcher_pre_func,
date_format='%Y-%m-%d'
)

def handle_data(context, data):
'''
Called when a market event occurs for any of the algorithm's
securities.

Parameters

data: A dictionary keyed by security id containing the current
state of the securities in the algo's universe.

context: The same context object from the initialize function.
Stores the up to date portfolio as well as any state
variables defined.

Returns None
'''
try:
# set the risk free rate to the latest treasury yield
context.t_yield = data['treasury_yield']['Value']
record(treasury_yield=context.t_yield)
except KeyError as e:
log.debug('KeyError getting t_yield: %s'%e)

# Check for entry signals from each manager
now = get_datetime()

# You get it

# Beginning of actual algorithm

# Use history instead of batch_transform.
prices = history(50, '1d', 'price')
returns = prices.pct_change().dropna()
weights = tangent_portfolio(
returns,
rfr=context.t_yield,
leverage=context.leverage
)
# Place orders
for sec in context.sids:
order_target_percent(sec, weights[sec])


There was a runtime error.

Currently the most glaring problem with this has to to with setting the maximum number of hits per day. It will not update to a new trade date until the maximum number of hits per day has been satisfied. If the max daily hits is greater than the number of times the entry_func is satisfied in one day, it will cause problems.

I plan to make another version that uses the trading calendar as well so keep your eyes peeled.

Hey David,

"Business days" is pretty close to the trading calendar for most purposes.

-Russell

import pandas as pd
from pandas.tseries.offsets import BDay

def getBDayHeld(context, data, stock):
BDayHeld = 0
exchange_time = pd.Timestamp(get_datetime()).tz_convert('US/Eastern')
try:
if context.purchaseDate.has_key(stock.symbol):
s = pd.date_range(context.purchaseDate[stock.symbol],exchange_time.date(),freq='D')
df = pd.DataFrame(0,index=s,columns=list('N'))
BDayHeld = len(df.asfreq(BDay())) - 1
else:
log.info (" in getBDayHeld context.purchaseDate.has_key MISSING " + stock.symbol)
except:
log.info (" EXCEPTION: in getBDayHeld context.purchaseDate.has_key MISSING " + stock.symbol)
return BDayHeld


Here is a version that uses the trading calendar in Zipline. It is a little simpler and the max_daily_ hits works as intended. It makes sure that the current date is between the open and close of the event date, then it calls the rule_func to get an entry decision.

16
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
 Returns 1 Month 3 Month 6 Month 12 Month
 Alpha 1 Month 3 Month 6 Month 12 Month
 Beta 1 Month 3 Month 6 Month 12 Month
 Sharpe 1 Month 3 Month 6 Month 12 Month
 Sortino 1 Month 3 Month 6 Month 12 Month
 Volatility 1 Month 3 Month 6 Month 12 Month
 Max Drawdown 1 Month 3 Month 6 Month 12 Month
'''
This is a second version that uses the trading calendar in Zipline.

If it is between the open and close time on the event date,
the passed entry_func is called for the entry decision. The
maximum daily hits works correctly for this version.
'''

import pandas as pd
import numpy as np
import numpy.linalg as la
import math
import datetime
from pytz import timezone
from zipline.utils import tradingcalendar as calendar

class EventManager(object):
'''
Manager for periodic events.

parameters

period: integer
number of business days between events
default: 1

max_daily_hits: integer
upper limit on the number of times per day the event is triggered.
(trading controls could work for this too)
default: 1

rule_func: function (returns a boolean)
decision function for timimng an intraday entry point
'''

def __init__(self,
period=1,
rule_func=None,
max_daily_hits=1):

self.period = period
self.max_daily_hits = max_daily_hits
self.remaining_hits = max_daily_hits
self._rule_func = rule_func
self.next_event_date = None
self.market_open = None
self.market_close = None

@property
def todays_index(self):
dt = calendar.canonicalize_datetime(get_datetime())

def open_and_close(self, dt):
return calendar.open_and_closes.T[dt]

def __call__(self, *args, **kwargs):
'''
Entry point for the rule_func
All arguments are passed to rule_func
'''
now = get_datetime()
dt = calendar.canonicalize_datetime(now)
if self.next_event_date is None:
self.next_event_date = dt
times = self.open_and_close(dt)
self.market_open = times['market_open']
self.market_close = times['market_close']
if now < self.market_open:
return False
if now == self.market_close:
self.set_next_event_date()
decision = self._rule_func(*args, **kwargs)
if decision:
self.remaining_hits -= 1
if self.remaining_hits <= 0:
self.set_next_event_date()
return decision

def set_next_event_date(self):
self.remaining_hits = self.max_daily_hits
idx = self.todays_index + self.period
self.next_event_date = tdays[idx]
times = self.open_and_close(self.next_event_date)
self.market_open = times['market_open']
self.market_close = times['market_close']

####################################
# Steps for using the EventManager #
####################################

# 1. Define intraday entry rule functions.

def start_of_day_entry(dt):
'''
Enter in the first 20 min
'''
dt = dt.astimezone(timezone('US/Eastern'))
return dt.hour == 9 and dt.minute <= 40

def end_of_day_entry(dt):
'''
Enter after 3:25pm
'''
dt = dt.astimezone(timezone('US/Eastern'))
return dt.hour >= 15 and dt.minute > 50

# 2. Initialize instances with the rule functions

period = 200,
rule_func = start_of_day_entry,
max_daily_hits = 1
)

period=100,
rule_func=end_of_day_entry,
max_daily_hits=1
)
period=50,
rule_func=start_of_day_entry,
max_daily_hits=3
)
period=21,
rule_func=end_of_day_entry,
max_daily_hits=1
)
# 3. Call from handle_data

market_time = get_datetime().astimezone(timezone('US/Eastern'))
log.info('Task %s completed on %s'%(x, market_time))

##############
# Math Utils #
##############

def tangent_portfolio(R, rfr=0.0, leverage=1.0):
'''
Solves for the tangency portfolio weights
given a risk free rate (rfr).
'''
c_inv = np.linalg.inv(R.cov())
mu = R.mean()
ones = np.ones(len(mu), dtype=float)
rf = rfr * ones
t = c_inv.dot(mu - rf) / ones.T.dot(c_inv.dot(mu - rf))
return pd.Series(t, index=R.columns) * leverage

########################
# Quantopian Functions #
########################

def fetcher_pre_func(df):
''' This should convert % to decimals but it makes everything blow up '''
#df['Value'] = df['Value'] / 100.0
return df

def initialize(context):
'''
Called once at the very beginning of a backtest (and live trading).
Use this method to set up any bookkeeping variables.

The context object is passed to all the other methods in your algorithm.

Parameters

context: An initialized and empty Python dictionary that has been
augmented so that properties can be accessed using dot
notation as well as the traditional bracket notation.

Returns None
'''
# Set Account Leverage
context.leverage = 1.0

# Initialize treasury yield variable
context.t_yield = 0.0

# Define Universe
context.sids = [
sid(19656), sid(19655), sid(19660), sid(19658),
sid(19654), sid(19659), sid(19662), sid(19657),
sid(19661),
]
# Use the fetcher to import the 3 month t-bill rate
fetch_csv('http://www.quandl.com/api/v1/datasets/FRED/DTB3.csv\
?request_source=python&request_version=2&sort_order=asc',
date_column='Date',
symbol='treasury_yield',
post_func=fetcher_pre_func,
date_format='%Y-%m-%d'
)

def handle_data(context, data):
'''
Called when a market event occurs for any of the algorithm's
securities.

Parameters

data: A dictionary keyed by security id containing the current
state of the securities in the algo's universe.

context: The same context object from the initialize function.
Stores the up to date portfolio as well as any state
variables defined.

Returns None
'''
try:
# set the risk free rate to the latest treasury yield
context.t_yield = data['treasury_yield']['Value']
record(treasury_yield=context.t_yield)
except KeyError as e:
log.debug('KeyError getting t_yield: %s'%e)

# Check for entry signals from each manager
now = get_datetime()

# You get it

# Beginning of actual algorithm

# Use history instead of batch_transform.
prices = history(252, '1d', 'price')
returns = prices.pct_change().dropna()
weights = tangent_portfolio(
returns,
rfr=context.t_yield,
leverage=context.leverage
)
# Place orders
for sec in context.sids:
order_target_percent(sec, weights[sec])


There was a runtime error.