In the spirit of sharing on Q forums, I am posting my code for Gaussian Copula class that you can use for pair trading or basket trading.

The gist of using copulas is that you identify the conditional cdf of a series based on other series and build a score around it. There are many references to it if you search for copula based pairs trading on google. Happy researching :)

```
def cov2corr( A ):
d = np.sqrt(A.diagonal())
A = ((A.T/d).T)/d
return A
from sklearn.svm import SVR
from scipy.stats import mvn
#from statsmodels.sandbox.distributions.extras import mvnormcdf
# avioding boundary trouble
loc_eps = 0.0000001
# ================================================================
# copula classes
# ================================================================
class GaussianCopulaConditional:
""" """
def __init__(self, corr_matrix=np.eye(2), conditionals=None):
"""
Gaussian covariance matrix and variables variables list;
target variables will be kept in 'targets' list
"""
self.cr = pd.DataFrame(corr_matrix)
cls = self.cr.columns.values
# by default conditionals contains everything except the last component
if conditionals is None:
conditionals = cls[:-1]
# the rest of components are targets
targets = [s for s in cls if s not in conditionals]
# keep the variables lists
self.targets = targets
self.conditionals = conditionals
# covariance submatrices
sII = np.array(self.cr.loc[conditionals, conditionals])
sIJ = np.array(self.cr.loc[conditionals, targets])
sJI = np.array(self.cr.loc[targets, conditionals])
sJJ = np.array(self.cr.loc[targets, targets])
# conditional mean and covariance
pr = np.linalg.solve(sII, sIJ)
self.cond_cov = sJJ - sJI.dot(pr)
self.mupr = np.linalg.solve(sII, sIJ).T
def make_input(self, u, u_cond):
""" """
if u is None:
u = 0.5 * np.ones((1, len(self.targets)))
if u_cond is None:
u_cond = 0.5 * np.ones((1, len(self.conditionals)))
u[u == 0] = loc_eps
u[u == 1] = 1 - loc_eps
return u, u_cond
def pdf(self, u=None, u_cond=None):
"""
u is 2-dim array n_obs x n_targets
u_cond is 2-dim array 1 x n_conditionals
"""
u, u_cond = self.make_input(u, u_cond)
x_cond = ss.norm.ppf(u_cond)
x = ss.norm.ppf(u)
mn = self.mupr.dot(x_cond.T).T[0]
if len(self.targets) <= 1:
res = ss.norm.pdf(x, mn[0], self.cond_cov[0, 0])
fx = ss.norm.pdf(x)
res = pd.Series((res / fx)[:, 0], index=u[:, 0])
else:
res = ss.multivariate_normal.pdf(x, mean=mn, cov=self.cond_cov)
fx = ss.norm.pdf(x).prod(axis=1)
res = pd.Series(res / fx, index=range(x.shape[0]))
res.name = 'Cond PDF of ' + ', '.join(self.targets)
return res
def cdf(self, u=None, u_cond=None):
"""
u is 2-dim array n_obs x n_targets
u_cond is 2-dim array 1 x n_conditionals
"""
u, u_cond = self.make_input(u, u_cond)
x_cond = ss.norm.ppf(u_cond)
x = ss.norm.ppf(u)
mn = self.mupr.dot(x_cond.T).T[0]
if len(self.targets) <= 1:
# univariate normal
res = ss.norm.cdf(x, mn[0], self.cond_cov[0, 0])[0,0]
else:
# mvnormcdf does not accept multiple input points
res = np.zeros(x.shape[0])
for i in range(x.shape[0]):
res[i] = mvnormcdf(x[i, :], mn, self.cond_cov)
return res
class GaussianCopula:
""" """
def __init__(self, corr_matr=pd.DataFrame(np.eye(2))):
"""
initialize copula (as two dimensional indipendent one by default)
"""
self.set_corr(corr_matr)
def set_corr(self, cr):
""" set correlation matrix parameter and compute its inverse """
self.cr = pd.DataFrame(cr)
self.dim = self.cr.shape[0]
cls = self.cr.columns
self.cri = pd.DataFrame(np.linalg.inv(self.cr), columns=cls, index=cls)
def fit(self, data):
"""
fit the copula from data
the data should be 'm times n' Numpy array or Pandas DataFrame,
m = number of samples (observations), n = number of components
"""
cr = pd.DataFrame(data).corr(method='spearman')
self.set_corr(pearson_from_spearman(cr))
def rvs(self, size=100000):
"""
simulate sample for Monte Carlo method
size = sample volume, 100,000 by default
"""
sam = ss.multivariate_normal.rvs(cov=self.cr, size=size)
return pd.DataFrame(ss.norm.cdf(sam), columns=self.cr.columns)
def pdf(self, u):
""" probability density function of the copula """
x = ss.norm.ppf(u)
mat = self.cri - np.eye(self.dim)
pok = -0.5 * x.dot(mat).dot(x)
mult = 1 / np.sqrt(np.linalg.det(self.cri))
return mult * np.exp(pok)
def probability(self, event_func, n_obs=100000):
"""
calculates probability of an event A, which is defined via its
indicator function "event_func"
"""
u = self.rvs(size=n_obs)
return event_func(u).mean()
def conditional_probability(self, event_func, cond_func, n_obs=100000):
"""
calculates conditional probability of an event A, given event B,
which are defined via there indicator functions "event_func" and
"cond_func", respectively
"""
u = self.rvs(size=n_obs)
prob_of_cond = cond_func(u).mean()
prob_of_event_and_cond = (event_func(u) & cond_func(u)).mean()
if prob_of_cond > 0:
res = prob_of_event_and_cond / prob_of_cond
else:
res = 0
return res
def conditional_mean_variance_event(self, y_func, cond_func, n_obs=100000):
"""
calculates conditional mean and variance of a random variable Y = f(U),
given condition A, with function f defined by "y_func", and
condition A defined by its indicator function "cond_func"
"""
u = self.rvs(size=n_obs)
cond_ser = cond_func(u).astype('float')
prob_of_cond = cond_ser.mean()
event_and_cond = (y_func(u) * cond_ser).mean()
event2_and_cond = (y_func(u) ** 2 * cond_ser).mean()
if prob_of_cond > 0:
me = event_and_cond / prob_of_cond
va = event2_and_cond / prob_of_cond - me ** 2
else:
me, va = 0, 0
return me, va
def conditional_mean_variance_vector(self, y_func, x_func, n_obs=5000):
"""
creates an SVR model for computing expected mean of Y given X,
where X, Y are functions of U;
returns the model 'mod';
to predict value of Y in a new point X, call mod.predict(X), as usual
"""
u = self.rvs(size=n_obs)
# expected value portion
y = y_func(u)
x = x_func(u)
mod_ev = SVR()
mod_ev.fit(x, y)
ym = mod_ev.predict(x)
# variance portion
yv = (y - ym) ** 2
mod_va = SVR()
mod_va.fit(x, yv)
return mod_ev, mod_va
# =================================================================
# conversion from Pearson correlations to Spearman correlations
# and vice versa
# =================================================================
def spearman_from_pearson(r):
""" convertion for normal multivariate """
s = 6 / np.pi * np.arcsin(0.5 * r)
return s
def pearson_from_spearman(s):
""" conversion for normal multivariate """
r = 2 * np.sin(s * np.pi / 6)
return r
# =====================================================================
# Gaussian multivariate normal cdf
# =====================================================================
informcode = {0: 'normal completion with ERROR < EPS',
1: '''completion with ERROR > EPS and MAXPTS function values used;
increase MAXPTS to decrease ERROR;''',
2: 'N > 500 or N < 1'}
def mvstdnormcdf(lower, upper, corrcoef, **kwds):
n = len(lower)
lower = np.array(lower)
upper = np.array(upper)
corrcoef = np.array(corrcoef)
correl = np.zeros(int(n*(n-1)/2.0)) #dtype necessary?
if (lower.ndim != 1) or (upper.ndim != 1):
raise ValueError('can handle only 1D bounds')
if len(upper) != n:
raise ValueError('bounds have different lengths')
if n==2 and corrcoef.size==1:
correl = corrcoef
elif corrcoef.ndim == 1 and len(corrcoef) == n*(n-1)/2.0:
correl = corrcoef
elif corrcoef.shape == (n,n):
correl = corrcoef[np.tril_indices(n, -1)]
else:
raise ValueError('corrcoef has incorrect dimension')
if not 'maxpts' in kwds:
if n >2:
kwds['maxpts'] = 10000*n
lowinf = np.isneginf(lower)
uppinf = np.isposinf(upper)
infin = 2.0*np.ones(n)
np.putmask(infin,lowinf,0)# infin.putmask(0,lowinf)
np.putmask(infin,uppinf,1) #infin.putmask(1,uppinf)
#this has to be last
np.putmask(infin,lowinf*uppinf,-1)
error, cdfvalue, inform = sp.stats.mvn.mvndst(lower,upper,infin,correl,**kwds)
if inform:
print('something wrong', informcode[inform], error)
return cdfvalue
def mvnormcdf(upper, mu, cov, lower=None, **kwds):
upper = np.array(upper)
if lower is None:
lower = -np.ones(upper.shape) * np.inf
else:
lower = np.array(lower)
cov = np.array(cov)
stdev = np.sqrt(np.diag(cov)) # standard deviation vector
#do I need to make sure stdev is float and not int?
#is this correct to normalize to corr?
lower = (lower - mu)/stdev
upper = (upper - mu)/stdev
divrow = np.atleast_2d(stdev)
corr = cov/divrow/divrow.T
#v/np.sqrt(np.atleast_2d(np.diag(covv)))/np.sqrt(np.atleast_2d(np.diag(covv))).T
return mvstdnormcdf(lower, upper, corr, **kwds)
```