Gaussian Conditional Copula

In the spirit of sharing on Q forums, I am posting my code for Gaussian Copula class that you can use for pair trading or basket trading.

The gist of using copulas is that you identify the conditional cdf of a series based on other series and build a score around it. There are many references to it if you search for copula based pairs trading on google. Happy researching :)

def cov2corr( A ):
d = np.sqrt(A.diagonal())
A = ((A.T/d).T)/d
return A

from sklearn.svm import SVR
from scipy.stats import mvn
#from statsmodels.sandbox.distributions.extras import mvnormcdf

# avioding boundary trouble
loc_eps = 0.0000001

# ================================================================
# copula classes
# ================================================================

class GaussianCopulaConditional:
""" """

def __init__(self, corr_matrix=np.eye(2), conditionals=None):
"""
Gaussian covariance matrix and variables variables list;
target variables will be kept in 'targets' list
"""
self.cr = pd.DataFrame(corr_matrix)
cls = self.cr.columns.values

# by default conditionals contains everything except the last component
if conditionals is None:
conditionals = cls[:-1]

# the rest of components are targets
targets = [s for s in cls if s not in conditionals]

# keep the variables lists
self.targets = targets
self.conditionals = conditionals

# covariance submatrices
sII = np.array(self.cr.loc[conditionals, conditionals])
sIJ = np.array(self.cr.loc[conditionals, targets])
sJI = np.array(self.cr.loc[targets, conditionals])
sJJ = np.array(self.cr.loc[targets, targets])

# conditional mean and covariance
pr = np.linalg.solve(sII, sIJ)
self.cond_cov = sJJ - sJI.dot(pr)
self.mupr = np.linalg.solve(sII, sIJ).T

def make_input(self, u, u_cond):
""" """
if u is None:
u = 0.5 * np.ones((1, len(self.targets)))
if u_cond is None:
u_cond = 0.5 * np.ones((1, len(self.conditionals)))
u[u == 0] = loc_eps
u[u == 1] = 1 - loc_eps
return u, u_cond

def pdf(self, u=None, u_cond=None):
"""
u is 2-dim array n_obs x n_targets
u_cond is 2-dim array 1 x n_conditionals
"""
u, u_cond = self.make_input(u, u_cond)
x_cond = ss.norm.ppf(u_cond)
x = ss.norm.ppf(u)
mn = self.mupr.dot(x_cond.T).T
if len(self.targets) <= 1:
res = ss.norm.pdf(x, mn, self.cond_cov[0, 0])
fx = ss.norm.pdf(x)
res = pd.Series((res / fx)[:, 0], index=u[:, 0])
else:
res = ss.multivariate_normal.pdf(x, mean=mn, cov=self.cond_cov)
fx = ss.norm.pdf(x).prod(axis=1)
res = pd.Series(res / fx, index=range(x.shape))
res.name = 'Cond PDF of ' + ', '.join(self.targets)
return res

def cdf(self, u=None, u_cond=None):
"""
u is 2-dim array n_obs x n_targets
u_cond is 2-dim array 1 x n_conditionals
"""
u, u_cond = self.make_input(u, u_cond)
x_cond = ss.norm.ppf(u_cond)
x = ss.norm.ppf(u)
mn = self.mupr.dot(x_cond.T).T
if len(self.targets) <= 1:
# univariate normal
res = ss.norm.cdf(x, mn, self.cond_cov[0, 0])[0,0]
else:
# mvnormcdf does not accept multiple input points
res = np.zeros(x.shape)
for i in range(x.shape):
res[i] = mvnormcdf(x[i, :], mn, self.cond_cov)
return res

class GaussianCopula:
""" """

def __init__(self, corr_matr=pd.DataFrame(np.eye(2))):
"""
initialize copula (as two dimensional indipendent one by default)
"""
self.set_corr(corr_matr)

def set_corr(self, cr):
""" set correlation matrix parameter and compute its inverse """
self.cr = pd.DataFrame(cr)
self.dim = self.cr.shape
cls = self.cr.columns
self.cri = pd.DataFrame(np.linalg.inv(self.cr), columns=cls, index=cls)

def fit(self, data):
"""
fit the copula from data
the data should be 'm times n' Numpy array or Pandas DataFrame,
m = number of samples (observations), n = number of components
"""
cr = pd.DataFrame(data).corr(method='spearman')
self.set_corr(pearson_from_spearman(cr))

def rvs(self, size=100000):
"""
simulate sample for Monte Carlo method
size = sample volume, 100,000 by default
"""
sam = ss.multivariate_normal.rvs(cov=self.cr, size=size)
return pd.DataFrame(ss.norm.cdf(sam), columns=self.cr.columns)

def pdf(self, u):
""" probability density function of the copula """
x = ss.norm.ppf(u)
mat = self.cri - np.eye(self.dim)
pok = -0.5 * x.dot(mat).dot(x)
mult = 1 / np.sqrt(np.linalg.det(self.cri))
return mult * np.exp(pok)

def probability(self, event_func, n_obs=100000):
"""
calculates probability of an event A, which is defined via its
indicator function "event_func"
"""
u = self.rvs(size=n_obs)
return event_func(u).mean()

def conditional_probability(self, event_func, cond_func, n_obs=100000):
"""
calculates conditional probability of an event A, given event B,
which are defined via there indicator functions "event_func" and
"cond_func", respectively
"""
u = self.rvs(size=n_obs)
prob_of_cond = cond_func(u).mean()
prob_of_event_and_cond = (event_func(u) & cond_func(u)).mean()
if prob_of_cond > 0:
res = prob_of_event_and_cond / prob_of_cond
else:
res = 0
return res

def conditional_mean_variance_event(self, y_func, cond_func, n_obs=100000):
"""
calculates conditional mean and variance of a random variable Y = f(U),
given condition A, with function f defined by "y_func", and
condition A defined by its indicator function "cond_func"
"""
u = self.rvs(size=n_obs)
cond_ser = cond_func(u).astype('float')
prob_of_cond = cond_ser.mean()
event_and_cond = (y_func(u) * cond_ser).mean()
event2_and_cond = (y_func(u) ** 2 * cond_ser).mean()
if prob_of_cond > 0:
me = event_and_cond / prob_of_cond
va = event2_and_cond / prob_of_cond - me ** 2
else:
me, va = 0, 0
return me, va

def conditional_mean_variance_vector(self, y_func, x_func, n_obs=5000):
"""
creates an SVR model for computing expected mean of Y given X,
where X, Y are functions of U;
returns the model 'mod';
to predict value of Y in a new point X, call mod.predict(X), as usual
"""
u = self.rvs(size=n_obs)

# expected value portion
y = y_func(u)
x = x_func(u)
mod_ev = SVR()
mod_ev.fit(x, y)
ym = mod_ev.predict(x)

# variance portion
yv = (y - ym) ** 2
mod_va = SVR()
mod_va.fit(x, yv)

return mod_ev, mod_va

# =================================================================
# conversion from Pearson correlations to Spearman correlations
# and vice versa
# =================================================================

def spearman_from_pearson(r):
""" convertion for normal multivariate """
s = 6 / np.pi * np.arcsin(0.5 * r)
return s

def pearson_from_spearman(s):
""" conversion for normal multivariate """
r = 2 * np.sin(s * np.pi / 6)
return r

# =====================================================================
# Gaussian multivariate normal cdf
# =====================================================================

informcode = {0: 'normal completion with ERROR < EPS',
1: '''completion with ERROR > EPS and MAXPTS function values used;
increase MAXPTS to decrease ERROR;''',
2: 'N > 500 or N < 1'}

def mvstdnormcdf(lower, upper, corrcoef, **kwds):
n = len(lower)
lower = np.array(lower)
upper = np.array(upper)
corrcoef = np.array(corrcoef)

correl = np.zeros(int(n*(n-1)/2.0))  #dtype necessary?

if (lower.ndim != 1) or (upper.ndim != 1):
raise ValueError('can handle only 1D bounds')
if len(upper) != n:
raise ValueError('bounds have different lengths')
if n==2 and corrcoef.size==1:
correl = corrcoef
elif corrcoef.ndim == 1 and len(corrcoef) == n*(n-1)/2.0:
correl = corrcoef
elif corrcoef.shape == (n,n):
correl = corrcoef[np.tril_indices(n, -1)]
else:
raise ValueError('corrcoef has incorrect dimension')

if not 'maxpts' in kwds:
if n >2:
kwds['maxpts'] = 10000*n

lowinf = np.isneginf(lower)
uppinf = np.isposinf(upper)
infin = 2.0*np.ones(n)

#this has to be last

error, cdfvalue, inform = sp.stats.mvn.mvndst(lower,upper,infin,correl,**kwds)
if inform:
print('something wrong', informcode[inform], error)
return cdfvalue

def mvnormcdf(upper, mu, cov, lower=None,  **kwds):
upper = np.array(upper)
if lower is None:
lower = -np.ones(upper.shape) * np.inf
else:
lower = np.array(lower)
cov = np.array(cov)
stdev = np.sqrt(np.diag(cov)) # standard deviation vector
#do I need to make sure stdev is float and not int?
#is this correct to normalize to corr?
lower = (lower - mu)/stdev
upper = (upper - mu)/stdev
divrow = np.atleast_2d(stdev)
corr = cov/divrow/divrow.T
#v/np.sqrt(np.atleast_2d(np.diag(covv)))/np.sqrt(np.atleast_2d(np.diag(covv))).T

return mvstdnormcdf(lower, upper, corr, **kwds)

13 responses

I will put up a notebook later on how to use it.

Attached is a notebook.

66

Excellent +3 Aqua thanks for sharing!

Very nice work I have to say!
Have you ever tried to use a different family of copulas (like Archimedean) ?

@Magnus, thanks. I haven't tried using Archimedean yet. Will have to figure out how to implement multiple dependence (using Vine Copulas?)

@Aqua, would you recommend any book to come upto speed with the level of Math that seems to be involved in your shared code above?

@Leo, there is not much on Q so use https://www.quantconnect.com/tutorials/pairs-trading-strategy/. Good tutorial in my view.

@Leo, I started learning math several years ago. I still use professional mathematicians to implement my models. So if you have ideas and want a mathematician to develop your model drop me a note and I will put you in touch.

@Aqua, thanks. I want to learn things and come unto speed on my own at the moment, because it is more fun that way. I will check with you if I need professional mathematicians help. My experience is exclusively in software programming, I wish I had done some math college degrees to be able to understand some of the more complicated things that are described in the papers you have shared in the forums.

@Aqua, great information here as always. I must thank you for your public research and algorithms, I have learnt a lot from your posts so credit to you. Do you have a backtest you can post using the Copula Method? I can't seem to get this to work :(

@Leo, first start by learning the math behind mean-reversion of pairs. @MaxMargenot from Q gives a good explanation,
(https://www.youtube.com/watch?v=g-qvFjvyqcs) and Ernest Chan's book on "Algorithmic Trading-winning strategies and their rational" (look at page 87). Then look at @Aqua's notebook.
I hope that helps.

Cheers

@TWI_proptrader, sure. Either post or privately share what you are trying to do and I can put up something to show how to use these classes.

@TWI_proptrader. I am talking about advanced level math in the papers and in Aqua's implementations. The math behind mean reversion of pairs etc that you referred to are just too basic.

Has anybody tried Marshall-Olkin copula, for pairs trading or other trading projects? I am interested in how it performs relative to well-established copula families. 15 years ago, during the credit derivatives boom, there was much talk about the Marshall-Olkin copula making more sense than many other alternatives. But that discussion kind of died out with the recent events.