Back to Community
Critical Line Algorithm for Portfolio Optimization

The critical line algorithm (CLA) is a portfolio optimization routine developed by Harry Markowitz. I stumbled on an open sourced version of the routine that happened to be done in Python, so I got it working in Quantopian.

This paper gives a pretty thorough step-by-step explanation of the algorithm. This version uses the weights that maximize the sharpe ratio, but the algorithm produces several sets of weights along the efficient frontier curve.

David

Clone Algorithm
264
Loading...
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
"""
Critical Line Algorithm for portfolio optimization

ref: http://papers.ssrn.com/sol3/papers.cfm?abstract_id=2197616
"""

import numpy as np
import pandas as pd



def initialize(context):
    set_commission(commission.PerShare(cost=0.01, min_trade_cost=1.0))
    set_universe(universe.DollarVolumeUniverse(floor_percentile=99.7, ceiling_percentile=100))
    
    # Constrained to rebalance once/week
    context.trade_offset = pd.tseries.offsets.Week()
    context.trade_date = pd.Timestamp('1900-01-01', tz='utc')
    # Min/Max weights that can be assigned to a single security
    context.lower_bound = -0.20
    context.upper_bound = 1.0

# Will be called on every trade event for the securities you specify. 
def handle_data(context, data):
    
    now = get_datetime()
    if now < context.trade_date:
        return
    context.trade_date = context.trade_offset.apply(now)

    prices = history(300, '1d', 'price').dropna(axis=1)
    
    N = len(prices.columns)
    R = np.log(prices).diff().dropna()
    mean = R.mean().reshape((N, 1))
    
    lower_bounds = np.array([context.lower_bound]*N).reshape((N, 1)) 
    upper_bounds = np.array([context.upper_bound]*N).reshape((N, 1)) 
    covar = R.cov().values
    
    # Try statement due to occasional singular matrices.
    try:
        cla = CLA(mean, covar, lower_bounds, upper_bounds)
        cla.solve()
    
        # Maximum sharpe ratio and the corresponding wieghts
        sharpe, w_sharpe = cla.getMaxSR()
        w_sharpe = pd.Series(w_sharpe.flatten(), index=R.columns)
        weights = {}
        for stock in data:
            if stock in w_sharpe.index:
                weights[stock.symbol] = w_sharpe[stock]
                order_target_percent(stock, w_sharpe[stock])
            else:
                order_target(stock, 0)
        log.info("\n%s"%pd.Series(weights))
        record(sharpe=sharpe)

    except Exception as e:
        # Reset the trade date to try again on the next bar
        context.trade_date = now
        log.debug(e)
            
            
    



class CLA:
    def __init__(self,mean,covar,lB,uB):
        # Initialize the class
        self.mean=mean
        self.covar=covar
        self.lB=lB
        self.uB=uB
        self.w=[] # solution
        self.l=[] # lambdas
        self.g=[] # gammas
        self.f=[] # free weights
#---------------------------------------------------------------
    def solve(self):
        # Compute the turning points,free sets and weights
        f,w=self.initAlgo()
        self.w.append(np.copy(w)) # store solution
        self.l.append(None)
        self.g.append(None)
        self.f.append(f[:])
        while True:
            #1) case a): Bound one free weight
            l_in=None
            if len(f)>1:
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
                j=0
                for i in f:
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,j,[self.lB[i],self.uB[i]])
                    if l>l_in:l_in,i_in,bi_in=l,i,bi
                    j+=1
            #2) case b): Free one bounded weight
            l_out=None
            if len(f)<self.mean.shape[0]:
                b=self.getB(f)
                for i in b:
                    covarF,covarFB,meanF,wB=self.getMatrices(f+[i])
                    covarF_inv=np.linalg.inv(covarF)
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,meanF.shape[0]-1, \
                        self.w[-1][i])
                    if (self.l[-1]==None or l<self.l[-1]) and l>l_out:l_out,i_out=l,i                
            if (l_in==None or l_in<0) and (l_out==None or l_out<0):
                #3) compute minimum variance solution
                self.l.append(0)
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
                meanF=np.zeros(meanF.shape)
            else:
                #4) decide lambda
                if l_in>l_out:
                    self.l.append(l_in)
                    f.remove(i_in)
                    w[i_in]=bi_in # set value at the correct boundary
                else:
                    self.l.append(l_out)
                    f.append(i_out)
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
            #5) compute solution vector
            wF,g=self.computeW(covarF_inv,covarFB,meanF,wB)
            for i in range(len(f)):w[f[i]]=wF[i]
            self.w.append(np.copy(w)) # store solution
            self.g.append(g)
            self.f.append(f[:])
            if self.l[-1]==0:break
        #6) Purge turning points
        self.purgeNumErr(10e-10)
        self.purgeExcess()
#---------------------------------------------------------------    
    def initAlgo(self):
        # Initialize the algo
        #1) Form structured array
        a=np.zeros((self.mean.shape[0]),dtype=[('id',int),('mu',float)])
        b=[self.mean[i][0] for i in range(self.mean.shape[0])] # dump array into list
        a[:]=zip(range(self.mean.shape[0]),b) # fill structured array
        #2) Sort structured array
        b=np.sort(a,order='mu')
        #3) First free weight
        i,w=b.shape[0],np.copy(self.lB)
        while sum(w)<1:
            i-=1
            w[b[i][0]]=self.uB[b[i][0]]
        w[b[i][0]]+=1-sum(w)
        return [b[i][0]],w
#---------------------------------------------------------------    
    def computeBi(self,c,bi):
        if c>0:
            bi=bi[1][0]
        if c<0:
            bi=bi[0][0]
        return bi
#---------------------------------------------------------------
    def computeW(self,covarF_inv,covarFB,meanF,wB):
        #1) compute gamma
        onesF=np.ones(meanF.shape)
        g1=np.dot(np.dot(onesF.T,covarF_inv),meanF)
        g2=np.dot(np.dot(onesF.T,covarF_inv),onesF)
        if wB==None:
            g,w1=float(-self.l[-1]*g1/g2+1/g2),0
        else:
            onesB=np.ones(wB.shape)
            g3=np.dot(onesB.T,wB)
            g4=np.dot(covarF_inv,covarFB)
            w1=np.dot(g4,wB)
            g4=np.dot(onesF.T,w1)
            g=float(-self.l[-1]*g1/g2+(1-g3+g4)/g2)
        #2) compute weights
        w2=np.dot(covarF_inv,onesF)
        w3=np.dot(covarF_inv,meanF)
        return -w1+g*w2+self.l[-1]*w3,g
#---------------------------------------------------------------
    def computeLambda(self,covarF_inv,covarFB,meanF,wB,i,bi):
        #1) C
        onesF=np.ones(meanF.shape)
        c1=np.dot(np.dot(onesF.T,covarF_inv),onesF)
        c2=np.dot(covarF_inv,meanF)
        c3=np.dot(np.dot(onesF.T,covarF_inv),meanF)
        c4=np.dot(covarF_inv,onesF)
        c=-c1*c2[i]+c3*c4[i]
        if c==0:return None,None
        #2) bi
        if type(bi)==list:bi=self.computeBi(c,bi)
        #3) Lambda
        if wB==None:
            # All free assets
            return float((c4[i]-c1*bi)/c),bi
        else:
            onesB=np.ones(wB.shape)
            l1=np.dot(onesB.T,wB)
            l2=np.dot(covarF_inv,covarFB)
            l3=np.dot(l2,wB)
            l2=np.dot(onesF.T,l3)
            return float(((1-l1+l2)*c4[i]-c1*(bi+l3[i]))/c),bi
#---------------------------------------------------------------
    def getMatrices(self,f):
        # Slice covarF,covarFB,covarB,meanF,meanB,wF,wB
        covarF=self.reduceMatrix(self.covar,f,f)
        meanF=self.reduceMatrix(self.mean,f,[0])
        b=self.getB(f)
        covarFB=self.reduceMatrix(self.covar,f,b)
        wB=self.reduceMatrix(self.w[-1],b,[0])
        return covarF,covarFB,meanF,wB
#---------------------------------------------------------------
    def getB(self,f):
        return self.diffLists(range(self.mean.shape[0]),f)
#---------------------------------------------------------------
    def diffLists(self,list1,list2):
        return list(set(list1)-set(list2))
#---------------------------------------------------------------
    def reduceMatrix(self,matrix,listX,listY):
        # Reduce a matrix to the provided list of rows and columns
        if len(listX)==0 or len(listY)==0:return
        matrix_=matrix[:,listY[0]:listY[0]+1]
        for i in listY[1:]:
            a=matrix[:,i:i+1]
            matrix_=np.append(matrix_,a,1)
        matrix__=matrix_[listX[0]:listX[0]+1,:]
        for i in listX[1:]:
            a=matrix_[i:i+1,:]
            matrix__=np.append(matrix__,a,0)
        return matrix__
#---------------------------------------------------------------    
    def purgeNumErr(self,tol):
        # Purge violations of inequality constraints (associated with ill-conditioned covar matrix)
        i=0
        while True:
            flag=False
            if i==len(self.w):break
            if abs(sum(self.w[i])-1)>tol:
                flag=True
            else:
                for j in range(self.w[i].shape[0]):
                    if self.w[i][j]-self.lB[j]<-tol or self.w[i][j]-self.uB[j]>tol:
                        flag=True;break
            if flag==True:
                del self.w[i]
                del self.l[i]
                del self.g[i]
                del self.f[i]
            else:
                i+=1
        return
#---------------------------------------------------------------    
    def purgeExcess(self):
        # Remove violations of the convex hull
        i,repeat=0,False
        while True:
            if repeat==False:i+=1
            if i==len(self.w)-1:break
            w=self.w[i]
            mu=np.dot(w.T,self.mean)[0,0]
            j,repeat=i+1,False
            while True:
                if j==len(self.w):break
                w=self.w[j]
                mu_=np.dot(w.T,self.mean)[0,0]
                if mu<mu_:
                    del self.w[i]
                    del self.l[i]
                    del self.g[i]
                    del self.f[i]
                    repeat=True
                    break
                else:
                    j+=1
        return
#---------------------------------------------------------------
    def getMinVar(self):
        # Get the minimum variance solution
        var=[]
        for w in self.w:
            a=np.dot(np.dot(w.T,self.covar),w)
            var.append(a)
        return min(var)**.5,self.w[var.index(min(var))]
#---------------------------------------------------------------
    def getMaxSR(self):
        # Get the max Sharpe ratio portfolio
        #1) Compute the local max SR portfolio between any two neighbor turning points
        w_sr,sr=[],[]
        for i in range(len(self.w)-1):
            w0=np.copy(self.w[i])
            w1=np.copy(self.w[i+1])
            kargs={'minimum':False,'args':(w0,w1)}
            a,b=self.goldenSection(self.evalSR,0,1,**kargs)
            w_sr.append(a*w0+(1-a)*w1)
            sr.append(b)
        return max(sr),w_sr[sr.index(max(sr))]
#---------------------------------------------------------------
    def evalSR(self,a,w0,w1):
        # Evaluate SR of the portfolio within the convex combination
        w=a*w0+(1-a)*w1
        b=np.dot(w.T,self.mean)[0,0]
        c=np.dot(np.dot(w.T,self.covar),w)[0,0]**.5
        return b/c
#---------------------------------------------------------------
    def goldenSection(self,obj,a,b,**kargs):
        # Golden section method. Maximum if kargs['minimum']==False is passed 
        from math import log,ceil
        tol,sign,args=1.0e-9,1,None
        if 'minimum' in kargs and kargs['minimum']==False:sign=-1
        if 'args' in kargs:args=kargs['args']
        numIter=int(ceil(-2.078087*log(tol/abs(b-a))))
        r=0.618033989
        c=1.0-r
        # Initialize
        x1=r*a+c*b;x2=c*a+r*b
        f1=sign*obj(x1,*args);f2=sign*obj(x2,*args)
        # Loop
        for i in range(numIter):
            if f1>f2:
                a=x1
                x1=x2;f1=f2
                x2=c*a+r*b;f2=sign*obj(x2,*args)
            else:
                b=x2
                x2=x1;f2=f1
                x1=r*a+c*b;f1=sign*obj(x1,*args)
        if f1<f2:return x1,sign*f1
        else:return x2,sign*f2
#---------------------------------------------------------------
    def efFrontier(self,points):
        # Get the efficient frontier
        mu,sigma,weights=[],[],[]
        a=np.linspace(0,1,points/len(self.w))[:-1] # remove the 1, to avoid duplications
        b=range(len(self.w)-1)
        for i in b:
            w0,w1=self.w[i],self.w[i+1]
            if i==b[-1]:a=np.linspace(0,1,points/len(self.w)) # include the 1 in the last iteration
            for j in a:
                w=w1*j+(1-j)*w0
                weights.append(np.copy(w))
                mu.append(np.dot(w.T,self.mean)[0,0])
                sigma.append(np.dot(np.dot(w.T,self.covar),w)[0,0]**.5)
        return mu,sigma,weights
There was a runtime error.
8 responses

CLA on the sector etf plus TLT

Clone Algorithm
203
Loading...
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
# David Edwards  CLA  on the sector etf plus TLT
# https://www.quantopian.com/posts/critical-line-algorithm-for-portfolio-optimization

"""
Critical Line Algorithm for portfolio optimization

ref: http://papers.ssrn.com/sol3/papers.cfm?abstract_id=2197616
"""

import numpy as np
import pandas as pd

def initialize(context):
    set_commission(commission.PerShare(cost=0.01, min_trade_cost=1.0))
    #set_universe(universe.DollarVolumeUniverse(floor_percentile=99.7, ceiling_percentile=100))
    context.securities =symbols( 'tlt','xlf', 'xle', 'xlu', 'xlk', 'xlb', 'xlp', 'xly','xli', 'xlv')
    # Constrained to rebalance once/week
    context.trade_offset = pd.tseries.offsets.Week()
    context.trade_date = pd.Timestamp('1900-01-01', tz='utc')
    # Min/Max weights that can be assigned to a single security
    context.lower_bound = -0.20
    context.upper_bound = 1.0

# Will be called on every trade event for the securities you specify. 
def handle_data(context, data):
    
    now = get_datetime()
    if now < context.trade_date:
        return
    context.trade_date = context.trade_offset.apply(now)

    prices = history(80, '1d', 'price').dropna(axis=1)
    
    N = len(prices.columns)
    R = np.log(prices).diff().dropna()
    mean = R.mean().reshape((N, 1))
    
    lower_bounds = np.array([context.lower_bound]*N).reshape((N, 1)) 
    upper_bounds = np.array([context.upper_bound]*N).reshape((N, 1)) 
    covar = R.cov().values
    
    # Try statement due to occasional singular matrices.
    try:
        cla = CLA(mean, covar, lower_bounds, upper_bounds)
        cla.solve()
    
        # Maximum sharpe ratio and the corresponding wieghts
        sharpe, w_sharpe = cla.getMaxSR()
        w_sharpe = pd.Series(w_sharpe.flatten(), index=R.columns)
        weights = {}
        for stock in data:
            if stock in w_sharpe.index:
                weights[stock.symbol] = w_sharpe[stock]
                order_target_percent(stock, w_sharpe[stock])
            else:
                order_target(stock, 0)
        log.info("\n%s"%pd.Series(weights))
        record(sharpe=sharpe)

    except Exception as e:
        # Reset the trade date to try again on the next bar
        context.trade_date = now
        log.debug(e)             

class CLA:
    def __init__(self,mean,covar,lB,uB):
        # Initialize the class
        self.mean=mean
        self.covar=covar
        self.lB=lB
        self.uB=uB
        self.w=[] # solution
        self.l=[] # lambdas
        self.g=[] # gammas
        self.f=[] # free weights
#---------------------------------------------------------------
    def solve(self):
        # Compute the turning points,free sets and weights
        f,w=self.initAlgo()
        self.w.append(np.copy(w)) # store solution
        self.l.append(None)
        self.g.append(None)
        self.f.append(f[:])
        while True:
            #1) case a): Bound one free weight
            l_in=None
            if len(f)>1:
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
                j=0
                for i in f:
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,j,[self.lB[i],self.uB[i]])
                    if l>l_in:l_in,i_in,bi_in=l,i,bi
                    j+=1
            #2) case b): Free one bounded weight
            l_out=None
            if len(f)<self.mean.shape[0]:
                b=self.getB(f)
                for i in b:
                    covarF,covarFB,meanF,wB=self.getMatrices(f+[i])
                    covarF_inv=np.linalg.inv(covarF)
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,meanF.shape[0]-1, \
                        self.w[-1][i])
                    if (self.l[-1]==None or l<self.l[-1]) and l>l_out:l_out,i_out=l,i                
            if (l_in==None or l_in<0) and (l_out==None or l_out<0):
                #3) compute minimum variance solution
                self.l.append(0)
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
                meanF=np.zeros(meanF.shape)
            else:
                #4) decide lambda
                if l_in>l_out:
                    self.l.append(l_in)
                    f.remove(i_in)
                    w[i_in]=bi_in # set value at the correct boundary
                else:
                    self.l.append(l_out)
                    f.append(i_out)
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
            #5) compute solution vector
            wF,g=self.computeW(covarF_inv,covarFB,meanF,wB)
            for i in range(len(f)):w[f[i]]=wF[i]
            self.w.append(np.copy(w)) # store solution
            self.g.append(g)
            self.f.append(f[:])
            if self.l[-1]==0:break
        #6) Purge turning points
        self.purgeNumErr(10e-10)
        self.purgeExcess()
#---------------------------------------------------------------    
    def initAlgo(self):
        # Initialize the algo
        #1) Form structured array
        a=np.zeros((self.mean.shape[0]),dtype=[('id',int),('mu',float)])
        b=[self.mean[i][0] for i in range(self.mean.shape[0])] # dump array into list
        a[:]=zip(range(self.mean.shape[0]),b) # fill structured array
        #2) Sort structured array
        b=np.sort(a,order='mu')
        #3) First free weight
        i,w=b.shape[0],np.copy(self.lB)
        while sum(w)<1:
            i-=1
            w[b[i][0]]=self.uB[b[i][0]]
        w[b[i][0]]+=1-sum(w)
        return [b[i][0]],w
#---------------------------------------------------------------    
    def computeBi(self,c,bi):
        if c>0:
            bi=bi[1][0]
        if c<0:
            bi=bi[0][0]
        return bi
#---------------------------------------------------------------
    def computeW(self,covarF_inv,covarFB,meanF,wB):
        #1) compute gamma
        onesF=np.ones(meanF.shape)
        g1=np.dot(np.dot(onesF.T,covarF_inv),meanF)
        g2=np.dot(np.dot(onesF.T,covarF_inv),onesF)
        if wB==None:
            g,w1=float(-self.l[-1]*g1/g2+1/g2),0
        else:
            onesB=np.ones(wB.shape)
            g3=np.dot(onesB.T,wB)
            g4=np.dot(covarF_inv,covarFB)
            w1=np.dot(g4,wB)
            g4=np.dot(onesF.T,w1)
            g=float(-self.l[-1]*g1/g2+(1-g3+g4)/g2)
        #2) compute weights
        w2=np.dot(covarF_inv,onesF)
        w3=np.dot(covarF_inv,meanF)
        return -w1+g*w2+self.l[-1]*w3,g
#---------------------------------------------------------------
    def computeLambda(self,covarF_inv,covarFB,meanF,wB,i,bi):
        #1) C
        onesF=np.ones(meanF.shape)
        c1=np.dot(np.dot(onesF.T,covarF_inv),onesF)
        c2=np.dot(covarF_inv,meanF)
        c3=np.dot(np.dot(onesF.T,covarF_inv),meanF)
        c4=np.dot(covarF_inv,onesF)
        c=-c1*c2[i]+c3*c4[i]
        if c==0:return None,None
        #2) bi
        if type(bi)==list:bi=self.computeBi(c,bi)
        #3) Lambda
        if wB==None:
            # All free assets
            return float((c4[i]-c1*bi)/c),bi
        else:
            onesB=np.ones(wB.shape)
            l1=np.dot(onesB.T,wB)
            l2=np.dot(covarF_inv,covarFB)
            l3=np.dot(l2,wB)
            l2=np.dot(onesF.T,l3)
            return float(((1-l1+l2)*c4[i]-c1*(bi+l3[i]))/c),bi
#---------------------------------------------------------------
    def getMatrices(self,f):
        # Slice covarF,covarFB,covarB,meanF,meanB,wF,wB
        covarF=self.reduceMatrix(self.covar,f,f)
        meanF=self.reduceMatrix(self.mean,f,[0])
        b=self.getB(f)
        covarFB=self.reduceMatrix(self.covar,f,b)
        wB=self.reduceMatrix(self.w[-1],b,[0])
        return covarF,covarFB,meanF,wB
#---------------------------------------------------------------
    def getB(self,f):
        return self.diffLists(range(self.mean.shape[0]),f)
#---------------------------------------------------------------
    def diffLists(self,list1,list2):
        return list(set(list1)-set(list2))
#---------------------------------------------------------------
    def reduceMatrix(self,matrix,listX,listY):
        # Reduce a matrix to the provided list of rows and columns
        if len(listX)==0 or len(listY)==0:return
        matrix_=matrix[:,listY[0]:listY[0]+1]
        for i in listY[1:]:
            a=matrix[:,i:i+1]
            matrix_=np.append(matrix_,a,1)
        matrix__=matrix_[listX[0]:listX[0]+1,:]
        for i in listX[1:]:
            a=matrix_[i:i+1,:]
            matrix__=np.append(matrix__,a,0)
        return matrix__
#---------------------------------------------------------------    
    def purgeNumErr(self,tol):
        # Purge violations of inequality constraints (associated with ill-conditioned covar matrix)
        i=0
        while True:
            flag=False
            if i==len(self.w):break
            if abs(sum(self.w[i])-1)>tol:
                flag=True
            else:
                for j in range(self.w[i].shape[0]):
                    if self.w[i][j]-self.lB[j]<-tol or self.w[i][j]-self.uB[j]>tol:
                        flag=True;break
            if flag==True:
                del self.w[i]
                del self.l[i]
                del self.g[i]
                del self.f[i]
            else:
                i+=1
        return
#---------------------------------------------------------------    
    def purgeExcess(self):
        # Remove violations of the convex hull
        i,repeat=0,False
        while True:
            if repeat==False:i+=1
            if i==len(self.w)-1:break
            w=self.w[i]
            mu=np.dot(w.T,self.mean)[0,0]
            j,repeat=i+1,False
            while True:
                if j==len(self.w):break
                w=self.w[j]
                mu_=np.dot(w.T,self.mean)[0,0]
                if mu<mu_:
                    del self.w[i]
                    del self.l[i]
                    del self.g[i]
                    del self.f[i]
                    repeat=True
                    break
                else:
                    j+=1
        return
#---------------------------------------------------------------
    def getMinVar(self):
        # Get the minimum variance solution
        var=[]
        for w in self.w:
            a=np.dot(np.dot(w.T,self.covar),w)
            var.append(a)
        return min(var)**.5,self.w[var.index(min(var))]
#---------------------------------------------------------------
    def getMaxSR(self):
        # Get the max Sharpe ratio portfolio
        #1) Compute the local max SR portfolio between any two neighbor turning points
        w_sr,sr=[],[]
        for i in range(len(self.w)-1):
            w0=np.copy(self.w[i])
            w1=np.copy(self.w[i+1])
            kargs={'minimum':False,'args':(w0,w1)}
            a,b=self.goldenSection(self.evalSR,0,1,**kargs)
            w_sr.append(a*w0+(1-a)*w1)
            sr.append(b)
        return max(sr),w_sr[sr.index(max(sr))]
#---------------------------------------------------------------
    def evalSR(self,a,w0,w1):
        # Evaluate SR of the portfolio within the convex combination
        w=a*w0+(1-a)*w1
        b=np.dot(w.T,self.mean)[0,0]
        c=np.dot(np.dot(w.T,self.covar),w)[0,0]**.5
        return b/c
#---------------------------------------------------------------
    def goldenSection(self,obj,a,b,**kargs):
        # Golden section method. Maximum if kargs['minimum']==False is passed 
        from math import log,ceil
        tol,sign,args=1.0e-9,1,None
        if 'minimum' in kargs and kargs['minimum']==False:sign=-1
        if 'args' in kargs:args=kargs['args']
        numIter=int(ceil(-2.078087*log(tol/abs(b-a))))
        r=0.618033989
        c=1.0-r
        # Initialize
        x1=r*a+c*b;x2=c*a+r*b
        f1=sign*obj(x1,*args);f2=sign*obj(x2,*args)
        # Loop
        for i in range(numIter):
            if f1>f2:
                a=x1
                x1=x2;f1=f2
                x2=c*a+r*b;f2=sign*obj(x2,*args)
            else:
                b=x2
                x2=x1;f2=f1
                x1=r*a+c*b;f1=sign*obj(x1,*args)
        if f1<f2:return x1,sign*f1
        else:return x2,sign*f2
#---------------------------------------------------------------
    def efFrontier(self,points):
        # Get the efficient frontier
        mu,sigma,weights=[],[],[]
        a=np.linspace(0,1,points/len(self.w))[:-1] # remove the 1, to avoid duplications
        b=range(len(self.w)-1)
        for i in b:
            w0,w1=self.w[i],self.w[i+1]
            if i==b[-1]:a=np.linspace(0,1,points/len(self.w)) # include the 1 in the last iteration
            for j in a:
                w=w1*j+(1-j)*w0
                weights.append(np.copy(w))
                mu.append(np.dot(w.T,self.mean)[0,0])
                sigma.append(np.dot(np.dot(w.T,self.covar),w)[0,0]**.5)
        return mu,sigma,weights
There was a runtime error.

Looks pretty solid Vladimir, I find that these optimization routines work much better with fewer assets, sector ETFs are usually a go to for me as well.

Look long term.

Clone Algorithm
13
Loading...
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
# David Edwards  CLA  on the sector etf plus TLT
# https://www.quantopian.com/posts/critical-line-algorithm-for-portfolio-optimization

"""
Critical Line Algorithm for portfolio optimization

ref: http://papers.ssrn.com/sol3/papers.cfm?abstract_id=2197616
"""

import numpy as np
import pandas as pd

def initialize(context):
    set_commission(commission.PerShare(cost=0.01, min_trade_cost=1.0))
    #set_universe(universe.DollarVolumeUniverse(floor_percentile=99.7, ceiling_percentile=100))
    context.securities =symbols( 'tlt','xlf', 'xle', 'xlu', 'xlk', 'xlb', 'xlp', 'xly','xli', 'xlv')
    # Constrained to rebalance once/week
    context.trade_offset = pd.tseries.offsets.Week()
    context.trade_date = pd.Timestamp('1900-01-01', tz='utc')
    # Min/Max weights that can be assigned to a single security
    context.lower_bound = -0.20
    context.upper_bound = 1.0

# Will be called on every trade event for the securities you specify. 
def handle_data(context, data):
    
    now = get_datetime()
    if now < context.trade_date:
        return
    context.trade_date = context.trade_offset.apply(now)

    prices = history(80, '1d', 'price').dropna(axis=1)
    
    N = len(prices.columns)
    R = np.log(prices).diff().dropna()
    mean = R.mean().reshape((N, 1))
    
    lower_bounds = np.array([context.lower_bound]*N).reshape((N, 1)) 
    upper_bounds = np.array([context.upper_bound]*N).reshape((N, 1)) 
    covar = R.cov().values
    
    # Try statement due to occasional singular matrices.
    try:
        cla = CLA(mean, covar, lower_bounds, upper_bounds)
        cla.solve()
    
        # Maximum sharpe ratio and the corresponding wieghts
        sharpe, w_sharpe = cla.getMaxSR()
        w_sharpe = pd.Series(w_sharpe.flatten(), index=R.columns)
        weights = {}
        for stock in data:
            if stock in w_sharpe.index:
                weights[stock.symbol] = w_sharpe[stock]
                order_target_percent(stock, w_sharpe[stock])
            else:
                order_target(stock, 0)
        log.info("\n%s"%pd.Series(weights))
        record(sharpe=sharpe)

    except Exception as e:
        # Reset the trade date to try again on the next bar
        context.trade_date = now
        log.debug(e)             

class CLA:
    def __init__(self,mean,covar,lB,uB):
        # Initialize the class
        self.mean=mean
        self.covar=covar
        self.lB=lB
        self.uB=uB
        self.w=[] # solution
        self.l=[] # lambdas
        self.g=[] # gammas
        self.f=[] # free weights
#---------------------------------------------------------------
    def solve(self):
        # Compute the turning points,free sets and weights
        f,w=self.initAlgo()
        self.w.append(np.copy(w)) # store solution
        self.l.append(None)
        self.g.append(None)
        self.f.append(f[:])
        while True:
            #1) case a): Bound one free weight
            l_in=None
            if len(f)>1:
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
                j=0
                for i in f:
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,j,[self.lB[i],self.uB[i]])
                    if l>l_in:l_in,i_in,bi_in=l,i,bi
                    j+=1
            #2) case b): Free one bounded weight
            l_out=None
            if len(f)<self.mean.shape[0]:
                b=self.getB(f)
                for i in b:
                    covarF,covarFB,meanF,wB=self.getMatrices(f+[i])
                    covarF_inv=np.linalg.inv(covarF)
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,meanF.shape[0]-1, \
                        self.w[-1][i])
                    if (self.l[-1]==None or l<self.l[-1]) and l>l_out:l_out,i_out=l,i                
            if (l_in==None or l_in<0) and (l_out==None or l_out<0):
                #3) compute minimum variance solution
                self.l.append(0)
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
                meanF=np.zeros(meanF.shape)
            else:
                #4) decide lambda
                if l_in>l_out:
                    self.l.append(l_in)
                    f.remove(i_in)
                    w[i_in]=bi_in # set value at the correct boundary
                else:
                    self.l.append(l_out)
                    f.append(i_out)
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
            #5) compute solution vector
            wF,g=self.computeW(covarF_inv,covarFB,meanF,wB)
            for i in range(len(f)):w[f[i]]=wF[i]
            self.w.append(np.copy(w)) # store solution
            self.g.append(g)
            self.f.append(f[:])
            if self.l[-1]==0:break
        #6) Purge turning points
        self.purgeNumErr(10e-10)
        self.purgeExcess()
#---------------------------------------------------------------    
    def initAlgo(self):
        # Initialize the algo
        #1) Form structured array
        a=np.zeros((self.mean.shape[0]),dtype=[('id',int),('mu',float)])
        b=[self.mean[i][0] for i in range(self.mean.shape[0])] # dump array into list
        a[:]=zip(range(self.mean.shape[0]),b) # fill structured array
        #2) Sort structured array
        b=np.sort(a,order='mu')
        #3) First free weight
        i,w=b.shape[0],np.copy(self.lB)
        while sum(w)<1:
            i-=1
            w[b[i][0]]=self.uB[b[i][0]]
        w[b[i][0]]+=1-sum(w)
        return [b[i][0]],w
#---------------------------------------------------------------    
    def computeBi(self,c,bi):
        if c>0:
            bi=bi[1][0]
        if c<0:
            bi=bi[0][0]
        return bi
#---------------------------------------------------------------
    def computeW(self,covarF_inv,covarFB,meanF,wB):
        #1) compute gamma
        onesF=np.ones(meanF.shape)
        g1=np.dot(np.dot(onesF.T,covarF_inv),meanF)
        g2=np.dot(np.dot(onesF.T,covarF_inv),onesF)
        if wB==None:
            g,w1=float(-self.l[-1]*g1/g2+1/g2),0
        else:
            onesB=np.ones(wB.shape)
            g3=np.dot(onesB.T,wB)
            g4=np.dot(covarF_inv,covarFB)
            w1=np.dot(g4,wB)
            g4=np.dot(onesF.T,w1)
            g=float(-self.l[-1]*g1/g2+(1-g3+g4)/g2)
        #2) compute weights
        w2=np.dot(covarF_inv,onesF)
        w3=np.dot(covarF_inv,meanF)
        return -w1+g*w2+self.l[-1]*w3,g
#---------------------------------------------------------------
    def computeLambda(self,covarF_inv,covarFB,meanF,wB,i,bi):
        #1) C
        onesF=np.ones(meanF.shape)
        c1=np.dot(np.dot(onesF.T,covarF_inv),onesF)
        c2=np.dot(covarF_inv,meanF)
        c3=np.dot(np.dot(onesF.T,covarF_inv),meanF)
        c4=np.dot(covarF_inv,onesF)
        c=-c1*c2[i]+c3*c4[i]
        if c==0:return None,None
        #2) bi
        if type(bi)==list:bi=self.computeBi(c,bi)
        #3) Lambda
        if wB==None:
            # All free assets
            return float((c4[i]-c1*bi)/c),bi
        else:
            onesB=np.ones(wB.shape)
            l1=np.dot(onesB.T,wB)
            l2=np.dot(covarF_inv,covarFB)
            l3=np.dot(l2,wB)
            l2=np.dot(onesF.T,l3)
            return float(((1-l1+l2)*c4[i]-c1*(bi+l3[i]))/c),bi
#---------------------------------------------------------------
    def getMatrices(self,f):
        # Slice covarF,covarFB,covarB,meanF,meanB,wF,wB
        covarF=self.reduceMatrix(self.covar,f,f)
        meanF=self.reduceMatrix(self.mean,f,[0])
        b=self.getB(f)
        covarFB=self.reduceMatrix(self.covar,f,b)
        wB=self.reduceMatrix(self.w[-1],b,[0])
        return covarF,covarFB,meanF,wB
#---------------------------------------------------------------
    def getB(self,f):
        return self.diffLists(range(self.mean.shape[0]),f)
#---------------------------------------------------------------
    def diffLists(self,list1,list2):
        return list(set(list1)-set(list2))
#---------------------------------------------------------------
    def reduceMatrix(self,matrix,listX,listY):
        # Reduce a matrix to the provided list of rows and columns
        if len(listX)==0 or len(listY)==0:return
        matrix_=matrix[:,listY[0]:listY[0]+1]
        for i in listY[1:]:
            a=matrix[:,i:i+1]
            matrix_=np.append(matrix_,a,1)
        matrix__=matrix_[listX[0]:listX[0]+1,:]
        for i in listX[1:]:
            a=matrix_[i:i+1,:]
            matrix__=np.append(matrix__,a,0)
        return matrix__
#---------------------------------------------------------------    
    def purgeNumErr(self,tol):
        # Purge violations of inequality constraints (associated with ill-conditioned covar matrix)
        i=0
        while True:
            flag=False
            if i==len(self.w):break
            if abs(sum(self.w[i])-1)>tol:
                flag=True
            else:
                for j in range(self.w[i].shape[0]):
                    if self.w[i][j]-self.lB[j]<-tol or self.w[i][j]-self.uB[j]>tol:
                        flag=True;break
            if flag==True:
                del self.w[i]
                del self.l[i]
                del self.g[i]
                del self.f[i]
            else:
                i+=1
        return
#---------------------------------------------------------------    
    def purgeExcess(self):
        # Remove violations of the convex hull
        i,repeat=0,False
        while True:
            if repeat==False:i+=1
            if i==len(self.w)-1:break
            w=self.w[i]
            mu=np.dot(w.T,self.mean)[0,0]
            j,repeat=i+1,False
            while True:
                if j==len(self.w):break
                w=self.w[j]
                mu_=np.dot(w.T,self.mean)[0,0]
                if mu<mu_:
                    del self.w[i]
                    del self.l[i]
                    del self.g[i]
                    del self.f[i]
                    repeat=True
                    break
                else:
                    j+=1
        return
#---------------------------------------------------------------
    def getMinVar(self):
        # Get the minimum variance solution
        var=[]
        for w in self.w:
            a=np.dot(np.dot(w.T,self.covar),w)
            var.append(a)
        return min(var)**.5,self.w[var.index(min(var))]
#---------------------------------------------------------------
    def getMaxSR(self):
        # Get the max Sharpe ratio portfolio
        #1) Compute the local max SR portfolio between any two neighbor turning points
        w_sr,sr=[],[]
        for i in range(len(self.w)-1):
            w0=np.copy(self.w[i])
            w1=np.copy(self.w[i+1])
            kargs={'minimum':False,'args':(w0,w1)}
            a,b=self.goldenSection(self.evalSR,0,1,**kargs)
            w_sr.append(a*w0+(1-a)*w1)
            sr.append(b)
        return max(sr),w_sr[sr.index(max(sr))]
#---------------------------------------------------------------
    def evalSR(self,a,w0,w1):
        # Evaluate SR of the portfolio within the convex combination
        w=a*w0+(1-a)*w1
        b=np.dot(w.T,self.mean)[0,0]
        c=np.dot(np.dot(w.T,self.covar),w)[0,0]**.5
        return b/c
#---------------------------------------------------------------
    def goldenSection(self,obj,a,b,**kargs):
        # Golden section method. Maximum if kargs['minimum']==False is passed 
        from math import log,ceil
        tol,sign,args=1.0e-9,1,None
        if 'minimum' in kargs and kargs['minimum']==False:sign=-1
        if 'args' in kargs:args=kargs['args']
        numIter=int(ceil(-2.078087*log(tol/abs(b-a))))
        r=0.618033989
        c=1.0-r
        # Initialize
        x1=r*a+c*b;x2=c*a+r*b
        f1=sign*obj(x1,*args);f2=sign*obj(x2,*args)
        # Loop
        for i in range(numIter):
            if f1>f2:
                a=x1
                x1=x2;f1=f2
                x2=c*a+r*b;f2=sign*obj(x2,*args)
            else:
                b=x2
                x2=x1;f2=f1
                x1=r*a+c*b;f1=sign*obj(x1,*args)
        if f1<f2:return x1,sign*f1
        else:return x2,sign*f2
#---------------------------------------------------------------
    def efFrontier(self,points):
        # Get the efficient frontier
        mu,sigma,weights=[],[],[]
        a=np.linspace(0,1,points/len(self.w))[:-1] # remove the 1, to avoid duplications
        b=range(len(self.w)-1)
        for i in b:
            w0,w1=self.w[i],self.w[i+1]
            if i==b[-1]:a=np.linspace(0,1,points/len(self.w)) # include the 1 in the last iteration
            for j in a:
                w=w1*j+(1-j)*w0
                weights.append(np.copy(w))
                mu.append(np.dot(w.T,self.mean)[0,0])
                sigma.append(np.dot(np.dot(w.T,self.covar),w)[0,0]**.5)
        return mu,sigma,weights
There was a runtime error.

backtest result if rebalance weekly, remoe tlt, and disable commission fee.

Clone Algorithm
15
Loading...
Total Returns
--
Alpha
--
Beta
--
Sharpe
--
Sortino
--
Max Drawdown
--
Benchmark Returns
--
Volatility
--
Returns 1 Month 3 Month 6 Month 12 Month
Alpha 1 Month 3 Month 6 Month 12 Month
Beta 1 Month 3 Month 6 Month 12 Month
Sharpe 1 Month 3 Month 6 Month 12 Month
Sortino 1 Month 3 Month 6 Month 12 Month
Volatility 1 Month 3 Month 6 Month 12 Month
Max Drawdown 1 Month 3 Month 6 Month 12 Month
# David Edwards  CLA  on the sector etf plus TLT
# https://www.quantopian.com/posts/critical-line-algorithm-for-portfolio-optimization

"""
Critical Line Algorithm for portfolio optimization

ref: http://papers.ssrn.com/sol3/papers.cfm?abstract_id=2197616
"""

import numpy as np
import pandas as pd

def initialize(context):
    #set_commission(commission.PerShare(cost=0.01, min_trade_cost=1.0))
    #set_universe(universe.DollarVolumeUniverse(floor_percentile=99.7, ceiling_percentile=100))
    context.securities =symbols('xlf', 'xle', 'xlu', 'xlk', 'xlb', 'xlp', 'xly','xli', 'xlv')
    # Constrained to rebalance once/week
    context.trade_offset = pd.tseries.offsets.Week()
    context.trade_date = pd.Timestamp('1900-01-01', tz='utc')
    # Min/Max weights that can be assigned to a single security
    context.lower_bound = -0.20
    context.upper_bound = 1.0
    schedule_function(assgin_weights,date_rule=date_rules.week_start(0),time_rule=time_rules.market_open(hours=1, minutes=1))
        
# Will be called on every trade event for the securities you specify. 
def handle_data(context, data):
    pass

def assgin_weights(context, data): 
    now = get_datetime()
    if now < context.trade_date:
        return
    context.trade_date = context.trade_offset.apply(now)

    prices = history(80, '1d', 'price').dropna(axis=1)
    
    N = len(prices.columns)
    R = np.log(prices).diff().dropna()
    mean = R.mean().reshape((N, 1))
    
    lower_bounds = np.array([context.lower_bound]*N).reshape((N, 1)) 
    upper_bounds = np.array([context.upper_bound]*N).reshape((N, 1)) 
    covar = R.cov().values
    
    # Try statement due to occasional singular matrices.
    try:
        cla = CLA(mean, covar, lower_bounds, upper_bounds)
        cla.solve()
    
        # Maximum sharpe ratio and the corresponding wieghts
        sharpe, w_sharpe = cla.getMaxSR()
        w_sharpe = pd.Series(w_sharpe.flatten(), index=R.columns)
        weights = {}
        for stock in data:
            if stock in w_sharpe.index:
                weights[stock.symbol] = w_sharpe[stock]
                order_target_percent(stock, w_sharpe[stock])
            else:
                order_target(stock, 0)
        log.info("\n%s"%pd.Series(weights))
        record(sharpe=sharpe)

    except Exception as e:
        # Reset the trade date to try again on the next bar
        context.trade_date = now
        log.debug(e)             

class CLA:
    def __init__(self,mean,covar,lB,uB):
        # Initialize the class
        self.mean=mean
        self.covar=covar
        self.lB=lB
        self.uB=uB
        self.w=[] # solution
        self.l=[] # lambdas
        self.g=[] # gammas
        self.f=[] # free weights
#---------------------------------------------------------------
    def solve(self):
        # Compute the turning points,free sets and weights
        f,w=self.initAlgo()
        self.w.append(np.copy(w)) # store solution
        self.l.append(None)
        self.g.append(None)
        self.f.append(f[:])
        while True:
            #1) case a): Bound one free weight
            l_in=None
            if len(f)>1:
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
                j=0
                for i in f:
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,j,[self.lB[i],self.uB[i]])
                    if l>l_in:l_in,i_in,bi_in=l,i,bi
                    j+=1
            #2) case b): Free one bounded weight
            l_out=None
            if len(f)<self.mean.shape[0]:
                b=self.getB(f)
                for i in b:
                    covarF,covarFB,meanF,wB=self.getMatrices(f+[i])
                    covarF_inv=np.linalg.inv(covarF)
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,meanF.shape[0]-1, \
                        self.w[-1][i])
                    if (self.l[-1]==None or l<self.l[-1]) and l>l_out:l_out,i_out=l,i                
            if (l_in==None or l_in<0) and (l_out==None or l_out<0):
                #3) compute minimum variance solution
                self.l.append(0)
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
                meanF=np.zeros(meanF.shape)
            else:
                #4) decide lambda
                if l_in>l_out:
                    self.l.append(l_in)
                    f.remove(i_in)
                    w[i_in]=bi_in # set value at the correct boundary
                else:
                    self.l.append(l_out)
                    f.append(i_out)
                covarF,covarFB,meanF,wB=self.getMatrices(f)
                covarF_inv=np.linalg.inv(covarF)
            #5) compute solution vector
            wF,g=self.computeW(covarF_inv,covarFB,meanF,wB)
            for i in range(len(f)):w[f[i]]=wF[i]
            self.w.append(np.copy(w)) # store solution
            self.g.append(g)
            self.f.append(f[:])
            if self.l[-1]==0:break
        #6) Purge turning points
        self.purgeNumErr(10e-10)
        self.purgeExcess()
#---------------------------------------------------------------    
    def initAlgo(self):
        # Initialize the algo
        #1) Form structured array
        a=np.zeros((self.mean.shape[0]),dtype=[('id',int),('mu',float)])
        b=[self.mean[i][0] for i in range(self.mean.shape[0])] # dump array into list
        a[:]=zip(range(self.mean.shape[0]),b) # fill structured array
        #2) Sort structured array
        b=np.sort(a,order='mu')
        #3) First free weight
        i,w=b.shape[0],np.copy(self.lB)
        while sum(w)<1:
            i-=1
            w[b[i][0]]=self.uB[b[i][0]]
        w[b[i][0]]+=1-sum(w)
        return [b[i][0]],w
#---------------------------------------------------------------    
    def computeBi(self,c,bi):
        if c>0:
            bi=bi[1][0]
        if c<0:
            bi=bi[0][0]
        return bi
#---------------------------------------------------------------
    def computeW(self,covarF_inv,covarFB,meanF,wB):
        #1) compute gamma
        onesF=np.ones(meanF.shape)
        g1=np.dot(np.dot(onesF.T,covarF_inv),meanF)
        g2=np.dot(np.dot(onesF.T,covarF_inv),onesF)
        if wB==None:
            g,w1=float(-self.l[-1]*g1/g2+1/g2),0
        else:
            onesB=np.ones(wB.shape)
            g3=np.dot(onesB.T,wB)
            g4=np.dot(covarF_inv,covarFB)
            w1=np.dot(g4,wB)
            g4=np.dot(onesF.T,w1)
            g=float(-self.l[-1]*g1/g2+(1-g3+g4)/g2)
        #2) compute weights
        w2=np.dot(covarF_inv,onesF)
        w3=np.dot(covarF_inv,meanF)
        return -w1+g*w2+self.l[-1]*w3,g
#---------------------------------------------------------------
    def computeLambda(self,covarF_inv,covarFB,meanF,wB,i,bi):
        #1) C
        onesF=np.ones(meanF.shape)
        c1=np.dot(np.dot(onesF.T,covarF_inv),onesF)
        c2=np.dot(covarF_inv,meanF)
        c3=np.dot(np.dot(onesF.T,covarF_inv),meanF)
        c4=np.dot(covarF_inv,onesF)
        c=-c1*c2[i]+c3*c4[i]
        if c==0:return None,None
        #2) bi
        if type(bi)==list:bi=self.computeBi(c,bi)
        #3) Lambda
        if wB==None:
            # All free assets
            return float((c4[i]-c1*bi)/c),bi
        else:
            onesB=np.ones(wB.shape)
            l1=np.dot(onesB.T,wB)
            l2=np.dot(covarF_inv,covarFB)
            l3=np.dot(l2,wB)
            l2=np.dot(onesF.T,l3)
            return float(((1-l1+l2)*c4[i]-c1*(bi+l3[i]))/c),bi
#---------------------------------------------------------------
    def getMatrices(self,f):
        # Slice covarF,covarFB,covarB,meanF,meanB,wF,wB
        covarF=self.reduceMatrix(self.covar,f,f)
        meanF=self.reduceMatrix(self.mean,f,[0])
        b=self.getB(f)
        covarFB=self.reduceMatrix(self.covar,f,b)
        wB=self.reduceMatrix(self.w[-1],b,[0])
        return covarF,covarFB,meanF,wB
#---------------------------------------------------------------
    def getB(self,f):
        return self.diffLists(range(self.mean.shape[0]),f)
#---------------------------------------------------------------
    def diffLists(self,list1,list2):
        return list(set(list1)-set(list2))
#---------------------------------------------------------------
    def reduceMatrix(self,matrix,listX,listY):
        # Reduce a matrix to the provided list of rows and columns
        if len(listX)==0 or len(listY)==0:return
        matrix_=matrix[:,listY[0]:listY[0]+1]
        for i in listY[1:]:
            a=matrix[:,i:i+1]
            matrix_=np.append(matrix_,a,1)
        matrix__=matrix_[listX[0]:listX[0]+1,:]
        for i in listX[1:]:
            a=matrix_[i:i+1,:]
            matrix__=np.append(matrix__,a,0)
        return matrix__
#---------------------------------------------------------------    
    def purgeNumErr(self,tol):
        # Purge violations of inequality constraints (associated with ill-conditioned covar matrix)
        i=0
        while True:
            flag=False
            if i==len(self.w):break
            if abs(sum(self.w[i])-1)>tol:
                flag=True
            else:
                for j in range(self.w[i].shape[0]):
                    if self.w[i][j]-self.lB[j]<-tol or self.w[i][j]-self.uB[j]>tol:
                        flag=True;break
            if flag==True:
                del self.w[i]
                del self.l[i]
                del self.g[i]
                del self.f[i]
            else:
                i+=1
        return
#---------------------------------------------------------------    
    def purgeExcess(self):
        # Remove violations of the convex hull
        i,repeat=0,False
        while True:
            if repeat==False:i+=1
            if i==len(self.w)-1:break
            w=self.w[i]
            mu=np.dot(w.T,self.mean)[0,0]
            j,repeat=i+1,False
            while True:
                if j==len(self.w):break
                w=self.w[j]
                mu_=np.dot(w.T,self.mean)[0,0]
                if mu<mu_:
                    del self.w[i]
                    del self.l[i]
                    del self.g[i]
                    del self.f[i]
                    repeat=True
                    break
                else:
                    j+=1
        return
#---------------------------------------------------------------
    def getMinVar(self):
        # Get the minimum variance solution
        var=[]
        for w in self.w:
            a=np.dot(np.dot(w.T,self.covar),w)
            var.append(a)
        return min(var)**.5,self.w[var.index(min(var))]
#---------------------------------------------------------------
    def getMaxSR(self):
        # Get the max Sharpe ratio portfolio
        #1) Compute the local max SR portfolio between any two neighbor turning points
        w_sr,sr=[],[]
        for i in range(len(self.w)-1):
            w0=np.copy(self.w[i])
            w1=np.copy(self.w[i+1])
            kargs={'minimum':False,'args':(w0,w1)}
            a,b=self.goldenSection(self.evalSR,0,1,**kargs)
            w_sr.append(a*w0+(1-a)*w1)
            sr.append(b)
        return max(sr),w_sr[sr.index(max(sr))]
#---------------------------------------------------------------
    def evalSR(self,a,w0,w1):
        # Evaluate SR of the portfolio within the convex combination
        w=a*w0+(1-a)*w1
        b=np.dot(w.T,self.mean)[0,0]
        c=np.dot(np.dot(w.T,self.covar),w)[0,0]**.5
        return b/c
#---------------------------------------------------------------
    def goldenSection(self,obj,a,b,**kargs):
        # Golden section method. Maximum if kargs['minimum']==False is passed 
        from math import log,ceil
        tol,sign,args=1.0e-9,1,None
        if 'minimum' in kargs and kargs['minimum']==False:sign=-1
        if 'args' in kargs:args=kargs['args']
        numIter=int(ceil(-2.078087*log(tol/abs(b-a))))
        r=0.618033989
        c=1.0-r
        # Initialize
        x1=r*a+c*b;x2=c*a+r*b
        f1=sign*obj(x1,*args);f2=sign*obj(x2,*args)
        # Loop
        for i in range(numIter):
            if f1>f2:
                a=x1
                x1=x2;f1=f2
                x2=c*a+r*b;f2=sign*obj(x2,*args)
            else:
                b=x2
                x2=x1;f2=f1
                x1=r*a+c*b;f1=sign*obj(x1,*args)
        if f1<f2:return x1,sign*f1
        else:return x2,sign*f2
#---------------------------------------------------------------
    def efFrontier(self,points):
        # Get the efficient frontier
        mu,sigma,weights=[],[],[]
        a=np.linspace(0,1,points/len(self.w))[:-1] # remove the 1, to avoid duplications
        b=range(len(self.w)-1)
        for i in b:
            w0,w1=self.w[i],self.w[i+1]
            if i==b[-1]:a=np.linspace(0,1,points/len(self.w)) # include the 1 in the last iteration
            for j in a:
                w=w1*j+(1-j)*w0
                weights.append(np.copy(w))
                mu.append(np.dot(w.T,self.mean)[0,0])
                sigma.append(np.dot(np.dot(w.T,self.covar),w)[0,0]**.5)
        return mu,sigma,weights
There was a runtime error.

Good strategy, thanks for sharing!

Please refer to paper: https://papers.ssrn.com/sol3/papers.cfm?abstract_id=2606884

In the paper, a target volatility, for example 5% or 10%, which are treated as Offensive or Defensive solutions, were recommended. Question is, how can we target a volatility by CLA api? Or we have to search on Effective Front to find it by ourself?

Best,
David

Has anyone tried to fix this algorithm for Python 3.5. I got it to run fixing the that zip() no longer returns a list, and x==None should be "x is None". However the results are not the same. I have been going like by line but I think it takes a firm understanding of CLA.

Hi! I changed a few lines and now the algorithm is correctly working in Python 3.7.5. Here you have the updated class

class CLA:  
    def __init__(self,mean,covar,lB,uB):  
        # Initialize the class  
        self.mean=mean  
        self.covar=covar  
        self.lB=lB  
        self.uB=uB  
        self.w=[] # solution  
        self.l=[] # lambdas  
        self.g=[] # gammas  
        self.f=[] # free weights  
#---------------------------------------------------------------  
    def solve(self):  
        # Compute the turning points,free sets and weights  
        f,w=self.initAlgo()  
        self.w.append(np.copy(w)) # store solution  
        self.l.append(None)  
        self.g.append(None)  
        self.f.append(f[:])  
        while True:  
            #1) case a): Bound one free weight  
            l_in=None  
            if len(f)>1:  
                covarF,covarFB,meanF,wB=self.getMatrices(f)  
                covarF_inv=np.linalg.inv(covarF)  
                j=0  
                for i in f:  
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,j,[self.lB[i],self.uB[i]])  
                    if (l_in is None or l>l_in):l_in,i_in,bi_in=l,i,bi  
                    j+=1  
            #2) case b): Free one bounded weight  
            l_out=None  
            if len(f)<self.mean.shape[0]:  
                b=self.getB(f)  
                for i in b:  
                    covarF,covarFB,meanF,wB=self.getMatrices(f+[i])  
                    covarF_inv=np.linalg.inv(covarF)  
                    l,bi=self.computeLambda(covarF_inv,covarFB,meanF,wB,meanF.shape[0]-1, \  
                        self.w[-1][i]) ##########################  
                    if (self.l[-1]==None or l<self.l[-1]) and (l_out is None or l>l_out):l_out,i_out=l,i  
            if (l_in==None or l_in<0) and (l_out==None or l_out<0):  
                #3) compute minimum variance solution  
                self.l.append(0)  
                covarF,covarFB,meanF,wB=self.getMatrices(f)  
                covarF_inv=np.linalg.inv(covarF)  
                meanF=np.zeros(meanF.shape)  
            else:  
                #4) decide lambda  
                if l_out is None or (l_in is not None and l_in>l_out):  
                    self.l.append(l_in)  
                    f.remove(l_in)  
                    w[i_in]=bi_in # set value at the correct boundary  
                else:  
                    self.l.append(l_out)  
                    f.append(i_out)  
                covarF,covarFB,meanF,wB=self.getMatrices(f)  
                covarF_inv=np.linalg.inv(covarF)  
            #5) compute solution vector  
            wF,g=self.computeW(covarF_inv,covarFB,meanF,wB)  
            for i in range(len(f)):w[f[i]]=wF[i]  
            self.w.append(np.copy(w)) # store solution  
            self.g.append(g)  
            self.f.append(f[:])  
            if self.l[-1]==0:break  
        #6) Purge turning points  
        self.purgeNumErr(10e-10)  
        self.purgeExcess()  
#---------------------------------------------------------------  
    def initAlgo(self):  
        # Initialize the algo  
        #1) Form structured array  
        a=np.zeros((self.mean.shape[0]),dtype=[('id',int),('mu',float)])  
        b=[self.mean[i][0] for i in range(self.mean.shape[0])] # dump array into list  
        a[:]=list(zip(range(self.mean.shape[0]),b)) # fill structured array  
        #2) Sort structured array  
        b=np.sort(a,order='mu')  
        #3) First free weight  
        i,w=b.shape[0],np.copy(self.lB)  
        while sum(w)<1:  
            i-=1  
            w[b[i][0]]=self.uB[b[i][0]]  
        w[b[i][0]]+=1-sum(w)  
        return [b[i][0]],w  
#---------------------------------------------------------------  
    def computeBi(self,c,bi):  
        if c>0:  
            bi=bi[1][0]  
        if c<0:  
            bi=bi[0][0]  
        return bi  
#---------------------------------------------------------------  
    def computeW(self,covarF_inv,covarFB,meanF,wB):  
        #1) compute gamma  
        onesF=np.ones(meanF.shape)  
        g1=np.dot(np.dot(onesF.T,covarF_inv),meanF)  
        g2=np.dot(np.dot(onesF.T,covarF_inv),onesF)  
        if wB is None:  
            g,w1=float(-self.l[-1]*g1/g2+1/g2),0  
        else:  
            onesB=np.ones(wB.shape)  
            g3=np.dot(onesB.T,wB)  
            g4=np.dot(covarF_inv,covarFB)  
            w1=np.dot(g4,wB)  
            g4=np.dot(onesF.T,w1)  
            g=float(-self.l[-1]*g1/g2+(1-g3+g4)/g2)  
        #2) compute weights  
        w2=np.dot(covarF_inv,onesF)  
        w3=np.dot(covarF_inv,meanF)  
        return -w1+g*w2+self.l[-1]*w3,g  
#---------------------------------------------------------------  
    def computeLambda(self,covarF_inv,covarFB,meanF,wB,i,bi):  
        #1) C  
        onesF=np.ones(meanF.shape)  
        c1=np.dot(np.dot(onesF.T,covarF_inv),onesF)  
        c2=np.dot(covarF_inv,meanF)  
        c3=np.dot(np.dot(onesF.T,covarF_inv),meanF)  
        c4=np.dot(covarF_inv,onesF)  
        c=-c1*c2[i]+c3*c4[i]  
        if c==0:return  
        #2) bi  
        if type(bi)==list:bi=self.computeBi(c,bi)  
        #3) Lambda  
        if wB is None:  
            # All free assets  
            return float((c4[i]-c1*bi)/c),bi  
        else:  
            onesB=np.ones(wB.shape)  
            l1=np.dot(onesB.T,wB)  
            l2=np.dot(covarF_inv,covarFB)  
            l3=np.dot(l2,wB)  
            l2=np.dot(onesF.T,l3)  
            return float(((1-l1+l2)*c4[i]-c1*(bi+l3[i]))/c),bi  
#---------------------------------------------------------------  
    def getMatrices(self,f):  
        # Slice covarF,covarFB,covarB,meanF,meanB,wF,wB  
        covarF=self.reduceMatrix(self.covar,f,f)  
        meanF=self.reduceMatrix(self.mean,f,[0])  
        b=self.getB(f)  
        covarFB=self.reduceMatrix(self.covar,f,b)  
        wB=self.reduceMatrix(self.w[-1].reshape(-1,1),b,[0])  
        return covarF,covarFB,meanF,wB  
#---------------------------------------------------------------  
    def getB(self,f):  
        return self.diffLists(range(self.mean.shape[0]),f)  
#---------------------------------------------------------------  
    def diffLists(self,list1,list2):  
        return list(set(list1)-set(list2))  
#---------------------------------------------------------------  
    def reduceMatrix(self,matrix,listX,listY):  
        # Reduce a matrix to the provided list of rows and columns  
        if len(listX)==0 or len(listY)==0:return  
        matrix_=matrix[:,listY[0]:listY[0]+1]  
        for i in listY[1:]:  
            a=matrix[:,i:i+1]  
            matrix_=np.append(matrix_,a,1)  
        matrix__=matrix_[listX[0]:listX[0]+1,:]  
        for i in listX[1:]:  
            a=matrix_[i:i+1,:]  
            matrix__=np.append(matrix__,a,0)  
        return matrix__  
#---------------------------------------------------------------  
    def purgeNumErr(self,tol):  
        # Purge violations of inequality constraints (associated with ill-conditioned covar matrix)  
        i=0  
        while True:  
            if i==len(self.w):break  
            w=self.w[i]  
            for j in range(w.shape[0]):  
                if w[j]-self.lB[j]<-tol or w[j]-self.uB[j]>tol:  
                    del self.w[i]  
                    del self.l[i]  
                    del self.g[i]  
                    del self.f[i]  
                    break  
            i+=1  
#---------------------------------------------------------------  
    def purgeExcess(self):  
        # Remove violations of the convex hull  
        i,repeat=0,False  
        while True:  
            if repeat==False:i+=1  
            if i==len(self.w)-1:break  
            w=self.w[i]  
            mu=np.dot(w.T,self.mean)[0,0]  
            j,repeat=i+1,False  
            while True:  
                if j==len(self.w):break  
                w=self.w[j]  
                mu_=np.dot(w.T,self.mean)[0,0]  
                if mu<mu_:  
                    del self.w[i]  
                    del self.l[i]  
                    del self.g[i]  
                    del self.f[i]  
                    repeat=True  
                    break  
                else:  
                    j+=1  
#---------------------------------------------------------------  
    def getMinVar(self):  
        # Get the minimum variance solution  
        var=[]  
        for w in self.w:  
            a=np.dot(np.dot(w.T,self.covar),w)[0,0]  
            var.append(a)  
        return min(var)**.5,self.w[var.index(min(var))]  
#---------------------------------------------------------------  
    def getMaxSR(self):  
        # Get the max Sharpe ratio portfolio  
        #1) Compute the local max SR portfolio between any two neighbor turning points  
        w_sr,sr=[],[]  
        for i in range(len(self.w)-1):  
            w0=np.copy(self.w[i])  
            w1=np.copy(self.w[i+1])  
            kargs={'minimum':False,'args':(w0,w1)}  
            a,b=self.goldenSection(self.evalSR,0,1,**kargs)  
            w_sr.append(a*w0+(1-a)*w1)  
            sr.append(b)  
        return max(sr),w_sr[sr.index(max(sr))]  
#---------------------------------------------------------------  
    def evalSR(self,a,w0,w1):  
        # Evaluate SR of the portfolio within the convex combination  
        w=a*w0+(1-a)*w1  
        b=np.dot(w.T,self.mean)[0,0]  
        c=np.dot(np.dot(w.T,self.covar),w)[0,0]**.5  
        return b/c  
#---------------------------------------------------------------  
    def goldenSection(self,obj,a,b,**kargs):  
        # Golden section method. Maximum if kargs['minimum']==False is passed  
        from math import log,ceil  
        tol,sign,args=1.0e-9,1,None  
        if 'minimum' in kargs and kargs['minimum']==False:sign=-1  
        if 'args' in kargs:args=kargs['args']  
        numIter=int(ceil(-2.078087*log(tol/abs(b-a))))  
        r=0.618033989  
        c=1.0-r  
        # Initialize  
        x1=r*a+c*b;x2=c*a+r*b  
        f1=sign*obj(x1,*args);f2=sign*obj(x2,*args)  
        # Loop  
        for i in range(numIter):  
            if f1>f2:  
                a=x1  
                x1=x2;f1=f2  
                x2=c*a+r*b;f2=sign*obj(x2,*args)  
            else:  
                b=x2  
                x2=x1;f2=f1  
                x1=r*a+c*b;f1=sign*obj(x1,*args)  
        if f1<f2:return x1,sign*f1  
        else:return x2,sign*f2  
#---------------------------------------------------------------  
    def efFrontier(self,points):  
        # Get the efficient frontier  
        mu,sigma,weights=[],[],[]  
        a=np.linspace(0,1,points/len(self.w))[:-1] # remove the 1, to avoid duplications  
        b=range(len(self.w)-1)  
        for i in b:  
            w0,w1=self.w[i],self.w[i+1]  
            if i==b[-1]:a=np.linspace(0,1,points/len(self.w)) # include the 1 in the last iteration  
            for j in a:  
                w=w1*j+(1-j)*w0  
                weights.append(np.copy(w))  
                mu.append(np.dot(w.T,self.mean)[0,0])  
                sigma.append(np.dot(np.dot(w.T,self.covar),w)[0,0]**.5)  
        return mu,sigma,weights  

And here you have the code to reproduce the results of MLP

data = pd.read_csv('http://www.quantresearch.org/CLA_Data.csv.txt')  
mu = data.loc[0,:].values.reshape(-1,1)  
sigma = data.loc[3:,:].values  
lb = data.loc[1,:].values.reshape(-1,1)  
ub = data.loc[2,:].values.reshape(-1,1)  
cla = CLA_2(mu, sigma, lb, ub)  
cla.solve()  
print(cla.getMinVar())  
print(cla.getMaxSR())  
mu, sigma, weights = cla.efFrontier(100)  
fig, ax = plt.subplots(figsize=(12,8))  
ax.plot(sigma, mu)  
ax.set_xlabel('Risk', fontsize=16)  
ax.set_ylabel('Expected Excess Return', fontsize=16)  
plt.xticks(rotation='vertical')  
ax.set_title('CLA-derived Efficient Frontier', fontsize=18);  
plt.tight_layout()  

Hope it's helpful to you!

@Matias,

Why not simply attach the corrected algo here? :-)