Labeling pairs within the Pipeline: stocks belonging to more than one pair

I am trying to implement a strategy that involves finding stocks belonging to a certain group (for example stock pairs) given certain criteria (for example, two stocks that are cointegrated form a pair).
The computation is quite heavy because it has to compare many stocks (around 2000).
Implementing it in a custom function is not the problem. I did try that, but it times out (I believe such functions cannot spend more than 50 s in execution).
I then found out that Pipeline() can be used to execute heavy computations of up to 10 minutes.

That is great, but I cannot find a way to implement the strategy within the Pipeline.
The Pipeline works by labeling each stock with a factor (for example a CustomFactor).
I was thinking I might label each stock with an ID number referring to the pair it belongs to.
For example, the labeling (StockA: 1, StockB: 30, StockC: 24, StockD: 30, ...) would mean that (StockB, StockD) form a pair.
However, the same stock can belong to more than one pair. In that case I would need several labels per stock, which I cannot do (a factor can only take a single numerical value, not a list of values)...
For example, (StockA: 1, StockB: [30, 1], StockC: 24, StockD: 30, ...) would mean that StockB is in two different pairs: (StockB, StockD) and (StockA, StockB).

I really don't know how to get around this.
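One way around the single-value limit (a hypothetical sketch, not Quantopian-specific: the names `pairs` and `membership` are made up for illustration) is to keep the pair list itself outside the Pipeline, e.g. in `context`, and map each stock to every pair it belongs to:

```python
from collections import defaultdict

# Hypothetical pair list computed outside the Pipeline
# (e.g. in a scheduled function); each tuple is one pair.
pairs = [("StockA", "StockB"), ("StockB", "StockD"), ("StockC", "StockE")]

def membership(pairs):
    """Map each stock to every pair it belongs to."""
    members = defaultdict(list)
    for pair in pairs:
        for stock in pair:
            members[stock].append(pair)
    return members

members = membership(pairs)
# "StockB" now maps to two pairs -- exactly what a single
# numeric factor per stock cannot express.
```

With this representation, multiple membership is just a list of length > 1, and no per-stock factor label is needed at all.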

Thanks

Alex

4 responses

Put more briefly:
How do I create a pipeline of stock pairs instead of single stocks?

Hello,

Here is what I use when I have to find pairs. (It's only the part of the code that produces clusters; after this step you still have to refine a bit, for example by applying an Augmented Dickey-Fuller test to each pair inside a cluster to keep only the cointegrated ones.)

# Imports the snippet relies on (the original post omitted them; symbol, sid,
# schedule_function, etc. are built into the Quantopian IDE).
import quantopian.algorithm as algo
from quantopian.pipeline import Pipeline
from quantopian.pipeline.data import USEquityPricing
from quantopian.pipeline.factors import AverageDollarVolume, RollingLinearRegressionOfReturns
from quantopian.pipeline.filters import QTradableStocksUS
from quantopian.pipeline.classifiers.fundamentals import Sector
import pandas as pd
from sklearn import preprocessing
from sklearn.cluster import DBSCAN
from sklearn.decomposition import PCA

PCA_COMPONENTS = 30
EPS = 1.5
MIN_SAMPLES = 2  # DBSCAN expects an integer here (the original had a stray '2.')
CLSTR_SIZE_MIN = 5
CLSTR_SIZE_LIMIT = 9999
LOOKBACK_PERIOD = 250
COMPUTE_PERIOD = 50


def initialize(context):  
    """  
    Called once at the start of the algorithm.  
    """  
    set_benchmark(symbol('SPY'))  
    #set_commission(commission.PerShare(cost=0.000, min_trade_cost=0))  
    #set_slippage(slippage.FixedSlippage(spread=0))  
    monthly_top_volume = (  
        AverageDollarVolume(window_length=200)  
        .top(1000, mask=QTradableStocksUS())  
        .downsample('week_start')  
        )  
    universe = QTradableStocksUS() #& monthly_top_volume  
    close_prices = USEquityPricing.close.latest  
    beta = 0.66*RollingLinearRegressionOfReturns(  
                    target=sid(8554),  
                    returns_length=5,  
                    regression_length=260,  
                    mask=universe  
                    ).beta + 0.33*1.0

    pipe = Pipeline(  
        columns={  
            'close_prices':close_prices,  
            'sector': Sector(),  
            'beta': beta,  
        },  
        screen=(universe),  
    )  
    # Create our dynamic stock selector.  
    algo.attach_pipeline(pipe, 'pipe')  
    # Update clusters once a month  
    schedule_function(  
        compute_clusters,  
        date_rules.month_end(),  
        time_rules.market_close(minutes=30),  
    )  
    # Rebalance every day, 1 hour after market open.  
    schedule_function(  
        rebalance,  
        date_rules.every_day(),  
        time_rules.market_close(minutes=30),  
    )  
    # Record any custom data at the end of each day  
    schedule_function(record_data,  
                      date_rules.every_day(),  
                      time_rules.market_close())  


def get_cluster(prices):  
    returns = prices.pct_change()  
    returns = returns.iloc[1:,:].dropna(axis=1)  
    pca = PCA(n_components=PCA_COMPONENTS)  
    pca.fit(returns)  
    X = pca.components_.T  
    #X = np.hstack(
    #    (pca.components_.T,
    #     MktCap[returns.columns].values[:, np.newaxis],)
    #)
    X = preprocessing.StandardScaler().fit_transform(X)  
    clf = DBSCAN(eps=EPS, min_samples=MIN_SAMPLES)

    clf.fit(X)
    labels = clf.labels_  # DBSCAN marks noise points with the label -1
    n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)

    clustered_series = pd.Series(index=returns.columns, data=labels.flatten())
    clustered_series = clustered_series[clustered_series != -1]  # drop noise
    cluster_sizes = clustered_series.value_counts()
    ticker_count_reduced = cluster_sizes[(cluster_sizes >= CLSTR_SIZE_MIN) & (cluster_sizes <= CLSTR_SIZE_LIMIT)]
    return clustered_series[clustered_series.isin(ticker_count_reduced.index)]


def compute_clusters(context, data):  
    # Get the alpha factor data from the pipeline output  
    context.pipeline_data = algo.pipeline_output('pipe')  
    context.securities = context.pipeline_data.index.tolist()  
    prices = data.history(context.pipeline_data.index, 'price', bar_count=LOOKBACK_PERIOD, frequency='1d')  
    context.clusters = get_cluster(prices)  

def rebalance(context, data):  
    try:
        context.clusters
    except AttributeError:
        # Clusters not computed yet (e.g. on the first day); compute them now.
        compute_clusters(context, data)
    # Get the alpha factor data from the pipeline output  
    context.pipeline_data = algo.pipeline_output('pipe')  
    context.securities = context.pipeline_data.index.tolist()  
    context.sector_betas = context.pipeline_data.sector  
    prices = data.history(context.pipeline_data.index, 'price', bar_count=COMPUTE_PERIOD, frequency='1d')  
    # Drop clustered stocks that no longer have price data (collect first,
    # to avoid mutating the series while iterating over its index).
    missing = [s for s in context.clusters.index if s not in prices.columns]
    context.clusters.drop(missing, inplace=True)

     # Continue the code...
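To go from clusters to pairs, as mentioned above, you would test every candidate pair inside each cluster for cointegration. A minimal sketch of that step (the test itself is left as a pluggable callable; `statsmodels.tsa.stattools.coint` on the two price series would be the natural choice, and `cluster_pairs` / `is_cointegrated` are illustrative names, not part of the code above):

```python
from itertools import combinations

import pandas as pd

def cluster_pairs(clustered_series, is_cointegrated):
    """Enumerate candidate pairs inside each cluster and keep those
    that pass the supplied cointegration test.

    clustered_series : pd.Series mapping ticker -> cluster label
                       (e.g. the return value of get_cluster)
    is_cointegrated  : callable (ticker_a, ticker_b) -> bool, e.g. a
                       wrapper around statsmodels.tsa.stattools.coint
    """
    pairs = []
    for label in clustered_series.unique():
        tickers = clustered_series[clustered_series == label].index
        for a, b in combinations(tickers, 2):
            if is_cointegrated(a, b):
                pairs.append((a, b))
    return pairs
```

Note that the result is a plain list of tuples, so a stock appearing in several pairs is not a problem here, which sidesteps the one-label-per-stock limitation of the Pipeline.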

What do you use 'sector' and 'beta' in the pipeline for?

To feed the constraints, if you want to use them (with order_optimal_portfolio).
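For context, this is what those two columns measure. A generic pandas sketch (the data values are made up, and this is the arithmetic behind the constraints rather than the Quantopian optimizer API itself): sector labels let you bound net exposure per sector, and the beta column lets you bound overall portfolio beta.

```python
import pandas as pd

# Hypothetical pipeline output: per-stock sector label and estimated beta.
data = pd.DataFrame({
    "sector": [101, 101, 206, 206],
    "beta":   [1.2, 0.8, 1.0, 0.6],
}, index=["A", "B", "C", "D"])

# Candidate long/short weights for the four stocks.
weights = pd.Series([0.25, -0.25, 0.25, -0.25], index=data.index)

# Net exposure per sector (a sector-neutrality constraint bounds these).
sector_exposure = weights.groupby(data["sector"]).sum()

# Portfolio beta (a beta constraint bounds this dot product).
portfolio_beta = (weights * data["beta"]).sum()
```

Here both sectors net to zero and the portfolio beta is 0.2, so a sector-neutral constraint would pass while a tight beta bound might not.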