Labeling pairs within the Pipeline: stocks belonging to more than one pair

I am trying to implement a strategy that involves finding stocks belonging to a certain group (for example, stock pairs) given certain criteria (for example, two stocks that are cointegrated form a pair).
The computation is quite heavy because it has to compare many stocks (around 2000).
I tried to do this computation in a custom function, but it times out (I believe functions cannot spend more than 50 s in execution).
I then found out that Pipeline() can be used to execute heavy computations for up to 10 minutes.

That is great, but I cannot find any way to implement the strategy within the Pipeline.
The Pipeline works by labeling each stock with a factor (for example, a CustomFactor).
I was thinking that I might label each stock with an ID number referring to the pair it belongs to.
For example, the labeling (StockA: 1, StockB: 30, StockC: 24, StockD: 30, ...) would mean that (StockB, StockD) form a pair.
However, the same stock could belong to more than one pair. In that case I would need several labels per stock, which I cannot do: a factor can only take a single numerical value, not a list of values.
For example, (StockA: 1, StockB: [30, 1], StockC: 24, StockD: 30, ...) would mean that StockB is in two different pairs: (StockB, StockD) and (StockA, StockB).
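One way around the one-value-per-stock limit is to keep the pipeline for per-stock data only and hold the pair structure in a plain mapping built outside the pipeline (e.g. in a scheduled function, stored on context). A minimal sketch of the inverted mapping, using hypothetical pair IDs and tickers (StockE is invented here as StockC's partner, which the example above leaves unspecified):

```python
from collections import defaultdict

# Hypothetical pairs found by the cointegration step: pair ID -> (stock, stock).
pairs = {1: ('StockA', 'StockB'), 24: ('StockC', 'StockE'), 30: ('StockB', 'StockD')}

# Invert the mapping: each stock -> list of pair IDs it belongs to.
stock_to_pairs = defaultdict(list)
for pair_id, (s1, s2) in pairs.items():
    stock_to_pairs[s1].append(pair_id)
    stock_to_pairs[s2].append(pair_id)

print(dict(stock_to_pairs))
# StockB maps to [1, 30], mirroring the multi-pair case above.
```

Because this dict lives on context rather than in a pipeline column, a stock can carry any number of pair memberships.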

I really don't know how to get around this.

Thanks

Alex


In fewer words:
How do I create a pipeline of stock pairs instead of single stocks?

Hello,

Please find below what I'm using when I have to find pairs. It's only the part of the code that produces clusters; after this step you just have to refine a bit to get cointegrated pairs, for example by applying the ADF test to each pair inside a cluster.

PCA_COMPONENTS = 30
EPS = 1.5
MIN_SAMPLES = 2
CLSTR_SIZE_MIN = 5
CLSTR_SIZE_LIMIT = 9999
LOOKBACK_PERIOD = 250
COMPUTE_PERIOD = 50

def initialize(context):
    """
    Called once at the start of the algorithm.
    """
    set_benchmark(symbol('SPY'))
    monthly_top_volume = (
        AverageDollarVolume(window_length=200)
        .downsample('week_start')
    )
    close_prices = USEquityPricing.close.latest
    beta = 0.66 * RollingLinearRegressionOfReturns(
        target=sid(8554),
        returns_length=5,
        regression_length=260,
    ).beta + 0.33 * 1.0

    pipe = Pipeline(
        columns={
            'close_prices': close_prices,
            'sector': Sector(),
            'beta': beta,
        },
        screen=universe,  # universe is defined elsewhere in the algorithm
    )
    # Create our dynamic stock selector.
    algo.attach_pipeline(pipe, 'pipe')
    # Update clusters once a month.
    schedule_function(
        compute_clusters,
        date_rules.month_end(),
        time_rules.market_close(minutes=30),
    )
    # Rebalance every day, 30 minutes before market close.
    schedule_function(
        rebalance,
        date_rules.every_day(),
        time_rules.market_close(minutes=30),
    )
    # Record any custom data at the end of each day.
    schedule_function(
        record_data,
        date_rules.every_day(),
        time_rules.market_close(),
    )

def get_cluster(prices):
    returns = prices.pct_change()
    returns = returns.iloc[1:, :].dropna(axis=1)
    # Reduce the returns panel to its main principal components.
    pca = PCA(n_components=PCA_COMPONENTS)
    pca.fit(returns)
    X = pca.components_.T
    # X = np.hstack(
    #     (pca.components_.T,
    #      MktCap[returns.columns].values[:, np.newaxis],)
    # )
    X = preprocessing.StandardScaler().fit_transform(X)
    # Cluster stocks in the reduced space; label -1 means "noise".
    clf = DBSCAN(eps=EPS, min_samples=MIN_SAMPLES)
    clf.fit(X)
    clustered_series = pd.Series(index=returns.columns, data=clf.labels_)
    clustered_series = clustered_series[clustered_series != -1]
    # Keep only clusters whose size lies within the configured bounds.
    cluster_sizes = clustered_series.value_counts()
    kept = cluster_sizes[(cluster_sizes >= CLSTR_SIZE_MIN) & (cluster_sizes <= CLSTR_SIZE_LIMIT)]
    return clustered_series[clustered_series.isin(kept.index)]

def compute_clusters(context, data):
    # Get the latest pipeline output and recompute the clusters.
    context.pipeline_data = algo.pipeline_output('pipe')
    context.securities = context.pipeline_data.index.tolist()
    prices = data.history(context.pipeline_data.index, 'price',
                          bar_count=LOOKBACK_PERIOD, frequency='1d')
    context.clusters = get_cluster(prices)

def rebalance(context, data):
    # Make sure clusters exist (the first rebalance may run before the
    # first scheduled month-end cluster computation).
    if not hasattr(context, 'clusters'):
        compute_clusters(context, data)
    # Get the alpha factor data from the pipeline output.
    context.pipeline_data = algo.pipeline_output('pipe')
    context.securities = context.pipeline_data.index.tolist()
    context.sector_betas = context.pipeline_data.sector
    prices = data.history(context.pipeline_data.index, 'price',
                          bar_count=COMPUTE_PERIOD, frequency='1d')
    # Drop clustered stocks that no longer have price data.
    for s in context.clusters.index:
        if s not in prices.columns:
            context.clusters.drop(s, inplace=True)

    # Continue the code...
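The "refine a bit after this step" part the answer alludes to can be sketched in plain pandas: expand each cluster returned by get_cluster into its within-cluster candidate pairs, then screen each pair with a cointegration test. The tickers below are hypothetical, and the final test is left as a comment since its exact form (ADF on the spread, Engle-Granger, etc.) is up to you:

```python
from itertools import combinations

import pandas as pd

def candidate_pairs(clustered_series):
    """Expand each cluster into all within-cluster stock pairs."""
    pairs = []
    for label in clustered_series.unique():
        members = clustered_series[clustered_series == label].index.tolist()
        pairs.extend(combinations(members, 2))
    return pairs

# Hypothetical output of get_cluster: ticker -> cluster label.
clusters = pd.Series({'AAA': 0, 'BBB': 0, 'CCC': 1, 'DDD': 1, 'EEE': 1})
print(candidate_pairs(clusters))
# Each candidate pair would then be screened with a cointegration
# test (e.g. ADF on the price spread) before being traded.
```

A cluster of n stocks yields n*(n-1)/2 candidate pairs, so the CLSTR_SIZE_LIMIT bound above also caps how many cointegration tests you run per cluster.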



What do you use 'sector' and 'beta' in the pipeline for?

To feed the constraints, if you want to (with order_optimal_portfolio).