Critical Line Algorithm for Portfolio Optimization

The critical line algorithm (CLA) is a portfolio optimization routine developed by Harry Markowitz. I stumbled on an open sourced version of the routine that happened to be done in Python, so I got it working in Quantopian.

This paper gives a pretty thorough step-by-step explanation of the algorithm. This version uses the weights that maximize the sharpe ratio, but the algorithm produces several sets of weights along the efficient frontier curve.

David

257
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
 Returns 1 Month 3 Month 6 Month 12 Month
 Alpha 1 Month 3 Month 6 Month 12 Month
 Beta 1 Month 3 Month 6 Month 12 Month
 Sharpe 1 Month 3 Month 6 Month 12 Month
 Sortino 1 Month 3 Month 6 Month 12 Month
 Volatility 1 Month 3 Month 6 Month 12 Month
 Max Drawdown 1 Month 3 Month 6 Month 12 Month
"""
Critical Line Algorithm for portfolio optimization

ref: http://papers.ssrn.com/sol3/papers.cfm?abstract_id=2197616
"""

import numpy as np
import pandas as pd

def initialize(context):
set_universe(universe.DollarVolumeUniverse(floor_percentile=99.7, ceiling_percentile=100))

# Constrained to rebalance once/week
# Min/Max weights that can be assigned to a single security
context.lower_bound = -0.20
context.upper_bound = 1.0

# Will be called on every trade event for the securities you specify.
def handle_data(context, data):

now = get_datetime()
return

prices = history(300, '1d', 'price').dropna(axis=1)

N = len(prices.columns)
R = np.log(prices).diff().dropna()
mean = R.mean().reshape((N, 1))

lower_bounds = np.array([context.lower_bound]*N).reshape((N, 1))
upper_bounds = np.array([context.upper_bound]*N).reshape((N, 1))
covar = R.cov().values

# Try statement due to occasional singular matrices.
try:
cla = CLA(mean, covar, lower_bounds, upper_bounds)
cla.solve()

# Maximum sharpe ratio and the corresponding wieghts
sharpe, w_sharpe = cla.getMaxSR()
w_sharpe = pd.Series(w_sharpe.flatten(), index=R.columns)
weights = {}
for stock in data:
if stock in w_sharpe.index:
weights[stock.symbol] = w_sharpe[stock]
order_target_percent(stock, w_sharpe[stock])
else:
order_target(stock, 0)
log.info("\n%s"%pd.Series(weights))
record(sharpe=sharpe)

except Exception as e:
# Reset the trade date to try again on the next bar
log.debug(e)

class CLA:
def __init__(self,mean,covar,lB,uB):
# Initialize the class
self.mean=mean
self.covar=covar
self.lB=lB
self.uB=uB
self.w=[] # solution
self.l=[] # lambdas
self.g=[] # gammas
self.f=[] # free weights
#---------------------------------------------------------------
def solve(self):
# Compute the turning points,free sets and weights
f,w=self.initAlgo()
self.w.append(np.copy(w)) # store solution
self.l.append(None)
self.g.append(None)
self.f.append(f[:])
while True:
#1) case a): Bound one free weight
l_in=None
if len(f)>1:
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
j=0
for i in f:
l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,j,[self.lB[i],self.uB[i]])
if l>l_in:l_in,i_in,bi_in=l,i,bi
j+=1
#2) case b): Free one bounded weight
l_out=None
if len(f)<self.mean.shape[0]:
b=self.getB(f)
for i in b:
covarF,covarFB,meanF,wB=self.getMatrices(f+[i])
covarF_inv=np.linalg.inv(covarF)
l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,meanF.shape[0]-1, \
self.w[-1][i])
if (self.l[-1]==None or l<self.l[-1]) and l>l_out:l_out,i_out=l,i
if (l_in==None or l_in<0) and (l_out==None or l_out<0):
#3) compute minimum variance solution
self.l.append(0)
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
meanF=np.zeros(meanF.shape)
else:
#4) decide lambda
if l_in>l_out:
self.l.append(l_in)
f.remove(i_in)
w[i_in]=bi_in # set value at the correct boundary
else:
self.l.append(l_out)
f.append(i_out)
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
#5) compute solution vector
wF,g=self.computeW(covarF_inv,covarFB,meanF,wB)
for i in range(len(f)):w[f[i]]=wF[i]
self.w.append(np.copy(w)) # store solution
self.g.append(g)
self.f.append(f[:])
if self.l[-1]==0:break
#6) Purge turning points
self.purgeNumErr(10e-10)
self.purgeExcess()
#---------------------------------------------------------------
def initAlgo(self):
# Initialize the algo
#1) Form structured array
a=np.zeros((self.mean.shape[0]),dtype=[('id',int),('mu',float)])
b=[self.mean[i][0] for i in range(self.mean.shape[0])] # dump array into list
a[:]=zip(range(self.mean.shape[0]),b) # fill structured array
#2) Sort structured array
b=np.sort(a,order='mu')
#3) First free weight
i,w=b.shape[0],np.copy(self.lB)
while sum(w)<1:
i-=1
w[b[i][0]]=self.uB[b[i][0]]
w[b[i][0]]+=1-sum(w)
return [b[i][0]],w
#---------------------------------------------------------------
def computeBi(self,c,bi):
if c>0:
bi=bi[1][0]
if c<0:
bi=bi[0][0]
return bi
#---------------------------------------------------------------
def computeW(self,covarF_inv,covarFB,meanF,wB):
#1) compute gamma
onesF=np.ones(meanF.shape)
g1=np.dot(np.dot(onesF.T,covarF_inv),meanF)
g2=np.dot(np.dot(onesF.T,covarF_inv),onesF)
if wB==None:
g,w1=float(-self.l[-1]*g1/g2+1/g2),0
else:
onesB=np.ones(wB.shape)
g3=np.dot(onesB.T,wB)
g4=np.dot(covarF_inv,covarFB)
w1=np.dot(g4,wB)
g4=np.dot(onesF.T,w1)
g=float(-self.l[-1]*g1/g2+(1-g3+g4)/g2)
#2) compute weights
w2=np.dot(covarF_inv,onesF)
w3=np.dot(covarF_inv,meanF)
return -w1+g*w2+self.l[-1]*w3,g
#---------------------------------------------------------------
def computeLambda(self,covarF_inv,covarFB,meanF,wB,i,bi):
#1) C
onesF=np.ones(meanF.shape)
c1=np.dot(np.dot(onesF.T,covarF_inv),onesF)
c2=np.dot(covarF_inv,meanF)
c3=np.dot(np.dot(onesF.T,covarF_inv),meanF)
c4=np.dot(covarF_inv,onesF)
c=-c1*c2[i]+c3*c4[i]
if c==0:return None,None
#2) bi
if type(bi)==list:bi=self.computeBi(c,bi)
#3) Lambda
if wB==None:
# All free assets
return float((c4[i]-c1*bi)/c),bi
else:
onesB=np.ones(wB.shape)
l1=np.dot(onesB.T,wB)
l2=np.dot(covarF_inv,covarFB)
l3=np.dot(l2,wB)
l2=np.dot(onesF.T,l3)
return float(((1-l1+l2)*c4[i]-c1*(bi+l3[i]))/c),bi
#---------------------------------------------------------------
def getMatrices(self,f):
# Slice covarF,covarFB,covarB,meanF,meanB,wF,wB
covarF=self.reduceMatrix(self.covar,f,f)
meanF=self.reduceMatrix(self.mean,f,[0])
b=self.getB(f)
covarFB=self.reduceMatrix(self.covar,f,b)
wB=self.reduceMatrix(self.w[-1],b,[0])
return covarF,covarFB,meanF,wB
#---------------------------------------------------------------
def getB(self,f):
return self.diffLists(range(self.mean.shape[0]),f)
#---------------------------------------------------------------
def diffLists(self,list1,list2):
return list(set(list1)-set(list2))
#---------------------------------------------------------------
def reduceMatrix(self,matrix,listX,listY):
# Reduce a matrix to the provided list of rows and columns
if len(listX)==0 or len(listY)==0:return
matrix_=matrix[:,listY[0]:listY[0]+1]
for i in listY[1:]:
a=matrix[:,i:i+1]
matrix_=np.append(matrix_,a,1)
matrix__=matrix_[listX[0]:listX[0]+1,:]
for i in listX[1:]:
a=matrix_[i:i+1,:]
matrix__=np.append(matrix__,a,0)
return matrix__
#---------------------------------------------------------------
def purgeNumErr(self,tol):
# Purge violations of inequality constraints (associated with ill-conditioned covar matrix)
i=0
while True:
flag=False
if i==len(self.w):break
if abs(sum(self.w[i])-1)>tol:
flag=True
else:
for j in range(self.w[i].shape[0]):
if self.w[i][j]-self.lB[j]<-tol or self.w[i][j]-self.uB[j]>tol:
flag=True;break
if flag==True:
del self.w[i]
del self.l[i]
del self.g[i]
del self.f[i]
else:
i+=1
return
#---------------------------------------------------------------
def purgeExcess(self):
# Remove violations of the convex hull
i,repeat=0,False
while True:
if repeat==False:i+=1
if i==len(self.w)-1:break
w=self.w[i]
mu=np.dot(w.T,self.mean)[0,0]
j,repeat=i+1,False
while True:
if j==len(self.w):break
w=self.w[j]
mu_=np.dot(w.T,self.mean)[0,0]
if mu<mu_:
del self.w[i]
del self.l[i]
del self.g[i]
del self.f[i]
repeat=True
break
else:
j+=1
return
#---------------------------------------------------------------
def getMinVar(self):
# Get the minimum variance solution
var=[]
for w in self.w:
a=np.dot(np.dot(w.T,self.covar),w)
var.append(a)
return min(var)**.5,self.w[var.index(min(var))]
#---------------------------------------------------------------
def getMaxSR(self):
# Get the max Sharpe ratio portfolio
#1) Compute the local max SR portfolio between any two neighbor turning points
w_sr,sr=[],[]
for i in range(len(self.w)-1):
w0=np.copy(self.w[i])
w1=np.copy(self.w[i+1])
kargs={'minimum':False,'args':(w0,w1)}
a,b=self.goldenSection(self.evalSR,0,1,**kargs)
w_sr.append(a*w0+(1-a)*w1)
sr.append(b)
return max(sr),w_sr[sr.index(max(sr))]
#---------------------------------------------------------------
def evalSR(self,a,w0,w1):
# Evaluate SR of the portfolio within the convex combination
w=a*w0+(1-a)*w1
b=np.dot(w.T,self.mean)[0,0]
c=np.dot(np.dot(w.T,self.covar),w)[0,0]**.5
return b/c
#---------------------------------------------------------------
def goldenSection(self,obj,a,b,**kargs):
# Golden section method. Maximum if kargs['minimum']==False is passed
from math import log,ceil
tol,sign,args=1.0e-9,1,None
if 'minimum' in kargs and kargs['minimum']==False:sign=-1
if 'args' in kargs:args=kargs['args']
numIter=int(ceil(-2.078087*log(tol/abs(b-a))))
r=0.618033989
c=1.0-r
# Initialize
x1=r*a+c*b;x2=c*a+r*b
f1=sign*obj(x1,*args);f2=sign*obj(x2,*args)
# Loop
for i in range(numIter):
if f1>f2:
a=x1
x1=x2;f1=f2
x2=c*a+r*b;f2=sign*obj(x2,*args)
else:
b=x2
x2=x1;f2=f1
x1=r*a+c*b;f1=sign*obj(x1,*args)
if f1<f2:return x1,sign*f1
else:return x2,sign*f2
#---------------------------------------------------------------
def efFrontier(self,points):
# Get the efficient frontier
mu,sigma,weights=[],[],[]
a=np.linspace(0,1,points/len(self.w))[:-1] # remove the 1, to avoid duplications
b=range(len(self.w)-1)
for i in b:
w0,w1=self.w[i],self.w[i+1]
if i==b[-1]:a=np.linspace(0,1,points/len(self.w)) # include the 1 in the last iteration
for j in a:
w=w1*j+(1-j)*w0
weights.append(np.copy(w))
mu.append(np.dot(w.T,self.mean)[0,0])
sigma.append(np.dot(np.dot(w.T,self.covar),w)[0,0]**.5)
return mu,sigma,weights

There was a runtime error.
8 responses

CLA on the sector etf plus TLT

202
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
 Returns 1 Month 3 Month 6 Month 12 Month
 Alpha 1 Month 3 Month 6 Month 12 Month
 Beta 1 Month 3 Month 6 Month 12 Month
 Sharpe 1 Month 3 Month 6 Month 12 Month
 Sortino 1 Month 3 Month 6 Month 12 Month
 Volatility 1 Month 3 Month 6 Month 12 Month
 Max Drawdown 1 Month 3 Month 6 Month 12 Month
# David Edwards  CLA  on the sector etf plus TLT
# https://www.quantopian.com/posts/critical-line-algorithm-for-portfolio-optimization

"""
Critical Line Algorithm for portfolio optimization

ref: http://papers.ssrn.com/sol3/papers.cfm?abstract_id=2197616
"""

import numpy as np
import pandas as pd

def initialize(context):
#set_universe(universe.DollarVolumeUniverse(floor_percentile=99.7, ceiling_percentile=100))
context.securities =symbols( 'tlt','xlf', 'xle', 'xlu', 'xlk', 'xlb', 'xlp', 'xly','xli', 'xlv')
# Constrained to rebalance once/week
# Min/Max weights that can be assigned to a single security
context.lower_bound = -0.20
context.upper_bound = 1.0

# Will be called on every trade event for the securities you specify.
def handle_data(context, data):

now = get_datetime()
return

prices = history(80, '1d', 'price').dropna(axis=1)

N = len(prices.columns)
R = np.log(prices).diff().dropna()
mean = R.mean().reshape((N, 1))

lower_bounds = np.array([context.lower_bound]*N).reshape((N, 1))
upper_bounds = np.array([context.upper_bound]*N).reshape((N, 1))
covar = R.cov().values

# Try statement due to occasional singular matrices.
try:
cla = CLA(mean, covar, lower_bounds, upper_bounds)
cla.solve()

# Maximum sharpe ratio and the corresponding wieghts
sharpe, w_sharpe = cla.getMaxSR()
w_sharpe = pd.Series(w_sharpe.flatten(), index=R.columns)
weights = {}
for stock in data:
if stock in w_sharpe.index:
weights[stock.symbol] = w_sharpe[stock]
order_target_percent(stock, w_sharpe[stock])
else:
order_target(stock, 0)
log.info("\n%s"%pd.Series(weights))
record(sharpe=sharpe)

except Exception as e:
# Reset the trade date to try again on the next bar
log.debug(e)

class CLA:
def __init__(self,mean,covar,lB,uB):
# Initialize the class
self.mean=mean
self.covar=covar
self.lB=lB
self.uB=uB
self.w=[] # solution
self.l=[] # lambdas
self.g=[] # gammas
self.f=[] # free weights
#---------------------------------------------------------------
def solve(self):
# Compute the turning points,free sets and weights
f,w=self.initAlgo()
self.w.append(np.copy(w)) # store solution
self.l.append(None)
self.g.append(None)
self.f.append(f[:])
while True:
#1) case a): Bound one free weight
l_in=None
if len(f)>1:
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
j=0
for i in f:
l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,j,[self.lB[i],self.uB[i]])
if l>l_in:l_in,i_in,bi_in=l,i,bi
j+=1
#2) case b): Free one bounded weight
l_out=None
if len(f)<self.mean.shape[0]:
b=self.getB(f)
for i in b:
covarF,covarFB,meanF,wB=self.getMatrices(f+[i])
covarF_inv=np.linalg.inv(covarF)
l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,meanF.shape[0]-1, \
self.w[-1][i])
if (self.l[-1]==None or l<self.l[-1]) and l>l_out:l_out,i_out=l,i
if (l_in==None or l_in<0) and (l_out==None or l_out<0):
#3) compute minimum variance solution
self.l.append(0)
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
meanF=np.zeros(meanF.shape)
else:
#4) decide lambda
if l_in>l_out:
self.l.append(l_in)
f.remove(i_in)
w[i_in]=bi_in # set value at the correct boundary
else:
self.l.append(l_out)
f.append(i_out)
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
#5) compute solution vector
wF,g=self.computeW(covarF_inv,covarFB,meanF,wB)
for i in range(len(f)):w[f[i]]=wF[i]
self.w.append(np.copy(w)) # store solution
self.g.append(g)
self.f.append(f[:])
if self.l[-1]==0:break
#6) Purge turning points
self.purgeNumErr(10e-10)
self.purgeExcess()
#---------------------------------------------------------------
def initAlgo(self):
# Initialize the algo
#1) Form structured array
a=np.zeros((self.mean.shape[0]),dtype=[('id',int),('mu',float)])
b=[self.mean[i][0] for i in range(self.mean.shape[0])] # dump array into list
a[:]=zip(range(self.mean.shape[0]),b) # fill structured array
#2) Sort structured array
b=np.sort(a,order='mu')
#3) First free weight
i,w=b.shape[0],np.copy(self.lB)
while sum(w)<1:
i-=1
w[b[i][0]]=self.uB[b[i][0]]
w[b[i][0]]+=1-sum(w)
return [b[i][0]],w
#---------------------------------------------------------------
def computeBi(self,c,bi):
if c>0:
bi=bi[1][0]
if c<0:
bi=bi[0][0]
return bi
#---------------------------------------------------------------
def computeW(self,covarF_inv,covarFB,meanF,wB):
#1) compute gamma
onesF=np.ones(meanF.shape)
g1=np.dot(np.dot(onesF.T,covarF_inv),meanF)
g2=np.dot(np.dot(onesF.T,covarF_inv),onesF)
if wB==None:
g,w1=float(-self.l[-1]*g1/g2+1/g2),0
else:
onesB=np.ones(wB.shape)
g3=np.dot(onesB.T,wB)
g4=np.dot(covarF_inv,covarFB)
w1=np.dot(g4,wB)
g4=np.dot(onesF.T,w1)
g=float(-self.l[-1]*g1/g2+(1-g3+g4)/g2)
#2) compute weights
w2=np.dot(covarF_inv,onesF)
w3=np.dot(covarF_inv,meanF)
return -w1+g*w2+self.l[-1]*w3,g
#---------------------------------------------------------------
def computeLambda(self,covarF_inv,covarFB,meanF,wB,i,bi):
#1) C
onesF=np.ones(meanF.shape)
c1=np.dot(np.dot(onesF.T,covarF_inv),onesF)
c2=np.dot(covarF_inv,meanF)
c3=np.dot(np.dot(onesF.T,covarF_inv),meanF)
c4=np.dot(covarF_inv,onesF)
c=-c1*c2[i]+c3*c4[i]
if c==0:return None,None
#2) bi
if type(bi)==list:bi=self.computeBi(c,bi)
#3) Lambda
if wB==None:
# All free assets
return float((c4[i]-c1*bi)/c),bi
else:
onesB=np.ones(wB.shape)
l1=np.dot(onesB.T,wB)
l2=np.dot(covarF_inv,covarFB)
l3=np.dot(l2,wB)
l2=np.dot(onesF.T,l3)
return float(((1-l1+l2)*c4[i]-c1*(bi+l3[i]))/c),bi
#---------------------------------------------------------------
def getMatrices(self,f):
# Slice covarF,covarFB,covarB,meanF,meanB,wF,wB
covarF=self.reduceMatrix(self.covar,f,f)
meanF=self.reduceMatrix(self.mean,f,[0])
b=self.getB(f)
covarFB=self.reduceMatrix(self.covar,f,b)
wB=self.reduceMatrix(self.w[-1],b,[0])
return covarF,covarFB,meanF,wB
#---------------------------------------------------------------
def getB(self,f):
return self.diffLists(range(self.mean.shape[0]),f)
#---------------------------------------------------------------
def diffLists(self,list1,list2):
return list(set(list1)-set(list2))
#---------------------------------------------------------------
def reduceMatrix(self,matrix,listX,listY):
# Reduce a matrix to the provided list of rows and columns
if len(listX)==0 or len(listY)==0:return
matrix_=matrix[:,listY[0]:listY[0]+1]
for i in listY[1:]:
a=matrix[:,i:i+1]
matrix_=np.append(matrix_,a,1)
matrix__=matrix_[listX[0]:listX[0]+1,:]
for i in listX[1:]:
a=matrix_[i:i+1,:]
matrix__=np.append(matrix__,a,0)
return matrix__
#---------------------------------------------------------------
def purgeNumErr(self,tol):
# Purge violations of inequality constraints (associated with ill-conditioned covar matrix)
i=0
while True:
flag=False
if i==len(self.w):break
if abs(sum(self.w[i])-1)>tol:
flag=True
else:
for j in range(self.w[i].shape[0]):
if self.w[i][j]-self.lB[j]<-tol or self.w[i][j]-self.uB[j]>tol:
flag=True;break
if flag==True:
del self.w[i]
del self.l[i]
del self.g[i]
del self.f[i]
else:
i+=1
return
#---------------------------------------------------------------
def purgeExcess(self):
# Remove violations of the convex hull
i,repeat=0,False
while True:
if repeat==False:i+=1
if i==len(self.w)-1:break
w=self.w[i]
mu=np.dot(w.T,self.mean)[0,0]
j,repeat=i+1,False
while True:
if j==len(self.w):break
w=self.w[j]
mu_=np.dot(w.T,self.mean)[0,0]
if mu<mu_:
del self.w[i]
del self.l[i]
del self.g[i]
del self.f[i]
repeat=True
break
else:
j+=1
return
#---------------------------------------------------------------
def getMinVar(self):
# Get the minimum variance solution
var=[]
for w in self.w:
a=np.dot(np.dot(w.T,self.covar),w)
var.append(a)
return min(var)**.5,self.w[var.index(min(var))]
#---------------------------------------------------------------
def getMaxSR(self):
# Get the max Sharpe ratio portfolio
#1) Compute the local max SR portfolio between any two neighbor turning points
w_sr,sr=[],[]
for i in range(len(self.w)-1):
w0=np.copy(self.w[i])
w1=np.copy(self.w[i+1])
kargs={'minimum':False,'args':(w0,w1)}
a,b=self.goldenSection(self.evalSR,0,1,**kargs)
w_sr.append(a*w0+(1-a)*w1)
sr.append(b)
return max(sr),w_sr[sr.index(max(sr))]
#---------------------------------------------------------------
def evalSR(self,a,w0,w1):
# Evaluate SR of the portfolio within the convex combination
w=a*w0+(1-a)*w1
b=np.dot(w.T,self.mean)[0,0]
c=np.dot(np.dot(w.T,self.covar),w)[0,0]**.5
return b/c
#---------------------------------------------------------------
def goldenSection(self,obj,a,b,**kargs):
# Golden section method. Maximum if kargs['minimum']==False is passed
from math import log,ceil
tol,sign,args=1.0e-9,1,None
if 'minimum' in kargs and kargs['minimum']==False:sign=-1
if 'args' in kargs:args=kargs['args']
numIter=int(ceil(-2.078087*log(tol/abs(b-a))))
r=0.618033989
c=1.0-r
# Initialize
x1=r*a+c*b;x2=c*a+r*b
f1=sign*obj(x1,*args);f2=sign*obj(x2,*args)
# Loop
for i in range(numIter):
if f1>f2:
a=x1
x1=x2;f1=f2
x2=c*a+r*b;f2=sign*obj(x2,*args)
else:
b=x2
x2=x1;f2=f1
x1=r*a+c*b;f1=sign*obj(x1,*args)
if f1<f2:return x1,sign*f1
else:return x2,sign*f2
#---------------------------------------------------------------
def efFrontier(self,points):
# Get the efficient frontier
mu,sigma,weights=[],[],[]
a=np.linspace(0,1,points/len(self.w))[:-1] # remove the 1, to avoid duplications
b=range(len(self.w)-1)
for i in b:
w0,w1=self.w[i],self.w[i+1]
if i==b[-1]:a=np.linspace(0,1,points/len(self.w)) # include the 1 in the last iteration
for j in a:
w=w1*j+(1-j)*w0
weights.append(np.copy(w))
mu.append(np.dot(w.T,self.mean)[0,0])
sigma.append(np.dot(np.dot(w.T,self.covar),w)[0,0]**.5)
return mu,sigma,weights

There was a runtime error.

Looks pretty solid Vladimir, I find that these optimization routines work much better with fewer assets, sector ETFs are usually a go to for me as well.

Look long term.

13
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
 Returns 1 Month 3 Month 6 Month 12 Month
 Alpha 1 Month 3 Month 6 Month 12 Month
 Beta 1 Month 3 Month 6 Month 12 Month
 Sharpe 1 Month 3 Month 6 Month 12 Month
 Sortino 1 Month 3 Month 6 Month 12 Month
 Volatility 1 Month 3 Month 6 Month 12 Month
 Max Drawdown 1 Month 3 Month 6 Month 12 Month
# David Edwards  CLA  on the sector etf plus TLT
# https://www.quantopian.com/posts/critical-line-algorithm-for-portfolio-optimization

"""
Critical Line Algorithm for portfolio optimization

ref: http://papers.ssrn.com/sol3/papers.cfm?abstract_id=2197616
"""

import numpy as np
import pandas as pd

def initialize(context):
#set_universe(universe.DollarVolumeUniverse(floor_percentile=99.7, ceiling_percentile=100))
context.securities =symbols( 'tlt','xlf', 'xle', 'xlu', 'xlk', 'xlb', 'xlp', 'xly','xli', 'xlv')
# Constrained to rebalance once/week
# Min/Max weights that can be assigned to a single security
context.lower_bound = -0.20
context.upper_bound = 1.0

# Will be called on every trade event for the securities you specify.
def handle_data(context, data):

now = get_datetime()
return

prices = history(80, '1d', 'price').dropna(axis=1)

N = len(prices.columns)
R = np.log(prices).diff().dropna()
mean = R.mean().reshape((N, 1))

lower_bounds = np.array([context.lower_bound]*N).reshape((N, 1))
upper_bounds = np.array([context.upper_bound]*N).reshape((N, 1))
covar = R.cov().values

# Try statement due to occasional singular matrices.
try:
cla = CLA(mean, covar, lower_bounds, upper_bounds)
cla.solve()

# Maximum sharpe ratio and the corresponding wieghts
sharpe, w_sharpe = cla.getMaxSR()
w_sharpe = pd.Series(w_sharpe.flatten(), index=R.columns)
weights = {}
for stock in data:
if stock in w_sharpe.index:
weights[stock.symbol] = w_sharpe[stock]
order_target_percent(stock, w_sharpe[stock])
else:
order_target(stock, 0)
log.info("\n%s"%pd.Series(weights))
record(sharpe=sharpe)

except Exception as e:
# Reset the trade date to try again on the next bar
log.debug(e)

class CLA:
def __init__(self,mean,covar,lB,uB):
# Initialize the class
self.mean=mean
self.covar=covar
self.lB=lB
self.uB=uB
self.w=[] # solution
self.l=[] # lambdas
self.g=[] # gammas
self.f=[] # free weights
#---------------------------------------------------------------
def solve(self):
# Compute the turning points,free sets and weights
f,w=self.initAlgo()
self.w.append(np.copy(w)) # store solution
self.l.append(None)
self.g.append(None)
self.f.append(f[:])
while True:
#1) case a): Bound one free weight
l_in=None
if len(f)>1:
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
j=0
for i in f:
l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,j,[self.lB[i],self.uB[i]])
if l>l_in:l_in,i_in,bi_in=l,i,bi
j+=1
#2) case b): Free one bounded weight
l_out=None
if len(f)<self.mean.shape[0]:
b=self.getB(f)
for i in b:
covarF,covarFB,meanF,wB=self.getMatrices(f+[i])
covarF_inv=np.linalg.inv(covarF)
l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,meanF.shape[0]-1, \
self.w[-1][i])
if (self.l[-1]==None or l<self.l[-1]) and l>l_out:l_out,i_out=l,i
if (l_in==None or l_in<0) and (l_out==None or l_out<0):
#3) compute minimum variance solution
self.l.append(0)
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
meanF=np.zeros(meanF.shape)
else:
#4) decide lambda
if l_in>l_out:
self.l.append(l_in)
f.remove(i_in)
w[i_in]=bi_in # set value at the correct boundary
else:
self.l.append(l_out)
f.append(i_out)
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
#5) compute solution vector
wF,g=self.computeW(covarF_inv,covarFB,meanF,wB)
for i in range(len(f)):w[f[i]]=wF[i]
self.w.append(np.copy(w)) # store solution
self.g.append(g)
self.f.append(f[:])
if self.l[-1]==0:break
#6) Purge turning points
self.purgeNumErr(10e-10)
self.purgeExcess()
#---------------------------------------------------------------
def initAlgo(self):
# Initialize the algo
#1) Form structured array
a=np.zeros((self.mean.shape[0]),dtype=[('id',int),('mu',float)])
b=[self.mean[i][0] for i in range(self.mean.shape[0])] # dump array into list
a[:]=zip(range(self.mean.shape[0]),b) # fill structured array
#2) Sort structured array
b=np.sort(a,order='mu')
#3) First free weight
i,w=b.shape[0],np.copy(self.lB)
while sum(w)<1:
i-=1
w[b[i][0]]=self.uB[b[i][0]]
w[b[i][0]]+=1-sum(w)
return [b[i][0]],w
#---------------------------------------------------------------
def computeBi(self,c,bi):
if c>0:
bi=bi[1][0]
if c<0:
bi=bi[0][0]
return bi
#---------------------------------------------------------------
def computeW(self,covarF_inv,covarFB,meanF,wB):
#1) compute gamma
onesF=np.ones(meanF.shape)
g1=np.dot(np.dot(onesF.T,covarF_inv),meanF)
g2=np.dot(np.dot(onesF.T,covarF_inv),onesF)
if wB==None:
g,w1=float(-self.l[-1]*g1/g2+1/g2),0
else:
onesB=np.ones(wB.shape)
g3=np.dot(onesB.T,wB)
g4=np.dot(covarF_inv,covarFB)
w1=np.dot(g4,wB)
g4=np.dot(onesF.T,w1)
g=float(-self.l[-1]*g1/g2+(1-g3+g4)/g2)
#2) compute weights
w2=np.dot(covarF_inv,onesF)
w3=np.dot(covarF_inv,meanF)
return -w1+g*w2+self.l[-1]*w3,g
#---------------------------------------------------------------
def computeLambda(self,covarF_inv,covarFB,meanF,wB,i,bi):
#1) C
onesF=np.ones(meanF.shape)
c1=np.dot(np.dot(onesF.T,covarF_inv),onesF)
c2=np.dot(covarF_inv,meanF)
c3=np.dot(np.dot(onesF.T,covarF_inv),meanF)
c4=np.dot(covarF_inv,onesF)
c=-c1*c2[i]+c3*c4[i]
if c==0:return None,None
#2) bi
if type(bi)==list:bi=self.computeBi(c,bi)
#3) Lambda
if wB==None:
# All free assets
return float((c4[i]-c1*bi)/c),bi
else:
onesB=np.ones(wB.shape)
l1=np.dot(onesB.T,wB)
l2=np.dot(covarF_inv,covarFB)
l3=np.dot(l2,wB)
l2=np.dot(onesF.T,l3)
return float(((1-l1+l2)*c4[i]-c1*(bi+l3[i]))/c),bi
#---------------------------------------------------------------
def getMatrices(self,f):
# Slice covarF,covarFB,covarB,meanF,meanB,wF,wB
covarF=self.reduceMatrix(self.covar,f,f)
meanF=self.reduceMatrix(self.mean,f,[0])
b=self.getB(f)
covarFB=self.reduceMatrix(self.covar,f,b)
wB=self.reduceMatrix(self.w[-1],b,[0])
return covarF,covarFB,meanF,wB
#---------------------------------------------------------------
def getB(self,f):
return self.diffLists(range(self.mean.shape[0]),f)
#---------------------------------------------------------------
def diffLists(self,list1,list2):
return list(set(list1)-set(list2))
#---------------------------------------------------------------
def reduceMatrix(self,matrix,listX,listY):
# Reduce a matrix to the provided list of rows and columns
if len(listX)==0 or len(listY)==0:return
matrix_=matrix[:,listY[0]:listY[0]+1]
for i in listY[1:]:
a=matrix[:,i:i+1]
matrix_=np.append(matrix_,a,1)
matrix__=matrix_[listX[0]:listX[0]+1,:]
for i in listX[1:]:
a=matrix_[i:i+1,:]
matrix__=np.append(matrix__,a,0)
return matrix__
#---------------------------------------------------------------
def purgeNumErr(self,tol):
# Purge violations of inequality constraints (associated with ill-conditioned covar matrix)
i=0
while True:
flag=False
if i==len(self.w):break
if abs(sum(self.w[i])-1)>tol:
flag=True
else:
for j in range(self.w[i].shape[0]):
if self.w[i][j]-self.lB[j]<-tol or self.w[i][j]-self.uB[j]>tol:
flag=True;break
if flag==True:
del self.w[i]
del self.l[i]
del self.g[i]
del self.f[i]
else:
i+=1
return
#---------------------------------------------------------------
def purgeExcess(self):
# Remove violations of the convex hull
i,repeat=0,False
while True:
if repeat==False:i+=1
if i==len(self.w)-1:break
w=self.w[i]
mu=np.dot(w.T,self.mean)[0,0]
j,repeat=i+1,False
while True:
if j==len(self.w):break
w=self.w[j]
mu_=np.dot(w.T,self.mean)[0,0]
if mu<mu_:
del self.w[i]
del self.l[i]
del self.g[i]
del self.f[i]
repeat=True
break
else:
j+=1
return
#---------------------------------------------------------------
def getMinVar(self):
# Get the minimum variance solution
var=[]
for w in self.w:
a=np.dot(np.dot(w.T,self.covar),w)
var.append(a)
return min(var)**.5,self.w[var.index(min(var))]
#---------------------------------------------------------------
def getMaxSR(self):
# Get the max Sharpe ratio portfolio
#1) Compute the local max SR portfolio between any two neighbor turning points
w_sr,sr=[],[]
for i in range(len(self.w)-1):
w0=np.copy(self.w[i])
w1=np.copy(self.w[i+1])
kargs={'minimum':False,'args':(w0,w1)}
a,b=self.goldenSection(self.evalSR,0,1,**kargs)
w_sr.append(a*w0+(1-a)*w1)
sr.append(b)
return max(sr),w_sr[sr.index(max(sr))]
#---------------------------------------------------------------
def evalSR(self,a,w0,w1):
# Evaluate SR of the portfolio within the convex combination
w=a*w0+(1-a)*w1
b=np.dot(w.T,self.mean)[0,0]
c=np.dot(np.dot(w.T,self.covar),w)[0,0]**.5
return b/c
#---------------------------------------------------------------
def goldenSection(self,obj,a,b,**kargs):
# Golden section method. Maximum if kargs['minimum']==False is passed
from math import log,ceil
tol,sign,args=1.0e-9,1,None
if 'minimum' in kargs and kargs['minimum']==False:sign=-1
if 'args' in kargs:args=kargs['args']
numIter=int(ceil(-2.078087*log(tol/abs(b-a))))
r=0.618033989
c=1.0-r
# Initialize
x1=r*a+c*b;x2=c*a+r*b
f1=sign*obj(x1,*args);f2=sign*obj(x2,*args)
# Loop
for i in range(numIter):
if f1>f2:
a=x1
x1=x2;f1=f2
x2=c*a+r*b;f2=sign*obj(x2,*args)
else:
b=x2
x2=x1;f2=f1
x1=r*a+c*b;f1=sign*obj(x1,*args)
if f1<f2:return x1,sign*f1
else:return x2,sign*f2
#---------------------------------------------------------------
def efFrontier(self,points):
# Get the efficient frontier
mu,sigma,weights=[],[],[]
a=np.linspace(0,1,points/len(self.w))[:-1] # remove the 1, to avoid duplications
b=range(len(self.w)-1)
for i in b:
w0,w1=self.w[i],self.w[i+1]
if i==b[-1]:a=np.linspace(0,1,points/len(self.w)) # include the 1 in the last iteration
for j in a:
w=w1*j+(1-j)*w0
weights.append(np.copy(w))
mu.append(np.dot(w.T,self.mean)[0,0])
sigma.append(np.dot(np.dot(w.T,self.covar),w)[0,0]**.5)
return mu,sigma,weights

There was a runtime error.

backtest result if rebalance weekly, remoe tlt, and disable commission fee.

15
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
 Returns 1 Month 3 Month 6 Month 12 Month
 Alpha 1 Month 3 Month 6 Month 12 Month
 Beta 1 Month 3 Month 6 Month 12 Month
 Sharpe 1 Month 3 Month 6 Month 12 Month
 Sortino 1 Month 3 Month 6 Month 12 Month
 Volatility 1 Month 3 Month 6 Month 12 Month
 Max Drawdown 1 Month 3 Month 6 Month 12 Month
# David Edwards  CLA  on the sector etf plus TLT
# https://www.quantopian.com/posts/critical-line-algorithm-for-portfolio-optimization

"""
Critical Line Algorithm for portfolio optimization

ref: http://papers.ssrn.com/sol3/papers.cfm?abstract_id=2197616
"""

import numpy as np
import pandas as pd

def initialize(context):
#set_universe(universe.DollarVolumeUniverse(floor_percentile=99.7, ceiling_percentile=100))
context.securities =symbols('xlf', 'xle', 'xlu', 'xlk', 'xlb', 'xlp', 'xly','xli', 'xlv')
# Constrained to rebalance once/week
# Min/Max weights that can be assigned to a single security
context.lower_bound = -0.20
context.upper_bound = 1.0
schedule_function(assgin_weights,date_rule=date_rules.week_start(0),time_rule=time_rules.market_open(hours=1, minutes=1))

# Will be called on every trade event for the securities you specify.
def handle_data(context, data):
pass

def assgin_weights(context, data):
now = get_datetime()
return

prices = history(80, '1d', 'price').dropna(axis=1)

N = len(prices.columns)
R = np.log(prices).diff().dropna()
mean = R.mean().reshape((N, 1))

lower_bounds = np.array([context.lower_bound]*N).reshape((N, 1))
upper_bounds = np.array([context.upper_bound]*N).reshape((N, 1))
covar = R.cov().values

# Try statement due to occasional singular matrices.
try:
cla = CLA(mean, covar, lower_bounds, upper_bounds)
cla.solve()

# Maximum sharpe ratio and the corresponding wieghts
sharpe, w_sharpe = cla.getMaxSR()
w_sharpe = pd.Series(w_sharpe.flatten(), index=R.columns)
weights = {}
for stock in data:
if stock in w_sharpe.index:
weights[stock.symbol] = w_sharpe[stock]
order_target_percent(stock, w_sharpe[stock])
else:
order_target(stock, 0)
log.info("\n%s"%pd.Series(weights))
record(sharpe=sharpe)

except Exception as e:
# Reset the trade date to try again on the next bar
log.debug(e)

class CLA:
def __init__(self,mean,covar,lB,uB):
# Initialize the class
self.mean=mean
self.covar=covar
self.lB=lB
self.uB=uB
self.w=[] # solution
self.l=[] # lambdas
self.g=[] # gammas
self.f=[] # free weights
#---------------------------------------------------------------
def solve(self):
# Compute the turning points,free sets and weights
f,w=self.initAlgo()
self.w.append(np.copy(w)) # store solution
self.l.append(None)
self.g.append(None)
self.f.append(f[:])
while True:
#1) case a): Bound one free weight
l_in=None
if len(f)>1:
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
j=0
for i in f:
l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,j,[self.lB[i],self.uB[i]])
if l>l_in:l_in,i_in,bi_in=l,i,bi
j+=1
#2) case b): Free one bounded weight
l_out=None
if len(f)<self.mean.shape[0]:
b=self.getB(f)
for i in b:
covarF,covarFB,meanF,wB=self.getMatrices(f+[i])
covarF_inv=np.linalg.inv(covarF)
l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,meanF.shape[0]-1, \
self.w[-1][i])
if (self.l[-1]==None or l<self.l[-1]) and l>l_out:l_out,i_out=l,i
if (l_in==None or l_in<0) and (l_out==None or l_out<0):
#3) compute minimum variance solution
self.l.append(0)
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
meanF=np.zeros(meanF.shape)
else:
#4) decide lambda
if l_in>l_out:
self.l.append(l_in)
f.remove(i_in)
w[i_in]=bi_in # set value at the correct boundary
else:
self.l.append(l_out)
f.append(i_out)
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
#5) compute solution vector
wF,g=self.computeW(covarF_inv,covarFB,meanF,wB)
for i in range(len(f)):w[f[i]]=wF[i]
self.w.append(np.copy(w)) # store solution
self.g.append(g)
self.f.append(f[:])
if self.l[-1]==0:break
#6) Purge turning points
self.purgeNumErr(10e-10)
self.purgeExcess()
#---------------------------------------------------------------
def initAlgo(self):
# Initialize the algo
#1) Form structured array
a=np.zeros((self.mean.shape[0]),dtype=[('id',int),('mu',float)])
b=[self.mean[i][0] for i in range(self.mean.shape[0])] # dump array into list
a[:]=zip(range(self.mean.shape[0]),b) # fill structured array
#2) Sort structured array
b=np.sort(a,order='mu')
#3) First free weight
i,w=b.shape[0],np.copy(self.lB)
while sum(w)<1:
i-=1
w[b[i][0]]=self.uB[b[i][0]]
w[b[i][0]]+=1-sum(w)
return [b[i][0]],w
#---------------------------------------------------------------
def computeBi(self,c,bi):
if c>0:
bi=bi[1][0]
if c<0:
bi=bi[0][0]
return bi
#---------------------------------------------------------------
def computeW(self,covarF_inv,covarFB,meanF,wB):
#1) compute gamma
onesF=np.ones(meanF.shape)
g1=np.dot(np.dot(onesF.T,covarF_inv),meanF)
g2=np.dot(np.dot(onesF.T,covarF_inv),onesF)
if wB==None:
g,w1=float(-self.l[-1]*g1/g2+1/g2),0
else:
onesB=np.ones(wB.shape)
g3=np.dot(onesB.T,wB)
g4=np.dot(covarF_inv,covarFB)
w1=np.dot(g4,wB)
g4=np.dot(onesF.T,w1)
g=float(-self.l[-1]*g1/g2+(1-g3+g4)/g2)
#2) compute weights
w2=np.dot(covarF_inv,onesF)
w3=np.dot(covarF_inv,meanF)
return -w1+g*w2+self.l[-1]*w3,g
#---------------------------------------------------------------
def computeLambda(self,covarF_inv,covarFB,meanF,wB,i,bi):
#1) C
onesF=np.ones(meanF.shape)
c1=np.dot(np.dot(onesF.T,covarF_inv),onesF)
c2=np.dot(covarF_inv,meanF)
c3=np.dot(np.dot(onesF.T,covarF_inv),meanF)
c4=np.dot(covarF_inv,onesF)
c=-c1*c2[i]+c3*c4[i]
if c==0:return None,None
#2) bi
if type(bi)==list:bi=self.computeBi(c,bi)
#3) Lambda
if wB==None:
# All free assets
return float((c4[i]-c1*bi)/c),bi
else:
onesB=np.ones(wB.shape)
l1=np.dot(onesB.T,wB)
l2=np.dot(covarF_inv,covarFB)
l3=np.dot(l2,wB)
l2=np.dot(onesF.T,l3)
return float(((1-l1+l2)*c4[i]-c1*(bi+l3[i]))/c),bi
#---------------------------------------------------------------
def getMatrices(self,f):
# Slice covarF,covarFB,covarB,meanF,meanB,wF,wB
covarF=self.reduceMatrix(self.covar,f,f)
meanF=self.reduceMatrix(self.mean,f,[0])
b=self.getB(f)
covarFB=self.reduceMatrix(self.covar,f,b)
wB=self.reduceMatrix(self.w[-1],b,[0])
return covarF,covarFB,meanF,wB
#---------------------------------------------------------------
def getB(self,f):
return self.diffLists(range(self.mean.shape[0]),f)
#---------------------------------------------------------------
def diffLists(self,list1,list2):
return list(set(list1)-set(list2))
#---------------------------------------------------------------
def reduceMatrix(self,matrix,listX,listY):
# Reduce a matrix to the provided list of rows and columns
if len(listX)==0 or len(listY)==0:return
matrix_=matrix[:,listY[0]:listY[0]+1]
for i in listY[1:]:
a=matrix[:,i:i+1]
matrix_=np.append(matrix_,a,1)
matrix__=matrix_[listX[0]:listX[0]+1,:]
for i in listX[1:]:
a=matrix_[i:i+1,:]
matrix__=np.append(matrix__,a,0)
return matrix__
#---------------------------------------------------------------
def purgeNumErr(self,tol):
# Purge violations of inequality constraints (associated with ill-conditioned covar matrix)
i=0
while True:
flag=False
if i==len(self.w):break
if abs(sum(self.w[i])-1)>tol:
flag=True
else:
for j in range(self.w[i].shape[0]):
if self.w[i][j]-self.lB[j]<-tol or self.w[i][j]-self.uB[j]>tol:
flag=True;break
if flag==True:
del self.w[i]
del self.l[i]
del self.g[i]
del self.f[i]
else:
i+=1
return
#---------------------------------------------------------------
def purgeExcess(self):
# Remove violations of the convex hull
i,repeat=0,False
while True:
if repeat==False:i+=1
if i==len(self.w)-1:break
w=self.w[i]
mu=np.dot(w.T,self.mean)[0,0]
j,repeat=i+1,False
while True:
if j==len(self.w):break
w=self.w[j]
mu_=np.dot(w.T,self.mean)[0,0]
if mu<mu_:
del self.w[i]
del self.l[i]
del self.g[i]
del self.f[i]
repeat=True
break
else:
j+=1
return
#---------------------------------------------------------------
def getMinVar(self):
# Get the minimum variance solution
var=[]
for w in self.w:
a=np.dot(np.dot(w.T,self.covar),w)
var.append(a)
return min(var)**.5,self.w[var.index(min(var))]
#---------------------------------------------------------------
def getMaxSR(self):
# Get the max Sharpe ratio portfolio
#1) Compute the local max SR portfolio between any two neighbor turning points
w_sr,sr=[],[]
for i in range(len(self.w)-1):
w0=np.copy(self.w[i])
w1=np.copy(self.w[i+1])
kargs={'minimum':False,'args':(w0,w1)}
a,b=self.goldenSection(self.evalSR,0,1,**kargs)
w_sr.append(a*w0+(1-a)*w1)
sr.append(b)
return max(sr),w_sr[sr.index(max(sr))]
#---------------------------------------------------------------
def evalSR(self,a,w0,w1):
# Evaluate SR of the portfolio within the convex combination
w=a*w0+(1-a)*w1
b=np.dot(w.T,self.mean)[0,0]
c=np.dot(np.dot(w.T,self.covar),w)[0,0]**.5
return b/c
#---------------------------------------------------------------
def goldenSection(self,obj,a,b,**kargs):
# Golden section method. Maximum if kargs['minimum']==False is passed
from math import log,ceil
tol,sign,args=1.0e-9,1,None
if 'minimum' in kargs and kargs['minimum']==False:sign=-1
if 'args' in kargs:args=kargs['args']
numIter=int(ceil(-2.078087*log(tol/abs(b-a))))
r=0.618033989
c=1.0-r
# Initialize
x1=r*a+c*b;x2=c*a+r*b
f1=sign*obj(x1,*args);f2=sign*obj(x2,*args)
# Loop
for i in range(numIter):
if f1>f2:
a=x1
x1=x2;f1=f2
x2=c*a+r*b;f2=sign*obj(x2,*args)
else:
b=x2
x2=x1;f2=f1
x1=r*a+c*b;f1=sign*obj(x1,*args)
if f1<f2:return x1,sign*f1
else:return x2,sign*f2
#---------------------------------------------------------------
def efFrontier(self,points):
# Get the efficient frontier
mu,sigma,weights=[],[],[]
a=np.linspace(0,1,points/len(self.w))[:-1] # remove the 1, to avoid duplications
b=range(len(self.w)-1)
for i in b:
w0,w1=self.w[i],self.w[i+1]
if i==b[-1]:a=np.linspace(0,1,points/len(self.w)) # include the 1 in the last iteration
for j in a:
w=w1*j+(1-j)*w0
weights.append(np.copy(w))
mu.append(np.dot(w.T,self.mean)[0,0])
sigma.append(np.dot(np.dot(w.T,self.covar),w)[0,0]**.5)
return mu,sigma,weights

There was a runtime error.

Good strategy, thanks for sharing!

In the paper, a target volatility, for example 5% or 10%, which are treated as Offensive or Defensive solutions, were recommended. Question is, how can we target a volatility by CLA api? Or we have to search on Effective Front to find it by ourself?

Best,
David

Has anyone tried to fix this algorithm for Python 3.5. I got it to run fixing the that zip() no longer returns a list, and x==None should be "x is None". However the results are not the same. I have been going like by line but I think it takes a firm understanding of CLA.

Hi! I changed a few lines and now the algorithm is correctly working in Python 3.7.5. Here you have the updated class

class CLA:
def __init__(self,mean,covar,lB,uB):
# Initialize the class
self.mean=mean
self.covar=covar
self.lB=lB
self.uB=uB
self.w=[] # solution
self.l=[] # lambdas
self.g=[] # gammas
self.f=[] # free weights
#---------------------------------------------------------------
def solve(self):
# Compute the turning points,free sets and weights
f,w=self.initAlgo()
self.w.append(np.copy(w)) # store solution
self.l.append(None)
self.g.append(None)
self.f.append(f[:])
while True:
#1) case a): Bound one free weight
l_in=None
if len(f)>1:
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
j=0
for i in f:
l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,j,[self.lB[i],self.uB[i]])
if (l_in is None or l>l_in):l_in,i_in,bi_in=l,i,bi
j+=1
#2) case b): Free one bounded weight
l_out=None
if len(f)<self.mean.shape[0]:
b=self.getB(f)
for i in b:
covarF,covarFB,meanF,wB=self.getMatrices(f+[i])
covarF_inv=np.linalg.inv(covarF)
l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,meanF.shape[0]-1, \
self.w[-1][i]) ##########################
if (self.l[-1]==None or l<self.l[-1]) and (l_out is None or l>l_out):l_out,i_out=l,i
if (l_in==None or l_in<0) and (l_out==None or l_out<0):
#3) compute minimum variance solution
self.l.append(0)
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
meanF=np.zeros(meanF.shape)
else:
#4) decide lambda
if l_out is None or (l_in is not None and l_in>l_out):
self.l.append(l_in)
f.remove(l_in)
w[i_in]=bi_in # set value at the correct boundary
else:
self.l.append(l_out)
f.append(i_out)
covarF,covarFB,meanF,wB=self.getMatrices(f)
covarF_inv=np.linalg.inv(covarF)
#5) compute solution vector
wF,g=self.computeW(covarF_inv,covarFB,meanF,wB)
for i in range(len(f)):w[f[i]]=wF[i]
self.w.append(np.copy(w)) # store solution
self.g.append(g)
self.f.append(f[:])
if self.l[-1]==0:break
#6) Purge turning points
self.purgeNumErr(10e-10)
self.purgeExcess()
#---------------------------------------------------------------
def initAlgo(self):
# Initialize the algo
#1) Form structured array
a=np.zeros((self.mean.shape[0]),dtype=[('id',int),('mu',float)])
b=[self.mean[i][0] for i in range(self.mean.shape[0])] # dump array into list
a[:]=list(zip(range(self.mean.shape[0]),b)) # fill structured array
#2) Sort structured array
b=np.sort(a,order='mu')
#3) First free weight
i,w=b.shape[0],np.copy(self.lB)
while sum(w)<1:
i-=1
w[b[i][0]]=self.uB[b[i][0]]
w[b[i][0]]+=1-sum(w)
return [b[i][0]],w
#---------------------------------------------------------------
def computeBi(self,c,bi):
if c>0:
bi=bi[1][0]
if c<0:
bi=bi[0][0]
return bi
#---------------------------------------------------------------
def computeW(self,covarF_inv,covarFB,meanF,wB):
#1) compute gamma
onesF=np.ones(meanF.shape)
g1=np.dot(np.dot(onesF.T,covarF_inv),meanF)
g2=np.dot(np.dot(onesF.T,covarF_inv),onesF)
if wB is None:
g,w1=float(-self.l[-1]*g1/g2+1/g2),0
else:
onesB=np.ones(wB.shape)
g3=np.dot(onesB.T,wB)
g4=np.dot(covarF_inv,covarFB)
w1=np.dot(g4,wB)
g4=np.dot(onesF.T,w1)
g=float(-self.l[-1]*g1/g2+(1-g3+g4)/g2)
#2) compute weights
w2=np.dot(covarF_inv,onesF)
w3=np.dot(covarF_inv,meanF)
return -w1+g*w2+self.l[-1]*w3,g
#---------------------------------------------------------------
def computeLambda(self,covarF_inv,covarFB,meanF,wB,i,bi):
#1) C
onesF=np.ones(meanF.shape)
c1=np.dot(np.dot(onesF.T,covarF_inv),onesF)
c2=np.dot(covarF_inv,meanF)
c3=np.dot(np.dot(onesF.T,covarF_inv),meanF)
c4=np.dot(covarF_inv,onesF)
c=-c1*c2[i]+c3*c4[i]
if c==0:return
#2) bi
if type(bi)==list:bi=self.computeBi(c,bi)
#3) Lambda
if wB is None:
# All free assets
return float((c4[i]-c1*bi)/c),bi
else:
onesB=np.ones(wB.shape)
l1=np.dot(onesB.T,wB)
l2=np.dot(covarF_inv,covarFB)
l3=np.dot(l2,wB)
l2=np.dot(onesF.T,l3)
return float(((1-l1+l2)*c4[i]-c1*(bi+l3[i]))/c),bi
#---------------------------------------------------------------
def getMatrices(self,f):
# Slice covarF,covarFB,covarB,meanF,meanB,wF,wB
covarF=self.reduceMatrix(self.covar,f,f)
meanF=self.reduceMatrix(self.mean,f,[0])
b=self.getB(f)
covarFB=self.reduceMatrix(self.covar,f,b)
wB=self.reduceMatrix(self.w[-1].reshape(-1,1),b,[0])
return covarF,covarFB,meanF,wB
#---------------------------------------------------------------
def getB(self,f):
return self.diffLists(range(self.mean.shape[0]),f)
#---------------------------------------------------------------
def diffLists(self,list1,list2):
return list(set(list1)-set(list2))
#---------------------------------------------------------------
def reduceMatrix(self,matrix,listX,listY):
# Reduce a matrix to the provided list of rows and columns
if len(listX)==0 or len(listY)==0:return
matrix_=matrix[:,listY[0]:listY[0]+1]
for i in listY[1:]:
a=matrix[:,i:i+1]
matrix_=np.append(matrix_,a,1)
matrix__=matrix_[listX[0]:listX[0]+1,:]
for i in listX[1:]:
a=matrix_[i:i+1,:]
matrix__=np.append(matrix__,a,0)
return matrix__
#---------------------------------------------------------------
def purgeNumErr(self,tol):
# Purge violations of inequality constraints (associated with ill-conditioned covar matrix)
i=0
while True:
if i==len(self.w):break
w=self.w[i]
for j in range(w.shape[0]):
if w[j]-self.lB[j]<-tol or w[j]-self.uB[j]>tol:
del self.w[i]
del self.l[i]
del self.g[i]
del self.f[i]
break
i+=1
#---------------------------------------------------------------
def purgeExcess(self):
# Remove violations of the convex hull
i,repeat=0,False
while True:
if repeat==False:i+=1
if i==len(self.w)-1:break
w=self.w[i]
mu=np.dot(w.T,self.mean)[0,0]
j,repeat=i+1,False
while True:
if j==len(self.w):break
w=self.w[j]
mu_=np.dot(w.T,self.mean)[0,0]
if mu<mu_:
del self.w[i]
del self.l[i]
del self.g[i]
del self.f[i]
repeat=True
break
else:
j+=1
#---------------------------------------------------------------
def getMinVar(self):
# Get the minimum variance solution
var=[]
for w in self.w:
a=np.dot(np.dot(w.T,self.covar),w)[0,0]
var.append(a)
return min(var)**.5,self.w[var.index(min(var))]
#---------------------------------------------------------------
def getMaxSR(self):
# Get the max Sharpe ratio portfolio
#1) Compute the local max SR portfolio between any two neighbor turning points
w_sr,sr=[],[]
for i in range(len(self.w)-1):
w0=np.copy(self.w[i])
w1=np.copy(self.w[i+1])
kargs={'minimum':False,'args':(w0,w1)}
a,b=self.goldenSection(self.evalSR,0,1,**kargs)
w_sr.append(a*w0+(1-a)*w1)
sr.append(b)
return max(sr),w_sr[sr.index(max(sr))]
#---------------------------------------------------------------
def evalSR(self,a,w0,w1):
# Evaluate SR of the portfolio within the convex combination
w=a*w0+(1-a)*w1
b=np.dot(w.T,self.mean)[0,0]
c=np.dot(np.dot(w.T,self.covar),w)[0,0]**.5
return b/c
#---------------------------------------------------------------
def goldenSection(self,obj,a,b,**kargs):
# Golden section method. Maximum if kargs['minimum']==False is passed
from math import log,ceil
tol,sign,args=1.0e-9,1,None
if 'minimum' in kargs and kargs['minimum']==False:sign=-1
if 'args' in kargs:args=kargs['args']
numIter=int(ceil(-2.078087*log(tol/abs(b-a))))
r=0.618033989
c=1.0-r
# Initialize
x1=r*a+c*b;x2=c*a+r*b
f1=sign*obj(x1,*args);f2=sign*obj(x2,*args)
# Loop
for i in range(numIter):
if f1>f2:
a=x1
x1=x2;f1=f2
x2=c*a+r*b;f2=sign*obj(x2,*args)
else:
b=x2
x2=x1;f2=f1
x1=r*a+c*b;f1=sign*obj(x1,*args)
if f1<f2:return x1,sign*f1
else:return x2,sign*f2
#---------------------------------------------------------------
def efFrontier(self,points):
# Get the efficient frontier
mu,sigma,weights=[],[],[]
a=np.linspace(0,1,points/len(self.w))[:-1] # remove the 1, to avoid duplications
b=range(len(self.w)-1)
for i in b:
w0,w1=self.w[i],self.w[i+1]
if i==b[-1]:a=np.linspace(0,1,points/len(self.w)) # include the 1 in the last iteration
for j in a:
w=w1*j+(1-j)*w0
weights.append(np.copy(w))
mu.append(np.dot(w.T,self.mean)[0,0])
sigma.append(np.dot(np.dot(w.T,self.covar),w)[0,0]**.5)
return mu,sigma,weights


And here you have the code to reproduce the results of MLP

data = pd.read_csv('http://www.quantresearch.org/CLA_Data.csv.txt')
mu = data.loc[0,:].values.reshape(-1,1)
sigma = data.loc[3:,:].values
lb = data.loc[1,:].values.reshape(-1,1)
ub = data.loc[2,:].values.reshape(-1,1)
cla = CLA_2(mu, sigma, lb, ub)
cla.solve()
print(cla.getMinVar())
print(cla.getMaxSR())
mu, sigma, weights = cla.efFrontier(100)
fig, ax = plt.subplots(figsize=(12,8))
ax.plot(sigma, mu)
ax.set_xlabel('Risk', fontsize=16)
ax.set_ylabel('Expected Excess Return', fontsize=16)
plt.xticks(rotation='vertical')
ax.set_title('CLA-derived Efficient Frontier', fontsize=18);
plt.tight_layout()